浏览代码

添加默认队列,常驻队列判断

lyhzzz 1 年之前
父节点
当前提交
21fb9fedde

+ 0 - 2
src/main/java/com/fdkankan/mqcontroller/entity/DelEcsVo.java

@@ -14,6 +14,4 @@ public class DelEcsVo {
     private MqEcs mqEcs;
     private MqScalingConfig mqScalingConfig;
     private String queueName;
-    private Date putTime;
-    private List<MqSendLog> msgList;
 }

+ 7 - 0
src/main/java/com/fdkankan/mqcontroller/entity/MqQueueConfig.java

@@ -78,5 +78,12 @@ public class MqQueueConfig implements Serializable {
     @TableField("level")
     private String level;
 
+    @TableField("is_default")
+    private Integer isDefault;
 
+    @TableField("is_resident")
+    private Integer isResident;
+
+    @TableField("is_special")
+    private Integer isSpecial;
 }

+ 1 - 1
src/main/java/com/fdkankan/mqcontroller/service/IMqEcsService.java

@@ -19,7 +19,7 @@ public interface IMqEcsService extends IService<MqEcs> {
 
     List<MqEcs> getScalingNotStopList();
 
-    List<MqEcs> getNoModelingByQueueName(String queueName);
+    List<MqEcs> getNoStopByQueueName(String queueName);
 
     void updateMqEcs(MqEcs mqEcs);
 

+ 19 - 2
src/main/java/com/fdkankan/mqcontroller/service/impl/MqEcsServiceImpl.java

@@ -9,10 +9,12 @@ import com.fdkankan.mqcontroller.service.IMqEcsService;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.fdkankan.mqcontroller.utils.RedisKey;
 import com.fdkankan.redis.util.RedisUtil;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import rx.internal.util.unsafe.MessagePassingQueue;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -27,6 +29,8 @@ import java.util.List;
 public class MqEcsServiceImpl extends ServiceImpl<IMqEcsMapper, MqEcs> implements IMqEcsService {
 
 
+    @Autowired
+    RedisUtil redisUtil;
     @Override
     public void add(String queueName) {
         MqEcs mqEcs = new MqEcs();
@@ -43,12 +47,25 @@ public class MqEcsServiceImpl extends ServiceImpl<IMqEcsMapper, MqEcs> implement
     }
 
     @Override
-    public List<MqEcs> getNoModelingByQueueName(String queueName) {
+    public List<MqEcs> getNoStopByQueueName(String queueName) {
         LambdaQueryWrapper<MqEcs> wrapper = new LambdaQueryWrapper<>();
         wrapper.eq(MqEcs::getIsScaling,1);
         wrapper.eq(MqEcs::getStatus,0);
         wrapper.eq(MqEcs::getQueueName,queueName);
-        return this.list(wrapper);
+        List<MqEcs> list = this.list(wrapper);
+//        List<MqEcs> newList = new ArrayList<>();
+//        for (MqEcs mqEcs : list) {
+//            if(StringUtils.isBlank(mqEcs.getEcsName())){
+//                String redisStopKey = String.format(RedisKey.ecsStopKey,mqEcs.getEcsName());
+//                if(!redisUtil.hasKey(redisStopKey)){
+//                    newList.add(mqEcs);
+//                }
+//            }else {
+//                newList.add(mqEcs);
+//            }
+//        }
+
+        return list;
     }
 
 

+ 1 - 1
src/main/java/com/fdkankan/mqcontroller/service/impl/MqQueueConfigServiceImpl.java

@@ -32,7 +32,7 @@ public class MqQueueConfigServiceImpl extends ServiceImpl<IMqQueueConfigMapper,
     @Override
     public MqQueueConfig getDfConfig() {
         LambdaQueryWrapper<MqQueueConfig> wrapper = new LambdaQueryWrapper<>();
-        wrapper.eq(MqQueueConfig::getLevel,"A");
+        wrapper.eq(MqQueueConfig::getIsDefault,1);
         return this.getOne(wrapper);
     }
 

+ 39 - 0
src/main/java/com/fdkankan/mqcontroller/task/RabbitMqService.java

@@ -0,0 +1,39 @@
+package com.fdkankan.mqcontroller.task;
+
+import com.fdkankan.mqcontroller.entity.MqMsg;
+import com.fdkankan.mqcontroller.entity.MqQueueConfig;
+import com.fdkankan.mqcontroller.utils.RabbitMqUtils;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import java.util.HashMap;
+import java.util.List;
+
+@Service
+public class RabbitMqService {
+
+    @Value("${spring.rabbitmq.host}")
+    public String host;
+    @Value("${spring.rabbitmq.username}")
+    public String username;
+    @Value("${spring.rabbitmq.password}")
+    public String password;
+    @Value("${spring.rabbitmq.virtual-host}")
+    public String virtualHost;
+    @Value("${spring.rabbitmq.mgmt-url}")
+    public String mgmtUrl;
+    @Value("${spring.rabbitmq.mgmt-host}")
+    public String mgmtHost;
+
+    public MqMsg getRabbitMqMsg(String queueName) {
+        return RabbitMqUtils.getRabbitMqMsg(mgmtUrl+host+":"+mgmtHost,virtualHost,username,password,queueName);
+    }
+
+
+    public  void getMqMsgMap(List<MqQueueConfig> queueConfigList) {
+        for (MqQueueConfig mqQueueConfig : queueConfigList) {
+            MqMsg mqMsg = getRabbitMqMsg(mqQueueConfig.getQueueName());
+            TaskService.mqMsgMap.put(mqQueueConfig.getQueueName(),mqMsg);
+        }
+    }
+}

+ 7 - 7
src/main/java/com/fdkankan/mqcontroller/task/TaskController.java

@@ -15,7 +15,7 @@ public class TaskController {
     TaskService taskService;
 
     /**
-     * 每60秒执行一次,将数据库中待计算的mq消息推送,rabbitmq
+     * 将数据库中待计算的mq消息推送,rabbitmq
      */
     @Scheduled(initialDelay = 2000,fixedDelay = 1000 * 60 )
     public void taskSendMq(){
@@ -28,9 +28,9 @@ public class TaskController {
 
 
     /**
-     * 每90秒执行一次,判断是否关闭弹性伸缩
+     * 判断是否关闭弹性伸缩
      */
-    @Scheduled(initialDelay = 3000,fixedDelay = 1000 * 90 )
+    @Scheduled(initialDelay = 3000,fixedDelay = 1000 * 70 )
     public void checkDelEcs(){
         try {
             taskService.checkDelEcs();
@@ -41,9 +41,9 @@ public class TaskController {
 
 
     /**
-     * 每20秒执行一次,开启缓存弹性伸缩
+     * 开启缓存弹性伸缩
      */
-    @Scheduled(initialDelay = 4000,fixedDelay = 1000 * 20 )
+    @Scheduled(initialDelay = 4000,fixedDelay = 1000 * 35 )
     public void openEcsList(){
         try {
             taskService.openEcsList();
@@ -53,9 +53,9 @@ public class TaskController {
     }
 
     /**
-     * 每30秒执行一次,关闭缓存弹性伸缩
+     * 关闭缓存弹性伸缩
      */
-    @Scheduled(initialDelay = 5000,fixedDelay = 1000 * 30 )
+    @Scheduled(initialDelay = 5000,fixedDelay = 1000 * 15 )
     public void delEcsList(){
         try {
             taskService.delEcsList();

+ 91 - 26
src/main/java/com/fdkankan/mqcontroller/task/TaskService.java

@@ -18,7 +18,9 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.cloud.context.config.annotation.RefreshScope;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
+import org.springframework.web.bind.annotation.PostMapping;
 
+import javax.annotation.PostConstruct;
 import java.sql.SQLOutput;
 import java.util.*;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -43,12 +45,17 @@ public class TaskService {
     RedisUtil redisUtil;
     @Autowired
     IScenePlusService scenePlusService;
+    @Autowired
+    RabbitMqService rabbitMqService;
 
     public static Integer checkOpenCount = 0;
 
     private static final LinkedBlockingQueue<DelEcsVo> delList = new LinkedBlockingQueue<>();
     private static final HashMap<String,LinkedBlockingQueue<DelEcsVo>> openMap = new HashMap<>();
+    public static HashMap<Integer,List<MqSendLog>> configLogsMap = new HashMap<>();
+    public static HashMap<String,MqMsg> mqMsgMap = new HashMap<>();
 
+    public static HashMap<String,Integer> countMap = new HashMap<>();
 
     public void sendMq() {
         checkCount();
@@ -77,14 +84,25 @@ public class TaskService {
             mqSendLog.setConfigId(configId);
             numList.add(mqSendLog.getNum());
         }
-
         List<MqQueueConfig> queueConfigList = queueConfigService.list();
+        rabbitMqService.getMqMsgMap(queueConfigList);
+
+        for (MqQueueConfig config : queueConfigList) {
+            List<MqSendLog> msgList = mqSendLogs.stream().filter(e -> e.getConfigId().equals(config.getId())).collect(Collectors.toList());
+            configLogsMap.put(config.getId(),msgList);
+        }
+        setMqMsg(queueConfigList);
+        for (MqQueueConfig config : queueConfigList) {
+            List<MqSendLog> msgList = mqSendLogs.stream().filter(e -> e.getConfigId().equals(config.getId())).collect(Collectors.toList());
+            configLogsMap.put(config.getId(),msgList);
+        }
+
         for (MqQueueConfig mqQueueConfig : queueConfigList) {
-            List<MqSendLog> msgList = mqSendLogs.stream().filter(e -> e.getConfigId().equals(mqQueueConfig.getId())).collect(Collectors.toList());
+            List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
             if(msgList.isEmpty()){
                 continue;
             }
-            MqMsg mqMsg = getRabbitMqMsg(mqQueueConfig.getQueueName());
+            MqMsg mqMsg = mqMsgMap.get(mqQueueConfig.getQueueName());
             if(mqMsg == null){
                 log.info("获取mq队列数据失败:{}",mqQueueConfig);
                 continue;
@@ -94,14 +112,54 @@ public class TaskService {
                     continue;
                 }
             }
-            if(mqQueueConfig.getOpenScaling() == 1){    //开启弹性伸缩
-                checkOpenEcs(mqQueueConfig,msgList,mqMsg.getMessages_ready());
+            if(mqQueueConfig.getOpenScaling() == 1 ){    //开启弹性伸缩
+                checkOpenEcs(mqQueueConfig,msgList);
             }
             sendRabbitMq(msgList,mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - mqMsg.getMessages_ready(),mqQueueConfig.getQueueName());
         }
 
     }
 
+    /**
+     * 设置调配队列计算
+     */
+    private void setMqMsg(List<MqQueueConfig> queueConfigList){
+        //获取常驻队列
+        List<MqQueueConfig> residentList = queueConfigList.stream().filter(e -> e.getIsResident() == 1).collect(Collectors.toList());
+        if(residentList.size() != 1){
+            log.info("常驻队列未配置,请配置常驻队列");
+            return;
+        }
+        Integer residentCount = 0;
+        MqQueueConfig residentConfig = residentList.get(0);
+        MqMsg mqMsg = mqMsgMap.get(residentConfig.getQueueName());
+        List<MqSendLog> residentLogList = configLogsMap.get(residentConfig.getId());
+        if(mqMsg!= null ){
+             residentCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - residentLogList.size() ;
+            log.info("常驻队列空闲服务器数量为:{},{}",residentConfig.getQueueName(),residentCount);
+        }
+        //获取默认队列
+
+        for (MqQueueConfig mqQueueConfig : queueConfigList) {
+            List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
+            if(residentCount >0 && mqQueueConfig.getIsResident() !=1 && mqQueueConfig.getIsSpecial() !=1 && !msgList.isEmpty()){
+                updateMqSendLogConfig(residentCount,mqQueueConfig.getId(),residentConfig.getId());
+            }
+        }
+    }
+
+    private void updateMqSendLogConfig(Integer residentCount,Integer configId ,Integer residentConfigId){
+        List<MqSendLog> msgList = configLogsMap.get(configId);
+        for (int i = 0;i < residentCount ;i++){
+            if(i > msgList.size() -1){
+                continue;
+            }
+            MqSendLog mqSendLog = msgList.get(i);
+            mqSendLog.setConfigId(residentConfigId);
+        }
+
+    }
+
     private void checkCount() {
         if(checkOpenCount > 10000){     //一个W为一个循环
             checkOpenCount = 0;
@@ -121,17 +179,36 @@ public class TaskService {
     }
 
 
-    private void checkOpenEcs(MqQueueConfig mqQueueConfig,List<MqSendLog> msgList,Integer readCount) {
-        List<MqEcs> list = mqEcsService.getNoModelingByQueueName(mqQueueConfig.getQueueName());
+    private void checkOpenEcs(MqQueueConfig mqQueueConfig,List<MqSendLog> msgList) {
+        //获取未关闭的服务器
+        List<MqEcs> list = mqEcsService.getNoStopByQueueName(mqQueueConfig.getQueueName());
+        countMap.put(mqQueueConfig.getQueueName(),list.size());
         LinkedBlockingQueue<DelEcsVo> openList = openMap.get(mqQueueConfig.getQueueName());
         if(openList == null){
             openList = new LinkedBlockingQueue<>();
             openMap.put(mqQueueConfig.getQueueName(),openList);
         }
-        if(msgList.size() + readCount > mqQueueConfig.getScalingThreshold() + list.size() + openList.size() && mqQueueConfig.getOpenScalingTime() % checkOpenCount == 0){
+
+        Integer ecsCount = countMap.get(mqQueueConfig.getQueueName());
+        MqMsg mqMsg = mqMsgMap.get(mqQueueConfig.getQueueName());
+        //默认队列
+        if(mqQueueConfig.getIsDefault() == 1 && ecsCount + openList.size()  == 0){
+            log.info("默认队列无服务计算中:{}",mqQueueConfig.getQueueName());
+            MqScalingConfig mqScalingConfig = mqScalingConfigService.getById(mqQueueConfig.getScalingConfigId());
+            DelEcsVo vo = new DelEcsVo(null,mqScalingConfig,mqQueueConfig.getQueueName());
+            openList.offer(vo);
+            return;
+        }
+        Boolean flag = mqQueueConfig.getOpenScalingTime() == 0 || checkOpenCount % mqQueueConfig.getOpenScalingTime() == 0;
+        //未在计算的服务器数量
+        Integer noModelingCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged();
+        //未启动的服务器数量
+        Integer notStartCount = ecsCount - mqMsg.getConsumers();
+
+        if(msgList.size()  >  mqQueueConfig.getScalingThreshold() + noModelingCount + notStartCount && flag){
             log.info("将待开启弹性伸缩放入队列:{}",mqQueueConfig.getQueueName());
             MqScalingConfig mqScalingConfig = mqScalingConfigService.getById(mqQueueConfig.getScalingConfigId());
-            DelEcsVo vo = new DelEcsVo(null,mqScalingConfig,mqQueueConfig.getQueueName(),new Date(),msgList);
+            DelEcsVo vo = new DelEcsVo(null,mqScalingConfig,mqQueueConfig.getQueueName());
             openList.offer(vo);
         }
     }
@@ -145,16 +222,19 @@ public class TaskService {
                     return;
                 }
                 DelEcsVo take = openList.poll();
-                List<MqSendLog> msgList = take.getMsgList();
                 MqQueueConfig mqQueueConfig = queueMap.get(key);
+                List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
                 if(msgList.size() <= mqQueueConfig.getScalingThreshold()){
                     log.info("openEcsList--待计算任务为:{}未超过阈值:{}无需开启弹性伸缩:{}",msgList.size(),mqQueueConfig.getScalingThreshold() ,key);
                     return;
                 }
+                countMap.put(mqQueueConfig.getQueueName(),countMap.get(mqQueueConfig.getQueueName()) +1);
+
                 log.info("openEcsList--开启弹性伸缩数量:{},{}",key,openList.size());
                 Boolean flag = createEcs( take.getMqScalingConfig());
                 if(flag){
                     mqEcsService.add(take.getQueueName());
+                    sendRabbitMq(msgList,1,mqQueueConfig.getQueueName());
                     Thread.sleep(1000L * 5);
                 }else {
                     openList.offer(take);
@@ -187,7 +267,7 @@ public class TaskService {
             boolean flag = delList.stream().anyMatch(e -> e.getMqEcs().getEcsName().equals(mqEcs.getEcsName()));
             if(between >= mqQueueConfig.getStopScalingTime() + 60 * count && !flag){
                 log.info("checkDelEcs-实例开启时间大于{}分钟,开始关闭:{}",mqQueueConfig.getStopScalingTime(),mqEcs.getEcsName());
-                DelEcsVo vo = new DelEcsVo(mqEcs,scalingMap.get(mqQueueConfig.getScalingConfigId()),null,new Date(),null);
+                DelEcsVo vo = new DelEcsVo(mqEcs,scalingMap.get(mqQueueConfig.getScalingConfigId()),null);
                 delList.offer(vo);
             }
         }
@@ -226,22 +306,7 @@ public class TaskService {
 
 
 
-    @Value("${spring.rabbitmq.host}")
-    public String host;
-    @Value("${spring.rabbitmq.username}")
-    public String username;
-    @Value("${spring.rabbitmq.password}")
-    public String password;
-    @Value("${spring.rabbitmq.virtual-host}")
-    public String virtualHost;
-    @Value("${spring.rabbitmq.mgmt-url}")
-    public String mgmtUrl;
-    @Value("${spring.rabbitmq.mgmt-host}")
-    public String mgmtHost;
 
-    public  MqMsg getRabbitMqMsg(String queueName) {
-        return RabbitMqUtils.getRabbitMqMsg(mgmtUrl+host+":"+mgmtHost,virtualHost,username,password,queueName);
-    }
 
 
     public synchronized Boolean createEcs(MqScalingConfig mqScaling){

+ 5 - 0
src/main/java/com/fdkankan/mqcontroller/utils/RabbitMqUtils.java

@@ -6,11 +6,15 @@ import cn.hutool.core.util.CharsetUtil;
 import cn.hutool.http.HttpRequest;
 import com.alibaba.fastjson.JSONObject;
 import com.fdkankan.mqcontroller.entity.MqMsg;
+import com.fdkankan.mqcontroller.entity.MqQueueConfig;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.catalina.Host;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
+import java.util.HashMap;
+import java.util.List;
+
 @Slf4j
 public class RabbitMqUtils {
 
@@ -37,4 +41,5 @@ public class RabbitMqUtils {
         return null;
     }
 
+
 }

+ 3 - 0
src/main/resources/bootstrap-dev.yml

@@ -19,6 +19,9 @@ spring:
           - data-id: common-db-config.yaml
             group: DEFAULT_GROUP
             refresh: true
+          - data-id: common-redis-config.yaml
+            group: DEFAULT_GROUP
+            refresh: true
       discovery:
         server-addr: ${spring.cloud.nacos.config.server-addr}
         namespace: ${spring.cloud.nacos.config.namespace}

+ 3 - 0
src/main/resources/bootstrap-test.yml

@@ -19,6 +19,9 @@ spring:
           - data-id: common-db-config.yaml
             group: DEFAULT_GROUP
             refresh: true
+          - data-id: common-redis-config.yaml
+            group: DEFAULT_GROUP
+            refresh: true
       discovery:
         server-addr: ${spring.cloud.nacos.config.server-addr}
         namespace: ${spring.cloud.nacos.config.namespace}