[search] Fixed broadcast messages processing order in SearchEngine.

This commit is contained in:
Yuri Gorshenin 2016-02-25 13:36:50 +03:00 committed by Sergey Yershov
parent 4c3e64f8aa
commit 7c1a4dcbc4
2 changed files with 82 additions and 48 deletions

View file

@ -131,21 +131,17 @@ Engine::Engine(Index & index, CategoriesHolder const & categories,
m_categories.ForEachName(bind<void>(ref(doInit), _1));
doInit.GetSuggests(m_suggests);
m_queries.reserve(params.m_numThreads);
m_contexts.resize(params.m_numThreads);
for (size_t i = 0; i < params.m_numThreads; ++i)
{
auto query = factory->BuildSearchQuery(index, m_categories, m_suggests, infoGetter);
query->SetPreferredLocale(params.m_locale);
m_queries.push_back(move(query));
m_contexts[i].m_query = move(query);
}
m_broadcast.resize(params.m_numThreads);
m_threads.reserve(params.m_numThreads);
for (size_t i = 0; i < params.m_numThreads; ++i)
{
m_threads.emplace_back(&Engine::MainLoop, this, ref(*m_queries[i]), ref(m_tasks),
ref(m_broadcast[i]));
}
m_threads.emplace_back(&Engine::MainLoop, this, ref(m_contexts[i]));
}
Engine::~Engine()
@ -163,16 +159,19 @@ Engine::~Engine()
weak_ptr<QueryHandle> Engine::Search(SearchParams const & params, m2::RectD const & viewport)
{
shared_ptr<QueryHandle> handle(new QueryHandle());
PostTask(bind(&Engine::DoSearch, this, params, viewport, handle, _1));
PostMessage(bind(&Engine::DoSearch, this, params, viewport, handle, _1), Message::TYPE_TASK);
return handle;
}
void Engine::SetSupportOldFormat(bool support)
{
PostBroadcast(bind(&Engine::DoSupportOldFormat, this, support, _1));
PostMessage(bind(&Engine::DoSupportOldFormat, this, support, _1), Message::TYPE_BROADCAST);
}
void Engine::ClearCaches() { PostBroadcast(bind(&Engine::DoClearCaches, this, _1)); }
void Engine::ClearCaches()
{
PostMessage(bind(&Engine::DoClearCaches, this, _1), Message::TYPE_BROADCAST);
}
void Engine::SetRankPivot(SearchParams const & params, m2::RectD const & viewport,
bool viewportSearch, Query & query)
@ -195,55 +194,64 @@ void Engine::EmitResults(SearchParams const & params, Results const & res)
params.m_onResults(res);
}
void Engine::MainLoop(Query & query, queue<TTask> & tasks, queue<TTask> & broadcast)
void Engine::MainLoop(Context & context)
{
while (true)
{
unique_lock<mutex> lock(m_mu);
m_cv.wait(lock, [&]()
{
return m_shutdown || !tasks.empty() || !broadcast.empty();
return m_shutdown || !m_messages.empty() || !context.m_messages.empty();
});
if (m_shutdown)
break;
queue<TTask> ts;
// Execute all broadcast tasks at once.
ts.swap(broadcast);
if (!tasks.empty())
// As SearchEngine is thread-safe, there is a global order on
// public API requests, and this order is kept by the global
// |m_messages| queue. When a broadcast message arrives, it must
// be executed by all threads, therefore the first free thread
// extracts it from |m_messages| and replicates to all
// thread-specific |m_messages| queues.
bool hasBroadcast = false;
while (!m_messages.empty() && m_messages.front().m_type == Message::TYPE_BROADCAST)
{
// Execute only first task from the common pool.
ts.push(move(tasks.front()));
tasks.pop();
for (auto & b : m_contexts)
b.m_messages.push(m_messages.front());
m_messages.pop();
hasBroadcast = true;
}
lock.unlock();
while (!ts.empty())
// Consumes first non-broadcast message, if any.
if (!m_messages.empty())
{
ts.front()(query);
ts.pop();
context.m_messages.push(move(m_messages.front()));
m_messages.pop();
}
queue<Message> messages;
messages.swap(context.m_messages);
lock.unlock();
if (hasBroadcast)
m_cv.notify_all();
while (!messages.empty())
{
messages.front()(*context.m_query);
messages.pop();
}
}
}
void Engine::PostTask(TTask && task)
template <typename... TArgs>
void Engine::PostMessage(TArgs && ... args)
{
lock_guard<mutex> lock(m_mu);
m_tasks.push(move(task));
m_messages.emplace(forward<TArgs>(args)...);
m_cv.notify_one();
}
void Engine::PostBroadcast(TTask const & task)
{
lock_guard<mutex> lock(m_mu);
for (auto & pool : m_broadcast)
pool.push(task);
m_cv.notify_all();
}
void Engine::DoSearch(SearchParams const & params, m2::RectD const & viewport,
shared_ptr<QueryHandle> handle, Query & query)
{

View file

@ -107,7 +107,40 @@ public:
void ClearCaches();
private:
using TTask = function<void(Query & query)>;
struct Message
{
using TFn = function<void(Query & query)>;
enum Type
{
TYPE_TASK,
TYPE_BROADCAST
};
Message(TFn && fn, Type type) : m_fn(move(fn)), m_type(type) {}
Message(Message const &) = default;
Message(Message &&) = default;
void operator()(Query & query) { m_fn(query); }
TFn m_fn;
Type m_type;
};
// alignas() is used here to prevent false-sharing between different
// threads.
struct alignas(64 /* the most common cache-line size */) Context
{
// This field *CAN* be accessed by other threads, so |m_mu| must
// be taken before access this queue. Messages are ordered here
// by a timestamp and all timestamps are less than timestamps in
// the global |m_messages| queue.
queue<Message> m_messages;
// This field is thread-specific and *CAN NOT* be accessed by
// other threads.
unique_ptr<Query> m_query;
};
// *ALL* following methods are executed on the m_threads threads.
void SetRankPivot(SearchParams const & params, m2::RectD const & viewport, bool viewportSearch,
@ -119,11 +152,10 @@ private:
// manner. |broadcast| contains per-thread tasks, but nevertheless
// all necessary synchronization primitives must be used to access
// |tasks| and |broadcast|.
void MainLoop(Query & query, queue<TTask> & tasks, queue<TTask> & broadcast);
void MainLoop(Context & context);
void PostTask(TTask && task);
void PostBroadcast(TTask const & task);
template <typename... TArgs>
void PostMessage(TArgs &&... args);
void DoSearch(SearchParams const & params, m2::RectD const & viewport,
shared_ptr<QueryHandle> handle, Query & query);
@ -139,14 +171,8 @@ private:
mutex m_mu;
condition_variable m_cv;
// List of per-thread pools, used to deliver broadcast messages to
// search threads.
vector<queue<TTask>> m_broadcast;
// Common pool of queries, used to store search tasks.
queue<TTask> m_tasks;
vector<unique_ptr<Query>> m_queries;
queue<Message> m_messages;
vector<Context> m_contexts;
vector<threads::SimpleThread> m_threads;
};
} // namespace search