|
@@ -190,11 +190,10 @@ public class TaskService {
|
|
|
//获取未关闭的服务器
|
|
|
List<MqEcs> list = mqEcsService.getNoStopByQueueName(mqQueueConfig.getQueueName());
|
|
|
countMap.put(mqQueueConfig.getQueueName(),list.size());
|
|
|
- LinkedBlockingQueue<DelEcsVo> openList = openMap.get(mqQueueConfig.getQueueName());
|
|
|
- if(openList == null){
|
|
|
- openList = new LinkedBlockingQueue<>();
|
|
|
- openMap.put(mqQueueConfig.getQueueName(),openList);
|
|
|
- }
|
|
|
+ LinkedBlockingQueue<DelEcsVo> openList = openMap.computeIfAbsent(
|
|
|
+ mqQueueConfig.getQueueName(),
|
|
|
+ k -> new LinkedBlockingQueue<>()
|
|
|
+ );
|
|
|
|
|
|
Integer ecsCount = countMap.get(mqQueueConfig.getQueueName());
|
|
|
MqMsg mqMsg = mqMsgMap.get(mqQueueConfig.getQueueName());
|
|
@@ -215,22 +214,23 @@ public class TaskService {
|
|
|
public void openEcsList() {
|
|
|
try {
|
|
|
HashMap<String, MqQueueConfig> queueMap = queueConfigService.getQueueMap();
|
|
|
- for (String key : openMap.keySet()) {
|
|
|
- LinkedBlockingQueue<DelEcsVo> openList = openMap.get(key);
|
|
|
+ for (Map.Entry<String, LinkedBlockingQueue<DelEcsVo>> entry : openMap.entrySet()) {
|
|
|
+ String key = entry.getKey();
|
|
|
+ LinkedBlockingQueue<DelEcsVo> openList = entry.getValue();
|
|
|
if(openList.isEmpty()){
|
|
|
return;
|
|
|
}
|
|
|
DelEcsVo take = openList.poll();
|
|
|
MqQueueConfig mqQueueConfig = queueMap.get(key);
|
|
|
List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
|
|
|
- countMap.put(mqQueueConfig.getQueueName(),countMap.get(mqQueueConfig.getQueueName()) +1);
|
|
|
+ countMap.merge(mqQueueConfig.getQueueName(), 1, Integer::sum);
|
|
|
|
|
|
log.info("openEcsList--开启弹性伸缩数量:{},{}",key,openList.size());
|
|
|
Boolean flag = createEcs( take.getMqScalingConfig());
|
|
|
if(flag){
|
|
|
mqEcsService.add(take.getQueueName());
|
|
|
sendRabbitMq(msgList,1,mqQueueConfig.getQueueName());
|
|
|
- Thread.sleep(1000L * 5);
|
|
|
+ Thread.sleep(5000L);
|
|
|
}else {
|
|
|
openList.offer(take);
|
|
|
}
|
|
@@ -256,9 +256,9 @@ public class TaskService {
|
|
|
if(mqQueueConfig == null || mqQueueConfig.getScalingConfigId() == null){
|
|
|
continue;
|
|
|
}
|
|
|
- Long between = DateUtil.between(mqEcs.getCreateTime(), new Date(), DateUnit.MINUTE);
|
|
|
+ long between = DateUtil.between(mqEcs.getCreateTime(), new Date(), DateUnit.MINUTE);
|
|
|
//弹性伸缩按照一个小时计费
|
|
|
- Long count = between/60;
|
|
|
+ long count = between/60;
|
|
|
boolean flag = delList.stream().anyMatch(e -> e.getMqEcs().getEcsName().equals(mqEcs.getEcsName()));
|
|
|
if(between >= mqQueueConfig.getStopScalingTime() + 60 * count && !flag){
|
|
|
log.info("checkDelEcs-实例开启时间大于{}分钟,开始关闭:{}",mqQueueConfig.getStopScalingTime(),mqEcs.getEcsName());
|