[gold] Support SQL ingestion on primary branch when commit_id specified

Previously, I had written support for ingesting from CLs for ChromeOS,
but not the primary branch. This addresses that for the new SQL system.

Bug: skia:10582, skia:11367
Change-Id: I79d92c2cf0bdd02b0b82e0dd4952f4ff3eafc22e
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/402918
Reviewed-by: Leandro Lovisolo <lovisolo@google.com>
diff --git a/golden/go/ingestion_processors/primarysql.go b/golden/go/ingestion_processors/primarysql.go
index 7a4f146..9752d1e 100644
--- a/golden/go/ingestion_processors/primarysql.go
+++ b/golden/go/ingestion_processors/primarysql.go
@@ -172,30 +172,45 @@
 func (s *sqlPrimaryIngester) getCommitAndTileID(ctx context.Context, gr *jsonio.GoldResults) (schema.CommitID, schema.TileID, error) {
 	ctx, span := trace.StartSpan(ctx, "getCommitAndTileID")
 	defer span.End()
-	if gr.GitHash == "" {
-		// should never happen, assuming the jsonio.Validate works
-		return "", 0, skerr.Fmt("missing GitHash")
-	}
-	if c, ok := s.commitsCache.Get(gr.GitHash); ok {
-		cce, ok := c.(commitCacheEntry)
-		if ok {
-			return cce.commitID, cce.tileID, nil
-		}
-		sklog.Warningf("Corrupt entry in commits cache: %#v", c)
-		s.commitsCache.Remove(gr.GitHash)
-	}
-	// Cache miss - go to DB; We can't assume it's in the CommitsWithData table yet.
-	row := s.db.QueryRow(ctx, `SELECT commit_id FROM GitCommits WHERE git_hash = $1`, gr.GitHash)
 	var commitID schema.CommitID
-	if err := row.Scan(&commitID); err != nil {
-		return "", 0, skerr.Wrapf(err, "looking up git_hash = %q", gr.GitHash)
+	var key string
+	if gr.GitHash != "" {
+		key = gr.GitHash
+		if c, ok := s.commitsCache.Get(key); ok {
+			cce, ok := c.(commitCacheEntry)
+			if ok {
+				return cce.commitID, cce.tileID, nil
+			}
+			sklog.Warningf("Corrupt entry in commits cache: %#v", c)
+			s.commitsCache.Remove(key)
+		}
+		// Cache miss - go to DB; We can't assume it's in the CommitsWithData table yet.
+		row := s.db.QueryRow(ctx, `SELECT commit_id FROM GitCommits WHERE git_hash = $1`, gr.GitHash)
+		if err := row.Scan(&commitID); err != nil {
+			return "", 0, skerr.Wrapf(err, "looking up git_hash = %q", gr.GitHash)
+		}
+	} else if gr.CommitID != "" && gr.CommitMetadata != "" {
+		key = gr.CommitID
+		if _, ok := s.commitsCache.Get(key); !ok {
+			// On a cache miss, make sure we store it to the table.
+			const statement = `INSERT INTO MetadataCommits (commit_id, commit_metadata)
+VALUES ($1, $2) ON CONFLICT DO NOTHING`
+			_, err := s.db.Exec(ctx, statement, gr.CommitID, gr.CommitMetadata)
+			if err != nil {
+				return "", 0, skerr.Wrapf(err, "storing metadata commit %q : %q", gr.CommitID, gr.CommitMetadata)
+			}
+		}
+		commitID = schema.CommitID(gr.CommitID)
+	} else {
+		// should never happen, assuming the jsonio.Validate works
+		return "", 0, skerr.Fmt("missing GitHash and CommitID/CommitMetadata")
 	}
 
 	tileID, err := s.getAndWriteTileIDForCommit(ctx, commitID)
 	if err != nil {
 		return "", 0, skerr.Wrapf(err, "computing tile id for %s", commitID)
 	}
-	s.updateCommitCache(gr, commitID, tileID)
+	s.updateCommitCache(gr, key, commitID, tileID)
 	return commitID, tileID, nil
 }
 
@@ -275,8 +290,8 @@
 
 // updateCommitCache updates the local cache of "CommitsWithData" if necessary (so we know we do
 // not have to write to the table the next time).
-func (s *sqlPrimaryIngester) updateCommitCache(gr *jsonio.GoldResults, id schema.CommitID, tileID schema.TileID) {
-	s.commitsCache.Add(gr.GitHash, commitCacheEntry{
+func (s *sqlPrimaryIngester) updateCommitCache(gr *jsonio.GoldResults, key string, id schema.CommitID, tileID schema.TileID) {
+	s.commitsCache.Add(key, commitCacheEntry{
 		commitID: id,
 		tileID:   tileID,
 	})
diff --git a/golden/go/ingestion_processors/primarysql_test.go b/golden/go/ingestion_processors/primarysql_test.go
index 3dfff50..bbc0b62 100644
--- a/golden/go/ingestion_processors/primarysql_test.go
+++ b/golden/go/ingestion_processors/primarysql_test.go
@@ -244,6 +244,228 @@
 	assert.Equal(t, resultsMetricBefore+3, s.resultsIngested.Get())
 }
 
+func TestPrimarySQL_Process_CommitIDSpecified_Success(t *testing.T) {
+	unittest.LargeTest(t)
+	ctx := context.Background()
+	db := sqltest.NewCockroachDBForTestsWithProductionSchema(ctx, t)
+
+	// This file has data from 3 traces across 2 corpora. The data is from the third commit.
+	const squareTraceKeys = `{"color mode":"RGB","device":"QuadroP400","name":"square","os":"Windows10.2","source_type":"corners"}`
+	const triangleTraceKeys = `{"color mode":"RGB","device":"QuadroP400","name":"triangle","os":"Windows10.2","source_type":"corners"}`
+	const circleTraceKeys = `{"color mode":"RGB","device":"QuadroP400","name":"circle","os":"Windows10.2","source_type":"round"}`
+	const expectedCommitID = "0000000100"
+	src := fakeGCSSourceFromFile(t, "has_metadata.json")
+	s := PrimaryBranchSQL(src, map[string]string{sqlTileWidthConfig: "5"}, db)
+	totalMetricBefore := s.filesProcessed.Get()
+	successMetricBefore := s.filesSuccess.Get()
+	resultsMetricBefore := s.resultsIngested.Get()
+
+	ctx = overwriteNow(ctx, fakeIngestionTime)
+	err := s.Process(ctx, dks.WindowsFile3)
+	require.NoError(t, err)
+
+	actualSourceFiles := sqltest.GetAllRows(ctx, t, db, "SourceFiles", &schema.SourceFileRow{}).([]schema.SourceFileRow)
+	assert.Equal(t, []schema.SourceFileRow{{
+		SourceFileID: h(dks.WindowsFile3),
+		SourceFile:   dks.WindowsFile3,
+		LastIngested: fakeIngestionTime,
+	}}, actualSourceFiles)
+
+	actualGroupings := sqltest.GetAllRows(ctx, t, db, "Groupings", &schema.GroupingRow{}).([]schema.GroupingRow)
+	assert.ElementsMatch(t, []schema.GroupingRow{{
+		GroupingID: h(circleGrouping),
+		Keys: map[string]string{
+			types.CorpusField:     dks.RoundCorpus,
+			types.PrimaryKeyField: dks.CircleTest,
+		},
+	}, {
+		GroupingID: h(squareGrouping),
+		Keys: map[string]string{
+			types.CorpusField:     dks.CornersCorpus,
+			types.PrimaryKeyField: dks.SquareTest,
+		},
+	}, {
+		GroupingID: h(triangleGrouping),
+		Keys: map[string]string{
+			types.CorpusField:     dks.CornersCorpus,
+			types.PrimaryKeyField: dks.TriangleTest,
+		},
+	}}, actualGroupings)
+
+	actualOptions := sqltest.GetAllRows(ctx, t, db, "Options", &schema.OptionsRow{}).([]schema.OptionsRow)
+	assert.ElementsMatch(t, []schema.OptionsRow{{
+		OptionsID: h(pngOptions),
+		Keys: map[string]string{
+			"ext": "png",
+		},
+	}}, actualOptions)
+
+	actualTraces := sqltest.GetAllRows(ctx, t, db, "Traces", &schema.TraceRow{}).([]schema.TraceRow)
+	assert.Equal(t, []schema.TraceRow{{
+		TraceID:    h(circleTraceKeys),
+		Corpus:     dks.RoundCorpus,
+		GroupingID: h(circleGrouping),
+		Keys: map[string]string{
+			types.CorpusField:     dks.RoundCorpus,
+			types.PrimaryKeyField: dks.CircleTest,
+			dks.ColorModeKey:      dks.RGBColorMode,
+			dks.OSKey:             dks.Windows10dot2OS,
+			dks.DeviceKey:         dks.QuadroDevice,
+		},
+		MatchesAnyIgnoreRule: schema.NBNull,
+	}, {
+		TraceID:    h(squareTraceKeys),
+		Corpus:     dks.CornersCorpus,
+		GroupingID: h(squareGrouping),
+		Keys: map[string]string{
+			types.CorpusField:     dks.CornersCorpus,
+			types.PrimaryKeyField: dks.SquareTest,
+			dks.ColorModeKey:      dks.RGBColorMode,
+			dks.OSKey:             dks.Windows10dot2OS,
+			dks.DeviceKey:         dks.QuadroDevice,
+		},
+		MatchesAnyIgnoreRule: schema.NBNull,
+	}, {
+		TraceID:    h(triangleTraceKeys),
+		Corpus:     dks.CornersCorpus,
+		GroupingID: h(triangleGrouping),
+		Keys: map[string]string{
+			types.CorpusField:     dks.CornersCorpus,
+			types.PrimaryKeyField: dks.TriangleTest,
+			dks.ColorModeKey:      dks.RGBColorMode,
+			dks.OSKey:             dks.Windows10dot2OS,
+			dks.DeviceKey:         dks.QuadroDevice,
+		},
+		MatchesAnyIgnoreRule: schema.NBNull,
+	}}, actualTraces)
+
+	actualCommitsWithData := sqltest.GetAllRows(ctx, t, db, "CommitsWithData", &schema.CommitWithDataRow{}).([]schema.CommitWithDataRow)
+	assert.Equal(t, []schema.CommitWithDataRow{{
+		CommitID: expectedCommitID,
+		TileID:   0,
+	}}, actualCommitsWithData)
+
+	metadataCommits := sqltest.GetAllRows(ctx, t, db, "MetadataCommits", &schema.MetadataCommitRow{}).([]schema.MetadataCommitRow)
+	assert.Equal(t, []schema.MetadataCommitRow{{
+		CommitID:       expectedCommitID,
+		CommitMetadata: "gs://example/commit/77e1d3255fde0bebc80f596f6a1c4ad79c8b8bbe",
+	}}, metadataCommits)
+
+	actualTraceValues := sqltest.GetAllRows(ctx, t, db, "TraceValues", &schema.TraceValueRow{}).([]schema.TraceValueRow)
+	assert.ElementsMatch(t, []schema.TraceValueRow{{
+		Shard:        0x4,
+		TraceID:      h(squareTraceKeys),
+		CommitID:     expectedCommitID,
+		Digest:       d(dks.DigestA01Pos),
+		GroupingID:   h(squareGrouping),
+		OptionsID:    h(pngOptions),
+		SourceFileID: h(dks.WindowsFile3),
+	}, {
+		Shard:        0x3,
+		TraceID:      h(triangleTraceKeys),
+		CommitID:     expectedCommitID,
+		Digest:       d(dks.DigestB01Pos),
+		GroupingID:   h(triangleGrouping),
+		OptionsID:    h(pngOptions),
+		SourceFileID: h(dks.WindowsFile3),
+	}, {
+		Shard:        0x7,
+		TraceID:      h(circleTraceKeys),
+		CommitID:     expectedCommitID,
+		Digest:       d(dks.DigestC01Pos),
+		GroupingID:   h(circleGrouping),
+		OptionsID:    h(pngOptions),
+		SourceFileID: h(dks.WindowsFile3),
+	}}, actualTraceValues)
+
+	actualValuesAtHead := sqltest.GetAllRows(ctx, t, db, "ValuesAtHead", &schema.ValueAtHeadRow{}).([]schema.ValueAtHeadRow)
+	assert.ElementsMatch(t, []schema.ValueAtHeadRow{{
+		TraceID:            h(squareTraceKeys),
+		MostRecentCommitID: expectedCommitID,
+		Digest:             d(dks.DigestA01Pos),
+		OptionsID:          h(pngOptions),
+		GroupingID:         h(squareGrouping),
+		Corpus:             dks.CornersCorpus,
+		Keys: map[string]string{
+			types.CorpusField:     dks.CornersCorpus,
+			types.PrimaryKeyField: dks.SquareTest,
+			dks.ColorModeKey:      dks.RGBColorMode,
+			dks.OSKey:             dks.Windows10dot2OS,
+			dks.DeviceKey:         dks.QuadroDevice,
+		},
+		MatchesAnyIgnoreRule: schema.NBNull,
+	}, {
+		TraceID:            h(triangleTraceKeys),
+		MostRecentCommitID: expectedCommitID,
+		Digest:             d(dks.DigestB01Pos),
+		OptionsID:          h(pngOptions),
+		GroupingID:         h(triangleGrouping),
+		Corpus:             dks.CornersCorpus,
+		Keys: map[string]string{
+			types.CorpusField:     dks.CornersCorpus,
+			types.PrimaryKeyField: dks.TriangleTest,
+			dks.ColorModeKey:      dks.RGBColorMode,
+			dks.OSKey:             dks.Windows10dot2OS,
+			dks.DeviceKey:         dks.QuadroDevice,
+		},
+		MatchesAnyIgnoreRule: schema.NBNull,
+	}, {
+		TraceID:            h(circleTraceKeys),
+		MostRecentCommitID: expectedCommitID,
+		Digest:             d(dks.DigestC01Pos),
+		OptionsID:          h(pngOptions),
+		GroupingID:         h(circleGrouping),
+		Corpus:             dks.RoundCorpus,
+		Keys: map[string]string{
+			types.CorpusField:     dks.RoundCorpus,
+			types.PrimaryKeyField: dks.CircleTest,
+			dks.ColorModeKey:      dks.RGBColorMode,
+			dks.OSKey:             dks.Windows10dot2OS,
+			dks.DeviceKey:         dks.QuadroDevice,
+		},
+		MatchesAnyIgnoreRule: schema.NBNull,
+	}}, actualValuesAtHead)
+
+	actualExpectations := sqltest.GetAllRows(ctx, t, db, "Expectations", &schema.ExpectationRow{}).([]schema.ExpectationRow)
+	assert.Equal(t, []schema.ExpectationRow{{
+		GroupingID: h(squareGrouping),
+		Digest:     d(dks.DigestA01Pos),
+		Label:      schema.LabelUntriaged,
+	}, {
+		GroupingID: h(triangleGrouping),
+		Digest:     d(dks.DigestB01Pos),
+		Label:      schema.LabelUntriaged,
+	}, {
+		GroupingID: h(circleGrouping),
+		Digest:     d(dks.DigestC01Pos),
+		Label:      schema.LabelUntriaged,
+	}}, actualExpectations)
+
+	actualPrimaryBranchParams := sqltest.GetAllRows(ctx, t, db, "PrimaryBranchParams", &schema.PrimaryBranchParamRow{}).([]schema.PrimaryBranchParamRow)
+	assert.Equal(t, []schema.PrimaryBranchParamRow{
+		{Key: dks.ColorModeKey, Value: dks.RGBColorMode, TileID: 0},
+		{Key: dks.DeviceKey, Value: dks.QuadroDevice, TileID: 0},
+		{Key: "ext", Value: "png", TileID: 0},
+		{Key: types.PrimaryKeyField, Value: dks.CircleTest, TileID: 0},
+		{Key: types.PrimaryKeyField, Value: dks.SquareTest, TileID: 0},
+		{Key: types.PrimaryKeyField, Value: dks.TriangleTest, TileID: 0},
+		{Key: dks.OSKey, Value: dks.Windows10dot2OS, TileID: 0},
+		{Key: types.CorpusField, Value: dks.CornersCorpus, TileID: 0},
+		{Key: types.CorpusField, Value: dks.RoundCorpus, TileID: 0},
+	}, actualPrimaryBranchParams)
+
+	actualTiledTraceDigests := sqltest.GetAllRows(ctx, t, db, "TiledTraceDigests", &schema.TiledTraceDigestRow{}).([]schema.TiledTraceDigestRow)
+	assert.Equal(t, []schema.TiledTraceDigestRow{
+		{TraceID: h(squareTraceKeys), Digest: d(dks.DigestA01Pos), TileID: 0, GroupingID: h(squareGrouping)},
+		{TraceID: h(triangleTraceKeys), Digest: d(dks.DigestB01Pos), TileID: 0, GroupingID: h(triangleGrouping)},
+		{TraceID: h(circleTraceKeys), Digest: d(dks.DigestC01Pos), TileID: 0, GroupingID: h(circleGrouping)},
+	}, actualTiledTraceDigests)
+
+	assert.Equal(t, totalMetricBefore+1, s.filesProcessed.Get())
+	assert.Equal(t, successMetricBefore+1, s.filesSuccess.Get())
+	assert.Equal(t, resultsMetricBefore+3, s.resultsIngested.Get())
+}
+
 func TestPrimarySQL_Process_TileAlreadyComputed_Success(t *testing.T) {
 	unittest.LargeTest(t)
 	ctx := context.Background()
diff --git a/golden/go/ingestion_processors/testdata/has_metadata.json b/golden/go/ingestion_processors/testdata/has_metadata.json
new file mode 100644
index 0000000..be095e8
--- /dev/null
+++ b/golden/go/ingestion_processors/testdata/has_metadata.json
@@ -0,0 +1,44 @@
+{
+  "description": "This file has data from three traces across two corpora. It specifies a commit id and metadata. It lines up with datakitchensink",
+  "commit_id" : "0000000100",
+  "commit_metadata" : "gs://example/commit/77e1d3255fde0bebc80f596f6a1c4ad79c8b8bbe",
+  "key" : {
+    "os": "Windows10.2",
+    "device": "QuadroP400"
+  },
+  "results" : [
+    {
+      "key" : {
+        "color mode" : "RGB",
+        "name" : "square",
+        "source_type" : "corners"
+      },
+      "md5" : "a01a01a01a01a01a01a01a01a01a01a0",
+      "options" : {
+        "ext" : "png"
+      }
+    },
+    {
+      "key" : {
+        "color mode" : "RGB",
+        "name" : "triangle",
+        "source_type" : "corners"
+      },
+      "md5" : "b01b01b01b01b01b01b01b01b01b01b0",
+      "options" : {
+        "ext" : "png"
+      }
+    },
+    {
+      "key" : {
+        "color mode" : "RGB",
+        "name" : "circle",
+        "source_type" : "round"
+      },
+      "md5" : "c01c01c01c01c01c01c01c01c01c01c0",
+      "options" : {
+        "ext" : "png"
+      }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/golden/go/ingestion_processors/testdata/missing_git_hash.json b/golden/go/ingestion_processors/testdata/missing_git_hash.json
index 19e1ce7..a12729a 100644
--- a/golden/go/ingestion_processors/testdata/missing_git_hash.json
+++ b/golden/go/ingestion_processors/testdata/missing_git_hash.json
@@ -1,5 +1,5 @@
 {
-  "description": "This file is missing gitHash",
+  "description": "This file is missing gitHash and commit_id/commit_metadata",
   "gitHash" : "",
   "key" : {
     "os": "Windows10.2",
diff --git a/golden/go/sql/schema/sql.go b/golden/go/sql/schema/sql.go
index 86bdab7..28108e0 100644
--- a/golden/go/sql/schema/sql.go
+++ b/golden/go/sql/schema/sql.go
@@ -71,6 +71,10 @@
   note STRING,
   query JSONB NOT NULL
 );
+CREATE TABLE IF NOT EXISTS MetadataCommits (
+  commit_id STRING PRIMARY KEY,
+  commit_metadata STRING NOT NULL
+);
 CREATE TABLE IF NOT EXISTS Options (
   options_id BYTES PRIMARY KEY,
   keys JSONB NOT NULL
diff --git a/golden/go/sql/schema/tables.go b/golden/go/sql/schema/tables.go
index c3e20ca..5b32ea7 100644
--- a/golden/go/sql/schema/tables.go
+++ b/golden/go/sql/schema/tables.go
@@ -115,6 +115,7 @@
 	GitCommits                  []GitCommitRow                  `sql_backup:"daily"`
 	Groupings                   []GroupingRow                   `sql_backup:"monthly"`
 	IgnoreRules                 []IgnoreRuleRow                 `sql_backup:"daily"`
+	MetadataCommits             []MetadataCommitRow             `sql_backup:"daily"`
 	Options                     []OptionsRow                    `sql_backup:"monthly"`
 	Patchsets                   []PatchsetRow                   `sql_backup:"weekly"`
 	PrimaryBranchParams         []PrimaryBranchParamRow         `sql_backup:"monthly"`
@@ -998,3 +999,22 @@
 func (r *TrackingCommitRow) ScanFrom(scan func(...interface{}) error) error {
 	return scan(&r.Repo, &r.LastGitHash)
 }
+
+type MetadataCommitRow struct {
+	// CommitID is a potentially arbitrary string. It is a foreign key in the CommitsWithData table.
+	CommitID CommitID `sql:"commit_id STRING PRIMARY KEY"`
+	// CommitMetadata is an arbitrary string; For current implementations, it is a link to a GCS
+	// file that has more information about the state of the repo when the data was generated.
+	CommitMetadata string `sql:"commit_metadata STRING NOT NULL"`
+}
+
+// ToSQLRow implements the sqltest.SQLExporter interface.
+func (r MetadataCommitRow) ToSQLRow() (colNames []string, colData []interface{}) {
+	return []string{"commit_id", "commit_metadata"},
+		[]interface{}{r.CommitID, r.CommitMetadata}
+}
+
+// ScanFrom implements the sqltest.SQLScanner interface.
+func (r *MetadataCommitRow) ScanFrom(scan func(...interface{}) error) error {
+	return scan(&r.CommitID, &r.CommitMetadata)
+}