Welcome to Vigges Developer Community - Open, Learning, Share
Welcome To Ask or Share your Answers For Others

rxjs - Build "Heartbeat" Observable from Unpredictable Source Observable

I have an Observable, source, that may emit items at unpredictable times. I'm trying to use it to build another Observable that reliably emits its values every 500ms.

Let's say that source emits values at these times:

  • 100ms - first item
  • 980ms - second item
  • 1020ms - third item
  • 1300ms - fourth item, etc.

I'd like to "smooth" this stream, so that I get outputs like:

  • 500ms - first item
  • 1000ms - second item
  • 1500ms - third item
  • 2000ms - fourth item

A naive approach might be to add a delay between emissions of source items, but that won't create the evenly spaced intervals I want.

I've tried various combinations of .timer(), .interval(), and .flatMap(), but haven't found anything promising yet.

Question from: https://stackoverflow.com/questions/65856535/build-heartbeat-observable-from-unpredictable-source-observable


1 Answer


For a source emitting faster than your interval

zip your source with an interval of the required time span.

import { interval, zip } from 'rxjs';
import { map } from 'rxjs/operators';

zip(source, interval(500)).pipe(
  map(([value, _]) => value) // only emit the source value
)


zip emits the 1st item from source with the 1st item from interval, then the 2nd item from source with the 2nd item from interval and so on. If the output observable should only emit when interval emits, the Nth value from source has to arrive before the Nth value from interval.

Potential problem: if your source emits more slowly than interval at some point (i.e. the Nth value from source arrives after the Nth value from interval), zip emits as soon as that source value arrives instead of waiting for the next interval tick.

// the 5th/6th values from source arrive after the 5th/6th values from interval
                                              v    v
source:       -1--------2-3---4---------------5----6-----
interval:     -----1-----2-----3-----4-----5-----6-----7-
zip output:   -----1-----2-----3-----4--------5----6-----
                   ^     ^     ^     ^        x    x
// emissions 5 and 6 (marked x) don't happen when interval emits
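
The timing rule described above can be checked without running any observables. The following is a minimal sketch, not RxJS code: zipEmissionTimes is a hypothetical helper that assumes the source emission times are given in ascending order (in ms) and that interval(period) emits its Nth value at N * period.

```typescript
// Hypothetical helper (not part of RxJS): predicts when
// zip(source, interval(period)) emits, given the source emission times in ms.
function zipEmissionTimes(sourceTimes: number[], period: number): number[] {
  return sourceTimes.map((arrival, index) => {
    // Assumption: interval(period) emits its Nth value at N * period.
    const tick = (index + 1) * period;
    // zip pairs the Nth values of both inputs, so it emits at the later of the two.
    return Math.max(arrival, tick);
  });
}

// Source from the question: every value arrives before its matching tick.
const zipOnGrid = zipEmissionTimes([100, 980, 1020, 1300], 500);
// -> [500, 1000, 1500, 2000]

// A source that falls behind: the 2nd value arrives after the 2nd tick (1000ms).
const zipOffGrid = zipEmissionTimes([100, 2700], 500);
// -> [500, 2700]
```

The second result shows the problem: 2700 is not a multiple of 500, so the output has left the 500ms grid.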

For a source emitting at any rate

import { defer, interval, MonoTypeOperatorFunction, Observable } from 'rxjs';
import { filter, map, scan, takeWhile, tap, withLatestFrom } from 'rxjs/operators';

function emitOnInterval<T>(period: number): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) =>
    defer(() => {
      let sourceCompleted = false;
      const queue = source.pipe(
        tap({ complete: () => (sourceCompleted = true) }),
        scan((acc, curr) => (acc.push(curr), acc), [] as T[]) // collect all values in a buffer
      );
      return interval(period).pipe(
        withLatestFrom(queue), // combine with the latest buffer
        takeWhile(([_, buffer]) => !sourceCompleted || buffer.length > 0), // complete once the source has completed and the buffer is empty
        filter(([_, buffer]) => buffer.length > 0), // only emit if there is at least one value in the buffer
        map(([_, buffer]) => buffer.shift() as T) // take the oldest value from the buffer
      );
    });
}

source.pipe(
  emitOnInterval(500)
)

// the 5th/6th values from source arrive after the 5th/6th values from interval
                                              v    v
source:       -1--------2-3---4---------------5----6-----
interval:     -----1-----2-----3-----4-----5-----6-----7-
output:       -----1-----2-----3-----4-----------5-----6-
                   ^     ^     ^     ^           ^     ^
// all output emissions happen when interval emits
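
The same kind of arithmetic check works for the queue-based approach. This is a sketch under stated assumptions, not the operator itself: tickEmissionTimes is a hypothetical helper that assumes ascending source times in ms, ticks at every multiple of period starting at period, at most one value released per tick, and that a value arriving exactly on a tick is released on that same tick (in real RxJS that tie depends on scheduling order).

```typescript
// Hypothetical helper (not part of RxJS): predicts when a FIFO buffer that
// releases at most one value per tick emits each source value.
function tickEmissionTimes(sourceTimes: number[], period: number): number[] {
  const result: number[] = [];
  let previousEmission = 0; // time of the last released value (0 = none yet)
  for (const arrival of sourceTimes) {
    // First tick at or after the arrival; the earliest possible tick is `period`.
    const firstTick = Math.max(1, Math.ceil(arrival / period)) * period;
    // Values queue up, so each one must also wait one tick after its predecessor.
    const emitAt = Math.max(firstTick, previousEmission + period);
    result.push(emitAt);
    previousEmission = emitAt;
  }
  return result;
}

// Source from the question.
const tickOnGrid = tickEmissionTimes([100, 980, 1020, 1300], 500);
// -> [500, 1000, 1500, 2000]

// A source that falls behind: the late value waits for the next tick.
const tickLate = tickEmissionTimes([100, 2700], 500);
// -> [500, 3000]
```

Unlike the zip version, the late value is held until 3000ms, so every predicted emission is a multiple of the period.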

https://stackblitz.com/edit/rxjs-qdlktm?file=index.ts

