TaskService.java 15 KB


  1. package com.fdkankan.mqcontroller.task;
  2. import cn.hutool.core.date.DateUnit;
  3. import cn.hutool.core.date.DateUtil;
  4. import cn.hutool.log.Log;
  5. import com.alibaba.fastjson.JSONObject;
  6. import com.fdkankan.mqcontroller.entity.*;
  7. import com.fdkankan.mqcontroller.service.*;
  8. import com.fdkankan.mqcontroller.utils.ECSUtils;
  9. import com.fdkankan.mqcontroller.utils.RabbitMqUtils;
  10. import com.fdkankan.mqcontroller.utils.RedisKey;
  11. import com.fdkankan.rabbitmq.util.RabbitMqProducer;
  12. import com.fdkankan.redis.util.RedisUtil;
  13. import lombok.extern.slf4j.Slf4j;
  14. import org.apache.commons.lang3.StringUtils;
  15. import org.springframework.beans.factory.FactoryBean;
  16. import org.springframework.beans.factory.annotation.Autowired;
  17. import org.springframework.beans.factory.annotation.Value;
  18. import org.springframework.cloud.context.config.annotation.RefreshScope;
  19. import org.springframework.scheduling.annotation.Async;
  20. import org.springframework.stereotype.Service;
  21. import org.springframework.web.bind.annotation.PostMapping;
  22. import javax.annotation.PostConstruct;
  23. import java.sql.SQLOutput;
  24. import java.util.*;
  25. import java.util.concurrent.LinkedBlockingQueue;
  26. import java.util.stream.Collectors;
  27. @Service
  28. @RefreshScope
  29. @Slf4j
  30. public class TaskService {
  31. @Autowired
  32. RabbitMqProducer rabbitMqProducer;
  33. @Autowired
  34. IMqSendLogService mqSendLogService;
  35. @Autowired
  36. IMqQueueConfigService queueConfigService;
  37. @Autowired
  38. IMqScalingConfigService mqScalingConfigService;
  39. @Autowired
  40. IMqEcsService mqEcsService;
  41. @Autowired
  42. RedisUtil redisUtil;
  43. @Autowired
  44. IScenePlusService scenePlusService;
  45. @Autowired
  46. RabbitMqService rabbitMqService;
  47. public static Integer checkOpenCount = 0;
  48. private static final LinkedBlockingQueue<DelEcsVo> delList = new LinkedBlockingQueue<>();
  49. private static final HashMap<String,LinkedBlockingQueue<DelEcsVo>> openMap = new HashMap<>();
  50. public static HashMap<Integer,List<MqSendLog>> configLogsMap = new HashMap<>();
  51. public static HashMap<String,MqMsg> mqMsgMap = new HashMap<>();
  52. public static HashMap<String,Integer> countMap = new HashMap<>();
  53. public void runTask() {
  54. try {
  55. sendMq();
  56. }catch (Exception e){
  57. log.info("sendMq:error:",e);
  58. }
  59. try {
  60. checkDelEcs();
  61. }catch (Exception e){
  62. log.info("checkDelEcs:error:",e);
  63. }
  64. }
  65. /**
  66. * 将数据库中待计算的mq消息推送,rabbitmq
  67. */
  68. public void sendMq() {
  69. checkCount();
  70. List<MqSendLog> mqSendLogs = mqSendLogService.getNoSendMsg();
  71. if(mqSendLogs.isEmpty()){
  72. return;
  73. }
  74. log.info("未分配的mq队列数:{}",mqSendLogs.size());
  75. for (MqSendLog mqSendLog : mqSendLogs) {
  76. redisUtil.set("mq-wait:queue:num:"+mqSendLog.getNum(),mqSendLog.getNum(),60);
  77. }
  78. //设置分配队列
  79. HashSet<String> numList = new HashSet<>();
  80. for (MqSendLog mqSendLog : mqSendLogs) {
  81. if(StringUtils.isBlank(mqSendLog.getContent())){
  82. mqSendLogService.updateStatus(mqSendLog.getId(),2,null);
  83. continue;
  84. }
  85. Integer configId = scenePlusService.getQueueConfig(mqSendLog);
  86. if(numList.contains(mqSendLog.getNum())){
  87. mqSendLogService.updateStatus(mqSendLog.getId(),2,null);
  88. continue;
  89. }
  90. if(configId == null ){
  91. mqSendLogService.updateStatus(mqSendLog.getId(),3,null);
  92. continue;
  93. }
  94. mqSendLog.setConfigId(configId);
  95. numList.add(mqSendLog.getNum());
  96. }
  97. List<MqQueueConfig> queueConfigList = queueConfigService.list();
  98. rabbitMqService.getMqMsgMap(queueConfigList);
  99. for (MqQueueConfig config : queueConfigList) {
  100. List<MqSendLog> msgList = mqSendLogs.stream().filter(e -> e.getConfigId().equals(config.getId())).collect(Collectors.toList());
  101. configLogsMap.put(config.getId(),msgList);
  102. }
  103. //是否需要重新组合队列
  104. boolean flag = allocateQueue(queueConfigList);
  105. if(flag){
  106. for (MqQueueConfig config : queueConfigList) {
  107. List<MqSendLog> msgList = mqSendLogs.stream().filter(e -> e.getConfigId().equals(config.getId())).collect(Collectors.toList());
  108. configLogsMap.put(config.getId(),msgList);
  109. }
  110. }
  111. for (MqQueueConfig mqQueueConfig : queueConfigList) {
  112. List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
  113. if(msgList.isEmpty()){
  114. continue;
  115. }
  116. MqMsg mqMsg = mqMsgMap.get(mqQueueConfig.getQueueName());
  117. if(mqMsg == null){
  118. log.info("获取mq队列数据失败:{}",mqQueueConfig);
  119. continue;
  120. }
  121. if(mqQueueConfig.getOpenScaling() == 0){ //不开启弹性伸缩
  122. if(mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - mqMsg.getMessages_ready() <= 0 ){ //待计算队列中有任务
  123. continue;
  124. }
  125. }
  126. if(mqQueueConfig.getOpenScaling() == 1 ){ //开启弹性伸缩
  127. checkOpenEcs(mqQueueConfig,msgList);
  128. }
  129. sendRabbitMq(msgList,mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - mqMsg.getMessages_ready(),mqQueueConfig.getQueueName());
  130. for (MqSendLog mqSendLog : mqSendLogs) {
  131. redisUtil.del("mq-wait:queue:num:"+mqSendLog.getNum());
  132. }
  133. }
  134. }
  135. /**
  136. * 设置调配队列计算
  137. */
  138. private boolean allocateQueue(List<MqQueueConfig> queueConfigList){
  139. Boolean flag = false;
  140. //获取常驻队列
  141. List<MqQueueConfig> residentList = queueConfigList.stream().filter(e -> e.getIsResident() == 1).collect(Collectors.toList());
  142. if(residentList.size() != 1){
  143. log.info("常驻队列未配置,请配置常驻队列");
  144. return flag;
  145. }
  146. Integer residentCount = 0;
  147. MqQueueConfig residentConfig = residentList.get(0);
  148. MqMsg mqMsg = mqMsgMap.get(residentConfig.getQueueName());
  149. List<MqSendLog> residentLogList = configLogsMap.get(residentConfig.getId());
  150. if(mqMsg!= null ){
  151. residentCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - residentLogList.size() ;
  152. log.info("常驻队列空闲服务器数量为:{},{}",residentConfig.getQueueName(),mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged());
  153. }
  154. //获取默认队列
  155. for (MqQueueConfig mqQueueConfig : queueConfigList) {
  156. List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
  157. if(residentCount >0 && mqQueueConfig.getIsResident() !=1 && mqQueueConfig.getIsSpecial() !=1 && !msgList.isEmpty()){
  158. updateMqSendLogConfig(residentCount,mqQueueConfig.getId(),residentConfig.getId());
  159. flag = true;
  160. }
  161. }
  162. return flag;
  163. }
  164. private void updateMqSendLogConfig(Integer residentCount,Integer configId ,Integer residentConfigId){
  165. List<MqSendLog> msgList = configLogsMap.get(configId);
  166. for (int i = 0;i < residentCount ;i++){
  167. if(i > msgList.size() -1){
  168. continue;
  169. }
  170. MqSendLog mqSendLog = msgList.get(i);
  171. mqSendLog.setConfigId(residentConfigId);
  172. }
  173. }
  174. private void checkCount() {
  175. if(checkOpenCount > 10000){ //一个W为一个循环
  176. checkOpenCount = 0;
  177. }
  178. checkOpenCount ++;
  179. }
  180. private void sendRabbitMq(List<MqSendLog> msgList, Integer msgCount,String queueName) {
  181. for (int i = 0;i < msgCount ;i++){
  182. if(i > msgList.size() -1){
  183. continue;
  184. }
  185. MqSendLog mqSendLog = msgList.get(i);
  186. mqSendLogService.updateStatus(mqSendLog.getId(),1,queueName);
  187. rabbitMqProducer.sendByWorkQueue(queueName,JSONObject.parseObject(mqSendLog.getContent()));
  188. }
  189. }
  190. private void checkOpenEcs(MqQueueConfig mqQueueConfig,List<MqSendLog> msgList) {
  191. //获取未关闭的服务器
  192. List<MqEcs> list = mqEcsService.getNoStopByQueueName(mqQueueConfig.getQueueName());
  193. countMap.put(mqQueueConfig.getQueueName(),list.size());
  194. LinkedBlockingQueue<DelEcsVo> openList = openMap.computeIfAbsent(
  195. mqQueueConfig.getQueueName(),
  196. k -> new LinkedBlockingQueue<>()
  197. );
  198. Integer ecsCount = countMap.get(mqQueueConfig.getQueueName());
  199. MqMsg mqMsg = mqMsgMap.get(mqQueueConfig.getQueueName());
  200. Boolean flag = mqQueueConfig.getOpenScalingTime() == 0 || checkOpenCount / 2 % mqQueueConfig.getOpenScalingTime() == 0;
  201. //未在计算的服务器数量
  202. Integer noModelingCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged();
  203. //未启动的服务器数量
  204. Integer notStartCount = ecsCount - mqMsg.getConsumers();
  205. if(msgList.size() > mqQueueConfig.getScalingThreshold() + noModelingCount + notStartCount && flag){
  206. log.info("将待开启弹性伸缩放入队列:{}",mqQueueConfig.getQueueName());
  207. MqScalingConfig mqScalingConfig = mqScalingConfigService.getById(mqQueueConfig.getScalingConfigId());
  208. DelEcsVo vo = new DelEcsVo(null,mqScalingConfig,mqQueueConfig.getQueueName());
  209. openList.offer(vo);
  210. }
  211. }
  212. /**
  213. * 开启弹性伸缩
  214. */
  215. public void openEcsList() {
  216. try {
  217. HashMap<String, MqQueueConfig> queueMap = queueConfigService.getQueueMap();
  218. for (Map.Entry<String, LinkedBlockingQueue<DelEcsVo>> entry : openMap.entrySet()) {
  219. String key = entry.getKey();
  220. LinkedBlockingQueue<DelEcsVo> openList = entry.getValue();
  221. if(openList.isEmpty()){
  222. continue;
  223. }
  224. DelEcsVo take = openList.poll();
  225. MqQueueConfig mqQueueConfig = queueMap.get(key);
  226. List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
  227. countMap.merge(mqQueueConfig.getQueueName(), 1, Integer::sum);
  228. log.info("openEcsList--开启弹性伸缩数量:{},{}",key,1);
  229. List<MqEcs> mqEcsList = mqEcsService.getNotUpEcs();
  230. log.info("openEcsList--启动中的ecs数量为:{}",mqEcsList.size());
  231. if(!mqEcsList.isEmpty()){
  232. log.info("openEcsList--有启动中的ecs,等待启动完毕执行。。。");
  233. openList.offer(take);
  234. return;
  235. }
  236. Boolean flag = createEcs( take.getMqScalingConfig());
  237. if(flag){
  238. mqEcsService.add(take.getQueueName());
  239. sendRabbitMq(msgList,1,mqQueueConfig.getQueueName());
  240. }else {
  241. openList.offer(take);
  242. }
  243. }
  244. }catch (Exception e){
  245. log.info("openEcsList--开启弹性伸缩失败:",e);
  246. }
  247. }
  248. /**
  249. * 判断是否关闭弹性伸缩
  250. */
  251. public void checkDelEcs() {
  252. List<MqEcs> mqEcsList = mqEcsService.getScalingNotStopList();
  253. if(mqEcsList.isEmpty()){
  254. return;
  255. }
  256. log.info("启动中的弹性伸缩数量为:{}",mqEcsList.size());
  257. HashMap<String,MqQueueConfig> queueMap = queueConfigService.getQueueMap();
  258. HashMap<Integer,MqScalingConfig> scalingMap = mqScalingConfigService.getIdMap();
  259. for (MqEcs mqEcs : mqEcsList) {
  260. if(StringUtils.isBlank(mqEcs.getEcsName()) || StringUtils.isBlank(mqEcs.getQueueName())){
  261. continue;
  262. }
  263. MqQueueConfig mqQueueConfig = queueMap.get(mqEcs.getQueueName());
  264. if(mqQueueConfig == null || mqQueueConfig.getScalingConfigId() == null){
  265. continue;
  266. }
  267. long between = DateUtil.between(mqEcs.getCreateTime(), new Date(), DateUnit.MINUTE);
  268. //弹性伸缩按照一个小时计费
  269. long count = between/60;
  270. boolean flag = delList.stream().anyMatch(e -> e.getMqEcs().getEcsName().equals(mqEcs.getEcsName()));
  271. if(between >= mqQueueConfig.getStopScalingTime() + 60 * count && !flag){
  272. log.info("checkDelEcs-实例开启时间大于{}分钟,开始关闭:{}",mqQueueConfig.getStopScalingTime(),mqEcs.getEcsName());
  273. DelEcsVo vo = new DelEcsVo(mqEcs,scalingMap.get(mqQueueConfig.getScalingConfigId()),null);
  274. delList.offer(vo);
  275. }
  276. }
  277. }
  278. /**
  279. * 关闭弹性伸缩
  280. */
  281. public void delEcsList() {
  282. try {
  283. if(delList.isEmpty()){
  284. return;
  285. }
  286. log.info("delEcsList--关闭弹性伸缩数量:{}",delList.size());
  287. DelEcsVo take = delList.poll();
  288. if(take == null){
  289. return;
  290. }
  291. String stopKey = String.format(RedisKey.ecsStopKey,take.getMqEcs().getEcsName());
  292. redisUtil.set(stopKey,take.getMqEcs().getEcsName(),60 * 60 * 24); //设置计算暂停锁
  293. String modelingKey = String.format(RedisKey.modelingKey,take.getMqEcs().getEcsName());
  294. if(redisUtil.hasKey(modelingKey)){
  295. log.info("delEcsList--有场景正在计算中:{},{}",modelingKey,redisUtil.get(modelingKey));
  296. redisUtil.del(stopKey);
  297. return;
  298. }
  299. Boolean delFlag = delEcs(take.getMqScalingConfig(), take.getMqEcs().getEcsName());
  300. if(delFlag){
  301. log.info("checkDelEcs--关闭弹性伸缩实例成功:{}", take.getMqEcs().getEcsName());
  302. mqEcsService.updateMqEcs(take.getMqEcs());
  303. }else {
  304. delList.offer(take);
  305. }
  306. }catch (Exception e){
  307. log.info("delEcsList--关闭弹性伸缩失败:",e);
  308. }
  309. }
  310. public synchronized Boolean createEcs(MqScalingConfig mqScaling){
  311. try {
  312. return ECSUtils.createEcs(mqScaling.getAccessKey(),mqScaling.getSecret(),mqScaling.getEndpoint(),mqScaling.getScalingRuleAri());
  313. }catch (Exception e){
  314. log.info("触发弹性伸缩失败:",e);
  315. }
  316. return false;
  317. }
  318. public synchronized Boolean delEcs(MqScalingConfig mqScaling, String instanceId){
  319. try {
  320. return ECSUtils.delEcs(mqScaling.getAccessKey(),mqScaling.getSecret(),mqScaling.getEndpoint(),mqScaling.getScalingGroupId(),instanceId);
  321. }catch (Exception e){
  322. log.info("关闭弹性伸缩失败:",e);
  323. }
  324. return false;
  325. }
  326. }