Gps track writes/reads data in worker thread

This commit is contained in:
Constantin Shalnev 2015-12-09 12:38:39 +03:00
parent ed62f99af8
commit e17a9e0af5
4 changed files with 433 additions and 173 deletions

View file

@ -12,193 +12,143 @@
namespace
{
size_t const kGpsCollectionMaxItemCount = 100000;
size_t const kGpsFileMaxItemCount = 100000;
size_t const kMaxItemCount = 100000;
hours const kDefaultDuration = hours(24);
inline pair<size_t, size_t> UnionRanges(pair<size_t, size_t> const & a, pair<size_t, size_t> const & b)
{
if (a.first == GpsTrack::kInvalidId)
{
ASSERT_EQUAL(a.second, GpsTrack::kInvalidId, ());
return b;
}
if (b.first == GpsTrack::kInvalidId)
{
ASSERT_EQUAL(b.second, GpsTrack::kInvalidId, ());
return a;
}
ASSERT_LESS_OR_EQUAL(a.first, a.second, ());
ASSERT_LESS_OR_EQUAL(b.first, b.second, ());
return make_pair(min(a.first, b.first), max(a.second, b.second));
}
} // namespace
size_t const GpsTrack::kInvalidId = GpsTrackCollection::kInvalidId;
GpsTrack::GpsTrack(string const & filePath)
: m_filePath(filePath)
, m_duration(kDefaultDuration)
GpsTrack::GpsTrack(string const & filePath, size_t maxItemCount, hours duration)
: m_maxItemCount(maxItemCount)
, m_filePath(filePath)
, m_duration(duration)
, m_needClear(false)
, m_needSendSnapshop(false)
, m_threadExit(false)
, m_threadWakeup(false)
{
ASSERT_GREATER(m_maxItemCount, 0, ());
ASSERT(!m_filePath.empty(), ());
ASSERT_GREATER(m_duration.count(), 0, ());
}
void GpsTrack::AddPoint(location::GpsTrackInfo const & info)
GpsTrack::~GpsTrack()
{
lock_guard<mutex> lg(m_guard);
LazyInitFile();
// Write point to the file.
// If file exception happens, then drop file.
if (m_file)
if (m_thread.joinable())
{
try
{
size_t evictedId;
m_file->Append(info, evictedId);
}
catch (RootException & e)
{
LOG(LINFO, ("GpsTrackFile.Append has caused exception:", e.Msg()));
m_file.reset();
lock_guard<mutex> lg(m_threadGuard);
m_threadExit = true;
m_cv.notify_one();
}
m_thread.join();
}
if (!m_collection)
return;
// Write point to the collection
pair<size_t, size_t> evictedIds;
size_t const addedId = m_collection->Add(info, evictedIds);
if (addedId == GpsTrackCollection::kInvalidId)
return; // nothing was added
if (!m_callback)
return;
vector<pair<size_t, location::GpsTrackInfo>> toAdd;
toAdd.emplace_back(addedId, info);
m_callback(move(toAdd), move(evictedIds));
}
void GpsTrack::AddPoints(vector<location::GpsTrackInfo> const & points)
void GpsTrack::AddPoint(TItem const & point)
{
if (points.empty())
return;
lock_guard<mutex> lg(m_guard);
LazyInitFile();
// Write point to the file.
// If file exception happens, then drop file.
if (m_file)
{
try
{
for (auto const & point : points)
{
size_t evictedId;
m_file->Append(point, evictedId);
}
}
catch (RootException & e)
{
LOG(LINFO, ("GpsTrackFile.Append has caused exception:", e.Msg()));
m_file.reset();
}
lock_guard<mutex> lg(m_dataGuard);
m_points.emplace_back(point);
}
ScheduleTask();
}
if (!m_collection)
return;
// Add points
pair<size_t, size_t> evictedIds;
pair<size_t, size_t> const addedIds = m_collection->Add(points, evictedIds);
if (addedIds.first == GpsTrackCollection::kInvalidId)
return; // nothing was added
if (!m_callback)
return;
size_t const addedCount = addedIds.second - addedIds.first + 1;
ASSERT_GREATER_OR_EQUAL(m_collection->GetSize(), addedCount, ());
// Not all points from infos could be added to collection due to timestamp consequence restriction.
// Get added points from collection.
vector<pair<size_t, location::GpsTrackInfo>> toAdd;
toAdd.reserve(addedCount);
m_collection->ForEach([&toAdd](location::GpsTrackInfo const & point, size_t id)->bool
void GpsTrack::AddPoints(vector<TItem> const & points)
{
{
toAdd.emplace_back(id, point);
return true;
}, m_collection->GetSize() - addedCount);
ASSERT_EQUAL(toAdd.size(), addedCount, ());
m_callback(move(toAdd), evictedIds);
lock_guard<mutex> lg(m_dataGuard);
m_points.insert(m_points.end(), points.begin(), points.end());
}
ScheduleTask();
}
void GpsTrack::Clear()
{
lock_guard<mutex> lg(m_guard);
LazyInitFile();
if (m_file)
{
try
{
m_file->Clear();
}
catch (RootException & e)
{
LOG(LINFO, ("GpsTrackFile.Clear has caused exception:", e.Msg()));
m_file.reset();
}
lock_guard<mutex> lg(m_dataGuard);
m_points.clear();
m_needClear = true;
}
if (!m_collection)
return;
auto const evictedIds = m_collection->Clear();
if (evictedIds.first == GpsTrackCollection::kInvalidId)
return; // nothing was removed
if (!m_callback)
return;
m_callback(vector<pair<size_t, location::GpsTrackInfo>>(), evictedIds);
ScheduleTask();
}
void GpsTrack::SetDuration(hours duration)
{
lock_guard<mutex> lg(m_guard);
ASSERT_GREATER(duration.count(), 0, ());
m_duration = duration;
if (!m_collection)
return;
auto const evictedIds = m_collection->SetDuration(duration);
if (evictedIds.first == GpsTrackCollection::kInvalidId)
return; // nothing was removed
if (!m_callback)
return;
m_callback(vector<pair<size_t, location::GpsTrackInfo>>(), evictedIds);
{
lock_guard<mutex> lg(m_dataGuard);
if (m_duration == duration)
return;
m_duration = duration;
}
if (HasCallback())
ScheduleTask();
}
hours GpsTrack::GetDuration() const
{
lock_guard<mutex> lg(m_guard);
lock_guard<mutex> lg(m_dataGuard);
return m_duration;
}
void GpsTrack::SetCallback(TGpsTrackDiffCallback callback)
{
lock_guard<mutex> lg(m_guard);
{
lock_guard<mutex> lg(m_callbackGuard);
m_callback = callback;
m_needSendSnapshop = true;
}
ScheduleTask();
}
m_callback = callback;
void GpsTrack::ScheduleTask()
{
lock_guard<mutex> lg(m_threadGuard);
if (!callback)
return;
if (m_thread.get_id() == thread::id())
{
m_thread = thread([this]()
{
unique_lock<mutex> ul(m_threadGuard);
while (true)
{
m_cv.wait(ul, [this]()->bool{ return m_threadExit || m_threadWakeup; });
if (m_threadExit)
break;
m_threadWakeup = false;
ProcessPoints();
}
LazyInitCollection();
CloseFile();
});
}
SendInitialSnapshot();
m_threadWakeup = true;
m_cv.notify_one();
}
void GpsTrack::LazyInitFile()
{
// Must be called under m_guard lock
if (m_file)
return;
@ -207,9 +157,9 @@ void GpsTrack::LazyInitFile()
// Open or create gps track file
try
{
if (!m_file->Open(m_filePath, kGpsFileMaxItemCount))
if (!m_file->Open(m_filePath, m_maxItemCount))
{
if (!m_file->Create(m_filePath, kGpsFileMaxItemCount))
if (!m_file->Create(m_filePath, m_maxItemCount))
{
LOG(LINFO, ("Cannot open or create GpsTrackFile:", m_filePath));
m_file.reset();
@ -224,10 +174,11 @@ void GpsTrack::LazyInitFile()
{
// File has been corrupted.
// Drop any data from the file.
// If file exception happens, then drop file.
try
{
LOG(LINFO, ("File is corrupted, create new:", m_filePath));
if (!m_file->Create(m_filePath, kGpsFileMaxItemCount))
if (!m_file->Create(m_filePath, m_maxItemCount))
{
LOG(LINFO, ("Cannot create GpsTrackFile:", m_filePath));
m_file.reset();
@ -250,24 +201,41 @@ void GpsTrack::LazyInitFile()
}
}
void GpsTrack::LazyInitCollection()
void GpsTrack::CloseFile()
{
// Must be called under m_guard lock
if (m_collection)
if (!m_file)
return;
m_collection = make_unique<GpsTrackCollection>(kGpsCollectionMaxItemCount, m_duration);
try
{
m_file->Close();
m_file.reset();
}
catch (RootException & e)
{
LOG(LINFO, ("GpsTrackFile.Close has caused exception:", e.Msg()));
m_file.reset();
}
}
void GpsTrack::InitCollection(hours duration)
{
ASSERT(m_collection == nullptr, ());
m_collection = make_unique<GpsTrackCollection>(m_maxItemCount, duration);
LazyInitFile();
if (!m_file)
return;
// Read points from the file
// If CorruptedFileException happens, the clear the file
// If file exception happens, then drop file.
try
{
// Read points from file to the collection
m_file->ForEach([this](location::GpsTrackInfo const & info, size_t /* id */)->bool
m_file->ForEach([this](TItem const & info, size_t /* id */)->bool
{
pair<size_t, size_t> evictedIds;
m_collection->Add(info, evictedIds);
@ -290,30 +258,147 @@ void GpsTrack::LazyInitCollection()
}
}
void GpsTrack::SendInitialSnapshot()
void GpsTrack::ProcessPoints()
{
// Must be called under m_guard lock
vector<TItem> points;
hours duration;
bool needClear;
// Steal data for processing
{
lock_guard<mutex> lg(m_dataGuard);
points.swap(m_points);
duration = m_duration;
needClear = m_needClear;
m_needClear = false;
}
// Create collection only if callback appears
if (!m_collection && HasCallback())
InitCollection(duration);
UpdateFile(needClear, points);
if (!m_collection)
return;
UpdateCollection(duration, needClear, points);
}
bool GpsTrack::HasCallback()
{
lock_guard<mutex> lg(m_callbackGuard);
return m_callback != nullptr;
}
void GpsTrack::UpdateFile(bool needClear, vector<TItem> const & points)
{
// Update file, if need
// If file exception happens, then drop the file.
LazyInitFile();
if (!m_file)
return;
try
{
// clear points from file, if need
if (needClear)
m_file->Clear();
// add points to file if need
for (auto const & point : points)
{
size_t evictedId;
m_file->Append(point, evictedId);
}
}
catch (RootException & e)
{
LOG(LINFO, ("GpsTrackFile.Append has caused exception:", e.Msg()));
m_file.reset();
}
}
void GpsTrack::UpdateCollection(hours duration, bool needClear, vector<TItem> const & points)
{
// Apply Clear, SetDuration and Add points
// Clear points from collection, if need.
pair<size_t, size_t> evictedIdsByClear = make_pair(kInvalidId, kInvalidId);
if (needClear)
evictedIdsByClear = m_collection->Clear(false /* resetIds */);
// Set duration for collection, if need
// Set duration before Add because new duration can be more than previous value.
pair<size_t, size_t> evictedIdsByDuration = make_pair(kInvalidId, kInvalidId);
if (duration != m_collection->GetDuration())
evictedIdsByDuration = m_collection->SetDuration(duration);
// Add points to the collection, if need
pair<size_t, size_t> evictedIds = make_pair(kInvalidId, kInvalidId);
pair<size_t, size_t> addedIds = make_pair(kInvalidId, kInvalidId);
if (!points.empty())
addedIds = m_collection->Add(points, evictedIds);
// Result evicted is
evictedIds = UnionRanges(evictedIds, UnionRanges(evictedIdsByClear, evictedIdsByDuration));
// Send callback notification.
// Callback must be protected by m_callbackGuard
lock_guard<mutex> lg(m_callbackGuard);
if (!m_callback)
return;
// Get points from collection to send them to the callback
vector<pair<size_t, location::GpsTrackInfo>> toAdd;
toAdd.reserve(m_collection->GetSize());
m_collection->ForEach([&toAdd](location::GpsTrackInfo const & info, size_t id)->bool
if (m_needSendSnapshop)
{
toAdd.emplace_back(id, info);
return true;
});
m_needSendSnapshop = false;
if (toAdd.empty())
return; // nothing to send
// Get all points from collection to send them to the callback
vector<pair<size_t, TItem>> toAdd;
toAdd.reserve(m_collection->GetSize());
m_collection->ForEach([&toAdd](TItem const & point, size_t id)->bool
{
toAdd.emplace_back(id, point);
return true;
});
m_callback(move(toAdd), make_pair(kInvalidId, kInvalidId));
if (toAdd.empty())
return; // nothing to send
m_callback(move(toAdd), make_pair(kInvalidId, kInvalidId));
}
else
{
vector<pair<size_t, TItem>> toAdd;
if (addedIds.first != kInvalidId)
{
size_t const addedCount = addedIds.second - addedIds.first + 1;
ASSERT_GREATER_OR_EQUAL(m_collection->GetSize(), addedCount, ());
// Not all points from infos could be added to collection due to timestamp consequence restriction.
// Get added points from collection - take last <addedCount> points from collection, these points
// were added this time.
toAdd.reserve(addedCount);
m_collection->ForEach([&toAdd](TItem const & point, size_t id)->bool
{
toAdd.emplace_back(id, point);
return true;
}, m_collection->GetSize() - addedCount);
ASSERT_EQUAL(toAdd.size(), addedCount, ());
}
if (toAdd.empty() && evictedIds.first == kInvalidId)
return; // nothing to send
m_callback(move(toAdd), evictedIds);
}
}
GpsTrack & GetDefaultGpsTrack()
{
static GpsTrack instance(my::JoinFoldersToPath(GetPlatform().WritableDir(), GPS_TRACK_FILENAME));
static GpsTrack instance(my::JoinFoldersToPath(GetPlatform().WritableDir(), GPS_TRACK_FILENAME), kMaxItemCount, kDefaultDuration);
return instance;
}

View file

@ -3,7 +3,9 @@
#include "map/gps_track_collection.hpp"
#include "map/gps_track_file.hpp"
#include "std/condition_variable.hpp"
#include "std/mutex.hpp"
#include "std/thread.hpp"
#include "std/unique_ptr.hpp"
class GpsTrack final
@ -11,13 +13,21 @@ class GpsTrack final
public:
static size_t const kInvalidId; // = numeric_limits<size_t>::max();
GpsTrack(string const & filePath);
using TItem = location::GpsTrackInfo;
/// @param filePath - path to the file on disk to persist track
GpsTrack(string const & filePath, size_t maxItemCount, hours duration);
~GpsTrack();
/// Adds point or collection of points to gps tracking
void AddPoint(location::GpsTrackInfo const & point);
void AddPoints(vector<location::GpsTrackInfo> const & points);
/// @note Callback is called with 'toAdd' and 'toRemove' points, if some points were added or removed.
/// @note Only points with good timestamp will be added, other will be skipped.
void AddPoint(TItem const & point);
void AddPoints(vector<TItem> const & points);
/// Clears any previous tracking info
/// @note Callback is called with 'toRemove' points, if some points were removed.
void Clear();
/// Sets tracking duration in hours.
@ -31,8 +41,8 @@ public:
/// Notification callback about a change of the gps track.
/// @param toAdd - collection of points and ids to add.
/// @param toRemove - range of point indices to remove, or pair(kInvalidId,kInvalidId) if nothing to remove
/// @note Calling of a GpsTrack's function from the callback causes deadlock.
using TGpsTrackDiffCallback = std::function<void(vector<pair<size_t, location::GpsTrackInfo>> && toAdd,
/// @note Calling of a GpsTrack.SetCallback function from the callback causes deadlock.
using TGpsTrackDiffCallback = std::function<void(vector<pair<size_t, TItem>> && toAdd,
pair<size_t, size_t> const & toRemove)>;
/// Sets callback on change of gps track.
@ -43,21 +53,38 @@ public:
void SetCallback(TGpsTrackDiffCallback callback);
private:
void ScheduleTask();
void ProcessPoints(); // called on the worker thread
bool HasCallback();
void LazyInitFile();
void LazyInitCollection();
void SendInitialSnapshot();
void CloseFile();
void InitCollection(hours duration);
void UpdateFile(bool needClear, vector<TItem> const & points);
void UpdateCollection(hours duration, bool needClear, vector<TItem> const & points);
size_t const m_maxItemCount;
string const m_filePath;
mutable mutex m_guard;
mutable mutex m_dataGuard; // protects data for stealing
vector<TItem> m_points; // accumulated points to adding
hours m_duration;
bool m_needClear; // need clear file
unique_ptr<GpsTrackFile> m_file;
unique_ptr<GpsTrackCollection> m_collection;
mutex m_callbackGuard;
// Callback is protected by m_callbackGuard. It ensures that SetCallback and call callback
// will not be interleaved and after SetCallback(null) callbakc is never called. The negative side
// is that GpsTrack.SetCallback must be never called from the callback.
TGpsTrackDiffCallback m_callback;
bool m_needSendSnapshop; // need send initial snapshot
unique_ptr<GpsTrackFile> m_file; // used in the worker thread
unique_ptr<GpsTrackCollection> m_collection; // used in the worker thread
mutex m_threadGuard;
thread m_thread;
bool m_threadExit; // need exit thread
bool m_threadWakeup; // need wakeup thread
condition_variable m_cv;
};
GpsTrack & GetDefaultGpsTrack();

View file

@ -0,0 +1,147 @@
#include "testing/testing.hpp"
#include "map/gps_track.hpp"
#include "platform/platform.hpp"
#include "coding/file_name_utils.hpp"
#include "coding/file_writer.hpp"
#include "geometry/latlon.hpp"
#include "base/logging.hpp"
#include "base/scope_guard.hpp"
#include "std/bind.hpp"
#include "std/chrono.hpp"
#include "defines.hpp"
namespace
{
inline location::GpsTrackInfo Make(double timestamp, ms::LatLon const & ll, double speed)
{
location::GpsTrackInfo info;
info.m_timestamp = timestamp;
info.m_speed = speed;
info.m_latitude = ll.lat;
info.m_longitude = ll.lon;
return info;
}
inline string GetGpsTrackFilePath()
{
return my::JoinFoldersToPath(GetPlatform().WritableDir(), GPS_TRACK_FILENAME);
}
class GpsTrackCallback
{
public:
GpsTrackCallback()
: m_toRemove(make_pair(GpsTrack::kInvalidId, GpsTrack::kInvalidId))
, m_gotCallback(false)
{
}
void OnUpdate(vector<pair<size_t, location::GpsTrackInfo>> && toAdd,
pair<size_t, size_t> const & toRemove)
{
m_toAdd = move(toAdd);
m_toRemove = toRemove;
lock_guard<mutex> lg(m_mutex);
m_gotCallback = true;
m_cv.notify_one();
}
void Reset()
{
m_toAdd.clear();
m_toRemove = make_pair(GpsTrack::kInvalidId, GpsTrack::kInvalidId);
lock_guard<mutex> lg(m_mutex);
m_gotCallback = false;
}
bool WaitForCallback(seconds t)
{
unique_lock<mutex> ul(m_mutex);
return m_cv.wait_for(ul, t, [this]()->bool{ return m_gotCallback; });
}
vector<pair<size_t, location::GpsTrackInfo>> m_toAdd;
pair<size_t, size_t> m_toRemove;
private:
mutex m_mutex;
condition_variable m_cv;
bool m_gotCallback;
};
seconds const kWaitForCallbackTimeout = seconds(5);
} // namespace
UNIT_TEST(GpsTrack_Simple)
{
string const filePath = GetGpsTrackFilePath();
MY_SCOPE_GUARD(gpsTestFileDeleter, bind(FileWriter::DeleteFileX, filePath));
time_t const t = system_clock::to_time_t(system_clock::now());
double const timestamp = t;
LOG(LINFO, ("Timestamp", ctime(&t), timestamp));
size_t const maxItemCount = 100000;
size_t const writeItemCount = 50000;
vector<location::GpsTrackInfo> points;
points.reserve(writeItemCount);
for (size_t i = 0; i < writeItemCount; ++i)
points.emplace_back(Make(timestamp + i, ms::LatLon(-90 + i, -180 + i), 10 + i));
// Store points
{
GpsTrack track(filePath, maxItemCount, hours(24));
track.AddPoints(points);
GpsTrackCallback callback;
track.SetCallback(bind(&GpsTrackCallback::OnUpdate, &callback, _1, _2));
TEST(callback.WaitForCallback(kWaitForCallbackTimeout), ());
TEST_EQUAL(callback.m_toRemove.first, GpsTrack::kInvalidId, ());
TEST_EQUAL(callback.m_toRemove.second, GpsTrack::kInvalidId, ());
TEST_EQUAL(callback.m_toAdd.size(), writeItemCount, ());
for (size_t i = 0; i < writeItemCount; ++i)
{
TEST_EQUAL(i, callback.m_toAdd[i].first, ());
TEST_EQUAL(points[i].m_timestamp, callback.m_toAdd[i].second.m_timestamp, ());
TEST_EQUAL(points[i].m_speed, callback.m_toAdd[i].second.m_speed, ());
TEST_EQUAL(points[i].m_latitude, callback.m_toAdd[i].second.m_latitude, ());
TEST_EQUAL(points[i].m_longitude, callback.m_toAdd[i].second.m_longitude, ());
}
}
// Restore points
{
GpsTrack track(filePath, maxItemCount, hours(24));
GpsTrackCallback callback;
track.SetCallback(bind(&GpsTrackCallback::OnUpdate, &callback, _1, _2));
TEST(callback.WaitForCallback(kWaitForCallbackTimeout), ());
TEST_EQUAL(callback.m_toRemove.first, GpsTrack::kInvalidId, ());
TEST_EQUAL(callback.m_toRemove.second, GpsTrack::kInvalidId, ());
TEST_EQUAL(callback.m_toAdd.size(), writeItemCount, ());
for (size_t i = 0; i < writeItemCount; ++i)
{
TEST_EQUAL(i, callback.m_toAdd[i].first, ());
TEST_EQUAL(points[i].m_timestamp, callback.m_toAdd[i].second.m_timestamp, ());
TEST_EQUAL(points[i].m_speed, callback.m_toAdd[i].second.m_speed, ());
TEST_EQUAL(points[i].m_latitude, callback.m_toAdd[i].second.m_latitude, ());
TEST_EQUAL(points[i].m_longitude, callback.m_toAdd[i].second.m_longitude, ());
}
}
}

View file

@ -33,6 +33,7 @@ SOURCES += \
geourl_test.cpp \
gps_track_collection_test.cpp \
gps_track_file_test.cpp \
gps_track_test.cpp \
kmz_unarchive_test.cpp \
mwm_url_tests.cpp \