|
開始文章之前先澄清幾個(gè)概念
什么是消息 消息是一個(gè)用于在組件和應(yīng)用程序之間通訊的的方法。消息之間的傳遞是點(diǎn)對(duì)點(diǎn)的。任何終端之間都可以相互接受和發(fā)送消息。并且每個(gè)終端都必須遵守如下的規(guī)則 -> 創(chuàng)建消息 -> 發(fā)送消息 -> 接收消息 -> 讀取消息
為什么要使用消息 理由很簡單,消息是一個(gè)分布式的低耦合通訊方案。A發(fā)送一個(gè)消息到一個(gè)agent ,B作為接受者去agent上獲取消息。但是A,B不需要同時(shí)到agent上去注冊(cè)。agent作為一個(gè)中轉(zhuǎn)為A,B提供搞效率的通訊服務(wù)。
開發(fā)者的關(guān)注點(diǎn) 走到這里,我也不想去解釋jms spec上那些抽象且復(fù)雜的概念了,說的很白,1年多了我自己也沒弄懂是個(gè)什么東西,也沒時(shí)間從頭到尾去仔細(xì)的看,同時(shí)我認(rèn)為沒必要,我所關(guān)注的是如何讓jms跑起來,并且工作正常,所以spec只是個(gè)字典,當(dāng)我需要用的時(shí)候才去查。
開發(fā)者的jms環(huán)境 遵守簡單明了的原則,所謂jms環(huán)境只是2個(gè)對(duì)象 1> ConnectionFactory 2> Destination 通常Provider會(huì)提供JNDI的對(duì)象獲取,具體方法可以去Privider的網(wǎng)站上搜索jndi support
下面我以jbossMq為介質(zhì)跑一個(gè)簡單的jms,為了保證jms的本質(zhì)清晰,我沒有使用jbossMq的Api,而是直接調(diào)用的jms Api.
java 代碼
- package com.javaeye.jms.jboss;
-
- import javax.jms.Connection;
- import javax.jms.ConnectionFactory;
- import javax.jms.Destination;
- import javax.jms.JMSException;
- import javax.jms.MessageConsumer;
- import javax.jms.MessageProducer;
- import javax.jms.Queue;
- import javax.jms.QueueSender;
- import javax.jms.Session;
- import javax.jms.TextMessage;
- import javax.naming.Context;
- import javax.naming.InitialContext;
- import javax.naming.NamingException;
-
- public class JbossNativeJmsImpl {
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- public void sendingProcessing(String messege) throws NamingException, JMSException{
- Context ctx = new InitialContext();
- ConnectionFactory cf = (ConnectionFactory) ctx.lookup("java:JmsXA");
- Connection conn = cf.createConnection();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination dest = (Queue) ctx.lookup("queue/A");
- MessageProducer msgp = session.createProducer(dest);
- QueueSender sender = (QueueSender) msgp;
- TextMessage msg = session.createTextMessage();
- msg.setText(messege);
- sender.send(msg);
- conn.close();
- }
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- public String retriveingProcessing() throws NamingException, JMSException{
- Context ctx = new InitialContext();
- ConnectionFactory cf = (ConnectionFactory) ctx.lookup("java:JmsXA");
- Connection conn = cf.createConnection();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination dest = (Queue) ctx.lookup("queue/A");
- MessageConsumer msgconsumer = session.createConsumer(dest);
-
-
- conn.start();
- TextMessage msg = (TextMessage) msgconsumer.receive();
- conn.close();
- System.out.println("messege is" + msg.getText());
- return msg.getText();
- }
- }
注意retrive函數(shù)中comment的掉的兩行,消息Listener的作用是實(shí)現(xiàn)異步通訊,但是它有一個(gè)約定,必須和發(fā)送者 保持物理上的分離,針對(duì)于jboss而言,就要求這個(gè)Listener必須跑在容器外面。這是一個(gè)很搞的問題,每天Jms的郵件列表里面都有無數(shù)的這樣的問題發(fā)過來。但是回復(fù)的人很少。我自己也從來不回復(fù)。 其實(shí)我也不清楚寫這篇文章到底是出于什么目的,怕只是讓這么一個(gè)簡單的問題有一個(gè)回答而已。
把下面這個(gè)程序跑起來就可以異步接受消息了。
java 代碼
- package com.javaeye.jms.jboss;
-
- import java.util.Properties;
-
- import javax.jms.Connection;
- import javax.jms.ConnectionFactory;
- import javax.jms.Destination;
- import javax.jms.JMSException;
- import javax.jms.MessageConsumer;
- import javax.jms.MessageListener;
- import javax.jms.Session;
- import javax.naming.Context;
- import javax.naming.InitialContext;
- import javax.naming.NamingException;
-
- import com.javaeye.spring.services.jms.mdp.JmsListenner;
-
- public class JbossJmsAsync {
-
-
-
-
-
-
- public static void main(String[] args) throws NamingException, JMSException {
- Properties pops = new Properties();
- pops.setProperty("jboss.bind.address", "0.0.0.0");
- pops.setProperty("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
- pops.setProperty("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
- pops.setProperty("java.naming.provider.url", "localhost");
- Context ctx = new InitialContext(pops);
- ConnectionFactory cf = (ConnectionFactory) ctx.lookup("ConnectionFactory");
- Connection conn = cf.createConnection();
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Destination dest = (Destination) ctx.lookup("queue/A");
- MessageConsumer msgConsumer = session.createConsumer(dest);
- MessageListener ml = new JmsListenner();
- msgConsumer.setMessageListener(ml);
- conn.start();
- }
-
- }
javaeye的主題好像是spring,為了迎合領(lǐng)導(dǎo),下面我把這套東西跑在spring里面。同時(shí)我發(fā)現(xiàn)spring對(duì)jms的包裝真的簡單,而且還提供了一個(gè)模版,雖然這個(gè)模版的接口是在是很羅唆。
ps:今天是第1次用spring在reference里找了半天找不到方法注入的辦法,于是google了一個(gè)注入辦法,不合理的地方請(qǐng)大家指出。首先我通過方法來注入ConnectionFactory和Destination這兩個(gè)對(duì)象來支撐jms環(huán)境
java 代碼
- package com.javaeye.spring.services.jms.mdp;
-
- import java.util.Properties;
-
- import javax.jms.ConnectionFactory;
- import javax.jms.Destination;
- import javax.jms.Queue;
- import javax.naming.Context;
- import javax.naming.InitialContext;
- import javax.naming.NamingException;
-
- public class UserJmsTransactionUtil {
-
- private String connectionFactoryJndiLookUp;
-
- private String destinationJndiLookUp;
- private String localConnectionFactoryJndiLookUp;
-
- private String containerType;
-
-
- public String getConnectionFactoryJndiLookUp() {
- return connectionFactoryJndiLookUp;
- }
-
-
-
- public void setConnectionFactoryJndiLookUp(String connectionFactoryJndiLookUp) {
- this.connectionFactoryJndiLookUp = connectionFactoryJndiLookUp;
- }
-
-
-
- public String getDestinationJndiLookUp() {
- return destinationJndiLookUp;
- }
-
-
-
- public void setDestinationJndiLookUp(String destinationJndiLookUp) {
- this.destinationJndiLookUp = destinationJndiLookUp;
- }
-
-
-
- public ConnectionFactory getConnectionFactory() throws NamingException{
- Context ctx = new InitialContext();
- ConnectionFactory cf = (ConnectionFactory) ctx.lookup(connectionFactoryJndiLookUp);
- return cf;
- }
-
-
- public Destination getJmsDestination() throws NamingException{
- Context ctx = new InitialContext();
- Destination dest = (Queue) ctx.lookup(destinationJndiLookUp);
- return dest;
- }
-
-
- public ConnectionFactory getQueueConnectionFactory() throws NamingException{
- Properties pops = new Properties();
- pops.setProperty("jboss.bind.address", "0.0.0.0");
- pops.setProperty("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
- pops.setProperty("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
- pops.setProperty("java.naming.provider.url", "localhost");
- Context ctx = new InitialContext(pops);
- ConnectionFactory cf = (ConnectionFactory) ctx.lookup(localConnectionFactoryJndiLookUp);
- return cf;
- }
-
-
- public Destination getLocalJmsDestination() throws NamingException{
- Properties pops = new Properties();
- pops.setProperty("jboss.bind.address", "0.0.0.0");
- pops.setProperty("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
- pops.setProperty("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
- pops.setProperty("java.naming.provider.url", "localhost");
- Context ctx = new InitialContext(pops);
- Destination dest = (Destination) ctx.lookup(destinationJndiLookUp);
- return dest;
- }
-
-
-
- public String getLocalConnectionFactoryJndiLookUp() {
- return localConnectionFactoryJndiLookUp;
- }
-
-
-
- public void setLocalConnectionFactoryJndiLookUp(
- String localConnectionFactoryJndiLookUp) {
- this.localConnectionFactoryJndiLookUp = localConnectionFactoryJndiLookUp;
- }
- }
發(fā)送端的配置如下
xml 代碼
- <beans>
- <bean id="userJmsUtil" class="com.javaeye.spring.services.jms.mdp.UserJmsTransactionUtil">
- <property name="connectionFactoryJndiLookUp" value="java:JmsXA"><!--</span-->property>
- <property name="destinationJndiLookUp" value="queue/A"><!--</span-->property>
- <property name="localConnectionFactoryJndiLookUp" value="ConnectionFactory"><!--</span-->property>
- <!--</span-->bean>
-
- <bean ="connectionFactory" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
- <property name="targetObject" ref="userJmsUtil"><!--</span-->property>
- <property name="targetMethod" value="getConnectionFactory"><!--</span-->property>
- <!--</span-->bean>
-
- <bean id="queue" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
- <property name="targetObject" ref="userJmsUtil"><!--</span-->property>
- <property name="targetMethod" value="getJmsDestination"><!--</span-->property>
- <!--</span-->bean>
-
- <bean id="jmsQueue" class="org.springframework.jms.core.JmsTemplate">
- <property name="connectionFactory" ref="connectionFactory"><!--</span-->property>
- <property name="defaultDestination" ref="queue"><!--</span-->property>
- <property name="messageConverter">
- <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"><!--</span-->bean>
- <!--</span-->property>
- <!--</span-->bean>
- <!--</span-->beans>
ps:javaeye的模版工具bug還真多,不管了.
如果使用Listenner的化,一樣需要遵守發(fā)送者和接收者物理隔離的原則,我的做法是把發(fā)送者配到一個(gè)xml中,在把接受者配到另外一個(gè)xml中去,發(fā)送的配置綁定到容器里,接收者的跑在本地.否則spring初始化是過不去的.
下面這個(gè)程序是發(fā)送消息的程序.使用了spring的模版,發(fā)條消息比new個(gè)對(duì)象還簡單.同時(shí)spring還提供了適配器的接口,一樣通過聲明式的配置,這樣可以在同一個(gè)接口里發(fā)送各種類型的消息了.同時(shí)支持事務(wù),我還不知道這個(gè)有什么用呵呵,第1次使用嘛!但是就使用上來說,spring是最簡單的.2者都只需要注入一個(gè)對(duì)象而已.
java 代碼
- @Test public void send(){
- ApplicationContext ac = new FileSystemXmlApplicationContext("jms.xml");
- BeanFactory bf = ac;
- JmsTemplate jt = (JmsTemplate) bf.getBean("jmsQueue");
- jt.convertAndSend("2132134");
- }
接收端的配置如下
xml 代碼
- xml version="1.0" encoding="UTF-8"?>
- >
- <beans>
-
- <bean id="listenner" class="com.javaeye.spring.services.jms.mdp.JmsListenner"><!--</span-->bean>
-
- <bean id="userJmsUtil" class="com.javaeye.spring.services.jms.mdp.UserJmsTransactionUtil">
- <property name="connectionFactoryJndiLookUp" value="java:JmsXA"><!--</span-->property>
- <property name="destinationJndiLookUp" value="queue/A"><!--</span-->property>
- <property name="localConnectionFactoryJndiLookUp" value="ConnectionFactory"><!--</span-->property>
- <!--</span-->bean>
-
- <bean id="localConnectionFactory" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
- <property name="targetObject" ref="userJmsUtil"><!--</span-->property>
- <property name="targetMethod" value="getQueueConnectionFactory"><!--</span-->property>
- <!--</span-->bean>
-
- <bean id="localDestination" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
- <property name="targetObject" ref="userJmsUtil"><!--</span-->property>
- <property name="targetMethod" value="getLocalJmsDestination"><!--</span-->property>
- <!--</span-->bean>
-
- <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
- <property name="concurrentConsumers" value="5"><!--</span-->property>
- <property name="connectionFactory" ref="localConnectionFactory"><!--</span-->property>
- <property name="destination" ref="localDestination"><!--</span-->property>
- <property name="messageListener" ref="listenner"><!--</span-->property>
- <!--</span-->bean>
- <!--</span-->beans>
接收端由于需要從jbossmq里取ConnectionFactory和Destination,所以,我調(diào)用的是userJmsUtil的localLookup.這個(gè)函數(shù)的作用等同于發(fā)送者的那個(gè)函數(shù),只不過前者是容器外獲取,而后者是容器內(nèi)的而已.
java 代碼
- package com.javaeye.spring.services.jms.mdp;
-
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.MessageListener;
- import javax.jms.TextMessage;
-
- public class JmsListenner implements MessageListener {
-
- public void onMessage(Message message) {
- try {
- TextMessage msg = (TextMessage) message;
- System.out.println(msg.getText());
- } catch (JMSException e) { e.printStackTrace(); }
- }
-
- }
spring對(duì)jms的整合里提到了一個(gè)jms provider ActiveMQ,要用一個(gè)開源框架要做的第一件事就是先跑一個(gè)demo起來,同樣,我們要做的事還是獲取ConnectionFactory和Destination對(duì)象,還好,ActiveMQ的JNDI實(shí)現(xiàn)比jbossMQ還要簡單,直接通過一個(gè)本地的Context就可以查到了,具體的可以參照ActiveMQ官方的支持文檔.
|
|
|