blob: 885596c3bf3ee8062e43f60e9afa9fc1d30189c6 [file] [log] [blame]
package main
import (
"context"
"flag"
"net/http"
"github.com/jackc/pgx/v4/pgxpool"
"go.opencensus.io/trace"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/httputils"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/golden/go/config"
"go.skia.org/infra/golden/go/ignore/sqlignorestore"
"go.skia.org/infra/golden/go/sql"
"go.skia.org/infra/golden/go/tracing"
)
// maxSQLConnections is assigned to the pgx pool config's MaxConns in mustInitSQLDatabase.
// The value itself is arbitrary.
const maxSQLConnections = 4
// TODO(kjlubick) other periodic tasks
// - Fix any races with trace rules (re-apply all rules to Traces and ValuesAtHead)
// - Send tasks to diffworker queue (de-duplicating where possible).
// periodicTasksConfig is the configuration for the periodic tasks server. main fills it in
// from the two json5 files named by the common_instance_config and config flags.
type periodicTasksConfig struct {
// Embedded instance-wide settings. This file reads PromPort, TracingProportion, ReadyPort,
// SQLConnection and SQLDatabaseName from it.
config.Common
// NullIgnorePeriod is how often we should try to apply the ignore rules to null traces.
// It is passed as the repeat interval in startUpdateTracesWithNullStatus.
NullIgnorePeriod config.Duration `json:"null_ignore_period"`
}
// main loads the service configuration, initializes logging/metrics and tracing, connects to
// the SQL database, kicks off the periodic tasks, and then serves the /healthz endpoint on
// the configured ready port. It only returns by way of a fatal log.
func main() {
	// Command line flags: the two json5 config paths plus a debugging escape hatch.
	commonInstanceConfig := flag.String("common_instance_config", "", "Path to the json5 file containing the configuration that needs to be the same across all services for a given instance.")
	thisConfig := flag.String("config", "", "Path to the json5 file containing the configuration specific to the periodic tasks server.")
	hang := flag.Bool("hang", false, "Stop and do nothing after reading the flags. Good for debugging containers.")

	// Flags must be parsed before anything else so that --hang can short-circuit startup.
	flag.Parse()
	if *hang {
		sklog.Info("Hanging")
		// Block forever without doing any work.
		select {}
	}

	var ptc periodicTasksConfig
	loadErr := config.LoadFromJSON5(&ptc, commonInstanceConfig, thisConfig)
	if loadErr != nil {
		sklog.Fatalf("Reading config: %s", loadErr)
	}
	sklog.Infof("Loaded config %#v", ptc)

	// Use the configured tracing proportion only when it is positive; otherwise fall back to
	// the default of 0.01.
	samplingProportion := 0.01
	if configured := ptc.TracingProportion; configured > 0 {
		samplingProportion = configured
	}

	// Common initialization, with the Prometheus option pointed at the configured port.
	common.InitWithMust("periodictasks", common.PrometheusOpt(&ptc.PromPort))
	if err := tracing.Initialize(samplingProportion); err != nil {
		sklog.Fatalf("Could not set up tracing: %s", err)
	}

	ctx := context.Background()
	pool := mustInitSQLDatabase(ctx, ptc)

	startUpdateTracesWithNullStatus(ctx, pool, ptc)
	sklog.Infof("periodic tasks have been started")

	// The health endpoint is registered on the default mux, which ListenAndServe uses when
	// given a nil handler.
	http.HandleFunc("/healthz", httputils.ReadyHandleFunc)
	serveErr := http.ListenAndServe(ptc.ReadyPort, nil)
	sklog.Fatal(serveErr)
}
// startUpdateTracesWithNullStatus launches a background goroutine that hands a task to
// util.RepeatCtx with an interval of ptc.NullIgnorePeriod. Each run of the task calls
// sqlignorestore.UpdateIgnoredTraces against db inside its own trace span.
//
// A liveness metric named "periodic_tasks" (tagged task=updateTracesWithNullStatus) is reset
// only after a successful run; a failed run is logged and leaves the liveness untouched.
//
// This function does not block; it returns as soon as the goroutine has been started.
func startUpdateTracesWithNullStatus(ctx context.Context, db *pgxpool.Pool, ptc periodicTasksConfig) {
	liveness := metrics2.NewLiveness("periodic_tasks", map[string]string{
		"task": "updateTracesWithNullStatus",
	})
	go util.RepeatCtx(ctx, ptc.NullIgnorePeriod.Duration, func(ctx context.Context) {
		ctx, span := trace.StartSpan(ctx, "updateTracesWithNullStatus")
		defer span.End()
		if err := sqlignorestore.UpdateIgnoredTraces(ctx, db); err != nil {
			// Errorf (not Error) is required here so the %s verb is actually formatted.
			sklog.Errorf("Error while updating null traces: %s", err)
			return // return so the liveness is not updated
		}
		liveness.Reset()
		sklog.Infof("Done with updateTracesWithNullStatus")
	})
}
// mustInitSQLDatabase builds a pgx connection pool from the SQL settings in ptc and returns
// it. The pool's MaxConns is set to maxSQLConnections.
//
// Any problem is reported through sklog.Fatalf: a missing database name, a connection URL
// that cannot be parsed, or a failure to connect.
func mustInitSQLDatabase(ctx context.Context, ptc periodicTasksConfig) *pgxpool.Pool {
	dbName := ptc.SQLDatabaseName
	if dbName == "" {
		sklog.Fatalf("Must have SQL Database Information")
	}

	connURL := sql.GetConnectionURL(ptc.SQLConnection, dbName)
	poolCfg, parseErr := pgxpool.ParseConfig(connURL)
	if parseErr != nil {
		sklog.Fatalf("error getting postgres config %s: %s", connURL, parseErr)
	}
	poolCfg.MaxConns = maxSQLConnections

	pool, connectErr := pgxpool.ConnectConfig(ctx, poolCfg)
	if connectErr != nil {
		sklog.Fatalf("error connecting to the database: %s", connectErr)
	}
	sklog.Infof("Connected to SQL database %s", dbName)
	return pool
}