From 5a87f34aaca9a416b2e5cdc89cfb0a152124f6b1 Mon Sep 17 00:00:00 2001 From: Alex Zolotarev Date: Fri, 19 Jun 2015 22:11:02 +0300 Subject: [PATCH] =?UTF-8?q?[alohalytics]=20Gracefully=20store=20all=20mess?= =?UTF-8?q?ages=20in=20the=20MessagesQueue=E2=80=99s=20destructor.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 3party/Alohalytics/src/messages_queue.h | 47 +++++++++++-------- .../Alohalytics/tests/test_messages_queue.cc | 23 ++++----- 2 files changed, 40 insertions(+), 30 deletions(-) diff --git a/3party/Alohalytics/src/messages_queue.h b/3party/Alohalytics/src/messages_queue.h index d60f034844..cfe821e0cb 100644 --- a/3party/Alohalytics/src/messages_queue.h +++ b/3party/Alohalytics/src/messages_queue.h @@ -26,6 +26,7 @@ SOFTWARE. // block of bytes. If storage directory is set, it stores everything in a file (on a separate thread too). // When TMaxFileSizeInBytes limit is hit, file is "archived" (see TFileArchiver in constructor) // and a new file is created instead. +// Destructor gracefully processes all commands left in the queue. #ifndef MESSAGES_QUEUE_H #define MESSAGES_QUEUE_H @@ -75,9 +76,9 @@ class MessagesQueue final { ~MessagesQueue() { { - std::lock_guard lock(mutex_); + std::lock_guard lock(commands_mutex_); worker_thread_should_exit_ = true; - condition_variable_.notify_all(); + commands_condition_variable_.notify_all(); } worker_thread_.join(); } @@ -86,18 +87,21 @@ class MessagesQueue final { // Executed on the WorkerThread. void SetStorageDirectory(std::string directory) { FileManager::AppendDirectorySlash(directory); - std::lock_guard lock(mutex_); + std::lock_guard lock(commands_mutex_); commands_queue_.push_back(std::bind(&MessagesQueue::ProcessInitializeStorageCommand, this, directory)); - condition_variable_.notify_all(); + commands_condition_variable_.notify_all(); } // Stores message into a file archive (if SetStorageDirectory was called with a valid directory), // otherwise stores messages in-memory. // Executed on the WorkerThread. void PushMessage(const std::string & message) { - std::lock_guard lock(mutex_); - messages_buffer_.append(message); + { + std::lock_guard lock(messages_mutex_); + messages_buffer_.append(message); + } + std::lock_guard lock(commands_mutex_); commands_queue_.push_back(std::bind(&MessagesQueue::ProcessMessageCommand, this)); - condition_variable_.notify_all(); + commands_condition_variable_.notify_all(); } // Processor should return true if file was successfully processed (e.g. uploaded to a server, etc.). @@ -108,9 +112,9 @@ class MessagesQueue final { // Executed on the WorkerThread. void ProcessArchivedFiles(TArchivedFilesProcessor processor, TFileProcessingFinishedCallback callback = TFileProcessingFinishedCallback()) { - std::lock_guard lock(mutex_); + std::lock_guard lock(commands_mutex_); commands_queue_.push_back(std::bind(&MessagesQueue::ProcessArchivedFilesCommand, this, processor, callback)); - condition_variable_.notify_all(); + commands_condition_variable_.notify_all(); } private: @@ -136,14 +140,14 @@ class MessagesQueue final { } } - void StoreMessages(std::string const & messages_buffer) { + void StoreMessages(std::string const & messages) { if (current_file_) { - *current_file_ << messages_buffer << std::flush; + *current_file_ << messages << std::flush; if (current_file_->tellp() >= kMaxFileSizeInBytes) { ArchiveCurrentFile(); } } else { - inmemory_storage_.append(messages_buffer); + inmemory_storage_.append(messages); } } @@ -169,7 +173,7 @@ class MessagesQueue final { void ProcessMessageCommand() { std::string messages_buffer_copy; { - std::lock_guard lock(mutex_); + std::lock_guard lock(messages_mutex_); if (!messages_buffer_.empty()) { messages_buffer_copy.swap(messages_buffer_); } @@ -226,11 +230,15 @@ class MessagesQueue final { TCommand command_to_execute; while (true) { { - std::unique_lock lock(mutex_); - condition_variable_.wait(lock, [this] { return !commands_queue_.empty() || worker_thread_should_exit_; }); + std::unique_lock lock(commands_mutex_); + commands_condition_variable_.wait(lock, + [this] { return !commands_queue_.empty() || worker_thread_should_exit_; }); if (worker_thread_should_exit_) { - // TODO(AlexZ): Should we execute commands (if any) on exit? - // What if they will be too long (e.g. network connection)? + // Gracefully finish all commands left in the queue and exit. + while (!commands_queue_.empty()) { + commands_queue_.front()(); + commands_queue_.pop_front(); + } return; } command_to_execute = commands_queue_.front(); @@ -252,8 +260,9 @@ class MessagesQueue final { std::list commands_queue_; volatile bool worker_thread_should_exit_ = false; - std::mutex mutex_; - std::condition_variable condition_variable_; + std::mutex messages_mutex_; + std::mutex commands_mutex_; + std::condition_variable commands_condition_variable_; // Only WorkerThread accesses this variable. std::unique_ptr current_file_; // Should be the last member of the class to initialize after all other members. diff --git a/3party/Alohalytics/tests/test_messages_queue.cc b/3party/Alohalytics/tests/test_messages_queue.cc index 66ff7e2640..8d4669bbcc 100644 --- a/3party/Alohalytics/tests/test_messages_queue.cc +++ b/3party/Alohalytics/tests/test_messages_queue.cc @@ -319,23 +319,23 @@ void Test_MessagesQueue_SwitchFromInMemoryToFile_and_OfflineEmulation() { const std::string tmpdir = FileManager::GetDirectoryFromFilePath(GenerateTemporaryFileName()); CleanUpQueueFiles(tmpdir); ScopedRemoveFile remover(tmpdir + alohalytics::kCurrentFileName); - HundredKilobytesFileQueue q; std::string archived_file, second_archived_file; { - q.PushMessage(kTestMessage); // This one goes into the memory storage. - q.SetStorageDirectory(tmpdir); // Here message shoud move from memory into the file. - std::thread worker([&q]() { q.PushMessage(kTestWorkerMessage); }); - worker.join(); - - // Wait until messages will be stored into the file. - // NOTE: THIS IS NOT A PRODUCTION-READY PRACTICE! NEVER USE IT IN PRODUCTION! - // Here it is used for tests simplification only. - std::this_thread::sleep_for(std::chrono::milliseconds(20)); + { + HundredKilobytesFileQueue q; + q.PushMessage(kTestMessage); // This one goes into the memory storage. + q.SetStorageDirectory(tmpdir); // Here message shoud move from memory into the file. + std::thread worker([&q]() { q.PushMessage(kTestWorkerMessage); }); + worker.join(); + // After calling queue's destructor, all messages should be gracefully stored in the file. + } TEST_EQUAL(kTestMessage + kTestWorkerMessage, FileManager::ReadFileAsString(tmpdir + alohalytics::kCurrentFileName)); bool processor_was_called = false; FinishTask finish_task; + HundredKilobytesFileQueue q; + q.SetStorageDirectory(tmpdir); q.ProcessArchivedFiles([&processor_was_called, &archived_file](bool is_file, const std::string & full_file_path) { TEST_EQUAL(true, is_file); TEST_EQUAL(kTestMessage + kTestWorkerMessage, FileManager::ReadFileAsString(full_file_path)); @@ -351,8 +351,9 @@ void Test_MessagesQueue_SwitchFromInMemoryToFile_and_OfflineEmulation() { } // Create second archive in the queue after ProcessArchivedFiles() call. + HundredKilobytesFileQueue q; + q.SetStorageDirectory(tmpdir); q.PushMessage(kTestMessage); - { bool archive1_processed = false, archive2_processed = false; FinishTask finish_task;