[mac] Added chunks download strategy

This commit is contained in:
Alex Zolotarev 2011-11-07 05:55:11 +03:00 committed by Alex Zolotarev
parent 2eedf7245c
commit 4ca29d9593
8 changed files with 345 additions and 11 deletions

View file

@ -0,0 +1,92 @@
#include "chunks_download_strategy.hpp"
#include "../std/algorithm.hpp"
#define INVALID_CHUNK -1
namespace downloader
{
ChunksDownloadStrategy::RangeT const ChunksDownloadStrategy::INVALID_RANGE = RangeT(-1, -1);
ChunksDownloadStrategy::ChunksDownloadStrategy(vector<string> const & urls, int64_t fileSize, int64_t chunkSize)
: m_chunkSize(chunkSize)
{
// init servers list
for (size_t i = 0; i < urls.size(); ++i)
m_servers.push_back(make_pair(urls[i], INVALID_RANGE));
// init chunks which should be downloaded
// @TODO implement download resume by saving chunks to download for specified file
for (int64_t i = 0; i < fileSize; i += chunkSize)
{
m_chunksToDownload.insert(RangeT(i, min(i + chunkSize - 1,
fileSize - 1)));
}
}
void ChunksDownloadStrategy::ChunkFinished(bool successfully, int64_t begRange, int64_t endRange)
{
RangeT const chunk(begRange, endRange);
// find server which was downloading this chunk
for (ServersT::iterator it = m_servers.begin(); it != m_servers.end(); ++it)
{
if (it->second == chunk)
{
if (successfully)
it->second = INVALID_RANGE;
else
{
// remove failed server and mark chunk as not downloaded
m_servers.erase(it);
m_chunksToDownload.insert(chunk);
}
break;
}
}
}
ChunksDownloadStrategy::ResultT ChunksDownloadStrategy::NextChunk(string & outUrl,
int64_t & begRange,
int64_t & endRange)
{
if (m_servers.empty())
return EDownloadFailed;
if (m_chunksToDownload.empty())
{
// no more chunks to download
bool allChunksAreFinished = true;
for (size_t i = 0; i < m_servers.size(); ++i)
{
if (m_servers[i].second != INVALID_RANGE)
allChunksAreFinished = false;
}
if (allChunksAreFinished)
return EDownloadSucceeded;
else
return ENoFreeServers;
}
else
{
RangeT const nextChunk = *m_chunksToDownload.begin();
for (size_t i = 0; i < m_servers.size(); ++i)
{
if (m_servers[i].second == INVALID_RANGE)
{
// found not used server
m_servers[i].second = nextChunk;
outUrl = m_servers[i].first;
begRange = nextChunk.first;
endRange = nextChunk.second;
m_chunksToDownload.erase(m_chunksToDownload.begin());
return ENextChunk;
}
}
// if we're here, all servers are busy downloading
return ENoFreeServers;
}
}
} // namespace downloader

View file

@ -0,0 +1,36 @@
#pragma once
#include "../std/string.hpp"
#include "../std/vector.hpp"
#include "../std/utility.hpp"
#include "../std/set.hpp"
namespace downloader
{
class ChunksDownloadStrategy
{
int64_t m_chunkSize;
typedef pair<int64_t, int64_t> RangeT;
static RangeT const INVALID_RANGE;
/// <server url, currently downloading range or INVALID_RANGE if url is not used>
typedef vector<pair<string, RangeT> > ServersT;
ServersT m_servers;
set<RangeT> m_chunksToDownload;
public:
ChunksDownloadStrategy(vector<string> const & urls, int64_t fileSize, int64_t chunkSize = 512 * 1024);
void ChunkFinished(bool successfully, int64_t begRange, int64_t endRange);
enum ResultT
{
ENextChunk,
ENoFreeServers,
EDownloadFailed,
EDownloadSucceeded
};
ResultT NextChunk(string & outUrl, int64_t & begRange, int64_t & endRange);
};
} // namespace downloader

View file

@ -1,7 +1,9 @@
#include "http_request.hpp"
#include "chunks_download_strategy.hpp"
#ifdef DEBUG
#include "../base/thread.hpp"
#include "../base/logging.hpp"
#endif
#include "../coding/writer.hpp"
@ -17,9 +19,9 @@ HttpRequestImpl * CreateNativeHttpRequest(string const & url,
string const & postBody = string());
void DeleteNativeHttpRequest(HttpRequestImpl * request);
//////////////////////////////////////////////////////////////////////////////////////////
HttpRequest::HttpRequest(Writer & writer, CallbackT onFinish, CallbackT onProgress)
: m_status(EInProgress), m_progress(make_pair(-1, -1)), m_writer(writer),
: m_status(EInProgress), m_progress(make_pair(0, -1)), m_writer(writer),
m_onFinish(onFinish), m_onProgress(onProgress)
{
}
@ -30,6 +32,11 @@ HttpRequest::~HttpRequest()
DeleteNativeHttpRequest(*it);
}
void HttpRequest::OnSizeKnown(int64_t projectedSize)
{
LOG(LDEBUG, ("Projected size", projectedSize));
}
void HttpRequest::OnWrite(int64_t offset, void const * buffer, size_t size)
{
#ifdef DEBUG
@ -65,4 +72,46 @@ HttpRequest * HttpRequest::Post(string const & url, Writer & writer, string cons
return self;
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////
class ChunksHttpRequest : public HttpRequest
{
ChunksDownloadStrategy m_strategy;
ChunksDownloadStrategy::ResultT StartThreads()
{
string url;
int64_t beg, end;
ChunksDownloadStrategy::ResultT result;
while ((result = m_strategy.NextChunk(url, beg, end)) == ChunksDownloadStrategy::ENextChunk)
m_threads.push_back(CreateNativeHttpRequest(url, *this, beg, end));
return result;
}
public:
ChunksHttpRequest(vector<string> const & urls, Writer & writer, int64_t fileSize,
CallbackT onFinish, CallbackT onProgress, int64_t chunkSize)
: HttpRequest(writer, onFinish, onProgress), m_strategy(urls, fileSize, chunkSize)
{
ASSERT(!urls.empty(), ("Urls list shouldn't be empty"));
StartThreads();
}
protected:
virtual void OnFinish(long httpCode, int64_t begRange, int64_t endRange)
{
m_strategy.ChunkFinished(httpCode == 200, begRange, endRange);
ChunksDownloadStrategy::ResultT const result = StartThreads();
if (result != ChunksDownloadStrategy::ENoFreeServers)
HttpRequest::OnFinish(result == ChunksDownloadStrategy::EDownloadSucceeded ? 200 : -2,
0, -1);
}
};
HttpRequest * HttpRequest::GetChunks(vector<string> const & urls, Writer & writer, int64_t fileSize,
CallbackT onFinish, CallbackT onProgress, int64_t chunkSize)
{
return new ChunksHttpRequest(urls, writer, fileSize, onFinish, onProgress, chunkSize);
}
} // namespace downloader

View file

@ -3,6 +3,7 @@
#include "../std/function.hpp"
#include "../std/string.hpp"
#include "../std/list.hpp"
#include "../std/vector.hpp"
#include "../std/utility.hpp"
#include "http_request_impl_callback.hpp"
@ -36,6 +37,7 @@ private:
CallbackT m_onFinish;
CallbackT m_onProgress;
protected:
typedef list<HttpRequestImpl *> ThreadsContainerT;
ThreadsContainerT m_threads;
@ -43,6 +45,7 @@ private:
/// @name Callbacks for internal native downloading threads
//@{
virtual void OnSizeKnown(int64_t projectedSize);
virtual void OnWrite(int64_t offset, void const * buffer, size_t size);
virtual void OnFinish(long httpCode, int64_t begRange, int64_t endRange);
//@}
@ -55,9 +58,12 @@ public:
static HttpRequest * Get(string const & url, Writer & writer, CallbackT onFinish,
CallbackT onProgress = CallbackT());
/// Content-type is always "application/json"
/// Content-type for request is always "application/json"
static HttpRequest * Post(string const & url, Writer & writer, string const & postData,
CallbackT onFinish, CallbackT onProgress = CallbackT());
static HttpRequest * GetChunks(vector<string> const & urls, Writer & writer, int64_t fileSize,
CallbackT onFinish, CallbackT onProgress = CallbackT(),
int64_t chunkSize = 512 * 1024);
};
} // namespace downloader

View file

@ -105,11 +105,7 @@
return;
}
int64_t const expectedLength = [response expectedContentLength];
if (expectedLength < 0)
LOG(LDEBUG, ("Server doesn't support HTTP Range"));
else
LOG(LDEBUG, ("Expected content length", expectedLength));
m_callback->OnSizeKnown([response expectedContentLength]);
}
else
{ // in theory, we should never be here

View file

@ -6,6 +6,8 @@ namespace downloader
class IHttpRequestImplCallback
{
public:
/// Called before OnWrite, projectedSize can be -1 if server doesn't support it
virtual void OnSizeKnown(int64_t projectedSize) = 0;
virtual void OnWrite(int64_t offset, void const * buffer, size_t size) = 0;
virtual void OnFinish(long httpCode, int64_t begRange, int64_t endRange) = 0;
};

View file

@ -67,7 +67,8 @@ HEADERS += \
languages.hpp \
url_generator.hpp \
http_request.hpp \
http_request_impl_callback.hpp
http_request_impl_callback.hpp \
chunks_download_strategy.hpp
SOURCES += \
preferred_languages.cpp \
@ -75,4 +76,6 @@ SOURCES += \
video_timer.cpp \
languages.cpp \
url_generator.cpp \
http_request.cpp
http_request.cpp \
chunks_download_strategy.cpp

View file

@ -3,8 +3,10 @@
#include "../../base/logging.hpp"
#include "../../coding/writer.hpp"
#include "../../coding/sha2.hpp"
#include "../../platform/http_request.hpp"
#include "../http_request.hpp"
#include "../chunks_download_strategy.hpp"
#include "../../std/scoped_ptr.hpp"
#include "../../std/bind.hpp"
@ -169,3 +171,151 @@ UNIT_TEST(DownloaderSimplePost)
TEST_EQUAL(buffer, postData, (buffer));
}
}
UNIT_TEST(ChunksDownloadStrategy)
{
string const S1 = "UrlOfServer1";
string const S2 = "UrlOfServer2";
string const S3 = "UrlOfServer3";
pair<int64_t, int64_t> const R1(0, 249), R2(250, 499), R3(500, 749), R4(750, 800);
vector<string> servers;
servers.push_back(S1);
servers.push_back(S2);
servers.push_back(S3);
int64_t const FILE_SIZE = 800;
int64_t const CHUNK_SIZE = 250;
ChunksDownloadStrategy strategy(servers, FILE_SIZE, CHUNK_SIZE);
string s1;
int64_t beg1, end1;
TEST_EQUAL(strategy.NextChunk(s1, beg1, end1), ChunksDownloadStrategy::ENextChunk, ());
string s2;
int64_t beg2, end2;
TEST_EQUAL(strategy.NextChunk(s2, beg2, end2), ChunksDownloadStrategy::ENextChunk, ());
string s3;
int64_t beg3, end3;
TEST_EQUAL(strategy.NextChunk(s3, beg3, end3), ChunksDownloadStrategy::ENextChunk, ());
string sEmpty;
int64_t begEmpty, endEmpty;
TEST_EQUAL(strategy.NextChunk(sEmpty, begEmpty, endEmpty), ChunksDownloadStrategy::ENoFreeServers, ());
TEST_EQUAL(strategy.NextChunk(sEmpty, begEmpty, endEmpty), ChunksDownloadStrategy::ENoFreeServers, ());
TEST(s1 != s2 && s2 != s3 && s3 != s1, (s1, s2, s3));
pair<int64_t, int64_t> const r1(beg1, end1), r2(beg2, end2), r3(beg3, end3);
TEST(r1 != r2 && r2 != r3 && r3 != r1, (r1, r2, r3));
TEST(r1 == R1 || r1 == R2 || r1 == R3 || r1 == R4, (r1));
TEST(r2 == R1 || r2 == R2 || r2 == R3 || r2 == R4, (r2));
TEST(r3 == R1 || r3 == R2 || r3 == R3 || r3 == R4, (r3));
strategy.ChunkFinished(true, beg1, end1);
string s4;
int64_t beg4, end4;
TEST_EQUAL(strategy.NextChunk(s4, beg4, end4), ChunksDownloadStrategy::ENextChunk, ());
TEST_EQUAL(s4, s1, ());
pair<int64_t, int64_t> const r4(beg4, end4);
TEST(r4 != r1 && r4 != r2 && r4 != r3, (r4));
TEST_EQUAL(strategy.NextChunk(sEmpty, begEmpty, endEmpty), ChunksDownloadStrategy::ENoFreeServers, ());
TEST_EQUAL(strategy.NextChunk(sEmpty, begEmpty, endEmpty), ChunksDownloadStrategy::ENoFreeServers, ());
strategy.ChunkFinished(false, beg2, end2);
TEST_EQUAL(strategy.NextChunk(sEmpty, begEmpty, endEmpty), ChunksDownloadStrategy::ENoFreeServers, ());
strategy.ChunkFinished(true, beg4, end4);
string s5;
int64_t beg5, end5;
TEST_EQUAL(strategy.NextChunk(s5, beg5, end5), ChunksDownloadStrategy::ENextChunk, ());
TEST_EQUAL(s5, s4, (s5, s4));
TEST(beg5 == beg2 && end5 == end2, ());
TEST_EQUAL(strategy.NextChunk(sEmpty, begEmpty, endEmpty), ChunksDownloadStrategy::ENoFreeServers, ());
TEST_EQUAL(strategy.NextChunk(sEmpty, begEmpty, endEmpty), ChunksDownloadStrategy::ENoFreeServers, ());
strategy.ChunkFinished(true, beg5, end5);
// 3rd is still alive here
TEST_EQUAL(strategy.NextChunk(sEmpty, begEmpty, endEmpty), ChunksDownloadStrategy::ENoFreeServers, ());
TEST_EQUAL(strategy.NextChunk(sEmpty, begEmpty, endEmpty), ChunksDownloadStrategy::ENoFreeServers, ());
strategy.ChunkFinished(true, beg3, end3);
TEST_EQUAL(strategy.NextChunk(sEmpty, begEmpty, endEmpty), ChunksDownloadStrategy::EDownloadSucceeded, ());
TEST_EQUAL(strategy.NextChunk(sEmpty, begEmpty, endEmpty), ChunksDownloadStrategy::EDownloadSucceeded, ());
}
UNIT_TEST(ChunksDownloadStrategyFAIL)
{
string const S1 = "UrlOfServer1";
string const S2 = "UrlOfServer2";
pair<int64_t, int64_t> const R1(0, 249), R2(250, 499), R3(500, 749), R4(750, 800);
vector<string> servers;
servers.push_back(S1);
servers.push_back(S2);
int64_t const FILE_SIZE = 800;
int64_t const CHUNK_SIZE = 250;
ChunksDownloadStrategy strategy(servers, FILE_SIZE, CHUNK_SIZE);
string s1;
int64_t beg1, end1;
TEST_EQUAL(strategy.NextChunk(s1, beg1, end1), ChunksDownloadStrategy::ENextChunk, ());
string s2;
int64_t beg2, end2;
TEST_EQUAL(strategy.NextChunk(s2, beg2, end2), ChunksDownloadStrategy::ENextChunk, ());
TEST_EQUAL(strategy.NextChunk(s2, beg2, end2), ChunksDownloadStrategy::ENoFreeServers, ());
strategy.ChunkFinished(false, beg1, end1);
TEST_EQUAL(strategy.NextChunk(s2, beg2, end2), ChunksDownloadStrategy::ENoFreeServers, ());
strategy.ChunkFinished(false, beg2, end2);
TEST_EQUAL(strategy.NextChunk(s2, beg2, end2), ChunksDownloadStrategy::EDownloadFailed, ());
}
UNIT_TEST(DownloadChunks)
{
DownloadObserver observer;
string buffer;
MemWriter<string> writer(buffer);
HttpRequest::CallbackT onFinish = bind(&DownloadObserver::OnDownloadFinish, &observer, _1);
HttpRequest::CallbackT onProgress = bind(&DownloadObserver::OnDownloadProgress, &observer, _1);
vector<string> urls;
urls.push_back(TEST_URL1);
urls.push_back(TEST_URL1);
{ // should use only one thread
scoped_ptr<HttpRequest> request(HttpRequest::GetChunks(urls, writer, 5, onFinish, onProgress));
// wait until download is finished
QCoreApplication::exec();
observer.TestOk();
TEST_EQUAL(buffer, "Test1", (buffer));
}
string const SHA256 = "EE6AE6A2A3619B2F4A397326BEC32583DE2196D9D575D66786CB3B6F9D04A633";
observer.Reset();
writer.Seek(0);
buffer.clear();
urls.clear();
urls.push_back(TEST_URL_BIG_FILE);
urls.push_back(TEST_URL_BIG_FILE);
urls.push_back(TEST_URL_BIG_FILE);
{ // 3 threads
scoped_ptr<HttpRequest> request(HttpRequest::GetChunks(urls, writer, 5, onFinish, onProgress, 2048));
// wait until download is finished
QCoreApplication::exec();
observer.TestOk();
TEST_EQUAL(sha2::digest256(buffer), SHA256, (buffer));
}
}