blob: 0db911d6e159a5cdb151cca97eaac3ba0ce23ef4 [file] [log] [blame]
package rbe
import (
"context"
"sort"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/client"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/command"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/filemetadata"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/uploadinfo"
remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/golang/protobuf/ptypes/wrappers"
"go.opencensus.io/trace"
"golang.org/x/oauth2"
grpc_oauth "google.golang.org/grpc/credentials/oauth"
"go.skia.org/infra/go/cas"
"go.skia.org/infra/go/skerr"
)
const (
// ExcludeGitDir is a regular expression which may be passed to
// Client.Upload to exclude the ".git" directory.
ExcludeGitDir = `^(.*\/)*\.git(\/.*)*$`
// InstanceChromiumSwarm is the RBE instance used for chromium-swarm.
InstanceChromiumSwarm = "projects/chromium-swarm/instances/default_instance"
// InstanceChromiumSwarmDev is the RBE instance used for chromium-swarm-dev.
InstanceChromiumSwarmDev = "projects/chromium-swarm-dev/instances/default_instance"
// InstanceChromeSwarming is the RBE instance used for chrome-swarming.
InstanceChromeSwarming = "projects/chrome-swarming/instances/default_instance"
rbeService = "remotebuildexecution.googleapis.com:443"
)
var (
// EmptyDigest is the digest of an empty entry in RBE-CAS.
EmptyDigest = digest.Empty.String()
)
// StringToDigest splits the digest string into a digest.Digest instance.
func StringToDigest(str string) (string, int64, error) {
digest, err := digest.NewFromString(str)
if err != nil {
return "", 0, skerr.Wrap(err)
}
return digest.Hash, digest.Size, nil
}
// DigestToString creates a string for the digest with the given hash and size.
func DigestToString(hash string, size int64) string {
return digest.Digest{
Hash: hash,
Size: size,
}.String()
}
// Client is a struct used to interact with RBE-CAS.
type Client struct {
client RBEClient
instance string
}
// RBEClient is an abstraction of client.Client which enables mocks for testing.
type RBEClient interface {
Close() error
ComputeMerkleTree(ctx context.Context, execRoot, workingDir, remoteWorkingDir string, is *command.InputSpec, cache filemetadata.Cache) (root digest.Digest, inputs []*uploadinfo.Entry, stats *client.TreeStats, err error)
DownloadDirectory(ctx context.Context, d digest.Digest, execRoot string, cache filemetadata.Cache) (map[string]*client.TreeOutput, *client.MovedBytesMetadata, error)
GetDirectoryTree(ctx context.Context, d *remoteexecution.Digest) (result []*remoteexecution.Directory, err error)
UploadIfMissing(ctx context.Context, data ...*uploadinfo.Entry) ([]digest.Digest, int64, error)
}
// NewClient returns a Client instance.
func NewClient(ctx context.Context, instance string, ts oauth2.TokenSource) (*Client, error) {
client, err := client.NewClient(ctx, instance, client.DialParams{
Service: rbeService,
TransportCredsOnly: true,
}, &client.PerRPCCreds{
Creds: grpc_oauth.TokenSource{
TokenSource: ts,
},
})
if err != nil {
return nil, skerr.Wrap(err)
}
return &Client{
client: client,
instance: instance,
}, nil
}
// Upload the given paths to RBE-CAS.
func (c *Client) Upload(ctx context.Context, root string, paths, excludes []string) (string, error) {
ctx, span := trace.StartSpan(ctx, "cas_rbe_Upload")
span.AddAttributes(trace.StringAttribute("root", root))
span.AddAttributes(trace.Int64Attribute("num_paths", int64(len(paths))))
span.AddAttributes(trace.Int64Attribute("num_excludes", int64(len(excludes))))
defer span.End()
ex := make([]*command.InputExclusion, 0, len(excludes))
for _, regex := range excludes {
ex = append(ex, &command.InputExclusion{
Regex: regex,
Type: command.UnspecifiedInputType,
})
}
is := command.InputSpec{
Inputs: paths,
InputExclusions: ex,
}
rootDigest, entries, _, err := c.client.ComputeMerkleTree(ctx, root, "" /* =workingDir */, "" /* =remoteWorkingDir */, &is, filemetadata.NewNoopCache())
if err != nil {
return "", skerr.Wrap(err)
}
if _, _, err := c.client.UploadIfMissing(ctx, entries...); err != nil {
return "", skerr.Wrap(err)
}
return rootDigest.String(), nil
}
// Download the given digests from RBE-CAS.
func (c *Client) Download(ctx context.Context, root, casDigest string) error {
ctx, span := trace.StartSpan(ctx, "cas_rbe_Download")
defer span.End()
d, err := digest.NewFromString(casDigest)
if err != nil {
return skerr.Wrap(err)
}
if _, _, err := c.client.DownloadDirectory(ctx, d, root, filemetadata.NewNoopCache()); err != nil {
return skerr.Wrap(err)
}
return nil
}
// checkNodePropertiesIdentical returns an error if the two NodeProperties are
// not identical.
func checkNodePropertiesIdentical(a, b *remoteexecution.NodeProperties) error {
if (a != nil) != (b != nil) {
return skerr.Fmt("One NodeProperties is set while the other is not")
}
if a != nil {
if (a.Mtime != nil) != (b.Mtime != nil) {
return skerr.Fmt("One file has Mtime while the other does not")
}
if a.Mtime != nil {
if !a.Mtime.AsTime().Equal(b.Mtime.AsTime()) {
return skerr.Fmt("Mtime %q != %q", a.Mtime, b.Mtime)
}
}
if (a.UnixMode != nil) != (b.UnixMode != nil) {
return skerr.Fmt("One NodeProperties has UnixMode while the other does not")
}
if a.UnixMode != nil {
if a.UnixMode.Value != b.UnixMode.Value {
return skerr.Fmt("UnixMode %q != %q", a.UnixMode.Value, b.UnixMode.Value)
}
}
if len(a.Properties) != len(b.Properties) {
return skerr.Fmt("Properties differ in length; %d vs %d", len(a.Properties), len(b.Properties))
}
propsA := make([]string, 0, len(a.Properties))
for _, prop := range a.Properties {
propsA = append(propsA, prop.String())
}
sort.Strings(propsA)
propsB := make([]string, 0, len(b.Properties))
for _, prop := range b.Properties {
propsB = append(propsB, prop.String())
}
sort.Strings(propsB)
for idx, propA := range propsA {
propB := propsB[idx]
if propA != propB {
return skerr.Fmt("Properties differ: %s vs %s", propA, propB)
}
}
}
return nil
}
// checkFilesIdentical returns an error if the two FileNodes are not identical.
func checkFilesIdentical(a, b *remoteexecution.FileNode) error {
if a.Name != b.Name {
return skerr.Fmt("Name %q != %q", a.Name, b.Name)
}
if a.IsExecutable != b.IsExecutable {
return skerr.Fmt("Executable %v != %v", a.IsExecutable, b.IsExecutable)
}
if (a.Digest != nil) != (b.Digest != nil) {
return skerr.Fmt("One file has a digest while the other does not")
}
if a.Digest != nil && b.Digest != nil {
if a.Digest.Hash != b.Digest.Hash {
return skerr.Fmt("Digest hash %q != %q", a.Digest.Hash, b.Digest.Hash)
}
if a.Digest.SizeBytes != b.Digest.SizeBytes {
return skerr.Fmt("Size %q != %q", a.Digest.SizeBytes, b.Digest.SizeBytes)
}
}
return checkNodePropertiesIdentical(a.NodeProperties, b.NodeProperties)
}
// checkSymlinksIdentical returns an error if the two SymlinkNodes are not
// identical.
func checkSymlinksIdentical(a, b *remoteexecution.SymlinkNode) error {
if a.Name != b.Name {
return skerr.Fmt("Name %q != %q", a.Name, b.Name)
}
if a.Target != b.Target {
return skerr.Fmt("Target %q != %q", a.Target, b.Target)
}
return checkNodePropertiesIdentical(a.NodeProperties, b.NodeProperties)
}
// directoryNode helps to organize a tree of remoteexecution.Directory.
type directoryNode struct {
*remoteexecution.Directory
Children map[string]*directoryNode
}
// makeTree builds a tree of directoryNodes using the given Directories.
// Requires that the map of Directories is complete, ie. each of the
// subdirectory digests referenced by each of the Directories is itself present
// in the map.
func makeTree(dirsByDigest map[digest.Digest]*remoteexecution.Directory, d digest.Digest) (*directoryNode, error) {
dir, ok := dirsByDigest[d]
if !ok {
return nil, skerr.Fmt("no information provided for digest %s", d.String())
}
rv := &directoryNode{
Directory: dir,
Children: make(map[string]*directoryNode, len(dir.Directories)),
}
for _, childDir := range rv.Directories {
childNode, err := makeTree(dirsByDigest, digest.NewFromProtoUnvalidated(childDir.Digest))
if err != nil {
return nil, err
}
rv.Children[childDir.Name] = childNode
}
return rv, nil
}
// mergeTrees merges the given trees of directoryNodes into a new directoryNode.
// Returns an error if the two trees are incompatible, eg. trees that contain
// files with the same name but different digests.
func (c *Client) mergeTrees(ctx context.Context, a, b *directoryNode) (*directoryNode, error) {
if a == nil {
return b, nil
}
if b == nil {
return a, nil
}
// Files.
filesMap := map[string]*remoteexecution.FileNode{}
for _, file := range a.Files {
filesMap[file.Name] = file
}
for _, file := range b.Files {
exist, ok := filesMap[file.Name]
if ok {
if err := checkFilesIdentical(file, exist); err != nil {
return nil, skerr.Wrapf(err, file.Name)
}
} else {
filesMap[file.Name] = file
}
}
fileNames := make([]string, 0, len(filesMap))
for name := range filesMap {
fileNames = append(fileNames, name)
}
sort.Strings(fileNames)
files := make([]*remoteexecution.FileNode, 0, len(filesMap))
for _, fileName := range fileNames {
files = append(files, filesMap[fileName])
}
// Symlinks.
symlinksMap := map[string]*remoteexecution.SymlinkNode{}
for _, symlink := range a.Symlinks {
symlinksMap[symlink.Name] = symlink
}
for _, symlink := range b.Symlinks {
exist, ok := symlinksMap[symlink.Name]
if ok {
if err := checkSymlinksIdentical(symlink, exist); err != nil {
return nil, skerr.Wrapf(err, symlink.Name)
}
} else {
symlinksMap[symlink.Name] = symlink
}
}
symlinkNames := make([]string, 0, len(symlinksMap))
for name := range symlinksMap {
symlinkNames = append(symlinkNames, name)
}
sort.Strings(symlinkNames)
symlinks := make([]*remoteexecution.SymlinkNode, 0, len(symlinksMap))
for _, symlinkName := range symlinkNames {
symlinks = append(symlinks, symlinksMap[symlinkName])
}
// Directories.
children := map[string]*directoryNode{}
dirsMap := map[string]*remoteexecution.DirectoryNode{}
for _, dir := range a.Directories {
dirsMap[dir.Name] = dir
}
for _, dir := range b.Directories {
if _, ok := dirsMap[dir.Name]; ok {
merged, err := c.mergeTrees(ctx, a.Children[dir.Name], b.Children[dir.Name])
if err != nil {
return nil, skerr.Wrap(err)
}
entry, err := uploadinfo.EntryFromProto(merged.Directory)
if err != nil {
return nil, skerr.Wrap(err)
}
if _, _, err := c.client.UploadIfMissing(ctx, entry); err != nil {
return nil, skerr.Wrap(err)
}
dn := &remoteexecution.DirectoryNode{
Name: dir.Name,
Digest: entry.Digest.ToProto(),
}
dirsMap[dir.Name] = dn
children[dir.Name] = merged
} else {
dirsMap[dir.Name] = dir
children[dir.Name] = b.Children[dir.Name]
}
}
dirNames := make([]string, 0, len(dirsMap))
for name := range dirsMap {
dirNames = append(dirNames, name)
}
sort.Strings(dirNames)
dirs := make([]*remoteexecution.DirectoryNode, 0, len(dirsMap))
for _, dirName := range dirNames {
dirs = append(dirs, dirsMap[dirName])
}
// Properties.
properties := map[string]string{}
var mTime *timestamp.Timestamp
var unixMode *wrappers.UInt32Value
if a.NodeProperties != nil {
for _, prop := range a.NodeProperties.Properties {
properties[prop.Name] = prop.Value
}
if a.NodeProperties.Mtime != nil {
mTime = a.NodeProperties.Mtime
}
if a.NodeProperties.UnixMode != nil {
unixMode = a.NodeProperties.UnixMode
}
}
if b.NodeProperties != nil {
// TODO(borenet): Do we need to make sure that both trees have the same
// set of property keys?
for _, prop := range b.NodeProperties.Properties {
exist, ok := properties[prop.Name]
if ok {
if exist != prop.Value {
return nil, skerr.Fmt("Property %q has conflicting values %q and %q", prop.Name, prop.Value, exist)
}
} else {
properties[prop.Name] = prop.Value
}
}
if b.NodeProperties.Mtime != nil {
if mTime == nil {
mTime = b.NodeProperties.Mtime
} else if mTime.AsTime().Before(b.NodeProperties.Mtime.AsTime()) {
mTime = b.NodeProperties.Mtime
}
}
if b.NodeProperties.UnixMode != nil {
if unixMode == nil {
unixMode = b.NodeProperties.UnixMode
} else if unixMode.Value != b.NodeProperties.UnixMode.Value {
return nil, skerr.Fmt("Directory has conflicting modes %v and %v", unixMode, b.NodeProperties.UnixMode)
}
}
}
var nodeProps *remoteexecution.NodeProperties
if len(properties) > 0 || mTime != nil || unixMode != nil {
var propList []*remoteexecution.NodeProperty
if len(properties) > 0 {
propList = make([]*remoteexecution.NodeProperty, 0, len(properties))
for name, value := range properties {
propList = append(propList, &remoteexecution.NodeProperty{
Name: name,
Value: value,
})
}
}
nodeProps = &remoteexecution.NodeProperties{
Properties: propList,
Mtime: mTime,
UnixMode: unixMode,
}
}
rv := &directoryNode{
Directory: &remoteexecution.Directory{
Files: files,
Directories: dirs,
Symlinks: symlinks,
NodeProperties: nodeProps,
},
Children: children,
}
return rv, nil
}
// Merge the given digests, returning a new digest which contains all of them.
func (c *Client) Merge(ctx context.Context, digests []string) (string, error) {
// Shortcut for empty/single inputs.
if len(digests) == 0 {
return digest.Empty.String(), nil
} else if len(digests) == 1 {
return digests[0], nil
}
ctx, span := trace.StartSpan(ctx, "cas_rbe_Merge")
span.AddAttributes(trace.Int64Attribute("num_digests", int64(len(digests))))
defer span.End()
// Build an in-memory directory tree for each of the given digests.
var trees []*directoryNode
for _, casDigest := range digests {
// Normalize the digest and retrieve the directory tree from the API.
d, err := digest.NewFromString(casDigest)
if err != nil {
return "", skerr.Wrap(err)
}
dirs, err := c.client.GetDirectoryTree(ctx, d.ToProto())
if err != nil {
return "", skerr.Wrapf(err, "failed retrieving %s", d.String())
}
// Start by creating a map of Directory digest to Directory, so that we
// don't have to rely on the ordering of the Directories returned by
// GetDirectoryTree.
dirsByDigest := make(map[digest.Digest]*remoteexecution.Directory, len(dirs))
rootDigest, err := digest.NewFromMessage(dirs[0])
if err != nil {
return "", skerr.Wrapf(err, "failed to create digest for root dir: %+v", dirs[0])
}
dirsByDigest[rootDigest] = dirs[0]
for _, dir := range dirs[1:] {
d, err := digest.NewFromMessage(dir)
if err != nil {
return "", skerr.Wrapf(err, "failed to create digest for dir: %+v", dir)
}
dirsByDigest[d] = dir
}
// Build the in-memory tree representation.
tree, err := makeTree(dirsByDigest, rootDigest)
if err != nil {
return "", skerr.Wrapf(err, "failed to create tree for digest %q", casDigest)
}
trees = append(trees, tree)
}
// Merge the trees.
root := trees[0]
for _, tree := range trees[1:] {
var err error
root, err = c.mergeTrees(ctx, root, tree)
if err != nil {
return "", skerr.Wrap(err)
}
}
// Upload the new root.
entry, err := uploadinfo.EntryFromProto(root.Directory)
if err != nil {
return "", skerr.Wrap(err)
}
if _, _, err := c.client.UploadIfMissing(ctx, entry); err != nil {
return "", skerr.Wrap(err)
}
return entry.Digest.String(), nil
}
// Close implements cas.CAS.
func (c *Client) Close() error {
return c.client.Close()
}
func GetCASInstance(c cas.CAS) (string, error) {
client, ok := c.(*Client)
if !ok {
return "", skerr.Fmt("CAS is not an instance of rbe.Client")
}
return client.instance, nil
}
var _ cas.CAS = &Client{}