| // Executable file_reingester will scan through all the files in a GCS bucket and create synthetic |
| // pubsub events to cause the files to be re-ingested. |
| package main |
| |
| import ( |
| "context" |
| "flag" |
| "os" |
| "strings" |
| "time" |
| |
| "go.skia.org/infra/go/sklog/sklogimpl" |
| "go.skia.org/infra/go/sklog/stdlogging" |
| |
| "cloud.google.com/go/pubsub" |
| "cloud.google.com/go/storage" |
| |
| "go.skia.org/infra/go/common" |
| "go.skia.org/infra/go/fileutil" |
| "go.skia.org/infra/go/gcs" |
| "go.skia.org/infra/go/sklog" |
| ) |
| |
| func main() { |
| var ( |
| ingesterTopic = flag.String("ingester_topic", "", "Pubsub topic on which to generate synthetic events.") |
| projectID = flag.String("project_id", "skia-public", "GCP project ID.") |
| |
| srcBucket = flag.String("src_bucket", "", "Source bucket to ingest files from.") |
| srcRootDir = flag.String("src_root_dir", "dm-json-v1", "Source root directory to ingest files in.") |
| |
| changelistIDs = common.NewMultiStringFlag("changelists", nil, "If provided, will only ingest data from the provided changelists") |
| |
| // In the early days, there was several invalid entries, because they did not specify |
| // gitHash. Starting re-ingesting Skia on October 1, 2014 seems to be around when |
| // the data is correct. |
| startYear = flag.Int("start_year", 2019, "year to start ingesting") |
| startMonth = flag.Int("start_month", 1, "month to start ingesting") |
| startDay = flag.Int("start_day", 1, "day to start ingesting (at midnight UTC)") |
| |
| sleepBetweenDays = flag.Duration("sleep_between_days", 0, "If non-zero, the amount of time to wait after reingesting one day's worth of data.") |
| ) |
| flag.Parse() |
| sklogimpl.SetLogger(stdlogging.New(os.Stderr)) |
| |
| ctx := context.Background() |
| gcsClient, err := storage.NewClient(ctx) |
| if err != nil { |
| sklog.Fatalf("Failed to create GCS client: %s", err) |
| } |
| |
| psc, err := pubsub.NewClient(ctx, *projectID) |
| if err != nil { |
| sklog.Fatalf("Could not make pubsub client for project %q: %s", *projectID, err) |
| } |
| |
| // Check that the topic exists. Fail if it does not. |
| topic := psc.Topic(*ingesterTopic) |
| if exists, err := topic.Exists(ctx); err != nil { |
| sklog.Fatalf("Error checking for existing topic %q: %s", *ingesterTopic, err) |
| } else if !exists { |
| sklog.Fatalf("topic %s does not exist in project %s", *ingesterTopic, *projectID) |
| } |
| |
| sklog.Infof("starting scanning %q in project %s", *ingesterTopic, *projectID) |
| |
| beginning := time.Date(*startYear, time.Month(*startMonth), *startDay, 0, 0, 0, 0, time.UTC) |
| |
| root := *srcRootDir |
| if *changelistIDs != nil && len(*changelistIDs) > 0 { |
| sklog.Infof("Only processing cls: %+v", *changelistIDs) |
| root = "trybot/dm-json-v1" |
| } |
| |
| dirs := fileutil.GetHourlyDirs(root, beginning, time.Now()) |
| published := 0 |
| for _, dir := range dirs { |
| sklog.Infof("Directory: %q", dir) |
| var last *pubsub.PublishResult |
| err := gcs.AllFilesInDir(gcsClient, *srcBucket, dir, func(item *storage.ObjectAttrs) { |
| if matchesChangelist(changelistIDs, item.Name) { |
| published++ |
| if published%1000 == 0 { |
| sklog.Infof("%d reingeseted", published) |
| } |
| last = publishSyntheticStorageEvent(ctx, topic, item.Bucket, item.Name) |
| } |
| }) |
| if err != nil { |
| sklog.Warningf("Error while processing dir %s: %s", dir, err) |
| } |
| if last != nil { |
| _, err := last.Get(context.Background()) |
| if err != nil { |
| sklog.Fatalf("Could not publish: %s", err) |
| } else { |
| sklog.Debugf("Published something for %s", dir) |
| } |
| } |
| if strings.HasSuffix(dir, "/23") { |
| if *sleepBetweenDays > time.Second { |
| sklog.Infof("Waiting at the end of a day") |
| time.Sleep(*sleepBetweenDays) |
| } |
| } |
| } |
| |
| sklog.Infof("waiting for messages to publish") |
| topic.Stop() |
| sklog.Infof("done") |
| } |
| |
// matchesChangelist returns true if the given file name matches a changelist id. That is,
// there exists "/[clid]" somewhere in the name, where the id is followed by the end of the
// name or by a character that is not an ASCII letter or digit. For example:
//
// trybot/dm-json-v1/2021/09/23/02/4140248__1/8835339621082367857/dm-1632364432749558598.json
//
// has a match for CL 4140248 but not for CL 414. The boundary check stops one id from
// matching a longer id that merely starts with it.
//
// If changelistIDs is nil or empty, every name matches. Empty ids in the list are ignored;
// otherwise "/" alone would match every path. It's implemented simply, meant for some adhoc
// re-ingestion.
func matchesChangelist(changelistIDs *[]string, name string) bool {
	if changelistIDs == nil || len(*changelistIDs) == 0 {
		return true
	}
	for _, id := range *changelistIDs {
		if id == "" {
			continue
		}
		if containsChangelistID(name, id) {
			return true
		}
	}
	return false
}

// containsChangelistID reports whether name contains "/"+id followed by either the end of
// name or a byte that is not an ASCII letter or digit.
func containsChangelistID(name, id string) bool {
	needle := "/" + id
	for offset := 0; offset < len(name); {
		i := strings.Index(name[offset:], needle)
		if i < 0 {
			return false
		}
		end := offset + i + len(needle)
		if end == len(name) || !isASCIIAlphanumeric(name[end]) {
			return true
		}
		// This occurrence was a prefix of a longer token; keep searching after it.
		offset += i + 1
	}
	return false
}

// isASCIIAlphanumeric reports whether b is in [0-9A-Za-z].
func isASCIIAlphanumeric(b byte) bool {
	return ('0' <= b && b <= '9') || ('a' <= b && b <= 'z') || ('A' <= b && b <= 'Z')
}
| |
| func publishSyntheticStorageEvent(ctx context.Context, topic *pubsub.Topic, bucket, fileName string) *pubsub.PublishResult { |
| return topic.Publish(ctx, &pubsub.Message{ |
| // These are the important attributes read for ingestion. |
| // https://cloud.google.com/storage/docs/pubsub-notifications#attributes |
| Attributes: map[string]string{ |
| "bucketId": bucket, |
| "objectId": fileName, |
| }, |
| Data: nil, // We don't currently read anything from Data. |
| }) |
| } |