CallBuildServiceImpl.java 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. package com.fdkankan.contro.mq.service.impl;
  2. import cn.hutool.core.collection.CollUtil;
  3. import cn.hutool.core.io.FileUtil;
  4. import cn.hutool.core.util.StrUtil;
  5. import cn.hutool.http.HttpUtil;
  6. import com.alibaba.fastjson.JSON;
  7. import com.alibaba.fastjson.JSONArray;
  8. import com.alibaba.fastjson.JSONObject;
  9. import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
  10. import com.fdkankan.contro.entity.OrigFileUpload;
  11. import com.fdkankan.contro.entity.OrigFileUploadBatch;
  12. import com.fdkankan.contro.entity.ScenePlus;
  13. import com.fdkankan.contro.mq.service.CallBuildService;
  14. import com.fdkankan.contro.service.IOrigFileUploadBatchService;
  15. import com.fdkankan.contro.service.IOrigFileUploadService;
  16. import com.fdkankan.contro.service.IScenePlusService;
  17. import com.fdkankan.contro.util.HttpUtilExt;
  18. import com.fdkankan.model.utils.SceneUtil;
  19. import com.fdkankan.rabbitmq.bean.BuildSceneCallMessage;
  20. import com.fdkankan.rabbitmq.util.RabbitMqProducer;
  21. import com.fdkankan.redis.util.RedisLockUtil;
  22. import com.fdkankan.redis.util.RedisUtil;
  23. import lombok.extern.slf4j.Slf4j;
  24. import org.springframework.beans.factory.annotation.Autowired;
  25. import org.springframework.beans.factory.annotation.Value;
  26. import org.springframework.stereotype.Service;
  27. import javax.annotation.Resource;
  28. import java.io.File;
  29. import java.util.*;
  30. import java.util.stream.Collectors;
  31. @Slf4j
  32. @Service
  33. public class CallBuildServiceImpl implements CallBuildService {
  34. @Autowired
  35. private IOrigFileUploadBatchService origFileUploadBatchService;
  36. @Resource
  37. private RedisLockUtil redisLockUtil;
  38. @Autowired
  39. private IScenePlusService scenePlusService;
  40. @Autowired
  41. private IOrigFileUploadService origFileUploadService;
  42. @Resource
  43. private RabbitMqProducer rabbitMqProducer;
  44. @Value("${queue.modeling.modeling-pre}")
  45. private String queueModelingPre;
  46. @Resource
  47. private RedisUtil redisUtil;
  48. @Override
  49. public void pushPre() {
  50. log.info("推送计算定时任务开始");
  51. List<OrigFileUploadBatch> list = origFileUploadBatchService.list(
  52. new LambdaQueryWrapper<OrigFileUploadBatch>()
  53. .eq(OrigFileUploadBatch::getStatus, 4)
  54. .eq(OrigFileUploadBatch::getCallType, 1)
  55. .orderByAsc(OrigFileUploadBatch::getCreateTime));
  56. if(CollUtil.isEmpty(list)){
  57. return;
  58. }
  59. LinkedHashMap<String, LinkedList<OrigFileUploadBatch>> map = new LinkedHashMap<>();
  60. for (OrigFileUploadBatch origFileUploadBatch : list) {
  61. String uuid = origFileUploadBatch.getUuid();
  62. LinkedList<OrigFileUploadBatch> origFileUploadBatches = map.get(uuid);
  63. if(origFileUploadBatches == null){
  64. origFileUploadBatches = new LinkedList<>();
  65. map.put(uuid, origFileUploadBatches);
  66. }
  67. origFileUploadBatches.add(origFileUploadBatch);
  68. }
  69. for (String uuid : map.keySet()) {
  70. ScenePlus scenePlus = scenePlusService.getByUuid(uuid);
  71. boolean lock = redisLockUtil.lock("push-modeling-pre:" + scenePlus.getNum(), scenePlus.getNum(), 30*60);
  72. if(!lock){
  73. continue;
  74. }
  75. try {
  76. //过滤出同一个场景且待推送的上传批次
  77. Set<String> batchIds = new HashSet<>();
  78. StringBuilder batchIdStr = new StringBuilder();
  79. for (OrigFileUploadBatch fileUploadBatch : map.get(uuid)) {
  80. if(uuid.equals(fileUploadBatch.getUuid())){
  81. batchIds.add(fileUploadBatch.getBatchId());
  82. batchIdStr.append(",").append(fileUploadBatch.getBatchId());
  83. }
  84. }
  85. BuildSceneCallMessage mqMessage = JSON.parseObject(map.get(uuid).getLast().getMqContent(), BuildSceneCallMessage.class);
  86. mqMessage.getExt().put("batchId", batchIdStr.substring(1));
  87. String threeCamType = (String) mqMessage.getExt().get("threeCamType");
  88. //如果场景原始资源上传批次id不为空,则需要下载批次文件上传到oss目录
  89. if(StrUtil.isNotEmpty(batchIdStr)){
  90. for (String batchId : batchIdStr.substring(1).split(",")) {
  91. if(StrUtil.isNotEmpty(threeCamType) && "yzl".equals(threeCamType)){
  92. this.downloadOrigFile4Yzl(batchId, mqMessage.getPath());
  93. }else{
  94. this.downloadOrigFile(batchId, mqMessage.getPath());
  95. }
  96. }
  97. }
  98. rabbitMqProducer.sendByWorkQueue(queueModelingPre, mqMessage);
  99. long incr = redisUtil.incr("modeling-count:" + scenePlus.getNum(), 1);
  100. log.info("场景:{},计算推送次数:{}", scenePlus.getNum(), incr);
  101. }catch (Exception e){
  102. log.error("推送场景计算失败,uuid:{}", uuid, e);
  103. }
  104. }
  105. log.info("推送计算定时任务结束");
  106. }
  107. private void downloadOrigFile4Yzl(String batchId, String dataSource){
  108. if(StrUtil.isEmpty(batchId)){
  109. return;
  110. }
  111. OrigFileUpload sceneUpData = origFileUploadService.getByBatchIdAndFileName(batchId, "scene_up_data.txt");
  112. String sceneUpDataPath = dataSource + "/" + "scene_up_data.txt";
  113. HttpUtilExt.downloadFileAndCheck(sceneUpData.getFileUrl(), sceneUpDataPath, 60000);
  114. String sceneUpDataStr = FileUtil.readUtf8String(sceneUpDataPath);
  115. JSONArray fileJsonArray = JSON.parseArray(sceneUpDataStr);
  116. Map<String, String> fileMap = fileJsonArray.stream().collect(Collectors.toMap(v -> ((JSONObject) v).getString("fileName"), v -> ((JSONObject) v).getString("filePath")));
  117. List<OrigFileUpload> fileList = origFileUploadService.getByBatchId(batchId);
  118. if(CollUtil.isEmpty(fileList)){
  119. return;
  120. }
  121. String homePath = "/oss/4dkankan/" + SceneUtil.getHomePath(dataSource);
  122. fileList.parallelStream().forEach(origFileUpload->{
  123. String relativeFilePath = fileMap.get(origFileUpload.getFileName());
  124. if(StrUtil.isNotEmpty(relativeFilePath)){
  125. int times = 0;
  126. String filePath = homePath.concat(relativeFilePath);
  127. log.info("开始下载原始文件, url:{}, filePath:{}", origFileUpload.getFileUrl(), filePath);
  128. do{
  129. try {
  130. ++times;
  131. HttpUtil.downloadFile(origFileUpload.getFileUrl(), new File(filePath), 3 * 60 * 1000);
  132. if(FileUtil.exist(filePath)){
  133. break;
  134. }
  135. }catch (Exception e){
  136. log.info("原始文件第{}次下载失败,fileUrl:{}, filePath:{}", times, origFileUpload.getFileUrl(), filePath);
  137. }
  138. }while (times < 4);
  139. log.info("下载原始文件结束, url:{}, filePath:{}", origFileUpload.getFileUrl(), filePath);
  140. }
  141. });
  142. }
  143. private void downloadOrigFile(String batchId, String dataSource){
  144. if(StrUtil.isEmpty(batchId)){
  145. return;
  146. }
  147. List<OrigFileUpload> fileList = origFileUploadService.getByBatchId(batchId);
  148. if(CollUtil.isEmpty(fileList)){
  149. return;
  150. }
  151. String homePath = "/oss/4dkankan/" + SceneUtil.getHomePath(dataSource);
  152. fileList.parallelStream().forEach(origFileUpload->{
  153. int times = 0;
  154. String filePath = homePath.concat(origFileUpload.getFileName());
  155. log.info("开始下载原始文件, url:{}, filePath:{}", origFileUpload.getFileUrl(), filePath);
  156. do{
  157. try {
  158. ++times;
  159. HttpUtil.downloadFile(origFileUpload.getFileUrl(), new File(filePath), 3 * 60 * 1000);
  160. if(FileUtil.exist(filePath)){
  161. break;
  162. }
  163. }catch (Exception e){
  164. log.info("原始文件第{}次下载失败,fileUrl:{}, filePath:{}", times, origFileUpload.getFileUrl(), filePath);
  165. }
  166. }while (times < 3);
  167. log.info("下载原始文件结束, url:{}, filePath:{}", origFileUpload.getFileUrl(), filePath);
  168. });
  169. }
  170. public static void main(String[] args) {
  171. List<OrigFileUploadBatch> list = new ArrayList<>();
  172. OrigFileUploadBatch a = new OrigFileUploadBatch();
  173. a.setUuid("123");
  174. OrigFileUploadBatch b = new OrigFileUploadBatch();
  175. b.setUuid("123");
  176. OrigFileUploadBatch c = new OrigFileUploadBatch();
  177. c.setUuid("123");
  178. list.add(a);
  179. list.add(b);
  180. list.add(c);
  181. for (OrigFileUploadBatch origFileUploadBatch : list) {
  182. String uuid = origFileUploadBatch.getUuid();
  183. for (OrigFileUploadBatch fileUploadBatch : list) {
  184. if(uuid.equals(fileUploadBatch.getUuid())){
  185. list.remove(fileUploadBatch);
  186. }
  187. }
  188. }
  189. }
  190. }