Browse Source

update test

gemercheung 3 years ago
parent
commit
51d496c65e
3 changed files with 179 additions and 206 deletions
  1. 5 4
      src/meta.gateway.ts
  2. 155 196
      src/scene/scene.service.ts
  3. 19 6
      src/scene/stream/stream.service.ts

+ 5 - 4
src/meta.gateway.ts

@@ -35,11 +35,12 @@ initLogger('Debug');
   path: '/ws',
 })
 export class MetaGateway
-  implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
+  implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
+{
   constructor(
     private readonly sceneService: SceneService,
     private readonly configService: ConfigService,
-  ) { }
+  ) {}
   private logger: Logger = new Logger('MetaGateway');
   private peer: PeerConnection = null;
   private timer: NodeJS.Timeout;
@@ -99,7 +100,7 @@ export class MetaGateway
       portRangeEnd: portRangeEnd,
       iceServers: stun_server,
       // enableIceTcp: true,
-      // maxMessageSize: 662144,
+      maxMessageSize: 65536,
       mtu: 1200,
     });
 
@@ -290,7 +291,7 @@ export class MetaGateway
           }
         },
       );
-    } catch (error) { }
+    } catch (error) {}
   }
 
   handleConnection(client: WebSocket, ...args: any[]) {

+ 155 - 196
src/scene/scene.service.ts

@@ -38,7 +38,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     private rotateService: RotateService,
     private moveService: MoveService,
     private getRouterService: GetRouterService, // @InjectQueue('rotate') private rotateQueue: Queue, // @InjectQueue('walking') private walkingQueue: Queue,
-  ) { }
+  ) {}
   @Client(grpcClientOptions) private readonly client: ClientGrpc;
 
   public _frameInteval: NodeJS.Timeout;
@@ -290,7 +290,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
             const rounded = Number(inMillSeconds).toFixed(3);
             this.logger.log(
               `[timer]-rotate-入队列前: ${rounded}ms -->` +
-              JSON.stringify(stream),
+                JSON.stringify(stream),
             );
             if (!this.stopRotated) {
               await this.seqExehandleRotateStream(stream);
@@ -487,16 +487,16 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     const playerAngle = newUserStates.playerState.player.angle;
     this.logger.log(
       'stop-data-0' +
-      'trace_id: ' +
-      trace_id +
-      'userId:' +
-      userId +
-      'breakPointId :' +
-      breakPointId +
-      'cameraAngle :' +
-      JSON.stringify(cameraAngle) +
-      'playerAngle: ' +
-      JSON.stringify(playerAngle),
+        'trace_id: ' +
+        trace_id +
+        'userId:' +
+        userId +
+        'breakPointId :' +
+        breakPointId +
+        'cameraAngle :' +
+        JSON.stringify(cameraAngle) +
+        'playerAngle: ' +
+        JSON.stringify(playerAngle),
     );
     //debugger;
     console.log('moveService.stop-1:' + breakPointId);
@@ -621,48 +621,6 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
                 // console.log('this', this.rewalking);
               }
             }
-            // if (this.latestWalkingRequest && this.onMoving.value) {
-            //   this.logger.log('stop-data-1', frame);
-            //   this.moveQueueSubscription.unsubscribe();
-            //   this.moveQueueSubscription = null;
-            //   this.moveQueue.clean();
-            //   //step1 执行stop方法
-            //   const metaData: StreamReplyType = frame.metaData;
-            //   const newUserStates: NewUserStatesType =
-            //     metaData.newUserStates.find(
-            //       (item) => item.userId === this.user_id,
-            //     );
-            //   const trace_id = metaData.traceIds[0];
-            //   const userId = newUserStates.userId;
-            //   const breakPointId = metaData.endBreakPointId;
-            //   const cameraAngle = newUserStates.playerState.camera.angle;
-            //   const playerAngle = newUserStates.playerState.player.angle;
-            //   this.logger.log(
-            //     'stop-data-2',
-            //     trace_id,
-            //     userId,
-            //     cameraAngle,
-            //     cameraAngle,
-            //   );
-            //   console.log('moveService.stop-2:' + breakPointId);
-            //   const redisMeta = await this.moveService.stop(
-            //     trace_id,
-            //     userId,
-            //     breakPointId,
-            //     cameraAngle,
-            //     playerAngle,
-            //   );
-            //   this.logger.log('stop-redisMeta', JSON.stringify(redisMeta));
-            //   // 2. 中断重新walking
-            //   console.log(
-            //     'walking-step-reWalking-1',
-            //     request.trace_id + ',' + this.latestWalkingRequest.trace_id,
-            //   );
-
-            //   this.logger.debug('重新行走---handleReWalking');
-            //   console.log('重新行走---handleReWalking');
-            //   this.handleReWalking(this.latestWalkingRequest);
-            // }
           }
         },
       );
@@ -702,15 +660,15 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
       console.log('进入1 - searchRoad');
       this.logger.log(
         'handleWalking-users' +
-        JSON.stringify(this.moveService.users) +
-        ' this.user_id: ' +
-        this.user_id,
+          JSON.stringify(this.moveService.users) +
+          ' this.user_id: ' +
+          this.user_id,
       );
       this.logger.log(
         'handleWalking-currentUser' +
-        JSON.stringify(user) +
-        ' this.user_id: ' +
-        this.user_id,
+          JSON.stringify(user) +
+          ' this.user_id: ' +
+          this.user_id,
       );
       console.log('path-start' + user.breakPointId);
 
@@ -765,16 +723,16 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
         if (seqs?.length) {
           this.logger.log(
             'walking --队列总览:' +
-            ' 总段数: ' +
-            walkingRes.length +
-            ' 镜头帧数:' +
-            walkingRes[0].length +
-            ' 行走段数:' +
-            (walkingRes[0]?.length
-              ? walkingRes.length - 1
-              : walkingRes.length) +
-            ' 队列总帧数:' +
-            seqs.length,
+              ' 总段数: ' +
+              walkingRes.length +
+              ' 镜头帧数:' +
+              walkingRes[0].length +
+              ' 行走段数:' +
+              (walkingRes[0]?.length
+                ? walkingRes.length - 1
+                : walkingRes.length) +
+              ' 队列总帧数:' +
+              seqs.length,
           );
           const stop = performance.now();
           const inMillSeconds = stop - start;
@@ -832,7 +790,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     //console.log('joystickRes有mediaSrc', joystickRes.mediaSrc);
     console.log(
       'handlejoystick-angle->相机角度-------------------------:' +
-      joystickRes['newUserStates'][0].playerState.camera.angle.yaw,
+        joystickRes['newUserStates'][0].playerState.camera.angle.yaw,
     );
     let streamData: StreamFrameType | StreamMetaType;
 
@@ -854,11 +812,11 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
       };
       console.log(
         'handlejoystick-hasMedia->-------------------------:' +
-        ' frame: ' +
-        streamData.frame +
-        mediaSrc +
-        '  IDR :' +
-        setDIR,
+          ' frame: ' +
+          streamData.frame +
+          mediaSrc +
+          '  IDR :' +
+          setDIR,
       );
     } else {
       streamData = {
@@ -877,8 +835,8 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     const hasPush = hasMedia
       ? await this.streamService.pushFrameToSteam(streamData as StreamFrameType)
       : await this.streamService.pushMetaDataToSteam(
-        streamData as StreamMetaType,
-      );
+          streamData as StreamMetaType,
+        );
 
     if (hasPush.done) {
       this.isJoystickHasStream = true;
@@ -886,7 +844,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
       // if (this.isJoystickHasStream) {
       //   await this.sleep(20);
       // }
-      await this.sleep(20);
+      // await this.sleep(20);
       this.frameCnt.next(hasPush.frame);
       this.moveService.sendingFrameForJoystick = false;
       const data = joystickRes as StreamReplyType;
@@ -915,9 +873,9 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
         if (complementFrame) {
           console.log(
             '20220627test-complementFrame 进行' +
-            complementFrame.mediaSrc +
-            '***' +
-            this.frameCnt.value,
+              complementFrame.mediaSrc +
+              '***' +
+              this.frameCnt.value,
           );
           // 第二次或N次进入时如果有值直接重新进入流主程
           this.holdSteam();
@@ -958,16 +916,16 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
         this.hasJoystickFocusRepeat = true;
         this.globalOptLock = true;
         this.testTimer += 1;
-        console.log('gemer-test-complementFrame-有值');
+        console.log('complementFrame-有值');
         const start = performance.now();
         this.handlePushJoyStickSteamSeq(complementFrame);
         const stop = performance.now();
-        console.log('gemer-test-handlePushJoyStickSteam', this.testTimer);
+        console.log('handlePushJoyStickSteam', this.testTimer);
         const inMillSeconds = stop - start;
         const rounded = Number(inMillSeconds).toFixed(3);
-        console.log(`gemer-test-complementFrame调用时间---->${rounded}`);
+        console.log(`complementFrame调用时间---->${rounded}`);
       } else {
-        console.log('gemer-test-complementFrame-空1');
+        console.log('complementFrame-空1');
         this.logger.log('joystick opt done');
         this.logger.log('joystick 交权给空流,当前pts', hasPush.frame);
         this.hasJoystickFocusRepeat = false;
@@ -990,7 +948,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
       const joystickRes = await this.moveService.seqExeJoystick(request);
       this.logger.log(
         'joystick-breakPointId:' +
-        this.moveService.users[this.user_id].breakPointId,
+          this.moveService.users[this.user_id].breakPointId,
       );
       // 有数据 [0]是rotate数据,[1-infinity]是walking数据
       //this.logger.log('joystickRes', JSON.stringify(joystickRes));
@@ -1025,9 +983,9 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
    * @param seqs StreamReplyType[]
    */
   handleSeqMoving(seqs: StreamReplyType[]) {
-    if (!this.moveQueueSubscription) {
-      this.handleMoveSteam();
-    }
+    // if (!this.moveQueueSubscription) {
+    //   this.handleMoveSteam();
+    // }
     // this.logger.log('moving-seqs', seqs.length);
     this.onMoving.next(true);
     this.holdSteam();
@@ -1049,100 +1007,101 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
         DIR: frame.DIR,
         mType: type,
       };
-      this.moveQueue.next(stream);
+      // this.moveQueue.next(stream);
+      this.handleMoveSteam(stream);
     });
   }
 
-  handleMoveSteam() {
-    this.moveQueueSubscription = this.moveQueue.subscribe(
-      async (stream: StreamFrameType) => {
-        try {
-          if (!this.isStopJointing) {
-            const metaData: StreamReplyType = JSON.parse(stream.metaData);
-            if (this.moveframeCnt === -1) {
-              this.moveframeCnt = this.frameCnt.getValue();
-            }
-            this.moveframeCnt += 1;
-            // this.latestBreakPointId = metaData.endBreakPointId;
-
-            const streamData: StreamFrameType = {
-              frame: this.moveframeCnt,
-              clipPath: stream.clipPath,
-              metaData: stream.metaData,
-              serverTime: this.mockserverTime,
-              DIR: stream.DIR,
-            };
-            this.logger.log(
-              '[media-move]: ' +
-              ', moveframeCnt: ' +
-              this.moveframeCnt +
-              ', clipPath: ' +
-              stream.clipPath +
-              ', mType: ' +
-              stream.mType +
-              ', DIR: ' +
-              stream.DIR,
-              // stream.metaData,
-            );
-            this.logger.log(
-              '[media-move-lastMovingPointArray]',
-              this.lastMovingPointArray?.length,
-            );
-            // 记录lastMoveStreamFrame给打断逻辑使用
-            this.lastMoveStreamFrame.next(streamData);
-            // this.lastMoveStreamFrameBk = streamData;
-            this.holdSteam();
-            // this.globalOptLock = true;
-            //console.log('20220627test:handleMoveSteam:' + stream.clipPath)
-            const res = await this.streamService.pushFrameToSteam(streamData);
-
-            const isLastFrameIndex = this.lastMovingPointArray.findIndex(
-              (item) => item.mediaSrc === metaData.mediaSrc,
-            );
-            // this.logger.log('path-update-index', isLastFrameIndex);
-
-            if (res.done) {
-              this.frameCnt.next(res.frame);
-              //关节点入库
-              if (isLastFrameIndex > -1) {
-                //this.logger.log('path-update-array', this.lastMovingPointArray);
-                const currentMeta = this.lastMovingPointArray[isLastFrameIndex];
-                const userId = this.user_id;
-                const breakPointId = currentMeta.metaData.endBreakPointId;
-                const lastReply = currentMeta.metaData;
-                this.moveService.updateUser(userId, breakPointId, lastReply);
-                this.lastMovingPointArray.splice(isLastFrameIndex, 1);
-                this.moveSliceLastFrame.next(currentMeta);
-              }
+  handleMoveSteam = seqExeAsyncFn(this.handleMoveSteamFn);
 
-              clearTimeout(this._moveTimeout);
-              this._moveTimeout = setTimeout(() => {
-                this.logger.log('move 交权给空流,当前pts', res.frame);
-                this.rewalking = false;
-                this.frameCnt.next(res.frame);
-                this.rotateframeCnt = -1;
-                this.onMoving.next(false);
-                this.onJoysticking.next(false);
-                this.lastMovingPointArray = [];
-                this.hasJoystickMoveRequest = false;
-                this.cleanMoveSteam();
-                this.globalOptLock = false;
-                this.resumeStream();
-                this.logger.log('move end');
-              }, 200);
-            } else {
-              console.error('流地址有误::', res.frame, JSON.stringify(res));
-              this.logger.error(
-                `movesteam::当前帧:${res.frame}` + JSON.stringify(res),
-              );
-              this.resumeStream();
-            }
+  async handleMoveSteamFn(stream: StreamFrameType) {
+    try {
+      if (!this.isStopJointing) {
+        const metaData: StreamReplyType = JSON.parse(stream.metaData);
+        // if (this.moveframeCnt === -1) {
+        //   this.moveframeCnt = this.frameCnt.getValue();
+        // }
+        // this.moveframeCnt += 1;
+        // this.latestBreakPointId = metaData.endBreakPointId;
+        this.moveframeCnt = this.frameCnt.value + 1;
+        const streamData: StreamFrameType = {
+          frame: this.moveframeCnt,
+          clipPath: stream.clipPath,
+          metaData: stream.metaData,
+          serverTime: this.mockserverTime,
+          DIR: stream.DIR,
+        };
+        this.logger.log(
+          '[media-move]: ' +
+            ', moveframeCnt: ' +
+            this.moveframeCnt +
+            ', clipPath: ' +
+            stream.clipPath +
+            ', mType: ' +
+            stream.mType +
+            ', DIR: ' +
+            stream.DIR,
+          // stream.metaData,
+        );
+        this.logger.log(
+          '[media-move-lastMovingPointArray]',
+          this.lastMovingPointArray?.length,
+        );
+        // 记录lastMoveStreamFrame给打断逻辑使用
+        this.lastMoveStreamFrame.next(streamData);
+        // this.lastMoveStreamFrameBk = streamData;
+        this.holdSteam();
+        // this.globalOptLock = true;
+        //console.log('20220627test:handleMoveSteam:' + stream.clipPath)
+        const res = await this.streamService.pushFrameToSteam(streamData);
+
+        const isLastFrameIndex = this.lastMovingPointArray.findIndex(
+          (item) => item.mediaSrc === metaData.mediaSrc,
+        );
+        // this.logger.log('path-update-index', isLastFrameIndex);
+
+        if (res.done) {
+          await this.sleep(20);
+          this.frameCnt.next(res.frame);
+          //关节点入库
+          if (isLastFrameIndex > -1) {
+            //this.logger.log('path-update-array', this.lastMovingPointArray);
+            const currentMeta = this.lastMovingPointArray[isLastFrameIndex];
+            const userId = this.user_id;
+            const breakPointId = currentMeta.metaData.endBreakPointId;
+            const lastReply = currentMeta.metaData;
+            this.moveService.updateUser(userId, breakPointId, lastReply);
+            this.lastMovingPointArray.splice(isLastFrameIndex, 1);
+            this.moveSliceLastFrame.next(currentMeta);
           }
-        } catch (error) {
-          this.logger.error('handleMoveSteam::error', error);
+
+          clearTimeout(this._moveTimeout);
+
+          this._moveTimeout = setTimeout(() => {
+            this.logger.log('move 交权给空流,当前pts', res.frame);
+            this.rewalking = false;
+            this.frameCnt.next(res.frame);
+            this.rotateframeCnt = -1;
+            this.onMoving.next(false);
+            this.onJoysticking.next(false);
+            this.lastMovingPointArray = [];
+            this.hasJoystickMoveRequest = false;
+            this.cleanMoveSteam();
+            this.globalOptLock = false;
+            this.resumeStream();
+            this.logger.log('move end');
+          }, 200);
+        } else {
+          console.error('流地址有误::', res.frame, JSON.stringify(res));
+          this.logger.error(
+            `movesteam::当前帧:${res.frame}` + JSON.stringify(res),
+          );
+          this.resumeStream();
         }
-      },
-    );
+      }
+    } catch (error) {
+      this.logger.error('handleMoveSteam::error', error);
+    }
   }
 
   cleanMoveSteam() {
@@ -1336,12 +1295,12 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
           this.logger.log('frame', frame);
           console.log(
             'mock' +
-            ' maxMessageSize: ' +
-            this.channel.maxMessageSize() +
-            ' bytesReceived: ' +
-            this.peer.bytesReceived() +
-            ' bytesSent: ' +
-            this.peer.bytesSent(),
+              ' maxMessageSize: ' +
+              this.channel.maxMessageSize() +
+              ' bytesReceived: ' +
+              this.peer.bytesReceived() +
+              ' bytesSent: ' +
+              this.peer.bytesSent(),
             ' state: ' + this.peer.state(),
           );
           if (frame === 1) {
@@ -1363,7 +1322,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
                 delete redisData.mediaSrc;
                 this.logger.log(
                   `user:${this.user_id}:first render stream` +
-                  JSON.stringify({ path: clipPath, meta: redisData }),
+                    JSON.stringify({ path: clipPath, meta: redisData }),
                 );
                 const status = await this.pushFirstRender(
                   clipPath,
@@ -1392,17 +1351,17 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
 
             console.log(
               '空白流条件-->:' +
-              isOk +
-              ' onMoving: ' +
-              this.onMoving.value +
-              ' onRotating: ' +
-              this.onRotating.value +
-              ' onJoysticking: ' +
-              this.onJoysticking.value +
-              ' onSteaming: ' +
-              this.onSteaming +
-              ' firstRender: ' +
-              this.firstRender,
+                isOk +
+                ' onMoving: ' +
+                this.onMoving.value +
+                ' onRotating: ' +
+                this.onRotating.value +
+                ' onJoysticking: ' +
+                this.onJoysticking.value +
+                ' onSteaming: ' +
+                this.onSteaming +
+                ' firstRender: ' +
+                this.firstRender,
             );
           }
 

+ 19 - 6
src/scene/stream/stream.service.ts

@@ -6,6 +6,19 @@ import * as streamBuffers from 'stream-buffers';
 import { BehaviorSubject } from 'rxjs';
 import { join } from 'path';
 
+const seqExeAsyncFn = (asyncFn) => {
+  let runPromise = null;
+  return function seq(...args) {
+    if (!runPromise) {
+      runPromise = asyncFn.apply(this, args);
+      runPromise.then(() => (runPromise = null));
+      return runPromise;
+    } else {
+      return runPromise.then(() => seq.apply(this, args));
+    }
+  };
+};
+
 @Injectable()
 export class StreamService {
   private channel: DataChannel;
@@ -206,7 +219,7 @@ export class StreamService {
 
           // this.logger.log('statusPack', statusPack);
           if (this.channel && this.channel.isOpen()) {
-            const isPush = this.channel.sendMessageBinary(
+            const isPush = await this.channel.sendMessageBinary(
               Buffer.from(framePack.buffer),
             );
             if (!isPush) {
@@ -246,7 +259,7 @@ export class StreamService {
         console.log(
           'stream-size %s : frame: %s, IDR: %s',
           ((coordBuff.byteLength + clipBuffer.byteLength) / 1024).toFixed(2) +
-          ' KB',
+            ' KB',
           frame,
           dir,
         );
@@ -258,9 +271,9 @@ export class StreamService {
   }
 
   /**
- *  stream core push with block  stream
- * @param stream   meta Json and stream
- */
+   *  stream core push with block  stream
+   * @param stream   meta Json and stream
+   */
 
   pushFrameToSteam1(stream: StreamFrameType): Promise<StreamPushResponse> {
     return new Promise((resolve, reject) => {
@@ -321,7 +334,7 @@ export class StreamService {
         console.log(
           'stream-size %s : frame: %s, IDR: %s',
           ((coordBuff.byteLength + clipBuffer.byteLength) / 1024).toFixed(2) +
-          ' KB',
+            ' KB',
           frame,
           dir,
         );