123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- package com.fdkankan.rabbitmq.config;
- import com.fdkankan.rabbitmq.callback.ProducerCallback;
- import javax.annotation.Resource;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
- import org.springframework.boot.autoconfigure.data.redis.JedisClientConfigurationBuilderCustomizer;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- @Configuration
- @Slf4j
- public class RabbitMqConfig {
- @Resource
- private QueueConfig queueConfig;
- // @Resource
- // private ExchangeConfig exchangeConfig;
- /**
- * 连接工厂
- */
- @Resource
- private ConnectionFactory connectionFactory;
- @Resource
- private ProducerCallback producerCallback;
- /**
- * 指定序列化转换器,否则mq消息存进去
- * @return
- */
- @Bean
- public Jackson2JsonMessageConverter messageConverter(){
- return new Jackson2JsonMessageConverter();
- }
- /**
- * 将消息队列和交换机进行绑定,指定路由
- */
- // @Bean
- // public Binding bindingDirect() {
- // return BindingBuilder.bind(queueConfig.easyQueue()).to(exchangeConfig.directExchange()).with(RabbitMQConstant.ROUTING_KEY_EASY);
- // }
- //
- // @Bean
- // public Binding bindingWork() {
- // return BindingBuilder.bind(queueConfig.workQueue()).to(exchangeConfig.workExchange()).with(RabbitMQConstant.ROUTING_KEY_WORK);
- // }
- //
- // @Bean
- // public Binding bindingTopic() {
- // return BindingBuilder.bind(queueConfig.topicQueue()).to(exchangeConfig.topicExchange()).with(RabbitMQConstant.ROUTING_KEY_TOPIC_FIRST);
- // }
- //
- // @Bean
- // public Binding bindingTopicSecond() {
- // return BindingBuilder.bind(queueConfig.topicQueueSecond()).to(exchangeConfig.topicExchange()).with(RabbitMQConstant.ROUTING_KEY_TOPIC_SECOND);
- // }
- //
- // @Bean
- // public Binding bindingFanout() {
- // return BindingBuilder.bind(queueConfig.fanoutQueue()).to(exchangeConfig.fanoutExchange());
- // }
- //
- // @Bean
- // public Binding bindingFanoutSecond() {
- // return BindingBuilder.bind(queueConfig.fanoutQueueSecond()).to(exchangeConfig.fanoutExchange());
- // }
- /** ======================== 定制一些处理策略 =============================*/
- /**
- * 定制化amqp模版
- * <p>
- * Rabbit MQ的消息确认有两种。
- * <p>
- * 一种是消息发送确认:这种是用来确认生产者将消息发送给交换机,交换机传递给队列过程中,消息是否成功投递。
- * 发送确认分两步:一是确认是否到达交换机,二是确认是否到达队列
- * <p>
- * 第二种是消费接收确认:这种是确认消费者是否成功消费了队列中的消息。
- */
- @Bean
- public RabbitTemplate rabbitTemplate(Jackson2JsonMessageConverter messageConverter) {
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- rabbitTemplate.setMessageConverter(messageConverter);
- // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
- // rabbitTemplate.setMandatory(true);
- /**
- * 使用该功能需要开启消息确认,yml需要配置 publisher-confirm-type: correlated:表示消息成功到达Broker后触发ConfirmCalllBack回调
- * 通过实现ConfirmCallBack接口,用于实现消息发送到交换机Exchange后接收ack回调
- * correlationData 消息唯一标志
- * ack 确认结果
- *
- * cause 失败原因
- */
- rabbitTemplate.setConfirmCallback(producerCallback);
- /**
- * 使用该功能需要开启消息返回确认,yml需要配置 publisher-returns: true
- * 通过实现ReturnCallback接口,如果消息从交换机发送到对应队列失败时触发
- * message 消息主体 message
- * replyCode 消息主体 message
- * replyText 描述
- * exchange 消息使用的交换机
- * routingKey 消息使用的路由键
- */
- // rabbitTemplate.setReturnCallback(new ProducerCallback());
- return rabbitTemplate;
- }
- }
|