[task scheduler] Use kubernetes jobs to trigger periodic tasks via pubsub

The jobs are added in https://skia-review.googlesource.com/c/skia-public-config/+/182120/

Bug: skia:8636
Change-Id: I603263e43eba847dd8ccc8c5955a06a33e6b233c
Reviewed-on: https://skia-review.googlesource.com/c/179849
Commit-Queue: Eric Boren <borenet@google.com>
Reviewed-by: Ben Wagner <benjaminwagner@google.com>
diff --git a/go/periodic/periodic.go b/go/periodic/periodic.go
new file mode 100644
index 0000000..b827e9f
--- /dev/null
+++ b/go/periodic/periodic.go
@@ -0,0 +1,163 @@
+package periodic
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"cloud.google.com/go/pubsub"
+	"go.skia.org/infra/go/sklog"
+	"go.skia.org/infra/go/util"
+	"golang.org/x/oauth2"
+	"google.golang.org/api/option"
+)
+
+const (
+	// Authentication scope required for periodic triggers.
+	AUTH_SCOPE = pubsub.ScopePubSub
+
+	// PubSub topic used for periodic triggers. A single topic is used for
+	// all triggers, with each message containing an attribute indicating
+	// which trigger promted it.
+	PUBSUB_TOPIC = "periodic-trigger"
+
+	// Attribute sent with all pubsub messages; the name of the periodic
+	// trigger which prompted the message.
+	PUBSUB_ATTR_TRIGGER_NAME = "trigger"
+
+	// Attribute sent with all pubsub messages; the unique ID of the call
+	// to Trigger() which prompted the message.
+	PUBSUB_ATTR_TRIGGER_ID = "id"
+
+	// Acknowledgement deadline for pubsub messages; all TriggerCallbackFns
+	// must be faster than this deadline. If this is changed, all
+	// subscriptions will need to be deleted and recreated.
+	PUBSUB_ACK_DEADLINE = 5 * time.Minute
+
+	// Google Cloud project name used for pubsub.
+	PUBSUB_PROJECT = "skia-public"
+
+	// Names of periodic triggers.
+	TRIGGER_NIGHTLY = "nightly"
+	TRIGGER_WEEKLY  = "weekly"
+)
+
+var (
+	VALID_TRIGGERS = []string{TRIGGER_NIGHTLY, TRIGGER_WEEKLY}
+)
+
+// TriggerCallbackFn is a function called when handling requests for periodic
+// triggers. The string parameters are the name of the periodic trigger and the
+// unique ID of the call to Trigger() which generated the message. The return
+// value determines whether or not the pubsub message should be ACK'd. If the
+// TriggerCallbackFn returns false, it may be called again. TriggerCallbackFns
+// must finish within the PUBSUB_ACK_DEADLINE.
+type TriggerCallbackFn func(context.Context, string, string) bool
+
+// validateTrigger returns an error if the given trigger name is not valid.
+func validateTrigger(triggerName string) error {
+	if !util.In(triggerName, VALID_TRIGGERS) {
+		return fmt.Errorf("Invalid trigger name %q", triggerName)
+	}
+	return nil
+}
+
+// validateId returns an error if the valid trigger ID is not valid.
+func validateId(triggerId string) error {
+	if triggerId == "" {
+		return fmt.Errorf("Invalid trigger ID %q", triggerId)
+	}
+	return nil
+}
+
+// setup is a helper function which returns the pubsub client and topic,
+// creating the topic if necessary.
+func setup(ctx context.Context, ts oauth2.TokenSource) (*pubsub.Client, *pubsub.Topic, error) {
+	c, err := pubsub.NewClient(ctx, PUBSUB_PROJECT, option.WithTokenSource(ts))
+	if err != nil {
+		return nil, nil, err
+	}
+	t := c.Topic(PUBSUB_TOPIC)
+	exists, err := t.Exists(ctx)
+	if err != nil {
+		return nil, nil, err
+	}
+	if !exists {
+		if _, err := c.CreateTopic(ctx, PUBSUB_TOPIC); err != nil {
+			return nil, nil, err
+		}
+	}
+	return c, t, nil
+}
+
+// Listen creates a background goroutine which listens for pubsub messages for
+// periodic triggers. The subscriber name is used as part of the pubsub
+// subscription ID; if there are multiple instances of a server which all need
+// to receive every message, they should use different subscriber names.
+func Listen(ctx context.Context, subscriberName string, ts oauth2.TokenSource, cb TriggerCallbackFn) error {
+	c, t, err := setup(ctx, ts)
+	if err != nil {
+		return err
+	}
+	subId := PUBSUB_TOPIC + "+" + subscriberName
+	sub := c.Subscription(subId)
+	exists, err := sub.Exists(ctx)
+	if err != nil {
+		return err
+	}
+	if !exists {
+		if _, err := c.CreateSubscription(ctx, subId, pubsub.SubscriptionConfig{
+			Topic:       t,
+			AckDeadline: PUBSUB_ACK_DEADLINE + 10*time.Second,
+		}); err != nil {
+			return err
+		}
+	}
+
+	// Start the receiving goroutine.
+	go func() {
+		if err := sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
+			triggerName := m.Attributes[PUBSUB_ATTR_TRIGGER_NAME]
+			if err := validateTrigger(triggerName); err != nil {
+				sklog.Errorf("Received invalid pubsub message: %s", err)
+				m.Ack()
+			}
+			triggerId := m.Attributes[PUBSUB_ATTR_TRIGGER_ID]
+			if err := validateId(triggerId); err != nil {
+				sklog.Errorf("Received invalid pubsub message: %s", err)
+				m.Ack()
+			}
+			if cb(ctx, triggerName, triggerId) {
+				m.Ack()
+			} else {
+				m.Nack()
+			}
+		}); err != nil {
+			sklog.Errorf("Pubsub subscription receive failed: %s", err)
+		}
+	}()
+	return nil
+}
+
+// Send a pubsub message for the given periodic trigger. The triggerId may be
+// used for de-duplication on the subscriber side and should therefore be unique
+// for each invocation of Trigger.
+func Trigger(ctx context.Context, triggerName, triggerId string, ts oauth2.TokenSource) error {
+	if err := validateTrigger(triggerName); err != nil {
+		return err
+	}
+	if err := validateId(triggerId); err != nil {
+		return err
+	}
+	_, t, err := setup(ctx, ts)
+	if err != nil {
+		return err
+	}
+	_, err = t.Publish(ctx, &pubsub.Message{
+		Attributes: map[string]string{
+			PUBSUB_ATTR_TRIGGER_NAME: triggerName,
+			PUBSUB_ATTR_TRIGGER_ID:   triggerId,
+		},
+	}).Get(ctx)
+	return err
+}
diff --git a/go/periodic/periodic_test.go b/go/periodic/periodic_test.go
new file mode 100644
index 0000000..2834795
--- /dev/null
+++ b/go/periodic/periodic_test.go
@@ -0,0 +1,52 @@
+package periodic
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/google/uuid"
+	"github.com/stretchr/testify/assert"
+	"go.skia.org/infra/go/testutils"
+)
+
+func TestPeriodic(t *testing.T) {
+	testutils.LargeTest(t)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// Test validation.
+	assert.EqualError(t, Trigger(ctx, "bogus", uuid.New().String(), nil), "Invalid trigger name \"bogus\"")
+	assert.EqualError(t, Trigger(ctx, TRIGGER_NIGHTLY, "", nil), "Invalid trigger ID \"\"")
+
+	subName := fmt.Sprintf("periodic-test-%s", uuid.New())
+	expectCh := make(chan string)
+	rvCh := make(chan bool)
+
+	assert.NoError(t, Listen(ctx, subName, nil, func(_ context.Context, trigger, id string) bool {
+		expectTrigger := <-expectCh
+		expectId := <-expectCh
+		assert.Equal(t, expectTrigger, trigger)
+		assert.Equal(t, expectId, id)
+		return <-rvCh
+	}))
+
+	check := func(trigger, id string, rv bool) {
+		expectCh <- trigger
+		expectCh <- id
+		rvCh <- rv
+	}
+	triggerAndCheck := func(trigger, id string, rv bool) {
+		assert.NoError(t, Trigger(ctx, trigger, id, nil))
+		check(trigger, id, rv)
+	}
+
+	// Normal operation; a single pubsub round trip.
+	triggerAndCheck(TRIGGER_NIGHTLY, uuid.New().String(), true)
+
+	// Initial handling fails, the message will be delivered again.
+	id := uuid.New().String()
+	triggerAndCheck(TRIGGER_NIGHTLY, id, false)
+	check(TRIGGER_NIGHTLY, id, true)
+}
diff --git a/go/periodic_triggers/periodic_triggers.go b/go/periodic_triggers/periodic_triggers.go
deleted file mode 100644
index 8176316..0000000
--- a/go/periodic_triggers/periodic_triggers.go
+++ /dev/null
@@ -1,214 +0,0 @@
-package periodic_triggers
-
-import (
-	"context"
-	"encoding/json"
-	"errors"
-	"fmt"
-	"io"
-	"os"
-	"path"
-	"sync"
-	"time"
-
-	"go.skia.org/infra/go/metrics2"
-	"go.skia.org/infra/go/sklog"
-	"go.skia.org/infra/go/util"
-)
-
-const (
-	// TRIGGER_DIRNAME is the name of the directory containing files
-	// indicating that periodic actions should be triggered. Common practice
-	// is to use systemd services to write the files on a timer.
-	TRIGGER_DIRNAME = "periodic-trigger"
-
-	// PERIODIC_TRIGGER_MEASUREMENT is the name of the liveness metric for
-	// periodic triggers.
-	PERIODIC_TRIGGER_MEASUREMENT = "periodic_trigger"
-
-	// LAST_TRIGGERED_JSON_FILE is the name of a JSON file containing the
-	// last-triggered time for each known trigger.
-	LAST_TRIGGERED_JSON_FILE = "last-triggered.json"
-)
-
-// periodicTriggerMetrics tracks liveness metrics for various periodic triggers.
-type periodicTriggerMetrics struct {
-	jsonFile      string
-	LastTriggered map[string]time.Time `json:"last_triggered"`
-	metrics       map[string]metrics2.Liveness
-}
-
-// newPeriodicTriggerMetrics returns a periodicTriggerMetrics instance,
-// pre-filled with data from a file.
-func newPeriodicTriggerMetrics(workdir string) (*periodicTriggerMetrics, error) {
-	var rv periodicTriggerMetrics
-	jsonFile := path.Join(workdir, LAST_TRIGGERED_JSON_FILE)
-	f, err := os.Open(jsonFile)
-	if err == nil {
-		defer util.Close(f)
-		if err := json.NewDecoder(f).Decode(&rv); err != nil {
-			return nil, err
-		}
-	} else if !os.IsNotExist(err) {
-		return nil, err
-	} else {
-		rv.LastTriggered = map[string]time.Time{}
-	}
-	rv.jsonFile = jsonFile
-	rv.metrics = make(map[string]metrics2.Liveness, len(rv.LastTriggered))
-	for trigger, last := range rv.LastTriggered {
-		lv := metrics2.NewLiveness(PERIODIC_TRIGGER_MEASUREMENT, map[string]string{
-			"trigger": trigger,
-		})
-		lv.ManualReset(last)
-		rv.metrics[trigger] = lv
-	}
-	return &rv, nil
-}
-
-// Reset resets the given trigger metric.
-func (m *periodicTriggerMetrics) Reset(name string) {
-	now := time.Now()
-	lv, ok := m.metrics[name]
-	if !ok {
-		sklog.Errorf("Creating metric %s -- %s", PERIODIC_TRIGGER_MEASUREMENT, name)
-		lv = metrics2.NewLiveness(PERIODIC_TRIGGER_MEASUREMENT, map[string]string{
-			"trigger": name,
-		})
-		m.metrics[name] = lv
-	}
-	lv.ManualReset(now)
-	m.LastTriggered[name] = now
-}
-
-// Write writes the last-triggered times to a JSON file.
-func (m *periodicTriggerMetrics) Write() error {
-	return util.WithWriteFile(m.jsonFile, func(w io.Writer) error {
-		return json.NewEncoder(w).Encode(m)
-	})
-}
-
-// findAndParseTriggerFiles returns the base filenames for each file in
-// triggerDir.
-func findAndParseTriggerFiles(triggerDir string) ([]string, error) {
-	dir, err := os.Open(triggerDir)
-	if err != nil {
-		return nil, fmt.Errorf("Unable to read trigger directory %s: %s", triggerDir, err)
-	}
-	defer util.Close(dir)
-	files, err := dir.Readdirnames(1)
-	if err == io.EOF {
-		return []string{}, nil
-	} else if err != nil {
-		return nil, fmt.Errorf("Unable to list trigger directory %s: %s", triggerDir, err)
-	}
-	return files, nil
-}
-
-// deleteTriggerFile removes the given trigger file indicating that the
-// triggered function(s) succeeded.
-func deleteTriggerFile(triggerDir, basename string) error {
-	filename := path.Join(triggerDir, basename)
-	if err := os.Remove(filename); err != nil {
-		return fmt.Errorf("Unable to remove trigger file %s: %s", filename, err)
-	}
-	return nil
-}
-
-// Triggerer is a struct which triggers certain actions on a timer.
-type Triggerer struct {
-	funcs      map[string][]func(context.Context) error
-	metrics    *periodicTriggerMetrics
-	mtx        sync.RWMutex
-	triggerDir string
-	workdir    string
-}
-
-// Return a Triggerer instance.
-func NewTriggerer(workdir string) (*Triggerer, error) {
-	triggerDir := path.Join(workdir, TRIGGER_DIRNAME)
-	if err := os.MkdirAll(triggerDir, os.ModePerm); err != nil {
-		return nil, err
-	}
-	metrics, err := newPeriodicTriggerMetrics(workdir)
-	if err != nil {
-		return nil, err
-	}
-	return &Triggerer{
-		funcs:      map[string][]func(context.Context) error{},
-		metrics:    metrics,
-		mtx:        sync.RWMutex{},
-		triggerDir: triggerDir,
-		workdir:    workdir,
-	}, nil
-}
-
-// Register the given function to run at the given trigger.
-func (t *Triggerer) Register(trigger string, fn func(context.Context) error) {
-	t.mtx.Lock()
-	defer t.mtx.Unlock()
-	t.funcs[trigger] = append(t.funcs[trigger], fn)
-}
-
-// RunPeriodicTriggers returns the set of triggers which have just fired.
-func (t *Triggerer) RunPeriodicTriggers(ctx context.Context) error {
-	triggers, err := findAndParseTriggerFiles(t.triggerDir)
-	if err != nil {
-		return err
-	}
-
-	t.mtx.RLock()
-	defer t.mtx.RUnlock()
-
-	// TODO(borenet): Parallelize this?
-	allErrs := []error{}
-	for _, trigger := range triggers {
-		errs := []error{}
-		for _, fn := range t.funcs[trigger] {
-			if err := fn(ctx); err != nil {
-				errs = append(errs, err)
-			}
-		}
-		if len(errs) == 0 {
-			if err := deleteTriggerFile(t.triggerDir, trigger); err != nil {
-				allErrs = append(allErrs, err)
-			}
-			t.metrics.Reset(trigger)
-		} else {
-			allErrs = append(allErrs, errs...)
-		}
-	}
-	if err := t.metrics.Write(); err != nil {
-		allErrs = append(allErrs, err)
-	}
-	if len(allErrs) > 0 {
-		rvMsg := "Encountered errors running periodic triggers:"
-		for _, err := range allErrs {
-			rvMsg += fmt.Sprintf("\n%s", err)
-		}
-		return errors.New(rvMsg)
-	}
-	return nil
-}
-
-// Start running periodic triggers in a loop.
-func Start(ctx context.Context, workdir string, triggers map[string][]func(context.Context) error) error {
-	t, err := NewTriggerer(workdir)
-	if err != nil {
-		return err
-	}
-	for trigger, funcs := range triggers {
-		for _, fn := range funcs {
-			t.Register(trigger, fn)
-		}
-	}
-	lv := metrics2.NewLiveness("last_successful_periodic_trigger_loop")
-	go util.RepeatCtx(time.Minute, ctx, func() {
-		if err := t.RunPeriodicTriggers(ctx); err != nil {
-			sklog.Errorf("Failed to run periodic triggers: %s", err)
-		} else {
-			lv.Reset()
-		}
-	})
-	return nil
-}
diff --git a/go/periodic_triggers/periodic_triggers_test.go b/go/periodic_triggers/periodic_triggers_test.go
deleted file mode 100644
index 7d50e97..0000000
--- a/go/periodic_triggers/periodic_triggers_test.go
+++ /dev/null
@@ -1,45 +0,0 @@
-package periodic_triggers
-
-import (
-	"context"
-	"io/ioutil"
-	"os"
-	"path"
-	"testing"
-
-	assert "github.com/stretchr/testify/require"
-	"go.skia.org/infra/go/testutils"
-)
-
-func TestTriggers(t *testing.T) {
-	testutils.SmallTest(t)
-	ctx := context.Background()
-	wd, err := ioutil.TempDir("", "")
-	assert.NoError(t, err)
-	defer testutils.RemoveAll(t, wd)
-
-	p, err := NewTriggerer(wd)
-	assert.NoError(t, err)
-
-	// Add a periodic trigger.
-	ran := false
-	p.Register("test", func(ctx context.Context) error {
-		ran = true
-		return nil
-	})
-
-	// Run periodic triggers. The trigger file does not exist, so we
-	// shouldn't run the function.
-	assert.False(t, ran)
-	assert.NoError(t, p.RunPeriodicTriggers(ctx))
-	assert.False(t, ran)
-
-	// Write the trigger file. Cycle, ensure that the trigger file was
-	// removed and the periodic task was added.
-	triggerFile := path.Join(p.workdir, TRIGGER_DIRNAME, "test")
-	assert.NoError(t, ioutil.WriteFile(triggerFile, []byte{}, os.ModePerm))
-	assert.NoError(t, p.RunPeriodicTriggers(ctx))
-	assert.True(t, ran)
-	_, err = os.Stat(triggerFile)
-	assert.True(t, os.IsNotExist(err))
-}
diff --git a/periodic-trigger/Dockerfile b/periodic-trigger/Dockerfile
new file mode 100644
index 0000000..a722b40
--- /dev/null
+++ b/periodic-trigger/Dockerfile
@@ -0,0 +1,7 @@
+FROM gcr.io/skia-public/basedebian:testing-slim
+
+USER skia
+
+COPY . /
+
+ENTRYPOINT ["/usr/local/bin/periodic-trigger"]
diff --git a/periodic-trigger/Makefile b/periodic-trigger/Makefile
new file mode 100644
index 0000000..62dae53
--- /dev/null
+++ b/periodic-trigger/Makefile
@@ -0,0 +1,11 @@
+include ../go/skiaversion/skiaversion.mk
+include ../kube/kube.mk
+
+.PHONY: build
+build: skiaversion
+	GOOS=linux go install -a ./go/periodic-trigger
+
+.PHONY: push
+push: build pushk
+	./build_release
+	pushk --message="$(MESSAGE)" --cluster=skia-public periodic-trigger
diff --git a/periodic-trigger/build_release b/periodic-trigger/build_release
new file mode 100755
index 0000000..dd4be2a
--- /dev/null
+++ b/periodic-trigger/build_release
@@ -0,0 +1,15 @@
+#!/bin/bash
+APPNAME=periodic-trigger
+
+set -x -e
+
+# Copy files into the right locations in ${ROOT}.
+copy_release_files()
+{
+INSTALL="install -D --verbose --backup=none"
+INSTALL_DIR="install -d --verbose --backup=none"
+${INSTALL} --mode=644 -T ./Dockerfile ${ROOT}/Dockerfile
+${INSTALL} --mode=755 -T ${GOPATH}/bin/${APPNAME}         ${ROOT}/usr/local/bin/${APPNAME}
+}
+
+source ../bash/docker_build.sh
diff --git a/periodic-trigger/go/periodic-trigger/main.go b/periodic-trigger/go/periodic-trigger/main.go
new file mode 100644
index 0000000..daad560
--- /dev/null
+++ b/periodic-trigger/go/periodic-trigger/main.go
@@ -0,0 +1,43 @@
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"time"
+
+	"go.skia.org/infra/go/auth"
+	"go.skia.org/infra/go/common"
+	"go.skia.org/infra/go/periodic"
+	"go.skia.org/infra/go/skiaversion"
+	"go.skia.org/infra/go/sklog"
+)
+
+const (
+	// Template used for creating unique IDs for instances of triggers.
+	TRIGGER_TS = "2006-01-02"
+)
+
+var (
+	local   = flag.Bool("local", false, "Running locally if true. As opposed to in production.")
+	project = flag.String("project", "", "GCE project in which to publish the pub/sub message.")
+	trigger = flag.String("trigger", "", "Name of periodic trigger.")
+)
+
+func main() {
+	common.Init()
+	defer common.Defer()
+	skiaversion.MustLogVersion()
+	ts, err := auth.NewDefaultTokenSource(*local, periodic.AUTH_SCOPE)
+	if err != nil {
+		sklog.Fatal(err)
+	}
+	// TODO(borenet): This ID is not necessarily unique; if the cron job is
+	// significantly delayed, we might end up sending the same message twice
+	// with different dates. It also doesn't allow for periods smaller than
+	// 24 hours.
+	id := fmt.Sprintf("%s-%s", *trigger, time.Now().UTC().Format(TRIGGER_TS))
+	if err := periodic.Trigger(context.Background(), *trigger, id, ts); err != nil {
+		sklog.Fatal(err)
+	}
+}
diff --git a/task_scheduler/Makefile b/task_scheduler/Makefile
index d67a88a..fb8299d 100644
--- a/task_scheduler/Makefile
+++ b/task_scheduler/Makefile
@@ -60,14 +60,6 @@
 	go install -v ../push/go/pushcli
 	pushcli task-scheduler-db-backup skia-task-scheduler
 
-.PHONY: push_trigger
-push_trigger: all
-	./build_trigger_nightly_release "`git log -n1 --format=%s`"
-	./build_trigger_weekly_release "`git log -n1 --format=%s`"
-	go install -v ../push/go/pushcli
-	pushcli task-scheduler-trigger-nightly skia-task-scheduler
-	pushcli task-scheduler-trigger-weekly skia-task-scheduler
-
 .PHONY: vm_create_prod
 vm_create_prod:
 	go run ./vm.go --alsologtostderr --create --ignore-exists --instance=prod
diff --git a/task_scheduler/build_trigger_nightly_release b/task_scheduler/build_trigger_nightly_release
deleted file mode 100755
index d862000..0000000
--- a/task_scheduler/build_trigger_nightly_release
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/bin/bash
-# Builds and uploads a debian package for task-scheduler-trigger-nightly.
-APPNAME=task-scheduler-trigger-nightly
-DESCRIPTION="Service which triggers nightly task-scheduler jobs."
-SYSTEMD="${APPNAME}.service ${APPNAME}.timer"
-SYSTEMD_TIMER=${APPNAME}.timer
-
-set -x -e
-
-# Copy files into the right locations in ${ROOT}.
-copy_release_files()
-{
-INSTALL="fakeroot install -D --verbose --backup=none --group=root --owner=root"
-INSTALL_DIR="fakeroot install -d --verbose --backup=none --group=root --owner=root"
-${INSTALL} --mode=644 -T ./sys/${APPNAME}.service ${ROOT}/etc/systemd/system/${APPNAME}.service
-${INSTALL} --mode=644 -T ./sys/${APPNAME}.timer   ${ROOT}/etc/systemd/system/${APPNAME}.timer
-${INSTALL_DIR} --mode=777                         ${ROOT}/mnt/pd0/task_scheduler_workdir/periodic-trigger
-}
-
-source ../bash/release.sh
diff --git a/task_scheduler/build_trigger_weekly_release b/task_scheduler/build_trigger_weekly_release
deleted file mode 100755
index 6ddbaa1..0000000
--- a/task_scheduler/build_trigger_weekly_release
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/bin/bash
-# Builds and uploads a debian package for task-scheduler-trigger-weekly.
-APPNAME=task-scheduler-trigger-weekly
-DESCRIPTION="Service which triggers weekly task-scheduler jobs."
-SYSTEMD="${APPNAME}.service ${APPNAME}.timer"
-SYSTEMD_TIMER=${APPNAME}.timer
-
-set -x -e
-
-# Copy files into the right locations in ${ROOT}.
-copy_release_files()
-{
-INSTALL="fakeroot install -D --verbose --backup=none --group=root --owner=root"
-INSTALL_DIR="fakeroot install -d --verbose --backup=none --group=root --owner=root"
-${INSTALL} --mode=644 -T ./sys/${APPNAME}.service ${ROOT}/etc/systemd/system/${APPNAME}.service
-${INSTALL} --mode=644 -T ./sys/${APPNAME}.timer   ${ROOT}/etc/systemd/system/${APPNAME}.timer
-${INSTALL_DIR} --mode=777                         ${ROOT}/mnt/pd0/task_scheduler_workdir/periodic-trigger
-}
-
-source ../bash/release.sh
diff --git a/task_scheduler/create-task-scheduler-internal-sa.sh b/task_scheduler/create-task-scheduler-internal-sa.sh
new file mode 100755
index 0000000..7d96c1e
--- /dev/null
+++ b/task_scheduler/create-task-scheduler-internal-sa.sh
@@ -0,0 +1,21 @@
+#/bin/bash
+
+# Creates the service account used by Skia Task Scheduler Internal, and export a
+# key for it into the kubernetes cluster as a secret.
+
+set -e -x
+source ../kube/corp-config.sh
+source ../bash/ramdisk.sh
+
+# New service account we will create.
+SA_NAME="task-scheduler-internal"
+
+cd /tmp/ramdisk
+
+gcloud --project=${PROJECT_ID} iam service-accounts create "${SA_NAME}" --display-name="Service account for Skia Task Scheduler Internal"
+
+gcloud beta iam service-accounts keys create ${SA_NAME}.json --iam-account="${SA_NAME}@${PROJECT_SUBDOMAIN}.iam.gserviceaccount.com"
+
+kubectl create secret generic "${SA_NAME}" --from-file=key.json=${SA_NAME}.json
+
+cd -
diff --git a/task_scheduler/create-task-scheduler-sa.sh b/task_scheduler/create-task-scheduler-sa.sh
new file mode 100755
index 0000000..14b7694
--- /dev/null
+++ b/task_scheduler/create-task-scheduler-sa.sh
@@ -0,0 +1,21 @@
+#/bin/bash
+
+# Creates the service account used by Skia Task Scheduler, and export a key for
+# it into the kubernetes cluster as a secret.
+
+set -e -x
+source ../kube/config.sh
+source ../bash/ramdisk.sh
+
+# New service account we will create.
+SA_NAME="task-scheduler"
+
+cd /tmp/ramdisk
+
+gcloud --project=${PROJECT_ID} iam service-accounts create "${SA_NAME}" --display-name="Service account for Skia Task Scheduler"
+
+gcloud beta iam service-accounts keys create ${SA_NAME}.json --iam-account="${SA_NAME}@${PROJECT_SUBDOMAIN}.iam.gserviceaccount.com"
+
+kubectl create secret generic "${SA_NAME}" --from-file=key.json=${SA_NAME}.json
+
+cd -
diff --git a/task_scheduler/go/db/cache/cache.go b/task_scheduler/go/db/cache/cache.go
index 18d009e..5f79ce2 100644
--- a/task_scheduler/go/db/cache/cache.go
+++ b/task_scheduler/go/db/cache/cache.go
@@ -447,6 +447,10 @@
 	// the given RepoState. Does not search the underlying DB.
 	GetJobsByRepoState(string, types.RepoState) ([]*types.Job, error)
 
+	// GetMatchingJobsFromDateRange retrieves all jobs which were created
+	// in the given date range and match one of the given job names.
+	GetMatchingJobsFromDateRange(names []string, from time.Time, to time.Time) (map[string][]*types.Job, error)
+
 	// ScheduledJobsForCommit indicates whether or not we triggered any jobs
 	// for the given repo/commit.
 	ScheduledJobsForCommit(string, string) (bool, error)
@@ -514,6 +518,23 @@
 }
 
 // See documentation for JobCache interface.
+func (c *jobCache) GetMatchingJobsFromDateRange(names []string, from time.Time, to time.Time) (map[string][]*types.Job, error) {
+	c.mtx.RLock()
+	defer c.mtx.RUnlock()
+	m := make(map[string]bool, len(names))
+	for _, name := range names {
+		m[name] = true
+	}
+	rv := make(map[string][]*types.Job, len(names))
+	for _, job := range c.jobs {
+		if !from.After(job.Created) && job.Created.Before(to) && m[job.Name] {
+			rv[job.Name] = append(rv[job.Name], job)
+		}
+	}
+	return rv, nil
+}
+
+// See documentation for JobCache interface.
 func (c *jobCache) ScheduledJobsForCommit(repo, rev string) (bool, error) {
 	c.mtx.RLock()
 	defer c.mtx.RUnlock()
diff --git a/task_scheduler/go/db/cache/cache_test.go b/task_scheduler/go/db/cache/cache_test.go
index 5eb1972..d5da9ef 100644
--- a/task_scheduler/go/db/cache/cache_test.go
+++ b/task_scheduler/go/db/cache/cache_test.go
@@ -740,6 +740,19 @@
 			}
 		}
 		assert.True(t, found)
+
+		found = false
+		jobsByName, err := c.GetMatchingJobsFromDateRange([]string{job.Name}, job.Created, job.Created.Add(time.Nanosecond))
+		assert.NoError(t, err)
+		for _, jobsForName := range jobsByName {
+			for _, otherJob := range jobsForName {
+				if job.Id == otherJob.Id {
+					deepequal.AssertDeepEqual(t, job, otherJob)
+					found = true
+				}
+			}
+		}
+		assert.True(t, found)
 	}
 }
 
@@ -769,6 +782,18 @@
 				t.Fatalf("Found unexpected job %v in GetJobsByRepoState", job)
 			}
 		}
+
+		found := false
+		jobsByName, err := c.GetMatchingJobsFromDateRange([]string{job.Name}, time.Time{}, time.Now().Add(10*24*time.Hour))
+		assert.NoError(t, err)
+		for _, jobsForName := range jobsByName {
+			for _, otherJob := range jobsForName {
+				if job.Id == otherJob.Id {
+					found = true
+				}
+			}
+		}
+		assert.False(t, found)
 	}
 }
 
@@ -981,3 +1006,37 @@
 	_, err = grt("a.git", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
 	assert.EqualError(t, err, "Unknown commit a.git@aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
 }
+
+func TestJobCacheGetMatchingJobsFromDateRange(t *testing.T) {
+	testutils.SmallTest(t)
+
+	d := memory.NewInMemoryJobDB(nil)
+
+	// Pre-load a job into the DB.
+	startTime := time.Now().Add(-30 * time.Minute) // Arbitrary starting point.
+	j1 := types.MakeTestJob(startTime)
+	j2 := types.MakeTestJob(startTime)
+	j2.Name = "job2"
+	assert.NoError(t, d.PutJobs([]*types.Job{j1, j2}))
+
+	// Create the cache. Ensure that the existing job is present.
+	w, err := window.New(time.Hour, 0, nil)
+	assert.NoError(t, err)
+	c, err := NewJobCache(d, w, db.DummyGetRevisionTimestamp(j1.Created.Add(-1*time.Minute)))
+	assert.NoError(t, err)
+
+	test := func(names []string, start, end time.Time, expect ...*types.Job) {
+		expectByName := make(map[string][]*types.Job, len(expect))
+		for _, job := range expect {
+			expectByName[job.Name] = append(expectByName[job.Name], job)
+		}
+		jobs, err := c.GetMatchingJobsFromDateRange(names, start, end)
+		assert.NoError(t, err)
+		deepequal.AssertDeepEqual(t, expectByName, jobs)
+	}
+	test([]string{j1.Name, j2.Name}, time.Time{}, time.Now().Add(24*time.Hour), j1, j2)
+	test([]string{j1.Name, j2.Name}, j1.Created, j1.Created.Add(time.Nanosecond), j1, j2)
+	test([]string{j1.Name, j2.Name}, time.Time{}, j1.Created)
+	test([]string{j1.Name, j2.Name}, j1.Created.Add(time.Nanosecond), time.Now().Add(24*time.Hour))
+	test([]string{j1.Name}, j1.Created, j1.Created.Add(time.Nanosecond), j1)
+}
diff --git a/task_scheduler/go/scheduling/periodic.go b/task_scheduler/go/scheduling/periodic.go
deleted file mode 100644
index 70aa0be..0000000
--- a/task_scheduler/go/scheduling/periodic.go
+++ /dev/null
@@ -1,58 +0,0 @@
-package scheduling
-
-import (
-	"context"
-
-	"go.skia.org/infra/go/sklog"
-	"go.skia.org/infra/task_scheduler/go/specs"
-	"go.skia.org/infra/task_scheduler/go/types"
-)
-
-// Trigger all jobs with the given trigger name.
-func triggerPeriodicJobsWithName(ctx context.Context, s *TaskScheduler, trigger string) error {
-	// Obtain the TasksCfg at tip of master in each repo.
-	cfgs := make(map[types.RepoState]*specs.TasksCfg, len(s.repos))
-	for url, repo := range s.repos {
-		head := repo.Get("master")
-		rs := types.RepoState{
-			Repo:     url,
-			Revision: head.Hash,
-		}
-		cfg, err := s.taskCfgCache.ReadTasksCfg(ctx, rs)
-		if err != nil {
-			return err
-		}
-		cfgs[rs] = cfg
-	}
-	// Trigger the periodic tasks.
-	sklog.Infof("Triggering %s tasks", trigger)
-	jobs := []*types.Job{}
-	for rs, cfg := range cfgs {
-		for name, spec := range cfg.Jobs {
-			if spec.Trigger == trigger {
-				j, err := s.taskCfgCache.MakeJob(ctx, rs, name)
-				if err != nil {
-					return err
-				}
-				jobs = append(jobs, j)
-			}
-		}
-	}
-	return s.db.PutJobs(jobs)
-}
-
-// Register the nightly and weekly jobs to run.
-func (s *TaskScheduler) registerPeriodicTriggers() {
-	s.periodicTriggers.Register("nightly", func(ctx context.Context) error {
-		return triggerPeriodicJobsWithName(ctx, s, "nightly")
-	})
-	s.periodicTriggers.Register("weekly", func(ctx context.Context) error {
-		return triggerPeriodicJobsWithName(ctx, s, "weekly")
-	})
-}
-
-// triggerPeriodicJobs triggers jobs at HEAD of the master branch in each repo
-// for any files present in the trigger dir.
-func (s *TaskScheduler) triggerPeriodicJobs(ctx context.Context) error {
-	return s.periodicTriggers.RunPeriodicTriggers(ctx)
-}
diff --git a/task_scheduler/go/scheduling/task_scheduler.go b/task_scheduler/go/scheduling/task_scheduler.go
index cb379b4..411e2d1 100644
--- a/task_scheduler/go/scheduling/task_scheduler.go
+++ b/task_scheduler/go/scheduling/task_scheduler.go
@@ -20,7 +20,6 @@
 	"go.skia.org/infra/go/git/repograph"
 	"go.skia.org/infra/go/isolate"
 	"go.skia.org/infra/go/metrics2"
-	"go.skia.org/infra/go/periodic_triggers"
 	"go.skia.org/infra/go/sklog"
 	"go.skia.org/infra/go/swarming"
 	"go.skia.org/infra/go/timeout"
@@ -111,7 +110,6 @@
 	pendingInsert    map[string]bool
 	pendingInsertMtx sync.RWMutex
 
-	periodicTriggers *periodic_triggers.Triggerer
 	pools            []string
 	pubsubTopic      string
 	queue            []*taskCandidate // protected by queueMtx.
@@ -160,12 +158,6 @@
 	if err != nil {
 		return nil, fmt.Errorf("Failed to create TryJobIntegrator: %s", err)
 	}
-
-	pt, err := periodic_triggers.NewTriggerer(workdir)
-	if err != nil {
-		return nil, fmt.Errorf("Failed to create periodic triggers: %s", err)
-	}
-
 	s := &TaskScheduler{
 		bl:               bl,
 		busyBots:         newBusyBots(),
@@ -177,7 +169,6 @@
 		newTasks:         map[types.RepoState]util.StringSet{},
 		newTasksMtx:      sync.RWMutex{},
 		pendingInsert:    map[string]bool{},
-		periodicTriggers: pt,
 		pools:            pools,
 		pubsubTopic:      pubsubTopic,
 		queue:            []*taskCandidate{},
@@ -192,7 +183,6 @@
 		window:           w,
 		workdir:          workdir,
 	}
-	s.registerPeriodicTriggers()
 	return s, nil
 }
 
@@ -317,6 +307,92 @@
 	return s.taskCfgCache.RecentSpecsAndCommits()
 }
 
+// MaybeTriggerPeriodicJobs triggers all periodic jobs with the given trigger
+// name, if those jobs haven't already been triggered.
+func (s *TaskScheduler) MaybeTriggerPeriodicJobs(ctx context.Context, triggerName string) error {
+	// We'll search the jobs we've already triggered to ensure that we don't
+	// trigger the same jobs multiple times in a day/week/whatever. Search a
+	// window that is not quite the size of the trigger interval, to allow
+	// for lag time.
+	end := time.Now()
+	var start time.Time
+	if triggerName == specs.TRIGGER_NIGHTLY {
+		start = end.Add(-23 * time.Hour)
+	} else if triggerName == specs.TRIGGER_WEEKLY {
+		// Note that if the cache window is less than a week, this start
+		// time isn't going to work as expected. However, we only really
+		// expect to need to debounce periodic triggers for a short
+		// window, so anything longer than a few minutes would probably
+		// be enough, and the ~4 days we normally keep in the cache
+		// should be more than sufficient.
+		start = end.Add(-6 * 24 * time.Hour)
+	} else {
+		sklog.Warningf("Ignoring unknown periodic trigger %q", triggerName)
+		return nil
+	}
+
+	// Find the job specs matching the trigger and create Job instances.
+	jobs := []*types.Job{}
+	for repoUrl, repo := range s.repos {
+		master := repo.Get("master")
+		if master == nil {
+			return fmt.Errorf("Failed to retrieve branch 'master' for %s", repoUrl)
+		}
+		rs := types.RepoState{
+			Repo:     repoUrl,
+			Revision: master.Hash,
+		}
+		cfg, err := s.taskCfgCache.ReadTasksCfg(ctx, rs)
+		if err != nil {
+			return fmt.Errorf("Failed to retrieve TaskCfg from %s: %s", repoUrl, err)
+		}
+		for name, js := range cfg.Jobs {
+			if js.Trigger == triggerName {
+				job, err := s.taskCfgCache.MakeJob(ctx, rs, name)
+				if err != nil {
+					return fmt.Errorf("Failed to create job: %s", err)
+				}
+				jobs = append(jobs, job)
+			}
+		}
+	}
+	if len(jobs) == 0 {
+		return nil
+	}
+
+	// Filter out any jobs which we've already triggered. Generally, we'd
+	// expect to have triggered all of the jobs or none of them, but there
+	// might be circumstances which caused us to trigger a partial set.
+	names := make([]string, 0, len(jobs))
+	for _, job := range jobs {
+		names = append(names, job.Name)
+	}
+	existing, err := s.jCache.GetMatchingJobsFromDateRange(names, start, end)
+	if err != nil {
+		return err
+	}
+	jobsToInsert := make([]*types.Job, 0, len(jobs))
+	for _, job := range jobs {
+		existingJobs := existing[job.Name]
+		if len(existingJobs) == 0 {
+			jobsToInsert = append(jobsToInsert, job)
+		} else {
+			prev := existingJobs[0] // Pick an arbitrary pre-existing job for logging.
+			sklog.Warningf("Already triggered %d jobs for %s (eg. id %s at %s); not triggering again.", len(existingJobs), job.Name, prev.Id, prev.Created)
+		}
+	}
+	if len(jobsToInsert) == 0 {
+		return nil
+	}
+
+	// Insert the new jobs into the DB.
+	if err := s.db.PutJobs(jobsToInsert); err != nil {
+		return fmt.Errorf("Failed to add periodic jobs: %s", err)
+	}
+	sklog.Infof("Created %d periodic jobs for trigger %q", len(jobs), triggerName)
+	return nil
+}
+
 // TriggerJob adds the given Job to the database and returns its ID.
 func (s *TaskScheduler) TriggerJob(ctx context.Context, repo, commit, jobName string) (string, error) {
 	j, err := s.taskCfgCache.MakeJob(ctx, types.RepoState{
@@ -1289,11 +1365,6 @@
 		return err
 	}
 
-	// Also trigger any available periodic jobs.
-	if err := s.triggerPeriodicJobs(ctx); err != nil {
-		return err
-	}
-
 	return s.jCache.Update()
 }
 
diff --git a/task_scheduler/go/scheduling/task_scheduler_test.go b/task_scheduler/go/scheduling/task_scheduler_test.go
index 8ac2804..ad14c96 100644
--- a/task_scheduler/go/scheduling/task_scheduler_test.go
+++ b/task_scheduler/go/scheduling/task_scheduler_test.go
@@ -26,7 +26,6 @@
 	"go.skia.org/infra/go/isolate"
 	metrics2_testutils "go.skia.org/infra/go/metrics2/testutils"
 	"go.skia.org/infra/go/mockhttpclient"
-	"go.skia.org/infra/go/periodic_triggers"
 	"go.skia.org/infra/go/swarming"
 	"go.skia.org/infra/go/testutils"
 	"go.skia.org/infra/go/util"
@@ -195,7 +194,6 @@
 	tmp, err := ioutil.TempDir("", "")
 	assert.NoError(t, err)
 
-	assert.NoError(t, os.Mkdir(path.Join(tmp, periodic_triggers.TRIGGER_DIRNAME), os.ModePerm))
 	d := memory.NewInMemoryDB(nil)
 	isolateClient, err := isolate.NewClient(tmp, isolate.ISOLATE_SERVER_URL_FAKE)
 	assert.NoError(t, err)
@@ -1976,7 +1974,6 @@
 	gb := git_testutils.GitInit(t, ctx)
 	workdir, err := ioutil.TempDir("", "")
 	assert.NoError(t, err)
-	assert.NoError(t, os.Mkdir(path.Join(workdir, periodic_triggers.TRIGGER_DIRNAME), os.ModePerm))
 
 	assert.NoError(t, ioutil.WriteFile(path.Join(workdir, ".gclient"), []byte("dummy"), os.ModePerm))
 	infraBotsSubDir := path.Join("infra", "bots")
@@ -2680,17 +2677,25 @@
 	defer cleanup()
 
 	// Rewrite tasks.json with a periodic job.
-	name := "Periodic-Task"
+	nightlyName := "Nightly-Job"
+	weeklyName := "Weekly-Job"
+	names := []string{nightlyName, weeklyName}
+	taskName := "Periodic-Task"
 	cfg := &specs.TasksCfg{
 		Jobs: map[string]*specs.JobSpec{
-			"Periodic-Job": {
+			nightlyName: {
 				Priority:  1.0,
-				TaskSpecs: []string{name},
-				Trigger:   "nightly",
+				TaskSpecs: []string{taskName},
+				Trigger:   specs.TRIGGER_NIGHTLY,
+			},
+			weeklyName: {
+				Priority:  1.0,
+				TaskSpecs: []string{taskName},
+				Trigger:   specs.TRIGGER_WEEKLY,
 			},
 		},
 		Tasks: map[string]*specs.TaskSpec{
-			name: {
+			taskName: {
 				CipdPackages: []*specs.CipdPackage{},
 				Dependencies: []string{},
 				Dimensions: []string{
@@ -2708,25 +2713,54 @@
 	}
 	gb.Add(ctx, specs.TASKS_CFG_FILE, testutils.MarshalJSON(t, &cfg))
 	gb.Commit(ctx)
-
-	// Cycle, ensure that the periodic task is not added.
 	assert.NoError(t, s.MainLoop(ctx))
-	assert.NoError(t, s.jCache.Update())
-	unfinished, err := s.jCache.UnfinishedJobs()
-	assert.NoError(t, err)
-	assert.Equal(t, 5, len(unfinished)) // Existing per-commit jobs.
 
-	// Write the trigger file. Cycle, ensure that the trigger file was
-	// removed and the periodic task was added.
-	triggerFile := path.Join(s.workdir, periodic_triggers.TRIGGER_DIRNAME, "nightly")
-	assert.NoError(t, ioutil.WriteFile(triggerFile, []byte{}, os.ModePerm))
-	assert.NoError(t, s.MainLoop(ctx))
-	_, err = os.Stat(triggerFile)
-	assert.True(t, os.IsNotExist(err))
+	// Trigger the periodic jobs. Make sure that we inserted the new Job.
+	assert.NoError(t, s.MaybeTriggerPeriodicJobs(ctx, specs.TRIGGER_NIGHTLY))
 	assert.NoError(t, s.jCache.Update())
-	unfinished, err = s.jCache.UnfinishedJobs()
+	start := time.Now().Add(-10 * time.Minute)
+	end := time.Now().Add(10 * time.Minute)
+	jobs, err := s.jCache.GetMatchingJobsFromDateRange(names, start, end)
 	assert.NoError(t, err)
-	assert.Equal(t, 6, len(unfinished))
+	assert.Equal(t, 1, len(jobs[nightlyName]))
+	assert.Equal(t, nightlyName, jobs[nightlyName][0].Name)
+	assert.Equal(t, 0, len(jobs[weeklyName]))
+
+	// Ensure that we don't trigger another.
+	assert.NoError(t, s.MaybeTriggerPeriodicJobs(ctx, specs.TRIGGER_NIGHTLY))
+	assert.NoError(t, s.jCache.Update())
+	jobs, err = s.jCache.GetMatchingJobsFromDateRange(names, start, end)
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(jobs[nightlyName]))
+	assert.Equal(t, 0, len(jobs[weeklyName]))
+
+	// Hack the old Job's created time to simulate it scrolling out of the
+	// window.
+	oldJob := jobs[nightlyName][0]
+	oldJob.Created = start.Add(-23 * time.Hour)
+	assert.NoError(t, s.db.PutJob(oldJob))
+	assert.NoError(t, s.jCache.Update())
+	jobs, err = s.jCache.GetMatchingJobsFromDateRange(names, start, end)
+	assert.NoError(t, err)
+	assert.Equal(t, 0, len(jobs[nightlyName]))
+	assert.Equal(t, 0, len(jobs[weeklyName]))
+	assert.NoError(t, s.MaybeTriggerPeriodicJobs(ctx, specs.TRIGGER_NIGHTLY))
+	assert.NoError(t, s.jCache.Update())
+	jobs, err = s.jCache.GetMatchingJobsFromDateRange(names, start, end)
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(jobs[nightlyName]))
+	assert.Equal(t, nightlyName, jobs[nightlyName][0].Name)
+	assert.Equal(t, 0, len(jobs[weeklyName]))
+
+	// Make sure we don't confuse different triggers.
+	assert.NoError(t, s.MaybeTriggerPeriodicJobs(ctx, specs.TRIGGER_WEEKLY))
+	assert.NoError(t, s.jCache.Update())
+	jobs, err = s.jCache.GetMatchingJobsFromDateRange(names, start, end)
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(jobs[nightlyName]))
+	assert.Equal(t, nightlyName, jobs[nightlyName][0].Name)
+	assert.Equal(t, 1, len(jobs[weeklyName]))
+	assert.Equal(t, weeklyName, jobs[weeklyName][0].Name)
 }
 
 func TestUpdateUnfinishedTasks(t *testing.T) {
diff --git a/task_scheduler/go/specs/specs.go b/task_scheduler/go/specs/specs.go
index dbed268..31eea43 100644
--- a/task_scheduler/go/specs/specs.go
+++ b/task_scheduler/go/specs/specs.go
@@ -19,6 +19,7 @@
 	"go.skia.org/infra/go/git"
 	"go.skia.org/infra/go/git/repograph"
 	"go.skia.org/infra/go/metrics2"
+	"go.skia.org/infra/go/periodic"
 	"go.skia.org/infra/go/sklog"
 	"go.skia.org/infra/go/util"
 	"go.skia.org/infra/task_scheduler/go/types"
@@ -44,12 +45,12 @@
 	// others.
 	TRIGGER_MASTER_ONLY = "master"
 	// Trigger this job every night.
-	TRIGGER_NIGHTLY = "nightly"
+	TRIGGER_NIGHTLY = periodic.TRIGGER_NIGHTLY
 	// Don't trigger this job automatically. It will only be run when
 	// explicitly triggered via a try job or a force trigger.
 	TRIGGER_ON_DEMAND = "on demand"
 	// Trigger this job weekly.
-	TRIGGER_WEEKLY = "weekly"
+	TRIGGER_WEEKLY = periodic.TRIGGER_WEEKLY
 
 	VARIABLE_SYNTAX = "<(%s)"
 
diff --git a/task_scheduler/go/task_scheduler/main.go b/task_scheduler/go/task_scheduler/main.go
index f5617ea..acca858 100644
--- a/task_scheduler/go/task_scheduler/main.go
+++ b/task_scheduler/go/task_scheduler/main.go
@@ -29,6 +29,7 @@
 	"go.skia.org/infra/go/isolate"
 	"go.skia.org/infra/go/login"
 	"go.skia.org/infra/go/metadata"
+	"go.skia.org/infra/go/periodic"
 	"go.skia.org/infra/go/skiaversion"
 	"go.skia.org/infra/go/sklog"
 	"go.skia.org/infra/go/swarming"
@@ -545,7 +546,6 @@
 		httputils.ReportError(w, r, err, fmt.Sprintf("Failed to write response: %s", err))
 		return
 	}
-
 }
 
 func runServer(serverURL string, taskDb db.RemoteDB) {
@@ -762,6 +762,17 @@
 		}
 	})
 
+	// Set up periodic triggers.
+	if err := periodic.Listen(ctx, fmt.Sprintf("task-scheduler-%s", *pubsubTopicSet), tokenSource, func(ctx context.Context, name, id string) bool {
+		if err := ts.MaybeTriggerPeriodicJobs(ctx, name); err != nil {
+			sklog.Errorf("Failed to trigger periodic jobs; will retry later: %s", err)
+			return false // We will retry later.
+		}
+		return true
+	}); err != nil {
+		sklog.Fatal(err)
+	}
+
 	// Start up the web server.
 	login.SimpleInitMust(*port, *local)
 
diff --git a/task_scheduler/sys/task-scheduler-trigger-nightly.service b/task_scheduler/sys/task-scheduler-trigger-nightly.service
deleted file mode 100644
index 2e2053d..0000000
--- a/task_scheduler/sys/task-scheduler-trigger-nightly.service
+++ /dev/null
@@ -1,16 +0,0 @@
-[Unit]
-Description=Trigger Task Scheduler Nightly Jobs
-Requires=mnt-pd0.mount
-Wants=network-online.target
-After=mnt-pd0.mount network-online.target
-
-[Service]
-Type=oneshot
-# Touch "workdir/triggerdir/trigger", where:
-#  - workdir is --workdir argument in task-scheduler.service
-#  - triggerdir is scheduling.TRIGGER_DIRNAME in periodic.go
-#  - trigger selects Jobs by the value of JobSpec.Trigger
-ExecStart=/usr/bin/touch /mnt/pd0/task_scheduler_workdir/periodic-trigger/nightly
-User=default
-Group=default
-LimitNOFILE=10000
diff --git a/task_scheduler/sys/task-scheduler-trigger-nightly.timer b/task_scheduler/sys/task-scheduler-trigger-nightly.timer
deleted file mode 100644
index 1042e40..0000000
--- a/task_scheduler/sys/task-scheduler-trigger-nightly.timer
+++ /dev/null
@@ -1,9 +0,0 @@
-[Unit]
-Description=Run task-scheduler-trigger-nightly.service nightly.
-
-[Timer]
-OnCalendar=*-*-* 05:00:00
-Persistent=false
-
-[Install]
-WantedBy=timers.target
diff --git a/task_scheduler/sys/task-scheduler-trigger-weekly.service b/task_scheduler/sys/task-scheduler-trigger-weekly.service
deleted file mode 100644
index 04599b9..0000000
--- a/task_scheduler/sys/task-scheduler-trigger-weekly.service
+++ /dev/null
@@ -1,16 +0,0 @@
-[Unit]
-Description=Trigger Task Scheduler Weekly Jobs
-Requires=mnt-pd0.mount
-Wants=network-online.target
-After=mnt-pd0.mount network-online.target
-
-[Service]
-Type=oneshot
-# Touch "workdir/triggerdir/trigger", where:
-#  - workdir is --workdir argument in task-scheduler.service
-#  - triggerdir is scheduling.TRIGGER_DIRNAME in periodic.go
-#  - trigger selects Jobs by the value of JobSpec.Trigger
-ExecStart=/usr/bin/touch /mnt/pd0/task_scheduler_workdir/periodic-trigger/weekly
-User=default
-Group=default
-LimitNOFILE=10000
diff --git a/task_scheduler/sys/task-scheduler-trigger-weekly.timer b/task_scheduler/sys/task-scheduler-trigger-weekly.timer
deleted file mode 100644
index 623be21..0000000
--- a/task_scheduler/sys/task-scheduler-trigger-weekly.timer
+++ /dev/null
@@ -1,9 +0,0 @@
-[Unit]
-Description=Run task-scheduler-trigger-weekly.service weekly.
-
-[Timer]
-OnCalendar=Sun *-*-* 05:00:00
-Persistent=false
-
-[Install]
-WantedBy=timers.target