Ver código fonte

完成raster 模块 队列

wuweihao 5 anos atrás
pai
commit
17dacd2300
1 arquivos alterados com 142 adições e 53 exclusões
  1. 142 53
      src/main/java/com/fd/controller/RasterController.java

+ 142 - 53
src/main/java/com/fd/controller/RasterController.java

@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Created by Owen on 2019/11/12 0012 9:40
@@ -65,8 +66,11 @@ public class RasterController {
     @PostConstruct
     private void init() {
 
-        // 消费队列
-//        new Thread(new FdModelController.modelSliceConsumerThread(modelQueue)).start();
+        // 判断坐标消费队列
+        new Thread(new JudgeCoordConsumerThread(coordQueue)).start();
+
+        // 切片消费队列
+        new Thread(new SliceConsumerThread(sliceQueue)).start();
 
     }
 
@@ -84,7 +88,6 @@ public class RasterController {
             return new R(50008,MsgCode.E50008);
         }
 
-//        return fileServer.uploadRasterBigFile(file, TypeCode.FILE_TYPE_RASTER);
         return rasterServer.uploadBigFile(file);
 
     }
@@ -111,34 +114,85 @@ public class RasterController {
     @GetMapping("command/judge/coord/{fileId}/")
     private R cmdJudgeCoord(@PathVariable("fileId") Long fileId) {
         log.info("run cmdJudgeCoord: {}", fileId);
-//        FileEntity entity = fileServer.findById(fileId);
-
 
         OutputFileEntity entity = rasterServer.findById(fileId);
 
-
         String cmd = Command.RASTER_JUDGE_COORD;
         cmd = cmd.replace("@inputFile", entity.getUploadPath());
         log.info("cmd: {}", cmd);
 
-        Integer isJudge = cmdServer.exeCmdRasterJudgeCoord(cmd);
-
-        if (1000 == isJudge){
-            log.info("need to transform");
-            // 严格坐标转换
-            entity = cmdTansformGdalwarpStrict(entity);
-        } else if (0 == isJudge){
-            log.info("not to transform");
-        } else {
-            log.info("error exeCmd");
-            return new R(50005, MsgCode.E50005);
+        // 把数据放入队列中
+        MyQueue data = new MyQueue();
+        data.setOutputFile(entity);
+        data.setStr(cmd);
+        try {
+            coordQueue.offer(data, 1, TimeUnit.SECONDS);
+            log.info("入队成功");
+        } catch (InterruptedException e) {
+            e.printStackTrace();
         }
 
 
+//        Integer isJudge = cmdServer.exeCmdRasterJudgeCoord(cmd);
+//
+//        if (1000 == isJudge){
+//            log.info("need to transform");
+//            // 严格坐标转换
+//            entity = cmdTansformGdalwarpStrict(entity);
+//        } else if (0 == isJudge){
+//            log.info("not to transform");
+//        } else {
+//            log.info("error exeCmd");
+//            return new R(50005, MsgCode.E50005);
+//        }
+
         return new R(200, entity);
     }
 
 
+    /**
+     * 坐标判断消费队列
+     */
+    public class JudgeCoordConsumerThread implements Runnable{
+
+        private BlockingQueue<MyQueue> queue;
+        public JudgeCoordConsumerThread(BlockingQueue<MyQueue> queue){
+            this.queue = queue;
+        }
+
+        @Override
+        public void run() {
+            log.warn("run JudgeCoordConsumerThread");
+            while (true) {
+                try {
+                    MyQueue data = queue.poll(2, TimeUnit.SECONDS);
+                    if (data != null) {
+                        log.info("消费者,拿到队列中的数据data:" + data.toString());
+
+                        Integer isJudge = cmdServer.exeCmdRasterJudgeCoord(data.getStr());
+                        OutputFileEntity  entity = data.getOutputFile();
+
+                        if (1000 == isJudge){
+                            log.info("need to transform");
+                            // 严格坐标转换
+                            entity = cmdTansformGdalwarpStrict(entity);
+                        } else if (0 == isJudge){
+                            log.info("not to transform");
+                        } else {
+                            log.info("error exeCmd");
+                            entity.setStatus(0);
+                            rasterServer.save(entity);
+                        }
+                        OutputFileEntity obj = data.getOutputFile();
+
+                    }
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
     @ApiOperation("栅格数据切片命令")
     @GetMapping("command/osgeo/{fileId}/")
     private R cmdOsgeo(@PathVariable("fileId") Long fileId) {
@@ -146,8 +200,6 @@ public class RasterController {
         OutputFileEntity entity = rasterServer.findById(fileId);
 
         String fileName = StringUtils.substringBeforeLast(entity.getFileName(), ".");
-        // fileName_tiles
-//        fileName = fileName + "_tiles";
 
         String outFilePath = OUTPUT_FILE_PATH + "tiles" + File.separator + fileName;
         FileUtils.createDir(outFilePath);
@@ -159,18 +211,6 @@ public class RasterController {
         log.info("cmd: {}", cmd);
 
 
-//        FileEntity fileEntity = new FileEntity();
-//        fileEntity.setFileName(fileName);
-//        fileEntity.setFileUrl(INPUT_FILE_PATH + fileName);
-//        fileEntity.setCreateTime(new Date());
-//        fileEntity.setUpdateTime(new Date());
-//        fileEntity.setType(TypeCode.FILE_TYPE_RASTER);
-//        // 设个默认进度给前端显示
-//        fileEntity.setProgress(1);
-//        fileEntity.setStatus(6);
-//
-//        fileEntity = fileServer.save(fileEntity);
-
         // 设个默认进度给前端显示
         entity.setProgress(1);
         entity.setStatus(6);
@@ -180,42 +220,91 @@ public class RasterController {
         entity = rasterServer.save(entity);
 
         // 多线程运行切片
-        new Thread(new RasterSliceThread(cmd, entity)).start();
+//        new Thread(new RasterSliceThread(cmd, entity)).start();
+
+        // 把数据放入队列中
+        MyQueue data = new MyQueue();
+        data.setOutputFile(entity);
+        data.setStr(cmd);
+        try {
+            sliceQueue.offer(data, 1, TimeUnit.SECONDS);
+            log.info("入队成功");
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
 
         return new R(200, entity) ;
     }
 
 
-    public class RasterSliceThread implements Runnable{
-
-        private String cmd;
-
-        private OutputFileEntity entity;
+    /**
+     * 切片消费队列
+     */
+    public class SliceConsumerThread implements Runnable{
 
-        private RasterSliceThread(String cmd, OutputFileEntity entity){
-            this.cmd = cmd;
-            this.entity = entity;
+        private BlockingQueue<MyQueue> queue;
+        public SliceConsumerThread(BlockingQueue<MyQueue> queue){
+            this.queue = queue;
         }
 
         @Override
         public void run() {
-            log.warn("run RasterSliceThread");
-            Integer integer = cmdServer.exeCmdRasterSlice(cmd, entity);
-            if (integer != 0) {
-                log.info("error command exeCmdRasterSlice");
-                // 如果命令运行失败,状态改为0
-                entity.setStatus(0);
-                rasterServer.save(entity);
-                return;
+            log.warn("run SliceConsumerThread");
+            while (true) {
+                try {
+                    MyQueue data = queue.poll(2, TimeUnit.SECONDS);
+                    if (data != null) {
+                        log.info("消费者,拿到队列中的数据data:" + data.toString());
+
+                        OutputFileEntity  entity = data.getOutputFile();
+
+                        Integer integer = cmdServer.exeCmdRasterSlice(data.getStr(), entity);
+                        if (integer != 0) {
+                            log.info("error command exeCmdRasterSlice");
+                            // 如果命令运行失败,状态改为0
+                            entity.setStatus(0);
+                            rasterServer.save(entity);
+                            return;
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
             }
-
-            // 修改状态, 告诉前端切片完成
-            entity.setStatus(5);
-            rasterServer.save(entity);
-            log.warn("end RasterSliceThread");
         }
     }
 
+
+//    public class RasterSliceThread implements Runnable{
+//
+//        private String cmd;
+//
+//        private OutputFileEntity entity;
+//
+//        private RasterSliceThread(String cmd, OutputFileEntity entity){
+//            this.cmd = cmd;
+//            this.entity = entity;
+//        }
+//
+//        @Override
+//        public void run() {
+//            log.warn("run RasterSliceThread");
+//            Integer integer = cmdServer.exeCmdRasterSlice(cmd, entity);
+//            if (integer != 0) {
+//                log.info("error command exeCmdRasterSlice");
+//                // 如果命令运行失败,状态改为0
+//                entity.setStatus(0);
+//                rasterServer.save(entity);
+//                return;
+//            }
+//
+//            // 修改状态, 告诉前端切片完成
+//            entity.setStatus(5);
+//            rasterServer.save(entity);
+//            log.warn("end RasterSliceThread");
+//        }
+//    }
+
     @ApiOperation("栅格数据进度查询")
     @GetMapping("progress/{fileId}/")
     private R getProgress(@PathVariable("fileId") Long fileId) {