diff --git a/search/search_engine.cpp b/search/search_engine.cpp index 5909f961fd..2e1b2b4f55 100644 --- a/search/search_engine.cpp +++ b/search/search_engine.cpp @@ -89,28 +89,28 @@ void SendStatistics(SearchParams const & params, m2::RectD const & viewport, Res } // namespace // QueryHandle ------------------------------------------------------------------------------------- -QueryHandle::QueryHandle() : m_query(nullptr), m_cancelled(false) {} +QueryHandle::QueryHandle() : m_processor(nullptr), m_cancelled(false) {} void QueryHandle::Cancel() { lock_guard lock(m_mu); m_cancelled = true; - if (m_query) - m_query->Cancel(); + if (m_processor) + m_processor->Cancel(); } -void QueryHandle::Attach(Query & query) +void QueryHandle::Attach(Query & processor) { lock_guard lock(m_mu); - m_query = &query; + m_processor = &processor; if (m_cancelled) - m_query->Cancel(); + m_processor->Cancel(); } void QueryHandle::Detach() { lock_guard lock(m_mu); - m_query = nullptr; + m_processor = nullptr; } // Engine::Params ---------------------------------------------------------------------------------- @@ -134,9 +134,9 @@ Engine::Engine(Index & index, CategoriesHolder const & categories, m_contexts.resize(params.m_numThreads); for (size_t i = 0; i < params.m_numThreads; ++i) { - auto query = factory->BuildSearchQuery(index, m_categories, m_suggests, infoGetter); - query->SetPreferredLocale(params.m_locale); - m_contexts[i].m_query = move(query); + auto processor = factory->BuildSearchQuery(index, m_categories, m_suggests, infoGetter); + processor->SetPreferredLocale(params.m_locale); + m_contexts[i].m_processor = move(processor); } m_threads.reserve(params.m_numThreads); @@ -159,34 +159,43 @@ Engine::~Engine() weak_ptr Engine::Search(SearchParams const & params, m2::RectD const & viewport) { shared_ptr handle(new QueryHandle()); - PostMessage(bind(&Engine::DoSearch, this, params, viewport, handle, _1), Message::TYPE_TASK); + PostMessage(Message::TYPE_TASK, [this, params, viewport, handle](Query & query) + { + DoSearch(params, viewport, handle, query); + }); return handle; } void Engine::SetSupportOldFormat(bool support) { - PostMessage(bind(&Engine::DoSupportOldFormat, this, support, _1), Message::TYPE_BROADCAST); + PostMessage(Message::TYPE_BROADCAST, [this, support](Query & processor) + { + processor.SupportOldFormat(support); + }); } void Engine::ClearCaches() { - PostMessage(bind(&Engine::DoClearCaches, this, _1), Message::TYPE_BROADCAST); + PostMessage(Message::TYPE_BROADCAST, [this](Query & processor) + { + processor.ClearCaches(); + }); } void Engine::SetRankPivot(SearchParams const & params, m2::RectD const & viewport, - bool viewportSearch, Query & query) + bool viewportSearch, Query & processor) { if (!viewportSearch && params.IsValidPosition()) { m2::PointD const pos = MercatorBounds::FromLatLon(params.m_lat, params.m_lon); if (m2::Inflate(viewport, viewport.SizeX() / 4.0, viewport.SizeY() / 4.0).IsPointInside(pos)) { - query.SetRankPivot(pos); + processor.SetRankPivot(pos); return; } } - query.SetRankPivot(viewport.Center()); + processor.SetRankPivot(viewport.Center()); } void Engine::EmitResults(SearchParams const & params, Results const & res) @@ -198,47 +207,55 @@ void Engine::MainLoop(Context & context) { while (true) { - unique_lock lock(m_mu); - m_cv.wait(lock, [&]() - { - return m_shutdown || !m_messages.empty() || !context.m_messages.empty(); - }); - - if (m_shutdown) - break; - - // As SearchEngine is thread-safe, there is a global order on - // public API requests, and this order is kept by the global - // |m_messages| queue. When a broadcast message arrives, it must - // be executed by all threads, therefore the first free thread - // extracts it from |m_messages| and replicates to all - // thread-specific |m_messages| queues. bool hasBroadcast = false; - while (!m_messages.empty() && m_messages.front().m_type == Message::TYPE_BROADCAST) - { - for (auto & b : m_contexts) - b.m_messages.push(m_messages.front()); - m_messages.pop(); - hasBroadcast = true; - } - - // Consumes first non-broadcast message, if any. - if (!m_messages.empty()) - { - context.m_messages.push(move(m_messages.front())); - m_messages.pop(); - } - queue messages; - messages.swap(context.m_messages); - lock.unlock(); + { + unique_lock lock(m_mu); + m_cv.wait(lock, [&]() + { + return m_shutdown || !m_messages.empty() || !context.m_messages.empty(); + }); + + if (m_shutdown) + break; + + // As SearchEngine is thread-safe, there is a global order on + // public API requests, and this order is kept by the global + // |m_messages| queue. When a broadcast message arrives, it + // must be executed in any case by all threads, therefore the + // first free thread extracts as many as possible broadcast + // messages from |m_messages| front and replicates them to all + // thread-specific |m_messages| queues. + while (!m_messages.empty() && m_messages.front().m_type == Message::TYPE_BROADCAST) + { + for (auto & b : m_contexts) + b.m_messages.push(m_messages.front()); + m_messages.pop(); + hasBroadcast = true; + } + + // Consumes first non-broadcast message, if any. We process + // only a single task message (in constrast with broadcast + // messages) because task messages are actually search queries, + // whose processing may take an arbitrary amount of time. So + // it's better to process only one message and leave rest to the + // next free search thread. + if (!m_messages.empty()) + { + context.m_messages.push(move(m_messages.front())); + m_messages.pop(); + } + + messages.swap(context.m_messages); + } + if (hasBroadcast) m_cv.notify_all(); while (!messages.empty()) { - messages.front()(*context.m_query); + messages.front()(*context.m_processor); messages.pop(); } } @@ -253,43 +270,43 @@ void Engine::PostMessage(TArgs && ... args) } void Engine::DoSearch(SearchParams const & params, m2::RectD const & viewport, - shared_ptr handle, Query & query) + shared_ptr handle, Query & processor) { bool const viewportSearch = params.GetMode() == Mode::Viewport; - // Initialize query. - query.Init(viewportSearch); - handle->Attach(query); + // Initialize query processor. + processor.Init(viewportSearch); + handle->Attach(processor); MY_SCOPE_GUARD(detach, [&handle] { handle->Detach(); }); - // Early exit when query is cancelled. - if (query.IsCancelled()) + // Early exit when query processing is cancelled. + if (processor.IsCancelled()) { params.m_onResults(Results::GetEndMarker(true /* isCancelled */)); return; } - SetRankPivot(params, viewport, viewportSearch, query); + SetRankPivot(params, viewport, viewportSearch, processor); if (params.IsValidPosition()) - query.SetPosition(MercatorBounds::FromLatLon(params.m_lat, params.m_lon)); + processor.SetPosition(MercatorBounds::FromLatLon(params.m_lat, params.m_lon)); else - query.SetPosition(viewport.Center()); + processor.SetPosition(viewport.Center()); - query.SetMode(params.GetMode()); + processor.SetMode(params.GetMode()); // This flag is needed for consistency with old search algorithm // only. It will be gone when we remove old search code. - query.SetSearchInWorld(true); + processor.SetSearchInWorld(true); - query.SetInputLocale(params.m_inputLocale); + processor.SetInputLocale(params.m_inputLocale); ASSERT(!params.m_query.empty(), ()); - query.SetQuery(params.m_query); + processor.SetQuery(params.m_query); Results res; - query.SearchCoordinates(res); + processor.SearchCoordinates(res); try { @@ -298,13 +315,13 @@ void Engine::DoSearch(SearchParams const & params, m2::RectD const & viewport, if (viewportSearch) { - query.SetViewport(viewport, true /* forceUpdate */); - query.SearchViewportPoints(res); + processor.SetViewport(viewport, true /* forceUpdate */); + processor.SearchViewportPoints(res); } else { - query.SetViewport(viewport, params.IsSearchAroundPosition() /* forceUpdate */); - query.Search(res, kResultsCount); + processor.SetViewport(viewport, params.IsSearchAroundPosition() /* forceUpdate */); + processor.Search(res, kResultsCount); } EmitResults(params, res); @@ -314,14 +331,10 @@ void Engine::DoSearch(SearchParams const & params, m2::RectD const & viewport, LOG(LDEBUG, ("Search has been cancelled.")); } - if (!viewportSearch && !query.IsCancelled()) + if (!viewportSearch && !processor.IsCancelled()) SendStatistics(params, viewport, res); // Emit finish marker to client. - params.m_onResults(Results::GetEndMarker(query.IsCancelled())); + params.m_onResults(Results::GetEndMarker(processor.IsCancelled())); } - -void Engine::DoSupportOldFormat(bool support, Query & query) { query.SupportOldFormat(support); } - -void Engine::DoClearCaches(Query & query) { query.ClearCaches(); } } // namespace search diff --git a/search/search_engine.hpp b/search/search_engine.hpp index 83fe8d7e4c..7a6d0866b8 100644 --- a/search/search_engine.hpp +++ b/search/search_engine.hpp @@ -53,18 +53,18 @@ public: private: friend class Engine; - // Attaches the handle to a |query|. If there was or will be a - // cancel signal, this signal will be propagated to |query|. This - // method is called only once, when search engine starts to process - // query this handle corresponds to. - void Attach(Query & query); + // Attaches the handle to a |processor|. If there was or will be a + // cancel signal, this signal will be propagated to |processor|. + // This method is called only once, when search engine starts to + // process query this handle corresponds to. + void Attach(Query & processor); - // Detaches handle from a query. This method is called only once, - // when search engine completes process of a query this handle + // Detaches handle from a processor. This method is called only + // once, when search engine completes process of a query this handle // corresponds to. void Detach(); - Query * m_query; + Query * m_processor; bool m_cancelled; mutex m_mu; @@ -109,7 +109,7 @@ public: private: struct Message { - using TFn = function; + using TFn = function; enum Type { @@ -117,14 +117,12 @@ private: TYPE_BROADCAST }; - Message(TFn && fn, Type type) : m_fn(move(fn)), m_type(type) {} - Message(Message const &) = default; - Message(Message &&) = default; + Message(Type type, TFn && fn) : m_type(type), m_fn(move(fn)) {} - void operator()(Query & query) { m_fn(query); } + void operator()(Query & processor) { m_fn(processor); } - TFn m_fn; Type m_type; + TFn m_fn; }; // alignas() is used here to prevent false-sharing between different @@ -139,12 +137,12 @@ private: // This field is thread-specific and *CAN NOT* be accessed by // other threads. - unique_ptr m_query; + unique_ptr m_processor; }; // *ALL* following methods are executed on the m_threads threads. void SetRankPivot(SearchParams const & params, m2::RectD const & viewport, bool viewportSearch, - Query & query); + Query & processor); void EmitResults(SearchParams const & params, Results const & res); @@ -158,11 +156,7 @@ private: void PostMessage(TArgs &&... args); void DoSearch(SearchParams const & params, m2::RectD const & viewport, - shared_ptr handle, Query & query); - - void DoSupportOldFormat(bool support, Query & query); - - void DoClearCaches(Query & query); + shared_ptr handle, Query & processor); CategoriesHolder const & m_categories; vector m_suggests; diff --git a/search/search_query.hpp b/search/search_query.hpp index e815fd322c..db9bed9f0d 100644 --- a/search/search_query.hpp +++ b/search/search_query.hpp @@ -64,6 +64,7 @@ namespace impl class HouseCompFactory; } +// TODO (@y): rename this class to QueryProcessor. class Query : public my::Cancellable { public: