fixed bug with premature cancellation of ScheduledTask, added new tests for it and renamed variables according to our style guide.

This commit is contained in:
rachytski 2013-01-09 15:52:49 +03:00 committed by Alex Zolotarev
parent 435cb719be
commit dffeb8b659
6 changed files with 128 additions and 36 deletions

View file

@ -30,6 +30,15 @@ UNIT_TEST(ScheduledTask_Smoke)
CHECK(val == 10, ());
}
UNIT_TEST(ScheduledTask_CancelInfinite)
{
int val = 2;
ScheduledTask t0(bind(&add_int, ref(val), 10), -1);
t0.Cancel();
}
UNIT_TEST(ScheduledTask_Cancel)
{
int val = 2;
@ -46,3 +55,18 @@ UNIT_TEST(ScheduledTask_Cancel)
CHECK(val == 4, ());
}
UNIT_TEST(ScheduledTask_NoWaitInCancel)
{
int val = 2;
ScheduledTask t0(bind(&add_int, ref(val), 10), 1000);
ScheduledTask t1(bind(&mul_int, ref(val), 3), 500);
t0.Cancel();
val += 3;
threads::Sleep(600);
CHECK(val == 15, (val));
}

View file

@ -20,7 +20,9 @@ namespace threads
~Condition();
void Signal(bool broadcast = false);
void Wait(unsigned ms = -1);
void Wait();
/// @return whether we are exiting by timeout.
bool Wait(unsigned ms);
void Lock();
void Unlock();
};

View file

@ -8,6 +8,8 @@
#include "../std/stdint.hpp"
#include "../std/systime.hpp"
#include <sys/errno.h>
#include <pthread.h>
namespace threads
@ -41,24 +43,32 @@ namespace threads
::pthread_cond_signal(&m_pImpl->m_Condition);
}
void Condition::Wait(unsigned ms)
void Condition::Wait()
{
::pthread_cond_wait(&m_pImpl->m_Condition, &m_pImpl->m_Mutex.m_Mutex);
}
bool Condition::Wait(unsigned ms)
{
if (ms == -1)
::pthread_cond_wait(&m_pImpl->m_Condition, &m_pImpl->m_Mutex.m_Mutex);
else
{
::timeval curtv;
::gettimeofday(&curtv, 0);
::timespec ts;
uint64_t deltaNanoSec = curtv.tv_usec * 1000 + ms * 1000000;
ts.tv_sec = curtv.tv_sec + deltaNanoSec / 1000000000;
ts.tv_nsec = deltaNanoSec % 1000000000;
::pthread_cond_timedwait(&m_pImpl->m_Condition, &m_pImpl->m_Mutex.m_Mutex, &ts);
Wait();
return false;
}
::timeval curtv;
::gettimeofday(&curtv, 0);
::timespec ts;
uint64_t deltaNanoSec = curtv.tv_usec * 1000 + ms * 1000000;
ts.tv_sec = curtv.tv_sec + deltaNanoSec / 1000000000;
ts.tv_nsec = deltaNanoSec % 1000000000;
int res = ::pthread_cond_timedwait(&m_pImpl->m_Condition, &m_pImpl->m_Mutex.m_Mutex, &ts);
return (res == ETIMEDOUT);
}
void Condition::Lock()

View file

@ -22,6 +22,7 @@ namespace threads
virtual ~ConditionImpl() {}
virtual void Signal(bool broadcast) = 0;
virtual void Wait() = 0;
virtual bool Wait(unsigned ms) = 0;
virtual void Lock() = 0;
virtual void Unlock() = 0;
};
@ -51,9 +52,16 @@ namespace threads
m_pWake(&m_Condition);
}
void Wait(unsigned ms)
void Wait()
{
m_pSleep(&m_Condition, &m_mutex.m_Mutex, ms);
m_pSleep(&m_Condition, &m_mutex.m_Mutex, INFINITE);
}
bool Wait(unsigned ms)
{
if (!m_pSleep(&m_Condition, &m_mutex.m_Mutex, ms))
return GetLastError() == ERROR_TIMEOUT;
return false;
}
void Lock()
@ -157,7 +165,13 @@ namespace threads
}
}
void Wait(unsigned ms)
void Wait()
{
Wait(-1);
return false;
}
bool Wait(unsigned ms)
{
// Avoid race conditions
::EnterCriticalSection(&waiters_count_lock_);
@ -167,7 +181,13 @@ namespace threads
// This call atomically releases the mutex and waits on the
// semaphore until <pthread_cond_signal> or <pthread_cond_broadcast>
// are called by another thread
::SignalObjectAndWait(m_mutex, sema_, ms, FALSE);
DWORD toWait = (ms == -1) ? INFINITE : ms;
bool res = false;
if (::SignalObjectAndWait(m_mutex, sema_, toWait, FALSE) == WAIT_TIMEOUT)
res = true;
// Reacquire lock to avoid race conditions
::EnterCriticalSection(&waiters_count_lock_);
@ -190,6 +210,8 @@ namespace threads
// Always regain the external mutex since that's the guarantee we
// give to our callers.
::WaitForSingleObject(m_mutex, INFINITE);
return res;
}
void Lock()
@ -228,9 +250,14 @@ namespace threads
m_pImpl->Signal(broadcast);
}
void Condition::Wait(unsigned ms)
void Condition::Wait()
{
m_pImpl->Wait(ms);
return m_pImpl->Wait();
}
bool Condition::Wait(unsigned ms)
{
return m_pImpl->Wait(ms);
}
void Condition::Lock()

View file

@ -1,24 +1,50 @@
#include "scheduled_task.hpp"
#include "timer.hpp"
ScheduledTask::Routine::Routine(fn_t const & fn, size_t ms)
: m_Fn(fn), m_Interval(ms)
ScheduledTask::Routine::Routine(fn_t const & fn,
unsigned ms,
threads::Condition * cond)
: m_fn(fn),
m_ms(ms),
m_pCond(cond)
{}
void ScheduledTask::Routine::Do()
{
m_Cond.Lock();
m_Cond.Wait(m_Interval);
m_pCond->Lock();
unsigned msLeft = m_ms;
while (!IsCancelled())
{
my::Timer t;
if (m_pCond->Wait(msLeft))
break;
msLeft -= (unsigned)(t.ElapsedSeconds() * 1000);
}
if (!IsCancelled())
m_Fn();
m_Cond.Unlock();
m_fn();
m_pCond->Unlock();
}
ScheduledTask::ScheduledTask(fn_t const & fn, size_t ms)
void ScheduledTask::Routine::Cancel()
{
m_Thread.Create(new Routine(fn, ms));
m_pCond->Lock();
IRoutine::Cancel();
m_pCond->Signal();
m_pCond->Unlock();
}
ScheduledTask::ScheduledTask(fn_t const & fn, unsigned ms)
{
m_thread.Create(new Routine(fn, ms, &m_cond));
}
void ScheduledTask::Cancel()
{
m_Thread.Cancel();
m_thread.Cancel();
}

View file

@ -1,6 +1,7 @@
#pragma once
#include "../std/function.hpp"
#include "thread.hpp"
#include "condition.hpp"
@ -15,24 +16,26 @@ public:
class Routine : public threads::IRoutine
{
private:
fn_t m_Fn;
size_t m_Interval;
threads::Condition m_Cond;
fn_t m_fn;
unsigned m_ms;
threads::Condition * m_pCond;
public:
Routine(fn_t const & fn, size_t ms);
Routine(fn_t const & fn, unsigned ms, threads::Condition * cond);
void Do();
void Cancel();
};
private:
threads::Thread m_Thread;
threads::Thread m_thread;
threads::Condition m_cond;
public:
/// Constructor by function and time in miliseconds.
ScheduledTask(fn_t const& fn, size_t ms);
ScheduledTask(fn_t const & fn, unsigned ms);
/// Task could be cancelled before time elapses.
void Cancel();
};