package com.fdkankan.contro.mq.service.impl; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.io.FileUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.http.HttpUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.fdkankan.contro.entity.OrigFileUpload; import com.fdkankan.contro.entity.OrigFileUploadBatch; import com.fdkankan.contro.entity.ScenePlus; import com.fdkankan.contro.mq.service.CallBuildService; import com.fdkankan.contro.service.IOrigFileUploadBatchService; import com.fdkankan.contro.service.IOrigFileUploadService; import com.fdkankan.contro.service.IScenePlusService; import com.fdkankan.contro.util.HttpUtilExt; import com.fdkankan.model.utils.SceneUtil; import com.fdkankan.rabbitmq.bean.BuildSceneCallMessage; import com.fdkankan.rabbitmq.util.RabbitMqProducer; import com.fdkankan.redis.util.RedisLockUtil; import com.fdkankan.redis.util.RedisUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.io.File; import java.util.*; import java.util.stream.Collectors; @Slf4j @Service public class CallBuildServiceImpl implements CallBuildService { @Autowired private IOrigFileUploadBatchService origFileUploadBatchService; @Resource private RedisLockUtil redisLockUtil; @Autowired private IScenePlusService scenePlusService; @Autowired private IOrigFileUploadService origFileUploadService; @Resource private RabbitMqProducer rabbitMqProducer; @Value("${queue.modeling.modeling-pre}") private String queueModelingPre; @Resource private RedisUtil redisUtil; @Override public void pushPre() { log.info("推送计算定时任务开始"); List list = origFileUploadBatchService.list( new LambdaQueryWrapper() .eq(OrigFileUploadBatch::getStatus, 4) .eq(OrigFileUploadBatch::getCallType, 1) .orderByAsc(OrigFileUploadBatch::getCreateTime)); if(CollUtil.isEmpty(list)){ return; } LinkedHashMap> map = new LinkedHashMap<>(); for (OrigFileUploadBatch origFileUploadBatch : list) { String uuid = origFileUploadBatch.getUuid(); LinkedList origFileUploadBatches = map.get(uuid); if(origFileUploadBatches == null){ origFileUploadBatches = new LinkedList<>(); map.put(uuid, origFileUploadBatches); } origFileUploadBatches.add(origFileUploadBatch); } for (String uuid : map.keySet()) { ScenePlus scenePlus = scenePlusService.getByUuid(uuid); boolean lock = redisLockUtil.lock("push-modeling-pre:" + scenePlus.getNum(), scenePlus.getNum(), 30*60); if(!lock){ continue; } try { //过滤出同一个场景且待推送的上传批次 Set batchIds = new HashSet<>(); StringBuilder batchIdStr = new StringBuilder(); for (OrigFileUploadBatch fileUploadBatch : map.get(uuid)) { if(uuid.equals(fileUploadBatch.getUuid())){ batchIds.add(fileUploadBatch.getBatchId()); batchIdStr.append(",").append(fileUploadBatch.getBatchId()); } } BuildSceneCallMessage mqMessage = JSON.parseObject(map.get(uuid).getLast().getMqContent(), BuildSceneCallMessage.class); mqMessage.getExt().put("batchId", batchIdStr.substring(1)); String threeCamType = (String) mqMessage.getExt().get("threeCamType"); //如果场景原始资源上传批次id不为空,则需要下载批次文件上传到oss目录 if(StrUtil.isNotEmpty(batchIdStr)){ for (String batchId : batchIdStr.substring(1).split(",")) { if(StrUtil.isNotEmpty(threeCamType) && "yzl".equals(threeCamType)){ this.downloadOrigFile4Yzl(batchId, mqMessage.getPath()); }else{ this.downloadOrigFile(batchId, mqMessage.getPath()); } } } rabbitMqProducer.sendByWorkQueue(queueModelingPre, mqMessage); long incr = redisUtil.incr("modeling-count:" + scenePlus.getNum(), 1); log.info("场景:{},计算推送次数:{}", scenePlus.getNum(), incr); }catch (Exception e){ log.error("推送场景计算失败,uuid:{}", uuid, e); } } log.info("推送计算定时任务结束"); } private void downloadOrigFile4Yzl(String batchId, String dataSource){ if(StrUtil.isEmpty(batchId)){ return; } OrigFileUpload sceneUpData = origFileUploadService.getByBatchIdAndFileName(batchId, "scene_up_data.txt"); String sceneUpDataPath = dataSource + "/" + "scene_up_data.txt"; HttpUtilExt.downloadFileAndCheck(sceneUpData.getFileUrl(), sceneUpDataPath, 60000); String sceneUpDataStr = FileUtil.readUtf8String(sceneUpDataPath); JSONArray fileJsonArray = JSON.parseArray(sceneUpDataStr); Map fileMap = fileJsonArray.stream().collect(Collectors.toMap(v -> ((JSONObject) v).getString("fileName"), v -> ((JSONObject) v).getString("filePath"))); List fileList = origFileUploadService.getByBatchId(batchId); if(CollUtil.isEmpty(fileList)){ return; } String homePath = "/oss/4dkankan/" + SceneUtil.getHomePath(dataSource); fileList.parallelStream().forEach(origFileUpload->{ String relativeFilePath = fileMap.get(origFileUpload.getFileName()); if(StrUtil.isNotEmpty(relativeFilePath)){ int times = 0; String filePath = homePath.concat(relativeFilePath); log.info("开始下载原始文件, url:{}, filePath:{}", origFileUpload.getFileUrl(), filePath); do{ try { ++times; HttpUtil.downloadFile(origFileUpload.getFileUrl(), new File(filePath), 3 * 60 * 1000); if(FileUtil.exist(filePath)){ break; } }catch (Exception e){ log.info("原始文件第{}次下载失败,fileUrl:{}, filePath:{}", times, origFileUpload.getFileUrl(), filePath); } }while (times < 4); log.info("下载原始文件结束, url:{}, filePath:{}", origFileUpload.getFileUrl(), filePath); } }); } private void downloadOrigFile(String batchId, String dataSource){ if(StrUtil.isEmpty(batchId)){ return; } List fileList = origFileUploadService.getByBatchId(batchId); if(CollUtil.isEmpty(fileList)){ return; } String homePath = "/oss/4dkankan/" + SceneUtil.getHomePath(dataSource); fileList.parallelStream().forEach(origFileUpload->{ int times = 0; String filePath = homePath.concat(origFileUpload.getFileName()); log.info("开始下载原始文件, url:{}, filePath:{}", origFileUpload.getFileUrl(), filePath); do{ try { ++times; HttpUtil.downloadFile(origFileUpload.getFileUrl(), new File(filePath), 3 * 60 * 1000); if(FileUtil.exist(filePath)){ break; } }catch (Exception e){ log.info("原始文件第{}次下载失败,fileUrl:{}, filePath:{}", times, origFileUpload.getFileUrl(), filePath); } }while (times < 3); log.info("下载原始文件结束, url:{}, filePath:{}", origFileUpload.getFileUrl(), filePath); }); } public static void main(String[] args) { List list = new ArrayList<>(); OrigFileUploadBatch a = new OrigFileUploadBatch(); a.setUuid("123"); OrigFileUploadBatch b = new OrigFileUploadBatch(); b.setUuid("123"); OrigFileUploadBatch c = new OrigFileUploadBatch(); c.setUuid("123"); list.add(a); list.add(b); list.add(c); for (OrigFileUploadBatch origFileUploadBatch : list) { String uuid = origFileUploadBatch.getUuid(); for (OrigFileUploadBatch fileUploadBatch : list) { if(uuid.equals(fileUploadBatch.getUuid())){ list.remove(fileUploadBatch); } } } } }