Browse Source

场景计算 每次只消费一个mq,实现同一个时刻只有一个场景在计算中

dengsixing 3 years ago
parent
commit
81b250f4fd

+ 3 - 1
4dkankan-center-modeling/src/main/java/com/fdkankan/modeling/receiver/BuildSceneMQListener.java

@@ -58,7 +58,9 @@ import static com.fdkankan.push.PushMessageConfig.*;
 @RocketMQMessageListener(
         consumerGroup = "${rocketmq.consumer.build-scene-group}",
         topic = "${rocketmq.build-scene.topicName.topic-modeling-a}",
-        messageModel = MessageModel.CLUSTERING,consumeMode = ConsumeMode.ORDERLY)
+        messageModel = MessageModel.CLUSTERING,//负载均衡模式
+        consumeThreadMax = 1//消费者最大线程数,每次只消费一个
+)
 public class BuildSceneMQListener implements RocketMQListener<String> {
 
     @Value("${rocketmq.consumer.build-scene-group}")

+ 3 - 3
4dkankan-center-scene/src/main/java/com/fdkankan/scene/controller/TestController.java

@@ -222,9 +222,9 @@ public class TestController {
     @GetMapping("/testMQ")
     public void testMQ(){
 //        SceneVO body = SceneVO.builder().buildType("123").childName("sdfsdfsdf").dataSource("sadfdhgf").id(1213L).build();
-        BuildSceneMqMessage body = new BuildSceneMqMessage();
-        body.setPayStatus(1);
-        rocketMQProducer.sendOneWay(topicModelingA, body);
+        for(int i = 0; i < 10; i++){
+            rocketMQProducer.sendOneWay("test_topic_dsx", "消息体" + i);
+        }
     }
 
     @GetMapping("/testAsynMQ")

+ 2 - 1
4dkankan-center-scene/src/main/java/com/fdkankan/scene/listener/BuildSceneResultMQListener.java

@@ -36,7 +36,8 @@ import java.util.Objects;
 @RocketMQMessageListener(
         consumerGroup = "${rocketmq.consumer.build-scene-result-group}",
         topic = "${rocketmq.build-scene-result.topicName.topic-modeling-a}",
-        messageModel = MessageModel.CLUSTERING)
+        messageModel = MessageModel.CLUSTERING//负载均衡模式
+)
 public class BuildSceneResultMQListener implements RocketMQListener<String> {
 
     @Value("${rocketmq.consumer.build-scene-result-group}")

+ 0 - 38
4dkankan-center-scene/src/main/java/com/fdkankan/scene/listener/TestMQListener.java

@@ -1,38 +0,0 @@
-package com.fdkankan.scene.listener;
-
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.rocketmq.spring.annotation.ConsumeMode;
-import org.apache.rocketmq.spring.annotation.MessageModel;
-import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
-import org.apache.rocketmq.spring.core.RocketMQListener;
-import org.springframework.cloud.context.config.annotation.RefreshScope;
-import org.springframework.stereotype.Component;
-
-
-/**
- * 设置消息监听
- * 监听组:监听topic:监听tag(默认监听topic下所有)
- * 监听消费模式:默认负载均衡:CLUSTERING(每一个消息只发给一个消费者)、广播模式:BROADCASTING(发送给所有消费者)
- *
- */
-@Slf4j
-@Component
-@RocketMQMessageListener(
-        consumerGroup = "qwe",
-        topic = "qwe",
-//        selectorExpression = "${rocketmq.consumer.tags}",
-        messageModel = MessageModel.CLUSTERING
-)
-public class TestMQListener implements RocketMQListener<String> {
-
-    @SneakyThrows
-    @Override
-    public synchronized void onMessage(String message) {
-
-        System.out.println(Thread.currentThread().getName()+"test1"+message);
-        Thread.sleep(10000);
-        System.out.println("结束");
-
-    }
-}