[generator:geo_objects] Speedup poi generation: parallel write into KV in node-file

This commit is contained in:
Anatoly Serdtcev 2019-12-25 19:35:34 +03:00 committed by LaGrunge
parent 73f4a75c72
commit 917387ce20
3 changed files with 140 additions and 77 deletions

View file

@ -415,44 +415,149 @@ NullBuildingsInfo EnrichPointsWithOuterBuildingGeometry(GeoObjectMaintainer & ge
return addressing.GetNullBuildingsInfo();
}
//--------------------------------------------------------------------------------------------------
base::JSONPtr FindHouse(FeatureBuilder const & fb,
GeoObjectMaintainer::GeoObjectsView const & geoView,
NullBuildingsInfo const & buildingsInfo)
// PoisAddressEnricher -----------------------------------------------------------------------------
class PoisAddressEnricher
{
base::JSONPtr house =
geoView.GetFullGeoObject(fb.GetKeyPoint(), [](GeoObjectMaintainer::GeoObjectData const & data) {
return !data.m_house.empty();
});
if (house)
return house;
std::vector<base::GeoObjectId> potentialIds = geoView.SearchObjectsInIndex(fb.GetKeyPoint());
for (base::GeoObjectId id : potentialIds)
public:
PoisAddressEnricher(std::string geoObjectKeyValuePath,
std::string const & geoObjectsTmpMwmPath,
std::string const & localityIndexedPoiIdsPath,
GeoObjectMaintainer & geoObjectMaintainer,
NullBuildingsInfo const & buildingsInfo,
unsigned int threadsCount)
: m_geoObjectKeyValuePath{geoObjectKeyValuePath}
, m_geoObjectsTmpMwmPath{geoObjectsTmpMwmPath}
, m_localityIndexedPoiIdsPath{localityIndexedPoiIdsPath}
, m_geoObjectMaintainer{geoObjectMaintainer}
, m_buildingsInfo{buildingsInfo}
, m_threadsCount{threadsCount}
{
auto const it = buildingsInfo.m_buildings2AddressPoints.find(id);
if (it != buildingsInfo.m_buildings2AddressPoints.end())
return geoView.GetFullGeoObjectWithoutNameAndCoordinates(it->second);
}
return {};
}
void AddAddresses()
{
auto poiIdsFilesMerger = FilesMerger(m_localityIndexedPoiIdsPath);
auto && poisAddressEnrichedStat = std::atomic_size_t{0};
feature::ProcessParallelFromDatRawFormat(m_threadsCount, m_geoObjectsTmpMwmPath, [&] {
return Processor{*this, poiIdsFilesMerger, poisAddressEnrichedStat};
});
poiIdsFilesMerger.Merge();
base::JSONPtr MakeJsonValueWithNameFromFeature(FeatureBuilder const & fb, JsonValue const & json)
LOG(LINFO, ("Added", poisAddressEnrichedStat, "POIs enriched with address."));
}
private:
class Processor
{
public:
Processor(PoisAddressEnricher & enricher, FilesMerger & poiIdsFilesMerger,
std::atomic_size_t & poisAddressEnrichedStat)
: m_goObjectsView{enricher.m_geoObjectMaintainer.CreateView()}
, m_buildingsInfo{enricher.m_buildingsInfo}
, m_kvWriter{enricher.m_geoObjectKeyValuePath}
, m_poisAddressEnrichedStat{poisAddressEnrichedStat}
{
auto const poiIdsPath = GetPlatform().TmpPathForFile();
m_localityIndexedPoiIds.open(poiIdsPath);
if (!m_localityIndexedPoiIds.is_open())
MYTHROW(Writer::OpenException, ("Failed to open", poiIdsPath));
poiIdsFilesMerger.DeferMergeAndDelete(poiIdsPath);
}
void operator()(FeatureBuilder & fb, uint64_t /* currPos */)
{
if (!GeoObjectsFilter::IsPoi(fb))
return;
if (GeoObjectsFilter::IsBuilding(fb) || GeoObjectsFilter::HasHouse(fb))
return;
// No name and coordinates here, we will take it from fb in MakeJsonValueWithNameFromFeature
auto house = FindHouse(fb);
if (!house)
return;
WriteIntoKv(fb, house);
auto const id = fb.GetMostGenericOsmId();
m_localityIndexedPoiIds << id << "\n";
}
private:
JsonValue FindHouse(FeatureBuilder const & fb)
{
base::JSONPtr house =
m_goObjectsView.GetFullGeoObject(
fb.GetKeyPoint(), [](GeoObjectMaintainer::GeoObjectData const & data)
{
return !data.m_house.empty();
});
if (house)
return JsonValue{std::move(house)};
auto && potentialIds = m_goObjectsView.SearchObjectsInIndex(fb.GetKeyPoint());
auto const & buildings2AddressPoints = m_buildingsInfo.m_buildings2AddressPoints;
for (base::GeoObjectId id : potentialIds)
{
auto const it = buildings2AddressPoints.find(id);
if (it != buildings2AddressPoints.end())
return JsonValue{m_goObjectsView.GetFullGeoObjectWithoutNameAndCoordinates(it->second)};
}
return JsonValue{};
}
JsonValue MakeJsonValueWithNameFromFeature(FeatureBuilder const & fb, JsonValue const & json)
{
auto jsonWithAddress = json.MakeDeepCopyJson();
auto properties = json_object_get(jsonWithAddress.get(), "properties");
Localizator localizator(*properties);
localizator.SetLocale("name", Localizator::EasyObjectWithTranslation(fb.GetMultilangName()));
UpdateCoordinates(fb.GetKeyPoint(), jsonWithAddress);
return JsonValue{std::move(jsonWithAddress)};
}
void WriteIntoKv(FeatureBuilder & fb, JsonValue const & house)
{
auto const id = fb.GetMostGenericOsmId();
auto jsonValue = MakeJsonValueWithNameFromFeature(fb, house);
m_kvWriter.Write(id, JsonValue{std::move(jsonValue)});
++m_poisAddressEnrichedStat;
}
GeoObjectMaintainer::GeoObjectsView m_goObjectsView;
NullBuildingsInfo const & m_buildingsInfo;
KeyValueConcurrentWriter m_kvWriter;
std::ofstream m_localityIndexedPoiIds;
std::atomic_size_t & m_poisAddressEnrichedStat;
};
std::string m_geoObjectKeyValuePath;
std::string m_geoObjectsTmpMwmPath;
std::string m_localityIndexedPoiIdsPath;
GeoObjectMaintainer & m_geoObjectMaintainer;
NullBuildingsInfo const & m_buildingsInfo;
unsigned int m_threadsCount;
};
void AddPoisEnrichedWithHouseAddresses(GeoObjectMaintainer & geoObjectMaintainer,
NullBuildingsInfo const & buildingsInfo,
std::string const & geoObjectKeyValuePath,
std::string const & pathInGeoObjectsTmpMwm,
std::string const & localityIndexedPoiIdsPath,
bool /*verbose*/, unsigned int threadsCount)
{
auto jsonWithAddress = json.MakeDeepCopyJson();
auto properties = json_object_get(jsonWithAddress.get(), "properties");
Localizator localizator(*properties);
localizator.SetLocale("name", Localizator::EasyObjectWithTranslation(fb.GetMultilangName()));
UpdateCoordinates(fb.GetKeyPoint(), jsonWithAddress);
return jsonWithAddress;
auto && poisAddressEnricher =
PoisAddressEnricher{geoObjectKeyValuePath, pathInGeoObjectsTmpMwm, localityIndexedPoiIdsPath,
geoObjectMaintainer, buildingsInfo, threadsCount};
poisAddressEnricher.AddAddresses();
}
//--------------------------------------------------------------------------------------------------
bool JsonHasBuilding(JsonValue const & json)
{
auto && address =
@ -475,44 +580,5 @@ boost::optional<indexer::GeoObjectsIndex<IndexReader>> MakeTempGeoObjectsIndex(
return indexer::ReadIndex<indexer::GeoObjectsIndexBox<IndexReader>, MmapReader>(indexFile);
}
void AddPoisEnrichedWithHouseAddresses(GeoObjectMaintainer & geoObjectMaintainer,
NullBuildingsInfo const & buildingsInfo,
std::string const & geoObjectKeyValuePath,
std::string const & pathInGeoObjectsTmpMwm,
std::ostream & streamPoiIdsToAddToCoveringIndex,
bool /*verbose*/, unsigned int threadsCount)
{
std::atomic_size_t counter{0};
auto && kvWriter = KeyValueConcurrentWriter{geoObjectKeyValuePath};
std::mutex streamMutex;
auto const & view = geoObjectMaintainer.CreateView();
auto const concurrentTransformer = [&](FeatureBuilder & fb, uint64_t /* currPos */) {
if (!GeoObjectsFilter::IsPoi(fb))
return;
if (GeoObjectsFilter::IsBuilding(fb) || GeoObjectsFilter::HasHouse(fb))
return;
// No name and coordinates here, we will take it from fb in MakeJsonValueWithNameFromFeature
auto house = FindHouse(fb, view, buildingsInfo);
if (!house)
return;
auto const id = fb.GetMostGenericOsmId();
auto jsonValue = MakeJsonValueWithNameFromFeature(fb, JsonValue{std::move(house)});
counter++;
if (counter % 100000 == 0)
LOG(LINFO, (counter, "pois added"));
std::lock_guard<std::mutex> lock(streamMutex);
kvWriter.Write(id, JsonValue{std::move(jsonValue)});
streamPoiIdsToAddToCoveringIndex << id << "\n";
};
ForEachParallelFromDatRawFormat(threadsCount, pathInGeoObjectsTmpMwm, concurrentTransformer);
LOG(LINFO, ("Added", counter, "POIs enriched with address."));
}
} // namespace geo_objects
} // namespace generator

View file

@ -48,11 +48,9 @@ NullBuildingsInfo EnrichPointsWithOuterBuildingGeometry(
GeoObjectMaintainer & geoObjectMaintainer, std::string const & pathInGeoObjectsTmpMwm,
unsigned int threadsCount);
void AddPoisEnrichedWithHouseAddresses(GeoObjectMaintainer & geoObjectMaintainer,
NullBuildingsInfo const & buildingsInfo,
std::string const & geoObjectKeyValuePath,
std::string const & pathInGeoObjectsTmpMwm,
std::ostream & streamPoiIdsToAddToCoveringIndex,
bool verbose, unsigned int threadsCount);
void AddPoisEnrichedWithHouseAddresses(
GeoObjectMaintainer & geoObjectMaintainer, NullBuildingsInfo const & buildingsInfo,
std::string const & geoObjectKeyValuePath, std::string const & pathInGeoObjectsTmpMwm,
std::string const & localityIndexedPoiIdsPath, bool verbose, unsigned int threadsCount);
} // namespace geo_objects
} // namespace generator

View file

@ -64,10 +64,9 @@ bool GeoObjectsGenerator::GenerateGeoObjectsPrivate()
NullBuildingsInfo const & buildingInfo = EnrichPointsWithOuterBuildingGeometry(
m_geoObjectMaintainer, m_pathInGeoObjectsTmpMwm, m_threadsCount);
std::ofstream streamPoiIdsToAddToLocalityIndex(m_pathOutPoiIdsToAddToCoveringIndex);
AddPoisEnrichedWithHouseAddresses(m_geoObjectMaintainer, buildingInfo,
m_pathOutGeoObjectsKv, m_pathInGeoObjectsTmpMwm,
streamPoiIdsToAddToLocalityIndex,
m_pathOutPoiIdsToAddToCoveringIndex,
m_verbose, m_threadsCount);
LOG(LINFO, ("Geo objects without addresses were built."));