blob: 3c5dca2fa12f77d05529ce99fe0a3558dceb7efc [file] [log] [blame]
// A command-line application that is used to trigger PubSub events for a given Perf config
// over a specific time range. One event per file will be generated for every file found
// in GCS in the given time range.
package main
import (
// flags
var (
	// Command-line flags. They hold their defaults until flag.Parse is called.
	configName = flag.String("config_name", "nano", "Name of the perf ingester config to use.")
	local      = flag.Bool("local", false, "Running locally if true. As opposed to in production.")
	start      = flag.String("start", "", "Start the ingestion at this time, of the form: 2006-01-02. Default to one week ago.")
	end        = flag.String("end", "", "Ingest up to this time, of the form: 2006-01-02. Defaults to now.")
	prefix     = flag.String("prefix", "gs://skia-perf/nano-json-v1", "The bucket and root directory to scan for files.")
	dryrun     = flag.Bool("dry_run", false, "Just display the list of files to send.")
)
func main() {
ctx := context.Background()
cfg, ok := config.PERF_BIGTABLE_CONFIGS[*configName]
if !ok {
sklog.Fatalf("Invalid --config value: %q", *configName)
ts, err := auth.NewDefaultTokenSource(*local, storage.ScopeReadOnly)
if err != nil {
sklog.Fatalf("Failed to create TokenSource: %s", err)
pubSubClient, err := pubsub.NewClient(ctx, cfg.Project, option.WithTokenSource(ts))
if err != nil {
topic := pubSubClient.Topic(cfg.Topic)
now := time.Now()
startTime := now.Add(-7 * 24 * time.Hour)
if *start != "" {
startTime, err = time.Parse("2006-01-02", *start)
if err != nil {
endTime := now
if *end != "" {
endTime, err = time.Parse("2006-01-02", *end)
if err != nil {
client := httputils.DefaultClientConfig().WithTokenSource(ts).WithoutRetries().Client()
gcsClient, err := storage.NewClient(ctx, option.WithHTTPClient(client))
if err != nil {
sklog.Fatalf("Failed to create GCS client: %s", err)
u, err := url.Parse(*prefix)
if err != nil {
sklog.Fatalf("Failed to parse the --prefix flag: %s", err)
dirs := fileutil.GetHourlyDirs(u.Path[1:], startTime.Unix(), endTime.Unix())
for _, dir := range dirs {
sklog.Infof("Directory: %q", dir)
err := gcs.AllFilesInDir(gcsClient, u.Host, dir, func(item *storage.ObjectAttrs) {
// The PubSub event data is a JSON serialized storage.ObjectAttrs object.
// See
sklog.Infof("File: %q", item.Name)
b, err := json.Marshal(storage.ObjectAttrs{
Name: item.Name,
Bucket: u.Host,
if err != nil {
sklog.Errorf("Failed to serialize event: %s", err)
if *dryrun {
fmt.Println(item.Name, item.Bucket)
topic.Publish(ctx, &pubsub.Message{
Data: b,
if err != nil {
if err != nil {
sklog.Errorf("Failed while walking GCS files: %s", err)