blob: 45650c811465b2a665501295cd959b19a633f72a [file] [log] [blame]
package diffstore
import (
"bytes"
"context"
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"image"
"io"
"path"
"sync"
"time"
"go.skia.org/infra/go/gcs"
"go.skia.org/infra/go/rtcache"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/golden/go/diff"
"go.skia.org/infra/golden/go/diffstore/common"
"go.skia.org/infra/golden/go/diffstore/failurestore"
"go.skia.org/infra/golden/go/diffstore/mapper"
"go.skia.org/infra/golden/go/types"
)
const (
// MAX_URI_GET_TRIES is the number of tries we do to load an image.
MAX_URI_GET_TRIES = 4
// N_IMG_WORKERS is the number of concurrent worker goroutines downloading
// images for the read-through cache (see NewImgLoader).
N_IMG_WORKERS = 10
)
// ImageLoader continuously downloads images from GCS and caches them in RAM.
type ImageLoader struct {
// gsBucketClient targets the specific bucket where the images are stored in GCS.
gsBucketClient gcs.GCSClient
// gsImageBaseDir is the GCS directory (prefix) where images are stored.
gsImageBaseDir string
// imageCache is a read-through cache of decoded images, keyed by digest;
// misses are filled by imageLoadWorker.
imageCache rtcache.ReadThroughCache
// failureStore persists failures in retrieving images.
failureStore failurestore.FailureStore
// mapper contains various functions for creating image IDs and paths.
// NOTE(review): not referenced in this file — presumably used elsewhere; confirm.
mapper mapper.Mapper
}
// getGSRelPath returns the path of the image file for the given image ID,
// relative to the GCS image base directory (the bucket is excluded).
func getGSRelPath(imageID types.Digest) string {
	return string(imageID) + "." + common.IMG_EXTENSION
}
// NewImgLoader creates a new instance of ImageLoader that downloads images
// from the given GCS client/directory and caches up to maxCacheSize of them
// in RAM, recording download failures in fStore.
func NewImgLoader(client gcs.GCSClient, fStore failurestore.FailureStore, gsImageBaseDir string, maxCacheSize int, m mapper.Mapper) (*ImageLoader, error) {
	ret := &ImageLoader{
		gsBucketClient: client,
		gsImageBaseDir: gsImageBaseDir,
		failureStore:   fStore,
		mapper:         m,
	}
	// Set up the work queues that balance the load. Cache misses are filled
	// by imageLoadWorker, spread over N_IMG_WORKERS goroutines.
	var err error
	if ret.imageCache, err = rtcache.New(func(priority int64, digest string) (interface{}, error) {
		return ret.imageLoadWorker(priority, types.Digest(digest))
	}, maxCacheSize, N_IMG_WORKERS); err != nil {
		return nil, skerr.Wrapf(err, "creating read-through image cache of size %d", maxCacheSize)
	}
	return ret, nil
}
// Warm makes sure the images are cached.
// If synchronous is true the call blocks until all images are fetched.
// It works in sync with Get, any image that is scheduled to be retrieved by Get
// will not be fetched again.
func (il *ImageLoader) Warm(priority int64, images types.DigestSlice, synchronous bool) {
	fetch := func() {
		if _, err := il.Get(priority, images); err != nil {
			sklog.Errorf("Unable to warm images. Got error: %s", err)
		}
	}
	if synchronous {
		// Block until every requested image has been fetched.
		fetch()
		return
	}
	// Fire and forget: fetch in the background.
	go fetch()
}
// errResult is a helper type to capture error information in Get(...).
type errResult struct {
// err is the error returned by the cache lookup for the digest below.
err error
// id is the digest whose retrieval failed.
id types.Digest
}
// Get returns the images identified by digests and returns them as NRGBA images.
// Priority determines the order in which multiple concurrent calls are processed.
// If any image fails to load, the failure is persisted and a single error
// aggregating all individual failures is returned (with a nil slice).
func (il *ImageLoader) Get(priority int64, images types.DigestSlice) ([]*image.NRGBA, error) {
// Parallel load the requested images.
// result[idx] is written only by the goroutine for idx, so no lock is needed.
result := make([]*image.NRGBA, len(images))
// Buffered to len(images) so worker goroutines never block on send.
errCh := make(chan errResult, len(images))
sklog.Debugf("About to Get %d images.", len(images))
var wg sync.WaitGroup
wg.Add(len(images))
for idx, id := range images {
// idx/id are passed as arguments to avoid loop-variable capture.
go func(idx int, id types.Digest) {
defer wg.Done()
// A cache miss triggers a download+decode via imageLoadWorker.
img, err := il.imageCache.Get(priority, string(id))
if err != nil {
errCh <- errResult{err: err, id: id}
} else {
result[idx] = img.(*image.NRGBA)
}
}(idx, id)
}
wg.Wait()
sklog.Debugf("Done getting images.")
// All senders are done (wg.Wait above), so closing and draining is safe.
if len(errCh) > 0 {
close(errCh)
var msg bytes.Buffer
for errRet := range errCh {
_, _ = msg.WriteString(errRet.err.Error())
_, _ = msg.WriteString("\n")
// This captures the edge case when the error is cached in the image loader.
util.LogErr(il.failureStore.AddDigestFailureIfNew(diff.NewDigestFailure(errRet.id, diff.OTHER)))
}
return nil, errors.New(msg.String())
}
return result, nil
}
// Contains returns true if the image with the given digest is present in the
// in-memory cache. The parameter is named digest (not image) to avoid
// shadowing the imported image package.
func (il *ImageLoader) Contains(digest types.Digest) bool {
	return il.imageCache.Contains(string(digest))
}
// PurgeImages removes the images that correspond to the given digests from the
// in-memory cache, and, if purgeGCS is true, also deletes them from GCS.
// It currently always returns a nil error.
func (il *ImageLoader) PurgeImages(images types.DigestSlice, purgeGCS bool) error {
	for _, digest := range images {
		il.imageCache.Remove([]string{string(digest)})
		if !purgeGCS {
			continue
		}
		// Also delete the backing file in GCS.
		il.removeImg(getGSRelPath(digest))
	}
	return nil
}
// imageLoadWorker implements the rtcache.ReadThroughFunc signature.
// It downloads the image file for imageID from Google storage, decodes it and
// returns it; any failure is recorded in the failure store before returning.
func (il *ImageLoader) imageLoadWorker(priority int64, imageID types.Digest) (interface{}, error) {
	sklog.Debugf("Downloading (and caching) image with ID %s", imageID)

	// Fetch the raw bytes from GCS; record an HTTP failure if that fails.
	rawBytes, err := il.downloadImg(getGSRelPath(imageID))
	if err != nil {
		util.LogErr(il.failureStore.AddDigestFailure(diff.NewDigestFailure(imageID, diff.HTTP)))
		return nil, err
	}

	// Decode the bytes into an image; record a corruption failure if that fails.
	decoded, err := common.DecodeImg(bytes.NewBuffer(rawBytes))
	if err != nil {
		util.LogErr(il.failureStore.AddDigestFailure(diff.NewDigestFailure(imageID, diff.CORRUPTED)))
		return nil, err
	}
	return decoded, nil
}
// downloadImg retrieves the given image from Google storage, verifying the
// downloaded bytes against the MD5 hash from the object's attributes. It
// retries up to MAX_URI_GET_TRIES times before giving up.
func (il *ImageLoader) downloadImg(gsPath string) ([]byte, error) {
ctx := context.TODO()
objLocation := path.Join(il.gsImageBaseDir, gsPath)
// Retrieve the attributes; they provide the expected size and MD5 used to
// validate the download below.
attrs, err := il.gsBucketClient.GetFileObjectAttrs(ctx, objLocation)
if err != nil {
return nil, skerr.Wrapf(err, "Unable to retrieve attributes for %s/%s.", il.gsBucketClient.Bucket(), objLocation)
}
var buf *bytes.Buffer
for i := 0; i < MAX_URI_GET_TRIES; i++ {
if i > 0 {
sklog.Infof("after error, sleeping 2s before GCS fetch for gs://%s/%s", il.gsBucketClient.Bucket(), objLocation)
// This is an arbitrary amount of time to wait.
// TODO(kjlubick): should this be exponential backoff?
time.Sleep(2 * time.Second)
}
// The closure scopes the per-attempt error handling so the deferred
// reader close runs at the end of each attempt, not of the function.
err = func() error {
// Create reader.
reader, err := il.gsBucketClient.FileReader(ctx, objLocation)
if err != nil {
return skerr.Wrapf(err, "New reader failed for %s/%s.", il.gsBucketClient.Bucket(), objLocation)
}
defer util.Close(reader)
// Read the file into buf, hashing the bytes as they stream through.
size := attrs.Size
buf = bytes.NewBuffer(make([]byte, 0, size))
md5Hash := md5.New()
multiOut := io.MultiWriter(md5Hash, buf)
if _, err = io.Copy(multiOut, reader); err != nil {
return err
}
// Check the MD5 against the value reported by the GCS attributes.
if hashBytes := md5Hash.Sum(nil); !bytes.Equal(hashBytes, attrs.MD5) {
return skerr.Fmt("MD5 hash for digest %s incorrect: computed hash is %s.", objLocation, hex.EncodeToString(hashBytes))
}
return nil
}()
// Success: stop retrying.
if err == nil {
break
}
sklog.Errorf("Error fetching file for path %s: %s", objLocation, err)
}
// err holds the result of the last attempt; non-nil means all tries failed.
if err != nil {
sklog.Errorf("Failed fetching file after %d attempts", MAX_URI_GET_TRIES)
return nil, err
}
sklog.Infof("Done downloading image for: %s. Length: %d bytes", objLocation, buf.Len())
return buf.Bytes(), nil
}
// removeImg removes the image that corresponds to the given relative path from GCS.
// Failures are logged but not returned to the caller (best effort).
func (il *ImageLoader) removeImg(gsRelPath string) {
objLocation := path.Join(il.gsImageBaseDir, gsRelPath)
ctx := context.Background()
// Retrieve the attributes to test whether the object exists; if it does
// not (or the lookup fails), there is nothing to delete.
_, err := il.gsBucketClient.GetFileObjectAttrs(ctx, objLocation)
if err != nil {
sklog.Errorf("Unable to retrieve attributes for existing object at %s. Got error: %s", objLocation, err)
return
}
// Log an error if the existing file cannot be deleted; do not propagate it.
if err := il.gsBucketClient.DeleteFile(ctx, objLocation); err != nil {
sklog.Errorf("Unable to delete existing object at %s. Got error: %s", objLocation, err)
}
}