日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

Spring Cloud Stream消息發(fā)送給多個主題的生產(chǎn)者

 碼農(nóng)9527 2021-11-30

 在上一篇文章中,我們把消息發(fā)送給了一個主題的生產(chǎn)者,本篇會演示將消息發(fā)送給多個主題的生產(chǎn)者。

  復制stream-kafka-8080工程,重命名為stream-kafka2-8080.

  首先要在producer文件夾下自定義一個Source接口?! ?/p>

package com.javafamily.producer;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * 自定義管道
 */
public interface CustomSource {
    String CHANNEL_NAME = "xxx";

    @Output(CustomSource.CHANNEL_NAME)
    MessageChannel output();
}1234567891011121314復制代碼類型:[java]

  之后將原有的發(fā)布者類PetsProducer進行修改。

package com.javafamily.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
// 將MQ與生產(chǎn)者類通過消息管道相綁定
@EnableBinding({Source.class, CustomSource.class})
public class PetsProducer {
    // 必須使用byName方式的自動注入
    @Autowired
    @Qualifier(Source.OUTPUT)
    private MessageChannel channel;

    @Autowired
    @Qualifier(CustomSource.CHANNEL_NAME)
    private MessageChannel customChannel;

    public String sendMessage(String msg) {
        // 將消息寫入到兩個管道,將會寫入到兩個主題
        channel.send(MessageBuilder.withPayload(msg).build());
        customChannel.send(MessageBuilder.withPayload(msg).build());
        return msg;
    }
}123456789101112131415161718192021222324252627282930復制代碼類型:[java]

  在配置文件中添加如下輸出目標。

spring:
  cloud:
    stream:    
      bindings:
        xxx:
          destination: cities
          content-type: text/plain1234567復制代碼類型:[java]

  完成以上配置后,運行程序,在postman中分別post:

  http://localhost:8080/msg/send?message=ddddddd

  http://localhost:8080/msg/send?message=eeeeeee

  http://localhost:8080/msg/send?message=fffffff

  創(chuàng)建消息消費者

  Spring Cloud Stream提供了三種創(chuàng)建消費者的方式,這三種方式的都是在消費者類的“消費”方法上添加注解。只要有新的消息寫入到了管道,該“消費”方法就會執(zhí)行。只不過三種注解,其底層的實現(xiàn)方式不同。即當新消息到來后,觸發(fā)“消費”方法去執(zhí)行的實現(xiàn)方式不同。

  @PostConstruct:以發(fā)布/訂閱方式實現(xiàn)

  @ServiceActivator:以新消息激活服務的方式實現(xiàn)

  @StreamListener:以監(jiān)聽方式實現(xiàn)

  @PostConstruct

  在consumer文件夾下創(chuàng)建PostConstructConsumer類。

package com.javafamily.consumer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
@EnableBinding(Sink.class)
public class PostConstructConsumer {
    @Autowired
    @Qualifier(Sink.INPUT)
    private SubscribableChannel channel;

    @PostConstruct
    public void printMessage() {
        channel.subscribe(msg -> {
            // MessageHeaders headers = msg.getHeaders();
            System.out.println(new String((byte[]) msg.getPayload()));
        });
    }
}1234567891011121314151617181920212223242526復制代碼類型:[java]

  添加配置文件。

spring:  
  cloud:
    stream:
      bindings:
        # 指定要綁定的輸入管道,及要消費的管道中的消息主題
        input:
          destination: names1234567復制代碼類型:[java]

  @ServiceActivator

  在consumer文件夾下創(chuàng)建PostConstructConsumer類。

package com.javafamily.consumer;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class)
public class ServiceActivatorConsumer {
    @ServiceActivator(inputChannel = Sink.INPUT)
    public void printMessage(Object msg) {
        System.out.println(msg);
    }
}123456789101112131415復制代碼類型:[java]

  在運行程序前先將PostConstructConsumer類注釋掉。

  @StreamListener

  在consumer文件夾下創(chuàng)建StreamListenerConsumer類。

package com.javafamily.consumer;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class)
public class StreamListenerConsumer {
    @StreamListener(Sink.INPUT)
    public void printMessage(Object msg) {
        System.out.println(msg);
    }
}123456789101112131415復制代碼類型:[java]

  gitee:

  https:///javainfamily/spring-cloud

    轉藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多