gemercheung 3 éve
szülő
commit
6135ba136e
2 módosított fájl, 71 hozzáadás és 33 törlés
  1. 46 33
      src/scene/scene.service.ts
  2. 25 0
      src/scene/stream/stream.service.ts

+ 46 - 33
src/scene/scene.service.ts

@@ -89,11 +89,21 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
   private user_id: string;
   private roomId: string;
   private onSteaming = false;
+  private testFrame = -1;
+  private RotateframeCnt = -1;
 
   setConfig(user_id: string, roomId: string) {
     this.user_id = user_id;
     this.roomId = roomId;
   }
+  checkingIsRotating() {
+    console.log('async', this.frameCnt, this.RotateframeCnt);
+    if (this.frameCnt > 0 && this.frameCnt === this.RotateframeCnt) {
+      return true;
+    } else {
+      return false;
+    }
+  }
 
   onModuleInit() {
     this.sceneGrpcService =
@@ -120,20 +130,20 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
 
   init(request: InitRequest) {
     try {
-      const initReply = this.sceneGrpcService.init(request);
-      initReply.subscribe((reply) => {
-        console.log('initReply', reply);
-      });
+      // const initReply = this.sceneGrpcService.init(request);
+      // initReply.subscribe((reply) => {
+      //   console.log('initReply', reply);
+      // });
     } catch (error) {
       console.log('error', error);
     }
   }
 
   exit(request: ExitRequest) {
-    const exitReply = this.sceneGrpcService.exit(request);
-    exitReply.subscribe((reply) => {
-      console.log('exitReply', reply);
-    });
+    // const exitReply = this.sceneGrpcService.exit(request);
+    // exitReply.subscribe((reply) => {
+    //   console.log('exitReply', reply);
+    // });
   }
 
   move(request: MoveRequest) {
@@ -142,12 +152,29 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
 
   async rotate(request: RotateRequest) {
     try {
-      const reply = this.sceneGrpcService.rotate(request);
+      // const reply = this.sceneGrpcService.rotate(request);
       if (!this.onSteaming) {
+        this.frameCnt += 1;
+        this.RotateframeCnt = this.frameCnt;
+
+        this.testFrame += 1;
+        if (this.testFrame > 358) this.testFrame = 0;
+        const stream: StreamFrameType = {
+          frame: this.frameCnt,
+          clipPath: path.join(
+            __dirname,
+            `../ws/video/100/100.${this.testFrame.padLeft(4, '0')}.h264`,
+          ),
+          metaData: JSON.stringify(frameMetaReply),
+          serverTime: 754871824,
+          DIR: 1,
+        };
+        console.log('stream', this.frameCnt, stream.clipPath);
+        this.streamService.pushFrameToSteam(stream);
         const redisMeta = await this.cacheService.rpop(
           `updateFrameMetadata:${this.user_id}`,
         );
-        this.frameCnt += 1;
+        // this.frameCnt += 1;
         if (redisMeta && redisMeta.length > 0) {
           console.log('this.user_id', this.user_id);
           const meta = JSON.parse(redisMeta);
@@ -182,18 +209,10 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
           }
         }
       }
-      reply.subscribe((res: NormalReply) => {
-        if (res.code === 200) {
-          // const stream: StreamFrameType = {
-          //   frame: this.frameCnt,
-          //   clipPath: path.join(__dirname, '../ws/video/2.h264'),
-          //   metaData: JSON.stringify(frameMetaReply),
-          //   serverTime: 754873824,
-          //   DIR: 1,
-          // };
-          // this.streamService.pushFrameToSteam(stream);
-        }
-      });
+      // reply.subscribe((res: NormalReply) => {
+      //   if (res.code === 200) {
+      //   }
+      // });
     } catch (error) {
       this.logger.error('rotate', error);
     }
@@ -225,6 +244,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
   handleMessage(message: string | Buffer) {
     try {
       if (typeof message === 'string') {
+        // console.log('toto', message);
         const msg: RTCMessageRequest = JSON.parse(message);
         switch (msg.action_type) {
           case ActionType.rotate:
@@ -328,7 +348,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     const redisMeta = await this.cacheService.rpop(
       `updateFrameMetadata:${this.user_id}`,
     );
-    //TODO 接入redis数据
+    //TODO 接入redis数据
     console.log('redisMeta', redisMeta);
     if (redisMeta && redisMeta.length > 0) {
       const meta = JSON.parse(redisMeta);
@@ -348,7 +368,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
           // this.pushTheFirstFrame();
           const stream: StreamFrameType = {
             frame: 1,
-            clipPath: path.join(__dirname, '../ws/video/254.h264'),
+            clipPath: path.join(__dirname, '../ws/video/100/100.0000.h264'),
             metaData: JSON.stringify(frameMetaReply),
             serverTime: 754871824,
           };
@@ -372,6 +392,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
         //   }, 1000 / 30);
         // };
 
+
         if (this.frameCnt > 1 && !this.onSteaming) {
           const streamMeta: StreamMetaType = {
             frame: this.frameCnt,
@@ -380,15 +401,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
           this.streamService.pushMetaDataToSteam(streamMeta);
         };
 
-        // if (this.frameCnt == 4) {
-        //   const stream: StreamFrameType = {
-        //     frame: 4,
-        //     clipPath: path.join(__dirname, '../ws/video/2.h264'),
-        //     metaData: JSON.stringify(frameMetaReply),
-        //     serverTime: 754873824,
-        //     DIR: 1,
-        //   };
-        //   this.streamService.pushFrameToSteam(stream);
+
         // }
       } catch (error) {
         console.log('error', error);

+ 25 - 0
src/scene/stream/stream.service.ts

@@ -4,6 +4,7 @@ import { DataChannel } from 'node-datachannel';
 import { readFileSync } from 'fs';
 import * as streamBuffers from 'stream-buffers';
 import { BehaviorSubject } from 'rxjs';
+import { CacheService } from 'src/cache/cache.service';
 @Injectable()
 export class StreamService {
   private channel: DataChannel;
@@ -11,6 +12,7 @@ export class StreamService {
   private readonly block = 36;
   private logger: Logger = new Logger('StreamService');
   public onSteaming = new BehaviorSubject<boolean>(false);
+  constructor(private cacheService: CacheService) { }
 
   setChannel(channel: DataChannel) {
     this.channel = channel;
@@ -19,6 +21,16 @@ export class StreamService {
     this.channel = null;
   }
 
+  getQueueDataFromPool(poolKey: string) {
+
+    // const redisData = this.cacheService.()
+
+  }
+  /**
+   * stream core push normal stream
+   * @param data meta Json
+   */
+
   pushNormalDataToStream(data: any) {
     const replyBin = JSON.stringify(data).replace(/\s/g, '');
     const buff = Buffer.from(replyBin, 'utf-8');
@@ -27,6 +39,10 @@ export class StreamService {
     }
   }
 
+  /**
+   * stream core push with block meta stream
+   * @param data meta Json
+   */
   pushMetaDataToSteam(stream: StreamMetaType) {
     try {
       const metaData = stream.metaData;
@@ -69,6 +85,11 @@ export class StreamService {
     }
   }
 
+  /**
+   *  stream core push with block  stream
+   * @param stream   meta Json and stream
+   */
+
   pushFrameToSteam(stream: StreamFrameType) {
     try {
       // if (!this.onSteaming) {
@@ -144,6 +165,10 @@ export class StreamService {
         steamByteLength = 0;
         console.log('stream end');
         this.onSteaming.next(false);
+        // if(this.onSteaming){
+        //   setTimeout()
+        // }
+        
       });
     } catch (error) {
       this.logger.error(error);