blob: b478088bd929d30640e1f123dfde390765281edb [file] [log] [blame]
// Command-line application for interacting with BigTable backed Perf storage.
package main
import (
"context"
"encoding/json"
"fmt"
"net/url"
"os"
"time"
"cloud.google.com/go/bigtable"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/storage"
"github.com/spf13/cobra"
"go.skia.org/infra/go/auth"
"go.skia.org/infra/go/fileutil"
"go.skia.org/infra/go/gcs"
"go.skia.org/infra/go/httputils"
"go.skia.org/infra/go/query"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sklog"
"go.skia.org/infra/go/sklog/glog_and_cloud"
"go.skia.org/infra/perf/go/config"
"go.skia.org/infra/perf/go/tracestore"
"go.skia.org/infra/perf/go/tracestore/btts"
"go.skia.org/infra/perf/go/types"
"golang.org/x/oauth2"
"google.golang.org/api/option"
)
var (
ts oauth2.TokenSource
store tracestore.TraceStore
configFilename string
instanceConfig *config.InstanceConfig
)
// flags
var (
indicesTileFlag types.TileNumber
tracesTileFlag types.TileNumber
tracesQueryFlag string
ingestStartFlag string
ingestEndFlag string
ingestDryrunFlag bool
)
func main() {
ctx := context.Background()
cmd := cobra.Command{
Use: "perf-tool [sub]",
PersistentPreRunE: func(c *cobra.Command, args []string) error {
glog_and_cloud.SetLogger(glog_and_cloud.NewStdErrCloudLogger(glog_and_cloud.SLogStderr))
var err error
ts, err = auth.NewDefaultTokenSource(true, bigtable.Scope)
if err != nil {
return fmt.Errorf("Failed to auth: %s", err)
}
instanceConfig, err = config.InstanceConfigFromFile(configFilename)
if err != nil {
return skerr.Wrap(err)
}
// Create the store client.
store, err = btts.NewBigTableTraceStoreFromConfig(ctx, instanceConfig, ts, false)
if err != nil {
return fmt.Errorf("Failed to create client: %s", err)
}
return nil
},
}
cmd.PersistentFlags().StringVar(&configFilename, "config_filename", "./configs/nano.json", "The filename of the config file to use.")
err := cmd.MarkPersistentFlagRequired("config_filename")
if err != nil {
sklog.Fatal(err)
}
configCmd := &cobra.Command{
Use: "config [sub]",
}
configPubSubCmd := &cobra.Command{
Use: "create-pubsub-topics",
Short: "Create PubSub topics for the given big_table_config.",
RunE: configCreatePubSubTopicsAction,
}
configCmd.AddCommand(configPubSubCmd)
indicesCmd := &cobra.Command{
Use: "indices [sub]",
}
indicesCmd.PersistentFlags().Int32Var((*int32)(&indicesTileFlag), "tile", -1, "The tile to query")
indicesCountCmd := &cobra.Command{
Use: "count",
Short: "Counts the number of index rows.",
Long: "Counts the index rows for the last (most recent) tile, or the tile specified by --tile.",
RunE: indicesCountAction,
}
indicesWriteCmd := &cobra.Command{
Use: "write",
Short: "Write indices",
Long: "Rewrites the indices for the last (most recent) tile, or the tile specified by --tile.",
RunE: indicesWriteAction,
}
indicesWriteAllCmd := &cobra.Command{
Use: "write-all",
Short: "Write indices for all tiles.",
Long: "Rewrites the indices for all tiles, --tiles is ignored. Starts with latest tile and keeps moving to previous tiles until it finds a tile with no traces.",
RunE: indicesWriteAllAction,
}
indicesWriteCmd.Flags().Int32Var((*int32)(&indicesTileFlag), "tile", -1, "The tile to query")
indicesCmd.AddCommand(
indicesCountCmd,
indicesWriteCmd,
indicesWriteAllCmd,
)
tilesCmd := &cobra.Command{
Use: "tiles [sub]",
}
tilesLast := &cobra.Command{
Use: "last",
Short: "Prints the offset of the last (most recent) tile.",
RunE: tilesLastAction,
}
tilesCmd.AddCommand(
tilesLast,
)
tracesCmd := &cobra.Command{
Use: "traces [sub]",
}
tracesCmd.PersistentFlags().Int32Var((*int32)(&tracesTileFlag), "tile", -1, "The tile to query")
tracesCmd.PersistentFlags().StringVar(&tracesQueryFlag, "query", "", "The query to run. Defaults to the empty query which matches all traces.")
tracesListByIndexCmd := &cobra.Command{
Use: "list",
Short: "Prints the IDs of traces in the last (most recent) tile, or the tile specified by the --tile flag, that match --query.",
RunE: tracesListByIndexAction,
}
tracesCmd.AddCommand(
tracesListByIndexCmd,
)
ingestCmd := &cobra.Command{
Use: "ingest [sub]",
}
ingestForceReingestCmd := &cobra.Command{
Use: "force-reingest",
Short: "Force re-ingestion of files.",
RunE: ingestForceReingestAction,
}
ingestForceReingestCmd.Flags().StringVar(&ingestStartFlag, "start", "", "Start the ingestion at this time, of the form: 2006-01-02. Default to one week ago.")
ingestForceReingestCmd.Flags().StringVar(&ingestEndFlag, "end", "", "Ingest up to this time, of the form: 2006-01-02. Defaults to now.")
ingestForceReingestCmd.Flags().BoolVar(&ingestDryrunFlag, "dryrun", false, "Just display the list of files to send.")
ingestCmd.AddCommand(ingestForceReingestCmd)
cmd.AddCommand(
configCmd,
indicesCmd,
tilesCmd,
tracesCmd,
ingestCmd,
)
if err := cmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
}
}
func tilesLastAction(c *cobra.Command, args []string) error {
tileNumber, err := store.GetLatestTile()
if err != nil {
return err
}
fmt.Println(tileNumber)
return nil
}
func tracesListByIndexAction(c *cobra.Command, args []string) error {
var tileNumber types.TileNumber
if tracesTileFlag == -1 {
var err error
tileNumber, err = store.GetLatestTile()
if err != nil {
return err
}
} else {
tileNumber = tracesTileFlag
}
values, err := url.ParseQuery(tracesQueryFlag)
if err != nil {
return err
}
q, err := query.New(values)
if err != nil {
return err
}
ts, err := store.QueryTracesByIndex(context.Background(), tileNumber, q)
if err != nil {
return err
}
for id, trace := range ts {
fmt.Println(id, trace)
}
return nil
}
func indicesWriteAction(c *cobra.Command, args []string) error {
var tileNumber types.TileNumber
if indicesTileFlag == -1 {
var err error
tileNumber, err = store.GetLatestTile()
if err != nil {
return fmt.Errorf("Failed to get latest tile: %s", err)
}
} else {
tileNumber = indicesTileFlag
}
return store.WriteIndices(context.Background(), tileNumber)
}
func indicesWriteAllAction(c *cobra.Command, args []string) error {
tileNumber, err := store.GetLatestTile()
if err != nil {
return fmt.Errorf("Failed to get latest tile: %s", err)
}
for {
if err := store.WriteIndices(context.Background(), tileNumber); err != nil {
return err
}
sklog.Infof("Wrote index for tile %d", tileNumber)
tileNumber = tileNumber.Prev()
count, err := store.TraceCount(context.Background(), tileNumber)
if err != nil {
return err
}
if count == 0 {
break
}
}
return nil
}
func indicesCountAction(c *cobra.Command, args []string) error {
var tileNumber types.TileNumber
if indicesTileFlag == -1 {
var err error
tileNumber, err = store.GetLatestTile()
if err != nil {
return fmt.Errorf("Failed to get latest tile: %s", err)
}
} else {
tileNumber = indicesTileFlag
}
count, err := store.CountIndices(context.Background(), tileNumber)
if err == nil {
fmt.Println(count)
}
return err
}
func createPubSubTopic(ctx context.Context, client *pubsub.Client, topicName string) error {
topic := client.Topic(topicName)
ok, err := topic.Exists(ctx)
if err != nil {
return err
}
if ok {
fmt.Printf("Topic %q already exists\n", topicName)
return nil
}
_, err = client.CreateTopic(ctx, topicName)
return fmt.Errorf("Failed to create topic %q: %s", topicName, err)
}
func configCreatePubSubTopicsAction(c *cobra.Command, args []string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, instanceConfig.IngestionConfig.SourceConfig.Project)
if err != nil {
return err
}
if err := createPubSubTopic(ctx, client, instanceConfig.IngestionConfig.SourceConfig.Topic); err != nil {
return err
}
if instanceConfig.IngestionConfig.FileIngestionTopicName != "" {
if err := createPubSubTopic(ctx, client, instanceConfig.IngestionConfig.FileIngestionTopicName); err != nil {
return err
}
}
return nil
}
func ingestForceReingestAction(c *cobra.Command, args []string) error {
ctx := context.Background()
ts, err := auth.NewDefaultTokenSource(true, storage.ScopeReadOnly)
if err != nil {
return skerr.Wrap(err)
}
pubSubClient, err := pubsub.NewClient(ctx, instanceConfig.IngestionConfig.SourceConfig.Project, option.WithTokenSource(ts))
if err != nil {
return skerr.Wrap(err)
}
topic := pubSubClient.Topic(instanceConfig.IngestionConfig.SourceConfig.Topic)
now := time.Now()
startTime := now.Add(-7 * 24 * time.Hour)
if ingestStartFlag != "" {
startTime, err = time.Parse("2006-01-02", ingestStartFlag)
if err != nil {
return skerr.Wrap(err)
}
}
endTime := now
if ingestEndFlag != "" {
endTime, err = time.Parse("2006-01-02", ingestEndFlag)
if err != nil {
return skerr.Wrap(err)
}
}
client := httputils.DefaultClientConfig().WithTokenSource(ts).WithoutRetries().Client()
gcsClient, err := storage.NewClient(ctx, option.WithHTTPClient(client))
if err != nil {
return skerr.Wrap(err)
}
for _, prefix := range instanceConfig.IngestionConfig.SourceConfig.Sources {
u, err := url.Parse(prefix)
if err != nil {
return skerr.Wrap(err)
}
dirs := fileutil.GetHourlyDirs(u.Path[1:], startTime.Unix(), endTime.Unix())
for _, dir := range dirs {
sklog.Infof("Directory: %q", dir)
err := gcs.AllFilesInDir(gcsClient, u.Host, dir, func(item *storage.ObjectAttrs) {
// The PubSub event data is a JSON serialized storage.ObjectAttrs object.
// See https://cloud.google.com/storage/docs/pubsub-notifications#payload
sklog.Infof("File: %q", item.Name)
b, err := json.Marshal(storage.ObjectAttrs{
Name: item.Name,
Bucket: u.Host,
})
if err != nil {
sklog.Errorf("Failed to serialize event: %s", err)
return
}
if ingestDryrunFlag {
fmt.Println(item.Name, item.Bucket)
return
}
topic.Publish(ctx, &pubsub.Message{
Data: b,
})
})
if err != nil {
return skerr.Wrap(err)
}
}
}
return nil
}