From a3cd0a94ca8c7b3069adb088d0eecd26c2a0cfff Mon Sep 17 00:00:00 2001 From: rachytski Date: Sat, 26 Nov 2011 17:51:02 +0400 Subject: [PATCH] made core::CommandQueue::Command joinable and wrote simple test for it. --- base/base_tests/commands_queue_test.cpp | 42 +++++++++-- base/commands_queue.cpp | 82 +++++++++++++++++----- base/commands_queue.hpp | 92 +++++++++++++++++-------- map/events.hpp | 2 +- map/tile_renderer.cpp | 8 +-- 5 files changed, 170 insertions(+), 56 deletions(-) diff --git a/base/base_tests/commands_queue_test.cpp b/base/base_tests/commands_queue_test.cpp index 0743043a02..8125f5c764 100644 --- a/base/base_tests/commands_queue_test.cpp +++ b/base/base_tests/commands_queue_test.cpp @@ -5,21 +5,33 @@ #include "../thread.hpp" #include "../logging.hpp" -void add_int(core::CommandsQueue::Environment const & env, int & i, int a) +void add_int(core::CommandsQueue::Environment const & env, + int & i, + int a) { threads::Sleep(500); - if (env.IsCancelled()) + if (env.isCancelled()) return; i += a; LOG(LINFO, ("add_int result:", i)); } +void join_mul_int(core::CommandsQueue::Environment const & env, + shared_ptr const & command, + int & i, + int b) +{ + command->join(); + i *= b; + LOG(LINFO, ("join_mul_int result: ", i)); +} + void mul_int(core::CommandsQueue::Environment const & env, int & i, int b) { - if (env.IsCancelled()) + if (env.isCancelled()) return; i *= b; - LOG(LINFO, ("mul_int result:", i)); + LOG(LINFO, ("mul_int result: ", i)); } UNIT_TEST(CommandsQueue_SetupAndPerformSimpleTask) @@ -78,3 +90,25 @@ UNIT_TEST(CommandsQueue_TestEnvironmentCancellation) TEST(i == 9, ()); } + +UNIT_TEST(CommandsQueue_TestJoinCommand) +{ + core::CommandsQueue queue0(1); + core::CommandsQueue queue1(1); + + int i = 3; + + queue0.Start(); + queue1.Start(); + + shared_ptr cmd = queue0.AddCommand(bind(&add_int, _1, ref(i), 5), true); + queue1.AddCommand(bind(&join_mul_int, _1, cmd, ref(i), 3), false); + + queue0.Join(); + queue1.Join(); + + queue0.Cancel(); + queue1.Cancel(); + + TEST(i == 24, ("core::CommandsQueue::Command::join doesn't work")); +} diff --git a/base/commands_queue.cpp b/base/commands_queue.cpp index 8595fbfa0c..acdad4dd6b 100644 --- a/base/commands_queue.cpp +++ b/base/commands_queue.cpp @@ -1,6 +1,7 @@ #include "../base/SRC_FIRST.hpp" #include "../base/logging.hpp" +#include "../base/assert.hpp" #include "commands_queue.hpp" @@ -10,21 +11,53 @@ namespace core : m_threadNum(threadNum), m_isCancelled(false) {} - void CommandsQueue::Environment::Cancel() + void CommandsQueue::Environment::cancel() { m_isCancelled = true; } - bool CommandsQueue::Environment::IsCancelled() const + bool CommandsQueue::Environment::isCancelled() const { return m_isCancelled; } - int CommandsQueue::Environment::GetThreadNum() const + int CommandsQueue::Environment::threadNum() const { return m_threadNum; } + CommandsQueue::BaseCommand::BaseCommand(bool isWaitable) + : m_waitCount(0), m_isCompleted(false) + { + if (isWaitable) + m_cond.reset(new threads::Condition()); + } + + void CommandsQueue::BaseCommand::join() + { + if (m_cond) + { + threads::ConditionGuard g(*m_cond.get()); + m_waitCount++; + if (!m_isCompleted) + m_cond->Wait(); + } + else + LOG(LERROR, ("command isn't waitable")); + } + + void CommandsQueue::BaseCommand::finish() const + { + if (m_cond) + { + threads::ConditionGuard g(*m_cond.get()); + m_isCompleted = true; + CHECK(m_waitCount < 2, ("only one thread could wait for the queued command")); + if (m_waitCount) + m_cond->Signal(true); + } + } + CommandsQueue::Chain::Chain() {} @@ -33,11 +66,22 @@ namespace core for (list::const_iterator it = m_fns.begin(); it != m_fns.end(); ++it) { (*it)(env); - if (env.IsCancelled()) + + if (env.isCancelled()) break; } } + CommandsQueue::Command::Command(bool isWaitable) + : BaseCommand(isWaitable) + {} + + void CommandsQueue::Command::perform(Environment const & env) const + { + m_fn(env); + finish(); + } + CommandsQueue::Routine::Routine(CommandsQueue * parent, int idx) : m_parent(parent), m_idx(idx), m_env(idx) {} @@ -45,49 +89,49 @@ namespace core void CommandsQueue::Routine::Do() { // performing initialization tasks - for(list::const_iterator it = m_parent->m_initCommands.begin(); + for(list >::const_iterator it = m_parent->m_initCommands.begin(); it != m_parent->m_initCommands.end(); ++it) - it->m_fn(m_env); + (*it)->perform(m_env); // main loop while (!IsCancelled()) { - CommandsQueue::Command cmd = m_parent->m_commands.Front(true); + shared_ptr cmd = m_parent->m_commands.Front(true); if (m_parent->m_commands.IsCancelled()) break; m_env.m_isCancelled = false; - cmd.m_fn(m_env); + cmd->perform(m_env); m_parent->FinishCommand(); } // performing finalization tasks - for(list::const_iterator it = m_parent->m_finCommands.begin(); + for(list >::const_iterator it = m_parent->m_finCommands.begin(); it != m_parent->m_finCommands.end(); ++it) - it->m_fn(m_env); + (*it)->perform(m_env); } void CommandsQueue::Routine::Cancel() { - m_env.Cancel(); + m_env.cancel(); // performing cancellation tasks - for(list::const_iterator it = m_parent->m_cancelCommands.begin(); + for(list >::const_iterator it = m_parent->m_cancelCommands.begin(); it != m_parent->m_cancelCommands.end(); ++it) - it->m_fn(m_env); + (*it)->perform(m_env); IRoutine::Cancel(); } void CommandsQueue::Routine::CancelCommand() { - m_env.Cancel(); + m_env.cancel(); } CommandsQueue::Executor::Executor() : m_routine(0) @@ -106,7 +150,7 @@ namespace core CommandsQueue::CommandsQueue(size_t executorsCount) : m_executors(new Executor[executorsCount]), m_executorsCount(executorsCount), - m_cmdId(0), m_activeCommands(0) + m_activeCommands(0) { } @@ -141,24 +185,24 @@ namespace core } } - void CommandsQueue::AddCommand(Command const & cmd) + void CommandsQueue::AddCommand(shared_ptr const & cmd) { threads::ConditionGuard g(m_cond); m_commands.PushBack(cmd); ++m_activeCommands; } - void CommandsQueue::AddInitCommand(Command const & cmd) + void CommandsQueue::AddInitCommand(shared_ptr const & cmd) { m_initCommands.push_back(cmd); } - void CommandsQueue::AddFinCommand(Command const & cmd) + void CommandsQueue::AddFinCommand(shared_ptr const & cmd) { m_finCommands.push_back(cmd); } - void CommandsQueue::AddCancelCommand(Command const & cmd) + void CommandsQueue::AddCancelCommand(shared_ptr const & cmd) { m_cancelCommands.push_back(cmd); } diff --git a/base/commands_queue.hpp b/base/commands_queue.hpp index d293d6d6aa..df1ce22caa 100644 --- a/base/commands_queue.hpp +++ b/base/commands_queue.hpp @@ -2,6 +2,7 @@ #include "../std/function.hpp" #include "../std/vector.hpp" +#include "../std/shared_ptr.hpp" #include "thread.hpp" #include "threaded_list.hpp" @@ -35,25 +36,51 @@ namespace core protected: explicit Environment(int threadNum); - void Cancel(); + + void cancel(); //< call this from the control thread + //< to cancel execution of the tasks friend class Routine; public: - int GetThreadNum() const; //< number of thread executing the commands - bool IsCancelled() const; //< command should ping this flag to see, + int threadNum() const; //< number of the thread, which is executing the commands + bool isCancelled() const; //< command should ping this flag to see, // whether it should cancel execution }; /// single commmand typedef function function_t; + /// basic command + /// - could wait for the completion of its execution + struct BaseCommand + { + shared_ptr m_cond; + mutable int m_waitCount; + mutable bool m_isCompleted; + + /// should we create the threads::Condition ? + /// this flag is used to save resources + BaseCommand(bool isWaitable); + + /// call this function when the execution + /// of the command is finished to release the waiters. + void finish() const; + + /// @warning only single thread could "join" command + void join(); + }; + /// chain of commands struct Chain { + private: + list m_fns; + public: + Chain(); template @@ -73,20 +100,23 @@ namespace core }; /// single command. - /// - could be chained together, using Chain class - struct Command + /// - commands could be chained together, using Chain class + struct Command : BaseCommand { - uint64_t m_id; + private: + function_t m_fn; - Command() : m_id(static_cast(-1)) - {} + public: + Command(bool isWaitable = false); template - Command(uint64_t id, tt t) - : m_id(id), m_fn(t) + Command(tt t, bool isWaitable = false) + : BaseCommand(isWaitable), m_fn(t) {} + + void perform(Environment const & env) const; }; private: @@ -121,12 +151,11 @@ namespace core Executor * m_executors; size_t m_executorsCount; - ThreadedList m_commands; - uint64_t m_cmdId; + ThreadedList > m_commands; - list m_initCommands; - list m_finCommands; - list m_cancelCommands; + list > m_initCommands; + list > m_finCommands; + list > m_cancelCommands; friend class Routine; @@ -147,10 +176,10 @@ namespace core /// Adding different types of commands /// @{ - void AddCommand(Command const & cmd); - void AddInitCommand(Command const & cmd); - void AddFinCommand(Command const & cmd); - void AddCancelCommand(Command const & cmd); + void AddCommand(shared_ptr const & cmd); + void AddInitCommand(shared_ptr const & cmd); + void AddFinCommand(shared_ptr const & cmd); + void AddCancelCommand(shared_ptr const & cmd); /// @} void Start(); @@ -160,28 +189,35 @@ namespace core void Clear(); template - void AddCommand(command_tt cmd) + shared_ptr AddCommand(command_tt cmd, bool isWaitable = false) { - AddCommand(Command(m_cmdId++, cmd)); + shared_ptr pcmd(new Command(cmd, isWaitable)); + AddCommand(pcmd); + return pcmd; } template - void AddInitCommand(command_tt cmd) + shared_ptr AddInitCommand(command_tt cmd, bool isWaitable = false) { - AddInitCommand(Command(m_cmdId++, cmd)); + shared_ptr const pcmd(new Command(cmd, isWaitable)); + AddInitCommand(pcmd); + return pcmd; } template - void AddFinCommand(command_tt cmd) + shared_ptr const AddFinCommand(command_tt cmd, bool isWaitable = false) { - AddFinCommand(Command(m_cmdId++, cmd)); + shared_ptr pcmd(new Command(cmd, isWaitable)); + AddFinCommand(pcmd); + return pcmd; } template - void AddCancelCommand(command_tt cmd) + shared_ptr const AddCancelCommand(command_tt cmd, bool isWaitable = false) { - AddCancelCommand(Command(m_cmdId++, cmd)); + shared_ptr pcmd(new Command(cmd, isWaitable)); + AddCancelCommand(pcmd); + return pcmd; } - }; } diff --git a/map/events.hpp b/map/events.hpp index e7adbfce4b..a05f37c579 100644 --- a/map/events.hpp +++ b/map/events.hpp @@ -77,7 +77,7 @@ public: bool isCancelled() const { if (m_env) - return m_env->IsCancelled(); + return m_env->isCancelled(); else return m_isCancelled; } diff --git a/map/tile_renderer.cpp b/map/tile_renderer.cpp index bb242e7880..7a3fbdcae2 100644 --- a/map/tile_renderer.cpp +++ b/map/tile_renderer.cpp @@ -81,7 +81,7 @@ void TileRenderer::CancelThread(core::CommandsQueue::Environment const & /*env*/ void TileRenderer::InitializeThreadGL(core::CommandsQueue::Environment const & env) { - ThreadData & threadData = m_threadData[env.GetThreadNum()]; + ThreadData & threadData = m_threadData[env.threadNum()]; threadData.m_renderContext->makeCurrent(); threadData.m_drawer = new DrawerYG(threadData.m_drawerParams); @@ -89,7 +89,7 @@ void TileRenderer::InitializeThreadGL(core::CommandsQueue::Environment const & e void TileRenderer::FinalizeThreadGL(core::CommandsQueue::Environment const & env) { - ThreadData & threadData = m_threadData[env.GetThreadNum()]; + ThreadData & threadData = m_threadData[env.threadNum()]; threadData.m_renderContext->endThreadDrawing(); @@ -108,7 +108,7 @@ void TileRenderer::DrawTile(core::CommandsQueue::Environment const & env, if (HasTile(rectInfo)) return; - ThreadData & threadData = m_threadData[env.GetThreadNum()]; + ThreadData & threadData = m_threadData[env.threadNum()]; DrawerYG * drawer = threadData.m_drawer; @@ -171,7 +171,7 @@ void TileRenderer::DrawTile(core::CommandsQueue::Environment const & env, double duration = timer.ElapsedSeconds(); - if (env.IsCancelled()) + if (env.isCancelled()) m_resourceManager->renderTargetTextures()->Free(tileTarget); else AddTile(rectInfo, Tile(tileTarget, tileInfoLayer, frameScreen, rectInfo, duration));