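// Package gcloud_metrics periodically reads metric data from the Google
// Cloud Monitoring API and re-exports the most recent values as local
// metrics via go.skia.org/infra/go/metrics2.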
package gcloud_metrics

import (
"context"
"fmt"
"time"
monitoring "cloud.google.com/go/monitoring/apiv3"
"github.com/googleapis/gax-go/v2"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/now"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"golang.org/x/oauth2"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
monitoringpb "google.golang.org/genproto/googleapis/monitoring/v3"
"google.golang.org/protobuf/types/known/timestamppb"
)

const (
// metricsPeriod was chosen to be large enough to contain at least one data
// point, with some extra buffer, since Cloud Monitoring has a sample rate
// of one data point per minute.
metricsPeriod = 3 * time.Minute
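
// metricsInterval is how often metrics are ingested from the Cloud
// Monitoring API.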
metricsInterval = time.Minute
pubsubMetricPrefix = "pubsub.googleapis.com/subscription/"
)

var (
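// pubsubMetrics maps Cloud Monitoring metric types to the measurement names
// under which their values are reported locally.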
pubsubMetrics = map[string]string{
pubsubMetricPrefix + "num_undelivered_messages": "pubsub_num_undelivered_messages",
pubsubMetricPrefix + "oldest_unacked_message_age": "pubsub_oldest_unacked_message_age_s",
}
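
// pubsubIncludeLabels are the monitored-resource labels copied onto each
// reported metric, used to distinguish the individual time series from one
// another.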
pubsubIncludeLabels = []string{"subscription_id"}
)

// StartGCloudMetrics begins ingesting metrics from the GCloud Monitoring API.
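//
// A minimal usage sketch; the project name, scope, and token-source setup
// below are illustrative only (any oauth2.TokenSource with read access to
// Cloud Monitoring will do; google here refers to golang.org/x/oauth2/google):
//
//	ts, err := google.DefaultTokenSource(ctx, "https://www.googleapis.com/auth/monitoring.read")
//	if err != nil {
//		return err
//	}
//	if err := StartGCloudMetrics(ctx, []string{"my-project"}, ts); err != nil {
//		return err
//	}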
func StartGCloudMetrics(ctx context.Context, projects []string, ts oauth2.TokenSource) error {
metricsClientInner, err := monitoring.NewMetricClient(ctx, option.WithTokenSource(ts))
if err != nil {
return skerr.Wrap(err)
}
metricsClient := &metricClientWrapper{
metricClient: metricsClientInner,
}
for _, project := range projects {
project := project // Prevents re-use of the variable causing problems in the goroutine.
lvReportGCloudMetrics := metrics2.NewLiveness("last_successful_report_gcloud_metrics", map[string]string{
"project": project,
})
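
// oldMetrics tracks the metrics reported on the previous iteration so that
// any which no longer appear upstream can be deleted.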
oldMetrics := map[metrics2.Int64Metric]struct{}{}
go util.RepeatCtx(ctx, metricsInterval, func(ctx context.Context) {
newMetrics, err := ingestMetricsForProject(ctx, metricsClient, project)
if err != nil {
sklog.Error(err)
return
}
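// Build the set of metrics reported this cycle, then delete any metric from
// the previous cycle which was not updated, so that stale values do not
// linger. Metrics which fail to delete are kept and retried next cycle.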
newMetricsMap := make(map[metrics2.Int64Metric]struct{}, len(newMetrics))
for _, m := range newMetrics {
newMetricsMap[m] = struct{}{}
}
var cleanup []metrics2.Int64Metric
for m := range oldMetrics {
if _, ok := newMetricsMap[m]; !ok {
cleanup = append(cleanup, m)
}
}
if len(cleanup) > 0 {
failedDelete := cleanupOldMetrics(cleanup)
for _, m := range failedDelete {
newMetricsMap[m] = struct{}{}
}
}
oldMetrics = newMetricsMap
lvReportGCloudMetrics.Reset()
})
}
return nil
}

// cleanupOldMetrics deletes old metrics. This handles the case where time
// series no longer appear upstream but the local metrics created for them
// would otherwise hang around without ever being updated.
func cleanupOldMetrics(oldMetrics []metrics2.Int64Metric) []metrics2.Int64Metric {
failedDelete := []metrics2.Int64Metric{}
for _, m := range oldMetrics {
if err := m.Delete(); err != nil {
sklog.Warningf("Failed to delete metric: %s", err)
failedDelete = append(failedDelete, m)
}
}
return failedDelete
}

// ingestMetricsForProject ingests all GCloud metrics for a particular project.
func ingestMetricsForProject(ctx context.Context, metricsClient MetricClient, project string) ([]metrics2.Int64Metric, error) {
rv := []metrics2.Int64Metric{}
// PubSub metrics.
pubsubMetrics, err := ingestPubSubMetrics(ctx, metricsClient, project)
if err != nil {
return nil, skerr.Wrap(err)
}
rv = append(rv, pubsubMetrics...)
// TODO: Additional types of metrics go here, following the same pattern as
// PubSub above.
return rv, nil
}

// ingestPubSubMetrics ingests Cloud PubSub metrics for a project.
func ingestPubSubMetrics(ctx context.Context, metricsClient MetricClient, project string) ([]metrics2.Int64Metric, error) {
endTs := now.Now(ctx)
endTime := timestamppb.New(endTs)
startTime := timestamppb.New(endTs.Add(-metricsPeriod))
rv := []metrics2.Int64Metric{}
for metricType, measurement := range pubsubMetrics {
metrics, err := ingestTimeSeries(ctx, metricsClient, project, metricType, measurement, pubsubIncludeLabels, startTime, endTime)
if err != nil {
return nil, skerr.Wrap(err)
}
rv = append(rv, metrics...)
}
return rv, nil
}

// MetricClient provides an abstraction for monitoring.MetricClient, used for
// testing.
type MetricClient interface {
ListTimeSeries(context.Context, *monitoringpb.ListTimeSeriesRequest, ...gax.CallOption) TimeSeriesIterator
}

// TimeSeriesIterator provides an abstraction for monitoring.TimeSeriesIterator,
// used for testing.
type TimeSeriesIterator interface {
Next() (*monitoringpb.TimeSeries, error)
}
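
// These interfaces let tests substitute a fake client for the real Cloud
// Monitoring client. A minimal fake might look like the following sketch
// (the names and canned data are illustrative, not part of this package):
//
//	type fakeIterator struct {
//		series []*monitoringpb.TimeSeries
//	}
//
//	func (it *fakeIterator) Next() (*monitoringpb.TimeSeries, error) {
//		if len(it.series) == 0 {
//			return nil, iterator.Done
//		}
//		ts := it.series[0]
//		it.series = it.series[1:]
//		return ts, nil
//	}
//
//	type fakeMetricClient struct {
//		series []*monitoringpb.TimeSeries
//	}
//
//	func (c *fakeMetricClient) ListTimeSeries(ctx context.Context, req *monitoringpb.ListTimeSeriesRequest, opts ...gax.CallOption) TimeSeriesIterator {
//		return &fakeIterator{series: c.series}
//	}
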
// metricClientWrapper wraps a monitoring.MetricClient to implement
// MetricClient.
type metricClientWrapper struct {
metricClient *monitoring.MetricClient
}

// ListTimeSeries implements MetricClient.
func (w *metricClientWrapper) ListTimeSeries(ctx context.Context, req *monitoringpb.ListTimeSeriesRequest, opts ...gax.CallOption) TimeSeriesIterator {
return w.metricClient.ListTimeSeries(ctx, req, opts...)
}

// ingestTimeSeries ingests data from one or more time series matching the given
// metricType within the given project. The data is stored under the given
// measurement name. The includeLabels must deduplicate the time series
// sufficiently to prevent writing conflicting values to the same metric.
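// For example, the PubSub metrics above use includeLabels of
// ["subscription_id"], so each subscription's time series produces its own
// metric, tagged with "project" and "subscription_id".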
func ingestTimeSeries(ctx context.Context, metricsClient MetricClient, project, metricType, measurement string, includeLabels []string, startTime, endTime *timestamppb.Timestamp) ([]metrics2.Int64Metric, error) {
rv := []metrics2.Int64Metric{}
req := &monitoringpb.ListTimeSeriesRequest{
Name: fmt.Sprintf("projects/%s", project),
Filter: fmt.Sprintf("metric.type = %q", metricType),
Interval: &monitoringpb.TimeInterval{
EndTime: endTime,
StartTime: startTime,
},
// ListTimeSeriesRequest_FULL gets us the actual data points from the
// time series, as opposed to just the metadata.
View: monitoringpb.ListTimeSeriesRequest_FULL,
}
iter := metricsClient.ListTimeSeries(ctx, req)
for {
timeSeries, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, skerr.Wrap(err)
}
if len(timeSeries.Points) > 0 {
// Just use the very last data point.
value := timeSeries.Points[len(timeSeries.Points)-1].GetValue().GetInt64Value()
tags := make(map[string]string, len(includeLabels)+1)
for _, label := range includeLabels {
tags[label] = timeSeries.Resource.Labels[label]
}
tags["project"] = project
metric := metrics2.GetInt64Metric(measurement, tags)
metric.Update(value)
rv = append(rv, metric)
}
}
return rv, nil
}
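
// Assert that metricClientWrapper implements MetricClient.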
var _ MetricClient = &metricClientWrapper{}