From 702e3f6237fa70cfde3da1b5a5670ae9d481a284 Mon Sep 17 00:00:00 2001 From: rachytski Date: Wed, 28 Sep 2011 20:27:35 +0300 Subject: [PATCH] added ThreadedPriorityQueue. --- base/base.pro | 2 + base/base_tests/threaded_list_test.cpp | 76 ++++++++++++++++++++-- base/threaded_priority_queue.hpp | 87 ++++++++++++++++++++++++++ 3 files changed, 160 insertions(+), 5 deletions(-) create mode 100644 base/threaded_priority_queue.hpp diff --git a/base/base.pro b/base/base.pro index 8e47863b97..3a3ec284ec 100644 --- a/base/base.pro +++ b/base/base.pro @@ -71,3 +71,5 @@ HEADERS += \ threaded_list.hpp \ resource_pool.hpp \ limited_priority_queue.hpp \ + threaded_priority_queue.hpp + diff --git a/base/base_tests/threaded_list_test.cpp b/base/base_tests/threaded_list_test.cpp index 531f7aaa41..48de443bce 100644 --- a/base/base_tests/threaded_list_test.cpp +++ b/base/base_tests/threaded_list_test.cpp @@ -2,19 +2,21 @@ #include "../../testing/testing.hpp" #include "../threaded_list.hpp" +#include "../threaded_priority_queue.hpp" #include "../thread.hpp" #include "../../base/logging.hpp" -struct ProcessorThread : public threads::IRoutine +struct ThreadedListProcessor : public threads::IRoutine { ThreadedList * m_p; int m_data; list * m_res; int m_id; - ProcessorThread(ThreadedList * p, list * res, int id) : m_p(p), m_res(res), m_id(id) + ThreadedListProcessor(ThreadedList * p, list * res, int id) + : m_p(p), m_res(res), m_id(id) {} virtual void Do() @@ -30,6 +32,31 @@ struct ProcessorThread : public threads::IRoutine } }; +struct ThreadedPriorityQueueProcessor : public threads::IRoutine +{ + ThreadedPriorityQueue * m_p; + int m_data; + list * m_res; + int m_id; + + ThreadedPriorityQueueProcessor(ThreadedPriorityQueue * p, list * res, int id) + : m_p(p), m_res(res), m_id(id) + {} + + virtual void Do() + { + while (!m_p->IsCancelled()) + { + int res = m_p->Top(true); + m_res->push_back(res); + LOG(LINFO, (m_id, " thread got ", res)); + threads::Sleep(10); + } + LOG(LINFO, (m_id, " thread is cancelled")); + } +}; + + UNIT_TEST(ThreadedList) { list l; @@ -38,13 +65,13 @@ UNIT_TEST(ThreadedList) ThreadedList p; threads::Thread t0; - t0.Create(new ProcessorThread(&p, &res, 0)); + t0.Create(new ThreadedListProcessor(&p, &res, 0)); threads::Thread t1; - t1.Create(new ProcessorThread(&p, &res, 1)); + t1.Create(new ThreadedListProcessor(&p, &res, 1)); threads::Thread t2; - t2.Create(new ProcessorThread(&p, &res, 2)); + t2.Create(new ThreadedListProcessor(&p, &res, 2)); p.PushBack(0); threads::Sleep(200); @@ -68,3 +95,42 @@ UNIT_TEST(ThreadedList) t1.Join(); t2.Join(); } + +UNIT_TEST(ThreadedPriorityQueue) +{ + list res; + + ThreadedPriorityQueue p; + + threads::Thread t0; + t0.Create(new ThreadedPriorityQueueProcessor(&p, &res, 0)); + + threads::Thread t1; + t1.Create(new ThreadedPriorityQueueProcessor(&p, &res, 1)); + + threads::Thread t2; + t2.Create(new ThreadedPriorityQueueProcessor(&p, &res, 2)); + + p.Push(0); + threads::Sleep(200); + + p.Push(1); + threads::Sleep(200); + + p.Push(2); + threads::Sleep(200); + + TEST_EQUAL(res.front(), 0, ()); + res.pop_front(); + TEST_EQUAL(res.front(), 1, ()); + res.pop_front(); + TEST_EQUAL(res.front(), 2, ()); + res.pop_front(); + + p.Cancel(); + + t0.Join(); + t1.Join(); + t2.Join(); +} + diff --git a/base/threaded_priority_queue.hpp b/base/threaded_priority_queue.hpp new file mode 100644 index 0000000000..4a531817c8 --- /dev/null +++ b/base/threaded_priority_queue.hpp @@ -0,0 +1,87 @@ +#pragma once + +#include "threaded_container.hpp" +#include "../std/queue.hpp" + +template +class ThreadedPriorityQueue : public ThreadedContainer +{ +private: + priority_queue m_queue; +public: + + template + void ProcessQueue(Fn const & fn) + { + threads::ConditionGuard g(m_Cond); + + bool hadElements = !m_queue.empty(); + + fn(m_queue); + + bool hasElements = !m_queue.empty(); + + if (!hadElements && hasElements) + m_Cond.Signal(); + } + + void Push(T const & t) + { + threads::ConditionGuard g(m_Cond); + + bool doSignal = m_queue.empty(); + + m_queue.push(t); + + if (doSignal) + m_Cond.Signal(); + } + + bool WaitNonEmpty() + { + double StartWaitTime = m_Timer.ElapsedSeconds(); + + while (m_queue.empty()) + { + if (IsCancelled()) + break; + + m_Cond.Wait(); + } + + m_WaitTime += m_Timer.ElapsedSeconds() - StartWaitTime; + + if (IsCancelled()) + return true; + + return false; + } + + T const & Top(bool doPop) + { + threads::ConditionGuard g(m_Cond); + + if (WaitNonEmpty()) + return T(); + + T res = m_queue.top(); + + if (doPop) + m_queue.pop(); + + return res; + } + + bool Empty() const + { + threads::ConditionGuard g(m_Cond); + return m_queue.empty(); + } + + void Clear() + { + threads::ConditionGuard g(m_Cond); + while (!m_queue.empty()) + m_queue.pop(); + } +};