Browse Source

Merge remote-tracking branch 'origin/master'

# Conflicts:
#	4dkankan-center-platform/pom.xml
by su 3 years ago
parent
commit
f2f09c72dd

+ 2 - 1
4dkankan-center-modeling/src/main/java/com/fdkankan/modeling/receiver/BuildSceneMQListener.java

@@ -24,6 +24,7 @@ import com.fdkankan.common.utils.FileUtils;
 import com.taobao.api.ApiException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.spring.annotation.ConsumeMode;
 import org.apache.rocketmq.spring.annotation.MessageModel;
 import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
 import org.apache.rocketmq.spring.core.RocketMQListener;
@@ -53,7 +54,7 @@ import static com.fdkankan.modeling.push.PushMessageConfig.*;
 @RocketMQMessageListener(
         consumerGroup = "${rocketmq.consumer.group}",
         topic = "${rocketmq.build-scene.topicName.topic-modeling-a}",
-        messageModel = MessageModel.CLUSTERING)
+        messageModel = MessageModel.CLUSTERING,consumeMode = ConsumeMode.ORDERLY)
 public class BuildSceneMQListener implements RocketMQListener<String> {
 
     @Value("${rocketmq.consumer.group}")

+ 58 - 0
4dkankan-center-modeling/src/main/java/com/fdkankan/modeling/receiver/BuildSceneMQListenerTest.java

@@ -0,0 +1,58 @@
+package com.fdkankan.modeling.receiver;
+
+import com.fdkankan.mq.util.RocketMQProducer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.spring.annotation.ConsumeMode;
+import org.apache.rocketmq.spring.annotation.MessageModel;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.*;
+
+
+@Slf4j
+@Component
+@RocketMQMessageListener(
+        consumerGroup = "test_model_order",
+        topic = "test_model_order",
+        messageModel = MessageModel.CLUSTERING,consumeMode = ConsumeMode.ORDERLY)
+public class BuildSceneMQListenerTest implements RocketMQListener<String> {
+
+    private String consumerGroup ="test_model_order";
+
+    private String topicName = "test_model_order";
+
+    @Autowired
+    RocketMQProducer rocketMQProducer;
+
+    @Override
+    public  void onMessage(String message) {
+        log.info("开始处理消息,消费者组:{},主题名:{}, 消息内容:{}", consumerGroup, topicName, message);
+        System.out.println(1);
+        process2();
+        System.out.println(2);
+    }
+    public   void process2() {
+        Long startTime = System.currentTimeMillis();
+        final ExecutorService exec = Executors.newFixedThreadPool(1);
+        Callable<String> call = (Callable<String>) () -> {
+            while (System.currentTimeMillis() - startTime <=1000 * 60 ){
+
+            }
+          return "success";
+        };
+        Future<String> future = exec.submit(call);
+        try {
+            future.get(1, TimeUnit.HOURS); //任务处理超时时间设为 24个小时
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        } catch (ExecutionException e) {
+            e.printStackTrace();
+        } catch (TimeoutException e) {
+            e.printStackTrace();
+        }
+    }
+
+}

+ 25 - 0
4dkankan-center-modeling/src/main/java/com/fdkankan/modeling/test/TestSendController.java

@@ -0,0 +1,25 @@
+package com.fdkankan.modeling.test;
+
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.Resource;
+
+@RestController
+@RequestMapping("/send")
+public class TestSendController {
+
+    @Resource
+    com.fdkankan.mq.util.RocketMQProducer rocketMQProducer;
+
+    private final String topic ="test_model_order";
+
+    @RequestMapping("/test")
+    private void sentTest(){
+        rocketMQProducer.sendInOrder(topic,"1");
+    }
+    @RequestMapping("/test1")
+    private void sentTest1(){
+        rocketMQProducer.sendOneWay(topic,"1");
+    }
+}

+ 0 - 6
4dkankan-center-platform/pom.xml

@@ -90,12 +90,6 @@
           <version>2.0.0</version>
       </dependency>
 
-      <dependency>
-          <groupId>com.fdkankan</groupId>
-          <artifactId>4dkankan-common-utils</artifactId>
-          <version>2.0.3</version>
-      </dependency>
-
   </dependencies>
 
     <build>

+ 5 - 5
4dkankan-center-scene/src/main/java/com/fdkankan/scene/service/impl/SceneFileBuildServiceImpl.java

@@ -827,7 +827,7 @@ public class SceneFileBuildServiceImpl extends ServiceImpl<ISceneFileBuildMapper
         //删除oss的houst_floor.json
         uploadToOssUtil.delete("data/data"+sceneNum+File.separator+"houst_floor.json");
 
-        mqProducer.sendOneWay(topicModelingA, scene.getMqMsg());
+        mqProducer.sendInOrder(topicModelingA, scene.getMqMsg());
 
         Map<String, String> map = new HashMap<>();
         map.put("sceneNum", sceneNum);
@@ -1052,7 +1052,7 @@ public class SceneFileBuildServiceImpl extends ServiceImpl<ISceneFileBuildMapper
         if(jsonObject.getJSONObject("cam").getIntValue("type") == 10){
             mqProducer.sendOneWay(topicLaserA, scene.getMqMsg());
         }else  if(scene != null){
-            mqProducer.sendOneWay(topicModelingA, scene.getMqMsg());
+            mqProducer.sendInOrder(topicModelingA, scene.getMqMsg());
         }
     }
 
@@ -1070,7 +1070,7 @@ public class SceneFileBuildServiceImpl extends ServiceImpl<ISceneFileBuildMapper
                 //生成标定数据
                 ComputerUtil.createCalibrationData(calPath, filePath);
                 //开始标定计算
-                mqProducer.sendOneWay(topicModelingA, calPath);
+                mqProducer.sendInOrder(topicModelingA, calPath);
 
             }else if(jsonObject.get("calibration") != null && jsonObject.getString("calibration").equals("2")){
                 String mac = filePath.replace(ConstantFilePath.BUILD_MODEL_PATH, "").split("/")[0];
@@ -1085,7 +1085,7 @@ public class SceneFileBuildServiceImpl extends ServiceImpl<ISceneFileBuildMapper
                 //生成标定数据
                 ComputerUtil.createCalibrationData(calPath, filePath);
                 //开始标定计算
-                mqProducer.sendOneWay(topicModelingA, calPath);
+                mqProducer.sendInOrder(topicModelingA, calPath);
             }else {
 
                 String sceneNum = "";
@@ -1324,7 +1324,7 @@ public class SceneFileBuildServiceImpl extends ServiceImpl<ISceneFileBuildMapper
                     return;
                 }
                 if(sceneProPO != null){
-                    mqProducer.sendOneWay(topicModelingA, sceneProPO.getMqMsg());
+                    mqProducer.sendInOrder(topicModelingA, sceneProPO.getMqMsg());
                 }
 
             }