Pausable Observable

There are many times when we might need to “pause” a live stream of data.

For example, your app is receiving live updates to a screen and then the user opens some sort of popup. We might choose to pause the live updates until the popup is closed. Once the popup closes, we need to apply the updates that have arrived in the mean time.

Often, it isn’t possible to actually pause the source of the data, such as a server. We also wouldn’t want to unsubscribe temporarily because updates could be missed.

Here is an extension method that takes an RX stream and makes it pausable. It is controlled by an observable which indicates whether the stream is paused or not. Updates are buffered whilst the stream is paused. When the stream is unpaused, buffered updates are pumped.

PausableObservable: https://gist.github.com/4317938
Unit tests: https://gist.github.com/4317948

public static IObservable<T> Pausable<T>(
           this IObservable<T> sourceStream,
           IObservable<bool> isPausedStream,
           bool startPaused = false)
        {
            return Observable.Create<T>(o =>
            {
                var subscriptions = new SerialDisposable();
                var replaySubjects = new SerialDisposable();
 
                var subscription = sourceStream.Publish(stream =>
                {
                    Func<ReplaySubject<T>> replaySubjectFactory = () =>
                    {
                        var rs = new ReplaySubject<T>();
 
                        replaySubjects.Disposable = rs;
                        subscriptions.Disposable = stream.Subscribe(rs);
 
                        return rs;
                    };
 
                    var replaySubject = replaySubjectFactory();
 
                    Func<bool, IObservable<T>> switcher = isPaused =>
                    {
                        if (isPaused)
                        {
                            replaySubject = replaySubjectFactory();
 
                            return Observable.Empty<T>();
                        }
                        else
                        {
                            return replaySubject.Concat(stream);
                        }
                    };
 
                    return isPausedStream
                        .StartWith(startPaused)
                        .DistinctUntilChanged()
                        .Select(switcher)
                        .Switch();
                }).Subscribe(o);
 
                return new CompositeDisposable(subscription, subscriptions);
            });
        }

This is based on an answer to a question on StackOverflow. However, there was a small issue which I had to fix – and I also needed to be able to specify in which state the pausable stream started.

Advertisements

Observable Cache

Source Code: https://gist.github.com/4115023

Given: An async operation that returns a value e.g web/service call or complex calculation
When: I request the value multiple times
Then: I only perform the operation once and cache the value

When: I haven’t requested the value
Then: Don’t perform the operation

When: I clear the cached value
Then: requesting the value again performs the async operation again

My current solution:


    public class Cache<T> : IDisposable
    {
        private readonly IConnectableObservable<T> _source;
        private IDisposable _connection;

        public Cache(IObservable<T> source)
        {
            _source = Observable.Create<T>(o => source.Subscribe(o))
                                .Replay(1, Scheduler.Immediate);
        }

        public IObservable<T> GetValue()
        {
            return Observable.Create<T>(o =>
                                         {
                                             var subscription = _source.Subscribe(o);
                                             _connection = _source.Connect();
                                             return subscription;
                                         });
        }

        public void Clear()
        {
            if (_connection != null)
            {
                _connection.Dispose();
                _connection = null;
            }
        }

        public void Dispose()
        {
            Clear();
        }
    }

Neat Disposal Pattern

You know when you’ve been a c# developer too long when seeing a slightly neater and elegant disposal pattern gets you excited.

If you’ve been doing things properly, then you’ll already have components that are IDisposable because they need to release references (to aid garbage collection), unhook event handlers, dispose RX subscriptions ¬†and clean up after themselves.

    public class MyComponent : IDisposable
    {
        // ...

        public void Dispose()
        {
            // Clean up component
        }
    }

Old School

Quite often, you’ll have sub-components that are also IDisposable and you need to make sure you dispose of those when you are disposed (presuming you’re responsible for their lifetime).

You could just fill your Dispose () method with individual calls to each of the IDisposables you hold. Straightforward if a little verbose.

    public class MyComponent : IDisposable
    {
        private ISubComponentA _a;
        private ISubComponentB _b;
        private ISubComponentC _c;

        public MyComponent(
            ISubComponentA a, 
            ISubComponentB b, 
            ISubComponentC c)
        {
            _a = a;
            _b = b;
            _c = c;
        }

        public void Dispose()
        {
            // Clean up component
            _a.Dispose();
            _b.Dispose();
            _c.Dispose();
        }
    }

Middle School

Or you might hold a List and add to it everything that needs disposing. Then in Dispose () you can loop over the list and dispose of everything. A bit neater.

    public class MyComponent : IDisposable
    {
        private List<IDisposable> _disposables = new List<IDisposable>(); 
        
        private ISubComponentA _a;
        private ISubComponentB _b;
        private ISubComponentC _c;

        public MyComponent(
            ISubComponentA a,
            ISubComponentB b,
            ISubComponentC c)
        {
            _a = a;
            _b = b;
            _c = c;

            _disposables.Add(_a);
            _disposables.Add(_b);
            _disposables.Add(_c);
        }

        public void Dispose()
        {
            // Clean up component
            _disposables.ForEach(d => d.Dispose());
            _disposables.Clear();
        }
    }

College

If you’re familiar with RX then you might have come across the CompositeDisposable to which you add IDisposables and then at some point simply dispose the CompositeDisposable. It takes care of disposing its contents.

using System.Reactive.Disposables;

    public class MyComponent : IDisposable
    {
        CompositeDisposable _compositeDisposable = new CompositeDisposable();

        private ISubComponentA _a;
        private ISubComponentB _b;
        private ISubComponentC _c;

        public MyComponent(
            ISubComponentA a,
            ISubComponentB b,
            ISubComponentC c)
        {
            _a = a;
            _b = b;
            _c = c;

            _compositeDisposable.Add(_a);
            _compositeDisposable.Add(_b);
            _compositeDisposable.Add(_c);
        }

        public void Dispose()
        {
            // Clean up component
            _compositeDisposable.Dispose();
        }
    }

University?

So the final neatening up that I saw recently was this.

You have a base class called DisposableObject. It has a method called AddDisposable(IDisposable x) which adds the x to a private CompositeDisposable.

DisposableObject then disposes of its CompositeDisposable in its Dispose method.

    public interface IDisposableObject: IDisposable
    {
        void AddDisposable(IDisposable disposableObject);
    }

    public class DisposableObject : IDisposableObject
    {
        CompositeDisposable _compositeDisposable = new CompositeDisposable();

        public void AddDisposable(IDisposable disposableObject)
        {
            _compositeDisposable.Add(disposableObject);
        }

        public virtual void Dispose()
        {
            _compositeDisposable.Dispose();
        }
    }

You then derive from DisposableObject and the idea is to pass any objects that we need disposing to the base class.

But here’s the neat bit.

Rather than calling the base class’ AddDisposable method directly, write an extension method that looks like this:

    public static class DisposableExtensions
    {
        public static T DisposeWith<T>(this T source, IDisposableObject target) where T : IDisposable
        {
            target.AddDisposable(source);
            return source;
        }
    }

This means that now the code can look like this:

    public class MyComponent : DisposableObject
    {
        private ISubComponentA _a;
        private ISubComponentB _b;
        private ISubComponentC _c;

        public MyComponent(
            ISubComponentA a,
            ISubComponentB b,
            ISubComponentC c)
        {
            _a = a;
            _b = b;
            _c = c;

            _a.DisposeWith(this);
            _b.DisposeWith(this);
            _c.DisposeWith(this);
        }
    }

Very readable and very neat and tidy.

It means that when you make an RX subscription, instead of storing that subscription in a member variable and remembering to dispose of it later, you can do this:

            IObservable<Trade> trades = _tradeService.GetTrades();

            var subscription = trades
                .Where(t => t.TradedAmount > 1000000000.00)
                .Subscribe(HandleLargeTrade)
                .DisposeWith(this);

And I thought that that was rather nice.

My latest thoughts are whether there is a neat way to null the references too. Hmm…