[generator:preprocess] Optimize node file write: async write by mmap()

This commit is contained in:
Anatoly Serdtcev 2019-11-04 09:44:05 +03:00 committed by Sergey Yershov
parent a88f11131f
commit e8c8e74f7d
2 changed files with 31 additions and 10 deletions

View file

@ -170,7 +170,7 @@ void TestIntermediateDataGeneration(
auto const & osmFileData = sample.second;
// Skip test for node storage type "mem": 64Gb required.
for (auto const & nodeStorageType : {"raw"s, "map"s})
for (auto const & nodeStorageType : {"raw"s, "map"s, "mem"s})
{
for (auto threadsCount : {1, 2, 4})
{

View file

@ -6,6 +6,9 @@
#include <string>
#include <boost/filesystem.hpp>
#include <boost/iostreams/device/mapped_file.hpp>
#include <sys/mman.h>
#include "base/assert.hpp"
#include "base/checked_cast.hpp"
@ -153,33 +156,52 @@ private:
class RawMemPointStorageWriter : public PointStorageWriterInterface
{
public:
explicit RawMemPointStorageWriter(string const & name) :
m_fileWriter(name),
m_data(kMaxNodesInOSM)
explicit RawMemPointStorageWriter(string const & name)
{
auto fileParams = boost::iostreams::mapped_file_params{name};
fileParams.flags = boost::iostreams::mapped_file_sink::readwrite;
fileParams.new_file_size = kMaxNodesInOSM * sizeof(LatLon);
m_fileMap.open(fileParams);
if (!m_fileMap.is_open())
MYTHROW(Writer::OpenException, ("Failed to open", name));
// File (mapping pages) are updated sequentially by ascending node's ids.
// Advice to flush dirty pages after update sequentially for consecutive writing to disk.
// See https://stackoverflow.com/questions/5902629/mmap-msync-and-linux-process-termination.
::madvise(const_cast<char*>(m_fileMap.data()), m_fileMap.size(), MADV_SEQUENTIAL);
}
~RawMemPointStorageWriter()
{
m_fileWriter.Write(m_data.data(), m_data.size() * sizeof(LatLon));
// Mark dirty pages to be flushed asynchronously, no wait system buffers flushing to disk.
// Subsequent read() will retrive updated data from flushed/unflushed cache or from a disk.
::msync(m_fileMap.data(), m_fileMap.size(), MS_ASYNC);
}
// PointStorageWriterInterface overrides:
void AddPoint(uint64_t id, double lat, double lon) override
{
CHECK_LESS(id, m_data.size(),
CHECK_LESS(id, m_fileMap.size() / sizeof(LatLon),
("Found node with id", id, "which is bigger than the allocated cache size"));
LatLon & ll = m_data[id];
auto & ll = reinterpret_cast<LatLon*>(m_fileMap.data())[id];
ToLatLon(lat, lon, ll);
m_numProcessedPoints.fetch_add(1, std::memory_order_relaxed);
}
void AddPoints(Nodes const & nodes, bool /* concurrent */) override
{
if (nodes.empty())
return;
// Check only last point (bigest id in nodes).
CHECK_LESS(nodes.back().first, m_fileMap.size() / sizeof(LatLon),
("Found node with id", nodes.back().first,
"which is bigger than the allocated cache size"));
auto const data = reinterpret_cast<LatLon*>(m_fileMap.data());
for (auto const & node : nodes)
{
LatLon & ll = m_data[node.first];
LatLon & ll = data[node.first];
ToLatLon(node.second.m_lat, node.second.m_lon, ll);
}
@ -188,8 +210,7 @@ public:
uint64_t GetNumProcessedPoints() const override { return m_numProcessedPoints; }
private:
FileWriter m_fileWriter;
vector<LatLon> m_data;
boost::iostreams::mapped_file_sink m_fileMap;
std::atomic<uint64_t> m_numProcessedPoints{0};
};