Create BlobStorage and its indexer.

This commit is contained in:
Yury Melnichek 2012-09-24 06:45:57 +03:00 committed by Alex Zolotarev
parent 8a6c4f394e
commit 5d851463c1
7 changed files with 363 additions and 1 deletions

76
coding/blob_indexer.cpp Normal file
View file

@ -0,0 +1,76 @@
#include "blob_indexer.hpp"
#include "../coding/byte_stream.hpp"
#include "../coding/endianness.hpp"
#include "../coding/varint.hpp"
#include "../coding/writer.hpp"
#include "../coding/write_to_sink.hpp"
#include "../base/assert.hpp"
#include "../base/base.hpp"
#include "../base/logging.hpp"
#include "../std/algorithm.hpp"
#include "../std/set.hpp"
#include "../std/string.hpp"
// Creates an indexer that appends compressed chunks of blobs to |writer|.
//
// writer                   - destination sink; must be empty (position 0), the file header is
//                            written here immediately.
// maxUncompressedChunkSize - upper bound for the uncompressed size of a chunk holding several
//                            blobs; clamped to (1 << BITS_IN_CHUNK_SIZE) - 1 so that every
//                            in-chunk offset fits into BITS_IN_CHUNK_SIZE bits.
// compressor               - callback (data, size, out) that stores the compressed form of
//                            [data, data + size) into out.
BlobIndexer::BlobIndexer(Writer & writer,
                         size_t maxUncompressedChunkSize,
                         function<void (char const *, size_t, string &)> const & compressor) :
  m_writer(writer),
  // Clamp in size_t: narrowing the argument to int first could truncate (or turn negative)
  // values above INT_MAX and defeat the clamp in builds where the assert below is compiled out.
  m_maxUncompressedChunkSize(min(maxUncompressedChunkSize,
                                 static_cast<size_t>((1 << BITS_IN_CHUNK_SIZE) - 1))),
  m_compressor(compressor),
  m_totalBlobSizeUncompressed(0),
  m_maxBlobSize(0),
  m_largeBlobCount(0)
{
  ASSERT_LESS(maxUncompressedChunkSize, (1 << BITS_IN_CHUNK_SIZE), ());
  CHECK_EQUAL(m_writer.Pos(), 0, ("Writer should not have something written already"));

  // Write header: 3-byte magic followed by one byte with the number of offset bits.
  char const header[] = "Blb";
  m_writer.Write(header, 3);
  WriteToSink(m_writer, static_cast<uint8_t>(BITS_IN_CHUNK_SIZE));
}
// Appends |blob| to the current chunk (starting a new chunk first if the blob would not fit)
// and returns the zero-based id of the blob.
//
// A blob larger than the maximum chunk size is still accepted: it is logged, counted in
// m_largeBlobCount and ends up alone in an oversized chunk.
uint64_t BlobIndexer::AddBlob(string const & blob)
{
  // Keep the statistics members up to date; they are zeroed in the constructor and
  // would otherwise never reflect the data that was added.
  m_totalBlobSizeUncompressed += blob.size();
  if (blob.size() > m_maxBlobSize)
    m_maxBlobSize = static_cast<uint32_t>(blob.size());

  if (blob.size() > m_maxUncompressedChunkSize)
  {
    LOG(LINFO, ("Blob bigger than chunk:", m_blobChunkAndOffset.size(), blob.size(),
                blob.substr(0, 64)));
    ++m_largeBlobCount;
  }

  if (m_currentChunk.size() + blob.size() > m_maxUncompressedChunkSize)
    FlushChunk();

  // The chunk number is stored in the upper (32 - BITS_IN_CHUNK_SIZE) bits of a 32-bit value;
  // a larger chunk number would be silently truncated by the push_back below.
  ASSERT_LESS(m_chunkOffset.size(), (static_cast<size_t>(1) << (32 - BITS_IN_CHUNK_SIZE)),
              ("Too many chunks for the blob info encoding"));

  // Blob info = (chunk number << BITS_IN_CHUNK_SIZE) + offset of the blob inside the chunk.
  m_blobChunkAndOffset.push_back(
      (m_chunkOffset.size() << BITS_IN_CHUNK_SIZE) + m_currentChunk.size());
  m_currentChunk.insert(m_currentChunk.end(), blob.begin(), blob.end());

  return m_blobChunkAndOffset.size() - 1;
}
void BlobIndexer::FlushChunk()
{
if (!m_currentChunk.empty())
{
string compressedChunk;
m_compressor(m_currentChunk.data(), m_currentChunk.size(), compressedChunk);
m_writer.Write(compressedChunk.data(), compressedChunk.size());
WriteToSink(m_writer, static_cast<uint32_t>(m_currentChunk.size()));
uint32_t const chunkPrevOffset = (m_chunkOffset.empty() ? 0 : m_chunkOffset.back());
m_chunkOffset.push_back(compressedChunk.size() + 4 + chunkPrevOffset);
m_currentChunk.clear();
}
}
// Finalizes the file: flushes the pending chunk, then writes the table of chunk end offsets,
// the table of blob infos and, last, the number of blobs.
BlobIndexer::~BlobIndexer()
{
  FlushChunk();

  typedef vector<uint32_t>::const_iterator IterT;

  for (IterT it = m_chunkOffset.begin(); it != m_chunkOffset.end(); ++it)
    WriteToSink(m_writer, *it);

  for (IterT it = m_blobChunkAndOffset.begin(); it != m_blobChunkAndOffset.end(); ++it)
    WriteToSink(m_writer, *it);

  uint32_t const blobCount = static_cast<uint32_t>(m_blobChunkAndOffset.size());
  WriteToSink(m_writer, blobCount);
}

39
coding/blob_indexer.hpp Normal file
View file

@ -0,0 +1,39 @@
#pragma once
#include "../std/function.hpp"
#include "../std/string.hpp"
#include "../std/vector.hpp"
#include "../base/base.hpp"
class Writer;
// Writes a blob file: blobs are grouped into chunks, each chunk is compressed with the
// supplied compressor and appended to the writer. The index tables (chunk offsets, blob
// infos, blob count) are written by the destructor, so the output is complete only after
// the indexer has been destroyed.
class BlobIndexer
{
public:
  // writer must be empty; the header is written immediately.
  // maxUncompressedChunkSize limits the uncompressed size of a chunk holding several blobs.
  // compressor(data, size, out) stores the compressed form of the data in out.
  BlobIndexer(Writer & writer,
              size_t maxUncompressedChunkSize,
              function<void (char const *, size_t, string &)> const & compressor);
  ~BlobIndexer();

  // Add blob and return its id.
  uint64_t AddBlob(string const & blob);

  // NOTE(review): no definition of LogStats() is visible next to the other members - confirm
  // it exists before calling it.
  void LogStats() const;

private:
  // Not copyable: the destructor writes the index tables to m_writer, so a copy would
  // write them a second time. Declared, intentionally not defined.
  BlobIndexer(BlobIndexer const &);
  BlobIndexer & operator=(BlobIndexer const &);

  // Compresses and writes the pending chunk, if any.
  void FlushChunk();

  Writer & m_writer;
  size_t const m_maxUncompressedChunkSize;
  function<void (char const *, size_t, string &)> const m_compressor;

  // Number of low bits of a blob info that hold the offset inside the uncompressed chunk.
  static uint32_t const BITS_IN_CHUNK_SIZE = 20;

  // End offset of every written chunk, relative to the end of the file header.
  vector<uint32_t> m_chunkOffset;
  // One entry per blob: (chunk number << BITS_IN_CHUNK_SIZE) + offset inside the chunk.
  vector<uint32_t> m_blobChunkAndOffset;
  // Uncompressed data of the chunk that is being assembled.
  vector<char> m_currentChunk;

  // Just for stats.
  uint64_t m_totalBlobSizeUncompressed;
  uint32_t m_maxBlobSize;
  uint32_t m_largeBlobCount;
};

92
coding/blob_storage.cpp Normal file
View file

@ -0,0 +1,92 @@
#include "blob_storage.hpp"
#include "reader.hpp"
// File Format:
// Blobs are grouped together in chunks and then chunks are compressed.
// nb - number of blobs
// nc - number of chunks
//
// [3| Magic = "Blb"] [1| Bits in chunk size (BITS_IN_CHUNK_SIZE)]
// [*| Chunk 0 ] [*| Chunk 1 ] ... [*| Chunk nc-1]
// [4| Chunk 1 pos] [4| Chunk 2 pos] ... [4| Pos after the last chunk]
// [4| Blob info 0] [4| Blob info 1] ... [4| Blob info nb-1]
// [4| nb]
//
//
// Chunk Format:
// [*| Chunk data]
// [4| Uncompressed chunk size]
//
// Blob Info Format:
// [ Chunk number ] [Offset in uncompressed chunk]
// | 32 - BITS_IN_CHUNK_SIZE | | BITS_IN_CHUNK_SIZE |
// Wraps |pReader| (ownership is transferred to the storage) and parses the index tables
// of the blob file. |decompressor| is called as (src, srcSize, dst, dstSize) to unpack a chunk.
BlobStorage::BlobStorage(Reader const * pReader,
                         function<void (char const *, size_t, char *, size_t)> decompressor)
  : m_pReader(pReader)
  , m_decompressor(decompressor)
{
  Init();
}
// Nothing to do explicitly: the members clean up after themselves.
BlobStorage::~BlobStorage() {}
void BlobStorage::Init()
{
string header(3, ' ');
ReadFromPos(*m_pReader, 0, &header[0], 3);
if (header != "Blb")
MYTHROW(BlobStorage::OpenException, (header));
m_bitsInChunkSize = ReadPrimitiveFromPos<uint8_t>(*m_pReader, 3);
uint64_t const fileSize = m_pReader->Size();
uint32_t const blobCount = ReadPrimitiveFromPos<uint32_t>(*m_pReader, fileSize - 4);
m_blobInfo.Init(PolymorphReader(m_pReader->CreateSubReader(
fileSize - 4 - 4 * blobCount,
4 * blobCount)));
uint32_t const chunkCount =
(blobCount > 0 ? (m_blobInfo[blobCount - 1] >> m_bitsInChunkSize) + 1 : 0);
m_chunkOffset.Init(PolymorphReader(m_pReader->CreateSubReader(
fileSize - 4 - 4 * blobCount - 4 * chunkCount,
4 * chunkCount)));
}
// Number of blobs in the storage, i.e. the number of entries in the blob info table.
uint32_t BlobStorage::Size() const
{
  uint32_t const blobCount = m_blobInfo.size();
  return blobCount;
}
// Extracts the chunk number stored in the high bits of |blobInfo|.
uint32_t BlobStorage::GetChunkFromBI(uint32_t blobInfo) const
{
  uint32_t const chunkNumber = blobInfo >> m_bitsInChunkSize;
  return chunkNumber;
}
// Extracts the offset inside the uncompressed chunk stored in the low
// m_bitsInChunkSize bits of |blobInfo|.
uint32_t BlobStorage::GetOffsetFromBI(uint32_t blobInfo) const
{
  // Build the mask with an unsigned shift: a signed "1 << bits" overflows int when bits is 31.
  uint32_t const offsetMask = (static_cast<uint32_t>(1) << m_bitsInChunkSize) - 1;
  return blobInfo & offsetMask;
}
void BlobStorage::GetBlob(uint32_t i, string & blob) const
{
ASSERT_LESS(i, Size(), ());
uint32_t const blobInfo = m_blobInfo[i];
uint32_t const chunk = GetChunkFromBI(blobInfo);
uint32_t const chunkBeg = (chunk == 0 ? 0 : m_chunkOffset[chunk - 1]);
uint32_t const chunkEnd = m_chunkOffset[chunk];
vector<char> compressedData(chunkEnd - chunkBeg);
ASSERT_GREATER(compressedData.size(), 4, ());
m_pReader->Read(START_OFFSET + chunkBeg, &compressedData[0], compressedData.size());
uint32_t const decompressedSize = ReadPrimitiveFromPos<uint32_t>(
MemReader(&compressedData[0], compressedData.size()), compressedData.size() - 4);
vector<char> data(decompressedSize);
m_decompressor(&compressedData[0], compressedData.size() - 4, &data[0], data.size());
uint32_t const blobOffset = GetOffsetFromBI(blobInfo);
if (i != m_blobInfo.size() - 1 && chunk == GetChunkFromBI(m_blobInfo[i+1]))
blob.assign(data.begin() + blobOffset, data.begin() + GetOffsetFromBI(m_blobInfo[i+1]));
else
blob.assign(data.begin() + blobOffset, data.end());
}

42
coding/blob_storage.hpp Normal file
View file

@ -0,0 +1,42 @@
#pragma once
#include "dd_vector.hpp"
#include "polymorph_reader.hpp"
#include "../std/function.hpp"
#include "../std/scoped_ptr.hpp"
#include "../std/string.hpp"
#include "../base/base.hpp"
#include "../base/exception.hpp"
class Reader;
// Read-only access to a blob file: blobs are stored in compressed chunks and are addressed
// by their zero-based number. Reading a blob decompresses the chunk that contains it.
class BlobStorage
{
public:
// Thrown while opening when the file does not start with the expected "Blb" magic.
DECLARE_EXCEPTION(OpenException, RootException);
// Takes ownership of pReader and deletes it, even if exception is thrown.
// decompressor is called as (compressed data, compressed size, output buffer, output size).
BlobStorage(Reader const * pReader,
function<void (char const *, size_t, char *, size_t)> decompressor);
~BlobStorage();
// Get blob by its number, starting from 0.
void GetBlob(uint32_t i, string & blob) const;
// Returns the number of blobs.
uint32_t Size() const;
private:
// Parses the header and sets up the blob info and chunk offset tables.
void Init();
// A blob info packs the chunk number (high bits) and the offset of the blob inside the
// uncompressed chunk (low m_bitsInChunkSize bits); these helpers extract the two parts.
uint32_t GetChunkFromBI(uint32_t blobInfo) const;
uint32_t GetOffsetFromBI(uint32_t blobInfo) const;
// Number of low bits of a blob info that hold the in-chunk offset; read from the file header.
uint32_t m_bitsInChunkSize;
// Size of the file header (3-byte magic + 1 byte); chunk offsets are relative to this position.
static uint32_t const START_OFFSET = 4;
// Owned reader of the whole file.
scoped_ptr<Reader const> m_pReader;
function<void (char const *, size_t, char *, size_t)> m_decompressor;
// On-disk table with one blob info per blob.
DDVector<uint32_t, PolymorphReader> m_blobInfo;
// On-disk table with the end offset of every chunk.
DDVector<uint32_t, PolymorphReader> m_chunkOffset;
};

View file

@ -28,6 +28,8 @@ SOURCES += \
mmap_reader.cpp \
reader_streambuf.cpp \
reader_writer_ops.cpp \
blob_indexer.cpp \
blob_storage.cpp \
HEADERS += \
internal/xmlparser.h \
@ -83,3 +85,5 @@ HEADERS += \
reader_streambuf.hpp \
reader_writer_ops.hpp \
reader_wrapper.hpp \
blob_indexer.hpp \
blob_storage.hpp \

View file

@ -0,0 +1,108 @@
#include "../../testing/testing.hpp"
#include "../blob_storage.hpp"
#include "../blob_indexer.hpp"
#include "compressor_test_utils.hpp"
#include "../../coding/reader.hpp"
#include "../../coding/writer.hpp"
#include "../../base/logging.hpp"
#include "../../base/macros.hpp"
#include "../../std/string.hpp"
#include "../../std/vector.hpp"
namespace
{

// Convenience wrapper: returns blob number |i| of |bs| by value.
string GetBlob(BlobStorage const & bs, uint32_t i)
{
  string result;
  bs.GetBlob(i, result);
  return result;
}

}  // unnamed namespace
// An indexer without blobs must produce just the header and a zero blob count,
// and such a file must be readable by BlobStorage.
UNIT_TEST(BlobIndexerEmptyTest)
{
  string serial;
  {
    // The index is written when the indexer goes out of scope.
    MemWriter<string> writer(serial);
    BlobIndexer indexer(writer, 20, &coding::TestCompressor);
  }

  char const expected[] = "Blb\x14\0\0\0\0";
  // Explicit length: the literal contains embedded zero bytes.
  string const expectedSerial(expected, ARRAY_SIZE(expected) - 1);
  TEST_EQUAL(serial, expectedSerial, ());

  BlobStorage storage(new MemReader(&serial[0], serial.size()), &coding::TestDecompressor);
}
// A single blob: checks the exact serialized layout and reads the blob back.
UNIT_TEST(BlobIndexerSimpleSerialTest)
{
  string serial;
  {
    // The index is written when the indexer goes out of scope.
    MemWriter<string> writer(serial);
    BlobIndexer indexer(writer, 20, &coding::TestCompressor);
    indexer.AddBlob("abc");
  }

  char const expected[] = "Blb\x14"          // Header
                          "<abc>\3\0\0\0"    // Chunk 0 with its decompressed size
                          "\x9\0\0\0"        // Chunk 0 end offset
                          "\0\0\0\0"         // Blob 0 info
                          "\1\0\0\0";        // Number of blobs
  // Explicit length: the literal contains embedded zero bytes.
  string const expectedSerial(expected, ARRAY_SIZE(expected) - 1);
  TEST_EQUAL(serial, expectedSerial, ());

  BlobStorage bs(new MemReader(&serial[0], serial.size()), &coding::TestDecompressor);
  TEST_EQUAL(bs.Size(), 1, ());
  TEST_EQUAL(GetBlob(bs, 0), "abc", ());
}
// Several blobs with a tiny chunk limit (5 bytes): covers blobs sharing a chunk, blobs larger
// than the limit and blobs exactly at the limit. Checks the exact serialized layout and
// reads every blob back.
UNIT_TEST(BlobIndexerSerialTest)
{
  char const * const blobs[] =
  {
    "abc",         // Chunk 0
    "d",           // Chunk 0
    "ef",          // Chunk 1
    "1234567890",  // Chunk 2
    "0987654321",  // Chunk 3
    "Hello",       // Chunk 4
    "World",       // Chunk 5
    "!"            // Chunk 6
  };
  uint32_t const blobCount = ARRAY_SIZE(blobs);

  string serial;
  {
    // The index is written when the indexer goes out of scope.
    MemWriter<string> writer(serial);
    BlobIndexer indexer(writer, 5, &coding::TestCompressor);
    for (uint32_t i = 0; i < blobCount; ++i)
      indexer.AddBlob(blobs[i]);
  }

  char const expected[] = "Blb\x14"                  // Header
                          "<abcd>\x4\0\0\0"          // Chunk 0
                          "<ef>\x2\0\0\0"            // Chunk 1
                          "<1234567890>\xA\0\0\0"    // Chunk 2
                          "<0987654321>\xA\0\0\0"    // Chunk 3
                          "<Hello>\x5\0\0\0"         // Chunk 4
                          "<World>\x5\0\0\0"         // Chunk 5
                          "<!>\x1\0\0\0"             // Chunk 6
                          "\x0A\0\0\0"               // Chunk 0 end pos
                          "\x12\0\0\0"               // Chunk 1 end pos
                          "\x22\0\0\0"               // Chunk 2 end pos
                          "\x32\0\0\0"               // Chunk 3 end pos
                          "\x3D\0\0\0"               // Chunk 4 end pos
                          "\x48\0\0\0"               // Chunk 5 end pos
                          "\x4F\0\0\0"               // Chunk 6 end pos
                          "\x0\0\x00\0"              // Blob 0 info
                          "\x3\0\x00\0"              // Blob 1 info
                          "\x0\0\x10\0"              // Blob 2 info
                          "\x0\0\x20\0"              // Blob 3 info
                          "\x0\0\x30\0"              // Blob 4 info
                          "\x0\0\x40\0"              // Blob 5 info
                          "\x0\0\x50\0"              // Blob 6 info
                          "\x0\0\x60\0"              // Blob 7 info
                          "\x8\0\0\0"                // Number of blobs
                          ;
  // Explicit length: the literal contains embedded zero bytes.
  string const expectedSerial(expected, ARRAY_SIZE(expected) - 1);
  TEST_EQUAL(serial, expectedSerial, ());

  BlobStorage bs(new MemReader(&serial[0], serial.size()), &coding::TestDecompressor);
  TEST_EQUAL(bs.Size(), 8, ());
  for (uint32_t i = 0; i < blobCount; ++i)
    TEST_EQUAL(GetBlob(bs, i), string(blobs[i]), ());
}

View file

@ -37,7 +37,8 @@ SOURCES += ../../testing/testingmain.cpp \
file_data_test.cpp \
zip_reader_test.cpp \
trie_test.cpp \
reader_writer_ops_test.cpp
reader_writer_ops_test.cpp \
blob_storage_test.cpp \
HEADERS += \
reader_test.hpp \