123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- import { Injectable, Logger } from '@nestjs/common';
- import { DataChannel } from 'node-datachannel';
- // import * as path from 'path';
- import { readFileSync } from 'fs';
- import * as streamBuffers from 'stream-buffers';
- import { BehaviorSubject } from 'rxjs';
- import { CacheService } from 'src/cache/cache.service';
- import { join } from 'path';
- @Injectable()
- export class StreamService {
- private channel: DataChannel;
- private readonly chunk_size = 16000;
- private readonly block = 36;
- private logger: Logger = new Logger('StreamService');
- public onSteaming = new BehaviorSubject<boolean>(false);
- public lastStreamFrame = new BehaviorSubject<StreamFrameType>({
- frame: -1,
- clipPath: '',
- metaData: '',
- });
- constructor(private cacheService: CacheService) {}
- setChannel(channel: DataChannel) {
- this.channel = channel;
- }
- closeChannel() {
- this.channel = null;
- }
- /**
- * stream core push normal stream
- * @param data meta Json
- */
- pushNormalDataToStream(data: any) {
- const replyBin = JSON.stringify(data).replace(/\s/g, '');
- const buff = Buffer.from(replyBin, 'utf-8');
- if (this.channel && this.channel.isOpen()) {
- this.channel.sendMessageBinary(buff);
- }
- }
- /**
- * stream core push with block meta stream
- * @param data meta Json
- */
- pushMetaDataToSteam(stream: StreamMetaType) {
- try {
- const metaData = stream.metaData;
- const frame = stream.frame;
- const metaDataBin = metaData.replace(/\s/g, '');
- const buff = Buffer.from(metaDataBin, 'utf-8');
- // const block = 36;
- const blockBuff = Buffer.alloc(this.block);
- const packBuffer = Buffer.concat([blockBuff, buff]);
- const statusPack = new DataView(
- packBuffer.buffer.slice(
- packBuffer.byteOffset,
- packBuffer.byteOffset + packBuffer.byteLength,
- ),
- );
- statusPack.setUint32(0, 1437227610);
- // 16 bit slot
- statusPack.setUint16(6, this.block);
- statusPack.setUint16(8, frame);
- statusPack.setUint16(10, 255);
- statusPack.setUint16(24, 0);
- statusPack.setUint16(26, 0);
- // 32 bit slot
- statusPack.setUint32(12, buff.byteLength);
- statusPack.setUint32(16, 0);
- // statusPack.setUint32(20, 0);
- statusPack.setUint32(24, 0);
- statusPack.setUint32(28, 0);
- statusPack.setUint32(32, 0);
- if (this.channel && this.channel.isOpen()) {
- this.channel.sendMessageBinary(Buffer.from(statusPack.buffer));
- }
- } catch (error) {
- this.logger.error(error);
- }
- }
- /**
- * stream core push with block stream
- * @param stream meta Json and stream
- */
- pushFrameToSteam(stream: StreamFrameType): Promise<StreamPushResponse> {
- return new Promise((resolve, reject) => {
- try {
- // console.log('process.env', process.env.node_env);
- // const src = stream.clipPath.replace('/mnt/metaverse/scene', '');
- // const srcTmp = join(__dirname, `../ws/${src}`);
- // const clipPath = srcTmp;
- const clipPath = stream.clipPath;
- const metaData = stream.metaData || '{}';
- const frame = stream.frame;
- const serverTime = stream.serverTime || 754871824;
- const dir = stream.DIR || 1;
- this.lastStreamFrame.next({
- clipPath: stream.clipPath,
- metaData: metaData,
- frame: frame,
- serverTime: serverTime,
- DIR: dir,
- });
- const metaDataString = metaData.replace(/\s/g, '');
- const coordBuff = Buffer.from(metaDataString, 'utf-8');
- // console.warn('coordBuff', coordBuff.byteLength);
- // const steamStat = statSync(clipPath);
- // const steamTotalSize = metaData.length + steamStat.size;
- const clipBuffer = readFileSync(clipPath);
- const steam = new streamBuffers.ReadableStreamBuffer({
- frequency: 1, // in milliseconds.
- chunkSize: this.chunk_size - this.block, // in bytes.
- });
- steam.put(coordBuff);
- steam.put(clipBuffer);
- let steamByteLength = 0;
- steam.on('data', (data: Buffer) => {
- this.onSteaming.next(true);
- // console.log('data', data, data.byteLength);
- const blockBuffStart = Buffer.alloc(this.block);
- const packBuffer = Buffer.concat([blockBuffStart, data]);
- const framePack = new DataView(
- packBuffer.buffer.slice(
- packBuffer.byteOffset,
- packBuffer.byteOffset + packBuffer.byteLength,
- ),
- );
- // 16 bit slot
- // framePack.setUint32(4)
- framePack.setUint16(6, this.block);
- framePack.setUint16(8, frame); // first render cnt
- framePack.setUint16(10, dir); // isDIR
- framePack.setUint16(24, 0);
- framePack.setUint16(26, 0);
- // 32 bit slot
- // statusPack.setUint32(12, buff.byteLength);
- // console.log('metaLen', coordBuff.byteLength);
- // console.log('metaLen', clipBuffer.byteLength);
- framePack.setUint32(0, 1437227610);
- framePack.setUint32(12, coordBuff.byteLength); // metaLen
- framePack.setUint32(16, clipBuffer.byteLength); // mediaLen
- framePack.setUint32(20, serverTime); //server_time
- framePack.setUint32(24, 0);
- framePack.setUint32(28, 0);
- framePack.setUint32(this.block - 4, steamByteLength);
- const isLastFrame = framePack.byteLength - this.chunk_size < 0;
- // console.log('statusPack', statusPack);
- if (this.channel && this.channel.isOpen()) {
- this.channel.sendMessageBinary(Buffer.from(framePack.buffer));
- }
- steamByteLength += data.byteLength;
- if (isLastFrame) {
- // console.log('isLastFrame', isLastFrame);
- // steamByteLength = 0;
- // this.onSteaming.next(false);
- steam.stop();
- }
- });
- //TODO steam can't trigger end
- steam.on('end', () => {
- steamByteLength = 0;
- // console.log('stream end');
- if (this.onSteaming.value) {
- this.onSteaming.next(false);
- }
- return resolve({ frame: stream.frame, done: true });
- });
- } catch (error) {
- this.logger.error(error);
- return reject({ frame: stream.frame, done: false });
- }
- });
- }
- }
|