diff --git a/base/deferred_task.hpp b/base/deferred_task.hpp index 18dc962a70..8fb80321f6 100644 --- a/base/deferred_task.hpp +++ b/base/deferred_task.hpp @@ -30,7 +30,7 @@ public: } private: - threads::SimpleThread m_thread; + ::threads::SimpleThread m_thread; std::mutex m_mutex; std::condition_variable m_cv; std::function m_fn; diff --git a/base/thread_pool_delayed.hpp b/base/thread_pool_delayed.hpp index 7cdc11d22f..45a64c4a7e 100644 --- a/base/thread_pool_delayed.hpp +++ b/base/thread_pool_delayed.hpp @@ -111,7 +111,7 @@ private: void ProcessTasks(); - std::vector m_threads; + std::vector<::threads::SimpleThread> m_threads; std::mutex m_mu; std::condition_variable m_cv; diff --git a/generator/feature_builder.hpp b/generator/feature_builder.hpp index 6c3c46b4ab..6de52006a7 100644 --- a/generator/feature_builder.hpp +++ b/generator/feature_builder.hpp @@ -7,9 +7,11 @@ #include "base/geo_object_id.hpp" #include "base/stl_helpers.hpp" +#include "base/thread_pool_computational.hpp" #include #include +#include #include #include @@ -257,4 +259,46 @@ void ForEachFromDatRawFormat(std::string const & filename, ToDo && toDo) currPos = src.Pos(); } } + +/// Parallel process features in .dat file. +template +void ForEachParallelFromDatRawFormat(size_t threadsCount, std::string const & filename, + ToDo && toDo) +{ + CHECK_GREATER_OR_EQUAL(threadsCount, 1, ()); + if (threadsCount == 1) + return ForEachFromDatRawFormat(filename, std::forward(toDo)); + + FileReader reader(filename); + ReaderSource src(reader); + + uint64_t currPos = 0; + uint64_t const fileSize = reader.Size(); + + std::mutex readMutex; + auto concurrentProcessor = [&] { + for (;;) + { + FeatureBuilder fb; + uint64_t featurePos; + + { + std::lock_guard lock(readMutex); + + if (fileSize <= currPos) + break; + + ReadFromSourceRawFormat(src, fb); + featurePos = currPos; + currPos = src.Pos(); + } + + toDo(fb, featurePos); + } + }; + + base::thread_pool::computational::ThreadPool threadPool{threadsCount}; + for (size_t i = 0; i < threadsCount; ++i) + threadPool.Submit(concurrentProcessor); +} } // namespace feature diff --git a/generator/generator_tool/generator_tool.cpp b/generator/generator_tool/generator_tool.cpp index bba4be7483..094057eb09 100644 --- a/generator/generator_tool/generator_tool.cpp +++ b/generator/generator_tool/generator_tool.cpp @@ -422,7 +422,8 @@ int GeneratorToolMain(int argc, char ** argv) if (!geo_objects::GenerateGeoObjects(FLAGS_regions_index, FLAGS_regions_key_value, FLAGS_geo_objects_features, FLAGS_ids_without_addresses, FLAGS_geo_objects_key_value, - FLAGS_allow_addressless_for_countries, FLAGS_verbose)) + FLAGS_allow_addressless_for_countries, + FLAGS_verbose, threadsCount)) return EXIT_FAILURE; } diff --git a/generator/geo_objects/geo_objects.cpp b/generator/geo_objects/geo_objects.cpp index fc1ebf9278..223ceb55f4 100644 --- a/generator/geo_objects/geo_objects.cpp +++ b/generator/geo_objects/geo_objects.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include "platform/platform.hpp" @@ -137,19 +138,28 @@ MakeTempGeoObjectsIndex(std::string const & pathToGeoObjectsTmpMwm) void FilterAddresslessByCountryAndRepackMwm(std::string const & pathInGeoObjectsTmpMwm, std::string const & includeCountries, - regions::RegionInfoGetter const & regionInfoGetter) + regions::RegionInfoGetter const & regionInfoGetter, + size_t threadsCount) { auto const path = Platform().TmpPathForFile(); FeaturesCollector collector(path); + std::mutex collectorMutex; + auto concurrentCollect = [&] (FeatureBuilder const & fb) { + std::lock_guard lock(collectorMutex); + collector.Collect(fb); + }; auto const filteringCollector = [&](FeatureBuilder const & fb, uint64_t /* currPos */) { if (GeoObjectsFilter::HasHouse(fb)) { - collector.Collect(fb); + concurrentCollect(fb); return; } + static_assert(std::is_base_of::value, + ""); auto regionKeyValue = regionInfoGetter.FindDeepest(fb.GetKeyPoint()); if (!regionKeyValue) return; @@ -160,9 +170,10 @@ void FilterAddresslessByCountryAndRepackMwm(std::string const & pathInGeoObjects auto countryName = FromJSON(country); auto pos = includeCountries.find(countryName); if (pos != std::string::npos) - collector.Collect(fb); + concurrentCollect(fb); }; - ForEachFromDatRawFormat(pathInGeoObjectsTmpMwm, filteringCollector); + + ForEachParallelFromDatRawFormat(threadsCount, pathInGeoObjectsTmpMwm, filteringCollector); Platform().RemoveFileIfExists(pathInGeoObjectsTmpMwm); if (std::rename(path.c_str(), pathInGeoObjectsTmpMwm.c_str()) != 0) @@ -225,7 +236,8 @@ bool GenerateGeoObjects(std::string const & pathInRegionsIndex, std::string const & pathInGeoObjectsTmpMwm, std::string const & pathOutIdsWithoutAddress, std::string const & pathOutGeoObjectsKv, - std::string const & allowAddresslessForCountries, bool verbose) + std::string const & allowAddresslessForCountries, + bool verbose, size_t threadsCount) { LOG(LINFO, ("Start generating geo objects..")); auto timer = base::Timer(); @@ -239,7 +251,7 @@ bool GenerateGeoObjects(std::string const & pathInRegionsIndex, if (allowAddresslessForCountries != "*") { FilterAddresslessByCountryAndRepackMwm(pathInGeoObjectsTmpMwm, allowAddresslessForCountries, - regionInfoGetter); + regionInfoGetter, threadsCount); LOG(LINFO, ("Addressless buildings are filtered except countries", allowAddresslessForCountries, ".")); } diff --git a/generator/geo_objects/geo_objects.hpp b/generator/geo_objects/geo_objects.hpp index 90e779eb42..9e61bfd6d5 100644 --- a/generator/geo_objects/geo_objects.hpp +++ b/generator/geo_objects/geo_objects.hpp @@ -19,6 +19,7 @@ bool GenerateGeoObjects(std::string const & pathInRegionsIndex, std::string const & pathInGeoObjectsTmpMwm, std::string const & pathOutIdsWithoutAddress, std::string const & pathOutGeoObjectsKv, - std::string const & allowAddresslessForCountries, bool verbose); + std::string const & allowAddresslessForCountries, + bool verbose, size_t threadsCount); } // namespace geo_objects } // namespace generator diff --git a/generator/regions/region_info_getter.cpp b/generator/regions/region_info_getter.cpp index 6a560caf57..0767df998c 100644 --- a/generator/regions/region_info_getter.cpp +++ b/generator/regions/region_info_getter.cpp @@ -23,6 +23,8 @@ boost::optional RegionInfoGetter::FindDeepest(m2::PointD const & point boost::optional RegionInfoGetter::FindDeepest( m2::PointD const & point, Selector const & selector) const { + static_assert(std::is_base_of::value, ""); + auto const ids = SearchObjectsInIndex(point); return GetDeepest(point, ids, selector); } diff --git a/generator/regions/region_info_getter.hpp b/generator/regions/region_info_getter.hpp index 77fd1c7c44..933f973ea1 100644 --- a/generator/regions/region_info_getter.hpp +++ b/generator/regions/region_info_getter.hpp @@ -22,7 +22,11 @@ namespace generator { namespace regions { -class RegionInfoGetter +// ConcurrentGetProcessability is marker inteface: concurrent mode capability for any get operations. +struct ConcurrentGetProcessability +{ }; + +class RegionInfoGetter : public ConcurrentGetProcessability { public: using Selector = std::function; diff --git a/generator/streets/streets_builder.hpp b/generator/streets/streets_builder.hpp index 6b96c58c68..26429f33ae 100644 --- a/generator/streets/streets_builder.hpp +++ b/generator/streets/streets_builder.hpp @@ -45,7 +45,6 @@ private: void SaveRegionStreetsKv(std::ostream & streamStreetsKv, uint64_t regionId, RegionStreets const & streets); - void AddStreet(feature::FeatureBuilder & fb); void AddStreetHighway(feature::FeatureBuilder & fb); void AddStreetArea(feature::FeatureBuilder & fb);