[alohalytics] Renaming and better documentation for MessagesQueue.

This commit is contained in:
Alex Zolotarev 2015-06-19 21:20:10 +03:00
parent 95799afb3b
commit d1229bddfd

View file

@ -22,6 +22,11 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*******************************************************************************/
// This queue stores incoming messages in the memory on a separate thread as a continuous
// block of bytes. If storage directory is set, it stores everything in a file (on a separate thread too).
// When TMaxFileSizeInBytes limit is hit, file is "archived" (see TFileArchiver in constructor)
// and a new file is created instead.
#ifndef MESSAGES_QUEUE_H
#define MESSAGES_QUEUE_H
@ -37,7 +42,7 @@ SOFTWARE.
#include <string> // string
#include <thread> // thread
#include "file_manager.h" // ForEachFileInDir
#include "file_manager.h"
namespace alohalytics {
@ -45,7 +50,7 @@ namespace alohalytics {
typedef std::function<void(const std::string & file_to_archive, const std::string & archived_file)> TFileArchiver;
// Processor should return true if file was processed successfully.
// If file_name_in_content is true, then second parameter is a full path to a file instead of a buffer.
typedef std::function<bool(bool file_name_in_content, const std::string & content)> TArchivedFileProcessor;
typedef std::function<bool(bool file_name_in_content, const std::string & content)> TArchivedFilesProcessor;
enum class ProcessingResult { EProcessedSuccessfully, EProcessingError, ENothingToProcess };
typedef std::function<void(ProcessingResult)> TFileProcessingFinishedCallback;
@ -99,8 +104,9 @@ class MessagesQueue final {
// File is deleted if processor has returned true.
// Processing stops if processor returns false.
// Optional callback is called when all files are processed.
// This call automatically archives current file before processing, but zero-sized "current" file is never archived.
// Executed on the WorkerThread.
void ProcessArchivedFiles(TArchivedFileProcessor processor,
void ProcessArchivedFiles(TArchivedFilesProcessor processor,
TFileProcessingFinishedCallback callback = TFileProcessingFinishedCallback()) {
std::lock_guard<std::mutex> lock(mutex_);
commands_queue_.push_back(std::bind(&MessagesQueue::ProcessArchivedFilesCommand, this, processor, callback));
@ -137,7 +143,7 @@ class MessagesQueue final {
ArchiveCurrentFile();
}
} else {
messages_storage_.append(messages_buffer);
inmemory_storage_.append(messages_buffer);
}
}
@ -153,9 +159,9 @@ class MessagesQueue final {
storage_directory_ = directory;
current_file_ = std::move(new_current_file);
// Also check if there are any messages in the memory storage, and save them to file.
if (!messages_storage_.empty()) {
StoreMessages(messages_storage_);
messages_storage_.clear();
if (!inmemory_storage_.empty()) {
StoreMessages(inmemory_storage_);
inmemory_storage_.clear();
}
}
}
@ -174,12 +180,12 @@ class MessagesQueue final {
}
// If there is no file storage directory set, it should also process messages from the memory buffer.
void ProcessArchivedFilesCommand(TArchivedFileProcessor processor, TFileProcessingFinishedCallback callback) {
void ProcessArchivedFilesCommand(TArchivedFilesProcessor processor, TFileProcessingFinishedCallback callback) {
ProcessingResult result = ProcessingResult::ENothingToProcess;
// Process in-memory messages, if any.
if (!messages_storage_.empty()) {
if (processor(false /* in-memory buffer */, messages_storage_)) {
messages_storage_.clear();
if (!inmemory_storage_.empty()) {
if (processor(false /* in-memory buffer */, inmemory_storage_)) {
inmemory_storage_.clear();
result = ProcessingResult::EProcessedSuccessfully;
} else {
result = ProcessingResult::EProcessingError;
@ -241,7 +247,7 @@ class MessagesQueue final {
// Directory with a slash at the end, where we store "current" file and archived files.
std::string storage_directory_;
// Used as an in-memory storage if storage_dir_ was not set.
std::string messages_storage_;
std::string inmemory_storage_;
typedef std::function<void()> TCommand;
std::list<TCommand> commands_queue_;