Sunday, February 14, 2010

Observer pattern

The Observer Pattern is classified under Object Behavioral Patterns in the book, Design Patterns: Elements of Reusable Object-Oriented Software by Erich Gamma et al. (Addison-Wesley, 1995). In this article, I will be using the terms used by 'Gang of Four (GoF)' to explain Observer Pattern. First, let us understand the problem of object interaction.

GoF classifies the Observer Pattern as an Object Behavioral Pattern. The Observer Pattern is intended to "Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically". An object that is subjected to change is called a Subject and an object that depends on the Subject's state is called an Observer. In the above example, 'B' can be a Subject and Objects A and C can be Observers. A Subject can have any number of Observers. When the Subject changes its state, it Notifies all the Observers and the Observers query the Subject to maintain state consistency with the Subject.

Here is the code.

#include <iostream>
#include <list>
#include <algorithm>



/*
 * Code for the observer pattern
 *
 */


// fwd declare the classes
class Observer;
class Observable;

// Derive your class from this to
// get notifications in DataChanged
class Observer
{
    Observable *m_Observing;

public:
    Observer();
    virtual ~Observer();
    void AttachToObservable(Observable *);
    void UnhookFromObservable( );
    virtual void DataChanged( Observable * );

};

// Derive your class from Observable and call Notify
// whenever the data changes
class Observable
{
    typedef std::list>Observer *< ObserverList;
    typedef ObserverList::iterator ObserverList_Iter;

    ObserverList  m_Observers;

public:
    Observable();
    virtual ~Observable();
    void AddObserver(Observer *);
    void RemoveObserver(Observer *);
    void Notify();
};


Observable::Observable()
{
}

Observable::~Observable()
{
    ObserverList_Iter aIter;
    for( aIter = m_Observers.begin(); aIter != m_Observers.end();++aIter)
    {
        (**aIter).UnhookFromObservable();
    }
}

void Observable::AddObserver(Observer *o)
{
    ObserverList_Iter aIter;

    aIter = std::find( m_Observers.begin(), m_Observers.end(), o);
    if( aIter == m_Observers.end())
    {
        m_Observers.push_back( o);
    }
}


/*
 * This is called from the observer. We find the
 * observer in the list and remove the item.
 *
 */

void Observable::RemoveObserver(Observer *o)
{
    ObserverList_Iter aIter;

    aIter = std::find( m_Observers.begin(), m_Observers.end(), o);
    if( aIter != m_Observers.end())
    {
        m_Observers.erase( aIter );
    }
}


/*
 * Observable::Notify. Notify all the observers that
 * the data has changed.
 *
 */

void Observable::Notify()
{
    ObserverList_Iter aIter;

    for( aIter = m_Observers.begin(); aIter!= m_Observers.end(); ++aIter)
    {
        (**aIter).DataChanged( this );
    }
}

// ===============================================

Observer::Observer()
: m_Observing( NULL )
{
}

Observer::~Observer()
{
    // We are being destroyed. Make sure
    // we remove ourselves from the 
    // observer we attached ourself to.
    UnhookFromObservable();
}

void Observer::UnhookFromObservable()
{
    if( m_Observing )
    {
        m_Observing->RemoveObserver(this);
    }
    m_Observing = NULL;
}

void Observer::DataChanged(Observable*)
{
    std::cout << "Hey, you should override DataChanged in your derived class !!\n";
}

/*
 * In my line of work we price bonds off 
 * benchmark bonds. If the price of the 
 * benchmark bond changes, the price of all bonds
 * that are priced off that benchmark must be 
 * adjusted accordingly.
 *
 * Here is a simple class, Instrument that 
 * can be used for both the 
 * benchmark and the dependent bond. 
 *
 */
class Instrument : public Observable, public Observer
{
    double m_Price;
    double m_Spread;
    std::string m_ID;

public:
    Instrument( std::string & inID)
    : m_ID(inID), m_Price(0.0), m_Spread(10.0)
    {
    }
    Instrument(const char *p)
    : m_ID(p), m_Price(0.0)
    {
    }

    void PriceChanged( double inPrice )
    {
        m_Price = inPrice;
        Notify();
    }
    void DataChanged( Observable *o)
    {
        Instrument *i = dynamic_cast(o);
        if(i)
        {
            std::cout << m_ID << " price updated from " << i->m_ID << "\n";
            this->m_Price = i-<m_Price + m_Spread;
        }
    }
};


void test1()
{
    Instrument  i1( "FGBLZ0");
    Instrument  i2( "DBS bank 2010");

    i1.AddObserver( &i2 );
    i1.PriceChanged( 100);

}


int main(int, char **)
{
    std::cout << "Hello, world!\n";

    test1();

    return 0;
}

// EOF

Sunday, February 7, 2010

Producer consumer

A common programing problem is program while writing performant client-server applications is to have multiple threads service client requests and data. This increases the effeftive throughput, expecially if the processing requires I/O to/from a slow device. With such a scenarion, having multiple threads to service client requests dramatically improves the thruput because if a thread is blocked for i/o, other threads can start processing on other requests.

In this blog, I present the code for a simple queuing mechanism.

First, we define some utility classes.

Locker is a convenient class to automatically lock and unlock a mutex. It is designed for exception safety. We lock the mutex in the constructor and unlock in the destructor.

The next class is a convenient class to start, stop and join threads. Note, that this class is a simple wrapper and it is not well designed for portabality. In the implementation here, we assume that we are using POSIX threads and all threads are daemon threads (daemon threads terminate when the app terminates as opposed to joinable threads, which have to terminate before the app is terminated by the OS.

Subclasses of this class must override the ThdFunc, which is the main thread function.

Note that there is a race condition in this arrangement. If we create the underlying pthread in the constructor itself, sometimes the ThdFunc gets called even before the constructor of MyThreadBase has finished ! Since the ThdFunc is a pure virtual function, we get a system "Null pointer exception! Therefore, the underlying thread is created in the start call, which can only be made when the constructor returns and the object is fully constructed by that time.


class MyThreadBase
{
protected:
    static void *startFunc(void *);

    pthread_t  m_thread;
    bool m_KeepRunning;
    bool m_isDaemon;

public:

    MyThreadBase(bool isDaemon = true );


    virtual ~MyThreadBase();

    void start();

    void kill();
    void join();

    virtual void ThdFunc() = 0;

    bool KeepRunning() {return m_KeepRunning; }
    void SetTerminateFlag() { m_KeepRunning = false;}
};

Here is the code in its entirety

#include <iostream>
#include <list>
#include <pthread.h>


using namespace std;

// Convenient class to make sure the
// mutex is unlocked even if there is 
// an exception.
struct Locker
{

    Locker(pthread_mutex_t & inMutex)
    : m_Mutex(inMutex)
    {
        ::pthread_mutex_lock( &m_Mutex);
    }

    ~Locker()
    {
        ::pthread_mutex_unlock( &m_Mutex );
    }

private:
    pthread_mutex_t & m_Mutex;
};



// This is the bonded-queue class. In the 
// example we use a list to store elements
// because we can remove items from the 
// front of the queue. We could have used a
// dqueue also. We did not want to use a vector
// because removing an item from the front of
// the vector will force a copy of all subsequent
// items in the vector. However, if you were using
// pointers to messages, then a vector would be
// more efficiend. Ideally, this class should
// be a template class and the data type should
// be a vector of shared_ptr

#define QUEUE_SIZE  10

class BQueue
{
    list m_Items;
    pthread_mutex_t  m_QueueMutex;
    pthread_cond_t   m_EmptyCond;   // wait on this while trying to read
    pthread_cond_t   m_FullCond;  // wait on this while trying to write
public:

    BQueue();
    ~BQueue();
    void  AddItem(int inItem);
    int   RemoveItem();
};

BQueue::BQueue()
{
    //m_Items.resize( QUEUE_SIZE );
    ::pthread_mutex_init( &m_QueueMutex, NULL );
    ::pthread_cond_init( &m_EmptyCond, NULL );
    ::pthread_cond_init( &m_FullCond, NULL );
}

BQueue::~BQueue()
{
}


int BQueue::RemoveItem()
{
    Locker w(m_QueueMutex);
    while( m_Items.empty() )
    {
        ::pthread_cond_wait( &m_EmptyCond, &m_QueueMutex );
    }
    int aRet = m_Items.front();
    m_Items.pop_front();
    ::pthread_cond_signal( &m_FullCond );
    return aRet;
}


void BQueue::AddItem(int inValue)
{
    Locker w( m_QueueMutex );
    while( m_Items.size() >= QUEUE_SIZE )
    {
        cout << "BQueue::AddItem, queue is full!\n";
        ::pthread_cond_wait( &m_FullCond, &m_QueueMutex);
    }

    m_Items.push_back( inValue );
    ::pthread_cond_signal( &m_EmptyCond );
}





class MyThreadBase
{
protected:
    static void *startFunc(void *);

    pthread_t  m_thread;
    bool m_KeepRunning;
    bool m_isDaemon;

public:

    MyThreadBase(bool isDaemon = true )
    : m_thread(0),
    m_KeepRunning(true),
    m_isDaemon( isDaemon )
    {
    }


    virtual ~MyThreadBase()
    {
    }

    void start()
    {
        // assert( m_thread == NULL );
        // This code is not exception safe.
        // We will have a memory leak if we
        // do not call pthread_attr_destroy

        ::pthread_attr_t attr;
        ::pthread_attr_init( &attr );
        ::pthread_attr_setdetachstate( &attr, m_isDaemon );
        ::pthread_create( &m_thread, &attr, startFunc, this );
        ::pthread_attr_destroy( &attr );
    }

    void kill()
    {
        m_KeepRunning = false;
        ::pthread_cancel( m_thread );
    }

    void join()
    {
        m_KeepRunning = false;
        ::pthread_join( m_thread, NULL );
    }

    virtual void ThdFunc() = 0;

    bool KeepRunning() {return m_KeepRunning; }
    void SetTerminateFlag() { m_KeepRunning = false;}
};

void * MyThreadBase::startFunc(void *p)
{
    MyThreadBase* ap( static_cast(p));
    try
    {
        ap->ThdFunc();
    }
    catch(...)
    {
        cout << "Exception caught in startFunc\n";
    }
    return NULL;
}

// Consumer threads pull off requests from
// the queue and process them.
class ConsumerThread: public MyThreadBase
{
    BQueue & m_Queue;

public:
    ConsumerThread(BQueue & inQ):MyThreadBase(true), m_Queue(inQ)
    {
    }

    ~ConsumerThread()
    {
    }

    void ThdFunc()
    {
        cout << "Hello, I am a consumer thread!\n";
        while( KeepRunning() )
        {
            int aData = m_Queue.RemoveItem();
            cout << "Data item is " << aData << "\n";
        }

    }

};

// Producer threads produce data and put into
// the queue.
class ProducerThread: public MyThreadBase
{

    bool m_Done;
    BQueue & m_Queue;

public:
    ProducerThread(BQueue & inQ ):
    MyThreadBase(true), m_Done(false), m_Queue(inQ)
    {
    }

    ~ProducerThread()
    {
    }

    bool done() {return m_Done ;}

    void ThdFunc()
    {
        for(int i=0; i< 100; i++)
        {
            cout << "Producer Thread: Adding item " << i << "\n";
            m_Queue.AddItem(  i );
        }
        m_Done = true;

    }

};


#define MAX_CONSUMERS 5
#define MAX_PRODUCERS 1


int main(int, char **)
{
    cout << "Hello world!\n";
    cout << "Bonded queue simulation. Please enter the following data...\n";


    BQueue a_Queue;

    ConsumerThread * ct[ MAX_CONSUMERS ];

    for(int i=0;i< MAX_CONSUMERS ;i++)
    {
        ct[i] = new ConsumerThread( a_Queue );
        ct[i]->start();
    }
    cout << "Consumer threads created!!\n";

    ProducerThread aProducer( a_Queue );
    aProducer.start();
    while(! aProducer.done() )
    {
        cout << "Main thread. Producer is not yet done!\n";
        ::sleep(10);
    }

    cout << "deleting all the threads...\n";

    for(int i=0;i< MAX_CONSUMERS ;i++)
    {
        ct[i]->kill();
        delete ct[i] ;
    }

}