JMS簡介與ActiveMQ實戰(zhàn)代碼分享
一、異步通信
之前接觸到的RMI,Hessian等技術都是同步通信機制。當客戶端調(diào)用遠程方法時,客戶端必須等到遠程方法完成后,才能繼續(xù)執(zhí)行。這段時間客戶端一直會被阻塞(這樣造成的用戶體驗很不好)。

(同步通信)
同步通信有并不是程序之間交互的唯一方式,異步通信機制中,客戶端不需要等待服務處理消息,可以繼續(xù)執(zhí)行,并且最終能夠收到并處理消息。

(異步通信)
異步通信的優(yōu)勢
無需等待??蛻舳酥恍枰獙⑾l(fā)送給消息代理,不需要等待就可以繼續(xù)執(zhí)行別的任務,且確信消息會被投遞給相應的目的地。
面向消息和解耦。 客戶端不需要擔心遠程服務的接口規(guī)范,只需要把消息放入消息隊列然后獲取結(jié)果即可。
二、JMS
1. 簡介
在JMS出現(xiàn)之前,每個消息代理都是有不同的實現(xiàn),這就使得不同代理之間的消息代碼很難通用。JMS(Java Message Service,Java消息服務)是一個標準,定義了使用消息代理的通用API。即所有遵從規(guī)范的實現(xiàn)都使用通用的接口,類似于JDBC為數(shù)據(jù)庫操作提供通用接口。
JMS幾個重要的要素:
Destination:消息從發(fā)送端發(fā)出后要走的通道。
ConnectionFactory:連接工廠,用于創(chuàng)建連接的對象。
Connection:連接接口,用于創(chuàng)建session。
Session:會話接口,用于創(chuàng)建消息的發(fā)送者,接受者以及消息對象本身。
MessageConsumer:消息的消費者。
MessageProducer:消息的生產(chǎn)者。
XXXMessage:各種類型的消息對象,包括ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage 5種。
2. JMS消息模型
不同的消息系統(tǒng)有不同的消息模型。JMS提供了兩種模型:Queue(點對點)和Topic(發(fā)布/訂閱)。
JMS Queue(點對點)模型
在點對點模型中,消息生產(chǎn)者生產(chǎn)消息發(fā)送到queue中,然后消息消費者從queue中取出并且消費消息,但不可重復消費。
如圖:

發(fā)送者1,發(fā)送者2,發(fā)送者3各發(fā)送一條消息到服務器;
消息1,2,3就會按照順序形成一個隊列,隊列中的消息不知道自己會被哪個接收者消費;
接收者1,2,3分別從隊列中取出一條消息進行消費,每取出一條消息,隊列就會將該消息刪除,這樣即保證了消息不會被重復消費。
JMS Queue模型也成為P2P(Point to Point)模型。
JMS Topic(發(fā)布/訂閱)模型
JMS Topic模型與JMS Queue模型的最大差別在于消息接收的部分。Topic模型類似于微信公眾號,訂閱了該公眾號的接收者都可以接收到公眾號推送的消息。
如圖:

發(fā)布者1,2,3分別發(fā)布3個主題1,2,3;
這樣訂閱了主題1的用戶群:訂閱者1,2,3即能接收到主題1消息;同理訂閱者4,5,6即能接收到主題2消息,訂閱者7,8,9即能接收到主題3消息。
JMS Topic模型也成為Pus/Sub模型。
兩種模式下各要素的對比:

3. 傳統(tǒng)JMS編程模型
Producer:
(1)創(chuàng)建連接工廠ConnectionFactory;
(2) 使用連接工廠創(chuàng)建連接;
(3)啟動連接;
(4)創(chuàng)建會話;
(5) 創(chuàng)建消息發(fā)送的目的地;
(6)創(chuàng)建生產(chǎn)者;
(7)創(chuàng)建消息類型和消息內(nèi)容;
(8)發(fā)送消息;
Consumer:
(1)創(chuàng)建連接工廠ConnectionFactory;
(2) 使用連接工廠創(chuàng)建連接;
(3)啟動連接;
(4)創(chuàng)建會話;
(5) 創(chuàng)建消息發(fā)送的目的地;
(6)創(chuàng)建消費者
(7)創(chuàng)建消息類型;
(8)接收消息;
三、 ActiveMQ簡介
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實現(xiàn),盡管JMS規(guī)范出臺已經(jīng)是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位。
ActiveMQ 主要特性:
多種語言和協(xié)議編寫客戶端。語言: Java,C,C++,C#,Ruby,Perl,Python,PHP。應用協(xié)議:
OpenWire,Stomp REST,WS Notification,XMPP,AMQP
完全支持JMS1.1和J2EE 1.4規(guī)范 (持久化,XA消息,事務)
對spring的支持,ActiveMQ可以很容易內(nèi)嵌到使用Spring的系統(tǒng)里面去,而且也支持Spring2.0的特性
通過了常見J2EE服務器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何兼容J2EE 1.4 商業(yè)服務器上
支持多種傳送協(xié)議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
支持通過JDBC和journal提供高速的消息持久化
從設計上保證了高性能的集群,客戶端-服務器,點對點
支持Ajax
支持與Axis的整合
可以很容易得調(diào)用內(nèi)嵌JMS provider,進行測試
四、 ActiveMQ實戰(zhàn)
下面看看如何ActiveMQ實現(xiàn)一個簡單的消息隊列。
傳統(tǒng)的JMS編程模型
1. JMS Queue模型代碼實現(xiàn):
Producer:
package com.wgs.mq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Created by GenshenWang.nomico on 2017/10/19.
*/
public class ActiveMQProducer {
private static final String URL = "tcp://localhost:61616";
private static final String QUEUE_NAME = "queue-name";
public static void main(String[] args) throws JMSException {
//1 創(chuàng)建連接工廠ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
//2 使用連接工廠創(chuàng)建連接
Connection connection = connectionFactory.createConnection();
//3 啟動連接
connection.start();
//4 創(chuàng)建會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5 創(chuàng)建消息發(fā)送的目的地
Destination destination = session.createQueue(QUEUE_NAME);
//6 創(chuàng)建生產(chǎn)者
MessageProducer messageProducer = session.createProducer(destination);
//7 創(chuàng)建消息
TextMessage textMessage = session.createTextMessage();
for (int i = 1; i <= 100; i++) {
//8 創(chuàng)建消息內(nèi)容
textMessage.setText("發(fā)送者- 1 -發(fā)送消息:" + i);
//9 發(fā)送消息
messageProducer.send(textMessage);
}
System.out.println("消息發(fā)送成功");
session.close();
connection.close();
}
}
Conusmer:
package com.wgs.mq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* Created by GenshenWang.nomico on 2017/10/19.
*/
public class ActiveMQConsumer {
private static final String URL = "tcp://localhost:61616";
private static final String QUEUE_NAME = "queue-name";
public static void main(String[] args) throws JMSException {
//1 創(chuàng)建連接工廠ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
//2 使用連接工廠創(chuàng)建連接
Connection connection = connectionFactory.createConnection();
//3 啟動連接
connection.start();
//4 創(chuàng)建會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5 創(chuàng)建消息發(fā)送的目的地
Destination destination = session.createQueue(QUEUE_NAME);
//6 創(chuàng)建消費者
MessageConsumer messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
//7 創(chuàng)建消息
TextMessage textMessage = (TextMessage)message;
try {
//7 接收消息
System.out.println("消費者- 1 -接收消息:【" + textMessage.getText() + "】");
}
catch (JMSException e) {
e.printStackTrace();
}
}
}
);
}
}
2. JMS Topic模型代碼實現(xiàn):
Producer:
package com.wgs.mq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 發(fā)布訂閱模式
* Created by GenshenWang.nomico on 2017/10/19.
*/
public class ActiveMQProducer {
private static final String URL = "tcp://localhost:61616";
private static final String TOPIC_NAME = "topic-name";
public static void main(String[] args) throws JMSException {
//1 創(chuàng)建連接工廠ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
//2 使用連接工廠創(chuàng)建連接
Connection connection = connectionFactory.createConnection();
//3 啟動連接
connection.start();
//4 創(chuàng)建會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5 創(chuàng)建帶有主題的消息發(fā)送的目的地
Destination destination = session.createTopic(TOPIC_NAME);
//6 創(chuàng)建生產(chǎn)者
MessageProducer messageProducer = session.createProducer(destination);
//7 創(chuàng)建消息
TextMessage textMessage = session.createTextMessage();
for (int i = 1; i <= 100; i++) {
//8 創(chuàng)建消息內(nèi)容
textMessage.setText("發(fā)送者- 1 -發(fā)送消息:" + i);
//9 發(fā)送消息
messageProducer.send(textMessage);
}
System.out.println("消息發(fā)送成功");
session.close();
connection.close();
}
}
Consumer:
package com.wgs.mq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 發(fā)布訂閱模式
* Created by GenshenWang.nomico on 2017/10/19.
*/
public class ActiveMQConsumer {
private static final String URL = "tcp://localhost:61616";
private static final String TOPIC_NAME = "topic-name";
public static void main(String[] args) throws JMSException {
//1 創(chuàng)建連接工廠ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
//2 使用連接工廠創(chuàng)建連接
Connection connection = connectionFactory.createConnection();
//3 啟動連接
connection.start();
//4 創(chuàng)建會話
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5 創(chuàng)建消息發(fā)送的目的地
Destination destination = session.createTopic(TOPIC_NAME);
//6 創(chuàng)建消費者
MessageConsumer messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
//7 創(chuàng)建消息
TextMessage textMessage = (TextMessage)message;
try {
//7 接收消息
System.out.println("消費者- 1 -接收消息:【" + textMessage.getText() + "】");
}
catch (JMSException e) {
e.printStackTrace();
}
}
}
);
}
}
使用Spring的JMS模板
雖然JMS為所有的消息代理提供了統(tǒng)一的接口,但如同JDBC一樣,在處理連接,語句,結(jié)果集和異常時會顯得很繁雜。不過,Spring為我們提供了JmsTemplate來消除冗余和重復的JMS代碼。
下面看看如何使用JmsTemplate來實現(xiàn)消息隊列。
1. JMS Queue模型代碼實現(xiàn):
配置文件:
producer.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<context:annotation-config/>
<!-- ActiveMQ提供的ConnectionFactory-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<!-- 在Spring 中配置JMS連接工廠,連接到ActiveMQ提供的ConnectionFactory-->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref = "targetConnectionFactory"/>
</bean>
<!-- 配置JmsTemplate,用于發(fā)送消息 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!-- 配置隊列目的地的名稱-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queue-spring-name"/>
</bean>
<!-- 配置隊列目的地的名稱-->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic-spring-name"/>
</bean>
<bean id="producerServiceImpl" class="com.wgs.jms.producer.ActiveMQProducerServiceImpl"/>
</beans>
consumer.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<context:annotation-config/>
<!-- ActiveMQ提供的ConnectionFactory-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<!-- 在Spring 中配置JMS連接工廠,連接到ActiveMQ提供的ConnectionFactory-->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref = "targetConnectionFactory"/>
</bean>
<!-- 配置隊列目的地的名稱-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queue-spring-name"/>
</bean>
<!-- 配置消息監(jiān)聽器-->
<bean id="consumerMessageListener" class="com.wgs.jms.consumer.ConsumerMessageListener"/>
<!-- 配置隊列目的地的名稱-->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="destination" ref="queueDestination"/>
<property name="connectionFactory" ref="connectionFactory"/>
<property name="messageListener" ref="consumerMessageListener"/>
</bean>
<!-- 配置隊列目的地的名稱-->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="topic-spring-name"/>
</bean>
</beans>
生產(chǎn)者Producer:
(1)先寫一個接口:
package com.wgs.jms.producer;
/**
* Created by GenshenWang.nomico on 2017/10/20.
*/
public interface ActiveMQProducerService {
void sendMessage(final String message);
}
(2)接口的實現(xiàn):
package com.wgs.jms.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import javax.annotation.Resource;
import javax.jms.*;
/**
* Created by GenshenWang.nomico on 2017/10/20.
*/
public class ActiveMQProducerServiceImpl implements ActiveMQProducerService {
@Autowired
JmsTemplate jmsTemplate;
@Resource(name = "queueDestination")
Destination destination;
public void sendMessage(final String message) {
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage(message);
return textMessage;
}
}
);
System.out.println("生產(chǎn)者- 1 -發(fā)送消息成功:" + message);
}
}
(3)測試:
package com.wgs.jms.producer;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* Created by GenshenWang.nomico on 2017/10/20.
*/
public class ActiveMQProducerMain {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
ActiveMQProducerService service = context.getBean(ActiveMQProducerService.class);
for (int i = 0; i < 100; i++) {
service.sendMessage("test" + i);
}
context.close();
}
}
消費者:
(1)創(chuàng)建消息監(jiān)聽器:
package com.wgs.jms.consumer;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* Created by GenshenWang.nomico on 2017/10/20.
*/
public class ConsumerMessageListener implements MessageListener {
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("消費者- 1 -接收消息:" + textMessage.getText());
}
catch (JMSException e) {
e.printStackTrace();
}
}
}
(2)測試:
package com.wgs.jms.consumer;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* Created by GenshenWang.nomico on 2017/10/20.
*/
public class ActiveMQConsumerMain {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
}
}
2. JMS Topic模型代碼實現(xiàn):
將上述代碼中出現(xiàn)的queueDestination改為topicDestination即可。
總結(jié)
以上就是本文關于JMS簡介與ActiveMQ實戰(zhàn)代碼分享的全部內(nèi)容,希望對大家有所幫助。感興趣的朋友可以繼續(xù)參閱本站其他相關專題,如有不足之處,歡迎留言指出。感謝朋友們對本站的支持!
相關文章
java集合類arraylist循環(huán)中刪除特定元素的方法
下面小編就為大家?guī)硪黄狫ava集合類ArrayList循環(huán)中刪除特定元素的方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-11-11
Mybatis實體類屬性與數(shù)據(jù)庫不一致解決方案
這篇文章主要介紹了Mybatis實體類屬性與數(shù)據(jù)庫不一致解決方案,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-10-10
mybatis調(diào)用存儲過程,帶in、out參數(shù)問題
這篇文章主要介紹了mybatis調(diào)用存儲過程,帶in、out參數(shù)問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01
Java使用Jdbc連接Oracle執(zhí)行簡單查詢操作示例
這篇文章主要介紹了Java使用Jdbc連接Oracle執(zhí)行簡單查詢操作,結(jié)合實例形式詳細分析了java基于jdbc實現(xiàn)Oracle數(shù)據(jù)庫的連接與查詢相關操作技巧,需要的朋友可以參考下2019-09-09

