|
@@ -0,0 +1,95 @@
|
|
|
|
+package com.fdkankan.contro.mq.service.impl;
|
|
|
|
+
|
|
|
|
+import cn.hutool.core.collection.CollUtil;
|
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
|
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
|
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
|
|
|
+import com.fdkankan.common.constant.SceneStatus;
|
|
|
|
+import com.fdkankan.contro.entity.OrigFileUploadBatch;
|
|
|
|
+import com.fdkankan.contro.entity.ScenePlus;
|
|
|
|
+import com.fdkankan.contro.mq.service.JmgaService;
|
|
|
|
+import com.fdkankan.contro.service.IOrigFileUploadBatchService;
|
|
|
|
+import com.fdkankan.contro.service.IOrigFileUploadService;
|
|
|
|
+import com.fdkankan.contro.service.IScenePlusService;
|
|
|
|
+import com.fdkankan.rabbitmq.bean.BuildSceneCallMessage;
|
|
|
|
+import com.fdkankan.rabbitmq.util.RabbitMqProducer;
|
|
|
|
+import com.fdkankan.redis.util.RedisLockUtil;
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
|
+
|
|
|
|
+import javax.annotation.Resource;
|
|
|
|
+import java.util.List;
|
|
|
|
+import java.util.Set;
|
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
+
|
|
|
|
+@Slf4j
|
|
|
|
+@Service
|
|
|
|
+public class JmgaServiceImpl implements JmgaService {
|
|
|
|
+
|
|
|
|
+ @Autowired
|
|
|
|
+ private IOrigFileUploadBatchService origFileUploadBatchService;
|
|
|
|
+ @Resource
|
|
|
|
+ private RedisLockUtil redisLockUtil;
|
|
|
|
+ @Autowired
|
|
|
|
+ private IScenePlusService scenePlusService;
|
|
|
|
+ @Autowired
|
|
|
|
+ private IOrigFileUploadService origFileUploadService;
|
|
|
|
+ @Resource
|
|
|
|
+ private RabbitMqProducer rabbitMqProducer;
|
|
|
|
+ @Value("${queue.modeling.modeling-pre}")
|
|
|
|
+ private String queueModelingPre;
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void pushPre() {
|
|
|
|
+
|
|
|
|
+ log.info("推送计算定时任务开始");
|
|
|
|
+ boolean lock = redisLockUtil.lock("push-modeling-pre", 1800);
|
|
|
|
+ try {
|
|
|
|
+ if(!lock){
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ log.info("获取到分布式锁");
|
|
|
|
+
|
|
|
|
+ List<OrigFileUploadBatch> list = origFileUploadBatchService.list(
|
|
|
|
+ new LambdaQueryWrapper<OrigFileUploadBatch>()
|
|
|
|
+ .eq(OrigFileUploadBatch::getStatus, 4)
|
|
|
|
+ .orderByAsc(OrigFileUploadBatch::getCreateTime));
|
|
|
|
+ if(CollUtil.isEmpty(list)){
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ for (OrigFileUploadBatch origFileUploadBatch : list) {
|
|
|
|
+ String uuid = origFileUploadBatch.getUuid();
|
|
|
|
+ try {
|
|
|
|
+ ScenePlus scenePlus = scenePlusService.getByUuid(uuid);
|
|
|
|
+ //如果场景计算中,跳过,等待下一次定时任务
|
|
|
|
+ if(scenePlus.getSceneStatus() == SceneStatus.wait.code().intValue()){
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ //过滤出同一个场景且待推送的上传批次
|
|
|
|
+ Set<String> batchIds = list.stream().filter(v -> uuid.equals(v.getUuid())).map(v -> v.getBatchId()).collect(Collectors.toSet());
|
|
|
|
+ BuildSceneCallMessage mqMessage = JSON.parseObject(origFileUploadBatch.getMqContent(), BuildSceneCallMessage.class);
|
|
|
|
+ StringBuilder batchIdStr = new StringBuilder();
|
|
|
|
+ for (String batchId : batchIds) {
|
|
|
|
+ batchIdStr.append(",").append(batchId);
|
|
|
|
+ }
|
|
|
|
+ mqMessage.getExt().put("batchId", batchIdStr.substring(1));
|
|
|
|
+ rabbitMqProducer.sendByWorkQueue(queueModelingPre, mqMessage);
|
|
|
|
+ origFileUploadBatchService.update(new LambdaUpdateWrapper<OrigFileUploadBatch>().set(OrigFileUploadBatch::getStatus, 4).in(OrigFileUploadBatch::getBatchId, batchIds));
|
|
|
|
+ }catch (Exception e){
|
|
|
|
+ log.error("推送场景计算失败,uuid:{}", uuid, e);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ }catch (Exception e){
|
|
|
|
+ log.info("推送计算定时任务报错");
|
|
|
|
+ }finally {
|
|
|
|
+ redisLockUtil.unlock("push-modeling-pre");
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ log.info("推送计算定时任务结束");
|
|
|
|
+ }
|
|
|
|
+}
|