package swarming

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/gorilla/mux"
	"go.skia.org/infra/go/common"
	"go.skia.org/infra/go/httputils"
)

// Pub/sub names and paths used for Swarming task notifications.
//
// PUBSUB_FULLY_QUALIFIED_TOPIC_TMPL is a format string with two %s verbs,
// filled into "projects/%s/topics/%s" in that order (presumably the project ID
// followed by the topic name; confirm against callers).
//
// PUBSUB_TOPIC_SWARMING_TASKS and PUBSUB_TOPIC_SWARMING_TASKS_INTERNAL are
// topic names.
//
// PUSH_URL_SWARMING_TASKS is the URL path, without a leading slash, that
// InitPubSub appends to the server URL to form the push endpoint and under
// which RegisterPubSubServer registers its handler.
const (
	PUBSUB_FULLY_QUALIFIED_TOPIC_TMPL    = "projects/%s/topics/%s"
	PUBSUB_TOPIC_SWARMING_TASKS          = "swarming-tasks"
	PUBSUB_TOPIC_SWARMING_TASKS_INTERNAL = "swarming-tasks-internal"
	PUSH_URL_SWARMING_TASKS              = "pubsub/swarming-tasks"
)

// InitPubSub ensures that the pub/sub topic and push subscription needed by
// the TaskScheduler exist, creating whichever of them is missing.
//
// The topic named topicName is looked up, and created if absent, using a
// client for common.PROJECT_ID. The subscription is named
// "<subscriberName>+<topicName>". When it has to be created it is attached to
// that topic with a three-minute ack deadline and a push endpoint of serverUrl
// (with a "/" appended if it does not already end in one) followed by
// PUSH_URL_SWARMING_TASKS. A subscription which already exists is left
// untouched, so its push endpoint is not updated if serverUrl has changed.
//
// A non-nil error is returned if the client cannot be created or if any of the
// existence checks or creation calls fails; the error wraps the underlying
// cause and names the step that failed.
func InitPubSub(serverUrl, topicName, subscriberName string) error {
	ctx := context.Background()

	// Create a client.
	client, err := pubsub.NewClient(ctx, common.PROJECT_ID)
	if err != nil {
		return fmt.Errorf("creating pubsub client: %w", err)
	}
	// The client is only used for this one-off setup and none of its handles
	// escape this function, so release it before returning.
	defer func() {
		// Setup has already succeeded or failed by the time this runs; an
		// error while tearing down the client is not actionable by the caller.
		_ = client.Close()
	}()

	// Topic.
	topic := client.Topic(topicName)
	exists, err := topic.Exists(ctx)
	if err != nil {
		return fmt.Errorf("checking for topic %q: %w", topicName, err)
	}
	if !exists {
		if _, err := client.CreateTopic(ctx, topicName); err != nil {
			return fmt.Errorf("creating topic %q: %w", topicName, err)
		}
	}

	// Subscription.
	subName := fmt.Sprintf("%s+%s", subscriberName, topicName)
	sub := client.Subscription(subName)
	exists, err = sub.Exists(ctx)
	if err != nil {
		return fmt.Errorf("checking for subscription %q: %w", subName, err)
	}
	if exists {
		return nil
	}

	// Build the push endpoint, making sure exactly one "/" is inserted when
	// serverUrl has no trailing slash.
	endpoint := serverUrl
	if !strings.HasSuffix(endpoint, "/") {
		endpoint += "/"
	}
	endpoint += PUSH_URL_SWARMING_TASKS

	if _, err := client.CreateSubscription(ctx, subName, pubsub.SubscriptionConfig{
		Topic:       topic,
		AckDeadline: 3 * time.Minute,
		PushConfig: pubsub.PushConfig{
			Endpoint: endpoint,
		},
	}); err != nil {
		return fmt.Errorf("creating subscription %q: %w", subName, err)
	}
	return nil
}

// PubSubRequest is the format of pub/sub HTTP request body.
//
// Message is decoded from the "message" key; RegisterPubSubServer reads its
// Data field as a JSON-encoded PubSubTaskMessage. Subscription is decoded from
// the "subscription" key (presumably the name of the subscription which
// delivered the message; nothing in this file reads it).
type PubSubRequest struct {
	Message      pubsub.Message `json:"message"`
	Subscription string         `json:"subscription"`
}

// PubSubTaskMessage is a message received from Swarming via pub/sub about a
// Task.
//
// SwarmingTaskId is decoded from the "task_id" JSON key and UserData from the
// "userdata" JSON key. UserData is kept as an uninterpreted string here; its
// contents are presumably defined by whoever triggered the task.
type PubSubTaskMessage struct {
	SwarmingTaskId string `json:"task_id"`
	UserData       string `json:"userdata"`
}

// PubSubHandler is an interface used for handling pub/sub messages.
type PubSubHandler interface {
	// HandleSwarmingPubSub is called with each successfully decoded task
	// message. The handler registered by RegisterPubSubServer responds with
	// HTTP 200 when it returns true and HTTP 503 when it returns false, so
	// true should be returned only once the message needs no redelivery.
	HandleSwarmingPubSub(*PubSubTaskMessage) bool
}

// RegisterPubSubServer registers a POST-only route on r, at
// "/"+PUSH_URL_SWARMING_TASKS, which receives pub/sub push notifications and
// forwards them to h.
//
// For each request the body is decoded as a PubSubRequest and the message's
// Data is decoded as a JSON PubSubTaskMessage. A failure in either step is
// reported through httputils.ReportError and h is not called. Otherwise the
// task message is passed to h.HandleSwarmingPubSub and the response status is
// 200 if it returns true, or 503 if it returns false.
func RegisterPubSubServer(h PubSubHandler, r *mux.Router) {
	handlePush := func(w http.ResponseWriter, req *http.Request) {
		// Outer envelope: the push request wrapping the pub/sub message.
		var push PubSubRequest
		if err := json.NewDecoder(req.Body).Decode(&push); err != nil {
			httputils.ReportError(w, req, err, "Failed to decode request body.")
			return
		}

		// Inner payload: the task notification carried in the message data.
		var taskMsg PubSubTaskMessage
		if err := json.Unmarshal(push.Message.Data, &taskMsg); err != nil {
			httputils.ReportError(w, req, err, "Failed to decode PubSubTaskMessage.")
			return
		}

		// Default to "unavailable" and only report success when the handler
		// says the message has been dealt with.
		status := http.StatusServiceUnavailable
		if h.HandleSwarmingPubSub(&taskMsg) {
			status = http.StatusOK
		}
		w.WriteHeader(status)
	}

	r.HandleFunc("/"+PUSH_URL_SWARMING_TASKS, handlePush).Methods(http.MethodPost)
}
