// blob: 82c3eac3b64bfd743f4d20373f563dce27172952 [file] [log] [blame]
package events
import (
"bytes"
"context"
"encoding/gob"
"fmt"
"time"
"cloud.google.com/go/bigtable"
"go.skia.org/infra/go/util"
"golang.org/x/oauth2"
"google.golang.org/api/option"
)
// BigTable configuration and per-operation timeouts.
const (
	// BT_TABLE is the single BigTable table which holds all event metrics.
	BT_TABLE = "metrics-eventdb"

	// BT_COLUMN_FAMILY is the only column family used in BT_TABLE.
	BT_COLUMN_FAMILY = "EVTS"

	// BT_COLUMN is the only column used; its cells hold gob-encoded Events.
	BT_COLUMN = "EVT"

	// INSERT_TIMEOUT bounds a single row write performed by Insert.
	INSERT_TIMEOUT = 30 * time.Second

	// QUERY_TIMEOUT bounds each individual read issued by Range.
	QUERY_TIMEOUT = 3 * time.Minute
)
var (
// Fully-qualified BigTable column name.
BT_COLUMN_FULL = fmt.Sprintf("%s:%s", BT_COLUMN_FAMILY, BT_COLUMN)
)
// btRowKey returns a BigTable row key for the given stream and timestamp.
func btRowKey(stream string, ts time.Time) string {
return fmt.Sprintf("%s#%s", stream, ts.UTC().Format(util.SAFE_TIMESTAMP_FORMAT))
}
// btEventDB is the BigTable implementation of EventDB. Each event is stored
// as a gob-encoded value in its own row of BT_TABLE.
type btEventDB struct {
	// client is the underlying BigTable client; Close closes it.
	client *bigtable.Client

	// table is the handle to BT_TABLE through which rows are written (Insert)
	// and read (Range).
	table *bigtable.Table
}
// NewBTEventDB returns an EventDB which is backed by BigTable.
//
// A BigTable client is created for the given project and instance,
// authenticating with the given token source, and BT_TABLE is opened on it.
// The client is retained by the returned EventDB so that Close can release
// it; callers should call Close when they are finished with the EventDB.
//
// An error is returned if the BigTable client cannot be created.
func NewBTEventDB(ctx context.Context, btProject, btInstance string, ts oauth2.TokenSource) (EventDB, error) {
	client, err := bigtable.NewClient(ctx, btProject, btInstance, option.WithTokenSource(ts))
	if err != nil {
		return nil, fmt.Errorf("Failed to create BigTable client: %s", err)
	}
	return &btEventDB{
		// The client must be stored: Close calls db.client.Close(), which
		// would otherwise be invoked on a nil client.
		client: client,
		table:  client.Open(BT_TABLE),
	}, nil
}
// Append stores data as a new event in the given stream, timestamped with the
// current time. See documentation for EventDB interface.
func (db *btEventDB) Append(stream string, data []byte) error {
	ev := &Event{
		Stream:    stream,
		Timestamp: time.Now(),
		Data:      data,
	}
	return db.Insert(ev)
}
// Close closes the underlying BigTable client and returns any error it
// reports. See documentation for EventDB interface.
func (db *btEventDB) Close() error {
	closeErr := db.client.Close()
	return closeErr
}
// Insert writes the given event to BigTable. See documentation for EventDB
// interface.
//
// The event must have a non-zero Timestamp and a non-empty Stream; otherwise
// an error is returned and nothing is written. The whole Event is gob-encoded
// and stored in the BT_COLUMN_FAMILY:BT_COLUMN cell of the row keyed by
// btRowKey(e.Stream, e.Timestamp). The write is bounded by INSERT_TIMEOUT.
func (db *btEventDB) Insert(e *Event) error {
	// Validate up front; the timestamp is checked before the stream.
	switch {
	case util.TimeIsZero(e.Timestamp):
		return fmt.Errorf("Cannot insert an event without a timestamp.")
	case e.Stream == "":
		return fmt.Errorf("Cannot insert an event without a stream.")
	}

	rowKey := btRowKey(e.Stream, e.Timestamp)

	// Serialize the full event; Range decodes this same encoding.
	encoded := new(bytes.Buffer)
	if err := gob.NewEncoder(encoded).Encode(e); err != nil {
		return err
	}

	mutation := bigtable.NewMutation()
	mutation.Set(BT_COLUMN_FAMILY, BT_COLUMN, bigtable.ServerTime, encoded.Bytes())

	ctx, cancel := context.WithTimeout(context.TODO(), INSERT_TIMEOUT)
	defer cancel()
	return db.table.Apply(ctx, rowKey, mutation)
}
// Range returns the events in the given stream whose row keys fall between
// the keys for start and end. See documentation for EventDB interface.
//
// The time span is split into chunks of 24 hours by util.IterTimeChunks and
// each chunk is read with its own QUERY_TIMEOUT-bounded request. If any read
// fails, or any stored value fails to gob-decode, no events are returned and
// the error is reported instead.
func (db *btEventDB) Range(stream string, start, end time.Time) ([]*Event, error) {
	var events []*Event

	// readChunk reads the rows of one time chunk and appends the decoded
	// events to the shared result slice.
	readChunk := func(chunkStart, chunkEnd time.Time) error {
		// Set by the row callback when decoding fails; ReadRows itself only
		// sees the callback's bool result.
		var decodeErr error

		firstKey := btRowKey(stream, chunkStart)
		lastKey := btRowKey(stream, chunkEnd)

		ctx, cancel := context.WithTimeout(context.TODO(), QUERY_TIMEOUT)
		defer cancel()

		readErr := db.table.ReadRows(ctx, bigtable.NewRange(firstKey, lastKey), func(row bigtable.Row) bool {
			for _, item := range row[BT_COLUMN_FAMILY] {
				if item.Column != BT_COLUMN_FULL {
					continue
				}
				decoded := new(Event)
				decodeErr = gob.NewDecoder(bytes.NewReader(item.Value)).Decode(decoded)
				if decodeErr != nil {
					// Stop reading; the error is surfaced after ReadRows returns.
					return false
				}
				events = append(events, decoded)
				// We only store one event per row.
				break
			}
			return true
		})
		// A read error takes precedence over a decode error.
		if readErr != nil {
			return readErr
		}
		return decodeErr
	}

	if err := util.IterTimeChunks(start, end, 24*time.Hour, readChunk); err != nil {
		return nil, err
	}
	return events, nil
}