package com.fdkankan.mqcontroller.task; import cn.hutool.core.date.DateUnit; import cn.hutool.core.date.DateUtil; import cn.hutool.log.Log; import com.fdkankan.mqcontroller.entity.*; import com.fdkankan.mqcontroller.service.IMqEcsService; import com.fdkankan.mqcontroller.service.IMqQueueConfigService; import com.fdkankan.mqcontroller.service.IMqScalingConfigService; import com.fdkankan.mqcontroller.service.IMqSendLogService; import com.fdkankan.mqcontroller.utils.ECSUtils; import com.fdkankan.mqcontroller.utils.RabbitMqUtils; import com.fdkankan.mqcontroller.utils.RedisKey; import com.fdkankan.rabbitmq.util.RabbitMqProducer; import com.fdkankan.redis.util.RedisUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.sql.SQLOutput; import java.util.*; import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.Collectors; @Service @RefreshScope @Slf4j public class TaskService { @Autowired RabbitMqProducer rabbitMqProducer; @Autowired IMqSendLogService mqSendLogService; @Autowired IMqQueueConfigService queueConfigService; @Autowired IMqScalingConfigService mqScalingConfigService; @Autowired IMqEcsService mqEcsService; @Autowired RedisUtil redisUtil; public static Integer checkOpenCount = 0; private static final LinkedBlockingQueue delList = new LinkedBlockingQueue<>(); private static final HashMap> openMap = new HashMap<>(); @Async public void sendMq() { checkCount(); List mqSendLogs = mqSendLogService.getNoSendMsg(); log.info("未分配的mq队列数:{}",mqSendLogs.size()); List queueConfigList = queueConfigService.list(); for (MqQueueConfig mqQueueConfig : queueConfigList) { List msgList = mqSendLogs.stream().filter(e -> e.getQueue().equals(mqQueueConfig.getQueueName())).collect(Collectors.toList()); if(msgList.isEmpty()){ continue; } MqMsg mqMsg = getRabbitMqMsg(mqQueueConfig.getQueueName()); if(mqMsg == null){ log.info("获取mq队列数据失败:{}",mqQueueConfig); continue; } if(mqQueueConfig.getOpenScaling() == 0){ //不开启弹性伸缩 if(mqMsg.getMessages_ready() >0){ //待计算队列中有任务 continue; } } if(mqQueueConfig.getOpenScaling() == 1){ //开启弹性伸缩 checkOpenEcs(mqQueueConfig,msgList.size(),mqMsg.getMessages_ready()); } sendRabbitMq(msgList,mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - mqMsg.getMessages_ready()); } } private void checkCount() { if(checkOpenCount > 10000){ //一个W为一个循环 checkOpenCount = 0; } checkOpenCount ++; } private void sendRabbitMq(List msgList, Integer msgCount) { for (int i = 0;i < msgCount ;i++){ if(i > msgList.size() -1){ continue; } MqSendLog mqSendLog = msgList.get(i); mqSendLog.setStatus(1); mqSendLog.setUpdateTime(null); mqSendLogService.updateById(mqSendLog); rabbitMqProducer.sendByWorkQueue(mqSendLog.getQueue(),mqSendLog.getContent()); } } private void checkOpenEcs(MqQueueConfig mqQueueConfig,Integer msgCount,Integer readCount) { List list = mqEcsService.getNoModelingByQueueName(mqQueueConfig.getQueueName()); LinkedBlockingQueue openList = openMap.get(mqQueueConfig.getQueueName()); if(openList == null){ openList = new LinkedBlockingQueue<>(); openMap.put(mqQueueConfig.getQueueName(),openList); } if(msgCount + readCount > mqQueueConfig.getScalingThreshold() + list.size() + openList.size() && mqQueueConfig.getOpenScalingTime() % checkOpenCount == 0){ MqScalingConfig mqScalingConfig = mqScalingConfigService.getById(mqQueueConfig.getScalingConfigId()); DelEcsVo vo = new DelEcsVo(null,mqScalingConfig,mqQueueConfig.getQueueName(),new Date()); openList.offer(vo); } } @Async public void openEcsList() { try { HashMap queueMap = queueConfigService.getQueueMap(); for (String key : openMap.keySet()) { LinkedBlockingQueue openList = openMap.get(key); if(openList.isEmpty()){ return; } DelEcsVo take = openList.poll(); List msgList = mqSendLogService.getNoSendMsgByQueueName(key); MqQueueConfig mqQueueConfig = queueMap.get(key); if(msgList.size() <= mqQueueConfig.getScalingThreshold()){ log.info("openEcsList--待计算任务为:{}未超过阈值:{}无需开启弹性伸缩:{}",msgList.size(),mqQueueConfig.getScalingThreshold() ,key); return; } log.info("openEcsList--开启弹性伸缩数量:{},{}",key,openList.size()); Boolean flag = createEcs( take.getMqScalingConfig()); if(flag){ mqEcsService.add(take.getQueueName()); Thread.sleep(1000L * 5); }else { openList.offer(take); } } }catch (Exception e){ log.info("openEcsList--开启弹性伸缩失败:",e); } } public void checkDelEcs() { List mqEcsList = mqEcsService.getScalingNotStopList(); log.info("启动中的弹性伸缩数量为:{}",mqEcsList.size()); if(mqEcsList.isEmpty()){ return; } HashMap queueMap = queueConfigService.getQueueMap(); HashMap scalingMap = mqScalingConfigService.getIdMap(); for (MqEcs mqEcs : mqEcsList) { if(StringUtils.isBlank(mqEcs.getEcsName()) || StringUtils.isBlank(mqEcs.getQueueName())){ continue; } MqQueueConfig mqQueueConfig = queueMap.get(mqEcs.getQueueName()); if(mqQueueConfig == null || mqQueueConfig.getScalingConfigId() == null){ continue; } Long between = DateUtil.between(mqEcs.getCreateTime(), new Date(), DateUnit.MINUTE); //弹性伸缩按照一个小时计费 Long count = between/60; boolean flag = delList.stream().anyMatch(e -> e.getMqEcs().getEcsName().equals(mqEcs.getEcsName())); if(between >= mqQueueConfig.getStopScalingTime() + 60 * count && !flag){ log.info("checkDelEcs-实例开启时间大于{}分钟,开始关闭:{}",mqQueueConfig.getStopScalingTime(),mqEcs.getEcsName()); DelEcsVo vo = new DelEcsVo(mqEcs,scalingMap.get(mqQueueConfig.getScalingConfigId()),null,new Date()); delList.offer(vo); } } } @Async public void delEcsList() { try { log.info("delEcsList--关闭弹性伸缩数量:{}",delList.size()); if(delList.isEmpty()){ return; } DelEcsVo take = delList.poll(); if(take == null){ return; } String stopKey = String.format(RedisKey.ecsStopKey,take.getMqEcs().getEcsName()); redisUtil.set(stopKey,take.getMqEcs().getEcsName(),60 * 60 * 24); //设置计算暂停锁 String modelingKey = String.format(RedisKey.modelingKey,take.getMqEcs().getEcsName()); if(redisUtil.hasKey(modelingKey)){ redisUtil.del(stopKey); return; } Boolean delFlag = delEcs(take.getMqScalingConfig(), take.getMqEcs().getEcsName()); if(delFlag){ log.info("checkDelEcs--关闭弹性伸缩实例成功:{}", take.getMqEcs().getEcsName()); mqEcsService.updateMqEcs(take.getMqEcs()); } }catch (Exception e){ log.info("delEcsList--关闭弹性伸缩失败:",e); } } @Value("${spring.rabbitmq.host}") public String host; @Value("${spring.rabbitmq.username}") public String username; @Value("${spring.rabbitmq.password}") public String password; @Value("${spring.rabbitmq.virtual-host}") public String virtualHost; @Value("${spring.rabbitmq.mgmt-url}") public String mgmtUrl; @Value("${spring.rabbitmq.mgmt-host}") public String mgmtHost; public MqMsg getRabbitMqMsg(String queueName) { return RabbitMqUtils.getRabbitMqMsg(mgmtUrl+host+":"+mgmtHost,virtualHost,username,password,queueName); } public synchronized Boolean createEcs(MqScalingConfig mqScaling){ try { return ECSUtils.createEcs(mqScaling.getAccessKey(),mqScaling.getSecret(),mqScaling.getEndpoint(),mqScaling.getScalingRuleAri()); }catch (Exception e){ log.info("触发弹性伸缩失败:",e); } return false; } public synchronized Boolean delEcs(MqScalingConfig mqScaling, String instanceId){ try { return ECSUtils.delEcs(mqScaling.getAccessKey(),mqScaling.getSecret(),mqScaling.getEndpoint(),mqScaling.getScalingGroupId(),instanceId); }catch (Exception e){ log.info("关闭弹性伸缩失败:",e); } return false; } }