blob: 4dce43cf1de49091539355f27e554d420d835d6b [file] [log] [blame]
package sharedb
// Generate the go code from the protocol buffer definitions.
//go:generate protoc --go_out=plugins=grpc:. sharedb.proto
import (
"bytes"
"fmt"
"io/ioutil"
"path/filepath"
"strings"
"sync"
"github.com/boltdb/bolt"
"go.skia.org/infra/go/fileutil"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
// ShareDB wraps the shareDBClient produced from sharedb.proto to add
// convenience methods such as Close. The client is embedded, so its RPC
// methods are promoted and can be called directly on a ShareDB value.
type ShareDB struct {
// Embedded client; Close reaches the connection through its cc field.
*shareDBClient
}
// New dials the ShareDB server at serverAddress and returns a ShareDB client
// wrapper that can be used to make RPC calls.
//
// The connection is opened with grpc.WithInsecure(). An error from grpc.Dial
// is returned unchanged together with a nil client.
func New(serverAddress string) (*ShareDB, error) {
	conn, dialErr := grpc.Dial(serverAddress, grpc.WithInsecure())
	if dialErr != nil {
		return nil, dialErr
	}

	// The wrapper embeds the concrete client type (not the interface) so that
	// Close can get at the underlying connection.
	client := NewShareDBClient(conn).(*shareDBClient)
	return &ShareDB{shareDBClient: client}, nil
}
// Close closes the underlying gRPC connection held by the embedded client and
// returns whatever error that close reports.
func (s *ShareDB) Close() error {
	conn := s.shareDBClient.cc
	return conn.Close()
}
// rpcServer implements the ShareDBServer interface defined in the
// sharedb.proto file. This implementation is based on BoltDB. It stores
// key-value pairs that are addressable via: database/bucket/key.
// Each database maps to a file on the filesystem. The keys within a bucket
// are unique.
type rpcServer struct {
// dataDir is the directory that holds one "<database>.db" file per database.
dataDir string
// databases caches the BoltDB handles opened so far, keyed by database name.
databases map[string]*bolt.DB
// dbsMutex is held by getDB while it reads or updates the databases map.
dbsMutex sync.Mutex
}
// NewServer returns an instance that implements the ShareDBServer interface
// generated from the sharedb.proto file, storing its database files in
// dataDir. It can then be used to run an RPC server. See tests for details.
//
// The directory is prepared via fileutil.EnsureDirExists wrapped in
// fileutil.Must (presumably panicking on failure; confirm against fileutil).
func NewServer(dataDir string) ShareDBServer {
	dir := fileutil.Must(fileutil.EnsureDirExists(dataDir))
	return &rpcServer{
		dataDir:   dir,
		databases: make(map[string]*bolt.DB),
	}
}
// Get returns the value for the key identified by the triple
// (database, bucket, key) in the provided GetRequest.
//
// A non-nil GetResponse is returned whenever the error is nil:
//   - if getDB yields no database handle, or the bucket does not exist, the
//     response is empty (Value is nil);
//   - if the bucket exists but the key does not, Value is an empty slice.
//
// Errors from getDB are returned with a nil response; an error from the read
// transaction is returned alongside the (possibly partially filled) response.
func (r *rpcServer) Get(ctx context.Context, req *GetRequest) (*GetResponse, error) {
	db, err := r.getDB(req.Database, false)
	if err != nil {
		return nil, err
	}

	result := &GetResponse{}

	// No database handle means there is nothing to read. Hand back an empty
	// response instead of nil: a nil message paired with a nil error is not a
	// well-formed reply and would leave callers dereferencing a nil pointer.
	if db == nil {
		return result, nil
	}

	err = db.View(func(tx *bolt.Tx) error {
		bucket := tx.Bucket([]byte(req.Bucket))
		if bucket == nil {
			return nil
		}

		// Copy the bytes out so the response does not alias memory handed out
		// by Bolt inside the transaction.
		val := bucket.Get([]byte(req.Key))
		result.Value = make([]byte, len(val))
		copy(result.Value, val)
		return nil
	})
	return result, err
}
// Put stores the value described by the tuple (database, bucket, key, value)
// in the provided PutRequest. The database is requested from getDB with
// create=true and the bucket is created inside the write transaction if it
// does not exist yet.
//
// The returned PutResponse carries true if the write succeeded; otherwise it
// carries false and the error is returned alongside it.
func (r *rpcServer) Put(ctx context.Context, req *PutRequest) (*PutResponse, error) {
	db, openErr := r.getDB(req.Database, true)
	if openErr != nil {
		return &PutResponse{false}, openErr
	}

	// store performs the actual write inside a Bolt transaction.
	store := func(tx *bolt.Tx) error {
		target, createErr := tx.CreateBucketIfNotExists([]byte(req.Bucket))
		if createErr != nil {
			return createErr
		}
		return target.Put([]byte(req.Key), req.Value)
	}

	if writeErr := db.Batch(store); writeErr != nil {
		return &PutResponse{false}, writeErr
	}
	return &PutResponse{true}, nil
}
// Delete removes the key identified by (database, bucket, key) in the
// provided DeleteRequest.
//
// DeleteResponse.Ok is true when the operation succeeded. Having nothing to
// delete counts as success: that covers getDB yielding no database handle and
// a bucket that does not exist. On failure Ok is false and the error is
// returned next to the response.
func (r *rpcServer) Delete(ctx context.Context, req *DeleteRequest) (*DeleteResponse, error) {
	resp := new(DeleteResponse)

	db, err := r.getDB(req.Database, false)
	switch {
	case err != nil:
		return resp, err
	case db == nil:
		// No database, hence no key to remove.
		resp.Ok = true
		return resp, nil
	}

	remove := func(tx *bolt.Tx) error {
		if b := tx.Bucket([]byte(req.Bucket)); b != nil {
			return b.Delete([]byte(req.Key))
		}
		return nil
	}

	err = db.Batch(remove)
	resp.Ok = err == nil
	return resp, err
}
// Databases returns the names of all databases found in the server's data
// directory. A database is any directory entry whose name ends in ".db"; the
// reported name is the entry name with that suffix removed.
//
// If the directory cannot be read, the error is returned together with a
// response whose Values is unset.
func (r *rpcServer) Databases(ctx context.Context, req *DatabasesRequest) (*DatabasesResponse, error) {
	const dbSuffix = ".db"

	resp := &DatabasesResponse{}
	entries, err := ioutil.ReadDir(r.dataDir)
	if err != nil {
		return resp, err
	}

	resp.Values = make([]string, 0, len(entries))
	for _, entry := range entries {
		name := entry.Name()
		if !strings.HasSuffix(name, dbSuffix) {
			continue
		}
		resp.Values = append(resp.Values, name[:len(name)-len(dbSuffix)])
	}
	return resp, nil
}
// Buckets returns the names of all buckets in the database named in the
// provided BucketsRequest, collected by walking the transaction's top-level
// cursor.
//
// When getDB yields no database handle the result is an empty, non-nil list.
// If getDB fails, the error is returned with a response whose Values is unset.
func (r *rpcServer) Buckets(ctx context.Context, req *BucketsRequest) (*BucketsResponse, error) {
	resp := &BucketsResponse{}

	db, err := r.getDB(req.Database, false)
	if err != nil {
		return resp, err
	}

	resp.Values = []string{}
	if db == nil {
		return resp, nil
	}

	err = db.View(func(tx *bolt.Tx) error {
		c := tx.Cursor()
		name, _ := c.First()
		for name != nil {
			resp.Values = append(resp.Values, string(name))
			name, _ = c.Next()
		}
		return nil
	})
	return resp, err
}
// Keys returns the keys of the bucket named in the provided KeysRequest, in
// cursor order. Which keys are returned depends on the request fields:
//
//   - MaxPrefix set: range scan that starts at MinPrefix (the first key if
//     MinPrefix is empty) and stops at the first key that compares greater
//     than MaxPrefix. The comparison is on whole keys, so a longer key that
//     merely starts with MaxPrefix is excluded. Prefix is ignored.
//   - only MinPrefix set: every key from MinPrefix to the end of the bucket.
//     Prefix is ignored.
//   - only Prefix set: every key that starts with Prefix.
//   - nothing set: every key in the bucket.
//
// When getDB yields no database handle, or the bucket does not exist, the
// result is an empty, non-nil list. If getDB fails, the error is returned
// with a response whose Values is unset.
func (r *rpcServer) Keys(ctx context.Context, req *KeysRequest) (*KeysResponse, error) {
	resp := &KeysResponse{}

	db, err := r.getDB(req.Database, false)
	if err != nil {
		return resp, err
	}

	resp.Values = []string{}
	if db == nil {
		return resp, nil
	}

	// Defaults: start at Prefix (an empty Prefix seeks to the first key) and
	// never stop early.
	start := []byte(req.Prefix)
	keep := func([]byte) bool { return true }

	if req.MaxPrefix != "" || req.MinPrefix != "" {
		// Range scan: the min/max fields take precedence over Prefix.
		start = []byte(req.MinPrefix)
		if req.MaxPrefix != "" {
			upper := []byte(req.MaxPrefix)
			keep = func(k []byte) bool { return bytes.Compare(k, upper) <= 0 }
		}
	} else if req.Prefix != "" {
		// Prefix scan.
		prefix := start
		keep = func(k []byte) bool { return bytes.HasPrefix(k, prefix) }
	}

	err = db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket([]byte(req.Bucket))
		if b == nil {
			return nil
		}

		c := b.Cursor()
		k, _ := c.Seek(start)
		for k != nil && keep(k) {
			resp.Values = append(resp.Values, string(k))
			k, _ = c.Next()
		}
		return nil
	})
	return resp, err
}
// getDB returns the BoltDB handle for the named database.
//
// Handles are cached in r.databases; a cached handle is returned directly.
// Otherwise the database file "<dataDir>/<database>.db" is opened, and the
// new handle is cached. Whether a missing file is created depends on create:
//
//   - create == true: the file is opened regardless of whether it exists.
//   - create == false and the file does not exist: (nil, nil) is returned.
//     Callers treat a nil handle as "nothing stored" rather than as a failure.
//
// An error is returned if the name contains a path separator or if opening
// the file fails. The whole lookup runs with r.dbsMutex held.
func (r *rpcServer) getDB(database string, create bool) (*bolt.DB, error) {
	// The name comes straight from RPC requests and becomes part of a file
	// path. Refuse separators so a request cannot address files outside of
	// dataDir (e.g. "../../tmp/x").
	if strings.ContainsAny(database, `/\`) {
		return nil, fmt.Errorf("invalid database name %q: must not contain path separators", database)
	}

	r.dbsMutex.Lock()
	defer r.dbsMutex.Unlock()

	if db, ok := r.databases[database]; ok {
		return db, nil
	}

	// Check if the database exists on disk.
	databaseFile := filepath.Join(r.dataDir, database+".db")
	if !create && !fileutil.FileExists(databaseFile) {
		// A missing database is not an error for read-style operations.
		return nil, nil
	}

	db, err := bolt.Open(databaseFile, 0644, nil)
	if err != nil {
		return nil, err
	}
	r.databases[database] = db
	return db, nil
}