From e17a9e0af54a264ad9fc664d6c8fedf2d482f348 Mon Sep 17 00:00:00 2001 From: Constantin Shalnev Date: Wed, 9 Dec 2015 12:38:39 +0300 Subject: [PATCH] Gps track writes/reads data in worker thread --- map/gps_track.cpp | 405 +++++++++++++++++++------------ map/gps_track.hpp | 53 +++- map/map_tests/gps_track_test.cpp | 147 +++++++++++ map/map_tests/map_tests.pro | 1 + 4 files changed, 433 insertions(+), 173 deletions(-) create mode 100644 map/map_tests/gps_track_test.cpp diff --git a/map/gps_track.cpp b/map/gps_track.cpp index 13e103ca7f..34fcf3afa3 100644 --- a/map/gps_track.cpp +++ b/map/gps_track.cpp @@ -12,193 +12,143 @@ namespace { -size_t const kGpsCollectionMaxItemCount = 100000; -size_t const kGpsFileMaxItemCount = 100000; +size_t const kMaxItemCount = 100000; hours const kDefaultDuration = hours(24); +inline pair UnionRanges(pair const & a, pair const & b) +{ + if (a.first == GpsTrack::kInvalidId) + { + ASSERT_EQUAL(a.second, GpsTrack::kInvalidId, ()); + return b; + } + if (b.first == GpsTrack::kInvalidId) + { + ASSERT_EQUAL(b.second, GpsTrack::kInvalidId, ()); + return a; + } + ASSERT_LESS_OR_EQUAL(a.first, a.second, ()); + ASSERT_LESS_OR_EQUAL(b.first, b.second, ()); + return make_pair(min(a.first, b.first), max(a.second, b.second)); +} + } // namespace size_t const GpsTrack::kInvalidId = GpsTrackCollection::kInvalidId; -GpsTrack::GpsTrack(string const & filePath) - : m_filePath(filePath) - , m_duration(kDefaultDuration) +GpsTrack::GpsTrack(string const & filePath, size_t maxItemCount, hours duration) + : m_maxItemCount(maxItemCount) + , m_filePath(filePath) + , m_duration(duration) + , m_needClear(false) + , m_needSendSnapshop(false) + , m_threadExit(false) + , m_threadWakeup(false) { + ASSERT_GREATER(m_maxItemCount, 0, ()); + ASSERT(!m_filePath.empty(), ()); + ASSERT_GREATER(m_duration.count(), 0, ()); } -void GpsTrack::AddPoint(location::GpsTrackInfo const & info) +GpsTrack::~GpsTrack() { - lock_guard lg(m_guard); - - LazyInitFile(); - - // Write point to the file. - // If file exception happens, then drop file. - if (m_file) + if (m_thread.joinable()) { - try { - size_t evictedId; - m_file->Append(info, evictedId); - } - catch (RootException & e) - { - LOG(LINFO, ("GpsTrackFile.Append has caused exception:", e.Msg())); - m_file.reset(); + lock_guard lg(m_threadGuard); + m_threadExit = true; + m_cv.notify_one(); } + m_thread.join(); } - - if (!m_collection) - return; - - // Write point to the collection - pair evictedIds; - size_t const addedId = m_collection->Add(info, evictedIds); - if (addedId == GpsTrackCollection::kInvalidId) - return; // nothing was added - - if (!m_callback) - return; - - vector> toAdd; - toAdd.emplace_back(addedId, info); - - m_callback(move(toAdd), move(evictedIds)); } -void GpsTrack::AddPoints(vector const & points) +void GpsTrack::AddPoint(TItem const & point) { - if (points.empty()) - return; - - lock_guard lg(m_guard); - - LazyInitFile(); - - // Write point to the file. - // If file exception happens, then drop file. - if (m_file) { - try - { - for (auto const & point : points) - { - size_t evictedId; - m_file->Append(point, evictedId); - } - } - catch (RootException & e) - { - LOG(LINFO, ("GpsTrackFile.Append has caused exception:", e.Msg())); - m_file.reset(); - } + lock_guard lg(m_dataGuard); + m_points.emplace_back(point); } + ScheduleTask(); +} - if (!m_collection) - return; - - // Add points - pair evictedIds; - pair const addedIds = m_collection->Add(points, evictedIds); - if (addedIds.first == GpsTrackCollection::kInvalidId) - return; // nothing was added - - if (!m_callback) - return; - - size_t const addedCount = addedIds.second - addedIds.first + 1; - ASSERT_GREATER_OR_EQUAL(m_collection->GetSize(), addedCount, ()); - - // Not all points from infos could be added to collection due to timestamp consequence restriction. - // Get added points from collection. - vector> toAdd; - toAdd.reserve(addedCount); - m_collection->ForEach([&toAdd](location::GpsTrackInfo const & point, size_t id)->bool +void GpsTrack::AddPoints(vector const & points) +{ { - toAdd.emplace_back(id, point); - return true; - }, m_collection->GetSize() - addedCount); - ASSERT_EQUAL(toAdd.size(), addedCount, ()); - - m_callback(move(toAdd), evictedIds); + lock_guard lg(m_dataGuard); + m_points.insert(m_points.end(), points.begin(), points.end()); + } + ScheduleTask(); } void GpsTrack::Clear() { - lock_guard lg(m_guard); - - LazyInitFile(); - - if (m_file) { - try - { - m_file->Clear(); - } - catch (RootException & e) - { - LOG(LINFO, ("GpsTrackFile.Clear has caused exception:", e.Msg())); - m_file.reset(); - } + lock_guard lg(m_dataGuard); + m_points.clear(); + m_needClear = true; } - - if (!m_collection) - return; - - auto const evictedIds = m_collection->Clear(); - if (evictedIds.first == GpsTrackCollection::kInvalidId) - return; // nothing was removed - - if (!m_callback) - return; - - m_callback(vector>(), evictedIds); + ScheduleTask(); } void GpsTrack::SetDuration(hours duration) { - lock_guard lg(m_guard); + ASSERT_GREATER(duration.count(), 0, ()); - m_duration = duration; - - if (!m_collection) - return; - - auto const evictedIds = m_collection->SetDuration(duration); - if (evictedIds.first == GpsTrackCollection::kInvalidId) - return; // nothing was removed - - if (!m_callback) - return; - - m_callback(vector>(), evictedIds); + { + lock_guard lg(m_dataGuard); + if (m_duration == duration) + return; + m_duration = duration; + } + if (HasCallback()) + ScheduleTask(); } hours GpsTrack::GetDuration() const { - lock_guard lg(m_guard); - + lock_guard lg(m_dataGuard); return m_duration; } void GpsTrack::SetCallback(TGpsTrackDiffCallback callback) { - lock_guard lg(m_guard); + { + lock_guard lg(m_callbackGuard); + m_callback = callback; + m_needSendSnapshop = true; + } + ScheduleTask(); +} - m_callback = callback; +void GpsTrack::ScheduleTask() +{ + lock_guard lg(m_threadGuard); - if (!callback) - return; + if (m_thread.get_id() == thread::id()) + { + m_thread = thread([this]() + { + unique_lock ul(m_threadGuard); + while (true) + { + m_cv.wait(ul, [this]()->bool{ return m_threadExit || m_threadWakeup; }); + if (m_threadExit) + break; + m_threadWakeup = false; + ProcessPoints(); + } - LazyInitCollection(); + CloseFile(); + }); + } - SendInitialSnapshot(); + m_threadWakeup = true; + m_cv.notify_one(); } void GpsTrack::LazyInitFile() { - // Must be called under m_guard lock - if (m_file) return; @@ -207,9 +157,9 @@ void GpsTrack::LazyInitFile() // Open or create gps track file try { - if (!m_file->Open(m_filePath, kGpsFileMaxItemCount)) + if (!m_file->Open(m_filePath, m_maxItemCount)) { - if (!m_file->Create(m_filePath, kGpsFileMaxItemCount)) + if (!m_file->Create(m_filePath, m_maxItemCount)) { LOG(LINFO, ("Cannot open or create GpsTrackFile:", m_filePath)); m_file.reset(); @@ -224,10 +174,11 @@ void GpsTrack::LazyInitFile() { // File has been corrupted. // Drop any data from the file. + // If file exception happens, then drop file. try { LOG(LINFO, ("File is corrupted, create new:", m_filePath)); - if (!m_file->Create(m_filePath, kGpsFileMaxItemCount)) + if (!m_file->Create(m_filePath, m_maxItemCount)) { LOG(LINFO, ("Cannot create GpsTrackFile:", m_filePath)); m_file.reset(); @@ -250,24 +201,41 @@ void GpsTrack::LazyInitFile() } } -void GpsTrack::LazyInitCollection() +void GpsTrack::CloseFile() { - // Must be called under m_guard lock - - if (m_collection) + if (!m_file) return; - m_collection = make_unique(kGpsCollectionMaxItemCount, m_duration); + try + { + m_file->Close(); + m_file.reset(); + } + catch (RootException & e) + { + LOG(LINFO, ("GpsTrackFile.Close has caused exception:", e.Msg())); + m_file.reset(); + } +} + +void GpsTrack::InitCollection(hours duration) +{ + ASSERT(m_collection == nullptr, ()); + + m_collection = make_unique(m_maxItemCount, duration); LazyInitFile(); if (!m_file) return; + // Read points from the file + // If CorruptedFileException happens, the clear the file + // If file exception happens, then drop file. try { // Read points from file to the collection - m_file->ForEach([this](location::GpsTrackInfo const & info, size_t /* id */)->bool + m_file->ForEach([this](TItem const & info, size_t /* id */)->bool { pair evictedIds; m_collection->Add(info, evictedIds); @@ -290,30 +258,147 @@ void GpsTrack::LazyInitCollection() } } -void GpsTrack::SendInitialSnapshot() +void GpsTrack::ProcessPoints() { - // Must be called under m_guard lock + vector points; + hours duration; + bool needClear; + // Steal data for processing + { + lock_guard lg(m_dataGuard); + points.swap(m_points); + duration = m_duration; + needClear = m_needClear; + m_needClear = false; + } + + // Create collection only if callback appears + if (!m_collection && HasCallback()) + InitCollection(duration); + + UpdateFile(needClear, points); + + if (!m_collection) + return; + + UpdateCollection(duration, needClear, points); +} + +bool GpsTrack::HasCallback() +{ + lock_guard lg(m_callbackGuard); + return m_callback != nullptr; +} + +void GpsTrack::UpdateFile(bool needClear, vector const & points) +{ + // Update file, if need + // If file exception happens, then drop the file. + + LazyInitFile(); + + if (!m_file) + return; + + try + { + // clear points from file, if need + if (needClear) + m_file->Clear(); + + // add points to file if need + for (auto const & point : points) + { + size_t evictedId; + m_file->Append(point, evictedId); + } + } + catch (RootException & e) + { + LOG(LINFO, ("GpsTrackFile.Append has caused exception:", e.Msg())); + m_file.reset(); + } +} + +void GpsTrack::UpdateCollection(hours duration, bool needClear, vector const & points) +{ + // Apply Clear, SetDuration and Add points + + // Clear points from collection, if need. + pair evictedIdsByClear = make_pair(kInvalidId, kInvalidId); + if (needClear) + evictedIdsByClear = m_collection->Clear(false /* resetIds */); + + // Set duration for collection, if need + // Set duration before Add because new duration can be more than previous value. + pair evictedIdsByDuration = make_pair(kInvalidId, kInvalidId); + if (duration != m_collection->GetDuration()) + evictedIdsByDuration = m_collection->SetDuration(duration); + + // Add points to the collection, if need + pair evictedIds = make_pair(kInvalidId, kInvalidId); + pair addedIds = make_pair(kInvalidId, kInvalidId); + if (!points.empty()) + addedIds = m_collection->Add(points, evictedIds); + + // Result evicted is + evictedIds = UnionRanges(evictedIds, UnionRanges(evictedIdsByClear, evictedIdsByDuration)); + + // Send callback notification. + // Callback must be protected by m_callbackGuard + + lock_guard lg(m_callbackGuard); if (!m_callback) return; - // Get points from collection to send them to the callback - vector> toAdd; - toAdd.reserve(m_collection->GetSize()); - m_collection->ForEach([&toAdd](location::GpsTrackInfo const & info, size_t id)->bool + if (m_needSendSnapshop) { - toAdd.emplace_back(id, info); - return true; - }); + m_needSendSnapshop = false; - if (toAdd.empty()) - return; // nothing to send + // Get all points from collection to send them to the callback + vector> toAdd; + toAdd.reserve(m_collection->GetSize()); + m_collection->ForEach([&toAdd](TItem const & point, size_t id)->bool + { + toAdd.emplace_back(id, point); + return true; + }); - m_callback(move(toAdd), make_pair(kInvalidId, kInvalidId)); + if (toAdd.empty()) + return; // nothing to send + + m_callback(move(toAdd), make_pair(kInvalidId, kInvalidId)); + } + else + { + vector> toAdd; + if (addedIds.first != kInvalidId) + { + size_t const addedCount = addedIds.second - addedIds.first + 1; + ASSERT_GREATER_OR_EQUAL(m_collection->GetSize(), addedCount, ()); + + // Not all points from infos could be added to collection due to timestamp consequence restriction. + // Get added points from collection - take last points from collection, these points + // were added this time. + toAdd.reserve(addedCount); + m_collection->ForEach([&toAdd](TItem const & point, size_t id)->bool + { + toAdd.emplace_back(id, point); + return true; + }, m_collection->GetSize() - addedCount); + ASSERT_EQUAL(toAdd.size(), addedCount, ()); + } + + if (toAdd.empty() && evictedIds.first == kInvalidId) + return; // nothing to send + + m_callback(move(toAdd), evictedIds); + } } GpsTrack & GetDefaultGpsTrack() { - static GpsTrack instance(my::JoinFoldersToPath(GetPlatform().WritableDir(), GPS_TRACK_FILENAME)); + static GpsTrack instance(my::JoinFoldersToPath(GetPlatform().WritableDir(), GPS_TRACK_FILENAME), kMaxItemCount, kDefaultDuration); return instance; } diff --git a/map/gps_track.hpp b/map/gps_track.hpp index bcc52a10ce..d4f60f1daf 100644 --- a/map/gps_track.hpp +++ b/map/gps_track.hpp @@ -3,7 +3,9 @@ #include "map/gps_track_collection.hpp" #include "map/gps_track_file.hpp" +#include "std/condition_variable.hpp" #include "std/mutex.hpp" +#include "std/thread.hpp" #include "std/unique_ptr.hpp" class GpsTrack final @@ -11,13 +13,21 @@ class GpsTrack final public: static size_t const kInvalidId; // = numeric_limits::max(); - GpsTrack(string const & filePath); + using TItem = location::GpsTrackInfo; + + /// @param filePath - path to the file on disk to persist track + GpsTrack(string const & filePath, size_t maxItemCount, hours duration); + + ~GpsTrack(); /// Adds point or collection of points to gps tracking - void AddPoint(location::GpsTrackInfo const & point); - void AddPoints(vector const & points); + /// @note Callback is called with 'toAdd' and 'toRemove' points, if some points were added or removed. + /// @note Only points with good timestamp will be added, other will be skipped. + void AddPoint(TItem const & point); + void AddPoints(vector const & points); /// Clears any previous tracking info + /// @note Callback is called with 'toRemove' points, if some points were removed. void Clear(); /// Sets tracking duration in hours. @@ -31,8 +41,8 @@ public: /// Notification callback about a change of the gps track. /// @param toAdd - collection of points and ids to add. /// @param toRemove - range of point indices to remove, or pair(kInvalidId,kInvalidId) if nothing to remove - /// @note Calling of a GpsTrack's function from the callback causes deadlock. - using TGpsTrackDiffCallback = std::function> && toAdd, + /// @note Calling of a GpsTrack.SetCallback function from the callback causes deadlock. + using TGpsTrackDiffCallback = std::function> && toAdd, pair const & toRemove)>; /// Sets callback on change of gps track. @@ -43,21 +53,38 @@ public: void SetCallback(TGpsTrackDiffCallback callback); private: + void ScheduleTask(); + void ProcessPoints(); // called on the worker thread + bool HasCallback(); void LazyInitFile(); - void LazyInitCollection(); - void SendInitialSnapshot(); + void CloseFile(); + void InitCollection(hours duration); + void UpdateFile(bool needClear, vector const & points); + void UpdateCollection(hours duration, bool needClear, vector const & points); + size_t const m_maxItemCount; string const m_filePath; - mutable mutex m_guard; - + mutable mutex m_dataGuard; // protects data for stealing + vector m_points; // accumulated points to adding hours m_duration; + bool m_needClear; // need clear file - unique_ptr m_file; - - unique_ptr m_collection; - + mutex m_callbackGuard; + // Callback is protected by m_callbackGuard. It ensures that SetCallback and call callback + // will not be interleaved and after SetCallback(null) callbakc is never called. The negative side + // is that GpsTrack.SetCallback must be never called from the callback. TGpsTrackDiffCallback m_callback; + bool m_needSendSnapshop; // need send initial snapshot + + unique_ptr m_file; // used in the worker thread + unique_ptr m_collection; // used in the worker thread + + mutex m_threadGuard; + thread m_thread; + bool m_threadExit; // need exit thread + bool m_threadWakeup; // need wakeup thread + condition_variable m_cv; }; GpsTrack & GetDefaultGpsTrack(); diff --git a/map/map_tests/gps_track_test.cpp b/map/map_tests/gps_track_test.cpp new file mode 100644 index 0000000000..2513558e05 --- /dev/null +++ b/map/map_tests/gps_track_test.cpp @@ -0,0 +1,147 @@ +#include "testing/testing.hpp" + +#include "map/gps_track.hpp" + +#include "platform/platform.hpp" + +#include "coding/file_name_utils.hpp" +#include "coding/file_writer.hpp" + +#include "geometry/latlon.hpp" + +#include "base/logging.hpp" +#include "base/scope_guard.hpp" + +#include "std/bind.hpp" +#include "std/chrono.hpp" + +#include "defines.hpp" + +namespace +{ + +inline location::GpsTrackInfo Make(double timestamp, ms::LatLon const & ll, double speed) +{ + location::GpsTrackInfo info; + info.m_timestamp = timestamp; + info.m_speed = speed; + info.m_latitude = ll.lat; + info.m_longitude = ll.lon; + return info; +} + +inline string GetGpsTrackFilePath() +{ + return my::JoinFoldersToPath(GetPlatform().WritableDir(), GPS_TRACK_FILENAME); +} + +class GpsTrackCallback +{ +public: + GpsTrackCallback() + : m_toRemove(make_pair(GpsTrack::kInvalidId, GpsTrack::kInvalidId)) + , m_gotCallback(false) + { + } + void OnUpdate(vector> && toAdd, + pair const & toRemove) + { + m_toAdd = move(toAdd); + m_toRemove = toRemove; + + lock_guard lg(m_mutex); + m_gotCallback = true; + m_cv.notify_one(); + } + void Reset() + { + m_toAdd.clear(); + m_toRemove = make_pair(GpsTrack::kInvalidId, GpsTrack::kInvalidId); + + lock_guard lg(m_mutex); + m_gotCallback = false; + } + bool WaitForCallback(seconds t) + { + unique_lock ul(m_mutex); + return m_cv.wait_for(ul, t, [this]()->bool{ return m_gotCallback; }); + } + + vector> m_toAdd; + pair m_toRemove; + +private: + mutex m_mutex; + condition_variable m_cv; + bool m_gotCallback; +}; + +seconds const kWaitForCallbackTimeout = seconds(5); + +} // namespace + +UNIT_TEST(GpsTrack_Simple) +{ + string const filePath = GetGpsTrackFilePath(); + MY_SCOPE_GUARD(gpsTestFileDeleter, bind(FileWriter::DeleteFileX, filePath)); + + time_t const t = system_clock::to_time_t(system_clock::now()); + double const timestamp = t; + LOG(LINFO, ("Timestamp", ctime(&t), timestamp)); + + size_t const maxItemCount = 100000; + size_t const writeItemCount = 50000; + + vector points; + points.reserve(writeItemCount); + for (size_t i = 0; i < writeItemCount; ++i) + points.emplace_back(Make(timestamp + i, ms::LatLon(-90 + i, -180 + i), 10 + i)); + + // Store points + { + GpsTrack track(filePath, maxItemCount, hours(24)); + + track.AddPoints(points); + + GpsTrackCallback callback; + + track.SetCallback(bind(&GpsTrackCallback::OnUpdate, &callback, _1, _2)); + + TEST(callback.WaitForCallback(kWaitForCallbackTimeout), ()); + + TEST_EQUAL(callback.m_toRemove.first, GpsTrack::kInvalidId, ()); + TEST_EQUAL(callback.m_toRemove.second, GpsTrack::kInvalidId, ()); + TEST_EQUAL(callback.m_toAdd.size(), writeItemCount, ()); + for (size_t i = 0; i < writeItemCount; ++i) + { + TEST_EQUAL(i, callback.m_toAdd[i].first, ()); + TEST_EQUAL(points[i].m_timestamp, callback.m_toAdd[i].second.m_timestamp, ()); + TEST_EQUAL(points[i].m_speed, callback.m_toAdd[i].second.m_speed, ()); + TEST_EQUAL(points[i].m_latitude, callback.m_toAdd[i].second.m_latitude, ()); + TEST_EQUAL(points[i].m_longitude, callback.m_toAdd[i].second.m_longitude, ()); + } + } + + // Restore points + { + GpsTrack track(filePath, maxItemCount, hours(24)); + + GpsTrackCallback callback; + + track.SetCallback(bind(&GpsTrackCallback::OnUpdate, &callback, _1, _2)); + + TEST(callback.WaitForCallback(kWaitForCallbackTimeout), ()); + + TEST_EQUAL(callback.m_toRemove.first, GpsTrack::kInvalidId, ()); + TEST_EQUAL(callback.m_toRemove.second, GpsTrack::kInvalidId, ()); + TEST_EQUAL(callback.m_toAdd.size(), writeItemCount, ()); + for (size_t i = 0; i < writeItemCount; ++i) + { + TEST_EQUAL(i, callback.m_toAdd[i].first, ()); + TEST_EQUAL(points[i].m_timestamp, callback.m_toAdd[i].second.m_timestamp, ()); + TEST_EQUAL(points[i].m_speed, callback.m_toAdd[i].second.m_speed, ()); + TEST_EQUAL(points[i].m_latitude, callback.m_toAdd[i].second.m_latitude, ()); + TEST_EQUAL(points[i].m_longitude, callback.m_toAdd[i].second.m_longitude, ()); + } + } +} diff --git a/map/map_tests/map_tests.pro b/map/map_tests/map_tests.pro index 57f280f850..ae2cfd66d6 100644 --- a/map/map_tests/map_tests.pro +++ b/map/map_tests/map_tests.pro @@ -33,6 +33,7 @@ SOURCES += \ geourl_test.cpp \ gps_track_collection_test.cpp \ gps_track_file_test.cpp \ + gps_track_test.cpp \ kmz_unarchive_test.cpp \ mwm_url_tests.cpp \