stream.service.ts 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. import { Injectable, Logger } from '@nestjs/common';
  2. import { DataChannel } from 'node-datachannel';
  3. // import * as path from 'path';
  4. import { readFileSync } from 'fs';
  5. import * as streamBuffers from 'stream-buffers';
  6. import { BehaviorSubject } from 'rxjs';
  7. import { CacheService } from 'src/cache/cache.service';
  8. import { join } from 'path';
  /**
   * Pushes JSON messages, metadata packets and media frames to a peer over a
   * node-datachannel `DataChannel`.
   *
   * Metadata and frame packets start with a fixed 36-byte header. The header is
   * written through a `DataView` without a little-endian flag, so multi-byte
   * fields are big-endian. Bytes not listed below are left as zero.
   *
   * | offset | size | content                                              |
   * | ------ | ---- | ---------------------------------------------------- |
   * | 0      | 4    | magic number 1437227610 (0x55AA5A5A)                 |
   * | 6      | 2    | header size (36)                                     |
   * | 8      | 2    | frame number                                         |
   * | 10     | 2    | DIR flag for frames, 255 for metadata-only packets   |
   * | 12     | 4    | metadata length in bytes                             |
   * | 16     | 4    | media length in bytes (0 for metadata-only packets)  |
   * | 20     | 4    | server time (0 for metadata-only packets)            |
   * | 32     | 4    | byte offset of this chunk inside the frame payload   |
   */
  @Injectable()
  export class StreamService {
    /** Channel currently used for sending; null/undefined means "not attached". */
    private channel: DataChannel | null = null;
    /** Upper bound in bytes for one frame packet (header + payload chunk). */
    private readonly chunk_size = 16000;
    /** Size in bytes of the header that prefixes metadata and frame packets. */
    private readonly block = 36;
    private logger: Logger = new Logger('StreamService');
    /** Emits `true` for every frame chunk sent and `false` once a frame is finished. */
    public onSteaming = new BehaviorSubject<boolean>(false);
    /** Last frame request handed to `pushFrameToSteam` (with defaults filled in). */
    public lastStreamFrame = new BehaviorSubject<StreamFrameType>({
      frame: -1,
      clipPath: '',
      metaData: '',
    });

    // cacheService is injected but not used inside this class.
    constructor(private cacheService: CacheService) {}

    /**
     * Attaches the data channel that all push methods send on.
     * @param channel channel to use from now on
     */
    setChannel(channel: DataChannel) {
      this.channel = channel;
    }

    /**
     * Detaches the current channel. Only the reference is dropped; the channel
     * object itself is not closed here.
     */
    closeChannel() {
      this.channel = null;
    }

    /**
     * Serialises `data` as JSON and sends it as one binary message without a
     * packet header. Nothing is sent when no open channel is attached.
     * @param data value to serialise with `JSON.stringify`
     */
    pushNormalDataToStream(data: any) {
      // JSON.stringify without an indent argument already produces compact
      // output. The former `.replace(/\s/g, '')` therefore only deleted
      // whitespace inside string values, which corrupted them.
      const serialized: string | undefined = JSON.stringify(data);
      if (serialized === undefined) {
        // undefined, functions and symbols have no JSON representation.
        this.logger.warn('pushNormalDataToStream: data is not JSON-serialisable');
        return;
      }
      this.sendPacket(Buffer.from(serialized, 'utf-8'));
    }

    /**
     * Sends a metadata-only packet: the 36-byte header followed by the compacted
     * metadata text. Errors are logged and swallowed.
     * @param stream frame number and metadata JSON text
     */
    pushMetaDataToSteam(stream: StreamMetaType) {
      try {
        const payload = Buffer.from(
          this.stripJsonWhitespace(stream.metaData),
          'utf-8',
        );
        const packet = this.buildPacket(
          {
            frame: stream.frame,
            dir: 255, // metadata-only packets carry 255 in the DIR slot
            metaLength: payload.byteLength,
            mediaLength: 0,
            serverTime: 0,
            offset: 0,
          },
          payload,
        );
        this.sendPacket(packet);
      } catch (error) {
        this.logger.error(error);
      }
    }

    /**
     * Streams one frame (metadata text followed by the clip file contents) to
     * the peer as a sequence of packets of at most `chunk_size` bytes, each
     * prefixed with the 36-byte header.
     *
     * When no open channel is attached the chunks are still iterated but not
     * sent, and the promise still resolves.
     *
     * @param stream clip path, metadata JSON text, frame number and optional
     *               `serverTime` / `DIR` values
     * @returns resolves with `{ frame, done: true }` after the last chunk;
     *          rejects with `{ frame, done: false }` when the clip cannot be
     *          read or a chunk cannot be packed/sent
     */
    pushFrameToSteam(stream: StreamFrameType): Promise<StreamPushResponse> {
      return new Promise<StreamPushResponse>((resolve, reject) => {
        try {
          // TODO(dev): for local development the clip path used to be remapped
          // from '/mnt/metaverse/scene' to a directory relative to this file.
          const clipPath = stream.clipPath;
          const metaData = stream.metaData || '{}';
          const frame = stream.frame;
          // Fallback values used when the caller does not provide them; the
          // meaning of 754871824 is not documented in this file.
          const serverTime = stream.serverTime || 754871824;
          const dir = stream.DIR || 1;

          // Published before the file is read, so a failed push is recorded too.
          this.lastStreamFrame.next({
            clipPath: stream.clipPath,
            metaData: metaData,
            frame: frame,
            serverTime: serverTime,
            DIR: dir,
          });

          const metaBuffer = Buffer.from(
            this.stripJsonWhitespace(metaData),
            'utf-8',
          );
          const clipBuffer = readFileSync(clipPath);
          const totalLength = metaBuffer.byteLength + clipBuffer.byteLength;

          // The stream buffer hands the payload back in fixed-size chunks, one
          // chunk per timer tick.
          const source = new streamBuffers.ReadableStreamBuffer({
            frequency: 1, // in milliseconds
            chunkSize: this.chunk_size - this.block, // in bytes
          });
          source.put(metaBuffer);
          source.put(clipBuffer);

          let sentLength = 0;
          source.on('data', (data: Buffer) => {
            try {
              this.onSteaming.next(true);
              const packet = this.buildPacket(
                {
                  frame: frame,
                  dir: dir,
                  metaLength: metaBuffer.byteLength,
                  mediaLength: clipBuffer.byteLength,
                  serverTime: serverTime,
                  offset: sentLength, // bytes of this frame sent before this chunk
                },
                data,
              );
              this.sendPacket(packet);
              sentLength += data.byteLength;
              // Decide completion by byte count. Testing for a short chunk
              // misses payloads whose length is an exact multiple of the chunk
              // size: the stream was then never stopped and the promise never
              // settled.
              if (sentLength >= totalLength) {
                source.stop();
              }
            } catch (error) {
              // This callback runs from a timer, outside the outer try/catch.
              this.logger.error(error);
              source.destroy();
              if (this.onSteaming.value) {
                this.onSteaming.next(false);
              }
              reject({ frame: stream.frame, done: false });
            }
          });

          // Emitted once the stopped stream buffer has been drained.
          source.on('end', () => {
            if (this.onSteaming.value) {
              this.onSteaming.next(false);
            }
            resolve({ frame: stream.frame, done: true });
          });

          if (totalLength === 0) {
            // Nothing to send: end the stream right away so the promise settles.
            source.stop();
          }
        } catch (error) {
          this.logger.error(error);
          reject({ frame: stream.frame, done: false });
        }
      });
    }

    /**
     * Removes whitespace between JSON tokens while keeping double-quoted string
     * literals untouched. Whitespace that is not inside a complete string
     * literal is removed, as before.
     */
    private stripJsonWhitespace(text: string): string {
      return text.replace(/"(?:[^"\\]|\\.)*"|\s+/g, (match) =>
        match.charAt(0) === '"' ? match : '',
      );
    }

    /**
     * Builds one packet: a zero-filled 36-byte header populated from `fields`,
     * followed by a copy of `payload`.
     */
    private buildPacket(
      fields: {
        frame: number;
        dir: number;
        metaLength: number;
        mediaLength: number;
        serverTime: number;
        offset: number;
      },
      payload: Buffer,
    ): Buffer {
      const packet = Buffer.alloc(this.block + payload.byteLength);
      payload.copy(packet, this.block);
      // DataView setters wrap out-of-range numbers instead of throwing, which
      // keeps frame counters above 65535 working as they did before.
      const header = new DataView(packet.buffer, packet.byteOffset, this.block);
      header.setUint32(0, 1437227610);
      header.setUint16(6, this.block);
      header.setUint16(8, fields.frame);
      header.setUint16(10, fields.dir);
      header.setUint32(12, fields.metaLength);
      header.setUint32(16, fields.mediaLength);
      header.setUint32(20, fields.serverTime);
      header.setUint32(this.block - 4, fields.offset);
      return packet;
    }

    /** Sends `packet` as a binary message when an open channel is attached. */
    private sendPacket(packet: Buffer) {
      if (this.channel && this.channel.isOpen()) {
        this.channel.sendMessageBinary(packet);
      }
    }
  }