package task_cfg_cache

import (
	"bytes"
	"context"
	"encoding/gob"
	"errors"
	"fmt"
	"time"

	"cloud.google.com/go/bigtable"
	"go.opencensus.io/trace"
	"go.skia.org/infra/go/atomic_miss_cache"
	"go.skia.org/infra/go/git/repograph"
	"go.skia.org/infra/go/now"
	"go.skia.org/infra/go/sklog"
	"go.skia.org/infra/task_scheduler/go/specs"
	"go.skia.org/infra/task_scheduler/go/types"
	"golang.org/x/oauth2"
	"google.golang.org/api/option"
)

const (
	// BigTable configuration.
	// We use a single BigTable table for storing gob-encoded TaskSpecs and
	// JobSpecs.
	BT_TABLE = "tasks-cfg"

	// We use a single BigTable column family.
	BT_COLUMN_FAMILY = "CFGS"

	// We use a single BigTable column which stores gob-encoded TaskSpecs
	// and JobSpecs.
	BT_COLUMN = "CFG"

	INSERT_TIMEOUT = 30 * time.Second
	QUERY_TIMEOUT  = 5 * time.Second
)

var (
	// Fully-qualified BigTable column name.
	BT_COLUMN_FULL = fmt.Sprintf("%s:%s", BT_COLUMN_FAMILY, BT_COLUMN)

	ErrNoSuchEntry = atomic_miss_cache.ErrNoSuchEntry
)

// TaskCfgCache is a cache of parsed tasks cfg files, keyed by RepoState.
type TaskCfgCache interface {
	// Cleanup removes cache entries which are outside of our scheduling window.
	Cleanup(ctx context.Context, period time.Duration) error

	// Close frees up resources used by the TaskCfgCache.
	Close() error

	// Get returns the TasksCfg (or error) for the given RepoState in the cache.
	// If the given entry does not exist in the cache, Get reads through to the
	// backing store to find it there and adds it to the cache if it exists. If
	// no entry exists for the given RepoState, Get returns ErrNoSuchEntry. If
	// there is a cached (i.e. permanent, non-recoverable) error for this
	// RepoState, it is returned as the second return value.
	Get(context.Context, types.RepoState) (*specs.TasksCfg, error, error)

	// Set stores the TasksCfg (or error) for the given RepoState in the cache.
	Set(ctx context.Context, rs types.RepoState, cfg *specs.TasksCfg, storedErr error) error

	// SetIfUnset stores the TasksCfg (or error) for the given RepoState in the
	// cache by calling the given function if no value already exists. It
	// returns the existing or new CachedValue, or any error which occurred.
	SetIfUnset(ctx context.Context, rs types.RepoState, fn func(context.Context) (*CachedValue, error)) (*CachedValue, error)
}
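
// Illustrative usage sketch (not part of this package's API surface): callers
// typically distinguish the cached "permanent" error from a transient lookup
// error. The variable name tcc and the repo/revision values are hypothetical.
//
//	cfg, cachedErr, err := tcc.Get(ctx, types.RepoState{
//		Repo:     "https://example.googlesource.com/some-repo.git",
//		Revision: "abc123",
//	})
//	if err != nil {
//		return err // Transient (or ErrNoSuchEntry); may be worth retrying.
//	}
//	if cachedErr != nil {
//		return cachedErr // Permanent; retrying will not help.
//	}
//	_ = cfg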

// TaskCfgCacheImpl is a struct used for caching tasks cfg files. The user
// should periodically call Cleanup() to remove old entries.
type TaskCfgCacheImpl struct {
	cache  *atomic_miss_cache.AtomicMissCache
	client *bigtable.Client
	repos  repograph.Map
}

// backingCache implements persistent storage of TasksCfgs in BigTable.
type backingCache struct {
	table *bigtable.Table
	tcc   *TaskCfgCacheImpl
}

// CachedValue represents a cached TasksCfg value. It includes any permanent
// error, which cannot be recovered via retries.
type CachedValue struct {
	// RepoState is the RepoState which is associated with this TasksCfg.
	RepoState types.RepoState

	// Cfg is the TasksCfg for this entry.
	Cfg *specs.TasksCfg

	// Err stores a permanent error. Mutually-exclusive with Cfg.
	Err string
}

// See documentation for atomic_miss_cache.ICache interface.
func (c *backingCache) Get(ctx context.Context, key string) (atomic_miss_cache.Value, error) {
	return GetTasksCfgFromBigTable(ctx, c.table, key)
}

// See documentation for atomic_miss_cache.ICache interface.
func (c *backingCache) Set(ctx context.Context, key string, val atomic_miss_cache.Value) error {
	cv := val.(*CachedValue)
	if !cv.RepoState.Valid() {
		return fmt.Errorf("Invalid RepoState: %+v", cv.RepoState)
	}
	return WriteTasksCfgToBigTable(ctx, c.table, key, cv)
}

// See documentation for atomic_miss_cache.ICache interface.
func (c *backingCache) Delete(ctx context.Context, key string) error {
	// We don't delete from BigTable.
	return nil
}

// NewTaskCfgCache returns a TaskCfgCache instance.
func NewTaskCfgCache(ctx context.Context, repos repograph.Map, btProject, btInstance string, ts oauth2.TokenSource) (*TaskCfgCacheImpl, error) {
	client, err := bigtable.NewClient(ctx, btProject, btInstance, option.WithTokenSource(ts))
	if err != nil {
		return nil, fmt.Errorf("Failed to create BigTable client: %s", err)
	}
	table := client.Open(BT_TABLE)
	c := &TaskCfgCacheImpl{
		client: client,
		repos:  repos,
	}
	c.cache = atomic_miss_cache.New(&backingCache{
		table: table,
		tcc:   c,
	})
	return c, nil
}
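
// Construction sketch, assuming a token source and repograph.Map are already
// available; the project and instance names below are placeholders:
//
//	tcc, err := NewTaskCfgCache(ctx, repos, "my-project", "my-bt-instance", ts)
//	if err != nil {
//		return err
//	}
//	defer tcc.Close()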

// GetTasksCfgFromBigTable retrieves a CachedValue from BigTable. It returns
// ErrNoSuchEntry if there is no entry for the given row key.
func GetTasksCfgFromBigTable(ctx context.Context, table *bigtable.Table, rowKey string) (*CachedValue, error) {
	var rv *CachedValue
	var rvErr error
	if err := table.ReadRows(ctx, bigtable.PrefixRange(rowKey), func(row bigtable.Row) bool {
		for _, ri := range row[BT_COLUMN_FAMILY] {
			if ri.Column == BT_COLUMN_FULL {
				var cv CachedValue
				rvErr = gob.NewDecoder(bytes.NewReader(ri.Value)).Decode(&cv)
				if rvErr == nil {
					rv = &cv
				}
				// Stop reading rows once we've found the entry.
				return false
			}
		}
		return true
	}); err != nil {
		return nil, err
	}
	if rvErr != nil {
		return nil, rvErr
	}
	if rv == nil {
		return nil, ErrNoSuchEntry
	}
	return rv, nil
}

// WriteTasksCfgToBigTable writes the given CachedValue to BigTable.
func WriteTasksCfgToBigTable(ctx context.Context, table *bigtable.Table, key string, cv *CachedValue) error {
	rowKey := cv.RepoState.RowKey()
	if rowKey != key {
		return fmt.Errorf("Key doesn't match RepoState.RowKey(): %s vs %s", key, rowKey)
	}
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(cv); err != nil {
		return err
	}
	mut := bigtable.NewMutation()
	mut.Set(BT_COLUMN_FAMILY, BT_COLUMN, bigtable.ServerTime, buf.Bytes())
	return table.Apply(ctx, rowKey, mut)
}
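
// Round-trip sketch: the row key is always RepoState.RowKey(), and the value
// is a single gob-encoded CachedValue in the CFGS:CFG column. The RepoState
// below is hypothetical.
//
//	rs := types.RepoState{Repo: "https://example.googlesource.com/repo.git", Revision: "abc123"}
//	cv := &CachedValue{RepoState: rs, Cfg: cfg}
//	if err := WriteTasksCfgToBigTable(ctx, table, rs.RowKey(), cv); err != nil {
//		return err
//	}
//	got, err := GetTasksCfgFromBigTable(ctx, table, rs.RowKey()) // err == ErrNoSuchEntry if absent.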

// Close implements TaskCfgCache.
func (c *TaskCfgCacheImpl) Close() error {
	return c.client.Close()
}

// Get implements TaskCfgCache.
func (c *TaskCfgCacheImpl) Get(ctx context.Context, rs types.RepoState) (*specs.TasksCfg, error, error) {
	ctx, span := trace.StartSpan(ctx, "taskcfgcache_Get", trace.WithSampler(trace.ProbabilitySampler(0.01)))
	defer span.End()
	val, err := c.cache.Get(ctx, rs.RowKey())
	if err != nil {
		return nil, nil, err
	}
	cv := val.(*CachedValue)
	if cv.Err != "" {
		return nil, errors.New(cv.Err), nil
	}
	return cv.Cfg, nil, nil
}

// Set implements TaskCfgCache.
func (c *TaskCfgCacheImpl) Set(ctx context.Context, rs types.RepoState, cfg *specs.TasksCfg, storedErr error) error {
	errString := ""
	if storedErr != nil {
		errString = storedErr.Error()
	}
	return c.cache.Set(ctx, rs.RowKey(), atomic_miss_cache.Value(&CachedValue{
		RepoState: rs,
		Cfg:       cfg,
		Err:       errString,
	}))
}

// SetIfUnset implements TaskCfgCache.
func (c *TaskCfgCacheImpl) SetIfUnset(ctx context.Context, rs types.RepoState, fn func(context.Context) (*CachedValue, error)) (*CachedValue, error) {
	cv, err := c.cache.SetIfUnset(ctx, rs.RowKey(), func(ctx context.Context) (atomic_miss_cache.Value, error) {
		val, err := fn(ctx)
		if err != nil {
			// Return an untyped nil so a failed fn doesn't hand back a
			// non-nil interface wrapping a nil *CachedValue.
			return nil, err
		}
		return val, nil
	})
	if err != nil {
		return nil, err
	}
	return cv.(*CachedValue), nil
}
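
// Usage sketch: compute-and-store only when neither the cache nor BigTable has
// an entry for the RepoState. readTasksCfg below is a hypothetical helper that
// returns a permanent error for an unparseable cfg.
//
//	cv, err := tcc.SetIfUnset(ctx, rs, func(ctx context.Context) (*CachedValue, error) {
//		cfg, permanentErr := readTasksCfg(ctx, rs)
//		if permanentErr != nil {
//			return &CachedValue{RepoState: rs, Err: permanentErr.Error()}, nil
//		}
//		return &CachedValue{RepoState: rs, Cfg: cfg}, nil
//	})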

// getTaskSpecsForRepoStates returns a set of TaskSpecs for each of the given
// set of RepoStates, keyed by RepoState and TaskSpec name. If any of the
// RepoStates do not have a corresponding entry in the cache, they are simply
// left out.
func (c *TaskCfgCacheImpl) getTaskSpecsForRepoStates(ctx context.Context, rs []types.RepoState) (map[types.RepoState]map[string]*specs.TaskSpec, error) {
	rv := make(map[types.RepoState]map[string]*specs.TaskSpec, len(rs))
	for _, s := range rs {
		cached, err := c.cache.Get(ctx, s.RowKey())
		if err == ErrNoSuchEntry {
			sklog.Errorf("Entry not found in cache: %+v", s)
			continue
		} else if err != nil {
			return nil, err
		}
		val := cached.(*CachedValue)
		if val.Err != "" {
			sklog.Errorf("Cached entry has permanent error; skipping: %s", val.Err)
			continue
		}
		// Copy the TaskSpecs so that callers can't modify the cached values.
		subMap := make(map[string]*specs.TaskSpec, len(val.Cfg.Tasks))
		for name, taskSpec := range val.Cfg.Tasks {
			subMap[name] = taskSpec.Copy()
		}
		rv[s] = subMap
	}
	return rv, nil
}

// GetTaskSpec returns the TaskSpec at the given RepoState, or an error if no
// such TaskSpec exists.
func GetTaskSpec(ctx context.Context, c TaskCfgCache, rs types.RepoState, name string) (*specs.TaskSpec, error) {
	cfg, cachedErr, err := c.Get(ctx, rs)
	if err != nil {
		return nil, err
	}
	if cachedErr != nil {
		return nil, cachedErr
	}
	t, ok := cfg.Tasks[name]
	if !ok {
		return nil, fmt.Errorf("No such task spec: %s @ %s", name, rs)
	}
	return t.Copy(), nil
}

// MakeJob is a helper function which retrieves the given JobSpec at the given
// RepoState and uses it to create a Job instance.
func MakeJob(ctx context.Context, c TaskCfgCache, rs types.RepoState, name string) (*types.Job, error) {
	cfg, cachedErr, err := c.Get(ctx, rs)
	if err != nil {
		return nil, err
	}
	if cachedErr != nil {
		return nil, cachedErr
	}
	spec, ok := cfg.Jobs[name]
	if !ok {
		return nil, fmt.Errorf("No such job: %s", name)
	}
	deps, err := spec.GetTaskSpecDAG(cfg)
	if err != nil {
		return nil, err
	}
	return &types.Job{
		Created:      now.Now(ctx),
		Dependencies: deps,
		Name:         name,
		Priority:     spec.Priority,
		RepoState:    rs,
		Tasks:        map[string][]*types.TaskSummary{},
	}, nil
}
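
// Usage sketch (the job name below is hypothetical):
//
//	job, err := MakeJob(ctx, tcc, rs, "Housekeeper-Nightly")
//	if err != nil {
//		return err
//	}
//	// job.Dependencies now holds the task DAG for the job, and job.Created
//	// is taken from the context clock via now.Now(ctx).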

// Cleanup removes cache entries which are outside of our scheduling window.
func (c *TaskCfgCacheImpl) Cleanup(ctx context.Context, period time.Duration) error {
	periodStart := now.Now(ctx).Add(-period)
	if err := c.cache.Cleanup(ctx, func(ctx context.Context, key string, val atomic_miss_cache.Value) bool {
		cv := val.(*CachedValue)
		details, err := cv.RepoState.GetCommit(c.repos)
		// Delete entries whose commit can't be found or whose commit is older
		// than the scheduling window.
		return err != nil || details.Timestamp.Before(periodStart)
	}); err != nil {
		return err
	}
	return nil
}
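
// A minimal sketch of periodic invocation; the tick interval and window
// duration below are arbitrary placeholders:
//
//	go func() {
//		for range time.Tick(time.Hour) {
//			if err := tcc.Cleanup(ctx, 4*24*time.Hour); err != nil {
//				sklog.Errorf("Failed to cleanup TaskCfgCache: %s", err)
//			}
//		}
//	}()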