[generator:preprocess] Optimize o5m reading: parallel reading

This commit is contained in:
Anatoly Serdtcev 2019-11-03 18:57:25 +03:00 committed by Sergey Yershov
parent 7b111c1583
commit 7fd4934dd6
8 changed files with 273 additions and 37 deletions

View file

@ -154,6 +154,8 @@ set(
world_map_generator.hpp
)
find_package(Boost REQUIRED COMPONENTS iostreams)
geocore_add_library(${PROJECT_NAME} ${SRC})
geocore_link_libraries(${PROJECT_NAME}
base
@ -162,6 +164,7 @@ geocore_link_libraries(${PROJECT_NAME}
platform
geometry
jansson
${Boost_LIBRARIES}
)
geocore_add_test_subdirectory(generator_tests_support)

View file

@ -39,7 +39,7 @@ struct GenerateInfo
OsmSourceType m_osmFileType;
std::string m_osmFileName;
unsigned int m_threadsCount;
unsigned int m_threadsCount{1};
bool m_preloadCache = false;
bool m_verbose = false;

View file

@ -144,7 +144,8 @@ std::vector<OsmElement> ReadOsmElements(std::string const & filename, OsmFormatP
feature::GenerateInfo MakeGenerateInfo(
std::string const & dataPath, std::string const & osmFilename,
std::string const & osmFileType, std::string const & nodeStorageType)
std::string const & osmFileType, std::string const & nodeStorageType,
unsigned int threadsCount)
{
auto genInfo = feature::GenerateInfo{};
genInfo.m_dataPath = dataPath;
@ -153,6 +154,7 @@ feature::GenerateInfo MakeGenerateInfo(
genInfo.m_osmFileName = osmFilename;
genInfo.SetOsmFileType(osmFileType);
genInfo.SetNodeStorageType(nodeStorageType);
genInfo.m_threadsCount = threadsCount;
return genInfo;
}
@ -170,19 +172,23 @@ void TestIntermediateDataGeneration(
// Skip test for node storage type "mem": 64Gb required.
for (auto const & nodeStorageType : {"raw"s, "map"s})
{
auto const & osmFile = ScopedFile{"planet." + osmFileTypeExtension, osmFileData};
auto const & dataPath = ScopedDir{"intermediate_data", true /* recursiveForceRemove */};
for (auto threadsCount : {1, 2, 4})
{
auto const & osmFile = ScopedFile{"planet." + osmFileTypeExtension, osmFileData};
auto const & dataPath = ScopedDir{"intermediate_data", true /* recursiveForceRemove */};
auto const & genInfo = MakeGenerateInfo(dataPath.GetFullPath(), osmFile.GetFullPath(),
osmFileTypeExtension, nodeStorageType);
auto generation = GenerateIntermediateData(genInfo);
CHECK(generation, ());
auto const & genInfo = MakeGenerateInfo(dataPath.GetFullPath(), osmFile.GetFullPath(),
osmFileTypeExtension, nodeStorageType,
threadsCount);
auto generation = GenerateIntermediateData(genInfo);
CHECK(generation, ());
auto osmElements =
ReadOsmElements(genInfo.m_osmFileName, osmFormatParsers.at(osmFileTypeExtension));
auto const & intermediateData = cache::IntermediateData{genInfo, true /* forceReload */};
auto const & cache = intermediateData.GetCache();
dataTester(osmElements, *cache);
auto osmElements =
ReadOsmElements(genInfo.m_osmFileName, osmFormatParsers.at(osmFileTypeExtension));
auto const & intermediateData = cache::IntermediateData{genInfo, true /* forceReload */};
auto const & cache = intermediateData.GetCache();
dataTester(osmElements, *cache);
}
}
}
}

View file

@ -1,9 +1,12 @@
#include "generator/intermediate_data.hpp"
#include <atomic>
#include <new>
#include <set>
#include <string>
#include <boost/filesystem.hpp>
#include "base/assert.hpp"
#include "base/checked_cast.hpp"
#include "base/logging.hpp"
@ -104,10 +107,18 @@ public:
++m_numProcessedPoints;
}
void AddPoints(Nodes const & nodes, bool /* concurrent */) override
{
std::lock_guard<std::mutex> lock(m_updateMutex);
for (auto const & node : nodes)
AddPoint(node.first, node.second.m_lat, node.second.m_lon);
}
uint64_t GetNumProcessedPoints() const override { return m_numProcessedPoints; }
private:
FileWriter m_fileWriter;
std::mutex m_updateMutex;
uint64_t m_numProcessedPoints = 0;
};
@ -162,14 +173,24 @@ public:
LatLon & ll = m_data[id];
ToLatLon(lat, lon, ll);
++m_numProcessedPoints;
m_numProcessedPoints.fetch_add(1, std::memory_order_relaxed);
}
void AddPoints(Nodes const & nodes, bool /* concurrent */) override
{
for (auto const & node : nodes)
{
LatLon & ll = m_data[node.first];
ToLatLon(node.second.m_lat, node.second.m_lon, ll);
}
m_numProcessedPoints.fetch_add(nodes.size(), std::memory_order_relaxed);
}
uint64_t GetNumProcessedPoints() const override { return m_numProcessedPoints; }
private:
FileWriter m_fileWriter;
vector<LatLon> m_data;
uint64_t m_numProcessedPoints = 0;
std::atomic<uint64_t> m_numProcessedPoints{0};
};
// MapFilePointStorageReader -----------------------------------------------------------------------
@ -243,10 +264,17 @@ public:
++m_numProcessedPoints;
}
void AddPoints(Nodes const & nodes, bool /* concurrent */) override
{
std::lock_guard<std::mutex> lock(m_updateMutex);
for (auto const & node : nodes)
AddPoint(node.first, node.second.m_lat, node.second.m_lon);
}
uint64_t GetNumProcessedPoints() const override { return m_numProcessedPoints; }
private:
FileWriter m_fileWriter;
std::mutex m_updateMutex;
uint64_t m_numProcessedPoints = 0;
};
@ -387,6 +415,43 @@ IntermediateDataWriter::IntermediateDataWriter(PointStorageWriterInterface & nod
, m_wayToRelations(info.GetIntermediateFileName(WAYS_FILE, ID2REL_EXT))
{}
void IntermediateDataWriter::AddNode(Key id, double lat, double lon)
{
m_nodes.AddPoint(id, lat, lon);
}
void IntermediateDataWriter::AddNodes(Nodes const & nodes, bool concurrent)
{
m_nodes.AddPoints(nodes, concurrent);
}
void IntermediateDataWriter::AddWay(Key id, WayElement const & e)
{
m_ways.Write(id, e);
}
void IntermediateDataWriter::AddWays(Ways const & ways, bool concurrent)
{
m_ways.Write(ways, concurrent);
}
void IntermediateDataWriter::AddRelations(Relations const & relations, bool concurrent)
{
m_relations.Write(relations, concurrent);
{
std::lock_guard<std::mutex> lock{m_nodeToRelationsUpdateMutex};
for (auto const & relation : relations)
AddToIndex(m_nodeToRelations, relation.first, relation.second.nodes);
}
{
std::lock_guard<std::mutex> lock{m_wayToRelationsUpdateMutex};
for (auto const & relation : relations)
AddToIndex(m_wayToRelations, relation.first, relation.second.ways);
}
}
void IntermediateDataWriter::AddRelation(Key id, RelationElement const & e)
{
static set<string> const types = {"multipolygon", "route", "boundary",

View file

@ -59,8 +59,11 @@ static_assert(std::is_trivially_copyable<LatLonPos>::value, "");
class PointStorageWriterInterface
{
public:
using Nodes = std::vector<std::pair<Key, NodeElement>>;
virtual ~PointStorageWriterInterface() {}
virtual void AddPoint(uint64_t id, double lat, double lon) = 0;
virtual void AddPoints(Nodes const & nodes, bool concurrent) = 0;
virtual uint64_t GetNumProcessedPoints() const = 0;
};
@ -188,15 +191,69 @@ public:
m_currOffset += sizeof(sz) + sz;
}
template <typename Key, typename Value>
void Write(std::vector<std::pair<Key, Value>> const & elements, bool /* concurrent */)
{
auto data = std::vector<uint8_t>{};
data.reserve(elements.size() * 1024);
auto elementsOffsets = std::vector<std::pair<Key, uint64_t>>{};
elementsOffsets.reserve(elements.size());
auto writer = MemWriter<decltype(data)>{data};
for (auto const & element : elements)
{
auto const pos = writer.Pos();
WriteValue(element.second, writer);
elementsOffsets.emplace_back(element.first, pos);
}
uint64_t dataOffset = 0;
{
std::lock_guard<std::mutex> lock{m_fileWriterMutex};
dataOffset = m_currOffset;
m_fileWriter.Write(data.data(), data.size());
m_currOffset += data.size();
}
{
std::lock_guard<std::mutex> lock{m_offsetsMutex};
for (auto const & elementOffset : elementsOffsets)
m_offsets.Add(elementOffset.first, dataOffset + elementOffset.second);
}
}
void SaveOffsets();
protected:
BufferedFileWriter m_fileWriter;
std::mutex m_fileWriterMutex;
uint64_t m_currOffset{0};
IndexFileWriter m_offsets;
std::mutex m_offsetsMutex;
std::string m_name;
std::vector<uint8_t> m_data;
bool m_preload = false;
private:
template <typename Value, typename Writer>
void WriteValue(Value const & element, Writer & writer)
{
auto const sizePos = writer.Pos();
auto elementDataSize = uint32_t{0};
writer.Write(&elementDataSize, sizeof(elementDataSize));
auto const elementDataPos = writer.Pos();
element.Write(writer);
auto const elementDataEndPos = writer.Pos();
elementDataSize = base::checked_cast<uint32_t>(elementDataEndPos - elementDataPos);
ASSERT_LESS(elementDataSize, std::numeric_limits<uint32_t>::max(), ());
writer.Seek(sizePos);
writer.Write(&elementDataSize, sizeof(elementDataSize));
writer.Seek(elementDataEndPos);
}
};
class IntermediateDataReader
@ -277,12 +334,19 @@ private:
class IntermediateDataWriter
{
public:
using Nodes = std::vector<std::pair<Key, NodeElement>>;
using Ways = std::vector<std::pair<Key, WayElement>>;
using Relations = std::vector<std::pair<Key, RelationElement>>;
IntermediateDataWriter(PointStorageWriterInterface & nodes, feature::GenerateInfo const & info);
void AddNode(Key id, double lat, double lon) { m_nodes.AddPoint(id, lat, lon); }
void AddWay(Key id, WayElement const & e) { m_ways.Write(id, e); }
void AddNode(Key id, double lat, double lon);
void AddNodes(Nodes const & nodes, bool concurrent);
void AddWay(Key id, WayElement const & e);
void AddWays(Ways const & ways, bool concurrent);
void AddRelation(Key id, RelationElement const & e);
void AddRelations(Relations const & relations, bool concurrent);
void SaveIndex();
static void AddToIndex(cache::IndexFileWriter & index, Key relationId, std::vector<uint64_t> const & values)
@ -303,7 +367,9 @@ private:
cache::OSMElementCacheWriter m_ways;
cache::OSMElementCacheWriter m_relations;
cache::IndexFileWriter m_nodeToRelations;
std::mutex m_nodeToRelationsUpdateMutex;
cache::IndexFileWriter m_wayToRelations;
std::mutex m_wayToRelationsUpdateMutex;
};
std::unique_ptr<PointStorageReaderInterface>

View file

@ -17,6 +17,11 @@
#include <fstream>
#include <memory>
#include <set>
#include <thread>
#include <vector>
#include <boost/iostreams/device/mapped_file.hpp>
#include <boost/iostreams/stream.hpp>
#include "defines.hpp"
@ -128,7 +133,7 @@ void BuildIntermediateDataFromXML(SourceReader & stream, cache::IntermediateData
OsmElement element;
while (processorOsmElementsFromXml.TryRead(element))
{
towns.CheckElement(element);
towns.CheckElement(element, false /* concurrent */);
AddElementToCache(cache, std::move(element));
}
}
@ -148,35 +153,121 @@ void ProcessOsmElementsFromXML(SourceReader & stream, function<void(OsmElement &
processor(std::move(element));
}
void BuildIntermediateDataFromO5M(SourceReader & stream, cache::IntermediateDataWriter & cache,
TownsDumper & towns)
void BuildIntermediateData(std::vector<OsmElement> && elements,
cache::IntermediateDataWriter & cache, TownsDumper & towns,
bool concurrent)
{
auto processor = [&](OsmElement && element) {
towns.CheckElement(element);
AddElementToCache(cache, std::move(element));
};
if (elements.empty())
return;
// Use only this function here, look into ProcessOsmElementsFromO5M
// for more details.
ProcessOsmElementsFromO5M(stream, processor);
auto const firstElementType = elements.front().m_type;
auto nodes = cache::IntermediateDataWriter::Nodes{};
if (firstElementType == OsmElement::EntityType::Node)
nodes.reserve(elements.size());
auto ways = cache::IntermediateDataWriter::Ways{};
if (firstElementType == OsmElement::EntityType::Way)
ways.reserve(elements.size());
auto relations = cache::IntermediateDataWriter::Relations{};
if (firstElementType == OsmElement::EntityType::Relation)
relations.reserve(elements.size());
for (auto & osmElement : elements)
{
towns.CheckElement(osmElement, concurrent);
auto const id = osmElement.m_id;
switch (osmElement.m_type)
{
case OsmElement::EntityType::Node:
{
nodes.emplace_back(id, NodeElement{});
BuildIntermediateNode(std::move(osmElement), nodes.back().second);
break;
}
case OsmElement::EntityType::Way:
{
WayElement way{id};
if (BuildIntermediateWay(std::move(osmElement), way))
ways.emplace_back(id, std::move(way));
break;
}
case OsmElement::EntityType::Relation:
{
RelationElement relation;
if (BuildIntermediateRelation(std::move(osmElement), relation))
relations.emplace_back(id, std::move(relation));
break;
}
default:
break;
}
}
if (!nodes.empty())
cache.AddNodes(std::move(nodes), concurrent);
if (!ways.empty())
cache.AddWays(std::move(ways), concurrent);
if (!relations.empty())
cache.AddRelations(std::move(relations), concurrent);
}
void BuildIntermediateDataFromO5M(
std::string const & filename, cache::IntermediateDataWriter & cache, TownsDumper & towns)
ProcessorOsmElementsFromO5M & o5mReader, cache::IntermediateDataWriter & cache,
TownsDumper & towns, bool concurrent)
{
std::vector<OsmElement> elements(o5mReader.ChunkSize());
size_t elementsCount = 0;
while (o5mReader.TryRead(elements[elementsCount]))
{
++elementsCount;
if (elementsCount < o5mReader.ChunkSize())
continue;
BuildIntermediateData(std::move(elements), cache, towns, concurrent);
elements.resize(o5mReader.ChunkSize()); // restore capacity after std::move(elements)
elementsCount = 0;
}
elements.resize(elementsCount);
BuildIntermediateData(std::move(elements), cache, towns, concurrent);
}
void BuildIntermediateDataFromO5M(
std::string const & filename, cache::IntermediateDataWriter & cache, TownsDumper & towns,
unsigned int threadsCount)
{
if (filename.empty())
{
// Read form stdin.
SourceReader reader{};
return BuildIntermediateDataFromO5M(reader, cache, towns);
auto && reader = SourceReader{};
auto && o5mReader = ProcessorOsmElementsFromO5M(reader);
return BuildIntermediateDataFromO5M(o5mReader, cache, towns, false /* concurrent */);
}
LOG_SHORT(LINFO, ("Reading OSM data from", filename));
auto && stream = std::ifstream{filename, std::ios::binary};
auto && reader = SourceReader(stream);
auto sourceMap = boost::iostreams::mapped_file_source{filename};
if (!sourceMap.is_open())
MYTHROW(Writer::OpenException, ("Failed to open", filename));
BuildIntermediateDataFromO5M(reader, cache, towns);
constexpr size_t chunkSize = 10'000;
std::vector<std::thread> threads;
for (unsigned int i = 0; i < std::max(threadsCount, 1u); ++i)
{
threads.emplace_back([&sourceMap, &cache, &towns, threadsCount, i] {
namespace io = boost::iostreams;
auto && sourceArray = io::array_source{sourceMap.data(), sourceMap.size()};
auto && stream = io::stream<io::array_source>{sourceArray, std::ios::binary};
auto && reader = SourceReader(stream);
auto && o5mReader = ProcessorOsmElementsFromO5M(reader, threadsCount, i, chunkSize);
BuildIntermediateDataFromO5M(o5mReader, cache, towns, threadsCount > 1);
});
}
for (auto & thread : threads)
thread.join();
}
void ProcessOsmElementsFromO5M(SourceReader & stream, function<void(OsmElement &&)> processor)
@ -193,7 +284,7 @@ ProcessorOsmElementsFromO5M::ProcessorOsmElementsFromO5M(
: m_stream(stream)
, m_dataset([&](uint8_t * buffer, size_t size) {
return m_stream.Read(reinterpret_cast<char *>(buffer), size);
})
}, 1024 * 1024)
, m_taskCount{taskCount}
, m_taskId{taskId}
, m_chunkSize{chunkSize}
@ -319,7 +410,7 @@ bool GenerateIntermediateData(feature::GenerateInfo const & info)
BuildIntermediateDataFromXML(info.m_osmFileName, cache, towns);
break;
case feature::GenerateInfo::OsmSourceType::O5M:
BuildIntermediateDataFromO5M(info.m_osmFileName, cache, towns);
BuildIntermediateDataFromO5M(info.m_osmFileName, cache, towns, info.m_threadsCount);
break;
}

View file

@ -58,7 +58,7 @@ void TownsDumper::FilterTowns()
LOG(LINFO, ("Preprocessing finished. Have", m_records.size(), "towns."));
}
void TownsDumper::CheckElement(OsmElement const & em)
void TownsDumper::CheckElement(OsmElement const & em, bool /* concurrent */)
{
if (em.m_type != OsmElement::EntityType::Node)
return;
@ -96,7 +96,10 @@ void TownsDumper::CheckElement(OsmElement const & em)
capital = false;
if (town || capital)
{
std::lock_guard<std::mutex> lock{m_updateMutex};
m_records.emplace_back(em.m_lat, em.m_lon, em.m_id, capital, population);
}
}
void TownsDumper::Dump(std::string const & filePath)

View file

@ -8,6 +8,7 @@
#include "base/string_utils.hpp"
#include <mutex>
#include <string>
#include <vector>
@ -16,7 +17,7 @@ class TownsDumper
public:
TownsDumper();
void CheckElement(OsmElement const & em);
void CheckElement(OsmElement const & em, bool concurrent);
void Dump(std::string const & filePath);
@ -43,4 +44,5 @@ private:
};
std::vector<Town> m_records;
std::mutex m_updateMutex;
};