Why Materialize? And helpers to log Rx.

I wrote some helper extension methods to log what’s going on in my Rx expressions. These leverage the Materialize method which has this signature:

public static IObservable<Notification<TSource>>
  Materialize<TSource>(this IObservable<TSource> source)

Materialize projects a stream of values of T into a stream of Notification<T> values. Notifications have a Kind that tells us whether the original value was an OnNext, OnError or OnCompleted. They give you access to the original Value or Exception as appropriate. You can also pass a Notification on to an observer via its Accept method.

To put it another way, (the way the docs do), it turns implicit notifications into explicit notifications. Implicit because the method called on the observer (OnNext, OnError, OnCompleted) tells us implicitly what kind of notification it is, whereas the Notifications, which are always delivered into OnNext, are explicitly declaring the type via their Kind property.

Incidentally, there is a corresponding Dematerialize method that performs the reverse operation. Finally, Materialized streams send OnCompleted after they send a Notification of Kind OnError or OnCompleted. They don’t OnError themselves.

So why is this useful?

Well if you ever want to process all events on a stream in a uniform manner, without Materialize you’d be stuck writing handlers for each of the three methods. With Materialize, you can do it all in one place.

Here are some practical examples I use when I want to inspect what’s going on with my Rx. The first example is a side effect, like Do(), that can be injected into a stream to write out what’s happening to the Console. It’s an extension method:

public static IObservable<T> LogToConsole<T>(
    this IObservable<T> source, string key = null)
{
    return Observable.Create<T>(observer =>
        source.Materialize().Subscribe(notification =>
        {
            Console.WriteLine(NotifyToString(notification, key));
            notification.Accept(observer);
        }));
}

This uses Materialize to get a stream of Notifications which can then be written out to the Console using the helper method NotifyToString, shown later. I use the Accept method to pass the notification on to the observer. The key property optionally allows me to label the output. Here’s an example:

Observable.Range(0, 4).LogToConsole("Range: ").Subscribe();

Which gives the output:

Range:   OnNext(0)
Range:   OnNext(1)
Range:   OnNext(2)
Range:   OnNext(3)
Range:   OnCompleted()

Note that LogToConsole subscribes “on demand” (as all good Rx operators should) and needs a subscriber itself for anything to happen. It’s a side-effect, so be warned it log for EVERY subscriber.

It’s quite common for me to be experimenting with the output of operators in tools like LINQPad, so I also wrote another extension method to take care of the subscription as well:

public static IDisposable SubscribeConsoleNotifier<T>(
    this IObservable<T> source, string key = null)
{
    return source.Materialize().Subscribe(
        n => Console.WriteLine(NotifyToString(n, key)));
}

And an example of usage putting both together:

Observable.Range(0, 4)
          .LogToConsole("Range: ")
          .Where(i => i % 2 == 0)
          .SubscribeConsoleNotifier("Where: ");

Gives this, notice how the LogToConsole shows the output from Range before the Where filter is applied:

Range:   OnNext(0)
Where:   OnNext(0)
Range:   OnNext(1)
Range:   OnNext(2)
Where:   OnNext(2)
Range:   OnNext(3)
Range:   OnCompleted()
Where:   OnCompleted()

Obviously, you can code these to write to a logger, or something more generally useful.

To wrap this post up, here’s the implementation of the helper NotifyToConsole:

public static string NotifyToString<T>(
    Notification<T> notification, string key = null)
{
    if (key != null)
        return key + "\t" + notification;
    
    return notification.ToString();
}

It’s all trivial stuff, but I’ve found it quite useful.

Rx Maintenance Part 1: Are the reactive extensions a maintenance nightmare?

For the record, I’m a huge fan of Rx, but I’ve been thinking a lot about lately about how to keep Rx code clean and easy to understand. The project I’m currently involved in makes extensive use of Rx in a large multi-tiered system. I’m talking 1000′s of lines of Rx.

I joined the project roughly two years into its lifespan. I am pretty up to speed on Rx and I am quite used to maintaining code. Nonetheless, I have found understanding the code to be more challenging that I would have expected. Debugging complex chains of Rx operators is not for the faint of heart!

This project was an early adopter of Rx, and the developers were learning as they went. Not only that, Rx itself has changed and grown a fair bit along the way. This might go some way to explaining the lack of readability. But I don’t think it is the whole story.

In a previous project, I created a rate limiter to throttle the amount of calls web users were making against an API. This was a refactoring to Rx of some older code. It didn’t have the excuse that Rx changed a lot while I was writing it. What I ended up with was about 10-20% the size of the original code and certainly more elegant.

But trying to explain how it worked was challenging, and at the time I wondered whether the succinctness would actually be a barrier to successful maintenance. The truth of that came from a brown-bag I ran on the code for which I had 10 slides of marble diagrams to help explain how it worked! We all know the chances of that kind of documentation being maintained are pretty much zero.

This combined with other Rx experiences has left me wondering what guidelines we should be laying down to keep Rx maintainable and how, if at all, these guidelines would differ from typical best practices for creating maintainable code.

I’ll be exploring these thoughts along with some practical code examples on and off over the next few weeks. Stay tuned!