From b6d99ee255e7a9c8ce8f17e2335a2ab0ea82422e Mon Sep 17 00:00:00 2001 From: ExMix Date: Mon, 28 Oct 2013 12:30:47 +0300 Subject: [PATCH] [drape] new thread pool --- base/base.pro | 2 + base/thread.hpp | 4 +- base/thread_pool.cpp | 112 +++++++++++++++++++++++++++++++++++++++++++ base/thread_pool.hpp | 23 +++++++++ 4 files changed, 138 insertions(+), 3 deletions(-) create mode 100644 base/thread_pool.cpp create mode 100644 base/thread_pool.hpp diff --git a/base/base.pro b/base/base.pro index e4b5e962f5..02d9b0e6e4 100644 --- a/base/base.pro +++ b/base/base.pro @@ -28,6 +28,7 @@ SOURCES += \ string_format.cpp \ object_tracker.cpp \ scheduled_task.cpp \ + thread_pool.cpp HEADERS += \ SRC_FIRST.hpp \ @@ -76,3 +77,4 @@ HEADERS += \ object_tracker.hpp \ regexp.hpp \ scheduled_task.hpp \ + thread_pool.hpp diff --git a/base/thread.hpp b/base/thread.hpp index b7c9a21124..f3b26a5cbb 100644 --- a/base/thread.hpp +++ b/base/thread.hpp @@ -17,9 +17,6 @@ namespace threads private: bool m_isCancelled; - protected: - inline bool IsCancelled() const { return m_isCancelled; } - public: IRoutine() : m_isCancelled(false) {} virtual ~IRoutine() {} @@ -30,6 +27,7 @@ namespace threads /// Implement this function to respond to the cancellation event. /// Cancellation means that IRoutine should exit as fast as possible. virtual void Cancel() { m_isCancelled = true; } + inline bool IsCancelled() const { return m_isCancelled; } }; class ThreadImpl; diff --git a/base/thread_pool.cpp b/base/thread_pool.cpp new file mode 100644 index 0000000000..4071d6d3c4 --- /dev/null +++ b/base/thread_pool.cpp @@ -0,0 +1,112 @@ +#include "thread_pool.hpp" + +#include "thread.hpp" +#include "threaded_list.hpp" + +#include "../std/vector.hpp" +#include "../std/utility.hpp" +#include "../std/bind.hpp" + +namespace threads +{ + namespace + { + typedef function pop_routine_fn; + class PoolRoutine : public IRoutine + { + public: + PoolRoutine(pop_routine_fn popFn, finish_routine_fn finishFn) + : m_popFn(popFn) + , m_finishFn(finishFn) + { + } + + virtual void Do() + { + while (!IsCancelled()) + { + threads::IRoutine * task = m_popFn(); + if (task == NULL) + { + Cancel(); + continue; + } + + if (!task->IsCancelled()) + task->Do(); + m_finishFn(task); + } + } + + private: + pop_routine_fn m_popFn; + finish_routine_fn m_finishFn; + }; + } + + class ThreadPool::Impl + { + public: + Impl(size_t size, finish_routine_fn finishFn) + { + m_threads.reserve(size); + for (size_t i = 0; i < size; ++i) + { + thread_info_t info = make_pair(new threads::Thread(), new PoolRoutine(bind(&ThreadPool::Impl::PopFront, this), + finishFn)); + info.first->Create(info.second); + m_threads[i] = info; + } + } + + ~Impl() + { + Stop(); + } + + void PushBack(threads::IRoutine * routine) + { + m_tasks.PushBack(routine); + } + + threads::IRoutine * PopFront() + { + return m_tasks.Front(true); + } + + void Stop() + { + m_tasks.Cancel(); + m_tasks.Clear(); + + for (size_t i = 0; i < m_threads.size(); ++i) + m_threads[i].second->Cancel(); + + for (size_t i = 0; i < m_threads.size(); ++i) + { + m_threads[i].first->Cancel(); + delete m_threads[i].second; + delete m_threads[i].first; + } + } + + private: + ThreadedList m_tasks; + + typedef pair thread_info_t; + vector m_threads; + }; + + ThreadPool::ThreadPool(size_t size, finish_routine_fn finishFn) + : m_impl(new Impl(size, finishFn)) {} + + void ThreadPool::AddTask(threads::IRoutine * routine) + { + m_impl->PushBack(routine); + } + + void ThreadPool::Stop() + { + m_impl->Stop(); + } +} diff --git a/base/thread_pool.hpp b/base/thread_pool.hpp new file mode 100644 index 0000000000..db83492735 --- /dev/null +++ b/base/thread_pool.hpp @@ -0,0 +1,23 @@ +#pragma once + +#include "../std/function.hpp" + +namespace threads +{ + class IRoutine; + + typedef function finish_routine_fn; + + class ThreadPool + { + public: + ThreadPool(size_t size, finish_routine_fn finishFn); + + void AddTask(threads::IRoutine * routine); + void Stop(); + + private: + class Impl; + Impl * m_impl; + }; +}