[gitiles] Limit request QPS, add LogFn and LogFnBatch

This will help with use cases which load a lot of commits.

Change-Id: I44e262961d733a8635b8718dc60cd72499e39414
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/233963
Commit-Queue: Eric Boren <borenet@google.com>
Reviewed-by: Ravi Mistry <rmistry@google.com>
diff --git a/go.mod b/go.mod
index 442ff51..f72137e 100644
--- a/go.mod
+++ b/go.mod
@@ -113,6 +113,7 @@
 	golang.org/x/net v0.0.0-20190724013045-ca1201d0de80
 	golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
 	golang.org/x/sync v0.0.0-20190423024810-112230192c58
+	golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
 	golang.org/x/tools v0.0.0-20190809145639-6d4652c779c4 // indirect
 	golang.org/x/tools/gopls v0.1.3 // indirect
 	google.golang.org/api v0.8.0
diff --git a/go/gitiles/gitiles.go b/go/gitiles/gitiles.go
index 0ef1e13..7646a49 100644
--- a/go/gitiles/gitiles.go
+++ b/go/gitiles/gitiles.go
@@ -4,6 +4,7 @@
 	"context"
 	"encoding/base64"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -18,6 +19,7 @@
 	"go.skia.org/infra/go/httputils"
 	"go.skia.org/infra/go/util"
 	"go.skia.org/infra/go/vcsinfo"
+	"golang.org/x/time/rate"
 )
 
 /*
@@ -29,15 +31,25 @@
 	DATE_FORMAT_NO_TZ = "Mon Jan 02 15:04:05 2006"
 	DATE_FORMAT_TZ    = "Mon Jan 02 15:04:05 2006 -0700"
 	DOWNLOAD_URL      = "%s/+/%s/%s?format=TEXT"
-	LOG_URL           = "%s/+log/%s..%s?format=JSON"
+	LOG_URL           = "%s/+log/%s?format=JSON"
 	REFS_URL          = "%s/+refs%%2Fheads?format=JSON"
 	TAGS_URL          = "%s/+refs%%2Ftags?format=JSON"
+
+	// These were copied from the defaults used by gitfs:
+	// https://gerrit.googlesource.com/gitfs/+/59c1163fd1737445281f2339399b2b986b0d30fe/gitiles/client.go#102
+	MAX_QPS   = rate.Limit(4.0)
+	MAX_BURST = 40
+)
+
+var (
+	ErrStopIteration = errors.New("stop iteration")
 )
 
 // Repo is an object used for interacting with a single Git repo using Gitiles.
 type Repo struct {
 	client         *http.Client
 	gitCookiesPath string
+	rl             *rate.Limiter
 	URL            string
 }
 
@@ -49,12 +61,17 @@
 	return &Repo{
 		client:         c,
 		gitCookiesPath: gitCookiesPath,
+		rl:             rate.NewLimiter(MAX_QPS, MAX_BURST),
 		URL:            url,
 	}
 }
 
 // get executes a GET request to the given URL, returning the http.Response.
 func (r *Repo) get(ctx context.Context, url string) (*http.Response, error) {
+	// Respect the rate limit.
+	if err := r.rl.Wait(ctx); err != nil {
+		return nil, err
+	}
 	req, err := http.NewRequest("GET", url, nil)
 	if err != nil {
 		return nil, err
@@ -189,28 +206,48 @@
 	return commitToLongCommit(&c)
 }
 
+// logHelper is used to perform requests which are equivalent to "git log".
+// Loads commits in batches and calls the given function for each batch of
+// commits. If the function returns an error, iteration stops, and the error is
+// returned, unless it was ErrStopIteration.
+func (r *Repo) logHelper(ctx context.Context, urlTmpl, commit string, fn func(context.Context, []*vcsinfo.LongCommit) error) error {
+	for {
+		var l Log
+		if err := r.getJson(ctx, fmt.Sprintf(urlTmpl, commit), &l); err != nil {
+			return err
+		}
+		// Convert to vcsinfo.LongCommit.
+		commits := make([]*vcsinfo.LongCommit, 0, len(l.Log))
+		for _, c := range l.Log {
+			vc, err := commitToLongCommit(c)
+			if err != nil {
+				return err
+			}
+			commits = append(commits, vc)
+		}
+		if err := fn(ctx, commits); err == ErrStopIteration {
+			return nil
+		} else if err != nil {
+			return err
+		}
+		if l.Next == "" {
+			return nil
+		} else {
+			commit = l.Next
+		}
+	}
+}
+
 // Log returns Gitiles' equivalent to "git log" for the given start and end
 // commits.
 func (r *Repo) Log(ctx context.Context, from, to string) ([]*vcsinfo.LongCommit, error) {
 	rv := []*vcsinfo.LongCommit{}
-	for {
-		var l Log
-		if err := r.getJson(ctx, fmt.Sprintf(LOG_URL, r.URL, from, to), &l); err != nil {
-			return nil, err
-		}
-		// Convert to vcsinfo.LongCommit.
-		for _, c := range l.Log {
-			vc, err := commitToLongCommit(c)
-			if err != nil {
-				return nil, err
-			}
-			rv = append(rv, vc)
-		}
-		if l.Next == "" {
-			break
-		} else {
-			to = l.Next
-		}
+	tmpl := fmt.Sprintf(LOG_URL, r.URL, from+"..%s")
+	if err := r.logHelper(ctx, tmpl, to, func(ctx context.Context, commits []*vcsinfo.LongCommit) error {
+		rv = append(rv, commits...)
+		return nil
+	}); err != nil {
+		return nil, err
 	}
 	return rv, nil
 }
@@ -277,6 +314,26 @@
 	return rv, nil
 }
 
+// LogFn runs the given function for each commit in the log history reachable
+// from the given commit hash or ref name. It stops when ErrStopIteration is
+// returned.
+func (r *Repo) LogFn(ctx context.Context, to string, fn func(context.Context, *vcsinfo.LongCommit) error) error {
+	return r.LogFnBatch(ctx, to, func(ctx context.Context, commits []*vcsinfo.LongCommit) error {
+		for _, c := range commits {
+			if err := fn(ctx, c); err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+}
+
+// LogFnBatch is the same as LogFn but it runs the given function over batches
+// of commits.
+func (r *Repo) LogFnBatch(ctx context.Context, to string, fn func(context.Context, []*vcsinfo.LongCommit) error) error {
+	return r.logHelper(ctx, fmt.Sprintf(LOG_URL, r.URL, "%s"), to, fn)
+}
+
 // Branches returns the list of branches in the repo.
 func (r *Repo) Branches(ctx context.Context) ([]*git.Branch, error) {
 	branchMap := map[string]struct {
diff --git a/go/gitiles/gitiles_test.go b/go/gitiles/gitiles_test.go
index 199e4b7..51a6b5d 100644
--- a/go/gitiles/gitiles_test.go
+++ b/go/gitiles/gitiles_test.go
@@ -16,6 +16,7 @@
 	"go.skia.org/infra/go/testutils"
 	"go.skia.org/infra/go/testutils/unittest"
 	"go.skia.org/infra/go/vcsinfo"
+	"golang.org/x/time/rate"
 )
 
 func TestLog(t *testing.T) {
@@ -81,6 +82,7 @@
 
 	urlMock := mockhttpclient.NewURLMock()
 	r := NewRepo(gb.RepoUrl(), "", urlMock.Client())
+	r.rl.SetLimit(rate.Inf)
 
 	// Helper function for mocking gitiles API calls.
 	mockLog := func(from, to string, rvCommits []string) {
@@ -108,7 +110,7 @@
 		}
 		js := testutils.MarshalJSON(t, results)
 		js = ")]}'\n" + js
-		urlMock.MockOnce(fmt.Sprintf(LOG_URL, gb.RepoUrl(), from, to), mockhttpclient.MockGetDialogue([]byte(js)))
+		urlMock.MockOnce(fmt.Sprintf(LOG_URL, gb.RepoUrl(), fmt.Sprintf("%s..%s", from, to)), mockhttpclient.MockGetDialogue([]byte(js)))
 	}
 
 	// Return a slice of the hashes for the given commits.
@@ -187,6 +189,7 @@
 	repoUrl := "https://fake/repo"
 	urlMock := mockhttpclient.NewURLMock()
 	repo := NewRepo(repoUrl, "", urlMock.Client())
+	repo.rl.SetLimit(rate.Inf)
 	next := 0
 	hash := func() string {
 		next++
@@ -236,13 +239,33 @@
 		}
 		js := testutils.MarshalJSON(t, results)
 		js = ")]}'\n" + js
-		urlMock.MockOnce(fmt.Sprintf(LOG_URL, repoUrl, from.Hash, to.Hash), mockhttpclient.MockGetDialogue([]byte(js)))
+		urlMock.MockOnce(fmt.Sprintf(LOG_URL, repoUrl, fmt.Sprintf("%s..%s", from.Hash, to.Hash)), mockhttpclient.MockGetDialogue([]byte(js)))
+		urlMock.MockOnce(fmt.Sprintf(LOG_URL, repoUrl, to.Hash), mockhttpclient.MockGetDialogue([]byte(js)))
 	}
-	check := func(from, to *vcsinfo.LongCommit, expect []*vcsinfo.LongCommit) {
+	check := func(from, to *vcsinfo.LongCommit, expectCommits []*vcsinfo.LongCommit) {
+		// The expectations are in chronological order for convenience
+		// of the caller. But git logs are in reverse chronological
+		// order. Sort the expectations.
+		expect := make([]*vcsinfo.LongCommit, len(expectCommits))
+		copy(expect, expectCommits)
+		sort.Sort(vcsinfo.LongCommitSlice(expect))
+
+		// Test standard Log(from, to) function.
 		log, err := repo.Log(ctx, from.Hash, to.Hash)
 		assert.NoError(t, err)
-		sort.Sort(sort.Reverse(vcsinfo.LongCommitSlice(log)))
 		deepequal.AssertDeepEqual(t, expect, log)
+
+		// Test LogFn
+		log = make([]*vcsinfo.LongCommit, 0, len(expect))
+		assert.NoError(t, repo.LogFn(ctx, to.Hash, func(ctx context.Context, c *vcsinfo.LongCommit) error {
+			if c.Hash == from.Hash {
+				return ErrStopIteration
+			}
+			log = append(log, c)
+			return nil
+		}))
+		deepequal.AssertDeepEqual(t, expect, log)
+		assert.True(t, urlMock.Empty())
 	}
 
 	// Create some fake commits.
@@ -252,20 +275,20 @@
 	}
 
 	// Most basic test case; no pagination.
-	mock(commits[0], commits[5], commits[0:5], "")
-	check(commits[0], commits[5], commits[0:5])
+	mock(commits[0], commits[5], commits[1:5], "")
+	check(commits[0], commits[5], commits[1:5])
 
 	// Two pages.
 	split := 5
 	mock(commits[0], commits[len(commits)-1], commits[split:], commits[split].Hash)
-	mock(commits[0], commits[split], commits[0:split], "")
-	check(commits[0], commits[len(commits)-1], commits)
+	mock(commits[0], commits[split], commits[1:split], "")
+	check(commits[0], commits[len(commits)-1], commits[1:])
 
 	// Three pages.
 	split1 := 7
 	split2 := 3
 	mock(commits[0], commits[len(commits)-1], commits[split1:], commits[split1].Hash)
 	mock(commits[0], commits[split1], commits[split2:split1], commits[split2].Hash)
-	mock(commits[0], commits[split2], commits[0:split2], "")
-	check(commits[0], commits[len(commits)-1], commits)
+	mock(commits[0], commits[split2], commits[1:split2], "")
+	check(commits[0], commits[len(commits)-1], commits[1:])
 }
diff --git a/go/gitiles/testutils/testutils.go b/go/gitiles/testutils/testutils.go
index 1b040bd..947e1d1 100644
--- a/go/gitiles/testutils/testutils.go
+++ b/go/gitiles/testutils/testutils.go
@@ -91,6 +91,6 @@
 	b, err := json.Marshal(log)
 	assert.NoError(mr.t, err)
 	b = append([]byte(")]}'\n"), b...)
-	url := fmt.Sprintf(gitiles.LOG_URL, mr.url, from, to)
+	url := fmt.Sprintf(gitiles.LOG_URL, mr.url, fmt.Sprintf("%s..%s", from, to))
 	mr.c.MockOnce(url, mockhttpclient.MockGetDialogue(b))
 }