// blob: 43e66a61dcc834408bc3831691aec4d9b9bdbc56 [file] [log] [blame]
package commit_cache
import (
"encoding/gob"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"regexp"
"sync"
"time"
"github.com/skia-dev/glog"
"skia.googlesource.com/buildbot.git/go/buildbot"
"skia.googlesource.com/buildbot.git/go/gitinfo"
)
/*
Utilities for caching commit data.
*/
const (
	// BLOCK_SIZE is the number of commits stored in a single cache block.
	// Changing this will require completely rebuilding the cache, because
	// serialized blocks are validated against it when they are read back.
	BLOCK_SIZE = 25
	// CACHE_FILE_PATTERN is the fmt pattern for cache file names; the %d
	// verb is filled in with the block number.
	CACHE_FILE_PATTERN = "commit_cache.%d.gob"
	// MAX_BLOCK_AGE is the staleness threshold for a block: if the time
	// since the block was last updated is greater than this, reading from
	// the block forces a reload of its build data.
	MAX_BLOCK_AGE = 2 * time.Minute
)
var (
	// BOT_BLACKLIST holds regular expression patterns indicating which bots
	// to skip. A builder whose name matches any of these patterns is left
	// out of the cached build data (see skipBot).
	BOT_BLACKLIST = []string{
		".*-Trybot",
		".*Housekeeper.*",
	}
)
// init registers the generic container types with gob so that values of
// those concrete types can be encoded and decoded when they are stored
// behind an interface.
func init() {
	generic := []interface{}{
		[]interface{}{},
		map[string]interface{}{},
	}
	for _, v := range generic {
		gob.Register(v)
	}
}
// cacheFileName builds the path of the cache file which stores the block
// with the given number inside the given cache directory.
func cacheFileName(cacheDir string, blockNum int) string {
	base := fmt.Sprintf(CACHE_FILE_PATTERN, blockNum)
	return filepath.Join(cacheDir, base)
}
// blockIdx translates a global commit index into a pair: the index of the
// block which holds the commit, and the position of the commit inside that
// block.
func blockIdx(commitIdx int) (int, int) {
	block := commitIdx / BLOCK_SIZE
	offset := commitIdx - block*BLOCK_SIZE
	return block, offset
}
// skipBot reports whether the bot named b matches any pattern in
// BOT_BLACKLIST and should therefore be skipped. An invalid pattern is
// treated as fatal.
func skipBot(b string) bool {
	for i := range BOT_BLACKLIST {
		matched, err := regexp.MatchString(BOT_BLACKLIST[i], b)
		if err != nil {
			glog.Fatal(err)
		}
		if matched {
			return true
		}
	}
	return false
}
// CommitData is a struct which contains information about a single commit:
// the commit details from the repository plus the builds known for it.
// Changing its structure will require completely rebuilding the cache.
type CommitData struct {
	// The embedded commit details, as returned by the repository.
	*gitinfo.LongCommit
	// Builds maps builder name to the summary of that builder's build of
	// this commit.
	Builds map[string]*buildbot.BuildSummary `json:"builds"`
}
// cacheBlock is an independently-managed slice of the commit cache. Changing
// its structure will require completely rebuilding the cache.
//
// The block is serialized with encoding/gob, which only encodes exported
// fields; the unexported fields are rebuilt when a block is loaded.
type cacheBlock struct {
	// BlockNum is the number of this block; it is also used to derive the
	// cache file name.
	BlockNum int
	// CacheFile is the path of the file this block is written to.
	CacheFile string
	// Commits holds the commits in this block, at most BLOCK_SIZE of them.
	Commits []*CommitData
	// LastLoaded is the UTC time at which the build data was last
	// refreshed; it is compared against MAX_BLOCK_AGE on reads.
	LastLoaded time.Time
	// mutex is taken by the block's methods around access to Commits.
	mutex sync.RWMutex
	// parent is the CommitCache this block was created for.
	parent *CommitCache
	// storedBuilds records the IDs of finished builds which have already
	// been loaded, so that they do not have to be loaded again.
	storedBuilds map[int]bool
}
// fromFile decodes the gob-serialized cache block stored in cacheFile and
// returns it, attached to the given parent cache.
//
// If expectFull is true the block must contain exactly BLOCK_SIZE commits;
// otherwise it may contain fewer, in which case the commit slice is
// re-allocated with capacity BLOCK_SIZE so that it can be extended in place
// later. A block with more than BLOCK_SIZE commits is always an error.
func fromFile(cacheFile string, parent *CommitCache, expectFull bool) (*cacheBlock, error) {
	glog.Infof("Reading commit cache from file %s", cacheFile)
	if _, statErr := os.Stat(cacheFile); os.IsNotExist(statErr) {
		return nil, fmt.Errorf("Commit cache file %s does not exist.", cacheFile)
	}
	in, err := os.Open(cacheFile)
	if err != nil {
		return nil, fmt.Errorf("Failed to open cache file %s: %v", cacheFile, err)
	}
	defer in.Close()

	block := &cacheBlock{}
	if err := gob.NewDecoder(in).Decode(block); err != nil {
		return nil, fmt.Errorf("Failed to read cache file %s: %v", cacheFile, err)
	}
	// Trust the path we were given over whatever was serialized.
	block.CacheFile = cacheFile

	// Validate the block size.
	switch count := len(block.Commits); {
	case count > BLOCK_SIZE:
		return nil, fmt.Errorf("Serialized cache block contains more than %d commits! Did the block size change?", BLOCK_SIZE)
	case count < BLOCK_SIZE:
		// Either BLOCK_SIZE has increased, or this is the last block,
		// which is the only one allowed to be partially filled.
		if expectFull {
			return nil, fmt.Errorf("Serialized cache block contains fewer than %d commits! Did the block size change?", BLOCK_SIZE)
		}
		// Give the commit slice room to grow up to a full block.
		padded := make([]*CommitData, count, BLOCK_SIZE)
		if copied := copy(padded, block.Commits); copied != count {
			return nil, fmt.Errorf("Failed to re-slice commit data; Copied %d of %d items.", copied, count)
		}
		block.Commits = padded
	}

	block.parent = parent
	// Rebuild the set of finished builds from the decoded commits.
	block.storedBuilds = map[int]bool{}
	for _, commit := range block.Commits {
		for _, summary := range commit.Builds {
			if summary.Finished {
				block.storedBuilds[summary.Id] = true
			}
		}
	}
	glog.Infof("Done reading cache block %d (%d commits) from %s", block.BlockNum, len(block.Commits), cacheFile)
	return block, nil
}
// toFile serializes the cache block to b.CacheFile using gob, replacing any
// previous contents of the file. The caller is expected to hold the block's
// lock, since the block's fields are read while encoding.
//
// The error from closing the file is reported as well: a failed Close after
// writing can mean that the data never reached the disk.
func (b *cacheBlock) toFile() error {
	glog.Infof("Writing commit cache block %d (%d commits) to file %s", b.BlockNum, len(b.Commits), b.CacheFile)
	f, err := os.Create(b.CacheFile)
	if err != nil {
		return fmt.Errorf("Failed to open/create cache file %s: %v", b.CacheFile, err)
	}
	if err := gob.NewEncoder(f).Encode(b); err != nil {
		// The encode error is the interesting one; closing is best-effort.
		_ = f.Close()
		return fmt.Errorf("Failed to write cache file %s: %v", b.CacheFile, err)
	}
	if err := f.Close(); err != nil {
		return fmt.Errorf("Failed to close cache file %s: %v", b.CacheFile, err)
	}
	glog.Infof("Done writing commit cache to %s", b.CacheFile)
	return nil
}
// NumCommits reports how many commits are currently stored in this block.
// It takes the block's read lock for the duration of the call.
func (b *cacheBlock) NumCommits() int {
	b.mutex.RLock()
	defer b.mutex.RUnlock()
	count := len(b.Commits)
	return count
}
// Get returns the CommitData at the given index within this block. If the
// block's build data is older than MAX_BLOCK_AGE it is refreshed first, and
// any error from that refresh is returned. An index outside of
// [0, number of commits) yields an error.
func (b *cacheBlock) Get(idx int) (*CommitData, error) {
	// LastLoaded is written by UpdateBuilds under the write lock, so it
	// must be read under the lock too. The lock is released again before
	// calling UpdateBuilds, which acquires it itself.
	b.mutex.RLock()
	lastLoaded := b.LastLoaded
	b.mutex.RUnlock()

	// Force a reload if needed.
	if time.Now().UTC().Sub(lastLoaded) > MAX_BLOCK_AGE {
		glog.Infof("Block %d is too old (last loaded at %s). Forcing reload.", b.BlockNum, lastLoaded)
		if err := b.UpdateBuilds(); err != nil {
			return nil, err
		}
	}

	b.mutex.RLock()
	defer b.mutex.RUnlock()
	if idx < 0 || idx >= len(b.Commits) {
		return nil, fmt.Errorf("Index out of range: %d not in [%d, %d)", idx, 0, len(b.Commits))
	}
	return b.Commits[idx], nil
}
// Slice returns the commits in [startIdx, endIdx) from this block. The range
// is validated first; then, if the block's build data is older than
// MAX_BLOCK_AGE, it is refreshed before the slice is taken. The returned
// slice shares its backing array with the block.
func (b *cacheBlock) Slice(startIdx, endIdx int) ([]*CommitData, error) {
	// Take a snapshot under the read lock and release it before any
	// reload: UpdateBuilds acquires the lock itself, and returning with a
	// deferred RUnlock pending on an already-released lock would panic.
	b.mutex.RLock()
	numCommits := len(b.Commits)
	lastLoaded := b.LastLoaded
	b.mutex.RUnlock()

	if startIdx < 0 || startIdx > endIdx || endIdx > numCommits {
		return nil, fmt.Errorf("Index out of range: (%d < 0 || %d > %d || %d > %d)", startIdx, startIdx, endIdx, endIdx, numCommits)
	}

	// Force a reload if needed.
	if time.Now().UTC().Sub(lastLoaded) > MAX_BLOCK_AGE {
		glog.Infof("Block %d is too old (last loaded at %s). Forcing reload.", b.BlockNum, lastLoaded)
		if err := b.UpdateBuilds(); err != nil {
			return nil, err
		}
	}

	b.mutex.RLock()
	defer b.mutex.RUnlock()
	// The lock was released in between, so re-validate the upper bound
	// before slicing.
	if endIdx > len(b.Commits) {
		return nil, fmt.Errorf("Index out of range: (%d < 0 || %d > %d || %d > %d)", startIdx, startIdx, endIdx, endIdx, len(b.Commits))
	}
	return b.Commits[startIdx:endIdx], nil
}
// UpdateBuilds reloads all build data for commits in this block, merges it
// into the commits, records the refresh time and writes the block back to
// its cache file. Builds from blacklisted bots (see skipBot) are not stored.
// Returns an error if the builds cannot be fetched or the file cannot be
// written.
func (b *cacheBlock) UpdateBuilds() error {
	b.mutex.RLock()
	glog.Infof("UpdateBuilds(%d)", b.BlockNum)
	hashes := make([]string, 0, len(b.Commits))
	for _, c := range b.Commits {
		hashes = append(hashes, c.Hash)
	}
	// Hand a private copy of the stored-build set to the fetch below. The
	// fetch runs without the lock, and another UpdateBuilds call may be
	// writing b.storedBuilds at the same time.
	// NOTE(review): assumes GetBuildsForCommits only reads this map; confirm.
	stored := make(map[int]bool, len(b.storedBuilds))
	for id, finished := range b.storedBuilds {
		stored[id] = finished
	}
	b.mutex.RUnlock()

	builds, err := buildbot.GetBuildsForCommits(hashes, stored)
	if err != nil {
		return err
	}

	b.mutex.Lock()
	defer b.mutex.Unlock()
	for _, c := range b.Commits {
		beforeBuilds := len(c.Builds)
		for _, build := range builds[c.Hash] {
			// Store the IDs of the loaded builds *before* filtering
			// to ensure that we don't have to load them again.
			if build.IsFinished() {
				b.storedBuilds[build.Id] = true
			}
			// Filter out unwanted builders.
			if !skipBot(build.Builder) {
				if c.Builds == nil {
					c.Builds = map[string]*buildbot.BuildSummary{}
				}
				c.Builds[build.Builder] = build.GetSummary()
			}
		}
		glog.Infof("Found %d new builds (total %d) for %s", len(c.Builds)-beforeBuilds, len(c.Builds), c.Hash)
	}
	b.LastLoaded = time.Now().UTC()
	return b.toFile()
}
// NewCommits appends as many of the given commits as still fit into this
// block (at most BLOCK_SIZE in total), writes the block to its cache file
// and returns the number of commits which were taken from newCommits.
func (b *cacheBlock) NewCommits(newCommits []*CommitData) (int, error) {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	// Take the smaller of "room left in the block" and "commits offered".
	have := len(b.Commits)
	room := BLOCK_SIZE - have
	take := len(newCommits)
	if room < take {
		take = room
	}

	// Grow the slice within its existing capacity, then fill the new tail.
	b.Commits = b.Commits[:have+take]
	if copied := copy(b.Commits[have:], newCommits[:take]); copied != take {
		return copied, fmt.Errorf("Wanted to copy %d but copied %d", take, copied)
	}
	return take, b.toFile()
}
// CommitCache is a struct used for caching commit data. Stores ALL commits in
// the repository, split into blocks of BLOCK_SIZE commits each.
type CommitCache struct {
	// blocks holds the cache blocks in order; all blocks except the last
	// are assumed to be full.
	blocks []*cacheBlock
	// BranchHeads holds the branch information read from the repo during
	// the most recent update.
	BranchHeads []*gitinfo.GitBranch
	// cacheDir is the directory in which the block cache files live.
	cacheDir string
	// mutex is taken by the cache's methods around access to blocks and
	// BranchHeads.
	mutex sync.RWMutex
	// repo is the repository from which commits are loaded.
	repo *gitinfo.GitInfo
	// requestSize determines how many of the most recent blocks have
	// their builds refreshed on each update.
	requestSize int
}
// New creates and returns a new CommitCache which watches the given repo.
// Existing cache blocks are loaded from cacheDir (which is created if it
// does not exist), then the cache is brought up to date and a background
// goroutine is started which updates it once per minute.
//
// The initial update will load ALL commits from the repository which are
// not already cached, so expect this to be slow.
func New(repo *gitinfo.GitInfo, cacheDir string, requestSize int) (*CommitCache, error) {
	cache := &CommitCache{
		blocks:      []*cacheBlock{},
		cacheDir:    cacheDir,
		repo:        repo,
		requestSize: requestSize,
	}
	if _, statErr := os.Stat(cacheDir); os.IsNotExist(statErr) {
		if mkErr := os.MkdirAll(cacheDir, os.ModePerm); mkErr != nil {
			return nil, fmt.Errorf("Failed to create cache dir: %v", mkErr)
		}
	}

	// Cache files are numbered consecutively from zero; collect them until
	// the first number for which no file exists.
	cacheFiles := []string{}
	for blockNum := 0; ; blockNum++ {
		name := cacheFileName(cacheDir, blockNum)
		if _, statErr := os.Stat(name); os.IsNotExist(statErr) {
			break
		}
		cacheFiles = append(cacheFiles, name)
	}

	// Load the blocks from file. Only the final block may be partially
	// filled.
	lastIdx := len(cacheFiles) - 1
	for i, name := range cacheFiles {
		block, err := fromFile(name, cache, i != lastIdx)
		if err != nil {
			return nil, fmt.Errorf("Could not load cache block: %v", err)
		}
		cache.blocks = append(cache.blocks, block)
	}
	// Start with a single empty block when nothing was cached.
	if len(cacheFiles) == 0 {
		if err := cache.appendBlock(); err != nil {
			return nil, err
		}
	}

	// Update the cache once synchronously so that it is usable on return.
	if err := cache.update(); err != nil {
		return nil, err
	}

	// Keep updating in the background; failures are logged, not fatal.
	go func() {
		ticks := time.Tick(time.Minute)
		for {
			<-ticks
			if err := cache.update(); err != nil {
				glog.Errorf("Failed to update commit cache: %v", err)
			}
		}
	}()
	return cache, nil
}
// numCommits computes the total number of commits in the cache. Assumes the
// caller holds a lock. Every block except the last one is assumed to be
// full, so only the last block is asked for its actual length.
func (c *CommitCache) numCommits() int {
	numBlocks := len(c.blocks)
	if numBlocks == 0 {
		return 0
	}
	last := c.blocks[numBlocks-1]
	return (numBlocks-1)*BLOCK_SIZE + last.NumCommits()
}
// NumCommits reports the total number of commits in the cache, taking the
// cache's read lock for the duration of the call.
func (c *CommitCache) NumCommits() int {
	c.mutex.RLock()
	defer c.mutex.RUnlock()
	total := c.numCommits()
	return total
}
// Get returns the CommitData at the given index, counted across all blocks
// of the cache. An index which is negative or which falls beyond the cached
// blocks yields an error.
func (c *CommitCache) Get(idx int) (*CommitData, error) {
	c.mutex.RLock()
	defer c.mutex.RUnlock()
	// Reject negative indices up front: integer division truncates toward
	// zero, so a sufficiently negative idx would otherwise produce a
	// negative block index and panic below.
	if idx < 0 {
		return nil, fmt.Errorf("Index out of range: %d < 0", idx)
	}
	b, i := blockIdx(idx)
	if b >= len(c.blocks) {
		return nil, fmt.Errorf("Index out of range: %d not in [%d, %d)", b, 0, len(c.blocks))
	}
	return c.blocks[b].Get(i)
}
// Slice returns the commits in [startIdx, endIdx), counted across all blocks
// of the cache, as a newly-allocated slice. The blocks covering the range
// are read concurrently. An invalid range, or a failure to read any block,
// yields an error.
func (c *CommitCache) Slice(startIdx, endIdx int) ([]*CommitData, error) {
	// Hold the read lock for the whole call so that the block list cannot
	// change while the goroutines below index into it.
	c.mutex.RLock()
	defer c.mutex.RUnlock()
	n := c.numCommits()
	if startIdx < 0 || startIdx > endIdx || endIdx > n {
		return nil, fmt.Errorf("Index out of range: (%d < 0 || %d > %d || %d > %d)", startIdx, startIdx, endIdx, endIdx, n)
	}
	rv := make([]*CommitData, endIdx-startIdx)
	if startIdx == endIdx {
		// Nothing to copy. Returning here also avoids computing a block
		// index for a position past the last block.
		return rv, nil
	}

	startBlock, startSubIdx := blockIdx(startIdx)
	// Derive the end block from the last commit actually included
	// (endIdx-1). Using endIdx itself would point one block too far
	// whenever endIdx is a multiple of BLOCK_SIZE.
	endBlock, lastSubIdx := blockIdx(endIdx - 1)
	endSubIdx := lastSubIdx + 1

	var wg sync.WaitGroup
	// One error slot per block so the goroutines never write to the same
	// element.
	errs := make([]error, endBlock-startBlock+1)
	for b := startBlock; b <= endBlock; b++ {
		wg.Add(1)
		go func(b int) {
			defer wg.Done()
			// [i, j) is the range within block b.
			i := 0
			if b == startBlock {
				i = startSubIdx
			}
			var j int
			if b == endBlock {
				j = endSubIdx
			} else {
				j = c.blocks[b].NumCommits()
			}
			s, err := c.blocks[b].Slice(i, j)
			if err != nil {
				errs[b-startBlock] = fmt.Errorf("Failed to slice block: %v", err)
				return
			}
			// [x, y) is the corresponding range within rv.
			x := (b-startBlock)*BLOCK_SIZE + i - startSubIdx
			y := (b-startBlock)*BLOCK_SIZE + j - startSubIdx
			if n := copy(rv[x:y], s); n != len(s) {
				errs[b-startBlock] = fmt.Errorf("Failed to copy slice %v to %v (copied %d elements)", s, rv, n)
				return
			}
		}(b)
	}
	wg.Wait()
	for _, e := range errs {
		if e != nil {
			return nil, e
		}
	}
	return rv, nil
}
// appendBlock adds a new, empty block at the end of the cache. Assumes the
// caller holds a lock. It is an error to append a block while the current
// last block is not yet full.
func (c *CommitCache) appendBlock() error {
	blockNum := len(c.blocks)
	if blockNum > 0 && c.blocks[blockNum-1].NumCommits() != BLOCK_SIZE {
		return fmt.Errorf("Failed to add a new cache block; current last block is not full.")
	}
	block := &cacheBlock{
		BlockNum:  blockNum,
		CacheFile: cacheFileName(c.cacheDir, blockNum),
		// Pre-allocate a full block so commits can be added in place.
		Commits:      make([]*CommitData, 0, BLOCK_SIZE),
		parent:       c,
		storedBuilds: map[int]bool{},
	}
	c.blocks = append(c.blocks, block)
	return nil
}
// update syncs the source code repository and loads any new commits.
//
// It first gathers the new commits and the branch heads from the repo
// without holding the cache lock, then takes the write lock to store the
// branch heads, distribute the new commits over the blocks (appending blocks
// as needed) and refresh the build data of the most recent blocks. Returns
// the first error encountered.
func (c *CommitCache) update() error {
	glog.Info("Reloading commits.")
	if err := c.repo.Update(true, true); err != nil {
		return fmt.Errorf("Failed to update the repo: %v", err)
	}

	// Load commits newer than the last one we already have; with an empty
	// cache the zero time is used.
	from := time.Time{}
	n := c.NumCommits()
	if n > 0 {
		last, err := c.Get(n - 1)
		if err != nil {
			return fmt.Errorf("Failed to get last commit: %v", err)
		}
		from = last.Timestamp
	}
	newCommitHashes := c.repo.From(from)
	glog.Infof("Processing %d new commits.", len(newCommitHashes))
	newCommits := make([]*CommitData, len(newCommitHashes))
	for i, h := range newCommitHashes {
		d, err := c.repo.Details(h)
		if err != nil {
			return fmt.Errorf("Failed to obtain commit details for %s: %v", h, err)
		}
		newCommits[i] = &CommitData{d, map[string]*buildbot.BuildSummary{}}
	}
	branchHeads, err := c.repo.GetBranches()
	if err != nil {
		return fmt.Errorf("Failed to read branch information from the repo: %v", err)
	}

	// Update the cached values all at once at the end.
	glog.Infof("Updating the cache.")
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.BranchHeads = branchHeads
	for i := 0; i < len(newCommits); {
		// The last block takes as many commits as it has room for.
		copied, err := c.blocks[len(c.blocks)-1].NewCommits(newCommits[i:])
		if err != nil {
			return fmt.Errorf("Failed to insert new commits into block: %v", err)
		}
		i += copied
		if i == len(newCommits) {
			break
		}
		// Commits remain, so the last block is full; start a new one.
		if err := c.appendBlock(); err != nil {
			return fmt.Errorf("Failed to update the cache: %v", err)
		}
	}

	// Only update the blocks needed to cover the average requestSize, and
	// never more blocks than exist: otherwise the index computed below
	// would be negative.
	blocksToUpdate := c.requestSize/BLOCK_SIZE + 1
	if blocksToUpdate > len(c.blocks) {
		blocksToUpdate = len(c.blocks)
	}
	if blocksToUpdate < 0 {
		blocksToUpdate = 0
	}
	firstBlock := len(c.blocks) - blocksToUpdate
	var wg sync.WaitGroup
	// One error slot per goroutine so they never write to the same element.
	errs := make([]error, blocksToUpdate)
	for i := 0; i < blocksToUpdate; i++ {
		wg.Add(1)
		go func(j int) {
			defer wg.Done()
			errs[j] = c.blocks[firstBlock+j].UpdateBuilds()
		}(i)
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return err
		}
	}
	glog.Infof("Finished updating the cache.")
	return nil
}
// RangeAsJson writes the commits in [startIdx, endIdx) along with the branch
// heads in JSON format to the given Writer. The caller must NOT hold the
// cache's lock: the required locking is done internally. Returns an error if
// the range is invalid or encoding fails.
func (c *CommitCache) RangeAsJson(w io.Writer, startIdx, endIdx int) error {
	commits, err := c.Slice(startIdx, endIdx)
	if err != nil {
		return err
	}
	// BranchHeads is replaced by update() under the write lock, so it
	// must be read under the read lock.
	c.mutex.RLock()
	branchHeads := c.BranchHeads
	c.mutex.RUnlock()

	data := struct {
		Commits     []*CommitData        `json:"commits"`
		BranchHeads []*gitinfo.GitBranch `json:"branch_heads"`
		StartIdx    int                  `json:"startIdx"`
		EndIdx      int                  `json:"endIdx"`
	}{
		Commits:     commits,
		BranchHeads: branchHeads,
		StartIdx:    startIdx,
		EndIdx:      endIdx,
	}
	return json.NewEncoder(w).Encode(&data)
}
// LastNAsJson writes the last n commits along with the branch heads in JSON
// format to the given Writer. If the cache holds fewer than n commits, all
// of them are written.
func (c *CommitCache) LastNAsJson(w io.Writer, n int) error {
	// Use the locking accessor: the block list may be modified
	// concurrently by update().
	end := c.NumCommits()
	start := end - n
	if start < 0 {
		start = 0
	}
	return c.RangeAsJson(w, start, end)
}