Pausable Observable

There are many times when we might need to “pause” a live stream of data.

For example, your app is receiving live updates to a screen and then the user opens some sort of popup. We might choose to pause the live updates until the popup is closed. Once the popup closes, we need to apply the updates that have arrived in the mean time.

Often, it isn’t possible to actually pause the source of the data, such as a server. We also wouldn’t want to unsubscribe temporarily because updates could be missed.

Here is an extension method that takes an RX stream and makes it pausable. It is controlled by an observable which indicates whether the stream is paused or not. Updates are buffered whilst the stream is paused. When the stream is unpaused, buffered updates are pumped.

PausableObservable: https://gist.github.com/4317938
Unit tests: https://gist.github.com/4317948

public static IObservable<T> Pausable<T>(
           this IObservable<T> sourceStream,
           IObservable<bool> isPausedStream,
           bool startPaused = false)
        {
            return Observable.Create<T>(o =>
            {
                var subscriptions = new SerialDisposable();
                var replaySubjects = new SerialDisposable();
 
                var subscription = sourceStream.Publish(stream =>
                {
                    Func<ReplaySubject<T>> replaySubjectFactory = () =>
                    {
                        var rs = new ReplaySubject<T>();
 
                        replaySubjects.Disposable = rs;
                        subscriptions.Disposable = stream.Subscribe(rs);
 
                        return rs;
                    };
 
                    var replaySubject = replaySubjectFactory();
 
                    Func<bool, IObservable<T>> switcher = isPaused =>
                    {
                        if (isPaused)
                        {
                            replaySubject = replaySubjectFactory();
 
                            return Observable.Empty<T>();
                        }
                        else
                        {
                            return replaySubject.Concat(stream);
                        }
                    };
 
                    return isPausedStream
                        .StartWith(startPaused)
                        .DistinctUntilChanged()
                        .Select(switcher)
                        .Switch();
                }).Subscribe(o);
 
                return new CompositeDisposable(subscription, subscriptions);
            });
        }

This is based on an answer to a question on StackOverflow. However, there was a small issue which I had to fix – and I also needed to be able to specify in which state the pausable stream started.

Advertisements

Observable Cache

Source Code: https://gist.github.com/4115023

Given: An async operation that returns a value e.g web/service call or complex calculation
When: I request the value multiple times
Then: I only perform the operation once and cache the value

When: I haven’t requested the value
Then: Don’t perform the operation

When: I clear the cached value
Then: requesting the value again performs the async operation again

My current solution:


    public class Cache<T> : IDisposable
    {
        private readonly IConnectableObservable<T> _source;
        private IDisposable _connection;

        public Cache(IObservable<T> source)
        {
            _source = Observable.Create<T>(o => source.Subscribe(o))
                                .Replay(1, Scheduler.Immediate);
        }

        public IObservable<T> GetValue()
        {
            return Observable.Create<T>(o =>
                                         {
                                             var subscription = _source.Subscribe(o);
                                             _connection = _source.Connect();
                                             return subscription;
                                         });
        }

        public void Clear()
        {
            if (_connection != null)
            {
                _connection.Dispose();
                _connection = null;
            }
        }

        public void Dispose()
        {
            Clear();
        }
    }

DateTime.MinValue – be careful!

We’ve just had a support call at work where the user was running our application in Singapore.

Our investigation uncovered an interesting “feature” of using DateTime.MinValue.

Have a look at the following test:

namespace Stuff
{
    using System;
    using NUnit.Framework;

    [TestFixture]
    public class MinDateTimeOffset
    {
        [Test]
        public void Testing_MinValue_in_DateTime()
        {
            Action<string> p = Console.WriteLine;

            // This is Now in the current timezone
            var nowOffset = new DateTimeOffset(DateTime.Now);
            p("Current time:");
            p("- current timezone: " + nowOffset);
            p("- UTC equivilent time: " + nowOffset.ToUniversalTime());
            p("");

            // This is the minimum time allowed in the current timezone
            var minOffset = DateTimeOffset.MinValue;
            p("DateTimeOffset.MinValue:");
            p("- current timezone: " + minOffset);
            p("- UTC equivilent time: " + minOffset.ToUniversalTime());
            p("");

            // This is UTC minimum time
            var minValue = DateTime.MinValue;
            p("DateTime.MinValue:");
            p("- current timezone: " + minValue);
            p("- UTC equivilent time: " + minValue.ToUniversalTime());
            p("");

            // Converted to the current timezone
            try
            {
                p("Trying to cast DateTime.Min to DateTimeOffset:");

                var minDateTimeAsDateTimeOffset = (DateTimeOffset)minValue;
                p("- current timezone: " + minDateTimeAsDateTimeOffset);
                p("- UTC equivilent time: " + minDateTimeAsDateTimeOffset.ToUniversalTime());
            }
            catch (Exception ex)
            {
                p("Error: " + ex.Message);
            }
        }
    }
}

We are based in London, and when we run it, we see this output:


Current time:
- current timezone: 31/10/2012 11:46:35 +00:00
- UTC equivilent time: 31/10/2012 11:46:35 +00:00

DateTimeOffset.MinValue:
- current timezone: 01/01/0001 00:00:00 +00:00
- UTC equivilent time: 01/01/0001 00:00:00 +00:00

DateTime.MinValue:
- current timezone: 01/01/0001 00:00:00
- UTC equivilent time: 01/01/0001 00:00:00

Trying to cast DateTime.Min to DateTimeOffset:
- current timezone: 01/01/0001 00:00:00 +00:00
- UTC equivilent time: 01/01/0001 00:00:00 +00:00

However, when run in Singapore, you get this:

Current time:
- current timezone: 31/10/2012 19:47:38 +08:00
- UTC equivilent time: 31/10/2012 11:47:38 +00:00

DateTimeOffset.MinValue:
- current timezone: 01/01/0001 00:00:00 +00:00
- UTC equivilent time: 01/01/0001 00:00:00 +00:00

DateTime.MinValue:
- current timezone: 01/01/0001 00:00:00
- UTC equivilent time: 01/01/0001 00:00:00

Trying to cast DateTime.Min to DateTimeOffset:
Error: The UTC time represented when the offset is applied must be between year 0 and 10,000.
Parameter name: offset

In New York, you get this :

Current time:
- current timezone: 31/10/2012 07:48:50 -04:00
- UTC equivilent time: 31/10/2012 11:48:50 +00:00

DateTimeOffset.MinValue:
- current timezone: 01/01/0001 00:00:00 +00:00
- UTC equivilent time: 01/01/0001 00:00:00 +00:00

DateTime.MinValue:
- current timezone: 01/01/0001 00:00:00
- UTC equivilent time: 01/01/0001 05:00:00

Trying to cast DateTime.Min to DateTimeOffset:
- current timezone: 01/01/0001 00:00:00 -05:00
- UTC equivilent time: 01/01/0001 05:00:00 +00:00

The value of DateTime.MinValue can not be cast to a DateTimeOffset if you are east of London!

C# to Javascript?

In a recent conversation with friends at work, we were talking about the recent emergence of TypeScript.

We’re all primarily C# .Net developers and we were saying that instead of having to learn a new language, wouldn’t it be neat if there was a way to compile C# to Javascript or run .Net IL on a Javascript implementation of the .Net VM?

Well, would you believe it? Miguel de Icaza investigated this only last month:

Miguel de Icaza: 2012 Update: Running C# on the Browser

It’s all about TypeScript

I’ve heard several of my friends talking about a new pretender on the block called TypeScript. It’s a language that sits as a superset of  JavaScript, adding static typing, interfaces, classes and more.

As a C# .Net developer, I was very interested to read about what it’s like from a similar point of view. Thankfully, Dave Hanson (@LordHanson) has blogged about a little experimental app he had a go with. Check it out here:

“Cracking a problem with TypeScript”

It looks we all (.Net devs) might be heading in that direction and Microsoft realise that it’ll need to bridge the gap. Looks like a good start. I’m almost excited about coding for the browser again.

Owned without Owned<> – Part 1

Our neverending quest to make our code neater and tidier continues.

“Given a screen in our application, when the screen closes, make sure everything has been disposed.”

Seems reasonable.

When using a DI container such as Autofac, it seems like a good idea to make the most of its powerful features. If we create the screen is its own child container / lifetime scope, then disposing of that child container disposes the screen and all other objects that it resolved (or that they resolved and so on).

This works very nicely and viewing the app under the memory profiler magnifying glass confirms that everything is being cleared away nicely.

Owned<T>

The syntax for creating the Child Container in AutoFac is by injecting the object you want wrapped (for example the screen) inside an Owned<T>. For more details, see the AutoFac Documentation or Nicholas Blumhardt’s “An Autofac Lifetime Primer”.

Whilst Owned is without doubt powerful and useful, it does has the slight drawback that the fact that an object is Owned can leak out of your component.

So for example, let’s say you are using a strategy pattern to create different Screens, like this:


// Our example Strategies look like this:
public interface IStrategy
{
     IScreen GetScreen();
}

// Here's a strategy that returns Red Screens
public class RedStrategy : IStrategy
{
    private Func<RedScreen> _redScreenFactory;

    // Inject our factory
    public RedStrategy(Func<RedScreen> redScreenFactory)
    {
        _redScreenFactory = redScreenFactory;
    }

    public IScreen GetScreen()
    {
        return _redScreenFactory();
    }
}

You can imagine having several different types of strategies returning, say, Blue and Green screens.

Anyway, let’s say that Red Screens create lots of children and are particularly prone to leaking memory. It would be nice to wrap them in an Owned.

However, what does our strategy return to the consumer?


public class RedStrategy : IStrategy
{
    // Our factory now creates Owned<RedScreen>
    private Func<Owned<RedScreen>> _redScreenFactory;

    // Inject our factory
    public RedStrategy(Func<Owned<RedScreen>> redScreenFactory)
    {
        _redScreenFactory = redScreenFactory;
    }

    public IScreen GetScreen()
    {
        var ownedRedScreen = _redScreenFactory();
        return // ...?
    }
}

If we return ownedRedScreen.Value then we are losing the reference to the child container. This means that we can’t dispose it and clean up all of the contents.

On the other hand, if we change the IStrategy interface to this:

Owned<IScreen> GetScreen()

…then we are forcing all IStrategy implementations to wrap their IScreens in a child container. We are also leaking the fact that they are Owned the consumer and it must now handle Owned<IScreen>.

(This is not to even mention that Owned<RedScreen> can’t be cast to Owned<IScreen> either).

I’ll explain a couple of solutions we came up with – one neat, and one very neat (imho of course :-))