[alohalytics] Code Review fixes.

This commit is contained in:
Alex Zolotarev 2015-07-02 15:03:01 +03:00
parent 010b70ea9f
commit 950162db9a
8 changed files with 28 additions and 30 deletions

View file

@ -55,7 +55,7 @@ int main(int, char **) {
cereal::BinaryInputArchive ar(std::cin);
Processor processor;
try {
while (std::cin.good()) {
while (std::cin) {
std::unique_ptr<AlohalyticsBaseEvent> ptr;
ar(ptr);
bricks::rtti::RuntimeDispatcher<AlohalyticsBaseEvent, AlohalyticsKeyPairsLocationEvent, AlohalyticsKeyPairsEvent,

View file

@ -31,7 +31,6 @@
// $http_content_encoding should be set to gzip
#include <chrono>
#include <cstdlib>
#include <ctime>
#include <exception>
@ -92,18 +91,15 @@ int main(int argc, char * argv[]) {
}
const char * user_agent_str = FCGX_GetParam("HTTP_USER_AGENT", request.envp);
if (user_agent_str) {
} else {
if (!user_agent_str) {
ALOG("WARNING: Missing HTTP User-Agent.");
}
const char * request_uri_str = FCGX_GetParam("REQUEST_URI", request.envp);
if (request_uri_str) {
} else {
if (!request_uri_str) {
ALOG("WARNING: Missing REQUEST_URI.");
}
const char * remote_addr_str = FCGX_GetParam("REMOTE_ADDR", request.envp);
if (remote_addr_str) {
} else {
if (!remote_addr_str) {
ALOG("WARNING: Missing REMOTE_ADDR.");
}
@ -115,7 +111,7 @@ int main(int argc, char * argv[]) {
FCGX_FPrintF(request.out, "Status: 200 OK\r\nContent-Type: text/plain\r\nContent-Length: %ld\r\n\r\n%s\n",
kBodyTextInSuccessfulServerReply.size(), kBodyTextInSuccessfulServerReply.c_str());
} catch (const std::exception & ex) {
} catch (const exception & ex) {
ALOG("ERROR: Exception while processing request: ", ex.what());
FCGX_FPrintF(request.out,
"Status: 500 Internal Server Error\r\nContent-Type: text/plain\r\n\r\n500 Internal Server Error\n");

View file

@ -77,7 +77,7 @@ using namespace alohalytics;
static void DeleteFile(const string & file) { std::remove(file.c_str()); }
int main(int argc, char ** argv) {
int main(int argc, char * argv[]) {
if (argc < 2) {
cout << "Usage: " << argv[0] << " <directory to store merged file>" << endl;
return -1;
@ -94,10 +94,10 @@ int main(int argc, char ** argv) {
size_t good_files_processed = 0, corrupted_files_removed = 0, other_files_removed = 0;
size_t files_total_size = 0;
StatisticsReceiver receiver(directory);
while (getline(cin, log_entry).good()) {
while (getline(cin, log_entry)) {
// IP address.
size_t start_pos = 0;
size_t end_pos = log_entry.find_first_of(' ');
string::size_type start_pos = 0;
string::size_type end_pos = log_entry.find_first_of(' ');
if (end_pos == string::npos) {
cout << "WARNING: Can't get IP address. Invalid log entry? " << log_entry << endl;
continue;

View file

@ -43,7 +43,7 @@ class Stats final {
// In current implementation it is used to distinguish between different users in the events stream on the server.
// NOTE: Statistics will not be uploaded if unique client id was not set.
std::string unique_client_id_;
HundredKilobytesFileQueue messages_queue_;
THundredKilobytesFileQueue messages_queue_;
bool debug_mode_ = false;
// Use alohalytics::Stats::Instance() to access statistics engine.

View file

@ -76,12 +76,12 @@ void Stats::GzipAndArchiveFileInTheQueue(const std::string & in_file, const std:
// Append unique installation id in the beginning of each archived file.
try {
std::string buffer(encoded_unique_client_id);
std::string buffer(std::move(encoded_unique_client_id));
{
std::ifstream fi;
fi.exceptions(std::ifstream::failbit | std::ifstream::badbit);
fi.open(in_file, std::ifstream::in | std::ifstream::binary);
const size_t data_offset = encoded_unique_client_id.size();
const size_t data_offset = buffer.size();
const uint64_t file_size = FileManager::GetFileSize(in_file);
buffer.resize(data_offset + file_size);
fi.read(&buffer[data_offset], static_cast<std::streamsize>(file_size));
@ -134,7 +134,7 @@ Stats & Stats::SetClientId(const std::string & unique_client_id) {
return *this;
}
static inline void LogEventImpl(AlohalyticsBaseEvent const & event, HundredKilobytesFileQueue & messages_queue) {
static inline void LogEventImpl(AlohalyticsBaseEvent const & event, THundredKilobytesFileQueue & messages_queue) {
std::ostringstream sstream;
{
// unique_ptr is used to correctly serialize polymorphic types.

View file

@ -35,7 +35,7 @@ namespace alohalytics {
// Useful helper.
struct ScopedRemoveFile {
std::string file;
ScopedRemoveFile(const std::string & file_to_delete) : file(file_to_delete) {}
template<typename F> ScopedRemoveFile(F && file_to_delete): file(std::forward<F>(file_to_delete)) {}
~ScopedRemoveFile() { std::remove(file.c_str()); }
};
@ -88,6 +88,7 @@ struct FileManager {
}
// Returns true if we can write to the specified directory.
// TODO(AlexZ): Investigate better cross-platform solutions. For example, access() does not always work with setuid, etc.
static bool IsDirectoryWritable(std::string directory) {
AppendDirectorySlash(directory);
std::string temporary_file = directory;

View file

@ -259,7 +259,8 @@ class MessagesQueue final {
typedef std::function<void()> TCommand;
std::list<TCommand> commands_queue_;
volatile bool worker_thread_should_exit_ = false;
// Should be guarded by mutex.
bool worker_thread_should_exit_ = false;
std::mutex messages_mutex_;
std::mutex commands_mutex_;
std::condition_variable commands_condition_variable_;
@ -269,7 +270,7 @@ class MessagesQueue final {
std::thread worker_thread_ = std::thread(&MessagesQueue::WorkerThread, this);
};
typedef MessagesQueue<1024 * 100> HundredKilobytesFileQueue;
typedef MessagesQueue<1024 * 100> THundredKilobytesFileQueue;
// TODO(AlexZ): Remove unnecessary file size checks from this specialization.
typedef MessagesQueue<std::numeric_limits<std::streamoff>::max()> UnlimitedFileQueue;

View file

@ -34,7 +34,7 @@ SOFTWARE.
using alohalytics::FileManager;
using alohalytics::ScopedRemoveFile;
using alohalytics::HundredKilobytesFileQueue;
using alohalytics::THundredKilobytesFileQueue;
using alohalytics::ProcessingResult;
bool EndsWith(const std::string & str, const std::string & suffix) {
@ -112,7 +112,7 @@ static void FinishedCallback(ProcessingResult result, FinishTask & finish_task)
TEST(MessagesQueue, InMemory_Empty) {
bool processor_was_called = false;
HundredKilobytesFileQueue q;
THundredKilobytesFileQueue q;
FinishTask finish_task;
q.ProcessArchivedFiles([&processor_was_called](bool, const std::string &) {
processor_was_called = true; // This code should not be executed.
@ -123,7 +123,7 @@ TEST(MessagesQueue, InMemory_Empty) {
}
TEST(MessagesQueue, InMemory_SuccessfulProcessing) {
HundredKilobytesFileQueue q;
THundredKilobytesFileQueue q;
q.PushMessage(kTestMessage);
std::thread worker([&q]() { q.PushMessage(kTestWorkerMessage); });
worker.join();
@ -140,7 +140,7 @@ TEST(MessagesQueue, InMemory_SuccessfulProcessing) {
}
TEST(MessagesQueue, InMemory_FailedProcessing) {
HundredKilobytesFileQueue q;
THundredKilobytesFileQueue q;
q.PushMessage(kTestMessage);
bool processor_was_called = false;
FinishTask finish_task;
@ -161,7 +161,7 @@ TEST(MessagesQueue, SwitchFromInMemoryToFile_and_OfflineEmulation) {
std::string archived_file, second_archived_file;
{
{
HundredKilobytesFileQueue q;
THundredKilobytesFileQueue q;
q.PushMessage(kTestMessage); // This one goes into the memory storage.
q.SetStorageDirectory(tmpdir); // Here message shoud move from memory into the file.
std::thread worker([&q]() { q.PushMessage(kTestWorkerMessage); });
@ -172,7 +172,7 @@ TEST(MessagesQueue, SwitchFromInMemoryToFile_and_OfflineEmulation) {
bool processor_was_called = false;
FinishTask finish_task;
HundredKilobytesFileQueue q;
THundredKilobytesFileQueue q;
q.SetStorageDirectory(tmpdir);
q.ProcessArchivedFiles([&processor_was_called, &archived_file](bool is_file, const std::string & full_file_path) {
EXPECT_TRUE(is_file);
@ -189,7 +189,7 @@ TEST(MessagesQueue, SwitchFromInMemoryToFile_and_OfflineEmulation) {
}
// Create second archive in the queue after ProcessArchivedFiles() call.
HundredKilobytesFileQueue q;
THundredKilobytesFileQueue q;
q.SetStorageDirectory(tmpdir);
q.PushMessage(kTestMessage);
{
@ -219,7 +219,7 @@ TEST(MessagesQueue, CreateArchiveOnSizeLimitHit) {
const std::string tmpdir = FileManager::GetDirectoryFromFilePath(GenerateTemporaryFileName());
CleanUpQueueFiles(tmpdir);
const ScopedRemoveFile remover(tmpdir + alohalytics::kCurrentFileName);
HundredKilobytesFileQueue q;
THundredKilobytesFileQueue q;
q.SetStorageDirectory(tmpdir);
// Generate messages with total size enough for triggering archiving.
@ -233,7 +233,7 @@ TEST(MessagesQueue, CreateArchiveOnSizeLimitHit) {
size += generated_size;
};
static const std::ofstream::pos_type number_of_bytes_to_generate =
HundredKilobytesFileQueue::kMaxFileSizeInBytes / 2 + 100;
THundredKilobytesFileQueue::kMaxFileSizeInBytes / 2 + 100;
std::thread worker([&generator]() { generator(kTestWorkerMessage, number_of_bytes_to_generate); });
generator(kTestMessage, number_of_bytes_to_generate);
worker.join();
@ -257,7 +257,7 @@ TEST(MessagesQueue, HighLoadAndIntegrity) {
const std::string tmpdir = FileManager::GetDirectoryFromFilePath(GenerateTemporaryFileName());
CleanUpQueueFiles(tmpdir);
const ScopedRemoveFile remover(tmpdir + alohalytics::kCurrentFileName);
HundredKilobytesFileQueue q;
THundredKilobytesFileQueue q;
const int kMaxThreads = 300;
std::mt19937 gen(std::mt19937::default_seed);
std::uniform_int_distribution<> dis('A', 'Z');