gemercheung преди 3 години
родител
ревизия
e55b8ee44e
променени са 7 файла, в които са добавени 165 реда и са изтрити 152 реда
  1. 6 6
      src/app.module.ts
  2. 4 3
      src/move/move.service.ts
  3. 30 32
      src/rotate/rotate.service.ts
  4. 11 11
      src/scene/scene.module.ts
  5. 104 95
      src/scene/scene.service.ts
  6. 5 0
      src/scene/stream/stream.d.ts
  7. 5 5
      src/scene/stream/stream.service.ts

+ 6 - 6
src/app.module.ts

@@ -7,19 +7,19 @@ import { RoomModule } from './room/room.module';
 import { SceneModule } from './scene/scene.module';
 // import { CacheModule } from './cache/cache.module';
 import configuration from './config/configuration';
-import { BullModule } from '@nestjs/bull';
-console.log('redis配置2', configuration().queueRedis);
+// import { BullModule } from '@nestjs/bull';
+// console.log('redis配置2', configuration().queueRedis);
 @Module({
   imports: [
     ConfigModule.forRoot({ isGlobal: true, load: [configuration] }),
     RoomModule,
     SceneModule,
-    BullModule.forRoot({
-      redis: configuration().queueRedis,
-    }),
+    // BullModule.forRoot({
+    //   redis: configuration().queueRedis,
+    // }),
     // CacheModule,
   ],
   controllers: [AppController],
   providers: [AppService, MetaGateway],
 })
-export class AppModule { }
+export class AppModule {}

+ 4 - 3
src/move/move.service.ts

@@ -150,7 +150,7 @@ export class MoveService {
       const appId = user.appId;
       const path = pathArray || [100, 101, 102]; //需要计算路径
       const angle = user.camera.angle.yaw % 44; //纠正需要
-      console.log('矫正前-相机:'+user.camera.angle.yaw);
+      console.log('矫正前-相机:' + user.camera.angle.yaw);
       const replys = {};
       const traceIds = [];
       traceIds.push(traceId);
@@ -181,7 +181,7 @@ export class MoveService {
       this.reply.traceIds = traceIds;
       this.reply['newUserStates'][0].userId = userId;
       this.reply['actionResponses'][0].traceId = traceId;
-      console.log('矫正后-相机:'+user.camera.angle.yaw);
+      console.log('矫正后-相机:' + user.camera.angle.yaw);
       const index = Math.round(user.camera.angle.yaw / 44); //过渡需要
       for (let i = 0; i < path.length - 1; ++i) {
         let pathReplys = [];
@@ -487,6 +487,7 @@ export class MoveService {
             'breakpoints:app_id:' + appId + ':break_point_id:' + contact[i],
           ); //通过contact[i],去redis里找
           //通过user.player.position;neighPoint.position获得角度
+          neighPoint = JSON.parse(neighPoint);
           angle = this.getAngle(
             user.player.position,
             { x: user.player.position.x + 1, y: user.player.position.y },
@@ -538,7 +539,7 @@ export class MoveService {
             ':end_break_point_id:' +
             chooseBreakPointId +
             ':angle:' +
-            (user.camera.angle % 44);
+            (user.camera.angle.yaw % 44);
           const moveFramesRes = await this.cacheService.get(key);
           if (moveFramesRes == null) {
             return replys;

+ 30 - 32
src/rotate/rotate.service.ts

@@ -269,43 +269,41 @@ export class RotateService {
 
       const redisData = await this.cacheService.get(key);
       //if (redisData && redisData.length > 0) {
-        const value = JSON.parse(redisData); //redisData ? JSON.parse(redisData) : null;
-        // console.log('rotate-service', value);
-        user.camera['position'] = value.cameraPosition;//value ? value.cameraPosition : '';
-        user.camera['angle'] = value.cameraAngle;//value ? value.cameraAngle : '';
+      const value = JSON.parse(redisData); //redisData ? JSON.parse(redisData) : null;
+      // console.log('rotate-service', value);
+      user.camera['position'] = value.cameraPosition; //value ? value.cameraPosition : '';
+      user.camera['angle'] = value.cameraAngle; //value ? value.cameraAngle : '';
 
-        reply['newUserStates'][0]['playerState'].player.position =
-          user.player.position;
-        reply['newUserStates'][0]['playerState'].player.angle =
-          user.player.angle;
+      reply['newUserStates'][0]['playerState'].player.position =
+        user.player.position;
+      reply['newUserStates'][0]['playerState'].player.angle = user.player.angle;
 
-        //this.reply['newUserStates'][0]['playerState'] .player
-        reply['newUserStates'][0]['playerState'].camera.position =
+      //this.reply['newUserStates'][0]['playerState'] .player
+      reply['newUserStates'][0]['playerState'].camera.position =
         value.cameraPosition;
-        reply['newUserStates'][0]['playerState'].camera.angle =
-        value.cameraAngle;
-        reply['newUserStates'][0]['playerState'].cameraCenter =
-          user.player.position;
-        // debugger
-        reply.mediaSrc =
-          '/' +
-          '0000000001' +
-          '/' +
-          user.breakPointId +
-          '/' +
-          value.directory +
-          '/' +
-          value.fileName +
-          '?m=' +
-          new Date().getTime();
+      reply['newUserStates'][0]['playerState'].camera.angle = value.cameraAngle;
+      reply['newUserStates'][0]['playerState'].cameraCenter =
+        user.player.position;
+      // debugger
+      reply.mediaSrc =
+        '/' +
+        '0000000001' +
+        '/' +
+        user.breakPointId +
+        '/' +
+        value.directory +
+        '/' +
+        value.fileName +
+        '?m=' +
+        new Date().getTime();
 
-        this.replies[userId].traceIds = [];
-        this.replies[userId].actionResponses = [];
+      this.replies[userId].traceIds = [];
+      this.replies[userId].actionResponses = [];
 
-        user.rotateInfo.horizontal_move = 0;
-  
-        return reply;
-      //} 
+      user.rotateInfo.horizontal_move = 0;
+
+      return reply;
+      //}
       // else {
       //   return null;
       // }

+ 11 - 11
src/scene/scene.module.ts

@@ -4,9 +4,9 @@ import { CacheModule } from '../cache/cache.module';
 import { CacheService } from '../cache/cache.service';
 // import { RedisService } from '../redis/redis.service';
 import { StreamService } from './stream/stream.service';
-import { BullModule } from '@nestjs/bull';
-import { RotateConsumer } from './rotate-consumer';
-import { WalkingConsumer } from './walking-consumer';
+// import { BullModule } from '@nestjs/bull';
+// import { RotateConsumer } from './rotate-consumer';
+// import { WalkingConsumer } from './walking-consumer';
 import { RotateService } from '../rotate/rotate.service';
 import { MoveService } from '../move/move.service';
 import { GetRouterService } from 'src/get-router/get-router.service';
@@ -14,12 +14,12 @@ import { GetRouterService } from 'src/get-router/get-router.service';
 @Module({
   imports: [
     CacheModule,
-    BullModule.registerQueue({
-      name: 'rotate',
-    }),
-    BullModule.registerQueue({
-      name: 'walking',
-    }),
+    // BullModule.registerQueue({
+    //   name: 'rotate',
+    // }),
+    // BullModule.registerQueue({
+    //   name: 'walking',
+    // }),
   ],
   controllers: [],
   providers: [
@@ -27,8 +27,8 @@ import { GetRouterService } from 'src/get-router/get-router.service';
     CacheService,
     StreamService,
     RotateService,
-    RotateConsumer,
-    WalkingConsumer,
+    // RotateConsumer,
+    // WalkingConsumer,
     MoveService,
     GetRouterService,
   ],

+ 104 - 95
src/scene/scene.service.ts

@@ -8,8 +8,8 @@ import { BehaviorSubject, filter, ignoreElements, take } from 'rxjs';
 import { ActionType } from './actionType';
 import { CacheService } from 'src/cache/cache.service';
 import { StreamService } from './stream/stream.service';
-import { InjectQueue } from '@nestjs/bull';
-import { Queue } from 'bull';
+// import { InjectQueue } from '@nestjs/bull';
+// import { Queue } from 'bull';
 import { RotateService } from 'src/rotate/rotate.service';
 import { DelayQueue, RxQueue, ThrottleQueue, DebounceQueue } from 'rx-queue';
 import { MoveService } from 'src/move/move.service';
@@ -24,13 +24,14 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     private rotateService: RotateService,
     private moveService: MoveService,
     private getRouterService: GetRouterService,
-    @InjectQueue('rotate') private rotateQueue: Queue,
-    @InjectQueue('walking') private walkingQueue: Queue,
+    // @InjectQueue('rotate') private rotateQueue: Queue,
+    // @InjectQueue('walking') private walkingQueue: Queue,
   ) { }
   @Client(grpcClientOptions) private readonly client: ClientGrpc;
 
   public _frameInteval: NodeJS.Timeout;
   public _frameTimeout: NodeJS.Timeout;
+  public _rotateTimeout: NodeJS.Timeout;
   public startSteaming = new BehaviorSubject<boolean>(false);
   public onRotating = new BehaviorSubject<boolean>(false);
   public onMoving = new BehaviorSubject<boolean>(false);
@@ -55,8 +56,9 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
   private walkingSub: any;
 
   private streamServiceSub: any;
-  private roQueue: RxQueue = new DelayQueue(100);
+  private roQueue: RxQueue = new RxQueue(0);
   private clickQueue: RxQueue = new DebounceQueue(500);
+
   private moveQueue: RxQueue = new DelayQueue(600);
   private rotateTimeStamp: number;
   private lastMoveCnt = -1;
@@ -199,8 +201,6 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
       // this.moveService.init(request.app_id, request.user_id);
 
       // this.initUsers(request.app_id, request.user_id);
-      this.rotateQueue.empty();
-      this.walkingQueue.empty();
     } catch (error) {
       console.log('error', error);
     }
@@ -215,79 +215,83 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
 
   async rotate(request: RotateRequest) {
     try {
-      // if (!this.onSteaming) {
-      let redisMeta: StreamReplyType;
-      this.onRotating.next(true);
-      if (this.onMoving.value) {
-        this.onMoving.next(false);
-        const lastStreamFrame = this.lastMoveStreamFrame.getValue();
-        const metaData: StreamReplyType = JSON.parse(
-          lastStreamFrame.metaData,
-        ) as any as StreamReplyType;
-        const newUserStates: NewUserStatesType = metaData.newUserStates.find(
-          (item) => item.userId === this.user_id,
-        );
-        const trace_id = metaData.traceIds[0];
-        const userId = newUserStates.userId;
-        const breakPointId = lastStreamFrame.marker
-          .replace('P', '')
-          .replace('T', '-');
-        const cameraAngle = newUserStates.playerState.camera.angle;
-        const playerAngle = newUserStates.playerState.player.angle;
-        console.log('stop-data', trace_id, userId, cameraAngle, cameraAngle);
-        redisMeta = await this.moveService.stop(
-          trace_id,
-          userId,
-          breakPointId,
-          cameraAngle,
-          playerAngle,
-        );
-        console.log('stop-redisMeta', redisMeta);
-        // redisMeta = await this.rotateService.rotate(request);
-      } else {
-        redisMeta = await this.rotateService.rotate(request);
-      }
+      if (this.firstRender) {
+        if (!this.roQueueSubscription) {
+          this.handleRotateStream();
+        }
+        let redisMeta: StreamReplyType;
+        this.onRotating.next(true);
+        if (this.onMoving.value) {
+          this.onMoving.next(false);
+          const lastStreamFrame = this.lastMoveStreamFrame.getValue();
+          const metaData: StreamReplyType = JSON.parse(
+            lastStreamFrame.metaData,
+          ) as any as StreamReplyType;
+          const newUserStates: NewUserStatesType = metaData.newUserStates.find(
+            (item) => item.userId === this.user_id,
+          );
+          const trace_id = metaData.traceIds[0];
+          const userId = newUserStates.userId;
+          const breakPointId = lastStreamFrame.marker
+            .replace('P', '')
+            .replace('T', '-');
+          const cameraAngle = newUserStates.playerState.camera.angle;
+          const playerAngle = newUserStates.playerState.player.angle;
+          console.log('stop-data', trace_id, userId, cameraAngle, cameraAngle);
+          redisMeta = await this.moveService.stop(
+            trace_id,
+            userId,
+            breakPointId,
+            cameraAngle,
+            playerAngle,
+          );
+          console.log('stop-redisMeta', redisMeta);
+          // redisMeta = await this.rotateService.rotate(request);
+        } else {
+          redisMeta = await this.rotateService.rotate(request);
+        }
 
-      if (redisMeta && 'mediaSrc' in redisMeta) {
-        const mediaSrc: string = redisMeta.mediaSrc || '';
-        if (mediaSrc.length > 0) {
-          let src = mediaSrc.split('?')[0];
-          // 临时本地替换路经
-          src = src.replace('/0000000001/', '');
-          // 判断不是同一条源时才推出
-          if (src.length > 0) {
-            // console.log('不同源');
-            // this.frameCnt += 1;
-            this.holdSteam();
-            this.lastRenderMedia = src;
-            const clipPath = join(__dirname, `../ws/video/${src}`);
-            // console.log('src-clipPath', src, clipPath);
-            delete redisMeta.mediaSrc;
-            // if (this.rotateframeCnt === -1) {
-            //   this.rotateframeCnt = this.frameCnt.value;
-            // }
-            const nextFrame = this.frameCnt.getValue() + 1;
-            this.frameCnt.next(nextFrame);
-            const random_boolean = Math.random() < 0.3;
-
-            const stream: StreamFrameType = {
-              frame: this.frameCnt.getValue(),
-              clipPath: clipPath,
-              metaData: JSON.stringify(redisMeta),
-              serverTime: this.mockserverTime,
-              DIR: random_boolean ? 1 : 3,
-            };
-            this.rotateQueue.add(stream, {
-              delay: 5,
-              jobId: `rotate:${this.user_id}:${this.frameCnt.getValue()}`,
-              removeOnComplete: true,
-            });
-          } else {
-            this.onRotating.next(false);
+        if (redisMeta && 'mediaSrc' in redisMeta) {
+          const mediaSrc: string = redisMeta.mediaSrc || '';
+          if (mediaSrc.length > 0) {
+            let src = mediaSrc.split('?')[0];
+            // 临时本地替换路经
+            src = src.replace('/0000000001/', '');
+            // 判断不是同一条源时才推出
+            if (src.length > 0) {
+              // console.log('不同源');
+              // this.frameCnt += 1;
+              this.holdSteam();
+              this.lastRenderMedia = src;
+              const clipPath = join(__dirname, `../ws/video/${src}`);
+              // console.log('src-clipPath', src, clipPath);
+              delete redisMeta.mediaSrc;
+
+              // const nextFrame = this.frameCnt.getValue() + 1;
+              // console.log('nextFrame', nextFrame);
+              // this.frameCnt.next(nextFrame);
+              const random_boolean = Math.random() < 0.3;
+
+              const stream: StreamFrameType = {
+                frame: -1,
+                clipPath: clipPath,
+                metaData: JSON.stringify(redisMeta),
+                serverTime: this.mockserverTime,
+                DIR: this.frameCnt.getValue() < 10 ? 3 : random_boolean ? 1 : 3,
+              };
+              // this.rotateQueue.add(stream, {
+              //   delay: 5,
+              //   jobId: `rotate:${this.user_id}:${this.frameCnt.getValue()}`,
+              //   removeOnComplete: true,
+              // });
+              clearTimeout(this._rotateTimeout);
+              this.roQueue.next(stream);
+            } else {
+              // this.onRotating.next(false);
+            }
           }
         }
       }
-      // }
     } catch (error) {
       this.logger.error('rotate', error);
       console.log('error', error);
@@ -404,7 +408,6 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
         // debugger;
 
         if (walkingRes && !this.onMoving.value) {
-
           // walkingRes marker to everybody
           const res: ArrayLike<unknown> = Object.keys(walkingRes).map(
             (item) => {
@@ -545,14 +548,13 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
         }
         if (
           frame > 1 &&
-          !this.onSteaming &&
           !this.onMoving.value &&
           !this.onRotating.value &&
           this.firstRender
         ) {
-          console.log(`空白流::${frame}`);
           const redisDataAuto = await this.rotateService.echo(this.user_id);
           if (redisDataAuto) {
+            console.log(`空白流::有数据:${frame}`);
             'mediaSrc' in redisDataAuto && delete redisDataAuto.mediaSrc;
             const streamMeta: StreamMetaType = {
               frame: frame,
@@ -570,25 +572,32 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     this.roQueueSubscription = this.roQueue.subscribe(
       async (stream: StreamFrameType) => {
         this.rotateTimeStamp = Date.now();
+        if (this.rotateframeCnt === -1) {
+          this.rotateframeCnt = this.frameCnt.value;
+        }
+        this.rotateframeCnt += 1;
 
         stream.frame = this.rotateframeCnt;
-        console.log('[media]', stream.clipPath);
-        this.logger.log(
-          `roQueueSubscription:frame:${this.rotateframeCnt}  ` +
-          JSON.stringify(stream.metaData),
-        );
-        // await this.streamService.pushFrameToSteam(stream);
-        setTimeout(() => {
-          const now = Date.now();
-          if (now - this.rotateTimeStamp > this.rotatePeriod) {
+        console.log('[media-rotate]', stream.frame, stream.clipPath);
+        // this.logger.log(
+        //   `roQueueSubscription:frame:${this.rotateframeCnt}  ` +
+        //   JSON.stringify(stream.metaData),
+        // );
+
+        const res = await this.streamService.pushFrameToSteam(stream);
+        if (res.done) {
+          clearTimeout(this._rotateTimeout);
+          this._rotateTimeout = setTimeout(() => {
+            console.log('rotate 1秒', Date.now());
             console.log('rotate end');
-            // const next = this.rotateframeCnt + 1;
-            // this.resumeStream(next);
-            // this.rotateframeCnt = -1;
-            // this.onMoving = false;
-            // this.onRotating = false;
-          }
-        }, 300);
+            // const next = res.frame + 1;
+            this.frameCnt.next(res.frame);
+            this.resumeStream();
+            this.rotateframeCnt = -1;
+            this.onMoving.next(false);
+            this.onRotating.next(false);
+          }, 300);
+        }
       },
     );
   }

+ 5 - 0
src/scene/stream/stream.d.ts

@@ -80,3 +80,8 @@ interface StreamReplyType {
 // interface NewUserStatesType{
 
 // }
+
+interface StreamPushResponse {
+  frame: number;
+  done: boolean;
+}

+ 5 - 5
src/scene/stream/stream.service.ts

@@ -18,7 +18,7 @@ export class StreamService {
     clipPath: '',
     metaData: '',
   });
-  constructor(private cacheService: CacheService) {}
+  constructor(private cacheService: CacheService) { }
 
   setChannel(channel: DataChannel) {
     this.channel = channel;
@@ -90,7 +90,7 @@ export class StreamService {
    * @param stream   meta Json and stream
    */
 
-  pushFrameToSteam(stream: StreamFrameType): Promise<boolean> {
+  pushFrameToSteam(stream: StreamFrameType): Promise<StreamPushResponse> {
     return new Promise((resolve, reject) => {
       try {
         // if (!this.onSteaming) {
@@ -168,20 +168,20 @@ export class StreamService {
             // steamByteLength = 0;
             // this.onSteaming.next(false);
             steam.stop();
-            resolve(true);
           }
         });
         //TODO steam can't trigger end
         steam.on('end', () => {
           steamByteLength = 0;
-          console.log('stream end');
+          // console.log('stream end');
           if (this.onSteaming.value) {
             this.onSteaming.next(false);
           }
+          return resolve({ frame: stream.frame, done: true });
         });
       } catch (error) {
         this.logger.error(error);
-        reject(error);
+        return reject({ frame: stream.frame, done: false });
       }
     });
   }