Pausable Observable

There are many times when we might need to “pause” a live stream of data.

For example, suppose an app is showing live updates on screen and the user opens a popup. We might choose to pause the live updates until the popup is closed. Once it closes, we need to apply the updates that arrived in the meantime.

Often, it isn’t possible to actually pause the source of the data, such as a server. We also wouldn’t want to unsubscribe temporarily because updates could be missed.

Here is an extension method that takes an Rx stream and makes it pausable. It is controlled by an observable of bool that indicates whether the stream is currently paused. Updates are buffered while the stream is paused. When the stream is unpaused, the buffered updates are emitted in their original order, followed by live updates as they arrive.

PausableObservable: https://gist.github.com/4317938
Unit tests: https://gist.github.com/4317948

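using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// The class name is illustrative; an extension method must live in a static class.
public static class PausableExtensions
{
    public static IObservable<T> Pausable<T>(
        this IObservable<T> sourceStream,
        IObservable<bool> isPausedStream,
        bool startPaused = false)
    {
        return Observable.Create<T>(o =>
        {
            // Holds the subscription that feeds the current buffer.
            var subscriptions = new SerialDisposable();
            // Holds the current buffer; assigning a new one disposes the old one.
            var replaySubjects = new SerialDisposable();

            // Publish shares a single subscription to the source.
            var subscription = sourceStream.Publish(stream =>
            {
                // Starts a fresh buffer that records every update from now on.
                Func<ReplaySubject<T>> replaySubjectFactory = () =>
                {
                    var rs = new ReplaySubject<T>();

                    replaySubjects.Disposable = rs;
                    subscriptions.Disposable = stream.Subscribe(rs);

                    return rs;
                };

                var replaySubject = replaySubjectFactory();

                Func<bool, IObservable<T>> switcher = isPaused =>
                {
                    if (isPaused)
                    {
                        // Paused: start a new buffer and emit nothing.
                        replaySubject = replaySubjectFactory();

                        return Observable.Empty<T>();
                    }
                    else
                    {
                        // Unpaused: replay the buffer; the subject stays
                        // subscribed, so live updates keep flowing through it.
                        return replaySubject.Concat(stream);
                    }
                };

                return isPausedStream
                    .StartWith(startPaused)
                    .DistinctUntilChanged()
                    .Select(switcher)
                    .Switch();
            }).Subscribe(o);

            return new CompositeDisposable(subscription, subscriptions);
        });
    }
}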
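
To see the buffering in action, here is a minimal usage sketch. It assumes the System.Reactive package is referenced and that the Pausable extension method above is compiled into the same project. The PausableDemo class, its Run method and the variable names are made up for illustration. Two Subject instances stand in for the real data source and the pause signal.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Hypothetical demo class; relies on the Pausable extension method shown above.
public static class PausableDemo
{
    public static List<int> Run()
    {
        var source = new Subject<int>();     // stands in for the live data
        var isPaused = new Subject<bool>();  // true = paused, false = running
        var received = new List<int>();

        // startPaused defaults to false, so the stream starts out running.
        using (source.Pausable(isPaused).Subscribe(received.Add))
        {
            source.OnNext(1);        // running: delivered straight away
            isPaused.OnNext(true);   // pause
            source.OnNext(2);        // buffered
            source.OnNext(3);        // buffered
            isPaused.OnNext(false);  // unpause: 2 and 3 are flushed in order
            source.OnNext(4);        // running again: delivered straight away
        }

        return received;
    }

    public static void Main()
    {
        // Should print: 1, 2, 3, 4
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Nothing is lost or reordered across the pause: the subscriber simply receives 2 and 3 later than it otherwise would. Passing startPaused: true instead would buffer everything until the first false arrives on the pause stream.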

This is based on an answer to a question on Stack Overflow. However, there was a small issue that I had to fix, and I also needed to be able to specify the state in which the pausable stream starts (the startPaused parameter).
