[task scheduler] Use kubernetes jobs to trigger periodic tasks via pubsub
The jobs are added in https://skia-review.googlesource.com/c/skia-public-config/+/182120/
Bug: skia:8636
Change-Id: I603263e43eba847dd8ccc8c5955a06a33e6b233c
Reviewed-on: https://skia-review.googlesource.com/c/179849
Commit-Queue: Eric Boren <borenet@google.com>
Reviewed-by: Ben Wagner <benjaminwagner@google.com>
diff --git a/go/periodic/periodic.go b/go/periodic/periodic.go
new file mode 100644
index 0000000..b827e9f
--- /dev/null
+++ b/go/periodic/periodic.go
@@ -0,0 +1,163 @@
+package periodic
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "cloud.google.com/go/pubsub"
+ "go.skia.org/infra/go/sklog"
+ "go.skia.org/infra/go/util"
+ "golang.org/x/oauth2"
+ "google.golang.org/api/option"
+)
+
+const (
+ // Authentication scope required for periodic triggers.
+ AUTH_SCOPE = pubsub.ScopePubSub
+
+ // PubSub topic used for periodic triggers. A single topic is used for
+ // all triggers, with each message containing an attribute indicating
+ // which trigger prompted it.
+ PUBSUB_TOPIC = "periodic-trigger"
+
+ // Attribute sent with all pubsub messages; the name of the periodic
+ // trigger which prompted the message.
+ PUBSUB_ATTR_TRIGGER_NAME = "trigger"
+
+ // Attribute sent with all pubsub messages; the unique ID of the call
+ // to Trigger() which prompted the message.
+ PUBSUB_ATTR_TRIGGER_ID = "id"
+
+ // Acknowledgement deadline for pubsub messages; all TriggerCallbackFns
+ // must be faster than this deadline. If this is changed, all
+ // subscriptions will need to be deleted and recreated.
+ PUBSUB_ACK_DEADLINE = 5 * time.Minute
+
+ // Google Cloud project name used for pubsub.
+ PUBSUB_PROJECT = "skia-public"
+
+ // Names of periodic triggers.
+ TRIGGER_NIGHTLY = "nightly"
+ TRIGGER_WEEKLY = "weekly"
+)
+
+var (
+ VALID_TRIGGERS = []string{TRIGGER_NIGHTLY, TRIGGER_WEEKLY}
+)
+
+// TriggerCallbackFn is a function called when handling requests for periodic
+// triggers. The string parameters are the name of the periodic trigger and the
+// unique ID of the call to Trigger() which generated the message. The return
+// value determines whether or not the pubsub message should be ACK'd. If the
+// TriggerCallbackFn returns false, it may be called again. TriggerCallbackFns
+// must finish within the PUBSUB_ACK_DEADLINE.
+type TriggerCallbackFn func(context.Context, string, string) bool
+
+// validateTrigger returns an error if the given trigger name is not valid.
+func validateTrigger(triggerName string) error {
+ if !util.In(triggerName, VALID_TRIGGERS) {
+ return fmt.Errorf("Invalid trigger name %q", triggerName)
+ }
+ return nil
+}
+
+// validateId returns an error if the given trigger ID is not valid.
+func validateId(triggerId string) error {
+ if triggerId == "" {
+ return fmt.Errorf("Invalid trigger ID %q", triggerId)
+ }
+ return nil
+}
+
+// setup is a helper function which returns the pubsub client and topic,
+// creating the topic if necessary.
+func setup(ctx context.Context, ts oauth2.TokenSource) (*pubsub.Client, *pubsub.Topic, error) {
+ c, err := pubsub.NewClient(ctx, PUBSUB_PROJECT, option.WithTokenSource(ts))
+ if err != nil {
+ return nil, nil, err
+ }
+ t := c.Topic(PUBSUB_TOPIC)
+ exists, err := t.Exists(ctx)
+ if err != nil {
+ return nil, nil, err
+ }
+ if !exists {
+ if _, err := c.CreateTopic(ctx, PUBSUB_TOPIC); err != nil {
+ return nil, nil, err
+ }
+ }
+ return c, t, nil
+}
+
+// Listen creates a background goroutine which listens for pubsub messages for
+// periodic triggers. The subscriber name is used as part of the pubsub
+// subscription ID; if there are multiple instances of a server which all need
+// to receive every message, they should use different subscriber names.
+func Listen(ctx context.Context, subscriberName string, ts oauth2.TokenSource, cb TriggerCallbackFn) error {
+ c, t, err := setup(ctx, ts)
+ if err != nil {
+ return err
+ }
+ subId := PUBSUB_TOPIC + "+" + subscriberName
+ sub := c.Subscription(subId)
+ exists, err := sub.Exists(ctx)
+ if err != nil {
+ return err
+ }
+ if !exists {
+ if _, err := c.CreateSubscription(ctx, subId, pubsub.SubscriptionConfig{
+ Topic: t,
+ AckDeadline: PUBSUB_ACK_DEADLINE + 10*time.Second,
+ }); err != nil {
+ return err
+ }
+ }
+
+ // Start the receiving goroutine.
+ go func() {
+ if err := sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
+ triggerName := m.Attributes[PUBSUB_ATTR_TRIGGER_NAME]
+ triggerId := m.Attributes[PUBSUB_ATTR_TRIGGER_ID]
+ // Invalid messages are ack'd and skipped so they are not redelivered.
+ if err := validateTrigger(triggerName); err != nil {
+ sklog.Errorf("Received invalid pubsub message: %s", err)
+ m.Ack()
+ } else if err := validateId(triggerId); err != nil {
+ sklog.Errorf("Received invalid pubsub message: %s", err)
+ m.Ack()
+ } else if cb(ctx, triggerName, triggerId) {
+ // The callback succeeded; ack so the message is not redelivered.
+ m.Ack()
+ } else {
+ m.Nack()
+ }
+ }); err != nil {
+ sklog.Errorf("Pubsub subscription receive failed: %s", err)
+ }
+ }()
+ return nil
+}
+
+// Trigger sends a pubsub message for the given periodic trigger. The
+// triggerId may be used for de-duplication on the subscriber side and should
+// therefore be unique for each invocation of Trigger.
+func Trigger(ctx context.Context, triggerName, triggerId string, ts oauth2.TokenSource) error {
+ if err := validateTrigger(triggerName); err != nil {
+ return err
+ }
+ if err := validateId(triggerId); err != nil {
+ return err
+ }
+ _, t, err := setup(ctx, ts)
+ if err != nil {
+ return err
+ }
+ _, err = t.Publish(ctx, &pubsub.Message{
+ Attributes: map[string]string{
+ PUBSUB_ATTR_TRIGGER_NAME: triggerName,
+ PUBSUB_ATTR_TRIGGER_ID: triggerId,
+ },
+ }).Get(ctx)
+ return err
+}
diff --git a/go/periodic/periodic_test.go b/go/periodic/periodic_test.go
new file mode 100644
index 0000000..2834795
--- /dev/null
+++ b/go/periodic/periodic_test.go
@@ -0,0 +1,52 @@
+package periodic
+
+import (
+ "context"
+ "fmt"
+ "testing"
+
+ "github.com/google/uuid"
+ "github.com/stretchr/testify/assert"
+ "go.skia.org/infra/go/testutils"
+)
+
+func TestPeriodic(t *testing.T) {
+ testutils.LargeTest(t)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // Test validation.
+ assert.EqualError(t, Trigger(ctx, "bogus", uuid.New().String(), nil), "Invalid trigger name \"bogus\"")
+ assert.EqualError(t, Trigger(ctx, TRIGGER_NIGHTLY, "", nil), "Invalid trigger ID \"\"")
+
+ subName := fmt.Sprintf("periodic-test-%s", uuid.New())
+ expectCh := make(chan string)
+ rvCh := make(chan bool)
+
+ assert.NoError(t, Listen(ctx, subName, nil, func(_ context.Context, trigger, id string) bool {
+ expectTrigger := <-expectCh
+ expectId := <-expectCh
+ assert.Equal(t, expectTrigger, trigger)
+ assert.Equal(t, expectId, id)
+ return <-rvCh
+ }))
+
+ check := func(trigger, id string, rv bool) {
+ expectCh <- trigger
+ expectCh <- id
+ rvCh <- rv
+ }
+ triggerAndCheck := func(trigger, id string, rv bool) {
+ assert.NoError(t, Trigger(ctx, trigger, id, nil))
+ check(trigger, id, rv)
+ }
+
+ // Normal operation; a single pubsub round trip.
+ triggerAndCheck(TRIGGER_NIGHTLY, uuid.New().String(), true)
+
+ // Initial handling fails, the message will be delivered again.
+ id := uuid.New().String()
+ triggerAndCheck(TRIGGER_NIGHTLY, id, false)
+ check(TRIGGER_NIGHTLY, id, true)
+}
diff --git a/go/periodic_triggers/periodic_triggers.go b/go/periodic_triggers/periodic_triggers.go
deleted file mode 100644
index 8176316..0000000
--- a/go/periodic_triggers/periodic_triggers.go
+++ /dev/null
@@ -1,214 +0,0 @@
-package periodic_triggers
-
-import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "os"
- "path"
- "sync"
- "time"
-
- "go.skia.org/infra/go/metrics2"
- "go.skia.org/infra/go/sklog"
- "go.skia.org/infra/go/util"
-)
-
-const (
- // TRIGGER_DIRNAME is the name of the directory containing files
- // indicating that periodic actions should be triggered. Common practice
- // is to use systemd services to write the files on a timer.
- TRIGGER_DIRNAME = "periodic-trigger"
-
- // PERIODIC_TRIGGER_MEASUREMENT is the name of the liveness metric for
- // periodic triggers.
- PERIODIC_TRIGGER_MEASUREMENT = "periodic_trigger"
-
- // LAST_TRIGGERED_JSON_FILE is the name of a JSON file containing the
- // last-triggered time for each known trigger.
- LAST_TRIGGERED_JSON_FILE = "last-triggered.json"
-)
-
-// periodicTriggerMetrics tracks liveness metrics for various periodic triggers.
-type periodicTriggerMetrics struct {
- jsonFile string
- LastTriggered map[string]time.Time `json:"last_triggered"`
- metrics map[string]metrics2.Liveness
-}
-
-// newPeriodicTriggerMetrics returns a periodicTriggerMetrics instance,
-// pre-filled with data from a file.
-func newPeriodicTriggerMetrics(workdir string) (*periodicTriggerMetrics, error) {
- var rv periodicTriggerMetrics
- jsonFile := path.Join(workdir, LAST_TRIGGERED_JSON_FILE)
- f, err := os.Open(jsonFile)
- if err == nil {
- defer util.Close(f)
- if err := json.NewDecoder(f).Decode(&rv); err != nil {
- return nil, err
- }
- } else if !os.IsNotExist(err) {
- return nil, err
- } else {
- rv.LastTriggered = map[string]time.Time{}
- }
- rv.jsonFile = jsonFile
- rv.metrics = make(map[string]metrics2.Liveness, len(rv.LastTriggered))
- for trigger, last := range rv.LastTriggered {
- lv := metrics2.NewLiveness(PERIODIC_TRIGGER_MEASUREMENT, map[string]string{
- "trigger": trigger,
- })
- lv.ManualReset(last)
- rv.metrics[trigger] = lv
- }
- return &rv, nil
-}
-
-// Reset resets the given trigger metric.
-func (m *periodicTriggerMetrics) Reset(name string) {
- now := time.Now()
- lv, ok := m.metrics[name]
- if !ok {
- sklog.Errorf("Creating metric %s -- %s", PERIODIC_TRIGGER_MEASUREMENT, name)
- lv = metrics2.NewLiveness(PERIODIC_TRIGGER_MEASUREMENT, map[string]string{
- "trigger": name,
- })
- m.metrics[name] = lv
- }
- lv.ManualReset(now)
- m.LastTriggered[name] = now
-}
-
-// Write writes the last-triggered times to a JSON file.
-func (m *periodicTriggerMetrics) Write() error {
- return util.WithWriteFile(m.jsonFile, func(w io.Writer) error {
- return json.NewEncoder(w).Encode(m)
- })
-}
-
-// findAndParseTriggerFiles returns the base filenames for each file in
-// triggerDir.
-func findAndParseTriggerFiles(triggerDir string) ([]string, error) {
- dir, err := os.Open(triggerDir)
- if err != nil {
- return nil, fmt.Errorf("Unable to read trigger directory %s: %s", triggerDir, err)
- }
- defer util.Close(dir)
- files, err := dir.Readdirnames(1)
- if err == io.EOF {
- return []string{}, nil
- } else if err != nil {
- return nil, fmt.Errorf("Unable to list trigger directory %s: %s", triggerDir, err)
- }
- return files, nil
-}
-
-// deleteTriggerFile removes the given trigger file indicating that the
-// triggered function(s) succeeded.
-func deleteTriggerFile(triggerDir, basename string) error {
- filename := path.Join(triggerDir, basename)
- if err := os.Remove(filename); err != nil {
- return fmt.Errorf("Unable to remove trigger file %s: %s", filename, err)
- }
- return nil
-}
-
-// Triggerer is a struct which triggers certain actions on a timer.
-type Triggerer struct {
- funcs map[string][]func(context.Context) error
- metrics *periodicTriggerMetrics
- mtx sync.RWMutex
- triggerDir string
- workdir string
-}
-
-// Return a Triggerer instance.
-func NewTriggerer(workdir string) (*Triggerer, error) {
- triggerDir := path.Join(workdir, TRIGGER_DIRNAME)
- if err := os.MkdirAll(triggerDir, os.ModePerm); err != nil {
- return nil, err
- }
- metrics, err := newPeriodicTriggerMetrics(workdir)
- if err != nil {
- return nil, err
- }
- return &Triggerer{
- funcs: map[string][]func(context.Context) error{},
- metrics: metrics,
- mtx: sync.RWMutex{},
- triggerDir: triggerDir,
- workdir: workdir,
- }, nil
-}
-
-// Register the given function to run at the given trigger.
-func (t *Triggerer) Register(trigger string, fn func(context.Context) error) {
- t.mtx.Lock()
- defer t.mtx.Unlock()
- t.funcs[trigger] = append(t.funcs[trigger], fn)
-}
-
-// RunPeriodicTriggers returns the set of triggers which have just fired.
-func (t *Triggerer) RunPeriodicTriggers(ctx context.Context) error {
- triggers, err := findAndParseTriggerFiles(t.triggerDir)
- if err != nil {
- return err
- }
-
- t.mtx.RLock()
- defer t.mtx.RUnlock()
-
- // TODO(borenet): Parallelize this?
- allErrs := []error{}
- for _, trigger := range triggers {
- errs := []error{}
- for _, fn := range t.funcs[trigger] {
- if err := fn(ctx); err != nil {
- errs = append(errs, err)
- }
- }
- if len(errs) == 0 {
- if err := deleteTriggerFile(t.triggerDir, trigger); err != nil {
- allErrs = append(allErrs, err)
- }
- t.metrics.Reset(trigger)
- } else {
- allErrs = append(allErrs, errs...)
- }
- }
- if err := t.metrics.Write(); err != nil {
- allErrs = append(allErrs, err)
- }
- if len(allErrs) > 0 {
- rvMsg := "Encountered errors running periodic triggers:"
- for _, err := range allErrs {
- rvMsg += fmt.Sprintf("\n%s", err)
- }
- return errors.New(rvMsg)
- }
- return nil
-}
-
-// Start running periodic triggers in a loop.
-func Start(ctx context.Context, workdir string, triggers map[string][]func(context.Context) error) error {
- t, err := NewTriggerer(workdir)
- if err != nil {
- return err
- }
- for trigger, funcs := range triggers {
- for _, fn := range funcs {
- t.Register(trigger, fn)
- }
- }
- lv := metrics2.NewLiveness("last_successful_periodic_trigger_loop")
- go util.RepeatCtx(time.Minute, ctx, func() {
- if err := t.RunPeriodicTriggers(ctx); err != nil {
- sklog.Errorf("Failed to run periodic triggers: %s", err)
- } else {
- lv.Reset()
- }
- })
- return nil
-}
diff --git a/go/periodic_triggers/periodic_triggers_test.go b/go/periodic_triggers/periodic_triggers_test.go
deleted file mode 100644
index 7d50e97..0000000
--- a/go/periodic_triggers/periodic_triggers_test.go
+++ /dev/null
@@ -1,45 +0,0 @@
-package periodic_triggers
-
-import (
- "context"
- "io/ioutil"
- "os"
- "path"
- "testing"
-
- assert "github.com/stretchr/testify/require"
- "go.skia.org/infra/go/testutils"
-)
-
-func TestTriggers(t *testing.T) {
- testutils.SmallTest(t)
- ctx := context.Background()
- wd, err := ioutil.TempDir("", "")
- assert.NoError(t, err)
- defer testutils.RemoveAll(t, wd)
-
- p, err := NewTriggerer(wd)
- assert.NoError(t, err)
-
- // Add a periodic trigger.
- ran := false
- p.Register("test", func(ctx context.Context) error {
- ran = true
- return nil
- })
-
- // Run periodic triggers. The trigger file does not exist, so we
- // shouldn't run the function.
- assert.False(t, ran)
- assert.NoError(t, p.RunPeriodicTriggers(ctx))
- assert.False(t, ran)
-
- // Write the trigger file. Cycle, ensure that the trigger file was
- // removed and the periodic task was added.
- triggerFile := path.Join(p.workdir, TRIGGER_DIRNAME, "test")
- assert.NoError(t, ioutil.WriteFile(triggerFile, []byte{}, os.ModePerm))
- assert.NoError(t, p.RunPeriodicTriggers(ctx))
- assert.True(t, ran)
- _, err = os.Stat(triggerFile)
- assert.True(t, os.IsNotExist(err))
-}
diff --git a/periodic-trigger/Dockerfile b/periodic-trigger/Dockerfile
new file mode 100644
index 0000000..a722b40
--- /dev/null
+++ b/periodic-trigger/Dockerfile
@@ -0,0 +1,7 @@
+FROM gcr.io/skia-public/basedebian:testing-slim
+
+USER skia
+
+COPY . /
+
+ENTRYPOINT ["/usr/local/bin/periodic-trigger"]
diff --git a/periodic-trigger/Makefile b/periodic-trigger/Makefile
new file mode 100644
index 0000000..62dae53
--- /dev/null
+++ b/periodic-trigger/Makefile
@@ -0,0 +1,11 @@
+include ../go/skiaversion/skiaversion.mk
+include ../kube/kube.mk
+
+.PHONY: build
+build: skiaversion
+ GOOS=linux go install -a ./go/periodic-trigger
+
+.PHONY: push
+push: build pushk
+ ./build_release
+ pushk --message="$(MESSAGE)" --cluster=skia-public periodic-trigger
diff --git a/periodic-trigger/build_release b/periodic-trigger/build_release
new file mode 100755
index 0000000..dd4be2a
--- /dev/null
+++ b/periodic-trigger/build_release
@@ -0,0 +1,15 @@
+#!/bin/bash
+APPNAME=periodic-trigger
+
+set -x -e
+
+# Copy files into the right locations in ${ROOT}.
+copy_release_files()
+{
+INSTALL="install -D --verbose --backup=none"
+INSTALL_DIR="install -d --verbose --backup=none"
+${INSTALL} --mode=644 -T ./Dockerfile ${ROOT}/Dockerfile
+${INSTALL} --mode=755 -T ${GOPATH}/bin/${APPNAME} ${ROOT}/usr/local/bin/${APPNAME}
+}
+
+source ../bash/docker_build.sh
diff --git a/periodic-trigger/go/periodic-trigger/main.go b/periodic-trigger/go/periodic-trigger/main.go
new file mode 100644
index 0000000..daad560
--- /dev/null
+++ b/periodic-trigger/go/periodic-trigger/main.go
@@ -0,0 +1,43 @@
+package main
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "time"
+
+ "go.skia.org/infra/go/auth"
+ "go.skia.org/infra/go/common"
+ "go.skia.org/infra/go/periodic"
+ "go.skia.org/infra/go/skiaversion"
+ "go.skia.org/infra/go/sklog"
+)
+
+const (
+ // Template used for creating unique IDs for instances of triggers.
+ TRIGGER_TS = "2006-01-02"
+)
+
+var (
+ local = flag.Bool("local", false, "Running locally if true. As opposed to in production.")
+ project = flag.String("project", "", "GCE project in which to publish the pub/sub message.")
+ trigger = flag.String("trigger", "", "Name of periodic trigger.")
+)
+
+func main() {
+ common.Init()
+ defer common.Defer()
+ skiaversion.MustLogVersion()
+ ts, err := auth.NewDefaultTokenSource(*local, periodic.AUTH_SCOPE)
+ if err != nil {
+ sklog.Fatal(err)
+ }
+ // TODO(borenet): This ID is not necessarily unique; if the cron job is
+ // significantly delayed, we might end up sending the same message twice
+ // with different dates. It also doesn't allow for periods smaller than
+ // 24 hours.
+ id := fmt.Sprintf("%s-%s", *trigger, time.Now().UTC().Format(TRIGGER_TS))
+ if err := periodic.Trigger(context.Background(), *trigger, id, ts); err != nil {
+ sklog.Fatal(err)
+ }
+}
diff --git a/task_scheduler/Makefile b/task_scheduler/Makefile
index d67a88a..fb8299d 100644
--- a/task_scheduler/Makefile
+++ b/task_scheduler/Makefile
@@ -60,14 +60,6 @@
go install -v ../push/go/pushcli
pushcli task-scheduler-db-backup skia-task-scheduler
-.PHONY: push_trigger
-push_trigger: all
- ./build_trigger_nightly_release "`git log -n1 --format=%s`"
- ./build_trigger_weekly_release "`git log -n1 --format=%s`"
- go install -v ../push/go/pushcli
- pushcli task-scheduler-trigger-nightly skia-task-scheduler
- pushcli task-scheduler-trigger-weekly skia-task-scheduler
-
.PHONY: vm_create_prod
vm_create_prod:
go run ./vm.go --alsologtostderr --create --ignore-exists --instance=prod
diff --git a/task_scheduler/build_trigger_nightly_release b/task_scheduler/build_trigger_nightly_release
deleted file mode 100755
index d862000..0000000
--- a/task_scheduler/build_trigger_nightly_release
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/bin/bash
-# Builds and uploads a debian package for task-scheduler-trigger-nightly.
-APPNAME=task-scheduler-trigger-nightly
-DESCRIPTION="Service which triggers nightly task-scheduler jobs."
-SYSTEMD="${APPNAME}.service ${APPNAME}.timer"
-SYSTEMD_TIMER=${APPNAME}.timer
-
-set -x -e
-
-# Copy files into the right locations in ${ROOT}.
-copy_release_files()
-{
-INSTALL="fakeroot install -D --verbose --backup=none --group=root --owner=root"
-INSTALL_DIR="fakeroot install -d --verbose --backup=none --group=root --owner=root"
-${INSTALL} --mode=644 -T ./sys/${APPNAME}.service ${ROOT}/etc/systemd/system/${APPNAME}.service
-${INSTALL} --mode=644 -T ./sys/${APPNAME}.timer ${ROOT}/etc/systemd/system/${APPNAME}.timer
-${INSTALL_DIR} --mode=777 ${ROOT}/mnt/pd0/task_scheduler_workdir/periodic-trigger
-}
-
-source ../bash/release.sh
diff --git a/task_scheduler/build_trigger_weekly_release b/task_scheduler/build_trigger_weekly_release
deleted file mode 100755
index 6ddbaa1..0000000
--- a/task_scheduler/build_trigger_weekly_release
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/bin/bash
-# Builds and uploads a debian package for task-scheduler-trigger-weekly.
-APPNAME=task-scheduler-trigger-weekly
-DESCRIPTION="Service which triggers weekly task-scheduler jobs."
-SYSTEMD="${APPNAME}.service ${APPNAME}.timer"
-SYSTEMD_TIMER=${APPNAME}.timer
-
-set -x -e
-
-# Copy files into the right locations in ${ROOT}.
-copy_release_files()
-{
-INSTALL="fakeroot install -D --verbose --backup=none --group=root --owner=root"
-INSTALL_DIR="fakeroot install -d --verbose --backup=none --group=root --owner=root"
-${INSTALL} --mode=644 -T ./sys/${APPNAME}.service ${ROOT}/etc/systemd/system/${APPNAME}.service
-${INSTALL} --mode=644 -T ./sys/${APPNAME}.timer ${ROOT}/etc/systemd/system/${APPNAME}.timer
-${INSTALL_DIR} --mode=777 ${ROOT}/mnt/pd0/task_scheduler_workdir/periodic-trigger
-}
-
-source ../bash/release.sh
diff --git a/task_scheduler/create-task-scheduler-internal-sa.sh b/task_scheduler/create-task-scheduler-internal-sa.sh
new file mode 100755
index 0000000..7d96c1e
--- /dev/null
+++ b/task_scheduler/create-task-scheduler-internal-sa.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+
+# Creates the service account used by Skia Task Scheduler Internal, and exports a
+# key for it into the kubernetes cluster as a secret.
+
+set -e -x
+source ../kube/corp-config.sh
+source ../bash/ramdisk.sh
+
+# New service account we will create.
+SA_NAME="task-scheduler-internal"
+
+cd /tmp/ramdisk
+
+gcloud --project=${PROJECT_ID} iam service-accounts create "${SA_NAME}" --display-name="Service account for Skia Task Scheduler Internal"
+
+gcloud beta iam service-accounts keys create ${SA_NAME}.json --iam-account="${SA_NAME}@${PROJECT_SUBDOMAIN}.iam.gserviceaccount.com"
+
+kubectl create secret generic "${SA_NAME}" --from-file=key.json=${SA_NAME}.json
+
+cd -
diff --git a/task_scheduler/create-task-scheduler-sa.sh b/task_scheduler/create-task-scheduler-sa.sh
new file mode 100755
index 0000000..14b7694
--- /dev/null
+++ b/task_scheduler/create-task-scheduler-sa.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+
+# Creates the service account used by Skia Task Scheduler, and exports a key for
+# it into the kubernetes cluster as a secret.
+
+set -e -x
+source ../kube/config.sh
+source ../bash/ramdisk.sh
+
+# New service account we will create.
+SA_NAME="task-scheduler"
+
+cd /tmp/ramdisk
+
+gcloud --project=${PROJECT_ID} iam service-accounts create "${SA_NAME}" --display-name="Service account for Skia Task Scheduler"
+
+gcloud beta iam service-accounts keys create ${SA_NAME}.json --iam-account="${SA_NAME}@${PROJECT_SUBDOMAIN}.iam.gserviceaccount.com"
+
+kubectl create secret generic "${SA_NAME}" --from-file=key.json=${SA_NAME}.json
+
+cd -
diff --git a/task_scheduler/go/db/cache/cache.go b/task_scheduler/go/db/cache/cache.go
index 18d009e..5f79ce2 100644
--- a/task_scheduler/go/db/cache/cache.go
+++ b/task_scheduler/go/db/cache/cache.go
@@ -447,6 +447,10 @@
// the given RepoState. Does not search the underlying DB.
GetJobsByRepoState(string, types.RepoState) ([]*types.Job, error)
+ // GetMatchingJobsFromDateRange retrieves all jobs which were created
+ // in the given date range and match one of the given job names.
+ GetMatchingJobsFromDateRange(names []string, from time.Time, to time.Time) (map[string][]*types.Job, error)
+
// ScheduledJobsForCommit indicates whether or not we triggered any jobs
// for the given repo/commit.
ScheduledJobsForCommit(string, string) (bool, error)
@@ -514,6 +518,23 @@
}
// See documentation for JobCache interface.
+func (c *jobCache) GetMatchingJobsFromDateRange(names []string, from time.Time, to time.Time) (map[string][]*types.Job, error) {
+ c.mtx.RLock()
+ defer c.mtx.RUnlock()
+ m := make(map[string]bool, len(names))
+ for _, name := range names {
+ m[name] = true
+ }
+ rv := make(map[string][]*types.Job, len(names))
+ for _, job := range c.jobs {
+ if !from.After(job.Created) && job.Created.Before(to) && m[job.Name] {
+ rv[job.Name] = append(rv[job.Name], job)
+ }
+ }
+ return rv, nil
+}
+
+// See documentation for JobCache interface.
func (c *jobCache) ScheduledJobsForCommit(repo, rev string) (bool, error) {
c.mtx.RLock()
defer c.mtx.RUnlock()
diff --git a/task_scheduler/go/db/cache/cache_test.go b/task_scheduler/go/db/cache/cache_test.go
index 5eb1972..d5da9ef 100644
--- a/task_scheduler/go/db/cache/cache_test.go
+++ b/task_scheduler/go/db/cache/cache_test.go
@@ -740,6 +740,19 @@
}
}
assert.True(t, found)
+
+ found = false
+ jobsByName, err := c.GetMatchingJobsFromDateRange([]string{job.Name}, job.Created, job.Created.Add(time.Nanosecond))
+ assert.NoError(t, err)
+ for _, jobsForName := range jobsByName {
+ for _, otherJob := range jobsForName {
+ if job.Id == otherJob.Id {
+ deepequal.AssertDeepEqual(t, job, otherJob)
+ found = true
+ }
+ }
+ }
+ assert.True(t, found)
}
}
@@ -769,6 +782,18 @@
t.Fatalf("Found unexpected job %v in GetJobsByRepoState", job)
}
}
+
+ found := false
+ jobsByName, err := c.GetMatchingJobsFromDateRange([]string{job.Name}, time.Time{}, time.Now().Add(10*24*time.Hour))
+ assert.NoError(t, err)
+ for _, jobsForName := range jobsByName {
+ for _, otherJob := range jobsForName {
+ if job.Id == otherJob.Id {
+ found = true
+ }
+ }
+ }
+ assert.False(t, found)
}
}
@@ -981,3 +1006,37 @@
_, err = grt("a.git", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
assert.EqualError(t, err, "Unknown commit a.git@aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
}
+
+func TestJobCacheGetMatchingJobsFromDateRange(t *testing.T) {
+ testutils.SmallTest(t)
+
+ d := memory.NewInMemoryJobDB(nil)
+
+ // Pre-load a job into the DB.
+ startTime := time.Now().Add(-30 * time.Minute) // Arbitrary starting point.
+ j1 := types.MakeTestJob(startTime)
+ j2 := types.MakeTestJob(startTime)
+ j2.Name = "job2"
+ assert.NoError(t, d.PutJobs([]*types.Job{j1, j2}))
+
+ // Create the cache. Ensure that the existing job is present.
+ w, err := window.New(time.Hour, 0, nil)
+ assert.NoError(t, err)
+ c, err := NewJobCache(d, w, db.DummyGetRevisionTimestamp(j1.Created.Add(-1*time.Minute)))
+ assert.NoError(t, err)
+
+ test := func(names []string, start, end time.Time, expect ...*types.Job) {
+ expectByName := make(map[string][]*types.Job, len(expect))
+ for _, job := range expect {
+ expectByName[job.Name] = append(expectByName[job.Name], job)
+ }
+ jobs, err := c.GetMatchingJobsFromDateRange(names, start, end)
+ assert.NoError(t, err)
+ deepequal.AssertDeepEqual(t, expectByName, jobs)
+ }
+ test([]string{j1.Name, j2.Name}, time.Time{}, time.Now().Add(24*time.Hour), j1, j2)
+ test([]string{j1.Name, j2.Name}, j1.Created, j1.Created.Add(time.Nanosecond), j1, j2)
+ test([]string{j1.Name, j2.Name}, time.Time{}, j1.Created)
+ test([]string{j1.Name, j2.Name}, j1.Created.Add(time.Nanosecond), time.Now().Add(24*time.Hour))
+ test([]string{j1.Name}, j1.Created, j1.Created.Add(time.Nanosecond), j1)
+}
diff --git a/task_scheduler/go/scheduling/periodic.go b/task_scheduler/go/scheduling/periodic.go
deleted file mode 100644
index 70aa0be..0000000
--- a/task_scheduler/go/scheduling/periodic.go
+++ /dev/null
@@ -1,58 +0,0 @@
-package scheduling
-
-import (
- "context"
-
- "go.skia.org/infra/go/sklog"
- "go.skia.org/infra/task_scheduler/go/specs"
- "go.skia.org/infra/task_scheduler/go/types"
-)
-
-// Trigger all jobs with the given trigger name.
-func triggerPeriodicJobsWithName(ctx context.Context, s *TaskScheduler, trigger string) error {
- // Obtain the TasksCfg at tip of master in each repo.
- cfgs := make(map[types.RepoState]*specs.TasksCfg, len(s.repos))
- for url, repo := range s.repos {
- head := repo.Get("master")
- rs := types.RepoState{
- Repo: url,
- Revision: head.Hash,
- }
- cfg, err := s.taskCfgCache.ReadTasksCfg(ctx, rs)
- if err != nil {
- return err
- }
- cfgs[rs] = cfg
- }
- // Trigger the periodic tasks.
- sklog.Infof("Triggering %s tasks", trigger)
- jobs := []*types.Job{}
- for rs, cfg := range cfgs {
- for name, spec := range cfg.Jobs {
- if spec.Trigger == trigger {
- j, err := s.taskCfgCache.MakeJob(ctx, rs, name)
- if err != nil {
- return err
- }
- jobs = append(jobs, j)
- }
- }
- }
- return s.db.PutJobs(jobs)
-}
-
-// Register the nightly and weekly jobs to run.
-func (s *TaskScheduler) registerPeriodicTriggers() {
- s.periodicTriggers.Register("nightly", func(ctx context.Context) error {
- return triggerPeriodicJobsWithName(ctx, s, "nightly")
- })
- s.periodicTriggers.Register("weekly", func(ctx context.Context) error {
- return triggerPeriodicJobsWithName(ctx, s, "weekly")
- })
-}
-
-// triggerPeriodicJobs triggers jobs at HEAD of the master branch in each repo
-// for any files present in the trigger dir.
-func (s *TaskScheduler) triggerPeriodicJobs(ctx context.Context) error {
- return s.periodicTriggers.RunPeriodicTriggers(ctx)
-}
diff --git a/task_scheduler/go/scheduling/task_scheduler.go b/task_scheduler/go/scheduling/task_scheduler.go
index cb379b4..411e2d1 100644
--- a/task_scheduler/go/scheduling/task_scheduler.go
+++ b/task_scheduler/go/scheduling/task_scheduler.go
@@ -20,7 +20,6 @@
"go.skia.org/infra/go/git/repograph"
"go.skia.org/infra/go/isolate"
"go.skia.org/infra/go/metrics2"
- "go.skia.org/infra/go/periodic_triggers"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/swarming"
"go.skia.org/infra/go/timeout"
@@ -111,7 +110,6 @@
pendingInsert map[string]bool
pendingInsertMtx sync.RWMutex
- periodicTriggers *periodic_triggers.Triggerer
pools []string
pubsubTopic string
queue []*taskCandidate // protected by queueMtx.
@@ -160,12 +158,6 @@
if err != nil {
return nil, fmt.Errorf("Failed to create TryJobIntegrator: %s", err)
}
-
- pt, err := periodic_triggers.NewTriggerer(workdir)
- if err != nil {
- return nil, fmt.Errorf("Failed to create periodic triggers: %s", err)
- }
-
s := &TaskScheduler{
bl: bl,
busyBots: newBusyBots(),
@@ -177,7 +169,6 @@
newTasks: map[types.RepoState]util.StringSet{},
newTasksMtx: sync.RWMutex{},
pendingInsert: map[string]bool{},
- periodicTriggers: pt,
pools: pools,
pubsubTopic: pubsubTopic,
queue: []*taskCandidate{},
@@ -192,7 +183,6 @@
window: w,
workdir: workdir,
}
- s.registerPeriodicTriggers()
return s, nil
}
@@ -317,6 +307,92 @@
return s.taskCfgCache.RecentSpecsAndCommits()
}
+// MaybeTriggerPeriodicJobs triggers all periodic jobs with the given trigger
+// name, if those jobs haven't already been triggered.
+func (s *TaskScheduler) MaybeTriggerPeriodicJobs(ctx context.Context, triggerName string) error {
+ // We'll search the jobs we've already triggered to ensure that we don't
+ // trigger the same jobs multiple times in a day/week/whatever. Search a
+ // window that is not quite the size of the trigger interval, to allow
+ // for lag time.
+ end := time.Now()
+ var start time.Time
+ if triggerName == specs.TRIGGER_NIGHTLY {
+ start = end.Add(-23 * time.Hour)
+ } else if triggerName == specs.TRIGGER_WEEKLY {
+ // Note that if the cache window is less than a week, this start
+ // time isn't going to work as expected. However, we only really
+ // expect to need to debounce periodic triggers for a short
+ // window, so anything longer than a few minutes would probably
+ // be enough, and the ~4 days we normally keep in the cache
+ // should be more than sufficient.
+ start = end.Add(-6 * 24 * time.Hour)
+ } else {
+ sklog.Warningf("Ignoring unknown periodic trigger %q", triggerName)
+ return nil
+ }
+
+ // Find the job specs matching the trigger and create Job instances.
+ jobs := []*types.Job{}
+ for repoUrl, repo := range s.repos {
+ master := repo.Get("master")
+ if master == nil {
+ return fmt.Errorf("Failed to retrieve branch 'master' for %s", repoUrl)
+ }
+ rs := types.RepoState{
+ Repo: repoUrl,
+ Revision: master.Hash,
+ }
+ cfg, err := s.taskCfgCache.ReadTasksCfg(ctx, rs)
+ if err != nil {
+ return fmt.Errorf("Failed to retrieve TaskCfg from %s: %s", repoUrl, err)
+ }
+ for name, js := range cfg.Jobs {
+ if js.Trigger == triggerName {
+ job, err := s.taskCfgCache.MakeJob(ctx, rs, name)
+ if err != nil {
+ return fmt.Errorf("Failed to create job: %s", err)
+ }
+ jobs = append(jobs, job)
+ }
+ }
+ }
+ if len(jobs) == 0 {
+ return nil
+ }
+
+ // Filter out any jobs which we've already triggered. Generally, we'd
+ // expect to have triggered all of the jobs or none of them, but there
+ // might be circumstances which caused us to trigger a partial set.
+ names := make([]string, 0, len(jobs))
+ for _, job := range jobs {
+ names = append(names, job.Name)
+ }
+ existing, err := s.jCache.GetMatchingJobsFromDateRange(names, start, end)
+ if err != nil {
+ return err
+ }
+ jobsToInsert := make([]*types.Job, 0, len(jobs))
+ for _, job := range jobs {
+ existingJobs := existing[job.Name]
+ if len(existingJobs) == 0 {
+ jobsToInsert = append(jobsToInsert, job)
+ } else {
+ prev := existingJobs[0] // Pick an arbitrary pre-existing job for logging.
+ sklog.Warningf("Already triggered %d jobs for %s (eg. id %s at %s); not triggering again.", len(existingJobs), job.Name, prev.Id, prev.Created)
+ }
+ }
+ if len(jobsToInsert) == 0 {
+ return nil
+ }
+
+ // Insert the new jobs into the DB.
+ if err := s.db.PutJobs(jobsToInsert); err != nil {
+ return fmt.Errorf("Failed to add periodic jobs: %s", err)
+ }
+ sklog.Infof("Created %d periodic jobs for trigger %q", len(jobsToInsert), triggerName)
+ return nil
+}
+
// TriggerJob adds the given Job to the database and returns its ID.
func (s *TaskScheduler) TriggerJob(ctx context.Context, repo, commit, jobName string) (string, error) {
j, err := s.taskCfgCache.MakeJob(ctx, types.RepoState{
@@ -1289,11 +1365,6 @@
return err
}
- // Also trigger any available periodic jobs.
- if err := s.triggerPeriodicJobs(ctx); err != nil {
- return err
- }
-
return s.jCache.Update()
}
diff --git a/task_scheduler/go/scheduling/task_scheduler_test.go b/task_scheduler/go/scheduling/task_scheduler_test.go
index 8ac2804..ad14c96 100644
--- a/task_scheduler/go/scheduling/task_scheduler_test.go
+++ b/task_scheduler/go/scheduling/task_scheduler_test.go
@@ -26,7 +26,6 @@
"go.skia.org/infra/go/isolate"
metrics2_testutils "go.skia.org/infra/go/metrics2/testutils"
"go.skia.org/infra/go/mockhttpclient"
- "go.skia.org/infra/go/periodic_triggers"
"go.skia.org/infra/go/swarming"
"go.skia.org/infra/go/testutils"
"go.skia.org/infra/go/util"
@@ -195,7 +194,6 @@
tmp, err := ioutil.TempDir("", "")
assert.NoError(t, err)
- assert.NoError(t, os.Mkdir(path.Join(tmp, periodic_triggers.TRIGGER_DIRNAME), os.ModePerm))
d := memory.NewInMemoryDB(nil)
isolateClient, err := isolate.NewClient(tmp, isolate.ISOLATE_SERVER_URL_FAKE)
assert.NoError(t, err)
@@ -1976,7 +1974,6 @@
gb := git_testutils.GitInit(t, ctx)
workdir, err := ioutil.TempDir("", "")
assert.NoError(t, err)
- assert.NoError(t, os.Mkdir(path.Join(workdir, periodic_triggers.TRIGGER_DIRNAME), os.ModePerm))
assert.NoError(t, ioutil.WriteFile(path.Join(workdir, ".gclient"), []byte("dummy"), os.ModePerm))
infraBotsSubDir := path.Join("infra", "bots")
@@ -2680,17 +2677,25 @@
defer cleanup()
// Rewrite tasks.json with a periodic job.
- name := "Periodic-Task"
+ nightlyName := "Nightly-Job"
+ weeklyName := "Weekly-Job"
+ names := []string{nightlyName, weeklyName}
+ taskName := "Periodic-Task"
cfg := &specs.TasksCfg{
Jobs: map[string]*specs.JobSpec{
- "Periodic-Job": {
+ nightlyName: {
Priority: 1.0,
- TaskSpecs: []string{name},
- Trigger: "nightly",
+ TaskSpecs: []string{taskName},
+ Trigger: specs.TRIGGER_NIGHTLY,
+ },
+ weeklyName: {
+ Priority: 1.0,
+ TaskSpecs: []string{taskName},
+ Trigger: specs.TRIGGER_WEEKLY,
},
},
Tasks: map[string]*specs.TaskSpec{
- name: {
+ taskName: {
CipdPackages: []*specs.CipdPackage{},
Dependencies: []string{},
Dimensions: []string{
@@ -2708,25 +2713,54 @@
}
gb.Add(ctx, specs.TASKS_CFG_FILE, testutils.MarshalJSON(t, &cfg))
gb.Commit(ctx)
-
- // Cycle, ensure that the periodic task is not added.
assert.NoError(t, s.MainLoop(ctx))
- assert.NoError(t, s.jCache.Update())
- unfinished, err := s.jCache.UnfinishedJobs()
- assert.NoError(t, err)
- assert.Equal(t, 5, len(unfinished)) // Existing per-commit jobs.
- // Write the trigger file. Cycle, ensure that the trigger file was
- // removed and the periodic task was added.
- triggerFile := path.Join(s.workdir, periodic_triggers.TRIGGER_DIRNAME, "nightly")
- assert.NoError(t, ioutil.WriteFile(triggerFile, []byte{}, os.ModePerm))
- assert.NoError(t, s.MainLoop(ctx))
- _, err = os.Stat(triggerFile)
- assert.True(t, os.IsNotExist(err))
+ // Trigger the periodic jobs. Make sure that we inserted the new Job.
+ assert.NoError(t, s.MaybeTriggerPeriodicJobs(ctx, specs.TRIGGER_NIGHTLY))
assert.NoError(t, s.jCache.Update())
- unfinished, err = s.jCache.UnfinishedJobs()
+ start := time.Now().Add(-10 * time.Minute)
+ end := time.Now().Add(10 * time.Minute)
+ jobs, err := s.jCache.GetMatchingJobsFromDateRange(names, start, end)
assert.NoError(t, err)
- assert.Equal(t, 6, len(unfinished))
+ assert.Equal(t, 1, len(jobs[nightlyName]))
+ assert.Equal(t, nightlyName, jobs[nightlyName][0].Name)
+ assert.Equal(t, 0, len(jobs[weeklyName]))
+
+ // Ensure that we don't trigger another.
+ assert.NoError(t, s.MaybeTriggerPeriodicJobs(ctx, specs.TRIGGER_NIGHTLY))
+ assert.NoError(t, s.jCache.Update())
+ jobs, err = s.jCache.GetMatchingJobsFromDateRange(names, start, end)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(jobs[nightlyName]))
+ assert.Equal(t, 0, len(jobs[weeklyName]))
+
+ // Hack the old Job's created time to simulate it scrolling out of the
+ // window.
+ oldJob := jobs[nightlyName][0]
+ oldJob.Created = start.Add(-23 * time.Hour)
+ assert.NoError(t, s.db.PutJob(oldJob))
+ assert.NoError(t, s.jCache.Update())
+ jobs, err = s.jCache.GetMatchingJobsFromDateRange(names, start, end)
+ assert.NoError(t, err)
+ assert.Equal(t, 0, len(jobs[nightlyName]))
+ assert.Equal(t, 0, len(jobs[weeklyName]))
+ assert.NoError(t, s.MaybeTriggerPeriodicJobs(ctx, specs.TRIGGER_NIGHTLY))
+ assert.NoError(t, s.jCache.Update())
+ jobs, err = s.jCache.GetMatchingJobsFromDateRange(names, start, end)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(jobs[nightlyName]))
+ assert.Equal(t, nightlyName, jobs[nightlyName][0].Name)
+ assert.Equal(t, 0, len(jobs[weeklyName]))
+
+ // Make sure we don't confuse different triggers.
+ assert.NoError(t, s.MaybeTriggerPeriodicJobs(ctx, specs.TRIGGER_WEEKLY))
+ assert.NoError(t, s.jCache.Update())
+ jobs, err = s.jCache.GetMatchingJobsFromDateRange(names, start, end)
+ assert.NoError(t, err)
+ assert.Equal(t, 1, len(jobs[nightlyName]))
+ assert.Equal(t, nightlyName, jobs[nightlyName][0].Name)
+ assert.Equal(t, 1, len(jobs[weeklyName]))
+ assert.Equal(t, weeklyName, jobs[weeklyName][0].Name)
}
func TestUpdateUnfinishedTasks(t *testing.T) {
diff --git a/task_scheduler/go/specs/specs.go b/task_scheduler/go/specs/specs.go
index dbed268..31eea43 100644
--- a/task_scheduler/go/specs/specs.go
+++ b/task_scheduler/go/specs/specs.go
@@ -19,6 +19,7 @@
"go.skia.org/infra/go/git"
"go.skia.org/infra/go/git/repograph"
"go.skia.org/infra/go/metrics2"
+ "go.skia.org/infra/go/periodic"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/task_scheduler/go/types"
@@ -44,12 +45,12 @@
// others.
TRIGGER_MASTER_ONLY = "master"
// Trigger this job every night.
- TRIGGER_NIGHTLY = "nightly"
+ TRIGGER_NIGHTLY = periodic.TRIGGER_NIGHTLY
// Don't trigger this job automatically. It will only be run when
// explicitly triggered via a try job or a force trigger.
TRIGGER_ON_DEMAND = "on demand"
// Trigger this job weekly.
- TRIGGER_WEEKLY = "weekly"
+ TRIGGER_WEEKLY = periodic.TRIGGER_WEEKLY
VARIABLE_SYNTAX = "<(%s)"
diff --git a/task_scheduler/go/task_scheduler/main.go b/task_scheduler/go/task_scheduler/main.go
index f5617ea..acca858 100644
--- a/task_scheduler/go/task_scheduler/main.go
+++ b/task_scheduler/go/task_scheduler/main.go
@@ -29,6 +29,7 @@
"go.skia.org/infra/go/isolate"
"go.skia.org/infra/go/login"
"go.skia.org/infra/go/metadata"
+ "go.skia.org/infra/go/periodic"
"go.skia.org/infra/go/skiaversion"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/swarming"
@@ -545,7 +546,6 @@
httputils.ReportError(w, r, err, fmt.Sprintf("Failed to write response: %s", err))
return
}
-
}
func runServer(serverURL string, taskDb db.RemoteDB) {
@@ -762,6 +762,17 @@
}
})
+ // Set up periodic triggers.
+ if err := periodic.Listen(ctx, fmt.Sprintf("task-scheduler-%s", *pubsubTopicSet), tokenSource, func(ctx context.Context, name, id string) bool {
+ if err := ts.MaybeTriggerPeriodicJobs(ctx, name); err != nil {
+ sklog.Errorf("Failed to trigger periodic jobs; will retry later: %s", err)
+ return false // We will retry later.
+ }
+ return true
+ }); err != nil {
+ sklog.Fatal(err)
+ }
+
// Start up the web server.
login.SimpleInitMust(*port, *local)
diff --git a/task_scheduler/sys/task-scheduler-trigger-nightly.service b/task_scheduler/sys/task-scheduler-trigger-nightly.service
deleted file mode 100644
index 2e2053d..0000000
--- a/task_scheduler/sys/task-scheduler-trigger-nightly.service
+++ /dev/null
@@ -1,16 +0,0 @@
-[Unit]
-Description=Trigger Task Scheduler Nightly Jobs
-Requires=mnt-pd0.mount
-Wants=network-online.target
-After=mnt-pd0.mount network-online.target
-
-[Service]
-Type=oneshot
-# Touch "workdir/triggerdir/trigger", where:
-# - workdir is --workdir argument in task-scheduler.service
-# - triggerdir is scheduling.TRIGGER_DIRNAME in periodic.go
-# - trigger selects Jobs by the value of JobSpec.Trigger
-ExecStart=/usr/bin/touch /mnt/pd0/task_scheduler_workdir/periodic-trigger/nightly
-User=default
-Group=default
-LimitNOFILE=10000
diff --git a/task_scheduler/sys/task-scheduler-trigger-nightly.timer b/task_scheduler/sys/task-scheduler-trigger-nightly.timer
deleted file mode 100644
index 1042e40..0000000
--- a/task_scheduler/sys/task-scheduler-trigger-nightly.timer
+++ /dev/null
@@ -1,9 +0,0 @@
-[Unit]
-Description=Run task-scheduler-trigger-nightly.service nightly.
-
-[Timer]
-OnCalendar=*-*-* 05:00:00
-Persistent=false
-
-[Install]
-WantedBy=timers.target
diff --git a/task_scheduler/sys/task-scheduler-trigger-weekly.service b/task_scheduler/sys/task-scheduler-trigger-weekly.service
deleted file mode 100644
index 04599b9..0000000
--- a/task_scheduler/sys/task-scheduler-trigger-weekly.service
+++ /dev/null
@@ -1,16 +0,0 @@
-[Unit]
-Description=Trigger Task Scheduler Weekly Jobs
-Requires=mnt-pd0.mount
-Wants=network-online.target
-After=mnt-pd0.mount network-online.target
-
-[Service]
-Type=oneshot
-# Touch "workdir/triggerdir/trigger", where:
-# - workdir is --workdir argument in task-scheduler.service
-# - triggerdir is scheduling.TRIGGER_DIRNAME in periodic.go
-# - trigger selects Jobs by the value of JobSpec.Trigger
-ExecStart=/usr/bin/touch /mnt/pd0/task_scheduler_workdir/periodic-trigger/weekly
-User=default
-Group=default
-LimitNOFILE=10000
diff --git a/task_scheduler/sys/task-scheduler-trigger-weekly.timer b/task_scheduler/sys/task-scheduler-trigger-weekly.timer
deleted file mode 100644
index 623be21..0000000
--- a/task_scheduler/sys/task-scheduler-trigger-weekly.timer
+++ /dev/null
@@ -1,9 +0,0 @@
-[Unit]
-Description=Run task-scheduler-trigger-weekly.service weekly.
-
-[Timer]
-OnCalendar=Sun *-*-* 05:00:00
-Persistent=false
-
-[Install]
-WantedBy=timers.target