blob: 702a469aaf657c793f9d96b4dbb74b01f1c0eaae [file] [log] [blame]
package swarmingv2
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/require"
apipb "go.chromium.org/luci/swarming/proto/api_v2"
"go.skia.org/infra/go/cipd"
"go.skia.org/infra/go/common"
"go.skia.org/infra/go/now"
"go.skia.org/infra/go/swarming"
"go.skia.org/infra/go/swarming/v2/mocks"
"go.skia.org/infra/go/testutils"
"go.skia.org/infra/task_scheduler/go/types"
"google.golang.org/protobuf/types/known/timestamppb"
)
const (
fakeDigest = "abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234/987"
fakeDigestHash = "abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234"
fakeDigestSize = 987
)
func TestGetFreeMachines_CombinesPagedResponses(t *testing.T) {
ctx := context.Background()
client := &mocks.SwarmingV2Client{}
s := &SwarmingV2TaskExecutor{
casInstance: "fake-cas-instance",
pubSubTopic: "fake-pubsub-topic",
client: client,
}
req1 := &apipb.BotsRequest{
Limit: 1000,
Dimensions: []*apipb.StringPair{
{Key: "pool", Value: "fake-pool"},
},
IsBusy: apipb.NullableBool_FALSE,
IsDead: apipb.NullableBool_FALSE,
InMaintenance: apipb.NullableBool_FALSE,
Quarantined: apipb.NullableBool_FALSE,
}
client.On("ListBots", testutils.AnyContext, req1).Return(&apipb.BotInfoListResponse{
Cursor: "cursor1",
Items: []*apipb.BotInfo{
{BotId: "1"},
{BotId: "2"},
},
}, nil)
req2 := &apipb.BotsRequest{
Limit: 1000,
Dimensions: []*apipb.StringPair{
{Key: "pool", Value: "fake-pool"},
},
IsBusy: apipb.NullableBool_FALSE,
IsDead: apipb.NullableBool_FALSE,
InMaintenance: apipb.NullableBool_FALSE,
Quarantined: apipb.NullableBool_FALSE,
Cursor: "cursor1",
}
client.On("ListBots", testutils.AnyContext, req2).Return(&apipb.BotInfoListResponse{
Items: []*apipb.BotInfo{
{BotId: "3"},
{BotId: "4"},
},
}, nil)
machines, err := s.GetFreeMachines(ctx, "fake-pool")
require.NoError(t, err)
require.Equal(t, []*types.Machine{
{ID: "1", Dimensions: []string{}},
{ID: "2", Dimensions: []string{}},
{ID: "3", Dimensions: []string{}},
{ID: "4", Dimensions: []string{}},
}, machines)
}
func TestGetPendingTasks_CombinesPagedResponses(t *testing.T) {
ts := time.Unix(1715176877, 0) // Arbitrary time.
ctx := now.TimeTravelingContext(t.Context(), ts)
client := &mocks.SwarmingV2Client{}
s := &SwarmingV2TaskExecutor{
casInstance: "fake-cas-instance",
pubSubTopic: "fake-pubsub-topic",
client: client,
}
req1 := &apipb.TasksWithPerfRequest{
Limit: 1000,
Start: timestamppb.New(ts.Add(-2 * 24 * time.Hour)),
End: timestamppb.New(ts),
State: apipb.StateQuery_QUERY_PENDING,
Tags: []string{"pool:fake-pool"},
IncludePerformanceStats: false,
}
client.On("ListTasks", testutils.AnyContext, req1).Return(&apipb.TaskListResponse{
Cursor: "cursor1",
Items: []*apipb.TaskResultResponse{
{TaskId: "1", State: apipb.TaskState_PENDING},
{TaskId: "2", State: apipb.TaskState_PENDING},
},
}, nil)
req2 := &apipb.TasksWithPerfRequest{
Limit: 1000,
Start: timestamppb.New(ts.Add(-2 * 24 * time.Hour)),
End: timestamppb.New(ts),
State: apipb.StateQuery_QUERY_PENDING,
Tags: []string{"pool:fake-pool"},
IncludePerformanceStats: false,
Cursor: "cursor1",
}
client.On("ListTasks", testutils.AnyContext, req2).Return(&apipb.TaskListResponse{
Items: []*apipb.TaskResultResponse{
{TaskId: "3", State: apipb.TaskState_PENDING},
{TaskId: "4", State: apipb.TaskState_PENDING},
},
}, nil)
tasks, err := s.GetPendingTasks(ctx, "fake-pool")
require.NoError(t, err)
require.Equal(t, []*types.TaskResult{
{ID: "1", Status: types.TASK_STATUS_PENDING, Tags: map[string][]string{}},
{ID: "2", Status: types.TASK_STATUS_PENDING, Tags: map[string][]string{}},
{ID: "3", Status: types.TASK_STATUS_PENDING, Tags: map[string][]string{}},
{ID: "4", Status: types.TASK_STATUS_PENDING, Tags: map[string][]string{}},
}, tasks)
}
func TestGetTaskResult(t *testing.T) {
ctx := context.Background()
client := &mocks.SwarmingV2Client{}
s := &SwarmingV2TaskExecutor{
casInstance: "fake-cas-instance",
pubSubTopic: "fake-pubsub-topic",
client: client,
}
client.On("GetResult", testutils.AnyContext, &apipb.TaskIdWithPerfRequest{
TaskId: "task-id",
IncludePerformanceStats: false,
}).Return(&apipb.TaskResultResponse{
TaskId: "task-id",
State: apipb.TaskState_COMPLETED,
}, nil)
task, err := s.GetTaskResult(ctx, "task-id")
require.NoError(t, err)
require.Equal(t, &types.TaskResult{
ID: "task-id",
Status: types.TASK_STATUS_SUCCESS,
Tags: map[string][]string{},
}, task)
}
func TestGetTaskCompletionStatuses(t *testing.T) {
ctx := context.Background()
client := &mocks.SwarmingV2Client{}
s := &SwarmingV2TaskExecutor{
casInstance: "fake-cas-instance",
pubSubTopic: "fake-pubsub-topic",
client: client,
}
var taskIds []string
var taskStates []apipb.TaskState
var expectResult []bool
addTask := func(id string, state apipb.TaskState, isFinished bool) {
taskIds = append(taskIds, id)
taskStates = append(taskStates, state)
expectResult = append(expectResult, isFinished)
}
addTask("1", apipb.TaskState_BOT_DIED, true)
addTask("2", apipb.TaskState_CANCELED, true)
addTask("3", apipb.TaskState_CLIENT_ERROR, true)
addTask("4", apipb.TaskState_COMPLETED, true)
addTask("5", apipb.TaskState_EXPIRED, true)
addTask("6", apipb.TaskState_INVALID, true)
addTask("7", apipb.TaskState_KILLED, true)
addTask("8", apipb.TaskState_NO_RESOURCE, true)
addTask("9", apipb.TaskState_TIMED_OUT, true)
addTask("10", apipb.TaskState_PENDING, false)
addTask("11", apipb.TaskState_RUNNING, false)
client.On("ListTaskStates", testutils.AnyContext, &apipb.TaskStatesRequest{
TaskId: taskIds,
}).Return(&apipb.TaskStates{
States: taskStates,
}, nil)
actual, err := s.GetTaskCompletionStatuses(ctx, taskIds)
require.NoError(t, err)
require.Len(t, actual, len(expectResult))
for idx, expect := range expectResult {
require.Equal(t, expect, actual[idx], taskStates[idx])
}
}
func TestTriggerTask(t *testing.T) {
ctx := context.Background()
client := &mocks.SwarmingV2Client{}
s := &SwarmingV2TaskExecutor{
casInstance: "fake-cas-instance",
pubSubTopic: "fake-pubsub-topic",
client: client,
}
expectRequest := &apipb.NewTaskRequest{
Name: "task-name",
Priority: swarming.RECOMMENDED_PRIORITY,
PubsubTopic: fmt.Sprintf(swarming.PUBSUB_FULLY_QUALIFIED_TOPIC_TMPL, common.PROJECT_ID, s.pubSubTopic),
TaskSlices: []*apipb.TaskSlice{
{
ExpirationSecs: int32(swarming.RECOMMENDED_EXPIRATION.Seconds()),
Properties: &apipb.TaskProperties{
CasInputRoot: &apipb.CASReference{
CasInstance: "fake-cas-instance",
Digest: &apipb.Digest{
Hash: "abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234",
SizeBytes: fakeDigestSize,
},
},
Dimensions: []*apipb.StringPair{
{Key: "os", Value: "Linux"},
},
ExecutionTimeoutSecs: int32(swarming.RECOMMENDED_HARD_TIMEOUT.Seconds()),
IoTimeoutSecs: int32(swarming.RECOMMENDED_IO_TIMEOUT.Seconds()),
},
},
},
User: swarmingUser,
}
client.On("NewTask", testutils.AnyContext, expectRequest).Return(&apipb.TaskRequestMetadataResponse{
TaskId: "task-id",
Request: &apipb.TaskRequestResponse{},
}, nil)
res, err := s.TriggerTask(ctx, &types.TaskRequest{
Name: "task-name",
CasInput: fakeDigest,
Dimensions: []string{"os:Linux"},
})
require.NoError(t, err)
require.Equal(t, &types.TaskResult{
ID: "task-id",
Status: types.TASK_STATUS_PENDING,
}, res)
}
func TestTriggerTask_Minimal(t *testing.T) {
ctx := context.Background()
client := &mocks.SwarmingV2Client{}
s := &SwarmingV2TaskExecutor{
casInstance: "fake-cas-instance",
pubSubTopic: "fake-pubsub-topic",
client: client,
}
expectRequest := &apipb.NewTaskRequest{
Name: "task-name",
Priority: swarming.RECOMMENDED_PRIORITY,
PubsubTopic: fmt.Sprintf(swarming.PUBSUB_FULLY_QUALIFIED_TOPIC_TMPL, common.PROJECT_ID, s.pubSubTopic),
TaskSlices: []*apipb.TaskSlice{
{
ExpirationSecs: int32(swarming.RECOMMENDED_EXPIRATION.Seconds()),
Properties: &apipb.TaskProperties{
CasInputRoot: &apipb.CASReference{
CasInstance: "fake-cas-instance",
Digest: &apipb.Digest{
Hash: "abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234",
SizeBytes: fakeDigestSize,
},
},
Dimensions: []*apipb.StringPair{
{Key: "os", Value: "Linux"},
},
ExecutionTimeoutSecs: int32(swarming.RECOMMENDED_HARD_TIMEOUT.Seconds()),
IoTimeoutSecs: int32(swarming.RECOMMENDED_IO_TIMEOUT.Seconds()),
},
},
},
User: swarmingUser,
}
client.On("NewTask", testutils.AnyContext, expectRequest).Return(&apipb.TaskRequestMetadataResponse{
TaskId: "task-id",
Request: &apipb.TaskRequestResponse{},
}, nil)
res, err := s.TriggerTask(ctx, &types.TaskRequest{
Name: "task-name",
CasInput: fakeDigest,
Dimensions: []string{"os:Linux"},
})
require.NoError(t, err)
require.Equal(t, &types.TaskResult{
ID: "task-id",
Status: types.TASK_STATUS_PENDING,
}, res)
}
func TestConvertTaskRequest_Minimal(t *testing.T) {
s := &SwarmingV2TaskExecutor{
casInstance: "fake-cas-instance",
pubSubTopic: "fake-pubsub-topic",
client: nil, // Unused in this test.
}
input := &types.TaskRequest{
CasInput: fakeDigest,
}
expect := &apipb.NewTaskRequest{
Priority: swarming.RECOMMENDED_PRIORITY,
TaskSlices: []*apipb.TaskSlice{
{
ExpirationSecs: int32(swarming.RECOMMENDED_EXPIRATION.Seconds()),
Properties: &apipb.TaskProperties{
CasInputRoot: &apipb.CASReference{
CasInstance: s.casInstance,
Digest: &apipb.Digest{
Hash: "abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234",
SizeBytes: fakeDigestSize,
},
},
ExecutionTimeoutSecs: int32(swarming.RECOMMENDED_HARD_TIMEOUT.Seconds()),
IoTimeoutSecs: int32(swarming.RECOMMENDED_IO_TIMEOUT.Seconds()),
},
},
},
User: swarmingUser,
PubsubTopic: fmt.Sprintf(swarming.PUBSUB_FULLY_QUALIFIED_TOPIC_TMPL, common.PROJECT_ID, s.pubSubTopic),
}
actual, err := s.convertTaskRequest(input)
require.NoError(t, err)
require.Equal(t, expect, actual)
}
func TestConvertTaskRequest_Full(t *testing.T) {
s := &SwarmingV2TaskExecutor{
casInstance: "fake-cas-instance",
pubSubTopic: "fake-pubsub-topic",
client: nil, // Unused in this test.
}
input := &types.TaskRequest{
Caches: []*types.CacheRequest{
{
Name: "go",
Path: "/cache/go",
},
},
CasInput: fakeDigest,
CipdPackages: []*cipd.Package{
{
Name: "my-pkg",
Path: "/path/to/my-pkg",
Version: "version:1",
},
},
Command: []string{"echo", "hello world"},
Dimensions: []string{"os:Linux"},
Env: map[string]string{
"KEY1": "VALUE",
},
EnvPrefixes: map[string][]string{
"KEY2": {"VALUE1", "VALUE2"},
},
ExecutionTimeout: 5 * time.Minute,
Expiration: 10 * time.Minute,
Idempotent: true,
IoTimeout: 2 * time.Minute,
Name: "task-name",
Outputs: []string{"output"},
ServiceAccount: "my-service-account",
Tags: []string{"task-tag:123"},
TaskSchedulerTaskID: "ts-task-id",
}
expect := &apipb.NewTaskRequest{
Name: "task-name",
Priority: swarming.RECOMMENDED_PRIORITY,
TaskSlices: []*apipb.TaskSlice{
{
ExpirationSecs: int32((10 * time.Minute).Seconds()),
Properties: &apipb.TaskProperties{
Caches: []*apipb.CacheEntry{
{
Name: "go",
Path: "/cache/go",
},
},
CipdInput: &apipb.CipdInput{
Packages: []*apipb.CipdPackage{
{
PackageName: "my-pkg",
Version: "version:1",
Path: "/path/to/my-pkg",
},
},
},
Command: []string{"echo", "hello world"},
Dimensions: []*apipb.StringPair{
{Key: "os", Value: "Linux"},
},
Env: []*apipb.StringPair{
{Key: "KEY1", Value: "VALUE"},
},
EnvPrefixes: []*apipb.StringListPair{
{Key: "KEY2", Value: []string{"VALUE1", "VALUE2"}},
},
ExecutionTimeoutSecs: int32((5 * time.Minute).Seconds()),
Idempotent: true,
CasInputRoot: &apipb.CASReference{
CasInstance: s.casInstance,
Digest: &apipb.Digest{
Hash: "abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234",
SizeBytes: fakeDigestSize,
},
},
IoTimeoutSecs: int32((2 * time.Minute).Seconds()),
Outputs: []string{"output"},
},
},
},
Tags: []string{"task-tag:123"},
User: swarmingUser,
ServiceAccount: "my-service-account",
PubsubTopic: fmt.Sprintf(swarming.PUBSUB_FULLY_QUALIFIED_TOPIC_TMPL, common.PROJECT_ID, s.pubSubTopic),
PubsubUserdata: "ts-task-id",
}
actual, err := s.convertTaskRequest(input)
require.NoError(t, err)
require.Equal(t, expect, actual)
}
func TestConvertTaskResult_Minimal(t *testing.T) {
input := &apipb.TaskResultResponse{}
expect := &types.TaskResult{
Status: types.TASK_STATUS_MISHAP,
Tags: map[string][]string{},
}
actual, err := convertTaskResult(input)
require.NoError(t, err)
require.Equal(t, expect, actual)
}
func TestConvertTaskResult_Full(t *testing.T) {
created := time.Unix(1715190575, 0).UTC()
started := created.Add(time.Minute)
finished := started.Add(time.Minute)
input := &apipb.TaskResultResponse{
TaskId: "task-id",
BotDimensions: []*apipb.StringListPair{
{Key: "os", Value: []string{"Linux", "Debian"}},
},
BotId: "bot-id",
CompletedTs: timestamppb.New(finished),
CreatedTs: timestamppb.New(created),
Duration: 60.0,
ExitCode: 1,
Failure: true,
CasOutputRoot: &apipb.CASReference{
CasInstance: "fake-cas-instance",
Digest: &apipb.Digest{
Hash: "abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234",
SizeBytes: fakeDigestSize,
},
},
StartedTs: timestamppb.New(started),
State: apipb.TaskState_COMPLETED,
Name: "task-name",
Tags: []string{"key:value1", "key:value2"},
}
expect := &types.TaskResult{
CasOutput: fakeDigest,
Created: created,
Finished: finished,
ID: "task-id",
MachineID: "bot-id",
Started: started,
Status: types.TASK_STATUS_FAILURE,
Tags: map[string][]string{
"key": {"value1", "value2"},
},
}
actual, err := convertTaskResult(input)
require.NoError(t, err)
require.Equal(t, expect, actual)
}
func TestConvertTaskStatus_CombinesWithFailureToProduceTaskStatus(t *testing.T) {
check := func(state apipb.TaskState, failure bool, expect types.TaskStatus) {
result, err := convertTaskStatus(state, failure)
require.NoError(t, err)
require.Equal(t, expect, result)
}
check(apipb.TaskState_BOT_DIED, false, types.TASK_STATUS_MISHAP)
check(apipb.TaskState_BOT_DIED, true, types.TASK_STATUS_MISHAP)
check(apipb.TaskState_CANCELED, false, types.TASK_STATUS_MISHAP)
check(apipb.TaskState_CANCELED, true, types.TASK_STATUS_MISHAP)
check(apipb.TaskState_CLIENT_ERROR, false, types.TASK_STATUS_MISHAP)
check(apipb.TaskState_CLIENT_ERROR, true, types.TASK_STATUS_MISHAP)
check(apipb.TaskState_EXPIRED, false, types.TASK_STATUS_MISHAP)
check(apipb.TaskState_EXPIRED, true, types.TASK_STATUS_MISHAP)
check(apipb.TaskState_NO_RESOURCE, false, types.TASK_STATUS_MISHAP)
check(apipb.TaskState_NO_RESOURCE, true, types.TASK_STATUS_MISHAP)
check(apipb.TaskState_TIMED_OUT, false, types.TASK_STATUS_MISHAP)
check(apipb.TaskState_TIMED_OUT, true, types.TASK_STATUS_MISHAP)
check(apipb.TaskState_KILLED, false, types.TASK_STATUS_MISHAP)
check(apipb.TaskState_KILLED, true, types.TASK_STATUS_MISHAP)
check(apipb.TaskState_INVALID, false, types.TASK_STATUS_MISHAP)
check(apipb.TaskState_INVALID, true, types.TASK_STATUS_MISHAP)
check(apipb.TaskState_PENDING, false, types.TASK_STATUS_PENDING)
check(apipb.TaskState_PENDING, true, types.TASK_STATUS_PENDING)
check(apipb.TaskState_RUNNING, false, types.TASK_STATUS_RUNNING)
check(apipb.TaskState_RUNNING, true, types.TASK_STATUS_RUNNING)
check(apipb.TaskState_COMPLETED, false, types.TASK_STATUS_SUCCESS)
check(apipb.TaskState_COMPLETED, true, types.TASK_STATUS_FAILURE)
}
func TestConvertMachine_Minimal(t *testing.T) {
bot := &apipb.BotInfo{}
machine := &types.Machine{
Dimensions: []string{},
}
require.Equal(t, machine, convertMachine(bot))
}
func TestConvertMachine_Full(t *testing.T) {
bot := &apipb.BotInfo{
BotId: "bot-id",
Dimensions: []*apipb.StringListPair{
{
Key: "os",
Value: []string{"Linux"},
},
// Ensure that we handle multiple entries with the same key.
{
Key: "os",
Value: []string{"Debian", "Debian-13"},
},
{
Key: "gpu",
Value: []string{"none"},
},
},
IsDead: true,
Quarantined: true,
TaskId: "task-id",
}
machine := &types.Machine{
ID: "bot-id",
Dimensions: []string{
"gpu:none",
"os:Debian",
"os:Debian-13",
"os:Linux",
},
IsDead: true,
IsQuarantined: true,
CurrentTaskID: "task-id",
}
require.Equal(t, machine, convertMachine(bot))
}