Browse Source

Merge branch 'master' into jointWorker-noque

test pc 3 years ago
parent
commit
36611745ca
5 changed files with 151 additions and 160 deletions
  1. 1 1
      config.production1.yaml
  2. 3 2
      src/meta.gateway.ts
  3. 103 139
      src/scene/scene.service.ts
  4. 42 18
      src/scene/stream/stream.service.ts
  5. 2 0
      start.sh

+ 1 - 1
config.production1.yaml

@@ -37,7 +37,7 @@ redis:
 # ['turn:4dage:4dage168@turn.4dage.com:4478', 'stun:stun.callwithus.com:3478']
 stun:
   # server: ['stun:stun.callwithus.com:3478', 'stun:47.107.125.202:3478']
-  server: ['175.178.12.158:3478','106.55.177.178:3478']
+  server: ['stun:175.178.12.158:3478','stun:106.55.177.178:3478']
   portRangeBegin: 49152
   portRangeEnd: 65535
 

+ 3 - 2
src/meta.gateway.ts

@@ -99,8 +99,9 @@ export class MetaGateway
       portRangeEnd: portRangeEnd,
       iceServers: stun_server,
       // enableIceTcp: true,
-      // maxMessageSize: 662144,
-      mtu: 1200,
+      // maxMessageSize: 65535,
+      // maxMessageSize: 16000, // 16k
+      mtu: 1500,
     });
 
     this.peer.onLocalDescription((sdp, type) => {

+ 103 - 139
src/scene/scene.service.ts

@@ -277,6 +277,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
           if (src.length > 0) {
             this.lastRenderMedia = src;
             const clipPath = this.configService.get('app.prefix') + src;
+            delete redisMeta.mediaSrc;
             const stream: StreamFrameType = {
               frame: -1,
               clipPath: clipPath,
@@ -621,48 +622,6 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
                 // console.log('this', this.rewalking);
               }
             }
-            // if (this.latestWalkingRequest && this.onMoving.value) {
-            //   this.logger.log('stop-data-1', frame);
-            //   this.moveQueueSubscription.unsubscribe();
-            //   this.moveQueueSubscription = null;
-            //   this.moveQueue.clean();
-            //   //step1 执行stop方法
-            //   const metaData: StreamReplyType = frame.metaData;
-            //   const newUserStates: NewUserStatesType =
-            //     metaData.newUserStates.find(
-            //       (item) => item.userId === this.user_id,
-            //     );
-            //   const trace_id = metaData.traceIds[0];
-            //   const userId = newUserStates.userId;
-            //   const breakPointId = metaData.endBreakPointId;
-            //   const cameraAngle = newUserStates.playerState.camera.angle;
-            //   const playerAngle = newUserStates.playerState.player.angle;
-            //   this.logger.log(
-            //     'stop-data-2',
-            //     trace_id,
-            //     userId,
-            //     cameraAngle,
-            //     cameraAngle,
-            //   );
-            //   console.log('moveService.stop-2:' + breakPointId);
-            //   const redisMeta = await this.moveService.stop(
-            //     trace_id,
-            //     userId,
-            //     breakPointId,
-            //     cameraAngle,
-            //     playerAngle,
-            //   );
-            //   this.logger.log('stop-redisMeta', JSON.stringify(redisMeta));
-            //   // 2. 中断重新walking
-            //   console.log(
-            //     'walking-step-reWalking-1',
-            //     request.trace_id + ',' + this.latestWalkingRequest.trace_id,
-            //   );
-
-            //   this.logger.debug('重新行走---handleReWalking');
-            //   console.log('重新行走---handleReWalking');
-            //   this.handleReWalking(this.latestWalkingRequest);
-            // }
           }
         },
       );
@@ -887,7 +846,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
       // if (this.isJoystickHasStream) {
       //   await this.sleep(20);
       // }
-      await this.sleep(20);
+      // await this.sleep(20);
       this.frameCnt.next(hasPush.frame);
       this.moveService.sendingFrameForJoystick = false;
       const data = joystickRes as StreamReplyType;
@@ -959,16 +918,16 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
         this.hasJoystickFocusRepeat = true;
         this.globalOptLock = true;
         this.testTimer += 1;
-        console.log('gemer-test-complementFrame-有值');
+        console.log('complementFrame-有值');
         const start = performance.now();
         this.handlePushJoyStickSteamSeq(complementFrame);
         const stop = performance.now();
-        console.log('gemer-test-handlePushJoyStickSteam', this.testTimer);
+        console.log('handlePushJoyStickSteam', this.testTimer);
         const inMillSeconds = stop - start;
         const rounded = Number(inMillSeconds).toFixed(3);
-        console.log(`gemer-test-complementFrame调用时间---->${rounded}`);
+        console.log(`complementFrame调用时间---->${rounded}`);
       } else {
-        console.log('gemer-test-complementFrame-空1');
+        console.log('complementFrame-空1');
         this.logger.log('joystick opt done');
         this.logger.log('joystick 交权给空流,当前pts', hasPush.frame);
         this.hasJoystickFocusRepeat = false;
@@ -1026,9 +985,9 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
    * @param seqs StreamReplyType[]
    */
   handleSeqMoving(seqs: StreamReplyType[]) {
-    if (!this.moveQueueSubscription) {
-      this.handleMoveSteam();
-    }
+    // if (!this.moveQueueSubscription) {
+    //   this.handleMoveSteam();
+    // }
     // this.logger.log('moving-seqs', seqs.length);
     this.onMoving.next(true);
     this.holdSteam();
@@ -1050,100 +1009,105 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
         DIR: frame.DIR,
         mType: type,
       };
-      this.moveQueue.next(stream);
+      // this.moveQueue.next(stream);
+      this.handleMoveSteam(stream);
     });
   }
 
-  handleMoveSteam() {
-    this.moveQueueSubscription = this.moveQueue.subscribe(
-      async (stream: StreamFrameType) => {
-        try {
-          if (!this.isStopJointing) {
-            const metaData: StreamReplyType = JSON.parse(stream.metaData);
-            if (this.moveframeCnt === -1) {
-              this.moveframeCnt = this.frameCnt.getValue();
-            }
-            this.moveframeCnt += 1;
-            // this.latestBreakPointId = metaData.endBreakPointId;
+  handleMoveSteam = seqExeAsyncFn(this.handleMoveSteamFn);
 
-            const streamData: StreamFrameType = {
-              frame: this.moveframeCnt,
-              clipPath: stream.clipPath,
-              metaData: stream.metaData,
-              serverTime: this.mockserverTime,
-              DIR: stream.DIR,
-            };
-            this.logger.log(
-              '[media-move]: ' +
-              ', moveframeCnt: ' +
-              this.moveframeCnt +
-              ', clipPath: ' +
-              stream.clipPath +
-              ', mType: ' +
-              stream.mType +
-              ', DIR: ' +
-              stream.DIR,
-              // stream.metaData,
-            );
-            this.logger.log(
-              '[media-move-lastMovingPointArray]',
-              this.lastMovingPointArray?.length,
-            );
-            // 记录lastMoveStreamFrame给打断逻辑使用
-            this.lastMoveStreamFrame.next(streamData);
-            // this.lastMoveStreamFrameBk = streamData;
-            this.holdSteam();
-            // this.globalOptLock = true;
-            //console.log('20220627test:handleMoveSteam:' + stream.clipPath)
-            const res = await this.streamService.pushFrameToSteam(streamData);
-
-            const isLastFrameIndex = this.lastMovingPointArray.findIndex(
-              (item) => item.mediaSrc === metaData.mediaSrc,
-            );
-            // this.logger.log('path-update-index', isLastFrameIndex);
-
-            if (res.done) {
-              this.frameCnt.next(res.frame);
-              //关节点入库
-              if (isLastFrameIndex > -1) {
-                //this.logger.log('path-update-array', this.lastMovingPointArray);
-                const currentMeta = this.lastMovingPointArray[isLastFrameIndex];
-                const userId = this.user_id;
-                const breakPointId = currentMeta.metaData.endBreakPointId;
-                const lastReply = currentMeta.metaData;
-                this.moveService.updateUser(userId, breakPointId, lastReply);
-                this.lastMovingPointArray.splice(isLastFrameIndex, 1);
-                this.moveSliceLastFrame.next(currentMeta);
-              }
-
-              clearTimeout(this._moveTimeout);
-              this._moveTimeout = setTimeout(() => {
-                this.logger.log('move 交权给空流,当前pts', res.frame);
-                this.rewalking = false;
-                this.frameCnt.next(res.frame);
-                this.rotateframeCnt = -1;
-                this.onMoving.next(false);
-                this.onJoysticking.next(false);
-                this.lastMovingPointArray = [];
-                this.hasJoystickMoveRequest = false;
-                this.cleanMoveSteam();
-                this.globalOptLock = false;
-                this.resumeStream();
-                this.logger.log('move end');
-              }, 200);
-            } else {
-              console.error('流地址有误::', res.frame, JSON.stringify(res));
-              this.logger.error(
-                `movesteam::当前帧:${res.frame}` + JSON.stringify(res),
-              );
-              this.resumeStream();
-            }
+  async handleMoveSteamFn(stream: StreamFrameType) {
+    try {
+      if (!this.isStopJointing) {
+        const metaData: StreamReplyType = JSON.parse(stream.metaData);
+        // if (this.moveframeCnt === -1) {
+        //   this.moveframeCnt = this.frameCnt.getValue();
+        // }
+        // this.moveframeCnt += 1;
+        // this.latestBreakPointId = metaData.endBreakPointId;
+        this.moveframeCnt = this.frameCnt.value + 1;
+        const streamData: StreamFrameType = {
+          frame: this.moveframeCnt,
+          clipPath: stream.clipPath,
+          metaData: stream.metaData,
+          serverTime: this.mockserverTime,
+          DIR: stream.DIR,
+        };
+        this.logger.log(
+          '[media-move]: ' +
+          ', moveframeCnt: ' +
+          this.moveframeCnt +
+          ', clipPath: ' +
+          stream.clipPath +
+          ', mType: ' +
+          stream.mType +
+          ', DIR: ' +
+          stream.DIR,
+          // stream.metaData,
+        );
+        this.logger.log(
+          '[media-move-lastMovingPointArray]',
+          this.lastMovingPointArray?.length,
+        );
+        // 记录lastMoveStreamFrame给打断逻辑使用
+        this.lastMoveStreamFrame.next(streamData);
+        // this.lastMoveStreamFrameBk = streamData;
+        this.holdSteam();
+        // this.globalOptLock = true;
+        //console.log('20220627test:handleMoveSteam:' + stream.clipPath)
+        const frameTimeStart = performance.now();
+        const res = await this.streamService.pushFrameToSteam(streamData);
+
+        const isLastFrameIndex = this.lastMovingPointArray.findIndex(
+          (item) => item.mediaSrc === metaData.mediaSrc,
+        );
+        // this.logger.log('path-update-index', isLastFrameIndex);
+
+        if (res.done) {
+          const frameTimeEnd = performance.now();
+          const frameAverage = frameTimeEnd - frameTimeStart;
+          console.log('walking-frameAverage', frameAverage);
+          await this.sleep(40);
+          this.frameCnt.next(res.frame);
+          //关节点入库
+          if (isLastFrameIndex > -1) {
+            //this.logger.log('path-update-array', this.lastMovingPointArray);
+            const currentMeta = this.lastMovingPointArray[isLastFrameIndex];
+            const userId = this.user_id;
+            const breakPointId = currentMeta.metaData.endBreakPointId;
+            const lastReply = currentMeta.metaData;
+            this.moveService.updateUser(userId, breakPointId, lastReply);
+            this.lastMovingPointArray.splice(isLastFrameIndex, 1);
+            this.moveSliceLastFrame.next(currentMeta);
           }
-        } catch (error) {
-          this.logger.error('handleMoveSteam::error', error);
+
+          clearTimeout(this._moveTimeout);
+
+          this._moveTimeout = setTimeout(() => {
+            this.logger.log('move 交权给空流,当前pts', res.frame);
+            this.rewalking = false;
+            this.frameCnt.next(res.frame);
+            this.rotateframeCnt = -1;
+            this.onMoving.next(false);
+            this.onJoysticking.next(false);
+            this.lastMovingPointArray = [];
+            this.hasJoystickMoveRequest = false;
+            this.cleanMoveSteam();
+            this.globalOptLock = false;
+            this.resumeStream();
+            this.logger.log('move end');
+          }, 200);
+        } else {
+          console.error('流地址有误::', res.frame, JSON.stringify(res));
+          this.logger.error(
+            `movesteam::当前帧:${res.frame}` + JSON.stringify(res),
+          );
+          this.resumeStream();
         }
-      },
-    );
+      }
+    } catch (error) {
+      this.logger.error('handleMoveSteam::error', error);
+    }
   }
 
   cleanMoveSteam() {
@@ -1178,7 +1142,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     this.channel.onBufferedAmountLow(() => {
       console.error('onBufferedAmountLow', this.channel.bufferedAmount());
       //64k->65536 128k->131072
-      this.channel.setBufferedAmountLowThreshold(65536);
+      this.channel.setBufferedAmountLowThreshold(262144);
       this.logger.error('onBufferedAmountLow', this.channel.bufferedAmount());
     });
   }

+ 42 - 18
src/scene/stream/stream.service.ts

@@ -6,6 +6,19 @@ import * as streamBuffers from 'stream-buffers';
 import { BehaviorSubject } from 'rxjs';
 import { join } from 'path';
 
+const seqExeAsyncFn = (asyncFn) => {
+  let runPromise = null;
+  return function seq(...args) {
+    if (!runPromise) {
+      runPromise = asyncFn.apply(this, args);
+      runPromise.then(() => (runPromise = null));
+      return runPromise;
+    } else {
+      return runPromise.then(() => seq.apply(this, args));
+    }
+  };
+};
+
 @Injectable()
 export class StreamService {
   private channel: DataChannel;
@@ -106,6 +119,9 @@ export class StreamService {
    *  stream core push with block  stream
    * @param stream   meta Json and stream
    */
+  // pushFrameToSteam: Promise<StreamPushResponse> = seqExeAsyncFn(
+  //   this.pushFrameToSteamFn,
+  // ) as any as Promise<StreamPushResponse>;
 
   pushFrameToSteam(stream: StreamFrameType): Promise<StreamPushResponse> {
     return new Promise(async (resolve, reject) => {
@@ -158,6 +174,13 @@ export class StreamService {
         );
 
         let steamByteLength = 0;
+        console.log(
+          'stream-size %s : frame: %s, IDR: %s',
+          ((coordBuff.byteLength + clipBuffer.byteLength) / 1024).toFixed(2) +
+          ' KB',
+          frame,
+          dir,
+        );
 
         for (let i = 0; i <= slices; i++) {
           this.onSteaming.next(true);
@@ -169,9 +192,6 @@ export class StreamService {
 
           const currentSlice = allData.slice(startSlot, sliceLength);
 
-          // console.log('startSlot', startSlot);
-          // console.log('sliceLength', sliceLength);
-
           const blockBuffStart = Buffer.alloc(this.block);
           const packBuffer = Buffer.concat([blockBuffStart, currentSlice]);
           const framePack = new DataView(
@@ -203,15 +223,25 @@ export class StreamService {
           framePack.setUint32(this.block - 4, steamByteLength);
           steamByteLength += currentSlice.byteLength;
           // const isLastFrame = framePack.byteLength - this.chunk_size < 0;
+          let isPush: boolean;
 
           // this.logger.log('statusPack', statusPack);
           if (this.channel && this.channel.isOpen()) {
-            const isPush = this.channel.sendMessageBinary(
+            console.log(
+              'slice--info:::切片信息 , index: %s,length: %s',
+              i,
+              framePack.byteLength,
+            );
+            console.log('bufferedAmount', i, this.channel.bufferedAmount());
+            isPush = this.channel.sendMessageBinary(
               Buffer.from(framePack.buffer),
             );
             if (!isPush) {
-              await this.sleep(5);
-              console.error('流测试推不成功-再试', isPush);
+              // await this.sleep(10);
+              // isPush = this.channel.sendMessageBinary(
+              //   Buffer.from(framePack.buffer),
+              // );
+              console.error('流测试推不成功-二次试', i, isPush);
             }
 
             if (i === slices) {
@@ -231,9 +261,10 @@ export class StreamService {
                   clipPath: stream.clipPath,
                 });
               } else {
+                //TODO 临时强制为推送成功
                 return resolve({
                   frame: stream.frame,
-                  done: false,
+                  done: true,
                   clipPath: stream.clipPath,
                 });
               }
@@ -242,14 +273,7 @@ export class StreamService {
             return resolve({ frame: stream.frame, done: false });
           }
         }
-        // let steamByteLength = 0;
-        console.log(
-          'stream-size %s : frame: %s, IDR: %s',
-          ((coordBuff.byteLength + clipBuffer.byteLength) / 1024).toFixed(2) +
-          ' KB',
-          frame,
-          dir,
-        );
+
       } catch (error) {
         this.logger.error(error);
         return reject({ frame: stream.frame, done: false });
@@ -258,9 +282,9 @@ export class StreamService {
   }
 
   /**
- *  stream core push with block  stream
- * @param stream   meta Json and stream
- */
+   *  stream core push with block  stream
+   * @param stream   meta Json and stream
+   */
 
   pushFrameToSteam1(stream: StreamFrameType): Promise<StreamPushResponse> {
     return new Promise((resolve, reject) => {

+ 2 - 0
start.sh

@@ -0,0 +1,2 @@
+#! /bin/bash
+NODE_ENV=production pm2 start dist/main.js  -i max