blob: b827e9ffb126b327eafbac0ee73134baf554f4f1 [file] [log] [blame]
package periodic
import (
"context"
"fmt"
"time"
"cloud.google.com/go/pubsub"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"golang.org/x/oauth2"
"google.golang.org/api/option"
)
const (
// Authentication scope required for periodic triggers.
AUTH_SCOPE = pubsub.ScopePubSub
// PubSub topic used for periodic triggers. A single topic is used for
// all triggers, with each message containing an attribute indicating
// which trigger promted it.
PUBSUB_TOPIC = "periodic-trigger"
// Attribute sent with all pubsub messages; the name of the periodic
// trigger which prompted the message.
PUBSUB_ATTR_TRIGGER_NAME = "trigger"
// Attribute sent with all pubsub messages; the unique ID of the call
// to Trigger() which prompted the message.
PUBSUB_ATTR_TRIGGER_ID = "id"
// Acknowledgement deadline for pubsub messages; all TriggerCallbackFns
// must be faster than this deadline. If this is changed, all
// subscriptions will need to be deleted and recreated.
PUBSUB_ACK_DEADLINE = 5 * time.Minute
// Google Cloud project name used for pubsub.
PUBSUB_PROJECT = "skia-public"
// Names of periodic triggers.
TRIGGER_NIGHTLY = "nightly"
TRIGGER_WEEKLY = "weekly"
)
var (
VALID_TRIGGERS = []string{TRIGGER_NIGHTLY, TRIGGER_WEEKLY}
)
// TriggerCallbackFn is a function called when handling requests for periodic
// triggers. The string parameters are the name of the periodic trigger and the
// unique ID of the call to Trigger() which generated the message. The return
// value determines whether or not the pubsub message should be ACK'd. If the
// TriggerCallbackFn returns false, it may be called again. TriggerCallbackFns
// must finish within the PUBSUB_ACK_DEADLINE.
type TriggerCallbackFn func(context.Context, string, string) bool
// validateTrigger returns an error if the given trigger name is not valid.
func validateTrigger(triggerName string) error {
if !util.In(triggerName, VALID_TRIGGERS) {
return fmt.Errorf("Invalid trigger name %q", triggerName)
}
return nil
}
// validateId returns an error if the valid trigger ID is not valid.
func validateId(triggerId string) error {
if triggerId == "" {
return fmt.Errorf("Invalid trigger ID %q", triggerId)
}
return nil
}
// setup is a helper function which returns the pubsub client and topic,
// creating the topic if necessary.
func setup(ctx context.Context, ts oauth2.TokenSource) (*pubsub.Client, *pubsub.Topic, error) {
c, err := pubsub.NewClient(ctx, PUBSUB_PROJECT, option.WithTokenSource(ts))
if err != nil {
return nil, nil, err
}
t := c.Topic(PUBSUB_TOPIC)
exists, err := t.Exists(ctx)
if err != nil {
return nil, nil, err
}
if !exists {
if _, err := c.CreateTopic(ctx, PUBSUB_TOPIC); err != nil {
return nil, nil, err
}
}
return c, t, nil
}
// Listen creates a background goroutine which listens for pubsub messages for
// periodic triggers. The subscriber name is used as part of the pubsub
// subscription ID; if there are multiple instances of a server which all need
// to receive every message, they should use different subscriber names.
func Listen(ctx context.Context, subscriberName string, ts oauth2.TokenSource, cb TriggerCallbackFn) error {
c, t, err := setup(ctx, ts)
if err != nil {
return err
}
subId := PUBSUB_TOPIC + "+" + subscriberName
sub := c.Subscription(subId)
exists, err := sub.Exists(ctx)
if err != nil {
return err
}
if !exists {
if _, err := c.CreateSubscription(ctx, subId, pubsub.SubscriptionConfig{
Topic: t,
AckDeadline: PUBSUB_ACK_DEADLINE + 10*time.Second,
}); err != nil {
return err
}
}
// Start the receiving goroutine.
go func() {
if err := sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
triggerName := m.Attributes[PUBSUB_ATTR_TRIGGER_NAME]
if err := validateTrigger(triggerName); err != nil {
sklog.Errorf("Received invalid pubsub message: %s", err)
m.Ack()
}
triggerId := m.Attributes[PUBSUB_ATTR_TRIGGER_ID]
if err := validateId(triggerId); err != nil {
sklog.Errorf("Received invalid pubsub message: %s", err)
m.Ack()
}
if cb(ctx, triggerName, triggerId) {
m.Ack()
} else {
m.Nack()
}
}); err != nil {
sklog.Errorf("Pubsub subscription receive failed: %s", err)
}
}()
return nil
}
// Send a pubsub message for the given periodic trigger. The triggerId may be
// used for de-duplication on the subscriber side and should therefore be unique
// for each invocation of Trigger.
func Trigger(ctx context.Context, triggerName, triggerId string, ts oauth2.TokenSource) error {
if err := validateTrigger(triggerName); err != nil {
return err
}
if err := validateId(triggerId); err != nil {
return err
}
_, t, err := setup(ctx, ts)
if err != nil {
return err
}
_, err = t.Publish(ctx, &pubsub.Message{
Attributes: map[string]string{
PUBSUB_ATTR_TRIGGER_NAME: triggerName,
PUBSUB_ATTR_TRIGGER_ID: triggerId,
},
}).Get(ctx)
return err
}