Parcourir la source

空闲服务器调配设置

lyhzzz il y a 1 an
Parent
commit
cafdd64052
1 fichiers modifiés avec 59 ajouts et 24 suppressions
  1. 59 24
      src/main/java/com/fdkankan/mqcontroller/task/TaskService.java

+ 59 - 24
src/main/java/com/fdkankan/mqcontroller/task/TaskService.java

@@ -82,9 +82,6 @@ public class TaskService {
             return;
         }
         log.info("未分配的mq队列数:{}",mqSendLogs.size());
-        for (MqSendLog mqSendLog : mqSendLogs) {
-            redisUtil.set("mq-wait:queue:num:"+mqSendLog.getNum(),mqSendLog.getNum(),60);
-        }
 
         //设置分配队列
         HashSet<String> numList  = new HashSet<>();
@@ -113,13 +110,7 @@ public class TaskService {
             configLogsMap.put(config.getId(),msgList);
         }
         //是否需要重新组合队列
-        boolean flag = allocateQueue(queueConfigList);
-        if(flag){
-            for (MqQueueConfig config : queueConfigList) {
-                List<MqSendLog> msgList = mqSendLogs.stream().filter(e -> e.getConfigId().equals(config.getId())).collect(Collectors.toList());
-                configLogsMap.put(config.getId(),msgList);
-            }
-        }
+        allocateQueue2(queueConfigList,mqSendLogs);
 
         for (MqQueueConfig mqQueueConfig : queueConfigList) {
             List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
@@ -141,23 +132,20 @@ public class TaskService {
             }
             sendRabbitMq(msgList,mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - mqMsg.getMessages_ready(),mqQueueConfig.getQueueName());
 
-            for (MqSendLog mqSendLog : mqSendLogs) {
-                redisUtil.del("mq-wait:queue:num:"+mqSendLog.getNum());
-            }
         }
 
     }
 
     /**
      * 设置调配队列计算
+     * 只调配常驻
      */
-    private boolean allocateQueue(List<MqQueueConfig> queueConfigList){
-        Boolean flag = false;
+    private void allocateQueue(List<MqQueueConfig> queueConfigList,List<MqSendLog> mqSendLogs){
         //获取常驻队列
         List<MqQueueConfig> residentList = queueConfigList.stream().filter(e -> e.getIsResident() == 1).collect(Collectors.toList());
         if(residentList.size() != 1){
             log.info("常驻队列未配置,请配置常驻队列");
-            return flag;
+            return ;
         }
         Integer residentCount = 0;
         MqQueueConfig residentConfig = residentList.get(0);
@@ -165,21 +153,56 @@ public class TaskService {
         List<MqSendLog> residentLogList = configLogsMap.get(residentConfig.getId());
         if(mqMsg!= null ){
              residentCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - residentLogList.size() ;
-            log.info("常驻队列空闲服务器数量为:{},{}",residentConfig.getQueueName(),mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged());
+            log.info("常驻队列空闲服务器数量为:{},{}",residentConfig.getQueueName(),residentCount);
         }
-        //获取默认队列
-
+        //A空闲,B,C转A
         for (MqQueueConfig mqQueueConfig : queueConfigList) {
             List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
             if(residentCount >0 && mqQueueConfig.getIsResident() !=1 && mqQueueConfig.getIsSpecial() !=1 && !msgList.isEmpty()){
-                updateMqSendLogConfig(residentCount,mqQueueConfig.getId(),residentConfig.getId());
-                flag = true;
+                Integer updateCount = updateMqSendLogConfig(residentCount, mqQueueConfig.getId(), residentConfig.getId(), mqSendLogs, queueConfigList);
+                residentCount = residentCount - updateCount;
             }
         }
-        return flag;
     }
 
-    private void updateMqSendLogConfig(Integer residentCount,Integer configId ,Integer residentConfigId){
+    /**
+     * 设置调配队列计算
+     * 调配A,B,C
+     */
+    private void allocateQueue2(List<MqQueueConfig> queueConfigList,List<MqSendLog> mqSendLogs){
+        //获取常驻队列
+        for (MqQueueConfig residentConfig : queueConfigList) { // A ,B ,C
+            if(residentConfig.getIsSpecial() == 1){
+                continue;
+            }
+            Integer residentCount = 0;
+            MqMsg mqMsg = mqMsgMap.get(residentConfig.getQueueName());
+            List<MqSendLog> residentLogList = configLogsMap.get(residentConfig.getId());
+            if(mqMsg!= null ){
+                residentCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - residentLogList.size() ;
+                log.info("空闲服务器数量为:{},{}",residentConfig.getQueueName(),residentCount);
+            }
+            //A空闲,BC转A 。B空闲,C转B。C空闲,B转C
+            for (MqQueueConfig mqQueueConfig : queueConfigList) { // B , C
+                if(mqQueueConfig.getIsSpecial() == 1 || mqQueueConfig.getIsResident() == 1){
+                    continue;
+                }
+                if(mqQueueConfig.getQueueName().equals(residentConfig.getQueueName())){
+                    continue;
+                }
+                List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
+                if(residentCount >0 && !msgList.isEmpty()){
+                    Integer updateCount = updateMqSendLogConfig(residentCount, mqQueueConfig.getId(), residentConfig.getId(), mqSendLogs, queueConfigList);
+                    residentCount = residentCount - updateCount;
+                }
+            }
+        }
+
+
+    }
+
+    private Integer updateMqSendLogConfig(Integer residentCount,Integer configId ,Integer residentConfigId,List<MqSendLog> mqSendLogs,List<MqQueueConfig> queueConfigList){
+        Integer updateCount = 0;
         List<MqSendLog> msgList = configLogsMap.get(configId);
         for (int i = 0;i < residentCount ;i++){
             if(i > msgList.size() -1){
@@ -187,8 +210,16 @@ public class TaskService {
             }
             MqSendLog mqSendLog = msgList.get(i);
             mqSendLog.setConfigId(residentConfigId);
+            updateCount ++;
         }
-
+        if(updateCount <=0){
+            return updateCount;
+        }
+        for (MqQueueConfig config : queueConfigList) {
+            List<MqSendLog> msgList2 = mqSendLogs.stream().filter(e -> e.getConfigId().equals(config.getId())).collect(Collectors.toList());
+            configLogsMap.put(config.getId(),msgList2);
+        }
+        return updateCount;
     }
 
     private void checkCount() {
@@ -250,6 +281,10 @@ public class TaskService {
                 DelEcsVo take = openList.poll();
                 MqQueueConfig mqQueueConfig = queueMap.get(key);
                 List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
+                if(msgList.isEmpty()){
+                    log.info("openEcsList--待推送消息数为0,无需开启:{}",take.getQueueName());
+                    return;
+                }
                 countMap.merge(mqQueueConfig.getQueueName(), 1, Integer::sum);
 
                 log.info("openEcsList--开启弹性伸缩数量:{},{}",key,1);