implemented multithreaded ObjectPool and test for it.

This commit is contained in:
rachytski 2011-06-25 14:25:56 +03:00 committed by Alex Zolotarev
parent ffabf8bc74
commit 5b8c7eb503
8 changed files with 219 additions and 22 deletions

View file

@ -66,3 +66,4 @@ HEADERS += \
path_utils.hpp \
array_adapters.hpp \
runner.hpp \
object_pool.hpp

View file

@ -31,5 +31,6 @@ SOURCES += \
buffer_vector_test.cpp \
assert_test.cpp \
timer_test.cpp \
object_pool_test.cpp\
HEADERS +=

View file

@ -0,0 +1,79 @@
#include "../../base/SRC_FIRST.hpp"
#include "../../testing/testing.hpp"
#include "../object_pool.hpp"
#include "../thread.hpp"
#include "../../base/logging.hpp"
namespace my
{
void sleep(int ms)
{
timespec t;
t.tv_nsec =(ms * 1000000) % 1000000000;
t.tv_sec = (ms * 1000000) / 1000000000;
nanosleep(&t, 0);
}
}
struct ProcessorThread : public threads::IRoutine
{
ObjectPool<int> * m_p;
int m_data;
list<int> * m_res;
int m_id;
ProcessorThread(ObjectPool<int> * p, list<int> * res, int id) : m_p(p), m_res(res), m_id(id)
{}
virtual void Do()
{
while (!m_p->IsCancelled())
{
int res = m_p->Reserve();
m_res->push_back(res);
LOG(LINFO, (m_id, " thread got ", res));
my::sleep(10);
}
LOG(LINFO, (m_id, " thread is cancelled"));
}
};
UNIT_TEST(ObjectPool)
{
list<int> l;
list<int> res;
ObjectPool<int> p(l);
threads::Thread t0;
t0.Create(new ProcessorThread(&p, &res, 0));
threads::Thread t1;
t1.Create(new ProcessorThread(&p, &res, 1));
threads::Thread t2;
t2.Create(new ProcessorThread(&p, &res, 2));
p.Free(0);
my::sleep(200);
p.Free(1);
my::sleep(200);
p.Free(2);
my::sleep(200);
TEST_EQUAL(res.front(), 0, ());
res.pop_front();
TEST_EQUAL(res.front(), 1, ());
res.pop_front();
TEST_EQUAL(res.front(), 2, ());
res.pop_front();
p.Cancel();
t0.Join();
t1.Join();
t2.Join();
}

View file

@ -19,7 +19,7 @@ namespace threads
Condition();
~Condition();
void Signal();
void Signal(bool broadcast = false);
void Wait();
void Lock();
void Unlock();
@ -33,7 +33,7 @@ namespace threads
: m_Condition(condition) { m_Condition.Lock(); }
~ConditionGuard() { m_Condition.Unlock(); }
void Wait() { m_Condition.Wait(); }
void Signal() { m_Condition.Signal(); }
void Signal(bool broadcast = false) { m_Condition.Signal(broadcast); }
private:
Condition & m_Condition;
};

View file

@ -27,9 +27,12 @@ namespace threads
delete m_pImpl;
}
void Condition::Signal()
void Condition::Signal(bool broadcast)
{
m_pImpl->m_Monitor.Notify();
if (broadcast)
m_pImpl->m_Monitor.NotifyAll();
else
m_pImpl->m_Monitor.Notify();
}
void Condition::Wait()

View file

@ -30,9 +30,12 @@ namespace threads
delete m_pImpl;
}
void Condition::Signal()
void Condition::Signal(bool broadcast)
{
::pthread_cond_signal(&m_pImpl->m_Condition);
if (broadcast)
::pthread_cond_broadcast(&m_pImpl->m_Condition);
else
::pthread_cond_signal(&m_pImpl->m_Condition);
}
void Condition::Wait()

View file

@ -9,6 +9,7 @@
typedef void (WINAPI *InitFn)(PCONDITION_VARIABLE);
typedef void (WINAPI *WakeFn)(PCONDITION_VARIABLE);
typedef void (WINAPI *WakeAllFn)(PCONDITION_VARIABLE);
typedef BOOL (WINAPI *SleepFn)(PCONDITION_VARIABLE, PCRITICAL_SECTION, DWORD);
namespace threads
@ -19,7 +20,7 @@ namespace threads
{
public:
virtual ~ConditionImpl() {}
virtual void Signal() = 0;
virtual void Signal(bool broadcast) = 0;
virtual void Wait() = 0;
virtual void Lock() = 0;
virtual void Unlock() = 0;
@ -29,21 +30,25 @@ namespace threads
{
InitFn m_pInit;
WakeFn m_pWake;
WakeAllFn m_pWakeAll;
SleepFn m_pSleep;
CONDITION_VARIABLE m_Condition;
Mutex m_mutex;
public:
ImplWinVista(InitFn pInit, WakeFn pWake, SleepFn pSleep)
: m_pInit(pInit), m_pWake(pWake), m_pSleep(pSleep)
ImplWinVista(InitFn pInit, WakeFn pWake, WakeAllFn pWakeAll, SleepFn pSleep)
: m_pInit(pInit), m_pWake(pWake), m_pWakeAll(pWakeAll), m_pSleep(pSleep)
{
m_pInit(&m_Condition);
}
void Signal()
void Signal(bool broadcast)
{
m_pWake(&m_Condition);
if (broadcast)
m_pWakeAll(&m_Condition);
else
m_pWake(&m_Condition);
}
void Wait()
@ -105,15 +110,51 @@ namespace threads
::DeleteCriticalSection(&waiters_count_lock_);
}
void Signal()
void Signal(bool broadcast)
{
EnterCriticalSection(&waiters_count_lock_);
bool const have_waiters = waiters_count_ > 0;
LeaveCriticalSection(&waiters_count_lock_);
if (broadcast)
{
// This is needed to ensure that <waiters_count_> and <was_broadcast_> are
// consistent relative to each other
EnterCriticalSection(&waiters_count_lock_);
int have_waiters = 0;
// If there aren't any waiters, then this is a no-op.
if (have_waiters)
::ReleaseSemaphore(sema_, 1, 0);
if (waiters_count_ > 0)
{
// We are broadcasting, even if there is just one waiter...
// Record that we are broadcasting, which helps optimize
// <pthread_cond_wait> for the non-broadcast case.
was_broadcast_ = 1;
have_waiters = 1;
}
if (have_waiters)
{
// Wake up all the waiters atomically.
ReleaseSemaphore(sema_, waiters_count_, 0);
LeaveCriticalSection(&waiters_count_lock_);
// Wait for all the awakened threads to acquire the counting
// semaphore.
WaitForSingleObject(waiters_done_, INFINITE);
// This assignment is okay, wven without the <waiters_count_lock_> held
// because no other waiter threads can wake up to access it.
was_broadcast_ = 0;
}
else
LeaveCriticalSection(&waiters_count_lock_);
}
else
{
EnterCriticalSection(&waiters_count_lock_);
bool const have_waiters = waiters_count_ > 0;
LeaveCriticalSection(&waiters_count_lock_);
// If there aren't any waiters, then this is a no-op.
if (have_waiters)
::ReleaseSemaphore(sema_, 1, 0);
}
}
void Wait()
@ -168,10 +209,11 @@ namespace threads
HMODULE handle = GetModuleHandle(TEXT("kernel32.dll"));
InitFn pInit = (InitFn)GetProcAddress(handle, "InitializeConditionVariable");
WakeFn pWake = (WakeFn)GetProcAddress(handle, "WakeConditionVariable");
WakeAllFn pWakeAll = (WakeFn)GetProcAddress(handle, "WakeAllConditionVariable");
SleepFn pSleep = (SleepFn)GetProcAddress(handle, "SleepConditionVariableCS");
if (pInit && pWake && pSleep)
m_pImpl = new impl::ImplWinVista(pInit, pWake, pSleep);
if (pInit && pWake && pWakeAll && pSleep)
m_pImpl = new impl::ImplWinVista(pInit, pWake, pWakeAll, pSleep);
else
m_pImpl = new impl::ImplWinXP();
}
@ -181,9 +223,9 @@ namespace threads
delete m_pImpl;
}
void Condition::Signal()
void Condition::Signal(bool broadcast)
{
m_pImpl->Signal();
m_pImpl->Signal(broadcast);
}
void Condition::Wait()

68
base/object_pool.hpp Normal file
View file

@ -0,0 +1,68 @@
#pragma once
#include "condition.hpp"
#include "../std/list.hpp"
/// multithreaded object pool.
/// implements condition waiting scheme, to save the CPU cycles.
template <typename T>
class ObjectPool
{
private:
list<T> m_List;
threads::Condition m_Cond;
bool m_IsCancelled;
public:
ObjectPool(list<T> const & l) : m_List(l), m_IsCancelled(false)
{}
T const Reserve()
{
threads::ConditionGuard Guard(m_Cond);
while (m_List.empty())
{
if (m_IsCancelled)
break;
m_Cond.Wait();
}
if (m_IsCancelled)
return T();
T res = m_List.front();
m_List.pop_front();
return res;
}
/// cancel all waiting requests
void Cancel()
{
m_IsCancelled = true;
m_Cond.Signal(true);
}
bool IsCancelled() const
{
return m_IsCancelled;
}
void Free(T const & t)
{
threads::ConditionGuard Guard(m_Cond);
bool DoSignal = m_List.empty();
m_List.push_back(t);
if (DoSignal)
m_Cond.Signal(); //< this doesn't release the mutex associated with m_cond,
/// we should return as quickly as possible to minimize "waked threads" waiting time
}
};