package com.fdkankan.contro.mq.service.impl; import cn.hutool.core.collection.CollUtil; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.fdkankan.common.constant.SceneStatus; import com.fdkankan.contro.entity.OrigFileUploadBatch; import com.fdkankan.contro.entity.ScenePlus; import com.fdkankan.contro.mq.service.JmgaService; import com.fdkankan.contro.service.IOrigFileUploadBatchService; import com.fdkankan.contro.service.IOrigFileUploadService; import com.fdkankan.contro.service.IScenePlusService; import com.fdkankan.rabbitmq.bean.BuildSceneCallMessage; import com.fdkankan.rabbitmq.util.RabbitMqProducer; import com.fdkankan.redis.util.RedisLockUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @Slf4j @Service public class JmgaServiceImpl implements JmgaService { @Autowired private IOrigFileUploadBatchService origFileUploadBatchService; @Resource private RedisLockUtil redisLockUtil; @Autowired private IScenePlusService scenePlusService; @Autowired private IOrigFileUploadService origFileUploadService; @Resource private RabbitMqProducer rabbitMqProducer; @Value("${queue.modeling.modeling-pre}") private String queueModelingPre; @Override public void pushPre() { log.info("推送计算定时任务开始"); boolean lock = redisLockUtil.lock("push-modeling-pre", 1800); try { if(!lock){ return; } log.info("获取到分布式锁"); List list = origFileUploadBatchService.list( new LambdaQueryWrapper() .eq(OrigFileUploadBatch::getStatus, 4) .orderByAsc(OrigFileUploadBatch::getCreateTime)); if(CollUtil.isEmpty(list)){ return; } for (OrigFileUploadBatch origFileUploadBatch : list) { String uuid = origFileUploadBatch.getUuid(); try { ScenePlus scenePlus = scenePlusService.getByUuid(uuid); //如果场景计算中,跳过,等待下一次定时任务 if(scenePlus.getSceneStatus() == SceneStatus.wait.code().intValue()){ continue; } //过滤出同一个场景且待推送的上传批次 Set batchIds = list.stream().filter(v -> uuid.equals(v.getUuid())).map(v -> v.getBatchId()).collect(Collectors.toSet()); BuildSceneCallMessage mqMessage = JSON.parseObject(origFileUploadBatch.getMqContent(), BuildSceneCallMessage.class); StringBuilder batchIdStr = new StringBuilder(); for (String batchId : batchIds) { batchIdStr.append(",").append(batchId); } mqMessage.getExt().put("batchId", batchIdStr.substring(1)); rabbitMqProducer.sendByWorkQueue(queueModelingPre, mqMessage); origFileUploadBatchService.update(new LambdaUpdateWrapper().set(OrigFileUploadBatch::getStatus, 4).in(OrigFileUploadBatch::getBatchId, batchIds)); }catch (Exception e){ log.error("推送场景计算失败,uuid:{}", uuid, e); } } }catch (Exception e){ log.info("推送计算定时任务报错"); }finally { redisLockUtil.unlock("push-modeling-pre"); } log.info("推送计算定时任务结束"); } }