blob: 83fdc4462961c9a3d7e5c8694d73e9ccf6f4c21c [file] [log] [blame]
package stages
import (
"bytes"
"context"
filesystem "io/fs"
"net/http"
"path"
"strings"
"sync"
"go.skia.org/infra/go/docker"
"go.skia.org/infra/go/git"
"go.skia.org/infra/go/gitiles"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/util"
"go.skia.org/infra/go/vfs"
"go.skia.org/infra/k8s-checker/go/k8s_config"
v1 "k8s.io/api/core/v1"
)
const (
// StageFilePath is the path of the JSON stage file within the filesystem
// used by StageManager. It is both read from and written back to this
// location.
StageFilePath = "pipeline/stages.json"
// GitTagPrefix is a prefix applied to all Docker image tags which
// correspond to Git commit hashes, ie. tags of the form "git-<hash>".
GitTagPrefix = "git-"
// PodAnnotationKey is the key of the annotation which is attached to
// PodTemplateSpec to map container names to stages. Its value is a
// comma-separated list of "name:stage" pairs, as parsed by
// GetStagesFromAnnotations.
PodAnnotationKey = "skia.org/stage"
)
var (
// configDirs lists the directories which are walked by updateImages in
// search of .yaml and .yml config files to bring in line with the stage
// file.
// TODO(borenet): Can we avoid hard-coding these directories?
configDirs = []string{"skia-corp", "skia-public", "skia-infra-corp", "skia-infra-public", "skia-infra-public-dev", "templates"}
)
// CommitResolver is a function used to resolve a short Git commit hash or ref
// to a full commit hash. repoURL identifies the Git repository to query and
// reference is the hash or ref to resolve. The resolved full hash is returned,
// or an error if the reference could not be resolved.
type CommitResolver func(ctx context.Context, repoURL, reference string) (string, error)
// GitilesCommitResolver builds a CommitResolver backed by gitiles. Each call
// to the returned function creates a gitiles repo handle for the requested
// repoURL, using the provided http.Client, and asks it to resolve the
// reference.
func GitilesCommitResolver(httpClient *http.Client) CommitResolver {
	resolve := func(ctx context.Context, repoURL, reference string) (string, error) {
		repo := gitiles.NewRepo(repoURL, httpClient)
		return repo.ResolveRef(ctx, reference)
	}
	return resolve
}
// StageManager provides utilities for working with release stage files.
type StageManager struct {
// fs is the filesystem from which the stage file and config files are read
// and to which updated contents are written.
fs vfs.FS
// docker is used to look up image manifests and instances when setting a
// stage.
docker docker.Client
// commitResolver resolves Git references to full commit hashes.
commitResolver CommitResolver
}
// NewStageManager returns a StageManager instance which reads and writes files
// via the given vfs.FS, queries Docker image metadata via the given
// docker.Client, and resolves Git references using the given CommitResolver.
// The context is currently unused.
func NewStageManager(ctx context.Context, fs vfs.FS, dockerClient docker.Client, commitResolver CommitResolver) *StageManager {
	sm := new(StageManager)
	sm.fs = fs
	sm.docker = dockerClient
	sm.commitResolver = commitResolver
	return sm
}
// AddImage registers the given image in the stage file. The gitRepo is
// optional and, when set, overrides the stage file's default git repo for this
// image. An error is returned if the image is already present.
func (sm *StageManager) AddImage(ctx context.Context, image, gitRepo string) error {
	add := func(stageFile *StageFile) error {
		// Looking up a key in a nil map is safe, so the duplicate check can
		// come before the map is created.
		if _, exists := stageFile.Images[image]; exists {
			return skerr.Fmt("image %q already exists", image)
		}
		if stageFile.Images == nil {
			stageFile.Images = make(map[string]*Image)
		}
		entry := &Image{GitRepo: gitRepo}
		stageFile.Images[image] = entry
		return nil
	}
	return sm.updateImages(ctx, add)
}
// RemoveImage deletes the given image, along with all of its stages, from the
// stage file. An error is returned if the image is not present.
func (sm *StageManager) RemoveImage(ctx context.Context, image string) error {
	remove := func(stageFile *StageFile) error {
		_, exists := stageFile.Images[image]
		if exists {
			delete(stageFile.Images, image)
			return nil
		}
		return skerr.Fmt("image %q does not exist", image)
	}
	return sm.updateImages(ctx, remove)
}
// SetStage points the given stage of the given image at the image version
// identified by reference, which may be a Docker image digest, tag, or Git
// commit hash.
//
// A reference which looks like a commit hash (with or without the "git-"
// prefix) is first resolved to a full hash via the CommitResolver; on success
// the "git-<full hash>" tag is used to look up the manifest. Otherwise the
// reference is handed to Docker unchanged and the commit hash is derived from
// a "git-<full hash>" tag on the matching image instance. An error is returned
// if the image is not in the stage file, if the manifest or instance cannot be
// found, or if no commit hash can be determined.
func (sm *StageManager) SetStage(ctx context.Context, image, stage, reference string) error {
	registry, repository, _, err := docker.SplitImage(image)
	if err != nil {
		return skerr.Wrap(err)
	}

	set := func(stageFile *StageFile) error {
		entry, known := stageFile.Images[image]
		if !known {
			return skerr.Fmt("unknown image %q", image)
		}

		// A reference shaped like a commit hash is checked against the git
		// repository, which also expands it to the full hash.
		commit := ""
		if candidate := strings.TrimPrefix(reference, GitTagPrefix); git.IsCommitHash(candidate) {
			repoURL := stageFile.DefaultGitRepo
			if entry.GitRepo != "" {
				repoURL = entry.GitRepo
			}
			// A failed resolution is deliberately ignored: something which
			// merely looks like a commit hash may still be a valid Docker
			// image tag.
			if resolved, resolveErr := sm.commitResolver(ctx, repoURL, candidate); resolveErr == nil {
				commit = resolved
				reference = GitTagPrefix + resolved
			}
		}

		// Look up the digest of the referenced image.
		manifest, err := sm.docker.GetManifest(ctx, registry, repository, reference)
		if err != nil {
			return skerr.Wrap(err)
		}
		digest := manifest.Digest

		// Without a resolved commit, fall back to the first "git-<full hash>"
		// tag found on the image instance.
		if commit == "" {
			instances, err := sm.docker.ListInstances(ctx, registry, repository)
			if err != nil {
				return skerr.Wrap(err)
			}
			instance, found := instances[digest]
			if !found {
				return skerr.Fmt("failed to find instance %q of %s", digest, image)
			}
			for _, tag := range instance.Tags {
				if !strings.HasPrefix(tag, GitTagPrefix) {
					continue
				}
				if hash := strings.TrimPrefix(tag, GitTagPrefix); git.IsFullCommitHash(hash) {
					commit = hash
					break
				}
			}
		}
		if commit == "" {
			return skerr.Fmt("failed to find \"git-\" tag on instance %q of %s", digest, image)
		}

		// Record the new version for the stage.
		if entry.Stages == nil {
			entry.Stages = map[string]*Stage{}
		}
		entry.Stages[stage] = &Stage{GitHash: commit, Digest: digest}
		return nil
	}
	return sm.updateImages(ctx, set)
}
// PromoteStage makes stageToUpdate refer to the same image version (Git hash
// and digest) as stageToMatch for the given image. An error is returned if
// either the image or stageToMatch is missing from the stage file.
func (sm *StageManager) PromoteStage(ctx context.Context, image, stageToMatch, stageToUpdate string) error {
	promote := func(stageFile *StageFile) error {
		entry, found := stageFile.Images[image]
		if !found {
			return skerr.Fmt("image %q does not exist in %s", image, StageFilePath)
		}
		source, found := entry.Stages[stageToMatch]
		if !found {
			return skerr.Fmt("stage %q does not exist for image %s in %s", stageToMatch, image, StageFilePath)
		}
		// Store a copy so that the two stages do not share one Stage value.
		promoted := new(Stage)
		promoted.GitHash = source.GitHash
		promoted.Digest = source.Digest
		entry.Stages[stageToUpdate] = promoted
		return nil
	}
	return sm.updateImages(ctx, promote)
}
// RemoveStage deletes the given stage of the given image from the stage file.
// An error is returned if either the image or the stage does not exist.
func (sm *StageManager) RemoveStage(ctx context.Context, image, stage string) error {
	remove := func(stageFile *StageFile) error {
		entry, found := stageFile.Images[image]
		if !found {
			return skerr.Fmt("image %q does not exist in %s", image, StageFilePath)
		}
		_, found = entry.Stages[stage]
		if !found {
			return skerr.Fmt("stage %q does not exist for image %s in %s", stage, image, StageFilePath)
		}
		delete(entry.Stages, stage)
		return nil
	}
	return sm.updateImages(ctx, remove)
}
// Apply brings all config files in line with the stage file without modifying
// the stages themselves; it runs updateImages with a no-op callback.
func (sm *StageManager) Apply(ctx context.Context) error {
	noop := func(*StageFile) error { return nil }
	return sm.updateImages(ctx, noop)
}
// ReadStageFile reads and returns the stage file.
//
// The file at StageFilePath is read from the StageManager's filesystem and
// decoded. This is a pure read: nothing is written back and the config
// directories are not walked, so a malformed config file elsewhere cannot
// cause it to fail. Use Apply to bring the config files in line with the stage
// file.
//
// An error is returned if the stage file cannot be read or decoded.
func (sm *StageManager) ReadStageFile(ctx context.Context) (*StageFile, error) {
	contents, err := vfs.ReadFile(ctx, sm.fs, StageFilePath)
	if err != nil {
		return nil, skerr.Wrap(err)
	}
	sf, err := Decode(contents)
	if err != nil {
		return nil, skerr.Wrap(err)
	}
	return sf, nil
}
// updateImages generates updates to the stage file and configs in the config
// repo. The passed-in function may manipulate the StageFile as desired, and
// updateImages will handle updates to the other config files.
//
// The steps are:
//  1. Read and decode the stage file, then run fn on it exactly once.
//  2. Re-encode the stage file and queue it for writing if its contents
//     changed.
//  3. Walk every directory in configDirs and, for each .yaml or .yml file,
//     rewrite container images to match the stage file. Files are processed
//     concurrently; each changed file is queued for writing.
//  4. Write all queued files.
//
// Nothing is written unless every step before the write phase succeeds. The
// writes themselves are not atomic: a failure part-way through leaves the
// earlier writes in place.
func (sm *StageManager) updateImages(ctx context.Context, fn func(*StageFile) error) error {
	oldContents, err := vfs.ReadFile(ctx, sm.fs, StageFilePath)
	if err != nil {
		return skerr.Wrap(err)
	}
	sf, err := Decode(oldContents)
	if err != nil {
		return skerr.Wrap(err)
	}
	if err := fn(sf); err != nil {
		return skerr.Wrap(err)
	}
	newContents, err := sf.Encode()
	if err != nil {
		return skerr.Wrap(err)
	}

	// changes maps file path to new contents; it is guarded by changesMtx
	// because the per-file goroutines below add to it concurrently.
	var changesMtx sync.Mutex
	changes := map[string][]byte{}
	if !bytes.Equal(oldContents, newContents) {
		changes[StageFilePath] = newContents
	}

	eg := util.NewNamedErrGroup()
	var walkErr error
	for _, configDir := range configDirs {
		walkErr = vfs.Walk(ctx, sm.fs, configDir, func(fp string, info filesystem.FileInfo, err error) error {
			if err != nil {
				return skerr.Wrap(err)
			}
			if info.IsDir() {
				return nil
			}
			if ext := path.Ext(fp); ext != ".yaml" && ext != ".yml" {
				return nil
			}
			eg.Go(fp, func() error {
				// Download the file contents. fp is the path which is read
				// here and written below, so it is also the path reported in
				// errors.
				oldContents, err := vfs.ReadFile(ctx, sm.fs, fp)
				if err != nil {
					return skerr.Wrapf(err, "failed to retrieve contents of %s", fp)
				}
				// Parse the k8s configs from the file.
				k8sConfig, byteRanges, err := k8s_config.ParseK8sConfigFile(oldContents)
				if err != nil {
					return skerr.Wrap(err)
				}
				// Update the images of the containers.
				newContents, err := updateK8sConfig(sf, k8sConfig, byteRanges, oldContents)
				if err != nil {
					return skerr.Wrap(err)
				}
				if !bytes.Equal(oldContents, newContents) {
					changesMtx.Lock()
					changes[fp] = newContents
					changesMtx.Unlock()
				}
				return nil
			})
			return nil
		})
		if walkErr != nil {
			break
		}
	}

	// Always wait for the goroutines which were started, even if a walk
	// failed, so that none of them outlives this call.
	waitErr := eg.Wait()
	if walkErr != nil {
		return skerr.Wrap(walkErr)
	}
	if waitErr != nil {
		return skerr.Wrapf(waitErr, "failed to update configs")
	}

	for changedPath, contents := range changes {
		if err := vfs.WriteFile(ctx, sm.fs, changedPath, contents); err != nil {
			return skerr.Wrap(err)
		}
	}
	return nil
}
// updateK8sConfig returns a copy of oldContents in which the container images
// of every CronJob, DaemonSet, Deployment and StatefulSet in k8sConfig have
// been updated to match the stage file. oldContents itself is not modified.
//
// byteRanges maps each parsed object to the span of the file it occupies; an
// error is returned if an object has no entry. The offsets are taken to be
// relative to oldContents, and the spans of distinct objects are assumed not
// to overlap.
func updateK8sConfig(sf *StageFile, k8sConfig *k8s_config.K8sConfigFile, byteRanges map[interface{}]*k8s_config.ByteRange, oldContents []byte) ([]byte, error) {
	newContents := make([]byte, len(oldContents))
	copy(newContents, oldContents)

	// appliedEdit records, for an object which has already been rewritten,
	// where its span started in oldContents and by how many bytes the rewrite
	// changed the span's length. Later edits use these to translate their
	// original offsets into offsets within the current newContents.
	type appliedEdit struct {
		origStart int
		delta     int
	}
	var applied []appliedEdit

	update := func(obj interface{}, name string, podSpec *v1.PodTemplateSpec) error {
		byteRange, ok := byteRanges[obj]
		if !ok {
			return skerr.Fmt("no byte range found!")
		}
		origStart, origEnd := int(byteRange.Start), int(byteRange.End)

		// Objects are visited by kind rather than by position in the file, so
		// only edits which landed before this span shift it.
		shift := 0
		for _, e := range applied {
			if e.origStart < origStart {
				shift += e.delta
			}
		}
		start, end := origStart+shift, origEnd+shift

		newSlice, err := updatePodSpec(sf, podSpec, newContents[start:end])
		if err != nil {
			return skerr.Wrapf(err, "updating containers of %s", name)
		}

		// Splice into a fresh buffer. Appending to newContents[:start] would
		// write into the shared backing array and, when newSlice is longer
		// than the span it replaces, overwrite the beginning of the trailing
		// contents before they are copied.
		spliced := make([]byte, 0, len(newContents)-(end-start)+len(newSlice))
		spliced = append(spliced, newContents[:start]...)
		spliced = append(spliced, newSlice...)
		spliced = append(spliced, newContents[end:]...)
		newContents = spliced

		applied = append(applied, appliedEdit{
			origStart: origStart,
			delta:     len(newSlice) - (end - start),
		})
		return nil
	}

	for _, cronJob := range k8sConfig.CronJob {
		if err := update(cronJob, cronJob.Name, &cronJob.Spec.JobTemplate.Spec.Template); err != nil {
			return nil, skerr.Wrap(err)
		}
	}
	for _, daemonSet := range k8sConfig.DaemonSet {
		if err := update(daemonSet, daemonSet.Name, &daemonSet.Spec.Template); err != nil {
			return nil, skerr.Wrap(err)
		}
	}
	for _, deployment := range k8sConfig.Deployment {
		if err := update(deployment, deployment.Name, &deployment.Spec.Template); err != nil {
			return nil, skerr.Wrap(err)
		}
	}
	for _, statefulSet := range k8sConfig.StatefulSet {
		if err := update(statefulSet, statefulSet.Name, &statefulSet.Spec.Template); err != nil {
			return nil, skerr.Wrap(err)
		}
	}
	return newContents, nil
}
// updatePodSpec rewrites the image references found in contents so that each
// container which is mapped to a stage by the pod's annotations uses the
// digest recorded for that stage in the stage file.
//
// Containers are skipped when they have no stage annotation, when their image
// is not in the stage file, or when the image has no entry for the annotated
// stage. An error is returned if the annotation is malformed or if a
// container's image string appears more than once in contents, since the
// replacement would then be ambiguous. When no stages are annotated, contents
// is returned as-is.
func updatePodSpec(sf *StageFile, podSpec *v1.PodTemplateSpec, contents []byte) ([]byte, error) {
	containerStages, err := GetStagesFromAnnotations(podSpec.Annotations)
	if err != nil {
		return nil, skerr.Wrap(err)
	}
	if len(containerStages) == 0 {
		return contents, nil
	}

	for i := range podSpec.Spec.Containers {
		c := &podSpec.Spec.Containers[i]
		stageName, annotated := containerStages[c.Name]
		if !annotated {
			continue
		}

		// Drop any "@<digest>" suffix to obtain the key used in the stage
		// file.
		repo := c.Image
		if at := strings.Index(repo, "@"); at >= 0 {
			repo = repo[:at]
		}
		img, known := sf.Images[repo]
		if !known {
			continue
		}
		pinned, defined := img.Stages[stageName]
		if !defined {
			continue
		}

		current := []byte(c.Image)
		if bytes.Count(contents, current) > 1 {
			return nil, skerr.Fmt("found more than one instance of %s", c.Image)
		}
		contents = bytes.Replace(contents, current, []byte(repo+"@"+pinned.Digest), 1)
	}
	return contents, nil
}
// GetStagesFromAnnotations parses the PodAnnotationKey annotation into a map
// whose keys and values are the two halves of each "key:stage" pair, with
// surrounding whitespace trimmed. Pairs are separated by commas. An empty,
// non-nil map is returned when the annotation is absent. An error is returned
// if any pair does not contain exactly one colon.
func GetStagesFromAnnotations(annotations map[string]string) (map[string]string, error) {
	stages := map[string]string{}
	raw, present := annotations[PodAnnotationKey]
	if !present {
		return stages, nil
	}
	for _, entry := range strings.Split(raw, ",") {
		// Exactly one colon means the entry splits into exactly two parts.
		if strings.Count(entry, ":") != 1 {
			return nil, skerr.Fmt("invalid value for %q annotation; expected \"image:stage[, image:stage]\"", PodAnnotationKey)
		}
		colon := strings.Index(entry, ":")
		name := strings.TrimSpace(entry[:colon])
		stage := strings.TrimSpace(entry[colon+1:])
		stages[name] = stage
	}
	return stages, nil
}