blob: 8a1523d6c407ef5600295a071251b0cfd8f15b52 [file] [log] [blame]
package version_watcher
import (
"context"
"fmt"
"time"
"go.skia.org/infra/fuzzer/go/config"
"go.skia.org/infra/fuzzer/go/download_skia"
fstorage "go.skia.org/infra/fuzzer/go/storage"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/sklog"
)
// A VersionHandler is the type of the callbacks used by VersionWatcher.
type VersionHandler func(context.Context, string) error
// VersionWatcher handles the logic to wait for the version under fuzz to change in
// Google Storage. When it notices the pending version change or the current version
// change, it will fire off one of two provided callbacks.
// It also provides a way for clients to access the current and pending versions seen
// by this watcher.
// The callbacks are not fired on the initial values of the versions, only when a change
// happens.
type VersionWatcher struct {
// A channel to monitor any fatal errors encountered by the version watcher.
Status chan error
storageClient fstorage.FuzzerGCSClient
polingPeriod time.Duration
LastCurrentHash string
LastPendingHash string
onPendingChange VersionHandler
onCurrentChange VersionHandler
force chan bool
}
// NewVersionWatcher creates a version watcher with the specified time period and access
// to GCS. The supplied callbacks may be nil.
func New(s fstorage.FuzzerGCSClient, period time.Duration, onPendingChange, onCurrentChange VersionHandler) *VersionWatcher {
return &VersionWatcher{
storageClient: s,
polingPeriod: period,
onPendingChange: onPendingChange,
onCurrentChange: onCurrentChange,
Status: make(chan error),
force: make(chan bool),
}
}
// Start starts a goroutine that will occasionally wake up (as specified by the period)
// and check to see if the current or pending skia versions to fuzz have changed.
func (vw *VersionWatcher) Start(ctx context.Context) {
go func() {
t := time.Tick(vw.polingPeriod)
for {
select {
case <-vw.force:
vw.check(ctx)
case <-t:
vw.check(ctx)
}
}
}()
}
// check looks in Google Storage to see if the pending or current versions have updated. If so, it
// synchronously calls the relevent callbacks and updates this objects LastCurrentHash
// and/or LastPendingHash.
func (vw *VersionWatcher) check(ctx context.Context) {
sklog.Infof("Woke up to check the Skia version, last current seen was %s", vw.LastCurrentHash)
current, lastUpdated, err := download_skia.GetCurrentSkiaVersionFromGCS(vw.storageClient)
if err != nil {
sklog.Errorf("Failed getting current Skia version from GCS. Going to try again: %s", err)
return
}
sklog.Infof("Current version found to be %q, updated at %v", current, lastUpdated)
if vw.LastCurrentHash == "" {
vw.LastCurrentHash = current
} else if current != vw.LastCurrentHash && vw.onCurrentChange != nil {
sklog.Infof("Calling onCurrentChange(%q)", current)
if err := vw.onCurrentChange(ctx, current); err != nil {
vw.Status <- fmt.Errorf("Failed while executing onCurrentChange %#v.We could be in a broken state. %s", vw.onCurrentChange, err)
return
}
vw.LastCurrentHash = current
lastUpdated = time.Now()
}
if !lastUpdated.IsZero() {
metrics2.GetInt64Metric("fuzzer_version_age", map[string]string{"type": "current"}).Update(int64(time.Since(lastUpdated) / time.Second))
}
if config.Common.ForceReanalysis {
if err := vw.onPendingChange(ctx, vw.LastCurrentHash); err != nil {
sklog.Errorf("There was a problem during force analysis: %s", err)
}
config.Common.ForceReanalysis = false
return
}
pending, lastUpdated, err := download_skia.GetPendingSkiaVersionFromGCS(vw.storageClient)
if err != nil {
sklog.Errorf("Failed getting pending Skia version from GCS. Going to try again: %s", err)
return
}
sklog.Infof("Pending version found to be %q, updated at %v", pending, lastUpdated)
if pending == "" {
vw.LastPendingHash = ""
lastUpdated = time.Now()
} else if vw.LastPendingHash != pending {
vw.LastPendingHash = pending
lastUpdated = time.Now()
if vw.onPendingChange != nil {
sklog.Infof("Calling onPendingChange(%q)", pending)
if err := vw.onPendingChange(ctx, pending); err != nil {
vw.Status <- fmt.Errorf("Failed while executing onCurrentChange %#v.We could be in a broken state. %s", vw.onCurrentChange, err)
return
}
}
}
if !lastUpdated.IsZero() {
metrics2.GetInt64Metric("fuzzer_version_age", map[string]string{"type": "pending"}).Update(int64(time.Since(lastUpdated) / time.Second))
}
}
// Recheck forces a recheck of the pending and current version.
func (vw *VersionWatcher) Recheck() {
vw.force <- true
}