package com.fdkankan.mq.util; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQUtil; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Objects; @Component @Slf4j public class RocketMQProducer { @Resource RocketMQTemplate rocketMQTemplate; /** * 同步发送 * @param topic 主题名 * @param body 消息体 * @return */ public SendResult syncSend(String topic, Object body){ return this.syncSend(topic, null, null, body, null); } /** * 同步发送 * @param topic 主题名 * @param tag tag * @param body 消息体 * @return */ public SendResult syncSend(String topic, String tag, Object body){ return this.syncSend(topic, tag, null, body, null); } /** * 同步发送 * @param topic 主题名 * @param tag tag * @param key key * @param body 消息体 * @return */ public SendResult syncSend(String topic, String tag, String key, Object body){ return this.syncSend(topic, tag, key, body, null); } /** * 同步发送 * @param topic 主题名 * @param tag tag * @param key key * @param body 消息体 * @param sendMsgTimeOut 发送消息超时时间 * @return */ public SendResult syncSend(String topic, String tag, String key, Object body, Long sendMsgTimeOut){ if(StrUtil.isNotEmpty(tag)){ topic = topic + ":" + tag; } MessageBuilder messageBuilder = MessageBuilder.withPayload(body); if(StrUtil.isNotEmpty(key)){ messageBuilder.setHeader("KEYS", key); } Message message = messageBuilder.build(); sendMsgTimeOut = Objects.isNull(sendMsgTimeOut) ? rocketMQTemplate.getProducer().getSendMsgTimeout() : sendMsgTimeOut; return rocketMQTemplate.syncSend(topic, message, sendMsgTimeOut); } /** * 异步发送消息 * @param topic 主题名 * @param body 消息体 * @param sendCallback 回调函数 */ public void asyncSend(String topic, Object body, SendCallback sendCallback){ this.asyncSend(topic, null, null, body, sendCallback, null); } /** * 异步发送消息 * @param topic 主题名 * @param tag tag * @param body 消息体 * @param sendCallback 回调函数 */ public void asyncSend(String topic, String tag, Object body, SendCallback sendCallback){ this.asyncSend(topic, tag, null, body, sendCallback, null); } /** * 异步发送消息 * @param topic 主题名 * @param tag tag * @param key key * @param body 消息体 * @param sendCallback 回调函数 */ public void asyncSend(String topic, String tag, String key, Object body, SendCallback sendCallback){ this.asyncSend(topic, tag, key, body, sendCallback, null); } /** * 异步发送消息 * @param topic 主题名 * @param tag tag * @param key key * @param body 消息体 * @param sendCallback 回调函数 * @param sendMsgTimeOut 发送消息超时时间 */ public void asyncSend(String topic, String tag, String key, Object body, SendCallback sendCallback, Long sendMsgTimeOut){ if(StrUtil.isNotEmpty(tag)){ topic = topic + ":" + tag; } MessageBuilder messageBuilder = MessageBuilder.withPayload(body); if(StrUtil.isNotEmpty(key)){ messageBuilder.setHeader("KEYS", key); } Message message = messageBuilder.build(); sendMsgTimeOut = Objects.isNull(sendMsgTimeOut) ? rocketMQTemplate.getProducer().getSendMsgTimeout() : sendMsgTimeOut; rocketMQTemplate.asyncSend(topic, message, sendCallback, sendMsgTimeOut); } /** * 反向发送消息,生产者只管发送,不管发送结果 * @param topic 主题名 * @param body 消息体 */ public void sendOneWay(String topic, Object body){ this.sendOneWay(topic, null, null, body); } /** * 反向发送消息,生产者只管发送,不管发送结果 * @param topic 主题名 * @param tag tag * @param body 消息体 */ public void sendOneWay(String topic, String tag, Object body){ this.sendOneWay(topic, tag, null, body); } /** * 反向发送消息,生产者只管发送,不管发送结果 * @param topic 主题名 * @param tag tag * @param key key * @param body 消息体 */ public void sendOneWay(String topic, String tag, String key, Object body){ if(StrUtil.isNotEmpty(tag)){ topic = topic + ":" + tag; } log.info("MQ推送消息---topic:{},tag:{},key:{},msg:{}",topic,tag,key, JSONObject.toJSONString(body)); MessageBuilder messageBuilder = MessageBuilder.withPayload(body); if(StrUtil.isNotEmpty(key)){ messageBuilder.setHeader("KEYS", key); } Message message = messageBuilder.build(); rocketMQTemplate.sendOneWay(topic, message); log.info("MQ推送成功..."); } /** * 顺序消费 计算场景使用) * @param topic 主题名 * @param hashKey: 队列key,保证报到同一个队列中 ( number ) * @param body 消息体 */ public void sendInOrder(String topic, String hashKey, Object body){ log.info("MQ推送消息---topic:{},hashKey:{},msg:{}",topic,hashKey, JSONObject.toJSONString(body)); rocketMQTemplate.syncSendOrderly(topic,body,hashKey); log.info("MQ推送成功..."); } }