[base] thread pool delayed review fixes

This commit is contained in:
Arsentiy Milchakov 2020-05-29 18:53:33 +03:00 committed by mpimenov
parent cb9cb2bdc1
commit 5104885bbc
5 changed files with 16 additions and 16 deletions

View file

@ -27,7 +27,7 @@ UNIT_TEST(LinkedMap_Smoke)
TEST_EQUAL(getResult, "world", ()); TEST_EQUAL(getResult, "world", ());
TEST_EQUAL(container.Front(), "hello", ()); TEST_EQUAL(container.Front(), "hello", ());
container.Pop(); container.PopFront();
TEST_EQUAL(container.Front(), "world", ()); TEST_EQUAL(container.Front(), "world", ());
TEST_EQUAL(container.Size(), 2, ()); TEST_EQUAL(container.Size(), 2, ());

View file

@ -151,19 +151,19 @@ UNIT_TEST(ThreadPoolDelayed_CancelImmediate)
++value; ++value;
testing::Wait(); testing::Wait();
}); });
TEST_EQUAL(id, ThreadPool::kImmediateMinId + 1, ()); TEST_EQUAL(id, ThreadPool::kImmediateMinId, ());
} }
{ {
cancelTaskId = thread.Push([&]() { value += 1023; }); cancelTaskId = thread.Push([&]() { value += 1023; });
TEST_EQUAL(cancelTaskId, ThreadPool::kImmediateMinId + 2, ()); TEST_EQUAL(cancelTaskId, ThreadPool::kImmediateMinId + 1, ());
} }
{ {
auto const id = thread.Push([&]() { ++value; }); auto const id = thread.Push([&]() { ++value; });
TEST_EQUAL(id, ThreadPool::kImmediateMinId + 3, ()); TEST_EQUAL(id, ThreadPool::kImmediateMinId + 2, ());
} }
TEST(thread.Cancel(cancelTaskId), ()); TEST(thread.Cancel(cancelTaskId), ());
@ -185,22 +185,22 @@ UNIT_TEST(ThreadPoolDelayed_CancelDelayed)
ThreadPool thread; ThreadPool thread;
{ {
auto const id = thread.Push([]() { testing::Wait(); }); auto const id = thread.Push([]() { testing::Wait(); });
TEST_EQUAL(id, ThreadPool::kImmediateMinId + 1, ()); TEST_EQUAL(id, ThreadPool::kImmediateMinId, ());
} }
{ {
auto const delayedId = thread.PushDelayed(milliseconds(1), [&value]() { ++value; }); auto const delayedId = thread.PushDelayed(milliseconds(1), [&value]() { ++value; });
TEST_EQUAL(delayedId, ThreadPool::kDelayedMinId + 1, ()); TEST_EQUAL(delayedId, ThreadPool::kDelayedMinId, ());
} }
{ {
cancelTaskId = thread.PushDelayed(milliseconds(2), [&]() { value += 1023; }); cancelTaskId = thread.PushDelayed(milliseconds(2), [&]() { value += 1023; });
TEST_EQUAL(cancelTaskId, ThreadPool::kDelayedMinId + 2, ()); TEST_EQUAL(cancelTaskId, ThreadPool::kDelayedMinId + 1, ());
} }
{ {
auto const delayedId = thread.PushDelayed(milliseconds(3), [&value]() { ++value; }); auto const delayedId = thread.PushDelayed(milliseconds(3), [&value]() { ++value; });
TEST_EQUAL(delayedId, ThreadPool::kDelayedMinId + 3, ()); TEST_EQUAL(delayedId, ThreadPool::kDelayedMinId + 2, ());
} }
TEST(thread.Cancel(cancelTaskId), ()); TEST(thread.Cancel(cancelTaskId), ());

View file

@ -27,7 +27,7 @@ public:
return true; return true;
} }
void Pop() void PopFront()
{ {
CHECK(!m_map.empty(), ()); CHECK(!m_map.empty(), ());
m_map.erase(m_list.front().first); m_map.erase(m_list.front().first);

View file

@ -23,8 +23,8 @@ TaskLoop::TaskId MakeNextId(TaskLoop::TaskId id, TaskLoop::TaskId minId, TaskLoo
ThreadPool::ThreadPool(size_t threadsCount /* = 1 */, Exit e /* = Exit::SkipPending */) ThreadPool::ThreadPool(size_t threadsCount /* = 1 */, Exit e /* = Exit::SkipPending */)
: m_exit(e) : m_exit(e)
, m_immediateLastId(kImmediateMinId) , m_immediateLastId(kImmediateMaxId)
, m_delayedLastId(kDelayedMinId) , m_delayedLastId(kDelayedMaxId)
{ {
for (size_t i = 0; i < threadsCount; ++i) for (size_t i = 0; i < threadsCount; ++i)
m_threads.emplace_back(threads::SimpleThread(&ThreadPool::ProcessTasks, this)); m_threads.emplace_back(threads::SimpleThread(&ThreadPool::ProcessTasks, this));
@ -144,7 +144,7 @@ void ThreadPool::ProcessTasks()
if (canExecImmediate) if (canExecImmediate)
{ {
tasks[QUEUE_TYPE_IMMEDIATE] = move(m_immediate.Front()); tasks[QUEUE_TYPE_IMMEDIATE] = move(m_immediate.Front());
m_immediate.Pop(); m_immediate.PopFront();
} }
if (canExecDelayed) if (canExecDelayed)
@ -161,7 +161,7 @@ void ThreadPool::ProcessTasks()
} }
} }
for (; !pendingImmediate.IsEmpty(); pendingImmediate.Pop()) for (; !pendingImmediate.IsEmpty(); pendingImmediate.PopFront())
pendingImmediate.Front()(); pendingImmediate.Front()();
while (!pendingDelayed.empty()) while (!pendingDelayed.empty())

View file

@ -47,7 +47,7 @@ public:
~ThreadPool() override; ~ThreadPool() override;
// Pushes task to the end of the thread's queue of immediate tasks. // Pushes task to the end of the thread's queue of immediate tasks.
// Returns task id or empty string when any error occurs // Returns task id or |TaskLoop::kIncorrectId| when any error occurs
// or the thread is shut down. // or the thread is shut down.
// //
// The task |t| is going to be executed after all immediate tasks // The task |t| is going to be executed after all immediate tasks
@ -56,7 +56,7 @@ public:
TaskId Push(Task const & t) override; TaskId Push(Task const & t) override;
// Pushes task to the thread's queue of delayed tasks. // Pushes task to the thread's queue of delayed tasks.
// Returns task id or empty string when any error occurs // Returns task id or |TaskLoop::kIncorrectId| when any error occurs
// or the thread is shut down. // or the thread is shut down.
// //
// The task |t| is going to be executed not earlier than after // The task |t| is going to be executed not earlier than after
@ -110,7 +110,7 @@ private:
bool operator<(DelayedTask const & rhs) const { return m_when < rhs.m_when; } bool operator<(DelayedTask const & rhs) const { return m_when < rhs.m_when; }
bool operator>(DelayedTask const & rhs) const { return rhs < *this; } bool operator>(DelayedTask const & rhs) const { return rhs < *this; }
TaskId m_id; TaskId m_id = kIncorrectId;
TimePoint m_when = {}; TimePoint m_when = {};
Task m_task = {}; Task m_task = {};
}; };