RabbitMqProducer.java 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. package com.fdkankan.rabbitmq.util;
  2. import cn.hutool.json.JSONObject;
  3. import com.fdkankan.rabbitmq.config.ModelingQueueConfig;
  4. import com.rabbitmq.client.AMQP;
  5. import com.rabbitmq.client.AMQP.Queue.DeclareOk;
  6. import com.rabbitmq.client.Channel;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import org.springframework.amqp.rabbit.connection.CorrelationData;
  10. import org.springframework.amqp.rabbit.core.ChannelCallback;
  11. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  12. import org.springframework.beans.factory.annotation.Autowired;
  13. import org.springframework.stereotype.Component;
  14. import java.util.UUID;
  15. /**
  16. * <p>
  17. * TODO
  18. * </p>
  19. *
  20. * @author dengsixing
  21. * @since 2022/4/19
  22. **/
  23. @Component
  24. public class RabbitMqProducer {
  25. private Logger log = LoggerFactory.getLogger(this.getClass().getName());
  26. @Autowired
  27. ModelingQueueConfig modelingQueueConfig;
  28. @Autowired
  29. private RabbitTemplate rabbitTemplate;
  30. /**
  31. * 工作队列模式发送
  32. */
  33. public void sendByWorkQueue(String queue, Object content){
  34. log.info("开始发送Mq消息,消息队列:{},消息内容:{}",queue, new JSONObject(content).toString());
  35. CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
  36. rabbitTemplate.convertAndSend(queue, content, correlationId);
  37. }
  38. /**
  39. * 获取队列未被消费的消息数量
  40. * @return
  41. */
  42. public int getMessageCount(String queueName) {
  43. AMQP.Queue.DeclareOk declareOk = rabbitTemplate.execute(new ChannelCallback<DeclareOk>() {
  44. public AMQP.Queue.DeclareOk doInRabbit(Channel channel) throws Exception {
  45. return channel.queueDeclarePassive(queueName);
  46. }
  47. });
  48. return declareOk.getMessageCount();
  49. }
  50. }