From 5b8c7eb5033fe4e1635b840e2b1b3c6b5016e9f4 Mon Sep 17 00:00:00 2001 From: rachytski Date: Sat, 25 Jun 2011 14:25:56 +0300 Subject: [PATCH] implemented multithreaded ObjectPool and test for it. --- base/base.pro | 1 + base/base_tests/base_tests.pro | 1 + base/base_tests/object_pool_test.cpp | 79 ++++++++++++++++++++++++++++ base/condition.hpp | 4 +- base/condition_bada.cpp | 7 ++- base/condition_posix.cpp | 7 ++- base/condition_windows_native.cpp | 74 ++++++++++++++++++++------ base/object_pool.hpp | 68 ++++++++++++++++++++++++ 8 files changed, 219 insertions(+), 22 deletions(-) create mode 100644 base/base_tests/object_pool_test.cpp create mode 100644 base/object_pool.hpp diff --git a/base/base.pro b/base/base.pro index bbc5bbfa6d..27aa74814f 100644 --- a/base/base.pro +++ b/base/base.pro @@ -66,3 +66,4 @@ HEADERS += \ path_utils.hpp \ array_adapters.hpp \ runner.hpp \ + object_pool.hpp diff --git a/base/base_tests/base_tests.pro b/base/base_tests/base_tests.pro index ed0f561557..2987046bc6 100644 --- a/base/base_tests/base_tests.pro +++ b/base/base_tests/base_tests.pro @@ -31,5 +31,6 @@ SOURCES += \ buffer_vector_test.cpp \ assert_test.cpp \ timer_test.cpp \ + object_pool_test.cpp\ HEADERS += diff --git a/base/base_tests/object_pool_test.cpp b/base/base_tests/object_pool_test.cpp new file mode 100644 index 0000000000..60300f4120 --- /dev/null +++ b/base/base_tests/object_pool_test.cpp @@ -0,0 +1,79 @@ +#include "../../base/SRC_FIRST.hpp" +#include "../../testing/testing.hpp" + +#include "../object_pool.hpp" +#include "../thread.hpp" +#include "../../base/logging.hpp" + +namespace my +{ + void sleep(int ms) + { + timespec t; + t.tv_nsec =(ms * 1000000) % 1000000000; + t.tv_sec = (ms * 1000000) / 1000000000; + nanosleep(&t, 0); + } +} + +struct ProcessorThread : public threads::IRoutine +{ + ObjectPool * m_p; + int m_data; + list * m_res; + int m_id; + + ProcessorThread(ObjectPool * p, list * res, int id) : m_p(p), m_res(res), m_id(id) + {} + + virtual void Do() + { + while (!m_p->IsCancelled()) + { + int res = m_p->Reserve(); + m_res->push_back(res); + LOG(LINFO, (m_id, " thread got ", res)); + my::sleep(10); + } + LOG(LINFO, (m_id, " thread is cancelled")); + } +}; + +UNIT_TEST(ObjectPool) +{ + list l; + list res; + + ObjectPool p(l); + + threads::Thread t0; + t0.Create(new ProcessorThread(&p, &res, 0)); + + threads::Thread t1; + t1.Create(new ProcessorThread(&p, &res, 1)); + + threads::Thread t2; + t2.Create(new ProcessorThread(&p, &res, 2)); + + p.Free(0); + my::sleep(200); + + p.Free(1); + my::sleep(200); + + p.Free(2); + my::sleep(200); + + TEST_EQUAL(res.front(), 0, ()); + res.pop_front(); + TEST_EQUAL(res.front(), 1, ()); + res.pop_front(); + TEST_EQUAL(res.front(), 2, ()); + res.pop_front(); + + p.Cancel(); + + t0.Join(); + t1.Join(); + t2.Join(); +} diff --git a/base/condition.hpp b/base/condition.hpp index 8841eae5bf..5b30b6bfaf 100644 --- a/base/condition.hpp +++ b/base/condition.hpp @@ -19,7 +19,7 @@ namespace threads Condition(); ~Condition(); - void Signal(); + void Signal(bool broadcast = false); void Wait(); void Lock(); void Unlock(); @@ -33,7 +33,7 @@ namespace threads : m_Condition(condition) { m_Condition.Lock(); } ~ConditionGuard() { m_Condition.Unlock(); } void Wait() { m_Condition.Wait(); } - void Signal() { m_Condition.Signal(); } + void Signal(bool broadcast = false) { m_Condition.Signal(broadcast); } private: Condition & m_Condition; }; diff --git a/base/condition_bada.cpp b/base/condition_bada.cpp index 938185d3b6..c909cd9b21 100644 --- a/base/condition_bada.cpp +++ b/base/condition_bada.cpp @@ -27,9 +27,12 @@ namespace threads delete m_pImpl; } - void Condition::Signal() + void Condition::Signal(bool broadcast) { - m_pImpl->m_Monitor.Notify(); + if (broadcast) + m_pImpl->m_Monitor.NotifyAll(); + else + m_pImpl->m_Monitor.Notify(); } void Condition::Wait() diff --git a/base/condition_posix.cpp b/base/condition_posix.cpp index c5087eb359..64183a4bdb 100644 --- a/base/condition_posix.cpp +++ b/base/condition_posix.cpp @@ -30,9 +30,12 @@ namespace threads delete m_pImpl; } - void Condition::Signal() + void Condition::Signal(bool broadcast) { - ::pthread_cond_signal(&m_pImpl->m_Condition); + if (broadcast) + ::pthread_cond_broadcast(&m_pImpl->m_Condition); + else + ::pthread_cond_signal(&m_pImpl->m_Condition); } void Condition::Wait() diff --git a/base/condition_windows_native.cpp b/base/condition_windows_native.cpp index 8bd8774fd7..143c8e868d 100644 --- a/base/condition_windows_native.cpp +++ b/base/condition_windows_native.cpp @@ -9,6 +9,7 @@ typedef void (WINAPI *InitFn)(PCONDITION_VARIABLE); typedef void (WINAPI *WakeFn)(PCONDITION_VARIABLE); +typedef void (WINAPI *WakeAllFn)(PCONDITION_VARIABLE); typedef BOOL (WINAPI *SleepFn)(PCONDITION_VARIABLE, PCRITICAL_SECTION, DWORD); namespace threads @@ -19,7 +20,7 @@ namespace threads { public: virtual ~ConditionImpl() {} - virtual void Signal() = 0; + virtual void Signal(bool broadcast) = 0; virtual void Wait() = 0; virtual void Lock() = 0; virtual void Unlock() = 0; @@ -29,21 +30,25 @@ namespace threads { InitFn m_pInit; WakeFn m_pWake; + WakeAllFn m_pWakeAll; SleepFn m_pSleep; CONDITION_VARIABLE m_Condition; Mutex m_mutex; public: - ImplWinVista(InitFn pInit, WakeFn pWake, SleepFn pSleep) - : m_pInit(pInit), m_pWake(pWake), m_pSleep(pSleep) + ImplWinVista(InitFn pInit, WakeFn pWake, WakeAllFn pWakeAll, SleepFn pSleep) + : m_pInit(pInit), m_pWake(pWake), m_pWakeAll(pWakeAll), m_pSleep(pSleep) { m_pInit(&m_Condition); } - void Signal() + void Signal(bool broadcast) { - m_pWake(&m_Condition); + if (broadcast) + m_pWakeAll(&m_Condition); + else + m_pWake(&m_Condition); } void Wait() @@ -105,15 +110,51 @@ namespace threads ::DeleteCriticalSection(&waiters_count_lock_); } - void Signal() + void Signal(bool broadcast) { - EnterCriticalSection(&waiters_count_lock_); - bool const have_waiters = waiters_count_ > 0; - LeaveCriticalSection(&waiters_count_lock_); + if (broadcast) + { + // This is needed to ensure that and are + // consistent relative to each other + EnterCriticalSection(&waiters_count_lock_); + int have_waiters = 0; - // If there aren't any waiters, then this is a no-op. - if (have_waiters) - ::ReleaseSemaphore(sema_, 1, 0); + if (waiters_count_ > 0) + { + // We are broadcasting, even if there is just one waiter... + // Record that we are broadcasting, which helps optimize + // for the non-broadcast case. + was_broadcast_ = 1; + have_waiters = 1; + } + + if (have_waiters) + { + // Wake up all the waiters atomically. + ReleaseSemaphore(sema_, waiters_count_, 0); + + LeaveCriticalSection(&waiters_count_lock_); + + // Wait for all the awakened threads to acquire the counting + // semaphore. + WaitForSingleObject(waiters_done_, INFINITE); + // This assignment is okay, wven without the held + // because no other waiter threads can wake up to access it. + was_broadcast_ = 0; + } + else + LeaveCriticalSection(&waiters_count_lock_); + } + else + { + EnterCriticalSection(&waiters_count_lock_); + bool const have_waiters = waiters_count_ > 0; + LeaveCriticalSection(&waiters_count_lock_); + + // If there aren't any waiters, then this is a no-op. + if (have_waiters) + ::ReleaseSemaphore(sema_, 1, 0); + } } void Wait() @@ -168,10 +209,11 @@ namespace threads HMODULE handle = GetModuleHandle(TEXT("kernel32.dll")); InitFn pInit = (InitFn)GetProcAddress(handle, "InitializeConditionVariable"); WakeFn pWake = (WakeFn)GetProcAddress(handle, "WakeConditionVariable"); + WakeAllFn pWakeAll = (WakeFn)GetProcAddress(handle, "WakeAllConditionVariable"); SleepFn pSleep = (SleepFn)GetProcAddress(handle, "SleepConditionVariableCS"); - if (pInit && pWake && pSleep) - m_pImpl = new impl::ImplWinVista(pInit, pWake, pSleep); + if (pInit && pWake && pWakeAll && pSleep) + m_pImpl = new impl::ImplWinVista(pInit, pWake, pWakeAll, pSleep); else m_pImpl = new impl::ImplWinXP(); } @@ -181,9 +223,9 @@ namespace threads delete m_pImpl; } - void Condition::Signal() + void Condition::Signal(bool broadcast) { - m_pImpl->Signal(); + m_pImpl->Signal(broadcast); } void Condition::Wait() diff --git a/base/object_pool.hpp b/base/object_pool.hpp new file mode 100644 index 0000000000..57e1350acb --- /dev/null +++ b/base/object_pool.hpp @@ -0,0 +1,68 @@ +#pragma once + +#include "condition.hpp" +#include "../std/list.hpp" + +/// multithreaded object pool. +/// implements condition waiting scheme, to save the CPU cycles. +template +class ObjectPool +{ +private: + + list m_List; + + threads::Condition m_Cond; + + bool m_IsCancelled; + +public: + + ObjectPool(list const & l) : m_List(l), m_IsCancelled(false) + {} + + T const Reserve() + { + threads::ConditionGuard Guard(m_Cond); + + while (m_List.empty()) + { + if (m_IsCancelled) + break; + m_Cond.Wait(); + } + + if (m_IsCancelled) + return T(); + + T res = m_List.front(); + m_List.pop_front(); + return res; + } + + /// cancel all waiting requests + void Cancel() + { + m_IsCancelled = true; + m_Cond.Signal(true); + } + + bool IsCancelled() const + { + return m_IsCancelled; + } + + void Free(T const & t) + { + threads::ConditionGuard Guard(m_Cond); + + bool DoSignal = m_List.empty(); + + m_List.push_back(t); + + if (DoSignal) + m_Cond.Signal(); //< this doesn't release the mutex associated with m_cond, + + /// we should return as quickly as possible to minimize "waked threads" waiting time + } +};