From 5dad0dcba80af88d08984c217d18f96593d80b13 Mon Sep 17 00:00:00 2001 From: Anatoly Serdtcev Date: Mon, 14 Jan 2019 18:48:26 +0300 Subject: [PATCH 01/14] [geocoder] Add concurent hierarchy reading --- geocoder/CMakeLists.txt | 2 + geocoder/geocoder.cpp | 4 +- geocoder/geocoder.hpp | 2 +- geocoder/hierarchy.cpp | 50 ++------------ geocoder/hierarchy.hpp | 19 +++--- geocoder/hierarchy_reader.cpp | 118 ++++++++++++++++++++++++++++++++++ geocoder/hierarchy_reader.hpp | 31 +++++++++ 7 files changed, 169 insertions(+), 57 deletions(-) create mode 100644 geocoder/hierarchy_reader.cpp create mode 100644 geocoder/hierarchy_reader.hpp diff --git a/geocoder/CMakeLists.txt b/geocoder/CMakeLists.txt index 3e50874841..5c82e4398b 100644 --- a/geocoder/CMakeLists.txt +++ b/geocoder/CMakeLists.txt @@ -8,6 +8,8 @@ set( geocoder.hpp hierarchy.cpp hierarchy.hpp + hierarchy_reader.cpp + hierarchy_reader.hpp index.cpp index.hpp result.cpp diff --git a/geocoder/geocoder.cpp b/geocoder/geocoder.cpp index 7049ac7a7e..2489ef083a 100644 --- a/geocoder/geocoder.cpp +++ b/geocoder/geocoder.cpp @@ -193,8 +193,8 @@ vector & Geocoder::Context::GetLayers() { return m_layers; } vector const & Geocoder::Context::GetLayers() const { return m_layers; } // Geocoder ---------------------------------------------------------------------------------------- -Geocoder::Geocoder(string const & pathToJsonHierarchy) - : m_hierarchy(pathToJsonHierarchy), m_index(m_hierarchy) +Geocoder::Geocoder(string const & pathToJsonHierarchy, size_t readerCount) + : m_hierarchy(pathToJsonHierarchy, readerCount), m_index(m_hierarchy) { } diff --git a/geocoder/geocoder.hpp b/geocoder/geocoder.hpp index f987de5f11..513d5c7df3 100644 --- a/geocoder/geocoder.hpp +++ b/geocoder/geocoder.hpp @@ -118,7 +118,7 @@ public: std::vector m_layers; }; - explicit Geocoder(std::string const & pathToJsonHierarchy); + explicit Geocoder(std::string const & pathToJsonHierarchy, std::size_t readerCount = 4); void ProcessQuery(std::string const & query, std::vector & results) const; diff --git a/geocoder/hierarchy.cpp b/geocoder/hierarchy.cpp index 43e334b0bf..5c82e3b7bd 100644 --- a/geocoder/hierarchy.cpp +++ b/geocoder/hierarchy.cpp @@ -1,5 +1,7 @@ #include "geocoder/hierarchy.hpp" +#include "geocoder/hierarchy_reader.hpp" + #include "indexer/search_string_utils.hpp" #include "base/exception.hpp" @@ -9,16 +11,12 @@ #include "base/string_utils.hpp" #include -#include #include using namespace std; namespace { -// Information will be logged for every |kLogBatch| entries. -size_t const kLogBatch = 100000; - void CheckDuplicateOsmIds(vector const & entries, geocoder::Hierarchy::ParsingStats & stats) { @@ -138,50 +136,12 @@ bool Hierarchy::Entry::IsParentTo(Hierarchy::Entry const & e) const } // Hierarchy --------------------------------------------------------------------------------------- -Hierarchy::Hierarchy(string const & pathToJsonHierarchy) +Hierarchy::Hierarchy(string const & pathToJsonHierarchy, size_t readerCount) { - ifstream ifs(pathToJsonHierarchy); - string line; ParsingStats stats; - LOG(LINFO, ("Reading entries...")); - while (getline(ifs, line)) - { - if (line.empty()) - continue; - - auto const i = line.find(' '); - int64_t encodedId; - if (i == string::npos || !strings::to_any(line.substr(0, i), encodedId)) - { - LOG(LWARNING, ("Cannot read osm id. Line:", line)); - ++stats.m_badOsmIds; - continue; - } - line = line.substr(i + 1); - - Entry entry; - // todo(@m) We should really write uints as uints. - entry.m_osmId = base::GeoObjectId(static_cast(encodedId)); - - if (!entry.DeserializeFromJSON(line, stats)) - continue; - - if (entry.m_type == Type::Count) - continue; - - ++stats.m_numLoaded; - if (stats.m_numLoaded % kLogBatch == 0) - LOG(LINFO, ("Read", stats.m_numLoaded, "entries")); - - m_entries.emplace_back(move(entry)); - } - - if (stats.m_numLoaded % kLogBatch != 0) - LOG(LINFO, ("Read", stats.m_numLoaded, "entries")); - - LOG(LINFO, ("Sorting entries...")); - sort(m_entries.begin(), m_entries.end()); + HierarchyReader reader{pathToJsonHierarchy}; + m_entries = reader.ReadEntries(readerCount, stats); CheckDuplicateOsmIds(m_entries, stats); diff --git a/geocoder/hierarchy.hpp b/geocoder/hierarchy.hpp index bae7d2fc38..88e04e21ff 100644 --- a/geocoder/hierarchy.hpp +++ b/geocoder/hierarchy.hpp @@ -5,6 +5,7 @@ #include "base/geo_object_id.hpp" #include +#include #include #include #include @@ -20,31 +21,31 @@ public: struct ParsingStats { // Number of entries that the hierarchy was constructed from. - uint64_t m_numLoaded = 0; + std::atomic m_numLoaded{0}; // Number of corrupted json lines. - uint64_t m_badJsons = 0; + std::atomic m_badJsons{0}; // Number of entries with unreadable base::GeoObjectIds. - uint64_t m_badOsmIds = 0; + std::atomic m_badOsmIds{0}; // Number of base::GeoObjectsIds that occur as keys in at least two entries. - uint64_t m_duplicateOsmIds = 0; + std::atomic m_duplicateOsmIds{0}; // Number of entries with duplicate subfields in the address field. - uint64_t m_duplicateAddresses = 0; + std::atomic m_duplicateAddresses{0}; // Number of entries whose address field either does // not exist or consists of empty lines. - uint64_t m_emptyAddresses = 0; + std::atomic m_emptyAddresses{0}; // Number of entries without the name field or with an empty one. - uint64_t m_emptyNames = 0; + std::atomic m_emptyNames{0}; // Number of entries whose names do not match the most // specific parts of their addresses. // This is expected from POIs but not from regions or streets. - uint64_t m_mismatchedNames = 0; + std::atomic m_mismatchedNames{0}; }; // A single entry in the hierarchy directed acyclic graph. @@ -75,7 +76,7 @@ public: std::array(Type::Count) + 1> m_address; }; - explicit Hierarchy(std::string const & pathToJsonHierarchy); + explicit Hierarchy(std::string const & pathToJsonHierarchy, std::size_t readerCount = 4); std::vector const & GetEntries() const; diff --git a/geocoder/hierarchy_reader.cpp b/geocoder/hierarchy_reader.cpp new file mode 100644 index 0000000000..bac7f4f962 --- /dev/null +++ b/geocoder/hierarchy_reader.cpp @@ -0,0 +1,118 @@ +#include "geocoder/hierarchy_reader.hpp" + +#include "base/logging.hpp" + +#include + +using namespace std; + +namespace geocoder { + +namespace +{ +// Information will be logged for every |kLogBatch| entries. +size_t const kLogBatch = 100000; +} // namespace + +HierarchyReader::HierarchyReader(string const & pathToJsonHierarchy) : + m_fileStm{pathToJsonHierarchy} +{ + if (!m_fileStm) + throw runtime_error("failed to open file " + pathToJsonHierarchy); +} + +auto HierarchyReader::ReadEntries(size_t readerCount, ParsingStats & stats) + -> vector +{ + LOG(LINFO, ("Reading entries...")); + + auto taskEntries = vector>(readerCount); + auto tasks = vector{}; + for (auto t = size_t{0}; t < readerCount; ++t) + tasks.emplace_back(&HierarchyReader::ReadEntryMap, this, ref(taskEntries[t]), ref(stats)); + + for (auto & reader : tasks) + reader.join(); + + if (stats.m_numLoaded % kLogBatch != 0) + LOG(LINFO, ("Read", stats.m_numLoaded, "entries")); + + return UnionEntries(taskEntries); +} + +auto HierarchyReader::UnionEntries(vector> & entryParts) -> vector +{ + auto entries = vector{}; + + auto size = size_t{0}; + for (auto const & map : entryParts) + size += map.size(); + + entries.reserve(size); + + LOG(LINFO, ("Sorting entries...")); + + while (entryParts.size()) + { + auto minPart = min_element(entryParts.begin(), entryParts.end()); + + entries.emplace_back(std::move(minPart->begin()->second)); + + minPart->erase(minPart->begin()); + if (minPart->empty()) + entryParts.erase(minPart); + } + + return entries; +} + +void HierarchyReader::ReadEntryMap(multimap & entries, ParsingStats & stats) +{ + // Temporary local object for efficient concurent processing (individual cache line for container). + auto localEntries = multimap{}; + + string line; + while (true) + { + { + auto && lock = lock_guard(m_mutex); + + if (!getline(m_fileStm, line)) + break; + } + + if (line.empty()) + continue; + + auto const i = line.find(' '); + int64_t encodedId; + if (i == string::npos || !strings::to_any(line.substr(0, i), encodedId)) + { + LOG(LWARNING, ("Cannot read osm id. Line:", line)); + ++stats.m_badOsmIds; + continue; + } + line = line.substr(i + 1); + + Entry entry; + // todo(@m) We should really write uints as uints. + auto osmId = base::GeoObjectId(static_cast(encodedId)); + entry.m_osmId = osmId; + + if (!entry.DeserializeFromJSON(line, stats)) + continue; + + if (entry.m_type == Type::Count) + continue; + + ++stats.m_numLoaded; + if (stats.m_numLoaded % kLogBatch == 0) + LOG(LINFO, ("Read", (stats.m_numLoaded / kLogBatch) * kLogBatch, "entries")); + + localEntries.emplace(osmId, move(entry)); + } + + entries = move(localEntries); +} + +} // namespace geocoder diff --git a/geocoder/hierarchy_reader.hpp b/geocoder/hierarchy_reader.hpp new file mode 100644 index 0000000000..7694b986da --- /dev/null +++ b/geocoder/hierarchy_reader.hpp @@ -0,0 +1,31 @@ +#pragma once + +#include "geocoder/hierarchy.hpp" + +#include +#include +#include +#include + +namespace geocoder +{ + +class HierarchyReader +{ +public: + using Entry = Hierarchy::Entry; + using ParsingStats = Hierarchy::ParsingStats; + + HierarchyReader(std::string const & pathToJsonHierarchy); + + auto ReadEntries(size_t readerCount, ParsingStats & stats) -> std::vector; + +private: + void ReadEntryMap(std::multimap & entries, ParsingStats & stats); + auto UnionEntries(std::vector> & entryParts) -> std::vector; + + std::ifstream m_fileStm; + std::mutex m_mutex; +}; + +} // namespace geocoder From b4cb0204dede2abeea2cd0f2ae11167d0342650b Mon Sep 17 00:00:00 2001 From: Anatoly Serdtcev Date: Tue, 15 Jan 2019 12:33:09 +0300 Subject: [PATCH 02/14] [geocoder] Fix for review --- geocoder/hierarchy_reader.cpp | 23 +++++++++++++---------- geocoder/hierarchy_reader.hpp | 6 ++++-- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/geocoder/hierarchy_reader.cpp b/geocoder/hierarchy_reader.cpp index bac7f4f962..3906d212f2 100644 --- a/geocoder/hierarchy_reader.cpp +++ b/geocoder/hierarchy_reader.cpp @@ -6,8 +6,8 @@ using namespace std; -namespace geocoder { - +namespace geocoder +{ namespace { // Information will be logged for every |kLogBatch| entries. @@ -18,7 +18,7 @@ HierarchyReader::HierarchyReader(string const & pathToJsonHierarchy) : m_fileStm{pathToJsonHierarchy} { if (!m_fileStm) - throw runtime_error("failed to open file " + pathToJsonHierarchy); + MYTHROW(OpenException, ("Failed to open file", pathToJsonHierarchy)); } auto HierarchyReader::ReadEntries(size_t readerCount, ParsingStats & stats) @@ -26,9 +26,9 @@ auto HierarchyReader::ReadEntries(size_t readerCount, ParsingStats & stats) { LOG(LINFO, ("Reading entries...")); - auto taskEntries = vector>(readerCount); - auto tasks = vector{}; - for (auto t = size_t{0}; t < readerCount; ++t) + vector> taskEntries(readerCount); + vector tasks{}; + for (size_t t = 0; t < readerCount; ++t) tasks.emplace_back(&HierarchyReader::ReadEntryMap, this, ref(taskEntries[t]), ref(stats)); for (auto & reader : tasks) @@ -44,7 +44,7 @@ auto HierarchyReader::UnionEntries(vector> & { auto entries = vector{}; - auto size = size_t{0}; + size_t size{0}; for (auto const & map : entryParts) size += map.size(); @@ -56,9 +56,12 @@ auto HierarchyReader::UnionEntries(vector> & { auto minPart = min_element(entryParts.begin(), entryParts.end()); - entries.emplace_back(std::move(minPart->begin()->second)); - - minPart->erase(minPart->begin()); + if (minPart->size()) + { + entries.emplace_back(std::move(minPart->begin()->second)); + minPart->erase(minPart->begin()); + } + if (minPart->empty()) entryParts.erase(minPart); } diff --git a/geocoder/hierarchy_reader.hpp b/geocoder/hierarchy_reader.hpp index 7694b986da..db460bbf4f 100644 --- a/geocoder/hierarchy_reader.hpp +++ b/geocoder/hierarchy_reader.hpp @@ -2,6 +2,8 @@ #include "geocoder/hierarchy.hpp" +#include "base/exception.hpp" + #include #include #include @@ -9,13 +11,14 @@ namespace geocoder { - class HierarchyReader { public: using Entry = Hierarchy::Entry; using ParsingStats = Hierarchy::ParsingStats; + DECLARE_EXCEPTION(OpenException, RootException); + HierarchyReader(std::string const & pathToJsonHierarchy); auto ReadEntries(size_t readerCount, ParsingStats & stats) -> std::vector; @@ -27,5 +30,4 @@ private: std::ifstream m_fileStm; std::mutex m_mutex; }; - } // namespace geocoder From cf799c1ea7c7e142db86d0427471344f2ab72a2d Mon Sep 17 00:00:00 2001 From: Anatoly Serdtcev Date: Tue, 15 Jan 2019 13:06:04 +0300 Subject: [PATCH 03/14] [geocoder] Fix for review: readers max - 8 --- geocoder/hierarchy_reader.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/geocoder/hierarchy_reader.cpp b/geocoder/hierarchy_reader.cpp index 3906d212f2..7d15645d85 100644 --- a/geocoder/hierarchy_reader.cpp +++ b/geocoder/hierarchy_reader.cpp @@ -25,7 +25,8 @@ auto HierarchyReader::ReadEntries(size_t readerCount, ParsingStats & stats) -> vector { LOG(LINFO, ("Reading entries...")); - + + readerCount = max(readerCount, size_t{8}); vector> taskEntries(readerCount); vector tasks{}; for (size_t t = 0; t < readerCount; ++t) From 3842ba3aacb979b924d063b397a19535cd3dc762 Mon Sep 17 00:00:00 2001 From: Anatoly Serdtcev Date: Tue, 22 Jan 2019 19:15:37 +0300 Subject: [PATCH 04/14] Fix for review --- geocoder/geocoder.cpp | 4 +-- geocoder/geocoder.hpp | 2 +- geocoder/geocoder_tests/geocoder_tests.cpp | 29 ++++++++++++++++++++++ geocoder/hierarchy.cpp | 4 +-- geocoder/hierarchy.hpp | 2 +- geocoder/hierarchy_reader.cpp | 8 +++--- geocoder/hierarchy_reader.hpp | 4 +-- 7 files changed, 40 insertions(+), 13 deletions(-) diff --git a/geocoder/geocoder.cpp b/geocoder/geocoder.cpp index 2489ef083a..050ec420f1 100644 --- a/geocoder/geocoder.cpp +++ b/geocoder/geocoder.cpp @@ -193,8 +193,8 @@ vector & Geocoder::Context::GetLayers() { return m_layers; } vector const & Geocoder::Context::GetLayers() const { return m_layers; } // Geocoder ---------------------------------------------------------------------------------------- -Geocoder::Geocoder(string const & pathToJsonHierarchy, size_t readerCount) - : m_hierarchy(pathToJsonHierarchy, readerCount), m_index(m_hierarchy) +Geocoder::Geocoder(string const & pathToJsonHierarchy, size_t hierarchyReadersCount) + : m_hierarchy(pathToJsonHierarchy, hierarchyReadersCount), m_index(m_hierarchy) { } diff --git a/geocoder/geocoder.hpp b/geocoder/geocoder.hpp index 513d5c7df3..22c1452c7d 100644 --- a/geocoder/geocoder.hpp +++ b/geocoder/geocoder.hpp @@ -118,7 +118,7 @@ public: std::vector m_layers; }; - explicit Geocoder(std::string const & pathToJsonHierarchy, std::size_t readerCount = 4); + explicit Geocoder(std::string const & pathToJsonHierarchy, std::size_t hierarchyReadersCount = 4); void ProcessQuery(std::string const & query, std::vector & results) const; diff --git a/geocoder/geocoder_tests/geocoder_tests.cpp b/geocoder/geocoder_tests/geocoder_tests.cpp index 58c951e9fa..08d64bc6e7 100644 --- a/geocoder/geocoder_tests/geocoder_tests.cpp +++ b/geocoder/geocoder_tests/geocoder_tests.cpp @@ -54,6 +54,35 @@ void TestGeocoder(Geocoder & geocoder, string const & query, vector && e } } +UNIT_TEST(Geocoder_EmptyFileConcurrentRead) +{ + ScopedFile const regionsJsonFile("regions.jsonl", ""); + Geocoder geocoder(regionsJsonFile.GetFullPath(), 8 /* reader threads */); + + TEST_EQUAL(geocoder.GetHierarchy().GetEntries().size(), 0, ()); +} + +UNIT_TEST(Geocoder_BigFileConcurrentRead) +{ + int const kEntryCount = 1000000; + + stringstream s; + for (int i = 0; i < kEntryCount; ++i) + { + s << i << " " + << "{" + << R"("type": "Feature",)" + << R"("geometry": {"type": "Point", "coordinates": [0, 0]},)" + << R"("properties": {"name": ")" << i << R"(", "rank": 2, "address": {"country": ")" << i << R"("}})" + << "}\n"; + } + + ScopedFile const regionsJsonFile("regions.jsonl", s.str()); + Geocoder geocoder(regionsJsonFile.GetFullPath(), 8 /* reader threads */); + + TEST_EQUAL(geocoder.GetHierarchy().GetEntries().size(), kEntryCount, ()); +} + UNIT_TEST(Geocoder_Smoke) { ScopedFile const regionsJsonFile("regions.jsonl", kRegionsData); diff --git a/geocoder/hierarchy.cpp b/geocoder/hierarchy.cpp index 5c82e3b7bd..5058b3aaf6 100644 --- a/geocoder/hierarchy.cpp +++ b/geocoder/hierarchy.cpp @@ -136,12 +136,12 @@ bool Hierarchy::Entry::IsParentTo(Hierarchy::Entry const & e) const } // Hierarchy --------------------------------------------------------------------------------------- -Hierarchy::Hierarchy(string const & pathToJsonHierarchy, size_t readerCount) +Hierarchy::Hierarchy(string const & pathToJsonHierarchy, size_t readersCount) { ParsingStats stats; HierarchyReader reader{pathToJsonHierarchy}; - m_entries = reader.ReadEntries(readerCount, stats); + m_entries = reader.ReadEntries(readersCount, stats); CheckDuplicateOsmIds(m_entries, stats); diff --git a/geocoder/hierarchy.hpp b/geocoder/hierarchy.hpp index 88e04e21ff..fc78b9997c 100644 --- a/geocoder/hierarchy.hpp +++ b/geocoder/hierarchy.hpp @@ -76,7 +76,7 @@ public: std::array(Type::Count) + 1> m_address; }; - explicit Hierarchy(std::string const & pathToJsonHierarchy, std::size_t readerCount = 4); + explicit Hierarchy(std::string const & pathToJsonHierarchy, std::size_t readersCount = 4); std::vector const & GetEntries() const; diff --git a/geocoder/hierarchy_reader.cpp b/geocoder/hierarchy_reader.cpp index 7d15645d85..c306e93ca6 100644 --- a/geocoder/hierarchy_reader.cpp +++ b/geocoder/hierarchy_reader.cpp @@ -21,8 +21,7 @@ HierarchyReader::HierarchyReader(string const & pathToJsonHierarchy) : MYTHROW(OpenException, ("Failed to open file", pathToJsonHierarchy)); } -auto HierarchyReader::ReadEntries(size_t readerCount, ParsingStats & stats) - -> vector +vector HierarchyReader::ReadEntries(size_t readerCount, ParsingStats & stats) { LOG(LINFO, ("Reading entries...")); @@ -41,7 +40,7 @@ auto HierarchyReader::ReadEntries(size_t readerCount, ParsingStats & stats) return UnionEntries(taskEntries); } -auto HierarchyReader::UnionEntries(vector> & entryParts) -> vector +vector HierarchyReader::UnionEntries(vector> & entryParts) { auto entries = vector{}; @@ -59,7 +58,7 @@ auto HierarchyReader::UnionEntries(vector> & if (minPart->size()) { - entries.emplace_back(std::move(minPart->begin()->second)); + entries.emplace_back(move(minPart->begin()->second)); minPart->erase(minPart->begin()); } @@ -118,5 +117,4 @@ void HierarchyReader::ReadEntryMap(multimap & entries, entries = move(localEntries); } - } // namespace geocoder diff --git a/geocoder/hierarchy_reader.hpp b/geocoder/hierarchy_reader.hpp index db460bbf4f..d964ac7f2a 100644 --- a/geocoder/hierarchy_reader.hpp +++ b/geocoder/hierarchy_reader.hpp @@ -21,11 +21,11 @@ public: HierarchyReader(std::string const & pathToJsonHierarchy); - auto ReadEntries(size_t readerCount, ParsingStats & stats) -> std::vector; + std::vector ReadEntries(size_t readerCount, ParsingStats & stats); private: void ReadEntryMap(std::multimap & entries, ParsingStats & stats); - auto UnionEntries(std::vector> & entryParts) -> std::vector; + std::vector UnionEntries(std::vector> & entryParts); std::ifstream m_fileStm; std::mutex m_mutex; From 1ea259ab053e2dce54eebfcb83d330a831115685 Mon Sep 17 00:00:00 2001 From: Anatoly Serdtcev Date: Tue, 22 Jan 2019 20:06:38 +0300 Subject: [PATCH 05/14] Fix for review --- geocoder/hierarchy_reader.cpp | 10 +++++----- geocoder/hierarchy_reader.hpp | 6 ++++-- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/geocoder/hierarchy_reader.cpp b/geocoder/hierarchy_reader.cpp index c306e93ca6..2ac3e1ae8b 100644 --- a/geocoder/hierarchy_reader.cpp +++ b/geocoder/hierarchy_reader.cpp @@ -21,14 +21,14 @@ HierarchyReader::HierarchyReader(string const & pathToJsonHierarchy) : MYTHROW(OpenException, ("Failed to open file", pathToJsonHierarchy)); } -vector HierarchyReader::ReadEntries(size_t readerCount, ParsingStats & stats) +vector HierarchyReader::ReadEntries(size_t readersCount, ParsingStats & stats) { LOG(LINFO, ("Reading entries...")); - readerCount = max(readerCount, size_t{8}); - vector> taskEntries(readerCount); + readersCount = min(readersCount, size_t{8}); + vector> taskEntries(readersCount); vector tasks{}; - for (size_t t = 0; t < readerCount; ++t) + for (size_t t = 0; t < readersCount; ++t) tasks.emplace_back(&HierarchyReader::ReadEntryMap, this, ref(taskEntries[t]), ref(stats)); for (auto & reader : tasks) @@ -99,7 +99,7 @@ void HierarchyReader::ReadEntryMap(multimap & entries, Entry entry; // todo(@m) We should really write uints as uints. - auto osmId = base::GeoObjectId(static_cast(encodedId)); + auto const osmId = base::GeoObjectId(static_cast(encodedId)); entry.m_osmId = osmId; if (!entry.DeserializeFromJSON(line, stats)) diff --git a/geocoder/hierarchy_reader.hpp b/geocoder/hierarchy_reader.hpp index d964ac7f2a..e28a318fe6 100644 --- a/geocoder/hierarchy_reader.hpp +++ b/geocoder/hierarchy_reader.hpp @@ -3,10 +3,12 @@ #include "geocoder/hierarchy.hpp" #include "base/exception.hpp" +#include "base/geo_object_id.hpp" #include #include #include +#include #include namespace geocoder @@ -19,9 +21,9 @@ public: DECLARE_EXCEPTION(OpenException, RootException); - HierarchyReader(std::string const & pathToJsonHierarchy); + explicit HierarchyReader(std::string const & pathToJsonHierarchy); - std::vector ReadEntries(size_t readerCount, ParsingStats & stats); + std::vector ReadEntries(size_t readersCount, ParsingStats & stats); private: void ReadEntryMap(std::multimap & entries, ParsingStats & stats); From 28ba5721273e5998b45cddb4066ed8a7ae5aadbb Mon Sep 17 00:00:00 2001 From: Anatoly Serdtcev Date: Fri, 25 Jan 2019 16:36:11 +0300 Subject: [PATCH 06/14] [geocoder] Fix for review --- geocoder/geocoder_tests/geocoder_tests.cpp | 58 ++++++------- geocoder/hierarchy_reader.cpp | 95 ++++++++++++++++------ geocoder/hierarchy_reader.hpp | 10 ++- 3 files changed, 105 insertions(+), 58 deletions(-) diff --git a/geocoder/geocoder_tests/geocoder_tests.cpp b/geocoder/geocoder_tests/geocoder_tests.cpp index 08d64bc6e7..6507ddc0a0 100644 --- a/geocoder/geocoder_tests/geocoder_tests.cpp +++ b/geocoder/geocoder_tests/geocoder_tests.cpp @@ -54,35 +54,6 @@ void TestGeocoder(Geocoder & geocoder, string const & query, vector && e } } -UNIT_TEST(Geocoder_EmptyFileConcurrentRead) -{ - ScopedFile const regionsJsonFile("regions.jsonl", ""); - Geocoder geocoder(regionsJsonFile.GetFullPath(), 8 /* reader threads */); - - TEST_EQUAL(geocoder.GetHierarchy().GetEntries().size(), 0, ()); -} - -UNIT_TEST(Geocoder_BigFileConcurrentRead) -{ - int const kEntryCount = 1000000; - - stringstream s; - for (int i = 0; i < kEntryCount; ++i) - { - s << i << " " - << "{" - << R"("type": "Feature",)" - << R"("geometry": {"type": "Point", "coordinates": [0, 0]},)" - << R"("properties": {"name": ")" << i << R"(", "rank": 2, "address": {"country": ")" << i << R"("}})" - << "}\n"; - } - - ScopedFile const regionsJsonFile("regions.jsonl", s.str()); - Geocoder geocoder(regionsJsonFile.GetFullPath(), 8 /* reader threads */); - - TEST_EQUAL(geocoder.GetHierarchy().GetEntries().size(), kEntryCount, ()); -} - UNIT_TEST(Geocoder_Smoke) { ScopedFile const regionsJsonFile("regions.jsonl", kRegionsData); @@ -180,4 +151,33 @@ UNIT_TEST(Geocoder_MismatchedLocality) // "Street 3" looks almost like a match to "Paris-Street-3" but we should not emit it. TestGeocoder(geocoder, "Moscow Street 3", {}); } + +UNIT_TEST(Geocoder_EmptyFileConcurrentRead) +{ + ScopedFile const regionsJsonFile("regions.jsonl", ""); + Geocoder geocoder(regionsJsonFile.GetFullPath(), 8 /* reader threads */); + + TEST_EQUAL(geocoder.GetHierarchy().GetEntries().size(), 0, ()); +} + +UNIT_TEST(Geocoder_BigFileConcurrentRead) +{ + int const kEntryCount = 1000000; + + stringstream s; + for (int i = 0; i < kEntryCount; ++i) + { + s << i << " " + << "{" + << R"("type": "Feature",)" + << R"("geometry": {"type": "Point", "coordinates": [0, 0]},)" + << R"("properties": {"name": ")" << i << R"(", "rank": 2, "address": {"country": ")" << i << R"("}})" + << "}\n"; + } + + ScopedFile const regionsJsonFile("regions.jsonl", s.str()); + Geocoder geocoder(regionsJsonFile.GetFullPath(), 8 /* reader threads */); + + TEST_EQUAL(geocoder.GetHierarchy().GetEntries().size(), kEntryCount, ()); +} } // namespace geocoder diff --git a/geocoder/hierarchy_reader.cpp b/geocoder/hierarchy_reader.cpp index 2ac3e1ae8b..3f8cdd5e0e 100644 --- a/geocoder/hierarchy_reader.cpp +++ b/geocoder/hierarchy_reader.cpp @@ -1,7 +1,12 @@ #include "geocoder/hierarchy_reader.hpp" #include "base/logging.hpp" +#include "base/string_utils.hpp" +#include +#include + +#include #include using namespace std; @@ -14,10 +19,15 @@ namespace size_t const kLogBatch = 100000; } // namespace -HierarchyReader::HierarchyReader(string const & pathToJsonHierarchy) : - m_fileStm{pathToJsonHierarchy} +HierarchyReader::HierarchyReader(string const & pathToJsonHierarchy) { - if (!m_fileStm) + using namespace boost::iostreams; + + if (strings::EndsWith(pathToJsonHierarchy, ".gz")) + m_fileStream.push(gzip_decompressor()); + m_fileStream.push(file_source(pathToJsonHierarchy)); + + if (!m_fileStream) MYTHROW(OpenException, ("Failed to open file", pathToJsonHierarchy)); } @@ -25,7 +35,8 @@ vector HierarchyReader::ReadEntries(size_t readersCount, Parsi { LOG(LINFO, ("Reading entries...")); - readersCount = min(readersCount, size_t{8}); + readersCount = min(readersCount, size_t{thread::hardware_concurrency()}); + vector> taskEntries(readersCount); vector tasks{}; for (size_t t = 0; t < readersCount; ++t) @@ -37,10 +48,10 @@ vector HierarchyReader::ReadEntries(size_t readersCount, Parsi if (stats.m_numLoaded % kLogBatch != 0) LOG(LINFO, ("Read", stats.m_numLoaded, "entries")); - return UnionEntries(taskEntries); + return MergeEntries(taskEntries); } -vector HierarchyReader::UnionEntries(vector> & entryParts) +vector HierarchyReader::MergeEntries(vector> & entryParts) { auto entries = vector{}; @@ -50,20 +61,30 @@ vector HierarchyReader::UnionEntries(vector>; + struct ReferenceGreater { - auto minPart = min_element(entryParts.begin(), entryParts.end()); + bool operator () (PartReference const & l, PartReference const & r) const noexcept + { return l.get() > r.get(); } + }; - if (minPart->size()) + auto partsQueue = priority_queue, ReferenceGreater> + (entryParts.begin(), entryParts.end()); + while (partsQueue.size()) + { + auto & minPart = partsQueue.top().get(); + partsQueue.pop(); + + while (minPart.size() && (partsQueue.empty() || minPart <= partsQueue.top().get())) { - entries.emplace_back(move(minPart->begin()->second)); - minPart->erase(minPart->begin()); + entries.emplace_back(move(minPart.begin()->second)); + minPart.erase(minPart.begin()); } - if (minPart->empty()) - entryParts.erase(minPart); + if (minPart.size()) + partsQueue.push(ref(minPart)); } return entries; @@ -74,35 +95,59 @@ void HierarchyReader::ReadEntryMap(multimap & entries, // Temporary local object for efficient concurent processing (individual cache line for container). auto localEntries = multimap{}; - string line; + int const kLineBufferCapacity = 10000; + vector linesBuffer(kLineBufferCapacity); + int bufferSize = 0; + while (true) { + bufferSize = 0; + { auto && lock = lock_guard(m_mutex); - - if (!getline(m_fileStm, line)) - break; + + for (; bufferSize < kLineBufferCapacity; ++bufferSize) + { + if (!getline(m_fileStream, linesBuffer[bufferSize])) + break; + } } - + + if (!bufferSize) + break; + + DeserializeEntryMap(linesBuffer, bufferSize, localEntries, stats); + } + + entries = move(localEntries); +} + +void HierarchyReader::DeserializeEntryMap(vector const & linesBuffer, int const bufferSize, + multimap & entries, ParsingStats & stats) +{ + for (int i = 0; i < bufferSize; ++i) + { + auto & line = linesBuffer[i]; + if (line.empty()) continue; - auto const i = line.find(' '); + auto const p = line.find(' '); int64_t encodedId; - if (i == string::npos || !strings::to_any(line.substr(0, i), encodedId)) + if (p == string::npos || !strings::to_any(line.substr(0, p), encodedId)) { LOG(LWARNING, ("Cannot read osm id. Line:", line)); ++stats.m_badOsmIds; continue; } - line = line.substr(i + 1); + auto json = line.substr(p + 1); Entry entry; // todo(@m) We should really write uints as uints. auto const osmId = base::GeoObjectId(static_cast(encodedId)); entry.m_osmId = osmId; - if (!entry.DeserializeFromJSON(line, stats)) + if (!entry.DeserializeFromJSON(json, stats)) continue; if (entry.m_type == Type::Count) @@ -112,9 +157,7 @@ void HierarchyReader::ReadEntryMap(multimap & entries, if (stats.m_numLoaded % kLogBatch == 0) LOG(LINFO, ("Read", (stats.m_numLoaded / kLogBatch) * kLogBatch, "entries")); - localEntries.emplace(osmId, move(entry)); + entries.emplace(osmId, move(entry)); } - - entries = move(localEntries); } } // namespace geocoder diff --git a/geocoder/hierarchy_reader.hpp b/geocoder/hierarchy_reader.hpp index e28a318fe6..70b21badb4 100644 --- a/geocoder/hierarchy_reader.hpp +++ b/geocoder/hierarchy_reader.hpp @@ -5,7 +5,8 @@ #include "base/exception.hpp" #include "base/geo_object_id.hpp" -#include +#include + #include #include #include @@ -27,9 +28,12 @@ public: private: void ReadEntryMap(std::multimap & entries, ParsingStats & stats); - std::vector UnionEntries(std::vector> & entryParts); - std::ifstream m_fileStm; + void DeserializeEntryMap(std::vector const & linesBuffer, int const bufferSize, + std::multimap & entries, ParsingStats & stats); + std::vector MergeEntries(std::vector> & entryParts); + + boost::iostreams::filtering_istream m_fileStream; std::mutex m_mutex; }; } // namespace geocoder From 6f6c7a4d25b8b5b984e44e59a16174949cc059c4 Mon Sep 17 00:00:00 2001 From: Anatoly Serdtcev Date: Fri, 25 Jan 2019 19:49:19 +0300 Subject: [PATCH 07/14] [geocoder] Fix for review --- geocoder/hierarchy_reader.cpp | 7 ++++--- geocoder/hierarchy_reader.hpp | 1 + 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/geocoder/hierarchy_reader.cpp b/geocoder/hierarchy_reader.cpp index 3f8cdd5e0e..4dadc9be7b 100644 --- a/geocoder/hierarchy_reader.cpp +++ b/geocoder/hierarchy_reader.cpp @@ -153,9 +153,10 @@ void HierarchyReader::DeserializeEntryMap(vector const & linesBuffer, in if (entry.m_type == Type::Count) continue; - ++stats.m_numLoaded; - if (stats.m_numLoaded % kLogBatch == 0) - LOG(LINFO, ("Read", (stats.m_numLoaded / kLogBatch) * kLogBatch, "entries")); + auto numLoaded = stats.m_numLoaded.fetch_add(1) + 1; + + if (numLoaded % kLogBatch == 0) + LOG(LINFO, ("Read", numLoaded, "entries")); entries.emplace(osmId, move(entry)); } diff --git a/geocoder/hierarchy_reader.hpp b/geocoder/hierarchy_reader.hpp index 70b21badb4..0d55aaf464 100644 --- a/geocoder/hierarchy_reader.hpp +++ b/geocoder/hierarchy_reader.hpp @@ -31,6 +31,7 @@ private: void DeserializeEntryMap(std::vector const & linesBuffer, int const bufferSize, std::multimap & entries, ParsingStats & stats); + std::vector MergeEntries(std::vector> & entryParts); boost::iostreams::filtering_istream m_fileStream; From 4f07aadd8d233b783943c79b45c81129438d4917 Mon Sep 17 00:00:00 2001 From: Anatoly Serdtcev Date: Mon, 28 Jan 2019 15:16:44 +0300 Subject: [PATCH 08/14] [geocoder] Refact HierchyReader --- geocoder/geocoder.cpp | 13 ++- geocoder/geocoder.hpp | 3 +- geocoder/geocoder_tests/geocoder_tests.cpp | 7 +- geocoder/hierarchy.cpp | 52 ++---------- geocoder/hierarchy.hpp | 19 ++--- geocoder/hierarchy_reader.cpp | 97 +++++++++++++++++----- geocoder/hierarchy_reader.hpp | 12 ++- 7 files changed, 119 insertions(+), 84 deletions(-) diff --git a/geocoder/geocoder.cpp b/geocoder/geocoder.cpp index 050ec420f1..7d5784e17c 100644 --- a/geocoder/geocoder.cpp +++ b/geocoder/geocoder.cpp @@ -1,9 +1,11 @@ #include "geocoder/geocoder.hpp" -#include "search/house_numbers_matcher.hpp" +#include "geocoder/hierarchy_reader.hpp" #include "indexer/search_string_utils.hpp" +#include "search/house_numbers_matcher.hpp" + #include "base/assert.hpp" #include "base/logging.hpp" #include "base/scope_guard.hpp" @@ -193,8 +195,13 @@ vector & Geocoder::Context::GetLayers() { return m_layers; } vector const & Geocoder::Context::GetLayers() const { return m_layers; } // Geocoder ---------------------------------------------------------------------------------------- -Geocoder::Geocoder(string const & pathToJsonHierarchy, size_t hierarchyReadersCount) - : m_hierarchy(pathToJsonHierarchy, hierarchyReadersCount), m_index(m_hierarchy) +Geocoder::Geocoder(string const & pathToJsonHierarchy) + : Geocoder(HierarchyReader(pathToJsonHierarchy).Read()) +{ +} + +Geocoder::Geocoder(Hierarchy && hierarchy) + : m_hierarchy(move(hierarchy)), m_index(m_hierarchy) { } diff --git a/geocoder/geocoder.hpp b/geocoder/geocoder.hpp index 22c1452c7d..24fcda0b9a 100644 --- a/geocoder/geocoder.hpp +++ b/geocoder/geocoder.hpp @@ -118,7 +118,8 @@ public: std::vector m_layers; }; - explicit Geocoder(std::string const & pathToJsonHierarchy, std::size_t hierarchyReadersCount = 4); + explicit Geocoder(std::string const & pathToJsonHierarchy); + explicit Geocoder(Hierarchy && hierarchy); void ProcessQuery(std::string const & query, std::vector & results) const; diff --git a/geocoder/geocoder_tests/geocoder_tests.cpp b/geocoder/geocoder_tests/geocoder_tests.cpp index 6507ddc0a0..27a9d8a5ba 100644 --- a/geocoder/geocoder_tests/geocoder_tests.cpp +++ b/geocoder/geocoder_tests/geocoder_tests.cpp @@ -1,6 +1,7 @@ #include "testing/testing.hpp" #include "geocoder/geocoder.hpp" +#include "geocoder/hierarchy_reader.hpp" #include "indexer/search_string_utils.hpp" @@ -155,7 +156,8 @@ UNIT_TEST(Geocoder_MismatchedLocality) UNIT_TEST(Geocoder_EmptyFileConcurrentRead) { ScopedFile const regionsJsonFile("regions.jsonl", ""); - Geocoder geocoder(regionsJsonFile.GetFullPath(), 8 /* reader threads */); + HierarchyReader reader{regionsJsonFile.GetFullPath()}; + Geocoder geocoder(reader.Read(8 /* reader threads */)); TEST_EQUAL(geocoder.GetHierarchy().GetEntries().size(), 0, ()); } @@ -176,7 +178,8 @@ UNIT_TEST(Geocoder_BigFileConcurrentRead) } ScopedFile const regionsJsonFile("regions.jsonl", s.str()); - Geocoder geocoder(regionsJsonFile.GetFullPath(), 8 /* reader threads */); + HierarchyReader reader{regionsJsonFile.GetFullPath()}; + Geocoder geocoder(reader.Read(8 /* reader threads */)); TEST_EQUAL(geocoder.GetHierarchy().GetEntries().size(), kEntryCount, ()); } diff --git a/geocoder/hierarchy.cpp b/geocoder/hierarchy.cpp index 5058b3aaf6..d606f3459c 100644 --- a/geocoder/hierarchy.cpp +++ b/geocoder/hierarchy.cpp @@ -1,7 +1,5 @@ #include "geocoder/hierarchy.hpp" -#include "geocoder/hierarchy_reader.hpp" - #include "indexer/search_string_utils.hpp" #include "base/exception.hpp" @@ -15,30 +13,6 @@ using namespace std; -namespace -{ -void CheckDuplicateOsmIds(vector const & entries, - geocoder::Hierarchy::ParsingStats & stats) -{ - size_t i = 0; - while (i < entries.size()) - { - size_t j = i + 1; - while (j < entries.size() && entries[i].m_osmId == entries[j].m_osmId) - ++j; - if (j != i + 1) - { - ++stats.m_duplicateOsmIds; - // todo Remove the cast when the hierarchies no longer contain negative keys. - LOG(LDEBUG, - ("Duplicate osm id:", static_cast(entries[i].m_osmId.GetEncodedId()), "(", - entries[i].m_osmId, ")", "occurs as a key in", j - i, "key-value entries.")); - } - i = j; - } -} -} // namespace - namespace geocoder { // Hierarchy::Entry -------------------------------------------------------------------------------- @@ -136,26 +110,14 @@ bool Hierarchy::Entry::IsParentTo(Hierarchy::Entry const & e) const } // Hierarchy --------------------------------------------------------------------------------------- -Hierarchy::Hierarchy(string const & pathToJsonHierarchy, size_t readersCount) +Hierarchy::Hierarchy(vector && entries, bool sorted) + : m_entries{std::move(entries)} { - ParsingStats stats; - - HierarchyReader reader{pathToJsonHierarchy}; - m_entries = reader.ReadEntries(readersCount, stats); - - CheckDuplicateOsmIds(m_entries, stats); - - LOG(LINFO, ("Finished reading and indexing the hierarchy. Stats:")); - LOG(LINFO, ("Entries loaded:", stats.m_numLoaded)); - LOG(LINFO, ("Corrupted json lines:", stats.m_badJsons)); - LOG(LINFO, ("Unreadable base::GeoObjectIds:", stats.m_badOsmIds)); - LOG(LINFO, ("Duplicate base::GeoObjectIds:", stats.m_duplicateOsmIds)); - LOG(LINFO, ("Entries with duplicate address parts:", stats.m_duplicateAddresses)); - LOG(LINFO, ("Entries without address:", stats.m_emptyAddresses)); - LOG(LINFO, ("Entries without names:", stats.m_emptyNames)); - LOG(LINFO, - ("Entries whose names do not match their most specific addresses:", stats.m_mismatchedNames)); - LOG(LINFO, ("(End of stats.)")); + if (!sorted) + { + LOG(LINFO, ("Sorting entries...")); + sort(m_entries.begin(), m_entries.end()); + } } vector const & Hierarchy::GetEntries() const diff --git a/geocoder/hierarchy.hpp b/geocoder/hierarchy.hpp index fc78b9997c..89198bd7af 100644 --- a/geocoder/hierarchy.hpp +++ b/geocoder/hierarchy.hpp @@ -5,7 +5,6 @@ #include "base/geo_object_id.hpp" #include -#include #include #include #include @@ -21,31 +20,31 @@ public: struct ParsingStats { // Number of entries that the hierarchy was constructed from. - std::atomic m_numLoaded{0}; + uint64_t m_numLoaded{0}; // Number of corrupted json lines. - std::atomic m_badJsons{0}; + uint64_t m_badJsons{0}; // Number of entries with unreadable base::GeoObjectIds. - std::atomic m_badOsmIds{0}; + uint64_t m_badOsmIds{0}; // Number of base::GeoObjectsIds that occur as keys in at least two entries. - std::atomic m_duplicateOsmIds{0}; + uint64_t m_duplicateOsmIds{0}; // Number of entries with duplicate subfields in the address field. - std::atomic m_duplicateAddresses{0}; + uint64_t m_duplicateAddresses{0}; // Number of entries whose address field either does // not exist or consists of empty lines. - std::atomic m_emptyAddresses{0}; + uint64_t m_emptyAddresses{0}; // Number of entries without the name field or with an empty one. - std::atomic m_emptyNames{0}; + uint64_t m_emptyNames{0}; // Number of entries whose names do not match the most // specific parts of their addresses. // This is expected from POIs but not from regions or streets. - std::atomic m_mismatchedNames{0}; + uint64_t m_mismatchedNames{0}; }; // A single entry in the hierarchy directed acyclic graph. @@ -76,7 +75,7 @@ public: std::array(Type::Count) + 1> m_address; }; - explicit Hierarchy(std::string const & pathToJsonHierarchy, std::size_t readersCount = 4); + explicit Hierarchy(std::vector && entries, bool sorted); std::vector const & GetEntries() const; diff --git a/geocoder/hierarchy_reader.cpp b/geocoder/hierarchy_reader.cpp index 4dadc9be7b..f917d4bb6c 100644 --- a/geocoder/hierarchy_reader.cpp +++ b/geocoder/hierarchy_reader.cpp @@ -1,10 +1,6 @@ #include "geocoder/hierarchy_reader.hpp" #include "base/logging.hpp" -#include "base/string_utils.hpp" - -#include -#include #include #include @@ -17,38 +13,79 @@ namespace { // Information will be logged for every |kLogBatch| entries. size_t const kLogBatch = 100000; + +void operator += (Hierarchy::ParsingStats & accumulator, Hierarchy::ParsingStats & stats) +{ + struct ValidationStats + { + uint64_t m_numLoaded, m_badJsons, m_badOsmIds, m_duplicateOsmIds, m_duplicateAddresses, + m_emptyAddresses, m_emptyNames, m_mismatchedNames; + }; + static_assert(sizeof(Hierarchy::ParsingStats) == sizeof(ValidationStats), ""); + + accumulator.m_numLoaded += stats.m_numLoaded; + accumulator.m_badJsons += stats.m_badJsons; + accumulator.m_badOsmIds += stats.m_badOsmIds; + accumulator.m_duplicateOsmIds += stats.m_duplicateOsmIds; + accumulator.m_duplicateAddresses += stats.m_duplicateAddresses; + accumulator.m_emptyAddresses += stats.m_emptyAddresses; + accumulator.m_emptyNames += stats.m_emptyNames; + accumulator.m_mismatchedNames += stats.m_mismatchedNames; +} + } // namespace HierarchyReader::HierarchyReader(string const & pathToJsonHierarchy) + : m_fileStream{pathToJsonHierarchy}, m_in{m_fileStream} { - using namespace boost::iostreams; - - if (strings::EndsWith(pathToJsonHierarchy, ".gz")) - m_fileStream.push(gzip_decompressor()); - m_fileStream.push(file_source(pathToJsonHierarchy)); - if (!m_fileStream) MYTHROW(OpenException, ("Failed to open file", pathToJsonHierarchy)); } -vector HierarchyReader::ReadEntries(size_t readersCount, ParsingStats & stats) +HierarchyReader::HierarchyReader(std::istream & in) + : m_in{in} +{ +} + +Hierarchy HierarchyReader::Read(size_t readersCount) { LOG(LINFO, ("Reading entries...")); readersCount = min(readersCount, size_t{thread::hardware_concurrency()}); vector> taskEntries(readersCount); + vector tasksStats(readersCount); vector tasks{}; for (size_t t = 0; t < readersCount; ++t) - tasks.emplace_back(&HierarchyReader::ReadEntryMap, this, ref(taskEntries[t]), ref(stats)); + tasks.emplace_back(&HierarchyReader::ReadEntryMap, this, ref(taskEntries[t]), ref(tasksStats[t])); for (auto & reader : tasks) reader.join(); - if (stats.m_numLoaded % kLogBatch != 0) - LOG(LINFO, ("Read", stats.m_numLoaded, "entries")); + if (m_totalNumLoaded % kLogBatch != 0) + LOG(LINFO, ("Read", m_totalNumLoaded, "entries")); - return MergeEntries(taskEntries); + ParsingStats stats{}; + for (auto & readerStats : tasksStats) + stats += readerStats; + + auto entries = MergeEntries(taskEntries); + + CheckDuplicateOsmIds(entries, stats); + + LOG(LINFO, ("Finished reading and indexing the hierarchy. Stats:")); + LOG(LINFO, ("Entries loaded:", stats.m_numLoaded)); + LOG(LINFO, ("Corrupted json lines:", stats.m_badJsons)); + LOG(LINFO, ("Unreadable base::GeoObjectIds:", stats.m_badOsmIds)); + LOG(LINFO, ("Duplicate base::GeoObjectIds:", stats.m_duplicateOsmIds)); + LOG(LINFO, ("Entries with duplicate address parts:", stats.m_duplicateAddresses)); + LOG(LINFO, ("Entries without address:", stats.m_emptyAddresses)); + LOG(LINFO, ("Entries without names:", stats.m_emptyNames)); + LOG(LINFO, + ("Entries whose names do not match their most specific addresses:", stats.m_mismatchedNames)); + LOG(LINFO, ("(End of stats.)")); + + return Hierarchy{std::move(entries), true}; } vector HierarchyReader::MergeEntries(vector> & entryParts) @@ -90,6 +127,27 @@ vector HierarchyReader::MergeEntries(vector const & entries, + ParsingStats & stats) +{ + size_t i = 0; + while (i < entries.size()) + { + size_t j = i + 1; + while (j < entries.size() && entries[i].m_osmId == entries[j].m_osmId) + ++j; + if (j != i + 1) + { + ++stats.m_duplicateOsmIds; + // todo Remove the cast when the hierarchies no longer contain negative keys. + LOG(LDEBUG, + ("Duplicate osm id:", static_cast(entries[i].m_osmId.GetEncodedId()), "(", + entries[i].m_osmId, ")", "occurs as a key in", j - i, "key-value entries.")); + } + i = j; + } +} + void HierarchyReader::ReadEntryMap(multimap & entries, ParsingStats & stats) { // Temporary local object for efficient concurent processing (individual cache line for container). @@ -108,7 +166,7 @@ void HierarchyReader::ReadEntryMap(multimap & entries, for (; bufferSize < kLineBufferCapacity; ++bufferSize) { - if (!getline(m_fileStream, linesBuffer[bufferSize])) + if (!getline(m_in, linesBuffer[bufferSize])) break; } } @@ -153,10 +211,11 @@ void HierarchyReader::DeserializeEntryMap(vector const & linesBuffer, in if (entry.m_type == Type::Count) continue; - auto numLoaded = stats.m_numLoaded.fetch_add(1) + 1; + ++stats.m_numLoaded; - if (numLoaded % kLogBatch == 0) - LOG(LINFO, ("Read", numLoaded, "entries")); + auto totalNumLoaded = m_totalNumLoaded.fetch_add(1) + 1; + if (totalNumLoaded % kLogBatch == 0) + LOG(LINFO, ("Read", totalNumLoaded, "entries")); entries.emplace(osmId, move(entry)); } diff --git a/geocoder/hierarchy_reader.hpp b/geocoder/hierarchy_reader.hpp index 0d55aaf464..48ae156764 100644 --- a/geocoder/hierarchy_reader.hpp +++ b/geocoder/hierarchy_reader.hpp @@ -5,8 +5,8 @@ #include "base/exception.hpp" #include "base/geo_object_id.hpp" -#include - +#include +#include #include #include #include @@ -23,8 +23,9 @@ public: DECLARE_EXCEPTION(OpenException, RootException); explicit HierarchyReader(std::string const & pathToJsonHierarchy); + explicit HierarchyReader(std::istream & in); - std::vector ReadEntries(size_t readersCount, ParsingStats & stats); + Hierarchy Read(size_t readersCount = 4); private: void ReadEntryMap(std::multimap & entries, ParsingStats & stats); @@ -33,8 +34,11 @@ private: std::multimap & entries, ParsingStats & stats); std::vector MergeEntries(std::vector> & entryParts); + void CheckDuplicateOsmIds(std::vector const & entries, ParsingStats & stats); - boost::iostreams::filtering_istream m_fileStream; + std::ifstream m_fileStream; + std::istream & m_in; std::mutex m_mutex; + std::atomic m_totalNumLoaded{0}; }; } // namespace geocoder From 07ef24680d16929bbd900d65f8658b74382997c0 Mon Sep 17 00:00:00 2001 From: Anatoly Serdtcev Date: Tue, 29 Jan 2019 20:03:36 +0300 Subject: [PATCH 09/14] [geocoder] Concurrent index houses --- geocoder/index.cpp | 51 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 36 insertions(+), 15 deletions(-) diff --git a/geocoder/index.cpp b/geocoder/index.cpp index 86e89ea0c7..f11e991c0a 100644 --- a/geocoder/index.cpp +++ b/geocoder/index.cpp @@ -8,7 +8,10 @@ #include "base/logging.hpp" #include "base/string_utils.hpp" +#include #include +#include +#include using namespace std; @@ -88,29 +91,47 @@ void Index::AddStreet(DocId const & docId, Index::Doc const & doc) void Index::AddHouses() { - size_t numIndexed = 0; - for (DocId docId = 0; docId < static_cast(m_docs.size()); ++docId) + atomic numIndexed{0}; + std::mutex mutex; + + vector threads(thread::hardware_concurrency()); + + for (size_t t = 0; t < threads.size(); ++t) { - auto const & buildingDoc = GetDoc(docId); + threads[t] = thread([&, t, this]() { + size_t const size = m_docs.size() / threads.size(); + size_t docId = t * size; + size_t const docIdEnd = (t + 1 == threads.size() ? m_docs.size() : docId + size); - if (buildingDoc.m_type != Type::Building) - continue; - - size_t const t = static_cast(Type::Street); - - ForEachDocId(buildingDoc.m_address[t], [&](DocId const & streetCandidate) { - auto const & streetDoc = GetDoc(streetCandidate); - if (streetDoc.IsParentTo(buildingDoc)) + for (; docId < docIdEnd; ++docId) { - m_buildingsOnStreet[streetCandidate].emplace_back(docId); + auto const & buildingDoc = GetDoc(docId); - ++numIndexed; - if (numIndexed % kLogBatch == 0) - LOG(LINFO, ("Indexed", numIndexed, "houses")); + if (buildingDoc.m_type != Type::Building) + continue; + + size_t const streetType = static_cast(Type::Street); + + ForEachDocId(buildingDoc.m_address[streetType], [&](DocId const & streetCandidate) { + auto const & streetDoc = GetDoc(streetCandidate); + + if (streetDoc.IsParentTo(buildingDoc)) + { + auto && lock = lock_guard(mutex); + m_buildingsOnStreet[streetCandidate].emplace_back(docId); + } + }); + + auto processedCount = numIndexed.fetch_add(1) + 1; + if (processedCount % kLogBatch == 0) + LOG(LINFO, ("Indexed", processedCount, "houses")); } }); } + for (auto & t : threads) + t.join(); + if (numIndexed % kLogBatch != 0) LOG(LINFO, ("Indexed", numIndexed, "houses")); } From 0e8477ad8bd0f5d6f0ce63a0e766f32ccc2885c4 Mon Sep 17 00:00:00 2001 From: Anatoly Serdtcev Date: Thu, 31 Jan 2019 19:32:25 +0300 Subject: [PATCH 10/14] [geocoder] Fix for review --- geocoder/geocoder.cpp | 4 ++-- geocoder/geocoder_tests/geocoder_tests.cpp | 2 +- geocoder/hierarchy.hpp | 16 +++++++-------- geocoder/hierarchy_reader.cpp | 23 +++++++++++----------- geocoder/hierarchy_reader.hpp | 5 +++-- geocoder/index.cpp | 10 ++++++++-- geocoder/index.hpp | 4 +++- 7 files changed, 37 insertions(+), 27 deletions(-) diff --git a/geocoder/geocoder.cpp b/geocoder/geocoder.cpp index 7d5784e17c..266d68c923 100644 --- a/geocoder/geocoder.cpp +++ b/geocoder/geocoder.cpp @@ -2,10 +2,10 @@ #include "geocoder/hierarchy_reader.hpp" -#include "indexer/search_string_utils.hpp" - #include "search/house_numbers_matcher.hpp" +#include "indexer/search_string_utils.hpp" + #include "base/assert.hpp" #include "base/logging.hpp" #include "base/scope_guard.hpp" diff --git a/geocoder/geocoder_tests/geocoder_tests.cpp b/geocoder/geocoder_tests/geocoder_tests.cpp index 27a9d8a5ba..1f0329e4d5 100644 --- a/geocoder/geocoder_tests/geocoder_tests.cpp +++ b/geocoder/geocoder_tests/geocoder_tests.cpp @@ -164,7 +164,7 @@ UNIT_TEST(Geocoder_EmptyFileConcurrentRead) UNIT_TEST(Geocoder_BigFileConcurrentRead) { - int const kEntryCount = 1000000; + int const kEntryCount = 100000; stringstream s; for (int i = 0; i < kEntryCount; ++i) diff --git a/geocoder/hierarchy.hpp b/geocoder/hierarchy.hpp index 89198bd7af..8579db75fb 100644 --- a/geocoder/hierarchy.hpp +++ b/geocoder/hierarchy.hpp @@ -20,31 +20,31 @@ public: struct ParsingStats { // Number of entries that the hierarchy was constructed from. - uint64_t m_numLoaded{0}; + uint64_t m_numLoaded = 0; // Number of corrupted json lines. - uint64_t m_badJsons{0}; + uint64_t m_badJsons = 0; // Number of entries with unreadable base::GeoObjectIds. - uint64_t m_badOsmIds{0}; + uint64_t m_badOsmIds = 0; // Number of base::GeoObjectsIds that occur as keys in at least two entries. - uint64_t m_duplicateOsmIds{0}; + uint64_t m_duplicateOsmIds = 0; // Number of entries with duplicate subfields in the address field. - uint64_t m_duplicateAddresses{0}; + uint64_t m_duplicateAddresses = 0; // Number of entries whose address field either does // not exist or consists of empty lines. - uint64_t m_emptyAddresses{0}; + uint64_t m_emptyAddresses = 0; // Number of entries without the name field or with an empty one. - uint64_t m_emptyNames{0}; + uint64_t m_emptyNames = 0; // Number of entries whose names do not match the most // specific parts of their addresses. // This is expected from POIs but not from regions or streets. - uint64_t m_mismatchedNames{0}; + uint64_t m_mismatchedNames = 0; }; // A single entry in the hierarchy directed acyclic graph. diff --git a/geocoder/hierarchy_reader.cpp b/geocoder/hierarchy_reader.cpp index f917d4bb6c..c86e60d490 100644 --- a/geocoder/hierarchy_reader.cpp +++ b/geocoder/hierarchy_reader.cpp @@ -14,7 +14,7 @@ namespace // Information will be logged for every |kLogBatch| entries. size_t const kLogBatch = 100000; -void operator += (Hierarchy::ParsingStats & accumulator, Hierarchy::ParsingStats & stats) +void operator+=(Hierarchy::ParsingStats & accumulator, Hierarchy::ParsingStats & stats) { struct ValidationStats { @@ -32,7 +32,6 @@ void operator += (Hierarchy::ParsingStats & accumulator, Hierarchy::ParsingStats accumulator.m_emptyNames += stats.m_emptyNames; accumulator.m_mismatchedNames += stats.m_mismatchedNames; } - } // namespace HierarchyReader::HierarchyReader(string const & pathToJsonHierarchy) @@ -47,11 +46,13 @@ HierarchyReader::HierarchyReader(std::istream & in) { } -Hierarchy HierarchyReader::Read(size_t readersCount) +Hierarchy HierarchyReader::Read(unsigned int readersCount) { LOG(LINFO, ("Reading entries...")); - readersCount = min(readersCount, size_t{thread::hardware_concurrency()}); + if (auto hardwareConcurrency = thread::hardware_concurrency()) + readersCount = min(hardwareConcurrency, readersCount); + readersCount = max(1U, readersCount); vector> taskEntries(readersCount); vector tasksStats(readersCount); @@ -109,7 +110,7 @@ vector HierarchyReader::MergeEntries(vector, ReferenceGreater> (entryParts.begin(), entryParts.end()); - while (partsQueue.size()) + while (!partsQueue.empty()) { auto & minPart = partsQueue.top().get(); partsQueue.pop(); @@ -120,7 +121,7 @@ vector HierarchyReader::MergeEntries(vector & entries, // Temporary local object for efficient concurent processing (individual cache line for container). auto localEntries = multimap{}; - int const kLineBufferCapacity = 10000; + size_t const kLineBufferCapacity = 10000; vector linesBuffer(kLineBufferCapacity); - int bufferSize = 0; + size_t bufferSize = 0; while (true) { @@ -180,10 +181,10 @@ void HierarchyReader::ReadEntryMap(multimap & entries, entries = move(localEntries); } -void HierarchyReader::DeserializeEntryMap(vector const & linesBuffer, int const bufferSize, +void HierarchyReader::DeserializeEntryMap(vector const & linesBuffer, size_t const bufferSize, multimap & entries, ParsingStats & stats) { - for (int i = 0; i < bufferSize; ++i) + for (size_t i = 0; i < bufferSize; ++i) { auto & line = linesBuffer[i]; @@ -220,4 +221,4 @@ void HierarchyReader::DeserializeEntryMap(vector const & linesBuffer, in entries.emplace(osmId, move(entry)); } } -} // namespace geocoder +} // namespace geocoder diff --git a/geocoder/hierarchy_reader.hpp b/geocoder/hierarchy_reader.hpp index 48ae156764..54b38073a5 100644 --- a/geocoder/hierarchy_reader.hpp +++ b/geocoder/hierarchy_reader.hpp @@ -25,12 +25,13 @@ public: explicit HierarchyReader(std::string const & pathToJsonHierarchy); explicit HierarchyReader(std::istream & in); - Hierarchy Read(size_t readersCount = 4); + // Read hierarchy file/stream concurrency in |readersCount| threads. + Hierarchy Read(unsigned int readersCount = 4); private: void ReadEntryMap(std::multimap & entries, ParsingStats & stats); - void DeserializeEntryMap(std::vector const & linesBuffer, int const bufferSize, + void DeserializeEntryMap(std::vector const & linesBuffer, std::size_t const bufferSize, std::multimap & entries, ParsingStats & stats); std::vector MergeEntries(std::vector> & entryParts); diff --git a/geocoder/index.cpp b/geocoder/index.cpp index f11e991c0a..7117805040 100644 --- a/geocoder/index.cpp +++ b/geocoder/index.cpp @@ -23,8 +23,13 @@ size_t const kLogBatch = 100000; namespace geocoder { -Index::Index(Hierarchy const & hierarchy) : m_docs(hierarchy.GetEntries()) +Index::Index(Hierarchy const & hierarchy, unsigned int processingThreadsCount) + : m_docs(hierarchy.GetEntries()), m_processingThreadsCount{processingThreadsCount} { + if (auto hardwareConcurrency = thread::hardware_concurrency()) + m_processingThreadsCount = min(hardwareConcurrency, m_processingThreadsCount); + m_processingThreadsCount = max(1U, m_processingThreadsCount); + LOG(LINFO, ("Indexing hierarchy entries...")); AddEntries(); LOG(LINFO, ("Indexing houses...")); @@ -94,7 +99,8 @@ void Index::AddHouses() atomic numIndexed{0}; std::mutex mutex; - vector threads(thread::hardware_concurrency()); + vector threads(m_processingThreadsCount); + CHECK_GREATER(threads.size(), 0, ()); for (size_t t = 0; t < threads.size(); ++t) { diff --git a/geocoder/index.hpp b/geocoder/index.hpp index 8a56fb4271..a2c96dc1ca 100644 --- a/geocoder/index.hpp +++ b/geocoder/index.hpp @@ -20,7 +20,7 @@ public: // that the index was constructed from. using DocId = std::vector::size_type; - explicit Index(Hierarchy const & hierarchy); + explicit Index(Hierarchy const & hierarchy, unsigned int processingThreadsCount = 4); Doc const & GetDoc(DocId const id) const; @@ -74,5 +74,7 @@ private: // Lists of houses grouped by the streets they belong to. std::unordered_map> m_buildingsOnStreet; + + unsigned int m_processingThreadsCount; }; } // namespace geocoder From 0236ae6c27940def02403af25b748ef1efaeec96 Mon Sep 17 00:00:00 2001 From: Anatoly Serdtcev Date: Fri, 1 Feb 2019 21:41:02 +0300 Subject: [PATCH 11/14] [geocoder] Fix for review --- geocoder/geocoder.cpp | 14 ++++++++++---- geocoder/geocoder.hpp | 9 +++++++-- geocoder/hierarchy_reader.cpp | 13 +++++++------ geocoder/hierarchy_reader.hpp | 5 +++-- geocoder/index.cpp | 16 ++++++++-------- geocoder/index.hpp | 8 ++++---- 6 files changed, 39 insertions(+), 26 deletions(-) diff --git a/geocoder/geocoder.cpp b/geocoder/geocoder.cpp index 266d68c923..ff7603296e 100644 --- a/geocoder/geocoder.cpp +++ b/geocoder/geocoder.cpp @@ -15,6 +15,7 @@ #include #include +#include #include using namespace std; @@ -195,13 +196,18 @@ vector & Geocoder::Context::GetLayers() { return m_layers; } vector const & Geocoder::Context::GetLayers() const { return m_layers; } // Geocoder ---------------------------------------------------------------------------------------- -Geocoder::Geocoder(string const & pathToJsonHierarchy) - : Geocoder(HierarchyReader(pathToJsonHierarchy).Read()) +Geocoder::Geocoder(string const & pathToJsonHierarchy, unsigned int loadThreadsCount) + : Geocoder{HierarchyReader{pathToJsonHierarchy}.Read(loadThreadsCount), loadThreadsCount} { } -Geocoder::Geocoder(Hierarchy && hierarchy) - : m_hierarchy(move(hierarchy)), m_index(m_hierarchy) +Geocoder::Geocoder(istream & jsonHierarchy, unsigned int loadThreadsCount) + : Geocoder{HierarchyReader{jsonHierarchy}.Read(loadThreadsCount), loadThreadsCount} +{ +} + +Geocoder::Geocoder(Hierarchy && hierarchy, unsigned int loadThreadsCount) + : m_hierarchy(move(hierarchy)), m_index(m_hierarchy, loadThreadsCount) { } diff --git a/geocoder/geocoder.hpp b/geocoder/geocoder.hpp index 24fcda0b9a..ff6542afdd 100644 --- a/geocoder/geocoder.hpp +++ b/geocoder/geocoder.hpp @@ -12,6 +12,7 @@ #include #include +#include #include #include #include @@ -118,8 +119,10 @@ public: std::vector m_layers; }; - explicit Geocoder(std::string const & pathToJsonHierarchy); - explicit Geocoder(Hierarchy && hierarchy); + explicit Geocoder(std::string const & pathToJsonHierarchy, + unsigned int loadThreadsCount = std::thread::hardware_concurrency()); + explicit Geocoder(std::istream & jsonHierarchy, + unsigned int loadThreadsCount = std::thread::hardware_concurrency()); void ProcessQuery(std::string const & query, std::vector & results) const; @@ -128,6 +131,8 @@ public: Index const & GetIndex() const; private: + explicit Geocoder(Hierarchy && hierarchy, unsigned int loadThreadsCount); + void Go(Context & ctx, Type type) const; void FillBuildingsLayer(Context & ctx, Tokens const & subquery, Layer & curLayer) const; diff --git a/geocoder/hierarchy_reader.cpp b/geocoder/hierarchy_reader.cpp index c86e60d490..14e06abcc0 100644 --- a/geocoder/hierarchy_reader.cpp +++ b/geocoder/hierarchy_reader.cpp @@ -2,6 +2,7 @@ #include "base/logging.hpp" +#include #include #include @@ -41,7 +42,7 @@ HierarchyReader::HierarchyReader(string const & pathToJsonHierarchy) MYTHROW(OpenException, ("Failed to open file", pathToJsonHierarchy)); } -HierarchyReader::HierarchyReader(std::istream & in) +HierarchyReader::HierarchyReader(istream & in) : m_in{in} { } @@ -86,7 +87,7 @@ Hierarchy HierarchyReader::Read(unsigned int readersCount) ("Entries whose names do not match their most specific addresses:", stats.m_mismatchedNames)); LOG(LINFO, ("(End of stats.)")); - return Hierarchy{std::move(entries), true}; + return Hierarchy{move(entries), true}; } vector HierarchyReader::MergeEntries(vector> & entryParts) @@ -104,18 +105,18 @@ vector HierarchyReader::MergeEntries(vector>; struct ReferenceGreater { - bool operator () (PartReference const & l, PartReference const & r) const noexcept + bool operator()(PartReference const & l, PartReference const & r) const noexcept { return l.get() > r.get(); } }; - auto partsQueue = priority_queue, ReferenceGreater> - (entryParts.begin(), entryParts.end()); + auto partsQueue = priority_queue, ReferenceGreater>( + entryParts.begin(), entryParts.end()); while (!partsQueue.empty()) { auto & minPart = partsQueue.top().get(); partsQueue.pop(); - while (minPart.size() && (partsQueue.empty() || minPart <= partsQueue.top().get())) + while (!minPart.empty() && (partsQueue.empty() || minPart <= partsQueue.top().get())) { entries.emplace_back(move(minPart.begin()->second)); minPart.erase(minPart.begin()); diff --git a/geocoder/hierarchy_reader.hpp b/geocoder/hierarchy_reader.hpp index 54b38073a5..e34b8c1a12 100644 --- a/geocoder/hierarchy_reader.hpp +++ b/geocoder/hierarchy_reader.hpp @@ -10,6 +10,7 @@ #include #include #include +#include #include namespace geocoder @@ -23,10 +24,10 @@ public: DECLARE_EXCEPTION(OpenException, RootException); explicit HierarchyReader(std::string const & pathToJsonHierarchy); - explicit HierarchyReader(std::istream & in); + explicit HierarchyReader(std::istream & jsonHierarchy); // Read hierarchy file/stream concurrency in |readersCount| threads. - Hierarchy Read(unsigned int readersCount = 4); + Hierarchy Read(unsigned int readersCount = std::thread::hardware_concurrency()); private: void ReadEntryMap(std::multimap & entries, ParsingStats & stats); diff --git a/geocoder/index.cpp b/geocoder/index.cpp index 7117805040..60c565bf49 100644 --- a/geocoder/index.cpp +++ b/geocoder/index.cpp @@ -23,17 +23,17 @@ size_t const kLogBatch = 100000; namespace geocoder { -Index::Index(Hierarchy const & hierarchy, unsigned int processingThreadsCount) - : m_docs(hierarchy.GetEntries()), m_processingThreadsCount{processingThreadsCount} +Index::Index(Hierarchy const & hierarchy, unsigned int loadThreadsCount) + : m_docs(hierarchy.GetEntries()) { if (auto hardwareConcurrency = thread::hardware_concurrency()) - m_processingThreadsCount = min(hardwareConcurrency, m_processingThreadsCount); - m_processingThreadsCount = max(1U, m_processingThreadsCount); + loadThreadsCount = min(hardwareConcurrency, loadThreadsCount); + loadThreadsCount = max(1U, loadThreadsCount); LOG(LINFO, ("Indexing hierarchy entries...")); AddEntries(); LOG(LINFO, ("Indexing houses...")); - AddHouses(); + AddHouses(loadThreadsCount); } Index::Doc const & Index::GetDoc(DocId const id) const @@ -94,12 +94,12 @@ void Index::AddStreet(DocId const & docId, Index::Doc const & doc) } } -void Index::AddHouses() +void Index::AddHouses(unsigned int loadThreadsCount) { atomic numIndexed{0}; - std::mutex mutex; + mutex mutex; - vector threads(m_processingThreadsCount); + vector threads(loadThreadsCount); CHECK_GREATER(threads.size(), 0, ()); for (size_t t = 0; t < threads.size(); ++t) diff --git a/geocoder/index.hpp b/geocoder/index.hpp index a2c96dc1ca..35e9e8d6cb 100644 --- a/geocoder/index.hpp +++ b/geocoder/index.hpp @@ -6,6 +6,7 @@ #include #include +#include #include #include @@ -20,7 +21,8 @@ public: // that the index was constructed from. using DocId = std::vector::size_type; - explicit Index(Hierarchy const & hierarchy, unsigned int processingThreadsCount = 4); + explicit Index(Hierarchy const & hierarchy, + unsigned int loadThreadsCount = std::thread::hardware_concurrency()); Doc const & GetDoc(DocId const id) const; @@ -66,7 +68,7 @@ private: void AddStreet(DocId const & docId, Doc const & e); // Fills the |m_buildingsOnStreet| field. - void AddHouses(); + void AddHouses(unsigned int loadThreadsCount); std::vector const & m_docs; @@ -74,7 +76,5 @@ private: // Lists of houses grouped by the streets they belong to. std::unordered_map> m_buildingsOnStreet; - - unsigned int m_processingThreadsCount; }; } // namespace geocoder From 2071998df7f548f18509d7f0297c6e9e67089802 Mon Sep 17 00:00:00 2001 From: Anatoly Serdtcev Date: Mon, 4 Feb 2019 11:05:59 +0300 Subject: [PATCH 12/14] [geocoder] Fix for review --- geocoder/hierarchy_reader.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/geocoder/hierarchy_reader.cpp b/geocoder/hierarchy_reader.cpp index 14e06abcc0..4b5ceb4bf3 100644 --- a/geocoder/hierarchy_reader.cpp +++ b/geocoder/hierarchy_reader.cpp @@ -22,7 +22,8 @@ void operator+=(Hierarchy::ParsingStats & accumulator, Hierarchy::ParsingStats & uint64_t m_numLoaded, m_badJsons, m_badOsmIds, m_duplicateOsmIds, m_duplicateAddresses, m_emptyAddresses, m_emptyNames, m_mismatchedNames; }; - static_assert(sizeof(Hierarchy::ParsingStats) == sizeof(ValidationStats), ""); + static_assert(sizeof(Hierarchy::ParsingStats) == sizeof(ValidationStats), + "Hierarchy::ParsingStats has modified"); accumulator.m_numLoaded += stats.m_numLoaded; accumulator.m_badJsons += stats.m_badJsons; From c2d0fc4d1324a3f3c231480ba5d0c6401aff61e2 Mon Sep 17 00:00:00 2001 From: Anatoly Serdtcev Date: Mon, 4 Feb 2019 14:00:43 +0300 Subject: [PATCH 13/14] [geocoder] Fix for review --- geocoder/hierarchy_reader.cpp | 2 +- geocoder/hierarchy_reader.hpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/geocoder/hierarchy_reader.cpp b/geocoder/hierarchy_reader.cpp index 4b5ceb4bf3..cee535707b 100644 --- a/geocoder/hierarchy_reader.cpp +++ b/geocoder/hierarchy_reader.cpp @@ -23,7 +23,7 @@ void operator+=(Hierarchy::ParsingStats & accumulator, Hierarchy::ParsingStats & m_emptyAddresses, m_emptyNames, m_mismatchedNames; }; static_assert(sizeof(Hierarchy::ParsingStats) == sizeof(ValidationStats), - "Hierarchy::ParsingStats has modified"); + "Hierarchy::ParsingStats has been modified"); accumulator.m_numLoaded += stats.m_numLoaded; accumulator.m_badJsons += stats.m_badJsons; diff --git a/geocoder/hierarchy_reader.hpp b/geocoder/hierarchy_reader.hpp index e34b8c1a12..d434fff607 100644 --- a/geocoder/hierarchy_reader.hpp +++ b/geocoder/hierarchy_reader.hpp @@ -26,7 +26,7 @@ public: explicit HierarchyReader(std::string const & pathToJsonHierarchy); explicit HierarchyReader(std::istream & jsonHierarchy); - // Read hierarchy file/stream concurrency in |readersCount| threads. + // Read hierarchy file/stream concurrently in |readersCount| threads. Hierarchy Read(unsigned int readersCount = std::thread::hardware_concurrency()); private: From d1ce5c1395b06a8de146dd7d4ec45ca63313df7c Mon Sep 17 00:00:00 2001 From: Anatoly Serdtcev Date: Fri, 8 Feb 2019 11:22:50 +0300 Subject: [PATCH 14/14] [geocoder] Fix for review --- geocoder/geocoder.hpp | 6 ++---- geocoder/hierarchy_reader.cpp | 26 ++++++++++++-------------- geocoder/hierarchy_reader.hpp | 2 +- geocoder/index.cpp | 4 +--- geocoder/index.hpp | 3 +-- 5 files changed, 17 insertions(+), 24 deletions(-) diff --git a/geocoder/geocoder.hpp b/geocoder/geocoder.hpp index ff6542afdd..8b4a74a95d 100644 --- a/geocoder/geocoder.hpp +++ b/geocoder/geocoder.hpp @@ -119,10 +119,8 @@ public: std::vector m_layers; }; - explicit Geocoder(std::string const & pathToJsonHierarchy, - unsigned int loadThreadsCount = std::thread::hardware_concurrency()); - explicit Geocoder(std::istream & jsonHierarchy, - unsigned int loadThreadsCount = std::thread::hardware_concurrency()); + explicit Geocoder(std::string const & pathToJsonHierarchy, unsigned int loadThreadsCount = 1); + explicit Geocoder(std::istream & jsonHierarchy, unsigned int loadThreadsCount = 1); void ProcessQuery(std::string const & query, std::vector & results) const; diff --git a/geocoder/hierarchy_reader.cpp b/geocoder/hierarchy_reader.cpp index cee535707b..c59ff8b356 100644 --- a/geocoder/hierarchy_reader.cpp +++ b/geocoder/hierarchy_reader.cpp @@ -25,14 +25,14 @@ void operator+=(Hierarchy::ParsingStats & accumulator, Hierarchy::ParsingStats & static_assert(sizeof(Hierarchy::ParsingStats) == sizeof(ValidationStats), "Hierarchy::ParsingStats has been modified"); - accumulator.m_numLoaded += stats.m_numLoaded; - accumulator.m_badJsons += stats.m_badJsons; - accumulator.m_badOsmIds += stats.m_badOsmIds; - accumulator.m_duplicateOsmIds += stats.m_duplicateOsmIds; - accumulator.m_duplicateAddresses += stats.m_duplicateAddresses; - accumulator.m_emptyAddresses += stats.m_emptyAddresses; - accumulator.m_emptyNames += stats.m_emptyNames; - accumulator.m_mismatchedNames += stats.m_mismatchedNames; + accumulator.m_numLoaded += stats.m_numLoaded; + accumulator.m_badJsons += stats.m_badJsons; + accumulator.m_badOsmIds += stats.m_badOsmIds; + accumulator.m_duplicateOsmIds += stats.m_duplicateOsmIds; + accumulator.m_duplicateAddresses += stats.m_duplicateAddresses; + accumulator.m_emptyAddresses += stats.m_emptyAddresses; + accumulator.m_emptyNames += stats.m_emptyNames; + accumulator.m_mismatchedNames += stats.m_mismatchedNames; } } // namespace @@ -50,11 +50,9 @@ HierarchyReader::HierarchyReader(istream & in) Hierarchy HierarchyReader::Read(unsigned int readersCount) { - LOG(LINFO, ("Reading entries...")); + CHECK_GREATER_OR_EQUAL(readersCount, 1, ()); - if (auto hardwareConcurrency = thread::hardware_concurrency()) - readersCount = min(hardwareConcurrency, readersCount); - readersCount = max(1U, readersCount); + LOG(LINFO, ("Reading entries...")); vector> taskEntries(readersCount); vector tasksStats(readersCount); @@ -142,7 +140,7 @@ void HierarchyReader::CheckDuplicateOsmIds(vector co if (j != i + 1) { ++stats.m_duplicateOsmIds; - // todo Remove the cast when the hierarchies no longer contain negative keys. + // TODO: Remove the cast when the hierarchies no longer contain negative keys. LOG(LDEBUG, ("Duplicate osm id:", static_cast(entries[i].m_osmId.GetEncodedId()), "(", entries[i].m_osmId, ")", "occurs as a key in", j - i, "key-value entries.")); @@ -204,7 +202,7 @@ void HierarchyReader::DeserializeEntryMap(vector const & linesBuffer, si auto json = line.substr(p + 1); Entry entry; - // todo(@m) We should really write uints as uints. + // TODO: (@m) We should really write uints as uints. auto const osmId = base::GeoObjectId(static_cast(encodedId)); entry.m_osmId = osmId; diff --git a/geocoder/hierarchy_reader.hpp b/geocoder/hierarchy_reader.hpp index d434fff607..19701659f0 100644 --- a/geocoder/hierarchy_reader.hpp +++ b/geocoder/hierarchy_reader.hpp @@ -27,7 +27,7 @@ public: explicit HierarchyReader(std::istream & jsonHierarchy); // Read hierarchy file/stream concurrently in |readersCount| threads. - Hierarchy Read(unsigned int readersCount = std::thread::hardware_concurrency()); + Hierarchy Read(unsigned int readersCount = 1); private: void ReadEntryMap(std::multimap & entries, ParsingStats & stats); diff --git a/geocoder/index.cpp b/geocoder/index.cpp index 60c565bf49..fa6bc2b178 100644 --- a/geocoder/index.cpp +++ b/geocoder/index.cpp @@ -26,9 +26,7 @@ namespace geocoder Index::Index(Hierarchy const & hierarchy, unsigned int loadThreadsCount) : m_docs(hierarchy.GetEntries()) { - if (auto hardwareConcurrency = thread::hardware_concurrency()) - loadThreadsCount = min(hardwareConcurrency, loadThreadsCount); - loadThreadsCount = max(1U, loadThreadsCount); + CHECK_GREATER_OR_EQUAL(loadThreadsCount, 1, ()); LOG(LINFO, ("Indexing hierarchy entries...")); AddEntries(); diff --git a/geocoder/index.hpp b/geocoder/index.hpp index 35e9e8d6cb..fb1cf51437 100644 --- a/geocoder/index.hpp +++ b/geocoder/index.hpp @@ -21,8 +21,7 @@ public: // that the index was constructed from. using DocId = std::vector::size_type; - explicit Index(Hierarchy const & hierarchy, - unsigned int loadThreadsCount = std::thread::hardware_concurrency()); + explicit Index(Hierarchy const & hierarchy, unsigned int loadThreadsCount = 1); Doc const & GetDoc(DocId const id) const;