blob: 0af060bfa43ec3f500d0d95d417dff5e031630a4 [file] [log] [blame]
// The diffcalculator executable listens to the Pub/Sub topic and processes diffs based on the
// messages passed in. For an overview of Pub/Sub, see https://cloud.google.com/pubsub/docs
package main
import (
"context"
"encoding/json"
"flag"
"io/ioutil"
"net/http"
"path"
"sync"
"sync/atomic"
"time"
"cloud.google.com/go/pubsub"
gstorage "cloud.google.com/go/storage"
"github.com/jackc/pgx/v4/pgxpool"
"go.opencensus.io/trace"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/httputils"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/golden/go/config"
"go.skia.org/infra/golden/go/diff"
"go.skia.org/infra/golden/go/diff/worker"
"go.skia.org/infra/golden/go/sql"
"go.skia.org/infra/golden/go/tracing"
"go.skia.org/infra/golden/go/types"
)
const (
	// imgFolder is the folder inside the GCS bucket that holds the images; each image object is
	// named after its digest.
	imgFolder = "dm-images-v1"
	// maxSQLConnections caps the size of the pgx connection pool. The value is arbitrary.
	maxSQLConnections = 20
)
// diffCalculatorConfig is the full configuration of the diffcalculator. main loads it from two
// JSON5 files: the instance-wide common config and this service's own config.
type diffCalculatorConfig struct {
	// Common holds settings shared by all services of an instance. Fields of it read in this
	// file: PromPort, ReadyPort, TracingProportion, WindowSize, SQLConnection, SQLDatabaseName,
	// GCSBucket, PubsubProjectID and DiffWorkTopic.
	config.Common
	// DiffCacheNamespace is a namespace for differentiating the DiffCache entities. The instance
	// name is fine here.
	// NOTE(review): not read anywhere in this file; confirm it is still used before relying on it.
	DiffCacheNamespace string `json:"diff_cache_namespace" optional:"true"`
	// DiffWorkSubscription is the subscription name used by all replicas of the diffcalculator.
	// By setting the subscriber ID to be the same on all instances of the diffcalculator,
	// only one of the replicas will get each event (usually). We like our subscription names
	// to be unique and keyed to the instance, for easier following up on "Why are there so many
	// backed up messages?"
	// The subscription must already exist; listen fails if it does not.
	DiffWorkSubscription string `json:"diff_work_subscription"`
	// MemcachedServer is the address in the form dns_name:port
	// (e.g. gold-memcached-0.gold-memcached:11211).
	// NOTE(review): not read anywhere in this file; confirm it is still used before relying on it.
	MemcachedServer string `json:"memcached_server" optional:"true"`
	// PubSubFetchSize is how many worker messages to ask PubSub for. This defaults to 10, but for
	// instances that have many tests, but most of the messages result in no-ops, this can be
	// higher for better utilization and throughput.
	// An unset (zero) value means 10; any other value is used as-is for
	// ReceiveSettings.MaxOutstandingMessages.
	PubSubFetchSize int `json:"pubsub_fetch_size" optional:"true"`
}
// main loads the configuration and initializes logging, metrics, tracing, the SQL database and
// the GCS image source. It then blocks, processing diff work messages from Pub/Sub. It never
// returns normally: every exit path goes through sklog.Fatal/Fatalf.
func main() {
	// Command line flags.
	var (
		commonInstanceConfig = flag.String("common_instance_config", "", "Path to the json5 file containing the configuration that needs to be the same across all services for a given instance.")
		thisConfig           = flag.String("config", "", "Path to the json5 file containing the configuration specific to the diffcalculator.")
		hang                 = flag.Bool("hang", false, "Stop and do nothing after reading the flags. Good for debugging containers.")
	)
	// Parse the flags first; the config paths (and --hang) are needed before anything else,
	// including logging setup.
	flag.Parse()
	if *hang {
		sklog.Info("Hanging")
		select {}
	}

	var dcc diffCalculatorConfig
	if err := config.LoadFromJSON5(&dcc, commonInstanceConfig, thisConfig); err != nil {
		sklog.Fatalf("Reading config: %s", err)
	}
	sklog.Infof("Loaded config %#v", dcc)

	// Set up the logging options.
	logOpts := []common.Opt{
		common.PrometheusOpt(&dcc.PromPort),
	}
	common.InitWithMust("diffcalculator", logOpts...)

	// We expect there to be a lot of diff work, so we sample 1% of it by default to avoid
	// incurring too much overhead. The configured TracingProportion can raise this floor but
	// never lower it.
	tp := 0.01
	if dcc.TracingProportion > tp {
		tp = dcc.TracingProportion
	}
	if err := tracing.Initialize(tp); err != nil {
		sklog.Fatalf("Could not set up tracing: %s", err)
	}

	ctx := context.Background()
	db := mustInitSQLDatabase(ctx, dcc)
	gis := mustMakeGCSImageSource(ctx, dcc)
	sqlProcessor := &processor{
		calculator:  worker.NewV2(db, gis, dcc.WindowSize),
		ackCounter:  metrics2.GetCounter("diffcalculator_ack"),
		nackCounter: metrics2.GetCounter("diffcalculator_nack"),
	}

	go func() {
		// Give the Pub/Sub connection about 5 seconds to be set up before reporting healthy.
		// This is only a fixed delay; it does not observe the actual connection state.
		time.Sleep(5 * time.Second)
		http.HandleFunc("/healthz", httputils.ReadyHandleFunc)
		sklog.Fatal(http.ListenAndServe(dcc.ReadyPort, nil))
	}()

	startMetrics(ctx, sqlProcessor)
	// listen blocks until Receive stops, so reaching Fatalf means the worker is done for good.
	sklog.Fatalf("Listening for work %s", listen(ctx, dcc, sqlProcessor))
}
// mustInitSQLDatabase opens a pgx connection pool to the SQL database named in dcc. The pool holds
// at most maxSQLConnections connections. Any failure (including a missing database name) is
// fatal to the process.
func mustInitSQLDatabase(ctx context.Context, dcc diffCalculatorConfig) *pgxpool.Pool {
	dbName := dcc.SQLDatabaseName
	if dbName == "" {
		sklog.Fatalf("Must have SQL Database Information")
	}

	connURL := sql.GetConnectionURL(dcc.SQLConnection, dbName)
	poolConf, err := pgxpool.ParseConfig(connURL)
	if err != nil {
		sklog.Fatalf("error getting postgres config %s: %s", connURL, err)
	}
	poolConf.MaxConns = maxSQLConnections

	pool, err := pgxpool.ConnectConfig(ctx, poolConf)
	if err != nil {
		sklog.Fatalf("error connecting to the database: %s", err)
	}
	sklog.Infof("Connected to SQL database %s", dbName)
	return pool
}
// mustMakeGCSImageSource builds a worker.ImageSource that downloads images from the GCS bucket
// named by dcc.GCSBucket. Failing to create the GCS client is fatal to the process.
func mustMakeGCSImageSource(ctx context.Context, dcc diffCalculatorConfig) worker.ImageSource {
	// Uses Application Default Credentials, e.g. the file named by the
	// GOOGLE_APPLICATION_CREDENTIALS environment variable.
	storageClient, err := gstorage.NewClient(ctx)
	if err != nil {
		// Log the error itself; storageClient is nil here and carries no useful information.
		sklog.Fatalf("Making GCS Image source: %s", err)
	}
	return &gcsImageDownloader{
		client: storageClient,
		bucket: dcc.GCSBucket,
	}
}
// gcsImageDownloader is the worker.ImageSource created by mustMakeGCSImageSource. It reads
// digest-named images from a single GCS bucket.
// TODO(kjlubick) maybe deduplicate with storage.GCSClient
type gcsImageDownloader struct {
	// client is the GCS client used for every read.
	client *gstorage.Client
	// bucket is the name of the GCS bucket whose imgFolder directory holds the images.
	bucket string
}
// GetImage downloads the image with the given digest from GCS. The object is expected at
// "<imgFolder>/<digest>.png" inside g.bucket.
//
// It returns the full image bytes on success. If the object cannot be opened (for example, it
// does not exist) or cannot be read completely, it returns nil bytes and a wrapped error naming
// the object.
func (g *gcsImageDownloader) GetImage(ctx context.Context, digest types.Digest) ([]byte, error) {
	// Intentionally use path (not path/filepath): GCS object names always use forward slashes.
	imgPath := path.Join(imgFolder, string(digest)+".png")
	r, err := g.client.Bucket(g.bucket).Object(imgPath).NewReader(ctx)
	if err != nil {
		// A missing image takes this error path.
		return nil, skerr.Wrapf(err, "opening gs://%s/%s", g.bucket, imgPath)
	}
	defer util.Close(r)
	b, err := ioutil.ReadAll(r)
	if err != nil {
		// Don't hand back a partially-read image alongside the error.
		return nil, skerr.Wrapf(err, "reading gs://%s/%s", g.bucket, imgPath)
	}
	return b, nil
}
// listen connects to Pub/Sub in dcc.PubsubProjectID and checks that the configured diff work topic
// and subscription both already exist. It then blocks, receiving messages from the subscription
// and passing each to p.processPubSubMessage.
//
// It returns an error if any setup step fails. Otherwise it returns once Receive stops, which
// happens when ctx is cancelled or Pub/Sub fails in a non-retryable way.
func listen(ctx context.Context, dcc diffCalculatorConfig, p *processor) error {
	psc, err := pubsub.NewClient(ctx, dcc.PubsubProjectID)
	if err != nil {
		return skerr.Wrapf(err, "initializing pubsub client for project %s", dcc.PubsubProjectID)
	}

	// The topic must already exist; this service never creates it.
	topic := psc.Topic(dcc.DiffWorkTopic)
	topicExists, err := topic.Exists(ctx)
	if err != nil {
		return skerr.Wrapf(err, "checking for existing topic %s", dcc.DiffWorkTopic)
	}
	if !topicExists {
		return skerr.Fmt("Diff work topic %s does not exist in project %s", dcc.DiffWorkTopic, dcc.PubsubProjectID)
	}

	// Likewise, the shared subscription must already exist.
	sub := psc.Subscription(dcc.DiffWorkSubscription)
	subExists, err := sub.Exists(ctx)
	if err != nil {
		return skerr.Wrapf(err, "checking for existing subscription %s", dcc.DiffWorkSubscription)
	}
	if !subExists {
		return skerr.Fmt("subscription %s does not exist in project %s", dcc.DiffWorkSubscription, dcc.PubsubProjectID)
	}

	// Limit how many messages are fetched when PubSub has no work. Waiting for PubSub to hand
	// over messages can take a second or two, so a small (but not too small) batch is used. An
	// unset (zero) config value means 10.
	fetchSize := 10
	if dcc.PubSubFetchSize != 0 {
		fetchSize = dcc.PubSubFetchSize
	}
	sub.ReceiveSettings.MaxOutstandingMessages = fetchSize
	// Handle one message at a time, which allows finer control over scaling up.
	sub.ReceiveSettings.NumGoroutines = 1

	return skerr.Wrap(sub.Receive(ctx, p.processPubSubMessage))
}
// processor handles diff work messages received from Pub/Sub. It runs them one at a time
// through calculator and counts how many were Ack'd and Nack'd.
type processor struct {
	// calculator performs the diff computation for each valid message.
	calculator diff.Calculator
	// ackCounter counts Ack'd messages.
	ackCounter metrics2.Counter
	// nackCounter counts Nack'd messages.
	nackCounter metrics2.Counter
	// busy is either 1 or 0 depending on whether this processor is working. This allows us
	// to gather data on wall-clock utilization. It is written in processPubSubMessage and read
	// by the goroutine started in startMetrics, so it is only accessed through sync/atomic.
	busy int64
	// PubSub sometimes gives us more than one message at a time. This mutex ensures that
	// we only really process one at a time, which makes sure we don't overload our CPU estimate
	// and we avoid cache thrashing.
	oneMessageAtATime sync.Mutex
}
// processPubSubMessage is the Pub/Sub receive callback. Under a mutex (so only one message is
// ever in flight), it runs processMessage on msg's payload inside a "processFromPubSub" trace
// span. It then Acks or Nacks msg according to the result and bumps the matching counter.
// p.busy is 1 for the duration of the work so startMetrics can measure utilization.
func (p *processor) processPubSubMessage(ctx context.Context, msg *pubsub.Message) {
	p.oneMessageAtATime.Lock()
	defer p.oneMessageAtATime.Unlock()

	ctx, span := trace.StartSpan(ctx, "processFromPubSub")
	defer span.End()

	atomic.StoreInt64(&p.busy, 1)
	ack := p.processMessage(ctx, msg.Data)
	if !ack {
		// Nack so Pub/Sub redelivers the message later.
		msg.Nack()
		p.nackCounter.Inc(1)
	} else {
		msg.Ack()
		p.ackCounter.Inc(1)
	}
	atomic.StoreInt64(&p.busy, 0)
}
// processMessage decodes msgData as a JSON diff.WorkerMessage. If the message is valid and has
// the current version, it calls CalculateDiffs for it.
//
// It is separate from processPubSubMessage to make testing easier: a valid pubsub.Message is
// hard to build without the emulator because of private fields that need initializing.
//
// The result says how to settle the message: true means Ack (do not retry), false means Nack
// (retry later).
func (p *processor) processMessage(ctx context.Context, msgData []byte) bool {
	defer metrics2.FuncTimer().Stop()

	var wm diff.WorkerMessage
	switch err := json.Unmarshal(msgData, &wm); {
	case err != nil:
		sklog.Errorf("Invalid message passed in: %s\n%s", err, string(msgData))
		// Ack it so no other subscriber gets it; it would be just as invalid there.
		return true
	case wm.Version != diff.WorkerMessageVersion:
		// An older or newer message format; skip it.
		return true
	}

	// Keep long-running tasks from starving our workers: cancel them after a while and requeue
	// them. CalculateDiffs should stream its results, so partial progress is kept.
	timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()

	if err := p.calculator.CalculateDiffs(timeoutCtx, wm.Grouping, wm.AdditionalLeft, wm.AdditionalRight); err != nil {
		sklog.Errorf("Calculating diffs for %v: %s", wm, err)
		// Let this be tried again.
		return false
	}
	return true
}
// startMetrics starts a background goroutine that reports the wall-clock utilization of p.
//
// Once per second it reads p.busy (0 or 1) and adds that value to the
// "diffcalculator_busy_pulses" counter. The counter therefore grows by at most 1 per second, so
// applying rate() (deltas per second) to it yields a number between 0 and 1. That number
// approximates the fraction of time p spent processing messages and helps decide whether more
// replicas are needed.
//
// The goroutine exits, and releases its ticker, as soon as ctx is done.
func startMetrics(ctx context.Context, p *processor) {
	go func() {
		busy := metrics2.GetCounter("diffcalculator_busy_pulses")
		// Use an explicit ticker rather than time.Tick. time.Tick's ticker can never be stopped,
		// so it would leak once this goroutine returns.
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				busy.Inc(atomic.LoadInt64(&p.busy))
			}
		}
	}()
}