diff --git a/geocoder/CMakeLists.txt b/geocoder/CMakeLists.txt index 3e50874841..5c82e4398b 100644 --- a/geocoder/CMakeLists.txt +++ b/geocoder/CMakeLists.txt @@ -8,6 +8,8 @@ set( geocoder.hpp hierarchy.cpp hierarchy.hpp + hierarchy_reader.cpp + hierarchy_reader.hpp index.cpp index.hpp result.cpp diff --git a/geocoder/geocoder.cpp b/geocoder/geocoder.cpp index 71af43ae7a..53c62df6a2 100644 --- a/geocoder/geocoder.cpp +++ b/geocoder/geocoder.cpp @@ -1,5 +1,7 @@ #include "geocoder/geocoder.hpp" +#include "geocoder/hierarchy_reader.hpp" + #include "search/house_numbers_matcher.hpp" #include "indexer/search_string_utils.hpp" @@ -13,6 +15,7 @@ #include #include +#include #include using namespace std; @@ -193,8 +196,18 @@ vector & Geocoder::Context::GetLayers() { return m_layers; } vector const & Geocoder::Context::GetLayers() const { return m_layers; } // Geocoder ---------------------------------------------------------------------------------------- -Geocoder::Geocoder(string const & pathToJsonHierarchy) - : m_hierarchy(pathToJsonHierarchy), m_index(m_hierarchy) +Geocoder::Geocoder(string const & pathToJsonHierarchy, unsigned int loadThreadsCount) + : Geocoder{HierarchyReader{pathToJsonHierarchy}.Read(loadThreadsCount), loadThreadsCount} +{ +} + +Geocoder::Geocoder(istream & jsonHierarchy, unsigned int loadThreadsCount) + : Geocoder{HierarchyReader{jsonHierarchy}.Read(loadThreadsCount), loadThreadsCount} +{ +} + +Geocoder::Geocoder(Hierarchy && hierarchy, unsigned int loadThreadsCount) + : m_hierarchy(move(hierarchy)), m_index(m_hierarchy, loadThreadsCount) { } diff --git a/geocoder/geocoder.hpp b/geocoder/geocoder.hpp index a829fbaa0f..27be2587ca 100644 --- a/geocoder/geocoder.hpp +++ b/geocoder/geocoder.hpp @@ -12,6 +12,7 @@ #include #include +#include #include #include #include @@ -118,7 +119,8 @@ public: std::vector m_layers; }; - explicit Geocoder(std::string const & pathToJsonHierarchy); + explicit Geocoder(std::string const & pathToJsonHierarchy, unsigned int loadThreadsCount = 1); + explicit Geocoder(std::istream & jsonHierarchy, unsigned int loadThreadsCount = 1); void ProcessQuery(std::string const & query, std::vector & results) const; @@ -127,6 +129,8 @@ public: Index const & GetIndex() const; private: + explicit Geocoder(Hierarchy && hierarchy, unsigned int loadThreadsCount); + void Go(Context & ctx, Type type) const; void FillBuildingsLayer(Context & ctx, Tokens const & subquery, Layer & curLayer) const; diff --git a/geocoder/geocoder_tests/geocoder_tests.cpp b/geocoder/geocoder_tests/geocoder_tests.cpp index c583598bbe..d8568dcd7c 100644 --- a/geocoder/geocoder_tests/geocoder_tests.cpp +++ b/geocoder/geocoder_tests/geocoder_tests.cpp @@ -1,6 +1,7 @@ #include "testing/testing.hpp" #include "geocoder/geocoder.hpp" +#include "geocoder/hierarchy_reader.hpp" #include "indexer/search_string_utils.hpp" @@ -170,4 +171,33 @@ UNIT_TEST(Geocoder_LocalityBuilding) TestGeocoder(geocoder, "Zelenograd 2", {{building2, 1.0}}); } + +UNIT_TEST(Geocoder_EmptyFileConcurrentRead) +{ + ScopedFile const regionsJsonFile("regions.jsonl", ""); + Geocoder geocoder(regionsJsonFile.GetFullPath(), 8 /* reader threads */); + + TEST_EQUAL(geocoder.GetHierarchy().GetEntries().size(), 0, ()); +} + +UNIT_TEST(Geocoder_BigFileConcurrentRead) +{ + int const kEntryCount = 100000; + + stringstream s; + for (int i = 0; i < kEntryCount; ++i) + { + s << i << " " + << "{" + << R"("type": "Feature",)" + << R"("geometry": {"type": "Point", "coordinates": [0, 0]},)" + << R"("properties": {"name": ")" << i << R"(", "rank": 2, "address": {"country": ")" << i << R"("}})" + << "}\n"; + } + + ScopedFile const regionsJsonFile("regions.jsonl", s.str()); + Geocoder geocoder(regionsJsonFile.GetFullPath(), 8 /* reader threads */); + + TEST_EQUAL(geocoder.GetHierarchy().GetEntries().size(), kEntryCount, ()); +} } // namespace geocoder diff --git a/geocoder/hierarchy.cpp b/geocoder/hierarchy.cpp index 43e334b0bf..d606f3459c 100644 --- a/geocoder/hierarchy.cpp +++ b/geocoder/hierarchy.cpp @@ -9,38 +9,10 @@ #include "base/string_utils.hpp" #include -#include #include using namespace std; -namespace -{ -// Information will be logged for every |kLogBatch| entries. -size_t const kLogBatch = 100000; - -void CheckDuplicateOsmIds(vector const & entries, - geocoder::Hierarchy::ParsingStats & stats) -{ - size_t i = 0; - while (i < entries.size()) - { - size_t j = i + 1; - while (j < entries.size() && entries[i].m_osmId == entries[j].m_osmId) - ++j; - if (j != i + 1) - { - ++stats.m_duplicateOsmIds; - // todo Remove the cast when the hierarchies no longer contain negative keys. - LOG(LDEBUG, - ("Duplicate osm id:", static_cast(entries[i].m_osmId.GetEncodedId()), "(", - entries[i].m_osmId, ")", "occurs as a key in", j - i, "key-value entries.")); - } - i = j; - } -} -} // namespace - namespace geocoder { // Hierarchy::Entry -------------------------------------------------------------------------------- @@ -138,64 +110,14 @@ bool Hierarchy::Entry::IsParentTo(Hierarchy::Entry const & e) const } // Hierarchy --------------------------------------------------------------------------------------- -Hierarchy::Hierarchy(string const & pathToJsonHierarchy) +Hierarchy::Hierarchy(vector && entries, bool sorted) + : m_entries{std::move(entries)} { - ifstream ifs(pathToJsonHierarchy); - string line; - ParsingStats stats; - - LOG(LINFO, ("Reading entries...")); - while (getline(ifs, line)) + if (!sorted) { - if (line.empty()) - continue; - - auto const i = line.find(' '); - int64_t encodedId; - if (i == string::npos || !strings::to_any(line.substr(0, i), encodedId)) - { - LOG(LWARNING, ("Cannot read osm id. Line:", line)); - ++stats.m_badOsmIds; - continue; - } - line = line.substr(i + 1); - - Entry entry; - // todo(@m) We should really write uints as uints. - entry.m_osmId = base::GeoObjectId(static_cast(encodedId)); - - if (!entry.DeserializeFromJSON(line, stats)) - continue; - - if (entry.m_type == Type::Count) - continue; - - ++stats.m_numLoaded; - if (stats.m_numLoaded % kLogBatch == 0) - LOG(LINFO, ("Read", stats.m_numLoaded, "entries")); - - m_entries.emplace_back(move(entry)); + LOG(LINFO, ("Sorting entries...")); + sort(m_entries.begin(), m_entries.end()); } - - if (stats.m_numLoaded % kLogBatch != 0) - LOG(LINFO, ("Read", stats.m_numLoaded, "entries")); - - LOG(LINFO, ("Sorting entries...")); - sort(m_entries.begin(), m_entries.end()); - - CheckDuplicateOsmIds(m_entries, stats); - - LOG(LINFO, ("Finished reading and indexing the hierarchy. Stats:")); - LOG(LINFO, ("Entries loaded:", stats.m_numLoaded)); - LOG(LINFO, ("Corrupted json lines:", stats.m_badJsons)); - LOG(LINFO, ("Unreadable base::GeoObjectIds:", stats.m_badOsmIds)); - LOG(LINFO, ("Duplicate base::GeoObjectIds:", stats.m_duplicateOsmIds)); - LOG(LINFO, ("Entries with duplicate address parts:", stats.m_duplicateAddresses)); - LOG(LINFO, ("Entries without address:", stats.m_emptyAddresses)); - LOG(LINFO, ("Entries without names:", stats.m_emptyNames)); - LOG(LINFO, - ("Entries whose names do not match their most specific addresses:", stats.m_mismatchedNames)); - LOG(LINFO, ("(End of stats.)")); } vector const & Hierarchy::GetEntries() const diff --git a/geocoder/hierarchy.hpp b/geocoder/hierarchy.hpp index bae7d2fc38..8579db75fb 100644 --- a/geocoder/hierarchy.hpp +++ b/geocoder/hierarchy.hpp @@ -75,7 +75,7 @@ public: std::array(Type::Count) + 1> m_address; }; - explicit Hierarchy(std::string const & pathToJsonHierarchy); + explicit Hierarchy(std::vector && entries, bool sorted); std::vector const & GetEntries() const; diff --git a/geocoder/hierarchy_reader.cpp b/geocoder/hierarchy_reader.cpp new file mode 100644 index 0000000000..c59ff8b356 --- /dev/null +++ b/geocoder/hierarchy_reader.cpp @@ -0,0 +1,224 @@ +#include "geocoder/hierarchy_reader.hpp" + +#include "base/logging.hpp" + +#include +#include +#include + +using namespace std; + +namespace geocoder +{ +namespace +{ +// Information will be logged for every |kLogBatch| entries. +size_t const kLogBatch = 100000; + +void operator+=(Hierarchy::ParsingStats & accumulator, Hierarchy::ParsingStats & stats) +{ + struct ValidationStats + { + uint64_t m_numLoaded, m_badJsons, m_badOsmIds, m_duplicateOsmIds, m_duplicateAddresses, + m_emptyAddresses, m_emptyNames, m_mismatchedNames; + }; + static_assert(sizeof(Hierarchy::ParsingStats) == sizeof(ValidationStats), + "Hierarchy::ParsingStats has been modified"); + + accumulator.m_numLoaded += stats.m_numLoaded; + accumulator.m_badJsons += stats.m_badJsons; + accumulator.m_badOsmIds += stats.m_badOsmIds; + accumulator.m_duplicateOsmIds += stats.m_duplicateOsmIds; + accumulator.m_duplicateAddresses += stats.m_duplicateAddresses; + accumulator.m_emptyAddresses += stats.m_emptyAddresses; + accumulator.m_emptyNames += stats.m_emptyNames; + accumulator.m_mismatchedNames += stats.m_mismatchedNames; +} +} // namespace + +HierarchyReader::HierarchyReader(string const & pathToJsonHierarchy) + : m_fileStream{pathToJsonHierarchy}, m_in{m_fileStream} +{ + if (!m_fileStream) + MYTHROW(OpenException, ("Failed to open file", pathToJsonHierarchy)); +} + +HierarchyReader::HierarchyReader(istream & in) + : m_in{in} +{ +} + +Hierarchy HierarchyReader::Read(unsigned int readersCount) +{ + CHECK_GREATER_OR_EQUAL(readersCount, 1, ()); + + LOG(LINFO, ("Reading entries...")); + + vector> taskEntries(readersCount); + vector tasksStats(readersCount); + vector tasks{}; + for (size_t t = 0; t < readersCount; ++t) + tasks.emplace_back(&HierarchyReader::ReadEntryMap, this, ref(taskEntries[t]), ref(tasksStats[t])); + + for (auto & reader : tasks) + reader.join(); + + if (m_totalNumLoaded % kLogBatch != 0) + LOG(LINFO, ("Read", m_totalNumLoaded, "entries")); + + ParsingStats stats{}; + for (auto & readerStats : tasksStats) + stats += readerStats; + + auto entries = MergeEntries(taskEntries); + + CheckDuplicateOsmIds(entries, stats); + + LOG(LINFO, ("Finished reading and indexing the hierarchy. Stats:")); + LOG(LINFO, ("Entries loaded:", stats.m_numLoaded)); + LOG(LINFO, ("Corrupted json lines:", stats.m_badJsons)); + LOG(LINFO, ("Unreadable base::GeoObjectIds:", stats.m_badOsmIds)); + LOG(LINFO, ("Duplicate base::GeoObjectIds:", stats.m_duplicateOsmIds)); + LOG(LINFO, ("Entries with duplicate address parts:", stats.m_duplicateAddresses)); + LOG(LINFO, ("Entries without address:", stats.m_emptyAddresses)); + LOG(LINFO, ("Entries without names:", stats.m_emptyNames)); + LOG(LINFO, + ("Entries whose names do not match their most specific addresses:", stats.m_mismatchedNames)); + LOG(LINFO, ("(End of stats.)")); + + return Hierarchy{move(entries), true}; +} + +vector HierarchyReader::MergeEntries(vector> & entryParts) +{ + auto entries = vector{}; + + size_t size{0}; + for (auto const & map : entryParts) + size += map.size(); + + entries.reserve(size); + + LOG(LINFO, ("Merging entries...")); + + using PartReference = reference_wrapper>; + struct ReferenceGreater + { + bool operator()(PartReference const & l, PartReference const & r) const noexcept + { return l.get() > r.get(); } + }; + + auto partsQueue = priority_queue, ReferenceGreater>( + entryParts.begin(), entryParts.end()); + while (!partsQueue.empty()) + { + auto & minPart = partsQueue.top().get(); + partsQueue.pop(); + + while (!minPart.empty() && (partsQueue.empty() || minPart <= partsQueue.top().get())) + { + entries.emplace_back(move(minPart.begin()->second)); + minPart.erase(minPart.begin()); + } + + if (!minPart.empty()) + partsQueue.push(ref(minPart)); + } + + return entries; +} + +void HierarchyReader::CheckDuplicateOsmIds(vector const & entries, + ParsingStats & stats) +{ + size_t i = 0; + while (i < entries.size()) + { + size_t j = i + 1; + while (j < entries.size() && entries[i].m_osmId == entries[j].m_osmId) + ++j; + if (j != i + 1) + { + ++stats.m_duplicateOsmIds; + // TODO: Remove the cast when the hierarchies no longer contain negative keys. + LOG(LDEBUG, + ("Duplicate osm id:", static_cast(entries[i].m_osmId.GetEncodedId()), "(", + entries[i].m_osmId, ")", "occurs as a key in", j - i, "key-value entries.")); + } + i = j; + } +} + +void HierarchyReader::ReadEntryMap(multimap & entries, ParsingStats & stats) +{ + // Temporary local object for efficient concurent processing (individual cache line for container). + auto localEntries = multimap{}; + + size_t const kLineBufferCapacity = 10000; + vector linesBuffer(kLineBufferCapacity); + size_t bufferSize = 0; + + while (true) + { + bufferSize = 0; + + { + auto && lock = lock_guard(m_mutex); + + for (; bufferSize < kLineBufferCapacity; ++bufferSize) + { + if (!getline(m_in, linesBuffer[bufferSize])) + break; + } + } + + if (!bufferSize) + break; + + DeserializeEntryMap(linesBuffer, bufferSize, localEntries, stats); + } + + entries = move(localEntries); +} + +void HierarchyReader::DeserializeEntryMap(vector const & linesBuffer, size_t const bufferSize, + multimap & entries, ParsingStats & stats) +{ + for (size_t i = 0; i < bufferSize; ++i) + { + auto & line = linesBuffer[i]; + + if (line.empty()) + continue; + + auto const p = line.find(' '); + int64_t encodedId; + if (p == string::npos || !strings::to_any(line.substr(0, p), encodedId)) + { + LOG(LWARNING, ("Cannot read osm id. Line:", line)); + ++stats.m_badOsmIds; + continue; + } + auto json = line.substr(p + 1); + + Entry entry; + // TODO: (@m) We should really write uints as uints. + auto const osmId = base::GeoObjectId(static_cast(encodedId)); + entry.m_osmId = osmId; + + if (!entry.DeserializeFromJSON(json, stats)) + continue; + + if (entry.m_type == Type::Count) + continue; + + ++stats.m_numLoaded; + + auto totalNumLoaded = m_totalNumLoaded.fetch_add(1) + 1; + if (totalNumLoaded % kLogBatch == 0) + LOG(LINFO, ("Read", totalNumLoaded, "entries")); + + entries.emplace(osmId, move(entry)); + } +} +} // namespace geocoder diff --git a/geocoder/hierarchy_reader.hpp b/geocoder/hierarchy_reader.hpp new file mode 100644 index 0000000000..19701659f0 --- /dev/null +++ b/geocoder/hierarchy_reader.hpp @@ -0,0 +1,46 @@ +#pragma once + +#include "geocoder/hierarchy.hpp" + +#include "base/exception.hpp" +#include "base/geo_object_id.hpp" + +#include +#include +#include +#include +#include +#include +#include + +namespace geocoder +{ +class HierarchyReader +{ +public: + using Entry = Hierarchy::Entry; + using ParsingStats = Hierarchy::ParsingStats; + + DECLARE_EXCEPTION(OpenException, RootException); + + explicit HierarchyReader(std::string const & pathToJsonHierarchy); + explicit HierarchyReader(std::istream & jsonHierarchy); + + // Read hierarchy file/stream concurrently in |readersCount| threads. + Hierarchy Read(unsigned int readersCount = 1); + +private: + void ReadEntryMap(std::multimap & entries, ParsingStats & stats); + + void DeserializeEntryMap(std::vector const & linesBuffer, std::size_t const bufferSize, + std::multimap & entries, ParsingStats & stats); + + std::vector MergeEntries(std::vector> & entryParts); + void CheckDuplicateOsmIds(std::vector const & entries, ParsingStats & stats); + + std::ifstream m_fileStream; + std::istream & m_in; + std::mutex m_mutex; + std::atomic m_totalNumLoaded{0}; +}; +} // namespace geocoder diff --git a/geocoder/index.cpp b/geocoder/index.cpp index e6dfcce2aa..622edf34da 100644 --- a/geocoder/index.cpp +++ b/geocoder/index.cpp @@ -8,7 +8,10 @@ #include "base/logging.hpp" #include "base/string_utils.hpp" +#include #include +#include +#include using namespace std; @@ -20,12 +23,15 @@ size_t const kLogBatch = 100000; namespace geocoder { -Index::Index(Hierarchy const & hierarchy) : m_docs(hierarchy.GetEntries()) +Index::Index(Hierarchy const & hierarchy, unsigned int loadThreadsCount) + : m_docs(hierarchy.GetEntries()) { + CHECK_GREATER_OR_EQUAL(loadThreadsCount, 1, ()); + LOG(LINFO, ("Indexing hierarchy entries...")); AddEntries(); LOG(LINFO, ("Indexing houses...")); - AddHouses(); + AddHouses(loadThreadsCount); } Index::Doc const & Index::GetDoc(DocId const id) const @@ -89,42 +95,60 @@ void Index::AddStreet(DocId const & docId, Index::Doc const & doc) } } -void Index::AddHouses() +void Index::AddHouses(unsigned int loadThreadsCount) { - size_t numIndexed = 0; - for (DocId docId = 0; docId < static_cast(m_docs.size()); ++docId) + atomic numIndexed{0}; + mutex mutex; + + vector threads(loadThreadsCount); + CHECK_GREATER(threads.size(), 0, ()); + + for (size_t t = 0; t < threads.size(); ++t) { - auto const & buildingDoc = GetDoc(docId); + threads[t] = thread([&, t, this]() { + size_t const size = m_docs.size() / threads.size(); + size_t docId = t * size; + size_t const docIdEnd = (t + 1 == threads.size() ? m_docs.size() : docId + size); - if (buildingDoc.m_type != Type::Building) - continue; - - auto const & street = buildingDoc.m_address[static_cast(Type::Street)]; - auto const & locality = buildingDoc.m_address[static_cast(Type::Locality)]; - - Tokens const * relationName = nullptr; - - if (!street.empty()) - relationName = &street; - else if (!locality.empty()) - relationName = &locality; - - if (!relationName) - continue; - - ForEachDocId(*relationName, [&](DocId const & candidate) { - auto const & candidateDoc = GetDoc(candidate); - if (candidateDoc.IsParentTo(buildingDoc)) + for (; docId < docIdEnd; ++docId) { - m_relatedBuildings[candidate].emplace_back(docId); + auto const & buildingDoc = GetDoc(docId); - ++numIndexed; - if (numIndexed % kLogBatch == 0) - LOG(LINFO, ("Indexed", numIndexed, "houses")); + if (buildingDoc.m_type != Type::Building) + continue; + + auto const & street = buildingDoc.m_address[static_cast(Type::Street)]; + auto const & locality = buildingDoc.m_address[static_cast(Type::Locality)]; + + Tokens const * relationName = nullptr; + + if (!street.empty()) + relationName = &street; + else if (!locality.empty()) + relationName = &locality; + + if (!relationName) + continue; + + ForEachDocId(*relationName, [&](DocId const & candidate) { + auto const & candidateDoc = GetDoc(candidate); + if (candidateDoc.IsParentTo(buildingDoc)) + { + auto && lock = lock_guard(mutex); + m_relatedBuildings[candidate].emplace_back(docId); + } + }); + + auto processedCount = numIndexed.fetch_add(1) + 1; + if (processedCount % kLogBatch == 0) + LOG(LINFO, ("Indexed", processedCount, "houses")); } }); } + for (auto & t : threads) + t.join(); + if (numIndexed % kLogBatch != 0) LOG(LINFO, ("Indexed", numIndexed, "houses")); } diff --git a/geocoder/index.hpp b/geocoder/index.hpp index 6f23fe39af..34a5941384 100644 --- a/geocoder/index.hpp +++ b/geocoder/index.hpp @@ -6,6 +6,7 @@ #include #include +#include #include #include @@ -20,7 +21,7 @@ public: // that the index was constructed from. using DocId = std::vector::size_type; - explicit Index(Hierarchy const & hierarchy); + explicit Index(Hierarchy const & hierarchy, unsigned int loadThreadsCount = 1); Doc const & GetDoc(DocId const id) const; @@ -66,7 +67,7 @@ private: void AddStreet(DocId const & docId, Doc const & e); // Fills the |m_relatedBuildings| field. - void AddHouses(); + void AddHouses(unsigned int loadThreadsCount); std::vector const & m_docs;