用ActiveMQ遇到的消息確認(rèn)問題問題:我的ActiveMQ接收消息用的是topic模式,持久化訂閱,問題是我用了JMS接收消息的代碼每次重新啟動總是會收到最后一次的消息,但這些消息是已經(jīng)接收過了的,而且啟動一次就收到一次,難道ActiveMQ不會清除緩存的嗎?
//創(chuàng)建JMS連接和會話 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); connection = factory.createConnection(); connection.setClientID(Constant.JMS_CLIENT_ID); session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 創(chuàng)建消息發(fā)送主題和發(fā)送者 Topic jmsSendTopic = session.createTopic(sendTopic); sendTopicProducer = session.createProducer(jmsSendTopic); sendTopicProducer.setDeliveryMode(DeliveryMode.PERSISTENT); sendTopicProducer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE); // 創(chuàng)建消息接收主題和接收者 Topic jmsReceiveTopic = session.createTopic(receiveTopic); receiveTopicConsumer = session.createDurableSubscriber(jmsReceiveTopic,Constant.JMS_SUBSCRIBE_NAME); receiveTopicConsumer.setMessageListener(this); connection.start(); 解答:問題原因在于這段代碼在接收到JMS消息時(shí)不會向ActiveMQ服務(wù)器確認(rèn)消息的接收,故而ActiveMQ服務(wù)器一直認(rèn)為該消息沒有成功發(fā)送給接收者,因而每次接收者重啟之后就會收到ActiveMQ服務(wù)器發(fā)送過來的消息。在這里要解釋一下session的創(chuàng)建。
session = connection.createSession(true,Session.Auto_ACKNOWLEDGE); 當(dāng)createSession第一個(gè)參數(shù)為true時(shí),表示創(chuàng)建的session被標(biāo)記為transactional的,確認(rèn)消息就通過確認(rèn)和校正來自動地處理,第二個(gè)參數(shù)應(yīng)該是沒用的。
session = connection.createSession(false,Session.Auto_ACKNOWLEDGE); 當(dāng)createSession的第一個(gè)參數(shù)為false時(shí),表示創(chuàng)建的session沒有標(biāo)記為transactional,此時(shí)有三種用于消息確認(rèn)的選項(xiàng): **AUTO_ACKNOWLEDGE session將自動地確認(rèn)收到的一則消息; **CLIENT_ACKNOWLEDGE 客戶端程序?qū)⒋_認(rèn)收到的一則消息,調(diào)用這則消息的確認(rèn)方法; **DUPS_OK_ACKNOWLEDGE 這個(gè)選項(xiàng)命令session“懶散的”確認(rèn)消息傳遞,可以想到,這將導(dǎo)致消息提供者傳遞的一些復(fù)制消息可能出錯(cuò)。 JMS有兩種消息傳遞方式。標(biāo)記為NON_PERSISTENT的消息最多傳遞一次,而標(biāo)記為PERSISTENT的消息將使用暫存后再轉(zhuǎn)發(fā)的機(jī)理投遞。如果一個(gè)JMS服務(wù)離線,那么持久性消息不會丟失,但是得等到這個(gè)服務(wù)恢復(fù)聯(lián)機(jī)的時(shí)候才會被傳遞。所以默認(rèn)的消息傳遞方式是非持久性的,雖然使用非持久性消息可能降低內(nèi)存和需要的存儲器,但這種傳遞方式只有當(dāng)你不需要接收所有消息時(shí)才使用。 因此正確的代碼只需改動一處就行了,即將true改為false
//創(chuàng)建JMS連接和會話 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); connection = factory.createConnection(); connection.setClientID(Constant.JMS_CLIENT_ID); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 創(chuàng)建消息發(fā)送主題和發(fā)送者 Topic jmsSendTopic = session.createTopic(sendTopic); sendTopicProducer = session.createProducer(jmsSendTopic); sendTopicProducer.setDeliveryMode(DeliveryMode.PERSISTENT); sendTopicProducer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE); // 創(chuàng)建消息接收主題和接收者 Topic jmsReceiveTopic = session.createTopic(receiveTopic); receiveTopicConsumer = session.createDurableSubscriber(jmsReceiveTopic,Constant.JMS_SUBSCRIBE_NAME); receiveTopicConsumer.setMessageListener(this); connection.start(); |
|