JmgaServiceImpl.java 3.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package com.fdkankan.contro.mq.service.impl;
  2. import cn.hutool.core.collection.CollUtil;
  3. import com.alibaba.fastjson.JSON;
  4. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  5. import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
  6. import com.fdkankan.common.constant.SceneStatus;
  7. import com.fdkankan.contro.entity.OrigFileUploadBatch;
  8. import com.fdkankan.contro.entity.ScenePlus;
  9. import com.fdkankan.contro.mq.service.JmgaService;
  10. import com.fdkankan.contro.service.IOrigFileUploadBatchService;
  11. import com.fdkankan.contro.service.IOrigFileUploadService;
  12. import com.fdkankan.contro.service.IScenePlusService;
  13. import com.fdkankan.rabbitmq.bean.BuildSceneCallMessage;
  14. import com.fdkankan.rabbitmq.util.RabbitMqProducer;
  15. import com.fdkankan.redis.util.RedisLockUtil;
  16. import lombok.extern.slf4j.Slf4j;
  17. import org.springframework.beans.factory.annotation.Autowired;
  18. import org.springframework.beans.factory.annotation.Value;
  19. import org.springframework.stereotype.Service;
  20. import javax.annotation.Resource;
  21. import java.util.List;
  22. import java.util.Set;
  23. import java.util.stream.Collectors;
  24. @Slf4j
  25. @Service
  26. public class JmgaServiceImpl implements JmgaService {
  27. @Autowired
  28. private IOrigFileUploadBatchService origFileUploadBatchService;
  29. @Resource
  30. private RedisLockUtil redisLockUtil;
  31. @Autowired
  32. private IScenePlusService scenePlusService;
  33. @Autowired
  34. private IOrigFileUploadService origFileUploadService;
  35. @Resource
  36. private RabbitMqProducer rabbitMqProducer;
  37. @Value("${queue.modeling.modeling-pre}")
  38. private String queueModelingPre;
  39. @Override
  40. public void pushPre() {
  41. log.info("推送计算定时任务开始");
  42. boolean lock = redisLockUtil.lock("push-modeling-pre", 1800);
  43. try {
  44. if(!lock){
  45. return;
  46. }
  47. log.info("获取到分布式锁");
  48. List<OrigFileUploadBatch> list = origFileUploadBatchService.list(
  49. new LambdaQueryWrapper<OrigFileUploadBatch>()
  50. .eq(OrigFileUploadBatch::getStatus, 4)
  51. .orderByAsc(OrigFileUploadBatch::getCreateTime));
  52. if(CollUtil.isEmpty(list)){
  53. return;
  54. }
  55. for (OrigFileUploadBatch origFileUploadBatch : list) {
  56. String uuid = origFileUploadBatch.getUuid();
  57. try {
  58. ScenePlus scenePlus = scenePlusService.getByUuid(uuid);
  59. //如果场景计算中,跳过,等待下一次定时任务
  60. if(scenePlus.getSceneStatus() == SceneStatus.wait.code().intValue()){
  61. continue;
  62. }
  63. //过滤出同一个场景且待推送的上传批次
  64. Set<String> batchIds = list.stream().filter(v -> uuid.equals(v.getUuid())).map(v -> v.getBatchId()).collect(Collectors.toSet());
  65. BuildSceneCallMessage mqMessage = JSON.parseObject(origFileUploadBatch.getMqContent(), BuildSceneCallMessage.class);
  66. StringBuilder batchIdStr = new StringBuilder();
  67. for (String batchId : batchIds) {
  68. batchIdStr.append(",").append(batchId);
  69. }
  70. mqMessage.getExt().put("batchId", batchIdStr.substring(1));
  71. rabbitMqProducer.sendByWorkQueue(queueModelingPre, mqMessage);
  72. origFileUploadBatchService.update(new LambdaUpdateWrapper<OrigFileUploadBatch>().set(OrigFileUploadBatch::getStatus, 4).in(OrigFileUploadBatch::getBatchId, batchIds));
  73. }catch (Exception e){
  74. log.error("推送场景计算失败,uuid:{}", uuid, e);
  75. }
  76. }
  77. }catch (Exception e){
  78. log.info("推送计算定时任务报错");
  79. }finally {
  80. redisLockUtil.unlock("push-modeling-pre");
  81. }
  82. log.info("推送计算定时任务结束");
  83. }
  84. }