Merge pull request #11083 from cc-engineering/generator.geo_objects.kv-storage-json-objects-limit

[generator:geo_objects] Optimize RSS usage: json objects limit
This commit is contained in:
LaGrunge 2019-06-21 14:04:10 +03:00 committed by GitHub
commit ee396556c9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 65 additions and 35 deletions

View file

@ -177,8 +177,6 @@ void BuildGeoObjectsWithAddresses(KeyValueStorage & geoObjectsKv,
std::string const & pathInGeoObjectsTmpMwm,
bool verbose, size_t threadsCount)
{
size_t countGeoObjects = 0;
std::mutex updateMutex;
auto const concurrentTransformer = [&](FeatureBuilder & fb, uint64_t /* currPos */) {
if (!GeoObjectsFilter::IsBuilding(fb) && !GeoObjectsFilter::HasHouse(fb))
@ -194,13 +192,11 @@ void BuildGeoObjectsWithAddresses(KeyValueStorage & geoObjectsKv,
JSON_REAL_PRECISION(regions::JsonPolicy::kDefaultPrecision) | JSON_COMPACT)};
std::lock_guard<std::mutex> lock(updateMutex);
geoObjectsKv.Insert(id, std::move(json), std::move(jsonValue));
++countGeoObjects;
geoObjectsKv.Insert(id, std::move(json)); // no cache JSON model
};
ForEachParallelFromDatRawFormat(threadsCount, pathInGeoObjectsTmpMwm, concurrentTransformer);
LOG(LINFO, ("Added ", countGeoObjects, "geo objects with addresses."));
LOG(LINFO, ("Added", geoObjectsKv.Size(), "geo objects with addresses."));
}
void BuildGeoObjectsWithoutAddresses(KeyValueStorage & geoObjectsKv,
@ -209,7 +205,7 @@ void BuildGeoObjectsWithoutAddresses(KeyValueStorage & geoObjectsKv,
std::ostream & streamIdsWithoutAddress,
bool verbose, size_t threadsCount)
{
size_t countGeoObjects = 0;
auto addressObjectsCount = geoObjectsKv.Size();
std::mutex updateMutex;
auto const concurrentTransformer = [&](FeatureBuilder & fb, uint64_t /* currPos */) {
@ -228,14 +224,12 @@ void BuildGeoObjectsWithoutAddresses(KeyValueStorage & geoObjectsKv,
JSON_REAL_PRECISION(regions::JsonPolicy::kDefaultPrecision) | JSON_COMPACT)};
std::lock_guard<std::mutex> lock(updateMutex);
geoObjectsKv.Insert(id, std::move(json), std::move(jsonValue));
geoObjectsKv.Insert(id, std::move(json)); // no cache JSON model
streamIdsWithoutAddress << id << "\n";
++countGeoObjects;
};
ForEachParallelFromDatRawFormat(threadsCount, pathInGeoObjectsTmpMwm, concurrentTransformer);
LOG(LINFO, ("Added ", countGeoObjects, "geo objects without addresses."));
LOG(LINFO, ("Added ", geoObjectsKv.Size() - addressObjectsCount, "geo objects without addresses."));
}
} // namespace
@ -268,7 +262,7 @@ bool GenerateGeoObjects(std::string const & pathInRegionsIndex,
pathInGeoObjectsTmpMwm);
Platform().RemoveFileIfExists(pathOutGeoObjectsKv);
KeyValueStorage geoObjectsKv(pathOutGeoObjectsKv);
KeyValueStorage geoObjectsKv(pathOutGeoObjectsKv, 0 /* cacheValuesCountLimit */);
BuildGeoObjectsWithAddresses(geoObjectsKv, regionInfoGetter, pathInGeoObjectsTmpMwm,
verbose, threadsCount);
LOG(LINFO, ("Geo objects with addresses were built."));

View file

@ -5,11 +5,14 @@
#include "base/exception.hpp"
#include "base/logging.hpp"
#include <cstring>
namespace generator
{
KeyValueStorage::KeyValueStorage(std::string const & path,
KeyValueStorage::KeyValueStorage(std::string const & path, size_t cacheValuesCountLimit,
std::function<bool(KeyValue const &)> const & pred)
: m_storage{path, std::ios_base::in | std::ios_base::out | std::ios_base::app}
, m_cacheValuesCountLimit{cacheValuesCountLimit}
{
if (!m_storage)
MYTHROW(Reader::OpenException, ("Failed to open file", path));
@ -20,18 +23,34 @@ KeyValueStorage::KeyValueStorage(std::string const & path,
{
++lineNumber;
KeyValue kv;
if (!ParseKeyValueLine(line, kv, lineNumber) || !pred(kv))
uint64_t key;
auto value = std::string{};
if (!ParseKeyValueLine(line, lineNumber, key, value))
continue;
m_values.insert(kv);
json_error_t jsonError;
auto json = std::make_shared<JsonValue>(json_loads(value.c_str(), 0, &jsonError));
if (!json)
{
LOG(LWARNING, ("Cannot create base::Json in line", lineNumber, ":", jsonError.text));
continue;
}
if (!pred({key, json}))
continue;
if (m_cacheValuesCountLimit <= m_values.size())
m_values.emplace(key, CopyJsonString(value));
else
m_values.emplace(key, std::move(json));
}
m_storage.clear();
}
// static
bool KeyValueStorage::ParseKeyValueLine(std::string const & line, KeyValue & res, std::streamoff lineNumber)
bool KeyValueStorage::ParseKeyValueLine(std::string const & line, std::streamoff lineNumber,
uint64_t & key, std::string & value)
{
auto const pos = line.find(" ");
if (pos == std::string::npos)
@ -47,26 +66,24 @@ bool KeyValueStorage::ParseKeyValueLine(std::string const & line, KeyValue & res
return false;
}
auto jsonString = line.c_str() + pos + 1;
json_error_t jsonError;
base::JSONPtr json{json_loads(jsonString, 0, &jsonError)};
if (!json)
{
LOG(LWARNING, ("Cannot create base::Json in line", lineNumber, ":", jsonError.text));
return false;
}
res = std::make_pair(static_cast<uint64_t>(id), std::make_shared<JsonValue>(std::move(json)));
key = static_cast<uint64_t>(id);
value = line.c_str() + pos + 1;
return true;
}
// Stores the value for |key| in memory and appends "<key> <json>\n" to the backing file.
//
// |valueJson| is the serialized JSON text; it must be non-null (CHECKed).
// |value| is an optional parsed model of the same JSON. It is kept in memory only when it is
// non-null and fewer than |m_cacheValuesCountLimit| entries are stored; otherwise the serialized
// string itself is kept.
// If |key| is already present, nothing is stored and nothing is written to the file.
void KeyValueStorage::Insert(uint64_t key, JsonString && valueJson, base::JSONPtr && value)
{
  CHECK(valueJson.get(), ());

  // Take the raw text pointer before |valueJson| may be moved into the map. On the success path
  // the buffer is still alive: it is owned either by the map entry or by |valueJson| itself.
  auto const json = valueJson.get();

  auto const emplace = value && m_values.size() < m_cacheValuesCountLimit
                           ? m_values.emplace(key, std::make_shared<JsonValue>(std::move(value)))
                           : m_values.emplace(key, std::move(valueJson));
  if (!emplace.second)  // it is ok for OSM relation with several outer borders
    return;

  m_storage << static_cast<int64_t>(key) << " " << json << "\n";
}
std::shared_ptr<JsonValue> KeyValueStorage::Find(uint64_t key) const
@ -75,11 +92,24 @@ std::shared_ptr<JsonValue> KeyValueStorage::Find(uint64_t key) const
if (it == std::end(m_values))
return {};
return it->second;
if (auto json = boost::get<std::shared_ptr<JsonValue>>(&it->second))
return *json;
auto const & jsonString = boost::get<JsonString>(it->second);
auto json = std::make_shared<JsonValue>(json_loads(jsonString.get(), 0, nullptr));
CHECK(json, ());
return json;
}
// Returns the number of key-value pairs currently held in |m_values|.
size_t KeyValueStorage::Size() const
{
return m_values.size();
}
// Returns a NUL-terminated copy of |value| in a buffer obtained from std::malloc and owned by the
// returned JsonString.
// NOTE(review): JSONFreeDeleter is presumably free()-compatible with std::malloc - confirm.
KeyValueStorage::JsonString KeyValueStorage::CopyJsonString(std::string const & value) const
{
  auto const size = value.size() + 1;  // +1 for the terminating NUL
  char * copy = static_cast<char *>(std::malloc(size));
  // std::malloc returns nullptr on failure; copying into it would be undefined behaviour.
  CHECK(copy, ("Cannot allocate", size, "bytes for a JSON string copy"));

  std::strncpy(copy, value.data(), size);
  return JsonString{copy};
}
} // namespace generator

View file

@ -9,6 +9,7 @@
#include <utility>
#include <boost/optional.hpp>
#include <boost/variant.hpp>
#include "3party/jansson/myjansson.hpp"
@ -41,7 +42,7 @@ class KeyValueStorage
public:
using JsonString = std::unique_ptr<char, JSONFreeDeleter>;
explicit KeyValueStorage(std::string const & kvPath,
explicit KeyValueStorage(std::string const & kvPath, size_t cacheValuesCountLimit,
std::function<bool(KeyValue const &)> const & pred = DefaultPred);
KeyValueStorage(KeyValueStorage &&) = default;
@ -50,15 +51,20 @@ public:
KeyValueStorage(KeyValueStorage const &) = delete;
KeyValueStorage & operator=(KeyValueStorage const &) = delete;
void Insert(uint64_t key, JsonString && valueJson, base::JSONPtr && value);
void Insert(uint64_t key, JsonString && valueJson, base::JSONPtr && value = {});
std::shared_ptr<JsonValue> Find(uint64_t key) const;
size_t Size() const;
private:
using Value = boost::variant<std::shared_ptr<JsonValue>, JsonString>;
static bool DefaultPred(KeyValue const &) { return true; }
static bool ParseKeyValueLine(std::string const & line, KeyValue & res, std::streamoff lineNumber);
static bool ParseKeyValueLine(std::string const & line, std::streamoff lineNumber,
uint64_t & key, std::string & value);
JsonString CopyJsonString(std::string const & value) const;
std::fstream m_storage;
std::unordered_map<uint64_t, std::shared_ptr<JsonValue>> m_values;
std::unordered_map<uint64_t, Value> m_values;
size_t m_cacheValuesCountLimit;
};
} // namespace generator

View file

@ -10,7 +10,7 @@ namespace regions
{
// Reads the regions index from |indexPath|, opens the storage at |kvPath| and deserializes the
// borders from |indexPath|.
// NOTE(review): the second storage argument is presumably KeyValueStorage's
// cacheValuesCountLimit - confirm against the declaration of |m_storage|.
RegionInfoGetter::RegionInfoGetter(std::string const & indexPath, std::string const & kvPath)
  : m_index{indexer::ReadIndex<indexer::RegionsIndexBox<IndexReader>, MmapReader>(indexPath)}
  , m_storage(kvPath, 1'000'000 /* cacheValuesCountLimit */)
{
  m_borders.Deserialize(indexPath);
}