gemercheung 3 years ago
parent
commit
3e1c2e3577

+ 49 - 0
src/move/move.service.ts

@@ -354,4 +354,53 @@ export class MoveService {
     const angle = (Math.atan2(det, dot) / Math.PI) * 180;
     return (angle + 360) % 360;
   }
+
+  async stop(traceId, userId, breakPointId, cameraAngle, playerAngle) {
+    const user = this.users[userId];
+    user.breakPointId = breakPointId;
+    const appId = user.appId;
+
+    const breakPointRes = await this.cacheService.get(
+      'breakpoints:app_id:' + appId + ':break_point_id:' + breakPointId,
+    );
+    const breakPoint = JSON.parse(breakPointRes);
+
+    user.player.position = breakPoint.position;
+    user.player.angle = playerAngle;
+
+    const rotateDataRes =
+      'rotateframe:app_id:' +
+      appId +
+      ':frame_index:' +
+      cameraAngle.yaw +
+      ':break_point_id:' +
+      breakPointId;
+    const rotateData = await this.cacheService.get(rotateDataRes);
+
+    user.camera.position = rotateData.cameraPosition;
+    user.camera.angle = rotateData.cameraAngle;
+
+    const reply = JSON.parse(JSON.stringify(this.reply));
+    reply.traceIds.push(traceId);
+    reply['newUserStates'][0].userId = userId;
+    reply['newUserStates'][0].playerState.player.position = breakPoint.position;
+    reply['newUserStates'][0].playerState.player.angle = playerAngle;
+    reply['newUserStates'][0].playerState.camera.position =
+      rotateData.cameraPosition;
+    reply['newUserStates'][0].playerState.camera.angle = rotateData.cameraAngle;
+    reply['newUserStates'][0].playerState.cameraCenter = breakPoint.position;
+    reply['actionResponses'][0].traceId = traceId;
+    reply.mediaSrc =
+      '/' +
+      '0000000001' +
+      '/' +
+      breakPointId +
+      '/' +
+      rotateData.directory +
+      '/' +
+      rotateData.fileName +
+      '?m=' +
+      new Date().getTime();
+    return reply;
+  }
 }

+ 75 - 77
src/scene/scene.service.ts

@@ -23,7 +23,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     private rotateService: RotateService,
     private moveService: MoveService,
     @InjectQueue('rotate') private rotateQueue: Queue,
-  ) {}
+  ) { }
   @Client(grpcClientOptions) private readonly client: ClientGrpc;
   private sceneGrpcService: SceneGrpcService;
 
@@ -37,21 +37,24 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
   private roomId: string;
   private onSteaming = false;
   private rotateframeCnt = -1;
-  private moveframeCnt = -1;
   private mockserverTime = Date.now() - 1653000000478;
   private lastRenderMedia = '';
   private frameCnt = new BehaviorSubject<number>(-1);
   private frameCntSubscription: any;
   private roQueueSubscription: any;
   private moveQueueSubscription: any;
+  private walkingSub: any;
 
   private streamServiceSub: any;
   private roQueue: RxQueue = new ThrottleQueue(120);
-  private moveQueue: RxQueue = new DelayQueue(1000);
+  private clickQueue: RxQueue = new DebounceQueue(500);
+  private moveQueue: RxQueue = new DelayQueue(100);
   private rotateTimeStamp: number;
   private lastMoveCnt = -1;
   private currentMoveMaker = '';
   private onMoving = false;
+  private onRotating = false;
+  private firstRender = false;
 
   onModuleInit(): void {
     this.sceneGrpcService =
@@ -80,9 +83,10 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
   }
 
   resumeStream(value: number) {
+    this.onMoving = false;
+    this.frameCnt.next(value);
     clearTimeout(this._frameTimeout);
     clearInterval(this._frameInteval);
-    this.frameCnt.next(value);
     this._frameTimeout = setTimeout(() => {
       this._frameInteval = setInterval(async () => {
         const next = this.frameCnt.getValue() + 1;
@@ -107,7 +111,6 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     this.frameCnt.next(-1);
     clearInterval(this._frameInteval);
     this.rotateframeCnt = -1;
-    this.moveframeCnt = -1;
   }
 
   setConfig(user_id: string, roomId: string): void {
@@ -148,10 +151,6 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     // });
   }
 
-  move(request: MoveRequest) {
-    return this.sceneGrpcService.move(request);
-  }
-
   async rotate(request: RotateRequest) {
     try {
       // this.rotateQueue.add(request, {
@@ -265,50 +264,49 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
   }
 
   handleIframeRequest() {
-    // this.frameCnt += 1;
-    // this.onSteaming = true;
-    // const stream: StreamFrameType = {
-    //   frame: this.frameCnt,
-    //   clipPath: path.join(__dirname, '../ws/video/100/100.0000.h264'),
-    //   metaData: JSON.stringify(frameMetaReply),
-    //   serverTime: this.mockserverTime,
-    // };
-    // this.streamService.pushFrameToSteam(stream);
+    const lastStreamFrame = this.streamService.lastStreamFrame.getValue();
+    lastStreamFrame.DIR = 1;
+    console.log('lastStreamFrame', lastStreamFrame);
+    const nextFrame = this.frameCnt.getValue() + 1;
+    lastStreamFrame.frame = nextFrame;
+    this.streamService.pushFrameToSteam(lastStreamFrame);
   }
 
-  async walking(request) {
-    console.log('walking', request);
-
-    const walkingRes = await this.moveService.move(request);
-    if (walkingRes) {
-      this.onMoving = true;
-      if (!this.moveQueueSubscription) {
-        this.handleMoveSteam();
-      }
-      const res = Object.keys(walkingRes).map((item) => {
-        console.log('item', item);
-        return Array.from(walkingRes[item]).map((i) => {
-          i['marker'] = item;
-          return i;
+  async walking(req) {
+    console.log('walking', req);
+    this.clickQueue.next(req);
+    this.walkingSub = this.clickQueue.subscribe(async (request) => {
+      const walkingRes = await this.moveService.move(request);
+      if (walkingRes && !this.onMoving) {
+        this.onMoving = true;
+        this.holdSteam();
+        if (!this.moveQueueSubscription) {
+          this.handleMoveSteam();
+        }
+        const res = Object.keys(walkingRes).map((item) => {
+          console.log('item', item);
+          return Array.from(walkingRes[item]).map((i) => {
+            i['marker'] = item;
+            return i;
+          });
         });
-      });
-      const seqs = Array.from(res).flat();
-      this.lastMoveCnt = this.frameCnt.value + seqs.length;
-      console.log('lastMoveCnt', this.lastMoveCnt);
-      seqs.forEach((frame: StreamReplyType) => {
-        const mediaSrc = frame.mediaSrc;
-        delete frame.mediaSrc;
-        const stream: StreamFrameType = {
-          frame: -1,
-          clipPath: mediaSrc,
-          metaData: JSON.stringify(frame),
-          serverTime: this.mockserverTime,
-          DIR: 1,
-        };
-        this.moveQueue.next(stream);
-      });
-      this.holdSteam();
-    }
+        const seqs = Array.from(res).flat();
+        this.lastMoveCnt = this.frameCnt.value + seqs.length;
+        console.log('lastMoveCnt', this.lastMoveCnt);
+        seqs.forEach((frame: StreamReplyType) => {
+          const mediaSrc = frame.mediaSrc;
+          delete frame.mediaSrc;
+          const stream: StreamFrameType = {
+            frame: -1,
+            clipPath: mediaSrc,
+            metaData: JSON.stringify(frame),
+            serverTime: this.mockserverTime,
+            DIR: 1,
+          };
+          this.moveQueue.next(stream);
+        });
+      }
+    });
   }
 
   handleBreath(request) {
@@ -460,12 +458,12 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
   pushFirstRender(clipPath: string, metaData: string): Promise<boolean> {
     return new Promise<boolean>(async (resolve, reject) => {
       try {
-        this.onSteaming = true;
         const streamData: StreamFrameType = {
           frame: 1,
           clipPath: clipPath,
           metaData: metaData,
           serverTime: this.mockserverTime,
+          DIR: 1,
         };
         const hasPush = await this.streamService.pushFrameToSteam(streamData);
         return resolve(hasPush);
@@ -491,16 +489,16 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
               src = src.replace('/0000000001/', '');
               const clipPath = path.join(__dirname, `../ws/video/${src}`);
               delete redisData.mediaSrc;
-
               this.logger.log(
                 `user:${this.user_id}:first render stream` +
-                  JSON.stringify({ path: clipPath, meta: redisData }),
+                JSON.stringify({ path: clipPath, meta: redisData }),
               );
               const status = await this.pushFirstRender(
                 clipPath,
                 JSON.stringify(redisData),
               );
               if (status) {
+                this.firstRender = true;
                 this.resumeStream(2);
               } else {
                 this.logger.error('first render problem', status);
@@ -508,8 +506,13 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
             }
           }
         }
-        console.log(`空白流::${frame}-onSteaming`, this.onSteaming);
-        if (frame > 1 && !this.onSteaming) {
+        if (
+          frame > 1 &&
+          !this.onSteaming &&
+          !this.onMoving &&
+          this.firstRender
+        ) {
+          console.log(`空白流::${frame}`);
           const redisDataAuto = await this.rotateService.echo(this.user_id);
           if (redisDataAuto) {
             'mediaSrc' in redisDataAuto && delete redisDataAuto.mediaSrc;
@@ -536,7 +539,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
         stream.frame = this.rotateframeCnt;
         this.logger.log(
           `roQueueSubscription:frame:${this.rotateframeCnt}   ` +
-            JSON.stringify(stream.metaData),
+          JSON.stringify(stream.metaData),
         );
         await this.streamService.pushFrameToSteam(stream);
         setTimeout(() => {
@@ -555,42 +558,37 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
   cleanMoveSteam() {
     if (this.moveQueueSubscription) {
       this.moveQueueSubscription.unsubscribe();
-      this.moveframeCnt = -1;
       this.lastMoveCnt = -1;
       this.moveQueueSubscription = null;
     }
+    if (this.walkingSub) {
+      this.walkingSub.unsubscribe();
+      this.walkingSub = null;
+    }
   }
   handleMoveSteam() {
     this.moveQueueSubscription = this.moveQueue.subscribe(
       async (stream: StreamFrameType) => {
         const metaData: StreamReplyType = JSON.parse(stream.metaData);
-        this.holdSteam();
-        console.log('handleMoveSteam-onSteaming', this.onSteaming);
+        console.log('handleMoveSteam-onMoving', this.onMoving);
+        const next = this.frameCnt.value + 1;
+        this.currentMoveMaker = metaData.marker;
         if (this.onMoving) {
-          this.currentMoveMaker = metaData.marker;
+          this.frameCnt.next(next);
         } else {
-          if (
-            this.currentMoveMaker &&
-            this.currentMoveMaker !== metaData.marker
-          ) {
-            console.log('kill Queue');
-
-            return;
-          }
-        }
-        // console.log('moveQueueSubscription', stream, metaData.marker);
-        if (this.moveframeCnt === -1) {
-          this.moveframeCnt = this.frameCnt.value;
+          console.log('handleMoveSteam stop', next, this.currentMoveMaker);
+          this.cleanMoveSteam();
+          this.resumeStream(next);
+          return;
         }
-        this.moveframeCnt += 1;
-        console.log('this.moveframeCnt', this.moveframeCnt);
+
         let src = stream.clipPath.split('?')[0];
         // // 临时本地替换路经
         src = src.replace('/0000000001/', '');
         const clipPath = path.join(__dirname, `../ws/video/${src}`);
 
         const streamData: StreamFrameType = {
-          frame: this.moveframeCnt,
+          frame: next,
           clipPath: clipPath,
           metaData: stream.metaData,
           serverTime: this.mockserverTime,
@@ -598,8 +596,8 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
         };
         await this.streamService.pushFrameToSteam(streamData);
 
-        if (this.lastMoveCnt == this.moveframeCnt) {
-          const next = this.moveframeCnt + 1;
+        if (this.lastMoveCnt == this.frameCnt.getValue()) {
+          const next = this.frameCnt.getValue() + 1;
           console.log('last', next);
           this.resumeStream(next);
           this.cleanMoveSteam();

+ 1 - 0
src/scene/stream/stream.d.ts

@@ -5,6 +5,7 @@ interface StreamFrameType {
   serverTime?: number;
   DIR?: number;
   mediaSrc?: string; // 临时
+  marker?: string;
 }
 interface StreamMetaType {
   frame: number;

+ 17 - 6
src/scene/stream/stream.service.ts

@@ -5,7 +5,8 @@ import { readFileSync } from 'fs';
 import * as streamBuffers from 'stream-buffers';
 import { BehaviorSubject } from 'rxjs';
 import { CacheService } from 'src/cache/cache.service';
-import { resolve } from 'path';
+
+
 @Injectable()
 export class StreamService {
   private channel: DataChannel;
@@ -13,7 +14,12 @@ export class StreamService {
   private readonly block = 36;
   private logger: Logger = new Logger('StreamService');
   public onSteaming = new BehaviorSubject<boolean>(false);
-  constructor(private cacheService: CacheService) {}
+  public lastStreamFrame = new BehaviorSubject<StreamFrameType>({
+    frame: -1,
+    clipPath: '',
+    metaData: '',
+  });
+  constructor(private cacheService: CacheService) { }
 
   setChannel(channel: DataChannel) {
     this.channel = channel;
@@ -21,10 +27,6 @@ export class StreamService {
   closeChannel() {
     this.channel = null;
   }
-
-  getQueueDataFromPool(poolKey: string) {
-    // const redisData = this.cacheService.()
-  }
   /**
    * stream core push normal stream
    * @param data meta Json
@@ -98,6 +100,15 @@ export class StreamService {
         const frame = stream.frame;
         const serverTime = stream.serverTime || 754871824;
         const dir = stream.DIR || 1;
+
+        this.lastStreamFrame.next({
+          clipPath: clipPath,
+          metaData: metaData,
+          frame: frame,
+          serverTime: serverTime,
+          DIR: dir,
+          marker: stream.marker,
+        });
         const metaDataString = metaData.replace(/\s/g, '');
         const coordBuff = Buffer.from(metaDataString, 'utf-8');
         // console.warn('coordBuff', coordBuff.byteLength);