Multithreaded coastlines generation. Second attempt :)

This commit is contained in:
vng 2012-09-22 18:11:34 +03:00 committed by Alex Zolotarev
parent f9eef1f83b
commit 7d6779867e
9 changed files with 234 additions and 96 deletions

View file

@ -176,7 +176,7 @@ namespace
};
}
bool CoastlineFeaturesGenerator::GetFeature(CellIdT const & cell, FeatureBuilder1 & fb)
bool CoastlineFeaturesGenerator::GetFeature(CellIdT const & cell, FeatureBuilder1 & fb) const
{
// get rect cell
double minX, minY, maxX, maxY;
@ -211,7 +211,7 @@ bool CoastlineFeaturesGenerator::GetFeature(CellIdT const & cell, FeatureBuilder
return true;
}
void CoastlineFeaturesGenerator::GetFeatures(size_t i, vector<FeatureBuilder1> & vecFb)
void CoastlineFeaturesGenerator::GetFeatures(size_t i, vector<FeatureBuilder1> & vecFb) const
{
vector<CellIdT> stCells;
stCells.push_back(CellIdT::FromBitsAndLevel(i, m_lowLevel));

View file

@ -22,7 +22,7 @@ class CoastlineFeaturesGenerator
uint32_t m_coastType;
int m_lowLevel, m_highLevel, m_maxPoints;
bool GetFeature(CellIdT const & cell, FeatureBuilder1 & fb);
bool GetFeature(CellIdT const & cell, FeatureBuilder1 & fb) const;
public:
CoastlineFeaturesGenerator(uint32_t coastType,
@ -34,5 +34,5 @@ public:
void Finish();
inline size_t GetCellsCount() const { return 1 << 2 * m_lowLevel; }
void GetFeatures(size_t i, vector<FeatureBuilder1> & vecFb);
void GetFeatures(size_t i, vector<FeatureBuilder1> & vecFb) const;
};

View file

@ -6,6 +6,7 @@
#include "generate_info.hpp"
#include "coastlines_generator.hpp"
#include "world_map_generator.hpp"
#include "multiproducer_oneconsumer.hpp"
#include "../defines.hpp"
@ -335,6 +336,32 @@ public:
}
}
private:
class CoastFeatureTask : public MultiProducerOneConsumer::ITask
{
MainFeaturesEmitter & m_parent;
size_t m_ind;
public:
CoastFeatureTask(MainFeaturesEmitter & parent, size_t ind)
: m_parent(parent), m_ind(ind) {}
virtual void RunBase()
{
vector<FeatureBuilder1> vecFb;
m_parent.m_coasts->GetFeatures(m_ind, vecFb);
for (size_t i = 0; i< vecFb.size(); ++i)
Emit(&vecFb[i]);
}
virtual void EmitBase(void * p)
{
(*m_parent.m_coastsHolder)(*reinterpret_cast<FeatureBuilder1 *>(p));
}
};
public:
void Finish()
{
if (m_world)
@ -345,16 +372,12 @@ public:
m_coasts->Finish();
size_t const count = m_coasts->GetCellsCount();
LOG(LINFO, ("Generating coastline polygons", count));
LOG(LINFO, ("Generating coastline features for ", count, " cells."));
MultiProducerOneConsumer runner(8);
for (size_t i = 0; i < count; ++i)
{
vector<FeatureBuilder1> vecFb;
m_coasts->GetFeatures(i, vecFb);
for (size_t j = 0; j < vecFb.size(); ++j)
(*m_coastsHolder)(vecFb[j]);
}
runner.RunTask(new CoastFeatureTask(*this, i));
runner.Finish();
}
else if (m_coastsHolder)
{

View file

@ -30,6 +30,7 @@ SOURCES += \
osm_decl.cpp \
coastlines_generator.cpp \
tesselator.cpp \
multiproducer_oneconsumer.cpp \
HEADERS += \
feature_merger.hpp \
@ -57,3 +58,4 @@ HEADERS += \
osm_decl.hpp \
coastlines_generator.hpp \
tesselator.hpp \
multiproducer_oneconsumer.hpp \

View file

@ -0,0 +1,45 @@
#include "../../testing/testing.hpp"
#include "../multiproducer_oneconsumer.hpp"
namespace
{
class AccumulateTask : public MultiProducerOneConsumer::ITask
{
vector<uint64_t> & m_res;
public:
AccumulateTask(vector<uint64_t> & res) : m_res(res) {}
static uint64_t const s_upper = 1000000;
virtual void RunBase()
{
uint64_t summ = 0;
for (uint64_t i = 0; i <= s_upper; ++i)
summ += i;
Emit(reinterpret_cast<void *>(summ));
}
virtual void EmitBase(void * p)
{
m_res.push_back(reinterpret_cast<uint64_t>(p));
}
};
}
UNIT_TEST(MultiProducers_Smoke)
{
size_t const count = 64;
vector<uint64_t> vec;
MultiProducerOneConsumer runner(8);
for (size_t i = 0; i < count; ++i)
runner.RunTask(new AccumulateTask(vec));
runner.Finish();
TEST_EQUAL(vec.size(), count, ());
uint64_t const res = AccumulateTask::s_upper * (AccumulateTask::s_upper + 1) / 2;
for (size_t i = 0; i < count; ++i)
TEST_EQUAL(vec[i], res, ());
}

View file

@ -27,4 +27,5 @@ SOURCES += \
tesselator_test.cpp \
triangles_tree_coding_test.cpp \
coasts_test.cpp \
concurrent_tests.cpp \
feature_builder_test.cpp \

View file

@ -0,0 +1,63 @@
#include "multiproducer_oneconsumer.hpp"
#include "../base/logging.hpp"
#include "../base/macros.hpp"
#include "../base/assert.hpp"
MultiProducerOneConsumer::MultiProducerOneConsumer(size_t tasksPerThread)
#if PARALLEL_POLYGONIZER
: m_ThreadPoolSemaphore(m_ThreadPool.maxThreadCount() * tasksPerThread)
#endif
{
#if PARALLEL_POLYGONIZER
LOG(LINFO, ("QThreadPool threads count = ", m_ThreadPool.maxThreadCount()));
#endif
}
void MultiProducerOneConsumer::RunTask(ITask * pTask)
{
#if PARALLEL_POLYGONIZER
pTask->BeforeStart(this);
m_ThreadPool.start(pTask);
#else
pTask->RunBase();
delete pTask;
#endif
}
#if PARALLEL_POLYGONIZER
void MultiProducerOneConsumer::ITask::BeforeStart(MultiProducerOneConsumer * pParent)
{
ASSERT ( pParent, () );
m_pParent = pParent;
m_pParent->m_ThreadPoolSemaphore.acquire();
}
void MultiProducerOneConsumer::ITask::run()
{
RunBase();
m_pParent->m_ThreadPoolSemaphore.release();
}
#endif
void MultiProducerOneConsumer::ITask::Emit(void * p)
{
#if PARALLEL_POLYGONIZER
ASSERT ( m_pParent, () );
QMutexLocker mutexLocker(&m_pParent->m_EmitMutex);
UNUSED_VALUE(mutexLocker);
#endif
EmitBase(p);
}
void MultiProducerOneConsumer::Finish()
{
#if PARALLEL_POLYGONIZER
m_ThreadPool.waitForDone();
#endif
}

View file

@ -0,0 +1,56 @@
#pragma once
#ifndef PARALLEL_POLYGONIZER
#define PARALLEL_POLYGONIZER 1
#endif
#if PARALLEL_POLYGONIZER
#include <QSemaphore>
#include <QThreadPool>
#include <QMutex>
#include <QMutexLocker>
#endif
class MultiProducerOneConsumer
{
#if PARALLEL_POLYGONIZER
QThreadPool m_ThreadPool;
QSemaphore m_ThreadPoolSemaphore;
QMutex m_EmitMutex;
#endif
public:
MultiProducerOneConsumer(size_t tasksPerThread);
class ITask
#if PARALLEL_POLYGONIZER
: public QRunnable
#endif
{
#if PARALLEL_POLYGONIZER
MultiProducerOneConsumer * m_pParent;
#endif
public:
ITask() : m_pParent(0) {}
#if PARALLEL_POLYGONIZER
void BeforeStart(MultiProducerOneConsumer * pParent);
#endif
virtual void RunBase() = 0;
virtual void EmitBase(void * p) = 0;
void Emit(void * p);
#if PARALLEL_POLYGONIZER
// Override
virtual void run();
#endif
};
void RunTask(ITask * pTask);
void Finish();
};

View file

@ -2,39 +2,19 @@
#include "borders_loader.hpp"
#include "feature_builder.hpp"
#include "../indexer/feature_visibility.hpp"
#include "../indexer/cell_id.hpp"
#include "multiproducer_oneconsumer.hpp"
#include "../geometry/rect2d.hpp"
#include "../coding/file_writer.hpp"
#include "../base/base.hpp"
#include "../base/buffer_vector.hpp"
#include "../base/macros.hpp"
#include "../std/scoped_ptr.hpp"
#include "../std/string.hpp"
#ifndef PARALLEL_POLYGONIZER
#define PARALLEL_POLYGONIZER 1
#endif
#if PARALLEL_POLYGONIZER
#include <QSemaphore>
#include <QThreadPool>
#include <QMutex>
#include <QMutexLocker>
#endif
namespace feature
{
// Groups features according to country polygons
template <class FeatureOutT>
class Polygonizer
template <class FeatureOutT> class Polygonizer
{
string m_prefix;
string m_suffix;
@ -43,24 +23,13 @@ namespace feature
vector<string> m_Names;
borders::CountriesContainerT m_countries;
#if PARALLEL_POLYGONIZER
QThreadPool m_ThreadPool;
QSemaphore m_ThreadPoolSemaphore;
QMutex m_EmitFeatureMutex;
#endif
MultiProducerOneConsumer m_impl;
public:
template <class TInfo>
explicit Polygonizer(TInfo const & info)
: m_prefix(info.m_datFilePrefix), m_suffix(info.m_datFileSuffix)
#if PARALLEL_POLYGONIZER
, m_ThreadPoolSemaphore(m_ThreadPool.maxThreadCount() * 8)
#endif
: m_prefix(info.m_datFilePrefix), m_suffix(info.m_datFileSuffix), m_impl(8)
{
#if PARALLEL_POLYGONIZER
LOG(LINFO, ("Polygonizer thread pool threads:", m_ThreadPool.maxThreadCount()));
#endif
if (info.m_splitByPolygons)
{
CHECK(borders::LoadCountriesList(info.m_datFilePrefix, m_countries),
@ -75,7 +44,7 @@ namespace feature
}
~Polygonizer()
{
Finish();
m_impl.Finish();
for_each(m_Buckets.begin(), m_Buckets.end(), DeleteFunctor());
}
@ -100,14 +69,16 @@ namespace feature
}
};
typedef borders::CountryPolygons PolygonsT;
typedef buffer_vector<PolygonsT const *, 32> PolygonsVectorT;
class InsertCountriesPtr
{
typedef buffer_vector<borders::CountryPolygons const *, 32> vec_type;
vec_type & m_vec;
PolygonsVectorT & m_vec;
public:
InsertCountriesPtr(vec_type & vec) : m_vec(vec) {}
void operator() (borders::CountryPolygons const & c)
InsertCountriesPtr(PolygonsVectorT & vec) : m_vec(vec) {}
void operator() (PolygonsT const & c)
{
m_vec.push_back(&c);
}
@ -115,7 +86,7 @@ namespace feature
void operator () (FeatureBuilder1 const & fb)
{
buffer_vector<borders::CountryPolygons const *, 32> vec;
PolygonsVectorT vec;
m_countries.ForEachInRect(fb.GetLimitRect(), InsertCountriesPtr(vec));
switch (vec.size())
@ -126,31 +97,13 @@ namespace feature
EmitFeature(vec[0], fb);
break;
default:
{
#if PARALLEL_POLYGONIZER
m_ThreadPoolSemaphore.acquire();
m_ThreadPool.start(new PolygonizerTask(this, vec, fb));
#else
PolygonizerTask task(this, vec, fb);
task.RunBase();
#endif
}
m_impl.RunTask(new PolygonizerTask(*this, vec, fb));
break;
}
}
void Finish()
void EmitFeature(PolygonsT const * country, FeatureBuilder1 const & fb)
{
#if PARALLEL_POLYGONIZER
m_ThreadPool.waitForDone();
#endif
}
void EmitFeature(borders::CountryPolygons const * country, FeatureBuilder1 const & fb)
{
#if PARALLEL_POLYGONIZER
QMutexLocker mutexLocker(&m_EmitFeatureMutex);
UNUSED_VALUE(mutexLocker);
#endif
if (country->m_index == -1)
{
m_Names.push_back(country->m_name);
@ -161,26 +114,20 @@ namespace feature
(*(m_Buckets[country->m_index]))(fb);
}
vector<string> const & Names() const
{
return m_Names;
}
inline vector<string> const & Names() const { return m_Names; }
private:
friend class PolygonizerTask;
class PolygonizerTask
#if PARALLEL_POLYGONIZER
: public QRunnable
#endif
class PolygonizerTask : public MultiProducerOneConsumer::ITask
{
public:
PolygonizerTask(Polygonizer * pPolygonizer,
buffer_vector<borders::CountryPolygons const *, 32> const & countries,
PolygonizerTask(Polygonizer & polygonizer,
PolygonsVectorT const & countries,
FeatureBuilder1 const & fb)
: m_pPolygonizer(pPolygonizer), m_Countries(countries), m_FB(fb) {}
: m_polygonizer(polygonizer), m_Countries(countries), m_FB(fb) {}
void RunBase()
// Override
virtual void RunBase()
{
for (size_t i = 0; i < m_Countries.size(); ++i)
{
@ -188,23 +135,24 @@ namespace feature
m_FB.ForEachGeometryPoint(doCheck);
if (doCheck.m_belongs)
m_pPolygonizer->EmitFeature(m_Countries[i], m_FB);
Emit(const_cast<PolygonsT *>(m_Countries[i]));
}
}
#if PARALLEL_POLYGONIZER
void run()
// Override
virtual void EmitBase(void * p)
{
RunBase();
m_pPolygonizer->m_ThreadPoolSemaphore.release();
m_polygonizer.EmitFeature(reinterpret_cast<PolygonsT const *>(p), m_FB);
}
#endif
private:
Polygonizer * m_pPolygonizer;
buffer_vector<borders::CountryPolygons const *, 32> m_Countries;
Polygonizer & m_polygonizer;
/// @name Do copy of all input parameters.
//@{
PolygonsVectorT m_Countries;
FeatureBuilder1 m_FB;
//@}
};
};
}