// push is the web server for pushing debian packages.
package main

import (
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"path/filepath"
	"runtime"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/gorilla/mux"
	"go.skia.org/infra/go/allowed"
	"go.skia.org/infra/go/auth"
	"go.skia.org/infra/go/chatbot"
	"go.skia.org/infra/go/common"
	"go.skia.org/infra/go/httputils"
	"go.skia.org/infra/go/login"
	"go.skia.org/infra/go/metrics2"
	"go.skia.org/infra/go/packages"
	"go.skia.org/infra/go/sklog"
	"go.skia.org/infra/go/systemd"
	"go.skia.org/infra/go/util"
	"go.skia.org/infra/push/go/trigger"
	"go.skia.org/infra/push/go/types"
	compute "google.golang.org/api/compute/v1"
	storage "google.golang.org/api/storage/v1"
)

const (
	// CHAT_MSG is the format string for the chat notification sent after a
	// package push. Its arguments are, in order: the logged in user, the
	// application name, and the name of the server that was pushed to.
	CHAT_MSG = `%s pushed %s to %s`
)

// Command line flags. Each variable is a pointer that is populated when the
// flags are parsed.
var (
	bucketName     = flag.String("bucket_name", "skia-push", "The name of the Google Storage bucket that contains push packages and info.")
	configFilename = flag.String("config_filename", "skiapush.json5", "Config filename.")
	local          = flag.Bool("local", false, "Running locally if true. As opposed to in production.")
	port           = flag.String("port", ":8000", "HTTP service address (e.g., ':8000')")
	project        = flag.String("project", "google.com:skia-buildbots", "The Google Compute Engine project.")
	promPort       = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':10110')")
	resourcesDir   = flag.String("resources_dir", "", "The directory to find templates, JS, and CSS files. If blank the current directory will be used.")
	logging        = flag.Bool("logging", true, "If true then log to stderr.")
)

// NewFastTimeoutClient creates a new http.Client with both a dial timeout and
// a request timeout, built by httputils.NewConfiguredTimeoutClient from
// durations of 10 seconds and 2 seconds.
//
// It is used for talking to the managed servers, where a slow or unreachable
// machine should fail quickly rather than stall the caller.
func NewFastTimeoutClient() *http.Client {
	return httputils.NewConfiguredTimeoutClient(10*time.Second, 2*time.Second)
}

// Server is the state of the application.
//
// A single instance is created by newServer() and shared by all HTTP handlers
// and background goroutines.
type Server struct {
	// config is the configuration of what servers and apps we are managing.
	config packages.PackageConfig

	// ip keeps an updated map from server name to the GCE zone the server is
	// in. Despite the field name it does not hold IP addresses.
	ip *Zones

	// serverNames is a list of server names (GCE DNS names) we are managing.
	// Extracted from 'config'.
	serverNames []string

	// client is an HTTP client authorized to read and write gs://skia-push.
	client *http.Client

	// fastClient is an HTTP client that is unauthorized and fails quickly.
	fastClient *http.Client

	// store is a Google Storage API client authorized to read and write gs://skia-push.
	store *storage.Service

	// comp is a Google Compute API client authorized to read compute information.
	comp *compute.Service

	// packageInfo is a cache of info about packages.
	packageInfo *packages.AllInfo

	// mutex protects currentStatus
	mutex sync.Mutex

	// The current status of all the units, keyed by
	// "<server_name>:<service_name>". Replaced wholesale by stepStatus().
	currentStatus map[string]*systemd.UnitStatus
}

// newServer builds the fully initialized *Server from the command line flags.
//
// It loads the package config, creates the authenticated HTTP, Storage and
// Compute clients, loads the zone map and the package info cache, and
// initializes the chatbot. Every failure along the way is fatal: the process
// exits via sklog.Fatalf rather than returning an error.
func newServer() *Server {
	// With no explicit resources directory, fall back to a path relative to
	// the location of this source file.
	if *resourcesDir == "" {
		_, thisFile, _, _ := runtime.Caller(0)
		*resourcesDir = filepath.Join(filepath.Dir(thisFile), "../../dist")
	}

	cfg, err := packages.LoadPackageConfig(*configFilename)
	if err != nil {
		sklog.Fatalf("Failed to load PackageConfig file: %s", err)
	}
	names := cfg.AllServerNames()

	tokenSource, err := auth.NewDefaultTokenSource(*local, auth.SCOPE_FULL_CONTROL, auth.SCOPE_GCE)
	if err != nil {
		sklog.Fatalf("Failed to create authenticated HTTP client token source: %s", err)
	}
	authedClient := httputils.DefaultClientConfig().WithTokenSource(tokenSource).With2xxOnly().Client()

	storageService, err := storage.New(authedClient)
	if err != nil {
		sklog.Fatalf("Failed to create storage service client: %s", err)
	}

	computeService, err := compute.New(authedClient)
	if err != nil {
		sklog.Fatalf("Failed to create compute service client: %s", err)
	}

	zones, err := NewZones(computeService)
	if err != nil {
		sklog.Fatalf("Failed to load IP addresses at startup: %s", err)
	}

	// The bucket name must be set before the package info is first loaded.
	packages.SetBucketName(*bucketName)
	info, err := packages.NewAllInfo(authedClient, storageService, names)
	if err != nil {
		sklog.Fatalf("Failed to create packages.AllInfo at startup: %s", err)
	}

	chatbot.Init("push.skia.org")

	srv := &Server{
		config:        cfg,
		ip:            zones,
		serverNames:   names,
		client:        authedClient,
		fastClient:    NewFastTimeoutClient(),
		store:         storageService,
		comp:          computeService,
		packageInfo:   info,
		currentStatus: map[string]*systemd.UnitStatus{},
	}
	return srv
}

// Zones keeps track of the zone of each server.
//
// The mapping is loaded from the Compute API by load() and periodically
// refreshed by the goroutine started in NewZones().
type Zones struct {
	// zone maps an instance name to the name of the zone that contains it.
	zone  map[string]string
	// comp is the Compute API client used to list zones and instances.
	comp  *compute.Service
	// mutex protects zone.
	mutex sync.Mutex
}

// load rebuilds the instance name to zone name mapping.
//
// It lists every zone in the project, then every instance in each zone, and
// only after all listings have succeeded swaps the new map in under the
// mutex. On error the previously loaded map is left in place and the error is
// returned.
func (i *Zones) load() error {
	zoneList, err := i.comp.Zones.List(*project).Do()
	if err != nil {
		return fmt.Errorf("Failed to list zones: %s", err)
	}

	fresh := map[string]string{}
	for _, z := range zoneList.Items {
		sklog.Infof("Zone: %s", z.Name)
		instances, err := i.comp.Instances.List(*project, z.Name).Do()
		if err != nil {
			return fmt.Errorf("Failed to list instances: %s", err)
		}
		for _, inst := range instances.Items {
			fresh[inst.Name] = z.Name
		}
	}

	i.mutex.Lock()
	i.zone = fresh
	i.mutex.Unlock()
	return nil
}

// Zone returns the name of the zone that the given server is in, or the empty
// string if the server is not known.
//
// The lookup is done while holding the mutex because load() replaces the
// underlying map from a background goroutine; reading it unguarded would be a
// data race.
func (i *Zones) Zone(server string) string {
	i.mutex.Lock()
	defer i.mutex.Unlock()
	return i.zone[server]
}

// NewZones creates a Zones that uses the given Compute API client.
//
// The zone mapping is loaded once synchronously, and an error from that first
// load is returned to the caller. After that a background goroutine reloads
// the mapping every 60 seconds for the lifetime of the process; refresh
// failures are only logged.
func NewZones(comp *compute.Service) (*Zones, error) {
	z := &Zones{comp: comp}
	if err := z.load(); err != nil {
		return nil, err
	}

	go func() {
		ticks := time.Tick(time.Minute)
		for {
			<-ticks
			if err := z.load(); err != nil {
				sklog.Infof("Error refreshing IP address list: %s", err)
			}
		}
	}()

	return z, nil
}

// ServerUI is used in ServersUI. It describes a single server and what is
// installed on it.
type ServerUI struct {
	// Name is the name of the server.
	Name string

	// Installed is a list of package names. An entry of the form "appName/"
	// is a placeholder for an application that is configured for the server
	// but has no package installed yet.
	Installed []string
}

// ServersUI is the format for data sent to the UI as JSON.
// It is a list of ServerUI's, which serversFromAllInstalled() produces sorted
// by server name.
type ServersUI []*ServerUI

// PushNewPackage is the form of the JSON requests we receive
// from the UI to push a package.
type PushNewPackage struct {
	// Name is the unique package id, such as 'pull/pull:jcgregori....'. The
	// text before the first "/" is treated as the application name.
	Name string `json:"name"`

	// Server is the GCE name of the server.
	Server string `json:"server"`
}

// getStatus asks the given server, on port 10000, for the list of units it is
// running and returns one *systemd.UnitStatus per push managed service.
//
// Any failure (unreachable server, non-200 response, or undecodable body) is
// logged and reported to the caller as a nil slice.
func (s *Server) getStatus(server string) []*systemd.UnitStatus {
	listURL := fmt.Sprintf("http://%s:10000/_/list", server)
	resp, err := s.fastClient.Get(listURL)
	if err != nil {
		sklog.Infof("Failed to get status of: %s", server)
		return nil
	}
	defer util.Close(resp.Body)

	if resp.StatusCode != 200 {
		sklog.Infof("Bad status code: %d %s", resp.StatusCode, server)
		return nil
	}

	var listing types.ListResponse
	if err := json.NewDecoder(resp.Body).Decode(&listing); err != nil {
		sklog.Infof("Failed to decode: %s", err)
		return nil
	}
	return listing.Units
}

// serviceStatus queries every server in 'servers' concurrently and collects
// the status of each service they report.
//
// The returned map has one entry per service per server, keyed by
// "<server_name>:<service_name>", for example "skia-push:logserverd.service".
// Units without a Status are skipped, and servers that could not be queried
// contribute no entries.
func (s *Server) serviceStatus(servers ServersUI) map[string]*systemd.UnitStatus {
	var (
		wg sync.WaitGroup
		mu sync.Mutex // Guards statuses.
	)
	statuses := map[string]*systemd.UnitStatus{}

	wg.Add(len(servers))
	for _, srv := range servers {
		go func(name string) {
			defer wg.Done()
			// Do the slow network call before taking the lock.
			units := s.getStatus(name)

			mu.Lock()
			defer mu.Unlock()
			for _, unit := range units {
				if unit.Status == nil {
					continue
				}
				statuses[name+":"+unit.Status.Name] = unit
			}
		}(srv.Name)
	}
	wg.Wait()

	return statuses
}

// appNames maps each package name to its application name, which is the text
// before the first "/". A name with no "/" is returned unchanged.
//
// For example:
//
//    appNames(["pull/pull:jcgregorio...", "push/push:someone@..."])
//
// will return
//
//    ["pull", "push"]
//
// The result always has the same length and order as the input.
func appNames(installed []string) []string {
	names := make([]string, 0, len(installed))
	for _, pkg := range installed {
		app := pkg
		if slash := strings.Index(pkg, "/"); slash >= 0 {
			app = pkg[:slash]
		}
		names = append(names, app)
	}
	return names
}

// AllUI contains all the information we know about the system. It is the JSON
// response body written by stateHandler.
type AllUI struct {
	// Servers lists each server and the packages installed on it.
	Servers  ServersUI                      `json:"servers"`
	// Packages is the available packages, as returned by
	// packageInfo.AllAvailable().
	Packages map[string][]*packages.Package `json:"packages"`
	// Status is the status of each service, keyed by
	// "<server_name>:<service_name>".
	Status   map[string]*systemd.UnitStatus `json:"status"`
}

// stateHandler serves the state of the whole system as JSON and, on a POST,
// first pushes a new package to a server.
//
// For every request it gathers the available and installed packages, passing
// "refresh=true" forces the package info to be refreshed first, and fills in
// "appName/" placeholders for applications that are configured but not yet
// installed.
//
// A POST must come from an admin and carry a JSON encoded PushNewPackage. The
// named package replaces the installed package of the same application on the
// named server, a chat notification is sent, and the server is triggered to
// load the package via metadata. Failures of the notification or the trigger
// are only logged.
//
// The response to either a GET or a POST is an AllUI. If an error is reported
// to the client the handler stops without writing the AllUI.
func (s *Server) stateHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")

	if r.FormValue("refresh") == "true" {
		if err := s.packageInfo.ForceRefresh(); err != nil {
			httputils.ReportError(w, r, err, "Failed to refresh.")
			// An error response has been written, so don't also append the
			// JSON state to it.
			return
		}
	}
	allAvailable := s.packageInfo.AllAvailable()
	allInstalled := s.packageInfo.AllInstalled()

	// Update allInstalled to add in missing applications.
	//
	// Loop over 'config' and make sure each server and application is
	// represented, adding in "appName/" placeholders as package names where
	// appropriate. This is to bootstrap the case where an app is configured to
	// be available for a server, but no package for that application has been
	// installed yet.
	serversSeen := map[string]bool{}
	for name, installed := range allInstalled {
		installedNames := appNames(installed.Names)
		for _, expected := range s.config.Servers[name].AppNames {
			if !util.In(expected, installedNames) {
				installed.Names = append(installed.Names, expected+"/")
			}
		}
		allInstalled[name] = installed
		serversSeen[name] = true
	}

	// Now loop over config.Servers and find servers that don't have
	// any installed applications. Add them to allInstalled.
	for name, expected := range s.config.Servers {
		if serversSeen[name] {
			continue
		}
		installed := make([]string, 0, len(expected.AppNames))
		for _, appName := range expected.AppNames {
			installed = append(installed, appName+"/")
		}
		// There is no entry for this server yet, so a new one has to be
		// created; assigning through allInstalled[name] would dereference a
		// nil pointer.
		allInstalled[name] = &packages.Installed{Names: installed}
	}

	if r.Method == "POST" {
		if !login.IsAdmin(r) {
			httputils.ReportError(w, r, nil, "You must be logged on as an admin to push.")
			return
		}
		defer util.Close(r.Body)
		push := PushNewPackage{}
		if err := json.NewDecoder(r.Body).Decode(&push); err != nil {
			// Keep the underlying cause in the reported error so it isn't lost.
			httputils.ReportError(w, r, fmt.Errorf("Failed to decode push request: %s", err), "Failed to decode push request")
			return
		}
		installedPackages, ok := allInstalled[push.Server]
		if !ok {
			httputils.ReportError(w, r, fmt.Errorf("Unknown server name"), "Unknown server name")
			return
		}

		// Find a string starting with the same appname, replace it with
		// push.Name. Leave all other package names unchanged.
		appName := strings.Split(push.Name, "/")[0]
		newInstalled := make([]string, 0, len(installedPackages.Names))
		for _, oldName := range installedPackages.Names {
			goodName := oldName
			if strings.Split(oldName, "/")[0] == appName {
				goodName = push.Name
			}
			newInstalled = append(newInstalled, goodName)
		}
		sklog.Infof("Updating %s with %#v giving %#v", push.Server, push.Name, newInstalled)
		if err := s.packageInfo.PutInstalled(push.Server, newInstalled, installedPackages.Generation); err != nil {
			httputils.ReportError(w, r, err, "Failed to update server.")
			return
		}

		// The push itself has succeeded at this point; the notification and
		// the trigger are best effort.
		body := fmt.Sprintf(CHAT_MSG, login.LoggedInAs(r), appName, push.Server)
		if err := chatbot.Send(body, "push", ""); err != nil {
			sklog.Warningf("Failed to send chat notification: %s", err)
		}
		if err := trigger.ByMetadata(s.comp, *project, push.Name, push.Server, s.ip.Zone(push.Server)); err != nil {
			sklog.Warningf("Could not trigger package load via metadata: %s", err)
		}
		allInstalled[push.Server].Names = newInstalled
	}

	// The response to either a GET or a POST is an up to date ServersUI.
	servers := serversFromAllInstalled(allInstalled)
	enc := json.NewEncoder(w)
	err := enc.Encode(AllUI{
		Servers:  servers,
		Packages: allAvailable,
		Status:   s.serviceStatus(servers),
	})
	if err != nil {
		sklog.Errorf("Failed to write or encode output: %s", err)
		return
	}
}

// serversFromAllInstalled converts the installed packages of each server into
// a ServersUI with one entry per server, sorted by server name so that the
// output is stable despite the random iteration order of maps.
//
// The result is never nil, even if allInstalled is empty.
func serversFromAllInstalled(allInstalled map[string]*packages.Installed) ServersUI {
	sortedNames := make([]string, 0, len(allInstalled))
	for serverName := range allInstalled {
		sortedNames = append(sortedNames, serverName)
	}
	sort.Strings(sortedNames)

	result := make(ServersUI, len(sortedNames))
	for idx, serverName := range sortedNames {
		result[idx] = &ServerUI{
			Name:      serverName,
			Installed: allInstalled[serverName].Names,
		}
	}
	return result
}

// statusHandler writes the most recently collected status of every service as
// JSON. The status comes from the cache kept up to date by
// startStatusUpdate(), so no servers are contacted while handling a request.
func (s *Server) statusHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(s.getCurrentStatus()); err != nil {
		sklog.Errorf("Failed to write or encode output: %s", err)
	}
}

// getCurrentStatus returns the cached status of all the units, reading the
// field while holding the mutex. The map itself is returned, not a copy.
func (s *Server) getCurrentStatus() map[string]*systemd.UnitStatus {
	s.mutex.Lock()
	current := s.currentStatus
	s.mutex.Unlock()
	return current
}

// stepStatus does a single step of startStatusUpdate(): it collects the status
// of the services on every server with installed packages and then replaces
// the cached status. The mutex is only held for the replacement, not while
// the servers are being queried.
func (s *Server) stepStatus() {
	fresh := s.serviceStatus(serversFromAllInstalled(s.packageInfo.AllInstalled()))

	s.mutex.Lock()
	s.currentStatus = fresh
	s.mutex.Unlock()
}

// startStatusUpdate refreshes the cached service status immediately and then
// again every 5 seconds.
//
// This function doesn't return and should be launched as a Go routine.
func (s *Server) startStatusUpdate() {
	s.stepStatus()
	ticks := time.Tick(5 * time.Second)
	for {
		<-ticks
		s.stepStatus()
	}
}

// changeHandler handles actions on individual services.
//
// The request must come from an admin and supplies three form values:
// 'machine', the server hosting the service, 'name', the name of the service,
// and 'action', the action to take. The action is forwarded off to the pulld
// service running on the machine hosting that service, and the JSON response
// from pulld is copied back to the client.
//
// Starting "reboot.target" additionally sends a chat notification before the
// action is forwarded.
func (s *Server) changeHandler(w http.ResponseWriter, r *http.Request) {
	if !login.IsAdmin(r) {
		httputils.ReportError(w, r, nil, "You must be logged on as an admin to push.")
		return
	}
	if err := r.ParseForm(); err != nil {
		httputils.ReportError(w, r, err, "Failed to parse form.")
		return
	}
	action := r.Form.Get("action")
	name := r.Form.Get("name")
	machine := r.Form.Get("machine")
	if action == "start" && name == "reboot.target" {
		body := fmt.Sprintf("%s rebooted %s", login.LoggedInAs(r), machine)
		if err := chatbot.Send(body, "push", ""); err != nil {
			sklog.Warningf("Failed to send chat notification: %s", err)
		}
	}

	// Escape the values that came from the request so that they can't add or
	// override query parameters of the forwarded request.
	// NOTE(review): 'machine' is used as the host verbatim; consider checking
	// it against the list of managed servers.
	target := fmt.Sprintf("http://%s:10000/_/change?name=%s&action=%s", machine, url.QueryEscape(name), url.QueryEscape(action))
	resp, err := s.fastClient.Post(target, "", nil)
	if err != nil {
		httputils.ReportError(w, r, err, fmt.Sprintf("Failed to reach %s: %v %s", machine, resp, err))
		return
	}
	defer util.Close(resp.Body)
	if resp.StatusCode != 200 {
		// err is nil on this path, so build an error that describes the
		// unexpected response instead of reporting a nil error.
		statusErr := fmt.Errorf("unexpected status %q from %s", resp.Status, machine)
		httputils.ReportError(w, r, statusErr, fmt.Sprintf("Failed to reach %s: %s", machine, resp.Status))
		return
	}
	w.Header().Set("Content-Type", "application/json")
	if _, err := io.Copy(w, resp.Body); err != nil {
		sklog.Errorf("Failed to copy JSON error out: %s", err)
	}
}

// makeResourceHandler returns a handler that serves the static files found
// under the resources directory, marking every response as cacheable for 300
// seconds.
//
// The resources directory flag is read once, when this function is called,
// not on each request.
func makeResourceHandler() func(http.ResponseWriter, *http.Request) {
	static := http.FileServer(http.Dir(*resourcesDir))
	serve := func(w http.ResponseWriter, r *http.Request) {
		w.Header().Add("Cache-Control", "max-age=300")
		static.ServeHTTP(w, r)
	}
	return serve
}

// oneStep does a single step of startDirtyMonitoring().
//
// It counts the installed packages, across all servers whose name doesn't end
// in "-stage", that match an available package version marked as dirty, and
// reports that count as the "dirty_packages" metric.
func (s *Server) oneStep() {
	installedByServer := s.packageInfo.AllInstalled()
	availableByPackage := s.packageInfo.AllAvailable()

	var dirty int64
	for server, installed := range installedByServer {
		// Don't warn about dirty packages on staging instances.
		if strings.HasSuffix(server, "-stage") {
			sklog.Infof("Skipping %s", server)
			continue
		}
		for _, app := range installed.Names {
			// app is the full versioned name of the installed app; the
			// package name is everything before the first "/".
			pkg := strings.Split(app, "/")[0]
			for _, candidate := range availableByPackage[pkg] {
				if candidate.Name == app && candidate.Dirty {
					// Count each installed app at most once.
					dirty++
					break
				}
			}
		}
	}

	sklog.Infof("Finished oneStep: Found %d dirty packages running.", dirty)
	metrics2.GetInt64Metric("dirty_packages", nil).Update(dirty)
}

// startDirtyMonitoring reports the number of dirty packages in use to metrics,
// once immediately and then once every minute.
//
// This function doesn't return and should be launched as a Go routine.
func (s *Server) startDirtyMonitoring() {
	s.oneStep()
	ticks := time.Tick(time.Minute)
	for {
		<-ticks
		s.oneStep()
	}
}

// main initializes logging and metrics, builds the Server, starts the
// background monitoring goroutines, registers the HTTP routes and then serves
// until the process dies.
//
// Login is only initialized, and the login related routes only registered,
// when not running locally.
func main() {
	common.InitWithMust("push", common.PrometheusOpt(promPort), common.SLogLoggingOpt(logging))

	production := !*local
	if production {
		login.SimpleInitWithAllow(*port, *local, allowed.Googlers(), allowed.Googlers(), allowed.Googlers())
	}

	s := newServer()
	go s.startDirtyMonitoring()
	go s.startStatusUpdate()

	router := mux.NewRouter()
	router.HandleFunc("/_/change", s.changeHandler)
	router.HandleFunc("/_/state", s.stateHandler)
	router.HandleFunc("/_/status", s.statusHandler)
	if production {
		router.HandleFunc("/loginstatus/", login.StatusHandler)
		router.HandleFunc("/logout/", login.LogoutHandler)
		router.HandleFunc("/oauth2callback/", login.OAuth2CallbackHandler)
	}
	// The catch-all for static resources has to be registered last so that
	// the more specific routes above are matched first.
	router.PathPrefix("/").HandlerFunc(httputils.MakeResourceHandler(*resourcesDir))

	wrapped := httputils.LoggingGzipRequestResponse(router)
	http.Handle("/", httputils.HealthzAndHTTPS(wrapped))

	sklog.Infoln("Ready to serve.")
	sklog.Fatal(http.ListenAndServe(*port, nil))
}
