Bläddra i källkod

添加消息幂等处理

tianboguang 2 år sedan
förälder
incheckning
aea3c22f43

+ 6 - 0
src/main/java/com/fdkankan/modeling/entity/BuildLog.java

@@ -32,6 +32,12 @@ public class BuildLog implements Serializable {
     private Long id;
 
     /**
+     * 消息id
+     */
+    @TableField("message_id")
+    private String messageId;
+
+    /**
      * 场景码
      */
     @TableField("scene_num")

+ 15 - 8
src/main/java/com/fdkankan/modeling/receiver/RabbitMqListener.java

@@ -14,7 +14,6 @@ import com.fdkankan.modeling.service.IBuildLogService;
 import com.fdkankan.rabbitmq.bean.BuildSceneCallMessage;
 import com.fdkankan.rabbitmq.bean.BuildSceneResultMqMessage;
 import com.fdkankan.rabbitmq.util.RabbitMqProducer;
-import com.fdkankan.redis.constant.RedisKey;
 import com.fdkankan.redis.util.RedisUtil;
 import com.rabbitmq.client.Channel;
 import lombok.extern.slf4j.Slf4j;
@@ -22,10 +21,8 @@ import org.apache.commons.lang3.time.StopWatch;
 import org.springframework.amqp.core.Message;
 import org.springframework.amqp.rabbit.annotation.Queue;
 import org.springframework.amqp.rabbit.annotation.RabbitListener;
-import org.springframework.amqp.support.AmqpHeaders;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.messaging.handler.annotation.Header;
 import org.springframework.stereotype.Component;
 import org.springframework.util.ObjectUtils;
 
@@ -104,21 +101,21 @@ public class RabbitMqListener {
         }
         // 提前确认消息,以免执行超时重发
         channel.basicAck(deliveryTag, false);
-
+        BuildLog buildLog = new BuildLog();
         log.info("场景计算开始,队列名:{},id:{},deliveryTag:{},消息体:{}", queueModelingCall, correlationId,deliveryTag,msg);
-        this.process(buildSceneMessage);
+        buildLog.setMessageId(correlationId);
+
+        this.process(buildSceneMessage,buildLog);
         log.info("场景计算结束,队列名:{},id:{}", queueModelingCall, correlationId);
 
         //计算完毕,将当前系统构建状态改为false
         SysConstants.SYSTEM_BUILDING = false;
     }
 
-    public void process(BuildSceneCallMessage message) {
+    public void process(BuildSceneCallMessage message,BuildLog buildLog) {
         //开始计时
         StopWatch watch = new StopWatch();
         watch.start();
-
-        BuildLog buildLog = new BuildLog();
         final BuildSceneResultBean buildSceneResult = new BuildSceneResultBean();
         ModelingBuildStatus status = null;
         try {
@@ -262,6 +259,16 @@ public class RabbitMqListener {
         // 获取缓存锁,防止重复消费
         Long building = redisUtil.incr(key, 1);
         buildLogService.save(buildLog);
+
+        // 设置消息id幂等性,防止消息重复消费
+        long messageIdCount = redisUtil.incr(SysConstants.SCENE_BUILDING + buildLog.getMessageId(), 1);
+        if (messageIdCount == 1) {
+            redisUtil.expire(key, SysConstants.modelTimeOut);
+        }else{
+            log.error("服务:{},消息重复消费:{},重复消费次数:{}", SysConstants.hostName, buildLog.getMessageId(),messageIdCount);
+            throw new BuildException(ModelingBuildStatus.REPEAT);
+        }
+
         if (building.compareTo(1L) > 0) {
             log.error("服务:{},重复构建:{},构建次数:{}", SysConstants.hostName, buildLog.getSceneNum(),building);
             throw new BuildException(ModelingBuildStatus.REPEAT);