[gold] Replace Pub/Sub in diffcalculator, Part 1
This is the first in a chain of CLs to replace Pub/Sub with
a table-based approach. This should keep a large backlog of
queued work from preventing CLs' diffs from being calculated
in a timely manner.
This handles the primary branch case. Follow-on CLs will
deal with CL data and update periodictasks to store to
the correct tables instead of to Pub/Sub.
Bug: skia:12221
Change-Id: I21a36c759491ff01e7c71f7852a63713448cd2c2
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/433919
Reviewed-by: Leandro Lovisolo <lovisolo@google.com>
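
In outline, the new approach replaces the Pub/Sub subscription with a small
work table: each replica polls PrimaryBranchDiffCalculationWork, claims the
stalest grouping by extending a lease inside a transaction, and then computes
diffs for it. Below is a condensed sketch of the claim step (the full version,
with metrics and the grouping cache, is in diffcalculator.go in this CL; the
claimWork helper name is illustrative only):

    package sketch

    import (
        "context"
        "time"

        "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx"
        "github.com/jackc/pgx/v4"
        "github.com/jackc/pgx/v4/pgxpool"
    )

    // claimWork atomically picks the grouping whose diffs are stalest (and whose
    // lease has expired) and re-leases it so that other replicas skip it. It
    // returns nil if every grouping has been calculated recently enough.
    func claimWork(ctx context.Context, db *pgxpool.Pool, staleness, lease time.Duration) ([]byte, error) {
        var groupingID []byte
        err := crdbpgx.ExecuteTx(ctx, db, pgx.TxOptions{}, func(tx pgx.Tx) error {
            ts := time.Now()
            row := tx.QueryRow(ctx, `SELECT grouping_id
    FROM PrimaryBranchDiffCalculationWork
    WHERE calculation_lease_ends < $1 AND last_calculated_ts < $2
    ORDER BY last_calculated_ts ASC
    LIMIT 1`, ts, ts.Add(-staleness))
            if err := row.Scan(&groupingID); err != nil {
                if err == pgx.ErrNoRows {
                    groupingID = nil // no stale work; the caller can sleep
                    return nil
                }
                return err // don't wrap; crdbpgx may retry the transaction
            }
            _, err := tx.Exec(ctx, `UPDATE PrimaryBranchDiffCalculationWork
    SET calculation_lease_ends = $2 WHERE grouping_id = $1`, groupingID, ts.Add(lease))
            return err
        })
        return groupingID, err
    }

An expired lease doubles as the retry mechanism: if a worker dies mid-computation,
the row becomes claimable again once calculation_lease_ends passes, with no acks
or nacks to manage.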
diff --git a/golden/cmd/diffcalculator/BUILD.bazel b/golden/cmd/diffcalculator/BUILD.bazel
index a0d55e5..c7431be 100644
--- a/golden/cmd/diffcalculator/BUILD.bazel
+++ b/golden/cmd/diffcalculator/BUILD.bazel
@@ -10,6 +10,8 @@
"//go/common",
"//go/httputils",
"//go/metrics2",
+ "//go/now",
+ "//go/paramtools",
"//go/skerr",
"//go/sklog",
"//go/util",
@@ -17,10 +19,13 @@
"//golden/go/diff",
"//golden/go/diff/worker",
"//golden/go/sql",
+ "//golden/go/sql/schema",
"//golden/go/tracing",
"//golden/go/types",
+ "@com_github_cockroachdb_cockroach_go_v2//crdb/crdbpgx",
+ "@com_github_hashicorp_golang_lru//:golang-lru",
+ "@com_github_jackc_pgx_v4//:pgx",
"@com_github_jackc_pgx_v4//pgxpool",
- "@com_google_cloud_go_pubsub//:pubsub",
"@com_google_cloud_go_storage//:storage",
"@io_opencensus_go//trace",
],
@@ -37,12 +42,18 @@
srcs = ["diffcalculator_test.go"],
embed = [":diffcalculator_lib"],
deps = [
+ "//go/now",
"//go/paramtools",
- "//go/skerr",
"//go/testutils",
"//go/testutils/unittest",
+ "//golden/go/diff",
"//golden/go/diff/mocks",
+ "//golden/go/sql/schema",
+ "//golden/go/sql/sqltest",
"//golden/go/types",
+ "@com_github_hashicorp_golang_lru//:golang-lru",
+ "@com_github_jackc_pgx_v4//pgxpool",
"@com_github_stretchr_testify//assert",
+ "@com_github_stretchr_testify//require",
],
)
diff --git a/golden/cmd/diffcalculator/diffcalculator.go b/golden/cmd/diffcalculator/diffcalculator.go
index 0af060b..062d00a 100644
--- a/golden/cmd/diffcalculator/diffcalculator.go
+++ b/golden/cmd/diffcalculator/diffcalculator.go
@@ -4,23 +4,25 @@
import (
"context"
- "encoding/json"
"flag"
"io/ioutil"
+ "math/rand"
"net/http"
"path"
- "sync"
- "sync/atomic"
"time"
- "cloud.google.com/go/pubsub"
gstorage "cloud.google.com/go/storage"
+ "github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx"
+ lru "github.com/hashicorp/golang-lru"
+ "github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"go.opencensus.io/trace"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/httputils"
"go.skia.org/infra/go/metrics2"
+ "go.skia.org/infra/go/now"
+ "go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
@@ -28,6 +30,7 @@
"go.skia.org/infra/golden/go/diff"
"go.skia.org/infra/golden/go/diff/worker"
"go.skia.org/infra/golden/go/sql"
+ "go.skia.org/infra/golden/go/sql/schema"
"go.skia.org/infra/golden/go/tracing"
"go.skia.org/infra/golden/go/types"
)
@@ -38,30 +41,18 @@
// The GCS folder that contains the images, named by their digests.
imgFolder = "dm-images-v1"
+
+ // calculateCLDataProportion is the fraction of polling iterations that look for CL work;
+ // the rest look for primary-branch work.
+ calculateCLDataProportion = 0.7
+
+ // primaryBranchStalenessThreshold is how recently a grouping's diffs must have been
+ // calculated for it to be skipped when looking for primary-branch work.
+ primaryBranchStalenessThreshold = time.Minute
+
+ // diffCalculationTimeout is how long a worker's lease on a grouping lasts; if the worker
+ // dies, another worker may claim the grouping after this long.
+ diffCalculationTimeout = 10 * time.Minute
+
+ // groupingCacheSize bounds the LRU cache mapping grouping IDs to their keys.
+ groupingCacheSize = 100_000
)
type diffCalculatorConfig struct {
config.Common
-
- // DiffCacheNamespace is a namespace for differentiating the DiffCache entities. The instance
- // name is fine here.
- DiffCacheNamespace string `json:"diff_cache_namespace" optional:"true"`
-
- // DiffWorkSubscription is the subscription name used by all replicas of the diffcalculator.
- // By setting the subscriber ID to be the same on all instances of the diffcalculator,
- // only one of the replicas will get each event (usually). We like our subscription names
- // to be unique and keyed to the instance, for easier following up on "Why are there so many
- // backed up messages?"
- DiffWorkSubscription string `json:"diff_work_subscription"`
-
- // MemcachedServer is the address in the form dns_name:port
- // (e.g. gold-memcached-0.gold-memcached:11211).
- MemcachedServer string `json:"memcached_server" optional:"true"`
-
- // PubSubFetchSize is how many worker messages to ask PubSub for. This defaults to 10, but for
- // instances that have many tests, but most of the messages result in no-ops, this can be
- // higher for better utilization and throughput.
- PubSubFetchSize int `json:"pubsub_fetch_size" optional:"true"`
}
func main() {
@@ -106,23 +97,54 @@
db := mustInitSQLDatabase(ctx, dcc)
gis := mustMakeGCSImageSource(ctx, dcc)
+ gc, err := lru.New(groupingCacheSize)
+ if err != nil {
+ sklog.Fatalf("Could not initialize cache: %s", err)
+ }
+
sqlProcessor := &processor{
- calculator: worker.NewV2(db, gis, dcc.WindowSize),
- ackCounter: metrics2.GetCounter("diffcalculator_ack"),
- nackCounter: metrics2.GetCounter("diffcalculator_nack"),
+ calculator: worker.NewV2(db, gis, dcc.WindowSize),
+ db: db,
+ groupingCache: gc,
+ primaryCounter: metrics2.GetCounter("diffcalculator_primarybranch_processed"),
+ clsCounter: metrics2.GetCounter("diffcalculator_cls_processed"),
}
go func() {
- // Wait at least 5 seconds for the pubsub connection to be initialized before saying
+ // Wait at least 5 seconds for the db connection to be initialized before saying
// we are healthy.
time.Sleep(5 * time.Second)
http.HandleFunc("/healthz", httputils.ReadyHandleFunc)
sklog.Fatal(http.ListenAndServe(dcc.ReadyPort, nil))
}()
- startMetrics(ctx, sqlProcessor)
+ sklog.Fatalf("Stopped while polling for work: %s", beginPolling(ctx, sqlProcessor))
+}
- sklog.Fatalf("Listening for work %s", listen(ctx, dcc, sqlProcessor))
+// beginPolling will continuously try to find work to compute either from CLs or the primary branch.
+func beginPolling(ctx context.Context, sqlProcessor *processor) error {
+ rand.Seed(time.Now().UnixNano())
+ for {
+ if err := ctx.Err(); err != nil {
+ return skerr.Wrap(err)
+ }
+ if rand.Float32() < calculateCLDataProportion {
+ if err := sqlProcessor.computeDiffsForCL(ctx); err != nil {
+ sklog.Errorf("Error computing diffs for CL: %s", err)
+ continue
+ }
+ } else {
+ shouldSleep, err := sqlProcessor.computeDiffsForPrimaryBranch(ctx)
+ if err != nil {
+ sklog.Errorf("Error computing diffs on primary branch: %s", err)
+ continue
+ }
+ if shouldSleep {
+ // TODO(kjlubick) make sure we don't poll as fast as possible when there is
+ // no work to do.
+ }
+ }
+ }
}
func mustInitSQLDatabase(ctx context.Context, dcc diffCalculatorConfig) *pgxpool.Pool {
@@ -176,116 +198,89 @@
return b, skerr.Wrap(err)
}
-func listen(ctx context.Context, dcc diffCalculatorConfig, p *processor) error {
- psc, err := pubsub.NewClient(ctx, dcc.PubsubProjectID)
- if err != nil {
- return skerr.Wrapf(err, "initializing pubsub client for project %s", dcc.PubsubProjectID)
- }
-
- // Check that the topic exists. Fail if it does not.
- t := psc.Topic(dcc.DiffWorkTopic)
- if exists, err := t.Exists(ctx); err != nil {
- return skerr.Wrapf(err, "checking for existing topic %s", dcc.DiffWorkTopic)
- } else if !exists {
- return skerr.Fmt("Diff work topic %s does not exist in project %s", dcc.DiffWorkTopic, dcc.PubsubProjectID)
- }
-
- // Check that the subscription exists. Fail if it does not.
- sub := psc.Subscription(dcc.DiffWorkSubscription)
- if exists, err := sub.Exists(ctx); err != nil {
- return skerr.Wrapf(err, "checking for existing subscription %s", dcc.DiffWorkSubscription)
- } else if !exists {
- return skerr.Fmt("subscription %s does not exist in project %s", dcc.DiffWorkSubscription, dcc.PubsubProjectID)
- }
-
- // This is a limit of how many messages to fetch when PubSub has no work. Waiting for PubSub
- // to give us messages can take a second or two, so we choose a small, but not too small
- // batch size.
- if dcc.PubSubFetchSize == 0 {
- sub.ReceiveSettings.MaxOutstandingMessages = 10
- } else {
- sub.ReceiveSettings.MaxOutstandingMessages = dcc.PubSubFetchSize
- }
-
- // This process will handle one message at a time. This allows us to more finely control the
- // scaling up as necessary.
- sub.ReceiveSettings.NumGoroutines = 1
-
- // Blocks until context cancels or pubsub fails in a non retryable way.
- return skerr.Wrap(sub.Receive(ctx, p.processPubSubMessage))
-}
-
type processor struct {
- calculator diff.Calculator
- ackCounter metrics2.Counter
- nackCounter metrics2.Counter
- // busy is either 1 or 0 depending on if this processor is working or not. This allows us
- // to gather data on wall-clock utilization.
- busy int64
- // PubSub sometimes gives us more than one messages at a time. This mutex ensures that
- // we only really process one at a time, which makes sure we don't overload our CPU estimate
- // and we avoid cache thrashing.
- oneMessageAtATime sync.Mutex
+ db *pgxpool.Pool
+ calculator diff.Calculator
+ groupingCache *lru.Cache
+ primaryCounter metrics2.Counter
+ clsCounter metrics2.Counter
}
-// processPubSubMessage processes the data in the given pubsub message and acks or nacks it
-// as appropriate.
-func (p *processor) processPubSubMessage(ctx context.Context, msg *pubsub.Message) {
- p.oneMessageAtATime.Lock()
- defer p.oneMessageAtATime.Unlock()
- ctx, span := trace.StartSpan(ctx, "processFromPubSub")
- defer span.End()
- atomic.StoreInt64(&p.busy, 1)
- if shouldAck := p.processMessage(ctx, msg.Data); shouldAck {
- msg.Ack()
- p.ackCounter.Inc(1)
- } else {
- msg.Nack()
- p.nackCounter.Inc(1)
- }
- atomic.StoreInt64(&p.busy, 0)
-}
-
-// processMessage reads the bytes as JSON and calls CalculateDiffs if those bytes were valid.
-// We have this as its own function to make it easier to test (it's hard to instantiate a valid
-// pubsub message without the emulator because there are private members that need initializing).
-// It returns a bool that represents whether the message should be Ack'd (not retried) or Nack'd
-// (retried later).
-func (p *processor) processMessage(ctx context.Context, msgData []byte) bool {
- defer metrics2.FuncTimer().Stop()
- var wm diff.WorkerMessage
- if err := json.Unmarshal(msgData, &wm); err != nil {
- sklog.Errorf("Invalid message passed in: %s\n%s", err, string(msgData))
- return true // ack this message so no other subscriber gets it (it will still be invalid).
- }
- if wm.Version != diff.WorkerMessageVersion {
- return true // This is an old or a new message, skip it.
- }
+// computeDiffsForPrimaryBranch finds the grouping whose diffs were computed least recently
+// (and which no other process currently has leased) and computes diffs for it. It returns true
+// if there was no stale work, in which case the caller may sleep before polling again.
+func (p *processor) computeDiffsForPrimaryBranch(ctx context.Context) (bool, error) {
// Prevent our workers from getting starved out by long-running tasks. Cancel them, and
// requeue them. CalculateDiffs should be streaming results, so we get some partial progress.
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
- err := p.calculator.CalculateDiffs(ctx, wm.Grouping, wm.AdditionalLeft, wm.AdditionalRight)
+ ctx, span := trace.StartSpan(ctx, "diffcalculator_computeDiffsForPrimaryBranch")
+ defer span.End()
+
+ hasWork := false
+ var groupingID schema.GroupingID
+
+ err := crdbpgx.ExecuteTx(ctx, p.db, pgx.TxOptions{}, func(tx pgx.Tx) error {
+ ts := now.Now(ctx)
+ const selectStatement = `SELECT grouping_id
+FROM PrimaryBranchDiffCalculationWork
+WHERE calculation_lease_ends < $1 AND last_calculated_ts < $2
+ORDER BY last_calculated_ts ASC
+LIMIT 1`
+ row := tx.QueryRow(ctx, selectStatement, ts, ts.Add(-1*primaryBranchStalenessThreshold))
+
+ if err := row.Scan(&groupingID); err != nil {
+ if err == pgx.ErrNoRows {
+ // Diffs for every grouping on the primary branch have been calculated more
+ // recently than the staleness threshold, so there's nothing to do right now.
+ return nil
+ }
+ return err // don't wrap, might be retried
+ }
+
+ const updateStatement = `UPDATE PrimaryBranchDiffCalculationWork
+SET calculation_lease_ends = $2 WHERE grouping_id = $1`
+ if _, err := tx.Exec(ctx, updateStatement, groupingID, ts.Add(diffCalculationTimeout)); err != nil {
+ return err // don't wrap, might be retried
+ }
+ hasWork = true
+ return nil
+ })
if err != nil {
- sklog.Errorf("Calculating diffs for %v: %s", wm, err)
- return false // Let this be tried again.
+ return false, skerr.Wrap(err)
}
- return true // successfully processed.
+ if !hasWork {
+ return true, nil
+ }
+ grouping, err := p.expandGrouping(ctx, groupingID)
+ if err != nil {
+ return false, skerr.Wrap(err)
+ }
+ if err := p.calculator.CalculateDiffs(ctx, grouping, nil); err != nil {
+ return false, skerr.Wrap(err)
+ }
+ p.primaryCounter.Inc(1)
+ return false, nil
}
-func startMetrics(ctx context.Context, p *processor) {
- // This metric will let us get a sense of how well-utilized this processor is. It reads the
- // busy int of the processor (which is 0 or 1) and increments the counter with that value.
- // Because we are updating the counter once per second, we can use rate() [which computes deltas
- // per second] on this counter to get a number between 0 and 1 to indicate wall-clock
- // utilization. Hopefully, this lets us know if we need to add more replicas.
- go func() {
- busy := metrics2.GetCounter("diffcalculator_busy_pulses")
- for range time.Tick(time.Second) {
- if err := ctx.Err(); err != nil {
- return
- }
- busy.Inc(atomic.LoadInt64(&p.busy))
+// expandGrouping returns the params associated with the given grouping ID. It consults the
+// cache first; on a miss, it reads the grouping from the SQL database and caches it.
+func (p *processor) expandGrouping(ctx context.Context, groupingID schema.GroupingID) (paramtools.Params, error) {
+ ctx, span := trace.StartSpan(ctx, "expandGrouping")
+ defer span.End()
+ var groupingKeys paramtools.Params
+ if gk, ok := p.groupingCache.Get(string(groupingID)); ok {
+ return gk.(paramtools.Params), nil
+ } else {
+ const statement = `SELECT keys FROM Groupings WHERE grouping_id = $1`
+ row := p.db.QueryRow(ctx, statement, groupingID)
+ if err := row.Scan(&groupingKeys); err != nil {
+ return nil, skerr.Wrap(err)
}
- }()
+ p.groupingCache.Add(string(groupingID), groupingKeys)
+ }
+ return groupingKeys, nil
+}
+
+// computeDiffsForCL will find and compute diff work for CLs; a follow-on CL implements it.
+func (p *processor) computeDiffsForCL(ctx context.Context) error {
+ return skerr.Fmt("Not impl")
}
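
Aside on the TODO in beginPolling above: when computeDiffsForPrimaryBranch
reports no stale work (shouldSleep == true), the loop currently goes straight
into the next poll. A minimal sketch of one way to fill that in (the flat
five-second pause is an assumption, not something this CL prescribes):

    package sketch

    import (
        "context"
        "time"
    )

    // maybePause waits a short, fixed interval when a poll found no work, so an
    // idle replica does not query the work table in a tight loop. It returns
    // early if the context is cancelled.
    func maybePause(ctx context.Context, shouldSleep bool) {
        if !shouldSleep {
            return
        }
        select {
        case <-ctx.Done():
        case <-time.After(5 * time.Second): // assumed interval; adding jitter would also be reasonable
        }
    }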
diff --git a/golden/cmd/diffcalculator/diffcalculator_test.go b/golden/cmd/diffcalculator/diffcalculator_test.go
index d8eb252..d072710 100644
--- a/golden/cmd/diffcalculator/diffcalculator_test.go
+++ b/golden/cmd/diffcalculator/diffcalculator_test.go
@@ -2,77 +2,184 @@
import (
"context"
+ "crypto/md5"
+ "encoding/json"
"testing"
+ "time"
+ lru "github.com/hashicorp/golang-lru"
+ "github.com/jackc/pgx/v4/pgxpool"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "go.skia.org/infra/go/now"
"go.skia.org/infra/go/paramtools"
- "go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/testutils"
"go.skia.org/infra/go/testutils/unittest"
+ "go.skia.org/infra/golden/go/diff"
"go.skia.org/infra/golden/go/diff/mocks"
+ "go.skia.org/infra/golden/go/sql/schema"
+ "go.skia.org/infra/golden/go/sql/sqltest"
"go.skia.org/infra/golden/go/types"
)
-func TestProcessPubSubMessage_OldJSON_NoCalculation_Ack(t *testing.T) {
- unittest.SmallTest(t)
+func TestComputeDiffsForPrimaryBranch_WorkAvailable_Success(t *testing.T) {
+ unittest.LargeTest(t)
- p := processor{}
+ fakeNow := ts("2021-02-02T02:30:00Z")
- messageBytes := []byte(`{"grouping":{"name":"any-test","other grouping":"something","source_type":"any-corpus"},"additional_digests":["abcd","ef123"]}`)
- shouldAck := p.processMessage(context.Background(), messageBytes)
- assert.True(t, shouldAck)
-}
+ existingData := schema.Tables{PrimaryBranchDiffCalculationWork: []schema.PrimaryBranchDiffCalculationRow{
+ {
+ GroupingID: h(alphaGrouping), // available for work
+ LastCalculated: ts("2021-02-02T02:15:00Z"),
+ CalculationLeaseEnds: ts("2021-02-02T02:14:00Z"),
+ },
+ {
+ GroupingID: h(betaGrouping), // Too recently computed
+ LastCalculated: ts("2021-02-02T02:29:50Z"),
+ CalculationLeaseEnds: ts("2021-02-02T02:14:00Z"),
+ },
+ {
+ GroupingID: h(gammaGrouping), // another worker has it "leased"
+ LastCalculated: ts("2021-02-02T02:25:00Z"),
+ CalculationLeaseEnds: ts("2021-02-02T02:37:00Z"),
+ },
+ {
+ GroupingID: h(deltaGrouping), // available for work (oldest)
+ LastCalculated: ts("2021-02-02T02:12:00Z"),
+ CalculationLeaseEnds: ts("2021-02-02T02:14:00Z"),
+ },
+ }, Groupings: makeGroupingRows(alphaGrouping, betaGrouping, gammaGrouping, deltaGrouping)}
-func TestProcessPubSubMessage_ValidJSON_CalculateSucceeds_Ack(t *testing.T) {
- unittest.SmallTest(t)
+ ctx := context.WithValue(context.Background(), now.ContextKey, fakeNow)
+ db := sqltest.NewCockroachDBForTestsWithProductionSchema(ctx, t)
+ require.NoError(t, sqltest.BulkInsertDataTables(ctx, db, existingData))
- mc := mocks.Calculator{}
+ mc := &mocks.Calculator{}
+ mc.On("CalculateDiffs", testutils.AnyContext, ps(deltaGrouping), noDigests).Return(nil)
- expectedGrouping := paramtools.Params{
- types.CorpusField: "any-corpus",
- types.PrimaryKeyField: "any-test",
- "other grouping": "something",
- }
- expectedLeftDigests := []types.Digest{"abcd", "ef123"}
- expectedRightDigests := []types.Digest{"4567"}
+ s := processorForTest(mc, db)
- mc.On("CalculateDiffs", testutils.AnyContext, expectedGrouping, expectedLeftDigests, expectedRightDigests).Return(nil)
+ shouldSleep, err := s.computeDiffsForPrimaryBranch(ctx)
+ require.NoError(t, err)
+ assert.False(t, shouldSleep)
- p := processor{calculator: &mc}
-
- messageBytes := []byte(`{"version":3,"grouping":{"name":"any-test","other grouping":"something","source_type":"any-corpus"},"additional_left":["abcd","ef123"],"additional_right":["4567"]}`)
- shouldAck := p.processMessage(context.Background(), messageBytes)
- assert.True(t, shouldAck)
mc.AssertExpectations(t)
+
+ actualWork := sqltest.GetAllRows(ctx, t, db, "PrimaryBranchDiffCalculationWork", &schema.PrimaryBranchDiffCalculationRow{})
+ assert.Contains(t, actualWork, schema.PrimaryBranchDiffCalculationRow{
+ GroupingID: h(deltaGrouping),
+ LastCalculated: ts("2021-02-02T02:12:00Z"),
+ CalculationLeaseEnds: ts("2021-02-02T02:40:00Z"), // fakeNow + diffCalculationTimeout
+ })
}
-func TestProcessPubSubMessage_ValidJSON_CalculateFails_Nack(t *testing.T) {
- unittest.SmallTest(t)
+func TestComputeDiffsForPrimaryBranch_NoWorkAvailable_ShouldSleep(t *testing.T) {
+ unittest.LargeTest(t)
- mc := mocks.Calculator{}
+ fakeNow := ts("2021-02-02T02:30:00Z")
- expectedGrouping := paramtools.Params{
- types.CorpusField: "any-corpus",
- types.PrimaryKeyField: "any-test",
+ rowsThatShouldBeUnchanged := []schema.PrimaryBranchDiffCalculationRow{
+ {
+ GroupingID: h(alphaGrouping), // another worker has it "leased"
+ LastCalculated: ts("2021-02-02T02:15:00Z"),
+ CalculationLeaseEnds: ts("2021-02-02T02:34:00Z"),
+ },
+ {
+ GroupingID: h(betaGrouping), // Too recently computed
+ LastCalculated: ts("2021-02-02T02:29:50Z"),
+ CalculationLeaseEnds: ts("2021-02-02T02:14:00Z"),
+ },
+ {
+ GroupingID: h(gammaGrouping), // another worker has it "leased"
+ LastCalculated: ts("2021-02-02T02:25:00Z"),
+ CalculationLeaseEnds: ts("2021-02-02T02:37:00Z"),
+ },
+ {
+ GroupingID: h(deltaGrouping), // Too recently computed
+ LastCalculated: ts("2021-02-02T02:29:45Z"),
+ CalculationLeaseEnds: ts("2021-02-02T02:14:00Z"),
+ },
}
- var noExpectedDigests []types.Digest
+ existingData := schema.Tables{PrimaryBranchDiffCalculationWork: rowsThatShouldBeUnchanged, Groupings: makeGroupingRows(alphaGrouping, betaGrouping, gammaGrouping, deltaGrouping)}
- mc.On("CalculateDiffs", testutils.AnyContext, expectedGrouping, noExpectedDigests, noExpectedDigests).Return(skerr.Fmt("boom"))
+ ctx := context.WithValue(context.Background(), now.ContextKey, fakeNow)
+ db := sqltest.NewCockroachDBForTestsWithProductionSchema(ctx, t)
+ require.NoError(t, sqltest.BulkInsertDataTables(ctx, db, existingData))
- p := processor{calculator: &mc}
+ s := processorForTest(nil, db)
- messageBytes := []byte(`{"version":3,"grouping":{"name":"any-test","source_type":"any-corpus"}}`)
- shouldAck := p.processMessage(context.Background(), messageBytes)
- assert.False(t, shouldAck)
- mc.AssertExpectations(t)
+ shouldSleep, err := s.computeDiffsForPrimaryBranch(ctx)
+ require.NoError(t, err)
+ assert.True(t, shouldSleep)
+
+ // We shouldn't have leased any work
+ actualWork := sqltest.GetAllRows(ctx, t, db, "PrimaryBranchDiffCalculationWork", &schema.PrimaryBranchDiffCalculationRow{})
+ assert.ElementsMatch(t, rowsThatShouldBeUnchanged, actualWork)
}
-func TestProcessPubSubMessage_InvalidJSON_Ack(t *testing.T) {
- unittest.SmallTest(t)
-
- p := processor{}
- messageBytes := []byte(`invalid json`)
- shouldAck := p.processMessage(context.Background(), messageBytes)
- assert.True(t, shouldAck)
+func processorForTest(c diff.Calculator, db *pgxpool.Pool) *processor {
+ cache, err := lru.New(100)
+ if err != nil {
+ panic(err)
+ }
+ return &processor{
+ db: db,
+ calculator: c,
+ groupingCache: cache,
+ primaryCounter: fakeCounter{},
+ clsCounter: fakeCounter{},
+ }
}
+
+// makeGroupingRows returns a GroupingRow for each given grouping, with Keys parsed from
+// the grouping's JSON.
+func makeGroupingRows(groupings ...string) []schema.GroupingRow {
+ rv := make([]schema.GroupingRow, 0, len(groupings))
+ for _, g := range groupings {
+ rv = append(rv, schema.GroupingRow{
+ GroupingID: h(g),
+ Keys: ps(g),
+ })
+ }
+ return rv
+}
+
+const (
+ alphaGrouping = `{"name":"alpha","source_type":"corpus_one"}`
+ betaGrouping = `{"name":"beta","source_type":"corpus_two"}`
+ gammaGrouping = `{"name":"gamma","source_type":"corpus_one"}`
+ deltaGrouping = `{"name":"delta","source_type":"corpus_two"}`
+)
+
+var (
+ noDigests []types.Digest
+)
+
+// ts parses the given RFC 3339 string into a time.Time, panicking on invalid input.
+func ts(s string) time.Time {
+ t, err := time.Parse(time.RFC3339, s)
+ if err != nil {
+ panic(err)
+ }
+ return t
+}
+
+// h returns the MD5 hash of the provided string.
+func h(s string) []byte {
+ hash := md5.Sum([]byte(s))
+ return hash[:]
+}
+
+// ps unmarshals the given JSON string into a paramtools.Params, panicking on invalid input.
+func ps(s string) paramtools.Params {
+ var rv paramtools.Params
+ if err := json.Unmarshal([]byte(s), &rv); err != nil {
+ panic(err)
+ }
+ return rv
+}
+
+type fakeCounter struct{}
+
+func (_ fakeCounter) Dec(_ int64) {}
+func (_ fakeCounter) Delete() error { return nil }
+func (_ fakeCounter) Get() int64 { return 0 }
+func (_ fakeCounter) Inc(_ int64) {}
+func (_ fakeCounter) Reset() {}
diff --git a/golden/go/diff/diff.go b/golden/go/diff/diff.go
index 6598ed9..1fcbc16 100644
--- a/golden/go/diff/diff.go
+++ b/golden/go/diff/diff.go
@@ -281,20 +281,11 @@
type Calculator interface {
// CalculateDiffs recomputes all diffs for the current grouping, including any digests provided.
- // Images (digests) will be sorted into two buckets, the left and right bucket. The left bucket
- // is a superset of the right bucket. The right bucket consists of all triaged images
- // for this grouping. The left bucket consists of *all* digests seen for this grouping.
- // During search, a user will want to see the closest positive and negative image for a given
- // image. By splitting the images into two different buckets, we do less precomputation. For
- // example, if there are several flaky traces in a grouping, it can be that 10% of the overall
- // images for a grouping are triaged and 90% are *not* (these typically come from ignored traces
- // because they produce something different during most commits). In such a case, computing a
- // given untriaged digest from a trace against a different untriaged digest is a waste since it
- // won't show up in the search results. By splitting the images into two buckets, we can
- // dramatically reduce the computation done over a naive N x N comparison scheme.
- CalculateDiffs(ctx context.Context, grouping paramtools.Params, additionalLeft, additionalRight []types.Digest) error
+ CalculateDiffs(ctx context.Context, grouping paramtools.Params, additional []types.Digest) error
}
+// TODO(kjlubick) remove WorkerMessage and WorkerMessageVersion
+
// WorkerMessageVersion is the current version of the WorkerMessage JSON.
const WorkerMessageVersion = 3
diff --git a/golden/go/diff/mocks/Calculator.go b/golden/go/diff/mocks/Calculator.go
index 11feee1..ba897bf 100644
--- a/golden/go/diff/mocks/Calculator.go
+++ b/golden/go/diff/mocks/Calculator.go
@@ -17,13 +17,13 @@
mock.Mock
}
-// CalculateDiffs provides a mock function with given fields: ctx, grouping, additionalLeft, additionalRight
-func (_m *Calculator) CalculateDiffs(ctx context.Context, grouping paramtools.Params, additionalLeft []types.Digest, additionalRight []types.Digest) error {
- ret := _m.Called(ctx, grouping, additionalLeft, additionalRight)
+// CalculateDiffs provides a mock function with given fields: ctx, grouping, additional
+func (_m *Calculator) CalculateDiffs(ctx context.Context, grouping paramtools.Params, additional []types.Digest) error {
+ ret := _m.Called(ctx, grouping, additional)
var r0 error
- if rf, ok := ret.Get(0).(func(context.Context, paramtools.Params, []types.Digest, []types.Digest) error); ok {
- r0 = rf(ctx, grouping, additionalLeft, additionalRight)
+ if rf, ok := ret.Get(0).(func(context.Context, paramtools.Params, []types.Digest) error); ok {
+ r0 = rf(ctx, grouping, additional)
} else {
r0 = ret.Error(0)
}
diff --git a/golden/go/diff/worker/worker.go b/golden/go/diff/worker/worker.go
index ff86ce4..aeabe91 100644
--- a/golden/go/diff/worker/worker.go
+++ b/golden/go/diff/worker/worker.go
@@ -117,7 +117,7 @@
func (w *WorkerImpl) CalculateDiffs(ctx context.Context, grouping paramtools.Params, addLeft, addRight []types.Digest) error {
ctx, span := trace.StartSpan(ctx, "CalculateDiffs")
if span.IsRecordingEvents() {
- addMetadata(span, grouping, len(addLeft), len(addRight))
+ addMetadata(span, grouping, len(addLeft))
}
defer span.End()
startingTile, err := w.getStartingTile(ctx)
@@ -280,12 +280,11 @@
// addMetadata adds some attributes to the span so we can tell how much work it was supposed to
// be doing when we are looking at the traces and the performance.
-func addMetadata(span *trace.Span, grouping paramtools.Params, leftDigestCount, rightDigestCount int) {
+func addMetadata(span *trace.Span, grouping paramtools.Params, digestCount int) {
groupingStr, _ := json.Marshal(grouping)
span.AddAttributes(
trace.StringAttribute("grouping", string(groupingStr)),
- trace.Int64Attribute("left_digests", int64(leftDigestCount)),
- trace.Int64Attribute("right_digests", int64(rightDigestCount)))
+ trace.Int64Attribute("additional_digests", int64(digestCount)))
}
// getStartingTile returns the commit ID which is the beginning of the tile of interest (so we
@@ -609,6 +608,3 @@
}
return diff.GetNRGBA(im), nil
}
-
-// Make sure WorkerImpl fulfills the diff.Calculator interface.
-var _ diff.Calculator = (*WorkerImpl)(nil)
diff --git a/golden/go/diff/worker/worker2.go b/golden/go/diff/worker/worker2.go
index 9e3255f..a76344e 100644
--- a/golden/go/diff/worker/worker2.go
+++ b/golden/go/diff/worker/worker2.go
@@ -60,10 +60,10 @@
// CalculateDiffs calculates the diffs for the given grouping. It computes all of the diffs
// if there are only "a few" digests; otherwise, it computes a subset of them, taking into
// account recency and triage status.
-func (w *WorkerImpl2) CalculateDiffs(ctx context.Context, grouping paramtools.Params, addLeft, addRight []types.Digest) error {
+func (w *WorkerImpl2) CalculateDiffs(ctx context.Context, grouping paramtools.Params, additional []types.Digest) error {
ctx, span := trace.StartSpan(ctx, "worker2_CalculateDiffs")
if span.IsRecordingEvents() {
- addMetadata(span, grouping, len(addLeft), len(addRight))
+ addMetadata(span, grouping, len(additional))
}
defer span.End()
startingTile, err := w.getStartingTile(ctx)
@@ -75,9 +75,9 @@
return skerr.Wrap(err)
}
- total := len(allDigests) + len(addLeft) + len(addRight)
+ total := len(allDigests) + len(additional)
w.inputDigestsSummary.Observe(float64(total))
- inputDigests := convertToDigestBytes(append(addLeft, addRight...))
+ inputDigests := convertToDigestBytes(additional)
if total > computeTotalGridCutoff {
// If there are too many digests, we perform a somewhat expensive operation of looking at
// the digests produced by all traces to find a smaller subset of images that we should
@@ -85,8 +85,7 @@
// a small percentage of groupings (i.e. tests) to have many digests.
return skerr.Wrap(w.calculateDiffSubset(ctx, grouping, inputDigests, startingTile))
}
- allDigests = addDigests(allDigests, addLeft)
- allDigests = addDigests(allDigests, addRight)
+ allDigests = append(allDigests, inputDigests...)
return skerr.Wrap(w.calculateAllDiffs(ctx, allDigests))
}
@@ -105,22 +104,6 @@
return rv
}
-// addDigests adds the given hex-encoded digests to the slice of bytes.
-func addDigests(digests []schema.DigestBytes, additional []types.Digest) []schema.DigestBytes {
- if len(additional) == 0 {
- return digests
- }
- for _, d := range additional {
- b, err := sql.DigestToBytes(d)
- if err != nil {
- sklog.Warningf("Invalid digest seen %q: %s", d, err)
- continue
- }
- digests = append(digests, b)
- }
- return digests
-}
-
// getStartingTile returns the commit ID which is the beginning of the tile of interest (so we
// get enough data to do our comparisons).
func (w *WorkerImpl2) getStartingTile(ctx context.Context) (schema.TileID, error) {
diff --git a/golden/go/diff/worker/worker2_test.go b/golden/go/diff/worker/worker2_test.go
index 1f4a8b9..579f1a8 100644
--- a/golden/go/diff/worker/worker2_test.go
+++ b/golden/go/diff/worker/worker2_test.go
@@ -37,7 +37,7 @@
types.PrimaryKeyField: "not used",
}
imagesToCalculateDiffsFor := []types.Digest{dks.DigestA01Pos, dks.DigestA02Pos, dks.DigestA04Unt, dks.DigestA05Unt}
- require.NoError(t, w.CalculateDiffs(ctx, grouping, imagesToCalculateDiffsFor, imagesToCalculateDiffsFor))
+ require.NoError(t, w.CalculateDiffs(ctx, grouping, imagesToCalculateDiffsFor))
actualMetrics := getAllDiffMetricRows(t, db)
assert.Equal(t, []schema.DiffMetricRow{
@@ -77,7 +77,7 @@
types.CorpusField: dks.CornersCorpus,
types.PrimaryKeyField: dks.TriangleTest,
}
- require.NoError(t, w.CalculateDiffs(ctx, grouping, nil, nil))
+ require.NoError(t, w.CalculateDiffs(ctx, grouping, nil))
actualMetrics := getAllDiffMetricRows(t, db)
assert.Equal(t, []schema.DiffMetricRow{
@@ -131,7 +131,7 @@
for i := 0; i < computeTotalGridCutoff+1; i++ {
extraDigests = append(extraDigests, dks.DigestBlank)
}
- require.NoError(t, w.CalculateDiffs(ctx, grouping, extraDigests, nil))
+ require.NoError(t, w.CalculateDiffs(ctx, grouping, extraDigests))
actualMetrics := getAllDiffMetricRows(t, db)
assert.Equal(t, []schema.DiffMetricRow{
@@ -179,7 +179,7 @@
types.CorpusField: dks.RoundCorpus,
types.PrimaryKeyField: dks.CircleTest,
}
- require.NoError(t, w.CalculateDiffs(ctx, grouping, []types.Digest{dks.DigestC06Pos_CL}, []types.Digest{dks.DigestC06Pos_CL}))
+ require.NoError(t, w.CalculateDiffs(ctx, grouping, []types.Digest{dks.DigestC06Pos_CL}))
actualMetrics := getAllDiffMetricRows(t, db)
assert.Equal(t, []schema.DiffMetricRow{
@@ -226,7 +226,7 @@
types.PrimaryKeyField: "not used",
}
imagesToCalculateDiffsFor := []types.Digest{dks.DigestA01Pos, dks.DigestA02Pos, dks.DigestA04Unt}
- require.NoError(t, w.CalculateDiffs(ctx, grouping, imagesToCalculateDiffsFor, imagesToCalculateDiffsFor))
+ require.NoError(t, w.CalculateDiffs(ctx, grouping, imagesToCalculateDiffsFor))
actualMetrics := getAllDiffMetricRows(t, db)
// We should see partial success
@@ -278,7 +278,7 @@
types.PrimaryKeyField: "not used",
}
imagesToCalculateDiffsFor := []types.Digest{dks.DigestA01Pos, dks.DigestA02Pos, dks.DigestA04Unt}
- require.NoError(t, w.CalculateDiffs(ctx, grouping, imagesToCalculateDiffsFor, imagesToCalculateDiffsFor))
+ require.NoError(t, w.CalculateDiffs(ctx, grouping, imagesToCalculateDiffsFor))
actualMetrics := getAllDiffMetricRows(t, db)
// We should see partial success
diff --git a/golden/go/sql/schema/BUILD.bazel b/golden/go/sql/schema/BUILD.bazel
index bc9c5d7..399b432 100644
--- a/golden/go/sql/schema/BUILD.bazel
+++ b/golden/go/sql/schema/BUILD.bazel
@@ -13,6 +13,7 @@
"//go/paramtools",
"//go/skerr",
"//golden/go/expectations",
+ "//golden/go/types",
"@com_github_google_uuid//:uuid",
"@com_github_jackc_pgtype//:pgtype",
],
diff --git a/golden/go/sql/schema/sql.go b/golden/go/sql/schema/sql.go
index 50288db..fcc2fa2 100644
--- a/golden/go/sql/schema/sql.go
+++ b/golden/go/sql/schema/sql.go
@@ -91,6 +91,11 @@
created_ts TIMESTAMP WITH TIME ZONE,
INDEX cl_order_idx (changelist_id, ps_order)
);
+CREATE TABLE IF NOT EXISTS PrimaryBranchDiffCalculationWork (
+ grouping_id BYTES PRIMARY KEY,
+ last_calculated_ts TIMESTAMP WITH TIME ZONE NOT NULL,
+ calculation_lease_ends TIMESTAMP WITH TIME ZONE NOT NULL
+);
CREATE TABLE IF NOT EXISTS PrimaryBranchParams (
tile_id INT4,
key STRING,
@@ -103,6 +108,15 @@
latest_error STRING NOT NULL,
error_ts TIMESTAMP WITH TIME ZONE NOT NULL
);
+CREATE TABLE IF NOT EXISTS SecondaryBranchDiffCalculationWork (
+ branch_name STRING,
+ grouping_id BYTES,
+ last_updated_ts TIMESTAMP WITH TIME ZONE NOT NULL,
+ digests STRING[] NOT NULL,
+ last_calculated_ts TIMESTAMP WITH TIME ZONE NOT NULL,
+ calculation_lease_ends TIMESTAMP WITH TIME ZONE NOT NULL,
+ PRIMARY KEY (branch_name, grouping_id)
+);
CREATE TABLE IF NOT EXISTS SecondaryBranchExpectations (
branch_name STRING,
grouping_id BYTES,
@@ -143,10 +157,6 @@
INDEX grouping_digest_idx (grouping_id, digest),
INDEX tile_trace_idx (tile_id, trace_id)
);
-CREATE TABLE IF NOT EXISTS TrackingCommits (
- repo STRING PRIMARY KEY,
- last_git_hash STRING NOT NULL
-);
CREATE TABLE IF NOT EXISTS TraceValues (
shard INT2,
trace_id BYTES,
@@ -168,6 +178,10 @@
INDEX ignored_grouping_idx (matches_any_ignore_rule, grouping_id),
INVERTED INDEX keys_idx (keys)
);
+CREATE TABLE IF NOT EXISTS TrackingCommits (
+ repo STRING PRIMARY KEY,
+ last_git_hash STRING NOT NULL
+);
CREATE TABLE IF NOT EXISTS Tryjobs (
tryjob_id STRING PRIMARY KEY,
system STRING NOT NULL,
diff --git a/golden/go/sql/schema/tables.go b/golden/go/sql/schema/tables.go
index 46d8fa8..1463504 100644
--- a/golden/go/sql/schema/tables.go
+++ b/golden/go/sql/schema/tables.go
@@ -4,6 +4,8 @@
"crypto/md5"
"time"
+ "go.skia.org/infra/golden/go/types"
+
"github.com/google/uuid"
"github.com/jackc/pgtype"
@@ -118,30 +120,32 @@
// is turned into an actual SQL statement.
//go:generate go run ../exporter/tosql --output_file sql.go --logtostderr --output_pkg schema
type Tables struct {
- Changelists []ChangelistRow `sql_backup:"weekly"`
- CommitsWithData []CommitWithDataRow `sql_backup:"daily"`
- DiffMetrics []DiffMetricRow `sql_backup:"monthly"`
- ExpectationDeltas []ExpectationDeltaRow `sql_backup:"daily"`
- ExpectationRecords []ExpectationRecordRow `sql_backup:"daily"`
- Expectations []ExpectationRow `sql_backup:"daily"`
- GitCommits []GitCommitRow `sql_backup:"daily"`
- Groupings []GroupingRow `sql_backup:"monthly"`
- IgnoreRules []IgnoreRuleRow `sql_backup:"daily"`
- MetadataCommits []MetadataCommitRow `sql_backup:"daily"`
- Options []OptionsRow `sql_backup:"monthly"`
- Patchsets []PatchsetRow `sql_backup:"weekly"`
- PrimaryBranchParams []PrimaryBranchParamRow `sql_backup:"monthly"`
- ProblemImages []ProblemImageRow `sql_backup:"none"`
- SecondaryBranchExpectations []SecondaryBranchExpectationRow `sql_backup:"daily"`
- SecondaryBranchParams []SecondaryBranchParamRow `sql_backup:"monthly"`
- SecondaryBranchValues []SecondaryBranchValueRow `sql_backup:"monthly"`
- SourceFiles []SourceFileRow `sql_backup:"monthly"`
- TiledTraceDigests []TiledTraceDigestRow `sql_backup:"monthly"`
- TrackingCommits []TrackingCommitRow `sql_backup:"daily"`
- TraceValues []TraceValueRow `sql_backup:"monthly"`
- Traces []TraceRow `sql_backup:"monthly"`
- Tryjobs []TryjobRow `sql_backup:"weekly"`
- ValuesAtHead []ValueAtHeadRow `sql_backup:"monthly"`
+ Changelists []ChangelistRow `sql_backup:"weekly"`
+ CommitsWithData []CommitWithDataRow `sql_backup:"daily"`
+ DiffMetrics []DiffMetricRow `sql_backup:"monthly"`
+ ExpectationDeltas []ExpectationDeltaRow `sql_backup:"daily"`
+ ExpectationRecords []ExpectationRecordRow `sql_backup:"daily"`
+ Expectations []ExpectationRow `sql_backup:"daily"`
+ GitCommits []GitCommitRow `sql_backup:"daily"`
+ Groupings []GroupingRow `sql_backup:"monthly"`
+ IgnoreRules []IgnoreRuleRow `sql_backup:"daily"`
+ MetadataCommits []MetadataCommitRow `sql_backup:"daily"`
+ Options []OptionsRow `sql_backup:"monthly"`
+ Patchsets []PatchsetRow `sql_backup:"weekly"`
+ PrimaryBranchDiffCalculationWork []PrimaryBranchDiffCalculationRow `sql_backup:"none"`
+ PrimaryBranchParams []PrimaryBranchParamRow `sql_backup:"monthly"`
+ ProblemImages []ProblemImageRow `sql_backup:"none"`
+ SecondaryBranchDiffCalculationWork []SecondaryBranchDiffCalculationRow `sql_backup:"none"`
+ SecondaryBranchExpectations []SecondaryBranchExpectationRow `sql_backup:"daily"`
+ SecondaryBranchParams []SecondaryBranchParamRow `sql_backup:"monthly"`
+ SecondaryBranchValues []SecondaryBranchValueRow `sql_backup:"monthly"`
+ SourceFiles []SourceFileRow `sql_backup:"monthly"`
+ TiledTraceDigests []TiledTraceDigestRow `sql_backup:"monthly"`
+ TraceValues []TraceValueRow `sql_backup:"monthly"`
+ Traces []TraceRow `sql_backup:"monthly"`
+ TrackingCommits []TrackingCommitRow `sql_backup:"daily"`
+ Tryjobs []TryjobRow `sql_backup:"weekly"`
+ ValuesAtHead []ValueAtHeadRow `sql_backup:"monthly"`
// DeprecatedIngestedFiles allows us to keep track of files ingested with the old FS/BT ways
// until all the SQL ingestion is ready.
@@ -1046,3 +1050,67 @@
func (r *MetadataCommitRow) ScanFrom(scan func(...interface{}) error) error {
return scan(&r.CommitID, &r.CommitMetadata)
}
+
+// PrimaryBranchDiffCalculationRow represents a grouping for which we need to compute diffs using
+// digests seen on the primary branch. There are intentionally no indexes here, for two reasons:
+// 1) indexing monotonically increasing columns can cause issues, especially as the data changes.
+// 2) for a typical instance, we don't expect there to be that many rows (thousands), so a full
+// table scan shouldn't be too expensive.
+type PrimaryBranchDiffCalculationRow struct {
+ // GroupingID is the grouping for which we should compute diffs.
+ GroupingID GroupingID `sql:"grouping_id BYTES PRIMARY KEY"`
+ // LastCalculated is the timestamp at which we last calculated diffs for the grouping.
+ LastCalculated time.Time `sql:"last_calculated_ts TIMESTAMP WITH TIME ZONE NOT NULL"`
+ // CalculationLeaseEnds is set to a future time when a worker starts computing diffs on this
+ // grouping. It keeps multiple workers from doing the same computation at the same time.
+ CalculationLeaseEnds time.Time `sql:"calculation_lease_ends TIMESTAMP WITH TIME ZONE NOT NULL"`
+}
+
+// ToSQLRow implements the sqltest.SQLExporter interface.
+func (r PrimaryBranchDiffCalculationRow) ToSQLRow() (colNames []string, colData []interface{}) {
+ return []string{"grouping_id", "last_calculated_ts", "calculation_lease_ends"},
+ []interface{}{r.GroupingID, r.LastCalculated, r.CalculationLeaseEnds}
+}
+
+// ScanFrom implements the sqltest.SQLScanner interface.
+func (r *PrimaryBranchDiffCalculationRow) ScanFrom(scan func(...interface{}) error) error {
+ err := scan(&r.GroupingID, &r.LastCalculated, &r.CalculationLeaseEnds)
+ if err != nil {
+ return skerr.Wrap(err)
+ }
+ r.LastCalculated = r.LastCalculated.UTC()
+ r.CalculationLeaseEnds = r.CalculationLeaseEnds.UTC()
+ return nil
+}
+
+// SecondaryBranchDiffCalculationRow represents a grouping on a CL for which we need to compute
+// diffs, using digests from that CL as well as the digests on the primary branch.
+type SecondaryBranchDiffCalculationRow struct {
+ // BranchName identifies the CL whose digests this work covers.
+ BranchName string `sql:"branch_name STRING"`
+
+ // GroupingID is the grouping for which we should compute diffs.
+ GroupingID GroupingID `sql:"grouping_id BYTES"`
+
+ // LastUpdated is the timestamp at which this row's set of digests last changed.
+ LastUpdated time.Time `sql:"last_updated_ts TIMESTAMP WITH TIME ZONE NOT NULL"`
+
+ // DigestsNotOnPrimary are digests seen on the CL that have not been seen on the
+ // primary branch.
+ DigestsNotOnPrimary []types.Digest `sql:"digests STRING[] NOT NULL"`
+
+ // LastCalculated is the timestamp at which we last calculated diffs for this grouping.
+ LastCalculated time.Time `sql:"last_calculated_ts TIMESTAMP WITH TIME ZONE NOT NULL"`
+
+ // CalculationLeaseEnds is set to a future time when a worker starts computing diffs on
+ // this grouping, as in PrimaryBranchDiffCalculationRow.
+ CalculationLeaseEnds time.Time `sql:"calculation_lease_ends TIMESTAMP WITH TIME ZONE NOT NULL"`
+
+ primaryKey struct{} `sql:"PRIMARY KEY (branch_name, grouping_id)"`
+}
+
+// ToSQLRow implements the sqltest.SQLExporter interface.
+func (r SecondaryBranchDiffCalculationRow) ToSQLRow() (colNames []string, colData []interface{}) {
+ return []string{"branch_name", "grouping_id", "last_updated_ts", "digests",
+ "last_calculated_ts", "calculation_lease_ends"},
+ []interface{}{r.BranchName, r.GroupingID, r.LastUpdated, r.DigestsNotOnPrimary,
+ r.LastCalculated, r.CalculationLeaseEnds}
+}
+
+// ScanFrom implements the sqltest.SQLScanner interface.
+func (r *SecondaryBranchDiffCalculationRow) ScanFrom(scan func(...interface{}) error) error {
+ return scan(&r.BranchName, &r.GroupingID, &r.LastUpdated, &r.DigestsNotOnPrimary,
+ &r.LastCalculated, &r.CalculationLeaseEnds)
+}