blob: dd575e3a523f70f04eb4608326aedf4dfba06477 [file] [log] [blame]
package main
import (
"context"
"flag"
"io/ioutil"
"log"
"net/http"
"strings"
"time"
"github.com/flynn/json5"
"go.skia.org/infra/gitsync/go/watcher"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/gitstore/bt_gitstore"
"go.skia.org/infra/go/gitstore/pubsub"
"go.skia.org/infra/go/httputils"
"go.skia.org/infra/go/human"
"go.skia.org/infra/go/sklog"
"golang.org/x/oauth2/google"
"golang.org/x/sync/errgroup"
)
// This server watches a list of git repos for changes and syncs the meta data of all commits
// to a BigTable backed datastore.
const APPNAME = "gitsync2"
// Default config/flag values
var defaultConf = gitSyncConfig{
BTInstanceID: "production",
BTTableID: "git-repos2",
BTWriteGoroutines: bt_gitstore.DefaultWriteGoroutines,
HttpPort: ":9091",
Local: false,
Mirrors: []string{},
ProjectID: "skia-public",
PromPort: ":20000",
RepoURLs: []string{},
RefreshInterval: human.JSONDuration(10 * time.Minute),
}
func main() {
// config holds the configuration values either from flags or from parsing the config files.
config := defaultConf
// Flags that cause the flags below to be disregarded.
configFile := flag.String("config", "", "Disregard flags and load the configuration from this JSON5 config file. The keys and types of the config file match the flags.")
runInit := flag.Bool("init", false, "Initialize the BigTable instance and quit. This should be run with a different different user who has admin rights.")
gcsBucket := flag.String("gcs_bucket", "", "GCS bucket used for temporary storage during ingestion.")
gcsPath := flag.String("gcs_path", "", "GCS path used for temporary storage during ingestion.")
// Define flags that map to field in the configuration struct.
flag.StringVar(&config.BTInstanceID, "bt_instance", defaultConf.BTInstanceID, "Big Table instance")
flag.StringVar(&config.BTTableID, "bt_table", defaultConf.BTTableID, "BigTable table ID")
flag.IntVar(&config.BTWriteGoroutines, "bt_write_goroutines", defaultConf.BTWriteGoroutines, "Number of goroutines to use when writing to BigTable.")
flag.StringVar(&config.HttpPort, "http_port", defaultConf.HttpPort, "The http port where ready-ness endpoints are served.")
flag.BoolVar(&config.Local, "local", defaultConf.Local, "Running locally if true. As opposed to in production.")
flag.StringVar(&config.ProjectID, "project", defaultConf.ProjectID, "ID of the GCP project")
flag.StringVar(&config.PromPort, "prom_port", defaultConf.PromPort, "Metrics service address (e.g., ':10110')")
common.MultiStringFlagVar(&config.RepoURLs, "repo_url", defaultConf.RepoURLs, "Repo url")
common.MultiStringFlagVar(&config.Mirrors, "mirror", defaultConf.Mirrors, "Obtain data for the given repo url from the given mirror, eg. --mirror=<repo URL>=<gitiles mirror URL>")
common.MultiStringFlagVar(&config.IncludeBranches, "branches", defaultConf.IncludeBranches, "Restrict the given repo URL to the given branches, eg. --branches=<repo URL>=master,my-feature")
common.MultiStringFlagVar(&config.ExcludeBranches, "exclude-branches", defaultConf.ExcludeBranches, "Exclude the given branches for the repo URL, eg. --exclude-branches=<repo URL>=master,my-feature")
flag.DurationVar((*time.Duration)(&config.RefreshInterval), "refresh", time.Duration(defaultConf.RefreshInterval), "Interval in which to poll git and refresh the GitStore.")
common.InitWithMust(
"gitsync",
common.PrometheusOpt(&config.PromPort),
common.MetricsLoggingOpt(),
)
defer common.Defer()
// If a configuration file was given we load it into config.
if *configFile != "" {
confBytes, err := ioutil.ReadFile(*configFile)
if err != nil {
sklog.Fatalf("Error reading config file %s: %s", *configFile, err)
}
if err := json5.Unmarshal(confBytes, &config); err != nil {
sklog.Fatalf("Error parsing config file %s: %s", *configFile, err)
}
}
// Dump the configuration since it might be different than the flags that are dumped by default.
sklog.Infof("\n\n Effective configuration: \n%s \n", config.String())
// Configure the bigtable instance.
btConfig := &bt_gitstore.BTConfig{
ProjectID: config.ProjectID,
InstanceID: config.BTInstanceID,
TableID: config.BTTableID,
AppProfile: APPNAME,
WriteGoroutines: config.BTWriteGoroutines,
}
// Initialize bigtable if invoked with --init and quit.
// This should be invoked with a user that has admin privileges, so that the production user that
// wants to write to the instance does not need admin privileges.
if *runInit {
if err := bt_gitstore.InitBT(btConfig); err != nil {
sklog.Fatalf("Error initializing BT: %s", err)
}
sklog.Infof("BigTable instance %s and table %s in project %s initialized.", btConfig.InstanceID, btConfig.TableID, btConfig.ProjectID)
return
}
// Make sure we have at least one repo configured.
if len(config.RepoURLs) == 0 {
sklog.Fatalf("At least one repository URL must be configured.")
}
// Obtain the Gitiles URLs for each of the repos; by default, assume
// that the Git repo URL is the Gitiles URL, but allow the user to
// specify the Gitiles URL where that is not the case.
gitilesURLs := make(map[string]string, len(config.RepoURLs))
for _, url := range config.RepoURLs {
gitilesURLs[url] = url
}
if len(config.Mirrors) > 0 {
for _, mirror := range config.Mirrors {
split := strings.Split(mirror, "=")
if len(split) != 2 {
sklog.Fatalf("Invalid value for --mirror: %q; must be separated by a single '='", mirror)
}
gitilesURLs[split[0]] = split[1]
}
}
// Create token source.
ctx := context.Background()
ts, err := google.DefaultTokenSource(ctx, auth.ScopeUserinfoEmail, auth.ScopeGerrit, pubsub.AUTH_SCOPE)
if err != nil {
sklog.Fatalf("Problem setting up default token source: %s", err)
}
// Start all repo watchers.
includeBranches := make(map[string][]string, len(config.RepoURLs))
for _, repo := range config.RepoURLs {
includeBranches[repo] = []string{}
}
excludeBranches := make(map[string][]string, len(config.RepoURLs))
for _, repo := range config.RepoURLs {
excludeBranches[repo] = []string{}
}
for _, branchFlag := range config.IncludeBranches {
split := strings.SplitN(branchFlag, "=", 2)
if len(split) != 2 {
sklog.Fatalf("Invalid value for --branch: %s", branchFlag)
}
repo := split[0]
branches := strings.Split(split[1], ",")
if _, ok := includeBranches[repo]; !ok {
sklog.Fatalf("Invalid value for --branch; unknown repo %s", repo)
}
if len(branches) == 0 {
sklog.Fatalf("Invalid value for --branch; no branches specified: %s", branchFlag)
}
includeBranches[repo] = branches
}
for _, branchFlag := range config.ExcludeBranches {
split := strings.SplitN(branchFlag, "=", 2)
if len(split) != 2 {
sklog.Fatalf("Invalid value for --branch: %s", branchFlag)
}
repo := split[0]
branches := strings.Split(split[1], ",")
if _, ok := excludeBranches[repo]; !ok {
sklog.Fatalf("Invalid value for --exclude-branch; unknown repo %s", repo)
}
if len(branches) == 0 {
sklog.Fatalf("Invalid value for --exclude-branch; no branches specified: %s", branchFlag)
}
excludeBranches[repo] = branches
}
var egroup errgroup.Group
for _, repoURL := range config.RepoURLs {
repoURL := repoURL
egroup.Go(func() error {
return watcher.Start(ctx, btConfig, repoURL, includeBranches[repoURL], excludeBranches[repoURL], gitilesURLs[repoURL], *gcsBucket, *gcsPath, time.Duration(config.RefreshInterval), ts)
})
}
if err := egroup.Wait(); err != nil {
sklog.Fatal(err)
}
// Set up the http handler to indicate ready-ness and start serving.
http.HandleFunc("/healthz", httputils.ReadyHandleFunc)
sklog.Infof("Listening on port: %s", config.HttpPort)
log.Fatal(http.ListenAndServe(config.HttpPort, nil))
}