TaskService.java 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. package com.fdkankan.mqcontroller.task;
  2. import cn.hutool.core.date.DateUnit;
  3. import cn.hutool.core.date.DateUtil;
  4. import cn.hutool.log.Log;
  5. import com.alibaba.fastjson.JSONObject;
  6. import com.fdkankan.mqcontroller.entity.*;
  7. import com.fdkankan.mqcontroller.service.*;
  8. import com.fdkankan.mqcontroller.utils.ECSUtils;
  9. import com.fdkankan.mqcontroller.utils.RabbitMqUtils;
  10. import com.fdkankan.mqcontroller.utils.RedisKey;
  11. import com.fdkankan.rabbitmq.util.RabbitMqProducer;
  12. import com.fdkankan.redis.util.RedisUtil;
  13. import lombok.extern.slf4j.Slf4j;
  14. import org.apache.commons.lang3.StringUtils;
  15. import org.springframework.beans.factory.FactoryBean;
  16. import org.springframework.beans.factory.annotation.Autowired;
  17. import org.springframework.beans.factory.annotation.Value;
  18. import org.springframework.cloud.context.config.annotation.RefreshScope;
  19. import org.springframework.scheduling.annotation.Async;
  20. import org.springframework.stereotype.Service;
  21. import org.springframework.web.bind.annotation.PostMapping;
  22. import javax.annotation.PostConstruct;
  23. import java.sql.SQLOutput;
  24. import java.util.*;
  25. import java.util.concurrent.LinkedBlockingQueue;
  26. import java.util.stream.Collectors;
  27. @Service
  28. @RefreshScope
  29. @Slf4j
  30. public class TaskService {
  31. @Autowired
  32. RabbitMqProducer rabbitMqProducer;
  33. @Autowired
  34. IMqSendLogService mqSendLogService;
  35. @Autowired
  36. IMqQueueConfigService queueConfigService;
  37. @Autowired
  38. IMqScalingConfigService mqScalingConfigService;
  39. @Autowired
  40. IMqEcsService mqEcsService;
  41. @Autowired
  42. RedisUtil redisUtil;
  43. @Autowired
  44. IScenePlusService scenePlusService;
  45. @Autowired
  46. RabbitMqService rabbitMqService;
  47. public static Integer checkOpenCount = 0;
  48. private static final LinkedBlockingQueue<DelEcsVo> delList = new LinkedBlockingQueue<>();
  49. private static final HashMap<String,LinkedBlockingQueue<DelEcsVo>> openMap = new HashMap<>();
  50. public static HashMap<Integer,List<MqSendLog>> configLogsMap = new HashMap<>();
  51. public static HashMap<String,MqMsg> mqMsgMap = new HashMap<>();
  52. public static HashMap<String,Integer> countMap = new HashMap<>();
  53. public void runTask() {
  54. sendMq();
  55. checkDelEcs();
  56. }
  57. public void checkEcs() {
  58. openEcsList();
  59. delEcsList();
  60. }
  61. /**
  62. * 将数据库中待计算的mq消息推送,rabbitmq
  63. */
  64. public void sendMq() {
  65. checkCount();
  66. List<MqSendLog> mqSendLogs = mqSendLogService.getNoSendMsg();
  67. if(mqSendLogs.isEmpty()){
  68. return;
  69. }
  70. log.info("未分配的mq队列数:{}",mqSendLogs.size());
  71. //设置分配队列
  72. HashSet<String> numList = new HashSet<>();
  73. for (MqSendLog mqSendLog : mqSendLogs) {
  74. if(StringUtils.isBlank(mqSendLog.getContent())){
  75. mqSendLogService.updateStatus(mqSendLog.getId(),2,null);
  76. continue;
  77. }
  78. Integer configId = scenePlusService.getQueueConfig(mqSendLog);
  79. if(numList.contains(mqSendLog.getNum())){
  80. mqSendLogService.updateStatus(mqSendLog.getId(),2,null);
  81. continue;
  82. }
  83. if(configId == null ){
  84. mqSendLogService.updateStatus(mqSendLog.getId(),3,null);
  85. continue;
  86. }
  87. mqSendLog.setConfigId(configId);
  88. numList.add(mqSendLog.getNum());
  89. }
  90. List<MqQueueConfig> queueConfigList = queueConfigService.list();
  91. rabbitMqService.getMqMsgMap(queueConfigList);
  92. for (MqQueueConfig config : queueConfigList) {
  93. List<MqSendLog> msgList = mqSendLogs.stream().filter(e -> e.getConfigId().equals(config.getId())).collect(Collectors.toList());
  94. configLogsMap.put(config.getId(),msgList);
  95. }
  96. //是否需要重新组合队列
  97. boolean flag = allocateQueue(queueConfigList);
  98. if(flag){
  99. for (MqQueueConfig config : queueConfigList) {
  100. List<MqSendLog> msgList = mqSendLogs.stream().filter(e -> e.getConfigId().equals(config.getId())).collect(Collectors.toList());
  101. configLogsMap.put(config.getId(),msgList);
  102. }
  103. }
  104. for (MqQueueConfig mqQueueConfig : queueConfigList) {
  105. List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
  106. if(msgList.isEmpty()){
  107. continue;
  108. }
  109. MqMsg mqMsg = mqMsgMap.get(mqQueueConfig.getQueueName());
  110. if(mqMsg == null){
  111. log.info("获取mq队列数据失败:{}",mqQueueConfig);
  112. continue;
  113. }
  114. if(mqQueueConfig.getOpenScaling() == 0){ //不开启弹性伸缩
  115. if(mqMsg.getMessages_ready() >0){ //待计算队列中有任务
  116. continue;
  117. }
  118. }
  119. if(mqQueueConfig.getOpenScaling() == 1 ){ //开启弹性伸缩
  120. checkOpenEcs(mqQueueConfig,msgList);
  121. }
  122. sendRabbitMq(msgList,mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - mqMsg.getMessages_ready(),mqQueueConfig.getQueueName());
  123. }
  124. }
  125. /**
  126. * 设置调配队列计算
  127. */
  128. private boolean allocateQueue(List<MqQueueConfig> queueConfigList){
  129. Boolean flag = false;
  130. //获取常驻队列
  131. List<MqQueueConfig> residentList = queueConfigList.stream().filter(e -> e.getIsResident() == 1).collect(Collectors.toList());
  132. if(residentList.size() != 1){
  133. log.info("常驻队列未配置,请配置常驻队列");
  134. return flag;
  135. }
  136. Integer residentCount = 0;
  137. MqQueueConfig residentConfig = residentList.get(0);
  138. MqMsg mqMsg = mqMsgMap.get(residentConfig.getQueueName());
  139. List<MqSendLog> residentLogList = configLogsMap.get(residentConfig.getId());
  140. if(mqMsg!= null ){
  141. residentCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - residentLogList.size() ;
  142. log.info("常驻队列空闲服务器数量为:{},{}",residentConfig.getQueueName(),residentCount);
  143. }
  144. //获取默认队列
  145. for (MqQueueConfig mqQueueConfig : queueConfigList) {
  146. List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
  147. if(residentCount >0 && mqQueueConfig.getIsResident() !=1 && mqQueueConfig.getIsSpecial() !=1 && !msgList.isEmpty()){
  148. updateMqSendLogConfig(residentCount,mqQueueConfig.getId(),residentConfig.getId());
  149. flag = true;
  150. }
  151. }
  152. return flag;
  153. }
  154. private void updateMqSendLogConfig(Integer residentCount,Integer configId ,Integer residentConfigId){
  155. List<MqSendLog> msgList = configLogsMap.get(configId);
  156. for (int i = 0;i < residentCount ;i++){
  157. if(i > msgList.size() -1){
  158. continue;
  159. }
  160. MqSendLog mqSendLog = msgList.get(i);
  161. mqSendLog.setConfigId(residentConfigId);
  162. }
  163. }
  164. private void checkCount() {
  165. if(checkOpenCount > 10000){ //一个W为一个循环
  166. checkOpenCount = 0;
  167. }
  168. checkOpenCount ++;
  169. }
  170. private void sendRabbitMq(List<MqSendLog> msgList, Integer msgCount,String queueName) {
  171. for (int i = 0;i < msgCount ;i++){
  172. if(i > msgList.size() -1){
  173. continue;
  174. }
  175. MqSendLog mqSendLog = msgList.get(i);
  176. mqSendLogService.updateStatus(mqSendLog.getId(),1,queueName);
  177. rabbitMqProducer.sendByWorkQueue(queueName,JSONObject.parseObject(mqSendLog.getContent()));
  178. }
  179. }
  180. private void checkOpenEcs(MqQueueConfig mqQueueConfig,List<MqSendLog> msgList) {
  181. //获取未关闭的服务器
  182. List<MqEcs> list = mqEcsService.getNoStopByQueueName(mqQueueConfig.getQueueName());
  183. countMap.put(mqQueueConfig.getQueueName(),list.size());
  184. LinkedBlockingQueue<DelEcsVo> openList = openMap.computeIfAbsent(
  185. mqQueueConfig.getQueueName(),
  186. k -> new LinkedBlockingQueue<>()
  187. );
  188. Integer ecsCount = countMap.get(mqQueueConfig.getQueueName());
  189. MqMsg mqMsg = mqMsgMap.get(mqQueueConfig.getQueueName());
  190. Boolean flag = mqQueueConfig.getOpenScalingTime() == 0 || checkOpenCount % mqQueueConfig.getOpenScalingTime() == 0;
  191. //未在计算的服务器数量
  192. Integer noModelingCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged();
  193. //未启动的服务器数量
  194. Integer notStartCount = ecsCount - mqMsg.getConsumers();
  195. if(msgList.size() > mqQueueConfig.getScalingThreshold() + noModelingCount + notStartCount && flag){
  196. log.info("将待开启弹性伸缩放入队列:{}",mqQueueConfig.getQueueName());
  197. MqScalingConfig mqScalingConfig = mqScalingConfigService.getById(mqQueueConfig.getScalingConfigId());
  198. DelEcsVo vo = new DelEcsVo(null,mqScalingConfig,mqQueueConfig.getQueueName());
  199. openList.offer(vo);
  200. }
  201. }
  202. /**
  203. * 开启弹性伸缩
  204. */
  205. public void openEcsList() {
  206. try {
  207. HashMap<String, MqQueueConfig> queueMap = queueConfigService.getQueueMap();
  208. for (Map.Entry<String, LinkedBlockingQueue<DelEcsVo>> entry : openMap.entrySet()) {
  209. String key = entry.getKey();
  210. LinkedBlockingQueue<DelEcsVo> openList = entry.getValue();
  211. if(openList.isEmpty()){
  212. return;
  213. }
  214. DelEcsVo take = openList.poll();
  215. MqQueueConfig mqQueueConfig = queueMap.get(key);
  216. List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
  217. countMap.merge(mqQueueConfig.getQueueName(), 1, Integer::sum);
  218. log.info("openEcsList--开启弹性伸缩数量:{},{}",key,openList.size());
  219. List<MqEcs> mqEcsList = mqEcsService.getNotUpEcs();
  220. log.info("openEcsList--启动中的ecs数量为:{}",mqEcsList.size());
  221. if(!mqEcsList.isEmpty()){
  222. log.info("openEcsList--有启动中的ecs,等待启动完毕执行。。。");
  223. openList.offer(take);
  224. return;
  225. }
  226. Boolean flag = createEcs( take.getMqScalingConfig());
  227. if(flag){
  228. mqEcsService.add(take.getQueueName());
  229. sendRabbitMq(msgList,1,mqQueueConfig.getQueueName());
  230. Thread.sleep(5000L);
  231. }else {
  232. openList.offer(take);
  233. }
  234. }
  235. }catch (Exception e){
  236. log.info("openEcsList--开启弹性伸缩失败:",e);
  237. }
  238. }
  239. /**
  240. * 判断是否关闭弹性伸缩
  241. */
  242. public void checkDelEcs() {
  243. List<MqEcs> mqEcsList = mqEcsService.getScalingNotStopList();
  244. if(mqEcsList.isEmpty()){
  245. return;
  246. }
  247. log.info("启动中的弹性伸缩数量为:{}",mqEcsList.size());
  248. HashMap<String,MqQueueConfig> queueMap = queueConfigService.getQueueMap();
  249. HashMap<Integer,MqScalingConfig> scalingMap = mqScalingConfigService.getIdMap();
  250. for (MqEcs mqEcs : mqEcsList) {
  251. if(StringUtils.isBlank(mqEcs.getEcsName()) || StringUtils.isBlank(mqEcs.getQueueName())){
  252. continue;
  253. }
  254. MqQueueConfig mqQueueConfig = queueMap.get(mqEcs.getQueueName());
  255. if(mqQueueConfig == null || mqQueueConfig.getScalingConfigId() == null){
  256. continue;
  257. }
  258. long between = DateUtil.between(mqEcs.getCreateTime(), new Date(), DateUnit.MINUTE);
  259. //弹性伸缩按照一个小时计费
  260. long count = between/60;
  261. boolean flag = delList.stream().anyMatch(e -> e.getMqEcs().getEcsName().equals(mqEcs.getEcsName()));
  262. if(between >= mqQueueConfig.getStopScalingTime() + 60 * count && !flag){
  263. log.info("checkDelEcs-实例开启时间大于{}分钟,开始关闭:{}",mqQueueConfig.getStopScalingTime(),mqEcs.getEcsName());
  264. DelEcsVo vo = new DelEcsVo(mqEcs,scalingMap.get(mqQueueConfig.getScalingConfigId()),null);
  265. delList.offer(vo);
  266. }
  267. }
  268. }
  269. /**
  270. * 关闭弹性伸缩
  271. */
  272. public void delEcsList() {
  273. try {
  274. if(delList.isEmpty()){
  275. return;
  276. }
  277. log.info("delEcsList--关闭弹性伸缩数量:{}",delList.size());
  278. DelEcsVo take = delList.poll();
  279. if(take == null){
  280. return;
  281. }
  282. String stopKey = String.format(RedisKey.ecsStopKey,take.getMqEcs().getEcsName());
  283. redisUtil.set(stopKey,take.getMqEcs().getEcsName(),60 * 60 * 24); //设置计算暂停锁
  284. String modelingKey = String.format(RedisKey.modelingKey,take.getMqEcs().getEcsName());
  285. if(redisUtil.hasKey(modelingKey)){
  286. redisUtil.del(stopKey);
  287. return;
  288. }
  289. Boolean delFlag = delEcs(take.getMqScalingConfig(), take.getMqEcs().getEcsName());
  290. if(delFlag){
  291. log.info("checkDelEcs--关闭弹性伸缩实例成功:{}", take.getMqEcs().getEcsName());
  292. mqEcsService.updateMqEcs(take.getMqEcs());
  293. }else {
  294. delList.offer(take);
  295. }
  296. }catch (Exception e){
  297. log.info("delEcsList--关闭弹性伸缩失败:",e);
  298. }
  299. }
  300. public synchronized Boolean createEcs(MqScalingConfig mqScaling){
  301. try {
  302. return ECSUtils.createEcs(mqScaling.getAccessKey(),mqScaling.getSecret(),mqScaling.getEndpoint(),mqScaling.getScalingRuleAri());
  303. }catch (Exception e){
  304. log.info("触发弹性伸缩失败:",e);
  305. }
  306. return false;
  307. }
  308. public synchronized Boolean delEcs(MqScalingConfig mqScaling, String instanceId){
  309. try {
  310. return ECSUtils.delEcs(mqScaling.getAccessKey(),mqScaling.getSecret(),mqScaling.getEndpoint(),mqScaling.getScalingGroupId(),instanceId);
  311. }catch (Exception e){
  312. log.info("关闭弹性伸缩失败:",e);
  313. }
  314. return false;
  315. }
  316. }