blob: 2511733f2f0c1d98edfb0a6cbf12e8775efe5889 [file] [log] [blame]
// Package metrics2 is a client library for recording and reporting monitoring data.
package metrics2
import (
"fmt"
"net/http"
"os"
"time"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/skia-dev/glog"
"go.skia.org/infra/go/influxdb"
)
// Timer measures elapsed time. Unlike the other metrics helpers, a Timer does
// not report data continuously; it reports a single data point each time
// Stop() is called.
type Timer interface {
	// Start starts the timer, or resets it if it was already started.
	Start()

	// Stop stops the timer and reports the elapsed time.
	Stop()
}
// Liveness keeps a time-since-last-successful-update metric.
//
// The unit of the metric is seconds.
//
// It is used to keep track of periodic processes, to make sure that they are
// running successfully. Every liveness metric should have a corresponding
// alert set up that will fire if the time-since-last-successful-update metric
// gets too large.
type Liveness interface {
	// Get returns the current value of the Liveness.
	Get() int64

	// ManualReset sets the last-successful-update time of the Liveness to a
	// specific value. This is useful for tracking processes whose lifetimes
	// are outside of that of the current process, but should not be needed in
	// most cases.
	ManualReset(lastSuccessfulUpdate time.Time)

	// Reset should be called when some work has been successfully completed.
	Reset()
}
// Int64Metric is a metric whose reported value is an int64.
type Int64Metric interface {
	// Delete removes the metric from its Client's registry.
	Delete() error

	// Get returns the current value of the metric.
	Get() int64

	// Update adds a data point to the metric.
	Update(v int64)
}
// Float64Metric is a metric whose reported value is a float64.
type Float64Metric interface {
	// Delete removes the metric from its Client's registry.
	Delete() error

	// Get returns the current value of the metric.
	Get() float64

	// Update adds a data point to the metric.
	Update(v float64)
}
// Float64SummaryMetric is a metric which reports a summary of many float64
// values.
type Float64SummaryMetric interface {
	// Observe adds a data point to the metric.
	Observe(v float64)
}
// Counter tracks a metric whose value is incremented or decremented.
type Counter interface {
	// Dec decrements the counter by the given quantity.
	Dec(i int64)

	// Delete removes the counter from metrics.
	Delete() error

	// Get returns the current value in the counter.
	Get() int64

	// Inc increments the counter by the given quantity.
	Inc(i int64)

	// Reset sets the counter to zero.
	Reset()
}
// Client represents a set of metrics and is the factory through which the
// individual metric helpers are obtained.
type Client interface {
	// Flush pushes any queued data immediately. Long-running apps shouldn't
	// need to worry about this, as the Client will auto-push every so often.
	Flush() error

	// GetCounter creates or retrieves a Counter with the given name and tag
	// set and returns it.
	GetCounter(name string, tagsList ...map[string]string) Counter

	// GetFloat64Metric returns a Float64Metric instance.
	GetFloat64Metric(measurement string, tags ...map[string]string) Float64Metric

	// GetInt64Metric returns an Int64Metric instance.
	GetInt64Metric(measurement string, tags ...map[string]string) Int64Metric

	// GetFloat64SummaryMetric returns a Float64SummaryMetric instance.
	GetFloat64SummaryMetric(measurement string, tags ...map[string]string) Float64SummaryMetric

	// NewLiveness creates a new Liveness metric helper.
	NewLiveness(name string, tagsList ...map[string]string) Liveness

	// NewTimer creates and returns a new, already started Timer.
	NewTimer(name string, tagsList ...map[string]string) Timer
}
var (
	// defaultInfluxClient is the InfluxDB-backed client that initially backs
	// defaultClient. As declared here it has empty metric registries, a
	// one-minute report frequency and no influxDbClient set.
	defaultInfluxClient = &influxClient{
		metrics:         map[string]*rawMetric{},
		counters:        map[string]*counter{},
		aggMetrics:      map[string]*aggregateMetric{},
		reportFrequency: time.Minute,
	}

	// defaultClient is the Client handed out by GetDefaultClient.
	defaultClient Client = defaultInfluxClient
)
// GetDefaultClient returns the package-wide default Client, i.e. whatever
// defaultClient currently holds.
func GetDefaultClient() Client {
	return defaultClient
}
// Init initializes the metrics package to report to InfluxDB.
//
// It creates a new InfluxDB-backed Client via NewClient, tagged by default
// with "app" (appName) and "host" (the local hostname) and reporting at
// DEFAULT_REPORT_FREQUENCY, and installs it as the default client. Metrics,
// counters and aggregate metrics already registered with the previous default
// InfluxDB client are carried over to the new client.
//
// An error is returned if the hostname cannot be determined or if NewClient
// fails; in either case the default client is left unchanged.
func Init(appName string, influxDbClient *influxdb.Client) error {
	hostName, err := os.Hostname()
	if err != nil {
		return fmt.Errorf("Unable to retrieve hostname: %s", err)
	}
	tags := map[string]string{
		"app":  appName,
		"host": hostName,
	}
	clientI, err := NewClient(influxDbClient, tags, DEFAULT_REPORT_FREQUENCY)
	if err != nil {
		return err
	}
	c, ok := clientI.(*influxClient)
	if !ok {
		return fmt.Errorf("NewClient returned unexpected Client type %T", clientI)
	}

	// Some metrics may already be registered with defaultInfluxClient; hand
	// its registries to the new client. InitPrometheus sets
	// defaultInfluxClient to nil, so guard against dereferencing it when Init
	// runs after InitPrometheus.
	if prev := defaultInfluxClient; prev != nil {
		c.aggMetrics = prev.aggMetrics
		c.counters = prev.counters
		c.metrics = prev.metrics
	}

	// Set the default client.
	defaultClient = c
	defaultInfluxClient = c
	return nil
}
// InitPrometheus switches the package over to reporting metrics to Prometheus.
//
// It serves promhttp.Handler() at "/metrics" on the given port from a
// background goroutine, installs a new Prometheus client as the default
// client and clears the default InfluxDB client. Whatever error
// http.ListenAndServe returns is passed to glog.Fatal.
//
// port - string, The port on which to serve the metrics, e.g. ":10110".
func InitPrometheus(port string) {
	router := mux.NewRouter()
	router.Handle("/metrics", promhttp.Handler())

	go func() {
		serveErr := http.ListenAndServe(port, router)
		glog.Fatal(serveErr)
	}()

	defaultClient = newPromClient()
	defaultInfluxClient = nil
}
// InitPromInflux initializes metrics to be reported to both InfluxDB and
// Prometheus.
//
// It runs Init and then InitPrometheus, and on success installs a mux client
// wrapping both resulting clients as the default client.
//
// appName        - string, passed through to Init.
// influxDbClient - *influxdb.Client, passed through to Init.
// port           - string, The port on which to serve the metrics, e.g. ":10110".
//
// An error is returned if Init fails (nothing else is done in that case) or
// if the mux client cannot be created. In the latter case the default client
// remains the Prometheus client installed by InitPrometheus, rather than
// being overwritten with the failed result.
func InitPromInflux(appName string, influxDbClient *influxdb.Client, port string) error {
	if err := Init(appName, influxDbClient); err != nil {
		return fmt.Errorf("Failed to Init Influx metrics: %s", err)
	}

	// Init left the InfluxDB client as the default; capture it before
	// InitPrometheus replaces the default with the Prometheus client.
	influx := defaultClient
	InitPrometheus(port)
	prom := defaultClient

	// Only replace the default client once the mux client is known to be good.
	combined, err := newMuxClient([]Client{prom, influx})
	if err != nil {
		return fmt.Errorf("Failed to create MuxClient: %s", err)
	}
	defaultClient = combined
	return nil
}
// InitPromMaybeInflux initializes Prometheus metrics and also checks whether
// InfluxDB metrics are being used; if so, metrics are sent to both metrics
// packages.
//
// CLIENTS SHOULD NOT CALL InitPromMaybeInflux directly. Instead use common.InitWith().
//
// port - string, The port on which to serve the metrics, e.g. ":10110".
//
// An error is returned only if the mux client cannot be created. In that case
// the default client remains the Prometheus client installed by
// InitPrometheus, rather than being overwritten with the failed result.
func InitPromMaybeInflux(port string) error {
	// We always start with influx as the defaultClient, but if it has not
	// been initialized with a working client by now we know that we aren't
	// using influx, so we can just overwrite the default client.
	if inf, ok := defaultClient.(*influxClient); ok && inf.influxDbClient == nil {
		InitPrometheus(port)
		return nil
	}

	// Otherwise we need to construct a mux client that records the metrics
	// into both prom and influx. Capture the current default before
	// InitPrometheus replaces it.
	previous := defaultClient
	InitPrometheus(port)
	prom := defaultClient

	// Only replace the default client once the mux client is known to be good.
	combined, err := newMuxClient([]Client{prom, previous})
	if err != nil {
		return fmt.Errorf("Failed to create MuxClient: %s", err)
	}
	defaultClient = combined
	return nil
}
// NewClient returns a Client which uses the given influxdb.Client to push data.
//
// influxDbClient  - the InfluxDB client used to create batch points and push
//                   data; must not be nil.
// defaultTags     - default tag keys and values which are applied to all
//                   data points.
// reportFrequency - how often metrics should create data points.
//                   NOTE(review): time.Tick is documented to return a nil
//                   channel for non-positive durations, in which case the
//                   collection loop below would never run; confirm callers
//                   always pass a positive value.
//
// Two background goroutines are started: one pushes data every PUSH_FREQUENCY
// and records how many points were pushed, the other collects metrics every
// reportFrequency. Neither loop has an exit condition.
//
// An error is returned if influxDbClient is nil or if the initial batch of
// points cannot be created.
func NewClient(influxDbClient *influxdb.Client, defaultTags map[string]string, reportFrequency time.Duration) (Client, error) {
	// Fail with an error instead of dereferencing a nil client below.
	if influxDbClient == nil {
		return nil, fmt.Errorf("NewClient requires a non-nil influxdb.Client")
	}
	values, err := influxDbClient.NewBatchPoints()
	if err != nil {
		return nil, err
	}
	c := &influxClient{
		aggMetrics:      map[string]*aggregateMetric{},
		counters:        map[string]*counter{},
		influxDbClient:  influxDbClient,
		defaultTags:     defaultTags,
		metrics:         map[string]*rawMetric{},
		reportFrequency: reportFrequency,
		values:          values,
	}

	// Periodically push queued data, then report how many points were pushed
	// per measurement and in total.
	go func() {
		for range time.Tick(PUSH_FREQUENCY) {
			byMeasurement, err := c.pushData()
			if err != nil {
				glog.Errorf("Failed to push data into InfluxDB: %s", err)
				continue
			}
			total := int64(0)
			for k, v := range byMeasurement {
				c.GetInt64Metric("metrics.points-pushed.by-measurement", map[string]string{"measurement": k}).Update(v)
				total += v
			}
			c.GetInt64Metric("metrics.points-pushed.total", nil).Update(total)
		}
	}()

	// Periodically turn the registered metrics into data points.
	go func() {
		for range time.Tick(reportFrequency) {
			c.collectMetrics()
			c.collectAggregateMetrics()
		}
	}()
	return c, nil
}