blob: 5746ba0a92cf502578d7497918751a9c551cbe64 [file] [log] [blame]
// push is the web server for pushing debian packages.
package main
import (
compute ""
storage ""
const (
CHAT_MSG = `%s pushed %s to %s`
// flags
var (
bucketName = flag.String("bucket_name", "skia-push", "The name of the Google Storage bucket that contains push packages and info.")
configFilename = flag.String("config_filename", "skiapush.json5", "Config filename.")
local = flag.Bool("local", false, "Running locally if true. As opposed to in production.")
port = flag.String("port", ":8000", "HTTP service address (e.g., ':8000')")
project = flag.String("project", "", "The Google Compute Engine project.")
promPort = flag.String("prom_port", ":20000", "Metrics service address (e.g., ':10110')")
resourcesDir = flag.String("resources_dir", "", "The directory to find templates, JS, and CSS files. If blank the current directory will be used.")
// NewTimeoutClient creates a new http.Client with both a dial timeout and a
// request timeout.
func NewFastTimeoutClient() *http.Client {
return httputils.NewConfiguredTimeoutClient(10*time.Second, 2*time.Second)
// Server is the state of the application.
type Server struct {
// config is the configuration of what servers and apps we are managing.
config packages.PackageConfig
// ip keeps an updated map from server name to public IP address.
ip *Zones
// serverNames is a list of server names (GCE DNS names) we are managing.
// Extracted from 'config'.
serverNames []string
// client is an HTTP client authorized to read and write gs://skia-push.
client *http.Client
// fastClient is an HTTP client that is unauthorized and fails quickly.
fastClient *http.Client
// store is an Google Storage API client authorized to read and write gs://skia-push.
store *storage.Service
// comp is an Google Compute API client authorized to read compute information.
comp *compute.Service
// packageInfo is a cache of info about packages.
packageInfo *packages.AllInfo
// mutex protects currentStatus
mutex sync.Mutex
// The current status of all the units.
currentStatus map[string]*systemd.UnitStatus
// newServer creates a new *Server object.
func newServer() *Server {
if *resourcesDir == "" {
_, filename, _, _ := runtime.Caller(0)
*resourcesDir = filepath.Join(filepath.Dir(filename), "../../dist")
var err error
config, err := packages.LoadPackageConfig(*configFilename)
if err != nil {
sklog.Fatalf("Failed to load PackageConfig file: %s", err)
serverNames := config.AllServerNames()
ts, err := auth.NewDefaultTokenSource(*local, auth.SCOPE_FULL_CONTROL, auth.SCOPE_GCE)
if err != nil {
sklog.Fatalf("Failed to create authenticated HTTP client token source: %s", err)
client := httputils.DefaultClientConfig().WithTokenSource(ts).With2xxOnly().Client()
fastClient := NewFastTimeoutClient()
store, err := storage.New(client)
if err != nil {
sklog.Fatalf("Failed to create storage service client: %s", err)
comp, err := compute.New(client)
if err != nil {
sklog.Fatalf("Failed to create compute service client: %s", err)
ip, err := NewZones(comp)
if err != nil {
sklog.Fatalf("Failed to load IP addresses at startup: %s", err)
packageInfo, err := packages.NewAllInfo(client, store, serverNames)
if err != nil {
sklog.Fatalf("Failed to create packages.AllInfo at startup: %s", err)
return &Server{
config: config,
ip: ip,
serverNames: serverNames,
client: client,
fastClient: fastClient,
store: store,
comp: comp,
packageInfo: packageInfo,
currentStatus: map[string]*systemd.UnitStatus{},
// Zones keeps track of the zone of each server.
type Zones struct {
zone map[string]string
comp *compute.Service
mutex sync.Mutex
func (i *Zones) load() error {
zoneMap := map[string]string{}
zones, err := i.comp.Zones.List(*project).Do()
if err != nil {
return fmt.Errorf("Failed to list zones: %s", err)
for _, zone := range zones.Items {
sklog.Infof("Zone: %s", zone.Name)
list, err := i.comp.Instances.List(*project, zone.Name).Do()
if err != nil {
return fmt.Errorf("Failed to list instances: %s", err)
for _, item := range list.Items {
zoneMap[item.Name] = zone.Name
defer i.mutex.Unlock() = zoneMap
return nil
func (i *Zones) Zone(server string) string {
func NewZones(comp *compute.Service) (*Zones, error) {
i := &Zones{
comp: comp,
if err := i.load(); err != nil {
return nil, err
go func() {
for range time.Tick(time.Second * 60) {
if err := i.load(); err != nil {
sklog.Infof("Error refreshing IP address list: %s", err)
return i, nil
// ServerUI is used in ServersUI.
type ServerUI struct {
// Name is the name of the server.
Name string
// Installed is a list of package names.
Installed []string
// ServersUI is the format for data sent to the UI as JSON.
// It is a list of ServerUI's.
type ServersUI []*ServerUI
// PushNewPackage is the form of the JSON requests we receive
// from the UI to push a package.
type PushNewPackage struct {
// Name is the unique package id, such as 'pull/pull:jcgregori....'.
Name string `json:"name"`
// Server is the GCE name of the server.
Server string `json:"server"`
// getStatus returns a populated []*systemd.UnitStatus for the given server, one for each
// push managed service, and nil if the information wasn't able to be retrieved.
func (s *Server) getStatus(server string) []*systemd.UnitStatus {
resp, err := s.fastClient.Get(fmt.Sprintf("http://%s:10000/_/list", server))
if err != nil {
sklog.Infof("Failed to get status of: %s", server)
return nil
defer util.Close(resp.Body)
if resp.StatusCode != 200 {
sklog.Infof("Bad status code: %d %s", resp.StatusCode, server)
return nil
dec := json.NewDecoder(resp.Body)
var ret types.ListResponse
if err := dec.Decode(&ret); err != nil {
sklog.Infof("Failed to decode: %s", err)
return nil
return ret.Units
// serviceStatus returns a map[string]*systemd.UnitStatus, with one entry for each service running on each
// server. The keys for the return value are "<server_name>:<service_name>", for example,
// "skia-push:logserverd.service".
func (s *Server) serviceStatus(servers ServersUI) map[string]*systemd.UnitStatus {
var mutex sync.Mutex
var wg sync.WaitGroup
ret := map[string]*systemd.UnitStatus{}
for _, server := range servers {
go func(server string) {
defer wg.Done()
allServices := s.getStatus(server)
defer mutex.Unlock()
for _, status := range allServices {
if status.Status != nil {
ret[server+":"+status.Status.Name] = status
return ret
// appNames returns a list of application names from a list of packages.
// For example:
// appNames(["pull/pull:jcgregorio...", "push/push:someone@..."]
// will return
// ["pull", "push"]
func appNames(installed []string) []string {
ret := make([]string, len(installed))
for i, s := range installed {
ret[i] = strings.Split(s, "/")[0]
return ret
// AllUI contains all the information we know about the system.
type AllUI struct {
Servers ServersUI `json:"servers"`
Packages map[string][]*packages.Package `json:"packages"`
Status map[string]*systemd.UnitStatus `json:"status"`
// stateHandler handles the GET of the JSON.
func (s *Server) stateHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
if r.FormValue("refresh") == "true" {
if err := s.packageInfo.ForceRefresh(); err != nil {
httputils.ReportError(w, r, err, "Failed to refresh.")
allAvailable := s.packageInfo.AllAvailable()
allInstalled := s.packageInfo.AllInstalled()
// Update allInstalled to add in missing applications.
// Loop over 'config' and make sure each server and application is
// represented, adding in "appName/" placeholders as package names where
// appropriate. This is to bootstrap the case where an app is configured to
// be available for a server, but no package for that application has been
// installed yet.
serversSeen := map[string]bool{}
for name, installed := range allInstalled {
installedNames := appNames(installed.Names)
for _, expected := range s.config.Servers[name].AppNames {
if !util.In(expected, installedNames) {
installed.Names = append(installed.Names, expected+"/")
allInstalled[name] = installed
serversSeen[name] = true
// Now loop over config.Servers and find servers that don't have
// any installed applications. Add them to allInstalled.
for name, expected := range s.config.Servers {
if _, ok := serversSeen[name]; ok {
installed := []string{}
for _, appName := range expected.AppNames {
installed = append(installed, appName+"/")
allInstalled[name].Names = installed
if r.Method == "POST" {
if !login.IsAdmin(r) {
httputils.ReportError(w, r, nil, "You must be logged on as an admin to push.")
push := PushNewPackage{}
dec := json.NewDecoder(r.Body)
defer util.Close(r.Body)
if err := dec.Decode(&push); err != nil {
httputils.ReportError(w, r, fmt.Errorf("Failed to decode push request"), "Failed to decode push request")
if installedPackages, ok := allInstalled[push.Server]; !ok {
httputils.ReportError(w, r, fmt.Errorf("Unknown server name"), "Unknown server name")
} else {
// Find a string starting with the same appname, replace it with
// push.Name. Leave all other package names unchanged.
appName := strings.Split(push.Name, "/")[0]
newInstalled := []string{}
for _, oldName := range installedPackages.Names {
goodName := oldName
if strings.Split(oldName, "/")[0] == appName {
goodName = push.Name
newInstalled = append(newInstalled, goodName)
sklog.Infof("Updating %s with %#v giving %#v", push.Server, push.Name, newInstalled)
if err := s.packageInfo.PutInstalled(push.Server, newInstalled, installedPackages.Generation); err != nil {
httputils.ReportError(w, r, err, "Failed to update server.")
body := fmt.Sprintf(CHAT_MSG, login.LoggedInAs(r), appName, push.Server)
if err := chatbot.Send(body, "push", ""); err != nil {
sklog.Warningf("Failed to send chat notification: %s", err)
if err := trigger.ByMetadata(s.comp, *project, push.Name, push.Server, s.ip.Zone(push.Server)); err != nil {
sklog.Warningf("Could not trigger package load via metadata: %s", err)
allInstalled[push.Server].Names = newInstalled
// The response to either a GET or a POST is an up to date ServersUI.
servers := serversFromAllInstalled(allInstalled)
enc := json.NewEncoder(w)
err := enc.Encode(AllUI{
Servers: servers,
Packages: allAvailable,
Status: s.serviceStatus(servers),
if err != nil {
sklog.Errorf("Failed to write or encode output: %s", err)
func serversFromAllInstalled(allInstalled map[string]*packages.Installed) ServersUI {
servers := ServersUI{}
names := make([]string, 0, len(allInstalled))
for name := range allInstalled {
names = append(names, name)
for _, name := range names {
servers = append(servers, &ServerUI{
Name: name,
Installed: allInstalled[name].Names,
return servers
// statusHandler handles the GET of the JSON for each service's status.
func (s *Server) statusHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
enc := json.NewEncoder(w)
err := enc.Encode(s.getCurrentStatus())
if err != nil {
sklog.Errorf("Failed to write or encode output: %s", err)
func (s *Server) getCurrentStatus() map[string]*systemd.UnitStatus {
defer s.mutex.Unlock()
return s.currentStatus
func (s *Server) stepStatus() {
servers := serversFromAllInstalled(s.packageInfo.AllInstalled())
updatedStatus := s.serviceStatus(servers)
defer s.mutex.Unlock()
s.currentStatus = updatedStatus
func (s *Server) startStatusUpdate() {
for range time.Tick(5 * time.Second) {
// changeHandler handles actions on individual services.
// The actions are forwarded off to the pulld service
// running on the machine hosting that service.
func (s *Server) changeHandler(w http.ResponseWriter, r *http.Request) {
if !login.IsAdmin(r) {
httputils.ReportError(w, r, nil, "You must be logged on as an admin to push.")
if err := r.ParseForm(); err != nil {
httputils.ReportError(w, r, err, "Failed to parse form.")
action := r.Form.Get("action")
name := r.Form.Get("name")
machine := r.Form.Get("machine")
if action == "start" && name == "" {
body := fmt.Sprintf("%s rebooted %s", login.LoggedInAs(r), machine)
if err := chatbot.Send(body, "push", ""); err != nil {
sklog.Warningf("Failed to send chat notification: %s", err)
url := fmt.Sprintf("http://%s:10000/_/change?name=%s&action=%s", machine, name, action)
resp, err := s.fastClient.Post(url, "", nil)
if err != nil {
httputils.ReportError(w, r, err, fmt.Sprintf("Failed to reach %s: %v %s", machine, resp, err))
defer util.Close(resp.Body)
if resp.StatusCode != 200 {
httputils.ReportError(w, r, err, fmt.Sprintf("Failed to reach %s: %v %s", machine, resp, err))
w.Header().Set("Content-Type", "application/json")
if _, err := io.Copy(w, resp.Body); err != nil {
sklog.Errorf("Failed to copy JSON error out: %s", err)
func makeResourceHandler() func(http.ResponseWriter, *http.Request) {
fileServer := http.FileServer(http.Dir(*resourcesDir))
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Cache-Control", "max-age=300")
fileServer.ServeHTTP(w, r)
// oneStep does a single step of startDirtyMonitoring().
func (s *Server) oneStep() {
count := int64(0)
allInstalled := s.packageInfo.AllInstalled()
allAvailable := s.packageInfo.AllAvailable()
for serverName, installed := range allInstalled {
// Don't warn about dirty packages on staging instances.
if strings.HasSuffix(serverName, "-stage") {
sklog.Infof("Skipping %s", serverName)
for _, app := range installed.Names {
// app is the full versioned name of the installed app, we can find just
// the package name of the app by splitting off the stuff before the
// first "/".
parts := strings.Split(app, "/")
packageName := parts[0]
for _, version := range allAvailable[packageName] {
if version.Name == app {
if version.Dirty {
sklog.Infof("Finished oneStep: Found %d dirty packages running.", count)
metrics2.GetInt64Metric("dirty_packages", nil).Update(count)
// startDirtyMonitoring periodically checks the number of dirty packages being
// used in prod and reports that number to metrics.
// This function doesn't return and should be launched as a Go routine.
func (s *Server) startDirtyMonitoring() {
for range time.Tick(time.Minute) {
func main() {
if !*local {
login.SimpleInitMust(*port, *local)
s := newServer()
go s.startDirtyMonitoring()
go s.startStatusUpdate()
r := mux.NewRouter()
r.HandleFunc("/_/change", s.changeHandler)
r.HandleFunc("/_/state", s.stateHandler)
r.HandleFunc("/_/status", s.statusHandler)
if !*local {
r.HandleFunc("/loginstatus/", login.StatusHandler)
r.HandleFunc("/logout/", login.LogoutHandler)
r.HandleFunc("/oauth2callback/", login.OAuth2CallbackHandler)
http.Handle("/", httputils.LoggingGzipRequestResponse(r))
sklog.Infoln("Ready to serve.")
sklog.Fatal(http.ListenAndServe(*port, nil))