diff --git a/base/condition.hpp b/base/condition.hpp index 7290ce9093..4462b1fe45 100644 --- a/base/condition.hpp +++ b/base/condition.hpp @@ -2,11 +2,15 @@ namespace threads { + namespace impl + { + class ConditionImpl; + } + /// Implements mutexed condition semantics class Condition { - class Impl; - Impl * m_pImpl; + impl::ConditionImpl * m_pImpl; public: Condition(); @@ -22,7 +26,8 @@ namespace threads class ConditionGuard { public: - ConditionGuard(Condition & condition): m_Condition(condition) { m_Condition.Lock(); } + ConditionGuard(Condition & condition) + : m_Condition(condition) { m_Condition.Lock(); } ~ConditionGuard() { m_Condition.Unlock(); } void Wait() { m_Condition.Wait(); } void Signal() { m_Condition.Signal(); } diff --git a/base/condition_bada.cpp b/base/condition_bada.cpp index 68a683d55a..938185d3b6 100644 --- a/base/condition_bada.cpp +++ b/base/condition_bada.cpp @@ -8,13 +8,16 @@ namespace threads { - class Condition::Impl + namespace impl { - public: - Osp::Base::Runtime::Monitor m_Monitor; - }; + class ConditionImpl + { + public: + Osp::Base::Runtime::Monitor m_Monitor; + }; + } - Condition::Condition() : m_pImpl(new Condition::Impl()) + Condition::Condition() : m_pImpl(new impl::ConditionImpl()) { m_pImpl->m_Monitor.Construct(); } diff --git a/base/condition_posix.cpp b/base/condition_posix.cpp index abbc1f5560..c5087eb359 100644 --- a/base/condition_posix.cpp +++ b/base/condition_posix.cpp @@ -9,14 +9,17 @@ namespace threads { - class Condition::Impl + namespace impl { - public: - Mutex m_Mutex; - pthread_cond_t m_Condition; - }; - - Condition::Condition() : m_pImpl(new Condition::Impl) + class ConditionImpl + { + public: + Mutex m_Mutex; + pthread_cond_t m_Condition; + }; + } + + Condition::Condition() : m_pImpl(new impl::ConditionImpl) { ::pthread_cond_init(&m_pImpl->m_Condition, 0); } diff --git a/base/condition_windows_native.cpp b/base/condition_windows_native.cpp index 8abced5c9f..8bd8774fd7 100644 --- a/base/condition_windows_native.cpp +++ b/base/condition_windows_native.cpp @@ -7,19 +7,173 @@ #include "../std/windows.hpp" +typedef void (WINAPI *InitFn)(PCONDITION_VARIABLE); +typedef void (WINAPI *WakeFn)(PCONDITION_VARIABLE); +typedef BOOL (WINAPI *SleepFn)(PCONDITION_VARIABLE, PCRITICAL_SECTION, DWORD); + namespace threads { - /// Implements mutexed condition semantics - class Condition::Impl + namespace impl { - public: - CONDITION_VARIABLE m_Condition; - Mutex m_mutex; - }; - - Condition::Condition() : m_pImpl(new Condition::Impl()) + class ConditionImpl + { + public: + virtual ~ConditionImpl() {} + virtual void Signal() = 0; + virtual void Wait() = 0; + virtual void Lock() = 0; + virtual void Unlock() = 0; + }; + + class ImplWinVista : public ConditionImpl + { + InitFn m_pInit; + WakeFn m_pWake; + SleepFn m_pSleep; + + CONDITION_VARIABLE m_Condition; + Mutex m_mutex; + + public: + ImplWinVista(InitFn pInit, WakeFn pWake, SleepFn pSleep) + : m_pInit(pInit), m_pWake(pWake), m_pSleep(pSleep) + { + m_pInit(&m_Condition); + } + + void Signal() + { + m_pWake(&m_Condition); + } + + void Wait() + { + m_pSleep(&m_Condition, &m_mutex.m_Mutex, INFINITE); + } + + void Lock() + { + m_mutex.Lock(); + } + + void Unlock() + { + m_mutex.Unlock(); + } + }; + + /////////////////////////////////////////////////////////////// + /// Based on Richter's SignalObjectAndWait solution + class ImplWinXP : public ConditionImpl + { + /// Number of waiting threads + int waiters_count_; + /// Serialize access to + CRITICAL_SECTION waiters_count_lock_; + /// Semaphore used to queue up threads waiting for the condition to + /// become signaled + HANDLE sema_; + /// An auto-reset event used by the broadcast/signal thread to wait + /// for all the waiting thread(s) to wake up and be released from the + /// semaphore + HANDLE waiters_done_; + /// Keeps track of whether we were broadcasting or signaling. This + /// allows us to optimize the code if we're just signaling + size_t was_broadcast_; + + HANDLE m_mutex; + + public: + ImplWinXP() : waiters_count_(0), was_broadcast_(0) + { + ::InitializeCriticalSection(&waiters_count_lock_); + m_mutex = ::CreateMutexA(NULL, FALSE, NULL); + + sema_ = ::CreateSemaphore(NULL, // no security + 0, // initially 0 + 0x7fffffff, // max count + NULL); // unnamed + waiters_done_ = CreateEvent(NULL, // no security + FALSE, // auto-reset + FALSE, // non-signaled initially + NULL); // unnamed + } + + ~ImplWinXP() + { + ::CloseHandle(m_mutex); + ::DeleteCriticalSection(&waiters_count_lock_); + } + + void Signal() + { + EnterCriticalSection(&waiters_count_lock_); + bool const have_waiters = waiters_count_ > 0; + LeaveCriticalSection(&waiters_count_lock_); + + // If there aren't any waiters, then this is a no-op. + if (have_waiters) + ::ReleaseSemaphore(sema_, 1, 0); + } + + void Wait() + { + // Avoid race conditions + ::EnterCriticalSection(&waiters_count_lock_); + ++waiters_count_; + ::LeaveCriticalSection(&waiters_count_lock_); + + // This call atomically releases the mutex and waits on the + // semaphore until or + // are called by another thread + ::SignalObjectAndWait(m_mutex, sema_, INFINITE, FALSE); + + // Reacquire lock to avoid race conditions + ::EnterCriticalSection(&waiters_count_lock_); + + // We're no longer waiting... + --waiters_count_; + + // Check to see if we're the last waiter after . + bool const last_waiter = was_broadcast_ && waiters_count_ == 0; + + ::LeaveCriticalSection(&waiters_count_lock_); + + // If we're the last waiter thread during this particular broadcast + // then let all the other threads proceed + if (last_waiter) + // This call atomically signals the event and waits until + // it can acquire the . This is required to ensure fairness. + ::SignalObjectAndWait(waiters_done_, m_mutex, INFINITE, FALSE); + else + // Always regain the external mutex since that's the guarantee we + // give to our callers. + ::WaitForSingleObject(m_mutex, INFINITE); + } + + void Lock() + { + ::WaitForSingleObject(m_mutex, INFINITE); + } + + void Unlock() + { + ::ReleaseMutex(m_mutex); + } + }; + } + /////////////////////////////////////////////////////////////// + Condition::Condition() { - ::InitializeConditionVariable(&m_pImpl->m_Condition); + HMODULE handle = GetModuleHandle(TEXT("kernel32.dll")); + InitFn pInit = (InitFn)GetProcAddress(handle, "InitializeConditionVariable"); + WakeFn pWake = (WakeFn)GetProcAddress(handle, "WakeConditionVariable"); + SleepFn pSleep = (SleepFn)GetProcAddress(handle, "SleepConditionVariableCS"); + + if (pInit && pWake && pSleep) + m_pImpl = new impl::ImplWinVista(pInit, pWake, pSleep); + else + m_pImpl = new impl::ImplWinXP(); } Condition::~Condition() @@ -29,22 +183,22 @@ namespace threads void Condition::Signal() { - ::WakeConditionVariable(&m_pImpl->m_Condition); + m_pImpl->Signal(); } void Condition::Wait() { - ::SleepConditionVariableCS(&m_pImpl->m_Condition, &m_pImpl->m_mutex.m_Mutex, INFINITE); + m_pImpl->Wait(); } void Condition::Lock() { - m_pImpl->m_mutex.Lock(); + m_pImpl->Lock(); } void Condition::Unlock() { - m_pImpl->m_mutex.Unlock(); + m_pImpl->Unlock(); } } diff --git a/base/mutex.hpp b/base/mutex.hpp index 4b5bef2080..bd359dfd60 100644 --- a/base/mutex.hpp +++ b/base/mutex.hpp @@ -12,6 +12,13 @@ namespace threads { + class Condition; + namespace impl + { + class ConditionImpl; + class ImplWinVista; + } + /// Mutex primitive, used only for synchronizing this process threads /// based on Critical Section under Win32 and pthreads under Linux /// @author Siarhei Rachytski @@ -32,7 +39,9 @@ namespace threads Mutex const & operator=(Mutex const &); Mutex(Mutex const &); - friend class Condition; + friend class threads::impl::ConditionImpl; + friend class threads::impl::ImplWinVista; + friend class threads::Condition; public: