// Package process implements the whole process of ingesting files into a trace store.
package process
import (
"context"
"sync"
"time"

"cloud.google.com/go/pubsub"
"go.opencensus.io/trace"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/query"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"golang.org/x/oauth2/google"

"go.skia.org/infra/perf/go/builders"
"go.skia.org/infra/perf/go/config"
"go.skia.org/infra/perf/go/file"
"go.skia.org/infra/perf/go/git"
"go.skia.org/infra/perf/go/ingest/parser"
"go.skia.org/infra/perf/go/ingestevents"
"go.skia.org/infra/perf/go/tracestore"
"go.skia.org/infra/perf/go/tracing"
"go.skia.org/infra/perf/go/types"
"google.golang.org/api/option"
)
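// writeRetries is the number of times a failed call to WriteTraces is retried before giving up.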
const writeRetries = 10
// defaultDatabaseTimeout is the context timeout used when making a request that
// involves the database. For more complex requests use config.QueryMaxRuntime.
const defaultDatabaseTimeout = time.Minute
// sendPubSubEvent sends the unencoded params and paramset found in a single
// ingested file to the PubSub topic specified in the selected Perf instance's
// configuration.
func sendPubSubEvent(ctx context.Context, pubSubClient *pubsub.Client, topicName string, params []paramtools.Params, paramset paramtools.ReadOnlyParamSet, filename string) error {
if topicName == "" {
return nil
}
traceIDs := make([]string, 0, len(params))
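// Convert each Params into its trace key, skipping any that can't form a valid key.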
for _, p := range params {
key, err := query.MakeKey(p)
if err != nil {
continue
}
traceIDs = append(traceIDs, key)
}
ie := &ingestevents.IngestEvent{
TraceIDs: traceIDs,
ParamSet: paramset,
Filename: filename,
}
body, err := ingestevents.CreatePubSubBody(ie)
if err != nil {
return skerr.Wrapf(err, "Failed to encode PubSub body for topic: %q", topicName)
}
msg := &pubsub.Message{
Data: body,
}
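// Publish the event and block until the server has acknowledged it.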
_, err = pubSubClient.Topic(topicName).Publish(ctx, msg).Get(ctx)
return skerr.Wrap(err)
}
// workerInfo is all the information that a worker goroutine will need to
// process a single incoming file.
type workerInfo struct {
filesReceived metrics2.Counter
failedToParse metrics2.Counter
skipped metrics2.Counter
badGitHash metrics2.Counter
failedToWrite metrics2.Counter
successfulWrite metrics2.Counter
successfulWriteCount metrics2.Counter
dlEnabled bool
p *parser.Parser
store tracestore.TraceStore
g git.Git
pubSubClient *pubsub.Client
instanceConfig *config.InstanceConfig
}
// newWorker returns a new *workerInfo.
func newWorker(
filesReceived metrics2.Counter,
failedToParse metrics2.Counter,
skipped metrics2.Counter,
badGitHash metrics2.Counter,
failedToWrite metrics2.Counter,
successfulWrite metrics2.Counter,
successfulWriteCount metrics2.Counter,
dlEnabled bool,
p *parser.Parser,
store tracestore.TraceStore,
g git.Git,
pubSubClient *pubsub.Client,
instanceConfig *config.InstanceConfig,
) *workerInfo {
return &workerInfo{
filesReceived: filesReceived,
failedToParse: failedToParse,
skipped: skipped,
badGitHash: badGitHash,
failedToWrite: failedToWrite,
successfulWrite: successfulWrite,
successfulWriteCount: successfulWriteCount,
dlEnabled: dlEnabled,
p: p,
store: store,
g: g,
pubSubClient: pubSubClient,
instanceConfig: instanceConfig,
}
}
// processSingleFile parses a single incoming file and writes the data to the
// trace store.
func (w *workerInfo) processSingleFile(f file.File) error {
ctx, cancel := context.WithTimeout(context.Background(), defaultDatabaseTimeout)
defer cancel()
ctx, span := trace.StartSpan(ctx, "ingest.parser.processSingleFile")
defer span.End()
sklog.Infof("Ingest received: %v", f)
w.filesReceived.Inc(1)
// Parse the file.
params, values, gitHash, err := w.p.Parse(ctx, f)
if err != nil {
if err == parser.ErrFileShouldBeSkipped {
sklog.Debugf("File should be skipped %v: %s", f, err)
w.skipped.Inc(1)
} else {
sklog.Errorf("Failed to parse %v: %s", f, err)
w.failedToParse.Inc(1)
}
nackMessageIfNecessary(w.dlEnabled, f)
return nil
}
sklog.Info("Lookup CommitNumber")
// If git_hash is missing from the GCS file we can't ingest it.
if len(gitHash) == 0 {
sklog.Errorf("Unable to handle empty git hash.")
nackMessageIfNecessary(w.dlEnabled, f)
return nil
}
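// Some repos supply their own commit numbers; in that case parse the commit number directly out of the supplied git hash.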
commitNumberFromFile := types.CommitNumber(0)
if w.g.RepoSuppliedCommitNumber() {
commitNumberFromFile, err = w.p.ParseCommitNumberFromGitHash(gitHash)
if err != nil {
sklog.Errorf("Unable to convert githash to integer commit number %q.", gitHash, err)
nackMessageIfNecessary(w.dlEnabled, f)
return nil
}
}
// Convert gitHash to a CommitNumber, or check the existence of the commit number supplied by the repo.
commitNumber, err := w.g.GetCommitNumber(ctx, gitHash, commitNumberFromFile)
if err != nil {
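// The commit may be newer than our last view of the repo, so update the repo and try the lookup one more time.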
if err := w.g.Update(ctx); err != nil {
sklog.Errorf("Failed to Update: ", err)
}
commitNumber, err = w.g.GetCommitNumber(ctx, gitHash, commitNumberFromFile)
if err != nil {
w.badGitHash.Inc(1)
sklog.Error("Failed to find commit number %v: %s", f, err)
nackMessageIfNecessary(w.dlEnabled, f)
return nil
}
}
sklog.Info("Build ParamSet")
// Build paramset from params.
ps := paramtools.NewParamSet()
for _, p := range params {
ps.AddParams(p)
}
ps.Normalize()
sklog.Info("WriteTraces")
const retries = writeRetries
i := 0
writeFailed := false
for {
// Write data to the trace store.
err = w.store.WriteTraces(ctx, commitNumber, params, values, ps, f.Name, time.Now())
if err == nil {
break
}
i++
if i > retries {
writeFailed = true
break
}
}
if writeFailed {
w.failedToWrite.Inc(1)
sklog.Errorf("Failed to write after %d retries %q: %s", retries, f.Name, err)
nackMessageIfNecessary(w.dlEnabled, f)
} else {
if f.PubSubMsg != nil {
f.PubSubMsg.Ack()
sklog.Debugf("Message acked: %v", f.PubSubMsg)
}
w.successfulWrite.Inc(1)
w.successfulWriteCount.Inc(int64(len(params)))
}
if err := sendPubSubEvent(ctx, w.pubSubClient, w.instanceConfig.IngestionConfig.FileIngestionTopicName, params, ps.Freeze(), f.Name); err != nil {
sklog.Errorf("Failed to send pubsub event: %s", err)
} else {
sklog.Info("FileIngestionTopicName pubsub message sent.")
}
return nil
}
// worker ingests files that arrive on the given 'ch' channel.
func worker(ctx context.Context, wg *sync.WaitGroup, g git.Git, store tracestore.TraceStore, ch <-chan file.File, pubSubClient *pubsub.Client, instanceConfig *config.InstanceConfig) {
// Metrics.
filesReceived := metrics2.GetCounter("perfserver_ingest_files_received")
failedToParse := metrics2.GetCounter("perfserver_ingest_failed_to_parse")
skipped := metrics2.GetCounter("perfserver_ingest_skipped")
badGitHash := metrics2.GetCounter("perfserver_ingest_bad_githash")
failedToWrite := metrics2.GetCounter("perfserver_ingest_failed_to_write")
successfulWrite := metrics2.GetCounter("perfserver_ingest_successful_write")
successfulWriteCount := metrics2.GetCounter("perfserver_ingest_num_points_written")
dlEnabled := config.IsDeadLetterCollectionEnabled(instanceConfig)
// New Parser.
p, err := parser.New(instanceConfig)
if err != nil {
sklog.Errorf("Ingestion worker failed to create parser: %s", err)
wg.Done()
return
}
workerInfo := newWorker(filesReceived, failedToParse, skipped, badGitHash, failedToWrite, successfulWrite, successfulWriteCount, dlEnabled, p, store, g, pubSubClient, instanceConfig)
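// Process incoming files until the channel is closed or the context is cancelled.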
for f := range ch {
if err := ctx.Err(); err != nil {
sklog.Error(err)
break
}
if err := workerInfo.processSingleFile(f); err != nil {
break
}
}
wg.Done()
}
// Start launches numParallelIngesters worker goroutines to process incoming
// ingestion files and write the data they contain to a trace store.
//
// Except for file.Sources of type "dir" this function should only return on
// error.
func Start(ctx context.Context, local bool, numParallelIngesters int, instanceConfig *config.InstanceConfig) error {
if err := tracing.Init(local, instanceConfig); err != nil {
sklog.Fatalf("Failed to start tracing: %s", err)
}
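// Create a PubSub client only if this instance is configured to publish ingestion events.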
var pubSubClient *pubsub.Client
if instanceConfig.IngestionConfig.FileIngestionTopicName != "" {
ts, err := google.DefaultTokenSource(ctx, pubsub.ScopePubSub)
if err != nil {
sklog.Fatalf("Failed to create TokenSource: %s", err)
}
pubSubClient, err = pubsub.NewClient(ctx, instanceConfig.IngestionConfig.SourceConfig.Project, option.WithTokenSource(ts))
if err != nil {
sklog.Fatal(err)
}
}
// New file.Source.
source, err := builders.NewSourceFromConfig(ctx, instanceConfig, local)
if err != nil {
return skerr.Wrap(err)
}
ch, err := source.Start(ctx)
if err != nil {
return skerr.Wrap(err)
}
// New TraceStore.
store, err := builders.NewTraceStoreFromConfig(ctx, local, instanceConfig)
if err != nil {
return skerr.Wrap(err)
}
// New perfgit.Git.
sklog.Infof("Cloning repo %q into %q", instanceConfig.GitRepoConfig.URL, instanceConfig.GitRepoConfig.Dir)
g, err := builders.NewPerfGitFromConfig(ctx, local, instanceConfig)
if err != nil {
return skerr.Wrap(err)
}
// Polling isn't needed because we call update on the repo if we find a git hash we don't recognize.
// g.StartBackgroundPolling(ctx, gitRefreshDuration)
sklog.Info("Waiting on files to process.")
var wg sync.WaitGroup
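// Start numParallelIngesters workers that all consume from the same channel of incoming files.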
for i := 0; i < numParallelIngesters; i++ {
wg.Add(1)
go worker(ctx, &wg, g, store, ch, pubSubClient, instanceConfig)
}
wg.Wait()
sklog.Infof("Exited while waiting on files. Should only happen on source_type=dir.")
return nil
}
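// nackMessageIfNecessary nacks the PubSub message attached to f, but only if
// dead letter collection is enabled for the instance, making the message
// available for redelivery.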
func nackMessageIfNecessary(dlEnabled bool, f file.File) {
if dlEnabled && f.PubSubMsg != nil {
// This message will be available to the ingestor immediately.
f.PubSubMsg.Nack()
sklog.Debugf("Message nacked during message processing: %v", f.PubSubMsg)
}
}