一、前言
RabbitMQ 的模式介绍可以看看笔者写的这篇文章《RabbitMQ 工作模式介绍》
二、整合 RabbitMQ
2.1 添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.2 添加配置
spring.rabbitmq.host=192.168.2.101 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin
2.3 配置类
AmqpConfirguration.java
import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; import java.util.logging.Logger; /** * 配置类: */ @Configuration public class AmqpConfirguration { private final Logger logger = Logger.getLogger(this.getClass().getName()); @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) //必须是prototype类型 public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } //=============简单、工作队列模式=============== public static final String SIMPLE_QUEUE = "simple_queue"; @Bean public Queue queue() { return new Queue(SIMPLE_QUEUE, true); } //===============发布/订阅模式============ public static final String PS_QUEUE_1 = "ps_queue_1"; public static final String PS_QUEUE_2 = "ps_queue_2"; public static final String FANOUT_EXCHANGE = "fanout_exchange"; @Bean public Queue psQueue1() { return new Queue(PS_QUEUE_1, true); } @Bean public Queue psQueue2() { return new Queue(PS_QUEUE_2, true); } @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUT_EXCHANGE); } @Bean public Binding fanoutBinding1() { return BindingBuilder.bind(psQueue1()).to(fanoutExchange()); } @Bean public Binding fanoutBinding2() { return BindingBuilder.bind(psQueue2()).to(fanoutExchange()); } //===============路由模式============ public static final String ROUTING_QUEUE_1 = "routing_queue_1"; public static final String ROUTING_QUEUE_2 = "routing_queue_2"; public static final String DIRECT_EXCHANGE = "direct_exchange"; @Bean public Queue routingQueue1() { return new Queue(ROUTING_QUEUE_1, true); } @Bean public Queue routingQueue2() { return new Queue(ROUTING_QUEUE_2, true); } @Bean public DirectExchange directExchange() { return new DirectExchange(DIRECT_EXCHANGE); } @Bean public Binding directBinding1() { return BindingBuilder.bind(routingQueue1()).to(directExchange()).with("user"); } @Bean public Binding directBinding2() { return BindingBuilder.bind(routingQueue2()).to(directExchange()).with("order"); } //===============主题模式============ public static final String TOPIC_QUEUE_1 = "topic_queue_1"; public static final String TOPIC_QUEUE_2 = "topic_queue_2"; public static final String TOPIC_QUEUE_3 = "topic_queue_3"; public static final String TOPIC_EXCHANGE = "topic_exchange"; @Bean public Queue topicQueue1() { return new Queue(TOPIC_QUEUE_1, true); } @Bean public Queue topicQueue2() { return new Queue(TOPIC_QUEUE_2, true); } @Bean public Queue topicQueue3() { return new Queue(TOPIC_QUEUE_3, true); } @Bean public TopicExchange topicExchange() { return new TopicExchange(TOPIC_EXCHANGE); } @Bean public Binding topicBinding1() { return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("user.add"); } @Bean public Binding topicBinding2() { return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("user.#"); } @Bean public Binding topicBinding3() { return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("test.#"); } }
2.4 消息生产者
AmqpSender.java
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 消息生产者 */ @Component public class AmqpSender { @Autowired private AmqpTemplate amqpTemplate; /** * 简单模式发送 * * @param message */ public void simpleSend(String message) { this.amqpTemplate.convertAndSend(AmqpConfirguration.SIMPLE_QUEUE, message); } /** * 发布/订阅模式发送 * * @param message */ public void psSend(String message) { this.amqpTemplate.convertAndSend(AmqpConfirguration.FANOUT_EXCHANGE, "", message); } /** * 路由模式发送 * * @param message */ public void routingSend(String routingKey, String message) { this.amqpTemplate.convertAndSend(AmqpConfirguration.DIRECT_EXCHANGE, routingKey, message); } /** * 主题模式发送 * * @param routingKey * @param message */ public void topicSend(String routingKey, String message) { this.amqpTemplate.convertAndSend(AmqpConfirguration.TOPIC_EXCHANGE, routingKey, message); } }
2.5 消息消费者
AmqpReceiver.java
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消息消费者 */ @Component public class AmqpReceiver { //使@RabbitListener 注解监听消息。 /** * 简单模式接收 * * @param message */ @RabbitListener(queues = AmqpConfirguration.SIMPLE_QUEUE) public void simpleReceive1(String message) { System.out.println("接收消息1:" + message); } @RabbitListener(queues = AmqpConfirguration.SIMPLE_QUEUE) public void simpleReceive2(String message) { System.out.println("接收消息2:" + message); } /** * 发布/订阅模式接收 * * @param message */ @RabbitListener(queues = AmqpConfirguration.PS_QUEUE_1) public void psReceive1(String message) { System.out.println(AmqpConfirguration.PS_QUEUE_1 + "接收消息:" + message); } @RabbitListener(queues = AmqpConfirguration.PS_QUEUE_2) public void psReceive2(String message) { System.out.println(AmqpConfirguration.PS_QUEUE_2 + "接收消息:" + message); } /** * 路由模式接收 * * @param message */ @RabbitListener(queues = AmqpConfirguration.ROUTING_QUEUE_1) public void routingReceive1(String message) { System.out.println(AmqpConfirguration.ROUTING_QUEUE_1 + "接收消息:" + message); } @RabbitListener(queues = AmqpConfirguration.ROUTING_QUEUE_2) public void routingReceive2(String message) { System.out.println(AmqpConfirguration.ROUTING_QUEUE_2 + "接收消息:" + message); } /** * 主题模式接收 * * @param message */ @RabbitListener(queues = AmqpConfirguration.TOPIC_QUEUE_1) public void topicReceive1(String message) { System.out.println(AmqpConfirguration.TOPIC_QUEUE_1 + "接收消息:" + message); } @RabbitListener(queues = AmqpConfirguration.TOPIC_QUEUE_2) public void topicReceive2(String message) { System.out.println(AmqpConfirguration.TOPIC_QUEUE_2 + "接收消息:" + message); } @RabbitListener(queues = AmqpConfirguration.TOPIC_QUEUE_3) public void topicReceive3(String message) { System.out.println(AmqpConfirguration.TOPIC_QUEUE_3 + "接收消息:" + message); } }
2.6 测试类
@RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqApplicationTests { @Autowired private AmqpSender sender; @Test public void testSimpleSend() { //简单模式发送 for (int i = 1; i < 6; i++) { this.sender.simpleSend("test simpleSend " + i); } } @Test public void testPsSend() { //发布/订阅模式发送 for (int i = 1; i < 6; i++) { this.sender.psSend("test psSend " + i); } } @Test public void testRoutingSend() { //路由模式发送 for (int i = 1; i < 6; i++) { this.sender.routingSend("order", "test routingSend " + i); } } @Test public void testTopicSend() { //主题模式发送 for (int i = 1; i < 6; i++) { this.sender.topicSend("user.add", "test topicSend " + i); } } }
三、实战演练
3.1 简单模式与工作队列模式
一个消息消费者:
结果:
两个消息消费者:
结果:由两个消息消费者平均消费消息
3.2 发布/订阅模式
两个消息消费者:
结果:所有消息消费者都能收到所有信息
3.3路由模式
在AmqpConfirguration定义了“user”和"order",在路由上绑定了routingKey为“user”与“order”两个队列
两个消息消费者:
测试:
routingKey为“order”
可以看到只有routing_queue_2收到了消息,routing_queue_2的routingKey为“order”
3.4 主题模式
在路由上绑定了三个队列,routingKey分别为“user.add”、“user.#”、“test.#”
3个消息消费者
测试:
routingKey为“user.add”
可以发现topic_queue_1和topic_queue_2都接收到了消息,而topic_queue_3没有。
因为主题模式中routingKey里的 符号 “#” 匹配一个或多个词,符号“*”匹配不多不少一个词。相当于模糊匹配,而topic_queue_3匹配不上
三、项目源码
评论 (0)