瀏覽代碼

mq 单线程消费

dengsixing 3 年之前
父節點
當前提交
97c1223890

+ 1 - 1
4dkankan-center-modeling/src/main/java/com/fdkankan/modeling/receiver/BuildSceneMQListener.java

@@ -92,7 +92,7 @@ public class BuildSceneMQListener implements RocketMQListener<String> {
     private DingTalkSendUtils dingTalkSendUtils;
 
     @Override
-    public void onMessage(String message) {
+    public synchronized void onMessage(String message) {
         log.info("开始处理消息,消费者组:{},主题名:{}, 消息内容:{}", consumerGroup, topicName, message);
         BuildSceneMqMessage buildSceneMqMessage = JSONObject.parseObject(message, BuildSceneMqMessage.class);
         if(StringUtils.isBlank(buildSceneMqMessage.getSceneNum())){

+ 8 - 5
4dkankan-center-scene/src/main/java/com/fdkankan/scene/controller/TestController.java

@@ -1,5 +1,6 @@
 package com.fdkankan.scene.controller;
 
+import cn.hutool.core.collection.ConcurrentHashSet;
 import com.alibaba.druid.pool.DruidDataSource;
 import com.alibaba.fastjson.JSONObject;
 import com.fdkankan.common.constant.ConstantFilePath;
@@ -40,6 +41,7 @@ import java.util.UUID;
 public class TestController {
 
 
+
     @Autowired
     RedisUtil redisUtil;
 
@@ -57,11 +59,15 @@ public class TestController {
     @Autowired
     private PlatformGoodsClient platformGoodsClient;
 
+    @Autowired
+    private RocketMQProducer rocketMQProducer;
+
 
     @GetMapping("/test")
     public String test() throws Exception {
-        ResultData<Camera> cameraByChildName = platformGoodsClient.getCameraByChildName("123");
-        StrExtUtil.test();
+        for (int i = 0; i< 10; i++){
+            rocketMQProducer.syncSend("qwe",i+"",  "消息体"+i);
+        }
         return "123";
 
     }
@@ -172,9 +178,6 @@ public class TestController {
         sceneService.updatePv();
     }
 
-    @Autowired
-    RocketMQProducer rocketMQProducer;
-
     @Value("${rocketmq.build-scene.topicName.topic-modeling-a}")
     private String topicModelingA;
 

+ 11 - 5
4dkankan-center-scene/src/main/java/com/fdkankan/scene/listener/TestMQListener.java

@@ -1,6 +1,8 @@
 package com.fdkankan.scene.listener;
 
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.spring.annotation.ConsumeMode;
 import org.apache.rocketmq.spring.annotation.MessageModel;
 import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
 import org.apache.rocketmq.spring.core.RocketMQListener;
@@ -17,16 +19,20 @@ import org.springframework.stereotype.Component;
 @Slf4j
 @Component
 @RocketMQMessageListener(
-        consumerGroup = "lll",
-        topic = "kkk",
+        consumerGroup = "qwe",
+        topic = "qwe",
 //        selectorExpression = "${rocketmq.consumer.tags}",
-        messageModel = MessageModel.CLUSTERING)
+        messageModel = MessageModel.CLUSTERING
+)
 public class TestMQListener implements RocketMQListener<String> {
 
+    @SneakyThrows
     @Override
-    public void onMessage(String message) {
+    public synchronized void onMessage(String message) {
 
-        System.out.println(message);
+        System.out.println(Thread.currentThread().getName()+"test1"+message);
+        Thread.sleep(10000);
+        System.out.println("结束");
 
     }
 }

+ 5 - 5
4dkankan-center-scene/src/main/java/com/fdkankan/scene/service/impl/SceneFileBuildServiceImpl.java

@@ -848,7 +848,7 @@ public class SceneFileBuildServiceImpl extends ServiceImpl<ISceneFileBuildMapper
         //删除oss的houst_floor.json
         uploadToOssUtil.delete("data/data"+sceneNum+File.separator+"houst_floor.json");
 
-        mqProducer.sendInOrder(topicModelingA, scene.getMqMsg());
+        mqProducer.sendInOrder(topicModelingA, scene.getId()+"", scene.getMqMsg());
 
         Map<String, String> map = new HashMap<>();
         map.put("sceneNum", sceneNum);
@@ -1073,7 +1073,7 @@ public class SceneFileBuildServiceImpl extends ServiceImpl<ISceneFileBuildMapper
         if(jsonObject.getJSONObject("cam").getIntValue("type") == 10){
             mqProducer.sendOneWay(topicLaserA, scene.getMqMsg());
         }else  if(scene != null){
-            mqProducer.sendInOrder(topicModelingA, scene.getMqMsg());
+            mqProducer.sendInOrder(topicModelingA, scene.getId()+"", scene.getMqMsg());
         }
     }
 
@@ -1148,7 +1148,7 @@ public class SceneFileBuildServiceImpl extends ServiceImpl<ISceneFileBuildMapper
                 //生成标定数据
                 ComputerUtil.createCalibrationData(calPath, filePath);
                 //开始标定计算
-                mqProducer.sendInOrder(topicModelingA, calPath);
+                mqProducer.sendInOrder(topicModelingA, new Random().nextInt()+"", calPath);
 
             }else if(jsonObject.get("calibration") != null && jsonObject.getString("calibration").equals("2")){
                 String mac = filePath.replace(ConstantFilePath.BUILD_MODEL_PATH, "").split("/")[0];
@@ -1163,7 +1163,7 @@ public class SceneFileBuildServiceImpl extends ServiceImpl<ISceneFileBuildMapper
                 //生成标定数据
                 ComputerUtil.createCalibrationData(calPath, filePath);
                 //开始标定计算
-                mqProducer.sendInOrder(topicModelingA, calPath);
+                mqProducer.sendInOrder(topicModelingA, new Random().nextInt()+"", calPath);
             }else {
 
                 String sceneNum = "";
@@ -1402,7 +1402,7 @@ public class SceneFileBuildServiceImpl extends ServiceImpl<ISceneFileBuildMapper
                     return;
                 }
                 if(sceneProPO != null){
-                    mqProducer.sendInOrder(topicModelingA, sceneProPO.getMqMsg());
+                    mqProducer.sendInOrder(topicModelingA, sceneProPO.getId()+"", sceneProPO.getMqMsg());
                 }
 
             }