Adds the sser Go module.
This module is for handling Server-Sent Events from a
ReplicaSet in Kubernetes.
This CL adds the interfaces, mocks, and implementations
for handling such connections from the server side.
Additionally it adds an example application that is deployed
to prod as sserexample.skia.org to demonstrate that this
works with our current prod environment, including
interoperating well with auth-proxy.
Change-Id: I10e1a237ad0746f185c5c77ac954cb23de9c5511
Reviewed-on: https://skia-review.googlesource.com/c/buildbot/+/596588
Reviewed-by: Erik Rose <erikrose@google.com>
Commit-Queue: Joe Gregorio <jcgregorio@google.com>
diff --git a/go.mod b/go.mod
index d601f46..8274095 100644
--- a/go.mod
+++ b/go.mod
@@ -178,6 +178,7 @@
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
+ github.com/r3labs/sse/v2 v2.8.1 // indirect
github.com/robertkrimen/otto v0.0.0-20200922221731-ef014fd054ac // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sendgrid/rest v2.6.9+incompatible // indirect
@@ -204,6 +205,7 @@
golang.org/x/text v0.3.7 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.6.7 // indirect
+ gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.66.3 // indirect
diff --git a/go.sum b/go.sum
index ade5dcc..4057c94 100644
--- a/go.sum
+++ b/go.sum
@@ -997,6 +997,8 @@
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
+github.com/r3labs/sse/v2 v2.8.1 h1:lZH+W4XOLIq88U5MIHOsLec7+R62uhz3bIi2yn0Sg8o=
+github.com/r3labs/sse/v2 v2.8.1/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I=
github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/robertkrimen/otto v0.0.0-20200922221731-ef014fd054ac h1:kYPjbEN6YPYWWHI6ky1J813KzIq/8+Wg4TO4xU7A/KU=
github.com/robertkrimen/otto v0.0.0-20200922221731-ef014fd054ac/go.mod h1:xvqspoSXJTIpemEonrMDFq6XzwHYYgToXWj5eRX1OtY=
@@ -1319,6 +1321,7 @@
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191119073136-fc4aabc6c914/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@@ -1805,6 +1808,8 @@
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
+gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
+gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
diff --git a/go/sser/BUILD.bazel b/go/sser/BUILD.bazel
new file mode 100644
index 0000000..c001153
--- /dev/null
+++ b/go/sser/BUILD.bazel
@@ -0,0 +1,49 @@
+load("//bazel/go:go_test.bzl", "go_test")
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "sser",
+ srcs = [
+ "peerfinder_impl.go",
+ "peerfinder_localhost.go",
+ "server_impl.go",
+ "sser.go",
+ ],
+ importpath = "go.skia.org/infra/go/sser",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//go/httputils",
+ "//go/metrics2",
+ "//go/skerr",
+ "//go/sklog",
+ "//go/util_generics",
+ "@com_github_gorilla_mux//:mux",
+ "@com_github_r3labs_sse_v2//:sse",
+ "@io_k8s_api//core/v1:core",
+ "@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
+ "@io_k8s_client_go//kubernetes",
+ ],
+)
+
+go_test(
+ name = "sser_test",
+ srcs = [
+ "peerfinder_impl_test.go",
+ "peerfinder_localhost_test.go",
+ "server_impl_test.go",
+ ],
+ embed = [":sser"],
+ deps = [
+ "//go/k8s/mocks",
+ "//go/k8s/watch/mocks",
+ "//go/metrics2",
+ "//go/sser/mocks",
+ "//go/testutils",
+ "@com_github_r3labs_sse_v2//:sse",
+ "@com_github_stretchr_testify//mock",
+ "@com_github_stretchr_testify//require",
+ "@io_k8s_api//core/v1:core",
+ "@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
+ "@io_k8s_client_go//kubernetes",
+ ],
+)
diff --git a/go/sser/example/BUILD.bazel b/go/sser/example/BUILD.bazel
new file mode 100644
index 0000000..658212b
--- /dev/null
+++ b/go/sser/example/BUILD.bazel
@@ -0,0 +1,38 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
+load("//bazel:skia_app_container.bzl", "skia_app_container")
+
+go_library(
+ name = "example_lib",
+ srcs = ["main.go"],
+ importpath = "go.skia.org/infra/go/sser/example",
+ visibility = ["//visibility:private"],
+ deps = [
+ "//go/common",
+ "//go/httputils",
+ "//go/sklog",
+ "//go/sser",
+ "@com_github_gorilla_mux//:mux",
+ "@io_k8s_client_go//kubernetes",
+ "@io_k8s_client_go//rest",
+ ],
+)
+
+go_binary(
+ name = "example",
+ embed = [":example_lib"],
+ visibility = ["//visibility:public"],
+)
+
+skia_app_container(
+ name = "sserexample",
+ dirs = {
+ "/usr/local/bin": [
+ [
+ "//go/sser/example:example",
+ "0755",
+ ],
+ ],
+ },
+ entrypoint = "/usr/local/bin/example",
+ repository = "skia-public/sserexample",
+)
diff --git a/go/sser/example/Makefile b/go/sser/example/Makefile
new file mode 100644
index 0000000..69bbfb4
--- /dev/null
+++ b/go/sser/example/Makefile
@@ -0,0 +1,2 @@
+push:
+ bazelisk run --config=mayberemote //go/sser/example:pushk_sserexample
\ No newline at end of file
diff --git a/go/sser/example/main.go b/go/sser/example/main.go
new file mode 100644
index 0000000..23a58bc
--- /dev/null
+++ b/go/sser/example/main.go
@@ -0,0 +1,123 @@
+package main
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "net/http"
+ "os"
+ "time"
+
+ "github.com/gorilla/mux"
+ "go.skia.org/infra/go/common"
+ "go.skia.org/infra/go/httputils"
+ "go.skia.org/infra/go/sklog"
+ "go.skia.org/infra/go/sser"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/rest"
+)
+
+// flags
+var (
+ local = flag.Bool("local", false, "Running locally if true. As opposed to in production.")
+ port = flag.String("port", ":8000", "HTTP service address (e.g., ':8000')")
+ sserPort = flag.Int("sser_port", 7000, "Server-Sent Events peer connection port.")
+ promPort = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':10110')")
+ namespace = flag.String("namespace", "default", "Namespace this application is running in.")
+ labelSelector = flag.String("label_selector", "app=sserexample", "Label selector for peers of this application.")
+)
+
+const page = `<!DOCTYPE html>
+<html>
+ <head>
+ <title></title>
+ <meta charset="utf-8" />
+ </head>
+ <body>
+ <pre></pre>
+ <script type="text/javascript" charset="utf-8">
+ const pre = document.querySelector('pre');
+ const evtSource = new EventSource('/_/sse?stream=counter');
+ const messages = [];
+ evtSource.onmessage = (event) => {
+ messages.push(event.data);
+ while (messages.length > 10) {
+ messages.shift();
+ }
+ pre.textContent = messages.join('\n');
+ };
+ </script>
+ </body>
+</html>
+`
+
+func index(w http.ResponseWriter, r *http.Request) {
+	w.Header().Set("Content-Type", "text/html")
+ _, err := w.Write([]byte(page))
+ if err != nil {
+ sklog.Errorf("write index page: %s", err)
+ }
+}
+
+func main() {
+ ctx := context.Background()
+ common.InitWithMust(
+ "sserexample",
+ common.PrometheusOpt(promPort),
+ common.MetricsLoggingOpt(),
+ )
+
+ var peerFinder sser.PeerFinder
+ if !*local {
+ config, err := rest.InClusterConfig()
+ if err != nil {
+ sklog.Fatal(err)
+ }
+ clientset, err := kubernetes.NewForConfig(config)
+ if err != nil {
+ sklog.Fatal(err)
+ }
+
+ peerFinder, err = sser.NewPeerFinder(clientset, *namespace, *labelSelector)
+ if err != nil {
+ sklog.Fatal(err)
+ }
+ } else {
+ peerFinder = sser.NewPeerFinderLocalhost()
+ }
+
+ sserServer, err := sser.New(*sserPort, peerFinder)
+ if err != nil {
+ sklog.Fatal(err)
+ }
+ err = sserServer.Start(ctx)
+ if err != nil {
+ sklog.Fatal(err)
+ }
+
+ r := mux.NewRouter()
+ r.HandleFunc("/", index)
+ r.HandleFunc("/_/sse", sserServer.ClientConnectionHandler(ctx))
+
+ var h http.Handler = r
+ if !*local {
+ h = httputils.HealthzAndHTTPS(h)
+ }
+
+ hostname, err := os.Hostname()
+ if err != nil {
+ sklog.Fatal(err)
+ }
+
+ count := 0
+ go func() {
+ for range time.Tick(time.Second) {
+ count++
+ sserServer.Send(ctx, "counter", fmt.Sprintf("%s - %d", hostname, count))
+ }
+ }()
+
+ http.Handle("/", h)
+ sklog.Info("Ready to serve.")
+ sklog.Fatal(http.ListenAndServe(*port, nil))
+}
diff --git a/go/sser/mocks/BUILD.bazel b/go/sser/mocks/BUILD.bazel
new file mode 100644
index 0000000..feda4e7
--- /dev/null
+++ b/go/sser/mocks/BUILD.bazel
@@ -0,0 +1,60 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "mocks",
+ srcs = [
+ "Interface.go",
+ "PeerFinder.go",
+ "Server.go",
+ "generate.go",
+ ],
+ importpath = "go.skia.org/infra/go/sser/mocks",
+ visibility = ["//visibility:public"],
+ deps = [
+ "@com_github_stretchr_testify//mock",
+ "@io_k8s_client_go//discovery",
+ "@io_k8s_client_go//kubernetes/typed/admissionregistration/v1:admissionregistration",
+ "@io_k8s_client_go//kubernetes/typed/admissionregistration/v1beta1",
+ "@io_k8s_client_go//kubernetes/typed/apiserverinternal/v1alpha1",
+ "@io_k8s_client_go//kubernetes/typed/apps/v1:apps",
+ "@io_k8s_client_go//kubernetes/typed/apps/v1beta1",
+ "@io_k8s_client_go//kubernetes/typed/apps/v1beta2",
+ "@io_k8s_client_go//kubernetes/typed/authentication/v1:authentication",
+ "@io_k8s_client_go//kubernetes/typed/authentication/v1beta1",
+ "@io_k8s_client_go//kubernetes/typed/authorization/v1:authorization",
+ "@io_k8s_client_go//kubernetes/typed/authorization/v1beta1",
+ "@io_k8s_client_go//kubernetes/typed/autoscaling/v1:autoscaling",
+ "@io_k8s_client_go//kubernetes/typed/autoscaling/v2beta1",
+ "@io_k8s_client_go//kubernetes/typed/autoscaling/v2beta2",
+ "@io_k8s_client_go//kubernetes/typed/batch/v1:batch",
+ "@io_k8s_client_go//kubernetes/typed/batch/v1beta1",
+ "@io_k8s_client_go//kubernetes/typed/certificates/v1:certificates",
+ "@io_k8s_client_go//kubernetes/typed/certificates/v1beta1",
+ "@io_k8s_client_go//kubernetes/typed/coordination/v1:coordination",
+ "@io_k8s_client_go//kubernetes/typed/coordination/v1beta1",
+ "@io_k8s_client_go//kubernetes/typed/core/v1:core",
+ "@io_k8s_client_go//kubernetes/typed/discovery/v1:discovery",
+ "@io_k8s_client_go//kubernetes/typed/discovery/v1beta1",
+ "@io_k8s_client_go//kubernetes/typed/events/v1:events",
+ "@io_k8s_client_go//kubernetes/typed/events/v1beta1",
+ "@io_k8s_client_go//kubernetes/typed/extensions/v1beta1",
+ "@io_k8s_client_go//kubernetes/typed/flowcontrol/v1alpha1",
+ "@io_k8s_client_go//kubernetes/typed/flowcontrol/v1beta1",
+ "@io_k8s_client_go//kubernetes/typed/networking/v1:networking",
+ "@io_k8s_client_go//kubernetes/typed/networking/v1beta1",
+ "@io_k8s_client_go//kubernetes/typed/node/v1:node",
+ "@io_k8s_client_go//kubernetes/typed/node/v1alpha1",
+ "@io_k8s_client_go//kubernetes/typed/node/v1beta1",
+ "@io_k8s_client_go//kubernetes/typed/policy/v1:policy",
+ "@io_k8s_client_go//kubernetes/typed/policy/v1beta1",
+ "@io_k8s_client_go//kubernetes/typed/rbac/v1:rbac",
+ "@io_k8s_client_go//kubernetes/typed/rbac/v1alpha1",
+ "@io_k8s_client_go//kubernetes/typed/rbac/v1beta1",
+ "@io_k8s_client_go//kubernetes/typed/scheduling/v1:scheduling",
+ "@io_k8s_client_go//kubernetes/typed/scheduling/v1alpha1",
+ "@io_k8s_client_go//kubernetes/typed/scheduling/v1beta1",
+ "@io_k8s_client_go//kubernetes/typed/storage/v1:storage",
+ "@io_k8s_client_go//kubernetes/typed/storage/v1alpha1",
+ "@io_k8s_client_go//kubernetes/typed/storage/v1beta1",
+ ],
+)
diff --git a/go/sser/mocks/Interface.go b/go/sser/mocks/Interface.go
new file mode 100644
index 0000000..69b4e80
--- /dev/null
+++ b/go/sser/mocks/Interface.go
@@ -0,0 +1,814 @@
+// Code generated by mockery v0.0.0-dev. DO NOT EDIT.
+
+package mocks
+
+import (
+ apiserverinternalv1alpha1 "k8s.io/client-go/kubernetes/typed/apiserverinternal/v1alpha1"
+ appsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
+
+ appsv1beta1 "k8s.io/client-go/kubernetes/typed/apps/v1beta1"
+
+ authenticationv1 "k8s.io/client-go/kubernetes/typed/authentication/v1"
+
+ authenticationv1beta1 "k8s.io/client-go/kubernetes/typed/authentication/v1beta1"
+
+ authorizationv1 "k8s.io/client-go/kubernetes/typed/authorization/v1"
+
+ authorizationv1beta1 "k8s.io/client-go/kubernetes/typed/authorization/v1beta1"
+
+ autoscalingv1 "k8s.io/client-go/kubernetes/typed/autoscaling/v1"
+
+ batchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
+
+ batchv1beta1 "k8s.io/client-go/kubernetes/typed/batch/v1beta1"
+
+ certificatesv1 "k8s.io/client-go/kubernetes/typed/certificates/v1"
+
+ certificatesv1beta1 "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
+
+ coordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1"
+
+ coordinationv1beta1 "k8s.io/client-go/kubernetes/typed/coordination/v1beta1"
+
+ corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+
+ discovery "k8s.io/client-go/discovery"
+
+ discoveryv1 "k8s.io/client-go/kubernetes/typed/discovery/v1"
+
+ discoveryv1beta1 "k8s.io/client-go/kubernetes/typed/discovery/v1beta1"
+
+ eventsv1 "k8s.io/client-go/kubernetes/typed/events/v1"
+
+ eventsv1beta1 "k8s.io/client-go/kubernetes/typed/events/v1beta1"
+
+ extensionsv1beta1 "k8s.io/client-go/kubernetes/typed/extensions/v1beta1"
+
+ flowcontrolv1beta1 "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta1"
+
+ mock "github.com/stretchr/testify/mock"
+
+ networkingv1 "k8s.io/client-go/kubernetes/typed/networking/v1"
+
+ networkingv1beta1 "k8s.io/client-go/kubernetes/typed/networking/v1beta1"
+
+ nodev1 "k8s.io/client-go/kubernetes/typed/node/v1"
+
+ nodev1alpha1 "k8s.io/client-go/kubernetes/typed/node/v1alpha1"
+
+ nodev1beta1 "k8s.io/client-go/kubernetes/typed/node/v1beta1"
+
+ policyv1 "k8s.io/client-go/kubernetes/typed/policy/v1"
+
+ policyv1beta1 "k8s.io/client-go/kubernetes/typed/policy/v1beta1"
+
+ rbacv1 "k8s.io/client-go/kubernetes/typed/rbac/v1"
+
+ rbacv1alpha1 "k8s.io/client-go/kubernetes/typed/rbac/v1alpha1"
+
+ rbacv1beta1 "k8s.io/client-go/kubernetes/typed/rbac/v1beta1"
+
+ schedulingv1 "k8s.io/client-go/kubernetes/typed/scheduling/v1"
+
+ schedulingv1alpha1 "k8s.io/client-go/kubernetes/typed/scheduling/v1alpha1"
+
+ schedulingv1beta1 "k8s.io/client-go/kubernetes/typed/scheduling/v1beta1"
+
+ storagev1 "k8s.io/client-go/kubernetes/typed/storage/v1"
+
+ storagev1alpha1 "k8s.io/client-go/kubernetes/typed/storage/v1alpha1"
+
+ storagev1beta1 "k8s.io/client-go/kubernetes/typed/storage/v1beta1"
+
+ testing "testing"
+
+ v1 "k8s.io/client-go/kubernetes/typed/admissionregistration/v1"
+
+ v1alpha1 "k8s.io/client-go/kubernetes/typed/flowcontrol/v1alpha1"
+
+ v1beta1 "k8s.io/client-go/kubernetes/typed/admissionregistration/v1beta1"
+
+ v1beta2 "k8s.io/client-go/kubernetes/typed/apps/v1beta2"
+
+ v2beta1 "k8s.io/client-go/kubernetes/typed/autoscaling/v2beta1"
+
+ v2beta2 "k8s.io/client-go/kubernetes/typed/autoscaling/v2beta2"
+)
+
+// Interface is an autogenerated mock type for the Interface type
+type Interface struct {
+ mock.Mock
+}
+
+// AdmissionregistrationV1 provides a mock function with given fields:
+func (_m *Interface) AdmissionregistrationV1() v1.AdmissionregistrationV1Interface {
+ ret := _m.Called()
+
+ var r0 v1.AdmissionregistrationV1Interface
+ if rf, ok := ret.Get(0).(func() v1.AdmissionregistrationV1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(v1.AdmissionregistrationV1Interface)
+ }
+ }
+
+ return r0
+}
+
+// AdmissionregistrationV1beta1 provides a mock function with given fields:
+func (_m *Interface) AdmissionregistrationV1beta1() v1beta1.AdmissionregistrationV1beta1Interface {
+ ret := _m.Called()
+
+ var r0 v1beta1.AdmissionregistrationV1beta1Interface
+ if rf, ok := ret.Get(0).(func() v1beta1.AdmissionregistrationV1beta1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(v1beta1.AdmissionregistrationV1beta1Interface)
+ }
+ }
+
+ return r0
+}
+
+// AppsV1 provides a mock function with given fields:
+func (_m *Interface) AppsV1() appsv1.AppsV1Interface {
+ ret := _m.Called()
+
+ var r0 appsv1.AppsV1Interface
+ if rf, ok := ret.Get(0).(func() appsv1.AppsV1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(appsv1.AppsV1Interface)
+ }
+ }
+
+ return r0
+}
+
+// AppsV1beta1 provides a mock function with given fields:
+func (_m *Interface) AppsV1beta1() appsv1beta1.AppsV1beta1Interface {
+ ret := _m.Called()
+
+ var r0 appsv1beta1.AppsV1beta1Interface
+ if rf, ok := ret.Get(0).(func() appsv1beta1.AppsV1beta1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(appsv1beta1.AppsV1beta1Interface)
+ }
+ }
+
+ return r0
+}
+
+// AppsV1beta2 provides a mock function with given fields:
+func (_m *Interface) AppsV1beta2() v1beta2.AppsV1beta2Interface {
+ ret := _m.Called()
+
+ var r0 v1beta2.AppsV1beta2Interface
+ if rf, ok := ret.Get(0).(func() v1beta2.AppsV1beta2Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(v1beta2.AppsV1beta2Interface)
+ }
+ }
+
+ return r0
+}
+
+// AuthenticationV1 provides a mock function with given fields:
+func (_m *Interface) AuthenticationV1() authenticationv1.AuthenticationV1Interface {
+ ret := _m.Called()
+
+ var r0 authenticationv1.AuthenticationV1Interface
+ if rf, ok := ret.Get(0).(func() authenticationv1.AuthenticationV1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(authenticationv1.AuthenticationV1Interface)
+ }
+ }
+
+ return r0
+}
+
+// AuthenticationV1beta1 provides a mock function with given fields:
+func (_m *Interface) AuthenticationV1beta1() authenticationv1beta1.AuthenticationV1beta1Interface {
+ ret := _m.Called()
+
+ var r0 authenticationv1beta1.AuthenticationV1beta1Interface
+ if rf, ok := ret.Get(0).(func() authenticationv1beta1.AuthenticationV1beta1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(authenticationv1beta1.AuthenticationV1beta1Interface)
+ }
+ }
+
+ return r0
+}
+
+// AuthorizationV1 provides a mock function with given fields:
+func (_m *Interface) AuthorizationV1() authorizationv1.AuthorizationV1Interface {
+ ret := _m.Called()
+
+ var r0 authorizationv1.AuthorizationV1Interface
+ if rf, ok := ret.Get(0).(func() authorizationv1.AuthorizationV1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(authorizationv1.AuthorizationV1Interface)
+ }
+ }
+
+ return r0
+}
+
+// AuthorizationV1beta1 provides a mock function with given fields:
+func (_m *Interface) AuthorizationV1beta1() authorizationv1beta1.AuthorizationV1beta1Interface {
+ ret := _m.Called()
+
+ var r0 authorizationv1beta1.AuthorizationV1beta1Interface
+ if rf, ok := ret.Get(0).(func() authorizationv1beta1.AuthorizationV1beta1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(authorizationv1beta1.AuthorizationV1beta1Interface)
+ }
+ }
+
+ return r0
+}
+
+// AutoscalingV1 provides a mock function with given fields:
+func (_m *Interface) AutoscalingV1() autoscalingv1.AutoscalingV1Interface {
+ ret := _m.Called()
+
+ var r0 autoscalingv1.AutoscalingV1Interface
+ if rf, ok := ret.Get(0).(func() autoscalingv1.AutoscalingV1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(autoscalingv1.AutoscalingV1Interface)
+ }
+ }
+
+ return r0
+}
+
+// AutoscalingV2beta1 provides a mock function with given fields:
+func (_m *Interface) AutoscalingV2beta1() v2beta1.AutoscalingV2beta1Interface {
+ ret := _m.Called()
+
+ var r0 v2beta1.AutoscalingV2beta1Interface
+ if rf, ok := ret.Get(0).(func() v2beta1.AutoscalingV2beta1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(v2beta1.AutoscalingV2beta1Interface)
+ }
+ }
+
+ return r0
+}
+
+// AutoscalingV2beta2 provides a mock function with given fields:
+func (_m *Interface) AutoscalingV2beta2() v2beta2.AutoscalingV2beta2Interface {
+ ret := _m.Called()
+
+ var r0 v2beta2.AutoscalingV2beta2Interface
+ if rf, ok := ret.Get(0).(func() v2beta2.AutoscalingV2beta2Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(v2beta2.AutoscalingV2beta2Interface)
+ }
+ }
+
+ return r0
+}
+
+// BatchV1 provides a mock function with given fields:
+func (_m *Interface) BatchV1() batchv1.BatchV1Interface {
+ ret := _m.Called()
+
+ var r0 batchv1.BatchV1Interface
+ if rf, ok := ret.Get(0).(func() batchv1.BatchV1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(batchv1.BatchV1Interface)
+ }
+ }
+
+ return r0
+}
+
+// BatchV1beta1 provides a mock function with given fields:
+func (_m *Interface) BatchV1beta1() batchv1beta1.BatchV1beta1Interface {
+ ret := _m.Called()
+
+ var r0 batchv1beta1.BatchV1beta1Interface
+ if rf, ok := ret.Get(0).(func() batchv1beta1.BatchV1beta1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(batchv1beta1.BatchV1beta1Interface)
+ }
+ }
+
+ return r0
+}
+
+// CertificatesV1 provides a mock function with given fields:
+func (_m *Interface) CertificatesV1() certificatesv1.CertificatesV1Interface {
+ ret := _m.Called()
+
+ var r0 certificatesv1.CertificatesV1Interface
+ if rf, ok := ret.Get(0).(func() certificatesv1.CertificatesV1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(certificatesv1.CertificatesV1Interface)
+ }
+ }
+
+ return r0
+}
+
+// CertificatesV1beta1 provides a mock function with given fields:
+func (_m *Interface) CertificatesV1beta1() certificatesv1beta1.CertificatesV1beta1Interface {
+ ret := _m.Called()
+
+ var r0 certificatesv1beta1.CertificatesV1beta1Interface
+ if rf, ok := ret.Get(0).(func() certificatesv1beta1.CertificatesV1beta1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(certificatesv1beta1.CertificatesV1beta1Interface)
+ }
+ }
+
+ return r0
+}
+
+// CoordinationV1 provides a mock function with given fields:
+func (_m *Interface) CoordinationV1() coordinationv1.CoordinationV1Interface {
+ ret := _m.Called()
+
+ var r0 coordinationv1.CoordinationV1Interface
+ if rf, ok := ret.Get(0).(func() coordinationv1.CoordinationV1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(coordinationv1.CoordinationV1Interface)
+ }
+ }
+
+ return r0
+}
+
+// CoordinationV1beta1 provides a mock function with given fields:
+func (_m *Interface) CoordinationV1beta1() coordinationv1beta1.CoordinationV1beta1Interface {
+ ret := _m.Called()
+
+ var r0 coordinationv1beta1.CoordinationV1beta1Interface
+ if rf, ok := ret.Get(0).(func() coordinationv1beta1.CoordinationV1beta1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(coordinationv1beta1.CoordinationV1beta1Interface)
+ }
+ }
+
+ return r0
+}
+
+// CoreV1 provides a mock function with given fields:
+func (_m *Interface) CoreV1() corev1.CoreV1Interface {
+ ret := _m.Called()
+
+ var r0 corev1.CoreV1Interface
+ if rf, ok := ret.Get(0).(func() corev1.CoreV1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(corev1.CoreV1Interface)
+ }
+ }
+
+ return r0
+}
+
+// Discovery provides a mock function with given fields:
+func (_m *Interface) Discovery() discovery.DiscoveryInterface {
+ ret := _m.Called()
+
+ var r0 discovery.DiscoveryInterface
+ if rf, ok := ret.Get(0).(func() discovery.DiscoveryInterface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(discovery.DiscoveryInterface)
+ }
+ }
+
+ return r0
+}
+
+// DiscoveryV1 provides a mock function with given fields:
+func (_m *Interface) DiscoveryV1() discoveryv1.DiscoveryV1Interface {
+ ret := _m.Called()
+
+ var r0 discoveryv1.DiscoveryV1Interface
+ if rf, ok := ret.Get(0).(func() discoveryv1.DiscoveryV1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(discoveryv1.DiscoveryV1Interface)
+ }
+ }
+
+ return r0
+}
+
+// DiscoveryV1beta1 provides a mock function with given fields:
+func (_m *Interface) DiscoveryV1beta1() discoveryv1beta1.DiscoveryV1beta1Interface {
+ ret := _m.Called()
+
+ var r0 discoveryv1beta1.DiscoveryV1beta1Interface
+ if rf, ok := ret.Get(0).(func() discoveryv1beta1.DiscoveryV1beta1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(discoveryv1beta1.DiscoveryV1beta1Interface)
+ }
+ }
+
+ return r0
+}
+
+// EventsV1 provides a mock function with given fields:
+func (_m *Interface) EventsV1() eventsv1.EventsV1Interface {
+ ret := _m.Called()
+
+ var r0 eventsv1.EventsV1Interface
+ if rf, ok := ret.Get(0).(func() eventsv1.EventsV1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(eventsv1.EventsV1Interface)
+ }
+ }
+
+ return r0
+}
+
+// EventsV1beta1 provides a mock function with given fields:
+func (_m *Interface) EventsV1beta1() eventsv1beta1.EventsV1beta1Interface {
+ ret := _m.Called()
+
+ var r0 eventsv1beta1.EventsV1beta1Interface
+ if rf, ok := ret.Get(0).(func() eventsv1beta1.EventsV1beta1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(eventsv1beta1.EventsV1beta1Interface)
+ }
+ }
+
+ return r0
+}
+
+// ExtensionsV1beta1 provides a mock function with given fields:
+func (_m *Interface) ExtensionsV1beta1() extensionsv1beta1.ExtensionsV1beta1Interface {
+ ret := _m.Called()
+
+ var r0 extensionsv1beta1.ExtensionsV1beta1Interface
+ if rf, ok := ret.Get(0).(func() extensionsv1beta1.ExtensionsV1beta1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(extensionsv1beta1.ExtensionsV1beta1Interface)
+ }
+ }
+
+ return r0
+}
+
+// FlowcontrolV1alpha1 provides a mock function with given fields:
+func (_m *Interface) FlowcontrolV1alpha1() v1alpha1.FlowcontrolV1alpha1Interface {
+ ret := _m.Called()
+
+ var r0 v1alpha1.FlowcontrolV1alpha1Interface
+ if rf, ok := ret.Get(0).(func() v1alpha1.FlowcontrolV1alpha1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(v1alpha1.FlowcontrolV1alpha1Interface)
+ }
+ }
+
+ return r0
+}
+
+// FlowcontrolV1beta1 provides a mock function with given fields:
+func (_m *Interface) FlowcontrolV1beta1() flowcontrolv1beta1.FlowcontrolV1beta1Interface {
+ ret := _m.Called()
+
+ var r0 flowcontrolv1beta1.FlowcontrolV1beta1Interface
+ if rf, ok := ret.Get(0).(func() flowcontrolv1beta1.FlowcontrolV1beta1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(flowcontrolv1beta1.FlowcontrolV1beta1Interface)
+ }
+ }
+
+ return r0
+}
+
+// InternalV1alpha1 provides a mock function with given fields:
+func (_m *Interface) InternalV1alpha1() apiserverinternalv1alpha1.InternalV1alpha1Interface {
+ ret := _m.Called()
+
+ var r0 apiserverinternalv1alpha1.InternalV1alpha1Interface
+ if rf, ok := ret.Get(0).(func() apiserverinternalv1alpha1.InternalV1alpha1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(apiserverinternalv1alpha1.InternalV1alpha1Interface)
+ }
+ }
+
+ return r0
+}
+
+// NetworkingV1 provides a mock function with given fields:
+func (_m *Interface) NetworkingV1() networkingv1.NetworkingV1Interface {
+ ret := _m.Called()
+
+ var r0 networkingv1.NetworkingV1Interface
+ if rf, ok := ret.Get(0).(func() networkingv1.NetworkingV1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(networkingv1.NetworkingV1Interface)
+ }
+ }
+
+ return r0
+}
+
+// NetworkingV1beta1 provides a mock function with given fields:
+func (_m *Interface) NetworkingV1beta1() networkingv1beta1.NetworkingV1beta1Interface {
+ ret := _m.Called()
+
+ var r0 networkingv1beta1.NetworkingV1beta1Interface
+ if rf, ok := ret.Get(0).(func() networkingv1beta1.NetworkingV1beta1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(networkingv1beta1.NetworkingV1beta1Interface)
+ }
+ }
+
+ return r0
+}
+
+// NodeV1 provides a mock function with given fields:
+func (_m *Interface) NodeV1() nodev1.NodeV1Interface {
+ ret := _m.Called()
+
+ var r0 nodev1.NodeV1Interface
+ if rf, ok := ret.Get(0).(func() nodev1.NodeV1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(nodev1.NodeV1Interface)
+ }
+ }
+
+ return r0
+}
+
+// NodeV1alpha1 provides a mock function with given fields:
+func (_m *Interface) NodeV1alpha1() nodev1alpha1.NodeV1alpha1Interface {
+ ret := _m.Called()
+
+ var r0 nodev1alpha1.NodeV1alpha1Interface
+ if rf, ok := ret.Get(0).(func() nodev1alpha1.NodeV1alpha1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(nodev1alpha1.NodeV1alpha1Interface)
+ }
+ }
+
+ return r0
+}
+
+// NodeV1beta1 provides a mock function with given fields:
+func (_m *Interface) NodeV1beta1() nodev1beta1.NodeV1beta1Interface {
+ ret := _m.Called()
+
+ var r0 nodev1beta1.NodeV1beta1Interface
+ if rf, ok := ret.Get(0).(func() nodev1beta1.NodeV1beta1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(nodev1beta1.NodeV1beta1Interface)
+ }
+ }
+
+ return r0
+}
+
+// PolicyV1 provides a mock function with given fields:
+func (_m *Interface) PolicyV1() policyv1.PolicyV1Interface {
+ ret := _m.Called()
+
+ var r0 policyv1.PolicyV1Interface
+ if rf, ok := ret.Get(0).(func() policyv1.PolicyV1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(policyv1.PolicyV1Interface)
+ }
+ }
+
+ return r0
+}
+
+// PolicyV1beta1 provides a mock function with given fields:
+func (_m *Interface) PolicyV1beta1() policyv1beta1.PolicyV1beta1Interface {
+ ret := _m.Called()
+
+ var r0 policyv1beta1.PolicyV1beta1Interface
+ if rf, ok := ret.Get(0).(func() policyv1beta1.PolicyV1beta1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(policyv1beta1.PolicyV1beta1Interface)
+ }
+ }
+
+ return r0
+}
+
+// RbacV1 provides a mock function with given fields:
+func (_m *Interface) RbacV1() rbacv1.RbacV1Interface {
+ ret := _m.Called()
+
+ var r0 rbacv1.RbacV1Interface
+ if rf, ok := ret.Get(0).(func() rbacv1.RbacV1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(rbacv1.RbacV1Interface)
+ }
+ }
+
+ return r0
+}
+
+// RbacV1alpha1 provides a mock function with given fields:
+func (_m *Interface) RbacV1alpha1() rbacv1alpha1.RbacV1alpha1Interface {
+ ret := _m.Called()
+
+ var r0 rbacv1alpha1.RbacV1alpha1Interface
+ if rf, ok := ret.Get(0).(func() rbacv1alpha1.RbacV1alpha1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(rbacv1alpha1.RbacV1alpha1Interface)
+ }
+ }
+
+ return r0
+}
+
+// RbacV1beta1 provides a mock function with given fields:
+func (_m *Interface) RbacV1beta1() rbacv1beta1.RbacV1beta1Interface {
+ ret := _m.Called()
+
+ var r0 rbacv1beta1.RbacV1beta1Interface
+ if rf, ok := ret.Get(0).(func() rbacv1beta1.RbacV1beta1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(rbacv1beta1.RbacV1beta1Interface)
+ }
+ }
+
+ return r0
+}
+
+// SchedulingV1 provides a mock function with given fields:
+func (_m *Interface) SchedulingV1() schedulingv1.SchedulingV1Interface {
+ ret := _m.Called()
+
+ var r0 schedulingv1.SchedulingV1Interface
+ if rf, ok := ret.Get(0).(func() schedulingv1.SchedulingV1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(schedulingv1.SchedulingV1Interface)
+ }
+ }
+
+ return r0
+}
+
+// SchedulingV1alpha1 provides a mock function with given fields:
+func (_m *Interface) SchedulingV1alpha1() schedulingv1alpha1.SchedulingV1alpha1Interface {
+ ret := _m.Called()
+
+ var r0 schedulingv1alpha1.SchedulingV1alpha1Interface
+ if rf, ok := ret.Get(0).(func() schedulingv1alpha1.SchedulingV1alpha1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(schedulingv1alpha1.SchedulingV1alpha1Interface)
+ }
+ }
+
+ return r0
+}
+
+// SchedulingV1beta1 provides a mock function with given fields:
+func (_m *Interface) SchedulingV1beta1() schedulingv1beta1.SchedulingV1beta1Interface {
+ ret := _m.Called()
+
+ var r0 schedulingv1beta1.SchedulingV1beta1Interface
+ if rf, ok := ret.Get(0).(func() schedulingv1beta1.SchedulingV1beta1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(schedulingv1beta1.SchedulingV1beta1Interface)
+ }
+ }
+
+ return r0
+}
+
+// StorageV1 provides a mock function with given fields:
+func (_m *Interface) StorageV1() storagev1.StorageV1Interface {
+ ret := _m.Called()
+
+ var r0 storagev1.StorageV1Interface
+ if rf, ok := ret.Get(0).(func() storagev1.StorageV1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(storagev1.StorageV1Interface)
+ }
+ }
+
+ return r0
+}
+
+// StorageV1alpha1 provides a mock function with given fields:
+func (_m *Interface) StorageV1alpha1() storagev1alpha1.StorageV1alpha1Interface {
+ ret := _m.Called()
+
+ var r0 storagev1alpha1.StorageV1alpha1Interface
+ if rf, ok := ret.Get(0).(func() storagev1alpha1.StorageV1alpha1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(storagev1alpha1.StorageV1alpha1Interface)
+ }
+ }
+
+ return r0
+}
+
+// StorageV1beta1 provides a mock function with given fields:
+func (_m *Interface) StorageV1beta1() storagev1beta1.StorageV1beta1Interface {
+ ret := _m.Called()
+
+ var r0 storagev1beta1.StorageV1beta1Interface
+ if rf, ok := ret.Get(0).(func() storagev1beta1.StorageV1beta1Interface); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(storagev1beta1.StorageV1beta1Interface)
+ }
+ }
+
+ return r0
+}
+
+// NewInterface creates a new instance of Interface. It also registers a cleanup function to assert the mocks expectations.
+func NewInterface(t testing.TB) *Interface {
+ mock := &Interface{}
+
+ t.Cleanup(func() { mock.AssertExpectations(t) })
+
+ return mock
+}
diff --git a/go/sser/mocks/PeerFinder.go b/go/sser/mocks/PeerFinder.go
new file mode 100644
index 0000000..9eba655
--- /dev/null
+++ b/go/sser/mocks/PeerFinder.go
@@ -0,0 +1,57 @@
+// Code generated by mockery v0.0.0-dev. DO NOT EDIT.
+
+package mocks
+
+import (
+ context "context"
+
+ mock "github.com/stretchr/testify/mock"
+
+ testing "testing"
+)
+
// PeerFinder is an autogenerated mock type for the PeerFinder type
type PeerFinder struct {
	// Embedded mock.Mock records calls and holds the registered expectations.
	mock.Mock
}
+
// Start provides a mock function with given fields: ctx
func (_m *PeerFinder) Start(ctx context.Context) ([]string, <-chan []string, error) {
	ret := _m.Called(ctx)

	// Return value 0: the initial slice of peer IP addresses. Either call a
	// test-supplied constructor func or use the stored value (nil-checked).
	var r0 []string
	if rf, ok := ret.Get(0).(func(context.Context) []string); ok {
		r0 = rf(ctx)
	} else {
		if ret.Get(0) != nil {
			r0 = ret.Get(0).([]string)
		}
	}

	// Return value 1: the channel of peer updates.
	var r1 <-chan []string
	if rf, ok := ret.Get(1).(func(context.Context) <-chan []string); ok {
		r1 = rf(ctx)
	} else {
		if ret.Get(1) != nil {
			r1 = ret.Get(1).(<-chan []string)
		}
	}

	// Return value 2: the error, retrieved via ret.Error which handles nil.
	var r2 error
	if rf, ok := ret.Get(2).(func(context.Context) error); ok {
		r2 = rf(ctx)
	} else {
		r2 = ret.Error(2)
	}

	return r0, r1, r2
}
+
// NewPeerFinder creates a new instance of PeerFinder. It also registers a cleanup function to assert the mocks expectations.
func NewPeerFinder(t testing.TB) *PeerFinder {
	mock := &PeerFinder{}

	// Automatically verify, when the test ends, that every expectation
	// registered with On(...) was actually exercised.
	t.Cleanup(func() { mock.AssertExpectations(t) })

	return mock
}
diff --git a/go/sser/mocks/Server.go b/go/sser/mocks/Server.go
new file mode 100644
index 0000000..d2dc57c
--- /dev/null
+++ b/go/sser/mocks/Server.go
@@ -0,0 +1,61 @@
+// Code generated by mockery v0.0.0-dev. DO NOT EDIT.
+
+package mocks
+
+import (
+ context "context"
+ http "net/http"
+
+ mock "github.com/stretchr/testify/mock"
+
+ testing "testing"
+)
+
// Server is an autogenerated mock type for the Server type
type Server struct {
	// Embedded mock.Mock records calls and holds the registered expectations.
	mock.Mock
}
+
// ClientConnectionHandler provides a mock function with given fields: ctx
func (_m *Server) ClientConnectionHandler(ctx context.Context) http.HandlerFunc {
	ret := _m.Called(ctx)

	// Prefer a test-supplied constructor func; fall back to the raw value,
	// nil-checked so an untyped nil does not panic the type assertion.
	var r0 http.HandlerFunc
	if rf, ok := ret.Get(0).(func(context.Context) http.HandlerFunc); ok {
		r0 = rf(ctx)
	} else {
		if ret.Get(0) != nil {
			r0 = ret.Get(0).(http.HandlerFunc)
		}
	}

	return r0
}
+
// Send provides a mock function with given fields: ctx, stream, msg
func (_m *Server) Send(ctx context.Context, stream string, msg string) {
	// No return values; this only records the call for later assertion.
	_m.Called(ctx, stream, msg)
}
+
// Start provides a mock function with given fields: ctx
func (_m *Server) Start(ctx context.Context) error {
	ret := _m.Called(ctx)

	// Prefer a test-supplied constructor func; otherwise use ret.Error, which
	// correctly handles a nil error value.
	var r0 error
	if rf, ok := ret.Get(0).(func(context.Context) error); ok {
		r0 = rf(ctx)
	} else {
		r0 = ret.Error(0)
	}

	return r0
}
+
// NewServer creates a new instance of Server. It also registers a cleanup function to assert the mocks expectations.
func NewServer(t testing.TB) *Server {
	mock := &Server{}

	// Automatically verify, when the test ends, that every expectation
	// registered with On(...) was actually exercised.
	t.Cleanup(func() { mock.AssertExpectations(t) })

	return mock
}
diff --git a/go/sser/mocks/generate.go b/go/sser/mocks/generate.go
new file mode 100644
index 0000000..2642559
--- /dev/null
+++ b/go/sser/mocks/generate.go
@@ -0,0 +1,5 @@
+package mocks
+
+//go:generate bazelisk run --config=mayberemote //:mockery -- --name Server --srcpkg=go.skia.org/infra/go/sser --output ${PWD}
+//go:generate bazelisk run --config=mayberemote //:mockery -- --name PeerFinder --srcpkg=go.skia.org/infra/go/sser --output ${PWD}
+//go:generate bazelisk run --config=mayberemote //:mockery -- --name Interface --srcpkg=k8s.io/client-go/kubernetes --output ${PWD}
diff --git a/go/sser/peerfinder_impl.go b/go/sser/peerfinder_impl.go
new file mode 100644
index 0000000..a478644
--- /dev/null
+++ b/go/sser/peerfinder_impl.go
@@ -0,0 +1,101 @@
+package sser
+
+import (
+ "context"
+
+ "go.skia.org/infra/go/skerr"
+ "go.skia.org/infra/go/sklog"
+ v1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/kubernetes"
+)
+
// PeerFinderImpl implements PeerFinder.
type PeerFinderImpl struct {
	// clientset is used to List and Watch pods via the kubernetes API.
	clientset kubernetes.Interface

	// kubernetes namespace the pods are running in.
	namespace string

	// kubernetes label selector used to choose pods that are peers. For example: "app=skiaperf".
	labelSelector string

	// The most recent version of a kubernetes List response, used in Watch
	// requests.
	// https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
	//
	// NOTE(review): written from Start before the background go routine is
	// launched and from inside that go routine afterwards — confirm no other
	// goroutine reads it concurrently.
	resourceVersion string
}
+
+// NewPeerFinder returns a new instance of PeerFinderImpl.
+func NewPeerFinder(clientset kubernetes.Interface, namespace string, labelSelector string) (*PeerFinderImpl, error) {
+ return &PeerFinderImpl{
+ clientset: clientset,
+ namespace: namespace,
+ labelSelector: labelSelector,
+ }, nil
+}
+
+func (p *PeerFinderImpl) findAllPeerPodIPAddresses(ctx context.Context) ([]string, string, error) {
+ pods, err := p.clientset.CoreV1().Pods(p.namespace).List(ctx, metav1.ListOptions{
+ LabelSelector: p.labelSelector,
+ })
+ if err != nil {
+ return nil, "", skerr.Wrapf(err, "Could not list peer pods")
+ }
+
+ urls := make([]string, 0, len(pods.Items))
+ for _, p := range pods.Items {
+ // Note that the PodIP can be the empty string.
+ if p.Status.Phase == v1.PodRunning && p.Status.PodIP != "" {
+ urls = append(urls, p.Status.PodIP)
+ }
+ }
+ return urls, pods.ResourceVersion, nil
+}
+
+// Start implements PeerFinder.
+func (p *PeerFinderImpl) Start(ctx context.Context) ([]string, <-chan []string, error) {
+ var ips []string
+ var err error
+ ips, p.resourceVersion, err = p.findAllPeerPodIPAddresses(ctx)
+ if err != nil {
+ return nil, nil, skerr.Wrapf(err, "populating initial set of peers.")
+ }
+
+ ch := make(chan []string)
+
+ // Now start a background Go routine that uses the kubernetes Watch feature
+ // to keep track of updates to the peer pods.
+ go func() {
+ for {
+ watch, err := p.clientset.CoreV1().Pods(p.namespace).Watch(ctx, metav1.ListOptions{
+ LabelSelector: p.labelSelector,
+ ResourceVersion: p.resourceVersion,
+ })
+ if err != nil {
+ sklog.Warningf("watch for peer changes failed: %s", err)
+ p.resourceVersion = ""
+ } else {
+ watch.Stop()
+ }
+ if ctx.Err() != nil {
+ sklog.Errorf("start Go routine cancelled: %s", err)
+ close(ch)
+ return
+ }
+ // Either our watch has failed, or it has returned successfully, and
+ // in either case we should request a new set of IP addresses. Note
+ // that this also updates the resourceVersion, which is needed for
+ // the Watch to continue.
+ newIps, newResourceVersion, err := p.findAllPeerPodIPAddresses(ctx)
+ if err == nil {
+ p.resourceVersion = newResourceVersion
+ ch <- newIps
+ } else {
+ p.resourceVersion = ""
+ }
+ }
+ }()
+
+ return ips, ch, nil
+}
diff --git a/go/sser/peerfinder_impl_test.go b/go/sser/peerfinder_impl_test.go
new file mode 100644
index 0000000..a8f6bdc
--- /dev/null
+++ b/go/sser/peerfinder_impl_test.go
@@ -0,0 +1,202 @@
+package sser
+
+import (
+ "context"
+ "errors"
+ "testing"
+
+ "github.com/stretchr/testify/mock"
+ "github.com/stretchr/testify/require"
+ "go.skia.org/infra/go/k8s/mocks"
+ watchmocks "go.skia.org/infra/go/k8s/watch/mocks"
+ "go.skia.org/infra/go/testutils"
+ v1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/client-go/kubernetes"
+)
+
const (
	// IP addresses assigned to fake pods in the tests below.
	testIPAddress1 = "192.168.1.1"
	testIPAddress2 = "192.168.1.2"

	// Namespace/selector the PeerFinder under test is configured with.
	testLabelSelector = "app=myTestApp"
	testNamespace     = "default"

	// ResourceVersions returned from the mocked List responses.
	testResourceVersion     = "123"
	testResourceVersionNext = "124"
)

var (
	// myMockErr is the sentinel error returned from mocked calls.
	myMockErr = errors.New("my mock error")
)
+
// buildClientSetOnPodListMocks returns a mocked kubernetes clientset whose
// Pods().List() call returns the given items and err, plus the PodInterface
// mock so callers can attach further expectations (e.g. Watch).
func buildClientSetOnPodListMocks(t *testing.T, items []v1.Pod, err error) (kubernetes.Interface, *mocks.PodInterface) {
	// Build up all the mocks to handle this chain of calls: p.clientset.CoreV1().Pods(...).List(...)
	podList := &v1.PodList{
		ListMeta: metav1.ListMeta{
			ResourceVersion: testResourceVersionNext,
		},
		Items: items,
	}
	podInterface := mocks.NewPodInterface(t)
	podInterface.On("List", testutils.AnyContext, metav1.ListOptions{
		LabelSelector: testLabelSelector,
	}).Return(podList, err)

	coreV1 := mocks.NewCoreV1Interface(t)
	coreV1.On("Pods", testNamespace).Return(podInterface)

	clientset := mocks.NewInterface(t)
	clientset.On("CoreV1").Return(coreV1)

	return clientset, podInterface
}
+
// listOptionsNext is the ListOptions expected on Watch calls that resume from
// the resourceVersion returned by the mocked List response above.
var listOptionsNext = metav1.ListOptions{
	LabelSelector:   testLabelSelector,
	ResourceVersion: testResourceVersionNext,
}
+
// addWatchMocksSuccess registers a Watch expectation on podsMock that succeeds
// and returns a watch that allows Stop() to be called.
func addWatchMocksSuccess(t *testing.T, podsMock *mocks.PodInterface) {
	watch := watchmocks.NewInterface(t)
	podsMock.On("Watch", testutils.AnyContext, listOptionsNext).Return(watch, nil)
	watch.On("Stop")
}
+
// addWatchMocksError registers a Watch expectation on podsMock that fails with
// myMockErr; Stop is not expected because the error branch never calls it.
func addWatchMocksError(t *testing.T, podsMock *mocks.PodInterface) {
	watch := watchmocks.NewInterface(t)
	podsMock.On("Watch", testutils.AnyContext, listOptionsNext).Return(watch, myMockErr)
}
+
// buildPeerFinderImplOnPodListMocks returns a PeerFinderImpl backed by a
// mocked clientset whose List call returns the given items and err.
func buildPeerFinderImplOnPodListMocks(t *testing.T, items []v1.Pod, err error) *PeerFinderImpl {
	clientset, _ := buildClientSetOnPodListMocks(t, items, err)
	p, err := NewPeerFinder(clientset, testNamespace, testLabelSelector)
	require.NoError(t, err)

	return p
}
+
func TestPeerFinderImplFindAllPeerPodIPAddresses_HappyPath(t *testing.T) {
	items := []v1.Pod{
		{
			Status: v1.PodStatus{
				Phase: v1.PodRunning,
				PodIP: testIPAddress1,
			},
		},
		{
			Status: v1.PodStatus{
				Phase: v1.PodFailed, // This pod will be ignored because it's not Running.
				PodIP: testIPAddress2,
			},
		},
		{
			Status: v1.PodStatus{
				Phase: v1.PodRunning,
				PodIP: "", // This pod will be ignored because the IP address is empty.
			},
		},
	}

	p := buildPeerFinderImplOnPodListMocks(t, items, nil)

	// Only the first pod is both Running and has a non-empty IP address.
	peers, version, err := p.findAllPeerPodIPAddresses(context.Background())
	require.NoError(t, err)
	require.Equal(t, testResourceVersionNext, version)
	require.Len(t, peers, 1)
	require.Equal(t, testIPAddress1, peers[0])
}
+
// A List failure must surface as a wrapped error from findAllPeerPodIPAddresses.
func TestPeerFinderImplFindAllPeerPodIPAddresses_ListReturnsError_ReturnsError(t *testing.T) {
	p := buildPeerFinderImplOnPodListMocks(t, []v1.Pod{}, myMockErr)

	_, _, err := p.findAllPeerPodIPAddresses(context.Background())
	require.Contains(t, err.Error(), myMockErr.Error())
}
+
// Start fails fast when the initial List of peers fails.
func TestPeerFinderImplStart_ListReturnsError_ReturnsError(t *testing.T) {
	p := buildPeerFinderImplOnPodListMocks(t, []v1.Pod{}, myMockErr)

	_, _, err := p.Start(context.Background())
	require.Contains(t, err.Error(), myMockErr.Error())
}
+
func TestPeerFinderImplStart_WatchReturnsError_ResourceVersionGetsCleared(t *testing.T) {
	clientset, podMock := buildClientSetOnPodListMocks(t, []v1.Pod{}, nil)
	addWatchMocksError(t, podMock)
	p, err := NewPeerFinder(clientset, testNamespace, testLabelSelector)
	require.NoError(t, err)

	// Cancel the context up front so the background go routine exits after a
	// single failed Watch attempt.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	_, ch, err := p.Start(ctx)
	require.NoError(t, err)
	// Blocks until the go routine closes ch on cancellation.
	<-ch
	require.Empty(t, p.resourceVersion)
}
+
// NOTE(review): "DoesNotReturns" in the test name is a typo for "DoesNotReturn".
func TestPeerFinderImplStart_WatchDoesNotReturnsError_ResourceVersionGetsUpdated(t *testing.T) {
	clientset, podMock := buildClientSetOnPodListMocks(t, []v1.Pod{}, nil)
	addWatchMocksSuccess(t, podMock)
	p, err := NewPeerFinder(clientset, testNamespace, testLabelSelector)
	require.NoError(t, err)

	// Cancel the context up front so the go routine exits after one Watch.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	_, ch, err := p.Start(ctx)
	require.NoError(t, err)

	// Since the context was cancelled the returned PeerURL channel will be
	// closed.
	_, ok := <-ch
	require.False(t, ok)
	require.Equal(t, testResourceVersionNext, p.resourceVersion)
}
+
func TestPeerFinderImplStart_WatchDoesNotReturnError_NewPeerURLsSentOnChannel(t *testing.T) {
	clientset, podMock := buildClientSetOnPodListMocks(t, []v1.Pod{}, nil)
	p, err := NewPeerFinder(clientset, testNamespace, testLabelSelector)
	require.NoError(t, err)

	// Create cancellable context so we can stop the Go routine.
	ctx, cancel := context.WithCancel(context.Background())

	// Create a mock for Watch that can be called repeatedly.
	watch := watchmocks.NewInterface(t)
	podMock.On("Watch", testutils.AnyContext, listOptionsNext).Run(func(_ mock.Arguments) {
		// Prepare the next call to PodList which returns a different set of
		// items than our first call to buildClientSetOnPodListMocks().
		items := []v1.Pod{
			{
				Status: v1.PodStatus{
					Phase: v1.PodRunning,
					PodIP: testIPAddress2,
				},
			},
		}
		// Swapping the clientset out from under the PeerFinder simulates the
		// pod set changing between Watch iterations.
		clientset, podMock := buildClientSetOnPodListMocks(t, items, nil)
		p.clientset = clientset
		addWatchMocksSuccess(t, podMock)
	}).Return(watch, nil)
	watch.On("Stop")

	_, ch, err := p.Start(ctx)
	require.NoError(t, err)

	// Wait for the new PeerURLs to be sent down the channel.
	peers := <-ch
	require.Equal(t, testIPAddress2, peers[0])

	// Cancel the context so the Go routine in Start() exits.
	cancel()

	// Wait for the channel to be closed. The test will hang forever (and time
	// out) if 'ch' is never closed.
	for {
		_, ok := <-ch
		if !ok {
			return
		}
	}
}
diff --git a/go/sser/peerfinder_localhost.go b/go/sser/peerfinder_localhost.go
new file mode 100644
index 0000000..4e0b284
--- /dev/null
+++ b/go/sser/peerfinder_localhost.go
@@ -0,0 +1,18 @@
+package sser
+
+import "context"
+
// PeerFinderLocalhost implements PeerFinder but only finds "127.0.0.1". It is
// useful when running locally, outside of kubernetes.
type PeerFinderLocalhost struct{}
+
+// Start implements PeerFinder.
+func (p PeerFinderLocalhost) Start(ctx context.Context) ([]string, <-chan []string, error) {
+ return []string{"127.0.0.1"}, make(chan []string), nil
+}
+
// NewPeerFinderLocalhost returns a new PeerFinderLocalhost, a PeerFinder for
// use when running locally outside of kubernetes.
func NewPeerFinderLocalhost() PeerFinderLocalhost {
	return PeerFinderLocalhost{}
}
+
+// Confirm PeerFinderLocalhost implements PeerFinder.
+var _ PeerFinder = PeerFinderLocalhost{}
diff --git a/go/sser/peerfinder_localhost_test.go b/go/sser/peerfinder_localhost_test.go
new file mode 100644
index 0000000..ec3131e
--- /dev/null
+++ b/go/sser/peerfinder_localhost_test.go
@@ -0,0 +1,14 @@
+package sser
+
+import (
+ "context"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
// Start must always report 127.0.0.1 as the only peer.
func TestPeerFinderLocalhost_Start_AlwaysReturnsTheSameValues(t *testing.T) {
	ips, _, err := PeerFinderLocalhost{}.Start(context.Background())
	require.Equal(t, []string{"127.0.0.1"}, ips)
	require.NoError(t, err)
}
diff --git a/go/sser/server_impl.go b/go/sser/server_impl.go
new file mode 100644
index 0000000..8a5fc18
--- /dev/null
+++ b/go/sser/server_impl.go
@@ -0,0 +1,190 @@
+package sser
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "net/http"
+ "net/url"
+
+ "github.com/gorilla/mux"
+ sse "github.com/r3labs/sse/v2"
+ "go.skia.org/infra/go/httputils"
+ "go.skia.org/infra/go/metrics2"
+ "go.skia.org/infra/go/skerr"
+ "go.skia.org/infra/go/sklog"
+ "go.skia.org/infra/go/util_generics"
+)
+
const (
	// serverSendChannelSize is the buffer size of ServerImpl.sendCh.
	// 100 was picked as a rough guess.
	serverSendChannelSize = 100

	// clientConnectionsMetricName names the metric that counts currently
	// connected SSE clients, tagged by stream name.
	clientConnectionsMetricName = "sser_server_client_connections"
)

var (
	// errStreamNameRequired is reported to clients that omit the stream query
	// parameter.
	errStreamNameRequired = errors.New("a stream name is required as part of the query parameters")
)
+
// Event is serialized as JSON to be sent from a server to each peer.
type Event struct {
	// Stream is the name of the stream the message should be published on.
	Stream string `json:"stream"`
	// Msg is the message body delivered to every client on Stream.
	Msg string `json:"msg"`
}
+
// ServerImpl implements Server.
type ServerImpl struct {
	// The HTTP port used for peer connections between all replicas of an app
	// running in kubernetes.
	internalPort int

	// Keeps the Server updated with all the peers.
	peerFinder PeerFinder

	// The SSE server implementation.
	server *sse.Server

	// Carries messages to be sent from Send() into the go routine that runs
	// from Start.
	sendCh chan Event

	// The current list of peer Pods that are in the Running state.
	//
	// After Start() is called, peers is only read and written from the single
	// go routine started there, which is why no mutex guards it.
	peers map[string]*http.Client
}
+
+// New returns a new Server.
+func New(internalPort int, peerFinder PeerFinder) (*ServerImpl, error) {
+ return &ServerImpl{
+ internalPort: internalPort,
+ peerFinder: peerFinder,
+ server: sse.New(),
+ sendCh: make(chan Event, 100),
+ peers: map[string]*http.Client{},
+ }, nil
+}
+
+func (s *ServerImpl) podIPToURL(ip string) string {
+ var ret url.URL
+ ret.Host = fmt.Sprintf("%s:%d", ip, s.internalPort)
+ ret.Path = PeerEndpointURLPath
+ ret.Scheme = "http"
+ return ret.String()
+}
+
+func (s *ServerImpl) setPeersFromIPAddressSlice(ips []string) {
+ newPeers := map[string]*http.Client{}
+ for _, ip := range ips {
+ u := s.podIPToURL(ip)
+ newPeers[u] = util_generics.Get(s.peers, u, httputils.NewFastTimeoutClient())
+ }
+ s.peers = newPeers
+}
+
+func (s *ServerImpl) handlePeerNotification(w http.ResponseWriter, r *http.Request) {
+ var e Event
+ err := json.NewDecoder(r.Body).Decode(&e)
+ if err != nil {
+ httputils.ReportError(w, err, "invalid JSON", http.StatusBadRequest)
+ return
+ }
+
+ s.server.Publish(e.Stream, &sse.Event{
+ Data: []byte(e.Msg),
+ })
+}
+
+// Start implements Server.
+func (s *ServerImpl) Start(ctx context.Context) error {
+ r := mux.NewRouter()
+ r.HandleFunc(PeerEndpointURLPath, s.handlePeerNotification)
+
+ // For testing purposes a 0 is allowed for internalPort, which will
+ // select an available port on the machine.
+ listener, err := net.Listen("tcp", fmt.Sprintf(":%d", s.internalPort))
+ if err != nil {
+ return skerr.Wrapf(err, "listening on port %d", s.internalPort)
+ }
+
+ // Since internalPort might have been 0, we set s.internalPort to the
+ // Port that was selected.
+ s.internalPort = listener.Addr().(*net.TCPAddr).Port
+
+ // Start an HTTP server on internalPort to listen for events from peer pods.
+ go func() {
+ sklog.Fatal(http.Serve(listener, r))
+ }()
+
+ initial, ch, err := s.peerFinder.Start(ctx)
+ if err != nil {
+ return skerr.Wrap(err)
+ }
+ s.setPeersFromIPAddressSlice(initial)
+
+ // Start a Go routine that orchestrates both updates from PeerFinder, and
+ // requests to send messages to all the peer pods. Avoid the need for a
+ // mutex to protect s.peer by using channels and select.
+ go func() {
+ for {
+ select {
+ case newPeers := <-ch:
+ s.setPeersFromIPAddressSlice(newPeers)
+ case msg := <-s.sendCh:
+ // Serialize msg into JSON.
+ b, err := json.Marshal(msg)
+ if err != nil {
+ sklog.Errorf("failed to serialize Event: %s", err)
+ continue
+ }
+ r := bytes.NewReader(b)
+ // Send msg to each internal Peer endpoint.
+ for peerURL, client := range s.peers {
+ resp, err := client.Post(peerURL, "application/json", r)
+ if err != nil {
+ sklog.Errorf("notifying peer: %s", err)
+ continue
+ }
+ _, err = r.Seek(0, io.SeekStart)
+ if err != nil {
+ sklog.Error("seeking to start of buffer: %s", err)
+ }
+ if resp.StatusCode >= 300 {
+ sklog.Errorf("HTTP StatusCode Not OK: %s", resp.Status)
+ continue
+ }
+ }
+ }
+ }
+ }()
+
+ return nil
+}
+
+// ClientConnectionHandler implements Server.
+func (s *ServerImpl) ClientConnectionHandler(ctx context.Context) http.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) {
+ streamName := r.FormValue(QueryParameterName)
+ if streamName == "" {
+ httputils.ReportError(w, errStreamNameRequired, "A stream name must be supplied", http.StatusBadRequest)
+ return
+ }
+ if !s.server.StreamExists(streamName) {
+ s.server.CreateStream(streamName)
+ }
+ c := metrics2.GetCounter(clientConnectionsMetricName, map[string]string{"stream": streamName})
+ c.Inc(1)
+ s.server.ServeHTTP(w, r)
+ c.Dec(1)
+ }
+}
+
+// Send implements Server.
+func (s *ServerImpl) Send(ctx context.Context, stream string, msg string) {
+ s.sendCh <- Event{Stream: stream, Msg: msg}
+}
+
+var _ Server = (*ServerImpl)(nil)
diff --git a/go/sser/server_impl_test.go b/go/sser/server_impl_test.go
new file mode 100644
index 0000000..212f43d
--- /dev/null
+++ b/go/sser/server_impl_test.go
@@ -0,0 +1,133 @@
+package sser
+
+import (
+ "context"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+
+ "github.com/r3labs/sse/v2"
+ "github.com/stretchr/testify/require"
+ "go.skia.org/infra/go/metrics2"
+ "go.skia.org/infra/go/sser/mocks"
+ "go.skia.org/infra/go/testutils"
+)
+
const (
	// streamName is the SSE stream used by all tests in this file.
	streamName = "testStreamName"
	// eventValue is the message body sent through the server in the tests.
	eventValue = "this is a test message"
)
+
// The pod IP should be combined with the internal port, the http scheme, and
// PeerEndpointURLPath.
func TestServerImplPodIPToURL_HappyPath(t *testing.T) {
	s := &ServerImpl{
		internalPort: 4000,
	}
	require.Equal(t, "http://192.168.1.1:4000/api/json/v1/send", s.podIPToURL("192.168.1.1"))
}
+
// createServerAndFrontendForTest returns a started ServerImpl whose PeerFinder
// reports only localhost, plus an httptest frontend serving its
// ClientConnectionHandler. The client-connection counter is reset so each test
// starts from zero.
func createServerAndFrontendForTest(t *testing.T) (context.Context, *ServerImpl, *httptest.Server) {
	ctx := context.Background()

	// Create a PeerFinder that just returns localhost.
	peerFinderMock := mocks.NewPeerFinder(t)
	ipCh := make(chan []string)
	// The explicit conversion to a receive-only channel is needed so the mock
	// Return value matches the Start signature exactly.
	var castChan <-chan []string = ipCh
	peerFinderMock.On("Start", testutils.AnyContext).Return([]string{"127.0.0.1"}, castChan, nil)

	// Create a new Server that uses any available port to listen for peer connections.
	sserServer, err := New(0, peerFinderMock)
	require.NoError(t, err)
	err = sserServer.Start(ctx)
	require.NoError(t, err)

	// Create a new web server, aka the frontend, at a different random port,
	// that handles incoming SSE client connections.
	frontend := httptest.NewServer(sserServer.ClientConnectionHandler(ctx))
	t.Cleanup(frontend.Close)

	// QueryParameterName == "stream", matching the tag key the server uses.
	metrics2.GetCounter(clientConnectionsMetricName, map[string]string{QueryParameterName: streamName}).Reset()

	return ctx, sserServer, frontend
}
+
// A single subscribed client should receive a message sent via Server.Send,
// and the connection counter should report one connected client.
func TestServer_HappyPath(t *testing.T) {
	ctx, sserServer, frontend := createServerAndFrontendForTest(t)

	// Create an SSE client that talks to the above frontend.
	client := sse.NewClient(frontend.URL + PeerEndpointURLPath)

	// Listen for events on the given channel.
	events := make(chan *sse.Event)
	err := client.SubscribeChan(streamName, events)
	t.Cleanup(func() {
		client.Unsubscribe(events)
	})
	require.NoError(t, err)

	// Send an event via the Server, which the client should receive via the frontend.
	sserServer.Send(ctx, streamName, eventValue)

	// Confirm the client received the correct event.
	e := <-events
	require.Equal(t, eventValue, string(e.Data))

	require.Equal(t, int64(1),
		metrics2.GetCounter(clientConnectionsMetricName, map[string]string{QueryParameterName: streamName}).Get())
}
+
// Server.Start must propagate a failure from PeerFinder.Start.
func TestServer_PeerFinderReturnsError_StartReturnsError(t *testing.T) {
	ctx := context.Background()

	// Create a PeerFinder that returns an error.
	peerFinderMock := mocks.NewPeerFinder(t)
	peerFinderMock.On("Start", testutils.AnyContext).Return(nil, nil, myMockErr)

	// Create a new Server that uses any available port to listen for peer connections.
	s, err := New(0, peerFinderMock)
	require.NoError(t, err)
	err = s.Start(ctx)
	require.Error(t, err)
}
+
// Two clients subscribed to the same stream must both receive each message,
// and the connection counter should report both connections.
func TestServer_TwoClientsForSameStream_BothReceiveEvents(t *testing.T) {
	ctx, sserServer, frontend := createServerAndFrontendForTest(t)

	// Create an SSE client that talks to the above frontend.
	client1 := sse.NewClient(frontend.URL + PeerEndpointURLPath)
	events1 := make(chan *sse.Event)
	err := client1.SubscribeChan(streamName, events1)
	t.Cleanup(func() {
		client1.Unsubscribe(events1)
	})
	require.NoError(t, err)

	// A second client on the same stream.
	client2 := sse.NewClient(frontend.URL + PeerEndpointURLPath)
	events2 := make(chan *sse.Event)
	err = client2.SubscribeChan(streamName, events2)
	t.Cleanup(func() {
		client2.Unsubscribe(events2)
	})
	require.NoError(t, err)

	// Send an event via the Server, which the client should receive via the frontend.
	sserServer.Send(ctx, streamName, eventValue)

	// Confirm the client received the correct event.
	e := <-events1
	require.Equal(t, eventValue, string(e.Data))

	e = <-events2
	require.Equal(t, eventValue, string(e.Data))

	require.Equal(t, int64(2),
		metrics2.GetCounter(clientConnectionsMetricName, map[string]string{QueryParameterName: streamName}).Get())
}
+
// A request without the stream query parameter must be rejected with a 400.
func TestClientConnectionHandler_NoStreamNameProvided_ReturnsStatusBadRequest(t *testing.T) {
	ctx, sserServer, _ := createServerAndFrontendForTest(t)
	w := httptest.NewRecorder()
	r := httptest.NewRequest("GET", "/just/a/query/path/with/no/query/parameters", nil)

	sserServer.ClientConnectionHandler(ctx)(w, r)
	require.Equal(t, http.StatusBadRequest, w.Code)
}
diff --git a/go/sser/sser.go b/go/sser/sser.go
new file mode 100644
index 0000000..d8c7d16
--- /dev/null
+++ b/go/sser/sser.go
@@ -0,0 +1,107 @@
+// Package sser allows handling Server-Sent Events (SSE) connections from across
+// kubernetes Pod replicas, for example a Deployment or a StatefulSet, and also
+// allows sending events from any of those Pods to all clients listening on a
+// stream regardless of which Pod they are connected to.
+//
+// The diagram below assumes the internal port is :7000, and the public
+// facing/frontend port is :8000. These are only for example and both are
+// changeable. In the diagram there are 3 pods in a Deployment and each pod is
+// able to handle incoming connections from a client wanting to receive a
+// Server-Sent Event. The clients always connect to the frontend. (1)(2)
+//
+// At any point, due to internal processing, the app can send a message to every
+// client listening on the "foo" stream by calling Server.Send(ctx, "foo", "my
+// message"). The sser.Server then sends the stream name and message to every
+// pod via HTTP POST to every pod's internal endpoint. (4)
+//
+// When each sser.Server handles that request it then sends the message to every
+// client connected to that Pod that is listening on that stream. (5)
+//
+//
+// +---------------------------+
+// | |
+// | Pod 1 |
+// | |
+// +-----> :7000/api/json/v1/send |
+// | | | (1) Client A - listen on "foo"
+// | | :8000/events <-------------------------------------------
+// | | | (5) Client A receives "my message"
+// | +---------------------------+
+// |
+// | +---------------------------+
+// | | |
+// (4) +---| Pod 2 |
+// HTTP Post to every | | | (3) Send(ctx, "foo", "my message")
+// peer Pods internal +-----> :7000/api/json/v1/send |
+// port. | | |
+// | | :8000/events |
+// | | |
+// | +---------------------------+
+// |
+// | +---------------------------+
+// | | |
+// | | Pod N |
+// | | |
+// +-----> :7000/api/json/v1/send |
+// | | (2) Client B - listen on "foo"
+// | :8000/events <--------------------------------------------
+// | | (5) Client B receives "my message"
+// +---------------------------+
+//
+// Finding all the peer Pods is handled via the Kubernetes API, and any changes
+// to the peers are handled through a watch, so the list of peers is always up
+// to date.
+//
+// https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
+//
+// This package makes no delivery guarantees and has all the same race
+// conditions that are possible with the underlying SSE protocol.
+//
+// The events passed around to the peers are all on internal ports, so we don't
+// have to protect those ports, unlike the frontend ports.
+package sser
+
+import (
+ "context"
+ "net/http"
+)
+
const (
	// PeerEndpointURLPath is the URL path that the server will listen on for
	// incoming events that need to be distributed to all connected clients.
	PeerEndpointURLPath = "/api/json/v1/send"

	// QueryParameterName is the query parameter that the SSE client and server
	// libraries look for by default to select a stream.
	QueryParameterName = "stream"
)
+
// PeerFinder finds the IP addresses of all of the pods that make up this set of
// replicas.
//
// Implementations in this package: PeerFinderImpl (kubernetes) and
// PeerFinderLocalhost (local development).
type PeerFinder interface {
	// Start returns a slice of IP Addresses that represent all the peers of
	// this pod that are in the Running state. It also returns a channel that
	// will provide updated slices of IP addresses every time they change. Note
	// that this includes the running pod itself.
	Start(ctx context.Context) ([]string, <-chan []string, error)
}
+
// Server allows handling incoming SSE connection requests, and sending events
// across multiple pods.
type Server interface {
	// Start the SSE server.
	//
	// Start will automatically start listening on an internal port
	// at PeerEndpointURLPath.
	//
	// Start must be called before ClientConnectionHandler() or Send().
	Start(ctx context.Context) error

	// ClientConnectionHandler returns an http.Handler that can be registered to
	// handle requests from SSE clients on the frontend port.
	ClientConnectionHandler(ctx context.Context) http.HandlerFunc

	// Send a message to all connections on all peer pods that are listening for
	// events from the given stream name.
	//
	// Send may block if the internal send queue is full.
	Send(ctx context.Context, stream string, msg string)
}
diff --git a/go_repositories.bzl b/go_repositories.bzl
index f207bd8..ba8804b 100644
--- a/go_repositories.bzl
+++ b/go_repositories.bzl
@@ -2999,6 +2999,12 @@
sum = "h1:NNFoKms+kut6ABPf6xiKNM5214jzxAhDBrPHCJ97Wg0=",
version = "v0.2.1",
)
+ go_repository(
+ name = "com_github_r3labs_sse_v2",
+ importpath = "github.com/r3labs/sse/v2",
+ sum = "h1:lZH+W4XOLIq88U5MIHOsLec7+R62uhz3bIi2yn0Sg8o=",
+ version = "v2.8.1",
+ )
go_repository(
name = "com_github_rcrowley_go_metrics",
@@ -3750,6 +3756,12 @@
sum = "h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=",
version = "v2.2.6",
)
+ go_repository(
+ name = "in_gopkg_cenkalti_backoff_v1",
+ importpath = "gopkg.in/cenkalti/backoff.v1",
+ sum = "h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=",
+ version = "v1.1.0",
+ )
go_repository(
name = "in_gopkg_check_v1",
diff --git a/skfe/computed.json b/skfe/computed.json
index 3d7fd50..93c62e0 100644
--- a/skfe/computed.json
+++ b/skfe/computed.json
@@ -2017,6 +2017,32 @@
"seconds": 1
},
"load_assignment": {
+ "cluster_name": "sserexample",
+ "endpoints": [
+ {
+ "lb_endpoints": [
+ {
+ "endpoint": {
+ "address": {
+ "socket_address": {
+ "address": "sserexample",
+ "port_value": 8000
+ }
+ }
+ }
+ }
+ ]
+ }
+ ]
+ },
+ "name": "sserexample",
+ "type": "STRICT_DNS"
+ },
+ {
+ "connect_timeout": {
+ "seconds": 1
+ },
+ "load_assignment": {
"cluster_name": "status",
"endpoints": [
{
@@ -4571,6 +4597,23 @@
]
},
{
+ "domains": "sserexample.skia.org",
+ "name": "sserexample.skia.org",
+ "routes": [
+ {
+ "match": {
+ "prefix": "/"
+ },
+ "route": {
+ "cluster": "sserexample",
+ "timeout": {
+ "seconds": 600
+ }
+ }
+ }
+ ]
+ },
+ {
"domains": "status.skia.org",
"name": "status.skia.org",
"routes": [