diff --git a/generator/generator_tests/intermediate_data_test.cpp b/generator/generator_tests/intermediate_data_test.cpp
index 878e9c9..dfbbfc0 100644
--- a/generator/generator_tests/intermediate_data_test.cpp
+++ b/generator/generator_tests/intermediate_data_test.cpp
@@ -170,7 +170,7 @@ void TestIntermediateDataGeneration(
   auto const & osmFileData = sample.second;
 
   // Skip test for node storage type "mem": 64Gb required.
-  for (auto const & nodeStorageType : {"raw"s, "map"s})
+  for (auto const & nodeStorageType : {"raw"s, "map"s, "mem"s})
   {
     for (auto threadsCount : {1, 2, 4})
     {
diff --git a/generator/intermediate_data.cpp b/generator/intermediate_data.cpp
index 0f5d395..85b5bee 100644
--- a/generator/intermediate_data.cpp
+++ b/generator/intermediate_data.cpp
@@ -6,6 +6,9 @@
 #include
 #include
+#include <sys/mman.h>
+
+#include <boost/iostreams/device/mapped_file.hpp>
 
 #include "base/assert.hpp"
 #include "base/checked_cast.hpp"
 #include "base/logging.hpp"
@@ -153,33 +156,52 @@ private:
 class RawMemPointStorageWriter : public PointStorageWriterInterface
 {
 public:
-  explicit RawMemPointStorageWriter(string const & name) :
-    m_fileWriter(name),
-    m_data(kMaxNodesInOSM)
+  explicit RawMemPointStorageWriter(string const & name)
   {
+    auto fileParams = boost::iostreams::mapped_file_params{name};
+    fileParams.flags = boost::iostreams::mapped_file_sink::readwrite;
+    fileParams.new_file_size = kMaxNodesInOSM * sizeof(LatLon);
+    m_fileMap.open(fileParams);
+    if (!m_fileMap.is_open())
+      MYTHROW(Writer::OpenException, ("Failed to open", name));
+
+    // The file's mapped pages are updated sequentially, in ascending order of node ids.
+    // Advise the kernel to flush dirty pages sequentially after updates, keeping disk writes consecutive.
+    // See https://stackoverflow.com/questions/5902629/mmap-msync-and-linux-process-termination.
+    ::madvise(const_cast<char *>(m_fileMap.data()), m_fileMap.size(), MADV_SEQUENTIAL);
   }
 
   ~RawMemPointStorageWriter()
   {
-    m_fileWriter.Write(m_data.data(), m_data.size() * sizeof(LatLon));
+    // Schedule dirty pages to be flushed asynchronously, without waiting for buffers to hit disk.
+    // A subsequent read() sees the updated data via the page cache, whether flushed yet or not.
+    ::msync(m_fileMap.data(), m_fileMap.size(), MS_ASYNC);
   }
 
   // PointStorageWriterInterface overrides:
   void AddPoint(uint64_t id, double lat, double lon) override
   {
-    CHECK_LESS(id, m_data.size(),
+    CHECK_LESS(id, m_fileMap.size() / sizeof(LatLon),
               ("Found node with id", id, "which is bigger than the allocated cache size"));
 
-    LatLon & ll = m_data[id];
+    auto & ll = reinterpret_cast<LatLon *>(m_fileMap.data())[id];
     ToLatLon(lat, lon, ll);
 
     m_numProcessedPoints.fetch_add(1, std::memory_order_relaxed);
   }
 
   void AddPoints(Nodes const & nodes, bool /* concurrent */) override
   {
+    if (nodes.empty())
+      return;
+    // Check only the last point (the biggest id in nodes).
+    CHECK_LESS(nodes.back().first, m_fileMap.size() / sizeof(LatLon),
+               ("Found node with id", nodes.back().first,
+                "which is bigger than the allocated cache size"));
+
+    auto const data = reinterpret_cast<LatLon *>(m_fileMap.data());
     for (auto const & node : nodes)
     {
-      LatLon & ll = m_data[node.first];
+      LatLon & ll = data[node.first];
       ToLatLon(node.second.m_lat, node.second.m_lon, ll);
     }
@@ -188,8 +210,7 @@ public:
   uint64_t GetNumProcessedPoints() const override { return m_numProcessedPoints; }
 
 private:
-  FileWriter m_fileWriter;
-  vector<LatLon> m_data;
+  boost::iostreams::mapped_file_sink m_fileMap;
   std::atomic<uint64_t> m_numProcessedPoints{0};
 };
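
For reference, the pattern the new writer relies on can be reduced to the following standalone sketch, assuming Boost.Iostreams and POSIX <sys/mman.h>. The file name "points.bin", the LatLon layout and kMaxNodes below are illustrative placeholders, not the generator's real definitions.

// Sketch only: points.bin, LatLon and kMaxNodes are placeholders.
#include <sys/mman.h>

#include <cstdint>
#include <iostream>
#include <string>

#include <boost/iostreams/device/mapped_file.hpp>

struct LatLon
{
  int32_t m_lat;
  int32_t m_lon;
};

int main()
{
  uint64_t constexpr kMaxNodes = 1'000'000;  // Stand-in for kMaxNodesInOSM.

  // Create (or resize) the backing file and map it for writing in one step.
  boost::iostreams::mapped_file_params params{std::string("points.bin")};
  params.flags = boost::iostreams::mapped_file_sink::readwrite;
  params.new_file_size = kMaxNodes * sizeof(LatLon);

  boost::iostreams::mapped_file_sink fileMap;
  fileMap.open(params);
  if (!fileMap.is_open())
  {
    std::cerr << "Failed to map points.bin\n";
    return 1;
  }

  // Hint the kernel that the mapping is accessed sequentially (ascending node ids).
  ::madvise(fileMap.data(), fileMap.size(), MADV_SEQUENTIAL);

  // The mapping is treated as a plain array of fixed-size records indexed by node id.
  auto * points = reinterpret_cast<LatLon *>(fileMap.data());
  points[42] = {557558000, 375617000};     // Node 42.
  points[1000] = {407112800, -740060000};  // Node 1000.

  // Ask the kernel to flush dirty pages in the background; do not block on completion.
  ::msync(fileMap.data(), fileMap.size(), MS_ASYNC);

  fileMap.close();  // Unmaps; the OS finishes the write-back.
  return 0;
}

MS_ASYNC only schedules the write-back, so closing a mapping of tens of gigabytes does not block the writer; readers that open the file afterwards still see the latest data through the page cache.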