ソースを参照

B队列没有消费者处理

lyhzzz 1 年間 前
コミット
7eeb736d6e
1 ファイル変更13 行追加10 行削除
  1. 13 10
      src/main/java/com/fdkankan/mqcontroller/task/TaskService.java

+ 13 - 10
src/main/java/com/fdkankan/mqcontroller/task/TaskService.java

@@ -13,6 +13,7 @@ import com.fdkankan.rabbitmq.util.RabbitMqProducer;
 import com.fdkankan.redis.util.RedisUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.FactoryBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.cloud.context.config.annotation.RefreshScope;
@@ -91,10 +92,13 @@ public class TaskService {
             List<MqSendLog> msgList = mqSendLogs.stream().filter(e -> e.getConfigId().equals(config.getId())).collect(Collectors.toList());
             configLogsMap.put(config.getId(),msgList);
         }
-        setMqMsg(queueConfigList);
-        for (MqQueueConfig config : queueConfigList) {
-            List<MqSendLog> msgList = mqSendLogs.stream().filter(e -> e.getConfigId().equals(config.getId())).collect(Collectors.toList());
-            configLogsMap.put(config.getId(),msgList);
+        //是否需要重新组合队列
+        boolean flag = allocateQueue(queueConfigList);
+        if(flag){
+            for (MqQueueConfig config : queueConfigList) {
+                List<MqSendLog> msgList = mqSendLogs.stream().filter(e -> e.getConfigId().equals(config.getId())).collect(Collectors.toList());
+                configLogsMap.put(config.getId(),msgList);
+            }
         }
 
         for (MqQueueConfig mqQueueConfig : queueConfigList) {
@@ -123,12 +127,13 @@ public class TaskService {
     /**
      * 设置调配队列计算
      */
-    private void setMqMsg(List<MqQueueConfig> queueConfigList){
+    private boolean allocateQueue(List<MqQueueConfig> queueConfigList){
+        Boolean flag = false;
         //获取常驻队列
         List<MqQueueConfig> residentList = queueConfigList.stream().filter(e -> e.getIsResident() == 1).collect(Collectors.toList());
         if(residentList.size() != 1){
             log.info("常驻队列未配置,请配置常驻队列");
-            return;
+            return flag;
         }
         Integer residentCount = 0;
         MqQueueConfig residentConfig = residentList.get(0);
@@ -144,8 +149,10 @@ public class TaskService {
             List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
             if(residentCount >0 && mqQueueConfig.getIsResident() !=1 && mqQueueConfig.getIsSpecial() !=1 && !msgList.isEmpty()){
                 updateMqSendLogConfig(residentCount,mqQueueConfig.getId(),residentConfig.getId());
+                flag = true;
             }
         }
+        return flag;
     }
 
     private void updateMqSendLogConfig(Integer residentCount,Integer configId ,Integer residentConfigId){
@@ -224,10 +231,6 @@ public class TaskService {
                 DelEcsVo take = openList.poll();
                 MqQueueConfig mqQueueConfig = queueMap.get(key);
                 List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
-                if(msgList.size() <= mqQueueConfig.getScalingThreshold()){
-                    log.info("openEcsList--待计算任务为:{}未超过阈值:{}无需开启弹性伸缩:{}",msgList.size(),mqQueueConfig.getScalingThreshold() ,key);
-                    return;
-                }
                 countMap.put(mqQueueConfig.getQueueName(),countMap.get(mqQueueConfig.getQueueName()) +1);
 
                 log.info("openEcsList--开启弹性伸缩数量:{},{}",key,openList.size());