blob: 70e8b24a759f86e39319996fa2e2e17889353410 [file] [log] [blame]
// Executable file_reingester will scan through all the files in a GCS bucket and create synthetic
// pubsub events to cause the files to be re-ingested.
package main
import (
"context"
"flag"
"os"
"strings"
"time"
"go.skia.org/infra/go/sklog/sklogimpl"
"go.skia.org/infra/go/sklog/stdlogging"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/storage"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/fileutil"
"go.skia.org/infra/go/gcs"
"go.skia.org/infra/go/sklog"
)
func main() {
var (
ingesterTopic = flag.String("ingester_topic", "", "Pubsub topic on which to generate synthetic events.")
projectID = flag.String("project_id", "skia-public", "GCP project ID.")
srcBucket = flag.String("src_bucket", "", "Source bucket to ingest files from.")
srcRootDir = flag.String("src_root_dir", "dm-json-v1", "Source root directory to ingest files in.")
changelistIDs = common.NewMultiStringFlag("changelists", nil, "If provided, will only ingest data from the provided changelists")
// In the early days, there was several invalid entries, because they did not specify
// gitHash. Starting re-ingesting Skia on October 1, 2014 seems to be around when
// the data is correct.
startYear = flag.Int("start_year", 2019, "year to start ingesting")
startMonth = flag.Int("start_month", 1, "month to start ingesting")
startDay = flag.Int("start_day", 1, "day to start ingesting (at midnight UTC)")
sleepBetweenDays = flag.Duration("sleep_between_days", 0, "If non-zero, the amount of time to wait after reingesting one day's worth of data.")
)
flag.Parse()
sklogimpl.SetLogger(stdlogging.New(os.Stderr))
ctx := context.Background()
gcsClient, err := storage.NewClient(ctx)
if err != nil {
sklog.Fatalf("Failed to create GCS client: %s", err)
}
psc, err := pubsub.NewClient(ctx, *projectID)
if err != nil {
sklog.Fatalf("Could not make pubsub client for project %q: %s", *projectID, err)
}
// Check that the topic exists. Fail if it does not.
topic := psc.Topic(*ingesterTopic)
if exists, err := topic.Exists(ctx); err != nil {
sklog.Fatalf("Error checking for existing topic %q: %s", *ingesterTopic, err)
} else if !exists {
sklog.Fatalf("topic %s does not exist in project %s", *ingesterTopic, *projectID)
}
sklog.Infof("starting scanning %q in project %s", *ingesterTopic, *projectID)
beginning := time.Date(*startYear, time.Month(*startMonth), *startDay, 0, 0, 0, 0, time.UTC)
root := *srcRootDir
if *changelistIDs != nil && len(*changelistIDs) > 0 {
sklog.Infof("Only processing cls: %+v", *changelistIDs)
root = "trybot/dm-json-v1"
}
dirs := fileutil.GetHourlyDirs(root, beginning, time.Now())
published := 0
for _, dir := range dirs {
sklog.Infof("Directory: %q", dir)
var last *pubsub.PublishResult
err := gcs.AllFilesInDir(gcsClient, *srcBucket, dir, func(item *storage.ObjectAttrs) {
if matchesChangelist(changelistIDs, item.Name) {
published++
if published%1000 == 0 {
sklog.Infof("%d reingeseted", published)
}
last = publishSyntheticStorageEvent(ctx, topic, item.Bucket, item.Name)
}
})
if err != nil {
sklog.Warningf("Error while processing dir %s: %s", dir, err)
}
if last != nil {
_, err := last.Get(context.Background())
if err != nil {
sklog.Fatalf("Could not publish: %s", err)
} else {
sklog.Debugf("Published something for %s", dir)
}
}
if strings.HasSuffix(dir, "/23") {
if *sleepBetweenDays > time.Second {
sklog.Infof("Waiting at the end of a day")
time.Sleep(*sleepBetweenDays)
}
}
}
sklog.Infof("waiting for messages to publish")
topic.Stop()
sklog.Infof("done")
}
// matchesChangelist reports whether name belongs to one of the given changelists. If
// changelistIDs is nil or empty, every name matches.
//
// A name matches an id when it contains "/" + id and the id is followed either by the end of
// the name or by a character that is not an ASCII letter or digit. The boundary check stops an
// id from matching a longer id that it is a prefix of. For example:
//
//	trybot/dm-json-v1/2021/09/23/02/4140248__1/8835339621082367857/dm-1632364432749558598.json
//
// has a match for CL 4140248 but not for CL 414024. It's still implemented simply and is meant
// for adhoc re-ingestion. For instance, an id equal to a date component such as "2021" also
// matches.
func matchesChangelist(changelistIDs *[]string, name string) bool {
	if changelistIDs == nil || len(*changelistIDs) == 0 {
		return true
	}
	for _, id := range *changelistIDs {
		if containsChangelistID(name, id) {
			return true
		}
	}
	return false
}

// containsChangelistID reports whether "/" + id occurs in name at an id boundary (see
// matchesChangelist). Every occurrence is checked, not only the first.
func containsChangelistID(name, id string) bool {
	if id == "" {
		// Preserve the historical behavior: an empty id matches any name containing a slash.
		return strings.Contains(name, "/")
	}
	needle := "/" + id
	rest := name
	for {
		i := strings.Index(rest, needle)
		if i < 0 {
			return false
		}
		end := i + len(needle)
		if end == len(rest) || !isASCIIAlnum(rest[end]) {
			return true
		}
		// Skip past this slash and keep looking for a later, properly-bounded occurrence.
		rest = rest[i+1:]
	}
}

// isASCIIAlnum reports whether c is an ASCII letter or digit.
func isASCIIAlnum(c byte) bool {
	return ('0' <= c && c <= '9') || ('a' <= c && c <= 'z') || ('A' <= c && c <= 'Z')
}
// publishSyntheticStorageEvent enqueues on topic a message that imitates a GCS object
// notification for fileName in bucket, and returns the pending result so the caller can wait
// on it. Only the bucketId and objectId attributes are set, since those are the ones read for
// ingestion; see https://cloud.google.com/storage/docs/pubsub-notifications#attributes
func publishSyntheticStorageEvent(ctx context.Context, topic *pubsub.Topic, bucket, fileName string) *pubsub.PublishResult {
	attrs := map[string]string{
		"bucketId": bucket,
		"objectId": fileName,
	}
	// Data is deliberately left unset: nothing currently reads the message payload.
	msg := &pubsub.Message{Attributes: attrs}
	return topic.Publish(ctx, msg)
}