[alohalytics] Review fixes.

This commit is contained in:
Alex Zolotarev 2015-06-30 19:38:06 +03:00
parent 3649c83ab3
commit d4bc89141f
4 changed files with 16 additions and 63 deletions

View file

@ -72,6 +72,7 @@ int main(int argc, char * argv[]) {
return result;
}
alohalytics::StatisticsReceiver receiver(argv[1]);
string gzipped_body;
ALOG("FastCGI Server instance is ready to serve clients' requests.");
while (FCGX_Accept_r(&request) >= 0) {
try {
@ -83,18 +84,13 @@ int main(int argc, char * argv[]) {
"Status: 411 Length Required\r\nContent-Type: text/plain\r\n\r\n411 Length Required\n");
continue;
}
unique_ptr<char[]> body(new char[content_length]);
if (fcgi_istream(request.in).read(body.get(), content_length).fail()) {
gzipped_body.resize(content_length);
if (fcgi_istream(request.in).read(&gzipped_body[0], content_length).fail()) {
ALOG("WARNING: Can't read body contents, request is ignored.");
FCGX_FPrintF(request.out, "Status: 400 Bad Request\r\nContent-Type: text/plain\r\n\r\n400 Bad Request\n");
continue;
}
// FCGX_FPrintF(request.out, "Content-Type: text/plain\r\n\r\n");
// for (char ** p = request.envp; *p; ++p) {
// FCGX_FPrintF(request.out, "%s\n", *p);
// }
const char * user_agent_str = FCGX_GetParam("HTTP_USER_AGENT", request.envp);
if (user_agent_str) {
} else {
@ -111,9 +107,11 @@ int main(int argc, char * argv[]) {
ALOG("WARNING: Missing REMOTE_ADDR.");
}
// Process received body.
ALOG("Successfully processed request:", string(body.get(), content_length));
// Process and store received body.
// This call can throw different exceptions.
receiver.ProcessReceivedHTTPBody(gzipped_body, AlohalyticsBaseEvent::CurrentTimestamp(),
remote_addr_str ? remote_addr_str : "", user_agent_str ? user_agent_str : "",
request_uri_str ? request_uri_str : "");
FCGX_FPrintF(request.out, "Status: 200 OK\r\nContent-Type: text/plain\r\nContent-Length: %ld\r\n\r\n%s\n",
kBodyTextInSuccessfulServerReply.size(), kBodyTextInSuccessfulServerReply.c_str());
@ -121,8 +119,8 @@ int main(int argc, char * argv[]) {
ALOG("ERROR: Exception while processing request: ", ex.what());
FCGX_FPrintF(request.out,
"Status: 500 Internal Server Error\r\nContent-Type: text/plain\r\n\r\n500 Internal Server Error\n");
// TODO(AlexZ): Full log here to get more details. Also think about clients who can constantly fail because of bad
// data file.
// TODO(AlexZ): Full log, possibly into the file, to get more details about the error. Also think about clients
// who can constantly fail because of bad data file.
continue;
}
}

View file

@ -118,7 +118,7 @@ int main(int argc, char ** argv) {
}
struct tm stm;
::memset(&stm, 0, sizeof(stm));
if (nullptr == ::strptime(&log_entry[start_pos], "[%d/%b/%Y:%H:%M:%S", &stm)) {
if (NULL == ::strptime(&log_entry[start_pos], "[%d/%b/%Y:%H:%M:%S", &stm)) {
cout << "WARNING: Can't parse server timestamp: " << log_entry.substr(start_pos, end_pos - start_pos) << endl;
continue;
}

View file

@ -40,60 +40,15 @@ namespace alohalytics {
class StatisticsReceiver {
std::string storage_directory_;
// Collect all data into a single file in the queue, but periodically archive it
// and create a new one with a call to ProcessArchivedFiles
UnlimitedFileQueue file_storage_queue_;
// How often should we create separate files with all collected data.
static constexpr uint64_t kArchiveFileIntervalInMS = 1000 * 60 * 60; // One hour.
static constexpr const char * kArchiveExtension = ".cereal";
static constexpr const char * kGzippedArchiveExtension = ".gz";
// Used to archive currently collected data into a separate file.
uint64_t last_checked_time_ms_from_epoch_;
public:
explicit StatisticsReceiver(const std::string & storage_directory)
: storage_directory_(storage_directory),
last_checked_time_ms_from_epoch_(AlohalyticsBaseEvent::CurrentTimestamp()) {
: storage_directory_(storage_directory) {
FileManager::AppendDirectorySlash(storage_directory_);
file_storage_queue_.SetStorageDirectory(storage_directory_);
}
// static std::string GenerateFileNameFromEpochMilliseconds(uint64_t ms_from_epoch) {
// const time_t timet = static_cast<const time_t>(ms_from_epoch / 1000);
// char buf[100];
// if (std::strftime(buf, sizeof(buf), "%F-%H%M%S", std::gmtime(&timet))) {
// return std::string(buf) + kArchiveExtension;
// } else {
// return std::to_string(ms_from_epoch) + kArchiveExtension;
// }
// }
// bool ShouldRenameFile(uint64_t current_ms_from_epoch) {
// last_checked_time_ms_from_epoch_ = current_ms_from_epoch;
// if (current_ms_from_epoch - last_checked_time_ms_from_epoch_ > kArchiveFileIntervalInMS) {
// last_checked_time_ms_from_epoch_ = current_ms_from_epoch;
// return true;
// }
// return false;
// }
void ArchiveCollectedData(const std::string & destination_archive_file) {
// TODO Should we gzip it here? Probably by calling an external tool?
file_storage_queue_.ProcessArchivedFiles([destination_archive_file](bool, const std::string & file_path) {
// Sanity check - this lambda should be called only once.
try {
if (FileManager::GetFileSize(destination_archive_file) > 0) {
std::cerr << "ERROR in the queue? Archived file already exists: " << destination_archive_file << std::endl;
}
} catch (const std::exception &) {
std::rename(file_path.c_str(), destination_archive_file.c_str());
}
return true;
});
}
// Throws exceptions on any error.
void ProcessReceivedHTTPBody(const std::string & gzipped_body,
uint64_t server_timestamp,
@ -112,21 +67,21 @@ class StatisticsReceiver {
in_ar(ptr);
// Cereal does not check if binary data is valid. Let's do it ourselves.
// TODO(AlexZ): Switch from Cereal to another library.
if (ptr.get() == nullptr) {
if (!ptr) {
throw std::invalid_argument("Corrupted Cereal object, this == 0.");
}
// TODO(AlexZ): Looks like an overhead to cast every event instead of only the first one,
// but what if stream contains several mixed bodies?
const AlohalyticsIdEvent * id_event = dynamic_cast<const AlohalyticsIdEvent *>(ptr.get());
if (id_event) {
AlohalyticsIdServerEvent * server_id_event = new AlohalyticsIdServerEvent();
std::unique_ptr<AlohalyticsIdServerEvent> server_id_event(new AlohalyticsIdServerEvent());
server_id_event->timestamp = id_event->timestamp;
server_id_event->id = id_event->id;
server_id_event->server_timestamp = server_timestamp;
server_id_event->ip = ip;
server_id_event->user_agent = user_agent;
server_id_event->uri = uri;
ptr.reset(server_id_event);
ptr = std::move(server_id_event);
}
// Serialize it back.
cereal::BinaryOutputArchive(out_stream) << ptr;

View file

@ -168,7 +168,7 @@ void Test_IsDirectoryWritable() {
const std::string not_writable_system_directory =
#ifdef _MSC_VER
"C:\";
"C:\\";
#else
"/Users";
#endif