gemercheung hace 3 años
padre
commit
15d5f319ca
Se han modificado 3 ficheros con 46 adiciones y 28 borrados
  1. 2 2
      src/move/move.service.ts
  2. 21 4
      src/queue/delay-queue/delay-queue.ts
  3. 23 22
      src/scene/scene.service.ts

+ 2 - 2
src/move/move.service.ts

@@ -210,7 +210,7 @@ export class MoveService implements OnModuleInit {
       }
       //replys['P' + user.breakPointId + 'T' + user.breakPointId] = checkReplys;
       replys.push(checkReplys);
-      console.log('路径:'+path);
+      console.log('路径:' + path);
       //过渡传到缓存里
       this.reply.traceIds = traceIds;
       this.reply['newUserStates'][0].userId = userId;
@@ -287,7 +287,7 @@ export class MoveService implements OnModuleInit {
     for (i = 1; i < moveFrames.length; i += 5) {
       const moveFrame = moveFrames[i];
       const reply = JSON.parse(JSON.stringify(this.reply));
-      if(reply.traceIds.indexOf(traceId) == -1){
+      if (reply.traceIds.indexOf(traceId) == -1) {
         reply.traceIds.push(traceId);
       }
       reply['newUserStates'][0].userId = userId;

+ 21 - 4
src/queue/delay-queue/delay-queue.ts

@@ -1,5 +1,10 @@
 import { concat, empty, of, Subject, Subscription, timer, EMPTY } from 'rxjs';
-import { concatMap, ignoreElements, switchMap } from 'rxjs/operators';
+import {
+  concatMap,
+  ignoreElements,
+  startWith,
+  switchMap,
+} from 'rxjs/operators';
 
 import RxQueue from '../rx-queue';
 
@@ -19,8 +24,11 @@ export class DelayQueue<T = unknown> extends RxQueue<T> {
     period?: number, // milliseconds
   ) {
     super(period);
-
     this.subject = new Subject<T>();
+    this.initQueue();
+  }
+
+  initQueue(): void {
     this.subscription = this.subject
       .pipe(
         concatMap((x) =>
@@ -48,9 +56,18 @@ export class DelayQueue<T = unknown> extends RxQueue<T> {
 
   override clean(): void {
     // 1
-    // this.subject.pipe(ignoreElements());
+    this.subscription.unsubscribe();
+    this.subject.pipe(ignoreElements());
+    // .subscribe((item: T) => super.next(item));
+
+    this.initQueue();
+    // this.unsubscribe();
+    // this.subject.complete();
+    // this.initQueue();
     // 2
-    this.subject.pipe(switchMap(() => EMPTY)).pipe(ignoreElements());
+    // this.subject.pipe(switchMap(() => EMPTY));
+    // this.subject.pipe(startWith(true), ignoreElements());
+
     // this.subscription = this.subject
     //   .pipe(switchMap(() => EMPTY))
     //   .subscribe((item: T) => super.next(item));

+ 23 - 22
src/scene/scene.service.ts

@@ -26,7 +26,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     private rotateService: RotateService,
     private moveService: MoveService,
     private getRouterService: GetRouterService, // @InjectQueue('rotate') private rotateQueue: Queue, // @InjectQueue('walking') private walkingQueue: Queue,
-  ) { }
+  ) {}
   @Client(grpcClientOptions) private readonly client: ClientGrpc;
 
   public _frameInteval: NodeJS.Timeout;
@@ -364,7 +364,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     if (!this.moveSliceLastFrameSub) {
       this.moveSliceLastFrameSub = this.moveSliceLastFrame.subscribe(
         async (frame: MovingLastUpdateType) => {
-          console.log('walkingStop-'+ this.latestWalkingRequest + ','+ this.onMoving.value);
+          // console.log('walkingStop-'+ this.latestWalkingRequest + ','+ this.onMoving.value);
           //TODO 正在行走时,有新的reqest
           if (this.latestWalkingRequest && this.onMoving.value) {
             this.logger.log('stop-data-1', frame);
@@ -372,8 +372,8 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
             // this.moveQueue.of('');
             // TODO 中断move队列 ?优化如何清空
             this.moveQueue.clean();
-            this.moveQueueSubscription.unsubscribe();
-            this.moveQueueSubscription = null;
+            // this.moveQueueSubscription.unsubscribe();
+            // this.moveQueueSubscription = null;
             //step1 执行stop方法
             const metaData: StreamReplyType = frame.metaData;
             const newUserStates: NewUserStatesType =
@@ -399,7 +399,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
               cameraAngle,
               playerAngle,
             );
-            this.logger.log('stop-redisMeta', redisMeta);
+            this.logger.log('stop-redisMeta', JSON.stringify(redisMeta));
             // 2. 中断重新walking
             console.log('walking-step-reWalking-1', request.trace_id);
             this.handleReWalking(this.latestWalkingRequest);
@@ -540,7 +540,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
       const joystickRes = await this.moveService.seqExeJoystick(request);
       this.logger.log(
         'joystick-breakPointId:' +
-        this.moveService.users[this.user_id].breakPointId,
+          this.moveService.users[this.user_id].breakPointId,
       );
       // 有数据 [0]是rotate数据,[1-infinity]是walking数据
       this.logger.log('joystickRes-1', joystickRes);
@@ -702,14 +702,14 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
         };
         this.logger.log(
           '[media-move]: ' +
-          ', moveframeCnt: ' +
-          this.moveframeCnt +
-          ', clipPath: ' +
-          stream.clipPath +
-          ', mType: ' +
-          stream.mType +
-          ', DIR: ' +
-          stream.DIR,
+            ', moveframeCnt: ' +
+            this.moveframeCnt +
+            ', clipPath: ' +
+            stream.clipPath +
+            ', mType: ' +
+            stream.mType +
+            ', DIR: ' +
+            stream.DIR,
           // stream.metaData,
         );
         this.logger.log(
@@ -889,7 +889,8 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
 
           const IDRflag = this._rotateCurrentFame % 5 === 0 ? 1 : 3;
           this.logger.log(
-            `当前rotate ,mainframeCnt:${this.frameCnt.getValue()}, _rotateCurrentFame:${this._rotateCurrentFame
+            `当前rotate ,mainframeCnt:${this.frameCnt.getValue()}, _rotateCurrentFame:${
+              this._rotateCurrentFame
             } IDRflag:${IDRflag}`,
           );
           stream.DIR = this.rotateFirstIDR ? 1 : IDRflag;
@@ -900,12 +901,12 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
 
           this.logger.log(
             '[media-rotate]: ' +
-            ', frame: ' +
-            stream.frame +
-            ', rotateframeCnt: ' +
-            this.rotateframeCnt +
-            ', clipPath: ' +
-            stream.clipPath,
+              ', frame: ' +
+              stream.frame +
+              ', rotateframeCnt: ' +
+              this.rotateframeCnt +
+              ', clipPath: ' +
+              stream.clipPath,
             // stream.metaData,
           );
           // this.logger.log(
@@ -976,7 +977,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
                 delete redisData.mediaSrc;
                 this.logger.log(
                   `user:${this.user_id}:first render stream` +
-                  JSON.stringify({ path: clipPath, meta: redisData }),
+                    JSON.stringify({ path: clipPath, meta: redisData }),
                 );
                 const status = await this.pushFirstRender(
                   clipPath,