Pārlūkot izejas kodu

添加消息幂等处理

tianboguang 2 gadi atpakaļ
vecāks
revīzija
6d8e28fce7

+ 2 - 0
src/main/java/com/fdkankan/contro/constant/RedisConstants.java

@@ -4,4 +4,6 @@ public class RedisConstants {
     public static final String FOLDER_FILEID_BUILD = "model-controls:folder:%s";
     public static final String FILEID_FOLDER_BUILD = "model-controls:fileId:%s";
     public static final String FOLDER_LOCK_BUILD = "model-controls:lock:%s";
+    public static final String SCENE_PREPARE_BUILDING = "model-controls:scene:building:prepare:message:";
+    public static final String SCENE_POST_BUILDING = "model-controls:scene:building:post:message:";
 }

+ 27 - 0
src/main/java/com/fdkankan/contro/mq/listener/AbstrackBuildSceneListener.java

@@ -1,12 +1,15 @@
 package com.fdkankan.contro.mq.listener;
 
 import com.alibaba.fastjson.JSONObject;
+import com.fdkankan.contro.constant.RedisConstants;
 import com.fdkankan.contro.mq.service.IBuildSceneService;
 import com.fdkankan.rabbitmq.bean.BuildSceneCallMessage;
 import com.fdkankan.rabbitmq.bean.BuildSceneResultMqMessage;
+import com.fdkankan.redis.util.RedisLockUtil;
 import com.rabbitmq.client.Channel;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.Message;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.util.ObjectUtils;
 
 import java.io.IOException;
@@ -15,9 +18,23 @@ import java.util.HashMap;
 
 @Slf4j
 public class AbstrackBuildSceneListener implements IBuildSceneListener {
+
+    @Autowired
+    private RedisLockUtil redisLockUtil;
+
     @Override
     public void preHandle(Channel channel, String queueName, Message message, IBuildSceneService buildSceneService) throws IOException {
+        // 添加消息幂等处理
         String messageId = message.getMessageProperties().getMessageId();
+        if(!ObjectUtils.isEmpty(messageId)){
+            // 设置消息id幂等性,防止消息重复消费
+            boolean lock = redisLockUtil.lock(RedisConstants.SCENE_PREPARE_BUILDING + messageId, 24 * 3600);
+            if (!lock) {
+                log.error("服务:{},消息重复消费:{}", "常驻服务", messageId);
+                return;
+            }
+        }
+
         String msg = new String(message.getBody(), StandardCharsets.UTF_8);
         log.info("开始准备场景计算资源,队列名:{},id:{},消息体:{}", queueName, messageId, msg);
         BuildSceneCallMessage buildSceneMessage = JSONObject.parseObject(msg, BuildSceneCallMessage.class);
@@ -34,7 +51,17 @@ public class AbstrackBuildSceneListener implements IBuildSceneListener {
 
     @Override
     public void postHandle(Channel channel, String queueName, Message message, IBuildSceneService buildSceneService) throws Exception {
+        // 添加消息幂等处理
         String messageId = message.getMessageProperties().getMessageId();
+        if(!ObjectUtils.isEmpty(messageId)){
+            // 设置消息id幂等性,防止消息重复消费
+            boolean lock = redisLockUtil.lock(RedisConstants.SCENE_POST_BUILDING + messageId, 24 * 3600);
+            if (!lock) {
+                log.error("服务:{},消息重复消费:{}", "常驻服务", messageId);
+                return;
+            }
+        }
+
         String msg = new String(message.getBody(), StandardCharsets.UTF_8);
         log.info("场景计算完成,开始处理场景计算结果,队列名:{},id:{},消息体:{}", queueName, messageId, msg);
         BuildSceneResultMqMessage buildSceneMessage = JSONObject.parseObject(msg, BuildSceneResultMqMessage.class);

+ 6 - 2
src/main/java/com/fdkankan/contro/mq/service/impl/BuildSceneServiceImpl.java

@@ -232,7 +232,7 @@ public class BuildSceneServiceImpl implements IBuildSceneService {
             this.uploadFloorCad(path, sceneCode, uploadFiles);
 
             //上传文件
-            log.info("开始上传场景计算结果数据,num:{}", sceneCode);
+            log.info("开始上传场景计算结果数据,num:{},{}", sceneCode, JSONObject.toJSONString(uploadFiles));
             fYunFileService.uploadMulFiles(uploadFiles);
 
             Map<String,String> damFileHeaders = new HashMap<>();
@@ -241,8 +241,12 @@ public class BuildSceneServiceImpl implements IBuildSceneService {
             if (!fYunFileService.getFyunType().equals(FYunTypeEnum.LOCAL.code())) {
                 // dam 文件设置请求头
                 uploadFiles.entrySet().stream().filter(entry -> FileNameUtil.extName(entry.getKey()).equals("dam"))
-                        .filter(entry -> new File(entry.getKey()).exists())
                         .forEach(entry -> {
+                            // 防止value重复
+                            if (!new File(entry.getKey()).exists()) {
+                                log.error("文件不存在,不予gzip压缩,文件路径:{}", entry.getKey());
+                                return;
+                            }
                             // gzip压缩
                             FileUtil.writeBytes(ZipUtil.gzip(new File(entry.getKey())), entry.getKey() + ".gzip");
                             // 重命名