forked from organicmaps/organicmaps
[loadersim] CDN support
This commit is contained in:
parent
d8cd552628
commit
0d7d7f64a6
1 changed files with 61 additions and 24 deletions
|
@ -35,11 +35,13 @@ type FetchSource struct {
|
|||
TotalDownloaded int64
|
||||
NumChunkAttempts int
|
||||
NumRetries int
|
||||
IsCDN bool
|
||||
SlowThreshold float64
|
||||
}
|
||||
|
||||
func NewFetchSource(
|
||||
manager *FetchManager, id string, host string,
|
||||
chunkSpeed float64, speedLimit float64, boost float64) *FetchSource {
|
||||
chunkSpeed float64, speedLimit float64, boost float64, isCDN bool, slowThreshold float64) *FetchSource {
|
||||
return &FetchSource{
|
||||
Manager: manager,
|
||||
Id: id,
|
||||
|
@ -50,6 +52,8 @@ func NewFetchSource(
|
|||
ChunkSpeed: chunkSpeed,
|
||||
LastSetTime: time.Now(),
|
||||
NumRetries: 0,
|
||||
IsCDN: isCDN,
|
||||
SlowThreshold: slowThreshold,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -76,7 +80,7 @@ func (s *FetchSource) RecordError(err error) {
|
|||
s.LastSetTime = time.Now()
|
||||
s.NumRetries += 1
|
||||
s.Status = SOURCE_BAD
|
||||
if s.NumRetries > 3 { // TODO: Magic number
|
||||
if s.NumRetries >= 3 { // TODO: Magic number
|
||||
s.Status = SOURCE_FAILED
|
||||
}
|
||||
}
|
||||
|
@ -122,9 +126,25 @@ func (m *FetchManager) CreateScheduler(path string, size int64) *FetchScheduler
|
|||
}
|
||||
}
|
||||
|
||||
func (m *FetchManager) IsCDNEnabled() bool {
|
||||
for _, source := range m.Sources {
|
||||
if source.IsCDN {
|
||||
continue
|
||||
}
|
||||
if source.Status == SOURCE_FAILED {
|
||||
continue
|
||||
}
|
||||
if source.ChunkSpeed < source.SlowThreshold {
|
||||
continue
|
||||
}
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (m *FetchManager) PrintSources() {
|
||||
for _, source := range m.Sources {
|
||||
fmt.Printf("%v, status=%d spd=%5.0f espd=%5.0f last_speed=%5.0f score=%5.2f uncertaintyBoost=%5.2f, Total=%5.0f Attempts=%d, timeSinceLastSet=%5.1f, numRetries=%d\n",
|
||||
fmt.Printf("%v, status=%d spd=%5.0f espd=%5.0f last_speed=%5.0f score=%5.2f uncertaintyBoost=%5.2f, Total=%5.0f Attempts=%d, timeSinceLastSet=%5.1f, numRetries=%d, isCDN=%v\n",
|
||||
source.Id,
|
||||
source.Status,
|
||||
source.ChunkSpeed/1024,
|
||||
|
@ -135,11 +155,14 @@ func (m *FetchManager) PrintSources() {
|
|||
float64(source.TotalDownloaded)/1024.0,
|
||||
source.NumChunkAttempts,
|
||||
time.Now().Sub(source.LastSetTime).Seconds(),
|
||||
source.NumRetries)
|
||||
source.NumRetries,
|
||||
source.IsCDN,
|
||||
)
|
||||
}
|
||||
if m.NumTotalChunks != 0 {
|
||||
fmt.Printf("Average chunk time=%.2f, avg chunk speed=%.1f KBps\n", m.AverageChunkTime(), m.AverageChunkSpeed() / 1024)
|
||||
}
|
||||
fmt.Printf("CDN is Enabled: %v\n", m.IsCDNEnabled())
|
||||
fmt.Println()
|
||||
}
|
||||
|
||||
|
@ -165,9 +188,14 @@ func (m *FetchManager) GetSource() *FetchSource {
|
|||
defer m.SourceMutex.Unlock()
|
||||
|
||||
m.PrintSources()
|
||||
|
||||
isCDNEnabled := m.IsCDNEnabled()
|
||||
|
||||
var selectedSource *FetchSource
|
||||
for _, source := range m.Sources {
|
||||
if source.IsCDN && !isCDNEnabled {
|
||||
continue
|
||||
}
|
||||
switch source.Status {
|
||||
case SOURCE_BUSY, SOURCE_FAILED:
|
||||
continue
|
||||
|
@ -308,11 +336,18 @@ type FetchSchedulerTask struct {
|
|||
func (t *FetchSchedulerTask) Run() {
|
||||
// TODO: If there is an error, Run() must be re-attempted several times for each task,
|
||||
// with different sources.
|
||||
source := t.scheduler.Manager.GetSource()
|
||||
if source == nil {
|
||||
fmt.Printf("Cannot acquire new source")
|
||||
time.Sleep(time.Second * time.Duration(t.scheduler.Manager.AverageChunkTime()) * 3) // TODO: magic numbers
|
||||
return
|
||||
var source *FetchSource
|
||||
for {
|
||||
source = t.scheduler.Manager.GetSource()
|
||||
if source == nil {
|
||||
if t.scheduler.Manager.AllSourcesFailed() {
|
||||
return
|
||||
}
|
||||
fmt.Printf("Cannot acquire new source")
|
||||
time.Sleep(time.Second * time.Duration(t.scheduler.Manager.AverageChunkTime()) * 3) // TODO: magic numbers
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
t.startTime = time.Now()
|
||||
|
@ -384,14 +419,14 @@ func (t *FetchSchedulerTask) RunWithSource(source *FetchSource) error {
|
|||
} else {
|
||||
break
|
||||
}
|
||||
elapsedTime := time.Now().Sub(t.startTime).Seconds()
|
||||
momentarySpeed := float64(bytesRead) / 1024 / (elapsedTime + 1e-100)
|
||||
// elapsedTime := time.Now().Sub(t.startTime).Seconds()
|
||||
// momentarySpeed := float64(bytesRead) / 1024 / (elapsedTime + 1e-100)
|
||||
//fmt.Printf("%v Momentary speed at %.2f: %.2f\n", source.Id, elapsedTime, momentarySpeed )
|
||||
|
||||
minAllowedSpeed := t.scheduler.Manager.AverageChunkSpeed() / 20 // TODO: magic numbers, review, maybe mark server as bad
|
||||
if elapsedTime > 16 && momentarySpeed < minAllowedSpeed {
|
||||
return fmt.Errorf("Server %v too slow", source.Id)
|
||||
}
|
||||
// minAllowedSpeed := t.scheduler.Manager.AverageChunkSpeed() / 20 // TODO: magic numbers, review, maybe mark server as bad
|
||||
// if elapsedTime > 16 && momentarySpeed < minAllowedSpeed {
|
||||
// fmt.Printf("Server %v too slow", source.Id)
|
||||
// }
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -399,12 +434,12 @@ func (t *FetchSchedulerTask) RunWithSource(source *FetchSource) error {
|
|||
|
||||
func main() {
|
||||
manager := &FetchManager{
|
||||
ChunkSize: 16 * 1024,
|
||||
NumWorkers: 10,
|
||||
ChunkSize: 256 * 1024,
|
||||
NumWorkers: 3,
|
||||
UncertaintyBoostPerChunkTime: 1.1,
|
||||
}
|
||||
manager.Sources = []*FetchSource{
|
||||
NewFetchSource(manager, "2", "xsecond.server", 800*1024, 1000*1024, 3.0),
|
||||
NewFetchSource(manager, "2", "second.server", 800*1024, 1000*1024, 3.0, false, 10000 * 1024),
|
||||
/* NewFetchSource(manager, "3.1", "third.server", 350*1024, 0*1024, 1.0),
|
||||
NewFetchSource(manager, "3.2", "third.server", 350*1024, 0*1024, 1.0),
|
||||
NewFetchSource(manager, "3.3", "third.server", 350*1024, 0*1024, 1.0),
|
||||
|
@ -412,14 +447,16 @@ func main() {
|
|||
NewFetchSource(manager, "3.5", "third.server", 350*1024, 0*1024, 1.0),
|
||||
NewFetchSource(manager, "3.6", "third.server", 350*1024, 0*1024, 1.0),
|
||||
NewFetchSource(manager, "3.7", "third.server", 350*1024, 0*1024, 1.0),*/
|
||||
NewFetchSource(manager, "3", "xthird.server", 350*1024, 0*1024, 1.0),
|
||||
NewFetchSource(manager, "4", "xfourth.server", 160*1024, 0*1024, 1.0),
|
||||
NewFetchSource(manager, "22", "xfirst.server", 50*1024, 50*1024, 1.0),
|
||||
NewFetchSource(manager, "33", "xus31.mapswithme.com", 1500*1024, 0*1024, 1.0),
|
||||
NewFetchSource(manager, "3", "third.server", 350*1024, 0*1024, 1.0, false, 10000 * 1024),
|
||||
NewFetchSource(manager, "4", "fourth.server", 160*1024, 0*1024, 1.0, false, 10000 * 1024),
|
||||
NewFetchSource(manager, "22", "first.server", 50*1024, 0*1024, 1.0, false, 10000 * 1024),
|
||||
NewFetchSource(manager, "33", "fifth.server", 1500*1024, 0*1024, 1.0, false, 10000 * 1024),
|
||||
NewFetchSource(manager, "us3-cdn", "mappy.local", 150*1024, 0*1024, 0.1, true, 10 * 1024),
|
||||
|
||||
}
|
||||
|
||||
for i := 0; i < 1; i++ {
|
||||
scheduler := manager.CreateScheduler("/direct/131031/Belarus.mwm", 67215188)
|
||||
for i := 0; i < 10; i++ {
|
||||
scheduler := manager.CreateScheduler("/ios/131031/Belarus.mwm", 67215188)
|
||||
scheduler.Fetch()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue