package events
/*
This package provides facilities for event-based metrics.
The metrics2 package deals with gauges; it doesn't have natural support for
events, i.e. individual samples with timestamps. This package provides that
support by allowing the user to insert individual data points and provides
functions for aggregating data points into a gauge format which can then be
used as a normal metric.
*/
import (
"context"
"fmt"
"math"
"sort"
"sync"
"time"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
)
const (
// Tag key indicating what aggregation function was used to derive the
// metric.
tagAggregation = "aggregation"
// Tag key indicating the time period over which a metric is calculated.
tagPeriod = "period"
// Tag key indicating which event stream a metric is calculated from.
tagStream = "stream"
// The timestamp format used for encoding/decoding keys for the BoltDB
// database of events.
timestampFormat = "20060102T150405.000000000Z"
)
var (
// Callers may not use these tag keys.
RESERVED_TAGS = []string{tagPeriod, tagStream}
)
// encodeKey encodes a key for an entry in the BoltDB database of events.
func encodeKey(ts time.Time) ([]byte, error) {
if ts.UnixNano() < 0 {
return nil, fmt.Errorf("Time is invalid: %s", ts)
}
return []byte(ts.UTC().Format(timestampFormat)), nil
}
// decodeKey decodes a key for an entry in the BoltDB database of events.
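//
// For example (a sketch), a timestamp round-trips through encodeKey and
// decodeKey, and the resulting keys sort chronologically because the format is
// fixed-width:
//
//	key, _ := encodeKey(time.Date(2021, 3, 4, 5, 6, 7, 8, time.UTC))
//	// string(key) == "20210304T050607.000000008Z"
//	ts, _ := decodeKey(key) // ts.UnixNano() equals the original nanoseconds.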
func decodeKey(b []byte) (time.Time, error) {
return time.Parse(timestampFormat, string(b))
}
// AggregateFn is a function which reduces a number of Events into a single
// data point.
type AggregateFn func([]*Event) (float64, error)
// DynamicAggregateFn is a function which reduces a number of Events into
// several data points, each with its own set of tags. Each tag set comprises
// its own metric, and the aggregation function may return different tag sets
// at each iteration, e.g. due to events with different properties occurring
// within each time period.
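//
// For example (a hypothetical sketch; decodeStatus is not a helper provided by
// this package), a DynamicAggregateFn might count events by a decoded status
// string, emitting one metric per status seen in the period:
//
//	func countByStatus(ev []*Event) ([]map[string]string, []float64, error) {
//		counts := map[string]int{}
//		for _, e := range ev {
//			counts[decodeStatus(e.Data)]++
//		}
//		tags := make([]map[string]string, 0, len(counts))
//		values := make([]float64, 0, len(counts))
//		for status, count := range counts {
//			tags = append(tags, map[string]string{"status": status})
//			values = append(values, float64(count))
//		}
//		return tags, values, nil
//	}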
type DynamicAggregateFn func([]*Event) ([]map[string]string, []float64, error)
// ObservationsFn is a function which derives a number of float64 observations
// from a set of Events. The number of observations does not need to equal the
// number of Events.
type ObservationsFn func([]*Event) ([]float64, error)
// Event contains some metadata plus whatever data the user chooses.
type Event struct {
// Stream is the name of the event stream to which this Event belongs.
Stream string
// Timestamp is the time at which the Event occurred.
Timestamp time.Time
// Data is an arbitrary, user-defined payload.
Data []byte
}
// metric is a set of information used to derive a metric.
type metric struct {
measurement string
tags map[string]string
stream string
period time.Duration
agg AggregateFn
}
// dynamicMetric is a set of information used to derive a set of changing
// metrics. The aggregation function may return multiple data points, each with
// its own set of tags. Each tag set comprises its own metric, and the
// aggregation function may return different tag sets at each iteration, e.g. due
// to events with different properties occurring within each time period.
type dynamicMetric struct {
measurement string
tags map[string]string
stream string
period time.Duration
agg DynamicAggregateFn
}
// EventMetrics is a struct used for creating aggregate metrics on streams of
// events.
type EventMetrics struct {
db EventDB
measurement string
dynamicMetrics map[string]map[time.Duration][]*dynamicMetric
currentDynamicMetrics map[string]metrics2.Float64Metric
metrics map[string]map[time.Duration][]*metric
// mtx protects the maps above.
mtx sync.Mutex
}
// NewEventMetrics returns an EventMetrics instance.
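//
// A typical setup (a minimal sketch; the stream name, tags, and aggregation
// function are placeholders) registers one or more metrics on the returned
// instance and then starts the update loop:
//
//	em, err := NewEventMetrics(db, "my_measurement")
//	if err != nil {
//		return err
//	}
//	if err := em.AggregateMetric("my-stream", nil, 24*time.Hour, myAggFn); err != nil {
//		return err
//	}
//	em.Start(ctx)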
func NewEventMetrics(db EventDB, measurement string) (*EventMetrics, error) {
return &EventMetrics{
db: db,
measurement: measurement,
dynamicMetrics: map[string]map[time.Duration][]*dynamicMetric{},
currentDynamicMetrics: map[string]metrics2.Float64Metric{},
metrics: map[string]map[time.Duration][]*metric{},
}, nil
}
// Start initiates the EventMetrics goroutine, which recomputes all metrics
// once per minute until the given context is cancelled.
func (m *EventMetrics) Start(ctx context.Context) {
lv := metrics2.NewLiveness("last_successful_event_metrics_update", map[string]string{
"measurement": m.measurement,
})
go util.RepeatCtx(ctx, time.Minute, func(ctx context.Context) {
if err := m.updateMetrics(time.Now()); err != nil {
sklog.Errorf("Failed to update event metrics: %s", err)
} else {
lv.Reset()
}
})
}
// Close cleans up the EventMetrics.
func (m *EventMetrics) Close() error {
return m.db.Close()
}
// checkTags returns an error if the given map contains reserved tags.
func checkTags(tags map[string]string) error {
for _, tag := range RESERVED_TAGS {
if _, ok := tags[tag]; ok {
return fmt.Errorf("Tag %q is reserved.", tag)
}
}
return nil
}
// AggregateMetric sets the given aggregation function on the event stream and
// adds a gauge for it. For example, to compute the sum of all int64 events over
// a 24-hour period:
//
//	s.AggregateMetric("my-stream", myTags, 24*time.Hour, func(ev []*Event) (float64, error) {
//		sum := int64(0)
//		for _, e := range ev {
//			sum += decodeInt64(e)
//		}
//		return float64(sum), nil
//	})
func (m *EventMetrics) AggregateMetric(stream string, tags map[string]string, period time.Duration, agg AggregateFn) error {
mx := &metric{
measurement: m.measurement,
tags: map[string]string{
tagPeriod: period.String(),
tagStream: stream,
tagAggregation: "",
},
stream: stream,
period: period,
agg: agg,
}
if err := checkTags(tags); err != nil {
return err
}
for k, v := range tags {
mx.tags[k] = v
}
m.mtx.Lock()
defer m.mtx.Unlock()
byPeriod, ok := m.metrics[stream]
if !ok {
byPeriod = map[time.Duration][]*metric{}
m.metrics[stream] = byPeriod
}
byPeriod[period] = append(byPeriod[period], mx)
return nil
}
// DynamicMetric sets the given aggregation function on the event stream. Gauges
// will be added and removed dynamically based on the results of the aggregation
// function. Here's a toy example which creates one gauge per distinct value
// seen in the period:
//
//	s.DynamicMetric("my-stream", myTags, 24*time.Hour, func(ev []*Event) ([]map[string]string, []float64, error) {
//		counts := map[string]int64{}
//		for _, e := range ev {
//			counts[fmt.Sprintf("%d", decodeInt64(e))]++
//		}
//		tags := make([]map[string]string, 0, len(counts))
//		values := make([]float64, 0, len(counts))
//		for k, v := range counts {
//			tags = append(tags, map[string]string{"value": k})
//			values = append(values, float64(v))
//		}
//		return tags, values, nil
//	})
func (m *EventMetrics) DynamicMetric(stream string, tags map[string]string, period time.Duration, agg DynamicAggregateFn) error {
mx := &dynamicMetric{
measurement: m.measurement,
tags: map[string]string{
tagPeriod: period.String(),
tagStream: stream,
tagAggregation: "",
},
stream: stream,
period: period,
agg: agg,
}
if err := checkTags(tags); err != nil {
return err
}
for k, v := range tags {
mx.tags[k] = v
}
m.mtx.Lock()
defer m.mtx.Unlock()
byPeriod, ok := m.dynamicMetrics[stream]
if !ok {
byPeriod = map[time.Duration][]*dynamicMetric{}
m.dynamicMetrics[stream] = byPeriod
}
byPeriod[period] = append(byPeriod[period], mx)
return nil
}
// ComputeStatsMetric sets the given observation function on the event stream.
// Gauges will be added for various aggregation types on the observations
// generated by the function, e.g. mean, variance, and percentiles. For example,
// to compute statistics for the duration of all events over a 24-hour period:
//
//	s.ComputeStatsMetric("my-stream", myTags, 24*time.Hour, func(ev []*Event) ([]float64, error) {
//		vals := make([]float64, 0, len(ev))
//		for _, e := range ev {
//			data := decodeEvent(e)
//			vals = append(vals, float64(data.End.Sub(data.Start)))
//		}
//		return vals, nil
//	})
func (m *EventMetrics) ComputeStatsMetric(stream string, tags map[string]string, period time.Duration, obs ObservationsFn) error {
if err := checkTags(tags); err != nil {
return skerr.Wrap(err)
}
metrics := []*metric{}
for aggregationName, aggregationFn := range computeStatsAggregationFuncs {
aggregationFn := aggregationFn // https://golang.org/doc/faq#closures_and_goroutines
agg := func(ev []*Event) (float64, error) {
observations, err := obs(ev)
if err != nil {
return 0.0, skerr.Wrap(err)
}
return aggregationFn(observations), nil
}
mx := &metric{
measurement: m.measurement,
tags: map[string]string{
tagPeriod: period.String(),
tagStream: stream,
tagAggregation: aggregationName,
},
stream: stream,
period: period,
agg: agg,
}
for k, v := range tags {
mx.tags[k] = v
}
metrics = append(metrics, mx)
}
m.mtx.Lock()
defer m.mtx.Unlock()
byPeriod, ok := m.metrics[stream]
if !ok {
byPeriod = map[time.Duration][]*metric{}
m.metrics[stream] = byPeriod
}
byPeriod[period] = append(byPeriod[period], metrics...)
return nil
}
// aggregationFn reduces a slice of observations to a single value.
type aggregationFn func([]float64) float64
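// computeStatsAggregationFuncs are the aggregations applied by
// ComputeStatsMetric; each entry produces one gauge whose "aggregation" tag is
// set to the map key.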
var computeStatsAggregationFuncs = map[string]aggregationFn{
"mean": mean,
"variance": variance,
"50th-percentile": func(v []float64) float64 {
return percentile(v, 0.5)
},
"90th-percentile": func(v []float64) float64 {
return percentile(v, 0.9)
},
"99th-percentile": func(v []float64) float64 {
return percentile(v, 0.99)
},
}
// mean returns the arithmetic mean of v, or 0.0 if v is empty.
func mean(v []float64) float64 {
// Avoid dividing by zero.
if len(v) == 0 {
return 0.0
}
sum := float64(0.0)
for _, val := range v {
sum += val
}
return sum / float64(len(v))
}
// variance returns the population variance of v, or 0.0 if v is empty.
func variance(v []float64) float64 {
// Avoid dividing by zero.
if len(v) == 0 {
return 0.0
}
meanV := mean(v)
sum := float64(0.0)
for _, val := range v {
dist := val - meanV
sum += dist * dist
}
return sum / float64(len(v))
}
// percentile returns the p-th quantile (0 <= p <= 1) of v using linear
// interpolation between the two nearest ranks. Note that it sorts v in place.
func percentile(v []float64, p float64) float64 {
// Avoid index errors.
if len(v) == 0 {
return 0.0
} else if len(v) == 1 {
return v[0]
}
sort.Float64s(v)
rank := p * float64(len(v)-1)
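// Worked example: for v = [1, 2, 3, 4] and p = 0.9, rank = 0.9*3 = 2.7, so the
// result interpolates between v[2] and v[3]: 3 + 0.7*(4-3) = 3.7.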
floor := math.Floor(rank)
ceil := math.Ceil(rank)
if floor == ceil {
return v[int(floor)]
}
lower := v[int(floor)]
upper := v[int(ceil)]
fraction := rank - floor
return lower + fraction*(upper-lower)
}
// updateMetric updates the value for a single metric.
func (m *EventMetrics) updateMetric(ev []*Event, mx *metric) error {
v, err := mx.agg(ev)
if err != nil {
return err
}
metrics2.GetFloat64Metric(mx.measurement, mx.tags).Update(v)
return nil
}
// updateDynamicMetric updates the values for a dynamic metric. Returns the
// updated metrics in a map keyed by an MD5 hash of their tags, or an error if
// applicable.
func (m *EventMetrics) updateDynamicMetric(ev []*Event, mx *dynamicMetric) (map[string]metrics2.Float64Metric, error) {
tagSets, values, err := mx.agg(ev)
if err != nil {
return nil, err
}
if len(tagSets) != len(values) {
return nil, fmt.Errorf("DynamicAggregateFn must return slices of tags and values of equal length (got %d vs %d).", len(tagSets), len(values))
}
gotMetrics := map[string]metrics2.Float64Metric{}
for i, dynamicTags := range tagSets {
tags := util.CopyStringMap(mx.tags)
if err := checkTags(dynamicTags); err != nil {
return nil, err
}
for k, v := range dynamicTags {
tags[k] = v
}
g := metrics2.GetFloat64Metric(mx.measurement, tags)
g.Update(values[i])
// TODO(borenet): Should we include the measurement name?
id, err := util.MD5Sum(tags)
if err != nil {
return nil, err
}
gotMetrics[id] = g
}
return gotMetrics, nil
}
// updateMetrics recalculates values for all metrics using the given value for
// the current time.
func (m *EventMetrics) updateMetrics(now time.Time) error {
m.mtx.Lock()
defer m.mtx.Unlock()
errs := []error{}
for stream, byPeriod := range m.metrics {
for period, metrics := range byPeriod {
ev, err := m.db.Range(stream, now.Add(-period), now)
if err != nil {
errs = append(errs, fmt.Errorf("Failed to retrieve %q events from range %s - %s: %s", stream, now.Add(-period), now, err))
continue
}
for _, mx := range metrics {
if err := m.updateMetric(ev, mx); err != nil {
errs = append(errs, fmt.Errorf("Failed to update metric: %+v\n%s", mx, err))
continue
}
}
}
}
gotDynamicMetrics := map[string]metrics2.Float64Metric{}
for stream, byPeriod := range m.dynamicMetrics {
for period, metrics := range byPeriod {
ev, err := m.db.Range(stream, now.Add(-period), now)
if err != nil {
errs = append(errs, fmt.Errorf("Failed to retrieve %q events from range %s - %s (2): %s", stream, now.Add(-period), now, err))
continue
}
for _, mx := range metrics {
got, err := m.updateDynamicMetric(ev, mx)
if err != nil {
errs = append(errs, fmt.Errorf("Failed to update dynamic metric: %+v\n%s", mx, err))
continue
}
for k, v := range got {
gotDynamicMetrics[k] = v
}
}
}
}
// Delete any no-longer-generated metrics.
for k, v := range m.currentDynamicMetrics {
if _, ok := gotDynamicMetrics[k]; !ok {
if err := v.Delete(); err != nil {
errs = append(errs, fmt.Errorf("Failed to delete old metric %q: %s", k, err))
}
}
}
m.currentDynamicMetrics = gotDynamicMetrics
if len(errs) > 0 {
return fmt.Errorf("UpdateMetrics errors: %v", errs)
}
return nil
}
// UpdateMetrics recalculates values for all metrics.
func (m *EventMetrics) UpdateMetrics() error {
return m.updateMetrics(time.Now())
}
// LogMetrics logs the current values for all metrics.
func (m *EventMetrics) LogMetrics() {
m.mtx.Lock()
defer m.mtx.Unlock()
sklog.Infof("Current event metrics values:")
for _, byPeriod := range m.metrics {
for _, metrics := range byPeriod {
for _, mx := range metrics {
v := metrics2.GetFloat64Metric(mx.measurement, mx.tags).Get()
sklog.Infof(" %s %v: %f", mx.measurement, mx.tags, v)
}
}
}
}
// GetEventStream returns an EventStream instance.
func (m *EventMetrics) GetEventStream(name string) *EventStream {
m.mtx.Lock()
defer m.mtx.Unlock()
return &EventStream{
name: name,
m: m,
}
}