// The gitilesfollower executable monitors the repo we are tracking using gitiles. It fills in
// the GitCommits table, specifically making a mapping between a GitHash and CommitID. The CommitID
// is based on the "index" of the commit (i.e. how many commits since the initial commit).
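// For example, with the initial commit assigned CommitID 1,000,000,000, the fifth commit after it
// gets CommitID 1,000,000,005, stored as the zero-padded string "001000000005".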
//
// This will be used by all clients that have their tests in the same repo as the code under test.
// Clients with more complex repo structures will need an alternate way of linking commit_id
// to git_hash.
package main
import (
"context"
"flag"
"fmt"
"net/http"
"strconv"
"time"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"go.opencensus.io/trace"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/gitiles"
"go.skia.org/infra/go/httputils"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/go/vcsinfo"
"go.skia.org/infra/golden/go/config"
"go.skia.org/infra/golden/go/sql"
"go.skia.org/infra/golden/go/tracing"
)
const (
	// maxSQLConnections is an arbitrary cap on concurrent connections to the SQL database.
maxSQLConnections = 4
	// The initial commit will be given this commit ID. Subsequent commits will have monotonically
	// increasing integers as IDs. We pick this number instead of zero so that, if we ever need to
	// go backwards (i.e. add commits before the initial one), we can still assign non-negative
	// IDs, which keeps the sort order intact when the IDs are turned into strings.
initialID = 1_000_000_000
// If overrideLatestCommit is set on a context, the associated value will be used instead of
// querying gitiles (which changes over time). This is used by tests.
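	// In tests, this can be done with something along the lines of (the hash below is purely
	// illustrative):
	//
	//	ctx = context.WithValue(ctx, overrideLatestCommitKey, "aaaa1111...")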
overrideLatestCommitKey = contextKey("override_latest_commit")
)
type contextKey string // See advice in https://golang.org/pkg/context/#WithValue
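// repoFollowerConfig holds the settings for this service. As an illustration only (all values
// below are hypothetical, and the shared fields come from the embedded config.Common), the
// service-specific part of the JSON5 config might look like:
//
//	{
//	  initial_commit: "1a2b3c4d...",
//	  poll_period: "5m",
//	  prom_port: ":20000",
//	  ready_port: ":8000",
//	}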
type repoFollowerConfig struct {
config.Common
	// InitialCommit is the commit we will use if there are no existing commits in the DB. It is
	// treated as "commit zero", but is actually assigned commit ID 1 billion so that, if we ever
	// need to go back in time, we can still sort our commit_ids without resorting to negative
	// numbers.
InitialCommit string `json:"initial_commit"`
// PollPeriod is how often we should poll the source of truth.
PollPeriod config.Duration `json:"poll_period"`
// Metrics service address (e.g., ':10110')
PromPort string `json:"prom_port"`
	// ReadyPort is the port on which to serve the /healthz web handler.
ReadyPort string `json:"ready_port"`
}
func main() {
// Command line flags.
var (
commonInstanceConfig = flag.String("common_instance_config", "", "Path to the json5 file containing the configuration that needs to be the same across all services for a given instance.")
		thisConfig           = flag.String("config", "", "Path to the json5 file containing the configuration specific to the gitiles follower server.")
hang = flag.Bool("hang", false, "Stop and do nothing after reading the flags. Good for debugging containers.")
)
	// Parse the options so we can configure logging.
flag.Parse()
if *hang {
sklog.Info("Hanging")
select {}
}
var rfc repoFollowerConfig
if err := config.LoadFromJSON5(&rfc, commonInstanceConfig, thisConfig); err != nil {
sklog.Fatalf("Reading config: %s", err)
}
sklog.Infof("Loaded config %#v", rfc)
// Set up the logging options.
logOpts := []common.Opt{
common.PrometheusOpt(&rfc.PromPort),
}
common.InitWithMust("gitilesfollower", logOpts...)
if err := tracing.Initialize(1); err != nil {
sklog.Fatalf("Could not set up tracing: %s", err)
}
ctx := context.Background()
db := mustInitSQLDatabase(ctx, rfc)
ts, err := auth.NewDefaultTokenSource(false, auth.SCOPE_USERINFO_EMAIL, auth.SCOPE_GERRIT)
if err != nil {
sklog.Fatalf("Problem setting up default token source: %s", err)
}
client := httputils.DefaultClientConfig().WithTokenSource(ts).Client()
gitilesClient := gitiles.NewRepo(rfc.GitRepoURL, client)
	// pollRepo does a blocking initial update, then starts a background goroutine to keep polling.
if err := pollRepo(ctx, db, gitilesClient, rfc); err != nil {
sklog.Fatalf("Could not do initial update: %s", err)
}
sklog.Infof("Initial update complete")
http.HandleFunc("/healthz", httputils.ReadyHandleFunc)
sklog.Fatal(http.ListenAndServe(rfc.ReadyPort, nil))
}
func mustInitSQLDatabase(ctx context.Context, fcc repoFollowerConfig) *pgxpool.Pool {
if fcc.SQLDatabaseName == "" {
sklog.Fatalf("Must have SQL Database Information")
}
url := sql.GetConnectionURL(fcc.SQLConnection, fcc.SQLDatabaseName)
conf, err := pgxpool.ParseConfig(url)
if err != nil {
sklog.Fatalf("error getting postgres config %s: %s", url, err)
}
conf.MaxConns = maxSQLConnections
db, err := pgxpool.ConnectConfig(ctx, conf)
if err != nil {
sklog.Fatalf("error connecting to the database: %s", err)
}
sklog.Infof("Connected to SQL database %s", fcc.SQLDatabaseName)
return db
}
// pollRepo does an initial updateCycle and starts a goroutine to continue updating according
// to the provided duration for as long as the context remains ok.
func pollRepo(ctx context.Context, db *pgxpool.Pool, client *gitiles.Repo, rfc repoFollowerConfig) error {
sklog.Infof("Doing initial update")
err := updateCycle(ctx, db, client, rfc)
if err != nil {
return skerr.Wrap(err)
}
go func() {
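		// time.Tick is acceptable here because the ticker is meant to live for the life of the process.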
ct := time.Tick(rfc.PollPeriod.Duration)
sklog.Infof("Polling every %s", rfc.PollPeriod.Duration)
for {
select {
case <-ctx.Done():
sklog.Errorf("Stopping polling due to context error: %s", ctx.Err())
return
case <-ct:
err := updateCycle(ctx, db, client, rfc)
if err != nil {
sklog.Errorf("Error on this cycle for talking to %s: %s", rfc.GitRepoURL, rfc)
}
}
}
}()
return nil
}
// GitilesLogger is a subset of the gitiles client library that we need. This allows us to mock
// it out during tests.
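// The *gitiles.Repo created in main satisfies this interface.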
type GitilesLogger interface {
Log(ctx context.Context, logExpr string, opts ...gitiles.LogOption) ([]*vcsinfo.LongCommit, error)
LogFirstParent(ctx context.Context, from, to string, opts ...gitiles.LogOption) ([]*vcsinfo.LongCommit, error)
}
// updateCycle polls the gitiles repo for the latest commit and the database for the previously
// seen commit. If those are different, it polls gitiles for all commits that happened between
// those two points and stores them to the DB.
func updateCycle(ctx context.Context, db *pgxpool.Pool, client GitilesLogger, rfc repoFollowerConfig) error {
ctx, span := trace.StartSpan(ctx, "gitilesfollower_updateCycle")
defer span.End()
latestHash, err := getLatestCommitFromRepo(ctx, client, rfc)
if err != nil {
return skerr.Wrap(err)
}
previousHash, previousID, err := getPreviousCommitFromDB(ctx, db)
if err != nil {
return skerr.Wrapf(err, "getting recent commits from DB")
}
if previousHash == latestHash {
sklog.Infof("no updates - latest seen commit %s", previousHash)
return nil
}
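	// An empty previousHash means the DB has no commits yet, so bootstrap from the configured
	// initial commit.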
if previousHash == "" {
previousHash = rfc.InitialCommit
previousID = initialID
}
sklog.Infof("Getting git history from %s to %s", previousHash, latestHash)
commits, err := client.LogFirstParent(ctx, previousHash, latestHash)
if err != nil {
return skerr.Wrapf(err, "getting backlog of commits from %s..%s", previousHash, latestHash)
}
// commits is backwards and LogFirstParent does not respect gitiles.LogReverse()
reverse(commits)
sklog.Infof("Got %d commits to store", len(commits))
if err := storeCommits(ctx, db, previousID, commits); err != nil {
return skerr.Wrapf(err, "storing %d commits to GitCommits table", len(commits))
}
return nil
}
// reverse reverses the order of the given slice in place.
func reverse(commits []*vcsinfo.LongCommit) {
total := len(commits)
for i := 0; i < total/2; i++ {
commits[i], commits[total-i-1] = commits[total-i-1], commits[i]
}
}
// getLatestCommitFromRepo returns the git hash of the latest git commit known on the configured
// branch. If overrideLatestCommitKey has a value set, that will be used instead.
func getLatestCommitFromRepo(ctx context.Context, client GitilesLogger, rfc repoFollowerConfig) (string, error) {
if hash := ctx.Value(overrideLatestCommitKey); hash != nil {
return hash.(string), nil
}
ctx, span := trace.StartSpan(ctx, "gitilesfollower_getLatestCommitFromRepo")
defer span.End()
latestCommit, err := client.Log(ctx, rfc.GitRepoBranch, gitiles.LogLimit(1))
if err != nil {
return "", skerr.Wrapf(err, "getting last commit")
}
if len(latestCommit) < 1 {
return "", skerr.Fmt("No commits returned")
}
return latestCommit[0].Hash, nil
}
// getPreviousCommitFromDB returns the git_hash and the commit_id of the most recently stored
// commit. "Most recent" here is defined by the lexicographical order of the commit_id. Of note,
// commit_id is returned as an integer because subsequent ids will be computed by adding to that
// integer value.
//
// This approach takes a lesson from Perf: by only comparing the most recent commit in the DB
// against the latest commit on the tree, Gold stays resilient to merged/changed history
// (e.g. go/skia-infra-pm-007).
func getPreviousCommitFromDB(ctx context.Context, db *pgxpool.Pool) (string, int64, error) {
ctx, span := trace.StartSpan(ctx, "gitilesfollower_getPreviousCommitFromDB")
defer span.End()
row := db.QueryRow(ctx, `SELECT git_hash, commit_id FROM GitCommits
ORDER BY commit_id DESC LIMIT 1`)
hash := ""
id := ""
if err := row.Scan(&hash, &id); err != nil {
if err == pgx.ErrNoRows {
return "", 0, nil // No data in GitCommits
}
return "", 0, skerr.Wrap(err)
}
idInt, err := strconv.ParseInt(id, 10, 64)
if err != nil {
return "", 0, skerr.Wrapf(err, "It is assumed that the commit ids for this type of repo tracking are ints: %q", id)
}
return hash, idInt, nil
}
// storeCommits writes the given commits to the SQL database, assigning them commitIDs in
// monotonically increasing order. The commits slice is expected to be sorted with the oldest
// commit first (the opposite of how gitiles returns it).
func storeCommits(ctx context.Context, db *pgxpool.Pool, lastCommitID int64, commits []*vcsinfo.LongCommit) error {
ctx, span := trace.StartSpan(ctx, "gitilesfollower_storeCommits")
defer span.End()
commitID := lastCommitID + 1
	// batchSize is only really relevant for the initial load, but we need it to avoid exceeding
	// the 65k limit on SQL placeholder indexes.
const batchSize = 1000
const statement = `UPSERT INTO GitCommits (git_hash, commit_id, commit_time, author_email, subject) VALUES `
const valuesPerRow = 5
err := util.ChunkIter(len(commits), batchSize, func(startIdx int, endIdx int) error {
chunk := commits[startIdx:endIdx]
arguments := make([]interface{}, 0, len(chunk)*valuesPerRow)
for _, c := range chunk {
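			// Zero-pad the ID to 12 digits (e.g. 1000000001 -> "001000000001") so that the
			// lexicographic order of commit_id matches its numeric order.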
cid := fmt.Sprintf("%012d", commitID)
arguments = append(arguments, c.Hash, cid, c.Timestamp, c.Author, c.Subject)
commitID++
}
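		// ValuesPlaceholders expands to placeholder groups of the form ($1, $2, $3, $4, $5),
		// ($6, ...), lining up with the arguments gathered above.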
vp := sql.ValuesPlaceholders(valuesPerRow, len(chunk))
if _, err := db.Exec(ctx, statement+vp, arguments...); err != nil {
return skerr.Wrap(err)
}
return nil
})
return skerr.Wrap(err)
}