Spring Boot 整合 ActiveMQ

绿林寻猫
2021-12-08 / 0 评论 / 461 阅读 / 正在检测是否收录...

ActiveMQ安装

一、通信模式

1.点对点(queue)

  • 一个消息只能被一个服务接收

  • 消息一旦被消费,就会消失

  • 如果没有被消费,就会一直等待,直到被消费

  • 多个服务监听同一个消费空间,先到先得

 

2.发布/订阅模式(topic)

  • 一个消息可以被多个服务接收

  • 订阅一个主题的消费者,只能消费自它订阅之后发布的消息。

  • 消费端如果在生产端发送消息之后启动,是接收不到消息的,除非生产端对消息进行了持久化(例如广播,只有当时听到的人能听到信息)

 

二、整合

2.1 添加依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

<!-- 如果需要配置连接池,添加如下依赖 -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
</dependency>

2.2添加配置

spring:
  activemq:
    broker-url: tcp://127.0.0.1:61616
    user: admin
    password: admin
    pool:
      enabled: false #表示关闭连接池
      max-connections: 50

此处 spring.activemq.pool.enabled=false,表示关闭连接池。

2.3 编码

配置类:负责创建队列和主题。

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.jms.Queue;
import javax.jms.Topic;

@Configuration
public class JmsConfirguration {

    public static final String QUEUE_NAME = "activemq_queue";

    public static final String TOPIC_NAME = "activemq_topic";

    @Bean
    public Queue queue() {
        return new ActiveMQQueue(QUEUE_NAME);
    }

    @Bean
    public Topic topic() {
        return new ActiveMQTopic(TOPIC_NAME);
    }
}

消息生产者:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;

import javax.jms.Queue;
import javax.jms.Topic;

/**
 * 消息生产者
 */
@Component
public class JmsSender {

    @Autowired
    private Queue queue;

    @Autowired
    private Topic topic;

    @Autowired
    private JmsMessagingTemplate jmsTemplate;

    public void sendByQueue(String message) {
        this.jmsTemplate.convertAndSend(queue, message);
    }

    public void sendByTopic(String message) {
        this.jmsTemplate.convertAndSend(topic, message);
    }
}

消息消费者:消息消费者使用 @JmsListener 注解监听消息。

import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.stereotype.Component;

import javax.jms.ConnectionFactory;

/**
 * 消息消费者
 */
@Component
public class JmsReceiver {
    //需要给topic定义独立的JmsListenerContainer
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    }

    @JmsListener(destination = JmsConfirguration.QUEUE_NAME)
    public void receiveByQueue1(String message) {
        System.out.println("接收队列消息1:" + message);
    }

    @JmsListener(destination = JmsConfirguration.QUEUE_NAME)
    public void receiveByQueue2(String message) {
        System.out.println("接收队列消息2:" + message);
    }

    @JmsListener(destination = JmsConfirguration.TOPIC_NAME , containerFactory="jmsListenerContainerTopic")
    public void receiveByTopic1(String message) {
        System.out.println("接收主题消息1:" + message);
    }
    @JmsListener(destination = JmsConfirguration.TOPIC_NAME, containerFactory="jmsListenerContainerTopic")
    public void receiveByTopic2(String message) {
        System.out.println("接收主题消息2:" + message);
    }
}

2.4测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqApplicationTests {

    @Autowired
    private JmsSender sender;

    @Test
    public void testSendByQueue() {
        for (int i = 0; i < 10; i++) {
            this.sender.sendByQueue("发送队列消息: " + i);
        }
    }

    @Test
    public void testSendByTopic() {
        for (int i = 0; i < 10; i++) {
            this.sender.sendByTopic("发送主题消息: " + i);
        }
    }

}

2.4.1 queue测试

首先注释  receiveByQueue2,运行结果如下:

查看是否消费了

放开注释  receiveByQueue2,运行结果如下:

一个生产者,两个消费者的情况如下,谁先拿到谁先使用。

 

2.4.2 topic测试

首先注释 receiveByTopic2,运行结果如下:

接着我们把receiveByTopic2注释取消掉,运行结果如下:

可以看到订阅一个主题的多个消费者,都能收到订阅后发布的消息

 

三、一些简单常用的应用场景

3.1发送邮件

  最经典的就是当用户注册时,我们就需要用activeMQ来做为中间件,当用户注册后,我门把用户的邮箱号和验证码等信息通过activeMQ的生产端发送到activeMQ的消息队列中,而一旦消息队列中出现了数据,我们的邮件模块通过实时的监控activeMQ的消息队列就能通过消费端获取到这个数据,染回邮件模块就会自行的去对数据进行解析,给用户发送邮件

3.2发送短信

  原理同发送邮件相同

3.3同步索引库

  为了缓解数据库的压力,我们把经常被调用的数据放入索引库中,当有请求查询时,我们会先去查询索引库,如果索引库内有数据,那么我们就不用就数据库进行查询,这样就能大大的减轻服务器的压力,可是随之而来的一个问题是,假如我们服务器内的数据已经发生了改变,而浏览用户查询数据时,因为索引库中已经有数据了,那么这样一来数据库与索引库的数据就不一致了,那么怎么解决这个问题呢?我们想到了通过用activeMQ来监听数据库的操作来实现数据库与索引库的数据同步,当后台管理员或房产经纪人对数据库的数据进行了增删改的操作时,我们通过activeMQ监听到了数据的改变,获取到被修改的数据的id,然后在另一个服务模块中通过这个数据的id去数据库先查询一把,然后根据查询结果进行判断,再去做索引库的数据同步。打个比方,如果查询结果返回的是空,就说明商品已经被删除,那么我们就可以根据数据的id去把索引库中的数据也一并删除了。

4.同步静态页面

  此原理同上一个同步索引库是一个原理,目的都是为了减缓服务器的压力,我们经过数据分析发现,其实我们的一些商品详情页面的数据其实都是大同小异的,完全可以通过freemarker页面静态化的模块加上后台查询出的数据拼装成一个静态页面,而这些数据从哪来呢?我们经过讨论和研究,最后一致认为还是放在缓冲中比较好,这样一来就能大大的减轻了数据 库的压力,而另一个好处是,由于页面是纯静态页面,所以页面上的数据都是死数据,这样一来就不用像JSP动态页面那样需要和后台数据库有大量的数据交互,可以最大化的降低服务器的压力,其实这个技术已经有很多大型公司在使用了,比如淘宝,京东,网易等,我们要是细心一些就会发现,他们的页面其实就都是HTML格式的静态页面。

 

 

 

 

 

0

评论 (0)

取消