Review fixes.

This commit is contained in:
Yuri Gorshenin 2016-02-25 15:47:49 +03:00 committed by Sergey Yershov
parent 7e08d3f46f
commit edfa64a3f9
3 changed files with 102 additions and 94 deletions

View file

@ -89,28 +89,28 @@ void SendStatistics(SearchParams const & params, m2::RectD const & viewport, Res
} // namespace
// QueryHandle -------------------------------------------------------------------------------------
QueryHandle::QueryHandle() : m_query(nullptr), m_cancelled(false) {}
QueryHandle::QueryHandle() : m_processor(nullptr), m_cancelled(false) {}
void QueryHandle::Cancel()
{
lock_guard<mutex> lock(m_mu);
m_cancelled = true;
if (m_query)
m_query->Cancel();
if (m_processor)
m_processor->Cancel();
}
void QueryHandle::Attach(Query & query)
void QueryHandle::Attach(Query & processor)
{
lock_guard<mutex> lock(m_mu);
m_query = &query;
m_processor = &processor;
if (m_cancelled)
m_query->Cancel();
m_processor->Cancel();
}
void QueryHandle::Detach()
{
lock_guard<mutex> lock(m_mu);
m_query = nullptr;
m_processor = nullptr;
}
// Engine::Params ----------------------------------------------------------------------------------
@ -134,9 +134,9 @@ Engine::Engine(Index & index, CategoriesHolder const & categories,
m_contexts.resize(params.m_numThreads);
for (size_t i = 0; i < params.m_numThreads; ++i)
{
auto query = factory->BuildSearchQuery(index, m_categories, m_suggests, infoGetter);
query->SetPreferredLocale(params.m_locale);
m_contexts[i].m_query = move(query);
auto processor = factory->BuildSearchQuery(index, m_categories, m_suggests, infoGetter);
processor->SetPreferredLocale(params.m_locale);
m_contexts[i].m_processor = move(processor);
}
m_threads.reserve(params.m_numThreads);
@ -159,34 +159,43 @@ Engine::~Engine()
weak_ptr<QueryHandle> Engine::Search(SearchParams const & params, m2::RectD const & viewport)
{
shared_ptr<QueryHandle> handle(new QueryHandle());
PostMessage(bind(&Engine::DoSearch, this, params, viewport, handle, _1), Message::TYPE_TASK);
PostMessage(Message::TYPE_TASK, [this, params, viewport, handle](Query & query)
{
DoSearch(params, viewport, handle, query);
});
return handle;
}
void Engine::SetSupportOldFormat(bool support)
{
PostMessage(bind(&Engine::DoSupportOldFormat, this, support, _1), Message::TYPE_BROADCAST);
PostMessage(Message::TYPE_BROADCAST, [this, support](Query & processor)
{
processor.SupportOldFormat(support);
});
}
void Engine::ClearCaches()
{
PostMessage(bind(&Engine::DoClearCaches, this, _1), Message::TYPE_BROADCAST);
PostMessage(Message::TYPE_BROADCAST, [this](Query & processor)
{
processor.ClearCaches();
});
}
void Engine::SetRankPivot(SearchParams const & params, m2::RectD const & viewport,
bool viewportSearch, Query & query)
bool viewportSearch, Query & processor)
{
if (!viewportSearch && params.IsValidPosition())
{
m2::PointD const pos = MercatorBounds::FromLatLon(params.m_lat, params.m_lon);
if (m2::Inflate(viewport, viewport.SizeX() / 4.0, viewport.SizeY() / 4.0).IsPointInside(pos))
{
query.SetRankPivot(pos);
processor.SetRankPivot(pos);
return;
}
}
query.SetRankPivot(viewport.Center());
processor.SetRankPivot(viewport.Center());
}
void Engine::EmitResults(SearchParams const & params, Results const & res)
@ -198,47 +207,55 @@ void Engine::MainLoop(Context & context)
{
while (true)
{
unique_lock<mutex> lock(m_mu);
m_cv.wait(lock, [&]()
{
return m_shutdown || !m_messages.empty() || !context.m_messages.empty();
});
if (m_shutdown)
break;
// As SearchEngine is thread-safe, there is a global order on
// public API requests, and this order is kept by the global
// |m_messages| queue. When a broadcast message arrives, it must
// be executed by all threads, therefore the first free thread
// extracts it from |m_messages| and replicates to all
// thread-specific |m_messages| queues.
bool hasBroadcast = false;
while (!m_messages.empty() && m_messages.front().m_type == Message::TYPE_BROADCAST)
{
for (auto & b : m_contexts)
b.m_messages.push(m_messages.front());
m_messages.pop();
hasBroadcast = true;
}
// Consumes first non-broadcast message, if any.
if (!m_messages.empty())
{
context.m_messages.push(move(m_messages.front()));
m_messages.pop();
}
queue<Message> messages;
messages.swap(context.m_messages);
lock.unlock();
{
unique_lock<mutex> lock(m_mu);
m_cv.wait(lock, [&]()
{
return m_shutdown || !m_messages.empty() || !context.m_messages.empty();
});
if (m_shutdown)
break;
// As SearchEngine is thread-safe, there is a global order on
// public API requests, and this order is kept by the global
// |m_messages| queue. When a broadcast message arrives, it
// must be executed in any case by all threads, therefore the
// first free thread extracts as many as possible broadcast
// messages from |m_messages| front and replicates them to all
// thread-specific |m_messages| queues.
while (!m_messages.empty() && m_messages.front().m_type == Message::TYPE_BROADCAST)
{
for (auto & b : m_contexts)
b.m_messages.push(m_messages.front());
m_messages.pop();
hasBroadcast = true;
}
// Consumes first non-broadcast message, if any. We process
// only a single task message (in constrast with broadcast
// messages) because task messages are actually search queries,
// whose processing may take an arbitrary amount of time. So
// it's better to process only one message and leave rest to the
// next free search thread.
if (!m_messages.empty())
{
context.m_messages.push(move(m_messages.front()));
m_messages.pop();
}
messages.swap(context.m_messages);
}
if (hasBroadcast)
m_cv.notify_all();
while (!messages.empty())
{
messages.front()(*context.m_query);
messages.front()(*context.m_processor);
messages.pop();
}
}
@ -253,43 +270,43 @@ void Engine::PostMessage(TArgs && ... args)
}
void Engine::DoSearch(SearchParams const & params, m2::RectD const & viewport,
shared_ptr<QueryHandle> handle, Query & query)
shared_ptr<QueryHandle> handle, Query & processor)
{
bool const viewportSearch = params.GetMode() == Mode::Viewport;
// Initialize query.
query.Init(viewportSearch);
handle->Attach(query);
// Initialize query processor.
processor.Init(viewportSearch);
handle->Attach(processor);
MY_SCOPE_GUARD(detach, [&handle] { handle->Detach(); });
// Early exit when query is cancelled.
if (query.IsCancelled())
// Early exit when query processing is cancelled.
if (processor.IsCancelled())
{
params.m_onResults(Results::GetEndMarker(true /* isCancelled */));
return;
}
SetRankPivot(params, viewport, viewportSearch, query);
SetRankPivot(params, viewport, viewportSearch, processor);
if (params.IsValidPosition())
query.SetPosition(MercatorBounds::FromLatLon(params.m_lat, params.m_lon));
processor.SetPosition(MercatorBounds::FromLatLon(params.m_lat, params.m_lon));
else
query.SetPosition(viewport.Center());
processor.SetPosition(viewport.Center());
query.SetMode(params.GetMode());
processor.SetMode(params.GetMode());
// This flag is needed for consistency with old search algorithm
// only. It will be gone when we remove old search code.
query.SetSearchInWorld(true);
processor.SetSearchInWorld(true);
query.SetInputLocale(params.m_inputLocale);
processor.SetInputLocale(params.m_inputLocale);
ASSERT(!params.m_query.empty(), ());
query.SetQuery(params.m_query);
processor.SetQuery(params.m_query);
Results res;
query.SearchCoordinates(res);
processor.SearchCoordinates(res);
try
{
@ -298,13 +315,13 @@ void Engine::DoSearch(SearchParams const & params, m2::RectD const & viewport,
if (viewportSearch)
{
query.SetViewport(viewport, true /* forceUpdate */);
query.SearchViewportPoints(res);
processor.SetViewport(viewport, true /* forceUpdate */);
processor.SearchViewportPoints(res);
}
else
{
query.SetViewport(viewport, params.IsSearchAroundPosition() /* forceUpdate */);
query.Search(res, kResultsCount);
processor.SetViewport(viewport, params.IsSearchAroundPosition() /* forceUpdate */);
processor.Search(res, kResultsCount);
}
EmitResults(params, res);
@ -314,14 +331,10 @@ void Engine::DoSearch(SearchParams const & params, m2::RectD const & viewport,
LOG(LDEBUG, ("Search has been cancelled."));
}
if (!viewportSearch && !query.IsCancelled())
if (!viewportSearch && !processor.IsCancelled())
SendStatistics(params, viewport, res);
// Emit finish marker to client.
params.m_onResults(Results::GetEndMarker(query.IsCancelled()));
params.m_onResults(Results::GetEndMarker(processor.IsCancelled()));
}
void Engine::DoSupportOldFormat(bool support, Query & query) { query.SupportOldFormat(support); }
void Engine::DoClearCaches(Query & query) { query.ClearCaches(); }
} // namespace search

View file

@ -53,18 +53,18 @@ public:
private:
friend class Engine;
// Attaches the handle to a |query|. If there was or will be a
// cancel signal, this signal will be propagated to |query|. This
// method is called only once, when search engine starts to process
// query this handle corresponds to.
void Attach(Query & query);
// Attaches the handle to a |processor|. If there was or will be a
// cancel signal, this signal will be propagated to |processor|.
// This method is called only once, when search engine starts to
// process query this handle corresponds to.
void Attach(Query & processor);
// Detaches handle from a query. This method is called only once,
// when search engine completes process of a query this handle
// Detaches handle from a processor. This method is called only
// once, when search engine completes process of a query this handle
// corresponds to.
void Detach();
Query * m_query;
Query * m_processor;
bool m_cancelled;
mutex m_mu;
@ -109,7 +109,7 @@ public:
private:
struct Message
{
using TFn = function<void(Query & query)>;
using TFn = function<void(Query & processor)>;
enum Type
{
@ -117,14 +117,12 @@ private:
TYPE_BROADCAST
};
Message(TFn && fn, Type type) : m_fn(move(fn)), m_type(type) {}
Message(Message const &) = default;
Message(Message &&) = default;
Message(Type type, TFn && fn) : m_type(type), m_fn(move(fn)) {}
void operator()(Query & query) { m_fn(query); }
void operator()(Query & processor) { m_fn(processor); }
TFn m_fn;
Type m_type;
TFn m_fn;
};
// alignas() is used here to prevent false-sharing between different
@ -139,12 +137,12 @@ private:
// This field is thread-specific and *CAN NOT* be accessed by
// other threads.
unique_ptr<Query> m_query;
unique_ptr<Query> m_processor;
};
// *ALL* following methods are executed on the m_threads threads.
void SetRankPivot(SearchParams const & params, m2::RectD const & viewport, bool viewportSearch,
Query & query);
Query & processor);
void EmitResults(SearchParams const & params, Results const & res);
@ -158,11 +156,7 @@ private:
void PostMessage(TArgs &&... args);
void DoSearch(SearchParams const & params, m2::RectD const & viewport,
shared_ptr<QueryHandle> handle, Query & query);
void DoSupportOldFormat(bool support, Query & query);
void DoClearCaches(Query & query);
shared_ptr<QueryHandle> handle, Query & processor);
CategoriesHolder const & m_categories;
vector<Suggest> m_suggests;

View file

@ -64,6 +64,7 @@ namespace impl
class HouseCompFactory;
}
// TODO (@y): rename this class to QueryProcessor.
class Query : public my::Cancellable
{
public: