blob: 4bcaf4566c3f145528212c97b888568abba0a34a [file] [log] [blame]
// alert-to-pubsub accepts POST messages from Prometheus for alerts and publishes them to Google PubSub.
package main
import (
"context"
"encoding/json"
"flag"
"io"
"net/http"
"time"
"cloud.google.com/go/pubsub"
"github.com/go-chi/chi/v5"
"go.skia.org/infra/go/alerts"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/httputils"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"golang.org/x/oauth2/google"
"google.golang.org/api/option"
)
// flags
var (
location = flag.String("location", "skia-public", "The name where this prometheus server is running.")
period = flag.Duration("period", 15*time.Second, "How often to query for alerts.")
port = flag.String("port", ":8000", "HTTP service address (e.g., ':8000')")
project = flag.String("project", "skia-public", "The GCE project name.")
promPort = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':10110')")
simulate = flag.Bool("simulate", false, "Send simulated alerts as opposed to polling Prometheus.")
)
// metrics
var (
failureCounter = metrics2.GetCounter("pubsub_send_failure", nil)
successCounter = metrics2.GetCounter("pubsub_send_success", nil)
liveness = metrics2.NewLiveness("alert_to_pubsub_incoming_alerts")
)
var (
// Version can be changed via -ldflags.
Version = "kubernetes"
)
var (
sim1 = map[string]string{
"__name__": "ALERTS",
"alertname": "BotUnemployed",
"alertstate": "firing",
"bot": "skia-rpi-064",
"category": "infra",
"instance": "skia-datahopper2:20000",
"job": "datahopper",
"pool": "Skia",
"severity": "critical",
"swarming": "chromium-swarm.appspot.com",
}
sim2 = map[string]string{
"__name__": "ALERTS",
"alertname": "BotMissing",
"alertstate": "firing",
"bot": "skia-rpi-064",
"category": "infra",
"instance": "skia-datahopper2:20000",
"job": "datahopper",
"pool": "Skia",
"severity": "critical",
"swarming": "chromium-swarm.appspot.com",
}
stateFromResolved = map[bool]string{
true: alerts.STATE_RESOLVED,
false: alerts.STATE_ACTIVE,
}
)
const (
LINK_TO_SOURCE = "link_to_source"
)
// Optimally we would just use the prometheus code itself for the definition of
// Alert, but their code includes a different version of opencensus initialized
// in an init() call somewhere in their code.
// Alert is a generic representation of an alert in the Prometheus eco-system.
type Alert struct {
// Label value pairs for purpose of aggregation, matching, and disposition
// dispatching. This must minimally include an "alertname" label.
Labels map[string]string `json:"labels"`
// Extra key/value information which does not define alert identity.
Annotations map[string]string `json:"annotations"`
// The known time range for this alert. Both ends are optional.
StartsAt time.Time `json:"startsAt,omitempty"`
EndsAt time.Time `json:"endsAt,omitempty"`
GeneratorURL string `json:"generatorURL,omitempty"`
}
// Resolved returns true if the activity interval ended in the past.
func (a *Alert) Resolved() bool {
resolved := a.ResolvedAt(time.Now())
if resolved {
sklog.Warningf("This received alert is already resolved: %+v", a)
}
return resolved
}
// ResolvedAt returns true if the activity interval ended before
// the given timestamp.
func (a *Alert) ResolvedAt(ts time.Time) bool {
if a.EndsAt.IsZero() {
return false
}
return !a.EndsAt.After(ts)
}
func sendPubSub(ctx context.Context, m map[string]string, topic *pubsub.Topic) {
m[alerts.LOCATION] = *location
b, err := json.Marshal(m)
if err != nil {
sklog.Errorf("Failed to encode message Data: %s: %#v", err, m)
return
}
msg := &pubsub.Message{
Data: b,
}
res := topic.Publish(ctx, msg)
if _, err := res.Get(ctx); err != nil {
failureCounter.Inc(1)
sklog.Errorf("Failed to send message %+v: %s", msg, err)
} else {
successCounter.Inc(1)
}
}
func singleStep(ctx context.Context, client *http.Client, topic *pubsub.Topic) error {
if *simulate {
sendPubSub(ctx, sim1, topic)
sendPubSub(ctx, sim2, topic)
}
// Send a healthz alert to be used as a marker that pubsub is still working.
m := map[string]string{
"__name__": "HEALTHZ",
}
sklog.Info("healthz")
sendPubSub(ctx, m, topic)
return nil
}
type Server struct {
topic *pubsub.Topic
}
func NewServer(topic *pubsub.Topic) *Server {
return &Server{
topic: topic,
}
}
func (s *Server) alertHandler(w http.ResponseWriter, r *http.Request) {
liveness.Reset()
var incomingAlerts []Alert
b, err := io.ReadAll(r.Body)
if err != nil {
httputils.ReportError(w, err, "Failed to read JSON.", http.StatusInternalServerError)
return
}
defer util.Close(r.Body)
if err := json.Unmarshal(b, &incomingAlerts); err != nil {
httputils.ReportError(w, err, "Failed to decode JSON.", http.StatusInternalServerError)
return
}
sklog.Infof("Received %d incomingAlerts.", len(incomingAlerts))
for _, alert := range incomingAlerts {
m := map[string]string{
alerts.STATE: stateFromResolved[alert.Resolved()],
LINK_TO_SOURCE: alert.GeneratorURL,
}
for k, v := range alert.Labels {
m[k] = v
}
for k, v := range alert.Annotations {
m[k] = v
}
// Do not use r.Context() here because we could run into "context deadline exceeded".
sendPubSub(context.Background(), m, s.topic)
}
}
func main() {
common.InitWithMust(
"alert-to-pubsub",
common.PrometheusOpt(promPort),
)
sklog.Infof("Version: %s", Version)
ctx := context.Background()
ts, err := google.DefaultTokenSource(ctx, pubsub.ScopePubSub)
if err != nil {
sklog.Fatal(err)
}
client, err := pubsub.NewClient(ctx, *project, option.WithTokenSource(ts))
if err != nil {
sklog.Fatal(err)
}
topic := client.Topic(alerts.TOPIC)
exists, err := topic.Exists(ctx)
if err != nil {
sklog.Fatal(err)
}
if !exists {
topic, err = client.CreateTopic(ctx, alerts.TOPIC)
if err != nil {
sklog.Fatal(err)
}
}
httpClient := httputils.NewTimeoutClient()
go func() {
for range time.Tick(*period) {
if err := singleStep(ctx, httpClient, topic); err != nil {
sklog.Errorf("Failed step: %s", err)
}
}
}()
server := NewServer(topic)
r := chi.NewRouter()
r.HandleFunc("/api/v1/alerts", server.alertHandler)
h := httputils.LoggingRequestResponse(r)
h = httputils.HealthzAndHTTPS(h)
http.Handle("/", h)
sklog.Info("Ready to serve.")
sklog.Fatal(http.ListenAndServe(*port, nil))
}