// blob: d0727108dd211b4b27ddcbccf4ba8eddb18ace43 [file] [log] [blame]
package main
import (
"context"
"crypto/md5"
"encoding/json"
"testing"
"time"
lru "github.com/hashicorp/golang-lru"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.skia.org/infra/go/now"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/testutils"
"go.skia.org/infra/go/testutils/unittest"
"go.skia.org/infra/golden/go/diff"
"go.skia.org/infra/golden/go/diff/mocks"
"go.skia.org/infra/golden/go/sql/schema"
"go.skia.org/infra/golden/go/sql/sqltest"
"go.skia.org/infra/golden/go/types"
)
// TestComputeDiffsForPrimaryBranch_WorkAvailable_Success seeds four work rows,
// of which two are eligible, and checks that the processor calculates diffs for
// the delta grouping (the one with the oldest LastCalculated) and extends that
// row's lease.
func TestComputeDiffsForPrimaryBranch_WorkAvailable_Success(t *testing.T) {
	unittest.LargeTest(t)

	// The processor reads "now" from the context, so pin it to a fixed instant.
	currentTime := ts("2021-02-02T02:30:00Z")
	ctx := context.WithValue(context.Background(), now.ContextKey, currentTime)

	workRows := []schema.PrimaryBranchDiffCalculationRow{
		{
			GroupingID:           h(alphaGrouping), // available for work
			LastCalculated:       ts("2021-02-02T02:15:00Z"),
			CalculationLeaseEnds: ts("2021-02-02T02:14:00Z"),
		},
		{
			GroupingID:           h(betaGrouping), // Too recently computed
			LastCalculated:       ts("2021-02-02T02:29:50Z"),
			CalculationLeaseEnds: ts("2021-02-02T02:14:00Z"),
		},
		{
			GroupingID:           h(gammaGrouping), // another worker has it "leased"
			LastCalculated:       ts("2021-02-02T02:25:00Z"),
			CalculationLeaseEnds: ts("2021-02-02T02:37:00Z"),
		},
		{
			GroupingID:           h(deltaGrouping), // available for work (oldest)
			LastCalculated:       ts("2021-02-02T02:12:00Z"),
			CalculationLeaseEnds: ts("2021-02-02T02:14:00Z"),
		},
	}
	seedData := schema.Tables{
		PrimaryBranchDiffCalculationWork: workRows,
		Groupings:                        makeGroupingRows(alphaGrouping, betaGrouping, gammaGrouping, deltaGrouping),
	}

	db := sqltest.NewCockroachDBForTestsWithProductionSchema(ctx, t)
	require.NoError(t, sqltest.BulkInsertDataTables(ctx, db, seedData))

	// Only the delta grouping is expected to reach the calculator.
	calc := &mocks.Calculator{}
	calc.On("CalculateDiffs", testutils.AnyContext, ps(deltaGrouping), noDigests).Return(nil)

	proc := processorForTest(calc, db)
	shouldSleep, err := proc.computeDiffsForPrimaryBranch(ctx)
	require.NoError(t, err)
	assert.False(t, shouldSleep)
	calc.AssertExpectations(t)

	wantDeltaRow := schema.PrimaryBranchDiffCalculationRow{
		GroupingID:           h(deltaGrouping),
		LastCalculated:       ts("2021-02-02T02:12:00Z"),
		CalculationLeaseEnds: ts("2021-02-02T02:40:00Z"), // This is the timeout + fakeNow
	}
	rowsAfter := sqltest.GetAllRows(ctx, t, db, "PrimaryBranchDiffCalculationWork", &schema.PrimaryBranchDiffCalculationRow{})
	assert.Contains(t, rowsAfter, wantDeltaRow)
}
// TestComputeDiffsForPrimaryBranch_NoWorkAvailable_ShouldSleep seeds work rows
// that are all either leased or too recently computed, and checks that the
// processor reports it should sleep and leaves every row untouched.
func TestComputeDiffsForPrimaryBranch_NoWorkAvailable_ShouldSleep(t *testing.T) {
	unittest.LargeTest(t)

	// The processor reads "now" from the context, so pin it to a fixed instant.
	currentTime := ts("2021-02-02T02:30:00Z")
	ctx := context.WithValue(context.Background(), now.ContextKey, currentTime)

	untouchedRows := []schema.PrimaryBranchDiffCalculationRow{
		{
			GroupingID:           h(alphaGrouping), // another worker has it "leased"
			LastCalculated:       ts("2021-02-02T02:15:00Z"),
			CalculationLeaseEnds: ts("2021-02-02T02:34:00Z"),
		},
		{
			GroupingID:           h(betaGrouping), // Too recently computed
			LastCalculated:       ts("2021-02-02T02:29:50Z"),
			CalculationLeaseEnds: ts("2021-02-02T02:14:00Z"),
		},
		{
			GroupingID:           h(gammaGrouping), // another worker has it "leased"
			LastCalculated:       ts("2021-02-02T02:25:00Z"),
			CalculationLeaseEnds: ts("2021-02-02T02:37:00Z"),
		},
		{
			GroupingID:           h(deltaGrouping), // Too recently computed
			LastCalculated:       ts("2021-02-02T02:29:45Z"),
			CalculationLeaseEnds: ts("2021-02-02T02:14:00Z"),
		},
	}
	seedData := schema.Tables{
		PrimaryBranchDiffCalculationWork: untouchedRows,
		Groupings:                        makeGroupingRows(alphaGrouping, betaGrouping, gammaGrouping, deltaGrouping),
	}

	db := sqltest.NewCockroachDBForTestsWithProductionSchema(ctx, t)
	require.NoError(t, sqltest.BulkInsertDataTables(ctx, db, seedData))

	// No calculator is supplied for this scenario.
	proc := processorForTest(nil, db)
	shouldSleep, err := proc.computeDiffsForPrimaryBranch(ctx)
	require.NoError(t, err)
	assert.True(t, shouldSleep)

	// We shouldn't have leased any work
	rowsAfter := sqltest.GetAllRows(ctx, t, db, "PrimaryBranchDiffCalculationWork", &schema.PrimaryBranchDiffCalculationRow{})
	assert.ElementsMatch(t, untouchedRows, rowsAfter)
}
// processorForTest builds a processor backed by the given calculator and
// database, with a fresh 100-entry LRU grouping cache and no-op counters.
// It panics if the cache cannot be created.
func processorForTest(c diff.Calculator, db *pgxpool.Pool) *processor {
	const groupingCacheSize = 100

	groupings, err := lru.New(groupingCacheSize)
	if err != nil {
		panic(err)
	}

	p := &processor{
		calculator:     c,
		db:             db,
		groupingCache:  groupings,
		clsCounter:     fakeCounter{},
		primaryCounter: fakeCounter{},
	}
	return p
}
// makeGroupingRows converts each JSON-encoded grouping into a schema.GroupingRow
// whose GroupingID is the MD5 of the JSON string and whose Keys are the parsed
// params. Rows are returned in the same order as the arguments.
func makeGroupingRows(groupings ...string) []schema.GroupingRow {
	rows := make([]schema.GroupingRow, len(groupings))
	for i := range groupings {
		encoded := groupings[i]
		rows[i] = schema.GroupingRow{
			GroupingID: h(encoded),
			Keys:       ps(encoded),
		}
	}
	return rows
}
// JSON-encoded grouping params used as fixtures by the tests in this file.
// The raw string is hashed with h() to produce a GroupingID and parsed with
// ps() to produce the grouping's keys, so the exact bytes matter.
const (
alphaGrouping = `{"name":"alpha","source_type":"corpus_one"}`
betaGrouping = `{"name":"beta","source_type":"corpus_two"}`
gammaGrouping = `{"name":"gamma","source_type":"corpus_one"}`
deltaGrouping = `{"name":"delta","source_type":"corpus_two"}`
)
var (
// noDigests is a nil digest slice; it is the third argument expected by the
// mocked CalculateDiffs call in the tests in this file.
noDigests []types.Digest
)
// ts parses an RFC 3339 timestamp such as "2021-02-02T02:30:00Z" and returns
// the corresponding time.Time. It panics on malformed input.
func ts(s string) time.Time {
	parsed, err := time.Parse(time.RFC3339, s)
	if err == nil {
		return parsed
	}
	panic(err)
}
// h computes the MD5 digest of s and returns it as a 16-byte slice.
func h(s string) []byte {
	hasher := md5.New()
	// hash.Hash documents that Write never returns an error.
	_, _ = hasher.Write([]byte(s))
	return hasher.Sum(nil)
}
// ps decodes a JSON object string into paramtools.Params. It panics if s is
// not valid JSON for that type.
func ps(s string) paramtools.Params {
	var parsed paramtools.Params
	err := json.Unmarshal([]byte(s), &parsed)
	if err != nil {
		panic(err)
	}
	return parsed
}
// fakeCounter is a stateless, no-op counter used for the processor's
// primaryCounter and clsCounter fields in tests.
type fakeCounter struct{}

// Dec does nothing.
func (fakeCounter) Dec(int64) {}

// Delete does nothing and always reports success.
func (fakeCounter) Delete() error {
	return nil
}

// Get always reports zero, since nothing is ever recorded.
func (fakeCounter) Get() int64 {
	return 0
}

// Inc does nothing.
func (fakeCounter) Inc(int64) {}

// Reset does nothing.
func (fakeCounter) Reset() {}