scene.service.ts 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807
  1. import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
  2. import { ClientGrpc, Client } from '@nestjs/microservices';
  3. import { grpcClientOptions } from './grpc-scene.options';
  4. import { Logger } from '@nestjs/common';
  5. import { DataChannel } from 'node-datachannel';
  6. import { BehaviorSubject } from 'rxjs';
  7. // import * as streamBuffers from 'stream-buffers';
  8. import { ActionType } from './actionType';
  9. import { CacheService } from 'src/cache/cache.service';
  10. import { StreamService } from './stream/stream.service';
  11. // import { InjectQueue } from '@nestjs/bull';
  12. // import { Queue } from 'bull';
  13. import { RotateService } from 'src/rotate/rotate.service';
  14. import { DelayQueue, RxQueue, ThrottleQueue, DebounceQueue } from 'rx-queue';
  15. import { MoveService } from 'src/move/move.service';
  16. import { GetRouterService } from 'src/get-router/get-router.service';
  17. import { ConfigService } from '@nestjs/config';
  18. @Injectable()
  19. export class SceneService implements OnModuleInit, OnModuleDestroy {
  20. constructor(
  21. private configService: ConfigService,
  22. private cacheService: CacheService,
  23. private streamService: StreamService,
  24. private rotateService: RotateService,
  25. private moveService: MoveService,
  26. private getRouterService: GetRouterService, // @InjectQueue('rotate') private rotateQueue: Queue, // @InjectQueue('walking') private walkingQueue: Queue,
  27. ) {}
  28. @Client(grpcClientOptions) private readonly client: ClientGrpc;
  29. public _frameInteval: NodeJS.Timeout;
  30. public _frameTimeout: NodeJS.Timeout;
  31. public _rotateTimeout: NodeJS.Timeout;
  32. public _moveTimeout: NodeJS.Timeout;
  33. public startSteaming = new BehaviorSubject<boolean>(false);
  34. public onRotating = new BehaviorSubject<boolean>(false);
  35. public onMoving = new BehaviorSubject<boolean>(false);
  36. public frameCnt = new BehaviorSubject<number>(-1);
  37. private rotateframeCnt = -1;
  38. private moveframeCnt = -1;
  39. private moveKeyFrame = -1;
  40. private sceneGrpcService: SceneGrpcService;
  41. private channel: DataChannel;
  42. private logger: Logger = new Logger('SceneService');
  43. private frameCntInterval = 1000;
  44. private user_id: string;
  45. private roomId: string;
  46. private onSteaming = false;
  47. private mockserverTime = Date.now() - 1653000000478;
  48. private lastRenderMedia = '';
  49. private frameCntSubscription: any;
  50. private roQueueSubscription: any;
  51. private moveQueueSubscription: any;
  52. private walkingSub: any;
  53. private joystickSub: any;
  54. private clickQueueSub: any;
  55. private streamServiceSub: any;
  56. // private roRequestQueue: RxQueue = new DelayQueue(20);
  57. private roQueue: RxQueue = new DelayQueue(10);
  58. private clickQueue: RxQueue = new DebounceQueue(500);
  59. private moveQueue: RxQueue = new DelayQueue(20);
  60. private joystickQueue: RxQueue = new DebounceQueue(500);
  61. private requestIFrameQueue: RxQueue = new DebounceQueue(2000);
  62. private requestIFrameQueueSub: any;
  63. private roRequestQueueSub: any;
  64. private rotateTimeStamp: number;
  65. private lastMoveCnt = -1;
  66. private firstRender = false;
  67. private latestBreakPointId: number;
  68. private isHoldingStream = false;
  69. public lastMoveStreamFrame = new BehaviorSubject<StreamFrameType>({
  70. frame: -1,
  71. clipPath: '',
  72. metaData: '',
  73. });
  74. public users = {};
  75. // initUsers(app_id, userId) {
  76. // const user = {
  77. // appId: null,
  78. // userId: null,
  79. // breakPointId: null,
  80. // roomId: null,
  81. // player: {
  82. // position: { x: -700, y: 0, z: 0 },
  83. // angle: {
  84. // pitch: 0,
  85. // yaw: 0,
  86. // roll: 0,
  87. // },
  88. // },
  89. // camera: {
  90. // position: { x: -1145, y: 0, z: 160 },
  91. // angle: {
  92. // pitch: 0,
  93. // yaw: 0,
  94. // roll: 0,
  95. // },
  96. // },
  97. // rotateInfo: {
  98. // frameIndex: 0,
  99. // horizontal_move: 0,
  100. // mediaSrc: null,
  101. // },
  102. // moveInfo: {},
  103. // // traceIds: [],
  104. // // actionResponses:[]
  105. // };
  106. // user.appId = app_id;
  107. // user.userId = userId;
  108. // user.breakPointId = 100;
  109. // this.users[userId] = user;
  110. // }
  111. onModuleInit(): void {
  112. this.sceneGrpcService =
  113. this.client.getService<SceneGrpcService>('SceneGrpcService');
  114. this.logger.log('init SceneGrpcService');
  115. this.streamServiceSub = this.streamService.onSteaming.subscribe((val) => {
  116. this.onSteaming = val;
  117. });
  118. Number.prototype.padLeft = function (n, str) {
  119. return Array(n - String(this).length + 1).join(str || '0') + this;
  120. };
  121. }
  122. public isHeaderOrLast(index: number, length: number): boolean {
  123. if (index === 0 || index === length) {
  124. return true;
  125. } else {
  126. return false;
  127. }
  128. }
  129. public getConfig() {
  130. return {
  131. userId: this.user_id,
  132. roomId: this.roomId,
  133. };
  134. }
  135. public startStream(): void {
  136. clearInterval(this._frameInteval);
  137. if (this.frameCnt.value === -1) {
  138. this._frameInteval = setInterval(async () => {
  139. const next = this.frameCnt.value + 1;
  140. this.frameCnt.next(next);
  141. }, 1000);
  142. }
  143. }
  144. public holdSteam(): void {
  145. clearInterval(this._frameInteval);
  146. this.isHoldingStream = true;
  147. }
  148. public resumeStream(): void {
  149. this.onMoving.next(false);
  150. this.onRotating.next(false);
  151. this.isHoldingStream = false;
  152. this.moveframeCnt = -1;
  153. this.rotateframeCnt = -1;
  154. clearInterval(this._frameInteval);
  155. this._frameInteval = setInterval(async () => {
  156. const next = this.frameCnt.getValue() + 1;
  157. this.frameCnt.next(next);
  158. }, 1000);
  159. }
  160. public stopStream(): void {
  161. if (this.frameCntSubscription) {
  162. this.frameCntSubscription.unsubscribe();
  163. this.frameCntSubscription = null;
  164. }
  165. if (this.roQueueSubscription) {
  166. this.roQueueSubscription.unsubscribe();
  167. this.roQueueSubscription = null;
  168. }
  169. if (this.moveQueueSubscription) {
  170. this.moveQueueSubscription.unsubscribe();
  171. this.moveQueueSubscription = null;
  172. }
  173. this.frameCnt.next(-1);
  174. clearInterval(this._frameInteval);
  175. this.rotateframeCnt = -1;
  176. }
  177. setConfig(user_id: string, roomId: string): void {
  178. this.user_id = user_id;
  179. this.roomId = roomId;
  180. }
  181. onModuleDestroy() {
  182. if ('unsubscribe' in this.streamServiceSub) {
  183. this.streamService.onSteaming.unsubscribe();
  184. }
  185. }
  186. init(request: InitRequest) {
  187. try {
  188. this.rotateService.init(request.app_id, request.user_id);
  189. this.startSteaming.next(true);
  190. this.startStream();
  191. this.handleStream();
  192. // this.moveService.init(request.app_id, request.user_id);
  193. // this.initUsers(request.app_id, request.user_id);
  194. } catch (error) {
  195. console.log('error', error);
  196. }
  197. }
  198. exit(request: ExitRequest) {
  199. this.frameCnt.next(-1);
  200. this.stopStream();
  201. // const exitReply = this.sceneGrpcService.exit(request);
  202. // exitReply.subscribe((reply) => {
  203. // console.log('exitReply', reply);
  204. // });
  205. }
  206. async rotate(request: RotateRequest) {
  207. this.handleRotate(request);
  208. // console.log('request', request)
  209. // this.roRequestQueue.next(request);
  210. // if (!this.roRequestQueueSub) {
  211. // this.handleRotate();
  212. // }
  213. }
  214. /**
  215. * rotate请求队列
  216. */
  217. async handleRotate(request) {
  218. // this.roRequestQueueSub = this.roRequestQueue.subscribe(
  219. // async (request: RotateRequest) => {
  220. try {
  221. if (this.firstRender) {
  222. if (!this.roQueueSubscription) {
  223. this.handleRotateStream();
  224. }
  225. let redisMeta: StreamReplyType;
  226. this.onRotating.next(true);
  227. // 当move时处理
  228. if (this.onMoving.value) {
  229. this.onMoving.next(false);
  230. const lastStreamFrame = this.lastMoveStreamFrame.getValue();
  231. const metaData: StreamReplyType = JSON.parse(
  232. lastStreamFrame.metaData,
  233. ) as any as StreamReplyType;
  234. const newUserStates: NewUserStatesType = metaData.newUserStates.find(
  235. (item) => item.userId === this.user_id,
  236. );
  237. const trace_id = metaData.traceIds[0];
  238. const userId = newUserStates.userId;
  239. const breakPointId = metaData.breakPointId;
  240. const cameraAngle = newUserStates.playerState.camera.angle;
  241. const playerAngle = newUserStates.playerState.player.angle;
  242. console.log('stop-data', trace_id, userId, cameraAngle, cameraAngle);
  243. redisMeta = await this.moveService.stop(
  244. trace_id,
  245. userId,
  246. breakPointId,
  247. cameraAngle,
  248. playerAngle,
  249. );
  250. console.log('stop-redisMeta', redisMeta);
  251. // redisMeta = await this.rotateService.rotate(request);
  252. } else {
  253. // 正常rotate
  254. redisMeta = await this.rotateService.seqExeRotate(request);
  255. // console.log(redisMeta);
  256. //debugger;
  257. }
  258. if (redisMeta && 'mediaSrc' in redisMeta) {
  259. const mediaSrc: string = redisMeta.mediaSrc || '';
  260. if (mediaSrc.length > 0) {
  261. const src = mediaSrc.split('?')[0];
  262. if (src.length > 0) {
  263. // console.log('不同源');
  264. this.holdSteam();
  265. this.lastRenderMedia = src;
  266. const clipPath = this.configService.get('app.prefix') + src;
  267. //TODO 临时开出
  268. // delete redisMeta.mediaSrc;
  269. const random_boolean = Math.random() < 0.3;
  270. const stream: StreamFrameType = {
  271. frame: -1,
  272. clipPath: clipPath,
  273. metaData: JSON.stringify(redisMeta),
  274. serverTime: this.mockserverTime,
  275. DIR: this.frameCnt.getValue() < 10 ? 3 : random_boolean ? 1 : 3,
  276. };
  277. // console.log('rotate', stream, Date.now());
  278. clearTimeout(this._rotateTimeout);
  279. this.roQueue.next(stream);
  280. } else {
  281. // this.onRotating.next(false);
  282. }
  283. }
  284. }
  285. }
  286. } catch (error) {
  287. this.logger.error('rotate', error);
  288. console.log('error', error);
  289. }
  290. // },
  291. // );
  292. }
  293. async walking(req) {
  294. try {
  295. if (this.clickQueueSub) {
  296. this.clickQueueSub.unsubscribe();
  297. this.clickQueueSub = null;
  298. }
  299. this.clickQueue.next(req);
  300. console.log('walking', this.clickQueueSub, JSON.stringify(req));
  301. // if (!this.clickQueueSub) {
  302. this.clickQueueSub = this.clickQueue.subscribe(async (request) => {
  303. const user = this.moveService.users[this.user_id];
  304. console.log('进入1 - searchRoad');
  305. const path = await this.getRouterService.searchRoad(
  306. user.appId,
  307. user.breakPointId,
  308. req.clicking_action.clicking_point,
  309. );
  310. console.log('walking-path', path);
  311. if (!path) {
  312. console.log('不存在--path', path);
  313. this.resumeStream();
  314. return;
  315. }
  316. // debugger;
  317. const walkingRes = await this.moveService.move(path, request);
  318. console.log('walking', walkingRes);
  319. // debugger;
  320. if (walkingRes && !this.onMoving.value) {
  321. // console.log('walkingRes-front', walkingRes);
  322. // shift出前第一个镜头数据
  323. const rotateCamData = walkingRes[0];
  324. console.log('rotateCamData', rotateCamData.length);
  325. if (rotateCamData?.length) {
  326. // 头数组[0] rotate 序列, 头是关键key
  327. walkingRes[0].forEach((item: StreamReplyType, index: number) => {
  328. item.mType = 'rotate';
  329. item.DIR = index === 0 ? 1 : 3;
  330. });
  331. } else {
  332. console.log('rotateCamData无数据');
  333. }
  334. // 二维数组 做move 序列, move类型
  335. for (let i = 1; i < walkingRes.length; i++) {
  336. console.log('walkingRes', walkingRes[i]);
  337. Array.from(walkingRes[i]).forEach(
  338. (item: StreamReplyType, index: number) => {
  339. const dir = this.isHeaderOrLast(
  340. index,
  341. walkingRes[i].length - 1,
  342. );
  343. item.DIR = dir ? 1 : 3;
  344. },
  345. );
  346. }
  347. // walkingRes marker to everybody
  348. const seqs = Array.from(
  349. walkingRes,
  350. ).flat() as any as StreamReplyType[];
  351. if (seqs?.length) {
  352. console.log('walking --总序列--seqs-2', seqs.length);
  353. this.handleSeqMoving(seqs);
  354. } else {
  355. console.error('walking-move无数据');
  356. this.cleanMoveSteam();
  357. this.resumeStream();
  358. }
  359. }
  360. });
  361. // }
  362. } catch (error) {
  363. this.logger.error('walking', error);
  364. this.cleanMoveSteam();
  365. this.resumeStream();
  366. }
  367. }
  368. async joystick(request: JoystickRequest) {
  369. try {
  370. this.joystickQueue.next(request);
  371. if (!this.joystickSub) {
  372. this.joystickSub = this.joystickQueue.subscribe(
  373. async (joystickRequest) => {
  374. const joystickRes = await this.moveService.joystick(
  375. joystickRequest,
  376. );
  377. // 有数据 [0]是rotate数据,[1-infinity]是walking数据
  378. if (Array.isArray(joystickRes)) {
  379. // shift出前第一个镜头数据
  380. const rotateCamData = joystickRes.shift();
  381. console.log('rotateCamData', rotateCamData.length);
  382. if (rotateCamData?.length) {
  383. // 头数组[0] rotate 序列, 头是关键key
  384. joystickRes[0].forEach(
  385. (item: StreamReplyType, index: number) => {
  386. item.mType = 'rotate';
  387. item.DIR = index === 0 ? 1 : 3;
  388. },
  389. );
  390. } else {
  391. console.log('rotateCamData无数据');
  392. }
  393. // 二维数组 做move 序列, move类型
  394. for (let i = 1; i < joystickRes.length; i++) {
  395. console.log('walkingRes', joystickRes[i]);
  396. Array.from(joystickRes[i]).forEach(
  397. (item: StreamReplyType, index: number) => {
  398. const dir = this.isHeaderOrLast(
  399. index,
  400. joystickRes[i].length - 1,
  401. );
  402. item.DIR = dir ? 1 : 3;
  403. },
  404. );
  405. }
  406. const seqs = Array.from(
  407. joystickRes,
  408. ).flat() as any as StreamReplyType[];
  409. if (seqs?.length) {
  410. console.log('joystickRes-seqs', seqs.length);
  411. this.handleSeqMoving(seqs);
  412. } else {
  413. console.warn('joystick-move无数据');
  414. }
  415. } else {
  416. console.log('转交数据');
  417. this.streamService.pushNormalDataToStream(request);
  418. }
  419. },
  420. );
  421. }
  422. } catch (error) {
  423. this.logger.error('joystick', error);
  424. }
  425. }
  426. /**
  427. * 主要处理moving的序列动作
  428. * @param seqs StreamReplyType[]
  429. */
  430. handleSeqMoving(seqs: StreamReplyType[]) {
  431. if (!this.moveQueueSubscription) {
  432. this.handleMoveSteam();
  433. }
  434. console.log('moving-seqs', seqs.length);
  435. this.onMoving.next(true);
  436. this.holdSteam();
  437. seqs.forEach((frame: StreamReplyType) => {
  438. const mediaSrc = frame.mediaSrc;
  439. const src = mediaSrc.split('?')[0];
  440. const clipPath = this.configService.get('app.prefix') + src;
  441. const type = frame.mType?.length ? frame.mType.slice() : 'move';
  442. const stream: StreamFrameType = {
  443. frame: -1,
  444. clipPath: clipPath,
  445. metaData: JSON.stringify(frame),
  446. serverTime: this.mockserverTime,
  447. DIR: frame.DIR,
  448. mType: type,
  449. };
  450. this.moveQueue.next(stream);
  451. });
  452. }
  453. cleanMoveSteam() {
  454. if (this.moveQueueSubscription) {
  455. this.moveQueueSubscription.unsubscribe();
  456. this.lastMoveCnt = -1;
  457. this.moveQueueSubscription = null;
  458. }
  459. if (this.walkingSub) {
  460. this.walkingSub.unsubscribe();
  461. this.walkingSub = null;
  462. }
  463. // if (this.clickQueueSub) {
  464. // this.clickQueueSub.unsubscribe();
  465. // this.clickQueueSub = null;
  466. // }
  467. }
  468. handleMoveSteam() {
  469. this.moveQueueSubscription = this.moveQueue.subscribe(
  470. async (stream: StreamFrameType) => {
  471. const metaData: StreamReplyType = JSON.parse(stream.metaData);
  472. if (this.moveframeCnt === -1) {
  473. this.moveframeCnt = this.frameCnt.value;
  474. }
  475. this.moveframeCnt += 1;
  476. this.latestBreakPointId = metaData.breakPointId;
  477. const streamData: StreamFrameType = {
  478. frame: this.moveframeCnt,
  479. clipPath: stream.clipPath,
  480. metaData: stream.metaData,
  481. serverTime: this.mockserverTime,
  482. DIR: stream.DIR,
  483. };
  484. console.log(
  485. '[media-move]',
  486. this.moveframeCnt,
  487. stream.clipPath,
  488. stream.mType,
  489. stream.DIR,
  490. );
  491. this.lastMoveStreamFrame.next(streamData);
  492. const res = await this.streamService.pushFrameToSteam(streamData);
  493. if (res.done) {
  494. clearTimeout(this._moveTimeout);
  495. this._moveTimeout = setTimeout(() => {
  496. console.log('move 交权给空流,当前pts', res.frame);
  497. //TODO 每个结束点 updateUser metaData
  498. const lastFrame = this.lastMoveStreamFrame.getValue();
  499. const lastFrameMeta = JSON.parse(lastFrame.metaData);
  500. const userId = this.user_id;
  501. const breakPointId = lastFrameMeta.breakPointId;
  502. const lastReply = lastFrameMeta;
  503. this.moveService.updateUser(userId, breakPointId, lastReply);
  504. this.frameCnt.next(res.frame);
  505. this.resumeStream();
  506. this.rotateframeCnt = -1;
  507. this.onMoving.next(false);
  508. this.cleanMoveSteam();
  509. console.log('move end');
  510. }, 300);
  511. }
  512. },
  513. );
  514. }
  515. handleDataChanelOpen(channel: DataChannel): void {
  516. this.channel = channel;
  517. this.streamService.setChannel(channel);
  518. }
  519. handleDataChanelClose(): void {
  520. this.stopStream();
  521. this.startSteaming.next(false);
  522. this.streamService.closeChannel();
  523. const exitRequest: ExitRequest = {
  524. action_type: 1002,
  525. user_id: this.user_id,
  526. trace_id: '',
  527. };
  528. this.exit(exitRequest);
  529. }
  530. handleMessage(message: string | Buffer) {
  531. try {
  532. if (typeof message === 'string') {
  533. // wasm:特例, requestIframe
  534. if (message.includes('wasm:')) {
  535. const parseData = message
  536. ? String(message).replace('wasm:', '')
  537. : `{"MstType":1}`;
  538. const msg: RTCMessageRequest = JSON.parse(parseData);
  539. this.logger.warn('lostIframe-message', msg);
  540. if (msg.MstType === 0) {
  541. this.handleIframeRequest();
  542. }
  543. } else {
  544. const msg: RTCMessageRequest = JSON.parse(message);
  545. switch (msg.action_type) {
  546. case ActionType.walk:
  547. const walk = msg;
  548. this.walking(walk);
  549. break;
  550. case ActionType.joystick:
  551. const JoystickRequest = msg as any as JoystickRequest;
  552. this.joystick(JoystickRequest);
  553. break;
  554. case ActionType.breathPoint:
  555. this.handleBreath(msg);
  556. break;
  557. case ActionType.rotate:
  558. const rotateRequest: RotateRequest = msg;
  559. this.rotate(rotateRequest);
  560. break;
  561. case ActionType.userStatus:
  562. this.updateUserStatus(msg);
  563. break;
  564. case ActionType.status:
  565. this.updateStatus();
  566. break;
  567. default:
  568. break;
  569. }
  570. }
  571. }
  572. } catch (error) {
  573. this.logger.error('handleMessage:rtc--error', message);
  574. }
  575. }
  576. async handleIframeRequest() {
  577. //TODO Iframe 最终传什么?
  578. this.requestIFrameQueue.next(this.streamService.lastStreamFrame.getValue());
  579. if (!this.requestIFrameQueueSub) {
  580. this.requestIFrameQueueSub = this.requestIFrameQueue.subscribe(
  581. (frameData: StreamFrameType) => {
  582. const nextFrame = this.frameCnt.getValue() + 1;
  583. this.logger.warn('lostIframe', nextFrame);
  584. frameData.frame = nextFrame;
  585. this.streamService.pushFrameToSteam(frameData);
  586. this.frameCnt.next(nextFrame);
  587. this.resumeStream();
  588. },
  589. );
  590. }
  591. }
  592. handleBreath(request) {
  593. const npsRes = this.moveService.getBreakPoints(request);
  594. console.log('npsRes', npsRes.nps.length);
  595. this.streamService.pushNormalDataToStream(npsRes);
  596. }
  597. updateStatus(): void {
  598. const reply = {
  599. data: { action_type: 1009, echo_msg: { echoMsg: Date.now() } },
  600. track: false,
  601. };
  602. this.streamService.pushNormalDataToStream(reply);
  603. }
  604. async updateUserStatus(request) {
  605. try {
  606. const redisMeta = await this.rotateService.getNewUserStateRequest(
  607. request,
  608. );
  609. if (redisMeta) {
  610. redisMeta.actionType = 1024;
  611. this.streamService.pushNormalDataToStream(redisMeta);
  612. } else {
  613. this.logger.error('updateUserStatus::function-empty');
  614. }
  615. } catch (error) {
  616. this.logger.error('updateUserStatus::function', error);
  617. }
  618. }
  619. pushFirstRender(clipPath: string, metaData: string): Promise<boolean> {
  620. return new Promise<boolean>(async (resolve, reject) => {
  621. try {
  622. const streamData: StreamFrameType = {
  623. frame: 1,
  624. clipPath: clipPath,
  625. metaData: metaData,
  626. serverTime: this.mockserverTime,
  627. DIR: 1,
  628. };
  629. const hasPush = await this.streamService.pushFrameToSteam(streamData);
  630. return resolve(hasPush.done);
  631. } catch (error) {
  632. return reject(false);
  633. }
  634. });
  635. }
  636. handleStream() {
  637. this.logger.log('this.frameCntSubscription', this.frameCntSubscription);
  638. if (!this.frameCntSubscription) {
  639. this.frameCntSubscription = this.frameCnt.subscribe(async (frame) => {
  640. try {
  641. console.log('frame', frame);
  642. if (frame === 1) {
  643. const redisData = await this.rotateService.echo(this.user_id, true);
  644. console.log('获取-首屏', redisData);
  645. this.onSteaming = true;
  646. this.holdSteam();
  647. if (redisData && 'mediaSrc' in redisData) {
  648. const mediaSrc: string = redisData.mediaSrc || '';
  649. if (mediaSrc.length > 0) {
  650. const src = mediaSrc.split('?')[0];
  651. // 临时本地替换路经
  652. // src = src.replace('/10086/', '');
  653. // const clipPath = join(__dirname, `../ws/${src}`);
  654. const clipPath = this.configService.get('app.prefix') + src;
  655. delete redisData.mediaSrc;
  656. this.logger.log(
  657. `user:${this.user_id}:first render stream` +
  658. JSON.stringify({ path: clipPath, meta: redisData }),
  659. );
  660. const status = await this.pushFirstRender(
  661. clipPath,
  662. JSON.stringify(redisData),
  663. );
  664. if (status) {
  665. this.firstRender = true;
  666. this.frameCnt.next(2);
  667. this.resumeStream();
  668. } else {
  669. this.logger.error('first render problem', status);
  670. }
  671. }
  672. } else {
  673. this.logger.error(`首屏::无数据:${frame}`);
  674. }
  675. }
  676. if (
  677. frame > 1 &&
  678. !this.onMoving.value &&
  679. !this.onRotating.value &&
  680. this.firstRender
  681. ) {
  682. const redisDataAuto = await this.rotateService.echo(
  683. this.user_id,
  684. false,
  685. );
  686. if (redisDataAuto) {
  687. console.log(`空白流::有数据:${frame}`);
  688. 'mediaSrc' in redisDataAuto && delete redisDataAuto.mediaSrc;
  689. const streamMeta: StreamMetaType = {
  690. frame: frame,
  691. metaData: JSON.stringify(redisDataAuto),
  692. };
  693. this.streamService.pushMetaDataToSteam(streamMeta);
  694. } else {
  695. this.stopStream();
  696. console.log('空流无Redis数据');
  697. throw new Error('空流无Redis数据');
  698. }
  699. }
  700. } catch (error) {
  701. this.stopStream();
  702. this.logger.error('handleStream', error);
  703. }
  704. });
  705. }
  706. }
  707. /**
  708. * rotate 推送队列
  709. */
  710. handleRotateStream() {
  711. if (!this.roQueueSubscription) {
  712. this.roQueueSubscription = this.roQueue.subscribe(
  713. async (stream: StreamFrameType) => {
  714. this.rotateTimeStamp = Date.now();
  715. if (this.rotateframeCnt === -1) {
  716. this.rotateframeCnt = this.frameCnt.value;
  717. }
  718. this.rotateframeCnt += 1;
  719. stream.frame = this.rotateframeCnt;
  720. console.log(
  721. '[media-rotate]',
  722. stream.frame,
  723. this.rotateframeCnt,
  724. stream.clipPath,
  725. );
  726. // this.logger.log(
  727. // `roQueueSubscription:frame:${this.rotateframeCnt} ` +
  728. // JSON.stringify(stream.metaData),
  729. // );
  730. const res = await this.streamService.pushFrameToSteam(stream);
  731. if (res.done) {
  732. clearTimeout(this._rotateTimeout);
  733. this._rotateTimeout = setTimeout(() => {
  734. console.log('rotate end', Date.now());
  735. this.frameCnt.next(res.frame);
  736. this.resumeStream();
  737. this.rotateframeCnt = -1;
  738. this.onMoving.next(false);
  739. this.onRotating.next(false);
  740. //TODO rotate完后清除request队列
  741. if (this.roRequestQueueSub) {
  742. this.roRequestQueueSub.unsubscribe();
  743. this.roRequestQueueSub = null;
  744. }
  745. }, 300);
  746. }
  747. },
  748. );
  749. }
  750. }
  751. }