浏览代码

D队列重复推送

lyhzzz 1 年之前
父节点
当前提交
9e4f94c707
共有 1 个文件被更改,包括 9 次插入37 次删除
  1. 9 37
      src/main/java/com/fdkankan/mqcontroller/task/TaskService.java

+ 9 - 37
src/main/java/com/fdkankan/mqcontroller/task/TaskService.java

@@ -66,6 +66,7 @@ public class TaskService {
         checkCount();
         List<MqSendLog> mqSendLogs = mqSendLogService.getNoSendMsg();
         if(mqSendLogs.isEmpty()){
+            configLogsMap.clear();
             return;
         }
         log.info("未分配的mq队列数:{}",mqSendLogs.size());
@@ -101,7 +102,7 @@ public class TaskService {
 
         for (MqQueueConfig mqQueueConfig : queueConfigList) {
             List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
-            if(msgList.isEmpty()){
+            if(msgList == null || msgList.isEmpty()){
                 continue;
             }
             MqMsg mqMsg = mqMsgMap.get(mqQueueConfig.getQueueName());
@@ -123,34 +124,6 @@ public class TaskService {
 
     }
 
-    /**
-     * 设置调配队列计算
-     * 只调配常驻
-     */
-    private void allocateQueue(List<MqQueueConfig> queueConfigList,List<MqSendLog> mqSendLogs){
-        //获取常驻队列
-        List<MqQueueConfig> residentList = queueConfigList.stream().filter(e -> e.getIsResident() == 1).collect(Collectors.toList());
-        if(residentList.size() != 1){
-            log.info("常驻队列未配置,请配置常驻队列");
-            return ;
-        }
-        Integer residentCount = 0;
-        MqQueueConfig residentConfig = residentList.get(0);
-        MqMsg mqMsg = mqMsgMap.get(residentConfig.getQueueName());
-        List<MqSendLog> residentLogList = configLogsMap.get(residentConfig.getId());
-        if(mqMsg!= null ){
-             residentCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - residentLogList.size() ;
-            log.info("常驻队列空闲服务器数量为:{},{}",residentConfig.getQueueName(),residentCount);
-        }
-        //A空闲,B,C转A
-        for (MqQueueConfig mqQueueConfig : queueConfigList) {
-            List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
-            if(residentCount >0 && mqQueueConfig.getIsResident() !=1 && mqQueueConfig.getIsSpecial() !=1 && !msgList.isEmpty()){
-                Integer updateCount = updateMqSendLogConfig(residentCount, mqQueueConfig.getId(), residentConfig.getId(), mqSendLogs, queueConfigList);
-                residentCount = residentCount - updateCount;
-            }
-        }
-    }
 
     /**
      * 设置调配队列计算
@@ -165,8 +138,9 @@ public class TaskService {
             Integer residentCount = 0;
             MqMsg mqMsg = mqMsgMap.get(residentConfig.getQueueName());
             List<MqSendLog> residentLogList = configLogsMap.get(residentConfig.getId());
+            Integer residentSize = residentLogList!=null ? residentLogList.size() :0;
             if(mqMsg!= null ){
-                residentCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - residentLogList.size() ;
+                residentCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - residentSize ;
                 log.info("allocateQueue2空闲服务器数量为:{},{}",residentConfig.getQueueName(),residentCount);
             }
             //A空闲,BC转A 。B空闲,C转B。C空闲,B转C
@@ -178,7 +152,7 @@ public class TaskService {
                     continue;
                 }
                 List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
-                if(residentCount >0 && !msgList.isEmpty()){
+                if(residentCount >0 && msgList!= null &&  !msgList.isEmpty()){
                     Integer updateCount = updateMqSendLogConfig(residentCount, mqQueueConfig.getId(), residentConfig.getId(), mqSendLogs, queueConfigList);
                     residentCount = residentCount - updateCount;
                     log.info("allocateQueue2-调配成功:{},to:{},count:{}",mqQueueConfig.getQueueName(),residentConfig.getQueueName(),updateCount);
@@ -277,7 +251,7 @@ public class TaskService {
                 DelEcsVo take = openList.poll();
                 MqQueueConfig mqQueueConfig = queueMap.get(key);
                 List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
-                if(msgList.isEmpty()){
+                if(msgList == null || msgList.isEmpty()){
                     log.info("openEcsList--待推送消息数为0,无需开启:{},{}",key,take.getQueueName());
                     continue;
                 }
@@ -331,16 +305,14 @@ public class TaskService {
             boolean flag = delList.stream().anyMatch(e -> e.getMqEcs().getEcsName().equals(mqEcs.getEcsName()));
             if(between >= mqQueueConfig.getStopScalingTime() + 60 * count && !flag){
                 List<MqSendLog> mqSendLogs = configLogsMap.get(mqQueueConfig.getId());
-                if(mqSendLogs == null){
-                    continue;
-                }
+                Integer mqSize = mqSendLogs != null ? mqSendLogs.size() :0;
                 MqMsg mqMsg = mqMsgMap.get(mqEcs.getQueueName());
                 if(mqMsg == null){
                     continue;
                 }
                 Integer notModelingCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - mqMsg.getMessages_ready();
-                if(mqSendLogs.size() - (notModelingCount + mqQueueConfig.getScalingThreshold()) >0 ){
-                    log.info("待计算数量:{}大于空闲服务数量:{},阈值{},不关闭",mqSendLogs.size(),notModelingCount,mqQueueConfig.getScalingThreshold());
+                if(mqSize - (notModelingCount + mqQueueConfig.getScalingThreshold()) >0 ){
+                    log.info("待计算数量:{}大于空闲服务数量:{},阈值{},不关闭",mqSize,notModelingCount,mqQueueConfig.getScalingThreshold());
                     continue;
                 }
                 log.info("checkDelEcs-实例开启{}时间大于{}分钟,开始关闭:{}",between,mqQueueConfig.getStopScalingTime(),mqEcs.getEcsName());