blob: f5646cf880c2cbb459ec63644c54b09c5fd4ecff [file] [log] [blame]
// A command-line application that is used to trigger PubSub events for a given Perf config
// over a specific time range. One event per file will be generated for every file found
// in GCS in the given time range.
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"net/url"
"time"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/storage"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/fileutil"
"go.skia.org/infra/go/gcs"
"go.skia.org/infra/go/httputils"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/perf/go/config"
"google.golang.org/api/option"
)
// flags
var (
configName = flag.String("config_name", "nano", "Name of the perf ingester config to use.")
local = flag.Bool("local", false, "Running locally if true. As opposed to in production.")
start = flag.String("start", "", "Start the ingestion at this time, of the form: 2006-01-02. Default to one week ago.")
end = flag.String("end", "", "Ingest up to this time, of the form: 2006-01-02. Defaults to now.")
dryrun = flag.Bool("dry_run", false, "Just display the list of files to send.")
)
func main() {
common.InitWithMust(
"perf-force-ingest",
)
ctx := context.Background()
cfg, ok := config.PERF_BIGTABLE_CONFIGS[*configName]
if !ok {
sklog.Fatalf("Invalid --config value: %q", *configName)
}
ts, err := auth.NewDefaultTokenSource(*local, storage.ScopeReadOnly)
if err != nil {
sklog.Fatalf("Failed to create TokenSource: %s", err)
}
pubSubClient, err := pubsub.NewClient(ctx, cfg.Project, option.WithTokenSource(ts))
if err != nil {
sklog.Fatal(err)
}
topic := pubSubClient.Topic(cfg.Topic)
now := time.Now()
startTime := now.Add(-7 * 24 * time.Hour)
if *start != "" {
startTime, err = time.Parse("2006-01-02", *start)
if err != nil {
sklog.Fatal(err)
}
}
endTime := now
if *end != "" {
endTime, err = time.Parse("2006-01-02", *end)
if err != nil {
sklog.Fatal(err)
}
}
client := httputils.DefaultClientConfig().WithTokenSource(ts).WithoutRetries().Client()
gcsClient, err := storage.NewClient(ctx, option.WithHTTPClient(client))
if err != nil {
sklog.Fatalf("Failed to create GCS client: %s", err)
}
for _, prefix := range cfg.Sources {
u, err := url.Parse(prefix)
if err != nil {
sklog.Fatalf("Failed to parse the prefix: %s", err)
}
dirs := fileutil.GetHourlyDirs(u.Path[1:], startTime.Unix(), endTime.Unix())
for _, dir := range dirs {
sklog.Infof("Directory: %q", dir)
err := gcs.AllFilesInDir(gcsClient, u.Host, dir, func(item *storage.ObjectAttrs) {
// The PubSub event data is a JSON serialized storage.ObjectAttrs object.
// See https://cloud.google.com/storage/docs/pubsub-notifications#payload
sklog.Infof("File: %q", item.Name)
b, err := json.Marshal(storage.ObjectAttrs{
Name: item.Name,
Bucket: u.Host,
})
if err != nil {
sklog.Errorf("Failed to serialize event: %s", err)
}
if *dryrun {
fmt.Println(item.Name, item.Bucket)
return
}
topic.Publish(ctx, &pubsub.Message{
Data: b,
})
})
if err != nil {
if err != nil {
sklog.Errorf("Failed while walking GCS files: %s", err)
}
}
}
}
}