added ThreadedPriorityQueue.

This commit is contained in:
rachytski 2011-09-28 20:27:35 +03:00 committed by Alex Zolotarev
parent 655562b2e5
commit 702e3f6237
3 changed files with 160 additions and 5 deletions

View file

@ -71,3 +71,5 @@ HEADERS += \
threaded_list.hpp \
resource_pool.hpp \
limited_priority_queue.hpp \
threaded_priority_queue.hpp

View file

@ -2,19 +2,21 @@
#include "../../testing/testing.hpp"
#include "../threaded_list.hpp"
#include "../threaded_priority_queue.hpp"
#include "../thread.hpp"
#include "../../base/logging.hpp"
struct ProcessorThread : public threads::IRoutine
struct ThreadedListProcessor : public threads::IRoutine
{
ThreadedList<int> * m_p;
int m_data;
list<int> * m_res;
int m_id;
ProcessorThread(ThreadedList<int> * p, list<int> * res, int id) : m_p(p), m_res(res), m_id(id)
ThreadedListProcessor(ThreadedList<int> * p, list<int> * res, int id)
: m_p(p), m_res(res), m_id(id)
{}
virtual void Do()
@ -30,6 +32,31 @@ struct ProcessorThread : public threads::IRoutine
}
};
struct ThreadedPriorityQueueProcessor : public threads::IRoutine
{
ThreadedPriorityQueue<int> * m_p;
int m_data;
list<int> * m_res;
int m_id;
ThreadedPriorityQueueProcessor(ThreadedPriorityQueue<int> * p, list<int> * res, int id)
: m_p(p), m_res(res), m_id(id)
{}
virtual void Do()
{
while (!m_p->IsCancelled())
{
int res = m_p->Top(true);
m_res->push_back(res);
LOG(LINFO, (m_id, " thread got ", res));
threads::Sleep(10);
}
LOG(LINFO, (m_id, " thread is cancelled"));
}
};
UNIT_TEST(ThreadedList)
{
list<int> l;
@ -38,13 +65,13 @@ UNIT_TEST(ThreadedList)
ThreadedList<int> p;
threads::Thread t0;
t0.Create(new ProcessorThread(&p, &res, 0));
t0.Create(new ThreadedListProcessor(&p, &res, 0));
threads::Thread t1;
t1.Create(new ProcessorThread(&p, &res, 1));
t1.Create(new ThreadedListProcessor(&p, &res, 1));
threads::Thread t2;
t2.Create(new ProcessorThread(&p, &res, 2));
t2.Create(new ThreadedListProcessor(&p, &res, 2));
p.PushBack(0);
threads::Sleep(200);
@ -68,3 +95,42 @@ UNIT_TEST(ThreadedList)
t1.Join();
t2.Join();
}
UNIT_TEST(ThreadedPriorityQueue)
{
list<int> res;
ThreadedPriorityQueue<int> p;
threads::Thread t0;
t0.Create(new ThreadedPriorityQueueProcessor(&p, &res, 0));
threads::Thread t1;
t1.Create(new ThreadedPriorityQueueProcessor(&p, &res, 1));
threads::Thread t2;
t2.Create(new ThreadedPriorityQueueProcessor(&p, &res, 2));
p.Push(0);
threads::Sleep(200);
p.Push(1);
threads::Sleep(200);
p.Push(2);
threads::Sleep(200);
TEST_EQUAL(res.front(), 0, ());
res.pop_front();
TEST_EQUAL(res.front(), 1, ());
res.pop_front();
TEST_EQUAL(res.front(), 2, ());
res.pop_front();
p.Cancel();
t0.Join();
t1.Join();
t2.Join();
}

View file

@ -0,0 +1,87 @@
#pragma once
#include "threaded_container.hpp"
#include "../std/queue.hpp"
template <typename T>
class ThreadedPriorityQueue : public ThreadedContainer
{
private:
priority_queue<T> m_queue;
public:
template <typename Fn>
void ProcessQueue(Fn const & fn)
{
threads::ConditionGuard g(m_Cond);
bool hadElements = !m_queue.empty();
fn(m_queue);
bool hasElements = !m_queue.empty();
if (!hadElements && hasElements)
m_Cond.Signal();
}
void Push(T const & t)
{
threads::ConditionGuard g(m_Cond);
bool doSignal = m_queue.empty();
m_queue.push(t);
if (doSignal)
m_Cond.Signal();
}
bool WaitNonEmpty()
{
double StartWaitTime = m_Timer.ElapsedSeconds();
while (m_queue.empty())
{
if (IsCancelled())
break;
m_Cond.Wait();
}
m_WaitTime += m_Timer.ElapsedSeconds() - StartWaitTime;
if (IsCancelled())
return true;
return false;
}
T const & Top(bool doPop)
{
threads::ConditionGuard g(m_Cond);
if (WaitNonEmpty())
return T();
T res = m_queue.top();
if (doPop)
m_queue.pop();
return res;
}
bool Empty() const
{
threads::ConditionGuard g(m_Cond);
return m_queue.empty();
}
void Clear()
{
threads::ConditionGuard g(m_Cond);
while (!m_queue.empty())
m_queue.pop();
}
};