[generator:geo_objects] Add parallel KV write

This commit is contained in:
Anatoly Serdtcev 2019-12-09 13:05:46 +03:00 committed by Sergey Yershov
parent b09d2df50e
commit 33467bf76b
13 changed files with 347 additions and 134 deletions

View file

@ -55,6 +55,8 @@ set(
intermediate_data.cpp
intermediate_data.hpp
intermediate_elements.hpp
key_value_concurrent_writer.cpp
key_value_concurrent_writer.hpp
key_value_storage.cpp
key_value_storage.hpp
locality_sorter.cpp

View file

@ -349,29 +349,32 @@ private:
};
// Process features in .dat file.
template <class SerializationPolicy = serialization_policy::MinSize, class ToDo>
void ForEachFromDatRawFormat(std::string const & filename, ToDo && toDo)
template <class SerializationPolicy = serialization_policy::MinSize, class Handler>
void ForEachFromDatRawFormat(std::string const & filename, Handler && handler)
{
auto && featuresMmap = FeaturesFileMmap{filename};
featuresMmap.ForEachTaskChunk<SerializationPolicy>(
0 /* taskIndex */, 1 /* taskCount*/, 1 /* chunkSize */, std::forward<ToDo>(toDo));
0 /* taskIndex */, 1 /* taskCount*/, 1 /* chunkSize */, std::forward<Handler>(handler));
}
/// Parallel process features in .dat file.
template <class SerializationPolicy = serialization_policy::MinSize, class ToDo>
void ForEachParallelFromDatRawFormat(unsigned int threadsCount, std::string const & filename,
ToDo && toDo, uint64_t chunkSize = 1'000)
// Parallel process features in .dat file.
template <class SerializationPolicy = serialization_policy::MinSize, class ProcessorMaker>
void ProcessParallelFromDatRawFormat(unsigned int threadsCount, uint64_t chunkSize,
std::string const & filename,
ProcessorMaker && processorMaker)
{
CHECK_GREATER_OR_EQUAL(threadsCount, 1, ());
if (threadsCount == 0 || threadsCount == 1)
return ForEachFromDatRawFormat(filename, std::forward<ToDo>(toDo));
return ForEachFromDatRawFormat<SerializationPolicy>(filename, processorMaker());
auto && featuresMmap = FeaturesFileMmap{filename};
auto && threads = std::vector<std::thread>{};
for (unsigned int i = 0; i < threadsCount; ++i)
{
threads.emplace_back([i, threadsCount, chunkSize, &featuresMmap, toDo] {
featuresMmap.ForEachTaskChunk<SerializationPolicy>(i, threadsCount, chunkSize, toDo);
auto && processor = processorMaker();
threads.emplace_back([i, threadsCount, chunkSize, &featuresMmap,
processor = std::move(processor)]() mutable {
featuresMmap.ForEachTaskChunk<SerializationPolicy>(i, threadsCount, chunkSize, processor);
});
}
@ -379,6 +382,24 @@ void ForEachParallelFromDatRawFormat(unsigned int threadsCount, std::string cons
thread.join();
}
// Convenience overload: processes features of a .dat file in parallel, handing the features
// to the workers in chunks of 1'000 items. |processorMaker| is forwarded unchanged to the
// overload that takes an explicit chunk size.
template <class SerializationPolicy = serialization_policy::MinSize, class ProcessorMaker>
void ProcessParallelFromDatRawFormat(unsigned int threadsCount, std::string const & filename,
                                     ProcessorMaker && processorMaker)
{
  uint64_t const chunkSize = 1'000;
  ProcessParallelFromDatRawFormat<SerializationPolicy>(
      threadsCount, chunkSize, filename, std::forward<ProcessorMaker>(processorMaker));
}
// Processes features of a .dat file in parallel, applying a copy of |handler| to the features.
//
// |threadsCount| is the number of workers, |filename| is the path of the .dat file and
// |handler| is the callable applied to each feature. |handler| must be copy-constructible:
// every worker operates on its own copy, so any state stored inside the handler by value is
// not shared between workers.
template <class SerializationPolicy = serialization_policy::MinSize, class Handler>
void ForEachParallelFromDatRawFormat(unsigned int threadsCount, std::string const & filename,
                                     Handler && handler)
{
  // The maker may be invoked several times (once per worker), so it must hand out copies.
  // Forwarding |handler| here would move from an rvalue handler on the first invocation and
  // give every following worker a moved-from callable.
  ProcessParallelFromDatRawFormat<SerializationPolicy>(threadsCount, filename,
                                                       [&handler] { return handler; });
}
template <class SerializationPolicy = serialization_policy::MinSize>
std::vector<FeatureBuilder> ReadAllDatRawFormat(std::string const & fileName)
{

View file

@ -153,7 +153,6 @@ void TestFindReverse(std::vector<OsmElementData> const & osmElements,
TestRegionAddress(json.get());
TEST(JsonHasBuilding(JsonValue{std::move(json)}), ("No address for", id));
}
geoObjectsGenerator->GetMaintainer().Flush();
KeyValueStorage kvStorage{geoObjectsKeyValue.GetFullPath(), 0 /*cacheValuesCountLimit*/};
@ -242,8 +241,6 @@ void TestPoiHasAddress(std::vector<OsmElementData> const & osmElements)
std::unique_ptr<GeoObjectsGenerator> geoObjectsGenerator = {
TearUp(osmElements, geoObjectsFeatures, idsWithoutAddresses, geoObjectsKeyValue)};
geoObjectsGenerator->GetMaintainer().Flush();
KeyValueStorage kvStorage{geoObjectsKeyValue.GetFullPath(), 0 /*cacheValuesCountLimit*/};
for (GeoObjectId id : expectedIds)

View file

@ -1,6 +1,7 @@
#include "generator/data_version.hpp"
#include "generator/feature_builder.hpp"
#include "generator/feature_generator.hpp"
#include "generator/key_value_concurrent_writer.hpp"
#include "generator/key_value_storage.hpp"
#include "generator/locality_sorter.hpp"
@ -22,14 +23,16 @@
#include "base/geo_object_id.hpp"
#include <boost/optional.hpp>
#include "3party/jansson/myjansson.hpp"
#include <cstdint>
#include <fstream>
#include <functional>
#include <mutex>
#include <vector>
#include <boost/filesystem.hpp>
#include <boost/optional.hpp>
using namespace feature;
@ -37,6 +40,165 @@ namespace generator
{
namespace geo_objects
{
// BufferedCuncurrentUnorderedMapUpdater -----------------------------------------------------------
// Buffered writer in front of a std::unordered_map that is shared between several writers.
//
// Values are collected in a private buffer and moved into the shared map in batches while
// |mapMutex| is held, so the mutex is taken once per batch rather than once per value.
// Insertion uses std::unordered_map::insert, so a value whose key is already present in the
// map is dropped. The remaining buffered values are flushed (blocking) on destruction.
//
// An instance itself is not synchronized: only the access to the shared map is protected.
template <typename Key, typename Value>
class BufferedCuncurrentUnorderedMapUpdater
{
public:
  // Buffer size starting from which every Emplace() tries to flush without blocking.
  static constexpr size_t kValuesBufferSize{10'000};
  // Max size for try-lock flushing into target: starting from this size the flush blocks
  // on the mutex.
  static constexpr size_t kValuesBufferSizeMax{100'000};

  BufferedCuncurrentUnorderedMapUpdater(std::unordered_map<Key, Value> & map,
                                        std::mutex & mapMutex)
    : m_map{map}, m_mapMutex{mapMutex}
  {
  }

  // Takes over the buffered values of |other|. The buffer of |other| is explicitly emptied,
  // so the moved-from object has nothing left to flush in its destructor.
  BufferedCuncurrentUnorderedMapUpdater(BufferedCuncurrentUnorderedMapUpdater && other) noexcept
    : m_map{other.m_map}
    , m_mapMutex{other.m_mapMutex}
    , m_valuesBuffer(std::move(other.m_valuesBuffer))
  {
    other.m_valuesBuffer.clear();
  }

  // The reference members cannot be rebound, so move assignment is not available. A defaulted
  // declaration would be defined as deleted anyway; spell it out instead.
  BufferedCuncurrentUnorderedMapUpdater & operator=(BufferedCuncurrentUnorderedMapUpdater &&) =
      delete;

  ~BufferedCuncurrentUnorderedMapUpdater()
  {
    if (!m_valuesBuffer.empty())
      FlushBuffer(true /* force */);
  }

  // Constructs a map value (key-value pair) from |args| in the buffer. Once the buffer holds
  // |kValuesBufferSize| values a flush is attempted; the attempt becomes blocking when the
  // buffer has grown to |kValuesBufferSizeMax| values.
  template <typename... Args>
  void Emplace(Args &&... args)
  {
    m_valuesBuffer.emplace_back(std::forward<Args>(args)...);

    auto const buffered = m_valuesBuffer.size();
    if (buffered >= kValuesBufferSize)
      FlushBuffer(buffered >= kValuesBufferSizeMax /* force */);
  }

private:
  using MapValue = typename std::unordered_map<Key, Value>::value_type;

  // Moves all buffered values into the shared map under the mutex and empties the buffer.
  // With |force| the call waits for the mutex; otherwise it gives up immediately (keeping the
  // buffer intact) when the mutex is busy.
  void FlushBuffer(bool force)
  {
    {
      std::unique_lock<std::mutex> lock{m_mapMutex, std::defer_lock};
      if (force)
        lock.lock();
      else if (!lock.try_lock())
        return;

      for (auto & value : m_valuesBuffer)
        m_map.insert(std::move(value));
    }

    // The buffer is private to this instance, so it is cleared outside of the critical section.
    m_valuesBuffer.clear();
  }

  std::unordered_map<Key, Value> & m_map;
  std::mutex & m_mapMutex;
  std::vector<MapValue> m_valuesBuffer;
};
// BuildingsAndHousesGenerator ---------------------------------------------------------------------
class BuildingsAndHousesGenerator
{
public:
BuildingsAndHousesGenerator(BuildingsAndHousesGenerator const &) = delete;
BuildingsAndHousesGenerator & operator=(BuildingsAndHousesGenerator const &) = delete;
BuildingsAndHousesGenerator(
std::string const & geoObjectKeyValuePath, GeoObjectMaintainer & geoObjectMaintainer,
RegionInfoLocater const & regionInfoLocater)
: m_geoObjectKeyValuePath{geoObjectKeyValuePath}
, m_geoObjectMaintainer{geoObjectMaintainer}
, m_regionInfoLocater{regionInfoLocater}
{
}
void GenerateBuildingsAndHouses(
std::string const & geoObjectsTmpMwmPath, unsigned int threadsCount)
{
GeoId2GeoData geoId2GeoData;
uint64_t const fileSize = boost::filesystem::file_size(geoObjectsTmpMwmPath);
geoId2GeoData.reserve(std::min(uint64_t{500'000'000}, fileSize / 10));
std::mutex geoId2GeoDataMutex;
feature::ProcessParallelFromDatRawFormat(threadsCount, geoObjectsTmpMwmPath, [&] {
return Processor{*this, m_geoObjectKeyValuePath, geoId2GeoData, geoId2GeoDataMutex};
});
m_geoObjectMaintainer.SetGeoData(std::move(geoId2GeoData));
}
private:
using GeoObjectData = GeoObjectMaintainer::GeoObjectData;
using GeoId2GeoData = GeoObjectMaintainer::GeoId2GeoData;
class Processor
{
public:
Processor(BuildingsAndHousesGenerator & generator,
std::string const & geoObjectKeyValuePath,
GeoId2GeoData & geoId2GeoData, std::mutex & geoId2GeoDataMutex)
: m_generator{generator}
, m_kvWriter{geoObjectKeyValuePath}
, m_geoDataCache{geoId2GeoData, geoId2GeoDataMutex}
{
}
void operator()(FeatureBuilder & fb, uint64_t /* currPos */)
{
if (!GeoObjectsFilter::IsBuilding(fb) && !GeoObjectsFilter::HasHouse(fb))
return;
auto regionKeyValue = m_generator.m_regionInfoLocater(fb.GetKeyPoint());
if (!regionKeyValue)
return;
WriteIntoKv(fb, *regionKeyValue);
CacheGeoData(fb, *regionKeyValue);
}
private:
void WriteIntoKv(FeatureBuilder & fb, KeyValue const & regionKeyValue)
{
auto const id = fb.GetMostGenericOsmId();
auto jsonValue = AddAddress(fb.GetParams().GetStreet(), fb.GetParams().house.Get(),
fb.GetKeyPoint(), fb.GetMultilangName(), regionKeyValue);
m_kvWriter.Write(id, JsonValue{std::move(jsonValue)});
}
void CacheGeoData(FeatureBuilder & fb, KeyValue const & regionKeyValue)
{
auto const id = fb.GetMostGenericOsmId();
auto geoData = GeoObjectData{fb.GetParams().GetStreet(), fb.GetParams().house.Get(),
base::GeoObjectId(regionKeyValue.first)};
m_geoDataCache.Emplace(id, std::move(geoData));
}
BuildingsAndHousesGenerator & m_generator;
KeyValueConcurrentWriter m_kvWriter;
BufferedCuncurrentUnorderedMapUpdater<base::GeoObjectId, GeoObjectData> m_geoDataCache;
};
std::string m_geoObjectKeyValuePath;
GeoObjectMaintainer & m_geoObjectMaintainer;
RegionInfoLocater const & m_regionInfoLocater;
};
void AddBuildingsAndThingsWithHousesThenEnrichAllWithRegionAddresses(
std::string const & geoObjectKeyValuePath, GeoObjectMaintainer & geoObjectMaintainer,
std::string const & pathInGeoObjectsTmpMwm, RegionInfoLocater const & regionInfoLocater,
bool /*verbose*/, unsigned int threadsCount)
{
auto && generator =
BuildingsAndHousesGenerator{geoObjectKeyValuePath, geoObjectMaintainer, regionInfoLocater};
generator.GenerateBuildingsAndHouses(pathInGeoObjectsTmpMwm, threadsCount);
LOG(LINFO, ("Added", geoObjectMaintainer.Size(), "geo objects with addresses."));
}
namespace
{
NullBuildingsInfo GetHelpfulNullBuildings(GeoObjectMaintainer & geoObjectMaintainer,
@ -229,18 +391,6 @@ boost::optional<indexer::GeoObjectsIndex<IndexReader>> MakeTempGeoObjectsIndex(
return indexer::ReadIndex<indexer::GeoObjectsIndexBox<IndexReader>, MmapReader>(indexFile);
}
// Feeds every feature of |pathInGeoObjectsTmpMwm| to GeoObjectMaintainer::StoreAndEnrich
// using |threadsCount| workers and logs the resulting number of geo objects.
void AddBuildingsAndThingsWithHousesThenEnrichAllWithRegionAddresses(
    GeoObjectMaintainer & geoObjectMaintainer, std::string const & pathInGeoObjectsTmpMwm,
    bool /*verbose*/, unsigned int threadsCount)
{
  auto const storeAndEnrich = [&geoObjectMaintainer](FeatureBuilder & fb,
                                                     uint64_t /* currPos */) {
    geoObjectMaintainer.StoreAndEnrich(fb);
  };
  ForEachParallelFromDatRawFormat(threadsCount, pathInGeoObjectsTmpMwm, storeAndEnrich);

  LOG(LINFO, ("Added", geoObjectMaintainer.Size(), "geo objects with addresses."));
}
NullBuildingsInfo EnrichPointsWithOuterBuildingGeometry(GeoObjectMaintainer & geoObjectMaintainer,
std::string const & pathInGeoObjectsTmpMwm,
unsigned int threadsCount)
@ -265,11 +415,13 @@ NullBuildingsInfo EnrichPointsWithOuterBuildingGeometry(GeoObjectMaintainer & ge
void AddPoisEnrichedWithHouseAddresses(GeoObjectMaintainer & geoObjectMaintainer,
NullBuildingsInfo const & buildingsInfo,
std::string const & geoObjectKeyValuePath,
std::string const & pathInGeoObjectsTmpMwm,
std::ostream & streamPoiIdsToAddToLocalityIndex,
bool /*verbose*/, unsigned int threadsCount)
{
std::atomic_size_t counter{0};
auto && kvWriter = KeyValueConcurrentWriter{geoObjectKeyValuePath};
std::mutex streamMutex;
auto const & view = geoObjectMaintainer.CreateView();
@ -287,12 +439,12 @@ void AddPoisEnrichedWithHouseAddresses(GeoObjectMaintainer & geoObjectMaintainer
auto const id = fb.GetMostGenericOsmId();
auto jsonValue = MakeJsonValueWithNameFromFeature(fb, JsonValue{std::move(house)});
kvWriter.Write(id, JsonValue{std::move(jsonValue)});
counter++;
if (counter % 100000 == 0)
LOG(LINFO, (counter, "pois added"));
geoObjectMaintainer.WriteToStorage(id, JsonValue{std::move(jsonValue)});
std::lock_guard<std::mutex> lock(streamMutex);
streamPoiIdsToAddToLocalityIndex << id << "\n";
};

View file

@ -15,12 +15,15 @@
#include <string>
#include <boost/optional.hpp>
namespace generator
{
namespace geo_objects
{
using IndexReader = ReaderPtr<Reader>;
using RegionInfoLocater = std::function<boost::optional<KeyValue>(m2::PointD const & pathPoint)>;
boost::optional<indexer::GeoObjectsIndex<IndexReader>> MakeTempGeoObjectsIndex(
std::string const & pathToGeoObjectsTmpMwm);
@ -28,7 +31,8 @@ boost::optional<indexer::GeoObjectsIndex<IndexReader>> MakeTempGeoObjectsIndex(
bool JsonHasBuilding(JsonValue const & json);
void AddBuildingsAndThingsWithHousesThenEnrichAllWithRegionAddresses(
GeoObjectMaintainer & geoObjectMaintainer, std::string const & pathInGeoObjectsTmpMwm,
std::string const & geoObjectKeyValuePath, GeoObjectMaintainer & geoObjectMaintainer,
std::string const & pathInGeoObjectsTmpMwm, RegionInfoLocater const & regionInfoLocater,
bool verbose, unsigned int threadsCount);
struct NullBuildingsInfo
@ -46,6 +50,7 @@ NullBuildingsInfo EnrichPointsWithOuterBuildingGeometry(
void AddPoisEnrichedWithHouseAddresses(GeoObjectMaintainer & geoObjectMaintainer,
NullBuildingsInfo const & buildingsInfo,
std::string const & geoObjectKeyValuePath,
std::string const & pathInGeoObjectsTmpMwm,
std::ostream & streamPoiIdsToAddToLocalityIndex,
bool verbose, unsigned int threadsCount);

View file

@ -25,7 +25,7 @@ namespace geo_objects
{
GeoObjectsGenerator::GeoObjectsGenerator(
GeoObjectMaintainer::RegionInfoGetter && regionInfoGetter,
RegionInfoLocater && regionInfoLocater,
GeoObjectMaintainer::RegionIdGetter && regionIdGetter, std::string pathInGeoObjectsTmpMwm,
std::string pathOutIdsWithoutAddress, std::string pathOutGeoObjectsKv, bool verbose,
unsigned int threadsCount)
@ -34,7 +34,8 @@ GeoObjectsGenerator::GeoObjectsGenerator(
, m_pathOutGeoObjectsKv(std::move(pathOutGeoObjectsKv))
, m_verbose(verbose)
, m_threadsCount(threadsCount)
, m_geoObjectMaintainer{m_pathOutGeoObjectsKv, std::move(regionInfoGetter), std::move(regionIdGetter)}
, m_geoObjectMaintainer{std::move(regionIdGetter)}
, m_regionInfoLocater{std::move(regionInfoLocater)}
{
}
@ -49,7 +50,8 @@ bool GeoObjectsGenerator::GenerateGeoObjectsPrivate()
std::async(std::launch::async, MakeTempGeoObjectsIndex, m_pathInGeoObjectsTmpMwm);
AddBuildingsAndThingsWithHousesThenEnrichAllWithRegionAddresses(
m_geoObjectMaintainer, m_pathInGeoObjectsTmpMwm, m_verbose, m_threadsCount);
m_pathOutGeoObjectsKv, m_geoObjectMaintainer, m_pathInGeoObjectsTmpMwm, m_regionInfoLocater,
m_verbose, m_threadsCount);
LOG(LINFO, ("Geo objects with addresses were built."));
@ -69,8 +71,9 @@ bool GeoObjectsGenerator::GenerateGeoObjectsPrivate()
std::ofstream streamPoiIdsToAddToLocalityIndex(m_pathOutPoiIdsToAddToLocalityIndex);
AddPoisEnrichedWithHouseAddresses(m_geoObjectMaintainer, buildingInfo, m_pathInGeoObjectsTmpMwm,
streamPoiIdsToAddToLocalityIndex, m_verbose, m_threadsCount);
AddPoisEnrichedWithHouseAddresses(
m_geoObjectMaintainer, buildingInfo, m_pathOutGeoObjectsKv, m_pathInGeoObjectsTmpMwm,
streamPoiIdsToAddToLocalityIndex, m_verbose, m_threadsCount);
FilterAddresslessThanGaveTheirGeometryToInnerPoints(m_pathInGeoObjectsTmpMwm, buildingInfo,
m_threadsCount);

View file

@ -18,7 +18,7 @@ namespace geo_objects
class GeoObjectsGenerator
{
public:
GeoObjectsGenerator(GeoObjectMaintainer::RegionInfoGetter && regionInfoGetter,
GeoObjectsGenerator(RegionInfoLocater && regionInfoLocater,
GeoObjectMaintainer::RegionIdGetter && regionIdGetter,
std::string pathInGeoObjectsTmpMwm, std::string pathOutIdsWithoutAddress,
std::string pathOutGeoObjectsKv,
@ -45,6 +45,7 @@ private:
bool m_verbose = false;
unsigned int m_threadsCount = 1;
GeoObjectMaintainer m_geoObjectMaintainer;
RegionInfoLocater m_regionInfoLocater;
};
bool GenerateGeoObjects(std::string const & regionsIndex, std::string const & regionsKeyValue,

View file

@ -10,26 +10,11 @@ namespace generator
{
namespace geo_objects
{
GeoObjectMaintainer::GeoObjectMaintainer(std::string const & pathOutGeoObjectsKv,
RegionInfoGetter && regionInfoGetter,
RegionIdGetter && regionIdGetter)
: m_geoObjectsKvStorage{InitGeoObjectsKv(pathOutGeoObjectsKv)}
, m_regionInfoGetter{std::move(regionInfoGetter)}
, m_regionIdGetter(std::move(regionIdGetter))
GeoObjectMaintainer::GeoObjectMaintainer(RegionIdGetter && regionIdGetter)
: m_regionIdGetter(std::move(regionIdGetter))
{
}
// static
// Opens the geo objects key-value file for reading and appending.
// Throws Reader::OpenException when the stream cannot be opened.
std::fstream GeoObjectMaintainer::InitGeoObjectsKv(std::string const & pathOutGeoObjectsKv)
{
  auto const openMode = std::ios_base::in | std::ios_base::out | std::ios_base::app;
  std::fstream kvFile{pathOutGeoObjectsKv, openMode};
  if (!kvFile)
    MYTHROW(Reader::OpenException, ("Failed to open file", pathOutGeoObjectsKv));

  return kvFile;
}
void UpdateCoordinates(m2::PointD const & point, base::JSONPtr & json)
{
auto geometry = json_object_get(json.get(), "geometry");
@ -71,38 +56,6 @@ base::JSONPtr AddAddress(std::string const & street, std::string const & house,
return result;
}
// Registers |fb| (a building or a feature with a house) in the id -> geo data map and writes
// its JSON record with the address to the storage. Features without a region and features
// whose id is already registered are not written.
void GeoObjectMaintainer::StoreAndEnrich(feature::FeatureBuilder & fb)
{
  bool const isBuildingOrHouse =
      GeoObjectsFilter::IsBuilding(fb) || GeoObjectsFilter::HasHouse(fb);
  if (!isBuildingOrHouse)
    return;

  auto regionKeyValue = m_regionInfoGetter(fb.GetKeyPoint());
  if (!regionKeyValue)
    return;

  auto const id = fb.GetMostGenericOsmId();
  auto jsonValue = AddAddress(fb.GetParams().GetStreet(), fb.GetParams().house.Get(),
                              fb.GetKeyPoint(), fb.GetMultilangName(), *regionKeyValue);

  bool isNewId = false;
  {
    std::lock_guard<std::mutex> lock(m_updateMutex);
    isNewId = m_geoId2GeoData
                  .emplace(std::make_pair(
                      id, GeoObjectData{fb.GetParams().GetStreet(), fb.GetParams().house.Get(),
                                        base::GeoObjectId(regionKeyValue->first)}))
                  .second;
  }

  // Duplicate ID's are possible
  if (isNewId)
    WriteToStorage(id, JsonValue{std::move(jsonValue)});
}
// Appends the serialized "key value" line for |id| to the key-value stream while holding the
// storage mutex.
void GeoObjectMaintainer::WriteToStorage(base::GeoObjectId id, JsonValue && value)
{
  std::lock_guard<std::mutex> lock(m_storageMutex);
  auto const line = KeyValueStorage::SerializeFullLine(id.GetEncodedId(), std::move(value));
  m_geoObjectsKvStorage << line;
}
// GeoObjectMaintainer::GeoObjectsView
base::JSONPtr GeoObjectMaintainer::GeoObjectsView::GetFullGeoObject(
m2::PointD point,

View file

@ -28,11 +28,12 @@ namespace generator
namespace geo_objects
{
void UpdateCoordinates(m2::PointD const & point, base::JSONPtr & json);
base::JSONPtr AddAddress(std::string const & street, std::string const & house, m2::PointD point,
StringUtf8Multilang const & name, KeyValue const & regionKeyValue);
class GeoObjectMaintainer
{
public:
using RegionInfoGetter = std::function<boost::optional<KeyValue>(m2::PointD const & pathPoint)>;
using RegionIdGetter = std::function<std::shared_ptr<JsonValue>(base::GeoObjectId id)>;
struct GeoObjectData
@ -49,13 +50,11 @@ public:
{
public:
GeoObjectsView(GeoIndex const & geoIndex, GeoId2GeoData const & geoId2GeoData,
RegionIdGetter const & regionIdGetter, std::mutex & updateMutex)
RegionIdGetter const & regionIdGetter)
: m_geoIndex(geoIndex)
, m_geoId2GeoData(geoId2GeoData)
, m_regionIdGetter(regionIdGetter)
, m_lock(updateMutex, std::defer_lock)
{
CHECK(m_lock.try_lock(), ("Cannot create GeoObjectView on locked mutex"));
}
boost::optional<base::GeoObjectId> SearchIdOfFirstMatchedObject(
m2::PointD const & point, std::function<bool(base::GeoObjectId)> && pred) const;
@ -80,34 +79,23 @@ public:
GeoIndex const & m_geoIndex;
GeoId2GeoData const & m_geoId2GeoData;
RegionIdGetter const & m_regionIdGetter;
std::unique_lock<std::mutex> m_lock;
};
GeoObjectMaintainer(std::string const & pathOutGeoObjectsKv, RegionInfoGetter && regionInfoGetter,
RegionIdGetter && regionIdGetter);
GeoObjectMaintainer(RegionIdGetter && regionIdGetter);
void SetIndex(GeoIndex && index) { m_index = std::move(index); }
void StoreAndEnrich(feature::FeatureBuilder & fb);
void WriteToStorage(base::GeoObjectId id, JsonValue && value);
void Flush() { m_geoObjectsKvStorage.flush(); }
void SetGeoData(GeoId2GeoData && geoId2GeoData) { m_geoId2GeoData = std::move(geoId2GeoData); }
size_t Size() const { return m_geoId2GeoData.size(); }
GeoObjectsView CreateView()
{
return GeoObjectsView(m_index, m_geoId2GeoData, m_regionIdGetter, m_updateMutex);
return GeoObjectsView(m_index, m_geoId2GeoData, m_regionIdGetter);
}
private:
static std::fstream InitGeoObjectsKv(std::string const & pathOutGeoObjectsKv);
std::fstream m_geoObjectsKvStorage;
std::mutex m_updateMutex;
std::mutex m_storageMutex;
GeoIndex m_index;
RegionInfoGetter m_regionInfoGetter;
RegionIdGetter m_regionIdGetter;
GeoId2GeoData m_geoId2GeoData;
};

View file

@ -0,0 +1,76 @@
#include "generator/key_value_concurrent_writer.hpp"
#include "generator/key_value_storage.hpp"
#include <cstring>
#include <stdexcept>
#include <fcntl.h>
#include <sys/stat.h>
namespace generator
{
// Opens |keyValuePath| in append mode, creating the file if needed (rw-r--r--).
// |bufferSize| is the threshold used by Write() to decide when to flush.
// Throws std::runtime_error with the system error description if the file cannot be opened.
KeyValueConcurrentWriter::KeyValueConcurrentWriter(
    std::string const & keyValuePath, size_t bufferSize)
  : m_bufferSize{bufferSize}
{
  // The POSIX API is used for concurrent atomic writes from threads: the file is opened with
  // O_APPEND and the buffer is handed over with a single ::write() call.
  ::mode_t mode{S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH};
  m_keyValueFile = ::open(keyValuePath.c_str(), O_CREAT | O_WRONLY | O_APPEND, mode);
  if (m_keyValueFile == -1)
  {
    // Save errno right away: the operands of the message concatenation below are evaluated
    // in an unspecified order, so other library calls could run before errno is read.
    int const openError = errno;
    throw std::runtime_error("failed to open file " + keyValuePath + ": " +
                             std::strerror(openError));
  }
}
KeyValueConcurrentWriter::KeyValueConcurrentWriter(KeyValueConcurrentWriter && other)
{
*this = std::move(other);
}
// Flushes and closes the file currently owned by this writer (if any), then takes over the
// file descriptor, the buffered data and the buffer size of |other|. |other| is left without
// an open file. Self-assignment is a no-op.
KeyValueConcurrentWriter & KeyValueConcurrentWriter::operator=(
    KeyValueConcurrentWriter && other)
{
  // Without this guard a self-move would close the descriptor and then lose it.
  if (this == &other)
    return *this;

  if (m_keyValueFile != -1)
  {
    FlushBuffer();
    ::close(m_keyValueFile);
    m_keyValueFile = -1;
  }

  // Own descriptor is -1 at this point, so the swap leaves |other| without a file.
  std::swap(m_keyValueFile, other.m_keyValueFile);
  m_keyValueBuffer = std::move(other.m_keyValueBuffer);
  m_bufferSize = other.m_bufferSize;
  return *this;
}
// Writes out the remaining buffered data and closes the file, unless this writer does not
// own a file.
KeyValueConcurrentWriter::~KeyValueConcurrentWriter()
{
  if (m_keyValueFile != -1)
  {
    FlushBuffer();
    ::close(m_keyValueFile);
  }
}
// Serializes the record (|id|, |jsonValue|) into the in-memory buffer and flushes the buffer
// to the file once it gets within 1'000 of |m_bufferSize|. Not thread-safe: an instance must
// not be shared between threads.
void KeyValueConcurrentWriter::Write(base::GeoObjectId const & id, JsonValue const & jsonValue)
{
  KeyValueStorage::SerializeFullLine(m_keyValueBuffer, id.GetEncodedId(), jsonValue);

  // tellp() yields a signed stream position. It is converted explicitly because
  // brace-initializing size_t from it is a narrowing conversion.
  auto const bufferedSize = static_cast<size_t>(m_keyValueBuffer.tellp());
  // NOTE(review): 1'000 looks like headroom reserved for the next record - confirm.
  if (bufferedSize + 1'000 >= m_bufferSize)
    FlushBuffer();
}
// Hands the whole buffered text to the file with a single ::write() call and resets the
// buffer. Does nothing when the buffer is empty.
void KeyValueConcurrentWriter::FlushBuffer()
{
  auto const data = m_keyValueBuffer.str();
  if (!data.empty())
  {
    auto const writed = ::write(m_keyValueFile, data.data(), data.size());
    // A short write (e.g. ::write() interrupted by a signal) is treated as an error.
    CHECK(static_cast<size_t>(writed) == data.size(), ());

    m_keyValueBuffer.str({});
  }
}
} // namespace generator

View file

@ -0,0 +1,30 @@
#include "generator/key_value_storage.hpp"
#include "base/geo_object_id.hpp"
#include <string>
#include <sstream>
namespace generator
{
// |KeyValueConcurrentWriter| allows concurrent writes to the same KV-file from multiple
// threads, provided that every thread uses its own instance of this class.
// Records are accumulated in an in-memory buffer and written to the file in batches; the
// remaining buffer is flushed and the file is closed on destruction.
class KeyValueConcurrentWriter
{
public:
// Opens |keyValuePath| for appending (the file is created if it does not exist) and throws
// std::runtime_error if it cannot be opened. |bufferSize| is the buffer fill level, measured
// by the buffer stream position, near which Write() flushes the buffer to the file.
KeyValueConcurrentWriter(std::string const & keyValuePath, size_t bufferSize = 1'000'000);
// Move operations transfer the file descriptor and the buffered data; the source is left
// without an open file. Move assignment first flushes and closes the file owned by the target.
KeyValueConcurrentWriter(KeyValueConcurrentWriter && other);
KeyValueConcurrentWriter & operator=(KeyValueConcurrentWriter && other);
// Flushes the buffered data and closes the file (if one is owned).
~KeyValueConcurrentWriter();
// No thread-safety.
// Serializes the record into the buffer; the buffer is flushed when it is nearly full.
void Write(base::GeoObjectId const & id, JsonValue const & jsonValue);
private:
// POSIX file descriptor of the KV-file; -1 means no file is owned.
int m_keyValueFile{-1};
// Serialized records that have not been written to the file yet.
std::ostringstream m_keyValueBuffer;
// Flush threshold for |m_keyValueBuffer|.
size_t m_bufferSize{1'000'000};
// Writes the buffered data to the file with one ::write() call and empties the buffer.
void FlushBuffer();
};
} // namespace generator

View file

@ -14,12 +14,8 @@ namespace generator
{
KeyValueStorage::KeyValueStorage(std::string const & path, size_t cacheValuesCountLimit,
std::function<bool(KeyValue const &)> const & pred)
: m_storage{path, std::ios_base::in | std::ios_base::out | std::ios_base::app}
, m_cacheValuesCountLimit{cacheValuesCountLimit}
: m_cacheValuesCountLimit{cacheValuesCountLimit}
{
if (!m_storage)
MYTHROW(Reader::OpenException, ("Failed to open file", path));
auto storage = std::ifstream{path};
std::string line;
std::streamoff lineNumber = 0;
@ -77,31 +73,21 @@ bool KeyValueStorage::ParseKeyValueLine(std::string const & line, std::streamoff
}
// static
std::string KeyValueStorage::SerializeFullLine(uint64_t key, JsonValue && value)
void KeyValueStorage::SerializeFullLine(
std::ostream & out, uint64_t key, JsonValue const & value)
{
auto json = Serialize(value);
auto const & json = Serialize(value);
CHECK(!json.empty(), ());
std::stringstream result;
result << SerializeDref(key) << " " << json << "\n";
return result.str();
out << SerializeDref(key) << " " << json << "\n";
}
void KeyValueStorage::Insert(uint64_t key, JsonValue && value)
// static
std::string KeyValueStorage::SerializeFullLine(uint64_t key, JsonValue const & jsonValue)
{
auto json = Serialize(value);
CHECK(!json.empty(), ());
auto emplaceResult = m_values.emplace(key, std::move(json));
if (!emplaceResult.second) // it is ok for OSM relation with several outer borders
return;
auto const & emplaceIterator = emplaceResult.first;
auto const & result = boost::get<std::string>(emplaceIterator->second);
m_storage << SerializeDref(key) << " " << result << "\n";
std::stringstream result;
SerializeFullLine(result, key, jsonValue);
return result.str();
}
std::shared_ptr<JsonValue> KeyValueStorage::Find(uint64_t key) const

View file

@ -4,6 +4,7 @@
#include <fstream>
#include <functional>
#include <memory>
#include <ostream>
#include <string>
#include <unordered_map>
#include <utility>
@ -20,11 +21,11 @@ public:
explicit JsonValue(json_t * value = nullptr) : m_handle{value} {}
explicit JsonValue(base::JSONPtr && value) : m_handle{std::move(value)} {}
JsonValue(JsonValue const &) = delete;
JsonValue & operator=(JsonValue const &) = delete;
JsonValue(JsonValue &&) = default;
JsonValue & operator=(JsonValue &&) = default;
operator json_t const *() const noexcept { return m_handle.get(); }
operator base::JSONPtr const &() noexcept { return m_handle; };
operator base::JSONPtr const &() const noexcept { return m_handle; };
base::JSONPtr MakeDeepCopyJson() const { return base::JSONPtr{json_deep_copy(m_handle.get())}; }
private:
@ -52,9 +53,8 @@ public:
KeyValueStorage(KeyValueStorage const &) = delete;
KeyValueStorage & operator=(KeyValueStorage const &) = delete;
void Insert(uint64_t key, JsonValue && valueJson);
static std::string SerializeFullLine(uint64_t key, JsonValue && valueJson);
static std::string SerializeFullLine(uint64_t key, JsonValue const & valueJson);
static void SerializeFullLine(std::ostream & out, uint64_t key, JsonValue const & jsonValue);
std::shared_ptr<JsonValue> Find(uint64_t key) const;
size_t Size() const;
@ -72,7 +72,6 @@ private:
static bool DefaultPred(KeyValue const &) { return true; }
static bool ParseKeyValueLine(std::string const & line, std::streamoff lineNumber, uint64_t & key,
std::string & value);
std::fstream m_storage;
std::unordered_map<uint64_t, Value> m_values;
size_t m_cacheValuesCountLimit;
};