[generator] Optimize features generation: o5m parallel reading
This commit is contained in:
parent
a49738bd7c
commit
4a302bee84
6 changed files with 142 additions and 132 deletions
|
@ -141,8 +141,6 @@ set(
|
|||
translator_region.hpp
|
||||
translator_streets.cpp
|
||||
translator_streets.hpp
|
||||
translators_pool.cpp
|
||||
translators_pool.hpp
|
||||
type_helper.cpp
|
||||
type_helper.hpp
|
||||
unpack_mwm.cpp
|
||||
|
|
|
@ -47,8 +47,7 @@ SourceReader::SourceReader(string const & filename)
|
|||
|
||||
SourceReader::SourceReader(std::istream & stream)
|
||||
: m_file(unique_ptr<istream, Deleter>(&stream, Deleter(false)))
|
||||
{
|
||||
}
|
||||
{ }
|
||||
|
||||
uint64_t SourceReader::Read(char * buffer, uint64_t bufferSize)
|
||||
{
|
||||
|
|
|
@ -4,14 +4,18 @@
|
|||
#include "generator/processor_factory.hpp"
|
||||
#include "generator/raw_generator_writer.hpp"
|
||||
#include "generator/translator_factory.hpp"
|
||||
#include "generator/translators_pool.hpp"
|
||||
|
||||
#include "base/thread_pool_computational.hpp"
|
||||
|
||||
|
||||
#include <future>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include <boost/iostreams/device/array.hpp>
|
||||
#include <boost/iostreams/stream.hpp>
|
||||
|
||||
#include <sys/mman.h>
|
||||
|
||||
using namespace std;
|
||||
|
||||
namespace generator
|
||||
|
@ -98,40 +102,14 @@ std::vector<std::string> const & RawGenerator::GetNames() const
|
|||
|
||||
bool RawGenerator::GenerateFilteredFeatures()
|
||||
{
|
||||
SourceReader reader = m_genInfo.m_osmFileName.empty() ? SourceReader()
|
||||
: SourceReader(m_genInfo.m_osmFileName);
|
||||
|
||||
std::unique_ptr<ProcessorOsmElementsInterface> sourceProcessor;
|
||||
switch (m_genInfo.m_osmFileType) {
|
||||
case feature::GenerateInfo::OsmSourceType::O5M:
|
||||
sourceProcessor = std::make_unique<ProcessorOsmElementsFromO5M>(reader);
|
||||
break;
|
||||
case feature::GenerateInfo::OsmSourceType::XML:
|
||||
sourceProcessor = std::make_unique<ProcessorOsmElementsFromXml>(reader);
|
||||
break;
|
||||
}
|
||||
CHECK(sourceProcessor, ());
|
||||
|
||||
TranslatorsPool translators(m_translators, m_genInfo.m_threadsCount);
|
||||
RawGeneratorWriter rawGeneratorWriter(m_queue);
|
||||
rawGeneratorWriter.Run();
|
||||
|
||||
size_t element_pos = 0;
|
||||
std::vector<OsmElement> elements(m_chunkSize);
|
||||
while (sourceProcessor->TryRead(elements[element_pos]))
|
||||
{
|
||||
if (++element_pos != m_chunkSize)
|
||||
continue;
|
||||
auto processorThreadsCount = std::max(m_genInfo.m_threadsCount, 2u) - 1 /* writer */;
|
||||
if (m_genInfo.m_osmFileName.empty()) // stdin
|
||||
processorThreadsCount = 1;
|
||||
|
||||
translators.Emit(std::move(elements));
|
||||
elements = vector<OsmElement>(m_chunkSize);
|
||||
element_pos = 0;
|
||||
}
|
||||
elements.resize(element_pos);
|
||||
translators.Emit(std::move(elements));
|
||||
|
||||
LOG(LINFO, ("Input was processed."));
|
||||
if (!translators.Finish())
|
||||
if (!GenerateFeatures(processorThreadsCount, rawGeneratorWriter))
|
||||
return false;
|
||||
|
||||
rawGeneratorWriter.ShutdownAndJoin();
|
||||
|
@ -139,4 +117,124 @@ bool RawGenerator::GenerateFilteredFeatures()
|
|||
LOG(LINFO, ("Names:", m_names));
|
||||
return true;
|
||||
}
|
||||
|
||||
bool RawGenerator::GenerateFeatures(
|
||||
unsigned int threadsCount, RawGeneratorWriter & /* rawGeneratorWriter */)
|
||||
{
|
||||
auto translators = std::vector<std::shared_ptr<TranslatorInterface>>{};
|
||||
auto sourceMap = boost::optional<boost::iostreams::mapped_file_source>{};
|
||||
if (!m_genInfo.m_osmFileName.empty())
|
||||
{
|
||||
sourceMap = MakeFileMap(m_genInfo.m_osmFileName);
|
||||
LOG_SHORT(LINFO, ("Reading OSM data from", m_genInfo.m_osmFileName));
|
||||
}
|
||||
|
||||
std::vector<std::thread> threads;
|
||||
for (unsigned int i = 0; i < threadsCount; ++i)
|
||||
{
|
||||
auto translator = m_translators->Clone();
|
||||
translators.push_back(translator);
|
||||
|
||||
constexpr size_t chunkSize = 10'000;
|
||||
auto processorMaker =
|
||||
[osmFileType = m_genInfo.m_osmFileType, threadsCount, i, chunkSize] (auto & reader)
|
||||
-> std::unique_ptr<ProcessorOsmElementsInterface>
|
||||
{
|
||||
switch (osmFileType)
|
||||
{
|
||||
case feature::GenerateInfo::OsmSourceType::O5M:
|
||||
return std::make_unique<ProcessorOsmElementsFromO5M>(reader, threadsCount, i, chunkSize);
|
||||
case feature::GenerateInfo::OsmSourceType::XML:
|
||||
return std::make_unique<ProcessorOsmElementsFromXml>(reader);
|
||||
}
|
||||
UNREACHABLE();
|
||||
};
|
||||
|
||||
threads.emplace_back([translator, processorMaker, &sourceMap] {
|
||||
if (!sourceMap)
|
||||
{
|
||||
auto reader = SourceReader{};
|
||||
auto processor = processorMaker(reader);
|
||||
TranslateToFeatures(*processor, *translator);
|
||||
return;
|
||||
}
|
||||
|
||||
namespace io = boost::iostreams;
|
||||
auto && sourceArray = io::array_source{sourceMap->data(), sourceMap->size()};
|
||||
auto && stream = io::stream<io::array_source>{sourceArray, std::ios::binary};
|
||||
auto && reader = SourceReader(stream);
|
||||
auto processor = processorMaker(reader);
|
||||
TranslateToFeatures(*processor, *translator);
|
||||
});
|
||||
}
|
||||
for (auto & thread : threads)
|
||||
thread.join();
|
||||
LOG(LINFO, ("Input was processed."));
|
||||
|
||||
return FinishTranslation(translators);
|
||||
}
|
||||
|
||||
// static
|
||||
// Pulls elements from |sourceProcessor| one at a time until TryRead() reports failure and
// passes each of them to |translator|. A single OsmElement instance is reused for all reads.
void RawGenerator::TranslateToFeatures(ProcessorOsmElementsInterface & sourceProcessor,
                                       TranslatorInterface & translator)
{
  for (OsmElement element{}; sourceProcessor.TryRead(element);)
    translator.Emit(element);
}
|
||||
|
||||
bool RawGenerator::FinishTranslation(
|
||||
std::vector<std::shared_ptr<TranslatorInterface>> & translators)
|
||||
{
|
||||
using TranslatorPtr = std::shared_ptr<TranslatorInterface>;
|
||||
|
||||
base::threads::ThreadSafeQueue<std::future<TranslatorPtr>> queue;
|
||||
for (auto const & translator : translators)
|
||||
{
|
||||
std::promise<TranslatorPtr> p;
|
||||
p.set_value(translator);
|
||||
queue.Push(p.get_future());
|
||||
}
|
||||
CHECK_GREATER_OR_EQUAL(queue.Size(), 1, ());
|
||||
|
||||
base::thread_pool::computational::ThreadPool pool(queue.Size() / 2 + 1);
|
||||
while (queue.Size() != 1)
|
||||
{
|
||||
std::future<TranslatorPtr> left;
|
||||
std::future<TranslatorPtr> right;
|
||||
queue.WaitAndPop(left);
|
||||
queue.WaitAndPop(right);
|
||||
queue.Push(pool.Submit([left{move(left)}, right{move(right)}]() mutable {
|
||||
auto leftTranslator = left.get();
|
||||
auto rigthTranslator = right.get();
|
||||
rigthTranslator->Finish();
|
||||
leftTranslator->Finish();
|
||||
leftTranslator->Merge(*rigthTranslator);
|
||||
return leftTranslator;
|
||||
}));
|
||||
}
|
||||
|
||||
std::future<TranslatorPtr> translatorFuture;
|
||||
queue.WaitAndPop(translatorFuture);
|
||||
auto translator = translatorFuture.get();
|
||||
translator->Finish();
|
||||
return translator->Save();
|
||||
}
|
||||
|
||||
// Memory-maps |filename| and asks the kernel to read the mapping ahead.
//
// |filename| must not be empty. Throws Writer::OpenException if the mapping is not open after
// construction. Returns the mapping by value.
boost::iostreams::mapped_file_source RawGenerator::MakeFileMap(std::string const & filename)
{
  CHECK(!filename.empty(), ());
  auto fileMap = boost::iostreams::mapped_file_source{filename};
  if (!fileMap.is_open())
    MYTHROW(Writer::OpenException, ("Failed to open", filename));

  // Request aggressive read-ahead (MADV_WILLNEED) of the input file.
  // The hint is issued directly: a std::async(std::launch::async, ...) call whose future is
  // discarded blocks in the temporary's destructor until the task completes, so wrapping the
  // call in a task only costs a thread without overlapping any work.
  // The return value is ignored on purpose, as before: the call is only advice.
  ::madvise(const_cast<char *>(fileMap.data()), fileMap.size(), MADV_WILLNEED);

  return fileMap;
}
|
||||
|
||||
} // namespace generator
|
||||
|
|
|
@ -12,8 +12,14 @@
|
|||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include <boost/iostreams/device/mapped_file.hpp>
|
||||
#include <boost/optional.hpp>
|
||||
|
||||
namespace generator
|
||||
{
|
||||
class RawGeneratorWriter;
|
||||
class ProcessorOsmElementsInterface;
|
||||
|
||||
class RawGenerator
|
||||
{
|
||||
public:
|
||||
|
@ -43,6 +49,11 @@ private:
|
|||
};
|
||||
|
||||
bool GenerateFilteredFeatures();
|
||||
bool GenerateFeatures(unsigned int threadsCount, RawGeneratorWriter & rawGeneratorWriter);
|
||||
static void TranslateToFeatures(ProcessorOsmElementsInterface & sourceProcessor,
|
||||
TranslatorInterface & translator);
|
||||
bool FinishTranslation(std::vector<std::shared_ptr<TranslatorInterface>> & translators);
|
||||
boost::iostreams::mapped_file_source MakeFileMap(std::string const & filename);
|
||||
|
||||
feature::GenerateInfo & m_genInfo;
|
||||
size_t m_chunkSize;
|
||||
|
|
|
@ -1,68 +0,0 @@
|
|||
#include "generator/translators_pool.hpp"
|
||||
|
||||
#include <future>
|
||||
|
||||
namespace generator
|
||||
{
|
||||
// Creates a thread pool of |threadCount| threads and fills the translator queue with
// |threadCount| translators: |original| itself plus |threadCount| - 1 clones of it.
// |threadCount| is checked to be at least 1.
TranslatorsPool::TranslatorsPool(std::shared_ptr<TranslatorInterface> const & original,
                                 size_t threadCount)
  : m_threadPool(threadCount)
{
  CHECK_GREATER_OR_EQUAL(threadCount, 1, ());

  m_translators.Push(original);
  // One slot is already taken by |original|; the remaining ones get clones.
  for (size_t remaining = threadCount; remaining > 1; --remaining)
    m_translators.Push(original->Clone());
}
|
||||
|
||||
void TranslatorsPool::Emit(std::vector<OsmElement> && elements)
|
||||
{
|
||||
std::shared_ptr<TranslatorInterface> translator;
|
||||
m_translators.WaitAndPop(translator);
|
||||
m_threadPool.SubmitWork([&, translator, elements{std::move(elements)}]() mutable {
|
||||
for (auto & element : elements)
|
||||
translator->Emit(element);
|
||||
|
||||
m_translators.Push(translator);
|
||||
});
|
||||
}
|
||||
|
||||
bool TranslatorsPool::Finish()
|
||||
{
|
||||
m_threadPool.WaitingStop();
|
||||
using TranslatorPtr = std::shared_ptr<TranslatorInterface>;
|
||||
base::threads::ThreadSafeQueue<std::future<TranslatorPtr>> queue;
|
||||
while (!m_translators.Empty())
|
||||
{
|
||||
std::promise<TranslatorPtr> p;
|
||||
std::shared_ptr<TranslatorInterface> translator;
|
||||
m_translators.TryPop(translator);
|
||||
p.set_value(translator);
|
||||
queue.Push(p.get_future());
|
||||
}
|
||||
|
||||
base::thread_pool::computational::ThreadPool pool(queue.Size() / 2 + 1);
|
||||
CHECK_GREATER_OR_EQUAL(queue.Size(), 1, ());
|
||||
while (queue.Size() != 1)
|
||||
{
|
||||
std::future<TranslatorPtr> left;
|
||||
std::future<TranslatorPtr> right;
|
||||
queue.WaitAndPop(left);
|
||||
queue.WaitAndPop(right);
|
||||
queue.Push(pool.Submit([left{move(left)}, right{move(right)}]() mutable {
|
||||
auto leftTranslator = left.get();
|
||||
auto rigthTranslator = right.get();
|
||||
rigthTranslator->Finish();
|
||||
leftTranslator->Finish();
|
||||
leftTranslator->Merge(*rigthTranslator);
|
||||
return leftTranslator;
|
||||
}));
|
||||
}
|
||||
|
||||
std::future<TranslatorPtr> translatorFuture;
|
||||
queue.WaitAndPop(translatorFuture);
|
||||
auto translator = translatorFuture.get();
|
||||
translator->Finish();
|
||||
return translator->Save();
|
||||
}
|
||||
} // namespace generator
|
|
@ -1,28 +0,0 @@
|
|||
#pragma once
|
||||
|
||||
#include "generator/intermediate_data.hpp"
|
||||
#include "generator/osm_element.hpp"
|
||||
#include "generator/translator_interface.hpp"
|
||||
|
||||
#include "base/thread_pool_computational.hpp"
|
||||
#include "base/thread_safe_queue.hpp"
|
||||
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
|
||||
namespace generator
|
||||
{
|
||||
class TranslatorsPool
|
||||
{
|
||||
public:
|
||||
explicit TranslatorsPool(std::shared_ptr<TranslatorInterface> const & original,
|
||||
size_t threadCount);
|
||||
|
||||
void Emit(std::vector<OsmElement> && elements);
|
||||
bool Finish();
|
||||
|
||||
private:
|
||||
base::thread_pool::computational::ThreadPool m_threadPool;
|
||||
base::threads::ThreadSafeQueue<std::shared_ptr<TranslatorInterface>> m_translators;
|
||||
};
|
||||
} // namespace generator
|
Loading…
Add table
Reference in a new issue