RabbitMqConfig.java 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. package com.fdkankan.rabbitmq.config;
  2. import com.fdkankan.rabbitmq.callback.ProducerCallback;
  3. import javax.annotation.Resource;
  4. import lombok.extern.slf4j.Slf4j;
  5. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  7. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  8. import org.springframework.boot.autoconfigure.data.redis.JedisClientConfigurationBuilderCustomizer;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. @Configuration
  12. @Slf4j
  13. public class RabbitMqConfig {
  14. @Resource
  15. private QueueConfig queueConfig;
  16. // @Resource
  17. // private ExchangeConfig exchangeConfig;
  18. /**
  19. * 连接工厂
  20. */
  21. @Resource
  22. private ConnectionFactory connectionFactory;
  23. @Resource
  24. private ProducerCallback producerCallback;
  25. /**
  26. * 指定序列化转换器,否则mq消息存进去
  27. * @return
  28. */
  29. @Bean
  30. public Jackson2JsonMessageConverter messageConverter(){
  31. return new Jackson2JsonMessageConverter();
  32. }
  33. /**
  34. * 将消息队列和交换机进行绑定,指定路由
  35. */
  36. // @Bean
  37. // public Binding bindingDirect() {
  38. // return BindingBuilder.bind(queueConfig.easyQueue()).to(exchangeConfig.directExchange()).with(RabbitMQConstant.ROUTING_KEY_EASY);
  39. // }
  40. //
  41. // @Bean
  42. // public Binding bindingWork() {
  43. // return BindingBuilder.bind(queueConfig.workQueue()).to(exchangeConfig.workExchange()).with(RabbitMQConstant.ROUTING_KEY_WORK);
  44. // }
  45. //
  46. // @Bean
  47. // public Binding bindingTopic() {
  48. // return BindingBuilder.bind(queueConfig.topicQueue()).to(exchangeConfig.topicExchange()).with(RabbitMQConstant.ROUTING_KEY_TOPIC_FIRST);
  49. // }
  50. //
  51. // @Bean
  52. // public Binding bindingTopicSecond() {
  53. // return BindingBuilder.bind(queueConfig.topicQueueSecond()).to(exchangeConfig.topicExchange()).with(RabbitMQConstant.ROUTING_KEY_TOPIC_SECOND);
  54. // }
  55. //
  56. // @Bean
  57. // public Binding bindingFanout() {
  58. // return BindingBuilder.bind(queueConfig.fanoutQueue()).to(exchangeConfig.fanoutExchange());
  59. // }
  60. //
  61. // @Bean
  62. // public Binding bindingFanoutSecond() {
  63. // return BindingBuilder.bind(queueConfig.fanoutQueueSecond()).to(exchangeConfig.fanoutExchange());
  64. // }
  65. /** ======================== 定制一些处理策略 =============================*/
  66. /**
  67. * 定制化amqp模版
  68. * <p>
  69. * Rabbit MQ的消息确认有两种。
  70. * <p>
  71. * 一种是消息发送确认:这种是用来确认生产者将消息发送给交换机,交换机传递给队列过程中,消息是否成功投递。
  72. * 发送确认分两步:一是确认是否到达交换机,二是确认是否到达队列
  73. * <p>
  74. * 第二种是消费接收确认:这种是确认消费者是否成功消费了队列中的消息。
  75. */
  76. @Bean
  77. public RabbitTemplate rabbitTemplate(Jackson2JsonMessageConverter messageConverter) {
  78. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  79. rabbitTemplate.setMessageConverter(messageConverter);
  80. // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
  81. // rabbitTemplate.setMandatory(true);
  82. /**
  83. * 使用该功能需要开启消息确认,yml需要配置 publisher-confirm-type: correlated:表示消息成功到达Broker后触发ConfirmCalllBack回调
  84. * 通过实现ConfirmCallBack接口,用于实现消息发送到交换机Exchange后接收ack回调
  85. * correlationData 消息唯一标志
  86. * ack 确认结果
  87. *
  88. * cause 失败原因
  89. */
  90. rabbitTemplate.setConfirmCallback(producerCallback);
  91. /**
  92. * 使用该功能需要开启消息返回确认,yml需要配置 publisher-returns: true
  93. * 通过实现ReturnCallback接口,如果消息从交换机发送到对应队列失败时触发
  94. * message 消息主体 message
  95. * replyCode 消息主体 message
  96. * replyText 描述
  97. * exchange 消息使用的交换机
  98. * routingKey 消息使用的路由键
  99. */
  100. // rabbitTemplate.setReturnCallback(new ProducerCallback());
  101. return rabbitTemplate;
  102. }
  103. }