1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- package com.fdkankan.contro.mq.service.impl;
- import cn.hutool.core.collection.CollUtil;
- import com.alibaba.fastjson.JSON;
- import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
- import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
- import com.fdkankan.common.constant.SceneStatus;
- import com.fdkankan.contro.entity.OrigFileUploadBatch;
- import com.fdkankan.contro.entity.ScenePlus;
- import com.fdkankan.contro.mq.service.JmgaService;
- import com.fdkankan.contro.service.IOrigFileUploadBatchService;
- import com.fdkankan.contro.service.IOrigFileUploadService;
- import com.fdkankan.contro.service.IScenePlusService;
- import com.fdkankan.rabbitmq.bean.BuildSceneCallMessage;
- import com.fdkankan.rabbitmq.util.RabbitMqProducer;
- import com.fdkankan.redis.util.RedisLockUtil;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Service;
- import javax.annotation.Resource;
- import java.util.List;
- import java.util.Set;
- import java.util.stream.Collectors;
- @Slf4j
- @Service
- public class JmgaServiceImpl implements JmgaService {
- @Autowired
- private IOrigFileUploadBatchService origFileUploadBatchService;
- @Resource
- private RedisLockUtil redisLockUtil;
- @Autowired
- private IScenePlusService scenePlusService;
- @Autowired
- private IOrigFileUploadService origFileUploadService;
- @Resource
- private RabbitMqProducer rabbitMqProducer;
- @Value("${queue.modeling.modeling-pre}")
- private String queueModelingPre;
- @Override
- public void pushPre() {
- log.info("推送计算定时任务开始");
- boolean lock = redisLockUtil.lock("push-modeling-pre", 1800);
- try {
- if(!lock){
- return;
- }
- log.info("获取到分布式锁");
- List<OrigFileUploadBatch> list = origFileUploadBatchService.list(
- new LambdaQueryWrapper<OrigFileUploadBatch>()
- .eq(OrigFileUploadBatch::getStatus, 4)
- .orderByAsc(OrigFileUploadBatch::getCreateTime));
- if(CollUtil.isEmpty(list)){
- return;
- }
- for (OrigFileUploadBatch origFileUploadBatch : list) {
- String uuid = origFileUploadBatch.getUuid();
- try {
- ScenePlus scenePlus = scenePlusService.getByUuid(uuid);
- //如果场景计算中,跳过,等待下一次定时任务
- if(scenePlus.getSceneStatus() == SceneStatus.wait.code().intValue()){
- continue;
- }
- //过滤出同一个场景且待推送的上传批次
- Set<String> batchIds = list.stream().filter(v -> uuid.equals(v.getUuid())).map(v -> v.getBatchId()).collect(Collectors.toSet());
- BuildSceneCallMessage mqMessage = JSON.parseObject(origFileUploadBatch.getMqContent(), BuildSceneCallMessage.class);
- StringBuilder batchIdStr = new StringBuilder();
- for (String batchId : batchIds) {
- batchIdStr.append(",").append(batchId);
- }
- mqMessage.getExt().put("batchId", batchIdStr.substring(1));
- rabbitMqProducer.sendByWorkQueue(queueModelingPre, mqMessage);
- origFileUploadBatchService.update(new LambdaUpdateWrapper<OrigFileUploadBatch>().set(OrigFileUploadBatch::getStatus, 4).in(OrigFileUploadBatch::getBatchId, batchIds));
- }catch (Exception e){
- log.error("推送场景计算失败,uuid:{}", uuid, e);
- }
- }
- }catch (Exception e){
- log.info("推送计算定时任务报错");
- }finally {
- redisLockUtil.unlock("push-modeling-pre");
- }
- log.info("推送计算定时任务结束");
- }
- }
|