gemercheung преди 3 години
родител
ревизия
a605930ed1
променени са 2 файла, в които са добавени 240 реда и са изтрити 9 реда
  1. 9 7
      src/meta.gateway.ts
  2. 231 2
      src/scene/stream/stream.service.ts

+ 9 - 7
src/meta.gateway.ts

@@ -35,12 +35,11 @@ initLogger('Debug');
   path: '/ws',
 })
 export class MetaGateway
-  implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
-{
+  implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
   constructor(
     private readonly sceneService: SceneService,
     private readonly configService: ConfigService,
-  ) {}
+  ) { }
   private logger: Logger = new Logger('MetaGateway');
   private peer: PeerConnection = null;
   private timer: NodeJS.Timeout;
@@ -100,8 +99,8 @@ export class MetaGateway
       portRangeEnd: portRangeEnd,
       iceServers: stun_server,
       // enableIceTcp: true,
-      maxMessageSize: 662144,
-      mtu: 1200,
+      // maxMessageSize: 662144,
+      mtu: 1500,
     });
 
     this.peer.onLocalDescription((sdp, type) => {
@@ -183,7 +182,7 @@ export class MetaGateway
 
     this.gameChanel = this.peer.createDataChannel('game-input', {
       // ordered: true,
-      // negotiated: false
+      // negotiated: false,
     });
 
     this.peer.onDataChannel((dc) => {
@@ -291,7 +290,7 @@ export class MetaGateway
           }
         },
       );
-    } catch (error) {}
+    } catch (error) { }
   }
 
   handleConnection(client: WebSocket, ...args: any[]) {
@@ -302,8 +301,11 @@ export class MetaGateway
     console.log('useId', params.get('userId'));
     console.log('roomId', params.get('roomId'));
 
+    console.log('reconnect', params.get('reconnect'));
+
     this.user_id = params.get('userId');
     this.roomId = params.get('roomId');
+    const reconnect = params.get('reconnect');
     this.sceneService.setConfig(this.user_id, this.roomId);
 
     this.logger.log(`Client connected:`);

+ 231 - 2
src/scene/stream/stream.service.ts

@@ -6,6 +6,18 @@ import * as streamBuffers from 'stream-buffers';
 import { BehaviorSubject } from 'rxjs';
 import { CacheService } from 'src/cache/cache.service';
 import { join } from 'path';
+import {
+  ReadableStream
+} from 'node:stream/web';
+
+import {
+  arrayBuffer,
+  blob,
+  buffer,
+  json,
+  text,
+} from 'node:stream/consumers';
+
 
 @Injectable()
 export class StreamService {
@@ -154,8 +166,225 @@ export class StreamService {
 
         const clipBuffer = readFileSync(clipPath);
 
+        const allData = Buffer.concat([coordBuff, clipBuffer]);
+        // console.log('allData', allData);
+        // const stream1 = new ReadableStream(allData, {
+        //   highWaterMark: this.chunk_size - this.block,
+        // });
+        const slices = Math.floor(
+          allData.byteLength / (this.chunk_size - this.block),
+        );
+
+        console.log('allData', allData.byteLength);
+        console.log('slices', slices);
+
+        let steamByteLength = 0;
+
+        for (let i = 0; i <= slices; i++) {
+          this.onSteaming.next(true);
+          const startSlot = i * (this.chunk_size - this.block);
+          const sliceLength =
+            i === slices
+              ? allData.byteLength
+              : (i + 1) * (this.chunk_size - this.block);
+
+          const currentSlice = allData.slice(startSlot, sliceLength);
+
+          console.log('startSlot', startSlot);
+          console.log('sliceLength', sliceLength);
+
+          const blockBuffStart = Buffer.alloc(this.block);
+          const packBuffer = Buffer.concat([blockBuffStart, currentSlice]);
+          const framePack = new DataView(
+            packBuffer.buffer.slice(
+              packBuffer.byteOffset,
+              packBuffer.byteOffset + packBuffer.byteLength,
+            ),
+          );
+
+          // 16 bit slot
+          // framePack.setUint32(4)
+          framePack.setUint16(6, this.block);
+          framePack.setUint16(8, frame); // first render cnt
+          framePack.setUint16(10, dir); // isDIR
+          framePack.setUint16(24, 0);
+          framePack.setUint16(26, 0);
+
+          // 32 bit slot
+          // statusPack.setUint32(12, buff.byteLength);
+          // this.logger.log('metaLen', coordBuff.byteLength);
+          // this.logger.log('metaLen', clipBuffer.byteLength);
+          console.log('steamByteLength', steamByteLength);
+          framePack.setUint32(0, 1437227610);
+          framePack.setUint32(12, coordBuff.byteLength); // metaLen
+          framePack.setUint32(16, clipBuffer.byteLength); // mediaLen
+          framePack.setUint32(20, serverTime); //server_time
+          framePack.setUint32(24, 0);
+          framePack.setUint32(28, 0);
+          framePack.setUint32(this.block - 4, steamByteLength);
+          // const isLastFrame = framePack.byteLength - this.chunk_size < 0;
+
+          // this.logger.log('statusPack', statusPack);
+          if (this.channel && this.channel.isOpen()) {
+            const isPush = this.channel.sendMessageBinary(
+              Buffer.from(framePack.buffer),
+            );
+            if (isPush) {
+              steamByteLength += currentSlice.byteLength;
+            }
+          }
+
+          if (i === slices) {
+            this.onSteaming.next(false);
+            steamByteLength = 0;
+            // debugger;
+            const stop = performance.now();
+            const inMillSeconds = stop - start;
+            const rounded = Number(inMillSeconds).toFixed(3);
+            this.logger.log(
+              `[timer]-当前流:${stream.clipPath}流耗时-->${rounded}ms`,
+            );
+            return resolve({ frame: stream.frame, done: true });
+          }
+        }
+        // let steamByteLength = 0;
+        console.log(
+          'stream-size %s : frame: %s, IDR: %s',
+          ((coordBuff.byteLength + clipBuffer.byteLength) / 1024).toFixed(2) +
+          ' KB',
+          frame,
+          dir,
+        );
+        // steam.on('data', (data: Buffer) => {
+        //   this.onSteaming.next(true);
+
+        //   // this.logger.log('data', data, data.byteLength);
+        //   const blockBuffStart = Buffer.alloc(this.block);
+        //   const packBuffer = Buffer.concat([blockBuffStart, data]);
+
+        //   const framePack = new DataView(
+        //     packBuffer.buffer.slice(
+        //       packBuffer.byteOffset,
+        //       packBuffer.byteOffset + packBuffer.byteLength,
+        //     ),
+        //   );
+
+        //   // 16 bit slot
+        //   // framePack.setUint32(4)
+        //   framePack.setUint16(6, this.block);
+        //   framePack.setUint16(8, frame); // first render cnt
+        //   framePack.setUint16(10, dir); // isDIR
+        //   framePack.setUint16(24, 0);
+        //   framePack.setUint16(26, 0);
+
+        //   // 32 bit slot
+        //   // statusPack.setUint32(12, buff.byteLength);
+        //   // this.logger.log('metaLen', coordBuff.byteLength);
+        //   // this.logger.log('metaLen', clipBuffer.byteLength);
+
+        //   framePack.setUint32(0, 1437227610);
+        //   framePack.setUint32(12, coordBuff.byteLength); // metaLen
+        //   framePack.setUint32(16, clipBuffer.byteLength); // mediaLen
+        //   framePack.setUint32(20, serverTime); //server_time
+        //   framePack.setUint32(24, 0);
+        //   framePack.setUint32(28, 0);
+        //   framePack.setUint32(this.block - 4, steamByteLength);
+        //   const isLastFrame = framePack.byteLength - this.chunk_size < 0;
+
+        //   // this.logger.log('statusPack', statusPack);
+        //   if (this.channel && this.channel.isOpen()) {
+        //     this.channel.sendMessageBinary(Buffer.from(framePack.buffer));
+        //   }
+        //   steamByteLength += data.byteLength;
+        //   if (isLastFrame) {
+        //     // this.logger.log('isLastFrame', isLastFrame);
+        //     // steamByteLength = 0;
+        //     // this.onSteaming.next(false);
+        //     steam.stop();
+        //   }
+        // });
+        // //TODO steam can't trigger end
+        // steam.on('end', () => {
+        //   steamByteLength = 0;
+        //   // this.logger.log('stream end');
+        //   const stop = performance.now();
+        //   const inMillSeconds = stop - start;
+        //   const rounded = Number(inMillSeconds).toFixed(3);
+        //   this.logger.log(
+        //     `[timer]-当前流:${stream.clipPath}流耗时-->${rounded}ms`,
+        //   );
+        //   if (this.onSteaming.value) {
+        //     this.onSteaming.next(false);
+        //   }
+
+        //   return resolve({ frame: stream.frame, done: true });
+        // });
+        // steam.on('error', (error) => {
+        //   this.logger.error('steam-error', error.message);
+        //   return reject({ frame: stream.frame, done: false });
+        // });
+      } catch (error) {
+        this.logger.error(error);
+        return reject({ frame: stream.frame, done: false });
+      }
+    });
+  }
+
+  /**
+ *  stream core push with block  stream
+ * @param stream   meta Json and stream
+ */
+
+  pushFrameToSteam1(stream: StreamFrameType): Promise<StreamPushResponse> {
+    return new Promise((resolve, reject) => {
+      try {
+        // let start, stop;
+        const start = performance.now();
+        //TODO process.env 开发路径
+        let clipPath: string;
+        if (process.env.NODE_ENV === 'development') {
+          const src = stream.clipPath.replace('/mnt/metaverse/scene', '');
+          const srcTmp = join(__dirname, `../ws/${src}`);
+          clipPath = srcTmp;
+        } else {
+          clipPath = stream.clipPath;
+        }
+        // 增加不存在帧数据中断数据,原因有太多不准确的路径。
+        // 其次其他地方会拿这里的最后一帧数据会出错,由于上流数据很多不稳定问题尽可保流的稳定性。
+        if (!existsSync(clipPath)) {
+          this.logger.error('不存在的推流路径::' + clipPath);
+          console.error('不存在的推流路径::' + clipPath);
+          return resolve({ frame: stream.frame, done: false });
+        }
+        if (stream.frame < 0) {
+          this.logger.error('不存在的帧位::' + stream.frame);
+          console.error('不存在的帧位::' + stream.frame);
+          return resolve({ frame: stream.frame, done: false });
+        }
+        // const clipPath = stream.clipPath;
+        const metaData = stream.metaData || '{}';
+        const frame = stream.frame;
+        const serverTime = stream.serverTime || 754871824;
+        const dir = stream.DIR || 1;
+
+        this.lastStreamFrame.next({
+          clipPath: stream.clipPath,
+          metaData: metaData,
+          frame: frame,
+          serverTime: serverTime,
+          DIR: dir,
+        });
+
+        const metaDataString = metaData.replace(/\s/g, '');
+        const coordBuff = Buffer.from(metaDataString, 'utf-8');
+        // console.warn('coordBuff', coordBuff.byteLength);
+        // const steamStat = statSync(clipPath);
+        // const steamTotalSize = metaData.length + steamStat.size;
+
+        const clipBuffer = readFileSync(clipPath);
+
         const steam = new streamBuffers.ReadableStreamBuffer({
-          frequency: 1, // in milliseconds.
+          frequency: 0, // in milliseconds.
           chunkSize: this.chunk_size - this.block, // in bytes.
         });
         steam.put(coordBuff);
@@ -165,7 +394,7 @@ export class StreamService {
         console.log(
           'stream-size %s : frame: %s, IDR: %s',
           ((coordBuff.byteLength + clipBuffer.byteLength) / 1024).toFixed(2) +
-            ' KB',
+          ' KB',
           frame,
           dir,
         );