Sfoglia il codice sorgente

Merge branch 'e57' into test

dengsixing 1 anno fa
parent
commit
7734e35149

+ 2 - 2
src/main/java/com/fdkankan/contro/mq/listener/AbstrackBuildSceneListener.java

@@ -57,7 +57,7 @@ public class AbstrackBuildSceneListener implements IBuildSceneListener {
                 buildSceneMessage.getBuildContext().put("sceneNum",buildSceneMessage.getSceneNum());
             }
             //记录日志
-            sceneBuildProcessLogService.clearSceneBuildProcessLog(num, SceneBuildProcessType.PRE.code(), queueName);
+//            sceneBuildProcessLogService.clearSceneBuildProcessLog(num, SceneBuildProcessType.PRE.code(), queueName);
             sceneBuildProcessLogService.saveSceneBuildProcessLog(num, SceneBuildProcessType.PRE.code(), queueName, CommonOperStatus.WAITING.code(), null);
             buildSceneService.buildScenePre(buildSceneMessage);
             commonService.saveMqSendLog(num, buildSceneMessage);
@@ -89,7 +89,7 @@ public class AbstrackBuildSceneListener implements IBuildSceneListener {
         BuildSceneResultMqMessage buildSceneMessage = JSONObject.parseObject(msg, BuildSceneResultMqMessage.class);
         String num = buildSceneMessage.getBuildContext().get("sceneNum").toString();
         try {
-            sceneBuildProcessLogService.clearSceneBuildProcessLog(num, SceneBuildProcessType.POST.code(), queueName);
+//            sceneBuildProcessLogService.clearSceneBuildProcessLog(num, SceneBuildProcessType.POST.code(), queueName);
             sceneBuildProcessLogService.saveSceneBuildProcessLog(num, SceneBuildProcessType.POST.code(), queueName, CommonOperStatus.WAITING.code(), null);
             buildSceneService.buildScenePost(buildSceneMessage);
             sceneBuildProcessLogService.saveSceneBuildProcessLog(num, SceneBuildProcessType.POST.code(), queueName, CommonOperStatus.SUCCESS.code(), null);

+ 120 - 0
src/main/java/com/fdkankan/contro/mq/listener/BuildE57Listener.java

@@ -0,0 +1,120 @@
+package com.fdkankan.contro.mq.listener;
+
+import cn.hutool.core.exceptions.ExceptionUtil;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.fdkankan.common.constant.CommonOperStatus;
+import com.fdkankan.contro.constant.RedisConstants;
+import com.fdkankan.contro.mq.service.impl.BuildE57SceneServiceImpl;
+import com.fdkankan.contro.service.ICommonService;
+import com.fdkankan.contro.service.ISceneBuildProcessLogService;
+import com.fdkankan.model.constants.SceneBuildProcessType;
+import com.fdkankan.rabbitmq.bean.BuildSceneCallMessage;
+import com.fdkankan.rabbitmq.bean.BuildSceneResultMqMessage;
+import com.fdkankan.redis.util.RedisLockUtil;
+import com.rabbitmq.client.Channel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.Queue;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+
+@Slf4j
+@Component
+public class BuildE57Listener{
+
+    @Value("${queue.modeling.e57.modeling-pre:e57-modeling-pre}")
+    private String queueModelingPre;
+
+    @Value("${queue.modeling.e57.modeling-pre:e57-modeling-post}")
+    private String queueModelingPost;
+
+    @Autowired
+    private RedisLockUtil redisLockUtil;
+
+    @Autowired
+    private ISceneBuildProcessLogService sceneBuildProcessLogService;
+    @Autowired
+    private ICommonService commonService;
+    @Autowired
+    private BuildE57SceneServiceImpl buildSceneService;
+
+
+    /**
+     * 场景计算前置资源准备处理
+     * @param channel
+     * @param message
+     * @throws Exception
+     */
+    @RabbitListener(
+            queuesToDeclare = @Queue("${queue.modeling.e57.modeling-pre:e57-modeling-pre}"),
+            concurrency = "${maxThread.modeling.modeling-pre}"
+    )
+    public void buildScenePreHandler(Channel channel, Message message) throws Exception {
+        String messageId = message.getMessageProperties().getMessageId();
+        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
+        HashMap<String, Object> map = JSON.parseObject(msg, HashMap.class);
+        String num = (String) map.get("num");
+        map.put("bizType", "e57");
+
+        log.info("开始准备e57计算资源,队列名:{},id:{},消息体:{}", queueModelingPre, messageId, msg);
+        BuildSceneCallMessage buildSceneMessage = new BuildSceneCallMessage();
+        buildSceneMessage.setSceneNum(num);
+        buildSceneMessage.setExt(map);
+        buildSceneMessage.setBuildType("V3");
+        try {
+            if(ObjectUtils.isEmpty(buildSceneMessage.getBuildContext())){
+                buildSceneMessage.setBuildContext(new HashMap<>());
+            }
+            if(!ObjectUtils.isEmpty(buildSceneMessage.getSceneNum())){
+                buildSceneMessage.getBuildContext().put("sceneNum",buildSceneMessage.getSceneNum());
+            }
+            //记录日志
+            sceneBuildProcessLogService.saveSceneBuildProcessLog(num, SceneBuildProcessType.PRE.code(), queueModelingPre, CommonOperStatus.WAITING.code(), null);
+            buildSceneService.buildScenePre(buildSceneMessage);
+            commonService.saveMqSendLog(num, buildSceneMessage);
+            sceneBuildProcessLogService.saveSceneBuildProcessLog(num, SceneBuildProcessType.PRE.code(), queueModelingPre, CommonOperStatus.SUCCESS.code(), null);
+        }catch (Exception e){
+            log.error("e57计算前置处理出错,num=" + num, e);
+            sceneBuildProcessLogService.saveSceneBuildProcessLog(num, SceneBuildProcessType.PRE.code(), queueModelingPre, CommonOperStatus.FAILD.code(), ExceptionUtil.stacktraceToString(e, 3000));
+        }
+        log.info("准备e57计算资源完成,队列名:{},id:{},消息体:{}", queueModelingPre, messageId, msg);
+        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+    }
+
+    /**
+     * 场景计算后置结果处理
+     * @param channel
+     * @param message
+     * @throws Exception
+     */
+    @RabbitListener(
+            queuesToDeclare = @Queue("${queue.modeling.e57.modeling-post:e57-modeling-post}"),
+            concurrency = "${maxThread.modeling.modeling-post}"
+    )
+    public void buildScenePostHandler(Channel channel, Message message) throws Exception {
+        String messageId = message.getMessageProperties().getMessageId();
+        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
+        log.info("场景计算完成,开始处理e57计算结果,队列名:{},id:{},消息体:{}", queueModelingPost, messageId, msg);
+        BuildSceneResultMqMessage buildSceneMessage = JSONObject.parseObject(msg, BuildSceneResultMqMessage.class);
+        String num = buildSceneMessage.getBuildContext().get("sceneNum").toString();
+        try {
+//            sceneBuildProcessLogService.clearSceneBuildProcessLog(num, SceneBuildProcessType.POST.code(), queueName);
+            sceneBuildProcessLogService.saveSceneBuildProcessLog(num, SceneBuildProcessType.POST.code(), queueModelingPost, CommonOperStatus.WAITING.code(), null);
+            buildSceneService.buildScenePost(buildSceneMessage);
+            sceneBuildProcessLogService.saveSceneBuildProcessLog(num, SceneBuildProcessType.POST.code(), queueModelingPost, CommonOperStatus.SUCCESS.code(), null);
+        }catch (Exception e){
+            log.error("场景计算结果处理出错,num=" + num, e);
+            sceneBuildProcessLogService.saveSceneBuildProcessLog(num, SceneBuildProcessType.POST.code(), queueModelingPost, CommonOperStatus.FAILD.code(), ExceptionUtil.stacktraceToString(e, 3000));
+        }
+        log.info("场景计算结果处理完成,队列名:{},id:{},消息体:{}", queueModelingPost, messageId, msg);
+        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+
+    }
+}

+ 1 - 1
src/main/java/com/fdkankan/contro/mq/listener/BuildSceneProcessLogListener.java

@@ -48,7 +48,7 @@ public class BuildSceneProcessLogListener{
             int buildStatus = buildSceneMessage.getBuildStatus();
             //新的计算开始,需要将之前的记录置为失效
             if(buildStatus == ModelingBuildStatus.CALCULATING.code()){
-                sceneBuildProcessLogService.clearSceneBuildProcessLog(num, SceneBuildProcessType.CALL.code(),queueModelingCall);
+//                sceneBuildProcessLogService.clearSceneBuildProcessLog(num, SceneBuildProcessType.CALL.code(),queueModelingCall);
                 status = CommonOperStatus.WAITING.code();
             }else{
                 if(buildStatus != ModelingBuildStatus.SUCCESS.code()){

+ 158 - 0
src/main/java/com/fdkankan/contro/mq/service/impl/BuildE57SceneServiceImpl.java

@@ -0,0 +1,158 @@
+package com.fdkankan.contro.mq.service.impl;
+
+import cn.hutool.core.util.ZipUtil;
+import com.fdkankan.common.constant.CommonSuccessStatus;
+import com.fdkankan.contro.entity.ScenePlus;
+import com.fdkankan.contro.entity.ScenePlusExt;
+import com.fdkankan.contro.mq.service.IBuildSceneService;
+import com.fdkankan.contro.service.*;
+import com.fdkankan.fyun.config.FYunFileConfig;
+import com.fdkankan.fyun.face.FYunFileServiceInterface;
+import com.fdkankan.model.constants.ConstantFilePath;
+import com.fdkankan.model.constants.UploadFilePath;
+import com.fdkankan.rabbitmq.bean.BuildSceneCallMessage;
+import com.fdkankan.rabbitmq.bean.BuildSceneResultMqMessage;
+import com.fdkankan.rabbitmq.util.RabbitMqProducer;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.cloud.context.config.annotation.RefreshScope;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * <p>
+ * TODO
+ * </p>
+ *
+ * @author dengsixing
+ * @since 2022/4/20
+ **/
+@Slf4j
+@Service
+@RefreshScope
+public class BuildE57SceneServiceImpl implements IBuildSceneService {
+    public static final String logUrlFormat = "**algorithm-log**: [%sbuild_log/%s/e57/console.log](%sbuild_log/%s/e57/console.log)";
+
+    @Value("${queue.modeling.e57.modeling-post}")
+    private String queueModelingPost;
+    @Value("${model.type:#{null}}")
+    private String modelType;
+    @Value("${env:gn}")
+    private String env;
+    @Value("#{'${build.scene.post.not-delete-nas-nums:}'.split(',')}")
+    private List<String> notDeleteNasNumList;
+    @Value("${queue.modeling.e57.modeling-done:e57-modeling-done}")
+    private String queueE57ModelingDone;
+    @Autowired
+    private RabbitMqProducer mqProducer;
+    @Resource
+    private FYunFileServiceInterface fYunFileService;
+    @Autowired
+    private FYunFileConfig fYunFileConfig;
+    @Autowired
+    private IScenePlusService scenePlusService;
+    @Autowired
+    private IScenePlusExtService scenePlusExtService;
+    @Autowired
+    private IBuildSceneDTService buildSceneDTService;
+    @Autowired
+    private ICommonService commonService;
+    @Autowired
+    private ISceneColdStorageService sceneColdStorageService;
+    @Autowired
+    private IntermitSceneService intermitSceneService;
+
+    @Override
+    public void buildScenePre(BuildSceneCallMessage message) throws Exception{
+        String num = message.getSceneNum();
+        try {
+            ScenePlus scenePlus = scenePlusService.getScenePlusByNum(num);
+            ScenePlusExt scenePlusExt = scenePlusExtService.getScenePlusExtByPlusId(scenePlus.getId());
+            String dataSource = scenePlusExt.getDataSource();
+            String path = dataSource + "_e57";
+            message.setPath(dataSource);
+
+            //根据相机类型,组装资源路径
+            //下载资源到本地
+            this.downLoadSource(message, path);
+
+            //发送mq,就进行计算
+            message.setPath(path);
+            message.setResultReceiverMqName(queueModelingPost);
+
+            log.info("e57计算资源准备结束,场景码:{}", message.getSceneNum());
+
+        }catch (Exception e){
+            log.error("e57计算前置处理出错,num"+num, e);
+            buildSceneDTService.handBaseFail("e57计算资源准备异常!", message.getPath(), message.getSceneNum(), "计算控制服务器");
+            throw e;
+        }
+    }
+
+    private String getOssPath(String path) {
+        String ossPath = ConstantFilePath.OSS_PREFIX
+                + path.replace(ConstantFilePath.BUILD_MODEL_PATH, "")
+                .replace(ConstantFilePath.BUILD_MODEL_LASER_PATH, "");
+        if (!ossPath.endsWith("/")) {
+            ossPath = ossPath.concat("/");
+        }
+        return ossPath;
+    }
+
+    @Override
+    public void downLoadSource(BuildSceneCallMessage buildSceneMqMessage,String path){
+        String ossPath = getOssPath(buildSceneMqMessage.getPath());
+        fYunFileService.downloadFileByCommand(path + "/capture", ossPath);
+    }
+
+    @Override
+    public void buildScenePost(BuildSceneResultMqMessage message) throws Exception {
+        String num = message.getBuildContext().get("sceneNum").toString();
+        String path = message.getPath();
+        String bucket = (String)message.getExt().get("bucket");
+        String ossKeyFormat = (String)message.getExt().get("ossKey");
+        try {
+            // 上传计算日志
+            //如果是重复计算,没有走到计算逻辑,不需要上传日志文件
+            log.info("开始上传计算日志");
+            String buildLogPath = String.format(UploadFilePath.BUILD_LOG_PATH, num) + "e57/";
+            fYunFileService.uploadFile(path + File.separator + "console.log", buildLogPath + "console.log");
+            log.info("计算日志上传完成");
+            Map<String, Object> laserMqContent = new HashMap<>();
+            laserMqContent.put("num", num);
+
+            if (!message.getBuildSuccess()) {
+
+                //发送mq通知激光系统
+                laserMqContent.put("status", CommonSuccessStatus.FAIL.code());
+                mqProducer.sendByWorkQueue(queueE57ModelingDone, laserMqContent);
+
+                // 发送钉钉消息,计算失败
+                String logUrl = String.format(logUrlFormat,fYunFileConfig.getHost(),num,fYunFileConfig.getHost(),num);
+                buildSceneDTService.handModelFail("计算失败", message.getPath(), num, message.getHostName(), logUrl);
+                return;
+            }
+
+            //压缩e57
+            String localPath = path + "/results/laserData/laser.e57";
+            String zipPath = path + "/results/laserData/laser-e57.zip";
+            String ossKey = String.format(ossKeyFormat, num, num);
+            ZipUtil.zip(localPath, zipPath);
+            fYunFileService.uploadFile(bucket, zipPath, ossKey);
+            log.info("e57场景计算结果处理结束,场景码:{}", num);
+
+        }catch (Exception e){
+            log.error("e57场景计算结果处理出错,num"+num, e);
+            buildSceneDTService.handBaseFail("e57场景计算结果处理出错!", message.getPath(), num, "计算控制服务器");
+            throw e;
+        }
+    }
+
+}

+ 2 - 0
src/main/java/com/fdkankan/contro/service/IBuildSceneDTService.java

@@ -12,6 +12,8 @@ public interface IBuildSceneDTService {
 
     void handModelFail(String reason, String serverPath, String num, String hostName);
 
+    void handModelFail(String reason, String serverPath, String num, String hostName, String logUrl);
+
     void handBaseFail(String reason, String serverPath, String num, String hostName);
 
 }

+ 21 - 21
src/main/java/com/fdkankan/contro/service/impl/BuildSceneDTServiceImpl.java

@@ -46,30 +46,30 @@ public class BuildSceneDTServiceImpl implements IBuildSceneDTService {
 
     @Override
     public void handModelFail(String reason, String serverPath, String num, String hostName) {
-        CompletableFuture.runAsync(() -> {
-            try {
-                log.info("开始发送钉钉消息");
-                String logPath = String.format(contentExt,fYunFileConfig.getHost(),num,fYunFileConfig.getHost(),num);
-                log.info("发送钉钉消息,content:{}", logPath);
-                String content = String.format(this.DINGTALK_MSG_PATTERN, this.mainUrl, hostName, reason, num, serverPath) + logPath;
-                log.info("发送钉钉消息,content:{}", content);
-                dingTalkSendUtils.sendActioncardMsgToDingRobot(content,"场景计算失败");
-            } catch (ApiException | UnsupportedEncodingException | NoSuchAlgorithmException | InvalidKeyException apiException) {
-                log.error("发送钉钉消息失败", apiException);
-            }
-        });
+        String logPath = String.format(contentExt,fYunFileConfig.getHost(),num,fYunFileConfig.getHost(),num);
+        this.handModelFail(reason, serverPath, num, hostName, logPath);
+    }
+
+    @Override
+    public void handModelFail(String reason, String serverPath, String num, String hostName, String logPath) {
+        try {
+            log.info("发送钉钉消息,content:{}", logPath);
+            String content = String.format(this.DINGTALK_MSG_PATTERN, this.mainUrl, hostName, reason, num, serverPath) + logPath;
+            log.info("发送钉钉消息,content:{}", content);
+            dingTalkSendUtils.sendActioncardMsgToDingRobot(content,"场景计算失败");
+        } catch (ApiException | UnsupportedEncodingException | NoSuchAlgorithmException | InvalidKeyException apiException) {
+            log.error("发送钉钉消息失败", apiException);
+        }
     }
 
     @Override
     public void handBaseFail(String reason, String serverPath, String num, String hostName) {
-        CompletableFuture.runAsync(() -> {
-            try {
-                String content = String.format(this.DINGTALK_MSG_PATTERN, this.mainUrl, hostName, reason, num, serverPath);
-                log.info("发送钉钉消息,content:{}", content);
-                dingTalkSendUtils.sendActioncardMsgToDingRobot(content,"场景计算失败");
-            } catch (ApiException | UnsupportedEncodingException | NoSuchAlgorithmException | InvalidKeyException apiException) {
-                log.error("发送钉钉消息失败", apiException);
-            }
-        });
+        try {
+            String content = String.format(this.DINGTALK_MSG_PATTERN, this.mainUrl, hostName, reason, num, serverPath);
+            log.info("发送钉钉消息,content:{}", content);
+            dingTalkSendUtils.sendActioncardMsgToDingRobot(content,"场景计算失败");
+        } catch (ApiException | UnsupportedEncodingException | NoSuchAlgorithmException | InvalidKeyException apiException) {
+            log.error("发送钉钉消息失败", apiException);
+        }
     }
 }

+ 5 - 3
src/main/java/com/fdkankan/contro/service/impl/SceneBuildProcessLogServiceImpl.java

@@ -7,6 +7,7 @@ import com.fdkankan.contro.service.ISceneBuildProcessLogService;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import org.springframework.stereotype.Service;
 
+import java.util.Date;
 import java.util.Objects;
 
 /**
@@ -14,7 +15,7 @@ import java.util.Objects;
  * 场景计算流程状态表 服务实现类
  * </p>
  *
- * @author 
+ * @author
  * @since 2023-01-28
  */
 @Service
@@ -25,8 +26,8 @@ public class SceneBuildProcessLogServiceImpl extends ServiceImpl<ISceneBuildProc
     public void clearSceneBuildProcessLog(String num, String process, String queueName) {
         this.remove(new LambdaQueryWrapper<SceneBuildProcessLog>()
                 .eq(SceneBuildProcessLog::getNum, num)
-                .eq(SceneBuildProcessLog::getProcess, process)
-                .eq(SceneBuildProcessLog::getQueueName, queueName));
+                .eq(SceneBuildProcessLog::getProcess, process));
+//                .eq(SceneBuildProcessLog::getQueueName, queueName));
     }
 
     @Override
@@ -45,6 +46,7 @@ public class SceneBuildProcessLogServiceImpl extends ServiceImpl<ISceneBuildProc
         log.setQueueName(queueName);
         log.setState(status);
         log.setReason(reason);
+        log.setUpdateTime(new Date());
         this.saveOrUpdate(log);
     }
 }