[geocoder] Add method to retrieve the data version from a jsonl file

This commit is contained in:
Anatoly Serdtcev 2019-11-22 13:10:47 +03:00 committed by LaGrunge
parent 2d8ecec4d5
commit 684443bcbf
6 changed files with 76 additions and 50 deletions

View file

@ -127,4 +127,4 @@ private:
size_t m_precision = kDefaultPrecision;
};
}
}

View file

@ -25,9 +25,6 @@
#include <boost/archive/binary_oarchive.hpp>
#include <boost/exception/exception.hpp>
#include <boost/exception/diagnostic_information.hpp>
#include <boost/iostreams/device/file.hpp>
#include <boost/iostreams/filter/gzip.hpp>
#include <boost/iostreams/filtering_streambuf.hpp>
#include <boost/optional.hpp>
#include <boost/range/adaptor/reversed.hpp>
@ -282,22 +279,11 @@ bool Geocoder::Context::ContainsTokenIds(BeamKey const & beamKey, set<size_t> co
}
// Geocoder ----------------------------------------------------------------------------------------
void Geocoder::LoadFromJsonl(std::string const & pathToJsonHierarchy, unsigned int loadThreadsCount)
void Geocoder::LoadFromJsonl(std::string const & pathToJsonHierarchy, bool dataVersionHeadline,
unsigned int loadThreadsCount)
try
{
using namespace boost::iostreams;
filtering_istreambuf fileStreamBuf;
if (strings::EndsWith(pathToJsonHierarchy, ".gz"))
fileStreamBuf.push(gzip_decompressor());
file_source file(pathToJsonHierarchy);
if (!file.is_open())
MYTHROW(OpenException, ("Failed to open file", pathToJsonHierarchy));
fileStreamBuf.push(file);
std::istream fileStream(&fileStreamBuf);
m_hierarchy = HierarchyReader{fileStream}.Read(loadThreadsCount);
m_hierarchy = HierarchyReader{pathToJsonHierarchy, dataVersionHeadline}.Read(loadThreadsCount);
m_index.BuildIndex(loadThreadsCount);
}
catch (boost::exception const & err)

View file

@ -131,7 +131,8 @@ public:
std::vector<Layer> m_layers;
};
void LoadFromJsonl(std::string const & pathToJsonHierarchy, unsigned int loadThreadsCount = 1);
void LoadFromJsonl(std::string const & pathToJsonHierarchy, bool dataVersionHeadline = false,
unsigned int loadThreadsCount = 1);
void LoadFromBinaryIndex(std::string const & pathToTokenIndex);
void SaveToBinaryIndex(std::string const & pathToTokenIndex) const;

View file

@ -446,7 +446,7 @@ UNIT_TEST(Geocoder_EmptyFileConcurrentRead)
{
Geocoder geocoder;
ScopedFile const regionsJsonFile("regions.jsonl", "");
geocoder.LoadFromJsonl(regionsJsonFile.GetFullPath(), 8 /* reader threads */);
geocoder.LoadFromJsonl(regionsJsonFile.GetFullPath(), false, 8 /* reader threads */);
TEST_EQUAL(geocoder.GetHierarchy().GetEntries().size(), 0, ());
}
@ -469,7 +469,7 @@ UNIT_TEST(Geocoder_BigFileConcurrentRead)
Geocoder geocoder;
ScopedFile const regionsJsonFile("regions.jsonl", s.str());
geocoder.LoadFromJsonl(regionsJsonFile.GetFullPath(), 8 /* reader threads */);
geocoder.LoadFromJsonl(regionsJsonFile.GetFullPath(), false, 8 /* reader threads */);
TEST_EQUAL(geocoder.GetHierarchy().GetEntries().size(), kEntryCount, ());
}

View file

@ -9,6 +9,10 @@
#include <sstream>
#include <vector>
#include <boost/iostreams/device/file.hpp>
#include <boost/iostreams/filter/gzip.hpp>
#include <boost/iostreams/filtering_stream.hpp>
using namespace std;
namespace geocoder
@ -41,28 +45,70 @@ void operator+=(Hierarchy::ParsingStats & accumulator, Hierarchy::ParsingStats &
}
} // namespace
HierarchyReader::HierarchyReader(string const & pathToJsonHierarchy)
: m_fileStream{pathToJsonHierarchy}, m_in{m_fileStream}
// Creates a reader over the file at |pathToJsonHierarchy|; the stream is obtained from
// CreateDataStream(). When |dataVersionHeadline| is set, the data version line is read
// from the stream right away and kept in |m_dataVersion|.
HierarchyReader::HierarchyReader(string const & pathToJsonHierarchy, bool dataVersionHeadline)
  : m_fileStream{CreateDataStream(pathToJsonHierarchy)}
  , m_in{*m_fileStream}
{
  if (!m_fileStream)
  {
    MYTHROW(OpenException, ("Failed to open file", pathToJsonHierarchy));
  }

  // Nothing more to do when the data carries no version headline.
  if (!dataVersionHeadline)
    return;

  m_dataVersion = ReadDataVersion(m_in);
}
HierarchyReader::HierarchyReader(istream & in)
// Creates a reader over the caller-provided stream |in|, which is held by reference.
// When |dataVersionHeadline| is set, the data version line is read from the stream
// right away and kept in |m_dataVersion|.
HierarchyReader::HierarchyReader(istream & in, bool dataVersionHeadline)
  : m_in{in}
{
  // Nothing more to do when the data carries no version headline.
  if (!dataVersionHeadline)
    return;

  m_dataVersion = ReadDataVersion(m_in);
}
// static
// Opens the file at |pathToJsonHierarchy| via CreateDataStream() and returns the data
// version read from it by the stream overload of ReadDataVersion().
string HierarchyReader::ReadDataVersion(string const & pathToJsonHierarchy)
{
  // The temporary stream stays alive until the version string has been produced.
  return ReadDataVersion(*CreateDataStream(pathToJsonHierarchy));
}
// static
// Opens the file at |pathToJsonHierarchy| and returns an input stream over its contents.
// When the path ends with ".gz", a gzip decompressor is put in front of the file.
// Throws OpenException if the file cannot be opened.
unique_ptr<istream> HierarchyReader::CreateDataStream(string const & pathToJsonHierarchy)
{
  using namespace boost::iostreams;

  bool const isGzipped = strings::EndsWith(pathToJsonHierarchy, ".gz");

  // Compressed data is binary and must not pass through text-mode translation
  // (e.g. newline conversion), otherwise the gzip stream may get corrupted.
  // Plain jsonl files keep the default text mode, exactly as before.
  std::ios_base::openmode const openMode =
      isGzipped ? (std::ios_base::in | std::ios_base::binary) : std::ios_base::in;

  file_source file(pathToJsonHierarchy, openMode);
  if (!file.is_open())
    MYTHROW(OpenException, ("Failed to open file", pathToJsonHierarchy));

  auto fileStream = make_unique<filtering_istream>();
  if (isGzipped)
    fileStream->push(gzip_decompressor());
  fileStream->push(move(file));
  return fileStream;
}
// static
// Reads one line from |stream| and returns the data version stored in it.
// The line must have the form "<kVersionKey> <version>": the text before the first
// space must equal |kVersionKey| and everything after that space is the version.
// Throws NoVersion if no line can be read, if the line has no space separator, or if
// the key does not match.
string HierarchyReader::ReadDataVersion(istream & stream)
{
  string line;
  if (!getline(stream, line))
    MYTHROW(NoVersion, ("No version info in data"));

  auto const separatorPos = line.find(' ');
  // Without a separator |separatorPos + 1| would wrap around to 0 and the key itself
  // would be returned as the version, so such a line is treated as having no version.
  if (separatorPos == string::npos)
    MYTHROW(NoVersion, ("No version info in data"));

  string const key = line.substr(0, separatorPos);
  if (key != kVersionKey)
    MYTHROW(NoVersion, ("No version info in data"));

  return line.substr(separatorPos + 1);
}
Hierarchy HierarchyReader::Read(unsigned int readersCount)
{
CHECK_GREATER_OR_EQUAL(readersCount, 1, ());
LOG(LINFO, ("Loading data version", m_dataVersion));
LOG(LINFO, ("Reading entries..."));
vector<Entry> entries;
NameDictionaryBuilder nameDictionaryBuilder;
ParsingStats stats{};
std::string dataVersion;
base::thread_pool::computational::ThreadPool threadPool{readersCount};
list<future<ParsingResult>> tasks{};
@ -75,15 +121,6 @@ Hierarchy HierarchyReader::Read(unsigned int readersCount)
CHECK(!tasks.empty(), ());
auto & task = tasks.front();
auto taskResult = task.get();
if (!taskResult.m_dataVersion.empty())
{
if (!dataVersion.empty())
LOG(LERROR, ("Duplicate version key"));
dataVersion = taskResult.m_dataVersion;
LOG(LINFO, ("Loaded data version", dataVersion));
}
tasks.pop_front();
auto & taskEntries = taskResult.m_entries;
@ -127,7 +164,7 @@ Hierarchy HierarchyReader::Read(unsigned int readersCount)
("Entries whose names do not match their most specific addresses:", stats.m_mismatchedNames));
LOG(LINFO, ("(End of stats.)"));
return Hierarchy{move(entries), nameDictionaryBuilder.Release(), move(dataVersion)};
return Hierarchy{move(entries), nameDictionaryBuilder.Release(), move(m_dataVersion)};
}
void HierarchyReader::CheckDuplicateOsmIds(vector<geocoder::Hierarchy::Entry> const & entries,
@ -179,7 +216,6 @@ HierarchyReader::ParsingResult HierarchyReader::DeserializeEntries(
entries.reserve(bufferSize);
NameDictionaryBuilder nameDictionaryBuilder;
ParsingStats stats;
std::string dataVersion;
for (size_t i = 0; i < bufferSize; ++i)
{
@ -190,12 +226,7 @@ HierarchyReader::ParsingResult HierarchyReader::DeserializeEntries(
auto const p = line.find(' ');
std::string const & key = line.substr(0, p);
if (key == kVersionKey)
{
dataVersion = line.substr(p + 1);
continue;
}
string const & key = line.substr(0, p);
uint64_t encodedId = 0;
if (p == string::npos || !DeserializeId(key, encodedId))
@ -225,7 +256,7 @@ HierarchyReader::ParsingResult HierarchyReader::DeserializeEntries(
entries.push_back(move(entry));
}
return {move(entries), nameDictionaryBuilder.Release(), move(stats), move(dataVersion)};
return {move(entries), nameDictionaryBuilder.Release(), move(stats)};
}
// static

View file

@ -8,6 +8,7 @@
#include <atomic>
#include <fstream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
@ -24,23 +25,29 @@ public:
using Entry = Hierarchy::Entry;
using ParsingStats = Hierarchy::ParsingStats;
DECLARE_EXCEPTION(OpenException, RootException);
DECLARE_EXCEPTION(Exception, RootException);
DECLARE_EXCEPTION(OpenException, Exception);
DECLARE_EXCEPTION(NoVersion, Exception);
explicit HierarchyReader(std::string const & pathToJsonHierarchy);
explicit HierarchyReader(std::istream & jsonHierarchy);
explicit HierarchyReader(std::string const & pathToJsonHierarchy,
bool dataVersionHeadline = false);
explicit HierarchyReader(std::istream & jsonHierarchy, bool dataVersionHeadline = false);
// Read hierarchy file/stream concurrently in |readersCount| threads.
Hierarchy Read(unsigned int readersCount = 1);
static std::string ReadDataVersion(std::string const & pathToJsonHierarchy);
private:
struct ParsingResult
{
std::vector<Entry> m_entries;
NameDictionary m_nameDictionary;
ParsingStats m_stats;
std::string m_dataVersion;
};
static std::unique_ptr<std::istream> CreateDataStream(std::string const & pathToJsonHierarchy);
static std::string ReadDataVersion(std::istream & stream);
ParsingResult ReadEntries(size_t count);
ParsingResult DeserializeEntries(std::vector<std::string> const & linesBuffer,
std::size_t const bufferSize);
@ -49,10 +56,11 @@ private:
void CheckDuplicateOsmIds(std::vector<Entry> const & entries, ParsingStats & stats);
std::ifstream m_fileStream;
std::unique_ptr<std::istream> m_fileStream;
std::istream & m_in;
bool m_eof{false};
std::mutex m_mutex;
std::atomic<std::uint64_t> m_totalNumLoaded{0};
std::string m_dataVersion;
};
} // namespace geocoder