scene.service.ts 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543
  1. import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
  2. import { ClientGrpc, Client } from '@nestjs/microservices';
  3. import { grpcClientOptions } from './grpc-scene.options';
  4. import { Logger } from '@nestjs/common';
  5. import { DataChannel } from 'node-datachannel';
  6. import * as path from 'path';
  7. import { BehaviorSubject, filter, ignoreElements, take } from 'rxjs';
  8. // import * as streamBuffers from 'stream-buffers';
  9. import { ActionType } from './actionType';
  10. import { CacheService } from 'src/cache/cache.service';
  11. import { StreamService } from './stream/stream.service';
  12. import { InjectQueue } from '@nestjs/bull';
  13. import { Queue } from 'bull';
  14. import { RotateService } from 'src/rotate/rotate.service';
  15. import { DelayQueue, RxQueue, ThrottleQueue, DebounceQueue } from 'rx-queue';
  16. import { MoveService } from 'src/move/move.service';
  17. import { GetRouterService } from 'src/get-router/get-router.service';
  18. @Injectable()
  19. export class SceneService implements OnModuleInit, OnModuleDestroy {
  20. constructor(
  21. private cacheService: CacheService,
  22. private streamService: StreamService,
  23. private rotateService: RotateService,
  24. private moveService: MoveService,
  25. private getRouterService: GetRouterService,
  26. @InjectQueue('rotate') private rotateQueue: Queue,
  27. ) { }
  28. @Client(grpcClientOptions) private readonly client: ClientGrpc;
  29. private sceneGrpcService: SceneGrpcService;
  30. private logger: Logger = new Logger('SceneService');
  31. private frameCntInterval = 1000;
  32. public _frameInteval: NodeJS.Timeout;
  33. public _frameTimeout: NodeJS.Timeout;
  34. private channel: DataChannel;
  35. public startSteaming = new BehaviorSubject<boolean>(false);
  36. private user_id: string;
  37. private roomId: string;
  38. private onSteaming = false;
  39. private rotateframeCnt = -1;
  40. private mockserverTime = Date.now() - 1653000000478;
  41. private lastRenderMedia = '';
  42. private frameCnt = new BehaviorSubject<number>(-1);
  43. private frameCntSubscription: any;
  44. private roQueueSubscription: any;
  45. private moveQueueSubscription: any;
  46. private walkingSub: any;
  47. private streamServiceSub: any;
  48. private roQueue: RxQueue = new DelayQueue(80);
  49. private clickQueue: RxQueue = new DebounceQueue(500);
  50. private moveQueue: RxQueue = new DelayQueue(100);
  51. private rotateTimeStamp: number;
  52. private lastMoveCnt = -1;
  53. private currentMoveMaker = '';
  54. private onMoving = false;
  55. private onRotating = false;
  56. private firstRender = false;
  57. private currentPoint = '';
  58. public lastMoveStreamFrame = new BehaviorSubject<StreamFrameType>({
  59. frame: -1,
  60. clipPath: '',
  61. metaData: '',
  62. });
  63. onModuleInit(): void {
  64. this.sceneGrpcService =
  65. this.client.getService<SceneGrpcService>('SceneGrpcService');
  66. this.logger.log('init SceneGrpcService');
  67. this.streamServiceSub = this.streamService.onSteaming.subscribe((val) => {
  68. this.onSteaming = val;
  69. });
  70. Number.prototype.padLeft = function (n, str) {
  71. return Array(n - String(this).length + 1).join(str || '0') + this;
  72. };
  73. }
  74. startStream(): void {
  75. clearInterval(this._frameInteval);
  76. if (this.frameCnt.value === -1) {
  77. this._frameInteval = setInterval(async () => {
  78. const next = this.frameCnt.value + 1;
  79. this.frameCnt.next(next);
  80. }, 1000);
  81. }
  82. }
  83. holdSteam(): void {
  84. clearInterval(this._frameInteval);
  85. }
  86. resumeStream(value: number) {
  87. this.onMoving = false;
  88. this.frameCnt.next(value);
  89. clearTimeout(this._frameTimeout);
  90. clearInterval(this._frameInteval);
  91. this._frameTimeout = setTimeout(() => {
  92. this._frameInteval = setInterval(async () => {
  93. const next = this.frameCnt.getValue() + 1;
  94. this.frameCnt.next(next);
  95. }, 1000);
  96. }, 1000);
  97. }
  98. stopStream(): void {
  99. if (this.frameCntSubscription) {
  100. this.frameCntSubscription.unsubscribe();
  101. this.frameCntSubscription = null;
  102. }
  103. if (this.roQueueSubscription) {
  104. this.roQueueSubscription.unsubscribe();
  105. this.roQueueSubscription = null;
  106. }
  107. if (this.moveQueueSubscription) {
  108. this.moveQueueSubscription.unsubscribe();
  109. this.moveQueueSubscription = null;
  110. }
  111. this.frameCnt.next(-1);
  112. clearInterval(this._frameInteval);
  113. this.rotateframeCnt = -1;
  114. }
  115. setConfig(user_id: string, roomId: string): void {
  116. this.user_id = user_id;
  117. this.roomId = roomId;
  118. }
  119. onModuleDestroy() {
  120. if ('unsubscribe' in this.streamServiceSub) {
  121. this.streamService.onSteaming.unsubscribe();
  122. }
  123. }
  124. getRoute(request: RouteRequest) {
  125. return this.sceneGrpcService.getRoute(request);
  126. }
  127. getBreakPoint(request: GetBreakPointRequest) {
  128. return this.sceneGrpcService.getBreakPoint(request);
  129. }
  130. init(request: InitRequest) {
  131. try {
  132. // const initReply = this.sceneGrpcService.init(request);
  133. // initReply.subscribe((reply) => {
  134. // console.log('initReply', reply);
  135. // });
  136. this.rotateService.init(request.app_id, request.user_id);
  137. this.moveService.init(request.app_id, request.user_id);
  138. } catch (error) {
  139. console.log('error', error);
  140. }
  141. }
  142. exit(request: ExitRequest) {
  143. // const exitReply = this.sceneGrpcService.exit(request);
  144. // exitReply.subscribe((reply) => {
  145. // console.log('exitReply', reply);
  146. // });
  147. }
  148. async rotate(request: RotateRequest) {
  149. try {
  150. if (!this.roQueueSubscription) {
  151. this.handleRotateStream();
  152. }
  153. if (!this.onSteaming) {
  154. let redisMeta: StreamReplyType;
  155. this.onRotating = true;
  156. if (this.onMoving) {
  157. const lastStreamFrame = this.lastMoveStreamFrame.getValue();
  158. const metaData: StreamReplyType = JSON.parse(
  159. lastStreamFrame.metaData,
  160. ) as any as StreamReplyType;
  161. const newUserStates: NewUserStatesType = metaData.newUserStates.find(
  162. (item) => item.userId === this.user_id,
  163. );
  164. const trace_id = metaData.traceIds[0];
  165. const userId = newUserStates.userId;
  166. const breakPointId = lastStreamFrame.marker
  167. .replace('P', '')
  168. .replace('T', '-');
  169. const cameraAngle = newUserStates.playerState.camera.angle;
  170. const playerAngle = newUserStates.playerState.player.angle;
  171. this.onMoving = false;
  172. console.log('stop-data', trace_id, userId, cameraAngle, cameraAngle);
  173. redisMeta = await this.moveService.stop(
  174. trace_id,
  175. userId,
  176. breakPointId,
  177. cameraAngle,
  178. playerAngle,
  179. );
  180. console.log('stop-redisMeta', redisMeta);
  181. // redisMeta = await this.rotateService.rotate(request);
  182. } else {
  183. redisMeta = await this.rotateService.rotate(request);
  184. }
  185. if (redisMeta && 'mediaSrc' in redisMeta) {
  186. const mediaSrc: string = redisMeta.mediaSrc || '';
  187. if (mediaSrc.length > 0) {
  188. let src = mediaSrc.split('?')[0];
  189. // 临时本地替换路经
  190. src = src.replace('/0000000001/', '');
  191. // 判断不是同一条源时才推出
  192. if (this.lastRenderMedia !== src) {
  193. // console.log('不同源');
  194. // this.frameCnt += 1;
  195. console.log('[core-src]', src);
  196. this.holdSteam();
  197. this.lastRenderMedia = src;
  198. const clipPath = path.join(__dirname, `../ws/video/${src}`);
  199. // console.log('src-clipPath', src, clipPath);
  200. delete redisMeta.mediaSrc;
  201. const stream: StreamFrameType = {
  202. frame: -1,
  203. clipPath: clipPath,
  204. metaData: JSON.stringify(redisMeta),
  205. serverTime: this.mockserverTime,
  206. DIR: 3,
  207. };
  208. this.roQueue.next(stream);
  209. }
  210. }
  211. }
  212. }
  213. } catch (error) {
  214. this.logger.error('rotate', error);
  215. console.log('error', error);
  216. }
  217. }
  218. joystick(request: JoystickRequest) {
  219. return this.sceneGrpcService.joystick(request);
  220. }
  221. handleDataChanelOpen(channel: DataChannel): void {
  222. this.channel = channel;
  223. this.streamService.setChannel(channel);
  224. this.startSteaming.next(true);
  225. this.startStream();
  226. this.handleStream();
  227. }
  228. handleDataChanelClose(): void {
  229. this.stopStream();
  230. this.startSteaming.next(false);
  231. this.streamService.closeChannel();
  232. const exitRequest: ExitRequest = {
  233. action_type: 1002,
  234. user_id: this.user_id,
  235. trace_id: '',
  236. };
  237. this.exit(exitRequest);
  238. }
  239. handleMessage(message: string | Buffer) {
  240. try {
  241. if (typeof message === 'string') {
  242. // wasm:特例, requestIframe
  243. if (message.includes('wasm:')) {
  244. const msg: RTCMessageRequest = JSON.parse(
  245. message.replace('wasm:', ''),
  246. );
  247. if (msg.MstType === 0) {
  248. this.logger.log('lost I frame');
  249. this.handleIframeRequest();
  250. }
  251. } else {
  252. const msg: RTCMessageRequest = JSON.parse(message);
  253. switch (msg.action_type) {
  254. case ActionType.walk:
  255. const walk = msg;
  256. this.walking(walk);
  257. break;
  258. case ActionType.breathPoint:
  259. this.handleBreath(msg);
  260. break;
  261. case ActionType.rotate:
  262. const rotateRequest: RotateRequest = msg;
  263. this.rotate(rotateRequest);
  264. break;
  265. case ActionType.userStatus:
  266. this.updateUserStatus(msg);
  267. break;
  268. case ActionType.status:
  269. this.updateStatus();
  270. break;
  271. default:
  272. break;
  273. }
  274. }
  275. }
  276. } catch (error) {
  277. this.logger.error('handleMessage:rtc--error', message);
  278. }
  279. }
  280. handleIframeRequest() {
  281. const lastStreamFrame = this.streamService.lastStreamFrame.getValue();
  282. lastStreamFrame.DIR = 1;
  283. console.log('lastStreamFrame', lastStreamFrame);
  284. const nextFrame = this.frameCnt.getValue() + 1;
  285. lastStreamFrame.frame = nextFrame;
  286. this.streamService.pushFrameToSteam(lastStreamFrame);
  287. }
  288. async walking(req) {
  289. console.log('walking', req);
  290. this.clickQueue.next(req);
  291. this.walkingSub = this.clickQueue.subscribe(async (request) => {
  292. const user = this.moveService.users[this.user_id];
  293. const path = await this.getRouterService.searchRoad(
  294. user.appId,
  295. user.breakPointId,
  296. req.clicking_action.clicking_point,
  297. );
  298. const walkingRes = await this.moveService.move(path, request);
  299. console.log('walkingRes', walkingRes)
  300. debugger;
  301. if (walkingRes && !this.onMoving) {
  302. this.onMoving = true;
  303. this.holdSteam();
  304. if (!this.moveQueueSubscription) {
  305. this.handleMoveSteam();
  306. }
  307. const res = Object.keys(walkingRes).map((item) => {
  308. console.log('item', item);
  309. return Array.from(walkingRes[item]).map((i) => {
  310. i['marker'] = item;
  311. return i;
  312. });
  313. });
  314. const seqs = Array.from(res).flat();
  315. this.lastMoveCnt = this.frameCnt.value + seqs.length;
  316. console.log('lastMoveCnt', this.lastMoveCnt);
  317. seqs.forEach((frame: StreamReplyType) => {
  318. const mediaSrc = frame.mediaSrc;
  319. delete frame.mediaSrc;
  320. const stream: StreamFrameType = {
  321. frame: -1,
  322. clipPath: mediaSrc,
  323. metaData: JSON.stringify(frame),
  324. serverTime: this.mockserverTime,
  325. DIR: 1,
  326. };
  327. this.moveQueue.next(stream);
  328. });
  329. }
  330. });
  331. }
  332. async handleBreath(request) {
  333. const npsRes = await this.moveService.getBreakPoints(request);
  334. console.log('npsRes', npsRes);
  335. this.streamService.pushNormalDataToStream(npsRes);
  336. }
  337. updateStatus() {
  338. const reply = {
  339. data: { action_type: 1009, echo_msg: { echoMsg: Date.now() } },
  340. track: false,
  341. };
  342. this.streamService.pushNormalDataToStream(reply);
  343. }
  344. async updateUserStatus(request) {
  345. try {
  346. //TODO 接入redis数据
  347. const redisMeta = await this.rotateService.getNewUserStateRequest(
  348. request,
  349. );
  350. if (redisMeta) {
  351. redisMeta.actionType = 1024;
  352. this.streamService.pushNormalDataToStream(redisMeta);
  353. }
  354. } catch (error) {
  355. this.logger.error('updateUserStatus::function', error);
  356. }
  357. }
  358. pushFirstRender(clipPath: string, metaData: string): Promise<boolean> {
  359. return new Promise<boolean>(async (resolve, reject) => {
  360. try {
  361. const streamData: StreamFrameType = {
  362. frame: 1,
  363. clipPath: clipPath,
  364. metaData: metaData,
  365. serverTime: this.mockserverTime,
  366. DIR: 1,
  367. };
  368. const hasPush = await this.streamService.pushFrameToSteam(streamData);
  369. return resolve(hasPush);
  370. } catch (error) {
  371. return reject(false);
  372. }
  373. });
  374. }
  375. handleStream() {
  376. this.frameCntSubscription = this.frameCnt.subscribe(async (frame) => {
  377. try {
  378. console.log('frame', frame);
  379. if (frame === 1) {
  380. const redisData = await this.rotateService.echo(this.user_id);
  381. this.onSteaming = true;
  382. this.holdSteam();
  383. if (redisData && 'mediaSrc' in redisData) {
  384. const mediaSrc: string = redisData.mediaSrc || '';
  385. if (mediaSrc.length > 0) {
  386. let src = mediaSrc.split('?')[0];
  387. // 临时本地替换路经
  388. src = src.replace('/0000000001/', '');
  389. const clipPath = path.join(__dirname, `../ws/video/${src}`);
  390. delete redisData.mediaSrc;
  391. this.logger.log(
  392. `user:${this.user_id}:first render stream` +
  393. JSON.stringify({ path: clipPath, meta: redisData }),
  394. );
  395. const status = await this.pushFirstRender(
  396. clipPath,
  397. JSON.stringify(redisData),
  398. );
  399. if (status) {
  400. this.firstRender = true;
  401. this.resumeStream(2);
  402. } else {
  403. this.logger.error('first render problem', status);
  404. }
  405. }
  406. }
  407. }
  408. if (
  409. frame > 1 &&
  410. !this.onSteaming &&
  411. !this.onMoving &&
  412. this.firstRender
  413. ) {
  414. console.log(`空白流::${frame}`);
  415. const redisDataAuto = await this.rotateService.echo(this.user_id);
  416. if (redisDataAuto) {
  417. 'mediaSrc' in redisDataAuto && delete redisDataAuto.mediaSrc;
  418. const streamMeta: StreamMetaType = {
  419. frame: frame,
  420. metaData: JSON.stringify(redisDataAuto),
  421. };
  422. this.streamService.pushMetaDataToSteam(streamMeta);
  423. }
  424. }
  425. } catch (error) {
  426. this.logger.error('handleStream', error);
  427. }
  428. });
  429. }
  430. handleRotateStream() {
  431. this.roQueueSubscription = this.roQueue.subscribe(
  432. async (stream: StreamFrameType) => {
  433. this.rotateTimeStamp = Date.now();
  434. if (this.rotateframeCnt === -1) {
  435. this.rotateframeCnt = this.frameCnt.value;
  436. }
  437. this.rotateframeCnt += 1;
  438. stream.frame = this.rotateframeCnt;
  439. console.log('[media]', stream.clipPath);
  440. this.logger.log(
  441. `roQueueSubscription:frame:${this.rotateframeCnt} ` +
  442. JSON.stringify(stream.metaData),
  443. );
  444. await this.streamService.pushFrameToSteam(stream);
  445. setTimeout(() => {
  446. const now = Date.now();
  447. if (now - this.rotateTimeStamp > 300) {
  448. const next = this.rotateframeCnt + 1;
  449. this.resumeStream(next);
  450. this.rotateframeCnt = -1;
  451. this.onMoving = false;
  452. this.onRotating = false;
  453. }
  454. }, 300);
  455. },
  456. );
  457. }
  458. cleanMoveSteam() {
  459. if (this.moveQueueSubscription) {
  460. this.moveQueueSubscription.unsubscribe();
  461. this.lastMoveCnt = -1;
  462. this.moveQueueSubscription = null;
  463. }
  464. if (this.walkingSub) {
  465. this.walkingSub.unsubscribe();
  466. this.walkingSub = null;
  467. }
  468. }
  469. handleMoveSteam() {
  470. this.moveQueueSubscription = this.moveQueue.subscribe(
  471. async (stream: StreamFrameType) => {
  472. const metaData: StreamReplyType = JSON.parse(stream.metaData);
  473. console.log('handleMoveSteam-onMoving', this.onMoving);
  474. stream.marker = metaData.marker;
  475. this.lastMoveStreamFrame.next(stream);
  476. const next = this.frameCnt.value + 1;
  477. this.currentMoveMaker = metaData.marker;
  478. if (this.onMoving) {
  479. this.frameCnt.next(next);
  480. } else {
  481. console.log('handleMoveSteam stop', next, this.currentMoveMaker);
  482. this.cleanMoveSteam();
  483. this.resumeStream(next);
  484. return;
  485. }
  486. let src = stream.clipPath.split('?')[0];
  487. // // 临时本地替换路经
  488. src = src.replace('/0000000001/', '');
  489. const clipPath = path.join(__dirname, `../ws/video/${src}`);
  490. const streamData: StreamFrameType = {
  491. frame: next,
  492. clipPath: clipPath,
  493. metaData: stream.metaData,
  494. serverTime: this.mockserverTime,
  495. DIR: 3,
  496. };
  497. await this.streamService.pushFrameToSteam(streamData);
  498. if (this.lastMoveCnt == this.frameCnt.getValue()) {
  499. const next = this.frameCnt.getValue() + 1;
  500. console.log('last', next);
  501. this.resumeStream(next);
  502. this.cleanMoveSteam();
  503. const lastFrame = this.lastMoveStreamFrame.getValue();
  504. const userId = this.user_id;
  505. const breakPointId = lastFrame.marker.split('T')[1];
  506. const lastReply = JSON.parse(lastFrame.metaData);
  507. this.moveService.updateUser(userId, breakPointId, lastReply);
  508. }
  509. },
  510. );
  511. }
  512. }