From cb58c5d5a58cbd683b271c9441480cecdd976973 Mon Sep 17 00:00:00 2001 From: Darafei Praliaskouski Date: Wed, 6 Nov 2013 15:24:18 +0300 Subject: [PATCH] [loadersim] too many bad servers fix --- tools/loadersim/loadersim.go | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/tools/loadersim/loadersim.go b/tools/loadersim/loadersim.go index 81614d1688..bacf695999 100644 --- a/tools/loadersim/loadersim.go +++ b/tools/loadersim/loadersim.go @@ -117,7 +117,7 @@ func (m *FetchManager) CreateScheduler(path string, size int64) *FetchScheduler func (m *FetchManager) PrintSources() { for _, source := range m.Sources { - fmt.Printf("%v, status=%d spd=%5.0f espd=%5.0f last_speed=%5.0f score=%5.2f uncertaintyBoost=%5.2f, Total=%5.0f Attempts=%d\n", + fmt.Printf("%v, status=%d spd=%5.0f espd=%5.0f last_speed=%5.0f score=%5.2f uncertaintyBoost=%5.2f, Total=%5.0f Attempts=%d, timeSinceLastSet=%5.1f\n", source.Id, source.Status, source.ChunkSpeed/1024, @@ -126,7 +126,8 @@ func (m *FetchManager) PrintSources() { source.Score()/1024/1024, m.UncertaintyBoost(time.Now().Sub(source.LastSetTime)), float64(source.TotalDownloaded)/1024.0, - source.NumChunkAttempts) + source.NumChunkAttempts, + time.Now().Sub(source.LastSetTime).Seconds()) } if m.NumTotalChunks != 0 { fmt.Printf("Average chunk time=%.2f, avg chunk speed=%.1f KBps\n", m.AverageChunkTime(), m.AverageChunkSpeed() / 1024) @@ -151,17 +152,20 @@ func (m *FetchManager) AverageChunkSpeed() float64 { } // GetSource finds an optimal source to use for fetching. -func (d *FetchManager) GetSource() *FetchSource { - d.SourceMutex.Lock() - defer d.SourceMutex.Unlock() +func (m *FetchManager) GetSource() *FetchSource { + m.SourceMutex.Lock() + defer m.SourceMutex.Unlock() - d.PrintSources() + m.PrintSources() var selectedSource *FetchSource - for _, source := range d.Sources { + for _, source := range m.Sources { if source.Status == SOURCE_BUSY { continue } + if source.Status == SOURCE_BAD && time.Now().Sub(source.LastSetTime).Seconds() < 20 * m.AverageChunkTime() { + continue + } if selectedSource == nil { selectedSource = source continue @@ -267,6 +271,11 @@ func (t *FetchSchedulerTask) Run() { // TODO: If there is an error, Run() must be re-attempted several times for each task, // with different sources. source := t.scheduler.Manager.GetSource() + if source == nil { + fmt.Printf("Cannot acquire new source") + time.Sleep(time.Second * time.Duration(t.scheduler.Manager.AverageChunkTime()) * 3) // TODO: magic numbers + return + } t.startTime = time.Now() err := t.RunWithSource(source) @@ -353,7 +362,7 @@ func (t *FetchSchedulerTask) RunWithSource(source *FetchSource) error { func main() { manager := &FetchManager{ ChunkSize: 256 * 1024, - NumWorkers: 3, + NumWorkers: 10, UncertaintyBoostPerChunkTime: 1.1, } manager.Sources = []*FetchSource{ @@ -368,7 +377,7 @@ func main() { NewFetchSource(manager, "3", "third.server", 350*1024, 0*1024, 1.0), NewFetchSource(manager, "4", "fourth.server", 160*1024, 0*1024, 1.0), NewFetchSource(manager, "22", "first.server", 50*1024, 50*1024, 1.0), - NewFetchSource(manager, "33", "fifth.server", 1500*1024, 0*1024, 1.0), + NewFetchSource(manager, "33", "us31.mapswithme.com", 1500*1024, 0*1024, 1.0), } for i := 0; i < 10; i++ {