Skip to content Skip to sidebar Skip to footer

Making A Lazy, Cached Observable That Only Execute The Source Once

I'm trying to use an rxjs observable to delegate, but share, a piece of expensive work across the lifetime of an application. Essentially, something like: var work$ = Observable.cr

Solution 1:

How does publishReplay() actually work

It internally creates a ReplaySubject and makes it multicast compatible. The minimal replay value of ReplaySubject is 1 emission. This results in the following:

  • First subscription will trigger the publishReplay(1) to internally subscribe to the source stream and pipe all emissions through the ReplaySubject, effectively caching the last n(=1) emissions
  • If a second subscription is started while the source is still active the multicast() will connect us to the same replaySubject and we will receive all next emissions until the source stream completes.
  • If a subscription is started after the source is already completed the replaySubject has cached the last n emissions and it will only receive those before completing.

const source = Rx.Observable.from([1,2])
  .mergeMap(i =>Rx.Observable.of('emission:'+i).delay(i * 100))
  .do(null,null,() =>console.log('source stream completed'))
  .publishReplay(1)
  .refCount();

// two subscriptions which are both in time before the stream completes
source.subscribe(val =>console.log(`sub1:${val}`), null, () =>console.log('sub1 completed'));
source.subscribe(val =>console.log(`sub2:${val}`), null, () =>console.log('sub2 completed'));

// new subscription after the stream has completed alreadysetTimeout(() => {
  source.subscribe(val =>console.log(`sub_late-to-the-party:${val}`), null, () =>console.log('sub_late-to-the-party completed'));
}, 500);
<scriptsrc="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>

Post a Comment for "Making A Lazy, Cached Observable That Only Execute The Source Once"