| package task_cfg_cache |
| |
| import ( |
| "bytes" |
| "context" |
| "encoding/gob" |
| "errors" |
| "fmt" |
| "time" |
| |
| "cloud.google.com/go/bigtable" |
| "go.opencensus.io/trace" |
| "go.skia.org/infra/go/atomic_miss_cache" |
| "go.skia.org/infra/go/git/repograph" |
| "go.skia.org/infra/go/now" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/task_scheduler/go/specs" |
| "go.skia.org/infra/task_scheduler/go/types" |
| "golang.org/x/oauth2" |
| "google.golang.org/api/option" |
| ) |
| |
const (
	// BigTable configuration.

	// BT_TABLE names the one BigTable table used by this package. Its cells
	// hold gob-encoded data describing TaskSpecs and JobSpecs.
	BT_TABLE = "tasks-cfg"

	// BT_COLUMN_FAMILY is the only column family used in BT_TABLE.
	BT_COLUMN_FAMILY = "CFGS"

	// BT_COLUMN is the only column used in BT_COLUMN_FAMILY; the gob-encoded
	// payload is stored under it.
	BT_COLUMN = "CFG"

	// Time limits for BigTable operations. Judging by the names, the first is
	// meant for inserts and the second for queries.
	INSERT_TIMEOUT = 30 * time.Second
	QUERY_TIMEOUT  = 5 * time.Second
)
| |
| var ( |
| // Fully-qualified BigTable column name. |
| BT_COLUMN_FULL = fmt.Sprintf("%s:%s", BT_COLUMN_FAMILY, BT_COLUMN) |
| |
| ErrNoSuchEntry = atomic_miss_cache.ErrNoSuchEntry |
| ) |
| |
| type TaskCfgCache interface { |
| // Cleanup removes cache entries which are outside of our scheduling window. |
| Cleanup(ctx context.Context, period time.Duration) error |
| |
| // Close frees up resources used by the TaskCfgCache. |
| Close() error |
| |
| // Get returns the TasksCfg (or error) for the given RepoState in the cache. |
| // If the given entry does not exist in the cache, reads through to the |
| // backing store to find it there and adds to the cache if it exists. If no |
| // entry exists for the given RepoState, returns ErrNoSuchEntry. If there is |
| // a cached (ie. permanent non-recoverable) error for this RepoState, it is |
| // returned as the second return value. |
| Get(context.Context, types.RepoState) (*specs.TasksCfg, error, error) |
| |
| // Sets the TasksCfg (or error) for the given RepoState in the cache. |
| Set(ctx context.Context, rs types.RepoState, cfg *specs.TasksCfg, storedErr error) error |
| |
| // Sets the TasksCfg (or error) for the given RepoState in the cache by |
| // calling the given function if no value already exists. Returns the |
| // existing or new CachedValue, or any error which occurred. |
| SetIfUnset(ctx context.Context, rs types.RepoState, fn func(context.Context) (*CachedValue, error)) (*CachedValue, error) |
| } |
| |
| // TaskCfgCacheImpl is a struct used for caching tasks cfg files. The user should |
| // periodically call Cleanup() to remove old entries. |
| type TaskCfgCacheImpl struct { |
| cache *atomic_miss_cache.AtomicMissCache |
| client *bigtable.Client |
| repos repograph.Map |
| } |
| |
| // backingCache implements persistent storage of TasksCfgs in BigTable. |
| type backingCache struct { |
| table *bigtable.Table |
| tcc *TaskCfgCacheImpl |
| } |
| |
| // CachedValue represents a cached TasksCfg value. It includes any permanent |
| // error, which cannot be recovered via retries. |
| type CachedValue struct { |
| // RepoState is the RepoState which is associated with this TasksCfg. |
| RepoState types.RepoState |
| // Cfg is the TasksCfg for this entry. |
| Cfg *specs.TasksCfg |
| // Err stores a permanent error. Mutually-exclusive with Cfg. |
| Err string |
| } |
| |
| // See documentation for atomic_miss_cache.ICache interface. |
| func (c *backingCache) Get(ctx context.Context, key string) (atomic_miss_cache.Value, error) { |
| return GetTasksCfgFromBigTable(ctx, c.table, key) |
| } |
| |
| // See documentation for atomic_miss_cache.ICache interface. |
| func (c *backingCache) Set(ctx context.Context, key string, val atomic_miss_cache.Value) error { |
| cv := val.(*CachedValue) |
| if !cv.RepoState.Valid() { |
| return fmt.Errorf("Invalid RepoState: %+v", cv.RepoState) |
| } |
| return WriteTasksCfgToBigTable(ctx, c.table, key, cv) |
| } |
| |
| // See documentation for atomic_miss_cache.ICache interface. |
| func (c *backingCache) Delete(ctx context.Context, key string) error { |
| // We don't delete from BigTable. |
| return nil |
| } |
| |
| // NewTaskCfgCache returns a TaskCfgCache instance. |
| func NewTaskCfgCache(ctx context.Context, repos repograph.Map, btProject, btInstance string, ts oauth2.TokenSource) (*TaskCfgCacheImpl, error) { |
| client, err := bigtable.NewClient(ctx, btProject, btInstance, option.WithTokenSource(ts)) |
| if err != nil { |
| return nil, fmt.Errorf("Failed to create BigTable client: %s", err) |
| } |
| table := client.Open(BT_TABLE) |
| c := &TaskCfgCacheImpl{ |
| client: client, |
| repos: repos, |
| } |
| c.cache = atomic_miss_cache.New(&backingCache{ |
| table: table, |
| tcc: c, |
| }) |
| return c, nil |
| } |
| |
| // GetTasksCfgFromBigTable retrieves a CachedValue from BigTable. |
| func GetTasksCfgFromBigTable(ctx context.Context, table *bigtable.Table, rowKey string) (*CachedValue, error) { |
| var rv *CachedValue |
| var rvErr error |
| if err := table.ReadRows(ctx, bigtable.PrefixRange(rowKey), func(row bigtable.Row) bool { |
| for _, ri := range row[BT_COLUMN_FAMILY] { |
| if ri.Column == BT_COLUMN_FULL { |
| var cv CachedValue |
| rvErr = gob.NewDecoder(bytes.NewReader(ri.Value)).Decode(&cv) |
| if rvErr == nil { |
| rv = &cv |
| } |
| return false |
| } |
| } |
| return true |
| }); err != nil { |
| return nil, err |
| } |
| if rvErr != nil { |
| return nil, rvErr |
| } |
| if rv == nil { |
| return nil, ErrNoSuchEntry |
| } |
| return rv, nil |
| } |
| |
| // WriteTasksCfgToBigTable writes the given CachedValue to BigTable. |
| func WriteTasksCfgToBigTable(ctx context.Context, table *bigtable.Table, key string, cv *CachedValue) error { |
| rowKey := cv.RepoState.RowKey() |
| if rowKey != key { |
| return fmt.Errorf("Key doesn't match RepoState.RowKey(): %s vs %s", key, rowKey) |
| } |
| var buf bytes.Buffer |
| if err := gob.NewEncoder(&buf).Encode(cv); err != nil { |
| return err |
| } |
| mut := bigtable.NewMutation() |
| mut.Set(BT_COLUMN_FAMILY, BT_COLUMN, bigtable.ServerTime, buf.Bytes()) |
| return table.Apply(ctx, rowKey, mut) |
| } |
| |
| // Close implements TaskCfgCache. |
| func (c *TaskCfgCacheImpl) Close() error { |
| return c.client.Close() |
| } |
| |
| // Get implements TaskCfgCache. |
| func (c *TaskCfgCacheImpl) Get(ctx context.Context, rs types.RepoState) (*specs.TasksCfg, error, error) { |
| ctx, span := trace.StartSpan(ctx, "taskcfgcache_Get", trace.WithSampler(trace.ProbabilitySampler(0.01))) |
| defer span.End() |
| val, err := c.cache.Get(ctx, rs.RowKey()) |
| if err != nil { |
| return nil, nil, err |
| } |
| cv := val.(*CachedValue) |
| if cv.Err != "" { |
| return nil, errors.New(cv.Err), nil |
| } |
| return cv.Cfg, nil, nil |
| } |
| |
| // Sets the TasksCfg (or error) for the given RepoState in the cache. |
| func (c *TaskCfgCacheImpl) Set(ctx context.Context, rs types.RepoState, cfg *specs.TasksCfg, storedErr error) error { |
| errString := "" |
| if storedErr != nil { |
| errString = storedErr.Error() |
| } |
| return c.cache.Set(ctx, rs.RowKey(), atomic_miss_cache.Value(&CachedValue{ |
| RepoState: rs, |
| Cfg: cfg, |
| Err: errString, |
| })) |
| } |
| |
| // Sets the TasksCfg (or error) for the given RepoState in the cache by calling |
| // the given function if no value already exists. Returns the existing or new |
| // CachedValue, or any error which occurred. |
| func (c *TaskCfgCacheImpl) SetIfUnset(ctx context.Context, rs types.RepoState, fn func(context.Context) (*CachedValue, error)) (*CachedValue, error) { |
| cv, err := c.cache.SetIfUnset(ctx, rs.RowKey(), func(ctx context.Context) (atomic_miss_cache.Value, error) { |
| val, err := fn(ctx) |
| return val, err |
| }) |
| if err != nil { |
| return nil, err |
| } |
| return cv.(*CachedValue), nil |
| } |
| |
| // getTaskSpecsForRepoStates returns a set of TaskSpecs for each of the given |
| // set of RepoStates, keyed by RepoState and TaskSpec name. If any of the |
| // RepoStates do not have a corresponding entry in the cache, they are simply |
| // left out. |
| func (c *TaskCfgCacheImpl) getTaskSpecsForRepoStates(ctx context.Context, rs []types.RepoState) (map[types.RepoState]map[string]*specs.TaskSpec, error) { |
| rv := make(map[types.RepoState]map[string]*specs.TaskSpec, len(rs)) |
| for _, s := range rs { |
| cached, err := c.cache.Get(ctx, s.RowKey()) |
| if err == ErrNoSuchEntry { |
| sklog.Errorf("Entry not found in cache: %+v", s) |
| continue |
| } else if err != nil { |
| return nil, err |
| } |
| val := cached.(*CachedValue) |
| if val.Err != "" { |
| sklog.Errorf("Cached entry has permanent error; skipping: %s", val.Err) |
| continue |
| } |
| subMap := make(map[string]*specs.TaskSpec, len(val.Cfg.Tasks)) |
| for name, taskSpec := range val.Cfg.Tasks { |
| subMap[name] = taskSpec.Copy() |
| } |
| rv[s] = subMap |
| } |
| return rv, nil |
| } |
| |
| // GetTaskSpec returns the TaskSpec at the given RepoState, or an error if no |
| // such TaskSpec exists. |
| func GetTaskSpec(ctx context.Context, c TaskCfgCache, rs types.RepoState, name string) (*specs.TaskSpec, error) { |
| cfg, cachedErr, err := c.Get(ctx, rs) |
| if err != nil { |
| return nil, err |
| } |
| if cachedErr != nil { |
| return nil, cachedErr |
| } |
| t, ok := cfg.Tasks[name] |
| if !ok { |
| return nil, fmt.Errorf("No such task spec: %s @ %s", name, rs) |
| } |
| return t.Copy(), nil |
| } |
| |
| // MakeJob is a helper function which retrieves the given JobSpec at the given |
| // RepoState and uses it to create a Job instance. |
| func MakeJob(ctx context.Context, c TaskCfgCache, rs types.RepoState, name string) (*types.Job, error) { |
| cfg, cachedErr, err := c.Get(ctx, rs) |
| if err != nil { |
| return nil, err |
| } |
| if cachedErr != nil { |
| return nil, cachedErr |
| } |
| spec, ok := cfg.Jobs[name] |
| if !ok { |
| return nil, fmt.Errorf("No such job: %s", name) |
| } |
| deps, err := spec.GetTaskSpecDAG(cfg) |
| if err != nil { |
| return nil, err |
| } |
| |
| return &types.Job{ |
| Created: now.Now(ctx), |
| Dependencies: deps, |
| Name: name, |
| Priority: spec.Priority, |
| RepoState: rs, |
| Tasks: map[string][]*types.TaskSummary{}, |
| }, nil |
| } |
| |
| // Cleanup removes cache entries which are outside of our scheduling window. |
| func (c *TaskCfgCacheImpl) Cleanup(ctx context.Context, period time.Duration) error { |
| periodStart := now.Now(ctx).Add(-period) |
| if err := c.cache.Cleanup(ctx, func(ctx context.Context, key string, val atomic_miss_cache.Value) bool { |
| cv := val.(*CachedValue) |
| details, err := cv.RepoState.GetCommit(c.repos) |
| return err != nil || details.Timestamp.Before(periodStart) |
| }); err != nil { |
| return err |
| } |
| return nil |
| } |