| // package main is the main entry point for the cabe server executable. |
| package main |
| |
| import ( |
| "context" |
| "flag" |
| "fmt" |
| "net" |
| "net/http" |
| "os" |
| "time" |
| |
| "cloud.google.com/go/compute/metadata" |
| rbeclient "github.com/bazelbuild/remote-apis-sdks/go/pkg/client" |
| "github.com/go-chi/chi/v5" |
| swarmingapi "go.chromium.org/luci/common/api/swarming/swarming/v1" |
| "go.opencensus.io/plugin/ocgrpc" |
| "go.skia.org/infra/go/swarming" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/health" |
| "google.golang.org/grpc/health/grpc_health_v1" |
| "google.golang.org/grpc/reflection" |
| |
| "go.skia.org/infra/go/cleanup" |
| "go.skia.org/infra/go/common" |
| "go.skia.org/infra/go/grpcsp" |
| "go.skia.org/infra/go/httputils" |
| "go.skia.org/infra/go/roles" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/go/tracing" |
| "go.skia.org/infra/go/tracing/loggingtracer" |
| |
| "go.skia.org/infra/cabe/go/analysisserver" |
| "go.skia.org/infra/cabe/go/backends" |
| cpb "go.skia.org/infra/cabe/go/proto" |
| "go.skia.org/infra/go/grpclogging" |
| "go.skia.org/infra/perf/go/perfresults" |
| ) |
| |
const (
	// appName identifies this service in flags, metrics, and logs.
	appName = "cabe"
	// drainTime is the drain period allowed for graceful shutdown.
	drainTime = time.Second * 5
)
| |
func init() {
	// A transitive dependency on glog emits "ERROR: logging before
	// flag.Parse" messages unless flags appear to have been parsed.
	// Swap in an already-parsed FlagSet to silence the warning.
	// See: https://github.com/kubernetes/kubernetes/issues/17162
	parsed := flag.NewFlagSet("", flag.ContinueOnError)
	_ = parsed.Parse(nil)
	flag.CommandLine = parsed
}
| |
// App is the cabe server application. Zero value is not usable: callers
// must populate the backend clients (see DialBackends) and, unless
// disableGRPCSP is set, the auth policy (see ConfigureAuthorization)
// before calling Init.
type App struct {
	port            string  // HTTP (healthz) listen address, e.g. ":8002".
	grpcPort        string  // gRPC listen address, e.g. ":50051".
	promPort        string  // Prometheus metrics listen address.
	disableGRPCSP   bool    // If true, skip authorization checks on incoming gRPC calls.
	disableGRPCLog  bool    // If true, skip structured logging of gRPC calls.
	traceSampleRate float64 // Fraction of requests to trace (0.0 disables sampling).
	logTracing      bool    // If true, also send traces to logs (not for production).

	// Listeners created by Init; after Init, port/grpcPort reflect the
	// actual bound addresses (relevant when ":0" was requested).
	lisGRPC net.Listener
	lisHTTP net.Listener

	authPolicy     *grpcsp.ServerPolicy         // Role-based authz policy for the gRPC server.
	grpcLogger     *grpclogging.GRPCLogger      // Optional structured logger for client/server calls.
	swarmingClient swarming.ApiClient           // Swarming backend (task metadata lookups).
	rbeClients     map[string]*rbeclient.Client // RBE-CAS clients keyed by instance name.

	httpServer *http.Server
	grpcServer *grpc.Server
}
| |
| // FlagSet constructs a flag.FlagSet for the App. |
| func (a *App) FlagSet() *flag.FlagSet { |
| fs := flag.NewFlagSet(appName, flag.ExitOnError) |
| fs.StringVar(&a.port, "port", ":8002", "HTTP service address (e.g., ':8002')") |
| fs.StringVar(&a.promPort, "prom_port", ":20000", "Metrics service address (e.g., ':10110')") |
| fs.StringVar(&a.grpcPort, "grpc_port", ":50051", "gRPC service port (e.g., ':50051')") |
| fs.BoolVar(&a.disableGRPCSP, "disable_grpcsp", false, "disable authorization checks for incoming grpc calls") |
| fs.BoolVar(&a.disableGRPCLog, "disable_grpclog", false, "disable structured logging for grpc client and server calls") |
| fs.Float64Var(&a.traceSampleRate, "trace_sample", 0.0, "sampling rate for trace collection") |
| fs.BoolVar(&a.logTracing, "log_tracing", false, "send tracing information to logs (DO NOT use in production!)") |
| |
| return fs |
| } |
| |
| func (a *App) casResultReader(ctx context.Context, instance, digest string) (map[string]perfresults.PerfResults, error) { |
| rbeClient, ok := a.rbeClients[instance] |
| if !ok { |
| return nil, fmt.Errorf("no RBE client for instance %s", instance) |
| } |
| |
| return backends.FetchBenchmarkJSON(ctx, rbeClient, digest) |
| } |
| |
| func (a *App) swarmingTaskReader(ctx context.Context, pinpointJobID string) ([]*swarmingapi.SwarmingRpcsTaskRequestMetadata, error) { |
| tasksResp, err := a.swarmingClient.ListTasks(ctx, time.Now().AddDate(0, 0, -56), time.Now(), []string{"pinpoint_job_id:" + pinpointJobID}, "") |
| if err != nil { |
| sklog.Fatalf("list task results: %v", err) |
| return nil, err |
| } |
| return tasksResp, nil |
| } |
| |
| // Init creates listeners for required service ports and prepares the App for serving. |
| func (a *App) Init(ctx context.Context) error { |
| if a.swarmingClient == nil { |
| return fmt.Errorf("missing swarming service client") |
| } |
| if a.rbeClients == nil { |
| return fmt.Errorf("missing rbe service clients") |
| } |
| if a.authPolicy == nil && !a.disableGRPCSP { |
| return fmt.Errorf("missing required grpc authorization policy") |
| } |
| var err error |
| |
| // Just testing the http healthz check to make sure envoy can |
| // connect to these processes at all. If we end up needing |
| // both the http server and the grpc server in order to satisfy envoy |
| // health checks AND serve grpc requests, we can separate the http and |
| // grpc port flags in k8s configs. |
| sklog.Infof("registering http healthz handler") |
| topLevelRouter := chi.NewRouter() |
| h := httputils.HealthzAndHTTPS(topLevelRouter) |
| httpServeMux := http.NewServeMux() |
| httpServeMux.Handle("/", h) |
| a.lisHTTP, err = net.Listen("tcp", a.port) |
| if err != nil { |
| return err |
| } |
| // If the port was specified as ":0" and the OS picked a port for us, |
| // set the app's port to the actual port it's listening on. |
| a.port = a.lisHTTP.Addr().String() |
| a.httpServer = &http.Server{ |
| Addr: a.port, |
| Handler: httpServeMux, |
| } |
| |
| opts := []grpc.ServerOption{} |
| |
| interceptors := []grpc.UnaryServerInterceptor{} |
| if a.grpcLogger != nil { |
| interceptors = append(interceptors, a.grpcLogger.ServerUnaryLoggingInterceptor) |
| } |
| |
| if !a.disableGRPCSP { |
| interceptors = append(interceptors, a.authPolicy.UnaryInterceptor()) |
| } |
| |
| opts = append(opts, grpc.ChainUnaryInterceptor(interceptors...)) |
| if !a.disableGRPCSP { |
| opts = append(opts, grpc.UnaryInterceptor(a.authPolicy.UnaryInterceptor())) |
| } |
| // Add the Opencensus grpc stats/server handler in order to attach traces to the |
| // contexts for incoming gRPC requests. |
| opts = append(opts, grpc.StatsHandler(&ocgrpc.ServerHandler{})) |
| |
| a.grpcServer = grpc.NewServer(opts...) |
| |
| sklog.Infof("registering grpc health server") |
| healthServer := health.NewServer() |
| grpc_health_v1.RegisterHealthServer(a.grpcServer, healthServer) |
| |
| sklog.Infof("registering grpc reflection server") |
| reflection.Register(a.grpcServer) |
| |
| sklog.Infof("registering cabe grpc server") |
| analysisServer := analysisserver.New(a.casResultReader, a.swarmingTaskReader) |
| cpb.RegisterAnalysisServer(a.grpcServer, analysisServer) |
| a.lisGRPC, err = net.Listen("tcp", a.grpcPort) |
| if err != nil { |
| sklog.Fatalf("failed to listen: %v", err) |
| } |
| // If the port was specified as ":0" and the OS picked a port for us, |
| // set the app's grpc port to the actual port it's listening on. |
| a.grpcPort = a.lisGRPC.Addr().String() |
| sklog.Infof("server listening at %v", a.lisGRPC.Addr()) |
| return nil |
| } |
| |
| // ServeGRPC does not return unless there is an error during the startup process, in which case it |
| // returns the error, or if a call to [Cleanup()] causes a graceful shutdown, in which |
| // case it returns either nil if the graceful shutdown succeeds, or an error if it does not. |
| func (a *App) ServeGRPC(ctx context.Context) error { |
| if err := a.grpcServer.Serve(a.lisGRPC); err != nil { |
| sklog.Errorf("failed to serve grpc: %v", err) |
| return err |
| } |
| |
| return nil |
| } |
| |
| // ServeHTTP does not return unless there is an error in [http.Server#Serve], in which case |
| // it returns the error, or if a call to [Cleanup()] causes a graceful shutdown, in which |
| // case it returns either nil if the graceful shutdown succeeds, or an error if it does not. |
| func (a *App) ServeHTTP() error { |
| if err := a.httpServer.Serve(a.lisHTTP); err != nil && err != http.ErrServerClosed { |
| sklog.Errorf("faled to serve http: %v", err) |
| return err |
| } |
| return nil |
| } |
| |
| // DialBackends establishes rpc channel connections to backend |
| // services required by App. |
| func (a *App) DialBackends(ctx context.Context) error { |
| sklog.Infof("dialing RBE-CAS backends") |
| opts := []grpc.DialOption{} |
| if a.grpcLogger != nil { |
| opts = append(opts, |
| grpc.WithChainUnaryInterceptor(a.grpcLogger.ClientUnaryLoggingInterceptor), |
| grpc.WithChainStreamInterceptor(a.grpcLogger.ClientStreamLoggingInterceptor)) |
| } |
| |
| rbeClients, err := backends.DialRBECAS(ctx, opts...) |
| if err != nil { |
| sklog.Fatalf("dialing RBE-CAS backends: %v", err) |
| return err |
| } |
| sklog.Infof("successfully dialed %d RBE-CAS instances", len(rbeClients)) |
| a.rbeClients = rbeClients |
| |
| sklog.Infof("dialing Swarming") |
| swarmingClient, err := backends.DialSwarming(ctx) |
| if err != nil { |
| sklog.Fatalf("dialing swarming: %v", err) |
| return err |
| } |
| sklog.Infof("successfully dialed swarming") |
| a.swarmingClient = swarmingClient |
| return nil |
| } |
| |
| // ConfigureAuthorization configures a role-based authorization policy for the grpc server and |
| // the services it serves. |
| func (a *App) ConfigureAuthorization() error { |
| a.authPolicy = grpcsp.Server() |
| |
| healthPolicy, err := a.authPolicy.Service(grpc_health_v1.Health_ServiceDesc) |
| if err != nil { |
| sklog.Errorf("creating auth policy for service: %v", err) |
| return err |
| } |
| if err := healthPolicy.AuthorizeUnauthenticated(); err != nil { |
| sklog.Errorf("configuring roles for service: %v", err) |
| return err |
| } |
| |
| analysisPolicy, err := a.authPolicy.Service(cpb.Analysis_ServiceDesc) |
| if err != nil { |
| sklog.Errorf("creating auth policy for service: %v", err) |
| return err |
| } |
| |
| if err := analysisPolicy.AuthorizeRoles(roles.Roles{roles.Admin}); err != nil { |
| sklog.Errorf("configuring roles for service: %v", err) |
| return err |
| } |
| if err := analysisPolicy.AuthorizeMethodForRoles("GetAnalysis", roles.Roles{roles.Viewer}); err != nil { |
| sklog.Errorf("configuring roles for method: %v", err) |
| return err |
| } |
| |
| return nil |
| } |
| |
| // Cleanup gracefully shuts down any running servers and closes |
| // any open backend connections. |
| func (a *App) Cleanup() { |
| sklog.Info("Shutdown server gracefully.") |
| if a.grpcServer != nil { |
| a.grpcServer.GracefulStop() |
| } |
| |
| if err := a.httpServer.Shutdown(context.Background()); err != nil { |
| sklog.Errorf("shutting down http server: %v", err) |
| } |
| |
| // Now shut down client connections to backends that have clean shutdown methods. |
| for instance, rbeClient := range a.rbeClients { |
| if err := rbeClient.Close(); err != nil { |
| sklog.Errorf("closing RBE client connection for instance %q: %v", instance, err) |
| } |
| } |
| |
| // The [swarming.ApiClient] interface does not offer a clean shutdown method. |
| } |
| |
| func main() { |
| a := &App{} |
| |
| common.InitWithMust( |
| appName, |
| common.PrometheusOpt(&a.promPort), |
| common.FlagSetOpt(a.FlagSet()), |
| ) |
| |
| traceAttrs := map[string]interface{}{ |
| "podName": os.Getenv("POD_NAME"), |
| "containerName": os.Getenv("CONTAINER_NAME"), |
| "service.name": "cabeserver", |
| } |
| if a.logTracing { |
| loggingtracer.Initialize() |
| } |
| if err := tracing.Initialize(a.traceSampleRate, "skia-infra-public", traceAttrs); err != nil { |
| sklog.Fatalf("Could not initialize tracing: %s", err) |
| } |
| if err := a.ConfigureAuthorization(); err != nil { |
| sklog.Fatalf("configuring authorization policy: %v", err) |
| } |
| |
| if !a.disableGRPCLog { |
| projectID := "" |
| if metadata.OnGCE() { |
| var err error |
| projectID, err = metadata.ProjectID() |
| if err != nil { |
| sklog.Fatal(err) |
| } |
| } |
| a.grpcLogger = grpclogging.New(projectID, os.Stdout) |
| } |
| ctx := context.Background() |
| |
| if err := a.DialBackends(ctx); err != nil { |
| sklog.Fatalf("dialing backends: %v", err) |
| } |
| |
| cleanup.AtExit(a.Cleanup) |
| if err := a.Init(ctx); err != nil { |
| sklog.Fatal(err) |
| } |
| ch := make(chan interface{}) |
| go func() { |
| if err := a.ServeGRPC(ctx); err != nil { |
| sklog.Fatal(err) |
| } |
| ch <- nil |
| }() |
| go func() { |
| if err := a.ServeHTTP(); err != nil { |
| sklog.Fatal(err) |
| } |
| ch <- nil |
| }() |
| <-ch |
| <-ch |
| } |