Making A Lazy, Cached Observable That Only Executes The Source Once
I'm trying to use an rxjs observable to delegate, but share, a piece of expensive work across the lifetime of an application. Essentially, something like: var work$ = Observable.cr
Solution 1:
How does publishReplay() actually work?
It internally creates a ReplaySubject and makes it multicast-compatible. The minimal replay value of a ReplaySubject is 1 emission. This results in the following:
- The first subscription triggers publishReplay(1) to subscribe internally to the source stream and pipe all emissions through the ReplaySubject, effectively caching the last n (= 1) emissions.
- If a second subscription starts while the source is still active, multicast() connects it to the same ReplaySubject, and it receives all subsequent emissions until the source stream completes.
- If a subscription starts after the source has already completed, the ReplaySubject has cached the last n emissions, and the late subscriber receives only those before completing.
const source = Rx.Observable.from([1, 2])
  .mergeMap(i => Rx.Observable.of('emission:' + i).delay(i * 100))
  .do(null, null, () => console.log('source stream completed'))
  .publishReplay(1)
  .refCount();
// two subscriptions which are both in time before the stream completes
source.subscribe(val => console.log(`sub1:${val}`), null, () => console.log('sub1 completed'));
source.subscribe(val => console.log(`sub2:${val}`), null, () => console.log('sub2 completed'));
// new subscription after the stream has completed already
setTimeout(() => {
  source.subscribe(val => console.log(`sub_late-to-the-party:${val}`), null, () => console.log('sub_late-to-the-party completed'));
}, 500);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>
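To make the caching behaviour described in the answer more concrete, here is a rough, dependency-free sketch of the same idea in plain JavaScript. It is not how RxJS implements publishReplay(); lazyCached, sink and observer are hypothetical names invented for this illustration, the replay buffer is fixed at one value, and unsubscription (the part refCount() normally handles) is left out entirely.

```javascript
// Hypothetical helper, NOT part of RxJS: runs `producer` at most once, on the
// first subscription, and replays the last value to every later subscriber.
function lazyCached(producer) {
  let started = false;    // has the source been executed yet?
  let completed = false;  // has the source signalled completion?
  let hasLast = false;    // is there anything in the replay buffer?
  let last;               // replay buffer of size 1
  const observers = new Set();

  // What the producer uses to push values to all current subscribers.
  const sink = {
    next(value) {
      hasLast = true;
      last = value;
      observers.forEach(o => o.next(value));
    },
    complete() {
      completed = true;
      observers.forEach(o => o.complete());
      observers.clear();
    }
  };

  return {
    subscribe(observer) {
      if (hasLast) observer.next(last);  // replay the cached emission
      if (completed) {                   // late subscriber: replay, then complete
        observer.complete();
        return;
      }
      observers.add(observer);
      if (!started) {                    // first subscriber triggers the work
        started = true;
        producer(sink);
      }
    }
  };
}

// Usage: values are pushed by hand so the ordering is easy to follow.
let runs = 0;
let emit;
const work = lazyCached(sink => { runs += 1; emit = sink; });

const log = [];
const observer = name => ({
  next: v => log.push(`${name}:${v}`),
  complete: () => log.push(`${name} completed`)
});

work.subscribe(observer('sub1'));  // starts the source
emit.next('emission:1');
work.subscribe(observer('sub2'));  // source still active: cached value, then live ones
emit.next('emission:2');
emit.complete();
work.subscribe(observer('late'));  // source already completed: cached value only

console.log(runs); // 1
console.log(log);
```

The producer runs only once no matter how many subscribers arrive, and the late subscriber sees just the last cached emission followed by completion, which mirrors the three cases listed in the answer.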