stream.service.ts 7.7 KB


  1. import { Injectable, Logger } from '@nestjs/common';
  2. import { DataChannel } from 'node-datachannel';
  3. // import * as path from 'path';
  4. import { existsSync, readFileSync } from 'fs';
  5. import * as streamBuffers from 'stream-buffers';
  6. import { BehaviorSubject } from 'rxjs';
  7. import { CacheService } from 'src/cache/cache.service';
  8. import { join } from 'path';
  9. @Injectable()
  10. export class StreamService {
  11. private channel: DataChannel;
  12. private readonly chunk_size = 16000;
  13. private readonly block = 36;
  14. private logger: Logger = new Logger('StreamService');
  15. public onSteaming = new BehaviorSubject<boolean>(false);
  16. public lastStreamFrame = new BehaviorSubject<StreamFrameType>({
  17. frame: -1,
  18. clipPath: '',
  19. metaData: '',
  20. });
  21. // constructor() {}
  22. setChannel(channel: DataChannel) {
  23. this.channel = channel;
  24. }
  25. closeChannel() {
  26. this.channel = null;
  27. }
  28. /**
  29. * stream core push normal stream
  30. * @param data meta Json
  31. */
  32. pushNormalDataToStream(data: any) {
  33. const replyBin = JSON.stringify(data).replace(/\s/g, '');
  34. const buff = Buffer.from(replyBin, 'utf-8');
  35. if (this.channel && this.channel.isOpen()) {
  36. this.channel.sendMessageBinary(buff);
  37. }
  38. }
  39. /**
  40. * stream core push with block meta stream
  41. * @param data meta Json
  42. */
  43. pushMetaDataToSteam(stream: StreamMetaType): Promise<StreamPushResponse> {
  44. return new Promise((resolve, reject) => {
  45. try {
  46. const metaData = stream.metaData;
  47. const frame = stream.frame;
  48. const metaDataBin = metaData.replace(/\s/g, '');
  49. const buff = Buffer.from(metaDataBin, 'utf-8');
  50. // const block = 36;
  51. const blockBuff = Buffer.alloc(this.block);
  52. const packBuffer = Buffer.concat([blockBuff, buff]);
  53. const statusPack = new DataView(
  54. packBuffer.buffer.slice(
  55. packBuffer.byteOffset,
  56. packBuffer.byteOffset + packBuffer.byteLength,
  57. ),
  58. );
  59. statusPack.setUint32(0, 1437227610);
  60. // 16 bit slot
  61. statusPack.setUint16(6, this.block);
  62. statusPack.setUint16(8, frame);
  63. statusPack.setUint16(10, 255);
  64. statusPack.setUint16(24, 0);
  65. statusPack.setUint16(26, 0);
  66. // 32 bit slot
  67. statusPack.setUint32(12, buff.byteLength);
  68. statusPack.setUint32(16, 0);
  69. // statusPack.setUint32(20, 0);
  70. statusPack.setUint32(24, 0);
  71. statusPack.setUint32(28, 0);
  72. statusPack.setUint32(32, 0);
  73. if (this.channel && this.channel.isOpen()) {
  74. const done = this.channel.sendMessageBinary(
  75. Buffer.from(statusPack.buffer),
  76. );
  77. return resolve({ frame: stream.frame, done: done });
  78. } else {
  79. return resolve({ frame: stream.frame, done: false });
  80. }
  81. } catch (error) {
  82. this.logger.error(error);
  83. return reject({ frame: stream.frame, done: false });
  84. }
  85. });
  86. }
  87. /**
  88. * stream core push with block stream
  89. * @param stream meta Json and stream
  90. */
  91. pushFrameToSteam(stream: StreamFrameType): Promise<StreamPushResponse> {
  92. return new Promise((resolve, reject) => {
  93. try {
  94. // let start, stop;
  95. const start = performance.now();
  96. //TODO process.env 开发路径
  97. let clipPath: string;
  98. if (process.env.NODE_ENV === 'development') {
  99. const src = stream.clipPath.replace('/mnt/metaverse/scene', '');
  100. const srcTmp = join(__dirname, `../ws/${src}`);
  101. clipPath = srcTmp;
  102. } else {
  103. clipPath = stream.clipPath;
  104. }
  105. // 增加不存在帧数据中断数据,原因有太多不准确的路径。
  106. // 其次其他地方会拿这里的最后一帧数据会出错,由于上流数据很多不稳定问题尽可保流的稳定性。
  107. if (!existsSync(clipPath)) {
  108. this.logger.error('不存在的推流路径::' + clipPath);
  109. return resolve({ frame: stream.frame, done: false });
  110. }
  111. // const clipPath = stream.clipPath;
  112. const metaData = stream.metaData || '{}';
  113. const frame = stream.frame;
  114. const serverTime = stream.serverTime || 754871824;
  115. const dir = stream.DIR || 1;
  116. this.lastStreamFrame.next({
  117. clipPath: stream.clipPath,
  118. metaData: metaData,
  119. frame: frame,
  120. serverTime: serverTime,
  121. DIR: dir,
  122. });
  123. const metaDataString = metaData.replace(/\s/g, '');
  124. const coordBuff = Buffer.from(metaDataString, 'utf-8');
  125. // console.warn('coordBuff', coordBuff.byteLength);
  126. // const steamStat = statSync(clipPath);
  127. // const steamTotalSize = metaData.length + steamStat.size;
  128. const clipBuffer = readFileSync(clipPath);
  129. const steam = new streamBuffers.ReadableStreamBuffer({
  130. frequency: 1, // in milliseconds.
  131. chunkSize: this.chunk_size - this.block, // in bytes.
  132. });
  133. steam.put(coordBuff);
  134. steam.put(clipBuffer);
  135. let steamByteLength = 0;
  136. steam.on('data', (data: Buffer) => {
  137. this.onSteaming.next(true);
  138. // this.logger.log('data', data, data.byteLength);
  139. const blockBuffStart = Buffer.alloc(this.block);
  140. const packBuffer = Buffer.concat([blockBuffStart, data]);
  141. const framePack = new DataView(
  142. packBuffer.buffer.slice(
  143. packBuffer.byteOffset,
  144. packBuffer.byteOffset + packBuffer.byteLength,
  145. ),
  146. );
  147. // 16 bit slot
  148. // framePack.setUint32(4)
  149. framePack.setUint16(6, this.block);
  150. framePack.setUint16(8, frame); // first render cnt
  151. framePack.setUint16(10, dir); // isDIR
  152. framePack.setUint16(24, 0);
  153. framePack.setUint16(26, 0);
  154. // 32 bit slot
  155. // statusPack.setUint32(12, buff.byteLength);
  156. // this.logger.log('metaLen', coordBuff.byteLength);
  157. // this.logger.log('metaLen', clipBuffer.byteLength);
  158. framePack.setUint32(0, 1437227610);
  159. framePack.setUint32(12, coordBuff.byteLength); // metaLen
  160. framePack.setUint32(16, clipBuffer.byteLength); // mediaLen
  161. framePack.setUint32(20, serverTime); //server_time
  162. framePack.setUint32(24, 0);
  163. framePack.setUint32(28, 0);
  164. framePack.setUint32(this.block - 4, steamByteLength);
  165. const isLastFrame = framePack.byteLength - this.chunk_size < 0;
  166. // this.logger.log('statusPack', statusPack);
  167. if (this.channel && this.channel.isOpen()) {
  168. this.channel.sendMessageBinary(Buffer.from(framePack.buffer));
  169. }
  170. steamByteLength += data.byteLength;
  171. if (isLastFrame) {
  172. // this.logger.log('isLastFrame', isLastFrame);
  173. // steamByteLength = 0;
  174. // this.onSteaming.next(false);
  175. steam.stop();
  176. }
  177. });
  178. //TODO steam can't trigger end
  179. steam.on('end', () => {
  180. steamByteLength = 0;
  181. // this.logger.log('stream end');
  182. const stop = performance.now();
  183. const inMillSeconds = stop - start;
  184. const rounded = Number(inMillSeconds).toFixed(3);
  185. this.logger.log(
  186. `[timer]-当前流:${stream.clipPath}流耗时-->${rounded}ms`,
  187. );
  188. if (this.onSteaming.value) {
  189. this.onSteaming.next(false);
  190. }
  191. return resolve({ frame: stream.frame, done: true });
  192. });
  193. steam.on('error', (error) => {
  194. this.logger.error('steam-error', error.message);
  195. return reject({ frame: stream.frame, done: false });
  196. });
  197. } catch (error) {
  198. this.logger.error(error);
  199. return reject({ frame: stream.frame, done: false });
  200. }
  201. });
  202. }
  203. }