| package aggregator |
| |
| import ( |
| "bytes" |
| "context" |
| "crypto/sha1" |
| "fmt" |
| "io" |
| "io/ioutil" |
| "os" |
| "path/filepath" |
| "regexp" |
| "sort" |
| "strings" |
| "sync" |
| "time" |
| |
| "cloud.google.com/go/storage" |
| "go.skia.org/infra/fuzzer/go/common" |
| "go.skia.org/infra/fuzzer/go/config" |
| "go.skia.org/infra/fuzzer/go/data" |
| "go.skia.org/infra/fuzzer/go/deduplicator" |
| "go.skia.org/infra/fuzzer/go/issues" |
| fstorage "go.skia.org/infra/fuzzer/go/storage" |
| "go.skia.org/infra/go/buildskia" |
| "go.skia.org/infra/go/exec" |
| "go.skia.org/infra/go/fileutil" |
| "go.skia.org/infra/go/metrics2" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/go/util" |
| ) |
| |
// Aggregator is a key part of the fuzzing operation
// (see https://skia.googlesource.com/buildbot/+/master/fuzzer/DESIGN.md).
// It will find new bad fuzzes generated by afl-fuzz and create the metadata required for them. It
// does this by searching in the specified AflOutputPath for new crashes and moves them to a
// temporary holding folder (specified by FuzzPath) for parsing, before sending them through the
// "aggregation pipeline". This pipeline has three steps, Analysis, Upload and Bug Reporting.
// Analysis runs the fuzz against a debug and release version of Skia which produces stacktraces and
// error output. Upload uploads these pieces to Google Storage (GCS). Bug Reporting is used to
// either create or update a bug related to the given fuzz.
type Aggregator struct {
	// WatchForRegressions indicates we are watching for regressions, in which case all fuzzes
	// passed in should be "grey". If they are not, don't deduplicate them (see waitForUploads).
	WatchForRegressions bool
	// MakeBugOnBadFuzz should be set to true if a bug should be created for every bad fuzz found.
	// Example: detecting a bad fuzz in a "stable" fuzzer
	// TODO(kjlubick): consider making this a function that clients supply so they can decide.
	MakeBugOnBadFuzz bool
	// UploadGreyFuzzes should be set if we want to upload grey fuzzes. This should only be true
	// if we are changing versions.
	UploadGreyFuzzes bool

	// storageClient is used by the upload routines to write fuzz artifacts to GCS.
	storageClient *storage.Client

	// issueManager files bugs for bad fuzzes found on stable fuzzers.
	issueManager *issues.IssuesManager

	// For passing the paths of new binaries that should be scanned.
	forAnalysis chan analysisPackage
	// For passing the file names of analyzed fuzzes that should be uploaded from where they rest on
	// disk in `fuzzPath`
	forUpload chan uploadPackage

	// For passing fuzzes that have been uploaded and may need a bug filed or updated.
	forBugReporting chan bugReportingPackage

	// maps category to its deduplicator
	deduplicators map[string]deduplicator.Deduplicator

	// The shutdown channels are used to signal shutdowns. There are two groups, to
	// allow for a softer, cleaner shutdown w/ minimal lost work.
	// Group A (monitoring) includes the scanning and the monitoring routine.
	// Group B (aggregation) include the analysis and upload routines and the bug reporting routine.
	monitoringShutdown  chan bool
	monitoringWaitGroup *sync.WaitGroup
	aggregationShutdown chan bool
	aggregationWaitGroup *sync.WaitGroup

	// Bookkeeping of the names uploaded since the last ClearUploadedFuzzNames();
	// read back via UploadedFuzzNames().
	greyNames      []string
	badNames       []string
	duplicateNames []string
}
| |
const (
	// BAD_FUZZ and GREY_FUZZ are the two legal values of uploadPackage.FuzzType.
	BAD_FUZZ  = "bad"
	GREY_FUZZ = "grey"
	// EMPTY_THRESHOLD is how many consecutive status periods the pipeline queues must all be
	// empty before WaitForEmptyQueues returns.
	EMPTY_THRESHOLD = 5
	// RETRIES_FOR_BAD_FUZZES is how many times analyze() re-runs a fuzz that looks bad, to
	// avoid keeping fuzzes that flaked bad but are hard to reproduce.
	RETRIES_FOR_BAD_FUZZES = 3
)
| |
var (
	// Names of the four analysis binaries, (Debug, Release) x (Clang, ASAN), that
	// buildAnalysisBinaries() copies into the working directory.
	CLANG_DEBUG   = common.TEST_HARNESS_NAME + "_clang_debug"
	CLANG_RELEASE = common.TEST_HARNESS_NAME + "_clang_release"
	ASAN_DEBUG    = common.TEST_HARNESS_NAME + "_asan_debug"
	ASAN_RELEASE  = common.TEST_HARNESS_NAME + "_asan_release"

	// ANALYSIS_EXECUTABLE_LIST is every executable setupAnalysis() copies for an analyzer.
	ANALYSIS_EXECUTABLE_LIST = []string{ASAN_RELEASE, ASAN_DEBUG, CLANG_RELEASE, CLANG_DEBUG}
)
| |
// analysisPackage is a struct containing all the pieces of a fuzz needed to analyse it.
type analysisPackage struct {
	// FilePath is the absolute path of the fuzz as found in the afl-fuzz output folder.
	FilePath string
	// Category is the fuzz category (one of config.Generator.FuzzesToGenerate).
	Category string
}
| |
// uploadPackage is a struct containing all the pieces of a fuzz that need to be uploaded to GCS
type uploadPackage struct {
	// Data holds the analysis output (stdout/stderr of the four harnesses).
	Data data.GCSPackage
	// FilePath is where the (hash-named) copy of the fuzz rests on local disk.
	FilePath string
	// Must be BAD_FUZZ or GREY_FUZZ
	FuzzType string
	Category string
}
| |
// bugReportingPackage is a struct containing the pieces of a fuzz that may need to have
// a bug filed or updated.
type bugReportingPackage struct {
	Data issues.IssueReportingPackage
	// IsBadFuzz is true when the fuzz was uploaded as BAD_FUZZ (bugs are only filed for bad
	// fuzzes; see bugReportingHelper).
	IsBadFuzz bool
}
| |
| // StartAggregator creates and starts a Aggregator based on the Skia Revision in |
| // config.Common.SkiaVersion.Hash. |
| // If there is a problem starting up, an error is returned. Other errors will be logged. |
| func StartAggregator(ctx context.Context, s *storage.Client, im *issues.IssuesManager, startingReports map[string]<-chan data.FuzzReport) (*Aggregator, error) { |
| b := Aggregator{ |
| storageClient: s, |
| issueManager: im, |
| forAnalysis: make(chan analysisPackage, 1000000), |
| forUpload: make(chan uploadPackage, 10000), |
| forBugReporting: make(chan bugReportingPackage, 10000), |
| MakeBugOnBadFuzz: true, |
| UploadGreyFuzzes: false, |
| deduplicators: make(map[string]deduplicator.Deduplicator), |
| monitoringShutdown: make(chan bool, 2), |
| // aggregationShutdown needs to be created with a calculated capacity in start |
| } |
| |
| // preload the deduplicator |
| for _, category := range config.Generator.FuzzesToGenerate { |
| client := fstorage.NewFuzzerGCSClient(s, config.GCS.Bucket) |
| d := deduplicator.NewRemoteDeduplicator(client) |
| d.SetRevision(config.Common.SkiaVersion.Hash) |
| for report := range startingReports[category] { |
| d.IsUnique(report) |
| } |
| b.deduplicators[category] = d |
| } |
| |
| return &b, b.start(ctx) |
| } |
| |
| // start starts up the Aggregator. It refreshes all status it needs and builds a debug and a |
| // release version of Skia for use in analysis. It then spawns the aggregation pipeline and a |
| // monitoring thread. |
| func (agg *Aggregator) start(ctx context.Context) error { |
| // Set the wait groups to fresh |
| agg.monitoringWaitGroup = &sync.WaitGroup{} |
| agg.aggregationWaitGroup = &sync.WaitGroup{} |
| |
| if err := agg.buildAnalysisBinaries(ctx); err != nil { |
| return err |
| } |
| |
| agg.monitoringWaitGroup.Add(1) |
| go agg.scanForNewCandidates() |
| |
| numAnalysisProcesses := config.Aggregator.NumAnalysisProcesses |
| if numAnalysisProcesses <= 0 { |
| // TODO(kjlubick): Actually make this smart based on the number of cores |
| numAnalysisProcesses = 20 |
| } |
| for i := 0; i < numAnalysisProcesses; i++ { |
| agg.aggregationWaitGroup.Add(1) |
| go agg.waitForAnalysis(ctx, i) |
| } |
| |
| numUploadProcesses := config.Aggregator.NumUploadProcesses |
| if numUploadProcesses <= 0 { |
| // TODO(kjlubick): Actually make this smart based on the number of cores/number |
| // of aggregation processes |
| numUploadProcesses = 5 |
| } |
| for i := 0; i < numUploadProcesses; i++ { |
| agg.aggregationWaitGroup.Add(1) |
| go agg.waitForUploads(i) |
| } |
| agg.aggregationWaitGroup.Add(1) |
| go agg.waitForBugReporting() |
| agg.aggregationShutdown = make(chan bool, numAnalysisProcesses+numUploadProcesses+1) |
| // start background routine to monitor queue details |
| agg.monitoringWaitGroup.Add(1) |
| go agg.monitorStatus(numAnalysisProcesses, numUploadProcesses) |
| return nil |
| } |
| |
| // buildAnalysisBinaries creates the 4 executables we need to perform analysis and makes a copy of |
| // them in the executablePath. We need (Debug,Release) x (Clang,ASAN). The copied binaries have |
| // a suffix like _clang_debug |
| func (agg *Aggregator) buildAnalysisBinaries(ctx context.Context) error { |
| if _, err := fileutil.EnsureDirExists(config.Aggregator.FuzzPath); err != nil { |
| return err |
| } |
| if _, err := fileutil.EnsureDirExists(config.Aggregator.WorkingPath); err != nil { |
| return err |
| } |
| if srcExe, err := common.BuildClangHarness(ctx, buildskia.DEBUG_BUILD, true); err != nil { |
| return err |
| } else if err := fileutil.CopyExecutable(srcExe, filepath.Join(config.Aggregator.WorkingPath, CLANG_DEBUG)); err != nil { |
| return err |
| } |
| if srcExe, err := common.BuildClangHarness(ctx, buildskia.RELEASE_BUILD, true); err != nil { |
| return err |
| } else if err := fileutil.CopyExecutable(srcExe, filepath.Join(config.Aggregator.WorkingPath, CLANG_RELEASE)); err != nil { |
| return err |
| } |
| if srcExe, err := common.BuildASANHarness(ctx, buildskia.DEBUG_BUILD, false); err != nil { |
| return err |
| } else if err := fileutil.CopyExecutable(srcExe, filepath.Join(config.Aggregator.WorkingPath, ASAN_DEBUG)); err != nil { |
| return err |
| } |
| if srcExe, err := common.BuildASANHarness(ctx, buildskia.RELEASE_BUILD, false); err != nil { |
| return err |
| } else if err := fileutil.CopyExecutable(srcExe, filepath.Join(config.Aggregator.WorkingPath, ASAN_RELEASE)); err != nil { |
| return err |
| } |
| return nil |
| } |
| |
| // scanForNewCandidates runs scanHelper once every config.Aggregator.RescanPeriod, which scans the |
| // config.Generator.AflOutputPath for new fuzzes. If scanHelper returns an error, this method |
| // will terminate. |
| func (agg *Aggregator) scanForNewCandidates() { |
| defer agg.monitoringWaitGroup.Done() |
| |
| alreadyFoundFuzzes := &SortedStringSlice{} |
| // time.Tick does not fire immediately, so we fire it manually once. |
| if err := agg.scanHelper(alreadyFoundFuzzes); err != nil { |
| sklog.Errorf("Scanner terminated due to error: %v", err) |
| return |
| } |
| sklog.Infof("Sleeping for %s, then waking up to find new crashes again", config.Aggregator.RescanPeriod) |
| |
| t := time.Tick(config.Aggregator.RescanPeriod) |
| for { |
| select { |
| case <-t: |
| if err := agg.scanHelper(alreadyFoundFuzzes); err != nil { |
| sklog.Errorf("Aggregator scanner terminated due to error: %s", err) |
| return |
| } |
| if err := collectFuzzerMetrics(); err != nil { |
| sklog.Warningf("Encountered error getting fuzzer metrics: %s", err) |
| } |
| sklog.Infof("Sleeping for %s, then waking up to find new crashes again", config.Aggregator.RescanPeriod) |
| case <-agg.monitoringShutdown: |
| sklog.Info("Aggregator scanner got signal to shut down") |
| return |
| } |
| |
| } |
| } |
| |
| // scanHelper runs findFuzzPaths for every category, logs the output and keeps |
| // alreadyFoundFuzzes up to date. |
| func (agg *Aggregator) scanHelper(alreadyFoundFuzzes *SortedStringSlice) error { |
| for _, category := range config.Generator.FuzzesToGenerate { |
| newlyFound, err := findFuzzPaths(category, alreadyFoundFuzzes) |
| if err != nil { |
| return err |
| } |
| // AFL-fuzz does not write crashes or hangs atomically, so this workaround waits for a bit after |
| // we have references to where the crashes will be. |
| // TODO(kjlubick), switch to using flock once afl-fuzz implements that upstream. |
| time.Sleep(time.Second) |
| metrics2.GetInt64Metric("fuzzer_fuzzes_newly_found", map[string]string{"fuzz_category": category, "architecture": config.Generator.Architecture}).Update(int64(len(newlyFound))) |
| sklog.Infof("%d newly found %s bad fuzzes", len(newlyFound), category) |
| for _, f := range newlyFound { |
| agg.forAnalysis <- analysisPackage{ |
| FilePath: f, |
| Category: category, |
| } |
| } |
| alreadyFoundFuzzes.Append(newlyFound) |
| } |
| |
| return nil |
| } |
| |
| // findFuzzPaths looks through all the afl-fuzz directories contained in the passed in path and |
| // returns the absolute path to all files that need to be analyzied which are not already in |
| // 'alreadyFoundFuzzes'. It also sends them to the forAnalysis channel when it finds them. |
| // The output from afl-fuzz looks like: |
| // afl_output_path/category/ |
| // -fuzzer0/ |
| // -crashes/ <-- bad fuzzes end up here (search) |
| // -hangs/ |
| // -queue/ <-- all fuzzes end up here (search, may catch ASAN-only) |
| // -fuzzer_stats |
| // -fuzzer1/ |
| // -crashes/ <-- bad fuzzes end up here (search) |
| // -hangs/ |
| // -queue/ <-- all fuzzes end up here (search, may catch ASAN-only) |
| // -fuzzer_stats |
| // -fuzzer2/ |
| // ... |
| func findFuzzPaths(category string, alreadyFoundFuzzes *SortedStringSlice) ([]string, error) { |
| badFuzzPaths := make([]string, 0) |
| |
| scanPath := filepath.Join(config.Generator.AflOutputPath, category) |
| aflDir, err := os.Open(scanPath) |
| if os.IsNotExist(err) { |
| sklog.Warningf("Path to scan %s does not exist. Returning 0 found fuzzes", scanPath) |
| return []string{}, nil |
| } |
| if err != nil { |
| return nil, err |
| } |
| defer util.Close(aflDir) |
| |
| fuzzerFolders, err := aflDir.Readdir(-1) |
| if err != nil { |
| return nil, err |
| } |
| |
| for _, fuzzerFolderInfo := range fuzzerFolders { |
| // fuzzerFolderName an os.FileInfo like fuzzer0, fuzzer1 |
| path := filepath.Join(scanPath, fuzzerFolderInfo.Name()) |
| fuzzerDir, err := os.Open(path) |
| if err != nil { |
| return nil, err |
| } |
| defer util.Close(fuzzerDir) |
| |
| fuzzerContents, err := fuzzerDir.Readdir(-1) |
| if err != nil { |
| return nil, err |
| } |
| for _, info := range fuzzerContents { |
| // Look through fuzzerN/crashes and fuzzerN/queue |
| if info.IsDir() && (strings.HasPrefix(info.Name(), "crashes") || strings.HasPrefix(info.Name(), "queue")) { |
| crashPath := filepath.Join(path, info.Name()) |
| crashDir, err := os.Open(crashPath) |
| if err != nil { |
| return nil, err |
| } |
| defer util.Close(crashDir) |
| |
| crashContents, err := crashDir.Readdir(-1) |
| if err != nil { |
| return nil, err |
| } |
| for _, crash := range crashContents { |
| // Make sure the files are actually crashable files we haven't found before |
| if crash.Name() != "README.txt" && !crash.IsDir() { |
| if fuzzPath := filepath.Join(crashPath, crash.Name()); !alreadyFoundFuzzes.Contains(fuzzPath) { |
| badFuzzPaths = append(badFuzzPaths, fuzzPath) |
| } |
| } |
| } |
| } |
| } |
| } |
| return badFuzzPaths, nil |
| } |
| |
// aflMetrics is the list of keys collectFuzzerMetrics extracts from afl-fuzz's fuzzer_stats
// file (format: http://lcamtuf.coredump.cx/afl/status_screen.txt).
var aflMetrics = []string{"start_time", "last_update", "cycles_done", "execs_done", "execs_per_sec", "paths_total", "paths_found", "paths_imported", "stability"}

// aflRegexps holds one compiled regexp per entry of aflMetrics, lazily built by
// collectFuzzerMetrics; an entry may be nil if its pattern failed to compile.
var aflRegexps = make([]*regexp.Regexp, 0, len(aflMetrics))
| |
| // collectFuzzerMetrics looks through the fuzzer_stats file in the main fuzzer output folder of |
| // each fuzz category and extracts some key metrics from it. The format of this file is |
| // detailed in http://lcamtuf.coredump.cx/afl/status_screen.txt |
| func collectFuzzerMetrics() error { |
| // Compile the regexps for the metrics once |
| if len(aflRegexps) == 0 { |
| for _, m := range aflMetrics { |
| r, err := regexp.Compile(m + `\s+:\s+(\d+)`) |
| if err != nil { |
| sklog.Warningf("Had a problem compiling regexp for %s: %s", m, err) |
| } |
| aflRegexps = append(aflRegexps, r) |
| } |
| } |
| |
| for _, category := range config.Generator.FuzzesToGenerate { |
| statsFile := filepath.Join(config.Generator.AflOutputPath, category, "fuzzer0", "fuzzer_stats") |
| b, err := ioutil.ReadFile(statsFile) |
| if err != nil { |
| sklog.Errorf("Cannot read metrics file %s, continuing anyway: %s", statsFile, err) |
| continue |
| } |
| contents := string(b) |
| for i, m := range aflMetrics { |
| r := aflRegexps[i] |
| if r == nil { |
| continue |
| } |
| metric := fmt.Sprintf("fuzzer_stats_%s", m) |
| if match := r.FindStringSubmatch(contents); match != nil { |
| metrics2.GetInt64Metric(metric, map[string]string{"fuzz_category": category, "architecture": config.Generator.Architecture}).Update(int64(util.SafeAtoi(match[1]))) |
| } |
| } |
| } |
| return nil |
| } |
| |
| // waitForAnalysis waits for files that need to be analyzed (from forAnalysis) and makes a copy of |
| // them in agg.fuzzPath with their hash as a file name. It then analyzes it using the supplied |
| // AnalysisPackage and then signals the results should be uploaded. If any unrecoverable errors |
| // happen, this method terminates. |
| func (agg *Aggregator) waitForAnalysis(ctx context.Context, identifier int) { |
| defer agg.aggregationWaitGroup.Done() |
| defer metrics2.GetCounter("analysis_process_count", nil).Dec(int64(1)) |
| sklog.Infof("Spawning analyzer %d", identifier) |
| |
| // our own unique working folder |
| executableDir := filepath.Join(config.Aggregator.WorkingPath, fmt.Sprintf("analyzer%d", identifier)) |
| if err := setupAnalysis(executableDir); err != nil { |
| sklog.Errorf("Analyzer %d terminated due to error: %s", identifier, err) |
| return |
| } |
| for { |
| select { |
| case badFuzz := <-agg.forAnalysis: |
| err := agg.analysisHelper(ctx, executableDir, badFuzz) |
| if err != nil { |
| sklog.Errorf("Analyzer %d terminated due to error: %s", identifier, err) |
| return |
| } |
| case <-agg.aggregationShutdown: |
| sklog.Infof("Analyzer %d recieved shutdown signal", identifier) |
| return |
| } |
| } |
| } |
| |
| // analysisHelper performs the analysis on the given fuzz and returns an error if anything goes |
| // wrong. On success, the results will be placed in the upload queue. |
| func (agg *Aggregator) analysisHelper(ctx context.Context, executableDir string, badFuzz analysisPackage) error { |
| hash, data, err := calculateHash(badFuzz.FilePath) |
| if err == os.ErrNotExist { |
| // fuzzes from the queue sometimes are transient (e.g. the most recent one, if it is found to be |
| // a duplicate). |
| return nil |
| } |
| if err != nil { |
| return sklog.FmtErrorf("Unexpected error while hashing contents of %#v: %s", badFuzz, err) |
| } |
| newFuzzPath := filepath.Join(config.Aggregator.FuzzPath, hash) |
| if err := ioutil.WriteFile(newFuzzPath, data, 0644); err != nil { |
| return sklog.FmtErrorf("Could not copy %s to %s: %s", badFuzz.FilePath, newFuzzPath, err) |
| } |
| if upload, err := analyze(ctx, executableDir, hash, badFuzz.Category); err != nil { |
| return sklog.FmtErrorf("Problem analyzing %s (originally %s), terminating: %s", newFuzzPath, badFuzz.FilePath, err) |
| } else { |
| agg.forUpload <- upload |
| } |
| return nil |
| } |
| |
| // Setup cleans out the working directory and makes a copy of the Debug and Release fuzz executable |
| // in that directory. |
| func setupAnalysis(workingDirPath string) error { |
| // Delete all previous executables to get a clean start |
| if err := os.RemoveAll(workingDirPath); err != nil && !os.IsNotExist(err) { |
| return sklog.FmtErrorf("Could not clean out analysis directory %s: %s", workingDirPath, err) |
| } |
| if err := os.MkdirAll(workingDirPath, 0755); err != nil { |
| return sklog.FmtErrorf("Could not making analysis directory %s: %s", workingDirPath, err) |
| } |
| |
| // make a copy of the 4 executables that were made in buildAnalysisBinaries() |
| for _, exe := range ANALYSIS_EXECUTABLE_LIST { |
| built_exe := filepath.Join(config.Aggregator.WorkingPath, exe) |
| dst := filepath.Join(workingDirPath, exe) |
| if err := fileutil.CopyExecutable(built_exe, dst); err != nil { |
| return sklog.FmtErrorf("Could not copy executable %s to %s: %s", built_exe, dst, err) |
| } |
| } |
| return nil |
| } |
| |
// analyze simply invokes performAnalysis with a fuzz under both the Debug and Release build. Upon
// completion, it checks to see if the fuzz is a grey fuzz and sets the FuzzType accordingly.
// The returned error is currently always nil.
func analyze(ctx context.Context, workingDirPath, filename, category string) (uploadPackage, error) {
	// Assume the fuzz is bad until the parsed output proves it is grey.
	upload := uploadPackage{
		Data: data.GCSPackage{
			Name:             filename,
			FuzzCategory:     category,
			FuzzArchitecture: config.Generator.Architecture,
			Files:            map[string]data.OutputFiles{},
		},
		FuzzType: BAD_FUZZ,
		FilePath: filepath.Join(config.Aggregator.FuzzPath, filename),
		Category: category,
	}

	attempts := 0
	for {
		attempts++
		// Clang builds: the crash dump arrives on stdout, diagnostics on stderr.
		dump, stderr := performAnalysis(ctx, workingDirPath, CLANG_DEBUG, upload.FilePath, category)
		upload.Data.Files["CLANG_DEBUG"] = data.OutputFiles{
			Key: "CLANG_DEBUG",
			Content: map[string]string{
				"stdout": dump,
				"stderr": stderr,
			},
		}
		dump, stderr = performAnalysis(ctx, workingDirPath, CLANG_RELEASE, upload.FilePath, category)
		upload.Data.Files["CLANG_RELEASE"] = data.OutputFiles{
			Key: "CLANG_RELEASE",
			Content: map[string]string{
				"stdout": dump,
				"stderr": stderr,
			},
		}
		// AddressSanitizer only outputs to stderr
		_, stderr = performAnalysis(ctx, workingDirPath, ASAN_DEBUG, upload.FilePath, category)
		upload.Data.Files["ASAN_DEBUG"] = data.OutputFiles{
			Key: "ASAN_DEBUG",
			Content: map[string]string{
				"stderr": stderr,
			},
		}
		_, stderr = performAnalysis(ctx, workingDirPath, ASAN_RELEASE, upload.FilePath, category)
		upload.Data.Files["ASAN_RELEASE"] = data.OutputFiles{
			Key: "ASAN_RELEASE",
			Content: map[string]string{
				"stderr": stderr,
			},
		}
		// A grey result is final; no retry is needed.
		if r := data.ParseGCSPackage(upload.Data); r.IsGrey() {
			upload.FuzzType = GREY_FUZZ
			break
		}
		// We know the fuzz is bad - re-analyze to avoid fuzzes that may have flaked bad
		// but are hard to reproduce.
		if attempts >= RETRIES_FOR_BAD_FUZZES {
			break
		}
	}
	return upload, nil
}
| |
| // performAnalysis executes a command from the working dir specified using |
| // AnalysisArgs for a given fuzz category. The crash dumps (which |
| // come via standard out) and standard errors are recorded as strings. |
| func performAnalysis(ctx context.Context, workingDirPath, executableName, pathToFile, category string) (string, string) { |
| var dump bytes.Buffer |
| var stdErr bytes.Buffer |
| |
| // GNU timeout is used instead of the option on exec.Command because experimentation with the |
| // latter showed evidence of that way leaking processes, which led to OOM errors. |
| cmd := &exec.Command{ |
| Name: "timeout", |
| Args: common.AnalysisArgsFor(category, "./"+executableName, pathToFile), |
| LogStdout: false, |
| LogStderr: false, |
| Stdout: &dump, |
| Stderr: &stdErr, |
| Dir: workingDirPath, |
| InheritPath: true, |
| Env: []string{common.ASAN_OPTIONS}, |
| Verbose: exec.Debug, |
| } |
| |
| //errors are fine/expected from this, as we are dealing with bad fuzzes |
| if err := exec.Run(ctx, cmd); err != nil { |
| return dump.String(), stdErr.String() |
| } |
| return dump.String(), stdErr.String() |
| } |
| |
| // calcuateHash calculates the sha1 hash of a file, given its path. It returns both the hash as a |
| // hex-encoded string and the contents of the file. |
| func calculateHash(path string) (hash string, data []byte, err error) { |
| data, err = ioutil.ReadFile(path) |
| if err == os.ErrNotExist { |
| return "", nil, err |
| } |
| if err != nil { |
| return "", nil, fmt.Errorf("Problem reading file for hashing %s: %s", path, err) |
| } |
| return fmt.Sprintf("%x", sha1.Sum(data)), data, nil |
| } |
| |
// A SortedStringSlice has a sortable string slice which is always kept sorted. This allows for an
// implementation of Contains that runs in O(log n)
type SortedStringSlice struct {
	strings sort.StringSlice
}

// Contains returns true if the passed in string is in the underlying slice
func (s *SortedStringSlice) Contains(str string) bool {
	// Binary search; the slice is kept sorted by Append.
	idx := sort.SearchStrings(s.strings, str)
	return idx < len(s.strings) && s.strings[idx] == str
}

// Append adds all of the strings to the underlying slice and sorts it
func (s *SortedStringSlice) Append(strs []string) {
	s.strings = append(s.strings, strs...)
	sort.Strings(s.strings)
}
| |
| // waitForUploads waits for uploadPackages to be sent through the forUpload channel and then uploads |
| // them. If any unrecoverable errors happen, this method terminates. |
| func (agg *Aggregator) waitForUploads(identifier int) { |
| defer agg.aggregationWaitGroup.Done() |
| defer metrics2.GetCounter("upload_process_count", nil).Dec(int64(1)) |
| sklog.Infof("Spawning uploader %d", identifier) |
| for { |
| select { |
| case p := <-agg.forUpload: |
| if !agg.UploadGreyFuzzes && p.FuzzType == GREY_FUZZ { |
| sklog.Infof("Skipping upload of grey fuzz %s", p.Data.Name) |
| continue |
| } |
| d, found := agg.deduplicators[p.Category] |
| if !found { |
| sklog.Errorf("Problem in Uploader %d, no deduplicator found for category %q; %#v;", identifier, p.Category, agg.deduplicators) |
| return |
| } |
| if !agg.WatchForRegressions && p.FuzzType != GREY_FUZZ && !d.IsUnique(data.ParseReport(p.Data)) { |
| sklog.Infof("Skipping upload of duplicate fuzz %s", p.Data.Name) |
| agg.duplicateNames = append(agg.duplicateNames, p.Data.Name) |
| continue |
| } |
| if err := agg.upload(p); err != nil { |
| sklog.Errorf("Uploader %d terminated due to error: %s", identifier, err) |
| return |
| } |
| agg.forBugReporting <- bugReportingPackage{ |
| Data: issues.IssueReportingPackage{ |
| FuzzName: p.Data.Name, |
| CommitRevision: config.Common.SkiaVersion.Hash, |
| Category: p.Category, |
| }, |
| IsBadFuzz: p.FuzzType == BAD_FUZZ, |
| } |
| case <-agg.aggregationShutdown: |
| sklog.Infof("Uploader %d recieved shutdown signal", identifier) |
| return |
| } |
| } |
| } |
| |
| // upload breaks apart the uploadPackage into its constituant parts and uploads them to GCS using |
| // some helper methods. |
| func (agg *Aggregator) upload(p uploadPackage) error { |
| sklog.Infof("uploading %s with file %s", p.Data.Name, p.FilePath) |
| if p.FuzzType == GREY_FUZZ { |
| agg.greyNames = append(agg.greyNames, p.Data.Name) |
| } else { |
| agg.badNames = append(agg.badNames, p.Data.Name) |
| } |
| |
| if err := agg.uploadBinaryFromDisk(p, p.Data.Name, p.FilePath); err != nil { |
| return err |
| } |
| if err := agg.uploadString(p, p.Data.Name+"_debug.asan", p.Data.Files["ASAN_DEBUG"].Content["stderr"]); err != nil { |
| return err |
| } |
| if err := agg.uploadString(p, p.Data.Name+"_debug.dump", p.Data.Files["CLANG_DEBUG"].Content["stdout"]); err != nil { |
| return err |
| } |
| if err := agg.uploadString(p, p.Data.Name+"_debug.err", p.Data.Files["CLANG_DEBUG"].Content["stderr"]); err != nil { |
| return err |
| } |
| if err := agg.uploadString(p, p.Data.Name+"_release.asan", p.Data.Files["ASAN_RELEASE"].Content["stderr"]); err != nil { |
| return err |
| } |
| if err := agg.uploadString(p, p.Data.Name+"_release.dump", p.Data.Files["CLANG_RELEASE"].Content["stdout"]); err != nil { |
| return err |
| } |
| return agg.uploadString(p, p.Data.Name+"_release.err", p.Data.Files["CLANG_RELEASE"].Content["stderr"]) |
| } |
| |
| // uploadBinaryFromDisk uploads a binary file on disk to GCS, returning an error if anything |
| // goes wrong. |
| func (agg *Aggregator) uploadBinaryFromDisk(p uploadPackage, fileName, filePath string) error { |
| name := fmt.Sprintf("%s/%s/%s/%s/%s/%s", p.Category, config.Common.SkiaVersion.Hash, config.Generator.Architecture, p.FuzzType, p.Data.Name, fileName) |
| w := agg.storageClient.Bucket(config.GCS.Bucket).Object(name).NewWriter(context.Background()) |
| defer util.Close(w) |
| // We set the encoding to avoid accidental crashes if Chrome were to try to render a fuzzed png |
| // or svg or something. |
| w.ObjectAttrs.ContentEncoding = "application/octet-stream" |
| |
| f, err := os.Open(filePath) |
| if err != nil { |
| return fmt.Errorf("There was a problem reading %s for uploading : %s", filePath, err) |
| } |
| |
| if n, err := io.Copy(w, f); err != nil { |
| return fmt.Errorf("There was a problem uploading binary file %s. Only uploaded %d bytes : %s", name, n, err) |
| } |
| return nil |
| } |
| |
// uploadString uploads the contents of a string as a file to GCS, returning an error if
// anything goes wrong. (The comment previously named uploadBinaryFromDisk by mistake.)
func (agg *Aggregator) uploadString(p uploadPackage, fileName, contents string) error {
	name := fmt.Sprintf("%s/%s/%s/%s/%s/%s", p.Category, config.Common.SkiaVersion.Hash, config.Generator.Architecture, p.FuzzType, p.Data.Name, fileName)
	w := agg.storageClient.Bucket(config.GCS.Bucket).Object(name).NewWriter(context.Background())
	defer util.Close(w)
	// NOTE(review): "text/plain" is a media type, not an encoding; this probably belongs on
	// ObjectAttrs.ContentType — confirm intended GCS metadata.
	w.ObjectAttrs.ContentEncoding = "text/plain"

	if n, err := w.Write([]byte(contents)); err != nil {
		return fmt.Errorf("There was a problem uploading %s. Only uploaded %d bytes: %s", name, n, err)
	}
	return nil
}
| |
| // waitForUploads waits for uploadPackages to be sent through the forUpload channel and then uploads |
| // them. If any unrecoverable errors happen, this method terminates. |
| func (agg *Aggregator) waitForBugReporting() { |
| defer agg.aggregationWaitGroup.Done() |
| sklog.Info("Spawning bug reporting routine") |
| for { |
| select { |
| case p := <-agg.forBugReporting: |
| if err := agg.bugReportingHelper(p); err != nil { |
| sklog.Errorf("Bug reporting terminated due to error: %s", err) |
| return |
| } |
| case <-agg.aggregationShutdown: |
| sklog.Infof("Bug reporting routine recieved shutdown signal") |
| return |
| } |
| } |
| } |
| |
| // bugReportingHelper is a helper function to report bugs if the aggregator is configured to. |
| func (agg *Aggregator) bugReportingHelper(p bugReportingPackage) error { |
| if agg.MakeBugOnBadFuzz && p.IsBadFuzz && common.Status(p.Data.Category) == common.STABLE_FUZZER { |
| sklog.Infof("Creating bug for %s", p.Data.FuzzName) |
| if err := agg.issueManager.CreateBadBugIssue(p.Data, "Crash found on 'stable' fuzzer"); err != nil { |
| sklog.Errorf("Error while creating issue for bad fuzz: %s", err) |
| } |
| } |
| return nil |
| } |
| |
| // monitorStatus sets up the monitoring routine, which reports how big the work queues are and how |
| // many processes are up. |
| func (agg *Aggregator) monitorStatus(numAnalysisProcesses, numUploadProcesses int) { |
| defer agg.monitoringWaitGroup.Done() |
| analysisProcessCount := metrics2.GetCounter("analysis_process_count", nil) |
| analysisProcessCount.Reset() |
| analysisProcessCount.Inc(int64(numAnalysisProcesses)) |
| uploadProcessCount := metrics2.GetCounter("upload_process_count", nil) |
| uploadProcessCount.Reset() |
| uploadProcessCount.Inc(int64(numUploadProcesses)) |
| |
| t := time.Tick(config.Aggregator.StatusPeriod) |
| for { |
| select { |
| case <-agg.monitoringShutdown: |
| sklog.Info("aggregator monitor got signal to shut down") |
| return |
| case <-t: |
| metrics2.GetInt64Metric("fuzzer_queue_size_analysis", nil).Update(int64(len(agg.forAnalysis))) |
| metrics2.GetInt64Metric("fuzzer_queue_size_upload", nil).Update(int64(len(agg.forUpload))) |
| metrics2.GetInt64Metric("fuzzer_queue_size_bug_report", nil).Update(int64(len(agg.forBugReporting))) |
| } |
| } |
| } |
| |
// ShutDown gracefully shuts down the aggregator. Anything that was being processed will finish
// prior to the shutdown.
func (agg *Aggregator) ShutDown() {
	// once for the monitoring and once for the scanning routines; monitoringShutdown is
	// buffered with capacity 2, so these sends do not block.
	agg.monitoringShutdown <- true
	agg.monitoringShutdown <- true
	agg.monitoringWaitGroup.Wait()
	// wait for everything to finish analysis and upload
	agg.WaitForEmptyQueues()

	// signal once for every group b thread we started, which is the capacity of our
	// aggregationShutdown channel.
	for i := len(agg.aggregationShutdown); i < cap(agg.aggregationShutdown); i++ {
		agg.aggregationShutdown <- true
	}
	agg.aggregationWaitGroup.Wait()
}
| |
| // RestartAnalysis restarts the shut down aggregator, assuming that config.Common.SkiaVersion.Hash |
| // is updated to the desired revision of Skia that should be analyzied. |
| // Anything that is in the scanning directory should be cleared out, |
| // lest it be rescanned/analyzed. |
| func (agg *Aggregator) RestartAnalysis(ctx context.Context) error { |
| for _, d := range agg.deduplicators { |
| d.SetRevision(config.Common.SkiaVersion.Hash) |
| } |
| return agg.start(ctx) |
| } |
| |
| // WaitForEmptyQueues will return once there is nothing more in the analysis-upload-report |
| // pipeline, waiting in increments of config.Aggregator.StatusPeriod until it is done. |
| func (agg *Aggregator) WaitForEmptyQueues() { |
| emptyCount := 0 |
| for range time.Tick(config.Aggregator.StatusPeriod) { |
| a := len(agg.forAnalysis) |
| u := len(agg.forUpload) |
| b := len(agg.forBugReporting) |
| sklog.Infof("AnalysisQueue: %d, UploadQueue: %d, BugReportingQueue: %d", a, u, b) |
| if a == 0 && u == 0 && b == 0 { |
| emptyCount++ |
| if emptyCount >= EMPTY_THRESHOLD { |
| break |
| } |
| } else { |
| // reset the counter, there was likely a pending task we didn't see. |
| emptyCount = 0 |
| } |
| |
| sklog.Infof("Waiting %s for the aggregator's queues to be empty %d more times", config.Aggregator.StatusPeriod, EMPTY_THRESHOLD-emptyCount) |
| } |
| } |
| |
| // ForceAnalysis directly adds the given path to the analysis queue, where it will be analyzed, |
| // uploaded and possibly bug reported. |
| func (agg *Aggregator) ForceAnalysis(path, category string) { |
| agg.forAnalysis <- analysisPackage{ |
| FilePath: path, |
| Category: category, |
| } |
| } |
| |
| func (agg *Aggregator) ClearUploadedFuzzNames() { |
| agg.greyNames = []string{} |
| agg.badNames = []string{} |
| agg.duplicateNames = []string{} |
| } |
| |
| func (agg *Aggregator) UploadedFuzzNames() (bad, grey, duplicate []string) { |
| return agg.badNames, agg.greyNames, agg.duplicateNames |
| } |