Review fixes.

This commit is contained in:
Yuri Gorshenin 2017-06-26 18:27:47 +03:00 committed by Arsentiy Milchakov
parent 8187ce775d
commit ca7a32dcac
3 changed files with 36 additions and 32 deletions

View file

@ -18,12 +18,12 @@ UNIT_TEST(WorkerThread_Smoke)
{
WorkerThread thread;
thread.Shutdown(WorkerThread::Exit::SkipPending);
TEST(thread.Shutdown(WorkerThread::Exit::SkipPending), ());
}
{
WorkerThread thread;
thread.Shutdown(WorkerThread::Exit::ExecPending);
TEST(thread.Shutdown(WorkerThread::Exit::ExecPending), ());
}
}
@ -36,14 +36,14 @@ UNIT_TEST(WorkerThread_SimpleSync)
bool done = false;
WorkerThread thread;
thread.Push([&value]() { ++value; });
thread.Push([&value]() { value *= 2; });
thread.Push([&value]() { value = value * value * value; });
thread.Push([&]() {
TEST(thread.Push([&value]() { ++value; }), ());
TEST(thread.Push([&value]() { value *= 2; }), ());
TEST(thread.Push([&value]() { value = value * value * value; }), ());
TEST(thread.Push([&]() {
lock_guard<mutex> lk(mu);
done = true;
cv.notify_one();
});
}), ());
{
unique_lock<mutex> lk(mu);
@ -58,12 +58,12 @@ UNIT_TEST(WorkerThread_SimpleFlush)
int value = 0;
{
WorkerThread thread;
thread.Push([&value]() { ++value; });
thread.Push([&value]() {
TEST(thread.Push([&value]() { ++value; }), ());
TEST(thread.Push([&value]() {
for (int i = 0; i < 10; ++i)
value *= 2;
});
thread.Shutdown(WorkerThread::Exit::ExecPending);
}), ());
TEST(thread.Shutdown(WorkerThread::Exit::ExecPending), ());
}
TEST_EQUAL(value, 1024, ());
}

View file

@ -18,6 +18,8 @@ WorkerThread::~WorkerThread()
void WorkerThread::ProcessTasks()
{
queue<Task> pending;
unique_lock<mutex> lk(m_mu, defer_lock);
while (true)
@ -33,17 +35,11 @@ void WorkerThread::ProcessTasks()
switch (m_exit)
{
case Exit::ExecPending:
{
while (!m_queue.empty())
{
m_queue.front()();
m_queue.pop();
}
CHECK(pending.empty(), ());
m_queue.swap(pending);
break;
}
case Exit::SkipPending: break;
}
break;
}
@ -55,19 +51,22 @@ void WorkerThread::ProcessTasks()
task();
}
while (!pending.empty())
{
pending.front()();
pending.pop();
}
}
void WorkerThread::Shutdown(Exit e)
bool WorkerThread::Shutdown(Exit e)
{
ASSERT(m_checker.CalledOnOriginalThread(), ());
if (m_shutdown)
return;
CHECK(!m_shutdown, ());
lock_guard<mutex> lk(m_mu);
if (m_shutdown)
return false;
m_shutdown = true;
m_exit = e;
m_cv.notify_one();
return true;
}
} // namespace base

View file

@ -13,7 +13,8 @@ namespace base
{
// This class represents a simple worker thread with a queue of tasks.
//
// *NOTE* This class is not thread-safe.
// *NOTE* This class IS thread-safe, but it must be destroyed on the
// same thread it was created.
class WorkerThread
{
public:
@ -28,18 +29,22 @@ public:
WorkerThread();
~WorkerThread();
// Pushes task to the end of the thread's queue. Returns false when
// the thread is shut down.
template <typename T>
void Push(T && t)
bool Push(T && t)
{
ASSERT(m_checker.CalledOnOriginalThread(), ());
CHECK(!m_shutdown, ());
std::lock_guard<std::mutex> lk(m_mu);
if (m_shutdown)
return false;
m_queue.emplace(std::forward<T>(t));
m_cv.notify_one();
return true;
}
void Shutdown(Exit e);
// Sends a signal to the thread to shut down. Returns false when the
// thread was shut down previously.
bool Shutdown(Exit e);
private:
void ProcessTasks();