From bd3566e0b820d5d9481bf4e343fa0cf5c766b2b6 Mon Sep 17 00:00:00 2001 From: Arsentiy Milchakov Date: Mon, 15 Jun 2020 16:51:27 +0300 Subject: [PATCH] [base] internal container for thread pool delayed tasks is changed. cancelling have logarithmic complexity now. --- base/thread_pool_delayed.cpp | 39 +++++++++++++++++------------------- base/thread_pool_delayed.hpp | 35 ++++++++++++++++++++++++++++---- 2 files changed, 49 insertions(+), 25 deletions(-) diff --git a/base/thread_pool_delayed.cpp b/base/thread_pool_delayed.cpp index 5bfca0fb7e..9af9a89af1 100644 --- a/base/thread_pool_delayed.cpp +++ b/base/thread_pool_delayed.cpp @@ -72,7 +72,7 @@ TaskLoop::TaskId ThreadPool::AddDelayed(Duration const & delay, T && task) auto const when = Now() + delay; return AddTask([&]() { auto const newId = MakeNextId(m_delayedLastId, kDelayedMinId, kDelayedMaxId); - m_delayed.emplace(newId, when, forward(task)); + m_delayed.Add(newId, make_shared(newId, when, forward(task))); m_delayedLastId = newId; return newId; }); @@ -101,16 +101,16 @@ void ThreadPool::ProcessTasks() { unique_lock lk(m_mu); - if (!m_delayed.empty()) + if (!m_delayed.IsEmpty()) { // We need to wait until the moment when the earliest delayed // task may be executed, given that an immediate task or a // delayed task with an earlier execution time may arrive // while we are waiting. - auto const when = m_delayed.cbegin()->m_when; + auto const when = m_delayed.GetFirstValue()->m_when; m_cv.wait_until(lk, when, [this, when]() { - return m_shutdown || !m_immediate.IsEmpty() || m_delayed.empty() || - (!m_delayed.empty() && m_delayed.cbegin()->m_when < when); + return m_shutdown || !m_immediate.IsEmpty() || m_delayed.IsEmpty() || + (!m_delayed.IsEmpty() && m_delayed.GetFirstValue()->m_when < when); }); } else @@ -118,7 +118,7 @@ void ThreadPool::ProcessTasks() // When there are no delayed tasks in the queue, we need to // wait until there is at least one immediate or delayed task. m_cv.wait(lk, - [this]() { return m_shutdown || !m_immediate.IsEmpty() || !m_delayed.empty(); }); + [this]() { return m_shutdown || !m_immediate.IsEmpty() || !m_delayed.IsEmpty(); }); } if (m_shutdown) @@ -129,8 +129,8 @@ void ThreadPool::ProcessTasks() ASSERT(pendingImmediate.IsEmpty(), ()); m_immediate.Swap(pendingImmediate); - ASSERT(pendingDelayed.empty(), ()); - m_delayed.swap(pendingDelayed); + ASSERT(pendingDelayed.IsEmpty(), ()); + m_delayed.Swap(pendingDelayed); break; case Exit::SkipPending: break; } @@ -139,7 +139,8 @@ void ThreadPool::ProcessTasks() } auto const canExecImmediate = !m_immediate.IsEmpty(); - auto const canExecDelayed = !m_delayed.empty() && Now() >= m_delayed.cbegin()->m_when; + auto const canExecDelayed = + !m_delayed.IsEmpty() && Now() >= m_delayed.GetFirstValue()->m_when; if (canExecImmediate) { @@ -149,8 +150,8 @@ void ThreadPool::ProcessTasks() if (canExecDelayed) { - tasks[QUEUE_TYPE_DELAYED] = move(m_delayed.cbegin()->m_task); - m_delayed.erase(m_delayed.cbegin()); + tasks[QUEUE_TYPE_DELAYED] = move(m_delayed.GetFirstValue()->m_task); + m_delayed.RemoveValue(m_delayed.GetFirstValue()); } } @@ -164,9 +165,9 @@ void ThreadPool::ProcessTasks() for (; !pendingImmediate.IsEmpty(); pendingImmediate.PopFront()) pendingImmediate.Front()(); - while (!pendingDelayed.empty()) + while (!pendingDelayed.IsEmpty()) { - auto const & top = *pendingDelayed.cbegin(); + auto const & top = *pendingDelayed.GetFirstValue(); while (true) { auto const now = Now(); @@ -178,7 +179,7 @@ void ThreadPool::ProcessTasks() ASSERT(Now() >= top.m_when, ()); top.m_task(); - pendingDelayed.erase(pendingDelayed.cbegin()); + pendingDelayed.RemoveValue(pendingDelayed.GetFirstValue()); } } @@ -199,14 +200,10 @@ bool ThreadPool::Cancel(TaskId id) } else { - for (auto it = m_delayed.begin(); it != m_delayed.end(); ++it) + if (m_delayed.RemoveKey(id)) { - if (it->m_id == id) - { - m_delayed.erase(it); - m_cv.notify_one(); - return true; - } + m_cv.notify_one(); + return true; } } diff --git a/base/thread_pool_delayed.hpp b/base/thread_pool_delayed.hpp index a040b1bd18..cd9957fe4d 100644 --- a/base/thread_pool_delayed.hpp +++ b/base/thread_pool_delayed.hpp @@ -1,6 +1,7 @@ #pragma once #include "base/assert.hpp" +#include "base/bidirectional_map.hpp" #include "base/linked_map.hpp" #include "base/task_loop.hpp" #include "base/thread.hpp" @@ -9,8 +10,10 @@ #include #include #include +#include +#include #include -#include +#include #include #include @@ -76,7 +79,6 @@ public: // Cancels task if it is in queue and is not running yet. // Returns false when thread is shut down, // task is not found or already running, otherwise true. - // The complexity is O(1) for immediate tasks and O(N) for delayed tasks. bool Cancel(TaskId id); // Sends a signal to the thread to shut down. Returns false when the @@ -107,7 +109,13 @@ private: { } - bool operator<(DelayedTask const & rhs) const { return m_when < rhs.m_when; } + bool operator<(DelayedTask const & rhs) const + { + if (m_when == rhs.m_when) + return m_id < rhs.m_id; + + return m_when < rhs.m_when; + } bool operator>(DelayedTask const & rhs) const { return rhs < *this; } TaskId m_id = kIncorrectId; @@ -115,8 +123,27 @@ private: Task m_task = {}; }; + template + struct DeRef + { + bool operator()(T const & lhs, T const & rhs) const { return *lhs < *rhs; } + }; + using ImmediateQueue = base::LinkedMap; - using DelayedQueue = std::multiset; + + using DelayedValue = std::shared_ptr; + class DelayedQueue : public BidirectionalMap, + std::multimap, DeRef> + { + public: + Value const & GetFirstValue() const + { + auto const & vTok = GetValuesToKeys(); + CHECK(!vTok.empty(), ()); + return vTok.begin()->first; + } + }; template TaskId AddImmediate(T && task);