⼀、什么是消息中间件
消息中间件顾名思义实现的就是在两个系统或两个客户端之间进⾏消息传送
⼆、什么是ActiveMQ
ActiveMQ是⼀种开源的基于JMS(Java Message Servie)规范的⼀种消息中间件的实现,ActiveMQ的设计⽬标是提供标准的,⾯向消息的,能够跨越多语⾔和多系统的应⽤集成消息通信中间件。
三、什么时候需要⽤ActiveMQ
ActiveMQ常被应⽤与系统业务的解耦,异步消息的推送,增加系统并发量,提⾼⽤户体验。例如以我在⼯作中的使⽤,在⽐较耗时且异步的远程开锁操作时
四、如何使⽤ActiveMQ
1.AcitveMQ的数据传送流程2.ActiveMQ的两种消息传递类型
(1)点对点传输,即⼀个⽣产者对应⼀个消费者,⽣产者向broke推送数据,数据存储在broke的⼀个队列中,当消费者接受该条队列⾥的数据。
两种消息传递类型的不同,点对点传输消费者可以接收到在连接之前⽣产者所推送的数据,⽽基于发布/订阅模式的传输⽅式消费者只能接收到连接之后⽣产者推送的数据。3.ActiveMQ的安装与启动(1)官⽹下载对应服务器版本
(2)解压后进⼊apache-activemq-5.15.9/bin⽬录(3)执⾏./activemq start启动ActiveMQ
(4)浏览器输⼊ActiveMQ启动的服务器ip:8161便可进⼊web界⾯,点击Manage ActiveMQ broker可以查看消息推送的状态,默认账号密码为admin,admin
(5)启动错误分析
进⼊/root/apache-activemq-5.15.9/data⽬录查看activemq.log⽂件,根据错误提⽰信息修改,例如端⼝号被占⽤等。4.ActiveMQ的代码测试
(1)构建maven项⽬,引⼊依赖
(2)⽣产者类
/**
* @Description ⽣产者 * @Date 2019/7/20 * @Created by yqh */
public class MyProducer {
private static final String ACTIVEMQ_URL = \"tcp://192.168.168.242:61616\";
public static void main(String[] args) throws JMSException { // 创建连接⼯⼚
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 创建连接
Connection connection = activeMQConnectionFactory.createConnection(); // 打开连接
connection.start(); // 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建队列⽬标,并标识队列名称,消费者根据队列名称接收数据 Destination destination = session.createQueue(\"myQueue\"); // 创建⼀个⽣产者
MessageProducer producer = session.createProducer(destination); // 向队列推送10个⽂本消息数据 for (int i = 1 ; i <= 10 ; i++){ // 创建⽂本消息
TextMessage message = session.createTextMessage(\"第\" + i + \"个⽂本消息\"); //发送消息
producer.send(message); //在本地打印消息
System.out.println(\"已发送的消息:\" + message.getText()); }
//关闭连接
connection.close(); }}
运⾏结果:
已发送的消息:第1个⽂本消息已发送的消息:第2个⽂本消息已发送的消息:第3个⽂本消息已发送的消息:第4个⽂本消息
已发送的消息:第5个⽂本消息已发送的消息:第6个⽂本消息已发送的消息:第7个⽂本消息已发送的消息:第8个⽂本消息已发送的消息:第9个⽂本消息已发送的消息:第10个⽂本消息
测试查看web后台显⽰,有10条消息在队列中等待消费
(3)消费者类
/**
* @Description 消费者类 * @Date 2019/7/20 0020 * @Created by yqh */
public class MyConsumer {
private static final String ACTIVEMQ_URL = \"tcp://192.168.168.242:61616\";
public static void main(String[] args) throws JMSException { // 创建连接⼯⼚
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 创建连接
Connection connection = activeMQConnectionFactory.createConnection(); // 打开连接
connection.start(); // 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建队列⽬标,并标识队列名称,消费者根据队列名称接收数据 Destination destination = session.createQueue(\"myQueue\"); // 创建消费者
MessageConsumer consumer = session.createConsumer(destination); // 创建消费的监听
consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message; try {
System.out.println(\"消费的消息:\" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); }}
测试结果:
消费的消息:第1个⽂本消息消费的消息:第2个⽂本消息消费的消息:第3个⽂本消息消费的消息:第4个⽂本消息消费的消息:第5个⽂本消息消费的消息:第6个⽂本消息消费的消息:第7个⽂本消息消费的消息:第8个⽂本消息消费的消息:第9个⽂本消息消费的消息:第10个⽂本消息
web后台显⽰有⼀个消费者处于连接状态,且已消费了10个message,⽽该条队列已没有message待消费了
(4)当我们运⾏两个消费者类,消息⼜是怎么被消费的呢?是两个消费者都能收到⽣产者⽣产的message,还是只有其中⼀个消费者能消费呢?我们先运⾏两个消费者,在运⾏⼀个⽣产者对⽬标队列⽣产10个message,会发现有以下情况
// Consumer1控制台
消费的消息:第1个⽂本消息消费的消息:第3个⽂本消息消费的消息:第5个⽂本消息消费的消息:第7个⽂本消息消费的消息:第9个⽂本消息// Consumer2控制台
消费的消息:第2个⽂本消息消费的消息:第4个⽂本消息消费的消息:第6个⽂本消息消费的消息:第8个⽂本消息消费的消息:第10个⽂本消息
即队列中的数据会平均的分给每⼀个消费者消费,且每⼀条数据只能被消费⼀次(5)以上是基于队列点对点的传输类型,以下是基于发布/订阅模式传输的类型测试
/**
* @Description 基于发布/订阅模式传输类型的⽣产者测试 * @Date 2019/7/20 0020 * @Created by yqh */
public class MyProducerForTopic {
private static final String ACTIVEMQ_URL = \"tcp://192.168.168.242:61616\";
public static void main(String[] args) throws JMSException { // 创建连接⼯⼚
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 创建连接
Connection connection = activeMQConnectionFactory.createConnection(); // 打开连接
connection.start(); // 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建队列⽬标,并标识队列名称,消费者根据队列名称接收数据 Destination destination = session.createTopic(\"topicTest\"); // 创建⼀个⽣产者
MessageProducer producer = session.createProducer(destination); // 向队列推送10个⽂本消息数据 for (int i = 1 ; i <= 10 ; i++){ // 创建⽂本消息
TextMessage message = session.createTextMessage(\"第\" + i + \"个⽂本消息\"); //发送消息
producer.send(message); //在本地打印消息
System.out.println(\"已发送的消息:\" + message.getText()); }
//关闭连接
connection.close(); }}
/**
* @Description 基于发布/订阅模式传输类型的消费者测试 * @Date 2019/7/20 0020 * @Created by yqh */
public class MyConsumerForTopic {
private static final String ACTIVEMQ_URL = \"tcp://192.168.168.242:61616\";
public static void main(String[] args) throws JMSException { // 创建连接⼯⼚
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); // 创建连接
Connection connection = activeMQConnectionFactory.createConnection(); // 打开连接
connection.start(); // 创建会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建队列⽬标,并标识队列名称,消费者根据队列名称接收数据 Destination destination = session.createTopic(\"topicTest\"); // 创建消费者
MessageConsumer consumer = session.createConsumer(destination); // 创建消费的监听
consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message; try {
System.out.println(\"消费的消息:\" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); }}
现在如果我们先启动⽣产者,再启动消费者,会发现消费者是⽆法接收到之前⽣产者之前所⽣产的数据,只有消费者先启动,再让⽣产者消费才可以正常接收数据,这也是发布/订阅的主题模式与点对点的队列模式的⼀个明显区别。
⽽如果启动两个消费者,那么每⼀个消费者都能完整的接收到⽣产者⽣产的数据,即每⼀条数据都被消费了两次,这是发布/订阅的主题模式与点对点的队列模式的另⼀个明显区别。
因篇幅问题不能全部显示,请点此查看更多更全内容