blob: b538c50548b9bb9d49ad680abeca06893fd43aee [file] [log] [blame]
package capacity
// This package uses the tasks recorded in the Task Scheduler's TaskCache to compute
// metrics that allow us to gauge theoretical capacity needs. Presently, the last 3
// days worth of task data is used as the basis for these metrics.
import (
"context"
"fmt"
"sort"
"strings"
"sync"
"time"
"go.skia.org/infra/go/cq"
"go.skia.org/infra/go/git"
"go.skia.org/infra/go/git/repograph"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/swarming"
"go.skia.org/infra/go/util"
"go.skia.org/infra/task_scheduler/go/db/cache"
"go.skia.org/infra/task_scheduler/go/specs"
"go.skia.org/infra/task_scheduler/go/task_cfg_cache"
"go.skia.org/infra/task_scheduler/go/types"
)
// CapacityClient provides methods for tracking bot capacity.
type CapacityClient interface {
	// QueryAll recomputes the capacity metrics.
	QueryAll(ctx context.Context) error
	// StartLoading begins a loop which recomputes the capacity metrics after a given
	// interval of time. Any errors are logged, but the loop is not broken.
	StartLoading(ctx context.Context, interval time.Duration)
	// CapacityMetrics returns the most recent capacity metrics, keyed by stringified
	// dimensions.
	CapacityMetrics() map[string]BotConfig
}
// CapacityClientImpl is the CapacityClient implementation which derives capacity metrics from
// recent Tasks and the TaskSpecs found in the repos' TasksCfg. Create instances with New.
type CapacityClientImpl struct {
	// tcc provides the TasksCfg (and therefore the TaskSpec dimensions) for a RepoState.
	tcc *task_cfg_cache.TaskCfgCacheImpl
	// tasks is the source of recently-created Tasks.
	tasks cache.TaskCache
	// repos holds the commit graph of each repo, keyed by the same repo string as Task.Repo.
	repos repograph.Map
	// lastMeasurements holds the result of the most recent successful QueryAll, keyed by
	// stringified dimensions. It is nil until QueryAll first succeeds. Guarded by mtx.
	lastMeasurements map[string]BotConfig
	// mtx guards lastMeasurements.
	mtx sync.Mutex
}
// New returns a CapacityClientImpl which computes metrics from the given TaskCfgCache, TaskCache
// and repo graphs. The caller is responsible for periodically updating the arguments.
func New(tcc *task_cfg_cache.TaskCfgCacheImpl, tasks cache.TaskCache, repos repograph.Map) *CapacityClientImpl {
	client := &CapacityClientImpl{
		repos: repos,
		tasks: tasks,
		tcc:   tcc,
	}
	return client
}
// taskData records one completed run of a task: how long it took and which bot ran it.
type taskData struct {
	// Duration is the elapsed time between the task's Started and Finished timestamps.
	Duration time.Duration
	// BotId is the ID of the Swarming bot which ran the task.
	BotId string
}
// TaskDuration summarizes the recent runs of a single TaskSpec within a BotConfig.
type TaskDuration struct {
	// Name is the TaskSpec name.
	Name string `json:"task_name"`
	// AverageDuration is the mean run time of the task. time.Duration has no custom JSON
	// encoding, so it is serialized as an integer number of nanoseconds.
	AverageDuration time.Duration `json:"task_duration_ns"`
	// OnCQ reports whether Name appears in the list of CQ tryjobs.
	OnCQ bool `json:"on_cq_also"`
}
// BotConfig describes one group of bots we test on, identified by a single set of dimensions,
// together with the tasks those bots execute.
type BotConfig struct {
	// Dimensions is the set of dimensions (in "key:value" form) shared by the bots.
	Dimensions []string `json:"dimensions"`
	// Bots is the set of IDs of bots seen running tasks with these dimensions. Values are
	// only ever set to true within this package.
	Bots map[string]bool `json:"bots"`
	// TaskAverageDurations lists each task run on these bots along with its average duration.
	TaskAverageDurations []TaskDuration `json:"tasks"`
}
// getTaskDurations fetches Tasks from the TaskCache and generates a taskData for each completed
// Swarming Task, grouped by repo and TaskSpec name.
func (c *CapacityClientImpl) getTaskDurations() (map[string]map[string][]taskData, error) {
// Fetch last 72 hours worth of tasks that TaskScheduler created.
now := time.Now()
before := now.Add(-72 * time.Hour)
tasks, err := c.tasks.GetTasksFromDateRange(before, now)
if err != nil {
return nil, fmt.Errorf("Could not fetch tasks between %s and %s: %s", before, now, err)
}
sklog.Infof("Found %d tasks in last 72 hours", len(tasks))
// Go through all the tasks and group the durations and bot ids by task name
durations := make(map[string]map[string][]taskData)
count := 0
for _, task := range tasks {
// Skip any task that didn't finish or didn't run. Finished and Started are
// the same if the task never ran.
if !task.Done() {
continue
}
if task.Fake() {
continue
}
// TODO(dogben): Need to consider deduplicated tasks also.
duration := task.Finished.Sub(task.Started)
if len(durations[task.Repo]) == 0 {
durations[task.Repo] = map[string][]taskData{}
}
if len(durations[task.Repo][task.Name]) == 0 {
count++
}
durations[task.Repo][task.Name] = append(durations[task.Repo][task.Name], taskData{
Duration: duration,
BotId: task.SwarmingBotId,
})
}
sklog.Infof("From %d tasks, we saw %d unique task names over %d repos", len(tasks), count, len(durations))
return durations, nil
}
// getCQTaskSpecs returns the names of the tryjobs on the Skia CQ followed by those on the Skia
// Infra CQ. A failure to fetch either list is logged and that list is treated as empty, so the
// returned error is always nil.
// TODO(benjaminwagner): return a util.StringSet{}
func (c *CapacityClientImpl) getCQTaskSpecs() ([]string, error) {
	skiaBots, skiaErr := cq.GetSkiaCQTryBots()
	if skiaErr != nil {
		sklog.Warningf("Could not get Skia CQ bots. Continuing anyway. %s", skiaErr)
		skiaBots = []string{}
	}
	infraBots, infraErr := cq.GetSkiaInfraCQTryBots()
	if infraErr != nil {
		sklog.Warningf("Could not get Infra CQ bots. Continuing anyway. %s", infraErr)
		infraBots = []string{}
	}
	// TODO(benjaminwagner): This is a list of Job names, not Task names.
	return append(skiaBots, infraBots...), nil
}
// botConfigKey creates a string key from a list of dimensions. dims will be sorted.
func botConfigKey(dims []string) string {
sort.Strings(dims)
return strings.Join(dims, "|")
}
// getTasksCfg finds the most recent cached TasksCfg for the main branch of the given repo. It tries
// the branch head and then walks back along first parents, making at most 5 attempts in total. It
// also returns the commit hash where the TasksCfg was found.
//
// The "main" branch is used if present, otherwise "master". An error is returned if repo is not in
// c.repos, if neither branch exists, if a commit without parents is reached, or if no TasksCfg
// could be loaded within the lookback window.
func (c *CapacityClientImpl) getTasksCfg(ctx context.Context, repo string) (*specs.TasksCfg, string, error) {
	repoGraph, ok := c.repos[repo]
	if !ok {
		return nil, "", skerr.Fmt("Unknown repo %q", repo)
	}
	commit := repoGraph.Get(git.MainBranch)
	if commit == nil {
		// Fall back to the legacy default branch name.
		commit = repoGraph.Get(git.MasterBranch)
	}
	if commit == nil {
		return nil, "", skerr.Fmt("Unable to find main branch in %q", repo)
	}
	const lookback = 5
	for i := 0; i < lookback; i++ {
		rs := types.RepoState{
			Repo:     repo,
			Revision: commit.Hash,
		}
		tasksCfg, cachedErr, err := c.tcc.Get(ctx, rs)
		if cachedErr != nil {
			// A cached error for this RepoState takes precedence over err.
			err = cachedErr
		}
		if err == nil {
			return tasksCfg, commit.Hash, nil
		}
		sklog.Warningf("Error getting TasksCfg for %s: %s", rs, err)
		parents := commit.GetParents()
		if len(parents) == 0 {
			return nil, "", skerr.Fmt("Unable to find TasksCfg for any revision in %q", repo)
		}
		// Follow the first parent, i.e. the previous commit on the branch.
		commit = parents[0]
	}
	return nil, "", skerr.Fmt("Unable to find any TasksCfg for %s looking back %d commits.", repo, lookback)
}
// computeBotConfigs groups TaskSpecs by identical dimensions and returns a BotConfig for each
// dimension set. Arguments are getTaskDurations() and getCQTaskSpecs(). The returned map is keyed
// by botConfigKey(BotConfig.Dimensions).
func (c *CapacityClientImpl) computeBotConfigs(ctx context.Context, durations map[string]map[string][]taskData, cqTasks []string) (map[string]BotConfig, error) {
// botConfigs coalesces all dimension groups together. For example, all tests
// that require "device_type:flounder|device_os:N12345" will be grouped together,
// letting us determine our actual use and theoretical capacity of that config.
botConfigs := make(map[string]BotConfig)
for repo, repoDurations := range durations {
// The db.Task structs don't have their dimensions, so we pull those off of the main
// branches of the repo. If the dimensions were updated recently, this may lead
// to some inaccuracies. In practice, this probably won't happen because updates
// tend to update, say, all the Nexus10s to a new OS version, which is effectively no change.
tasksCfg, hash, err := c.getTasksCfg(ctx, repo)
if err != nil {
sklog.Errorf("Skipping repo %s: %s", repo, err)
continue
}
for taskName, taskRuns := range repoDurations {
// Look up the TaskSpec to get the dimensions.
taskSpec, ok := tasksCfg.Tasks[taskName]
if !ok {
sklog.Warningf("Could not find taskspec for %s at %s@%s.", taskName, repo, hash)
continue
}
dims := taskSpec.Dimensions
key := botConfigKey(dims)
config, ok := botConfigs[key]
if !ok {
config = BotConfig{
Dimensions: dims,
Bots: make(map[string]bool),
TaskAverageDurations: make([]TaskDuration, 0),
}
}
// Compute average duration and add all the bots we've seen on this task
avgDuration := time.Duration(0)
for _, td := range taskRuns {
avgDuration += td.Duration
config.Bots[td.BotId] = true
}
if len(taskRuns) != 0 {
avgDuration /= time.Duration(len(taskRuns))
}
config.TaskAverageDurations = append(config.TaskAverageDurations, TaskDuration{
Name: taskName,
AverageDuration: avgDuration,
OnCQ: util.In(taskName, cqTasks),
})
botConfigs[key] = config
}
}
return botConfigs, nil
}
// mergeBotConfigs replaces overlapping BotConfigs in the given map by a combined BotConfig.
//
// Two BotConfigs overlap when the same bot ID appears in both of their Bots sets. botConfigs is
// modified in place. Each group of configs connected by shared bots is deleted and replaced by a
// single BotConfig, keyed by botConfigKey of the combined dimensions, in which:
//   - Dimensions is the sorted union of the group's dimensions;
//   - Bots is the union of the group's bots;
//   - TaskAverageDurations is the concatenation of the group's TaskAverageDurations.
//
// NOTE(review): okToMerge only decides which keys seed a merge. It sees the partial list of
// configs recorded for a bot at that point in the (randomly ordered) map iteration. gather then
// follows every shared bot transitively without re-checking okToMerge, so configs with
// conflicting dimensions can still be combined into one group. Confirm this is intended.
//
// NOTE(review): if the combined key equals the key of an existing config that is not part of the
// group, that config is overwritten and its bots and durations are lost. Verify this cannot occur.
func mergeBotConfigs(botConfigs map[string]BotConfig) {
	// okToMerge returns true if the BotConfigs referenced by configKeys do not have conflicting
	// dimensions, i.e. no dimension key maps to two different values. Dimensions which fail to
	// parse are treated as a conflict.
	okToMerge := func(configKeys []string) bool {
		combinedDims := map[string]string{}
		for _, configKey := range configKeys {
			nextDims, err := swarming.ParseDimensionsSingleValue(botConfigs[configKey].Dimensions)
			if err != nil {
				return false
			}
			for k, v := range nextDims {
				if other, ok := combinedDims[k]; ok && v != other {
					return false
				}
				combinedDims[k] = v
			}
		}
		return true
	}
	// botIdToConfigs maps each bot ID to the keys of the configs whose Bots contain it, without
	// duplicates. configsToMerge collects the keys from which merge groups are gathered.
	botIdToConfigs := map[string][]string{}
	configsToMerge := util.NewStringSet()
	for key, config := range botConfigs {
		for botId := range config.Bots {
			if !util.In(key, botIdToConfigs[botId]) {
				botIdToConfigs[botId] = append(botIdToConfigs[botId], key)
			}
			// botIdToConfigs is still being built here, so only the configs visited so far
			// are checked for compatibility.
			if len(botIdToConfigs[botId]) > 1 && okToMerge(botIdToConfigs[botId]) {
				configsToMerge[key] = true
			}
		}
	}
	for key := range configsToMerge {
		if _, ok := botConfigs[key]; !ok {
			// Already merged.
			continue
		}
		groupKeys := util.NewStringSet()
		bots := util.NewStringSet()
		// gather walks depth-first from key through every config reachable via shared bot IDs,
		// recording visited config keys in groupKeys and all of their bots in bots.
		var gather func(string)
		gather = func(key string) {
			if _, ok := groupKeys[key]; ok {
				return
			}
			groupKeys[key] = true
			config := botConfigs[key]
			for botId := range config.Bots {
				bots[botId] = true
				for _, other := range botIdToConfigs[botId] {
					gather(other)
				}
			}
		}
		gather(key)
		// Combine the group into a single BotConfig and remove the originals.
		dimSet := util.NewStringSet()
		durs := []TaskDuration{}
		for key := range groupKeys {
			config := botConfigs[key]
			dimSet.AddLists(config.Dimensions)
			durs = append(durs, config.TaskAverageDurations...)
			delete(botConfigs, key)
		}
		dims := dimSet.Keys()
		// botConfigKey sorts dims in place, so the stored Dimensions end up sorted too.
		newKey := botConfigKey(dims)
		botConfigs[newKey] = BotConfig{
			Dimensions:           dims,
			Bots:                 bots,
			TaskAverageDurations: durs,
		}
	}
}
// QueryAll implements CapacityClient. It recomputes the capacity metrics from recent task data,
// merges BotConfigs which share bots, and on success replaces the result returned by
// CapacityMetrics. On error the previously stored metrics are left untouched.
func (c *CapacityClientImpl) QueryAll(ctx context.Context) error {
	sklog.Info("Recounting Capacity Stats")
	durations, err := c.getTaskDurations()
	if err != nil {
		return err
	}
	cqTasks, err := c.getCQTaskSpecs()
	if err != nil {
		return err
	}
	botConfigs, err := c.computeBotConfigs(ctx, durations, cqTasks)
	if err != nil {
		return err
	}
	// Merge BotConfigs with overlapping bots. (This could be the wrong thing to do if a bot's
	// dimensions change, e.g. installing a GPU in a CPU bot or recreating a GCE bot with a different
	// CPU type. I expect that to be a rare case.)
	mergeBotConfigs(botConfigs)
	c.mtx.Lock()
	defer c.mtx.Unlock()
	c.lastMeasurements = botConfigs
	// Every error above has already been returned; report success explicitly.
	return nil
}
// StartLoading implements CapacityClient. It runs QueryAll in a new goroutine via util.RepeatCtx,
// once per interval (presumably until ctx is done). Failures are logged, including the underlying
// error, and do not stop the loop.
func (c *CapacityClientImpl) StartLoading(ctx context.Context, interval time.Duration) {
	go util.RepeatCtx(ctx, interval, func(ctx context.Context) {
		if err := c.QueryAll(ctx); err != nil {
			sklog.Errorf("There was a problem counting capacity stats: %s", err)
		}
	})
}
// CapacityMetrics implements CapacityClient. It returns the BotConfigs stored by the most recent
// successful QueryAll, keyed by stringified dimensions, or nil if none has succeeded yet. The map
// is returned without copying, so callers must not modify it.
func (c *CapacityClientImpl) CapacityMetrics() map[string]BotConfig {
	c.mtx.Lock()
	latest := c.lastMeasurements
	c.mtx.Unlock()
	return latest
}