Reactive WPF – Part 1 – Introduction to Reactive Extensions

Introduction

At some point in the day, you get bored and the procrastination demon takes hold of you. You go to Facebook (or Twitter, take your pick :-)) and start looking at the updates. After a while, you have checked all the new posts and start pressing F5 to refresh the page, hoping that something interesting shows up. Eventually, you start wondering whether there is some automatic way to refresh the posts when a new message arrives, so you do not have to press F5 and be frustrated that there is nothing new to read.

I know that this never happens to you, but you can see my point: wouldn’t it be better to have something that shows you the new posts without the need of refreshing data? As a matter of fact, there is something available and it’s called Reactive Programming.

In everyday code, we tend to do the same as our fictitious character and poll for new data, as in the following code:

while (true)
{
    // Get new data
    // Process data
}

This is very inefficient and drains the resources of our devices: CPU usage goes up and the battery is exhausted in no time. With Reactive Programming, instead of querying for new data, our program is notified when new data arrives: a much more efficient way to get it!
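Before bringing in any library, the push idea can be sketched with a plain .NET event. This is only an illustration: the Feed class, its NewPost event and the Publish method are names I made up for the example, not part of any framework.

```csharp
using System;

// Hypothetical data source that pushes each new post to whoever is listening.
class Feed
{
    public event Action<string> NewPost;

    // Called by the producer when something new arrives.
    public void Publish(string post) => NewPost?.Invoke(post);
}

static class PushDemo
{
    public static void Main()
    {
        var feed = new Feed();

        // The consumer registers once; there is no loop asking "anything new?".
        feed.NewPost += post => Console.WriteLine($"New post: {post}");

        feed.Publish("Hello");          // prints "New post: Hello"
        feed.Publish("Reactive world"); // prints "New post: Reactive world"
    }
}
```

The consumer code runs only when something is published. Observables keep this idea and add completion, error signalling and composition on top of it.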

So, let us dive into this new programming paradigm. New? Not quite. Reactive Programming is based on the Observer pattern.

In .NET, this is implemented by two interfaces, IObserver<T> and IObservable<T>, that were introduced in .NET 4.0, back in 2010. Soon after that, Microsoft released a framework to handle Reactive Programming, named Rx.NET, or Reactive Extensions for .NET.

Introduction to IObservable/IObserver

The Observer pattern has two working parts: an Observable, which sends data, and one or more Observers, which subscribe to the data and act on it. Every time I think of it, it reminds me of a conveyor belt, like the ones you can see in Chaplin’s Modern Times or in I Love Lucy’s chocolate factory.


Charles Chaplin – Modern Times

I Love Lucy – Chocolate Factory

The observable releases parts (or chocolates) and the observers act on them, by fixing the parts or wrapping the chocolates in paper. To represent these sequences, we can use marble diagrams like these:

The first sequence shows a series of four points and is not finished yet (the observers can continue receiving data). The second one is a series of three points that has finished: the observers won’t receive any more data. The third sequence shows three points and one error. As with the finished sequence, the observers won’t receive more data after the error.

The two interfaces are very simple:

public interface IObservable<out T>
{
   IDisposable Subscribe(IObserver<T> observer);
}

public interface IObserver<in T>
{
   void OnCompleted(); // Source has finished sending messages.
   void OnError(Exception error); // An error occurred.
   void OnNext(T value); // There is new data to process.
}
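
To see what each member does, here is a minimal hand-written pair that uses nothing but the base class library. Take it as a sketch: NumberSource and RecordingObserver are hypothetical names, and the observable simply pushes a fixed set of values synchronously and then completes.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observable: pushes a fixed set of values to each subscriber, then completes.
class NumberSource : IObservable<int>
{
    private readonly int[] _values;

    public NumberSource(params int[] values) => _values = values;

    public IDisposable Subscribe(IObserver<int> observer)
    {
        foreach (var value in _values)
            observer.OnNext(value);   // there is new data to process
        observer.OnCompleted();       // the sequence has finished
        return new NoopDisposable();  // nothing left to cancel at this point
    }

    private sealed class NoopDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical observer: records every notification it receives.
class RecordingObserver : IObserver<int>
{
    public List<string> Log { get; } = new List<string>();

    public void OnNext(int value) => Log.Add($"OnNext: {value}");
    public void OnError(Exception error) => Log.Add($"OnError: {error.Message}");
    public void OnCompleted() => Log.Add("OnCompleted");
}

static class ManualDemo
{
    public static void Main()
    {
        var observer = new RecordingObserver();
        using (new NumberSource(1, 2, 3).Subscribe(observer))
        {
            // Prints OnNext: 1, OnNext: 2, OnNext: 3 and OnCompleted, one per line.
            foreach (var line in observer.Log)
                Console.WriteLine(line);
        }
    }
}
```

A real implementation would also have to deal with unsubscribing in the middle of a sequence, errors and thread safety, which is a good reason to leave this work to a library.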

Most of the time, you will not implement these interfaces manually; you will use the Observable and Observer classes instead. The Observable class has factory methods that create observables that generate data. Subscribing to an observable returns an IDisposable, which you should dispose when you don’t want to receive more data. The Observer class has factory methods that create an observer from three delegates: one to receive the next item, one to handle errors and one to run when the sequence completes. This code shows a simple program that processes Observables (you must include the NuGet package System.Reactive to compile this project):

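// Requires: using System; using System.Reactive; using System.Reactive.Linq;
static void Main(string[] args)
{
    // Emits the integers 1 to 10 and then completes.
    IObservable<int> source = Observable.Range(1, 10);
    // Builds an observer from three delegates: next value, error, completion.
    IObserver<int> obsvr = Observer.Create<int>(
        x => Console.WriteLine("OnNext: {0}", x),
        ex => Console.WriteLine("OnError: {0}", ex.Message),
        () => Console.WriteLine("OnCompleted"));
    using (source.Subscribe(obsvr))
    {
        Console.WriteLine("Press ENTER to unsubscribe...");
        Console.ReadLine();
    }
}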

It creates an Observable that emits the integers between 1 and 10 and an Observer that logs the received values in the console. The result is something like this:

In this code, I used a “using” block for the result of the Subscribe method, but most of the time this is not appropriate. Normally you store the result of Subscribe in a field and dispose it later, when you don’t need the subscription anymore. This is even more important with asynchronous observables: if the using block ended right after subscribing, the subscription could be disposed before you had observed anything.
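
As a rough sketch of that pattern, the subscription can live in a field of the class that needs it. The TickLogger class and its Start method are hypothetical names for this example; Observable.Interval comes from System.Reactive.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical class that keeps a subscription alive for as long as it needs the data.
class TickLogger : IDisposable
{
    private IDisposable _subscription;

    public void Start()
    {
        // Drop any previous subscription before creating a new one.
        _subscription?.Dispose();
        _subscription = Observable
            .Interval(TimeSpan.FromMilliseconds(200))
            .Subscribe(i => Console.WriteLine($"Tick {i}"));
    }

    public void Dispose()
    {
        // Unsubscribe: no more ticks are delivered after this call.
        _subscription?.Dispose();
        _subscription = null;
    }
}

static class FieldDemo
{
    public static void Main()
    {
        var logger = new TickLogger();
        logger.Start();
        Console.WriteLine("Press ENTER to stop...");
        Console.ReadLine();
        logger.Dispose();
    }
}
```

The subscription now lives exactly as long as its owner decides, instead of ending with the scope where Subscribe was called.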

Filtering Observable sequences with LINQ

Then you may say: “This is not much different from what I used to do, why should I use it?” The first answer could be LINQ. You can use LINQ to filter the sequence, as in this program:

// Requires: using System; using System.Reactive; using System.Reactive.Linq;
static void Main(string[] args)
{
    IObservable<int> source = Observable.Range(1, 10)
        .Where(x => x % 2 == 0)   // keep only the even numbers
        .Select(x => x * x);      // project each one to its square
    IObserver<int> obsvr = Observer.Create<int>(
        x => Console.WriteLine("OnNext: {0}", x),
        ex => Console.WriteLine("OnError: {0}", ex.Message),
        () => Console.WriteLine("OnCompleted"));
    using (source.Subscribe(obsvr))
    {
        Console.WriteLine("Press ENTER to unsubscribe...");
        Console.ReadLine();
    }
}

This program is similar to the first one, but the observable is filtered with Where and projected with Select, so it emits the squares of the even numbers. The result here is:

Now, I’m sure you are convinced (really?), but there is more: sequences can be generated in many other ways, like timed sequences:

static void Main(string[] args)
{
    var source = Observable.Timer(TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(1))
        .Select((t,i) => new
        {
            Date = DateTime.Now,
            Item = i
        });
    using (source.Subscribe(
        x => Console.WriteLine($"OnNext: {x.Date} - {x.Item}"),
        ex => Console.WriteLine($"OnError: {ex.Message}"),
        () => Console.WriteLine("OnCompleted"))) 
    {
        Console.WriteLine("Press ENTER to unsubscribe...");
        Console.ReadLine();
    }
}

This sequence starts emitting after two seconds and then emits a new item every second. I haven’t explicitly created an Observer; instead, I used an overload of the Subscribe method that accepts three lambdas. When you run this program, you will see something like this:

You can see that the code is not sequential: the observer is created and the message “Press ENTER to unsubscribe” is shown. Then, after two seconds, the program starts the output to the console. Pretty nice, no? Now the reactive extensions start to make sense. This program will continue its output until you press Enter; there is no Completed event and it will continue running until you end the program.
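
If you want a timed sequence that does finish, one option is to limit it with the Take operator. The sketch below is a variation of the program above with shorter intervals; TakeDemo and CreateSource are hypothetical names, and the ManualResetEventSlim is there only to keep the console app alive until the sequence ends.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

static class TakeDemo
{
    // First value after 200 ms, then one every 100 ms, stopping after three values.
    public static IObservable<long> CreateSource() =>
        Observable
            .Timer(TimeSpan.FromMilliseconds(200), TimeSpan.FromMilliseconds(100))
            .Take(3);

    public static void Main()
    {
        using (var done = new ManualResetEventSlim())
        using (CreateSource().Subscribe(
            x => Console.WriteLine($"OnNext: {x}"),
            ex => { Console.WriteLine($"OnError: {ex.Message}"); done.Set(); },
            () => { Console.WriteLine("OnCompleted"); done.Set(); }))
        {
            // Block until the sequence signals completion (or an error).
            done.Wait();
        }
    }
}
```

This prints OnNext: 0, OnNext: 1 and OnNext: 2, followed by OnCompleted, and then the program exits on its own.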

But that’s not all we can do here. We can create observables from event handlers and combine them to drag an image around a form in WPF.

Combining Observable events

Create a new WPF project and add the System.Reactive package from NuGet. Add an image to the project and, in MainWindow.xaml, change the root element to a Canvas and add an Image to it, named Image so that the code-behind can refer to it:


Then, in MainWindow.xaml.cs, add the drag code for the image:

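// Requires: using System.Reactive.Linq;
// MouseButtonEventArgs and MouseEventArgs live in System.Windows.Input.
public MainWindow()
{
    InitializeComponent();
    SetupDrag();
}

private void SetupDrag()
{
    // Position of the click inside the image, emitted when the user presses the button on it.
    var mousedown = Observable.FromEventPattern<MouseButtonEventArgs>(Image, "MouseLeftButtonDown")
            .Select(evt => evt.EventArgs.GetPosition(Image));
    var mouseup = Observable.FromEventPattern<MouseButtonEventArgs>(this, "MouseLeftButtonUp");
    // Position of the mouse inside the window, emitted on every move.
    var mousemove = Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove")
            .Select(evt => evt.EventArgs.GetPosition(this));
    // For each mouse down, take the moves until the button is released.
    var drag = from start in mousedown
        from move in mousemove.TakeUntil(mouseup)
        select
        new
        {
            X = move.X - start.X,
            Y = move.Y - start.Y
        };
    drag.Subscribe(value =>
    {
        Canvas.SetLeft(Image, value.X);
        Canvas.SetTop(Image, value.Y);
    });
}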

In this code, we are combining Observables that come from three events, MouseLeftButtonDown, MouseLeftButtonUp and MouseMove, into the “drag” observable, to set the canvas position of the image. Notice that the events don’t come from the same component: MouseLeftButtonDown comes from the image (we don’t want to start a drag if the user doesn’t click the image) and the other ones come from the window. If we drew a marble diagram, it would look like this:

To illustrate this process, I will use OzCode’s trace feature to trace the data that comes from the events. Let’s create three new subscriptions for the events:

private void SetupDrag()
{
    var mousedown = Observable.FromEventPattern<MouseButtonEventArgs>(Image, "MouseLeftButtonDown")
            .Select(evt => evt.EventArgs.GetPosition(Image));
    var mouseup = Observable.FromEventPattern<MouseButtonEventArgs>(this, "MouseLeftButtonUp");
    var mousemove = Observable.FromEventPattern<MouseEventArgs>(this, "MouseMove")
            .Select(evt => evt.EventArgs.GetPosition(this));
    var drag = from start in mousedown
        from move in mousemove.TakeUntil(mouseup)
        select
        new
        {
            X = move.X - start.X,
            Y = move.Y - start.Y
        };
    mousedown.Subscribe(p =>
    {
        // Intentionally empty: a place for the Mouse Down tracepoint.
    });
    mouseup.Subscribe(e =>
    {
        // Intentionally empty: a place for the Mouse Up tracepoint.
    });
    mousemove.Subscribe(p =>
    {
        // Intentionally empty: a place for the Mouse Move tracepoint.
    });
    drag.Subscribe(value =>
    {
        Canvas.SetLeft(Image, value.X);
        Canvas.SetTop(Image, value.Y);
    });
}

The subscriptions don’t do anything to the result; they are only places to break and analyze what’s happening. Set a breakpoint at the end of the MouseMove subscription:

Then run the program. As soon as you start moving the mouse over the window, the breakpoint will fire. Then, with the magic wand, create a Tracepoint there:

You don’t have to trace anything special here, just the name of the event, Mouse Move. You can also trace values by putting the name of a variable in braces (as in {variable}), but in this case we only want to see that the event was observed, and a plain string will do:

Do the same for the end of every subscription (including the drag subscription) and run the program. After you finish the program, go to the tracepoints window, by clicking on the number of trace hits at the bottom of the VS window:

When you look at it, you will see something like this:

As you can see, there is a bunch of Mouse Move entries. Then there is a Mouse Down, which makes the drag Observable start emitting for each Mouse Move. If you go further down in the trace, you will see that, when a Mouse Up occurs, the drag stops until a new Mouse Down occurs. One nice thing in this window is the Thread Id field: when you are debugging an async observable, the results can come from several threads. With this field you can see where the data comes from and filter the data for a single thread.

Pretty cool, no? With a few lines of code, we set up a drag for the image in a WPF app. We combined the events using LINQ, observed the result, and moved the image.
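
If you want to experiment with the drag composition outside WPF, here is a console sketch under a few assumptions of mine: subjects stand in for the three mouse events, a tuple of doubles stands in for the WPF Point, and DragDemo and Drag are hypothetical names. It also writes the query with SelectMany, which is the operator that the two “from” clauses translate to.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class DragDemo
{
    // Same composition as the drag query, written with SelectMany instead of query syntax.
    public static IObservable<(double X, double Y)> Drag(
        IObservable<(double X, double Y)> mouseDown,
        IObservable<(double X, double Y)> mouseMove,
        IObservable<object> mouseUp) =>
        mouseDown.SelectMany(
            start => mouseMove.TakeUntil(mouseUp),
            (start, move) => (X: move.X - start.X, Y: move.Y - start.Y));

    public static void Main()
    {
        // Subjects let us push "events" by hand (an assumption made for this sketch).
        var down = new Subject<(double X, double Y)>();
        var move = new Subject<(double X, double Y)>();
        var up = new Subject<object>();

        using (Drag(down, move, up).Subscribe(p => Console.WriteLine($"Drag: {p.X}, {p.Y}")))
        {
            move.OnNext((5, 5));     // ignored: no button pressed yet
            down.OnNext((2, 3));     // drag starts; the click was at (2, 3) inside the image
            move.OnNext((10, 10));   // prints "Drag: 8, 7"
            move.OnNext((12, 15));   // prints "Drag: 10, 12"
            up.OnNext(new object()); // drag ends
            move.OnNext((20, 20));   // ignored again
        }
    }
}
```

Moves are reported only between a mouse down and the next mouse up, which matches what the trace shows.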

With this introduction to Reactive Extensions, and knowing what can be done with observables, we can move on to the second part of this series, where we will see how to use Observables to set up a program that follows the MVVM pattern with ReactiveUI.
