blob: ac960d58d40f50f7195a13c63ee919133e8a8d1c [file] [log] [blame]
// Package sser allows handling Server-Sent Events (SSE) connections from across
// kubernetes Pod replicas, for example a Deployment or a StatefulSet, and also
// allows sending events from any of those Pods to all clients listening on a
// stream regardless of which Pod they are connected to.
//
// The diagram below assumes the internal port is :7000, and the public
// facing/frontend port is :8000. These are only for example and both are
// changeable. In the diagram there are 3 pods in a Deployment and each pod is
// able to handle incoming connections from a client wanting to receive a
// Server-Sent Event. The clients always connect to the frontend. (1)(2)
//
// At any point, due to internal processing, the app can send a message to every
// client listening on the "foo" stream by callling Server.Send(ctx, "foo", "my
// message"). The sser.Server then sends the stream name and message to every
// pod via HTTP POST to every pod's internal endpoint. (4)
//
// When each sser.Server handles that request it the sends the message to every
// client connnected to that Pod that is listening on that stream. (5)
//
// +---------------------------+
// | |
// | Pod 1 |
// | |
// +-----> :7000/api/json/v1/send |
// | | | (1) Client A - listen on "foo"
// | | :8000/events <-------------------------------------------
// | | | (5) Client A receives "my message"
// | +---------------------------+
// |
// | +---------------------------+
// | | |
// (4) +---| Pod 2 |
// HTTP Post to every | | | (3) Send(ctx, "foo", "my message")
// peer Pods internal +-----> :7000/api/json/v1/send |
// port. | | |
// | | :8000/events |
// | | |
// | +---------------------------+
// |
// | +---------------------------+
// | | |
// | | Pod N |
// | | |
// +-----> :7000/api/json/v1/send |
// | | (2) Client B - listen on "foo"
// | :8000/events <--------------------------------------------
// | | (5) Client B receives "my message"
// +---------------------------+
//
// Finding all the peer Pods is handled via the Kubernetes API, and any changes
// to the peers are handled through a watch, so the list of peers is always up
// to date.
//
// https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
//
// This package makes no delivery guarantees and has all the same race
// conditions that are possible with the underlying SSE protocol.
//
// The events passed around to the peers are all on internal ports, so we don't
// have to protect those ports, unlike the frontend ports.
package sser
import (
"context"
"net/http"
)
const (
// PeerEndpointURLPath is the URL path that the server will listen on for
// incoming events that need to be distributed to all connected clients.
PeerEndpointURLPath = "/api/json/v1/send"
// QueryParameterName is the query parameter that the sse client and server library look
// for by default.
QueryParameterName = "stream"
)
// PeerFinder finds the IP addresses of all of the pods that make up this set of
// replicas.
type PeerFinder interface {
// Start returns a slice of IP Addresses that represent all the peers of
// this pod that are in the Running state. It also returns a channel that
// will provide updated slices of IP addresses every time they change. Note
// that this includes the running pod itself.
Start(ctx context.Context) ([]string, <-chan []string, error)
}
// Server allows handling incoming SSE connection requests, and sending events
// across multiple pods.
type Server interface {
// Start the SSE server.
//
// Start will automatically start listening on an internal port
// at PeerEndpointURLPath.
//
// Start must be called before ClientConnectionHandler() or Send().
Start(ctx context.Context) error
// ClientConnectionHandler returns an http.Handler that can be registered to
// handle requests from SSE clients on the frontend port.
ClientConnectionHandler(ctx context.Context) http.HandlerFunc
// Send a message to all connections on all peer pods that are listening for
// events from the given stream name.
Send(ctx context.Context, stream string, msg string) error
}