Removed own thread for user services

This commit is contained in:
r.kuznetsov 2018-01-11 11:24:19 +03:00 committed by Yuri Gorshenin
parent a81f6224d1
commit 23d5128a26
2 changed files with 62 additions and 58 deletions

View file

@ -12,6 +12,7 @@
#include "3party/jansson/myjansson.hpp"
#include <chrono>
#include <limits>
#include <sstream>
//#define STAGE_PASSPORT_SERVER
@ -126,13 +127,6 @@ User::User()
Init();
}
User::~User()
{
std::lock_guard<std::mutex> lock(m_mutex);
m_needTerminate = true;
m_condition.notify_one();
}
void User::Init()
{
std::lock_guard<std::mutex> lock(m_mutex);
@ -212,17 +206,15 @@ void User::Authenticate(std::string const & socialToken, SocialTokenType socialT
m_authenticationInProgress = true;
}
//TODO: refactor this after adding support of delayed tasks in WorkerThread.
// Also we need to strictly control destructors order to eliminate the case when
// a delayed task calls destructed object.
m_workerThread.Push([this, url]()
GetPlatform().RunTask(Platform::Thread::Network, [this, url]()
{
Request(url, nullptr, [this](std::string const & response)
{
SetAccessToken(ParseAccessToken(response));
std::lock_guard<std::mutex> lock(m_mutex);
m_authenticationInProgress = false;
});
std::lock_guard<std::mutex> lock(m_mutex);
m_authenticationInProgress = false;
});
}
@ -238,7 +230,7 @@ void User::RequestUserDetails()
if (m_accessToken.empty())
return;
m_workerThread.Push([this, url]()
GetPlatform().RunTask(Platform::Thread::Network, [this, url]()
{
Request(url, [this](platform::HttpClient & request)
{
@ -271,7 +263,7 @@ void User::UploadUserReviews(std::string && dataStr,
if (m_accessToken.empty())
return;
m_workerThread.Push([this, url, dataStr, onCompleteUploading]()
GetPlatform().RunTask(Platform::Thread::Network, [this, url, dataStr, onCompleteUploading]()
{
size_t const bytesCount = dataStr.size();
Request(url, [this, dataStr](platform::HttpClient & request)
@ -303,50 +295,66 @@ void User::UploadUserReviews(std::string && dataStr,
void User::Request(std::string const & url, BuildRequestHandler const & onBuildRequest,
SuccessHandler const & onSuccess, ErrorHandler const & onError)
{
ASSERT(onSuccess != nullptr, ());
uint8_t constexpr kAttemptsCount = 3;
uint32_t constexpr kWaitingInSeconds = 5;
uint32_t constexpr kDegradationScalar = 2;
RequestImpl(url, onBuildRequest, onSuccess, onError, 0 /* attemptIndex */, kWaitingInSeconds);
}
void User::RequestImpl(std::string const & url, BuildRequestHandler const & onBuildRequest,
SuccessHandler const & onSuccess, ErrorHandler const & onError,
uint8_t attemptIndex, uint32_t waitingTimeInSeconds)
{
ASSERT(onSuccess, ());
uint8_t constexpr kMaxAttemptsCount = 3;
uint32_t constexpr kDegradationFactor = 2;
uint32_t waitingTime = kWaitingInSeconds;
int resultCode = -1;
bool isSuccessfulCode = false;
for (uint8_t i = 0; i < kAttemptsCount; ++i)
platform::HttpClient request(url);
request.SetRawHeader("Accept", "application/json");
if (onBuildRequest)
onBuildRequest(request);
// TODO: Now passport service uses redirection. If it becomes false, uncomment check.
if (request.RunHttpRequest()) // && !request.WasRedirected())
{
platform::HttpClient request(url);
request.SetRawHeader("Accept", "application/json");
if (onBuildRequest != nullptr)
onBuildRequest(request);
// TODO: Now passport service uses redirection. If it becomes false, uncomment checking.
if (request.RunHttpRequest())// && !request.WasRedirected())
resultCode = request.ErrorCode();
isSuccessfulCode = (resultCode == 200 || resultCode == 201);
if (isSuccessfulCode) // Ok.
{
resultCode = request.ErrorCode();
isSuccessfulCode = (resultCode == 200 || resultCode == 201);
if (isSuccessfulCode) // Ok.
{
onSuccess(request.ServerResponse());
break;
}
if (resultCode == 403) // Forbidden.
{
ResetAccessToken();
LOG(LWARNING, ("Access denied for", url));
break;
}
onSuccess(request.ServerResponse());
return;
}
// Wait for some time and retry.
std::unique_lock<std::mutex> lock(m_mutex);
m_condition.wait_for(lock, std::chrono::seconds(waitingTime),
[this]{return m_needTerminate;});
if (m_needTerminate)
break;
waitingTime *= kDegradationScalar;
if (resultCode == 403) // Forbidden.
{
ResetAccessToken();
LOG(LWARNING, ("Access denied for", url));
return;
}
}
if (!isSuccessfulCode && onError != nullptr)
if (attemptIndex != 0)
{
ASSERT_LESS_OR_EQUAL(waitingTimeInSeconds,
std::numeric_limits<uint32_t>::max() / kDegradationFactor, ());
waitingTimeInSeconds *= kDegradationFactor;
}
attemptIndex++;
if (!isSuccessfulCode && attemptIndex == kMaxAttemptsCount && onError)
onError(resultCode);
if (attemptIndex < kMaxAttemptsCount)
{
GetPlatform().RunDelayedTask(Platform::Thread::Network,
std::chrono::seconds(waitingTimeInSeconds),
[this, url, onBuildRequest, onSuccess, onError,
attemptIndex, waitingTimeInSeconds]()
{
RequestImpl(url, onBuildRequest, onSuccess, onError,
attemptIndex, waitingTimeInSeconds);
});
}
}

View file

@ -1,10 +1,6 @@
#pragma once
#include "base/worker_thread.hpp"
#include <condition_variable>
#include <functional>
#include <map>
#include <mutex>
#include <string>
#include <vector>
@ -36,7 +32,6 @@ public:
using CompleteUploadingHandler = std::function<void(bool)>;
User();
~User();
void Authenticate(std::string const & socialToken, SocialTokenType socialTokenType);
bool IsAuthenticated() const;
void ResetAccessToken();
@ -55,11 +50,12 @@ private:
void Request(std::string const & url, BuildRequestHandler const & onBuildRequest,
SuccessHandler const & onSuccess, ErrorHandler const & onError = nullptr);
void RequestImpl(std::string const & url, BuildRequestHandler const & onBuildRequest,
SuccessHandler const & onSuccess, ErrorHandler const & onError,
uint8_t attemptIndex, uint32_t waitingTimeInSeconds);
std::string m_accessToken;
mutable std::mutex m_mutex;
std::condition_variable m_condition;
bool m_needTerminate = false;
bool m_authenticationInProgress = false;
Details m_details;
base::WorkerThread m_workerThread;
};