blob: c036de428b5e42ab87f5db7cfed18275ddbaa1c4 [file]
package service
import (
"context"
"net/http"
"time"
"github.com/google/uuid"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/temporal"
"golang.org/x/time/rate"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/pinpoint/go/workflows"
pb "go.skia.org/infra/pinpoint/proto/v1"
tpr_client "go.skia.org/infra/temporal/go/client"
enumspb "go.temporal.io/api/enums/v1"
)
type server struct {
pb.UnimplementedPinpointServer
// Local rate limiter to only limit the traffic for migration temporarilly.
limiter *rate.Limiter
temporal tpr_client.TemporalProvider
}
const (
// Those should be configurable for each instance.
hostPort = "localhost:7233"
namespace = "perf-internal"
taskQueue = "perf.perf-chrome-public.bisect"
// TODO(b/352631333): Replace the rate limit with an actual queueing system
rateLimit = 30 * time.Minute // accept 1 request every 30 minutes
// arbitrary timeouts to ensure workflows will not run forever. Note that it is possible
// for a job to naturally timeout due to long running time. Consider updating these
// if too many jobs time out.
pairwiseTimeoutDuration = 2 * time.Hour
bisectionTimeout = 12 * time.Hour
culpritFinderTimeout = 16 * time.Hour
)
func New(t tpr_client.TemporalProvider, l *rate.Limiter) *server {
if l == nil {
// 1 token every 30 minutes, this allow some buffer to drain the hot spots in the bots pool.
l = rate.NewLimiter(rate.Every(rateLimit), 1)
}
if t == nil {
t = tpr_client.DefaultTemporalProvider{}
}
return &server{
limiter: l,
temporal: t,
}
}
func NewJSONHandler(ctx context.Context, srv pb.PinpointServer) (http.Handler, error) {
m := runtime.NewServeMux()
if err := pb.RegisterPinpointHandlerServer(ctx, m, srv); err != nil {
return nil, skerr.Wrapf(err, "unable to register pinpoint handler")
}
return m, nil
}
func (s *server) CancelJob(ctx context.Context, req *pb.CancelJobRequest) (*pb.CancelJobResponse, error) {
sklog.Infof("Receiving cancel job request: %v", req)
if req.JobId == "" {
return nil, status.Errorf(codes.InvalidArgument, "bad request: missing JobId")
}
if req.Reason == "" {
return nil, status.Errorf(codes.InvalidArgument, "bad request: missing Reason")
}
c, cleanUp, err := s.temporal.NewClient(hostPort, namespace)
if err != nil {
return nil, status.Errorf(codes.Internal, "Unable to connect to Temporal (%v).", err)
}
defer cleanUp()
err = c.CancelWorkflow(ctx, req.JobId, "")
if err != nil {
return nil, status.Errorf(codes.Internal, "Unable to cancel workflow (%v).", err)
}
return &pb.CancelJobResponse{JobId: req.JobId, State: "Cancelled"}, nil
}
func (s *server) LegacyJobQuery(ctx context.Context, req *pb.LegacyJobRequest) (*pb.LegacyJobResponse, error) {
qresp, err := s.QueryBisection(ctx, &pb.QueryBisectRequest{
JobId: req.GetJobId(),
})
if err != nil {
// We don't skerr.Wrap here because we expect to populate err with status.code back to
// the client, this is automatic conversion to REST API status code when this is exposed
// via grpc-gateway.
// Note this API is only intermediate and will be gone, this is not considered to be
// best practise.
return nil, err
}
// TODO(b/318864009): convert BisectExecution to LegacyJobResponse
// This should be just a copy.
resp := &pb.LegacyJobResponse{
JobId: qresp.GetJobId(),
}
return resp, nil
}
func (s *server) ScheduleBisection(ctx context.Context, req *pb.ScheduleBisectRequest) (*pb.BisectExecution, error) {
// Those logs are used to test traffic from existing services in catapult, shall be removed.
sklog.Infof("Receiving bisection request: %v", req)
if !s.limiter.Allow() {
sklog.Infof("The request is dropped due to rate limiting.")
return nil, status.Errorf(codes.ResourceExhausted, "unable to fulfill the request due to rate limiting, dropping")
}
// TODO(b/318864009): Remove this function once Pinpoint migration is completed.
req = updateFieldsForCatapult(req)
if err := validateBisectRequest(req); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
c, cleanUp, err := s.temporal.NewClient(hostPort, namespace)
if err != nil {
return nil, status.Errorf(codes.Internal, "Unable to connect to Temporal (%v).", err)
}
defer cleanUp()
wo := client.StartWorkflowOptions{
ID: uuid.New().String(),
TaskQueue: taskQueue,
WorkflowExecutionTimeout: bisectionTimeout,
RetryPolicy: &temporal.RetryPolicy{
// We will only attempt to run the workflow exactly once as we expect any failure will be
// not retry-recoverable failure.
MaximumAttempts: 1,
},
}
wf, err := c.ExecuteWorkflow(ctx, wo, workflows.CatapultBisect, &workflows.BisectParams{
Request: req,
Production: true,
})
if err != nil {
return nil, status.Errorf(codes.Internal, "Unable to start workflow (%v).", err)
}
return &pb.BisectExecution{JobId: wf.GetID()}, nil
}
func (s *server) ScheduleCulpritFinder(ctx context.Context, req *pb.ScheduleCulpritFinderRequest) (*pb.CulpritFinderExecution, error) {
// Those logs are used to test traffic from existing services in catapult, shall be removed.
sklog.Infof("Receiving culprit finder request: %v", req)
if !s.limiter.Allow() {
sklog.Infof("The request is dropped due to rate limiting.")
return nil, status.Errorf(codes.ResourceExhausted, "unable to fulfill the request due to rate limiting, dropping")
}
// TODO(b/318864009): Remove this function once Pinpoint migration is completed.
req = updateCulpritFinderFieldsForCatapult(req)
if err := validateCulpritFinderRequest(req); err != nil {
sklog.Warningf("the request failed validation due to %v", err.Error())
return nil, status.Error(codes.InvalidArgument, err.Error())
}
c, cleanUp, err := s.temporal.NewClient(hostPort, namespace)
if err != nil {
return nil, status.Errorf(codes.Internal, "Unable to connect to Temporal (%v).", err)
}
defer cleanUp()
wo := client.StartWorkflowOptions{
ID: uuid.New().String(),
TaskQueue: taskQueue,
// arbitrary timeout to assure it's not going forever. Set to a few hours more than
// bisect timeout.
WorkflowExecutionTimeout: culpritFinderTimeout,
RetryPolicy: &temporal.RetryPolicy{
// We will only attempt to run the workflow exactly once as we expect any failure will be
// not retry-recoverable failure.
MaximumAttempts: 1,
},
}
wf, err := c.ExecuteWorkflow(ctx, wo, workflows.CulpritFinderWorkflow, &workflows.CulpritFinderParams{
Request: req,
Production: true,
})
if err != nil {
return nil, status.Errorf(codes.Internal, "Unable to start workflow (%v).", err)
}
return &pb.CulpritFinderExecution{JobId: wf.GetID()}, nil
}
func (s *server) SchedulePairwise(ctx context.Context, req *pb.SchedulePairwiseRequest) (*pb.PairwiseExecution, error) {
// TODO(b/391717876) - remove log once migration complete, as these logs can
// get noisy.
sklog.Infof("Pairwise (try) request received: %v", req)
if !s.limiter.Allow() {
sklog.Infof("The request is dropped due to rate limiting.")
return nil, status.Errorf(codes.ResourceExhausted, "unable to fulfill the request due to rate limiting, dropping")
}
if err := validatePairwiseRequest(req); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid request")
}
c, cleanUp, err := s.temporal.NewClient(hostPort, namespace)
if err != nil {
return nil, status.Errorf(codes.Internal, "Unable to connect to Temporal (%v).", err)
}
defer cleanUp()
workflowOptions := client.StartWorkflowOptions{
ID: uuid.New().String(),
TaskQueue: taskQueue,
// A pairwise try job SLO is to complete sub 2 hours.
WorkflowExecutionTimeout: pairwiseTimeoutDuration,
RetryPolicy: &temporal.RetryPolicy{
// We will only attempt to run the workflow exactly once as we expect any failure will be
// not retry-recoverable failure.
MaximumAttempts: 1,
},
}
workflowRun, err := c.ExecuteWorkflow(ctx, workflowOptions, workflows.PairwiseWorkflow, &workflows.PairwiseParams{
Request: req,
CulpritVerify: false,
})
if err != nil {
sklog.Infof("failed to execute workflow: %v", err)
return nil, status.Errorf(codes.Internal, "Unable to execute workflow: (%v).", err)
}
return &pb.PairwiseExecution{
JobId: workflowRun.GetID(),
}, nil
}
func (s *server) QueryPairwise(ctx context.Context, req *pb.QueryPairwiseRequest) (*pb.QueryPairwiseResponse, error) {
sklog.Infof("Query Pairwise (try) request received: %v", req)
if err := validateQueryPairwiseRequest(req); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "invalid request")
}
c, cleanUp, err := s.temporal.NewClient(hostPort, namespace)
if err != nil {
return nil, status.Errorf(codes.Internal, "Unable to connect to Temporal (%v).", err)
}
defer cleanUp()
resp, err := c.DescribeWorkflowExecution(ctx, req.JobId, "")
workflowStatus := resp.GetWorkflowExecutionInfo().GetStatus()
switch workflowStatus {
case enumspb.WORKFLOW_EXECUTION_STATUS_COMPLETED:
workflowRun := c.GetWorkflow(ctx, req.JobId, "")
var pairwiseExecution pb.PairwiseExecution
errGet := workflowRun.Get(ctx, &pairwiseExecution)
if errGet != nil {
return nil, status.Errorf(codes.Internal, "Pairwise workflow completed, but failed to get results (runID: '%s'): %v",
req.JobId, errGet)
}
return &pb.QueryPairwiseResponse{
Status: pb.PairwiseJobStatus_PAIRWISE_JOB_STATUS_COMPLETED,
Execution: &pairwiseExecution,
}, nil
case enumspb.WORKFLOW_EXECUTION_STATUS_FAILED,
enumspb.WORKFLOW_EXECUTION_STATUS_TIMED_OUT,
enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED:
return &pb.QueryPairwiseResponse{
Status: pb.PairwiseJobStatus_PAIRWISE_JOB_STATUS_FAILED,
Execution: nil,
}, nil
case enumspb.WORKFLOW_EXECUTION_STATUS_CANCELED:
return &pb.QueryPairwiseResponse{
Status: pb.PairwiseJobStatus_PAIRWISE_JOB_STATUS_CANCELED,
Execution: nil,
}, nil
case enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW:
return &pb.QueryPairwiseResponse{
Status: pb.PairwiseJobStatus_PAIRWISE_JOB_STATUS_RUNNING,
Execution: nil,
}, nil
}
return nil, status.Errorf(codes.Internal, "Pairwise workflow execution returned unknown status.")
}