diff --git a/generator/feature_builder.cpp b/generator/feature_builder.cpp index 12bece1..ee366d8 100644 --- a/generator/feature_builder.cpp +++ b/generator/feature_builder.cpp @@ -18,6 +18,8 @@ #include #include +#include + using namespace std; namespace @@ -785,4 +787,18 @@ TypeSerializationVersion const MinSize::kSerializationVersion; // static TypeSerializationVersion const MaxAccuracy::kSerializationVersion; } // namespace serialization_policy + +// FeaturesFileMmap -------------------------------------------------------------------------------- +FeaturesFileMmap::FeaturesFileMmap(std::string const & filename) + : m_fileMmap{filename} +{ + if (!m_fileMmap.is_open()) + MYTHROW(Writer::OpenException, ("Failed to open", filename)); + + // Try aggressively (MADV_WILLNEED) and asynchronously read ahead the feature-file. + auto readaheadTask = std::thread([data = m_fileMmap.data(), size = m_fileMmap.size()] { + ::madvise(const_cast(data), size, MADV_WILLNEED); + }); + readaheadTask.detach(); +} } // namespace feature diff --git a/generator/feature_builder.hpp b/generator/feature_builder.hpp index 8139c1b..c4d9c89 100644 --- a/generator/feature_builder.hpp +++ b/generator/feature_builder.hpp @@ -17,6 +17,8 @@ #include #include +#include + namespace serial { class GeometryCodingParams; @@ -289,6 +291,7 @@ struct MaxAccuracy }; } // namespace serialization_policy +// Features file processing ------------------------------------------------------------------------ // Read feature from feature source. template void ReadFromSourceRawFormat(Source & src, FeatureBuilder & fb) @@ -299,65 +302,83 @@ void ReadFromSourceRawFormat(Source & src, FeatureBuilder & fb) SerializationPolicy::Deserialize(fb, buffer); } +class FeaturesFileMmap +{ +public: + class const_iterator; + + FeaturesFileMmap(std::string const & filename); + + template + void ForEachTaskChunk(unsigned int taskIndex, unsigned int tasksCount, size_t chunkSize, + Handler && handler) const + { + auto && reader = + MemReaderTemplate{m_fileMmap.data(), m_fileMmap.size()}; + auto && src = ReaderSource>{reader}; + + auto && buffer = FeatureBuilder::Buffer{}; + auto && fileSize = reader.Size(); + + for (size_t featuresCounter = 0; src.Pos() < fileSize; ++featuresCounter) + { + auto const featurePos = src.Pos(); + + uint32_t const featureSize = ReadVarUint(src); + + auto const featureChunkIndex = featuresCounter / chunkSize; + auto const featureTaskIndex = featureChunkIndex % tasksCount; + if (featureTaskIndex != taskIndex) + { + src.Skip(featureSize); + continue; + } + + buffer.resize(featureSize); + src.Read(buffer.data(), featureSize); + + auto && fb = FeatureBuilder{}; + SerializationPolicy::Deserialize(fb, buffer); + + handler(fb, featurePos); + } + } + +private: + boost::iostreams::mapped_file_source m_fileMmap; +}; + // Process features in .dat file. template void ForEachFromDatRawFormat(std::string const & filename, ToDo && toDo) { - FileReader reader(filename); - ReaderSource src(reader); - auto const fileSize = reader.Size(); - auto currPos = src.Pos(); - // read features one by one - while (currPos < fileSize) - { - FeatureBuilder fb; - ReadFromSourceRawFormat(src, fb); - toDo(fb, currPos); - currPos = src.Pos(); - } + auto && featuresMmap = FeaturesFileMmap{filename}; + featuresMmap.ForEachTaskChunk( + 0 /* taskIndex */, 1 /* taskCount*/, 1 /* chunkSize */, std::forward(toDo)); } /// Parallel process features in .dat file. template void ForEachParallelFromDatRawFormat(unsigned int threadsCount, std::string const & filename, - ToDo && toDo) + ToDo && toDo, uint64_t chunkSize = 1'000) { CHECK_GREATER_OR_EQUAL(threadsCount, 1, ()); - if (threadsCount == 1) + if (threadsCount == 0 || threadsCount == 1) return ForEachFromDatRawFormat(filename, std::forward(toDo)); - FileReader reader(filename); - ReaderSource src(reader); - auto const fileSize = reader.Size(); - auto currPos = src.Pos(); - std::mutex readMutex; - auto concurrentProcessor = [&] { - for (;;) - { - FeatureBuilder fb; - uint64_t featurePos; + auto && featuresMmap = FeaturesFileMmap{filename}; + auto && threads = std::vector{}; + for (unsigned int i = 0; i < threadsCount; ++i) + { + threads.emplace_back([i, threadsCount, chunkSize, &featuresMmap, toDo] { + featuresMmap.ForEachTaskChunk(i, threadsCount, chunkSize, toDo); + }); + } - { - std::lock_guard lock(readMutex); - - if (fileSize <= currPos) - break; - - ReadFromSourceRawFormat(src, fb); - featurePos = currPos; - currPos = src.Pos(); - } - - toDo(fb, featurePos); - } - }; - - std::vector workers; - for (size_t i = 0; i < threadsCount; ++i) - workers.emplace_back(concurrentProcessor); - for (auto & thread : workers) + for (auto & thread : threads) thread.join(); } + template std::vector ReadAllDatRawFormat(std::string const & fileName) { diff --git a/generator/feature_generator.cpp b/generator/feature_generator.cpp index 9353106..fade4b0 100644 --- a/generator/feature_generator.cpp +++ b/generator/feature_generator.cpp @@ -95,38 +95,6 @@ uint32_t FeaturesCollector::Collect(FeatureBuilder const & fb) return featureId; } -FeaturesAndRawGeometryCollector::FeaturesAndRawGeometryCollector(std::string const & featuresFileName, - std::string const & rawGeometryFileName) - : FeaturesCollector(featuresFileName), m_rawGeometryFileStream(rawGeometryFileName) {} - -FeaturesAndRawGeometryCollector::~FeaturesAndRawGeometryCollector() -{ - uint64_t terminator = 0; - m_rawGeometryFileStream.Write(&terminator, sizeof(terminator)); - LOG(LINFO, ("Write", m_rawGeometryCounter, "geometries into", m_rawGeometryFileStream.GetName())); -} - -uint32_t FeaturesAndRawGeometryCollector::Collect(FeatureBuilder const & fb) -{ - uint32_t const featureId = FeaturesCollector::Collect(fb); - FeatureBuilder::Geometry const & geom = fb.GetGeometry(); - if (geom.empty()) - return featureId; - - ++m_rawGeometryCounter; - - uint64_t numGeometries = geom.size(); - m_rawGeometryFileStream.Write(&numGeometries, sizeof(numGeometries)); - for (FeatureBuilder::PointSeq const & points : geom) - { - uint64_t numPoints = points.size(); - m_rawGeometryFileStream.Write(&numPoints, sizeof(numPoints)); - m_rawGeometryFileStream.Write(points.data(), - sizeof(FeatureBuilder::PointSeq::value_type) * points.size()); - } - return featureId; -} - uint32_t CheckedFilePosCast(FileWriter const & f) { uint64_t pos = f.Pos(); diff --git a/generator/feature_generator.hpp b/generator/feature_generator.hpp index 2dddf74..9f1c573 100644 --- a/generator/feature_generator.hpp +++ b/generator/feature_generator.hpp @@ -55,18 +55,5 @@ private: uint32_t m_featureID = 0; }; -class FeaturesAndRawGeometryCollector : public FeaturesCollector -{ - FileWriter m_rawGeometryFileStream; - size_t m_rawGeometryCounter = 0; - -public: - FeaturesAndRawGeometryCollector(std::string const & featuresFileName, - std::string const & rawGeometryFileName); - ~FeaturesAndRawGeometryCollector() override; - - uint32_t Collect(FeatureBuilder const & f) override; -}; - uint32_t CheckedFilePosCast(FileWriter const & f); } // namespace feature