blob: be3f6f7bbcfd1d92d017a530337fca3bbe3c0ee9 [file] [log] [blame]
/*
The Cluster Telemetry poller checks for new pending tasks by polling the Cluster Telemetry
frontend. Pending tasks are picked up according to the order they were added to CTFE.
When picked up, tasks are immediately executed. There could be multiple tasks running at the
same time.
*/
package main
import (
"bytes"
"context"
"flag"
"fmt"
"io/ioutil"
"os"
"os/user"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"go.skia.org/infra/ct/go/ct_autoscaler"
"go.skia.org/infra/ct/go/ctfe/admin_tasks"
"go.skia.org/infra/ct/go/ctfe/capture_skps"
"go.skia.org/infra/ct/go/ctfe/chromium_analysis"
"go.skia.org/infra/ct/go/ctfe/chromium_builds"
"go.skia.org/infra/ct/go/ctfe/chromium_perf"
"go.skia.org/infra/ct/go/ctfe/lua_scripts"
"go.skia.org/infra/ct/go/ctfe/metrics_analysis"
"go.skia.org/infra/ct/go/ctfe/pixel_diff"
"go.skia.org/infra/ct/go/ctfe/task_common"
"go.skia.org/infra/ct/go/frontend"
"go.skia.org/infra/ct/go/master_scripts/master_common"
ctutil "go.skia.org/infra/ct/go/util"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/exec"
"go.skia.org/infra/go/gitauth"
"go.skia.org/infra/go/metrics2"
"go.skia.org/infra/go/sklog"
skutil "go.skia.org/infra/go/util"
)
// flags
var (
promPort = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':20000')")
pollInterval = flag.Duration("poll_interval", 30*time.Second, "How often to poll CTFE for new pending tasks.")
serviceAccount = flag.String("service_account", "", "Should be set when running in K8s.")
// Map that holds all picked up tasks. Used to ensure same task is not picked up more than once.
pickedUpTasks = map[string]string{}
// Mutex that controls access to the above map.
tasksMtx = sync.Mutex{}
)
type GetPatchFunc func(patchId string) (string, error)
// Specifies the methods that poll requires for each type of task.
type Task interface {
GetTaskName() string
GetCommonCols() *task_common.CommonCols
// Writes any files required by the task and then uses exec.Run to run the task command.
Execute(ctx context.Context, getPatchFunc GetPatchFunc) error
// Returns the corresponding UpdateTaskVars instance of this Task. The
// returned instance is not populated.
GetUpdateTaskVars() task_common.UpdateTaskVars
// Whether the task needs to run on GCE workers.
RunsOnGCEWorkers() bool
}
// Generates a hopefully-unique ID for this execution of this task.
func runId(task Task) string {
return strings.SplitN(task.GetCommonCols().Username, "@", 2)[0] + "-" + ctutil.GetCurrentTs()
}
func executeAndPrintTaskOutput(ctx context.Context, taskName, runId string, args []string) error {
var b bytes.Buffer
if _, err := b.WriteString(fmt.Sprintf("========== Start of stdout and stderr for %s %s ==========\n", taskName, runId)); err != nil {
return fmt.Errorf("Error writing to output buffer: %s", err)
}
if taskErr := exec.Run(ctx, &exec.Command{
Name: taskName,
Args: args,
LogStdout: false,
LogStderr: false,
Stdout: &b,
Stderr: &b,
}); taskErr != nil {
output, getErr := getTaskOutput(b, taskName, runId)
skutil.LogErr(getErr)
fmt.Println(output)
return fmt.Errorf("%s failed with: %s", taskName, taskErr)
}
output, err := getTaskOutput(b, taskName, runId)
if err != nil {
return fmt.Errorf("Could not get output: %s", err)
}
// Print the output and return.
fmt.Println(output)
return nil
}
func getTaskOutput(b bytes.Buffer, taskName, runId string) (string, error) {
if _, err := b.WriteString(fmt.Sprintf("========== End of stdout and stderr for %s %s ==========\n", taskName, runId)); err != nil {
return "", fmt.Errorf("Error writing to output buffer: %s", err)
}
return b.String(), nil
}
// Define frontend.ChromiumAnalysisDatastoreTask here so we can add methods.
type ChromiumAnalysisTask struct {
chromium_analysis.DatastoreTask
}
func (task *ChromiumAnalysisTask) Execute(ctx context.Context, getPatchFunc GetPatchFunc) error {
runId := runId(task)
for fileSuffix, patchGSPath := range map[string]string{
".chromium.patch": task.ChromiumPatchGSPath,
".skia.patch": task.SkiaPatchGSPath,
".v8.patch": task.V8PatchGSPath,
".catapult.patch": task.CatapultPatchGSPath,
".custom_webpages.csv": task.CustomWebpagesGSPath,
} {
patch, err := getPatchFunc(patchGSPath)
if err != nil {
return err
}
// Add an extra newline at the end because git sometimes rejects patches due to
// missing newlines.
patch = patch + "\n"
patchPath := filepath.Join(os.TempDir(), runId+fileSuffix)
if err := ioutil.WriteFile(patchPath, []byte(patch), 0666); err != nil {
return err
}
defer skutil.Remove(patchPath)
}
emails := []string{task.Username}
emails = append(emails, task.CCList...)
args := []string{
"--emails=" + strings.Join(emails, ","),
"--description=" + task.Description,
"--task_id=" + strconv.FormatInt(task.DatastoreKey.ID, 10),
"--pageset_type=" + task.PageSets,
"--benchmark_name=" + task.Benchmark,
"--benchmark_extra_args=" + task.BenchmarkArgs,
"--browser_extra_args=" + task.BrowserArgs,
"--run_in_parallel=" + strconv.FormatBool(task.RunInParallel),
"--target_platform=" + task.Platform,
"--run_on_gce=" + strconv.FormatBool(task.RunsOnGCEWorkers()),
"--match_stdout_txt=" + task.MatchStdoutTxt,
"--run_id=" + runId,
"--logtostderr",
"--email_client_secret_file=" + *master_common.EmailClientSecretFile,
"--email_token_cache_file=" + *master_common.EmailTokenCacheFile,
"--service_account_file=" + *master_common.ServiceAccountFile,
"--task_priority=" + strconv.Itoa(task.TaskPriority),
"--group_name=" + task.GroupName,
fmt.Sprintf("--local=%t", *master_common.Local),
}
return executeAndPrintTaskOutput(ctx, "run_chromium_analysis_on_workers", runId, args)
}
// Define frontend.ChromiumPerfDatastoreTask here so we can add methods.
type ChromiumPerfTask struct {
chromium_perf.DatastoreTask
}
func (task *ChromiumPerfTask) Execute(ctx context.Context, getPatchFunc GetPatchFunc) error {
runId := runId(task)
// TODO(benjaminwagner): Since run_chromium_perf_on_workers only reads these in order to
// upload to Google Storage, eventually we should move the upload step here to avoid writing
// to disk.
for fileSuffix, patchGSPath := range map[string]string{
".chromium.patch": task.ChromiumPatchGSPath,
".skia.patch": task.SkiaPatchGSPath,
".v8.patch": task.V8PatchGSPath,
".catapult.patch": task.CatapultPatchGSPath,
".custom_webpages.csv": task.CustomWebpagesGSPath,
} {
patch, err := getPatchFunc(patchGSPath)
if err != nil {
return err
}
// Add an extra newline at the end because git sometimes rejects patches due to
// missing newlines.
patch = patch + "\n"
patchPath := filepath.Join(os.TempDir(), runId+fileSuffix)
if err := ioutil.WriteFile(patchPath, []byte(patch), 0666); err != nil {
return err
}
defer skutil.Remove(patchPath)
}
emails := []string{task.Username}
emails = append(emails, task.CCList...)
args := []string{
"--emails=" + strings.Join(emails, ","),
"--description=" + task.Description,
"--task_id=" + strconv.FormatInt(task.DatastoreKey.ID, 10),
"--pageset_type=" + task.PageSets,
"--benchmark_name=" + task.Benchmark,
"--benchmark_extra_args=" + task.BenchmarkArgs,
"--browser_extra_args_nopatch=" + task.BrowserArgsNoPatch,
"--browser_extra_args_withpatch=" + task.BrowserArgsWithPatch,
"--repeat_benchmark=" + strconv.FormatInt(task.RepeatRuns, 10),
"--run_in_parallel=" + strconv.FormatBool(task.RunInParallel),
"--target_platform=" + task.Platform,
"--run_on_gce=" + strconv.FormatBool(task.RunsOnGCEWorkers()),
"--run_id=" + runId,
"--logtostderr",
"--email_client_secret_file=" + *master_common.EmailClientSecretFile,
"--email_token_cache_file=" + *master_common.EmailTokenCacheFile,
"--service_account_file=" + *master_common.ServiceAccountFile,
"--task_priority=" + strconv.Itoa(task.TaskPriority),
"--group_name=" + task.GroupName,
fmt.Sprintf("--local=%t", *master_common.Local),
}
return executeAndPrintTaskOutput(ctx, "run_chromium_perf_on_workers", runId, args)
}
// Define frontend.MetricsAnalysisDatastoreTask here so we can add methods.
type MetricsAnalysisTask struct {
metrics_analysis.DatastoreTask
}
func (task *MetricsAnalysisTask) Execute(ctx context.Context, getPatchFunc GetPatchFunc) error {
runId := runId(task)
for fileSuffix, patchGSPath := range map[string]string{
".chromium.patch": task.ChromiumPatchGSPath,
".catapult.patch": task.CatapultPatchGSPath,
".traces.csv": task.CustomTracesGSPath,
} {
patch, err := getPatchFunc(patchGSPath)
if err != nil {
return err
}
// Add an extra newline at the end because git sometimes rejects patches due to
// missing newlines.
patch = patch + "\n"
patchPath := filepath.Join(os.TempDir(), runId+fileSuffix)
if err := ioutil.WriteFile(patchPath, []byte(patch), 0666); err != nil {
return err
}
defer skutil.Remove(patchPath)
}
emails := []string{task.Username}
emails = append(emails, task.CCList...)
args := []string{
"--emails=" + strings.Join(emails, ","),
"--description=" + task.Description,
"--task_id=" + strconv.FormatInt(task.DatastoreKey.ID, 10),
"--metric_name=" + task.MetricName,
"--analysis_output_link=" + task.AnalysisOutputLink,
"--benchmark_extra_args=" + task.BenchmarkArgs,
"--value_column_name=" + task.ValueColumnName,
"--run_id=" + runId,
"--logtostderr",
"--email_client_secret_file=" + *master_common.EmailClientSecretFile,
"--email_token_cache_file=" + *master_common.EmailTokenCacheFile,
"--service_account_file=" + *master_common.ServiceAccountFile,
"--task_priority=" + strconv.Itoa(task.TaskPriority),
fmt.Sprintf("--local=%t", *master_common.Local),
}
return executeAndPrintTaskOutput(ctx, "metrics_analysis_on_workers", runId, args)
}
// Define frontend.PixelDiffDatastoreTask here so we can add methods.
type PixelDiffTask struct {
pixel_diff.DatastoreTask
}
func (task *PixelDiffTask) Execute(ctx context.Context, getPatchFunc GetPatchFunc) error {
runId := runId(task)
for fileSuffix, patchGSPath := range map[string]string{
".chromium.patch": task.ChromiumPatchGSPath,
".skia.patch": task.SkiaPatchGSPath,
".custom_webpages.csv": task.CustomWebpagesGSPath,
} {
patch, err := getPatchFunc(patchGSPath)
if err != nil {
return err
}
// Add an extra newline at the end because git sometimes rejects patches due to
// missing newlines.
patch = patch + "\n"
patchPath := filepath.Join(os.TempDir(), runId+fileSuffix)
if err := ioutil.WriteFile(patchPath, []byte(patch), 0666); err != nil {
return err
}
defer skutil.Remove(patchPath)
}
args := []string{
"--emails=" + task.Username,
"--description=" + task.Description,
"--task_id=" + strconv.FormatInt(task.DatastoreKey.ID, 10),
"--pageset_type=" + task.PageSets,
"--benchmark_extra_args=" + task.BenchmarkArgs,
"--browser_extra_args_nopatch=" + task.BrowserArgsNoPatch,
"--browser_extra_args_withpatch=" + task.BrowserArgsWithPatch,
"--run_on_gce=" + strconv.FormatBool(task.RunsOnGCEWorkers()),
"--run_id=" + runId,
"--logtostderr",
"--email_client_secret_file=" + *master_common.EmailClientSecretFile,
"--email_token_cache_file=" + *master_common.EmailTokenCacheFile,
"--service_account_file=" + *master_common.ServiceAccountFile,
fmt.Sprintf("--local=%t", *master_common.Local),
}
return executeAndPrintTaskOutput(ctx, "pixel_diff_on_workers", runId, args)
}
// Define frontend.CaptureSkpsDatastoreTask here so we can add methods.
type CaptureSkpsTask struct {
capture_skps.DatastoreTask
}
func (task *CaptureSkpsTask) Execute(ctx context.Context, getPatchFunc GetPatchFunc) error {
runId := runId(task)
chromiumBuildDir := ctutil.ChromiumBuildDir(task.ChromiumRev, task.SkiaRev, "")
args := []string{
"--emails=" + task.Username,
"--description=" + task.Description,
"--task_id=" + strconv.FormatInt(task.DatastoreKey.ID, 10),
"--pageset_type=" + task.PageSets,
"--chromium_build=" + chromiumBuildDir,
"--target_platform=Linux",
"--run_on_gce=" + strconv.FormatBool(task.RunsOnGCEWorkers()),
"--run_id=" + runId,
"--logtostderr",
"--email_client_secret_file=" + *master_common.EmailClientSecretFile,
"--email_token_cache_file=" + *master_common.EmailTokenCacheFile,
"--service_account_file=" + *master_common.ServiceAccountFile,
fmt.Sprintf("--local=%t", *master_common.Local),
}
return executeAndPrintTaskOutput(ctx, "capture_skps_on_workers", runId, args)
}
// Define frontend.LuaScriptDatastoreTask here so we can add methods.
type LuaScriptTask struct {
lua_scripts.DatastoreTask
}
func (task *LuaScriptTask) Execute(ctx context.Context, getPatchFunc GetPatchFunc) error {
runId := runId(task)
chromiumBuildDir := ctutil.ChromiumBuildDir(task.ChromiumRev, task.SkiaRev, "")
// TODO(benjaminwagner): Since run_lua_on_workers only reads the lua script in order to
// upload to Google Storage, eventually we should move the upload step here to avoid writing
// to disk. Not sure if we can/should do the same for the aggregator script.
luaScriptName := runId + ".lua"
luaScriptPath := filepath.Join(os.TempDir(), luaScriptName)
if err := ioutil.WriteFile(luaScriptPath, []byte(task.LuaScript), 0666); err != nil {
return err
}
defer skutil.Remove(luaScriptPath)
if task.LuaAggregatorScript != "" {
luaAggregatorName := runId + ".aggregator"
luaAggregatorPath := filepath.Join(os.TempDir(), luaAggregatorName)
if err := ioutil.WriteFile(luaAggregatorPath, []byte(task.LuaAggregatorScript), 0666); err != nil {
return err
}
defer skutil.Remove(luaAggregatorPath)
}
args := []string{
"--emails=" + task.Username,
"--description=" + task.Description,
"--task_id=" + strconv.FormatInt(task.DatastoreKey.ID, 10),
"--pageset_type=" + task.PageSets,
"--chromium_build=" + chromiumBuildDir,
"--run_on_gce=" + strconv.FormatBool(task.RunsOnGCEWorkers()),
"--run_id=" + runId,
"--logtostderr",
"--email_client_secret_file=" + *master_common.EmailClientSecretFile,
"--email_token_cache_file=" + *master_common.EmailTokenCacheFile,
"--service_account_file=" + *master_common.ServiceAccountFile,
fmt.Sprintf("--local=%t", *master_common.Local),
}
return executeAndPrintTaskOutput(ctx, "run_lua_on_workers", runId, args)
}
// Define frontend.ChromiumBuildDatastoreTask here so we can add methods.
type ChromiumBuildTask struct {
chromium_builds.DatastoreTask
}
func (task *ChromiumBuildTask) Execute(ctx context.Context, getPatchFunc GetPatchFunc) error {
runId := runId(task)
// We do not pass --run_on_gce to the below because build tasks always run
// on GCE builders not GCE workers or bare-metal machines.
args := []string{
"--emails=" + task.Username,
"--task_id=" + strconv.FormatInt(task.DatastoreKey.ID, 10),
"--run_id=" + runId,
"--target_platform=Linux",
"--chromium_hash=" + task.ChromiumRev,
"--skia_hash=" + task.SkiaRev,
"--logtostderr",
"--email_client_secret_file=" + *master_common.EmailClientSecretFile,
"--email_token_cache_file=" + *master_common.EmailTokenCacheFile,
"--service_account_file=" + *master_common.ServiceAccountFile,
fmt.Sprintf("--local=%t", *master_common.Local),
}
return executeAndPrintTaskOutput(ctx, "build_chromium", runId, args)
}
// Define frontend.RecreatePageSetsDatastoreTask here so we can add methods.
type RecreatePageSetsTask struct {
admin_tasks.RecreatePageSetsDatastoreTask
}
func (task *RecreatePageSetsTask) Execute(ctx context.Context, getPatchFunc GetPatchFunc) error {
runId := runId(task)
args := []string{
"--emails=" + task.Username,
"--task_id=" + strconv.FormatInt(task.DatastoreKey.ID, 10),
"--run_on_gce=" + strconv.FormatBool(task.RunsOnGCEWorkers()),
"--run_id=" + runId,
"--pageset_type=" + task.PageSets,
"--logtostderr",
"--email_client_secret_file=" + *master_common.EmailClientSecretFile,
"--email_token_cache_file=" + *master_common.EmailTokenCacheFile,
"--service_account_file=" + *master_common.ServiceAccountFile,
fmt.Sprintf("--local=%t", *master_common.Local),
}
return executeAndPrintTaskOutput(ctx, "create_pagesets_on_workers", runId, args)
}
// Define frontend.RecreateWebpageArchivesDatastoreTask here so we can add methods.
type RecreateWebpageArchivesTask struct {
admin_tasks.RecreateWebpageArchivesDatastoreTask
}
func (task *RecreateWebpageArchivesTask) Execute(ctx context.Context, getPatchFunc GetPatchFunc) error {
runId := runId(task)
args := []string{
"--emails=" + task.Username,
"--task_id=" + strconv.FormatInt(task.DatastoreKey.ID, 10),
"--run_on_gce=" + strconv.FormatBool(task.RunsOnGCEWorkers()),
"--run_id=" + runId,
"--pageset_type=" + task.PageSets,
"--logtostderr",
"--email_client_secret_file=" + *master_common.EmailClientSecretFile,
"--email_token_cache_file=" + *master_common.EmailTokenCacheFile,
"--service_account_file=" + *master_common.ServiceAccountFile,
fmt.Sprintf("--local=%t", *master_common.Local),
}
return executeAndPrintTaskOutput(ctx, "capture_archives_on_workers", runId, args)
}
// Returns a poller Task containing the given task_common.Task, or nil if otherTask is nil.
func asPollerTask(ctx context.Context, otherTask task_common.Task) Task {
if otherTask == nil {
return nil
}
switch t := otherTask.(type) {
case *admin_tasks.RecreatePageSetsDatastoreTask:
return &RecreatePageSetsTask{RecreatePageSetsDatastoreTask: *t}
case *admin_tasks.RecreateWebpageArchivesDatastoreTask:
return &RecreateWebpageArchivesTask{RecreateWebpageArchivesDatastoreTask: *t}
case *capture_skps.DatastoreTask:
return &CaptureSkpsTask{DatastoreTask: *t}
case *chromium_analysis.DatastoreTask:
return &ChromiumAnalysisTask{DatastoreTask: *t}
case *chromium_builds.DatastoreTask:
return &ChromiumBuildTask{DatastoreTask: *t}
case *chromium_perf.DatastoreTask:
return &ChromiumPerfTask{DatastoreTask: *t}
case *lua_scripts.DatastoreTask:
return &LuaScriptTask{DatastoreTask: *t}
case *metrics_analysis.DatastoreTask:
return &MetricsAnalysisTask{DatastoreTask: *t}
case *pixel_diff.DatastoreTask:
return &PixelDiffTask{DatastoreTask: *t}
default:
sklog.Errorf("Missing case for %T in asPollerTask", otherTask)
return nil
}
}
// Notifies the frontend that task failed.
func updateWebappTaskSetFailed(task Task) error {
updateVars := task.GetUpdateTaskVars()
updateVars.GetUpdateTaskCommonVars().Id = task.GetCommonCols().DatastoreKey.ID
updateVars.GetUpdateTaskCommonVars().SetCompleted(false)
return frontend.UpdateWebappTaskV2(updateVars)
}
// pollAndExecOnce looks for the oldest pending task in CTFE. If one is found, then
// the local checkout is synced and built, and the picked up task is started in a
// go routine. The function returns without waiting for the task to finish and the
// WaitGroup of the goroutine is returned to the caller. The caller can then call
// wg.Wait() if they would like to wait for the task to finish.
func pollAndExecOnce(ctx context.Context, autoscaler ct_autoscaler.ICTAutoscaler, getPatchFunc GetPatchFunc) *sync.WaitGroup {
pending, err := frontend.GetOldestPendingTaskV2()
var wg sync.WaitGroup
if err != nil {
sklog.Error(err)
return &wg
}
task := asPollerTask(ctx, pending)
if task == nil {
return &wg
}
taskId := fmt.Sprintf("%s.%d", task.GetTaskName(), task.GetCommonCols().DatastoreKey.ID)
tasksMtx.Lock()
_, exists := pickedUpTasks[taskId]
tasksMtx.Unlock()
if exists {
return &wg
}
tasksMtx.Lock()
pickedUpTasks[taskId] = "1"
tasksMtx.Unlock()
if task.RunsOnGCEWorkers() {
if err := autoscaler.RegisterGCETask(taskId); err != nil {
sklog.Errorf("Error when registering GCE task in CT autoscaler: %s", err)
return &wg
}
}
sklog.Infof("Executing task %s", taskId)
// Increment the WaitGroup counter.
wg.Add(1)
go func() {
// Decrement the counter when the goroutine completes.
defer wg.Done()
if err = task.Execute(ctx, getPatchFunc); err == nil {
sklog.Infof("Completed task %s", taskId)
} else {
sklog.Errorf("Task %s failed: %s", taskId, err)
if err := updateWebappTaskSetFailed(task); err != nil {
sklog.Error(err)
}
}
tasksMtx.Lock()
delete(pickedUpTasks, taskId)
tasksMtx.Unlock()
if task.RunsOnGCEWorkers() {
if err := autoscaler.UnregisterGCETask(taskId); err != nil {
sklog.Errorf("Error when unregistering GCE task in CT autoscaler: %s", err)
}
}
}()
// Return the WaitGroup to allow some callers to call wg.Wait()
return &wg
}
func main() {
master_common.InitWithMetrics2("ct-poller", promPort)
autoscaler, err := ct_autoscaler.NewCTAutoscaler(*master_common.Local)
if err != nil {
sklog.Fatalf("Could not instantiate the CT autoscaler: %s", err)
}
healthyGauge := metrics2.GetInt64Metric("healthy")
// Terminate all tasks which were in running state when the poller was restarted.
// See skbug.com/7062.
if err := frontend.TerminateRunningTasks(); err != nil {
sklog.Fatalf("Could not terminate running tasks: %s", err)
}
if !*master_common.Local {
user, err := user.Current()
if err != nil {
sklog.Fatal(err)
}
// Create token source.
ts, err := auth.NewDefaultTokenSource(*master_common.Local, auth.SCOPE_USERINFO_EMAIL, auth.SCOPE_GERRIT)
if err != nil {
sklog.Fatalf("Problem setting up default token source: %s", err)
}
// Use the gitcookie created by gitauth package if .gitcookies does not already exist.
gitcookiesPath := filepath.Join(user.HomeDir, ".gitcookies")
if _, err := gitauth.New(ts, gitcookiesPath, true, *serviceAccount); err != nil {
sklog.Fatalf("Failed to create git cookie updater: %s", err)
}
}
// Run immediately, since pollTick will not fire until after pollInterval.
ctx := context.Background()
pollAndExecOnce(ctx, autoscaler, ctutil.GetPatchFromStorage)
for range time.Tick(*pollInterval) {
healthyGauge.Update(1)
pollAndExecOnce(ctx, autoscaler, ctutil.GetPatchFromStorage)
// Sleeping for a second to avoid the small probability of ending up
// with 2 tasks with the same runID. For context see
// https://skia-review.googlesource.com/c/26941/8/ct/go/poller/main.go#96
time.Sleep(time.Second)
}
}