From 5dad0dcba80af88d08984c217d18f96593d80b13 Mon Sep 17 00:00:00 2001 From: Anatoly Serdtcev Date: Mon, 14 Jan 2019 18:48:26 +0300 Subject: [PATCH] [geocoder] Add concurent hierarchy reading --- geocoder/CMakeLists.txt | 2 + geocoder/geocoder.cpp | 4 +- geocoder/geocoder.hpp | 2 +- geocoder/hierarchy.cpp | 50 ++------------ geocoder/hierarchy.hpp | 19 +++--- geocoder/hierarchy_reader.cpp | 118 ++++++++++++++++++++++++++++++++++ geocoder/hierarchy_reader.hpp | 31 +++++++++ 7 files changed, 169 insertions(+), 57 deletions(-) create mode 100644 geocoder/hierarchy_reader.cpp create mode 100644 geocoder/hierarchy_reader.hpp diff --git a/geocoder/CMakeLists.txt b/geocoder/CMakeLists.txt index 3e50874841..5c82e4398b 100644 --- a/geocoder/CMakeLists.txt +++ b/geocoder/CMakeLists.txt @@ -8,6 +8,8 @@ set( geocoder.hpp hierarchy.cpp hierarchy.hpp + hierarchy_reader.cpp + hierarchy_reader.hpp index.cpp index.hpp result.cpp diff --git a/geocoder/geocoder.cpp b/geocoder/geocoder.cpp index 7049ac7a7e..2489ef083a 100644 --- a/geocoder/geocoder.cpp +++ b/geocoder/geocoder.cpp @@ -193,8 +193,8 @@ vector & Geocoder::Context::GetLayers() { return m_layers; } vector const & Geocoder::Context::GetLayers() const { return m_layers; } // Geocoder ---------------------------------------------------------------------------------------- -Geocoder::Geocoder(string const & pathToJsonHierarchy) - : m_hierarchy(pathToJsonHierarchy), m_index(m_hierarchy) +Geocoder::Geocoder(string const & pathToJsonHierarchy, size_t readerCount) + : m_hierarchy(pathToJsonHierarchy, readerCount), m_index(m_hierarchy) { } diff --git a/geocoder/geocoder.hpp b/geocoder/geocoder.hpp index f987de5f11..513d5c7df3 100644 --- a/geocoder/geocoder.hpp +++ b/geocoder/geocoder.hpp @@ -118,7 +118,7 @@ public: std::vector m_layers; }; - explicit Geocoder(std::string const & pathToJsonHierarchy); + explicit Geocoder(std::string const & pathToJsonHierarchy, std::size_t readerCount = 4); void ProcessQuery(std::string const & query, std::vector & results) const; diff --git a/geocoder/hierarchy.cpp b/geocoder/hierarchy.cpp index 43e334b0bf..5c82e3b7bd 100644 --- a/geocoder/hierarchy.cpp +++ b/geocoder/hierarchy.cpp @@ -1,5 +1,7 @@ #include "geocoder/hierarchy.hpp" +#include "geocoder/hierarchy_reader.hpp" + #include "indexer/search_string_utils.hpp" #include "base/exception.hpp" @@ -9,16 +11,12 @@ #include "base/string_utils.hpp" #include -#include #include using namespace std; namespace { -// Information will be logged for every |kLogBatch| entries. -size_t const kLogBatch = 100000; - void CheckDuplicateOsmIds(vector const & entries, geocoder::Hierarchy::ParsingStats & stats) { @@ -138,50 +136,12 @@ bool Hierarchy::Entry::IsParentTo(Hierarchy::Entry const & e) const } // Hierarchy --------------------------------------------------------------------------------------- -Hierarchy::Hierarchy(string const & pathToJsonHierarchy) +Hierarchy::Hierarchy(string const & pathToJsonHierarchy, size_t readerCount) { - ifstream ifs(pathToJsonHierarchy); - string line; ParsingStats stats; - LOG(LINFO, ("Reading entries...")); - while (getline(ifs, line)) - { - if (line.empty()) - continue; - - auto const i = line.find(' '); - int64_t encodedId; - if (i == string::npos || !strings::to_any(line.substr(0, i), encodedId)) - { - LOG(LWARNING, ("Cannot read osm id. Line:", line)); - ++stats.m_badOsmIds; - continue; - } - line = line.substr(i + 1); - - Entry entry; - // todo(@m) We should really write uints as uints. - entry.m_osmId = base::GeoObjectId(static_cast(encodedId)); - - if (!entry.DeserializeFromJSON(line, stats)) - continue; - - if (entry.m_type == Type::Count) - continue; - - ++stats.m_numLoaded; - if (stats.m_numLoaded % kLogBatch == 0) - LOG(LINFO, ("Read", stats.m_numLoaded, "entries")); - - m_entries.emplace_back(move(entry)); - } - - if (stats.m_numLoaded % kLogBatch != 0) - LOG(LINFO, ("Read", stats.m_numLoaded, "entries")); - - LOG(LINFO, ("Sorting entries...")); - sort(m_entries.begin(), m_entries.end()); + HierarchyReader reader{pathToJsonHierarchy}; + m_entries = reader.ReadEntries(readerCount, stats); CheckDuplicateOsmIds(m_entries, stats); diff --git a/geocoder/hierarchy.hpp b/geocoder/hierarchy.hpp index bae7d2fc38..88e04e21ff 100644 --- a/geocoder/hierarchy.hpp +++ b/geocoder/hierarchy.hpp @@ -5,6 +5,7 @@ #include "base/geo_object_id.hpp" #include +#include #include #include #include @@ -20,31 +21,31 @@ public: struct ParsingStats { // Number of entries that the hierarchy was constructed from. - uint64_t m_numLoaded = 0; + std::atomic m_numLoaded{0}; // Number of corrupted json lines. - uint64_t m_badJsons = 0; + std::atomic m_badJsons{0}; // Number of entries with unreadable base::GeoObjectIds. - uint64_t m_badOsmIds = 0; + std::atomic m_badOsmIds{0}; // Number of base::GeoObjectsIds that occur as keys in at least two entries. - uint64_t m_duplicateOsmIds = 0; + std::atomic m_duplicateOsmIds{0}; // Number of entries with duplicate subfields in the address field. - uint64_t m_duplicateAddresses = 0; + std::atomic m_duplicateAddresses{0}; // Number of entries whose address field either does // not exist or consists of empty lines. - uint64_t m_emptyAddresses = 0; + std::atomic m_emptyAddresses{0}; // Number of entries without the name field or with an empty one. - uint64_t m_emptyNames = 0; + std::atomic m_emptyNames{0}; // Number of entries whose names do not match the most // specific parts of their addresses. // This is expected from POIs but not from regions or streets. - uint64_t m_mismatchedNames = 0; + std::atomic m_mismatchedNames{0}; }; // A single entry in the hierarchy directed acyclic graph. @@ -75,7 +76,7 @@ public: std::array(Type::Count) + 1> m_address; }; - explicit Hierarchy(std::string const & pathToJsonHierarchy); + explicit Hierarchy(std::string const & pathToJsonHierarchy, std::size_t readerCount = 4); std::vector const & GetEntries() const; diff --git a/geocoder/hierarchy_reader.cpp b/geocoder/hierarchy_reader.cpp new file mode 100644 index 0000000000..bac7f4f962 --- /dev/null +++ b/geocoder/hierarchy_reader.cpp @@ -0,0 +1,118 @@ +#include "geocoder/hierarchy_reader.hpp" + +#include "base/logging.hpp" + +#include + +using namespace std; + +namespace geocoder { + +namespace +{ +// Information will be logged for every |kLogBatch| entries. +size_t const kLogBatch = 100000; +} // namespace + +HierarchyReader::HierarchyReader(string const & pathToJsonHierarchy) : + m_fileStm{pathToJsonHierarchy} +{ + if (!m_fileStm) + throw runtime_error("failed to open file " + pathToJsonHierarchy); +} + +auto HierarchyReader::ReadEntries(size_t readerCount, ParsingStats & stats) + -> vector +{ + LOG(LINFO, ("Reading entries...")); + + auto taskEntries = vector>(readerCount); + auto tasks = vector{}; + for (auto t = size_t{0}; t < readerCount; ++t) + tasks.emplace_back(&HierarchyReader::ReadEntryMap, this, ref(taskEntries[t]), ref(stats)); + + for (auto & reader : tasks) + reader.join(); + + if (stats.m_numLoaded % kLogBatch != 0) + LOG(LINFO, ("Read", stats.m_numLoaded, "entries")); + + return UnionEntries(taskEntries); +} + +auto HierarchyReader::UnionEntries(vector> & entryParts) -> vector +{ + auto entries = vector{}; + + auto size = size_t{0}; + for (auto const & map : entryParts) + size += map.size(); + + entries.reserve(size); + + LOG(LINFO, ("Sorting entries...")); + + while (entryParts.size()) + { + auto minPart = min_element(entryParts.begin(), entryParts.end()); + + entries.emplace_back(std::move(minPart->begin()->second)); + + minPart->erase(minPart->begin()); + if (minPart->empty()) + entryParts.erase(minPart); + } + + return entries; +} + +void HierarchyReader::ReadEntryMap(multimap & entries, ParsingStats & stats) +{ + // Temporary local object for efficient concurent processing (individual cache line for container). + auto localEntries = multimap{}; + + string line; + while (true) + { + { + auto && lock = lock_guard(m_mutex); + + if (!getline(m_fileStm, line)) + break; + } + + if (line.empty()) + continue; + + auto const i = line.find(' '); + int64_t encodedId; + if (i == string::npos || !strings::to_any(line.substr(0, i), encodedId)) + { + LOG(LWARNING, ("Cannot read osm id. Line:", line)); + ++stats.m_badOsmIds; + continue; + } + line = line.substr(i + 1); + + Entry entry; + // todo(@m) We should really write uints as uints. + auto osmId = base::GeoObjectId(static_cast(encodedId)); + entry.m_osmId = osmId; + + if (!entry.DeserializeFromJSON(line, stats)) + continue; + + if (entry.m_type == Type::Count) + continue; + + ++stats.m_numLoaded; + if (stats.m_numLoaded % kLogBatch == 0) + LOG(LINFO, ("Read", (stats.m_numLoaded / kLogBatch) * kLogBatch, "entries")); + + localEntries.emplace(osmId, move(entry)); + } + + entries = move(localEntries); +} + +} // namespace geocoder diff --git a/geocoder/hierarchy_reader.hpp b/geocoder/hierarchy_reader.hpp new file mode 100644 index 0000000000..7694b986da --- /dev/null +++ b/geocoder/hierarchy_reader.hpp @@ -0,0 +1,31 @@ +#pragma once + +#include "geocoder/hierarchy.hpp" + +#include +#include +#include +#include + +namespace geocoder +{ + +class HierarchyReader +{ +public: + using Entry = Hierarchy::Entry; + using ParsingStats = Hierarchy::ParsingStats; + + HierarchyReader(std::string const & pathToJsonHierarchy); + + auto ReadEntries(size_t readerCount, ParsingStats & stats) -> std::vector; + +private: + void ReadEntryMap(std::multimap & entries, ParsingStats & stats); + auto UnionEntries(std::vector> & entryParts) -> std::vector; + + std::ifstream m_fileStm; + std::mutex m_mutex; +}; + +} // namespace geocoder