TaskService.java 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. package com.fdkankan.mqcontroller.task;
  2. import cn.hutool.core.date.DateUnit;
  3. import cn.hutool.core.date.DateUtil;
  4. import cn.hutool.log.Log;
  5. import com.fdkankan.mqcontroller.entity.*;
  6. import com.fdkankan.mqcontroller.service.IMqEcsService;
  7. import com.fdkankan.mqcontroller.service.IMqQueueConfigService;
  8. import com.fdkankan.mqcontroller.service.IMqScalingConfigService;
  9. import com.fdkankan.mqcontroller.service.IMqSendLogService;
  10. import com.fdkankan.mqcontroller.utils.ECSUtils;
  11. import com.fdkankan.mqcontroller.utils.RabbitMqUtils;
  12. import com.fdkankan.mqcontroller.utils.RedisKey;
  13. import com.fdkankan.rabbitmq.util.RabbitMqProducer;
  14. import com.fdkankan.redis.util.RedisUtil;
  15. import lombok.extern.slf4j.Slf4j;
  16. import org.apache.commons.lang3.StringUtils;
  17. import org.springframework.beans.factory.annotation.Autowired;
  18. import org.springframework.beans.factory.annotation.Value;
  19. import org.springframework.cloud.context.config.annotation.RefreshScope;
  20. import org.springframework.scheduling.annotation.Async;
  21. import org.springframework.stereotype.Service;
  22. import java.sql.SQLOutput;
  23. import java.util.*;
  24. import java.util.concurrent.LinkedBlockingQueue;
  25. import java.util.stream.Collectors;
  26. @Service
  27. @RefreshScope
  28. @Slf4j
  29. public class TaskService {
  30. @Autowired
  31. RabbitMqProducer rabbitMqProducer;
  32. @Autowired
  33. IMqSendLogService mqSendLogService;
  34. @Autowired
  35. IMqQueueConfigService queueConfigService;
  36. @Autowired
  37. IMqScalingConfigService mqScalingConfigService;
  38. @Autowired
  39. IMqEcsService mqEcsService;
  40. @Autowired
  41. RedisUtil redisUtil;
  42. public static Integer checkOpenCount = 0;
  43. private static final LinkedBlockingQueue<DelEcsVo> delList = new LinkedBlockingQueue<>();
  44. private static final HashMap<String,LinkedBlockingQueue<DelEcsVo>> openMap = new HashMap<>();
  45. @Async
  46. public void sendMq() {
  47. checkCount();
  48. List<MqSendLog> mqSendLogs = mqSendLogService.getNoSendMsg();
  49. log.info("未分配的mq队列数:{}",mqSendLogs.size());
  50. List<MqQueueConfig> queueConfigList = queueConfigService.list();
  51. for (MqQueueConfig mqQueueConfig : queueConfigList) {
  52. List<MqSendLog> msgList = mqSendLogs.stream().filter(e -> e.getQueue().equals(mqQueueConfig.getQueueName())).collect(Collectors.toList());
  53. if(msgList.isEmpty()){
  54. continue;
  55. }
  56. MqMsg mqMsg = getRabbitMqMsg(mqQueueConfig.getQueueName());
  57. if(mqMsg == null){
  58. log.info("获取mq队列数据失败:{}",mqQueueConfig);
  59. continue;
  60. }
  61. if(mqQueueConfig.getOpenScaling() == 0){ //不开启弹性伸缩
  62. if(mqMsg.getMessages_ready() >0){ //待计算队列中有任务
  63. continue;
  64. }
  65. }
  66. if(mqQueueConfig.getOpenScaling() == 1){ //开启弹性伸缩
  67. checkOpenEcs(mqQueueConfig,msgList.size(),mqMsg.getMessages_ready());
  68. }
  69. sendRabbitMq(msgList,mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - mqMsg.getMessages_ready());
  70. }
  71. }
  72. private void checkCount() {
  73. if(checkOpenCount > 10000){ //一个W为一个循环
  74. checkOpenCount = 0;
  75. }
  76. checkOpenCount ++;
  77. }
  78. private void sendRabbitMq(List<MqSendLog> msgList, Integer msgCount) {
  79. for (int i = 0;i < msgCount ;i++){
  80. if(i > msgList.size() -1){
  81. continue;
  82. }
  83. MqSendLog mqSendLog = msgList.get(i);
  84. mqSendLog.setStatus(1);
  85. mqSendLog.setUpdateTime(null);
  86. mqSendLogService.updateById(mqSendLog);
  87. rabbitMqProducer.sendByWorkQueue(mqSendLog.getQueue(),mqSendLog.getContent());
  88. }
  89. }
  90. private void checkOpenEcs(MqQueueConfig mqQueueConfig,Integer msgCount,Integer readCount) {
  91. List<MqEcs> list = mqEcsService.getNoModelingByQueueName(mqQueueConfig.getQueueName());
  92. LinkedBlockingQueue<DelEcsVo> openList = openMap.get(mqQueueConfig.getQueueName());
  93. if(openList == null){
  94. openList = new LinkedBlockingQueue<>();
  95. openMap.put(mqQueueConfig.getQueueName(),openList);
  96. }
  97. if(msgCount + readCount > mqQueueConfig.getScalingThreshold() + list.size() + openList.size() && mqQueueConfig.getOpenScalingTime() % checkOpenCount == 0){
  98. MqScalingConfig mqScalingConfig = mqScalingConfigService.getById(mqQueueConfig.getScalingConfigId());
  99. DelEcsVo vo = new DelEcsVo(null,mqScalingConfig,mqQueueConfig.getQueueName(),new Date());
  100. openList.offer(vo);
  101. }
  102. }
  103. @Async
  104. public void openEcsList() {
  105. try {
  106. HashMap<String, MqQueueConfig> queueMap = queueConfigService.getQueueMap();
  107. for (String key : openMap.keySet()) {
  108. LinkedBlockingQueue<DelEcsVo> openList = openMap.get(key);
  109. if(openList.isEmpty()){
  110. return;
  111. }
  112. DelEcsVo take = openList.poll();
  113. List<MqSendLog> msgList = mqSendLogService.getNoSendMsgByQueueName(key);
  114. MqQueueConfig mqQueueConfig = queueMap.get(key);
  115. if(msgList.size() <= mqQueueConfig.getScalingThreshold()){
  116. log.info("openEcsList--待计算任务为:{}未超过阈值:{}无需开启弹性伸缩:{}",msgList.size(),mqQueueConfig.getScalingThreshold() ,key);
  117. return;
  118. }
  119. log.info("openEcsList--开启弹性伸缩数量:{},{}",key,openList.size());
  120. Boolean flag = createEcs( take.getMqScalingConfig());
  121. if(flag){
  122. mqEcsService.add(take.getQueueName());
  123. Thread.sleep(1000L * 5);
  124. }else {
  125. openList.offer(take);
  126. }
  127. }
  128. }catch (Exception e){
  129. log.info("openEcsList--开启弹性伸缩失败:",e);
  130. }
  131. }
  132. public void checkDelEcs() {
  133. List<MqEcs> mqEcsList = mqEcsService.getScalingNotStopList();
  134. log.info("启动中的弹性伸缩数量为:{}",mqEcsList.size());
  135. if(mqEcsList.isEmpty()){
  136. return;
  137. }
  138. HashMap<String,MqQueueConfig> queueMap = queueConfigService.getQueueMap();
  139. HashMap<Integer,MqScalingConfig> scalingMap = mqScalingConfigService.getIdMap();
  140. for (MqEcs mqEcs : mqEcsList) {
  141. if(StringUtils.isBlank(mqEcs.getEcsName()) || StringUtils.isBlank(mqEcs.getQueueName())){
  142. continue;
  143. }
  144. MqQueueConfig mqQueueConfig = queueMap.get(mqEcs.getQueueName());
  145. if(mqQueueConfig == null || mqQueueConfig.getScalingConfigId() == null){
  146. continue;
  147. }
  148. Long between = DateUtil.between(mqEcs.getCreateTime(), new Date(), DateUnit.MINUTE);
  149. //弹性伸缩按照一个小时计费
  150. Long count = between/60;
  151. boolean flag = delList.stream().anyMatch(e -> e.getMqEcs().getEcsName().equals(mqEcs.getEcsName()));
  152. if(between >= mqQueueConfig.getStopScalingTime() + 60 * count && !flag){
  153. log.info("checkDelEcs-实例开启时间大于{}分钟,开始关闭:{}",mqQueueConfig.getStopScalingTime(),mqEcs.getEcsName());
  154. DelEcsVo vo = new DelEcsVo(mqEcs,scalingMap.get(mqQueueConfig.getScalingConfigId()),null,new Date());
  155. delList.offer(vo);
  156. }
  157. }
  158. }
  159. @Async
  160. public void delEcsList() {
  161. try {
  162. log.info("delEcsList--关闭弹性伸缩数量:{}",delList.size());
  163. if(delList.isEmpty()){
  164. return;
  165. }
  166. DelEcsVo take = delList.poll();
  167. if(take == null){
  168. return;
  169. }
  170. String stopKey = String.format(RedisKey.ecsStopKey,take.getMqEcs().getEcsName());
  171. redisUtil.set(stopKey,take.getMqEcs().getEcsName(),60 * 60 * 24); //设置计算暂停锁
  172. String modelingKey = String.format(RedisKey.modelingKey,take.getMqEcs().getEcsName());
  173. if(redisUtil.hasKey(modelingKey)){
  174. redisUtil.del(stopKey);
  175. return;
  176. }
  177. Boolean delFlag = delEcs(take.getMqScalingConfig(), take.getMqEcs().getEcsName());
  178. if(delFlag){
  179. log.info("checkDelEcs--关闭弹性伸缩实例成功:{}", take.getMqEcs().getEcsName());
  180. mqEcsService.updateMqEcs(take.getMqEcs());
  181. }
  182. }catch (Exception e){
  183. log.info("delEcsList--关闭弹性伸缩失败:",e);
  184. }
  185. }
  186. @Value("${spring.rabbitmq.host}")
  187. public String host;
  188. @Value("${spring.rabbitmq.username}")
  189. public String username;
  190. @Value("${spring.rabbitmq.password}")
  191. public String password;
  192. @Value("${spring.rabbitmq.virtual-host}")
  193. public String virtualHost;
  194. @Value("${spring.rabbitmq.mgmt-url}")
  195. public String mgmtUrl;
  196. @Value("${spring.rabbitmq.mgmt-host}")
  197. public String mgmtHost;
  198. public MqMsg getRabbitMqMsg(String queueName) {
  199. return RabbitMqUtils.getRabbitMqMsg(mgmtUrl+host+":"+mgmtHost,virtualHost,username,password,queueName);
  200. }
  201. public synchronized Boolean createEcs(MqScalingConfig mqScaling){
  202. try {
  203. return ECSUtils.createEcs(mqScaling.getAccessKey(),mqScaling.getSecret(),mqScaling.getEndpoint(),mqScaling.getScalingRuleAri());
  204. }catch (Exception e){
  205. log.info("触发弹性伸缩失败:",e);
  206. }
  207. return false;
  208. }
  209. public synchronized Boolean delEcs(MqScalingConfig mqScaling, String instanceId){
  210. try {
  211. return ECSUtils.delEcs(mqScaling.getAccessKey(),mqScaling.getSecret(),mqScaling.getEndpoint(),mqScaling.getScalingGroupId(),instanceId);
  212. }catch (Exception e){
  213. log.info("关闭弹性伸缩失败:",e);
  214. }
  215. return false;
  216. }
  217. }