本文最后更新于:2 年前
1 ActiveMQ简介
1.1 ActiveMQ是什么
ActiveMQ是一个消息队列应用服务器(推送服务器)。支持JMS规范。
1.1.1 JMS概述
JMS 全称:Java Message Service ,即为 Java 消息服务,是一套 java 消息服务的 API 接口。实现了 JMS 标准的系统,称之为 JMS Provider。
1.1.2 消息队列
消息队列是在消息的传输过程中保存消息的容器,提供一种不同进程或者同一进程不同线程直接通讯的方式。
- Producer:消息生产者,负责产生和发送消息到 Broker;
- Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个 queue;
- Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理;
常见消息队列应用:
- ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。
- RabbitMQ
RabbitMQ是一个在AMQP基础上完成的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。开发语言为Erlang。
- RocketMQ
由阿里巴巴定义开发的一套消息队列应用服务。
1.2 ActiveMQ能做什么
- 实现两个不同应用(程序)之间的消息通讯。
- 实现同一个应用,不同模块之间的消息通讯。(确保数据发送的稳定性)
1.3 ActiveMQ主要特点
- 支持多语言、多协议客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire, Stomp REST, WS Notification, XMPP, AMQP
- 对Spring的支持,ActiveMQ可以很容易整合到Spring的系统里面去。
- 支持高可用、高性能的集群模式。
2 入门示例
2.1 需求
使用 ActiveMQ 实现消息队列模型。
2.2 配置步骤说明
- 搭建ActiveMQ消息服务器。
- 创建一个java项目。
- 创建消息生产者,发送消息。
- 创建消息消费者,接收消息。
2.3 第一部分:搭建ActiveMQ消息服务器
2.3.1 第一步:下载、上传至Linux,并解压
2.3.2 第二步:启动ActiveMQ服务器
1 2 3 4 5 6 7 8
| [root@localhost bin]# ./activemq start INFO: Loading '/usr/local/activemq//bin/env' INFO: Using java '/usr/local/java/jdk1.8.0_221/bin/java' INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details INFO: pidfile created : '/usr/local/activemq//data/activemq.pid' (pid '3628') [root@localhost bin]# ./activemq status INFO: Loading '/usr/local/activemq//bin/env' INFO: Using java '/usr/local/java/jdk1.8.0_221/bin/java'ActiveMQ is running (pid '3628')
|
2.3.3 第三步:浏览器访问 ActiveMQ 管理界面
2.3.3.1 Step1:查看ActiveMQ管理界面的服务端口。在 /conf/jetty.xml
中
默认端口为: 8161
2.3.3.2 Step2:查看ActiveMQ用户、密码。在 /conf/users.properties
中
默认用户名密码均为 admin
2.3.3.3 Step3:访问ActiveMQ管理控制台。地址:http://ip:8161/
注意:若防火墙没有配置该服务的端口,必须在防火墙中配置。
点击图示选项后,进行登录
然后成功进入管理界面
2.4 第二部分:创建java项目,导入jar包
ActiveMQ 的解压包中,提供了运行 ActiveMQ 需要的 jar 包。
ActiveMQ 是实现了 JMS 规范的。在实现消息服务的时候,必须基于 API 接口规范。
2.4.1 JMS 常用的 API 说明
下述 API 都是接口类型,定义在 javax.jms 包中,是 JMS 标准接口定义。ActiveMQ 完全实现这一套 api 标准。
2.4.1.1 ConnectionFactory
链接工厂, 用于创建链接的工厂类型。
2.4.1.2 Connection
链接,用于建立访问ActiveMQ连接的类型, 由链接工厂创建。
2.4.1.3 Session
会话, 一次持久有效、有状态的访问,由链接创建。
2.4.1.4 Destination & Queue & Topic
目的地, 即本次访问ActiveMQ消息队列的地址,由Session会话创建。
- interface Queue extends Destination
- Queue:队列模型,只有一个消费者。消息一旦被消费,默认删除。
- Topic:主题订阅中的消息,会发送给所有的消费者同时处理。
2.4.1.5 Message
消息,在消息传递过程中数据载体对象,是所有消息【文本消息TextMessage,对象消息ObjectMessage等】具体类型的顶级接口,可以通过会话创建或通过会话从 ActiveMQ 服务中获取。
2.4.1.6 MessageProducer
消息生成者, 在一次有效会话中, 用于发送消息给ActiveMQ服务的工具,由Session会话创建。
2.4.1.7 MessageCustomer
消息消费者【消息订阅者,消息处理者】, 在一次有效会话中, 用于ActiveMQ服务中获取消息的工具,由Session会话创建。
2.5 第三部分:创建消息生成者,发送消息
注意:ActiveMQ 服务接受消息的入口是 61616 端口,防火墙还需要开放此端口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
| package com.shubao.mq.activemq.ptp;
import org.apache.activemq.ActiveMQConnectionFactory; import org.junit.Test;
import javax.jms.*;
public class MyProducer {
ConnectionFactory connectionFactory;
Connection connection;
Session session;
Destination destination;
MessageProducer producer;
Message message;
public void init() throws JMSException { connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("myQueue"); producer = session.createProducer(destination); message = session.createTextMessage("hello world"); }
public void sendMessage(String message) throws JMSException { TextMessage textMessage = session.createTextMessage(message); producer.send(textMessage); }
public void close() throws JMSException { producer.close(); session.close(); connection.close(); }
@Test public void sentToActiveMQ(){ try {
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616"); connection = connectionFactory.createConnection(); connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("myQueue"); producer = session.createProducer(destination); message = session.createTextMessage("hello world"); producer.send(message); } catch (JMSException e) { e.printStackTrace(); } finally { try { if (producer != null) { producer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } } }
|
使用测试方法执行后,可以看到成功将消息加入队列
2.6 第四部分:创建消息消费者,消费消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| package com.shubao.mq.activemq.ptp;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class MyConsumer {
ConnectionFactory connectionFactory;
Connection connection;
Session session;
Destination destination;
MessageConsumer consumer;
Message message;
public void init() throws Exception { connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("myQueue"); consumer = session.createConsumer(destination);
message = consumer.receive(); }
public void close() throws Exception { consumer.close(); session.close(); connection.close(); }
public static void main(String[] args) throws Exception { MyConsumer myConsumer = new MyConsumer(); myConsumer.init(); System.out.println("接收到的消息是:" + ((TextMessage) myConsumer.message).getText()); myConsumer.close(); } }
|
测试结果
且后台管理界面也可以看到已被消费
3 ActiveMQ监听器
在前面的示例中,我们发现消费者每次只能消费一条消息。当队列中有多条消息的时候,我们需要多次运行消费者,才能消费完这些消息。我们希望一次将所有的消息全部接收,可以使用 ActiveMQ 监听器来监听队列,持续消费消息。
答:使用ActiveMQ监听器来监听队列,持续消费消息。
3.1 配置步骤说明
- 创建一个监听器对象。
- 修改消费者代码,加载监听器
3.2 配置步骤
3.2.1 第一步:创建监听器 MyListener 类
自定义监听器需要实现 MessageListener 接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| package com.shubao.mq.activemq.ptp;
import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage;
public class MyListener implements MessageListener {
@Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; System.out.println("收到消息:" + textMessage.getText()); } catch (Exception e) { e.printStackTrace(); } }
}
|
3.2.2 第二步:修改MyConsumer代码,加载监听器
监听器需要持续加载,因此需要使消费程序不结束。这里我们使用输入流阻塞消费线程结束。(实际开发中,使用web项目加载)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| package com.shubao.mq.activemq.ptp;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class MyConsumer {
ConnectionFactory connectionFactory;
Connection connection;
Session session;
Destination destination;
MessageConsumer consumer;
Message message;
public void init() throws Exception { connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("myQueue"); consumer = session.createConsumer(destination);
consumer.setMessageListener(new MyListener());
System.in.read(); }
public void close() throws Exception { consumer.close(); session.close(); connection.close(); }
public static void main(String[] args) throws Exception { MyConsumer myConsumer = new MyConsumer(); myConsumer.init(); System.out.println("接收到的消息是:" + ((TextMessage) myConsumer.message).getText()); myConsumer.close(); } }
|
3.3 测试
先添加 5 条消息到队列
运行 Consumer 的测试程序,可以看到连续接受了 5 条消息,且接续添加会继续输出消息。
4 ActiveMQ消息服务模式
在入门示例中,只能向一个消费者发送消息。但是有一些场景,需求有多个消费者都能接收到消息,比如:美团 APP 每天的消息推送。该如何实现呢?
ActiveMQ是通过不同的服务模式来解决这个问题的。要搞清楚这个问题,必须知道ActiveMQ有哪些应用模式。
4.1 PTP模式(point to point)
消息模型
- 消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。
- 消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。
- Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费、其它的则不能消费此消息了。
- 当消费者不存在时,消息会一直保存,直到有消费消费
入门示例就是采用的这种 PTP 服务模式
4.2 TOPIC(主题订阅模式)
消息模型
- 消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。
- 和点对点方式不同,发布到topic的消息会被所有订阅者消费。
- 当生产者发布消息,不管是否有消费者。都不会保存消息
所以,主题订阅模式下,==一定要先有消息的消费者(订阅者),后有消息的生产者(发布者)==。
5 Topic模式实现
5.1 配置步骤说明
搭建ActiveMQ消息服务器。
- 创建主题订阅者。
- 创建主题发布者。
5.2 配置步骤
5.2.1 第一部分:搭建消息服务器。(已实现)
5.2.2 第二部分:创建主题订阅者 MySubscriber
主题订阅模式下,可以有多个订阅者。我们这里用多线程来模拟。创建 MySubscriber 类,实现 Runnable 接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| package com.shubao.mq.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class MySubscriber implements Runnable {
TopicConnectionFactory factory;
TopicConnection connection;
TopicSession session;
Topic topic;
TopicSubscriber subscriber;
Message message;
@Override public void run() { try { factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); connection = factory.createTopicConnection(); connection.start(); session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); topic = session.createTopic("topic"); subscriber = session.createSubscriber(topic); message = subscriber.receive(); System.out.println("收到消息:" + ((TextMessage) message).getText()); } catch (Exception e) { e.printStackTrace(); } finally { try { if (subscriber != null) { subscriber.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } } }
|
注意 ==junit 不支持多线程测试==,需要使用 main 方法执行测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package com.shubao.mq.activemq.topic;
public class ActiveMQTest {
public static void main(String[] args) { MySubscriber subscriber = new MySubscriber(); Thread t1 = new Thread(subscriber); Thread t2 = new Thread(subscriber); t1.start(); t2.start(); } }
|
启动程序后可以从管理面板看到该 topic 下有两个订阅者
5.2.3 第三部分:创建主题发布者 MyPublisher
创建 MyPublish 类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| package com.shubao.mq.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class MyPublisher {
TopicConnectionFactory factory;
TopicConnection connection;
TopicSession session;
TopicPublisher publisher;
Topic topic;
Message message;
public void publishTopic(String topicName, String publishText) { try { factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); connection = factory.createTopicConnection(); connection.start(); session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); topic = session.createTopic(topicName); publisher = session.createPublisher(topic); message = session.createTextMessage(publishText); publisher.send(message); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != publisher) { publisher.close(); } if (null != session) { session.close(); } if (null != connection) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } } }
|
新增测试方法,发布主题消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package com.shubao.mq.activemq.topic;
import org.junit.Test;
public class PublisherTest {
@Test public void topicPublishTest() { MyPublisher publisher = new MyPublisher(); publisher.publishTopic("topic", "hello, topic"); } }
|
5.2.4 第四部分:执行测试
执行 topicPublisherTest,可以看到订阅者立即收到了消息
同时在管理页面也能看到一条消息进入,被读取了两次。
6 ActiveMQ持久化
当队列中有未被消费的消息时,我们重新启动ActiveMQ服务器后,发现消息仍然在队列中。
ActiveMQ 是支持持久化的,可以永久保存消息。消息是保存在内存中的。当内存空间不足,或者ActiveMQ服务关闭的时候,消息会被持久化到磁盘上。被消费的时候,再加载到内存空间中。
ActiveMQ持久化方式在 activemq/conf/activemq.xml
中指定
6.1 kahadb 方式
kahadb 方式是 ActiveMQ 默认的持久化策略。不会保存已经被消费过的消息。从配置文件中可以看到默认的存储地址,也就是 /usr/local/activemq/data/kahadb
6.2 AMQ方式(已过时)
5.3 版本之前,现在已经过时,不考虑。
6.3 JDBC持久化方式
ActiveMQ 可以将数据持久化到数据库中,支持使用任意的数据库。
6.3.1 配置步骤说明
- 创建数据库
- 添加数据库连接 jar 依赖到 ActiveMQ 服务器
- 修改 ActiveMQ 配置,创建数据源。
- 修改 ActiveMQ 配置,修改持久化方式为 jdbc
6.3.2配置步骤
6.3.2.1 第一步:创建数据库
数据库最好不要跟 ActiveMQ 服务器在同一台机器。因为当 cpu 线程资源不足时,往队列中写入消息时,如果数据库上一次持久化还没结束,容易造成线程阻塞。
这里数据库建立在宿主机上,ActiveMQ 服务部署在虚拟机。
6.3.2.2 第二步:添加jar依赖
配置数据源时,是支持连接池的,这里使用 druid 作为连接池。将 jdbc 驱动、druid 的 jar 包上传到 activemq/lib/
目录下
6.3.2.3 第三步:修改 activemq/conf/activemq.xml
,创建数据源
注意在 <broker>
节点外,创建数据源。
1 2 3 4 5 6 7 8
| <bean id="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.cj.jdbc.Driver" /> <property name="url" value="jdbc:mysql://127.0.0.1:3306/activemq?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8&useSSL=false" /> <property name="username" value="root" /> <property name="password" value="1106135" /> <property name="maxActive" value="10" /> <property name="poolPreparedStatements" value="true" /> </bean>
|
6.3.2.4 第四步:修改 /conf/activemq.xml
,修改为 jdbc 持久化方式
在 <broker>
节点内部,注释 kahadb 方式,添加 jdbc 方式。
1 2 3
| <persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true"/> </persistenceAdapter>
|
6.3.3 测试
进入 activemq/bin
目录,使用命令 ./activemq restart
重新启动 ActiveMQ。可以看到,数据库中新增了三张表。
运行入门示例中的测试类,往队列中写入一条消息,可以在表activemq_msgs中,看到新写入了一条数据
6.3.4 三张表说明
- activemq_msgs :存储消息,Queue和Topic都存储在这个表中
- activemq_acks :用于存储订阅关系。订阅模式下有效
- activemq_lock :集群模式下,存储主从节点关系
6.3.5 补充说明
jdbc 持久化方式,只要Mysql数据库稳定运行,就能保证队列中消息的安全。安全级别高,但是效率低。因此,在实际开发中,除非是像银行这类对数据安全极高的业务,我们一般都是使用默认持久化方式 kahadb。
7 ActiveMQ应用场景
7.1 多模块解耦(模块之间消息通讯)
我们判断一个程序的优劣,有一个很重要的指标:高内聚、低耦合。
- 高内聚:同一个模块中,功能是高度紧密的。
- 低耦合:各模块之间,业务尽量不要交叉。
但是有一些业务功能,必须涉及到两个不同的业务,那我们就要想办法,尽量将它们解耦。以我们前面学习的 solr 为例,我们知道 solr 的数据来自数据库。这就意味着,当数据库中的商品发生变化时,我们需要同步更新索引库。这个时候我们就可以使用消息队列模型来解耦,添加添加业务和同步索引库业务。
7.2 流量削峰(解决并发请求)
订单处理,就可以由前端应用将订单信息放到队列,后端应用从队列里依次获得消息处理,高峰时的大量订单可以积压在队列里慢慢处理掉。由于同步通常意味着阻塞,而大量线程的阻塞会降低计算机的性能。
7.3 日志处理
日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下:
7.4 同步业务异步处理
需要:当我们在网站注册的时候,有时候需要认证邮箱或者手机号,这个时候保存数据到数据库之前,需要先等待认证结束。如果说认证程序耗时比较大,会影响影响用户注册的业务。这个时候,我们可以使用消息队列模型,将同步执行的业务,通过队列,变成异步处理。
- 在保存数据到数据库的时候,只需要将用户的邮箱写入队列,不需要等待邮箱认证程序执行结束,才把数据保存到数据库。
- 认证程序,通过监听队列,从中获取用户的邮箱地址,发送认证链接。
8 Spring 整合 ActiveMQ
8.1 必要性
Spring已经整合了jms规范了(spring-jms.jar),而ActiveMQ是实现了jms规范的。这就意味着Spring整合ActiveMQ是非常方便的。并且Spring-jms,提供了一个JmsTemplate类,用来简化消息读写的业务代码。Spring整合ActivMQ之后,就可以使用该类,简化开发。
8.2 需求
使用Spring整合ActiveMQ,模拟限时抢购下的流量削峰问题。
8.3 配置步骤
8.3.1 第一部分:创建项目(使用maven)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.oza</groupId> <artifactId>activemq-demo04-spring</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>war</packaging> <dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.9</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.9</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>5.1.8.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>5.1.8.RELEASE</version> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>jstl</artifactId> <version>1.2</version> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> <version>3.1.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>javax.servlet.jsp</groupId> <artifactId>javax.servlet.jsp-api</artifactId> <version>2.2.1</version> <scope>provided</scope> </dependency> <dependency> <groupId>javax.jms</groupId> <artifactId>javax.jms-api</artifactId> <version>2.0.1</version> </dependency> </dependencies> </project>
|
8.3.2 第二部分:spring整合springmvc
8.3.2.1 第一步:修改web.xml,配置springmvc核心控制器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| <?xml version="1.0" encoding="UTF-8"?> <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" version="2.5"> <display-name>activemq-demo04-spring</display-name> <filter> <filter-name>characterEncodingFilter</filter-name> <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class> <init-param> <param-name>encoding</param-name> <param-value>UTF-8</param-value> </init-param> </filter> <filter-mapping> <filter-name>characterEncodingFilter</filter-name> <url-pattern>/*</url-pattern> </filter-mapping> <servlet> <servlet-name>dispatcherServlet</servlet-name> <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class> <init-param> <param-name>contextConfigLocation</param-name> <param-value>classpath:spring*.xml</param-value> </init-param> <load-on-startup>1</load-on-startup> </servlet> <servlet-mapping> <servlet-name>dispatcherServlet</servlet-name> <url-pattern>/</url-pattern> </servlet-mapping> </web-app>
|
8.3.2.2 第二步:配置springmvc.xml核心配置文件
1 2 3 4 5 6 7 8 9 10 11 12
| <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.3.xsdhttp://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.3.xsdhttp://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd"> <context:component-scan base-package="org.oza.activemq" /> <mvc:annotation-driven /> </beans>
|
8.3.2.3 第三步:创建相关jsp页面
订单页面:order.jsp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8" %> <!DOCTYPE html> <html>
<head> <meta charset="UTF-8"> <title>Insert title here</title> </head>
<body> <form action="save" method="post">用户编号:<input type="text" name="userId"><br>订单金额:<input type="text" name="price"><br><input type="submit" value="提交"></form> </body>
</html>
|
结果页面:success.jsp
1 2 3 4 5 6 7 8 9 10 11 12
| <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8" %> <!DOCTYPE html> <html>
<head> <meta charset="UTF-8"> <title>Insert title here</title> </head>
<body>订单提交成功!!!请稍后去结算中心支付。。。</body>
</html>
|
8.3.2.4 第四步:java代码实现
创建 Order 类
1
| package org.oza.activemq.pojo;import java.io.Serializable;public class Order implements Serializable{private static final long serialVersionUID = 3622062034498580108L;private Integer id;private Integer userId;private Float price;public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public Integer getUserId() {return userId;}public void setUserId(Integer userId) {this.userId = userId;}public Float getPrice() {return price;}public void setPrice(Float price) {this.price = price;}@Overridepublic String toString() {return "Order [id=" + id + ", userId=" + userId + ", price=" + price + "]";}}
|
创建 OrderController 类
1
| package org.oza.activemq.controller;import org.oza.activemq.pojo.Order;import org.oza.activemq.producer.OrderProducer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.RequestMapping;@Controllerpublic class OrderController {@Autowiredprivate OrderProducer producer;@RequestMapping("/save")public String save(Order order) {System.out.println("当前提交的订单用户是:" + order.getUserId() + ",订单金额:" + order.getPrice());return "/success.jsp";}}
|
8.3.2.5 第五步:整合测试
访问 order 页面下订单,确定 springmvc 正常工作,然后进行下一步。
8.3.3 第三部分:Spring整合ActiveMQ
8.3.3.1 第一步:创建消息生成者 OrderProducer
在这里,我们注入 JmsTemplate 类,来简化代码。另外要注意:
- ActiveMQ 处理对象时,对象必须实现序列化。
- 匿名内部类访问外部类属性,该属性需要用final修饰。
1
| package org.oza.activemq.producer;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Session;import org.oza.activemq.pojo.Order;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;import org.springframework.stereotype.Component;@Componentpublic class OrderProducer {@Autowiredprivate JmsTemplate jmsTemplate;
|
8.3.3.2 第二步:创建消息消费者OrderListener类
这里使用监听器模式
1
| package org.oza.activemq.listener;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.ObjectMessage;import org.oza.activemq.pojo.Order;import org.springframework.stereotype.Component;@Componentpublic class OrderListener implements MessageListener {public void onMessage(Message message) {ObjectMessage oMsg = (ObjectMessage)message;try {Order order = (Order)oMsg.getObject();System.out.println("当前提交的订单用户是:"+order.getUserId()+",订单金额:"+order.getPrice());} catch (Exception e) {e.printStackTrace();}}}
|
8.3.3.3 第三步:spring整合ActiveMQ
修改 springmvc 的配置文件,添加整合 ActiveMQ 配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.3.xsdhttp://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.3.xsdhttp://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd"> <context:component-scan base-package="org.oza.activemq" /> <mvc:annotation-driven />
<bean name="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.125.87:61616" /> <property name="userName" value="admin" /> <property name="password" value="admin" /> <property name="trustAllPackages" value="true" /> </bean> </property> <property name="maxConnections" value="20" /> </bean>
<bean name="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="pooledConnectionFactory" /> <property name="sessionCacheSize" value="5" /> </bean>
<bean name="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <propertyname="connectionFactory" ref="cachingConnectionFactory"/> </bean>
<jms:listener-container acknowledge="auto" container-type="default" destination-type="queue" connection-factory="cachingConnectionFactory">
<jms:listener destination="order-mq" ref="orderListener" /> </jms:listener-container> </beans>
|
8.3.3.4 第四步:修改OrderController类
1
| package org.oza.activemq.controller;import org.oza.activemq.pojo.Order;import org.oza.activemq.producer.OrderProducer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.RequestMapping;@Controllerpublic class OrderController {@Autowiredprivate OrderProducer producer;@RequestMapping("/save")public String save(Order order) {
|
8.4 整合测试
重新启动项目,提交多个订单,可以看到控制台持续输出,测试成功。