Explorar o código

更新rotate,去除延时同步队列

gemercheung %!s(int64=3) %!d(string=hai) anos
pai
achega
88347a766a
Modificáronse 1 ficheiros con 163 adicións e 98 borrados
  1. 163 98
      src/scene/scene.service.ts

+ 163 - 98
src/scene/scene.service.ts

@@ -17,6 +17,18 @@ import { MoveService } from 'src/move/move.service';
 import { GetRouterService } from 'src/get-router/get-router.service';
 import { ConfigService } from '@nestjs/config';
 
+const seqExeAsyncFn = (asyncFn) => {
+  let runPromise = null;
+  return function seq(...args) {
+    if (!runPromise) {
+      runPromise = asyncFn.apply(this, args);
+      runPromise.then(() => (runPromise = null));
+      return runPromise;
+    } else {
+      return runPromise.then(() => seq.apply(this, args));
+    }
+  };
+};
 @Injectable()
 export class SceneService implements OnModuleInit, OnModuleDestroy {
   constructor(
@@ -45,6 +57,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
   private joystickFrameCnt = -1;
   private rotateFirstIDR = true;
   private rotateStopThrottle = false; //防止多次瞬间解触发
+  private rotateTimeStamp: number;
 
   private channel: DataChannel;
   private peer: PeerConnection;
@@ -62,14 +75,15 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
   private moveQueueSubscription: any;
   private walkingSub: any;
 
-  private _rotateCurrentFame = -1;
+  private _rotateCountFame = -1;
+  private _rotateStartFame = new BehaviorSubject<number>(-1);
   private _rotateCount = -1;
 
   private streamServiceSub: any;
   // private roRequestQueue: RxQueue = new DelayQueue(20);
-  private roQueue: RxQueue = new DelayQueue(
-    Number(this.configService.get('queueConfig.rotate')) || 20,
-  );
+  // private roQueue: RxQueue = new DelayQueue(
+  //   Number(this.configService.get('queueConfig.rotate')) || 20,
+  // );
   private moveQueue: RxQueue = new DelayQueue(
     Number(this.configService.get('queueConfig.move')) || 20,
   );
@@ -77,16 +91,15 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
 
   private requestIFrameQueueSub: any;
   private roRequestQueueSub: any;
-  private joystickQueueSub: any;
-  private rotateTimeStamp: number;
+
   private rewalking = false;
   private firstRender = false;
-  private latestBreakPointId: number;
-  private isHoldingStream = false;
+
   private lastMovingPointArray: MovingLastUpdateType[] = [];
   private latestRotateRequest: any; // 最新Rotate的接收值
   private latestWalkingRequest: any; // 最新waking的接收值
   private hasJoystickMoveRequest = false; // 最新joystick的接收值
+  private stopRotated = false;
 
   private moveSliceLastFrame = new BehaviorSubject<MovingLastUpdateType>(null);
   private moveSliceLastFrameSub: any;
@@ -116,7 +129,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     Number.prototype.padLeft = function (n, str) {
       return Array(n - String(this).length + 1).join(str || '0') + this;
     };
-    this.logger.log('roQueue-period :' + Number(this.roQueue.period));
+    // this.logger.log('roQueue-period :' + Number(this.roQueue.period));
     this.logger.log('moveQueue-period :' + Number(this.moveQueue.period));
   }
 
@@ -147,14 +160,12 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
 
   public holdSteam(): void {
     clearInterval(this._frameInteval);
-    this.isHoldingStream = true;
   }
 
   public resumeStream(): void {
     this.onMoving.next(false);
     this.onRotating.next(false);
     this.onJoysticking.next(false);
-    this.isHoldingStream = false;
     this.moveframeCnt = -1;
     this.rotateframeCnt = -1;
     clearInterval(this._frameInteval);
@@ -238,7 +249,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     //   // debugger;
     // }
     this.handleRotate(request);
-    this._rotateCount += 1;
+    // this._rotateCount += 1;
   }
   /**
    * rotate请求队列
@@ -246,33 +257,23 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
 
   async handleRotate(request) {
     // try {
-    const rotateKey = this.firstRender && !this.globalOptLock;
-    console.log('handleRotate条件--->' + rotateKey, this.globalOptLock);
+    const rotateUnlock = this.firstRender && !this.globalOptLock;
+    console.log('rotateUnlock条件--->' + rotateUnlock, this.globalOptLock);
 
-    if (rotateKey) {
-      if (!this.roQueueSubscription) {
-        this.handleRotateStream();
-      }
+    if (rotateUnlock) {
       const start = performance.now();
-
       // 当move时处理 _rotateCount是移动端同时触发的问题,rotateStopThrottle是减少重复抖动stop的处理。
-      this.onRotating.next(true);
-
+      this.holdSteam();
       const redisMeta: StreamReplyType = await this.rotateService.seqExeRotate(
         request,
       );
+
       if (redisMeta && 'mediaSrc' in redisMeta) {
         if (redisMeta.mediaSrc?.length) {
           const src = redisMeta.mediaSrc.split('?')[0];
-          //this.logger.log('进入roQueue1', redisMeta.newUserStates[0].playerState.camera.angle.yaw);
-          //this.logger.log('进入roQueue2', src);
           if (src.length > 0) {
-            //this.logger.log('不同源');
-            this.holdSteam();
             this.lastRenderMedia = src;
             const clipPath = this.configService.get('app.prefix') + src;
-            //TODO 临时开出
-            // delete redisMeta.mediaSrc;
             const stream: StreamFrameType = {
               frame: -1,
               clipPath: clipPath,
@@ -288,18 +289,17 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
               `[timer]-rotate-入队列前: ${rounded}ms -->` +
               JSON.stringify(stream),
             );
-
-            this.roQueue.next(stream);
+            if (!this.stopRotated) {
+              await this.seqExehandleRotateStream(stream);
+            }
+            // this.roQueue.next(stream);
           } else {
             this.onRotating.next(false);
           }
         }
       }
     } else {
-      if (this.roQueueSubscription) {
-        this.roQueueSubscription.unsubscribe();
-        this.roQueueSubscription = null;
-      }
+      return;
     }
     // } catch (error) {
     //   this.logger.error('rotate', error.message);
@@ -307,76 +307,139 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     // }
   }
 
+  handleStopRotate() {
+    this.stopRotated = true;
+  }
+
+  resumeRotate() {
+    this.stopRotated = false;
+  }
+
+  seqExehandleRotateStream = seqExeAsyncFn(this.handleRotateStream);
+
   /**
-   * rotate 推送队列
+   * rotate 推送seq(不存在队列,直推)
    */
-  handleRotateStream() {
-    if (!this.roQueueSubscription) {
-      this.roQueueSubscription = this.roQueue.subscribe(
-        async (stream: StreamFrameType) => {
-          this.rotateTimeStamp = Date.now();
-          if (this.rotateframeCnt === -1) {
-            this.rotateframeCnt = this.frameCnt.value;
-          }
-          this.rotateframeCnt += 1;
+  async handleRotateStream(stream: StreamFrameType) {
+    this.rotateTimeStamp = Date.now();
+    this.holdSteam();
+    // 在未开始前开始
+    if (!this.onRotating.value) {
+      this._rotateStartFame.next(this.frameCnt.value);
+    }
 
-          stream.frame = this.rotateframeCnt;
-          this._rotateCurrentFame += 1;
+    this.onRotating.next(true);
+    stream.frame = this.frameCnt.value + 1;
+    // 从记录第一帧到最新一帧
+    this._rotateCountFame = stream.frame - this._rotateStartFame.value;
+    const IDRflag = this._rotateCountFame % 5 === 0 ? 1 : 3;
+    stream.DIR = this.rotateFirstIDR ? 1 : IDRflag;
 
-          const IDRflag = this._rotateCurrentFame % 5 === 0 ? 1 : 3;
-          this.logger.log(
-            `当前rotate ,mainframeCnt:${this.frameCnt.getValue()}, _rotateCurrentFame:${this._rotateCurrentFame
-            } IDRflag:${IDRflag}`,
-          );
-          stream.DIR = this.rotateFirstIDR ? 1 : IDRflag;
-          if (this.rotateFirstIDR) {
-            this.rotateFirstIDR = false;
-          }
+    console.log(
+      '[旋转信息:::info--->]:clipPath: %s, frameCnt: %s,_rotateCountFame: %s, stream.frame %s , IDRflag: %s',
+      stream.clipPath,
+      // this._rotateStartFame.value,
+      this.frameCnt.value,
+      this._rotateCountFame,
+      stream.frame,
+      // this.rotateframeCnt,
+      IDRflag,
+    );
+    const res = await this.streamService.pushFrameToSteam(stream);
+    if (res.done) {
+      this.frameCnt.next(res.frame);
+      if (this.rotateFirstIDR) {
+        this.rotateFirstIDR = false;
+      }
 
-          this.logger.log(
-            '[media-rotate]: ' +
-            ', frame: ' +
-            stream.frame +
-            ', rotateframeCnt: ' +
-            this.rotateframeCnt +
-            ', clipPath: ' +
-            stream.clipPath,
-            // stream.metaData,
-          );
-          // this.logger.log(
-          //   `roQueueSubscription:frame:${this.rotateframeCnt}  ` +
-          //   JSON.stringify(stream.metaData),
-          // );
+      console.log('[旋转信息:::info:::done--->]', res);
 
-          const res = await this.streamService.pushFrameToSteam(stream);
-          if (res.done) {
-            clearTimeout(this._rotateTimeout);
-            this._rotateTimeout = setTimeout(() => {
-              this.logger.log('rotate end', Date.now());
-              this.frameCnt.next(res.frame);
-              this.rotateframeCnt = -1;
-              this._rotateCurrentFame = -1;
-              // this.onMoving.next(false);
-              // this.onRotating.next(false);
-              this.latestRotateRequest = null;
-              this.rotateFirstIDR = true;
-              this.resumeStream();
-              //TODO rotate完后清除request队列
-              if (this.roRequestQueueSub) {
-                this.roRequestQueueSub.unsubscribe();
-                this.roRequestQueueSub = null;
-              }
-            }, 100);
-          } else {
-            console.error('流地址有误::', res.frame, JSON.stringify(res));
-            this.logger.error('流地址有误::', res.frame, JSON.stringify(res));
-            this.resumeStream();
-          }
-        },
-      );
+      clearTimeout(this._rotateTimeout);
+      this._rotateTimeout = setTimeout(() => {
+        this.logger.log('rotate end', Date.now());
+        this.rotateframeCnt = -1;
+        this._rotateCountFame = -1;
+        this.latestRotateRequest = null;
+        this.rotateFirstIDR = true;
+        this.resumeStream();
+      }, 200);
+    } else {
+      console.error('流地址有误::', res.frame, JSON.stringify(res));
+      this.logger.error('流地址有误::', res.frame, JSON.stringify(res));
+      this.resumeStream();
     }
   }
 
+  // /**
+  //  * rotate 推送队列 --backup
+  //  */
+  // handleRotateStream() {
+  //   if (!this.roQueueSubscription) {
+  //     this.roQueueSubscription = this.roQueue.subscribe(
+  //       async (stream: StreamFrameType) => {
+  //         this.rotateTimeStamp = Date.now();
+  //         if (this.rotateframeCnt === -1) {
+  //           this.rotateframeCnt = this.frameCnt.value;
+  //         }
+  //         this.rotateframeCnt += 1;
+
+  //         stream.frame = this.rotateframeCnt;
+  //         this._rotateCountFame += 1;
+
+  //         const IDRflag = this._rotateCountFame % 5 === 0 ? 1 : 3;
+  //         this.logger.log(
+  //           `当前rotate ,mainframeCnt:${this.frameCnt.getValue()}, _rotateCountFame:${this._rotateCountFame
+  //           } IDRflag:${IDRflag}`,
+  //         );
+  //         stream.DIR = this.rotateFirstIDR ? 1 : IDRflag;
+  //         if (this.rotateFirstIDR) {
+  //           this.rotateFirstIDR = false;
+  //         }
+
+  //         this.logger.log(
+  //           '[media-rotate]: ' +
+  //           ', frame: ' +
+  //           stream.frame +
+  //           ', rotateframeCnt: ' +
+  //           this.rotateframeCnt +
+  //           ', clipPath: ' +
+  //           stream.clipPath,
+  //           // stream.metaData,
+  //         );
+  //         // this.logger.log(
+  //         //   `roQueueSubscription:frame:${this.rotateframeCnt}  ` +
+  //         //   JSON.stringify(stream.metaData),
+  //         // );
+
+  //         const res = await this.streamService.pushFrameToSteam(stream);
+  //         if (res.done) {
+  //           clearTimeout(this._rotateTimeout);
+  //           this._rotateTimeout = setTimeout(() => {
+  //             this.logger.log('rotate end', Date.now());
+  //             this.frameCnt.next(res.frame);
+  //             this.rotateframeCnt = -1;
+  //             this._rotateCountFame = -1;
+  //             // this.onMoving.next(false);
+  //             // this.onRotating.next(false);
+  //             this.latestRotateRequest = null;
+  //             this.rotateFirstIDR = true;
+  //             this.resumeStream();
+  //             //TODO rotate完后清除request队列
+  //             if (this.roRequestQueueSub) {
+  //               this.roRequestQueueSub.unsubscribe();
+  //               this.roRequestQueueSub = null;
+  //             }
+  //           }, 100);
+  //         } else {
+  //           console.error('流地址有误::', res.frame, JSON.stringify(res));
+  //           this.logger.error('流地址有误::', res.frame, JSON.stringify(res));
+  //           this.resumeStream();
+  //         }
+  //       },
+  //     );
+  //   }
+  // }
+
   /**
    * 旋转中断逻辑
    * 1. 行走间
@@ -447,7 +510,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
         serverTime: this.mockserverTime,
         DIR: 1,
       };
-      //推最后一个STOPFrame
+      //推最后一个 STOP Frame
       const hasPush = await this.streamService.pushFrameToSteam(streamData);
       if (hasPush.done) {
         this.frameCnt.next(hasPush.frame);
@@ -504,6 +567,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
           if (frame) {
             // console.log('unlock-Joints', JSON.stringify(frame));
             this.logger.log('Joints', JSON.stringify(frame));
+            this.resumeRotate();
             let isRotateStop = false;
             let isWalkingStop = false;
             // 在全局锁的情况下
@@ -658,6 +722,8 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
       if (walkingRes && (!this.onMoving.value || this.rewalking)) {
         // 二维数组 做move 序列, move类型
         //console.log('move-walkingRes:' + JSON.stringify(walkingRes));
+        this.handleStopRotate();
+
         if (walkingRes && walkingRes?.length >= 1) {
           for (let i = 0; i <= walkingRes.length - 1; i++) {
             Array.from(walkingRes[i]).forEach(
@@ -751,7 +817,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     //console.log('joystickRes有mediaSrc', joystickRes.mediaSrc);
     console.log(
       'handlejoystick-angle->相机角度-------------------------:' +
-        joystickRes['newUserStates'][0].playerState.camera.angle.yaw,
+      joystickRes['newUserStates'][0].playerState.camera.angle.yaw,
     );
     let streamData: StreamFrameType | StreamMetaType;
 
@@ -959,7 +1025,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
             this.moveframeCnt = this.frameCnt.getValue();
           }
           this.moveframeCnt += 1;
-          this.latestBreakPointId = metaData.endBreakPointId;
+          // this.latestBreakPointId = metaData.endBreakPointId;
 
           const streamData: StreamFrameType = {
             frame: this.moveframeCnt,
@@ -1072,7 +1138,6 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
   }
 
   handleDataChanelClose(): void {
-    this.roQueue.clean();
     this.stopStream();
     this.startSteaming.next(false);
     this.streamService.closeChannel();