blob: 37d731c25b02a769528c2d7b6bccf9c502c72db9 [file] [log] [blame]
package td
import (
"bytes"
"encoding/json"
"fmt"
"io"
"os"
"reflect"
"sync"
"cloud.google.com/go/logging"
"github.com/golang/glog"
"go.skia.org/infra/go/util"
)
// Severity indicates the importance of a LogStream, with greater values
// indicating greater severity. Valid values include Debug, Info, Warning, and
// Error.
type Severity int
const (
Debug = Severity(logging.Debug)
Info = Severity(logging.Info)
Warning = Severity(logging.Warning)
Error = Severity(logging.Error)
)
// asCloudLoggingSeverity returns the equivalent logging.Severity for s.
func (s Severity) asCloudLoggingSeverity() logging.Severity {
return logging.Severity(s)
}
// String returns the name of s.
func (s Severity) String() string {
return s.asCloudLoggingSeverity().String()
}
// Receiver is an interface used to implement arbitrary receivers of step
// metadata, as steps are run.
type Receiver interface {
// Handle the given message.
HandleMessage(*Message) error
LogStream(stepId string, logId string, severity Severity) (io.Writer, error)
Close() error
}
// MultiReceiver is a Receiver which multiplexes messages to multiple Receivers.
type MultiReceiver []Receiver
// See documentation for Receiver interface.
func (r MultiReceiver) HandleMessage(m *Message) error {
g := util.NewNamedErrGroup()
for _, rec := range r {
receiver := rec
name := fmt.Sprint(reflect.TypeOf(receiver))
g.Go(name, func() error {
return receiver.HandleMessage(m)
})
}
return g.Wait()
}
// See documentation for Receiver interface.
func (r MultiReceiver) LogStream(stepId, logId string, severity Severity) (io.Writer, error) {
writers := make([]io.Writer, 0, len(r))
for _, rec := range r {
w, err := rec.LogStream(stepId, logId, severity)
if err != nil {
return nil, err
}
writers = append(writers, w)
}
return util.MultiWriter(writers), nil
}
// See documentation for Receiver interface.
func (r MultiReceiver) Close() error {
g := util.NewNamedErrGroup()
for _, rec := range r {
receiver := rec
name := fmt.Sprint(reflect.TypeOf(receiver))
g.Go(name, func() error {
return receiver.Close()
})
}
return g.Wait()
}
// DebugReceiver just dumps the messages straight to the log (stdout/stderr, not
// to Cloud Logging).
type DebugReceiver struct{}
// See documentation for Receiver interface.
func (r *DebugReceiver) HandleMessage(m *Message) error {
switch m.Type {
case MSG_TYPE_RUN_STARTED:
glog.Infof("RUN_STARTED: %+v", m.Run)
case MSG_TYPE_STEP_STARTED:
glog.Infof("STEP_STARTED: %s", m.StepId)
case MSG_TYPE_STEP_FINISHED:
glog.Infof("STEP_FINISHED: %s", m.StepId)
case MSG_TYPE_STEP_EXCEPTION:
glog.Infof("STEP_EXCEPTION: %s", m.StepId)
case MSG_TYPE_STEP_FAILED:
glog.Infof("STEP_FAILED: %s", m.StepId)
case MSG_TYPE_STEP_DATA:
b, err := json.MarshalIndent(m.Data, "", " ")
if err != nil {
return err
}
glog.Infof("STEP_DATA: %s: %s", m.StepId, string(b))
default:
return fmt.Errorf("Invalid message type %s", m.Type)
}
return nil
}
// See documentation for Receiver interface.
func (r *DebugReceiver) LogStream(stepId, logId string, severity Severity) (io.Writer, error) {
if severity >= Warning {
return os.Stderr, nil
}
return os.Stdout, nil
}
// See documentation for Receiver interface.
func (r *DebugReceiver) Close() error {
glog.Info("Run finished.")
return nil
}
// StepReport is a struct used to collect information about a given step.
type StepReport struct {
*StepProperties
Data []interface{} `json:"data,omitempty"`
Errors []string `json:"errors,omitempty"`
Exceptions []string `json:"exceptions,omitempty"`
Logs map[string]*bytes.Buffer
Result StepResult `json:"result,omitempty"`
Steps []*StepReport `json:"steps,omitempty"`
}
// ReportReceiver collects all messages and generates a report when requested.
type ReportReceiver struct {
mtx sync.Mutex
root *StepReport
output string
}
// newReportReceiver returns a ReportReceiver instance.
func newReportReceiver(output string) *ReportReceiver {
return &ReportReceiver{
output: output,
}
}
// Recurse through all steps, running the given function. If the function
// returns false, recursion stops.
func (s *StepReport) Recurse(fn func(*StepReport) bool) bool {
if keepGoing := fn(s); !keepGoing {
return false
}
for _, sub := range s.Steps {
if keepGoing := sub.Recurse(fn); !keepGoing {
return false
}
}
return true
}
// Find the step with the given ID in our list. This helps in case messages
// arrive out of order.
func (s *StepReport) findStep(id string) (*StepReport, error) {
var rv *StepReport
s.Recurse(func(s *StepReport) bool {
if s.Id == id {
rv = s
return false
}
return true
})
if rv != nil {
return rv, nil
}
return nil, fmt.Errorf("Unknown step ID %q", id)
}
// Find the step with the given ID in our list. This helps in case messages
// arrive out of order.
func (r *ReportReceiver) findStep(id string) (*StepReport, error) {
if r.root == nil {
return nil, fmt.Errorf("No steps!")
}
return r.root.findStep(id)
}
// See documentation for Receiver interface.
func (r *ReportReceiver) HandleMessage(m *Message) error {
r.mtx.Lock()
defer r.mtx.Unlock()
switch m.Type {
case MSG_TYPE_RUN_STARTED:
// Do nothing.
case MSG_TYPE_STEP_STARTED:
s := &StepReport{
StepProperties: m.Step,
Logs: map[string]*bytes.Buffer{},
}
if m.Step.Id == STEP_ID_ROOT {
r.root = s
} else {
parent, err := r.findStep(m.Step.Parent)
if err != nil {
return err
}
parent.Steps = append(parent.Steps, s)
}
case MSG_TYPE_STEP_FINISHED:
s, err := r.findStep(m.StepId)
if err != nil {
return err
}
if len(s.Errors) == 0 && len(s.Exceptions) == 0 {
s.Result = STEP_RESULT_SUCCESS
}
case MSG_TYPE_STEP_FAILED:
s, err := r.findStep(m.StepId)
if err != nil {
return err
}
s.Errors = append(s.Errors, m.Error)
s.Result = STEP_RESULT_FAILURE
case MSG_TYPE_STEP_EXCEPTION:
s, err := r.findStep(m.StepId)
if err != nil {
return err
}
s.Exceptions = append(s.Exceptions, m.Error)
s.Result = STEP_RESULT_EXCEPTION
case MSG_TYPE_STEP_DATA:
s, err := r.findStep(m.StepId)
if err != nil {
return err
}
s.Data = append(s.Data, m.Data)
}
return nil
}
// See documentation for Receiver interface.
func (r *ReportReceiver) Close() error {
r.mtx.Lock()
defer r.mtx.Unlock()
if r.output == "" {
return nil
}
// Visit each step, attaching the final log contents to each logData
// instance.
r.root.Recurse(func(s *StepReport) bool {
for _, data := range s.Data {
d, ok := data.(*LogData)
if ok {
if logBuf, ok := s.Logs[d.Id]; ok {
d.Log = logBuf.String()
}
}
}
return true
})
// Dump JSON to the given output.
b, err := json.MarshalIndent(r.root, "", " ")
if err != nil {
return err
}
// Write the report to the desired output.
if r.output == "-" {
_, err = os.Stdout.Write(b)
return err
}
return util.WithWriteFile(r.output, func(w io.Writer) error {
_, err := w.Write(b)
return err
})
}
// See documentation for Receiver interface.
func (r *ReportReceiver) LogStream(stepId, logId string, _ Severity) (io.Writer, error) {
r.mtx.Lock()
defer r.mtx.Unlock()
buf := bytes.NewBuffer([]byte{})
step, err := r.findStep(stepId)
if err != nil {
return nil, err
}
if _, ok := step.Logs[logId]; ok {
return nil, fmt.Errorf("Step %s already has a log with ID %s", stepId, logId)
}
step.Logs[logId] = buf
return buf, nil
}
// CloudLoggingReceiver is a Receiver which sends step metadata and logs to
// Cloud Logging.
type CloudLoggingReceiver struct {
// logger is a handle to the Logger used for the entire test run.
logger *logging.Logger
}
// Return a new CloudLoggingReceiver. This initializes Cloud Logging for the
// entire test run.
func NewCloudLoggingReceiver(logger *logging.Logger) (*CloudLoggingReceiver, error) {
return &CloudLoggingReceiver{
logger: logger,
}, nil
}
// See documentation for Receiver interface.
func (r *CloudLoggingReceiver) HandleMessage(m *Message) error {
// TODO(borenet): When should we LogSync, or Flush? If the program
// crashes or is killed, we'll want to have already flushed the logs.
labels := map[string]string{}
if m.StepId != "" {
labels["stepId"] = m.StepId
}
r.logger.Log(logging.Entry{
Payload: m,
Severity: logging.Debug,
Labels: labels,
})
return nil
}
// cloudLogsWriter is an io.Writer which writes to Cloud Logging.
type cloudLogsWriter struct {
logger *logging.Logger
labels map[string]string
severity logging.Severity
}
// See documentation for io.Writer.
func (w *cloudLogsWriter) Write(b []byte) (int, error) {
// TODO(borenet): Should we buffer until we see a newline?
// TODO(borenet): When should we LogSync, or Flush? If the program
// crashes or is killed, we'll want to have already flushed the logs.
w.logger.Log(logging.Entry{
Labels: w.labels,
Payload: string(b),
Severity: w.severity,
})
return len(b), nil
}
// See documentation for Receiver interface.
func (r *CloudLoggingReceiver) LogStream(stepId, logId string, severity Severity) (io.Writer, error) {
return &cloudLogsWriter{
logger: r.logger,
labels: map[string]string{
"logId": logId,
"stepId": stepId,
},
severity: severity.asCloudLoggingSeverity(),
}, nil
}
// See documentation for Receiver interface.
func (r *CloudLoggingReceiver) Close() error {
return r.logger.Flush()
}