From bef00e3fc5b75cdbbcc256406aa85322afa40bc8 Mon Sep 17 00:00:00 2001 From: rachytski Date: Mon, 22 Aug 2011 18:32:11 +0300 Subject: [PATCH] implemented generic CommandsQueue for multithreaded commands processing. --- base/base_tests/commands_queue_test.cpp | 73 ++++++++++ base/commands_queue.cpp | 180 +++++++++++++++++++----- base/commands_queue.hpp | 176 ++++++++++++++++++----- 3 files changed, 361 insertions(+), 68 deletions(-) diff --git a/base/base_tests/commands_queue_test.cpp b/base/base_tests/commands_queue_test.cpp index dc301d5de0..0743043a02 100644 --- a/base/base_tests/commands_queue_test.cpp +++ b/base/base_tests/commands_queue_test.cpp @@ -1,7 +1,80 @@ #include "../../testing/testing.hpp" #include "../commands_queue.hpp" #include "../macros.hpp" +#include "../../std/bind.hpp" +#include "../thread.hpp" +#include "../logging.hpp" + +void add_int(core::CommandsQueue::Environment const & env, int & i, int a) +{ + threads::Sleep(500); + if (env.IsCancelled()) + return; + i += a; + LOG(LINFO, ("add_int result:", i)); +} + +void mul_int(core::CommandsQueue::Environment const & env, int & i, int b) +{ + if (env.IsCancelled()) + return; + i *= b; + LOG(LINFO, ("mul_int result:", i)); +} UNIT_TEST(CommandsQueue_SetupAndPerformSimpleTask) { + core::CommandsQueue queue(1); + + int i = 3; + + queue.Start(); + + queue.AddCommand(bind(&add_int, _1, ref(i), 5)); + queue.AddCommand(bind(&mul_int, _1, ref(i), 3)); + + queue.Join(); + +// threads::Sleep(1000); + + queue.Cancel(); + + TEST(i == 24, ()); +} + +UNIT_TEST(CommandsQueue_SetupAndPerformSimpleTaskWith2Executors) +{ + core::CommandsQueue queue(2); + + int i = 3; + + queue.Start(); + + queue.AddCommand(bind(&add_int, _1, ref(i), 5)); + queue.AddCommand(bind(&mul_int, _1, ref(i), 3)); + + queue.Join(); +// threads::Sleep(1000); + + queue.Cancel(); + + TEST(i == 14, ()); +} + +UNIT_TEST(CommandsQueue_TestEnvironmentCancellation) +{ + core::CommandsQueue queue(2); + + int i = 3; + + queue.Start(); + + queue.AddCommand(bind(&add_int, _1, ref(i), 5)); + queue.AddCommand(bind(&mul_int, _1, ref(i), 3)); + + threads::Sleep(200); //< after this second command is executed, first will be cancelled + + queue.Cancel(); + + TEST(i == 9, ()); } diff --git a/base/commands_queue.cpp b/base/commands_queue.cpp index cea15fec38..fd0dae7a9b 100644 --- a/base/commands_queue.cpp +++ b/base/commands_queue.cpp @@ -4,52 +4,166 @@ #include "commands_queue.hpp" -namespace threads +namespace core { - void CommandsQueueRoutine::Do() + CommandsQueue::Environment::Environment(int threadNum) + : m_threadNum(threadNum), m_isCancelled(false) + {} + + void CommandsQueue::Environment::Cancel() { - while (!IsCancelled()) + m_isCancelled = true; + } + + bool CommandsQueue::Environment::IsCancelled() const + { + return m_isCancelled; + } + + int CommandsQueue::Environment::GetThreadNum() const + { + return m_threadNum; + } + + CommandsQueue::Chain::Chain() + {} + + void CommandsQueue::Chain::operator()(CommandsQueue::Environment const & env) + { + for (list::const_iterator it = m_fns.begin(); it != m_fns.end(); ++it) { - command_t cmd; - { - threads::ConditionGuard guard(m_condition); - - while (m_commands.empty()) - { - guard.Wait(); - /// could be awaken on empty queue - /// with the purpose to finish thread execution. - if (IsCancelled()) - return; - } - - cmd = m_commands.front(); - m_commands.pop_front(); - } - - /// command is executed without holding the m_condition lock - /// to allow other threads to add another command while the - /// current one is executing - LOG(LINFO, ("Processing Command")); - cmd(); + (*it)(env); + if (env.IsCancelled()) + break; } } - void CommandsQueueRoutine::Cancel() + CommandsQueue::Routine::Routine() : m_env(-1) + {} + + CommandsQueue::Routine::Routine(CommandsQueue * parent, int idx) + : m_parent(parent), m_idx(idx), m_env(idx) + {} + + void CommandsQueue::Routine::Do() { - IRoutine::Cancel(); - if (m_commands.empty()) - m_condition.Signal(); + // performing initialization tasks + for(list::const_iterator it = m_parent->m_initCommands.begin(); + it != m_parent->m_initCommands.end(); + ++it) + it->m_fn(m_env); + + // main loop + while (!IsCancelled()) + { + CommandsQueue::Command cmd = m_parent->m_commands.Front(true); + + if (m_parent->m_commands.IsCancelled()) + break; + + cmd.m_fn(m_env); + + if (m_env.IsCancelled()) + break; + + m_parent->FinishCommand(); + } + + // performing finalization tasks + for(list::const_iterator it = m_parent->m_finCommands.begin(); + it != m_parent->m_finCommands.end(); + ++it) + it->m_fn(m_env); } - CommandsQueue::CommandsQueue() - : m_routine(new CommandsQueueRoutine()) + void CommandsQueue::Routine::Cancel() { - m_thread.Create(m_routine); + m_env.Cancel(); + IRoutine::Cancel(); + } + + CommandsQueue::Executor::Executor() : m_routine(0) + {} + + void CommandsQueue::Executor::Cancel() + { + if (m_routine != 0) + m_thread.Cancel(); + } + + CommandsQueue::CommandsQueue(size_t executorsCount) + : m_executorsCount(executorsCount), m_cmdId(0), m_activeCommands(0) + { + m_executors = new Executor[executorsCount]; + m_executorsCount = executorsCount; } CommandsQueue::~CommandsQueue() + {} + + void CommandsQueue::Cancel() { - m_thread.Cancel(); + m_commands.Cancel(); + + for (size_t i = 0; i < m_executorsCount; ++i) + m_executors[i].Cancel(); + + delete [] m_executors; + m_executors = 0; + } + + void CommandsQueue::Start() + { + for (size_t i = 0; i < m_executorsCount; ++i) + { + m_executors[i].m_routine = new CommandsQueue::Routine(this, i); + m_executors[i].m_thread.Create(m_executors[i].m_routine); + } + } + + void CommandsQueue::AddCommand(Command const & cmd) + { + threads::ConditionGuard g(m_cond); + m_commands.PushBack(cmd); + ++m_activeCommands; + } + + void CommandsQueue::AddInitCommand(Command const & cmd) + { + m_initCommands.push_back(cmd); + } + + void CommandsQueue::AddFinCommand(Command const & cmd) + { + m_finCommands.push_back(cmd); + } + + void CommandsQueue::Clear() + { + threads::ConditionGuard g(m_cond); + m_commands.Clear(); + m_activeCommands = 0; + if (m_activeCommands == 0) + m_cond.Signal(); + } + + void CommandsQueue::FinishCommand() + { + threads::ConditionGuard g(m_cond); + --m_activeCommands; + if (m_activeCommands == 0) + m_cond.Signal(); + } + + void CommandsQueue::Join() + { + threads::ConditionGuard g(m_cond); + if (m_activeCommands != 0) + m_cond.Wait(); + } + + int CommandsQueue::ExecutorsCount() const + { + return m_executorsCount; } } diff --git a/base/commands_queue.hpp b/base/commands_queue.hpp index 5fb6db5e4a..067fffb866 100644 --- a/base/commands_queue.hpp +++ b/base/commands_queue.hpp @@ -1,53 +1,159 @@ #pragma once -#include "../base/thread.hpp" #include "../std/function.hpp" -#include "../std/list.hpp" -#include "../std/shared_ptr.hpp" -#include "../base/condition.hpp" +#include "../std/vector.hpp" -namespace threads +#include "thread.hpp" +#include "threaded_list.hpp" + +namespace core { - class CommandsQueueRoutine : public threads::IRoutine - { - private: - - typedef function command_t; - list m_commands; - - threads::Condition m_condition; - - public: - template - void addCommand(command_tt cmd) - { - threads::ConditionGuard guard(m_condition); - bool needToSignal = m_commands.empty(); - m_commands.push_back(cmd); - if (needToSignal) - guard.Signal(); - }; - - void Do(); - void Cancel(); - }; - class CommandsQueue { private: - CommandsQueueRoutine * m_routine; - threads::Thread m_thread; + class Routine; public: - CommandsQueue(); + class Command; + + /// execution environment for single command + class Environment + { + private: + + int m_threadNum; + bool m_isCancelled; + + protected: + + explicit Environment(int threadNum); + void Cancel(); + + friend class Routine; + + public: + + int GetThreadNum() const; //< number of thread, that is executing the command + bool IsCancelled() const; //< command should ping this flag to see, whether it should cancel execution + }; + + /// single commmand + typedef function function_t; + + struct Chain + { + list m_fns; + + Chain(); + + template + Chain(fun_tt fn) + { + m_fns.push_back(fn); + } + + template + Chain & addCommand(fun_tt fn) + { + m_fns.push_back(fn); + return *this; + } + + void operator()(Environment const & env); + }; + + struct Command + { + uint64_t m_id; + function_t m_fn; + + Command() : m_id(-1) + {} + + template + Command(uint64_t id, tt t) + : m_id(id), m_fn(t) + {} + }; + + private: + + class Routine : public threads::IRoutine + { + private: + + CommandsQueue * m_parent; + int m_idx; + Environment m_env; + + public: + + Routine(); + Routine(CommandsQueue * parent, int idx); + + void Do(); + void Cancel(); + }; + + struct Executor + { + threads::Thread m_thread; + Routine * m_routine; + Executor(); + void Cancel(); + }; + + Executor * m_executors; + size_t m_executorsCount; + ThreadedList m_commands; + uint64_t m_cmdId; + + + list m_initCommands; + list m_finCommands; + + friend class Routine; + + threads::Condition m_cond; + size_t m_activeCommands; + void FinishCommand(); + + CommandsQueue(CommandsQueue const &); + CommandsQueue const & operator=(CommandsQueue const &); + + public: + + CommandsQueue(size_t executorsCount); ~CommandsQueue(); + int ExecutorsCount() const; + void AddCommand(Command const & cmd); + void AddInitCommand(Command const & cmd); + void AddFinCommand(Command const & cmd); + void Start(); + void Clear(); + void Cancel(); + void Join(); + template - void addCommand(command_tt cmd) + void AddCommand(command_tt cmd) { - m_routine->addCommand(cmd); - }; + AddCommand(Command(m_cmdId++, cmd)); + } + + template + void AddInitCommand(command_tt cmd) + { + AddInitCommand(Command(m_cmdId++, cmd)); + } + + template + void AddFinCommand(command_tt cmd) + { + AddFinCommand(Command(m_cmdId++, cmd)); + } + }; }