[geocoder] Optimize memory at the start

This commit is contained in:
Anatoly Serdtcev 2019-07-15 12:42:31 +03:00 committed by mpimenov
parent 0ba20f3512
commit 0012650c98
2 changed files with 67 additions and 83 deletions

View file

@ -1,10 +1,14 @@
#include "geocoder/hierarchy_reader.hpp"
#include "base/logging.hpp"
#include "base/thread_pool_computational.hpp"
#include <algorithm>
#include <queue>
#include <iomanip>
#include <list>
#include <sstream>
#include <thread>
#include <vector>
using namespace std;
@ -56,23 +60,33 @@ Hierarchy HierarchyReader::Read(unsigned int readersCount)
LOG(LINFO, ("Reading entries..."));
vector<multimap<base::GeoObjectId, Entry>> taskEntries(readersCount);
vector<ParsingStats> tasksStats(readersCount);
vector<thread> tasks{};
for (size_t t = 0; t < readersCount; ++t)
tasks.emplace_back(&HierarchyReader::ReadEntryMap, this, ref(taskEntries[t]), ref(tasksStats[t]));
vector<Entry> entries;
ParsingStats stats{};
for (auto & reader : tasks)
reader.join();
base::thread_pool::computational::ThreadPool threadPool{readersCount};
list<future<ParsingResult>> tasks{};
while (!m_eof || !tasks.empty())
{
size_t const kReadBlockLineCount = 1000;
while (!m_eof && tasks.size() <= 2 * readersCount)
tasks.emplace_back(threadPool.Submit([&] { return ReadEntries(kReadBlockLineCount); }));
auto & task = tasks.front();
auto taskResult = task.get();
tasks.pop_front();
auto & taskEntries = taskResult.m_entries;
move(begin(taskEntries), end(taskEntries), back_inserter(entries));
stats += taskResult.m_stats;
}
if (m_totalNumLoaded % kLogBatch != 0)
LOG(LINFO, ("Read", m_totalNumLoaded, "entries"));
ParsingStats stats{};
for (auto & readerStats : tasksStats)
stats += readerStats;
auto entries = MergeEntries(taskEntries);
LOG(LINFO, ("Sorting entries..."));
sort(begin(entries), end(entries));
LOG(LINFO, ("Finished entries sorting"));
CheckDuplicateOsmIds(entries, stats);
@ -93,45 +107,6 @@ Hierarchy HierarchyReader::Read(unsigned int readersCount)
return Hierarchy{move(entries), true};
}
// Merges the per-reader entry maps into one vector.
// The parts are kept in a min-heap ordered by multimap comparison. On each step the
// smallest part is taken out and drained from its front for as long as it does not
// compare greater than the next smallest part; if anything is left it goes back to the heap.
// Entries are moved out of |entryParts|, so every part is empty when the function returns.
vector<Hierarchy::Entry> HierarchyReader::MergeEntries(vector<multimap<base::GeoObjectId, Entry>> & entryParts)
{
  using Part = multimap<base::GeoObjectId, Entry>;
  using PartRef = reference_wrapper<Part>;

  size_t totalCount = 0;
  for (auto const & part : entryParts)
    totalCount += part.size();

  vector<Entry> merged;
  merged.reserve(totalCount);

  LOG(LINFO, ("Merging entries..."));

  // "Greater" comparator turns std::priority_queue into a min-heap.
  auto isGreater = [](PartRef const & lhs, PartRef const & rhs) noexcept {
    return lhs.get() > rhs.get();
  };
  priority_queue<PartRef, vector<PartRef>, decltype(isGreater)> heap(
      entryParts.begin(), entryParts.end(), isGreater);

  while (!heap.empty())
  {
    Part & current = heap.top().get();
    heap.pop();

    for (;;)
    {
      if (current.empty())
        break;
      // Stop as soon as some other part has become the smallest one.
      if (!heap.empty() && current > heap.top().get())
        break;

      auto const head = current.begin();
      merged.emplace_back(move(head->second));
      current.erase(head);
    }

    if (!current.empty())
      heap.push(ref(current));
  }

  return merged;
}
void HierarchyReader::CheckDuplicateOsmIds(vector<geocoder::Hierarchy::Entry> const & entries,
ParsingStats & stats)
{
@ -144,49 +119,43 @@ void HierarchyReader::CheckDuplicateOsmIds(vector<geocoder::Hierarchy::Entry> co
if (j != i + 1)
{
++stats.m_duplicateOsmIds;
// TODO: Remove the cast when the hierarchies no longer contain negative keys.
LOG(LDEBUG,
("Duplicate osm id:", static_cast<int64_t>(entries[i].m_osmId.GetEncodedId()), "(",
entries[i].m_osmId, ")", "occurs as a key in", j - i, "key-value entries."));
("Duplicate osm id:", SerializeId(entries[i].m_osmId.GetEncodedId()), "(",
SerializeId(entries[i].m_osmId.GetEncodedId()), ")", "occurs as a key in",
j - i, "key-value entries."));
}
i = j;
}
}
void HierarchyReader::ReadEntryMap(multimap<base::GeoObjectId, Entry> & entries, ParsingStats & stats)
HierarchyReader::ParsingResult HierarchyReader::ReadEntries(size_t count)
{
// Temporary local object for efficient concurrent processing (individual cache line for container).
auto localEntries = multimap<base::GeoObjectId, Entry>{};
vector<string> linesBuffer(count);
size_t bufferSize = 0;
size_t const kLineBufferCapacity = 10000;
vector<string> linesBuffer(kLineBufferCapacity);
while (true)
{
size_t bufferSize = 0;
lock_guard<mutex> lock(m_mutex);
for (; bufferSize < count; ++bufferSize)
{
lock_guard<mutex> lock(m_mutex);
for (; bufferSize < kLineBufferCapacity; ++bufferSize)
if (!getline(m_in, linesBuffer[bufferSize]))
{
if (!getline(m_in, linesBuffer[bufferSize]))
break;
m_eof = true;
break;
}
}
if (!bufferSize)
break;
DeserializeEntryMap(linesBuffer, bufferSize, localEntries, stats);
}
entries = move(localEntries);
return DeserializeEntries(linesBuffer, bufferSize);
}
void HierarchyReader::DeserializeEntryMap(vector<string> const & linesBuffer, size_t const bufferSize,
multimap<base::GeoObjectId, Entry> & entries, ParsingStats & stats)
HierarchyReader::ParsingResult HierarchyReader::DeserializeEntries(
vector<string> const & linesBuffer, size_t const bufferSize)
{
vector<Entry> entries;
entries.reserve(bufferSize);
ParsingStats stats;
for (size_t i = 0; i < bufferSize; ++i)
{
auto & line = linesBuffer[i];
@ -220,12 +189,21 @@ void HierarchyReader::DeserializeEntryMap(vector<string> const & linesBuffer, si
if (totalNumLoaded % kLogBatch == 0)
LOG(LINFO, ("Read", totalNumLoaded, "entries"));
entries.emplace(osmId, move(entry));
entries.push_back(move(entry));
}
return {std::move(entries), std::move(stats)};
}
bool HierarchyReader::DeserializeId(std::string const & str, uint64_t & id)
// Parses |str| as a base-16 number into |id|.
// Returns whatever strings::to_uint64 reports (presumably whether the parse succeeded).
bool HierarchyReader::DeserializeId(string const & str, uint64_t & id)
{
  auto const kHexBase = 16;
  return strings::to_uint64(str, id, kHexBase);
}
// Formats |id| as an upper-case hexadecimal string padded with leading zeros
// to 16 characters (the full width of a 64-bit value).
string HierarchyReader::SerializeId(uint64_t id)
{
  ostringstream out;
  out << hex << uppercase << setfill('0');
  out << setw(16) << id;
  return out.str();
}
} // namespace geocoder

View file

@ -7,10 +7,10 @@
#include <atomic>
#include <fstream>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>
namespace geocoder
@ -30,17 +30,23 @@ public:
Hierarchy Read(unsigned int readersCount = 1);
private:
void ReadEntryMap(std::multimap<base::GeoObjectId, Entry> & entries, ParsingStats & stats);
// Result of parsing one block of input lines: the entries that were produced
// together with the statistics collected while producing them.
// Member order matters: instances are brace-initialized as {entries, stats}.
struct ParsingResult
{
// Entries deserialized from the block.
std::vector<Entry> m_entries;
// Parsing statistics for the block; the caller adds them to the overall stats.
ParsingStats m_stats;
};
void DeserializeEntryMap(std::vector<std::string> const & linesBuffer, std::size_t const bufferSize,
std::multimap<base::GeoObjectId, Entry> & entries, ParsingStats & stats);
ParsingResult ReadEntries(size_t count);
ParsingResult DeserializeEntries(std::vector<std::string> const & linesBuffer,
std::size_t const bufferSize);
bool DeserializeId(std::string const & str, uint64_t & id);
std::string SerializeId(uint64_t id);
std::vector<Entry> MergeEntries(std::vector<std::multimap<base::GeoObjectId, Entry>> & entryParts);
void CheckDuplicateOsmIds(std::vector<Entry> const & entries, ParsingStats & stats);
std::ifstream m_fileStream;
std::istream & m_in;
bool m_eof{false};
std::mutex m_mutex;
std::atomic<std::uint64_t> m_totalNumLoaded{0};
};