Detecting disconnected clients the Rx way

Today we’ll look at a fairly common server-side scenario. In this case, a server installs clients on remote machines and clients report back through a socket.

We want to create a “hold-timer” – each client will periodically send a heartbeat with a unique id back to the server to indicate they are alive. If the server doesn’t receive a heartbeat from a client within a specified limit then the client is removed.

Here’s a simple snippet that demonstrates how you can do this with Reactive Extensions. If you want to run it, paste the code into a console app – and don’t forget to add the NuGet package Rx-Main:

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var clientHeartbeats = new Subject<int>();
var timeToHold = TimeSpan.FromSeconds(5);

var expiredClients = clientHeartbeats
  .Synchronize()                                  // serialize OnNext calls (see the note at the end)
  .GroupBy(key => key)                            // one stream per client id
  .SelectMany(grp => grp.Throttle(timeToHold));   // emit only after a quiet period

var subscription = expiredClients.Subscribe(
  // in here add your disconnect action
  i => Console.WriteLine("Disconnect Client: " + i));

while (true)
{
  var num = Console.ReadLine();
  if (num == null || num == "q")
  {
    break;
  }
  // call OnNext with the client id each time they send hello
  if (int.TryParse(num, out var clientId))
  {
    clientHeartbeats.OnNext(clientId);
  }
}

// to tear down the monitor
subscription.Dispose();

So let’s break this down. We will create a Subject to which the client heartbeats will be posted. Every time a client heartbeat arrives, we call OnNext on the clientHeartbeats Subject, passing the client id. This gives us an IObservable<int> stream of client ids.

Let’s gloss over the Synchronize() call for now – I’ll come back to this later.

The first thing we want to do is split each client’s events into separate streams for independent consideration. We can use Observable.GroupBy to do this. GroupBy needs a key selector which will identify each group. In this case we can use the identity function (key => key) and group by the id itself.

The output of GroupBy is a stream of streams – each stream emitted is an IGroupedObservable<int, int>, which is just like an IObservable<int> but also has a Key property to indicate which group it is. So the return type of GroupBy is an eye-watering IObservable<IGroupedObservable<TKey, TSource>>.

So now we have individual streams for each client id to work with. Let’s consider the goal – we want to emit a signal whenever a client doesn’t notify us for a specified duration.

Observable.Throttle is what we need. This extension method will suppress an event if another event appears within a specified duration. Perfect! We apply a Throttle to each group and we will only get an event if a client doesn’t say hello in time.

The situation so far is illustrated below:

Now we need to pull all the separate streams back into one result stream. For this, we can use Observable.SelectMany. One of its many (no pun intended!) overloads will do this for us, accepting a stream of streams and using a selector function to pull out the elements to be sent to the result stream. The selector function takes each group stream, and that is where we apply the Throttle behaviour described above.

Now, there’s a problem here. As written, this code leaks memory: GroupBy’s groups never terminate, so it holds on to every client id it has ever seen. Even worse, if you try to terminate a group subscription (with, say, a Take(1) after our throttle), then the group will end, but it won’t ever restart! I wouldn’t get too carried away worrying about this unless you have very large numbers of clients and never restart your service, but…

You can address this problem by using Observable.GroupByUntil which takes a duration function for each group, and allows groups to restart. Here’s the code – I’ll leave it as an exercise for you to figure out the detail of how it works and why I’ve written it this way. :) :

var expiredClients = clientHeartbeats
  .GroupByUntil(key => key, grp => grp.Throttle(timeToHold))
  .SelectMany(grp => grp.TakeLast(1));
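To watch the GroupByUntil version expire clients without waiting on a real clock, here is a minimal sketch, assuming the System.Reactive package. It drives the query with Rx’s HistoricalScheduler (a virtual-time scheduler) by passing it to the Throttle overload that accepts an IScheduler. HoldTimerSketch and Run are hypothetical names, and the heartbeat script is made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper class for illustration.
public static class HoldTimerSketch
{
    // Returns the client ids reported as expired, in the order they expired.
    public static List<int> Run()
    {
        var scheduler = new HistoricalScheduler();   // virtual clock, advanced by hand
        var clientHeartbeats = new Subject<int>();
        var timeToHold = TimeSpan.FromSeconds(5);
        var expired = new List<int>();

        var expiredClients = clientHeartbeats
            .Synchronize()
            .GroupByUntil(key => key, grp => grp.Throttle(timeToHold, scheduler))
            .SelectMany(grp => grp.TakeLast(1));

        using (expiredClients.Subscribe(expired.Add))
        {
            clientHeartbeats.OnNext(1);                     // t = 0s: clients 1 and 2 say hello
            clientHeartbeats.OnNext(2);
            scheduler.AdvanceBy(TimeSpan.FromSeconds(3));   // t = 3s
            clientHeartbeats.OnNext(1);                     // only client 1 renews
            scheduler.AdvanceBy(TimeSpan.FromSeconds(3));   // t = 6s: client 2 expired at 5s
            scheduler.AdvanceBy(TimeSpan.FromSeconds(3));   // t = 9s: client 1 expired at 8s
            clientHeartbeats.OnNext(2);                     // client 2 comes back: a fresh group
            scheduler.AdvanceBy(TimeSpan.FromSeconds(6));   // t = 15s: client 2 expired again at 14s
        }
        return expired;
    }
}
```

The last step is the interesting one: because GroupByUntil closed client 2’s group when it expired, a later heartbeat from the same id opens a new group, which the plain GroupBy version could not do.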

One final note… about that Observable.Synchronize. This function ensures that the IObservable<T> it is applied to behaves itself! It enforces the Rx grammar of OnNext*(OnCompleted|OnError)? and it makes OnNext calls thread safe. In Rx 2.x, although Subject enforces correct grammar, OnNext calls are not thread safe. This was a decision made for performance reasons. It’s quite possible that in production server code you will want to call OnNext from different threads. Synchronize will help prevent any nasty race conditions by putting a gate around OnNext calls.
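Here is a minimal sketch of that gate in action, assuming the System.Reactive package: many thread-pool threads call OnNext on the same Subject at once, and the subscriber increments a plain, non-atomic counter. Because Synchronize serializes the calls into the subscriber, no increments should be lost. SynchronizeSketch and CountHeartbeats are hypothetical names.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

// Hypothetical helper class for illustration.
public static class SynchronizeSketch
{
    public static int CountHeartbeats(int heartbeats)
    {
        var clientHeartbeats = new Subject<int>();
        var received = 0; // deliberately not Interlocked: the Synchronize gate makes ++ safe

        using (clientHeartbeats.Synchronize().Subscribe(_ => received++))
        {
            // OnNext is called concurrently from many threads, as a socket server might.
            Parallel.For(0, heartbeats, i => clientHeartbeats.OnNext(i % 10));
        }
        return received;
    }
}
```

Removing the Synchronize() call turns received++ into a data race, so the count can come up short; being a race, it is not guaranteed to fail on any particular run.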

Comparing previous and current items with Rx

Here’s an interesting problem that came up on the RX forums. How do you compare the current and previous items in an observable stream?

We can create a general purpose method to pair up the current and previous items, which will enable you to easily tack on whatever comparison function you like, such as a Where or Select. Here’s a picture of what we want to do:

[Figure: Pair With Previous]

Observable.Scan gives us an excellent starting point for this.

We’ll call the extension PairWithPrevious. Here’s an example of how you might use it to write out the deltas in a stream of integers:

var source = new Subject<int>();
var delta = source.PairWithPrevious()
                  .Select(pair => pair.Item2 - pair.Item1)
                  .Subscribe(Console.WriteLine);
			
source.OnNext(1);
source.OnNext(5);
source.OnNext(3);

This gives the following output (note the first delta value is from the default integer value of 0 to the initial value of 1):

1
4
-2

Here’s the code:

public static IObservable<Tuple<TSource, TSource>>
    PairWithPrevious<TSource>(this IObservable<TSource> source)
{
    return source.Scan(
        Tuple.Create(default(TSource), default(TSource)),
        (acc, current) => Tuple.Create(acc.Item2, current));
}

The first argument to Scan specifies a “seed” that initializes the “accumulator”. The seed itself is not emitted; it just gives the accumulator its initial value. As each item arrives, the current accumulator value is passed to the accumulator function along with the item. The function produces the next value for the accumulator, and this new value is output to observers.

So Observable.Scan is similar to Observable.Aggregate, except it emits the value of the aggregate as it goes instead of only when the source stream completes.
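To make the comparison concrete, here is a small sketch, assuming the System.Reactive package, that runs Scan and Aggregate over the same source with a seed of 0. Note that the seed never appears in Scan’s output. The class and method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper class for illustration.
public static class ScanVersusAggregateSketch
{
    // Scan emits every intermediate accumulator value as items arrive: 1, 3, 6, 10.
    public static IList<int> RunningTotals() =>
        Observable.Range(1, 4).Scan(0, (acc, x) => acc + x).ToList().Wait();

    // Aggregate emits only the final accumulator value, once the source completes: 10.
    public static int FinalTotal() =>
        Observable.Range(1, 4).Aggregate(0, (acc, x) => acc + x).Wait();
}
```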

I’m using the built-in Tuple type here to produce the resulting pairs – this has an advantage over anonymous types because you can return them from a method. If you have specific needs, you can consider a custom type. Tuple provides a way of grouping values and has a set of ItemX properties, one for each value, called Item1, Item2 and so on.

The seed value is a Tuple pair of defaults for the source type. On each iteration, we create a new Tuple, copying the old Item2 to Item1, and setting Item2 with the current value. We create a new Tuple because Tuples are immutable – if you are using a custom type, you could update it – but I actually think immutability is a safer way to go in RX in general, unless you are especially worried about memory and GC performance.

Finally, here’s a unit test I wrote for this function – I used Nuget packages Rx-Testing and NUnit:

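using System;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;
using NUnit.Framework;

public class PairWithPreviousTests : ReactiveTest
{
    [Test]
    public void Works()
    {
        var testScheduler = new TestScheduler();

        var source = Observable.Range(1, 3);

        // Start subscribes at virtual time Subscribed (200); Range emits synchronously
        // on subscription, so every notification is recorded at that same time.
        var results = testScheduler.Start(
            () => source.PairWithPrevious());

        results.Messages.AssertEqual(
            OnNext(Subscribed, Tuple.Create(0, 1)),
            OnNext(Subscribed, Tuple.Create(1, 2)),
            OnNext(Subscribed, Tuple.Create(2, 3)),
            OnCompleted<Tuple<int, int>>(Subscribed));
    }
}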

This post was inspired in part by an old question on StackOverflow with an answer that suffers from a double subscription to the source. If you like the approach here, I’d be dead pleased if you could up-vote my answer on SO. :) Thanks!

ObserveLatestOn – A coalescing ObserveOn

This is a problem that comes up when the UI dispatcher can’t keep up with inbound activity.

The example I saw on a project was a very busy price stream that had to be displayed on the UI.

If more than one new price arrives on background threads in between dispatcher time slices, there is no point displaying anything but the most recent price – in fact we want the UI to “catch up” by dropping all but the most recent undisplayed price.

So the operation is like an ObserveOn that drops all but the most recent events. Here’s a picture of what’s happening – notice how price 2 is dropped and how prices are only published during the dispatcher time slice:

Here is the code (by the way, if you’re not sure what Materialize and Accept are, I wrote about these here):

public static IObservable<T> ObserveLatestOn<T>(
    this IObservable<T> source, IScheduler scheduler)
{
    return Observable.Create<T>(observer =>
    {
        // The most recent notification not yet delivered to the observer.
        Notification<T> outsideNotification = null;
        var gate = new object();
        // True while a dispatch action is scheduled or running on the target scheduler.
        bool active = false;
        var cancelable = new MultipleAssignmentDisposable();
        var disposable = source.Materialize().Subscribe(thisNotification =>
        {
            bool wasNotAlreadyActive;
            lock (gate)
            {
                wasNotAlreadyActive = !active;
                active = true;
                // Overwrite (and so drop) any notification that has not been dispatched yet.
                outsideNotification = thisNotification;
            }

            if (wasNotAlreadyActive)
            {
                cancelable.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> localNotification = null;
                    lock (gate)
                    {
                        localNotification = outsideNotification;
                        outsideNotification = null;
                    }
                    localNotification.Accept(observer);
                    bool hasPendingNotification = false;
                    lock (gate)
                    {
                        // Stay active only if a newer notification arrived during delivery.
                        hasPendingNotification = active = (outsideNotification != null);
                    }
                    if (hasPendingNotification)
                    {
                        self();
                    }
                });
            }
        });
        return new CompositeDisposable(disposable, cancelable);
    });
}

The key idea here is that we keep track of the notification to be pushed on to the target scheduler in outsideNotification – and whenever an event is received, we overwrite outsideNotification with the new notification, which is how any older, undelivered notification gets dropped. We need to ensure the new notification will be scheduled for dispatch on the target scheduler – but we may not need to schedule anything new…

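The active flag tells us whether a dispatch action is already scheduled or running. If it is, that action will deliver whatever is in outsideNotification when it gets to run, so there is nothing more to do. If it isn’t, we must schedule a new dispatch action. How does the flag get cleared? The scheduled action swaps outsideNotification for null under the lock and delivers it with Accept; it then checks whether another notification arrived while it was delivering. If one did, it stays active and reschedules itself via the recursive self() call; if not, it clears active, so the next event will schedule a fresh dispatch.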

This approach keeps the checks, locks and scheduled actions to a minimum.
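Since the picture of the dropped price is not reproduced here, the following minimal sketch replays that scenario in code, assuming the System.Reactive package. A HistoricalScheduler stands in for the UI dispatcher, and each AdvanceBy call plays the role of a dispatcher time slice. The operator is a condensed copy of the lock-based code above so the sketch compiles on its own; ObserveLatestOnSketch and Run are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper class for illustration.
public static class ObserveLatestOnSketch
{
    // Condensed copy of the lock-based ObserveLatestOn shown earlier.
    public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler) =>
        Observable.Create<T>(observer =>
        {
            Notification<T> pending = null;
            var gate = new object();
            var active = false;
            var cancelable = new MultipleAssignmentDisposable();
            var subscription = source.Materialize().Subscribe(notification =>
            {
                bool mustSchedule;
                lock (gate) { mustSchedule = !active; active = true; pending = notification; }
                if (!mustSchedule) return;   // a dispatch is already on its way
                cancelable.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> next;
                    lock (gate) { next = pending; pending = null; }
                    next.Accept(observer);
                    bool more;
                    lock (gate) { more = active = pending != null; }
                    if (more) self();
                });
            });
            return new CompositeDisposable(subscription, cancelable);
        });

    public static (List<int> Shown, bool Completed) Run()
    {
        var dispatcher = new HistoricalScheduler();   // stands in for the UI dispatcher
        var prices = new Subject<int>();
        var shown = new List<int>();
        var completed = false;
        var slice = TimeSpan.FromMilliseconds(1);

        using (prices.ObserveLatestOn(dispatcher).Subscribe(shown.Add, () => completed = true))
        {
            prices.OnNext(1);
            prices.OnNext(2);
            prices.OnNext(3);              // 1 and 2 are overwritten before the dispatcher gets a turn
            dispatcher.AdvanceBy(slice);   // time slice: only 3 is shown
            prices.OnNext(4);
            dispatcher.AdvanceBy(slice);   // time slice: 4 is shown
            prices.OnCompleted();
            dispatcher.AdvanceBy(slice);   // completion is delivered on the dispatcher too
        }
        return (shown, completed);
    }
}
```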

Notes and credits:

I’ve gone round the houses a few times on this implementation – my own attempts to improve it to use CAS rather than lock ran into bugs, so the code above is largely due to Lee Campbell, and edited for RX 2.0 support by Wilka Hudson. For an interesting discussion on this approach see this thread on the official RX forum.

A reactive join example

A good example for using a reactive join came up the other day. Given a realtime price stream, we want to display the most recent price whenever a button is clicked.

A join in Rx asks when events from a pair of streams coincide.

To reason about whether events occur at the same time, they need to have a duration as well as a value. That duration could be infinitesimally small (a point event), or it could be longer.

Duration in Rx is expressed with (you might have guessed) an observable. Each event in a source stream has an associated duration stream. The duration of the event is the time it takes for the duration stream to emit an OnNext or OnCompleted.

So back to the example. The prices are represented as an IObservable<Price>.

Through the use of Observable.FromEvent we can easily transform click events into an IObservable<Unit>. (Recall Unit is a type representing an event whose value is not interesting.)

The result we want will be an IObservable<Price> that emits the most recent Price from the price stream whenever the IObservable<Unit> stream emits a Unit.

The join should tell us when the duration of a price coincides with a click. Here’s a visual representation of the idea:

[Figure: Visualizing SampleOn]

This is quite similar to the built in Rx Sample extension method that samples a source stream at periodic intervals, so I decided to call this operation SampleOn.

To use a join here we want to give each Price a duration that lasts until the next Price is emitted. A duration that lasts until the next event is quite a common requirement. Happily, we can use a useful trick and take the Price stream itself as the duration observable. In this way, the next Price will terminate the duration of the previous Price.*

The duration of the Click event is a point event – to represent this we just need a stream that terminates immediately, and Observable.Empty<Unit>() will do nicely.

Here is the code:

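public static IObservable<TSource> SampleOn<TSource, TSample>(
    this IObservable<TSource> source,
    IObservable<TSample> sampler)
{
    // Left duration: each source value lasts until the source emits again.
    // Right duration: each sample is a point event (Empty completes immediately).
    return from s in source
           join t in sampler
             on source equals Observable.Empty<Unit>()
           select s;
}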

The query syntax for joins uses the following pattern:

from left event in left stream
join right event in right stream
on left duration function equals right duration function
select selector function.

* One thing to point out when using a stream as its own duration function, as I do in this example, is that this is going to cause the stream to be subscribed to multiple times – you will want to either Publish()/RefCount() the source stream or ensure that it is hot. I talked about this previously here.
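Here is a minimal sketch of SampleOn in use, assuming the System.Reactive package. Two Subjects stand in for the price stream and the button clicks; since a Subject is hot, the extra duration subscriptions mentioned in the footnote are harmless here. The operator is copied from above so the sketch compiles on its own; SampleOnSketch and Run are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper class for illustration.
public static class SampleOnSketch
{
    // Copy of SampleOn from above.
    public static IObservable<TSource> SampleOn<TSource, TSample>(
        this IObservable<TSource> source, IObservable<TSample> sampler) =>
        from s in source
        join t in sampler on source equals Observable.Empty<Unit>()
        select s;

    public static List<int> Run()
    {
        var prices = new Subject<int>();   // hot, so subscribing to it per price is safe
        var clicks = new Subject<Unit>();
        var shown = new List<int>();

        using (prices.SampleOn(clicks).Subscribe(shown.Add))
        {
            clicks.OnNext(Unit.Default);   // no price yet: nothing to show
            prices.OnNext(1);
            prices.OnNext(2);              // price 2 ends the duration of price 1
            clicks.OnNext(Unit.Default);   // shows 2
            prices.OnNext(3);
            clicks.OnNext(Unit.Default);   // shows 3
            clicks.OnNext(Unit.Default);   // shows 3 again: it is still the live price
        }
        return shown;
    }
}
```

Repeated clicks keep re-sampling the latest price until a new one replaces it, which is the "most recent price" behaviour described above.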

Mind your subscriptions in your IObservable extensions

When creating new Rx operators that extend IObservable, it’s good practice to make sure you only subscribe once to the source. Users of your operator have a right to expect that subscriptions to your operator will only in turn cause a single subscription to the source. This could become crucial if the source is cold – multiple subscriptions might cause unintended side-effects.

Here is an example (somewhat contrived for simplicity). Let’s say you wanted to have a running count of the number of OnNext()s that have occurred in a source stream within the previous 5 seconds. You might write this as follows:

public static IObservable<int> WindowCount<T>(
  this IObservable<T> source,
  TimeSpan interval)
{
    const int onNextEntersWindow = 1;
    const int onNextExitsWindow = -1;

    return Observable.Create<int>(observer =>
        {
            return Observable
              .Merge(source.Select(_ => onNextEntersWindow),
                     source.Select(_ => onNextExitsWindow)
                           .Delay(interval))
              .Scan((countInWindow, onNextEvent) =>
                countInWindow + onNextEvent)
              .Subscribe(observer);
        });
}

This code is projecting the source stream to TWO separate streams and merging them. The first is a stream of 1s, the second a stream of -1s that is delayed by an interval. These are then added together with Scan for a real-time running total. The effect is that each time an event enters or leaves the window, we get a count of the number of source events that have occurred within the preceding interval. This is a useful idea for a number of advanced operators… but I digress.

The key point is that this operation creates TWO subscriptions to the source – one for each stream. The fix is straightforward – you simply publish the source and subscribe to the publication as many times as you need. Effectively you are working with a version of the source stream guaranteed to be hot. The fix looks like this:

public static IObservable<int> WindowCount<T>(
  this IObservable<T> source,
  TimeSpan interval)
{
    const int onNextEntersWindow = 1;
    const int onNextExitsWindow = -1;

    return Observable.Create<int>(observer =>
        {
            var hotSource = source.Publish().RefCount();

            return Observable
              .Merge(hotSource.Select(_ => onNextEntersWindow),
                     hotSource.Select(_ => onNextExitsWindow)
                              .Delay(interval))
              .Scan((countInWindow, onNextEvent) =>
                countInWindow + onNextEvent)
              .Subscribe(observer);
        });
}

Be careful where you put the Publish. Putting it outside the Create would share a single subscription to the source across ALL subscriptions to the observable returned by a given invocation. By doing the Publish inside the Create, we get the desired behaviour of one subscription to the source per subscription to the operator.
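To check the subscription counts directly, here is a minimal sketch, assuming the System.Reactive package. It wraps a Subject in an instrumented source that counts its Subscribe calls, then builds the same two-projection Merge shape as WindowCount, first over the raw source and then over source.Publish().RefCount(). The Delay and Scan steps are left out because they do not affect how many times the source is subscribed. SubscriptionCountSketch and Run are hypothetical names.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper class for illustration.
public static class SubscriptionCountSketch
{
    public static (int Naive, int Published) Run()
    {
        var subscriptions = 0;
        var prices = new Subject<int>();

        // Instrumented source: every Subscribe call bumps the counter.
        var source = Observable.Create<int>(observer =>
        {
            subscriptions++;
            return prices.Subscribe(observer);
        });

        // Shape of the first WindowCount: two projections of the raw source.
        var naive = Observable.Merge(source.Select(_ => 1), source.Select(_ => -1)).Subscribe();
        var naiveCount = subscriptions;
        naive.Dispose();

        // Shape of the fixed WindowCount: both projections share one published source.
        subscriptions = 0;
        var hotSource = source.Publish().RefCount();
        var shared = Observable.Merge(hotSource.Select(_ => 1), hotSource.Select(_ => -1)).Subscribe();
        var sharedCount = subscriptions;
        shared.Dispose();

        return (naiveCount, sharedCount);
    }
}
```

The first shape subscribes to the source twice; the published one subscribes once.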

TypeScript is well worth a look

Anders Hejlsberg, the language designer integral to the development of C#, announced TypeScript last week. In summary, it is a superset of JavaScript that provides an optional way to introduce static typing to part or all of your JavaScript to support the development process. It compiles down to JavaScript and is fully compatible with JavaScript hosts.

In a way, I think TypeScript was inevitable – it’s the clear evolution of tools like ClojureScript and CoffeeScript, which have themselves made solid progress towards solving the problem of application-scale JavaScript.

Microsoft’s timing is perfect for me. I am a strong proponent of SOA and have become more and more inclined over time to weave services together in the UI as late as possible – but I’ve always found large JavaScript applications problematic to manage. TypeScript looks like it can really help take away a lot of the pain. I particularly like the virtual project system and the way you can provide IDL-like *.d.ts files to provide IntelliSense for pre-existing JavaScript libraries.

If you do any kind of JavaScript, do check it out.

Why Materialize? And helpers to log Rx.

I wrote some helper extension methods to log what’s going on in my Rx expressions. These leverage the Materialize method which has this signature:

public static IObservable<Notification<TSource>>
  Materialize<TSource>(this IObservable<TSource> source)

Materialize projects a stream of values of T into a stream of Notification<T> values. Notifications have a Kind that tells us whether the original value was an OnNext, OnError or OnCompleted. They give you access to the original Value or Exception as appropriate. You can also pass a Notification on to an observer via its Accept method.

To put it another way (the way the docs do), it turns implicit notifications into explicit notifications. They are implicit because the method called on the observer (OnNext, OnError, OnCompleted) tells us implicitly what kind of notification it is, whereas the Notifications, which are always delivered into OnNext, explicitly declare their type via their Kind property.

Incidentally, there is a corresponding Dematerialize method that performs the reverse operation. Note also that materialized streams send OnCompleted after they send a Notification of Kind OnError or OnCompleted; they never send OnError themselves.
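Here is a minimal sketch of both directions, assuming the System.Reactive package. A source that emits two values and then fails is materialized into a stream of notification kinds, which completes normally; Dematerialize then turns the explicit OnError notification back into a real error. MaterializeSketch and its methods are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

// Hypothetical helper class for illustration.
public static class MaterializeSketch
{
    // Two values followed by an error.
    private static IObservable<int> Failing() =>
        Observable.Range(1, 2).Concat(Observable.Throw<int>(new InvalidOperationException("boom")));

    // Yields OnNext, OnNext, OnError. The error is just a value here, and the
    // materialized stream then completes, so Wait() returns instead of throwing.
    public static IList<NotificationKind> MaterializedKinds() =>
        Failing().Materialize().Select(n => n.Kind).ToList().Wait();

    // Dematerialize restores the original OnError, so Wait() rethrows it.
    public static bool DematerializeRethrows()
    {
        try
        {
            Failing().Materialize().Dematerialize().ToList().Wait();
            return false;
        }
        catch (InvalidOperationException ex)
        {
            return ex.Message == "boom";
        }
    }
}
```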

So why is this useful?

Well if you ever want to process all events on a stream in a uniform manner, without Materialize you’d be stuck writing handlers for each of the three methods. With Materialize, you can do it all in one place.

Here are some practical examples I use when I want to inspect what’s going on with my Rx. The first example is a side effect, like Do(), that can be injected into a stream to write out what’s happening to the Console. It’s an extension method:

public static IObservable<T> LogToConsole<T>(
    this IObservable<T> source, string key = null)
{
    return Observable.Create<T>(observer =>
        source.Materialize().Subscribe(notification =>
        {
            Console.WriteLine(NotifyToString(notification, key));
            notification.Accept(observer);
        }));
}

This uses Materialize to get a stream of Notifications, which can then be written out to the Console using the helper method NotifyToString, shown later. I use the Accept method to pass the notification on to the observer. The optional key parameter allows me to label the output. Here’s an example:

Observable.Range(0, 4).LogToConsole("Range: ").Subscribe();

Which gives the output:

Range:   OnNext(0)
Range:   OnNext(1)
Range:   OnNext(2)
Range:   OnNext(3)
Range:   OnCompleted()

Note that LogToConsole subscribes “on demand” (as all good Rx operators should) and needs a subscriber itself for anything to happen. It’s a side effect, so be warned that it logs for EVERY subscriber.

It’s quite common for me to be experimenting with the output of operators in tools like LINQPad, so I also wrote another extension method to take care of the subscription as well:

public static IDisposable SubscribeConsoleNotifier<T>(
    this IObservable<T> source, string key = null)
{
    return source.Materialize().Subscribe(
        n => Console.WriteLine(NotifyToString(n, key)));
}

And an example of usage putting both together:

Observable.Range(0, 4)
          .LogToConsole("Range: ")
          .Where(i => i % 2 == 0)
          .SubscribeConsoleNotifier("Where: ");

This gives the following output; notice how LogToConsole shows the output from Range before the Where filter is applied:

Range:   OnNext(0)
Where:   OnNext(0)
Range:   OnNext(1)
Range:   OnNext(2)
Where:   OnNext(2)
Range:   OnNext(3)
Range:   OnCompleted()
Where:   OnCompleted()

Obviously, you can code these to write to a logger, or something more generally useful.
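For example, here is a minimal sketch, assuming the System.Reactive package, of a variant that takes the log sink as an Action<string> instead of hard-coding the Console. LogTo and LogSketch are hypothetical names, and the formatting is inlined so the sketch does not depend on the NotifyToString helper.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper class for illustration.
public static class LogSketch
{
    // Hypothetical generalization of LogToConsole: the caller supplies the log sink.
    public static IObservable<T> LogTo<T>(
        this IObservable<T> source, Action<string> log, string key = null)
    {
        return Observable.Create<T>(observer =>
            source.Materialize().Subscribe(notification =>
            {
                log(key == null ? notification.ToString() : key + "\t" + notification);
                notification.Accept(observer);
            }));
    }

    // Collects the log lines in a list, e.g. for a unit test.
    public static List<string> Demo()
    {
        var lines = new List<string>();
        Observable.Range(0, 2).LogTo(lines.Add, "Range:").Subscribe();
        return lines;
    }
}
```

Passing a delegate keeps the operator independent of any particular logging library; a real logger’s write method can be supplied in place of lines.Add.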

To wrap this post up, here’s the implementation of the helper NotifyToString:

public static string NotifyToString<T>(
    Notification<T> notification, string key = null)
{
    if (key != null)
        return key + "\t" + notification;
    
    return notification.ToString();
}

It’s all trivial stuff, but I’ve found it quite useful.

Rx Maintenance Part 1: Are the reactive extensions a maintenance nightmare?

For the record, I’m a huge fan of Rx, but I’ve been thinking a lot lately about how to keep Rx code clean and easy to understand. The project I’m currently involved in makes extensive use of Rx in a large multi-tiered system. I’m talking thousands of lines of Rx.

I joined the project roughly two years into its lifespan. I am pretty up to speed on Rx and I am quite used to maintaining code. Nonetheless, I have found understanding the code to be more challenging than I would have expected. Debugging complex chains of Rx operators is not for the faint of heart!

This project was an early adopter of Rx, and the developers were learning as they went. Not only that, Rx itself has changed and grown a fair bit along the way. This might go some way to explaining the lack of readability. But I don’t think it is the whole story.

In a previous project, I created a rate limiter to throttle the number of calls web users were making against an API. This was a refactoring to Rx of some older code. It didn’t have the excuse that Rx changed a lot while I was writing it. What I ended up with was about 10-20% of the size of the original code and certainly more elegant.

But trying to explain how it worked was challenging, and at the time I wondered whether the succinctness would actually be a barrier to successful maintenance. That concern was borne out when I ran a brown-bag session on the code and needed 10 slides of marble diagrams to help explain how it worked! We all know the chances of that kind of documentation being maintained are pretty much zero.

This, combined with other Rx experiences, has left me wondering what guidelines we should be laying down to keep Rx maintainable and how, if at all, these guidelines would differ from typical best practices for creating maintainable code.

I’ll be exploring these thoughts along with some practical code examples on and off over the next few weeks. Stay tuned!