|
@@ -3,7 +3,7 @@ import { ClientGrpc, Client } from '@nestjs/microservices';
|
|
|
import { grpcClientOptions } from './grpc-scene.options';
|
|
|
import { Logger } from '@nestjs/common';
|
|
|
import { DataChannel } from 'node-datachannel';
|
|
|
-import { BehaviorSubject, filter, ignoreElements, take } from 'rxjs';
|
|
|
+import { BehaviorSubject } from 'rxjs';
|
|
|
// import * as streamBuffers from 'stream-buffers';
|
|
|
import { ActionType } from './actionType';
|
|
|
import { CacheService } from 'src/cache/cache.service';
|
|
@@ -23,15 +23,14 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
|
|
|
private streamService: StreamService,
|
|
|
private rotateService: RotateService,
|
|
|
private moveService: MoveService,
|
|
|
- private getRouterService: GetRouterService,
|
|
|
- // @InjectQueue('rotate') private rotateQueue: Queue,
|
|
|
- // @InjectQueue('walking') private walkingQueue: Queue,
|
|
|
+ private getRouterService: GetRouterService, // @InjectQueue('rotate') private rotateQueue: Queue, // @InjectQueue('walking') private walkingQueue: Queue,
|
|
|
) { }
|
|
|
@Client(grpcClientOptions) private readonly client: ClientGrpc;
|
|
|
|
|
|
public _frameInteval: NodeJS.Timeout;
|
|
|
public _frameTimeout: NodeJS.Timeout;
|
|
|
public _rotateTimeout: NodeJS.Timeout;
|
|
|
+ public _moveTimeout: NodeJS.Timeout;
|
|
|
public startSteaming = new BehaviorSubject<boolean>(false);
|
|
|
public onRotating = new BehaviorSubject<boolean>(false);
|
|
|
public onMoving = new BehaviorSubject<boolean>(false);
|
|
@@ -54,17 +53,18 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
|
|
|
private roQueueSubscription: any;
|
|
|
private moveQueueSubscription: any;
|
|
|
private walkingSub: any;
|
|
|
+ private joystickSub: any;
|
|
|
|
|
|
private streamServiceSub: any;
|
|
|
- private roQueue: RxQueue = new RxQueue(0);
|
|
|
+ private roQueue: RxQueue = new DelayQueue(10);
|
|
|
private clickQueue: RxQueue = new DebounceQueue(500);
|
|
|
-
|
|
|
- private moveQueue: RxQueue = new DelayQueue(600);
|
|
|
+ private moveQueue: RxQueue = new DelayQueue(100);
|
|
|
+ private joystickQueue: RxQueue = new DebounceQueue(500);
|
|
|
private rotateTimeStamp: number;
|
|
|
private lastMoveCnt = -1;
|
|
|
|
|
|
private firstRender = false;
|
|
|
- private currentMoveMaker = '';
|
|
|
+ private latestBreakPointId: number;
|
|
|
|
|
|
public lastMoveStreamFrame = new BehaviorSubject<StreamFrameType>({
|
|
|
frame: -1,
|
|
@@ -184,13 +184,6 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // getRoute(request: RouteRequest) {
|
|
|
- // return this.sceneGrpcService.getRoute(request);
|
|
|
- // }
|
|
|
- // getBreakPoint(request: GetBreakPointRequest) {
|
|
|
- // return this.sceneGrpcService.getBreakPoint(request);
|
|
|
- // }
|
|
|
-
|
|
|
init(request: InitRequest) {
|
|
|
try {
|
|
|
// const initReply = this.sceneGrpcService.init(request);
|
|
@@ -207,6 +200,8 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
|
|
|
}
|
|
|
|
|
|
exit(request: ExitRequest) {
|
|
|
+ this.frameCnt.next(-1);
|
|
|
+ this.stopStream();
|
|
|
// const exitReply = this.sceneGrpcService.exit(request);
|
|
|
// exitReply.subscribe((reply) => {
|
|
|
// console.log('exitReply', reply);
|
|
@@ -232,9 +227,8 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
|
|
|
);
|
|
|
const trace_id = metaData.traceIds[0];
|
|
|
const userId = newUserStates.userId;
|
|
|
- const breakPointId = lastStreamFrame.marker
|
|
|
- .replace('P', '')
|
|
|
- .replace('T', '-');
|
|
|
+ const breakPointId = metaData.breakPointId;
|
|
|
+
|
|
|
const cameraAngle = newUserStates.playerState.camera.angle;
|
|
|
const playerAngle = newUserStates.playerState.player.angle;
|
|
|
console.log('stop-data', trace_id, userId, cameraAngle, cameraAngle);
|
|
@@ -298,11 +292,176 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ async walking(req) {
|
|
|
+ try {
|
|
|
+ console.log('walking', req);
|
|
|
+ this.clickQueue.next(req);
|
|
|
+ this.walkingSub = this.clickQueue.subscribe(async (request) => {
|
|
|
+ const user = this.moveService.users[this.user_id];
|
|
|
+ const path = await this.getRouterService.searchRoad(
|
|
|
+ user.appId,
|
|
|
+ user.breakPointId,
|
|
|
+ req.clicking_action.clicking_point,
|
|
|
+ );
|
|
|
+ const walkingRes = await this.moveService.move(path, request);
|
|
|
+ // this.moveService.rotateForAngle();
|
|
|
+ console.log('walkingRes', walkingRes);
|
|
|
+
|
|
|
+ if (walkingRes && !this.onMoving.value) {
|
|
|
+ console.log('walkingRes-front', walkingRes);
|
|
|
+ // shift出前第一个镜头数据
|
|
|
+ const rotateCamData = walkingRes.shift();
|
|
|
+
|
|
|
+ // walkingRes marker to everybody
|
|
|
+ const seqs = Array.from(
|
|
|
+ walkingRes,
|
|
|
+ ).flat() as any as StreamReplyType[];
|
|
|
+
|
|
|
+ if (seqs?.length) {
|
|
|
+ this.handleSeqMoving(seqs);
|
|
|
+ } else {
|
|
|
+ console.log('walking无数据');
|
|
|
+ }
|
|
|
+ // this.lastMoveCnt = this.frameCnt.value + seqs.length;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } catch (error) {
|
|
|
+ this.logger.error('walking', error);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
async joystick(request: JoystickRequest) {
|
|
|
- console.log('JoystickRequest', request);
|
|
|
- const res = await this.moveService.joystick(request);
|
|
|
- console.log('res', res);
|
|
|
- // return this.sceneGrpcService.joystick(request);
|
|
|
+ try {
|
|
|
+ this.joystickQueue.next(request);
|
|
|
+ if (!this.joystickSub) {
|
|
|
+ this.joystickSub = this.joystickQueue.subscribe(
|
|
|
+ async (joystickRequest) => {
|
|
|
+ const joystickRes = await this.moveService.joystick(
|
|
|
+ joystickRequest,
|
|
|
+ );
|
|
|
+ console.log('joystickRes-front', joystickRes);
|
|
|
+ // 有数据 [0]是rotate数据,[1-infinity]是walking数据
|
|
|
+ if (Array.isArray(joystickRes)) {
|
|
|
+ // shift出前第一个镜头数据
|
|
|
+ const rotateCamData = joystickRes.shift();
|
|
|
+ console.log('rotateCamData', rotateCamData);
|
|
|
+ console.log('joystickRes-end', joystickRes);
|
|
|
+
|
|
|
+ if (rotateCamData?.length) {
|
|
|
+ }
|
|
|
+ const seqs = Array.from(
|
|
|
+ joystickRes,
|
|
|
+ ).flat() as any as StreamReplyType[];
|
|
|
+ if (seqs?.length) {
|
|
|
+ this.handleSeqMoving(seqs);
|
|
|
+ } else {
|
|
|
+ console.log('walking无数据');
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ console.log('转交数据');
|
|
|
+ this.streamService.pushNormalDataToStream(request);
|
|
|
+ }
|
|
|
+ },
|
|
|
+ );
|
|
|
+ }
|
|
|
+ } catch (error) {
|
|
|
+ this.logger.error('joystick', error);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * 主要处理moving的序列动作
|
|
|
+ * @param seqs StreamReplyType[]
|
|
|
+ */
|
|
|
+ handleSeqMoving(seqs: StreamReplyType[]) {
|
|
|
+ if (!this.moveQueueSubscription) {
|
|
|
+ this.handleMoveSteam();
|
|
|
+ }
|
|
|
+ console.log('walking-seqs', seqs);
|
|
|
+ this.onMoving.next(true);
|
|
|
+ this.holdSteam();
|
|
|
+ this.moveframeCnt = this.frameCnt.value;
|
|
|
+ seqs.forEach((frame: StreamReplyType) => {
|
|
|
+ const mediaSrc = frame.mediaSrc;
|
|
|
+ let src = mediaSrc.split('?')[0];
|
|
|
+ // 临时本地替换路经
|
|
|
+ src = src.replace('/0000000001/', '');
|
|
|
+ const clipPath = join(__dirname, `../ws/video/${src}`);
|
|
|
+ this.moveframeCnt += 1;
|
|
|
+ delete frame.mediaSrc;
|
|
|
+ const stream: StreamFrameType = {
|
|
|
+ frame: this.moveframeCnt,
|
|
|
+ clipPath: clipPath,
|
|
|
+ metaData: JSON.stringify(frame),
|
|
|
+ serverTime: this.mockserverTime,
|
|
|
+ DIR: 1,
|
|
|
+ };
|
|
|
+ this.moveQueue.next(stream);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ cleanMoveSteam() {
|
|
|
+ if (this.moveQueueSubscription) {
|
|
|
+ this.moveQueueSubscription.unsubscribe();
|
|
|
+ this.lastMoveCnt = -1;
|
|
|
+ this.moveQueueSubscription = null;
|
|
|
+ }
|
|
|
+ if (this.walkingSub) {
|
|
|
+ this.walkingSub.unsubscribe();
|
|
|
+ this.walkingSub = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ handleMoveSteam() {
|
|
|
+ this.moveQueueSubscription = this.moveQueue.subscribe(
|
|
|
+ async (stream: StreamFrameType) => {
|
|
|
+ const metaData: StreamReplyType = JSON.parse(stream.metaData);
|
|
|
+
|
|
|
+ if (this.moveframeCnt === -1) {
|
|
|
+ this.moveframeCnt = this.frameCnt.value;
|
|
|
+ }
|
|
|
+ this.moveframeCnt += 1;
|
|
|
+ this.latestBreakPointId = metaData.breakPointId;
|
|
|
+ // if (this.onMoving) {
|
|
|
+ // this.frameCnt.next(this.moveframeCnt);
|
|
|
+ // } else {
|
|
|
+ // console.log(
|
|
|
+ // 'handleMoveSteam stop',
|
|
|
+ // this.moveframeCnt,
|
|
|
+ // this.currentMoveMaker,
|
|
|
+ // );
|
|
|
+ // this.cleanMoveSteam();
|
|
|
+ // this.resumeStream();
|
|
|
+ // return;
|
|
|
+ // }
|
|
|
+ const streamData: StreamFrameType = {
|
|
|
+ frame: this.moveframeCnt,
|
|
|
+ clipPath: stream.clipPath,
|
|
|
+ metaData: stream.metaData,
|
|
|
+ serverTime: this.mockserverTime,
|
|
|
+ DIR: 3,
|
|
|
+ };
|
|
|
+ this.lastMoveStreamFrame.next(streamData);
|
|
|
+ const res = await this.streamService.pushFrameToSteam(streamData);
|
|
|
+ if (res.done) {
|
|
|
+ clearTimeout(this._moveTimeout);
|
|
|
+ this._moveTimeout = setTimeout(() => {
|
|
|
+ console.log('move 交权给空流', Date.now());
|
|
|
+ console.log('move end');
|
|
|
+ //TODO 每个结束点 updateUser metaData
|
|
|
+ const lastFrame = this.lastMoveStreamFrame.getValue();
|
|
|
+ const lastFrameMeta = JSON.parse(lastFrame.metaData);
|
|
|
+
|
|
|
+ const userId = this.user_id;
|
|
|
+ const breakPointId = lastFrameMeta.breakPointId;
|
|
|
+ const lastReply = lastFrameMeta;
|
|
|
+ this.moveService.updateUser(userId, breakPointId, lastReply);
|
|
|
+ this.frameCnt.next(res.frame);
|
|
|
+ this.resumeStream();
|
|
|
+ this.rotateframeCnt = -1;
|
|
|
+ this.onMoving.next(false);
|
|
|
+ }, 300);
|
|
|
+ }
|
|
|
+ },
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
handleDataChanelOpen(channel: DataChannel): void {
|
|
@@ -391,81 +550,6 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
|
|
|
// }
|
|
|
}
|
|
|
|
|
|
- async walking(req) {
|
|
|
- try {
|
|
|
- console.log('walking', req);
|
|
|
- this.clickQueue.next(req);
|
|
|
- this.walkingSub = this.clickQueue.subscribe(async (request) => {
|
|
|
- const user = this.moveService.users[this.user_id];
|
|
|
- const path = await this.getRouterService.searchRoad(
|
|
|
- user.appId,
|
|
|
- user.breakPointId,
|
|
|
- req.clicking_action.clicking_point,
|
|
|
- );
|
|
|
- const walkingRes = await this.moveService.move(path, request);
|
|
|
- // this.moveService.rotateForAngle();
|
|
|
- console.log('walkingRes', walkingRes);
|
|
|
- // debugger;
|
|
|
-
|
|
|
- if (walkingRes && !this.onMoving.value) {
|
|
|
- // walkingRes marker to everybody
|
|
|
- const res: ArrayLike<unknown> = Object.keys(walkingRes).map(
|
|
|
- (item) => {
|
|
|
- console.log('item', item);
|
|
|
- return Array.from(walkingRes[item]).map((i) => {
|
|
|
- i['marker'] = item;
|
|
|
- return i;
|
|
|
- });
|
|
|
- },
|
|
|
- );
|
|
|
- //
|
|
|
- const seqs = Array.from(res).flat();
|
|
|
-
|
|
|
- if (seqs?.length) {
|
|
|
- if (!this.moveQueueSubscription) {
|
|
|
- this.handleMoveSteam();
|
|
|
- }
|
|
|
- console.log('walking-seqs', seqs);
|
|
|
- this.onMoving.next(true);
|
|
|
- this.holdSteam();
|
|
|
- // if (this.moveframeCnt === -1) {
|
|
|
- // this.moveframeCnt = this.frameCnt.value;
|
|
|
- // }
|
|
|
- this.moveframeCnt = this.frameCnt.value;
|
|
|
- seqs.forEach((frame: StreamReplyType) => {
|
|
|
- const mediaSrc = frame.mediaSrc;
|
|
|
- let src = mediaSrc.split('?')[0];
|
|
|
- // 临时本地替换路经
|
|
|
- src = src.replace('/0000000001/', '');
|
|
|
- const clipPath = join(__dirname, `../ws/video/${src}`);
|
|
|
- this.moveframeCnt += 1;
|
|
|
- delete frame.mediaSrc;
|
|
|
- const stream: StreamFrameType = {
|
|
|
- frame: this.moveframeCnt,
|
|
|
- clipPath: clipPath,
|
|
|
- metaData: JSON.stringify(frame),
|
|
|
- serverTime: this.mockserverTime,
|
|
|
- DIR: 1,
|
|
|
- };
|
|
|
-
|
|
|
- // this.walkingQueue.add(stream, {
|
|
|
- // delay: 10,
|
|
|
- // jobId: `walking:${this.user_id}:${this.moveframeCnt}`,
|
|
|
- // // lifo: true,
|
|
|
- // });
|
|
|
- this.moveQueue.next(stream);
|
|
|
- });
|
|
|
- } else {
|
|
|
- console.log('walking无数据');
|
|
|
- }
|
|
|
- // this.lastMoveCnt = this.frameCnt.value + seqs.length;
|
|
|
- }
|
|
|
- });
|
|
|
- } catch (error) {
|
|
|
- this.logger.error('walking', error);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
async handleBreath(request) {
|
|
|
const npsRes = await this.moveService.getBreakPoints(request);
|
|
|
// console.log('npsRes', npsRes);
|
|
@@ -481,7 +565,6 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
|
|
|
}
|
|
|
async updateUserStatus(request) {
|
|
|
try {
|
|
|
- //TODO 接入redis数据
|
|
|
const redisMeta = await this.rotateService.getNewUserStateRequest(
|
|
|
request,
|
|
|
);
|
|
@@ -520,6 +603,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
|
|
|
const redisData = await this.rotateService.echo(this.user_id);
|
|
|
this.onSteaming = true;
|
|
|
this.holdSteam();
|
|
|
+ console.log('redisData', redisData);
|
|
|
if (redisData && 'mediaSrc' in redisData) {
|
|
|
const mediaSrc: string = redisData.mediaSrc || '';
|
|
|
if (mediaSrc.length > 0) {
|
|
@@ -561,9 +645,13 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
|
|
|
metaData: JSON.stringify(redisDataAuto),
|
|
|
};
|
|
|
this.streamService.pushMetaDataToSteam(streamMeta);
|
|
|
+ } else {
|
|
|
+ this.stopStream();
|
|
|
+ throw new Error('空流无Redis数据');
|
|
|
}
|
|
|
}
|
|
|
} catch (error) {
|
|
|
+ this.stopStream();
|
|
|
this.logger.error('handleStream', error);
|
|
|
}
|
|
|
});
|
|
@@ -601,56 +689,4 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
|
|
|
},
|
|
|
);
|
|
|
}
|
|
|
-
|
|
|
- cleanMoveSteam() {
|
|
|
- if (this.moveQueueSubscription) {
|
|
|
- this.moveQueueSubscription.unsubscribe();
|
|
|
- this.lastMoveCnt = -1;
|
|
|
- this.moveQueueSubscription = null;
|
|
|
- }
|
|
|
- if (this.walkingSub) {
|
|
|
- this.walkingSub.unsubscribe();
|
|
|
- this.walkingSub = null;
|
|
|
- }
|
|
|
- }
|
|
|
- handleMoveSteam() {
|
|
|
- this.moveQueueSubscription = this.moveQueue.subscribe(
|
|
|
- async (stream: StreamFrameType) => {
|
|
|
- const metaData: StreamReplyType = JSON.parse(stream.metaData);
|
|
|
- console.log('handleMoveSteam-onMoving', this.onMoving.value);
|
|
|
- stream.marker = metaData.marker;
|
|
|
- this.lastMoveStreamFrame.next(stream);
|
|
|
- const next = this.frameCnt.value + 1;
|
|
|
- this.currentMoveMaker = metaData.marker;
|
|
|
- if (this.onMoving) {
|
|
|
- this.frameCnt.next(next);
|
|
|
- } else {
|
|
|
- console.log('handleMoveSteam stop', next, this.currentMoveMaker);
|
|
|
- this.cleanMoveSteam();
|
|
|
- this.resumeStream();
|
|
|
- return;
|
|
|
- }
|
|
|
- const streamData: StreamFrameType = {
|
|
|
- frame: next,
|
|
|
- clipPath: stream.clipPath,
|
|
|
- metaData: stream.metaData,
|
|
|
- serverTime: this.mockserverTime,
|
|
|
- DIR: 3,
|
|
|
- };
|
|
|
- await this.streamService.pushFrameToSteam(streamData);
|
|
|
-
|
|
|
- if (this.lastMoveCnt == this.frameCnt.getValue()) {
|
|
|
- const next = this.frameCnt.getValue() + 1;
|
|
|
- console.log('last', next);
|
|
|
- this.resumeStream();
|
|
|
- this.cleanMoveSteam();
|
|
|
- const lastFrame = this.lastMoveStreamFrame.getValue();
|
|
|
- const userId = this.user_id;
|
|
|
- const breakPointId = lastFrame.marker.split('T')[1];
|
|
|
- const lastReply = JSON.parse(lastFrame.metaData);
|
|
|
- this.moveService.updateUser(userId, breakPointId, lastReply);
|
|
|
- }
|
|
|
- },
|
|
|
- );
|
|
|
- }
|
|
|
}
|