[datahopper] No more statefulset
Workdir is only used to write the last ingestion timestamp for
bot_metrics. We can use the last-ingested commit timestamp as a
conservative estimate for the same.
Change-Id: Iacf794abebce75f31e9ca7da31942768686a54d3
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/233167
Commit-Queue: Eric Boren <borenet@google.com>
Reviewed-by: Ben Wagner aka dogben <benjaminwagner@google.com>
diff --git a/datahopper/go/bot_metrics/bot_metrics.go b/datahopper/go/bot_metrics/bot_metrics.go
index 1920192..f5a3099 100644
--- a/datahopper/go/bot_metrics/bot_metrics.go
+++ b/datahopper/go/bot_metrics/bot_metrics.go
@@ -11,10 +11,7 @@
"context"
"encoding/gob"
"fmt"
- "io/ioutil"
"math"
- "os"
- "path"
"sort"
"strings"
"time"
@@ -193,7 +190,7 @@
// cycle runs ingestion of task data, maps each task to the commits it covered
// before any other task, and inserts event data based on the lag time between
// a commit landing and each task finishing for that commit.
-func cycle(ctx context.Context, taskDb db.TaskReader, repos repograph.Map, tcc *task_cfg_cache.TaskCfgCache, edb events.EventDB, em *events.EventMetrics, lastFinished, now time.Time, workdir string) error {
+func cycle(ctx context.Context, taskDb db.TaskReader, repos repograph.Map, tcc *task_cfg_cache.TaskCfgCache, edb events.EventDB, em *events.EventMetrics, lastFinished, now time.Time) error {
totalCommits := 0
for _, r := range repos {
totalCommits += r.Len()
@@ -367,45 +364,43 @@
return fmt.Errorf("Failed to update metrics: %s", err)
}
em.LogMetrics()
- if err := writeTs(workdir, now); err != nil {
- return fmt.Errorf("Failed to write timestamp file: %s", err)
- }
return nil
}
-// readTs returns the last successful run timestamp which was cached in a file.
-func readTs(workdir string) (time.Time, error) {
- var rv time.Time
- b, err := ioutil.ReadFile(path.Join(workdir, TIMESTAMP_FILE))
- if err != nil {
- if os.IsNotExist(err) {
+// getLastIngestionTs returns the timestamp of the last commit for which we
+// successfully ingested events.
+func getLastIngestionTs(edb events.EventDB, repos repograph.Map) (time.Time, error) {
+ timeEnd := time.Now()
+ window := time.Hour
+ for {
+ timeStart := timeEnd.Add(-window)
+ var latest time.Time
+ for repo := range repos {
+ ev, err := edb.Range(fmtStream(repo), timeStart, timeEnd)
+ if err != nil {
+ return BEGINNING_OF_TIME, err
+ }
+ if len(ev) > 0 {
+ ts := ev[len(ev)-1].Timestamp
+ if ts.After(latest) {
+ latest = ts
+ }
+ }
+ }
+ if !util.TimeIsZero(latest) {
+ return latest, nil
+ }
+ if timeStart.Before(BEGINNING_OF_TIME) {
return BEGINNING_OF_TIME, nil
}
- return rv, err
+ window *= 2
+ timeEnd = timeStart
}
- if err := gob.NewDecoder(bytes.NewBuffer(b)).Decode(&rv); err != nil {
- return rv, err
- }
- return rv, nil
-}
-
-// writeTs writes the last successful run timestamp to a file.
-func writeTs(workdir string, ts time.Time) error {
- var buf bytes.Buffer
- if err := gob.NewEncoder(&buf).Encode(ts); err != nil {
- return err
- }
- return ioutil.WriteFile(path.Join(workdir, TIMESTAMP_FILE), buf.Bytes(), os.ModePerm)
}
// Start initiates "average time to X% bot coverage" metrics data generation.
// The caller is responsible for updating the passed-in repos and TaskCfgCache.
-func Start(ctx context.Context, taskDb db.TaskReader, repos repograph.Map, tcc *task_cfg_cache.TaskCfgCache, btProject, btInstance, workdir string, ts oauth2.TokenSource) error {
- // Setup.
- if err := os.MkdirAll(workdir, os.ModePerm); err != nil {
- return err
- }
-
+func Start(ctx context.Context, taskDb db.TaskReader, repos repograph.Map, tcc *task_cfg_cache.TaskCfgCache, btProject, btInstance string, ts oauth2.TokenSource) error {
// Set up event metrics.
edb, err := events.NewBTEventDB(ctx, btProject, btInstance, ts)
if err != nil {
@@ -416,8 +411,8 @@
return fmt.Errorf("Failed to create EventMetrics: %s", err)
}
for repoUrl := range repos {
+ s := em.GetEventStream(fmtStream(repoUrl))
for _, p := range []time.Duration{24 * time.Hour, 7 * 24 * time.Hour} {
- s := em.GetEventStream(fmtStream(repoUrl))
for _, pct := range PERCENTILES {
if err := addMetric(s, repoUrl, pct, p); err != nil {
return fmt.Errorf("Failed to add metric: %s", err)
@@ -427,13 +422,13 @@
}
lv := metrics2.NewLiveness("last_successful_bot_coverage_metrics")
- lastFinished, err := readTs(workdir)
+ lastFinished, err := getLastIngestionTs(edb, repos)
if err != nil {
- return fmt.Errorf("Failed to read timestamp: %s", err)
+ return fmt.Errorf("Failed to get timestamp of last successful ingestion: %s", err)
}
go util.RepeatCtx(10*time.Minute, ctx, func() {
now := time.Now()
- if err := cycle(ctx, taskDb, repos, tcc, edb, em, lastFinished, now, workdir); err != nil {
+ if err := cycle(ctx, taskDb, repos, tcc, edb, em, lastFinished, now); err != nil {
sklog.Errorf("Failed to obtain avg time to X%% bot coverage metrics: %s", err)
} else {
lastFinished = now
diff --git a/datahopper/go/datahopper/main.go b/datahopper/go/datahopper/main.go
index 684c799..5db8804 100644
--- a/datahopper/go/datahopper/main.go
+++ b/datahopper/go/datahopper/main.go
@@ -8,7 +8,6 @@
"context"
"flag"
"fmt"
- "path/filepath"
"regexp"
"time"
@@ -56,7 +55,6 @@
repoUrls = common.NewMultiStringFlag("repo", nil, "Repositories to query for status.")
swarmingServer = flag.String("swarming_server", "", "Host name of the Swarming server.")
swarmingPools = common.NewMultiStringFlag("swarming_pool", nil, "Swarming pools to use.")
- workdir = flag.String("workdir", ".", "Working directory used by data processors.")
)
var (
@@ -78,13 +76,6 @@
)
ctx := context.Background()
- // Absolutify the workdir.
- w, err := filepath.Abs(*workdir)
- if err != nil {
- sklog.Fatal(w)
- }
- sklog.Infof("Workdir is %s", w)
-
// OAuth2.0 TokenSource.
ts, err := auth.NewDefaultTokenSource(*local, auth.SCOPE_USERINFO_EMAIL, pubsub.AUTH_SCOPE, bigtable.Scope, datastore.ScopeDatastore, swarming.AUTH_SCOPE, auth.SCOPE_READ_WRITE, auth.SCOPE_GERRIT)
if err != nil {
@@ -185,7 +176,7 @@
}
// Generate "time to X% bot coverage" metrics.
- if err := bot_metrics.Start(ctx, d, repos, tcc, *btProject, *btInstance, *workdir, ts); err != nil {
+ if err := bot_metrics.Start(ctx, d, repos, tcc, *btProject, *btInstance, ts); err != nil {
sklog.Fatal(err)
}