package com.fdkankan.mqcontroller.task; import cn.hutool.core.date.DateUnit; import cn.hutool.core.date.DateUtil; import cn.hutool.log.Log; import com.alibaba.fastjson.JSONObject; import com.fdkankan.mqcontroller.entity.*; import com.fdkankan.mqcontroller.service.*; import com.fdkankan.mqcontroller.utils.ECSUtils; import com.fdkankan.mqcontroller.utils.RabbitMqUtils; import com.fdkankan.mqcontroller.utils.RedisKey; import com.fdkankan.rabbitmq.util.RabbitMqProducer; import com.fdkankan.redis.util.RedisUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.FactoryBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.web.bind.annotation.PostMapping; import javax.annotation.PostConstruct; import java.sql.SQLOutput; import java.util.*; import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.Collectors; @Service @RefreshScope @Slf4j public class TaskService { @Autowired RabbitMqProducer rabbitMqProducer; @Autowired IMqSendLogService mqSendLogService; @Autowired IMqQueueConfigService queueConfigService; @Autowired IMqScalingConfigService mqScalingConfigService; @Autowired IMqEcsService mqEcsService; @Autowired RedisUtil redisUtil; @Autowired IScenePlusService scenePlusService; @Autowired RabbitMqService rabbitMqService; public static Integer checkOpenCount = 0; private static final LinkedBlockingQueue delList = new LinkedBlockingQueue<>(); private static final HashMap> openMap = new HashMap<>(); public static HashMap> configLogsMap = new HashMap<>(); public static HashMap mqMsgMap = new HashMap<>(); public static HashMap countMap = new HashMap<>(); /** * 将数据库中待计算的mq消息推送,rabbitmq */ public void sendMq() { checkCount(); List mqSendLogs = mqSendLogService.getNoSendMsg(); if(mqSendLogs.isEmpty()){ return; } log.info("未分配的mq队列数:{}",mqSendLogs.size()); //设置分配队列 HashSet numList = new HashSet<>(); for (MqSendLog mqSendLog : mqSendLogs) { if(StringUtils.isBlank(mqSendLog.getContent())){ mqSendLogService.updateStatus(mqSendLog.getId(),2,null); continue; } Integer configId = scenePlusService.getQueueConfig(mqSendLog); if(numList.contains(mqSendLog.getNum())){ mqSendLogService.updateStatus(mqSendLog.getId(),2,null); continue; } if(configId == null ){ mqSendLogService.updateStatus(mqSendLog.getId(),3,null); continue; } mqSendLog.setConfigId(configId); numList.add(mqSendLog.getNum()); } List queueConfigList = queueConfigService.list(); rabbitMqService.getMqMsgMap(queueConfigList); for (MqQueueConfig config : queueConfigList) { List msgList = mqSendLogs.stream().filter(e -> e.getConfigId().equals(config.getId())).collect(Collectors.toList()); configLogsMap.put(config.getId(),msgList); } //是否需要重新组合队列 allocateQueue2(queueConfigList,mqSendLogs); for (MqQueueConfig mqQueueConfig : queueConfigList) { List msgList = configLogsMap.get(mqQueueConfig.getId()); if(msgList.isEmpty()){ continue; } MqMsg mqMsg = mqMsgMap.get(mqQueueConfig.getQueueName()); if(mqMsg == null){ log.info("获取mq队列数据失败:{}",mqQueueConfig); continue; } if(mqQueueConfig.getOpenScaling() == 0){ //不开启弹性伸缩 if(mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - mqMsg.getMessages_ready() <= 0 ){ //待计算队列中有任务 continue; } } if(mqQueueConfig.getOpenScaling() == 1 ){ //开启弹性伸缩 checkOpenEcs(mqQueueConfig,msgList); } sendRabbitMq(msgList,mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - mqMsg.getMessages_ready(),mqQueueConfig.getQueueName()); } } /** * 设置调配队列计算 * 只调配常驻 */ private void allocateQueue(List queueConfigList,List mqSendLogs){ //获取常驻队列 List residentList = queueConfigList.stream().filter(e -> e.getIsResident() == 1).collect(Collectors.toList()); if(residentList.size() != 1){ log.info("常驻队列未配置,请配置常驻队列"); return ; } Integer residentCount = 0; MqQueueConfig residentConfig = residentList.get(0); MqMsg mqMsg = mqMsgMap.get(residentConfig.getQueueName()); List residentLogList = configLogsMap.get(residentConfig.getId()); if(mqMsg!= null ){ residentCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - residentLogList.size() ; log.info("常驻队列空闲服务器数量为:{},{}",residentConfig.getQueueName(),residentCount); } //A空闲,B,C转A for (MqQueueConfig mqQueueConfig : queueConfigList) { List msgList = configLogsMap.get(mqQueueConfig.getId()); if(residentCount >0 && mqQueueConfig.getIsResident() !=1 && mqQueueConfig.getIsSpecial() !=1 && !msgList.isEmpty()){ Integer updateCount = updateMqSendLogConfig(residentCount, mqQueueConfig.getId(), residentConfig.getId(), mqSendLogs, queueConfigList); residentCount = residentCount - updateCount; } } } /** * 设置调配队列计算 * 调配A,B,C */ private void allocateQueue2(List queueConfigList,List mqSendLogs){ //获取常驻队列 for (MqQueueConfig residentConfig : queueConfigList) { // A ,B ,C if(residentConfig.getIsSpecial() == 1){ continue; } Integer residentCount = 0; MqMsg mqMsg = mqMsgMap.get(residentConfig.getQueueName()); List residentLogList = configLogsMap.get(residentConfig.getId()); if(mqMsg!= null ){ residentCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - residentLogList.size() ; log.info("allocateQueue2空闲服务器数量为:{},{}",residentConfig.getQueueName(),residentCount); } //A空闲,BC转A 。B空闲,C转B。C空闲,B转C for (MqQueueConfig mqQueueConfig : queueConfigList) { // B , C if(mqQueueConfig.getIsSpecial() == 1 || mqQueueConfig.getIsResident() == 1){ continue; } if(mqQueueConfig.getQueueName().equals(residentConfig.getQueueName())){ continue; } List msgList = configLogsMap.get(mqQueueConfig.getId()); if(residentCount >0 && !msgList.isEmpty()){ Integer updateCount = updateMqSendLogConfig(residentCount, mqQueueConfig.getId(), residentConfig.getId(), mqSendLogs, queueConfigList); residentCount = residentCount - updateCount; log.info("allocateQueue2-调配成功:{},to:{},count:{}",mqQueueConfig.getQueueName(),residentConfig.getQueueName(),updateCount); } } } } private Integer updateMqSendLogConfig(Integer residentCount,Integer configId ,Integer residentConfigId,List mqSendLogs,List queueConfigList){ Integer updateCount = 0; List msgList = configLogsMap.get(configId); for (int i = 0;i < residentCount ;i++){ if(i > msgList.size() -1){ continue; } MqSendLog mqSendLog = msgList.get(i); mqSendLog.setConfigId(residentConfigId); updateCount ++; } if(updateCount <=0){ return updateCount; } for (MqQueueConfig config : queueConfigList) { List msgList2 = mqSendLogs.stream().filter(e -> e.getConfigId().equals(config.getId())).collect(Collectors.toList()); configLogsMap.put(config.getId(),msgList2); } return updateCount; } private void checkCount() { if(checkOpenCount > 10000){ //一个W为一个循环 checkOpenCount = 0; } checkOpenCount ++; } private void sendRabbitMq(List msgList, Integer msgCount,String queueName) { for (int i = 0;i < msgCount ;i++){ if(i > msgList.size() -1){ continue; } MqSendLog mqSendLog = msgList.get(i); mqSendLogService.updateStatus(mqSendLog.getId(),1,queueName); rabbitMqProducer.sendByWorkQueue(queueName,JSONObject.parseObject(mqSendLog.getContent())); } } private void checkOpenEcs(MqQueueConfig mqQueueConfig,List msgList) { //获取未关闭的服务器 List list = mqEcsService.getNoStopByQueueName(mqQueueConfig.getQueueName()); countMap.put(mqQueueConfig.getQueueName(),list.size()); LinkedBlockingQueue openList = openMap.computeIfAbsent( mqQueueConfig.getQueueName(), k -> new LinkedBlockingQueue<>() ); Integer ecsCount = countMap.get(mqQueueConfig.getQueueName()); MqMsg mqMsg = mqMsgMap.get(mqQueueConfig.getQueueName()); //时间判断 Boolean flag = mqQueueConfig.getOpenScalingTime() == 0 || checkOpenCount % (mqQueueConfig.getOpenScalingTime() * 6) == 0; //未在计算的服务器数量 Integer noModelingCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged(); //未启动的服务器数量 Integer notStartCount = ecsCount - mqMsg.getConsumers(); Boolean flag2 = msgList.size() > mqQueueConfig.getScalingThreshold() + noModelingCount + notStartCount || (!msgList.isEmpty() && ecsCount + mqMsg.getConsumers() == 0 ); if(flag2 && flag){ log.info("将待开启弹性伸缩放入队列:{}",mqQueueConfig.getQueueName()); MqScalingConfig mqScalingConfig = mqScalingConfigService.getById(mqQueueConfig.getScalingConfigId()); DelEcsVo vo = new DelEcsVo(null,mqScalingConfig,mqQueueConfig.getQueueName()); openList.offer(vo); } } /** * 开启弹性伸缩 */ public void openEcsList() { try { HashMap queueMap = queueConfigService.getQueueMap(); for (Map.Entry> entry : openMap.entrySet()) { String key = entry.getKey(); LinkedBlockingQueue openList = entry.getValue(); if(openList.isEmpty()){ continue; } DelEcsVo take = openList.poll(); MqQueueConfig mqQueueConfig = queueMap.get(key); List msgList = configLogsMap.get(mqQueueConfig.getId()); if(msgList.isEmpty()){ log.info("openEcsList--待推送消息数为0,无需开启:{},{}",key,take.getQueueName()); continue; } countMap.merge(mqQueueConfig.getQueueName(), 1, Integer::sum); log.info("openEcsList--开启弹性伸缩数量:{},{}",key,1); List mqEcsList = mqEcsService.getNotUpEcs(); log.info("openEcsList--启动中的ecs数量为:{}",mqEcsList.size()); if(!mqEcsList.isEmpty()){ log.info("openEcsList--有启动中的ecs,等待启动完毕执行。。。"); openList.offer(take); return; } Boolean flag = createEcs( take.getMqScalingConfig()); if(flag){ mqEcsService.add(take.getQueueName()); sendRabbitMq(msgList,1,mqQueueConfig.getQueueName()); }else { openList.offer(take); } } }catch (Exception e){ log.info("openEcsList--开启弹性伸缩失败:",e); } } /** * 判断是否关闭弹性伸缩 */ public void checkDelEcs() { List mqEcsList = mqEcsService.getScalingNotStopList(); if(mqEcsList.isEmpty()){ return; } log.info("启动中的弹性伸缩数量为:{}",mqEcsList.size()); HashMap queueMap = queueConfigService.getQueueMap(); HashMap scalingMap = mqScalingConfigService.getIdMap(); for (MqEcs mqEcs : mqEcsList) { if(StringUtils.isBlank(mqEcs.getEcsName()) || StringUtils.isBlank(mqEcs.getQueueName())){ continue; } MqQueueConfig mqQueueConfig = queueMap.get(mqEcs.getQueueName()); if(mqQueueConfig == null || mqQueueConfig.getScalingConfigId() == null){ continue; } long between = DateUtil.between(mqEcs.getCreateTime(), new Date(), DateUnit.MINUTE); //弹性伸缩按照一个小时计费 long count = between/60; boolean flag = delList.stream().anyMatch(e -> e.getMqEcs().getEcsName().equals(mqEcs.getEcsName())); if(between >= mqQueueConfig.getStopScalingTime() + 60 * count && !flag){ List mqSendLogs = configLogsMap.get(mqQueueConfig.getId()); if(mqSendLogs == null){ continue; } MqMsg mqMsg = mqMsgMap.get(mqEcs.getQueueName()); if(mqMsg == null){ continue; } Integer notModelingCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - mqMsg.getMessages_ready(); if(mqSendLogs.size() - (notModelingCount + mqQueueConfig.getScalingThreshold()) >0 ){ log.info("待计算数量:{}大于空闲服务数量:{},阈值{},不关闭",mqSendLogs.size(),notModelingCount,mqQueueConfig.getScalingThreshold()); continue; } log.info("checkDelEcs-实例开启{}时间大于{}分钟,开始关闭:{}",between,mqQueueConfig.getStopScalingTime(),mqEcs.getEcsName()); DelEcsVo vo = new DelEcsVo(mqEcs,scalingMap.get(mqQueueConfig.getScalingConfigId()),null); delList.offer(vo); } } } /** * 关闭弹性伸缩 */ public void delEcsList() { try { if(delList.isEmpty()){ return; } log.info("delEcsList--关闭弹性伸缩数量:{}",delList.size()); DelEcsVo take = delList.poll(); if(take == null){ return; } String stopKey = String.format(RedisKey.ecsStopKey,take.getMqEcs().getEcsName()); redisUtil.set(stopKey,take.getMqEcs().getEcsName(),60 * 60 * 24); //设置计算暂停锁 String modelingKey = String.format(RedisKey.modelingKey,take.getMqEcs().getEcsName()); if(redisUtil.hasKey(modelingKey)){ log.info("delEcsList--有场景正在计算中:{},{}",modelingKey,redisUtil.get(modelingKey)); redisUtil.del(stopKey); return; } Boolean delFlag = delEcs(take.getMqScalingConfig(), take.getMqEcs().getEcsName()); if(delFlag){ log.info("checkDelEcs--关闭弹性伸缩实例成功:{}", take.getMqEcs().getEcsName()); mqEcsService.updateMqEcs(take.getMqEcs()); }else { delList.offer(take); } }catch (Exception e){ log.info("delEcsList--关闭弹性伸缩失败:",e); } } public synchronized Boolean createEcs(MqScalingConfig mqScaling){ try { return ECSUtils.createEcs(mqScaling.getAccessKey(),mqScaling.getSecret(),mqScaling.getEndpoint(),mqScaling.getScalingRuleAri()); }catch (Exception e){ log.info("触发弹性伸缩失败:",e); } return false; } public synchronized Boolean delEcs(MqScalingConfig mqScaling, String instanceId){ try { return ECSUtils.delEcs(mqScaling.getAccessKey(),mqScaling.getSecret(),mqScaling.getEndpoint(),mqScaling.getScalingGroupId(),instanceId); }catch (Exception e){ log.info("关闭弹性伸缩失败:",e); } return false; } }