| package gevent |
| |
| import ( |
| "regexp" |
| "sort" |
| "testing" |
| "time" |
| |
| assert "github.com/stretchr/testify/require" |
| "go.skia.org/infra/go/eventbus" |
| "go.skia.org/infra/go/testutils" |
| "go.skia.org/infra/go/util" |
| ) |
| |
| const ( |
| PROJECT_ID = "test-project" |
| LOCAL_TOPIC = "testing-local-topic" |
| SUBSCRIBER_1 = "buildbot-1" |
| SUBSCRIBER_2 = "buildbot-2" |
| SUBSCRIBER_STORAGE_EVT = "buildbot-storage-evt" |
| |
| // TEST_BUCKET is not actually accessed, it's just used to test synthetic storate events. |
| TEST_BUCKET = "skia-not-existing-gm" |
| TEST_PREFIX = "dm-json-v1" |
| ) |
| |
| // Test structure that is send as the payload on the event channels. |
| type testType struct { |
| ID int |
| Value string |
| TimeStamp uint64 |
| } |
| |
| func TestEventBus(t *testing.T) { |
| testutils.LargeTest(t) |
| |
| testCodec := util.JSONCodec(&testType{}) |
| RegisterCodec("channel1", testCodec) |
| RegisterCodec("channel2", testCodec) |
| |
| eventBus, err := New(PROJECT_ID, LOCAL_TOPIC, SUBSCRIBER_1) |
| assert.NoError(t, err) |
| |
| eventBusTwo, err := New(PROJECT_ID, LOCAL_TOPIC, SUBSCRIBER_2) |
| assert.NoError(t, err) |
| |
| ch := make(chan int, 5) |
| eventBus.SubscribeAsync("channel1", func(e interface{}) { |
| ch <- e.(*testType).ID |
| }) |
| |
| eventBus.SubscribeAsync("channel2", func(e interface{}) { |
| ch <- e.(*testType).ID |
| }) |
| |
| eventBus.SubscribeAsync("channel2", func(e interface{}) { |
| ch <- e.(*testType).ID |
| }) |
| |
| now := uint64(time.Now().UnixNano()) / uint64(time.Millisecond) |
| eventBusTwo.Publish("channel1", &testType{ |
| ID: 1, |
| Value: "value 1", |
| TimeStamp: now, |
| }, true) |
| eventBusTwo.Publish("channel2", &testType{ |
| ID: 2, |
| Value: "value 2", |
| TimeStamp: now + 10, |
| }, true) |
| |
| // Give the messages 10 seconds to process. |
| startTime := time.Now() |
| for { |
| time.Sleep(time.Second) |
| if time.Now().Sub(startTime) > (time.Second * 10) { |
| assert.FailNow(t, "Timeout: did not receive messages in time") |
| } |
| if len(ch) == 3 { |
| break |
| } |
| } |
| assert.Equal(t, 3, len(ch)) |
| vals := []int{<-ch, <-ch, <-ch} |
| sort.Ints(vals) |
| assert.Equal(t, []int{1, 2, 2}, vals) |
| } |
| |
| func TestSynStorageEvents(t *testing.T) { |
| testutils.LargeTest(t) |
| |
| eventBus, err := New(PROJECT_ID, LOCAL_TOPIC, SUBSCRIBER_STORAGE_EVT) |
| assert.NoError(t, err) |
| |
| // Disable actual subscription to the bucket. It's not possible to test right now, but |
| // if the subscription fails or doesn't work we will know immediately when deploying. |
| eventBus.(*distEventBus).disableGCSSubscriptions = true |
| |
| targetFileRegExp := regexp.MustCompile(`.*\.json`) |
| storageEvtChan, err := eventBus.RegisterStorageEvents(TEST_BUCKET, TEST_PREFIX, targetFileRegExp, nil) |
| assert.NoError(t, err) |
| |
| evtCh := make(chan interface{}, 1) |
| eventBus.SubscribeAsync(storageEvtChan, func(evt interface{}) { |
| evtCh <- evt |
| }) |
| |
| now := util.TimeStamp(time.Microsecond) |
| testObjID := TEST_PREFIX + "/2018/11/01/15/89468e1cc434e93baeed282fd0c250b1d963c017/linux_xfa_rel/1541086007/pixel/dm.json" |
| evt := eventbus.NewStorageEvent(TEST_BUCKET, testObjID, now, "5bf5542e57a662120b400c4cff7e9c40") |
| eventBus.PublishStorageEvent(evt) |
| |
| assert.NoError(t, testutils.EventuallyConsistent(50*time.Millisecond, func() error { |
| select { |
| case evt := <-evtCh: |
| sEvt := evt.(*eventbus.StorageEvent) |
| assert.Equal(t, TEST_BUCKET, sEvt.BucketID) |
| assert.Equal(t, testObjID, sEvt.ObjectID) |
| assert.Equal(t, now, sEvt.TimeStamp) |
| return nil |
| default: |
| return testutils.TryAgainErr |
| } |
| })) |
| } |