diff --git a/map/framework.cpp b/map/framework.cpp index 162dc4ebc7..1e3458b33e 100644 --- a/map/framework.cpp +++ b/map/framework.cpp @@ -256,6 +256,24 @@ Framework::Framework() m_storage.Init(bind(&Framework::UpdateLatestCountryFile, this, _1)); LOG(LDEBUG, ("Storage initialized")); + auto const routingStatisticsFn = [](map const & statistics) + { + alohalytics::LogEvent("Routing_CalculatingRoute", statistics); + }; +#ifdef DEBUG + auto const routingVisualizerFn = [this](m2::PointD const & pt) + { + GetPlatform().RunOnGuiThread([this,pt]() + { + m_bmManager.UserMarksGetController(UserMarkContainer::DEBUG_MARK).CreateUserMark(pt); + Invalidate(); + }); + }; +#else + routing::RouterDelegate::TPointCheckCallback const routingVisualizerFn = nullptr; +#endif + m_routingSession.Init(routingStatisticsFn, routingVisualizerFn); + SetRouterImpl(RouterType::Vehicle); LOG(LDEBUG, ("Routing engine initialized")); @@ -2148,22 +2166,6 @@ routing::RouterType Framework::GetRouter() const void Framework::SetRouterImpl(RouterType type) { - auto const routingStatisticsFn = [](map const & statistics) - { - alohalytics::LogEvent("Routing_CalculatingRoute", statistics); - }; - - auto countryFileGetter = [this](m2::PointD const & p) -> string - { - // TODO (@gorshenin): fix search engine to return CountryFile - // instances instead of plain strings. - return GetSearchEngine()->GetCountryFile(p); - }; - auto localFileGetter = [this](string const & countryFile) -> shared_ptr - { - return m_storage.GetLatestLocalFile(CountryFile(countryFile)); - }; - unique_ptr router; unique_ptr fetcher; if (type == RouterType::Pedestrian) @@ -2173,24 +2175,23 @@ void Framework::SetRouterImpl(RouterType type) } else { + auto countryFileGetter = [this](m2::PointD const & p) -> string + { + // TODO (@gorshenin): fix search engine to return CountryFile + // instances instead of plain strings. + return GetSearchEngine()->GetCountryFile(p); + }; + auto localFileGetter = [this](string const & countryFile) -> shared_ptr + { + return m_storage.GetLatestLocalFile(CountryFile(countryFile)); + }; + router.reset(new OsrmRouter(&m_model.GetIndex(), countryFileGetter)); fetcher.reset(new OnlineAbsentCountriesFetcher(countryFileGetter, localFileGetter)); m_routingSession.SetRoutingSettings(routing::GetCarRoutingSettings()); } -#ifdef DEBUG - routing::RouterDelegate::TPointCheckCallback const routingVisualizerFn = [this](m2::PointD const & pt) - { - GetPlatform().RunOnGuiThread([this,pt]() - { - m_bmManager.UserMarksGetController(UserMarkContainer::DEBUG_MARK).CreateUserMark(pt); - Invalidate(); - }); - }; -#else - routing::RouterDelegate::TPointCheckCallback const routingVisualizerFn = nullptr; -#endif - m_routingSession.SetRouter(move(router), move(fetcher), routingStatisticsFn, routingVisualizerFn); + m_routingSession.SetRouter(move(router), move(fetcher)); m_currentRouterType = type; } diff --git a/routing/async_router.cpp b/routing/async_router.cpp index 444eee16ca..8a66dc7726 100644 --- a/routing/async_router.cpp +++ b/routing/async_router.cpp @@ -55,48 +55,117 @@ map PrepareStatisticsData(string const & routerName, } // namespace -AsyncRouter::AsyncRouter(unique_ptr && router, unique_ptr && fetcher, - TRoutingStatisticsCallback const & routingStatisticsCallback, - RouterDelegate::TPointCheckCallback const & pointCheckCallback) - : m_absentFetcher(move(fetcher)), - m_router(move(router)), - m_routingStatisticsCallback(routingStatisticsCallback) +// ---------------------------------------------------------------------------------------------------------------------------- + +AsyncRouter::RouterDelegateProxy::RouterDelegateProxy(TReadyCallback const & onReady, + RouterDelegate::TPointCheckCallback const & onPointCheck, + RouterDelegate::TProgressCallback const & onProgress, + uint32_t timeoutSec) + : m_onReady(onReady), m_onPointCheck(onPointCheck), m_onProgress(onProgress) { - ASSERT(m_router, ()); - m_delegate.SetPointCheckCallback(pointCheckCallback); - m_isReadyThread.clear(); + m_delegate.Reset(); + m_delegate.SetPointCheckCallback(bind(&RouterDelegateProxy::OnPointCheck, this, _1)); + m_delegate.SetProgressCallback(bind(&RouterDelegateProxy::OnProgress, this, _1)); + m_delegate.SetTimeout(timeoutSec); } -AsyncRouter::~AsyncRouter() { ClearState(); } +void AsyncRouter::RouterDelegateProxy::OnReady(Route & route, IRouter::ResultCode resultCode) +{ + if (!m_onReady) + return; + lock_guard l(m_guard); + if (m_delegate.IsCancelled()) + return; + m_onReady(route, resultCode); +} + +void AsyncRouter::RouterDelegateProxy::Cancel() +{ + lock_guard l(m_guard); + m_delegate.Cancel(); +} + +void AsyncRouter::RouterDelegateProxy::OnProgress(float progress) +{ + if (!m_onProgress) + return; + lock_guard l(m_guard); + if (m_delegate.IsCancelled()) + return; + m_onProgress(progress); +} + +void AsyncRouter::RouterDelegateProxy::OnPointCheck(m2::PointD const & pt) +{ + if (!m_onPointCheck) + return; + lock_guard l(m_guard); + if (m_delegate.IsCancelled()) + return; + m_onPointCheck(pt); +} + +// ---------------------------------------------------------------------------------------------------------------------------- + +AsyncRouter::AsyncRouter(TRoutingStatisticsCallback const & routingStatisticsCallback, + RouterDelegate::TPointCheckCallback const & pointCheckCallback) + : m_threadExit(false), m_hasRequest(false), m_clearState(false), + m_routingStatisticsCallback(routingStatisticsCallback), + m_pointCheckCallback(pointCheckCallback) +{ + m_thread = thread(bind(&AsyncRouter::ThreadFunc, this)); +} + +AsyncRouter::~AsyncRouter() +{ + { + unique_lock ul(m_guard); + + ResetDelegate(); + + m_threadExit = true; + m_threadCondVar.notify_one(); + } + + m_thread.join(); +} + +void AsyncRouter::SetRouter(unique_ptr && router, unique_ptr && fetcher) +{ + unique_lock ul(m_guard); + + ResetDelegate(); + + m_router = move(router); + m_absentFetcher = move(fetcher); +} void AsyncRouter::CalculateRoute(m2::PointD const & startPoint, m2::PointD const & direction, m2::PointD const & finalPoint, TReadyCallback const & readyCallback, RouterDelegate::TProgressCallback const & progressCallback, uint32_t timeoutSec) { - { - lock_guard paramsGuard(m_paramsMutex); + unique_lock ul(m_guard); - m_startPoint = startPoint; - m_startDirection = direction; - m_finalPoint = finalPoint; + m_startPoint = startPoint; + m_startDirection = direction; + m_finalPoint = finalPoint; - m_delegate.Cancel(); - m_delegate.SetProgressCallback(progressCallback); - } + ResetDelegate(); - GetPlatform().RunAsync(bind(&AsyncRouter::CalculateRouteImpl, this, readyCallback, timeoutSec)); + m_delegate = make_shared(readyCallback, m_pointCheckCallback, progressCallback, timeoutSec); + + m_hasRequest = true; + m_threadCondVar.notify_one(); } void AsyncRouter::ClearState() { - // Send cancel flag to the algorithms. - m_delegate.Cancel(); + unique_lock ul(m_guard); - // And wait while it is finishing. - lock_guard routingGuard(m_routingMutex); + m_clearState = true; - m_router->ClearState(); + ResetDelegate(); } void AsyncRouter::LogCode(IRouter::ResultCode code, double const elapsedSec) @@ -137,34 +206,71 @@ void AsyncRouter::LogCode(IRouter::ResultCode code, double const elapsedSec) case IRouter::InternalError: LOG(LINFO, ("Internal error")); break; + case IRouter::FileTooOld: + LOG(LINFO, ("File too old")); + break; } } -void AsyncRouter::CalculateRouteImpl(TReadyCallback const & readyCallback, uint32_t timeoutSec) +void AsyncRouter::ResetDelegate() { - ASSERT(m_router, ()); - if (m_isReadyThread.test_and_set()) - return; - - Route route(m_router->GetName()); - IRouter::ResultCode code; - - lock_guard routingGuard(m_routingMutex); - - m_isReadyThread.clear(); - - m2::PointD startPoint, finalPoint, startDirection; + if (m_delegate) { - lock_guard paramsGuard(m_paramsMutex); + m_delegate->Cancel(); + m_delegate.reset(); + } +} + +void AsyncRouter::ThreadFunc() +{ + while (true) + { + { + unique_lock ul(m_guard); + m_threadCondVar.wait(ul, [this](){ return m_threadExit || m_hasRequest; }); + + if (m_threadExit) + break; + } + + CalculateRoute(); + } +} + +void AsyncRouter::CalculateRoute() +{ + bool clearState = true; + shared_ptr delegate; + m2::PointD startPoint, finalPoint, startDirection; + shared_ptr absentFetcher; + shared_ptr router; + + { + unique_lock ul(m_guard); + + bool hasRequest = m_hasRequest; + m_hasRequest = false; + if (!hasRequest) + return; + if (!m_router) + return; + if (!m_delegate) + return; startPoint = m_startPoint; finalPoint = m_finalPoint; startDirection = m_startDirection; + clearState = m_clearState; + delegate = m_delegate; + router = m_router; + absentFetcher = m_absentFetcher; - m_delegate.Reset(); - m_delegate.SetTimeout(timeoutSec); + m_clearState = false; } + Route route(router->GetName()); + IRouter::ResultCode code; + my::Timer timer; double elapsedSec = 0.0; @@ -172,11 +278,14 @@ void AsyncRouter::CalculateRouteImpl(TReadyCallback const & readyCallback, uint3 { LOG(LDEBUG, ("Calculating the route from", startPoint, "to", finalPoint, "startDirection", startDirection)); - if (m_absentFetcher) - m_absentFetcher->GenerateRequest(startPoint, finalPoint); + if (absentFetcher) + absentFetcher->GenerateRequest(startPoint, finalPoint); + + if (clearState) + router->ClearState(); // Run basic request. - code = m_router->CalculateRoute(startPoint, startDirection, finalPoint, m_delegate, route); + code = router->CalculateRoute(startPoint, startDirection, finalPoint, delegate->GetDelegate(), route); elapsedSec = timer.ElapsedSeconds(); // routing time LogCode(code, elapsedSec); @@ -186,23 +295,23 @@ void AsyncRouter::CalculateRouteImpl(TReadyCallback const & readyCallback, uint3 code = IRouter::InternalError; LOG(LERROR, ("Exception happened while calculating route:", e.Msg())); SendStatistics(startPoint, startDirection, finalPoint, e.Msg()); - readyCallback(route, code); + delegate->OnReady(route, code); return; } SendStatistics(startPoint, startDirection, finalPoint, code, route, elapsedSec); - //Draw route without waiting network latency. + // Draw route without waiting network latency. if (code == IRouter::NoError) - readyCallback(route, code); + delegate->OnReady(route, code); bool const needFetchAbsent = (code != IRouter::Cancelled); // Check online response if we have. vector absent; - if (m_absentFetcher && needFetchAbsent) + if (absentFetcher && needFetchAbsent) { - m_absentFetcher->GetAbsentCountries(absent); + absentFetcher->GetAbsentCountries(absent); for (string const & country : absent) route.AddAbsentCountry(country); } @@ -215,7 +324,7 @@ void AsyncRouter::CalculateRouteImpl(TReadyCallback const & readyCallback, uint3 // Call callback only if we have some new data. if (code != IRouter::NoError) - readyCallback(route, code); + delegate->OnReady(route, code); } void AsyncRouter::SendStatistics(m2::PointD const & startPoint, m2::PointD const & startDirection, diff --git a/routing/async_router.hpp b/routing/async_router.hpp index 148df81826..a290e8702b 100644 --- a/routing/async_router.hpp +++ b/routing/async_router.hpp @@ -5,16 +5,19 @@ #include "router.hpp" #include "router_delegate.hpp" -#include "std/atomic.hpp" +#include "std/condition_variable.hpp" #include "std/map.hpp" #include "std/mutex.hpp" +#include "std/shared_ptr.hpp" #include "std/string.hpp" +#include "std/thread.hpp" #include "std/unique_ptr.hpp" namespace routing { -class AsyncRouter +/// Dispatches a route calculation on a worker thread +class AsyncRouter final { public: /// Callback takes ownership of passed route. @@ -24,13 +27,14 @@ public: using TRoutingStatisticsCallback = function const &)>; /// AsyncRouter is a wrapper class to run routing routines in the different thread - /// @param router pointer to the router implementation. AsyncRouter will take ownership over - /// router. - AsyncRouter(unique_ptr && router, unique_ptr && fetcher, - TRoutingStatisticsCallback const & routingStatisticsCallback, + AsyncRouter(TRoutingStatisticsCallback const & routingStatisticsCallback, RouterDelegate::TPointCheckCallback const & pointCheckCallback); + ~AsyncRouter(); - virtual ~AsyncRouter(); + /// Sets a synchronous router, current route calculation will be cancelled + /// @param router pointer to a router implementation + /// @param fetcher pointer to a online fetcher + void SetRouter(unique_ptr && router, unique_ptr && fetcher); /// Main method to calulate new route from startPt to finalPt with start direction /// Processed result will be passed to callback. Callback will called at GUI thread. @@ -41,17 +45,22 @@ public: /// @param readyCallback function to return routing result /// @param progressCallback function to update the router progress /// @param timeoutSec timeout to cancel routing. 0 is infinity. - virtual void CalculateRoute(m2::PointD const & startPoint, m2::PointD const & direction, - m2::PointD const & finalPoint, TReadyCallback const & readyCallback, - RouterDelegate::TProgressCallback const & progressCallback, - uint32_t timeoutSec); + void CalculateRoute(m2::PointD const & startPoint, m2::PointD const & direction, + m2::PointD const & finalPoint, TReadyCallback const & readyCallback, + RouterDelegate::TProgressCallback const & progressCallback, + uint32_t timeoutSec); /// Interrupt routing and clear buffers - virtual void ClearState(); + void ClearState(); private: - /// This function is called in async mode - void CalculateRouteImpl(TReadyCallback const & readyCallback, uint32_t timeoutSec); + /// Worker thread function + void ThreadFunc(); + + /// This function is called in worker thread + void CalculateRoute(); + + void ResetDelegate(); /// These functions are called to send statistics about the routing void SendStatistics(m2::PointD const & startPoint, m2::PointD const & startDirection, @@ -65,19 +74,51 @@ private: void LogCode(IRouter::ResultCode code, double const elapsedSec); - mutex m_paramsMutex; - mutex m_routingMutex; - atomic_flag m_isReadyThread; + /// Blocks callbacks when routing has been cancelled + class RouterDelegateProxy + { + public: + RouterDelegateProxy(TReadyCallback const & onReady, + RouterDelegate::TPointCheckCallback const & onPointCheck, + RouterDelegate::TProgressCallback const & onProgress, + uint32_t timeoutSec); + void OnReady(Route & route, IRouter::ResultCode resultCode); + void Cancel(); + + RouterDelegate const & GetDelegate() const { return m_delegate; } + + private: + void OnProgress(float progress); + void OnPointCheck(m2::PointD const & pt); + + mutex m_guard; + TReadyCallback const m_onReady; + RouterDelegate::TPointCheckCallback const m_onPointCheck; + RouterDelegate::TProgressCallback const m_onProgress; + RouterDelegate m_delegate; + }; + +private: + mutex m_guard; + + /// Thread which executes routing calculation + thread m_thread; + condition_variable m_threadCondVar; + bool m_threadExit; + bool m_hasRequest; + + /// Current request parameters + bool m_clearState; m2::PointD m_startPoint; m2::PointD m_finalPoint; m2::PointD m_startDirection; + shared_ptr m_delegate; + shared_ptr m_absentFetcher; + shared_ptr m_router; - RouterDelegate m_delegate; - - unique_ptr const m_absentFetcher; - unique_ptr const m_router; TRoutingStatisticsCallback const m_routingStatisticsCallback; + RouterDelegate::TPointCheckCallback const m_pointCheckCallback; }; } // namespace routing diff --git a/routing/routing_session.cpp b/routing/routing_session.cpp index 8303a49051..0f8fe03ce9 100644 --- a/routing/routing_session.cpp +++ b/routing/routing_session.cpp @@ -34,6 +34,13 @@ RoutingSession::RoutingSession() { } +void RoutingSession::Init(TRoutingStatisticsCallback const & routingStatisticsFn, + RouterDelegate::TPointCheckCallback const & pointCheckCallback) +{ + ASSERT(m_router == nullptr, ()); + m_router.reset(new AsyncRouter(routingStatisticsFn, pointCheckCallback)); +} + void RoutingSession::BuildRoute(m2::PointD const & startPoint, m2::PointD const & endPoint, TReadyCallback const & readyCallback, TProgressCallback const & progressCallback, @@ -91,6 +98,8 @@ void RoutingSession::RemoveRoute() void RoutingSession::Reset() { + ASSERT(m_router != nullptr, ()); + threads::MutexGuard guard(m_routeSessionMutex); UNUSED_VALUE(guard); @@ -240,15 +249,11 @@ void RoutingSession::AssignRoute(Route & route, IRouter::ResultCode e) } void RoutingSession::SetRouter(unique_ptr && router, - unique_ptr && fetcher, - TRoutingStatisticsCallback const & routingStatisticsFn, - RouterDelegate::TPointCheckCallback const & pointCheckCallback) + unique_ptr && fetcher) { - if (m_router) - Reset(); - - m_router.reset(new AsyncRouter(move(router), move(fetcher), routingStatisticsFn, - pointCheckCallback)); + ASSERT(m_router != nullptr, ()); + Reset(); + m_router->SetRouter(move(router), move(fetcher)); } void RoutingSession::MatchLocationToRoute(location::GpsInfo & location, diff --git a/routing/routing_session.hpp b/routing/routing_session.hpp index cf759c2532..f4e67f6a90 100644 --- a/routing/routing_session.hpp +++ b/routing/routing_session.hpp @@ -56,9 +56,10 @@ public: RoutingSession(); - void SetRouter(unique_ptr && router, unique_ptr && fetcher, - TRoutingStatisticsCallback const & routingStatisticsFn, - RouterDelegate::TPointCheckCallback const & pointCheckCallback); + void Init(TRoutingStatisticsCallback const & routingStatisticsFn, + RouterDelegate::TPointCheckCallback const & pointCheckCallback); + + void SetRouter(unique_ptr && router, unique_ptr && fetcher); /// @param[in] startPoint and endPoint in mercator /// @param[in] timeoutSec timeout in seconds, if zero then there is no timeout diff --git a/routing/routing_tests/async_router_test.cpp b/routing/routing_tests/async_router_test.cpp index 8f826800b3..83827f58b6 100644 --- a/routing/routing_tests/async_router_test.cpp +++ b/routing/routing_tests/async_router_test.cpp @@ -95,8 +95,8 @@ UNIT_TEST(NeedMoreMapsSignalTest) unique_ptr fetcher(new DummyFetcher(absentData)); unique_ptr router(new DummyRouter(ResultCode::NoError, {})); DummyResultCallback resultCallback(2 /* expectedCalls */); - AsyncRouter async(move(router), move(fetcher), DummyStatisticsCallback, - nullptr /* pointCheckCallback */); + AsyncRouter async(DummyStatisticsCallback, nullptr /* pointCheckCallback */); + async.SetRouter(move(router), move(fetcher)); async.CalculateRoute({1, 2}, {3, 4}, {5, 6}, bind(ref(resultCallback), _1, _2), nullptr /* progressCallback */, 0 /* timeoutSec */); @@ -116,8 +116,8 @@ UNIT_TEST(StandartAsyncFogTest) unique_ptr fetcher(new DummyFetcher({})); unique_ptr router(new DummyRouter(ResultCode::NoError, {})); DummyResultCallback resultCallback(1 /* expectedCalls */); - AsyncRouter async(move(router), move(fetcher), DummyStatisticsCallback, - nullptr /* pointCheckCallback */); + AsyncRouter async(DummyStatisticsCallback, nullptr /* pointCheckCallback */); + async.SetRouter(move(router), move(fetcher)); async.CalculateRoute({1, 2}, {3, 4}, {5, 6}, bind(ref(resultCallback), _1, _2), nullptr /* progressCallback */, 0 /* timeoutSec */);