日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

ActiveMQ入門篇

 貪挽懶月 2022-06-20 發(fā)布于廣東
什么是MQ?

MQ,中文名字叫做消息中間件。既然是中間件,那么就說明它左邊有東西,右邊也有東西。那么左邊是什么?右邊又是什么呢?MQ在中間能干嘛呢?看看下面的例子。

1、生活中的case:
老師講完了練習,然后對同學們說有問題的現(xiàn)在就過來問。然后張三李四王五趙六都有問題要問。那么他們就按順序排隊。張三需要5分鐘,然后是李四8分鐘,再然后才是王五10分鐘,最后是趙六。這就相當于dubbo的RPC遠程調(diào)用。也就是說,張三問的時候老師這個系統(tǒng)只能響應(yīng)張三,后面的人都得等著。這樣就會導致學生和老師耦合度高,而且效率低,如果問問題的學生多,越后面的人等待的時間也越長,老師還會累死。怎么優(yōu)化呢?

2、優(yōu)化方案:
老師會叫同學們把需要問的問題按照約定的格式在紙上寫好,然后交給班長。等老師解答完當前學生的問題,就從班長那里拿出一份問題。這樣一來,同學們也不用干等著,交了問題后該干嘛就干嘛去,老師也可以選擇適當?shù)臅r間再解答,不會被累死。

這個案例中的班長就是一個中間件,它不處理真正的邏輯,只是一個中間人。學生不直接問老師,而是通過班長,使得學生和老師解耦了;其次,學生上午交的問題,可能下午才得到老師的解答,整個過程是異步的;即便有一大群學生來問問題,這些請求也會堆積在班長那里,可以幫老師抵流量沖擊,而不會影響到老師。綜上:
MQ的作用:

  • 異步;

  • 解耦;

  • 削峰

AcitiveMQ的安裝
  • 首先從官網(wǎng)下載activeMQ (linux版本);

  • 然后解壓就行了(activeMQ是java編寫的,所以需要安裝JDK)。

進入到bin目錄,然后執(zhí)行如下命令:

  • 啟動:./activemq start

  • 指定xml配置文件啟動:./activemq start xbean:file:/文件路徑

  • 關(guān)閉:./activemq stop

  • 重啟:./activemq restart

activeMQ的后臺啟動端口是 61616,要想查看是否啟動成功,有如下幾種方式:

  • ps -ef | grep activemq| grep -v grep

  • netstat -anp | grep 61616

  • lsof -i:61616

activemq還有一個圖形界面,端口是 8161。首先保證你的 Linux 虛擬機和 windows 的 ip 處于同一個網(wǎng)段,然后確保沒有被防火墻給屏蔽,在Linux 和 windows 上互 ping 一下。能 ping 通后,就在 瀏覽器訪問 192.168.x.xx:8161, 默認的用戶名和密碼都是 admin。訪問后可以看到如下界面:

activemq的圖形界面
ActiveMQ怎么玩?

上面舉了生活中的例子來說明MQ的作用,說白了就是我們先把問題發(fā)到MQ中,然后從MQ中取出消息。那么具體是發(fā)送到MQ中的什么位置呢?這個位置我們管它叫destination,即目的地。
目的地有以下兩種:

  • 隊列queue(點對點);

  • 主題topic(發(fā)布與訂閱);



1、點對點傳輸:
所謂點對點傳輸,可以理解為發(fā)私信。你發(fā)了一條消息給你女朋友,只有你女朋友能收到。那接下來就看看怎么發(fā)消息和收消息。首先添加依賴:

 <!-- activemq-all -->
 <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.15.8</version>
 </dependency>
 <dependency>
      <groupId>org.apache.xbean</groupId>
      <artifactId>xbean-spring</artifactId>
      <version>4.12</version>
 </dependency>
  • 生產(chǎn)消息:

public class Productor {
    private static final String URL = "tcp://192.168.0.103:61616";
    private static final String QUEUE_NAME = "queue_test";

    public static void main(String[] args) throws Exception {
        // 1. 創(chuàng)建factory工廠
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
        // 2. 創(chuàng)建connection連接
        Connection connection = factory.createConnection();
        connection.start();
        // 3. 創(chuàng)建session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4. 創(chuàng)建目的地queue
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageProducer producer = session.createProducer(queue);
        // 5. 生產(chǎn)消息
        for (int i = 1; i <= 3; i++) {
            TextMessage message = session.createTextMessage("queue" + 1);
            // 6. 將消息發(fā)送到MQ
            producer.send(message);
        }
        // 7. 關(guān)閉資源(順著申請,倒著關(guān)閉)
        producer.close();
        session.close();
        connection.close();
        System.out.println("發(fā)送到MQ完成!");
    }
}

運行后,就可以在8161端口看到如下信息了:

生產(chǎn)消息
  • 消費消息:

public class Consumer {
    private static final String URL = "tcp://192.168.0.103:61616";
    private static final String QUEUE_NAME = "queue_test";

    public static void main(String[] args) throws Exception {
        // 1. 創(chuàng)建factory工廠
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
        // 2. 創(chuàng)建connection連接
        Connection connection = factory.createConnection();
        connection.start();
        // 3. 創(chuàng)建session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4. 創(chuàng)建目的地queue
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageConsumer consumer = session.createConsumer(queue);
        // 5. 消費消息
        while (true){
            // receive里面的參數(shù)表示超時時間
            TextMessage message = (TextMessage) consumer.receive(3000);
            if (message != null)
                System.out.println(message.getText());
            else
                break;
        }
        // 6. 關(guān)閉資源(順著申請,倒著關(guān)閉)
        consumer.close();
        session.close();
        connection.close();
        System.out.println("3秒還沒消息來,我溜了!");
    }
}

運行后,在8161端口就可以看到如下變化:

消費消息

可以看到消息隊列為3,出列的也是3,說明消費完了。

  • 異步監(jiān)聽的方式消費消息:
    異步相對的就是同步,上面那種方式就是同步的。就是調(diào)用receive方法來接收消息,在沒接收到消息或超時之前,程序?qū)⒁恢弊枞?。在上面那段代碼中,receive方法設(shè)置了3秒的超時時間,假如MQ中此刻沒有消息供消費,那么程序?qū)⒁?秒后才能輸出 “3秒還沒消息,我溜了!” 這句話。異步就是不會阻塞,即使沒收到消息,程序還是該干嘛就干嘛。異步監(jiān)聽方式寫法如下:

TextMessage message = (TextMessage) consumer.receive();
consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message
{
                if (message != null && message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("收到消息: " +  textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        System.in.read();
        // 6. 關(guān)閉資源(順著申請,倒著關(guān)閉)
        consumer.close();
        session.close();
        connection.close();
  • 啟動順序問題:
    -- 先啟動生產(chǎn)者,再依次啟動兩個消費者:
    ------- 先啟動的消費者可以拿到消息,后啟動的就不能消費了。結(jié)論:消息不能被重復消費。
    -- 先啟動兩個消費者,再啟動生產(chǎn)者生產(chǎn)消息:
    ------- 結(jié)果就是兩個消費者一人消費一半。

  • 小總結(jié):

從上面生產(chǎn)消息和消費消息的demo中可以發(fā)現(xiàn),其步驟其實和JDBC操作數(shù)據(jù)庫差不多,都是先創(chuàng)建factory,然后通過factory創(chuàng)建connection連接,再創(chuàng)建session,最后執(zhí)行操作的是session。點對點傳輸還有如下特點:

  • 每條消息只能有一個消費者,也就是上面說的消息不能被重復消費;

  • 消息生產(chǎn)者和消費者沒有時間上的關(guān)聯(lián),生產(chǎn)消息時不用管是不是有人消費,消費者也隨時可以提取消息;

  • 消息被消費后將不會再存儲,用過就沒了。



2、發(fā)布與訂閱:
上面說了點對點,就是你跟你女朋友發(fā)微信。那么發(fā)布與訂閱就是你在微信公眾號發(fā)推文,凡是關(guān)注了你公眾號的人都能收到消息。點對點的目的地是queue,發(fā)布與訂閱的目的地是topic,每條消息可以有多個消費者;生產(chǎn)者和消費者有時間上的關(guān)聯(lián),訂閱了某個topic,只能消費你訂閱之后的消息,說簡單就是,關(guān)注了你公眾號的人,他不能收到在他關(guān)注你之前的消息;假如無人訂閱就去生產(chǎn),那就是一條廢消息,沒有人關(guān)注你的公眾號,那么你發(fā)的推文就沒有意思,就是一條廢消息,所以一般會先啟動消費者,再啟動生產(chǎn)者。

關(guān)于發(fā)布與訂閱,相比點對點,只需要把queue改成topic就可以了,這里就不再貼代碼了。

關(guān)于topic和queue的區(qū)別,如下表所示:
 |topic|queue
:-:|:-:|:-:
工作模式|一對多|一對一
狀態(tài)|無狀態(tài)|queue數(shù)據(jù)會在mq服務(wù)器上以文件形式保存,也可配置成DB存儲
完整性|如果沒有訂閱者,消息將被丟棄|消息不會被丟棄
處理效率|隨著訂閱者的增加效率會降低|由于一條消息只發(fā)給一個消費者,所以消費者再多也不會明顯地影響性能。

關(guān)于JMS

1、什么是JMS?
JMS中文名叫Java消息服務(wù),它是一種規(guī)范,是javaEE的13種核心規(guī)范之一。關(guān)于javaEE的13種核心規(guī)范,網(wǎng)上一搜一大堆,這里不再贅述。JMS就是天上飛的理念,而各種MQ就是這種理念的落地實現(xiàn)。比如activeMQ、rocketMQ等,都要遵循JMS這個規(guī)范。


2、JMS的結(jié)構(gòu)和特點:

  • JMS結(jié)構(gòu):

  • JMS Provider:實現(xiàn)了JMS接口和規(guī)范的消息中間件,像activeMQ、rocketMQ等

  • JMS Producer:消息生產(chǎn)者

  • JMS consumer:消息消費者

  • JMS message:消息

    • 消息頭

    • JMSDestination:目的地,queue和topic

    • JMSDeliveryMode:分為持久和非持久模式。持久模式意味著消息即使JMS提供者出現(xiàn)故障,該消息并不會丟失,會在服務(wù)器恢復后再次發(fā)送;反之,非持久模式就是服務(wù)器出現(xiàn)故障,該消息將永久丟失。

    • JMSExpiration:消息過期時間,如果為0,表示永不過期。

    • JMSPriority:優(yōu)先級,0到4是普通消息,5到9是加急消息,默認是4。

    • JMSMessageID:消息的唯一標識,由MQ生成。

    • 消息體

    • 封裝的具體消息數(shù)據(jù)就是消息體

    • 消息體格式,有5種,常用的 TextMessage(String類型) 和 MapMessage(key、value形式)

    • 發(fā)送和接收的消息體類型必須對應(yīng)一致

    • 消息屬性

    • 是什么:一個對象的屬性能干嘛?用來描述這個對象的特點嘛,消息屬性也一樣地理解就好了。

    • 如果需要除消息頭字段以外的值,可以使用消息屬性

    • 消息屬性可以用來做識別/去重/重點標注等操作,設(shè)置消息屬性的方法如下:

TextMessage textMessage = new session.createTextMessage("這是一條TextMessage");
// TextMessage 類型設(shè)置消息屬性
textMessage.setStringProperty("property""VIP");

在消費者中取出消息后:

textMessage.getStringProperty("property")

即可取出消息屬性。
注意上面JMS結(jié)構(gòu)的層級關(guān)系。

3、如何保證消息的可靠性?(面試重點)

一般要從三個角度去回答(持久性、事務(wù)、簽收)。

- 持久性:持久,是MQ掛了,消息依然存在,非持久,就是MQ掛了,消息就沒了。

隊列生產(chǎn)者的持久性:

// 這個producer是隊列
 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 非持久
 producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 持久

隊列設(shè)置為非持久,如果生產(chǎn)者將消息發(fā)送到MQ后,MQ掛了,那么這些消息就沒了,即使MQ恢復正常也沒了。隊列設(shè)置為持久,那么消息只要還沒消費就還會有。activeMQ的隊列默認設(shè)置了持久,可保證消息只被傳送一次和成功使用一次。


主題的持久性:
主題要設(shè)置持久,生產(chǎn)者和消費者的編碼方式與之前都有點兒不一樣,代碼如下:

public class Consumer {
    private static final String URL = "tcp://192.168.x.xxx:61616";
    private static final String TOPIC_NAME = "topic_test";

    public static void main(String[] args) throws Exception {
        // 1. 創(chuàng)建factory工廠
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
        // 2. 創(chuàng)建connection連接
        Connection connection = factory.createConnection();
        connection.setClientID("張三");
        System.out.println("張三訂閱");
        // 3. 創(chuàng)建session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4. 訂閱topic
        Topic topic = session.createTopic(TOPIC_NAME);
        TopicSubscriber subscriber = session.createDurableSubscriber(topic,"備注信息");
        // 5. 啟動
        connection.start();
        // 6. 消費topic的消息
        Message message = subscriber.receive();
        while (null != message){
            TextMessage textMessage = (TextMessage) message;
            System.out.println("收到消息:" +  textMessage.getText());
            message = subscriber.receive(5000L);
        }
        // 6. 關(guān)閉資源(順著申請,倒著關(guān)閉)
        session.close();
        connection.close();
        System.out.println("5秒還沒消息來,我溜了!");
    }
}
public class Productor {
    private static final String URL = "tcp://192.168.0.103:61616";
    private static final String TOPIC_NAME = "topic_test";

    public static void main(String[] args) throws Exception {
        // 1. 創(chuàng)建factory工廠
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
        // 2. 創(chuàng)建connection連接
        Connection connection = factory.createConnection();
        // 3. 創(chuàng)建session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4. 創(chuàng)建目的地topic
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageProducer producer = session.createProducer(topic);
        // 設(shè)置持久性
        //producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 非持久
        producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 持久

        connection.start();
        // 5. 生產(chǎn)消息
        for (int i = 1; i <= 3; i++) {
            TextMessage message = session.createTextMessage("queue" + i);
            // 6. 將消息發(fā)送到MQ
            producer.send(message);
        }
        // 7. 關(guān)閉資源(順著申請,倒著關(guān)閉)
        producer.close();
        session.close();
        connection.close();
        System.out.println("發(fā)送到MQ完成!");
    }
}

主題設(shè)置了持久的話,一定要先運行一次消費者,等于向MQ注冊,表示我訂閱了這個主題。然后再運行生產(chǎn)者發(fā)送信息,此時,不論消費者是否還在線,都會接收到消息,不在線的話,下次連接的時候,會把沒有收過的消息都接收下來。


- 事務(wù):創(chuàng)建session的時候要傳兩個參數(shù),一個是事務(wù),一個是簽收。

生產(chǎn)者事務(wù):

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

第一個參數(shù)就是表示事務(wù),設(shè)置為false,表示只要執(zhí)行了send方法,消息就進入到隊列中了;如果設(shè)置為true,需要send后再執(zhí)行commit,消息才會被提交到隊列中。所以在session提交前,需要調(diào)commit方法,如下:

try{
  //沒問題就提交事務(wù)
  session.commit();
}catch(Exception e){
  //有問題就回滾
  session.rollback();
}finally{
  producer.close();
  session.close();
}

生產(chǎn)者主事務(wù),不管簽收,因為消費者才需要簽收嘛。生產(chǎn)者設(shè)置了事務(wù),簽收機制就無所謂了,只是這個方法需要傳一個簽收機制,其實事務(wù)設(shè)置為true后,起作用的就是事務(wù)了。


消費者事務(wù):

如果消費者開啟了事務(wù),進行消費時而沒有commit的話,MQ會認為你還沒有成功消費消息,就會出現(xiàn)重復消費的情況,所以消費者一般不開啟事務(wù),而是以簽收機制為主。

簽收:簽收機制有四種,用得較多的是自動和手動兩種方式。

消費者非事務(wù)的手動簽收:

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

如果這個時候直接運行消費者,發(fā)現(xiàn)又可以重復消費消息,因為MQ不知道你已經(jīng)簽收消息了。所以在receive到消息后,應(yīng)該手動簽收,才不會重復消費,如下:

while (null != message){
      TextMessage textMessage = (TextMessage) message;
      textMessage.acknowledge(); // 手動簽收
      System.out.println("收到消息:" +  textMessage.getText());
      message = subscriber.receive(5000L);
}

消費者開啟事務(wù)的情況下的簽收:

Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);

開啟了事務(wù),就會自動設(shè)置為自動簽收,即使后面那個參數(shù)設(shè)置了手動簽收,也不起作用了。所以,不需要調(diào)用acknowledge()方法進行簽收。如果開啟了事務(wù),設(shè)置了手動簽收,調(diào)用了acknowledge()方法,但是沒有commit,還是會重復消費。


總之,在事務(wù)會話中,當一個事務(wù)被成功提交則消息被自動簽收,如果事務(wù)回滾,則消息會被再次傳遞。非事務(wù)會話中,消息何時被確認取決于創(chuàng)建會話時的簽收模式。

小結(jié):不能容忍丟失消息,就用持久訂閱,可以容忍丟失消息,就用非持久訂閱

ActiveMQ的broker

1、什么是broker?
broker就是嵌入式的activemq,也就是說,使用broker,只需要引入相關(guān)依賴就可以了,而不需要你本地安裝activemq,類似于springboot那樣內(nèi)嵌tomcat。

2、怎么用?
除了之前引入的activemq-all,還需要引入如下依賴:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.9.3</version>
</dependency>

然后編碼:

public static void main(String[] args) throws Exception{
        BrokerService service = new BrokerService();
        service.setUseJmx(true);
        service.addConnector("tcp://localhost:61616");
        service.start();
}

運行后,就可以在控制臺看到這個嵌入式的activemq已經(jīng)啟動了。

image.png
未完待續(xù)…

    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多