|
- package com.fdkankan.contro.mq.service.impl;
- import cn.hutool.core.collection.CollUtil;
- import cn.hutool.core.io.FileUtil;
- import cn.hutool.core.util.StrUtil;
- import cn.hutool.http.HttpUtil;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONArray;
- import com.alibaba.fastjson.JSONObject;
- import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
- import com.fdkankan.contro.entity.OrigFileUpload;
- import com.fdkankan.contro.entity.OrigFileUploadBatch;
- import com.fdkankan.contro.entity.ScenePlus;
- import com.fdkankan.contro.mq.service.CallBuildService;
- import com.fdkankan.contro.service.IOrigFileUploadBatchService;
- import com.fdkankan.contro.service.IOrigFileUploadService;
- import com.fdkankan.contro.service.IScenePlusService;
- import com.fdkankan.contro.util.HttpUtilExt;
- import com.fdkankan.model.utils.SceneUtil;
- import com.fdkankan.rabbitmq.bean.BuildSceneCallMessage;
- import com.fdkankan.rabbitmq.util.RabbitMqProducer;
- import com.fdkankan.redis.util.RedisLockUtil;
- import com.fdkankan.redis.util.RedisUtil;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Service;
- import javax.annotation.Resource;
- import java.io.File;
- import java.util.*;
- import java.util.stream.Collectors;
- @Slf4j
- @Service
- public class CallBuildServiceImpl implements CallBuildService {
- @Autowired
- private IOrigFileUploadBatchService origFileUploadBatchService;
- @Resource
- private RedisLockUtil redisLockUtil;
- @Autowired
- private IScenePlusService scenePlusService;
- @Autowired
- private IOrigFileUploadService origFileUploadService;
- @Resource
- private RabbitMqProducer rabbitMqProducer;
- @Value("${queue.modeling.modeling-pre}")
- private String queueModelingPre;
- @Resource
- private RedisUtil redisUtil;
- @Override
- public void pushPre() {
- log.info("推送计算定时任务开始");
- List<OrigFileUploadBatch> list = origFileUploadBatchService.list(
- new LambdaQueryWrapper<OrigFileUploadBatch>()
- .eq(OrigFileUploadBatch::getStatus, 4)
- .eq(OrigFileUploadBatch::getCallType, 1)
- .orderByAsc(OrigFileUploadBatch::getCreateTime));
- if(CollUtil.isEmpty(list)){
- return;
- }
- LinkedHashMap<String, LinkedList<OrigFileUploadBatch>> map = new LinkedHashMap<>();
- for (OrigFileUploadBatch origFileUploadBatch : list) {
- String uuid = origFileUploadBatch.getUuid();
- LinkedList<OrigFileUploadBatch> origFileUploadBatches = map.get(uuid);
- if(origFileUploadBatches == null){
- origFileUploadBatches = new LinkedList<>();
- map.put(uuid, origFileUploadBatches);
- }
- origFileUploadBatches.add(origFileUploadBatch);
- }
- for (String uuid : map.keySet()) {
- ScenePlus scenePlus = scenePlusService.getByUuid(uuid);
- boolean lock = redisLockUtil.lock("push-modeling-pre:" + scenePlus.getNum(), scenePlus.getNum(), 30*60);
- if(!lock){
- continue;
- }
- try {
- //过滤出同一个场景且待推送的上传批次
- Set<String> batchIds = new HashSet<>();
- StringBuilder batchIdStr = new StringBuilder();
- for (OrigFileUploadBatch fileUploadBatch : map.get(uuid)) {
- if(uuid.equals(fileUploadBatch.getUuid())){
- batchIds.add(fileUploadBatch.getBatchId());
- batchIdStr.append(",").append(fileUploadBatch.getBatchId());
- }
- }
- BuildSceneCallMessage mqMessage = JSON.parseObject(map.get(uuid).getLast().getMqContent(), BuildSceneCallMessage.class);
- mqMessage.getExt().put("batchId", batchIdStr.substring(1));
- String threeCamType = (String) mqMessage.getExt().get("threeCamType");
- //如果场景原始资源上传批次id不为空,则需要下载批次文件上传到oss目录
- if(StrUtil.isNotEmpty(batchIdStr)){
- for (String batchId : batchIdStr.substring(1).split(",")) {
- if(StrUtil.isNotEmpty(threeCamType) && "yzl".equals(threeCamType)){
- this.downloadOrigFile4Yzl(batchId, mqMessage.getPath());
- }else{
- this.downloadOrigFile(batchId, mqMessage.getPath());
- }
- }
- }
- rabbitMqProducer.sendByWorkQueue(queueModelingPre, mqMessage);
- long incr = redisUtil.incr("modeling-count:" + scenePlus.getNum(), 1);
- log.info("场景:{},计算推送次数:{}", scenePlus.getNum(), incr);
- }catch (Exception e){
- log.error("推送场景计算失败,uuid:{}", uuid, e);
- }
- }
- log.info("推送计算定时任务结束");
- }
- private void downloadOrigFile4Yzl(String batchId, String dataSource){
- if(StrUtil.isEmpty(batchId)){
- return;
- }
- OrigFileUpload sceneUpData = origFileUploadService.getByBatchIdAndFileName(batchId, "scene_up_data.txt");
- String sceneUpDataPath = dataSource + "/" + "scene_up_data.txt";
- HttpUtilExt.downloadFileAndCheck(sceneUpData.getFileUrl(), sceneUpDataPath, 60000);
- String sceneUpDataStr = FileUtil.readUtf8String(sceneUpDataPath);
- JSONArray fileJsonArray = JSON.parseArray(sceneUpDataStr);
- Map<String, String> fileMap = fileJsonArray.stream().collect(Collectors.toMap(v -> ((JSONObject) v).getString("fileName"), v -> ((JSONObject) v).getString("filePath")));
- List<OrigFileUpload> fileList = origFileUploadService.getByBatchId(batchId);
- if(CollUtil.isEmpty(fileList)){
- return;
- }
- String homePath = "/oss/4dkankan/" + SceneUtil.getHomePath(dataSource);
- fileList.parallelStream().forEach(origFileUpload->{
- String relativeFilePath = fileMap.get(origFileUpload.getFileName());
- if(StrUtil.isNotEmpty(relativeFilePath)){
- int times = 0;
- String filePath = homePath.concat(relativeFilePath);
- log.info("开始下载原始文件, url:{}, filePath:{}", origFileUpload.getFileUrl(), filePath);
- do{
- try {
- ++times;
- HttpUtil.downloadFile(origFileUpload.getFileUrl(), new File(filePath), 3 * 60 * 1000);
- if(FileUtil.exist(filePath)){
- break;
- }
- }catch (Exception e){
- log.info("原始文件第{}次下载失败,fileUrl:{}, filePath:{}", times, origFileUpload.getFileUrl(), filePath);
- }
- }while (times < 4);
- log.info("下载原始文件结束, url:{}, filePath:{}", origFileUpload.getFileUrl(), filePath);
- }
- });
- }
- private void downloadOrigFile(String batchId, String dataSource){
- if(StrUtil.isEmpty(batchId)){
- return;
- }
- List<OrigFileUpload> fileList = origFileUploadService.getByBatchId(batchId);
- if(CollUtil.isEmpty(fileList)){
- return;
- }
- String homePath = "/oss/4dkankan/" + SceneUtil.getHomePath(dataSource);
- fileList.parallelStream().forEach(origFileUpload->{
- int times = 0;
- String filePath = homePath.concat(origFileUpload.getFileName());
- log.info("开始下载原始文件, url:{}, filePath:{}", origFileUpload.getFileUrl(), filePath);
- do{
- try {
- ++times;
- HttpUtil.downloadFile(origFileUpload.getFileUrl(), new File(filePath), 3 * 60 * 1000);
- if(FileUtil.exist(filePath)){
- break;
- }
- }catch (Exception e){
- log.info("原始文件第{}次下载失败,fileUrl:{}, filePath:{}", times, origFileUpload.getFileUrl(), filePath);
- }
- }while (times < 3);
- log.info("下载原始文件结束, url:{}, filePath:{}", origFileUpload.getFileUrl(), filePath);
- });
- }
- public static void main(String[] args) {
- List<OrigFileUploadBatch> list = new ArrayList<>();
- OrigFileUploadBatch a = new OrigFileUploadBatch();
- a.setUuid("123");
- OrigFileUploadBatch b = new OrigFileUploadBatch();
- b.setUuid("123");
- OrigFileUploadBatch c = new OrigFileUploadBatch();
- c.setUuid("123");
- list.add(a);
- list.add(b);
- list.add(c);
- for (OrigFileUploadBatch origFileUploadBatch : list) {
- String uuid = origFileUploadBatch.getUuid();
- for (OrigFileUploadBatch fileUploadBatch : list) {
- if(uuid.equals(fileUploadBatch.getUuid())){
- list.remove(fileUploadBatch);
- }
- }
- }
- }
- }
|