|
@@ -4,8 +4,6 @@ import { grpcClientOptions } from './grpc-scene.options';
|
|
|
import { Logger } from '@nestjs/common';
|
|
|
import { DataChannel } from 'node-datachannel';
|
|
|
import * as path from 'path';
|
|
|
-// import { statSync, createReadStream, readFileSync } from 'fs';
|
|
|
-// import { Readable } from 'stream';
|
|
|
import { BehaviorSubject } from 'rxjs';
|
|
|
// import * as streamBuffers from 'stream-buffers';
|
|
|
import { ActionType } from './actionType';
|
|
@@ -14,6 +12,7 @@ import { StreamService } from './stream/stream.service';
|
|
|
import { InjectQueue } from '@nestjs/bull';
|
|
|
import { Queue } from 'bull';
|
|
|
import { RotateService } from 'src/rotate/rotate.service';
|
|
|
+import { DelayQueue, RxQueue, ThrottleQueue, DebounceQueue } from 'rx-queue';
|
|
|
|
|
|
const frameMetaReply = {
|
|
|
traceIds: [''],
|
|
@@ -81,12 +80,11 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
|
|
|
private streamService: StreamService,
|
|
|
private rotateService: RotateService,
|
|
|
@InjectQueue('rotate') private rotateQueue: Queue,
|
|
|
- ) {}
|
|
|
+ ) { }
|
|
|
@Client(grpcClientOptions) private readonly client: ClientGrpc;
|
|
|
private sceneGrpcService: SceneGrpcService;
|
|
|
|
|
|
private logger: Logger = new Logger('SceneService');
|
|
|
- private frameCnt = -1;
|
|
|
private frameCntInterval = 1000;
|
|
|
public _frameInteval: NodeJS.Timeout;
|
|
|
private channel: DataChannel;
|
|
@@ -95,23 +93,14 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
|
|
|
private roomId: string;
|
|
|
private onSteaming = false;
|
|
|
private testFrame = -1;
|
|
|
- private RotateframeCnt = -1;
|
|
|
+ private rotateframeCnt = -1;
|
|
|
private mockserverTime = Date.now() - 1653000000478;
|
|
|
private lastRenderMedia = '';
|
|
|
- // private rotateMap = new Map()<>;
|
|
|
-
|
|
|
- setConfig(user_id: string, roomId: string) {
|
|
|
- this.user_id = user_id;
|
|
|
- this.roomId = roomId;
|
|
|
- }
|
|
|
- checkingIsRotating() {
|
|
|
- console.log('async', this.frameCnt, this.RotateframeCnt);
|
|
|
- if (this.frameCnt > 0 && this.frameCnt === this.RotateframeCnt) {
|
|
|
- return true;
|
|
|
- } else {
|
|
|
- return false;
|
|
|
- }
|
|
|
- }
|
|
|
+ private frameCnt = new BehaviorSubject<number>(-1);
|
|
|
+ private frameCntSubscription: any;
|
|
|
+ private roQueueSubscription: any;
|
|
|
+ private roQueue: RxQueue = new ThrottleQueue(50);
|
|
|
+ private rotateTimeStamp: number;
|
|
|
|
|
|
onModuleInit() {
|
|
|
this.sceneGrpcService =
|
|
@@ -125,6 +114,44 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
|
|
|
};
|
|
|
}
|
|
|
|
|
|
+ startStream() {
|
|
|
+ if (this.frameCnt.value === -1) {
|
|
|
+ this._frameInteval = setInterval(async () => {
|
|
|
+ const next = this.frameCnt.value + 1;
|
|
|
+ this.frameCnt.next(next);
|
|
|
+ }, 1000);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ holdSteam() {
|
|
|
+ clearInterval(this._frameInteval);
|
|
|
+ }
|
|
|
+
|
|
|
+ resumeStream(value: number) {
|
|
|
+ this.frameCnt.next(value);
|
|
|
+ this._frameInteval = setInterval(async () => {
|
|
|
+ const next = this.frameCnt.value + 1;
|
|
|
+ this.frameCnt.next(next);
|
|
|
+ }, 1000);
|
|
|
+ }
|
|
|
+
|
|
|
+ stopStream() {
|
|
|
+ if (this.frameCntSubscription) {
|
|
|
+ this.frameCntSubscription.unsubscribe();
|
|
|
+ }
|
|
|
+ if (this.roQueueSubscription) {
|
|
|
+ this.roQueueSubscription.unsubscribe();
|
|
|
+ }
|
|
|
+ this.frameCnt.next(-1);
|
|
|
+ clearInterval(this._frameInteval);
|
|
|
+ this.rotateframeCnt = -1;
|
|
|
+ }
|
|
|
+
|
|
|
+ setConfig(user_id: string, roomId: string) {
|
|
|
+ this.user_id = user_id;
|
|
|
+ this.roomId = roomId;
|
|
|
+ }
|
|
|
+
|
|
|
onModuleDestroy() {
|
|
|
this.streamService.onSteaming.unsubscribe();
|
|
|
}
|
|
@@ -163,14 +190,13 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
|
|
|
try {
|
|
|
// const reply = this.sceneGrpcService.rotate(request);
|
|
|
if (!this.onSteaming) {
|
|
|
- this.RotateframeCnt = this.frameCnt;
|
|
|
// const redisMeta = await this.cacheService.get(
|
|
|
// `updateFrameMetadata:${this.user_id}`,
|
|
|
// );
|
|
|
// console.log('rotate信息', this.user_id, request.sampleRate);
|
|
|
|
|
|
const redisMeta = await this.rotateService.rotate(request);
|
|
|
- console.log('rotate信息', redisMeta);
|
|
|
+ // console.log('rotate信息', redisMeta);
|
|
|
// await this.rotateQueue.add('processFrame', request, {
|
|
|
// jobId: request.trace_id,
|
|
|
// });
|
|
@@ -187,23 +213,25 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
|
|
|
// 临时本地替换路经
|
|
|
src = src.replace('/0000000001/100/', '');
|
|
|
// 判断不是同一条源时才推出
|
|
|
- console.log('[media]', this.lastRenderMedia, src);
|
|
|
if (this.lastRenderMedia !== src) {
|
|
|
- console.log('不同源');
|
|
|
- this.frameCnt += 1;
|
|
|
- console.log('src', src);
|
|
|
+ console.log('[media]', src);
|
|
|
+ // console.log('不同源');
|
|
|
+ // this.frameCnt += 1;
|
|
|
+ this.holdSteam();
|
|
|
this.lastRenderMedia = src;
|
|
|
const clipPath = path.join(__dirname, `../ws/video/${src}`);
|
|
|
- console.log('src-clipPath', src, clipPath);
|
|
|
- delete redisMeta.mediaSrc;
|
|
|
+ // console.log('src-clipPath', src, clipPath);
|
|
|
+ // delete redisMeta.mediaSrc;
|
|
|
const stream: StreamFrameType = {
|
|
|
- frame: this.frameCnt,
|
|
|
+ frame: -1,
|
|
|
clipPath: clipPath,
|
|
|
- metaData: JSON.stringify(redisMeta),
|
|
|
+ metaData: JSON.stringify(frameMetaReply),
|
|
|
serverTime: this.mockserverTime,
|
|
|
DIR: 3,
|
|
|
};
|
|
|
- this.streamService.pushFrameToSteam(stream);
|
|
|
+ // console.log('stream', stream)
|
|
|
+ this.roQueue.next(stream);
|
|
|
+ // this.streamService.pushFrameToSteam(stream);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -218,51 +246,51 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- async rotate1(request: RotateRequest) {
|
|
|
- try {
|
|
|
- // const reply = this.sceneGrpcService.rotate(request);
|
|
|
- // const res = await this.cacheService.publish(
|
|
|
- // 'test',
|
|
|
- // JSON.stringify(request),
|
|
|
- // );
|
|
|
+ // async rotate1(request: RotateRequest) {
|
|
|
+ // try {
|
|
|
+ // // const reply = this.sceneGrpcService.rotate(request);
|
|
|
+ // // const res = await this.cacheService.publish(
|
|
|
+ // // 'test',
|
|
|
+ // // JSON.stringify(request),
|
|
|
+ // // );
|
|
|
|
|
|
- // console.log('res', res);
|
|
|
+ // // console.log('res', res);
|
|
|
|
|
|
- if (!this.onSteaming) {
|
|
|
- this.frameCnt += 1;
|
|
|
- this.RotateframeCnt = this.frameCnt;
|
|
|
- // this.cacheService
|
|
|
- this.testFrame += 3;
|
|
|
- this.mockserverTime += 1;
|
|
|
- this.onSteaming = true;
|
|
|
- if (this.testFrame > 358) this.testFrame = 0;
|
|
|
- const stream: StreamFrameType = {
|
|
|
- frame: this.frameCnt,
|
|
|
- clipPath: path.join(
|
|
|
- __dirname,
|
|
|
- `../ws/video/100/100.${this.testFrame.padLeft(4, '0')}.h264`,
|
|
|
- ),
|
|
|
- metaData: JSON.stringify(frameMetaReply),
|
|
|
- serverTime: this.mockserverTime,
|
|
|
- DIR: 3,
|
|
|
- };
|
|
|
- console.log('stream', this.frameCnt, stream.clipPath);
|
|
|
- this.streamService.pushFrameToSteam(stream);
|
|
|
- // const redisMeta = await this.cacheService.rpop(
|
|
|
- // `updateFrameMetadata:${this.user_id}`,
|
|
|
- // );
|
|
|
- }
|
|
|
- // reply.subscribe((res: NormalReply) => {
|
|
|
- // if (res.code === 200) {
|
|
|
- // }
|
|
|
- // });
|
|
|
- } catch (error) {
|
|
|
- this.logger.error(
|
|
|
- `rotate-${this.frameCnt},src:${this.testFrame}`,
|
|
|
- JSON.stringify(error),
|
|
|
- );
|
|
|
- }
|
|
|
- }
|
|
|
+ // if (!this.onSteaming) {
|
|
|
+ // this.frameCnt += 1;
|
|
|
+ // this.RotateframeCnt = this.frameCnt;
|
|
|
+ // // this.cacheService
|
|
|
+ // this.testFrame += 3;
|
|
|
+ // this.mockserverTime += 1;
|
|
|
+ // this.onSteaming = true;
|
|
|
+ // if (this.testFrame > 358) this.testFrame = 0;
|
|
|
+ // const stream: StreamFrameType = {
|
|
|
+ // frame: this.frameCnt,
|
|
|
+ // clipPath: path.join(
|
|
|
+ // __dirname,
|
|
|
+ // `../ws/video/100/100.${this.testFrame.padLeft(4, '0')}.h264`,
|
|
|
+ // ),
|
|
|
+ // metaData: JSON.stringify(frameMetaReply),
|
|
|
+ // serverTime: this.mockserverTime,
|
|
|
+ // DIR: 3,
|
|
|
+ // };
|
|
|
+ // console.log('stream', this.frameCnt, stream.clipPath);
|
|
|
+ // this.streamService.pushFrameToSteam(stream);
|
|
|
+ // // const redisMeta = await this.cacheService.rpop(
|
|
|
+ // // `updateFrameMetadata:${this.user_id}`,
|
|
|
+ // // );
|
|
|
+ // }
|
|
|
+ // // reply.subscribe((res: NormalReply) => {
|
|
|
+ // // if (res.code === 200) {
|
|
|
+ // // }
|
|
|
+ // // });
|
|
|
+ // } catch (error) {
|
|
|
+ // this.logger.error(
|
|
|
+ // `rotate-${this.frameCnt},src:${this.testFrame}`,
|
|
|
+ // JSON.stringify(error),
|
|
|
+ // );
|
|
|
+ // }
|
|
|
+ // }
|
|
|
|
|
|
joystick(request: JoystickRequest) {
|
|
|
return this.sceneGrpcService.joystick(request);
|
|
@@ -271,12 +299,15 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
|
|
|
handleDataChanelOpen(channel: DataChannel): void {
|
|
|
this.channel = channel;
|
|
|
this.streamService.setChannel(channel);
|
|
|
- this.handleStartCountingFrame();
|
|
|
+ // this.handleStartCountingFrame();
|
|
|
this.startSteaming.next(true);
|
|
|
+ this.startStream();
|
|
|
+ this.handleStream();
|
|
|
}
|
|
|
|
|
|
handleDataChanelClose(): void {
|
|
|
- this.stopCountingFrame();
|
|
|
+ // this.stopCountingFrame();
|
|
|
+ this.stopStream();
|
|
|
this.startSteaming.next(false);
|
|
|
this.streamService.closeChannel();
|
|
|
const exitRequest: ExitRequest = {
|
|
@@ -330,15 +361,15 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
|
|
|
}
|
|
|
|
|
|
handleIframeRequest() {
|
|
|
- this.frameCnt += 1;
|
|
|
- this.onSteaming = true;
|
|
|
- const stream: StreamFrameType = {
|
|
|
- frame: this.frameCnt,
|
|
|
- clipPath: path.join(__dirname, '../ws/video/100/100.0000.h264'),
|
|
|
- metaData: JSON.stringify(frameMetaReply),
|
|
|
- serverTime: this.mockserverTime,
|
|
|
- };
|
|
|
- this.streamService.pushFrameToSteam(stream);
|
|
|
+ // this.frameCnt += 1;
|
|
|
+ // this.onSteaming = true;
|
|
|
+ // const stream: StreamFrameType = {
|
|
|
+ // frame: this.frameCnt,
|
|
|
+ // clipPath: path.join(__dirname, '../ws/video/100/100.0000.h264'),
|
|
|
+ // metaData: JSON.stringify(frameMetaReply),
|
|
|
+ // serverTime: this.mockserverTime,
|
|
|
+ // };
|
|
|
+ // this.streamService.pushFrameToSteam(stream);
|
|
|
}
|
|
|
|
|
|
walking(request) {
|
|
@@ -423,9 +454,10 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
|
|
|
code: 0,
|
|
|
msg: 'OK',
|
|
|
};
|
|
|
- this.frameCnt += 1;
|
|
|
+ const nextframe = this.frameCnt.value + 1;
|
|
|
+
|
|
|
const stream: StreamFrameType = {
|
|
|
- frame: this.frameCnt,
|
|
|
+ frame: nextframe,
|
|
|
clipPath: path.join(__dirname, `../ws/video/2.h264`),
|
|
|
metaData: JSON.stringify(walk1),
|
|
|
serverTime: this.mockserverTime,
|
|
@@ -661,36 +693,79 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
|
|
|
}
|
|
|
// this.streamService.pushNormalDataToStream(reply);
|
|
|
}
|
|
|
- handleStartCountingFrame() {
|
|
|
- this._frameInteval = setInterval(async () => {
|
|
|
- this.frameCnt += 1;
|
|
|
- try {
|
|
|
- if (this.frameCnt === 1) {
|
|
|
- // this.pushTheFirstFrame();
|
|
|
- const stream: StreamFrameType = {
|
|
|
- frame: 1,
|
|
|
- clipPath: path.join(__dirname, '../ws/video/100/100.0000.h264'),
|
|
|
- metaData: JSON.stringify(frameMetaReply),
|
|
|
- serverTime: this.mockserverTime,
|
|
|
- };
|
|
|
- this.streamService.pushFrameToSteam(stream);
|
|
|
- }
|
|
|
|
|
|
- if (this.frameCnt > 1 && !this.onSteaming) {
|
|
|
- const streamMeta: StreamMetaType = {
|
|
|
- frame: this.frameCnt,
|
|
|
- metaData: JSON.stringify(frameMetaReply),
|
|
|
- };
|
|
|
- this.streamService.pushMetaDataToSteam(streamMeta);
|
|
|
- }
|
|
|
- } catch (error) {
|
|
|
- console.log('error', error);
|
|
|
+ handleStream() {
|
|
|
+ this.frameCntSubscription = this.frameCnt.subscribe((frame) => {
|
|
|
+ console.log('frame', frame);
|
|
|
+ if (frame === 1) {
|
|
|
+ const stream: StreamFrameType = {
|
|
|
+ frame: 1,
|
|
|
+ clipPath: path.join(__dirname, '../ws/video/100/100.0000.h264'),
|
|
|
+ metaData: JSON.stringify(frameMetaReply),
|
|
|
+ serverTime: this.mockserverTime,
|
|
|
+ };
|
|
|
+ this.streamService.pushFrameToSteam(stream);
|
|
|
}
|
|
|
- }, this.frameCntInterval);
|
|
|
- }
|
|
|
+ if (frame > 1 && !this.onSteaming) {
|
|
|
+ const streamMeta: StreamMetaType = {
|
|
|
+ frame: frame,
|
|
|
+ metaData: JSON.stringify(frameMetaReply),
|
|
|
+ };
|
|
|
+ this.streamService.pushMetaDataToSteam(streamMeta);
|
|
|
+ }
|
|
|
+ });
|
|
|
|
|
|
- stopCountingFrame(): void {
|
|
|
- clearInterval(this._frameInteval);
|
|
|
- this.frameCnt = -1;
|
|
|
+ this.roQueueSubscription = this.roQueue.subscribe(
|
|
|
+ async (stream: StreamFrameType) => {
|
|
|
+ this.rotateTimeStamp = Date.now();
|
|
|
+
|
|
|
+ if (this.rotateframeCnt === -1) {
|
|
|
+ this.rotateframeCnt = this.frameCnt.value;
|
|
|
+ }
|
|
|
+ this.rotateframeCnt += 1;
|
|
|
+ stream.frame = this.rotateframeCnt;
|
|
|
+ console.log('this.rotateframeCnt', this.rotateframeCnt);
|
|
|
+ await this.streamService.pushFrameToSteam(stream);
|
|
|
+ setTimeout(() => {
|
|
|
+ const now = Date.now();
|
|
|
+ if (now - this.rotateTimeStamp > 300) {
|
|
|
+ this.resumeStream(this.rotateframeCnt);
|
|
|
+ }
|
|
|
+ }, 300);
|
|
|
+ },
|
|
|
+ );
|
|
|
}
|
|
|
+
|
|
|
+ // handleStartCountingFrame() {
|
|
|
+ // this._frameInteval = setInterval(async () => {
|
|
|
+ // this.frameCnt += 1;
|
|
|
+ // try {
|
|
|
+ // if (this.frameCnt === 1) {
|
|
|
+ // // this.pushTheFirstFrame();
|
|
|
+ // const stream: StreamFrameType = {
|
|
|
+ // frame: 1,
|
|
|
+ // clipPath: path.join(__dirname, '../ws/video/100/100.0000.h264'),
|
|
|
+ // metaData: JSON.stringify(frameMetaReply),
|
|
|
+ // serverTime: this.mockserverTime,
|
|
|
+ // };
|
|
|
+ // this.streamService.pushFrameToSteam(stream);
|
|
|
+ // }
|
|
|
+
|
|
|
+ // if (this.frameCnt > 1 && !this.onSteaming) {
|
|
|
+ // const streamMeta: StreamMetaType = {
|
|
|
+ // frame: this.frameCnt,
|
|
|
+ // metaData: JSON.stringify(frameMetaReply),
|
|
|
+ // };
|
|
|
+ // this.streamService.pushMetaDataToSteam(streamMeta);
|
|
|
+ // }
|
|
|
+ // } catch (error) {
|
|
|
+ // console.log('error', error);
|
|
|
+ // }
|
|
|
+ // }, this.frameCntInterval);
|
|
|
+ // }
|
|
|
+
|
|
|
+ // stopCountingFrame(): void {
|
|
|
+ // clearInterval(this._frameInteval);
|
|
|
+ // this.frameCnt = -1;
|
|
|
+ // }
|
|
|
}
|