diff --git a/coding/serdes_json.hpp b/coding/serdes_json.hpp index 79278eba44..9fa9ca40c5 100644 --- a/coding/serdes_json.hpp +++ b/coding/serdes_json.hpp @@ -26,17 +26,32 @@ public: m_sink.Write(buffer.get(), strlen(buffer.get())); } - void operator()(bool const d, char const * name = nullptr) { ToJSONObject(*m_json, name, d); } - void operator()(uint8_t const d, char const * name = nullptr) { ToJSONObject(*m_json, name, d); } - void operator()(uint32_t const d, char const * name = nullptr) { ToJSONObject(*m_json, name, d); } - void operator()(uint64_t const d, char const * name = nullptr) { ToJSONObject(*m_json, name, d); } - void operator()(int64_t const d, char const * name = nullptr) { ToJSONObject(*m_json, name, d); } - void operator()(double const d, char const * name = nullptr) { ToJSONObject(*m_json, name, d); } - void operator()(std::string const & s, char const * name = nullptr) + template + void ToJsonObjectOrValue(T const & value, char const * name) { - ToJSONObject(*m_json, name, s); + if (name != nullptr) + { + ToJSONObject(*m_json, name, value); + } + else if (json_is_array(m_json)) + { + auto json = ToJSON(value); + json_array_append_new(m_json.get(), json.release()); + } + else + { + ASSERT(false, ("Unsupported JSON structure")); + } } + void operator()(bool const d, char const * name = nullptr) { ToJsonObjectOrValue(d, name); } + void operator()(uint8_t const d, char const * name = nullptr) { ToJsonObjectOrValue(d, name); } + void operator()(uint32_t const d, char const * name = nullptr) { ToJsonObjectOrValue(d, name); } + void operator()(uint64_t const d, char const * name = nullptr) { ToJsonObjectOrValue(d, name); } + void operator()(int64_t const d, char const * name = nullptr) { ToJsonObjectOrValue(d, name); } + void operator()(double const d, char const * name = nullptr) { ToJsonObjectOrValue(d, name); } + void operator()(std::string const & s, char const * name = nullptr) { ToJsonObjectOrValue(s, name); } + template void operator()(std::vector const & vs, char const * name = nullptr) { diff --git a/map/cloud.cpp b/map/cloud.cpp index 3d2d28d183..b8f07d4a10 100644 --- a/map/cloud.cpp +++ b/map/cloud.cpp @@ -29,7 +29,7 @@ using namespace std::chrono; namespace { -uint32_t constexpr kUploadTaskTimeoutInSeconds = 1; +uint32_t constexpr kTaskTimeoutInSeconds = 1; uint64_t constexpr kUpdateTimeoutInHours = 24; uint32_t constexpr kRetryMaxAttempts = 3; uint32_t constexpr kRetryTimeoutInSeconds = 5; @@ -40,8 +40,9 @@ uint64_t constexpr kMaxUploadingFileSizeInBytes = 100 * 1024 * 1024; // 100Mb std::string const kServerUrl = CLOUD_URL; std::string const kCloudServerVersion = "v1"; -std::string const kCloudServerUploadMethod = "upload_url"; -std::string const kCloudServerNotifyMethod = "upload_succeeded"; +std::string const kCloudServerCreateSnapshotMethod = "snapshot/create"; +std::string const kCloudServerUploadMethod = "file/upload/start"; +std::string const kCloudServerNotifyMethod = "file/upload/finish"; std::string const kApplicationJson = "application/json"; std::string GetIndexFilePath(std::string const & indexName) @@ -49,7 +50,7 @@ std::string GetIndexFilePath(std::string const & indexName) return my::JoinPath(GetPlatform().SettingsDir(), indexName); } -std::string BuildUploadingUrl(std::string const & serverPathName) +std::string BuildMethodUrl(std::string const & serverPathName, std::string const & methodName) { if (kServerUrl.empty()) return {}; @@ -58,20 +59,7 @@ std::string BuildUploadingUrl(std::string const & serverPathName) ss << kServerUrl << "/" << kCloudServerVersion << "/" << serverPathName << "/" - << kCloudServerUploadMethod << "/"; - return ss.str(); -} - -std::string BuildNotificationUrl(std::string const & serverPathName) -{ - if (kServerUrl.empty()) - return {}; - - std::ostringstream ss; - ss << kServerUrl << "/" - << kCloudServerVersion << "/" - << serverPathName << "/" - << kCloudServerNotifyMethod << "/"; + << methodName << "/"; return ss.str(); } @@ -80,10 +68,11 @@ std::string BuildAuthenticationToken(std::string const & accessToken) return "Bearer " + accessToken; } -std::string ExtractFileName(std::string const & filePath) +std::string ExtractFileNameWithoutExtension(std::string const & filePath) { std::string path = filePath; my::GetNameFromFullPath(path); + my::GetNameWithoutExt(path); return path; } @@ -115,19 +104,22 @@ std::string CalculateSHA1(std::string const & filePath) return {}; } -std::string BuildUploadingRequestDataJson(std::string const & filePath) +template +std::string SerializeToJson(DataType const & data) { - std::string jsonBody; - Cloud::UploadingRequestData data; - data.m_alohaId = GetPlatform().UniqueClientId(); - data.m_deviceName = GetPlatform().DeviceName(); - data.m_fileName = ExtractFileName(filePath); - - using Sink = MemWriter; - Sink sink(jsonBody); + std::string jsonStr; + using Sink = MemWriter; + Sink sink(jsonStr); coding::SerializerJson serializer(sink); serializer(data); - return jsonBody; + return jsonStr; +} + +template +void DeserializeFromJson(std::string const & jsonStr, DataType & result) +{ + coding::DeserializerJson des(jsonStr); + des(result); } bool IsSuccessfulResultCode(int resultCode) @@ -136,6 +128,22 @@ bool IsSuccessfulResultCode(int resultCode) } } // namespace +Cloud::SnapshotRequestData::SnapshotRequestData(std::vector const & files) + : m_deviceId(GetPlatform().UniqueClientId()) + , m_deviceName(GetPlatform().DeviceName()) + , m_fileNames(files) +{} + +Cloud::UploadingRequestData::UploadingRequestData(std::string const & filePath) + : m_deviceId(GetPlatform().UniqueClientId()) + , m_fileName(ExtractFileNameWithoutExtension(filePath)) +{} + +Cloud::NotifyRequestData::NotifyRequestData(std::string const & filePath, uint64_t fileSize) + : Cloud::UploadingRequestData(filePath) + , m_fileSize(fileSize) +{} + Cloud::Cloud(CloudParams && params) : m_params(std::move(params)) { @@ -212,7 +220,7 @@ void Cloud::Init(std::vector const & filePaths) { std::lock_guard lock(m_mutex); for (auto const & filePath : filePaths) - m_files[ExtractFileName(filePath)] = filePath; + m_files[ExtractFileNameWithoutExtension(filePath)] = filePath; if (m_state != State::Enabled) return; @@ -253,11 +261,11 @@ bool Cloud::ReadIndex() return false; // Read index file. - std::string data; + std::string jsonStr; try { FileReader r(indexFilePath); - r.ReadAsString(data); + r.ReadAsString(jsonStr); } catch (FileReader::Exception const & exception) { @@ -267,15 +275,13 @@ bool Cloud::ReadIndex() } // Parse index file. - if (data.empty()) + if (jsonStr.empty()) return false; try { Index index; - coding::DeserializerJson deserializer(data); - deserializer(index); - + DeserializeFromJson(jsonStr, index); std::lock_guard lock(m_mutex); std::swap(m_index, index); } @@ -298,14 +304,17 @@ void Cloud::UpdateIndex(bool indexExists) { for (auto const & path : m_files) MarkModifiedImpl(path.second); - m_index.m_lastUpdateInHours = h; - m_index.m_isOutdated = true; // Erase disappeared files from index. my::EraseIf(m_index.m_entries, [this](EntryPtr const & entity) { return m_files.find(entity->m_name) == m_files.end(); }); + // Index is outdated only if there is an entry. + m_index.m_isOutdated = !m_index.m_entries.empty(); + if (m_index.m_isOutdated) + m_index.m_lastUpdateInHours = h; + SaveIndexImpl(); } m_indexUpdated = true; @@ -338,7 +347,7 @@ void Cloud::MarkModifiedImpl(std::string const & filePath) if (fileSize > kMaxUploadingFileSizeInBytes) return; - auto const fileName = ExtractFileName(filePath); + auto const fileName = ExtractFileNameWithoutExtension(filePath); auto entryPtr = GetEntryImpl(fileName); if (entryPtr) { @@ -366,17 +375,10 @@ void Cloud::SaveIndexImpl() const if (m_state != State::Enabled || m_index.m_entries.empty()) return; - std::string jsonData; - { - using Sink = MemWriter; - Sink sink(jsonData); - coding::SerializerJson serializer(sink); - serializer(m_index); - } - auto const indexFilePath = GetIndexFilePath(m_params.m_indexName); try { + auto jsonData = SerializeToJson(m_index); FileWriter w(indexFilePath); w.Write(jsonData.c_str(), jsonData.length()); } @@ -389,9 +391,10 @@ void Cloud::SaveIndexImpl() const void Cloud::ScheduleUploading() { + std::vector snapshotFiles; { std::lock_guard lock(m_mutex); - if (m_state != State::Enabled || !m_index.m_isOutdated || + if (m_state != State::Enabled || !m_index.m_isOutdated || m_index.m_entries.empty() || m_accessToken.empty() || m_uploadingStarted || !m_indexUpdated) { return; @@ -407,17 +410,27 @@ void Cloud::ScheduleUploading() } SortEntriesBeforeUploadingImpl(); + + snapshotFiles.reserve(m_index.m_entries.size()); + for (auto const & entry : m_index.m_entries) + snapshotFiles.emplace_back(entry->m_name); + m_uploadingStarted = true; } if (m_onSynchronizationStarted != nullptr) m_onSynchronizationStarted(); - auto entry = FindOutdatedEntry(); - if (entry != nullptr) - ScheduleUploadingTask(entry, kUploadTaskTimeoutInSeconds, 0 /* attemptIndex */); - else - FinishUploading(SynchronizationResult::Success, {}); + // Create snapshot and begin uploading in case of success. + CreateSnapshotTask(kTaskTimeoutInSeconds, 0 /* attemptIndex */, + std::move(snapshotFiles), [this]() + { + auto entry = FindOutdatedEntry(); + if (entry != nullptr) + ScheduleUploadingTask(entry, kTaskTimeoutInSeconds, 0 /* attemptIndex */); + else + FinishUploading(SynchronizationResult::Success, {}); + }); } void Cloud::ScheduleUploadingTask(EntryPtr const & entry, uint32_t timeout, @@ -426,15 +439,19 @@ void Cloud::ScheduleUploadingTask(EntryPtr const & entry, uint32_t timeout, GetPlatform().RunDelayedTask(Platform::Thread::Network, seconds(timeout), [this, entry, timeout, attemptIndex]() { - ASSERT(m_state == State::Enabled, ()); - ASSERT(!m_accessToken.empty(), ()); - ASSERT(m_uploadingStarted, ()); - ASSERT(entry->m_isOutdated, ()); - - auto const uploadingUrl = BuildUploadingUrl(m_params.m_serverPathName); - if (uploadingUrl.empty()) + #ifdef DEBUG { - FinishUploading(SynchronizationResult::NetworkError, "Empty uploading url"); + std::lock_guard lock(m_mutex); + ASSERT(m_state == State::Enabled, ()); + ASSERT(!m_accessToken.empty(), ()); + ASSERT(m_uploadingStarted, ()); + ASSERT(entry->m_isOutdated, ()); + } + #endif + + if (kServerUrl.empty()) + { + FinishUploading(SynchronizationResult::NetworkError, "Empty server url"); return; } @@ -462,8 +479,15 @@ void Cloud::ScheduleUploadingTask(EntryPtr const & entry, uint32_t timeout, if (entry->m_hash != sha1) { + uint64_t uploadedFileSize = 0; + if (!my::GetFileSize(uploadedName, uploadedFileSize)) + { + FinishUploading(SynchronizationResult::DiskError, "File size calculation error"); + return; + } + // Request uploading. - auto const result = RequestUploading(uploadingUrl, uploadedName); + auto const result = RequestUploading(uploadedName); if (result.m_requestResult.m_status == RequestStatus::NetworkError) { // Retry uploading request up to kRetryMaxAttempts times. @@ -497,8 +521,7 @@ void Cloud::ScheduleUploadingTask(EntryPtr const & entry, uint32_t timeout, } // Notify about successful uploading. - auto const notificationResult = - NotifyAboutUploading(BuildNotificationUrl(m_params.m_serverPathName), uploadedName); + auto const notificationResult = NotifyAboutUploading(uploadedName, uploadedFileSize); if (executeResult.m_status != RequestStatus::Ok) { FinishUploading(SynchronizationResult::NetworkError, notificationResult.m_error); @@ -517,12 +540,66 @@ void Cloud::ScheduleUploadingTask(EntryPtr const & entry, uint32_t timeout, // Schedule next uploading task. auto nextEntry = FindOutdatedEntry(); if (nextEntry != nullptr) - ScheduleUploadingTask(nextEntry, kUploadTaskTimeoutInSeconds, 0 /* attemptIndex */); + ScheduleUploadingTask(nextEntry, kTaskTimeoutInSeconds, 0 /* attemptIndex */); else FinishUploading(SynchronizationResult::Success, {}); }); } +void Cloud::CreateSnapshotTask(uint32_t timeout, uint32_t attemptIndex, + std::vector && files, + SnapshotCompletionHandler && handler) +{ + GetPlatform().RunDelayedTask(Platform::Thread::Network, seconds(timeout), + [this, timeout, attemptIndex, files = std::move(files), + handler = std::move(handler)]() mutable + { + #ifdef DEBUG + { + std::lock_guard lock(m_mutex); + ASSERT(m_state == State::Enabled, ()); + ASSERT(!m_accessToken.empty(), ()); + ASSERT(m_uploadingStarted, ()); + ASSERT(!files.empty(), ()); + } + #endif + + if (kServerUrl.empty()) + { + FinishUploading(SynchronizationResult::NetworkError, "Empty server url"); + return; + } + + auto const result = CreateSnapshot(files); + if (result.m_status == RequestStatus::NetworkError) + { + // Retry request up to kRetryMaxAttempts times. + if (attemptIndex + 1 == kRetryMaxAttempts) + { + FinishUploading(SynchronizationResult::NetworkError, result.m_error); + return; + } + + auto const retryTimeout = attemptIndex == 0 ? kRetryTimeoutInSeconds + : timeout * kRetryDegradationFactor; + CreateSnapshotTask(retryTimeout, attemptIndex + 1, std::move(files), std::move(handler)); + return; + } + else if (result.m_status == RequestStatus::Forbidden) + { + // Finish and notify about invalid access token. + if (m_onInvalidToken != nullptr) + m_onInvalidToken(); + + FinishUploading(SynchronizationResult::AuthError, result.m_error); + return; + } + + if (handler != nullptr) + handler(); + }); +} + std::string Cloud::PrepareFileToUploading(std::string const & fileName) { // 1. Get path to the original uploading file. @@ -543,8 +620,8 @@ std::string Cloud::PrepareFileToUploading(std::string const & fileName) return {}; // 3. Create a temporary file from the original uploading file. - auto name = ExtractFileName(filePath); - auto const tmpPath = my::JoinFoldersToPath(GetPlatform().TmpDir(), name); + auto name = ExtractFileNameWithoutExtension(filePath); + auto const tmpPath = my::JoinFoldersToPath(GetPlatform().TmpDir(), name + ".tmp"); if (!my::CopyFileX(filePath, tmpPath)) return {}; @@ -557,34 +634,63 @@ std::string Cloud::PrepareFileToUploading(std::string const & fileName) if (originalSha1 != tmpSha1) return {}; + auto const outputPath = my::JoinFoldersToPath(GetPlatform().TmpDir(), + name + ".uploaded"); + // 5. If the file is zipped return path to the temporary file. - auto ext = my::GetFileExtension(tmpPath); + auto ext = my::GetFileExtension(filePath); strings::AsciiToLower(ext); if (ext == m_params.m_zipExtension) { - tmpFileGuard.release(); // Do not delete temporary file here. - return tmpPath; + if (my::RenameFileX(tmpPath, outputPath)) + { + tmpFileGuard.release(); + return outputPath; + } + return {}; } // 6. Zip file and return path. - my::GetNameWithoutExt(name); - auto const zipPath = my::JoinFoldersToPath(GetPlatform().TmpDir(), - name + m_params.m_zipExtension); - if (CreateZipFromPathDeflatedAndDefaultCompression(tmpPath, zipPath)) - return zipPath; + if (CreateZipFromPathDeflatedAndDefaultCompression(tmpPath, outputPath)) + return outputPath; return {}; } -Cloud::UploadingResult Cloud::RequestUploading(std::string const & uploadingUrl, - std::string const & filePath) const +Cloud::RequestResult Cloud::CreateSnapshot(std::vector const & files) const +{ + ASSERT(!files.empty(), ()); + auto const url = BuildMethodUrl(m_params.m_serverPathName, kCloudServerCreateSnapshotMethod); + platform::HttpClient request(url); + request.SetRawHeader("Accept", kApplicationJson); + request.SetRawHeader("Authorization", BuildAuthenticationToken(m_accessToken)); + request.SetBodyData(SerializeToJson(SnapshotRequestData(files)), kApplicationJson); + + if (request.RunHttpRequest() && !request.WasRedirected()) + { + int const resultCode = request.ErrorCode(); + if (IsSuccessfulResultCode(resultCode)) + return {RequestStatus::Ok, {}}; + + if (resultCode == 403) + { + LOG(LWARNING, ("Access denied for", url)); + return {RequestStatus::Forbidden, request.ServerResponse()}; + } + } + + return {RequestStatus::NetworkError, request.ServerResponse()}; +} + +Cloud::UploadingResult Cloud::RequestUploading(std::string const & filePath) const { UploadingResult result; - platform::HttpClient request(uploadingUrl); + auto const url = BuildMethodUrl(m_params.m_serverPathName, kCloudServerUploadMethod); + platform::HttpClient request(url); request.SetRawHeader("Accept", kApplicationJson); request.SetRawHeader("Authorization", BuildAuthenticationToken(m_accessToken)); - request.SetBodyData(BuildUploadingRequestDataJson(filePath), kApplicationJson); + request.SetBodyData(SerializeToJson(UploadingRequestData(filePath)), kApplicationJson); if (request.RunHttpRequest() && !request.WasRedirected()) { @@ -592,14 +698,13 @@ Cloud::UploadingResult Cloud::RequestUploading(std::string const & uploadingUrl, if (IsSuccessfulResultCode(resultCode)) { result.m_requestResult = {RequestStatus::Ok, {}}; - coding::DeserializerJson des(request.ServerResponse()); - des(result.m_response); + DeserializeFromJson(request.ServerResponse(), result.m_response); return result; } if (resultCode == 403) { - LOG(LWARNING, ("Access denied for", uploadingUrl)); + LOG(LWARNING, ("Access denied for", url)); result.m_requestResult = {RequestStatus::Forbidden, request.ServerResponse()}; return result; } @@ -638,13 +743,15 @@ Cloud::RequestResult Cloud::ExecuteUploading(UploadingResponseData const & respo return {RequestStatus::NetworkError, errorStr}; } -Cloud::RequestResult Cloud::NotifyAboutUploading(std::string const & notificationUrl, - std::string const & filePath) const +Cloud::RequestResult Cloud::NotifyAboutUploading(std::string const & filePath, + uint64_t fileSize) const { - platform::HttpClient request(notificationUrl); + auto const url = BuildMethodUrl(m_params.m_serverPathName, kCloudServerNotifyMethod); + platform::HttpClient request(url); request.SetRawHeader("Accept", kApplicationJson); request.SetRawHeader("Authorization", BuildAuthenticationToken(m_accessToken)); - request.SetBodyData(BuildUploadingRequestDataJson(filePath), kApplicationJson); + request.SetBodyData(SerializeToJson(NotifyRequestData(filePath, fileSize)), + kApplicationJson); if (request.RunHttpRequest() && !request.WasRedirected()) { @@ -654,7 +761,7 @@ Cloud::RequestResult Cloud::NotifyAboutUploading(std::string const & notificatio if (resultCode == 403) { - LOG(LWARNING, ("Access denied for", notificationUrl)); + LOG(LWARNING, ("Access denied for", url)); return {RequestStatus::Forbidden, request.ServerResponse()}; } } diff --git a/map/cloud.hpp b/map/cloud.hpp index fa98127150..6e36d8d928 100644 --- a/map/cloud.hpp +++ b/map/cloud.hpp @@ -63,14 +63,27 @@ public: visitor(m_lastSyncTimestamp, "lastSyncTimestamp")) }; + struct SnapshotRequestData + { + std::string m_deviceId; + std::string m_deviceName; + std::vector m_fileNames; + + explicit SnapshotRequestData(std::vector const & files = {}); + + DECLARE_VISITOR_AND_DEBUG_PRINT(SnapshotRequestData, visitor(m_deviceId, "device_id"), + visitor(m_deviceName, "device_name"), + visitor(m_fileNames, "file_names")) + }; + struct UploadingRequestData { - std::string m_alohaId; - std::string m_deviceName; + std::string m_deviceId; std::string m_fileName; - DECLARE_VISITOR_AND_DEBUG_PRINT(UploadingRequestData, visitor(m_alohaId, "aloha_id"), - visitor(m_deviceName, "device_name"), + explicit UploadingRequestData(std::string const & filePath = {}); + + DECLARE_VISITOR_AND_DEBUG_PRINT(UploadingRequestData, visitor(m_deviceId, "device_id"), visitor(m_fileName, "file_name")) }; @@ -85,6 +98,18 @@ public: visitor(m_method, "method")) }; + struct NotifyRequestData : public UploadingRequestData + { + uint64_t m_fileSize = 0; + + NotifyRequestData() = default; + NotifyRequestData(std::string const & filePath, uint64_t fileSize); + + DECLARE_VISITOR_AND_DEBUG_PRINT(NotifyRequestData, visitor(m_deviceId, "device_id"), + visitor(m_fileName, "file_name"), + visitor(m_fileSize, "file_size")) + }; + enum class RequestStatus { Ok, @@ -136,6 +161,7 @@ public: using SynchronizationStartedHandler = std::function; using SynchronizationFinishedHandler = std::function; + using SnapshotCompletionHandler = std::function; struct CloudParams { @@ -161,7 +187,9 @@ public: explicit Cloud(CloudParams && params); ~Cloud(); + // Handler can be called from non-UI thread. void SetInvalidTokenHandler(InvalidTokenHandler && onInvalidToken); + // Handlers can be called from non-UI thread. void SetSynchronizationHandlers(SynchronizationStartedHandler && onSynchronizationStarted, SynchronizationFinishedHandler && onSynchronizationFinished); @@ -188,6 +216,9 @@ private: void ScheduleUploading(); void ScheduleUploadingTask(EntryPtr const & entry, uint32_t timeout, uint32_t attemptIndex); + void CreateSnapshotTask(uint32_t timeout, uint32_t attemptIndex, + std::vector && files, + SnapshotCompletionHandler && handler); EntryPtr FindOutdatedEntry() const; void FinishUploading(SynchronizationResult result, std::string const & errorStr); void SetAccessToken(std::string const & token); @@ -196,12 +227,11 @@ private: // in case of a disk error. std::string PrepareFileToUploading(std::string const & fileName); - UploadingResult RequestUploading(std::string const & uploadingUrl, - std::string const & filePath) const; + RequestResult CreateSnapshot(std::vector const & files) const; + UploadingResult RequestUploading(std::string const & filePath) const; RequestResult ExecuteUploading(UploadingResponseData const & responseData, std::string const & filePath); - RequestResult NotifyAboutUploading(std::string const & notificationUrl, - std::string const & filePath) const; + RequestResult NotifyAboutUploading(std::string const & filePath, uint64_t fileSize) const; CloudParams const m_params; InvalidTokenHandler m_onInvalidToken;