[alohalytics] Made max file size in the queue a template parameter.

This commit is contained in:
Alex Zolotarev 2015-06-18 13:01:42 +03:00
parent 95a9921916
commit 79b780128f
4 changed files with 20 additions and 14 deletions

View file

@ -42,7 +42,7 @@ class Stats final {
// Stores already serialized and ready-to-append event with unique client id.
// NOTE: Statistics will not be uploaded if unique client id was not set.
std::string unique_client_id_event_;
MessagesQueue messages_queue_;
HundredKilobytesFileQueue messages_queue_;
bool debug_mode_ = false;
// Use alohalytics::Stats::Instance() to access statistics engine.

View file

@ -139,7 +139,7 @@ Stats & Stats::SetClientId(const std::string & unique_client_id) {
return *this;
}
static inline void LogEventImpl(AlohalyticsBaseEvent const & event, MessagesQueue & messages_queue) {
static inline void LogEventImpl(AlohalyticsBaseEvent const & event, HundredKilobytesFileQueue & messages_queue) {
std::ostringstream sstream;
{
// unique_ptr is used to correctly serialize polymorphic types.

View file

@ -30,6 +30,7 @@ SOFTWARE.
#include <ctime> // time, gmtime
#include <fstream> // ofstream
#include <functional> // bind, function
#include <limits> // numeric_limits
#include <list> // list
#include <memory> // unique_ptr
#include <mutex> // mutex
@ -52,12 +53,12 @@ typedef std::function<void(ProcessingResult)> TFileProcessingFinishedCallback;
constexpr char kCurrentFileName[] = "alohalytics_messages";
constexpr char kArchivedFilesExtension[] = ".archived";
// TMaxFileSizeInBytes is a size limit (before gzip) when we archive "current" file and create a new one for appending.
// Optimal size is the one which (gzipped) can be POSTed to the server as one HTTP request.
template <std::streamoff TMaxFileSizeInBytes>
class MessagesQueue final {
public:
// Size limit (before gzip) when we archive "current" file and create a new one for appending.
// Optimal size is the one which (gzipped) can be POSTed to the server as one HTTP request.
const std::ofstream::pos_type kMaxFileSizeInBytes = 100 * 1024;
static constexpr std::streamoff kMaxFileSizeInBytes = TMaxFileSizeInBytes;
// Default archiving simply renames original file without any additional processing.
static void ArchiveFileByRenamingIt(const std::string & original_file, const std::string & out_archive) {
// TODO(AlexZ): Debug log if rename has failed.
@ -252,6 +253,10 @@ class MessagesQueue final {
std::thread worker_thread_ = std::thread(&MessagesQueue::WorkerThread, this);
};
typedef MessagesQueue<1024 * 100> HundredKilobytesFileQueue;
// TODO(AlexZ): Remove unnecessary file size checks from this specialization.
typedef MessagesQueue<std::numeric_limits<std::streamoff>::max()> UnlimitedFileQueue;
} // namespace alohalytics
#endif // MESSAGES_QUEUE_H

View file

@ -200,7 +200,7 @@ void Test_GetFileSize() {
// ******************* Message Queue tests ******************
using alohalytics::MessagesQueue;
using alohalytics::HundredKilobytesFileQueue;
using alohalytics::ProcessingResult;
bool EndsWith(const std::string & str, const std::string & suffix) {
@ -278,7 +278,7 @@ static void FinishedCallback(ProcessingResult result, FinishTask & finish_task)
void Test_MessagesQueue_InMemory_Empty() {
bool processor_was_called = false;
MessagesQueue q;
HundredKilobytesFileQueue q;
FinishTask finish_task;
q.ProcessArchivedFiles([&processor_was_called](bool, const std::string &) {
processor_was_called = true; // This code should not be executed.
@ -289,7 +289,7 @@ void Test_MessagesQueue_InMemory_Empty() {
}
void Test_MessagesQueue_InMemory_SuccessfulProcessing() {
MessagesQueue q;
HundredKilobytesFileQueue q;
q.PushMessage(kTestMessage);
std::thread worker([&q]() { q.PushMessage(kTestWorkerMessage); });
worker.join();
@ -306,7 +306,7 @@ void Test_MessagesQueue_InMemory_SuccessfulProcessing() {
}
void Test_MessagesQueue_InMemory_FailedProcessing() {
MessagesQueue q;
HundredKilobytesFileQueue q;
q.PushMessage(kTestMessage);
bool processor_was_called = false;
FinishTask finish_task;
@ -324,7 +324,7 @@ void Test_MessagesQueue_SwitchFromInMemoryToFile_and_OfflineEmulation() {
const std::string tmpdir = FileManager::GetDirectoryFromFilePath(GenerateTemporaryFileName());
CleanUpQueueFiles(tmpdir);
ScopedRemoveFile remover(tmpdir + alohalytics::kCurrentFileName);
MessagesQueue q;
HundredKilobytesFileQueue q;
std::string archived_file, second_archived_file;
{
q.PushMessage(kTestMessage); // This one goes into the memory storage.
@ -385,7 +385,7 @@ void Test_MessagesQueue_CreateArchiveOnSizeLimitHit() {
const std::string tmpdir = FileManager::GetDirectoryFromFilePath(GenerateTemporaryFileName());
CleanUpQueueFiles(tmpdir);
ScopedRemoveFile remover(tmpdir + alohalytics::kCurrentFileName);
MessagesQueue q;
HundredKilobytesFileQueue q;
q.SetStorageDirectory(tmpdir);
// Generate messages with total size enough for triggering archiving.
@ -398,7 +398,8 @@ void Test_MessagesQueue_CreateArchiveOnSizeLimitHit() {
}
size += generated_size;
};
static const std::ofstream::pos_type number_of_bytes_to_generate = q.kMaxFileSizeInBytes / 2 + 100;
static const std::ofstream::pos_type number_of_bytes_to_generate =
HundredKilobytesFileQueue::kMaxFileSizeInBytes / 2 + 100;
std::thread worker([&generator]() { generator(kTestWorkerMessage, number_of_bytes_to_generate); });
generator(kTestMessage, number_of_bytes_to_generate);
worker.join();
@ -422,7 +423,7 @@ void Test_MessagesQueue_HighLoadAndIntegrity() {
const std::string tmpdir = FileManager::GetDirectoryFromFilePath(GenerateTemporaryFileName());
CleanUpQueueFiles(tmpdir);
ScopedRemoveFile remover(tmpdir + alohalytics::kCurrentFileName);
MessagesQueue q;
HundredKilobytesFileQueue q;
const int kMaxThreads = 300;
std::mt19937 gen(std::mt19937::default_seed);
std::uniform_int_distribution<> dis('A', 'Z');