blob: b22d3a0c9c3161260fb32c378497442c83b9f309 [file] [log] [blame]
package pubsub
import (
"context"
"sync"
"time"
"cloud.google.com/go/pubsub"
"go.skia.org/infra/go/git/repograph"
"go.skia.org/infra/go/gitstore"
"go.skia.org/infra/go/gitstore/bt_gitstore"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"golang.org/x/oauth2"
)
// AutoUpdateMap is a wrapper around repograph.Map which provides a convenience
// method for auto-updating the Graphs in the Map.
type AutoUpdateMap struct {
btConf *bt_gitstore.BTConfig
Map repograph.Map
repoIDs map[string]int64
}
// NewBTGitStoreMap is a wrapper around bt_gitstore.NewBTGitStoreMap which
// provides a convenience method for auto-updating the Graphs in the Map.
func NewAutoUpdateMap(ctx context.Context, repoUrls []string, btConf *bt_gitstore.BTConfig) (*AutoUpdateMap, error) {
rv := &AutoUpdateMap{
btConf: btConf,
Map: make(map[string]*repograph.Graph, len(repoUrls)),
repoIDs: make(map[string]int64, len(repoUrls)),
}
for _, repoUrl := range repoUrls {
gs, err := bt_gitstore.New(ctx, btConf, repoUrl)
if err != nil {
return nil, skerr.Wrapf(err, "Failed to create GitStore for %s", repoUrl)
}
graph, err := gitstore.GetRepoGraph(ctx, gs)
if err != nil {
return nil, skerr.Wrapf(err, "Failed to create Graph from GitStore for %s", repoUrl)
}
rv.Map[repoUrl] = graph
rv.repoIDs[repoUrl] = gs.RepoID
}
return rv, nil
}
// Start initializes auto-updating of the AutoUpdateMap.
func (m *AutoUpdateMap) Start(ctx context.Context, subscriberID string, ts oauth2.TokenSource, fallbackInterval time.Duration, callback AutoUpdateMapCallback) error {
for repoUrl, graph := range m.Map {
// https://golang.org/doc/faq#closures_and_goroutines
repoUrl := repoUrl
graph := graph
err := updateUsingPubSub(ctx, m.btConf, subscriberID, m.repoIDs[repoUrl], graph, ts, fallbackInterval, func(ctx context.Context, g *repograph.Graph, ack, nack func()) error {
return callback(ctx, repoUrl, g, ack, nack)
})
if err != nil {
return skerr.Wrap(err)
}
}
return nil
}
// AutoUpdateCallback is a callback function used in UpdateUsingPubSub which is
// called after the Graph is updated but before the changes are committed. If
// the callback returns an error, the changes are not committed. In addition to
// the Graph itself, the callback accepts two functions as parameters, ack and
// nack, which in turn call the Ack() or Nack() functions on the pubsub
// message(s) which triggered the update. The callback function should call one
// of them to ensure that the message(s) get redelivered or not, as desired.
type AutoUpdateCallback func(ctx context.Context, g *repograph.Graph, ack, nack func()) error
// AutoUpdateMapCallback is like AutoUpdateCallback, except that it's handed to
// NewAutoUpdateMap.Start() and also includes the repo URL.
type AutoUpdateMapCallback func(ctx context.Context, repoUrl string, g *repograph.Graph, ack, nack func()) error
// updateUsingPubSub updates the passed-in Graph whenever a pubsub message is
// received for the given repo and at the given fallback interval. It calls the
// given callback function after the Graph updates but before the changes are
// committed.
func updateUsingPubSub(ctx context.Context, btConf *bt_gitstore.BTConfig, subscriberID string, repoID int64, graph *repograph.Graph, ts oauth2.TokenSource, fallbackInterval time.Duration, callback AutoUpdateCallback) error {
var ticker *time.Ticker
var tickCh <-chan time.Time
if fallbackInterval > 0 {
ticker = time.NewTicker(fallbackInterval)
go func() {
<-ctx.Done()
ticker.Stop()
}()
tickCh = ticker.C
}
_, err := updateUsingPubSubHelper(ctx, btConf, subscriberID, repoID, graph, ts, tickCh, callback)
if err != nil && ticker != nil {
ticker.Stop()
}
return err
}
// ackFn returns a function which Acks the message.
func ackFn(msg *pubsub.Message) func() {
if msg == nil {
return func() {}
}
return msg.Ack
}
// nackFn returns a function which Nacks all of the messages in the batch.
func nackFn(msg *pubsub.Message) func() {
if msg == nil {
return func() {}
}
return msg.Nack
}
// updateUsingPubSubHelper is a helper function used by UpdateUsingPubSub, for
// testing. In addition to any error, returns a func which waits for all
// spawned goroutines to exit.
func updateUsingPubSubHelper(ctx context.Context, btConf *bt_gitstore.BTConfig, subscriberID string, repoID int64, graph *repograph.Graph, ts oauth2.TokenSource, tickCh <-chan time.Time, callback AutoUpdateCallback) (func(), error) {
var wg sync.WaitGroup
var mtx sync.Mutex
doUpdate := func(ctx context.Context, msg *pubsub.Message) {
mtx.Lock()
defer mtx.Unlock()
err := graph.UpdateWithCallback(ctx, func(g *repograph.Graph) error {
return callback(ctx, g, ackFn(msg), nackFn(msg))
})
if err != nil {
sklog.Errorf("Failed to update repo: %s", err)
}
}
// Create the PubSub subscription.
err := NewSubscriber(ctx, btConf, subscriberID, repoID, ts, func(msg *pubsub.Message, branches map[string]string) {
doUpdate(ctx, msg)
})
if err != nil {
return nil, skerr.Wrapf(err, "Failed to create auto-updating repograph.Graph")
}
// Spin up a goroutine to update the Graph at the specified fallback
// interval.
if tickCh != nil {
wg.Add(1)
go func() {
defer func() {
wg.Done()
}()
for {
select {
case <-ctx.Done():
return
case <-tickCh:
doUpdate(ctx, nil)
}
}
}()
}
return wg.Wait, nil
}