import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common'; import { ClientGrpc, Client } from '@nestjs/microservices'; import { grpcClientOptions } from './grpc-scene.options'; import { Logger } from '@nestjs/common'; import { DataChannel } from 'node-datachannel'; import { BehaviorSubject } from 'rxjs'; // import * as streamBuffers from 'stream-buffers'; import { ActionType } from './actionType'; import { CacheService } from 'src/cache/cache.service'; import { StreamService } from './stream/stream.service'; // import { InjectQueue } from '@nestjs/bull'; // import { Queue } from 'bull'; import { RotateService } from 'src/rotate/rotate.service'; import { DelayQueue, RxQueue, ThrottleQueue, DebounceQueue } from 'rx-queue'; import { MoveService } from 'src/move/move.service'; import { GetRouterService } from 'src/get-router/get-router.service'; import { ConfigService } from '@nestjs/config'; @Injectable() export class SceneService implements OnModuleInit, OnModuleDestroy { constructor( private configService: ConfigService, private cacheService: CacheService, private streamService: StreamService, private rotateService: RotateService, private moveService: MoveService, private getRouterService: GetRouterService, // @InjectQueue('rotate') private rotateQueue: Queue, // @InjectQueue('walking') private walkingQueue: Queue, ) {} @Client(grpcClientOptions) private readonly client: ClientGrpc; public _frameInteval: NodeJS.Timeout; public _frameTimeout: NodeJS.Timeout; public _rotateTimeout: NodeJS.Timeout; public _moveTimeout: NodeJS.Timeout; public startSteaming = new BehaviorSubject(false); public onRotating = new BehaviorSubject(false); public onMoving = new BehaviorSubject(false); public frameCnt = new BehaviorSubject(-1); private rotateframeCnt = -1; private moveframeCnt = -1; private moveKeyFrame = -1; private sceneGrpcService: SceneGrpcService; private channel: DataChannel; private logger: Logger = new Logger('SceneService'); private frameCntInterval = 1000; private user_id: string; private roomId: string; private onSteaming = false; private mockserverTime = Date.now() - 1653000000478; private lastRenderMedia = ''; private frameCntSubscription: any; private roQueueSubscription: any; private moveQueueSubscription: any; private walkingSub: any; private joystickSub: any; private clickQueueSub: any; private streamServiceSub: any; // private roRequestQueue: RxQueue = new DelayQueue(20); private roQueue: RxQueue = new DelayQueue(10); private clickQueue: RxQueue = new DebounceQueue(500); private moveQueue: RxQueue = new DelayQueue(20); private joystickQueue: RxQueue = new DebounceQueue(500); private requestIFrameQueue: RxQueue = new DebounceQueue(2000); private requestIFrameQueueSub: any; private roRequestQueueSub: any; private rotateTimeStamp: number; private lastMoveCnt = -1; private firstRender = false; private latestBreakPointId: number; private isHoldingStream = false; public lastMoveStreamFrame = new BehaviorSubject({ frame: -1, clipPath: '', metaData: '', }); public users = {}; // initUsers(app_id, userId) { // const user = { // appId: null, // userId: null, // breakPointId: null, // roomId: null, // player: { // position: { x: -700, y: 0, z: 0 }, // angle: { // pitch: 0, // yaw: 0, // roll: 0, // }, // }, // camera: { // position: { x: -1145, y: 0, z: 160 }, // angle: { // pitch: 0, // yaw: 0, // roll: 0, // }, // }, // rotateInfo: { // frameIndex: 0, // horizontal_move: 0, // mediaSrc: null, // }, // moveInfo: {}, // // traceIds: [], // // actionResponses:[] // }; // user.appId = app_id; // user.userId = userId; // user.breakPointId = 100; // this.users[userId] = user; // } onModuleInit(): void { this.sceneGrpcService = this.client.getService('SceneGrpcService'); this.logger.log('init SceneGrpcService'); this.streamServiceSub = this.streamService.onSteaming.subscribe((val) => { this.onSteaming = val; }); Number.prototype.padLeft = function (n, str) { return Array(n - String(this).length + 1).join(str || '0') + this; }; } public isHeaderOrLast(index: number, length: number): boolean { if (index === 0 || index === length) { return true; } else { return false; } } public getConfig() { return { userId: this.user_id, roomId: this.roomId, }; } public startStream(): void { clearInterval(this._frameInteval); if (this.frameCnt.value === -1) { this._frameInteval = setInterval(async () => { const next = this.frameCnt.value + 1; this.frameCnt.next(next); }, 1000); } } public holdSteam(): void { clearInterval(this._frameInteval); this.isHoldingStream = true; } public resumeStream(): void { this.onMoving.next(false); this.onRotating.next(false); this.isHoldingStream = false; this.moveframeCnt = -1; this.rotateframeCnt = -1; clearInterval(this._frameInteval); this._frameInteval = setInterval(async () => { const next = this.frameCnt.getValue() + 1; this.frameCnt.next(next); }, 1000); } public stopStream(): void { if (this.frameCntSubscription) { this.frameCntSubscription.unsubscribe(); this.frameCntSubscription = null; } if (this.roQueueSubscription) { this.roQueueSubscription.unsubscribe(); this.roQueueSubscription = null; } if (this.moveQueueSubscription) { this.moveQueueSubscription.unsubscribe(); this.moveQueueSubscription = null; } this.frameCnt.next(-1); clearInterval(this._frameInteval); this.rotateframeCnt = -1; } setConfig(user_id: string, roomId: string): void { this.user_id = user_id; this.roomId = roomId; } onModuleDestroy() { if ('unsubscribe' in this.streamServiceSub) { this.streamService.onSteaming.unsubscribe(); } } init(request: InitRequest) { try { this.rotateService.init(request.app_id, request.user_id); this.startSteaming.next(true); this.startStream(); this.handleStream(); // this.moveService.init(request.app_id, request.user_id); // this.initUsers(request.app_id, request.user_id); } catch (error) { console.log('error', error); } } exit(request: ExitRequest) { this.frameCnt.next(-1); this.stopStream(); // const exitReply = this.sceneGrpcService.exit(request); // exitReply.subscribe((reply) => { // console.log('exitReply', reply); // }); } async rotate(request: RotateRequest) { this.handleRotate(request); // console.log('request', request) // this.roRequestQueue.next(request); // if (!this.roRequestQueueSub) { // this.handleRotate(); // } } /** * rotate请求队列 */ async handleRotate(request) { // this.roRequestQueueSub = this.roRequestQueue.subscribe( // async (request: RotateRequest) => { try { if (this.firstRender) { if (!this.roQueueSubscription) { this.handleRotateStream(); } let redisMeta: StreamReplyType; this.onRotating.next(true); // 当move时处理 if (this.onMoving.value) { this.onMoving.next(false); const lastStreamFrame = this.lastMoveStreamFrame.getValue(); const metaData: StreamReplyType = JSON.parse( lastStreamFrame.metaData, ) as any as StreamReplyType; const newUserStates: NewUserStatesType = metaData.newUserStates.find( (item) => item.userId === this.user_id, ); const trace_id = metaData.traceIds[0]; const userId = newUserStates.userId; const breakPointId = metaData.breakPointId; const cameraAngle = newUserStates.playerState.camera.angle; const playerAngle = newUserStates.playerState.player.angle; console.log('stop-data', trace_id, userId, cameraAngle, cameraAngle); redisMeta = await this.moveService.stop( trace_id, userId, breakPointId, cameraAngle, playerAngle, ); console.log('stop-redisMeta', redisMeta); // redisMeta = await this.rotateService.rotate(request); } else { // 正常rotate redisMeta = await this.rotateService.seqExeRotate(request); // console.log(redisMeta); //debugger; } if (redisMeta && 'mediaSrc' in redisMeta) { const mediaSrc: string = redisMeta.mediaSrc || ''; if (mediaSrc.length > 0) { const src = mediaSrc.split('?')[0]; if (src.length > 0) { // console.log('不同源'); this.holdSteam(); this.lastRenderMedia = src; const clipPath = this.configService.get('app.prefix') + src; //TODO 临时开出 // delete redisMeta.mediaSrc; const random_boolean = Math.random() < 0.3; const stream: StreamFrameType = { frame: -1, clipPath: clipPath, metaData: JSON.stringify(redisMeta), serverTime: this.mockserverTime, DIR: this.frameCnt.getValue() < 10 ? 3 : random_boolean ? 1 : 3, }; // console.log('rotate', stream, Date.now()); clearTimeout(this._rotateTimeout); this.roQueue.next(stream); } else { // this.onRotating.next(false); } } } } } catch (error) { this.logger.error('rotate', error); console.log('error', error); } // }, // ); } async walking(req) { try { if (this.clickQueueSub) { this.clickQueueSub.unsubscribe(); this.clickQueueSub = null; } this.clickQueue.next(req); console.log('walking', this.clickQueueSub, JSON.stringify(req)); // if (!this.clickQueueSub) { this.clickQueueSub = this.clickQueue.subscribe(async (request) => { const user = this.moveService.users[this.user_id]; console.log('进入1 - searchRoad'); const path = await this.getRouterService.searchRoad( user.appId, user.breakPointId, req.clicking_action.clicking_point, ); console.log('walking-path', path); if (!path) { console.log('不存在--path', path); this.resumeStream(); return; } // debugger; const walkingRes = await this.moveService.move(path, request); console.log('walking', walkingRes); // debugger; if (walkingRes && !this.onMoving.value) { // console.log('walkingRes-front', walkingRes); // shift出前第一个镜头数据 const rotateCamData = walkingRes[0]; console.log('rotateCamData', rotateCamData.length); if (rotateCamData?.length) { // 头数组[0] rotate 序列, 头是关键key walkingRes[0].forEach((item: StreamReplyType, index: number) => { item.mType = 'rotate'; item.DIR = index === 0 ? 1 : 3; }); } else { console.log('rotateCamData无数据'); } // 二维数组 做move 序列, move类型 for (let i = 1; i < walkingRes.length; i++) { console.log('walkingRes', walkingRes[i]); Array.from(walkingRes[i]).forEach( (item: StreamReplyType, index: number) => { const dir = this.isHeaderOrLast( index, walkingRes[i].length - 1, ); item.DIR = dir ? 1 : 3; }, ); } // walkingRes marker to everybody const seqs = Array.from( walkingRes, ).flat() as any as StreamReplyType[]; if (seqs?.length) { console.log('walking --总序列--seqs-2', seqs.length); this.handleSeqMoving(seqs); } else { console.error('walking-move无数据'); this.cleanMoveSteam(); this.resumeStream(); } } }); // } } catch (error) { this.logger.error('walking', error); this.cleanMoveSteam(); this.resumeStream(); } } async joystick(request: JoystickRequest) { try { this.joystickQueue.next(request); if (!this.joystickSub) { this.joystickSub = this.joystickQueue.subscribe( async (joystickRequest) => { const joystickRes = await this.moveService.joystick( joystickRequest, ); // 有数据 [0]是rotate数据,[1-infinity]是walking数据 if (Array.isArray(joystickRes)) { // shift出前第一个镜头数据 const rotateCamData = joystickRes.shift(); console.log('rotateCamData', rotateCamData.length); if (rotateCamData?.length) { // 头数组[0] rotate 序列, 头是关键key joystickRes[0].forEach( (item: StreamReplyType, index: number) => { item.mType = 'rotate'; item.DIR = index === 0 ? 1 : 3; }, ); } else { console.log('rotateCamData无数据'); } // 二维数组 做move 序列, move类型 for (let i = 1; i < joystickRes.length; i++) { console.log('walkingRes', joystickRes[i]); Array.from(joystickRes[i]).forEach( (item: StreamReplyType, index: number) => { const dir = this.isHeaderOrLast( index, joystickRes[i].length - 1, ); item.DIR = dir ? 1 : 3; }, ); } const seqs = Array.from( joystickRes, ).flat() as any as StreamReplyType[]; if (seqs?.length) { console.log('joystickRes-seqs', seqs.length); this.handleSeqMoving(seqs); } else { console.warn('joystick-move无数据'); } } else { console.log('转交数据'); this.streamService.pushNormalDataToStream(request); } }, ); } } catch (error) { this.logger.error('joystick', error); } } /** * 主要处理moving的序列动作 * @param seqs StreamReplyType[] */ handleSeqMoving(seqs: StreamReplyType[]) { if (!this.moveQueueSubscription) { this.handleMoveSteam(); } console.log('moving-seqs', seqs.length); this.onMoving.next(true); this.holdSteam(); seqs.forEach((frame: StreamReplyType) => { const mediaSrc = frame.mediaSrc; const src = mediaSrc.split('?')[0]; const clipPath = this.configService.get('app.prefix') + src; const type = frame.mType?.length ? frame.mType.slice() : 'move'; const stream: StreamFrameType = { frame: -1, clipPath: clipPath, metaData: JSON.stringify(frame), serverTime: this.mockserverTime, DIR: frame.DIR, mType: type, }; this.moveQueue.next(stream); }); } cleanMoveSteam() { if (this.moveQueueSubscription) { this.moveQueueSubscription.unsubscribe(); this.lastMoveCnt = -1; this.moveQueueSubscription = null; } if (this.walkingSub) { this.walkingSub.unsubscribe(); this.walkingSub = null; } // if (this.clickQueueSub) { // this.clickQueueSub.unsubscribe(); // this.clickQueueSub = null; // } } handleMoveSteam() { this.moveQueueSubscription = this.moveQueue.subscribe( async (stream: StreamFrameType) => { const metaData: StreamReplyType = JSON.parse(stream.metaData); if (this.moveframeCnt === -1) { this.moveframeCnt = this.frameCnt.value; } this.moveframeCnt += 1; this.latestBreakPointId = metaData.breakPointId; const streamData: StreamFrameType = { frame: this.moveframeCnt, clipPath: stream.clipPath, metaData: stream.metaData, serverTime: this.mockserverTime, DIR: stream.DIR, }; console.log( '[media-move]', this.moveframeCnt, stream.clipPath, stream.mType, stream.DIR, ); this.lastMoveStreamFrame.next(streamData); const res = await this.streamService.pushFrameToSteam(streamData); if (res.done) { clearTimeout(this._moveTimeout); this._moveTimeout = setTimeout(() => { console.log('move 交权给空流,当前pts', res.frame); //TODO 每个结束点 updateUser metaData const lastFrame = this.lastMoveStreamFrame.getValue(); const lastFrameMeta = JSON.parse(lastFrame.metaData); const userId = this.user_id; const breakPointId = lastFrameMeta.breakPointId; const lastReply = lastFrameMeta; this.moveService.updateUser(userId, breakPointId, lastReply); this.frameCnt.next(res.frame); this.resumeStream(); this.rotateframeCnt = -1; this.onMoving.next(false); this.cleanMoveSteam(); console.log('move end'); }, 300); } }, ); } handleDataChanelOpen(channel: DataChannel): void { this.channel = channel; this.streamService.setChannel(channel); } handleDataChanelClose(): void { this.stopStream(); this.startSteaming.next(false); this.streamService.closeChannel(); const exitRequest: ExitRequest = { action_type: 1002, user_id: this.user_id, trace_id: '', }; this.exit(exitRequest); } handleMessage(message: string | Buffer) { try { if (typeof message === 'string') { // wasm:特例, requestIframe if (message.includes('wasm:')) { const parseData = message ? String(message).replace('wasm:', '') : `{"MstType":1}`; const msg: RTCMessageRequest = JSON.parse(parseData); this.logger.warn('lostIframe-message', msg); if (msg.MstType === 0) { this.handleIframeRequest(); } } else { const msg: RTCMessageRequest = JSON.parse(message); switch (msg.action_type) { case ActionType.walk: const walk = msg; this.walking(walk); break; case ActionType.joystick: const JoystickRequest = msg as any as JoystickRequest; this.joystick(JoystickRequest); break; case ActionType.breathPoint: this.handleBreath(msg); break; case ActionType.rotate: const rotateRequest: RotateRequest = msg; this.rotate(rotateRequest); break; case ActionType.userStatus: this.updateUserStatus(msg); break; case ActionType.status: this.updateStatus(); break; default: break; } } } } catch (error) { this.logger.error('handleMessage:rtc--error', message); } } async handleIframeRequest() { //TODO Iframe 最终传什么? this.requestIFrameQueue.next(this.streamService.lastStreamFrame.getValue()); if (!this.requestIFrameQueueSub) { this.requestIFrameQueueSub = this.requestIFrameQueue.subscribe( (frameData: StreamFrameType) => { const nextFrame = this.frameCnt.getValue() + 1; this.logger.warn('lostIframe', nextFrame); frameData.frame = nextFrame; this.streamService.pushFrameToSteam(frameData); this.frameCnt.next(nextFrame); this.resumeStream(); }, ); } } handleBreath(request) { const npsRes = this.moveService.getBreakPoints(request); console.log('npsRes', npsRes.nps.length); this.streamService.pushNormalDataToStream(npsRes); } updateStatus(): void { const reply = { data: { action_type: 1009, echo_msg: { echoMsg: Date.now() } }, track: false, }; this.streamService.pushNormalDataToStream(reply); } async updateUserStatus(request) { try { const redisMeta = await this.rotateService.getNewUserStateRequest( request, ); if (redisMeta) { redisMeta.actionType = 1024; this.streamService.pushNormalDataToStream(redisMeta); } else { this.logger.error('updateUserStatus::function-empty'); } } catch (error) { this.logger.error('updateUserStatus::function', error); } } pushFirstRender(clipPath: string, metaData: string): Promise { return new Promise(async (resolve, reject) => { try { const streamData: StreamFrameType = { frame: 1, clipPath: clipPath, metaData: metaData, serverTime: this.mockserverTime, DIR: 1, }; const hasPush = await this.streamService.pushFrameToSteam(streamData); return resolve(hasPush.done); } catch (error) { return reject(false); } }); } handleStream() { this.logger.log('this.frameCntSubscription', this.frameCntSubscription); if (!this.frameCntSubscription) { this.frameCntSubscription = this.frameCnt.subscribe(async (frame) => { try { console.log('frame', frame); if (frame === 1) { const redisData = await this.rotateService.echo(this.user_id, true); console.log('获取-首屏', redisData); this.onSteaming = true; this.holdSteam(); if (redisData && 'mediaSrc' in redisData) { const mediaSrc: string = redisData.mediaSrc || ''; if (mediaSrc.length > 0) { const src = mediaSrc.split('?')[0]; // 临时本地替换路经 // src = src.replace('/10086/', ''); // const clipPath = join(__dirname, `../ws/${src}`); const clipPath = this.configService.get('app.prefix') + src; delete redisData.mediaSrc; this.logger.log( `user:${this.user_id}:first render stream` + JSON.stringify({ path: clipPath, meta: redisData }), ); const status = await this.pushFirstRender( clipPath, JSON.stringify(redisData), ); if (status) { this.firstRender = true; this.frameCnt.next(2); this.resumeStream(); } else { this.logger.error('first render problem', status); } } } else { this.logger.error(`首屏::无数据:${frame}`); } } if ( frame > 1 && !this.onMoving.value && !this.onRotating.value && this.firstRender ) { const redisDataAuto = await this.rotateService.echo( this.user_id, false, ); if (redisDataAuto) { console.log(`空白流::有数据:${frame}`); 'mediaSrc' in redisDataAuto && delete redisDataAuto.mediaSrc; const streamMeta: StreamMetaType = { frame: frame, metaData: JSON.stringify(redisDataAuto), }; this.streamService.pushMetaDataToSteam(streamMeta); } else { this.stopStream(); console.log('空流无Redis数据'); throw new Error('空流无Redis数据'); } } } catch (error) { this.stopStream(); this.logger.error('handleStream', error); } }); } } /** * rotate 推送队列 */ handleRotateStream() { if (!this.roQueueSubscription) { this.roQueueSubscription = this.roQueue.subscribe( async (stream: StreamFrameType) => { this.rotateTimeStamp = Date.now(); if (this.rotateframeCnt === -1) { this.rotateframeCnt = this.frameCnt.value; } this.rotateframeCnt += 1; stream.frame = this.rotateframeCnt; console.log( '[media-rotate]', stream.frame, this.rotateframeCnt, stream.clipPath, ); // this.logger.log( // `roQueueSubscription:frame:${this.rotateframeCnt} ` + // JSON.stringify(stream.metaData), // ); const res = await this.streamService.pushFrameToSteam(stream); if (res.done) { clearTimeout(this._rotateTimeout); this._rotateTimeout = setTimeout(() => { console.log('rotate end', Date.now()); this.frameCnt.next(res.frame); this.resumeStream(); this.rotateframeCnt = -1; this.onMoving.next(false); this.onRotating.next(false); //TODO rotate完后清除request队列 if (this.roRequestQueueSub) { this.roRequestQueueSub.unsubscribe(); this.roRequestQueueSub = null; } }, 300); } }, ); } } }