package features

/*
	Perform feature extraction for Swarming tasks.
*/

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"time"

	"go.skia.org/infra/go/dataproc"
	"go.skia.org/infra/go/sklog"
	"go.skia.org/infra/go/util"
	"go.skia.org/infra/statusv2/go/ml/processor"
	"go.skia.org/infra/task_scheduler/go/db"
)

// ExtractRangeV0 extracts features for tasks within the given time range. It
// downloads all tasks in the range from the task DB, stores them in a
// convenient format, and calls into PySpark to perform the actual work.
func ExtractRangeV0(ctx context.Context, d db.TaskReader, start, end time.Time) error {
	tasks, err := d.GetTasksFromDateRange(start, end)
	if err != nil {
		return fmt.Errorf("Failed to retrieve tasks: %s", err)
	}
	sklog.Infof("Found %d tasks from %s to %s", len(tasks), start, end)
	if len(tasks) == 0 {
		return nil
	}

	// Encode the tasks as a JSON file in a temporary dir; the file is
	// shipped to the PySpark job as an input.
	workdir, err := os.MkdirTemp("", "")
	if err != nil {
		return fmt.Errorf("Failed to create temporary dir: %s", err)
	}
	defer util.RemoveAll(workdir)
	tasksJSON := filepath.Join(workdir, "tasks.json")
	if err := util.WithWriteFile(tasksJSON, func(w io.Writer) error {
		return json.NewEncoder(w).Encode(tasks)
	}); err != nil {
		return fmt.Errorf("Failed to write JSON file: %s", err)
	}

	// Run the PySpark job, which performs the actual feature extraction.
	job := &dataproc.PySparkJob{
		PyFile:  "extract_features_v0.py",
		Files:   []string{tasksJSON},
		Args:    []string{"--tasks-json", "tasks.json"},
		Cluster: dataproc.CLUSTER_SKIA,
	}
	out, err := job.Run(ctx)
	sklog.Infof("Output from job:\n%s", out)
	return err
}
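
// A minimal usage sketch for ExtractRangeV0 (illustrative, not part of the
// original code). It assumes ctx is a valid context.Context and taskDb is an
// already-constructed db.TaskReader:
//
//	end := time.Now()
//	start := end.Add(-24 * time.Hour)
//	if err := ExtractRangeV0(ctx, taskDb, start, end); err != nil {
//		sklog.Errorf("Feature extraction failed: %s", err)
//	}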

// StartV0 starts a background processor which runs ExtractRangeV0 over the
// task history in one-hour chunks, once per hour, beginning September 1, 2016.
func StartV0(ctx context.Context, workdir string, d db.TaskReader) error {
	p := processor.Processor{
		BeginningOfTime: time.Date(2016, time.September, 1, 0, 0, 0, 0, time.UTC),
		ChunkSize:       time.Hour,
		Name:            "swarming_log_feature_extraction_v0",
		Frequency:       time.Hour,
		ProcessFn: func(ctx context.Context, start, end time.Time) error {
			return ExtractRangeV0(ctx, d, start, end)
		},
		Window:  24 * time.Hour,
		Workdir: workdir,
	}
	return p.Start(ctx)
}
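
// A minimal wiring sketch for StartV0 (illustrative): a hypothetical main
// package that imports this one. newTaskDb is a placeholder for however a
// db.TaskReader is actually obtained, and this assumes Processor.Start
// launches the periodic loop and returns:
//
//	func main() {
//		ctx := context.Background()
//		d := newTaskDb() // Hypothetical: obtain a db.TaskReader.
//		if err := features.StartV0(ctx, "/tmp/feature_extraction_v0", d); err != nil {
//			sklog.Fatalf("Failed to start feature extraction: %s", err)
//		}
//		select {} // Block forever while the processor runs in the background.
//	}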