blob: d362f612ff3992197f59fb37da69a1d3c006eccf [file] [log] [blame]
package main
// skia_ingestion is the server process that runs an arbitary number of
// ingesters and stores them in traceDB backends.
import (
"flag"
"io/ioutil"
"os"
"os/signal"
"path/filepath"
"runtime/pprof"
"time"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/eventbus"
"go.skia.org/infra/go/geventbus"
"go.skia.org/infra/go/ingestion"
"go.skia.org/infra/go/sharedconfig"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
_ "go.skia.org/infra/golden/go/goldingestion"
_ "go.skia.org/infra/golden/go/pdfingestion"
storage "google.golang.org/api/storage/v1"
)
// Command line flags.
var (
configFilename = flag.String("config_filename", "default.toml", "Configuration file in TOML format.")
local = flag.Bool("local", false, "Running locally if true. As opposed to in production.")
memProfile = flag.Duration("memprofile", 0, "Duration for which to profile memory. After this duration the program writes the memory profile and exits.")
nsqdAddress = flag.String("nsqd", "", "Address and port of nsqd instance.")
promPort = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':10110')")
serviceAccountFile = flag.String("service_account_file", "", "Credentials file for service account.")
)
func main() {
defer common.LogPanic()
_, appName := filepath.Split(os.Args[0])
// Global init to initialize prometheus and cloud logging.
common.InitWithMust(appName,
common.PrometheusOpt(promPort),
common.CloudLoggingOpt(),
)
// If no nsqd servers is defines, we simply don't have gloabl events.
var globalEventBus geventbus.GlobalEventBus = nil
var err error
if *nsqdAddress != "" {
globalEventBus, err = geventbus.NewNSQEventBus(*nsqdAddress)
if err != nil {
sklog.Fatalf("Unable to connect to NSQ server at address %s: %s", *nsqdAddress, err)
}
}
evt := eventbus.New(globalEventBus)
// Initialize oauth client and start the ingesters.
client, err := auth.NewJWTServiceAccountClient("", *serviceAccountFile, nil, storage.CloudPlatformScope)
if err != nil {
sklog.Fatalf("Failed to auth: %s", err)
}
// Start the ingesters.
config, err := sharedconfig.ConfigFromTomlFile(*configFilename)
if err != nil {
sklog.Fatalf("Unable to read config file %s. Got error: %s", *configFilename, err)
}
ingesters, err := ingestion.IngestersFromConfig(config, client, evt)
if err != nil {
sklog.Fatalf("Unable to instantiate ingesters: %s", err)
}
for _, oneIngester := range ingesters {
oneIngester.Start()
}
// Enable the memory profiler if memProfile was set.
if *memProfile > 0 {
writeProfileFn := func() {
sklog.Infof("\nWriting Memory Profile")
f, err := ioutil.TempFile("./", "memory-profile")
if err != nil {
sklog.Fatalf("Unable to create memory profile file: %s", err)
}
if err := pprof.WriteHeapProfile(f); err != nil {
sklog.Fatalf("Unable to write memory profile file: %v", err)
}
util.Close(f)
sklog.Infof("Memory profile written to %s", f.Name())
os.Exit(0)
}
// Write the profile after the given time or whenever we get a SIGINT signal.
time.AfterFunc(*memProfile, writeProfileFn)
ch := make(chan os.Signal)
signal.Notify(ch, os.Interrupt)
go func() {
<-ch
writeProfileFn()
}()
}
// Run the ingesters forever.
select {}
}