Merge branch 'geocoder.hierarchy-concurrent-read' of https://github.com/cc-engineering/omim into cc-engineering-geocoder.hierarchy-concurrent-read

This commit is contained in:
Anatoly Serdtcev 2019-02-08 17:15:55 +03:00
commit eba53b3adc
10 changed files with 384 additions and 118 deletions

View file

@ -8,6 +8,8 @@ set(
geocoder.hpp
hierarchy.cpp
hierarchy.hpp
hierarchy_reader.cpp
hierarchy_reader.hpp
index.cpp
index.hpp
result.cpp

View file

@ -1,5 +1,7 @@
#include "geocoder/geocoder.hpp"
#include "geocoder/hierarchy_reader.hpp"
#include "search/house_numbers_matcher.hpp"
#include "indexer/search_string_utils.hpp"
@ -13,6 +15,7 @@
#include <algorithm>
#include <set>
#include <thread>
#include <utility>
using namespace std;
@ -193,8 +196,18 @@ vector<Geocoder::Layer> & Geocoder::Context::GetLayers() { return m_layers; }
vector<Geocoder::Layer> const & Geocoder::Context::GetLayers() const { return m_layers; }
// Geocoder ----------------------------------------------------------------------------------------
Geocoder::Geocoder(string const & pathToJsonHierarchy)
: m_hierarchy(pathToJsonHierarchy), m_index(m_hierarchy)
// Loads the hierarchy from the jsonl file at |pathToJsonHierarchy|, using
// |loadThreadsCount| threads for both reading and house indexing.
Geocoder::Geocoder(string const & pathToJsonHierarchy, unsigned int loadThreadsCount)
  : Geocoder{HierarchyReader{pathToJsonHierarchy}.Read(loadThreadsCount), loadThreadsCount}
{
}
// Loads the hierarchy from a caller-owned stream; |jsonHierarchy| is fully
// consumed during construction. Uses |loadThreadsCount| threads.
Geocoder::Geocoder(istream & jsonHierarchy, unsigned int loadThreadsCount)
  : Geocoder{HierarchyReader{jsonHierarchy}.Read(loadThreadsCount), loadThreadsCount}
{
}
// Takes ownership of an already-built |hierarchy| and builds the index over
// it. Delegated to by the public constructors.
Geocoder::Geocoder(Hierarchy && hierarchy, unsigned int loadThreadsCount)
  : m_hierarchy(move(hierarchy)), m_index(m_hierarchy, loadThreadsCount)
{
}

View file

@ -12,6 +12,7 @@
#include <cstddef>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
@ -118,7 +119,8 @@ public:
std::vector<Layer> m_layers;
};
explicit Geocoder(std::string const & pathToJsonHierarchy);
explicit Geocoder(std::string const & pathToJsonHierarchy, unsigned int loadThreadsCount = 1);
explicit Geocoder(std::istream & jsonHierarchy, unsigned int loadThreadsCount = 1);
void ProcessQuery(std::string const & query, std::vector<Result> & results) const;
@ -127,6 +129,8 @@ public:
Index const & GetIndex() const;
private:
explicit Geocoder(Hierarchy && hierarchy, unsigned int loadThreadsCount);
void Go(Context & ctx, Type type) const;
void FillBuildingsLayer(Context & ctx, Tokens const & subquery, Layer & curLayer) const;

View file

@ -1,6 +1,7 @@
#include "testing/testing.hpp"
#include "geocoder/geocoder.hpp"
#include "geocoder/hierarchy_reader.hpp"
#include "indexer/search_string_utils.hpp"
@ -170,4 +171,33 @@ UNIT_TEST(Geocoder_LocalityBuilding)
TestGeocoder(geocoder, "Zelenograd 2", {{building2, 1.0}});
}
// An empty hierarchy file must yield an empty hierarchy even when it is
// read by several threads at once.
UNIT_TEST(Geocoder_EmptyFileConcurrentRead)
{
  ScopedFile const regionsJsonFile("regions.jsonl", "");
  Geocoder geocoder(regionsJsonFile.GetFullPath(), 8 /* reader threads */);

  TEST_EQUAL(geocoder.GetHierarchy().GetEntries().size(), 0, ());
}
// Concurrent reading of a large hierarchy file must neither lose nor
// duplicate entries: every generated line ends up in the hierarchy.
UNIT_TEST(Geocoder_BigFileConcurrentRead)
{
  int const kEntryCount = 100000;

  // One jsonl line per entry: "<id> <feature json>".
  stringstream jsonLines;
  for (int id = 0; id < kEntryCount; ++id)
  {
    jsonLines << id << " "
              << "{"
              << R"("type": "Feature",)"
              << R"("geometry": {"type": "Point", "coordinates": [0, 0]},)"
              << R"("properties": {"name": ")" << id << R"(", "rank": 2, "address": {"country": ")"
              << id << R"("}})"
              << "}\n";
  }

  ScopedFile const regionsJsonFile("regions.jsonl", jsonLines.str());
  Geocoder geocoder(regionsJsonFile.GetFullPath(), 8 /* reader threads */);

  TEST_EQUAL(geocoder.GetHierarchy().GetEntries().size(), kEntryCount, ());
}
} // namespace geocoder

View file

@ -9,38 +9,10 @@
#include "base/string_utils.hpp"
#include <algorithm>
#include <fstream>
#include <utility>
using namespace std;
namespace
{
// Information will be logged for every |kLogBatch| entries.
size_t const kLogBatch = 100000;
void CheckDuplicateOsmIds(vector<geocoder::Hierarchy::Entry> const & entries,
geocoder::Hierarchy::ParsingStats & stats)
{
size_t i = 0;
while (i < entries.size())
{
size_t j = i + 1;
while (j < entries.size() && entries[i].m_osmId == entries[j].m_osmId)
++j;
if (j != i + 1)
{
++stats.m_duplicateOsmIds;
// todo Remove the cast when the hierarchies no longer contain negative keys.
LOG(LDEBUG,
("Duplicate osm id:", static_cast<int64_t>(entries[i].m_osmId.GetEncodedId()), "(",
entries[i].m_osmId, ")", "occurs as a key in", j - i, "key-value entries."));
}
i = j;
}
}
} // namespace
namespace geocoder
{
// Hierarchy::Entry --------------------------------------------------------------------------------
@ -138,64 +110,14 @@ bool Hierarchy::Entry::IsParentTo(Hierarchy::Entry const & e) const
}
// Hierarchy ---------------------------------------------------------------------------------------
Hierarchy::Hierarchy(string const & pathToJsonHierarchy)
Hierarchy::Hierarchy(vector<Entry> && entries, bool sorted)
: m_entries{std::move(entries)}
{
ifstream ifs(pathToJsonHierarchy);
string line;
ParsingStats stats;
LOG(LINFO, ("Reading entries..."));
while (getline(ifs, line))
if (!sorted)
{
if (line.empty())
continue;
auto const i = line.find(' ');
int64_t encodedId;
if (i == string::npos || !strings::to_any(line.substr(0, i), encodedId))
{
LOG(LWARNING, ("Cannot read osm id. Line:", line));
++stats.m_badOsmIds;
continue;
}
line = line.substr(i + 1);
Entry entry;
// todo(@m) We should really write uints as uints.
entry.m_osmId = base::GeoObjectId(static_cast<uint64_t>(encodedId));
if (!entry.DeserializeFromJSON(line, stats))
continue;
if (entry.m_type == Type::Count)
continue;
++stats.m_numLoaded;
if (stats.m_numLoaded % kLogBatch == 0)
LOG(LINFO, ("Read", stats.m_numLoaded, "entries"));
m_entries.emplace_back(move(entry));
LOG(LINFO, ("Sorting entries..."));
sort(m_entries.begin(), m_entries.end());
}
if (stats.m_numLoaded % kLogBatch != 0)
LOG(LINFO, ("Read", stats.m_numLoaded, "entries"));
LOG(LINFO, ("Sorting entries..."));
sort(m_entries.begin(), m_entries.end());
CheckDuplicateOsmIds(m_entries, stats);
LOG(LINFO, ("Finished reading and indexing the hierarchy. Stats:"));
LOG(LINFO, ("Entries loaded:", stats.m_numLoaded));
LOG(LINFO, ("Corrupted json lines:", stats.m_badJsons));
LOG(LINFO, ("Unreadable base::GeoObjectIds:", stats.m_badOsmIds));
LOG(LINFO, ("Duplicate base::GeoObjectIds:", stats.m_duplicateOsmIds));
LOG(LINFO, ("Entries with duplicate address parts:", stats.m_duplicateAddresses));
LOG(LINFO, ("Entries without address:", stats.m_emptyAddresses));
LOG(LINFO, ("Entries without names:", stats.m_emptyNames));
LOG(LINFO,
("Entries whose names do not match their most specific addresses:", stats.m_mismatchedNames));
LOG(LINFO, ("(End of stats.)"));
}
vector<Hierarchy::Entry> const & Hierarchy::GetEntries() const

View file

@ -75,7 +75,7 @@ public:
std::array<Tokens, static_cast<size_t>(Type::Count) + 1> m_address;
};
explicit Hierarchy(std::string const & pathToJsonHierarchy);
explicit Hierarchy(std::vector<Entry> && entries, bool sorted);
std::vector<Entry> const & GetEntries() const;

View file

@ -0,0 +1,224 @@
#include "geocoder/hierarchy_reader.hpp"
#include "base/logging.hpp"
#include <algorithm>
#include <queue>
#include <thread>
using namespace std;
namespace geocoder
{
namespace
{
// Information will be logged for every |kLogBatch| entries.
size_t const kLogBatch = 100000;
// Accumulates per-reader-thread parsing statistics into |accumulator|.
// |stats| is only read, so it is taken by const reference.
// The static_assert pins the layout of Hierarchy::ParsingStats: adding a
// counter there breaks the build here, forcing this operator to be updated
// so the new counter is not silently dropped from the totals.
void operator+=(Hierarchy::ParsingStats & accumulator, Hierarchy::ParsingStats const & stats)
{
  struct ValidationStats
  {
    uint64_t m_numLoaded, m_badJsons, m_badOsmIds, m_duplicateOsmIds, m_duplicateAddresses,
        m_emptyAddresses, m_emptyNames, m_mismatchedNames;
  };
  static_assert(sizeof(Hierarchy::ParsingStats) == sizeof(ValidationStats),
                "Hierarchy::ParsingStats has been modified");

  accumulator.m_numLoaded += stats.m_numLoaded;
  accumulator.m_badJsons += stats.m_badJsons;
  accumulator.m_badOsmIds += stats.m_badOsmIds;
  accumulator.m_duplicateOsmIds += stats.m_duplicateOsmIds;
  accumulator.m_duplicateAddresses += stats.m_duplicateAddresses;
  accumulator.m_emptyAddresses += stats.m_emptyAddresses;
  accumulator.m_emptyNames += stats.m_emptyNames;
  accumulator.m_mismatchedNames += stats.m_mismatchedNames;
}
} // namespace
// Opens |pathToJsonHierarchy| for reading; the stream is owned by this
// reader (m_in aliases m_fileStream).
// Throws OpenException if the file cannot be opened.
HierarchyReader::HierarchyReader(string const & pathToJsonHierarchy)
  : m_fileStream{pathToJsonHierarchy}, m_in{m_fileStream}
{
  if (!m_fileStream)
    MYTHROW(OpenException, ("Failed to open file", pathToJsonHierarchy));
}
// Reads from a caller-owned stream; |in| must outlive this reader.
// m_fileStream stays default-constructed (unused) in this case.
HierarchyReader::HierarchyReader(istream & in)
  : m_in{in}
{
}
// Reads the whole hierarchy stream with |readersCount| worker threads,
// merges the per-thread results and returns the sorted Hierarchy.
// Each worker gets its own (entries, stats) slot, so the only shared state
// during reading is the input stream, which ReadEntryMap guards itself.
Hierarchy HierarchyReader::Read(unsigned int readersCount)
{
  CHECK_GREATER_OR_EQUAL(readersCount, 1, ());

  LOG(LINFO, ("Reading entries..."));

  vector<multimap<base::GeoObjectId, Entry>> taskEntries(readersCount);
  vector<ParsingStats> tasksStats(readersCount);
  vector<thread> tasks;
  tasks.reserve(readersCount);  // Final size is known; avoid reallocations.
  for (size_t t = 0; t < readersCount; ++t)
    tasks.emplace_back(&HierarchyReader::ReadEntryMap, this, ref(taskEntries[t]), ref(tasksStats[t]));

  for (auto & reader : tasks)
    reader.join();

  // Flush the last (partial) log batch; full batches were logged by workers.
  if (m_totalNumLoaded % kLogBatch != 0)
    LOG(LINFO, ("Read", m_totalNumLoaded, "entries"));

  ParsingStats stats{};
  for (auto & readerStats : tasksStats)
    stats += readerStats;

  auto entries = MergeEntries(taskEntries);

  CheckDuplicateOsmIds(entries, stats);

  LOG(LINFO, ("Finished reading and indexing the hierarchy. Stats:"));
  LOG(LINFO, ("Entries loaded:", stats.m_numLoaded));
  LOG(LINFO, ("Corrupted json lines:", stats.m_badJsons));
  LOG(LINFO, ("Unreadable base::GeoObjectIds:", stats.m_badOsmIds));
  LOG(LINFO, ("Duplicate base::GeoObjectIds:", stats.m_duplicateOsmIds));
  LOG(LINFO, ("Entries with duplicate address parts:", stats.m_duplicateAddresses));
  LOG(LINFO, ("Entries without address:", stats.m_emptyAddresses));
  LOG(LINFO, ("Entries without names:", stats.m_emptyNames));
  LOG(LINFO,
      ("Entries whose names do not match their most specific addresses:", stats.m_mismatchedNames));
  LOG(LINFO, ("(End of stats.)"));

  // Entries come out of MergeEntries already ordered, hence sorted == true.
  return Hierarchy{move(entries), true};
}
// K-way merge of the per-thread sorted multimaps in |entryParts| into a
// single vector ordered by osm id. The maps are consumed (drained) in the
// process. Note: the heap compares whole multimaps (standard lexicographic
// container comparison), so the top of the heap is the part whose smallest
// remaining element is minimal.
vector<Hierarchy::Entry> HierarchyReader::MergeEntries(vector<multimap<base::GeoObjectId, Entry>> & entryParts)
{
  auto entries = vector<Entry>{};

  // Reserve the exact total up front to avoid reallocations during the merge.
  size_t size{0};
  for (auto const & map : entryParts)
    size += map.size();

  entries.reserve(size);

  LOG(LINFO, ("Merging entries..."));

  using PartReference = reference_wrapper<multimap<base::GeoObjectId, Entry>>;
  // priority_queue is a max-heap; "greater" turns it into a min-heap.
  struct ReferenceGreater
  {
    bool operator()(PartReference const & l, PartReference const & r) const noexcept
    { return l.get() > r.get(); }
  };

  auto partsQueue = priority_queue<PartReference, vector<PartReference>, ReferenceGreater>(
      entryParts.begin(), entryParts.end());
  while (!partsQueue.empty())
  {
    // Take the currently smallest part off the heap so draining it does not
    // invalidate the heap ordering.
    auto & minPart = partsQueue.top().get();
    partsQueue.pop();

    // Drain this part while it stays <= every other part, then re-insert the
    // remainder (if any) for another round.
    while (!minPart.empty() && (partsQueue.empty() || minPart <= partsQueue.top().get()))
    {
      entries.emplace_back(move(minPart.begin()->second));
      minPart.erase(minPart.begin());
    }

    if (!minPart.empty())
      partsQueue.push(ref(minPart));
  }

  return entries;
}
// Scans |entries| for runs of consecutive elements sharing one osm id and
// records each such run in |stats| (plus a debug log line). Only adjacent
// duplicates are detected, i.e. the input is expected to be ordered by osm id.
void HierarchyReader::CheckDuplicateOsmIds(vector<geocoder::Hierarchy::Entry> const & entries,
                                           ParsingStats & stats)
{
  size_t runStart = 0;
  while (runStart < entries.size())
  {
    // Advance |runEnd| past every entry equal to entries[runStart].
    size_t runEnd = runStart + 1;
    while (runEnd < entries.size() && entries[runStart].m_osmId == entries[runEnd].m_osmId)
      ++runEnd;

    if (runEnd - runStart > 1)
    {
      ++stats.m_duplicateOsmIds;
      // TODO: Remove the cast when the hierarchies no longer contain negative keys.
      LOG(LDEBUG,
          ("Duplicate osm id:", static_cast<int64_t>(entries[runStart].m_osmId.GetEncodedId()), "(",
           entries[runStart].m_osmId, ")", "occurs as a key in", runEnd - runStart, "key-value entries."));
    }

    runStart = runEnd;
  }
}
// Worker-thread loop: repeatedly grabs a batch of up to |kLineBufferCapacity|
// lines from the shared input stream (serialized by |m_mutex|) and parses the
// batch outside the lock. Results are published to |entries|/|stats| only
// once, at the end.
void HierarchyReader::ReadEntryMap(multimap<base::GeoObjectId, Entry> & entries, ParsingStats & stats)
{
  // Temporary local object for efficient concurrent processing (individual cache line for container).
  auto localEntries = multimap<base::GeoObjectId, Entry>{};

  size_t const kLineBufferCapacity = 10000;
  vector<string> linesBuffer(kLineBufferCapacity);
  size_t bufferSize = 0;

  while (true)
  {
    bufferSize = 0;

    {
      // Only the getline calls hold the lock; parsing runs concurrently.
      auto && lock = lock_guard<mutex>(m_mutex);

      for (; bufferSize < kLineBufferCapacity; ++bufferSize)
      {
        if (!getline(m_in, linesBuffer[bufferSize]))
          break;
      }
    }

    if (!bufferSize)  // Stream exhausted: nothing was read this round.
      break;

    DeserializeEntryMap(linesBuffer, bufferSize, localEntries, stats);
  }

  entries = move(localEntries);
}
// Parses the first |bufferSize| lines of |linesBuffer| into |entries|.
// Each line has the form "<encoded osm id> <json feature>"; lines that fail
// to parse are counted in |stats| and skipped.
void HierarchyReader::DeserializeEntryMap(vector<string> const & linesBuffer, size_t const bufferSize,
    multimap<base::GeoObjectId, Entry> & entries, ParsingStats & stats)
{
  for (size_t lineIndex = 0; lineIndex < bufferSize; ++lineIndex)
  {
    auto const & line = linesBuffer[lineIndex];
    if (line.empty())
      continue;

    auto const spacePos = line.find(' ');

    int64_t encodedId;
    bool const idParsed =
        spacePos != string::npos && strings::to_any(line.substr(0, spacePos), encodedId);
    if (!idParsed)
    {
      LOG(LWARNING, ("Cannot read osm id. Line:", line));
      ++stats.m_badOsmIds;
      continue;
    }

    auto jsonPart = line.substr(spacePos + 1);

    // TODO: (@m) We should really write uints as uints.
    auto const objectId = base::GeoObjectId(static_cast<uint64_t>(encodedId));

    Entry entry;
    entry.m_osmId = objectId;
    if (!entry.DeserializeFromJSON(jsonPart, stats))
      continue;
    if (entry.m_type == Type::Count)
      continue;

    ++stats.m_numLoaded;

    // |m_totalNumLoaded| is shared across reader threads; log each kLogBatch.
    auto const readSoFar = m_totalNumLoaded.fetch_add(1) + 1;
    if (readSoFar % kLogBatch == 0)
      LOG(LINFO, ("Read", readSoFar, "entries"));

    entries.emplace(objectId, move(entry));
  }
}
} // namespace geocoder

View file

@ -0,0 +1,46 @@
#pragma once
#include "geocoder/hierarchy.hpp"
#include "base/exception.hpp"
#include "base/geo_object_id.hpp"
#include <atomic>
#include <fstream>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
namespace geocoder
{
// Reads a geocoder hierarchy from a line-per-entry json file or stream,
// optionally with several threads sharing one input stream.
class HierarchyReader
{
public:
  using Entry = Hierarchy::Entry;
  using ParsingStats = Hierarchy::ParsingStats;

  DECLARE_EXCEPTION(OpenException, RootException);

  // Throws OpenException if |pathToJsonHierarchy| cannot be opened.
  explicit HierarchyReader(std::string const & pathToJsonHierarchy);
  // |jsonHierarchy| is caller-owned and must outlive this reader.
  explicit HierarchyReader(std::istream & jsonHierarchy);

  // Read hierarchy file/stream concurrently in |readersCount| threads.
  Hierarchy Read(unsigned int readersCount = 1);

private:
  // Worker-thread entry point: fills |entries|/|stats| from the shared stream.
  void ReadEntryMap(std::multimap<base::GeoObjectId, Entry> & entries, ParsingStats & stats);

  // Parses the first |bufferSize| lines of |linesBuffer| into |entries|.
  void DeserializeEntryMap(std::vector<std::string> const & linesBuffer, std::size_t const bufferSize,
      std::multimap<base::GeoObjectId, Entry> & entries, ParsingStats & stats);

  // Merges the per-thread sorted maps into one vector ordered by osm id.
  std::vector<Entry> MergeEntries(std::vector<std::multimap<base::GeoObjectId, Entry>> & entryParts);

  void CheckDuplicateOsmIds(std::vector<Entry> const & entries, ParsingStats & stats);

  std::ifstream m_fileStream;  // Owned stream for the path-based ctor; unused otherwise.
  std::istream & m_in;         // Stream actually read; may alias m_fileStream.
  std::mutex m_mutex;          // Serializes reads from m_in across worker threads.
  std::atomic<std::uint64_t> m_totalNumLoaded{0};  // Entries loaded by all threads so far.
};
} // namespace geocoder

View file

@ -8,7 +8,10 @@
#include "base/logging.hpp"
#include "base/string_utils.hpp"
#include <atomic>
#include <cstddef>
#include <mutex>
#include <thread>
using namespace std;
@ -20,12 +23,15 @@ size_t const kLogBatch = 100000;
namespace geocoder
{
Index::Index(Hierarchy const & hierarchy) : m_docs(hierarchy.GetEntries())
// Builds the index over |hierarchy|'s entries; house indexing runs on
// |loadThreadsCount| threads. |hierarchy| must outlive this Index, since
// m_docs keeps a reference to its entries.
Index::Index(Hierarchy const & hierarchy, unsigned int loadThreadsCount)
  : m_docs(hierarchy.GetEntries())
{
  CHECK_GREATER_OR_EQUAL(loadThreadsCount, 1, ());

  LOG(LINFO, ("Indexing hierarchy entries..."));
  AddEntries();
  LOG(LINFO, ("Indexing houses..."));
  AddHouses(loadThreadsCount);
}
Index::Doc const & Index::GetDoc(DocId const id) const
@ -89,42 +95,60 @@ void Index::AddStreet(DocId const & docId, Index::Doc const & doc)
}
}
void Index::AddHouses()
void Index::AddHouses(unsigned int loadThreadsCount)
{
size_t numIndexed = 0;
for (DocId docId = 0; docId < static_cast<DocId>(m_docs.size()); ++docId)
atomic<size_t> numIndexed{0};
mutex mutex;
vector<thread> threads(loadThreadsCount);
CHECK_GREATER(threads.size(), 0, ());
for (size_t t = 0; t < threads.size(); ++t)
{
auto const & buildingDoc = GetDoc(docId);
threads[t] = thread([&, t, this]() {
size_t const size = m_docs.size() / threads.size();
size_t docId = t * size;
size_t const docIdEnd = (t + 1 == threads.size() ? m_docs.size() : docId + size);
if (buildingDoc.m_type != Type::Building)
continue;
auto const & street = buildingDoc.m_address[static_cast<size_t>(Type::Street)];
auto const & locality = buildingDoc.m_address[static_cast<size_t>(Type::Locality)];
Tokens const * relationName = nullptr;
if (!street.empty())
relationName = &street;
else if (!locality.empty())
relationName = &locality;
if (!relationName)
continue;
ForEachDocId(*relationName, [&](DocId const & candidate) {
auto const & candidateDoc = GetDoc(candidate);
if (candidateDoc.IsParentTo(buildingDoc))
for (; docId < docIdEnd; ++docId)
{
m_relatedBuildings[candidate].emplace_back(docId);
auto const & buildingDoc = GetDoc(docId);
++numIndexed;
if (numIndexed % kLogBatch == 0)
LOG(LINFO, ("Indexed", numIndexed, "houses"));
if (buildingDoc.m_type != Type::Building)
continue;
auto const & street = buildingDoc.m_address[static_cast<size_t>(Type::Street)];
auto const & locality = buildingDoc.m_address[static_cast<size_t>(Type::Locality)];
Tokens const * relationName = nullptr;
if (!street.empty())
relationName = &street;
else if (!locality.empty())
relationName = &locality;
if (!relationName)
continue;
ForEachDocId(*relationName, [&](DocId const & candidate) {
auto const & candidateDoc = GetDoc(candidate);
if (candidateDoc.IsParentTo(buildingDoc))
{
auto && lock = lock_guard<std::mutex>(mutex);
m_relatedBuildings[candidate].emplace_back(docId);
}
});
auto processedCount = numIndexed.fetch_add(1) + 1;
if (processedCount % kLogBatch == 0)
LOG(LINFO, ("Indexed", processedCount, "houses"));
}
});
}
for (auto & t : threads)
t.join();
if (numIndexed % kLogBatch != 0)
LOG(LINFO, ("Indexed", numIndexed, "houses"));
}

View file

@ -6,6 +6,7 @@
#include <cstdint>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
@ -20,7 +21,7 @@ public:
// that the index was constructed from.
using DocId = std::vector<Doc>::size_type;
explicit Index(Hierarchy const & hierarchy);
explicit Index(Hierarchy const & hierarchy, unsigned int loadThreadsCount = 1);
Doc const & GetDoc(DocId const id) const;
@ -66,7 +67,7 @@ private:
void AddStreet(DocId const & docId, Doc const & e);
// Fills the |m_relatedBuildings| field.
void AddHouses();
void AddHouses(unsigned int loadThreadsCount);
std::vector<Doc> const & m_docs;