[geocoder] Add concurrent hierarchy reading

This commit is contained in:
Anatoly Serdtcev 2019-01-14 18:48:26 +03:00
parent 3482321180
commit 5dad0dcba8
7 changed files with 169 additions and 57 deletions

View file

@ -8,6 +8,8 @@ set(
geocoder.hpp
hierarchy.cpp
hierarchy.hpp
hierarchy_reader.cpp
hierarchy_reader.hpp
index.cpp
index.hpp
result.cpp

View file

@ -193,8 +193,8 @@ vector<Geocoder::Layer> & Geocoder::Context::GetLayers() { return m_layers; }
vector<Geocoder::Layer> const & Geocoder::Context::GetLayers() const { return m_layers; }
// Geocoder ----------------------------------------------------------------------------------------
Geocoder::Geocoder(string const & pathToJsonHierarchy)
: m_hierarchy(pathToJsonHierarchy), m_index(m_hierarchy)
Geocoder::Geocoder(string const & pathToJsonHierarchy, size_t readerCount)
: m_hierarchy(pathToJsonHierarchy, readerCount), m_index(m_hierarchy)
{
}

View file

@ -118,7 +118,7 @@ public:
std::vector<Layer> m_layers;
};
explicit Geocoder(std::string const & pathToJsonHierarchy);
explicit Geocoder(std::string const & pathToJsonHierarchy, std::size_t readerCount = 4);
void ProcessQuery(std::string const & query, std::vector<Result> & results) const;

View file

@ -1,5 +1,7 @@
#include "geocoder/hierarchy.hpp"
#include "geocoder/hierarchy_reader.hpp"
#include "indexer/search_string_utils.hpp"
#include "base/exception.hpp"
@ -9,16 +11,12 @@
#include "base/string_utils.hpp"
#include <algorithm>
#include <fstream>
#include <utility>
using namespace std;
namespace
{
// Information will be logged for every |kLogBatch| entries.
size_t const kLogBatch = 100000;
void CheckDuplicateOsmIds(vector<geocoder::Hierarchy::Entry> const & entries,
geocoder::Hierarchy::ParsingStats & stats)
{
@ -138,50 +136,12 @@ bool Hierarchy::Entry::IsParentTo(Hierarchy::Entry const & e) const
}
// Hierarchy ---------------------------------------------------------------------------------------
Hierarchy::Hierarchy(string const & pathToJsonHierarchy)
Hierarchy::Hierarchy(string const & pathToJsonHierarchy, size_t readerCount)
{
ifstream ifs(pathToJsonHierarchy);
string line;
ParsingStats stats;
LOG(LINFO, ("Reading entries..."));
while (getline(ifs, line))
{
if (line.empty())
continue;
auto const i = line.find(' ');
int64_t encodedId;
if (i == string::npos || !strings::to_any(line.substr(0, i), encodedId))
{
LOG(LWARNING, ("Cannot read osm id. Line:", line));
++stats.m_badOsmIds;
continue;
}
line = line.substr(i + 1);
Entry entry;
// todo(@m) We should really write uints as uints.
entry.m_osmId = base::GeoObjectId(static_cast<uint64_t>(encodedId));
if (!entry.DeserializeFromJSON(line, stats))
continue;
if (entry.m_type == Type::Count)
continue;
++stats.m_numLoaded;
if (stats.m_numLoaded % kLogBatch == 0)
LOG(LINFO, ("Read", stats.m_numLoaded, "entries"));
m_entries.emplace_back(move(entry));
}
if (stats.m_numLoaded % kLogBatch != 0)
LOG(LINFO, ("Read", stats.m_numLoaded, "entries"));
LOG(LINFO, ("Sorting entries..."));
sort(m_entries.begin(), m_entries.end());
HierarchyReader reader{pathToJsonHierarchy};
m_entries = reader.ReadEntries(readerCount, stats);
CheckDuplicateOsmIds(m_entries, stats);

View file

@ -5,6 +5,7 @@
#include "base/geo_object_id.hpp"
#include <array>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <string>
@ -20,31 +21,31 @@ public:
struct ParsingStats
{
// Number of entries that the hierarchy was constructed from.
uint64_t m_numLoaded = 0;
std::atomic<uint64_t> m_numLoaded{0};
// Number of corrupted json lines.
uint64_t m_badJsons = 0;
std::atomic<uint64_t> m_badJsons{0};
// Number of entries with unreadable base::GeoObjectIds.
uint64_t m_badOsmIds = 0;
std::atomic<uint64_t> m_badOsmIds{0};
// Number of base::GeoObjectsIds that occur as keys in at least two entries.
uint64_t m_duplicateOsmIds = 0;
std::atomic<uint64_t> m_duplicateOsmIds{0};
// Number of entries with duplicate subfields in the address field.
uint64_t m_duplicateAddresses = 0;
std::atomic<uint64_t> m_duplicateAddresses{0};
// Number of entries whose address field either does
// not exist or consists of empty lines.
uint64_t m_emptyAddresses = 0;
std::atomic<uint64_t> m_emptyAddresses{0};
// Number of entries without the name field or with an empty one.
uint64_t m_emptyNames = 0;
std::atomic<uint64_t> m_emptyNames{0};
// Number of entries whose names do not match the most
// specific parts of their addresses.
// This is expected from POIs but not from regions or streets.
uint64_t m_mismatchedNames = 0;
std::atomic<uint64_t> m_mismatchedNames{0};
};
// A single entry in the hierarchy directed acyclic graph.
@ -75,7 +76,7 @@ public:
std::array<Tokens, static_cast<size_t>(Type::Count) + 1> m_address;
};
explicit Hierarchy(std::string const & pathToJsonHierarchy);
explicit Hierarchy(std::string const & pathToJsonHierarchy, std::size_t readerCount = 4);
std::vector<Entry> const & GetEntries() const;

View file

@ -0,0 +1,118 @@
#include "geocoder/hierarchy_reader.hpp"
#include "base/logging.hpp"
#include <algorithm>
#include <functional>
#include <stdexcept>
#include <thread>
#include <utility>
using namespace std;
namespace geocoder {
namespace
{
// Progress is reported to the log once per |kLogBatch| loaded entries.
constexpr size_t kLogBatch = 100000;
}  // namespace
// Opens |pathToJsonHierarchy| as the input stream shared by all reader threads.
// Throws std::runtime_error if the stream is in a failed state after opening.
HierarchyReader::HierarchyReader(string const & pathToJsonHierarchy)
  : m_fileStm(pathToJsonHierarchy)
{
  if (m_fileStm.fail())
  {
    string const message = "failed to open file " + pathToJsonHierarchy;
    throw runtime_error(message);
  }
}
// Reads the whole hierarchy file with |readerCount| worker threads and returns
// the entries merged into a single vector by UnionEntries().
//
// Every worker runs ReadEntryMap() and fills its own multimap; all workers
// update the shared |stats| counters. The call blocks until every worker has
// been joined.
//
// A |readerCount| of zero is treated as one: with no workers nothing would be
// read and an empty result would be returned without any diagnostics.
auto HierarchyReader::ReadEntries(size_t readerCount, ParsingStats & stats)
  -> vector<Entry>
{
  if (readerCount == 0)
    readerCount = 1;

  LOG(LINFO, ("Reading entries..."));

  // One output slot per worker, so that workers never write to the same map.
  auto taskEntries = vector<multimap<base::GeoObjectId, Entry>>(readerCount);

  auto tasks = vector<thread>{};
  tasks.reserve(readerCount);
  for (auto t = size_t{0}; t < readerCount; ++t)
    tasks.emplace_back(&HierarchyReader::ReadEntryMap, this, ref(taskEntries[t]), ref(stats));

  for (auto & reader : tasks)
    reader.join();

  // All workers are finished here, so a single read of the counter is final.
  // Totals that are a multiple of |kLogBatch| were already reported by a worker.
  uint64_t const numLoaded = stats.m_numLoaded;
  if (numLoaded % kLogBatch != 0)
    LOG(LINFO, ("Read", numLoaded, "entries"));

  return UnionEntries(taskEntries);
}
// Merges the per-reader maps in |entryParts| into one vector ordered by osm id.
//
// Each part is already ordered by key, so the result is built by repeatedly
// taking the entry with the smallest key among the first elements of all parts.
// When several parts start with the same key, the part that comes first in
// |entryParts| is taken first.
//
// |entryParts| is consumed: entries are moved out and the vector is empty on return.
auto HierarchyReader::UnionEntries(vector<multimap<base::GeoObjectId, Entry>> & entryParts) -> vector<Entry>
{
  using EntryMap = multimap<base::GeoObjectId, Entry>;

  auto entries = vector<Entry>{};

  auto size = size_t{0};
  for (auto const & map : entryParts)
    size += map.size();
  entries.reserve(size);

  LOG(LINFO, ("Sorting entries..."));

  // A reader that received no lines leaves an empty map behind. Such parts must be
  // removed before merging, otherwise begin() would be dereferenced on an empty map.
  entryParts.erase(remove_if(entryParts.begin(), entryParts.end(),
                             [](EntryMap const & part) { return part.empty(); }),
                   entryParts.end());

  // Invariant: every map remaining in |entryParts| is non-empty.
  while (!entryParts.empty())
  {
    // Only the first keys need to be compared to find the next entry.
    auto minPart = min_element(entryParts.begin(), entryParts.end(),
                               [](EntryMap const & lhs, EntryMap const & rhs) {
                                 return lhs.begin()->first < rhs.begin()->first;
                               });

    entries.emplace_back(std::move(minPart->begin()->second));
    minPart->erase(minPart->begin());

    if (minPart->empty())
      entryParts.erase(minPart);
  }

  return entries;
}
// Worker routine: pulls lines from the shared file stream until it is exhausted,
// parses each line into an Entry and stores the result in |entries| keyed by osm id.
//
// Each line is expected to look like "<encoded osm id> <json>". Lines that are
// empty, have an unreadable id, fail JSON deserialization or have type
// Type::Count are skipped. The counters in |stats| are updated as lines are processed.
//
// Only the read from the stream is done under |m_mutex|; parsing runs unlocked.
void HierarchyReader::ReadEntryMap(multimap<base::GeoObjectId, Entry> & entries, ParsingStats & stats)
{
  // Entries are accumulated in a local map and handed over to |entries| once at
  // the end, so the caller-provided container is not touched while reading.
  auto localEntries = multimap<base::GeoObjectId, Entry>{};

  string line;
  while (true)
  {
    {
      lock_guard<mutex> lock(m_mutex);
      if (!getline(m_fileStm, line))
        break;
    }

    if (line.empty())
      continue;

    auto const i = line.find(' ');
    int64_t encodedId = 0;
    if (i == string::npos || !strings::to_any(line.substr(0, i), encodedId))
    {
      LOG(LWARNING, ("Cannot read osm id. Line:", line));
      ++stats.m_badOsmIds;
      continue;
    }
    // Drop the id and the separator in place; the rest of the line is the JSON.
    line.erase(0, i + 1);

    Entry entry;
    // todo(@m) We should really write uints as uints.
    auto const osmId = base::GeoObjectId(static_cast<uint64_t>(encodedId));
    entry.m_osmId = osmId;

    if (!entry.DeserializeFromJSON(line, stats))
      continue;

    if (entry.m_type == Type::Count)
      continue;

    // Use the value returned by the increment itself: re-reading the shared counter
    // could observe increments from other threads and skip or repeat a progress message.
    uint64_t const numLoaded = ++stats.m_numLoaded;
    if (numLoaded % kLogBatch == 0)
      LOG(LINFO, ("Read", numLoaded, "entries"));

    localEntries.emplace(osmId, std::move(entry));
  }

  entries = std::move(localEntries);
}
} // namespace geocoder

View file

@ -0,0 +1,31 @@
#pragma once
#include "geocoder/hierarchy.hpp"
#include <fstream>
#include <map>
#include <mutex>
#include <vector>
namespace geocoder
{
// Reads a hierarchy file line by line with several threads and turns the lines
// into Hierarchy::Entry objects.
class HierarchyReader
{
public:
  using Entry = Hierarchy::Entry;
  using ParsingStats = Hierarchy::ParsingStats;

  // Opens |pathToJsonHierarchy| for reading.
  // Throws std::runtime_error if the file cannot be opened.
  explicit HierarchyReader(std::string const & pathToJsonHierarchy);

  // Reads the entries from the file using |readerCount| threads and returns them
  // merged into one vector. Parsing counters are accumulated in |stats|.
  auto ReadEntries(std::size_t readerCount, ParsingStats & stats) -> std::vector<Entry>;

private:
  // Thread routine: reads lines from |m_fileStm| until the stream is exhausted and
  // stores the successfully parsed entries in |entries|, keyed by osm id.
  void ReadEntryMap(std::multimap<base::GeoObjectId, Entry> & entries, ParsingStats & stats);

  // Merges the per-thread maps into a single vector; |entryParts| is consumed.
  auto UnionEntries(std::vector<std::multimap<base::GeoObjectId, Entry>> & entryParts) -> std::vector<Entry>;

  // Input stream shared by all reader threads.
  std::ifstream m_fileStm;
  // Serializes line reads from |m_fileStm|.
  std::mutex m_mutex;
};
} // namespace geocoder