一、通信模式
1.点对点(queue)
一个消息只能被一个服务接收
消息一旦被消费,就会消失
如果没有被消费,就会一直等待,直到被消费
多个服务监听同一个消费空间,先到先得
2.发布/订阅模式(topic)
一个消息可以被多个服务接收
订阅一个主题的消费者,只能消费自它订阅之后发布的消息。
消费端如果在生产端发送消息之后启动,是接收不到消息的,除非生产端对消息进行了持久化(例如广播,只有当时听到的人能听到信息)
二、整合
2.1 添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <!-- 如果需要配置连接池,添加如下依赖 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency>
2.2添加配置
spring: activemq: broker-url: tcp://127.0.0.1:61616 user: admin password: admin pool: enabled: false #表示关闭连接池 max-connections: 50
此处 spring.activemq.pool.enabled=false,表示关闭连接池。
2.3 编码
配置类:负责创建队列和主题。
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.jms.Queue; import javax.jms.Topic; @Configuration public class JmsConfirguration { public static final String QUEUE_NAME = "activemq_queue"; public static final String TOPIC_NAME = "activemq_topic"; @Bean public Queue queue() { return new ActiveMQQueue(QUEUE_NAME); } @Bean public Topic topic() { return new ActiveMQTopic(TOPIC_NAME); } }
消息生产者:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.stereotype.Component; import javax.jms.Queue; import javax.jms.Topic; /** * 消息生产者 */ @Component public class JmsSender { @Autowired private Queue queue; @Autowired private Topic topic; @Autowired private JmsMessagingTemplate jmsTemplate; public void sendByQueue(String message) { this.jmsTemplate.convertAndSend(queue, message); } public void sendByTopic(String message) { this.jmsTemplate.convertAndSend(topic, message); } }
消息消费者:消息消费者使用 @JmsListener 注解监听消息。
import org.springframework.context.annotation.Bean; import org.springframework.jms.annotation.JmsListener; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; import org.springframework.stereotype.Component; import javax.jms.ConnectionFactory; /** * 消息消费者 */ @Component public class JmsReceiver { //需要给topic定义独立的JmsListenerContainer @Bean public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) { DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory(); bean.setPubSubDomain(true); bean.setConnectionFactory(activeMQConnectionFactory); return bean; } @JmsListener(destination = JmsConfirguration.QUEUE_NAME) public void receiveByQueue1(String message) { System.out.println("接收队列消息1:" + message); } @JmsListener(destination = JmsConfirguration.QUEUE_NAME) public void receiveByQueue2(String message) { System.out.println("接收队列消息2:" + message); } @JmsListener(destination = JmsConfirguration.TOPIC_NAME , containerFactory="jmsListenerContainerTopic") public void receiveByTopic1(String message) { System.out.println("接收主题消息1:" + message); } @JmsListener(destination = JmsConfirguration.TOPIC_NAME, containerFactory="jmsListenerContainerTopic") public void receiveByTopic2(String message) { System.out.println("接收主题消息2:" + message); } }
2.4测试
@RunWith(SpringRunner.class) @SpringBootTest public class ActivemqApplicationTests { @Autowired private JmsSender sender; @Test public void testSendByQueue() { for (int i = 0; i < 10; i++) { this.sender.sendByQueue("发送队列消息: " + i); } } @Test public void testSendByTopic() { for (int i = 0; i < 10; i++) { this.sender.sendByTopic("发送主题消息: " + i); } } }
2.4.1 queue测试
首先注释 receiveByQueue2,运行结果如下:
查看是否消费了
放开注释 receiveByQueue2,运行结果如下:
一个生产者,两个消费者的情况如下,谁先拿到谁先使用。
2.4.2 topic测试
首先注释 receiveByTopic2,运行结果如下:
接着我们把receiveByTopic2注释取消掉,运行结果如下:
可以看到订阅一个主题的多个消费者,都能收到订阅后发布的消息
三、一些简单常用的应用场景
3.1发送邮件
最经典的就是当用户注册时,我们就需要用activeMQ来做为中间件,当用户注册后,我门把用户的邮箱号和验证码等信息通过activeMQ的生产端发送到activeMQ的消息队列中,而一旦消息队列中出现了数据,我们的邮件模块通过实时的监控activeMQ的消息队列就能通过消费端获取到这个数据,染回邮件模块就会自行的去对数据进行解析,给用户发送邮件
3.2发送短信
原理同发送邮件相同
3.3同步索引库
为了缓解数据库的压力,我们把经常被调用的数据放入索引库中,当有请求查询时,我们会先去查询索引库,如果索引库内有数据,那么我们就不用就数据库进行查询,这样就能大大的减轻服务器的压力,可是随之而来的一个问题是,假如我们服务器内的数据已经发生了改变,而浏览用户查询数据时,因为索引库中已经有数据了,那么这样一来数据库与索引库的数据就不一致了,那么怎么解决这个问题呢?我们想到了通过用activeMQ来监听数据库的操作来实现数据库与索引库的数据同步,当后台管理员或房产经纪人对数据库的数据进行了增删改的操作时,我们通过activeMQ监听到了数据的改变,获取到被修改的数据的id,然后在另一个服务模块中通过这个数据的id去数据库先查询一把,然后根据查询结果进行判断,再去做索引库的数据同步。打个比方,如果查询结果返回的是空,就说明商品已经被删除,那么我们就可以根据数据的id去把索引库中的数据也一并删除了。
4.同步静态页面
此原理同上一个同步索引库是一个原理,目的都是为了减缓服务器的压力,我们经过数据分析发现,其实我们的一些商品详情页面的数据其实都是大同小异的,完全可以通过freemarker页面静态化的模块加上后台查询出的数据拼装成一个静态页面,而这些数据从哪来呢?我们经过讨论和研究,最后一致认为还是放在缓冲中比较好,这样一来就能大大的减轻了数据 库的压力,而另一个好处是,由于页面是纯静态页面,所以页面上的数据都是死数据,这样一来就不用像JSP动态页面那样需要和后台数据库有大量的数据交互,可以最大化的降低服务器的压力,其实这个技术已经有很多大型公司在使用了,比如淘宝,京东,网易等,我们要是细心一些就会发现,他们的页面其实就都是HTML格式的静态页面。
评论 (0)