gemercheung hace 3 años
padre
commit
a2672ede4c
Se han modificado 3 ficheros con 32 adiciones y 5 borrados
  1. 1 0
      package.json
  2. 22 5
      src/scene/scene.service.ts
  3. 9 0
      yarn.lock

+ 1 - 0
package.json

@@ -31,6 +31,7 @@
     "@nestjs/platform-socket.io": "^8.4.4",
     "@nestjs/platform-ws": "^8.4.4",
     "@nestjs/websockets": "^8.4.4",
+    "bl": "^5.0.0",
     "buffer": "^6.0.3",
     "multistream": "^4.1.0",
     "nestjs-redis": "^1.3.3",

+ 22 - 5
src/scene/scene.service.ts

@@ -5,8 +5,18 @@ import { Logger } from '@nestjs/common';
 import { DataChannel } from 'node-datachannel';
 import * as path from 'path';
 import { statSync, createReadStream } from 'fs';
-import multistream from 'multistream';
-import { Readable } from 'stream';
+// import multistream from 'multistream';
+import { PassThrough, Readable } from 'stream';
+
+const MergeSteam = (...streams) => {
+  let pass = new PassThrough();
+  let waiting = streams.length;
+  for (const stream of streams) {
+    pass = stream.pipe(pass, { end: false });
+    stream.once('end', () => waiting-- === 0 && pass.end());
+  }
+  return pass;
+};
 
 @Injectable()
 export class SceneService implements OnModuleInit {
@@ -174,18 +184,25 @@ export class SceneService implements OnModuleInit {
 
     // const steamTotalSize = metaData.length + steamStat.size;
 
-
     const steam = createReadStream(clipPath, {
       highWaterMark: chunk_size - block,
     });
+    const steam1 = createReadStream(clipPath);
+    const coordBuffSteam = Readable.from(coordBuff.toString());
+
+    const mergeSteam = MergeSteam(coordBuffSteam, steam1);
 
-    // const mergeSteam = MergeSteam(coordBuff, steam);
+    console.log('mergeSteam', mergeSteam);
 
     let steamByteLength = 0;
 
+    mergeSteam.on('data', (data: Buffer) => {
+      console.log('mergeSteam', data);
+    });
+
     steam.on('data', (data: Buffer) => {
       // console.log('data', data.byteLength);
-      console.log('steamStat-size', steamStat.size);
+      console.log('data-size', data);
       const blockBuffStart = Buffer.alloc(block);
 
       let packBuffer = Buffer.concat([blockBuffStart, data]);

+ 9 - 0
yarn.lock

@@ -1650,6 +1650,15 @@ bl@^4.0.3, bl@^4.1.0:
     inherits "^2.0.4"
     readable-stream "^3.4.0"
 
+bl@^5.0.0:
+  version "5.0.0"
+  resolved "https://registry.yarnpkg.com/bl/-/bl-5.0.0.tgz#6928804a41e9da9034868e1c50ca88f21f57aea2"
+  integrity sha512-8vxFNZ0pflFfi0WXA3WQXlj6CaMEwsmh63I1CNp0q+wWv8sD0ARx1KovSQd0l2GkwrMIOyedq0EF1FxI+RCZLQ==
+  dependencies:
+    buffer "^6.0.3"
+    inherits "^2.0.4"
+    readable-stream "^3.4.0"
+
 body-parser@1.19.2:
   version "1.19.2"
   resolved "https://registry.npmmirror.com/body-parser/-/body-parser-1.19.2.tgz"