import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common'; import { ClientGrpc, Client } from '@nestjs/microservices'; import { grpcClientOptions } from './grpc-scene.options'; import { Logger } from '@nestjs/common'; import { DataChannel } from 'node-datachannel'; import * as path from 'path'; import { BehaviorSubject, filter, ignoreElements, take } from 'rxjs'; // import * as streamBuffers from 'stream-buffers'; import { ActionType } from './actionType'; import { CacheService } from 'src/cache/cache.service'; import { StreamService } from './stream/stream.service'; import { InjectQueue } from '@nestjs/bull'; import { Queue } from 'bull'; import { RotateService } from 'src/rotate/rotate.service'; import { DelayQueue, RxQueue, ThrottleQueue, DebounceQueue } from 'rx-queue'; import { MoveService } from 'src/move/move.service'; import { GetRouterService } from 'src/get-router/get-router.service'; @Injectable() export class SceneService implements OnModuleInit, OnModuleDestroy { constructor( private cacheService: CacheService, private streamService: StreamService, private rotateService: RotateService, private moveService: MoveService, private getRouterService: GetRouterService, @InjectQueue('rotate') private rotateQueue: Queue, ) { } @Client(grpcClientOptions) private readonly client: ClientGrpc; private sceneGrpcService: SceneGrpcService; private logger: Logger = new Logger('SceneService'); private frameCntInterval = 1000; public _frameInteval: NodeJS.Timeout; public _frameTimeout: NodeJS.Timeout; private channel: DataChannel; public startSteaming = new BehaviorSubject(false); private user_id: string; private roomId: string; private onSteaming = false; private rotateframeCnt = -1; private mockserverTime = Date.now() - 1653000000478; private lastRenderMedia = ''; private frameCnt = new BehaviorSubject(-1); private frameCntSubscription: any; private roQueueSubscription: any; private moveQueueSubscription: any; private walkingSub: any; private streamServiceSub: any; private roQueue: RxQueue = new DelayQueue(80); private clickQueue: RxQueue = new DebounceQueue(500); private moveQueue: RxQueue = new DelayQueue(100); private rotateTimeStamp: number; private lastMoveCnt = -1; private currentMoveMaker = ''; private onMoving = false; private onRotating = false; private firstRender = false; private currentPoint = ''; public lastMoveStreamFrame = new BehaviorSubject({ frame: -1, clipPath: '', metaData: '', }); onModuleInit(): void { this.sceneGrpcService = this.client.getService('SceneGrpcService'); this.logger.log('init SceneGrpcService'); this.streamServiceSub = this.streamService.onSteaming.subscribe((val) => { this.onSteaming = val; }); Number.prototype.padLeft = function (n, str) { return Array(n - String(this).length + 1).join(str || '0') + this; }; } startStream(): void { clearInterval(this._frameInteval); if (this.frameCnt.value === -1) { this._frameInteval = setInterval(async () => { const next = this.frameCnt.value + 1; this.frameCnt.next(next); }, 1000); } } holdSteam(): void { clearInterval(this._frameInteval); } resumeStream(value: number) { this.onMoving = false; this.frameCnt.next(value); clearTimeout(this._frameTimeout); clearInterval(this._frameInteval); this._frameTimeout = setTimeout(() => { this._frameInteval = setInterval(async () => { const next = this.frameCnt.getValue() + 1; this.frameCnt.next(next); }, 1000); }, 1000); } stopStream(): void { if (this.frameCntSubscription) { this.frameCntSubscription.unsubscribe(); this.frameCntSubscription = null; } if (this.roQueueSubscription) { this.roQueueSubscription.unsubscribe(); this.roQueueSubscription = null; } if (this.moveQueueSubscription) { this.moveQueueSubscription.unsubscribe(); this.moveQueueSubscription = null; } this.frameCnt.next(-1); clearInterval(this._frameInteval); this.rotateframeCnt = -1; } setConfig(user_id: string, roomId: string): void { this.user_id = user_id; this.roomId = roomId; } onModuleDestroy() { if ('unsubscribe' in this.streamServiceSub) { this.streamService.onSteaming.unsubscribe(); } } getRoute(request: RouteRequest) { return this.sceneGrpcService.getRoute(request); } getBreakPoint(request: GetBreakPointRequest) { return this.sceneGrpcService.getBreakPoint(request); } init(request: InitRequest) { try { // const initReply = this.sceneGrpcService.init(request); // initReply.subscribe((reply) => { // console.log('initReply', reply); // }); this.rotateService.init(request.app_id, request.user_id); this.moveService.init(request.app_id, request.user_id); } catch (error) { console.log('error', error); } } exit(request: ExitRequest) { // const exitReply = this.sceneGrpcService.exit(request); // exitReply.subscribe((reply) => { // console.log('exitReply', reply); // }); } async rotate(request: RotateRequest) { try { if (!this.roQueueSubscription) { this.handleRotateStream(); } if (!this.onSteaming) { let redisMeta: StreamReplyType; this.onRotating = true; if (this.onMoving) { const lastStreamFrame = this.lastMoveStreamFrame.getValue(); const metaData: StreamReplyType = JSON.parse( lastStreamFrame.metaData, ) as any as StreamReplyType; const newUserStates: NewUserStatesType = metaData.newUserStates.find( (item) => item.userId === this.user_id, ); const trace_id = metaData.traceIds[0]; const userId = newUserStates.userId; const breakPointId = lastStreamFrame.marker .replace('P', '') .replace('T', '-'); const cameraAngle = newUserStates.playerState.camera.angle; const playerAngle = newUserStates.playerState.player.angle; this.onMoving = false; console.log('stop-data', trace_id, userId, cameraAngle, cameraAngle); redisMeta = await this.moveService.stop( trace_id, userId, breakPointId, cameraAngle, playerAngle, ); console.log('stop-redisMeta', redisMeta); // redisMeta = await this.rotateService.rotate(request); } else { redisMeta = await this.rotateService.rotate(request); } if (redisMeta && 'mediaSrc' in redisMeta) { const mediaSrc: string = redisMeta.mediaSrc || ''; if (mediaSrc.length > 0) { let src = mediaSrc.split('?')[0]; // 临时本地替换路经 src = src.replace('/0000000001/', ''); // 判断不是同一条源时才推出 if (this.lastRenderMedia !== src) { // console.log('不同源'); // this.frameCnt += 1; console.log('[core-src]', src); this.holdSteam(); this.lastRenderMedia = src; const clipPath = path.join(__dirname, `../ws/video/${src}`); // console.log('src-clipPath', src, clipPath); delete redisMeta.mediaSrc; const stream: StreamFrameType = { frame: -1, clipPath: clipPath, metaData: JSON.stringify(redisMeta), serverTime: this.mockserverTime, DIR: 3, }; this.roQueue.next(stream); } } } } } catch (error) { this.logger.error('rotate', error); console.log('error', error); } } joystick(request: JoystickRequest) { return this.sceneGrpcService.joystick(request); } handleDataChanelOpen(channel: DataChannel): void { this.channel = channel; this.streamService.setChannel(channel); this.startSteaming.next(true); this.startStream(); this.handleStream(); } handleDataChanelClose(): void { this.stopStream(); this.startSteaming.next(false); this.streamService.closeChannel(); const exitRequest: ExitRequest = { action_type: 1002, user_id: this.user_id, trace_id: '', }; this.exit(exitRequest); } handleMessage(message: string | Buffer) { try { if (typeof message === 'string') { // wasm:特例, requestIframe if (message.includes('wasm:')) { const msg: RTCMessageRequest = JSON.parse( message.replace('wasm:', ''), ); if (msg.MstType === 0) { this.logger.log('lost I frame'); this.handleIframeRequest(); } } else { const msg: RTCMessageRequest = JSON.parse(message); switch (msg.action_type) { case ActionType.walk: const walk = msg; this.walking(walk); break; case ActionType.breathPoint: this.handleBreath(msg); break; case ActionType.rotate: const rotateRequest: RotateRequest = msg; this.rotate(rotateRequest); break; case ActionType.userStatus: this.updateUserStatus(msg); break; case ActionType.status: this.updateStatus(); break; default: break; } } } } catch (error) { this.logger.error('handleMessage:rtc--error', message); } } handleIframeRequest() { const lastStreamFrame = this.streamService.lastStreamFrame.getValue(); lastStreamFrame.DIR = 1; console.log('lastStreamFrame', lastStreamFrame); const nextFrame = this.frameCnt.getValue() + 1; lastStreamFrame.frame = nextFrame; this.streamService.pushFrameToSteam(lastStreamFrame); } async walking(req) { console.log('walking', req); this.clickQueue.next(req); this.walkingSub = this.clickQueue.subscribe(async (request) => { const user = this.moveService.users[this.user_id]; const path = await this.getRouterService.searchRoad( user.appId, user.breakPointId, req.clicking_action.clicking_point, ); const walkingRes = await this.moveService.move(path, request); console.log('walkingRes', walkingRes) debugger; if (walkingRes && !this.onMoving) { this.onMoving = true; this.holdSteam(); if (!this.moveQueueSubscription) { this.handleMoveSteam(); } const res = Object.keys(walkingRes).map((item) => { console.log('item', item); return Array.from(walkingRes[item]).map((i) => { i['marker'] = item; return i; }); }); const seqs = Array.from(res).flat(); this.lastMoveCnt = this.frameCnt.value + seqs.length; console.log('lastMoveCnt', this.lastMoveCnt); seqs.forEach((frame: StreamReplyType) => { const mediaSrc = frame.mediaSrc; delete frame.mediaSrc; const stream: StreamFrameType = { frame: -1, clipPath: mediaSrc, metaData: JSON.stringify(frame), serverTime: this.mockserverTime, DIR: 1, }; this.moveQueue.next(stream); }); } }); } async handleBreath(request) { const npsRes = await this.moveService.getBreakPoints(request); console.log('npsRes', npsRes); this.streamService.pushNormalDataToStream(npsRes); } updateStatus() { const reply = { data: { action_type: 1009, echo_msg: { echoMsg: Date.now() } }, track: false, }; this.streamService.pushNormalDataToStream(reply); } async updateUserStatus(request) { try { //TODO 接入redis数据 const redisMeta = await this.rotateService.getNewUserStateRequest( request, ); if (redisMeta) { redisMeta.actionType = 1024; this.streamService.pushNormalDataToStream(redisMeta); } } catch (error) { this.logger.error('updateUserStatus::function', error); } } pushFirstRender(clipPath: string, metaData: string): Promise { return new Promise(async (resolve, reject) => { try { const streamData: StreamFrameType = { frame: 1, clipPath: clipPath, metaData: metaData, serverTime: this.mockserverTime, DIR: 1, }; const hasPush = await this.streamService.pushFrameToSteam(streamData); return resolve(hasPush); } catch (error) { return reject(false); } }); } handleStream() { this.frameCntSubscription = this.frameCnt.subscribe(async (frame) => { try { console.log('frame', frame); if (frame === 1) { const redisData = await this.rotateService.echo(this.user_id); this.onSteaming = true; this.holdSteam(); if (redisData && 'mediaSrc' in redisData) { const mediaSrc: string = redisData.mediaSrc || ''; if (mediaSrc.length > 0) { let src = mediaSrc.split('?')[0]; // 临时本地替换路经 src = src.replace('/0000000001/', ''); const clipPath = path.join(__dirname, `../ws/video/${src}`); delete redisData.mediaSrc; this.logger.log( `user:${this.user_id}:first render stream` + JSON.stringify({ path: clipPath, meta: redisData }), ); const status = await this.pushFirstRender( clipPath, JSON.stringify(redisData), ); if (status) { this.firstRender = true; this.resumeStream(2); } else { this.logger.error('first render problem', status); } } } } if ( frame > 1 && !this.onSteaming && !this.onMoving && this.firstRender ) { console.log(`空白流::${frame}`); const redisDataAuto = await this.rotateService.echo(this.user_id); if (redisDataAuto) { 'mediaSrc' in redisDataAuto && delete redisDataAuto.mediaSrc; const streamMeta: StreamMetaType = { frame: frame, metaData: JSON.stringify(redisDataAuto), }; this.streamService.pushMetaDataToSteam(streamMeta); } } } catch (error) { this.logger.error('handleStream', error); } }); } handleRotateStream() { this.roQueueSubscription = this.roQueue.subscribe( async (stream: StreamFrameType) => { this.rotateTimeStamp = Date.now(); if (this.rotateframeCnt === -1) { this.rotateframeCnt = this.frameCnt.value; } this.rotateframeCnt += 1; stream.frame = this.rotateframeCnt; console.log('[media]', stream.clipPath); this.logger.log( `roQueueSubscription:frame:${this.rotateframeCnt} ` + JSON.stringify(stream.metaData), ); await this.streamService.pushFrameToSteam(stream); setTimeout(() => { const now = Date.now(); if (now - this.rotateTimeStamp > 300) { const next = this.rotateframeCnt + 1; this.resumeStream(next); this.rotateframeCnt = -1; this.onMoving = false; this.onRotating = false; } }, 300); }, ); } cleanMoveSteam() { if (this.moveQueueSubscription) { this.moveQueueSubscription.unsubscribe(); this.lastMoveCnt = -1; this.moveQueueSubscription = null; } if (this.walkingSub) { this.walkingSub.unsubscribe(); this.walkingSub = null; } } handleMoveSteam() { this.moveQueueSubscription = this.moveQueue.subscribe( async (stream: StreamFrameType) => { const metaData: StreamReplyType = JSON.parse(stream.metaData); console.log('handleMoveSteam-onMoving', this.onMoving); stream.marker = metaData.marker; this.lastMoveStreamFrame.next(stream); const next = this.frameCnt.value + 1; this.currentMoveMaker = metaData.marker; if (this.onMoving) { this.frameCnt.next(next); } else { console.log('handleMoveSteam stop', next, this.currentMoveMaker); this.cleanMoveSteam(); this.resumeStream(next); return; } let src = stream.clipPath.split('?')[0]; // // 临时本地替换路经 src = src.replace('/0000000001/', ''); const clipPath = path.join(__dirname, `../ws/video/${src}`); const streamData: StreamFrameType = { frame: next, clipPath: clipPath, metaData: stream.metaData, serverTime: this.mockserverTime, DIR: 3, }; await this.streamService.pushFrameToSteam(streamData); if (this.lastMoveCnt == this.frameCnt.getValue()) { const next = this.frameCnt.getValue() + 1; console.log('last', next); this.resumeStream(next); this.cleanMoveSteam(); const lastFrame = this.lastMoveStreamFrame.getValue(); const userId = this.user_id; const breakPointId = lastFrame.marker.split('T')[1]; const lastReply = JSON.parse(lastFrame.metaData); this.moveService.updateUser(userId, breakPointId, lastReply); } }, ); } }