From ab205eeae163a8b80ed2de3945a33247a7895058 Mon Sep 17 00:00:00 2001 From: Maksim Andrianov Date: Thu, 17 Jan 2019 12:47:17 +0300 Subject: [PATCH] Review fixes --- .../primitive_thread_pool_tests.cpp | 27 +++---- base/primitive_thread_pool.hpp | 81 ++++++++++++------- base/thread_utils.hpp | 26 +++--- 3 files changed, 81 insertions(+), 53 deletions(-) diff --git a/base/base_tests/primitive_thread_pool_tests.cpp b/base/base_tests/primitive_thread_pool_tests.cpp index 6180c82c1c..6ca1e5420f 100644 --- a/base/base_tests/primitive_thread_pool_tests.cpp +++ b/base/base_tests/primitive_thread_pool_tests.cpp @@ -1,13 +1,14 @@ #include "testing/testing.hpp" +#include "base/primitive_thread_pool.hpp" + +#include #include #include #include #include #include -#include "base/primitive_thread_pool.hpp" - namespace { size_t const kTimes = 500; @@ -17,16 +18,14 @@ UNIT_TEST(PrimitiveThreadPool_SomeThreads) { for (size_t t = 0; t < kTimes; ++t) { - size_t threadCount = 4; - size_t counter = 0; + size_t const threadCount = 4; + std::atomic counter{0}; { - std::mutex mutex; threads::PrimitiveThreadPool threadPool(threadCount); for (size_t i = 0; i < threadCount; ++i) { threadPool.Submit([&]() { std::this_thread::sleep_for(std::chrono::milliseconds(1)); - std::lock_guard lock(mutex); ++counter; }); } @@ -40,16 +39,14 @@ UNIT_TEST(PrimitiveThreadPool_OneThread) { for (size_t t = 0; t < kTimes; ++t) { - size_t threadCount = 1; - size_t counter = 0; + size_t const threadCount = 1; + std::atomic counter{0}; { - std::mutex mutex; threads::PrimitiveThreadPool threadPool(threadCount); for (size_t i = 0; i < threadCount; ++i) { threadPool.Submit([&]() { std::this_thread::sleep_for(std::chrono::milliseconds(1)); - std::lock_guard lock(mutex); ++counter; }); } @@ -66,15 +63,13 @@ UNIT_TEST(PrimitiveThreadPool_ManyThread) size_t threadCount = std::thread::hardware_concurrency(); CHECK_NOT_EQUAL(threadCount, 0, ()); threadCount *= 2; - size_t counter = 0; + std::atomic counter{0}; { - std::mutex mutex; threads::PrimitiveThreadPool threadPool(threadCount); for (size_t i = 0; i < threadCount; ++i) { threadPool.Submit([&]() { std::this_thread::sleep_for(std::chrono::milliseconds(1)); - std::lock_guard lock(mutex); ++counter; }); } @@ -88,7 +83,7 @@ UNIT_TEST(PrimitiveThreadPool_ReturnValue) { for (size_t t = 0; t < kTimes; ++t) { - size_t threadCount = 4; + size_t const threadCount = 4; threads::PrimitiveThreadPool threadPool(threadCount); std::vector> futures; for (size_t i = 0; i < threadCount; ++i) @@ -111,15 +106,13 @@ UNIT_TEST(PrimitiveThreadPool_ManyTasks) for (size_t t = 0; t < kTimes; ++t) { size_t taskCount = 11; - size_t counter = 0; + std::atomic counter{0}; { - std::mutex mutex; threads::PrimitiveThreadPool threadPool(4); for (size_t i = 0; i < taskCount; ++i) { threadPool.Submit([&]() { std::this_thread::sleep_for(std::chrono::milliseconds(1)); - std::lock_guard lock(mutex); ++counter; }); } diff --git a/base/primitive_thread_pool.hpp b/base/primitive_thread_pool.hpp index e86d88e5ab..33dd9696a9 100644 --- a/base/primitive_thread_pool.hpp +++ b/base/primitive_thread_pool.hpp @@ -16,38 +16,36 @@ namespace threads // PrimitiveThreadPool is needed for easy parallelization of tasks. // PrimitiveThreadPool can accept tasks that return result as std::future. // When the destructor is called, all threads will join. -// -// Usage example: -// size_t threadCount = 4; -// size_t counter = 0; -// { -// std::mutex mutex; -// threads::PrimitiveThreadPool threadPool(threadCount); -// for (size_t i = 0; i < threadCount; ++i) -// { -// threadPool.Submit([&]() { -// std::this_thread::sleep_for(std::chrono::milliseconds(1)); -// std::lock_guard lock(mutex); -// ++counter; -// }); -// } -// } -// TEST_EQUAL(threadCount, counter, ()); -// +// Warning: ThreadPool works with std::thread instead of SimpleThread and therefore +// should not be used when the JVM is needed. class PrimitiveThreadPool { public: - using FuntionType = FunctionWrapper; + using FunctionType = FunctionWrapper; using Threads = std::vector; + // Constructs a ThreadPool. + // threadCount - number of threads used by the thread pool. + // Warning: The constructor may throw exceptions. PrimitiveThreadPool(size_t threadCount) : m_done(false), m_joiner(m_threads) { CHECK_GREATER(threadCount, 0, ()); - for (size_t i = 0; i < threadCount; i++) - m_threads.push_back(std::thread(&PrimitiveThreadPool::Worker, this)); + m_threads.reserve(threadCount); + try + { + for (size_t i = 0; i < threadCount; i++) + m_threads.emplace_back(&PrimitiveThreadPool::Worker, this); + } + catch (...) // std::system_error etc. + { + Stop(); + throw; + } } + // Destroys the ThreadPool. + // This function will block until all runnables have been completed. ~PrimitiveThreadPool() { { @@ -57,27 +55,51 @@ public: m_condition.notify_all(); } - template - auto Submit(F && func, Args &&... args) ->std::future + // Submit task for execution. + // func - task to be performed. + // args - arguments for func. + // The function will return the object future. + // Warning: If the thread pool is stopped then the call will be ignored. + template + auto Submit(F && func, Args &&... args) -> std::future { + { + std::unique_lock lock(m_mutex); + if (m_done) + return {}; + } using ResultType = decltype(func(args...)); std::packaged_task task(std::bind(std::forward(func), std::forward(args)...)); std::future result(task.get_future()); { std::unique_lock lock(m_mutex); - m_queue.push(std::move(task)); + m_queue.emplace(std::move(task)); } m_condition.notify_one(); return result; } + // Stop a ThreadPool. + // Removes the tasks that are not yet started from the queue. + // This function will not block until all runnables have been completed. + void Stop() + { + { + std::unique_lock lock(m_mutex); + auto empty = std::queue(); + m_queue.swap(empty); + m_done = true; + } + m_condition.notify_all(); + } + private: void Worker() { while (true) { - FuntionType task; + FunctionType task; { std::unique_lock lock(m_mutex); m_condition.wait(lock, [&] { @@ -87,6 +109,9 @@ private: if (m_done && m_queue.empty()) return; + // It may seem that at this point the queue may be empty, provided that m_done == false and + // m_queue.empty() == true. But it is not possible that the queue is not empty guarantees + // check in m_condition.wait. task = std::move(m_queue.front()); m_queue.pop(); } @@ -98,8 +123,10 @@ private: bool m_done; std::mutex m_mutex; std::condition_variable m_condition; - std::queue m_queue; + std::queue m_queue; Threads m_threads; - StandartThreadsJoiner m_joiner; + ThreadsJoiner<> m_joiner; }; + + } // namespace threads diff --git a/base/thread_utils.hpp b/base/thread_utils.hpp index 553103677d..8253b44e29 100644 --- a/base/thread_utils.hpp +++ b/base/thread_utils.hpp @@ -3,15 +3,16 @@ #include #include -#include +#include "base/macros.hpp" namespace threads { -template> +template > class ThreadsJoiner { public: - explicit ThreadsJoiner(ThreadColl & threads) : m_threads(threads) {} + explicit ThreadsJoiner(ThreadContainer & threads) : m_threads(threads) {} + ~ThreadsJoiner() { for (auto & thread : m_threads) @@ -22,19 +23,22 @@ public: } private: - ThreadColl & m_threads; + ThreadContainer & m_threads; }; -using StandartThreadsJoiner = ThreadsJoiner<>; - -class FunctionWrapper : boost::noncopyable +// This class is needed in ThreadPool to store std::packaged_task<> objects. +// std::packaged_task<> isn’t copyable so we have to use std::move(). +// This idea is borrowed from the book C++ Concurrency in action by Anthony Williams (Chapter 9). +class FunctionWrapper { public: - template + template FunctionWrapper(F && func) : m_impl(new ImplType(std::move(func))) {} + FunctionWrapper() = default; FunctionWrapper(FunctionWrapper && other) : m_impl(std::move(other.m_impl)) {} + FunctionWrapper & operator=(FunctionWrapper && other) { m_impl = std::move(other.m_impl); @@ -47,18 +51,22 @@ private: struct ImplBase { virtual ~ImplBase() = default; + virtual void Call() = 0; }; - template + template struct ImplType : ImplBase { ImplType(F && func) : m_func(std::move(func)) {} + void Call() override { m_func(); } F m_func; }; std::unique_ptr m_impl; + + DISALLOW_COPY(FunctionWrapper); }; } // namespace threads