Просмотр исходного кода

Merge branch 'jointWorker-noque' of http://192.168.0.115:3000/zhangyupeng/meta-server into jointWorker-noque

test pc 3 лет назад
Родитель
Сommit
53001efbd1

+ 2 - 1
config.production1.yaml

@@ -36,7 +36,8 @@ redis:
 #  server: ['stun:4dage:4dage168@47.107.125.202:3478','stun:120.24.252.95:3478']
 # ['turn:4dage:4dage168@turn.4dage.com:4478', 'stun:stun.callwithus.com:3478']
 stun:
-  server: ['stun:stun.callwithus.com:3478', 'stun:47.107.125.202:3478']
+  # server: ['stun:stun.callwithus.com:3478', 'stun:47.107.125.202:3478']
+  server: ['175.178.12.158:3478','106.55.177.178:3478']
   portRangeBegin: 49152
   portRangeEnd: 65535
 

+ 8 - 6
src/meta.gateway.ts

@@ -35,12 +35,11 @@ initLogger('Debug');
   path: '/ws',
 })
 export class MetaGateway
-  implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
-{
+  implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
   constructor(
     private readonly sceneService: SceneService,
     private readonly configService: ConfigService,
-  ) {}
+  ) { }
   private logger: Logger = new Logger('MetaGateway');
   private peer: PeerConnection = null;
   private timer: NodeJS.Timeout;
@@ -100,7 +99,7 @@ export class MetaGateway
       portRangeEnd: portRangeEnd,
       iceServers: stun_server,
       // enableIceTcp: true,
-      maxMessageSize: 662144,
+      // maxMessageSize: 662144,
       mtu: 1200,
     });
 
@@ -183,7 +182,7 @@ export class MetaGateway
 
     this.gameChanel = this.peer.createDataChannel('game-input', {
       // ordered: true,
-      // negotiated: false
+      // negotiated: false,
     });
 
     this.peer.onDataChannel((dc) => {
@@ -291,7 +290,7 @@ export class MetaGateway
           }
         },
       );
-    } catch (error) {}
+    } catch (error) { }
   }
 
   handleConnection(client: WebSocket, ...args: any[]) {
@@ -302,8 +301,11 @@ export class MetaGateway
     console.log('useId', params.get('userId'));
     console.log('roomId', params.get('roomId'));
 
+    console.log('reconnect', params.get('reconnect'));
+
     this.user_id = params.get('userId');
     this.roomId = params.get('roomId');
+    const reconnect = params.get('reconnect');
     this.sceneService.setConfig(this.user_id, this.roomId);
 
     this.logger.log(`Client connected:`);

+ 39 - 18
src/scene/scene.service.ts

@@ -251,7 +251,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     //   // debugger;
     // }
     this.handleRotate(request);
-    // this._rotateCount += 1;
+    this._rotateCount += 1;
   }
   /**
    * rotate请求队列
@@ -262,8 +262,8 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     const rotateUnlock = this.firstRender && !this.globalOptLock;
     console.log('rotateUnlock条件--->' + rotateUnlock, this.globalOptLock);
 
-    if (rotateUnlock) {
-      //console.log('20220627test:handleRotate')
+    if (rotateUnlock && this._rotateCount > 2) {
+      console.log('20220627test:handleRotate');
       const start = performance.now();
       // 当move时处理 _rotateCount是移动端同时触发的问题,rotateStopThrottle是减少重复抖动stop的处理。
       this.holdSteam();
@@ -365,6 +365,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
         this.logger.log('rotate end', Date.now());
         this.rotateframeCnt = -1;
         this._rotateCountFame = -1;
+        this._rotateCount = 0;
         this.latestRotateRequest = null;
         this.rotateFirstIDR = true;
         this.resumeStream();
@@ -455,7 +456,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
    */
   async handleRotateOrWalkingStop(request): Promise<boolean> {
     this.rotateStopThrottle = true;
-    this.isStopJointing = true
+    this.isStopJointing = true;
     const lastStreamFrame = this.lastMoveStreamFrame.getValue();
     this.logger.log(
       'handleRotateOrWalkingStop-frame',
@@ -523,7 +524,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
       if (hasPush.done) {
         //console.log('20220627test:handleRotateOrWalkingStop-stop:'+streamData.clipPath+'**'+streamData.frame);
         this.frameCnt.next(hasPush.frame);
-        this.isStopJointing = false
+        this.isStopJointing = false;
         // this.onMoving.next(false);
         // this.cleanMoveSteam();
         return Promise.resolve(true);
@@ -823,11 +824,11 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
    * joystick 二合一推流
    * @param joystickRes StreamMetaType | StreamFrameType;
    */
-  handlePushJoyStickSteamSeq = seqExeAsyncFn(this.handlePushJoyStickSteam)
+  handlePushJoyStickSteamSeq = seqExeAsyncFn(this.handlePushJoyStickSteam);
 
   async handlePushJoyStickSteam(joystickRes: StreamReplyType) {
     this.holdSteam();
-    this.globalOptLock = true
+    this.globalOptLock = true;
 
     //console.log('joystickRes有mediaSrc', joystickRes.mediaSrc);
     console.log(
@@ -836,10 +837,8 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     );
     let streamData: StreamFrameType | StreamMetaType;
 
-    // if (this.joystickFrameCnt === -1) {
     this.joystickFrameCnt = this.frameCnt.getValue() + 1;
-    // }
-    // this.joystickFrameCnt += 1;
+
     const hasMedia = joystickRes?.mediaSrc && joystickRes?.mediaSrc.length > 0;
 
     if (hasMedia) {
@@ -854,7 +853,14 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
         serverTime: this.mockserverTime,
         DIR: setDIR,
       };
-
+      console.log(
+        'handlejoystick-hasMedia->-------------------------:' +
+        ' frame: ' +
+        streamData.frame +
+        mediaSrc +
+        '  IDR :' +
+        setDIR,
+      );
     } else {
       streamData = {
         frame: this.joystickFrameCnt,
@@ -862,7 +868,10 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
       };
     }
 
-    console.log('20220627test-complementFrame 进行1' + '***' + streamData.frame+','+JSON.parse(streamData.metaData).mediaSrc+','+this.moveService.cameraInfos.length);
+    console.log(
+      '20220627test-complementFrame 进行1' + '***' + streamData.frame,
+    );
+
     // 过滤新东西, 推完给回false
     this.moveService.sendingFrameForJoystick = true;
 
@@ -878,6 +887,8 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
       // if (this.isJoystickHasStream) {
       //   await this.sleep(20);
       // }
+      await this.sleep(20);
+      this.frameCnt.next(hasPush.frame);
       this.moveService.sendingFrameForJoystick = false;
       const data = joystickRes as StreamReplyType;
       console.log('handlejoystick-isIDR:' + data.isIDR);
@@ -890,7 +901,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
       //   //const lastReply = JSON.stringify(joystickRes);
       //   //this.moveService.updateUser(userId, breakPointId, lastReply);
       // }
-      this.frameCnt.next(hasPush.frame);
+
       /**
        * 这个complementFrame 具体说明 pools 是complementFrame这个返回值
        * 1. 第一次要在200ms后调用, 如有值(pools) 就要返回主流程执行,但设置hasJoystickFocusRepeat为true
@@ -903,18 +914,24 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
           this.user_id,
         ) as StreamReplyType;
         if (complementFrame) {
-          console.log('20220627test-complementFrame 进行' + complementFrame.mediaSrc + '***' + this.frameCnt.value);
+          console.log(
+            '20220627test-complementFrame 进行' +
+            complementFrame.mediaSrc +
+            '***' +
+            this.frameCnt.value,
+          );
           // 第二次或N次进入时如果有值直接重新进入流主程
+          this.holdSteam();
           this.handlePushJoyStickSteamSeq(complementFrame);
           this.globalOptLock = true;
         } else {
           console.log('20220627test-complementFrame 结束');
           // 第二次或N次无pool数据再次trigger handleJoystickStop
-          console.log('gemer-test-complementFrame-空2');
           this.hasJoystickFocusRepeat = false;
           this.testTimer = 0;
           //this.handleJoystickStop(hasPush);
           this.globalOptLock = false;
+          this.resumeStream();
         }
       } else {
         this.handleJoystickStop(hasPush);
@@ -990,11 +1007,12 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
         //     ),
         // );
         if (!this.onMoving.getValue()) {
+          console.log('handlejoystick:data', JSON.stringify(joystickRes));
           this.handlePushJoyStickSteamSeq(joystickRes);
         }
       } else {
         console.log('handlejoystick:null');
-        // this.onJoysticking.next(false);
+        this.onJoysticking.next(false);
       }
     } catch (error) {
       console.error('joystick错误', error);
@@ -1085,7 +1103,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
             // this.logger.log('path-update-index', isLastFrameIndex);
 
             if (res.done) {
-              this.frameCnt.next(res.frame)
+              this.frameCnt.next(res.frame);
               //关节点入库
               if (isLastFrameIndex > -1) {
                 //this.logger.log('path-update-array', this.lastMovingPointArray);
@@ -1158,7 +1176,10 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     // this.startStream();
     // this.handleStream();
     this.channel.onBufferedAmountLow(() => {
-      console.log('onBufferedAmountLow');
+      console.error('onBufferedAmountLow', this.channel.bufferedAmount());
+      //64k->65536 128k->131072
+      this.channel.setBufferedAmountLowThreshold(65536);
+      this.logger.error('onBufferedAmountLow', this.channel.bufferedAmount());
     });
   }
 

+ 1 - 0
src/scene/stream/stream.d.ts

@@ -92,4 +92,5 @@ interface StreamReplyType {
 interface StreamPushResponse {
   frame: number;
   done: boolean;
+  clipPath?: string;
 }

+ 159 - 3
src/scene/stream/stream.service.ts

@@ -4,7 +4,6 @@ import { DataChannel } from 'node-datachannel';
 import { existsSync, readFileSync } from 'fs';
 import * as streamBuffers from 'stream-buffers';
 import { BehaviorSubject } from 'rxjs';
-import { CacheService } from 'src/cache/cache.service';
 import { join } from 'path';
 
 @Injectable()
@@ -28,6 +27,8 @@ export class StreamService {
   closeChannel() {
     this.channel = null;
   }
+
+  public sleep = (ms: number) => new Promise((r) => setTimeout(r, ms));
   /**
    * stream core push normal stream
    * @param data meta Json
@@ -107,6 +108,161 @@ export class StreamService {
    */
 
   pushFrameToSteam(stream: StreamFrameType): Promise<StreamPushResponse> {
+    return new Promise(async (resolve, reject) => {
+      try {
+        // let start, stop;
+        const start = performance.now();
+        //TODO process.env 开发路径
+        let clipPath: string;
+        if (process.env.NODE_ENV === 'development') {
+          const src = stream.clipPath.replace('/mnt/metaverse/scene', '');
+          const srcTmp = join(__dirname, `../ws/${src}`);
+          clipPath = srcTmp;
+        } else {
+          clipPath = stream.clipPath;
+        }
+        // 增加不存在帧数据中断数据,原因有太多不准确的路径。
+        // 其次其他地方会拿这里的最后一帧数据会出错,由于上流数据很多不稳定问题尽可保流的稳定性。
+        if (!existsSync(clipPath)) {
+          this.logger.error('不存在的推流路径::' + clipPath);
+          console.error('不存在的推流路径::' + clipPath);
+          return resolve({ frame: stream.frame, done: false });
+        }
+        if (stream.frame < 0) {
+          this.logger.error('不存在的帧位::' + stream.frame);
+          console.error('不存在的帧位::' + stream.frame);
+          return resolve({ frame: stream.frame, done: false });
+        }
+        // const clipPath = stream.clipPath;
+        const metaData = stream.metaData || '{}';
+        const frame = stream.frame;
+        const serverTime = stream.serverTime || 754871824;
+        const dir = stream.DIR || 1;
+
+        this.lastStreamFrame.next({
+          clipPath: stream.clipPath,
+          metaData: metaData,
+          frame: frame,
+          serverTime: serverTime,
+          DIR: dir,
+        });
+
+        const metaDataString = metaData.replace(/\s/g, '');
+        const coordBuff = Buffer.from(metaDataString, 'utf-8');
+        const clipBuffer = readFileSync(clipPath);
+
+        const allData = Buffer.concat([coordBuff, clipBuffer]);
+
+        const slices = Math.floor(
+          allData.byteLength / (this.chunk_size - this.block),
+        );
+
+        let steamByteLength = 0;
+
+        for (let i = 0; i <= slices; i++) {
+          this.onSteaming.next(true);
+          const startSlot = i * (this.chunk_size - this.block);
+          const sliceLength =
+            i === slices
+              ? allData.byteLength
+              : (i + 1) * (this.chunk_size - this.block);
+
+          const currentSlice = allData.slice(startSlot, sliceLength);
+
+          // console.log('startSlot', startSlot);
+          // console.log('sliceLength', sliceLength);
+
+          const blockBuffStart = Buffer.alloc(this.block);
+          const packBuffer = Buffer.concat([blockBuffStart, currentSlice]);
+          const framePack = new DataView(
+            packBuffer.buffer.slice(
+              packBuffer.byteOffset,
+              packBuffer.byteOffset + packBuffer.byteLength,
+            ),
+          );
+
+          // 16 bit slot
+          // framePack.setUint32(4)
+          framePack.setUint16(6, this.block);
+          framePack.setUint16(8, frame); // first render cnt
+          framePack.setUint16(10, dir); // isDIR
+          framePack.setUint16(24, 0);
+          framePack.setUint16(26, 0);
+
+          // 32 bit slot
+          // statusPack.setUint32(12, buff.byteLength);
+          // this.logger.log('metaLen', coordBuff.byteLength);
+          // this.logger.log('metaLen', clipBuffer.byteLength);
+          // console.log('steamByteLength', steamByteLength);
+          framePack.setUint32(0, 1437227610);
+          framePack.setUint32(12, coordBuff.byteLength); // metaLen
+          framePack.setUint32(16, clipBuffer.byteLength); // mediaLen
+          framePack.setUint32(20, serverTime); //server_time
+          framePack.setUint32(24, 0);
+          framePack.setUint32(28, 0);
+          framePack.setUint32(this.block - 4, steamByteLength);
+          steamByteLength += currentSlice.byteLength;
+          // const isLastFrame = framePack.byteLength - this.chunk_size < 0;
+
+          // this.logger.log('statusPack', statusPack);
+          if (this.channel && this.channel.isOpen()) {
+            const isPush = this.channel.sendMessageBinary(
+              Buffer.from(framePack.buffer),
+            );
+            if (!isPush) {
+              await this.sleep(5);
+              console.error('流测试推不成功-再试', isPush);
+            }
+
+            if (i === slices) {
+              this.onSteaming.next(false);
+              steamByteLength = 0;
+              if (isPush) {
+                // debugger;
+                const stop = performance.now();
+                const inMillSeconds = stop - start;
+                const rounded = Number(inMillSeconds).toFixed(3);
+                this.logger.log(
+                  `[timer]-当前流:${stream.clipPath}流耗时-->${rounded}ms`,
+                );
+                return resolve({
+                  frame: stream.frame,
+                  done: true,
+                  clipPath: stream.clipPath,
+                });
+              } else {
+                return resolve({
+                  frame: stream.frame,
+                  done: false,
+                  clipPath: stream.clipPath,
+                });
+              }
+            }
+          } else {
+            return resolve({ frame: stream.frame, done: false });
+          }
+        }
+        // let steamByteLength = 0;
+        console.log(
+          'stream-size %s : frame: %s, IDR: %s',
+          ((coordBuff.byteLength + clipBuffer.byteLength) / 1024).toFixed(2) +
+          ' KB',
+          frame,
+          dir,
+        );
+      } catch (error) {
+        this.logger.error(error);
+        return reject({ frame: stream.frame, done: false });
+      }
+    });
+  }
+
+  /**
+ *  stream core push with block  stream
+ * @param stream   meta Json and stream
+ */
+
+  pushFrameToSteam1(stream: StreamFrameType): Promise<StreamPushResponse> {
     return new Promise((resolve, reject) => {
       try {
         // let start, stop;
@@ -155,7 +311,7 @@ export class StreamService {
         const clipBuffer = readFileSync(clipPath);
 
         const steam = new streamBuffers.ReadableStreamBuffer({
-          frequency: 1, // in milliseconds.
+          frequency: 0, // in milliseconds.
           chunkSize: this.chunk_size - this.block, // in bytes.
         });
         steam.put(coordBuff);
@@ -165,7 +321,7 @@ export class StreamService {
         console.log(
           'stream-size %s : frame: %s, IDR: %s',
           ((coordBuff.byteLength + clipBuffer.byteLength) / 1024).toFixed(2) +
-            ' KB',
+          ' KB',
           frame,
           dir,
         );