|
@@ -69,8 +69,22 @@ public class TaskService {
|
|
|
configLogsMap.clear();
|
|
|
return;
|
|
|
}
|
|
|
- log.info("未分配的mq队列数:{}",mqSendLogs.size());
|
|
|
|
|
|
+ List<MqSendLog> fdkkList = mqSendLogs.stream().filter(e -> "4dkk".equals(e.getType())).collect(Collectors.toList());
|
|
|
+ List<MqSendLog> panoList = mqSendLogs.stream().filter(e -> "pano".equals(e.getType())).collect(Collectors.toList());
|
|
|
+ List<MqQueueConfig> queueConfigList = queueConfigService.list();
|
|
|
+ rabbitMqService.getMqMsgMap(queueConfigList);
|
|
|
+
|
|
|
+ if(!fdkkList.isEmpty()){
|
|
|
+ sendMqByType(fdkkList,"4dkk");
|
|
|
+ }
|
|
|
+ if(!panoList.isEmpty()){
|
|
|
+ sendMqByType(panoList,"pano");
|
|
|
+ }
|
|
|
+ log.info("未分配的mq队列数4dkk:{},pano:{}",fdkkList.size(),panoList.size());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void sendMqByType(List<MqSendLog> mqSendLogs,String type){
|
|
|
//设置分配队列
|
|
|
HashSet<String> contentList = new HashSet<>();
|
|
|
for (MqSendLog mqSendLog : mqSendLogs) {
|
|
@@ -90,8 +104,8 @@ public class TaskService {
|
|
|
mqSendLog.setConfigId(configId);
|
|
|
contentList.add(mqSendLog.getContent());
|
|
|
}
|
|
|
- List<MqQueueConfig> queueConfigList = queueConfigService.list();
|
|
|
- rabbitMqService.getMqMsgMap(queueConfigList);
|
|
|
+
|
|
|
+ List<MqQueueConfig> queueConfigList = queueConfigService.getByType(type);
|
|
|
|
|
|
for (MqQueueConfig config : queueConfigList) {
|
|
|
List<MqSendLog> msgList = mqSendLogs.stream().filter(e -> e.getConfigId().equals(config.getId())).collect(Collectors.toList());
|
|
@@ -268,7 +282,7 @@ public class TaskService {
|
|
|
}
|
|
|
Boolean flag = createEcs(take.getMqScalingConfig());
|
|
|
if(flag){
|
|
|
- mqEcsService.add(take.getQueueName());
|
|
|
+ mqEcsService.add(take.getQueueName(),take.getMqScalingConfig().getType());
|
|
|
sendRabbitMq(msgList,1,mqQueueConfig.getQueueName());
|
|
|
}else {
|
|
|
openList.offer(take);
|
|
@@ -311,8 +325,9 @@ public class TaskService {
|
|
|
if(mqMsg == null){
|
|
|
continue;
|
|
|
}
|
|
|
+ log.info("mq消息信息:{},{}",mqEcs.getQueueName(),mqMsg);
|
|
|
Integer notModelingCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - mqMsg.getMessages_ready();
|
|
|
- if(mqSize - (notModelingCount + mqQueueConfig.getScalingThreshold()) >0 ){
|
|
|
+ if( mqSize - (notModelingCount + mqQueueConfig.getScalingThreshold()) >0 ){
|
|
|
log.info("待计算数量:{}大于空闲服务数量:{},阈值{},不关闭",mqSize,notModelingCount,mqQueueConfig.getScalingThreshold());
|
|
|
continue;
|
|
|
}
|