Sunday, February 14, 2010

Observer pattern

The Observer Pattern is classified under Object Behavioral Patterns in the book, Design Patterns: Elements of Reusable Object-Oriented Software by Erich Gamma et al. (Addison-Wesley, 1995). In this article, I will be using the terms used by 'Gang of Four (GoF)' to explain Observer Pattern. First, let us understand the problem of object interaction.

GoF classifies the Observer Pattern as an Object Behavioral Pattern. The Observer Pattern is intended to "Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically". An object that is subjected to change is called a Subject and an object that depends on the Subject's state is called an Observer. In the above example, 'B' can be a Subject and Objects A and C can be Observers. A Subject can have any number of Observers. When the Subject changes its state, it Notifies all the Observers and the Observers query the Subject to maintain state consistency with the Subject.

Here is the code.

#include <iostream>
#include <list>
#include <algorithm>



/*
 * Code for the observer pattern
 *
 */


// fwd declare the classes
class Observer;
class Observable;

// Derive your class from this to
// get notifications in DataChanged
class Observer
{
    Observable *m_Observing;

public:
    Observer();
    virtual ~Observer();
    void AttachToObservable(Observable *);
    void UnhookFromObservable( );
    virtual void DataChanged( Observable * );

};

// Derive your class from Observable and call Notify
// whenever the data changes
class Observable
{
    typedef std::list>Observer *< ObserverList;
    typedef ObserverList::iterator ObserverList_Iter;

    ObserverList  m_Observers;

public:
    Observable();
    virtual ~Observable();
    void AddObserver(Observer *);
    void RemoveObserver(Observer *);
    void Notify();
};


Observable::Observable()
{
}

Observable::~Observable()
{
    ObserverList_Iter aIter;
    for( aIter = m_Observers.begin(); aIter != m_Observers.end();++aIter)
    {
        (**aIter).UnhookFromObservable();
    }
}

void Observable::AddObserver(Observer *o)
{
    ObserverList_Iter aIter;

    aIter = std::find( m_Observers.begin(), m_Observers.end(), o);
    if( aIter == m_Observers.end())
    {
        m_Observers.push_back( o);
    }
}


/*
 * This is called from the observer. We find the
 * observer in the list and remove the item.
 *
 */

void Observable::RemoveObserver(Observer *o)
{
    ObserverList_Iter aIter;

    aIter = std::find( m_Observers.begin(), m_Observers.end(), o);
    if( aIter != m_Observers.end())
    {
        m_Observers.erase( aIter );
    }
}


/*
 * Observable::Notify. Notify all the observers that
 * the data has changed.
 *
 */

void Observable::Notify()
{
    ObserverList_Iter aIter;

    for( aIter = m_Observers.begin(); aIter!= m_Observers.end(); ++aIter)
    {
        (**aIter).DataChanged( this );
    }
}

// ===============================================

Observer::Observer()
: m_Observing( NULL )
{
}

Observer::~Observer()
{
    // We are being destroyed. Make sure
    // we remove ourselves from the 
    // observer we attached ourself to.
    UnhookFromObservable();
}

void Observer::UnhookFromObservable()
{
    if( m_Observing )
    {
        m_Observing->RemoveObserver(this);
    }
    m_Observing = NULL;
}

void Observer::DataChanged(Observable*)
{
    std::cout << "Hey, you should override DataChanged in your derived class !!\n";
}

/*
 * In my line of work we price bonds off 
 * benchmark bonds. If the price of the 
 * benchmark bond changes, the price of all bonds
 * that are priced off that benchmark must be 
 * adjusted accordingly.
 *
 * Here is a simple class, Instrument that 
 * can be used for both the 
 * benchmark and the dependent bond. 
 *
 */
class Instrument : public Observable, public Observer
{
    double m_Price;
    double m_Spread;
    std::string m_ID;

public:
    Instrument( std::string & inID)
    : m_ID(inID), m_Price(0.0), m_Spread(10.0)
    {
    }
    Instrument(const char *p)
    : m_ID(p), m_Price(0.0)
    {
    }

    void PriceChanged( double inPrice )
    {
        m_Price = inPrice;
        Notify();
    }
    void DataChanged( Observable *o)
    {
        Instrument *i = dynamic_cast(o);
        if(i)
        {
            std::cout << m_ID << " price updated from " << i->m_ID << "\n";
            this->m_Price = i-<m_Price + m_Spread;
        }
    }
};


void test1()
{
    Instrument  i1( "FGBLZ0");
    Instrument  i2( "DBS bank 2010");

    i1.AddObserver( &i2 );
    i1.PriceChanged( 100);

}


int main(int, char **)
{
    std::cout << "Hello, world!\n";

    test1();

    return 0;
}

// EOF

Sunday, February 7, 2010

Producer consumer

A common programing problem is program while writing performant client-server applications is to have multiple threads service client requests and data. This increases the effeftive throughput, expecially if the processing requires I/O to/from a slow device. With such a scenarion, having multiple threads to service client requests dramatically improves the thruput because if a thread is blocked for i/o, other threads can start processing on other requests.

In this blog, I present the code for a simple queuing mechanism.

First, we define some utility classes.

Locker is a convenient class to automatically lock and unlock a mutex. It is designed for exception safety. We lock the mutex in the constructor and unlock in the destructor.

The next class is a convenient class to start, stop and join threads. Note, that this class is a simple wrapper and it is not well designed for portabality. In the implementation here, we assume that we are using POSIX threads and all threads are daemon threads (daemon threads terminate when the app terminates as opposed to joinable threads, which have to terminate before the app is terminated by the OS.

Subclasses of this class must override the ThdFunc, which is the main thread function.

Note that there is a race condition in this arrangement. If we create the underlying pthread in the constructor itself, sometimes the ThdFunc gets called even before the constructor of MyThreadBase has finished ! Since the ThdFunc is a pure virtual function, we get a system "Null pointer exception! Therefore, the underlying thread is created in the start call, which can only be made when the constructor returns and the object is fully constructed by that time.


class MyThreadBase
{
protected:
    static void *startFunc(void *);

    pthread_t  m_thread;
    bool m_KeepRunning;
    bool m_isDaemon;

public:

    MyThreadBase(bool isDaemon = true );


    virtual ~MyThreadBase();

    void start();

    void kill();
    void join();

    virtual void ThdFunc() = 0;

    bool KeepRunning() {return m_KeepRunning; }
    void SetTerminateFlag() { m_KeepRunning = false;}
};

Here is the code in its entirety

#include <iostream>
#include <list>
#include <pthread.h>


using namespace std;

// Convenient class to make sure the
// mutex is unlocked even if there is 
// an exception.
struct Locker
{

    Locker(pthread_mutex_t & inMutex)
    : m_Mutex(inMutex)
    {
        ::pthread_mutex_lock( &m_Mutex);
    }

    ~Locker()
    {
        ::pthread_mutex_unlock( &m_Mutex );
    }

private:
    pthread_mutex_t & m_Mutex;
};



// This is the bonded-queue class. In the 
// example we use a list to store elements
// because we can remove items from the 
// front of the queue. We could have used a
// dqueue also. We did not want to use a vector
// because removing an item from the front of
// the vector will force a copy of all subsequent
// items in the vector. However, if you were using
// pointers to messages, then a vector would be
// more efficiend. Ideally, this class should
// be a template class and the data type should
// be a vector of shared_ptr

#define QUEUE_SIZE  10

class BQueue
{
    list m_Items;
    pthread_mutex_t  m_QueueMutex;
    pthread_cond_t   m_EmptyCond;   // wait on this while trying to read
    pthread_cond_t   m_FullCond;  // wait on this while trying to write
public:

    BQueue();
    ~BQueue();
    void  AddItem(int inItem);
    int   RemoveItem();
};

BQueue::BQueue()
{
    //m_Items.resize( QUEUE_SIZE );
    ::pthread_mutex_init( &m_QueueMutex, NULL );
    ::pthread_cond_init( &m_EmptyCond, NULL );
    ::pthread_cond_init( &m_FullCond, NULL );
}

BQueue::~BQueue()
{
}


int BQueue::RemoveItem()
{
    Locker w(m_QueueMutex);
    while( m_Items.empty() )
    {
        ::pthread_cond_wait( &m_EmptyCond, &m_QueueMutex );
    }
    int aRet = m_Items.front();
    m_Items.pop_front();
    ::pthread_cond_signal( &m_FullCond );
    return aRet;
}


void BQueue::AddItem(int inValue)
{
    Locker w( m_QueueMutex );
    while( m_Items.size() >= QUEUE_SIZE )
    {
        cout << "BQueue::AddItem, queue is full!\n";
        ::pthread_cond_wait( &m_FullCond, &m_QueueMutex);
    }

    m_Items.push_back( inValue );
    ::pthread_cond_signal( &m_EmptyCond );
}





class MyThreadBase
{
protected:
    static void *startFunc(void *);

    pthread_t  m_thread;
    bool m_KeepRunning;
    bool m_isDaemon;

public:

    MyThreadBase(bool isDaemon = true )
    : m_thread(0),
    m_KeepRunning(true),
    m_isDaemon( isDaemon )
    {
    }


    virtual ~MyThreadBase()
    {
    }

    void start()
    {
        // assert( m_thread == NULL );
        // This code is not exception safe.
        // We will have a memory leak if we
        // do not call pthread_attr_destroy

        ::pthread_attr_t attr;
        ::pthread_attr_init( &attr );
        ::pthread_attr_setdetachstate( &attr, m_isDaemon );
        ::pthread_create( &m_thread, &attr, startFunc, this );
        ::pthread_attr_destroy( &attr );
    }

    void kill()
    {
        m_KeepRunning = false;
        ::pthread_cancel( m_thread );
    }

    void join()
    {
        m_KeepRunning = false;
        ::pthread_join( m_thread, NULL );
    }

    virtual void ThdFunc() = 0;

    bool KeepRunning() {return m_KeepRunning; }
    void SetTerminateFlag() { m_KeepRunning = false;}
};

void * MyThreadBase::startFunc(void *p)
{
    MyThreadBase* ap( static_cast(p));
    try
    {
        ap->ThdFunc();
    }
    catch(...)
    {
        cout << "Exception caught in startFunc\n";
    }
    return NULL;
}

// Consumer threads pull off requests from
// the queue and process them.
class ConsumerThread: public MyThreadBase
{
    BQueue & m_Queue;

public:
    ConsumerThread(BQueue & inQ):MyThreadBase(true), m_Queue(inQ)
    {
    }

    ~ConsumerThread()
    {
    }

    void ThdFunc()
    {
        cout << "Hello, I am a consumer thread!\n";
        while( KeepRunning() )
        {
            int aData = m_Queue.RemoveItem();
            cout << "Data item is " << aData << "\n";
        }

    }

};

// Producer threads produce data and put into
// the queue.
class ProducerThread: public MyThreadBase
{

    bool m_Done;
    BQueue & m_Queue;

public:
    ProducerThread(BQueue & inQ ):
    MyThreadBase(true), m_Done(false), m_Queue(inQ)
    {
    }

    ~ProducerThread()
    {
    }

    bool done() {return m_Done ;}

    void ThdFunc()
    {
        for(int i=0; i< 100; i++)
        {
            cout << "Producer Thread: Adding item " << i << "\n";
            m_Queue.AddItem(  i );
        }
        m_Done = true;

    }

};


#define MAX_CONSUMERS 5
#define MAX_PRODUCERS 1


int main(int, char **)
{
    cout << "Hello world!\n";
    cout << "Bonded queue simulation. Please enter the following data...\n";


    BQueue a_Queue;

    ConsumerThread * ct[ MAX_CONSUMERS ];

    for(int i=0;i< MAX_CONSUMERS ;i++)
    {
        ct[i] = new ConsumerThread( a_Queue );
        ct[i]->start();
    }
    cout << "Consumer threads created!!\n";

    ProducerThread aProducer( a_Queue );
    aProducer.start();
    while(! aProducer.done() )
    {
        cout << "Main thread. Producer is not yet done!\n";
        ::sleep(10);
    }

    cout << "deleting all the threads...\n";

    for(int i=0;i< MAX_CONSUMERS ;i++)
    {
        ct[i]->kill();
        delete ct[i] ;
    }

}

Tuesday, January 12, 2010

C++ Algorithms summary

The following functions are defined in or , and are part of the std namespace.

accumulate sum up a range of elements

adjacent_difference compute the differences between adjacent elements in a range

adjacent_find finds two identical (or some other relationship) items adjacent to each other.

  FI adjacent_find(FI first, FI last);
  FI adjacent_find( FI first, FI last, Pred p);

binary_search determine if an element exists in a certain range

copy copy some range of elements to a new location

copy_backward copy a range of elements in backwards order

count return the number of elements matching a given value

count_if return the number of elements for which a predicate is true

equal determine if two sets of elements are the same

equal_range search for a range of elements that are all equal to a certain element

fill assign a range of elements a certain value

fill_n assign a value to some number of elements

find find a value in a given range

find_end find the last sequence of elements in a certain range

find_first_of search for any one of a set of elements

find_if find the first element for which a certain predicate is true

for_each apply a function to a range of elements

generate saves the result of a function in a range

generate_n saves the result of N applications of a function

includes returns true if one set is a subset of another

inner_product compute the inner product of two ranges of elements

inplace_merge merge two ordered ranges in-place

is_heap returns true if a given range is a heap

iter_swap swaps the elements pointed to by two iterators

lexicographical_compare returns true if one range is lexicographically less than another

lower_bound search for the first place that a value can be inserted while preserving order

make_heap creates a heap out of a range of elements

max returns the larger of two elements

max_element returns the largest element in a range

merge merge two sorted ranges

min returns the smaller of two elements

min_element returns the smallest element in a range

mismatch finds the first position where two ranges differ

next_permutation generates the next greater lexicographic permutation of a range of elements

nth_element put one element in its sorted location and make sure that no elements to its left are greater than any elements to its right

partial_sort sort the first N elements of a range

partial_sort_copy copy and partially sort a range of elements

partial_sum compute the partial sum of a range of elements

partition divide a range of elements into two groups

pop_heap remove the largest element from a heap

prev_permutation generates the next smaller lexicographic permutation of a range of elements

push_heap add an element to a heap

random_shuffle randomly re-order elements in some range

remove remove elements equal to certain value

remove_copy copy a range of elements omitting those that match a certain value

remove_copy_if create a copy of a range of elements, omitting any for which a predicate is true

remove_if remove all elements for which a predicate is true

replace replace every occurrence of some value in a range with another value

replace_copy copy a range, replacing certain elements with new ones

replace_copy_if copy a range of elements, replacing those for which a predicate is true

replace_if change the values of elements for which a predicate is true

reverse reverse elements in some range

reverse_copy create a copy of a range that is reversed

rotate move the elements in some range to the left by some amount

rotate_copy copy and rotate a range of elements

search search for a range of elements

search_n search for N consecutive copies of an element in some range

set_difference computes the difference between two sets

set_intersection computes the intersection of two sets

set_symmetric_difference computes the symmetric difference between two sets

set_union computes the union of two sets

sort sort a range into ascending order

sort_heap turns a heap into a sorted range of elements

stable_partition divide elements into two groups while preserving their relative order

stable_sort sort a range of elements while preserving order between equal elements

swap swap the values of two objects

swap_ranges swaps two ranges of elements

transform applies a function to a range of elements

unique remove consecutive duplicate elements in a range

unique_copy creates a copy of some range of elements that contains no consecutive duplicates

upper_bound searches for the last place that a value can be inserted while preserving order (first place that is greater than the value)