blob: 67ea94158140972c021ee59a53652b490716c566 [file] [log] [blame]
package logs
/*
The logs package provides an interface for inserting and retrieving
Task Driver logs in Cloud BigTable.
*/
import (
"bytes"
"context"
"encoding/gob"
"fmt"
"time"
"cloud.google.com/go/bigtable"
"go.opencensus.io/trace"
"golang.org/x/oauth2"
"google.golang.org/api/option"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/util"
"go.skia.org/infra/task_driver/go/td"
)
const (
// We use a single BigTable table for storing logs.
BT_TABLE = "task-driver-logs"
// We use a single BigTable column family.
BT_COLUMN_FAMILY = "LOGS"
// We use a single BigTable column which stores gob-encoded log entries.
BT_COLUMN = "ENTRY"
INSERT_TIMEOUT = 30 * time.Second
QUERY_TIMEOUT = 5 * time.Second
)
var (
// Fully-qualified BigTable column name.
BT_COLUMN_FULL = fmt.Sprintf("%s:%s", BT_COLUMN_FAMILY, BT_COLUMN)
)
// rowKey returns a BigTable row key for a log entry. If any of the parameters
// is empty, then only the row key prefix for the provided parameters is
// returned, which allows rowKey to be used for prefix searches.
func rowKey(taskId, stepId, logId string, ts time.Time, insertId string) string {
// Full log for an entire task.
rv := taskId
// All logs related to a step.
if stepId != "" {
rv += "#" + stepId
} else {
return rv
}
// A single log stream.
if logId != "" {
rv += "#" + logId
} else {
return rv
}
// Timestamp.
if !util.TimeIsZero(ts) {
rv += "#" + ts.UTC().Format(util.SAFE_TIMESTAMP_FORMAT)
} else {
return rv
}
// Log insert ID. Included in case of multiple entries having the same
// timestamp.
if insertId != "" {
rv += "#" + insertId
} else {
return rv
}
// Done.
return rv
}
// Entry mimics logging.Entry, which for some reason does not include the
// jsonPayload field, and is not parsable via json.Unmarshal due to the Severity
// type.
type Entry struct {
InsertID string `json:"insertId"`
Labels map[string]string `json:"labels"`
LogName string `json:"logName"`
ReceiveTimestamp time.Time `json:"receiveTimestamp"`
//Resource
Severity string `json:"severity"`
JsonPayload *td.Message `json:"jsonPayload"`
TextPayload string `json:"textPayload"`
Timestamp time.Time `json:"timestamp"`
}
// LogsManager is a struct which provides an interface for inserting and
// retrieving Task Driver logs in Cloud BigTable.
type LogsManager struct {
client *bigtable.Client
table *bigtable.Table
}
// NewLogsManager returns a LogsManager instance.
func NewLogsManager(ctx context.Context, project, instance string, ts oauth2.TokenSource) (*LogsManager, error) {
client, err := bigtable.NewClient(ctx, project, instance, option.WithTokenSource(ts))
if err != nil {
return nil, fmt.Errorf("Failed to create BigTable client: %s", err)
}
table := client.Open(BT_TABLE)
return &LogsManager{
client: client,
table: table,
}, nil
}
// Close the LogsManager.
func (m *LogsManager) Close() error {
return m.client.Close()
}
// Insert the given log entry.
func (m *LogsManager) Insert(ctx context.Context, e *Entry) error {
ctx, span := trace.StartSpan(ctx, "LogsManager_Insert")
defer span.End()
// Encode the log entry.
buf := bytes.Buffer{}
if err := gob.NewEncoder(&buf).Encode(e); err != nil {
return fmt.Errorf("Failed to gob-encode log entry: %s", err)
}
// Insert the log entry into BigTable.
mt := bigtable.NewMutation()
mt.Set(BT_COLUMN_FAMILY, BT_COLUMN, bigtable.Time(e.Timestamp), buf.Bytes())
taskId, ok := e.Labels["taskId"]
if !ok {
// TODO(borenet): We should Ack() the message in this case.
return fmt.Errorf("Log entry is missing a task ID! %+v", e)
}
stepId, ok := e.Labels["stepId"]
if !ok {
stepId = "no_step_id"
}
logId, ok := e.Labels["logId"]
if !ok {
logId = "no_log_id"
}
rk := rowKey(taskId, stepId, logId, e.Timestamp, e.InsertID)
ctx, cancel := context.WithTimeout(ctx, INSERT_TIMEOUT)
defer cancel()
return m.table.Apply(ctx, rk, mt)
}
// Search returns Entries matching the given search terms.
func (m *LogsManager) Search(ctx context.Context, taskId, stepId, logId string) ([]*Entry, error) {
ctx, span := trace.StartSpan(ctx, "LogsManager_Search")
defer span.End()
prefix := rowKey(taskId, stepId, logId, time.Time{}, "")
sklog.Infof("Searching for entries with prefix: %s", prefix)
entries := []*Entry{}
var decodeErr error
ctx, cancel := context.WithTimeout(ctx, QUERY_TIMEOUT)
defer cancel()
if err := m.table.ReadRows(ctx, bigtable.PrefixRange(prefix), func(row bigtable.Row) bool {
for _, ri := range row[BT_COLUMN_FAMILY] {
if ri.Column == BT_COLUMN_FULL {
var e Entry
decodeErr = gob.NewDecoder(bytes.NewReader(ri.Value)).Decode(&e)
if decodeErr != nil {
return false
}
entries = append(entries, &e)
// We only store one entry per row.
return true
}
}
return true
}, bigtable.RowFilter(bigtable.LatestNFilter(1))); err != nil {
return nil, fmt.Errorf("Failed to retrieve data from BigTable: %s", err)
}
if decodeErr != nil {
return nil, fmt.Errorf("Failed to gob-decode entry: %s", decodeErr)
}
return entries, nil
}