首页
友链
关于
免责声明
Search
1
王者营地战绩数据王者荣耀查询网页源码
6,210 阅读
2
群晖Active Backup for Business套件备份Linux服务器教程
4,384 阅读
3
影视分享
4,313 阅读
4
(亲测)Jrebel激活破解方式2019-08-21
4,289 阅读
5
CentOS 7 安装及卸载 Jenkins
3,573 阅读
日常
文章
后端
前端
Linux
异常
Flutter
分享
群晖
登录
Search
标签搜索
docker
springboot
Spring Boot
java
linux
Shiro
Graphics2D
图片
游戏账号交易
Mybatis
Spring Cloud
centos
脚本
Web Station
群晖
王者营地
战绩查询
平台对接
Spring Cloud Alibaba
nacos
绿林寻猫
累计撰写
249
篇文章
累计收到
26
条评论
首页
栏目
日常
文章
后端
前端
Linux
异常
Flutter
分享
群晖
页面
友链
关于
免责声明
搜索到
1
篇与
RabbitMQ
的结果
2021-12-08
Spring Boot 整合 RabbitMQ(附源码)
一、前言RabbitMQ 的模式介绍可以看看笔者写的这篇文章《RabbitMQ 工作模式介绍》二、整合 RabbitMQ2.1 添加依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>2.2 添加配置spring.rabbitmq.host=192.168.2.101 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin2.3 配置类AmqpConfirguration.javaimport org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; import java.util.logging.Logger; /** * 配置类: */ @Configuration public class AmqpConfirguration { private final Logger logger = Logger.getLogger(this.getClass().getName()); @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) //必须是prototype类型 public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } //=============简单、工作队列模式=============== public static final String SIMPLE_QUEUE = "simple_queue"; @Bean public Queue queue() { return new Queue(SIMPLE_QUEUE, true); } 
//===============发布/订阅模式============ public static final String PS_QUEUE_1 = "ps_queue_1"; public static final String PS_QUEUE_2 = "ps_queue_2"; public static final String FANOUT_EXCHANGE = "fanout_exchange"; @Bean public Queue psQueue1() { return new Queue(PS_QUEUE_1, true); } @Bean public Queue psQueue2() { return new Queue(PS_QUEUE_2, true); } @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUT_EXCHANGE); } @Bean public Binding fanoutBinding1() { return BindingBuilder.bind(psQueue1()).to(fanoutExchange()); } @Bean public Binding fanoutBinding2() { return BindingBuilder.bind(psQueue2()).to(fanoutExchange()); } //===============路由模式============ public static final String ROUTING_QUEUE_1 = "routing_queue_1"; public static final String ROUTING_QUEUE_2 = "routing_queue_2"; public static final String DIRECT_EXCHANGE = "direct_exchange"; @Bean public Queue routingQueue1() { return new Queue(ROUTING_QUEUE_1, true); } @Bean public Queue routingQueue2() { return new Queue(ROUTING_QUEUE_2, true); } @Bean public DirectExchange directExchange() { return new DirectExchange(DIRECT_EXCHANGE); } @Bean public Binding directBinding1() { return BindingBuilder.bind(routingQueue1()).to(directExchange()).with("user"); } @Bean public Binding directBinding2() { return BindingBuilder.bind(routingQueue2()).to(directExchange()).with("order"); } //===============主题模式============ public static final String TOPIC_QUEUE_1 = "topic_queue_1"; public static final String TOPIC_QUEUE_2 = "topic_queue_2"; public static final String TOPIC_QUEUE_3 = "topic_queue_3"; public static final String TOPIC_EXCHANGE = "topic_exchange"; @Bean public Queue topicQueue1() { return new Queue(TOPIC_QUEUE_1, true); } @Bean public Queue topicQueue2() { return new Queue(TOPIC_QUEUE_2, true); } @Bean public Queue topicQueue3() { return new Queue(TOPIC_QUEUE_3, true); } @Bean public TopicExchange topicExchange() { return new TopicExchange(TOPIC_EXCHANGE); } @Bean public Binding topicBinding1() 
{ return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("user.add"); } @Bean public Binding topicBinding2() { return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("user.#"); } @Bean public Binding topicBinding3() { return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("test.#"); } }2.4 消息生产者AmqpSender.javaimport org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 消息生产者 */ @Component public class AmqpSender { @Autowired private AmqpTemplate amqpTemplate; /** * 简单模式发送 * * @param message */ public void simpleSend(String message) { this.amqpTemplate.convertAndSend(AmqpConfirguration.SIMPLE_QUEUE, message); } /** * 发布/订阅模式发送 * * @param message */ public void psSend(String message) { this.amqpTemplate.convertAndSend(AmqpConfirguration.FANOUT_EXCHANGE, "", message); } /** * 路由模式发送 * * @param message */ public void routingSend(String routingKey, String message) { this.amqpTemplate.convertAndSend(AmqpConfirguration.DIRECT_EXCHANGE, routingKey, message); } /** * 主题模式发送 * * @param routingKey * @param message */ public void topicSend(String routingKey, String message) { this.amqpTemplate.convertAndSend(AmqpConfirguration.TOPIC_EXCHANGE, routingKey, message); } }2.5 消息消费者AmqpReceiver.javaimport org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 消息消费者 */ @Component public class AmqpReceiver { //使@RabbitListener 注解监听消息。 /** * 简单模式接收 * * @param message */ @RabbitListener(queues = AmqpConfirguration.SIMPLE_QUEUE) public void simpleReceive1(String message) { System.out.println("接收消息1:" + message); } @RabbitListener(queues = AmqpConfirguration.SIMPLE_QUEUE) public void simpleReceive2(String message) { System.out.println("接收消息2:" + message); } /** * 发布/订阅模式接收 * * @param message */ @RabbitListener(queues = AmqpConfirguration.PS_QUEUE_1) public void psReceive1(String message) { 
System.out.println(AmqpConfirguration.PS_QUEUE_1 + "接收消息:" + message); } @RabbitListener(queues = AmqpConfirguration.PS_QUEUE_2) public void psReceive2(String message) { System.out.println(AmqpConfirguration.PS_QUEUE_2 + "接收消息:" + message); } /** * 路由模式接收 * * @param message */ @RabbitListener(queues = AmqpConfirguration.ROUTING_QUEUE_1) public void routingReceive1(String message) { System.out.println(AmqpConfirguration.ROUTING_QUEUE_1 + "接收消息:" + message); } @RabbitListener(queues = AmqpConfirguration.ROUTING_QUEUE_2) public void routingReceive2(String message) { System.out.println(AmqpConfirguration.ROUTING_QUEUE_2 + "接收消息:" + message); } /** * 主题模式接收 * * @param message */ @RabbitListener(queues = AmqpConfirguration.TOPIC_QUEUE_1) public void topicReceive1(String message) { System.out.println(AmqpConfirguration.TOPIC_QUEUE_1 + "接收消息:" + message); } @RabbitListener(queues = AmqpConfirguration.TOPIC_QUEUE_2) public void topicReceive2(String message) { System.out.println(AmqpConfirguration.TOPIC_QUEUE_2 + "接收消息:" + message); } @RabbitListener(queues = AmqpConfirguration.TOPIC_QUEUE_3) public void topicReceive3(String message) { System.out.println(AmqpConfirguration.TOPIC_QUEUE_3 + "接收消息:" + message); } }2.6 测试类@RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqApplicationTests { @Autowired private AmqpSender sender; @Test public void testSimpleSend() { //简单模式发送 for (int i = 1; i < 6; i++) { this.sender.simpleSend("test simpleSend " + i); } } @Test public void testPsSend() { //发布/订阅模式发送 for (int i = 1; i < 6; i++) { this.sender.psSend("test psSend " + i); } } @Test public void testRoutingSend() { //路由模式发送 for (int i = 1; i < 6; i++) { this.sender.routingSend("order", "test routingSend " + i); } } @Test public void testTopicSend() { //主题模式发送 for (int i = 1; i < 6; i++) { this.sender.topicSend("user.add", "test topicSend " + i); } } } 三、实战演练 3.1 简单模式与工作队列模式一个消息消费者:结果:两个消息消费者:结果:由两个消息消费者平均消费消息3.2 发布/订阅模式两个消息消费者:结果:所有消息消费者都能收到所有信息 
3.3 路由模式:在 AmqpConfirguration 中定义了 “user” 和 “order” 两个 routingKey,并在交换机上分别以 “user” 与 “order” 绑定了两个队列。两个消息消费者: 测试:routingKey 为 “order”。可以看到只有 routing_queue_2 收到了消息,因为 routing_queue_2 绑定的 routingKey 为 “order”。3.4 主题模式:在交换机上绑定了三个队列,routingKey 分别为 “user.add”、“user.#”、“test.#”。3 个消息消费者。测试:routingKey 为 “user.add”,可以发现 topic_queue_1 和 topic_queue_2 都接收到了消息,而 topic_queue_3 没有。因为主题模式中 routingKey 里的符号 “#” 匹配零个或多个词,符号 “*” 匹配不多不少恰好一个词,相当于模糊匹配;“test.#” 与 “user.add” 匹配不上,所以 topic_queue_3 收不到消息(注意:按本意 topicBinding3 应绑定 topicQueue3(),而 2.3 配置类的示例代码中写成了 topicQueue1())。四、项目源码 源码下载
2021年12月08日
255 阅读
0 评论
0 点赞