Browse Source

更换队列

gemercheung 3 years ago
parent
commit
3cb9ccd409

+ 1 - 1
config.yaml

@@ -20,7 +20,7 @@ queueRedis:
   host: 'localhost' #远程调试需要设置bindip 为0.0.0.0 并且设置密码
   password: 'redis9394' # 非远程不需要密码
   decode_responses: true
-  db: 16
+  db: 0
 
 # redis:
 #   port: 6379

+ 8 - 9
src/get-router/get-router.service.ts

@@ -14,21 +14,20 @@ export class GetRouterService {
     for (let i = 0; i < keys.length; ++i) {
       const breakPointRes = await this.cacheService.get(keys[i]);
       const breakPoint = JSON.parse(breakPointRes);
-      if(breakPoint.breakPointId == startPointId){
+      if (breakPoint.breakPointId == startPointId) {
         continue;
       }
       const position = breakPoint.position;
-      if(minDis == null){
-        minDis = this.getDistance(clicking_point,position);
+      if (minDis == null) {
+        minDis = this.getDistance(clicking_point, position);
         endPoint = breakPoint;
-      }
-      else if(minDis>this.getDistance(clicking_point,position)){
+      } else if (minDis > this.getDistance(clicking_point, position)) {
         endPoint = breakPoint;
-        minDis=this.getDistance(clicking_point,position);
+        minDis = this.getDistance(clicking_point, position);
       }
     }
 
-    if(minDis>100){
+    if (minDis > 100) {
       return [];
     }
 
@@ -77,7 +76,7 @@ export class GetRouterService {
         const itemRes = await this.cacheService.get(
           'breakpoints:app_id:' + appId + ':break_point_id:' + neighPointId,
         );
-        let item = JSON.parse(itemRes);
+        const item = JSON.parse(itemRes);
         //g 到父节点的位置
         const g =
           currentPointInfo.G +
@@ -106,7 +105,7 @@ export class GetRouterService {
       }
 
       openList.sort(this.sortF); //这一步是为了循环回去的时候,找出 F 值最小的, 将它从 "开启列表" 中移掉
-    } while ((result_index = this.existList(endPoint, openList))==-1);
+    } while ((result_index = this.existList(endPoint, openList)) == -1);
 
     //判断结果列表是否为空
     if (result_index == -1) {

+ 8 - 8
src/move/move.service.ts

@@ -3,7 +3,7 @@ import { CacheService } from 'src/cache/cache.service';
 
 @Injectable()
 export class MoveService {
-  constructor(private cacheService: CacheService) { }
+  constructor(private cacheService: CacheService) {}
   private Actions = {
     Clicking: 1,
     Rotation: 1014,
@@ -192,18 +192,18 @@ export class MoveService {
         //读redis里的数据
         const startBreakPointRes = await this.cacheService.get(
           'breakpoints:app_id:' +
-          appId +
-          ':break_point_id:' +
-          start_break_point_id,
+            appId +
+            ':break_point_id:' +
+            start_break_point_id,
         );
 
         const startBreakPoint = JSON.parse(startBreakPointRes);
 
         const endBreakPointRes = await this.cacheService.get(
           'breakpoints:app_id:' +
-          appId +
-          ':break_point_id:' +
-          end_break_point_id,
+            appId +
+            ':break_point_id:' +
+            end_break_point_id,
         );
         const endBreakPoint = JSON.parse(endBreakPointRes);
         pathReplys = this.createCacheReplys(
@@ -304,7 +304,7 @@ export class MoveService {
     const user = this.users[userId];
     user.breakPointId = breakPointId;
 
-    debugger;
+    // debugger;
     user.player.position =
       lastReply['newUserStates'][0].playerState.player.position;
     user.player.angle = lastReply['newUserStates'][0].playerState.player.angle;

+ 1 - 1
src/rotate/rotate.service.ts

@@ -177,7 +177,7 @@ export class RotateService {
 
       actionRequests.splice(0, sub);
       const hAngle = horizontal_move * 90;
-      if (Math.abs(hAngle) < 3) {
+      if (Math.abs(hAngle) < 1) {
         user.rotateInfo.horizontal_move = horizontal_move;
         //user.traceIds = traceIds;
         this.replies[userId] = reply;

+ 56 - 9
src/scene/rotate-consumer.ts

@@ -1,22 +1,69 @@
-import { OnQueueActive, Process, Processor } from '@nestjs/bull';
+import {
+  OnGlobalQueueCompleted,
+  OnQueueActive,
+  OnQueueCleaned,
+  OnQueueCompleted,
+  OnQueueDrained,
+  OnQueueStalled,
+  Process,
+  Processor,
+} from '@nestjs/bull';
+import { Injectable } from '@nestjs/common';
 import { Job } from 'bull';
+import { SceneService } from './scene.service';
+import { StreamService } from './stream/stream.service';
 
 @Processor('rotate')
+@Injectable()
 export class RotateConsumer {
+  constructor(
+    private streamService: StreamService,
+    private sceneService: SceneService,
+  ) {}
+ 
+  private _checkerRotateDone: NodeJS.Timeout;
   @Process()
   async processFrame(job: Job<unknown>) {
-    // const progress = 0;
-    // for (i = 0; i < 100; i++) {
-    // b);
-    console.log('job', job);
+    const jobData = job.data as any as StreamFrameType;
+    // console.log('jobData', jobData);
+    const done = await this.streamService.pushFrameToSteam(jobData);
     // }
-    return {};
+    return { done: done };
   }
 
   @OnQueueActive()
   onActive(job: Job) {
-    console.log(
-      `Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
-    );
+    clearTimeout(this._checkerRotateDone);
+  }
+
+  @OnQueueDrained()
+  onDrained(job: Job) {
+    console.log(`onDrained`);
+  }
+
+  @OnQueueCompleted()
+  onQueueComplete(job: Job) {
+    console.log(`onQueueComplete-frame`, job.data.frame);
+    this._checkerRotateDone = setTimeout(() => {
+      console.log('1秒后开流');
+      const next = job.data.frame + 1;
+      this.sceneService.resumeStream();
+      this.sceneService.onRotating.next(false);
+      this.sceneService.frameCnt.next(next);
+    }, 1000);
+  }
+
+  // @OnGlobalQueueCompleted()
+  // onGlobalQueueCompleted() {
+  //   console.log(`onGlobalQueueCompleted`);
+  // }
+
+  @OnQueueStalled()
+  onStalled() {
+    console.log(`OnQueueStalled`);
+  }
+  @OnQueueCleaned()
+  onCleaned() {
+    console.log(`OnQueueCleaned`);
   }
 }

+ 5 - 0
src/scene/scene.module.ts

@@ -6,6 +6,7 @@ import { CacheService } from '../cache/cache.service';
 import { StreamService } from './stream/stream.service';
 import { BullModule } from '@nestjs/bull';
 import { RotateConsumer } from './rotate-consumer';
+import { WalkingConsumer } from './walking-consumer';
 import { RotateService } from '../rotate/rotate.service';
 import { MoveService } from '../move/move.service';
 import { GetRouterService } from 'src/get-router/get-router.service';
@@ -16,6 +17,9 @@ import { GetRouterService } from 'src/get-router/get-router.service';
     BullModule.registerQueue({
       name: 'rotate',
     }),
+    BullModule.registerQueue({
+      name: 'walking',
+    }),
   ],
   controllers: [],
   providers: [
@@ -24,6 +28,7 @@ import { GetRouterService } from 'src/get-router/get-router.service';
     StreamService,
     RotateService,
     RotateConsumer,
+    WalkingConsumer,
     MoveService,
     GetRouterService,
   ],

+ 268 - 234
src/scene/scene.service.ts

@@ -3,7 +3,6 @@ import { ClientGrpc, Client } from '@nestjs/microservices';
 import { grpcClientOptions } from './grpc-scene.options';
 import { Logger } from '@nestjs/common';
 import { DataChannel } from 'node-datachannel';
-import * as path from 'path';
 import { BehaviorSubject, filter, ignoreElements, take } from 'rxjs';
 // import * as streamBuffers from 'stream-buffers';
 import { ActionType } from './actionType';
@@ -15,6 +14,7 @@ import { RotateService } from 'src/rotate/rotate.service';
 import { DelayQueue, RxQueue, ThrottleQueue, DebounceQueue } from 'rx-queue';
 import { MoveService } from 'src/move/move.service';
 import { GetRouterService } from 'src/get-router/get-router.service';
+import { join } from 'path';
 
 @Injectable()
 export class SceneService implements OnModuleInit, OnModuleDestroy {
@@ -25,40 +25,42 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     private moveService: MoveService,
     private getRouterService: GetRouterService,
     @InjectQueue('rotate') private rotateQueue: Queue,
+    @InjectQueue('walking') private walkingQueue: Queue,
   ) { }
   @Client(grpcClientOptions) private readonly client: ClientGrpc;
-  private sceneGrpcService: SceneGrpcService;
 
-  private logger: Logger = new Logger('SceneService');
-  private frameCntInterval = 1000;
   public _frameInteval: NodeJS.Timeout;
   public _frameTimeout: NodeJS.Timeout;
-  private channel: DataChannel;
   public startSteaming = new BehaviorSubject<boolean>(false);
+  public onRotating = new BehaviorSubject<boolean>(false);
+  public onMoving = new BehaviorSubject<boolean>(false);
+  public frameCnt = new BehaviorSubject<number>(-1);
+  private rotateframeCnt = -1;
+  private moveframeCnt = -1;
+
+  private sceneGrpcService: SceneGrpcService;
+  private channel: DataChannel;
+  private logger: Logger = new Logger('SceneService');
+  private frameCntInterval = 1000;
   private user_id: string;
   private roomId: string;
   private onSteaming = false;
-  private rotateframeCnt = -1;
+
   private mockserverTime = Date.now() - 1653000000478;
   private lastRenderMedia = '';
-  private frameCnt = new BehaviorSubject<number>(-1);
   private frameCntSubscription: any;
-  private roQueueSubscription: any;
-  private moveQueueSubscription: any;
+  // private roQueueSubscription: any;
+  // private moveQueueSubscription: any;
   private walkingSub: any;
 
   private streamServiceSub: any;
-  private roQueue: RxQueue = new DelayQueue(100);
+
   private clickQueue: RxQueue = new DebounceQueue(500);
   private moveQueue: RxQueue = new DelayQueue(100);
   private rotateTimeStamp: number;
   private lastMoveCnt = -1;
-  private currentMoveMaker = '';
-  private onMoving = false;
-  private onRotating = false;
-  private firstRender = false;
-  private currentPoint = '';
 
+  private firstRender = false;
 
   public lastMoveStreamFrame = new BehaviorSubject<StreamFrameType>({
     frame: -1,
@@ -78,7 +80,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     };
   }
 
-  startStream(): void {
+  public startStream(): void {
     clearInterval(this._frameInteval);
     if (this.frameCnt.value === -1) {
       this._frameInteval = setInterval(async () => {
@@ -88,36 +90,34 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     }
   }
 
-  holdSteam(): void {
+  public holdSteam(): void {
     clearInterval(this._frameInteval);
   }
 
-  resumeStream(value: number) {
-    this.onMoving = false;
-    this.frameCnt.next(value);
-    clearTimeout(this._frameTimeout);
+  public resumeStream(): void {
+    this.onMoving.next(false);
+    this.moveframeCnt = -1;
+    this.rotateframeCnt = -1;
     clearInterval(this._frameInteval);
-    this._frameTimeout = setTimeout(() => {
-      this._frameInteval = setInterval(async () => {
-        const next = this.frameCnt.getValue() + 1;
-        this.frameCnt.next(next);
-      }, 1000);
+    this._frameInteval = setInterval(async () => {
+      const next = this.frameCnt.getValue() + 1;
+      this.frameCnt.next(next);
     }, 1000);
   }
 
-  stopStream(): void {
+  public stopStream(): void {
     if (this.frameCntSubscription) {
       this.frameCntSubscription.unsubscribe();
       this.frameCntSubscription = null;
     }
-    if (this.roQueueSubscription) {
-      this.roQueueSubscription.unsubscribe();
-      this.roQueueSubscription = null;
-    }
-    if (this.moveQueueSubscription) {
-      this.moveQueueSubscription.unsubscribe();
-      this.moveQueueSubscription = null;
-    }
+    // if (this.roQueueSubscription) {
+    //   this.roQueueSubscription.unsubscribe();
+    //   this.roQueueSubscription = null;
+    // }
+    // if (this.moveQueueSubscription) {
+    //   this.moveQueueSubscription.unsubscribe();
+    //   this.moveQueueSubscription = null;
+    // }
     this.frameCnt.next(-1);
     clearInterval(this._frameInteval);
     this.rotateframeCnt = -1;
@@ -134,12 +134,12 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     }
   }
 
-  getRoute(request: RouteRequest) {
-    return this.sceneGrpcService.getRoute(request);
-  }
-  getBreakPoint(request: GetBreakPointRequest) {
-    return this.sceneGrpcService.getBreakPoint(request);
-  }
+  // getRoute(request: RouteRequest) {
+  //   return this.sceneGrpcService.getRoute(request);
+  // }
+  // getBreakPoint(request: GetBreakPointRequest) {
+  //   return this.sceneGrpcService.getBreakPoint(request);
+  // }
 
   init(request: InitRequest) {
     try {
@@ -163,71 +163,78 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
 
   async rotate(request: RotateRequest) {
     try {
-      if (!this.roQueueSubscription) {
-        this.handleRotateStream();
+      // if (!this.onSteaming) {
+      let redisMeta: StreamReplyType;
+      this.onRotating.next(true);
+      if (this.onMoving.value) {
+        this.onMoving.next(false);
+        const lastStreamFrame = this.lastMoveStreamFrame.getValue();
+        const metaData: StreamReplyType = JSON.parse(
+          lastStreamFrame.metaData,
+        ) as any as StreamReplyType;
+        const newUserStates: NewUserStatesType = metaData.newUserStates.find(
+          (item) => item.userId === this.user_id,
+        );
+        const trace_id = metaData.traceIds[0];
+        const userId = newUserStates.userId;
+        const breakPointId = lastStreamFrame.marker
+          .replace('P', '')
+          .replace('T', '-');
+        const cameraAngle = newUserStates.playerState.camera.angle;
+        const playerAngle = newUserStates.playerState.player.angle;
+        console.log('stop-data', trace_id, userId, cameraAngle, cameraAngle);
+        redisMeta = await this.moveService.stop(
+          trace_id,
+          userId,
+          breakPointId,
+          cameraAngle,
+          playerAngle,
+        );
+        console.log('stop-redisMeta', redisMeta);
+        // redisMeta = await this.rotateService.rotate(request);
+      } else {
+        redisMeta = await this.rotateService.rotate(request);
       }
-      if (!this.onSteaming) {
-        let redisMeta: StreamReplyType;
-        this.onRotating = true;
-        if (this.onMoving) {
-          const lastStreamFrame = this.lastMoveStreamFrame.getValue();
-          const metaData: StreamReplyType = JSON.parse(
-            lastStreamFrame.metaData,
-          ) as any as StreamReplyType;
-          const newUserStates: NewUserStatesType = metaData.newUserStates.find(
-            (item) => item.userId === this.user_id,
-          );
-          const trace_id = metaData.traceIds[0];
-          const userId = newUserStates.userId;
-          const breakPointId = lastStreamFrame.marker
-            .replace('P', '')
-            .replace('T', '-');
-          const cameraAngle = newUserStates.playerState.camera.angle;
-          const playerAngle = newUserStates.playerState.player.angle;
-          this.onMoving = false;
-          console.log('stop-data', trace_id, userId, cameraAngle, cameraAngle);
-          redisMeta = await this.moveService.stop(
-            trace_id,
-            userId,
-            breakPointId,
-            cameraAngle,
-            playerAngle,
-          );
-          console.log('stop-redisMeta', redisMeta);
-          // redisMeta = await this.rotateService.rotate(request);
-        } else {
-          redisMeta = await this.rotateService.rotate(request);
-        }
 
-        if (redisMeta && 'mediaSrc' in redisMeta) {
-          const mediaSrc: string = redisMeta.mediaSrc || '';
-          if (mediaSrc.length > 0) {
-            let src = mediaSrc.split('?')[0];
-            // 临时本地替换路经
-            src = src.replace('/0000000001/', '');
-            // 判断不是同一条源时才推出
-            if (this.lastRenderMedia !== src) {
-              // console.log('不同源');
-              // this.frameCnt += 1;
-              console.log('[core-src]', src);
-
-              this.holdSteam();
-              this.lastRenderMedia = src;
-              const clipPath = path.join(__dirname, `../ws/video/${src}`);
-              // console.log('src-clipPath', src, clipPath);
-              delete redisMeta.mediaSrc;
-              const stream: StreamFrameType = {
-                frame: -1,
-                clipPath: clipPath,
-                metaData: JSON.stringify(redisMeta),
-                serverTime: this.mockserverTime,
-                DIR: 3,
-              };
-              this.roQueue.next(stream);
+      if (redisMeta && 'mediaSrc' in redisMeta) {
+        const mediaSrc: string = redisMeta.mediaSrc || '';
+        if (mediaSrc.length > 0) {
+          let src = mediaSrc.split('?')[0];
+          // 临时本地替换路经
+          src = src.replace('/0000000001/', '');
+          // 判断不是同一条源时才推出
+          if (src.length > 0) {
+            // console.log('不同源');
+            // this.frameCnt += 1;
+            this.holdSteam();
+            this.lastRenderMedia = src;
+            const clipPath = join(__dirname, `../ws/video/${src}`);
+            // console.log('src-clipPath', src, clipPath);
+            delete redisMeta.mediaSrc;
+            if (this.rotateframeCnt === -1) {
+              this.rotateframeCnt = this.frameCnt.value;
             }
+            this.rotateframeCnt += 1;
+            const random_boolean = Math.random() < 0.5;
+
+            const stream: StreamFrameType = {
+              frame: this.rotateframeCnt,
+              clipPath: clipPath,
+              metaData: JSON.stringify(redisMeta),
+              serverTime: this.mockserverTime,
+              DIR: random_boolean ? 1 : 3,
+            };
+            this.rotateQueue.add(stream, {
+              delay: 5,
+              jobId: `rotate:${this.user_id}:${this.rotateframeCnt}`,
+              // lifo: true,
+            });
+          } else {
+            this.onRotating.next(false);
           }
         }
       }
+      // }
     } catch (error) {
       this.logger.error('rotate', error);
       console.log('error', error);
@@ -300,63 +307,90 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     }
   }
 
-  handleIframeRequest() {
-    const lastStreamFrame = this.streamService.lastStreamFrame.getValue();
-    lastStreamFrame.DIR = 1;
-    console.log('lastStreamFrame', lastStreamFrame);
+  async handleIframeRequest() {
+    // const lastStreamFrame = this.streamService.lastStreamFrame.getValue();
+    // lastStreamFrame.DIR = 1;
+    // console.log('lastStreamFrame', lastStreamFrame);
     const nextFrame = this.frameCnt.getValue() + 1;
-    lastStreamFrame.frame = nextFrame;
-    this.streamService.pushFrameToSteam(lastStreamFrame);
+    // lastStreamFrame.frame = nextFrame;
+    // this.streamService.pushFrameToSteam(lastStreamFrame);
+
+    const redisDataAuto = await this.rotateService.echo(this.user_id);
+    if (redisDataAuto) {
+      'mediaSrc' in redisDataAuto && delete redisDataAuto.mediaSrc;
+      const streamMeta: StreamMetaType = {
+        frame: nextFrame,
+        metaData: JSON.stringify(redisDataAuto),
+      };
+      this.streamService.pushMetaDataToSteam(streamMeta);
+    }
   }
 
   async walking(req) {
-    console.log('walking', req);
-    this.clickQueue.next(req);
-    this.walkingSub = this.clickQueue.subscribe(async (request) => {
-      const user = this.moveService.users[this.user_id];
-      const path = await this.getRouterService.searchRoad(
-        user.appId,
-        user.breakPointId,
-        req.clicking_action.clicking_point,
-      );
-      const walkingRes = await this.moveService.move(path, request);
-      console.log('walkingRes', walkingRes)
-      debugger;
-      if (walkingRes && !this.onMoving) {
-        this.onMoving = true;
-        this.holdSteam();
-        if (!this.moveQueueSubscription) {
-          this.handleMoveSteam();
-        }
-        const res = Object.keys(walkingRes).map((item) => {
-          console.log('item', item);
-          return Array.from(walkingRes[item]).map((i) => {
-            i['marker'] = item;
-            return i;
+    try {
+      console.log('walking', req);
+      this.clickQueue.next(req);
+      this.walkingSub = this.clickQueue.subscribe(async (request) => {
+        const user = this.moveService.users[this.user_id];
+        const path = await this.getRouterService.searchRoad(
+          user.appId,
+          user.breakPointId,
+          req.clicking_action.clicking_point,
+        );
+        const walkingRes = await this.moveService.move(path, request);
+        // console.log('walkingRes', walkingRes);
+        if (walkingRes && !this.onMoving.value) {
+          this.onMoving.next(true);
+          this.holdSteam();
+          // if (!this.moveQueueSubscription) {
+          //   this.handleMoveSteam();
+          // }
+          if (this.moveframeCnt === -1) {
+            this.moveframeCnt = this.frameCnt.value;
+          }
+
+          const res = Object.keys(walkingRes).map((item) => {
+            console.log('item', item);
+            return Array.from(walkingRes[item]).map((i) => {
+              i['marker'] = item;
+              return i;
+            });
           });
-        });
-        const seqs = Array.from(res).flat();
-        this.lastMoveCnt = this.frameCnt.value + seqs.length;
-        console.log('lastMoveCnt', this.lastMoveCnt);
-        seqs.forEach((frame: StreamReplyType) => {
-          const mediaSrc = frame.mediaSrc;
-          delete frame.mediaSrc;
-          const stream: StreamFrameType = {
-            frame: -1,
-            clipPath: mediaSrc,
-            metaData: JSON.stringify(frame),
-            serverTime: this.mockserverTime,
-            DIR: 1,
-          };
-          this.moveQueue.next(stream);
-        });
-      }
-    });
+          const seqs = Array.from(res).flat();
+          // this.lastMoveCnt = this.frameCnt.value + seqs.length;
+          seqs.forEach((frame: StreamReplyType) => {
+            const mediaSrc = frame.mediaSrc;
+            let src = mediaSrc.split('?')[0];
+            // 临时本地替换路经
+            src = src.replace('/0000000001/', '');
+            const clipPath = join(__dirname, `../ws/video/${src}`);
+            this.moveframeCnt += 1;
+            delete frame.mediaSrc;
+            const stream: StreamFrameType = {
+              frame: this.moveframeCnt,
+              clipPath: clipPath,
+              metaData: JSON.stringify(frame),
+              serverTime: this.mockserverTime,
+              DIR: 1,
+            };
+
+            this.walkingQueue.add(stream, {
+              delay: 5,
+              jobId: `walking:${this.user_id}:${this.moveframeCnt}`,
+              // lifo: true,
+            });
+            // this.moveQueue.next(stream);
+          });
+        }
+      });
+    } catch (error) {
+      this.logger.error('walking', error);
+    }
   }
 
   async handleBreath(request) {
     const npsRes = await this.moveService.getBreakPoints(request);
-    console.log('npsRes', npsRes);
+    // console.log('npsRes', npsRes);
     this.streamService.pushNormalDataToStream(npsRes);
   }
 
@@ -414,7 +448,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
               let src = mediaSrc.split('?')[0];
               // 临时本地替换路经
               src = src.replace('/0000000001/', '');
-              const clipPath = path.join(__dirname, `../ws/video/${src}`);
+              const clipPath = join(__dirname, `../ws/video/${src}`);
               delete redisData.mediaSrc;
               this.logger.log(
                 `user:${this.user_id}:first render stream` +
@@ -426,7 +460,8 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
               );
               if (status) {
                 this.firstRender = true;
-                this.resumeStream(2);
+                this.frameCnt.next(2);
+                this.resumeStream();
               } else {
                 this.logger.error('first render problem', status);
               }
@@ -436,7 +471,8 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
         if (
           frame > 1 &&
           !this.onSteaming &&
-          !this.onMoving &&
+          !this.onMoving.value &&
+          !this.onRotating.value &&
           this.firstRender
         ) {
           console.log(`空白流::${frame}`);
@@ -455,89 +491,87 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
       }
     });
   }
-  handleRotateStream() {
-    this.roQueueSubscription = this.roQueue.subscribe(
-      async (stream: StreamFrameType) => {
-        this.rotateTimeStamp = Date.now();
-        if (this.rotateframeCnt === -1) {
-          this.rotateframeCnt = this.frameCnt.value;
-        }
-        this.rotateframeCnt += 1;
-        stream.frame = this.rotateframeCnt;
-        console.log('[media]', stream.clipPath);
-        this.logger.log(
-          `roQueueSubscription:frame:${this.rotateframeCnt}  ` +
-          JSON.stringify(stream.metaData),
-        );
-        await this.streamService.pushFrameToSteam(stream);
-        setTimeout(() => {
-          const now = Date.now();
-          if (now - this.rotateTimeStamp > 300) {
-            const next = this.rotateframeCnt + 1;
-            this.resumeStream(next);
-            this.rotateframeCnt = -1;
-            this.onMoving = false;
-            this.onRotating = false;
-          }
-        }, 300);
-      },
-    );
-  }
+  // handleRotateStream() {
+  //   this.roQueueSubscription = this.roQueue.subscribe(
+  //     async (stream: StreamFrameType) => {
+  //       this.rotateTimeStamp = Date.now();
 
-  cleanMoveSteam() {
-    if (this.moveQueueSubscription) {
-      this.moveQueueSubscription.unsubscribe();
-      this.lastMoveCnt = -1;
-      this.moveQueueSubscription = null;
-    }
-    if (this.walkingSub) {
-      this.walkingSub.unsubscribe();
-      this.walkingSub = null;
-    }
-  }
-  handleMoveSteam() {
-    this.moveQueueSubscription = this.moveQueue.subscribe(
-      async (stream: StreamFrameType) => {
-        const metaData: StreamReplyType = JSON.parse(stream.metaData);
-        console.log('handleMoveSteam-onMoving', this.onMoving);
-        stream.marker = metaData.marker;
-        this.lastMoveStreamFrame.next(stream);
-        const next = this.frameCnt.value + 1;
-        this.currentMoveMaker = metaData.marker;
-        if (this.onMoving) {
-          this.frameCnt.next(next);
-        } else {
-          console.log('handleMoveSteam stop', next, this.currentMoveMaker);
-          this.cleanMoveSteam();
-          this.resumeStream(next);
-          return;
-        }
-        let src = stream.clipPath.split('?')[0];
-        // // 临时本地替换路经
-        src = src.replace('/0000000001/', '');
-        const clipPath = path.join(__dirname, `../ws/video/${src}`);
+  //       stream.frame = this.rotateframeCnt;
+  //       console.log('[media]', stream.clipPath);
+  //       this.logger.log(
+  //         `roQueueSubscription:frame:${this.rotateframeCnt}  ` +
+  //         JSON.stringify(stream.metaData),
+  //       );
+  //       // await this.streamService.pushFrameToSteam(stream);
+  //       setTimeout(() => {
+  //         const now = Date.now();
+  //         if (now - this.rotateTimeStamp > this.rotatePeriod) {
+  //           console.log('rotate end');
+  //           // const next = this.rotateframeCnt + 1;
+  //           // this.resumeStream(next);
+  //           // this.rotateframeCnt = -1;
+  //           // this.onMoving = false;
+  //           // this.onRotating = false;
+  //         }
+  //       }, 300);
+  //     },
+  //   );
+  // }
 
-        const streamData: StreamFrameType = {
-          frame: next,
-          clipPath: clipPath,
-          metaData: stream.metaData,
-          serverTime: this.mockserverTime,
-          DIR: 3,
-        };
-        await this.streamService.pushFrameToSteam(streamData);
-
-        if (this.lastMoveCnt == this.frameCnt.getValue()) {
-          const next = this.frameCnt.getValue() + 1;
-          console.log('last', next);
-          this.resumeStream(next);
-          this.cleanMoveSteam();
-          const lastFrame = this.lastMoveStreamFrame.getValue();
-          const userId = this.user_id;
-          const breakPointId = lastFrame.marker.split('T')[1];
-          const lastReply = JSON.parse(lastFrame.metaData);
-          this.moveService.updateUser(userId, breakPointId, lastReply);
-        }
-      },
-    );
-  }
+  // cleanMoveSteam() {
+  //   if (this.moveQueueSubscription) {
+  //     this.moveQueueSubscription.unsubscribe();
+  //     this.lastMoveCnt = -1;
+  //     this.moveQueueSubscription = null;
+  //   }
+  //   if (this.walkingSub) {
+  //     this.walkingSub.unsubscribe();
+  //     this.walkingSub = null;
+  //   }
+  // }
+  // handleMoveSteam() {
+  //   this.moveQueueSubscription = this.moveQueue.subscribe(
+  //     async (stream: StreamFrameType) => {
+  //       const metaData: StreamReplyType = JSON.parse(stream.metaData);
+  //       console.log('handleMoveSteam-onMoving', this.onMoving);
+  //       stream.marker = metaData.marker;
+  //       this.lastMoveStreamFrame.next(stream);
+  //       const next = this.frameCnt.value + 1;
+  //       this.currentMoveMaker = metaData.marker;
+  //       if (this.onMoving) {
+  //         this.frameCnt.next(next);
+  //       } else {
+  //         console.log('handleMoveSteam stop', next, this.currentMoveMaker);
+  //         this.cleanMoveSteam();
+  //         this.resumeStream(next);
+  //         return;
+  //       }
+  //       let src = stream.clipPath.split('?')[0];
+  //       // // 临时本地替换路经
+  //       src = src.replace('/0000000001/', '');
+  //       const clipPath = path.join(__dirname, `../ws/video/${src}`);
+
+  //       const streamData: StreamFrameType = {
+  //         frame: next,
+  //         clipPath: clipPath,
+  //         metaData: stream.metaData,
+  //         serverTime: this.mockserverTime,
+  //         DIR: 3,
+  //       };
+  //       await this.streamService.pushFrameToSteam(streamData);
+
+  //       if (this.lastMoveCnt == this.frameCnt.getValue()) {
+  //         const next = this.frameCnt.getValue() + 1;
+  //         console.log('last', next);
+  //         this.resumeStream(next);
+  //         this.cleanMoveSteam();
+  //         const lastFrame = this.lastMoveStreamFrame.getValue();
+  //         const userId = this.user_id;
+  //         const breakPointId = lastFrame.marker.split('T')[1];
+  //         const lastReply = JSON.parse(lastFrame.metaData);
+  //         this.moveService.updateUser(userId, breakPointId, lastReply);
+  //       }
+  //     },
+  //   );
+  // }
 }

+ 68 - 0
src/scene/walking-consumer.ts

@@ -0,0 +1,68 @@
+import {
+  OnQueueActive,
+  OnQueueCleaned,
+  OnQueueCompleted,
+  OnQueueDrained,
+  OnQueueStalled,
+  Process,
+  Processor,
+} from '@nestjs/bull';
+import { Injectable } from '@nestjs/common';
+import { Job } from 'bull';
+import { SceneService } from './scene.service';
+import { StreamService } from './stream/stream.service';
+
+@Processor('walking')
+@Injectable()
+export class WalkingConsumer {
+  constructor(
+    private streamService: StreamService,
+    private sceneService: SceneService,
+  ) {}
+  private isDone = true;
+  private _checkerRotateDone: NodeJS.Timeout;
+  @Process()
+  async processFrame(job: Job<unknown>) {
+    const jobData = job.data as any as StreamFrameType;
+    // console.log('jobData', jobData);
+    const done = await this.streamService.pushFrameToSteam(jobData);
+    // }
+    return { done: done };
+  }
+
+  @OnQueueActive()
+  onActive(job: Job) {
+    clearTimeout(this._checkerRotateDone);
+  }
+
+  @OnQueueDrained()
+  onDrained(job: Job) {
+    console.log(`onDrained`);
+  }
+
+  @OnQueueCompleted()
+  onQueueComplete(job: Job) {
+    console.log(`onQueueComplete-frame`, job.data.frame);
+    this._checkerRotateDone = setTimeout(() => {
+      console.log('1秒后开流');
+      const next = job.data.frame + 1;
+      this.sceneService.resumeStream();
+      this.sceneService.onRotating.next(false);
+      this.sceneService.frameCnt.next(next);
+    }, 1000);
+  }
+
+  // @OnGlobalQueueCompleted()
+  // onGlobalQueueCompleted() {
+  //   console.log(`onGlobalQueueCompleted`);
+  // }
+
+  @OnQueueStalled()
+  onStalled() {
+    console.log(`OnQueueStalled`);
+  }
+  @OnQueueCleaned()
+  onCleaned() {
+    console.log(`OnQueueCleaned`);
+  }
+}