package com.fdkankan.mqcontroller.task; import cn.hutool.core.date.DateUnit; import cn.hutool.core.date.DateUtil; import cn.hutool.log.Log; import com.alibaba.fastjson.JSONObject; import com.fdkankan.mqcontroller.entity.*; import com.fdkankan.mqcontroller.service.*; import com.fdkankan.mqcontroller.utils.ECSUtils; import com.fdkankan.mqcontroller.utils.RabbitMqUtils; import com.fdkankan.mqcontroller.utils.RedisKey; import com.fdkankan.rabbitmq.util.RabbitMqProducer; import com.fdkankan.redis.util.RedisUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.FactoryBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.web.bind.annotation.PostMapping; import javax.annotation.PostConstruct; import java.sql.SQLOutput; import java.util.*; import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.Collectors; @Service @RefreshScope @Slf4j public class TaskService { @Autowired RabbitMqProducer rabbitMqProducer; @Autowired IMqSendLogService mqSendLogService; @Autowired IMqQueueConfigService queueConfigService; @Autowired IMqScalingConfigService mqScalingConfigService; @Autowired IMqEcsService mqEcsService; @Autowired RedisUtil redisUtil; @Autowired IScenePlusService scenePlusService; @Autowired RabbitMqService rabbitMqService; public static Integer checkOpenCount = 0; private static final LinkedBlockingQueue delList = new LinkedBlockingQueue<>(); private static final HashMap> openMap = new HashMap<>(); public static HashMap> configLogsMap = new HashMap<>(); public static HashMap mqMsgMap = new HashMap<>(); public static HashMap countMap = new HashMap<>(); public void runTask() { sendMq(); checkDelEcs(); } public void checkEcs() { openEcsList(); delEcsList(); } /** * 将数据库中待计算的mq消息推送,rabbitmq */ public void sendMq() { checkCount(); List mqSendLogs = mqSendLogService.getNoSendMsg(); if(mqSendLogs.isEmpty()){ return; } log.info("未分配的mq队列数:{}",mqSendLogs.size()); //设置分配队列 HashSet numList = new HashSet<>(); for (MqSendLog mqSendLog : mqSendLogs) { if(StringUtils.isBlank(mqSendLog.getContent())){ mqSendLogService.updateStatus(mqSendLog.getId(),2,null); continue; } Integer configId = scenePlusService.getQueueConfig(mqSendLog); if(numList.contains(mqSendLog.getNum())){ mqSendLogService.updateStatus(mqSendLog.getId(),2,null); continue; } if(configId == null ){ mqSendLogService.updateStatus(mqSendLog.getId(),3,null); continue; } mqSendLog.setConfigId(configId); numList.add(mqSendLog.getNum()); } List queueConfigList = queueConfigService.list(); rabbitMqService.getMqMsgMap(queueConfigList); for (MqQueueConfig config : queueConfigList) { List msgList = mqSendLogs.stream().filter(e -> e.getConfigId().equals(config.getId())).collect(Collectors.toList()); configLogsMap.put(config.getId(),msgList); } //是否需要重新组合队列 boolean flag = allocateQueue(queueConfigList); if(flag){ for (MqQueueConfig config : queueConfigList) { List msgList = mqSendLogs.stream().filter(e -> e.getConfigId().equals(config.getId())).collect(Collectors.toList()); configLogsMap.put(config.getId(),msgList); } } for (MqQueueConfig mqQueueConfig : queueConfigList) { List msgList = configLogsMap.get(mqQueueConfig.getId()); if(msgList.isEmpty()){ continue; } MqMsg mqMsg = mqMsgMap.get(mqQueueConfig.getQueueName()); if(mqMsg == null){ log.info("获取mq队列数据失败:{}",mqQueueConfig); continue; } if(mqQueueConfig.getOpenScaling() == 0){ //不开启弹性伸缩 if(mqMsg.getMessages_ready() >0){ //待计算队列中有任务 continue; } } if(mqQueueConfig.getOpenScaling() == 1 ){ //开启弹性伸缩 checkOpenEcs(mqQueueConfig,msgList); } sendRabbitMq(msgList,mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - mqMsg.getMessages_ready(),mqQueueConfig.getQueueName()); } } /** * 设置调配队列计算 */ private boolean allocateQueue(List queueConfigList){ Boolean flag = false; //获取常驻队列 List residentList = queueConfigList.stream().filter(e -> e.getIsResident() == 1).collect(Collectors.toList()); if(residentList.size() != 1){ log.info("常驻队列未配置,请配置常驻队列"); return flag; } Integer residentCount = 0; MqQueueConfig residentConfig = residentList.get(0); MqMsg mqMsg = mqMsgMap.get(residentConfig.getQueueName()); List residentLogList = configLogsMap.get(residentConfig.getId()); if(mqMsg!= null ){ residentCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - residentLogList.size() ; log.info("常驻队列空闲服务器数量为:{},{}",residentConfig.getQueueName(),residentCount); } //获取默认队列 for (MqQueueConfig mqQueueConfig : queueConfigList) { List msgList = configLogsMap.get(mqQueueConfig.getId()); if(residentCount >0 && mqQueueConfig.getIsResident() !=1 && mqQueueConfig.getIsSpecial() !=1 && !msgList.isEmpty()){ updateMqSendLogConfig(residentCount,mqQueueConfig.getId(),residentConfig.getId()); flag = true; } } return flag; } private void updateMqSendLogConfig(Integer residentCount,Integer configId ,Integer residentConfigId){ List msgList = configLogsMap.get(configId); for (int i = 0;i < residentCount ;i++){ if(i > msgList.size() -1){ continue; } MqSendLog mqSendLog = msgList.get(i); mqSendLog.setConfigId(residentConfigId); } } private void checkCount() { if(checkOpenCount > 10000){ //一个W为一个循环 checkOpenCount = 0; } checkOpenCount ++; } private void sendRabbitMq(List msgList, Integer msgCount,String queueName) { for (int i = 0;i < msgCount ;i++){ if(i > msgList.size() -1){ continue; } MqSendLog mqSendLog = msgList.get(i); mqSendLogService.updateStatus(mqSendLog.getId(),1,queueName); rabbitMqProducer.sendByWorkQueue(queueName,JSONObject.parseObject(mqSendLog.getContent())); } } private void checkOpenEcs(MqQueueConfig mqQueueConfig,List msgList) { //获取未关闭的服务器 List list = mqEcsService.getNoStopByQueueName(mqQueueConfig.getQueueName()); countMap.put(mqQueueConfig.getQueueName(),list.size()); LinkedBlockingQueue openList = openMap.computeIfAbsent( mqQueueConfig.getQueueName(), k -> new LinkedBlockingQueue<>() ); Integer ecsCount = countMap.get(mqQueueConfig.getQueueName()); MqMsg mqMsg = mqMsgMap.get(mqQueueConfig.getQueueName()); Boolean flag = mqQueueConfig.getOpenScalingTime() == 0 || checkOpenCount % mqQueueConfig.getOpenScalingTime() == 0; //未在计算的服务器数量 Integer noModelingCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged(); //未启动的服务器数量 Integer notStartCount = ecsCount - mqMsg.getConsumers(); if(msgList.size() > mqQueueConfig.getScalingThreshold() + noModelingCount + notStartCount && flag){ log.info("将待开启弹性伸缩放入队列:{}",mqQueueConfig.getQueueName()); MqScalingConfig mqScalingConfig = mqScalingConfigService.getById(mqQueueConfig.getScalingConfigId()); DelEcsVo vo = new DelEcsVo(null,mqScalingConfig,mqQueueConfig.getQueueName()); openList.offer(vo); } } /** * 开启弹性伸缩 */ public void openEcsList() { try { HashMap queueMap = queueConfigService.getQueueMap(); for (Map.Entry> entry : openMap.entrySet()) { String key = entry.getKey(); LinkedBlockingQueue openList = entry.getValue(); if(openList.isEmpty()){ return; } DelEcsVo take = openList.poll(); MqQueueConfig mqQueueConfig = queueMap.get(key); List msgList = configLogsMap.get(mqQueueConfig.getId()); countMap.merge(mqQueueConfig.getQueueName(), 1, Integer::sum); log.info("openEcsList--开启弹性伸缩数量:{},{}",key,openList.size()); List mqEcsList = mqEcsService.getNotUpEcs(); log.info("openEcsList--启动中的ecs数量为:{}",mqEcsList.size()); if(!mqEcsList.isEmpty()){ log.info("openEcsList--有启动中的ecs,等待启动完毕执行。。。"); openList.offer(take); return; } Boolean flag = createEcs( take.getMqScalingConfig()); if(flag){ mqEcsService.add(take.getQueueName()); sendRabbitMq(msgList,1,mqQueueConfig.getQueueName()); Thread.sleep(5000L); }else { openList.offer(take); } } }catch (Exception e){ log.info("openEcsList--开启弹性伸缩失败:",e); } } /** * 判断是否关闭弹性伸缩 */ public void checkDelEcs() { List mqEcsList = mqEcsService.getScalingNotStopList(); if(mqEcsList.isEmpty()){ return; } log.info("启动中的弹性伸缩数量为:{}",mqEcsList.size()); HashMap queueMap = queueConfigService.getQueueMap(); HashMap scalingMap = mqScalingConfigService.getIdMap(); for (MqEcs mqEcs : mqEcsList) { if(StringUtils.isBlank(mqEcs.getEcsName()) || StringUtils.isBlank(mqEcs.getQueueName())){ continue; } MqQueueConfig mqQueueConfig = queueMap.get(mqEcs.getQueueName()); if(mqQueueConfig == null || mqQueueConfig.getScalingConfigId() == null){ continue; } long between = DateUtil.between(mqEcs.getCreateTime(), new Date(), DateUnit.MINUTE); //弹性伸缩按照一个小时计费 long count = between/60; boolean flag = delList.stream().anyMatch(e -> e.getMqEcs().getEcsName().equals(mqEcs.getEcsName())); if(between >= mqQueueConfig.getStopScalingTime() + 60 * count && !flag){ log.info("checkDelEcs-实例开启时间大于{}分钟,开始关闭:{}",mqQueueConfig.getStopScalingTime(),mqEcs.getEcsName()); DelEcsVo vo = new DelEcsVo(mqEcs,scalingMap.get(mqQueueConfig.getScalingConfigId()),null); delList.offer(vo); } } } /** * 关闭弹性伸缩 */ public void delEcsList() { try { if(delList.isEmpty()){ return; } log.info("delEcsList--关闭弹性伸缩数量:{}",delList.size()); DelEcsVo take = delList.poll(); if(take == null){ return; } String stopKey = String.format(RedisKey.ecsStopKey,take.getMqEcs().getEcsName()); redisUtil.set(stopKey,take.getMqEcs().getEcsName(),60 * 60 * 24); //设置计算暂停锁 String modelingKey = String.format(RedisKey.modelingKey,take.getMqEcs().getEcsName()); if(redisUtil.hasKey(modelingKey)){ redisUtil.del(stopKey); return; } Boolean delFlag = delEcs(take.getMqScalingConfig(), take.getMqEcs().getEcsName()); if(delFlag){ log.info("checkDelEcs--关闭弹性伸缩实例成功:{}", take.getMqEcs().getEcsName()); mqEcsService.updateMqEcs(take.getMqEcs()); }else { delList.offer(take); } }catch (Exception e){ log.info("delEcsList--关闭弹性伸缩失败:",e); } } public synchronized Boolean createEcs(MqScalingConfig mqScaling){ try { return ECSUtils.createEcs(mqScaling.getAccessKey(),mqScaling.getSecret(),mqScaling.getEndpoint(),mqScaling.getScalingRuleAri()); }catch (Exception e){ log.info("触发弹性伸缩失败:",e); } return false; } public synchronized Boolean delEcs(MqScalingConfig mqScaling, String instanceId){ try { return ECSUtils.delEcs(mqScaling.getAccessKey(),mqScaling.getSecret(),mqScaling.getEndpoint(),mqScaling.getScalingGroupId(),instanceId); }catch (Exception e){ log.info("关闭弹性伸缩失败:",e); } return false; } }