blob: 0d97a0235748b61b07d870b34e2d76c817e12971 [file] [log] [blame]
package local_db
import (
const (
// DB_NAME is the name of the database.
DB_NAME = "task_scheduler_db"
// DB_FILENAME is the name of the file in which the database is stored.
DB_FILENAME = "task_scheduler.bdb"
// BUCKET_TASKS is the name of the Tasks bucket. Key is Task.Id, which is set
// to (creation time, sequence number) (see formatId for detail), value is
// described in docs for BUCKET_TASKS_VERSION. Tasks will be updated in place.
// All repos share the same bucket.
BUCKET_TASKS = "tasks"
// BUCKET_TASKS_FILL_PERCENT is the value to set for bolt.Bucket.FillPercent
// for BUCKET_TASKS. BUCKET_TASKS will be append-mostly, so use a high fill
// percent.
// BUCKET_TASKS_VERSION indicates the format of the value of BUCKET_TASKS
// written by PutTasks. Retrieving Tasks from the DB must support all previous
// versions. For all versions, the first byte is the version number.
// Version 1: v[0] = 1; v[1:9] is the modified time as UnixNano encoded as
// big endian; v[9:] is the GOB of the Task.
// BUCKET_JOBS is the name of the Jobs bucket. Key is Job.Id, which is set to
// (creation time, sequence number) (see formatId for detail), value is
// described in docs for BUCKET_JOBS_VERSION. Jobs will be updated in place.
// All repos share the same bucket.
BUCKET_JOBS = "jobs"
// BUCKET_JOBS_FILL_PERCENT is the value to set for bolt.Bucket.FillPercent
// for BUCKET_JOBS. BUCKET_JOBS will be append-mostly, so use a high fill
// percent.
// BUCKET_JOBS_VERSION indicates the format of the value of BUCKET_JOBS
// written by PutJobs. Retrieving Jobs from the DB must support all previous
// versions. For all versions, the first byte is the version number.
// Version 1: v[0] = 1; v[1:9] is the modified time as UnixNano encoded as
// big endian; v[9:] is the GOB of the Job.
// BUCKET_COMMENTS is the name of the comments bucket. Key is KEY_COMMENT_MAP,
// value is the GOB of the map provided by db.CommentBox. The comment map will
// be updated in place. All repos share the same bucket.
BUCKET_COMMENTS = "comments"
KEY_COMMENT_MAP = "comment-map"
// BUCKET_BACKUP is the name of the backup bucket. Key is
// KEY_INCREMENTAL_BACKUP_TIME, value is time.Time.MarshalBinary. The value
// will be updated in place.
BUCKET_BACKUP = "backup"
// TIMESTAMP_FORMAT is a format string passed to Time.Format and time.Parse to
// format/parse the timestamp in the Task ID. It is similar to
// util.RFC3339NanoZeroPad, but since Task.Id can not contain colons, we omit
// most of the punctuation. This timestamp can only be used to format and
// parse times in UTC.
TIMESTAMP_FORMAT = "20060102T150405.000000000Z"
// SEQUENCE_NUMBER_FORMAT is a format string passed to fmt.Sprintf or
// fmt.Sscanf to format/parse the sequence number in the Task ID. It is a
// 16-digit zero-padded lowercase hexadecimal number.
// MAX_CREATED_TIME_SKEW is the maximum difference between the timestamp in a
// Task's Id field and that Task's Created field. This allows AssignId to be
// called before creating the Swarming task so that the Id can be included in
// the Swarming task tags. GetTasksFromDateRange accounts for this skew when
// retrieving tasks. This value can be increased in the future, but can never
// be decreased.
// 6 minutes is based on httputils.DIAL_TIMEOUT + httputils.REQUEST_TIMEOUT,
// which is assumed to be the approximate maximum duration of a successful
// swarming.ApiClient.TriggerTask() call.
MAX_CREATED_TIME_SKEW = 6 * time.Minute
// formatId returns the timestamp and sequence number formatted for a Task or
// Job ID. Format is "<timestamp>_<sequence_num>", where the timestamp is
// formatted using TIMESTAMP_FORMAT and sequence_num is formatted using
func formatId(t time.Time, seq uint64) string {
t = t.UTC()
return fmt.Sprintf("%s_"+SEQUENCE_NUMBER_FORMAT, t.Format(TIMESTAMP_FORMAT), seq)
// ParseId returns the timestamp and sequence number stored in a Task or Job ID.
func ParseId(id string) (time.Time, uint64, error) {
parts := strings.Split(id, "_")
if len(parts) != 2 {
return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q", id)
t, err := time.Parse(TIMESTAMP_FORMAT, parts[0])
if err != nil {
return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; %s", id, err)
var seq uint64
// Add newlines to force Sscanf to match the entire string. Otherwise
// "123hello" will be parsed as 123. Note that Sscanf does not require 16
// digits even though SEQUENCE_NUMBER_FORMAT specifies padding to 16 digits.
i, err := fmt.Sscanf(parts[1]+"\n", SEQUENCE_NUMBER_FORMAT+"\n", &seq)
if err != nil {
return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; %s", id, err)
} else if i != 1 {
return time.Time{}, 0, fmt.Errorf("Unparsable ID: %q; Expected one hex number in %s, got %d", id, parts[1], i)
return t, seq, nil
// packV1 builds a version-1 bucket value, the layout used when
// BUCKET_TASKS_VERSION or BUCKET_JOBS_VERSION is 1: byte 0 is the version
// number (1), bytes 1..8 hold t.UnixNano() in big-endian order, and the rest is
// a copy of serialized (the GOB of the Task or Job). t is the modified time.
func packV1(t time.Time, serialized []byte) []byte {
	const headerLen = 9
	packed := make([]byte, headerLen, headerLen+len(serialized))
	packed[0] = 1
	binary.BigEndian.PutUint64(packed[1:headerLen], uint64(t.UnixNano()))
	// Capacity was reserved above, so this append copies without reallocating.
	return append(packed, serialized...)
}
// unpackV1 decodes a version-1 bucket value (see BUCKET_TASKS_VERSION = 1 and
// BUCKET_JOBS_VERSION = 1). It returns the modified time in UTC and the GOB
// bytes of the Task/Job. The returned GOB is a sub-slice of value, not a copy.
// An error is returned when value is shorter than the 9-byte header or its
// version byte is not 1.
func unpackV1(value []byte) (time.Time, []byte, error) {
	switch {
	case len(value) < 9:
		return time.Time{}, nil, fmt.Errorf("unpackV1 value is too short (%d bytes)", len(value))
	case value[0] != 1:
		return time.Time{}, nil, fmt.Errorf("unpackV1 called for value with version %d", value[0])
	}
	nanos := int64(binary.BigEndian.Uint64(value[1:9]))
	modified := time.Unix(0, nanos).UTC()
	return modified, value[9:], nil
}
// packTask creates a value for the current value of BUCKET_TASKS_VERSION. t is
// the modified time and serialized is the GOB of the Task.
func packTask(t time.Time, serialized []byte) []byte {
return packV1(t, serialized)
// unpackTask gets the modified time and GOB of the Task from a value for any
// supported version. The returned GOB shares structure with value.
func unpackTask(value []byte) (time.Time, []byte, error) {
if len(value) < 1 {
return time.Time{}, nil, fmt.Errorf("unpackTask value is empty")
// Only one version currently supported.
if value[0] != 1 {
return time.Time{}, nil, fmt.Errorf("unpackTask unrecognized version %d", value[0])
return unpackV1(value)
// packJob creates a value for the current value of BUCKET_JOBS_VERSION. t is
// the modified time and serialized is the GOB of the Job.
func packJob(t time.Time, serialized []byte) []byte {
return packV1(t, serialized)
// unpackJob gets the modified time and GOB of the Job from a value for any
// supported version. The returned GOB shares structure with value.
func unpackJob(value []byte) (time.Time, []byte, error) {
if len(value) < 1 {
return time.Time{}, nil, fmt.Errorf("unpackJob value is empty")
// Only one version currently supported.
if value[0] != 1 {
return time.Time{}, nil, fmt.Errorf("unpackJob unrecognized version %d", value[0])
return unpackV1(value)
// localDB accesses a local BoltDB database containing tasks, jobs, and
// comments. Instances are built by NewDB and released by Close.
type localDB struct {
// name is used in logging and metrics to identify this DB.
name string
// filename is used when serving the database backup file. NewDB stores only
// the base name of the path it was given.
filename string
// db is the underlying BoltDB.
db *bolt.DB
// tx fields contain metrics on the number of active transactions. Protected
// by txMutex.
// txCount is the "db-active-tx" counter obtained in NewDB and deleted in
// Close.
txCount metrics2.Counter
// txNextId is the id startTx assigns to the transaction it registers.
txNextId int64
// txActive maps the id of each in-flight transaction to the name passed to
// startTx; endTx removes the entry.
txActive map[int64]string
txMutex sync.RWMutex
// dbMetric is created by NewDB via boltutil.NewDbMetric and deleted in Close.
dbMetric *boltutil.DbMetric
// ModifiedTasks and ModifiedJobs are embedded in order to implement
// db.TaskReader and db.JobReader.
// CommentBox is embedded in order to implement db.CommentDB. CommentBox uses
// this localDB to persist the comments.
// NOTE(review): the embedded fields described by the two comments above are
// not visible in this view — confirm they are declared here.
// Close will send on each of these channels to indicate goroutines should
// stop.
notifyOnClose []chan bool
// startTx monitors when a transaction starts. It records name in txActive
// under the current value of txNextId and returns that id; the caller hands the
// id back to endTx when the transaction finishes. txMutex is released when the
// function returns.
// NOTE(review): neither the matching d.txMutex.Lock() nor an advance of
// txNextId is visible in this view — confirm both are present.
func (d *localDB) startTx(name string) int64 {
defer d.txMutex.Unlock()
id := d.txNextId
d.txActive[id] = name
return id
// endTx monitors when a transaction ends. It removes id (a value previously
// returned by startTx) from txActive; deleting an id that is not present is a
// no-op. txMutex is released when the function returns.
// NOTE(review): the matching d.txMutex.Lock() is not visible in this view —
// confirm it is present.
func (d *localDB) endTx(id int64) {
defer d.txMutex.Unlock()
delete(d.txActive, id)
// reportActiveTx prints out the list of active transactions. With no entries
// in txActive it logs a "(none)" line; otherwise it logs one "<id>\t<name>" row
// per active transaction, joined by newlines. Rows follow map iteration order,
// so their order is not stable between calls. The read lock on txMutex is
// released when the function returns.
// NOTE(review): the matching d.txMutex.RLock() and the first argument of both
// sklog.Infof calls are not visible in this view — confirm.
func (d *localDB) reportActiveTx() {
defer d.txMutex.RUnlock()
if len(d.txActive) == 0 {
sklog.Infof("%s Active Transactions: (none)",
// Pre-size the slice: exactly one row per active transaction.
txs := make([]string, 0, len(d.txActive))
for id, name := range d.txActive {
txs = append(txs, fmt.Sprintf(" %d\t%s", id, name))
sklog.Infof("%s Active Transactions:\n%s",, strings.Join(txs, "\n"))
// tx is a wrapper for a BoltDB transaction which tracks statistics. It
// registers the transaction with startTx, arranges for endTx to run on return,
// and sets up a "db-tx-duration" timer tagged with the transaction name. fn
// runs inside d.db.Update when update is true and inside d.db.View otherwise;
// the error from that call is returned unchanged.
// NOTE(review): the remainder of the deferred metrics2.NewTimer expression is
// not visible in this view — confirm how the timer is stopped.
func (d *localDB) tx(name string, fn func(*bolt.Tx) error, update bool) error {
txId := d.startTx(name)
defer d.endTx(txId)
defer metrics2.NewTimer("db-tx-duration", map[string]string{
"transaction": name,
if update {
return d.db.Update(fn)
} else {
return d.db.View(fn)
// view is a wrapper for the BoltDB instance's View method.
func (d *localDB) view(name string, fn func(*bolt.Tx) error) error {
return d.tx(name, fn, false)
// update is a wrapper for the BoltDB instance's Update method.
func (d *localDB) update(name string, fn func(*bolt.Tx) error) error {
return d.tx(name, fn, true)
// tasksBucket returns the bucket named BUCKET_TASKS from tx. The result of
// tx.Bucket is handed back without a nil check.
// NOTE(review): the original comment said "Returns the tasks bucket with
// FillPercent set", but no FillPercent assignment is visible in this view —
// confirm.
func tasksBucket(tx *bolt.Tx) *bolt.Bucket {
b := tx.Bucket([]byte(BUCKET_TASKS))
return b
// jobsBucket returns the bucket named BUCKET_JOBS from tx. The result of
// tx.Bucket is handed back without a nil check.
// NOTE(review): the original comment said "Returns the jobs bucket with
// FillPercent set", but no FillPercent assignment is visible in this view —
// confirm.
func jobsBucket(tx *bolt.Tx) *bolt.Bucket {
b := tx.Bucket([]byte(BUCKET_JOBS))
return b
// NewDB returns a local DB instance backed by the BoltDB file at filename,
// opened with mode 0600. name labels the instance in its metrics. In a single
// update transaction it creates the tasks, jobs, comments and backup buckets if
// they are missing and loads any persisted comment map, which seeds the
// CommentBox. It also starts a goroutine driven by a one-minute ticker that
// Close signals through notifyOnClose.
// NOTE(review): the error returns after bolt.Open has succeeded do not visibly
// close boltdb, and the bodies of the goroutine's select cases are not visible
// in this view — confirm.
func NewDB(name, filename string) (db.BackupDBCloser, error) {
boltdb, err := bolt.Open(filename, 0600, nil)
if err != nil {
return nil, err
d := &localDB{
name: name,
// Only the base name is kept; it is used when serving the backup file.
filename: path.Base(filename),
db: boltdb,
txCount: metrics2.GetCounter("db-active-tx", map[string]string{
"database": name,
txNextId: 0,
txActive: map[int64]string{},
// Register a stop channel so Close can signal the goroutine below.
stopReportActiveTx := make(chan bool)
d.notifyOnClose = append(d.notifyOnClose, stopReportActiveTx)
go func() {
t := time.NewTicker(time.Minute)
for {
select {
case <-stopReportActiveTx:
case <-t.C:
// comments stays empty unless a persisted map is decoded below.
comments := map[string]*db.RepoComments{}
if err := d.update("NewDB", func(tx *bolt.Tx) error {
if _, err := tx.CreateBucketIfNotExists([]byte(BUCKET_TASKS)); err != nil {
return err
if _, err := tx.CreateBucketIfNotExists([]byte(BUCKET_JOBS)); err != nil {
return err
commentsBucket, err := tx.CreateBucketIfNotExists([]byte(BUCKET_COMMENTS))
if err != nil {
return err
if _, err := tx.CreateBucketIfNotExists([]byte(BUCKET_BACKUP)); err != nil {
return err
// A nil value means no comment map has been written yet.
serializedCommentMap := commentsBucket.Get([]byte(KEY_COMMENT_MAP))
if serializedCommentMap != nil {
if err := gob.NewDecoder(bytes.NewReader(serializedCommentMap)).Decode(&comments); err != nil {
return err
return nil
}); err != nil {
return nil, err
// Later comment changes are persisted through writeCommentsMap.
d.CommentBox = db.NewCommentBoxWithPersistence(comments, d.writeCommentsMap)
if dbMetric, err := boltutil.NewDbMetric(boltdb, []string{BUCKET_TASKS, BUCKET_JOBS, BUCKET_COMMENTS}, map[string]string{"database": name}); err != nil {
return nil, err
} else {
d.dbMetric = dbMetric
return d, nil
// See docs for io.Closer interface.
// Close refuses to run while any transaction is registered in txActive.
// Otherwise it sends true on every notifyOnClose channel, resets txActive,
// deletes the dbMetric and txCount metrics, and finally closes the underlying
// BoltDB. If deleting a metric fails, that error is returned and the BoltDB is
// not closed by this call.
// NOTE(review): the matching d.txMutex.Lock() is not visible in this view, and
// the channel sends happen before the deferred Unlock runs — confirm a
// receiver blocked on txMutex cannot stall them.
func (d *localDB) Close() error {
defer d.txMutex.Unlock()
if len(d.txActive) > 0 {
return fmt.Errorf("Can not close DB when transactions are active.")
// Tell every registered goroutine to stop.
for _, c := range d.notifyOnClose {
c <- true
d.txActive = map[int64]string{}
if err := d.dbMetric.Delete(); err != nil {
return err
d.dbMetric = nil
if err := d.txCount.Delete(); err != nil {
return err
d.txCount = nil
return d.db.Close()
// Sets t.Id either based on t.Created or now. tx must be an update transaction.
func (d *localDB) assignTaskId(tx *bolt.Tx, t *db.Task, now time.Time) error {
if t.Id != "" {
return fmt.Errorf("Task Id already assigned: %v", t.Id)
ts := now
if !util.TimeIsZero(t.Created) {
// TODO(benjaminwagner): Disallow assigning IDs based on t.Created; or
// ensure t.Created is > any ID ts in the DB.
ts = t.Created
seq, err := tasksBucket(tx).NextSequence()
if err != nil {
return err
t.Id = formatId(ts, seq)
return nil
// See docs for TaskDB interface.
func (d *localDB) AssignId(t *db.Task) error {
oldId := t.Id
err := d.update("AssignId", func(tx *bolt.Tx) error {
return d.assignTaskId(tx, t, time.Now())
if err != nil {
t.Id = oldId
return err
// See docs for TaskDB interface.
func (d *localDB) GetTaskById(id string) (*db.Task, error) {
var rv *db.Task
if err := d.view("GetTaskById", func(tx *bolt.Tx) error {
value := tasksBucket(tx).Get([]byte(id))
if value == nil {
return nil
_, serialized, err := unpackTask(value)
if err != nil {
return err
var t db.Task
if err := gob.NewDecoder(bytes.NewReader(serialized)).Decode(&t); err != nil {
return err
rv = &t
return nil
}); err != nil {
return nil, err
if rv == nil {
// Return an error if id is invalid.
if _, _, err := ParseId(id); err != nil {
return nil, err
return rv, nil
// See docs for TaskDB interface.
// GetTasksFromDateRange scans the tasks bucket by key. Keys begin with the
// Id timestamp, so the scan starts MAX_CREATED_TIME_SKEW before start (a
// Task's Created time may be later than its Id timestamp by up to that much)
// and stops at the first key greater than the formatted end time. The decoded
// Tasks are then trimmed by their Created field to those at or after start and
// before end.
// NOTE(review): the bodies of the two trimming loops are not visible in this
// view — confirm they advance startIdx and endIdx.
func (d *localDB) GetTasksFromDateRange(start, end time.Time) ([]*db.Task, error) {
min := []byte(start.Add(-MAX_CREATED_TIME_SKEW).UTC().Format(TIMESTAMP_FORMAT))
max := []byte(end.UTC().Format(TIMESTAMP_FORMAT))
decoder := db.TaskDecoder{}
if err := d.view("GetTasksFromDateRange", func(tx *bolt.Tx) error {
c := tasksBucket(tx).Cursor()
for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() {
_, serialized, err := unpackTask(v)
if err != nil {
return err
// serialized aliases v (see unpackV1), so give the decoder its own copy.
cpy := make([]byte, len(serialized))
copy(cpy, serialized)
if !decoder.Process(cpy) {
return nil
return nil
}); err != nil {
return nil, err
result, err := decoder.Result()
if err != nil {
return nil, err
// The Tasks retrieved based on Id timestamp may include Tasks with Created
// time before/after the desired range.
// TODO(benjaminwagner): Biased binary search might be faster.
startIdx := 0
for startIdx < len(result) && result[startIdx].Created.Before(start) {
endIdx := len(result)
for endIdx > 0 && !result[endIdx-1].Created.Before(end) {
return result[startIdx:endIdx], nil
// See documentation for TaskDB interface.
func (d *localDB) PutTask(t *db.Task) error {
return d.PutTasks([]*db.Task{t})
// validateTask returns an error if the task can not be inserted into the DB.
// Does not modify task.
func (d *localDB) validateTask(task *db.Task) error {
if util.TimeIsZero(task.Created) {
return fmt.Errorf("Created not set. Task %s created time is %s. %v", task.Id, task.Created, task)
if task.Id != "" {
idTs, _, err := ParseId(task.Id)
if err != nil {
return err
if task.Created.Sub(idTs) > MAX_CREATED_TIME_SKEW {
return fmt.Errorf("Created too late. Task %s was assigned Id at %s which is %s before Created time %s, more than MAX_CREATED_TIME_SKEW = %s.", task.Id, idTs, task.Created.Sub(idTs), task.Created, MAX_CREATED_TIME_SKEW)
if task.Created.Before(idTs) {
return fmt.Errorf("Created too early. Task %s Created time was changed or set to %s after Id assigned at %s.", task.Id, task.Created, idTs)
return nil
// See documentation for TaskDB interface.
// PutTasks validates every Task up front, then inserts or updates all of them
// in one update transaction. Tasks without an Id are assigned one. For a Task
// whose Id already has a stored value, the stored modified time must equal the
// Task's DbModified, otherwise db.ErrConcurrentUpdate is returned. Each Task's
// DbModified is set to the transaction's timestamp.
// NOTE(review): several statements are not visible in this view — the call
// that feeds each task to the encoder, the exit from the insert loop when
// e.Next() yields a nil task, the call to revertChanges on error, and the
// consumer of gobs on success — confirm they are present.
func (d *localDB) PutTasks(tasks []*db.Task) error {
// If there is an error during the transaction, we should leave the tasks
// unchanged. Save the old Ids and DbModified times since we set them below.
type savedData struct {
Id string
DbModified time.Time
oldData := make([]savedData, 0, len(tasks))
// Validate and save current data.
for _, t := range tasks {
if err := d.validateTask(t); err != nil {
return err
oldData = append(oldData, savedData{
Id: t.Id,
DbModified: t.DbModified,
// revertChanges restores the Id and DbModified saved above on every task.
revertChanges := func() {
for i, data := range oldData {
tasks[i].Id = data.Id
tasks[i].DbModified = data.DbModified
// gobs collects the serialized form of each written Task, keyed by Id.
gobs := make(map[string][]byte, len(tasks))
err := d.update("PutTasks", func(tx *bolt.Tx) error {
bucket := tasksBucket(tx)
// Assign Ids and encode.
e := db.TaskEncoder{}
// One timestamp is shared by every Task written in this transaction.
now := time.Now().UTC()
for _, t := range tasks {
if t.Id == "" {
if err := d.assignTaskId(tx, t, now); err != nil {
return err
} else {
// Existing Id: reject the write if the stored copy was modified since
// the caller's copy was read.
if value := bucket.Get([]byte(t.Id)); value != nil {
modTs, serialized, err := unpackTask(value)
if err != nil {
return err
if !modTs.Equal(t.DbModified) {
// Decode the stored Task only so it can be included in the warning.
var existing db.Task
if err := gob.NewDecoder(bytes.NewReader(serialized)).Decode(&existing); err != nil {
return err
sklog.Warningf("Cached Task has been modified in the DB. Current:\n%#v\nCached:\n%#v", existing, t)
return db.ErrConcurrentUpdate
t.DbModified = now
// Insert/update.
for {
t, serialized, err := e.Next()
if err != nil {
return err
if t == nil {
gobs[t.Id] = serialized
value := packTask(t.DbModified, serialized)
if err := bucket.Put([]byte(t.Id), value); err != nil {
return err
return nil
if err != nil {
return err
} else {
return nil
// Sets job.Id based on job.Created. tx must be an update transaction.
func (d *localDB) assignJobId(tx *bolt.Tx, job *db.Job) error {
if job.Id != "" {
return fmt.Errorf("Job Id already assigned: %v", job.Id)
if util.TimeIsZero(job.Created) {
// TODO(benjaminwagner): Ensure job.Created is > any ID ts in the DB.
return fmt.Errorf("Job Created time is not set: %s", job.Created)
seq, err := jobsBucket(tx).NextSequence()
if err != nil {
return err
job.Id = formatId(job.Created, seq)
return nil
// See docs for JobDB interface.
func (d *localDB) GetJobById(id string) (*db.Job, error) {
var rv *db.Job
if err := d.view("GetJobById", func(tx *bolt.Tx) error {
value := jobsBucket(tx).Get([]byte(id))
if value == nil {
return nil
_, serialized, err := unpackJob(value)
if err != nil {
return err
var job db.Job
if err := gob.NewDecoder(bytes.NewReader(serialized)).Decode(&job); err != nil {
return err
rv = &job
return nil
}); err != nil {
return nil, err
if rv == nil {
// Return an error if id is invalid.
if _, _, err := ParseId(id); err != nil {
return nil, err
return rv, nil
// See docs for JobDB interface.
func (d *localDB) GetJobsFromDateRange(start, end time.Time) ([]*db.Job, error) {
min := []byte(start.UTC().Format(TIMESTAMP_FORMAT))
max := []byte(end.UTC().Format(TIMESTAMP_FORMAT))
decoder := db.JobDecoder{}
if err := d.view("GetJobsFromDateRange", func(tx *bolt.Tx) error {
c := jobsBucket(tx).Cursor()
for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() {
_, serialized, err := unpackJob(v)
if err != nil {
return err
cpy := make([]byte, len(serialized))
copy(cpy, serialized)
if !decoder.Process(cpy) {
return nil
return nil
}); err != nil {
return nil, err
result, err := decoder.Result()
if err != nil {
return nil, err
return result, nil
// See documentation for JobDB interface.
func (d *localDB) PutJob(job *db.Job) error {
return d.PutJobs([]*db.Job{job})
// validateJob returns an error if the job can not be inserted into the DB. Does not
// modify job.
func (d *localDB) validateJob(job *db.Job) error {
if util.TimeIsZero(job.Created) {
return fmt.Errorf("Created not set. Job %s created time is %s. %v", job.Id, job.Created, job)
if job.Id != "" {
idTs, _, err := ParseId(job.Id)
if err != nil {
return err
if !idTs.Equal(job.Created) {
return fmt.Errorf("Created time has changed since Job ID assigned. Job %s was assigned Id for Created time %s but Created time is now %s.", job.Id, idTs, job.Created)
return nil
// See documentation for JobDB interface.
// PutJobs validates every Job up front, then inserts or updates all of them in
// one update transaction. Jobs without an Id are assigned one. For a Job whose
// Id already has a stored value, the stored modified time must equal the Job's
// DbModified, otherwise db.ErrConcurrentUpdate is returned. Each Job's
// DbModified is set to the transaction's timestamp.
// NOTE(review): several statements are not visible in this view — the call
// that feeds each job to the encoder, the exit from the insert loop when
// e.Next() yields a nil job, the call to revertChanges on error, and the
// consumer of gobs on success — confirm they are present.
func (d *localDB) PutJobs(jobs []*db.Job) error {
// If there is an error during the transaction, we should leave the jobs
// unchanged. Save the old Ids and DbModified times since we set them below.
type savedData struct {
Id string
DbModified time.Time
oldData := make([]savedData, len(jobs))
// Validate and save current data.
for i, job := range jobs {
if err := d.validateJob(job); err != nil {
return err
oldData[i].Id = job.Id
oldData[i].DbModified = job.DbModified
// revertChanges restores the Id and DbModified saved above on every job.
revertChanges := func() {
for i, data := range oldData {
jobs[i].Id = data.Id
jobs[i].DbModified = data.DbModified
// gobs collects the serialized form of each written Job, keyed by Id.
gobs := make(map[string][]byte, len(jobs))
err := d.update("PutJobs", func(tx *bolt.Tx) error {
bucket := jobsBucket(tx)
// Assign Ids and encode.
e := db.JobEncoder{}
// One timestamp is shared by every Job written in this transaction.
now := time.Now().UTC()
for _, job := range jobs {
if job.Id == "" {
if err := d.assignJobId(tx, job); err != nil {
return err
} else {
// Existing Id: reject the write if the stored copy was modified since
// the caller's copy was read.
if value := bucket.Get([]byte(job.Id)); value != nil {
modTs, serialized, err := unpackJob(value)
if err != nil {
return err
if !modTs.Equal(job.DbModified) {
// Decode the stored Job only so it can be included in the warning.
var existing db.Job
if err := gob.NewDecoder(bytes.NewReader(serialized)).Decode(&existing); err != nil {
return err
sklog.Warningf("Cached Job has been modified in the DB. Current:\n%#v\nCached:\n%#v", existing, job)
return db.ErrConcurrentUpdate
job.DbModified = now
// Insert/update.
for {
job, serialized, err := e.Next()
if err != nil {
return err
if job == nil {
gobs[job.Id] = serialized
value := packJob(job.DbModified, serialized)
if err := bucket.Put([]byte(job.Id), value); err != nil {
return err
return nil
if err != nil {
return err
} else {
return nil
// writeCommentsMap is passed to db.NewCommentBoxWithPersistence to persist
// comments after every change. Updates the value stored in BUCKET_COMMENTS.
func (d *localDB) writeCommentsMap(comments map[string]*db.RepoComments) error {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(comments); err != nil {
return err
return d.update("writeCommentsMap", func(tx *bolt.Tx) error {
return tx.Bucket([]byte(BUCKET_COMMENTS)).Put([]byte(KEY_COMMENT_MAP), buf.Bytes())
// See docs for BackupDBCloser interface.
func (d *localDB) WriteBackup(w io.Writer) error {
return d.view("WriteBackup", func(tx *bolt.Tx) error {
_, err := tx.WriteTo(w)
return err
// See docs for BackupDBCloser interface.
func (d *localDB) SetIncrementalBackupTime(t time.Time) error {
t = t.UTC()
val, err := t.MarshalBinary()
if err != nil {
return err
err = d.update("SetIncrementalBackupTime", func(tx *bolt.Tx) error {
return tx.Bucket([]byte(BUCKET_BACKUP)).Put([]byte(KEY_INCREMENTAL_BACKUP_TIME), val)
if err != nil {
return err
return nil
// See docs for BackupDBCloser interface.
func (d *localDB) GetIncrementalBackupTime() (time.Time, error) {
incBackupTime := time.Time{}
err := d.view("GetIncrementalBackupTime", func(tx *bolt.Tx) error {
commentsBucket := tx.Bucket([]byte(BUCKET_BACKUP))
serializedIncBackupTime := commentsBucket.Get([]byte(KEY_INCREMENTAL_BACKUP_TIME))
if serializedIncBackupTime == nil {
return nil
return incBackupTime.UnmarshalBinary(serializedIncBackupTime)
return incBackupTime.UTC(), err