[Pinpoint] Execute Jobstore Activities in Pairwise Workflow
Before committing we must ensure PGAdapter is running and connecting the
service to the experimental database instance where the database table
lives.
Will adjust the Kubernetes configs so that the databaseWriteback flag is
set to true and PGAdapter is also run.
Change-Id: I5628241428afa642b60b9e12448be6969ec9c385
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/1017447
Reviewed-by: Eduardo Yap <eduardoyap@google.com>
Commit-Queue: Natnael Alemayehu <natnaelal@google.com>
Reviewed-by: Ashwin Verleker <ashwinpv@google.com>
diff --git a/pinpoint/go/sql/jobs_store/jobs_store.go b/pinpoint/go/sql/jobs_store/jobs_store.go
index cb52091..40e1c83 100644
--- a/pinpoint/go/sql/jobs_store/jobs_store.go
+++ b/pinpoint/go/sql/jobs_store/jobs_store.go
@@ -5,7 +5,9 @@
"context"
"database/sql"
"encoding/json"
+ "math"
"strconv"
+ "time"
"github.com/jackc/pgx/v4"
"go.skia.org/infra/go/skerr"
@@ -189,6 +191,10 @@
return nil
}
+ // These values are passed in as duration in nanoseconds, so we will convert to minutes and round
+ durationMinutes := time.Duration(workflowDuration).Minutes()
+ durationMinutesRounded := int64(math.Round(durationMinutes))
+
// Update duration parameter
tx, err := js.db.Begin(ctx)
if err != nil {
@@ -199,7 +205,7 @@
if err != nil {
return err
}
- params["duration"] = strconv.FormatInt(workflowDuration, 10)
+ params["duration"] = strconv.FormatInt(durationMinutesRounded, 10)
query := `
UPDATE jobs SET
diff --git a/pinpoint/go/sql/jobs_store/jobs_store_test.go b/pinpoint/go/sql/jobs_store/jobs_store_test.go
index f39f0a5..011e5bd 100644
--- a/pinpoint/go/sql/jobs_store/jobs_store_test.go
+++ b/pinpoint/go/sql/jobs_store/jobs_store_test.go
@@ -6,6 +6,7 @@
"errors"
"strings"
"testing"
+ "time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
@@ -154,7 +155,8 @@
require.NoError(t, err)
newStatus := "Completed"
- err = js.UpdateJobStatus(ctx, jobID, newStatus, 10)
+ durationInNanoseconds := int64(10 * time.Minute)
+ err = js.UpdateJobStatus(ctx, jobID, newStatus, durationInNanoseconds)
require.NoError(t, err)
retrievedJob, err := js.GetJob(ctx, jobID)
diff --git a/pinpoint/go/workflows/internal/BUILD.bazel b/pinpoint/go/workflows/internal/BUILD.bazel
index 9ec0420..b1cbd38 100644
--- a/pinpoint/go/workflows/internal/BUILD.bazel
+++ b/pinpoint/go/workflows/internal/BUILD.bazel
@@ -105,11 +105,13 @@
"//pinpoint/go/compare",
"//pinpoint/go/midpoint",
"//pinpoint/go/run_benchmark",
+ "//pinpoint/go/sql/jobs_store",
"//pinpoint/go/workflows",
"//pinpoint/proto/v1:proto",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//mock",
"@com_github_stretchr_testify//require",
+ "@io_temporal_go_sdk//activity",
"@io_temporal_go_sdk//testsuite",
"@io_temporal_go_sdk//worker",
"@io_temporal_go_sdk//workflow",
diff --git a/pinpoint/go/workflows/internal/pairwise.go b/pinpoint/go/workflows/internal/pairwise.go
index e04c1e2..b8f4fa3 100644
--- a/pinpoint/go/workflows/internal/pairwise.go
+++ b/pinpoint/go/workflows/internal/pairwise.go
@@ -6,15 +6,26 @@
"github.com/google/uuid"
swarming_pb "go.chromium.org/luci/swarming/proto/api_v2"
"go.skia.org/infra/go/skerr"
+ "go.skia.org/infra/go/sklog"
"go.skia.org/infra/pinpoint/go/common"
"go.skia.org/infra/pinpoint/go/compare"
+ jobstore "go.skia.org/infra/pinpoint/go/sql/jobs_store"
"go.skia.org/infra/pinpoint/go/workflows"
+
+ "go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
pinpoint_proto "go.skia.org/infra/pinpoint/proto/v1"
)
-func PairwiseWorkflow(ctx workflow.Context, p *workflows.PairwiseParams) (*pinpoint_proto.PairwiseExecution, error) {
+// TODO(natnaelal) Expand potential job statuses to include intermediate statuses
+const (
+ failed = "FAILED"
+ completed = "COMPLETED"
+ canceled = "CANCELED"
+)
+
+func PairwiseWorkflow(ctx workflow.Context, p *workflows.PairwiseParams) (pe *pinpoint_proto.PairwiseExecution, finalError error) {
if p.Request.StartBuild == nil && p.Request.StartCommit == nil {
return nil, skerr.Fmt("Base build and commit are empty.")
}
@@ -37,6 +48,11 @@
jobID := uuid.New().String()
wkStartTime := time.Now().UnixNano()
+ if err := workflow.ExecuteActivity(ctx, AddInitialJob, p.Request, jobID).Get(ctx, nil); err != nil {
+ // TODO(natnaelal) Convert subsequent uses of sklog to full errors once Job Store activities are
+ // more stable and fully integrated
+ sklog.Errorf("failed to add initial job info to Spanner: %s", err)
+ }
// Benchmark runs can sometimes generate an inconsistent number of data points.
// So even if all benchmark runs were successful, the number of data values
@@ -73,16 +89,58 @@
"config": p.Request.Configuration,
"story": p.Request.Story,
})
+ protoResults := map[string]*pinpoint_proto.PairwiseExecution_WilcoxonResult{}
defer func() {
duration := time.Now().UnixNano() - wkStartTime
mh.Timer("pairwise_duration").Record(time.Duration(duration))
+
+ // Final writebacks to Spanner before end of Pairwise workflow
+ if finalError != nil {
+ if temporal.IsCanceledError(finalError) {
+ if err := workflow.ExecuteActivity(ctx, UpdateJobStatus, jobID, canceled, duration).Get(ctx, nil); err != nil {
+ sklog.Errorf("couldn't update status for canceled pairwise job with this ID: %s", jobID)
+ }
+ } else {
+ // Pairwise job failed for other reasons.
+ if err := workflow.ExecuteActivity(ctx, SetErrors, jobID, finalError.Error()).Get(ctx, nil); err != nil {
+ sklog.Errorf("couldn't add error for pairwise job with this ID: %s", jobID)
+ }
+ if err := workflow.ExecuteActivity(ctx, UpdateJobStatus, jobID, failed, duration).Get(ctx, nil); err != nil {
+ sklog.Errorf("couldn't update status for pairwise job with this ID: %s", jobID)
+ }
+ }
+ } else {
+ if err := workflow.ExecuteActivity(ctx, UpdateJobStatus, jobID, completed, duration).Get(ctx, nil); err != nil {
+ sklog.Errorf("couldn't update status for pairwise job with this ID: %s", jobID)
+ }
+ // Write back to database the results of the comparison through the job store object
+ if err := workflow.ExecuteActivity(ctx, AddResults, jobID, protoResults).Get(ctx, nil); err != nil {
+ sklog.Errorf("couldn't add results for pairwise job with this ID: %s", jobID)
+ }
+ }
+
}()
var pr *PairwiseRun
if err := workflow.ExecuteChildWorkflow(ctx, workflows.PairwiseCommitsRunner, pairwiseRunnerParams).Get(ctx, &pr); err != nil {
return nil, skerr.Wrap(err)
}
+ // Store details of commit builds and test runs
+ if pr != nil {
+ leftData := &jobstore.CommitRunData{
+ Build: pr.Left.Build,
+ Runs: pr.Left.Runs,
+ }
+ rightData := &jobstore.CommitRunData{
+ Build: pr.Right.Build,
+ Runs: pr.Right.Runs,
+ }
+ if err := workflow.ExecuteActivity(ctx, AddCommitRuns, jobID, leftData, rightData).Get(ctx, nil); err != nil {
+ sklog.Errorf("couldn't add commit runs for pairwise job with this ID: %s", jobID)
+ }
+
+ }
results, err := comparePairwiseRuns(ctx, pr, compare.UnknownDir)
if err != nil {
@@ -99,7 +157,6 @@
culpritCandidate = (*pinpoint_proto.CombinedCommit)(pairwiseRunnerParams.RightCommit)
}
- protoResults := map[string]*pinpoint_proto.PairwiseExecution_WilcoxonResult{}
for chart, res := range results {
protoResults[chart] = &pinpoint_proto.PairwiseExecution_WilcoxonResult{
// Significant is used in CulpritFinder to determine whether to bisect.
diff --git a/pinpoint/go/workflows/internal/pairwise_test.go b/pinpoint/go/workflows/internal/pairwise_test.go
index 0ad5689..75a4b8f 100644
--- a/pinpoint/go/workflows/internal/pairwise_test.go
+++ b/pinpoint/go/workflows/internal/pairwise_test.go
@@ -1,6 +1,7 @@
package internal
import (
+ "context"
"testing"
"github.com/stretchr/testify/assert"
@@ -9,8 +10,10 @@
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/pinpoint/go/common"
"go.skia.org/infra/pinpoint/go/compare"
+ jobstore "go.skia.org/infra/pinpoint/go/sql/jobs_store"
"go.skia.org/infra/pinpoint/go/workflows"
pinpoint_proto "go.skia.org/infra/pinpoint/proto/v1"
+ "go.temporal.io/sdk/activity"
"go.temporal.io/sdk/testsuite"
"go.temporal.io/sdk/workflow"
)
@@ -28,12 +31,40 @@
},
}
+func registerJobStoreActivities(env *testsuite.TestWorkflowEnvironment) {
+ env.RegisterActivityWithOptions(
+ func(context.Context, *pinpoint_proto.SchedulePairwiseRequest, string) error { return nil },
+ activity.RegisterOptions{Name: AddInitialJob},
+ )
+ env.RegisterActivityWithOptions(
+ func(context.Context, string, string, int64) error { return nil },
+ activity.RegisterOptions{Name: UpdateJobStatus},
+ )
+ env.RegisterActivityWithOptions(
+ func(context.Context, string, string) error { return nil },
+ activity.RegisterOptions{Name: SetErrors},
+ )
+ env.RegisterActivityWithOptions(
+ func(context.Context, string, map[string]*pinpoint_proto.PairwiseExecution_WilcoxonResult) error {
+ return nil
+ },
+ activity.RegisterOptions{Name: AddResults},
+ )
+ env.RegisterActivityWithOptions(
+ func(context.Context, string, *jobstore.CommitRunData, *jobstore.CommitRunData) error { return nil },
+ activity.RegisterOptions{Name: AddCommitRuns},
+ )
+}
+
func TestPairwiseWorkflow_GivenUnsuccessfulWorkflow_ReturnsError(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()
-
+ registerJobStoreActivities(env)
env.RegisterWorkflowWithOptions(PairwiseCommitsRunnerWorkflow, workflow.RegisterOptions{Name: workflows.PairwiseCommitsRunner})
env.OnWorkflow(workflows.PairwiseCommitsRunner, mock.Anything, mock.Anything).Return(nil, skerr.Fmt("some error")).Once()
+ env.OnActivity(AddInitialJob, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
+ env.OnActivity(SetErrors, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
+ env.OnActivity(UpdateJobStatus, mock.Anything, mock.Anything, failed, mock.Anything).Return(nil).Once()
env.ExecuteWorkflow(PairwiseWorkflow, &workflows.PairwiseParams{
Request: &pinpoint_proto.SchedulePairwiseRequest{
@@ -79,10 +110,15 @@
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()
+ registerJobStoreActivities(env)
env.RegisterWorkflowWithOptions(PairwiseCommitsRunnerWorkflow, workflow.RegisterOptions{Name: workflows.PairwiseCommitsRunner})
env.OnWorkflow(workflows.PairwiseCommitsRunner, mock.Anything, mock.Anything).Return(mockResult, nil).Once()
env.OnActivity(ComparePairwiseActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(statResults, nil).Once()
+ env.OnActivity(AddInitialJob, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
+ env.OnActivity(AddCommitRuns, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
+ env.OnActivity(UpdateJobStatus, mock.Anything, mock.Anything, completed, mock.Anything).Return(nil).Once()
+ env.OnActivity(AddResults, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
env.ExecuteWorkflow(PairwiseWorkflow, &workflows.PairwiseParams{
Request: &pinpoint_proto.SchedulePairwiseRequest{
@@ -131,10 +167,15 @@
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()
+ registerJobStoreActivities(env)
env.RegisterWorkflowWithOptions(PairwiseCommitsRunnerWorkflow, workflow.RegisterOptions{Name: workflows.PairwiseCommitsRunner})
env.OnWorkflow(workflows.PairwiseCommitsRunner, mock.Anything, mock.Anything).Return(mockResult, nil).Once()
env.OnActivity(ComparePairwiseActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(statResults, nil).Once()
+ env.OnActivity(AddInitialJob, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
+ env.OnActivity(AddCommitRuns, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
+ env.OnActivity(UpdateJobStatus, mock.Anything, mock.Anything, completed, mock.Anything).Return(nil).Once()
+ env.OnActivity(AddResults, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
env.ExecuteWorkflow(PairwiseWorkflow, &workflows.PairwiseParams{
Request: &pinpoint_proto.SchedulePairwiseRequest{