blob: a8f6bdccc8d28e8d0484b4f16eaf325c5c886f4a [file] [log] [blame] [edit]
package sser
import (
"context"
"errors"
"testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.skia.org/infra/go/k8s/mocks"
watchmocks "go.skia.org/infra/go/k8s/watch/mocks"
"go.skia.org/infra/go/testutils"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
const (
testIPAddress1 = "192.168.1.1"
testIPAddress2 = "192.168.1.2"
testLabelSelector = "app=myTestApp"
testNamespace = "default"
testResourceVersion = "123"
testResourceVersionNext = "124"
)
var (
myMockErr = errors.New("my mock error")
)
func buildClientSetOnPodListMocks(t *testing.T, items []v1.Pod, err error) (kubernetes.Interface, *mocks.PodInterface) {
// Build up all the mocks to handle this chain of calls: p.clientset.CoreV1().Pods(...).List(...)
podList := &v1.PodList{
ListMeta: metav1.ListMeta{
ResourceVersion: testResourceVersionNext,
},
Items: items,
}
podInterface := mocks.NewPodInterface(t)
podInterface.On("List", testutils.AnyContext, metav1.ListOptions{
LabelSelector: testLabelSelector,
}).Return(podList, err)
coreV1 := mocks.NewCoreV1Interface(t)
coreV1.On("Pods", testNamespace).Return(podInterface)
clientset := mocks.NewInterface(t)
clientset.On("CoreV1").Return(coreV1)
return clientset, podInterface
}
var listOptionsNext = metav1.ListOptions{
LabelSelector: testLabelSelector,
ResourceVersion: testResourceVersionNext,
}
func addWatchMocksSuccess(t *testing.T, podsMock *mocks.PodInterface) {
watch := watchmocks.NewInterface(t)
podsMock.On("Watch", testutils.AnyContext, listOptionsNext).Return(watch, nil)
watch.On("Stop")
}
func addWatchMocksError(t *testing.T, podsMock *mocks.PodInterface) {
watch := watchmocks.NewInterface(t)
podsMock.On("Watch", testutils.AnyContext, listOptionsNext).Return(watch, myMockErr)
}
func buildPeerFinderImplOnPodListMocks(t *testing.T, items []v1.Pod, err error) *PeerFinderImpl {
clientset, _ := buildClientSetOnPodListMocks(t, items, err)
p, err := NewPeerFinder(clientset, testNamespace, testLabelSelector)
require.NoError(t, err)
return p
}
func TestPeerFinderImplFindAllPeerPodIPAddresses_HappyPath(t *testing.T) {
items := []v1.Pod{
{
Status: v1.PodStatus{
Phase: v1.PodRunning,
PodIP: testIPAddress1,
},
},
{
Status: v1.PodStatus{
Phase: v1.PodFailed, // This pod will be ignored because it's not Running.
PodIP: testIPAddress2,
},
},
{
Status: v1.PodStatus{
Phase: v1.PodRunning,
PodIP: "", // This pod will be ignored because the IP address is empty.
},
},
}
p := buildPeerFinderImplOnPodListMocks(t, items, nil)
peers, version, err := p.findAllPeerPodIPAddresses(context.Background())
require.NoError(t, err)
require.Equal(t, testResourceVersionNext, version)
require.Len(t, peers, 1)
require.Equal(t, testIPAddress1, peers[0])
}
func TestPeerFinderImplFindAllPeerPodIPAddresses_ListReturnsError_ReturnsError(t *testing.T) {
p := buildPeerFinderImplOnPodListMocks(t, []v1.Pod{}, myMockErr)
_, _, err := p.findAllPeerPodIPAddresses(context.Background())
require.Contains(t, err.Error(), myMockErr.Error())
}
func TestPeerFinderImplStart_ListReturnsError_ReturnsError(t *testing.T) {
p := buildPeerFinderImplOnPodListMocks(t, []v1.Pod{}, myMockErr)
_, _, err := p.Start(context.Background())
require.Contains(t, err.Error(), myMockErr.Error())
}
func TestPeerFinderImplStart_WatchReturnsError_ResourceVersionGetsCleared(t *testing.T) {
clientset, podMock := buildClientSetOnPodListMocks(t, []v1.Pod{}, nil)
addWatchMocksError(t, podMock)
p, err := NewPeerFinder(clientset, testNamespace, testLabelSelector)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
cancel()
_, ch, err := p.Start(ctx)
require.NoError(t, err)
<-ch
require.Empty(t, p.resourceVersion)
}
func TestPeerFinderImplStart_WatchDoesNotReturnsError_ResourceVersionGetsUpdated(t *testing.T) {
clientset, podMock := buildClientSetOnPodListMocks(t, []v1.Pod{}, nil)
addWatchMocksSuccess(t, podMock)
p, err := NewPeerFinder(clientset, testNamespace, testLabelSelector)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
cancel()
_, ch, err := p.Start(ctx)
require.NoError(t, err)
// Since the context was cancelled the returned PeerURL channel will be
// closed.
_, ok := <-ch
require.False(t, ok)
require.Equal(t, testResourceVersionNext, p.resourceVersion)
}
func TestPeerFinderImplStart_WatchDoesNotReturnError_NewPeerURLsSentOnChannel(t *testing.T) {
clientset, podMock := buildClientSetOnPodListMocks(t, []v1.Pod{}, nil)
p, err := NewPeerFinder(clientset, testNamespace, testLabelSelector)
require.NoError(t, err)
// Create cancellable context so we can stop the Go routine.
ctx, cancel := context.WithCancel(context.Background())
// Create a mock for Watch that can be called repeatedly.
watch := watchmocks.NewInterface(t)
podMock.On("Watch", testutils.AnyContext, listOptionsNext).Run(func(_ mock.Arguments) {
// Prepare the next call to PodList which returns a different set of
// items than our first call to buildClientSetOnPodListMocks().
items := []v1.Pod{
{
Status: v1.PodStatus{
Phase: v1.PodRunning,
PodIP: testIPAddress2,
},
},
}
clientset, podMock := buildClientSetOnPodListMocks(t, items, nil)
p.clientset = clientset
addWatchMocksSuccess(t, podMock)
}).Return(watch, nil)
watch.On("Stop")
_, ch, err := p.Start(ctx)
require.NoError(t, err)
// Wait for the new PeerURLs to be sent down the channel.
peers := <-ch
require.Equal(t, testIPAddress2, peers[0])
// Cancel the context so the Go routine in Start() exits.
cancel()
// Wait for the channel to be closed.
for {
_, ok := <-ch
if !ok {
return
}
}
// Test will hang forever if 'ch' is never closed.
}