[generator] Speedup generation: features file parallel reading

This commit is contained in:
Anatoly Serdtcev 2019-11-10 15:58:14 +03:00 committed by LaGrunge
parent 6d388bc614
commit 152eb7bd1d
4 changed files with 80 additions and 88 deletions

View file

@ -18,6 +18,8 @@
#include <cstring>
#include <vector>
#include <sys/mman.h>
using namespace std;
namespace
@ -785,4 +787,18 @@ TypeSerializationVersion const MinSize::kSerializationVersion;
// static
TypeSerializationVersion const MaxAccuracy::kSerializationVersion;
} // namespace serialization_policy
// FeaturesFileMmap --------------------------------------------------------------------------------
FeaturesFileMmap::FeaturesFileMmap(std::string const & filename)
: m_fileMmap{filename}
{
if (!m_fileMmap.is_open())
MYTHROW(Writer::OpenException, ("Failed to open", filename));
// Try aggressively (MADV_WILLNEED) and asynchronously read ahead the feature-file.
auto readaheadTask = std::thread([data = m_fileMmap.data(), size = m_fileMmap.size()] {
::madvise(const_cast<char*>(data), size, MADV_WILLNEED);
});
readaheadTask.detach();
}
} // namespace feature

View file

@ -17,6 +17,8 @@
#include <thread>
#include <vector>
#include <boost/iostreams/device/mapped_file.hpp>
namespace serial
{
class GeometryCodingParams;
@ -289,6 +291,7 @@ struct MaxAccuracy
};
} // namespace serialization_policy
// Features file processing ------------------------------------------------------------------------
// Read feature from feature source.
template <class SerializationPolicy = serialization_policy::MinSize, class Source>
void ReadFromSourceRawFormat(Source & src, FeatureBuilder & fb)
@ -299,65 +302,83 @@ void ReadFromSourceRawFormat(Source & src, FeatureBuilder & fb)
SerializationPolicy::Deserialize(fb, buffer);
}
class FeaturesFileMmap
{
public:
class const_iterator;
FeaturesFileMmap(std::string const & filename);
template <typename SerializationPolicy, typename Handler>
void ForEachTaskChunk(unsigned int taskIndex, unsigned int tasksCount, size_t chunkSize,
Handler && handler) const
{
auto && reader =
MemReaderTemplate<true /* WithExceptions */>{m_fileMmap.data(), m_fileMmap.size()};
auto && src = ReaderSource<MemReaderTemplate<true>>{reader};
auto && buffer = FeatureBuilder::Buffer{};
auto && fileSize = reader.Size();
for (size_t featuresCounter = 0; src.Pos() < fileSize; ++featuresCounter)
{
auto const featurePos = src.Pos();
uint32_t const featureSize = ReadVarUint<uint32_t>(src);
auto const featureChunkIndex = featuresCounter / chunkSize;
auto const featureTaskIndex = featureChunkIndex % tasksCount;
if (featureTaskIndex != taskIndex)
{
src.Skip(featureSize);
continue;
}
buffer.resize(featureSize);
src.Read(buffer.data(), featureSize);
auto && fb = FeatureBuilder{};
SerializationPolicy::Deserialize(fb, buffer);
handler(fb, featurePos);
}
}
private:
boost::iostreams::mapped_file_source m_fileMmap;
};
// Process features in .dat file.
template <class SerializationPolicy = serialization_policy::MinSize, class ToDo>
void ForEachFromDatRawFormat(std::string const & filename, ToDo && toDo)
{
FileReader reader(filename);
ReaderSource<FileReader> src(reader);
auto const fileSize = reader.Size();
auto currPos = src.Pos();
// read features one by one
while (currPos < fileSize)
{
FeatureBuilder fb;
ReadFromSourceRawFormat<SerializationPolicy>(src, fb);
toDo(fb, currPos);
currPos = src.Pos();
}
auto && featuresMmap = FeaturesFileMmap{filename};
featuresMmap.ForEachTaskChunk<SerializationPolicy>(
0 /* taskIndex */, 1 /* taskCount*/, 1 /* chunkSize */, std::forward<ToDo>(toDo));
}
/// Parallel process features in .dat file.
template <class SerializationPolicy = serialization_policy::MinSize, class ToDo>
void ForEachParallelFromDatRawFormat(unsigned int threadsCount, std::string const & filename,
ToDo && toDo)
ToDo && toDo, uint64_t chunkSize = 1'000)
{
CHECK_GREATER_OR_EQUAL(threadsCount, 1, ());
if (threadsCount == 1)
if (threadsCount == 0 || threadsCount == 1)
return ForEachFromDatRawFormat(filename, std::forward<ToDo>(toDo));
FileReader reader(filename);
ReaderSource<FileReader> src(reader);
auto const fileSize = reader.Size();
auto currPos = src.Pos();
std::mutex readMutex;
auto concurrentProcessor = [&] {
for (;;)
{
FeatureBuilder fb;
uint64_t featurePos;
auto && featuresMmap = FeaturesFileMmap{filename};
auto && threads = std::vector<std::thread>{};
for (unsigned int i = 0; i < threadsCount; ++i)
{
threads.emplace_back([i, threadsCount, chunkSize, &featuresMmap, toDo] {
featuresMmap.ForEachTaskChunk<SerializationPolicy>(i, threadsCount, chunkSize, toDo);
});
}
{
std::lock_guard<std::mutex> lock(readMutex);
if (fileSize <= currPos)
break;
ReadFromSourceRawFormat<SerializationPolicy>(src, fb);
featurePos = currPos;
currPos = src.Pos();
}
toDo(fb, featurePos);
}
};
std::vector<std::thread> workers;
for (size_t i = 0; i < threadsCount; ++i)
workers.emplace_back(concurrentProcessor);
for (auto & thread : workers)
for (auto & thread : threads)
thread.join();
}
template <class SerializationPolicy = serialization_policy::MinSize>
std::vector<FeatureBuilder> ReadAllDatRawFormat(std::string const & fileName)
{

View file

@ -95,38 +95,6 @@ uint32_t FeaturesCollector::Collect(FeatureBuilder const & fb)
return featureId;
}
FeaturesAndRawGeometryCollector::FeaturesAndRawGeometryCollector(std::string const & featuresFileName,
std::string const & rawGeometryFileName)
: FeaturesCollector(featuresFileName), m_rawGeometryFileStream(rawGeometryFileName) {}
FeaturesAndRawGeometryCollector::~FeaturesAndRawGeometryCollector()
{
uint64_t terminator = 0;
m_rawGeometryFileStream.Write(&terminator, sizeof(terminator));
LOG(LINFO, ("Write", m_rawGeometryCounter, "geometries into", m_rawGeometryFileStream.GetName()));
}
uint32_t FeaturesAndRawGeometryCollector::Collect(FeatureBuilder const & fb)
{
uint32_t const featureId = FeaturesCollector::Collect(fb);
FeatureBuilder::Geometry const & geom = fb.GetGeometry();
if (geom.empty())
return featureId;
++m_rawGeometryCounter;
uint64_t numGeometries = geom.size();
m_rawGeometryFileStream.Write(&numGeometries, sizeof(numGeometries));
for (FeatureBuilder::PointSeq const & points : geom)
{
uint64_t numPoints = points.size();
m_rawGeometryFileStream.Write(&numPoints, sizeof(numPoints));
m_rawGeometryFileStream.Write(points.data(),
sizeof(FeatureBuilder::PointSeq::value_type) * points.size());
}
return featureId;
}
uint32_t CheckedFilePosCast(FileWriter const & f)
{
uint64_t pos = f.Pos();

View file

@ -55,18 +55,5 @@ private:
uint32_t m_featureID = 0;
};
class FeaturesAndRawGeometryCollector : public FeaturesCollector
{
FileWriter m_rawGeometryFileStream;
size_t m_rawGeometryCounter = 0;
public:
FeaturesAndRawGeometryCollector(std::string const & featuresFileName,
std::string const & rawGeometryFileName);
~FeaturesAndRawGeometryCollector() override;
uint32_t Collect(FeatureBuilder const & f) override;
};
uint32_t CheckedFilePosCast(FileWriter const & f);
} // namespace feature