package com.fdkankan.rabbitmq.util; import cn.hutool.json.JSONObject; import com.fdkankan.rabbitmq.config.ModelingQueueConfig; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.AMQP.Queue.DeclareOk; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.ChannelCallback; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.UUID; /** *
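* Spring component that publishes messages to RabbitMQ queues through {@link RabbitTemplate}, optionally with a per-message priority. *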
*
 * @author dengsixing
 * @since 2022/4/19
 **/
@Component
public class RabbitMqProducer {

    /** SLF4J logger, named after the runtime class of this instance. */
    private Logger log = LoggerFactory.getLogger(this.getClass().getName());

    /** Injected modeling-queue configuration. */
    @Autowired
    ModelingQueueConfig modelingQueueConfig;

    /** Injected Spring AMQP template used to publish messages and to run channel callbacks. */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends a message in work-queue mode.
     *
     * <p>A random UUID is generated for each call and used both as the message's
     * {@code messageId} property and as the id of the {@link CorrelationData}
     * passed to the template.
     *
     * @param queue   queue name
     * @param content payload
     */
    public void sendByWorkQueue(String queue, Object content){
        String messageId = UUID.randomUUID().toString();
        // NOTE(review): the payload is converted to a JSON string here as a log argument,
        // so the conversion runs on every call, whether or not INFO logging is enabled.
        log.info("开始发送Mq消息,messageId:{},消息队列:{},消息内容:{}",messageId, queue, new JSONObject(content).toString());
        rabbitTemplate.convertAndSend(queue, content, message -> {
            // Post-processor: stamp the generated id onto the outgoing message.
            message.getMessageProperties().setMessageId(messageId);
            return message;
        }, new CorrelationData(messageId));
    }

    /**
     * Sends a message with a priority in work-queue mode.
     *
     * <p>Same as the two-argument overload, except that the post-processor also
     * sets the message's priority property.
     *
     * @param queue    queue name
     * @param content  payload
     * @param priority priority, a positive integer; a larger value means a higher priority.
     *                 When the value exceeds the maximum priority configured on the queue,
     *                 the effect is the same as the maximum priority.
     */
    public void sendByWorkQueue(String queue, Object content, Integer priority){
        String messageId = UUID.randomUUID().toString();
        // NOTE(review): the payload is converted to a JSON string here as a log argument,
        // so the conversion runs on every call, whether or not INFO logging is enabled.
        log.info("开始发送Mq消息,messageId:{},消息队列:{},消息内容:{}",messageId, queue, new JSONObject(content).toString());
        rabbitTemplate.convertAndSend(queue, content, message -> {
            // Post-processor: stamp the generated id and the requested priority onto the message.
            message.getMessageProperties().setMessageId(messageId);
            // NOTE(review): priority is a boxed Integer and is passed through unchecked;
            // presumably callers never pass null — confirm.
            message.getMessageProperties().setPriority(priority);
            return message;
        }, new CorrelationData(messageId));
    }

    /**
     * Gets the number of messages in the queue that have not yet been consumed.
     *
     * @param queueName queue name
     * @return the number of unconsumed messages in the queue
     */
    public int getMessageCount(String queueName) {
        AMQP.Queue.DeclareOk declareOk = rabbitTemplate.execute(new ChannelCallback