[louhi] Fix DB retrieval for GetLatestFlowExecutions, add test

Bug: skia:13861
Change-Id: Iae754a26e3090d1b092aa7e2f049b3dfa5ba51ee
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/600319
Commit-Queue: Eric Boren <borenet@google.com>
Auto-Submit: Eric Boren <borenet@google.com>
Reviewed-by: Ravi Mistry <rmistry@google.com>
diff --git a/go/louhi/firestore/BUILD.bazel b/go/louhi/firestore/BUILD.bazel
index 27737b3..7273ca1 100644
--- a/go/louhi/firestore/BUILD.bazel
+++ b/go/louhi/firestore/BUILD.bazel
@@ -1,3 +1,4 @@
+load("//bazel/go:go_test.bzl", "go_test")
 load("@io_bazel_rules_go//go:def.bzl", "go_library")
 
 go_library(
@@ -16,3 +17,14 @@
         "@org_golang_x_oauth2//google",
     ],
 )
+
+go_test(
+    name = "firestore_test",
+    srcs = ["firestore_test.go"],
+    embed = [":firestore"],
+    deps = [
+        "//go/firestore/testutils",
+        "//go/louhi",
+        "@com_github_stretchr_testify//require",
+    ],
+)
diff --git a/go/louhi/firestore/firestore.go b/go/louhi/firestore/firestore.go
index c09cd6f..61d5190 100644
--- a/go/louhi/firestore/firestore.go
+++ b/go/louhi/firestore/firestore.go
@@ -38,10 +38,15 @@
 	if err != nil {
 		return nil, skerr.Wrapf(err, "failed to create firestore client")
 	}
+	return newDBWithClient(ctx, client), nil
+}
+
+// newDBWithClient returns a FirestoreDB instance which uses the given Client.
+func newDBWithClient(ctx context.Context, client *firestore.Client) *FirestoreDB {
 	return &FirestoreDB{
 		client: client,
 		flows:  client.Collection(collectionFlows),
-	}, nil
+	}
 }
 
 // PutFlowExecution implements DB.
@@ -80,26 +85,40 @@
 
 // GetLatestFlowExecutions implements DB.
 func (db *FirestoreDB) GetLatestFlowExecutions(ctx context.Context) (map[string]*louhi.FlowExecution, error) {
-	iter := db.flows.DocumentRefs(ctx)
+	// Iterate over all known flows.
+	flowIter := db.flows.DocumentRefs(ctx)
 	rv := map[string]*louhi.FlowExecution{}
 	for {
-		doc, err := iter.Next()
+		doc, err := flowIter.Next()
 		if err == iterator.Done {
 			break
 		} else if err != nil {
 			return nil, skerr.Wrapf(err, "failed to search FlowExecutions")
 		}
-		docs, err := doc.Collection(collectionExecutions).OrderBy("CreatedAt", fs.Desc).Where("Result", "!=", louhi.FlowResultUnknown).OrderBy("Result", fs.Asc).Limit(1).Documents(ctx).GetAll()
-		if err != nil {
-			return nil, skerr.Wrap(err)
+		// Iterate over the executions of this flow in most-recent-first order
+		// until we find one which has finished.
+		execIter := doc.Collection(collectionExecutions).OrderBy("CreatedAt", fs.Desc).Documents(ctx)
+		var fe *louhi.FlowExecution
+		for {
+			doc, err := execIter.Next()
+			if err == iterator.Done {
+				break
+			} else if err != nil {
+				return nil, skerr.Wrapf(err, "failed to search FlowExecutions")
+			}
+			fe = new(louhi.FlowExecution)
+			if err := doc.DataTo(fe); err != nil {
+				return nil, skerr.Wrap(err)
+			}
+			if fe.Finished() {
+				break
+			}
 		}
-		if len(docs) == 0 {
+		if !fe.Finished() {
+			// This indicates that there are no executions of this flow which
+			// have finished. Just move on.
 			continue
 		}
-		fe := new(louhi.FlowExecution)
-		if err := docs[0].DataTo(fe); err != nil {
-			return nil, skerr.Wrap(err)
-		}
 		// The DB stores flows by unique ID, not name, and the ID may change
 		// as the flow is edited, so we should deduplicate by name.
 		if prev, ok := rv[fe.FlowName]; !ok || prev.CreatedAt.Before(fe.CreatedAt) {
diff --git a/go/louhi/firestore/firestore_test.go b/go/louhi/firestore/firestore_test.go
new file mode 100644
index 0000000..acfead3
--- /dev/null
+++ b/go/louhi/firestore/firestore_test.go
@@ -0,0 +1,125 @@
+package firestore
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"go.skia.org/infra/go/firestore/testutils"
+	"go.skia.org/infra/go/louhi"
+)
+
+func setup(t *testing.T) (context.Context, louhi.DB, func()) {
+	ctx, cancel := context.WithCancel(context.Background())
+	c, cleanup := testutils.NewClientForTesting(ctx, t)
+	d := newDBWithClient(ctx, c)
+	return ctx, d, func() {
+		cancel()
+		cleanup()
+	}
+}
+
+func TestFirestoreDB_PutGet(t *testing.T) {
+	ctx, db, cleanup := setup(t)
+	defer cleanup()
+	startTs := time.Unix(1667570100, 0).UTC()
+	finishTs := time.Unix(1667570500, 0).UTC()
+	fe := &louhi.FlowExecution{
+		Artifacts:    []string{"artifact-1"},
+		CreatedAt:    startTs,
+		FinishedAt:   finishTs,
+		FlowName:     "my flow",
+		FlowID:       "my-flow-123",
+		GeneratedCLs: []string{"skia-review/12345"},
+		GitBranch:    "main",
+		GitCommit:    "abc123",
+		ID:           "456",
+		Link:         "https://flows/456",
+		ModifiedAt:   finishTs,
+		ProjectID:    "my-project",
+		Result:       louhi.FlowResultSuccess,
+		SourceCL:     "skia-review/54321",
+		StartedBy:    "Louhi",
+		TriggerType:  louhi.TriggerTypeCommit,
+	}
+	require.NoError(t, db.PutFlowExecution(ctx, fe))
+	actual, err := db.GetFlowExecution(ctx, fe.ID)
+	require.NoError(t, err)
+	require.Equal(t, fe, actual)
+}
+
+func TestFirestoreDB_GetLatestFlowExecutions(t *testing.T) {
+	ctx, db, cleanup := setup(t)
+	defer cleanup()
+	startTs := time.Unix(1667570100, 0).UTC()
+	finishTs := time.Unix(1667570500, 0).UTC()
+	// This is an execution of a previous revision of the flow. Note that the
+	// flow name is the same as the other two executions, but the flow ID is
+	// different. Because there is a newer flow execution with the same name,
+	// this will be retrieved from the DB but not returned by
+	// GetLatestFlowExecutions, as it will get deduplicated.
+	fe0 := &louhi.FlowExecution{
+		CreatedAt:   startTs.Add(-time.Hour),
+		FinishedAt:  finishTs.Add(-time.Hour),
+		FlowName:    "my flow",
+		FlowID:      "my-flow-original",
+		GitBranch:   "main",
+		GitCommit:   "123abc",
+		ID:          "123",
+		Link:        "https://flows/123",
+		ModifiedAt:  finishTs.Add(-time.Hour),
+		ProjectID:   "my-project",
+		Result:      louhi.FlowResultFailure,
+		SourceCL:    "skia-review/54321",
+		StartedBy:   "Louhi",
+		TriggerType: louhi.TriggerTypeCommit,
+	}
+	// This is the last finished execution of the most recent revision of the
+	// flow. We expect this execution to be the only one returned by
+	// GetLatestFlowExecutions.
+	fe1 := &louhi.FlowExecution{
+		Artifacts:    []string{"artifact-1"},
+		CreatedAt:    startTs,
+		FinishedAt:   finishTs,
+		FlowName:     "my flow",
+		FlowID:       "my-flow-123",
+		GeneratedCLs: []string{"skia-review/12345"},
+		GitBranch:    "main",
+		GitCommit:    "abc123",
+		ID:           "456",
+		Link:         "https://flows/456",
+		ModifiedAt:   finishTs,
+		ProjectID:    "my-project",
+		Result:       louhi.FlowResultSuccess,
+		SourceCL:     "skia-review/54321",
+		StartedBy:    "Louhi",
+		TriggerType:  louhi.TriggerTypeCommit,
+	}
+	// This is an unfinished execution of the most recent revision of the flow.
+	// it should be passed over in favor of fe1.
+	fe2 := &louhi.FlowExecution{
+		CreatedAt:   startTs.Add(time.Hour),
+		FinishedAt:  finishTs.Add(time.Hour),
+		FlowName:    "my flow",
+		FlowID:      "my-flow-123",
+		GitBranch:   "main",
+		GitCommit:   "def456",
+		ID:          "919",
+		Link:        "https://flows/919",
+		ModifiedAt:  finishTs.Add(time.Hour),
+		ProjectID:   "my-project",
+		Result:      louhi.FlowResultUnknown,
+		SourceCL:    "skia-review/54321",
+		StartedBy:   "Louhi",
+		TriggerType: louhi.TriggerTypeCommit,
+	}
+	require.NoError(t, db.PutFlowExecution(ctx, fe0))
+	require.NoError(t, db.PutFlowExecution(ctx, fe1))
+	require.NoError(t, db.PutFlowExecution(ctx, fe2))
+	actual, err := db.GetLatestFlowExecutions(ctx)
+	require.NoError(t, err)
+	require.Equal(t, map[string]*louhi.FlowExecution{
+		fe1.FlowName: fe1,
+	}, actual)
+}