add retries to fm_bot
Change-Id: Ibcc6ee83cde537caaab824658721bbda300abc18
Reviewed-on: https://skia-review.googlesource.com/c/skia/+/208273
Commit-Queue: Mike Klein <mtklein@google.com>
Reviewed-by: Brian Osman <brianosman@google.com>
diff --git a/tools/fm/fm_bot.go b/tools/fm/fm_bot.go
index 808a432..9c1b69b 100644
--- a/tools/fm/fm_bot.go
+++ b/tools/fm/fm_bot.go
@@ -14,6 +14,8 @@
"path/filepath"
"runtime"
"strings"
+ "sync"
+ "sync/atomic"
"time"
)
@@ -72,7 +74,9 @@
output, err := cmd.CombinedOutput()
if err != nil {
- log.Printf("\n%v #failed (%v):\n%s\n", strings.Join(cmd.Args, " "), err, output)
+ if !*quiet || len(sources) == 1 {
+ log.Printf("\n%v #failed (%v):\n%s\n", strings.Join(cmd.Args, " "), err, output)
+ }
return false
} else if !*quiet {
log.Printf("\n%v #done in %v:\n%s", strings.Join(cmd.Args, " "), time.Since(start), output)
@@ -193,40 +197,38 @@
}
}
- // The buffer size of main->worker channels isn't super important...
- // presumably we'll have many hungry goroutines snapping up work as quick
- // as they can, and if things get backed up, no real reason for main to do
- // anything but block.
+
+ wg := &sync.WaitGroup{}
+ var failures int32 = 0
+
+ worker := func(queue chan work) {
+ for w := range queue {
+ if !callFM(fm, w.Sources, w.Flags) {
+ if len(w.Sources) == 1 {
+ // If a source ran alone and failed, that's just a failure.
+ atomic.AddInt32(&failures, 1)
+ } else {
+ // If a batch of sources ran and failed, split them up and try again.
+ for _, source := range w.Sources {
+ wg.Add(1)
+ queue <- work{[]string{source}, w.Flags}
+ }
+ }
+ }
+ wg.Done()
+ }
+ }
+
cpu := make(chan work, *cpuLimit)
- gpu := make(chan work, *gpuLimit)
-
- // The buffer size of this worker->main results channel is much more
- // sensitive. Since it's a many->one funnel, it's easy for the workers to
- // produce lots of results that main can't keep up with.
- //
- // This needlessly throttles our progress, and we can even deadlock if
- // the buffer fills up before main has finished enqueueing all the work.
- //
- // So we set the buffer size here large enough to hold a result for every
- // item we might possibly enqueue.
- results := make(chan bool, (*cpuLimit+*gpuLimit)*len(jobs))
-
for i := 0; i < *cpuLimit; i++ {
- go func() {
- for w := range cpu {
- results <- callFM(fm, w.Sources, w.Flags)
- }
- }()
- }
- for i := 0; i < *gpuLimit; i++ {
- go func() {
- for w := range gpu {
- results <- callFM(fm, w.Sources, w.Flags)
- }
- }()
+ go worker(cpu)
}
- sent := 0
+ gpu := make(chan work, *gpuLimit)
+ for i := 0; i < *gpuLimit; i++ {
+ go worker(gpu)
+ }
+
for _, job := range jobs {
// Skip blank lines, empty command lines.
if len(job) == 0 {
@@ -261,7 +263,6 @@
}
// Round up so there's at least one source per batch.
- // This math also helps guarantee that sent stays <= cap(results).
sourcesPerBatch := (len(sources) + limit - 1) / limit
for i := 0; i < len(sources); i += sourcesPerBatch {
@@ -271,26 +272,14 @@
}
batch := sources[i:end]
+ wg.Add(1)
queue <- work{batch, flags}
-
- sent += 1
}
}
- close(cpu)
- close(gpu)
- if sent > cap(results) {
- log.Fatalf("Oops, we sent %d but cap(results) is only %d. "+
- "This could lead to deadlock and is a bug.", sent, cap(results))
- }
+ wg.Wait()
- failures := 0
- for i := 0; i < sent; i++ {
- if !<-results {
- failures += 1
- }
- }
if failures > 0 {
- log.Fatalln(failures, "invocations of", fm, "failed")
+ log.Fatalln(failures, "failures after retries")
}
}