Explorar o código

mq控制,弹性伸缩控制

lyhzzz hai 1 ano
pai
achega
05b70124ca
Modificáronse 35 ficheiros con 1601 adicións e 7 borrados
  1. 100 0
      pom.xml
  2. 0 7
      src/main/java/com/fdkankan/mqcontroller/Main.java
  3. 23 0
      src/main/java/com/fdkankan/mqcontroller/MqControllerApplication.java
  4. 26 0
      src/main/java/com/fdkankan/mqcontroller/config/ThreadPoolConfig.java
  5. 17 0
      src/main/java/com/fdkankan/mqcontroller/entity/DelEcsVo.java
  6. 58 0
      src/main/java/com/fdkankan/mqcontroller/entity/MqEcs.java
  7. 15 0
      src/main/java/com/fdkankan/mqcontroller/entity/MqMsg.java
  8. 79 0
      src/main/java/com/fdkankan/mqcontroller/entity/MqQueueConfig.java
  9. 63 0
      src/main/java/com/fdkankan/mqcontroller/entity/MqScalingConfig.java
  10. 61 0
      src/main/java/com/fdkankan/mqcontroller/entity/MqSendLog.java
  11. 98 0
      src/main/java/com/fdkankan/mqcontroller/generate/AutoGenerate.java
  12. 18 0
      src/main/java/com/fdkankan/mqcontroller/mapper/IMqEcsMapper.java
  13. 18 0
      src/main/java/com/fdkankan/mqcontroller/mapper/IMqQueueConfigMapper.java
  14. 18 0
      src/main/java/com/fdkankan/mqcontroller/mapper/IMqScalingConfigMapper.java
  15. 18 0
      src/main/java/com/fdkankan/mqcontroller/mapper/IMqSendLogMapper.java
  16. 26 0
      src/main/java/com/fdkankan/mqcontroller/service/IMqEcsService.java
  17. 19 0
      src/main/java/com/fdkankan/mqcontroller/service/IMqQueueConfigService.java
  18. 19 0
      src/main/java/com/fdkankan/mqcontroller/service/IMqScalingConfigService.java
  19. 21 0
      src/main/java/com/fdkankan/mqcontroller/service/IMqSendLogService.java
  20. 65 0
      src/main/java/com/fdkankan/mqcontroller/service/impl/MqEcsServiceImpl.java
  21. 30 0
      src/main/java/com/fdkankan/mqcontroller/service/impl/MqQueueConfigServiceImpl.java
  22. 28 0
      src/main/java/com/fdkankan/mqcontroller/service/impl/MqScalingConfigServiceImpl.java
  23. 39 0
      src/main/java/com/fdkankan/mqcontroller/service/impl/MqSendLogServiceImpl.java
  24. 70 0
      src/main/java/com/fdkankan/mqcontroller/task/TaskController.java
  25. 243 0
      src/main/java/com/fdkankan/mqcontroller/task/TaskService.java
  26. 72 0
      src/main/java/com/fdkankan/mqcontroller/utils/ECSUtils.java
  27. 40 0
      src/main/java/com/fdkankan/mqcontroller/utils/RabbitMqUtils.java
  28. 7 0
      src/main/java/com/fdkankan/mqcontroller/utils/RedisKey.java
  29. 27 0
      src/main/resources/bootstrap-test.yml
  30. 8 0
      src/main/resources/bootstrap.yml
  31. 255 0
      src/main/resources/logback-spring.xml
  32. 5 0
      src/main/resources/mapper/mqcontroller/MqEcsMapper.xml
  33. 5 0
      src/main/resources/mapper/mqcontroller/MqQueueConfigMapper.xml
  34. 5 0
      src/main/resources/mapper/mqcontroller/MqScalingConfigMapper.xml
  35. 5 0
      src/main/resources/mapper/mqcontroller/MqSendLogMapper.xml

+ 100 - 0
pom.xml

@@ -7,6 +7,14 @@
     <groupId>com.fdkankan</groupId>
     <artifactId>fdkankan-mq-controller</artifactId>
     <version>1.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <parent>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-starter-parent</artifactId>
+        <version>2.3.12.RELEASE</version>
+        <relativePath/>
+    </parent>
 
     <properties>
         <maven.compiler.source>8</maven.compiler.source>
@@ -14,4 +22,96 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     </properties>
 
+    <dependencies>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-aop</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fdkankan</groupId>
+            <artifactId>4dkankan-utils-db</artifactId>
+            <version>3.0.0-SNAPSHOT</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fdkankan</groupId>
+            <artifactId>4dkankan-utils-rabbitmq</artifactId>
+            <version>3.0.0-SNAPSHOT</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fdkankan</groupId>
+            <artifactId>4dkankan-utils-redis</artifactId>
+            <version>3.0.0-SNAPSHOT</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
+            <version>2.2.7.RELEASE</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba.cloud</groupId>
+            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
+            <version>2.2.7.RELEASE</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.aliyun</groupId>
+            <artifactId>ess20220222</artifactId>
+            <version>1.0.5</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.83</version>
+        </dependency>
+    </dependencies>
+
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <testFailureIgnore>true</testFailureIgnore>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>8</source>
+                    <target>8</target>
+                </configuration>
+            </plugin>
+        </plugins>
+
+    </build>
 </project>

+ 0 - 7
src/main/java/com/fdkankan/mqcontroller/Main.java

@@ -1,7 +0,0 @@
-package com.fdkankan.mqcontroller;
-
-public class Main {
-    public static void main(String[] args) {
-        System.out.println("Hello world!");
-    }
-}

+ 23 - 0
src/main/java/com/fdkankan/mqcontroller/MqControllerApplication.java

@@ -0,0 +1,23 @@
+package com.fdkankan.mqcontroller;
+
+
+import org.mybatis.spring.annotation.MapperScan;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.transaction.annotation.EnableTransactionManagement;
+
+@SpringBootApplication
+@EnableTransactionManagement//开启事务
+@EnableDiscoveryClient
+@EnableScheduling
+@ComponentScan(basePackages = {"com.fdkankan.*"})
+@MapperScan("com.fdkankan.**.mapper")
+public class MqControllerApplication {
+    public static void main(String[] args) {
+        SpringApplication.run(MqControllerApplication.class, args);
+    }
+}

+ 26 - 0
src/main/java/com/fdkankan/mqcontroller/config/ThreadPoolConfig.java

@@ -0,0 +1,26 @@
+package com.fdkankan.mqcontroller.config;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.Executor;
+
+@Configuration
+@EnableAsync
+public class ThreadPoolConfig {
+
+    @Bean("taskExecutor")
+    public Executor taskExecutor() {
+        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
+        taskExecutor.setCorePoolSize(10);
+        taskExecutor.setMaxPoolSize(50);
+        taskExecutor.setQueueCapacity(200);
+        taskExecutor.setKeepAliveSeconds(60);
+        taskExecutor.setThreadNamePrefix("Thread-");
+        taskExecutor.setAwaitTerminationSeconds(60);
+        return taskExecutor;
+    }
+}
+

+ 17 - 0
src/main/java/com/fdkankan/mqcontroller/entity/DelEcsVo.java

@@ -0,0 +1,17 @@
+package com.fdkankan.mqcontroller.entity;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Date;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class DelEcsVo {
+    private MqEcs mqEcs;
+    private MqScalingConfig mqScalingConfig;
+    private String queueName;
+    private Date putTime;
+}

+ 58 - 0
src/main/java/com/fdkankan/mqcontroller/entity/MqEcs.java

@@ -0,0 +1,58 @@
+package com.fdkankan.mqcontroller.entity;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import java.io.Serializable;
+import java.util.Date;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * <p>
+ * 
+ * </p>
+ *
+ * @author 
+ * @since 2024-04-02
+ */
+@Getter
+@Setter
+@TableName("mq_ecs")
+public class MqEcs implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @TableId(value = "id", type = IdType.AUTO)
+    private Integer id;
+
+    @TableField("ecs_name")
+    private String ecsName;
+
+    @TableField("queue_name")
+    private String queueName;
+
+    @TableField("create_time")
+    private Date createTime;
+
+    @TableField("update_time")
+    private Date updateTime;
+
+    @TableField("remark")
+    private String remark;
+
+    /**
+     * 0运行中未计算,1运行中计算中,2停止运行
+     */
+    @TableField("status")
+    private Integer status;
+
+    /**
+     * 是否是弹性伸缩0否1是
+     */
+    @TableField("is_scaling")
+    private Integer isScaling;
+
+
+}

+ 15 - 0
src/main/java/com/fdkankan/mqcontroller/entity/MqMsg.java

@@ -0,0 +1,15 @@
+package com.fdkankan.mqcontroller.entity;
+
+import lombok.Data;
+import lombok.ToString;
+
+@Data
+@ToString
+public class MqMsg {
+    private Integer messages_ready;
+    private Integer messages_unacknowledged;
+    private Integer messages;
+    private Integer consumers;
+    private String state;
+
+}

+ 79 - 0
src/main/java/com/fdkankan/mqcontroller/entity/MqQueueConfig.java

@@ -0,0 +1,79 @@
+package com.fdkankan.mqcontroller.entity;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableLogic;
+import com.baomidou.mybatisplus.annotation.TableName;
+import java.io.Serializable;
+import java.util.Date;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+/**
+ * <p>
+ * 
+ * </p>
+ *
+ * @author 
+ * @since 2024-04-02
+ */
+@Getter
+@Setter
+@ToString
+@TableName("mq_queue_config")
+public class MqQueueConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @TableId(value = "id", type = IdType.AUTO)
+    private Integer id;
+
+    /**
+     * mq队列名称
+     */
+    @TableField("queue_name")
+    private String queueName;
+
+    /**
+     * 是否开启弹性伸缩0否,1是
+     */
+    @TableField("open_scaling")
+    private Integer openScaling;
+
+    /**
+     * 开启弹性伸缩阈值
+     */
+    @TableField("scaling_threshold")
+    private Integer scalingThreshold;
+
+    /**
+     * 检测是否需要开启间隔时间,单位分钟
+     */
+    @TableField("open_scaling_time")
+    private Integer openScalingTime;
+    /**
+     * 检测关闭弹性伸缩时间间隔,单位分钟
+     */
+    @TableField("stop_scaling_time")
+    private Integer stopScalingTime;
+
+    @TableField("scaling_config_id")
+    private Integer scalingConfigId;
+
+    @TableField("remark")
+    private String remark;
+
+    @TableField("rec_status")
+    @TableLogic(value = "A",delval = "I")
+    private String recStatus;
+
+    @TableField("create_time")
+    private Date createTime;
+
+    @TableField("update_time")
+    private Date updateTime;
+
+
+}

+ 63 - 0
src/main/java/com/fdkankan/mqcontroller/entity/MqScalingConfig.java

@@ -0,0 +1,63 @@
+package com.fdkankan.mqcontroller.entity;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableLogic;
+import com.baomidou.mybatisplus.annotation.TableName;
+import java.io.Serializable;
+import java.util.Date;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * <p>
+ * 
+ * </p>
+ *
+ * @author 
+ * @since 2024-04-02
+ */
+@Getter
+@Setter
+@TableName("mq_scaling_config")
+public class MqScalingConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @TableId(value = "id", type = IdType.AUTO)
+    private Integer id;
+
+    /**
+     * 弹性伸缩Id
+     */
+    @TableField("scaling_group_id")
+    private String scalingGroupId;
+
+    /**
+     * 规则
+     */
+    @TableField("scaling_rule_ari")
+    private String scalingRuleAri;
+
+    @TableField("access_key")
+    private String accessKey;
+
+    @TableField("secret")
+    private String secret;
+
+    @TableField("rec_status")
+    @TableLogic(value = "A",delval = "I")
+    private String recStatus;
+
+    @TableField("create_time")
+    private Date createTime;
+
+    @TableField("update_time")
+    private Date updateTime;
+
+    @TableField("endpoint")
+    private String endpoint;
+
+
+}

+ 61 - 0
src/main/java/com/fdkankan/mqcontroller/entity/MqSendLog.java

@@ -0,0 +1,61 @@
+package com.fdkankan.mqcontroller.entity;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import java.io.Serializable;
+import java.util.Date;
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * <p>
+ * 
+ * </p>
+ *
+ * @author 
+ * @since 2024-04-02
+ */
+@Getter
+@Setter
+@TableName("mq_send_log")
+public class MqSendLog implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @TableId(value = "id", type = IdType.AUTO)
+    private Integer id;
+
+    /**
+     * 发送的队列
+     */
+    @TableField("queue")
+    private String queue;
+
+    /**
+     * 发送的mq消息
+     */
+    @TableField("content")
+    private String content;
+
+    @TableField("num")
+    private String num;
+
+    @TableField("es_name")
+    private String esName;
+
+    /**
+     * 0未发送,1已发送
+     */
+    @TableField("status")
+    private Integer status;
+
+    @TableField("create_time")
+    private Date createTime;
+
+    @TableField("update_time")
+    private Date updateTime;
+
+
+}

+ 98 - 0
src/main/java/com/fdkankan/mqcontroller/generate/AutoGenerate.java

@@ -0,0 +1,98 @@
+package com.fdkankan.mqcontroller.generate;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.baomidou.mybatisplus.generator.FastAutoGenerator;
+import com.baomidou.mybatisplus.generator.config.OutputFile;
+import com.baomidou.mybatisplus.generator.config.rules.DateType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class AutoGenerate {
+
+
+    public static void main(String[] args) {
+
+        String path =System.getProperty("user.dir");
+
+        generate(path,"mqcontroller", getTables(new String[]{
+                "mq_ecs"
+        }));
+
+//        generate(path,"goods", getTables(new String[]{
+//                        "t_camera","t_camera_detail","t_camera_out","t_camera_space","t_camera_version",
+//                        "t_company","t_goods","t_goods_sku","t_cart","t_goods_spec",
+//                        "t_goods_spec_value","t_goods_spu_spec","t_sn_code"
+//        }));
+//
+//        generate(path,"order", getTables(new String[]{
+//                        "t_increment_order","t_invoice","t_order","t_order_item",
+//                        "t_pre_sale","t_space_sdk","t_trade_log","t_commerce_order","t_download_order","t_expansion_order"
+//        }));
+//        generate(path,"order", getTables(new String[]{
+//                        "t_virtual_order"
+//        }));
+//
+//        generate(path,"user", getTables(new String[]{
+//                        "t_user","t_user_increment","t_manager","t_province","t_increment_type","t_intercom_message","t_receiver_info"
+//        }));
+    }
+
+    public static List<String> getTables(String [] tableNames){
+        return new ArrayList<>(Arrays.asList(tableNames));
+    }
+
+
+    public static void  generate(String path,String moduleName,  List<String> tables){
+        FastAutoGenerator.create("jdbc:mysql://120.24.144.164:3306/4dkankan_v4",
+                "root","4Dage@4Dage#@168")
+                .globalConfig(builder -> {
+                    builder.author("")               //作者
+                            .outputDir(path+"\\src\\main\\java")    //输出路径(写到java目录)
+                            //.enableSwagger()           //开启swagger
+                            .commentDate("yyyy-MM-dd")
+                            .dateType(DateType.ONLY_DATE)
+                            .fileOverride();            //开启覆盖之前生成的文件
+
+                })
+                .packageConfig(builder -> {
+                    builder.parent("com.fdkankan")
+                            .moduleName(moduleName)
+                            .entity("entity")
+                            .service("service")
+                            .serviceImpl("service.impl")
+                            .controller("controller")
+                            .mapper("mapper")
+                            .xml("test.mapper")
+                            .pathInfo(Collections.singletonMap(OutputFile.mapperXml,path+"\\src\\main\\resources\\mapper\\"+moduleName));
+                })
+                .strategyConfig(builder -> {
+                    builder.addInclude(tables)
+                            .addTablePrefix("t_")
+
+                            .serviceBuilder()
+                            .formatServiceFileName("I%sService")
+                            .formatServiceImplFileName("%sServiceImpl")
+
+                            .entityBuilder()
+                            .enableLombok()
+                            .logicDeleteColumnName("rec_status")
+                            .enableTableFieldAnnotation()
+                            //.superClass(BaseEntity.class)
+
+                            .controllerBuilder()
+                            .formatFileName("%sController")
+                            .enableRestStyle()
+
+                            .mapperBuilder()
+                            .superClass(BaseMapper.class)
+                            .formatMapperFileName("I%sMapper")
+                            .enableMapperAnnotation()
+                            .formatXmlFileName("%sMapper");
+                })
+                // .templateEngine(new FreemarkerTemplateEngine()) // 使用Freemarker引擎模板,默认的是Velocity引擎模板
+                .execute();
+    }
+}

+ 18 - 0
src/main/java/com/fdkankan/mqcontroller/mapper/IMqEcsMapper.java

@@ -0,0 +1,18 @@
+package com.fdkankan.mqcontroller.mapper;
+
+import com.fdkankan.mqcontroller.entity.MqEcs;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.apache.ibatis.annotations.Mapper;
+
+/**
+ * <p>
+ *  Mapper 接口
+ * </p>
+ *
+ * @author 
+ * @since 2024-04-02
+ */
+@Mapper
+public interface IMqEcsMapper extends BaseMapper<MqEcs> {
+
+}

+ 18 - 0
src/main/java/com/fdkankan/mqcontroller/mapper/IMqQueueConfigMapper.java

@@ -0,0 +1,18 @@
+package com.fdkankan.mqcontroller.mapper;
+
+import com.fdkankan.mqcontroller.entity.MqQueueConfig;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.apache.ibatis.annotations.Mapper;
+
+/**
+ * <p>
+ *  Mapper 接口
+ * </p>
+ *
+ * @author 
+ * @since 2024-04-02
+ */
+@Mapper
+public interface IMqQueueConfigMapper extends BaseMapper<MqQueueConfig> {
+
+}

+ 18 - 0
src/main/java/com/fdkankan/mqcontroller/mapper/IMqScalingConfigMapper.java

@@ -0,0 +1,18 @@
+package com.fdkankan.mqcontroller.mapper;
+
+import com.fdkankan.mqcontroller.entity.MqScalingConfig;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.apache.ibatis.annotations.Mapper;
+
+/**
+ * <p>
+ *  Mapper 接口
+ * </p>
+ *
+ * @author 
+ * @since 2024-04-02
+ */
+@Mapper
+public interface IMqScalingConfigMapper extends BaseMapper<MqScalingConfig> {
+
+}

+ 18 - 0
src/main/java/com/fdkankan/mqcontroller/mapper/IMqSendLogMapper.java

@@ -0,0 +1,18 @@
+package com.fdkankan.mqcontroller.mapper;
+
+import com.fdkankan.mqcontroller.entity.MqSendLog;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.apache.ibatis.annotations.Mapper;
+
+/**
+ * <p>
+ *  Mapper 接口
+ * </p>
+ *
+ * @author 
+ * @since 2024-04-02
+ */
+@Mapper
+public interface IMqSendLogMapper extends BaseMapper<MqSendLog> {
+
+}

+ 26 - 0
src/main/java/com/fdkankan/mqcontroller/service/IMqEcsService.java

@@ -0,0 +1,26 @@
+package com.fdkankan.mqcontroller.service;
+
+import com.fdkankan.mqcontroller.entity.MqEcs;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+import java.util.List;
+
+/**
+ * <p>
+ *  服务类
+ * </p>
+ *
+ * @author 
+ * @since 2024-04-02
+ */
+public interface IMqEcsService extends IService<MqEcs> {
+
+    void add(String queueName);
+
+    List<MqEcs> getScalingNotStopList();
+
+    List<MqEcs> getNoModelingByQueueName(String queueName);
+
+    void updateMqEcs(MqEcs mqEcs);
+
+}

+ 19 - 0
src/main/java/com/fdkankan/mqcontroller/service/IMqQueueConfigService.java

@@ -0,0 +1,19 @@
+package com.fdkankan.mqcontroller.service;
+
+import com.fdkankan.mqcontroller.entity.MqQueueConfig;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+import java.util.HashMap;
+
+/**
+ * <p>
+ *  服务类
+ * </p>
+ *
+ * @author 
+ * @since 2024-04-02
+ */
+public interface IMqQueueConfigService extends IService<MqQueueConfig> {
+
+    HashMap<String, MqQueueConfig> getQueueMap();
+}

+ 19 - 0
src/main/java/com/fdkankan/mqcontroller/service/IMqScalingConfigService.java

@@ -0,0 +1,19 @@
+package com.fdkankan.mqcontroller.service;
+
+import com.fdkankan.mqcontroller.entity.MqScalingConfig;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+import java.util.HashMap;
+
+/**
+ * <p>
+ *  服务类
+ * </p>
+ *
+ * @author 
+ * @since 2024-04-02
+ */
+public interface IMqScalingConfigService extends IService<MqScalingConfig> {
+
+    HashMap<Integer, MqScalingConfig> getIdMap();
+}

+ 21 - 0
src/main/java/com/fdkankan/mqcontroller/service/IMqSendLogService.java

@@ -0,0 +1,21 @@
+package com.fdkankan.mqcontroller.service;
+
+import com.fdkankan.mqcontroller.entity.MqSendLog;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+import java.util.List;
+
+/**
+ * <p>
+ *  服务类
+ * </p>
+ *
+ * @author 
+ * @since 2024-04-02
+ */
+public interface IMqSendLogService extends IService<MqSendLog> {
+
+    List<MqSendLog> getNoSendMsg();
+
+    List<MqSendLog> getNoSendMsgByQueueName(String key);
+}

+ 65 - 0
src/main/java/com/fdkankan/mqcontroller/service/impl/MqEcsServiceImpl.java

@@ -0,0 +1,65 @@
+package com.fdkankan.mqcontroller.service.impl;
+
+import cn.hutool.core.date.DateUtil;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import com.fdkankan.mqcontroller.entity.MqEcs;
+import com.fdkankan.mqcontroller.mapper.IMqEcsMapper;
+import com.fdkankan.mqcontroller.service.IMqEcsService;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.fdkankan.mqcontroller.utils.RedisKey;
+import com.fdkankan.redis.util.RedisUtil;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import rx.internal.util.unsafe.MessagePassingQueue;
+
+import java.util.List;
+
+/**
+ * <p>
+ *  服务实现类
+ * </p>
+ *
+ * @author 
+ * @since 2024-04-02
+ */
+@Service
+public class MqEcsServiceImpl extends ServiceImpl<IMqEcsMapper, MqEcs> implements IMqEcsService {
+
+
+    @Override
+    public void add(String queueName) {
+        MqEcs mqEcs = new MqEcs();
+        mqEcs.setQueueName(queueName);
+        this.save(mqEcs);
+    }
+
+    @Override
+    public List<MqEcs> getScalingNotStopList() {
+        LambdaQueryWrapper<MqEcs> wrapper = new LambdaQueryWrapper<>();
+        wrapper.eq(MqEcs::getIsScaling,1);
+        wrapper.eq(MqEcs::getStatus,0);
+        return this.list(wrapper);
+    }
+
+    @Override
+    public List<MqEcs> getNoModelingByQueueName(String queueName) {
+        LambdaQueryWrapper<MqEcs> wrapper = new LambdaQueryWrapper<>();
+        wrapper.eq(MqEcs::getIsScaling,1);
+        wrapper.eq(MqEcs::getStatus,0);
+        wrapper.isNull(MqEcs::getEcsName);
+        wrapper.eq(MqEcs::getQueueName,queueName);
+        return this.list(wrapper);
+    }
+
+
+    @Override
+    public void updateMqEcs(MqEcs mqEcs) {
+        LambdaUpdateWrapper<MqEcs> wrapper = new LambdaUpdateWrapper<>();
+        wrapper.eq(MqEcs::getId,mqEcs.getId());
+        wrapper.set(MqEcs::getStatus,2);
+        this.update(wrapper);
+    }
+
+
+}

+ 30 - 0
src/main/java/com/fdkankan/mqcontroller/service/impl/MqQueueConfigServiceImpl.java

@@ -0,0 +1,30 @@
+package com.fdkankan.mqcontroller.service.impl;
+
+import com.fdkankan.mqcontroller.entity.MqQueueConfig;
+import com.fdkankan.mqcontroller.mapper.IMqQueueConfigMapper;
+import com.fdkankan.mqcontroller.service.IMqQueueConfigService;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import org.springframework.stereotype.Service;
+
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * <p>
+ *  服务实现类
+ * </p>
+ *
+ * @author 
+ * @since 2024-04-02
+ */
+@Service
+public class MqQueueConfigServiceImpl extends ServiceImpl<IMqQueueConfigMapper, MqQueueConfig> implements IMqQueueConfigService {
+
+    @Override
+    public HashMap<String, MqQueueConfig> getQueueMap() {
+        HashMap<String, MqQueueConfig> map = new HashMap<>();
+        List<MqQueueConfig> queueConfigList = this.list();
+        queueConfigList.forEach(e -> map.put(e.getQueueName(),e));
+        return map;
+    }
+}

+ 28 - 0
src/main/java/com/fdkankan/mqcontroller/service/impl/MqScalingConfigServiceImpl.java

@@ -0,0 +1,28 @@
+package com.fdkankan.mqcontroller.service.impl;
+
+import com.fdkankan.mqcontroller.entity.MqScalingConfig;
+import com.fdkankan.mqcontroller.mapper.IMqScalingConfigMapper;
+import com.fdkankan.mqcontroller.service.IMqScalingConfigService;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import org.springframework.stereotype.Service;
+
+import java.util.HashMap;
+
+/**
+ * <p>
+ *  服务实现类
+ * </p>
+ *
+ * @author 
+ * @since 2024-04-02
+ */
+@Service
+public class MqScalingConfigServiceImpl extends ServiceImpl<IMqScalingConfigMapper, MqScalingConfig> implements IMqScalingConfigService {
+
+    @Override
+    public HashMap<Integer, MqScalingConfig> getIdMap() {
+        HashMap<Integer, MqScalingConfig> map = new HashMap<>();
+        this.list().forEach(e ->map.put(e.getId(),e));
+        return map;
+    }
+}

+ 39 - 0
src/main/java/com/fdkankan/mqcontroller/service/impl/MqSendLogServiceImpl.java

@@ -0,0 +1,39 @@
+package com.fdkankan.mqcontroller.service.impl;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.fdkankan.mqcontroller.entity.MqSendLog;
+import com.fdkankan.mqcontroller.mapper.IMqSendLogMapper;
+import com.fdkankan.mqcontroller.service.IMqSendLogService;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * <p>
+ *  服务实现类
+ * </p>
+ *
+ * @author 
+ * @since 2024-04-02
+ */
+@Service
+public class MqSendLogServiceImpl extends ServiceImpl<IMqSendLogMapper, MqSendLog> implements IMqSendLogService {
+
+    @Override
+    public List<MqSendLog> getNoSendMsg() {
+        LambdaQueryWrapper<MqSendLog> wrapper = new LambdaQueryWrapper<>();
+        wrapper.eq(MqSendLog::getStatus,0);
+        wrapper.orderByAsc(MqSendLog::getId);
+        return this.list(wrapper);
+    }
+
+    @Override
+    public List<MqSendLog> getNoSendMsgByQueueName(String key) {
+        LambdaQueryWrapper<MqSendLog> wrapper = new LambdaQueryWrapper<>();
+        wrapper.eq(MqSendLog::getStatus,0);
+        wrapper.eq(MqSendLog::getQueue,key);
+        wrapper.orderByAsc(MqSendLog::getId);
+        return this.list(wrapper);
+    }
+}

+ 70 - 0
src/main/java/com/fdkankan/mqcontroller/task/TaskController.java

@@ -0,0 +1,70 @@
+package com.fdkankan.mqcontroller.task;
+
+import cn.hutool.log.Log;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+
+@Service
+@Slf4j
+public class TaskController {
+
+
+    @Autowired
+    TaskService taskService;
+
+    /**
+     * 每1分钟执行一次,将数据库中待计算的mq消息推送,rabbitmq
+     */
+    @Scheduled(initialDelay = 2000,fixedDelay = 1000 * 60 )
+    public void taskSendMq(){
+        try {
+            taskService.sendMq();
+        }catch (Exception e){
+            log.info("taskSendMq执行失败:",e);
+        }
+    }
+
+
+    /**
+     * 每2分钟执行一次,判断是否关闭弹性伸缩
+     */
+    @Scheduled(initialDelay = 3000,fixedDelay = 1000 * 60 * 2)
+    public void checkDelEcs(){
+        try {
+            taskService.checkDelEcs();
+        }catch (Exception e){
+            log.info("checkDelEcs执行失败:",e);
+        }
+    }
+
+
+    /**
+     * 每10秒执行一次,开启缓存弹性伸缩
+     */
+    @Scheduled(initialDelay = 1000,fixedDelay = 1000 * 10 )
+    public void openEcsList(){
+        try {
+            taskService.openEcsList();
+        }catch (Exception e){
+            log.info("openEcsList执行失败:",e);
+        }
+    }
+
+    /**
+     * 每10秒执行一次,关闭缓存弹性伸缩
+     */
+    @Scheduled(initialDelay = 1000,fixedDelay = 1000 * 10 )
+    public void delEcsList(){
+        try {
+            taskService.delEcsList();
+        }catch (Exception e){
+            log.info("delEcsList执行失败:",e);
+        }
+    }
+
+
+
+
+}

+ 243 - 0
src/main/java/com/fdkankan/mqcontroller/task/TaskService.java

@@ -0,0 +1,243 @@
+package com.fdkankan.mqcontroller.task;
+
+import cn.hutool.core.date.DateUnit;
+import cn.hutool.core.date.DateUtil;
+import cn.hutool.log.Log;
+import com.fdkankan.mqcontroller.entity.*;
+import com.fdkankan.mqcontroller.service.IMqEcsService;
+import com.fdkankan.mqcontroller.service.IMqQueueConfigService;
+import com.fdkankan.mqcontroller.service.IMqScalingConfigService;
+import com.fdkankan.mqcontroller.service.IMqSendLogService;
+import com.fdkankan.mqcontroller.utils.ECSUtils;
+import com.fdkankan.mqcontroller.utils.RabbitMqUtils;
+import com.fdkankan.mqcontroller.utils.RedisKey;
+import com.fdkankan.rabbitmq.util.RabbitMqProducer;
+import com.fdkankan.redis.util.RedisUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.cloud.context.config.annotation.RefreshScope;
+import org.springframework.stereotype.Service;
+
+import java.sql.SQLOutput;
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
+
+@Service
+@RefreshScope
+@Slf4j
+public class TaskService {
+
+    @Autowired
+    RabbitMqProducer rabbitMqProducer;
+    @Autowired
+    IMqSendLogService mqSendLogService;
+    @Autowired
+    IMqQueueConfigService queueConfigService;
+    @Autowired
+    IMqScalingConfigService mqScalingConfigService;
+    @Autowired
+    IMqEcsService mqEcsService;
+    @Autowired
+    RedisUtil redisUtil;
+
+    public static Integer checkOpenCount = 0;
+
+    private static final LinkedBlockingQueue<DelEcsVo> delList = new LinkedBlockingQueue<>();
+    private static final HashMap<String,LinkedBlockingQueue<DelEcsVo>> openMap = new HashMap<>();
+
+
+    public void sendMq() {
+        checkCount();
+        List<MqSendLog> mqSendLogs = mqSendLogService.getNoSendMsg();
+        log.info("未分配的mq队列数:{}",mqSendLogs.size());
+        List<MqQueueConfig> queueConfigList = queueConfigService.list();
+        for (MqQueueConfig mqQueueConfig : queueConfigList) {
+            List<MqSendLog> msgList = mqSendLogs.stream().filter(e -> e.getQueue().equals(mqQueueConfig.getQueueName())).collect(Collectors.toList());
+            if(msgList.isEmpty()){
+                continue;
+            }
+            MqMsg mqMsg = getRabbitMqMsg(mqQueueConfig.getQueueName());
+            if(mqMsg == null){
+                log.info("获取mq队列数据失败:{}",mqQueueConfig);
+                continue;
+            }
+            if(mqQueueConfig.getOpenScaling() == 0){    //不开启弹性伸缩
+                if(mqMsg.getMessages_ready() >0){       //待计算队列中有任务
+                    continue;
+                }
+            }
+            if(mqQueueConfig.getOpenScaling() == 1){    //开启弹性伸缩
+                checkOpenEcs(mqQueueConfig,msgList.size(),mqMsg.getMessages_ready());
+            }
+            sendRabbitMq(msgList,mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - mqMsg.getMessages_ready());
+        }
+
+    }
+
+    private void checkCount() {
+        if(checkOpenCount > 10000){     //一个W为一个循环
+            checkOpenCount = 0;
+        }
+        checkOpenCount ++;
+    }
+
+    private void sendRabbitMq(List<MqSendLog> msgList, Integer msgCount) {
+        for (int i = 0;i < msgCount ;i++){
+            if(i > msgList.size() -1){
+                continue;
+            }
+            MqSendLog mqSendLog = msgList.get(i);
+            mqSendLog.setStatus(1);
+            mqSendLog.setUpdateTime(null);
+            mqSendLogService.updateById(mqSendLog);
+            rabbitMqProducer.sendByWorkQueue(mqSendLog.getQueue(),mqSendLog.getContent());
+        }
+    }
+
+
+    private void checkOpenEcs(MqQueueConfig mqQueueConfig,Integer msgCount,Integer readCount) {
+        List<MqEcs> list = mqEcsService.getNoModelingByQueueName(mqQueueConfig.getQueueName());
+        LinkedBlockingQueue<DelEcsVo> openList = openMap.get(mqQueueConfig.getQueueName());
+        if(openList == null){
+            openList = new LinkedBlockingQueue<>();
+            openMap.put(mqQueueConfig.getQueueName(),openList);
+        }
+        if(msgCount + readCount > mqQueueConfig.getScalingThreshold() + list.size() + openList.size() && mqQueueConfig.getOpenScalingTime() % checkOpenCount == 0){
+            MqScalingConfig mqScalingConfig = mqScalingConfigService.getById(mqQueueConfig.getScalingConfigId());
+            DelEcsVo vo = new DelEcsVo(null,mqScalingConfig,mqQueueConfig.getQueueName(),new Date());
+            openList.offer(vo);
+        }
+    }
+
+    public void openEcsList() {
+        try {
+            HashMap<String, MqQueueConfig> queueMap = queueConfigService.getQueueMap();
+            for (String key : openMap.keySet()) {
+                LinkedBlockingQueue<DelEcsVo> openList = openMap.get(key);
+                if(openList.isEmpty()){
+                    return;
+                }
+                DelEcsVo take = openList.poll();
+                List<MqSendLog> msgList = mqSendLogService.getNoSendMsgByQueueName(key);
+                MqQueueConfig mqQueueConfig = queueMap.get(key);
+                if(msgList.size() <= mqQueueConfig.getScalingThreshold()){
+                    log.info("openEcsList--待计算任务为:{}未超过阈值:{}无需开启弹性伸缩:{}",msgList.size(),mqQueueConfig.getScalingThreshold() ,key);
+                    return;
+                }
+                log.info("openEcsList--开启弹性伸缩数量:{},{}",key,openList.size());
+                Boolean flag = createEcs( take.getMqScalingConfig());
+                if(flag){
+                    mqEcsService.add(take.getQueueName());
+                    Thread.sleep(1000L * 5);
+                }else {
+                    openList.offer(take);
+                }
+            }
+        }catch (Exception e){
+            log.info("openEcsList--开启弹性伸缩失败:",e);
+        }
+    }
+
+    public void checkDelEcs() {
+        List<MqEcs> mqEcsList = mqEcsService.getScalingNotStopList();
+        log.info("启动中的弹性伸缩数量为:{}",mqEcsList.size());
+        if(mqEcsList.isEmpty()){
+            return;
+        }
+        HashMap<String,MqQueueConfig> queueMap = queueConfigService.getQueueMap();
+        HashMap<Integer,MqScalingConfig> scalingMap = mqScalingConfigService.getIdMap();
+        for (MqEcs mqEcs : mqEcsList) {
+            if(StringUtils.isBlank(mqEcs.getEcsName()) || StringUtils.isBlank(mqEcs.getQueueName())){
+                continue;
+            }
+            MqQueueConfig mqQueueConfig = queueMap.get(mqEcs.getQueueName());
+            if(mqQueueConfig == null || mqQueueConfig.getScalingConfigId() == null){
+                continue;
+            }
+            Long between = DateUtil.between(mqEcs.getCreateTime(), new Date(), DateUnit.MINUTE);
+            //弹性伸缩按照一个小时计费
+            Long count  = between/60;
+            boolean flag = delList.stream().anyMatch(e -> e.getMqEcs().getEcsName().equals(mqEcs.getEcsName()));
+            if(between >= mqQueueConfig.getStopScalingTime() + 60 * count && !flag){
+                log.info("checkDelEcs-实例开启时间大于{}分钟,开始关闭:{}",mqQueueConfig.getStopScalingTime(),mqEcs.getEcsName());
+                DelEcsVo vo = new DelEcsVo(mqEcs,scalingMap.get(mqQueueConfig.getScalingConfigId()),null,new Date());
+                delList.offer(vo);
+            }
+        }
+    }
+
+
+    public void delEcsList() {
+        try {
+            log.info("delEcsList--关闭弹性伸缩数量:{}",delList.size());
+            if(delList.isEmpty()){
+                return;
+            }
+            DelEcsVo take = delList.poll();
+            if(take == null){
+                return;
+            }
+            String stopKey = String.format(RedisKey.ecsStopKey,take.getMqEcs().getEcsName());
+            redisUtil.set(stopKey,take.getMqEcs().getEcsName(),60 * 60 * 24); //设置计算暂停锁
+
+            String modelingKey = String.format(RedisKey.modelingKey,take.getMqEcs().getEcsName());
+            if(redisUtil.hasKey(modelingKey)){
+                redisUtil.del(stopKey);
+                return;
+            }
+
+            Boolean delFlag = delEcs(take.getMqScalingConfig(), take.getMqEcs().getEcsName());
+            if(delFlag){
+                log.info("checkDelEcs--关闭弹性伸缩实例成功:{}", take.getMqEcs().getEcsName());
+                mqEcsService.updateMqEcs(take.getMqEcs());
+            }
+        }catch (Exception e){
+            log.info("delEcsList--关闭弹性伸缩失败:",e);
+        }
+    }
+
+
+
+    @Value("${spring.rabbitmq.host}")
+    public String host;
+    @Value("${spring.rabbitmq.username}")
+    public String username;
+    @Value("${spring.rabbitmq.password}")
+    public String password;
+    @Value("${spring.rabbitmq.virtual-host}")
+    public String virtualHost;
+    @Value("${spring.rabbitmq.mgmt-url}")
+    public String mgmtUrl;
+    @Value("${spring.rabbitmq.mgmt-host}")
+    public String mgmtHost;
+
+    public  MqMsg getRabbitMqMsg(String queueName) {
+        return RabbitMqUtils.getRabbitMqMsg(mgmtUrl+host+":"+mgmtHost,virtualHost,username,password,queueName);
+    }
+
+
+    public synchronized Boolean createEcs(MqScalingConfig mqScaling){
+        try {
+            return ECSUtils.createEcs(mqScaling.getAccessKey(),mqScaling.getSecret(),mqScaling.getEndpoint(),mqScaling.getScalingRuleAri());
+        }catch (Exception e){
+            log.info("触发弹性伸缩失败:",e);
+        }
+        return false;
+    }
+
+
+    public synchronized Boolean delEcs(MqScalingConfig mqScaling, String instanceId){
+        try {
+            return ECSUtils.delEcs(mqScaling.getAccessKey(),mqScaling.getSecret(),mqScaling.getEndpoint(),mqScaling.getScalingGroupId(),instanceId);
+        }catch (Exception e){
+            log.info("关闭弹性伸缩失败:",e);
+        }
+        return false;
+    }
+
+
+
+}

+ 72 - 0
src/main/java/com/fdkankan/mqcontroller/utils/ECSUtils.java

@@ -0,0 +1,72 @@
+package com.fdkankan.mqcontroller.utils;
+
+
+import com.alibaba.fastjson.JSONObject;
+import com.aliyun.ess20220222.Client;
+import com.aliyun.ess20220222.models.ExecuteScalingRuleRequest;
+import com.aliyun.ess20220222.models.ExecuteScalingRuleResponse;
+import com.aliyun.ess20220222.models.RemoveInstancesRequest;
+import com.aliyun.ess20220222.models.RemoveInstancesResponse;
+import com.aliyun.teaopenapi.models.Config;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Arrays;
+import java.util.UUID;
+
+
+@Slf4j
+public class ECSUtils {
+
+    public static Client initClient(String accessKey,String accessKeySecret,String endPoint) throws Exception {
+        Config config = new Config();
+        config.setAccessKeyId(accessKey);
+        config.setAccessKeySecret(accessKeySecret);
+        config.setEndpoint(endPoint);
+        return new Client(config);
+    }
+
+    public static Boolean executeScalingActivity(Client client, String scalingRuleAri) throws Exception{
+        ExecuteScalingRuleRequest request = new ExecuteScalingRuleRequest();
+        request.setScalingRuleAri(scalingRuleAri);
+        request.setClientToken(UUID.randomUUID().toString()); // 使用UUID生成唯一标识,避免并发冲突
+        ExecuteScalingRuleResponse response = client.executeScalingRule(request);
+        log.info("executeScalingActivity-resp:{}",JSONObject.toJSONString(response));
+        if(response.getStatusCode() == 200){
+            return true;
+        }
+        return false;
+    }
+    public static Boolean executeScalingStop(Client client, String groupId,String instanceId)  throws Exception{
+        RemoveInstancesRequest removeInstancesRequest = new RemoveInstancesRequest();
+        removeInstancesRequest.setScalingGroupId(groupId);
+        removeInstancesRequest.setInstanceIds(Arrays.asList(instanceId));
+        RemoveInstancesResponse  removeInstancesResponse = client.removeInstances(removeInstancesRequest);
+        log.info("executeScalingStop-resp:{}",JSONObject.toJSONString(removeInstancesResponse));
+        if(removeInstancesResponse.getStatusCode() == 200){
+            return true;
+        }
+        return false;
+    }
+
+    public static boolean createEcs(String accessKey,String secret,String endpoint,String scalingRuleAri){
+        try {
+            Client client = initClient(accessKey,secret,endpoint);
+            return executeScalingActivity(client, scalingRuleAri);
+        }catch (Exception e){
+            log.info("createEcs--error:",e);
+        }
+        return false;
+    }
+
+    public static Boolean delEcs(String accessKey,String secret,String endpoint,String scalingGroupId,String instanceId){
+        try {
+            Client client = initClient(accessKey,secret,endpoint);
+            return executeScalingStop(client, scalingGroupId,instanceId);
+        }catch (Exception e){
+            log.info("delEcs--error:",e);
+        }
+        return false;
+    }
+
+
+}

+ 40 - 0
src/main/java/com/fdkankan/mqcontroller/utils/RabbitMqUtils.java

@@ -0,0 +1,40 @@
+package com.fdkankan.mqcontroller.utils;
+
+
+import cn.hutool.core.codec.Base64;
+import cn.hutool.core.util.CharsetUtil;
+import cn.hutool.http.HttpRequest;
+import com.alibaba.fastjson.JSONObject;
+import com.fdkankan.mqcontroller.entity.MqMsg;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.catalina.Host;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+public class RabbitMqUtils {
+
+    /**
+     * 根据rabbitmq client 获取信息
+     */
+    public static MqMsg getRabbitMqMsg(String rabbitMgmtUrl,String vhost, String userName,String password,String queueName) {
+        String url = rabbitMgmtUrl + "/api/queues/" + vhost + "/" + queueName;
+        log.info("getRabbitMqMsg-url:{}",url);
+        // 添加请求头
+        // 设置认证信息
+        try {
+            HttpRequest request = HttpRequest.get(url)
+                    .header("authorization", "Basic " +
+                            Base64.encode((userName + ":" + password).getBytes(CharsetUtil.UTF_8)));
+            // 发送请求,并获取响应字符串
+            String response = request.execute().body();
+            log.info("getRabbitMqMsg-resp:{}",response);
+            JSONObject jsonObject = JSONObject.parseObject(response);
+            return JSONObject.toJavaObject(jsonObject,MqMsg.class);
+        }catch (Exception e){
+            log.info("获取mq信息失败:",e);
+        }
+        return null;
+    }
+
+}

+ 7 - 0
src/main/java/com/fdkankan/mqcontroller/utils/RedisKey.java

@@ -0,0 +1,7 @@
+package com.fdkankan.mqcontroller.utils;
+
+public class RedisKey {
+    public static String modelingKey = "4dkankan:modeling:scaling:hostname:%s";
+    public static String ecsStopKey = "4dkankan:modeling:scaling:hostname:stop:%s";
+    public static String ecsStopNumKey = "4dkankan:modeling:scaling:hostname:stopnum:%s";
+}

+ 27 - 0
src/main/resources/bootstrap-test.yml

@@ -0,0 +1,27 @@
+spring:
+  application:
+    name: 4dkankan-mq-controller
+  cloud:
+    nacos:
+      config:
+        server-addr: 172.18.156.39:8848
+        file-extension: yaml
+        namespace: 4dkankan-v4-test
+        extension-configs:
+          - data-id: 4dkankan-mq-controller.yaml
+            group: DEFAULT_GROUP
+            refresh: true
+        shared-configs:
+          - data-id: common-rabbitmq-config.yaml
+            group: DEFAULT_GROUP
+            refresh: true
+
+          - data-id: common-db-config.yaml
+            group: DEFAULT_GROUP
+            refresh: true
+      discovery:
+        server-addr: ${spring.cloud.nacos.config.server-addr}
+        namespace: ${spring.cloud.nacos.config.namespace}
+
+
+

+ 8 - 0
src/main/resources/bootstrap.yml

@@ -0,0 +1,8 @@
+spring:
+  profiles:
+    active: dev
+logging:
+  config: classpath:logback-spring.xml
+mybatis-plus:
+  configuration:
+    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl #开启sql日志

+ 255 - 0
src/main/resources/logback-spring.xml

@@ -0,0 +1,255 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- 日志级别从低到高分为TRACE < DEBUG < INFO < WARN < ERROR < FATAL,如果设置为WARN,则低于WARN的信息都不会输出 -->
+<!-- scan:当此属性设置为true时,配置文件如果发生改变,将会被重新加载,默认值为true -->
+<!-- scanPeriod:设置监测配置文件是否有修改的时间间隔,如果没有给出时间单位,默认单位是毫秒。当scan为true时,此属性生效。默认的时间间隔为1分钟。 -->
+<!-- debug:当此属性设置为true时,将打印出logback内部日志信息,实时查看logback运行状态。默认值为false。 -->
+<configuration scan="true" scanPeriod="10 seconds">
+	<springProperty scope="context" name="LOG_PATH" source="logging.path"/>
+
+	<contextName>logback</contextName>
+	<!-- name的值是变量的名称,value的值时变量定义的值。通过定义的值会被插入到logger上下文中。定义变量后,可以使“${}”来使用变量。 -->
+	<property name="log.path" value="${LOG_PATH}/mqcontroller/logs" />
+
+	<!-- 彩色日志 -->
+	<!-- 彩色日志依赖的渲染类 -->
+	<conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" />
+	<conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
+	<conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />
+	<!-- 彩色日志格式 -->
+	<property name="CONSOLE_LOG_PATTERN"
+			  value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}" />
+
+	<!--输出到控制台 -->
+	<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+		<!--此日志appender是为开发使用,只配置最底级别,控制台输出的日志级别是大于或等于此级别的日志信息 -->
+		<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+			<level>info</level>
+		</filter>
+		<encoder>
+			<Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
+			<!-- 设置字符集 -->
+			<charset>UTF-8</charset>
+		</encoder>
+	</appender>
+	<!--输出到文件 -->
+
+	<!-- 时间滚动输出 level为 DEBUG 日志 -->
+	<appender name="DEBUG_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+		<!-- 正在记录的日志文件的路径及文件名 -->
+		<file>${log.path}/log_debug.log</file>
+		<!--日志文件输出格式 -->
+		<encoder>
+			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
+			<charset>UTF-8</charset> <!-- 设置字符集 -->
+		</encoder>
+		<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
+		<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+			<!-- 日志归档 -->
+			<fileNamePattern>${log.path}/debug/log-debug-%d{yyyy-MM-dd}.%i.log
+			</fileNamePattern>
+			<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+				<maxFileSize>100MB</maxFileSize>
+			</timeBasedFileNamingAndTriggeringPolicy>
+			<!--日志文件保留天数 -->
+			<maxHistory>15</maxHistory>
+		</rollingPolicy>
+		<!-- 此日志文件只记录debug级别的 -->
+		<filter class="ch.qos.logback.classic.filter.LevelFilter">
+			<level>debug</level>
+			<onMatch>ACCEPT</onMatch>
+			<onMismatch>DENY</onMismatch>
+		</filter>
+	</appender>
+
+	<!-- 时间滚动输出 level为 INFO 日志 -->
+	<appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+		<!-- 正在记录的日志文件的路径及文件名 -->
+		<file>${log.path}/log_info.log</file>
+		<!--日志文件输出格式 -->
+		<encoder>
+			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
+			<charset>UTF-8</charset>
+		</encoder>
+		<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
+		<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+			<!-- 每天日志归档路径以及格式 -->
+			<fileNamePattern>${log.path}/info/log-info-%d{yyyy-MM-dd}.%i.log
+			</fileNamePattern>
+			<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+				<maxFileSize>100MB</maxFileSize>
+			</timeBasedFileNamingAndTriggeringPolicy>
+			<!--日志文件保留天数 -->
+			<maxHistory>15</maxHistory>
+		</rollingPolicy>
+		<!-- 此日志文件只记录info级别的 -->
+		<filter class="ch.qos.logback.classic.filter.LevelFilter">
+			<level>info</level>
+			<onMatch>ACCEPT</onMatch>
+			<onMismatch>DENY</onMismatch>
+		</filter>
+	</appender>
+
+	<!-- 时间滚动输出 level为 WARN 日志 -->
+	<appender name="WARN_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+		<!-- 正在记录的日志文件的路径及文件名 -->
+		<file>${log.path}/log_warn.log</file>
+		<!--日志文件输出格式 -->
+		<encoder>
+			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
+			<charset>UTF-8</charset> <!-- 此处设置字符集 -->
+		</encoder>
+		<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
+		<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+			<fileNamePattern>${log.path}/warn/log-warn-%d{yyyy-MM-dd}.%i.log
+			</fileNamePattern>
+			<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+				<maxFileSize>100MB</maxFileSize>
+			</timeBasedFileNamingAndTriggeringPolicy>
+			<!--日志文件保留天数 -->
+			<maxHistory>15</maxHistory>
+		</rollingPolicy>
+		<!-- 此日志文件只记录warn级别的 -->
+		<filter class="ch.qos.logback.classic.filter.LevelFilter">
+			<level>warn</level>
+			<onMatch>ACCEPT</onMatch>
+			<onMismatch>DENY</onMismatch>
+		</filter>
+	</appender>
+
+
+	<!-- 时间滚动输出 level为 ERROR 日志 -->
+	<appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+		<!-- 正在记录的日志文件的路径及文件名 -->
+		<file>${log.path}/log_error.log</file>
+		<!--日志文件输出格式 -->
+		<encoder>
+			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
+			<charset>UTF-8</charset> <!-- 此处设置字符集 -->
+		</encoder>
+		<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
+		<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+			<fileNamePattern>${log.path}/error/log-error-%d{yyyy-MM-dd}.%i.log
+			</fileNamePattern>
+			<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+				<maxFileSize>100MB</maxFileSize>
+			</timeBasedFileNamingAndTriggeringPolicy>
+			<!--日志文件保留天数 -->
+			<maxHistory>15</maxHistory>
+		</rollingPolicy>
+		<!-- 此日志文件只记录ERROR级别的 -->
+		<filter class="ch.qos.logback.classic.filter.LevelFilter">
+			<level>ERROR</level>
+			<onMatch>ACCEPT</onMatch>
+			<onMismatch>DENY</onMismatch>
+		</filter>
+	</appender>
+
+	<appender name="PROGRAM_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+		<!-- 正在记录的日志文件的路径及文件名 -->
+		<file>${log.path}/program/log_program.log</file>
+		<!--日志文件输出格式 -->
+		<encoder>
+			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
+			<charset>UTF-8</charset> <!-- 此处设置字符集 -->
+		</encoder>
+		<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
+		<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+			<fileNamePattern>${log.path}/program/log-program-%d{yyyy-MM-dd}.%i.log
+			</fileNamePattern>
+			<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+				<maxFileSize>100MB</maxFileSize>
+			</timeBasedFileNamingAndTriggeringPolicy>
+			<!--日志文件保留天数 -->
+			<maxHistory>15</maxHistory>
+		</rollingPolicy>
+		<!-- 此日志文件只记录ERROR级别的 -->
+		<filter class="ch.qos.logback.classic.filter.LevelFilter">
+			<onMatch>ACCEPT</onMatch>
+			<onMismatch>DENY</onMismatch>
+		</filter>
+	</appender>
+	<logger name="programLog" level="INFO" additivity="true">
+		<appender-ref ref="PROGRAM_FILE"/>
+	</logger>
+
+	<appender name="VISIT_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+		<!-- 正在记录的日志文件的路径及文件名 -->
+		<file>${log.path}/visit/log_visit.log</file>
+		<!--日志文件输出格式 -->
+		<encoder>
+			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
+			<charset>UTF-8</charset> <!-- 此处设置字符集 -->
+		</encoder>
+		<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
+		<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+			<fileNamePattern>${log.path}/visit/log-visit-%d{yyyy-MM-dd}.%i.log
+			</fileNamePattern>
+			<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+				<maxFileSize>100MB</maxFileSize>
+			</timeBasedFileNamingAndTriggeringPolicy>
+			<!--日志文件保留天数 -->
+			<maxHistory>15</maxHistory>
+		</rollingPolicy>
+		<!-- 此日志文件只记录ERROR级别的 -->
+		<filter class="ch.qos.logback.classic.filter.LevelFilter">
+			<onMatch>ACCEPT</onMatch>
+			<onMismatch>DENY</onMismatch>
+		</filter>
+	</appender>
+	<logger name="visitLog" level="INFO" additivity="true">
+		<appender-ref ref="VISIT_FILE"/>
+	</logger>
+
+
+	<!--  连接时长  -->
+	<appender name="timeLogger" class="ch.qos.logback.core.rolling.RollingFileAppender">
+		<file>${log.path}/timeLogger.log</file>
+		<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+			<level>DEBUG</level>
+		</filter>
+		<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+			<fileNamePattern>${log.path}/timeLogger/timeLogger-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
+			<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+				<maxFileSize>100MB</maxFileSize>
+			</timeBasedFileNamingAndTriggeringPolicy>
+			<!--日志文件保留天数-->
+			<maxHistory>10</maxHistory>
+		</rollingPolicy>
+		<encoder>
+			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
+			<charset>UTF-8</charset> <!-- 此处设置字符集 -->
+		</encoder>
+	</appender>
+	<logger name="timeLogger" additivity="false" level="DEBUG">
+		<appender-ref ref="timeLogger"/>
+	</logger>
+
+	<!-- <logger>用来设置某一个包或者具体的某一个类的日志打印级别、 以及指定<appender>。<logger>仅有一个name属性, 一个可选的level和一个可选的addtivity属性。 name:用来指定受此logger约束的某一个包或者具体的某一个类。 level:用来设置打印级别,大小写无关:TRACE,
+		DEBUG, INFO, WARN, ERROR, ALL 和 OFF, 还有一个特俗值INHERITED或者同义词NULL,代表强制执行上级的级别。 如果未设置此属性,那么当前logger将会继承上级的级别。 addtivity:是否向上级logger传递打印信息。默认是true。 -->
+	<!--<logger name="org.springframework.web" level="info"/> -->
+	<!--<logger name="org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor" level="INFO"/> -->
+	<!-- 使用mybatis的时候,sql语句是debug下才会打印,而这里我们只配置了info,所以想要查看sql语句的话,有以下两种操作: 第一种把<root level="info">改成<root level="DEBUG">这样就会打印sql,不过这样日志那边会出现很多其他消息 第二种就是单独给dao下目录配置debug模式,代码如下,这样配置sql语句会打印,其他还是正常info级别: -->
+	<!-- root节点是必选节点,用来指定最基础的日志输出级别,只有一个level属性 level:用来设置打印级别,大小写无关:TRACE, DEBUG, INFO, WARN, ERROR, ALL 和 OFF, 不能设置为INHERITED或者同义词NULL。默认是DEBUG 可以包含零个或多个元素,标识这个appender将会添加到这个logger。 -->
+
+	<root level="info">
+		<appender-ref ref="CONSOLE" />
+		<appender-ref ref="DEBUG_FILE" />
+		<appender-ref ref="INFO_FILE" />
+		<appender-ref ref="WARN_FILE" />
+		<appender-ref ref="ERROR_FILE" />
+	</root>
+
+	<!--生产环境:输出到文件 -->
+	<!--<springProfile name="pro"> -->
+	<!--<root level="info"> -->
+	<!--<appender-ref ref="CONSOLE" /> -->
+	<!--<appender-ref ref="DEBUG_FILE" /> -->
+	<!--<appender-ref ref="INFO_FILE" /> -->
+	<!--<appender-ref ref="ERROR_FILE" /> -->
+	<!--<appender-ref ref="WARN_FILE" /> -->
+	<!--</root> -->
+	<!--</springProfile> -->
+
+</configuration>
+
+

+ 5 - 0
src/main/resources/mapper/mqcontroller/MqEcsMapper.xml

@@ -0,0 +1,5 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.fdkankan.mqcontroller.mapper.IMqEcsMapper">
+
+</mapper>

+ 5 - 0
src/main/resources/mapper/mqcontroller/MqQueueConfigMapper.xml

@@ -0,0 +1,5 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.fdkankan.mqcontroller.mapper.IMqQueueConfigMapper">
+
+</mapper>

+ 5 - 0
src/main/resources/mapper/mqcontroller/MqScalingConfigMapper.xml

@@ -0,0 +1,5 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.fdkankan.mqcontroller.mapper.IMqScalingConfigMapper">
+
+</mapper>

+ 5 - 0
src/main/resources/mapper/mqcontroller/MqSendLogMapper.xml

@@ -0,0 +1,5 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="com.fdkankan.mqcontroller.mapper.IMqSendLogMapper">
+
+</mapper>