implemented generic CommandsQueue for multithreaded commands processing.

This commit is contained in:
rachytski 2011-08-22 18:32:11 +03:00 committed by Alex Zolotarev
parent f0eb269363
commit bef00e3fc5
3 changed files with 361 additions and 68 deletions

View file

@ -1,7 +1,80 @@
#include "../../testing/testing.hpp"
#include "../commands_queue.hpp"
#include "../macros.hpp"
#include "../../std/bind.hpp"
#include "../thread.hpp"
#include "../logging.hpp"
void add_int(core::CommandsQueue::Environment const & env, int & i, int a)
{
threads::Sleep(500);
if (env.IsCancelled())
return;
i += a;
LOG(LINFO, ("add_int result:", i));
}
void mul_int(core::CommandsQueue::Environment const & env, int & i, int b)
{
if (env.IsCancelled())
return;
i *= b;
LOG(LINFO, ("mul_int result:", i));
}
UNIT_TEST(CommandsQueue_SetupAndPerformSimpleTask)
{
core::CommandsQueue queue(1);
int i = 3;
queue.Start();
queue.AddCommand(bind(&add_int, _1, ref(i), 5));
queue.AddCommand(bind(&mul_int, _1, ref(i), 3));
queue.Join();
// threads::Sleep(1000);
queue.Cancel();
TEST(i == 24, ());
}
UNIT_TEST(CommandsQueue_SetupAndPerformSimpleTaskWith2Executors)
{
core::CommandsQueue queue(2);
int i = 3;
queue.Start();
queue.AddCommand(bind(&add_int, _1, ref(i), 5));
queue.AddCommand(bind(&mul_int, _1, ref(i), 3));
queue.Join();
// threads::Sleep(1000);
queue.Cancel();
TEST(i == 14, ());
}
UNIT_TEST(CommandsQueue_TestEnvironmentCancellation)
{
core::CommandsQueue queue(2);
int i = 3;
queue.Start();
queue.AddCommand(bind(&add_int, _1, ref(i), 5));
queue.AddCommand(bind(&mul_int, _1, ref(i), 3));
threads::Sleep(200); //< after this second command is executed, first will be cancelled
queue.Cancel();
TEST(i == 9, ());
}

View file

@ -4,52 +4,166 @@
#include "commands_queue.hpp"
namespace threads
namespace core
{
void CommandsQueueRoutine::Do()
CommandsQueue::Environment::Environment(int threadNum)
: m_threadNum(threadNum), m_isCancelled(false)
{}
void CommandsQueue::Environment::Cancel()
{
while (!IsCancelled())
m_isCancelled = true;
}
bool CommandsQueue::Environment::IsCancelled() const
{
return m_isCancelled;
}
int CommandsQueue::Environment::GetThreadNum() const
{
return m_threadNum;
}
CommandsQueue::Chain::Chain()
{}
void CommandsQueue::Chain::operator()(CommandsQueue::Environment const & env)
{
for (list<function_t>::const_iterator it = m_fns.begin(); it != m_fns.end(); ++it)
{
command_t cmd;
{
threads::ConditionGuard guard(m_condition);
while (m_commands.empty())
{
guard.Wait();
/// could be awaken on empty queue
/// with the purpose to finish thread execution.
if (IsCancelled())
return;
}
cmd = m_commands.front();
m_commands.pop_front();
}
/// command is executed without holding the m_condition lock
/// to allow other threads to add another command while the
/// current one is executing
LOG(LINFO, ("Processing Command"));
cmd();
(*it)(env);
if (env.IsCancelled())
break;
}
}
void CommandsQueueRoutine::Cancel()
CommandsQueue::Routine::Routine() : m_env(-1)
{}
CommandsQueue::Routine::Routine(CommandsQueue * parent, int idx)
: m_parent(parent), m_idx(idx), m_env(idx)
{}
void CommandsQueue::Routine::Do()
{
IRoutine::Cancel();
if (m_commands.empty())
m_condition.Signal();
// performing initialization tasks
for(list<Command>::const_iterator it = m_parent->m_initCommands.begin();
it != m_parent->m_initCommands.end();
++it)
it->m_fn(m_env);
// main loop
while (!IsCancelled())
{
CommandsQueue::Command cmd = m_parent->m_commands.Front(true);
if (m_parent->m_commands.IsCancelled())
break;
cmd.m_fn(m_env);
if (m_env.IsCancelled())
break;
m_parent->FinishCommand();
}
// performing finalization tasks
for(list<Command>::const_iterator it = m_parent->m_finCommands.begin();
it != m_parent->m_finCommands.end();
++it)
it->m_fn(m_env);
}
CommandsQueue::CommandsQueue()
: m_routine(new CommandsQueueRoutine())
void CommandsQueue::Routine::Cancel()
{
m_thread.Create(m_routine);
m_env.Cancel();
IRoutine::Cancel();
}
CommandsQueue::Executor::Executor() : m_routine(0)
{}
void CommandsQueue::Executor::Cancel()
{
if (m_routine != 0)
m_thread.Cancel();
}
CommandsQueue::CommandsQueue(size_t executorsCount)
: m_executorsCount(executorsCount), m_cmdId(0), m_activeCommands(0)
{
m_executors = new Executor[executorsCount];
m_executorsCount = executorsCount;
}
CommandsQueue::~CommandsQueue()
{}
void CommandsQueue::Cancel()
{
m_thread.Cancel();
m_commands.Cancel();
for (size_t i = 0; i < m_executorsCount; ++i)
m_executors[i].Cancel();
delete [] m_executors;
m_executors = 0;
}
void CommandsQueue::Start()
{
for (size_t i = 0; i < m_executorsCount; ++i)
{
m_executors[i].m_routine = new CommandsQueue::Routine(this, i);
m_executors[i].m_thread.Create(m_executors[i].m_routine);
}
}
void CommandsQueue::AddCommand(Command const & cmd)
{
threads::ConditionGuard g(m_cond);
m_commands.PushBack(cmd);
++m_activeCommands;
}
void CommandsQueue::AddInitCommand(Command const & cmd)
{
m_initCommands.push_back(cmd);
}
void CommandsQueue::AddFinCommand(Command const & cmd)
{
m_finCommands.push_back(cmd);
}
void CommandsQueue::Clear()
{
threads::ConditionGuard g(m_cond);
m_commands.Clear();
m_activeCommands = 0;
if (m_activeCommands == 0)
m_cond.Signal();
}
void CommandsQueue::FinishCommand()
{
threads::ConditionGuard g(m_cond);
--m_activeCommands;
if (m_activeCommands == 0)
m_cond.Signal();
}
void CommandsQueue::Join()
{
threads::ConditionGuard g(m_cond);
if (m_activeCommands != 0)
m_cond.Wait();
}
int CommandsQueue::ExecutorsCount() const
{
return m_executorsCount;
}
}

View file

@ -1,53 +1,159 @@
#pragma once
#include "../base/thread.hpp"
#include "../std/function.hpp"
#include "../std/list.hpp"
#include "../std/shared_ptr.hpp"
#include "../base/condition.hpp"
#include "../std/vector.hpp"
namespace threads
#include "thread.hpp"
#include "threaded_list.hpp"
namespace core
{
class CommandsQueueRoutine : public threads::IRoutine
{
private:
typedef function<void()> command_t;
list<command_t> m_commands;
threads::Condition m_condition;
public:
template <typename command_tt>
void addCommand(command_tt cmd)
{
threads::ConditionGuard guard(m_condition);
bool needToSignal = m_commands.empty();
m_commands.push_back(cmd);
if (needToSignal)
guard.Signal();
};
void Do();
void Cancel();
};
class CommandsQueue
{
private:
CommandsQueueRoutine * m_routine;
threads::Thread m_thread;
class Routine;
public:
CommandsQueue();
class Command;
/// execution environment for single command
class Environment
{
private:
int m_threadNum;
bool m_isCancelled;
protected:
explicit Environment(int threadNum);
void Cancel();
friend class Routine;
public:
int GetThreadNum() const; //< number of thread, that is executing the command
bool IsCancelled() const; //< command should ping this flag to see, whether it should cancel execution
};
/// single commmand
typedef function<void(Environment const &)> function_t;
struct Chain
{
list<function_t> m_fns;
Chain();
template <typename fun_tt>
Chain(fun_tt fn)
{
m_fns.push_back(fn);
}
template <typename fun_tt>
Chain & addCommand(fun_tt fn)
{
m_fns.push_back(fn);
return *this;
}
void operator()(Environment const & env);
};
struct Command
{
uint64_t m_id;
function_t m_fn;
Command() : m_id(-1)
{}
template <typename tt>
Command(uint64_t id, tt t)
: m_id(id), m_fn(t)
{}
};
private:
class Routine : public threads::IRoutine
{
private:
CommandsQueue * m_parent;
int m_idx;
Environment m_env;
public:
Routine();
Routine(CommandsQueue * parent, int idx);
void Do();
void Cancel();
};
struct Executor
{
threads::Thread m_thread;
Routine * m_routine;
Executor();
void Cancel();
};
Executor * m_executors;
size_t m_executorsCount;
ThreadedList<Command> m_commands;
uint64_t m_cmdId;
list<Command> m_initCommands;
list<Command> m_finCommands;
friend class Routine;
threads::Condition m_cond;
size_t m_activeCommands;
void FinishCommand();
CommandsQueue(CommandsQueue const &);
CommandsQueue const & operator=(CommandsQueue const &);
public:
CommandsQueue(size_t executorsCount);
~CommandsQueue();
int ExecutorsCount() const;
void AddCommand(Command const & cmd);
void AddInitCommand(Command const & cmd);
void AddFinCommand(Command const & cmd);
void Start();
void Clear();
void Cancel();
void Join();
template<typename command_tt>
void addCommand(command_tt cmd)
void AddCommand(command_tt cmd)
{
m_routine->addCommand(cmd);
};
AddCommand(Command(m_cmdId++, cmd));
}
template <typename command_tt>
void AddInitCommand(command_tt cmd)
{
AddInitCommand(Command(m_cmdId++, cmd));
}
template <typename command_tt>
void AddFinCommand(command_tt cmd)
{
AddFinCommand(Command(m_cmdId++, cmd));
}
};
}