Dealing with occasionally overrunning Timer-initiated tasks using recursive scheduling

Sometimes we need background jobs that run periodically, such as a mail client checking for new e-mail every 5 minutes.

If you set up a Timer like this you may run into trouble:

Timer jobTimer = new Timer(); // System.Timers.Timer
jobTimer.Interval = 60000; // the interval is in milliseconds, so this fires once a minute
jobTimer.Elapsed += new ElapsedEventHandler(RunJob);
jobTimer.Start();

void RunJob(object sender, ElapsedEventArgs e)
{
    // do some work
}

The problem is that there’s nothing to address what happens if the work takes longer than the interval. If it does, you’ll get some unplanned concurrency. This may not matter, depending on the work you are doing – but for this post, let’s assume that the concurrency is undesirable. I’ll also address another concern – controlling the thread on which the work is run.

Let’s say you try to resolve the unplanned concurrency with a lock:

object gate = new object();

void RunJob(object sender, ElapsedEventArgs e)
{
    lock(gate)
    {
        // do some work
    }
}

This will remove the concurrency, but will cause successive work to queue up on the lock. If you start to overrun regularly, the queue could just keep growing and you’ll get some unplanned bunching.

I’ve illustrated the situation before and after the lock is introduced below:

[Figure: Timer Problem]

Now, there are a few ways to address this problem and most solutions fall into one of two groups. The first is to keep the periodic timer, but skip the work if a previous invocation is still running. You can do that like this:

object gate = new object();

void RunJob(object sender, ElapsedEventArgs e)
{
    if (Monitor.TryEnter(gate))
    {
        try
        {
            // do some work
        }
        finally
        {
            Monitor.Exit(gate);
        }
    }
}

However, this leads to an unpredictable gap between invocations – sometimes the lock happens to be released just as a new iteration tries to acquire it; other times it is just missed, and almost double the interval passes before the work runs again.

The second group of solutions elects not to use a timer that fires periodically. Instead, only after each piece of work completes do we schedule the subsequent iteration. That is, instead of measuring the interval from the start of one iteration to the start of the next, we measure it from the end of one iteration to the start of the next. Since we don’t know how long the work will take, we have to wait until it finishes before scheduling the next iteration. This guarantees a predictable gap between iterations during which no work is running.

The general idea looks like this:

[Figure: Schedule Per Iteration]

If you stick with using one of the various Timer objects offered in the .NET BCL, then this is fairly easy to implement. You could use Timer.AutoReset = false and restart the Timer in the handler, for example.
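
For instance, here’s a rough sketch of that approach with System.Timers.Timer (DoWork stands in for whatever your job actually does):

var jobTimer = new System.Timers.Timer(60000); // interval in milliseconds
jobTimer.AutoReset = false; // fire once only; we restart manually
jobTimer.Elapsed += (s, e) =>
{
    DoWork(); // hypothetical - your work goes here
    jobTimer.Start(); // schedule the next iteration only once the work is done
};
jobTimer.Start();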

However, we will look at an elegant solution offered by Rx. Here’s an extension method on an Rx Scheduler (more on this later) that will schedule a recurring task with the behaviour illustrated above:

public static IDisposable ScheduleRecurringAction(
    this IScheduler scheduler, TimeSpan interval, Action action)
{
  return scheduler.Schedule(interval, scheduleNext =>
  {
    action();
    scheduleNext(interval);
  });
}

And here’s how you could call it:

TimeSpan interval = TimeSpan.FromSeconds(5);
Action work = () => Console.WriteLine("Doing some work...");
	
var schedule =
  Scheduler.Default.ScheduleRecurringAction(interval, work);
	
Console.WriteLine("Press return to stop.");
Console.ReadLine();
schedule.Dispose();

This code probably looks a little odd if you haven’t come across recursive scheduling in Rx before. The overload of Schedule we are using schedules work, but also provides an Action for scheduling the next piece of work after that. In the code above, this is the scheduleNext parameter.

Note it’s entirely optional whether you actually use this Action – not calling it will exit the recursive loop. What’s going on under the covers uses a technique called trampolining that avoids generating massive stacks. It’s an idea that’s beyond the scope of this blog post – but if you are feeling brave you can read more about it in this fascinating article by Bart de Smet!
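
To see what I mean about the recursion being optional, here’s a hypothetical variant that stops after a fixed number of iterations, simply by declining to call scheduleNext:

public static IDisposable ScheduleRecurringAction(
    this IScheduler scheduler, TimeSpan interval, Action action, int iterations)
{
  int remaining = iterations;
  return scheduler.Schedule(interval, scheduleNext =>
  {
    action();
    if (--remaining > 0)
    {
      scheduleNext(interval); // skip this call and the recursive loop ends
    }
  });
}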

The IDisposable that’s returned by the Schedule call can be used to cancel the scheduled work. Cleverly, this will work not just for the first item scheduled; it’s also good for the work subsequently scheduled recursively.

As previously mentioned, the code above is an extension method on IScheduler. Rx provides a number of IScheduler implementations. These are responsible for managing scheduled work, and in particular for deciding on what threads work should take place.

The Scheduler.Default property returns a platform specific (as of Rx 2.0) scheduler that introduces the “cheapest” form of concurrency. For .NET 4.5 this is the ThreadPoolScheduler – this means that each iteration will be run on a threadpool thread. Other schedulers include the NewThreadScheduler, TaskPoolScheduler, ImmediateScheduler, CurrentThreadScheduler and DispatcherScheduler. I’ll save a fuller discussion of these for another time.
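
If the default doesn’t suit, you can simply pass a different scheduler to the extension method – for example, to give each iteration its own dedicated thread:

var schedule =
  NewThreadScheduler.Default.ScheduleRecurringAction(interval, work);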

If you were really paying attention, you’ll have noticed that while calling Dispose will cancel any scheduled actions not started and prevent any further actions from being scheduled, it won’t cancel any action already started. If you need to cleanly cancel a longer running piece of work, that might be problematic.

Next time we will look at a technique to cooperatively cancel long running actions, and abort any that run on too long.

Detecting disconnected clients the Rx way

Today we’ll look at a fairly common server-side scenario. In this case, clients installed on remote machines report back to a server through a socket.

We want to create a “hold-timer” – each client will periodically send a heartbeat with a unique id back to the server to indicate it is alive. If the server doesn’t receive a heartbeat from a client within a specified limit, then the client is removed.

Here’s a simple snippet that demonstrates how you can do this with the Reactive Extensions. If you want to run it, paste the code into a console app – and don’t forget to add the NuGet package Rx-Main:

var clientHeartbeats = new Subject<int>();
var timeToHold = TimeSpan.FromSeconds(5);

var expiredClients = clientHeartbeats
  .Synchronize()
  .GroupBy(key => key)
  .SelectMany(grp => grp.Throttle(timeToHold));
  
var subscription = expiredClients.Subscribe(
  // in here add your disconnect action
  i => Console.WriteLine("Disconnect Client: " + i));
	
while(true)
{
  var num = Console.ReadLine();
  if (num == "q")
  {
    break;
  }
  // call OnNext with the client id each time they send hello
  clientHeartbeats.OnNext(int.Parse(num));
}

// to tear down the monitor
subscription.Dispose();

So let’s break this down. We will create a Subject to which the client heartbeats will be posted. Every time a client heartbeat arrives, we call OnNext on the clientHeartbeats Subject, passing the client id. This gives us an IObservable<int> stream of client ids.

Let’s gloss over the Synchronize() call for now – I’ll come back to this later.

The first thing we want to do is separate out each client’s events into separate streams for independent consideration. We can use Observable.GroupBy to do this. GroupBy needs a key selector which will identify each group. In this case we can use the identity function (key => key) and group by the id itself.

The output of GroupBy is a stream of streams – each stream emitted is an IGroupedObservable<int, int>, which is just like an IObservable<int> but also has a Key property to indicate which group it is. So the return type of GroupBy is an eye-watering IObservable<IGroupedObservable<TKey, TSource>>.
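
If that type sounds intimidating, here’s a quick sketch of consuming the groups directly, using the Key property to identify the client each one belongs to:

clientHeartbeats
  .GroupBy(key => key)
  .Subscribe(grp =>
    Console.WriteLine("First heartbeat seen from client: " + grp.Key));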

So now we have individual streams for each client id to work with. Let’s consider the goal – we want to emit a signal whenever a client doesn’t notify us for a specified duration.

Observable.Throttle is what we need. This extension method will suppress an event if another event appears within a specified duration. Perfect! We apply a Throttle to each group and we will only get an event if a client doesn’t say hello in time.
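
If you haven’t met Throttle before, here it is in isolation – a minimal sketch where an id comes out only if nothing newer arrives within the duration:

var heartbeats = new Subject<int>();

// Throttle re-emits a value only once the source has been silent for the duration
heartbeats.Throttle(TimeSpan.FromSeconds(5))
          .Subscribe(i => Console.WriteLine("5s of silence after: " + i));

heartbeats.OnNext(1); // emitted after 5 seconds, unless a newer value arrives first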

The situation so far is illustrated below:

[Figure: GroupBy splits the heartbeats into per-client streams, each with its own Throttle]

Now we need to pull all the separate streams back into one result stream. For this, we can use Observable.SelectMany. One of its many (no pun intended!) overloads will do this for us, accepting a stream of streams and using a selector function to pull out elements to be sent to the result stream. The selector function takes each group stream, and here we apply the Throttle behaviour described above.

Now, there’s a problem here. The GroupBy operator has a memory leak! As written, its groups will never terminate. Even worse, if you try to terminate a group subscription (with, say, a Take(1) after our throttle), then the group will end, but won’t ever restart! I wouldn’t get too carried away worrying about this unless you have very large numbers of clients and never restart your service, but…

You can address this problem by using Observable.GroupByUntil, which takes a duration function for each group and allows groups to restart. Here’s the code – I’ll leave it as an exercise for you to figure out the details of how it works and why I’ve written it this way. :)

var expiredClients = clientHeartbeats
  .GroupByUntil(key => key, grp => grp.Throttle(timeToHold))
  .SelectMany(grp => grp.TakeLast(1));

One final note… about that Observable.Synchronize. This function ensures that the IObservable<T> it is applied to behaves itself! It enforces the Rx grammar of OnNext*(OnCompleted|OnError)? and it makes OnNext calls thread safe. In Rx 2.x, although Subject enforces the correct grammar, OnNext calls are not thread safe. This was a decision made for performance reasons. It’s quite possible that in production server code you will want to call OnNext from different threads. Synchronize will help prevent any nasty race conditions by putting a gate around the OnNext calls.
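
To make that concrete, here’s a little sketch simulating heartbeats posted from several threads at once – Synchronize serializes the OnNext calls before they reach the rest of the pipeline:

var subject = new Subject<int>();
var safe = subject.Synchronize(); // downstream operators now see one OnNext at a time

safe.Subscribe(i => Console.WriteLine("Heartbeat from client: " + i));

// simulate heartbeats arriving concurrently from several socket threads
Parallel.For(0, 4, clientId => subject.OnNext(clientId));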

Comparing previous and current items with Rx

Here’s an interesting problem that came up on the Rx forums. How do you compare the current and previous items in an observable stream?

We can create a general-purpose method to pair up the current and previous items, which will let you easily tack on whatever comparison you like with a Where or Select. Here’s a picture of what we want to do:

[Figure: Pair With Previous]

Observable.Scan gives us an excellent starting point for this.

We’ll call the extension PairWithPrevious. Here’s an example of how you might use it to write out the deltas in a stream of integers:

var source = new Subject<int>();
var delta = source.PairWithPrevious()
                  .Select(pair => pair.Item2 - pair.Item1)
                  .Subscribe(Console.WriteLine);
			
source.OnNext(1);
source.OnNext(5);
source.OnNext(3);

This gives the following output (note the first delta value is from the default integer value of 0 to the initial value of 1):

1
4
-2

Here’s the code:

public static IObservable<Tuple<TSource, TSource>>
    PairWithPrevious<TSource>(this IObservable<TSource> source)
{
    return source.Scan(
        Tuple.Create(default(TSource), default(TSource)),
        (acc, current) => Tuple.Create(acc.Item2, current));
}

The first argument to Scan specifies a “seed” that initializes the “accumulator”. The seed itself is not emitted – it simply gives the accumulator function an initial value. As each item arrives, the accumulator is passed to the accumulator function along with the current item. The function produces the next value of the accumulator, and this new accumulator value is emitted to observers.

So Observable.Scan is similar to Observable.Aggregate, except it emits the value of the aggregate as it goes instead of only when the source stream completes.
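
A quick illustration of the difference, using a running sum:

var numbers = new[] { 1, 2, 3 }.ToObservable();

numbers.Scan((acc, x) => acc + x)
       .Subscribe(Console.WriteLine); // prints 1, 3, 6 - one value per element

numbers.Aggregate((acc, x) => acc + x)
       .Subscribe(Console.WriteLine); // prints just 6, when the stream completes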

I’m using the built-in Tuple type here to produce the resulting pairs – this has an advantage over anonymous types because you can return them from a method. If you have specific needs, you can consider a custom type. Tuple provides a way of grouping values and has a set of ItemN properties, one for each value: Item1, Item2 and so on.

The seed value is a Tuple pair of defaults for the source type. On each iteration, we create a new Tuple, copying the old Item2 to Item1, and setting Item2 to the current value. We create a new Tuple because Tuples are immutable – if you are using a custom type, you could update it instead – but I actually think immutability is a safer way to go in Rx in general, unless you are especially worried about memory and GC performance.

Finally, here’s a unit test I wrote for this function – I used the NuGet packages Rx-Testing and NUnit:

public class PairWithPreviousTests : ReactiveTest
{
    [Test]
    public void Works()
    {
        var testScheduler = new TestScheduler();

        var source = Observable.Range(1, 3);

        var results = testScheduler.Start(
            () => source.PairWithPrevious());

        results.Messages.AssertEqual(
            OnNext(Subscribed, Tuple.Create(0, 1)),
            OnNext(Subscribed, Tuple.Create(1, 2)),
            OnNext(Subscribed, Tuple.Create(2, 3)),
            OnCompleted<Tuple<int, int>>(Subscribed));
    }
}

This post was inspired in part by an old question on StackOverflow with an answer that suffers from a double subscription to the source. If you like the approach here, I’d be dead pleased if you could up-vote my answer on SO. :) Thanks!

ObserveLatestOn – A coalescing ObserveOn

This is a problem that comes up when the UI dispatcher can’t keep up with inbound activity.

The example I saw on a project was a price stream that had to be displayed on the UI that could get very busy.

If more than one new price arrives on background threads in between dispatcher time slices, there is no point displaying anything but the most recent price – in fact we want the UI to “catch up” by dropping all but the most recent undisplayed price.

So the operation is like an ObserveOn that drops all but the most recent events. Here’s a picture of what’s happening – notice how price 2 is dropped and how prices are only published during the dispatcher time slice:

[Figure: prices published only during dispatcher time slices, with price 2 dropped]

Here is the code (by the way, if you’re not sure what Materialize and Accept are, I wrote about these here):

public static IObservable<T> ObserveLatestOn<T>(
    this IObservable<T> source, IScheduler scheduler)
{
    return Observable.Create<T>(observer =>
    {
        Notification<T> outsideNotification = null;
        var gate = new object();
        bool active = false;
        var cancelable = new MultipleAssignmentDisposable();
        var disposable = source.Materialize().Subscribe(thisNotification =>
        {
            bool wasNotAlreadyActive;
            lock (gate)
            {
                wasNotAlreadyActive = !active;
                active = true;
                // overwrite any pending notification - only the latest survives
                outsideNotification = thisNotification;
            }

            if (wasNotAlreadyActive)
            {
                cancelable.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> localNotification = null;
                    lock (gate)
                    {
                        // take the pending notification, leaving null in its place
                        localNotification = outsideNotification;
                        outsideNotification = null;
                    }
                    localNotification.Accept(observer);
                    bool hasPendingNotification = false;
                    lock (gate)
                    {
                        // if another notification arrived while we were dispatching,
                        // stay active and go round again
                        hasPendingNotification = active = (outsideNotification != null);
                    }
                    if (hasPendingNotification)
                    {
                        self();
                    }
                });
            }
        });
        return new CompositeDisposable(disposable, cancelable);
    });
}

The key idea here is that we keep the notification to be pushed on to the target scheduler in outsideNotification – and whenever an event is received, we overwrite outsideNotification with the new notification, so only the latest one survives. We then make sure a dispatch action is scheduled on the target scheduler – but we may not need to do this…

The active flag tells us whether a dispatch action is already in flight. If active was false, we know that either (a) this is the very first notification or (b) the previous dispatch action has already drained everything and finished. How do we know this? Because the scheduled action sets active back to false once it finds no pending notification left! So if active was false, we must schedule a new dispatch action; otherwise the one already running will pick up our notification itself.

This approach keeps the checks, locks and scheduled actions to a minimum.
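
Usage looks just like ObserveOn. For example, in a hypothetical WPF price ticker (prices and priceLabel are stand-ins for your own stream and control):

prices.ObserveLatestOn(DispatcherScheduler.Current)
      .Subscribe(p => priceLabel.Text = p.ToString());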

Notes and credits:

I’ve gone round the houses a few times on this implementation – my own attempts to improve it to use CAS rather than lock ran into bugs, so the code above is largely due to Lee Campbell, and edited for Rx 2.0 support by Wilka Hudson. For an interesting discussion of this approach, see this thread on the official Rx forum.