|  | package pubsub | 
|  |  | 
|  | import ( | 
|  | "context" | 
|  | "sync" | 
|  | "time" | 
|  |  | 
|  | "cloud.google.com/go/pubsub" | 
|  | "go.skia.org/infra/go/git/repograph" | 
|  | "go.skia.org/infra/go/gitstore" | 
|  | "go.skia.org/infra/go/gitstore/bt_gitstore" | 
|  | "go.skia.org/infra/go/skerr" | 
|  | "go.skia.org/infra/go/sklog" | 
|  | "golang.org/x/oauth2" | 
|  | ) | 
|  |  | 
|  | // AutoUpdateMap is a wrapper around repograph.Map which provides a convenience | 
|  | // method for auto-updating the Graphs in the Map. | 
|  | type AutoUpdateMap struct { | 
|  | btConf  *bt_gitstore.BTConfig | 
|  | Map     repograph.Map | 
|  | repoIDs map[string]int64 | 
|  | } | 
|  |  | 
|  | // NewBTGitStoreMap is a wrapper around bt_gitstore.NewBTGitStoreMap which | 
|  | // provides a convenience method for auto-updating the Graphs in the Map. | 
|  | func NewAutoUpdateMap(ctx context.Context, repoUrls []string, btConf *bt_gitstore.BTConfig) (*AutoUpdateMap, error) { | 
|  | rv := &AutoUpdateMap{ | 
|  | btConf:  btConf, | 
|  | Map:     make(map[string]*repograph.Graph, len(repoUrls)), | 
|  | repoIDs: make(map[string]int64, len(repoUrls)), | 
|  | } | 
|  | for _, repoUrl := range repoUrls { | 
|  | gs, err := bt_gitstore.New(ctx, btConf, repoUrl) | 
|  | if err != nil { | 
|  | return nil, skerr.Wrapf(err, "Failed to create GitStore for %s", repoUrl) | 
|  | } | 
|  | graph, err := gitstore.GetRepoGraph(ctx, gs) | 
|  | if err != nil { | 
|  | return nil, skerr.Wrapf(err, "Failed to create Graph from GitStore for %s", repoUrl) | 
|  | } | 
|  | rv.Map[repoUrl] = graph | 
|  | rv.repoIDs[repoUrl] = gs.RepoID | 
|  | } | 
|  | return rv, nil | 
|  | } | 
|  |  | 
|  | // Start initializes auto-updating of the AutoUpdateMap. | 
|  | func (m *AutoUpdateMap) Start(ctx context.Context, subscriberID string, ts oauth2.TokenSource, fallbackInterval time.Duration, callback AutoUpdateMapCallback) error { | 
|  | for repoUrl, graph := range m.Map { | 
|  | // https://golang.org/doc/faq#closures_and_goroutines | 
|  | repoUrl := repoUrl | 
|  | graph := graph | 
|  | err := updateUsingPubSub(ctx, m.btConf, subscriberID, m.repoIDs[repoUrl], graph, ts, fallbackInterval, func(ctx context.Context, g *repograph.Graph, ack, nack func()) error { | 
|  | return callback(ctx, repoUrl, g, ack, nack) | 
|  | }) | 
|  | if err != nil { | 
|  | return skerr.Wrap(err) | 
|  | } | 
|  | } | 
|  | return nil | 
|  | } | 
|  |  | 
|  | // AutoUpdateCallback is a callback function used in UpdateUsingPubSub which is | 
|  | // called after the Graph is updated but before the changes are committed. If | 
|  | // the callback returns an error, the changes are not committed. In addition to | 
|  | // the Graph itself, the callback accepts two functions as parameters, ack and | 
|  | // nack, which in turn call the Ack() or Nack() functions on the pubsub | 
|  | // message(s) which triggered the update. The callback function should call one | 
|  | // of them to ensure that the message(s) get redelivered or not, as desired. | 
|  | type AutoUpdateCallback func(ctx context.Context, g *repograph.Graph, ack, nack func()) error | 
|  |  | 
|  | // AutoUpdateMapCallback is like AutoUpdateCallback, except that it's handed to | 
|  | // NewAutoUpdateMap.Start() and also includes the repo URL. | 
|  | type AutoUpdateMapCallback func(ctx context.Context, repoUrl string, g *repograph.Graph, ack, nack func()) error | 
|  |  | 
|  | // updateUsingPubSub updates the passed-in Graph whenever a pubsub message is | 
|  | // received for the given repo and at the given fallback interval. It calls the | 
|  | // given callback function after the Graph updates but before the changes are | 
|  | // committed. | 
|  | func updateUsingPubSub(ctx context.Context, btConf *bt_gitstore.BTConfig, subscriberID string, repoID int64, graph *repograph.Graph, ts oauth2.TokenSource, fallbackInterval time.Duration, callback AutoUpdateCallback) error { | 
|  | var ticker *time.Ticker | 
|  | var tickCh <-chan time.Time | 
|  | if fallbackInterval > 0 { | 
|  | ticker = time.NewTicker(fallbackInterval) | 
|  | go func() { | 
|  | <-ctx.Done() | 
|  | ticker.Stop() | 
|  | }() | 
|  | tickCh = ticker.C | 
|  | } | 
|  | _, err := updateUsingPubSubHelper(ctx, btConf, subscriberID, repoID, graph, ts, tickCh, callback) | 
|  | if err != nil && ticker != nil { | 
|  | ticker.Stop() | 
|  | } | 
|  | return err | 
|  | } | 
|  |  | 
|  | // ackFn returns a function which Acks the message. | 
|  | func ackFn(msg *pubsub.Message) func() { | 
|  | if msg == nil { | 
|  | return func() {} | 
|  | } | 
|  | return msg.Ack | 
|  | } | 
|  |  | 
|  | // nackFn returns a function which Nacks all of the messages in the batch. | 
|  | func nackFn(msg *pubsub.Message) func() { | 
|  | if msg == nil { | 
|  | return func() {} | 
|  | } | 
|  | return msg.Nack | 
|  | } | 
|  |  | 
|  | // updateUsingPubSubHelper is a helper function used by UpdateUsingPubSub, for | 
|  | // testing. In addition to any error, returns a func which waits for all | 
|  | // spawned goroutines to exit. | 
|  | func updateUsingPubSubHelper(ctx context.Context, btConf *bt_gitstore.BTConfig, subscriberID string, repoID int64, graph *repograph.Graph, ts oauth2.TokenSource, tickCh <-chan time.Time, callback AutoUpdateCallback) (func(), error) { | 
|  | var wg sync.WaitGroup | 
|  | var mtx sync.Mutex | 
|  | doUpdate := func(ctx context.Context, msg *pubsub.Message) { | 
|  | mtx.Lock() | 
|  | defer mtx.Unlock() | 
|  | err := graph.UpdateWithCallback(ctx, func(g *repograph.Graph) error { | 
|  | return callback(ctx, g, ackFn(msg), nackFn(msg)) | 
|  | }) | 
|  | if err != nil { | 
|  | sklog.Errorf("Failed to update repo: %s", err) | 
|  | } | 
|  | } | 
|  |  | 
|  | // Create the PubSub subscription. | 
|  | err := NewSubscriber(ctx, btConf, subscriberID, repoID, ts, func(msg *pubsub.Message, branches map[string]string) { | 
|  | doUpdate(ctx, msg) | 
|  | }) | 
|  | if err != nil { | 
|  | return nil, skerr.Wrapf(err, "Failed to create auto-updating repograph.Graph") | 
|  | } | 
|  |  | 
|  | // Spin up a goroutine to update the Graph at the specified fallback | 
|  | // interval. | 
|  | if tickCh != nil { | 
|  | wg.Add(1) | 
|  | go func() { | 
|  | defer func() { | 
|  | wg.Done() | 
|  | }() | 
|  | for { | 
|  | select { | 
|  | case <-ctx.Done(): | 
|  | return | 
|  | case <-tickCh: | 
|  | doUpdate(ctx, nil) | 
|  | } | 
|  | } | 
|  | }() | 
|  | } | 
|  | return wg.Wait, nil | 
|  | } |