[alohalytics] Gracefully store all messages in the MessagesQueue’s destructor.

This commit is contained in:
Alex Zolotarev 2015-06-19 22:11:02 +03:00
parent d1229bddfd
commit 5a87f34aac
2 changed files with 40 additions and 30 deletions

View file

@ -26,6 +26,7 @@ SOFTWARE.
// block of bytes. If storage directory is set, it stores everything in a file (on a separate thread too).
// When TMaxFileSizeInBytes limit is hit, file is "archived" (see TFileArchiver in constructor)
// and a new file is created instead.
// Destructor gracefully processes all commands left in the queue.
#ifndef MESSAGES_QUEUE_H
#define MESSAGES_QUEUE_H
@ -75,9 +76,9 @@ class MessagesQueue final {
~MessagesQueue() {
{
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::mutex> lock(commands_mutex_);
worker_thread_should_exit_ = true;
condition_variable_.notify_all();
commands_condition_variable_.notify_all();
}
worker_thread_.join();
}
@ -86,18 +87,21 @@ class MessagesQueue final {
// Executed on the WorkerThread.
void SetStorageDirectory(std::string directory) {
FileManager::AppendDirectorySlash(directory);
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::mutex> lock(commands_mutex_);
commands_queue_.push_back(std::bind(&MessagesQueue::ProcessInitializeStorageCommand, this, directory));
condition_variable_.notify_all();
commands_condition_variable_.notify_all();
}
// Stores message into a file archive (if SetStorageDirectory was called with a valid directory),
// otherwise stores messages in-memory.
// Executed on the WorkerThread.
void PushMessage(const std::string & message) {
std::lock_guard<std::mutex> lock(mutex_);
messages_buffer_.append(message);
{
std::lock_guard<std::mutex> lock(messages_mutex_);
messages_buffer_.append(message);
}
std::lock_guard<std::mutex> lock(commands_mutex_);
commands_queue_.push_back(std::bind(&MessagesQueue::ProcessMessageCommand, this));
condition_variable_.notify_all();
commands_condition_variable_.notify_all();
}
// Processor should return true if file was successfully processed (e.g. uploaded to a server, etc.).
@ -108,9 +112,9 @@ class MessagesQueue final {
// Executed on the WorkerThread.
void ProcessArchivedFiles(TArchivedFilesProcessor processor,
TFileProcessingFinishedCallback callback = TFileProcessingFinishedCallback()) {
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::mutex> lock(commands_mutex_);
commands_queue_.push_back(std::bind(&MessagesQueue::ProcessArchivedFilesCommand, this, processor, callback));
condition_variable_.notify_all();
commands_condition_variable_.notify_all();
}
private:
@ -136,14 +140,14 @@ class MessagesQueue final {
}
}
void StoreMessages(std::string const & messages_buffer) {
void StoreMessages(std::string const & messages) {
if (current_file_) {
*current_file_ << messages_buffer << std::flush;
*current_file_ << messages << std::flush;
if (current_file_->tellp() >= kMaxFileSizeInBytes) {
ArchiveCurrentFile();
}
} else {
inmemory_storage_.append(messages_buffer);
inmemory_storage_.append(messages);
}
}
@ -169,7 +173,7 @@ class MessagesQueue final {
void ProcessMessageCommand() {
std::string messages_buffer_copy;
{
std::lock_guard<std::mutex> lock(mutex_);
std::lock_guard<std::mutex> lock(messages_mutex_);
if (!messages_buffer_.empty()) {
messages_buffer_copy.swap(messages_buffer_);
}
@ -226,11 +230,15 @@ class MessagesQueue final {
TCommand command_to_execute;
while (true) {
{
std::unique_lock<std::mutex> lock(mutex_);
condition_variable_.wait(lock, [this] { return !commands_queue_.empty() || worker_thread_should_exit_; });
std::unique_lock<std::mutex> lock(commands_mutex_);
commands_condition_variable_.wait(lock,
[this] { return !commands_queue_.empty() || worker_thread_should_exit_; });
if (worker_thread_should_exit_) {
// TODO(AlexZ): Should we execute commands (if any) on exit?
// What if they will be too long (e.g. network connection)?
// Gracefully finish all commands left in the queue and exit.
while (!commands_queue_.empty()) {
commands_queue_.front()();
commands_queue_.pop_front();
}
return;
}
command_to_execute = commands_queue_.front();
@ -252,8 +260,9 @@ class MessagesQueue final {
std::list<TCommand> commands_queue_;
volatile bool worker_thread_should_exit_ = false;
std::mutex mutex_;
std::condition_variable condition_variable_;
std::mutex messages_mutex_;
std::mutex commands_mutex_;
std::condition_variable commands_condition_variable_;
// Only WorkerThread accesses this variable.
std::unique_ptr<std::ofstream> current_file_;
// Should be the last member of the class to initialize after all other members.

View file

@ -319,23 +319,23 @@ void Test_MessagesQueue_SwitchFromInMemoryToFile_and_OfflineEmulation() {
const std::string tmpdir = FileManager::GetDirectoryFromFilePath(GenerateTemporaryFileName());
CleanUpQueueFiles(tmpdir);
ScopedRemoveFile remover(tmpdir + alohalytics::kCurrentFileName);
HundredKilobytesFileQueue q;
std::string archived_file, second_archived_file;
{
q.PushMessage(kTestMessage); // This one goes into the memory storage.
q.SetStorageDirectory(tmpdir); // Here message shoud move from memory into the file.
std::thread worker([&q]() { q.PushMessage(kTestWorkerMessage); });
worker.join();
// Wait until messages will be stored into the file.
// NOTE: THIS IS NOT A PRODUCTION-READY PRACTICE! NEVER USE IT IN PRODUCTION!
// Here it is used for tests simplification only.
std::this_thread::sleep_for(std::chrono::milliseconds(20));
{
HundredKilobytesFileQueue q;
q.PushMessage(kTestMessage); // This one goes into the memory storage.
q.SetStorageDirectory(tmpdir); // Here message shoud move from memory into the file.
std::thread worker([&q]() { q.PushMessage(kTestWorkerMessage); });
worker.join();
// After calling queue's destructor, all messages should be gracefully stored in the file.
}
TEST_EQUAL(kTestMessage + kTestWorkerMessage,
FileManager::ReadFileAsString(tmpdir + alohalytics::kCurrentFileName));
bool processor_was_called = false;
FinishTask finish_task;
HundredKilobytesFileQueue q;
q.SetStorageDirectory(tmpdir);
q.ProcessArchivedFiles([&processor_was_called, &archived_file](bool is_file, const std::string & full_file_path) {
TEST_EQUAL(true, is_file);
TEST_EQUAL(kTestMessage + kTestWorkerMessage, FileManager::ReadFileAsString(full_file_path));
@ -351,8 +351,9 @@ void Test_MessagesQueue_SwitchFromInMemoryToFile_and_OfflineEmulation() {
}
// Create second archive in the queue after ProcessArchivedFiles() call.
HundredKilobytesFileQueue q;
q.SetStorageDirectory(tmpdir);
q.PushMessage(kTestMessage);
{
bool archive1_processed = false, archive2_processed = false;
FinishTask finish_task;