blob: 180727b928d0691f93edcc4be74d23c73c794e8d [file] [log] [blame]
package sink
import (
"context"
"encoding/json"
"cloud.google.com/go/pubsub"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/machine/go/common"
"go.skia.org/infra/machine/go/machine"
"go.skia.org/infra/machine/go/machineserver/config"
)
// SinkImpl implements the Sink interface using Google Cloud PubSub.
//
// Instances are expected to be created via New, which populates every field.
type SinkImpl struct {
	// topic is the PubSub topic that Send publishes events to.
	topic *pubsub.Topic

	// sendSuccess is incremented once per successful Send; sendFailure is
	// incremented once per Send that returns an error.
	sendSuccess, sendFailure metrics2.Counter
}
// New returns a new SinkImpl instance.
//
// The PubSub topic is obtained from common.NewPubSubClient using the given
// local flag and instanceConfig; only the topic is kept, the first value
// returned by that call is discarded. The topic is logged once at startup,
// and the success/failure counters used by Send are looked up by name.
//
// An error is returned, wrapped with context, if common.NewPubSubClient fails.
func New(ctx context.Context, local bool, instanceConfig config.InstanceConfig) (*SinkImpl, error) {
	_, pubsubTopic, err := common.NewPubSubClient(ctx, local, instanceConfig)
	if err != nil {
		return nil, skerr.Wrapf(err, "Failed to create pubsub client for SinkImpl.")
	}
	sklog.Infof("Sink started for topic: %q", pubsubTopic.String())

	sink := &SinkImpl{topic: pubsubTopic}
	sink.sendSuccess = metrics2.GetCounter("machine_sink_send_success")
	sink.sendFailure = metrics2.GetCounter("machine_sink_send_failure")
	return sink, nil
}
// Send implements the Sink interface.
//
// The event is serialized to JSON and published as the Data of a single
// PubSub message on the sink's topic. Send waits on the publish result before
// returning, so a nil error means Get on that result reported no error.
//
// The sendFailure counter is incremented when either serialization or
// publishing fails; the sendSuccess counter is incremented otherwise.
func (s *SinkImpl) Send(ctx context.Context, event machine.Event) error {
	payload, err := json.Marshal(event)
	if err != nil {
		s.sendFailure.Inc(1)
		return skerr.Wrapf(err, "Failed to serialize the event.")
	}

	// Publish only queues the message; Get is what surfaces the outcome.
	result := s.topic.Publish(ctx, &pubsub.Message{Data: payload})
	if _, err := result.Get(ctx); err != nil {
		s.sendFailure.Inc(1)
		return skerr.Wrapf(err, "Failed to send message.")
	}

	s.sendSuccess.Inc(1)
	return nil
}
// Affirm that *SinkImpl implements the Sink interface. This is a compile-time
// check only: the build fails if *SinkImpl is missing any method that Sink
// requires, and the blank identifier means no usable variable is declared.
var _ Sink = (*SinkImpl)(nil)