|
@@ -95,7 +95,7 @@ public class TaskService {
|
|
|
}
|
|
|
}
|
|
|
if(mqQueueConfig.getOpenScaling() == 1){ //开启弹性伸缩
|
|
|
- checkOpenEcs(mqQueueConfig,msgList.size(),mqMsg.getMessages_ready());
|
|
|
+ checkOpenEcs(mqQueueConfig,msgList,mqMsg.getMessages_ready());
|
|
|
}
|
|
|
sendRabbitMq(msgList,mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - mqMsg.getMessages_ready(),mqQueueConfig.getQueueName());
|
|
|
}
|
|
@@ -121,16 +121,16 @@ public class TaskService {
|
|
|
}
|
|
|
|
|
|
|
|
|
- private void checkOpenEcs(MqQueueConfig mqQueueConfig,Integer msgCount,Integer readCount) {
|
|
|
+ private void checkOpenEcs(MqQueueConfig mqQueueConfig,List<MqSendLog> msgList,Integer readCount) {
|
|
|
List<MqEcs> list = mqEcsService.getNoModelingByQueueName(mqQueueConfig.getQueueName());
|
|
|
LinkedBlockingQueue<DelEcsVo> openList = openMap.get(mqQueueConfig.getQueueName());
|
|
|
if(openList == null){
|
|
|
openList = new LinkedBlockingQueue<>();
|
|
|
openMap.put(mqQueueConfig.getQueueName(),openList);
|
|
|
}
|
|
|
- if(msgCount + readCount > mqQueueConfig.getScalingThreshold() + list.size() + openList.size() && mqQueueConfig.getOpenScalingTime() % checkOpenCount == 0){
|
|
|
+ if(msgList.size() + readCount > mqQueueConfig.getScalingThreshold() + list.size() + openList.size() && mqQueueConfig.getOpenScalingTime() % checkOpenCount == 0){
|
|
|
MqScalingConfig mqScalingConfig = mqScalingConfigService.getById(mqQueueConfig.getScalingConfigId());
|
|
|
- DelEcsVo vo = new DelEcsVo(null,mqScalingConfig,mqQueueConfig.getQueueName(),new Date());
|
|
|
+ DelEcsVo vo = new DelEcsVo(null,mqScalingConfig,mqQueueConfig.getQueueName(),new Date(),msgList);
|
|
|
openList.offer(vo);
|
|
|
}
|
|
|
}
|
|
@@ -144,7 +144,7 @@ public class TaskService {
|
|
|
return;
|
|
|
}
|
|
|
DelEcsVo take = openList.poll();
|
|
|
- List<MqSendLog> msgList = mqSendLogService.getNoSendMsgByQueueName(key);
|
|
|
+ List<MqSendLog> msgList = take.getMsgList();
|
|
|
MqQueueConfig mqQueueConfig = queueMap.get(key);
|
|
|
if(msgList.size() <= mqQueueConfig.getScalingThreshold()){
|
|
|
log.info("openEcsList--待计算任务为:{}未超过阈值:{}无需开启弹性伸缩:{}",msgList.size(),mqQueueConfig.getScalingThreshold() ,key);
|
|
@@ -186,7 +186,7 @@ public class TaskService {
|
|
|
boolean flag = delList.stream().anyMatch(e -> e.getMqEcs().getEcsName().equals(mqEcs.getEcsName()));
|
|
|
if(between >= mqQueueConfig.getStopScalingTime() + 60 * count && !flag){
|
|
|
log.info("checkDelEcs-实例开启时间大于{}分钟,开始关闭:{}",mqQueueConfig.getStopScalingTime(),mqEcs.getEcsName());
|
|
|
- DelEcsVo vo = new DelEcsVo(mqEcs,scalingMap.get(mqQueueConfig.getScalingConfigId()),null,new Date());
|
|
|
+ DelEcsVo vo = new DelEcsVo(mqEcs,scalingMap.get(mqQueueConfig.getScalingConfigId()),null,new Date(),null);
|
|
|
delList.offer(vo);
|
|
|
}
|
|
|
}
|