RabbitMqConfig.java 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. package com.fdkankan.rabbitmq.config;
  2. import com.fdkankan.rabbitmq.callback.ProducerCallback;
  3. import com.fdkankan.rabbitmq.constant.RabbitMQConstant;
  4. import javax.annotation.Resource;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.amqp.core.Binding;
  7. import org.springframework.amqp.core.BindingBuilder;
  8. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  9. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  10. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.context.annotation.Bean;
  13. import org.springframework.context.annotation.Configuration;
  14. @Configuration
  15. @Slf4j
  16. public class RabbitMqConfig {
  17. @Resource
  18. private QueueConfig queueConfig;
  19. @Resource
  20. private ExchangeConfig exchangeConfig;
  21. /**
  22. * 连接工厂
  23. */
  24. @Resource
  25. private ConnectionFactory connectionFactory;
  26. @Resource
  27. private ProducerCallback producerCallback;
  28. /**
  29. * 指定序列化转换器,否则mq消息存进去
  30. * @return
  31. */
  32. @Bean
  33. public Jackson2JsonMessageConverter messageConverter(){
  34. return new Jackson2JsonMessageConverter();
  35. }
  36. /**
  37. * 将消息队列和交换机进行绑定,指定路由
  38. */
  39. // @Bean
  40. // public Binding bindingDirect() {
  41. // return BindingBuilder.bind(queueConfig.easyQueue()).to(exchangeConfig.directExchange()).with(RabbitMQConstant.ROUTING_KEY_EASY);
  42. // }
  43. //
  44. // @Bean
  45. // public Binding bindingWork() {
  46. // return BindingBuilder.bind(queueConfig.workQueue()).to(exchangeConfig.workExchange()).with(RabbitMQConstant.ROUTING_KEY_WORK);
  47. // }
  48. //
  49. // @Bean
  50. // public Binding bindingTopic() {
  51. // return BindingBuilder.bind(queueConfig.topicQueue()).to(exchangeConfig.topicExchange()).with(RabbitMQConstant.ROUTING_KEY_TOPIC_FIRST);
  52. // }
  53. //
  54. // @Bean
  55. // public Binding bindingTopicSecond() {
  56. // return BindingBuilder.bind(queueConfig.topicQueueSecond()).to(exchangeConfig.topicExchange()).with(RabbitMQConstant.ROUTING_KEY_TOPIC_SECOND);
  57. // }
  58. //
  59. // @Bean
  60. // public Binding bindingFanout() {
  61. // return BindingBuilder.bind(queueConfig.fanoutQueue()).to(exchangeConfig.fanoutExchange());
  62. // }
  63. //
  64. // @Bean
  65. // public Binding bindingFanoutSecond() {
  66. // return BindingBuilder.bind(queueConfig.fanoutQueueSecond()).to(exchangeConfig.fanoutExchange());
  67. // }
  68. /** ======================== 定制一些处理策略 =============================*/
  69. /**
  70. * 定制化amqp模版
  71. * <p>
  72. * Rabbit MQ的消息确认有两种。
  73. * <p>
  74. * 一种是消息发送确认:这种是用来确认生产者将消息发送给交换机,交换机传递给队列过程中,消息是否成功投递。
  75. * 发送确认分两步:一是确认是否到达交换机,二是确认是否到达队列
  76. * <p>
  77. * 第二种是消费接收确认:这种是确认消费者是否成功消费了队列中的消息。
  78. */
  79. @Bean
  80. public RabbitTemplate rabbitTemplate() {
  81. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  82. // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
  83. // rabbitTemplate.setMandatory(true);
  84. /**
  85. * 使用该功能需要开启消息确认,yml需要配置 publisher-confirm-type: correlated:表示消息成功到达Broker后触发ConfirmCalllBack回调
  86. * 通过实现ConfirmCallBack接口,用于实现消息发送到交换机Exchange后接收ack回调
  87. * correlationData 消息唯一标志
  88. * ack 确认结果
  89. *
  90. * cause 失败原因
  91. */
  92. rabbitTemplate.setConfirmCallback(producerCallback);
  93. /**
  94. * 使用该功能需要开启消息返回确认,yml需要配置 publisher-returns: true
  95. * 通过实现ReturnCallback接口,如果消息从交换机发送到对应队列失败时触发
  96. * message 消息主体 message
  97. * replyCode 消息主体 message
  98. * replyText 描述
  99. * exchange 消息使用的交换机
  100. * routingKey 消息使用的路由键
  101. */
  102. // rabbitTemplate.setReturnCallback(new ProducerCallback());
  103. return rabbitTemplate;
  104. }
  105. }