123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- package com.fdkankan.mqcontroller.task;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.hutool.log.Log;
import com.fdkankan.mqcontroller.entity.*;
import com.fdkankan.mqcontroller.service.IMqEcsService;
import com.fdkankan.mqcontroller.service.IMqQueueConfigService;
import com.fdkankan.mqcontroller.service.IMqScalingConfigService;
import com.fdkankan.mqcontroller.service.IMqSendLogService;
import com.fdkankan.mqcontroller.utils.ECSUtils;
import com.fdkankan.mqcontroller.utils.RabbitMqUtils;
import com.fdkankan.mqcontroller.utils.RedisKey;
import com.fdkankan.rabbitmq.util.RabbitMqProducer;
import com.fdkankan.redis.util.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.sql.SQLOutput;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
- @Service
- @RefreshScope
- @Slf4j
- public class TaskService {
- @Autowired
- RabbitMqProducer rabbitMqProducer;
- @Autowired
- IMqSendLogService mqSendLogService;
- @Autowired
- IMqQueueConfigService queueConfigService;
- @Autowired
- IMqScalingConfigService mqScalingConfigService;
- @Autowired
- IMqEcsService mqEcsService;
- @Autowired
- RedisUtil redisUtil;
- public static Integer checkOpenCount = 0;
- private static final LinkedBlockingQueue<DelEcsVo> delList = new LinkedBlockingQueue<>();
- private static final HashMap<String,LinkedBlockingQueue<DelEcsVo>> openMap = new HashMap<>();
- @Async
- public void sendMq() {
- checkCount();
- List<MqSendLog> mqSendLogs = mqSendLogService.getNoSendMsg();
- log.info("未分配的mq队列数:{}",mqSendLogs.size());
- List<MqQueueConfig> queueConfigList = queueConfigService.list();
- for (MqQueueConfig mqQueueConfig : queueConfigList) {
- List<MqSendLog> msgList = mqSendLogs.stream().filter(e -> e.getQueue().equals(mqQueueConfig.getQueueName())).collect(Collectors.toList());
- if(msgList.isEmpty()){
- continue;
- }
- MqMsg mqMsg = getRabbitMqMsg(mqQueueConfig.getQueueName());
- if(mqMsg == null){
- log.info("获取mq队列数据失败:{}",mqQueueConfig);
- continue;
- }
- if(mqQueueConfig.getOpenScaling() == 0){ //不开启弹性伸缩
- if(mqMsg.getMessages_ready() >0){ //待计算队列中有任务
- continue;
- }
- }
- if(mqQueueConfig.getOpenScaling() == 1){ //开启弹性伸缩
- checkOpenEcs(mqQueueConfig,msgList.size(),mqMsg.getMessages_ready());
- }
- sendRabbitMq(msgList,mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - mqMsg.getMessages_ready());
- }
- }
- private void checkCount() {
- if(checkOpenCount > 10000){ //一个W为一个循环
- checkOpenCount = 0;
- }
- checkOpenCount ++;
- }
- private void sendRabbitMq(List<MqSendLog> msgList, Integer msgCount) {
- for (int i = 0;i < msgCount ;i++){
- if(i > msgList.size() -1){
- continue;
- }
- MqSendLog mqSendLog = msgList.get(i);
- mqSendLog.setStatus(1);
- mqSendLog.setUpdateTime(null);
- mqSendLogService.updateById(mqSendLog);
- rabbitMqProducer.sendByWorkQueue(mqSendLog.getQueue(),mqSendLog.getContent());
- }
- }
- private void checkOpenEcs(MqQueueConfig mqQueueConfig,Integer msgCount,Integer readCount) {
- List<MqEcs> list = mqEcsService.getNoModelingByQueueName(mqQueueConfig.getQueueName());
- LinkedBlockingQueue<DelEcsVo> openList = openMap.get(mqQueueConfig.getQueueName());
- if(openList == null){
- openList = new LinkedBlockingQueue<>();
- openMap.put(mqQueueConfig.getQueueName(),openList);
- }
- if(msgCount + readCount > mqQueueConfig.getScalingThreshold() + list.size() + openList.size() && mqQueueConfig.getOpenScalingTime() % checkOpenCount == 0){
- MqScalingConfig mqScalingConfig = mqScalingConfigService.getById(mqQueueConfig.getScalingConfigId());
- DelEcsVo vo = new DelEcsVo(null,mqScalingConfig,mqQueueConfig.getQueueName(),new Date());
- openList.offer(vo);
- }
- }
- @Async
- public void openEcsList() {
- try {
- HashMap<String, MqQueueConfig> queueMap = queueConfigService.getQueueMap();
- for (String key : openMap.keySet()) {
- LinkedBlockingQueue<DelEcsVo> openList = openMap.get(key);
- if(openList.isEmpty()){
- return;
- }
- DelEcsVo take = openList.poll();
- List<MqSendLog> msgList = mqSendLogService.getNoSendMsgByQueueName(key);
- MqQueueConfig mqQueueConfig = queueMap.get(key);
- if(msgList.size() <= mqQueueConfig.getScalingThreshold()){
- log.info("openEcsList--待计算任务为:{}未超过阈值:{}无需开启弹性伸缩:{}",msgList.size(),mqQueueConfig.getScalingThreshold() ,key);
- return;
- }
- log.info("openEcsList--开启弹性伸缩数量:{},{}",key,openList.size());
- Boolean flag = createEcs( take.getMqScalingConfig());
- if(flag){
- mqEcsService.add(take.getQueueName());
- Thread.sleep(1000L * 5);
- }else {
- openList.offer(take);
- }
- }
- }catch (Exception e){
- log.info("openEcsList--开启弹性伸缩失败:",e);
- }
- }
- public void checkDelEcs() {
- List<MqEcs> mqEcsList = mqEcsService.getScalingNotStopList();
- log.info("启动中的弹性伸缩数量为:{}",mqEcsList.size());
- if(mqEcsList.isEmpty()){
- return;
- }
- HashMap<String,MqQueueConfig> queueMap = queueConfigService.getQueueMap();
- HashMap<Integer,MqScalingConfig> scalingMap = mqScalingConfigService.getIdMap();
- for (MqEcs mqEcs : mqEcsList) {
- if(StringUtils.isBlank(mqEcs.getEcsName()) || StringUtils.isBlank(mqEcs.getQueueName())){
- continue;
- }
- MqQueueConfig mqQueueConfig = queueMap.get(mqEcs.getQueueName());
- if(mqQueueConfig == null || mqQueueConfig.getScalingConfigId() == null){
- continue;
- }
- Long between = DateUtil.between(mqEcs.getCreateTime(), new Date(), DateUnit.MINUTE);
- //弹性伸缩按照一个小时计费
- Long count = between/60;
- boolean flag = delList.stream().anyMatch(e -> e.getMqEcs().getEcsName().equals(mqEcs.getEcsName()));
- if(between >= mqQueueConfig.getStopScalingTime() + 60 * count && !flag){
- log.info("checkDelEcs-实例开启时间大于{}分钟,开始关闭:{}",mqQueueConfig.getStopScalingTime(),mqEcs.getEcsName());
- DelEcsVo vo = new DelEcsVo(mqEcs,scalingMap.get(mqQueueConfig.getScalingConfigId()),null,new Date());
- delList.offer(vo);
- }
- }
- }
- @Async
- public void delEcsList() {
- try {
- log.info("delEcsList--关闭弹性伸缩数量:{}",delList.size());
- if(delList.isEmpty()){
- return;
- }
- DelEcsVo take = delList.poll();
- if(take == null){
- return;
- }
- String stopKey = String.format(RedisKey.ecsStopKey,take.getMqEcs().getEcsName());
- redisUtil.set(stopKey,take.getMqEcs().getEcsName(),60 * 60 * 24); //设置计算暂停锁
- String modelingKey = String.format(RedisKey.modelingKey,take.getMqEcs().getEcsName());
- if(redisUtil.hasKey(modelingKey)){
- redisUtil.del(stopKey);
- return;
- }
- Boolean delFlag = delEcs(take.getMqScalingConfig(), take.getMqEcs().getEcsName());
- if(delFlag){
- log.info("checkDelEcs--关闭弹性伸缩实例成功:{}", take.getMqEcs().getEcsName());
- mqEcsService.updateMqEcs(take.getMqEcs());
- }
- }catch (Exception e){
- log.info("delEcsList--关闭弹性伸缩失败:",e);
- }
- }
- @Value("${spring.rabbitmq.host}")
- public String host;
- @Value("${spring.rabbitmq.username}")
- public String username;
- @Value("${spring.rabbitmq.password}")
- public String password;
- @Value("${spring.rabbitmq.virtual-host}")
- public String virtualHost;
- @Value("${spring.rabbitmq.mgmt-url}")
- public String mgmtUrl;
- @Value("${spring.rabbitmq.mgmt-host}")
- public String mgmtHost;
- public MqMsg getRabbitMqMsg(String queueName) {
- return RabbitMqUtils.getRabbitMqMsg(mgmtUrl+host+":"+mgmtHost,virtualHost,username,password,queueName);
- }
- public synchronized Boolean createEcs(MqScalingConfig mqScaling){
- try {
- return ECSUtils.createEcs(mqScaling.getAccessKey(),mqScaling.getSecret(),mqScaling.getEndpoint(),mqScaling.getScalingRuleAri());
- }catch (Exception e){
- log.info("触发弹性伸缩失败:",e);
- }
- return false;
- }
- public synchronized Boolean delEcs(MqScalingConfig mqScaling, String instanceId){
- try {
- return ECSUtils.delEcs(mqScaling.getAccessKey(),mqScaling.getSecret(),mqScaling.getEndpoint(),mqScaling.getScalingGroupId(),instanceId);
- }catch (Exception e){
- log.info("关闭弹性伸缩失败:",e);
- }
- return false;
- }
- }
|