From 851b67edbe8fbf3fd86cb9ac110ad3d3882e959c Mon Sep 17 00:00:00 2001 From: Yuri Gorshenin Date: Fri, 29 Sep 2017 14:12:12 +0300 Subject: [PATCH] Review fixes. --- base/base_tests/worker_thread_tests.cpp | 2 +- base/worker_thread.cpp | 55 +++++++++---------------- base/worker_thread.hpp | 35 ++++++++++++---- 3 files changed, 47 insertions(+), 45 deletions(-) diff --git a/base/base_tests/worker_thread_tests.cpp b/base/base_tests/worker_thread_tests.cpp index e478236cbf..6b07cb6b88 100644 --- a/base/base_tests/worker_thread_tests.cpp +++ b/base/base_tests/worker_thread_tests.cpp @@ -89,7 +89,7 @@ UNIT_TEST(WorkerThread_PushFromPendingTask) p.set_value(); } -UNIT_TEST(WorkerThread_PushDelayedTask) +UNIT_TEST(WorkerThread_DelayedAndImmediateTasks) { int const kNumTasks = 100; diff --git a/base/worker_thread.cpp b/base/worker_thread.cpp index 8c15175127..c42dff9364 100644 --- a/base/worker_thread.cpp +++ b/base/worker_thread.cpp @@ -1,5 +1,7 @@ #include "base/worker_thread.hpp" +#include + using namespace std; namespace base @@ -28,8 +30,6 @@ bool WorkerThread::Push(Task const & t) bool WorkerThread::PushDelayed(Duration const & delay, Task && t) { - // NOTE: this code depends on the fact that steady_clock is the same - // for different threads. auto const when = Now() + delay; return TouchQueues([&]() { m_delayed.emplace(when, move(t)); }); } @@ -47,16 +47,16 @@ void WorkerThread::ProcessTasks() while (true) { - Task task; + array tasks; { unique_lock lk(m_mu); if (!m_delayed.empty()) { - // We need to wait for the moment when the delayed task must - // be executed, but may be interrupted earlier, in case of - // immediate task or another delayed task that must be - // executed earlier. + // We need to wait until the moment when the earliest delayed + // task may be executed, given that an immediate task or a + // delayed task with an earlier execution time may arrive + // while we are waiting. auto const when = m_delayed.top().m_when; m_cv.wait_until(lk, when, [this, when]() { return m_shutdown || !m_immediate.empty() || m_delayed.top().m_when < when; @@ -64,9 +64,8 @@ void WorkerThread::ProcessTasks() } else { - // When there is no delayed tasks in the queue, we need to - // wait until there is at least one immediate task or delayed - // task. + // When there are no delayed tasks in the queue, we need to + // wait until there is at least one immediate or delayed task. m_cv.wait(lk, [this]() { return m_shutdown || !m_immediate.empty() || !m_delayed.empty(); }); } @@ -91,38 +90,24 @@ void WorkerThread::ProcessTasks() auto const canExecImmediate = !m_immediate.empty(); auto const canExecDelayed = !m_delayed.empty() && Now() >= m_delayed.top().m_when; - if (!canExecImmediate && !canExecDelayed) - continue; - - ASSERT(canExecImmediate || canExecDelayed, ()); - bool execImmediate = canExecImmediate; - bool execDelayed = canExecDelayed; - - if (canExecImmediate && canExecDelayed) + if (canExecImmediate) { - // Tasks are executed in the Round-Robin order to prevent - // bias. - execImmediate = m_lastQueue == QueueType::Delayed; - execDelayed = m_lastQueue == QueueType::Immediate; - } - - if (execImmediate) - { - task = move(m_immediate.front()); + tasks[QUEUE_TYPE_IMMEDIATE] = move(m_immediate.front()); m_immediate.pop(); - m_lastQueue = QueueType::Immediate; } - else + + if (canExecDelayed) { - ASSERT(execDelayed, ()); - task = move(m_delayed.top().m_task); + tasks[QUEUE_TYPE_DELAYED] = move(m_delayed.top().m_task); m_delayed.pop(); - m_lastQueue = QueueType::Delayed; } } - if (task) - task(); + for (auto const & task : tasks) + { + if (task) + task(); + } } for (; !pendingImmediate.empty(); pendingImmediate.pop()) @@ -131,7 +116,7 @@ void WorkerThread::ProcessTasks() for (; !pendingDelayed.empty(); pendingDelayed.pop()) { auto const & top = pendingDelayed.top(); - while(true) + while (true) { auto const now = Now(); if (now >= top.m_when) diff --git a/base/worker_thread.hpp b/base/worker_thread.hpp index 6a4a4af554..1408e89c0c 100644 --- a/base/worker_thread.hpp +++ b/base/worker_thread.hpp @@ -33,11 +33,28 @@ public: WorkerThread(); ~WorkerThread() override; - // Pushes task to the end of the thread's queue. Returns false when - // the thread is shut down. + // Pushes task to the end of the thread's queue of immediate tasks. + // Returns false when the thread is shut down. + // + // The task |t| is going to be executed after all immediate tasks + // that were pushed pushed before it. bool Push(Task && t) override; bool Push(Task const & t) override; + // Pushes task to the thread's queue of delayed tasks. Returns false + // when the thread is shut down. + // + // The task |t| is going to be executed not earlier than after + // |delay|. No other guarantees about execution order are made. In + // particular, when executing: + // + // PushDelayed(3ms, task1); + // PushDelayed(1ms, task2); + // + // there is no guarantee that |task2| will be executed before |task1|. + // + // NOTE: current implementation depends on the fact that + // steady_clock is the same for different threads. bool PushDelayed(Duration const & delay, Task && t); bool PushDelayed(Duration const & delay, Task const & t); @@ -48,6 +65,13 @@ public: TimePoint Now() const { return Clock::now(); } private: + enum QueueType + { + QUEUE_TYPE_IMMEDIATE, + QUEUE_TYPE_DELAYED, + QUEUE_TYPE_COUNT + }; + struct DelayedTask { template @@ -66,12 +90,6 @@ private: using DelayedQueue = std::priority_queue, std::greater>; - enum class QueueType - { - Immediate, - Delayed - }; - template bool TouchQueues(Fn && fn) { @@ -94,7 +112,6 @@ private: ImmediateQueue m_immediate; DelayedQueue m_delayed; - QueueType m_lastQueue = QueueType::Immediate; ThreadChecker m_checker; };