Review fixes

This commit is contained in:
Maksim Andrianov 2019-01-17 12:47:17 +03:00 committed by mpimenov
parent 573844522a
commit ab205eeae1
3 changed files with 81 additions and 53 deletions

View file

@ -1,13 +1,14 @@
#include "testing/testing.hpp"
#include "base/primitive_thread_pool.hpp"
#include <atomic>
#include <chrono>
#include <future>
#include <mutex>
#include <thread>
#include <vector>
#include "base/primitive_thread_pool.hpp"
namespace
{
size_t const kTimes = 500;
@ -17,16 +18,14 @@ UNIT_TEST(PrimitiveThreadPool_SomeThreads)
{
for (size_t t = 0; t < kTimes; ++t)
{
size_t threadCount = 4;
size_t counter = 0;
size_t const threadCount = 4;
std::atomic<size_t> counter{0};
{
std::mutex mutex;
threads::PrimitiveThreadPool threadPool(threadCount);
for (size_t i = 0; i < threadCount; ++i)
{
threadPool.Submit([&]() {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
std::lock_guard<std::mutex> lock(mutex);
++counter;
});
}
@ -40,16 +39,14 @@ UNIT_TEST(PrimitiveThreadPool_OneThread)
{
for (size_t t = 0; t < kTimes; ++t)
{
size_t threadCount = 1;
size_t counter = 0;
size_t const threadCount = 1;
std::atomic<size_t> counter{0};
{
std::mutex mutex;
threads::PrimitiveThreadPool threadPool(threadCount);
for (size_t i = 0; i < threadCount; ++i)
{
threadPool.Submit([&]() {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
std::lock_guard<std::mutex> lock(mutex);
++counter;
});
}
@ -66,15 +63,13 @@ UNIT_TEST(PrimitiveThreadPool_ManyThread)
size_t threadCount = std::thread::hardware_concurrency();
CHECK_NOT_EQUAL(threadCount, 0, ());
threadCount *= 2;
size_t counter = 0;
std::atomic<size_t> counter{0};
{
std::mutex mutex;
threads::PrimitiveThreadPool threadPool(threadCount);
for (size_t i = 0; i < threadCount; ++i)
{
threadPool.Submit([&]() {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
std::lock_guard<std::mutex> lock(mutex);
++counter;
});
}
@ -88,7 +83,7 @@ UNIT_TEST(PrimitiveThreadPool_ReturnValue)
{
for (size_t t = 0; t < kTimes; ++t)
{
size_t threadCount = 4;
size_t const threadCount = 4;
threads::PrimitiveThreadPool threadPool(threadCount);
std::vector<std::future<size_t>> futures;
for (size_t i = 0; i < threadCount; ++i)
@ -111,15 +106,13 @@ UNIT_TEST(PrimitiveThreadPool_ManyTasks)
for (size_t t = 0; t < kTimes; ++t)
{
size_t taskCount = 11;
size_t counter = 0;
std::atomic<size_t> counter{0};
{
std::mutex mutex;
threads::PrimitiveThreadPool threadPool(4);
for (size_t i = 0; i < taskCount; ++i)
{
threadPool.Submit([&]() {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
std::lock_guard<std::mutex> lock(mutex);
++counter;
});
}

View file

@ -16,38 +16,36 @@ namespace threads
// PrimitiveThreadPool is needed for easy parallelization of tasks.
// PrimitiveThreadPool can accept tasks that return result as std::future.
// When the destructor is called, all threads will join.
//
// Usage example:
// size_t threadCount = 4;
// size_t counter = 0;
// {
// std::mutex mutex;
// threads::PrimitiveThreadPool threadPool(threadCount);
// for (size_t i = 0; i < threadCount; ++i)
// {
// threadPool.Submit([&]() {
// std::this_thread::sleep_for(std::chrono::milliseconds(1));
// std::lock_guard<std::mutex> lock(mutex);
// ++counter;
// });
// }
// }
// TEST_EQUAL(threadCount, counter, ());
//
// Warning: ThreadPool works with std::thread instead of SimpleThread and therefore
// should not be used when the JVM is needed.
class PrimitiveThreadPool
{
public:
using FuntionType = FunctionWrapper;
using FunctionType = FunctionWrapper;
using Threads = std::vector<std::thread>;
// Constructs a ThreadPool.
// threadCount - number of threads used by the thread pool.
// Warning: The constructor may throw exceptions.
PrimitiveThreadPool(size_t threadCount) : m_done(false), m_joiner(m_threads)
{
CHECK_GREATER(threadCount, 0, ());
for (size_t i = 0; i < threadCount; i++)
m_threads.push_back(std::thread(&PrimitiveThreadPool::Worker, this));
m_threads.reserve(threadCount);
try
{
for (size_t i = 0; i < threadCount; i++)
m_threads.emplace_back(&PrimitiveThreadPool::Worker, this);
}
catch (...) // std::system_error etc.
{
Stop();
throw;
}
}
// Destroys the ThreadPool.
// This function will block until all runnables have been completed.
~PrimitiveThreadPool()
{
{
@ -57,27 +55,51 @@ public:
m_condition.notify_all();
}
template<typename F, typename... Args>
auto Submit(F && func, Args &&... args) ->std::future<decltype(func(args...))>
// Submit task for execution.
// func - task to be performed.
// args - arguments for func.
// The function will return the object future.
// Warning: If the thread pool is stopped then the call will be ignored.
template <typename F, typename... Args>
auto Submit(F && func, Args &&... args) -> std::future<decltype(func(args...))>
{
{
std::unique_lock<std::mutex> lock(m_mutex);
if (m_done)
return {};
}
using ResultType = decltype(func(args...));
std::packaged_task<ResultType()> task(std::bind(std::forward<F>(func),
std::forward<Args>(args)...));
std::future<ResultType> result(task.get_future());
{
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.push(std::move(task));
m_queue.emplace(std::move(task));
}
m_condition.notify_one();
return result;
}
// Stop a ThreadPool.
// Removes the tasks that are not yet started from the queue.
// This function will not block until all runnables have been completed.
void Stop()
{
{
std::unique_lock<std::mutex> lock(m_mutex);
auto empty = std::queue<FunctionType>();
m_queue.swap(empty);
m_done = true;
}
m_condition.notify_all();
}
private:
void Worker()
{
while (true)
{
FuntionType task;
FunctionType task;
{
std::unique_lock<std::mutex> lock(m_mutex);
m_condition.wait(lock, [&] {
@ -87,6 +109,9 @@ private:
if (m_done && m_queue.empty())
return;
// It may seem that at this point the queue may be empty, provided that m_done == false and
// m_queue.empty() == true. But it is not possible that the queue is not empty guarantees
// check in m_condition.wait.
task = std::move(m_queue.front());
m_queue.pop();
}
@ -98,8 +123,10 @@ private:
bool m_done;
std::mutex m_mutex;
std::condition_variable m_condition;
std::queue<FuntionType> m_queue;
std::queue<FunctionType> m_queue;
Threads m_threads;
StandartThreadsJoiner m_joiner;
ThreadsJoiner<> m_joiner;
};
} // namespace threads

View file

@ -3,15 +3,16 @@
#include <thread>
#include <vector>
#include <boost/noncopyable.hpp>
#include "base/macros.hpp"
namespace threads
{
template<typename Thread = std::thread, typename ThreadColl= std::vector<Thread>>
template <typename Thread = std::thread, typename ThreadContainer = std::vector<Thread>>
class ThreadsJoiner
{
public:
explicit ThreadsJoiner(ThreadColl & threads) : m_threads(threads) {}
explicit ThreadsJoiner(ThreadContainer & threads) : m_threads(threads) {}
~ThreadsJoiner()
{
for (auto & thread : m_threads)
@ -22,19 +23,22 @@ public:
}
private:
ThreadColl & m_threads;
ThreadContainer & m_threads;
};
using StandartThreadsJoiner = ThreadsJoiner<>;
class FunctionWrapper : boost::noncopyable
// This class is needed in ThreadPool to store std::packaged_task<> objects.
// std::packaged_task<> isnt copyable so we have to use std::move().
// This idea is borrowed from the book C++ Concurrency in action by Anthony Williams (Chapter 9).
class FunctionWrapper
{
public:
template<typename F>
template <typename F>
FunctionWrapper(F && func) : m_impl(new ImplType<F>(std::move(func))) {}
FunctionWrapper() = default;
FunctionWrapper(FunctionWrapper && other) : m_impl(std::move(other.m_impl)) {}
FunctionWrapper & operator=(FunctionWrapper && other)
{
m_impl = std::move(other.m_impl);
@ -47,18 +51,22 @@ private:
struct ImplBase
{
virtual ~ImplBase() = default;
virtual void Call() = 0;
};
template<typename F>
template <typename F>
struct ImplType : ImplBase
{
ImplType(F && func) : m_func(std::move(func)) {}
void Call() override { m_func(); }
F m_func;
};
std::unique_ptr<ImplBase> m_impl;
DISALLOW_COPY(FunctionWrapper);
};
} // namespace threads