|
@@ -3,11 +3,9 @@ package com.fdkankan.mqcontroller.task;
|
|
import cn.hutool.core.date.DateUnit;
|
|
import cn.hutool.core.date.DateUnit;
|
|
import cn.hutool.core.date.DateUtil;
|
|
import cn.hutool.core.date.DateUtil;
|
|
import cn.hutool.log.Log;
|
|
import cn.hutool.log.Log;
|
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
import com.fdkankan.mqcontroller.entity.*;
|
|
import com.fdkankan.mqcontroller.entity.*;
|
|
-import com.fdkankan.mqcontroller.service.IMqEcsService;
|
|
|
|
-import com.fdkankan.mqcontroller.service.IMqQueueConfigService;
|
|
|
|
-import com.fdkankan.mqcontroller.service.IMqScalingConfigService;
|
|
|
|
-import com.fdkankan.mqcontroller.service.IMqSendLogService;
|
|
|
|
|
|
+import com.fdkankan.mqcontroller.service.*;
|
|
import com.fdkankan.mqcontroller.utils.ECSUtils;
|
|
import com.fdkankan.mqcontroller.utils.ECSUtils;
|
|
import com.fdkankan.mqcontroller.utils.RabbitMqUtils;
|
|
import com.fdkankan.mqcontroller.utils.RabbitMqUtils;
|
|
import com.fdkankan.mqcontroller.utils.RedisKey;
|
|
import com.fdkankan.mqcontroller.utils.RedisKey;
|
|
@@ -43,6 +41,8 @@ public class TaskService {
|
|
IMqEcsService mqEcsService;
|
|
IMqEcsService mqEcsService;
|
|
@Autowired
|
|
@Autowired
|
|
RedisUtil redisUtil;
|
|
RedisUtil redisUtil;
|
|
|
|
+ @Autowired
|
|
|
|
+ IScenePlusService scenePlusService;
|
|
|
|
|
|
public static Integer checkOpenCount = 0;
|
|
public static Integer checkOpenCount = 0;
|
|
|
|
|
|
@@ -58,9 +58,26 @@ public class TaskService {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
log.info("未分配的mq队列数:{}",mqSendLogs.size());
|
|
log.info("未分配的mq队列数:{}",mqSendLogs.size());
|
|
|
|
+
|
|
|
|
+ //设置分配队列
|
|
|
|
+ HashSet<String> numList = new HashSet<>();
|
|
|
|
+ for (MqSendLog mqSendLog : mqSendLogs) {
|
|
|
|
+ Integer configId = scenePlusService.getQueueConfig(mqSendLog);
|
|
|
|
+ if(numList.contains(mqSendLog.getNum())){
|
|
|
|
+ mqSendLogService.updateStatus(mqSendLog.getId(),2,null);
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ if(configId == null ){
|
|
|
|
+ mqSendLogService.updateStatus(mqSendLog.getId(),3,null);
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ mqSendLog.setConfigId(configId);
|
|
|
|
+ numList.add(mqSendLog.getNum());
|
|
|
|
+ }
|
|
|
|
+
|
|
List<MqQueueConfig> queueConfigList = queueConfigService.list();
|
|
List<MqQueueConfig> queueConfigList = queueConfigService.list();
|
|
for (MqQueueConfig mqQueueConfig : queueConfigList) {
|
|
for (MqQueueConfig mqQueueConfig : queueConfigList) {
|
|
- List<MqSendLog> msgList = mqSendLogs.stream().filter(e -> e.getQueue().equals(mqQueueConfig.getQueueName())).collect(Collectors.toList());
|
|
|
|
|
|
+ List<MqSendLog> msgList = mqSendLogs.stream().filter(e -> e.getConfigId().equals(mqQueueConfig.getId())).collect(Collectors.toList());
|
|
if(msgList.isEmpty()){
|
|
if(msgList.isEmpty()){
|
|
continue;
|
|
continue;
|
|
}
|
|
}
|
|
@@ -77,7 +94,7 @@ public class TaskService {
|
|
if(mqQueueConfig.getOpenScaling() == 1){ //开启弹性伸缩
|
|
if(mqQueueConfig.getOpenScaling() == 1){ //开启弹性伸缩
|
|
checkOpenEcs(mqQueueConfig,msgList.size(),mqMsg.getMessages_ready());
|
|
checkOpenEcs(mqQueueConfig,msgList.size(),mqMsg.getMessages_ready());
|
|
}
|
|
}
|
|
- sendRabbitMq(msgList,mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - mqMsg.getMessages_ready());
|
|
|
|
|
|
+ sendRabbitMq(msgList,mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - mqMsg.getMessages_ready(),mqQueueConfig.getQueueName());
|
|
}
|
|
}
|
|
|
|
|
|
}
|
|
}
|
|
@@ -89,16 +106,14 @@ public class TaskService {
|
|
checkOpenCount ++;
|
|
checkOpenCount ++;
|
|
}
|
|
}
|
|
|
|
|
|
- private void sendRabbitMq(List<MqSendLog> msgList, Integer msgCount) {
|
|
|
|
|
|
+ private void sendRabbitMq(List<MqSendLog> msgList, Integer msgCount,String queueName) {
|
|
for (int i = 0;i < msgCount ;i++){
|
|
for (int i = 0;i < msgCount ;i++){
|
|
if(i > msgList.size() -1){
|
|
if(i > msgList.size() -1){
|
|
continue;
|
|
continue;
|
|
}
|
|
}
|
|
MqSendLog mqSendLog = msgList.get(i);
|
|
MqSendLog mqSendLog = msgList.get(i);
|
|
- mqSendLog.setStatus(1);
|
|
|
|
- mqSendLog.setUpdateTime(null);
|
|
|
|
- mqSendLogService.updateById(mqSendLog);
|
|
|
|
- rabbitMqProducer.sendByWorkQueue(mqSendLog.getQueue(),mqSendLog.getContent());
|
|
|
|
|
|
+ mqSendLogService.updateStatus(mqSendLog.getId(),1,queueName);
|
|
|
|
+ rabbitMqProducer.sendByWorkQueue(queueName,mqSendLog.getContent());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|