在上一篇文章中,我們把消息發(fā)送給了一個主題的生產(chǎn)者,本篇會演示將消息發(fā)送給多個主題的生產(chǎn)者。 復制stream-kafka-8080工程,重命名為stream-kafka2-8080. 首先要在producer文件夾下自定義一個Source接口?! ?/p> package com.javafamily.producer; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; /** * 自定義管道 */ public interface CustomSource { String CHANNEL_NAME = "xxx"; @Output(CustomSource.CHANNEL_NAME) MessageChannel output(); }1234567891011121314復制代碼類型:[java] 之后將原有的發(fā)布者類PetsProducer進行修改。 package com.javafamily.producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; @Component // 將MQ與生產(chǎn)者類通過消息管道相綁定 @EnableBinding({Source.class, CustomSource.class}) public class PetsProducer { // 必須使用byName方式的自動注入 @Autowired @Qualifier(Source.OUTPUT) private MessageChannel channel; @Autowired @Qualifier(CustomSource.CHANNEL_NAME) private MessageChannel customChannel; public String sendMessage(String msg) { // 將消息寫入到兩個管道,將會寫入到兩個主題 channel.send(MessageBuilder.withPayload(msg).build()); customChannel.send(MessageBuilder.withPayload(msg).build()); return msg; } }123456789101112131415161718192021222324252627282930復制代碼類型:[java] 在配置文件中添加如下輸出目標。 spring: cloud: stream: bindings: xxx: destination: cities content-type: text/plain1234567復制代碼類型:[java] 完成以上配置后,運行程序,在postman中分別post: http://localhost:8080/msg/send?message=ddddddd http://localhost:8080/msg/send?message=eeeeeee http://localhost:8080/msg/send?message=fffffff 創(chuàng)建消息消費者Spring Cloud Stream提供了三種創(chuàng)建消費者的方式,這三種方式的都是在消費者類的“消費”方法上添加注解。只要有新的消息寫入到了管道,該“消費”方法就會執(zhí)行。只不過三種注解,其底層的實現(xiàn)方式不同。即當新消息到來后,觸發(fā)“消費”方法去執(zhí)行的實現(xiàn)方式不同。 @PostConstruct:以發(fā)布/訂閱方式實現(xiàn) @ServiceActivator:以新消息激活服務的方式實現(xiàn) @StreamListener:以監(jiān)聽方式實現(xiàn) @PostConstruct 在consumer文件夾下創(chuàng)建PostConstructConsumer類。 package com.javafamily.consumer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.SubscribableChannel; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Component @EnableBinding(Sink.class) public class PostConstructConsumer { @Autowired @Qualifier(Sink.INPUT) private SubscribableChannel channel; @PostConstruct public void printMessage() { channel.subscribe(msg -> { // MessageHeaders headers = msg.getHeaders(); System.out.println(new String((byte[]) msg.getPayload())); }); } }1234567891011121314151617181920212223242526復制代碼類型:[java] 添加配置文件。 spring: cloud: stream: bindings: # 指定要綁定的輸入管道,及要消費的管道中的消息主題 input: destination: names1234567復制代碼類型:[java] @ServiceActivator 在consumer文件夾下創(chuàng)建PostConstructConsumer類。 package com.javafamily.consumer; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.stereotype.Component; @Component @EnableBinding(Sink.class) public class ServiceActivatorConsumer { @ServiceActivator(inputChannel = Sink.INPUT) public void printMessage(Object msg) { System.out.println(msg); } }123456789101112131415復制代碼類型:[java] 在運行程序前先將PostConstructConsumer類注釋掉。 @StreamListener 在consumer文件夾下創(chuàng)建StreamListenerConsumer類。 package com.javafamily.consumer; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.stereotype.Component; @Component @EnableBinding(Sink.class) public class StreamListenerConsumer { @StreamListener(Sink.INPUT) public void printMessage(Object msg) { System.out.println(msg); } }123456789101112131415復制代碼類型:[java] gitee: https:///javainfamily/spring-cloud |
|
來自: 碼農(nóng)9527 > 《Java》