[generator] Fixed nodes in memory.

Signed-off-by: vng <viktor.govako@gmail.com>
This commit is contained in:
vng 2021-04-20 10:01:12 +03:00 committed by Konstantin Pastbin
parent f3b5f72d93
commit 3106cbd66d
5 changed files with 160 additions and 90 deletions

View file

@ -1,7 +1,6 @@
#include "base/base.hpp"
#include "base/assert.hpp"
#include "base/exception.hpp"
#include "base/logging.hpp"
#include "base/src_point.hpp"
@ -14,7 +13,8 @@ bool OnAssertFailedDefault(SrcPoint const & srcPoint, std::string const & msg)
auto & logger = LogHelper::Instance();
std::cerr << '(' << logger.GetThreadID() << ") ASSERT FAILED" << '\n'
<< srcPoint.FileName() << ':' << srcPoint.Line() << '\n' << msg << std::endl;
<< srcPoint.FileName() << ':' << srcPoint.Line() << '\n' << msg
<< std::endl << std::flush;
return true;
}

View file

@ -1,14 +1,9 @@
#include "generator/intermediate_data.hpp"
#include <new>
#include <set>
#include <string>
#include "base/assert.hpp"
#include "base/checked_cast.hpp"
#include "base/logging.hpp"
#include "defines.hpp"
#include <future>
#include <execution>
namespace generator::cache
{
@ -20,11 +15,6 @@ size_t const kFlushCount = 1024;
double const kValueOrder = 1e7;
string const kShortExtension = ".short";
// An estimation.
// OSM had around 4.1 billion nodes on 2017-11-08,
// see https://wiki.openstreetmap.org/wiki/Stats
size_t const kMaxNodesInOSM = size_t{1} << 33;
void ToLatLon(double lat, double lon, LatLon & ll)
{
int64_t const lat64 = lat * kValueOrder;
@ -65,11 +55,10 @@ public:
// PointStorageWriterInterface overrides:
uint64_t GetNumProcessedPoints() const override { return m_numProcessedPoints; }
private:
uint64_t m_numProcessedPoints{0};
protected:
uint64_t m_numProcessedPoints = 0;
};
// RawFilePointStorageMmapReader -------------------------------------------------------------------
class RawFilePointStorageMmapReader : public PointStorageReaderInterface
{
public:
@ -93,12 +82,11 @@ private:
MmapReader m_mmapReader;
};
// RawFilePointStorageWriter -----------------------------------------------------------------------
class RawFilePointStorageWriter : public PointStorageWriterBase
{
public:
explicit RawFilePointStorageWriter(string const & name) :
m_fileWriter(name)
explicit RawFilePointStorageWriter(string const & name)
: m_fileWriter(name)
{}
// PointStorageWriterInterface overrides:
@ -115,26 +103,26 @@ public:
private:
FileWriter m_fileWriter;
uint64_t m_numProcessedPoints = 0;
};
// RawMemPointStorageReader ------------------------------------------------------------------------
class RawMemPointStorageReader : public PointStorageReaderInterface
{
public:
explicit RawMemPointStorageReader(string const & name):
m_fileReader(name),
m_data(kMaxNodesInOSM)
explicit RawMemPointStorageReader(string const & name)
: m_fileReader(name)
{
static_assert(sizeof(size_t) == 8, "This code is only for 64-bit architectures");
m_fileReader.Read(0, m_data.data(), m_data.size() * sizeof(LatLon));
uint64_t const fileSize = m_fileReader.Size();
CHECK_EQUAL(fileSize % sizeof(LatLon), 0, ("Node's coordinates file is broken"));
m_data.resize(fileSize / sizeof(LatLon));
m_fileReader.Read(0, m_data.data(), fileSize);
}
// PointStorageReaderInterface overrides:
bool GetPoint(uint64_t id, double & lat, double & lon) const override
{
LatLon const & ll = m_data[id];
bool ret = FromLatLon(ll, lat, lon);
bool const ret = FromLatLon(ll, lat, lon);
if (!ret)
LOG(LERROR, ("Node with id =", id, "not found!"));
return ret;
@ -145,45 +133,118 @@ private:
std::vector<LatLon> m_data;
};
// RawMemPointStorageWriter ------------------------------------------------------------------------
class RawMemPointStorageWriter : public PointStorageWriterBase
{
// 16G buffer size.
static constexpr size_t kBufferSize = 1000000000;
public:
explicit RawMemPointStorageWriter(string const & name) :
m_fileWriter(name),
m_data(kMaxNodesInOSM)
explicit RawMemPointStorageWriter(string const & name)
: m_fileWriter(name)
{
m_buffer.reserve(kBufferSize);
}
~RawMemPointStorageWriter() noexcept(false) override
{
m_fileWriter.Write(m_data.data(), m_data.size() * sizeof(LatLon));
Flush();
}
// PointStorageWriterInterface overrides:
void AddPoint(uint64_t id, double lat, double lon) override
{
CHECK_LESS(id, m_data.size(),
("Found node with id", id, "which is bigger than the allocated cache size"));
if (m_buffer.size() >= kBufferSize)
FlushAsync();
LatLon & ll = m_data[id];
LatLon ll;
ToLatLon(lat, lon, ll);
m_buffer.push_back({id, ll});
++m_numProcessedPoints;
}
private:
FileWriter m_fileWriter;
std::vector<LatLon> m_data;
uint64_t m_numProcessedPoints = 0;
using BufferT = std::vector<LatLonPos>;
void FlushImpl(BufferT & buffer)
{
// Sort, according to the seek pos in file.
/// @todo Try parallel sort when clang will be able.
//std::sort(std::execution::par, buffer.begin(), buffer.end(), [](LatLonPos const & l, LatLonPos const & r)
std::sort(buffer.begin(), buffer.end(), [](LatLonPos const & l, LatLonPos const & r)
{
return l.m_pos < r.m_pos;
});
size_t constexpr structSize = sizeof(LatLon);
for (auto const & llp : buffer)
m_fileWriter.Write(llp.m_pos * structSize, &llp.m_ll, structSize);
}
// Async version to continue collecting points in parallel, while writing a file.
void FlushAsync()
{
if (m_future.valid())
m_future.wait();
BufferT * pBuffer = new BufferT(std::move(m_buffer));
m_buffer.clear();
m_buffer.reserve(kBufferSize);
// Using raw pointers because we can't make std::function with rvalue reference.
m_future = std::async(std::launch::async, [this, pBuffer]()
{
FlushImpl(*pBuffer);
delete pBuffer;
});
}
void Flush()
{
if (m_future.valid())
m_future.wait();
FlushImpl(m_buffer);
m_buffer.clear();
}
private:
// Expect that fseek(FILE) makes the same check inside, but no ...
class CachedPosWriter
{
FileWriter m_writer;
uint64_t m_pos = 0;
public:
explicit CachedPosWriter(std::string const & fPath) : m_writer(fPath)
{
CHECK_EQUAL(m_pos, m_writer.Pos(), ());
}
void Write(uint64_t pos, void const * p, size_t size)
{
if (m_pos != pos)
{
m_writer.Seek(pos);
m_pos = pos;
}
m_writer.Write(p, size);
m_pos += size;
}
};
CachedPosWriter m_fileWriter;
BufferT m_buffer;
std::future<void> m_future;
};
// MapFilePointStorageReader -----------------------------------------------------------------------
class MapFilePointStorageReader : public PointStorageReaderInterface
{
public:
explicit MapFilePointStorageReader(string const & name) :
m_fileReader(name + kShortExtension)
explicit MapFilePointStorageReader(string const & name)
: m_fileReader(name + kShortExtension)
{
LOG(LINFO, ("Nodes reading is started"));
@ -191,15 +252,12 @@ public:
uint64_t pos = 0;
LatLonPos llp;
LatLon ll;
while (pos < count)
{
m_fileReader.Read(pos, &llp, sizeof(llp));
pos += sizeof(llp);
ll.m_lat = llp.m_lat;
ll.m_lon = llp.m_lon;
m_map.emplace(llp.m_pos, ll);
m_map.emplace(llp.m_pos, llp.m_ll);
}
LOG(LINFO, ("Nodes reading is finished"));
@ -225,25 +283,21 @@ private:
std::unordered_map<uint64_t, LatLon> m_map;
};
// MapFilePointStorageWriter -----------------------------------------------------------------------
class MapFilePointStorageWriter : public PointStorageWriterBase
{
public:
explicit MapFilePointStorageWriter(string const & name) :
m_fileWriter(name + kShortExtension)
explicit MapFilePointStorageWriter(string const & name)
: m_fileWriter(name + kShortExtension)
{
}
// PointStorageWriterInterface overrides:
void AddPoint(uint64_t id, double lat, double lon) override
{
LatLon ll;
ToLatLon(lat, lon, ll);
LatLonPos llp;
llp.m_pos = id;
llp.m_lat = ll.m_lat;
llp.m_lon = ll.m_lon;
ToLatLon(lat, lon, llp.m_ll);
m_fileWriter.Write(&llp, sizeof(llp));
@ -252,7 +306,6 @@ public:
private:
FileWriter m_fileWriter;
uint64_t m_numProcessedPoints = 0;
};
} // namespace
@ -261,7 +314,7 @@ IndexFileReader::IndexFileReader(string const & name)
{
FileReader fileReader(name);
m_elements.clear();
size_t const fileSize = fileReader.Size();
size_t const fileSize = base::checked_cast<size_t>(fileReader.Size());
if (fileSize == 0)
return;
@ -270,14 +323,14 @@ IndexFileReader::IndexFileReader(string const & name)
try
{
m_elements.resize(base::checked_cast<size_t>(fileSize / sizeof(Element)));
m_elements.resize(fileSize / sizeof(Element));
}
catch (std::bad_alloc const &)
{
LOG(LCRITICAL, ("Insufficient memory for required offset map"));
}
fileReader.Read(0, &m_elements[0], base::checked_cast<size_t>(fileSize));
fileReader.Read(0, &m_elements[0], fileSize);
sort(m_elements.begin(), m_elements.end(), ElementComparator());
@ -341,23 +394,32 @@ OSMElementCacheWriter::OSMElementCacheWriter(string const & name)
void OSMElementCacheWriter::SaveOffsets() { m_offsets.WriteAll(); }
// IntermediateDataObjectsCache --------------------------------------------------------------------
IntermediateDataObjectsCache::AllocatedObjects &
IntermediateDataObjectsCache::GetOrCreatePointStorageReader(
feature::GenerateInfo::NodeStorageType type, string const & name)
{
static std::mutex m;
auto const k = std::to_string(static_cast<int>(type)) + name;
std::lock_guard lock(m);
auto it = m_objects.find(k);
if (it == cend(m_objects))
return m_objects.emplace(k, AllocatedObjects(CreatePointStorageReader(type, name))).first->second;
auto const strType = std::to_string(static_cast<int>(type));
auto const key = strType + name;
return it->second;
std::lock_guard lock(m_mutex);
auto res = m_objects.try_emplace(key, type, name);
if (res.second)
LOG(LINFO, ("Created nodes reader:", strType, name));
return res.first->second;
}
void IntermediateDataObjectsCache::Clear()
{
m_objects = std::unordered_map<std::string, AllocatedObjects>();
std::lock_guard lock(m_mutex);
std::unordered_map<string, AllocatedObjects>().swap(m_objects);
}
IntermediateDataObjectsCache::AllocatedObjects::AllocatedObjects(
feature::GenerateInfo::NodeStorageType type, string const & name)
{
m_storageReader = CreatePointStorageReader(type, name);
}
IndexFileReader const & IntermediateDataObjectsCache::AllocatedObjects::GetOrCreateIndexReader(
@ -365,14 +427,11 @@ IndexFileReader const & IntermediateDataObjectsCache::AllocatedObjects::GetOrCre
{
static std::mutex m;
std::lock_guard lock(m);
auto it = m_fileReaders.find(name);
if (it == cend(m_fileReaders))
return m_fileReaders.emplace(name, IndexFileReader(name)).first->second;
return it->second;
return m_fileReaders.try_emplace(name, name).first->second;
}
// IntermediateDataReader
// IntermediateDataReader --------------------------------------------------------------------------
IntermediateDataReader::IntermediateDataReader(
IntermediateDataObjectsCache::AllocatedObjects & objs, feature::GenerateInfo const & info)
: m_nodes(objs.GetPointStorageReader())
@ -384,7 +443,7 @@ IntermediateDataReader::IntermediateDataReader(
objs.GetOrCreateIndexReader(info.GetCacheFileName(RELATIONS_FILE, ID2REL_EXT)))
{}
// IntermediateDataWriter
// IntermediateDataWriter --------------------------------------------------------------------------
IntermediateDataWriter::IntermediateDataWriter(PointStorageWriterInterface & nodes,
feature::GenerateInfo const & info)
: m_nodes(nodes)

View file

@ -12,20 +12,16 @@
#include "base/file_name_utils.hpp"
#include "base/logging.hpp"
#include "defines.hpp"
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <mutex>
#include <string>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
#include "defines.hpp"
// Classes for reading and writing any data in file with map of offsets for
// fast searching in memory by some key.
namespace generator
@ -49,8 +45,7 @@ static_assert(std::is_trivially_copyable<LatLon>::value, "");
struct LatLonPos
{
uint64_t m_pos = 0;
int32_t m_lat = 0;
int32_t m_lon = 0;
LatLon m_ll;
};
static_assert(sizeof(LatLonPos) == 16, "Invalid structure size");
static_assert(std::is_trivially_copyable<LatLonPos>::value, "");
@ -66,7 +61,7 @@ public:
class PointStorageReaderInterface
{
public:
virtual ~PointStorageReaderInterface() {}
virtual ~PointStorageReaderInterface() = default;
virtual bool GetPoint(uint64_t id, double & lat, double & lon) const = 0;
};
@ -141,10 +136,7 @@ public:
class AllocatedObjects
{
public:
explicit AllocatedObjects(std::unique_ptr<PointStorageReaderInterface> storageReader)
: m_storageReader(std::move(storageReader))
{
}
AllocatedObjects(feature::GenerateInfo::NodeStorageType type, std::string const & name);
PointStorageReaderInterface const & GetPointStorageReader() const { return *m_storageReader; }
@ -162,6 +154,7 @@ public:
void Clear();
private:
std::mutex m_mutex;
std::unordered_map<std::string, AllocatedObjects> m_objects;
};

View file

@ -20,6 +20,7 @@
#include <exception>
#include <iostream>
#include <mutex>
#include <vector>
#define BOOST_STACKTRACE_GNU_SOURCE_NOT_REQUIRED
@ -27,10 +28,22 @@
namespace generator
{
std::string g_lastError;
void SetLastError(std::string error)
{
static std::mutex m;
std::lock_guard guard(m);
g_lastError.swap(error);
}
void ErrorHandler(int signum)
{
// Avoid recursive calls.
std::signal(signum, SIG_DFL);
if (!g_lastError.empty())
std::cerr << "Last error = " << g_lastError << std::endl;
// If there was an exception, then we will print the message.
try
{
@ -39,19 +52,23 @@ void ErrorHandler(int signum)
}
catch (RootException const & e)
{
std::cerr << "Core exception: " << e.Msg() << "\n";
std::cerr << "Core exception: " << e.Msg() << std::endl;
}
catch (std::exception const & e)
{
std::cerr << "Std exception: " << e.what() << "\n";
std::cerr << "Std exception: " << e.what() << std::endl;
}
catch (...)
{
std::cerr << "Unknown exception.\n";
std::cerr << "Unknown exception." << std::endl;
}
// Print stack stack.
std::cerr << boost::stacktrace::stacktrace();
// Print this fuction address to calculate BASE loading address for raw crash dump.
std::cerr << "ErrorHandler ptr: " << reinterpret_cast<void *>(&ErrorHandler) << std::endl;
// Print stack.
std::cerr << boost::stacktrace::stacktrace() << std::endl << std::flush;
// We raise the signal SIGABRT, so that there would be an opportunity to make a core dump.
std::raise(SIGABRT);
}

View file

@ -34,6 +34,7 @@
namespace generator
{
void SetLastError(std::string error);
void ErrorHandler(int signum);
/// \brief This class is wrapper around |DataSource| if only one mwm is registered in DataSource.