
Tuesday, August 24, 2010

IObserver and IObservable - A New addition to BCL

With new interfaces in the framework, it is time to discuss them. The current release of VS2010 (.NET Framework 4.0) introduces two interfaces, IObservable and IObserver. In this post I discuss these interfaces and their connection to the push-based approach of the Reactive Framework.

IObserver and IObservable as a Dual to Enumerables

First, it should be noted that IObserver and IObservable are the mathematical dual of IEnumerator and IEnumerable. Following the iterator pattern, IEnumerable represents a repository of elements, and it hands out an IEnumerator to walk through those elements one at a time. The members of IEnumerator that matter here are MoveNext and Current. On each iteration the consumer calls MoveNext to advance to the next element and then reads that element from Current.

So the interfaces IEnumerator and IEnumerable look like this:

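// Simplified view: in the BCL, MoveNext and Reset are inherited
// from the non-generic IEnumerator interface.
public interface IEnumerator<out T> : IDisposable
{
    T Current { get; }
    bool MoveNext();
    void Reset();
}

public interface IEnumerable<out T> : IEnumerable
{
    IEnumerator<T> GetEnumerator();
}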

So IEnumerator has MoveNext, which is called whenever we need the next element from the store. MoveNext advances the enumerator, and Current then hands that element back to the caller. IEnumerable can therefore be considered a pull-based approach, used for sequential retrieval of objects.
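To make the pull model concrete, here is a minimal sketch that drives an enumerator by hand, which is roughly what a foreach loop does for you. The class and method names (PullDemo, SumByPulling) are made up for illustration and are not part of the original sample.

```csharp
using System;
using System.Collections.Generic;

public static class PullDemo
{
    // Hypothetical helper: sums a sequence by calling MoveNext and
    // Current directly instead of using foreach.
    public static int SumByPulling(IEnumerable<int> source)
    {
        int total = 0;
        using (IEnumerator<int> enumerator = source.GetEnumerator())
        {
            // The consumer decides when to ask for the next element.
            while (enumerator.MoveNext())
            {
                total += enumerator.Current;
            }
        }
        return total;
    }
}
```

Calling PullDemo.SumByPulling(new[] { 1, 2, 3 }) walks the array one element at a time. The caller is always the one asking for the next value.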

IObservable and IObserver, recently introduced to the BCL, are as stated the mathematical dual of IEnumerable and IEnumerator. Let's look at the interfaces:

public interface IObserver<in T>
{
    void OnCompleted();
    void OnError(Exception error);
    void OnNext(T value);
}

and for IObservable it is:

public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}


Hence, if you compare the two sets of interfaces, IEnumerator has Current and MoveNext, which the consumer uses to pull objects out of the repository. IObserver has OnNext, which the source uses to push objects to the observer. Likewise, IEnumerable uses GetEnumerator to hand an IEnumerator back to the caller, while IObservable has a Subscribe method through which the caller hands an observer to the observable. Hence you can say that the observable interfaces in the BCL are a dual to the enumerable ones: the former use a push-based approach and the latter a pull-based approach.
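The push side can be sketched without any Rx library by implementing the two interfaces directly. The following is only an illustration under simple assumptions (single-threaded, values pushed synchronously inside Subscribe). RangeObservable and RecordingObserver are hypothetical names, not types from the BCL or from the sample.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observable that pushes a fixed range of integers.
public sealed class RangeObservable : IObservable<int>
{
    private readonly int start;
    private readonly int count;

    public RangeObservable(int start, int count)
    {
        this.start = start;
        this.count = count;
    }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        // The producer drives the loop and pushes each value.
        for (int i = 0; i < count; i++)
        {
            observer.OnNext(start + i);
        }
        observer.OnCompleted();
        return NoOpDisposable.Instance; // nothing is left to cancel
    }

    private sealed class NoOpDisposable : IDisposable
    {
        public static readonly NoOpDisposable Instance = new NoOpDisposable();
        public void Dispose() { }
    }
}

// Hypothetical observer that records everything it is handed.
public sealed class RecordingObserver : IObserver<int>
{
    public readonly List<int> Received = new List<int>();
    public bool Completed { get; private set; }
    public Exception Error { get; private set; }

    public void OnNext(int value) { Received.Add(value); }
    public void OnError(Exception error) { Error = error; }
    public void OnCompleted() { Completed = true; }
}
```

Subscribing a RecordingObserver to new RangeObservable(1, 3) leaves 1, 2 and 3 in Received and sets Completed to true. The observer never asks for a value; it only reacts to what it is given.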



Where are they useful?

If you recollect the Observer design pattern, you know this already. In the Observer pattern, a subject holds a list of dependent objects known as observers and notifies them when its state changes. We already have ObservableCollection, which notifies the external environment of changes to the collection. IObserver and IObservable let you take this flexibility further. Let's see a few lines of code:

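// BCL namespaces used below. Observable and IEvent come from the
// Rx assemblies referenced by the sample project.
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Linq;

public class CustomObserver
{

    // Source collection; raises CollectionChanged on every modification.
    private ObservableCollection<int> myrepository;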

    public ObservableCollection<int> MyRepository
    {
        get
        {
            this.myrepository = this.myrepository ?? new ObservableCollection<int>();
            return this.myrepository;
        }
    }

    public void LoadRepository(int item)
    {
        this.MyRepository.Add(item);
    }

    private List<int> filteredcollection;
    public List<int> FilteredCollection
    {
        get
        {
            this.filteredcollection = this.filteredcollection ?? new List<int>();
            return filteredcollection;
        }
    }
    public IDisposable GetObserved()
    {

        // Wrap the CollectionChanged event as an observable sequence (Rx).
        // In later Rx releases the string-based overload is named FromEventPattern.
        IObservable<IEvent<NotifyCollectionChangedEventArgs>> numberObserver = Observable.FromEvent<NotifyCollectionChangedEventArgs>(this.MyRepository, "CollectionChanged");
        Action<IEvent<NotifyCollectionChangedEventArgs>> subscriptionAction = item =>
        {
            switch (item.EventArgs.Action)
            {
                case NotifyCollectionChangedAction.Add:
                    // Only even values make it into the filtered list.
                    var newValues = item.EventArgs.NewItems.Cast<int>().Where(n => n % 2 == 0);
                    this.FilteredCollection.AddRange(newValues);
                    break;
                case NotifyCollectionChangedAction.Remove:
                    foreach (int n in item.EventArgs.OldItems)
                        this.FilteredCollection.Remove(n);
                    break;
                case NotifyCollectionChangedAction.Replace:
                    foreach (int n in item.EventArgs.OldItems)
                        this.FilteredCollection.Remove(n);
                    goto case NotifyCollectionChangedAction.Add;
            }
        };

        return numberObserver.Subscribe(subscriptionAction);

    }


}

So in this class I have implemented a simple collection of objects and subscribed to the CollectionChanged event of ObservableCollection. Hence the action subscriptionAction is called automatically and updates the list FilteredCollection whenever the observed collection MyRepository changes.

To demonstrate, let's look at the Main method:

static void Main(string[] args)
{

    CustomObserver myobj = new CustomObserver();
    IDisposable unregisterobj = myobj.GetObserved();

    bool DoContinue = false;
    do
    {
        try
        {
            Console.Write("Enter a Value:");
            int item = Convert.ToInt32(Console.ReadLine());

            myobj.LoadRepository(item);

            Console.WriteLine("Filtered List counter : {0}", myobj.FilteredCollection.Count);

            Console.WriteLine("Do you want to continue?[1/0]");
            DoContinue = Convert.ToInt32(Console.ReadLine()) == 1;
        }
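        catch (FormatException)
        {
            // Invalid input: ask again instead of silently leaving the loop.
            Console.WriteLine("Please enter a whole number.");
            DoContinue = true;
        }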
    } 
    while (DoContinue);

    Console.WriteLine("Disposing Registration...");
    unregisterobj.Dispose();

    Console.ReadKey();
}

This is straightforward: we create an object of our class CustomObserver and call the method GetObserved, which in turn calls the Subscribe method of the observable and returns the IDisposable object. Remember that Subscribe returns an IDisposable, which is used to unsubscribe simply by calling its Dispose method.
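How an IDisposable can act as an unsubscribe handle is easier to see in a hand-written observable. The sketch below is one possible shape and does not show how Rx implements it internally. NumberSource, Unsubscriber and CountingObserver are hypothetical names, and the code assumes single-threaded use.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical source: values are pushed by calling Publish.
public sealed class NumberSource : IObservable<int>
{
    private readonly List<IObserver<int>> observers = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(observers, observer);
    }

    public void Publish(int value)
    {
        // Copy the list so an observer may unsubscribe during OnNext.
        foreach (IObserver<int> observer in observers.ToArray())
        {
            observer.OnNext(value);
        }
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<int>> observers;
        private readonly IObserver<int> observer;

        public Unsubscriber(List<IObserver<int>> observers, IObserver<int> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        // Removing the observer is all that unsubscribing means here.
        public void Dispose()
        {
            observers.Remove(observer);
        }
    }
}

// Hypothetical observer that counts the values it receives.
public sealed class CountingObserver : IObserver<int>
{
    public int Count { get; private set; }

    public void OnNext(int value) { Count++; }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

If you subscribe a CountingObserver, publish two values, dispose the returned object and publish a third, the observer's Count stays at two, because the third value is pushed after it was removed from the list.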


As you might expect, when you run the application it lets you enter numeric values, each of which is observed. When you enter an even value, it is added to FilteredCollection.

FromEvent is a method from the Reactive Framework that generates an observable from an event. In this call, the observable watches the collection for the CollectionChanged event. We will discuss more about the Reactive Framework later.
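To get a feel for what an event-to-observable adapter does, here is a hand-rolled sketch for the CollectionChanged event only. It is not how Rx implements FromEvent; it only shows the idea that each event firing becomes an OnNext call. CollectionChangedObservable, HandlerRemover and AddCounter are hypothetical names.

```csharp
using System;
using System.Collections.ObjectModel;
using System.Collections.Specialized;

// Hypothetical adapter: exposes CollectionChanged as an IObservable.
public sealed class CollectionChangedObservable<T>
    : IObservable<NotifyCollectionChangedEventArgs>
{
    private readonly ObservableCollection<T> collection;

    public CollectionChangedObservable(ObservableCollection<T> collection)
    {
        this.collection = collection;
    }

    public IDisposable Subscribe(IObserver<NotifyCollectionChangedEventArgs> observer)
    {
        // Each time the event fires, forward its arguments to OnNext.
        NotifyCollectionChangedEventHandler handler =
            (sender, args) => observer.OnNext(args);
        collection.CollectionChanged += handler;
        return new HandlerRemover(collection, handler);
    }

    private sealed class HandlerRemover : IDisposable
    {
        private readonly ObservableCollection<T> collection;
        private readonly NotifyCollectionChangedEventHandler handler;

        public HandlerRemover(ObservableCollection<T> collection,
                              NotifyCollectionChangedEventHandler handler)
        {
            this.collection = collection;
            this.handler = handler;
        }

        // Detaching the handler stops further notifications.
        public void Dispose()
        {
            collection.CollectionChanged -= handler;
        }
    }
}

// Hypothetical observer that counts Add notifications.
public sealed class AddCounter : IObserver<NotifyCollectionChangedEventArgs>
{
    public int Adds { get; private set; }

    public void OnNext(NotifyCollectionChangedEventArgs value)
    {
        if (value.Action == NotifyCollectionChangedAction.Add) Adds++;
    }

    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

With this adapter, every Add on the wrapped ObservableCollection reaches the observer as an OnNext call until the subscription is disposed, at which point the event handler is detached.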

Download the Sample - 29KB

I hope you like my demonstration. Feel free to write your comment and feedback. Thank you for reading.

2 comments:

  1. Hey Abhishek, nice article, but I do have a couple of (hopefully constructive!) criticisms:

    You use Rx without really pointing out that you are (until the very end of the article). It's entirely possible to use IObservable, IObserver without using Rx -- although it does require a little more legwork. However, I think it would make sense to point out that the FromEvent method essentially calls the IObserver's OnNext method each time CollectionChanged is fired.

    You might add a version showing how to do all of this without using any Rx stuff -- basically create an implementation of IObservable and IObserver, and in turn you could do any requisite output stuff directly in your Observer's OnNext, OnCompleted, and OnError methods.

    Anyway, keep up the good work!

  2. Hey Jordan, thanks for your comment. Actually I don't take it as criticism. It is true, I didn't show the actual implementation of IObserver and IObservable. I wanted to give an idea of how it works, and I found Rx to be the best and shortest way to do that.

    I am now implementing those interfaces and will post a new article on how to implement the two interfaces and how to actually use them.
