blob: 8176316bb7e4901c669046fc0f83cd379f43497c [file] [log] [blame]
package periodic_triggers
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path"
"sync"
"time"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
)
const (
// TRIGGER_DIRNAME is the name of the directory containing files
// indicating that periodic actions should be triggered. Common practice
// is to use systemd services to write the files on a timer. NewTriggerer
// creates this directory inside the working directory it is given.
TRIGGER_DIRNAME = "periodic-trigger"
// PERIODIC_TRIGGER_MEASUREMENT is the name of the liveness metric for
// periodic triggers. Each trigger gets its own metric, distinguished by
// a "trigger" tag holding the trigger name.
PERIODIC_TRIGGER_MEASUREMENT = "periodic_trigger"
// LAST_TRIGGERED_JSON_FILE is the name of a JSON file containing the
// last-triggered time for each known trigger. The file is located
// directly inside the working directory.
LAST_TRIGGERED_JSON_FILE = "last-triggered.json"
)
// periodicTriggerMetrics tracks liveness metrics for various periodic triggers.
// LastTriggered is the only exported field and therefore the only one which
// is read from and written to the JSON file.
type periodicTriggerMetrics struct {
// jsonFile is the path of the JSON file used by Write.
jsonFile string
// LastTriggered maps trigger name to its last-triggered time, either
// loaded from the JSON file or set by Reset.
LastTriggered map[string]time.Time `json:"last_triggered"`
// metrics maps trigger name to the liveness metric for that trigger.
metrics map[string]metrics2.Liveness
}
// newPeriodicTriggerMetrics returns a periodicTriggerMetrics instance,
// pre-filled with data from LAST_TRIGGERED_JSON_FILE inside workdir.
//
// A missing file is not an error; the instance simply starts out with no
// known triggers. Any other failure to open the file, and any failure to
// decode it, is returned as-is. For every trigger found in the file a
// liveness metric is created and manually reset to the recorded time.
func newPeriodicTriggerMetrics(workdir string) (*periodicTriggerMetrics, error) {
	var rv periodicTriggerMetrics
	jsonFile := path.Join(workdir, LAST_TRIGGERED_JSON_FILE)
	f, err := os.Open(jsonFile)
	if err == nil {
		defer util.Close(f)
		if err := json.NewDecoder(f).Decode(&rv); err != nil {
			return nil, err
		}
	} else if !os.IsNotExist(err) {
		return nil, err
	}
	// The map is nil when the file does not exist, but also when the file
	// decodes successfully without providing one (e.g. "{}" or
	// {"last_triggered": null}). Reset writes into this map, so it must
	// never be left nil.
	if rv.LastTriggered == nil {
		rv.LastTriggered = map[string]time.Time{}
	}
	rv.jsonFile = jsonFile
	rv.metrics = make(map[string]metrics2.Liveness, len(rv.LastTriggered))
	for trigger, last := range rv.LastTriggered {
		lv := metrics2.NewLiveness(PERIODIC_TRIGGER_MEASUREMENT, map[string]string{
			"trigger": trigger,
		})
		// Carry the persisted timestamp over so that the metric does not
		// appear freshly triggered just because the process restarted.
		lv.ManualReset(last)
		rv.metrics[trigger] = lv
	}
	return &rv, nil
}
// Reset records that the named trigger fired just now. The liveness metric
// for the trigger is created on first use, then manually reset to the current
// time, and the same time is stored in LastTriggered.
func (m *periodicTriggerMetrics) Reset(name string) {
	firedAt := time.Now()
	if _, known := m.metrics[name]; !known {
		sklog.Errorf("Creating metric %s -- %s", PERIODIC_TRIGGER_MEASUREMENT, name)
		tags := map[string]string{
			"trigger": name,
		}
		m.metrics[name] = metrics2.NewLiveness(PERIODIC_TRIGGER_MEASUREMENT, tags)
	}
	m.metrics[name].ManualReset(firedAt)
	m.LastTriggered[name] = firedAt
}
// Write JSON-encodes the receiver into the file at m.jsonFile, going through
// util.WithWriteFile. Only the exported LastTriggered field ends up in the
// output.
func (m *periodicTriggerMetrics) Write() error {
	encode := func(out io.Writer) error {
		enc := json.NewEncoder(out)
		return enc.Encode(m)
	}
	return util.WithWriteFile(m.jsonFile, encode)
}
// findAndParseTriggerFiles returns the base filenames for each file in
// triggerDir. The names come back in directory order. An empty directory
// yields an empty result and no error.
//
// An error is returned if the directory cannot be opened or listed.
func findAndParseTriggerFiles(triggerDir string) ([]string, error) {
	dir, err := os.Open(triggerDir)
	if err != nil {
		return nil, fmt.Errorf("Unable to read trigger directory %s: %s", triggerDir, err)
	}
	defer util.Close(dir)
	// A non-positive count requests every entry in a single call. Asking
	// for just one name would let a trigger whose functions keep failing
	// (and whose file is therefore never deleted) hide all other pending
	// triggers. In this mode reaching the end of the directory is reported
	// as a nil error rather than io.EOF.
	files, err := dir.Readdirnames(-1)
	if err != nil {
		return nil, fmt.Errorf("Unable to list trigger directory %s: %s", triggerDir, err)
	}
	return files, nil
}
// deleteTriggerFile removes the file named basename from triggerDir. Callers
// do this once the function(s) for that trigger have succeeded. A failed
// removal is reported as an error naming the full path of the file.
func deleteTriggerFile(triggerDir, basename string) error {
	target := path.Join(triggerDir, basename)
	err := os.Remove(target)
	if err == nil {
		return nil
	}
	return fmt.Errorf("Unable to remove trigger file %s: %s", target, err)
}
// Triggerer is a struct which triggers certain actions on a timer.
// Functions are attached to named triggers via Register and are executed by
// RunPeriodicTriggers whenever a file named after the trigger is present in
// triggerDir.
type Triggerer struct {
// funcs maps trigger name to the functions registered for that trigger.
funcs map[string][]func(context.Context) error
// metrics holds the per-trigger liveness metrics and last-triggered times.
metrics *periodicTriggerMetrics
// mtx protects funcs: Register holds the write lock, RunPeriodicTriggers
// holds the read lock.
mtx sync.RWMutex
// triggerDir is the TRIGGER_DIRNAME directory inside workdir.
triggerDir string
// workdir is the working directory which was passed to NewTriggerer.
workdir string
}
// NewTriggerer returns a Triggerer instance which uses workdir as its working
// directory. The trigger directory (TRIGGER_DIRNAME inside workdir) is created
// if necessary, and the last-triggered times are loaded via
// newPeriodicTriggerMetrics. A failure of either step is returned unchanged.
func NewTriggerer(workdir string) (*Triggerer, error) {
	triggerDir := path.Join(workdir, TRIGGER_DIRNAME)
	if mkErr := os.MkdirAll(triggerDir, os.ModePerm); mkErr != nil {
		return nil, mkErr
	}

	loaded, loadErr := newPeriodicTriggerMetrics(workdir)
	if loadErr != nil {
		return nil, loadErr
	}

	// The zero value of the mutex is ready for use, so it is not set here.
	t := new(Triggerer)
	t.funcs = make(map[string][]func(context.Context) error)
	t.metrics = loaded
	t.triggerDir = triggerDir
	t.workdir = workdir
	return t, nil
}
// Register adds fn to the list of functions run for the given trigger.
// Functions registered for the same trigger are kept in registration order.
// The write lock is held while the list is updated.
func (t *Triggerer) Register(trigger string, fn func(context.Context) error) {
	t.mtx.Lock()
	defer t.mtx.Unlock()

	existing := t.funcs[trigger]
	t.funcs[trigger] = append(existing, fn)
}
// RunPeriodicTriggers runs the registered functions for every trigger file
// found in the trigger directory.
//
// When all functions of a trigger succeed (including the case where none are
// registered), the trigger file is deleted and the trigger's metric is reset.
// Otherwise the file is left in place so that the trigger is picked up again
// by a later call. The last-triggered times are written out at the end of
// every call.
//
// If the trigger directory cannot be listed, that error is returned directly.
// All other errors are collected and returned as a single error whose message
// lists each of them on its own line. Nil is returned if nothing went wrong.
//
// NOTE(review): the metrics are modified while only the read lock is held, so
// this looks unsafe to call from multiple goroutines at once -- confirm.
func (t *Triggerer) RunPeriodicTriggers(ctx context.Context) error {
	pending, err := findAndParseTriggerFiles(t.triggerDir)
	if err != nil {
		return err
	}

	t.mtx.RLock()
	defer t.mtx.RUnlock()

	// TODO(borenet): Parallelize this?
	var failures []error
	for _, name := range pending {
		var fnErrs []error
		for _, fn := range t.funcs[name] {
			if fnErr := fn(ctx); fnErr != nil {
				fnErrs = append(fnErrs, fnErr)
			}
		}
		if len(fnErrs) > 0 {
			// Keep the trigger file around; the trigger has not been
			// handled successfully yet.
			failures = append(failures, fnErrs...)
			continue
		}
		if rmErr := deleteTriggerFile(t.triggerDir, name); rmErr != nil {
			failures = append(failures, rmErr)
		}
		// The functions succeeded, so the metric is reset even if the
		// file could not be removed.
		t.metrics.Reset(name)
	}

	if writeErr := t.metrics.Write(); writeErr != nil {
		failures = append(failures, writeErr)
	}

	if len(failures) == 0 {
		return nil
	}
	msg := "Encountered errors running periodic triggers:"
	for _, failure := range failures {
		msg = fmt.Sprintf("%s\n%s", msg, failure)
	}
	return errors.New(msg)
}
// Start creates a Triggerer for workdir, registers every function in triggers
// under its trigger name, and launches a goroutine which hands
// RunPeriodicTriggers to util.RepeatCtx with a one-minute interval
// (presumably repeating until ctx is cancelled -- see util.RepeatCtx).
//
// Failed runs are logged. Successful runs reset the
// "last_successful_periodic_trigger_loop" liveness metric. An error is only
// returned if the Triggerer cannot be created; Start does not wait for the
// goroutine.
func Start(ctx context.Context, workdir string, triggers map[string][]func(context.Context) error) error {
	triggerer, err := NewTriggerer(workdir)
	if err != nil {
		return err
	}
	for name, fns := range triggers {
		for i := range fns {
			triggerer.Register(name, fns[i])
		}
	}

	loopLiveness := metrics2.NewLiveness("last_successful_periodic_trigger_loop")
	runOnce := func() {
		runErr := triggerer.RunPeriodicTriggers(ctx)
		if runErr == nil {
			loopLiveness.Reset()
			return
		}
		sklog.Errorf("Failed to run periodic triggers: %s", runErr)
	}
	go util.RepeatCtx(time.Minute, ctx, runOnce)
	return nil
}