diff --git a/coding/traffic.hpp b/coding/traffic.hpp index ef4d83aa3d..c3313358a9 100644 --- a/coding/traffic.hpp +++ b/coding/traffic.hpp @@ -37,9 +37,9 @@ public: // Version 0: // Coordinates are truncated and stored as integers. All integers // are written as varints. - template + template static size_t SerializeDataPoints(uint32_t version, Writer & writer, - vector const & points) + Collection const & points) { ASSERT_LESS_OR_EQUAL(version, kLatestVersion, ()); @@ -78,8 +78,8 @@ public: } // Deserializes the points from |source| and appends them to |result|. - template - static void DeserializeDataPoints(uint32_t version, Source & src, vector & result) + template + static void DeserializeDataPoints(uint32_t version, Source & src, Collection & result) { ASSERT_LESS_OR_EQUAL(version, kLatestVersion, ()); diff --git a/configure.sh b/configure.sh index 9679318bbd..c764b0e5eb 100755 --- a/configure.sh +++ b/configure.sh @@ -52,6 +52,10 @@ else #define BOOKING_SECRET "" #define UBER_SERVER_TOKEN "" #define UBER_CLIENT_ID "" +#define TRACKING_REALTIME_HOST "" +#define TRACKING_REALTIME_PORT 0 +#define TRACKING_HISTORICAL_HOST "" +#define TRACKING_HISTORICAL_PORT 0 ' > "$PRIVATE_HEADER" echo 'ext { spropStoreFile = "../tools/android/debug.keystore" diff --git a/map/framework.cpp b/map/framework.cpp index c406e71808..110352c18a 100644 --- a/map/framework.cpp +++ b/map/framework.cpp @@ -60,6 +60,7 @@ #include "platform/platform.hpp" #include "platform/preferred_languages.hpp" #include "platform/settings.hpp" +#include "platform/socket.hpp" #include "coding/internal/file_data.hpp" #include "coding/zip_reader.hpp" @@ -155,6 +156,17 @@ void CancelQuery(weak_ptr & handle) queryHandle->Cancel(); handle.reset(); } + +class StubSocket final : public platform::Socket +{ +public: + // Socket overrides + bool Open(string const & host, uint16_t port) override { return false; } + void Close() override {} + bool Read(uint8_t * data, uint32_t count) override { return false; } + bool Write(uint8_t const * data, uint32_t count) override { return false; } + void SetTimeout(uint32_t milliseconds) override {} +}; } // namespace pair Framework::RegisterMap( @@ -200,6 +212,8 @@ void Framework::OnLocationUpdate(GpsInfo const & info) CallDrapeFunction(bind(&df::DrapeEngine::SetGpsInfo, _1, rInfo, m_routingSession.IsNavigable(), routeMatchingInfo)); + if (IsTrackingReporterEnabled()) + m_trackingReporter.AddLocation(info); } void Framework::OnCompassUpdate(CompassInfo const & info) @@ -224,6 +238,19 @@ void Framework::SetMyPositionModeListener(TMyPositionModeChanged && fn) m_myPositionListener = move(fn); } +bool Framework::IsTrackingReporterEnabled() const +{ + if (m_currentRouterType != routing::RouterType::Vehicle) + return false; + + if (!m_routingSession.IsOnRoute()) + return false; + + bool allowStat = false; + UNUSED_VALUE(settings::Get(tracking::Reporter::kEnabledSettingsKey, allowStat)); + return allowStat; +} + void Framework::OnUserPositionChanged(m2::PointD const & position) { MyPositionMarkPoint * myPosition = UserMarkContainer::UserMarkForMyPostion(); @@ -314,6 +341,7 @@ Framework::Framework() , m_storage(platform::migrate::NeedMigrate() ? COUNTRIES_OBSOLETE_FILE : COUNTRIES_FILE) , m_bmManager(*this) , m_isRenderingEnabled(true) + , m_trackingReporter(make_unique(), tracking::Reporter::kPushDelayMs) , m_displacementModeManager([this](bool show) { int const mode = show ? dp::displacement::kHotelMode : dp::displacement::kDefaultMode; CallDrapeFunction(bind(&df::DrapeEngine::SetDisplacementMode, _1, mode)); diff --git a/map/framework.hpp b/map/framework.hpp index 49fdaa2b4a..712aaa4b58 100644 --- a/map/framework.hpp +++ b/map/framework.hpp @@ -33,6 +33,8 @@ #include "storage/downloading_policy.hpp" #include "storage/storage.hpp" +#include "tracking/reporter.hpp" + #include "partners_api/booking_api.hpp" #include "partners_api/uber_api.hpp" @@ -161,6 +163,7 @@ protected: uber::Api m_uberApi; bool m_isRenderingEnabled; + tracking::Reporter m_trackingReporter; /// This function will be called by m_storage when latest local files /// is downloaded. @@ -398,6 +401,7 @@ public: void SetMyPositionModeListener(location::TMyPositionModeChanged && fn); private: + bool IsTrackingReporterEnabled() const; void OnUserPositionChanged(m2::PointD const & position); //@} diff --git a/map/map_tests/map_tests.pro b/map/map_tests/map_tests.pro index e44ce96790..d476a428ae 100644 --- a/map/map_tests/map_tests.pro +++ b/map/map_tests/map_tests.pro @@ -6,7 +6,7 @@ CONFIG -= app_bundle TEMPLATE = app ROOT_DIR = ../.. -DEPENDENCIES = map drape_frontend routing search storage drape indexer partners_api platform editor geometry coding base \ +DEPENDENCIES = map drape_frontend routing search storage tracking drape indexer partners_api platform editor geometry coding base \ freetype fribidi expat protobuf tomcrypt jansson osrm stats_client minizip succinct pugixml stats_client DEPENDENCIES *= opening_hours diff --git a/mapshot/mapshot.pro b/mapshot/mapshot.pro index c1f37891d9..2a727a6a5a 100644 --- a/mapshot/mapshot.pro +++ b/mapshot/mapshot.pro @@ -1,7 +1,7 @@ # mapshot binary ROOT_DIR = .. -DEPENDENCIES = map drape_frontend routing search storage indexer drape partners_api platform editor geometry coding base \ +DEPENDENCIES = map drape_frontend routing search storage tracking indexer drape partners_api platform editor geometry coding base \ freetype expat fribidi tomcrypt gflags jansson protobuf osrm stats_client minizip succinct \ pugixml opening_hours diff --git a/omim.pro b/omim.pro index 9fc68bdcc5..7678c4009c 100644 --- a/omim.pro +++ b/omim.pro @@ -26,7 +26,7 @@ HEADERS += defines.hpp SUBDIRS = 3party base coding geometry editor indexer routing search !CONFIG(osrm) { - SUBDIRS *= platform stats storage + SUBDIRS *= platform stats storage tracking CONFIG(desktop) { SUBDIRS *= generator @@ -248,5 +248,10 @@ SUBDIRS = 3party base coding geometry editor indexer routing search partners_api_tests.subdir = partners_api/partners_api_tests partners_api_tests.depends = base platform partners_api SUBDIRS *= partners_api_tests + + tracking_tests.subdir = tracking/tracking_tests + tracking_tests.depends = base platform tracking + SUBDIRS *= tracking_tests + } # !no-tests } # !gtool diff --git a/platform/platform_tests_support/platform_tests_support.pro b/platform/platform_tests_support/platform_tests_support.pro index f5456cad2c..8408a7bdbc 100644 --- a/platform/platform_tests_support/platform_tests_support.pro +++ b/platform/platform_tests_support/platform_tests_support.pro @@ -10,10 +10,12 @@ SOURCES += \ scoped_dir.cpp \ scoped_file.cpp \ scoped_mwm.cpp \ + test_socket.cpp \ writable_dir_changer.cpp \ HEADERS += \ scoped_dir.hpp \ scoped_file.hpp \ scoped_mwm.hpp \ + test_socket.hpp \ writable_dir_changer.hpp \ diff --git a/platform/platform_tests_support/test_socket.cpp b/platform/platform_tests_support/test_socket.cpp new file mode 100644 index 0000000000..3924da379c --- /dev/null +++ b/platform/platform_tests_support/test_socket.cpp @@ -0,0 +1,67 @@ +#include "test_socket.hpp" + +#include "base/assert.hpp" + +#include "std/algorithm.hpp" +#include "std/chrono.hpp" + +namespace platform +{ +namespace tests_support +{ +TestSocket::~TestSocket() { m_isConnected = false; } + +bool TestSocket::Open(string const & host, uint16_t port) +{ + if (m_isConnected) + return false; + + m_isConnected = true; + return true; +} + +void TestSocket::Close() { m_isConnected = false; } + +bool TestSocket::Read(uint8_t * data, uint32_t count) +{ + if (!m_isConnected) + return false; + + lock_guard lg(m_inputMutex); + + if (m_input.size() < count) + return false; + + copy(m_input.begin(), m_input.end(), data); + m_input.erase(m_input.begin(), m_input.begin() + count); + return true; +} + +bool TestSocket::Write(uint8_t const * data, uint32_t count) +{ + if (!m_isConnected) + return false; + + { + lock_guard lg(m_outputMutex); + m_output.insert(m_output.end(), data, data + count); + } + m_outputCondition.notify_one(); + return true; +} + +void TestSocket::SetTimeout(uint32_t milliseconds) { m_timeoutMs = milliseconds; } + +size_t TestSocket::ReadServer(vector & destination) +{ + unique_lock lock(m_outputMutex); + m_outputCondition.wait_for(lock, milliseconds(m_timeoutMs), + [this]() { return !m_output.empty(); }); + + size_t const outputSize = m_output.size(); + destination.insert(destination.end(), m_output.begin(), m_output.end()); + m_output.clear(); + return outputSize; +} +} // namespace tests_support +} // namespace platform diff --git a/platform/platform_tests_support/test_socket.hpp b/platform/platform_tests_support/test_socket.hpp new file mode 100644 index 0000000000..8d19071121 --- /dev/null +++ b/platform/platform_tests_support/test_socket.hpp @@ -0,0 +1,44 @@ +#pragma once + +#include "platform/socket.hpp" + +#include "std/atomic.hpp" +#include "std/condition_variable.hpp" +#include "std/cstdint.hpp" +#include "std/deque.hpp" +#include "std/mutex.hpp" +#include "std/vector.hpp" + +namespace platform +{ +namespace tests_support +{ +class TestSocket final : public Socket +{ +public: + // Socket overrides: + ~TestSocket(); + bool Open(string const & host, uint16_t port) override; + void Close() override; + bool Read(uint8_t * data, uint32_t count) override; + bool Write(uint8_t const * data, uint32_t count) override; + void SetTimeout(uint32_t milliseconds) override; + + // Simulates server reading. + // Waits for some data or timeout. + // Returns size of read data. + size_t ReadServer(vector & destination); + +private: + atomic m_isConnected = {false}; + atomic m_timeoutMs = {100}; + + deque m_input; + mutex m_inputMutex; + + vector m_output; + mutex m_outputMutex; + condition_variable m_outputCondition; +}; +} // namespace tests_support +} // namespace platform diff --git a/platform/socket.hpp b/platform/socket.hpp index 01f51eb133..43083630ee 100644 --- a/platform/socket.hpp +++ b/platform/socket.hpp @@ -1,7 +1,6 @@ #pragma once #include "std/string.hpp" -#include "std/target_os.hpp" #include "std/unique_ptr.hpp" namespace platform diff --git a/qt/qt.pro b/qt/qt.pro index 1667db7068..e9a6d23bf7 100644 --- a/qt/qt.pro +++ b/qt/qt.pro @@ -1,6 +1,6 @@ # Main application in qt. ROOT_DIR = .. -DEPENDENCIES = map drape_frontend routing search storage indexer drape partners_api platform editor geometry \ +DEPENDENCIES = map drape_frontend routing search storage tracking indexer drape partners_api platform editor geometry \ coding base freetype expat fribidi tomcrypt jansson protobuf osrm stats_client \ minizip succinct pugixml oauthcpp diff --git a/storage/storage_integration_tests/storage_integration_tests.pro b/storage/storage_integration_tests/storage_integration_tests.pro index 7d6a357505..5f922c9fc7 100644 --- a/storage/storage_integration_tests/storage_integration_tests.pro +++ b/storage/storage_integration_tests/storage_integration_tests.pro @@ -6,7 +6,7 @@ CONFIG -= app_bundle TEMPLATE = app ROOT_DIR = ../.. -DEPENDENCIES = map drape_frontend routing search storage indexer drape partners_api platform_tests_support platform editor opening_hours geometry \ +DEPENDENCIES = map drape_frontend routing search storage tracking indexer drape partners_api platform_tests_support platform editor opening_hours geometry \ coding base freetype expat fribidi tomcrypt jansson protobuf osrm stats_client \ minizip succinct pugixml oauthcpp diff --git a/tracking/connection.cpp b/tracking/connection.cpp new file mode 100644 index 0000000000..550a797afc --- /dev/null +++ b/tracking/connection.cpp @@ -0,0 +1,40 @@ +#include "connection.hpp" + +#include "platform/socket.hpp" + +namespace +{ +uint32_t constexpr kSocketTimeoutMs = 10000; +} // namespace + +namespace tracking +{ +Connection::Connection(unique_ptr socket, string const & host, uint16_t port, + bool isHistorical) + : m_socket(move(socket)), m_host(host), m_port(port), m_isHistorical(isHistorical) +{ + ASSERT(m_socket.get(), ()); + + m_socket->SetTimeout(kSocketTimeoutMs); +} + +// TODO: implement handshake +bool Connection::Reconnect() +{ + m_socket->Close(); + return m_socket->Open(m_host, m_port); +} + +// TODO: implement historical +bool Connection::Send(boost::circular_buffer const & points) +{ + ASSERT(m_buffer.empty(), ()); + + MemWriter writer(m_buffer); + using coding::TrafficGPSEncoder; + TrafficGPSEncoder::SerializeDataPoints(TrafficGPSEncoder::kLatestVersion, writer, points); + bool const isSuccess = m_socket->Write(m_buffer.data(), m_buffer.size()); + m_buffer.clear(); + return isSuccess; +} +} // namespace tracking diff --git a/tracking/connection.hpp b/tracking/connection.hpp new file mode 100644 index 0000000000..a87f3dbbf1 --- /dev/null +++ b/tracking/connection.hpp @@ -0,0 +1,36 @@ +#pragma once + +#include "coding/traffic.hpp" + +#include "std/cstdint.hpp" +#include "std/string.hpp" +#include "std/unique_ptr.hpp" +#include "std/vector.hpp" + +#include "boost/circular_buffer.hpp" + +namespace platform +{ +class Socket; +} + +namespace tracking +{ +using DataPoint = coding::TrafficGPSEncoder::DataPoint; + +class Connection final +{ +public: + Connection(unique_ptr socket, string const & host, uint16_t port, + bool isHistorical); + bool Reconnect(); + bool Send(boost::circular_buffer const & points); + +private: + unique_ptr m_socket; + string const m_host; + uint16_t const m_port; + bool const m_isHistorical; + vector m_buffer; +}; +} // namespace tracking diff --git a/tracking/reporter.cpp b/tracking/reporter.cpp new file mode 100644 index 0000000000..8e2b9a9991 --- /dev/null +++ b/tracking/reporter.cpp @@ -0,0 +1,117 @@ +#include "reporter.hpp" + +#include "platform/location.hpp" +#include "platform/socket.hpp" + +#include "base/logging.hpp" +#include "base/timer.hpp" + +#include "std/target_os.hpp" + +#include "private.h" + +namespace +{ +double constexpr kMinDelaySeconds = 1.0; +double constexpr kReconnectDelaySeconds = 60.0; +size_t constexpr kRealTimeBufferSize = 60; +} // namespace + +namespace tracking +{ +// static +// Apple and Android applications use different keys for settings.ini. +// Keys saved for existing users, so can' fix it easy, need migration. +// Use this hack until change to special traffic key. +#if defined(OMIM_OS_IPHONE) +const char Reporter::kEnabledSettingsKey[] = "StatisticsEnabled"; +#elif defined(OMIM_OS_ANDROID) +const char Reporter::kEnabledSettingsKey[] = "AllowStat"; +#else +const char Reporter::kEnabledSettingsKey[] = "AllowStat"; +#endif + +// static +milliseconds const Reporter::kPushDelayMs = milliseconds(10000); + +Reporter::Reporter(unique_ptr socket, milliseconds pushDelay) + : m_realtimeSender(move(socket), TRACKING_REALTIME_HOST, TRACKING_REALTIME_PORT, false) + , m_pushDelay(pushDelay) + , m_points(kRealTimeBufferSize) + , m_thread([this]{Run();}) +{ +} + +Reporter::~Reporter() +{ + { + lock_guard lg(m_mutex); + m_isFinished = true; + } + m_cv.notify_one(); + m_thread.join(); +} + +void Reporter::AddLocation(location::GpsInfo const & info) +{ + lock_guard lg(m_mutex); + + if (info.m_timestamp < m_lastGpsTime + kMinDelaySeconds) + return; + + m_lastGpsTime = info.m_timestamp; + m_input.push_back(DataPoint(info.m_timestamp, ms::LatLon(info.m_latitude, info.m_longitude))); +} + +void Reporter::Run() +{ + LOG(LINFO, ("Tracking Reporter started")); + + unique_lock lock(m_mutex); + + while (!m_isFinished) + { + auto const startTime = steady_clock::now(); + + // Fetch input. + m_points.insert(m_points.end(), m_input.begin(), m_input.end()); + m_input.clear(); + + lock.unlock(); + if (SendPoints()) + m_points.clear(); + lock.lock(); + + auto const passedMs = duration_cast(steady_clock::now() - startTime); + if (passedMs < m_pushDelay) + m_cv.wait_for(lock, m_pushDelay - passedMs, [this]{return m_isFinished;}); + } + + LOG(LINFO, ("Tracking Reporter finished")); +} + +bool Reporter::SendPoints() +{ + if (m_points.empty()) + return true; + + if (m_wasConnected) + m_wasConnected = m_realtimeSender.Send(m_points); + + if (m_wasConnected) + return true; + + double const currentTime = my::Timer::LocalTime(); + if (currentTime < m_lastConnectionAttempt + kReconnectDelaySeconds) + return false; + + m_lastConnectionAttempt = currentTime; + + m_wasConnected = m_realtimeSender.Reconnect(); + if (!m_wasConnected) + return false; + + m_wasConnected = m_realtimeSender.Send(m_points); + return m_wasConnected; +} +} // namespace tracking diff --git a/tracking/reporter.hpp b/tracking/reporter.hpp new file mode 100644 index 0000000000..2c85b69969 --- /dev/null +++ b/tracking/reporter.hpp @@ -0,0 +1,56 @@ +#pragma once + +#include "tracking/connection.hpp" + +#include "base/thread.hpp" + +#include "std/chrono.hpp" +#include "std/condition_variable.hpp" +#include "std/mutex.hpp" +#include "std/unique_ptr.hpp" +#include "std/vector.hpp" + +#include "boost/circular_buffer.hpp" + +namespace location +{ +class GpsInfo; +} + +namespace platform +{ +class Socket; +} + +namespace tracking +{ +class Reporter final +{ +public: + static milliseconds const kPushDelayMs; + static const char kEnabledSettingsKey[]; + + Reporter(unique_ptr socket, milliseconds pushDelay); + ~Reporter(); + + void AddLocation(location::GpsInfo const & info); + +private: + void Run(); + bool SendPoints(); + + Connection m_realtimeSender; + milliseconds m_pushDelay; + bool m_wasConnected = false; + double m_lastConnectionAttempt = 0.0; + // Input buffer for incoming points. Worker thread steals it contents. + vector m_input; + // Last collected points, sends periodically to server. + boost::circular_buffer m_points; + double m_lastGpsTime = 0.0; + bool m_isFinished = false; + mutex m_mutex; + condition_variable m_cv; + threads::SimpleThread m_thread; +}; +} // namespace tracking diff --git a/tracking/tracking.pro b/tracking/tracking.pro new file mode 100644 index 0000000000..2418e67637 --- /dev/null +++ b/tracking/tracking.pro @@ -0,0 +1,15 @@ +TARGET = tracking +TEMPLATE = lib +CONFIG += staticlib warn_on + +ROOT_DIR = .. + +include($$ROOT_DIR/common.pri) + +SOURCES += \ + connection.cpp \ + reporter.cpp \ + +HEADERS += \ + connection.hpp \ + reporter.hpp \ diff --git a/tracking/tracking_tests/reporter_test.cpp b/tracking/tracking_tests/reporter_test.cpp new file mode 100644 index 0000000000..2492f32e9d --- /dev/null +++ b/tracking/tracking_tests/reporter_test.cpp @@ -0,0 +1,57 @@ +#include "tracking/reporter.hpp" + +#include "coding/traffic.hpp" + +#include "platform/location.hpp" +#include "platform/platform_tests_support/test_socket.hpp" +#include "platform/socket.hpp" + +#include "testing/testing.hpp" + +#include "base/math.hpp" +#include "base/thread.hpp" + +#include "std/cmath.hpp" + +using namespace tracking; +using namespace platform::tests_support; + +namespace +{ +void TransferLocation(Reporter & reporter, TestSocket & testSocket, double timestamp, + double latidute, double longtitude) +{ + location::GpsInfo gpsInfo; + gpsInfo.m_timestamp = timestamp; + gpsInfo.m_latitude = latidute; + gpsInfo.m_longitude = longtitude; + reporter.AddLocation(gpsInfo); + + vector buffer; + size_t const readSize = testSocket.ReadServer(buffer); + TEST_GREATER(readSize, 0, ()); + + vector points; + MemReader memReader(buffer.data(), buffer.size()); + ReaderSource src(memReader); + coding::TrafficGPSEncoder::DeserializeDataPoints(coding::TrafficGPSEncoder::kLatestVersion, src, + points); + + TEST_EQUAL(points.size(), 1, ()); + auto const & point = points[0]; + TEST_EQUAL(point.m_timestamp, timestamp, ()); + TEST(my::AlmostEqualAbs(point.m_latLon.lat, latidute, 0.001), ()); + TEST(my::AlmostEqualAbs(point.m_latLon.lon, longtitude, 0.001), ()); +} +} + +UNIT_TEST(Reporter_TransferLocations) +{ + auto socket = make_unique(); + TestSocket & testSocket = *socket.get(); + + Reporter reporter(move(socket), milliseconds(10) /* pushDelay */); + TransferLocation(reporter, testSocket, 1.0, 2.0, 3.0); + TransferLocation(reporter, testSocket, 4.0, 5.0, 6.0); + TransferLocation(reporter, testSocket, 7.0, 8.0, 9.0); +} diff --git a/tracking/tracking_tests/tracking_tests.pro b/tracking/tracking_tests/tracking_tests.pro new file mode 100644 index 0000000000..81465b60aa --- /dev/null +++ b/tracking/tracking_tests/tracking_tests.pro @@ -0,0 +1,29 @@ +TARGET = tracking_tests +CONFIG += console warn_on +CONFIG -= app_bundle +TEMPLATE = app + +ROOT_DIR = ../.. + +INCLUDEPATH *= $$ROOT_DIR/3party/jansson/src + +DEPENDENCIES = routing tracking platform_tests_support platform coding geometry base stats_client + +include($$ROOT_DIR/common.pri) + +DEFINES *= OMIM_UNIT_TEST_WITH_QT_EVENT_LOOP + +QT *= core + +macx-* { + QT *= widgets # needed for QApplication with event loop, to test async events + LIBS *= "-framework IOKit" "-framework SystemConfiguration" +} + +win*|linux* { + QT *= network +} + +SOURCES += \ + $$ROOT_DIR/testing/testingmain.cpp \ + reporter_test.cpp