blob: 8343eced686b7f6138e234e6bb69d320076f2264 [file] [log] [blame]
package goldingestion
import (
"context"
"fmt"
"strconv"
"go.skia.org/infra/go/sharedb"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
)
const (
DB_NAME = "ingestion-metadata"
BUCKET_NAME = "ingested"
)
type IngestionStore struct {
client *sharedb.ShareDB
ctx context.Context
}
// TODO(stephana): The meta data store needs a background process to remove meta data we are no longer
// interested in.
// NewIngestionStore allows to store and retrieve meta data about the ingestion process.
// serverAddr specifies the sharedb server. ingesterID is the unique id of the ingester,
// it will be used to prefix database names.
func NewIngestionStore(serverAddr string) (*IngestionStore, error) {
client, err := sharedb.New(serverAddr)
if err != nil {
return nil, err
}
return &IngestionStore{
client: client,
ctx: context.Background(),
}, nil
}
// Close the connection to the RPC service.
func (i *IngestionStore) Close() error {
return i.client.Close()
}
// IsIngested returns true if the results for the given master/builder/buildnumber have been processed.
func (i *IngestionStore) IsIngested(ingesterID, master, builder string, buildNumber int64) bool {
resp, err := i.client.Get(i.ctx, &sharedb.GetRequest{Database: DB_NAME, Bucket: getBucketName(ingesterID), Key: getKey(master, builder, buildNumber)})
if err != nil {
sklog.Errorf("Error querying ingestion store: %s", err)
return false
}
return resp.Value != nil
}
// Add adds the given master/builder/buildNumber with a timestamp to the data store.
func (i *IngestionStore) Add(ingesterID, master, builder string, buildNumber int64) error {
value := []byte(strconv.FormatInt(util.TimeStampMs(), 10))
_, err := i.client.Put(i.ctx, &sharedb.PutRequest{Database: DB_NAME, Bucket: getBucketName(ingesterID), Key: getKey(master, builder, buildNumber), Value: value})
return err
}
func getBucketName(ingesterID string) string {
return BUCKET_NAME + "-" + ingesterID
}
func getKey(master, builder string, buildNumber int64) string {
return fmt.Sprintf("%s:%s:%010d", master, builder, buildNumber)
}