blob: 065b3ca9faf98369f809b4e156d5c49165667551 [file] [log] [blame]
package features
// This package performs feature extraction for Swarming tasks.
import (
// ExtractRangeV0 extracts features for tasks within the given time range.
// Downloads all tasks in range from the task DB, stores in a convenient format
// and calls into PySpark to perform the actual work.
func ExtractRangeV0(ctx context.Context, d db.TaskReader, start, end time.Time) error {
tasks, err := d.GetTasksFromDateRange(start, end)
if err != nil {
return fmt.Errorf("Failed to retrieve tasks: %s", err)
sklog.Infof("Found %d tasks from %s to %s", len(tasks), start, end)
if len(tasks) == 0 {
return nil
workdir, err := ioutil.TempDir("", "")
if err != nil {
return fmt.Errorf("Failed to create temporary dir: %s", err)
defer util.RemoveAll(workdir)
tasksJson := path.Join(workdir, "tasks.json")
if err := util.WithWriteFile(tasksJson, func(w io.Writer) error {
return json.NewEncoder(w).Encode(tasks)
}); err != nil {
return fmt.Errorf("Failed to write JSON file: %s", err)
job := &dataproc.PySparkJob{
PyFile: "",
Files: []string{tasksJson},
Args: []string{"--tasks-json", "tasks.json"},
Cluster: dataproc.CLUSTER_SKIA,
out, err := job.Run(ctx)
sklog.Infof("Output from job:\n%s", out)
return err
func StartV0(ctx context.Context, workdir string, d db.TaskReader) error {
p := processor.Processor{
BeginningOfTime: time.Date(2016, time.September, 1, 0, 0, 0, 0, time.UTC),
ChunkSize: time.Hour,
Name: "swarming_log_feature_extraction_v0",
Frequency: time.Hour,
ProcessFn: func(ctx context.Context, start, end time.Time) error {
return ExtractRangeV0(ctx, d, start, end)
Window: 24 * time.Hour,
Workdir: workdir,
return p.Start(ctx)