blob: 5bd09115bb483b556d307aa610605042d1e28168 [file] [log] [blame]
package event
import (
"crypto/md5"
"fmt"
"strings"
"sync"
"go.skia.org/infra/go/eventbus"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
)
// GLOBAL_BUILDBOT is the name of the global event that carries buildbot
// events. It is registered in init with a JSON codec for BuildbotEventData.
const GLOBAL_BUILDBOT = "global-buildbot-event"

// GLOBAL_GOOGLE_STORAGE is the name of the global event that carries Google
// storage events. It is registered in init with a JSON codec for
// GoogleStorageEventData.
const GLOBAL_GOOGLE_STORAGE = "global-google-storage-event"
func init() {
// GLOBAL_GOOGLE_STORAGE even will be fired with an instance of GoogleStorageEventData
eventbus.RegisterGlobalEvent(GLOBAL_GOOGLE_STORAGE, util.JSONCodec(&GoogleStorageEventData{}))
eventbus.RegisterGlobalEvent(GLOBAL_BUILDBOT, util.JSONCodec(&BuildbotEventData{}))
}
// StorageEvent registers a subtopic of GLOBAL_GOOGLE_STORAGE and returns its
// name. The subtopic only matches events whose Bucket equals bucket and whose
// Name starts with prefix. Every call performs the registration again.
func StorageEvent(bucket, prefix string) string {
	// Hash bucket and prefix to obtain a unique topic name. Hashing is also
	// needed because bucket and prefix values can contain many more
	// different characters than event names.
	digest := md5.Sum([]byte(bucket + "/" + prefix))
	subTopic := fmt.Sprintf("%s-%x", GLOBAL_GOOGLE_STORAGE, digest)

	// matches decides whether a storage event belongs to this subtopic.
	matches := func(eventData interface{}) bool {
		if gsEvent, ok := eventData.(*GoogleStorageEventData); ok {
			return gsEvent.Bucket == bucket && strings.HasPrefix(gsEvent.Name, prefix)
		}
		sklog.Errorf("Received data that was not an instance of GoogleStorageEventData.")
		return false
	}

	eventbus.RegisterSubTopic(GLOBAL_GOOGLE_STORAGE, subTopic, matches)
	return subTopic
}
// GoogleStorageEventData is the payload of GLOBAL_GOOGLE_STORAGE events and
// is serialized as JSON. StorageEvent filters on its Bucket and Name fields.
//
// NOTE(review): the fields presumably mirror the object resource delivered
// by Google Cloud Storage notifications - confirm against the producer.
type GoogleStorageEventData struct {
	Kind string `json:"kind"`
	// Id previously carried an empty tag (`json:""`), which made it encode
	// under the Go field name "Id" instead of "id" like every other field.
	// Decoding is unaffected because encoding/json matches keys
	// case-insensitively.
	Id             string            `json:"id"`
	SelfLink       string            `json:"selfLink"`
	Name           string            `json:"name"`
	Bucket         string            `json:"bucket"`
	Generation     string            `json:"generation"`
	Metageneration string            `json:"metageneration"`
	ContentType    string            `json:"contentType"`
	Updated        string            `json:"updated"`
	TimeDeleted    string            `json:"timeDeleted"`
	StorageClass   string            `json:"storageClass"`
	Size           string            `json:"size"`
	Md5Hash        string            `json:"md5hash"`
	MediaLink      string            `json:"mediaLink"`
	Owner          map[string]string `json:"owner"`
	Crc32C         string            `json:"crc32c"`
	ETag           string            `json:"etag"`
}
// BotFilter is a container for chainable filters for buildbot events.
type BotFilter struct {
	// mutex guards ids and predicates.
	mutex sync.RWMutex

	// ids collects the arguments of all filters that were added. They are
	// joined into the subtopic name by BuildbotEvents.
	ids []string

	// predicates holds one function per added filter. An event passes the
	// BotFilter only if every predicate returns true for it.
	predicates []func(*BuildbotEventData) bool
}
// BotEventFilter creates an empty, chainable filter for buildbot events.
// Restrictions are added with the methods of the returned BotFilter.
func BotEventFilter() *BotFilter {
	f := new(BotFilter)
	// Start with empty, non-nil slices.
	f.ids = []string{}
	f.predicates = []func(*BuildbotEventData) bool{}
	return f
}
// EventType restricts the filter to buildbot events whose Event field equals
// one of the given event types. The event types are also recorded as ids of
// the filter, which BuildbotEvents uses to build the subtopic name. If any of
// the arguments is the empty string, the call is ignored and the filter is
// returned unchanged.
func (b *BotFilter) EventType(eTypes ...string) *BotFilter {
	allowed := util.NewStringSet(eTypes)
	if allowed[""] {
		return b
	}

	// matchesType accepts an event if its type is one of the allowed ones.
	matchesType := func(ev *BuildbotEventData) bool {
		return allowed[ev.Event]
	}
	return b.append(eTypes, matchesType)
}
// StepName restricts the filter to buildbot events whose step name, as
// extracted by getStepName, equals one of the given step names. The step
// names are also recorded as ids of the filter, which BuildbotEvents uses to
// build the subtopic name. If any of the arguments is the empty string, the
// call is ignored and the filter is returned unchanged.
func (b *BotFilter) StepName(stepNames ...string) *BotFilter {
	allowed := util.NewStringSet(stepNames)
	if allowed[""] {
		return b
	}

	// matchesStep accepts an event if its step name is one of the allowed ones.
	matchesStep := func(ev *BuildbotEventData) bool {
		return allowed[getStepName(ev)]
	}
	return b.append(stepNames, matchesStep)
}
// append records the given ids and the predicate fn in the filter while
// holding the write lock. It returns b so that calls can be chained.
func (b *BotFilter) append(ids []string, fn func(*BuildbotEventData) bool) *BotFilter {
	b.mutex.Lock()
	// Inside a method body "append" is still the builtin; the method itself
	// is only reachable through a selector.
	b.ids = append(b.ids, ids...)
	b.predicates = append(b.predicates, fn)
	b.mutex.Unlock()

	return b
}
// filter reports whether ev passes all predicates that were added to b.
// Predicates are evaluated in insertion order under the read lock and
// evaluation stops at the first one that rejects the event. A filter
// without predicates accepts every event.
func (b *BotFilter) filter(ev *BuildbotEventData) bool {
	b.mutex.RLock()
	// Deferred so the lock is released even if a predicate panics.
	defer b.mutex.RUnlock()

	for _, accept := range b.predicates {
		if accept(ev) {
			continue
		}
		return false
	}
	return true
}
// BuildbotEvents registers a subtopic of GLOBAL_BUILDBOT for the given
// filter and returns the name of that subtopic. If botFilter is nil or no
// filters were added to it, nothing is registered and GLOBAL_BUILDBOT is
// returned, i.e. all buildbot events will be delivered.
//
// The subtopic name is built from the ids recorded in botFilter at the time
// of the call. The registered callback evaluates botFilter itself, so
// predicates added to botFilter afterwards also apply to this subtopic.
func BuildbotEvents(botFilter *BotFilter) string {
	// The nil check has to come before the mutex is touched; locking first
	// would dereference a nil pointer and panic.
	if botFilter == nil {
		return GLOBAL_BUILDBOT
	}

	botFilter.mutex.Lock()
	defer botFilter.mutex.Unlock()
	if len(botFilter.ids) == 0 {
		return GLOBAL_BUILDBOT
	}

	// Make a copy of the ids and generate the topic name.
	ids := append([]string{GLOBAL_BUILDBOT}, botFilter.ids...)
	subTopic := strings.Join(ids, "-")
	eventbus.RegisterSubTopic(GLOBAL_BUILDBOT, subTopic, func(eventData interface{}) bool {
		e, ok := eventData.(*BuildbotEventData)
		if !ok {
			sklog.Errorf("Received data that was not an instance of BuildbotEventData.")
			return false
		}
		return botFilter.filter(e)
	})
	return subTopic
}
// BuildbotEventData is the payload of GLOBAL_BUILDBOT events and is
// serialized as JSON.
type BuildbotEventData struct {
	Id int64 `json:"id"`

	// Event is the event type that BotFilter.EventType matches against.
	Event string `json:"event"`

	// Payload is free-form data. getStepName looks for the step name under
	// Payload["step"]["name"].
	Payload map[string]interface{} `json:"payload"`

	Project string `json:"project"`
}
// getStepName robustly extracts the step name if one is present in Payload.
// The name is expected as a string under Payload["step"]["name"]. The empty
// string is returned if e is nil, if there is no "step" entry, if the entry
// is not a JSON object (map[string]interface{}), or if it has no string
// "name" entry.
func getStepName(e *BuildbotEventData) string {
	if e == nil {
		return ""
	}

	// Use the comma-ok form of the type assertions: Payload is free-form
	// data and an unexpected shape must not cause a panic. A missing key
	// yields a nil interface, for which the assertion simply fails.
	step, ok := e.Payload["step"].(map[string]interface{})
	if !ok {
		return ""
	}
	name, _ := step["name"].(string)
	return name
}