add retries to fm_bot
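
When a batch of sources fails, split it up and re-run each source on
its own; only sources that fail when run alone count as failures.

Since workers now enqueue retry work themselves, the total amount of
work isn't known up front, so the worker->main results channel is
replaced with a WaitGroup and an atomic failure counter.  In quiet
mode, batch failures are no longer logged, but solo failures still are.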

Change-Id: Ibcc6ee83cde537caaab824658721bbda300abc18
Reviewed-on: https://skia-review.googlesource.com/c/skia/+/208273
Commit-Queue: Mike Klein <mtklein@google.com>
Reviewed-by: Brian Osman <brianosman@google.com>
diff --git a/tools/fm/fm_bot.go b/tools/fm/fm_bot.go
index 808a432..9c1b69b 100644
--- a/tools/fm/fm_bot.go
+++ b/tools/fm/fm_bot.go
@@ -14,6 +14,8 @@
 	"path/filepath"
 	"runtime"
 	"strings"
+	"sync"
+	"sync/atomic"
 	"time"
 )
 
@@ -72,7 +74,9 @@
 	output, err := cmd.CombinedOutput()
 
 	if err != nil {
-		log.Printf("\n%v #failed (%v):\n%s\n", strings.Join(cmd.Args, " "), err, output)
+		if !*quiet || len(sources) == 1 { // in quiet mode, still log failures of sources run alone
+			log.Printf("\n%v #failed (%v):\n%s\n", strings.Join(cmd.Args, " "), err, output)
+		}
 		return false
 	} else if !*quiet {
 		log.Printf("\n%v #done in %v:\n%s", strings.Join(cmd.Args, " "), time.Since(start), output)
@@ -193,40 +197,38 @@
 		}
 	}
 
-	// The buffer size of main->worker channels isn't super important...
-	// presumably we'll have many hungry goroutines snapping up work as quick
-	// as they can, and if things get backed up, no real reason for main to do
-	// anything but block.
+	// wg tracks queued work that hasn't finished; failures counts sources that fail even when run alone.
+	wg := &sync.WaitGroup{}
+	var failures int32 = 0
+	// The queues are never closed; workers block once the work runs out and exit with the process.
+	worker := func(queue chan work) {
+		for w := range queue {
+			if !callFM(fm, w.Sources, w.Flags) {
+				if len(w.Sources) == 1 {
+					// If a source ran alone and failed, that's just a failure.
+					atomic.AddInt32(&failures, 1)
+				} else {
+					// If a batch of sources ran and failed, split it up and retry each source on its own.
+					for _, source := range w.Sources {
+						wg.Add(1)
+						queue <- work{[]string{source}, w.Flags}
+					}
+				}
+			}
+			wg.Done()
+		}
+	}
+
 	cpu := make(chan work, *cpuLimit)
-	gpu := make(chan work, *gpuLimit)
-
-	// The buffer size of this worker->main results channel is much more
-	// sensitive.  Since it's a many->one funnel, it's easy for the workers to
-	// produce lots of results that main can't keep up with.
-	//
-	// This needlessly throttles our progress, and we can even deadlock if
-	// the buffer fills up before main has finished enqueueing all the work.
-	//
-	// So we set the buffer size here large enough to hold a result for every
-	// item we might possibly enqueue.
-	results := make(chan bool, (*cpuLimit+*gpuLimit)*len(jobs))
-
 	for i := 0; i < *cpuLimit; i++ {
-		go func() {
-			for w := range cpu {
-				results <- callFM(fm, w.Sources, w.Flags)
-			}
-		}()
-	}
-	for i := 0; i < *gpuLimit; i++ {
-		go func() {
-			for w := range gpu {
-				results <- callFM(fm, w.Sources, w.Flags)
-			}
-		}()
+		go worker(cpu)
 	}
 
-	sent := 0
+	gpu := make(chan work, *gpuLimit)
+	for i := 0; i < *gpuLimit; i++ {
+		go worker(gpu)
+	}
+
 	for _, job := range jobs {
 		// Skip blank lines, empty command lines.
 		if len(job) == 0 {
@@ -261,7 +263,6 @@
 		}
 
 		// Round up so there's at least one source per batch.
-		// This math also helps guarantee that sent stays <= cap(results).
 		sourcesPerBatch := (len(sources) + limit - 1) / limit
 
 		for i := 0; i < len(sources); i += sourcesPerBatch {
@@ -271,26 +272,14 @@
 			}
 			batch := sources[i:end]
 
+			wg.Add(1)
 			queue <- work{batch, flags}
-
-			sent += 1
 		}
 	}
-	close(cpu)
-	close(gpu)
 
-	if sent > cap(results) {
-		log.Fatalf("Oops, we sent %d but cap(results) is only %d.  "+
-			"This could lead to deadlock and is a bug.", sent, cap(results))
-	}
+	wg.Wait()
 
-	failures := 0
-	for i := 0; i < sent; i++ {
-		if !<-results {
-			failures += 1
-		}
-	}
 	if failures > 0 {
-		log.Fatalln(failures, "invocations of", fm, "failed")
+		log.Fatalln(failures, "failures after retries")
 	}
 }