blob: d16feb1f4f4b6c1fbea3b54e445eba43a7bea9cc [file] [log] [blame]
package firestore
import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"
"time"
fs "cloud.google.com/go/firestore"
"go.opencensus.io/trace"
"go.skia.org/infra/go/firestore"
"go.skia.org/infra/go/now"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/task_scheduler/go/db"
"go.skia.org/infra/task_scheduler/go/types"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const (
// COLLECTION_JOBS is the name of the Firestore collection in which jobs are
// stored; it is the collection returned by firestoreDB.jobs().
COLLECTION_JOBS = "jobs"
)
// fixJobTimestamps passes each of the job's timestamp fields (Created,
// DbModified, Finished and Requested) through firestore.FixTimestamp and
// stores the result back on the job, modifying it in place.
func fixJobTimestamps(job *types.Job) {
	timestamps := []*time.Time{
		&job.Created,
		&job.DbModified,
		&job.Finished,
		&job.Requested,
	}
	for _, ts := range timestamps {
		*ts = firestore.FixTimestamp(*ts)
	}
}
// jobs looks up the collection named by COLLECTION_JOBS on the underlying
// client and returns a reference to it.
func (d *firestoreDB) jobs() *fs.CollectionRef {
	coll := d.client.Collection(COLLECTION_JOBS)
	return coll
}
// GetJobById implements types.JobReader. It fetches the document with the
// given ID from the jobs collection and decodes it into a types.Job.
//
// A lookup which fails with a gRPC NotFound status is not reported as an
// error; (nil, nil) is returned instead. Any other lookup error, and any
// decoding error, is returned unchanged.
func (d *firestoreDB) GetJobById(ctx context.Context, id string) (*types.Job, error) {
	ref := d.jobs().Doc(id)
	snapshot, err := d.client.Get(ctx, ref, DEFAULT_ATTEMPTS, GET_SINGLE_TIMEOUT)
	if err != nil {
		if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
			// The job does not exist.
			return nil, nil
		}
		return nil, err
	}

	job := new(types.Job)
	if err := snapshot.DataTo(job); err != nil {
		return nil, err
	}
	return job, nil
}
// GetJobsFromDateRange implements types.JobReader. It queries the jobs
// collection via dateRangeHelper for the range [start, end) and returns the
// decoded jobs sorted according to types.JobSlice. If repo is non-empty, the
// query is additionally restricted to that repo.
//
// Documents whose Firestore ID differs from the decoded Job.Id, or whose repo
// does not match a requested repo, are logged as errors and left out of the
// results rather than failing the whole call.
func (d *firestoreDB) GetJobsFromDateRange(ctx context.Context, start, end time.Time, repo string) ([]*types.Job, error) {
	filterByRepo := repo != ""

	// dateRangeHelper calls initBuckets with a goroutine count and collect
	// with an index; each index appends only to its own bucket.
	var buckets [][]*types.Job
	initBuckets := func(numGoroutines int) {
		buckets = make([][]*types.Job, numGoroutines)
		for i := range buckets {
			capacity := estResultSize(end.Sub(start) / time.Duration(numGoroutines))
			buckets[i] = make([]*types.Job, 0, capacity)
		}
	}
	collect := func(idx int, doc *fs.DocumentSnapshot) error {
		var job types.Job
		if err := doc.DataTo(&job); err != nil {
			return err
		}
		switch {
		case doc.Ref.ID != job.Id:
			sklog.Errorf("Job %s is stored with ID %s; GetJobById will not be able to find it!", job.Id, doc.Ref.ID)
		case filterByRepo && job.Repo != repo:
			sklog.Errorf("Query returned job with wrong repo; wanted %q but got %q; job: %+v", repo, job.Repo, job)
		default:
			buckets[idx] = append(buckets[idx], &job)
		}
		return nil
	}

	query := d.jobs().Query
	if filterByRepo {
		query = query.Where(KEY_REPO, "==", repo)
	}
	if err := d.dateRangeHelper(ctx, "GetJobsFromDateRange", query, start, end, initBuckets, collect); err != nil {
		return nil, err
	}

	// Flatten the per-goroutine buckets into a single, exactly-sized slice.
	total := 0
	for i := range buckets {
		total += len(buckets[i])
	}
	merged := make([]*types.Job, 0, total)
	for i := range buckets {
		merged = append(merged, buckets[i]...)
	}
	sort.Sort(types.JobSlice(merged))
	return merged, nil
}
// putJobs sets the contents of the given jobs in Firestore, as part of the
// given transaction. It is called by PutJobs (and therefore PutJob).
//
// isNew and prevModified are indexed in parallel with jobs: isNew[i] states
// whether jobs[i] is expected not to exist in the DB yet, and prevModified[i]
// is the DbModified timestamp which the stored copy of jobs[i] is expected to
// carry.
//
// All of the existing documents are read first. db.ErrConcurrentUpdate is
// returned, and nothing is written, if any of the following holds:
//   - a job which is not new is missing from the DB,
//   - a job which is new already exists in the DB, or
//   - the stored DbModified of an existing job is a different instant than
//     prevModified[i].
//
// Errors from reading, decoding or writing documents are returned as well.
func (d *firestoreDB) putJobs(jobs []*types.Job, isNew []bool, prevModified []time.Time, tx *fs.Transaction) (rvErr error) {
	// Find the previous versions of the jobs. Ensure that they weren't
	// updated concurrently.
	refs := make([]*fs.DocumentRef, 0, len(jobs))
	for _, job := range jobs {
		refs = append(refs, d.jobs().Doc(job.Id))
	}
	docs, err := tx.GetAll(refs)
	if err != nil {
		return err
	}
	d.client.CountReadQueryAndRows(d.jobs().Path, len(docs))
	for idx, doc := range docs {
		if !doc.Exists() {
			if !isNew[idx] {
				// If the job is supposed to exist but does not, then
				// we have a problem.
				sklog.Errorf("Job is not new but wasn't found in the DB: %#v", jobs[idx])
				return db.ErrConcurrentUpdate
			}
			// This is expected for new jobs; there is no previous
			// version to compare against.
			continue
		}

		if isNew[idx] {
			// If the job is not supposed to exist but does, then
			// we have a problem.
			var old types.Job
			if err := doc.DataTo(&old); err != nil {
				return fmt.Errorf("Job has no DbModified timestamp but already exists in the DB. Failed to decode previous job with: %w", err)
			}
			sklog.Errorf("Job has no DbModified timestamp but already exists in the DB! \"New\" job:\n%#v\nExisting job:\n%#v", jobs[idx], old)
			return db.ErrConcurrentUpdate
		}

		// The job already exists; check the DbModified timestamp to ensure
		// that someone else didn't update it.
		var old types.Job
		if err := doc.DataTo(&old); err != nil {
			return err
		}
		// Compare instants with Equal rather than with !=, which would also
		// compare the Location and monotonic clock reading and could report
		// a concurrent update for two representations of the same instant.
		if !old.DbModified.Equal(prevModified[idx]) {
			// Marshalling is only for the log message, so a failure is
			// logged and otherwise ignored.
			oldBytes, err := json.Marshal(old)
			if err != nil {
				sklog.Errorf("Failed to marshal JSON: %s", err)
			}
			newBytes, err := json.Marshal(jobs[idx])
			if err != nil {
				sklog.Errorf("Failed to marshal JSON: %s", err)
			}
			sklog.Errorf("Concurrent update: Job %s in DB has DbModified %s; cached job has DbModified %s. \"New\" job:\n%s\nExisting job:\n%s", old.Id, old.DbModified.Format(time.RFC3339Nano), prevModified[idx].Format(time.RFC3339Nano), string(newBytes), string(oldBytes))
			return db.ErrConcurrentUpdate
		}
	}

	// Set the new contents of the jobs.
	d.client.CountWriteQueryAndRows(d.jobs().Path, len(jobs))
	for _, job := range jobs {
		ref := d.jobs().Doc(job.Id)
		if err := tx.Set(ref, job); err != nil {
			return err
		}
	}
	return nil
}
// PutJob implements types.JobDB. It wraps the job in a one-element slice and
// delegates to PutJobs.
func (d *firestoreDB) PutJob(ctx context.Context, job *types.Job) error {
	single := []*types.Job{job}
	return d.PutJobs(ctx, single)
}
// PutJobs implements types.JobDB. It inserts or updates all of the given jobs
// in a single transaction.
//
// An empty slice is a no-op. An error is returned, without touching any job,
// if there are more than MAX_TRANSACTION_DOCS jobs or if any job has a zero
// Created time. Otherwise every job without an ID is assigned a fresh one,
// every job receives a new DbModified timestamp, and its timestamps are passed
// through fixJobTimestamps before the transaction runs. If the call fails, the
// Id and DbModified of every job are restored to their values on entry.
func (d *firestoreDB) PutJobs(ctx context.Context, jobs []*types.Job) (rvErr error) {
	numJobs := len(jobs)
	if numJobs == 0 {
		return nil
	}
	if numJobs > MAX_TRANSACTION_DOCS {
		return fmt.Errorf("Tried to insert %d jobs but Firestore maximum per transaction is %d.", numJobs, MAX_TRANSACTION_DOCS)
	}

	// Remember the ID and DbModified timestamp each job arrived with, so that
	// they can be put back if the jobs cannot be written to the DB.
	currentTime := firestore.FixTimestamp(now.Now(ctx))
	var (
		isNew        = make([]bool, numJobs)
		prevId       = make([]string, numJobs)
		prevModified = make([]time.Time, numJobs)
	)
	for i, job := range jobs {
		if util.TimeIsZero(job.Created) {
			return fmt.Errorf("Created not set. Job %s created time is %s. %#v", job.Id, job.Created, job)
		}
		prevId[i], prevModified[i] = job.Id, job.DbModified
		// A job which has never been given a DbModified is treated as new.
		isNew[i] = util.TimeIsZero(job.DbModified)
	}
	defer func() {
		if rvErr == nil {
			return
		}
		for i, job := range jobs {
			job.Id = prevId[i]
			job.DbModified = prevModified[i]
		}
	}()

	// logmsg builds a log message to debug skia:9444.
	var logmsg strings.Builder
	if _, err := fmt.Fprintf(&logmsg, "Added/updated Jobs with DbModified %s:", currentTime.Format(time.RFC3339Nano)); err != nil {
		sklog.Warningf("Error building log message: %s", err)
	}

	// Hand out IDs to jobs which lack one and stamp each job with its new
	// DbModified time.
	for _, job := range jobs {
		logmsg.WriteString(" ")
		if job.Id == "" {
			job.Id = firestore.AlphaNumID()
			logmsg.WriteString("+")
		}
		logmsg.WriteString(job.Id)
		if currentTime.After(job.DbModified) {
			job.DbModified = currentTime
		} else {
			// Reusing a DbModified timestamp for two updates risks losing
			// one of them, so step the previous timestamp forward instead.
			job.DbModified = job.DbModified.Add(firestore.TS_RESOLUTION)
			if _, err := fmt.Fprintf(&logmsg, "@%s", job.DbModified.Format(time.RFC3339Nano)); err != nil {
				sklog.Warningf("Error building log message: %s", err)
			}
		}
		fixJobTimestamps(job)
	}

	// Write the jobs to the DB in one transaction.
	detail := fmt.Sprintf("%d jobs", numJobs)
	write := func(ctx context.Context, tx *fs.Transaction) error {
		return d.putJobs(jobs, isNew, prevModified, tx)
	}
	if err := d.client.RunTransaction(ctx, "PutJobs", detail, DEFAULT_ATTEMPTS, PUT_MULTI_TIMEOUT, write); err != nil {
		return err
	}
	sklog.Debug(logmsg.String())
	return nil
}
// PutJobsInChunks implements types.JobDB. It hands the jobs to PutJobs in
// consecutive sub-slices, using util.ChunkIter with a chunk size of
// MAX_TRANSACTION_DOCS, and returns whatever util.ChunkIter returns.
func (d *firestoreDB) PutJobsInChunks(ctx context.Context, jobs []*types.Job) error {
	putChunk := func(start, end int) error {
		return d.PutJobs(ctx, jobs[start:end])
	}
	return util.ChunkIter(len(jobs), MAX_TRANSACTION_DOCS, putChunk)
}
// SearchJobs implements db.JobReader. It returns the jobs which satisfy
// db.MatchJob for the given params, stopping once db.SearchResultLimit
// results have been collected.
//
// Note that params is modified: a nil or zero TimeEnd is replaced with the
// current time, and a nil or zero TimeStart is replaced with 24 hours before
// TimeEnd.
func (d *firestoreDB) SearchJobs(ctx context.Context, params *db.JobSearchParams) ([]*types.Job, error) {
	ctx, span := trace.StartSpan(ctx, "db_SearchJobs", trace.WithSampler(trace.ProbabilitySampler(0.01)))
	defer span.End()

	// Fill in the default time window.
	if params.TimeEnd == nil || util.TimeIsZero(*params.TimeEnd) {
		defaultEnd := now.Now(ctx)
		params.TimeEnd = &defaultEnd
	}
	if params.TimeStart == nil || util.TimeIsZero(*params.TimeStart) {
		defaultStart := params.TimeEnd.Add(-24 * time.Hour)
		params.TimeStart = &defaultStart
	}

	// Firestore requires all multi-column indexes to be created in advance.
	// Because we can't predict which search parameters will be given (and
	// because we don't want to create indexes for every combination of
	// parameters), the query uses only the one parameter we expect to narrow
	// the results the most; the remaining parameters are applied afterwards
	// by filtering the returned documents.
	q := d.jobs().Query
	var term string
	switch {
	case params.BuildbucketBuildID != nil && *params.BuildbucketBuildID != 0:
		term = fmt.Sprintf("BuildBucketBuildId == %d", *params.BuildbucketBuildID)
		q = q.Where("BuildbucketBuildId", "==", *params.BuildbucketBuildID)
	case params.Issue != nil && *params.Issue != "":
		term = fmt.Sprintf("Issue == %s", *params.Issue)
		q = q.Where("Issue", "==", *params.Issue)
	case params.IsForce != nil && *params.IsForce:
		term = fmt.Sprintf("IsForce == %v", *params.IsForce)
		q = q.Where("IsForce", "==", *params.IsForce)
	case params.Revision != nil:
		term = fmt.Sprintf("Revision == %s", *params.Revision)
		q = q.Where("Revision", "==", *params.Revision)
	case params.Status != nil && *params.Status == types.JOB_STATUS_REQUESTED:
		term = fmt.Sprintf("Status == %s", *params.Status)
		q = q.Where("Status", "==", *params.Status)
	default:
		// Nothing more selective was given; fall back to the time window.
		term = fmt.Sprintf("Created in [%s, %s)", params.TimeStart, params.TimeEnd)
		q = q.Where(KEY_CREATED, "<", params.TimeEnd).Where(KEY_CREATED, ">=", params.TimeStart)
		// Repo is compatible with TimeStart and TimeEnd because we have an
		// index for it.
		if params.Repo != nil {
			q = q.Where("Repo", "==", *params.Repo)
			term += fmt.Sprintf(" and Repo == %s", *params.Repo)
		}
	}

	// Run the query, keeping only the documents which match every parameter.
	matches := []*types.Job{}
	visit := func(doc *fs.DocumentSnapshot) error {
		job := new(types.Job)
		if err := doc.DataTo(job); err != nil {
			return err
		}
		if db.MatchJob(job, params) {
			matches = append(matches, job)
		}
		if len(matches) >= db.SearchResultLimit {
			// Returning this sentinel ends the iteration early.
			return db.ErrDoneSearching
		}
		return nil
	}
	err := d.client.IterDocs(ctx, "SearchJobs", term, q, DEFAULT_ATTEMPTS, GET_MULTI_TIMEOUT, visit)
	if err == db.ErrDoneSearching {
		// Reaching the result limit is not a failure.
		err = nil
	}
	sklog.Infof("Searching jobs; terms: %q; results: %d", term, len(matches))
	return matches, err
}