gemercheung 3 년 전
부모
커밋
39f83b3344

+ 8 - 0
src/cache/cache.service.ts

@@ -60,6 +60,14 @@ export class CacheService implements OnModuleInit {
     return JSON.parse(data);
   }
 
+  public async keys(key: string) {
+    if (!this.client) {
+      await this.getClient();
+    }
+    const data = await this.client.keys(key);
+    if (!data) return;
+    return data;
+  }
   //获取值的方法
   public async del(key: string) {
     if (!this.client) {

+ 164 - 0
src/get-router/get-router.service.ts

@@ -0,0 +1,164 @@
+import { Injectable } from '@nestjs/common';
+import { CacheService } from 'src/cache/cache.service';
+
+@Injectable()
+export class GetRouterService {
+  constructor(private cacheService: CacheService) {}
+
+  async searchRoad(appId, startPointId, clicking_point) {
+    //表示终点
+    let endPoint;
+
+    const keys = await this.cacheService.keys(`breakpoints:app_id:${appId}*`);
+    let minDis = null;
+    for (let i = 0; i < keys.length; ++i) {
+      const breakPointRes = await this.cacheService.get(keys[i]);
+      const breakPoint = JSON.parse(breakPointRes);
+      if(breakPoint.breakPointId == startPointId){
+        continue;
+      }
+      const position = breakPoint.position;
+      if(minDis == null){
+        minDis = this.getDistance(clicking_point,position);
+        endPoint = breakPoint;
+      }
+      else if(minDis>this.getDistance(clicking_point,position)){
+        endPoint = breakPoint;
+        minDis=this.getDistance(clicking_point,position);
+      }
+    }
+
+    if(minDis>100){
+      return [];
+    }
+
+    const startPointRes = await this.cacheService.get(
+      'breakpoints:app_id:' + appId + ':break_point_id:' + startPointId,
+    );
+    const startPoint = JSON.parse(startPointRes);
+
+    // const endPointRes = await this.cacheService.get(
+    //   'breakpoints:app_id:' + appId + ':break_point_id:' + endPointId,
+    // );
+    // const endPoint = JSON.parse(endPointRes);
+
+    const openList = [], //开启列表
+      closeList = []; //关闭列表
+    let result = []; //结果数组
+    let result_index: number; //结果数组在开启列表中的序号
+
+    //openList.push({x:startPoint.x,y:startPoint.y,G:0});//把当前点加入到开启列表中,并且G是0
+    openList.push({ breakPointId: startPointId, G: 0 }); //把当前点加入到开启列表中,并且G是0
+
+    do {
+      const currentPointInfo = openList.pop();
+      const currentPointRes = await this.cacheService.get(
+        'breakpoints:app_id:' +
+          appId +
+          ':break_point_id:' +
+          currentPointInfo.breakPointId,
+      );
+      const currentPoint = JSON.parse(currentPointRes);
+      closeList.push(currentPointInfo);
+
+      //读redis里的数据
+      const breakPointRes = await this.cacheService.get(
+        'breakpoints:app_id:' +
+          appId +
+          ':break_point_id:' +
+          currentPointInfo.breakPointId,
+      );
+      let surroundPoint = [];
+      const _breakPoint = JSON.parse(breakPointRes);
+      surroundPoint = _breakPoint.contact;
+
+      for (let i = 0; i < surroundPoint.length; ++i) {
+        const neighPointId = surroundPoint[i];
+        const itemRes = await this.cacheService.get(
+          'breakpoints:app_id:' + appId + ':break_point_id:' + neighPointId,
+        );
+        let item = JSON.parse(itemRes);
+        //g 到父节点的位置
+        const g =
+          currentPointInfo.G +
+          this.getDistance(currentPoint.position, item.position);
+        if (this.existList(item, openList) == -1) {
+          //如果不在开启列表中
+          item['H'] = 0;
+          item['G'] = g;
+          item['F'] = item.H + item.G;
+          item['parent'] = currentPoint;
+          openList.push(item);
+        } else {
+          //存在在开启列表中,比较目前的g值和之前的g的大小
+          const index = this.existList(item, openList);
+          //如果当前点的g更小
+          if (g < openList[index].G) {
+            openList[index].parent = currentPoint;
+            openList[index].G = g;
+            openList[index].F = g + openList[index].H;
+          }
+        }
+      }
+      //如果开启列表空了,没有通路,结果为空
+      if (openList.length == 0) {
+        break;
+      }
+
+      openList.sort(this.sortF); //这一步是为了循环回去的时候,找出 F 值最小的, 将它从 "开启列表" 中移掉
+    } while ((result_index = this.existList(endPoint, openList))==-1);
+
+    //判断结果列表是否为空
+    if (result_index == -1) {
+      result = [];
+    } else {
+      let currentObj = openList[result_index];
+      do {
+        //把路劲节点添加到result当中
+        result.unshift(currentObj.breakPointId);
+        currentObj = currentObj.parent;
+        // debugger
+      } while (
+        currentObj.position.x != startPoint.position.x ||
+        currentObj.position.y != startPoint.position.y
+      );
+    }
+    result.unshift(startPointId);
+    return result;
+  }
+
+  //用F值对数组排序
+  sortF(a, b) {
+    return b.F - a.F;
+  }
+
+  //获取周围点
+  async SurroundPoint(appId, curPointId, endPoint) {
+    //读redis里的数据
+    const breakPointRes = await this.cacheService.get(
+      'breakpoints:app_id:' + appId + ':break_point_id:' + curPointId,
+    );
+    const breakPoint = JSON.parse(breakPointRes);
+    if (this.getDistance(breakPoint.position, endPoint.position) < 1) {
+      breakPoint.contact.push(endPoint.breakPointId);
+    }
+    return breakPoint.contact;
+  }
+
+  //判断点是否存在在列表中,是的话返回的是序列号
+  existList(point, list) {
+    for (let i = 0; i < list.length; ++i) {
+      if (point.breakPointId == list[i].breakPointId) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
+  getDistance(position1, position2) {
+    return Math.sqrt(
+      (position1.x - position2.x) * (position1.x - position2.x) +
+        (position1.y - position2.y) * (position1.y - position2.y),
+    );
+  }
+}

+ 35 - 17
src/move/move.service.ts

@@ -3,13 +3,13 @@ import { CacheService } from 'src/cache/cache.service';
 
 @Injectable()
 export class MoveService {
-  constructor(private cacheService: CacheService) {}
+  constructor(private cacheService: CacheService) { }
   private Actions = {
     Clicking: 1,
     Rotation: 1014,
     Joystick: 15,
   };
-  private users = {};
+  public users = {};
 
   private reply = {
     traceIds: [],
@@ -135,7 +135,7 @@ export class MoveService {
     this.users[userId] = user;
   }
 
-  async move(actionRequest) {
+  async move(pathArray, actionRequest) {
     try {
       const userId = actionRequest['user_id'];
       const traceId = actionRequest['trace_id'];
@@ -143,7 +143,7 @@ export class MoveService {
       const user = this.users[userId];
 
       const appId = user.appId;
-      const path = [100, 101, 102]; //需要计算路径
+      const path = pathArray || [100, 101, 102]; //需要计算路径
       const angle = user.camera.angle.yaw % 45; //纠正需要
       const index = Math.round(user.camera.angle.yaw / 45); //过渡需要
 
@@ -192,18 +192,18 @@ export class MoveService {
         //读redis里的数据
         const startBreakPointRes = await this.cacheService.get(
           'breakpoints:app_id:' +
-            appId +
-            ':break_point_id:' +
-            start_break_point_id,
+          appId +
+          ':break_point_id:' +
+          start_break_point_id,
         );
 
         const startBreakPoint = JSON.parse(startBreakPointRes);
 
         const endBreakPointRes = await this.cacheService.get(
           'breakpoints:app_id:' +
-            appId +
-            ':break_point_id:' +
-            end_break_point_id,
+          appId +
+          ':break_point_id:' +
+          end_break_point_id,
         );
         const endBreakPoint = JSON.parse(endBreakPointRes);
         pathReplys = this.createCacheReplys(
@@ -214,6 +214,13 @@ export class MoveService {
           startBreakPoint.position,
           endBreakPoint.position,
         );
+
+        if (i == path.length - 2) {
+          pathReplys[pathReplys.length - 1][
+            'newUserStates'
+          ][0].renderInfo.isMoving = 0;
+        }
+
         //replys.push(pathReplys);
         replys['P' + start_break_point_id + 'T' + end_break_point_id] =
           pathReplys;
@@ -297,6 +304,7 @@ export class MoveService {
     const user = this.users[userId];
     user.breakPointId = breakPointId;
 
+    debugger;
     user.player.position =
       lastReply['newUserStates'][0].playerState.player.position;
     user.player.angle = lastReply['newUserStates'][0].playerState.player.angle;
@@ -334,10 +342,16 @@ export class MoveService {
 
     //const breakPoints = await this.cacheService.get('breakpoints:app_id:'+appId+':break_point_id:'+breakPointId);
     //获取redis表全部元素,'breakpoints:app_id:'+appId+':break_point_id:'开头的
-    const breakPoints = [];
-    for (let i = 0; i < breakPoints.length; ++i) {
-      const position = breakPoints[i].position;
-      reply['nps'].push(position);
+    const keys = await this.cacheService.keys(`breakpoints:app_id:${appId}*`);
+
+    for (let i = 0; i < keys.length; ++i) {
+      const breakPointRes = await this.cacheService.get(keys[i]);
+      const breakPoint = JSON.parse(breakPointRes);
+      const position = breakPoint.position;
+      reply['nps'].push({
+        position: position,
+        breakPointId: breakPoint.breakPointId,
+      });
     }
     return reply;
   }
@@ -355,7 +369,8 @@ export class MoveService {
     return (angle + 360) % 360;
   }
 
-  async stop(traceId, userId, breakPointId, cameraAngle, playerAngle) {
+  async stop(traceId, userId, movePointIds, cameraAngle, playerAngle) {
+    const breakPointId = movePointIds.substring(movePointIds.indexOf('-') + 1);
     const user = this.users[userId];
     user.breakPointId = breakPointId;
     const appId = user.appId;
@@ -363,20 +378,22 @@ export class MoveService {
     const breakPointRes = await this.cacheService.get(
       'breakpoints:app_id:' + appId + ':break_point_id:' + breakPointId,
     );
+
     const breakPoint = JSON.parse(breakPointRes);
 
     user.player.position = breakPoint.position;
     user.player.angle = playerAngle;
 
-    const rotateDataRes =
+    const rotateKey =
       'rotateframe:app_id:' +
       appId +
       ':frame_index:' +
       cameraAngle.yaw +
       ':break_point_id:' +
       breakPointId;
-    const rotateData = await this.cacheService.get(rotateDataRes);
 
+    const rotateDataRes = await this.cacheService.get(rotateKey);
+    const rotateData = JSON.parse(rotateDataRes);
     user.camera.position = rotateData.cameraPosition;
     user.camera.angle = rotateData.cameraAngle;
 
@@ -401,6 +418,7 @@ export class MoveService {
       rotateData.fileName +
       '?m=' +
       new Date().getTime();
+    reply['newUserStates'][0].renderInfo.isMoving = 0;
     return reply;
   }
 }

+ 47 - 0
src/queue/concurrency-executor/concurrency-executer.spec.ts

@@ -0,0 +1,47 @@
+#!/usr/bin/env -S node --no-warnings --loader ts-node/esm
+
+import { test, sinon } from 'tstest';
+
+import { concurrencyExecuter } from './concurrency-executer.js';
+
+test('concurrencyExecuter() smoke testing', async (t) => {
+  const sandbox = sinon.createSandbox({
+    useFakeTimers: true,
+  });
+
+  const INPUT_LIST = [1, 2, 3, 4, 5, 6, 7, 8, 9];
+  const CONCURRENCY = 2;
+  const SLEEP_MS = 10;
+
+  const task = async (v: number) => {
+    await new Promise((resolve) => setTimeout(resolve, SLEEP_MS));
+    return v * 10;
+  };
+
+  const iterator = concurrencyExecuter(CONCURRENCY)(task)(INPUT_LIST);
+
+  const outputList: number[] = [];
+
+  (async () => {
+    for await (const item of iterator) {
+      outputList.push(item);
+    }
+  })().catch((e) => t.fail(e));
+
+  for (let i = 0; i < 3; i++) {
+    t.equal(
+      outputList.length,
+      i * CONCURRENCY,
+      'should has ' +
+        i * CONCURRENCY +
+        ' output item(s) after ' +
+        i +
+        ' iteration(s)',
+    );
+    await sandbox.clock.tickAsync(SLEEP_MS + 1);
+  }
+
+  t.pass('smoke testing passed');
+
+  sandbox.restore();
+});

+ 53 - 0
src/queue/concurrency-executor/concurrency-executer.ts

@@ -0,0 +1,53 @@
+#!/usr/bin/env -S node --no-warnings --loader ts-node/esm
+
+/**
+ * If you know how iterators work and how they are consumed you would't need any extra library,
+ *  since it can become very easy to build your own concurrency yourself.
+ *    — @Endless
+ *
+ * Inspired by: @link https://stackoverflow.com/a/51020535/1123955
+ */
+
+/**
+ * Huan's stackoverflow answer (code example) for `merge`:
+ *  @link https://stackoverflow.com/a/69985103/1123955
+ */
+import { merge } from 'ix/asynciterable/index.js';
+
+type ExecuterTask<S, T> = (value: S) => T | Promise<T>;
+
+const executeTask = <S, T>(task: ExecuterTask<S, T>) =>
+  async function* (iterator: IterableIterator<S>): AsyncIterableIterator<T> {
+    for (const one of iterator) {
+      const result = await task(one);
+      yield result;
+    }
+  };
+
+/**
+ * Execute task with the concurrency on an iterator
+ * The order will not be guaranteed. (mostly will be different)
+ */
+const concurrencyExecuter =
+  (concurrency = 1) =>
+  <S, T>(task: ExecuterTask<S, T>) =>
+    async function* (
+      iterator: Array<S> | IterableIterator<S>,
+    ): AsyncIterableIterator<T> {
+      if (Array.isArray(iterator)) {
+        iterator = iterator.values();
+      }
+
+      const executer = executeTask(task);
+
+      const resultIteratorList = new Array(concurrency)
+        .fill(iterator)
+        .map(executer) as [
+        AsyncIterableIterator<T>,
+        ...AsyncIterableIterator<T>[],
+      ];
+
+      yield* merge(...resultIteratorList);
+    };
+
+export { concurrencyExecuter };

+ 1 - 0
src/queue/config.ts

@@ -0,0 +1 @@
+export { VERSION } from './version.js';

+ 87 - 0
src/queue/debounce-queue/debounce-queue.spec.ts

@@ -0,0 +1,87 @@
+#!/usr/bin/env -S node --no-warnings --loader ts-node/esm
+
+import { test, sinon } from 'tstest';
+
+import DebounceQueue from './debounce-queue.js';
+
+const EXPECTED_ITEM1 = { test: 'testing123' };
+const EXPECTED_ITEM2 = { mol: 42 };
+const EXPECTED_ITEM3 = 42;
+
+const DELAY_PERIOD_TIME = 10; // milliseconds
+
+test('DebounceQueue 1 item', async (t) => {
+  const q = new DebounceQueue(DELAY_PERIOD_TIME);
+
+  const spy = sinon.spy();
+  q.subscribe(spy);
+
+  q.next(EXPECTED_ITEM1);
+  t.ok(spy.notCalled, 'should not called right after first item');
+
+  await new Promise((resolve) => setTimeout(resolve, DELAY_PERIOD_TIME + 3));
+  t.ok(spy.calledOnce, 'should be called after the DELAY_PERIOD_TIME');
+  t.deepEqual(
+    spy.firstCall.args[0],
+    EXPECTED_ITEM1,
+    'should get the first item immediately',
+  );
+});
+
+test('DebounceQueue 2 item', async (t) => {
+  const q = new DebounceQueue(DELAY_PERIOD_TIME);
+
+  const spy = sinon.spy();
+  q.subscribe(spy);
+
+  q.next(EXPECTED_ITEM1);
+  q.next(EXPECTED_ITEM2);
+
+  await new Promise((resolve) => setTimeout(resolve, DELAY_PERIOD_TIME + 3));
+  t.equal(
+    spy.callCount,
+    1,
+    'should be called only once after DELAY_PERIOD_TIME because its debounced',
+  );
+  t.deepEqual(
+    spy.lastCall.args[0],
+    EXPECTED_ITEM2,
+    'should get the EXPECTED_ITEM2',
+  );
+});
+
+test('DebounceQueue 3 items', async (t) => {
+  const q = new DebounceQueue(DELAY_PERIOD_TIME);
+
+  const spy = sinon.spy();
+  q.subscribe(spy);
+
+  q.next(EXPECTED_ITEM1);
+  q.next(EXPECTED_ITEM2);
+
+  await new Promise((resolve) => setTimeout(resolve, DELAY_PERIOD_TIME + 3));
+
+  q.next(EXPECTED_ITEM3);
+  t.equal(
+    spy.callCount,
+    1,
+    'should called once right after next(EXPECTED_ITEM3)',
+  );
+  t.deepEqual(
+    spy.lastCall.args[0],
+    EXPECTED_ITEM2,
+    'the first call should receive EXPECTED_ITEM2',
+  );
+
+  await new Promise((resolve) => setTimeout(resolve, DELAY_PERIOD_TIME + 3));
+  t.equal(
+    spy.callCount,
+    2,
+    'should be called twice after the DELAY_PERIOD_TIME',
+  );
+  t.deepEqual(
+    spy.lastCall.args[0],
+    EXPECTED_ITEM3,
+    'should get EXPECTED_ITEM3',
+  );
+});

+ 40 - 0
src/queue/debounce-queue/debounce-queue.ts

@@ -0,0 +1,40 @@
+import { interval, Subject, Subscription } from 'rxjs';
+import { debounce } from 'rxjs/operators';
+
+import RxQueue from '../rx-queue.js';
+
+/**
+ * DebounceQueue drops a item if there's another one comes in a period of time.
+ *
+ * T: item type
+ */
+export class DebounceQueue<T = unknown> extends RxQueue<T> {
+  private subscription: Subscription;
+  private subject: Subject<T>;
+
+  /**
+   *
+   * @param period milliseconds
+   */
+  constructor(
+    period?: number, // milliseconds
+  ) {
+    super(period);
+
+    this.subject = new Subject<T>();
+    this.subscription = this.subject
+      .pipe(debounce(() => interval(this.period)))
+      .subscribe((item: T) => super.next(item));
+  }
+
+  override next(item: T) {
+    this.subject.next(item);
+  }
+
+  override unsubscribe() {
+    this.subscription.unsubscribe();
+    super.unsubscribe();
+  }
+}
+
+export default DebounceQueue;

+ 81 - 0
src/queue/delay-queue/delay-queue-executor.spec.ts

@@ -0,0 +1,81 @@
+#!/usr/bin/env -S node --no-warnings --loader ts-node/esm
+
+import { test, sinon } from 'tstest';
+
+import DelayExecutor from './delay-queue-executor.js';
+
+const DELAY_PERIOD_TIME = 10;
+
+const EXPECTED_VAL1 = 1;
+const EXPECTED_VAL2 = 2;
+const EXPECTED_VAL3 = 3;
+
+const MEANING_OF_LIFE = 42;
+
+test('DelayQueueExecutor execute once', async (t) => {
+  const spy = sinon.spy();
+
+  const delay = new DelayExecutor(DELAY_PERIOD_TIME);
+
+  delay
+    .execute(() => spy(EXPECTED_VAL1))
+    .catch(() => {
+      /* */
+    });
+
+  t.ok(spy.calledOnce, 'should received 1 call immediately');
+  t.equal(spy.firstCall.args[0], EXPECTED_VAL1, 'should get EXPECTED_VAL1');
+});
+
+test('DelayQueueExecutor execute thrice', async (t) => {
+  const spy = sinon.spy();
+
+  const delay = new DelayExecutor(DELAY_PERIOD_TIME);
+
+  delay
+    .execute(() => spy(EXPECTED_VAL1))
+    .catch(() => {
+      /* */
+    });
+  delay
+    .execute(() => spy(EXPECTED_VAL2))
+    .catch(() => {
+      /* */
+    });
+  delay
+    .execute(() => spy(EXPECTED_VAL3))
+    .catch(() => {
+      /* */
+    });
+
+  t.equal(spy.callCount, 1, 'should call once immediately');
+  t.equal(spy.lastCall.args[0], EXPECTED_VAL1, 'should get EXPECTED_VAL1');
+
+  await new Promise((resolve) => setTimeout(resolve, DELAY_PERIOD_TIME + 3));
+  t.equal(spy.callCount, 2, 'should call twice after DELAY_PERIOD_TIME');
+  t.equal(spy.lastCall.args[0], EXPECTED_VAL2, 'should get EXPECTED_VAL2');
+
+  await new Promise((resolve) => setTimeout(resolve, DELAY_PERIOD_TIME + 3));
+  t.equal(spy.callCount, 3, 'should call thrice after 2 x DELAY_PERIOD_TIME');
+  t.equal(spy.lastCall.args[0], EXPECTED_VAL3, 'should get EXPECTED_VAL3');
+
+  await new Promise((resolve) => setTimeout(resolve, DELAY_PERIOD_TIME + 3));
+  t.equal(spy.callCount, 3, 'should keep third call...');
+});
+
+test('DelayQueueExecutor return Promise', async (t) => {
+  const delay = new DelayExecutor(0);
+
+  const mol = await delay.execute(() => MEANING_OF_LIFE);
+  t.equal(mol, MEANING_OF_LIFE, 'should get the function return value');
+
+  const p = delay.execute(() => Promise.resolve(MEANING_OF_LIFE));
+  t.ok(p instanceof Promise, 'should get the function return value(promise)');
+
+  const value = await p;
+  t.equal(
+    value,
+    MEANING_OF_LIFE,
+    'should get the function return value by await',
+  );
+});

+ 55 - 0
src/queue/delay-queue/delay-queue-executor.ts

@@ -0,0 +1,55 @@
+import type { Subscription } from 'rxjs';
+
+import DelayQueue from './delay-queue.js';
+
+export interface ExecutionUnit<T = unknown> {
+  fn: () => T;
+  name: string;
+  resolve: (value: T | PromiseLike<T>) => void;
+  reject: (e?: any) => void;
+}
+
+/**
+ * DelayQueueExecutor calls functions one by one with a delay time period between calls.
+ */
+export class DelayQueueExecutor<T = unknown> extends DelayQueue<
+  ExecutionUnit<T>
+> {
+  private readonly delayQueueSubscription: Subscription;
+
+  /**
+   *
+   * @param period milliseconds
+   */
+  constructor(period: number) {
+    super(period);
+
+    this.delayQueueSubscription = this.subscribe((unit) => {
+      try {
+        const ret = unit.fn();
+        return unit.resolve(ret);
+      } catch (e) {
+        return unit.reject(e);
+      }
+    });
+  }
+
+  async execute(fn: () => T, name?: string): Promise<T> {
+    return new Promise<T>((resolve, reject) => {
+      const unit: ExecutionUnit<T> = {
+        fn,
+        name: name || fn.name,
+        reject,
+        resolve,
+      };
+      this.next(unit);
+    });
+  }
+
+  override unsubscribe() {
+    this.delayQueueSubscription.unsubscribe();
+    super.unsubscribe();
+  }
+}
+
+export default DelayQueueExecutor;

+ 86 - 0
src/queue/delay-queue/delay-queue.spec.ts

@@ -0,0 +1,86 @@
+#!/usr/bin/env -S node --no-warnings --loader ts-node/esm
+
+import { test, sinon } from 'tstest';
+
+import DelayQueue from './delay-queue.js';
+
+const EXPECTED_ITEM1 = { test: 'testing123' };
+const EXPECTED_ITEM2 = { mol: 42 };
+const EXPECTED_ITEM3 = 42;
+
+const DELAY_PERIOD_TIME = 10; // milliseconds
+
+test('DelayQueue 1 item', async (t) => {
+  const q = new DelayQueue(DELAY_PERIOD_TIME);
+
+  const spy = sinon.spy();
+  q.subscribe(spy);
+
+  q.next(EXPECTED_ITEM1);
+
+  t.equal(spy.callCount, 1, 'should called right after first item');
+  t.deepEqual(
+    spy.lastCall.args[0],
+    EXPECTED_ITEM1,
+    'should get the first item immediately',
+  );
+});
+
+test('DelayQueue 2 item', async (t) => {
+  const q = new DelayQueue(DELAY_PERIOD_TIME);
+
+  const spy = sinon.spy();
+  q.subscribe(spy);
+
+  q.next(EXPECTED_ITEM1);
+  q.next(EXPECTED_ITEM2);
+
+  t.equal(spy.callCount, 1, 'should get one item after next two item');
+  t.deepEqual(
+    spy.lastCall.args[0],
+    EXPECTED_ITEM1,
+    'should get the first item only',
+  );
+
+  await new Promise((resolve) => setTimeout(resolve, DELAY_PERIOD_TIME + 3));
+  t.equal(spy.callCount, 2, 'should get the second item after period delay');
+  t.deepEqual(
+    spy.lastCall.args[0],
+    EXPECTED_ITEM2,
+    'should get the second item for last call',
+  );
+});
+
+test('DelayQueue 3 items', async (t) => {
+  const q = new DelayQueue(DELAY_PERIOD_TIME);
+
+  const spy = sinon.spy();
+  q.subscribe(spy);
+
+  q.next(EXPECTED_ITEM1);
+  q.next(EXPECTED_ITEM2);
+  q.next(EXPECTED_ITEM3);
+
+  t.equal(spy.callCount, 1, 'get first item immediatelly');
+  t.deepEqual(
+    spy.lastCall.args[0],
+    EXPECTED_ITEM1,
+    'should received EXPECTED_ITEM1 immediatelly',
+  );
+
+  await new Promise((resolve) => setTimeout(resolve, DELAY_PERIOD_TIME + 3));
+  t.equal(spy.callCount, 2, 'get second item after period');
+  t.deepEqual(
+    spy.lastCall.args[0],
+    EXPECTED_ITEM2,
+    'should received EXPECTED_ITEM2 after 1 x period',
+  );
+
+  await new Promise((resolve) => setTimeout(resolve, DELAY_PERIOD_TIME + 3));
+  t.equal(spy.callCount, 3, 'should get the third item after 2 x period');
+  t.deepEqual(
+    spy.lastCall.args[0],
+    EXPECTED_ITEM3,
+    'should received EXPECTED_ITEM3 after 2 x period',
+  );
+});

+ 54 - 0
src/queue/delay-queue/delay-queue.ts

@@ -0,0 +1,54 @@
+import { concat, of, Subject, Subscription, timer } from 'rxjs';
+import { concatMap, ignoreElements } from 'rxjs/operators';
+
+import RxQueue from '../rx-queue.js';
+
+/**
+ * DelayQueue passes all the items and add delays between items.
+ * T: item type
+ */
+export class DelayQueue<T = unknown> extends RxQueue<T> {
+  private subscription: Subscription;
+  private subject: Subject<T>;
+
+  /**
+   *
+   * @param period milliseconds
+   */
+  constructor(
+    period?: number, // milliseconds
+  ) {
+    super(period);
+
+    this.subject = new Subject<T>();
+    this.subscription = this.subject
+      .pipe(
+        concatMap((x) =>
+          concat(
+            of(x), // emit first item right away
+            /**
+             * Issue #71 - DelayQueue failed: behavior breaking change after RxJS from v6 to v7
+             *  https://github.com/huan/rx-queue/issues/71
+             */
+            timer(this.period).pipe(ignoreElements()),
+          ),
+        ),
+      )
+      .subscribe((item: T) => super.next(item));
+  }
+
+  override next(item: T) {
+    this.subject.next(item);
+  }
+
+  override unsubscribe() {
+    this.subscription.unsubscribe();
+    super.unsubscribe();
+  }
+
+  clean() {
+    console.log('clean', this.subject);
+  }
+}
+
+export default DelayQueue;

+ 9 - 0
src/queue/mod.ts

@@ -0,0 +1,9 @@
+export { RxQueue } from './rx-queue.js';
+
+export { DelayQueueExecutor } from './delay-queue/delay-queue-executor.js';
+export { DebounceQueue } from './debounce-queue/debounce-queue.js';
+export { DelayQueue } from './delay-queue/delay-queue.js';
+export { ThrottleQueue } from './throttle-queue/throttle-queue.js';
+export { concurrencyExecuter } from './concurrency-executor/concurrency-executer.js';
+
+export { VERSION } from './version.js';

+ 28 - 0
src/queue/rx-queue.spec.ts

@@ -0,0 +1,28 @@
+#!/usr/bin/env -S node --no-warnings --loader ts-node/esm
+
+// tslint:disable:no-shadowed-variable
+import { test, sinon } from 'tstest';
+
+import RxQueue from './rx-queue.js';
+
+test('RxQueue subscribe & next', async (t) => {
+  const EXPECTED_ITEM = { test: 'testing123' };
+  const spy = sinon.spy();
+
+  const q = new RxQueue();
+
+  q.subscribe(spy);
+  q.next(EXPECTED_ITEM);
+
+  t.ok(spy.calledOnce, 'should received 1 call');
+  t.deepEqual(
+    spy.firstCall.args[0],
+    EXPECTED_ITEM,
+    'should received EXPECTED_ITEM',
+  );
+});
+
+test('RxQueue version()', async (t) => {
+  const q = new RxQueue();
+  t.ok(/^\d+\.\d+\.\d+$/.test(q.version()), 'get version');
+});

+ 54 - 0
src/queue/rx-queue.ts

@@ -0,0 +1,54 @@
+import { PartialObserver, Subject, Subscription } from 'rxjs';
+
+import { VERSION } from './config.js';
+
+// default set to 500 milliseconds
+const DEFAULT_PERIOD_TIME = 500;
+
+// https://codepen.io/maindg/pen/xRwGvL
+export class RxQueue<T = unknown> extends Subject<T> {
+  private itemList: T[] = [];
+
+  constructor(public period = DEFAULT_PERIOD_TIME) {
+    super();
+  }
+
+  override next(item: T) {
+    if (this.observers.length > 0) {
+      super.next(item);
+    } else {
+      this.itemList.push(item);
+    }
+  }
+
+  override subscribe(observer: PartialObserver<T>): Subscription;
+  override subscribe(
+    next: (value: T) => void,
+    error?: (error: any) => void,
+    complete?: () => void,
+  ): Subscription;
+
+  override subscribe(...args: never[]): never;
+
+  override subscribe(
+    nextOrObserver: ((value: T) => void) | PartialObserver<T>,
+    error?: (error: any) => void,
+    complete?: () => void,
+  ) {
+    let subscription: Subscription; // TypeScript strict require strong typing differenciation
+    if (typeof nextOrObserver === 'function') {
+      subscription = super.subscribe(nextOrObserver, error, complete);
+    } else {
+      subscription = super.subscribe(nextOrObserver);
+    }
+    this.itemList.forEach((item) => this.next(item));
+    this.itemList = [];
+    return subscription;
+  }
+
+  public version(): string {
+    return VERSION;
+  }
+}
+
+export default RxQueue;

+ 70 - 0
src/queue/throttle-queue/throttle-queue.spec.ts

@@ -0,0 +1,70 @@
+#!/usr/bin/env -S node --no-warnings --loader ts-node/esm
+
+import { test, sinon } from 'tstest';
+
+import ThrottleQueue from './throttle-queue.js';
+
+const EXPECTED_ITEM1 = { test: 'testing123' };
+const EXPECTED_ITEM2 = { mol: 42 };
+const EXPECTED_ITEM3 = 42;
+
+const THROTTLE_PERIOD_TIME = 10; // milliseconds
+
+test('ThrottleQueue 1 item', async (t) => {
+  const q = new ThrottleQueue(THROTTLE_PERIOD_TIME);
+
+  const spy = sinon.spy();
+  q.subscribe(spy);
+
+  q.next(EXPECTED_ITEM1);
+
+  t.ok(spy.calledOnce, 'should called right after first item');
+  t.deepEqual(
+    spy.firstCall.args[0],
+    EXPECTED_ITEM1,
+    'should get the first item immediately',
+  );
+});
+
+test('ThrottleQueue 2 item', async (t) => {
+  const q = new ThrottleQueue(THROTTLE_PERIOD_TIME);
+
+  const spy = sinon.spy();
+  q.subscribe(spy);
+
+  q.next(EXPECTED_ITEM1);
+  q.next(EXPECTED_ITEM2);
+
+  t.ok(spy.calledOnce, 'should only be called once right after next two items');
+  t.deepEqual(
+    spy.firstCall.args[0],
+    EXPECTED_ITEM1,
+    'should get the first item',
+  );
+
+  await new Promise((resolve) => setTimeout(resolve, THROTTLE_PERIOD_TIME + 3));
+  t.ok(
+    spy.calledOnce,
+    'should drop the second call after period because of throttle',
+  );
+});
+
+test('ThrottleQueue 3 items', async (t) => {
+  const q = new ThrottleQueue(THROTTLE_PERIOD_TIME);
+
+  const spy = sinon.spy();
+  q.subscribe(spy);
+
+  q.next(EXPECTED_ITEM1);
+  q.next(EXPECTED_ITEM2);
+
+  await new Promise((resolve) => setTimeout(resolve, THROTTLE_PERIOD_TIME + 3));
+
+  q.next(EXPECTED_ITEM3);
+  t.ok(spy.calledTwice, 'should received the third item after THROTTLE_TIME');
+  t.deepEqual(
+    spy.secondCall.args[0],
+    EXPECTED_ITEM3,
+    'should received EXPECTED_ITEM3 (not the ITEM2!)',
+  );
+});

+ 42 - 0
src/queue/throttle-queue/throttle-queue.ts

@@ -0,0 +1,42 @@
+import { interval, Subject, Subscription } from 'rxjs';
+import { throttle } from 'rxjs/operators';
+
+import RxQueue from '../rx-queue.js';
+
+/**
+ * ThrottleQueue
+ *
+ * passes one item and then drop all the following items in a period of time.
+ *
+ * T: item type
+ */
+export class ThrottleQueue<T = unknown> extends RxQueue<T> {
+  private subscription: Subscription;
+  private subject: Subject<T>;
+
+  /**
+   *
+   * @param period milliseconds
+   */
+  constructor(
+    period?: number, // milliseconds
+  ) {
+    super(period);
+
+    this.subject = new Subject<T>();
+    this.subscription = this.subject
+      .pipe(throttle(() => interval(this.period)))
+      .subscribe((item: T) => super.next(item));
+  }
+
+  override next(item: T) {
+    this.subject.next(item);
+  }
+
+  override unsubscribe() {
+    this.subscription.unsubscribe();
+    super.unsubscribe();
+  }
+}
+
+export default ThrottleQueue;

+ 13 - 0
src/queue/version.spec.ts

@@ -0,0 +1,13 @@
+#!/usr/bin/env -S node --no-warnings --loader ts-node/esm
+
+import { test } from 'tstest';
+
+import { VERSION } from './version.js';
+
+test('Make sure the VERSION is fresh in source code', async (t) => {
+  t.equal(
+    VERSION,
+    '0.0.0',
+    'version should be 0.0.0 in source code, only updated before publish to NPM',
+  );
+});

+ 4 - 0
src/queue/version.ts

@@ -0,0 +1,4 @@
+/**
+ * This file was auto generated from scripts/generate-version.sh
+ */
+export const VERSION = '1.0.5';

+ 1 - 1
src/rotate/rotate.service.ts

@@ -177,7 +177,7 @@ export class RotateService {
 
       actionRequests.splice(0, sub);
       const hAngle = horizontal_move * 90;
-      if (Math.abs(hAngle) < 1) {
+      if (Math.abs(hAngle) < 3) {
         user.rotateInfo.horizontal_move = horizontal_move;
         //user.traceIds = traceIds;
         this.replies[userId] = reply;

+ 2 - 0
src/scene/scene.module.ts

@@ -8,6 +8,7 @@ import { BullModule } from '@nestjs/bull';
 import { RotateConsumer } from './rotate-consumer';
 import { RotateService } from '../rotate/rotate.service';
 import { MoveService } from '../move/move.service';
+import { GetRouterService } from 'src/get-router/get-router.service';
 
 @Module({
   imports: [
@@ -24,6 +25,7 @@ import { MoveService } from '../move/move.service';
     RotateService,
     RotateConsumer,
     MoveService,
+    GetRouterService,
   ],
   exports: [SceneService, CacheService],
 })

+ 67 - 132
src/scene/scene.service.ts

@@ -14,6 +14,7 @@ import { Queue } from 'bull';
 import { RotateService } from 'src/rotate/rotate.service';
 import { DelayQueue, RxQueue, ThrottleQueue, DebounceQueue } from 'rx-queue';
 import { MoveService } from 'src/move/move.service';
+import { GetRouterService } from 'src/get-router/get-router.service';
 
 @Injectable()
 export class SceneService implements OnModuleInit, OnModuleDestroy {
@@ -22,6 +23,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     private streamService: StreamService,
     private rotateService: RotateService,
     private moveService: MoveService,
+    private getRouterService: GetRouterService,
     @InjectQueue('rotate') private rotateQueue: Queue,
   ) { }
   @Client(grpcClientOptions) private readonly client: ClientGrpc;
@@ -46,7 +48,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
   private walkingSub: any;
 
   private streamServiceSub: any;
-  private roQueue: RxQueue = new ThrottleQueue(120);
+  private roQueue: RxQueue = new DelayQueue(80);
   private clickQueue: RxQueue = new DebounceQueue(500);
   private moveQueue: RxQueue = new DelayQueue(100);
   private rotateTimeStamp: number;
@@ -55,6 +57,14 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
   private onMoving = false;
   private onRotating = false;
   private firstRender = false;
+  private currentPoint = '';
+
+
+  public lastMoveStreamFrame = new BehaviorSubject<StreamFrameType>({
+    frame: -1,
+    clipPath: '',
+    metaData: '',
+  });
 
   onModuleInit(): void {
     this.sceneGrpcService =
@@ -153,15 +163,42 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
 
   async rotate(request: RotateRequest) {
     try {
-      // this.rotateQueue.add(request, {
-      //   jobId: request.trace_id,
-      // });
       if (!this.roQueueSubscription) {
         this.handleRotateStream();
       }
       if (!this.onSteaming) {
-        this.onMoving = false;
-        const redisMeta = await this.rotateService.rotate(request);
+        let redisMeta: StreamReplyType;
+        this.onRotating = true;
+        if (this.onMoving) {
+          const lastStreamFrame = this.lastMoveStreamFrame.getValue();
+          const metaData: StreamReplyType = JSON.parse(
+            lastStreamFrame.metaData,
+          ) as any as StreamReplyType;
+          const newUserStates: NewUserStatesType = metaData.newUserStates.find(
+            (item) => item.userId === this.user_id,
+          );
+          const trace_id = metaData.traceIds[0];
+          const userId = newUserStates.userId;
+          const breakPointId = lastStreamFrame.marker
+            .replace('P', '')
+            .replace('T', '-');
+          const cameraAngle = newUserStates.playerState.camera.angle;
+          const playerAngle = newUserStates.playerState.player.angle;
+          this.onMoving = false;
+          console.log('stop-data', trace_id, userId, cameraAngle, cameraAngle);
+          redisMeta = await this.moveService.stop(
+            trace_id,
+            userId,
+            breakPointId,
+            cameraAngle,
+            playerAngle,
+          );
+          console.log('stop-redisMeta', redisMeta);
+          // redisMeta = await this.rotateService.rotate(request);
+        } else {
+          redisMeta = await this.rotateService.rotate(request);
+        }
+
         if (redisMeta && 'mediaSrc' in redisMeta) {
           const mediaSrc: string = redisMeta.mediaSrc || '';
           if (mediaSrc.length > 0) {
@@ -170,9 +207,9 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
             src = src.replace('/0000000001/', '');
             // 判断不是同一条源时才推出
             if (this.lastRenderMedia !== src) {
-              console.log('[media]', src);
               // console.log('不同源');
               // this.frameCnt += 1;
+              console.log('[core-src]', src);
 
               this.holdSteam();
               this.lastRenderMedia = src;
@@ -276,7 +313,15 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     console.log('walking', req);
     this.clickQueue.next(req);
     this.walkingSub = this.clickQueue.subscribe(async (request) => {
-      const walkingRes = await this.moveService.move(request);
+      const user = this.moveService.users[this.user_id];
+      const path = await this.getRouterService.searchRoad(
+        user.appId,
+        user.breakPointId,
+        req.clicking_action.clicking_point,
+      );
+      const walkingRes = await this.moveService.move(path, request);
+      console.log('walkingRes', walkingRes)
+      debugger;
       if (walkingRes && !this.onMoving) {
         this.onMoving = true;
         this.holdSteam();
@@ -309,128 +354,10 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     });
   }
 
-  handleBreath(request) {
-    // console.log('handleBreath', request);
-    const nps = {
-      actionType: 1004,
-      pointType: 100,
-      extra: '',
-      traceId: request.trace_id,
-      packetId: '',
-      nps: [
-        { x: 235, y: -1791.875, z: -0.25 },
-        { x: 235, y: -1750, z: 0.5045047 },
-        { x: 235, y: -1722.5, z: 1 },
-        { x: 235, y: -1700, z: 1.4054055 },
-        { x: 235, y: -1653.125, z: 2.25 },
-        { x: 235, y: -1600, z: 3.2072072 },
-        { x: 235, y: -1583.75, z: 3.5 },
-        { x: 235, y: -1550, z: 4.108108 },
-        { x: 235, y: -1514.375, z: 4.75 },
-        { x: 235, y: -1499.9999, z: 5.0090113 },
-        { x: 235, y: -1445, z: 6 },
-        { x: 200, y: -1900, z: -1.8723404 },
-        { x: 200, y: -1850, z: -0.80851054 },
-        { x: 200, y: -1800, z: 0.23423433 },
-        { x: 200, y: -1750, z: 1.1351352 },
-        { x: 200, y: -1700, z: 2.036036 },
-        { x: 200, y: -1650, z: 2.9369369 },
-        { x: 200, y: -1600, z: 3.837838 },
-        { x: 200, y: -1550, z: 4.738739 },
-        { x: 200, y: -1500, z: 5.63964 },
-        { x: 200, y: -1444.2554, z: 6 },
-        { x: 176.25, y: -1443.75, z: 6 },
-        { x: 150, y: -2000, z: -4 },
-        { x: 150, y: -1950, z: -2.93617 },
-        { x: 150, y: -1932.8572, z: -2.5714293 },
-        { x: 150, y: -1900, z: -1.8723404 },
-        { x: 150, y: -1865.7142, z: -1.1428561 },
-        { x: 150, y: -1850.0001, z: -0.8085134 },
-        { x: 150, y: -1798.5714, z: 0.28571463 },
-        { x: 150, y: -1750, z: 1.319149 },
-        { x: 150, y: -1731.4286, z: 1.7142854 },
-        { x: 150, y: -1700, z: 2.382979 },
-        { x: 150, y: -1664.2856, z: 3.142859 },
-        { x: 150, y: -1650.0001, z: 3.446806 },
-        { x: 150, y: -1597.1428, z: 4.5714293 },
-        { x: 150, y: -1550, z: 5.5744677 },
-        { x: 150, y: -1443.1917, z: 6 },
-        { x: 150, y: -1500, z: 6 },
-        { x: 150, y: -1530, z: 6 },
-        { x: 117.5, y: -1442.5, z: 6 },
-        { x: 115, y: -1495, z: 6 },
-        { x: 100, y: -1442.1277, z: 6 },
-        { x: 100, y: -1495, z: 6 },
-        { x: 58.75, y: -1441.25, z: 6 },
-        { x: 57.5, y: -1495, z: 6 },
-        { x: 0, y: -1440, z: 6 },
-        { x: 0, y: -1495, z: 6 },
-        { x: -50, y: -1494.9999, z: 6 },
-        { x: -50, y: -1439.3378, z: 7.324497 },
-        { x: -64.16667, y: -1495, z: 6 },
-        { x: -68.63635, y: -1439.091, z: 7.8181725 },
-        { x: -100, y: -1494.9999, z: 6 },
-        { x: -100, y: -1438.6755, z: 8.648995 },
-        { x: -128.33334, y: -1495, z: 6 },
-        { x: -137.2727, y: -1438.1818, z: 9.636378 },
-        { x: -150, y: -1494.9999, z: 6 },
-        { x: -150, y: -1438.0132, z: 9.973526 },
-        { x: -192.50002, y: -1495, z: 6 },
-        { x: -200, y: -1450, z: 7.936013 },
-        { x: -205.90906, y: -1437.2727, z: 11.45455 },
-        { x: -250, y: -1450, z: 9.084496 },
-        { x: -250, y: -1436.6887, z: 12.622522 },
-        { x: -256.6667, y: -1495, z: 6 },
-        { x: -274.5454, y: -1436.3636, z: 13.272723 },
-        { x: -300, y: -1494.9999, z: 6 },
-        { x: -300, y: -1450, z: 10.232978 },
-        { x: -300, y: -1436.0264, z: 13.947053 },
-        { x: -320.83337, y: -1495, z: 6 },
-        { x: -343.1818, y: -1435.4546, z: 15.090897 },
-        { x: -350, y: -1494.9999, z: 6 },
-        { x: -350, y: -1450, z: 11.38146 },
-        { x: -385, y: -1495, z: 6 },
-        { x: -400, y: -1450, z: 10.610336 },
-        { x: -405, y: -1475, z: 6 },
-        { x: -411.81818, y: -1434.5454, z: 11.454548 },
-        { x: -450, y: -1474.9999, z: 0.8571429 },
-        { x: -450, y: -1450, z: 3.8835106 },
-        { x: -450, y: -1434.0398, z: 6.3973427 },
-        { x: -475, y: -1475, z: -2 },
-        { x: -480.45453, y: -1433.6364, z: 2.3636398 },
-        { x: -500, y: -1474.9999, z: -4.8571415 },
-        { x: -500, y: -1450, z: -2.8433151 },
-        { x: -500, y: -1433.3776, z: -0.22517776 },
-        { x: -545, y: -1475, z: -10 },
-        { x: -549.0909, y: -1432.7273, z: -6.727272 },
-        { x: -550, y: -1450, z: -9.570139 },
-        { x: -600, y: -1450, z: -16.285715 },
-        { x: -600, y: -1474.9999, z: -16.285715 },
-        { x: -600, y: -1432.053, z: -13.4702 },
-        { x: -615, y: -1475, z: -18 },
-        { x: -617.7273, y: -1431.8182, z: -15.818193 },
-        { x: -650, y: -1450, z: -22 },
-        { x: -650, y: -1474.9999, z: -22 },
-        { x: -650, y: -1431.3907, z: -20.09272 },
-        { x: -685, y: -1475, z: -26 },
-        { x: -686.36365, y: -1430.909, z: -24.909088 },
-        { x: -700, y: -1450, z: -27.714287 },
-        { x: -700, y: -1474.9999, z: -27.714283 },
-        { x: -700, y: -1430.7284, z: -26.71522 },
-        { x: -755, y: -1430, z: -34 },
-        { x: -755, y: -1450, z: -34 },
-        { x: -755, y: -1475, z: -34 },
-      ],
-      peopleNum: 0,
-      zoneId: '',
-      echoMsg: '',
-      reserveDetail: null,
-      userWithAvatarList: [],
-      newUserStates: [],
-      code: 0,
-      msg: '',
-    };
-    this.streamService.pushNormalDataToStream(nps);
+  async handleBreath(request) {
+    const npsRes = await this.moveService.getBreakPoints(request);
+    console.log('npsRes', npsRes);
+    this.streamService.pushNormalDataToStream(npsRes);
   }
 
   updateStatus() {
@@ -537,8 +464,9 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
         }
         this.rotateframeCnt += 1;
         stream.frame = this.rotateframeCnt;
+        console.log('[media]', stream.clipPath);
         this.logger.log(
-          `roQueueSubscription:frame:${this.rotateframeCnt}   ` +
+          `roQueueSubscription:frame:${this.rotateframeCnt}  ` +
           JSON.stringify(stream.metaData),
         );
         await this.streamService.pushFrameToSteam(stream);
@@ -549,6 +477,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
             this.resumeStream(next);
             this.rotateframeCnt = -1;
             this.onMoving = false;
+            this.onRotating = false;
           }
         }, 300);
       },
@@ -571,6 +500,8 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
       async (stream: StreamFrameType) => {
         const metaData: StreamReplyType = JSON.parse(stream.metaData);
         console.log('handleMoveSteam-onMoving', this.onMoving);
+        stream.marker = metaData.marker;
+        this.lastMoveStreamFrame.next(stream);
         const next = this.frameCnt.value + 1;
         this.currentMoveMaker = metaData.marker;
         if (this.onMoving) {
@@ -581,7 +512,6 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
           this.resumeStream(next);
           return;
         }
-
         let src = stream.clipPath.split('?')[0];
         // // 临时本地替换路经
         src = src.replace('/0000000001/', '');
@@ -601,6 +531,11 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
           console.log('last', next);
           this.resumeStream(next);
           this.cleanMoveSteam();
+          const lastFrame = this.lastMoveStreamFrame.getValue();
+          const userId = this.user_id;
+          const breakPointId = lastFrame.marker.split('T')[1];
+          const lastReply = JSON.parse(lastFrame.metaData);
+          this.moveService.updateUser(userId, breakPointId, lastReply);
         }
       },
     );

+ 58 - 60
src/scene/stream/stream.d.ts

@@ -12,73 +12,71 @@ interface StreamMetaType {
   metaData: string;
 }
 
-// interface   newUserStates: [
-//   {
-//     userId: 'dcff36ae4fc1d',
-//     playerState: {
-//       roomTypeId: '',
-//       person: 0,
-//       avatarId: '',
-//       skinId: '',
-//       roomId: '',
-//       isHost: false,
-//       isFollowHost: false,
-//       skinDataVersion: '',
-//       avatarComponents: '',
-//       nickName: '',
-//       movingMode: 0,
-//       attitude: '',
-//       areaName: '',
-//       pathName: '',
-//       pathId: '',
-//       avatarSize: 1,
-//       extra: '',
-//       prioritySync: false,
-//       player: {
-//         position: { x: -700, y: 0, z: 0 },
-//         angle: {
-//           pitch: 0,
-//           yaw: 0,
-//           roll: 0,
-//         },
-//       },
-//       camera: {
-//         position: { x: -1145, y: 0, z: 160 },
-//         angle: {
-//           pitch: 0,
-//           yaw: 0,
-//           roll: 0,
-//         },
-//       },
-//       cameraCenter: { x: -700, y: 0, z: 0 },
-//     },
-//     renderInfo: {
-//       renderType: 0,
-//       videoFrame: null,
-//       cameraStateType: 3,
-//       isMoving: 1,
-//       needIfr: 0,
-//       isVideo: 0,
-//       stillFrame: 0,
-//       isRotating: 0,
-//       isFollowing: 0,
-//       clientPanoTitlesBitmap: [],
-//       clientPanoTreceId: '',
-//       prefetchVideoId: '',
-//       noMedia: false,
-//     },
-//     event: null,
-//     relation: 1,
-//   },
-// ],
+interface NewUserStatesType {
+  userId: string;
+  playerState: PlayerStateType;
+  renderInfo: RenderInfoType;
+  event: string | null;
+  relation: number;
+}
+
+interface PlayerStateType {
+  roomTypeId: string;
+  person: number;
+  avatarId: string;
+  skinId: string;
+  roomId: string;
+  isHost: boolean;
+  isFollowHost: boolean;
+  skinDataVersion: string;
+  avatarComponents: string;
+  nickName: string;
+  movingMode: number;
+  attitude: string;
+  areaName: string;
+  pathName: string;
+  pathId: string;
+  avatarSize: number;
+  extra: string;
+  prioritySync: boolean;
+  player: {
+    position: Point;
+    angle: Angle;
+  };
+  camera: {
+    position: Point;
+    angle: Angle;
+  };
+  cameraCenter: Point;
+}
+
+interface RenderInfoType {
+  renderType: number;
+  videoFrame: null | string;
+  cameraStateType: number;
+  isMoving: number;
+  needIfr: number;
+  isVideo: number;
+  stillFrame: number;
+  isRotating: number;
+  isFollowing: number;
+  clientPanoTitlesBitmap: any[];
+  clientPanoTreceId: string;
+  prefetchVideoId: string;
+  noMedia: boolean;
+}
 interface StreamReplyType {
   traceIds: string[];
   vehicle: string;
   mediaSrc?: string;
-  newUserStates: any[];
+  newUserStates: NewUserStatesType[];
   actionResponses: any[];
   getStateType: number;
   code: number;
   msg: string;
   marker?: string;
 }
+
+// interface NewUserStatesType{
+
+// }

+ 1 - 2
src/scene/stream/stream.service.ts

@@ -6,7 +6,6 @@ import * as streamBuffers from 'stream-buffers';
 import { BehaviorSubject } from 'rxjs';
 import { CacheService } from 'src/cache/cache.service';
 
-
 @Injectable()
 export class StreamService {
   private channel: DataChannel;
@@ -19,7 +18,7 @@ export class StreamService {
     clipPath: '',
     metaData: '',
   });
-  constructor(private cacheService: CacheService) { }
+  constructor(private cacheService: CacheService) {}
 
   setChannel(channel: DataChannel) {
     this.channel = channel;