lyhzzz 5 ماه پیش
والد
کامیت
78ecd6fa77

+ 2 - 1
src/main/java/com/fdkankan/mqcontroller/entity/MqCameraLevel.java

@@ -53,5 +53,6 @@ public class MqCameraLevel implements Serializable {
     @TableField("update_time")
     private Date updateTime;
 
-
+    @TableField("type")
+    private String type;
 }

+ 1 - 0
src/main/java/com/fdkankan/mqcontroller/entity/MqEcs.java

@@ -57,5 +57,6 @@ public class MqEcs implements Serializable {
     private String recStatus;
 
     // 默认 4dkk,pano 全景看看
+    @TableField("type")
     private String type;
 }

+ 2 - 1
src/main/java/com/fdkankan/mqcontroller/entity/MqNumLevel.java

@@ -52,6 +52,7 @@ public class MqNumLevel implements Serializable {
 
     @TableField("update_time")
     private Date updateTime;
-
+    @TableField("type")
+    private String type;
 
 }

+ 2 - 0
src/main/java/com/fdkankan/mqcontroller/entity/MqQueueConfig.java

@@ -86,4 +86,6 @@ public class MqQueueConfig implements Serializable {
 
     @TableField("is_special")
     private Integer isSpecial;
+    @TableField("type")
+    private String type;
 }

+ 2 - 1
src/main/java/com/fdkankan/mqcontroller/entity/MqScalingConfig.java

@@ -59,5 +59,6 @@ public class MqScalingConfig implements Serializable {
     @TableField("endpoint")
     private String endpoint;
 
-
+    @TableField("type")
+    private String type;
 }

+ 1 - 0
src/main/java/com/fdkankan/mqcontroller/entity/MqSendLog.java

@@ -65,5 +65,6 @@ public class MqSendLog implements Serializable {
     private Integer configId;
 
     // 默认 4dkk pano 全景看看
+    @TableField("type")
     private String type;
 }

+ 1 - 1
src/main/java/com/fdkankan/mqcontroller/service/IMqCameraLevelService.java

@@ -13,5 +13,5 @@ import com.baomidou.mybatisplus.extension.service.IService;
  */
 public interface IMqCameraLevelService extends IService<MqCameraLevel> {
 
-    MqCameraLevel getByCameraId(Long cameraId);
+    MqCameraLevel getByCameraId(Long cameraId,String type);
 }

+ 1 - 1
src/main/java/com/fdkankan/mqcontroller/service/IMqNumLevelService.java

@@ -13,5 +13,5 @@ import com.baomidou.mybatisplus.extension.service.IService;
  */
 public interface IMqNumLevelService extends IService<MqNumLevel> {
 
-    MqNumLevel getByNum(String num);
+    MqNumLevel getByNum(String num,String type);
 }

+ 4 - 1
src/main/java/com/fdkankan/mqcontroller/service/IMqQueueConfigService.java

@@ -4,6 +4,7 @@ import com.fdkankan.mqcontroller.entity.MqQueueConfig;
 import com.baomidou.mybatisplus.extension.service.IService;
 
 import java.util.HashMap;
+import java.util.List;
 
 /**
  * <p>
@@ -17,7 +18,9 @@ public interface IMqQueueConfigService extends IService<MqQueueConfig> {
 
     HashMap<String, MqQueueConfig> getQueueMap();
 
-    MqQueueConfig getDfConfig();
+    MqQueueConfig getDfConfig(String type);
 
     MqQueueConfig getDfConfig128();
+
+    List<MqQueueConfig> getByType(String type);
 }

+ 2 - 1
src/main/java/com/fdkankan/mqcontroller/service/impl/MqCameraLevelServiceImpl.java

@@ -19,9 +19,10 @@ import org.springframework.stereotype.Service;
 public class MqCameraLevelServiceImpl extends ServiceImpl<IMqCameraLevelMapper, MqCameraLevel> implements IMqCameraLevelService {
 
     @Override
-    public MqCameraLevel getByCameraId(Long cameraId) {
+    public MqCameraLevel getByCameraId(Long cameraId,String type) {
         LambdaQueryWrapper<MqCameraLevel> wrapper = new LambdaQueryWrapper<>();
         wrapper.eq(MqCameraLevel::getCameraId,cameraId);
+        wrapper.eq(MqCameraLevel::getType,type);
         return this.getOne(wrapper);
     }
 }

+ 2 - 1
src/main/java/com/fdkankan/mqcontroller/service/impl/MqNumLevelServiceImpl.java

@@ -19,9 +19,10 @@ import org.springframework.stereotype.Service;
 public class MqNumLevelServiceImpl extends ServiceImpl<IMqNumLevelMapper, MqNumLevel> implements IMqNumLevelService {
 
     @Override
-    public MqNumLevel getByNum(String num) {
+    public MqNumLevel getByNum(String num,String type) {
         LambdaQueryWrapper<MqNumLevel> wrapper = new LambdaQueryWrapper<>();
         wrapper.eq(MqNumLevel::getNum,num);
+        wrapper.eq(MqNumLevel::getType,type);
         return this.getOne(wrapper);
     }
 }

+ 9 - 1
src/main/java/com/fdkankan/mqcontroller/service/impl/MqQueueConfigServiceImpl.java

@@ -30,9 +30,10 @@ public class MqQueueConfigServiceImpl extends ServiceImpl<IMqQueueConfigMapper,
     }
 
     @Override
-    public MqQueueConfig getDfConfig() {
+    public MqQueueConfig getDfConfig(String type) {
         LambdaQueryWrapper<MqQueueConfig> wrapper = new LambdaQueryWrapper<>();
         wrapper.eq(MqQueueConfig::getIsDefault,1);
+        wrapper.eq(MqQueueConfig::getType,type);
         return this.getOne(wrapper);
     }
 
@@ -42,4 +43,11 @@ public class MqQueueConfigServiceImpl extends ServiceImpl<IMqQueueConfigMapper,
         wrapper.eq(MqQueueConfig::getIsSpecial,1);
         return this.getOne(wrapper);
     }
+
+    @Override
+    public List<MqQueueConfig> getByType(String type) {
+        LambdaQueryWrapper<MqQueueConfig> wrapper = new LambdaQueryWrapper<>();
+        wrapper.eq(MqQueueConfig::getType,type);
+        return this.list(wrapper);
+    }
 }

+ 4 - 14
src/main/java/com/fdkankan/mqcontroller/service/impl/ScenePlusServiceImpl.java

@@ -37,32 +37,22 @@ public class ScenePlusServiceImpl extends ServiceImpl<IScenePlusMapper, ScenePlu
             if(scenePro == null){
                 return null;
             }
-            MqQueueConfig dfConfig = queueConfigService.getDfConfig();
+            MqQueueConfig dfConfig = queueConfigService.getDfConfig(mqSendLog.getType());
             if(dfConfig != null){
                 return dfConfig.getId();
             }
             return null;
         }
-        JSONObject jsonObject = JSONObject.parseObject(mqSendLog.getContent());
-        if(jsonObject.containsKey("ext")){
-            JSONObject ext  = jsonObject.getJSONObject("ext");
-            if(ext != null && ext.containsKey("128G") && ext.getInteger("128G") == 1){
-                MqQueueConfig dfConfig = queueConfigService.getDfConfig128();
-                if(dfConfig != null){
-                    return dfConfig.getId();
-                }
-            }
-        }
 
-        MqNumLevel mqNumLevel = mqNumLevelService.getByNum(num);
+        MqNumLevel mqNumLevel = mqNumLevelService.getByNum(num,mqSendLog.getType());
         if(mqNumLevel != null){
             return mqNumLevel.getQueueConfigId();
         }
-        MqCameraLevel mqCameraLevel = mqCameraLevelService.getByCameraId(scenePlus.getCameraId());
+        MqCameraLevel mqCameraLevel = mqCameraLevelService.getByCameraId(scenePlus.getCameraId(),mqSendLog.getType());
         if(mqCameraLevel != null){
             return mqCameraLevel.getQueueConfigId();
         }
-        MqQueueConfig dfConfig = queueConfigService.getDfConfig();
+        MqQueueConfig dfConfig = queueConfigService.getDfConfig(mqSendLog.getType());
         if(dfConfig != null){
             return dfConfig.getId();
         }

+ 12 - 5
src/main/java/com/fdkankan/mqcontroller/task/TaskController.java

@@ -13,18 +13,25 @@ public class TaskController {
 
     @Autowired
     TaskService taskService;
-
-
     @Scheduled(cron = "*/10 * * * * ?") // 每10秒执行一次(使用cron表达式)
+    public void run(){
+        sendMq();
+        openEcs();
+        checkDelEcs();
+        delEcs();
+    }
+
+   // @Scheduled(cron = "*/10 * * * * ?") // 每10秒执行一次(使用cron表达式)
     public void sendMq(){
         try {
             taskService.sendMq();
         }catch (Exception e){
             log.info("runTask:",e);
         }
+
     }
 
-    @Scheduled(cron = "*/30 * * * * ?") // 每30秒执行一次(使用cron表达式)
+    //@Scheduled(cron = "*/30 * * * * ?") // 每30秒执行一次(使用cron表达式)
     public void openEcs(){
         try {
             taskService.openEcsList();
@@ -33,7 +40,7 @@ public class TaskController {
         }
     }
 
-    @Scheduled(cron = "*/20 * * * * ?") // 每20秒执行一次(使用cron表达式)
+   // @Scheduled(cron = "*/20 * * * * ?") // 每20秒执行一次(使用cron表达式)
     public void checkDelEcs() {
         try {
             taskService.checkDelEcs();
@@ -41,7 +48,7 @@ public class TaskController {
             log.info("checkDelEcs:", e);
         }
     }
-    @Scheduled(cron = "*/5 * * * * ?") // 每5秒执行一次(使用cron表达式)
+    //@Scheduled(cron = "*/5 * * * * ?") // 每5秒执行一次(使用cron表达式)
     public void delEcs(){
         try {
             taskService.delEcsList();

+ 15 - 4
src/main/java/com/fdkankan/mqcontroller/task/TaskService.java

@@ -69,8 +69,19 @@ public class TaskService {
             configLogsMap.clear();
             return;
         }
-        log.info("未分配的mq队列数:{}",mqSendLogs.size());
 
+        List<MqSendLog> fdkkList = mqSendLogs.stream().filter(e -> "4dkk".equals(e.getType())).collect(Collectors.toList());
+        List<MqSendLog> panoList = mqSendLogs.stream().filter(e -> "pano".equals(e.getType())).collect(Collectors.toList());
+        if(!fdkkList.isEmpty()){
+            sendMqByType(fdkkList,"4dkk");
+        }
+        if(!panoList.isEmpty()){
+            sendMqByType(fdkkList,"pano");
+        }
+        log.info("未分配的mq队列数4dkk:{},pano:{}",fdkkList.size(),panoList.size());
+    }
+
+    private void sendMqByType(List<MqSendLog> mqSendLogs,String type){
         //设置分配队列
         HashSet<String> numList  = new HashSet<>();
         for (MqSendLog mqSendLog : mqSendLogs) {
@@ -90,7 +101,7 @@ public class TaskService {
             mqSendLog.setConfigId(configId);
             numList.add(mqSendLog.getNum());
         }
-        List<MqQueueConfig> queueConfigList = queueConfigService.list();
+        List<MqQueueConfig> queueConfigList = queueConfigService.getByType(type);
         rabbitMqService.getMqMsgMap(queueConfigList);
 
         for (MqQueueConfig config : queueConfigList) {
@@ -98,7 +109,7 @@ public class TaskService {
             configLogsMap.put(config.getId(),msgList);
         }
         //是否需要重新组合队列
-        allocateQueue2(queueConfigList,mqSendLogs);
+        allocateQueue2(queueConfigList,mqSendLogs,type);
 
         for (MqQueueConfig mqQueueConfig : queueConfigList) {
             List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
@@ -130,7 +141,7 @@ public class TaskService {
      * 设置调配队列计算
      * 调配A,B,C
      */
-    private void allocateQueue2(List<MqQueueConfig> queueConfigList,List<MqSendLog> mqSendLogs){
+    private void allocateQueue2(List<MqQueueConfig> queueConfigList,List<MqSendLog> mqSendLogs,String type){
         //获取常驻队列
         for (MqQueueConfig residentConfig : queueConfigList) { // A ,B ,C
             if(residentConfig.getIsSpecial() == 1){