概述 基于 Spring 支持的客户端编程, 包括发送方客户端 接收方客户端 发送方客户端代码 :jms-producer 接收方客户端代码 :jms-consumer 发送方客户端 这里基于 demo 进行说明 这个 demo 将往 example.queue 和 example.topic 各发一条信息 文件目录结构 1. src/main/resources/ 2. ---- jndi.properties 3. ---- spring-beans.xml 4. src/main/java/ 5. ---- cn.sinobest.asj.producer.springsupport.simple 6. ---- SimpleProducer.java # 发送测试类 7. pom.xml 文件内容 1.jndi.properties 1. java.naming.factory.initial=org.apache.activemq.jndi.activemqinitialcontextfactor y 2. 3. # use the following property to configure the default connector 4. java.naming.provider.url=tcp://localhost:61616 5. 6. # register some queues in JNDI using the form 7. # queue.[jndiname] = [physicalname] 8. queue.examplequeue=example.queue 9. 10. # register some topics in JNDI using the form 11. # topic.[jndiname] = [physicalname] 12. topic.exampletopic=example.topic 2.spring-beans.xml 1. <?xml version="1.0" encoding="utf-8"?> 2. <beans xmlns="http://www.springframework.org/schema/beans" 3. xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" 4. xmlns:context="http://www.springframework.org/schema/context" 5. xsi:schemalocation="http://www.springframework.org/schema/beans 6. http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
7. http://www.springframework.org/schema/context 8. http://www.springframework.org/schema/context/spring-context-4.2.xsd"> 9. <!-- import outer properties file --> 10. <context:property-placeholder location="classpath:jndi.properties" /> 11. 12. <!-- create pooled connection factory --> 13. <bean id="pooledconnectionfactory" class="org.apache.activemq.pool.pooledconnect ionfactory"> 14. <property name="connectionfactory"> 15. <bean class="org.apache.activemq.activemqconnectionfactory"> 16. <property name="brokerurl" value="${java.naming.provider.url}" /> 17. <property name="closetimeout" value="60000" /> 18. </bean> 19. </property> 20. </bean> 21. 22. <!-- create queue destination --> 23. <bean id="examplequeue" class="org.apache.activemq.command.activemqqueue"> 24. <constructor-arg value="${queue.examplequeue}" /> 25. </bean> 26. 27. <!-- create topic destination --> 28. <bean id="exampletopic" class="org.apache.activemq.command.activemqtopic"> 29. <constructor-arg value="${topic.exampletopic}" /> 30. </bean> 31. 32. <!-- create template for send message --> 33. <bean id="jmstemplate" class="org.springframework.jms.core.jmstemplate"> 34. <!-- bind the connection factory --> 35. <property name="connectionfactory" ref="pooledconnectionfactory" /> 36. <!-- bind the default destination, but you can also appoint other destina tion when send --> 37. <property name="defaultdestination" ref="examplequeue" /> 38. </bean> 39. </beans> 说明 : JmsTemplate 提供了多个重载的 send 方法, 用于发送消息, 但是它依赖于 ConnectionFactory 资 源 在声明 ConnectionFactory 资源时, 我们使用 PooledConnectionFactory 包装 ActiveMQConnectionFactory [1], 这是为了有效的利用资源, 就像 JDBC 中的连接池 此外, 还声明了两个 Destination:exampleQueue 和 exampletopic, 并将 examplequeue 作 为 jmstemplate 的 defaultdestination 另一个 exampletopic, 将在 SimpleProducer.java 中使用 这里使用 ${java.naming.provider.url} 的方式引用的属性值, 都来自 jndi.properties 注 : 1. 在我参考的系列文章中, 还有使用 org.springframework.jms.connection.singleconnectionfactory 来包 装 ActiveMQConnectionFactory 的 另外, 我觉得直接使用 ActiveMQConnectionFactory 也可以 3.SimpleProducer.java
1. package cn.sinobest.asj.producer.springsupport.simple; 2. 3. import javax.annotation.resource; 4. import javax.jms.destination; 5. import javax.jms.jmsexception; 6. import javax.jms.message; 7. import javax.jms.session; 8. import javax.jms.textmessage; 9. 10. import org.junit.test; 11. import org.junit.runner.runwith; 12. import org.springframework.jms.core.jmstemplate; 13. import org.springframework.jms.core.messagecreator; 14. import org.springframework.test.context.contextconfiguration; 15. import org.springframework.test.context.junit4.springjunit4classrunner; 16. /** 17. * 单元测试类.<br> 18. * 基于 Spring 运行环境的测试. 19. * @author lijinlong 20. * 21. */ 22. @RunWith(SpringJUnit4ClassRunner.class) // 配置 spring 组件运行时 23. @ContextConfiguration("/spring-beans.xml") // 配置文件 24. public class SimpleProducer { 25. @Resource(name="exampleTopic") 26. private Destination exampletopic; 27. 28. @Resource(name="jmsTemplate") 29. private JmsTemplate jmstemplate; 30. 31. @Test 32. public void test() { 33. // 发送到默认的 destination - queue 34. jmstemplate.send(new MessageCreator() { 35. 36. public Message createmessage(session session) throws JMSException { 37. TextMessage msg = session.createtextmessage(); 38. msg.settext("this message is plan to the queue."); 39. return msg; 40. } 41. }); 42. 43. // 发送到指定的 destination - topic 44. jmstemplate.send(exampletopic, new MessageCreator() { 45. 46. public Message createmessage(session session) throws JMSException { 47. TextMessage msg = session.createtextmessage(); 48. msg.settext("this message is plan to the topic."); 49. return msg; 50. } 51. }); 52. } 53. } 说明 :
使用 RunWith 注解, 为单元测试类指定 Spring 运行环境, 就可以在单元测试类中接受依赖的注 入 使用 ContextConfiguration 注解, 进一步指定 spring beans 的配置文件 test 方法是测试方法, 它使用不同的重载的 send 方法, 分别向默认的 Destination exampletopic destination 发送消息 MessageCreator 是一个消息创建器, 在消息的接收方式和消息的筛选一 文,SendTemplate.java 提供的抽象方法 createmessage(session) 与之是相似的思路 JmsTemplate 的 send 方法 : send(string destinationname, MessageCreator messagecreator):void 将 messagecreator 创建的消息, 发送到 destinationname 指定的目的地 destinationname: 目的地的名称, 具体按照 queue 还是 topic 解析, 需要另外的参数指定 [1] ; 默认情况应该是按照 queue 来解析的 messagecreator: 回调接口, 提供回调方法, 基于 Session 创建消息 send(destination destination, MessageCreator messagecreator):void 将 messagecreator 创建的消息, 发送到指定的 Destination send(messagecreator messagecreator):void 将 messagecreator 创建的消息, 发送到默认的 Destination 注 : 1. 应该是在声明 JmsTemplate bean 时, 指定 pubsubdomain 属性 :false - Queues;true - Topics 4.pom.xml 这里仅记录添加的依赖, 完整的内容可以参考 pom.xml 1. <!-- import org.apache.activemq.pool.pooledconnectionfactory using in spring-bean s.xml --> 2. <dependency> 3. <groupid>org.apache.activemq</groupid> 4. <artifactid>activemq-pool</artifactid> 5. <version>5.13.2</version> 6. </dependency> 7. <!-- import for unit test --> 8. <dependency> 9. <groupid>junit</groupid> 10. <artifactid>junit</artifactid> 11. <version>4.12</version> 12. </dependency> 13. <!-- import spring support --> 14. <dependency> 15. <groupid>org.springframework</groupid> 16. <artifactid>spring-context</artifactid> 17. <version>4.2.4.release</version> 18. </dependency> 19. <dependency> 20. <groupid>org.springframework</groupid> 21. <artifactid>spring-jms</artifactid>
22. <version>4.2.4.release</version> 23. </dependency> 24. <dependency> 25. <groupid>org.springframework</groupid> 26. <artifactid>spring-test</artifactid> 27. <version>4.2.4.release</version> 28. </dependency> 接收方客户端 这里基于 demo 进行说明 这个 demo 将接收 example.queue 和 example.topic 的消息 文件目录结构 1. src/main/resources/ 2. ---- jndi.properties 3. ---- spring-beans.xml 4. src/main/java/ 5. ---- cn.sinobest.asj.consumer.util 6. ---- Hold.java # 提供 pause 功能 7. ---- cn.sinobest.asj.consumer.springsupport.simple 8. ---- SimpleConsumer.java # 单元测试类 9. pom.xml 文件内容 1.jndi.properties 1. java.naming.factory.initial=org.apache.activemq.jndi.activemqinitialcontextfactor y 2. 3. # use the following property to configure the default connector 4. java.naming.provider.url=tcp://localhost:61616 5. 6. # register some queues in JNDI using the form 7. # queue.[jndiname] = [physicalname] 8. queue.examplequeue=example.queue 9. 10. # register some topics in JNDI using the form 11. # topic.[jndiname] = [physicalname] 12. topic.exampletopic=example.topic 2.spring-beans.xml 1. <?xml version="1.0" encoding="utf-8"?> 2. <beans xmlns="http://www.springframework.org/schema/beans" 3. xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" 4. xmlns:context="http://www.springframework.org/schema/context" 5. xmlns:jms="http://www.springframework.org/schema/jms"
6. xsi:schemalocation="http://www.springframework.org/schema/beans 7. http://www.springframework.org/schema/beans/spring-beans-4.2.xsd 8. http://www.springframework.org/schema/context 9. http://www.springframework.org/schema/context/spring-context-4.2.xsd 10. http://www.springframework.org/schema/jms 11. http://www.springframework.org/schema/jms/spring-jms-4.2.xsd"> 12. <!-- import outer properties file --> 13. <context:property-placeholder location="classpath:jndi.properties" /> 14. 15. <!-- create pooled connection factory --> 16. <bean id="pooledconnectionfactory" class="org.apache.activemq.pool.pooledconnect ionfactory"> 17. <property name="connectionfactory"> 18. <bean class="org.apache.activemq.activemqconnectionfactory"> 19. <property name="brokerurl" value="${java.naming.provider.url}" /> 20. <property name="closetimeout" value="60000" /> 21. </bean> 22. </property> 23. </bean> 24. 25. <bean id="simpleconsumer" class="cn.sinobest.asj.consumer.springsupport.simple.s impleconsumer" /> 26. 27. <!-- config listener container for queues --> 28. <jms:listener-container connection-factory="pooledconnectionfactory"> 29. <jms:listener destination="${queue.examplequeue}" ref="simpleconsumer" 30. method="onmessagefromqueue" /> 31. </jms:listener-container> 32. 33. <!-- config listener container for topics --> 34. <jms:listener-container connection-factory="pooledconnectionfactory" 35. destination-type="topic"> 36. <jms:listener destination="${topic.exampletopic}" ref="simpleconsumer" 37. method="onmessagefromtopic" /> 38. </jms:listener-container> 39. </beans> 说明 : 这里声明了两个 listener-container, 这是我们分析配置的起点 listener-container 用于管理 listener:listener-container 的 connection-factory 依赖于 ConnectionFactory 实例,destination-type 标识 listener 监听的 destination 的类型, 默认为 queue listener 配置监听器 :destination 配置监听的目的地,ref method 分别配置消息处理 bean 和方法 监听器收到消息之后, 会调用对应的 bean 方法 [1] simpleconsumer 作为消息处理 bean, 对于这个类本身没有特殊的要求 这里使用 ${java.naming.provider.url} 的方式引用的属性值, 都来自 jndi.properties 注 : 1. 要求配置的方法的参数只有一个, 且类型和 Message 中封装的数据类型兼容 比如 Message 是 一个 TextMessage, 那么 String 类型足以满足 ; 如果 Message 是一个 ObjectMessage, 且封装的 Object 是字符串数组类型, 那就要提供一个可以兼容字符串数组的参数类型 提供重载的方 法,Spring 会在类型兼容的前提下, 选择一个最接近的方法调用
基于 bean 的配置 这是另外一种异步接收的配置方式 : 1. <bean id="messagecontainer" 2. class="org.springframework.jms.listener.defaultmessagelistenercontainer"> 3. <property name="connectionfactory" ref="connectionfactory" /> 4. <property name="destination" ref="destination" /> 5. <property name="messagelistener" ref="messagelistener" /> 6. </bean> 两种配置的比较 基于 listener-container 的配置 一个 container 可以配置多个 listener 配置消息处理类来接收消息, 且对这个类没 有类型的要求 对 Message 进行了预处理 : 根据接收到的 Message 的类型进行解封, 根据解封的类型选 择一个兼容的处理方法调用 基于 bean 的配置 一个 container 只能配置一个 listener 用 messagelistener 属性配置消息监听器, 要 求实现 javax.jms.messagelistener 接口 没有对 Message 进行预处理 同步接收方式 JmsTemplate 同样提供了一系列 receive 方法来接受消息 参考前文发送方客户端发送方客户端中配置 JmsTemplate 的方式, 然后调用 receive 方法进行同步接收 receive 系列方法, 只是单纯的接收, 返回的结果是 javax.jms.message 类型 receiveselected 系列方法, 在 receive 系列方法的基础上加入了选择器 receiveandconvert 系列方法, 接收并转换, 返回的是 Object 类型 ( 实际类型是 Message 内部 封装的类型 ) receiveselectedandconvert 系列方法, 在 receiveandconvert 系列方法的基础上加入了选择 器 3.Hold.java 1. package cn.sinobest.asj.consumer.util; 2. /** 3. * 防止程序立即退出.<br> 4. * @author lijinlong 5. * 6. */ 7. public class Hold { 8. /** 9. * 防止程序立即退出, 直到用户输入 Enter 键. 10. */ 11. public static void hold() { 12. @SuppressWarnings("resource")
13. java.util.scanner scanner = new java.util.scanner(system.in); 14. System.out.println(" 按 Enter 键退出 :"); 15. scanner.nextline(); 16. } 17. } 基于异步的接收方式, 需要防止主线程执行完毕后退出 4.SimpleConsumer.java 1. package cn.sinobest.asj.consumer.springsupport.simple; 2. 3. import org.junit.test; 4. import org.junit.runner.runwith; 5. import org.springframework.test.context.contextconfiguration; 6. import org.springframework.test.context.junit4.springjunit4classrunner; 7. 8. import cn.sinobest.asj.consumer.util.hold; 9. 10. @RunWith(SpringJUnit4ClassRunner.class) 11. @ContextConfiguration("/spring-beans.xml") 12. public class SimpleConsumer { 13. @Test 14. public void test() { 15. Hold.hold(); 16. } 17. 18. public void onmessagefromqueue(string message) { 19. System.out.println(" 从 queue 收到消息 :" + message); 20. } 21. 22. public void onmessagefromtopic(string message) { 23. System.out.println(" 从 Topic 收到消息 :" + message); 24. } 25. } 5.pom.xml 这里仅记录添加的依赖, 完整的内容可以参考 pom.xml 1. <!-- import org.apache.activemq.pool.pooledconnectionfactory using in spring-bean s.xml --> 2. <dependency> 3. <groupid>org.apache.activemq</groupid> 4. <artifactid>activemq-pool</artifactid> 5. <version>5.13.2</version> 6. </dependency> 7. <!-- import for unit test --> 8. <dependency> 9. <groupid>junit</groupid> 10. <artifactid>junit</artifactid> 11. <version>4.12</version> 12. </dependency> 13. <!-- import spring support -->
14. <dependency> 15. <groupid>org.springframework</groupid> 16. <artifactid>spring-context</artifactid> 17. <version>4.2.4.release</version> 18. </dependency> 19. <dependency> 20. <groupid>org.springframework</groupid> 21. <artifactid>spring-jms</artifactid> 22. <version>4.2.4.release</version> 23. </dependency> 24. <dependency> 25. <groupid>org.springframework</groupid> 26. <artifactid>spring-test</artifactid> 27. <version>4.2.4.release</version> 28. </dependency> 测试 1. 启动 ActiveMQ 2. 以 JUnit 的方式运行 SimpleConsumer.java 3. 以 JUnit 的方式运行 SimpleProducer.java 4. 在 SimpleConsumer 的控制台可以看到接收到的消息 附录 参考文章 1. Apache-ActiveMQ 整合 Spring 使用 JmsTemplate 发送消息, 基于 bean 的消息接收配置,JUnit 集成 Spring 的单元测试 2. ActiveMQ 使用示例 基于 listener-container 的消息接收配置, 使用 PooledConnectionFactory 配置连接池 3. Spring Support
ActiveMQ 官方文档