delay-queue.ts 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. import {
  2. concat,
  3. empty,
  4. of,
  5. Subject,
  6. Subscription,
  7. timer,
  8. EMPTY,
  9. BehaviorSubject,
  10. } from 'rxjs';
  11. import {
  12. concatMap,
  13. ignoreElements,
  14. startWith,
  15. switchMap,
  16. } from 'rxjs/operators';
  17. import RxQueue from '../rx-queue';
  18. /**
  19. * DelayQueue passes all the items through and adds a delay between consecutive items.
  20. * T: item type
  21. */
  export class DelayQueue<T = unknown> extends RxQueue<T> {
    /**
     * Subscription that drains `subject` into the base queue.
     * Unset until `initQueue()` has run.
     */
    private subscription?: Subscription;

    /** Input stream: every item given to `next()` is pushed here before being paced. */
    public subject: Subject<T>;

    /**
     * @param period delay inserted after each item, in milliseconds
     */
    constructor(
      period?: number, // milliseconds
    ) {
      super(period);
      this.subject = new Subject<T>();
      this.initQueue();
    }

    /**
     * (Re)builds the pipeline that forwards items from `subject` to the base
     * queue, emitting each item immediately and then holding the next one back
     * for `this.period` milliseconds.
     *
     * Safe to call more than once: any previous pipeline subscription is
     * released first, so items are never forwarded twice.
     */
    initQueue(): void {
      // Defensive: keeps the method usable even if `subject` has not been set yet.
      if (!this.subject) {
        this.subject = new Subject<T>();
      }

      // Drop the old pipeline before creating a new one; otherwise both would
      // stay subscribed to `subject` and each item would be emitted twice.
      this.subscription?.unsubscribe();

      this.subscription = this.subject
        .pipe(
          concatMap((item) =>
            concat(
              of(item), // emit the item right away
              /**
               * Issue #71 - DelayQueue failed: behavior breaking change after RxJS from v6 to v7
               * https://github.com/huan/rx-queue/issues/71
               */
              // The timer only delays completion of this inner observable;
              // its own emission is discarded.
              timer(this.period).pipe(ignoreElements()),
            ),
          ),
        )
        .subscribe((item: T) => super.next(item));
    }

    /**
     * Enqueues an item. It is delivered through the base queue's `next()` once
     * all previously enqueued items and their delays have elapsed.
     */
    override next(item: T): void {
      this.subject.next(item);
    }

    /** Stops the delay pipeline, then unsubscribes the base queue. */
    override unsubscribe(): void {
      this.subscription?.unsubscribe();
      super.unsubscribe();
    }

    /**
     * Discards every item still waiting in the queue and starts over with a
     * fresh input subject and pipeline. The previous `subject` is completed.
     */
    override clean(): void {
      this.subscription?.unsubscribe();
      this.subscription = undefined;

      // Complete the old subject so anything subscribed to it directly is
      // notified, then swap in a new one (never leave the field null).
      this.subject.complete();
      this.subject = new Subject<T>();

      this.initQueue();
    }
  }

  export default DelayQueue;