blob: 153b90c4a5c7658e09088f01ac59ea1c87c5abe1 [file] [log] [blame]
package aggregator
import (
"bytes"
"context"
"crypto/sha1"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"regexp"
"sort"
"strings"
"sync"
"time"
"cloud.google.com/go/storage"
"go.skia.org/infra/fuzzer/go/common"
"go.skia.org/infra/fuzzer/go/config"
"go.skia.org/infra/fuzzer/go/data"
"go.skia.org/infra/fuzzer/go/deduplicator"
"go.skia.org/infra/fuzzer/go/issues"
fstorage "go.skia.org/infra/fuzzer/go/storage"
"go.skia.org/infra/go/buildskia"
"go.skia.org/infra/go/exec"
"go.skia.org/infra/go/fileutil"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
)
// Aggregator is a key part of the fuzzing operation
// (see https://skia.googlesource.com/buildbot/+/master/fuzzer/DESIGN.md).
// It will find new bad fuzzes generated by afl-fuzz and create the metadata required for them. It
// does this by searching in the specified AflOutputPath for new crashes and moves them to a
// temporary holding folder (specified by FuzzPath) for parsing, before sending them through the
// "aggregation pipeline". This pipeline has three steps, Analysis, Upload and Bug Reporting.
// Analysis runs the fuzz against a debug and release version of Skia which produces stacktraces and
// error output. Upload uploads these pieces to Google Storage (GCS). Bug Reporting is used to
// either create or update a bug related to the given fuzz.
type Aggregator struct {
	// If we are watching for regressions, all fuzzes passed in should be "grey". If they are not,
	// don't deduplicate them.
	WatchForRegressions bool
	// Should be set to true if a bug should be created for every bad fuzz found.
	// Example: detecting a bad fuzz in a "stable" fuzzer
	// TODO(kjlubick): consider making this a function that clients supply so they can decide.
	MakeBugOnBadFuzz bool
	// Should be set if we want to upload grey fuzzes. This should only be true
	// if we are changing versions.
	UploadGreyFuzzes bool

	// GCS client used by the upload step.
	storageClient *storage.Client
	// Issue tracker client used by the bug-reporting step.
	issueManager *issues.IssuesManager
	// For passing the paths of new binaries that should be scanned.
	forAnalysis chan analysisPackage
	// For passing the file names of analyzed fuzzes that should be uploaded from where they rest on
	// disk in `fuzzPath`
	forUpload chan uploadPackage
	// For passing analyzed/uploaded fuzzes on to the bug reporting routine.
	forBugReporting chan bugReportingPackage
	// maps category to its deduplicator
	deduplicators map[string]deduplicator.Deduplicator
	// The shutdown channels are used to signal shutdowns. There are two groups, to
	// allow for a softer, cleaner shutdown w/ minimal lost work.
	// Group A (monitoring) includes the scanning and the monitoring routine.
	// Group B (aggregation) include the analysis and upload routines and the bug reporting routine.
	monitoringShutdown  chan bool
	monitoringWaitGroup *sync.WaitGroup
	aggregationShutdown chan bool
	aggregationWaitGroup *sync.WaitGroup
	// Names of fuzzes seen since the last ClearUploadedFuzzNames, partitioned by outcome.
	// NOTE(review): these slices are appended to by multiple uploader goroutines
	// (see waitForUploads/upload) without a lock — confirm whether a mutex is needed.
	greyNames      []string
	badNames       []string
	duplicateNames []string
}
const (
	// The only two legal values for uploadPackage.FuzzType.
	BAD_FUZZ = "bad"
	GREY_FUZZ = "grey"
	// How many consecutive polls WaitForEmptyQueues must observe all three
	// queues empty before it considers the pipeline drained.
	EMPTY_THRESHOLD = 5
	// How many times analyze re-runs a fuzz that looks bad, to weed out fuzzes
	// that only flaked bad once.
	RETRIES_FOR_BAD_FUZZES = 3
)
var (
	// File names of the four analysis binaries that buildAnalysisBinaries copies
	// into the working path: (Clang, ASAN) x (Debug, Release).
	CLANG_DEBUG = common.TEST_HARNESS_NAME + "_clang_debug"
	CLANG_RELEASE = common.TEST_HARNESS_NAME + "_clang_release"
	ASAN_DEBUG = common.TEST_HARNESS_NAME + "_asan_debug"
	ASAN_RELEASE = common.TEST_HARNESS_NAME + "_asan_release"
	// The set of binaries setupAnalysis copies into each analyzer's working dir.
	ANALYSIS_EXECUTABLE_LIST = []string{ASAN_RELEASE, ASAN_DEBUG, CLANG_RELEASE, CLANG_DEBUG}
)
// analysisPackage is a struct containing all the pieces of a fuzz needed to analyse it.
type analysisPackage struct {
	// Absolute path to the fuzz file as written by afl-fuzz.
	FilePath string
	// The fuzz category (one of config.Generator.FuzzesToGenerate).
	Category string
}
// uploadPackage is a struct containing all the pieces of a fuzz that need to be uploaded to GCS
type uploadPackage struct {
	// The analysis outputs (stacktraces/stderr per build flavor) keyed by flavor.
	Data data.GCSPackage
	// Where the hashed copy of the fuzz lives on local disk.
	FilePath string
	// Must be BAD_FUZZ or GREY_FUZZ
	FuzzType string
	Category string
}
// bugReportingPackage is a struct containing the pieces of a fuzz that may need to have
// a bug filed or updated.
type bugReportingPackage struct {
	Data issues.IssueReportingPackage
	// True when the fuzz actually crashed (BAD_FUZZ); grey fuzzes never get bugs.
	IsBadFuzz bool
}
// StartAggregator creates and starts a Aggregator based on the Skia Revision in
// config.Common.SkiaVersion.Hash.
// If there is a problem starting up, an error is returned. Other errors will be logged.
// startingReports supplies, per category, the already-known fuzz reports; each one is fed
// through that category's deduplicator so previously-seen fuzzes are treated as duplicates.
func StartAggregator(ctx context.Context, s *storage.Client, im *issues.IssuesManager, startingReports map[string]<-chan data.FuzzReport) (*Aggregator, error) {
	b := Aggregator{
		storageClient: s,
		issueManager: im,
		// Generous buffers so the scanner and workers rarely block on each other.
		forAnalysis: make(chan analysisPackage, 1000000),
		forUpload: make(chan uploadPackage, 10000),
		forBugReporting: make(chan bugReportingPackage, 10000),
		MakeBugOnBadFuzz: true,
		UploadGreyFuzzes: false,
		deduplicators: make(map[string]deduplicator.Deduplicator),
		// Capacity 2: one shutdown signal each for the scanning and monitoring routines.
		monitoringShutdown: make(chan bool, 2),
		// aggregationShutdown needs to be created with a calculated capacity in start
	}
	// preload the deduplicator with every report we already know about, so that
	// IsUnique returns false for them later.
	for _, category := range config.Generator.FuzzesToGenerate {
		client := fstorage.NewFuzzerGCSClient(s, config.GCS.Bucket)
		d := deduplicator.NewRemoteDeduplicator(client)
		d.SetRevision(config.Common.SkiaVersion.Hash)
		for report := range startingReports[category] {
			d.IsUnique(report)
		}
		b.deduplicators[category] = d
	}
	return &b, b.start(ctx)
}
// start starts up the Aggregator. It refreshes all status it needs and builds a debug and a
// release version of Skia for use in analysis. It then spawns the aggregation pipeline and a
// monitoring thread. An error is returned only if the analysis binaries cannot be built.
func (agg *Aggregator) start(ctx context.Context) error {
	// Set the wait groups to fresh
	agg.monitoringWaitGroup = &sync.WaitGroup{}
	agg.aggregationWaitGroup = &sync.WaitGroup{}
	if err := agg.buildAnalysisBinaries(ctx); err != nil {
		return err
	}
	agg.monitoringWaitGroup.Add(1)
	go agg.scanForNewCandidates()

	numAnalysisProcesses := config.Aggregator.NumAnalysisProcesses
	if numAnalysisProcesses <= 0 {
		// TODO(kjlubick): Actually make this smart based on the number of cores
		numAnalysisProcesses = 20
	}
	numUploadProcesses := config.Aggregator.NumUploadProcesses
	if numUploadProcesses <= 0 {
		// TODO(kjlubick): Actually make this smart based on the number of cores/number
		// of aggregation processes
		numUploadProcesses = 5
	}
	// The shutdown channel must exist BEFORE any goroutine that selects on it is
	// spawned. It was previously assigned after the workers started, which was an
	// unsynchronized write racing with the workers' reads (and until the
	// assignment the workers selected on a nil channel). Capacity is one slot per
	// aggregation goroutine so ShutDown can signal all of them without blocking.
	agg.aggregationShutdown = make(chan bool, numAnalysisProcesses+numUploadProcesses+1)
	for i := 0; i < numAnalysisProcesses; i++ {
		agg.aggregationWaitGroup.Add(1)
		go agg.waitForAnalysis(ctx, i)
	}
	for i := 0; i < numUploadProcesses; i++ {
		agg.aggregationWaitGroup.Add(1)
		go agg.waitForUploads(i)
	}
	agg.aggregationWaitGroup.Add(1)
	go agg.waitForBugReporting()
	// start background routine to monitor queue details
	agg.monitoringWaitGroup.Add(1)
	go agg.monitorStatus(numAnalysisProcesses, numUploadProcesses)
	return nil
}
// buildAnalysisBinaries creates the 4 executables we need to perform analysis and makes a copy of
// them in the executablePath. We need (Debug,Release) x (Clang,ASAN). The copied binaries have
// a suffix like _clang_debug
func (agg *Aggregator) buildAnalysisBinaries(ctx context.Context) error {
	// Both scratch directories must exist before anything is copied into them.
	for _, dir := range []string{config.Aggregator.FuzzPath, config.Aggregator.WorkingPath} {
		if _, err := fileutil.EnsureDirExists(dir); err != nil {
			return err
		}
	}
	// Table of the four builds, in the same order the original code performed them.
	builds := []struct {
		dest  string
		build func() (string, error)
	}{
		{CLANG_DEBUG, func() (string, error) { return common.BuildClangHarness(ctx, buildskia.DEBUG_BUILD, true) }},
		{CLANG_RELEASE, func() (string, error) { return common.BuildClangHarness(ctx, buildskia.RELEASE_BUILD, true) }},
		{ASAN_DEBUG, func() (string, error) { return common.BuildASANHarness(ctx, buildskia.DEBUG_BUILD, false) }},
		{ASAN_RELEASE, func() (string, error) { return common.BuildASANHarness(ctx, buildskia.RELEASE_BUILD, false) }},
	}
	for _, b := range builds {
		srcExe, err := b.build()
		if err != nil {
			return err
		}
		if err := fileutil.CopyExecutable(srcExe, filepath.Join(config.Aggregator.WorkingPath, b.dest)); err != nil {
			return err
		}
	}
	return nil
}
// scanForNewCandidates runs scanHelper once every config.Aggregator.RescanPeriod, which scans the
// config.Generator.AflOutputPath for new fuzzes. If scanHelper returns an error, this method
// will terminate. It also exits when a signal arrives on monitoringShutdown.
func (agg *Aggregator) scanForNewCandidates() {
	defer agg.monitoringWaitGroup.Done()
	alreadyFoundFuzzes := &SortedStringSlice{}
	// time.Tick does not fire immediately, so we fire it manually once.
	if err := agg.scanHelper(alreadyFoundFuzzes); err != nil {
		sklog.Errorf("Scanner terminated due to error: %v", err)
		return
	}
	sklog.Infof("Sleeping for %s, then waking up to find new crashes again", config.Aggregator.RescanPeriod)
	// NOTE(review): time.Tick's underlying ticker is never stopped. That is
	// tolerable for this long-lived goroutine, but it does leak one ticker per
	// (re)start of the aggregator.
	t := time.Tick(config.Aggregator.RescanPeriod)
	for {
		select {
		case <-t:
			if err := agg.scanHelper(alreadyFoundFuzzes); err != nil {
				sklog.Errorf("Aggregator scanner terminated due to error: %s", err)
				return
			}
			// Best-effort scrape of afl-fuzz's fuzzer_stats for metrics; failures
			// only warn and do not stop scanning.
			if err := collectFuzzerMetrics(); err != nil {
				sklog.Warningf("Encountered error getting fuzzer metrics: %s", err)
			}
			sklog.Infof("Sleeping for %s, then waking up to find new crashes again", config.Aggregator.RescanPeriod)
		case <-agg.monitoringShutdown:
			sklog.Info("Aggregator scanner got signal to shut down")
			return
		}
	}
}
// scanHelper runs findFuzzPaths for every fuzz category, publishes a metric and a log line for
// the number of newly found fuzzes, queues each of them for analysis, and records them in
// alreadyFoundFuzzes so they are not re-queued on the next scan.
func (agg *Aggregator) scanHelper(alreadyFoundFuzzes *SortedStringSlice) error {
	for _, category := range config.Generator.FuzzesToGenerate {
		found, err := findFuzzPaths(category, alreadyFoundFuzzes)
		if err != nil {
			return err
		}
		// AFL-fuzz does not write crashes or hangs atomically, so this workaround waits for a bit after
		// we have references to where the crashes will be.
		// TODO(kjlubick), switch to using flock once afl-fuzz implements that upstream.
		time.Sleep(time.Second)
		tags := map[string]string{"fuzz_category": category, "architecture": config.Generator.Architecture}
		metrics2.GetInt64Metric("fuzzer_fuzzes_newly_found", tags).Update(int64(len(found)))
		sklog.Infof("%d newly found %s bad fuzzes", len(found), category)
		for _, fuzzPath := range found {
			agg.forAnalysis <- analysisPackage{
				FilePath: fuzzPath,
				Category: category,
			}
		}
		alreadyFoundFuzzes.Append(found)
	}
	return nil
}
// readDirInfos opens the directory at path, reads all of its entries, and closes the directory
// handle before returning. Closing eagerly here (instead of deferring inside the caller's nested
// loops, as the code previously did) keeps open file descriptors from piling up while scanning
// many fuzzer folders.
func readDirInfos(path string) ([]os.FileInfo, error) {
	dir, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer util.Close(dir)
	return dir.Readdir(-1)
}

// findFuzzPaths looks through all the afl-fuzz directories contained in the passed in path and
// returns the absolute path to all files that need to be analyzed which are not already in
// 'alreadyFoundFuzzes'. (Queueing the paths for analysis is the caller's job; see scanHelper.)
// The output from afl-fuzz looks like:
// afl_output_path/category/
// -fuzzer0/
// -crashes/ <-- bad fuzzes end up here (search)
// -hangs/
// -queue/ <-- all fuzzes end up here (search, may catch ASAN-only)
// -fuzzer_stats
// -fuzzer1/
// -crashes/ <-- bad fuzzes end up here (search)
// -hangs/
// -queue/ <-- all fuzzes end up here (search, may catch ASAN-only)
// -fuzzer_stats
// -fuzzer2/
// ...
func findFuzzPaths(category string, alreadyFoundFuzzes *SortedStringSlice) ([]string, error) {
	badFuzzPaths := make([]string, 0)
	scanPath := filepath.Join(config.Generator.AflOutputPath, category)
	fuzzerFolders, err := readDirInfos(scanPath)
	if os.IsNotExist(err) {
		// The category may simply not have started generating yet.
		sklog.Warningf("Path to scan %s does not exist. Returning 0 found fuzzes", scanPath)
		return []string{}, nil
	}
	if err != nil {
		return nil, err
	}
	for _, fuzzerFolderInfo := range fuzzerFolders {
		// fuzzerFolderInfo is an os.FileInfo like fuzzer0, fuzzer1
		path := filepath.Join(scanPath, fuzzerFolderInfo.Name())
		fuzzerContents, err := readDirInfos(path)
		if err != nil {
			return nil, err
		}
		for _, info := range fuzzerContents {
			// Look through fuzzerN/crashes and fuzzerN/queue
			if info.IsDir() && (strings.HasPrefix(info.Name(), "crashes") || strings.HasPrefix(info.Name(), "queue")) {
				crashPath := filepath.Join(path, info.Name())
				crashContents, err := readDirInfos(crashPath)
				if err != nil {
					return nil, err
				}
				for _, crash := range crashContents {
					// Make sure the files are actually crashable files we haven't found before.
					// afl-fuzz drops a README.txt in crashes/, and sub-directories are not fuzzes.
					if crash.Name() != "README.txt" && !crash.IsDir() {
						if fuzzPath := filepath.Join(crashPath, crash.Name()); !alreadyFoundFuzzes.Contains(fuzzPath) {
							badFuzzPaths = append(badFuzzPaths, fuzzPath)
						}
					}
				}
			}
		}
	}
	return badFuzzPaths, nil
}
// aflMetrics lists the integer fields scraped out of afl-fuzz's fuzzer_stats file.
var aflMetrics = []string{"start_time", "last_update", "cycles_done", "execs_done", "execs_per_sec", "paths_total", "paths_found", "paths_imported", "stability"}

// aflRegexps holds one compiled regexp per entry of aflMetrics (nil where compilation failed).
// It is filled lazily on the first call to collectFuzzerMetrics.
var aflRegexps = make([]*regexp.Regexp, 0, len(aflMetrics))
// collectFuzzerMetrics looks through the fuzzer_stats file in the main fuzzer output folder of
// each fuzz category and extracts some key metrics from it. The format of this file is
// detailed in http://lcamtuf.coredump.cx/afl/status_screen.txt
// A missing/unreadable stats file is logged and skipped; this function never returns an error
// in practice (the error return is kept for the caller's interface).
func collectFuzzerMetrics() error {
	// Compile the regexps for the metrics once. This lazy init is only safe
	// because collectFuzzerMetrics is invoked solely from the single scanner
	// goroutine (see scanForNewCandidates).
	if len(aflRegexps) == 0 {
		for _, m := range aflMetrics {
			r, err := regexp.Compile(m + `\s+:\s+(\d+)`)
			if err != nil {
				// A nil entry is appended so indexes still line up with aflMetrics.
				sklog.Warningf("Had a problem compiling regexp for %s: %s", m, err)
			}
			aflRegexps = append(aflRegexps, r)
		}
	}
	for _, category := range config.Generator.FuzzesToGenerate {
		// Only fuzzer0 (the master instance) is consulted for stats.
		statsFile := filepath.Join(config.Generator.AflOutputPath, category, "fuzzer0", "fuzzer_stats")
		b, err := ioutil.ReadFile(statsFile)
		if err != nil {
			sklog.Errorf("Cannot read metrics file %s, continuing anyway: %s", statsFile, err)
			continue
		}
		contents := string(b)
		for i, m := range aflMetrics {
			r := aflRegexps[i]
			if r == nil {
				// This metric's regexp failed to compile; skip it.
				continue
			}
			metric := fmt.Sprintf("fuzzer_stats_%s", m)
			if match := r.FindStringSubmatch(contents); match != nil {
				metrics2.GetInt64Metric(metric, map[string]string{"fuzz_category": category, "architecture": config.Generator.Architecture}).Update(int64(util.SafeAtoi(match[1])))
			}
		}
	}
	return nil
}
// waitForAnalysis waits for files that need to be analyzed (from forAnalysis) and makes a copy of
// them in agg.fuzzPath with their hash as a file name. It then analyzes them using the analysis
// binaries and signals the results should be uploaded. If any unrecoverable errors
// happen, this method terminates.
func (agg *Aggregator) waitForAnalysis(ctx context.Context, identifier int) {
	defer agg.aggregationWaitGroup.Done()
	// Keep the process-count metric honest when this worker exits.
	defer metrics2.GetCounter("analysis_process_count", nil).Dec(int64(1))
	sklog.Infof("Spawning analyzer %d", identifier)
	// our own unique working folder
	executableDir := filepath.Join(config.Aggregator.WorkingPath, fmt.Sprintf("analyzer%d", identifier))
	if err := setupAnalysis(executableDir); err != nil {
		sklog.Errorf("Analyzer %d terminated due to error: %s", identifier, err)
		return
	}
	for {
		select {
		case badFuzz := <-agg.forAnalysis:
			if err := agg.analysisHelper(ctx, executableDir, badFuzz); err != nil {
				sklog.Errorf("Analyzer %d terminated due to error: %s", identifier, err)
				return
			}
		case <-agg.aggregationShutdown:
			// (typo "recieved" fixed)
			sklog.Infof("Analyzer %d received shutdown signal", identifier)
			return
		}
	}
}
// analysisHelper performs the analysis on the given fuzz and returns an error if anything goes
// wrong. On success, the results will be placed in the upload queue. A fuzz file that has
// vanished between scanning and analysis is silently skipped.
func (agg *Aggregator) analysisHelper(ctx context.Context, executableDir string, badFuzz analysisPackage) error {
	// The local was previously named `data`, which shadowed the imported data package.
	hash, contents, err := calculateHash(badFuzz.FilePath)
	// os.IsNotExist inspects the *os.PathError that file reads actually return;
	// the previous `err == os.ErrNotExist` comparison could never be true.
	if os.IsNotExist(err) {
		// fuzzes from the queue sometimes are transient (e.g. the most recent one, if it is found to be
		// a duplicate).
		return nil
	}
	if err != nil {
		return sklog.FmtErrorf("Unexpected error while hashing contents of %#v: %s", badFuzz, err)
	}
	// Copy the fuzz into the holding area, named by its content hash.
	newFuzzPath := filepath.Join(config.Aggregator.FuzzPath, hash)
	if err := ioutil.WriteFile(newFuzzPath, contents, 0644); err != nil {
		return sklog.FmtErrorf("Could not copy %s to %s: %s", badFuzz.FilePath, newFuzzPath, err)
	}
	if upload, err := analyze(ctx, executableDir, hash, badFuzz.Category); err != nil {
		return sklog.FmtErrorf("Problem analyzing %s (originally %s), terminating: %s", newFuzzPath, badFuzz.FilePath, err)
	} else {
		agg.forUpload <- upload
	}
	return nil
}
// setupAnalysis cleans out the given per-analyzer working directory and makes a copy of the four
// analysis executables (built by buildAnalysisBinaries) in that directory.
func setupAnalysis(workingDirPath string) error {
	// Delete all previous executables to get a clean start
	if err := os.RemoveAll(workingDirPath); err != nil && !os.IsNotExist(err) {
		return sklog.FmtErrorf("Could not clean out analysis directory %s: %s", workingDirPath, err)
	}
	if err := os.MkdirAll(workingDirPath, 0755); err != nil {
		// (error text fixed: was "Could not making")
		return sklog.FmtErrorf("Could not make analysis directory %s: %s", workingDirPath, err)
	}
	// make a copy of the 4 executables that were made in buildAnalysisBinaries()
	for _, exe := range ANALYSIS_EXECUTABLE_LIST {
		builtExe := filepath.Join(config.Aggregator.WorkingPath, exe)
		dst := filepath.Join(workingDirPath, exe)
		if err := fileutil.CopyExecutable(builtExe, dst); err != nil {
			return sklog.FmtErrorf("Could not copy executable %s to %s: %s", builtExe, dst, err)
		}
	}
	return nil
}
// analyze simply invokes performAnalysis with a fuzz under both the Debug and Release build. Upon
// completion, it checks to see if the fuzz is a grey fuzz and sets the FuzzType accordingly.
// A fuzz that parses as bad is re-run up to RETRIES_FOR_BAD_FUZZES times; if it ever comes back
// grey the loop stops immediately and GREY_FUZZ wins. The error return is currently always nil.
func analyze(ctx context.Context, workingDirPath, filename, category string) (uploadPackage, error) {
	upload := uploadPackage{
		Data: data.GCSPackage{
			Name:             filename,
			FuzzCategory:     category,
			FuzzArchitecture: config.Generator.Architecture,
			Files:            map[string]data.OutputFiles{},
		},
		// Assume bad until the parsed output proves grey.
		FuzzType: BAD_FUZZ,
		FilePath: filepath.Join(config.Aggregator.FuzzPath, filename),
		Category: category,
	}
	attempts := 0
	for {
		attempts++
		// Note: the map keys below are the literal strings "CLANG_DEBUG" etc.,
		// not the executable-name variables of the same name.
		dump, stderr := performAnalysis(ctx, workingDirPath, CLANG_DEBUG, upload.FilePath, category)
		upload.Data.Files["CLANG_DEBUG"] = data.OutputFiles{
			Key: "CLANG_DEBUG",
			Content: map[string]string{
				"stdout": dump,
				"stderr": stderr,
			},
		}
		dump, stderr = performAnalysis(ctx, workingDirPath, CLANG_RELEASE, upload.FilePath, category)
		upload.Data.Files["CLANG_RELEASE"] = data.OutputFiles{
			Key: "CLANG_RELEASE",
			Content: map[string]string{
				"stdout": dump,
				"stderr": stderr,
			},
		}
		// AddressSanitizer only outputs to stderr
		_, stderr = performAnalysis(ctx, workingDirPath, ASAN_DEBUG, upload.FilePath, category)
		upload.Data.Files["ASAN_DEBUG"] = data.OutputFiles{
			Key: "ASAN_DEBUG",
			Content: map[string]string{
				"stderr": stderr,
			},
		}
		_, stderr = performAnalysis(ctx, workingDirPath, ASAN_RELEASE, upload.FilePath, category)
		upload.Data.Files["ASAN_RELEASE"] = data.OutputFiles{
			Key: "ASAN_RELEASE",
			Content: map[string]string{
				"stderr": stderr,
			},
		}
		if r := data.ParseGCSPackage(upload.Data); r.IsGrey() {
			upload.FuzzType = GREY_FUZZ
			break
		}
		// We know the fuzz is bad - re-analyze to avoid fuzzes that may have flaked bad
		// but are hard to reproduce.
		if attempts >= RETRIES_FOR_BAD_FUZZES {
			break
		}
	}
	return upload, nil
}
// performAnalysis executes a command from the working dir specified using
// AnalysisArgs for a given fuzz category. The crash dumps (which
// come via standard out) and standard errors are recorded and returned as strings.
func performAnalysis(ctx context.Context, workingDirPath, executableName, pathToFile, category string) (string, string) {
	var stdOut, stdErr bytes.Buffer
	// GNU timeout is used instead of the option on exec.Command because experimentation with the
	// latter showed evidence of that way leaking processes, which led to OOM errors.
	cmd := &exec.Command{
		Name:        "timeout",
		Args:        common.AnalysisArgsFor(category, "./"+executableName, pathToFile),
		LogStdout:   false,
		LogStderr:   false,
		Stdout:      &stdOut,
		Stderr:      &stdErr,
		Dir:         workingDirPath,
		InheritPath: true,
		Env:         []string{common.ASAN_OPTIONS},
		Verbose:     exec.Debug,
	}
	// Errors are fine/expected here, as we are deliberately running bad fuzzes;
	// whatever was captured is returned either way, so the error is discarded.
	_ = exec.Run(ctx, cmd)
	return stdOut.String(), stdErr.String()
}
// calcuateHash calculates the sha1 hash of a file, given its path. It returns both the hash as a
// hex-encoded string and the contents of the file.
func calculateHash(path string) (hash string, data []byte, err error) {
data, err = ioutil.ReadFile(path)
if err == os.ErrNotExist {
return "", nil, err
}
if err != nil {
return "", nil, fmt.Errorf("Problem reading file for hashing %s: %s", path, err)
}
return fmt.Sprintf("%x", sha1.Sum(data)), data, nil
}
// A SortedStringSlice has a sortable string slice which is always kept sorted. This allows for an
// implementation of Contains that runs in O(log n)
type SortedStringSlice struct {
	strings sort.StringSlice
}

// Contains returns true if the passed in string is in the underlying slice
func (s *SortedStringSlice) Contains(str string) bool {
	// Binary search for the insertion point; a hit is an exact match there.
	i := sort.SearchStrings(s.strings, str)
	return i < len(s.strings) && s.strings[i] == str
}

// Append adds all of the strings to the underlying slice and sorts it
func (s *SortedStringSlice) Append(strs []string) {
	s.strings = append(s.strings, strs...)
	sort.Sort(s.strings)
}
// waitForUploads waits for uploadPackages to be sent through the forUpload channel and then uploads
// them, skipping grey fuzzes (unless UploadGreyFuzzes is set) and duplicates. Uploaded fuzzes are
// forwarded to the bug-reporting queue. If any unrecoverable errors happen, this method terminates.
func (agg *Aggregator) waitForUploads(identifier int) {
	defer agg.aggregationWaitGroup.Done()
	// Keep the process-count metric honest when this worker exits.
	defer metrics2.GetCounter("upload_process_count", nil).Dec(int64(1))
	sklog.Infof("Spawning uploader %d", identifier)
	for {
		select {
		case p := <-agg.forUpload:
			if !agg.UploadGreyFuzzes && p.FuzzType == GREY_FUZZ {
				sklog.Infof("Skipping upload of grey fuzz %s", p.Data.Name)
				continue
			}
			d, found := agg.deduplicators[p.Category]
			if !found {
				sklog.Errorf("Problem in Uploader %d, no deduplicator found for category %q; %#v;", identifier, p.Category, agg.deduplicators)
				return
			}
			// When watching for regressions, everything is uploaded; otherwise bad
			// fuzzes that the deduplicator has seen before are dropped here.
			if !agg.WatchForRegressions && p.FuzzType != GREY_FUZZ && !d.IsUnique(data.ParseReport(p.Data)) {
				sklog.Infof("Skipping upload of duplicate fuzz %s", p.Data.Name)
				// NOTE(review): duplicateNames is appended to by every uploader
				// goroutine without a lock — confirm whether a mutex is needed.
				agg.duplicateNames = append(agg.duplicateNames, p.Data.Name)
				continue
			}
			if err := agg.upload(p); err != nil {
				sklog.Errorf("Uploader %d terminated due to error: %s", identifier, err)
				return
			}
			agg.forBugReporting <- bugReportingPackage{
				Data: issues.IssueReportingPackage{
					FuzzName:       p.Data.Name,
					CommitRevision: config.Common.SkiaVersion.Hash,
					Category:       p.Category,
				},
				IsBadFuzz: p.FuzzType == BAD_FUZZ,
			}
		case <-agg.aggregationShutdown:
			// (typo "recieved" fixed)
			sklog.Infof("Uploader %d received shutdown signal", identifier)
			return
		}
	}
}
// upload breaks apart the uploadPackage into its constituant parts and uploads them to GCS using
// some helper methods. It is called from the uploader goroutines (waitForUploads).
func (agg *Aggregator) upload(p uploadPackage) error {
	sklog.Infof("uploading %s with file %s", p.Data.Name, p.FilePath)
	// NOTE(review): greyNames/badNames are appended to by every uploader
	// goroutine without a lock — confirm whether a mutex is needed.
	if p.FuzzType == GREY_FUZZ {
		agg.greyNames = append(agg.greyNames, p.Data.Name)
	} else {
		agg.badNames = append(agg.badNames, p.Data.Name)
	}
	// The raw fuzz, then each analysis artifact as its own GCS object.
	if err := agg.uploadBinaryFromDisk(p, p.Data.Name, p.FilePath); err != nil {
		return err
	}
	if err := agg.uploadString(p, p.Data.Name+"_debug.asan", p.Data.Files["ASAN_DEBUG"].Content["stderr"]); err != nil {
		return err
	}
	if err := agg.uploadString(p, p.Data.Name+"_debug.dump", p.Data.Files["CLANG_DEBUG"].Content["stdout"]); err != nil {
		return err
	}
	if err := agg.uploadString(p, p.Data.Name+"_debug.err", p.Data.Files["CLANG_DEBUG"].Content["stderr"]); err != nil {
		return err
	}
	if err := agg.uploadString(p, p.Data.Name+"_release.asan", p.Data.Files["ASAN_RELEASE"].Content["stderr"]); err != nil {
		return err
	}
	if err := agg.uploadString(p, p.Data.Name+"_release.dump", p.Data.Files["CLANG_RELEASE"].Content["stdout"]); err != nil {
		return err
	}
	return agg.uploadString(p, p.Data.Name+"_release.err", p.Data.Files["CLANG_RELEASE"].Content["stderr"])
}
// uploadBinaryFromDisk uploads a binary file on disk to GCS, returning an error if anything
// goes wrong.
func (agg *Aggregator) uploadBinaryFromDisk(p uploadPackage, fileName, filePath string) error {
	name := fmt.Sprintf("%s/%s/%s/%s/%s/%s", p.Category, config.Common.SkiaVersion.Hash, config.Generator.Architecture, p.FuzzType, p.Data.Name, fileName)
	w := agg.storageClient.Bucket(config.GCS.Bucket).Object(name).NewWriter(context.Background())
	defer util.Close(w)
	// We set the encoding to avoid accidental crashes if Chrome were to try to render a fuzzed png
	// or svg or something.
	// NOTE(review): "application/octet-stream" is a media (content) type, not a
	// transfer encoding — this likely should set w.ObjectAttrs.ContentType instead.
	// Confirm intent before changing, since existing objects carry this metadata.
	w.ObjectAttrs.ContentEncoding = "application/octet-stream"
	f, err := os.Open(filePath)
	if err != nil {
		return fmt.Errorf("There was a problem reading %s for uploading : %s", filePath, err)
	}
	// Close the source file; previously this handle was leaked on every upload.
	defer util.Close(f)
	if n, err := io.Copy(w, f); err != nil {
		return fmt.Errorf("There was a problem uploading binary file %s. Only uploaded %d bytes : %s", name, n, err)
	}
	return nil
}
// uploadString uploads the contents of a string as a file to GCS, returning an error if
// anything goes wrong. (Header previously mis-named this as uploadBinaryFromDisk.)
func (agg *Aggregator) uploadString(p uploadPackage, fileName, contents string) error {
	name := fmt.Sprintf("%s/%s/%s/%s/%s/%s", p.Category, config.Common.SkiaVersion.Hash, config.Generator.Architecture, p.FuzzType, p.Data.Name, fileName)
	w := agg.storageClient.Bucket(config.GCS.Bucket).Object(name).NewWriter(context.Background())
	defer util.Close(w)
	// NOTE(review): "text/plain" is a media (content) type; this likely should
	// set w.ObjectAttrs.ContentType rather than ContentEncoding — confirm intent.
	w.ObjectAttrs.ContentEncoding = "text/plain"
	if n, err := w.Write([]byte(contents)); err != nil {
		return fmt.Errorf("There was a problem uploading %s. Only uploaded %d bytes: %s", name, n, err)
	}
	return nil
}
// waitForBugReporting waits for bugReportingPackages to arrive on the forBugReporting channel
// and hands each one to bugReportingHelper. If any unrecoverable errors happen, this method
// terminates. (The header comment was previously a copy/paste of waitForUploads'.)
func (agg *Aggregator) waitForBugReporting() {
	defer agg.aggregationWaitGroup.Done()
	sklog.Info("Spawning bug reporting routine")
	for {
		select {
		case p := <-agg.forBugReporting:
			if err := agg.bugReportingHelper(p); err != nil {
				sklog.Errorf("Bug reporting terminated due to error: %s", err)
				return
			}
		case <-agg.aggregationShutdown:
			// (typo "recieved" fixed)
			sklog.Infof("Bug reporting routine received shutdown signal")
			return
		}
	}
}
// bugReportingHelper is a helper function to report bugs if the aggregator is configured to:
// a bad fuzz found by a "stable" fuzzer gets an issue created for it. Failures talking to the
// issue tracker are logged, not returned, so one bad issue never stops the pipeline.
func (agg *Aggregator) bugReportingHelper(p bugReportingPackage) error {
	shouldFile := agg.MakeBugOnBadFuzz && p.IsBadFuzz && common.Status(p.Data.Category) == common.STABLE_FUZZER
	if !shouldFile {
		return nil
	}
	sklog.Infof("Creating bug for %s", p.Data.FuzzName)
	if err := agg.issueManager.CreateBadBugIssue(p.Data, "Crash found on 'stable' fuzzer"); err != nil {
		sklog.Errorf("Error while creating issue for bad fuzz: %s", err)
	}
	return nil
}
// monitorStatus sets up the monitoring routine, which reports how big the work queues are and how
// many processes are up. It runs until a signal arrives on monitoringShutdown.
func (agg *Aggregator) monitorStatus(numAnalysisProcesses, numUploadProcesses int) {
	defer agg.monitoringWaitGroup.Done()
	// Reset then seed the worker-count gauges; the workers decrement them as they exit.
	analysisProcessCount := metrics2.GetCounter("analysis_process_count", nil)
	analysisProcessCount.Reset()
	analysisProcessCount.Inc(int64(numAnalysisProcesses))
	uploadProcessCount := metrics2.GetCounter("upload_process_count", nil)
	uploadProcessCount.Reset()
	uploadProcessCount.Inc(int64(numUploadProcesses))
	// NOTE(review): time.Tick's ticker is never stopped; one ticker leaks per
	// (re)start of the aggregator.
	t := time.Tick(config.Aggregator.StatusPeriod)
	for {
		select {
		case <-agg.monitoringShutdown:
			sklog.Info("aggregator monitor got signal to shut down")
			return
		case <-t:
			// Queue depths are just the buffered-channel lengths.
			metrics2.GetInt64Metric("fuzzer_queue_size_analysis", nil).Update(int64(len(agg.forAnalysis)))
			metrics2.GetInt64Metric("fuzzer_queue_size_upload", nil).Update(int64(len(agg.forUpload)))
			metrics2.GetInt64Metric("fuzzer_queue_size_bug_report", nil).Update(int64(len(agg.forBugReporting)))
		}
	}
}
// ShutDown gracefully shuts down the aggregator. Anything that was being processed will finish
// prior to the shutdown. Group A (scanner + monitor) is stopped first so no new work arrives,
// then the pipeline is drained, then group B (analysis/upload/bug reporting) is stopped.
func (agg *Aggregator) ShutDown() {
	// once for the monitoring and once for the scanning routines
	agg.monitoringShutdown <- true
	agg.monitoringShutdown <- true
	agg.monitoringWaitGroup.Wait()
	// wait for everything to finish analysis and upload
	agg.WaitForEmptyQueues()
	// signal once for every group b thread we started, which is the capacity of our
	// aggregationShutdown channel. Starting from len() accounts for any signals
	// already queued but not yet consumed.
	for i := len(agg.aggregationShutdown); i < cap(agg.aggregationShutdown); i++ {
		agg.aggregationShutdown <- true
	}
	agg.aggregationWaitGroup.Wait()
}
// RestartAnalysis restarts the shut down aggregator, assuming that config.Common.SkiaVersion.Hash
// has been updated to the desired revision of Skia that should be analyzed.
// Anything that is in the scanning directory should be cleared out,
// lest it be rescanned/analyzed.
func (agg *Aggregator) RestartAnalysis(ctx context.Context) error {
	// Point every category's deduplicator at the new revision before spinning the
	// pipeline back up.
	for _, dedup := range agg.deduplicators {
		dedup.SetRevision(config.Common.SkiaVersion.Hash)
	}
	return agg.start(ctx)
}
// WaitForEmptyQueues will return once there is nothing more in the analysis-upload-report
// pipeline, polling in increments of config.Aggregator.StatusPeriod until all three queues have
// been observed empty EMPTY_THRESHOLD times in a row.
func (agg *Aggregator) WaitForEmptyQueues() {
	emptyCount := 0
	// Use a stoppable Ticker; the previous time.Tick leaked its ticker on every
	// call (and this method runs on every ShutDown).
	tick := time.NewTicker(config.Aggregator.StatusPeriod)
	defer tick.Stop()
	for range tick.C {
		a := len(agg.forAnalysis)
		u := len(agg.forUpload)
		b := len(agg.forBugReporting)
		sklog.Infof("AnalysisQueue: %d, UploadQueue: %d, BugReportingQueue: %d", a, u, b)
		if a == 0 && u == 0 && b == 0 {
			emptyCount++
			if emptyCount >= EMPTY_THRESHOLD {
				break
			}
		} else {
			// reset the counter, there was likely a pending task we didn't see.
			emptyCount = 0
		}
		sklog.Infof("Waiting %s for the aggregator's queues to be empty %d more times", config.Aggregator.StatusPeriod, EMPTY_THRESHOLD-emptyCount)
	}
}
// ForceAnalysis directly adds the given path to the analysis queue, where it will be analyzed,
// uploaded and possibly bug reported.
func (agg *Aggregator) ForceAnalysis(path, category string) {
	pkg := analysisPackage{
		FilePath: path,
		Category: category,
	}
	agg.forAnalysis <- pkg
}
// ClearUploadedFuzzNames resets the records of which grey, bad and duplicate
// fuzz names have been seen since the last clear.
func (agg *Aggregator) ClearUploadedFuzzNames() {
	// Reset to empty (non-nil) slices, matching the original behavior.
	agg.greyNames, agg.badNames, agg.duplicateNames = []string{}, []string{}, []string{}
}
// UploadedFuzzNames returns the names of the bad, grey and duplicate fuzzes
// recorded since the last call to ClearUploadedFuzzNames.
func (agg *Aggregator) UploadedFuzzNames() (bad, grey, duplicate []string) {
	bad, grey, duplicate = agg.badNames, agg.greyNames, agg.duplicateNames
	return
}