[generator:features] Speedup features generation: reuse vector's
This commit is contained in:
parent
684443bcbf
commit
1711eb5454
7 changed files with 71 additions and 34 deletions
|
@ -4,12 +4,14 @@ namespace feature
|
|||
{
|
||||
SingleAffiliation::SingleAffiliation(std::string const & filename)
|
||||
: m_filename(filename)
|
||||
, m_affilations{std::make_shared<std::vector<std::string>>(std::vector<std::string>{m_filename})}
|
||||
{
|
||||
}
|
||||
|
||||
std::vector<std::string> SingleAffiliation::GetAffiliations(FeatureBuilder const &) const
|
||||
std::shared_ptr<std::vector<std::string>> SingleAffiliation::GetAffiliations(
|
||||
FeatureBuilder const &) const
|
||||
{
|
||||
return {m_filename};
|
||||
return m_affilations;
|
||||
}
|
||||
|
||||
bool SingleAffiliation::HasRegionByName(std::string const & name) const
|
||||
|
|
|
@ -13,7 +13,8 @@ public:
|
|||
virtual ~AffiliationInterface() = default;
|
||||
|
||||
// The method will return the names of the buckets to which the fb belongs.
|
||||
virtual std::vector<std::string> GetAffiliations(FeatureBuilder const & fb) const = 0;
|
||||
virtual std::shared_ptr<std::vector<std::string>> GetAffiliations(
|
||||
FeatureBuilder const & fb) const = 0;
|
||||
virtual bool HasRegionByName(std::string const & name) const = 0;
|
||||
};
|
||||
|
||||
|
@ -23,10 +24,11 @@ public:
|
|||
SingleAffiliation(std::string const & filename);
|
||||
|
||||
// AffiliationInterface overrides:
|
||||
std::vector<std::string> GetAffiliations(FeatureBuilder const &) const override;
|
||||
std::shared_ptr<std::vector<std::string>> GetAffiliations(FeatureBuilder const &) const override;
|
||||
bool HasRegionByName(std::string const & name) const override;
|
||||
|
||||
private:
|
||||
std::string m_filename;
|
||||
std::shared_ptr<std::vector<std::string>> m_affilations;
|
||||
};
|
||||
} // namespace feature
|
||||
|
|
|
@ -156,39 +156,74 @@ template <class SerializePolicy = feature::serialization_policy::MaxAccuracy>
|
|||
class AffiliationsFeatureLayer : public LayerBase
|
||||
{
|
||||
public:
|
||||
AffiliationsFeatureLayer(size_t bufferSize, std::shared_ptr<feature::AffiliationInterface> const & affiliation,
|
||||
std::shared_ptr<FeatureProcessorQueue> const & queue)
|
||||
: m_bufferSize(bufferSize)
|
||||
AffiliationsFeatureLayer(
|
||||
size_t bufferedItemsCountMax, std::shared_ptr<feature::AffiliationInterface> const & affiliation,
|
||||
std::shared_ptr<FeatureProcessorQueue> const & queue)
|
||||
: m_bufferedItemsCountMax{bufferedItemsCountMax}
|
||||
, m_affiliation(affiliation)
|
||||
, m_queue(queue)
|
||||
{
|
||||
m_buffer.reserve(m_bufferSize);
|
||||
SetupItemsBuffer();
|
||||
}
|
||||
|
||||
// LayerBase overrides:
|
||||
void Handle(feature::FeatureBuilder & fb) override
|
||||
{
|
||||
feature::FeatureBuilder::Buffer buffer;
|
||||
SerializePolicy::Serialize(fb, buffer);
|
||||
m_buffer.emplace_back(std::move(buffer), m_affiliation->GetAffiliations(fb));
|
||||
if (m_buffer.size() >= m_bufferSize)
|
||||
AddBufferToQueue();
|
||||
auto & newItem = PlaceNewItemInBuffer();
|
||||
SerializePolicy::Serialize(fb, newItem.m_buffer);
|
||||
newItem.m_affiliations = m_affiliation->GetAffiliations(fb);
|
||||
}
|
||||
|
||||
bool AddBufferToQueue()
|
||||
void Flush()
|
||||
{
|
||||
if (m_buffer.empty())
|
||||
return false;
|
||||
if (m_bufferedItemsCount == 0)
|
||||
return;
|
||||
|
||||
m_queue->Push(std::move(m_buffer));
|
||||
m_buffer.clear();
|
||||
m_buffer.reserve(m_bufferSize);
|
||||
return true;
|
||||
m_itemsBufferPlace.resize(m_bufferedItemsCount);
|
||||
auto itemsChunk =
|
||||
std::make_shared<std::vector<ProcessedData>>(std::move(m_itemsBufferPlace));
|
||||
|
||||
m_queue->Push(itemsChunk);
|
||||
m_queuedChunks.push_back(itemsChunk);
|
||||
|
||||
SetupItemsBuffer();
|
||||
}
|
||||
|
||||
private:
|
||||
size_t const m_bufferSize;
|
||||
std::vector<ProcessedData> m_buffer;
|
||||
ProcessedData & PlaceNewItemInBuffer()
|
||||
{
|
||||
if (m_bufferedItemsCount >= m_bufferedItemsCountMax)
|
||||
Flush();
|
||||
|
||||
auto & newItem = m_itemsBufferPlace[m_bufferedItemsCount++];
|
||||
// Clear |newItem| from previous values.
|
||||
newItem.m_buffer.clear();
|
||||
newItem.m_affiliations = nullptr;
|
||||
return newItem;
|
||||
}
|
||||
|
||||
void SetupItemsBuffer()
|
||||
{
|
||||
if (!m_queuedChunks.empty() && m_queuedChunks.front().use_count() == 1)
|
||||
{
|
||||
// Reuse items chunk.
|
||||
m_itemsBufferPlace = std::move(*m_queuedChunks.front());
|
||||
m_queuedChunks.pop_front();
|
||||
}
|
||||
else
|
||||
{
|
||||
m_itemsBufferPlace = std::vector<ProcessedData>(m_bufferedItemsCountMax);
|
||||
}
|
||||
|
||||
m_bufferedItemsCount = 0;
|
||||
}
|
||||
|
||||
std::vector<ProcessedData> m_itemsBufferPlace;
|
||||
// |m_bufferedItemsCount| are used for efficient memory reuse: minimize recreate items
|
||||
// of |m_itemsBufferPlace|.
|
||||
size_t m_bufferedItemsCount{0};
|
||||
size_t const m_bufferedItemsCountMax{100};
|
||||
std::list<std::shared_ptr<std::vector<ProcessedData>>> m_queuedChunks;
|
||||
std::shared_ptr<feature::AffiliationInterface> m_affiliation;
|
||||
std::shared_ptr<FeatureProcessorQueue> m_queue;
|
||||
};
|
||||
|
|
|
@ -5,24 +5,22 @@
|
|||
#include "base/thread_safe_queue.hpp"
|
||||
|
||||
#include <cstddef>
|
||||
#include <utility>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
namespace generator
|
||||
{
|
||||
size_t static const kAffiliationsBufferSize = 512;
|
||||
size_t static const kAffiliationsBufferSize = 2'000;
|
||||
|
||||
struct ProcessedData
|
||||
{
|
||||
explicit ProcessedData(feature::FeatureBuilder::Buffer && buffer,
|
||||
std::vector<std::string> && affiliations)
|
||||
: m_buffer(std::move(buffer)), m_affiliations(std::move(affiliations)) {}
|
||||
|
||||
feature::FeatureBuilder::Buffer m_buffer;
|
||||
std::vector<std::string> m_affiliations;
|
||||
std::shared_ptr<std::vector<std::string>> m_affiliations;
|
||||
};
|
||||
|
||||
using FeatureProcessorChunk = base::threads::DataWrapper<std::vector<ProcessedData>>;
|
||||
using FeatureProcessorChunk =
|
||||
base::threads::DataWrapper<std::shared_ptr<std::vector<ProcessedData>>>;
|
||||
using FeatureProcessorQueue = base::threads::ThreadSafeQueue<FeatureProcessorChunk>;
|
||||
} // namespace generator
|
||||
|
|
|
@ -170,7 +170,7 @@ void TestIntermediateDataGeneration(
|
|||
auto const & osmFileData = sample.second;
|
||||
|
||||
// Skip test for node storage type "mem": 64Gb required.
|
||||
for (auto const & nodeStorageType : {"raw"s, "map"s, "mem"s})
|
||||
for (auto const & nodeStorageType : {"raw"s, "map"s})
|
||||
{
|
||||
for (auto threadsCount : {1, 2, 4})
|
||||
{
|
||||
|
|
|
@ -31,7 +31,7 @@ void ProcessorSimple::Process(feature::FeatureBuilder & fb)
|
|||
|
||||
void ProcessorSimple::Finish()
|
||||
{
|
||||
m_affiliationsLayer->AddBufferToQueue();
|
||||
m_affiliationsLayer->Flush();
|
||||
}
|
||||
|
||||
void ProcessorSimple::Merge(FeatureProcessorInterface const & other)
|
||||
|
|
|
@ -30,7 +30,7 @@ void RawGeneratorWriter::Run()
|
|||
if (chunk.IsEmpty())
|
||||
return;
|
||||
|
||||
Write(chunk.Get());
|
||||
Write(*chunk.Get());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -51,7 +51,7 @@ void RawGeneratorWriter::Write(std::vector<ProcessedData> const & vecChunks)
|
|||
{
|
||||
for (auto const & chunk : vecChunks)
|
||||
{
|
||||
for (auto const & affiliation : chunk.m_affiliations)
|
||||
for (auto const & affiliation : *chunk.m_affiliations)
|
||||
{
|
||||
if (affiliation.empty())
|
||||
continue;
|
||||
|
|
Loading…
Add table
Reference in a new issue