blob: 7253d458e7963b5559f05be5dd34943d40a5e931 [file] [log] [blame] [edit]
package main
import (
"context"
"flag"
"fmt"
"net/http"
"os"
"time"
"github.com/go-chi/chi/v5"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/httputils"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/sser"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
// Command-line flags.

// Flags that control how and where this process serves.
var (
	// local switches main to the localhost peer finder and skips the
	// HealthzAndHTTPS wrapper around the HTTP handler.
	local = flag.Bool("local", false, "Running locally if true. As opposed to in production.")

	// port is the address handed to http.ListenAndServe.
	port = flag.String("port", ":8000", "HTTP service address (e.g., ':8000')")

	// promPort is handed to common.PrometheusOpt during initialization.
	promPort = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':10110')")
)

// Flags that configure the Server-Sent Events server and its peer discovery.
var (
	// sserPort is the port passed to sser.New.
	sserPort = flag.Int("sser_port", 7000, "Server-Sent Events peer connection port.")

	// namespace and labelSelector are passed to sser.NewPeerFinder when not
	// running locally.
	namespace     = flag.String("namespace", "default", "Namespace this application is running in.")
	labelSelector = flag.String("label_selector", "app=sserexample", "Label selector for peers of this application.")
)
// page is the HTML document written by the index handler. Its inline script
// opens an EventSource on /_/sse?stream=counter, keeps only the ten most
// recent messages, and renders them newline-separated inside the <pre>
// element.
const page = `<!DOCTYPE html>
<html>
<head>
<title></title>
<meta charset="utf-8" />
</head>
<body>
<pre></pre>
<script type="text/javascript" charset="utf-8">
const pre = document.querySelector('pre');
const evtSource = new EventSource('/_/sse?stream=counter');
const messages = [];
evtSource.onmessage = (event) => {
messages.push(event.data);
while (messages.length > 10) {
messages.shift();
}
pre.textContent = messages.join('\n');
};
</script>
</body>
</html>
`
// index writes the static demo page to the response.
//
// The Content-Type header is set explicitly before the body is written. The
// header name was previously misspelled as "Context-Type", so the intended
// header was never sent. The value includes the utf-8 charset, matching the
// page's own <meta charset> declaration.
//
// A failed write is only logged: once the body write has failed there is no
// way to report the error back to the client.
func index(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "text/html; charset=utf-8")
	if _, err := w.Write([]byte(page)); err != nil {
		sklog.Errorf("write index page: %s", err)
	}
}
// main wires up and runs the Server-Sent Events example application.
//
// It selects a peer finder (Kubernetes-backed unless --local is set), creates
// and starts the sser server, registers the index page and the SSE endpoint,
// launches a goroutine that sends "<hostname> - <count>" on the "counter"
// stream once per second, and then serves HTTP on --port.
func main() {
	ctx := context.Background()
	common.InitWithMust(
		"sserexample",
		common.PrometheusOpt(promPort),
	)

	// Pick the peer finder: the localhost one when running locally, otherwise
	// one built from the in-cluster Kubernetes configuration.
	var peerFinder sser.PeerFinder
	if *local {
		peerFinder = sser.NewPeerFinderLocalhost()
	} else {
		cfg, err := rest.InClusterConfig()
		if err != nil {
			sklog.Fatal(err)
		}
		clients, err := kubernetes.NewForConfig(cfg)
		if err != nil {
			sklog.Fatal(err)
		}
		peerFinder, err = sser.NewPeerFinder(clients, *namespace, *labelSelector)
		if err != nil {
			sklog.Fatal(err)
		}
	}

	sserServer, err := sser.New(*sserPort, peerFinder)
	if err != nil {
		sklog.Fatal(err)
	}
	if err := sserServer.Start(ctx); err != nil {
		sklog.Fatal(err)
	}

	// Routes: the demo page and the client-facing SSE endpoint.
	router := chi.NewRouter()
	router.HandleFunc("/", index)
	router.HandleFunc("/_/sse", sserServer.ClientConnectionHandler(ctx))

	// Outside of local mode the router is wrapped by HealthzAndHTTPS.
	var handler http.Handler = router
	if !*local {
		handler = httputils.HealthzAndHTTPS(handler)
	}

	hostname, err := os.Hostname()
	if err != nil {
		sklog.Fatal(err)
	}

	// Emit one message per second on the "counter" stream; the first message
	// carries count 1. Send failures are logged and the loop keeps going.
	go func() {
		ticks := time.Tick(time.Second)
		for n := 1; ; n++ {
			<-ticks
			msg := fmt.Sprintf("%s - %d", hostname, n)
			if err := sserServer.Send(ctx, "counter", msg); err != nil {
				sklog.Errorf("failed to send: %s", err)
			}
		}
	}()

	// Serve through the default mux, which is why nil is passed below.
	http.Handle("/", handler)
	sklog.Info("Ready to serve.")
	err = http.ListenAndServe(*port, nil)
	sklog.Fatal(err)
}