// busywork is an end-to-end test for the task scheduler DB (originally local_db,
// now the Firestore-backed implementation). It performs inserts and updates
// roughly mimicking what we might expect from task_scheduler. It also tracks
// performance of various operations.
package main
import (
"container/heap"
"context"
"flag"
"fmt"
"math"
"math/rand"
"os"
"sort"
"strconv"
"sync"
"time"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/task_scheduler/go/db"
"go.skia.org/infra/task_scheduler/go/db/cache"
"go.skia.org/infra/task_scheduler/go/db/firestore"
"go.skia.org/infra/task_scheduler/go/scheduling"
"go.skia.org/infra/task_scheduler/go/types"
"go.skia.org/infra/task_scheduler/go/window"
)
var (
// Flags.
local = flag.Bool("local", true, "Whether we're running on a dev machine vs in production.")
promPort = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':10110')")
// Counters.
inserts = 0
insertDur = time.Duration(0)
mInserts = sync.RWMutex{}
insertAndUpdates = 0
insertAndUpdateDur = time.Duration(0)
mInsertAndUpdates = sync.RWMutex{}
updates = 0
updateDur = time.Duration(0)
mUpdates = sync.RWMutex{}
reads = 0
readDur = time.Duration(0)
mReads = sync.RWMutex{}
// epoch is a time before local_db was written.
epoch = time.Date(2016, 8, 1, 0, 0, 0, 0, time.UTC)
)
const (
// Parameters for creating random tasks.
kNumTaskNames = 50
kNumRepos = 3
kRecentCommitRange = 30
kMedianBlamelistLength = 2
// Parameters for randomly updating tasks.
kMedianPendingDuration = 10 * time.Second
kMedianRunningDuration = 10 * time.Minute
)
// itoh converts an integer to a commit hash. Task.Revision is always set to
// the result of itoh.
func itoh(i int) string {
return strconv.Itoa(i)
}
// htoi converts a commit hash to an integer. A commit's parent is
// itoh(htoi(hash)-1).
func htoi(h string) int {
i, err := strconv.Atoi(h)
if err != nil {
sklog.Fatal(err)
}
return i
}
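
// exampleCommitModel is a small illustrative sketch (hypothetical; not called by
// busywork) of the synthetic commit model above: commit hashes are just decimal
// integers, and a commit's parent is the previous integer.
func exampleCommitModel() {
	hash := itoh(42)
	parent := itoh(htoi(hash) - 1)
	fmt.Printf("commit %s has parent %s\n", hash, parent) // commit 42 has parent 41
}
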
// makeTask generates a task with a random Name, Repo, and Revision. Revision is
// picked uniformly from a range of kRecentCommitRange commits starting at
// recentCommitsBegin.
func makeTask(recentCommitsBegin int) *types.Task {
return &types.Task{
TaskKey: types.TaskKey{
RepoState: types.RepoState{
Repo: fmt.Sprintf("Repo-%d", rand.Intn(kNumRepos)),
Revision: itoh(recentCommitsBegin + rand.Intn(kRecentCommitRange)),
},
Name: fmt.Sprintf("Task-%d", rand.Intn(kNumTaskNames)),
},
}
}
// updateBlamelists sets t's Commits based on t.Revision and the Commits of
// previously-inserted tasks, then returns t. If another task's blamelist must
// shrink as a result, that task is also returned with its updated Commits.
func updateBlamelists(cache cache.TaskCache, t *types.Task) ([]*types.Task, error) {
if !cache.KnownTaskName(t.Repo, t.Name) {
t.Commits = []string{t.Revision}
return []*types.Task{t}, nil
}
stealFrom, err := cache.GetTaskForCommit(t.Repo, t.Revision, t.Name)
if err != nil {
return nil, fmt.Errorf("Could not find task %q for commit %q: %s", t.Name, t.Revision, err)
}
lastCommit := htoi(t.Revision)
firstCommit := lastCommit
// Work backwards until prev changes.
for i := lastCommit - 1; i > 0; i-- {
if lastCommit-firstCommit+1 > scheduling.MAX_BLAMELIST_COMMITS && stealFrom == nil {
t.Commits = []string{t.Revision}
return []*types.Task{t}, nil
}
hash := itoh(i)
prev, err := cache.GetTaskForCommit(t.Repo, hash, t.Name)
if err != nil {
return nil, fmt.Errorf("Could not find task %q for commit %q: %s", t.Name, hash, err)
}
if stealFrom != prev {
break
}
firstCommit = i
}
t.Commits = make([]string, lastCommit-firstCommit+1)
for i := 0; i <= lastCommit-firstCommit; i++ {
t.Commits[i] = itoh(i + firstCommit)
}
sort.Strings(t.Commits)
if stealFrom != nil {
newCommits := make([]string, 0, len(stealFrom.Commits)-len(t.Commits))
for _, h := range stealFrom.Commits {
idx := sort.SearchStrings(t.Commits, h)
if idx == len(t.Commits) || t.Commits[idx] != h {
newCommits = append(newCommits, h)
}
}
stealFrom.Commits = newCommits
return []*types.Task{t, stealFrom}, nil
} else {
return []*types.Task{t}, nil
}
}
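
// exampleBlamelistSteal is an illustrative sketch (hypothetical; not called by
// busywork) of the set-difference step in updateBlamelists above: commits claimed
// by the new task are removed from the previous owner's blamelist.
func exampleBlamelistSteal() {
	claimed := []string{"3", "4", "5"}            // commits the new task now covers (sorted)
	previous := []string{"1", "2", "3", "4", "5"} // the previous owner's blamelist
	remaining := make([]string, 0, len(previous)-len(claimed))
	for _, h := range previous {
		idx := sort.SearchStrings(claimed, h)
		if idx == len(claimed) || claimed[idx] != h {
			remaining = append(remaining, h)
		}
	}
	fmt.Println(remaining) // [1 2]
}
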
// findApproxLatestCommit scans the DB backwards in time and returns the commit
// number of the most recently created task, or 0 if the DB is empty.
func findApproxLatestCommit(d db.TaskDB) int {
sklog.Infof("findApproxLatestCommit begin")
for t := time.Now(); t.After(epoch); t = t.Add(-24 * time.Hour) {
begin := t.Add(-24 * time.Hour)
sklog.Infof("findApproxLatestCommit loading %s to %s", begin, t)
before := time.Now()
		tasks, err := d.GetTasksFromDateRange(begin, t, "")
		getTasksDur := time.Now().Sub(before)
		if err != nil {
			sklog.Fatal(err)
		}
		mReads.Lock()
		if len(tasks) > 0 {
			reads += len(tasks)
		} else {
			reads++
		}
		readDur += getTasksDur
		mReads.Unlock()
		if len(tasks) > 0 {
			// Return revision of last task.
			lastTask := tasks[len(tasks)-1]
			i := htoi(lastTask.Revision)
sklog.Infof("findApproxLatestCommit returning %d from %s", i, lastTask.Id)
return i
}
}
sklog.Infof("findApproxLatestCommit found empty DB")
return 0
}
// putTasks inserts randomly-generated tasks into the DB. Does not return.
func putTasks(d db.TaskDB) {
sklog.Infof("putTasks begin")
w, err := window.New(4*24*time.Hour, 0, nil)
if err != nil {
sklog.Fatal(err)
}
cache, err := cache.NewTaskCache(d, w)
if err != nil {
sklog.Fatal(err)
}
// If we're restarting, try to pick up where we left off.
currentCommit := findApproxLatestCommit(d)
meanTasksPerCommit := float64(kNumTaskNames * kNumRepos / kMedianBlamelistLength)
maxTasksPerIter := float64(kNumTaskNames * kNumRepos * kRecentCommitRange)
for {
if err := w.Update(); err != nil {
sklog.Fatal(err)
}
iterTasks := int(math.Max(0, math.Min(maxTasksPerIter, (rand.NormFloat64()+1)*meanTasksPerCommit)))
sklog.Infof("Adding %d tasks with revisions %s - %s", iterTasks, itoh(currentCommit), itoh(currentCommit+kRecentCommitRange))
for i := 0; i < iterTasks; i++ {
t := makeTask(currentCommit)
putTasksDur := time.Duration(0)
before := time.Now()
updatedTasks, err := db.UpdateTasksWithRetries(d, func() ([]*types.Task, error) {
putTasksDur += time.Now().Sub(before)
t := t.Copy()
if err := cache.Update(); err != nil {
sklog.Fatal(err)
}
tasksToUpdate, err := updateBlamelists(cache, t)
if err != nil {
sklog.Fatal(err)
}
before = time.Now()
if err := d.AssignId(t); err != nil {
sklog.Fatal(err)
}
putTasksDur += time.Now().Sub(before)
t.Created = time.Now()
t.SwarmingTaskId = fmt.Sprintf("%x", rand.Int31())
before = time.Now()
return tasksToUpdate, nil
})
putTasksDur += time.Now().Sub(before)
if err != nil {
sklog.Fatal(err)
}
			if len(updatedTasks) > 1 {
				mInsertAndUpdates.Lock()
				insertAndUpdates += len(updatedTasks)
				insertAndUpdateDur += putTasksDur
				mInsertAndUpdates.Unlock()
			} else {
				mInserts.Lock()
				inserts++
				insertDur += putTasksDur
				mInserts.Unlock()
			}
}
currentCommit++
}
}
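
// sampleAroundMedian is an illustrative sketch (hypothetical; not called directly)
// of the clamped-normal sampling used for iterTasks above and for update delays in
// updateTasks below: a normal draw whose mean and standard deviation both equal
// median, floored at zero.
func sampleAroundMedian(median float64) float64 {
	return math.Max(0, (rand.NormFloat64()+1)*median)
}
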
// updateEntry is an item in updateEntryHeap.
type updateEntry struct {
task *types.Task
// updateTime is the key for updateEntryHeap.
updateTime time.Time
// heapIndex is the index of this updateEntry in updateEntryHeap. It is kept
// up-to-date by updateEntryHeap methods.
heapIndex int
}
// updateEntryHeap implements a queue of updateEntry's ordered by updateTime. It
// implements heap.Interface.
type updateEntryHeap []*updateEntry
func (h updateEntryHeap) Len() int { return len(h) }
func (h updateEntryHeap) Less(i, j int) bool { return h[i].updateTime.Before(h[j].updateTime) }
func (h updateEntryHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].heapIndex = i
h[j].heapIndex = j
}
func (h *updateEntryHeap) Push(x interface{}) {
item := x.(*updateEntry)
item.heapIndex = len(*h)
*h = append(*h, item)
}
func (h *updateEntryHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
x.heapIndex = -1
return x
}
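
// exampleQueueOrder is an illustrative sketch (hypothetical; not called by
// busywork) showing that updateEntryHeap, driven through package heap, pops
// entries in updateTime order regardless of insertion order.
func exampleQueueOrder() {
	q := updateEntryHeap{}
	now := time.Now()
	heap.Push(&q, &updateEntry{updateTime: now.Add(2 * time.Second)})
	heap.Push(&q, &updateEntry{updateTime: now.Add(time.Second)})
	earliest := heap.Pop(&q).(*updateEntry)
	fmt.Println(earliest.updateTime.Equal(now.Add(time.Second))) // true
}
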
// updateTasks makes random updates to pending and running tasks in the DB. Does
// not return.
func updateTasks(d db.TaskDB) {
sklog.Infof("updateTasks begin")
updateQueue := updateEntryHeap{}
idMap := map[string]*updateEntry{}
freshenQueue := func(task *types.Task) {
entry := idMap[task.Id]
// Currently only updating pending and running tasks.
if task.Status == types.TASK_STATUS_PENDING || task.Status == types.TASK_STATUS_RUNNING {
meanUpdateDelay := kMedianPendingDuration
if task.Status == types.TASK_STATUS_RUNNING {
meanUpdateDelay = kMedianRunningDuration
}
updateDelayNanos := int64(math.Max(0, (rand.NormFloat64()+1)*float64(meanUpdateDelay)))
updateTime := time.Now().Add(time.Duration(updateDelayNanos) * time.Nanosecond)
if entry == nil {
entry = &updateEntry{
task: task,
updateTime: updateTime,
heapIndex: -1,
}
heap.Push(&updateQueue, entry)
} else {
entry.task = task
entry.updateTime = updateTime
heap.Fix(&updateQueue, entry.heapIndex)
}
if entry.heapIndex < 0 {
sklog.Fatalf("you lose %#v %#v", entry, updateQueue)
}
idMap[task.Id] = entry
} else if entry != nil {
heap.Remove(&updateQueue, entry.heapIndex)
delete(idMap, task.Id)
}
}
token, err := d.StartTrackingModifiedTasks()
if err != nil {
sklog.Fatal(err)
}
// Initial read to find pending and running tasks.
for t := time.Now(); t.After(epoch); t = t.Add(-24 * time.Hour) {
begin := t.Add(-24 * time.Hour)
sklog.Infof("updateTasks loading %s to %s", begin, t)
before := time.Now()
		tasks, err := d.GetTasksFromDateRange(begin, t, "")
		getTasksDur := time.Now().Sub(before)
		if err != nil {
			sklog.Fatal(err)
		}
		mReads.Lock()
		if len(tasks) > 0 {
			reads += len(tasks)
		} else {
			reads++
		}
		readDur += getTasksDur
		mReads.Unlock()
		for _, task := range tasks {
			freshenQueue(task)
		}
}
}
sklog.Infof("updateTasks finished loading; %d pending and running", len(idMap))
// Rate limit so we're not constantly taking locks for GetModifiedTasks.
for range time.Tick(time.Millisecond) {
now := time.Now()
t, err := d.GetModifiedTasks(token)
if err != nil {
sklog.Fatal(err)
}
for _, task := range t {
freshenQueue(task)
}
sklog.Infof("updateTasks performing updates; %d tasks on queue", len(updateQueue))
for len(updateQueue) > 0 && updateQueue[0].updateTime.Before(now) {
if time.Now().Sub(now) >= db.MODIFIED_DATA_TIMEOUT-5*time.Second {
break
}
entry := heap.Pop(&updateQueue).(*updateEntry)
task := entry.task
delete(idMap, task.Id)
putTasksDur := time.Duration(0)
before := time.Now()
_, err := db.UpdateTaskWithRetries(d, task.Id, func(task *types.Task) error {
putTasksDur += time.Now().Sub(before)
switch task.Status {
case types.TASK_STATUS_PENDING:
task.Started = now
isMishap := rand.Intn(100) == 0
if isMishap {
task.Status = types.TASK_STATUS_MISHAP
task.Finished = now
} else {
task.Status = types.TASK_STATUS_RUNNING
}
case types.TASK_STATUS_RUNNING:
task.Finished = now
statusRand := rand.Intn(25)
isMishap := statusRand == 0
isFailure := statusRand < 5
if isMishap {
task.Status = types.TASK_STATUS_MISHAP
} else if isFailure {
task.Status = types.TASK_STATUS_FAILURE
} else {
task.Status = types.TASK_STATUS_SUCCESS
task.IsolatedOutput = fmt.Sprintf("%x", rand.Int63())
}
default:
sklog.Fatalf("Task %s in update queue has status %s. %#v", task.Id, task.Status, task)
}
before = time.Now()
return nil
})
putTasksDur += time.Now().Sub(before)
if err != nil {
sklog.Fatal(err)
}
mUpdates.Lock()
updates++
updateDur += putTasksDur
mUpdates.Unlock()
}
}
}
// readTasks reads the last hour of tasks every second. Does not return.
func readTasks(d db.TaskDB) {
sklog.Infof("readTasks begin")
var taskCount uint64 = 0
var readCount uint64 = 0
var totalDuration time.Duration = 0
lastMessage := time.Now()
for range time.Tick(time.Second) {
now := time.Now()
t, err := d.GetTasksFromDateRange(now.Add(-time.Hour), now, "")
dur := time.Now().Sub(now)
if err != nil {
sklog.Fatal(err)
}
taskCount += uint64(len(t))
readCount++
totalDuration += dur
mReads.Lock()
reads += len(t)
readDur += dur
mReads.Unlock()
if now.Sub(lastMessage) > time.Minute {
lastMessage = now
if readCount > 0 && totalDuration > 0 {
sklog.Infof("readTasks %d tasks in last hour; %f reads/sec; %f tasks/sec", taskCount/readCount, float64(readCount)/totalDuration.Seconds(), float64(taskCount)/totalDuration.Seconds())
} else {
sklog.Fatalf("readTasks 0 reads in last minute")
}
taskCount = 0
readCount = 0
totalDuration = 0
}
}
}
// reportStats logs the performance of the DB as seen by putTasks, updateTasks,
// and readTasks. Does not return.
func reportStats() {
lastInserts := 0
lastInsertDur := time.Duration(0)
lastInsertAndUpdates := 0
lastInsertAndUpdateDur := time.Duration(0)
lastUpdates := 0
lastUpdateDur := time.Duration(0)
lastReads := 0
lastReadDur := time.Duration(0)
for range time.Tick(5 * time.Second) {
mInserts.RLock()
totalInserts := inserts
totalInsertDur := insertDur
mInserts.RUnlock()
mInsertAndUpdates.RLock()
totalInsertAndUpdates := insertAndUpdates
totalInsertAndUpdateDur := insertAndUpdateDur
mInsertAndUpdates.RUnlock()
mUpdates.RLock()
totalUpdates := updates
totalUpdateDur := updateDur
mUpdates.RUnlock()
mReads.RLock()
totalReads := reads
totalReadDur := readDur
mReads.RUnlock()
curInserts := totalInserts - lastInserts
lastInserts = totalInserts
curInsertDur := totalInsertDur - lastInsertDur
lastInsertDur = totalInsertDur
curInsertAndUpdates := totalInsertAndUpdates - lastInsertAndUpdates
lastInsertAndUpdates = totalInsertAndUpdates
curInsertAndUpdateDur := totalInsertAndUpdateDur - lastInsertAndUpdateDur
lastInsertAndUpdateDur = totalInsertAndUpdateDur
curUpdates := totalUpdates - lastUpdates
lastUpdates = totalUpdates
curUpdateDur := totalUpdateDur - lastUpdateDur
lastUpdateDur = totalUpdateDur
curReads := totalReads - lastReads
lastReads = totalReads
curReadDur := totalReadDur - lastReadDur
lastReadDur = totalReadDur
sklog.Infof("reportStats total; %d inserts %f/s; %d insert-and-updates %f/s; %d updates %f/s; %d reads %f/s", totalInserts, float64(totalInserts)/totalInsertDur.Seconds(), totalInsertAndUpdates, float64(totalInsertAndUpdates)/totalInsertAndUpdateDur.Seconds(), totalUpdates, float64(totalUpdates)/totalUpdateDur.Seconds(), totalReads, float64(totalReads)/totalReadDur.Seconds())
if curInsertDur.Nanoseconds() == 0 {
curInsertDur += time.Nanosecond
}
if curInsertAndUpdateDur.Nanoseconds() == 0 {
curInsertAndUpdateDur += time.Nanosecond
}
if curUpdateDur.Nanoseconds() == 0 {
curUpdateDur += time.Nanosecond
}
if curReadDur.Nanoseconds() == 0 {
curReadDur += time.Nanosecond
}
sklog.Infof("reportStats current; %d inserts %f/s; %d insert-and-updates %f/s; %d updates %f/s; %d reads %f/s", curInserts, float64(curInserts)/curInsertDur.Seconds(), curInsertAndUpdates, float64(curInsertAndUpdates)/curInsertAndUpdateDur.Seconds(), curUpdates, float64(curUpdates)/curUpdateDur.Seconds(), curReads, float64(curReads)/curReadDur.Seconds())
}
}
func main() {
	// Global init.
	common.Init()
	ts, err := auth.NewDefaultTokenSource(*local)
	if err != nil {
		sklog.Fatal(err)
	}
	hostname, err := os.Hostname()
	if err != nil {
		sklog.Fatal(err)
	}
id := fmt.Sprintf("busywork_%s", hostname)
d, err := firestore.NewDB(context.Background(), firestore.FIRESTORE_PROJECT, id, ts, nil)
if err != nil {
sklog.Fatal(err)
}
go reportStats()
go putTasks(d)
go updateTasks(d)
go readTasks(d)
// Block forever while goroutines do the work.
select {}
}