123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402 |
import { Injectable, Logger } from '@nestjs/common';
import { DataChannel } from 'node-datachannel';
// import * as path from 'path';
import { existsSync, readFileSync } from 'fs';
import { join } from 'path';
import { performance } from 'perf_hooks';
import { BehaviorSubject } from 'rxjs';
import * as streamBuffers from 'stream-buffers';
/** Values written into the fixed-size header that precedes every packet. */
interface PacketHeader {
  /** Frame number; stored as uint16, so larger values wrap. */
  frame: number;
  /** Direction / key-frame marker; stored as uint16. */
  dir: number;
  /** Byte length of the whitespace-stripped metadata. */
  metaLen: number;
  /** Byte length of the clip file. */
  mediaLen: number;
  /** Server timestamp; stored as uint32. */
  serverTime: number;
  /** Byte offset of this packet's payload inside the whole frame. */
  offset: number;
}

/** A validated frame whose metadata and clip bytes are loaded in memory. */
interface PreparedFrame {
  frame: number;
  dir: number;
  serverTime: number;
  coordBuff: Buffer;
  clipBuffer: Buffer;
}

/**
 * Pushes JSON messages, metadata packets and chunked clip frames over a
 * node-datachannel `DataChannel`.
 *
 * Packet layout produced by this class (all integers big-endian, the
 * DataView default):
 *
 * | offset | size | content                                   |
 * | ------ | ---- | ----------------------------------------- |
 * | 0      | 4    | magic `1437227610` (0x55AA5A5A)           |
 * | 4      | 2    | always zero                               |
 * | 6      | 2    | header size (`block`, 36)                 |
 * | 8      | 2    | frame number                              |
 * | 10     | 2    | dir marker (255 for metadata-only packet) |
 * | 12     | 4    | metadata byte length                      |
 * | 16     | 4    | media byte length                         |
 * | 20     | 4    | server time                               |
 * | 24     | 8    | always zero                               |
 * | 32     | 4    | payload offset inside the frame           |
 * | 36     | ...  | payload                                   |
 */
@Injectable()
export class StreamService {
  /** Channel used for sending; `null`/unset while no peer is attached. */
  private channel: DataChannel | null = null;
  /** Maximum size in bytes of one packet (header + payload). */
  private readonly chunk_size = 16000;
  /** Size in bytes of the packet header. */
  private readonly block = 36;
  /** Magic number written at the start of every packet header. */
  private readonly magic = 1437227610;
  private readonly logger = new Logger('StreamService');

  /** Emits `true` while a frame is being chunked out and `false` once done. */
  public onSteaming = new BehaviorSubject<boolean>(false);

  /** Last frame accepted for streaming; starts with a `frame: -1` placeholder. */
  public lastStreamFrame = new BehaviorSubject<StreamFrameType>({
    frame: -1,
    clipPath: '',
    metaData: '',
  });

  /** Attaches the data channel that all push methods send through. */
  setChannel(channel: DataChannel) {
    this.channel = channel;
  }

  /** Detaches the current channel. The channel itself is not closed here. */
  closeChannel() {
    this.channel = null;
  }

  /** Resolves after `ms` milliseconds. */
  public sleep = (ms: number): Promise<void> =>
    new Promise<void>((done) => setTimeout(done, ms));

  /**
   * Sends `data` as one headerless UTF-8 JSON message.
   *
   * Nothing is sent when no channel is attached or the channel is not open.
   * The output of `JSON.stringify` is sent as-is: it contains no whitespace
   * outside string literals, so stripping whitespace from it would only
   * damage string values such as `"hello world"`.
   *
   * @param data Any JSON-serialisable value.
   * @throws TypeError when `data` has no JSON representation (for example
   *   `undefined`), because there is then nothing to encode.
   */
  pushNormalDataToStream(data: unknown) {
    const payload = Buffer.from(JSON.stringify(data), 'utf-8');
    const channel = this.channel;
    if (channel && channel.isOpen()) {
      channel.sendMessageBinary(payload);
    }
  }

  /**
   * Sends a metadata-only packet: a header with dir marker 255 and media
   * length 0, followed by the whitespace-stripped metadata.
   *
   * @param stream Frame number and metadata JSON string.
   * @returns Resolves with `done` set to the channel's send result, or
   *   `done: false` when the frame number is negative or the channel is
   *   missing or not open.
   *   Rejects with `{ frame, done: false }` when building or sending throws.
   */
  async pushMetaDataToSteam(
    stream: StreamMetaType,
  ): Promise<StreamPushResponse> {
    try {
      // NOTE(review): this removes ALL whitespace, including whitespace
      // inside JSON string values. Kept as-is for wire compatibility.
      const metaBuff = Buffer.from(
        stream.metaData.replace(/\s/g, ''),
        'utf-8',
      );
      if (stream.frame < 0) {
        this.logger.error('不存在的帧位::' + stream.frame);
        console.error('不存在的帧位::' + stream.frame);
        return { frame: stream.frame, done: false };
      }
      const packet = this.buildPacket(
        {
          frame: stream.frame,
          dir: 255,
          metaLen: metaBuff.byteLength,
          mediaLen: 0,
          serverTime: 0,
          offset: 0,
        },
        metaBuff,
      );
      const channel = this.channel;
      if (channel && channel.isOpen()) {
        return {
          frame: stream.frame,
          done: channel.sendMessageBinary(packet),
        };
      }
      return { frame: stream.frame, done: false };
    } catch (error) {
      this.logger.error(error);
      return Promise.reject({ frame: stream.frame, done: false });
    }
  }

  /**
   * Sends one frame (metadata + clip file) as a sequence of packets of at
   * most `chunk_size` bytes, each carrying the header described on the class.
   *
   * `onSteaming` is set to `true` while packets go out and is always set
   * back to `false` when the method settles, including when the channel
   * disappears mid-frame or an error is thrown.
   *
   * @param stream Frame number, clip path, metadata and optional
   *   `serverTime` / `DIR`.
   * @returns Resolves with `done: true` only when the final packet's send
   *   call returned `true`; `done: false` when the clip file is missing,
   *   the frame number is negative, the channel is missing or not open, or
   *   the final send returned `false`.
   *   Rejects with `{ frame, done: false }` when an exception is thrown.
   */
  async pushFrameToSteam(stream: StreamFrameType): Promise<StreamPushResponse> {
    try {
      const start = performance.now();
      const prepared = this.prepareFrame(stream);
      if (!prepared) {
        return { frame: stream.frame, done: false };
      }
      const { frame, dir, serverTime, coordBuff, clipBuffer } = prepared;
      const allData = Buffer.concat([coordBuff, clipBuffer]);
      const payloadSize = this.chunk_size - this.block;
      // NOTE(review): when the data length is an exact multiple of
      // payloadSize the final packet carries an empty payload (header only).
      // Kept unchanged in case the receiver relies on a short last packet.
      const slices = Math.floor(allData.byteLength / payloadSize);

      let offset = 0;
      let pushed = false;
      for (let i = 0; i <= slices; i++) {
        this.onSteaming.next(true);
        const begin = i * payloadSize;
        const end = i === slices ? allData.byteLength : begin + payloadSize;
        const payload = allData.subarray(begin, end);
        const packet = this.buildPacket(
          {
            frame,
            dir,
            metaLen: coordBuff.byteLength,
            mediaLen: clipBuffer.byteLength,
            serverTime,
            offset,
          },
          payload,
        );
        offset += payload.byteLength;

        const channel = this.channel;
        if (!channel || !channel.isOpen()) {
          // Channel went away mid-frame: do not leave the flag stuck on true.
          this.endSteaming();
          return { frame: stream.frame, done: false };
        }
        pushed = channel.sendMessageBinary(packet);
        if (!pushed) {
          // NOTE(review): the packet is NOT sent again; this only pauses
          // briefly before the next chunk. Presumably `false` signals
          // back-pressure rather than loss - confirm against node-datachannel.
          await this.sleep(5);
          console.error('流测试推不成功-再试', pushed);
        }
      }

      this.onSteaming.next(false);
      if (pushed) {
        this.logElapsed(stream.clipPath, start);
      }
      // `done` mirrors the send result of the last packet only.
      return { frame: stream.frame, done: pushed, clipPath: stream.clipPath };
    } catch (error) {
      this.logger.error(error);
      this.endSteaming();
      return Promise.reject({ frame: stream.frame, done: false });
    }
  }

  /**
   * Variant of `pushFrameToSteam` that lets a `ReadableStreamBuffer` cut the
   * frame into chunks and sends one packet per `data` event.
   *
   * The stream buffer is told to stop right after all data has been queued,
   * so it ends once drained regardless of the payload length. Stopping only
   * from inside the `data` handler on a short chunk left the promise pending
   * forever when the payload was an exact multiple of the chunk size.
   *
   * @param stream Frame number, clip path, metadata and optional
   *   `serverTime` / `DIR`.
   * @returns Resolves with `done: true` when the stream buffer ends, or
   *   `done: false` when the clip file is missing or the frame number is
   *   negative. Chunks are silently skipped while the channel is missing or
   *   not open.
   *   Rejects with `{ frame, done: false }` on a stream error or exception.
   */
  pushFrameToSteam1(stream: StreamFrameType): Promise<StreamPushResponse> {
    return new Promise((resolve, reject) => {
      try {
        const start = performance.now();
        const prepared = this.prepareFrame(stream);
        if (!prepared) {
          return resolve({ frame: stream.frame, done: false });
        }
        const { frame, dir, serverTime, coordBuff, clipBuffer } = prepared;

        const steam = new streamBuffers.ReadableStreamBuffer({
          frequency: 0, // in milliseconds.
          chunkSize: this.chunk_size - this.block, // in bytes.
        });
        steam.put(coordBuff);
        steam.put(clipBuffer);

        console.log(
          'stream-size %s : frame: %s, IDR: %s',
          ((coordBuff.byteLength + clipBuffer.byteLength) / 1024).toFixed(2) +
            ' KB',
          frame,
          dir,
        );

        let offset = 0;
        steam.on('data', (data: Buffer) => {
          this.onSteaming.next(true);
          const packet = this.buildPacket(
            {
              frame,
              dir,
              metaLen: coordBuff.byteLength,
              mediaLen: clipBuffer.byteLength,
              serverTime,
              offset,
            },
            data,
          );
          const channel = this.channel;
          if (channel && channel.isOpen()) {
            channel.sendMessageBinary(packet);
          }
          offset += data.byteLength;
        });

        steam.on('end', () => {
          this.logElapsed(stream.clipPath, start);
          this.endSteaming();
          return resolve({ frame: stream.frame, done: true });
        });

        steam.on('error', (error: Error) => {
          this.logger.error('steam-error', error.message);
          this.endSteaming();
          return reject({ frame: stream.frame, done: false });
        });

        // Everything is queued: let the stream end as soon as it has drained.
        steam.stop();
      } catch (error) {
        this.logger.error(error);
        this.endSteaming();
        return reject({ frame: stream.frame, done: false });
      }
    });
  }

  /** Builds one packet: a zero-filled header with the given fields set, then `payload`. */
  private buildPacket(header: PacketHeader, payload: Buffer): Buffer {
    const packet = Buffer.alloc(this.block + payload.byteLength);
    payload.copy(packet, this.block);
    // A DataView is used on purpose: out-of-range values wrap instead of
    // throwing, and integers are written big-endian by default.
    const view = new DataView(packet.buffer, packet.byteOffset, this.block);
    view.setUint32(0, this.magic);
    view.setUint16(6, this.block);
    view.setUint16(8, header.frame);
    view.setUint16(10, header.dir);
    view.setUint32(12, header.metaLen);
    view.setUint32(16, header.mediaLen);
    view.setUint32(20, header.serverTime);
    view.setUint32(this.block - 4, header.offset);
    return packet;
  }

  /** Maps a clip path to its local location when NODE_ENV is `development`; otherwise returns it unchanged. */
  private resolveClipPath(clipPath: string): string {
    if (process.env.NODE_ENV === 'development') {
      const src = clipPath.replace('/mnt/metaverse/scene', '');
      return join(__dirname, `../ws/${src}`);
    }
    return clipPath;
  }

  /**
   * Validates a frame, publishes it on `lastStreamFrame` and loads its bytes.
   * Returns `null` (after logging) when the clip file does not exist or the
   * frame number is negative.
   */
  private prepareFrame(stream: StreamFrameType): PreparedFrame | null {
    const clipPath = this.resolveClipPath(stream.clipPath);
    // Abort on missing clip files: upstream often supplies inaccurate paths.
    // Other code reads the last frame published below, so a bad frame must
    // never be published; keeping the stream stable has priority.
    if (!existsSync(clipPath)) {
      this.logger.error('不存在的推流路径::' + clipPath);
      console.error('不存在的推流路径::' + clipPath);
      return null;
    }
    if (stream.frame < 0) {
      this.logger.error('不存在的帧位::' + stream.frame);
      console.error('不存在的帧位::' + stream.frame);
      return null;
    }

    // NOTE(review): `||` also replaces falsy values, so `DIR: 0` and
    // `serverTime: 0` fall back to the defaults. Confirm that 0 is never a
    // meaningful value before switching to `??`.
    const metaData = stream.metaData || '{}';
    const frame = stream.frame;
    const serverTime = stream.serverTime || 754871824;
    const dir = stream.DIR || 1;

    this.lastStreamFrame.next({
      clipPath: stream.clipPath,
      metaData: metaData,
      frame: frame,
      serverTime: serverTime,
      DIR: dir,
    });

    // NOTE(review): strips ALL whitespace, including inside string values.
    const coordBuff = Buffer.from(metaData.replace(/\s/g, ''), 'utf-8');
    const clipBuffer = readFileSync(clipPath);
    return { frame, dir, serverTime, coordBuff, clipBuffer };
  }

  /** Sets `onSteaming` back to `false` if it is currently `true`. */
  private endSteaming(): void {
    if (this.onSteaming.value) {
      this.onSteaming.next(false);
    }
  }

  /** Logs the time elapsed since `start` for the given clip. */
  private logElapsed(clipPath: string, start: number): void {
    const rounded = (performance.now() - start).toFixed(3);
    this.logger.log(`[timer]-当前流:${clipPath}流耗时-->${rounded}ms`);
  }
}
|