[gold] Replace Pub/Sub diffcalculator Part 1

This is the first in a chain of CLs to replace Pub/Sub with
a table-based approach. This will hopefully prevent us from
getting a really big queue of backlog that prevents CLs from
having their diffs calculated in a timely manner.

This handles the primary branch case. Follow-on CLs will
deal with CL data and update periodictasks to store to
the correct tables instead of to Pub/Sub.

Bug: skia:12221
Change-Id: I21a36c759491ff01e7c71f7852a63713448cd2c2
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/433919
Reviewed-by: Leandro Lovisolo <lovisolo@google.com>
diff --git a/golden/cmd/diffcalculator/BUILD.bazel b/golden/cmd/diffcalculator/BUILD.bazel
index a0d55e5..c7431be 100644
--- a/golden/cmd/diffcalculator/BUILD.bazel
+++ b/golden/cmd/diffcalculator/BUILD.bazel
@@ -10,6 +10,8 @@
         "//go/common",
         "//go/httputils",
         "//go/metrics2",
+        "//go/now",
+        "//go/paramtools",
         "//go/skerr",
         "//go/sklog",
         "//go/util",
@@ -17,10 +19,13 @@
         "//golden/go/diff",
         "//golden/go/diff/worker",
         "//golden/go/sql",
+        "//golden/go/sql/schema",
         "//golden/go/tracing",
         "//golden/go/types",
+        "@com_github_cockroachdb_cockroach_go_v2//crdb/crdbpgx",
+        "@com_github_hashicorp_golang_lru//:golang-lru",
+        "@com_github_jackc_pgx_v4//:pgx",
         "@com_github_jackc_pgx_v4//pgxpool",
-        "@com_google_cloud_go_pubsub//:pubsub",
         "@com_google_cloud_go_storage//:storage",
         "@io_opencensus_go//trace",
     ],
@@ -37,12 +42,18 @@
     srcs = ["diffcalculator_test.go"],
     embed = [":diffcalculator_lib"],
     deps = [
+        "//go/now",
         "//go/paramtools",
-        "//go/skerr",
         "//go/testutils",
         "//go/testutils/unittest",
+        "//golden/go/diff",
         "//golden/go/diff/mocks",
+        "//golden/go/sql/schema",
+        "//golden/go/sql/sqltest",
         "//golden/go/types",
+        "@com_github_hashicorp_golang_lru//:golang-lru",
+        "@com_github_jackc_pgx_v4//pgxpool",
         "@com_github_stretchr_testify//assert",
+        "@com_github_stretchr_testify//require",
     ],
 )
diff --git a/golden/cmd/diffcalculator/diffcalculator.go b/golden/cmd/diffcalculator/diffcalculator.go
index 0af060b..062d00a 100644
--- a/golden/cmd/diffcalculator/diffcalculator.go
+++ b/golden/cmd/diffcalculator/diffcalculator.go
@@ -4,23 +4,25 @@
 
 import (
 	"context"
-	"encoding/json"
 	"flag"
 	"io/ioutil"
+	"math/rand"
 	"net/http"
 	"path"
-	"sync"
-	"sync/atomic"
 	"time"
 
-	"cloud.google.com/go/pubsub"
 	gstorage "cloud.google.com/go/storage"
+	"github.com/cockroachdb/cockroach-go/v2/crdb/crdbpgx"
+	lru "github.com/hashicorp/golang-lru"
+	"github.com/jackc/pgx/v4"
 	"github.com/jackc/pgx/v4/pgxpool"
 	"go.opencensus.io/trace"
 
 	"go.skia.org/infra/go/common"
 	"go.skia.org/infra/go/httputils"
 	"go.skia.org/infra/go/metrics2"
+	"go.skia.org/infra/go/now"
+	"go.skia.org/infra/go/paramtools"
 	"go.skia.org/infra/go/skerr"
 	"go.skia.org/infra/go/sklog"
 	"go.skia.org/infra/go/util"
@@ -28,6 +30,7 @@
 	"go.skia.org/infra/golden/go/diff"
 	"go.skia.org/infra/golden/go/diff/worker"
 	"go.skia.org/infra/golden/go/sql"
+	"go.skia.org/infra/golden/go/sql/schema"
 	"go.skia.org/infra/golden/go/tracing"
 	"go.skia.org/infra/golden/go/types"
 )
@@ -38,30 +41,18 @@
 
 	// The GCS folder that contains the images, named by their digests.
 	imgFolder = "dm-images-v1"
+
+	calculateCLDataProportion = 0.7
+
+	primaryBranchStalenessThreshold = time.Minute
+
+	diffCalculationTimeout = 10 * time.Minute
+
+	groupingCacheSize = 100_000
 )
 
 type diffCalculatorConfig struct {
 	config.Common
-
-	// DiffCacheNamespace is a namespace for differentiating the DiffCache entities. The instance
-	// name is fine here.
-	DiffCacheNamespace string `json:"diff_cache_namespace" optional:"true"`
-
-	// DiffWorkSubscription is the subscription name used by all replicas of the diffcalculator.
-	// By setting the subscriber ID to be the same on all instances of the diffcalculator,
-	// only one of the replicas will get each event (usually). We like our subscription names
-	// to be unique and keyed to the instance, for easier following up on "Why are there so many
-	// backed up messages?"
-	DiffWorkSubscription string `json:"diff_work_subscription"`
-
-	// MemcachedServer is the address in the form dns_name:port
-	// (e.g. gold-memcached-0.gold-memcached:11211).
-	MemcachedServer string `json:"memcached_server" optional:"true"`
-
-	// PubSubFetchSize is how many worker messages to ask PubSub for. This defaults to 10, but for
-	// instances that have many tests, but most of the messages result in no-ops, this can be
-	// higher for better utilization and throughput.
-	PubSubFetchSize int `json:"pubsub_fetch_size" optional:"true"`
 }
 
 func main() {
@@ -106,23 +97,54 @@
 
 	db := mustInitSQLDatabase(ctx, dcc)
 	gis := mustMakeGCSImageSource(ctx, dcc)
+	gc, err := lru.New(groupingCacheSize)
+	if err != nil {
+		sklog.Fatalf("Could not initialize cache: %s", err)
+	}
+
 	sqlProcessor := &processor{
-		calculator:  worker.NewV2(db, gis, dcc.WindowSize),
-		ackCounter:  metrics2.GetCounter("diffcalculator_ack"),
-		nackCounter: metrics2.GetCounter("diffcalculator_nack"),
+		calculator:     worker.NewV2(db, gis, dcc.WindowSize),
+		db:             db,
+		groupingCache:  gc,
+		primaryCounter: metrics2.GetCounter("diffcalculator_primarybranch_processed"),
+		clsCounter:     metrics2.GetCounter("diffcalculator_cls_processed"),
 	}
 
 	go func() {
-		// Wait at least 5 seconds for the pubsub connection to be initialized before saying
+		// Wait at least 5 seconds for the db connection to be initialized before saying
 		// we are healthy.
 		time.Sleep(5 * time.Second)
 		http.HandleFunc("/healthz", httputils.ReadyHandleFunc)
 		sklog.Fatal(http.ListenAndServe(dcc.ReadyPort, nil))
 	}()
 
-	startMetrics(ctx, sqlProcessor)
+	sklog.Fatalf("Stopped while polling for work %s", beginPolling(ctx, sqlProcessor))
+}
 
-	sklog.Fatalf("Listening for work %s", listen(ctx, dcc, sqlProcessor))
+// beginPolling will continuously try to find work to compute either from CLs or the primary branch.
+func beginPolling(ctx context.Context, sqlProcessor *processor) error {
+	rand.Seed(time.Now().UnixNano())
+	for {
+		if err := ctx.Err(); err != nil {
+			return skerr.Wrap(err)
+		}
+		if rand.Float32() < calculateCLDataProportion {
+			if err := sqlProcessor.computeDiffsForCL(ctx); err != nil {
+				sklog.Errorf("Error computing diffs for CL: %s", err)
+				continue
+			}
+		} else {
+			shouldSleep, err := sqlProcessor.computeDiffsForPrimaryBranch(ctx)
+			if err != nil {
+				sklog.Errorf("Error computing diffs on primary branch: %s", err)
+				continue
+			}
+			if shouldSleep {
+				// TODO(kjlubick) make sure we don't poll as fast as possible when there is
+				// 		no work to do.
+			}
+		}
+	}
 }
 
 func mustInitSQLDatabase(ctx context.Context, dcc diffCalculatorConfig) *pgxpool.Pool {
@@ -176,116 +198,89 @@
 	return b, skerr.Wrap(err)
 }
 
-func listen(ctx context.Context, dcc diffCalculatorConfig, p *processor) error {
-	psc, err := pubsub.NewClient(ctx, dcc.PubsubProjectID)
-	if err != nil {
-		return skerr.Wrapf(err, "initializing pubsub client for project %s", dcc.PubsubProjectID)
-	}
-
-	// Check that the topic exists. Fail if it does not.
-	t := psc.Topic(dcc.DiffWorkTopic)
-	if exists, err := t.Exists(ctx); err != nil {
-		return skerr.Wrapf(err, "checking for existing topic %s", dcc.DiffWorkTopic)
-	} else if !exists {
-		return skerr.Fmt("Diff work topic %s does not exist in project %s", dcc.DiffWorkTopic, dcc.PubsubProjectID)
-	}
-
-	// Check that the subscription exists. Fail if it does not.
-	sub := psc.Subscription(dcc.DiffWorkSubscription)
-	if exists, err := sub.Exists(ctx); err != nil {
-		return skerr.Wrapf(err, "checking for existing subscription %s", dcc.DiffWorkSubscription)
-	} else if !exists {
-		return skerr.Fmt("subscription %s does not exist in project %s", dcc.DiffWorkSubscription, dcc.PubsubProjectID)
-	}
-
-	// This is a limit of how many messages to fetch when PubSub has no work. Waiting for PubSub
-	// to give us messages can take a second or two, so we choose a small, but not too small
-	// batch size.
-	if dcc.PubSubFetchSize == 0 {
-		sub.ReceiveSettings.MaxOutstandingMessages = 10
-	} else {
-		sub.ReceiveSettings.MaxOutstandingMessages = dcc.PubSubFetchSize
-	}
-
-	// This process will handle one message at a time. This allows us to more finely control the
-	// scaling up as necessary.
-	sub.ReceiveSettings.NumGoroutines = 1
-
-	// Blocks until context cancels or pubsub fails in a non retryable way.
-	return skerr.Wrap(sub.Receive(ctx, p.processPubSubMessage))
-}
-
 type processor struct {
-	calculator  diff.Calculator
-	ackCounter  metrics2.Counter
-	nackCounter metrics2.Counter
-	// busy is either 1 or 0 depending on if this processor is working or not. This allows us
-	// to gather data on wall-clock utilization.
-	busy int64
-	// PubSub sometimes gives us more than one messages at a time. This mutex ensures that
-	// we only really process one at a time, which makes sure we don't overload our CPU estimate
-	// and we avoid cache thrashing.
-	oneMessageAtATime sync.Mutex
+	db             *pgxpool.Pool
+	calculator     diff.Calculator
+	groupingCache  *lru.Cache
+	primaryCounter metrics2.Counter
+	clsCounter     metrics2.Counter
 }
 
-// processPubSubMessage processes the data in the given pubsub message and acks or nacks it
-// as appropriate.
-func (p *processor) processPubSubMessage(ctx context.Context, msg *pubsub.Message) {
-	p.oneMessageAtATime.Lock()
-	defer p.oneMessageAtATime.Unlock()
-	ctx, span := trace.StartSpan(ctx, "processFromPubSub")
-	defer span.End()
-	atomic.StoreInt64(&p.busy, 1)
-	if shouldAck := p.processMessage(ctx, msg.Data); shouldAck {
-		msg.Ack()
-		p.ackCounter.Inc(1)
-	} else {
-		msg.Nack()
-		p.nackCounter.Inc(1)
-	}
-	atomic.StoreInt64(&p.busy, 0)
-}
-
-// processMessage reads the bytes as JSON and calls CalculateDiffs if those bytes were valid.
-// We have this as its own function to make it easier to test (it's hard to instantiate a valid
-// pubsub message without the emulator because there are private members that need initializing).
-// It returns a bool that represents whether the message should be Ack'd (not retried) or Nack'd
-// (retried later).
-func (p *processor) processMessage(ctx context.Context, msgData []byte) bool {
-	defer metrics2.FuncTimer().Stop()
-	var wm diff.WorkerMessage
-	if err := json.Unmarshal(msgData, &wm); err != nil {
-		sklog.Errorf("Invalid message passed in: %s\n%s", err, string(msgData))
-		return true // ack this message so no other subscriber gets it (it will still be invalid).
-	}
-	if wm.Version != diff.WorkerMessageVersion {
-		return true // This is an old or a new message, skip it.
-	}
+// computeDiffsForPrimaryBranch fetches the grouping which has not had diff computation happen
+// in the longest time and that some other process is not currently working on.
+func (p *processor) computeDiffsForPrimaryBranch(ctx context.Context) (bool, error) {
 	// Prevent our workers from getting starved out with long-running tasks. Cancel them, an
 	// requeue them. CalculateDiffs should be streaming results, so we get some partial progress.
 	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
 	defer cancel()
-	err := p.calculator.CalculateDiffs(ctx, wm.Grouping, wm.AdditionalLeft, wm.AdditionalRight)
+	ctx, span := trace.StartSpan(ctx, "diffcalculator_computeDiffsForPrimaryBranch")
+	defer span.End()
+
+	hasWork := false
+	var groupingID schema.GroupingID
+
+	err := crdbpgx.ExecuteTx(ctx, p.db, pgx.TxOptions{}, func(tx pgx.Tx) error {
+		ts := now.Now(ctx)
+		const selectStatement = `SELECT grouping_id
+FROM PrimaryBranchDiffCalculationWork
+WHERE calculation_lease_ends < $1 AND last_calculated_ts < $2
+ORDER BY last_calculated_ts ASC
+LIMIT 1`
+		row := tx.QueryRow(ctx, selectStatement, ts, ts.Add(-1*primaryBranchStalenessThreshold))
+
+		if err := row.Scan(&groupingID); err != nil {
+			if err == pgx.ErrNoRows {
+				// We've calculated data for the entire primary branch to better than the threshold,
+				// so we return because there's nothing to do right now.
+				return nil
+			}
+			return err // don't wrap - might be retried
+		}
+
+		const updateStatement = `UPDATE PrimaryBranchDiffCalculationWork
+SET calculation_lease_ends = $2 WHERE grouping_id = $1`
+		if _, err := tx.Exec(ctx, updateStatement, groupingID, ts.Add(diffCalculationTimeout)); err != nil {
+			return err // don't wrap, might be retried
+		}
+		hasWork = true
+		return nil
+	})
 	if err != nil {
-		sklog.Errorf("Calculating diffs for %v: %s", wm, err)
-		return false // Let this be tried again.
+		return false, skerr.Wrap(err)
 	}
-	return true // successfully processed.
+	if !hasWork {
+		return true, nil
+	}
+	grouping, err := p.expandGrouping(ctx, groupingID)
+	if err != nil {
+		return false, skerr.Wrap(err)
+	}
+	if err := p.calculator.CalculateDiffs(ctx, grouping, nil); err != nil {
+		return false, skerr.Wrap(err)
+	}
+	p.primaryCounter.Inc(1)
+	return false, nil
 }
 
-func startMetrics(ctx context.Context, p *processor) {
-	// This metric will let us get a sense of how well-utilized this processor is. It reads the
-	// busy int of the processor (which is 0 or 1) and increments the counter with that value.
-	// Because we are updating the counter once per second, we can use rate() [which computes deltas
-	// per second] on this counter to get a number between 0 and 1 to indicate wall-clock
-	// utilization. Hopefully, this lets us know if we need to add more replicas.
-	go func() {
-		busy := metrics2.GetCounter("diffcalculator_busy_pulses")
-		for range time.Tick(time.Second) {
-			if err := ctx.Err(); err != nil {
-				return
-			}
-			busy.Inc(atomic.LoadInt64(&p.busy))
+// expandGrouping returns the params associated with the grouping id. It will use the cache - if
+// there is a cache miss, it will look it up, add it to the cache and return it.
+func (p *processor) expandGrouping(ctx context.Context, groupingID schema.GroupingID) (paramtools.Params, error) {
+	ctx, span := trace.StartSpan(ctx, "expandGrouping")
+	defer span.End()
+	var groupingKeys paramtools.Params
+	if gk, ok := p.groupingCache.Get(string(groupingID)); ok {
+		return gk.(paramtools.Params), nil
+	} else {
+		const statement = `SELECT keys FROM Groupings WHERE grouping_id = $1`
+		row := p.db.QueryRow(ctx, statement, groupingID)
+		if err := row.Scan(&groupingKeys); err != nil {
+			return nil, skerr.Wrap(err)
 		}
-	}()
+		p.groupingCache.Add(string(groupingID), groupingKeys)
+	}
+	return groupingKeys, nil
+}
+
+func (p *processor) computeDiffsForCL(ctx context.Context) error {
+	return skerr.Fmt("Not impl")
 }
diff --git a/golden/cmd/diffcalculator/diffcalculator_test.go b/golden/cmd/diffcalculator/diffcalculator_test.go
index d8eb252..d072710 100644
--- a/golden/cmd/diffcalculator/diffcalculator_test.go
+++ b/golden/cmd/diffcalculator/diffcalculator_test.go
@@ -2,77 +2,184 @@
 
 import (
 	"context"
+	"crypto/md5"
+	"encoding/json"
 	"testing"
+	"time"
 
+	lru "github.com/hashicorp/golang-lru"
+	"github.com/jackc/pgx/v4/pgxpool"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 
+	"go.skia.org/infra/go/now"
 	"go.skia.org/infra/go/paramtools"
-	"go.skia.org/infra/go/skerr"
 	"go.skia.org/infra/go/testutils"
 	"go.skia.org/infra/go/testutils/unittest"
+	"go.skia.org/infra/golden/go/diff"
 	"go.skia.org/infra/golden/go/diff/mocks"
+	"go.skia.org/infra/golden/go/sql/schema"
+	"go.skia.org/infra/golden/go/sql/sqltest"
 	"go.skia.org/infra/golden/go/types"
 )
 
-func TestProcessPubSubMessage_OldJSON_NoCalculation_Ack(t *testing.T) {
-	unittest.SmallTest(t)
+func TestComputeDiffsForPrimaryBranch_WorkAvailable_Success(t *testing.T) {
+	unittest.LargeTest(t)
 
-	p := processor{}
+	fakeNow := ts("2021-02-02T02:30:00Z")
 
-	messageBytes := []byte(`{"grouping":{"name":"any-test","other grouping":"something","source_type":"any-corpus"},"additional_digests":["abcd","ef123"]}`)
-	shouldAck := p.processMessage(context.Background(), messageBytes)
-	assert.True(t, shouldAck)
-}
+	existingData := schema.Tables{PrimaryBranchDiffCalculationWork: []schema.PrimaryBranchDiffCalculationRow{
+		{
+			GroupingID:           h(alphaGrouping), // available for work
+			LastCalculated:       ts("2021-02-02T02:15:00Z"),
+			CalculationLeaseEnds: ts("2021-02-02T02:14:00Z"),
+		},
+		{
+			GroupingID:           h(betaGrouping), // Too recently computed
+			LastCalculated:       ts("2021-02-02T02:29:50Z"),
+			CalculationLeaseEnds: ts("2021-02-02T02:14:00Z"),
+		},
+		{
+			GroupingID:           h(gammaGrouping), // another worker has it "leased"
+			LastCalculated:       ts("2021-02-02T02:25:00Z"),
+			CalculationLeaseEnds: ts("2021-02-02T02:37:00Z"),
+		},
+		{
+			GroupingID:           h(deltaGrouping), // available for work (oldest)
+			LastCalculated:       ts("2021-02-02T02:12:00Z"),
+			CalculationLeaseEnds: ts("2021-02-02T02:14:00Z"),
+		},
+	}, Groupings: makeGroupingRows(alphaGrouping, betaGrouping, gammaGrouping, deltaGrouping)}
 
-func TestProcessPubSubMessage_ValidJSON_CalculateSucceeds_Ack(t *testing.T) {
-	unittest.SmallTest(t)
+	ctx := context.WithValue(context.Background(), now.ContextKey, fakeNow)
+	db := sqltest.NewCockroachDBForTestsWithProductionSchema(ctx, t)
+	require.NoError(t, sqltest.BulkInsertDataTables(ctx, db, existingData))
 
-	mc := mocks.Calculator{}
+	mc := &mocks.Calculator{}
+	mc.On("CalculateDiffs", testutils.AnyContext, ps(deltaGrouping), noDigests).Return(nil)
 
-	expectedGrouping := paramtools.Params{
-		types.CorpusField:     "any-corpus",
-		types.PrimaryKeyField: "any-test",
-		"other grouping":      "something",
-	}
-	expectedLeftDigests := []types.Digest{"abcd", "ef123"}
-	expectedRightDigests := []types.Digest{"4567"}
+	s := processorForTest(mc, db)
 
-	mc.On("CalculateDiffs", testutils.AnyContext, expectedGrouping, expectedLeftDigests, expectedRightDigests).Return(nil)
+	shouldSleep, err := s.computeDiffsForPrimaryBranch(ctx)
+	require.NoError(t, err)
+	assert.False(t, shouldSleep)
 
-	p := processor{calculator: &mc}
-
-	messageBytes := []byte(`{"version":3,"grouping":{"name":"any-test","other grouping":"something","source_type":"any-corpus"},"additional_left":["abcd","ef123"],"additional_right":["4567"]}`)
-	shouldAck := p.processMessage(context.Background(), messageBytes)
-	assert.True(t, shouldAck)
 	mc.AssertExpectations(t)
+
+	actualWork := sqltest.GetAllRows(ctx, t, db, "PrimaryBranchDiffCalculationWork", &schema.PrimaryBranchDiffCalculationRow{})
+	assert.Contains(t, actualWork, schema.PrimaryBranchDiffCalculationRow{
+		GroupingID:           h(deltaGrouping),
+		LastCalculated:       ts("2021-02-02T02:12:00Z"),
+		CalculationLeaseEnds: ts("2021-02-02T02:40:00Z"), // This is the timeout + fakeNow
+	})
 }
 
-func TestProcessPubSubMessage_ValidJSON_CalculateFails_Nack(t *testing.T) {
-	unittest.SmallTest(t)
+func TestComputeDiffsForPrimaryBranch_NoWorkAvailable_ShouldSleep(t *testing.T) {
+	unittest.LargeTest(t)
 
-	mc := mocks.Calculator{}
+	fakeNow := ts("2021-02-02T02:30:00Z")
 
-	expectedGrouping := paramtools.Params{
-		types.CorpusField:     "any-corpus",
-		types.PrimaryKeyField: "any-test",
+	rowsThatShouldBeUnchanged := []schema.PrimaryBranchDiffCalculationRow{
+		{
+			GroupingID:           h(alphaGrouping), // another worker has it "leased"
+			LastCalculated:       ts("2021-02-02T02:15:00Z"),
+			CalculationLeaseEnds: ts("2021-02-02T02:34:00Z"),
+		},
+		{
+			GroupingID:           h(betaGrouping), // Too recently computed
+			LastCalculated:       ts("2021-02-02T02:29:50Z"),
+			CalculationLeaseEnds: ts("2021-02-02T02:14:00Z"),
+		},
+		{
+			GroupingID:           h(gammaGrouping), // another worker has it "leased"
+			LastCalculated:       ts("2021-02-02T02:25:00Z"),
+			CalculationLeaseEnds: ts("2021-02-02T02:37:00Z"),
+		},
+		{
+			GroupingID:           h(deltaGrouping), // Too recently computed
+			LastCalculated:       ts("2021-02-02T02:29:45Z"),
+			CalculationLeaseEnds: ts("2021-02-02T02:14:00Z"),
+		},
 	}
-	var noExpectedDigests []types.Digest
+	existingData := schema.Tables{PrimaryBranchDiffCalculationWork: rowsThatShouldBeUnchanged, Groupings: makeGroupingRows(alphaGrouping, betaGrouping, gammaGrouping, deltaGrouping)}
 
-	mc.On("CalculateDiffs", testutils.AnyContext, expectedGrouping, noExpectedDigests, noExpectedDigests).Return(skerr.Fmt("boom"))
+	ctx := context.WithValue(context.Background(), now.ContextKey, fakeNow)
+	db := sqltest.NewCockroachDBForTestsWithProductionSchema(ctx, t)
+	require.NoError(t, sqltest.BulkInsertDataTables(ctx, db, existingData))
 
-	p := processor{calculator: &mc}
+	s := processorForTest(nil, db)
 
-	messageBytes := []byte(`{"version":3,"grouping":{"name":"any-test","source_type":"any-corpus"}}`)
-	shouldAck := p.processMessage(context.Background(), messageBytes)
-	assert.False(t, shouldAck)
-	mc.AssertExpectations(t)
+	shouldSleep, err := s.computeDiffsForPrimaryBranch(ctx)
+	require.NoError(t, err)
+	assert.True(t, shouldSleep)
+
+	// We shouldn't have leased any work
+	actualWork := sqltest.GetAllRows(ctx, t, db, "PrimaryBranchDiffCalculationWork", &schema.PrimaryBranchDiffCalculationRow{})
+	assert.ElementsMatch(t, rowsThatShouldBeUnchanged, actualWork)
 }
 
-func TestProcessPubSubMessage_InvalidJSON_Ack(t *testing.T) {
-	unittest.SmallTest(t)
-
-	p := processor{}
-	messageBytes := []byte(`invalid json`)
-	shouldAck := p.processMessage(context.Background(), messageBytes)
-	assert.True(t, shouldAck)
+func processorForTest(c diff.Calculator, db *pgxpool.Pool) *processor {
+	cache, err := lru.New(100)
+	if err != nil {
+		panic(err)
+	}
+	return &processor{
+		db:             db,
+		calculator:     c,
+		groupingCache:  cache,
+		primaryCounter: fakeCounter{},
+		clsCounter:     fakeCounter{},
+	}
 }
+
+func makeGroupingRows(groupings ...string) []schema.GroupingRow {
+	rv := make([]schema.GroupingRow, 0, len(groupings))
+	for _, g := range groupings {
+		rv = append(rv, schema.GroupingRow{
+			GroupingID: h(g),
+			Keys:       ps(g),
+		})
+	}
+	return rv
+}
+
+const (
+	alphaGrouping = `{"name":"alpha","source_type":"corpus_one"}`
+	betaGrouping  = `{"name":"beta","source_type":"corpus_two"}`
+	gammaGrouping = `{"name":"gamma","source_type":"corpus_one"}`
+	deltaGrouping = `{"name":"delta","source_type":"corpus_two"}`
+)
+
+var (
+	noDigests []types.Digest
+)
+
+func ts(s string) time.Time {
+	t, err := time.Parse(time.RFC3339, s)
+	if err != nil {
+		panic(err)
+	}
+	return t
+}
+
+// h returns the MD5 hash of the provided string.
+func h(s string) []byte {
+	hash := md5.Sum([]byte(s))
+	return hash[:]
+}
+
+func ps(s string) paramtools.Params {
+	var rv paramtools.Params
+	if err := json.Unmarshal([]byte(s), &rv); err != nil {
+		panic(err)
+	}
+	return rv
+}
+
+type fakeCounter struct{}
+
+func (_ fakeCounter) Dec(_ int64)   {}
+func (_ fakeCounter) Delete() error { return nil }
+func (_ fakeCounter) Get() int64    { return 0 }
+func (_ fakeCounter) Inc(i int64)   {}
+func (_ fakeCounter) Reset()        {}
diff --git a/golden/go/diff/diff.go b/golden/go/diff/diff.go
index 6598ed9..1fcbc16 100644
--- a/golden/go/diff/diff.go
+++ b/golden/go/diff/diff.go
@@ -281,20 +281,11 @@
 
 type Calculator interface {
 	// CalculateDiffs recomputes all diffs for the current grouping, including any digests provided.
-	// Images (digests) will be sorted into two buckets, the left and right bucket. The left bucket
-	// is a superset of the right bucket. The right bucket consists of all triaged images
-	// for this grouping. The left bucket consists of *all* digests seen for this grouping.
-	// During search, a user will want to see the closest positive and negative image for a given
-	// image. By splitting the images into two different buckets, we do less precomputation. For
-	// example, if there are several flaky traces in a grouping, it can be that 10% of the overall
-	// images for a grouping are triaged and 90% are *not* (these typically come from ignored traces
-	// because they produce something different during most commits). In such a case, computing a
-	// given untriaged digest from a trace against a different untriaged digest is a waste since it
-	// won't show up in the search results. By splitting the images into two buckets, we can
-	// dramatically reduce the computation done over a naive N x N comparison scheme.
-	CalculateDiffs(ctx context.Context, grouping paramtools.Params, additionalLeft, additionalRight []types.Digest) error
+	CalculateDiffs(ctx context.Context, grouping paramtools.Params, additional []types.Digest) error
 }
 
+// TODO(kjlubick) remove WorkerMesssage and WorkerMessageVersion
+
 // WorkerMessageVersion is the current version of the WorkerMessage JSON.
 const WorkerMessageVersion = 3
 
diff --git a/golden/go/diff/mocks/Calculator.go b/golden/go/diff/mocks/Calculator.go
index 11feee1..ba897bf 100644
--- a/golden/go/diff/mocks/Calculator.go
+++ b/golden/go/diff/mocks/Calculator.go
@@ -17,13 +17,13 @@
 	mock.Mock
 }
 
-// CalculateDiffs provides a mock function with given fields: ctx, grouping, additionalLeft, additionalRight
-func (_m *Calculator) CalculateDiffs(ctx context.Context, grouping paramtools.Params, additionalLeft []types.Digest, additionalRight []types.Digest) error {
-	ret := _m.Called(ctx, grouping, additionalLeft, additionalRight)
+// CalculateDiffs provides a mock function with given fields: ctx, grouping, additional
+func (_m *Calculator) CalculateDiffs(ctx context.Context, grouping paramtools.Params, additional []types.Digest) error {
+	ret := _m.Called(ctx, grouping, additional)
 
 	var r0 error
-	if rf, ok := ret.Get(0).(func(context.Context, paramtools.Params, []types.Digest, []types.Digest) error); ok {
-		r0 = rf(ctx, grouping, additionalLeft, additionalRight)
+	if rf, ok := ret.Get(0).(func(context.Context, paramtools.Params, []types.Digest) error); ok {
+		r0 = rf(ctx, grouping, additional)
 	} else {
 		r0 = ret.Error(0)
 	}
diff --git a/golden/go/diff/worker/worker.go b/golden/go/diff/worker/worker.go
index ff86ce4..aeabe91 100644
--- a/golden/go/diff/worker/worker.go
+++ b/golden/go/diff/worker/worker.go
@@ -117,7 +117,7 @@
 func (w *WorkerImpl) CalculateDiffs(ctx context.Context, grouping paramtools.Params, addLeft, addRight []types.Digest) error {
 	ctx, span := trace.StartSpan(ctx, "CalculateDiffs")
 	if span.IsRecordingEvents() {
-		addMetadata(span, grouping, len(addLeft), len(addRight))
+		addMetadata(span, grouping, len(addLeft))
 	}
 	defer span.End()
 	startingTile, err := w.getStartingTile(ctx)
@@ -280,12 +280,11 @@
 
 // addMetadata adds some attributes to the span so we can tell how much work it was supposed to
 // be doing when we are looking at the traces and the performance.
-func addMetadata(span *trace.Span, grouping paramtools.Params, leftDigestCount, rightDigestCount int) {
+func addMetadata(span *trace.Span, grouping paramtools.Params, leftDigestCount int) {
 	groupingStr, _ := json.Marshal(grouping)
 	span.AddAttributes(
 		trace.StringAttribute("grouping", string(groupingStr)),
-		trace.Int64Attribute("left_digests", int64(leftDigestCount)),
-		trace.Int64Attribute("right_digests", int64(rightDigestCount)))
+		trace.Int64Attribute("additional_digests", int64(leftDigestCount)))
 }
 
 // getStartingTile returns the commit ID which is the beginning of the tile of interest (so we
@@ -609,6 +608,3 @@
 	}
 	return diff.GetNRGBA(im), nil
 }
-
-// Make sure WorkerImpl fulfills the diff.Calculator interface.
-var _ diff.Calculator = (*WorkerImpl)(nil)
diff --git a/golden/go/diff/worker/worker2.go b/golden/go/diff/worker/worker2.go
index 9e3255f..a76344e 100644
--- a/golden/go/diff/worker/worker2.go
+++ b/golden/go/diff/worker/worker2.go
@@ -60,10 +60,10 @@
 // CalculateDiffs calculates the diffs for the given grouping. It either computes all of the diffs
 // if there are only "a few" digests, otherwise it computes a subset of them, taking into account
 // recency and triage status.
-func (w *WorkerImpl2) CalculateDiffs(ctx context.Context, grouping paramtools.Params, addLeft, addRight []types.Digest) error {
+func (w *WorkerImpl2) CalculateDiffs(ctx context.Context, grouping paramtools.Params, additional []types.Digest) error {
 	ctx, span := trace.StartSpan(ctx, "worker2_CalculateDiffs")
 	if span.IsRecordingEvents() {
-		addMetadata(span, grouping, len(addLeft), len(addRight))
+		addMetadata(span, grouping, len(additional))
 	}
 	defer span.End()
 	startingTile, err := w.getStartingTile(ctx)
@@ -75,9 +75,9 @@
 		return skerr.Wrap(err)
 	}
 
-	total := len(allDigests) + len(addLeft) + len(addRight)
+	total := len(allDigests) + len(additional)
 	w.inputDigestsSummary.Observe(float64(total))
-	inputDigests := convertToDigestBytes(append(addLeft, addRight...))
+	inputDigests := convertToDigestBytes(additional)
 	if total > computeTotalGridCutoff {
 		// If there are too many digests, we perform a somewhat expensive operation of looking at
 		// the digests produced by all traces to find a smaller subset of images that we should
@@ -85,8 +85,7 @@
 		// a small percentage of groupings (i.e. tests) to have many digests.
 		return skerr.Wrap(w.calculateDiffSubset(ctx, grouping, inputDigests, startingTile))
 	}
-	allDigests = addDigests(allDigests, addLeft)
-	allDigests = addDigests(allDigests, addRight)
+	allDigests = append(allDigests, inputDigests...)
 	return skerr.Wrap(w.calculateAllDiffs(ctx, allDigests))
 }
 
@@ -105,22 +104,6 @@
 	return rv
 }
 
-// addDigests adds the given hex-encoded digests to the slice of bytes.
-func addDigests(digests []schema.DigestBytes, additional []types.Digest) []schema.DigestBytes {
-	if len(additional) == 0 {
-		return digests
-	}
-	for _, d := range additional {
-		b, err := sql.DigestToBytes(d)
-		if err != nil {
-			sklog.Warningf("Invalid digest seen %q: %s", d, err)
-			continue
-		}
-		digests = append(digests, b)
-	}
-	return digests
-}
-
 // getStartingTile returns the commit ID which is the beginning of the tile of interest (so we
 // get enough data to do our comparisons).
 func (w *WorkerImpl2) getStartingTile(ctx context.Context) (schema.TileID, error) {
diff --git a/golden/go/diff/worker/worker2_test.go b/golden/go/diff/worker/worker2_test.go
index 1f4a8b9..579f1a8 100644
--- a/golden/go/diff/worker/worker2_test.go
+++ b/golden/go/diff/worker/worker2_test.go
@@ -37,7 +37,7 @@
 		types.PrimaryKeyField: "not used",
 	}
 	imagesToCalculateDiffsFor := []types.Digest{dks.DigestA01Pos, dks.DigestA02Pos, dks.DigestA04Unt, dks.DigestA05Unt}
-	require.NoError(t, w.CalculateDiffs(ctx, grouping, imagesToCalculateDiffsFor, imagesToCalculateDiffsFor))
+	require.NoError(t, w.CalculateDiffs(ctx, grouping, imagesToCalculateDiffsFor))
 
 	actualMetrics := getAllDiffMetricRows(t, db)
 	assert.Equal(t, []schema.DiffMetricRow{
@@ -77,7 +77,7 @@
 		types.CorpusField:     dks.CornersCorpus,
 		types.PrimaryKeyField: dks.TriangleTest,
 	}
-	require.NoError(t, w.CalculateDiffs(ctx, grouping, nil, nil))
+	require.NoError(t, w.CalculateDiffs(ctx, grouping, nil))
 
 	actualMetrics := getAllDiffMetricRows(t, db)
 	assert.Equal(t, []schema.DiffMetricRow{
@@ -131,7 +131,7 @@
 	for i := 0; i < computeTotalGridCutoff+1; i++ {
 		extraDigests = append(extraDigests, dks.DigestBlank)
 	}
-	require.NoError(t, w.CalculateDiffs(ctx, grouping, extraDigests, nil))
+	require.NoError(t, w.CalculateDiffs(ctx, grouping, extraDigests))
 
 	actualMetrics := getAllDiffMetricRows(t, db)
 	assert.Equal(t, []schema.DiffMetricRow{
@@ -179,7 +179,7 @@
 		types.CorpusField:     dks.RoundCorpus,
 		types.PrimaryKeyField: dks.CircleTest,
 	}
-	require.NoError(t, w.CalculateDiffs(ctx, grouping, []types.Digest{dks.DigestC06Pos_CL}, []types.Digest{dks.DigestC06Pos_CL}))
+	require.NoError(t, w.CalculateDiffs(ctx, grouping, []types.Digest{dks.DigestC06Pos_CL}))
 
 	actualMetrics := getAllDiffMetricRows(t, db)
 	assert.Equal(t, []schema.DiffMetricRow{
@@ -226,7 +226,7 @@
 		types.PrimaryKeyField: "not used",
 	}
 	imagesToCalculateDiffsFor := []types.Digest{dks.DigestA01Pos, dks.DigestA02Pos, dks.DigestA04Unt}
-	require.NoError(t, w.CalculateDiffs(ctx, grouping, imagesToCalculateDiffsFor, imagesToCalculateDiffsFor))
+	require.NoError(t, w.CalculateDiffs(ctx, grouping, imagesToCalculateDiffsFor))
 
 	actualMetrics := getAllDiffMetricRows(t, db)
 	// We should see partial success
@@ -278,7 +278,7 @@
 		types.PrimaryKeyField: "not used",
 	}
 	imagesToCalculateDiffsFor := []types.Digest{dks.DigestA01Pos, dks.DigestA02Pos, dks.DigestA04Unt}
-	require.NoError(t, w.CalculateDiffs(ctx, grouping, imagesToCalculateDiffsFor, imagesToCalculateDiffsFor))
+	require.NoError(t, w.CalculateDiffs(ctx, grouping, imagesToCalculateDiffsFor))
 
 	actualMetrics := getAllDiffMetricRows(t, db)
 	// We should see partial success
diff --git a/golden/go/sql/schema/BUILD.bazel b/golden/go/sql/schema/BUILD.bazel
index bc9c5d7..399b432 100644
--- a/golden/go/sql/schema/BUILD.bazel
+++ b/golden/go/sql/schema/BUILD.bazel
@@ -13,6 +13,7 @@
         "//go/paramtools",
         "//go/skerr",
         "//golden/go/expectations",
+        "//golden/go/types",
         "@com_github_google_uuid//:uuid",
         "@com_github_jackc_pgtype//:pgtype",
     ],
diff --git a/golden/go/sql/schema/sql.go b/golden/go/sql/schema/sql.go
index 50288db..fcc2fa2 100644
--- a/golden/go/sql/schema/sql.go
+++ b/golden/go/sql/schema/sql.go
@@ -91,6 +91,11 @@
   created_ts TIMESTAMP WITH TIME ZONE,
   INDEX cl_order_idx (changelist_id, ps_order)
 );
+CREATE TABLE IF NOT EXISTS PrimaryBranchDiffCalculationWork (
+  grouping_id BYTES PRIMARY KEY,
+  last_calculated_ts TIMESTAMP WITH TIME ZONE NOT NULL,
+  calculation_lease_ends TIMESTAMP WITH TIME ZONE NOT NULL
+);
 CREATE TABLE IF NOT EXISTS PrimaryBranchParams (
   tile_id INT4,
   key STRING,
@@ -103,6 +108,15 @@
   latest_error STRING NOT NULL,
   error_ts TIMESTAMP WITH TIME ZONE NOT NULL
 );
+CREATE TABLE IF NOT EXISTS SecondaryBranchDiffCalculationWork (
+  branch_name STRING,
+  grouping_id BYTES,
+  last_updated_ts TIMESTAMP WITH TIME ZONE NOT NULL,
+  digests STRING[] NOT NULL,
+  last_calculated_ts TIMESTAMP WITH TIME ZONE NOT NULL,
+  calculation_lease_ends TIMESTAMP WITH TIME ZONE NOT NULL,
+  PRIMARY KEY (branch_name, grouping_id)
+);
 CREATE TABLE IF NOT EXISTS SecondaryBranchExpectations (
   branch_name STRING,
   grouping_id BYTES,
@@ -143,10 +157,6 @@
   INDEX grouping_digest_idx (grouping_id, digest),
   INDEX tile_trace_idx (tile_id, trace_id)
 );
-CREATE TABLE IF NOT EXISTS TrackingCommits (
-  repo STRING PRIMARY KEY,
-  last_git_hash STRING NOT NULL
-);
 CREATE TABLE IF NOT EXISTS TraceValues (
   shard INT2,
   trace_id BYTES,
@@ -168,6 +178,10 @@
   INDEX ignored_grouping_idx (matches_any_ignore_rule, grouping_id),
   INVERTED INDEX keys_idx (keys)
 );
+CREATE TABLE IF NOT EXISTS TrackingCommits (
+  repo STRING PRIMARY KEY,
+  last_git_hash STRING NOT NULL
+);
 CREATE TABLE IF NOT EXISTS Tryjobs (
   tryjob_id STRING PRIMARY KEY,
   system STRING NOT NULL,
diff --git a/golden/go/sql/schema/tables.go b/golden/go/sql/schema/tables.go
index 46d8fa8..1463504 100644
--- a/golden/go/sql/schema/tables.go
+++ b/golden/go/sql/schema/tables.go
@@ -4,6 +4,8 @@
 	"crypto/md5"
 	"time"
 
+	"go.skia.org/infra/golden/go/types"
+
 	"github.com/google/uuid"
 	"github.com/jackc/pgtype"
 
@@ -118,30 +120,32 @@
 // is turned into an actual SQL statement.
 //go:generate go run ../exporter/tosql --output_file sql.go --logtostderr --output_pkg schema
 type Tables struct {
-	Changelists                 []ChangelistRow                 `sql_backup:"weekly"`
-	CommitsWithData             []CommitWithDataRow             `sql_backup:"daily"`
-	DiffMetrics                 []DiffMetricRow                 `sql_backup:"monthly"`
-	ExpectationDeltas           []ExpectationDeltaRow           `sql_backup:"daily"`
-	ExpectationRecords          []ExpectationRecordRow          `sql_backup:"daily"`
-	Expectations                []ExpectationRow                `sql_backup:"daily"`
-	GitCommits                  []GitCommitRow                  `sql_backup:"daily"`
-	Groupings                   []GroupingRow                   `sql_backup:"monthly"`
-	IgnoreRules                 []IgnoreRuleRow                 `sql_backup:"daily"`
-	MetadataCommits             []MetadataCommitRow             `sql_backup:"daily"`
-	Options                     []OptionsRow                    `sql_backup:"monthly"`
-	Patchsets                   []PatchsetRow                   `sql_backup:"weekly"`
-	PrimaryBranchParams         []PrimaryBranchParamRow         `sql_backup:"monthly"`
-	ProblemImages               []ProblemImageRow               `sql_backup:"none"`
-	SecondaryBranchExpectations []SecondaryBranchExpectationRow `sql_backup:"daily"`
-	SecondaryBranchParams       []SecondaryBranchParamRow       `sql_backup:"monthly"`
-	SecondaryBranchValues       []SecondaryBranchValueRow       `sql_backup:"monthly"`
-	SourceFiles                 []SourceFileRow                 `sql_backup:"monthly"`
-	TiledTraceDigests           []TiledTraceDigestRow           `sql_backup:"monthly"`
-	TrackingCommits             []TrackingCommitRow             `sql_backup:"daily"`
-	TraceValues                 []TraceValueRow                 `sql_backup:"monthly"`
-	Traces                      []TraceRow                      `sql_backup:"monthly"`
-	Tryjobs                     []TryjobRow                     `sql_backup:"weekly"`
-	ValuesAtHead                []ValueAtHeadRow                `sql_backup:"monthly"`
+	Changelists                        []ChangelistRow                     `sql_backup:"weekly"`
+	CommitsWithData                    []CommitWithDataRow                 `sql_backup:"daily"`
+	DiffMetrics                        []DiffMetricRow                     `sql_backup:"monthly"`
+	ExpectationDeltas                  []ExpectationDeltaRow               `sql_backup:"daily"`
+	ExpectationRecords                 []ExpectationRecordRow              `sql_backup:"daily"`
+	Expectations                       []ExpectationRow                    `sql_backup:"daily"`
+	GitCommits                         []GitCommitRow                      `sql_backup:"daily"`
+	Groupings                          []GroupingRow                       `sql_backup:"monthly"`
+	IgnoreRules                        []IgnoreRuleRow                     `sql_backup:"daily"`
+	MetadataCommits                    []MetadataCommitRow                 `sql_backup:"daily"`
+	Options                            []OptionsRow                        `sql_backup:"monthly"`
+	Patchsets                          []PatchsetRow                       `sql_backup:"weekly"`
+	PrimaryBranchDiffCalculationWork   []PrimaryBranchDiffCalculationRow   `sql_backup:"none"`
+	PrimaryBranchParams                []PrimaryBranchParamRow             `sql_backup:"monthly"`
+	ProblemImages                      []ProblemImageRow                   `sql_backup:"none"`
+	SecondaryBranchDiffCalculationWork []SecondaryBranchDiffCalculationRow `sql_backup:"none"`
+	SecondaryBranchExpectations        []SecondaryBranchExpectationRow     `sql_backup:"daily"`
+	SecondaryBranchParams              []SecondaryBranchParamRow           `sql_backup:"monthly"`
+	SecondaryBranchValues              []SecondaryBranchValueRow           `sql_backup:"monthly"`
+	SourceFiles                        []SourceFileRow                     `sql_backup:"monthly"`
+	TiledTraceDigests                  []TiledTraceDigestRow               `sql_backup:"monthly"`
+	TraceValues                        []TraceValueRow                     `sql_backup:"monthly"`
+	Traces                             []TraceRow                          `sql_backup:"monthly"`
+	TrackingCommits                    []TrackingCommitRow                 `sql_backup:"daily"`
+	Tryjobs                            []TryjobRow                         `sql_backup:"weekly"`
+	ValuesAtHead                       []ValueAtHeadRow                    `sql_backup:"monthly"`
 
 	// DeprecatedIngestedFiles allows us to keep track of files ingested with the old FS/BT ways
 	// until all the SQL ingestion is ready.
@@ -1046,3 +1050,67 @@
 func (r *MetadataCommitRow) ScanFrom(scan func(...interface{}) error) error {
 	return scan(&r.CommitID, &r.CommitMetadata)
 }
+
+// PrimaryBranchDiffCalculationRow represents a grouping for which we need to compute diffs using
+// digests seen on the primary branch. There are intentionally no indexes here for two reasons
+// 1) indexing monotonically increasing columns can cause issues, especially with changing data.
+// 2) for a typical instance, we don't expect there to be that many rows (thousands), so a full
+//    table scan shouldn't be too expensive.
+type PrimaryBranchDiffCalculationRow struct {
+	// GroupingID is the grouping for which we should compute diffs.
+	GroupingID GroupingID `sql:"grouping_id BYTES PRIMARY KEY"`
+	// LastCalculated is the timestamp at which we last calculated diffs for the grouping.
+	LastCalculated time.Time `sql:"last_calculated_ts TIMESTAMP WITH TIME ZONE NOT NULL"`
+	// CalculationLeaseEnds is set to a future time when a worker starts computing diffs on this
+	// grouping. It allows workers to not do the same computation at the same time.
+	CalculationLeaseEnds time.Time `sql:"calculation_lease_ends TIMESTAMP WITH TIME ZONE NOT NULL"`
+}
+
+// ToSQLRow implements the sqltest.SQLExporter interface.
+func (r PrimaryBranchDiffCalculationRow) ToSQLRow() (colNames []string, colData []interface{}) {
+	return []string{"grouping_id", "last_calculated_ts", "calculation_lease_ends"},
+		[]interface{}{r.GroupingID, r.LastCalculated, r.CalculationLeaseEnds}
+}
+
+// ScanFrom implements the sqltest.SQLScanner interface.
+func (r *PrimaryBranchDiffCalculationRow) ScanFrom(scan func(...interface{}) error) error {
+	err := scan(&r.GroupingID, &r.LastCalculated, &r.CalculationLeaseEnds)
+	if err != nil {
+		return skerr.Wrap(err)
+	}
+	r.LastCalculated = r.LastCalculated.UTC()
+	r.CalculationLeaseEnds = r.CalculationLeaseEnds.UTC()
+	return nil
+}
+
+// SecondaryBranchDiffCalculationRow represents a grouping for a CL for which wee need to compute
+// diffs for using digests from that CL as well as the digests on the primary branch.
+type SecondaryBranchDiffCalculationRow struct {
+	BranchName string `sql:"branch_name STRING"`
+
+	GroupingID GroupingID `sql:"grouping_id BYTES"`
+
+	LastUpdated time.Time `sql:"last_updated_ts TIMESTAMP WITH TIME ZONE NOT NULL"`
+
+	DigestsNotOnPrimary []types.Digest `sql:"digests STRING[] NOT NULL"`
+
+	LastCalculated time.Time `sql:"last_calculated_ts TIMESTAMP WITH TIME ZONE NOT NULL"`
+
+	CalculationLeaseEnds time.Time `sql:"calculation_lease_ends TIMESTAMP WITH TIME ZONE NOT NULL"`
+
+	primaryKey struct{} `sql:"PRIMARY KEY (branch_name, grouping_id)"`
+}
+
+// ToSQLRow implements the sqltest.SQLExporter interface.
+func (r SecondaryBranchDiffCalculationRow) ToSQLRow() (colNames []string, colData []interface{}) {
+	return []string{"branch_name", "grouping_id", "last_updated_ts", "digests",
+			"last_calculated_ts", "calculation_lease_ends"},
+		[]interface{}{r.BranchName, r.GroupingID, r.LastUpdated, r.DigestsNotOnPrimary,
+			r.LastCalculated, r.CalculationLeaseEnds}
+}
+
+// ScanFrom implements the sqltest.SQLScanner interface.
+func (r *SecondaryBranchDiffCalculationRow) ScanFrom(scan func(...interface{}) error) error {
+	return scan(&r.BranchName, &r.GroupingID, &r.LastUpdated, &r.DigestsNotOnPrimary,
+		&r.LastCalculated, &r.CalculationLeaseEnds)
+}