Mind your subscriptions in your IObservable extensions

When creating new Rx operators that extend IObservable, it’s good practice to make sure you subscribe to the source only once. Users of your operator have a right to expect that each subscription to your operator causes exactly one subscription to the source. This becomes crucial if the source is cold: every extra subscription re-runs whatever work the source does on subscribe, which might cause unintended side effects.
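To make the cold-source concern concrete, here is a minimal sketch. The class and member names (ColdSourceDemo, SideEffects, ColdSource, SubscribeTwice) are made up for illustration, and the counter is only a stand-in for a real side effect such as opening a connection.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical demo class; none of these names come from Rx itself.
public static class ColdSourceDemo
{
    // Counts how many times the cold source's setup logic has run.
    public static int SideEffects;

    // A cold observable: Defer runs the factory once per subscription.
    public static IObservable<int> ColdSource() =>
        Observable.Defer(() =>
        {
            SideEffects++;                 // stand-in for a real side effect
            return Observable.Return(42);
        });

    // Subscribes twice to the same cold source and reports the side-effect count.
    public static int SubscribeTwice()
    {
        SideEffects = 0;
        var source = ColdSource();
        using (source.Subscribe(_ => { }))
        using (source.Subscribe(_ => { }))
        {
            return SideEffects;            // 2: the factory ran once per subscription
        }
    }
}
```

An operator that quietly subscribes to its source twice puts its callers in exactly this position, without them having asked for it.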

Here is an example (somewhat contrived for simplicity). Let’s say you wanted to have a running count of the number of OnNext()s that have occurred in a source stream within the previous 5 seconds. You might write this as follows:

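// Needs: using System; using System.Reactive.Linq; and an enclosing static class.
public static IObservable<int> WindowCount<T>(
    this IObservable<T> source,
    TimeSpan interval)
{
    const int onNextEntersWindow = 1;
    const int onNextExitsWindow = -1;

    return Observable.Create<int>(observer =>
    {
        return Observable
            // First use of source: +1 as each event enters the window.
            .Merge(source.Select(_ => onNextEntersWindow),
                   // Second use of source: -1 once the event is older than interval.
                   source.Select(_ => onNextExitsWindow)
                         .Delay(interval))
            // Running total of the events currently inside the window.
            .Scan((countInWindow, onNextEvent) =>
                countInWindow + onNextEvent)
            .Subscribe(observer);
    });
}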

This code projects the source stream to TWO separate streams and merges them. The first is a stream of 1s; the second is a stream of -1s delayed by the interval. Scan then adds these together to give a real-time running total. The effect is that, for each event in the source, we get a count of the number of events that have occurred within the preceding interval, and the count is emitted again each time an event drops out of the window. This is a useful idea for a number of advanced operators… but I digress.
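The arithmetic is easy to check by hand without Rx at all. The sketch below is a plain LINQ simulation rather than the operator itself. The names WindowCountArithmetic and RunningCounts are hypothetical, and times are given as seconds for simplicity.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper that mimics the merged +1/-1 stream with ordinary collections.
public static class WindowCountArithmetic
{
    public static List<int> RunningCounts(IEnumerable<double> eventTimes, double interval)
    {
        var times = eventTimes.ToList();

        // Each event contributes +1 at its own time and -1 one interval later.
        var deltas = times.Select(t => (Time: t, Delta: 1))
            .Concat(times.Select(t => (Time: t + interval, Delta: -1)))
            .OrderBy(e => e.Time);   // OrderBy is stable, so on ties the +1 entries stay first

        // Equivalent of Scan: accumulate the deltas and record every intermediate total.
        var counts = new List<int>();
        var running = 0;
        foreach (var e in deltas)
        {
            running += e.Delta;
            counts.Add(running);
        }
        return counts;
    }
}
```

For events at 0, 1 and 2 seconds with a 5 second interval, RunningCounts returns 1, 2, 3, 2, 1, 0: the count climbs as events arrive and falls as each one ages out at 5, 6 and 7 seconds.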

The key point is that this operation creates TWO subscriptions to the source – one for each stream. The fix is straightforward – you simply publish the source and subscribe to the publication as many times as you need. Effectively you are working with a version of the source stream guaranteed to be hot. The fix looks like this:

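// Needs: using System; using System.Reactive.Linq; and an enclosing static class.
public static IObservable<int> WindowCount<T>(
    this IObservable<T> source,
    TimeSpan interval)
{
    const int onNextEntersWindow = 1;
    const int onNextExitsWindow = -1;

    return Observable.Create<int>(observer =>
    {
        // Both branches below share one underlying subscription to source.
        var hotSource = source.Publish().RefCount();

        return Observable
            // +1 as each event enters the window.
            .Merge(hotSource.Select(_ => onNextEntersWindow),
                   // -1 once the event is older than interval.
                   hotSource.Select(_ => onNextExitsWindow)
                            .Delay(interval))
            // Running total of the events currently inside the window.
            .Scan((countInWindow, onNextEvent) =>
                countInWindow + onNextEvent)
            .Subscribe(observer);
    });
}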
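If you want to check the difference rather than take it on trust, one option is to wrap the source in a counter. The following is a sketch under my own naming: SubscriptionCountDemo, NaiveWindowCount, PublishedWindowCount and CountSourceSubscriptions are all hypothetical, and the two operators are condensed copies of the versions above so the block stands alone.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical test harness; the operator names are illustrative only.
public static class SubscriptionCountDemo
{
    // Condensed copy of the first version: source appears twice in the query.
    public static IObservable<int> NaiveWindowCount<T>(
        this IObservable<T> source, TimeSpan interval) =>
        Observable.Merge(
                source.Select(_ => 1),
                source.Select(_ => -1).Delay(interval))
            .Scan((count, delta) => count + delta);

    // Condensed copy of the fixed version: both branches share a published source.
    public static IObservable<int> PublishedWindowCount<T>(
        this IObservable<T> source, TimeSpan interval) =>
        Observable.Create<int>(observer =>
        {
            var hotSource = source.Publish().RefCount();
            return Observable.Merge(
                    hotSource.Select(_ => 1),
                    hotSource.Select(_ => -1).Delay(interval))
                .Scan((count, delta) => count + delta)
                .Subscribe(observer);
        });

    // Subscribes once to the operator and reports how often the source was subscribed.
    public static int CountSourceSubscriptions(
        Func<IObservable<int>, IObservable<int>> windowCount)
    {
        var subscriptions = 0;
        var subject = new Subject<int>();      // never emits in this demo
        var counted = Observable.Defer(() =>
        {
            subscriptions++;                   // runs once per subscription to the source
            return subject.AsObservable();
        });

        using (windowCount(counted).Subscribe(_ => { }))
        {
            return subscriptions;
        }
    }
}
```

Calling CountSourceSubscriptions(s => s.NaiveWindowCount(TimeSpan.FromSeconds(5))) gives 2, while the same call with PublishedWindowCount gives 1. One caveat I would flag, as far as I understand RefCount: it connects as soon as the first branch subscribes, so a source that pushes values synchronously during Subscribe may emit before the delayed branch has attached. The Publish overload that takes a selector function connects only after the selector's result has been subscribed, which is one way to avoid that.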

You must be careful where you put the Publish. Putting it before the Create would cause a single subscription to the source to be shared across ALL subscriptions to the observable returned by a given invocation. By doing the Publish inside the Create, we get the desired behaviour of one subscription to the source per subscription to the operator.
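The placement difference can be sketched with a stripped-down operator that simply merges the published source with itself. Everything named here (PublishPlacementDemo, PublishInside, PublishOutside, SourceSubscriptionsForTwoSubscribers) is hypothetical and exists only to count subscriptions.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo of where Publish sits relative to Create.
public static class PublishPlacementDemo
{
    // Publish inside Create: re-evaluated for every subscriber to the result.
    public static IObservable<T> PublishInside<T>(this IObservable<T> source) =>
        Observable.Create<T>(observer =>
        {
            var hotSource = source.Publish().RefCount();
            return hotSource.Merge(hotSource).Subscribe(observer);
        });

    // Publish outside Create: evaluated once per call to the operator.
    public static IObservable<T> PublishOutside<T>(this IObservable<T> source)
    {
        var hotSource = source.Publish().RefCount();
        return Observable.Create<T>(observer =>
            hotSource.Merge(hotSource).Subscribe(observer));
    }

    // Invokes the operator once, attaches two subscribers, and counts source subscriptions.
    public static int SourceSubscriptionsForTwoSubscribers(
        Func<IObservable<int>, IObservable<int>> op)
    {
        var subscriptions = 0;
        var subject = new Subject<int>();       // never emits in this demo
        var counted = Observable.Defer(() =>
        {
            subscriptions++;
            return subject.AsObservable();
        });

        var result = op(counted);               // one invocation of the operator
        using (result.Subscribe(_ => { }))      // first subscriber
        using (result.Subscribe(_ => { }))      // second subscriber
        {
            return subscriptions;
        }
    }
}
```

With s => s.PublishInside() the count is 2, one source subscription per subscriber. With s => s.PublishOutside() it is 1, because both subscribers end up sharing the same published source.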