Quellcode durchsuchen

计算流程优化改造

dengsixing vor 2 Monaten
Ursprung
Commit
15cc33389b

+ 14 - 1
src/main/java/com/fdkankan/contro/ModelingControlApplication.java

@@ -1,7 +1,9 @@
 package com.fdkankan.contro;
 
 import com.dtflys.forest.springboot.annotation.ForestScan;
+import com.fdkankan.redis.util.RedisUtil;
 import org.mybatis.spring.annotation.MapperScan;
+import org.springframework.boot.CommandLineRunner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
@@ -9,6 +11,8 @@ import org.springframework.context.annotation.ComponentScan;
 import org.springframework.scheduling.annotation.EnableAsync;
 import org.springframework.scheduling.annotation.EnableScheduling;
 
+import javax.annotation.Resource;
+
 /**
  * <p>
  * </p>
@@ -23,10 +27,19 @@ import org.springframework.scheduling.annotation.EnableScheduling;
 @MapperScan("com.fdkankan.**.mapper")
 @EnableDiscoveryClient
 @ForestScan(basePackages = "com.fdkankan.contro.httpclient")
-public class ModelingControlApplication {
+public class ModelingControlApplication implements CommandLineRunner {
+
+    @Resource
+    private RedisUtil redisUtil;
+
     public static void main(String[] args) {
         SpringApplication.run(ModelingControlApplication.class, args);
     }
+
+    @Override
+    public void run(String... args) throws Exception {
+//        redisUtil.del("push-modeling-pre");
+    }
 }
 
 

+ 13 - 4
src/main/java/com/fdkankan/contro/mq/listener/BuildSceneProcessLogListener.java

@@ -1,13 +1,16 @@
 package com.fdkankan.contro.mq.listener;
 
+import cn.hutool.core.util.StrUtil;
 import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import com.fdkankan.common.constant.CommonOperStatus;
 import com.fdkankan.common.constant.ModelingBuildStatus;
+import com.fdkankan.contro.entity.OrigFileUploadBatch;
 import com.fdkankan.contro.service.IJmgaService;
+import com.fdkankan.contro.service.IOrigFileUploadBatchService;
 import com.fdkankan.contro.service.ISceneBuildProcessLogService;
 import com.fdkankan.contro.service.IScenePlusService;
 import com.fdkankan.model.constants.SceneBuildProcessType;
-import com.fdkankan.rabbitmq.bean.BuildSceneProcessLogMessage;
 import com.rabbitmq.client.Channel;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.Message;
@@ -37,6 +40,8 @@ public class BuildSceneProcessLogListener{
     private IScenePlusService scenePlusService;
     @Autowired
     private IJmgaService jmgaService;
+    @Autowired
+    private IOrigFileUploadBatchService origFileUploadBatchService;
 
     /**
      * 场景计算状态日志记录
@@ -49,16 +54,20 @@ public class BuildSceneProcessLogListener{
     )
     public void buildScenePreHandler(Channel channel, Message message) throws Exception {
         String msg = new String(message.getBody(), StandardCharsets.UTF_8);
-        BuildSceneProcessLogMessage buildSceneMessage = JSONObject.parseObject(msg, BuildSceneProcessLogMessage.class);
-        String num = buildSceneMessage.getNum();
+        JSONObject buildSceneMessage = JSONObject.parseObject(msg);
+        String num = buildSceneMessage.getString("num");
         try {
             String reason = null;
             int status = CommonOperStatus.SUCCESS.code();
-            int buildStatus = buildSceneMessage.getBuildStatus();
+            int buildStatus = buildSceneMessage.getInteger("buildStatus");
+            String batchIds = buildSceneMessage.getString("batchId");
 
             //新的计算开始,需要将之前的记录置为失效
             if(buildStatus == ModelingBuildStatus.CALCULATING.code()){
                 sceneBuildProcessLogService.clearSceneBuildProcessLog(num, SceneBuildProcessType.CALL.code(),queueModelingCall);
+                if(StrUtil.isNotEmpty(batchIds)){
+                    origFileUploadBatchService.update(new LambdaUpdateWrapper<OrigFileUploadBatch>().set(OrigFileUploadBatch::getStatus, 2).in(OrigFileUploadBatch::getBatchId, batchIds.split(",")));
+                }
                 status = CommonOperStatus.WAITING.code();
                 Map<String, Object> param = new HashMap<>();
                 try {

+ 1 - 1
src/main/java/com/fdkankan/contro/mq/service/JmgaService.java

@@ -1,6 +1,6 @@
 package com.fdkankan.contro.mq.service;
 
-public interface JmgaService {
+public interface CallBuildService {
 
     void pushPre();
 

+ 78 - 92
src/main/java/com/fdkankan/contro/mq/service/impl/BuildSceneProgressServiceImpl.java

@@ -1,24 +1,13 @@
 package com.fdkankan.contro.mq.service.impl;
 
-import cn.hutool.core.bean.BeanUtil;
-import cn.hutool.core.io.FileUtil;
 import cn.hutool.core.io.watch.WatchMonitor;
 import cn.hutool.core.io.watch.Watcher;
-import cn.hutool.core.thread.ThreadUtil;
-import cn.hutool.core.util.StrUtil;
 import cn.hutool.http.HttpUtil;
 import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
-import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
-import com.fdkankan.common.constant.*;
-import com.fdkankan.common.util.FileUtils;
-import com.fdkankan.contro.entity.ScenePlus;
-import com.fdkankan.contro.entity.ScenePlusExt;
 import com.fdkankan.contro.mq.service.IBuildSceneProgressService;
 import com.fdkankan.contro.service.IScenePlusExtService;
 import com.fdkankan.contro.service.IScenePlusService;
 import com.fdkankan.rabbitmq.bean.BuildSceneCallMessage;
-import com.fdkankan.redis.constant.RedisKey;
 import com.fdkankan.redis.util.RedisUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -27,11 +16,9 @@ import org.springframework.cloud.context.config.annotation.RefreshScope;
 import org.springframework.stereotype.Service;
 
 import java.io.File;
-import java.math.BigDecimal;
 import java.nio.file.Path;
 import java.nio.file.WatchEvent;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.Map;
 
 @RefreshScope
 @Slf4j
@@ -53,84 +40,83 @@ public class BuildSceneProgressServiceImpl implements IBuildSceneProgressService
 
     @Override
     public void monitorProgress(BuildSceneCallMessage buildSceneCallMessage) {
-        String num = buildSceneCallMessage.getSceneNum();
-        String customUserId = (String)buildSceneCallMessage.getExt().get("customUserId");
-        String gps = (String)buildSceneCallMessage.getExt().get("gps");
-        String path = buildSceneCallMessage.getPath();
-
-        ScenePlus scenePlus = scenePlusService.getScenePlusByNum(num);
-        ScenePlusExt scenePlusExt = scenePlusExtService.getScenePlusExtByPlusId(scenePlus.getId());
-        String website = scenePlusExt.getWebSite();
-        String title = scenePlus.getTitle();
-
-        Long factor = 10L;
-        Long totalTime = 300L;
-        Integer shootCount = scenePlusExt.getShootCount();
-        if(Objects.nonNull(shootCount)){
-            totalTime += shootCount*7*60;//预估7分钟一个点位
-        }
-        redisUtil.set(String.format(RedisKey.SCENE_BUILD_EXPECT_TOTAL_TIME_NUM, num), String.valueOf(totalTime));
-        Long intervalTime = totalTime/factor;//发送计算进度时间窗口
-        log.info("推送进度时间窗口:{}", intervalTime);
-
-        Map<String, Object> params = new HashMap<>();
-        params.put("website", mainUrl.concat(website));
-        params.put("title", title);
-        params.put("customUserId", customUserId);
-        params.put("gps", gps);
-        params.put("totalTime", totalTime);
-        params.put("progress", 0);
-        params.put("status", CommonSuccessStatus.WAITING.code());
-        log.info("场景计算开始,发送进度请求,url:{}, param:{}", buildProgressUrl, JSON.toJSONString(params));
-        HttpUtil.post(buildProgressUrl, JSON.toJSONString(params), 2000);
-
-        ExecutorService executorService = ThreadUtil.newSingleExecutor();
-        executorService.submit(()->{
-            boolean finish = false;
-            int mainProgress = 0;
-            do {
-                try {
-                    Thread.sleep(intervalTime*1000);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-                String finishStr = redisUtil.get(String.format(RedisKey.SCENE_BUILD_FINISH_NUM, num));
-                if(StrUtil.isEmpty(finishStr)){
-                    finishStr = "0";
-                }
-                CommonSuccessStatus commonSuccessStatus = CommonSuccessStatus.get(Integer.valueOf(finishStr));
-                switch (commonSuccessStatus){
-                    case SUCCESS:
-                        //计算结果处理消费者消费完毕后,会发送一次进度为100的消息,这里就不需要再做任务操作,
-                        finish = true;
-                        break;
-                    case FAIL:
-                        finish = true;
-                        params.put("progress", mainProgress);
-                        params.put("status", commonSuccessStatus.code());
-                        log.info("场景计算失败,发送进度请求,url:{}, param:{}", buildProgressUrl, JSON.toJSONString(params));
-                        this.sendNotice(params);
-                        break;
-                    case WAITING:
-                        mainProgress += factor;
-                        log.info("mainProgress:{}", mainProgress);
-                        //如果预估的时间比实际的时间要慢,那么这里的进度条会草超过100,所以当超过100时,不需要再发送进度了,只需要等计算结果处理监听中的计算完毕去发送100即可
-                        if(mainProgress >= 100){
-                            finish = true;
-                        }else{
-                            params.put("progress", mainProgress);
-                            params.put("status", commonSuccessStatus.code());
-                            log.info("场景计算进行中,发送进度请求,url:{}, param:{}", buildProgressUrl, JSON.toJSONString(params));
-                            this.sendNotice(params);
-                        }
-                        break;
-                }
-            }while (!finish);
-
-            log.info("场景计算完成,删除计算完成redis记录,key:{}", String.format(RedisKey.SCENE_BUILD_FINISH_NUM, num));
-            redisUtil.del(String.format(RedisKey.SCENE_BUILD_FINISH_NUM, num));
-            log.info("推送计算进度结束,num:{}", num);
-        });
+//        String num = buildSceneCallMessage.getSceneNum();
+//        String customUserId = (String)buildSceneCallMessage.getExt().get("customUserId");
+//        String gps = (String)buildSceneCallMessage.getExt().get("gps");
+//        String path = buildSceneCallMessage.getPath();
+//
+//        ScenePlus scenePlus = scenePlusService.getScenePlusByNum(num);
+//        ScenePlusExt scenePlusExt = scenePlusExtService.getScenePlusExtByPlusId(scenePlus.getId());
+//        String website = scenePlusExt.getWebSite();
+//        String title = scenePlus.getTitle();
+//
+//        Long factor = 10L;
+//        Long totalTime = 300L;
+//        Integer shootCount = scenePlusExt.getShootCount();
+//        if(Objects.nonNull(shootCount)){
+//            totalTime += shootCount*7*60;//预估7分钟一个点位
+//        }
+//        redisUtil.set(String.format(RedisKey.SCENE_BUILD_EXPECT_TOTAL_TIME_NUM, num), String.valueOf(totalTime));
+//        Long intervalTime = totalTime/factor;//发送计算进度时间窗口
+//        log.info("推送进度时间窗口:{}", intervalTime);
+//
+//        Map<String, Object> params = new HashMap<>();
+//        params.put("website", mainUrl.concat(website));
+//        params.put("title", title);
+//        params.put("customUserId", customUserId);
+//        params.put("gps", gps);
+//        params.put("totalTime", totalTime);
+//        params.put("progress", 0);
+//        params.put("status", CommonSuccessStatus.WAITING.code());
+//        log.info("场景计算开始,发送进度请求,url:{}, param:{}", buildProgressUrl, JSON.toJSONString(params));
+//        HttpUtil.post(buildProgressUrl, JSON.toJSONString(params), 2000);
+//
+//        ExecutorService executorService = ThreadUtil.newSingleExecutor();
+//        executorService.submit(()->{
+//            boolean finish = false;
+//            int mainProgress = 0;
+//            do {
+//                try {
+//                    Thread.sleep(intervalTime*1000);
+//                } catch (InterruptedException e) {
+//                    e.printStackTrace();
+//                }
+//                String finishStr = redisUtil.get(String.format(RedisKey.SCENE_BUILD_FINISH_NUM, num));
+//                if(StrUtil.isEmpty(finishStr)){
+//                    finishStr = "0";
+//                }
+//                CommonSuccessStatus commonSuccessStatus = CommonSuccessStatus.get(Integer.valueOf(finishStr));
+//                switch (commonSuccessStatus){
+//                    case SUCCESS:
+//                        //计算结果处理消费者消费完毕后,会发送一次进度为100的消息,这里就不需要再做任务操作,
+//                        finish = true;
+//                        break;
+//                    case FAIL:
+//                        finish = true;
+//                        params.put("progress", mainProgress);
+//                        params.put("status", commonSuccessStatus.code());
+//                        log.info("场景计算失败,发送进度请求,url:{}, param:{}", buildProgressUrl, JSON.toJSONString(params));
+//                        this.sendNotice(params);
+//                        break;
+//                    case WAITING:
+//                        mainProgress += factor;
+//                        log.info("mainProgress:{}", mainProgress);
+//                        //如果预估的时间比实际的时间要慢,那么这里的进度条会草超过100,所以当超过100时,不需要再发送进度了,只需要等计算结果处理监听中的计算完毕去发送100即可
+//                        if(mainProgress >= 100){
+//                            finish = true;
+//                        }else{
+//                            params.put("progress", mainProgress);
+//                            params.put("status", commonSuccessStatus.code());
+//                            log.info("场景计算进行中,发送进度请求,url:{}, param:{}", buildProgressUrl, JSON.toJSONString(params));
+//                            this.sendNotice(params);
+//                        }
+//                        break;
+//                }
+//            }while (!finish);
+//
+//            redisUtil.del(String.format(RedisKey.SCENE_BUILD_FINISH_NUM, num));
+//            log.info("推送计算进度结束,num:{}", num);
+//        });
 
     }
 

+ 40 - 37
src/main/java/com/fdkankan/contro/mq/service/impl/BuildSceneServiceImpl.java

@@ -35,7 +35,6 @@ import com.fdkankan.push.utils.PushMsgUtil;
 import com.fdkankan.rabbitmq.bean.BuildSceneCallMessage;
 import com.fdkankan.rabbitmq.bean.BuildSceneResultMqMessage;
 import com.fdkankan.rabbitmq.util.RabbitMqProducer;
-import com.fdkankan.redis.constant.RedisKey;
 import com.fdkankan.redis.util.RedisUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.ObjectUtils;
@@ -157,12 +156,17 @@ public class BuildSceneServiceImpl implements IBuildSceneService {
         try {
 
             //如果场景原始资源上传批次id不为空,则需要下载批次文件上传到oss目录
-            if(StrUtil.isNotEmpty(threeCamType) && "yzl".equals(threeCamType)){
-                this.downloadOrigFile4Yzl(batchIds, message.getPath());
-            }else{
-                this.downloadOrigFile(batchIds, message.getPath());
+            if(StrUtil.isNotEmpty(batchIds)){
+                for (String batchId : batchIds.split(",")) {
+                    if(StrUtil.isNotEmpty(threeCamType) && "yzl".equals(threeCamType)){
+                        this.downloadOrigFile4Yzl(batchId, message.getPath());
+                    }else{
+                        this.downloadOrigFile(batchId, message.getPath());
+                    }
+                }
             }
 
+
             //重新计算时需要删除文件夹,否知使用缓存
             if(new File(message.getPath() + File.separator + "results").exists()){
                 FileUtils.deleteDirectory(message.getPath() + File.separator + "results");
@@ -253,6 +257,10 @@ public class BuildSceneServiceImpl implements IBuildSceneService {
 
             mqProducer.sendByWorkQueue(queueModelingCall, message);
 
+            if(StrUtil.isNotEmpty(batchIds)){
+                origFileUploadBatchService.update(new LambdaUpdateWrapper<OrigFileUploadBatch>().set(OrigFileUploadBatch::getStatus, 1).in(OrigFileUploadBatch::getBatchId, batchIds.split(",")));
+            }
+
             log.info("场景计算资源准备结束,场景码:{}", message.getSceneNum());
 
         }catch (Exception e){
@@ -378,6 +386,8 @@ public class BuildSceneServiceImpl implements IBuildSceneService {
         String sceneCode = message.getBuildContext().get("sceneNum").toString();
         String path = message.getPath();
         String batchIds = (String) message.getExt().get("batchId");
+        Long count = redisUtil.decr("modeling-count:" + sceneCode, 1);
+        log.info("场景:{},剩余计算次数:{}", sceneCode, count);
         try {
 
             //如果场景被删除,就无需往下运行了
@@ -397,36 +407,29 @@ public class BuildSceneServiceImpl implements IBuildSceneService {
             JSONObject fdageData = getFdageData(path + File.separator + "capture" +File.separator+"data.fdage");
             String uuid = fdageData.getString("creator") + "_" + fdageData.getString("uuidtime");
 
-            OrigFileUploadBatch origFileUploadBatch = null;
-            if(StrUtil.isNotEmpty(batchId)){
-                OrigFileUploadBatch condition = new OrigFileUploadBatch();
-                condition.setBatchId(batchId);
-                origFileUploadBatch = origFileUploadBatchService.getByCondition(condition);
-            }
 
             if (!message.getBuildSuccess()) {
                 log.error("建模失败,修改状态为失败状态");
-                scenePlusService.update(new LambdaUpdateWrapper<ScenePlus>()
-                        .set(ScenePlus::getSceneStatus, SceneStatus.FAILD.code())
-                        .eq(ScenePlus::getNum, sceneCode));
-
-                this.sendFailToLaser(sceneCode);
-
-                redisUtil.set(String.format(RedisKey.SCENE_BUILD_FINISH_NUM, sceneCode), "-1");
-
-
-                Map<String, Object> param = new HashMap<>();
-                try {
-                    param.put("event_type", "计算失败");
-                    param.put("event_content", "算法报错");
-                    param.put("scene_num", sceneCode);
-                    param.put("event_time", new Date());
-                    param.put("ryid", scenePlusService.getRyIdByNum(sceneCode));
-                    jmgaService.sendStatus(param);
-                }catch (Exception e){
-                    log.info("推送事件失败,param:{}", param);
+                if(count < 1){
+                    scenePlusService.update(new LambdaUpdateWrapper<ScenePlus>()
+                            .set(ScenePlus::getSceneStatus, SceneStatus.FAILD.code())
+                            .eq(ScenePlus::getNum, sceneCode));
+
+                    this.sendFailToLaser(sceneCode);
+
+                    Map<String, Object> param = new HashMap<>();
+                    try {
+                        param.put("event_type", "计算失败");
+                        param.put("event_content", "算法报错");
+                        param.put("scene_num", sceneCode);
+                        param.put("event_time", new Date());
+                        param.put("ryid", scenePlusService.getRyIdByNum(sceneCode));
+                        jmgaService.sendStatus(param);
+                    }catch (Exception e){
+                        log.info("推送事件失败,param:{}", param);
+                    }
                 }
-
+//                redisUtil.set(String.format(RedisKey.SCENE_BUILD_FINISH_NUM, sceneCode), "-1");
                 return;
             }
 
@@ -578,11 +581,6 @@ public class BuildSceneServiceImpl implements IBuildSceneService {
             scenePlusService.updateById(scenePlus);
             scenePlusExtService.updateById(scenePlusExt);
 
-            if(Objects.nonNull(origFileUploadBatch)){
-                origFileUploadBatch.setStatus(3);
-                origFileUploadBatchService.updateById(origFileUploadBatch);
-            }
-
             //国际环境需要发邮件通知
             if("eur".equals(env)){
                 commonService.sendEmail(sceneCode);
@@ -594,7 +592,7 @@ public class BuildSceneServiceImpl implements IBuildSceneService {
             //推送ai识别平面图mq
             this.sendMqForAiPano(sceneCode);
 
-            redisUtil.set(String.format(RedisKey.SCENE_BUILD_FINISH_NUM, sceneCode), "1");
+//            redisUtil.set(String.format(RedisKey.SCENE_BUILD_FINISH_NUM, sceneCode), "1");
 
             log.info("场景计算结果处理结束,场景码:{}", sceneCode);
 
@@ -637,6 +635,11 @@ public class BuildSceneServiceImpl implements IBuildSceneService {
             sceneStatusParam.put("num", sceneCode);
             sceneStatusParam.put("status", message.getBuildSuccess() ? 1 : -1);
 //            commonService.sendUpdateSceneStatusMqToQueues(sceneStatusParam);
+
+            if(StrUtil.isNotEmpty(batchIds)){
+                origFileUploadBatchService.update(new LambdaUpdateWrapper<OrigFileUploadBatch>().set(OrigFileUploadBatch::getStatus, 3).in(OrigFileUploadBatch::getBatchId, batchIds.split(",")));
+            }
+
         }
     }
 

+ 123 - 0
src/main/java/com/fdkankan/contro/mq/service/impl/CallBuildServiceImpl.java

@@ -0,0 +1,123 @@
+package com.fdkankan.contro.mq.service.impl;
+
+import cn.hutool.core.collection.CollUtil;
+import com.alibaba.fastjson.JSON;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.fdkankan.contro.entity.OrigFileUploadBatch;
+import com.fdkankan.contro.entity.ScenePlus;
+import com.fdkankan.contro.mq.service.CallBuildService;
+import com.fdkankan.contro.service.IOrigFileUploadBatchService;
+import com.fdkankan.contro.service.IOrigFileUploadService;
+import com.fdkankan.contro.service.IScenePlusService;
+import com.fdkankan.rabbitmq.bean.BuildSceneCallMessage;
+import com.fdkankan.rabbitmq.util.RabbitMqProducer;
+import com.fdkankan.redis.util.RedisLockUtil;
+import com.fdkankan.redis.util.RedisUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.util.*;
+
+@Slf4j
+@Service
+public class CallBuildServiceImpl implements CallBuildService {
+
+    @Autowired
+    private IOrigFileUploadBatchService origFileUploadBatchService;
+    @Resource
+    private RedisLockUtil redisLockUtil;
+    @Autowired
+    private IScenePlusService scenePlusService;
+    @Autowired
+    private IOrigFileUploadService origFileUploadService;
+    @Resource
+    private RabbitMqProducer rabbitMqProducer;
+    @Value("${queue.modeling.modeling-pre}")
+    private String queueModelingPre;
+    @Resource
+    private RedisUtil redisUtil;
+
+
+    @Override
+    public void pushPre() {
+
+        log.info("推送计算定时任务开始");
+
+        List<OrigFileUploadBatch> list = origFileUploadBatchService.list(
+                new LambdaQueryWrapper<OrigFileUploadBatch>()
+                        .eq(OrigFileUploadBatch::getStatus, 4)
+                        .orderByAsc(OrigFileUploadBatch::getCreateTime));
+        if(CollUtil.isEmpty(list)){
+            return;
+        }
+        LinkedHashMap<String, LinkedList<OrigFileUploadBatch>> map = new LinkedHashMap<>();
+        for (OrigFileUploadBatch origFileUploadBatch : list) {
+            String uuid = origFileUploadBatch.getUuid();
+            LinkedList<OrigFileUploadBatch> origFileUploadBatches = map.get(uuid);
+            if(origFileUploadBatches == null){
+                origFileUploadBatches = new LinkedList<>();
+                map.put(uuid, origFileUploadBatches);
+            }
+            origFileUploadBatches.add(origFileUploadBatch);
+
+        }
+        for (String uuid : map.keySet()) {
+            ScenePlus scenePlus = scenePlusService.getByUuid(uuid);
+            boolean lock = redisLockUtil.lock("push-modeling-pre:" + scenePlus.getNum(), scenePlus.getNum(), 30*60*1000);
+            if(!lock){
+                continue;
+            }
+            try {
+                //过滤出同一个场景且待推送的上传批次
+                Set<String> batchIds = new HashSet<>();
+                StringBuilder batchIdStr = new StringBuilder();
+                for (OrigFileUploadBatch fileUploadBatch : map.get(uuid)) {
+                    if(uuid.equals(fileUploadBatch.getUuid())){
+                        batchIds.add(fileUploadBatch.getBatchId());
+                        batchIdStr.append(",").append(fileUploadBatch.getBatchId());
+                    }
+                }
+                BuildSceneCallMessage mqMessage = JSON.parseObject(map.get(uuid).getLast().getMqContent(), BuildSceneCallMessage.class);
+                mqMessage.getExt().put("batchId", batchIdStr.substring(1));
+                rabbitMqProducer.sendByWorkQueue(queueModelingPre, mqMessage);
+
+                long incr = redisUtil.incr("modeling-count:" + scenePlus.getNum(), 1);
+                log.info("场景:{},计算推送次数:{}", scenePlus.getNum(), incr);
+
+            }catch (Exception e){
+                log.error("推送场景计算失败,uuid:{}", uuid, e);
+            }finally {
+                redisLockUtil.unlock("push-modeling-pre:" + scenePlus.getNum(), scenePlus.getNum());
+            }
+
+        }
+
+        log.info("推送计算定时任务结束");
+    }
+
+    public static void main(String[] args) {
+        List<OrigFileUploadBatch> list = new ArrayList<>();
+        OrigFileUploadBatch a = new OrigFileUploadBatch();
+        a.setUuid("123");
+        OrigFileUploadBatch b = new OrigFileUploadBatch();
+        b.setUuid("123");
+        OrigFileUploadBatch c = new OrigFileUploadBatch();
+        c.setUuid("123");
+        list.add(a);
+        list.add(b);
+        list.add(c);
+        for (OrigFileUploadBatch origFileUploadBatch : list) {
+            String uuid = origFileUploadBatch.getUuid();
+            for (OrigFileUploadBatch fileUploadBatch : list) {
+                if(uuid.equals(fileUploadBatch.getUuid())){
+                    list.remove(fileUploadBatch);
+                }
+            }
+        }
+
+
+    }
+}

+ 0 - 95
src/main/java/com/fdkankan/contro/mq/service/impl/JmgaServiceImpl.java

@@ -1,95 +0,0 @@
-package com.fdkankan.contro.mq.service.impl;
-
-import cn.hutool.core.collection.CollUtil;
-import com.alibaba.fastjson.JSON;
-import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
-import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
-import com.fdkankan.common.constant.SceneStatus;
-import com.fdkankan.contro.entity.OrigFileUploadBatch;
-import com.fdkankan.contro.entity.ScenePlus;
-import com.fdkankan.contro.mq.service.JmgaService;
-import com.fdkankan.contro.service.IOrigFileUploadBatchService;
-import com.fdkankan.contro.service.IOrigFileUploadService;
-import com.fdkankan.contro.service.IScenePlusService;
-import com.fdkankan.rabbitmq.bean.BuildSceneCallMessage;
-import com.fdkankan.rabbitmq.util.RabbitMqProducer;
-import com.fdkankan.redis.util.RedisLockUtil;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Service;
-
-import javax.annotation.Resource;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-@Slf4j
-@Service
-public class JmgaServiceImpl implements JmgaService {
-
-    @Autowired
-    private IOrigFileUploadBatchService origFileUploadBatchService;
-    @Resource
-    private RedisLockUtil redisLockUtil;
-    @Autowired
-    private IScenePlusService scenePlusService;
-    @Autowired
-    private IOrigFileUploadService origFileUploadService;
-    @Resource
-    private RabbitMqProducer rabbitMqProducer;
-    @Value("${queue.modeling.modeling-pre}")
-    private String queueModelingPre;
-
-
-    @Override
-    public void pushPre() {
-
-        log.info("推送计算定时任务开始");
-        boolean lock = redisLockUtil.lock("push-modeling-pre", 1800);
-        try {
-            if(!lock){
-                return;
-            }
-            log.info("获取到分布式锁");
-
-            List<OrigFileUploadBatch> list = origFileUploadBatchService.list(
-                    new LambdaQueryWrapper<OrigFileUploadBatch>()
-                            .eq(OrigFileUploadBatch::getStatus, 4)
-                            .orderByAsc(OrigFileUploadBatch::getCreateTime));
-            if(CollUtil.isEmpty(list)){
-                return;
-            }
-            for (OrigFileUploadBatch origFileUploadBatch : list) {
-                String uuid = origFileUploadBatch.getUuid();
-                try {
-                    ScenePlus scenePlus = scenePlusService.getByUuid(uuid);
-                    //如果场景计算中,跳过,等待下一次定时任务
-                    if(scenePlus.getSceneStatus() == SceneStatus.wait.code().intValue()){
-                        continue;
-                    }
-                    //过滤出同一个场景且待推送的上传批次
-                    Set<String> batchIds = list.stream().filter(v -> uuid.equals(v.getUuid())).map(v -> v.getBatchId()).collect(Collectors.toSet());
-                    BuildSceneCallMessage mqMessage = JSON.parseObject(origFileUploadBatch.getMqContent(), BuildSceneCallMessage.class);
-                    StringBuilder batchIdStr = new StringBuilder();
-                    for (String batchId : batchIds) {
-                        batchIdStr.append(",").append(batchId);
-                    }
-                    mqMessage.getExt().put("batchId", batchIdStr.substring(1));
-                    rabbitMqProducer.sendByWorkQueue(queueModelingPre, mqMessage);
-                    origFileUploadBatchService.update(new LambdaUpdateWrapper<OrigFileUploadBatch>().set(OrigFileUploadBatch::getStatus, 4).in(OrigFileUploadBatch::getBatchId, batchIds));
-                }catch (Exception e){
-                    log.error("推送场景计算失败,uuid:{}", uuid, e);
-                }
-
-            }
-
-        }catch (Exception e){
-            log.info("推送计算定时任务报错");
-        }finally {
-            redisLockUtil.unlock("push-modeling-pre");
-        }
-
-        log.info("推送计算定时任务结束");
-    }
-}

+ 3 - 3
src/main/java/com/fdkankan/contro/schedule/ScheduleJob.java

@@ -1,6 +1,6 @@
 package com.fdkankan.contro.schedule;
 
-import com.fdkankan.contro.mq.service.JmgaService;
+import com.fdkankan.contro.mq.service.CallBuildService;
 import com.fdkankan.contro.service.IScene3dNumService;
 import lombok.extern.log4j.Log4j2;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -14,7 +14,7 @@ public class ScheduleJob {
     @Autowired
     private IScene3dNumService scene3dNumService;
     @Autowired
-    private JmgaService jmgaService;
+    private CallBuildService callBuildService;
 
 
     /**
@@ -30,6 +30,6 @@ public class ScheduleJob {
      */
     @Scheduled(fixedDelay = 5*60*1000, initialDelay = 1000)
     public void pushPre() {
-        jmgaService.pushPre();
+        callBuildService.pushPre();
     }
 }

+ 1 - 0
src/main/java/com/fdkankan/contro/service/impl/SceneFileBuildServiceImpl.java

@@ -1419,6 +1419,7 @@ public class SceneFileBuildServiceImpl extends ServiceImpl<ISceneFileBuildMapper
         }else{
             rabbitMqProducer.sendByWorkQueue(queueModelingPre, message);
         }
+        redisUtil.incr("modeling-count:" + num, 1);
 
         scenePlusService.update(new LambdaUpdateWrapper<ScenePlus>()
             .set(ScenePlus::getSceneStatus, SceneStatus.wait.code())