Get a .gitignore file for Visual Studio fast with gitignore.io and PowerShell

I often use the wonderful service at https://www.gitignore.io to grab a .gitignore file suitable for Visual Studio use. Unfortunately, the file has Unix line endings – so I’ve boiled the operation down to a single line of PowerShell that fetches the file and converts the line endings in one go:

(curl https://www.gitignore.io/api/visualstudio).Content -split "`n" -join "`r`n" | set-content .gitignore

Use PowerShell 4 to check file signatures/hashes

I wasted a couple of DVDs this morning trying to burn a corrupted download of SQL Server from MSDN. Finally the light dawned and I decided to check the file hash. MSDN subscriber downloads have the SHA1 hash on the download page. I didn’t have my usual checker installed yet on my new laptop, and while googling I discovered that PowerShell 4 can now do this natively!

The command you need is Get-FileHash – MSDN uses SHA1 rather than the default SHA256 of this command so the following will do it:

Get-FileHash <filepath> -Algorithm SHA1

Rx Hammer-Nail Syndrome

This post is inspired by a recent post by Mike Taulty. I think you’ll agree it’s a great introductory post on how to use Rx.

I’m a big fan of Rx – it’s an awesome technology. But sometimes I am guilty of over-using it – the old adage that given a shiny Rx hammer every problem looks like a nail definitely applies!

If you haven’t already, please take a look at Mike’s post here to compare with what follows.

While the post was clearly intended as an instructive and motivational example, it did remind me of my own tendency to reach for Rx in similar production-code situations. I often need to remind myself that a non-Rx version can sometimes be more straightforward. I think this applies here if you look at the code rewritten to leverage the async/await syntax of C# 5:

Firstly, here is the non-Rx version of MakeObservableMovieDbRequest:

static async Task<T> MakeMovieRequestAsync<T>(
    string url,
    params KeyValuePair<string, string>[] additionalParameters)
{
    var client = new RestClient(BaseUrl) { Proxy = new WebProxy() };

    var parameterisedUrl = new StringBuilder(MakeUrl(url));

    foreach (var keyValuePair in additionalParameters)
    {
        parameterisedUrl.AppendFormat(@"&{0}={1}",
            keyValuePair.Key, keyValuePair.Value);
    }

    var request = new RestRequest(parameterisedUrl.ToString());

    var result = await client.ExecuteGetTaskAsync<T>(request);

    return result.Data;
}

Since a Task can be easily translated to an IObservable<T> with a call to ToObservable(), I think it’s just easier and more flexible to leave this function returning a Task. With the use of async/await we remove some braces and make the code more readable.
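To make the Task-to-observable point concrete, here is a minimal sketch. FetchTitleAsync is a hypothetical stand-in for MakeMovieRequestAsync<T> (it is not from Mike’s post), and the sketch assumes the Rx NuGet package is referenced:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class TaskToObservableSketch
{
    // Hypothetical stand-in for MakeMovieRequestAsync<T>:
    // completes after a short delay with a fixed value.
    public static async Task<string> FetchTitleAsync()
    {
        await Task.Delay(100);
        return "Some Movie";
    }

    public static void Main()
    {
        // ToObservable() yields one OnNext carrying the task's result,
        // followed by OnCompleted (or OnError if the task faults).
        IObservable<string> titles = FetchTitleAsync().ToObservable();

        // Wait() blocks until the sequence completes and returns its last element.
        Console.WriteLine(titles.Wait());
    }
}
```

Callers that want Rx can opt in at the call site this way, while everyone else simply awaits the Task.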

The same is true when using this method to page through the action movies:

static async Task GetActionMovies()
{
    var genreCollection =
        await MakeMovieRequestAsync<GenreCollection>(@"genre/list");

    var actionGenreId = (from genre in genreCollection.Genres
                            where genre.Name == "Action"
                            select genre.Id).First();

    var actionMoviesUrl = string.Format(
        @"genre/{0}/movies", actionGenreId);

    var actionMovieCollection =
        await MakeMovieRequestAsync<MovieCollection>(actionMoviesUrl);

    var totalPages = actionMovieCollection.TotalPages;

    for (int i = 1; i <= totalPages; i++)
    {
        var parameter = new KeyValuePair<string, string>(
            "page", i.ToString(CultureInfo.InvariantCulture));
        var movies = await MakeMovieRequestAsync<MovieCollection>(
            actionMoviesUrl, parameter);

        foreach (var movie in movies.Results)
        {
            Console.WriteLine("Movie {0}", movie.Title);
        }
    }            
}

The use of async/await means this code tells a clear story here – there are no obscure uses of Rx features like Concat to constrain concurrency; the for loop makes it clear how the results are being pulled together.

There are plenty of examples where Rx makes sense – often when coordinating complex streams.

The Rx solution is actually quite involved with some concepts that are not particularly intention revealing. In my opinion, in this case it makes the solution harder to maintain compared with the async/await approach.

As I said at the beginning, I’m often guilty of this – it’s very easy to forget how baffling Rx can be to the uninitiated. It should be used only when it brings real advantages to the table.

You often don’t have to look too far for this – even an idea as simple as adding a Timeout and decent error handling can be surprisingly hard to express with the TPL and yet quite simple with Rx. Pre .NET 4.5 and async/await, having to chain tasks together the old way was also a real chore.
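As a rough illustration of the timeout case, the sketch below wraps a slow task in an observable, applies Timeout, and substitutes a fallback value with Catch. SlowRequestAsync, the 200 ms limit and the "fallback" value are all made up for the example:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class TimeoutSketch
{
    // Hypothetical slow operation standing in for a real request.
    public static async Task<string> SlowRequestAsync()
    {
        await Task.Delay(TimeSpan.FromSeconds(2));
        return "real result";
    }

    public static string RequestWithFallback()
    {
        return SlowRequestAsync()
            .ToObservable()
            // Signals OnError(TimeoutException) if nothing arrives in time.
            .Timeout(TimeSpan.FromMilliseconds(200))
            // Handle only the timeout; any other error still propagates.
            .Catch((TimeoutException ex) => Observable.Return("fallback"))
            .Wait();
    }

    public static void Main()
    {
        Console.WriteLine(RequestWithFallback());
    }
}
```

Note that the timeout only abandons the wait; the underlying task carries on running unless you also arrange to cancel it.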

That said, I won’t be stopping my love affair with Rx anytime soon – there are plenty more nails that need hammering in with it!

Constraining a stream of events in Rx to a maximum rate

Sometimes, you want to limit the rate at which events arrive from an Rx stream.

The Throttle operator will suppress an event if another arrives within a specified interval. This is very useful in many instances, but it does have two important side-effects – even an unsuppressed event will be delayed by the interval, and events will get dropped altogether if they arrive too quickly.

I came across a situation where both of these were unacceptable. In this particular case, the desired behaviour was as follows: The events should be output at a maximum rate specified by a TimeSpan, but otherwise as soon as possible.

One solution works like this. Imagine our input stream is a bunch of people arriving at a railway station. For our output, we want people to leave the station at a maximum rate. We set the maximum rate by having each person stand at the front of a flatbed railroad truck and sending that truck out of the station at a fixed speed. Because there is only one track, and all trucks travel at the same speed and have the same length, people will leave the station at a maximum rate when trucks are departing back-to-back. However, if the track is clear, the next person will be able to depart immediately.

So how do we translate this metaphor into Rx?

We will use the Concat operator’s ability to accept a stream of streams and merge them together back-to-back – just like sending railroad trucks down the track.

To get the equivalent of each person onto a railroad truck, we will use a Select to project each event (person) to an observable sequence (railroad truck) that starts with a single OnNext event (the person) and ends with an OnCompleted exactly the defined interval later.

Let’s assume the input events are an IObservable<T> in the variable input. Here’s the code:

var paced = input.Select(i => Observable.Empty<T>()
                                        .Delay(interval)
                                        .StartWith(i)).Concat();

I’ll leave it as an exercise to turn this into an extension method for easy reuse!
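For anyone who wants to check their answer, one possible shape for that extension method is sketched below. The name Pace and the demo around it are my own choices for illustration, not an established Rx operator:

```csharp
using System;
using System.Reactive.Linq;

public static class PacingExtensions
{
    // Hypothetical name: emits items as soon as possible,
    // but never closer together than the given interval.
    public static IObservable<T> Pace<T>(
        this IObservable<T> source, TimeSpan interval)
    {
        return source
            // Each item becomes a "truck": the item itself, then a completion
            // that arrives one interval later.
            .Select(i => Observable.Empty<T>()
                                   .Delay(interval)
                                   .StartWith(i))
            // Concat sends the trucks down the track back-to-back.
            .Concat();
    }
}

public static class PaceDemo
{
    public static void Main()
    {
        // Three items that arrive at once should leave roughly 500 ms apart.
        Observable.Range(1, 3)
            .Pace(TimeSpan.FromMilliseconds(500))
            .Timestamp()
            .Do(x => Console.WriteLine(
                "{0:HH:mm:ss.fff} {1}", x.Timestamp, x.Value))
            .Wait();
    }
}
```

One consequence of this design is that the paced stream completes one interval after the final item, because the last truck still has to clear the track.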

Dealing with occasionally overrunning Timer initiated tasks using recursive scheduling

Sometimes we need to have background jobs that need to run periodically, such as a mail client checking for new e-mail every 5 minutes.

If you set up a Timer like this you may run into trouble:

Timer jobTimer = new Timer();
jobTimer.Interval = 60; // Interval is in milliseconds
jobTimer.Elapsed += new ElapsedEventHandler(RunJob);
jobTimer.Start();

void RunJob(object sender, ElapsedEventArgs e)
{
    // do some work
}

The problem is that there’s nothing to address what happens if the work takes longer than the interval. If it does, you’ll get some unplanned concurrency. This may not matter, depending on the work you are doing – but for this post, let’s assume that the concurrency is undesirable. I’ll also address another concern – controlling the thread on which the work is run.

Let’s say you try to resolve the unplanned concurrency with a lock:

object gate = new object();

void RunJob(object sender, ElapsedEventArgs e)
{
    lock(gate)
    {
        // do some work
    }
}

This will remove the concurrency, but will cause successive work to queue up on the lock. If you start to overrun regularly, the queue could just keep growing and you’ll get some unplanned bunching.

I’ve illustrated the situation before and after the lock is introduced below:

[Figure: Timer Problem]

Now, there are a few ways to address this problem and most solutions fall into one of two groups. The first is to keep the periodic timer, but skip the work if a previous invocation is still running. You can do that like this:

object gate = new object();

void RunJob(object sender, ElapsedEventArgs e)
{
    if (Monitor.TryEnter(gate))
    {
        try
        {
            // do some work
        }
        finally
        {
            Monitor.Exit(gate);
        }
    }
}

However, this leads to an unpredictable interval between invocations – sometimes it may just happen that the lock is released just as a new iteration tries to acquire it, other times the lock will be just missed and you’ll go almost double the interval between invocations.

The second group of solutions elect not to use a timer that fires periodically. Instead, after each time we perform the work only then do we schedule the subsequent iteration to take place. That is, instead of taking the interval to be start of one iteration to the start of the next, we take it to be from the end of one iteration to the start of the next. To do this, since we don’t know how long the work will take, we have to wait until the work is finished before scheduling the next iteration. This guarantees a predictable interval without any work running.

The general idea looks like this:

[Figure: Schedule Per Iteration]

If you stick with using one of the various Timer objects offered in the .NET BCL, then this is fairly easy to implement. You could use Timer.AutoReset = false and restart the Timer in the handler, for example.
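A minimal sketch of that Timer.AutoReset approach might look like the following. The five-second interval and the stopping flag are illustrative choices, and real code would want more careful shutdown handling:

```csharp
using System;

public static class OneShotTimerSketch
{
    // Set when the user asks to stop, so the handler does not re-arm the timer.
    private static volatile bool stopping;

    public static void Main()
    {
        // AutoReset = false makes the timer fire once per Start() call.
        var jobTimer = new System.Timers.Timer(5000) { AutoReset = false };

        jobTimer.Elapsed += (sender, e) =>
        {
            try
            {
                Console.WriteLine("Doing some work...");
            }
            finally
            {
                // Re-arm only after the work has finished, so runs never
                // overlap and the gap between runs is always the interval.
                if (!stopping)
                {
                    jobTimer.Start();
                }
            }
        };

        jobTimer.Start();

        Console.WriteLine("Press return to stop.");
        Console.ReadLine();
        stopping = true;
        jobTimer.Stop();
    }
}
```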

However, we will look at an elegant solution offered by Rx. Here’s an extension method on an Rx Scheduler (more on this later) that will schedule a recurring task with the behaviour illustrated above:

public static IDisposable ScheduleRecurringAction(
    this IScheduler scheduler, TimeSpan interval, Action action)
{
  return scheduler.Schedule(interval, scheduleNext =>
  {
    action();
    scheduleNext(interval);
  });
}

And here’s how you could call it:

TimeSpan interval = TimeSpan.FromSeconds(5);
Action work = () => Console.WriteLine("Doing some work...");
	
var schedule =
  Scheduler.Default.ScheduleRecurringAction(interval, work);
	
Console.WriteLine("Press return to stop.");
Console.ReadLine();
schedule.Dispose();

This code probably looks a little odd if you haven’t come across recursive scheduling in Rx before. The overload of Schedule we are using schedules work, but also provides an Action for scheduling the next piece of work after that. In the code above, this is the scheduleNext parameter.

Note it’s entirely optional whether you actually use this Action – not calling it will exit the recursive loop. What’s going on under the covers uses a technique called trampolining that avoids generating massive stacks. It’s an idea that’s beyond the scope of this blog post – but if you are feeling brave you can read more about it in this fascinating article by Bart de Smet!
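To show what leaving the loop looks like, here is a sketch of a variant that stops after a fixed number of runs simply by not calling scheduleNext again. ScheduleRepeatedAction and its times parameter are hypothetical names for this example:

```csharp
using System;
using System.Reactive.Concurrency;
using System.Threading;

public static class LimitedRecurrenceExtensions
{
    // Hypothetical variant: runs the action 'times' times (at least once),
    // waiting 'interval' before each run.
    public static IDisposable ScheduleRepeatedAction(
        this IScheduler scheduler, TimeSpan interval, int times, Action action)
    {
        var remaining = times;
        return scheduler.Schedule(interval, scheduleNext =>
        {
            action();
            if (--remaining > 0)
            {
                scheduleNext(interval);
            }
            // Otherwise we simply return, and the recursion ends here.
        });
    }
}

public static class LimitedRecurrenceDemo
{
    public static void Main()
    {
        using (var done = new CountdownEvent(3))
        {
            Scheduler.Default.ScheduleRepeatedAction(
                TimeSpan.FromMilliseconds(200), 3, () =>
                {
                    Console.WriteLine("Doing some work...");
                    done.Signal();
                });

            // Block until the third run has signalled.
            done.Wait();
        }
    }
}
```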

The IDisposable that’s returned by the Schedule call can be used to cancel the scheduled work. Cleverly, this will work not just for the first item scheduled; it’s also good for the work subsequently scheduled recursively.

As previously mentioned, the code above is an extension method on IScheduler. Rx provides a number of IScheduler implementations. These are responsible for managing scheduled work, and in particular for deciding on what threads work should take place.

The Scheduler.Default property returns a platform specific (as of Rx 2.0) scheduler that introduces the “cheapest” form of concurrency. For .NET 4.5 this is the ThreadPoolScheduler – this means that each iteration will be run on a threadpool thread. Other schedulers include the NewThreadScheduler, TaskPoolScheduler, ImmediateScheduler, CurrentThreadScheduler and DispatcherScheduler. I’ll save a fuller discussion of these for another time.
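Because the extension method only depends on IScheduler, you can also hand it a virtual-time scheduler to check the timing without waiting. The sketch below assumes the Rx-Testing package (which provides TestScheduler) and repeats the ScheduleRecurringAction extension from earlier in this post so that it compiles on its own:

```csharp
using System;
using System.Reactive.Concurrency;
using Microsoft.Reactive.Testing;

public static class RecurringSchedulerExtensions
{
    // Same shape as the extension method shown earlier in this post.
    public static IDisposable ScheduleRecurringAction(
        this IScheduler scheduler, TimeSpan interval, Action action)
    {
        return scheduler.Schedule(interval, scheduleNext =>
        {
            action();
            scheduleNext(interval);
        });
    }
}

public static class VirtualTimeDemo
{
    public static int CountRuns()
    {
        var scheduler = new TestScheduler();
        var runs = 0;

        scheduler.ScheduleRecurringAction(
            TimeSpan.FromSeconds(5), () => runs++);

        // Advance virtual time by 16 seconds; no real waiting happens.
        // The action is due at 5s, 10s and 15s of virtual time.
        scheduler.AdvanceBy(TimeSpan.FromSeconds(16).Ticks);

        return runs;
    }

    public static void Main()
    {
        Console.WriteLine(CountRuns()); // 3
    }
}
```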

If you were really paying attention, you’ll have noticed that while calling Dispose will cancel any scheduled actions not started and prevent any further actions from being scheduled, it won’t cancel any action already started. If you need to cleanly cancel a longer running piece of work, that might be problematic.

Next time we will look at a technique to cooperatively cancel long running actions, and abort any that run on too long.

Detecting disconnected clients the Rx way

Today we’ll look at a fairly common server-side scenario. In this case, a server installs clients on remote machines and clients report back through a socket.

We want to create a “hold-timer” – each client will periodically send a heartbeat with a unique id back to the server to indicate they are alive. If the server doesn’t receive a heartbeat from a client within a specified limit then the client is removed.

Here’s a simple snippet that demonstrates how you can do this with reactive extensions. If you want to run it, paste the code into a console app – and don’t forget to add the NuGet package Rx-Main:

var clientHeartbeats = new Subject<int>();
var timeToHold = TimeSpan.FromSeconds(5);

var expiredClients = clientHeartbeats
  .Synchronize()
  .GroupBy(key => key)
  .SelectMany(grp => grp.Throttle(timeToHold));
  
var subscription = expiredClients.Subscribe(
  // in here add your disconnect action
  i => Console.WriteLine("Disconnect Client: " + i));
	
while(true)
{
  var num = Console.ReadLine();
  if (num == "q")
  {
    break;
  }
  // call OnNext with the client id each time they send hello
  clientHeartbeats.OnNext(int.Parse(num));
}

// to tear down the monitor
subscription.Dispose();

So let’s break this down. We will create a Subject to which the client heartbeats will be posted. Every time a client heartbeat arrives, we call OnNext on the clientHeartbeats Subject, passing the client id. This gives us an IObservable<int> stream of client ids.

Let’s gloss over the Synchronize() call for now – I’ll come back to this later.

The first thing we want to do is separate out each client’s events into separate streams for independent consideration. We can use Observable.GroupBy to do this. GroupBy needs a key selector which will identify each group. In this case we can use the identity function (key => key) and group by the id itself.

The output of GroupBy is a stream of streams – each stream emitted is an IGroupedObservable<int, int> that is just like an IObservable<int> but that also has a Key property to indicate which group it is. So the return type of GroupBy is an eye-watering IObservable<IGroupedObservable<TKey, TElement>>.

So now we have individual streams for each client id to work with. Let’s consider the goal – we want to emit a signal whenever a client doesn’t notify us for a specified duration.

Observable.Throttle is what we need. This extension method will suppress an event if another event appears within a specified duration. Perfect! We apply a Throttle to each group and we will only get an event if a client doesn’t say hello in time.

The situation so far is illustrated below:

Now we need to pull all the separate streams back into one result stream. For this, we can use Observable.SelectMany. One of its many (no pun intended!) overloads will do this for us, accepting a stream of streams and using a function to pull out elements to be sent to the result stream. The selector function takes each group stream and we can apply the Throttle behaviour described above.

Now, there’s a problem here. The GroupBy operator has a memory leak! As written, its groups will never terminate. Even worse, if you try to terminate a group subscription (with say a Take(1) after our throttle), then the group will end, but won’t ever restart! I wouldn’t get too carried away worrying about this unless you have very large numbers of clients and never restart your service but…

You can address this problem by using Observable.GroupByUntil which takes a duration function for each group, and allows groups to restart. Here’s the code – I’ll leave it as an exercise for you to figure out the detail of how it works and why I’ve written it this way. :) :

var expiredClients = clientHeartbeats
  .GroupByUntil(key => key, grp => grp.Throttle(timeToHold))
  .SelectMany(grp => grp.TakeLast(1));

One final note… about that Observable.Synchronize. This function ensures that the IObservable<T> it is applied to behaves itself! It enforces the Rx grammar of OnNext*(OnCompleted|OnError)? and it makes OnNext calls thread safe. In Rx 2.x, although Subject enforces correct grammar, OnNext calls are not thread safe. This was a decision made for performance reasons. It’s quite possible that in production server code you will be wanting to call OnNext from different threads. Synchronize will help prevent any nasty race conditions by putting a gate around OnNext calls.
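Here is a small sketch of that gate in action. Many threads post heartbeats at once, and the subscriber uses a deliberately non-atomic counter; the counter and the use of Parallel.For are just for demonstration:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

public static class SynchronizeSketch
{
    public static int CountHeartbeats(int total)
    {
        var heartbeats = new Subject<int>();
        var seen = 0;

        // Downstream of Synchronize(), OnNext calls are serialized by a lock,
        // so the plain (non-atomic) increment below does not lose updates.
        using (heartbeats.Synchronize().Subscribe(_ => seen++))
        {
            // Simulate clients calling in from many threads at once.
            Parallel.For(0, total, i => heartbeats.OnNext(i));
        }

        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(CountHeartbeats(100000));
    }
}
```

With the Synchronize() call in place the count should match the number of heartbeats sent; take it out and the unsynchronized increment is free to lose updates under contention.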

Comparing previous and current items with Rx

Here’s an interesting problem that came up on the Rx forums. How do you compare the current and previous items in an observable stream?

We can create a general purpose method to pair up the current and previous items, which will enable you to easily tack on whatever comparison function you like, such as a Where or Select. Here’s a picture of what we want to do:

[Figure: Pair With Previous]

Observable.Scan gives us an excellent starting point for this.

We’ll call the extension PairWithPrevious. Here’s an example of how you might use it to write out the deltas in a stream of integers:

var source = new Subject<int>();
var delta = source.PairWithPrevious()
                  .Select(pair => pair.Item2 - pair.Item1)
                  .Subscribe(Console.WriteLine);
			
source.OnNext(1);
source.OnNext(5);
source.OnNext(3);

This gives the following output (note the first delta value is from the default integer value of 0 to the initial value of 1):

1
4
-2

Here’s the code:

public static IObservable<Tuple<TSource, TSource>>
    PairWithPrevious<TSource>(this IObservable<TSource> source)
{
    return source.Scan(
        Tuple.Create(default(TSource), default(TSource)),
        (acc, current) => Tuple.Create(acc.Item2, current));
}

The first argument to scan specifies a “seed” that initializes the “accumulator”. This is not returned – it gives an initial value for the accumulator function. As each item arrives, the accumulator is passed to an accumulator function along with the current item. The function produces the next value for the accumulator. This new accumulator value is output to observers.

So Observable.Scan is similar to Observable.Aggregate, except it emits the value of the aggregate as it goes instead of only when the source stream completes.
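A quick way to see the difference is to run both operators over the same source with the same accumulator. This is just an illustrative sketch using a running sum:

```csharp
using System;
using System.Reactive.Linq;

public static class ScanVersusAggregate
{
    public static void Main()
    {
        var source = Observable.Range(1, 4);

        // Scan publishes every intermediate accumulator value: 1, 3, 6, 10.
        source.Scan(0, (sum, x) => sum + x)
              .Subscribe(sum => Console.WriteLine("Scan: " + sum));

        // Aggregate publishes only the final value, once the source
        // has completed: 10.
        source.Aggregate(0, (sum, x) => sum + x)
              .Subscribe(sum => Console.WriteLine("Aggregate: " + sum));
    }
}
```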

I’m using the built in Tuple type here to produce the resulting pairs – this has an advantage over anonymous types because you can return them from a method. If you have specific needs, you can consider a custom type. Tuple provides a way of grouping values and has a set of ItemX properties, one for each value called Item1, Item2 and so on.

The seed value is a Tuple pair of defaults for the source type. On each iteration, we create a new Tuple, copying the old Item2 to Item1, and setting Item2 with the current value. We create a new Tuple because Tuples are immutable – if you are using a custom type, you could update it – but I actually think immutability is a safer way to go in Rx in general, unless you are especially worried about memory and GC performance.
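As an example of tacking on a Where, the sketch below keeps only the values that are higher than their predecessor. The Rises helper is hypothetical, and PairWithPrevious is repeated so the example compiles on its own. Note the Skip(1), which drops the first pair because its Item1 is only the seeded default:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class PairingExtensions
{
    // Repeated from the post so this sketch is self-contained.
    public static IObservable<Tuple<TSource, TSource>>
        PairWithPrevious<TSource>(this IObservable<TSource> source)
    {
        return source.Scan(
            Tuple.Create(default(TSource), default(TSource)),
            (acc, current) => Tuple.Create(acc.Item2, current));
    }
}

public static class RisesOnly
{
    // Hypothetical helper: returns the values that exceed their predecessor.
    public static IList<int> Rises(IEnumerable<int> values)
    {
        return values.ToObservable()
            .PairWithPrevious()
            .Skip(1) // ignore the pair whose "previous" is the seed default
            .Where(pair => pair.Item2 > pair.Item1)
            .Select(pair => pair.Item2)
            .ToList()
            .Wait();
    }

    public static void Main()
    {
        // Pairs after the seed: (1,5), (5,3), (3,4), so the rises are 5 and 4.
        foreach (var value in Rises(new[] { 1, 5, 3, 4 }))
        {
            Console.WriteLine(value);
        }
    }
}
```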

Finally, here’s a unit test I wrote for this function – I used NuGet packages Rx-Testing and NUnit:

public class PairWithPreviousTests : ReactiveTest
{
    [Test]
    public void Works()
    {
        var testScheduler = new TestScheduler();

        var source = Observable.Range(1, 3);

        var results = testScheduler.Start(
            () => source.PairWithPrevious());

        results.Messages.AssertEqual(
            OnNext(Subscribed, Tuple.Create(0, 1)),
            OnNext(Subscribed, Tuple.Create(1, 2)),
            OnNext(Subscribed, Tuple.Create(2, 3)),
            OnCompleted<Tuple<int, int>>(Subscribed));
    }
}

This post was inspired in part by an old question on StackOverflow with an answer that suffers from a double subscription to the source. If you like the approach here, I’d be dead pleased if you could up-vote my answer on SO. :) Thanks!

ObserveLatestOn – A coalescing ObserveOn

This is a problem that comes up when the UI dispatcher can’t keep up with inbound activity.

The example I saw on a project was a price stream, which could get very busy, that had to be displayed on the UI.

If more than one new price arrives on background threads in between dispatcher time slices, there is no point displaying anything but the most recent price – in fact we want the UI to “catch up” by dropping all but the most recent undisplayed price.

So the operation is like an ObserveOn that drops all but the most recent events. Here’s a picture of what’s happening – notice how price 2 is dropped and how prices are only published during the dispatcher time slice:

Here is the code (by the way, if you’re not sure what Materialize and Accept are, I wrote about these here):

public static IObservable<T> ObserveLatestOn<T>(
    this IObservable<T> source, IScheduler scheduler)
{
    return Observable.Create<T>(observer =>
    {
        Notification<T> outsideNotification = null;
        var gate = new object();
        bool active = false;
        var cancelable = new MultipleAssignmentDisposable();
        var disposable = source.Materialize().Subscribe(thisNotification =>
        {
            bool wasNotAlreadyActive;
            lock (gate)
            {
                wasNotAlreadyActive = !active;
                active = true;
                outsideNotification = thisNotification;
            }

            if (wasNotAlreadyActive)
            {
                cancelable.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> localNotification = null;
                    lock (gate)
                    {
                        localNotification = outsideNotification;
                        outsideNotification = null;
                    }
                    localNotification.Accept(observer);
                    bool hasPendingNotification = false;
                    lock (gate)
                    {
                        hasPendingNotification = active = (outsideNotification != null);
                    }
                    if (hasPendingNotification)
                    {
                        self();
                    }
                });
            }
        });
        return new CompositeDisposable(disposable, cancelable);
    });
}

The key idea here is that we keep track of the notification to be pushed on to the target scheduler in outsideNotification – and whenever an event is received, we overwrite outsideNotification with the new notification. We ensure the new notification will be scheduled for dispatch on the target scheduler – but we may not need to do this…

If the active flag is already set, a dispatch action is scheduled or running and it will pick up the latest notification itself. If it is not set, we know that either (a) there was no previous notification as this is the first one or (b) every previous notification was already dispatched. How do we know this? Because the scheduler action that does the dispatch swaps outsideNotification for null, and clears the active flag if nothing new has arrived by the time it finishes! So if active was not set, we know we must schedule a new dispatch action.

This approach keeps the checks, locks and scheduled actions to a minimum.

Notes and credits:

I’ve gone round the houses a few times on this implementation – my own attempts to improve it to use CAS rather than lock ran into bugs, so the code above is largely due to Lee Campbell, and edited for Rx 2.0 support by Wilka Hudson. For an interesting discussion on this approach see this thread on the official Rx forum.

A reactive join example

A good example for using a reactive join came up the other day. Given a realtime price stream, we want to display the most recent price whenever a button is clicked.

Joins in Rx ask when it is that events from pairs of streams coincide.

To reason about whether events occur at the same time, they need to have a duration as well as a value. That duration could be infinitesimally small (a point event), or it could be longer.

Duration in Rx is expressed with (you might have guessed) an observable. Each event in a source stream has an associated duration stream. The duration of the event is the time it takes for the duration stream to emit an OnNext or OnCompleted.

So back to the example. The prices are represented as an IObservable<Price>.

Through the use of Observable.FromEvent we can easily transform click events into an IObservable<Unit>. (Recall Unit is a type representing an event whose value is not interesting.)

The result we want will be an IObservable<Price> that emits the most recent Price from the price stream whenever the IObservable<Unit> stream emits a Unit.

The join should tell us when the duration of a price coincides with a click. Here’s a visual representation of the idea:

[Figure: Visualizing SampleOn]

This is quite similar to the built in Rx Sample extension method that samples a source stream at periodic intervals, so I decided to call this operation SampleOn.

To use a join here we want to give each Price a duration that lasts until the next Price is emitted. A duration that lasts until the next event is quite a common requirement. Happily, we can use a useful trick and take the Price stream itself as the duration observable. In this way, the next Price will terminate the duration of the previous Price.*

The duration of the Click event is a point event – to represent this we just need a stream that terminates immediately, and Observable.Empty<Unit>() will do nicely.

Here is the code:

public static IObservable<TSource>
    SampleOn<TSource, TSample>(
        this IObservable<TSource> source,
        IObservable<TSample> sampler)
{
    return from s in source
           join t in sampler
             on source equals Observable.Empty<Unit>()
           select s;
}

The query syntax for joins uses the following pattern:

from left event in left stream
join right event in right stream
on left duration function equals right duration function
select selector function.
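If the query syntax feels opaque, the same pattern can be written as a direct call to Observable.Join, which makes the two duration functions and the result selector explicit. The sketch below uses Subjects (so both streams are hot) and int prices purely for illustration:

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class JoinMethodSyntax
{
    public static void Main()
    {
        var prices = new Subject<int>();   // hot stream standing in for prices
        var clicks = new Subject<Unit>();  // hot stream standing in for clicks
        IObservable<int> priceStream = prices;

        var sampled = priceStream.Join(
            clicks,
            price => priceStream,               // left duration: until the next price
            click => Observable.Empty<Unit>(),  // right duration: a point event
            (price, click) => price);           // result selector

        using (sampled.Subscribe(p => Console.WriteLine("Sampled " + p)))
        {
            prices.OnNext(100);
            prices.OnNext(101);
            clicks.OnNext(Unit.Default); // 101 is the current price here
            prices.OnNext(102);
            clicks.OnNext(Unit.Default); // 102 is the current price here
        }
    }
}
```

Running this should print "Sampled 101" and then "Sampled 102", since each click only coincides with the price whose duration is still open.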

* One thing to point out when using a stream as its own duration function, as I do in this example, is that this is going to cause the stream to be subscribed to multiple times – you will want to either Publish()/RefCount() the source stream or ensure that it is hot. I talked about this previously here.

Mind your subscriptions in your IObservable extensions

When creating new Rx operators that extend IObservable, it’s good practice to make sure you only subscribe once to the source. Users of your operator have a right to expect that subscriptions to your operator will only in turn cause a single subscription to the source. This could become crucial if the source is cold – multiple subscriptions might cause unintended side-effects.

Here is an example (somewhat contrived for simplicity). Let’s say you wanted to have a running count of the number of OnNext()s that have occurred in a source stream within the previous 5 seconds. You might write this as follows:

public static IObservable<int> WindowCount<T>(
  this IObservable<T> source,
  TimeSpan interval)
{
    const int onNextEntersWindow = 1;
    const int onNextExitsWindow = -1;

    return Observable.Create<int>(observer =>
        {
            return Observable
              .Merge(source.Select(_ => onNextEntersWindow),
                     source.Select(_ => onNextExitsWindow)
                           .Delay(interval))
              .Scan((countInWindow, onNextEvent) =>
                countInWindow + onNextEvent)
              .Subscribe(observer);
        });
}

This code is projecting the source stream to TWO separate streams and merging them. The first is a stream of 1s, the second a stream of -1s that is delayed by an interval. These are then added together with Scan for a real-time running total. The effect of this is that for each event in the source, we get a count of the number of events that have occurred within the preceding interval. This is a useful idea for a number of advanced operators… but I digress.

The key point is that this operation creates TWO subscriptions to the source – one for each stream. The fix is straightforward – you simply publish the source and subscribe to the publication as many times as you need. Effectively you are working with a version of the source stream guaranteed to be hot. The fix looks like this:

public static IObservable<int> WindowCount<T>(
  this IObservable<T> source,
  TimeSpan interval)
{
    const int onNextEntersWindow = 1;
    const int onNextExitsWindow = -1;

    return Observable.Create<int>(observer =>
        {
            var hotSource = source.Publish().RefCount();

            return Observable
              .Merge(hotSource.Select(_ => onNextEntersWindow),
                     hotSource.Select(_ => onNextExitsWindow)
                              .Delay(interval))
              .Scan((countInWindow, onNextEvent) =>
                countInWindow + onNextEvent)
              .Subscribe(observer);
        });
}

You must be careful about where you place the Publish. Putting it before the Create would cause a single Subscription to be shared across ALL subscriptions to a given invocation of the operator. By doing the Publish inside the Create, we get the desired behaviour of one Subscription to source per subscription to the operator.
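If you want to see the difference for yourself, a cold source that counts its subscribers makes it visible. The sketch below is a simplified stand-in for the two WindowCount versions (it leaves out the Delay and Scan), and ColdSource is a hypothetical helper built for the demonstration:

```csharp
using System;
using System.Reactive.Linq;

public static class SubscriptionCounting
{
    // Hypothetical cold source: runs onSubscribe for every subscription it
    // receives and then never emits, so nothing completes early.
    static IObservable<int> ColdSource(Action onSubscribe)
    {
        return Observable.Defer(() =>
        {
            onSubscribe();
            return Observable.Never<int>();
        });
    }

    public static int CountNaive()
    {
        var count = 0;
        var source = ColdSource(() => count++);

        // Each projection subscribes to the source separately.
        using (Observable.Merge(source.Select(_ => 1),
                                source.Select(_ => -1)).Subscribe())
        {
        }
        return count;
    }

    public static int CountPublished()
    {
        var count = 0;
        var source = ColdSource(() => count++);
        var hotSource = source.Publish().RefCount();

        // Both projections now share one underlying subscription.
        using (Observable.Merge(hotSource.Select(_ => 1),
                                hotSource.Select(_ => -1)).Subscribe())
        {
        }
        return count;
    }

    public static void Main()
    {
        Console.WriteLine("Naive: " + CountNaive());         // 2
        Console.WriteLine("Published: " + CountPublished()); // 1
    }
}
```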