gemercheung vor 3 Jahren
Ursprung
Commit
afa3b4a0e5

+ 1 - 1
src/queue/config.ts

@@ -1 +1 @@
-export { VERSION } from './version.js';
+export { VERSION } from './version';

+ 1 - 1
src/queue/debounce-queue/debounce-queue.ts

@@ -1,7 +1,7 @@
 import { interval, Subject, Subscription } from 'rxjs';
 import { debounce } from 'rxjs/operators';
 
-import RxQueue from '../rx-queue.js';
+import RxQueue from '../rx-queue';
 
 /**
  * DebounceQueue drops a item if there's another one comes in a period of time.

+ 1 - 1
src/queue/delay-queue/delay-queue-executor.ts

@@ -1,6 +1,6 @@
 import type { Subscription } from 'rxjs';
 
-import DelayQueue from './delay-queue.js';
+import DelayQueue from './delay-queue';
 
 export interface ExecutionUnit<T = unknown> {
   fn: () => T;

+ 13 - 6
src/queue/delay-queue/delay-queue.ts

@@ -1,7 +1,7 @@
-import { concat, of, Subject, Subscription, timer } from 'rxjs';
-import { concatMap, ignoreElements } from 'rxjs/operators';
+import { concat, empty, of, Subject, Subscription, timer, EMPTY } from 'rxjs';
+import { concatMap, ignoreElements, switchMap } from 'rxjs/operators';
 
-import RxQueue from '../rx-queue.js';
+import RxQueue from '../rx-queue';
 
 /**
  * DelayQueue passes all the items and add delays between items.
@@ -9,7 +9,7 @@ import RxQueue from '../rx-queue.js';
  */
 export class DelayQueue<T = unknown> extends RxQueue<T> {
   private subscription: Subscription;
-  private subject: Subject<T>;
+  public subject: Subject<T>;
 
   /**
    *
@@ -46,8 +46,15 @@ export class DelayQueue<T = unknown> extends RxQueue<T> {
     super.unsubscribe();
   }
 
-  clean() {
-    console.log('clean', this.subject);
+  override clean(): void {
+    // 1
+    // this.subject.pipe(ignoreElements());
+    // 2
+    this.subject.pipe(switchMap(() => EMPTY)).pipe(ignoreElements());
+    // this.subscription = this.subject
+    //   .pipe(switchMap(() => EMPTY))
+    //   .subscribe((item: T) => super.next(item));
+    console.log('clean-DelayQueue-1', this.subject);
   }
 }
 

+ 7 - 7
src/queue/mod.ts

@@ -1,9 +1,9 @@
-export { RxQueue } from './rx-queue.js';
+export { RxQueue } from './rx-queue';
 
-export { DelayQueueExecutor } from './delay-queue/delay-queue-executor.js';
-export { DebounceQueue } from './debounce-queue/debounce-queue.js';
-export { DelayQueue } from './delay-queue/delay-queue.js';
-export { ThrottleQueue } from './throttle-queue/throttle-queue.js';
-export { concurrencyExecuter } from './concurrency-executor/concurrency-executer.js';
+export { DelayQueueExecutor } from './delay-queue/delay-queue-executor';
+export { DebounceQueue } from './debounce-queue/debounce-queue';
+export { DelayQueue } from './delay-queue/delay-queue';
+export { ThrottleQueue } from './throttle-queue/throttle-queue';
+export { concurrencyExecuter } from './concurrency-executor/concurrency-executer';
 
-export { VERSION } from './version.js';
+export { VERSION } from './version';

+ 4 - 1
src/queue/rx-queue.ts

@@ -1,6 +1,6 @@
 import { PartialObserver, Subject, Subscription } from 'rxjs';
 
-import { VERSION } from './config.js';
+import { VERSION } from './config';
 
 // default set to 500 milliseconds
 const DEFAULT_PERIOD_TIME = 500;
@@ -49,6 +49,9 @@ export class RxQueue<T = unknown> extends Subject<T> {
   public version(): string {
     return VERSION;
   }
+
+  // eslint-disable-next-line @typescript-eslint/no-empty-function
+  public clean(): void {}
 }
 
 export default RxQueue;

+ 1 - 1
src/queue/throttle-queue/throttle-queue.ts

@@ -1,7 +1,7 @@
 import { interval, Subject, Subscription } from 'rxjs';
 import { throttle } from 'rxjs/operators';
 
-import RxQueue from '../rx-queue.js';
+import RxQueue from '../rx-queue';
 
 /**
  * ThrottleQueue

+ 12 - 7
src/scene/scene.service.ts

@@ -11,7 +11,8 @@ import { StreamService } from './stream/stream.service';
 // import { InjectQueue } from '@nestjs/bull';
 // import { Queue } from 'bull';
 import { RotateService } from 'src/rotate/rotate.service';
-import { DelayQueue, RxQueue, DebounceQueue } from 'rx-queue';
+// import { DelayQueue, RxQueue, DebounceQueue } from 'rx-queue';
+import { DelayQueue, RxQueue, DebounceQueue } from '../queue/mod';
 import { MoveService } from 'src/move/move.service';
 import { GetRouterService } from 'src/get-router/get-router.service';
 import { ConfigService } from '@nestjs/config';
@@ -355,6 +356,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     this.logger.log('walking-trace_id', request.trace_id);
     // 进入正常walking流程
     if (!this.onMoving.getValue()) {
+      console.log('walking-step-main-1', request.trace_id);
       this.latestWalkingRequest = null;
       this.handleWalking(request);
     }
@@ -362,13 +364,14 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
     if (!this.moveSliceLastFrameSub) {
       this.moveSliceLastFrameSub = this.moveSliceLastFrame.subscribe(
         async (frame: MovingLastUpdateType) => {
+          console.log('walkingStop-'+ this.latestWalkingRequest + ','+ this.onMoving.value);
           //TODO 正在行走时,有新的reqest
           if (this.latestWalkingRequest && this.onMoving.value) {
             this.logger.log('stop-data-1', frame);
             // this.moveQueue.complete();
             // this.moveQueue.of('');
             // TODO 中断move队列 ?优化如何清空
-            // this.moveQueue.pipe(ignoreElements());
+            this.moveQueue.clean();
             this.moveQueueSubscription.unsubscribe();
             this.moveQueueSubscription = null;
             //step1 执行stop方法
@@ -398,6 +401,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
             );
             this.logger.log('stop-redisMeta', redisMeta);
             // 2. 中断重新walking
+            console.log('walking-step-reWalking-1', request.trace_id);
             this.handleReWalking(this.latestWalkingRequest);
           }
         },
@@ -414,11 +418,12 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
   async handleWalking(request: MoveRequest): Promise<void> {
     try {
       // if (!this.onMoving.getValue()) {
+      console.log('walking-step-main-2', request.trace_id);
       const start = performance.now();
       this._rotateCount = 0;
       const user = this.moveService.users[this.user_id];
-      this.logger.log('进入1 - searchRoad');
-      this.logger.log('path-start' + user.breakPointId);
+      console.log('进入1 - searchRoad');
+      console.log('path-start' + user.breakPointId);
       const path = await this.getRouterService.searchRoad(
         user.appId,
         user.breakPointId,
@@ -426,7 +431,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
       );
       this.logger.log('walking-path', path);
       if (!path) {
-        this.logger.log('不存在--path', path);
+        console.log('不存在--path', path);
         this.resumeStream();
         return;
       }
@@ -535,7 +540,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
       const joystickRes = await this.moveService.seqExeJoystick(request);
       this.logger.log(
         'joystick-breakPointId:' +
-          this.moveService.users[this.user_id].breakPointId,
+        this.moveService.users[this.user_id].breakPointId,
       );
       // 有数据 [0]是rotate数据,[1-infinity]是walking数据
       this.logger.log('joystickRes-1', joystickRes);
@@ -783,7 +788,7 @@ export class SceneService implements OnModuleInit, OnModuleDestroy {
           }
         } else {
           const msg: RTCMessageRequest = JSON.parse(message);
-          console.log('msg.action_type:' + msg.action_type);
+          // console.log('msg.action_type:' + msg.action_type);
           switch (msg.action_type) {
             case ActionType.walk:
               const walk = msg as any as MoveRequest;