forked from organicmaps/organicmaps
[base] Fixed data races and other concurrency bugs.
This commit is contained in:
parent
76562de5ce
commit
3b4e0ddbc9
19 changed files with 283 additions and 360 deletions
|
@ -41,6 +41,7 @@ HEADERS += \
|
|||
bits.hpp \
|
||||
buffer_vector.hpp \
|
||||
cache.hpp \
|
||||
cancellable.hpp \
|
||||
commands_queue.hpp \
|
||||
condition.hpp \
|
||||
const_helper.hpp \
|
||||
|
|
|
@ -1,16 +1,18 @@
|
|||
#include "../../testing/testing.hpp"
|
||||
#include "../commands_queue.hpp"
|
||||
#include "../macros.hpp"
|
||||
#include "../../std/bind.hpp"
|
||||
#include "../thread.hpp"
|
||||
#include "../logging.hpp"
|
||||
|
||||
#include "../../std/atomic.hpp"
|
||||
#include "../../std/bind.hpp"
|
||||
|
||||
void add_int(core::CommandsQueue::Environment const & env,
|
||||
int & i,
|
||||
atomic<int> & i,
|
||||
int a)
|
||||
{
|
||||
threads::Sleep(500);
|
||||
if (env.isCancelled())
|
||||
if (env.IsCancelled())
|
||||
return;
|
||||
i += a;
|
||||
LOG(LINFO, ("add_int result:", i));
|
||||
|
@ -18,19 +20,21 @@ void add_int(core::CommandsQueue::Environment const & env,
|
|||
|
||||
void join_mul_int(core::CommandsQueue::Environment const & env,
|
||||
shared_ptr<core::CommandsQueue::Command> const & command,
|
||||
int & i,
|
||||
atomic<int> & i,
|
||||
int b)
|
||||
{
|
||||
command->join();
|
||||
i *= b;
|
||||
int value = i;
|
||||
while (!i.compare_exchange_weak(value, value * b));
|
||||
LOG(LINFO, ("join_mul_int result: ", i));
|
||||
}
|
||||
|
||||
void mul_int(core::CommandsQueue::Environment const & env, int & i, int b)
|
||||
void mul_int(core::CommandsQueue::Environment const & env, atomic<int> & i, int b)
|
||||
{
|
||||
if (env.isCancelled())
|
||||
if (env.IsCancelled())
|
||||
return;
|
||||
i *= b;
|
||||
int value = i;
|
||||
while (!i.compare_exchange_weak(value, value * b));
|
||||
LOG(LINFO, ("mul_int result: ", i));
|
||||
}
|
||||
|
||||
|
@ -38,7 +42,7 @@ UNIT_TEST(CommandsQueue_SetupAndPerformSimpleTask)
|
|||
{
|
||||
core::CommandsQueue queue(1);
|
||||
|
||||
int i = 3;
|
||||
atomic<int> i(3);
|
||||
|
||||
queue.Start();
|
||||
|
||||
|
@ -58,7 +62,7 @@ UNIT_TEST(CommandsQueue_SetupAndPerformSimpleTaskWith2Executors)
|
|||
{
|
||||
core::CommandsQueue queue(2);
|
||||
|
||||
int i = 3;
|
||||
atomic<int> i(3);
|
||||
|
||||
queue.Start();
|
||||
|
||||
|
@ -66,7 +70,7 @@ UNIT_TEST(CommandsQueue_SetupAndPerformSimpleTaskWith2Executors)
|
|||
queue.AddCommand(bind(&mul_int, _1, ref(i), 3));
|
||||
|
||||
queue.Join();
|
||||
// threads::Sleep(1000);
|
||||
// threads::Sleep(1000);
|
||||
|
||||
queue.Cancel();
|
||||
|
||||
|
@ -77,14 +81,14 @@ UNIT_TEST(CommandsQueue_TestEnvironmentCancellation)
|
|||
{
|
||||
core::CommandsQueue queue(2);
|
||||
|
||||
int i = 3;
|
||||
atomic<int> i(3);
|
||||
|
||||
queue.Start();
|
||||
|
||||
queue.AddCommand(bind(&add_int, _1, ref(i), 5));
|
||||
queue.AddCommand(bind(&mul_int, _1, ref(i), 3));
|
||||
|
||||
threads::Sleep(200); //< after this second command is executed, first will be cancelled
|
||||
threads::Sleep(200); //< after this second command is executed, first will be cancelled
|
||||
|
||||
queue.Cancel();
|
||||
|
||||
|
@ -96,12 +100,13 @@ UNIT_TEST(CommandsQueue_TestJoinCommand)
|
|||
core::CommandsQueue queue0(1);
|
||||
core::CommandsQueue queue1(1);
|
||||
|
||||
int i = 3;
|
||||
atomic<int> i(3);
|
||||
|
||||
queue0.Start();
|
||||
queue1.Start();
|
||||
|
||||
shared_ptr<core::CommandsQueue::Command> cmd = queue0.AddCommand(bind(&add_int, _1, ref(i), 5), true);
|
||||
shared_ptr<core::CommandsQueue::Command> cmd =
|
||||
queue0.AddCommand(bind(&add_int, _1, ref(i), 5), true);
|
||||
queue1.AddCommand(bind(&join_mul_int, _1, cmd, ref(i), 3), false);
|
||||
|
||||
queue0.Join();
|
||||
|
|
|
@ -11,7 +11,7 @@ void add_int(core::CommandsQueue::Environment const & env,
|
|||
int ms)
|
||||
{
|
||||
threads::Sleep(ms);
|
||||
if (env.isCancelled())
|
||||
if (env.IsCancelled())
|
||||
return;
|
||||
i += a;
|
||||
LOG(LINFO, ("add_int result:", i));
|
||||
|
|
|
@ -2,29 +2,28 @@
|
|||
|
||||
#include "../scheduled_task.hpp"
|
||||
|
||||
#include "../../std/atomic.hpp"
|
||||
#include "../../std/bind.hpp"
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
void add_int(int & val, int a)
|
||||
{
|
||||
val += a;
|
||||
}
|
||||
void add_int(atomic<int> & val, int a) { val += a; }
|
||||
|
||||
void mul_int(int & val, int b)
|
||||
{
|
||||
val *= b;
|
||||
}
|
||||
void mul_int(atomic<int> & val, int b)
|
||||
{
|
||||
int value = val;
|
||||
while (!val.compare_exchange_weak(value, value * b))
|
||||
;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
/// @todo Next tests are based on assumptions that some delays are suitable for
|
||||
/// performing needed checks, before a task will fire.
|
||||
|
||||
UNIT_TEST(ScheduledTask_Smoke)
|
||||
{
|
||||
int val = 0;
|
||||
atomic<int> val(0);
|
||||
|
||||
ScheduledTask t(bind(&add_int, ref(val), 10), 1000);
|
||||
|
||||
|
@ -38,7 +37,7 @@ UNIT_TEST(ScheduledTask_Smoke)
|
|||
|
||||
UNIT_TEST(ScheduledTask_CancelInfinite)
|
||||
{
|
||||
int val = 2;
|
||||
atomic<int> val(2);
|
||||
|
||||
ScheduledTask t0(bind(&add_int, ref(val), 10), static_cast<unsigned>(-1));
|
||||
|
||||
|
@ -49,7 +48,7 @@ UNIT_TEST(ScheduledTask_CancelInfinite)
|
|||
|
||||
UNIT_TEST(ScheduledTask_Cancel)
|
||||
{
|
||||
int val = 2;
|
||||
atomic<int> val(2);
|
||||
|
||||
ScheduledTask t0(bind(&add_int, ref(val), 10), 500);
|
||||
ScheduledTask t1(bind(&mul_int, ref(val), 2), 1000);
|
||||
|
@ -66,7 +65,7 @@ UNIT_TEST(ScheduledTask_Cancel)
|
|||
|
||||
UNIT_TEST(ScheduledTask_NoWaitInCancel)
|
||||
{
|
||||
int val = 2;
|
||||
atomic<int> val(2);
|
||||
|
||||
ScheduledTask t0(bind(&add_int, ref(val), 10), 1000);
|
||||
ScheduledTask t1(bind(&mul_int, ref(val), 3), 500);
|
||||
|
|
|
@ -135,13 +135,10 @@ UNIT_TEST(ThreadPool_ExecutionTaskTest)
|
|||
|
||||
while(true)
|
||||
{
|
||||
cond.Lock();
|
||||
|
||||
threads::ConditionGuard guard(cond);
|
||||
if (finishCounter == TASK_COUNT)
|
||||
break;
|
||||
|
||||
cond.Wait();
|
||||
cond.Unlock();
|
||||
guard.Wait();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -7,23 +7,29 @@
|
|||
|
||||
#include "../../base/logging.hpp"
|
||||
|
||||
#include "../../std/mutex.hpp"
|
||||
|
||||
struct ThreadedListProcessor : public threads::IRoutine
|
||||
{
|
||||
ThreadedList<int> * m_p;
|
||||
list<int> * m_res;
|
||||
ThreadedList<int> & m_p;
|
||||
mutex & m_resMutex;
|
||||
list<int> & m_res;
|
||||
int m_id;
|
||||
|
||||
ThreadedListProcessor(ThreadedList<int> * p, list<int> * res, int id)
|
||||
: m_p(p), m_res(res), m_id(id)
|
||||
{}
|
||||
ThreadedListProcessor(ThreadedList<int> & p, mutex & resMutex, list<int> & res, int id)
|
||||
: m_p(p), m_resMutex(resMutex), m_res(res), m_id(id)
|
||||
{
|
||||
}
|
||||
|
||||
virtual void Do()
|
||||
{
|
||||
while (!m_p->IsCancelled())
|
||||
while (!m_p.IsCancelled())
|
||||
{
|
||||
int res = m_p->Front(true);
|
||||
m_res->push_back(res);
|
||||
int res = m_p.Front(true /* doPop */);
|
||||
{
|
||||
lock_guard<mutex> resGuard(m_resMutex);
|
||||
m_res.push_back(res);
|
||||
}
|
||||
LOG(LINFO, (m_id, " thread got ", res));
|
||||
threads::Sleep(10);
|
||||
}
|
||||
|
@ -33,20 +39,26 @@ struct ThreadedListProcessor : public threads::IRoutine
|
|||
|
||||
struct ThreadedPriorityQueueProcessor : public threads::IRoutine
|
||||
{
|
||||
ThreadedPriorityQueue<int> * m_p;
|
||||
list<int> * m_res;
|
||||
ThreadedPriorityQueue<int> & m_p;
|
||||
mutex & m_resMutex;
|
||||
list<int> & m_res;
|
||||
int m_id;
|
||||
|
||||
ThreadedPriorityQueueProcessor(ThreadedPriorityQueue<int> * p, list<int> * res, int id)
|
||||
: m_p(p), m_res(res), m_id(id)
|
||||
{}
|
||||
ThreadedPriorityQueueProcessor(ThreadedPriorityQueue<int> & p, mutex & resMutex, list<int> & res,
|
||||
int id)
|
||||
: m_p(p), m_resMutex(resMutex), m_res(res), m_id(id)
|
||||
{
|
||||
}
|
||||
|
||||
virtual void Do()
|
||||
{
|
||||
while (!m_p->IsCancelled())
|
||||
while (!m_p.IsCancelled())
|
||||
{
|
||||
int res = m_p->Top(true);
|
||||
m_res->push_back(res);
|
||||
int res = m_p.Top(true /* doPop */);
|
||||
{
|
||||
lock_guard<mutex> resGuard(m_resMutex);
|
||||
m_res.push_back(res);
|
||||
}
|
||||
LOG(LINFO, (m_id, " thread got ", res));
|
||||
threads::Sleep(10);
|
||||
}
|
||||
|
@ -54,22 +66,23 @@ struct ThreadedPriorityQueueProcessor : public threads::IRoutine
|
|||
}
|
||||
};
|
||||
|
||||
|
||||
UNIT_TEST(ThreadedList)
|
||||
{
|
||||
list<int> l;
|
||||
|
||||
mutex resMutex;
|
||||
list<int> res;
|
||||
|
||||
ThreadedList<int> p;
|
||||
|
||||
threads::Thread t0;
|
||||
t0.Create(new ThreadedListProcessor(&p, &res, 0));
|
||||
t0.Create(new ThreadedListProcessor(p, resMutex, res, 0));
|
||||
|
||||
threads::Thread t1;
|
||||
t1.Create(new ThreadedListProcessor(&p, &res, 1));
|
||||
t1.Create(new ThreadedListProcessor(p, resMutex, res, 1));
|
||||
|
||||
threads::Thread t2;
|
||||
t2.Create(new ThreadedListProcessor(&p, &res, 2));
|
||||
t2.Create(new ThreadedListProcessor(p, resMutex, res, 2));
|
||||
|
||||
p.PushBack(0);
|
||||
threads::Sleep(200);
|
||||
|
@ -80,34 +93,35 @@ UNIT_TEST(ThreadedList)
|
|||
p.PushBack(2);
|
||||
threads::Sleep(200);
|
||||
|
||||
p.Cancel();
|
||||
|
||||
t0.Join();
|
||||
t1.Join();
|
||||
t2.Join();
|
||||
|
||||
TEST_EQUAL(res.front(), 0, ());
|
||||
res.pop_front();
|
||||
TEST_EQUAL(res.front(), 1, ());
|
||||
res.pop_front();
|
||||
TEST_EQUAL(res.front(), 2, ());
|
||||
res.pop_front();
|
||||
|
||||
p.Cancel();
|
||||
|
||||
t0.Join();
|
||||
t1.Join();
|
||||
t2.Join();
|
||||
}
|
||||
|
||||
UNIT_TEST(ThreadedPriorityQueue)
|
||||
{
|
||||
mutex resMutex;
|
||||
list<int> res;
|
||||
|
||||
ThreadedPriorityQueue<int> p;
|
||||
|
||||
threads::Thread t0;
|
||||
t0.Create(new ThreadedPriorityQueueProcessor(&p, &res, 0));
|
||||
t0.Create(new ThreadedPriorityQueueProcessor(p, resMutex, res, 0));
|
||||
|
||||
threads::Thread t1;
|
||||
t1.Create(new ThreadedPriorityQueueProcessor(&p, &res, 1));
|
||||
t1.Create(new ThreadedPriorityQueueProcessor(p, resMutex, res, 1));
|
||||
|
||||
threads::Thread t2;
|
||||
t2.Create(new ThreadedPriorityQueueProcessor(&p, &res, 2));
|
||||
t2.Create(new ThreadedPriorityQueueProcessor(p, resMutex, res, 2));
|
||||
|
||||
p.Push(0);
|
||||
threads::Sleep(200);
|
||||
|
@ -118,17 +132,17 @@ UNIT_TEST(ThreadedPriorityQueue)
|
|||
p.Push(2);
|
||||
threads::Sleep(200);
|
||||
|
||||
p.Cancel();
|
||||
|
||||
t0.Join();
|
||||
t1.Join();
|
||||
t2.Join();
|
||||
|
||||
TEST_EQUAL(res.front(), 0, ());
|
||||
res.pop_front();
|
||||
TEST_EQUAL(res.front(), 1, ());
|
||||
res.pop_front();
|
||||
TEST_EQUAL(res.front(), 2, ());
|
||||
res.pop_front();
|
||||
|
||||
p.Cancel();
|
||||
|
||||
t0.Join();
|
||||
t1.Join();
|
||||
t2.Join();
|
||||
}
|
||||
|
||||
|
|
28
base/cancellable.hpp
Normal file
28
base/cancellable.hpp
Normal file
|
@ -0,0 +1,28 @@
|
|||
#pragma once
|
||||
|
||||
#include "../std/atomic.hpp"
|
||||
|
||||
namespace my
|
||||
{
|
||||
/// This class is a helper thread-safe class which can be mixed in
|
||||
/// classes which represent some cancellable activities.
|
||||
class Cancellable
|
||||
{
|
||||
public:
|
||||
Cancellable() : m_cancelled(false) {}
|
||||
|
||||
virtual ~Cancellable() {}
|
||||
|
||||
/// Marks current activity as not cancelled.
|
||||
virtual void Reset() { m_cancelled = false; }
|
||||
|
||||
/// Marks current activity as cancelled.
|
||||
virtual void Cancel() { m_cancelled = true; }
|
||||
|
||||
/// \return True is current activity is cancelled.
|
||||
bool IsCancelled() const { return m_cancelled; }
|
||||
|
||||
private:
|
||||
atomic<bool> m_cancelled;
|
||||
};
|
||||
} // namespace my
|
|
@ -10,19 +10,9 @@
|
|||
namespace core
|
||||
{
|
||||
CommandsQueue::Environment::Environment(int threadNum)
|
||||
: m_threadNum(threadNum), m_isCancelled(false)
|
||||
: m_threadNum(threadNum)
|
||||
{}
|
||||
|
||||
void CommandsQueue::Environment::cancel()
|
||||
{
|
||||
m_isCancelled = true;
|
||||
}
|
||||
|
||||
bool CommandsQueue::Environment::isCancelled() const
|
||||
{
|
||||
return m_isCancelled;
|
||||
}
|
||||
|
||||
int CommandsQueue::Environment::threadNum() const
|
||||
{
|
||||
return m_threadNum;
|
||||
|
@ -99,7 +89,7 @@ namespace core
|
|||
if (m_parent->m_commands.IsCancelled())
|
||||
break;
|
||||
|
||||
m_env.m_isCancelled = false;
|
||||
m_env.Reset();
|
||||
|
||||
cmd->perform(m_env);
|
||||
|
||||
|
@ -115,7 +105,7 @@ namespace core
|
|||
|
||||
void CommandsQueue::Routine::Cancel()
|
||||
{
|
||||
m_env.cancel();
|
||||
m_env.Cancel();
|
||||
|
||||
// performing cancellation tasks
|
||||
for(list<shared_ptr<Command> >::const_iterator it = m_parent->m_cancelCommands.begin();
|
||||
|
@ -128,7 +118,7 @@ namespace core
|
|||
|
||||
void CommandsQueue::Routine::CancelCommand()
|
||||
{
|
||||
m_env.cancel();
|
||||
m_env.Cancel();
|
||||
}
|
||||
|
||||
CommandsQueue::Executor::Executor() : m_routine(0)
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
#include "../std/vector.hpp"
|
||||
#include "../std/shared_ptr.hpp"
|
||||
|
||||
#include "cancellable.hpp"
|
||||
#include "thread.hpp"
|
||||
#include "threaded_list.hpp"
|
||||
|
||||
|
@ -26,27 +27,18 @@ namespace core
|
|||
/// - task functor should check the IsCancelled()
|
||||
/// on the reasonable small interval and cancel
|
||||
/// it's work upon receiving "true".
|
||||
class Environment
|
||||
class Environment : public my::Cancellable
|
||||
{
|
||||
private:
|
||||
|
||||
int m_threadNum;
|
||||
bool m_isCancelled;
|
||||
|
||||
protected:
|
||||
|
||||
explicit Environment(int threadNum);
|
||||
|
||||
void cancel(); //< call this from the control thread
|
||||
//< to cancel execution of the tasks
|
||||
|
||||
friend class Routine;
|
||||
|
||||
public:
|
||||
|
||||
int threadNum() const; //< number of the thread, which is executing the commands
|
||||
bool isCancelled() const; //< command should ping this flag to see,
|
||||
// whether it should cancel execution
|
||||
};
|
||||
|
||||
/// single commmand
|
||||
|
|
|
@ -45,6 +45,11 @@ ScheduledTask::ScheduledTask(fn_t const & fn, unsigned ms)
|
|||
m_thread.Create(m_routine.get());
|
||||
}
|
||||
|
||||
ScheduledTask::~ScheduledTask()
|
||||
{
|
||||
CancelBlocking();
|
||||
}
|
||||
|
||||
bool ScheduledTask::CancelNoBlocking()
|
||||
{
|
||||
if (m_cond.TryLock())
|
||||
|
|
|
@ -21,18 +21,23 @@ class ScheduledTask
|
|||
|
||||
public:
|
||||
Routine(fn_t const & fn, unsigned ms, threads::Condition * cond);
|
||||
|
||||
virtual void Do();
|
||||
virtual void Cancel();
|
||||
};
|
||||
|
||||
/// The construction and destruction order is strict here: m_cond is
|
||||
/// used by m_routine and m_routine is used by m_thread.
|
||||
threads::Condition m_cond;
|
||||
unique_ptr<Routine> const m_routine;
|
||||
threads::Thread m_thread;
|
||||
threads::Condition m_cond;
|
||||
|
||||
public:
|
||||
/// Constructor by function and time in miliseconds.
|
||||
ScheduledTask(fn_t const & fn, unsigned ms);
|
||||
|
||||
~ScheduledTask();
|
||||
|
||||
/// @name Task could be cancelled before time elapses.
|
||||
//@{
|
||||
/// @return false If the task is already running or in some intermediate state.
|
||||
|
|
275
base/thread.cpp
275
base/thread.cpp
|
@ -1,201 +1,104 @@
|
|||
#include "thread.hpp"
|
||||
#include "assert.hpp"
|
||||
|
||||
#if !defined(OMIM_OS_WINDOWS_NATIVE)
|
||||
#include <pthread.h>
|
||||
#if defined (OMIM_OS_ANDROID)
|
||||
/// External implementations are in android/jni code
|
||||
void AndroidThreadAttachToJVM();
|
||||
void AndroidThreadDetachFromJVM();
|
||||
#endif
|
||||
#endif
|
||||
#include "../base/logging.hpp"
|
||||
|
||||
#include "../std/chrono.hpp"
|
||||
#include "../std/exception.hpp"
|
||||
|
||||
#if defined(OMIM_OS_ANDROID)
|
||||
void AndroidThreadAttachToJVM();
|
||||
void AndroidThreadDetachFromJVM();
|
||||
#endif // defined(OMIM_OS_ANDROID)
|
||||
|
||||
namespace threads
|
||||
{
|
||||
#if defined(OMIM_OS_WINDOWS_NATIVE)
|
||||
/// Windows native implementation
|
||||
class ThreadImpl
|
||||
{
|
||||
HANDLE m_handle;
|
||||
|
||||
public:
|
||||
ThreadImpl() : m_handle(0) {}
|
||||
|
||||
~ThreadImpl()
|
||||
{
|
||||
if (m_handle)
|
||||
::CloseHandle(m_handle);
|
||||
}
|
||||
|
||||
static DWORD WINAPI WindowsWrapperThreadProc(void * p)
|
||||
{
|
||||
IRoutine * pRoutine = reinterpret_cast<IRoutine *>(p);
|
||||
pRoutine->Do();
|
||||
return 0;
|
||||
}
|
||||
|
||||
int Create(IRoutine * pRoutine)
|
||||
{
|
||||
int error = 0;
|
||||
m_handle = ::CreateThread(NULL, 0, &WindowsWrapperThreadProc, reinterpret_cast<void *>(pRoutine), 0, NULL);
|
||||
if (0 == m_handle)
|
||||
error = ::GetLastError();
|
||||
return error;
|
||||
}
|
||||
|
||||
int Join()
|
||||
{
|
||||
int error = 0;
|
||||
if (WAIT_OBJECT_0 != ::WaitForSingleObject(m_handle, INFINITE))
|
||||
error = ::GetLastError();
|
||||
return error;
|
||||
}
|
||||
};
|
||||
// end of Windows Native implementation
|
||||
|
||||
#else
|
||||
/// POSIX pthreads implementation
|
||||
class ThreadImpl
|
||||
{
|
||||
pthread_t m_handle;
|
||||
|
||||
public:
|
||||
ThreadImpl() {}
|
||||
|
||||
static void * PthreadsWrapperThreadProc(void * p)
|
||||
{
|
||||
#ifdef OMIM_OS_ANDROID
|
||||
// Attach thread to JVM, implemented in android/jni code
|
||||
AndroidThreadAttachToJVM();
|
||||
#endif
|
||||
|
||||
IRoutine * pRoutine = reinterpret_cast<IRoutine *>(p);
|
||||
pRoutine->Do();
|
||||
|
||||
#ifdef OMIM_OS_ANDROID
|
||||
// Detach thread from JVM, implemented in android/jni code
|
||||
AndroidThreadDetachFromJVM();
|
||||
#endif
|
||||
|
||||
::pthread_exit(NULL);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int Create(IRoutine * pRoutine)
|
||||
{
|
||||
return ::pthread_create(&m_handle, NULL, &PthreadsWrapperThreadProc, reinterpret_cast<void *>(pRoutine));
|
||||
}
|
||||
|
||||
int Join()
|
||||
{
|
||||
return ::pthread_join(m_handle, NULL);
|
||||
}
|
||||
};
|
||||
//////////////////////// end of POSIX pthreads implementation
|
||||
#endif
|
||||
|
||||
/////////////////////////////////////////////////////////////////////
|
||||
// Thread wrapper implementation
|
||||
Thread::Thread() : m_impl(new ThreadImpl()), m_routine(0)
|
||||
namespace
|
||||
{
|
||||
/// Prepares worker thread and runs routine.
|
||||
void RunRoutine(IRoutine * routine)
|
||||
{
|
||||
#if defined(OMIM_OS_ANDROID)
|
||||
AndroidThreadAttachToJVM();
|
||||
#endif // defined(OMIM_OS_ANDROID)
|
||||
|
||||
routine->Do();
|
||||
|
||||
#if defined(OMIM_OS_ANDROID)
|
||||
AndroidThreadDetachFromJVM();
|
||||
#endif // defined(OMIM_OS_ANDROID)
|
||||
}
|
||||
} // namespace
|
||||
|
||||
/////////////////////////////////////////////////////////////////////
|
||||
// Thread wrapper implementation
|
||||
Thread::Thread() : m_routine(0) {}
|
||||
|
||||
Thread::~Thread() { Join(); }
|
||||
|
||||
bool Thread::Create(IRoutine * pRoutine)
|
||||
{
|
||||
ASSERT(!m_routine, ("Current implementation doesn't allow to reuse thread"));
|
||||
thread routineThread;
|
||||
try
|
||||
{
|
||||
routineThread = thread(&RunRoutine, pRoutine);
|
||||
}
|
||||
|
||||
Thread::~Thread()
|
||||
catch (exception & e)
|
||||
{
|
||||
delete m_impl;
|
||||
LOG(LERROR, ("Thread creation failed with error:", e.what()));
|
||||
return false;
|
||||
}
|
||||
m_thread = move(routineThread);
|
||||
m_routine = pRoutine;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool Thread::Create(IRoutine * pRoutine)
|
||||
void Thread::Cancel()
|
||||
{
|
||||
if (!m_routine)
|
||||
return;
|
||||
m_routine->Cancel();
|
||||
Join();
|
||||
}
|
||||
|
||||
void Thread::Join()
|
||||
{
|
||||
if (m_thread.joinable())
|
||||
m_thread.join();
|
||||
}
|
||||
|
||||
SimpleThreadPool::SimpleThreadPool(size_t reserve) { m_pool.reserve(reserve); }
|
||||
|
||||
SimpleThreadPool::~SimpleThreadPool()
|
||||
{
|
||||
for (size_t i = 0; i < m_pool.size(); ++i)
|
||||
{
|
||||
ASSERT_EQUAL(m_routine, 0, ("Current implementation doesn't allow to reuse thread"));
|
||||
int error = m_impl->Create(pRoutine);
|
||||
if (0 != error)
|
||||
{
|
||||
ASSERT ( !"Thread creation failed with error", (error) );
|
||||
return false;
|
||||
}
|
||||
m_routine = pRoutine;
|
||||
return true;
|
||||
}
|
||||
|
||||
void Thread::Cancel()
|
||||
{
|
||||
if (m_routine)
|
||||
{
|
||||
m_routine->Cancel();
|
||||
Join();
|
||||
}
|
||||
}
|
||||
|
||||
void Thread::Join()
|
||||
{
|
||||
if (m_routine)
|
||||
{
|
||||
int const error = m_impl->Join();
|
||||
if (0 != error)
|
||||
{
|
||||
ASSERT ( !"Thread join failed. See error value.", (error) );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
SimpleThreadPool::SimpleThreadPool(size_t reserve)
|
||||
{
|
||||
m_pool.reserve(reserve);
|
||||
}
|
||||
|
||||
SimpleThreadPool::~SimpleThreadPool()
|
||||
{
|
||||
for (size_t i = 0; i < m_pool.size(); ++i)
|
||||
{
|
||||
delete m_pool[i].first;
|
||||
delete m_pool[i].second;
|
||||
}
|
||||
}
|
||||
|
||||
void SimpleThreadPool::Add(IRoutine * pRoutine)
|
||||
{
|
||||
ValueT v;
|
||||
v.first = new Thread();
|
||||
v.second = pRoutine;
|
||||
|
||||
m_pool.push_back(v);
|
||||
|
||||
v.first->Create(pRoutine);
|
||||
}
|
||||
|
||||
void SimpleThreadPool::Join()
|
||||
{
|
||||
for (size_t i = 0; i < m_pool.size(); ++i)
|
||||
m_pool[i].first->Join();
|
||||
}
|
||||
|
||||
IRoutine * SimpleThreadPool::GetRoutine(size_t i) const
|
||||
{
|
||||
return m_pool[i].second;
|
||||
}
|
||||
|
||||
|
||||
void Sleep(size_t ms)
|
||||
{
|
||||
#ifdef OMIM_OS_WINDOWS
|
||||
::Sleep(ms);
|
||||
#else
|
||||
timespec t;
|
||||
t.tv_nsec =(ms * 1000000) % 1000000000;
|
||||
t.tv_sec = (ms * 1000000) / 1000000000;
|
||||
nanosleep(&t, 0);
|
||||
#endif
|
||||
}
|
||||
|
||||
ThreadID GetCurrentThreadID()
|
||||
{
|
||||
#ifdef OMIM_OS_WINDOWS
|
||||
return ::GetCurrentThreadId();
|
||||
#else
|
||||
return reinterpret_cast<void *>(pthread_self());
|
||||
#endif
|
||||
delete m_pool[i].first;
|
||||
delete m_pool[i].second;
|
||||
}
|
||||
}
|
||||
|
||||
void SimpleThreadPool::Add(IRoutine * pRoutine)
|
||||
{
|
||||
ValueT v;
|
||||
v.first = new Thread();
|
||||
v.second = pRoutine;
|
||||
|
||||
m_pool.push_back(v);
|
||||
|
||||
v.first->Create(pRoutine);
|
||||
}
|
||||
|
||||
void SimpleThreadPool::Join()
|
||||
{
|
||||
for (size_t i = 0; i < m_pool.size(); ++i)
|
||||
m_pool[i].first->Join();
|
||||
}
|
||||
|
||||
IRoutine * SimpleThreadPool::GetRoutine(size_t i) const { return m_pool[i].second; }
|
||||
|
||||
void Sleep(size_t ms) { this_thread::sleep_for(milliseconds(ms)); }
|
||||
|
||||
ThreadID GetCurrentThreadID() { return this_thread::get_id(); }
|
||||
}
|
||||
|
|
110
base/thread.hpp
110
base/thread.hpp
|
@ -1,85 +1,77 @@
|
|||
#pragma once
|
||||
|
||||
#include "../base/cancellable.hpp"
|
||||
#include "../base/macros.hpp"
|
||||
|
||||
#include "../std/target_os.hpp"
|
||||
|
||||
#include "../std/stdint.hpp"
|
||||
#include "../std/vector.hpp"
|
||||
#include "../std/utility.hpp"
|
||||
#include "../std/noncopyable.hpp"
|
||||
#include "../std/stdint.hpp"
|
||||
#include "../std/thread.hpp"
|
||||
#include "../std/unique_ptr.hpp"
|
||||
#include "../std/utility.hpp"
|
||||
#include "../std/vector.hpp"
|
||||
|
||||
#ifdef OMIM_OS_WINDOWS
|
||||
#include "../std/windows.hpp" // for DWORD
|
||||
#include "../std/windows.hpp" // for DWORD
|
||||
#endif
|
||||
|
||||
namespace threads
|
||||
{
|
||||
class IRoutine
|
||||
{
|
||||
private:
|
||||
bool m_isCancelled;
|
||||
class IRoutine : public my::Cancellable
|
||||
{
|
||||
public:
|
||||
/// Performing the main task
|
||||
virtual void Do() = 0;
|
||||
};
|
||||
|
||||
public:
|
||||
IRoutine() : m_isCancelled(false) {}
|
||||
virtual ~IRoutine() {}
|
||||
/// wrapper for Create and Terminate threads API
|
||||
class Thread
|
||||
{
|
||||
thread m_thread;
|
||||
IRoutine * m_routine;
|
||||
|
||||
/// Performing the main task
|
||||
virtual void Do() = 0;
|
||||
public:
|
||||
Thread();
|
||||
|
||||
/// Implement this function to respond to the cancellation event.
|
||||
/// Cancellation means that IRoutine should exit as fast as possible.
|
||||
virtual void Cancel() { m_isCancelled = true; }
|
||||
inline bool IsCancelled() const { return m_isCancelled; }
|
||||
};
|
||||
~Thread();
|
||||
|
||||
class ThreadImpl;
|
||||
/// wrapper for Create and Terminate threads API
|
||||
class Thread
|
||||
{
|
||||
ThreadImpl * m_impl;
|
||||
IRoutine * m_routine;
|
||||
/// Run thread immediately.
|
||||
/// @param pRoutine is owned by Thread class
|
||||
bool Create(IRoutine * pRoutine);
|
||||
|
||||
Thread(Thread const &);
|
||||
Thread & operator=(Thread const &);
|
||||
/// Calling the IRoutine::Cancel method, and Join'ing with the task execution.
|
||||
void Cancel();
|
||||
|
||||
public:
|
||||
Thread();
|
||||
~Thread();
|
||||
/// Wait for thread ending.
|
||||
void Join();
|
||||
|
||||
/// Run thread immediately
|
||||
/// @param pRoutine is owned by Thread class
|
||||
bool Create(IRoutine * pRoutine);
|
||||
/// Calling the IRoutine::Cancel method, and Join'ing with the task execution.
|
||||
void Cancel();
|
||||
/// wait for thread ending
|
||||
void Join();
|
||||
};
|
||||
private:
|
||||
DISALLOW_COPY_AND_MOVE(Thread);
|
||||
};
|
||||
|
||||
/// Simple threads container. Takes ownership for every added IRoutine.
|
||||
class SimpleThreadPool : public noncopyable
|
||||
{
|
||||
typedef pair<Thread *, IRoutine *> ValueT;
|
||||
vector<ValueT> m_pool;
|
||||
/// Simple threads container. Takes ownership for every added IRoutine.
|
||||
class SimpleThreadPool : public noncopyable
|
||||
{
|
||||
typedef pair<Thread *, IRoutine *> ValueT;
|
||||
vector<ValueT> m_pool;
|
||||
|
||||
public:
|
||||
SimpleThreadPool(size_t reserve = 0);
|
||||
~SimpleThreadPool();
|
||||
public:
|
||||
SimpleThreadPool(size_t reserve = 0);
|
||||
~SimpleThreadPool();
|
||||
|
||||
void Add(IRoutine * pRoutine);
|
||||
void Join();
|
||||
void Add(IRoutine * pRoutine);
|
||||
void Join();
|
||||
|
||||
IRoutine * GetRoutine(size_t i) const;
|
||||
};
|
||||
IRoutine * GetRoutine(size_t i) const;
|
||||
};
|
||||
|
||||
/// Suspends the execution of the current thread until the time-out interval elapses.
|
||||
/// @param[in] ms time-out interval in milliseconds
|
||||
void Sleep(size_t ms);
|
||||
/// Suspends the execution of the current thread until the time-out interval elapses.
|
||||
/// @param[in] ms time-out interval in milliseconds
|
||||
void Sleep(size_t ms);
|
||||
|
||||
#ifdef OMIM_OS_WINDOWS
|
||||
typedef DWORD ThreadID;
|
||||
#else
|
||||
typedef void * ThreadID;
|
||||
#endif
|
||||
typedef thread::id ThreadID;
|
||||
|
||||
ThreadID GetCurrentThreadID();
|
||||
ThreadID GetCurrentThreadID();
|
||||
|
||||
} // namespace threads
|
||||
} // namespace threads
|
||||
|
|
|
@ -1,22 +1,14 @@
|
|||
#include "threaded_container.hpp"
|
||||
|
||||
ThreadedContainer::ThreadedContainer()
|
||||
: m_WaitTime(0), m_IsCancelled(false)
|
||||
{
|
||||
}
|
||||
ThreadedContainer::ThreadedContainer() : m_WaitTime(0) {}
|
||||
|
||||
void ThreadedContainer::Cancel()
|
||||
{
|
||||
threads::ConditionGuard g(m_Cond);
|
||||
m_IsCancelled = true;
|
||||
my::Cancellable::Cancel();
|
||||
m_Cond.Signal(true);
|
||||
}
|
||||
|
||||
bool ThreadedContainer::IsCancelled() const
|
||||
{
|
||||
return m_IsCancelled;
|
||||
}
|
||||
|
||||
double ThreadedContainer::WaitTime() const
|
||||
{
|
||||
return m_WaitTime;
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
#pragma once
|
||||
|
||||
#include "cancellable.hpp"
|
||||
#include "condition.hpp"
|
||||
#include "timer.hpp"
|
||||
|
||||
struct ThreadedContainer
|
||||
struct ThreadedContainer : public my::Cancellable
|
||||
{
|
||||
protected:
|
||||
|
||||
|
@ -11,14 +12,12 @@ protected:
|
|||
double m_WaitTime;
|
||||
|
||||
mutable threads::Condition m_Cond;
|
||||
bool m_IsCancelled;
|
||||
|
||||
public:
|
||||
|
||||
ThreadedContainer();
|
||||
|
||||
void Cancel();
|
||||
bool IsCancelled() const;
|
||||
/// Cancellable overrides:
|
||||
void Cancel() override;
|
||||
|
||||
double WaitTime() const;
|
||||
};
|
||||
|
|
|
@ -308,7 +308,7 @@ namespace graphics
|
|||
bool Renderer::isCancelled() const
|
||||
{
|
||||
if (m_env)
|
||||
return m_env->isCancelled();
|
||||
return m_env->IsCancelled();
|
||||
else
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ void PaintEvent::cancel()
|
|||
bool PaintEvent::isCancelled() const
|
||||
{
|
||||
if (m_env)
|
||||
return m_env->isCancelled();
|
||||
return m_env->IsCancelled();
|
||||
else
|
||||
return m_isCancelled;
|
||||
}
|
||||
|
|
|
@ -199,7 +199,7 @@ void TileRenderer::DrawTile(core::CommandsQueue::Environment const & env,
|
|||
drawer->screen()->resetOverlay();
|
||||
drawer->screen()->copyFramebufferToImage(tileTarget);
|
||||
|
||||
if (!env.isCancelled())
|
||||
if (!env.IsCancelled())
|
||||
{
|
||||
if (glQueue)
|
||||
glQueue->completeCommands();
|
||||
|
@ -210,7 +210,7 @@ void TileRenderer::DrawTile(core::CommandsQueue::Environment const & env,
|
|||
glQueue->cancelCommands();
|
||||
}
|
||||
|
||||
if (env.isCancelled())
|
||||
if (env.IsCancelled())
|
||||
{
|
||||
texturePool->Free(tileTarget);
|
||||
}
|
||||
|
|
|
@ -7,9 +7,10 @@
|
|||
|
||||
#include <chrono>
|
||||
|
||||
using std::chrono::high_resolution_clock;
|
||||
using std::chrono::nanoseconds;
|
||||
using std::chrono::duration_cast;
|
||||
using std::chrono::high_resolution_clock;
|
||||
using std::chrono::milliseconds;
|
||||
using std::chrono::nanoseconds;
|
||||
|
||||
#ifdef DEBUG_NEW
|
||||
#define new DEBUG_NEW
|
||||
|
|
Loading…
Add table
Reference in a new issue