Spring Boot集成RabbitMQ 进阶版(手动消费)

SpringBoot workingTime 885℃ 0评论

u=2515280859,1232422357-fm=27-gp=0

关于生产和消费的1对多,多对一关系,大家可以看这篇文章,写的还是挺全的,我这个就不CTRL+C/CTRL+V了,文章超时空链接

今天我这里主要写一下,关于手动消费

首先,我们需要删除掉HelloReceiver类,不知道这是啥的请看上一篇文章

之后修改一下我们的单元测试类,将多增加一些生产者


import org.junit.Test;
import org.junit.runner.RunWith;
import org.rabbitmq.Application;
import org.rabbitmq.config.TopicRabbitConfig;
import org.rabbitmq.service.HelloSender;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;

import java.util.Objects;

/**
 * @author : R&M www.rmworking.com/blog
 *         2018/6/4 15:45
 *         spring-boot-demo
 *         org.rabbitmq.controller
 */
// 这是JUnit的注解,通过这个注解让SpringJUnit4ClassRunner这个类提供Spring测试上下文。
@RunWith(SpringJUnit4ClassRunner.class)
// 这是Spring Boot注解,为了进行集成测试,需要通过这个注解加载和配置Spring应用上下
@SpringBootTest(classes = Application.class)
@WebAppConfiguration
public class RabbitMqHelloTest {

    @Autowired
    private HelloSender helloSender;

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @Test
    public void hello() throws Exception {
        for (int i = 0; i < 5; i++) {
            helloSender.send();
        }
    }

    @Test
    public void Receiver() throws Exception {
        Message message = rabbitTemplate.receive(TopicRabbitConfig.message);
        if (Objects.equals(null, message)) {
            System.out.println("无数据消费~!");
        } else {
            String result = message.getBody()!=null ?new String(message.getBody()) : "";
            System.out.println("消费第【" + message.getMessageProperties().getMessageCount() + "】条数据,消费记录:" + result);
        }
    }

}

Receiver()方法,就是调用消费,执行一次消费一条数据。

TopicRabbitConfig类代码如下:


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author : R&M www.rmworking.com/blog
 *         2018/6/5 10:47
 *         spring-boot-demo
 *         org.rabbitmq.config
 */
@Configuration
public class TopicRabbitConfig {

    public final static String message = "topic.message";
    public final static String messages = "topic.messages";

    @Bean
    public Queue queueMessage() {
        return new Queue(TopicRabbitConfig.message);
    }

    @Bean
    public Queue queueMessages() {
        return new Queue(TopicRabbitConfig.messages);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("exchange");
    }

    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }
}

Application代码如下:


@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

转载请注明:R&M » Spring Boot集成RabbitMQ 进阶版(手动消费)

喜欢 (0)or分享 (0)
发表我的评论
取消评论

表情

备案号:京ICP备14044161号;联系我:rm@rmworking.com