diff --git a/tools/loadersim/loadersim.go b/tools/loadersim/loadersim.go index 961978baf4..62b4632088 100644 --- a/tools/loadersim/loadersim.go +++ b/tools/loadersim/loadersim.go @@ -35,11 +35,13 @@ type FetchSource struct { TotalDownloaded int64 NumChunkAttempts int NumRetries int + IsCDN bool + SlowThreshold float64 } func NewFetchSource( manager *FetchManager, id string, host string, - chunkSpeed float64, speedLimit float64, boost float64) *FetchSource { + chunkSpeed float64, speedLimit float64, boost float64, isCDN bool, slowThreshold float64) *FetchSource { return &FetchSource{ Manager: manager, Id: id, @@ -50,6 +52,8 @@ func NewFetchSource( ChunkSpeed: chunkSpeed, LastSetTime: time.Now(), NumRetries: 0, + IsCDN: isCDN, + SlowThreshold: slowThreshold, } } @@ -76,7 +80,7 @@ func (s *FetchSource) RecordError(err error) { s.LastSetTime = time.Now() s.NumRetries += 1 s.Status = SOURCE_BAD - if s.NumRetries > 3 { // TODO: Magic number + if s.NumRetries >= 3 { // TODO: Magic number s.Status = SOURCE_FAILED } } @@ -122,9 +126,25 @@ func (m *FetchManager) CreateScheduler(path string, size int64) *FetchScheduler } } +func (m *FetchManager) IsCDNEnabled() bool { + for _, source := range m.Sources { + if source.IsCDN { + continue + } + if source.Status == SOURCE_FAILED { + continue + } + if source.ChunkSpeed < source.SlowThreshold { + continue + } + return false + } + return true +} + func (m *FetchManager) PrintSources() { for _, source := range m.Sources { - fmt.Printf("%v, status=%d spd=%5.0f espd=%5.0f last_speed=%5.0f score=%5.2f uncertaintyBoost=%5.2f, Total=%5.0f Attempts=%d, timeSinceLastSet=%5.1f, numRetries=%d\n", + fmt.Printf("%v, status=%d spd=%5.0f espd=%5.0f last_speed=%5.0f score=%5.2f uncertaintyBoost=%5.2f, Total=%5.0f Attempts=%d, timeSinceLastSet=%5.1f, numRetries=%d, isCDN=%v\n", source.Id, source.Status, source.ChunkSpeed/1024, @@ -135,11 +155,14 @@ func (m *FetchManager) PrintSources() { float64(source.TotalDownloaded)/1024.0, source.NumChunkAttempts, time.Now().Sub(source.LastSetTime).Seconds(), - source.NumRetries) + source.NumRetries, + source.IsCDN, + ) } if m.NumTotalChunks != 0 { fmt.Printf("Average chunk time=%.2f, avg chunk speed=%.1f KBps\n", m.AverageChunkTime(), m.AverageChunkSpeed() / 1024) } + fmt.Printf("CDN is Enabled: %v\n", m.IsCDNEnabled()) fmt.Println() } @@ -165,9 +188,14 @@ func (m *FetchManager) GetSource() *FetchSource { defer m.SourceMutex.Unlock() m.PrintSources() + + isCDNEnabled := m.IsCDNEnabled() var selectedSource *FetchSource for _, source := range m.Sources { + if source.IsCDN && !isCDNEnabled { + continue + } switch source.Status { case SOURCE_BUSY, SOURCE_FAILED: continue @@ -308,11 +336,18 @@ type FetchSchedulerTask struct { func (t *FetchSchedulerTask) Run() { // TODO: If there is an error, Run() must be re-attempted several times for each task, // with different sources. - source := t.scheduler.Manager.GetSource() - if source == nil { - fmt.Printf("Cannot acquire new source") - time.Sleep(time.Second * time.Duration(t.scheduler.Manager.AverageChunkTime()) * 3) // TODO: magic numbers - return + var source *FetchSource + for { + source = t.scheduler.Manager.GetSource() + if source == nil { + if t.scheduler.Manager.AllSourcesFailed() { + return + } + fmt.Printf("Cannot acquire new source") + time.Sleep(time.Second * time.Duration(t.scheduler.Manager.AverageChunkTime()) * 3) // TODO: magic numbers + } else { + break + } } t.startTime = time.Now() @@ -384,14 +419,14 @@ func (t *FetchSchedulerTask) RunWithSource(source *FetchSource) error { } else { break } - elapsedTime := time.Now().Sub(t.startTime).Seconds() - momentarySpeed := float64(bytesRead) / 1024 / (elapsedTime + 1e-100) +// elapsedTime := time.Now().Sub(t.startTime).Seconds() +// momentarySpeed := float64(bytesRead) / 1024 / (elapsedTime + 1e-100) //fmt.Printf("%v Momentary speed at %.2f: %.2f\n", source.Id, elapsedTime, momentarySpeed ) - minAllowedSpeed := t.scheduler.Manager.AverageChunkSpeed() / 20 // TODO: magic numbers, review, maybe mark server as bad - if elapsedTime > 16 && momentarySpeed < minAllowedSpeed { - return fmt.Errorf("Server %v too slow", source.Id) - } +// minAllowedSpeed := t.scheduler.Manager.AverageChunkSpeed() / 20 // TODO: magic numbers, review, maybe mark server as bad +// if elapsedTime > 16 && momentarySpeed < minAllowedSpeed { +// fmt.Printf("Server %v too slow", source.Id) +// } } return nil @@ -399,12 +434,12 @@ func (t *FetchSchedulerTask) RunWithSource(source *FetchSource) error { func main() { manager := &FetchManager{ - ChunkSize: 16 * 1024, - NumWorkers: 10, + ChunkSize: 256 * 1024, + NumWorkers: 3, UncertaintyBoostPerChunkTime: 1.1, } manager.Sources = []*FetchSource{ - NewFetchSource(manager, "2", "xsecond.server", 800*1024, 1000*1024, 3.0), + NewFetchSource(manager, "2", "second.server", 800*1024, 1000*1024, 3.0, false, 10000 * 1024), /* NewFetchSource(manager, "3.1", "third.server", 350*1024, 0*1024, 1.0), NewFetchSource(manager, "3.2", "third.server", 350*1024, 0*1024, 1.0), NewFetchSource(manager, "3.3", "third.server", 350*1024, 0*1024, 1.0), @@ -412,14 +447,16 @@ func main() { NewFetchSource(manager, "3.5", "third.server", 350*1024, 0*1024, 1.0), NewFetchSource(manager, "3.6", "third.server", 350*1024, 0*1024, 1.0), NewFetchSource(manager, "3.7", "third.server", 350*1024, 0*1024, 1.0),*/ - NewFetchSource(manager, "3", "xthird.server", 350*1024, 0*1024, 1.0), - NewFetchSource(manager, "4", "xfourth.server", 160*1024, 0*1024, 1.0), - NewFetchSource(manager, "22", "xfirst.server", 50*1024, 50*1024, 1.0), - NewFetchSource(manager, "33", "xus31.mapswithme.com", 1500*1024, 0*1024, 1.0), + NewFetchSource(manager, "3", "third.server", 350*1024, 0*1024, 1.0, false, 10000 * 1024), + NewFetchSource(manager, "4", "fourth.server", 160*1024, 0*1024, 1.0, false, 10000 * 1024), + NewFetchSource(manager, "22", "first.server", 50*1024, 0*1024, 1.0, false, 10000 * 1024), + NewFetchSource(manager, "33", "fifth.server", 1500*1024, 0*1024, 1.0, false, 10000 * 1024), + NewFetchSource(manager, "us3-cdn", "mappy.local", 150*1024, 0*1024, 0.1, true, 10 * 1024), + } - for i := 0; i < 1; i++ { - scheduler := manager.CreateScheduler("/direct/131031/Belarus.mwm", 67215188) + for i := 0; i < 10; i++ { + scheduler := manager.CreateScheduler("/ios/131031/Belarus.mwm", 67215188) scheduler.Fetch() } }