[alohalytics] Pass ForceUpload event via memory queue to avoid some unsent events still passing the queue.

This commit is contained in:
Alex Zolotarev 2015-03-23 16:43:17 +03:00
parent 744796975c
commit 3b49abae70
2 changed files with 47 additions and 35 deletions

View file

@ -38,12 +38,19 @@ namespace alohalytics {
typedef std::map<std::string, std::string> TStringMap;
struct MQMessage {
std::string message;
MQMessage(std::string&& msg) : message(msg) {}
bool force_upload;
explicit MQMessage(bool force_upload = false) : force_upload(force_upload) {}
};
class Stats final {
std::string upload_url_;
// Stores already serialized and ready-to-append event with unique client id.
// NOTE: Statistics will not be uploaded if unique client id was not set.
std::string unique_client_id_event_;
MessageQueue<Stats> message_queue_;
MessageQueue<Stats, MQMessage> message_queue_;
typedef fsq::FSQ<fsq::Config<Stats>> TFileStorageQueue;
// TODO(AlexZ): Refactor storage queue so it can store messages in memory if no file directory was set.
std::unique_ptr<TFileStorageQueue> file_storage_queue_;
@ -61,7 +68,7 @@ class Stats final {
public:
// Processes messages passed from UI in message queue's own thread.
// TODO(AlexZ): Refactor message queue to make this method private.
void OnMessage(const std::string& message, size_t dropped_events);
void OnMessage(const MQMessage& message, size_t dropped_events);
// Called by file storage engine to upload file with collected data.
// Should return true if upload has been successful.

View file

@ -72,20 +72,43 @@ bool Stats::UploadBuffer(const std::string& url, std::string&& buffer, bool debu
// Processes messages passed from UI in message queue's own thread.
// TODO(AlexZ): Refactor message queue to make this method private.
void Stats::OnMessage(const std::string& message, size_t dropped_events) {
void Stats::OnMessage(const MQMessage& message, size_t dropped_events) {
if (dropped_events) {
LOG_IF_DEBUG("Warning:", dropped_events, "were dropped from the queue.");
}
if (file_storage_queue_) {
file_storage_queue_->PushMessage(message);
if (message.force_upload) {
LOG_IF_DEBUG("Forcing statistics uploading.");
if (file_storage_queue_) {
// Upload all data, including 'current' file.
file_storage_queue_->ForceProcessing(true);
} else {
std::string buffer = unique_client_id_event_;
// TODO(AlexZ): thread-safety?
TMemoryContainer copy;
copy.swap(memory_storage_);
for (const auto& evt : copy) {
buffer.append(evt);
}
LOG_IF_DEBUG("Forcing in-memory statistics uploading.");
if (!UploadBuffer(upload_url_, std::move(buffer), debug_mode_)) {
// If failed, merge events we tried to upload with possible new ones.
memory_storage_.splice(memory_storage_.end(), std::move(copy));
LOG_IF_DEBUG("Failed to upload in-memory statistics.");
}
}
} else {
auto& container = memory_storage_;
container.push_back(message);
static const size_t kMaxEventsInMemory = 2048;
if (container.size() > kMaxEventsInMemory) {
container.pop_front();
LOG_IF_DEBUG("Warning: maximum numbers of events in memory (", kMaxEventsInMemory,
") was reached and the oldest one was dropped.");
if (file_storage_queue_) {
file_storage_queue_->PushMessage(message.message);
} else {
auto& container = memory_storage_;
container.push_back(message.message);
constexpr size_t kMaxEventsInMemory = 2048;
if (container.size() > kMaxEventsInMemory) {
container.pop_front();
LOG_IF_DEBUG("Warning: maximum numbers of events in memory (", kMaxEventsInMemory,
") was reached and the oldest one was dropped.");
}
}
}
}
@ -191,13 +214,13 @@ Stats& Stats::SetClientId(const std::string& unique_client_id) {
return *this;
}
static inline void LogEventImpl(AlohalyticsBaseEvent const& event, MessageQueue<Stats>& msq) {
static inline void LogEventImpl(AlohalyticsBaseEvent const& event, MessageQueue<Stats, MQMessage>& mmq) {
std::ostringstream sstream;
{
// unique_ptr is used to correctly serialize polymorphic types.
cereal::BinaryOutputArchive(sstream) << std::unique_ptr<AlohalyticsBaseEvent const, NoOpDeleter>(&event);
}
msq.PushMessage(std::move(sstream.str()));
mmq.PushMessage(std::move(sstream.str()));
}
void Stats::LogEvent(std::string const& event_name) {
@ -252,32 +275,14 @@ void Stats::LogEvent(std::string const& event_name, TStringMap const& value_pair
// Forcedly tries to upload all stored data to the server.
void Stats::Upload() {
if (upload_url_.empty()) {
LOG_IF_DEBUG("Warning: upload server url is not set, nothing was uploaded.");
LOG_IF_DEBUG("Warning: upload server url has not been set, nothing was uploaded.");
return;
}
if (unique_client_id_event_.empty()) {
LOG_IF_DEBUG("Warning: unique client ID is not set so statistics was not uploaded.");
LOG_IF_DEBUG("Warning: unique client ID has not been set, nothing was uploaded.");
return;
}
LOG_IF_DEBUG("Forcing statistics uploading.");
if (file_storage_queue_) {
// Upload all data, including 'current' file.
file_storage_queue_->ForceProcessing(true);
} else {
std::string buffer = unique_client_id_event_;
// TODO(AlexZ): thread-safety?
TMemoryContainer copy;
copy.swap(memory_storage_);
for (const auto& evt : copy) {
buffer.append(evt);
}
LOG_IF_DEBUG("Forcing in-memory statistics uploading.");
if (!UploadBuffer(upload_url_, std::move(buffer), debug_mode_)) {
// If failed, merge events we tried to upload with possible new ones.
memory_storage_.splice(memory_storage_.end(), std::move(copy));
LOG_IF_DEBUG("Failed to upload in-memory statistics.");
}
}
message_queue_.PushMessage(MQMessage(true /* force upload */));
}
} // namespace alohalytics