blob: 343cd53a8656593f590d63c8cfcf3357c068337f [file] [log] [blame]
package dsutil
import (
"context"
"math/rand"
"sort"
"time"
"cloud.google.com/go/datastore"
"go.skia.org/infra/go/ds"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/util"
)
// DefaultConsistencyDelta is the default, assumed amount of time it takes
// until eventually consistent query results reflect all recent writes.
const DefaultConsistencyDelta = 5 * time.Minute
// RecentKeysList is a utility type to store recently added/deleted keys in an
// entity for a defined duration.
// This allows augmenting an eventually consistent query with the most recent
// keys and therefore yields a consistent listing of entries.
//
// It assumes that the used keys are time based (created via TimeSortableKey),
// so they can be sorted by ID and the underlying time can be extracted from
// the ID with GetTimeFromID.
type RecentKeysList struct {
	// client is the cloud datastore client. GetRecent uses it to read the
	// container entity outside of any transaction.
	client *datastore.Client
	// containerKey is the key of the entity where the most recently added
	// and deleted keys are stored.
	containerKey *datastore.Key
	// consistencyDeltaMs is the consistency delta in milliseconds. Keys whose
	// embedded time is older than now minus this delta are pruned from the
	// container whenever Add or Delete updates it.
	consistencyDeltaMs int64
}
// NewRecentKeysList returns a RecentKeysList that records every key passed to
// Add and Delete in the entity identified by containerKey.
//
// Whenever the container is updated, keys older than consistentDelta are
// dropped from it. consistentDelta is truncated to whole milliseconds.
// containerKey usually has the kind ds.HELPER_RECENT_KEYS, but any kind works.
func NewRecentKeysList(client *datastore.Client, containerKey *datastore.Key, consistentDelta time.Duration) *RecentKeysList {
	ret := &RecentKeysList{
		client:       client,
		containerKey: containerKey,
	}
	ret.consistencyDeltaMs = consistentDelta.Milliseconds()
	return ret
}
// Add records newKey as recently added, using the transaction tx.
// Call it from the same transaction that creates the entity for newKey. As a
// side effect, keys that have fallen out of the consistency window are pruned
// from the container.
//
// newKey must have been created by TimeSortableKey; keys without a positive
// numeric ID are rejected with an error.
func (l *RecentKeysList) Add(tx *datastore.Transaction, newKey *datastore.Key) error {
	// IDs produced by TimeSortableKey are always positive.
	if validID := newKey.ID > 0; !validID {
		return skerr.Fmt("Key contains invalid numeric ID. It must be generated by calling TimeSortableKey")
	}
	const isRemoval = false
	return l.updateRecentKeys(tx, newKey, isRemoval)
}
// Delete records removeKey as recently deleted and drops it from the list of
// recently added keys in the container entity, using the transaction tx.
// Call it whenever an entity of the target collection is removed, in the
// transaction that removes it, so there are no dangling keys in the container.
//
// Like Add, it rejects keys whose numeric ID is not positive, because such keys
// cannot have been created by TimeSortableKey. Without this check, an ID of 0
// decodes to a time stamp about 2**43 ms after the epoch (around the year 2248).
// Such a key would be inserted into the deleted list and never expire.
func (l *RecentKeysList) Delete(tx *datastore.Transaction, removeKey *datastore.Key) error {
	if removeKey.ID <= 0 {
		return skerr.Fmt("Key contains invalid numeric ID. It must be generated by calling TimeSortableKey")
	}
	return l.updateRecentKeys(tx, removeKey, true)
}
// updateRecentKeys loads the container entity inside tx and applies the change
// for key: it inserts key when remove is false and removes it when remove is
// true. The container is only written back if the change actually modified it.
// A missing container entity is treated as empty.
func (l *RecentKeysList) updateRecentKeys(tx *datastore.Transaction, key *datastore.Key, remove bool) error {
	recent := &Recently{}
	// A direct lookup by key via Get is strongly consistent, unlike a query.
	err := tx.Get(l.containerKey, recent)
	if err != nil && err != datastore.ErrNoSuchEntity {
		return err
	}
	if !recent.update(key, l.consistencyDeltaMs, remove) {
		// Nothing changed, so skip the write.
		return nil
	}
	_, err = tx.Put(l.containerKey, recent)
	return err
}
// GetRecent reads the recently added and deleted keys from the container
// entity and returns them as a Recently. If the container does not exist yet,
// an empty Recently is returned.
//
// Run it alongside an eventually consistent query. Then pass the query result
// to Combine on the returned value to obtain a consistent snapshot of the
// collection.
func (l *RecentKeysList) GetRecent() (*Recently, error) {
	var recent Recently
	err := l.client.Get(context.Background(), l.containerKey, &recent)
	switch {
	case err == nil, err == datastore.ErrNoSuchEntity:
		return &recent, nil
	default:
		return nil, err
	}
}
// Recently contains recently added and deleted keys that can be combined with
// an eventually-consistent query result to produce a complete list of valid keys.
// Its members should never be accessed directly. Instead, the Add and
// Delete methods of the RecentKeysList type should be used to modify it, and
// its own Combine method to read it.
//
// NOTE(review): this struct is stored as the container entity (see the
// tx.Get/tx.Put calls in RecentKeysList). The fields are presumably exported
// so the datastore client can persist them. Confirm before renaming them, since
// they likely form the stored schema.
type Recently struct {
	// Added contains the keys that were added within the consistency time
	// window, unique and sorted by ascending ID (i.e. newest first).
	Added []*datastore.Key
	// Deleted contains the keys that were deleted within the consistency time
	// window, unique and sorted by ascending ID (i.e. newest first).
	Deleted []*datastore.Key
}
// Combine merges the keys returned by an eventually consistent query with the
// recently added and deleted keys and returns the result sorted by ascending
// ID. For keys created via TimeSortableKey, this puts the newest first.
//
// The result contains every recently added key, plus every queried key that
// is neither recently added (to avoid duplicates) nor recently deleted (it is
// no longer valid). Keys are identified by their numeric ID only.
func (r *Recently) Combine(queried []*datastore.Key) []*datastore.Key {
	// Added and Deleted never share a key, so only the queried keys need to be
	// checked against both lists.
	skip := make(map[int64]bool, len(r.Deleted)+len(r.Added))
	for _, key := range r.Deleted {
		skip[key.ID] = true
	}
	for _, key := range r.Added {
		skip[key.ID] = true
	}

	combined := make([]*datastore.Key, 0, len(r.Added)+len(queried))
	combined = append(combined, r.Added...)
	for _, key := range queried {
		if !skip[key.ID] {
			combined = append(combined, key)
		}
	}
	sort.Slice(combined, func(a, b int) bool { return combined[a].ID < combined[b].ID })
	return combined
}
// update applies a single key change to r and reports whether r changed,
// i.e. whether it needs to be written back.
//
// Both lists are first pruned of keys older than now minus
// evConsistentDeltaMs. When remove is false, keyToUpdate is inserted into
// Added and dropped from Deleted; when remove is true it is the other way
// around. A keyToUpdate that is itself older than the window is not tracked.
// Both lists stay unique and sorted by ascending ID (newest first).
func (r *Recently) update(keyToUpdate *datastore.Key, evConsistentDeltaMs int64, remove bool) bool {
	cutoffMs := util.TimeStamp(time.Millisecond) - evConsistentDeltaMs
	// Both lists must always be visited so each one gets pruned, hence no
	// short-circuiting between the two calls.
	deletedChanged := updateSortedKeys(&r.Deleted, keyToUpdate, cutoffMs, !remove)
	addedChanged := updateSortedKeys(&r.Added, keyToUpdate, cutoffMs, remove)
	return deletedChanged || addedChanged
}
// sortableIDMask has the lowest 63 bits set and the sign bit cleared.
// XOR-ing a non-negative ID with it inverts the ordering of IDs while keeping
// them non-negative (see getSortableTimeID and GetTimeFromID).
const sortableIDMask int64 = 1<<63 - 1
// TimeSortableKey returns a new key of the given kind whose numeric ID embeds
// timeStampMs (milliseconds since the epoch). If timeStampMs is 0, the current
// time is used.
//
// The ID is inverted, so sorting keys by ascending ID sorts their embedded
// time stamps in descending order, putting the newest keys first. Use
// GetTimeFromID to recover the time stamp from the key's ID.
//
// NOTE: Any program using TimeSortableKey should call rand.Seed(...) to
// initialize the seed of the default random number generator.
func TimeSortableKey(kind ds.Kind, timeStampMs int64) *datastore.Key {
	key := ds.NewKey(kind)
	ts := timeStampMs
	if ts == 0 {
		ts = util.TimeStamp(time.Millisecond)
	}
	key.ID = getSortableTimeID(ts)
	return key
}
// GetTimeFromID extracts the time stamp in milliseconds embedded in id. It is
// assumed that id comes from a key generated by TimeSortableKey.
func GetTimeFromID(id int64) int64 {
	// Undo the inversion, then discard the 16 random bits and the 4 version
	// bits below the time stamp.
	packed := id ^ sortableIDMask
	return packed >> 20
}
// getSortableTimeID packs timeStampMs into a positive 64-bit ID. Ascending IDs
// correspond to descending time stamps, so the newest IDs come first in the
// ordering.
//
// This was adapted from:
//
//	https://github.com/luci/luci-py/blob/master/appengine/swarming/server/task_request.py#L1078
//
// Before inversion, the ID has this layout, from most to least significant bit:
//   - 1 bit set to 0 to keep the value positive.
//   - 43 bits of time since the epoch at 1ms resolution. This is good for
//     2**43 / 365.3 / 24 / 60 / 60 / 1000 = 278 years, i.e. until 1970+278 = 2248.
//   - 16 random bits. Two IDs generated in the same millisecond collide with
//     a probability of 2**-16.
//   - 4 bits holding the ID schema version 0x1. Because of the XOR below, it
//     is stored as 0xE. Bump it if the packing of IDs ever changes in an
//     incompatible way.
//
// The returned ID is that value XOR'ed with sortableIDMask (lowest 63 bits
// set to 1), which makes increasing IDs correspond to decreasing time stamps.
func getSortableTimeID(timeStampMs int64) int64 {
	const (
		timeShift   = 20
		randomShift = 4
		version     = 0x1
	)
	randomBits := rand.Int63() & 0xFFFF
	packed := timeStampMs<<timeShift | randomBits<<randomShift | version
	return packed ^ sortableIDMask
}
// updateSortedKeys first drops every key whose embedded time is older than
// newerThanMs from *keys. It then inserts k (remove == false) or removes k
// (remove == true), keeping *keys unique and sorted by ascending ID. A k that
// is itself older than newerThanMs is neither inserted nor removed.
// It reports whether *keys was modified.
func updateSortedKeys(keys *[]*datastore.Key, k *datastore.Key, newerThanMs int64, remove bool) bool {
	pruned := filterSortedKeys(keys, newerThanMs)
	if GetTimeFromID(k.ID) < newerThanMs {
		// k has already left the time window, so it is not tracked.
		return pruned
	}

	list := *keys
	pos := sort.Search(len(list), func(i int) bool { return list[i].ID >= k.ID })
	present := pos < len(list) && list[pos].ID == k.ID

	switch {
	case remove && present:
		// Shift the tail left and clear the vacated last slot so the backing
		// array does not keep the removed key alive.
		last := len(list) - 1
		copy(list[pos:], list[pos+1:])
		list[last] = nil
		*keys = list[:last]
		return true
	case remove, present:
		// Either there is nothing to remove, or k is already in the list.
		return pruned
	}

	// Grow by one, shift everything from pos onwards one slot to the right
	// (copy handles the overlap; it copies nothing when pos is at the end)
	// and place k at pos.
	list = append(list, k)
	copy(list[pos+1:], list[pos:])
	list[pos] = k
	*keys = list
	return true
}
// filterSortedKeys truncates *keys to the leading keys whose embedded time
// stamp is at least newerThanMs and reports whether any keys were dropped.
// *keys must be sorted by ascending ID, i.e. by descending embedded time, as
// produced by TimeSortableKey.
func filterSortedKeys(keys *[]*datastore.Key, newerThanMs int64) bool {
	list := *keys
	// Times decrease along the slice, so the predicate flips from false to true
	// exactly once and sort.Search finds the first key that is too old.
	idx := sort.Search(len(list), func(i int) bool {
		return GetTimeFromID(list[i].ID) < newerThanMs
	})
	if idx == len(list) {
		return false
	}
	// Clear the dropped tail so the backing array no longer references the
	// expired keys, the same care updateSortedKeys takes when removing a key.
	for i := idx; i < len(list); i++ {
		list[i] = nil
	}
	*keys = list[:idx]
	return true
}
// toMap indexes keys by their numeric ID for constant-time lookup. If several
// keys share an ID, the last one wins.
func toMap(keys []*datastore.Key) map[int64]*datastore.Key {
	byID := make(map[int64]*datastore.Key, len(keys))
	for i := range keys {
		byID[keys[i].ID] = keys[i]
	}
	return byID
}