ObserveLatestOn – A coalescing ObserveOn

This is a problem that comes up when the UI dispatcher can’t keep up with inbound activity.

The example I saw on a project was a price stream that could get very busy and had to be displayed on the UI.

If more than one new price arrives on background threads in between dispatcher time slices, there is no point displaying anything but the most recent price – in fact we want the UI to “catch up” by dropping all but the most recent undisplayed price.

So the operation is like an ObserveOn that drops all but the most recent events. Here’s a picture of what’s happening – notice how price 2 is dropped and how prices are only published during the dispatcher time slice:

Here is the code (by the way, if you’re not sure what Materialize and Accept are, I wrote about these here):

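using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Extension methods must live in a static class; the class name here is arbitrary.
public static class ObserveLatestOnExtensions
{
    public static IObservable<T> ObserveLatestOn<T>(
        this IObservable<T> source, IScheduler scheduler)
    {
        return Observable.Create<T>(observer =>
        {
            // Latest notification not yet handed to the observer (null if none).
            Notification<T> outsideNotification = null;
            var gate = new object();
            // True while a dispatch action is scheduled or running.
            bool active = false;
            var cancelable = new MultipleAssignmentDisposable();
            var disposable = source.Materialize().Subscribe(thisNotification =>
            {
                bool wasNotAlreadyActive;
                lock (gate)
                {
                    wasNotAlreadyActive = !active;
                    active = true;
                    // Overwrite anything still waiting: only the latest survives.
                    outsideNotification = thisNotification;
                }

                if (wasNotAlreadyActive)
                {
                    cancelable.Disposable = scheduler.Schedule(self =>
                    {
                        Notification<T> localNotification;
                        lock (gate)
                        {
                            localNotification = outsideNotification;
                            outsideNotification = null;
                        }
                        localNotification.Accept(observer);
                        bool hasPendingNotification;
                        lock (gate)
                        {
                            // Stay active only if something arrived during Accept.
                            hasPendingNotification = active = (outsideNotification != null);
                        }
                        if (hasPendingNotification)
                        {
                            self();
                        }
                    });
                }
            });
            return new CompositeDisposable(disposable, cancelable);
        });
    }
}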

The key idea here is that we keep track of the notification to be pushed on to the target scheduler in outsideNotification, and whenever an event is received, we overwrite outsideNotification with the new notification. We must ensure the latest notification gets dispatched on the target scheduler, but we may not need to schedule anything to do this…

If active was false when the notification arrived, we know that either (a) this is the first notification or (b) every earlier notification has already been dispatched and the scheduled action has finished. How do we know this? Because the scheduled action that does the dispatch swaps outsideNotification for null before calling Accept, and only clears active once it finds nothing new waiting. So if active was false, we must schedule a new dispatch action; if it was true, a dispatch action is already scheduled or running and will pick up the new notification itself.

This approach keeps the checks, locks and scheduled actions to a minimum.
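To see that bookkeeping in isolation, here is a minimal sketch with the Rx types stripped out. LatestSlot, Offer, TryTake and ContinueOrFinish are hypothetical names I've made up for this illustration; they aren't part of Rx or of the operator itself, and the sketch only mirrors the pending-value and active-flag logic, not the scheduling or disposal.

```csharp
using System;

// Hypothetical helper (not part of Rx): a one-element mailbox that keeps only
// the newest value and tells the producer when a drain needs to be scheduled.
public sealed class LatestSlot<T>
{
    private readonly object gate = new object();
    private T pending;
    private bool hasPending;
    private bool active; // true while a drain is scheduled or running

    // Producer side: overwrite any undelivered value.
    // Returns true when the caller must schedule a drain.
    public bool Offer(T value)
    {
        lock (gate)
        {
            pending = value;
            hasPending = true;
            if (active) return false; // a drain is already on its way
            active = true;
            return true;
        }
    }

    // Consumer side: take the newest value, if there is one.
    public bool TryTake(out T value)
    {
        lock (gate)
        {
            bool had = hasPending;
            value = pending;
            pending = default(T);
            hasPending = false;
            return had;
        }
    }

    // Consumer side, after delivering: returns true if something arrived in the
    // meantime (drain again), otherwise marks the slot idle.
    public bool ContinueOrFinish()
    {
        lock (gate)
        {
            active = hasPending;
            return active;
        }
    }
}

public static class Demo
{
    public static void Main()
    {
        var slot = new LatestSlot<int>();
        Console.WriteLine(slot.Offer(1)); // True: nothing scheduled yet
        Console.WriteLine(slot.Offer(2)); // False: drain pending, 1 is overwritten

        int price;
        slot.TryTake(out price);
        Console.WriteLine(price);                   // 2
        Console.WriteLine(slot.ContinueOrFinish()); // False: nothing new arrived
        Console.WriteLine(slot.Offer(3));           // True: idle again
    }
}
```

In the operator, Offer corresponds to the Subscribe callback, and TryTake followed by ContinueOrFinish corresponds to the two lock blocks either side of Accept in the scheduled action.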

Notes and credits:

I’ve gone round the houses a few times on this implementation. My own attempts to improve it by using CAS rather than lock ran into bugs, so the code above is largely due to Lee Campbell, edited for Rx 2.0 support by Wilka Hudson. For an interesting discussion on this approach see this thread on the official Rx forum.
