package dsutil
import (
"bytes"
"context"
"encoding/json"
"math"
"cloud.google.com/go/datastore"
"go.skia.org/infra/go/ds"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"golang.org/x/sync/errgroup"
)
// maxBlobFragBytes is the size of a blob fragment. It is slightly smaller
// than the maximum size of a datastore entity, which is 1 MiB - 4 bytes.
const maxBlobFragBytes = 1000 * 1000
// BlobStore is a utility type to store data encoded as JSON in blobs. This is
// intended for blobs that are larger than 1MiB. It breaks the blob into multiple
// entities that can be addressed via a single key.
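//
// A minimal usage sketch (hedged: assumes a configured *datastore.Client
// named 'client', two hypothetical kinds, and a JSON-encodable 'myValue'):
//
//	store := NewBlobStore(client, ds.Kind("MyBlob"), ds.Kind("MyBlobFrag"))
//	key, err := store.Save(myValue)
//	if err != nil {
//		// handle error
//	}
//	var decoded MyType // MyType is a hypothetical destination type
//	if err := store.Load(key, &decoded); err != nil {
//		// handle error
//	}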
type BlobStore struct {
client *datastore.Client
blobKind ds.Kind
blobFragKind ds.Kind
}
// NewBlobStore creates a new instance of BlobStore.
// 'blobKind' is the name of the entity used to store the blob index.
// 'blobFragKind' is the name of the entity used to store the JSON encoded
// fragments of the blob.
func NewBlobStore(client *datastore.Client, blobKind ds.Kind, blobFragKind ds.Kind) *BlobStore {
return &BlobStore{
client: client,
blobKind: blobKind,
blobFragKind: blobFragKind,
}
}
// blobParent serves as the index of the blob. It holds the keys of the blob
// fragments that make up the blob. The fragments are intended to be
// assembled and disassembled in the order of 'Children'.
type blobParent struct {
Children []*datastore.Key `datastore:",noindex"`
}
// blobFrag contains one fragment of the JSON encoded blob data.
type blobFrag struct {
Body []byte `datastore:",noindex"`
}
// Load loads the blob data referred to by the given key into 'dst'. The same
// rules apply to 'dst' as for json.Decoder.Decode(...)
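//
// A sketch of a call site (assumes 'store' and 'key' come from a prior
// Save; the destination type here is hypothetical):
//
//	var result map[string]interface{}
//	if err := store.Load(key, &result); err != nil {
//		sklog.Errorf("Error loading blob: %s", err)
//	}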
func (b *BlobStore) Load(key *datastore.Key, dst interface{}) error {
ctx := context.TODO()
blob := &blobParent{}
if err := b.client.Get(ctx, key, blob); err != nil {
return err
}
return b.readBlobData(blob.Children, dst)
}
// Save stores the given data in a blob, which can then be referenced by the
// returned key. Internally the data are json encoded, so the same rules apply
// to 'data' as for json.Encoder.Encode(...)
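//
// Sketch (assumes 'store' is a *BlobStore and 'report' is a hypothetical
// value whose JSON encoding may well exceed 1 MiB):
//
//	key, err := store.Save(report)
//	if err != nil {
//		return err
//	}
//	// Persist 'key' wherever the blob needs to be referenced later.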
func (b *BlobStore) Save(data interface{}) (key *datastore.Key, err error) {
ctx := context.TODO()
blob := &blobParent{}
blob.Children, err = b.writeBlobData(data)
if err != nil {
return nil, err
}
// If we fail we need to purge all the entities we created along the way.
actions := &TxActions{}
actions.AddRollbackFn(func() error { return b.client.DeleteMulti(ctx, blob.Children) })
defer func() { actions.Run(err) }()
key = ds.NewKey(b.blobKind)
return b.client.Put(ctx, key, blob)
}
// Delete deletes the blob identified by key.
func (b *BlobStore) Delete(key *datastore.Key) error {
var egroup errgroup.Group
ctx := context.TODO()
blob := &blobParent{}
if err := b.client.Get(ctx, key, blob); err != nil {
return err
}
// Delete the parent and all the children.
egroup.Go(func() error { return b.client.Delete(ctx, key) })
egroup.Go(func() error { return b.client.DeleteMulti(ctx, blob.Children) })
return egroup.Wait()
}
// writeBlobData encodes the given object as JSON and stores it in a
// sequence of entities. It returns the keys of the newly created entities that
// contain the blob.
func (b *BlobStore) writeBlobData(value interface{}) ([]*datastore.Key, error) {
blobParts, err := jsonEncodeBlobParts(value)
if err != nil {
return nil, err
}
keys := make([]*datastore.Key, len(blobParts))
var egroup errgroup.Group
ctx := context.TODO()
for idx, part := range blobParts {
func(idx int, part *blobFrag) {
egroup.Go(func() error {
var err error
keys[idx], err = b.client.Put(ctx, ds.NewKey(b.blobFragKind), part)
return err
})
}(idx, part)
}
if err := egroup.Wait(); err != nil {
return nil, err
}
return keys, nil
}
// readBlobData reads the blob parts identified by the given list of keys and
// concatenates their Body fields. The result is then JSON decoded into 'dst'.
// The same rules for dst apply as for json.Decoder.Decode(...).
func (b *BlobStore) readBlobData(keys []*datastore.Key, dst interface{}) error {
ctx := context.TODO()
var egroup errgroup.Group
blobParts := make([]*blobFrag, len(keys))
for idx, key := range keys {
func(idx int, key *datastore.Key) {
egroup.Go(func() error {
blobParts[idx] = &blobFrag{}
if err := b.client.Get(ctx, key, blobParts[idx]); err != nil {
return sklog.FmtErrorf("Error loading blob data: %s", err)
}
return nil
})
}(idx, key)
}
if err := egroup.Wait(); err != nil {
return err
}
if err := jsonDecodeBlobParts(blobParts, dst); err != nil {
return sklog.FmtErrorf("Error decoding JSON from blobs: %s", err)
}
return nil
}
// jsonEncodeBlobParts encodes the given object as JSON and spreads it over a
// sequence of blobFrag instances that are within the size limits of entities
// stored in Cloud Datastore.
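//
// A worked example with illustrative numbers: a 2,500,000 byte encoding
// yields nPages = ceil(2,500,000 / 1,000,000) = 3 fragments whose Body
// slices hold 1,000,000, 1,000,000 and 500,000 bytes respectively.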
func jsonEncodeBlobParts(src interface{}) ([]*blobFrag, error) {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(src); err != nil {
return nil, err
}
body := buf.Bytes()
nPages := int(math.Ceil(float64(len(body)) / float64(maxBlobFragBytes)))
ret := make([]*blobFrag, nPages)
for p := 0; p < nPages; p++ {
start := p * maxBlobFragBytes
ret[p] = &blobFrag{
Body: body[start:util.MinInt(start+maxBlobFragBytes, len(body))],
}
}
return ret, nil
}
// jsonDecodeBlobParts decodes the JSON contained in the sequence of blobFrag
// instances.
func jsonDecodeBlobParts(parts []*blobFrag, dst interface{}) error {
var buf bytes.Buffer
for _, part := range parts {
if _, err := buf.Write(part.Body); err != nil {
return err
}
}
return json.NewDecoder(&buf).Decode(dst)
}