Merge pull request #4498 from dobriy-eeh/gps-stats

gps tracking reporter
This commit is contained in:
Sergey Yershov 2016-10-19 16:21:03 +03:00 committed by GitHub
commit d27ed175d5
20 changed files with 513 additions and 10 deletions

View file

@ -37,9 +37,9 @@ public:
// Version 0:
// Coordinates are truncated and stored as integers. All integers
// are written as varints.
template <typename Writer>
template <typename Writer, typename Collection>
static size_t SerializeDataPoints(uint32_t version, Writer & writer,
vector<DataPoint> const & points)
Collection const & points)
{
ASSERT_LESS_OR_EQUAL(version, kLatestVersion, ());
@ -78,8 +78,8 @@ public:
}
// Deserializes the points from |source| and appends them to |result|.
template <typename Source>
static void DeserializeDataPoints(uint32_t version, Source & src, vector<DataPoint> & result)
template <typename Source, typename Collection>
static void DeserializeDataPoints(uint32_t version, Source & src, Collection & result)
{
ASSERT_LESS_OR_EQUAL(version, kLatestVersion, ());

View file

@ -52,6 +52,10 @@ else
#define BOOKING_SECRET ""
#define UBER_SERVER_TOKEN ""
#define UBER_CLIENT_ID ""
#define TRACKING_REALTIME_HOST ""
#define TRACKING_REALTIME_PORT 0
#define TRACKING_HISTORICAL_HOST ""
#define TRACKING_HISTORICAL_PORT 0
' > "$PRIVATE_HEADER"
echo 'ext {
spropStoreFile = "../tools/android/debug.keystore"

View file

@ -60,6 +60,7 @@
#include "platform/platform.hpp"
#include "platform/preferred_languages.hpp"
#include "platform/settings.hpp"
#include "platform/socket.hpp"
#include "coding/internal/file_data.hpp"
#include "coding/zip_reader.hpp"
@ -155,6 +156,17 @@ void CancelQuery(weak_ptr<search::ProcessorHandle> & handle)
queryHandle->Cancel();
handle.reset();
}
class StubSocket final : public platform::Socket
{
public:
// Socket overrides
bool Open(string const & host, uint16_t port) override { return false; }
void Close() override {}
bool Read(uint8_t * data, uint32_t count) override { return false; }
bool Write(uint8_t const * data, uint32_t count) override { return false; }
void SetTimeout(uint32_t milliseconds) override {}
};
} // namespace
pair<MwmSet::MwmId, MwmSet::RegResult> Framework::RegisterMap(
@ -200,6 +212,8 @@ void Framework::OnLocationUpdate(GpsInfo const & info)
CallDrapeFunction(bind(&df::DrapeEngine::SetGpsInfo, _1, rInfo,
m_routingSession.IsNavigable(), routeMatchingInfo));
if (IsTrackingReporterEnabled())
m_trackingReporter.AddLocation(info);
}
void Framework::OnCompassUpdate(CompassInfo const & info)
@ -224,6 +238,19 @@ void Framework::SetMyPositionModeListener(TMyPositionModeChanged && fn)
m_myPositionListener = move(fn);
}
bool Framework::IsTrackingReporterEnabled() const
{
if (m_currentRouterType != routing::RouterType::Vehicle)
return false;
if (!m_routingSession.IsOnRoute())
return false;
bool allowStat = false;
UNUSED_VALUE(settings::Get(tracking::Reporter::kEnabledSettingsKey, allowStat));
return allowStat;
}
void Framework::OnUserPositionChanged(m2::PointD const & position)
{
MyPositionMarkPoint * myPosition = UserMarkContainer::UserMarkForMyPostion();
@ -314,6 +341,7 @@ Framework::Framework()
, m_storage(platform::migrate::NeedMigrate() ? COUNTRIES_OBSOLETE_FILE : COUNTRIES_FILE)
, m_bmManager(*this)
, m_isRenderingEnabled(true)
, m_trackingReporter(make_unique<StubSocket>(), tracking::Reporter::kPushDelayMs)
, m_displacementModeManager([this](bool show) {
int const mode = show ? dp::displacement::kHotelMode : dp::displacement::kDefaultMode;
CallDrapeFunction(bind(&df::DrapeEngine::SetDisplacementMode, _1, mode));

View file

@ -33,6 +33,8 @@
#include "storage/downloading_policy.hpp"
#include "storage/storage.hpp"
#include "tracking/reporter.hpp"
#include "partners_api/booking_api.hpp"
#include "partners_api/uber_api.hpp"
@ -161,6 +163,7 @@ protected:
uber::Api m_uberApi;
bool m_isRenderingEnabled;
tracking::Reporter m_trackingReporter;
/// This function will be called by m_storage when latest local files
/// is downloaded.
@ -398,6 +401,7 @@ public:
void SetMyPositionModeListener(location::TMyPositionModeChanged && fn);
private:
bool IsTrackingReporterEnabled() const;
void OnUserPositionChanged(m2::PointD const & position);
//@}

View file

@ -6,7 +6,7 @@ CONFIG -= app_bundle
TEMPLATE = app
ROOT_DIR = ../..
DEPENDENCIES = map drape_frontend routing search storage drape indexer partners_api platform editor geometry coding base \
DEPENDENCIES = map drape_frontend routing search storage tracking drape indexer partners_api platform editor geometry coding base \
freetype fribidi expat protobuf tomcrypt jansson osrm stats_client minizip succinct pugixml stats_client
DEPENDENCIES *= opening_hours

View file

@ -1,7 +1,7 @@
# mapshot binary
ROOT_DIR = ..
DEPENDENCIES = map drape_frontend routing search storage indexer drape partners_api platform editor geometry coding base \
DEPENDENCIES = map drape_frontend routing search storage tracking indexer drape partners_api platform editor geometry coding base \
freetype expat fribidi tomcrypt gflags jansson protobuf osrm stats_client minizip succinct \
pugixml opening_hours

View file

@ -26,7 +26,7 @@ HEADERS += defines.hpp
SUBDIRS = 3party base coding geometry editor indexer routing search
!CONFIG(osrm) {
SUBDIRS *= platform stats storage
SUBDIRS *= platform stats storage tracking
CONFIG(desktop) {
SUBDIRS *= generator
@ -248,5 +248,10 @@ SUBDIRS = 3party base coding geometry editor indexer routing search
partners_api_tests.subdir = partners_api/partners_api_tests
partners_api_tests.depends = base platform partners_api
SUBDIRS *= partners_api_tests
tracking_tests.subdir = tracking/tracking_tests
tracking_tests.depends = base platform tracking
SUBDIRS *= tracking_tests
} # !no-tests
} # !gtool

View file

@ -10,10 +10,12 @@ SOURCES += \
scoped_dir.cpp \
scoped_file.cpp \
scoped_mwm.cpp \
test_socket.cpp \
writable_dir_changer.cpp \
HEADERS += \
scoped_dir.hpp \
scoped_file.hpp \
scoped_mwm.hpp \
test_socket.hpp \
writable_dir_changer.hpp \

View file

@ -0,0 +1,67 @@
#include "test_socket.hpp"
#include "base/assert.hpp"
#include "std/algorithm.hpp"
#include "std/chrono.hpp"
namespace platform
{
namespace tests_support
{
TestSocket::~TestSocket() { m_isConnected = false; }
bool TestSocket::Open(string const & host, uint16_t port)
{
if (m_isConnected)
return false;
m_isConnected = true;
return true;
}
void TestSocket::Close() { m_isConnected = false; }
bool TestSocket::Read(uint8_t * data, uint32_t count)
{
if (!m_isConnected)
return false;
lock_guard<mutex> lg(m_inputMutex);
if (m_input.size() < count)
return false;
copy(m_input.begin(), m_input.end(), data);
m_input.erase(m_input.begin(), m_input.begin() + count);
return true;
}
bool TestSocket::Write(uint8_t const * data, uint32_t count)
{
if (!m_isConnected)
return false;
{
lock_guard<mutex> lg(m_outputMutex);
m_output.insert(m_output.end(), data, data + count);
}
m_outputCondition.notify_one();
return true;
}
void TestSocket::SetTimeout(uint32_t milliseconds) { m_timeoutMs = milliseconds; }
size_t TestSocket::ReadServer(vector<uint8_t> & destination)
{
unique_lock<mutex> lock(m_outputMutex);
m_outputCondition.wait_for(lock, milliseconds(m_timeoutMs),
[this]() { return !m_output.empty(); });
size_t const outputSize = m_output.size();
destination.insert(destination.end(), m_output.begin(), m_output.end());
m_output.clear();
return outputSize;
}
} // namespace tests_support
} // namespace platform

View file

@ -0,0 +1,44 @@
#pragma once
#include "platform/socket.hpp"
#include "std/atomic.hpp"
#include "std/condition_variable.hpp"
#include "std/cstdint.hpp"
#include "std/deque.hpp"
#include "std/mutex.hpp"
#include "std/vector.hpp"
namespace platform
{
namespace tests_support
{
class TestSocket final : public Socket
{
public:
// Socket overrides:
~TestSocket();
bool Open(string const & host, uint16_t port) override;
void Close() override;
bool Read(uint8_t * data, uint32_t count) override;
bool Write(uint8_t const * data, uint32_t count) override;
void SetTimeout(uint32_t milliseconds) override;
// Simulates server reading.
// Waits for some data or timeout.
// Returns size of read data.
size_t ReadServer(vector<uint8_t> & destination);
private:
atomic<bool> m_isConnected = {false};
atomic<uint32_t> m_timeoutMs = {100};
deque<uint8_t> m_input;
mutex m_inputMutex;
vector<uint8_t> m_output;
mutex m_outputMutex;
condition_variable m_outputCondition;
};
} // namespace tests_support
} // namespace platform

View file

@ -1,7 +1,6 @@
#pragma once
#include "std/string.hpp"
#include "std/target_os.hpp"
#include "std/unique_ptr.hpp"
namespace platform

View file

@ -1,6 +1,6 @@
# Main application in qt.
ROOT_DIR = ..
DEPENDENCIES = map drape_frontend routing search storage indexer drape partners_api platform editor geometry \
DEPENDENCIES = map drape_frontend routing search storage tracking indexer drape partners_api platform editor geometry \
coding base freetype expat fribidi tomcrypt jansson protobuf osrm stats_client \
minizip succinct pugixml oauthcpp

View file

@ -6,7 +6,7 @@ CONFIG -= app_bundle
TEMPLATE = app
ROOT_DIR = ../..
DEPENDENCIES = map drape_frontend routing search storage indexer drape partners_api platform_tests_support platform editor opening_hours geometry \
DEPENDENCIES = map drape_frontend routing search storage tracking indexer drape partners_api platform_tests_support platform editor opening_hours geometry \
coding base freetype expat fribidi tomcrypt jansson protobuf osrm stats_client \
minizip succinct pugixml oauthcpp

40
tracking/connection.cpp Normal file
View file

@ -0,0 +1,40 @@
#include "connection.hpp"
#include "platform/socket.hpp"
namespace
{
uint32_t constexpr kSocketTimeoutMs = 10000;
} // namespace
namespace tracking
{
Connection::Connection(unique_ptr<platform::Socket> socket, string const & host, uint16_t port,
bool isHistorical)
: m_socket(move(socket)), m_host(host), m_port(port), m_isHistorical(isHistorical)
{
ASSERT(m_socket.get(), ());
m_socket->SetTimeout(kSocketTimeoutMs);
}
// TODO: implement handshake
bool Connection::Reconnect()
{
m_socket->Close();
return m_socket->Open(m_host, m_port);
}
// TODO: implement historical
bool Connection::Send(boost::circular_buffer<DataPoint> const & points)
{
ASSERT(m_buffer.empty(), ());
MemWriter<decltype(m_buffer)> writer(m_buffer);
using coding::TrafficGPSEncoder;
TrafficGPSEncoder::SerializeDataPoints(TrafficGPSEncoder::kLatestVersion, writer, points);
bool const isSuccess = m_socket->Write(m_buffer.data(), m_buffer.size());
m_buffer.clear();
return isSuccess;
}
} // namespace tracking

36
tracking/connection.hpp Normal file
View file

@ -0,0 +1,36 @@
#pragma once
#include "coding/traffic.hpp"
#include "std/cstdint.hpp"
#include "std/string.hpp"
#include "std/unique_ptr.hpp"
#include "std/vector.hpp"
#include "boost/circular_buffer.hpp"
namespace platform
{
class Socket;
}
namespace tracking
{
using DataPoint = coding::TrafficGPSEncoder::DataPoint;
class Connection final
{
public:
Connection(unique_ptr<platform::Socket> socket, string const & host, uint16_t port,
bool isHistorical);
bool Reconnect();
bool Send(boost::circular_buffer<DataPoint> const & points);
private:
unique_ptr<platform::Socket> m_socket;
string const m_host;
uint16_t const m_port;
bool const m_isHistorical;
vector<uint8_t> m_buffer;
};
} // namespace tracking

117
tracking/reporter.cpp Normal file
View file

@ -0,0 +1,117 @@
#include "reporter.hpp"
#include "platform/location.hpp"
#include "platform/socket.hpp"
#include "base/logging.hpp"
#include "base/timer.hpp"
#include "std/target_os.hpp"
#include "private.h"
namespace
{
double constexpr kMinDelaySeconds = 1.0;
double constexpr kReconnectDelaySeconds = 60.0;
size_t constexpr kRealTimeBufferSize = 60;
} // namespace
namespace tracking
{
// static
// Apple and Android applications use different keys for settings.ini.
// Keys saved for existing users, so can' fix it easy, need migration.
// Use this hack until change to special traffic key.
#if defined(OMIM_OS_IPHONE)
const char Reporter::kEnabledSettingsKey[] = "StatisticsEnabled";
#elif defined(OMIM_OS_ANDROID)
const char Reporter::kEnabledSettingsKey[] = "AllowStat";
#else
const char Reporter::kEnabledSettingsKey[] = "AllowStat";
#endif
// static
milliseconds const Reporter::kPushDelayMs = milliseconds(10000);
Reporter::Reporter(unique_ptr<platform::Socket> socket, milliseconds pushDelay)
: m_realtimeSender(move(socket), TRACKING_REALTIME_HOST, TRACKING_REALTIME_PORT, false)
, m_pushDelay(pushDelay)
, m_points(kRealTimeBufferSize)
, m_thread([this]{Run();})
{
}
Reporter::~Reporter()
{
{
lock_guard<mutex> lg(m_mutex);
m_isFinished = true;
}
m_cv.notify_one();
m_thread.join();
}
void Reporter::AddLocation(location::GpsInfo const & info)
{
lock_guard<mutex> lg(m_mutex);
if (info.m_timestamp < m_lastGpsTime + kMinDelaySeconds)
return;
m_lastGpsTime = info.m_timestamp;
m_input.push_back(DataPoint(info.m_timestamp, ms::LatLon(info.m_latitude, info.m_longitude)));
}
void Reporter::Run()
{
LOG(LINFO, ("Tracking Reporter started"));
unique_lock<mutex> lock(m_mutex);
while (!m_isFinished)
{
auto const startTime = steady_clock::now();
// Fetch input.
m_points.insert(m_points.end(), m_input.begin(), m_input.end());
m_input.clear();
lock.unlock();
if (SendPoints())
m_points.clear();
lock.lock();
auto const passedMs = duration_cast<milliseconds>(steady_clock::now() - startTime);
if (passedMs < m_pushDelay)
m_cv.wait_for(lock, m_pushDelay - passedMs, [this]{return m_isFinished;});
}
LOG(LINFO, ("Tracking Reporter finished"));
}
bool Reporter::SendPoints()
{
if (m_points.empty())
return true;
if (m_wasConnected)
m_wasConnected = m_realtimeSender.Send(m_points);
if (m_wasConnected)
return true;
double const currentTime = my::Timer::LocalTime();
if (currentTime < m_lastConnectionAttempt + kReconnectDelaySeconds)
return false;
m_lastConnectionAttempt = currentTime;
m_wasConnected = m_realtimeSender.Reconnect();
if (!m_wasConnected)
return false;
m_wasConnected = m_realtimeSender.Send(m_points);
return m_wasConnected;
}
} // namespace tracking

56
tracking/reporter.hpp Normal file
View file

@ -0,0 +1,56 @@
#pragma once
#include "tracking/connection.hpp"
#include "base/thread.hpp"
#include "std/chrono.hpp"
#include "std/condition_variable.hpp"
#include "std/mutex.hpp"
#include "std/unique_ptr.hpp"
#include "std/vector.hpp"
#include "boost/circular_buffer.hpp"
namespace location
{
class GpsInfo;
}
namespace platform
{
class Socket;
}
namespace tracking
{
class Reporter final
{
public:
static milliseconds const kPushDelayMs;
static const char kEnabledSettingsKey[];
Reporter(unique_ptr<platform::Socket> socket, milliseconds pushDelay);
~Reporter();
void AddLocation(location::GpsInfo const & info);
private:
void Run();
bool SendPoints();
Connection m_realtimeSender;
milliseconds m_pushDelay;
bool m_wasConnected = false;
double m_lastConnectionAttempt = 0.0;
// Input buffer for incoming points. Worker thread steals it contents.
vector<DataPoint> m_input;
// Last collected points, sends periodically to server.
boost::circular_buffer<DataPoint> m_points;
double m_lastGpsTime = 0.0;
bool m_isFinished = false;
mutex m_mutex;
condition_variable m_cv;
threads::SimpleThread m_thread;
};
} // namespace tracking

15
tracking/tracking.pro Normal file
View file

@ -0,0 +1,15 @@
TARGET = tracking
TEMPLATE = lib
CONFIG += staticlib warn_on
ROOT_DIR = ..
include($$ROOT_DIR/common.pri)
SOURCES += \
connection.cpp \
reporter.cpp \
HEADERS += \
connection.hpp \
reporter.hpp \

View file

@ -0,0 +1,57 @@
#include "tracking/reporter.hpp"
#include "coding/traffic.hpp"
#include "platform/location.hpp"
#include "platform/platform_tests_support/test_socket.hpp"
#include "platform/socket.hpp"
#include "testing/testing.hpp"
#include "base/math.hpp"
#include "base/thread.hpp"
#include "std/cmath.hpp"
using namespace tracking;
using namespace platform::tests_support;
namespace
{
void TransferLocation(Reporter & reporter, TestSocket & testSocket, double timestamp,
double latidute, double longtitude)
{
location::GpsInfo gpsInfo;
gpsInfo.m_timestamp = timestamp;
gpsInfo.m_latitude = latidute;
gpsInfo.m_longitude = longtitude;
reporter.AddLocation(gpsInfo);
vector<uint8_t> buffer;
size_t const readSize = testSocket.ReadServer(buffer);
TEST_GREATER(readSize, 0, ());
vector<coding::TrafficGPSEncoder::DataPoint> points;
MemReader memReader(buffer.data(), buffer.size());
ReaderSource<MemReader> src(memReader);
coding::TrafficGPSEncoder::DeserializeDataPoints(coding::TrafficGPSEncoder::kLatestVersion, src,
points);
TEST_EQUAL(points.size(), 1, ());
auto const & point = points[0];
TEST_EQUAL(point.m_timestamp, timestamp, ());
TEST(my::AlmostEqualAbs(point.m_latLon.lat, latidute, 0.001), ());
TEST(my::AlmostEqualAbs(point.m_latLon.lon, longtitude, 0.001), ());
}
}
UNIT_TEST(Reporter_TransferLocations)
{
auto socket = make_unique<TestSocket>();
TestSocket & testSocket = *socket.get();
Reporter reporter(move(socket), milliseconds(10) /* pushDelay */);
TransferLocation(reporter, testSocket, 1.0, 2.0, 3.0);
TransferLocation(reporter, testSocket, 4.0, 5.0, 6.0);
TransferLocation(reporter, testSocket, 7.0, 8.0, 9.0);
}

View file

@ -0,0 +1,29 @@
TARGET = tracking_tests
CONFIG += console warn_on
CONFIG -= app_bundle
TEMPLATE = app
ROOT_DIR = ../..
INCLUDEPATH *= $$ROOT_DIR/3party/jansson/src
DEPENDENCIES = routing tracking platform_tests_support platform coding geometry base stats_client
include($$ROOT_DIR/common.pri)
DEFINES *= OMIM_UNIT_TEST_WITH_QT_EVENT_LOOP
QT *= core
macx-* {
QT *= widgets # needed for QApplication with event loop, to test async events
LIBS *= "-framework IOKit" "-framework SystemConfiguration"
}
win*|linux* {
QT *= network
}
SOURCES += \
$$ROOT_DIR/testing/testingmain.cpp \
reporter_test.cpp