[geocoder] Refact HierchyReader

This commit is contained in:
Anatoly Serdtcev 2019-01-28 15:16:44 +03:00
parent 6f6c7a4d25
commit 4f07aadd8d
7 changed files with 119 additions and 84 deletions

View file

@ -1,9 +1,11 @@
#include "geocoder/geocoder.hpp"
#include "search/house_numbers_matcher.hpp"
#include "geocoder/hierarchy_reader.hpp"
#include "indexer/search_string_utils.hpp"
#include "search/house_numbers_matcher.hpp"
#include "base/assert.hpp"
#include "base/logging.hpp"
#include "base/scope_guard.hpp"
@ -193,8 +195,13 @@ vector<Geocoder::Layer> & Geocoder::Context::GetLayers() { return m_layers; }
vector<Geocoder::Layer> const & Geocoder::Context::GetLayers() const { return m_layers; }
// Geocoder ----------------------------------------------------------------------------------------
Geocoder::Geocoder(string const & pathToJsonHierarchy, size_t hierarchyReadersCount)
: m_hierarchy(pathToJsonHierarchy, hierarchyReadersCount), m_index(m_hierarchy)
Geocoder::Geocoder(string const & pathToJsonHierarchy)
: Geocoder(HierarchyReader(pathToJsonHierarchy).Read())
{
}
Geocoder::Geocoder(Hierarchy && hierarchy)
: m_hierarchy(move(hierarchy)), m_index(m_hierarchy)
{
}

View file

@ -118,7 +118,8 @@ public:
std::vector<Layer> m_layers;
};
explicit Geocoder(std::string const & pathToJsonHierarchy, std::size_t hierarchyReadersCount = 4);
explicit Geocoder(std::string const & pathToJsonHierarchy);
explicit Geocoder(Hierarchy && hierarchy);
void ProcessQuery(std::string const & query, std::vector<Result> & results) const;

View file

@ -1,6 +1,7 @@
#include "testing/testing.hpp"
#include "geocoder/geocoder.hpp"
#include "geocoder/hierarchy_reader.hpp"
#include "indexer/search_string_utils.hpp"
@ -155,7 +156,8 @@ UNIT_TEST(Geocoder_MismatchedLocality)
UNIT_TEST(Geocoder_EmptyFileConcurrentRead)
{
ScopedFile const regionsJsonFile("regions.jsonl", "");
Geocoder geocoder(regionsJsonFile.GetFullPath(), 8 /* reader threads */);
HierarchyReader reader{regionsJsonFile.GetFullPath()};
Geocoder geocoder(reader.Read(8 /* reader threads */));
TEST_EQUAL(geocoder.GetHierarchy().GetEntries().size(), 0, ());
}
@ -176,7 +178,8 @@ UNIT_TEST(Geocoder_BigFileConcurrentRead)
}
ScopedFile const regionsJsonFile("regions.jsonl", s.str());
Geocoder geocoder(regionsJsonFile.GetFullPath(), 8 /* reader threads */);
HierarchyReader reader{regionsJsonFile.GetFullPath()};
Geocoder geocoder(reader.Read(8 /* reader threads */));
TEST_EQUAL(geocoder.GetHierarchy().GetEntries().size(), kEntryCount, ());
}

View file

@ -1,7 +1,5 @@
#include "geocoder/hierarchy.hpp"
#include "geocoder/hierarchy_reader.hpp"
#include "indexer/search_string_utils.hpp"
#include "base/exception.hpp"
@ -15,30 +13,6 @@
using namespace std;
namespace
{
void CheckDuplicateOsmIds(vector<geocoder::Hierarchy::Entry> const & entries,
geocoder::Hierarchy::ParsingStats & stats)
{
size_t i = 0;
while (i < entries.size())
{
size_t j = i + 1;
while (j < entries.size() && entries[i].m_osmId == entries[j].m_osmId)
++j;
if (j != i + 1)
{
++stats.m_duplicateOsmIds;
// todo Remove the cast when the hierarchies no longer contain negative keys.
LOG(LDEBUG,
("Duplicate osm id:", static_cast<int64_t>(entries[i].m_osmId.GetEncodedId()), "(",
entries[i].m_osmId, ")", "occurs as a key in", j - i, "key-value entries."));
}
i = j;
}
}
} // namespace
namespace geocoder
{
// Hierarchy::Entry --------------------------------------------------------------------------------
@ -136,26 +110,14 @@ bool Hierarchy::Entry::IsParentTo(Hierarchy::Entry const & e) const
}
// Hierarchy ---------------------------------------------------------------------------------------
Hierarchy::Hierarchy(string const & pathToJsonHierarchy, size_t readersCount)
Hierarchy::Hierarchy(vector<Entry> && entries, bool sorted)
: m_entries{std::move(entries)}
{
ParsingStats stats;
HierarchyReader reader{pathToJsonHierarchy};
m_entries = reader.ReadEntries(readersCount, stats);
CheckDuplicateOsmIds(m_entries, stats);
LOG(LINFO, ("Finished reading and indexing the hierarchy. Stats:"));
LOG(LINFO, ("Entries loaded:", stats.m_numLoaded));
LOG(LINFO, ("Corrupted json lines:", stats.m_badJsons));
LOG(LINFO, ("Unreadable base::GeoObjectIds:", stats.m_badOsmIds));
LOG(LINFO, ("Duplicate base::GeoObjectIds:", stats.m_duplicateOsmIds));
LOG(LINFO, ("Entries with duplicate address parts:", stats.m_duplicateAddresses));
LOG(LINFO, ("Entries without address:", stats.m_emptyAddresses));
LOG(LINFO, ("Entries without names:", stats.m_emptyNames));
LOG(LINFO,
("Entries whose names do not match their most specific addresses:", stats.m_mismatchedNames));
LOG(LINFO, ("(End of stats.)"));
if (!sorted)
{
LOG(LINFO, ("Sorting entries..."));
sort(m_entries.begin(), m_entries.end());
}
}
vector<Hierarchy::Entry> const & Hierarchy::GetEntries() const

View file

@ -5,7 +5,6 @@
#include "base/geo_object_id.hpp"
#include <array>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <string>
@ -21,31 +20,31 @@ public:
struct ParsingStats
{
// Number of entries that the hierarchy was constructed from.
std::atomic<uint64_t> m_numLoaded{0};
uint64_t m_numLoaded{0};
// Number of corrupted json lines.
std::atomic<uint64_t> m_badJsons{0};
uint64_t m_badJsons{0};
// Number of entries with unreadable base::GeoObjectIds.
std::atomic<uint64_t> m_badOsmIds{0};
uint64_t m_badOsmIds{0};
// Number of base::GeoObjectsIds that occur as keys in at least two entries.
std::atomic<uint64_t> m_duplicateOsmIds{0};
uint64_t m_duplicateOsmIds{0};
// Number of entries with duplicate subfields in the address field.
std::atomic<uint64_t> m_duplicateAddresses{0};
uint64_t m_duplicateAddresses{0};
// Number of entries whose address field either does
// not exist or consists of empty lines.
std::atomic<uint64_t> m_emptyAddresses{0};
uint64_t m_emptyAddresses{0};
// Number of entries without the name field or with an empty one.
std::atomic<uint64_t> m_emptyNames{0};
uint64_t m_emptyNames{0};
// Number of entries whose names do not match the most
// specific parts of their addresses.
// This is expected from POIs but not from regions or streets.
std::atomic<uint64_t> m_mismatchedNames{0};
uint64_t m_mismatchedNames{0};
};
// A single entry in the hierarchy directed acyclic graph.
@ -76,7 +75,7 @@ public:
std::array<Tokens, static_cast<size_t>(Type::Count) + 1> m_address;
};
explicit Hierarchy(std::string const & pathToJsonHierarchy, std::size_t readersCount = 4);
explicit Hierarchy(std::vector<Entry> && entries, bool sorted);
std::vector<Entry> const & GetEntries() const;

View file

@ -1,10 +1,6 @@
#include "geocoder/hierarchy_reader.hpp"
#include "base/logging.hpp"
#include "base/string_utils.hpp"
#include <boost/iostreams/device/file.hpp>
#include <boost/iostreams/filter/gzip.hpp>
#include <queue>
#include <thread>
@ -17,38 +13,79 @@ namespace
{
// Information will be logged for every |kLogBatch| entries.
size_t const kLogBatch = 100000;
void operator += (Hierarchy::ParsingStats & accumulator, Hierarchy::ParsingStats & stats)
{
struct ValidationStats
{
uint64_t m_numLoaded, m_badJsons, m_badOsmIds, m_duplicateOsmIds, m_duplicateAddresses,
m_emptyAddresses, m_emptyNames, m_mismatchedNames;
};
static_assert(sizeof(Hierarchy::ParsingStats) == sizeof(ValidationStats), "");
accumulator.m_numLoaded += stats.m_numLoaded;
accumulator.m_badJsons += stats.m_badJsons;
accumulator.m_badOsmIds += stats.m_badOsmIds;
accumulator.m_duplicateOsmIds += stats.m_duplicateOsmIds;
accumulator.m_duplicateAddresses += stats.m_duplicateAddresses;
accumulator.m_emptyAddresses += stats.m_emptyAddresses;
accumulator.m_emptyNames += stats.m_emptyNames;
accumulator.m_mismatchedNames += stats.m_mismatchedNames;
}
} // namespace
HierarchyReader::HierarchyReader(string const & pathToJsonHierarchy)
: m_fileStream{pathToJsonHierarchy}, m_in{m_fileStream}
{
using namespace boost::iostreams;
if (strings::EndsWith(pathToJsonHierarchy, ".gz"))
m_fileStream.push(gzip_decompressor());
m_fileStream.push(file_source(pathToJsonHierarchy));
if (!m_fileStream)
MYTHROW(OpenException, ("Failed to open file", pathToJsonHierarchy));
}
vector<Hierarchy::Entry> HierarchyReader::ReadEntries(size_t readersCount, ParsingStats & stats)
HierarchyReader::HierarchyReader(std::istream & in)
: m_in{in}
{
}
Hierarchy HierarchyReader::Read(size_t readersCount)
{
LOG(LINFO, ("Reading entries..."));
readersCount = min(readersCount, size_t{thread::hardware_concurrency()});
vector<multimap<base::GeoObjectId, Entry>> taskEntries(readersCount);
vector<ParsingStats> tasksStats(readersCount);
vector<thread> tasks{};
for (size_t t = 0; t < readersCount; ++t)
tasks.emplace_back(&HierarchyReader::ReadEntryMap, this, ref(taskEntries[t]), ref(stats));
tasks.emplace_back(&HierarchyReader::ReadEntryMap, this, ref(taskEntries[t]), ref(tasksStats[t]));
for (auto & reader : tasks)
reader.join();
if (stats.m_numLoaded % kLogBatch != 0)
LOG(LINFO, ("Read", stats.m_numLoaded, "entries"));
if (m_totalNumLoaded % kLogBatch != 0)
LOG(LINFO, ("Read", m_totalNumLoaded, "entries"));
return MergeEntries(taskEntries);
ParsingStats stats{};
for (auto & readerStats : tasksStats)
stats += readerStats;
auto entries = MergeEntries(taskEntries);
CheckDuplicateOsmIds(entries, stats);
LOG(LINFO, ("Finished reading and indexing the hierarchy. Stats:"));
LOG(LINFO, ("Entries loaded:", stats.m_numLoaded));
LOG(LINFO, ("Corrupted json lines:", stats.m_badJsons));
LOG(LINFO, ("Unreadable base::GeoObjectIds:", stats.m_badOsmIds));
LOG(LINFO, ("Duplicate base::GeoObjectIds:", stats.m_duplicateOsmIds));
LOG(LINFO, ("Entries with duplicate address parts:", stats.m_duplicateAddresses));
LOG(LINFO, ("Entries without address:", stats.m_emptyAddresses));
LOG(LINFO, ("Entries without names:", stats.m_emptyNames));
LOG(LINFO,
("Entries whose names do not match their most specific addresses:", stats.m_mismatchedNames));
LOG(LINFO, ("(End of stats.)"));
return Hierarchy{std::move(entries), true};
}
vector<Hierarchy::Entry> HierarchyReader::MergeEntries(vector<multimap<base::GeoObjectId, Entry>> & entryParts)
@ -90,6 +127,27 @@ vector<Hierarchy::Entry> HierarchyReader::MergeEntries(vector<multimap<base::Geo
return entries;
}
void HierarchyReader::CheckDuplicateOsmIds(vector<geocoder::Hierarchy::Entry> const & entries,
ParsingStats & stats)
{
size_t i = 0;
while (i < entries.size())
{
size_t j = i + 1;
while (j < entries.size() && entries[i].m_osmId == entries[j].m_osmId)
++j;
if (j != i + 1)
{
++stats.m_duplicateOsmIds;
// todo Remove the cast when the hierarchies no longer contain negative keys.
LOG(LDEBUG,
("Duplicate osm id:", static_cast<int64_t>(entries[i].m_osmId.GetEncodedId()), "(",
entries[i].m_osmId, ")", "occurs as a key in", j - i, "key-value entries."));
}
i = j;
}
}
void HierarchyReader::ReadEntryMap(multimap<base::GeoObjectId, Entry> & entries, ParsingStats & stats)
{
// Temporary local object for efficient concurent processing (individual cache line for container).
@ -108,7 +166,7 @@ void HierarchyReader::ReadEntryMap(multimap<base::GeoObjectId, Entry> & entries,
for (; bufferSize < kLineBufferCapacity; ++bufferSize)
{
if (!getline(m_fileStream, linesBuffer[bufferSize]))
if (!getline(m_in, linesBuffer[bufferSize]))
break;
}
}
@ -153,10 +211,11 @@ void HierarchyReader::DeserializeEntryMap(vector<string> const & linesBuffer, in
if (entry.m_type == Type::Count)
continue;
auto numLoaded = stats.m_numLoaded.fetch_add(1) + 1;
++stats.m_numLoaded;
if (numLoaded % kLogBatch == 0)
LOG(LINFO, ("Read", numLoaded, "entries"));
auto totalNumLoaded = m_totalNumLoaded.fetch_add(1) + 1;
if (totalNumLoaded % kLogBatch == 0)
LOG(LINFO, ("Read", totalNumLoaded, "entries"));
entries.emplace(osmId, move(entry));
}

View file

@ -5,8 +5,8 @@
#include "base/exception.hpp"
#include "base/geo_object_id.hpp"
#include <boost/iostreams/filtering_stream.hpp>
#include <atomic>
#include <fstream>
#include <map>
#include <mutex>
#include <string>
@ -23,8 +23,9 @@ public:
DECLARE_EXCEPTION(OpenException, RootException);
explicit HierarchyReader(std::string const & pathToJsonHierarchy);
explicit HierarchyReader(std::istream & in);
std::vector<Entry> ReadEntries(size_t readersCount, ParsingStats & stats);
Hierarchy Read(size_t readersCount = 4);
private:
void ReadEntryMap(std::multimap<base::GeoObjectId, Entry> & entries, ParsingStats & stats);
@ -33,8 +34,11 @@ private:
std::multimap<base::GeoObjectId, Entry> & entries, ParsingStats & stats);
std::vector<Entry> MergeEntries(std::vector<std::multimap<base::GeoObjectId, Entry>> & entryParts);
void CheckDuplicateOsmIds(std::vector<Entry> const & entries, ParsingStats & stats);
boost::iostreams::filtering_istream m_fileStream;
std::ifstream m_fileStream;
std::istream & m_in;
std::mutex m_mutex;
std::atomic<std::uint64_t> m_totalNumLoaded{0};
};
} // namespace geocoder