blob: edc995bbdb221f31a630bc4ac541c57c10a6f389 [file] [log] [blame]
package events
import (
"bytes"
"encoding/gob"
"fmt"
"time"
"github.com/boltdb/bolt"
"go.skia.org/infra/go/util"
)
// EventDB is an interface used for storing Events in a BoltDB.
type EventDB interface {
Append(string, []byte) error
Close() error
Insert(*Event) error
Range(string, time.Time, time.Time) ([]*Event, error)
}
// eventDB is a struct used for storing Events in a BoltDB.
type eventDB struct {
db *bolt.DB
}
// NewEventDB returns an EventDB instance.
func NewEventDB(filename string) (EventDB, error) {
db, err := bolt.Open(filename, 0600, nil)
if err != nil {
return nil, err
}
rv := &eventDB{
db: db,
}
return rv, nil
}
// Close cleans up the eventDB.
func (m *eventDB) Close() error {
return m.db.Close()
}
// Insert inserts the given Event into DB.
func (m *eventDB) Insert(e *Event) error {
if util.TimeIsZero(e.Timestamp) {
return fmt.Errorf("Cannot insert an event without a timestamp.")
}
if e.Stream == "" {
return fmt.Errorf("Cannot insert an event without a stream.")
}
k, err := encodeKey(e.Timestamp)
if err != nil {
return err
}
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(e); err != nil {
return err
}
return m.db.Update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucketIfNotExists([]byte(e.Stream))
if err != nil {
return err
}
return b.Put(k, buf.Bytes())
})
}
// Append inserts the given data into the given stream at the current time.
func (m *eventDB) Append(stream string, data []byte) error {
return m.Insert(&Event{
Stream: stream,
Timestamp: time.Now(),
Data: data,
})
}
// Range returns all Events in the given range from the given stream.
func (m *eventDB) Range(stream string, start, end time.Time) ([]*Event, error) {
min, err := encodeKey(start)
if err != nil {
return nil, err
}
max, err := encodeKey(end)
if err != nil {
return nil, err
}
rv := []*Event{}
if err := m.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(stream))
if b == nil {
return nil
}
c := b.Cursor()
for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() {
e := new(Event)
if err := gob.NewDecoder(bytes.NewBuffer(v)).Decode(e); err != nil {
return err
}
rv = append(rv, e)
}
return nil
}); err != nil {
return nil, err
}
return rv, nil
}