Browse Source

增加优化

gemercheung 3 years ago
parent
commit
905c176ad4

+ 4 - 45
src/meta.gateway.ts

@@ -185,43 +185,17 @@ export class MetaGateway
     this.gameChanel.onOpen(() => {
       console.log('channel is open');
       this.sceneService.handleDataChanelOpen(this.gameChanel);
-
-      clearInterval(this.timer);
-
       const peers = this.peer.getSelectedCandidatePair();
       this.logger.log('配对成功', peers);
 
-      const i = 1;
-      const paths = path.join(__dirname, '../ws/video/100');
-      console.error('__dirname', __dirname);
-      console.error('paths', paths);
       if (this.gameChanel.isOpen()) {
         console.log('gameChanel', this.gameChanel.isOpen());
         this.sendWertcHeartPack(this.gameChanel);
       }
 
-      Number.prototype.padLeft = function (n, str) {
-        return Array(n - String(this).length + 1).join(str || '0') + this;
-      };
-      // this.timer = setInterval(() => {
-      //   if (i < 30) {
-      //     const steam = createReadStream(
-      //       paths + `/100.${Number(i).padLeft(4, '0')}.h264`,
-      //     );
-      //     // const steam = createReadStream(paths + `/test2`);
-      //     steam.on('data', (data: Buffer) => {
-      //       // console.log(data.buffer);
-      //       const frame = new DataView(data.buffer);
-      //       frame.setUint32(0, 1437227610);
-      //       // frame.setUint16(6, 36);
-      //       // frame.setUint16(24, 0);
-      //       // frame.setUint16(26, 0);
-      //       // frame.setUint32(28, 0);
-      //       this.gameChanel.sendMessageBinary(Buffer.from(frame.buffer));
-      //     });
-      //   }
-      //   i++;
-      // }, 1000 / 30);
+      // Number.prototype.padLeft = function (n, str) {
+      //   return Array(n - String(this).length + 1).join(str || '0') + this;
+      // };
     });
     this.gameChanel.onClosed(() => {
       console.log('gameChanel close');
@@ -278,27 +252,12 @@ export class MetaGateway
         user_id: this.user_id,
         roomId: this.roomId,
       });
-      console.log('requestPayLoad', JSON.stringify(requestPayLoad));
       this.sceneService.init(requestPayLoad);
       this.logger.log(
         'start and send to gprc sceneService,method=>init',
         requestPayLoad,
       );
 
-      // const demoVal = {
-      //   id: 'start',
-      //   data: '{"IsHost":false,"SkinID":"10089","SkinDataVersion":"1008900008","RoomTypeID":""}',
-      //   room_id: 'e629ef3e-022d-4e64-8654-703bb96410eb',
-      //   channel_id: '4f2cf027d926f3cc___channel',
-      //   user_id: '15bc22119cfa1',
-      //   trace_id: '45a925d6-7d05-4c11-93a3-0fdaabd7db6f',
-      //   packet_id: '',
-      //   session_id: '4bbfd45cbeab43d3a9b992349f00dfba',
-      //   client_os: '',
-      //   fe_version: '',
-      //   is_browser: false,
-      // };
-
       const startReply = {
         id: 'start',
         data: '{"IsHost":false,"SkinID":"10089","SkinDataVersion":"1008900008","RoomTypeID":""}',
@@ -311,7 +270,7 @@ export class MetaGateway
         client_os: '',
         fe_version: '',
       };
-      this.sceneService.onSteaming.subscribe((val) => {
+      this.sceneService.startSteaming.subscribe((val) => {
         if (val) {
           console.log('onSteaming-start', val);
           client.send(JSON.stringify(startReply));

+ 4 - 1
src/scene/scene.d.ts

@@ -1,7 +1,7 @@
 interface SceneGrpcService {
   getRoute(request: RouteRequest): Observable<any>;
   init(request: InitRequest): Observable<any>;
-  rotate(request: RotateRequest): Observable<any>;
+  rotate(request: RotateRequest): Observable<NormalReply>;
   move(request: MoveRequest): Observable<any>;
   getBreakPoint(request: GetBreakPointRequest): Observable<any>;
   joystick(request: JoystickRequest): Observable<any>;
@@ -175,3 +175,6 @@ interface RTCMessageRequest {
   trace_id: string;
   user_id: string;
 }
+interface NormalReply {
+  code: number;
+}

+ 50 - 186
src/scene/scene.service.ts

@@ -1,13 +1,13 @@
-import { Injectable, OnModuleInit } from '@nestjs/common';
+import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
 import { ClientGrpc, Client } from '@nestjs/microservices';
 import { grpcClientOptions } from './grpc-scene.options';
 import { Logger } from '@nestjs/common';
 import { DataChannel } from 'node-datachannel';
 import * as path from 'path';
-import { statSync, createReadStream, readFileSync } from 'fs';
+// import { statSync, createReadStream, readFileSync } from 'fs';
 // import { Readable } from 'stream';
 import { BehaviorSubject } from 'rxjs';
-import * as streamBuffers from 'stream-buffers';
+// import * as streamBuffers from 'stream-buffers';
 import { ActionType } from './actionType';
 import { CacheService } from 'src/cache/cache.service';
 import { StreamService } from './stream/stream.service';
@@ -72,11 +72,11 @@ const frameMetaReply = {
   msg: 'OK',
 };
 @Injectable()
-export class SceneService implements OnModuleInit {
+export class SceneService implements OnModuleInit, OnModuleDestroy {
   constructor(
     private cacheService: CacheService,
     private streamService: StreamService,
-  ) { }
+  ) {}
   @Client(grpcClientOptions) private readonly client: ClientGrpc;
   private sceneGrpcService: SceneGrpcService;
 
@@ -85,14 +85,22 @@ export class SceneService implements OnModuleInit {
   private frameCntInterval = 1000;
   public _frameInteval: NodeJS.Timeout;
   private channel: DataChannel;
-  public onSteaming = new BehaviorSubject<boolean>(false);
+  public startSteaming = new BehaviorSubject<boolean>(false);
   public user_id: string;
   public roomId: string;
+  private onSteaming = false;
 
   onModuleInit() {
     this.sceneGrpcService =
       this.client.getService<SceneGrpcService>('SceneGrpcService');
     this.logger.log('init SceneGrpcService');
+    this.streamService.onSteaming.subscribe((val) => {
+      this.onSteaming = val;
+    });
+  }
+
+  onModuleDestroy() {
+    this.streamService.onSteaming.unsubscribe();
   }
 
   getRoute(request: RouteRequest) {
@@ -102,10 +110,6 @@ export class SceneService implements OnModuleInit {
     return this.sceneGrpcService.getBreakPoint(request);
   }
 
-  test(request: TestRequest) {
-    return this.sceneGrpcService.test(request);
-  }
-
   init(request: InitRequest) {
     try {
       const initReply = this.sceneGrpcService.init(request);
@@ -123,17 +127,29 @@ export class SceneService implements OnModuleInit {
 
   rotate(request: RotateRequest) {
     const reply = this.sceneGrpcService.rotate(request);
-    console.log('reply', reply);
-    reply.subscribe((res) => {
-      console.log('rotate-reply', res);
+    if (!this.onSteaming) {
+      this.frameCnt += 1;
+      const stream: StreamFrameType = {
+        frame: this.frameCnt,
+        clipPath: path.join(__dirname, '../ws/video/2.h264'),
+        metaData: JSON.stringify(frameMetaReply),
+        serverTime: 754873824,
+        DIR: 1,
+      };
+      this.streamService.pushFrameToSteam(stream);
+    }
+    reply.subscribe((res: NormalReply) => {
+      if (res.code === 200) {
+        // const stream: StreamFrameType = {
+        //   frame: this.frameCnt,
+        //   clipPath: path.join(__dirname, '../ws/video/2.h264'),
+        //   metaData: JSON.stringify(frameMetaReply),
+        //   serverTime: 754873824,
+        //   DIR: 1,
+        // };
+        // this.streamService.pushFrameToSteam(stream);
+      }
     });
-    // console.log('this.frameCnt', this.frameCnt);
-    // const stream: StreamFrameType = {
-    //   frame: this.frameCnt,
-    //   clipPath: path.join(__dirname, '../ws/video/254.h264'),
-    //   metaData: JSON.stringify(frameMetaReply),
-    // };
-    // this.streamService.pushFrameToSteam(stream);
   }
 
   joystick(request: JoystickRequest) {
@@ -144,12 +160,12 @@ export class SceneService implements OnModuleInit {
     this.channel = channel;
     this.streamService.setChannel(channel);
     this.handleStartCountingFrame();
-    this.onSteaming.next(true);
+    this.startSteaming.next(true);
   }
 
   handleDataChanelClose(): void {
     this.stopCountingFrame();
-    this.onSteaming.next(false);
+    this.startSteaming.next(false);
     this.streamService.closeChannel();
   }
 
@@ -159,7 +175,6 @@ export class SceneService implements OnModuleInit {
         const msg: RTCMessageRequest = JSON.parse(message);
         switch (msg.action_type) {
           case ActionType.rotate:
-            console.log('rotate', msg);
             const rotateRequest: RotateRequest = msg;
             this.rotate(rotateRequest);
             break;
@@ -173,7 +188,6 @@ export class SceneService implements OnModuleInit {
             break;
         }
       }
-      // console.log('get rtc message', message);
     } catch (error) {
       this.logger.error('handleMessage:rtc', error);
     }
@@ -270,20 +284,28 @@ export class SceneService implements OnModuleInit {
             frame: 1,
             clipPath: path.join(__dirname, '../ws/video/254.h264'),
             metaData: JSON.stringify(frameMetaReply),
+            serverTime: 754871824,
           };
           this.streamService.pushFrameToSteam(stream);
         }
 
-        // console.log('11', await this.redisService.get('test'));
-        // console.log('data', data);
-        if (this.frameCnt > 1) {
+        if (this.frameCnt > 1 && !this.onSteaming) {
           const streamMeta: StreamMetaType = {
             frame: this.frameCnt,
             metaData: JSON.stringify(frameMetaReply),
           };
           this.streamService.pushMetaDataToSteam(streamMeta);
         }
-
+        // if (this.frameCnt == 4) {
+        //   const stream: StreamFrameType = {
+        //     frame: 4,
+        //     clipPath: path.join(__dirname, '../ws/video/2.h264'),
+        //     metaData: JSON.stringify(frameMetaReply),
+        //     serverTime: 754873824,
+        //     DIR: 1,
+        //   };
+        //   this.streamService.pushFrameToSteam(stream);
+        // }
       } catch (error) {
         console.log('error', error);
       }
@@ -294,162 +316,4 @@ export class SceneService implements OnModuleInit {
     clearInterval(this._frameInteval);
     this.frameCnt = -1;
   }
-  // 首屏渲染
-  // pushTheFirstFrame() {
-  //   const chunk_size = 16000;
-  //   const block = 36;
-  //   console.log('首屏--数据');
-  //   const paths = path.join(__dirname, '../ws/video');
-  //   // const clipPath = paths + `/1.v2.h264`;
-  //   const testClipPath = paths + `/2.h264`;
-  //   const metaData = {
-  //     traceIds: [],
-  //     vehicle: null,
-  //     newUserStates: [
-  //       {
-  //         userId: '1e3fa84d5c29a',
-  //         playerState: {
-  //           roomTypeId: '',
-  //           person: 0,
-  //           avatarId: 'KGe_Boy',
-  //           skinId: '10089',
-  //           roomId: '',
-  //           isHost: false,
-  //           isFollowHost: false,
-  //           skinDataVersion: '1008900008',
-  //           avatarComponents: '',
-  //           nickName: '1e3fa84d5c29a',
-  //           movingMode: 0,
-  //           attitude: 'walk',
-  //           areaName: 'LQC',
-  //           pathName: 'thirdwalk',
-  //           pathId: 'thirdwalk',
-  //           avatarSize: 1,
-  //           extra: '{"removeWhenDisconnected":true}',
-  //           prioritySync: false,
-  //           player: {
-  //             position: { x: -755, y: -1450, z: -34 },
-  //             angle: { pitch: 0, yaw: 0, roll: 0 },
-  //           },
-  //           camera: {
-  //             position: { x: -1075, y: -1450, z: 86 },
-  //             angle: { pitch: 0, yaw: 0, roll: 0 },
-  //           },
-  //           cameraCenter: null,
-  //         },
-  //         renderInfo: {
-  //           renderType: 0,
-  //           videoFrame: null,
-  //           cameraStateType: 0,
-  //           isMoving: 0,
-  //           needIfr: 0,
-  //           isVideo: 0,
-  //           stillFrame: 0,
-  //           isRotating: 0,
-  //           isFollowing: 0,
-  //           clientPanoTitlesBitmap: [],
-  //           clientPanoTreceId: '',
-  //           prefetchVideoId: '',
-  //           noMedia: false,
-  //         },
-  //         event: {
-  //           id: '',
-  //           type: 0,
-  //           points: [],
-  //           rotateEvent: null,
-  //           removeVisitorEvent: null,
-  //         },
-  //         relation: 1,
-  //       },
-  //     ],
-  //     actionResponses: [],
-  //     getStateType: 0,
-  //     code: 0,
-  //     msg: 'OK',
-  //   };
-  //   const metaDataString = JSON.stringify(metaData).replace(/\s/g, '');
-
-  //   const coordBuff = Buffer.from(metaDataString, 'utf-8');
-
-  //   console.warn('coordBuff', coordBuff.byteLength);
-
-  //   // const steamStat = statSync(clipPath);
-
-  //   // const steamTotalSize = metaData.length + steamStat.size;
-  //   const clipBuffer = readFileSync(testClipPath);
-  //   // console.log('clipBuffer', clipBuffer);
-  //   // const fullBufferList = Buffer.concat([coordBuff, clipBuffer]);
-
-  //   // const steam = createReadStream(clipPath, {
-  //   //   highWaterMark: chunk_size - block,
-  //   // });
-
-  //   // console.log('fullBufferList', fullBufferList);
-
-  //   // const steam1 = createReadStream(fullBufferList.toString(), {
-  //   //   highWaterMark: chunk_size - block,
-  //   // });
-  //   const steam = new streamBuffers.ReadableStreamBuffer({
-  //     frequency: 1, // in milliseconds.
-  //     chunkSize: chunk_size - block, // in bytes.
-  //   });
-  //   steam.put(coordBuff);
-  //   steam.put(clipBuffer);
-
-  //   let steamByteLength = 0;
-
-  //   steam.on('data', (data: Buffer) => {
-  //     // console.log('data', data, data.byteLength);
-  //     // console.log('data-size', data);
-  //     const blockBuffStart = Buffer.alloc(block);
-
-  //     const packBuffer = Buffer.concat([blockBuffStart, data]);
-
-  //     // const isLastFrame = packBuffer.byteLength - chunk_size < 0;
-  //     // console.log('packBuffer', packBuffer.byteLength);
-  //     // if (isLastFrame) {
-  //     //   // last frame
-  //     //   packBuffer = Buffer.concat([packBuffer, coordBuff]);
-  //     //   console.log('last frame', packBuffer.byteLength);
-  //     // }
-
-  //     const framePack = new DataView(
-  //       packBuffer.buffer.slice(
-  //         packBuffer.byteOffset,
-  //         packBuffer.byteOffset + packBuffer.byteLength,
-  //       ),
-  //     );
-
-  //     // 16 bit slot
-  //     // framePack.setUint32(4)
-  //     framePack.setUint16(6, block);
-  //     framePack.setUint16(8, 1); // first render cnt
-  //     framePack.setUint16(10, 1); // isDIR
-  //     framePack.setUint16(24, 0);
-  //     framePack.setUint16(26, 0);
-
-  //     // 32 bit slot
-  //     // statusPack.setUint32(12, buff.byteLength);
-  //     // console.log('metaLen', coordBuff.byteLength);
-  //     // console.log('metaLen', clipBuffer.byteLength);
-
-  //     framePack.setUint32(0, 1437227610);
-  //     framePack.setUint32(12, coordBuff.byteLength); // metaLen
-  //     framePack.setUint32(16, clipBuffer.byteLength); // mediaLen
-  //     framePack.setUint32(20, 754871824); //server_time
-  //     framePack.setUint32(24, 0);
-  //     framePack.setUint32(28, 0);
-  //     framePack.setUint32(block - 4, steamByteLength);
-
-  //     // console.log('statusPack', statusPack);
-  //     if (this.channel && this.channel.isOpen()) {
-  //       this.channel.sendMessageBinary(Buffer.from(framePack.buffer));
-  //     }
-  //     steamByteLength += data.byteLength;
-  //   });
-  //   steam.on('end', () => {
-  //     steamByteLength = 0;
-  //     // this.onSteaming = false;
-  //   });
-  // }
 }

+ 2 - 0
src/scene/stream/stream.d.ts

@@ -2,6 +2,8 @@ interface StreamFrameType {
   frame: number;
   clipPath: string;
   metaData: string;
+  serverTime?: number;
+  DIR?: number;
 }
 interface StreamMetaType {
   frame: number;

+ 19 - 4
src/scene/stream/stream.service.ts

@@ -1,14 +1,16 @@
 import { Injectable, Logger } from '@nestjs/common';
 import { DataChannel } from 'node-datachannel';
-import * as path from 'path';
+// import * as path from 'path';
 import { readFileSync } from 'fs';
 import * as streamBuffers from 'stream-buffers';
+import { BehaviorSubject } from 'rxjs';
 @Injectable()
 export class StreamService {
   private channel: DataChannel;
   private readonly chunk_size = 16000;
   private readonly block = 36;
   private logger: Logger = new Logger('StreamService');
+  public onSteaming = new BehaviorSubject<boolean>(false);
 
   setChannel(channel: DataChannel) {
     this.channel = channel;
@@ -69,9 +71,12 @@ export class StreamService {
 
   pushFrameToSteam(stream: StreamFrameType) {
     try {
+      // if (!this.onSteaming) {
       const clipPath = stream.clipPath;
       const metaData = stream.metaData;
       const frame = stream.frame;
+      const serverTime = stream.serverTime || 754871824;
+      const dir = stream.DIR || 1;
       const metaDataString = metaData.replace(/\s/g, '');
       const coordBuff = Buffer.from(metaDataString, 'utf-8');
       console.warn('coordBuff', coordBuff.byteLength);
@@ -88,6 +93,7 @@ export class StreamService {
       let steamByteLength = 0;
 
       steam.on('data', (data: Buffer) => {
+        this.onSteaming.next(true);
         // console.log('data', data, data.byteLength);
         const blockBuffStart = Buffer.alloc(this.block);
         const packBuffer = Buffer.concat([blockBuffStart, data]);
@@ -103,7 +109,7 @@ export class StreamService {
         // framePack.setUint32(4)
         framePack.setUint16(6, this.block);
         framePack.setUint16(8, frame); // first render cnt
-        framePack.setUint16(10, 1); // isDIR
+        framePack.setUint16(10, dir); // isDIR
         framePack.setUint16(24, 0);
         framePack.setUint16(26, 0);
 
@@ -115,20 +121,29 @@ export class StreamService {
         framePack.setUint32(0, 1437227610);
         framePack.setUint32(12, coordBuff.byteLength); // metaLen
         framePack.setUint32(16, clipBuffer.byteLength); // mediaLen
-        framePack.setUint32(20, 754871824); //server_time
+        framePack.setUint32(20, serverTime); //server_time
         framePack.setUint32(24, 0);
         framePack.setUint32(28, 0);
         framePack.setUint32(this.block - 4, steamByteLength);
+        const isLastFrame = framePack.byteLength - this.chunk_size < 0;
 
         // console.log('statusPack', statusPack);
         if (this.channel && this.channel.isOpen()) {
           this.channel.sendMessageBinary(Buffer.from(framePack.buffer));
         }
         steamByteLength += data.byteLength;
+        if (isLastFrame) {
+          // console.log('isLastFrame', isLastFrame);
+          // steamByteLength = 0;
+          // this.onSteaming.next(false);
+          steam.stop();
+        }
       });
+      //TODO steam can't trigger end
       steam.on('end', () => {
         steamByteLength = 0;
-        // this.onSteaming = false;
+        console.log('stream end');
+        this.onSteaming.next(false);
       });
     } catch (error) {
       this.logger.error(error);