blob: ebf2afc232a963ee4cc25701f817a921e8b0300f [file] [log] [blame]
package bt_gitstore
import (
"sort"
"strconv"
"strings"
"sync/atomic"
"time"
"cloud.google.com/go/bigtable"
multierror "github.com/hashicorp/go-multierror"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/vcsinfo"
)
// shardedResults collects the results of call to iterShardedRange.
type shardedResults interface {
// Add adds the result of a shard.
Add(shard uint32, row bigtable.Row) error
// Finish indicates that the shard is done processing its results.
Finish(shard uint32)
}
// srIndexCommits implements the shardedResults interface for collecting results that are vcsinfo.IndexCommits
type srIndexCommits struct {
errs []*multierror.Error
timestamps []map[*vcsinfo.IndexCommit]time.Time
results [][]*vcsinfo.IndexCommit
retSize int64
}
// create a new instance to collect IndexCommits.
func newSRIndexCommits(shards uint32) *srIndexCommits {
timestamps := make([]map[*vcsinfo.IndexCommit]time.Time, shards)
for i := uint32(0); i < shards; i++ {
timestamps[i] = map[*vcsinfo.IndexCommit]time.Time{}
}
return &srIndexCommits{
results: make([][]*vcsinfo.IndexCommit, shards),
timestamps: timestamps,
errs: make([]*multierror.Error, shards),
}
}
// Add implements the shardedResults interface.
func (s *srIndexCommits) Add(shard uint32, row bigtable.Row) error {
idx := parseIndex(keyFromRowName(row.Key()))
if idx < 0 {
return skerr.Fmt("Unable to parse index key %q. Invalid index", row.Key())
}
prefix := cfCommit + ":"
for _, col := range row[cfCommit] {
if strings.TrimPrefix(col.Column, prefix) == colHashTs {
split := strings.Split(string(col.Value), "#")
if len(split) != 2 {
return skerr.Fmt("Unable to parse hash#timestamp from %q", string(col.Value))
}
ts, err := strconv.Atoi(split[1])
if err != nil {
return skerr.Wrapf(err, "Unable to parse hash#timestamp from %q", string(col.Value))
}
hash := make([]byte, len(split[0]))
copy(hash, []byte(split[0]))
ic := &vcsinfo.IndexCommit{
Index: idx,
Hash: string(hash),
Timestamp: time.Unix(int64(ts), 0).UTC(), // Git has a timestamp resolution of 1s.
}
s.results[shard] = append(s.results[shard], ic)
s.timestamps[shard][ic] = col.Timestamp.Time().UTC()
}
}
return nil
}
// Finish implements the shardedResults interface.
func (s *srIndexCommits) Finish(shard uint32) {
atomic.AddInt64(&s.retSize, int64(len(s.results)))
}
// Sorted returns the resulting IndexCommits by Index->TimeStamp->Hash, and the
// server-side update timestamps associated with each IndexCommit.
// Using the hash ensures results with identical timestamps are sorted stably.
func (s *srIndexCommits) Sorted() ([]*vcsinfo.IndexCommit, map[*vcsinfo.IndexCommit]time.Time) {
// Concatenate the shard results into a single output and sort it.
ics := make([]*vcsinfo.IndexCommit, 0, s.retSize)
timestamps := make(map[*vcsinfo.IndexCommit]time.Time, len(ics))
for shard, sr := range s.results {
ics = append(ics, sr...)
for ic, t := range s.timestamps[shard] {
timestamps[ic] = t
}
}
sort.Sort(vcsinfo.IndexCommitSlice(ics))
return ics, timestamps
}
// srTimestampCommits is an adaptation of srIndexCommits that extracts a different
// column family from a timestamp-based index. Otherwise it is identical.
type srTimestampCommits struct {
*srIndexCommits
}
// newSRTimestampCommits creates a new instance to receive timestamp based IndexCommits.
func newSRTimestampCommits(shards uint32) *srTimestampCommits {
return &srTimestampCommits{
srIndexCommits: newSRIndexCommits(shards),
}
}
// Add implements the shardedResults interface and overrides the Add function in srIndexCommits.
func (s *srTimestampCommits) Add(shard uint32, row bigtable.Row) error {
prefix := cfTsCommit + ":"
for _, col := range row[cfTsCommit] {
hash := strings.TrimPrefix(col.Column, prefix)
// Parse the timestamp from the row key.
split := strings.Split(row.Key(), ":")
ts, err := strconv.Atoi(split[len(split)-1])
if err != nil {
return skerr.Wrapf(err, "Unable to parse timestamp from row key %q", row.Key())
}
// Parse the index.
idxStr := string(col.Value)
idx := parseIndex(idxStr)
if idx < 0 {
return skerr.Fmt("Unable to parse index key %q. Invalid index", idxStr)
}
ic := &vcsinfo.IndexCommit{
Hash: hash,
Timestamp: time.Unix(int64(ts), 0).UTC(), // Git has a timestamp resolution of 1s.
Index: idx,
}
s.results[shard] = append(s.results[shard], ic)
s.timestamps[shard][ic] = col.Timestamp.Time().UTC()
}
return nil
}