blob: cc08e1b4cda1ad5114761f073ba77439f992f110 [file] [log] [blame]
package main
// skia_ingestion is the server process that runs an arbitrary number of
// ingesters and stores them in traceDB backends.
import (
"context"
"flag"
"io/ioutil"
"log"
"net/http"
"os"
"path/filepath"
"runtime/pprof"
"time"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/storage"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/cleanup"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/ds"
"go.skia.org/infra/go/eventbus"
"go.skia.org/infra/go/gevent"
"go.skia.org/infra/go/gitstore/bt_gitstore"
"go.skia.org/infra/go/httputils"
"go.skia.org/infra/go/ingestion"
"go.skia.org/infra/go/sharedconfig"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
_ "go.skia.org/infra/golden/go/goldingestion"
"google.golang.org/api/option"
)
func main() {
// Command line flags.
var (
btInstance = flag.String("bt_instance", "", "Bigtable instance to use in the project identified by 'project_id'")
configFilename = flag.String("config_filename", "default.json5", "Configuration file in JSON5 format.")
gitBTInstanceID = flag.String("git_bt_instance", "", "ID of the BigTable instance that contains Git metadata")
gitBTTableID = flag.String("git_bt_table", "", "ID of the BigTable table that contains Git metadata")
httpPort = flag.String("http_port", ":9091", "The http port where ready-ness endpoints are served.")
local = flag.Bool("local", false, "Running locally if true. As opposed to in production.")
memProfile = flag.Duration("memprofile", 0, "Duration for which to profile memory. After this duration the program writes the memory profile and exits.")
namespace = flag.String("namespace", "", "Namespace to be used with Cloud datastore and BigTable (as a row-prefix).")
noCloudLog = flag.Bool("no_cloud_log", false, "Disables cloud logging. Primarily for running locally.")
projectID = flag.String("project_id", common.PROJECT_ID, "GCP project ID.")
promPort = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':10110')")
)
// Parse the options. So we can configure logging.
flag.Parse()
_, appName := filepath.Split(os.Args[0])
// Set up the logging options.
logOpts := []common.Opt{
common.PrometheusOpt(promPort),
}
// Should we disable cloud logging.
if !*noCloudLog {
logOpts = append(logOpts, common.CloudLoggingOpt())
}
common.InitWithMust(appName, logOpts...)
ctx := context.Background()
// Initialize oauth client and start the ingesters.
tokenSrc, err := auth.NewDefaultTokenSource(*local, storage.ScopeFullControl, pubsub.ScopePubSub, pubsub.ScopeCloudPlatform)
if err != nil {
sklog.Fatalf("Failed to auth: %s", err)
}
client := httputils.DefaultClientConfig().WithTokenSource(tokenSrc).With2xxOnly().WithDialTimeout(time.Second * 10).Client()
// Make sure we have a namespace.
if *namespace == "" {
sklog.Fatalf("'namespace' cannot be empty")
}
if err := ds.InitWithOpt(*projectID, *namespace, option.WithTokenSource(tokenSrc)); err != nil {
sklog.Fatalf("Unable to configure cloud datastore: %s", err)
}
// If configured create an instance of IngestionStore based on BigTable.
var ingestionStore ingestion.IngestionStore
if *namespace != "" && *projectID != "" && *btInstance != "" {
ingestionStore, err = ingestion.NewBTIStore(*projectID, *btInstance, *namespace)
if err != nil {
sklog.Errorf("Error creating ingestion store: %s", err)
}
sklog.Infof("IngestionStore instance instantiated.")
}
// Start the ingesters.
config, err := sharedconfig.ConfigFromJson5File(*configFilename)
if err != nil {
sklog.Fatalf("Unable to read config file %s. Got error: %s", *configFilename, err)
}
// Set up the eventbus.
var eventBus eventbus.EventBus
if config.EventTopic != "" {
nodeName, err := gevent.GetNodeName(appName, *local)
if err != nil {
sklog.Fatalf("Error getting node name: %s", err)
}
eventBus, err = gevent.New(*projectID, config.EventTopic, nodeName, option.WithTokenSource(tokenSrc))
if err != nil {
sklog.Fatalf("Error creating global eventbus: %s", err)
}
sklog.Infof("Global eventbus for topic '%s' and subscriber '%s' created. %v", config.EventTopic, nodeName, eventBus == nil)
} else {
eventBus = eventbus.New()
}
// Set up the gitstore if we have the necessary bigtable configuration.
var btConf *bt_gitstore.BTConfig = nil
if *gitBTInstanceID != "" && *gitBTTableID != "" {
btConf = &bt_gitstore.BTConfig{
ProjectID: *projectID,
InstanceID: *gitBTInstanceID,
TableID: *gitBTTableID,
}
}
// Set up the ingesters in the background.
var ingesters []*ingestion.Ingester
go func() {
var err error
ingesters, err = ingestion.IngestersFromConfig(ctx, config, client, eventBus, ingestionStore, btConf)
if err != nil {
sklog.Fatalf("Unable to instantiate ingesters: %s", err)
}
for _, oneIngester := range ingesters {
if err := oneIngester.Start(ctx); err != nil {
sklog.Fatalf("Unable to start ingester: %s", err)
}
}
}()
// Enable the memory profiler if memProfile was set.
if *memProfile > 0 {
writeProfileFn := func() {
sklog.Infof("\nWriting Memory Profile")
f, err := ioutil.TempFile("./", "memory-profile")
if err != nil {
sklog.Fatalf("Unable to create memory profile file: %s", err)
}
if err := pprof.WriteHeapProfile(f); err != nil {
sklog.Fatalf("Unable to write memory profile file: %v", err)
}
util.Close(f)
sklog.Infof("Memory profile written to %s", f.Name())
os.Exit(0)
}
// Write the profile after the given time or whenever we get a SIGINT signal.
time.AfterFunc(*memProfile, writeProfileFn)
cleanup.AtExit(writeProfileFn)
}
// Set up the http handler to indicate ready-ness and start serving.
http.HandleFunc("/healthz", httputils.ReadyHandleFunc)
log.Fatal(http.ListenAndServe(*httpPort, nil))
}