A common programing problem is program while writing performant client-server applications is to have multiple threads service client requests and data. This increases the effeftive throughput, expecially if the processing requires I/O to/from a slow device. With such a scenarion, having multiple threads to service client requests dramatically improves the thruput because if a thread is blocked for i/o, other threads can start processing on other requests.
In this blog, I present the code for a simple queuing mechanism.
First, we define some utility classes.
Locker is a convenient class to automatically lock and unlock a mutex. It is designed for exception safety. We lock the mutex in the constructor and unlock in the destructor.
The next class is a convenient class to start, stop and join threads. Note, that this class is a simple wrapper and it is not well designed for portabality. In the implementation here, we assume that we are using POSIX threads and all threads are daemon threads (daemon threads terminate when the app terminates as opposed to joinable threads, which have to terminate before the app is terminated by the OS.
Subclasses of this class must override the ThdFunc, which is the main thread function.
Note that there is a race condition in this arrangement. If we create the underlying pthread in the constructor itself, sometimes the ThdFunc gets called even before the constructor of MyThreadBase has finished ! Since the ThdFunc is a pure virtual function, we get a system "Null pointer exception! Therefore, the underlying thread is created in the start call, which can only be made when the constructor returns and the object is fully constructed by that time.
class MyThreadBase { protected: static void *startFunc(void *); pthread_t m_thread; bool m_KeepRunning; bool m_isDaemon; public: MyThreadBase(bool isDaemon = true ); virtual ~MyThreadBase(); void start(); void kill(); void join(); virtual void ThdFunc() = 0; bool KeepRunning() {return m_KeepRunning; } void SetTerminateFlag() { m_KeepRunning = false;} };
Here is the code in its entirety
#include <iostream> #include <list> #include <pthread.h> using namespace std; // Convenient class to make sure the // mutex is unlocked even if there is // an exception. struct Locker { Locker(pthread_mutex_t & inMutex) : m_Mutex(inMutex) { ::pthread_mutex_lock( &m_Mutex); } ~Locker() { ::pthread_mutex_unlock( &m_Mutex ); } private: pthread_mutex_t & m_Mutex; }; // This is the bonded-queue class. In the // example we use a list to store elements // because we can remove items from the // front of the queue. We could have used a // dqueue also. We did not want to use a vector // because removing an item from the front of // the vector will force a copy of all subsequent // items in the vector. However, if you were using // pointers to messages, then a vector would be // more efficiend. Ideally, this class should // be a template class and the data type should // be a vector of shared_ptr #define QUEUE_SIZE 10 class BQueue { listm_Items; pthread_mutex_t m_QueueMutex; pthread_cond_t m_EmptyCond; // wait on this while trying to read pthread_cond_t m_FullCond; // wait on this while trying to write public: BQueue(); ~BQueue(); void AddItem(int inItem); int RemoveItem(); }; BQueue::BQueue() { //m_Items.resize( QUEUE_SIZE ); ::pthread_mutex_init( &m_QueueMutex, NULL ); ::pthread_cond_init( &m_EmptyCond, NULL ); ::pthread_cond_init( &m_FullCond, NULL ); } BQueue::~BQueue() { } int BQueue::RemoveItem() { Locker w(m_QueueMutex); while( m_Items.empty() ) { ::pthread_cond_wait( &m_EmptyCond, &m_QueueMutex ); } int aRet = m_Items.front(); m_Items.pop_front(); ::pthread_cond_signal( &m_FullCond ); return aRet; } void BQueue::AddItem(int inValue) { Locker w( m_QueueMutex ); while( m_Items.size() >= QUEUE_SIZE ) { cout << "BQueue::AddItem, queue is full!\n"; ::pthread_cond_wait( &m_FullCond, &m_QueueMutex); } m_Items.push_back( inValue ); ::pthread_cond_signal( &m_EmptyCond ); } class MyThreadBase { protected: static void *startFunc(void *); pthread_t m_thread; bool m_KeepRunning; bool m_isDaemon; public: MyThreadBase(bool isDaemon = true ) : m_thread(0), m_KeepRunning(true), m_isDaemon( isDaemon ) { } virtual ~MyThreadBase() { } void start() { // assert( m_thread == NULL ); // This code is not exception safe. // We will have a memory leak if we // do not call pthread_attr_destroy ::pthread_attr_t attr; ::pthread_attr_init( &attr ); ::pthread_attr_setdetachstate( &attr, m_isDaemon ); ::pthread_create( &m_thread, &attr, startFunc, this ); ::pthread_attr_destroy( &attr ); } void kill() { m_KeepRunning = false; ::pthread_cancel( m_thread ); } void join() { m_KeepRunning = false; ::pthread_join( m_thread, NULL ); } virtual void ThdFunc() = 0; bool KeepRunning() {return m_KeepRunning; } void SetTerminateFlag() { m_KeepRunning = false;} }; void * MyThreadBase::startFunc(void *p) { MyThreadBase* ap( static_cast (p)); try { ap->ThdFunc(); } catch(...) { cout << "Exception caught in startFunc\n"; } return NULL; } // Consumer threads pull off requests from // the queue and process them. class ConsumerThread: public MyThreadBase { BQueue & m_Queue; public: ConsumerThread(BQueue & inQ):MyThreadBase(true), m_Queue(inQ) { } ~ConsumerThread() { } void ThdFunc() { cout << "Hello, I am a consumer thread!\n"; while( KeepRunning() ) { int aData = m_Queue.RemoveItem(); cout << "Data item is " << aData << "\n"; } } }; // Producer threads produce data and put into // the queue. class ProducerThread: public MyThreadBase { bool m_Done; BQueue & m_Queue; public: ProducerThread(BQueue & inQ ): MyThreadBase(true), m_Done(false), m_Queue(inQ) { } ~ProducerThread() { } bool done() {return m_Done ;} void ThdFunc() { for(int i=0; i< 100; i++) { cout << "Producer Thread: Adding item " << i << "\n"; m_Queue.AddItem( i ); } m_Done = true; } }; #define MAX_CONSUMERS 5 #define MAX_PRODUCERS 1 int main(int, char **) { cout << "Hello world!\n"; cout << "Bonded queue simulation. Please enter the following data...\n"; BQueue a_Queue; ConsumerThread * ct[ MAX_CONSUMERS ]; for(int i=0;i< MAX_CONSUMERS ;i++) { ct[i] = new ConsumerThread( a_Queue ); ct[i]->start(); } cout << "Consumer threads created!!\n"; ProducerThread aProducer( a_Queue ); aProducer.start(); while(! aProducer.done() ) { cout << "Main thread. Producer is not yet done!\n"; ::sleep(10); } cout << "deleting all the threads...\n"; for(int i=0;i< MAX_CONSUMERS ;i++) { ct[i]->kill(); delete ct[i] ; } }
No comments:
Post a Comment