[generator:geo_objects] Parallel generation KV

This commit is contained in:
Anatoly Serdtcev 2019-05-08 13:35:42 +03:00 committed by Maksim Andrianov
parent 0466776ab1
commit 61bc809a4a

View file

@ -182,10 +182,13 @@ void FilterAddresslessByCountryAndRepackMwm(std::string const & pathInGeoObjects
void BuildGeoObjectsWithAddresses(regions::RegionInfoGetter const & regionInfoGetter,
std::string const & pathInGeoObjectsTmpMwm,
std::ostream & streamGeoObjectsKv, bool)
std::ostream & streamGeoObjectsKv,
bool verbose, size_t threadsCount)
{
size_t countGeoObjects = 0;
auto const fn = [&](FeatureBuilder & fb, uint64_t /* currPos */) {
std::mutex kvStreamMutex;
auto const concurrentTransformer = [&](FeatureBuilder & fb, uint64_t /* currPos */) {
if (!GeoObjectsFilter::IsBuilding(fb) && !GeoObjectsFilter::HasHouse(fb))
return;
@ -193,23 +196,29 @@ void BuildGeoObjectsWithAddresses(regions::RegionInfoGetter const & regionInfoGe
if (!regionKeyValue)
return;
auto const id = fb.GetMostGenericOsmId().GetEncodedId();
auto const value = MakeGeoObjectValueWithAddress(fb, *regionKeyValue);
streamGeoObjectsKv << static_cast<int64_t>(fb.GetMostGenericOsmId().GetEncodedId()) << " "
<< value.get() << "\n";
std::lock_guard<std::mutex> lock(kvStreamMutex);
streamGeoObjectsKv << static_cast<int64_t>(id) << " " << value.get() << "\n";
++countGeoObjects;
};
ForEachFromDatRawFormat(pathInGeoObjectsTmpMwm, fn);
ForEachParallelFromDatRawFormat(threadsCount, pathInGeoObjectsTmpMwm, concurrentTransformer);
LOG(LINFO, ("Added ", countGeoObjects, "geo objects with addresses."));
}
void BuildGeoObjectsWithoutAddresses(GeoObjectInfoGetter const & geoObjectInfoGetter,
std::string const & pathInGeoObjectsTmpMwm,
std::ostream & streamGeoObjectsKv,
std::ostream & streamIdsWithoutAddress, bool)
std::ostream & streamIdsWithoutAddress,
bool verbose, size_t threadsCount)
{
size_t countGeoObjects = 0;
auto const fn = [&](FeatureBuilder & fb, uint64_t /* currPos */) {
std::mutex kvStreamMutex;
auto const concurrentTransformer = [&](FeatureBuilder & fb, uint64_t /* currPos */) {
if (!GeoObjectsFilter::IsPoi(fb))
return;
if (GeoObjectsFilter::IsBuilding(fb) || GeoObjectsFilter::HasHouse(fb))
@ -219,14 +228,17 @@ void BuildGeoObjectsWithoutAddresses(GeoObjectInfoGetter const & geoObjectInfoGe
if (!house)
return;
auto const value = MakeGeoObjectValueWithoutAddress(fb, *house);
auto const id = static_cast<int64_t>(fb.GetMostGenericOsmId().GetEncodedId());
auto const value = MakeGeoObjectValueWithoutAddress(fb, *house);
std::lock_guard<std::mutex> lock(kvStreamMutex);
streamGeoObjectsKv << id << " " << value.get() << "\n";
streamIdsWithoutAddress << id << "\n";
++countGeoObjects;
};
ForEachFromDatRawFormat(pathInGeoObjectsTmpMwm, fn);
ForEachParallelFromDatRawFormat(threadsCount, pathInGeoObjectsTmpMwm, concurrentTransformer);
LOG(LINFO, ("Added ", countGeoObjects, "geo objects without addresses."));
}
} // namespace
@ -261,7 +273,8 @@ bool GenerateGeoObjects(std::string const & pathInRegionsIndex,
std::ofstream streamGeoObjectsKv(pathOutGeoObjectsKv);
BuildGeoObjectsWithAddresses(regionInfoGetter, pathInGeoObjectsTmpMwm, streamGeoObjectsKv, verbose);
BuildGeoObjectsWithAddresses(regionInfoGetter, pathInGeoObjectsTmpMwm, streamGeoObjectsKv,
verbose, threadsCount);
LOG(LINFO, ("Geo objects with addresses were built."));
auto const pred = [](KeyValue const & kv) { return HouseHasAddress(*kv.second); };
@ -276,7 +289,8 @@ bool GenerateGeoObjects(std::string const & pathInRegionsIndex,
GeoObjectInfoGetter geoObjectInfoGetter{std::move(*geoObjectIndex), std::move(geoObjectsKv)};
std::ofstream streamIdsWithoutAddress(pathOutIdsWithoutAddress);
BuildGeoObjectsWithoutAddresses(geoObjectInfoGetter, pathInGeoObjectsTmpMwm,
streamGeoObjectsKv, streamIdsWithoutAddress, verbose);
streamGeoObjectsKv, streamIdsWithoutAddress,
verbose, threadsCount);
LOG(LINFO, ("Geo objects without addresses were built."));
LOG(LINFO, ("Geo objects key-value storage saved to", pathOutGeoObjectsKv));
LOG(LINFO, ("Ids of POIs without addresses saved to", pathOutIdsWithoutAddress));