| package ingester |
| |
| import ( |
| "fmt" |
| "io" |
| "net/http" |
| |
| "time" |
| ) |
| |
| import ( |
| storage "code.google.com/p/google-api-go-client/storage/v1" |
| "github.com/golang/glog" |
| metrics "github.com/rcrowley/go-metrics" |
| "skia.googlesource.com/buildbot.git/go/gitinfo" |
| "skia.googlesource.com/buildbot.git/go/gs" |
| "skia.googlesource.com/buildbot.git/go/util" |
| "skia.googlesource.com/buildbot.git/perf/go/config" |
| "skia.googlesource.com/buildbot.git/perf/go/filetilestore" |
| "skia.googlesource.com/buildbot.git/perf/go/types" |
| ) |
| |
var (
	// client is the HTTP client used to fetch files from Google Storage.
	// It is set by Init() and is nil until Init() has been called.
	client *http.Client
)
| |
| // Init initializes the module, the optional http.Client is used to make HTTP |
| // requests to Google Storage. If nil is supplied then a default client is |
| // used. |
| func Init(cl *http.Client) { |
| if cl != nil { |
| client = cl |
| } else { |
| client = util.NewTimeoutClient() |
| } |
| } |
| |
// IngestResultsFiles is passed to NewIngester, it does the actual work of mapping the resultsFiles into the Tiles.
//
// Implementations write the parsed results into tt and use counter to record
// progress (e.g. the number of metrics processed); a non-nil error indicates
// ingestion failed.
type IngestResultsFiles func(tt *TileTracker, resultsFiles []*ResultsFileLocation, counter metrics.Counter) error
| |
// Ingester does the work of loading JSON files from Google Storage and putting
// the data into the TileStore.
//
// TODO(jcgregorio) This needs a refactor since we also use it to drive the ingestion
// of trybot data. It needs to be broken into two pieces, one which feeds ResultsFileLocations,
// and a second optional piece that builds a TileTracker if necessary.
type Ingester struct {
	git            *gitinfo.GitInfo // Checkout used to resolve commit hashes, times and details.
	tileStore      types.TileStore  // Destination store the ingested data is written to.
	storage        *storage.Service // Google Storage API service used to list results files.
	hashToNumber   map[string]int   // Maps a Git hash to its index in the full commit list.
	lastIngestTime time.Time        // NOTE(review): never written in this file — confirm it is still used.
	ingestResults  IngestResultsFiles // Callback that maps results files into the Tiles.
	storageBaseDir string           // Base directory in Google Storage to scan for results files.
	datasetName    string           // Dataset name; used for the tile store and log prefixes.

	// Metrics about the ingestion process.

	elapsedTimePerUpdate metrics.Gauge   // Seconds taken by the most recent Update().
	metricsProcessed     metrics.Counter // Running count of metrics ingested.
	lastSuccessfulUpdate time.Time       // Wall-clock time of the last error-free Update().
	// Field name contains a typo ("Succeessful"); left as-is because it is
	// referenced elsewhere in this file.
	timeSinceLastSucceessfulUpdate metrics.Gauge // Seconds elapsed since lastSuccessfulUpdate.
}
| |
| func newGauge(name, suffix string) metrics.Gauge { |
| return metrics.NewRegisteredGauge("ingester."+name+".gauge."+suffix, metrics.DefaultRegistry) |
| } |
| |
| func newCounter(name, suffix string) metrics.Counter { |
| return metrics.NewRegisteredCounter("ingester."+name+".gauge."+suffix, metrics.DefaultRegistry) |
| } |
| |
| // NewIngester creates an Ingester given the repo and tilestore specified. |
| func NewIngester(git *gitinfo.GitInfo, tileStoreDir string, datasetName string, f IngestResultsFiles, storageBaseDir, metricName string) (*Ingester, error) { |
| storage, err := storage.New(http.DefaultClient) |
| if err != nil { |
| return nil, fmt.Errorf("Failed to create interace to Google Storage: %s\n", err) |
| } |
| |
| i := &Ingester{ |
| git: git, |
| tileStore: filetilestore.NewFileTileStore(tileStoreDir, datasetName, -1), |
| storage: storage, |
| hashToNumber: map[string]int{}, |
| ingestResults: f, |
| storageBaseDir: storageBaseDir, |
| datasetName: datasetName, |
| elapsedTimePerUpdate: newGauge(metricName, "update"), |
| metricsProcessed: newCounter(metricName, "processed"), |
| lastSuccessfulUpdate: time.Now(), |
| timeSinceLastSucceessfulUpdate: newGauge(metricName, "time-since-last-successful-update"), |
| } |
| |
| i.timeSinceLastSucceessfulUpdate.Update(int64(time.Since(i.lastSuccessfulUpdate).Seconds())) |
| go func() { |
| for _ = range time.Tick(time.Minute) { |
| i.timeSinceLastSucceessfulUpdate.Update(int64(time.Since(i.lastSuccessfulUpdate).Seconds())) |
| } |
| }() |
| return i, nil |
| } |
| |
| // lastCommitTimeInTile looks backward in the list of Commits and finds the most recent. |
| func (i *Ingester) lastCommitTimeInTile(tile *types.Tile) time.Time { |
| t := tile.Commits[0].CommitTime |
| for i := (len(tile.Commits) - 1); i >= 0; i-- { |
| if tile.Commits[i].CommitTime != 0 { |
| t = tile.Commits[i].CommitTime |
| break |
| } |
| } |
| if time.Unix(t, 0).Before(time.Time(config.BEGINNING_OF_TIME)) { |
| t = config.BEGINNING_OF_TIME.Unix() |
| } |
| return time.Unix(t, 0) |
| } |
| |
// TileTracker keeps track of which Tile we are on, and allows moving to new
// Tiles, writing out the current Tile when tiles are changed, and creating new
// Tiles if they don't exist.
type TileTracker struct {
	lastTileNum  int             // Index of the currently loaded tile; -1 before the first Move().
	currentTile  *types.Tile     // Tile being filled in; nil before the first Move().
	tileStore    types.TileStore // Store tiles are read from and flushed to.
	hashToNumber map[string]int  // Maps a Git hash to its index in the full commit list.
}
| |
| func NewTileTracker(tileStore types.TileStore, hashToNumber map[string]int) *TileTracker { |
| return &TileTracker{ |
| lastTileNum: -1, |
| currentTile: nil, |
| tileStore: tileStore, |
| hashToNumber: hashToNumber, |
| } |
| } |
| |
| // Move changes the current Tile to the one that contains the given Git hash. |
| func (tt *TileTracker) Move(hash string) error { |
| if _, ok := tt.hashToNumber[hash]; !ok { |
| return fmt.Errorf("Commit does not exist in table: %s", hash) |
| } |
| hashNumber := tt.hashToNumber[hash] |
| tileNum := hashNumber / config.TILE_SIZE |
| if tileNum != tt.lastTileNum { |
| glog.Infof("Moving from tile %d to %d", tt.lastTileNum, tileNum) |
| if tt.lastTileNum != -1 { |
| if err := tt.tileStore.Put(0, tt.lastTileNum, tt.currentTile); err != nil { |
| return fmt.Errorf("TileTracker.Move() failed to flush old tile: %s", err) |
| } |
| } |
| tt.lastTileNum = tileNum |
| var err error |
| tt.currentTile, err = tt.tileStore.GetModifiable(0, tileNum) |
| if err != nil { |
| return fmt.Errorf("UpdateCommitInfo: Failed to get modifiable tile %d: %s", tileNum, err) |
| } |
| if tt.currentTile == nil { |
| tt.currentTile = types.NewTile() |
| tt.currentTile.Scale = 0 |
| tt.currentTile.TileIndex = tileNum |
| } |
| } |
| return nil |
| } |
| |
| // Flush writes the current Tile out, should be called once all updates are |
| // done. Note that Move() writes out the former Tile as it moves to a new Tile, |
| // so this only needs to be called at the end of looping over a set of work. |
| func (tt TileTracker) Flush() { |
| glog.Info("Flushing Tile.") |
| if tt.lastTileNum != -1 { |
| if err := tt.tileStore.Put(0, tt.lastTileNum, tt.currentTile); err != nil { |
| glog.Errorf("Failed to write Tile: %s", err) |
| } |
| } |
| } |
| |
// Tile returns the current Tile.
//
// Returns nil until Move() has succeeded at least once.
func (tt TileTracker) Tile() *types.Tile {
	return tt.currentTile
}
| |
| // Offset returns the Value offset of a commit in a Trace. |
| func (tt TileTracker) Offset(hash string) int { |
| return tt.hashToNumber[hash] % config.TILE_SIZE |
| } |
| |
| // UpdateCommitInfo finds all the new commits since the last time we ran and |
| // adds them to the tiles, creating new tiles if necessary. |
| func (i *Ingester) UpdateCommitInfo(pull bool) error { |
| glog.Infof("Ingest %s: Starting UpdateCommitInfo", i.datasetName) |
| if err := i.git.Update(pull, false); err != nil { |
| return fmt.Errorf("Ingest %s: Failed git pull for during UpdateCommitInfo: %s", i.datasetName, err) |
| } |
| |
| // Compute Git CL number for each Git hash. |
| allHashes := i.git.From(time.Time(config.BEGINNING_OF_TIME)) |
| hashToNumber := map[string]int{} |
| for i, h := range allHashes { |
| hashToNumber[h] = i |
| } |
| i.hashToNumber = hashToNumber |
| |
| // Find the time of the last Commit seen. |
| ts := time.Time(config.BEGINNING_OF_TIME) |
| lastTile, err := i.tileStore.Get(0, -1) |
| if err == nil && lastTile != nil { |
| ts = i.lastCommitTimeInTile(lastTile) |
| } else { |
| // Boundary condition; just started making Tiles and none exist. |
| newTile := types.NewTile() |
| newTile.Scale = 0 |
| newTile.TileIndex = 0 |
| if err := i.tileStore.Put(0, 0, newTile); err != nil { |
| return fmt.Errorf("Ingest %s: UpdateCommitInfo: Failed to write new tile: %s", i.datasetName, err) |
| } |
| } |
| glog.Infof("Ingest %s: UpdateCommitInfo: Last commit timestamp: %s", i.datasetName, ts) |
| |
| // Find all the Git hashes that are new to us. |
| newHashes := i.git.From(ts) |
| |
| glog.Infof("Ingest %s: len(newHashes): from %d", i.datasetName, len(newHashes)) |
| |
| // Add Commit info to the Tiles for each new hash. |
| tt := NewTileTracker(i.tileStore, i.hashToNumber) |
| for _, hash := range newHashes { |
| glog.Infof("For hash %s: %s", i.datasetName, hash) |
| if err := tt.Move(hash); err != nil { |
| glog.Errorf("UpdateCommitInfo Move(%s) failed with: %s", hash, err) |
| continue |
| } |
| details, err := i.git.Details(hash) |
| if err != nil { |
| glog.Errorf("Failed to get details for hash: %s: %s", hash, err) |
| continue |
| } |
| tt.Tile().Commits[tt.Offset(hash)] = &types.Commit{ |
| CommitTime: details.Timestamp.Unix(), |
| Hash: hash, |
| Author: details.Author, |
| } |
| } |
| glog.Infof("Ingest %s: Starting to flush tile.", i.datasetName) |
| tt.Flush() |
| |
| glog.Infof("Ingest %s: Finished UpdateCommitInfo", i.datasetName) |
| return nil |
| } |
| |
| // Update does a single full update, first updating the commits and creating |
| // new tiles if necessary, and then pulling in new data from Google Storage to |
| // populate the traces. |
| func (i *Ingester) Update(pull bool, lastIngestTime int64) error { |
| glog.Info("Beginning ingest.") |
| begin := time.Now() |
| if err := i.UpdateCommitInfo(pull); err != nil { |
| glog.Errorf("Update: Failed to update commit info: %s", err) |
| return err |
| } |
| if err := i.UpdateTiles(lastIngestTime); err != nil { |
| glog.Errorf("Update: Failed to update tiles: %s", err) |
| return err |
| } |
| i.lastSuccessfulUpdate = time.Now() |
| i.elapsedTimePerUpdate.Update(int64(time.Since(begin).Seconds())) |
| glog.Info("Finished ingest.") |
| return nil |
| } |
| |
| // UpdateTiles reads the latest JSON files from Google Storage and converts |
| // them into Traces stored in Tiles. |
| func (i *Ingester) UpdateTiles(lastIngestTime int64) error { |
| glog.Infof("Ingest %s: Starting UpdateTiles", i.datasetName) |
| |
| tt := NewTileTracker(i.tileStore, i.hashToNumber) |
| resultsFiles, err := GetResultsFileLocations(lastIngestTime, i.storage, i.storageBaseDir) |
| if err != nil { |
| return fmt.Errorf("Failed to update tiles: %s", err) |
| } |
| |
| glog.Infof("Ingest %s: Found %d resultsFiles", i.datasetName, len(resultsFiles)) |
| |
| i.ingestResults(tt, resultsFiles, i.metricsProcessed) |
| tt.Flush() |
| |
| glog.Infof("Ingest %s: Finished UpdateTiles", i.datasetName) |
| return nil |
| } |
| |
// ResultsFileLocation is the URI of a single JSON file with results in it.
type ResultsFileLocation struct {
	URI  string // Absolute URI used to fetch the file.
	Name string // Complete path, w/o the gs:// prefix.
}

// NewResultsFileLocation builds a *ResultsFileLocation from the fetchable
// uri and the bucket-relative name of the file.
func NewResultsFileLocation(uri, name string) *ResultsFileLocation {
	loc := &ResultsFileLocation{}
	loc.URI = uri
	loc.Name = name
	return loc
}
| |
| // Fetch retrieves the file contents. |
| // |
| // Callers must call Close() on the returned io.ReadCloser. |
| func (b ResultsFileLocation) Fetch() (io.ReadCloser, error) { |
| for i := 0; i < config.MAX_URI_GET_TRIES; i++ { |
| glog.Infof("Fetching: %s", b.Name) |
| request, err := gs.RequestForStorageURL(b.URI) |
| if err != nil { |
| glog.Warningf("Unable to create Storage MediaURI request: %s\n", err) |
| continue |
| } |
| resp, err := client.Do(request) |
| if err != nil { |
| glog.Warningf("Unable to retrieve URI while creating file iterator: %s", err) |
| continue |
| } |
| if resp.StatusCode != 200 { |
| glog.Errorf("Failed to retrieve: %d %s", resp.StatusCode, resp.Status) |
| } |
| glog.Infof("GS FETCH %s", b.URI) |
| return resp.Body, nil |
| } |
| return nil, fmt.Errorf("Failed fetching JSON after %d attempts", config.MAX_URI_GET_TRIES) |
| } |
| |
| // getFilesFromGSDir returns a list of URIs to get of the JSON files in the |
| // given bucket and directory made after the given timestamp. |
| func getFilesFromGSDir(dir string, earliestTimestamp int64, storage *storage.Service) ([]*ResultsFileLocation, error) { |
| results := []*ResultsFileLocation{} |
| glog.Infoln("Opening directory", dir) |
| |
| req := storage.Objects.List(gs.GS_PROJECT_BUCKET).Prefix(dir) |
| for req != nil { |
| resp, err := req.Do() |
| if err != nil { |
| return nil, fmt.Errorf("Error occurred while listing JSON files: %s", err) |
| } |
| for _, result := range resp.Items { |
| updateDate, _ := time.Parse(time.RFC3339, result.Updated) |
| updateTimestamp := updateDate.Unix() |
| if updateTimestamp > earliestTimestamp { |
| results = append(results, NewResultsFileLocation(result.MediaLink, result.Name)) |
| } |
| } |
| if len(resp.NextPageToken) > 0 { |
| req.PageToken(resp.NextPageToken) |
| } else { |
| req = nil |
| } |
| } |
| return results, nil |
| } |
| |
| // GetResultsFileLocations retrieves a list of ResultsFileLocations from Cloud Storage, each one |
| // corresponding to a single JSON file. |
| func GetResultsFileLocations(last int64, storage *storage.Service, dir string) ([]*ResultsFileLocation, error) { |
| dirs := gs.GetLatestGSDirs(last, time.Now().Unix(), dir) |
| glog.Infoln("GetResultsFileLocations: Looking in dirs: ", dirs) |
| |
| retval := []*ResultsFileLocation{} |
| for _, dir := range dirs { |
| files, err := getFilesFromGSDir(dir, last, storage) |
| if err != nil { |
| return nil, err |
| } |
| retval = append(retval, files...) |
| } |
| return retval, nil |
| } |