[local_ads][user] reduced number of threads

This commit is contained in:
Arsentiy Milchakov 2017-12-18 15:54:27 +03:00 committed by Roman Kuznetsov
parent 70090735f7
commit 9d7092946a
8 changed files with 107 additions and 251 deletions

View file

@ -144,10 +144,11 @@ void UserStatsLoader::Update(string const & userName, UpdatePolicy const policy,
return;
}
threads::SimpleThread([this, userName, fn] {
GetPlatform().RunTask(Platform::Thread::Network, [this, userName, fn]
{
if (Update(userName))
GetPlatform().RunTask(Platform::Thread::Gui, fn);
}).detach();
});
}
void UserStatsLoader::Update(string const & userName, TOnUpdateCallback fn)

View file

@ -12,7 +12,6 @@ public:
StatisticsGuard(local_ads::Statistics & statistics) : m_statistics(statistics) {}
~StatisticsGuard()
{
m_statistics.Teardown();
m_statistics.CleanupAfterTesting();
}

View file

@ -20,6 +20,7 @@
#include <functional>
#include <sstream>
#include <utility>
#include "private.h"
@ -159,87 +160,35 @@ std::string MakeRemoteURL(std::string const & userId, std::string const & name,
namespace local_ads
{
Statistics::~Statistics()
{
std::lock_guard<std::mutex> lock(m_mutex);
ASSERT(!m_isRunning, ());
}
void Statistics::Startup()
{
auto const asyncTask = [this]
{
std::lock_guard<std::mutex> lock(m_mutex);
if (m_isRunning)
return;
m_isRunning = true;
}
m_thread = threads::SimpleThread(&Statistics::ThreadRoutine, this);
}
SendToServer();
};
void Statistics::Teardown()
{
auto const recursiveAsyncTask = [asyncTask]
{
std::lock_guard<std::mutex> lock(m_mutex);
if (!m_isRunning)
return;
m_isRunning = false;
}
m_condition.notify_one();
m_thread.join();
}
asyncTask();
GetPlatform().RunDelayedTask(Platform::Thread::File, kSendingTimeout, asyncTask);
};
bool Statistics::RequestEvents(std::list<Event> & events, bool & needToSend)
{
std::unique_lock<std::mutex> lock(m_mutex);
bool const isTimeout = !m_condition.wait_for(lock, kSendingTimeout, [this]
// The first send immediately, and then every |kSendingTimeout|.
GetPlatform().RunTask(Platform::Thread::File, [recursiveAsyncTask]
{
return !m_isRunning || !m_events.empty();
recursiveAsyncTask();
});
if (!m_isRunning)
return false;
using namespace std::chrono;
needToSend = m_isFirstSending || isTimeout ||
(std::chrono::steady_clock::now() > (m_lastSending + kSendingTimeout));
events = std::move(m_events);
m_events.clear();
return true;
}
void Statistics::RegisterEvent(Event && event)
{
std::lock_guard<std::mutex> lock(m_mutex);
if (!m_isRunning)
return;
m_events.push_back(std::move(event));
m_condition.notify_one();
RegisterEvents({std::move(event)});
}
void Statistics::RegisterEvents(std::list<Event> && events)
{
std::lock_guard<std::mutex> lock(m_mutex);
if (!m_isRunning)
return;
m_events.splice(m_events.end(), std::move(events));
m_condition.notify_one();
}
void Statistics::ThreadRoutine()
{
std::list<Event> events;
bool needToSend = false;
while (RequestEvents(events, needToSend))
{
ProcessEvents(events);
events.clear();
// Send statistics to server.
if (needToSend)
SendToServer();
}
GetPlatform().RunTask(Platform::Thread::File,
std::bind(&Statistics::ProcessEvents, this, std::move(events)));
}
std::list<Event> Statistics::WriteEvents(std::list<Event> & events, std::string & fileNameToRebuild)
@ -385,14 +334,6 @@ void Statistics::ProcessEvents(std::list<Event> & events)
void Statistics::SendToServer()
{
std::string userId;
ServerSerializer serializer;
{
std::lock_guard<std::mutex> lock(m_mutex);
userId = m_userId;
serializer = m_serverSerializer;
}
for (auto it = m_metadataCache.begin(); it != m_metadataCache.end();)
{
std::string const url = MakeRemoteURL(m_userId, it->first.first, it->first.second);
@ -408,8 +349,8 @@ void Statistics::SendToServer()
std::string contentType = "application/octet-stream";
std::string contentEncoding = "";
std::vector<uint8_t> bytes = serializer != nullptr
? serializer(events, userId, contentType, contentEncoding)
std::vector<uint8_t> bytes = m_serverSerializer != nullptr
? m_serverSerializer(events, m_userId, contentType, contentEncoding)
: SerializeForServer(events);
ASSERT(!bytes.empty(), ());
@ -432,15 +373,13 @@ void Statistics::SendToServer()
++it;
}
}
m_lastSending = std::chrono::steady_clock::now();
m_isFirstSending = false;
}
std::vector<uint8_t> Statistics::SerializeForServer(std::list<Event> const & events) const
{
ASSERT(!events.empty(), ());
// TODO: implement serialization
// TODO: implement binary serialization (so far, we are using json serialization).
return std::vector<uint8_t>{1, 2, 3, 4, 5};
}
@ -534,8 +473,7 @@ void Statistics::BalanceMemory()
void Statistics::SetUserId(std::string const & userId)
{
std::lock_guard<std::mutex> lock(m_mutex);
m_userId = userId;
GetPlatform().RunTask(Platform::Thread::File, [this, userId] { m_userId = userId; });
}
std::list<Event> Statistics::ReadEventsForTesting(std::string const & fileName)
@ -558,7 +496,7 @@ void Statistics::CleanupAfterTesting()
void Statistics::SetCustomServerSerializer(ServerSerializer && serializer)
{
std::lock_guard<std::mutex> lock(m_mutex);
m_serverSerializer = std::move(serializer);
GetPlatform().RunTask(Platform::Thread::File,
[this, serializer] { m_serverSerializer = serializer; });
}
} // namespace local_ads

View file

@ -33,10 +33,8 @@ public:
};
Statistics() = default;
~Statistics();
void Startup();
void Teardown();
void SetUserId(std::string const & userId);
@ -52,9 +50,6 @@ public:
void CleanupAfterTesting();
private:
void ThreadRoutine();
bool RequestEvents(std::list<Event> & events, bool & needToSend);
void IndexMetadata();
void ExtractMetadata(std::string const & fileName);
void BalanceMemory();
@ -79,17 +74,8 @@ private:
}
};
std::map<MetadataKey, Metadata> m_metadataCache;
std::chrono::steady_clock::time_point m_lastSending;
bool m_isFirstSending = true;
std::string m_userId;
ServerSerializer m_serverSerializer;
bool m_isRunning = false;
std::list<Event> m_events;
std::condition_variable m_condition;
std::mutex m_mutex;
threads::SimpleThread m_thread;
};
} // namespace local_ads

View file

@ -473,8 +473,7 @@ Framework::Framework(FrameworkParams const & params)
// Local ads manager should be initialized after storage initialization.
if (params.m_enableLocalAds)
{
m_localAdsManager.SetBookmarkManager(m_bmManager.get());
m_localAdsManager.Startup();
m_localAdsManager.Startup(m_bmManager.get());
}
m_routingManager.SetRouterImpl(RouterType::Vehicle);
@ -505,6 +504,8 @@ Framework::Framework(FrameworkParams const & params)
Framework::~Framework()
{
GetPlatform().ShutdownThreads();
osm::Editor & editor = osm::Editor::Instance();
editor.SetDelegate({});
@ -512,11 +513,8 @@ Framework::~Framework()
GetBookmarkManager().Teardown();
m_trafficManager.Teardown();
m_localAdsManager.Teardown();
DestroyDrapeEngine();
m_model.SetOnMapDeregisteredCallback(nullptr);
GetPlatform().ShutdownThreads();
}
booking::Api * Framework::GetBookingApi(platform::NetworkPolicy const & policy)
@ -1836,10 +1834,10 @@ void Framework::DestroyDrapeEngine()
m_trafficManager.SetDrapeEngine(nullptr);
m_localAdsManager.SetDrapeEngine(nullptr);
m_searchMarks.SetDrapeEngine(nullptr);
GetBookmarkManager().SetDrapeEngine(nullptr);
m_bmManager.SetDrapeEngine(nullptr);
m_localAdsManager.SetDrapeEngine(nullptr);
m_trafficManager.Teardown();
m_localAdsManager.Teardown();
GpsTracker::Instance().Disconnect();
m_drapeEngine.reset();
}

View file

@ -270,46 +270,26 @@ LocalAdsManager::LocalAdsManager(GetMwmsByRectFn && getMwmsByRectFn,
#endif
}
LocalAdsManager::~LocalAdsManager()
void LocalAdsManager::Startup(BookmarkManager * bmManager)
{
std::lock_guard<std::mutex> lock(m_mutex);
ASSERT(!m_isRunning, ());
}
void LocalAdsManager::Startup()
{
{
std::lock_guard<std::mutex> lock(m_mutex);
if (m_isRunning)
return;
m_isRunning = true;
}
FillSupportedTypes();
m_thread = threads::SimpleThread(&LocalAdsManager::ThreadRoutine, this);
GetPlatform().RunTask(Platform::Thread::File, [this, bmManager]
{
m_bmManager = bmManager;
local_ads::IconsInfo::Instance().SetSourceFile(kLocalAdsSymbolsFile);
std::string const campaignFile = GetPath(kCampaignFile);
// Read persistence data.
ReadCampaignFile(campaignFile);
Invalidate();
});
m_statistics.Startup();
}
void LocalAdsManager::Teardown()
{
{
std::lock_guard<std::mutex> lock(m_mutex);
if (!m_isRunning)
return;
m_isRunning = false;
}
m_condition.notify_one();
m_thread.join();
m_statistics.Teardown();
}
void LocalAdsManager::SetBookmarkManager(BookmarkManager * bmManager)
{
m_bmManager = bmManager;
}
void LocalAdsManager::SetDrapeEngine(ref_ptr<df::DrapeEngine> engine)
{
m_drapeEngine.Set(engine);
@ -325,16 +305,14 @@ void LocalAdsManager::UpdateViewport(ScreenBase const & screen)
return;
}
std::vector<std::string> requestedCampaigns;
auto mwms = m_getMwmsByRectFn(screen.ClipRect());
if (mwms.empty())
return;
// Request local ads campaigns.
GetPlatform().RunTask(Platform::Thread::File, [this, connectionStatus, mwms]
{
std::lock_guard<std::mutex> lock(m_mutex);
if (!m_isRunning)
return;
std::vector<std::string> requestedCampaigns;
for (auto const & mwm : mwms)
{
@ -376,11 +354,12 @@ void LocalAdsManager::UpdateViewport(ScreenBase const & screen)
if (!requestedCampaigns.empty())
{
std::set<Request> requests;
for (auto const & campaign : requestedCampaigns)
m_requestedCampaigns.insert(std::make_pair(m_getMwmIdByNameFn(campaign), RequestType::Download));
m_condition.notify_one();
requests.insert(std::make_pair(m_getMwmIdByNameFn(campaign), RequestType::Download));
ProcessRequests(requests);
}
}
});
}
bool LocalAdsManager::DownloadCampaign(MwmSet::MwmId const & mwmId, std::vector<uint8_t> & bytes)
@ -391,21 +370,15 @@ bool LocalAdsManager::DownloadCampaign(MwmSet::MwmId const & mwmId, std::vector<
if (url.empty())
return true; // In this case empty result is valid.
// Skip already downloaded campaigns. We do not lock whole method because RunHttpRequest
// is a synchronous method and may take a lot of time. The case in which countryName will
// be added to m_campaigns between locks is neglected.
{
std::lock_guard<std::mutex> lock(m_mutex);
auto const & countryName = mwmId.GetInfo()->GetCountryName();
auto const it = m_campaigns.find(countryName);
if (it != m_campaigns.cend() && it->second)
return false;
}
// Skip already downloaded campaigns.
auto const & countryName = mwmId.GetInfo()->GetCountryName();
auto const it = m_campaigns.find(countryName);
if (it != m_campaigns.cend() && it->second)
return false;
platform::HttpClient request(url);
bool const success = request.RunHttpRequest() && request.ErrorCode() == 200;
std::lock_guard<std::mutex> lock(m_mutex);
if (!success)
{
auto const it = m_failedDownloads.find(mwmId);
@ -430,92 +403,66 @@ bool LocalAdsManager::DownloadCampaign(MwmSet::MwmId const & mwmId, std::vector<
return true;
}
void LocalAdsManager::ThreadRoutine()
void LocalAdsManager::ProcessRequests(std::set<Request> const & campaignMwms)
{
local_ads::IconsInfo::Instance().SetSourceFile(kLocalAdsSymbolsFile);
std::string const campaignFile = GetPath(kCampaignFile);
// Read persistence data.
ReadCampaignFile(campaignFile);
Invalidate();
std::set<Request> campaignMwms;
while (WaitForRequest(campaignMwms))
for (auto const & mwm : campaignMwms)
{
for (auto const & mwm : campaignMwms)
if (!mwm.first.IsAlive())
continue;
std::string const countryName = mwm.first.GetInfo()->GetCountryName();
if (mwm.second == RequestType::Download)
{
if (!mwm.first.IsAlive())
// Download campaign data from server.
CampaignInfo info;
info.m_created = local_ads::Clock::now();
if (!DownloadCampaign(mwm.first, info.m_data))
continue;
std::string const countryName = mwm.first.GetInfo()->GetCountryName();
if (mwm.second == RequestType::Download)
// Parse data and recreate marks.
ClearLocalAdsForMwm(mwm.first);
if (!info.m_data.empty())
{
// Download campaign data from server.
CampaignInfo info;
info.m_created = local_ads::Clock::now();
if (!DownloadCampaign(mwm.first, info.m_data))
continue;
// Parse data and recreate marks.
ClearLocalAdsForMwm(mwm.first);
if (!info.m_data.empty())
auto campaignData = ParseCampaign(std::move(info.m_data), mwm.first, info.m_created);
if (!campaignData.empty())
{
auto campaignData = ParseCampaign(std::move(info.m_data), mwm.first, info.m_created);
if (!campaignData.empty())
{
UpdateFeaturesCache(ReadCampaignFeatures(m_readFeaturesFn, campaignData));
CreateLocalAdsMarks(m_bmManager, campaignData);
}
}
// Update run-time data.
{
std::lock_guard<std::mutex> lock(m_mutex);
m_campaigns[countryName] = true;
m_info[countryName] = info;
UpdateFeaturesCache(ReadCampaignFeatures(m_readFeaturesFn, campaignData));
CreateLocalAdsMarks(m_bmManager, campaignData);
}
}
else if (mwm.second == RequestType::Delete)
{
std::lock_guard<std::mutex> lock(m_mutex);
m_campaigns.erase(countryName);
m_info.erase(countryName);
ClearLocalAdsForMwm(mwm.first);
}
m_campaigns[countryName] = true;
m_info[countryName] = info;
}
else if (mwm.second == RequestType::Delete)
{
m_campaigns.erase(countryName);
m_info.erase(countryName);
ClearLocalAdsForMwm(mwm.first);
}
campaignMwms.clear();
// Save data persistently.
WriteCampaignFile(campaignFile);
}
}
bool LocalAdsManager::WaitForRequest(std::set<Request> & campaignMwms)
{
std::unique_lock<std::mutex> lock(m_mutex);
m_condition.wait(lock, [this] {return !m_isRunning || !m_requestedCampaigns.empty();});
if (!m_isRunning)
return false;
campaignMwms = std::move(m_requestedCampaigns);
m_requestedCampaigns.clear();
return true;
// Save data persistently.
WriteCampaignFile(campaignFile);
}
void LocalAdsManager::OnDownloadCountry(std::string const & countryName)
{
std::lock_guard<std::mutex> lock(m_mutex);
m_campaigns.erase(countryName);
m_info.erase(countryName);
GetPlatform().RunTask(Platform::Thread::File, [this, countryName]
{
m_campaigns.erase(countryName);
m_info.erase(countryName);
});
}
void LocalAdsManager::OnDeleteCountry(std::string const & countryName)
{
std::lock_guard<std::mutex> lock(m_mutex);
m_requestedCampaigns.insert(std::make_pair(m_getMwmIdByNameFn(countryName), RequestType::Delete));
m_condition.notify_one();
GetPlatform().RunTask(Platform::Thread::File, [this, countryName]
{
ProcessRequests({std::make_pair(m_getMwmIdByNameFn(countryName), RequestType::Delete)});
});
}
void LocalAdsManager::ReadCampaignFile(std::string const & campaignFile)
@ -523,7 +470,6 @@ void LocalAdsManager::ReadCampaignFile(std::string const & campaignFile)
if (!GetPlatform().IsFileExistsByFullPath(campaignFile))
return;
std::lock_guard<std::mutex> lock(m_mutex);
try
{
FileReader reader(campaignFile, true /* withExceptions */);
@ -550,7 +496,6 @@ void LocalAdsManager::WriteCampaignFile(std::string const & campaignFile)
{
try
{
std::lock_guard<std::mutex> lock(m_mutex);
FileWriter writer(campaignFile);
for (auto const & info : m_info)
SerializeCampaign(writer, info.first, info.second.m_created, info.second.m_data);
@ -563,21 +508,21 @@ void LocalAdsManager::WriteCampaignFile(std::string const & campaignFile)
void LocalAdsManager::Invalidate()
{
DeleteAllLocalAdsMarks(m_bmManager);
m_drapeEngine.SafeCall(&df::DrapeEngine::RemoveAllCustomFeatures);
CampaignData campaignData;
GetPlatform().RunTask(Platform::Thread::File, [this]
{
std::lock_guard<std::mutex> lock(m_mutex);
DeleteAllLocalAdsMarks(m_bmManager);
m_drapeEngine.SafeCall(&df::DrapeEngine::RemoveAllCustomFeatures);
CampaignData campaignData;
for (auto const & info : m_info)
{
auto data = ParseCampaign(info.second.m_data, m_getMwmIdByNameFn(info.first),
info.second.m_created);
campaignData.insert(data.begin(), data.end());
}
}
UpdateFeaturesCache(ReadCampaignFeatures(m_readFeaturesFn, campaignData));
CreateLocalAdsMarks(m_bmManager, campaignData);
UpdateFeaturesCache(ReadCampaignFeatures(m_readFeaturesFn, campaignData));
CreateLocalAdsMarks(m_bmManager, campaignData);
});
}
void LocalAdsManager::UpdateFeaturesCache(std::set<FeatureID> && ids)

View file

@ -44,12 +44,8 @@ public:
LocalAdsManager(GetMwmsByRectFn && getMwmsByRectFn, GetMwmIdByNameFn && getMwmIdByName,
ReadFeaturesFn && readFeaturesFn);
LocalAdsManager(LocalAdsManager && /* localAdsManager */) = default;
~LocalAdsManager();
void SetBookmarkManager(BookmarkManager * bmManager);
void Startup();
void Teardown();
void Startup(BookmarkManager * bmManager);
void SetDrapeEngine(ref_ptr<df::DrapeEngine> engine);
void UpdateViewport(ScreenBase const & screen);
@ -74,8 +70,7 @@ private:
};
using Request = std::pair<MwmSet::MwmId, RequestType>;
void ThreadRoutine();
bool WaitForRequest(std::set<Request> & campaignMwms);
void ProcessRequests(std::set<Request> const & campaignMwms);
void ReadCampaignFile(std::string const & campaignFile);
void WriteCampaignFile(std::string const & campaignFile);
@ -89,9 +84,9 @@ private:
// by some reason.
bool DownloadCampaign(MwmSet::MwmId const & mwmId, std::vector<uint8_t> & bytes);
GetMwmsByRectFn m_getMwmsByRectFn;
GetMwmIdByNameFn m_getMwmIdByNameFn;
ReadFeaturesFn m_readFeaturesFn;
GetMwmsByRectFn const m_getMwmsByRectFn;
GetMwmIdByNameFn const m_getMwmIdByNameFn;
ReadFeaturesFn const m_readFeaturesFn;
std::atomic<BookmarkManager *> m_bmManager;
@ -128,10 +123,4 @@ private:
std::map<MwmSet::MwmId, BackoffStats> m_failedDownloads;
local_ads::Statistics m_statistics;
bool m_isRunning = false;
std::condition_variable m_condition;
std::set<Request> m_requestedCampaigns;
std::mutex m_mutex;
threads::SimpleThread m_thread;
};

View file

@ -275,18 +275,18 @@ public:
}
template <typename Task>
void RunTask(Thread thread, Task const & task)
void RunDelayedTask(Thread thread, base::WorkerThread::Duration const & delay, Task && task)
{
switch (thread)
{
case Thread::File:
m_fileThread.Push(task);
m_fileThread.PushDelayed(delay, forward<Task>(task));
break;
case Thread::Network:
m_networkThread.Push(task);
m_networkThread.PushDelayed(delay, forward<Task>(task));
break;
case Thread::Gui:
RunOnGuiThread(task);
ASSERT(false, ("Delayed tasks for gui thread are not supported yet"));
break;
}
}