| package swarming |
| |
| import ( |
| "context" |
| "encoding/json" |
| "fmt" |
| "net/http" |
| "strings" |
| "time" |
| |
| "cloud.google.com/go/pubsub" |
| "github.com/gorilla/mux" |
| "go.skia.org/infra/go/common" |
| "go.skia.org/infra/go/httputils" |
| ) |
| |
| const ( |
| PUBSUB_FULLY_QUALIFIED_TOPIC_TMPL = "projects/%s/topics/%s" |
| PUBSUB_TOPIC_SWARMING_TASKS = "swarming-tasks" |
| PUBSUB_TOPIC_SWARMING_TASKS_INTERNAL = "swarming-tasks-internal" |
| PUSH_URL_SWARMING_TASKS = "pubsub/swarming-tasks" |
| ) |
| |
| // InitPubSub ensures that the pub/sub topics and subscriptions needed by the |
| // TaskScheduler exist. |
| func InitPubSub(serverUrl, topicName, subscriberName string) error { |
| ctx := context.Background() |
| |
| // Create a client. |
| client, err := pubsub.NewClient(ctx, common.PROJECT_ID) |
| if err != nil { |
| return err |
| } |
| |
| // Create topic and subscription if necessary. |
| |
| // Topic. |
| topic := client.Topic(topicName) |
| exists, err := topic.Exists(ctx) |
| if err != nil { |
| return err |
| } |
| if !exists { |
| if _, err := client.CreateTopic(ctx, topicName); err != nil { |
| return err |
| } |
| } |
| |
| // Subscription. |
| subName := fmt.Sprintf("%s+%s", subscriberName, topicName) |
| sub := client.Subscription(subName) |
| exists, err = sub.Exists(ctx) |
| if err != nil { |
| return err |
| } |
| if !exists { |
| endpoint := serverUrl |
| if !strings.HasSuffix(endpoint, "/") { |
| endpoint += "/" |
| } |
| endpoint += PUSH_URL_SWARMING_TASKS |
| if _, err := client.CreateSubscription(ctx, subName, pubsub.SubscriptionConfig{ |
| Topic: topic, |
| AckDeadline: 3 * time.Minute, |
| PushConfig: pubsub.PushConfig{ |
| Endpoint: endpoint, |
| }, |
| }); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| // PubSubRequest is the format of pub/sub HTTP request body. |
| type PubSubRequest struct { |
| Message pubsub.Message `json:"message"` |
| Subscription string `json:"subscription"` |
| } |
| |
| // PubSubTaskMessage is a message received from Swarming via pub/sub about a |
| // Task. |
| type PubSubTaskMessage struct { |
| SwarmingTaskId string `json:"task_id"` |
| UserData string `json:"userdata"` |
| } |
| |
| // PubSubHandler is an interface used for handling pub/sub messages. |
| type PubSubHandler interface { |
| HandleSwarmingPubSub(*PubSubTaskMessage) bool |
| } |
| |
| // RegisterPubSubServer adds handler to r that handle pub/sub push |
| // notifications. |
| func RegisterPubSubServer(h PubSubHandler, r *mux.Router) { |
| r.HandleFunc("/"+PUSH_URL_SWARMING_TASKS, func(w http.ResponseWriter, r *http.Request) { |
| var req PubSubRequest |
| if err := json.NewDecoder(r.Body).Decode(&req); err != nil { |
| httputils.ReportError(w, r, err, "Failed to decode request body.") |
| return |
| } |
| |
| var t PubSubTaskMessage |
| if err := json.Unmarshal(req.Message.Data, &t); err != nil { |
| httputils.ReportError(w, r, err, "Failed to decode PubSubTaskMessage.") |
| return |
| } |
| |
| ack := h.HandleSwarmingPubSub(&t) |
| if ack { |
| w.WriteHeader(http.StatusOK) |
| } else { |
| w.WriteHeader(http.StatusServiceUnavailable) |
| } |
| }).Methods(http.MethodPost) |
| } |