博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kafaka发送接收消息stream方式实例
阅读量:4042 次
发布时间:2019-05-24

本文共 3508 字,大约阅读时间需要 11 分钟。

1.配置文件

input为接收,output为发送

如果发送接收在同一个程序中,则不需要加上consumer: headerMode:raw ,如果本程序仅是接收消息进行消费,需要加上consumer: headerMode:raw 

spring:  cloud:    stream:      bindings:        input-collect:          contentType: text/plain;charset=UTF-8          destination: ACCOUNT_COLLECT_NOTIFY          group: account-dev        input-order:          consumer:            headerMode: raw          contentType: text/plain;charset=UTF-8          destination: ACCOUNT_ORDER_NOTIFY          group: account-dev        output-collect:          contentType: text/plain;charset=UTF-8          destination: ACCOUNT_COLLECT_NOTIFY        output-watch:          contentType: text/plain;charset=UTF-8          destination: ACCOUNT_WATCH_NOTIFY      kafka:        binder:          brokers: 192.168.1.158:9092,192.168.1.159:9092,192.168.1.160:9092          zkNodes: 192.168.1.158:2181,192.168.1.159:2181,192.168.1.160:2181

2.发送消息outPut分类Bean

public interface NotifyMessageChannel {   String COLLECT_OUTPUT = "output-collect";   String WATCH_OUTPUT ="output-watch" ;      @Output(NotifyMessageChannel.COLLECT_OUTPUT)   MessageChannel collectOutPut();   @Output(NotifyMessageChannel.WATCH_OUTPUT)   MessageChannel watchOutPut();}

3.发送消息service

NotifyMessageChannel中定义了2个发送MessageChannel,发送时可以直接.collectOutPut().send,选择不同的output进行发送

@Service@Slf4j@EnableBinding(NotifyMessageChannel.class)public class NotifyServiceImpl implements NotifyService {    @Autowired    private NotifyMessageChannel notifyMessageChannel;    private ObjectMapper mapper = new ObjectMapper();    @Override    public void sendUserCollectCourse(UserCollectCourseNotify userCollectCourseNotify) {        try {            Boolean result = notifyMessageChannel.collectOutPut().send(MessageBuilder.withPayload(                    mapper.writeValueAsString(userCollectCourseNotify)).build()) ;            log.info("send result:"+result);        } catch (Exception e) {            log.error("Exception from create user UserCollectCourse.", e);        }    }}

4.接收消息input配置

public interface ReceiveMessageChannel {   String COLLECT_INPUT = "input-collect";   String ORDER_INPUT ="input-order" ;      @Input(ReceiveMessageChannel.COLLECT_INPUT)   SubscribableChannel collectInput();   @Input(ReceiveMessageChannel.ORDER_INPUT)   SubscribableChannel orderInput();}

5.监听接收到的消息,进行消费处理

@Service@Slf4j@EnableBinding(ReceiveMessageChannel.class)public class CollectListener {      private UserCollectCourseClient userCollectCourseService;      private ObjectMapper mapper = new ObjectMapper();   public CollectListener(UserCollectCourseClient userCollectCourseClient) {      this.userCollectCourseService = userCollectCourseClient;   }   @StreamListener(ReceiveMessageChannel.COLLECT_INPUT)   public void process(Message
message) { log.debug("Received Notify:[{}]",message.toString()); String content = message.getPayload(); UserCollectCourseNotify uccn; try { uccn = mapper.readValue(content,UserCollectCourseNotify.class); log.debug("Received Notify:[userId:{},courseId:{}]",uccn.getUserId(),uccn.getCourseId()); if(uccn!=null){ log.info("receive UserCollectCourseNotify:"+uccn);// userCollectCourseService.saveUserCollectCourse(ucci);// log.debug("Save Collect to Mongo:[userId:{},courseId:{}]",ucci.getUserId(),ucci.getCourseId()); } } catch (Exception e) { log.warn("RECEIVE Collect NOTIFY ERROR:[message_body:{},error:{}]",message.toString(),e.getLocalizedMessage()); } }}

转载地址:http://yeadi.baihongyu.com/

你可能感兴趣的文章
layoutSubviews 和 layoutIf…
查看>>
关于request.getRealPath(…
查看>>
《转》搞定学习《unix环境高级编程…
查看>>
对应iPhone5 长屏幕的方法 我找到…
查看>>
突然发现,MarsEdit可以离线…
查看>>
test MarsEdit
查看>>
static 关键字 整理
查看>>
C 实现冒泡排序
查看>>
在Mac下用Eclipse看Java的源码
查看>>
union:C/C++语言关键字 内存使用
查看>>
sizeof 与 strlen 研究
查看>>
iCloud 学习笔记
查看>>
C语言中的#define宏定义 求一…
查看>>
不用任何局部变量与库函数,写个st…
查看>>
ios禁用多按钮同时点下的效果
查看>>
iSecret 1.2 随着iPhone…
查看>>
iOS 序列化与反序列化
查看>>
给Eclipse安装eUML2插件以及可能出…
查看>>
XCode 4 制作静态库详解
查看>>
旧工程适配iOS6和iPhone5续之第三…
查看>>