Pages

Sunday, August 29, 2010

Implementation of an Observer

Hi Folks,

As few people told me to implement an observer in my last post where I just showed how to use it, here is the post where I am going to clear out confusions for you. If you have read my other post, you might already know what is an Observer and why it is required. Lets recap this a bit more.

An observer is a container which observes each element individually and notifies you when the object state is modified. The observer should contain methods that enables you to subscribe or unsubscribe individually so that when you subscribe for a  notification, it will keep on creating notification until you explicitly unsubscribe the Observer.

In .NET base class library, there are two interfaces introduced viz, IObservable and IObserver. These interfaces gives you a standard to develop Observable pattern and also recommends you to use it rather than doing it of your own. Microsoft also builds forth its Reactive Framework (I will discuss about it later) based on Observer pattern and lets us use it when Observer is required. 

In this post, I will discuss how you could use IObserver and IObservable to implement you own notifiers.


Download Sample - 33KB



IObserver is actually the individual Observers that when you add to the system will eventually generate notification to the environment.

Clarification of Observable and Observer

For example, say you want to add a tracer to an environment, so that when each object changes its state, you want to get notified and hence write Trace elements based on those state changes. In this case Observer might come handy. The Observer is the individual objects that generate notifications.  On the other hand, the entire environment is observed by the Observable.So Observable acts as the container for all the Observers. Say you want two observer, one for Tracing and another for Logging. In such a case, you need to implement two classes, one to trace the state changes and another to log the state changes. Here once you subscribe each Observer, the Observable will generate notification to the environment which lets you to write your custom code.

IObserver Implementation
Lets start with implementing IObserver.

public class SampleObserver<T> : IObserver<T> where T:class, new()
{
    private IDisposable unsubscriber;

    public string Name { get; set; }

    public SampleObserver(string name)
    {
        this.Name = name;
    }

    #region IObserver<T> Members

    public void OnCompleted()
    {
        Console.WriteLine("{0} : OnComplete is called.", this.Name);
        this.Unsubscribe();
    }

    public void OnError(Exception error)
    {
        Console.WriteLine("{0} : OnError is called", this.Name);
        this.LogException(error);
    }

    public void OnNext(T value)
    {
        Console.WriteLine("{0} : OnNext is called.", this.Name);
        this.LogProperties(value);
    }

    #endregion

    public virtual void Subscribe(IObservable<T> observable)
    {
        if(observable != null)
            this.unsubscriber = observable.Subscribe(this);
    }

    public virtual void Unsubscribe()
    {
        Console.WriteLine("{0} : Calling Unsubscriber for Observer", this.Name);
        if(this.unsubscriber != null)
            this.unsubscriber.Dispose();
    }

    private void LogException(Exception error)
    {
        Console.WriteLine("Exception occurred while traversing thorough objects of type {0}", error.GetType().Name);
        Console.WriteLine("Exception Message : {0}", error.Message);
    }
    private void LogProperties(T value)
    {
        T tobj = value;
        PropertyInfo[] pinfos = tobj.GetType().GetProperties();
        Console.WriteLine("==========================={0}====================================================", this.Name);
        Console.WriteLine("Lets trace all the Properties ");

        foreach (PropertyInfo pinfo in pinfos)
            Console.WriteLine("Value of {0} is {1}", pinfo.Name, pinfo.GetValue(tobj, null));

        Console.WriteLine("============================={0}===================================================", this.Name);

            
    }
}

As I have already told you that IObserver is the unit of Push based observer and it is just a dual of IEnumerator, it has three methods in it.

  1. OnNext acts when the object changes its state. In my own implementation, the object state is changed when new object is inserted. 
  2. OnError acts when observer cannot generate the notification. In my own implementation, it will not generate OnError, as the Observable is eventually a collection and I left out the possibility of generating the Error. I will modify the code later to give you chance to get OnError. 
  3. OnComplete will be called when observer completes the notification. In my own implementation OnComplete is called whenever the subscriber unsubscribes the Observer.
I have also added Subscribe and Unsubscribe method in the implementation of IObserver (which is not mandatory) which will let you to subscribe the object to the Observable and also Unsubscribe the Observer by calling its Dispose. You should note, the Subscribe method actually returns a Disposable object. We will use the object to Dispose the Observer. The disposal is done using a class called Unsubscriber. Lets implement the same :

// A simple Disposable object
public class Unsubscriber<T> :IDisposable
{
    private List<IObserver<T>> observers;
    private IObserver<T> observer;

    public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
    {
        this.observers = observers;
        this.observer = observer;
    }
    #region IDisposable Members

    public void Dispose()
    {
        //We will just remove the Observer from the list whenever the unsubscription calls.
        if (observer != null && observers.Contains(observer))
            observers.Remove(observer);
    }

    #endregion
}

As in my own implementation of Observer, I have used only a Collection to generate the notification, we just pass the Observer container and the Observer so that we could remove the object from the container whenever we require to Dispose.

IObservable Implementation

The implementation of IObservable is very simple. The IObservable has a single method Subscribe. This method lets one to subscribe the object. The method returns an IDisposable object which lets you to Unsubscribe the object.


public class SampleObservable<T> : IObservable<T> where T:class
{

    private List<IObserver<T>> observers = new List<IObserver<T>>();
      
    #region IObservable<T> Members

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (!observers.Contains(observer))
            observers.Add(observer);
        return new Unsubscriber<T>(observers, observer);

    }

    #endregion

    public void TrackObserver(T obj)
    {
        if (obj != null)
        {
            foreach (IObserver<T> observer in observers)
                if (observers.Contains(observer))
                    observer.OnNext(obj);
                else
                    observer.OnError(new ApplicationException("Not available"));
        }
    }

    public void TrackComplete()
    {
        foreach (IObserver<T> observer in observers)
            observer.OnCompleted();

        observers.Clear();
    }
        
}

In the sample implementation, you can see I have used a List of IObserver. The List will hold all the Observers that I subscribe and I call each observer whenever any object is added. You can see, I have used TrackObserver to invoke the OnNext notification myself. In your actual implementation, you could attach an event handler to invoke OnNext and OnError for a collection as I have showed in my previous post.

Lets try the sample implementation :



static void Main(string[] args)
{
    SampleObservable<MyDummyClass> sovable = new SampleObservable<MyDummyClass>();

    SampleObserver<MyDummyClass> sovr1 = new SampleObserver<MyDummyClass>("First Observer");
    using (sovable.Subscribe(sovr1))
    {
        SampleObserver<MyDummyClass> sovr2 = new SampleObserver<MyDummyClass>("Second Observer");
        using (sovable.Subscribe(sovr2))
        {
            sovable.TrackObserver(new MyDummyClass { Name = "Abhishek", Class = "1" });
            sovable.TrackObserver(new MyDummyClass { Name = "Abhijit", Class = "2" });
        }
        //Lets unsubscribe before adding 3rd object.
        sovable.TrackObserver(new MyDummyClass { Name = "Shibatosh", Class = "3" });

        sovable.TrackComplete();
    }
    Console.Read();
}

public class MyDummyClass
{
    public string Name { get; set; }
    public string Class { get; set; }
}

Here in the sample I have used a class which have two properties. The TraceProperties method will help me to get value of those properties. So we declare the SampleObservable. It is the container for all Observers. and added two Observers for the class MyDummyClass. The TrackObserver will help us to generate OnNext notifications and which will eventually write the Properties on the Console.

We added two Observer, sover1 and sovr2. The using block is useful in this case as it automatically dispose objects.






So from the above snap you can see, the observer generate two notifications for each objects Abhishek and Abhijit but for the third, it calls only once. The first Observer is disposed after Tracing the first two objects.

Download Sample App - 33 KB

Conclusion

So I hope this clears the basic usage of Observable and Observer. I am implementing the same using Events to clear you more.  Stay tune to see the updates . Also I would like to see your feedback and criticisms so that It could improve the post.

Thanks for reading.

4 comments:

  1. .Net has this concept built in - with the "event" keyword... it saves a lot of code.

    ReplyDelete
  2. @David.

    Thank you for your comment. Well, are you talking about Observable.FromEvent ?
    If so, it is not a part of Base class library as of now. Base class library only includes the two interfaces IObservable and IObserver. The Observable implementation is done from within Reactive Framework.

    Anyways, there are lots of workaround to these, but I just showed how to build one.

    ReplyDelete

Please make sure that the question you ask is somehow related to the post you choose. Otherwise you post your general question in Forum section.