Selaa lähdekoodia

clean 优化clean方法

gemercheung 3 vuotta sitten
vanhempi
commit
37e35561ef

+ 89 - 0
src/queue/delay-queue/delay-queue copy.ts

@@ -0,0 +1,89 @@
+import {
+  concat,
+  empty,
+  of,
+  Subject,
+  Subscription,
+  timer,
+  EMPTY,
+  BehaviorSubject,
+} from 'rxjs';
+import {
+  concatMap,
+  ignoreElements,
+  startWith,
+  switchMap,
+} from 'rxjs/operators';
+
+import RxQueue from '../rx-queue';
+
+/**
+ * DelayQueue passes all the items and add delays between items.
+ * T: item type
+ */
+export class DelayQueue<T = unknown> extends RxQueue<T> {
+  private subscription: Subscription;
+  public subject: Subject<T>;
+
+  /**
+   *
+   * @param period milliseconds
+   */
+  constructor(
+    period?: number, // milliseconds
+  ) {
+    super(period);
+    this.subject = new Subject<T>();
+    this.initQueue();
+  }
+
+  initQueue(): void {
+    this.subscription = this.subject
+      .pipe(
+        concatMap((x) =>
+          concat(
+            of(x), // emit first item right away
+            /**
+             * Issue #71 - DelayQueue failed: behavior breaking change after RxJS from v6 to v7
+             *  https://github.com/huan/rx-queue/issues/71
+             */
+            timer(this.period).pipe(ignoreElements()),
+          ),
+        ),
+      )
+      .subscribe((item: T) => super.next(item));
+  }
+
+  override next(item: T) {
+    this.subject.next(item);
+  }
+
+  override unsubscribe() {
+    this.subscription.unsubscribe();
+    super.unsubscribe();
+  }
+
+  override clean(): void {
+    this.subscription.unsubscribe();
+    this.subject
+      .asObservable()
+      .pipe(switchMap(() => EMPTY))
+      .pipe(ignoreElements());
+    // .subscribe((item: T) => super.next(item));
+
+    this.initQueue();
+    // this.unsubscribe();
+    // this.subject.complete();
+    // this.initQueue();
+    // 2
+    // this.subject.pipe(switchMap(() => EMPTY));
+    // this.subject.pipe(startWith(true), ignoreElements());
+
+    // this.subscription = this.subject
+    //   .pipe(switchMap(() => EMPTY))
+    //   .subscribe((item: T) => super.next(item));
+    console.log('clean-DelayQueue-1', this.subject);
+  }
+}
+
+export default DelayQueue;

+ 10 - 14
src/queue/delay-queue/delay-queue.ts

@@ -38,6 +38,9 @@ export class DelayQueue<T = unknown> extends RxQueue<T> {
   }
 
   initQueue(): void {
+    if (!this.subject) {
+      this.subject = new Subject<T>();
+    }
     this.subscription = this.subject
       .pipe(
         concatMap((x) =>
@@ -65,23 +68,16 @@ export class DelayQueue<T = unknown> extends RxQueue<T> {
 
   override clean(): void {
     this.subscription.unsubscribe();
-    this.subject
-      .asObservable()
-      .pipe(switchMap(() => EMPTY))
-      .pipe(ignoreElements());
+    this.subject.complete();
+    this.subscription = null;
+    this.subject = null;
+    // this.subject
+    //   .asObservable()
+    //   .pipe(switchMap(() => EMPTY))
+    //   .pipe(ignoreElements());
     // .subscribe((item: T) => super.next(item));
 
     this.initQueue();
-    // this.unsubscribe();
-    // this.subject.complete();
-    // this.initQueue();
-    // 2
-    // this.subject.pipe(switchMap(() => EMPTY));
-    // this.subject.pipe(startWith(true), ignoreElements());
-
-    // this.subscription = this.subject
-    //   .pipe(switchMap(() => EMPTY))
-    //   .subscribe((item: T) => super.next(item));
     console.log('clean-DelayQueue-1', this.subject);
   }
 }

+ 2 - 7
src/scene/scene.service.ts

@@ -258,8 +258,6 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
    */
 
   async handleRotate(request) {
-    // this.roRequestQueueSub = this.roRequestQueue.subscribe(
-    //   async (request: RotateRequest) => {
     // try {
     if (this.firstRender) {
       if (!this.roQueueSubscription) {
@@ -268,7 +266,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
       let redisMeta: StreamReplyType;
       this.onRotating.next(true);
       const start = performance.now();
-      // 当move时处理 _rotateCount是移动端同时触发的问题
+      // 当move时处理 _rotateCount是移动端同时触发的问题,rotateStopThrottle是减少重复抖动stop的处理。
       if (
         this.onMoving.value &&
         this._rotateCount > 5 &&
@@ -276,9 +274,6 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
       ) {
         this.rotateStopThrottle = true;
         const lastStreamFrame = this.lastMoveStreamFrame.getValue();
-        // const lastMoveStreamFrameBk = this.lastMoveStreamFrameBk;
-        //TODO对比
-
         this.logger.log('lastStreamFrame', JSON.stringify(lastStreamFrame));
         // this.logger.log(
         //   'lastMoveStreamFrameBk',
@@ -302,7 +297,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
 
         const trace_id = metaData.traceIds[0];
         const userId = newUserStates.userId;
-        const breakPointId = metaData.endBreakPointId;
+        const breakPointId = metaData.endBreakPointId || metaData.breakPointId;
         const cameraAngle = newUserStates.playerState.camera.angle;
         const playerAngle = newUserStates.playerState.player.angle;
         this.logger.log(

+ 1 - 1
src/scene/stream/stream.service.ts

@@ -19,7 +19,7 @@ export class StreamService {
     clipPath: '',
     metaData: '',
   });
-  constructor(private cacheService: CacheService) {}
+  // constructor() {}
 
   setChannel(channel: DataChannel) {
     this.channel = channel;