| package main |
| |
| import ( |
| "context" |
| "crypto/md5" |
| "encoding/json" |
| "testing" |
| "time" |
| |
| lru "github.com/hashicorp/golang-lru" |
| "github.com/jackc/pgx/v4/pgxpool" |
| "github.com/stretchr/testify/assert" |
| "github.com/stretchr/testify/require" |
| |
| "go.skia.org/infra/go/now" |
| "go.skia.org/infra/go/paramtools" |
| "go.skia.org/infra/go/testutils" |
| "go.skia.org/infra/go/testutils/unittest" |
| "go.skia.org/infra/golden/go/diff" |
| "go.skia.org/infra/golden/go/diff/mocks" |
| "go.skia.org/infra/golden/go/sql/schema" |
| "go.skia.org/infra/golden/go/sql/sqltest" |
| "go.skia.org/infra/golden/go/types" |
| ) |
| |
| func TestComputeDiffsForPrimaryBranch_WorkAvailable_Success(t *testing.T) { |
| unittest.LargeTest(t) |
| |
| fakeNow := ts("2021-02-02T02:30:00Z") |
| |
| existingData := schema.Tables{PrimaryBranchDiffCalculationWork: []schema.PrimaryBranchDiffCalculationRow{ |
| { |
| GroupingID: h(alphaGrouping), // available for work |
| LastCalculated: ts("2021-02-02T02:15:00Z"), |
| CalculationLeaseEnds: ts("2021-02-02T02:14:00Z"), |
| }, |
| { |
| GroupingID: h(betaGrouping), // Too recently computed |
| LastCalculated: ts("2021-02-02T02:29:50Z"), |
| CalculationLeaseEnds: ts("2021-02-02T02:14:00Z"), |
| }, |
| { |
| GroupingID: h(gammaGrouping), // another worker has it "leased" |
| LastCalculated: ts("2021-02-02T02:25:00Z"), |
| CalculationLeaseEnds: ts("2021-02-02T02:37:00Z"), |
| }, |
| { |
| GroupingID: h(deltaGrouping), // available for work (oldest) |
| LastCalculated: ts("2021-02-02T02:12:00Z"), |
| CalculationLeaseEnds: ts("2021-02-02T02:14:00Z"), |
| }, |
| }, Groupings: makeGroupingRows(alphaGrouping, betaGrouping, gammaGrouping, deltaGrouping)} |
| |
| ctx := context.WithValue(context.Background(), now.ContextKey, fakeNow) |
| db := sqltest.NewCockroachDBForTestsWithProductionSchema(ctx, t) |
| require.NoError(t, sqltest.BulkInsertDataTables(ctx, db, existingData)) |
| |
| mc := &mocks.Calculator{} |
| mc.On("CalculateDiffs", testutils.AnyContext, ps(deltaGrouping), noDigests).Return(nil) |
| |
| s := processorForTest(mc, db) |
| |
| shouldSleep, err := s.computeDiffsForPrimaryBranch(ctx) |
| require.NoError(t, err) |
| assert.False(t, shouldSleep) |
| |
| mc.AssertExpectations(t) |
| |
| actualWork := sqltest.GetAllRows(ctx, t, db, "PrimaryBranchDiffCalculationWork", &schema.PrimaryBranchDiffCalculationRow{}) |
| assert.Contains(t, actualWork, schema.PrimaryBranchDiffCalculationRow{ |
| GroupingID: h(deltaGrouping), |
| LastCalculated: ts("2021-02-02T02:12:00Z"), |
| CalculationLeaseEnds: ts("2021-02-02T02:40:00Z"), // This is the timeout + fakeNow |
| }) |
| } |
| |
| func TestComputeDiffsForPrimaryBranch_NoWorkAvailable_ShouldSleep(t *testing.T) { |
| unittest.LargeTest(t) |
| |
| fakeNow := ts("2021-02-02T02:30:00Z") |
| |
| rowsThatShouldBeUnchanged := []schema.PrimaryBranchDiffCalculationRow{ |
| { |
| GroupingID: h(alphaGrouping), // another worker has it "leased" |
| LastCalculated: ts("2021-02-02T02:15:00Z"), |
| CalculationLeaseEnds: ts("2021-02-02T02:34:00Z"), |
| }, |
| { |
| GroupingID: h(betaGrouping), // Too recently computed |
| LastCalculated: ts("2021-02-02T02:29:50Z"), |
| CalculationLeaseEnds: ts("2021-02-02T02:14:00Z"), |
| }, |
| { |
| GroupingID: h(gammaGrouping), // another worker has it "leased" |
| LastCalculated: ts("2021-02-02T02:25:00Z"), |
| CalculationLeaseEnds: ts("2021-02-02T02:37:00Z"), |
| }, |
| { |
| GroupingID: h(deltaGrouping), // Too recently computed |
| LastCalculated: ts("2021-02-02T02:29:45Z"), |
| CalculationLeaseEnds: ts("2021-02-02T02:14:00Z"), |
| }, |
| } |
| existingData := schema.Tables{PrimaryBranchDiffCalculationWork: rowsThatShouldBeUnchanged, Groupings: makeGroupingRows(alphaGrouping, betaGrouping, gammaGrouping, deltaGrouping)} |
| |
| ctx := context.WithValue(context.Background(), now.ContextKey, fakeNow) |
| db := sqltest.NewCockroachDBForTestsWithProductionSchema(ctx, t) |
| require.NoError(t, sqltest.BulkInsertDataTables(ctx, db, existingData)) |
| |
| s := processorForTest(nil, db) |
| |
| shouldSleep, err := s.computeDiffsForPrimaryBranch(ctx) |
| require.NoError(t, err) |
| assert.True(t, shouldSleep) |
| |
| // We shouldn't have leased any work |
| actualWork := sqltest.GetAllRows(ctx, t, db, "PrimaryBranchDiffCalculationWork", &schema.PrimaryBranchDiffCalculationRow{}) |
| assert.ElementsMatch(t, rowsThatShouldBeUnchanged, actualWork) |
| } |
| |
| func processorForTest(c diff.Calculator, db *pgxpool.Pool) *processor { |
| cache, err := lru.New(100) |
| if err != nil { |
| panic(err) |
| } |
| return &processor{ |
| db: db, |
| calculator: c, |
| groupingCache: cache, |
| primaryCounter: fakeCounter{}, |
| clsCounter: fakeCounter{}, |
| } |
| } |
| |
| func makeGroupingRows(groupings ...string) []schema.GroupingRow { |
| rv := make([]schema.GroupingRow, 0, len(groupings)) |
| for _, g := range groupings { |
| rv = append(rv, schema.GroupingRow{ |
| GroupingID: h(g), |
| Keys: ps(g), |
| }) |
| } |
| return rv |
| } |
| |
// JSON-encoded groupings used as test fixtures. Each one is hashed with h to produce a
// grouping ID and decoded with ps to produce its key/value params.
const (
	// alphaGrouping is the "alpha" test in corpus_one.
	alphaGrouping = `{"name":"alpha","source_type":"corpus_one"}`
	// betaGrouping is the "beta" test in corpus_two.
	betaGrouping = `{"name":"beta","source_type":"corpus_two"}`
	// gammaGrouping is the "gamma" test in corpus_one.
	gammaGrouping = `{"name":"gamma","source_type":"corpus_one"}`
	// deltaGrouping is the "delta" test in corpus_two.
	deltaGrouping = `{"name":"delta","source_type":"corpus_two"}`
)
| |
| var ( |
| noDigests []types.Digest |
| ) |
| |
// ts parses s as an RFC 3339 timestamp and returns the resulting time. It panics if s
// cannot be parsed, which keeps fixture definitions to a single expression.
func ts(s string) time.Time {
	parsed, err := time.Parse(time.RFC3339, s)
	if err == nil {
		return parsed
	}
	panic(err)
}
| |
// h computes the MD5 digest of s and returns it as a 16-byte slice.
func h(s string) []byte {
	hasher := md5.New()
	// hash.Hash documents that Write never returns an error.
	_, _ = hasher.Write([]byte(s))
	return hasher.Sum(nil)
}
| |
| func ps(s string) paramtools.Params { |
| var rv paramtools.Params |
| if err := json.Unmarshal([]byte(s), &rv); err != nil { |
| panic(err) |
| } |
| return rv |
| } |
| |
// fakeCounter is a stateless, no-op counter stub. processorForTest uses it for both the
// primaryCounter and clsCounter fields so tests do not record real metrics.
type fakeCounter struct{}

// Dec ignores its argument and does nothing.
func (fakeCounter) Dec(int64) {}

// Delete does nothing and always returns nil.
func (fakeCounter) Delete() error { return nil }

// Get always returns 0, regardless of prior Inc/Dec calls.
func (fakeCounter) Get() int64 { return 0 }

// Inc ignores its argument and does nothing.
func (fakeCounter) Inc(int64) {}

// Reset does nothing.
func (fakeCounter) Reset() {}