123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403 |
- package com.fdkankan.mqcontroller.task;
- import cn.hutool.core.date.DateUnit;
- import cn.hutool.core.date.DateUtil;
- import cn.hutool.log.Log;
- import com.alibaba.fastjson.JSONObject;
- import com.fdkankan.mqcontroller.entity.*;
- import com.fdkankan.mqcontroller.service.*;
- import com.fdkankan.mqcontroller.utils.ECSUtils;
- import com.fdkankan.mqcontroller.utils.RabbitMqUtils;
- import com.fdkankan.mqcontroller.utils.RedisKey;
- import com.fdkankan.rabbitmq.util.RabbitMqProducer;
- import com.fdkankan.redis.util.RedisUtil;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang3.StringUtils;
- import org.springframework.beans.factory.FactoryBean;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.cloud.context.config.annotation.RefreshScope;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Service;
- import org.springframework.web.bind.annotation.PostMapping;
- import javax.annotation.PostConstruct;
- import java.sql.SQLOutput;
- import java.util.*;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.stream.Collectors;
- @Service
- @RefreshScope
- @Slf4j
- public class TaskService {
- @Autowired
- RabbitMqProducer rabbitMqProducer;
- @Autowired
- IMqSendLogService mqSendLogService;
- @Autowired
- IMqQueueConfigService queueConfigService;
- @Autowired
- IMqScalingConfigService mqScalingConfigService;
- @Autowired
- IMqEcsService mqEcsService;
- @Autowired
- RedisUtil redisUtil;
- @Autowired
- IScenePlusService scenePlusService;
- @Autowired
- RabbitMqService rabbitMqService;
- public static Integer checkOpenCount = 0;
- private static final LinkedBlockingQueue<DelEcsVo> delList = new LinkedBlockingQueue<>();
- private static final HashMap<String,LinkedBlockingQueue<DelEcsVo>> openMap = new HashMap<>();
- public static HashMap<Integer,List<MqSendLog>> configLogsMap = new HashMap<>();
- public static HashMap<String,MqMsg> mqMsgMap = new HashMap<>();
- public static HashMap<String,Integer> countMap = new HashMap<>();
- /**
- * 将数据库中待计算的mq消息推送,rabbitmq
- */
- public void sendMq() {
- checkCount();
- List<MqSendLog> mqSendLogs = mqSendLogService.getNoSendMsg();
- if(mqSendLogs.isEmpty()){
- return;
- }
- log.info("未分配的mq队列数:{}",mqSendLogs.size());
- //设置分配队列
- HashSet<String> numList = new HashSet<>();
- for (MqSendLog mqSendLog : mqSendLogs) {
- if(StringUtils.isBlank(mqSendLog.getContent())){
- mqSendLogService.updateStatus(mqSendLog.getId(),2,null);
- continue;
- }
- Integer configId = scenePlusService.getQueueConfig(mqSendLog);
- if(numList.contains(mqSendLog.getNum())){
- mqSendLogService.updateStatus(mqSendLog.getId(),2,null);
- continue;
- }
- if(configId == null ){
- mqSendLogService.updateStatus(mqSendLog.getId(),3,null);
- continue;
- }
- mqSendLog.setConfigId(configId);
- numList.add(mqSendLog.getNum());
- }
- List<MqQueueConfig> queueConfigList = queueConfigService.list();
- rabbitMqService.getMqMsgMap(queueConfigList);
- for (MqQueueConfig config : queueConfigList) {
- List<MqSendLog> msgList = mqSendLogs.stream().filter(e -> e.getConfigId().equals(config.getId())).collect(Collectors.toList());
- configLogsMap.put(config.getId(),msgList);
- }
- //是否需要重新组合队列
- allocateQueue2(queueConfigList,mqSendLogs);
- for (MqQueueConfig mqQueueConfig : queueConfigList) {
- List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
- if(msgList.isEmpty()){
- continue;
- }
- MqMsg mqMsg = mqMsgMap.get(mqQueueConfig.getQueueName());
- if(mqMsg == null){
- log.info("获取mq队列数据失败:{}",mqQueueConfig);
- continue;
- }
- if(mqQueueConfig.getOpenScaling() == 0){ //不开启弹性伸缩
- if(mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - mqMsg.getMessages_ready() <= 0 ){ //待计算队列中有任务
- continue;
- }
- }
- if(mqQueueConfig.getOpenScaling() == 1 ){ //开启弹性伸缩
- checkOpenEcs(mqQueueConfig,msgList);
- }
- sendRabbitMq(msgList,mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - mqMsg.getMessages_ready(),mqQueueConfig.getQueueName());
- }
- }
- /**
- * 设置调配队列计算
- * 只调配常驻
- */
- private void allocateQueue(List<MqQueueConfig> queueConfigList,List<MqSendLog> mqSendLogs){
- //获取常驻队列
- List<MqQueueConfig> residentList = queueConfigList.stream().filter(e -> e.getIsResident() == 1).collect(Collectors.toList());
- if(residentList.size() != 1){
- log.info("常驻队列未配置,请配置常驻队列");
- return ;
- }
- Integer residentCount = 0;
- MqQueueConfig residentConfig = residentList.get(0);
- MqMsg mqMsg = mqMsgMap.get(residentConfig.getQueueName());
- List<MqSendLog> residentLogList = configLogsMap.get(residentConfig.getId());
- if(mqMsg!= null ){
- residentCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - residentLogList.size() ;
- log.info("常驻队列空闲服务器数量为:{},{}",residentConfig.getQueueName(),residentCount);
- }
- //A空闲,B,C转A
- for (MqQueueConfig mqQueueConfig : queueConfigList) {
- List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
- if(residentCount >0 && mqQueueConfig.getIsResident() !=1 && mqQueueConfig.getIsSpecial() !=1 && !msgList.isEmpty()){
- Integer updateCount = updateMqSendLogConfig(residentCount, mqQueueConfig.getId(), residentConfig.getId(), mqSendLogs, queueConfigList);
- residentCount = residentCount - updateCount;
- }
- }
- }
- /**
- * 设置调配队列计算
- * 调配A,B,C
- */
- private void allocateQueue2(List<MqQueueConfig> queueConfigList,List<MqSendLog> mqSendLogs){
- //获取常驻队列
- for (MqQueueConfig residentConfig : queueConfigList) { // A ,B ,C
- if(residentConfig.getIsSpecial() == 1){
- continue;
- }
- Integer residentCount = 0;
- MqMsg mqMsg = mqMsgMap.get(residentConfig.getQueueName());
- List<MqSendLog> residentLogList = configLogsMap.get(residentConfig.getId());
- if(mqMsg!= null ){
- residentCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - residentLogList.size() ;
- log.info("allocateQueue2空闲服务器数量为:{},{}",residentConfig.getQueueName(),residentCount);
- }
- //A空闲,BC转A 。B空闲,C转B。C空闲,B转C
- for (MqQueueConfig mqQueueConfig : queueConfigList) { // B , C
- if(mqQueueConfig.getIsSpecial() == 1 || mqQueueConfig.getIsResident() == 1){
- continue;
- }
- if(mqQueueConfig.getQueueName().equals(residentConfig.getQueueName())){
- continue;
- }
- List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
- if(residentCount >0 && !msgList.isEmpty()){
- Integer updateCount = updateMqSendLogConfig(residentCount, mqQueueConfig.getId(), residentConfig.getId(), mqSendLogs, queueConfigList);
- residentCount = residentCount - updateCount;
- log.info("allocateQueue2-调配成功:{},to:{},count:{}",mqQueueConfig.getQueueName(),residentConfig.getQueueName(),updateCount);
- }
- }
- }
- }
- private Integer updateMqSendLogConfig(Integer residentCount,Integer configId ,Integer residentConfigId,List<MqSendLog> mqSendLogs,List<MqQueueConfig> queueConfigList){
- Integer updateCount = 0;
- List<MqSendLog> msgList = configLogsMap.get(configId);
- for (int i = 0;i < residentCount ;i++){
- if(i > msgList.size() -1){
- continue;
- }
- MqSendLog mqSendLog = msgList.get(i);
- mqSendLog.setConfigId(residentConfigId);
- updateCount ++;
- }
- if(updateCount <=0){
- return updateCount;
- }
- for (MqQueueConfig config : queueConfigList) {
- List<MqSendLog> msgList2 = mqSendLogs.stream().filter(e -> e.getConfigId().equals(config.getId())).collect(Collectors.toList());
- configLogsMap.put(config.getId(),msgList2);
- }
- return updateCount;
- }
- private void checkCount() {
- if(checkOpenCount > 10000){ //一个W为一个循环
- checkOpenCount = 0;
- }
- checkOpenCount ++;
- }
- private void sendRabbitMq(List<MqSendLog> msgList, Integer msgCount,String queueName) {
- for (int i = 0;i < msgCount ;i++){
- if(i > msgList.size() -1){
- continue;
- }
- MqSendLog mqSendLog = msgList.get(i);
- mqSendLogService.updateStatus(mqSendLog.getId(),1,queueName);
- rabbitMqProducer.sendByWorkQueue(queueName,JSONObject.parseObject(mqSendLog.getContent()));
- }
- }
- private void checkOpenEcs(MqQueueConfig mqQueueConfig,List<MqSendLog> msgList) {
- //获取未关闭的服务器
- List<MqEcs> list = mqEcsService.getNoStopByQueueName(mqQueueConfig.getQueueName());
- countMap.put(mqQueueConfig.getQueueName(),list.size());
- LinkedBlockingQueue<DelEcsVo> openList = openMap.computeIfAbsent(
- mqQueueConfig.getQueueName(),
- k -> new LinkedBlockingQueue<>()
- );
- Integer ecsCount = countMap.get(mqQueueConfig.getQueueName());
- MqMsg mqMsg = mqMsgMap.get(mqQueueConfig.getQueueName());
- //时间判断
- Boolean flag = mqQueueConfig.getOpenScalingTime() == 0 || checkOpenCount % (mqQueueConfig.getOpenScalingTime() * 6) == 0;
- //未在计算的服务器数量
- Integer noModelingCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged();
- //未启动的服务器数量
- Integer notStartCount = ecsCount - mqMsg.getConsumers();
- Boolean flag2 = msgList.size() > mqQueueConfig.getScalingThreshold() + noModelingCount + notStartCount || (!msgList.isEmpty() && ecsCount + mqMsg.getConsumers() == 0 );
- if(flag2 && flag){
- log.info("将待开启弹性伸缩放入队列:{}",mqQueueConfig.getQueueName());
- MqScalingConfig mqScalingConfig = mqScalingConfigService.getById(mqQueueConfig.getScalingConfigId());
- DelEcsVo vo = new DelEcsVo(null,mqScalingConfig,mqQueueConfig.getQueueName());
- openList.offer(vo);
- }
- }
- /**
- * 开启弹性伸缩
- */
- public void openEcsList() {
- try {
- HashMap<String, MqQueueConfig> queueMap = queueConfigService.getQueueMap();
- for (Map.Entry<String, LinkedBlockingQueue<DelEcsVo>> entry : openMap.entrySet()) {
- String key = entry.getKey();
- LinkedBlockingQueue<DelEcsVo> openList = entry.getValue();
- if(openList.isEmpty()){
- continue;
- }
- DelEcsVo take = openList.poll();
- MqQueueConfig mqQueueConfig = queueMap.get(key);
- List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
- if(msgList.isEmpty()){
- log.info("openEcsList--待推送消息数为0,无需开启:{},{}",key,take.getQueueName());
- continue;
- }
- countMap.merge(mqQueueConfig.getQueueName(), 1, Integer::sum);
- log.info("openEcsList--开启弹性伸缩数量:{},{}",key,1);
- List<MqEcs> mqEcsList = mqEcsService.getNotUpEcs();
- log.info("openEcsList--启动中的ecs数量为:{}",mqEcsList.size());
- if(!mqEcsList.isEmpty()){
- log.info("openEcsList--有启动中的ecs,等待启动完毕执行。。。");
- openList.offer(take);
- return;
- }
- Boolean flag = createEcs( take.getMqScalingConfig());
- if(flag){
- mqEcsService.add(take.getQueueName());
- sendRabbitMq(msgList,1,mqQueueConfig.getQueueName());
- }else {
- openList.offer(take);
- }
- }
- }catch (Exception e){
- log.info("openEcsList--开启弹性伸缩失败:",e);
- }
- }
- /**
- * 判断是否关闭弹性伸缩
- */
- public void checkDelEcs() {
- List<MqEcs> mqEcsList = mqEcsService.getScalingNotStopList();
- if(mqEcsList.isEmpty()){
- return;
- }
- log.info("启动中的弹性伸缩数量为:{}",mqEcsList.size());
- HashMap<String,MqQueueConfig> queueMap = queueConfigService.getQueueMap();
- HashMap<Integer,MqScalingConfig> scalingMap = mqScalingConfigService.getIdMap();
- for (MqEcs mqEcs : mqEcsList) {
- if(StringUtils.isBlank(mqEcs.getEcsName()) || StringUtils.isBlank(mqEcs.getQueueName())){
- continue;
- }
- MqQueueConfig mqQueueConfig = queueMap.get(mqEcs.getQueueName());
- if(mqQueueConfig == null || mqQueueConfig.getScalingConfigId() == null){
- continue;
- }
- long between = DateUtil.between(mqEcs.getCreateTime(), new Date(), DateUnit.MINUTE);
- //弹性伸缩按照一个小时计费
- long count = between/60;
- boolean flag = delList.stream().anyMatch(e -> e.getMqEcs().getEcsName().equals(mqEcs.getEcsName()));
- if(between >= mqQueueConfig.getStopScalingTime() + 60 * count && !flag){
- List<MqSendLog> mqSendLogs = configLogsMap.get(mqQueueConfig.getId());
- if(mqSendLogs == null){
- continue;
- }
- MqMsg mqMsg = mqMsgMap.get(mqEcs.getQueueName());
- if(mqMsg == null){
- continue;
- }
- Integer notModelingCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - mqMsg.getMessages_ready();
- if(mqSendLogs.size() - (notModelingCount + mqQueueConfig.getScalingThreshold()) >0 ){
- log.info("待计算数量:{}大于空闲服务数量:{},阈值{},不关闭",mqSendLogs.size(),notModelingCount,mqQueueConfig.getScalingThreshold());
- continue;
- }
- log.info("checkDelEcs-实例开启{}时间大于{}分钟,开始关闭:{}",between,mqQueueConfig.getStopScalingTime(),mqEcs.getEcsName());
- DelEcsVo vo = new DelEcsVo(mqEcs,scalingMap.get(mqQueueConfig.getScalingConfigId()),null);
- delList.offer(vo);
- }
- }
- }
- /**
- * 关闭弹性伸缩
- */
- public void delEcsList() {
- try {
- if(delList.isEmpty()){
- return;
- }
- log.info("delEcsList--关闭弹性伸缩数量:{}",delList.size());
- DelEcsVo take = delList.poll();
- if(take == null){
- return;
- }
- String stopKey = String.format(RedisKey.ecsStopKey,take.getMqEcs().getEcsName());
- redisUtil.set(stopKey,take.getMqEcs().getEcsName(),60 * 60 * 24); //设置计算暂停锁
- String modelingKey = String.format(RedisKey.modelingKey,take.getMqEcs().getEcsName());
- if(redisUtil.hasKey(modelingKey)){
- log.info("delEcsList--有场景正在计算中:{},{}",modelingKey,redisUtil.get(modelingKey));
- redisUtil.del(stopKey);
- return;
- }
- Boolean delFlag = delEcs(take.getMqScalingConfig(), take.getMqEcs().getEcsName());
- if(delFlag){
- log.info("checkDelEcs--关闭弹性伸缩实例成功:{}", take.getMqEcs().getEcsName());
- mqEcsService.updateMqEcs(take.getMqEcs());
- }else {
- delList.offer(take);
- }
- }catch (Exception e){
- log.info("delEcsList--关闭弹性伸缩失败:",e);
- }
- }
- public synchronized Boolean createEcs(MqScalingConfig mqScaling){
- try {
- return ECSUtils.createEcs(mqScaling.getAccessKey(),mqScaling.getSecret(),mqScaling.getEndpoint(),mqScaling.getScalingRuleAri());
- }catch (Exception e){
- log.info("触发弹性伸缩失败:",e);
- }
- return false;
- }
- public synchronized Boolean delEcs(MqScalingConfig mqScaling, String instanceId){
- try {
- return ECSUtils.delEcs(mqScaling.getAccessKey(),mqScaling.getSecret(),mqScaling.getEndpoint(),mqScaling.getScalingGroupId(),instanceId);
- }catch (Exception e){
- log.info("关闭弹性伸缩失败:",e);
- }
- return false;
- }
- }
|