From 4ca29d9593b4890441dc163999302a870b76162a Mon Sep 17 00:00:00 2001 From: Alex Zolotarev Date: Mon, 7 Nov 2011 05:55:11 +0300 Subject: [PATCH] [mac] Added chunks download strategy --- platform/chunks_download_strategy.cpp | 92 ++++++++++++ platform/chunks_download_strategy.hpp | 36 +++++ platform/http_request.cpp | 53 ++++++- platform/http_request.hpp | 8 +- platform/http_request_impl_apple.mm | 6 +- platform/http_request_impl_callback.hpp | 2 + platform/platform.pro | 7 +- platform/platform_tests/downloader_test.cpp | 152 +++++++++++++++++++- 8 files changed, 345 insertions(+), 11 deletions(-) create mode 100644 platform/chunks_download_strategy.cpp create mode 100644 platform/chunks_download_strategy.hpp diff --git a/platform/chunks_download_strategy.cpp b/platform/chunks_download_strategy.cpp new file mode 100644 index 0000000000..ff22ab3f1b --- /dev/null +++ b/platform/chunks_download_strategy.cpp @@ -0,0 +1,92 @@ +#include "chunks_download_strategy.hpp" + +#include "../std/algorithm.hpp" + +#define INVALID_CHUNK -1 + + +namespace downloader +{ + +ChunksDownloadStrategy::RangeT const ChunksDownloadStrategy::INVALID_RANGE = RangeT(-1, -1); + +ChunksDownloadStrategy::ChunksDownloadStrategy(vector const & urls, int64_t fileSize, int64_t chunkSize) + : m_chunkSize(chunkSize) +{ + // init servers list + for (size_t i = 0; i < urls.size(); ++i) + m_servers.push_back(make_pair(urls[i], INVALID_RANGE)); + + // init chunks which should be downloaded + // @TODO implement download resume by saving chunks to download for specified file + for (int64_t i = 0; i < fileSize; i += chunkSize) + { + m_chunksToDownload.insert(RangeT(i, min(i + chunkSize - 1, + fileSize - 1))); + } +} + +void ChunksDownloadStrategy::ChunkFinished(bool successfully, int64_t begRange, int64_t endRange) +{ + RangeT const chunk(begRange, endRange); + // find server which was downloading this chunk + for (ServersT::iterator it = m_servers.begin(); it != m_servers.end(); ++it) + { + if (it->second == chunk) + { + if (successfully) + it->second = INVALID_RANGE; + else + { + // remove failed server and mark chunk as not downloaded + m_servers.erase(it); + m_chunksToDownload.insert(chunk); + } + break; + } + } +} + +ChunksDownloadStrategy::ResultT ChunksDownloadStrategy::NextChunk(string & outUrl, + int64_t & begRange, + int64_t & endRange) +{ + if (m_servers.empty()) + return EDownloadFailed; + + if (m_chunksToDownload.empty()) + { + // no more chunks to download + bool allChunksAreFinished = true; + for (size_t i = 0; i < m_servers.size(); ++i) + { + if (m_servers[i].second != INVALID_RANGE) + allChunksAreFinished = false; + } + if (allChunksAreFinished) + return EDownloadSucceeded; + else + return ENoFreeServers; + } + else + { + RangeT const nextChunk = *m_chunksToDownload.begin(); + for (size_t i = 0; i < m_servers.size(); ++i) + { + if (m_servers[i].second == INVALID_RANGE) + { + // found not used server + m_servers[i].second = nextChunk; + outUrl = m_servers[i].first; + begRange = nextChunk.first; + endRange = nextChunk.second; + m_chunksToDownload.erase(m_chunksToDownload.begin()); + return ENextChunk; + } + } + // if we're here, all servers are busy downloading + return ENoFreeServers; + } +} + +} // namespace downloader diff --git a/platform/chunks_download_strategy.hpp b/platform/chunks_download_strategy.hpp new file mode 100644 index 0000000000..1ca82f43a5 --- /dev/null +++ b/platform/chunks_download_strategy.hpp @@ -0,0 +1,36 @@ +#pragma once + +#include "../std/string.hpp" +#include "../std/vector.hpp" +#include "../std/utility.hpp" +#include "../std/set.hpp" + +namespace downloader +{ + +class ChunksDownloadStrategy +{ + int64_t m_chunkSize; + + typedef pair RangeT; + static RangeT const INVALID_RANGE; + /// + typedef vector > ServersT; + ServersT m_servers; + set m_chunksToDownload; + +public: + ChunksDownloadStrategy(vector const & urls, int64_t fileSize, int64_t chunkSize = 512 * 1024); + + void ChunkFinished(bool successfully, int64_t begRange, int64_t endRange); + enum ResultT + { + ENextChunk, + ENoFreeServers, + EDownloadFailed, + EDownloadSucceeded + }; + ResultT NextChunk(string & outUrl, int64_t & begRange, int64_t & endRange); +}; + +} // namespace downloader diff --git a/platform/http_request.cpp b/platform/http_request.cpp index 6cef569e61..50c3cc32d7 100644 --- a/platform/http_request.cpp +++ b/platform/http_request.cpp @@ -1,7 +1,9 @@ #include "http_request.hpp" +#include "chunks_download_strategy.hpp" #ifdef DEBUG #include "../base/thread.hpp" + #include "../base/logging.hpp" #endif #include "../coding/writer.hpp" @@ -17,9 +19,9 @@ HttpRequestImpl * CreateNativeHttpRequest(string const & url, string const & postBody = string()); void DeleteNativeHttpRequest(HttpRequestImpl * request); - +////////////////////////////////////////////////////////////////////////////////////////// HttpRequest::HttpRequest(Writer & writer, CallbackT onFinish, CallbackT onProgress) - : m_status(EInProgress), m_progress(make_pair(-1, -1)), m_writer(writer), + : m_status(EInProgress), m_progress(make_pair(0, -1)), m_writer(writer), m_onFinish(onFinish), m_onProgress(onProgress) { } @@ -30,6 +32,11 @@ HttpRequest::~HttpRequest() DeleteNativeHttpRequest(*it); } +void HttpRequest::OnSizeKnown(int64_t projectedSize) +{ + LOG(LDEBUG, ("Projected size", projectedSize)); +} + void HttpRequest::OnWrite(int64_t offset, void const * buffer, size_t size) { #ifdef DEBUG @@ -65,4 +72,46 @@ HttpRequest * HttpRequest::Post(string const & url, Writer & writer, string cons return self; } +////////////////////////////////////////////////////////////////////////////////////////////////////////// + +class ChunksHttpRequest : public HttpRequest +{ + ChunksDownloadStrategy m_strategy; + + ChunksDownloadStrategy::ResultT StartThreads() + { + string url; + int64_t beg, end; + ChunksDownloadStrategy::ResultT result; + while ((result = m_strategy.NextChunk(url, beg, end)) == ChunksDownloadStrategy::ENextChunk) + m_threads.push_back(CreateNativeHttpRequest(url, *this, beg, end)); + return result; + } + +public: + ChunksHttpRequest(vector const & urls, Writer & writer, int64_t fileSize, + CallbackT onFinish, CallbackT onProgress, int64_t chunkSize) + : HttpRequest(writer, onFinish, onProgress), m_strategy(urls, fileSize, chunkSize) + { + ASSERT(!urls.empty(), ("Urls list shouldn't be empty")); + StartThreads(); + } + +protected: + virtual void OnFinish(long httpCode, int64_t begRange, int64_t endRange) + { + m_strategy.ChunkFinished(httpCode == 200, begRange, endRange); + ChunksDownloadStrategy::ResultT const result = StartThreads(); + if (result != ChunksDownloadStrategy::ENoFreeServers) + HttpRequest::OnFinish(result == ChunksDownloadStrategy::EDownloadSucceeded ? 200 : -2, + 0, -1); + } +}; + +HttpRequest * HttpRequest::GetChunks(vector const & urls, Writer & writer, int64_t fileSize, + CallbackT onFinish, CallbackT onProgress, int64_t chunkSize) +{ + return new ChunksHttpRequest(urls, writer, fileSize, onFinish, onProgress, chunkSize); } + +} // namespace downloader diff --git a/platform/http_request.hpp b/platform/http_request.hpp index 432214aa6c..85980e4718 100644 --- a/platform/http_request.hpp +++ b/platform/http_request.hpp @@ -3,6 +3,7 @@ #include "../std/function.hpp" #include "../std/string.hpp" #include "../std/list.hpp" +#include "../std/vector.hpp" #include "../std/utility.hpp" #include "http_request_impl_callback.hpp" @@ -36,6 +37,7 @@ private: CallbackT m_onFinish; CallbackT m_onProgress; +protected: typedef list ThreadsContainerT; ThreadsContainerT m_threads; @@ -43,6 +45,7 @@ private: /// @name Callbacks for internal native downloading threads //@{ + virtual void OnSizeKnown(int64_t projectedSize); virtual void OnWrite(int64_t offset, void const * buffer, size_t size); virtual void OnFinish(long httpCode, int64_t begRange, int64_t endRange); //@} @@ -55,9 +58,12 @@ public: static HttpRequest * Get(string const & url, Writer & writer, CallbackT onFinish, CallbackT onProgress = CallbackT()); - /// Content-type is always "application/json" + /// Content-type for request is always "application/json" static HttpRequest * Post(string const & url, Writer & writer, string const & postData, CallbackT onFinish, CallbackT onProgress = CallbackT()); + static HttpRequest * GetChunks(vector const & urls, Writer & writer, int64_t fileSize, + CallbackT onFinish, CallbackT onProgress = CallbackT(), + int64_t chunkSize = 512 * 1024); }; } // namespace downloader diff --git a/platform/http_request_impl_apple.mm b/platform/http_request_impl_apple.mm index 80a9c99ac2..69cc03e17b 100644 --- a/platform/http_request_impl_apple.mm +++ b/platform/http_request_impl_apple.mm @@ -105,11 +105,7 @@ return; } - int64_t const expectedLength = [response expectedContentLength]; - if (expectedLength < 0) - LOG(LDEBUG, ("Server doesn't support HTTP Range")); - else - LOG(LDEBUG, ("Expected content length", expectedLength)); + m_callback->OnSizeKnown([response expectedContentLength]); } else { // in theory, we should never be here diff --git a/platform/http_request_impl_callback.hpp b/platform/http_request_impl_callback.hpp index d32ca790d6..397ac032b5 100644 --- a/platform/http_request_impl_callback.hpp +++ b/platform/http_request_impl_callback.hpp @@ -6,6 +6,8 @@ namespace downloader class IHttpRequestImplCallback { public: + /// Called before OnWrite, projectedSize can be -1 if server doesn't support it + virtual void OnSizeKnown(int64_t projectedSize) = 0; virtual void OnWrite(int64_t offset, void const * buffer, size_t size) = 0; virtual void OnFinish(long httpCode, int64_t begRange, int64_t endRange) = 0; }; diff --git a/platform/platform.pro b/platform/platform.pro index 39db108288..83f0d5c99f 100644 --- a/platform/platform.pro +++ b/platform/platform.pro @@ -67,7 +67,8 @@ HEADERS += \ languages.hpp \ url_generator.hpp \ http_request.hpp \ - http_request_impl_callback.hpp + http_request_impl_callback.hpp \ + chunks_download_strategy.hpp SOURCES += \ preferred_languages.cpp \ @@ -75,4 +76,6 @@ SOURCES += \ video_timer.cpp \ languages.cpp \ url_generator.cpp \ - http_request.cpp + http_request.cpp \ + chunks_download_strategy.cpp + diff --git a/platform/platform_tests/downloader_test.cpp b/platform/platform_tests/downloader_test.cpp index 27b9879ef8..f3be092826 100644 --- a/platform/platform_tests/downloader_test.cpp +++ b/platform/platform_tests/downloader_test.cpp @@ -3,8 +3,10 @@ #include "../../base/logging.hpp" #include "../../coding/writer.hpp" +#include "../../coding/sha2.hpp" -#include "../../platform/http_request.hpp" +#include "../http_request.hpp" +#include "../chunks_download_strategy.hpp" #include "../../std/scoped_ptr.hpp" #include "../../std/bind.hpp" @@ -169,3 +171,151 @@ UNIT_TEST(DownloaderSimplePost) TEST_EQUAL(buffer, postData, (buffer)); } } + +UNIT_TEST(ChunksDownloadStrategy) +{ + string const S1 = "UrlOfServer1"; + string const S2 = "UrlOfServer2"; + string const S3 = "UrlOfServer3"; + pair const R1(0, 249), R2(250, 499), R3(500, 749), R4(750, 800); + vector servers; + servers.push_back(S1); + servers.push_back(S2); + servers.push_back(S3); + int64_t const FILE_SIZE = 800; + int64_t const CHUNK_SIZE = 250; + ChunksDownloadStrategy strategy(servers, FILE_SIZE, CHUNK_SIZE); + + string s1; + int64_t beg1, end1; + TEST_EQUAL(strategy.NextChunk(s1, beg1, end1), ChunksDownloadStrategy::ENextChunk, ()); + + string s2; + int64_t beg2, end2; + TEST_EQUAL(strategy.NextChunk(s2, beg2, end2), ChunksDownloadStrategy::ENextChunk, ()); + + string s3; + int64_t beg3, end3; + TEST_EQUAL(strategy.NextChunk(s3, beg3, end3), ChunksDownloadStrategy::ENextChunk, ()); + + string sEmpty; + int64_t begEmpty, endEmpty; + TEST_EQUAL(strategy.NextChunk(sEmpty, begEmpty, endEmpty), ChunksDownloadStrategy::ENoFreeServers, ()); + TEST_EQUAL(strategy.NextChunk(sEmpty, begEmpty, endEmpty), ChunksDownloadStrategy::ENoFreeServers, ()); + + TEST(s1 != s2 && s2 != s3 && s3 != s1, (s1, s2, s3)); + + pair const r1(beg1, end1), r2(beg2, end2), r3(beg3, end3); + TEST(r1 != r2 && r2 != r3 && r3 != r1, (r1, r2, r3)); + TEST(r1 == R1 || r1 == R2 || r1 == R3 || r1 == R4, (r1)); + TEST(r2 == R1 || r2 == R2 || r2 == R3 || r2 == R4, (r2)); + TEST(r3 == R1 || r3 == R2 || r3 == R3 || r3 == R4, (r3)); + + strategy.ChunkFinished(true, beg1, end1); + + string s4; + int64_t beg4, end4; + TEST_EQUAL(strategy.NextChunk(s4, beg4, end4), ChunksDownloadStrategy::ENextChunk, ()); + TEST_EQUAL(s4, s1, ()); + pair const r4(beg4, end4); + TEST(r4 != r1 && r4 != r2 && r4 != r3, (r4)); + + TEST_EQUAL(strategy.NextChunk(sEmpty, begEmpty, endEmpty), ChunksDownloadStrategy::ENoFreeServers, ()); + TEST_EQUAL(strategy.NextChunk(sEmpty, begEmpty, endEmpty), ChunksDownloadStrategy::ENoFreeServers, ()); + + strategy.ChunkFinished(false, beg2, end2); + + TEST_EQUAL(strategy.NextChunk(sEmpty, begEmpty, endEmpty), ChunksDownloadStrategy::ENoFreeServers, ()); + + strategy.ChunkFinished(true, beg4, end4); + + string s5; + int64_t beg5, end5; + TEST_EQUAL(strategy.NextChunk(s5, beg5, end5), ChunksDownloadStrategy::ENextChunk, ()); + TEST_EQUAL(s5, s4, (s5, s4)); + TEST(beg5 == beg2 && end5 == end2, ()); + + TEST_EQUAL(strategy.NextChunk(sEmpty, begEmpty, endEmpty), ChunksDownloadStrategy::ENoFreeServers, ()); + TEST_EQUAL(strategy.NextChunk(sEmpty, begEmpty, endEmpty), ChunksDownloadStrategy::ENoFreeServers, ()); + + strategy.ChunkFinished(true, beg5, end5); + + // 3rd is still alive here + TEST_EQUAL(strategy.NextChunk(sEmpty, begEmpty, endEmpty), ChunksDownloadStrategy::ENoFreeServers, ()); + TEST_EQUAL(strategy.NextChunk(sEmpty, begEmpty, endEmpty), ChunksDownloadStrategy::ENoFreeServers, ()); + + strategy.ChunkFinished(true, beg3, end3); + + TEST_EQUAL(strategy.NextChunk(sEmpty, begEmpty, endEmpty), ChunksDownloadStrategy::EDownloadSucceeded, ()); + TEST_EQUAL(strategy.NextChunk(sEmpty, begEmpty, endEmpty), ChunksDownloadStrategy::EDownloadSucceeded, ()); +} + +UNIT_TEST(ChunksDownloadStrategyFAIL) +{ + string const S1 = "UrlOfServer1"; + string const S2 = "UrlOfServer2"; + pair const R1(0, 249), R2(250, 499), R3(500, 749), R4(750, 800); + vector servers; + servers.push_back(S1); + servers.push_back(S2); + int64_t const FILE_SIZE = 800; + int64_t const CHUNK_SIZE = 250; + ChunksDownloadStrategy strategy(servers, FILE_SIZE, CHUNK_SIZE); + + string s1; + int64_t beg1, end1; + TEST_EQUAL(strategy.NextChunk(s1, beg1, end1), ChunksDownloadStrategy::ENextChunk, ()); + string s2; + int64_t beg2, end2; + TEST_EQUAL(strategy.NextChunk(s2, beg2, end2), ChunksDownloadStrategy::ENextChunk, ()); + TEST_EQUAL(strategy.NextChunk(s2, beg2, end2), ChunksDownloadStrategy::ENoFreeServers, ()); + + strategy.ChunkFinished(false, beg1, end1); + + TEST_EQUAL(strategy.NextChunk(s2, beg2, end2), ChunksDownloadStrategy::ENoFreeServers, ()); + + strategy.ChunkFinished(false, beg2, end2); + + TEST_EQUAL(strategy.NextChunk(s2, beg2, end2), ChunksDownloadStrategy::EDownloadFailed, ()); + +} + +UNIT_TEST(DownloadChunks) +{ + DownloadObserver observer; + string buffer; + MemWriter writer(buffer); + HttpRequest::CallbackT onFinish = bind(&DownloadObserver::OnDownloadFinish, &observer, _1); + HttpRequest::CallbackT onProgress = bind(&DownloadObserver::OnDownloadProgress, &observer, _1); + + vector urls; + urls.push_back(TEST_URL1); + urls.push_back(TEST_URL1); + + { // should use only one thread + scoped_ptr request(HttpRequest::GetChunks(urls, writer, 5, onFinish, onProgress)); + // wait until download is finished + QCoreApplication::exec(); + observer.TestOk(); + TEST_EQUAL(buffer, "Test1", (buffer)); + } + + string const SHA256 = "EE6AE6A2A3619B2F4A397326BEC32583DE2196D9D575D66786CB3B6F9D04A633"; + + observer.Reset(); + writer.Seek(0); + buffer.clear(); + + urls.clear(); + urls.push_back(TEST_URL_BIG_FILE); + urls.push_back(TEST_URL_BIG_FILE); + urls.push_back(TEST_URL_BIG_FILE); + + { // 3 threads + scoped_ptr request(HttpRequest::GetChunks(urls, writer, 5, onFinish, onProgress, 2048)); + // wait until download is finished + QCoreApplication::exec(); + observer.TestOk(); + TEST_EQUAL(sha2::digest256(buffer), SHA256, (buffer)); + } +}