| package main |
| |
| /* |
| This is a server which collects and serves information about Task Drivers. |
| */ |
| |
| import ( |
| "context" |
| "encoding/json" |
| "flag" |
| "fmt" |
| "html/template" |
| "net/http" |
| "path/filepath" |
| "strings" |
| "time" |
| |
| "cloud.google.com/go/bigtable" |
| "cloud.google.com/go/pubsub" |
| "github.com/gorilla/mux" |
| "go.skia.org/infra/go/auth" |
| "go.skia.org/infra/go/common" |
| "go.skia.org/infra/go/httputils" |
| "go.skia.org/infra/go/login" |
| "go.skia.org/infra/go/sklog" |
| "go.skia.org/infra/task_driver/go/db" |
| bigtable_db "go.skia.org/infra/task_driver/go/db/bigtable" |
| "go.skia.org/infra/task_driver/go/display" |
| "go.skia.org/infra/task_driver/go/handlers" |
| "go.skia.org/infra/task_driver/go/logs" |
| "go.skia.org/infra/task_driver/go/td" |
| ) |
| |
| const ( |
| subscriptionName = "td_server_log_collector" |
| ) |
| |
| var ( |
| // Flags. |
| btInstance = flag.String("bigtable_instance", "", "BigTable instance to use.") |
| btProject = flag.String("bigtable_project", "", "GCE project to use for BigTable.") |
| host = flag.String("host", "localhost", "HTTP service host") |
| local = flag.Bool("local", false, "Running locally if true. As opposed to in production.") |
| port = flag.String("port", ":8000", "HTTP service port (e.g., ':8000')") |
| project = flag.String("project_id", "", "GCE Project ID") |
| promPort = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':10110')") |
| resourcesDir = flag.String("resources_dir", "./dist", "The directory to find templates, JS, and CSS files. If blank the \"dist\" subdirectory of the current directory will be used.") |
| hang = flag.Bool("hang", false, "hang") |
| |
| // Database used for storing and retrieving Task Drivers. |
| d db.DB |
| |
| // BigTable connector used for storing and retrieving logs. |
| lm *logs.LogsManager |
| |
| // HTML templates. |
| tdTemplate *template.Template = nil |
| ) |
| |
| // logsHandler reads log entries from BigTable and writes them to the ResponseWriter. |
| func logsHandler(w http.ResponseWriter, r *http.Request, taskId, stepId, logId string) { |
| // TODO(borenet): If we had access to the Task Driver DB, we could first |
| // retrieve the run and then limit our search to its duration. That |
| // might speed up the search quite a bit. |
| w.Header().Set("Content-Type", "text/plain") |
| entries, err := lm.Search(taskId, stepId, logId) |
| if err != nil { |
| httputils.ReportError(w, err, "Failed to search log entries.", http.StatusInternalServerError) |
| return |
| } |
| if len(entries) == 0 { |
| // TODO(borenet): Maybe an empty log is not the same as a |
| // missing log? |
| http.Error(w, fmt.Sprintf("No matching log entries were found."), http.StatusNotFound) |
| return |
| } |
| for _, e := range entries { |
| line := e.TextPayload |
| if !strings.HasSuffix(line, "\n") { |
| line += "\n" |
| } |
| if _, err := w.Write([]byte(line)); err != nil { |
| httputils.ReportError(w, err, "Failed to write response.", http.StatusInternalServerError) |
| return |
| } |
| } |
| } |
| |
| // getVar returns the variable which should be present in the request path. |
| // It returns "" if it is not found, in which case it also writes an error to |
| // the ResponseWriter. |
| func getVar(w http.ResponseWriter, r *http.Request, key string) string { |
| val, ok := mux.Vars(r)[key] |
| if !ok { |
| http.Error(w, fmt.Sprintf("No %s in request path.", key), http.StatusBadRequest) |
| return "" |
| } |
| return val |
| } |
| |
| // taskLogsHandler is a handler which serves logs for a given task. |
| func taskLogsHandler(w http.ResponseWriter, r *http.Request) { |
| taskId := getVar(w, r, "taskId") |
| if taskId == "" { |
| return |
| } |
| logsHandler(w, r, taskId, "", "") |
| } |
| |
| // stepLogsHandler is a handler which serves logs for a given step. |
| func stepLogsHandler(w http.ResponseWriter, r *http.Request) { |
| taskId := getVar(w, r, "taskId") |
| stepId := getVar(w, r, "stepId") |
| if taskId == "" || stepId == "" { |
| return |
| } |
| logsHandler(w, r, taskId, stepId, "") |
| } |
| |
| // singleLogHandler is a handler which serves logs for a single log ID. |
| func singleLogHandler(w http.ResponseWriter, r *http.Request) { |
| taskId := getVar(w, r, "taskId") |
| stepId := getVar(w, r, "stepId") |
| logId := getVar(w, r, "logId") |
| if taskId == "" || stepId == "" || logId == "" { |
| return |
| } |
| logsHandler(w, r, taskId, stepId, logId) |
| } |
| |
| // getTaskDriver returns a db.TaskDriverRun instance for the given request. If |
| // anything went wrong, returns nil and writes an error to the ResponseWriter. |
| func getTaskDriver(w http.ResponseWriter, r *http.Request) *db.TaskDriverRun { |
| id := getVar(w, r, "taskId") |
| if id == "" { |
| return nil |
| } |
| td, err := d.GetTaskDriver(id) |
| if err != nil { |
| httputils.ReportError(w, err, "Failed to retrieve task driver.", http.StatusInternalServerError) |
| return nil |
| } |
| if td == nil { |
| // The requested task driver doesn't exist. This is a temporary |
| // measure while some tasks are running task drivers and others |
| // are not: if the client provided a redirect URL, redirect the |
| // client, otherwise give a 404. |
| if redirect := r.FormValue("ifNotFound"); redirect != "" { |
| http.Redirect(w, r, redirect, http.StatusSeeOther) |
| return nil |
| } |
| http.Error(w, "No task driver exists with the given ID.", http.StatusNotFound) |
| return nil |
| } |
| return td |
| } |
| |
| // getTaskDriverDisplay returns a display.TaskDriverRunDisplay instance for the |
| // given request. If anything went wrong, returns nil and writes an error to the |
| // ResponseWriter. |
| func getTaskDriverDisplay(w http.ResponseWriter, r *http.Request) *display.TaskDriverRunDisplay { |
| td := getTaskDriver(w, r) |
| if td == nil { |
| // Any error was handled by getTaskDriver. |
| return nil |
| } |
| disp, err := display.TaskDriverForDisplay(td) |
| if err != nil { |
| httputils.ReportError(w, err, "Failed to format task driver for response.", http.StatusInternalServerError) |
| return nil |
| } |
| return disp |
| } |
| |
| // taskDriverHandler handles requests for an individual Task Driver. |
| func taskDriverHandler(w http.ResponseWriter, r *http.Request) { |
| disp := getTaskDriverDisplay(w, r) |
| if disp == nil { |
| // Any error was handled by getTaskDriverDisplay. |
| return |
| } |
| |
| if *local { |
| // reload during local development |
| loadTemplates() |
| } |
| b, err := json.Marshal(disp) |
| if err != nil { |
| httputils.ReportError(w, err, "Failed to encode response.", http.StatusInternalServerError) |
| return |
| } |
| page := struct { |
| TaskName string |
| TaskJson string |
| }{ |
| TaskName: disp.Name, |
| TaskJson: string(b), |
| } |
| w.Header().Set("Content-Type", "text/html") |
| if err := tdTemplate.Execute(w, page); err != nil { |
| httputils.ReportError(w, err, "Server could not load page", http.StatusInternalServerError) |
| return |
| } |
| } |
| |
| // jsonTaskDriverHandler returns the JSON representation of the requested Task Driver. |
| func jsonTaskDriverHandler(w http.ResponseWriter, r *http.Request) { |
| disp := getTaskDriverDisplay(w, r) |
| if disp == nil { |
| // Any error was handled by getTaskDriverDisplay. |
| return |
| } |
| |
| if err := json.NewEncoder(w).Encode(disp); err != nil { |
| httputils.ReportError(w, err, "Failed to encode response.", http.StatusInternalServerError) |
| return |
| } |
| } |
| |
| // Load the HTML pages. |
| func loadTemplates() { |
| tdTemplate = template.Must(template.ParseFiles( |
| filepath.Join(*resourcesDir, "task-driver-index.html"), |
| )) |
| } |
| |
| // Run the web server. |
| func runServer(ctx context.Context, serverURL string) { |
| loadTemplates() |
| r := mux.NewRouter() |
| r.HandleFunc("/td/{taskId}", taskDriverHandler) |
| r.PathPrefix("/dist/").Handler(http.StripPrefix("/dist/", http.HandlerFunc(httputils.MakeResourceHandler(*resourcesDir)))) |
| r.HandleFunc("/oauth2callback/", login.OAuth2CallbackHandler) |
| r.HandleFunc("/logout/", login.LogoutHandler) |
| r.HandleFunc("/loginstatus/", login.StatusHandler) |
| handlers.AddTaskDriverHandlers(r, d, lm) |
| h := httputils.LoggingGzipRequestResponse(r) |
| h = httputils.XFrameOptionsDeny(h) |
| if !*local { |
| h = httputils.HealthzAndHTTPS(h) |
| } |
| http.Handle("/", h) |
| sklog.Infof("Ready to serve on %s", serverURL) |
| sklog.Fatal(http.ListenAndServe(*port, nil)) |
| } |
| |
| // handleMessage decodes and inserts an update |
| func handleMessage(msg *pubsub.Message) error { |
| var e logs.Entry |
| if err := json.Unmarshal(msg.Data, &e); err != nil { |
| // If the message has badly-formatted data, |
| // we'll never be able to parse it, so go ahead |
| // and ack it to get it out of the queue. |
| msg.Ack() |
| return err |
| } |
| if e.JsonPayload != nil { |
| if err := e.JsonPayload.Validate(); err != nil { |
| // If the message has badly-formatted data, |
| // we'll never be able to use it, so go ahead |
| // and ack it to get it out of the queue. |
| msg.Ack() |
| return err |
| } |
| if err := d.UpdateTaskDriver(e.JsonPayload.TaskId, e.JsonPayload); err != nil { |
| // This may be a transient error, so nack the message and hope |
| // that we'll be able to handle it on redelivery. |
| msg.Nack() |
| return fmt.Errorf("Failed to insert task driver update: %s", err) |
| } |
| } else if e.TextPayload != "" { |
| if err := lm.Insert(&e); err != nil { |
| // This may be a transient error, so nack the message and hope |
| // that we'll be able to handle it on redelivery. |
| msg.Nack() |
| return fmt.Errorf("Failed to insert log entry: %s", err) |
| } |
| } else { |
| msg.Ack() |
| return fmt.Errorf("Message has no payload: %+v", msg) |
| } |
| msg.Ack() |
| |
| return nil |
| } |
| |
| func main() { |
| common.InitWithMust( |
| "task-driver-server", |
| common.PrometheusOpt(promPort), |
| common.MetricsLoggingOpt(), |
| ) |
| defer common.Defer() |
| if *project == "" { |
| sklog.Fatal("--project_id is required.") |
| } |
| if *hang { |
| select {} |
| } |
| |
| // Setup pubsub. |
| ctx := context.Background() |
| client, err := pubsub.NewClient(ctx, *project) |
| if err != nil { |
| sklog.Fatal(err) |
| } |
| topic := client.Topic(td.PubsubTopicLogs) |
| if exists, err := topic.Exists(ctx); err != nil { |
| sklog.Fatal(err) |
| } else if !exists { |
| topic, err = client.CreateTopic(ctx, td.PubsubTopicLogs) |
| if err != nil { |
| sklog.Fatal(err) |
| } |
| } |
| sub := client.Subscription(subscriptionName) |
| if exists, err := sub.Exists(ctx); err != nil { |
| sklog.Fatal(err) |
| } else if !exists { |
| sub, err = client.CreateSubscription(ctx, subscriptionName, pubsub.SubscriptionConfig{ |
| Topic: topic, |
| AckDeadline: 10 * time.Second, |
| }) |
| if err != nil { |
| sklog.Fatal(err) |
| } |
| } |
| |
| // Create the TaskDriver DB. |
| ts, err := auth.NewDefaultTokenSource(*local, bigtable.Scope) |
| if err != nil { |
| sklog.Fatal(err) |
| } |
| d, err = bigtable_db.NewBigTableDB(ctx, *btProject, *btInstance, ts) |
| if err != nil { |
| sklog.Fatal(err) |
| } |
| lm, err = logs.NewLogsManager(ctx, *btProject, *btInstance, ts) |
| if err != nil { |
| sklog.Fatal(err) |
| } |
| |
| // Launch a goroutine to listen for pubsub messages. |
| go func() { |
| sklog.Infof("Waiting for messages.") |
| for { |
| if err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) { |
| if err := handleMessage(msg); err != nil { |
| sklog.Errorf("Failed to handle pubsub message: %s", err) |
| } |
| }); err != nil { |
| sklog.Fatal(err) |
| } |
| } |
| }() |
| |
| // Run the web server. |
| serverURL := "https://" + *host |
| if *local { |
| serverURL = "http://" + *host + *port |
| } |
| runServer(ctx, serverURL) |
| } |