Process this!

During a code review the other day, I came up against an old friend – in this case a method called “ProcessImage”, set inside, of course, a class called “ImageProcessor”. Method names should be *intention* revealing, not *implementation* revealing. If you think of a program as being like a sausage machine – meat in, sausages out – then don’t call the operative method “ProcessMeat” – call it “MakeSausages”. This reveals the purpose clearly and uses the language of the business.
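To make the contrast concrete, here’s a minimal sketch – the Meat, Sausage and SausageMachine types are hypothetical, invented purely for illustration:

```csharp
using System.Collections.Generic;

public class Meat { }
public class Sausage { }

// Implementation-revealing: the name describes the mechanism inside.
public class MeatProcessor
{
    public IList<Sausage> ProcessMeat(Meat meat)
    {
        return new List<Sausage> { new Sausage() };
    }
}

// Intention-revealing: the name describes the outcome the caller wants,
// in the language of the business.
public class SausageMachine
{
    public IList<Sausage> MakeSausages(Meat meat)
    {
        return new List<Sausage> { new Sausage() };
    }
}
```

A call site that reads machine.MakeSausages(meat) tells you why the code exists without making you open the method.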

Uploading an existing local git repo to a new github/bitbucket repo.

Note to self: Bitbucket’s and GitHub’s instructions for uploading an existing local repo include this command:

git remote add origin <address>

But something in my set-up (posh-git?) adds a default origin entry causing this to fail with:

fatal: remote origin already exists

To fix this use:

git remote set-url origin <address>

And then upload as normal with:

git push -u origin master

Maybe this post will prevent me having to look this up every time I run into it! :)

Get a .gitignore file for Visual Studio fast with Powershell

I often use a wonderful online service to grab a .gitignore file suitable for Visual Studio use. Unfortunately, the file has unix line endings – so I’ve boiled the operation down to a single line of Powershell that sorts that out in one go:

  # $gitignoreUrl holds the address of the raw .gitignore file you want
  (Invoke-WebRequest $gitignoreUrl).Content -split "`n" -join "`r`n" | Set-Content .gitignore

Use Powershell 4 to check file signatures/hashes

I wasted a couple of DVDs this morning trying to burn a corrupted download of SQL Server from MSDN. Finally the light dawned and I decided to check the file hash. MSDN subscriber downloads have the SHA1 hash on the download page. I didn’t have my usual checker installed yet on my new laptop, and in googling I discovered that Powershell 4 can now do this natively!

The command you need is Get-FileHash – MSDN uses SHA1 rather than the default SHA256 of this command so the following will do it:

Get-FileHash <filepath> -Algorithm SHA1

Rx Hammer-Nail Syndrome

This post is inspired by a recent post by Mike Taulty. I think you’ll agree it’s a great introductory post on how to use Rx.

I’m a big fan of Rx – it’s an awesome technology. But sometimes I am guilty of over-using it – the old adage that given a shiny Rx hammer every problem looks like a nail definitely applies!

If you haven’t already, please take a look at Mike’s post here to compare with what follows.

While the post was clearly intended as an instructive and motivational example, it did remind me of my own tendency to reach for Rx in similar production-code situations. I often need to remind myself that a non-Rx version can sometimes be more straightforward. I think this applies here if you look at the code re-written to leverage the async/await syntax of C# 5:

Firstly, here is the non-Rx version of MakeObservableMovieDbRequest:

static async Task<T> MakeMovieRequestAsync<T>(
    string url,
    params KeyValuePair<string, string>[] additionalParameters)
{
    var client = new RestClient(BaseUrl) { Proxy = new WebProxy() };

    var parameterisedUrl = new StringBuilder(MakeUrl(url));

    // append each extra query-string parameter
    foreach (var keyValuePair in additionalParameters)
        parameterisedUrl.AppendFormat("&{0}={1}",
            keyValuePair.Key, keyValuePair.Value);

    var request = new RestRequest(parameterisedUrl.ToString());

    var result = await client.ExecuteGetTaskAsync<T>(request);

    return result.Data;
}

Since a Task can be easily translated to an IObservable<T> with a call to ToObservable(), I think it’s just easier and more flexible to leave this function returning a Task. With the use of async/await we remove some braces and make the code more readable.
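As a sketch of that conversion (this assumes the Rx-Main package is referenced; GetNumberAsync is a hypothetical stand-in for a real request method):

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

class Demo
{
    static async Task<int> GetNumberAsync()
    {
        await Task.Delay(10); // simulate some asynchronous work
        return 42;
    }

    static void Main()
    {
        // A Task<T> becomes an IObservable<T> with a single call, so an
        // async method can serve Rx consumers at the boundary.
        IObservable<int> numbers = GetNumberAsync().ToObservable();

        Console.WriteLine(numbers.Wait()); // blocks for the single value: 42
    }
}
```

Keeping the method itself Task-returning serves both styles of consumer.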

The same is true when using this method to page through the action movies:

static async Task GetActionMovies()
{
    var genreCollection =
        await MakeMovieRequestAsync<GenreCollection>(@"genre/list");

    var actionGenreId = (from genre in genreCollection.Genres
                         where genre.Name == "Action"
                         select genre.Id).First();

    var actionMoviesUrl = string.Format(
        @"genre/{0}/movies", actionGenreId);

    var actionMovieCollection =
        await MakeMovieRequestAsync<MovieCollection>(actionMoviesUrl);

    var totalPages = actionMovieCollection.TotalPages;

    for (int i = 1; i <= totalPages; i++)
    {
        var parameter = new KeyValuePair<string, string>(
            "page", i.ToString(CultureInfo.InvariantCulture));
        var movies = await MakeMovieRequestAsync<MovieCollection>(
            actionMoviesUrl, parameter);

        foreach (var movie in movies.Results)
            Console.WriteLine("Movie {0}", movie.Title);
    }
}

The use of async/await means this code tells a clear story – there are no obscure uses of Rx features like Concat to constrain concurrency; the for loop makes it clear how the results are being pulled together.

There are plenty of examples where Rx makes sense – often when coordinating complex streams.

The Rx solution is actually quite involved with some concepts that are not particularly intention revealing. In my opinion, in this case it makes the solution harder to maintain compared with the async/await approach.

As I said at the beginning, I’m often guilty of this – it’s very easy to forget how baffling Rx can be to the uninitiated. It should be used only when it brings real advantages to the table.

You often don’t have to look too far for this – even an idea as simple as adding a Timeout and decent error handling can be surprisingly hard to express with the TPL and yet quite simple with Rx. Pre .NET 4.5 and async/await, having to chain tasks together the old way was also a real chore.
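For instance, here’s a sketch of the timeout case (SlowCallAsync is an invented stand-in for a real service call, and the fallback value is arbitrary):

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

class Demo
{
    static async Task<string> SlowCallAsync()
    {
        await Task.Delay(TimeSpan.FromSeconds(10)); // pretend this is a slow service
        return "done";
    }

    static void Main()
    {
        // Convert the task, then bolt on a timeout and a fallback
        // in one readable chain.
        var guarded = SlowCallAsync().ToObservable()
            .Timeout(TimeSpan.FromMilliseconds(100))
            .Catch<string, TimeoutException>(_ => Observable.Return("timed out"));

        Console.WriteLine(guarded.Wait()); // "timed out"
    }
}
```

Expressing the same policy with raw Task continuations takes considerably more ceremony.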

That said, I won’t be stopping my love affair with Rx anytime soon – there’s plenty more nails that need hammering in with it!

Constraining a stream of events in Rx to a maximum rate

Sometimes, you want to limit the rate at which events arrive from an Rx stream.

The Throttle operator will suppress an event if another arrives within a specified interval. This is very useful in many instances, but it does have two important side-effects – even an unsuppressed event will be delayed by the interval, and events will get dropped altogether if they arrive too quickly.

I came across a situation where both of these were unacceptable. In this particular case, the desired behaviour was as follows: The events should be output at a maximum rate specified by a TimeSpan, but otherwise as soon as possible.

One solution works like this. Imagine our input stream is a bunch of people arriving at a railway station. For our output, we want people to leave the station at a maximum rate. We set the maximum rate by having each person stand at the front of a flatbed railroad truck and sending that truck out of the station at a fixed speed. Because there is only one track, and all trucks travel at the same speed and have the same length, people will leave the station at a maximum rate when trucks are departing back-to-back. However, if the track is clear, the next person will be able to depart immediately.

So how do we translate this metaphor into Rx?

We will use the Concat operator’s ability to accept a stream of streams and merge them together back-to-back – just like sending railroad trucks down the track.

To get the equivalent of each person onto a railroad truck, we will use a Select to project each event (person) to an observable sequence (railroad truck) that starts with a single OnNext event (the person) and ends with an OnCompleted exactly the defined interval later.

Let’s assume the input events are an IObservable<T> in the variable input, and the minimum gap between output events is a TimeSpan in the variable interval. Here’s the code:

var paced = input.Select(i => Observable.Empty<T>()
                                        .Delay(interval)
                                        .StartWith(i))
                 .Concat();

I’ll leave it as an exercise to turn this into an extension method for easy reuse!
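In case you want to check your answer, here’s one possible shape – note that Pace is my own name for it, not a built-in Rx operator:

```csharp
using System;
using System.Reactive.Linq;

public static class PacerExtensions
{
    // Each event becomes a fixed-length "railroad truck": it starts with the
    // event itself and completes one interval later. Concat keeps the trucks
    // nose-to-tail, enforcing the maximum output rate.
    public static IObservable<T> Pace<T>(
        this IObservable<T> source, TimeSpan interval)
    {
        return source.Select(i => Observable.Empty<T>()
                                            .Delay(interval)
                                            .StartWith(i))
                     .Concat();
    }
}
```

With this in place, input.Pace(TimeSpan.FromSeconds(1)) reads exactly like the intent.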

Dealing with occasionally overrunning Timer initiated tasks using recursive scheduling

Sometimes we need background jobs that run periodically, such as a mail client checking for new e-mail every 5 minutes.

If you set up a Timer like this you may run into trouble:

Timer jobTimer = new Timer();
jobTimer.Interval = 60; // milliseconds
jobTimer.Elapsed += new ElapsedEventHandler(RunJob);
jobTimer.Start();

void RunJob(object sender, ElapsedEventArgs e)
{
    // do some work
}

The problem is that there’s nothing to address what happens if the work takes longer than the interval. If it does, you’ll get some unplanned concurrency. This may not matter, depending on the work you are doing – but for this post, let’s assume that the concurrency is undesirable. I’ll also address another concern – controlling the thread on which the work is run.

Let’s say you try to resolve the unplanned concurrency with a lock:

object gate = new object();

void RunJob(object sender, ElapsedEventArgs e)
{
    lock (gate)
    {
        // do some work
    }
}

This will remove the concurrency, but will cause successive work to queue up on the lock. If you start to overrun regularly, the queue could just keep growing and you’ll get some unplanned bunching.

I’ve illustrated the situation before and after the lock is introduced below:

Timer Problem

Now, there are a few ways to address this problem and most solutions fall into one of two groups. The first is to keep the periodic timer, but skip the work if a previous invocation is still running. You can do that like this:

object gate = new object();

void RunJob(object sender, ElapsedEventArgs e)
{
    if (Monitor.TryEnter(gate))
    {
        try
        {
            // do some work
        }
        finally
        {
            Monitor.Exit(gate);
        }
    }
}

However, this leads to an unpredictable interval between invocations – sometimes it may just happen that the lock is released just as a new iteration tries to acquire it, other times the lock will be just missed and you’ll go almost double the interval between invocations.

The second group of solutions elects not to use a timer that fires periodically. Instead, only after each piece of work completes do we schedule the subsequent iteration. That is, instead of measuring the interval from the start of one iteration to the start of the next, we measure it from the end of one iteration to the start of the next. Since we don’t know how long the work will take, we have to wait until it finishes before scheduling the next iteration. This guarantees a predictable quiet interval between iterations during which no work is running.

The general idea looks like this:

Schedule Per Iteration

If you stick with using one of the various Timer objects offered in the .NET BCL, then this is fairly easy to implement. You could use Timer.AutoReset = false and restart the Timer in the handler, for example.
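Here’s a minimal sketch of that Timer-based variant (the intervals are shortened purely for demonstration):

```csharp
using System;
using System.Threading;

class Program
{
    static void Main()
    {
        // AutoReset = false makes the timer fire exactly once per Start().
        var jobTimer = new System.Timers.Timer(50) { AutoReset = false };

        jobTimer.Elapsed += (sender, e) =>
        {
            Thread.Sleep(10);  // do some work
            jobTimer.Start();  // the interval now runs from the END of the work
        };

        jobTimer.Start();
        Thread.Sleep(500); // let a few iterations run, then exit
    }
}
```

Because the timer is only restarted once the handler finishes, overrunning work can never overlap with the next iteration.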

However, we will look at an elegant solution offered by Rx. Here’s an extension method on an Rx Scheduler (more on this later) that will schedule a recurring task with the behaviour illustrated above:

public static IDisposable ScheduleRecurringAction(
    this IScheduler scheduler, TimeSpan interval, Action action)
{
    return scheduler.Schedule(interval, scheduleNext =>
    {
        action();
        scheduleNext(interval);
    });
}

And here’s how you could call it:

TimeSpan interval = TimeSpan.FromSeconds(5);
Action work = () => Console.WriteLine("Doing some work...");
var schedule =
  Scheduler.Default.ScheduleRecurringAction(interval, work);
Console.WriteLine("Press return to stop.");
Console.ReadLine();
schedule.Dispose();

This code probably looks a little odd if you haven’t come across recursive scheduling in Rx before. The overload of Schedule we are using schedules work, but also provides an Action for scheduling the next piece of work after that. In the code above, this is the scheduleNext parameter.

Note it’s entirely optional whether you actually use this Action – not calling it will exit the recursive loop. What’s going on under the covers uses a technique called trampolining that avoids generating massive stacks. It’s an idea that’s beyond the scope of this blog post – but if you are feeling brave you can read more about it in this fascinating article by Bart de Smet!

The IDisposable that’s returned by the Schedule call can be used to cancel the scheduled work. Cleverly, this will work not just for the first item scheduled; it’s also good for the work subsequently scheduled recursively.

As previously mentioned, the code above is an extension method on IScheduler. Rx provides a number of IScheduler implementations. These are responsible for managing scheduled work, and in particular for deciding on what threads work should take place.

The Scheduler.Default property returns a platform specific (as of Rx 2.0) scheduler that introduces the “cheapest” form of concurrency. For .NET 4.5 this is the ThreadPoolScheduler – this means that each iteration will be run on a threadpool thread. Other schedulers include the NewThreadScheduler, TaskPoolScheduler, ImmediateScheduler, CurrentThreadScheduler and DispatcherScheduler. I’ll save a fuller discussion of these for another time.

If you were really paying attention, you’ll have noticed that while calling Dispose will cancel any scheduled actions not started and prevent any further actions from being scheduled, it won’t cancel any action already started. If you need to cleanly cancel a longer running piece of work, that might be problematic.

Next time we will look at a technique to cooperatively cancel long running actions, and abort any that run on too long.

Detecting disconnected clients the Rx way

Today we’ll look at a fairly common server-side scenario. In this case, a server installs clients on remote machines and clients report back through a socket.

We want to create a “hold-timer” – each client will periodically send a heartbeat with a unique id back to the server to indicate they are alive. If the server doesn’t receive a heartbeat from a client within a specified limit then the client is removed.

Here’s a simple snippet that demonstrates how you can do this with reactive extensions. If you want to run it, paste the code into a console app – and don’t forget to add Nuget package Rx-Main:

var clientHeartbeats = new Subject<int>();
var timeToHold = TimeSpan.FromSeconds(5);

var expiredClients = clientHeartbeats
  .Synchronize()
  .GroupBy(key => key)
  .SelectMany(grp => grp.Throttle(timeToHold));

var subscription = expiredClients.Subscribe(
  // in here add your disconnect action
  i => Console.WriteLine("Disconnect Client: " + i));

while (true)
{
  var num = Console.ReadLine();
  if (num == "q") break;

  // call OnNext with the client id each time they send hello
  clientHeartbeats.OnNext(int.Parse(num));
}

// to tear down the monitor
subscription.Dispose();
So let’s break this down. We will create a Subject to which the client heartbeats will be posted. Every time a client heartbeat arrives, we call OnNext on the clientHeartbeats Subject, passing the client id. This gives us an IObservable<int> stream of client ids.

Let’s gloss over the Synchronize() call for now – I’ll come back to this later.

The first thing we want to do is separate out each client’s events into separate streams for independent consideration. We can use Observable.GroupBy to do this. GroupBy needs a key selector which will identify each group. In this case we can use the identity function (key => key) and group by the id itself.

The output of GroupBy is a stream of streams – each stream emitted is an IGroupedObservable<int, int> that is just like an IObservable<int> but also has a Key property to indicate which group it is. So the return type of GroupBy is an eye-watering IObservable<IGroupedObservable<TKey, TSource>>.
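To see the shape of this, here’s a small sketch with a synchronous source (the client ids are made up):

```csharp
using System;
using System.Reactive.Linq;

class Demo
{
    static void Main()
    {
        // Five heartbeats from three distinct clients...
        var ids = new[] { 1, 2, 1, 3, 2 }.ToObservable();

        // ...GroupBy opens a new inner stream the first time each key appears.
        ids.GroupBy(key => key)
           .Subscribe(grp => Console.WriteLine("New group for client " + grp.Key));
        // prints one line each for clients 1, 2 and 3
    }
}
```

Each inner stream then receives only that client’s subsequent heartbeats, which is exactly what we want to throttle.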

So now we have individual streams for each client id to work with. Let’s consider the goal – we want to emit a signal whenever a client doesn’t notify us for a specified duration.

Observable.Throttle is what we need. This extension method will suppress an event if another event appears within a specified duration. Perfect! We apply a Throttle to each group and we will only get an event if a client doesn’t say hello in time.

The situation so far is illustrated below:

Now we need to pull all the separate streams back into one result stream. For this, we can use Observable.SelectMany. One of its many (no pun intended!) overloads will do this for us, accepting a stream of streams and using a function to pull out elements to be sent to the result stream. The selector function takes each group stream and we can apply the Throttle behaviour described above.

Now, there’s a problem here. The GroupBy operator has a memory leak! As written, its groups will never terminate. Even worse, if you try to terminate a group subscription (with, say, a Take(1) after our throttle), then the group will end, but won’t ever restart! I wouldn’t get too carried away worrying about this unless you have very large numbers of clients and never restart your service but…

You can address this problem by using Observable.GroupByUntil which takes a duration function for each group, and allows groups to restart. Here’s the code – I’ll leave it as an exercise for you to figure out the detail of how it works and why I’ve written it this way. :) :

var expiredClients = clientHeartbeats
  .GroupByUntil(key => key, grp => grp.Throttle(timeToHold))
  .SelectMany(grp => grp.TakeLast(1));

One final note… about that Observable.Synchronize. This function ensures that the IObservable<T> it is applied to behaves itself! It enforces the Rx grammar of OnNext*(OnComplete|OnError)? and it makes OnNext calls thread safe. In Rx 2.x, although Subject enforces correct grammar, OnNext calls are not thread safe. This was a decision made for performance reasons. It’s quite possible that in production server code you will be wanting to call OnNext from different threads. Synchronize will help prevent any nasty race conditions by putting a gate around OnNext calls.

Comparing previous and current items with Rx

Here’s an interesting problem that came up on the Rx forums. How do you compare the current and previous items in an observable stream?

We can create a general purpose method to pair up the current and previous items, which will enable you to easily tack on whatever comparison function you like, such as a Where or Select. Here’s a picture of what we want to do:

Pair With Previous

Observable.Scan gives us an excellent starting point for this.

We’ll call the extension PairWithPrevious. Here’s an example of how you might use it to write out the deltas in a stream of integers:

var source = new Subject<int>();
var delta = source.PairWithPrevious()
                  .Select(pair => pair.Item2 - pair.Item1);

delta.Subscribe(Console.WriteLine);

(Note that the first delta value emitted is measured from the default integer value of 0 to the initial value of 1.)

Here’s the code:

public static IObservable<Tuple<TSource, TSource>>
    PairWithPrevious<TSource>(this IObservable<TSource> source)
{
    return source.Scan(
        Tuple.Create(default(TSource), default(TSource)),
        (acc, current) => Tuple.Create(acc.Item2, current));
}

The first argument to Scan specifies a “seed” that initializes the “accumulator”. The seed itself is not emitted – it just gives the accumulator function an initial value. As each item arrives, the accumulator is passed to the accumulator function along with the current item, and the function produces the next accumulator value. Each new accumulator value is output to observers.

So Observable.Scan is similar to Observable.Aggregate, except it emits the value of the aggregate as it goes instead of only when the source stream completes.
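The difference is easy to see with a running sum – a throwaway sketch, using Observable.Range as the source:

```csharp
using System;
using System.Reactive.Linq;

class Demo
{
    static void Main()
    {
        var source = Observable.Range(1, 4);

        // Scan emits every intermediate accumulator value: 1 3 6 10
        source.Scan(0, (acc, x) => acc + x)
              .Subscribe(x => Console.Write(x + " "));

        // Aggregate emits only the final value when the source completes: 10
        source.Aggregate(0, (acc, x) => acc + x)
              .Subscribe(x => Console.WriteLine(x));
    }
}
```

PairWithPrevious relies on exactly this streaming behaviour of Scan.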

I’m using the built-in Tuple type here to produce the resulting pairs – this has an advantage over anonymous types because you can return them from a method. If you have specific needs, you can consider a custom type. Tuple provides a way of grouping values and exposes a property for each value: Item1, Item2 and so on.

The seed value is a Tuple pair of defaults for the source type. On each iteration, we create a new Tuple, copying the old Item2 to Item1, and setting Item2 to the current value. We create a new Tuple because Tuples are immutable – if you are using a custom type, you could update it in place – but I actually think immutability is a safer way to go in Rx in general, unless you are especially worried about memory and GC performance.

Finally, here’s a unit test I wrote for this function – I used Nuget packages Rx-Testing and NUnit:

[TestFixture]
public class PairWithPreviousTests : ReactiveTest
{
    [Test]
    public void Works()
    {
        var testScheduler = new TestScheduler();

        var source = Observable.Range(1, 3);

        var results = testScheduler.Start(
            () => source.PairWithPrevious());

        results.Messages.AssertEqual(
            OnNext(Subscribed, Tuple.Create(0, 1)),
            OnNext(Subscribed, Tuple.Create(1, 2)),
            OnNext(Subscribed, Tuple.Create(2, 3)),
            OnCompleted<Tuple<int, int>>(Subscribed));
    }
}

This post was inspired in part by an old question on StackOverflow with an answer that suffers from a double subscription to the source. If you like the approach here, I’d be dead pleased if you could up-vote my answer on SO. :) Thanks!