Review fixes.

This commit is contained in:
Yuri Gorshenin 2017-09-29 14:12:12 +03:00 committed by Arsentiy Milchakov
parent b8cdc1b24e
commit 851b67edbe
3 changed files with 47 additions and 45 deletions

View file

@ -89,7 +89,7 @@ UNIT_TEST(WorkerThread_PushFromPendingTask)
p.set_value();
}
UNIT_TEST(WorkerThread_PushDelayedTask)
UNIT_TEST(WorkerThread_DelayedAndImmediateTasks)
{
int const kNumTasks = 100;

View file

@ -1,5 +1,7 @@
#include "base/worker_thread.hpp"
#include <array>
using namespace std;
namespace base
@ -28,8 +30,6 @@ bool WorkerThread::Push(Task const & t)
bool WorkerThread::PushDelayed(Duration const & delay, Task && t)
{
// NOTE: this code depends on the fact that steady_clock is the same
// for different threads.
auto const when = Now() + delay;
return TouchQueues([&]() { m_delayed.emplace(when, move(t)); });
}
@ -47,16 +47,16 @@ void WorkerThread::ProcessTasks()
while (true)
{
Task task;
array<Task, QUEUE_TYPE_COUNT> tasks;
{
unique_lock<mutex> lk(m_mu);
if (!m_delayed.empty())
{
// We need to wait for the moment when the delayed task must
// be executed, but may be interrupted earlier, in case of
// immediate task or another delayed task that must be
// executed earlier.
// We need to wait until the moment when the earliest delayed
// task may be executed, given that an immediate task or a
// delayed task with an earlier execution time may arrive
// while we are waiting.
auto const when = m_delayed.top().m_when;
m_cv.wait_until(lk, when, [this, when]() {
return m_shutdown || !m_immediate.empty() || m_delayed.top().m_when < when;
@ -64,9 +64,8 @@ void WorkerThread::ProcessTasks()
}
else
{
// When there is no delayed tasks in the queue, we need to
// wait until there is at least one immediate task or delayed
// task.
// When there are no delayed tasks in the queue, we need to
// wait until there is at least one immediate or delayed task.
m_cv.wait(lk,
[this]() { return m_shutdown || !m_immediate.empty() || !m_delayed.empty(); });
}
@ -91,38 +90,24 @@ void WorkerThread::ProcessTasks()
auto const canExecImmediate = !m_immediate.empty();
auto const canExecDelayed = !m_delayed.empty() && Now() >= m_delayed.top().m_when;
if (!canExecImmediate && !canExecDelayed)
continue;
ASSERT(canExecImmediate || canExecDelayed, ());
bool execImmediate = canExecImmediate;
bool execDelayed = canExecDelayed;
if (canExecImmediate && canExecDelayed)
if (canExecImmediate)
{
// Tasks are executed in the Round-Robin order to prevent
// bias.
execImmediate = m_lastQueue == QueueType::Delayed;
execDelayed = m_lastQueue == QueueType::Immediate;
}
if (execImmediate)
{
task = move(m_immediate.front());
tasks[QUEUE_TYPE_IMMEDIATE] = move(m_immediate.front());
m_immediate.pop();
m_lastQueue = QueueType::Immediate;
}
else
if (canExecDelayed)
{
ASSERT(execDelayed, ());
task = move(m_delayed.top().m_task);
tasks[QUEUE_TYPE_DELAYED] = move(m_delayed.top().m_task);
m_delayed.pop();
m_lastQueue = QueueType::Delayed;
}
}
if (task)
task();
for (auto const & task : tasks)
{
if (task)
task();
}
}
for (; !pendingImmediate.empty(); pendingImmediate.pop())
@ -131,7 +116,7 @@ void WorkerThread::ProcessTasks()
for (; !pendingDelayed.empty(); pendingDelayed.pop())
{
auto const & top = pendingDelayed.top();
while(true)
while (true)
{
auto const now = Now();
if (now >= top.m_when)

View file

@ -33,11 +33,28 @@ public:
WorkerThread();
~WorkerThread() override;
// Pushes task to the end of the thread's queue. Returns false when
// the thread is shut down.
// Pushes task to the end of the thread's queue of immediate tasks.
// Returns false when the thread is shut down.
//
// The task |t| is going to be executed after all immediate tasks
// that were pushed pushed before it.
bool Push(Task && t) override;
bool Push(Task const & t) override;
// Pushes task to the thread's queue of delayed tasks. Returns false
// when the thread is shut down.
//
// The task |t| is going to be executed not earlier than after
// |delay|. No other guarantees about execution order are made. In
// particular, when executing:
//
// PushDelayed(3ms, task1);
// PushDelayed(1ms, task2);
//
// there is no guarantee that |task2| will be executed before |task1|.
//
// NOTE: current implementation depends on the fact that
// steady_clock is the same for different threads.
bool PushDelayed(Duration const & delay, Task && t);
bool PushDelayed(Duration const & delay, Task const & t);
@ -48,6 +65,13 @@ public:
TimePoint Now() const { return Clock::now(); }
private:
enum QueueType
{
QUEUE_TYPE_IMMEDIATE,
QUEUE_TYPE_DELAYED,
QUEUE_TYPE_COUNT
};
struct DelayedTask
{
template <typename T>
@ -66,12 +90,6 @@ private:
using DelayedQueue =
std::priority_queue<DelayedTask, std::vector<DelayedTask>, std::greater<DelayedTask>>;
enum class QueueType
{
Immediate,
Delayed
};
template <typename Fn>
bool TouchQueues(Fn && fn)
{
@ -94,7 +112,6 @@ private:
ImmediateQueue m_immediate;
DelayedQueue m_delayed;
QueueType m_lastQueue = QueueType::Immediate;
ThreadChecker m_checker;
};