stream.service.ts 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402
  1. import { Injectable, Logger } from '@nestjs/common';
  2. import { DataChannel } from 'node-datachannel';
  3. // import * as path from 'path';
  4. import { existsSync, readFileSync } from 'fs';
  5. import * as streamBuffers from 'stream-buffers';
  6. import { BehaviorSubject } from 'rxjs';
  7. import { join } from 'path';
  /**
   * Pushes JSON messages and chunked media frames over a `node-datachannel`
   * `DataChannel`.
   *
   * Every framed packet starts with a `block`-byte (36) header written through a
   * `DataView` with its default byte order (big-endian):
   *
   * | offset | size | value                                                      |
   * | ------ | ---- | ---------------------------------------------------------- |
   * | 0      | u32  | constant 1437227610 (0x55AA5A5A)                           |
   * | 4      | u16  | left as zero                                               |
   * | 6      | u16  | header size (`block`)                                      |
   * | 8      | u16  | frame number ("first render cnt" in the original notes)    |
   * | 10     | u16  | `DIR` of the frame, or 255 for meta-only packets           |
   * | 12     | u32  | byte length of the metadata ("metaLen")                    |
   * | 16     | u32  | byte length of the media file ("mediaLen"), 0 for meta     |
   * | 20     | u32  | server time, 0 for meta-only packets                       |
   * | 24, 28 | u32  | left as zero                                               |
   * | 32     | u32  | byte offset of this packet's payload within the whole push |
   */
  @Injectable()
  export class StreamService {
    /** Channel used for sending; `null`/unset while no channel is attached. */
    private channel: DataChannel | null = null;
    /** Size in bytes of one chunked frame packet (header + payload). */
    private readonly chunk_size = 16000;
    /** Size in bytes of the header prepended to every framed packet. */
    private readonly block = 36;
    private readonly logger = new Logger('StreamService');
    /** `true` while frame chunks are being pushed, `false` once a push has finished. */
    public onSteaming = new BehaviorSubject<boolean>(false);
    /** Most recent frame accepted for pushing; starts with `frame: -1`. */
    public lastStreamFrame = new BehaviorSubject<StreamFrameType>({
      frame: -1,
      clipPath: '',
      metaData: '',
    });

    /** Attaches the data channel that all push methods send through. */
    setChannel(channel: DataChannel) {
      this.channel = channel;
    }

    /** Detaches the current data channel; later pushes resolve with `done: false`. */
    closeChannel() {
      this.channel = null;
    }

    /** Resolves after `ms` milliseconds. */
    public sleep = (ms: number) =>
      new Promise<void>((resolve) => setTimeout(resolve, ms));

    /**
     * Sends `data` as one un-framed UTF-8 JSON message.
     * Nothing is sent when no open channel is attached.
     *
     * `JSON.stringify` already emits compact JSON, so the former blanket
     * whitespace strip only removed spaces inside string values; the payload is
     * now sent exactly as serialized.
     *
     * @param data value to serialize with `JSON.stringify`
     */
    pushNormalDataToStream(data: unknown) {
      const payload = Buffer.from(JSON.stringify(data), 'utf-8');
      const channel = this.openChannel();
      if (channel) {
        channel.sendMessageBinary(payload);
      }
    }

    /**
     * Sends the metadata of `stream` as a single framed packet (flag 255, no
     * media, offset 0).
     *
     * @param stream frame number and metadata string; all whitespace is removed
     *   from the metadata before sending
     * @returns `done` is the value returned by `sendMessageBinary`, or `false`
     *   when the frame number is negative or no open channel is attached
     * @throws rejects with `{ frame, done: false }` when building or sending the
     *   packet throws
     */
    async pushMetaDataToSteam(
      stream: StreamMetaType,
    ): Promise<StreamPushResponse> {
      try {
        // NOTE(review): this strips whitespace inside string values too; kept
        // because the metadata arrives pre-serialized and its format is not
        // visible from here.
        const payload = Buffer.from(
          stream.metaData.replace(/\s/g, ''),
          'utf-8',
        );
        if (stream.frame < 0) {
          this.reportInvalid('不存在的帧位::' + stream.frame);
          return { frame: stream.frame, done: false };
        }
        const packet = this.buildPacket(payload, {
          frame: stream.frame,
          flag: 255,
          metaLength: payload.byteLength,
          mediaLength: 0,
          serverTime: 0,
          offset: 0,
        });
        const channel = this.openChannel();
        const done = channel ? channel.sendMessageBinary(packet) : false;
        return { frame: stream.frame, done };
      } catch (error) {
        this.logger.error(error);
        // Callers receive a plain response object as the rejection reason.
        return Promise.reject({ frame: stream.frame, done: false });
      }
    }

    /**
     * Reads the clip file of `stream`, prefixes it with the metadata and sends
     * the result as a sequence of framed packets of at most `chunk_size` bytes.
     *
     * The packet count is `floor(total / payloadSize) + 1`, so a total that is
     * an exact multiple of the payload size ends with a header-only packet.
     * NOTE(review): kept as-is because the receiver's expectations are not
     * visible from this file.
     *
     * @param stream clip path, frame number and optional metadata / server time / DIR
     * @returns `done: true` only when the final packet's `sendMessageBinary`
     *   call returned `true`; `done: false` for a missing file, a negative
     *   frame number or a closed channel
     * @throws rejects with `{ frame, done: false }` when reading or sending throws
     */
    async pushFrameToSteam(
      stream: StreamFrameType,
    ): Promise<StreamPushResponse> {
      try {
        const start = performance.now();
        const prepared = this.prepareFrame(stream);
        if (!prepared) {
          return { frame: stream.frame, done: false };
        }

        const allData = Buffer.concat([
          prepared.coordBuff,
          prepared.clipBuffer,
        ]);
        const payloadSize = this.chunk_size - this.block;
        const slices = Math.floor(allData.byteLength / payloadSize);
        let pushed = false;

        try {
          for (let i = 0; i <= slices; i++) {
            this.onSteaming.next(true);
            const offset = i * payloadSize;
            // `subarray` clamps the end index, so the final slice simply runs
            // to the end of the data.
            const packet = prepared.toPacket(
              allData.subarray(offset, offset + payloadSize),
              offset,
            );
            const channel = this.openChannel();
            if (!channel) {
              return { frame: stream.frame, done: false };
            }
            pushed = channel.sendMessageBinary(packet);
            if (!pushed) {
              // Brief pause before moving on; the packet is not re-sent.
              await this.sleep(5);
              console.error('流测试推不成功-再试', pushed);
            }
          }
        } finally {
          // Also covers the closed-channel and exception exits, which used to
          // leave the flag stuck at `true`.
          if (this.onSteaming.value) {
            this.onSteaming.next(false);
          }
        }

        if (pushed) {
          this.logElapsed(start, stream.clipPath);
        }
        return {
          frame: stream.frame,
          done: pushed,
          clipPath: stream.clipPath,
        };
      } catch (error) {
        this.logger.error(error);
        // Callers receive a plain response object as the rejection reason.
        return Promise.reject({ frame: stream.frame, done: false });
      }
    }

    /**
     * Variant of `pushFrameToSteam` that chunks the data through a
     * `stream-buffers` `ReadableStreamBuffer` instead of slicing manually.
     * Packets are dropped silently while no open channel is attached.
     *
     * @param stream clip path, frame number and optional metadata / server time / DIR
     * @returns `done: true` once the buffer stream has ended; `done: false` for
     *   a missing file or a negative frame number
     * @throws rejects with `{ frame, done: false }` on a stream error or when
     *   preparing the frame throws
     */
    pushFrameToSteam1(stream: StreamFrameType): Promise<StreamPushResponse> {
      return new Promise((resolve, reject) => {
        try {
          const start = performance.now();
          const prepared = this.prepareFrame(stream);
          if (!prepared) {
            resolve({ frame: stream.frame, done: false });
            return;
          }

          const steam = new streamBuffers.ReadableStreamBuffer({
            frequency: 0, // in milliseconds.
            chunkSize: this.chunk_size - this.block, // in bytes.
          });
          steam.put(prepared.coordBuff);
          steam.put(prepared.clipBuffer);

          let steamByteLength = 0;
          steam.on('data', (data: Buffer) => {
            this.onSteaming.next(true);
            const packet = prepared.toPacket(data, steamByteLength);
            const channel = this.openChannel();
            if (channel) {
              channel.sendMessageBinary(packet);
            }
            steamByteLength += data.byteLength;
          });
          steam.on('end', () => {
            this.logElapsed(start, stream.clipPath);
            if (this.onSteaming.value) {
              this.onSteaming.next(false);
            }
            resolve({ frame: stream.frame, done: true });
          });
          steam.on('error', (error) => {
            this.logger.error('steam-error', error.message);
            reject({ frame: stream.frame, done: false });
          });

          // All data is queued, so mark the buffer as complete now; it then
          // ends after the queued bytes are drained. Previously `stop()` was
          // only called when a short final chunk was seen, so a total that was
          // an exact multiple of the chunk size never ended.
          steam.stop();
        } catch (error) {
          this.logger.error(error);
          reject({ frame: stream.frame, done: false });
        }
      });
    }

    /** Returns the attached channel when it reports itself open, otherwise `null`. */
    private openChannel(): DataChannel | null {
      const channel = this.channel;
      return channel && channel.isOpen() ? channel : null;
    }

    /** Reports a rejected push on both the Nest logger and the console. */
    private reportInvalid(message: string): void {
      this.logger.error(message);
      console.error(message);
    }

    /** Logs the time elapsed since `start` for the clip at `clipPath`. */
    private logElapsed(start: number, clipPath: string): void {
      const rounded = Number(performance.now() - start).toFixed(3);
      this.logger.log(`[timer]-当前流:${clipPath}流耗时-->${rounded}ms`);
    }

    /**
     * Maps a clip path to the file to read.
     * TODO: development path driven by process.env — in development the
     * `/mnt/metaverse/scene` prefix is dropped and the rest is resolved under
     * `../ws/` relative to this module.
     */
    private resolveClipPath(clipPath: string): string {
      if (process.env.NODE_ENV === 'development') {
        const src = clipPath.replace('/mnt/metaverse/scene', '');
        return join(__dirname, `../ws/${src}`);
      }
      return clipPath;
    }

    /**
     * Validates `stream`, records it as the last frame and loads its bytes.
     *
     * A frame whose file does not exist is refused before it is recorded:
     * upstream supplies many inaccurate paths, other code reads the last frame
     * published here, and keeping the stream stable takes priority.
     *
     * @returns the metadata and clip buffers plus a `toPacket` factory that
     *   frames a payload for this frame, or `null` when the frame is refused
     */
    private prepareFrame(stream: StreamFrameType) {
      const clipPath = this.resolveClipPath(stream.clipPath);
      if (!existsSync(clipPath)) {
        this.reportInvalid('不存在的推流路径::' + clipPath);
        return null;
      }
      if (stream.frame < 0) {
        this.reportInvalid('不存在的帧位::' + stream.frame);
        return null;
      }

      const metaData = stream.metaData || '{}';
      const frame = stream.frame;
      const serverTime = stream.serverTime || 754871824;
      const dir = stream.DIR || 1;
      this.lastStreamFrame.next({
        clipPath: stream.clipPath,
        metaData: metaData,
        frame: frame,
        serverTime: serverTime,
        DIR: dir,
      });

      const coordBuff = Buffer.from(metaData.replace(/\s/g, ''), 'utf-8');
      const clipBuffer = readFileSync(clipPath);
      console.log(
        'stream-size %s : frame: %s, IDR: %s',
        ((coordBuff.byteLength + clipBuffer.byteLength) / 1024).toFixed(2) +
          ' KB',
        frame,
        dir,
      );

      return {
        coordBuff,
        clipBuffer,
        toPacket: (payload: Buffer, offset: number): Buffer =>
          this.buildPacket(payload, {
            frame,
            flag: dir,
            metaLength: coordBuff.byteLength,
            mediaLength: clipBuffer.byteLength,
            serverTime,
            offset,
          }),
      };
    }

    /**
     * Prepends the `block`-byte header to `payload`.
     *
     * A `DataView` is used rather than `Buffer.writeUInt*` so out-of-range
     * values wrap instead of throwing, as they always did. Bytes not written
     * here stay zero from `Buffer.alloc`.
     */
    private buildPacket(
      payload: Buffer,
      header: {
        frame: number;
        flag: number;
        metaLength: number;
        mediaLength: number;
        serverTime: number;
        offset: number;
      },
    ): Buffer {
      const packet = Buffer.concat([Buffer.alloc(this.block), payload]);
      const view = new DataView(
        packet.buffer,
        packet.byteOffset,
        packet.byteLength,
      );
      view.setUint32(0, 1437227610);
      view.setUint16(6, this.block);
      view.setUint16(8, header.frame);
      view.setUint16(10, header.flag);
      view.setUint32(12, header.metaLength);
      view.setUint32(16, header.mediaLength);
      view.setUint32(20, header.serverTime);
      view.setUint32(this.block - 4, header.offset);
      return packet;
    }
  }