[datahopper] No more statefulset

Workdir is only used to write the last ingestion timestamp for
bot_metrics. We can use the last-ingested commit timestamp as a
conservative estimate for the same.

Change-Id: Iacf794abebce75f31e9ca7da31942768686a54d3
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/233167
Commit-Queue: Eric Boren <borenet@google.com>
Reviewed-by: Ben Wagner aka dogben <benjaminwagner@google.com>
diff --git a/datahopper/go/bot_metrics/bot_metrics.go b/datahopper/go/bot_metrics/bot_metrics.go
index 1920192..f5a3099 100644
--- a/datahopper/go/bot_metrics/bot_metrics.go
+++ b/datahopper/go/bot_metrics/bot_metrics.go
@@ -11,10 +11,7 @@
 	"context"
 	"encoding/gob"
 	"fmt"
-	"io/ioutil"
 	"math"
-	"os"
-	"path"
 	"sort"
 	"strings"
 	"time"
@@ -193,7 +190,7 @@
 // cycle runs ingestion of task data, maps each task to the commits it covered
 // before any other task, and inserts event data based on the lag time between
 // a commit landing and each task finishing for that commit.
-func cycle(ctx context.Context, taskDb db.TaskReader, repos repograph.Map, tcc *task_cfg_cache.TaskCfgCache, edb events.EventDB, em *events.EventMetrics, lastFinished, now time.Time, workdir string) error {
+func cycle(ctx context.Context, taskDb db.TaskReader, repos repograph.Map, tcc *task_cfg_cache.TaskCfgCache, edb events.EventDB, em *events.EventMetrics, lastFinished, now time.Time) error {
 	totalCommits := 0
 	for _, r := range repos {
 		totalCommits += r.Len()
@@ -367,45 +364,43 @@
 		return fmt.Errorf("Failed to update metrics: %s", err)
 	}
 	em.LogMetrics()
-	if err := writeTs(workdir, now); err != nil {
-		return fmt.Errorf("Failed to write timestamp file: %s", err)
-	}
 	return nil
 }
 
-// readTs returns the last successful run timestamp which was cached in a file.
-func readTs(workdir string) (time.Time, error) {
-	var rv time.Time
-	b, err := ioutil.ReadFile(path.Join(workdir, TIMESTAMP_FILE))
-	if err != nil {
-		if os.IsNotExist(err) {
+// getLastIngestionTs returns the timestamp of the last commit for which we
+// successfully ingested events.
+func getLastIngestionTs(edb events.EventDB, repos repograph.Map) (time.Time, error) {
+	timeEnd := time.Now()
+	window := time.Hour
+	for {
+		timeStart := timeEnd.Add(-window)
+		var latest time.Time
+		for repo := range repos {
+			ev, err := edb.Range(fmtStream(repo), timeStart, timeEnd)
+			if err != nil {
+				return BEGINNING_OF_TIME, err
+			}
+			if len(ev) > 0 {
+				ts := ev[len(ev)-1].Timestamp
+				if ts.After(latest) {
+					latest = ts
+				}
+			}
+		}
+		if !util.TimeIsZero(latest) {
+			return latest, nil
+		}
+		if timeStart.Before(BEGINNING_OF_TIME) {
 			return BEGINNING_OF_TIME, nil
 		}
-		return rv, err
+		window *= 2
+		timeEnd = timeStart
 	}
-	if err := gob.NewDecoder(bytes.NewBuffer(b)).Decode(&rv); err != nil {
-		return rv, err
-	}
-	return rv, nil
-}
-
-// writeTs writes the last successful run timestamp to a file.
-func writeTs(workdir string, ts time.Time) error {
-	var buf bytes.Buffer
-	if err := gob.NewEncoder(&buf).Encode(ts); err != nil {
-		return err
-	}
-	return ioutil.WriteFile(path.Join(workdir, TIMESTAMP_FILE), buf.Bytes(), os.ModePerm)
 }
 
 // Start initiates "average time to X% bot coverage" metrics data generation.
 // The caller is responsible for updating the passed-in repos and TaskCfgCache.
-func Start(ctx context.Context, taskDb db.TaskReader, repos repograph.Map, tcc *task_cfg_cache.TaskCfgCache, btProject, btInstance, workdir string, ts oauth2.TokenSource) error {
-	// Setup.
-	if err := os.MkdirAll(workdir, os.ModePerm); err != nil {
-		return err
-	}
-
+func Start(ctx context.Context, taskDb db.TaskReader, repos repograph.Map, tcc *task_cfg_cache.TaskCfgCache, btProject, btInstance string, ts oauth2.TokenSource) error {
 	// Set up event metrics.
 	edb, err := events.NewBTEventDB(ctx, btProject, btInstance, ts)
 	if err != nil {
@@ -416,8 +411,8 @@
 		return fmt.Errorf("Failed to create EventMetrics: %s", err)
 	}
 	for repoUrl := range repos {
+		s := em.GetEventStream(fmtStream(repoUrl))
 		for _, p := range []time.Duration{24 * time.Hour, 7 * 24 * time.Hour} {
-			s := em.GetEventStream(fmtStream(repoUrl))
 			for _, pct := range PERCENTILES {
 				if err := addMetric(s, repoUrl, pct, p); err != nil {
 					return fmt.Errorf("Failed to add metric: %s", err)
@@ -427,13 +422,13 @@
 	}
 
 	lv := metrics2.NewLiveness("last_successful_bot_coverage_metrics")
-	lastFinished, err := readTs(workdir)
+	lastFinished, err := getLastIngestionTs(edb, repos)
 	if err != nil {
-		return fmt.Errorf("Failed to read timestamp: %s", err)
+		return fmt.Errorf("Failed to get timestamp of last successful ingestion: %s", err)
 	}
 	go util.RepeatCtx(10*time.Minute, ctx, func() {
 		now := time.Now()
-		if err := cycle(ctx, taskDb, repos, tcc, edb, em, lastFinished, now, workdir); err != nil {
+		if err := cycle(ctx, taskDb, repos, tcc, edb, em, lastFinished, now); err != nil {
 			sklog.Errorf("Failed to obtain avg time to X%% bot coverage metrics: %s", err)
 		} else {
 			lastFinished = now
diff --git a/datahopper/go/datahopper/main.go b/datahopper/go/datahopper/main.go
index 684c799..5db8804 100644
--- a/datahopper/go/datahopper/main.go
+++ b/datahopper/go/datahopper/main.go
@@ -8,7 +8,6 @@
 	"context"
 	"flag"
 	"fmt"
-	"path/filepath"
 	"regexp"
 	"time"
 
@@ -56,7 +55,6 @@
 	repoUrls          = common.NewMultiStringFlag("repo", nil, "Repositories to query for status.")
 	swarmingServer    = flag.String("swarming_server", "", "Host name of the Swarming server.")
 	swarmingPools     = common.NewMultiStringFlag("swarming_pool", nil, "Swarming pools to use.")
-	workdir           = flag.String("workdir", ".", "Working directory used by data processors.")
 )
 
 var (
@@ -78,13 +76,6 @@
 	)
 	ctx := context.Background()
 
-	// Absolutify the workdir.
-	w, err := filepath.Abs(*workdir)
-	if err != nil {
-		sklog.Fatal(w)
-	}
-	sklog.Infof("Workdir is %s", w)
-
 	// OAuth2.0 TokenSource.
 	ts, err := auth.NewDefaultTokenSource(*local, auth.SCOPE_USERINFO_EMAIL, pubsub.AUTH_SCOPE, bigtable.Scope, datastore.ScopeDatastore, swarming.AUTH_SCOPE, auth.SCOPE_READ_WRITE, auth.SCOPE_GERRIT)
 	if err != nil {
@@ -185,7 +176,7 @@
 	}
 
 	// Generate "time to X% bot coverage" metrics.
-	if err := bot_metrics.Start(ctx, d, repos, tcc, *btProject, *btInstance, *workdir, ts); err != nil {
+	if err := bot_metrics.Start(ctx, d, repos, tcc, *btProject, *btInstance, ts); err != nil {
 		sklog.Fatal(err)
 	}