⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/ActiveMQ/yuliu/doc/ 「芋道源码」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

1. 简介

ActiveMQ 特点

ActiveMQ 是由 Apache 出品的一款开源消息中间件,旨在为应用程序提供高效、可扩展、稳定、安全的企业级消息通信。 它的设计目标是提供标准的、面向消息的、多语言的应用集成消息通信中间件。ActiveMQ 实现了 JMS 1.1 并提供了很多附加的特性,比如 JMX 管理、主从管理、消息组通信、消息优先级、延迟接收消息、虚拟接收者、消息持久化、消息队列监控等等。其主要特性有:

  1. 支持包括 Java、C、C++、C#、Ruby、Perl、Python、PHP 等多种语言的客户端和协议。协议包含 OpenWire、Stomp、AMQP、MQTT 。
  2. 提供了像消息组通信、消息优先级、延迟接收消息、虚拟接收者、消息持久化之类的高级特性
  3. 完全支持 JMS 1.1 和 J2EE 1.4规范(包括持久化、分布式事务消息、事务)
  4. 对 Spring 框架的支持,ActiveMQ 可以通过 Spring 的配置文件方式很容易嵌入到 Spring 应用中
  5. 通过了常见的 J2EE 服务器测试,比如 TomEE、Geronimo、JBoss、GlassFish、WebLogic
  6. 连接方式的多样化,ActiveMQ 提供了多种连接模式,例如 in-VM、TCP、SSL、NIO、UDP、多播、JGroups、JXTA
  7. 支持通过使用 JDBC 和 journal 实现消息的快速持久化
  8. 为高性能集群、客户端-服务器、点对点通信等场景而设计
  9. 提供了技术和语言中立的 REST API 接口
  10. 支持 Ajax 方式调用 ActiveMQ
  11. ActiveMQ 可以轻松地与 CXF、Axis 等 Web Service 技术整合,以提供可靠的消息传递
  12. 可用作为内存中的 JMS 提供者,非常适合 JMS 单元测试

基本概念

因为 ActiveMQ 是完整支持 JMS 1.1 的,所以从 Java 使用者的角度其基本概念与 JMS 1.1 规范是一致的。

消息传送模型
  1. 点对点模型(Point to Point) 使用队列(Queue)作为消息通信载体,满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。
  2. 发布订阅模型(Pub/Sub) 使用主题作为消息通信载体,类似于广播模式,发布者发布一条消息,该消息通过主题传递给所有的订阅者,在一条消息广播之后才订阅的用户则是收不到该条消息的。
基本组件

ActiveMQ 使用时包含的基本组件各与 JMS 是相同的:

  1. Broker,消息代理,表示消息队列服务器实体,接受客户端连接,提供消息通信的核心服务。
  2. Producer,消息生产者,业务的发起方,负责生产消息并传输给 Broker 。
  3. Consumer,消息消费者,业务的处理方,负责从 Broker 获取消息并进行业务逻辑处理。
  4. Topic,主题,发布订阅模式下的消息统一汇集地,不同生产者向 Topic 发送消息,由 Broker 分发到不同的订阅者,实现消息的广播。
  5. Queue,队列,点对点模式下特定生产者向特定队列发送消息,消费者订阅特定队列接收消息并进行业务逻辑处理。
  6. Message,消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务 数据,实现消息的传输。

由于这些概念在 JMS 中已介绍过,这里不再详细介绍。

连接器

ActiveMQ Broker 的主要作用是为客户端应用提供一种通信机制,为此 ActiveMQ 提供了一种连接机制,并用连接器(connector)来描述这种连接机制。ActiveMQ 中连接器有两种,一种是用于客户端与消息代理服务器(client-to-broker)之间通信的传输连接器(transport connector),一种是用于消息代理服务器之间(broker-to-broker)通信的网络连接器(network connector)。connector 使用 URI(统一资源定位符)来表示,URI 格式为: :[?][#] schema name 表示协议, 例如:foo://username:password@example.com:8042/over/there/index.dtb?type=animal&name=narwhal#nose

其中 schema name 部分是 foo,hierarchical part 是 username:password@example.com:8042/over/there/index.dtb,query 是 type=animal&name=narwhal,fragment 是 nose。

  1. 传输连接器 为了交换消息,消息生产者和消息消费者(统称为客户端)都需要连接到消息代理服务器,这种客户端和消息代理服务器之间的通信就是通过传输连接器(Transport connectors)完成的。很多情况下用户连接消息代理时的需求侧重点不同,有的更关注性能,有的更注重安全性,因此 ActiveMQ 提供了一系列l连接协议供选择,来覆盖这些使用场景。从消息代理的角度看,传输连接器就是用来处理和监听客户端连接的,查看 ActiveMQ demo 的配置文件(/examples/conf/activemq-demo.xml),传输连接的相关配置如下:

<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
<transportConnector name="ssl" uri="ssl://localhost:61617"/>
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
<transportConnector name="ws" uri="ws://localhost:61614/" />
</transportConnectors>

传输连接器定义在元素中,一个元素定义一个特定的连接器,一个连接器必须有自己唯一的名字和 URI 属性,但discoveryUri属性是可选的。目前在 ActiveMQ 最新的5.15版本中常用的传输连接器连接协议有:vm、tcp、udp、multicast、nio、ssl、http、https、websocket、amqp、mqtt、stomp 等等

  • vm,允许客户端和消息服务器直接在 VM 内部通信,采用的连接不是 Socket 连接,而是直接的虚拟机本地方法调用,从而避免网络传输的开销。应用场景仅限于服务器和客户端在同一 JVM 中。
  • tcp,客户端通过 TCP 连接到远程的消息服务器。
  • udp,客户端通过 UDP 连接到远程的消息服务器。
  • multicast,允许使用组播传输的方式连接到消息服务器。
  • nio,nio 和 tcp 的作用是一样的,只不过 nio 使用了 java 的 NIO包,这可能在某些场景下可提供更好的性能。
  • ssl,ssl 允许用户在 TCP 的基础上使用 SSL 。
  • http 和 https,允许客户端使用 REST 或 Ajax 的方式进行连接,这意味着可以直接使用 Javascript 向 ActiveMQ 发送消息。
  • websocket,允许客户端通过 HTML5 中的 WebSocket 方式连接到消息服务器。
  • amqp,5.8版本开始支持。
  • mqtt、stomp,5.6版本开始支持。

每个协议的具体配置见官网(http://activemq.apache.org/uri-protocols.html )。除了以上这些基本协议之外 ActiveMQ 还支持一些高级协议也可以通过 URI 的方式进行配置,比如 Failover 和 Fanout 。

  • Failover 是一种重新连接的机制,工作于上面介绍的连接协议的上层,用于建立可靠的传输。其配置语法允许制定任意多个复合的 URI ,它会自动选择其中的一个 URI 来尝试建立连接,如果该连接没有成功,则会继续选择其它的 URI 来尝试。配置语法例如:failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100
  • Fanout 是一种重新连接和复制的机制,它也工作于其它连接的上层,采用复制的方式把消息复制到多个消息服务器。配置语法例如:fanout:(tcp://localhost:61629,tcp://localhost:61639,tcp://localhost:61649)
  1. 网络连接器 很多情况下,我们要处理的数据可能是海量的,这种场景单台服务器很难支撑,这就要用到集群功能,为此 ActiveMQ 提供了网络连接的模式,简单说就是通过把多个消息服务器实例连接在一起作为一个整体对外提供服务,从而提高整体对外的消息服务能力。通过这种方式连接在一起的服务器实例之间可共享队列和消费者列表,从而达到分布式队列的目的,网络连接器就是用来配置服务器之间的通信。

使用网络连接器的简单场景

如图所示,服务器 S1 和 S2 通过 NewworkConnector 相连,生产者 P1 发送的消息,消费者 C3 和 C4 都可以接收到,而生产者 P3 发送的消息,消费者 C1 和 C2 也可以接收到。要使用网络连接器的功能需要在服务器 S1 的 activemq.xml 中的 broker 节点下添加如下配置(假设192.168.11.23:61617 为 S2 的地址):

<networkConnectors>
<networkConnector uri="static:(tcp://192.168.11.23:61617)"/>
</networkConnectors>

如果只是这样,S1 可以将消息发送到 S2,但这只是单方向的通信,发送到 S2 上的的消息还不能发送到 S1 上。如果想 S1 也收到从 S2 发来的消息需要在 S2 的 activemq.xml 中的 broker 节点下也添加如下配置(假设192.168.11.45:61617为 S1 的地址):

<networkConnectors>
<networkConnector uri="static:(tcp://192.168.11.45:61617)"/>
</networkConnectors>

这样,S1和S2就可以双向通信了。目前在 ActiveMQ 最新的5.15版本中常用的网络连接器协议有 static 和 multicast 两种。

  • static,静态协议,用于为一个网络中多个代理创建静态配置,这种配置协议支持复合的 URI (即包含其他 URI 的 URI)。例如static://(tcp://ip:61616,tcp://ip2:61616)
  • multicast,多点传送协议,消息服务器会广播自己的服务,也会定位其他代理。这种方式用于服务器之间实现动态识别,而不是配置静态的 IP 组。

对这块感兴趣的话可以看官方文档:http://activemq.apache.org/networks-of-brokers.html

消息存储

JMS 规范中消息的分发方式有两种:非持久化和持久化。对于非持久化消息 JMS 实现者须保证尽最大努力分发消息,但消息不会持久化存储;而持久化方式分发的消息则必须进行持久化存储。非持久化消息常用于发送通知或实时数据,当你比较看重系统性能并且即使丢失一些消息并不影响业务正常运作时可选择非持久化消息。持久化消息被发送到消息服务器后如果当前消息的消费者并没有运行则该消息继续存在,只有等到消息被处理并被消息消费者确认之后,消息才会从消息服务器中删除。

对以上这两种方式 ActiveMQ 都支持,并且还支持通过缓存在内存中的中间状态消息的方式来恢复消息。概括起来看 ActiveMQ 的消息存储有三种:存储到内存、存储到文件、存储到数据库。具体使用上 ActiveMQ 提供了一个插件式的消息存储机制,类似于消息的多点传播,主要实现了如下几种:

  • AMQ,是 ActiveMQ 5.0及以前版本默认的消息存储方式,它是一个基于文件的、支持事务的消息存储解决方案。 在此方案下消息本身以日志的形式实现持久化,存放在 Data Log 里。并且还对日志里的消息做了引用索引,方便快速取回消息。
  • KahaDB,也是一种基于文件并具有支持事务的消息存储方式,从5.3开始推荐使用 KahaDB 存储消息,它提供了比 AMQ 消息存储更好的可扩展性和可恢复性。
  • JDBC,基于 JDBC 方式将消息存储在数据库中,将消息存到数据库相对来说比较慢,所以 ActiveMQ 建议结合 journal 来存储,它使用了快速的缓存写入技术,大大提高了性能。
  • 内存存储,是指将所有要持久化的消息放到内存中,因为这里没有动态的缓存,所以需要注意设置消息服务器的 JVM 和内存大小。
  • LevelDB,5.6版本之后推出了 LevelDB 的持久化引擎,它使用了自定义的索引代替常用的 BTree 索引,其持久化性能高于 KahaDB,虽然默认的持久化方式还是 KahaDB,但是 LevelDB 将是趋势。在5.9版本还提供了基于 LevelDB 和 ZooKeeper 的数据复制方式,作为 Master-Slave 方式的首选数据复制方案。

2. 工程实例

Java 访问 ActiveMQ 实例

JMS 规范中传递消息的方式有两种,一种是点对点模型的队列(Queue)方式,另一种是发布订阅模型的主题(Topic)方式。下面看下用 ActiveMQ 以主题方式传递消息的 Java 示例。

引入依赖

Java 工程中需要引入 ActiveMQ 包的依赖,jar 包版本同你安装 ActiveMQ 版本一致即可:

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.2</version>
</dependency>

消息生产者

package org.study.mq.activeMQ;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicPublisher {

/**
* 默认用户名
*/
public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认密码
*/
public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

public static void main(String[] args) {
//创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
try {
//创建连接
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
//创建会话,不需要事务
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建 Topic,用作消费者订阅消息
Topic myTestTopic = session.createTopic("activemq-topic-test1");
//消息生产者
MessageProducer producer = session.createProducer(myTestTopic);

for (int i = 1; i <= 3; i++) {
TextMessage message = session.createTextMessage("发送消息 " + i);
producer.send(myTestTopic, message);
}

//关闭资源
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}

在 Topic 模式中消息生产者是用于发布消息的,绝大部分代码与 Queue 模式中相似,不同的是本例中基于 Session 创建的是主题(Topic),该主题作为消费者消费消息的目的地。

消息消费者

package org.study.mq.activeMQ;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicSubscriber {

/**
* 默认用户名
*/
public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认密码
*/
public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认连接地址
*/
public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

public static void main(String[] args) {
//创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
try {
//创建连接
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
//创建会话,不需要事务
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建 Topic
Topic myTestTopic = session.createTopic("activemq-topic-test1");

MessageConsumer messageConsumer = session.createConsumer(myTestTopic);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println("消费者1 接收到消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});

MessageConsumer messageConsumer2 = session.createConsumer(myTestTopic);
messageConsumer2.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println("消费者2 接收到消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});

MessageConsumer messageConsumer3 = session.createConsumer(myTestTopic);
messageConsumer3.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println("消费者3 接收到消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});

//让主线程休眠100秒,使消息消费者对象能继续存活一段时间从而能监听到消息
Thread.sleep(100 * 1000);
//关闭资源
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}

为了展示主题模式中消息广播给多个订阅者的功能,这里创建了三个消费者对象并订阅了同一个主题,比较特殊的是最后让主线程休眠了一段时间,这么做的目的是让消费者对象能继续存活,从而使控制台能打印出监听到的消息内容。

启动 ActiveMQ 服务器

在 ActiveMQ 的 bin 目录下直接执行activemq start即启动了 ActiveMQ

运行 TopicSubscriber

需要先运行 TopicSubscriber 类的 main 方法,这样发布者发布消息的时候订阅者才能接收到消息,如果将执行顺序倒过来则消息先发布出去但没有任何订阅者在运行,则看不到消息被消费了。

运行 TopicPublisher

接着运行 TopicPublisher 类的 main 方法,向主题中发布3条消息,然后可以在 TopicSubscriber 后台看到接收到的消息内容:

消费者接收到消息

Spring 整合 ActiveMQ

在实际项目中如果使用原生的 ActiveMQ API 开发显然比较啰嗦,这中间创建连接工厂、创建连接之类代码完全可以抽取出来由框架统一做,这些事情 Spring 也想到了并帮我们做了。ActiveMQ 完全支持基于 Spring 的方式 配置 JMS 客户端和服务器,下面的例子展示一下在 Spring 中如何使用队列模式和主题模式传递消息。

引入依赖

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.2</version>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.3.10.RELEASE</version>
</dependency>

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.0</version>
</dependency>

工程中除了 activemq 的包之外还要添加 Spring 支持 JMS 的包。由于 connection、session、producer 的创建会消耗大量系统资源,为此这里使用 连接池 来复用这些资源,所以还要添加 activemq-pool 的依赖。

Spring 配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<context:component-scan base-package="org.study.mq.activeMQ.spring"/>

<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://localhost:61616</value>
</property>
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean>
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="jmsFactory"/>
<property name="sessionCacheSize" value="1"/>
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>

<bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg name="name" value="spring-queue"/>
</bean>
<bean id="testTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-topic"/>
</bean>

<bean id="queueListener" class="org.study.mq.activeMQ.spring.QueueListener"/>
<bean id="topic1Listener" class="org.study.mq.activeMQ.spring.Topic1Listener"/>
<bean id="topic2Listener" class="org.study.mq.activeMQ.spring.Topic2Listener"/>

<bean id="queueContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<property name="destination" ref="testQueue"/>
<property name="messageListener" ref="queueListener"/>
</bean>
<bean id="topic1Container"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<property name="destination" ref="testTopic"/>
<property name="messageListener" ref="topic1Listener"/>
</bean>
<bean id="topic2Container"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<property name="destination" ref="testTopic"/>
<property name="messageListener" ref="topic2Listener"/>
</bean>

</beans>

下面的项目示例中的 Java 代码采用注解的方式,这也是现在很多程序员的习惯用法,所以在配置文件一开始定义注解扫描包路径org.study.mq.activeMQ.spring,您可以根据自己实际情况修改包名称,本例中的所有 Java 代码都放在该包之下。

接下来定义了一个 JMS 工厂 bean,采用的是池化连接工厂类org.apache.activemq.pool.PooledConnectionFactory,实际就是对内部的 ActiveMQ 连接工厂增加了连接池的功能,从其内部配置可以看到就是对org.apache.activemq.ActiveMQConnectionFactory的功能封装,而ActiveMQConnectionFactory类则比较熟悉了,就是上面 Java 访问 ActiveMQ 示例一开始创建连接工厂时使用的类。brokerURL 属性配置的就是连接服务器的协议和服务器地址。接下来的 cachingConnectionFactory 是实际项目代码中常用的,对连接工厂的又一层增强,使用连接的缓存功能以提升效率,读者可酌情选择使用。

jmsTemplate 就是 Spring 解决 JMS 访问时冗长重复代码的方案,它需要配置的两个主要属性是 connectionFactory 和 messageConverter,通过 connectionFactory 获取连接、会话等对象, messageConverter 则是配置消息转换器,因为通常消息在发送前和接收后都需要进行一个前置和后置处理,转换器便进行这个工作。这样实际代码直接通过 jmsTemplate 来发送和接收消息,而每次发送接收消息时创建连接工厂、创建连接、创建会话等工作都由 Spring 框架做了。

有了 JMS 模板还需要知道队列和主题作为实际发送和接收消息的目的地,所以接下来定义了 testQueue 和 testTopic 作为两种模式的示例。而异步接收消息时则需要提供 MessageListener 的实现类,所以定义了 queueListener 作为队列模式下异步接收消息的监听器,topic1Listener 和 topic2Listener 作为主题模式下异步接收消息的监听器,主题模式用两个监听器是为了演示多个消费者时都能收到消息。最后的 queueContainer、topic1Container、topic2Container 用于将消息监听器绑定到具体的消息目的地上。

消息服务类

下面是使用 JMS 模板处理消息的消息服务类

package org.study.mq.activeMQ.spring;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.*;

@Service
public class MessageService {

@Resource(name = "jmsTemplate")
private JmsTemplate jmsTemplate;

@Resource(name = "testQueue")
private Destination testQueue;

@Resource(name = "testTopic")
private Destination testTopic;

//向队列发送消息
public void sendQueueMessage(String messageContent) {
jmsTemplate.send(testQueue, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage msg = session.createTextMessage();
// 设置消息内容
msg.setText(messageContent);
return msg;
}
});

}

//向主题发送消息
public void sendTopicMessage(String messageContent) {
jmsTemplate.send(testTopic, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage msg = session.createTextMessage();
// 设置消息内容
msg.setText(messageContent);
return msg;
}
});

}
}

@Service 将该类声明为一个服务,实际项目中很多服务代码也类似。通过 Resource 注解直接将上面配置文件中定义的 jmsTemplate 引入到 MessageService 类中就可以直接使用了,testQueue 和 testTopic 也是类似,服务类中直接引入配置文件中定义好的队列和主题。重点是下面的两个发送消息的方法,sendQueueMessage 向队列发送消息,sendTopicMessage 向主题发送消息,两种模式都使用了 jmsTemplate 的 send 方法,send 方法第1个参数是javax.jms.Destination类型,表示消息目的地。由于javax.jms.Queuejavax.jms.Topic都继承了javax.jms.Destination接口,所以该方法对队列模式和主题模式都适用。send 方法的第2个参数是org.springframework.jms.core.MessageCreator,这里使用了匿名内部类的方式创建对象,从支持的 Session 对象中创建文本消息,这样就可以发送消息了。可以看到无论是队列还是主题,通过 Spring 框架来发送消息的代码比之前的 Java 代码示例简洁了很多。

消息监听器类

package org.study.mq.activeMQ.spring;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class QueueListener implements MessageListener {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) message;
String messageStr = txtMsg.getText();
System.out.println("队列监听器接收到文本消息:" + messageStr);
} catch (JMSException e) {
e.printStackTrace();
}
} else {
throw new IllegalArgumentException("只支持 TextMessage 类型消息!");
}
}
}

队列消息监听器在收到消息时校验是否是文本消息类型,是的话则打印出内容。

package org.study.mq.activeMQ.spring;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class Topic1Listener implements MessageListener {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) message;
String messageStr = txtMsg.getText();
System.out.println("主题监听器1 接收到文本消息:" + messageStr);
} catch (JMSException e) {
e.printStackTrace();
}
} else {
throw new IllegalArgumentException("只支持 TextMessage 类型消息!");
}
}
}

package org.study.mq.activeMQ.spring;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class Topic2Listener implements MessageListener {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) message;
String messageStr = txtMsg.getText();
System.out.println("主题监听器2 接收到文本消息:" + messageStr);
} catch (JMSException e) {
e.printStackTrace();
}
} else {
throw new IllegalArgumentException("只支持 TextMessage 类型消息!");
}
}
}

主题监听器的代码与队列监听器类似,只是打印时通过不同字符串表示当前是不同监听器接收的消息。

启动应用

为了演示例子,写了一个 StartApplication 类,在 main 方法中加载 Spring ,获取到 MessageService 服务之后调用 sendQueueMessage 和 sendTopicMessage 方法发送消息。

package org.study.mq.activeMQ.spring;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class StartApplication {
public static void main(String[] args) {
ApplicationContext ctx = new ClassPathXmlApplicationContext("spring-context.xml");
MessageService messageService = (MessageService) ctx.getBean("messageService");

messageService.sendQueueMessage("我的测试消息1");
messageService.sendTopicMessage("我的测试消息2");
messageService.sendTopicMessage("我的测试消息3");
}

}

启动好 activeMQ 服务之后运行 StartApplication 类,在控制台看到接收到文本消息:

接收到文本消息

队列监听器监听到了一条消息,两个主题监听器分别监听到了两条消息。

文章目录
  1. 1. 1. 简介
    1. 1.0.1. ActiveMQ 特点
    2. 1.0.2. 基本概念
      1. 1.0.2.0.1. 消息传送模型
      2. 1.0.2.0.2. 基本组件
      3. 1.0.2.0.3. 连接器
      4. 1.0.2.0.4. 消息存储
  • 2. 2. 工程实例
    1. 2.0.1. Java 访问 ActiveMQ 实例
      1. 2.0.1.0.1. 引入依赖
      2. 2.0.1.0.2. 消息生产者
      3. 2.0.1.0.3. 消息消费者
      4. 2.0.1.0.4. 启动 ActiveMQ 服务器
      5. 2.0.1.0.5. 运行 TopicSubscriber
      6. 2.0.1.0.6. 运行 TopicPublisher
  • 2.0.2. Spring 整合 ActiveMQ
    1. 2.0.2.0.1. 引入依赖
    2. 2.0.2.0.2. Spring 配置文件
    3. 2.0.2.0.3. 消息服务类
    4. 2.0.2.0.4. 消息监听器类
    5. 2.0.2.0.5. 启动应用