From a65d86ebcf216bd0480c7f576d0eff60df44a160 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=94=D0=BE=D0=B1=D1=80=D1=8B=D0=B8=CC=86=20=D0=AD=D1=8D?= =?UTF-8?q?=D1=85?= Date: Fri, 7 Oct 2016 17:17:03 +0300 Subject: [PATCH] gps tracking reporter --- coding/traffic.hpp | 8 +- map/framework.cpp | 17 ++ map/framework.hpp | 4 + map/map_tests/map_tests.pro | 2 +- mapshot/mapshot.pro | 2 +- omim.pro | 7 +- platform/platform.pro | 1 + platform/socket.cpp | 122 +++++++++++++++ platform/socket.hpp | 18 ++- qt/qt.pro | 2 +- .../storage_integration_tests.pro | 2 +- tracking/connection.cpp | 44 ++++++ tracking/connection.hpp | 35 +++++ tracking/reporter.cpp | 147 ++++++++++++++++++ tracking/reporter.hpp | 38 +++++ tracking/tracking.pro | 15 ++ tracking/tracking_tests/reporter_tests.cpp | 62 ++++++++ tracking/tracking_tests/tracking_tests.pro | 29 ++++ 18 files changed, 545 insertions(+), 10 deletions(-) create mode 100644 platform/socket.cpp create mode 100644 tracking/connection.cpp create mode 100644 tracking/connection.hpp create mode 100644 tracking/reporter.cpp create mode 100644 tracking/reporter.hpp create mode 100644 tracking/tracking.pro create mode 100644 tracking/tracking_tests/reporter_tests.cpp create mode 100644 tracking/tracking_tests/tracking_tests.pro diff --git a/coding/traffic.hpp b/coding/traffic.hpp index ef4d83aa3d..c3313358a9 100644 --- a/coding/traffic.hpp +++ b/coding/traffic.hpp @@ -37,9 +37,9 @@ public: // Version 0: // Coordinates are truncated and stored as integers. All integers // are written as varints. - template + template static size_t SerializeDataPoints(uint32_t version, Writer & writer, - vector const & points) + Collection const & points) { ASSERT_LESS_OR_EQUAL(version, kLatestVersion, ()); @@ -78,8 +78,8 @@ public: } // Deserializes the points from |source| and appends them to |result|. - template - static void DeserializeDataPoints(uint32_t version, Source & src, vector & result) + template + static void DeserializeDataPoints(uint32_t version, Source & src, Collection & result) { ASSERT_LESS_OR_EQUAL(version, kLatestVersion, ()); diff --git a/map/framework.cpp b/map/framework.cpp index 947796101f..780c055c89 100644 --- a/map/framework.cpp +++ b/map/framework.cpp @@ -60,6 +60,7 @@ #include "platform/platform.hpp" #include "platform/preferred_languages.hpp" #include "platform/settings.hpp" +#include "platform/socket.hpp" #include "coding/internal/file_data.hpp" #include "coding/zip_reader.hpp" @@ -200,6 +201,8 @@ void Framework::OnLocationUpdate(GpsInfo const & info) CallDrapeFunction(bind(&df::DrapeEngine::SetGpsInfo, _1, rInfo, m_routingSession.IsNavigable(), routeMatchingInfo)); + if (IsTrackingReporterEnabled()) + m_trackingReporter.AddLocation(info); } void Framework::OnCompassUpdate(CompassInfo const & info) @@ -224,6 +227,19 @@ void Framework::SetMyPositionModeListener(TMyPositionModeChanged && fn) m_myPositionListener = move(fn); } +bool Framework::IsTrackingReporterEnabled() const +{ + if (m_currentRouterType != routing::RouterType::Vehicle) + return false; + + if (!m_routingSession.IsOnRoute()) + return false; + + bool allowStat = false; + settings::Get(tracking::Reporter::kAllowKey, allowStat); + return allowStat; +} + void Framework::OnUserPositionChanged(m2::PointD const & position) { MyPositionMarkPoint * myPosition = UserMarkContainer::UserMarkForMyPostion(); @@ -314,6 +330,7 @@ Framework::Framework() , m_storage(platform::migrate::NeedMigrate() ? COUNTRIES_OBSOLETE_FILE : COUNTRIES_FILE) , m_bmManager(*this) , m_isRenderingEnabled(true) + , m_trackingReporter(platform::createMockSocket(), tracking::Reporter::kPushDelayMs) , m_displacementModeManager([this](bool show) { int const mode = show ? dp::displacement::kHotelMode : dp::displacement::kDefaultMode; CallDrapeFunction(bind(&df::DrapeEngine::SetDisplacementMode, _1, mode)); diff --git a/map/framework.hpp b/map/framework.hpp index 961794dd16..eee4821557 100644 --- a/map/framework.hpp +++ b/map/framework.hpp @@ -33,6 +33,8 @@ #include "storage/downloading_policy.hpp" #include "storage/storage.hpp" +#include "tracking/reporter.hpp" + #include "partners_api/booking_api.hpp" #include "partners_api/uber_api.hpp" @@ -161,6 +163,7 @@ protected: uber::Api m_uberApi; bool m_isRenderingEnabled; + tracking::Reporter m_trackingReporter; /// This function will be called by m_storage when latest local files /// is downloaded. @@ -398,6 +401,7 @@ public: void SetMyPositionModeListener(location::TMyPositionModeChanged && fn); private: + bool IsTrackingReporterEnabled() const; void OnUserPositionChanged(m2::PointD const & position); //@} diff --git a/map/map_tests/map_tests.pro b/map/map_tests/map_tests.pro index e44ce96790..d476a428ae 100644 --- a/map/map_tests/map_tests.pro +++ b/map/map_tests/map_tests.pro @@ -6,7 +6,7 @@ CONFIG -= app_bundle TEMPLATE = app ROOT_DIR = ../.. -DEPENDENCIES = map drape_frontend routing search storage drape indexer partners_api platform editor geometry coding base \ +DEPENDENCIES = map drape_frontend routing search storage tracking drape indexer partners_api platform editor geometry coding base \ freetype fribidi expat protobuf tomcrypt jansson osrm stats_client minizip succinct pugixml stats_client DEPENDENCIES *= opening_hours diff --git a/mapshot/mapshot.pro b/mapshot/mapshot.pro index c1f37891d9..2a727a6a5a 100644 --- a/mapshot/mapshot.pro +++ b/mapshot/mapshot.pro @@ -1,7 +1,7 @@ # mapshot binary ROOT_DIR = .. -DEPENDENCIES = map drape_frontend routing search storage indexer drape partners_api platform editor geometry coding base \ +DEPENDENCIES = map drape_frontend routing search storage tracking indexer drape partners_api platform editor geometry coding base \ freetype expat fribidi tomcrypt gflags jansson protobuf osrm stats_client minizip succinct \ pugixml opening_hours diff --git a/omim.pro b/omim.pro index 9fc68bdcc5..7678c4009c 100644 --- a/omim.pro +++ b/omim.pro @@ -26,7 +26,7 @@ HEADERS += defines.hpp SUBDIRS = 3party base coding geometry editor indexer routing search !CONFIG(osrm) { - SUBDIRS *= platform stats storage + SUBDIRS *= platform stats storage tracking CONFIG(desktop) { SUBDIRS *= generator @@ -248,5 +248,10 @@ SUBDIRS = 3party base coding geometry editor indexer routing search partners_api_tests.subdir = partners_api/partners_api_tests partners_api_tests.depends = base platform partners_api SUBDIRS *= partners_api_tests + + tracking_tests.subdir = tracking/tracking_tests + tracking_tests.depends = base platform tracking + SUBDIRS *= tracking_tests + } # !no-tests } # !gtool diff --git a/platform/platform.pro b/platform/platform.pro index d05cc1f1e4..c6fb2fd10e 100644 --- a/platform/platform.pro +++ b/platform/platform.pro @@ -105,3 +105,4 @@ SOURCES += \ preferred_languages.cpp \ servers_list.cpp \ settings.cpp \ + socket.cpp \ diff --git a/platform/socket.cpp b/platform/socket.cpp new file mode 100644 index 0000000000..59eb1985a8 --- /dev/null +++ b/platform/socket.cpp @@ -0,0 +1,122 @@ +#include "socket.hpp" + +#include "base/assert.hpp" + +#include "std/algorithm.hpp" +#include "std/deque.hpp" +#include "std/mutex.hpp" + +namespace +{ +class MockSocket final : public platform::Socket +{ +public: + // Socket overrides + bool Open(string const & host, uint16_t port) override { return false; } + void Close() override {} + bool Read(uint8_t * data, uint32_t count) override { return false; } + bool Write(uint8_t const * data, uint32_t count) override { return false; } + void SetTimeout(uint32_t milliseconds) override {} +}; + +class TestSocketImpl final : public platform::TestSocket +{ +public: + // Socket overrides + ~TestSocketImpl(); + bool Open(string const & host, uint16_t port) override; + void Close() override; + bool Read(uint8_t * data, uint32_t count) override; + bool Write(uint8_t const * data, uint32_t count) override; + void SetTimeout(uint32_t milliseconds) override; + + // TestSocket overrides + bool HasInput() const override; + bool HasOutput() const override; + void WriteServer(uint8_t const * data, uint32_t count) override; + size_t ReadServer(vector & destination) override; + +private: + bool m_isConnected = false; + + deque m_input; + mutable mutex m_inputMutex; + + vector m_output; + mutable mutex m_outputMutex; +}; + +TestSocketImpl::~TestSocketImpl() { m_isConnected = false; } + +bool TestSocketImpl::Open(string const & host, uint16_t port) +{ + m_isConnected = true; + return true; +} + +void TestSocketImpl::Close() { m_isConnected = false; } +bool TestSocketImpl::Read(uint8_t * data, uint32_t count) +{ + if (!m_isConnected) + return false; + + lock_guard lg(m_inputMutex); + + if (m_input.size() < count) + return false; + + copy(m_input.begin(), m_input.end(), data); + m_input.erase(m_input.begin(), m_input.begin() + count); + return true; +} + +bool TestSocketImpl::Write(uint8_t const * data, uint32_t count) +{ + if (!m_isConnected) + return false; + + lock_guard lg(m_outputMutex); + m_output.insert(m_output.end(), data, data + count); + return true; +} + +void TestSocketImpl::SetTimeout(uint32_t milliseconds) {} +bool TestSocketImpl::HasInput() const +{ + lock_guard lg(m_inputMutex); + return !m_input.empty(); +} + +bool TestSocketImpl::HasOutput() const +{ + lock_guard lg(m_outputMutex); + return !m_output.empty(); +} + +void TestSocketImpl::WriteServer(uint8_t const * data, uint32_t count) +{ + ASSERT(m_isConnected, ()); + + lock_guard lg(m_inputMutex); + m_input.insert(m_input.end(), data, data + count); +} + +size_t TestSocketImpl::ReadServer(vector & destination) +{ + lock_guard lg(m_outputMutex); + + size_t const outputSize = m_output.size(); + if (outputSize == 0) + return 0; + + destination.insert(destination.end(), m_output.begin(), m_output.end()); + m_output.clear(); + return outputSize; +} +} // namespace + +namespace platform +{ +unique_ptr createMockSocket() { return make_unique(); } +unique_ptr createTestSocket() { return make_unique(); } +} // namespace platform diff --git a/platform/socket.hpp b/platform/socket.hpp index 01f51eb133..41a0d7711e 100644 --- a/platform/socket.hpp +++ b/platform/socket.hpp @@ -1,8 +1,8 @@ #pragma once #include "std/string.hpp" -#include "std/target_os.hpp" #include "std/unique_ptr.hpp" +#include "std/vector.hpp" namespace platform { @@ -26,5 +26,21 @@ public: virtual void SetTimeout(uint32_t milliseconds) = 0; }; +class TestSocket : public Socket +{ +public: + virtual bool HasInput() const = 0; + virtual bool HasOutput() const = 0; + + // Simulate server writing + virtual void WriteServer(uint8_t const * data, uint32_t count) = 0; + + // Simulate server reading + // returns size of read data + virtual size_t ReadServer(vector & destination) = 0; +}; + unique_ptr createSocket(); +unique_ptr createMockSocket(); +unique_ptr createTestSocket(); } // namespace platform diff --git a/qt/qt.pro b/qt/qt.pro index 1667db7068..e9a6d23bf7 100644 --- a/qt/qt.pro +++ b/qt/qt.pro @@ -1,6 +1,6 @@ # Main application in qt. ROOT_DIR = .. -DEPENDENCIES = map drape_frontend routing search storage indexer drape partners_api platform editor geometry \ +DEPENDENCIES = map drape_frontend routing search storage tracking indexer drape partners_api platform editor geometry \ coding base freetype expat fribidi tomcrypt jansson protobuf osrm stats_client \ minizip succinct pugixml oauthcpp diff --git a/storage/storage_integration_tests/storage_integration_tests.pro b/storage/storage_integration_tests/storage_integration_tests.pro index 7d6a357505..5f922c9fc7 100644 --- a/storage/storage_integration_tests/storage_integration_tests.pro +++ b/storage/storage_integration_tests/storage_integration_tests.pro @@ -6,7 +6,7 @@ CONFIG -= app_bundle TEMPLATE = app ROOT_DIR = ../.. -DEPENDENCIES = map drape_frontend routing search storage indexer drape partners_api platform_tests_support platform editor opening_hours geometry \ +DEPENDENCIES = map drape_frontend routing search storage tracking indexer drape partners_api platform_tests_support platform editor opening_hours geometry \ coding base freetype expat fribidi tomcrypt jansson protobuf osrm stats_client \ minizip succinct pugixml oauthcpp diff --git a/tracking/connection.cpp b/tracking/connection.cpp new file mode 100644 index 0000000000..09a78a1f41 --- /dev/null +++ b/tracking/connection.cpp @@ -0,0 +1,44 @@ +#include "connection.hpp" + +#include "platform/socket.hpp" + +namespace +{ +uint32_t constexpr kSocketTimeoutMs = 10000; +} // namespace + +namespace tracking +{ +// static +const char Connection::kHost[] = "gps.host"; // TODO change to real host value +uint16_t Connection::kPort = 666; // TODO change to real port value + +Connection::Connection(unique_ptr socket, string const & host, uint16_t port, + bool isHistorical) + : m_socket(move(socket)), m_host(host), m_port(port) +{ + ASSERT(m_socket.get() != nullptr, ()); + + m_socket->SetTimeout(kSocketTimeoutMs); +} + +// TODO: implement handshake +bool Connection::Reconnect() +{ + m_socket->Close(); + return m_socket->Open(m_host, m_port); +} + +// TODO: implement historical +bool Connection::Send(boost::circular_buffer const & points) +{ + ASSERT(m_buffer.empty(), ()); + + MemWriter writer(m_buffer); + coding::TrafficGPSEncoder::SerializeDataPoints(coding::TrafficGPSEncoder::kLatestVersion, writer, + points); + bool isSuccess = m_socket->Write(m_buffer.data(), m_buffer.size()); + m_buffer.clear(); + return isSuccess; +} +} // namespace tracking diff --git a/tracking/connection.hpp b/tracking/connection.hpp new file mode 100644 index 0000000000..117d014903 --- /dev/null +++ b/tracking/connection.hpp @@ -0,0 +1,35 @@ +#pragma once + +#include "boost/circular_buffer.hpp" + +#include "coding/traffic.hpp" + +#include "std/vector.hpp" + +namespace platform +{ +class Socket; +} + +namespace tracking +{ +using DataPoint = coding::TrafficGPSEncoder::DataPoint; + +class Connection final +{ +public: + static const char kHost[]; + static uint16_t kPort; + + Connection(unique_ptr socket, string const & host, uint16_t port, + bool isHistorical); + bool Reconnect(); + bool Send(boost::circular_buffer const & points); + +private: + unique_ptr m_socket; + string const m_host; + uint16_t const m_port; + vector m_buffer; +}; +} // namespace tracking diff --git a/tracking/reporter.cpp b/tracking/reporter.cpp new file mode 100644 index 0000000000..6021ac5d9c --- /dev/null +++ b/tracking/reporter.cpp @@ -0,0 +1,147 @@ +#include "reporter.hpp" + +#include "base/logging.hpp" +#include "base/thread.hpp" +#include "base/timer.hpp" + +#include "boost/circular_buffer.hpp" + +#include "platform/location.hpp" +#include "platform/socket.hpp" + +#include "std/mutex.hpp" +#include "std/vector.hpp" + +#include "tracking/connection.hpp" + +using namespace tracking; + +namespace +{ +double constexpr kMinDelaySeconds = 1.0; +double constexpr kReconnectDelaySeconds = 60.0; +size_t constexpr kRealTimeBufferSize = 60; + +class WorkerImpl final : public Reporter::Worker +{ +public: + WorkerImpl(unique_ptr socket, size_t pushDelayMs); + void Run(); + + // Worker overrides + void AddLocation(location::GpsInfo const & info); + void Stop(); + +private: + void FetchInput(); + bool SendPoints(); + + volatile bool m_stop = false; + Connection m_realtimeSender; + size_t m_pushDelayMs; + bool m_wasConnected = false; + double m_lastConnectTryTime = 0.0; + vector m_input; + mutex m_inputMutex; + boost::circular_buffer m_points; + double m_lastGpsTime = 0.0; +}; +} // namespace + +namespace tracking +{ +// static +const char Reporter::kAllowKey[] = "AllowStat"; + +Reporter::Reporter(unique_ptr socket, size_t pushDelayMs) +{ + WorkerImpl * worker = new WorkerImpl(move(socket), pushDelayMs); + m_worker = worker; + threads::SimpleThread thread([worker] + { + worker->Run(); + delete worker; + }); + thread.detach(); +} + +Reporter::~Reporter() { m_worker->Stop(); } + +void Reporter::AddLocation(location::GpsInfo const & info) { m_worker->AddLocation(info); } +} // namespace tracking + +namespace +{ +WorkerImpl::WorkerImpl(unique_ptr socket, size_t pushDelayMs) + : m_realtimeSender(move(socket), Connection::kHost, Connection::kPort, false) + , m_pushDelayMs(pushDelayMs) + , m_points(kRealTimeBufferSize) +{ +} + +void WorkerImpl::AddLocation(location::GpsInfo const & info) +{ + lock_guard lg(m_inputMutex); + + if (info.m_timestamp < m_lastGpsTime + kMinDelaySeconds) + return; + + m_lastGpsTime = info.m_timestamp; + m_input.push_back(DataPoint(info.m_timestamp, ms::LatLon(info.m_latitude, info.m_longitude))); +} + +void WorkerImpl::Run() +{ + LOG(LINFO, ("Tracking Reporter started")); + + my::HighResTimer timer; + + while (!m_stop) + { + uint64_t const startMs = timer.ElapsedMillis(); + + FetchInput(); + if (SendPoints()) + m_points.clear(); + + uint64_t const passedMs = timer.ElapsedMillis() - startMs; + if (passedMs < m_pushDelayMs) + threads::Sleep(m_pushDelayMs - passedMs); + } + + LOG(LINFO, ("Tracking Reporter finished")); +} + +void WorkerImpl::Stop() { m_stop = true; } +void WorkerImpl::FetchInput() +{ + lock_guard lg(m_inputMutex); + m_points.insert(m_points.end(), m_input.begin(), m_input.end()); + m_input.clear(); +} + +bool WorkerImpl::SendPoints() +{ + if (m_points.empty()) + return true; + + if (m_wasConnected) + m_wasConnected = m_realtimeSender.Send(m_points); + + if (m_wasConnected) + return true; + + double currentTime = my::Timer::LocalTime(); + if (currentTime < m_lastConnectTryTime + kReconnectDelaySeconds) + return false; + + m_lastConnectTryTime = currentTime; + + m_wasConnected = m_realtimeSender.Reconnect(); + if (!m_wasConnected) + return false; + + m_wasConnected = m_realtimeSender.Send(m_points); + return m_wasConnected; +} +} // namespace diff --git a/tracking/reporter.hpp b/tracking/reporter.hpp new file mode 100644 index 0000000000..13b5ffea7a --- /dev/null +++ b/tracking/reporter.hpp @@ -0,0 +1,38 @@ +#pragma once + +#include "std/unique_ptr.hpp" + +namespace location +{ +class GpsInfo; +} + +namespace platform +{ +class Socket; +} + +namespace tracking +{ +class Reporter final +{ +public: + static size_t constexpr kPushDelayMs = 10000; + static const char kAllowKey[]; + + Reporter(unique_ptr socket, size_t pushDelayMs); + ~Reporter(); + void AddLocation(location::GpsInfo const & info); + + class Worker + { + public: + virtual void AddLocation(location::GpsInfo const & info) = 0; + virtual void Stop() = 0; + }; + +private: + Worker * m_worker; +}; + +} // namespace tracking diff --git a/tracking/tracking.pro b/tracking/tracking.pro new file mode 100644 index 0000000000..2418e67637 --- /dev/null +++ b/tracking/tracking.pro @@ -0,0 +1,15 @@ +TARGET = tracking +TEMPLATE = lib +CONFIG += staticlib warn_on + +ROOT_DIR = .. + +include($$ROOT_DIR/common.pri) + +SOURCES += \ + connection.cpp \ + reporter.cpp \ + +HEADERS += \ + connection.hpp \ + reporter.hpp \ diff --git a/tracking/tracking_tests/reporter_tests.cpp b/tracking/tracking_tests/reporter_tests.cpp new file mode 100644 index 0000000000..f8dc89d38b --- /dev/null +++ b/tracking/tracking_tests/reporter_tests.cpp @@ -0,0 +1,62 @@ +#include "base/thread.hpp" +#include "coding/traffic.hpp" + +#include "platform/location.hpp" +#include "platform/socket.hpp" + +#include "std/cmath.hpp" + +#include "testing/testing.hpp" + +#include "tracking/reporter.hpp" + +namespace +{ +template +bool WaitCondition(Condition condition, size_t durationMs = 1000) +{ + size_t sleepMs = 10; + size_t cyclesLimit = durationMs / sleepMs; + for (size_t i = 0; i < cyclesLimit; ++i) + { + threads::Sleep(sleepMs); + if (condition()) + return true; + } + + return false; +} +} // namespace + +using namespace tracking; + +UNIT_TEST(Reporter_TransferLocation) +{ + unique_ptr socket = platform::createTestSocket(); + platform::TestSocket * testSocket = socket.get(); + + Reporter reporter(move(socket), 10); + + location::GpsInfo gpsInfo; + gpsInfo.m_timestamp = 3.0; + gpsInfo.m_latitude = 4.0; + gpsInfo.m_longitude = 5.0; + reporter.AddLocation(gpsInfo); + + TEST(WaitCondition([testSocket] { return testSocket->HasOutput(); }), ()); + + vector buffer; + testSocket->ReadServer(buffer); + + vector points; + MemReader memReader(buffer.data(), buffer.size()); + ReaderSource src(memReader); + coding::TrafficGPSEncoder::DeserializeDataPoints(coding::TrafficGPSEncoder::kLatestVersion, src, + points); + + TEST_EQUAL(points.size(), 1, ()); + coding::TrafficGPSEncoder::DataPoint const & point = points[0]; + TEST_EQUAL(point.m_timestamp, 3, ()); + TEST(abs(point.m_latLon.lat - 4.0) < 0.001, ()); + TEST(abs(point.m_latLon.lon - 5.0) < 0.001, ()); +} diff --git a/tracking/tracking_tests/tracking_tests.pro b/tracking/tracking_tests/tracking_tests.pro new file mode 100644 index 0000000000..f37f3fb860 --- /dev/null +++ b/tracking/tracking_tests/tracking_tests.pro @@ -0,0 +1,29 @@ +TARGET = tracking_tests +CONFIG += console warn_on +CONFIG -= app_bundle +TEMPLATE = app + +ROOT_DIR = ../.. + +INCLUDEPATH *= $$ROOT_DIR/3party/jansson/src + +DEPENDENCIES = base coding geometry platform routing stats_client tracking + +include($$ROOT_DIR/common.pri) + +DEFINES *= OMIM_UNIT_TEST_WITH_QT_EVENT_LOOP + +QT *= core + +macx-* { + QT *= widgets # needed for QApplication with event loop, to test async events + LIBS *= "-framework IOKit" "-framework SystemConfiguration" +} + +win*|linux* { + QT *= network +} + +SOURCES += \ + $$ROOT_DIR/testing/testingmain.cpp \ + reporter_tests.cpp \