scene.service.ts 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753
  1. import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
  2. import { ClientGrpc, Client } from '@nestjs/microservices';
  3. import { grpcClientOptions } from './grpc-scene.options';
  4. import { Logger } from '@nestjs/common';
  5. import { DataChannel } from 'node-datachannel';
  6. import { BehaviorSubject } from 'rxjs';
  7. // import * as streamBuffers from 'stream-buffers';
  8. import { ActionType } from './actionType';
  9. import { CacheService } from 'src/cache/cache.service';
  10. import { StreamService } from './stream/stream.service';
  11. // import { InjectQueue } from '@nestjs/bull';
  12. // import { Queue } from 'bull';
  13. import { RotateService } from 'src/rotate/rotate.service';
  14. import { DelayQueue, RxQueue, ThrottleQueue, DebounceQueue } from 'rx-queue';
  15. import { MoveService } from 'src/move/move.service';
  16. import { GetRouterService } from 'src/get-router/get-router.service';
  17. import { join } from 'path';
  18. import e from 'express';
  19. @Injectable()
  20. export class SceneService implements OnModuleInit, OnModuleDestroy {
  /**
   * Collaborating services are injected by Nest.
   * Disabled Bull-queue injections kept for reference:
   *   `@InjectQueue('rotate') private rotateQueue: Queue`
   *   `@InjectQueue('walking') private walkingQueue: Queue`
   */
  constructor(
    private cacheService: CacheService,
    private streamService: StreamService,
    private rotateService: RotateService,
    private moveService: MoveService,
    private getRouterService: GetRouterService,
  ) {}

  // ---- gRPC ---------------------------------------------------------------
  @Client(grpcClientOptions) private readonly client: ClientGrpc;
  /** Resolved from `client` in `onModuleInit`. */
  private sceneGrpcService: SceneGrpcService;

  // ---- timers -------------------------------------------------------------
  /** Handle of the interval that increments `frameCnt`. */
  public _frameInteval: NodeJS.Timeout;
  public _frameTimeout: NodeJS.Timeout;
  /** Pending "rotation finished" timer armed by the rotate-queue consumer. */
  public _rotateTimeout: NodeJS.Timeout;
  /** Pending "move finished" timer armed by the move-queue consumer. */
  public _moveTimeout: NodeJS.Timeout;

  // ---- observable state ---------------------------------------------------
  public startSteaming = new BehaviorSubject<boolean>(false);
  public onRotating = new BehaviorSubject<boolean>(false);
  public onMoving = new BehaviorSubject<boolean>(false);
  /** Current frame number; -1 means the stream is not running. */
  public frameCnt = new BehaviorSubject<number>(-1);
  /** Most recent frame emitted by the move-queue consumer. */
  public lastMoveStreamFrame = new BehaviorSubject<StreamFrameType>({
    frame: -1,
    clipPath: '',
    metaData: '',
  });
  /** User records created by `initUsers`, keyed by user id. */
  public users = {};

  // ---- frame counters (-1 = not currently in that mode) ---------------------
  private rotateframeCnt = -1;
  private moveframeCnt = -1;
  private lastMoveCnt = -1;
  private frameCntInterval = 1000;

  // ---- session ------------------------------------------------------------
  private channel: DataChannel;
  private logger = new Logger('SceneService');
  private user_id: string;
  private roomId: string;
  /** Mirrors `streamService.onSteaming` (subscribed in `onModuleInit`). */
  private onSteaming = false;
  private mockserverTime = Date.now() - 1653000000478;
  private lastRenderMedia = '';
  private rotateTimeStamp: number;
  /** Set once the first frame was pushed successfully. */
  private firstRender = false;
  private latestBreakPointId: number;
  /** True between `holdSteam()` and the next `resumeStream()`. */
  private isHoldingStream = false;

  // ---- queues ---------------------------------------------------------------
  private roQueue: RxQueue = new DelayQueue(40);
  private clickQueue: RxQueue = new DebounceQueue(500);
  private moveQueue: RxQueue = new DelayQueue(40);
  private joystickQueue: RxQueue = new DebounceQueue(500);
  private requestIFrameQueue: RxQueue = new DebounceQueue(1000);

  // ---- subscriptions (created lazily, released by the stop/clean methods) ---
  private frameCntSubscription: any;
  private roQueueSubscription: any;
  private moveQueueSubscription: any;
  private walkingSub: any;
  private joystickSub: any;
  private streamServiceSub: any;
  private requestIFrameQueueSub: any;
  /**
   * Creates a default user record and stores it in `this.users` under
   * `userId`, replacing any record already stored under that key.
   *
   * @param app_id application id copied into the record's `appId`
   * @param userId user id; used both as the key and as the record's `userId`
   */
  initUsers(app_id, userId) {
    // Every angle starts at zero; a factory keeps the two objects distinct.
    const zeroAngle = () => ({ pitch: 0, yaw: 0, roll: 0 });

    this.users[userId] = {
      appId: app_id,
      userId: userId,
      breakPointId: 100,
      roomId: null,
      player: {
        position: { x: -700, y: 0, z: 0 },
        angle: zeroAngle(),
      },
      camera: {
        position: { x: -1145, y: 0, z: 160 },
        angle: zeroAngle(),
      },
      rotateInfo: {
        frameIndex: 0,
        horizontal_move: 0,
        mediaSrc: null,
      },
      moveInfo: {},
      // traceIds: [],
      // actionResponses:[]
    };
  }
  /**
   * Nest lifecycle hook.
   *
   * - Resolves the gRPC service stub from the injected client.
   * - Mirrors `streamService.onSteaming` into the local `onSteaming` flag.
   * - Installs `Number.prototype.padLeft(n, str)`, which left-pads the decimal
   *   representation of the number with `str` (default `'0'`).
   */
  onModuleInit(): void {
    this.sceneGrpcService =
      this.client.getService<SceneGrpcService>('SceneGrpcService');
    this.logger.log('init SceneGrpcService');
    this.streamServiceSub = this.streamService.onSteaming.subscribe((val) => {
      this.onSteaming = val;
    });
    Number.prototype.padLeft = function (n, str) {
      const digits = String(this);
      // Clamp at zero: the previous `Array(n - len + 1)` form threw a
      // RangeError once the number was more than one digit longer than `n`.
      const fillCount = Math.max(0, n - digits.length);
      return (str || '0').repeat(fillCount) + digits;
    };
  }
  /** Returns the user id and room id previously stored by `setConfig`. */
  public getConfig() {
    const { user_id: userId, roomId } = this;
    return { userId, roomId };
  }
  /**
   * Starts the once-per-second frame ticker, but only when the counter is
   * still at its idle value (-1). Any existing ticker is cancelled first.
   */
  public startStream(): void {
    clearInterval(this._frameInteval);
    if (this.frameCnt.value !== -1) {
      return;
    }
    const tick = async () => {
      const upcoming = this.frameCnt.value + 1;
      this.frameCnt.next(upcoming);
    };
    this._frameInteval = setInterval(tick, 1000);
  }
  /** Pauses the frame ticker and flags the stream as held. */
  public holdSteam(): void {
    this.isHoldingStream = true;
    clearInterval(this._frameInteval);
  }
  /**
   * Leaves move/rotate mode and restarts the once-per-second frame ticker
   * from the current `frameCnt` value.
   */
  public resumeStream(): void {
    this.onMoving.next(false);
    this.isHoldingStream = false;
    // Both per-mode counters go back to "not in that mode".
    this.rotateframeCnt = -1;
    this.moveframeCnt = -1;

    clearInterval(this._frameInteval);
    const tick = async () => {
      this.frameCnt.next(this.frameCnt.getValue() + 1);
    };
    this._frameInteval = setInterval(tick, 1000);
  }
  /**
   * Stops streaming: releases the frame / rotate-queue / move-queue
   * subscriptions, cancels all timers and resets the frame counters to -1.
   *
   * The pending rotate/move "finished" timers are cancelled as well. Their
   * callbacks call `resumeStream()`, so a timer that outlived this call would
   * restart the ticker after the stream had been stopped and leave `frameCnt`
   * away from -1, which in turn prevents `startStream()` from starting again.
   */
  public stopStream(): void {
    this.frameCntSubscription?.unsubscribe();
    this.frameCntSubscription = null;
    this.roQueueSubscription?.unsubscribe();
    this.roQueueSubscription = null;
    this.moveQueueSubscription?.unsubscribe();
    this.moveQueueSubscription = null;

    clearTimeout(this._rotateTimeout);
    clearTimeout(this._moveTimeout);

    this.frameCnt.next(-1);
    clearInterval(this._frameInteval);
    this.rotateframeCnt = -1;
    this.moveframeCnt = -1;
  }
  /**
   * Stores the identifiers of the current session.
   *
   * @param user_id id of the connected user
   * @param roomId id of the room the user is in
   */
  setConfig(user_id: string, roomId: string): void {
    this.roomId = roomId;
    this.user_id = user_id;
  }
  /**
   * Nest lifecycle hook: releases the subscription taken in `onModuleInit`.
   *
   * Only this service's own subscription is released. Calling `unsubscribe()`
   * on the `onSteaming` subject itself would close it for every other
   * consumer. Safe to call when `onModuleInit` never ran.
   */
  onModuleDestroy() {
    this.streamServiceSub?.unsubscribe();
    this.streamServiceSub = null;
  }
  /**
   * Initialises the rotate service for the requesting app/user.
   * Failures are logged and swallowed.
   *
   * @param request init request carrying `app_id` and `user_id`
   */
  init(request: InitRequest) {
    try {
      // The gRPC init call, moveService.init and initUsers are currently disabled.
      const { app_id, user_id } = request;
      this.rotateService.init(app_id, user_id);
    } catch (error) {
      console.log('error', error);
    }
  }
  /**
   * Ends the session: resets the frame counter to -1 and stops the stream.
   * The gRPC exit call is currently disabled, so `request` is not used.
   *
   * @param request exit request (unused at the moment)
   */
  exit(request: ExitRequest) {
    const idleFrame = -1;
    this.frameCnt.next(idleFrame);
    this.stopStream();
  }
  /**
   * Handles a rotate request. Ignored until the first frame has been rendered.
   *
   * While a move is in progress the move is stopped via `moveService.stop`,
   * using the state recorded in the last move frame; otherwise the request is
   * passed to `rotateService.rotate`. If the reply carries a media source, the
   * ticker is paused and the frame is queued on the rotate queue.
   *
   * Errors are logged and swallowed. When this call queued nothing and the
   * stream is not being held by an in-flight rotate/move, `onRotating` is
   * reset so that the idle metadata stream in `handleStream` is not blocked.
   *
   * @param request rotate request received from the client
   */
  async rotate(request: RotateRequest) {
    if (!this.firstRender) {
      return;
    }
    // Whether this call handed a frame to the rotate queue.
    let queued = false;
    try {
      if (!this.roQueueSubscription) {
        this.handleRotateStream();
      }
      this.onRotating.next(true);

      let redisMeta: StreamReplyType;
      if (this.onMoving.value) {
        this.onMoving.next(false);
        const lastStreamFrame = this.lastMoveStreamFrame.getValue();
        const metaData: StreamReplyType = JSON.parse(
          lastStreamFrame.metaData,
        ) as any as StreamReplyType;
        const newUserStates: NewUserStatesType = metaData.newUserStates.find(
          (item) => item.userId === this.user_id,
        );
        if (!newUserStates) {
          throw new Error(
            `rotate: last move frame has no state for user ${this.user_id}`,
          );
        }
        const trace_id = metaData.traceIds[0];
        const userId = newUserStates.userId;
        const breakPointId = metaData.breakPointId;
        const cameraAngle = newUserStates.playerState.camera.angle;
        const playerAngle = newUserStates.playerState.player.angle;
        console.log('stop-data', trace_id, userId, cameraAngle, playerAngle);
        redisMeta = await this.moveService.stop(
          trace_id,
          userId,
          breakPointId,
          cameraAngle,
          playerAngle,
        );
        console.log('stop-redisMeta', redisMeta);
      } else {
        redisMeta = await this.rotateService.rotate(request);
      }

      if (!redisMeta || !('mediaSrc' in redisMeta)) {
        return;
      }
      const mediaSrc: string = redisMeta.mediaSrc || '';
      // Drop the query string; only the path part addresses the clip.
      const src = mediaSrc.split('?')[0];
      if (src.length === 0) {
        return;
      }

      this.holdSteam();
      this.lastRenderMedia = src;
      const clipPath = join(__dirname, `../ws/${src}`);
      // The media source travels as clipPath, not inside the metadata.
      delete redisMeta.mediaSrc;
      const random_boolean = Math.random() < 0.3;
      const stream: StreamFrameType = {
        // Real frame number is assigned by the rotate-queue consumer.
        frame: -1,
        clipPath: clipPath,
        metaData: JSON.stringify(redisMeta),
        serverTime: this.mockserverTime,
        DIR: this.frameCnt.getValue() < 10 ? 3 : random_boolean ? 1 : 3,
      };
      clearTimeout(this._rotateTimeout);
      this.roQueue.next(stream);
      queued = true;
    } catch (error) {
      this.logger.error('rotate', error);
      console.log('error', error);
    } finally {
      if (!queued && !this.isHoldingStream) {
        this.onRotating.next(false);
      }
    }
  }
  /**
   * Queues a click-to-walk request on the click queue.
   *
   * The consumer is subscribed once (and again after `cleanMoveSteam` cleared
   * it) rather than on every call, so each click is processed by exactly one
   * handler. For each dequeued request it looks up a path with
   * `getRouterService.searchRoad`, asks `moveService.move` for the frames and
   * passes them to `handleSeqMoving`, unless a move is already running.
   *
   * @param req walk request; `clicking_action.clicking_point` is the target
   */
  async walking(req) {
    try {
      if (!this.walkingSub) {
        this.walkingSub = this.clickQueue.subscribe(async (request) => {
          // The callback runs outside the outer try/catch, so guard it here.
          try {
            const user = this.moveService.users[this.user_id];
            if (!user) {
              this.logger.error('walking', `unknown user ${this.user_id}`);
              return;
            }
            // Route from the dequeued request, not the request that happened
            // to create the subscription.
            const path = await this.getRouterService.searchRoad(
              user.appId,
              user.breakPointId,
              request.clicking_action.clicking_point,
            );
            const walkingRes = await this.moveService.move(path, request);
            if (!walkingRes || this.onMoving.value) {
              return;
            }
            // The first entry holds the camera-rotation frames.
            const rotateCamData = walkingRes.shift();
            if (rotateCamData?.length) {
              rotateCamData.forEach((item: StreamReplyType) => {
                item.type = 'rotate';
              });
            } else {
              console.log('rotateCamData无数据');
            }
            // The remaining entries are the walking frames.
            const seqs = walkingRes.flat() as any as StreamReplyType[];
            if (seqs?.length) {
              const lastSeq = rotateCamData?.length
                ? ([...rotateCamData, ...seqs] as any as StreamReplyType[])
                : seqs;
              this.handleSeqMoving(lastSeq);
            } else {
              console.error('walking-move无数据');
            }
          } catch (error) {
            this.logger.error('walking', error);
          }
        });
      }
      this.clickQueue.next(req);
    } catch (error) {
      this.logger.error('walking', error);
    }
  }
  /**
   * Queues a joystick request on the joystick queue; the consumer is
   * subscribed on first use.
   *
   * When `moveService.joystick` returns an array, its first entry holds the
   * camera-rotation frames and the rest the walking frames; both are passed to
   * `handleSeqMoving`. For any other reply the dequeued request is forwarded
   * through `streamService.pushNormalDataToStream`.
   *
   * @param request joystick request received from the client
   */
  async joystick(request: JoystickRequest) {
    try {
      if (!this.joystickSub) {
        this.joystickSub = this.joystickQueue.subscribe(
          async (joystickRequest) => {
            // The callback runs outside the outer try/catch, so guard it here.
            try {
              const joystickRes = await this.moveService.joystick(
                joystickRequest,
              );
              if (!Array.isArray(joystickRes)) {
                console.log('转交数据');
                // Forward the request being processed rather than the one
                // captured when the subscription was created.
                // NOTE(review): confirm the request (not the reply) is what
                // the client expects here.
                this.streamService.pushNormalDataToStream(joystickRequest);
                return;
              }
              const rotateCamData = joystickRes.shift();
              // An empty reply leaves rotateCamData undefined.
              console.log('rotateCamData', rotateCamData?.length);
              if (rotateCamData?.length) {
                rotateCamData.forEach((item: StreamReplyType) => {
                  item.type = 'rotate';
                });
              }
              const seqs = joystickRes.flat() as any as StreamReplyType[];
              if (seqs?.length || rotateCamData?.length) {
                console.log('joystickRes-seqs', seqs.length);
                const lastSeq = rotateCamData?.length
                  ? ([...rotateCamData, ...seqs] as any as StreamReplyType[])
                  : seqs;
                this.handleSeqMoving(lastSeq);
              } else {
                console.warn('joystick-move无数据');
              }
            } catch (error) {
              this.logger.error('joystick', error);
            }
          },
        );
      }
      this.joystickQueue.next(request);
    } catch (error) {
      this.logger.error('joystick', error);
    }
  }
  /**
   * Turns a sequence of move replies into stream frames and queues them on
   * the move queue.
   *
   * Frames without a media source are skipped. When nothing remains, the
   * method returns without pausing the ticker: nothing would reach the move
   * queue, so nothing would ever resume it. The input objects are not
   * modified; `mediaSrc` and `type` are left out of the serialised metadata.
   *
   * @param seqs StreamReplyType[] frames to play, in order
   */
  handleSeqMoving(seqs: StreamReplyType[]) {
    const streams: StreamFrameType[] = [];
    for (const frame of seqs ?? []) {
      const { mediaSrc, type, ...meta } = frame;
      // Drop the query string; only the path part addresses the clip.
      const src = (mediaSrc || '').split('?')[0];
      if (src.length === 0) {
        console.warn('moving-seqs: frame without mediaSrc skipped');
        continue;
      }
      console.log('type', type);
      streams.push({
        // Real frame number is assigned by the move-queue consumer.
        frame: -1,
        clipPath: join(__dirname, `../ws/${src}`),
        metaData: JSON.stringify(meta),
        serverTime: this.mockserverTime,
        DIR: 3,
        type: type?.length ? type : 'move',
      });
    }
    if (streams.length === 0) {
      console.warn('moving-seqs: nothing to play');
      return;
    }

    if (!this.moveQueueSubscription) {
      this.handleMoveSteam();
    }
    console.log('moving-seqs', seqs.length);
    this.onMoving.next(true);
    this.holdSteam();
    streams.forEach((stream) => this.moveQueue.next(stream));
  }
  /**
   * Releases the move-queue and walking subscriptions, if present.
   * `lastMoveCnt` is reset only when a move-queue subscription existed.
   */
  cleanMoveSteam() {
    const moveSub = this.moveQueueSubscription;
    if (moveSub) {
      moveSub.unsubscribe();
      this.lastMoveCnt = -1;
      this.moveQueueSubscription = null;
    }

    const walkSub = this.walkingSub;
    if (walkSub) {
      walkSub.unsubscribe();
      this.walkingSub = null;
    }
  }
  /**
   * Subscribes the consumer of the move queue.
   *
   * Each dequeued frame gets the next move frame number (continuing from
   * `frameCnt` when a move starts) and is pushed to the stream. After a
   * successful push a 300 ms timer is (re)armed; when it fires without being
   * re-armed by a later frame, the move is treated as finished: the user
   * state is updated from the last frame, `frameCnt` is set to the pushed
   * frame number and the ticker resumes.
   *
   * Errors inside the consumer are logged. The callback is async, so an
   * uncaught error would otherwise surface as an unhandled rejection.
   */
  handleMoveSteam() {
    this.moveQueueSubscription = this.moveQueue.subscribe(
      async (stream: StreamFrameType) => {
        try {
          const metaData: StreamReplyType = JSON.parse(stream.metaData);
          if (this.moveframeCnt === -1) {
            this.moveframeCnt = this.frameCnt.value;
          }
          this.moveframeCnt += 1;
          this.latestBreakPointId = metaData.breakPointId;

          const streamData: StreamFrameType = {
            frame: this.moveframeCnt,
            clipPath: stream.clipPath,
            metaData: stream.metaData,
            serverTime: this.mockserverTime,
            DIR: 3,
          };
          console.log(
            '[media-move]',
            this.moveframeCnt,
            stream.clipPath,
            stream.type,
          );
          this.lastMoveStreamFrame.next(streamData);

          const res = await this.streamService.pushFrameToSteam(streamData);
          if (res.done) {
            clearTimeout(this._moveTimeout);
            this._moveTimeout = setTimeout(() => {
              console.log('move 交权给空流,当前pts', res.frame);
              // TODO: update the user's metaData at every end point.
              const lastFrame = this.lastMoveStreamFrame.getValue();
              const lastFrameMeta = JSON.parse(lastFrame.metaData);
              const userId = this.user_id;
              const breakPointId = lastFrameMeta.breakPointId;
              const lastReply = lastFrameMeta;
              this.moveService.updateUser(userId, breakPointId, lastReply);
              this.frameCnt.next(res.frame);
              this.resumeStream();
              this.rotateframeCnt = -1;
              this.onMoving.next(false);
              console.log('move end');
            }, 300);
          }
        } catch (error) {
          this.logger.error('handleMoveSteam', error);
        }
      },
    );
  }
  /**
   * Called when the data channel opens: stores the channel, passes it to the
   * stream service, announces streaming, then starts the ticker and the frame
   * consumer.
   *
   * @param channel the opened data channel
   */
  handleDataChanelOpen(channel: DataChannel): void {
    this.channel = channel;
    this.streamService.setChannel(this.channel);

    this.startSteaming.next(true);
    this.startStream();
    this.handleStream();
  }
  /**
   * Called when the data channel closes: stops the stream, announces the end
   * of streaming, closes the channel in the stream service and runs `exit`
   * for the current user.
   */
  handleDataChanelClose(): void {
    this.stopStream();
    this.startSteaming.next(false);
    this.streamService.closeChannel();

    this.exit({
      action_type: 1002,
      user_id: this.user_id,
      trace_id: '',
    });
  }
  /**
   * Dispatches a message received on the data channel.
   *
   * Only string messages are handled. Messages containing `wasm:` are the
   * special "request I-frame" case; all other messages are JSON and are
   * routed by `action_type`. Parse or dispatch errors are logged together
   * with the raw message, and rejections of the async handlers are logged
   * rather than left unhandled.
   *
   * @param message raw message from the data channel
   */
  handleMessage(message: string | Buffer) {
    if (typeof message !== 'string') {
      return;
    }
    // Async handlers are fire-and-forget; make sure a rejection is observed.
    const detach = (task: Promise<unknown>): void => {
      task.catch((error) =>
        this.logger.error('handleMessage:rtc--async-error', error),
      );
    };
    try {
      // wasm: special case, requestIframe
      if (message.includes('wasm:')) {
        const msg: RTCMessageRequest = JSON.parse(
          message.replace('wasm:', ''),
        );
        this.logger.warn('lostIframe-message', msg);
        if (msg.MstType === 0) {
          detach(this.handleIframeRequest());
        }
        return;
      }

      const msg: RTCMessageRequest = JSON.parse(message);
      switch (msg.action_type) {
        case ActionType.walk:
          detach(this.walking(msg));
          break;
        case ActionType.joystick:
          detach(this.joystick(msg as any as JoystickRequest));
          break;
        case ActionType.breathPoint:
          detach(this.handleBreath(msg));
          break;
        case ActionType.rotate:
          detach(this.rotate(msg));
          break;
        case ActionType.userStatus:
          detach(this.updateUserStatus(msg));
          break;
        case ActionType.status:
          this.updateStatus();
          break;
        default:
          break;
      }
    } catch (error) {
      this.logger.error('handleMessage:rtc--error', message);
      // Also record why handling failed, not only which message failed.
      this.logger.error('handleMessage:rtc--error:cause', error);
    }
  }
  /**
   * Handles a client report of a lost I-frame by re-pushing the stream
   * service's last frame under the next frame number and resuming the ticker.
   *
   * Requests go through the I-frame queue. The consumer is subscribed once;
   * subscribing on every call would add another handler per request.
   */
  async handleIframeRequest() {
    this.logger.warn('lostIframe');
    if (!this.requestIFrameQueueSub) {
      this.requestIFrameQueueSub = this.requestIFrameQueue.subscribe(
        (frameData: StreamFrameType) => {
          const nextFrame = this.frameCnt.getValue() + 1;
          frameData.frame = nextFrame;
          this.streamService.pushFrameToSteam(frameData);
          this.frameCnt.next(nextFrame);
          this.resumeStream();
        },
      );
    }
    // NOTE(review): an earlier draft of this method read
    // `lastStreamFrame.getValue()`, which suggests `lastStreamFrame` may be a
    // subject rather than a frame — confirm what should be queued here.
    this.requestIFrameQueue.next(this.streamService.lastStreamFrame);
    // TODO: decide what an I-frame reply should finally carry (an earlier
    // draft set DIR = 1 and also pushed fresh metadata from rotateService.echo).
  }
  /**
   * Fetches break points for the request from the move service and pushes
   * the reply to the stream as normal data.
   *
   * @param request break-point request received from the client
   */
  async handleBreath(request) {
    const breakPointsReply = await this.moveService.getBreakPoints(request);
    this.streamService.pushNormalDataToStream(breakPointsReply);
  }
  /** Pushes a status reply (action type 1009) carrying the current time. */
  updateStatus() {
    const echo_msg = { echoMsg: Date.now() };
    this.streamService.pushNormalDataToStream({
      data: { action_type: 1009, echo_msg },
      track: false,
    });
  }
  /**
   * Asks the rotate service for the new user state and, when there is one,
   * pushes it to the stream tagged with action type 1024.
   * Errors are logged and swallowed.
   *
   * @param request user-status request received from the client
   */
  async updateUserStatus(request) {
    try {
      const userState = await this.rotateService.getNewUserStateRequest(
        request,
      );
      if (!userState) {
        return;
      }
      userState.actionType = 1024;
      this.streamService.pushNormalDataToStream(userState);
    } catch (error) {
      this.logger.error('updateUserStatus::function', error);
    }
  }
  /**
   * Pushes the very first frame (frame number 1, DIR 1) to the stream.
   *
   * @param clipPath path of the clip to send
   * @param metaData serialised metadata sent with the frame
   * @returns the `done` flag reported by the stream service
   * @throws rejects with the value `false` when the push fails
   */
  async pushFirstRender(clipPath: string, metaData: string): Promise<boolean> {
    try {
      const firstFrame: StreamFrameType = {
        frame: 1,
        clipPath,
        metaData,
        serverTime: this.mockserverTime,
        DIR: 1,
      };
      const pushed = await this.streamService.pushFrameToSteam(firstFrame);
      return pushed.done;
    } catch {
      // Callers receive `false` as the rejection value, not an Error.
      throw false;
    }
  }
  /**
   * Subscribes the consumer of `frameCnt`.
   *
   * - Frame 1: pauses the ticker, fetches the user's state from the rotate
   *   service and pushes the first frame; on success marks the first render
   *   as done, jumps to frame 2 and resumes the ticker.
   * - Later frames, while neither moving nor rotating: pushes metadata only.
   *   Missing data stops the stream.
   *
   * Any earlier subscription is released first. Without that, a repeated
   * call would leave a second subscriber that `stopStream()` no longer
   * tracks. Errors stop the stream and are logged.
   */
  handleStream() {
    if (this.frameCntSubscription) {
      this.frameCntSubscription.unsubscribe();
    }
    this.frameCntSubscription = this.frameCnt.subscribe(async (frame) => {
      try {
        console.log('frame', frame);
        if (frame === 1) {
          const redisData = await this.rotateService.echo(this.user_id);
          this.onSteaming = true;
          this.holdSteam();
          console.log('redisData', redisData);
          if (redisData && 'mediaSrc' in redisData) {
            const mediaSrc: string = redisData.mediaSrc || '';
            if (mediaSrc.length > 0) {
              // Drop the query string; only the path part addresses the clip.
              const src = mediaSrc.split('?')[0];
              const clipPath = join(__dirname, `../ws/${src}`);
              delete redisData.mediaSrc;
              this.logger.log(
                `user:${this.user_id}:first render stream` +
                  JSON.stringify({ path: clipPath, meta: redisData }),
              );
              const status = await this.pushFirstRender(
                clipPath,
                JSON.stringify(redisData),
              );
              if (status) {
                this.firstRender = true;
                this.frameCnt.next(2);
                this.resumeStream();
              } else {
                this.logger.error('first render problem', status);
              }
            }
          }
        }

        const idle =
          frame > 1 &&
          !this.onMoving.value &&
          !this.onRotating.value &&
          this.firstRender;
        if (idle) {
          const redisDataAuto = await this.rotateService.echo(this.user_id);
          if (redisDataAuto) {
            console.log(`空白流::有数据:${frame}`);
            // Idle frames carry metadata only.
            'mediaSrc' in redisDataAuto && delete redisDataAuto.mediaSrc;
            const streamMeta: StreamMetaType = {
              frame: frame,
              metaData: JSON.stringify(redisDataAuto),
            };
            this.streamService.pushMetaDataToSteam(streamMeta);
          } else {
            this.stopStream();
            console.log('空流无Redis数据');
            throw new Error('空流无Redis数据');
          }
        }
      } catch (error) {
        this.stopStream();
        this.logger.error('handleStream', error);
      }
    });
  }
  /**
   * Subscribes the consumer of the rotate queue.
   *
   * Each dequeued frame gets the next rotate frame number (continuing from
   * `frameCnt` when a rotation starts) and is pushed to the stream. After a
   * successful push a 300 ms timer is (re)armed; when it fires without being
   * re-armed by a later frame, the rotation is treated as finished:
   * `frameCnt` is set to the pushed frame number, the ticker resumes and the
   * moving/rotating flags are cleared.
   *
   * Errors inside the consumer are logged. The callback is async, so an
   * uncaught error would otherwise surface as an unhandled rejection.
   */
  handleRotateStream() {
    this.roQueueSubscription = this.roQueue.subscribe(
      async (stream: StreamFrameType) => {
        try {
          this.rotateTimeStamp = Date.now();
          if (this.rotateframeCnt === -1) {
            this.rotateframeCnt = this.frameCnt.value;
          }
          this.rotateframeCnt += 1;
          stream.frame = this.rotateframeCnt;
          console.log(
            '[media-rotate]',
            stream.frame,
            this.rotateframeCnt,
            stream.clipPath,
          );

          const res = await this.streamService.pushFrameToSteam(stream);
          if (res.done) {
            clearTimeout(this._rotateTimeout);
            this._rotateTimeout = setTimeout(() => {
              console.log('rotate 1秒', Date.now());
              console.log('rotate end');
              this.frameCnt.next(res.frame);
              this.resumeStream();
              this.rotateframeCnt = -1;
              this.onMoving.next(false);
              this.onRotating.next(false);
            }, 300);
          }
        } catch (error) {
          this.logger.error('handleRotateStream', error);
        }
      },
    );
  }
  702. }