| package main |
| |
| // skia_ingestion is the server process that runs an arbitrary number of |
| // ingesters and stores them in traceDB backends. |
| |
| import ( |
| "flag" |
| "io/ioutil" |
| "os" |
| "os/signal" |
| "path/filepath" |
| "runtime/pprof" |
| "time" |
| |
| "go.skia.org/infra/go/auth" |
| "go.skia.org/infra/go/common" |
| "go.skia.org/infra/go/eventbus" |
| "go.skia.org/infra/go/geventbus" |
| "go.skia.org/infra/go/ingestion" |
| "go.skia.org/infra/go/sharedconfig" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/go/util" |
| _ "go.skia.org/infra/golden/go/goldingestion" |
| _ "go.skia.org/infra/golden/go/pdfingestion" |
| storage "google.golang.org/api/storage/v1" |
| ) |
| |
// Command line flags. Every variable is a pointer returned by the flag
// package; dereference it only after the flags have been parsed.
var (
	// configFilename is the path handed to sharedconfig.ConfigFromTomlFile.
	configFilename = flag.String(
		"config_filename",
		"default.toml",
		"Configuration file in TOML format.",
	)

	// local marks a non-production run.
	// NOTE(review): not read anywhere in this file's main — confirm whether
	// another part of the program consumes it.
	local = flag.Bool(
		"local",
		false,
		"Running locally if true. As opposed to in production.",
	)

	// memProfile enables the heap profiler in main when greater than zero.
	memProfile = flag.Duration(
		"memprofile",
		0,
		"Duration for which to profile memory. After this duration the program writes the memory profile and exits.",
	)

	// nsqdAddress selects the NSQ server for the global event bus; when it is
	// empty main runs without one.
	nsqdAddress = flag.String(
		"nsqd",
		"",
		"Address and port of nsqd instance.",
	)

	// promPort is passed to common.PrometheusOpt during initialization.
	promPort = flag.String(
		"prom_port",
		":20000",
		"Metrics service address (e.g., ':10110')",
	)

	// serviceAccountFile is passed to auth.NewJWTServiceAccountClient.
	serviceAccountFile = flag.String(
		"service_account_file",
		"",
		"Credentials file for service account.",
	)
)
| |
| func main() { |
| defer common.LogPanic() |
| _, appName := filepath.Split(os.Args[0]) |
| |
| // Global init to initialize prometheus and cloud logging. |
| common.InitWithMust(appName, |
| common.PrometheusOpt(promPort), |
| common.CloudLoggingOpt(), |
| ) |
| |
| // If no nsqd servers is defines, we simply don't have gloabl events. |
| var globalEventBus geventbus.GlobalEventBus = nil |
| var err error |
| if *nsqdAddress != "" { |
| globalEventBus, err = geventbus.NewNSQEventBus(*nsqdAddress) |
| if err != nil { |
| sklog.Fatalf("Unable to connect to NSQ server at address %s: %s", *nsqdAddress, err) |
| } |
| } |
| evt := eventbus.New(globalEventBus) |
| |
| // Initialize oauth client and start the ingesters. |
| client, err := auth.NewJWTServiceAccountClient("", *serviceAccountFile, nil, storage.CloudPlatformScope) |
| if err != nil { |
| sklog.Fatalf("Failed to auth: %s", err) |
| } |
| |
| // Start the ingesters. |
| config, err := sharedconfig.ConfigFromTomlFile(*configFilename) |
| if err != nil { |
| sklog.Fatalf("Unable to read config file %s. Got error: %s", *configFilename, err) |
| } |
| |
| ingesters, err := ingestion.IngestersFromConfig(config, client, evt) |
| if err != nil { |
| sklog.Fatalf("Unable to instantiate ingesters: %s", err) |
| } |
| for _, oneIngester := range ingesters { |
| oneIngester.Start() |
| } |
| |
| // Enable the memory profiler if memProfile was set. |
| if *memProfile > 0 { |
| writeProfileFn := func() { |
| sklog.Infof("\nWriting Memory Profile") |
| f, err := ioutil.TempFile("./", "memory-profile") |
| if err != nil { |
| sklog.Fatalf("Unable to create memory profile file: %s", err) |
| } |
| if err := pprof.WriteHeapProfile(f); err != nil { |
| sklog.Fatalf("Unable to write memory profile file: %v", err) |
| } |
| util.Close(f) |
| sklog.Infof("Memory profile written to %s", f.Name()) |
| |
| os.Exit(0) |
| } |
| |
| // Write the profile after the given time or whenever we get a SIGINT signal. |
| time.AfterFunc(*memProfile, writeProfileFn) |
| ch := make(chan os.Signal) |
| signal.Notify(ch, os.Interrupt) |
| go func() { |
| <-ch |
| writeProfileFn() |
| }() |
| } |
| |
| // Run the ingesters forever. |
| select {} |
| } |