blob: f15477acf64dfe2cc3e4f795634a27dc053f1b6c [file] [log] [blame]
package backend
import (
"context"
"net"
"go.temporal.io/sdk/client"
"go.skia.org/infra/go/cleanup"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/grpcsp"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/perf/go/alerts"
"go.skia.org/infra/perf/go/anomalygroup"
ag_service "go.skia.org/infra/perf/go/anomalygroup/service"
"go.skia.org/infra/perf/go/backend/shared"
"go.skia.org/infra/perf/go/builders"
"go.skia.org/infra/perf/go/config"
"go.skia.org/infra/perf/go/config/validate"
"go.skia.org/infra/perf/go/culprit"
"go.skia.org/infra/perf/go/culprit/notify"
culprit_service "go.skia.org/infra/perf/go/culprit/service"
"go.skia.org/infra/perf/go/notifytypes"
"go.skia.org/infra/perf/go/regression"
"go.skia.org/infra/perf/go/subscription"
tpr_client "go.skia.org/infra/temporal/go/client"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
const appName = "backend"
// Backend provides a struct for the application.
type Backend struct {
configFileName string
promPort string
grpcPort string
grpcServer *grpc.Server
serverAuthPolicy *grpcsp.ServerPolicy
lisGRPC net.Listener
flags *config.BackendFlags
}
// BackendService provides an interface for a service to be hosted on Backend application.
type BackendService interface {
// GetAuthorizationPolicy returns the authorization policy for the service.
GetAuthorizationPolicy() shared.AuthorizationPolicy
// RegisterGrpc registers the grpc service with the server instance.
RegisterGrpc(server *grpc.Server)
// GetServiceDescriptor returns the service descriptor for the service.
GetServiceDescriptor() grpc.ServiceDesc
}
// initialize initializes the Backend application.
func (b *Backend) initialize(
anomalygroupStore anomalygroup.Store,
culpritStore culprit.Store,
subscriptionStore subscription.Store,
regressionStore regression.Store,
notifier notify.CulpritNotifier) error {
common.InitWithMust(
appName,
common.PrometheusOpt(&b.promPort),
)
var err error
ctx := context.Background()
// Load the config file.
sklog.Infof("Loading configs from %s", b.configFileName)
if err = validate.LoadAndValidate(b.configFileName); err != nil {
sklog.Fatal(err)
}
sklog.Debug("Creating anomalygroup store.")
if anomalygroupStore == nil {
anomalygroupStore, err = builders.NewAnomalyGroupStoreFromConfig(ctx, config.Config)
if err != nil {
sklog.Errorf("Error creating anomalgroup store. %s", err)
return err
}
}
sklog.Debug("Creating culprit notifier.")
if notifier == nil {
notifier, err = notify.GetDefaultNotifier(ctx, config.Config, b.flags.CommitRangeURL)
if err != nil {
sklog.Fatal(err)
}
}
sklog.Debug("Creating culprit store.")
if culpritStore == nil {
culpritStore, err = builders.NewCulpritStoreFromConfig(ctx, config.Config)
if err != nil {
sklog.Errorf("Error creating culprit store. %s", err)
return err
}
}
sklog.Debug("Creating subscription store.")
if subscriptionStore == nil {
subscriptionStore, err = builders.NewSubscriptionStoreFromConfig(ctx, config.Config)
if err != nil {
sklog.Errorf("Error creating subscription store. %s", err)
return err
}
}
sklog.Debug("Creating regression store.")
if regressionStore == nil {
sklog.Debug("Creating alertStore.")
alertStore, err := builders.NewAlertStoreFromConfig(ctx, false, config.Config)
if err != nil {
sklog.Fatal(err)
return err
}
sklog.Debug("Creating config provider.")
configProvider, err := alerts.NewConfigProvider(ctx, alertStore, 600)
if err != nil {
sklog.Fatalf("Failed to create alerts configprovider: %s", err)
return err
}
regressionStore, err = builders.NewRegressionStoreFromConfig(ctx, false, config.Config, configProvider)
if err != nil {
sklog.Errorf("Error creating regression store. %s", err)
return err
}
}
var temporalClient client.Client
if config.Config.NotifyConfig.Notifications == notifytypes.AnomalyGrouper {
// Temporal client needs to setup when grouping is in use.
sklog.Debug("Creating Temporal client.")
if config.Config.TemporalConfig.HostPort == "" || config.Config.TemporalConfig.Namespace == "" {
return skerr.Fmt("Empty values found in temporal properties: Hostport %s, Namespace: %s",
config.Config.TemporalConfig.HostPort, config.Config.TemporalConfig.Namespace)
}
temporalProvider := tpr_client.DefaultTemporalProvider{}
var err error
temporalClient, _, err = temporalProvider.NewClient(
config.Config.TemporalConfig.HostPort, config.Config.TemporalConfig.Namespace)
if err != nil {
return skerr.Wrapf(err, "Error creating temporal client.")
}
}
sklog.Info("Configuring grpc services.")
// Add all the services that will be hosted here.
services := []BackendService{
NewPinpointService(nil, nil),
ag_service.New(anomalygroupStore, regressionStore, temporalClient),
culprit_service.New(anomalygroupStore, culpritStore, subscriptionStore, notifier),
}
err = b.configureServices(services)
if err != nil {
return err
}
opts := []grpc.ServerOption{grpc.UnaryInterceptor(b.serverAuthPolicy.UnaryInterceptor())}
b.grpcServer = grpc.NewServer(opts...)
sklog.Infof("Registering grpc reflection server.")
reflection.Register(b.grpcServer)
sklog.Info("Registering individual services.")
b.registerServices(services)
b.lisGRPC, _ = net.Listen("tcp4", b.grpcPort)
sklog.Infof("Backend server listening at %v", b.lisGRPC.Addr())
cleanup.AtExit(b.Cleanup)
return nil
}
// configureServices configures all available services for Backend.
func (b *Backend) configureServices(services []BackendService) error {
for _, service := range services {
err := b.configureAuthorizationForService(service)
if err != nil {
return err
}
}
return nil
}
// registerServices registers all available services with the grpc server.
func (b *Backend) registerServices(services []BackendService) {
for _, service := range services {
service.RegisterGrpc(b.grpcServer)
}
}
// configureAuthorizationForService configures authorization rules for the given BackendService.
func (b *Backend) configureAuthorizationForService(service BackendService) error {
servicePolicy, err := b.serverAuthPolicy.Service(service.GetServiceDescriptor())
if err != nil {
sklog.Errorf("Error creating auth policy for service: %v", err)
return err
}
authPolicy := service.GetAuthorizationPolicy()
if authPolicy.AllowUnauthenticated {
if err := servicePolicy.AuthorizeUnauthenticated(); err != nil {
sklog.Errorf("Error configuring unauthenticated access for service: %v", err)
return err
}
} else {
if err := servicePolicy.AuthorizeRoles(authPolicy.AuthorizedRoles); err != nil {
sklog.Errorf("Error configuring roles for service: %v", err)
return err
}
if authPolicy.MethodAuthorizedRoles != nil {
for method, authorizedRoles := range authPolicy.MethodAuthorizedRoles {
if err := servicePolicy.AuthorizeMethodForRoles(method, authorizedRoles); err != nil {
sklog.Errorf("Error configuring roles for method %s: %v", method, err)
return err
}
}
}
}
return nil
}
// ServeGRPC does not return unless there is an error during the startup process, in which case it
// returns the error, or if a call to [Cleanup()] causes a graceful shutdown, in which
// case it returns either nil if the graceful shutdown succeeds, or an error if it does not.
func (b *Backend) ServeGRPC() error {
if err := b.grpcServer.Serve(b.lisGRPC); err != nil {
sklog.Errorf("failed to serve grpc: %v", err)
return err
}
return nil
}
// New creates a new instance of Backend application.
func New(flags *config.BackendFlags,
anomalygroupStore anomalygroup.Store,
culpritStore culprit.Store,
subscriptionStore subscription.Store,
regressionStore regression.Store,
notifier notify.CulpritNotifier,
) (*Backend, error) {
b := &Backend{
configFileName: flags.ConfigFilename,
grpcPort: flags.Port,
promPort: flags.PromPort,
serverAuthPolicy: grpcsp.Server(),
flags: flags,
}
err := b.initialize(anomalygroupStore, culpritStore, subscriptionStore, regressionStore, notifier)
return b, err
}
// Cleanup performs a graceful shutdown of the grpc server.
func (b *Backend) Cleanup() {
sklog.Info("Shutdown server gracefully.")
if b.grpcServer != nil {
b.grpcServer.GracefulStop()
}
}
// Serve intiates the listener to serve traffic.
func (b *Backend) Serve() {
if err := b.ServeGRPC(); err != nil {
sklog.Fatal(err)
}
}