RocketMQProducer.java 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package com.fdkankan.mq.util;
  2. import cn.hutool.core.util.StrUtil;
  3. import com.alibaba.fastjson.JSONObject;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.apache.rocketmq.client.producer.SendCallback;
  6. import org.apache.rocketmq.client.producer.SendResult;
  7. import org.apache.rocketmq.spring.core.RocketMQTemplate;
  8. import org.apache.rocketmq.spring.support.RocketMQUtil;
  9. import org.springframework.messaging.Message;
  10. import org.springframework.messaging.support.MessageBuilder;
  11. import org.springframework.stereotype.Component;
  12. import javax.annotation.Resource;
  13. import java.util.Objects;
  14. @Component
  15. @Slf4j
  16. public class RocketMQProducer {
  17. @Resource
  18. RocketMQTemplate rocketMQTemplate;
  19. /**
  20. * 同步发送
  21. * @param topic 主题名
  22. * @param body 消息体
  23. * @return
  24. */
  25. public SendResult syncSend(String topic, Object body){
  26. return this.syncSend(topic, null, null, body, null);
  27. }
  28. /**
  29. * 同步发送
  30. * @param topic 主题名
  31. * @param tag tag
  32. * @param body 消息体
  33. * @return
  34. */
  35. public SendResult syncSend(String topic, String tag, Object body){
  36. return this.syncSend(topic, tag, null, body, null);
  37. }
  38. /**
  39. * 同步发送
  40. * @param topic 主题名
  41. * @param tag tag
  42. * @param key key
  43. * @param body 消息体
  44. * @return
  45. */
  46. public SendResult syncSend(String topic, String tag, String key, Object body){
  47. return this.syncSend(topic, tag, key, body, null);
  48. }
  49. /**
  50. * 同步发送
  51. * @param topic 主题名
  52. * @param tag tag
  53. * @param key key
  54. * @param body 消息体
  55. * @param sendMsgTimeOut 发送消息超时时间
  56. * @return
  57. */
  58. public SendResult syncSend(String topic, String tag, String key, Object body, Long sendMsgTimeOut){
  59. if(StrUtil.isNotEmpty(tag)){
  60. topic = topic + ":" + tag;
  61. }
  62. MessageBuilder messageBuilder = MessageBuilder.withPayload(body);
  63. if(StrUtil.isNotEmpty(key)){
  64. messageBuilder.setHeader("KEYS", key);
  65. }
  66. Message message = messageBuilder.build();
  67. sendMsgTimeOut = Objects.isNull(sendMsgTimeOut) ? rocketMQTemplate.getProducer().getSendMsgTimeout() : sendMsgTimeOut;
  68. return rocketMQTemplate.syncSend(topic, message, sendMsgTimeOut);
  69. }
  70. /**
  71. * 异步发送消息
  72. * @param topic 主题名
  73. * @param body 消息体
  74. * @param sendCallback 回调函数
  75. */
  76. public void asyncSend(String topic, Object body, SendCallback sendCallback){
  77. this.asyncSend(topic, null, null, body, sendCallback, null);
  78. }
  79. /**
  80. * 异步发送消息
  81. * @param topic 主题名
  82. * @param tag tag
  83. * @param body 消息体
  84. * @param sendCallback 回调函数
  85. */
  86. public void asyncSend(String topic, String tag, Object body, SendCallback sendCallback){
  87. this.asyncSend(topic, tag, null, body, sendCallback, null);
  88. }
  89. /**
  90. * 异步发送消息
  91. * @param topic 主题名
  92. * @param tag tag
  93. * @param key key
  94. * @param body 消息体
  95. * @param sendCallback 回调函数
  96. */
  97. public void asyncSend(String topic, String tag, String key, Object body, SendCallback sendCallback){
  98. this.asyncSend(topic, tag, key, body, sendCallback, null);
  99. }
  100. /**
  101. * 异步发送消息
  102. * @param topic 主题名
  103. * @param tag tag
  104. * @param key key
  105. * @param body 消息体
  106. * @param sendCallback 回调函数
  107. * @param sendMsgTimeOut 发送消息超时时间
  108. */
  109. public void asyncSend(String topic, String tag, String key, Object body, SendCallback sendCallback, Long sendMsgTimeOut){
  110. if(StrUtil.isNotEmpty(tag)){
  111. topic = topic + ":" + tag;
  112. }
  113. MessageBuilder messageBuilder = MessageBuilder.withPayload(body);
  114. if(StrUtil.isNotEmpty(key)){
  115. messageBuilder.setHeader("KEYS", key);
  116. }
  117. Message message = messageBuilder.build();
  118. sendMsgTimeOut = Objects.isNull(sendMsgTimeOut) ? rocketMQTemplate.getProducer().getSendMsgTimeout() : sendMsgTimeOut;
  119. rocketMQTemplate.asyncSend(topic, message, sendCallback, sendMsgTimeOut);
  120. }
  121. /**
  122. * 反向发送消息,生产者只管发送,不管发送结果
  123. * @param topic 主题名
  124. * @param body 消息体
  125. */
  126. public void sendOneWay(String topic, Object body){
  127. this.sendOneWay(topic, null, null, body);
  128. }
  129. /**
  130. * 反向发送消息,生产者只管发送,不管发送结果
  131. * @param topic 主题名
  132. * @param tag tag
  133. * @param body 消息体
  134. */
  135. public void sendOneWay(String topic, String tag, Object body){
  136. this.sendOneWay(topic, tag, null, body);
  137. }
  138. /**
  139. * 反向发送消息,生产者只管发送,不管发送结果
  140. * @param topic 主题名
  141. * @param tag tag
  142. * @param key key
  143. * @param body 消息体
  144. */
  145. public void sendOneWay(String topic, String tag, String key, Object body){
  146. if(StrUtil.isNotEmpty(tag)){
  147. topic = topic + ":" + tag;
  148. }
  149. log.info("MQ推送消息---topic:{},tag:{},key:{},msg:{}",topic,tag,key, JSONObject.toJSONString(body));
  150. MessageBuilder messageBuilder = MessageBuilder.withPayload(body);
  151. if(StrUtil.isNotEmpty(key)){
  152. messageBuilder.setHeader("KEYS", key);
  153. }
  154. Message message = messageBuilder.build();
  155. rocketMQTemplate.sendOneWay(topic, message);
  156. log.info("MQ推送成功...");
  157. }
  158. /**
  159. * 顺序消费 计算场景使用)
  160. * @param topic 主题名
  161. * @param hashKey: 队列key,保证报到同一个队列中 ( number )
  162. * @param body 消息体
  163. */
  164. public void sendInOrder(String topic, String hashKey, Object body){
  165. log.info("MQ推送消息---topic:{},hashKey:{},msg:{}",topic,hashKey, JSONObject.toJSONString(body));
  166. rocketMQTemplate.syncSendOrderly(topic,body,hashKey);
  167. log.info("MQ推送成功...");
  168. }
  169. }