RabbitMqProducer.java 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package com.fdkankan.rabbitmq.util;
  2. import cn.hutool.json.JSONObject;
  3. import com.fdkankan.rabbitmq.config.ModelingQueueConfig;
  4. import com.rabbitmq.client.AMQP;
  5. import com.rabbitmq.client.AMQP.Queue.DeclareOk;
  6. import com.rabbitmq.client.Channel;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import org.springframework.amqp.rabbit.connection.CorrelationData;
  10. import org.springframework.amqp.rabbit.core.ChannelCallback;
  11. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  12. import org.springframework.beans.factory.annotation.Autowired;
  13. import org.springframework.stereotype.Component;
  14. import java.util.UUID;
  15. /**
  16. * <p>
  17. * TODO
  18. * </p>
  19. *
  20. * @author dengsixing
  21. * @since 2022/4/19
  22. **/
  23. @Component
  24. public class RabbitMqProducer {
  25. private Logger log = LoggerFactory.getLogger(this.getClass().getName());
  26. @Autowired
  27. ModelingQueueConfig modelingQueueConfig;
  28. @Autowired
  29. private RabbitTemplate rabbitTemplate;
  30. /**
  31. * 工作队列模式发送消息
  32. * @param queue 队列名
  33. * @param content 载荷
  34. */
  35. public void sendByWorkQueue(String queue, Object content){
  36. String messageId = UUID.randomUUID().toString();
  37. log.info("开始发送Mq消息,messageId:{},消息队列:{},消息内容:{}",messageId, queue, new JSONObject(content).toString());
  38. rabbitTemplate.convertAndSend(queue, content, message -> {
  39. message.getMessageProperties().setMessageId(messageId);
  40. return message;
  41. }, new CorrelationData(messageId));
  42. }
  43. /**
  44. * 工作队列模式发送带优先级消息
  45. * @param queue 队列名
  46. * @param content 载荷
  47. * @param priority 优先级,正整数,值越大优先级越高,当值大于队列设置的最大优先级时,效果等同于最大优先级
  48. */
  49. public void sendByWorkQueue(String queue, Object content, Integer priority){
  50. String messageId = UUID.randomUUID().toString();
  51. log.info("开始发送Mq消息,messageId:{},消息队列:{},消息内容:{}",messageId, queue, new JSONObject(content).toString());
  52. rabbitTemplate.convertAndSend(queue, content, message -> {
  53. message.getMessageProperties().setMessageId(messageId);
  54. message.getMessageProperties().setPriority(priority);
  55. return message;
  56. }, new CorrelationData(messageId));
  57. }
  58. /**
  59. * 获取队列未被消费的消息数量
  60. * @return
  61. */
  62. public int getMessageCount(String queueName) {
  63. AMQP.Queue.DeclareOk declareOk = rabbitTemplate.execute(new ChannelCallback<DeclareOk>() {
  64. public AMQP.Queue.DeclareOk doInRabbit(Channel channel) throws Exception {
  65. return channel.queueDeclarePassive(queueName);
  66. }
  67. });
  68. return declareOk.getMessageCount();
  69. }
  70. }