日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

手把手教你如何玩轉(zhuǎn)消息中間件(ActiveMQ)

 一本正經(jīng)地胡鬧 2019-07-02

#情景引入
小白:起床起床起床起床。。。??炱鸫瞺
我:怎么了又,大驚小怪,嚇到我了。
小白:我有事有事想找你,十萬火急呢~~
我:你能有什么事?反正我不信。。那你說說看~~
小白:就是我有兩個小表弟,叫大白和二白,他們現(xiàn)在每天睡覺之前都要分別和我聊天,讓我給他們講故事,如果不講他們就不睡覺。但是,如果一個個的跟他們輪流來說的話,我就需要每天說兩遍,而且我還要找準(zhǔn)他們的時間點(diǎn),這個有時候我有事情都無法實(shí)現(xiàn)這個問題,他們就會很生氣。。。
我:這不是挺好的嘛,小孩子就是愛聽故事的呀。。。
小白:我也愿意講,但是時間這個不是很好控制,有沒有類似,比如我可以之前就描述好了,然后定點(diǎn)給他們兩個一起發(fā)消息,而可以拋開時間和其他因素的影響呢?
我:這個嘛,很簡單呀,你可以讓他們關(guān)注你的一個公眾號,這樣你再定時的推送給他們故事不就可以了嘛。?;蛘?,你可以拉他們進(jìn)你的一個群這樣,就方便了呀~
小白:這樣是可以,但是如果以后還有小表妹要聽我講,我就要如此反復(fù)的做。。感謝好麻煩好麻煩。。。
我:emmm,我理解你的意思,你就想實(shí)現(xiàn)一種很多人都能夠進(jìn)行類似一種消息推送的方式嘛。。。
小白:對的對的。。就是這樣一種,,,我記得我們在技術(shù)方面好像也有一種類似的技術(shù),這個叫做什么去了呢?
我:這就是消息中間件,一種生產(chǎn)者和消費(fèi)者的關(guān)系。
小白:我也想學(xué)我也想學(xué),,你快給我講講,給我講講唄。。
我:真拿你沒辦法,好吧。。。下面我就給你講一下這方面的知識。
#情景分析
其實(shí),小白的這個問題,是一種比較普遍的問題。既然我們作為技術(shù)人員,當(dāng)然我們就要從技術(shù)成分去分析如何解決了。這里面其實(shí)就是包含著一種消息中間件的技術(shù)。它也是最近技術(shù)層面用得非常非常多的,這也是非常值得我們進(jìn)行學(xué)習(xí)。。這在如今的秒殺系統(tǒng),推薦系統(tǒng)等等,都有廣泛的應(yīng)用。。所以,這章我就主要來跟大家說說這方面的知識。
#基本概念的引導(dǎo)
本模塊主要講解關(guān)于消息中間件的相關(guān)基礎(chǔ)知識,也是方便我們后面的學(xué)習(xí)。
###什么是中間件?
非操作系統(tǒng)軟件,非業(yè)務(wù)應(yīng)用軟件,不是直接給最終用戶使用,不能直接給用戶帶來價值的軟件,我們就可以稱為中間件(比如Dubbo,Tomcat,Jetty,Jboss都是屬于的)。
###什么是消息中間件?
百度百科解釋:消息中間件利用高效可靠的消息傳遞機(jī)制進(jìn)行平臺無關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來進(jìn)行分布式系統(tǒng)的集成。通過提供消息傳遞和消息排隊(duì)模型,它可以在分布式環(huán)境下擴(kuò)展進(jìn)程間的通信。
關(guān)鍵點(diǎn):關(guān)注于數(shù)據(jù)的發(fā)送和接受,利用高效可靠的異步消息機(jī)制傳遞機(jī)制集成分布式系統(tǒng)。
先簡單的用下面這個圖說明:
這里寫圖片描述
###為什么要使用消息中間件
舉幾個例子,我想你就會明白了。(其實(shí)使用消息中間件主要就是為了解耦合和異步兩個作用)
1:微博,都用過吧。那么,當(dāng)我們新關(guān)注一個用戶,那么系統(tǒng)會相應(yīng)的推送消息給我們,并且還做了很多關(guān)于我們關(guān)注的處理。這就是消息中間件的異步。
2:秒殺系統(tǒng)。100件商品,幾十萬個人在搶,那這個怎么弄呢?總不能就把服務(wù)器給宕機(jī)了吧。那么就可以把用戶的請求進(jìn)行緩存,然后再異步處理。
3:系統(tǒng)A給系統(tǒng)B進(jìn)行通信,而系統(tǒng)B需要對A的消息進(jìn)行相應(yīng)處理之后才能給A反饋,這時候,總不能讓A就傻傻等著吧。那么,這就是異步的功能。
###什么是JMS?
Java消息服務(wù)(Java Message Service)應(yīng)用程序接口是一個Java平臺中關(guān)于面向消息中間件(MOM)的API,用于在兩個應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進(jìn)行異步通信。Java消息服務(wù)是一個與具體平臺無關(guān)的API,絕大多數(shù)MOM提供商都對JMS提供支持。
總結(jié)起來說就是:Java對于應(yīng)用程序之間進(jìn)行信息交互的API(而且是異步)。
里面有下面的概念需要理解,對后續(xù)有幫助:

  • 提供者:實(shí)現(xiàn)JMS的消息服務(wù)中間件服務(wù)器。

  • 客戶端:發(fā)送或接受消息的應(yīng)用。

  • 生產(chǎn)者/發(fā)布者:創(chuàng)建并發(fā)送消息的客戶端。

  • 消費(fèi)者/訂閱者:接受并處理消息的客戶端。

  • 消息:應(yīng)用程序之間傳遞的數(shù)據(jù)。

  • 消息模式:在客戶端之間傳遞消息的模式,JMS主要是隊(duì)列模式和主體模式。

  • 隊(duì)列模式特點(diǎn):
    (1)客戶端包括生產(chǎn)者和消費(fèi)者。
    (2)隊(duì)列中的一個消息只能被一個消費(fèi)者使用。
    (3)消費(fèi)者可以隨時取消息。

  • 主體模式特點(diǎn):
    (1)客戶端包括發(fā)布者和訂閱者。
    (2)主題中的消息可以被所有訂閱者消費(fèi)。
    (3)消費(fèi)者不能消費(fèi)訂閱之前發(fā)送的消息。
    ###什么是AMQP?
    AMQP,即Advanced Message Queuing Protocol,一個提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計?;诖藚f(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產(chǎn)品,不同的開發(fā)語言等條件的限制。
    簡單點(diǎn)說:就是對于消息中間件所接受的消息傳輸層的協(xié)議(不懂傳輸層,那么就需要多看看計算機(jī)網(wǎng)絡(luò)相關(guān)知識了,OSI的層次劃分),只有這樣才能保證客戶端和消息中間件能夠進(jìn)行交互(換位思考:HTTP和HTTPS甚至說是TCP/IP與UDP協(xié)議都要的道理)。
    emmm,比較一下JMS和AMQP的不同吧。。

  • JMS是定義與Java,而AMQP是一種傳輸層協(xié)議。

  • JMS是屬于Java的API,而AMQP是跨語言的。

  • JMS消息類型只有兩種(主題和隊(duì)列,后續(xù)會說),而AMQP是有五種。

  • JMS主要就是針對Java的開發(fā)的Client,而AMQP是面向消息,隊(duì)列,路由。
    ###什么是ActiveMQ呢?
    ActiveMQ 是Apache出品,最流行的,能力強(qiáng)勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn),盡管JMS規(guī)范出臺已經(jīng)是很久的事情了,但是JMS在當(dāng)今的J2EE應(yīng)用中間仍然扮演著特殊的地位。
    簡單點(diǎn)說:不就是為了實(shí)現(xiàn)我上述所想要的需求嘛。然后它就是一種實(shí)現(xiàn)的方式。就比如,Tomcat是什么?不就是為了實(shí)現(xiàn)一種client與服務(wù)器之間的交互的一種產(chǎn)品嘛。。所以,不需要死記概念,自己理解就好。
    #ActiveMQ的安裝
    ##環(huán)境:Windows
    步驟:
    (1)登錄到ActiveMQ的官網(wǎng),下載安裝包。http://activemq./activemq-5154-release.html
    (2)下載Zip文件
    這里寫圖片描述
    (3)解壓Zip文件,目錄如下
    這里寫圖片描述
    (4)啟動ActiveMQ服務(wù)(注意:要右鍵以管理員身份進(jìn)行運(yùn)行)
    這里寫圖片描述
    注意:有兩種方式,第一種就是類似tomcat啟動,那么啟動圖會一直顯示。
    而第二種的話,就是把這個ActiveMQ注冊到服務(wù)列表中,這樣更方便我們進(jìn)行操作。(推薦使用這種)
    (5)登錄,驗(yàn)證是否啟動成功
    這里寫圖片描述
    (6)進(jìn)入管理頁面
    這里寫圖片描述
    OK,進(jìn)入之后就可以看我們的管理頁面啦。。。是不是很簡單呢?
    ##環(huán)境:Linux
    步驟:(多余的我就不多說了。。。請看windows的步驟)
    (1)同樣需要下載對應(yīng)的文件。后綴為tar.gz的這樣的。其實(shí)可以直接通過下面的這個命令下載,快速一點(diǎn),免得要移動到Linux(注意:如果是通過ssh連接的方式的話)。

wget https://mirrors.tuna./apache//activemq/5.15.4/apache-activemq-5.15.4-bin.tar.gz

(2)然后解壓下載的文件
(3)同樣進(jìn)入相對應(yīng)的目錄,運(yùn)行

./activemq start

(4)然后再訪問相同的地址就可以看到啦。(具體看windows安裝步驟)
#ActiveMQ的使用(基于Maven)
首先要再回頭看看JMS中的一些關(guān)鍵接口。

  • ConnectionFactory:用于創(chuàng)建連接到消息中間件的連接工廠。
  • Connection:代表了應(yīng)用程序和服務(wù)之間的連接通路。
  • Destination:指消息發(fā)布的地點(diǎn),包括隊(duì)列模式和主體模式。
  • Session:表示一個單線程的上下文,用于發(fā)送和接受消息。
  • MessageConsumer:由會話創(chuàng)建,用于接受發(fā)送到目的的消息。
  • MessageProducer:由會話創(chuàng)建,用于發(fā)送消息。
  • Message:是在消費(fèi)者和生產(chǎn)者之間傳遞的對象,消息頭,一組消息屬性,和一個消息體。
    這里寫圖片描述
    環(huán)境:IDEA
    步驟:
  1. 使用IDEA創(chuàng)建一個Maven項(xiàng)目,最簡單的骨架即可(quick)
  2. 導(dǎo)入ActiveMq的依賴
<!--添加activemq的依賴-->
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.9.0</version>
    </dependency>

###情形一:隊(duì)列模型的消息
3. 編寫生產(chǎn)者代碼(使用隊(duì)列模型的消息)

package com.hnu.scw.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * @ Author     :scw
 * @ Date       :Created in 上午 11:06 2018/7/14 0014
 * @ Description:用于消息的創(chuàng)建類
 * @ Modified By:
 * @Version: $version$
 */
public class MessageProducer {
    //定義ActivMQ的連接地址
    private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    //定義發(fā)送消息的隊(duì)列名稱
    private static final String QUEUE_NAME = "MyMessage";

    public static void main(String[] args) throws JMSException {
        //創(chuàng)建連接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
       //創(chuàng)建連接
        Connection connection = activeMQConnectionFactory.createConnection();
        //打開連接
        connection.start();
        //創(chuàng)建會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //創(chuàng)建隊(duì)列目標(biāo)
        Destination destination = session.createQueue(QUEUE_NAME);
        //創(chuàng)建一個生產(chǎn)者
        javax.jms.MessageProducer producer = session.createProducer(destination);
        //創(chuàng)建模擬100個消息
        for (int i = 1 ; i <= 100 ; i++){
            TextMessage message = session.createTextMessage("我發(fā)送message:" + i);
            //發(fā)送消息
            producer.send(message);
            //在本地打印消息
            System.out.println("我現(xiàn)在發(fā)的消息是:" + message.getText());
        }
        //關(guān)閉連接
        connection.close();
    }
}
  1. 查看是否消息產(chǎn)生成功
    這里寫圖片描述
  2. 編寫消費(fèi)者代碼(消費(fèi)隊(duì)列模型的消息)
package com.hnu.scw.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * @ Author     :scw
 * @ Date       :Created in 上午 11:30 2018/7/14 0014
 * @ Description:消息消費(fèi)者
 * @ Modified By:
 * @Version: $version$
 */
public class MessageConsumer {
    //定義ActivMQ的連接地址
    private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    //定義發(fā)送消息的隊(duì)列名稱
    private static final String QUEUE_NAME = "MyMessage";
    public static void main(String[] args) throws JMSException {
        //創(chuàng)建連接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //創(chuàng)建連接
        Connection connection = activeMQConnectionFactory.createConnection();
        //打開連接
        connection.start();
        //創(chuàng)建會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //創(chuàng)建隊(duì)列目標(biāo)
        Destination destination = session.createQueue(QUEUE_NAME);
        //創(chuàng)建消費(fèi)者
        javax.jms.MessageConsumer consumer = session.createConsumer(destination);
        //創(chuàng)建消費(fèi)的監(jiān)聽
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("獲取消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}
  1. 查看是否進(jìn)行了消費(fèi)
    這里寫圖片描述
    **備注:**我上面進(jìn)行的是隊(duì)列模式的消息,而且進(jìn)行的都是單個消費(fèi)者,那如果我換成同時有兩個消費(fèi)者消費(fèi)生產(chǎn)者的消息會怎么樣呢?(我們只需要運(yùn)行兩個消費(fèi)者就可以啦。當(dāng)然,要保證生產(chǎn)者是產(chǎn)生了消息的哦~~~~否則,拿什么消費(fèi)呢~)
    一個生產(chǎn)者,兩個消費(fèi)者的情況如下:
    切記:先運(yùn)行兩個消費(fèi)者,然后再運(yùn)行生產(chǎn)者代碼:
    結(jié)果如下:
    這里寫圖片描述
    這里寫圖片描述

    其實(shí),這就是解釋了,我之前說的,隊(duì)列模式的消息,是只會被一個消費(fèi)者所使用的,而不會被共享,這也就是和主題模型的差別哦~~~哈哈
    ###情形二:主題模型的消息
    前面的步驟都一樣,只是生產(chǎn)者和消費(fèi)者的代碼有點(diǎn)區(qū)別:

  2. 編寫生產(chǎn)者(這個和隊(duì)列模型其實(shí)很像,稍微修改就可以)

package com.hnu.scw.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ Author     :scw
 * @ Date       :Created in 上午 11:48 2018/7/14 0014
 * @ Description:${description}
 * @ Modified By:
 * @Version: $version$
 */
public class MessageTopicProducer {

    //定義ActivMQ的連接地址
    private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    //定義發(fā)送消息的主題名稱
    private static final String TOPIC_NAME = "MyTopicMessage";

    public static void main(String[] args) throws JMSException {
        //創(chuàng)建連接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //創(chuàng)建連接
        Connection connection = activeMQConnectionFactory.createConnection();
        //打開連接
        connection.start();
        //創(chuàng)建會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //創(chuàng)建隊(duì)列目標(biāo)
        Destination destination = session.createTopic(TOPIC_NAME);
        //創(chuàng)建一個生產(chǎn)者
        javax.jms.MessageProducer producer = session.createProducer(destination);
        //創(chuàng)建模擬100個消息
        for (int i = 1; i <= 100; i++) {
            TextMessage message = session.createTextMessage("當(dāng)前message是(主題模型):" + i);
            //發(fā)送消息
            producer.send(message);
            //在本地打印消息
            System.out.println("我現(xiàn)在發(fā)的消息是:" + message.getText());
        }
        //關(guān)閉連接
        connection.close();
    }
}
  1. 查看生產(chǎn)者的消息
    這里寫圖片描述
  2. 編寫消費(fèi)者
package com.hnu.scw.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ Author     :scw
 * @ Date       :Created in 上午 11:50 2018/7/14 0014
 * @ Description:${description}
 * @ Modified By:
 * @Version: $version$
 */
public class MessageTopicConsumer {
    //定義ActivMQ的連接地址
    private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    //定義發(fā)送消息的隊(duì)列名稱
    private static final String TOPIC_NAME = "MyTopicMessage";
    public static void main(String[] args) throws JMSException {
        //創(chuàng)建連接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //創(chuàng)建連接
        Connection connection = activeMQConnectionFactory.createConnection();
        //打開連接
        connection.start();
        //創(chuàng)建會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //創(chuàng)建隊(duì)列目標(biāo)
        Destination destination = session.createTopic(TOPIC_NAME);
        //創(chuàng)建消費(fèi)者
        javax.jms.MessageConsumer consumer = session.createConsumer(destination);
        //創(chuàng)建消費(fèi)的監(jiān)聽
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("獲取消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

  1. 查看是否消費(fèi)成功
    然而,我們運(yùn)行消費(fèi)者代碼,發(fā)現(xiàn)怎么沒有消息消費(fèi)呢?????????
    其實(shí),這就是主題模型的一個特點(diǎn),如果消費(fèi)者是在生產(chǎn)者產(chǎn)生消息之后來的,那么是不會對之前的消息進(jìn)行消費(fèi)的哦。?!,F(xiàn)在知道它們的區(qū)別在哪了吧。
    如果,現(xiàn)在是兩個消費(fèi)者和一個生產(chǎn)者的主題模型又是怎么的結(jié)果呢?
    這里寫圖片描述
    這里寫圖片描述
    哎喲。。。。這種情況消費(fèi)者都各自消費(fèi)了所有的生產(chǎn)者的消息耶。。。。。這就是共享性消息的主題模式,這就是和隊(duì)列模型的區(qū)別,,,大家好好的對比哦~~
    #ActiveMQ使用(基于Spring)
    步驟:
  2. 創(chuàng)建一個Maven項(xiàng)目(基于最簡單的quick骨架即可)
  3. 導(dǎo)入Spring和ActiveMQ的相關(guān)依賴
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven./POM/4.0.0" xmlns:xsi="http://www./2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven./POM/4.0.0 http://maven./xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.hnu.scw</groupId>
  <artifactId>activemq</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>activemq</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <spring.version>4.2.5.RELEASE</spring.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <!--添加activemq的依賴-->
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.9.0</version>
    </dependency>

    <!--spring整合activemq所需要的依賴-->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jms</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-test</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-core</artifactId>
      <version>5.7.0</version>
      <exclusions>
        <exclusion>
          <artifactId>spring-context</artifactId>
          <groupId>org.springframework</groupId>
        </exclusion>
      </exclusions>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
        <!-- see http://maven./ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.7.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.20.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

  1. 編寫生產(chǎn)者的配置文件.xml,取名為producer.xml
<?xml version="1.0" encoding="UTF-8" ?>

<beans xmlns="http://www./schema/beans"
       xmlns:xsi="http://www./2001/XMLSchema-instance"
       xmlns:context="http://www./schema/context"
       xsi:schemaLocation="http://www./schema/beans
       http://www./schema/beans/spring-beans.xsd
       http://www./schema/context
       http://www./schema/context/spring-context.xsd ">

    <context:annotation-config />

    <!--Activemq的連接工廠-->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://127.0.0.1:61616" />
    </bean>
   <!--spring jms為我們提供的連接池 獲取一個連接工廠-->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>

    <!-- 消息目的地  點(diǎn)對點(diǎn)的模式-->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="SpringActiveMQMsg"/>
    </bean>
    <!-- jms模板  用于進(jìn)行消息發(fā)送-->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>
</beans>

  1. 編寫生產(chǎn)者的接口
package com.hnu.scw.spring;

/**
 * @ Author     :scw
 * @ Date       :Created in 下午 12:19 2018/7/14 0014
 * @ Description:生產(chǎn)者的接口
 * @ Modified By:
 * @Version: $version$
 */
public interface ProduceService {
    void sendMessage(String msg);
}

  1. 編寫生產(chǎn)者的實(shí)現(xiàn)
package com.hnu.scw.spring;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import javax.annotation.Resource;
import javax.jms.*;

/**
 * @ Author     :scw
 * @ Date       :Created in 下午 2:21 2018/7/15 0015
 * @ Description:生產(chǎn)者的實(shí)現(xiàn)類
 * @ Modified By:
 * @Version: $version$
 */

public class ProduceServiceImpl implements ProduceService {
    @Autowired
    private JmsTemplate jmsTemplate;
    @Resource(name = "queueDestination")
    private Destination destination;

    /**
     * 發(fā)送消息
     * @param msg
     */
    @Override
    public void sendMessage(final String msg) {
        jmsTemplate.send(destination , new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage textMessage = session.createTextMessage(msg);
                return textMessage;
            }
        });
        System.out.println("現(xiàn)在發(fā)送的消息為: " + msg);
    }
}

  1. 將生產(chǎn)者的類添加到上述的配置文件中
<!--注入我們的生產(chǎn)者-->
    <bean class="com.hnu.scw.spring.ProduceServiceImpl"/>
  1. 編寫生產(chǎn)者的測試類
package com.hnu.scw.spring;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * @ Author     :scw
 * @ Date       :Created in 下午 2:27 2018/7/15 0015
 * @ Description:生產(chǎn)者的測試
 * @ Modified By:
 * @Version: $version$
 */
public class ProducerTest {
    public static void main(String[] args){
        ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("producer.xml");
        ProduceService bean = classPathXmlApplicationContext.getBean(ProduceService.class);
        //進(jìn)行發(fā)送消息
        for (int i = 0; i < 100 ; i++) {
            bean.sendMessage("test" + i);
        }
        //當(dāng)消息發(fā)送完后,關(guān)閉容器
        classPathXmlApplicationContext.close();
    }
}

  1. 運(yùn)行測試類,查看生產(chǎn)者是否產(chǎn)生消息成功
    這里寫圖片描述
    通過上述的界面,就可以看到自己配置的隊(duì)列模式的消息產(chǎn)生成功。
  2. 編寫消費(fèi)者的消息監(jiān)聽類
  3. 編寫消費(fèi)者的配置文件,命名為consumer.xml
<?xml version="1.0" encoding="UTF-8" ?>

<beans xmlns="http://www./schema/beans"
       xmlns:xsi="http://www./2001/XMLSchema-instance"
       xmlns:context="http://www./schema/context"
       xsi:schemaLocation="http://www./schema/beans
       http://www./schema/beans/spring-beans.xsd
       http://www./schema/context
       http://www./schema/context/spring-context.xsd ">

    <context:annotation-config />

    <!--Activemq的連接工廠-->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://127.0.0.1:61616" />
    </bean>
    <!--spring jms為我們提供的連接池 獲取一個連接工廠-->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>

    <!-- 消息目的地  點(diǎn)對點(diǎn)的模式-->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="SpringActiveMQMsg"/>
    </bean>

    <!-- 配置消息監(jiān)聽器-->
    <bean id="consumerMessageListener" class="com.hnu.scw.spring.ComsumerMessageListener"/>
    <!--配置消息容器-->
    <bean id ="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <!--配置連接工廠-->
        <property name="connectionFactory" ref="connectionFactory"/>
        <!--配置監(jiān)聽的隊(duì)列-->
        <property name="destination" ref="queueDestination"/>
        <!--配置消息監(jiān)聽器-->
        <property name="messageListener" ref="consumerMessageListener"/>
    </bean>
</beans>
  1. 消息消費(fèi)者ComsumerMessageListener類代碼
package com.hnu.scw.spring;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * @ Author     :scw
 * @ Date       :Created in 下午 3:06 2018/7/15 0015
 * @ Description:消息的監(jiān)聽者,用于處理消息
 * @ Modified By:
 * @Version: $version$
 */
public class ComsumerMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("接受到消息:" + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

  1. 編寫測試文件,測試消費(fèi)者消費(fèi)消息是否成功
package com.hnu.scw.spring;

import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * @ Author     :scw
 * @ Date       :Created in 下午 3:13 2018/7/15 0015
 * @ Description:消費(fèi)者的測試
 * @ Modified By:
 * @Version: $version$
 */
public class ConsumerTest {
    public static void main(String[] args){
        //啟動消費(fèi)者
        ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("consumer.xml");
    }
}

  1. 查看ActiveMQ網(wǎng)站具體消息情況
    這里寫圖片描述
    這里寫圖片描述
  2. ActiveMQ的隊(duì)列模型就大功告成啦。。。。。。so easy?。?!
    備注:上面都是進(jìn)行的ActiveMQ的隊(duì)列模型的配置,那么我們?nèi)绻脒M(jìn)行主題模型的又是如何進(jìn)行操作呢?其實(shí)也很簡單,只需要修改生產(chǎn)者的xml文件里面的隊(duì)列即可。比如如下代碼:
<!-- 消息目的地  (主題模式)-->
    <!--<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <!&ndash;配置隊(duì)列模型的消息名稱&ndash;>
        <constructor-arg value="SpringActiveMQMsgTopic"/>
    </bean>-->

將上面的代碼替換之前的就可以了。。。
總結(jié):總的來說,基于Spring來使用消息隊(duì)列還是非常方便的,這比我們正常進(jìn)行JMS規(guī)范操作要簡單很多,畢竟很多對象都是通過Spring的IOC進(jìn)行容器管理了,所以,值得推薦使用哦~~~
#ActiveMQ的集群
###為什么要進(jìn)行集群呢?
原因一:實(shí)現(xiàn)高可用:以排除單點(diǎn)故障所引起的服務(wù)終端。
原因二:實(shí)現(xiàn)負(fù)載均衡:以提升效率為更多的客戶進(jìn)行服務(wù)。
###集群的方式有哪些?
方式一:客戶端集群:多個客戶端消費(fèi)同一個隊(duì)列。
方式二:Broker clusters:多個Broker之間同步消息。(實(shí)現(xiàn)負(fù)載均衡)
這里寫圖片描述
這個的實(shí)現(xiàn)原理主要是通過網(wǎng)絡(luò)連接器來進(jìn)行。
網(wǎng)絡(luò)連接器:用于配置ActiveMQ服務(wù)器與服務(wù)器之間的網(wǎng)絡(luò)通訊方式,用于服務(wù)器透析消息。主要分為靜態(tài)連接和動態(tài)連接。
方式三:Master Slave :實(shí)現(xiàn)高可用。
這種方式的話,可以聯(lián)想到Mysql的主從配置和Zookeeper的負(fù)載均衡的主競爭關(guān)系master。
我們在實(shí)際的開發(fā)中,一般都是將方式二和方式三進(jìn)行集成,從而實(shí)現(xiàn)高可用和負(fù)載均衡。下面的話,我也就這樣的配置思想來進(jìn)行講解:(通過三臺服務(wù)器來模擬消息集群的實(shí)現(xiàn))
這里寫圖片描述
其中的NodeB和NodeC就是一張Master/slave的關(guān)系。都可以成為主服務(wù)器。(只要它們某一個宕機(jī),那么就會其余的一臺就進(jìn)行繼續(xù)服務(wù))
###搭建步驟(基于Windows環(huán)境,而Linux環(huán)境也是一樣的操作)
三臺服務(wù)器的大體功能和描述:
這里寫圖片描述
由于自己沒有三臺服務(wù)器,所以就用自己的一臺電腦來模擬三臺消息服務(wù)器,其實(shí)這個就是假設(shè)有三個不同ActiveMQ消息服務(wù)器了。

  1. 復(fù)制三個ActiveMQ的服務(wù)配置到一個公共目錄
    這里寫圖片描述
  2. 修改activeMQA的配置文件
    這里寫圖片描述
    只需要在activemq.xml添加如下內(nèi)容:
<networkConnectors>
		<networkConnector name="local_network" uri ="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)" />
	</networkConnectors>
  1. 修改ActiveMQB的配置文件
    (1)首先在activemq,xml中添加如下內(nèi)容:
<!--修改服務(wù)端口-->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<networkConnectors>
	   <networkConnector name="networktoA" uri="static:(tcp://127.0.0.1:61616)" />
	</networkConnectors>
<!--并修改下面這個標(biāo)簽的內(nèi)容 , 作為B和C的共享文件,目錄就是自己之前創(chuàng)建的一個文件(可以回看上面的整個結(jié)構(gòu))-->
<persistenceAdapter>
            <kahaDB directory="D:\Download\MQJiQun\shareDB"/>
        </persistenceAdapter>

(2)修改jetty.xml內(nèi)容,修改服務(wù)器的服務(wù)端口

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
             <!-- the default port number for the web console -->
        <property name="host" value="0.0.0.0"/>
        <property name="port" value="8162"/>
    </bean>
  1. 修改ActiveMQC的配置文件(其實(shí)類似和B一樣,只是服務(wù)端口不一樣)
    (1)修改activemq.xml中的內(nèi)容
<!--修改服務(wù)端口-->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<networkConnectors>
	   <networkConnector name="networktoA" uri="static:(tcp://127.0.0.1:61616)" />
	</networkConnectors>
<!--并修改下面這個標(biāo)簽的內(nèi)容 , 作為B和C的共享文件,目錄就是自己之前創(chuàng)建的一個文件(可以回看上面的整個結(jié)構(gòu))-->
<persistenceAdapter>
            <kahaDB directory="D:\Download\MQJiQun\shareDB"/>
        </persistenceAdapter>

(2)修改jetty.xml中的內(nèi)容

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
             <!-- the default port number for the web console -->
        <property name="host" value="0.0.0.0"/>
        <property name="port" value="8163"/>
    </bean>
  1. 集群搭建完成~~~~

集群測試(基于IDEA編輯器+Maven)

步驟:
(1)創(chuàng)建Maven項(xiàng)目
(2)導(dǎo)入依賴

<!--添加activemq的依賴-->
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.9.0</version>
    </dependency>

(3)編寫生產(chǎn)者代碼

package com.hnu.scw.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * @ Author     :scw
 * @ Date       :Created in 上午 11:06 2018/7/14 0014
 * @ Description:用于消息的創(chuàng)建類
 * @ Modified By:
 * @Version: $version$
 */
public class MessageProducer {
    //通過集群的方式進(jìn)行消息服務(wù)器的管理(failover就是進(jìn)行動態(tài)轉(zhuǎn)移,當(dāng)某個服務(wù)器宕機(jī),
    // 那么就進(jìn)行其他的服務(wù)器選擇,randomize表示隨機(jī)選擇)
    private static final String ACTIVEMQ_URL = "failover:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";
    //定義發(fā)送消息的隊(duì)列名稱
    private static final String QUEUE_NAME = "MyMessage";

    public static void main(String[] args) throws JMSException {
        //創(chuàng)建連接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
       //創(chuàng)建連接
        Connection connection = activeMQConnectionFactory.createConnection();
        //打開連接
        connection.start();
        //創(chuàng)建會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //創(chuàng)建隊(duì)列目標(biāo)
        Destination destination = session.createQueue(QUEUE_NAME);
        //創(chuàng)建一個生產(chǎn)者
        javax.jms.MessageProducer producer = session.createProducer(destination);
        //創(chuàng)建模擬100個消息
        for (int i = 1 ; i <= 100 ; i++){
            TextMessage message = session.createTextMessage("當(dāng)前message是:" + i);
            //發(fā)送消息
            producer.send(message);
            //在本地打印消息
            System.out.println("我現(xiàn)在發(fā)的消息是:" + message.getText());
        }
        //關(guān)閉連接
        connection.close();
    }

}

(4)編寫消費(fèi)者代碼

package com.hnu.scw.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ Author     :scw
 * @ Date       :Created in 上午 11:30 2018/7/14 0014
 * @ Description:消息消費(fèi)者
 * @ Modified By:
 * @Version: $version$
 */
public class MessageConsumer {
    //通過集群的方式進(jìn)行消息服務(wù)器的管理(failover就是進(jìn)行動態(tài)轉(zhuǎn)移,當(dāng)某個服務(wù)器宕機(jī),
    // 那么就進(jìn)行其他的服務(wù)器選擇,randomize表示隨機(jī)選擇)
    private static final String ACTIVEMQ_URL = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?randomize=true";
    //定義發(fā)送消息的隊(duì)列名稱
    private static final String QUEUE_NAME = "MyMessage";

    public static void main(String[] args) throws JMSException {
        //創(chuàng)建連接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //創(chuàng)建連接
        Connection connection = activeMQConnectionFactory.createConnection();
        //打開連接
        connection.start();
        //創(chuàng)建會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //創(chuàng)建隊(duì)列目標(biāo)
        Destination destination = session.createQueue(QUEUE_NAME);
        //創(chuàng)建消費(fèi)者
        javax.jms.MessageConsumer consumer = session.createConsumer(destination);
        //創(chuàng)建消費(fèi)的監(jiān)聽
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("獲取消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }

}

(5)進(jìn)行查看各自的服務(wù)器的消息隊(duì)列的情況。

  1. 首先,是要確保三個ActiveMQ服務(wù)器都進(jìn)行打開。分析:當(dāng)三個都服務(wù)都運(yùn)行之后,我們從瀏覽器運(yùn)行各自的地址,會發(fā)現(xiàn):
    比如:我這里的三個服務(wù)的地址分別如下:
  • http://127.0.0.1:8161/
  • http://127.0.0.1:8162/
  • http://127.0.0.1:8163/
    ###重點(diǎn)
    為什么前面兩個都可以訪問,而第三個不可以呢?(同樣也是按照我的這樣的服務(wù)器打開方式哦。先打開的服務(wù)器A,接著B,最后C)但是,運(yùn)行的時候,提示都成功了呀。。為什么為什么???
    分析:其實(shí)很簡單,我說過B和C是一種master/slave的方式,當(dāng)B運(yùn)行之后就獲得了master的權(quán)限,那么C服務(wù)是可以看到是一種監(jiān)聽的狀態(tài),只有當(dāng)B宕機(jī)之后,才有可能獲取master的資源權(quán)限,所以,這時候C的地址當(dāng)然就無法訪問啦。這就是負(fù)載均衡的一種主/從服務(wù)的結(jié)構(gòu)。當(dāng)然,你可以試著先打開C,再打開B,這時候效果就反過來了。歡迎嘗試哦~~~
  1. 再運(yùn)行MessageProducer的類,用于產(chǎn)生消息。這時候,大家可以去查看每個服務(wù)器的地址,來觀察消息的產(chǎn)生情況。我的如下:
    這里寫圖片描述
    我的消息是產(chǎn)生在服務(wù)器B的里面啦。。。。。。
  2. 再運(yùn)行MessageConsumer的類,用于消費(fèi)消息。這時候,同樣可以去查看每個服務(wù)器的地址中的消息隊(duì)列的情況,來觀察消息的消費(fèi)情況。我的如下:
    這里寫圖片描述
  3. 如果,我們在生產(chǎn)者產(chǎn)生了消息之后,服務(wù)器B突然宕機(jī)了怎么辦怎么辦??
    分析:其實(shí),這時候服務(wù)器C就一樣有消息保存進(jìn)行同步了。。是不是這樣就是一種高可用的架構(gòu)了呢????大家,可以試試哦。。把B服務(wù)器關(guān)掉,再去訪問服務(wù)器C的地址,就發(fā)現(xiàn)如下的結(jié)果。
    這里寫圖片描述
    這時候服務(wù)器C就作為了master,所以,類似zookeeper就是這樣的一種方式的哦。~
    ###總結(jié)
    好了,對于集群方面的簡單使用就到這里了。其實(shí)已經(jīng)可以根據(jù)這個進(jìn)行擴(kuò)展了,所以,小伙伴要好好理解這里面的過程和作用,這樣才能夠?qū)W以致用。。。

#其他的消息中間件
其實(shí),類似ActiveMQ這樣的消息中間件,用得比較多的還有就是RabbitMQ和Kafka。它們?nèi)吒髯杂懈髯缘膬?yōu)勢。大家可以百度進(jìn)行了解,我就不進(jìn)行多說了。后面我會同樣把這兩種消息中間件的使用進(jìn)行詳細(xì)的講解,歡迎大家的關(guān)注哦~總的來說,只有適合的場景對應(yīng)的消息中間件才能發(fā)揮最大的作用,沒有一種是只有好處而沒有壞處的~
#總結(jié)

  • 主要是對消息中間件的基礎(chǔ)知識進(jìn)行講解。
  • 主要講解ActiveMQ的使用
  • 主要講解了關(guān)于ActiveMQ的集群的搭建
  • 稍微提到了類似ActiveMQ消息中間件的其他中間件
  • 我所講述的內(nèi)容,夠大家進(jìn)行入門了,如果要進(jìn)行深入的了解還是需要慢慢的去熟悉和學(xué)習(xí)的,而且消息中間件是非常重要的一個技術(shù),希望大家去好好的了解。
  • 最后,感謝各位的閱讀哦~~~~

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多