blob: ea04faceb51d079cc3a4e2ece2c8b7335529e2cf [file] [log] [blame]
package gcsuploader
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"runtime"
"strings"
"time"
gstorage "cloud.google.com/go/storage"
"github.com/cenkalti/backoff"
"google.golang.org/api/option"
"go.skia.org/infra/go/exec"
"go.skia.org/infra/go/gcs"
"go.skia.org/infra/go/skerr"
)
// gcsPrefix is the scheme prefix that marks a path as a GCS URL.
const gcsPrefix = "gs://"
// GCSUploader is the set of operations needed to push content into GCS.
type GCSUploader interface {
	// UploadBytes uploads content to the GCS location dst, which is assumed to
	// carry the gs:// prefix. When data is supplied, an implementation may upload
	// those bytes directly instead of reading them from disk again; fallbackSrc
	// names the local file that holds the same content.
	// Only local-to-GCS uploads are supported, i.e. fallbackSrc cannot itself be
	// a GCS path such as gs://foo/bar.
	UploadBytes(ctx context.Context, data []byte, fallbackSrc string, dst string) error

	// UploadJSON encodes data as JSON and uploads the encoded result to
	// gcsObjectPath. Implementations are free to stage the JSON in tempFileName
	// before uploading it.
	UploadJSON(ctx context.Context, data interface{}, tempFileName string, gcsObjectPath string) error
}
// GsutilImpl implements the GCSUploader interface by shelling out to the gsutil
// executable. It has no fields, so the zero value is ready to use.
// NOTE(review): this type was also described as implementing ImageDownloader; no
// such methods are visible in this part of the file -- confirm against the rest of
// the package.
type GsutilImpl struct{}
// UploadJSON encodes data as JSON, stores the encoded bytes in the local file
// tempFileName (created or overwritten with mode 0644) and then copies that file
// to GCS. gcsObjPath is expected in the form <bucket_name>/path/to/object; the
// gs:// prefix is added here before the copy.
func (g *GsutilImpl) UploadJSON(ctx context.Context, data interface{}, tempFileName, gcsObjPath string) error {
	encoded, marshalErr := json.Marshal(data)
	if marshalErr != nil {
		return skerr.Wrapf(marshalErr, "could not marshal to JSON before uploading")
	}

	writeErr := ioutil.WriteFile(tempFileName, encoded, 0644)
	if writeErr != nil {
		return skerr.Wrapf(writeErr, "saving json to %s", tempFileName)
	}

	// The bytes are already on disk, so hand over the file rather than the slice.
	dst := prefixGCS(gcsObjPath)
	return g.UploadBytes(ctx, nil, tempFileName, dst)
}
// prefixGCS adds the "gs://" prefix to the given GCS path.
func prefixGCS(gcsPath string) string {
return fmt.Sprintf(gcsPrefix+"%s", gcsPath)
}
// UploadBytes copies fallbackSrc to dst by running "gsutil cp". The in-memory
// bytes are ignored; the content is always taken from fallbackSrc. gsutil treats
// a path starting with "gs://" as a GCS location.
func (g *GsutilImpl) UploadBytes(ctx context.Context, _ []byte, fallbackSrc, dst string) error {
	cpArgs := []string{"cp", fallbackSrc, dst}
	return g.gsutilCmd(ctx, cpArgs...)
}
// gsutilCmd runs gsutil with the given arguments. The "gsutil" executable is tried
// first; if that fails on Windows, the call is retried as "python gsutil.py <args>".
// Both attempts write their combined output into the same buffer, and that buffer
// is included in the returned error.
func (g *GsutilImpl) gsutilCmd(ctx context.Context, cmd ...string) error {
	var combined bytes.Buffer

	direct := &exec.Command{
		Name:           "gsutil",
		Args:           cmd,
		CombinedOutput: &combined,
	}
	directErr := exec.Run(ctx, direct)
	if directErr == nil {
		return nil
	}
	if runtime.GOOS != "windows" {
		return skerr.Wrapf(directErr, "running gsutil. Got output \n%s\n", combined.String())
	}

	// On Windows, fall back to invoking the python script explicitly.
	pyArgs := append([]string{"gsutil.py"}, cmd...)
	viaPython := &exec.Command{
		Name:           "python",
		Args:           pyArgs,
		CombinedOutput: &combined,
	}
	if pyErr := exec.Run(ctx, viaPython); pyErr != nil {
		return skerr.Wrapf(pyErr, "running gsutil on windows. Got output \n%s\n", combined.String())
	}
	return nil
}
// clientImpl implements the GCSUploader interface on top of the Cloud Storage
// client library instead of the gsutil executable. The storage client is built in
// New from a caller-supplied http client, which is expected to be authenticated
// (via an OAuth service account).
type clientImpl struct {
	// client is the storage client through which all uploads are made.
	client *gstorage.Client
}
// New returns a GCSUploader implementation that talks to GCS through a storage
// client built on top of httpClient. An error is returned if the storage client
// cannot be created.
func New(ctx context.Context, httpClient *http.Client) (*clientImpl, error) {
	storageClient, err := gstorage.NewClient(ctx, option.WithHTTPClient(httpClient))
	if err != nil {
		return nil, skerr.Wrapf(err, "instantiating storage client")
	}
	return &clientImpl{client: storageClient}, nil
}
// UploadBytes implements the GCSUploader interface. Non-empty data is uploaded
// as is; otherwise the content is read from the local file fallbackSrc. A
// fallbackSrc that starts with gs:// is rejected, because copying from a remote
// file is not supported.
func (h *clientImpl) UploadBytes(ctx context.Context, data []byte, fallbackSrc, dst string) error {
	if len(data) > 0 {
		return h.uploadToGCS(ctx, data, dst)
	}

	// No bytes were handed in, so they have to come from disk.
	if strings.HasPrefix(fallbackSrc, gcsPrefix) {
		return skerr.Fmt("Copying from a remote file is not supported")
	}
	contents, readErr := ioutil.ReadFile(fallbackSrc)
	if readErr != nil {
		return skerr.Wrapf(readErr, "reading file %s", fallbackSrc)
	}
	return h.uploadToGCS(ctx, contents, dst)
}
// UploadJSON implements the GCSUploader interface. The data is encoded as JSON
// in memory and uploaded directly, so the temp file name is ignored.
func (h *clientImpl) UploadJSON(ctx context.Context, data interface{}, _, gcsObjectPath string) error {
	payload, marshalErr := json.Marshal(data)
	if marshalErr != nil {
		return skerr.Wrap(marshalErr)
	}
	return h.uploadToGCS(ctx, payload, gcsObjectPath)
}
// uploadToGCS writes data to the GCS object named by dst, which may be given with
// or without the gs:// prefix.
//
// Failed attempts are retried with exponential backoff (1s initial interval, 10s
// maximum interval, 30s maximum elapsed time). The backoff is bound to ctx so that
// a cancellation interrupts the wait between attempts instead of only being
// noticed when the next attempt starts. From the second attempt on, the object's
// attributes are fetched first and the upload is skipped (and reported as a
// success) if the object already exists.
//
// The returned error, if any, wraps the error that ended the retries.
func (h *clientImpl) uploadToGCS(ctx context.Context, data []byte, dst string) error {
	// Trim the prefix and split what is left into bucket and object path.
	dst = strings.TrimPrefix(dst, gcsPrefix)
	bucket, objPath := gcs.SplitGSPath(dst)
	handle := h.client.Bucket(bucket).Object(objPath)

	eb := backoff.NewExponentialBackOff()
	eb.InitialInterval = time.Second
	eb.MaxInterval = 10 * time.Second
	eb.MaxElapsedTime = 30 * time.Second

	isFirstTime := true
	attempt := func() error {
		// Retrying is pointless once the caller has given up.
		if err := ctx.Err(); err != nil {
			return backoff.Permanent(err)
		}
		// To avoid slowing down every upload with an extra check, we only check if the
		// file already exists after the first failure (e.g. a 503 that happens if
		// multiple processes are writing the same file at once).
		if !isFirstTime {
			if _, err := handle.Attrs(ctx); err == nil {
				fmt.Printf("Skipping upload of %s - already exists on server\n", dst)
				return nil
			}
		}
		isFirstTime = false

		// Each attempt gets its own cancellable context. The docs say to cancel the
		// context in the event of an error or success.
		attemptCtx, cancel := context.WithCancel(ctx)
		defer cancel()

		w := handle.NewWriter(attemptCtx)
		// Upload files in one go, to avoid conflicts if multiple processes are
		// uploading at the same time.
		w.ChunkSize = 0
		if _, err := w.Write(data); err != nil {
			fmt.Printf("Error writing to %s: %s (retrying)\n", dst, err)
			return skerr.Wrap(err)
		}
		if err := w.Close(); err != nil {
			fmt.Printf("Error closing file %s: %s (retrying)\n", dst, err)
			return skerr.Wrap(err)
		}
		return nil
	}

	if err := backoff.Retry(attempt, backoff.WithContext(eb, ctx)); err != nil {
		return skerr.Wrapf(err, "uploading %d bytes to %s with exponential retries", len(data), dst)
	}
	return nil
}
// DryRunImpl implements the GCSUploader interface without uploading anything:
// each method only prints what it would have done and reports success.
// NOTE(review): this type was also described as implementing ImageDownloader; no
// such methods are visible in this part of the file -- confirm against the rest of
// the package.
type DryRunImpl struct{}
// UploadBytes implements the GCSUploader interface. Nothing is uploaded; the
// source and destination are printed to stdout and nil is returned.
func (h *DryRunImpl) UploadBytes(_ context.Context, _ []byte, fallbackSrc, dst string) error {
	report := fmt.Sprintf("dryrun -- upload bytes from %s to %s\n", fallbackSrc, dst)
	fmt.Print(report)
	return nil
}
// UploadJSON implements the GCSUploader interface. Nothing is encoded or
// uploaded; the temp file name and destination are printed to stdout and nil is
// returned.
func (h *DryRunImpl) UploadJSON(_ context.Context, _ interface{}, tempFileName, gcsObjectPath string) error {
	report := fmt.Sprintf("dryrun -- upload JSON from %s to %s\n", tempFileName, gcsObjectPath)
	fmt.Print(report)
	return nil
}
// Compile-time checks that every uploader defined in this file fulfills the
// GCSUploader interface.
var (
	_ GCSUploader = (*GsutilImpl)(nil)
	_ GCSUploader = (*clientImpl)(nil)
	_ GCSUploader = (*DryRunImpl)(nil)
)