blob: 718486e10236f9d9bb07a72ae1dba76443854e5f [file] [log] [blame]
// Command-line application for interacting with BigTable backed Perf storage.
package main
import (
// Package-level state shared by the sub-command actions.
var (
// traceStore caches the store built by mustGetStore; it stays nil until the
// first call to mustGetStore.
traceStore tracestore.TraceStore
// configFilename is the destination of the --config_filename persistent flag.
configFilename string
// instanceConfig is populated from configFilename by the root command's
// PersistentPreRunE hook.
instanceConfig *config.InstanceConfig
// flags
//
// Destination variables for the sub-command flags registered in main.
var (
// indicesTileFlag backs --tile on the "indices" commands; the default of -1
// makes the actions fall back to the latest tile.
indicesTileFlag types.TileNumber
// tracesTileFlag backs --tile on the "traces" commands; the default of -1
// makes the action fall back to the latest tile.
tracesTileFlag types.TileNumber
// tracesQueryFlag backs --query; it is parsed with url.ParseQuery, and the
// empty default matches all traces per the flag's help text.
tracesQueryFlag string
// ingestStartFlag backs --start, a date in 2006-01-02 form; when empty the
// action starts one week before now.
ingestStartFlag string
// ingestEndFlag backs --end, a date in 2006-01-02 form; when empty the
// action ends at the current time.
ingestEndFlag string
// ingestDryrunFlag backs --dryrun ("Just display the list of files to send.").
ingestDryrunFlag bool
func mustGetStore() tracestore.TraceStore {
if traceStore != nil {
return traceStore
var err error
traceStore, err = builders.NewTraceStoreFromConfig(context.Background(), true, instanceConfig)
if err != nil {
sklog.Fatalf("Failed to create client: %s", err)
return traceStore
// main assembles the perf-tool cobra command tree, binds the command-line
// flags to the package-level flag variables, and executes the root command.
//
// NOTE(review): as shown here the function has no closing braces, no
// AddCommand wiring, and no bodies for the `if err` checks; confirm against
// the complete file before relying on the structure described below.
func main() {
cmd := cobra.Command{
Use: "perf-tool [sub]",
// Loads instanceConfig from the file named by --config_filename; a load
// failure is returned wrapped with skerr.Wrap.
PersistentPreRunE: func(c *cobra.Command, args []string) error {
var err error
instanceConfig, err = config.InstanceConfigFromFile(configFilename)
if err != nil {
return skerr.Wrap(err)
return nil
// NOTE(review): the flag below has a default value but is also marked
// required; confirm which of the two is intended.
cmd.PersistentFlags().StringVar(&configFilename, "config_filename", "./configs/nano.json", "The filename of the config file to use.")
err := cmd.MarkPersistentFlagRequired("config_filename")
if err != nil {
// "config" commands.
configCmd := &cobra.Command{
Use: "config [sub]",
configPubSubCmd := &cobra.Command{
Use: "create-pubsub-topics",
Short: "Create PubSub topics for the given big_table_config.",
RunE: configCreatePubSubTopicsAction,
// "indices" commands. --tile is bound to indicesTileFlag and defaults to -1.
indicesCmd := &cobra.Command{
Use: "indices [sub]",
indicesCmd.PersistentFlags().Int32Var((*int32)(&indicesTileFlag), "tile", -1, "The tile to query")
indicesCountCmd := &cobra.Command{
Use: "count",
Short: "Counts the number of index rows.",
Long: "Counts the index rows for the last (most recent) tile, or the tile specified by --tile.",
RunE: indicesCountAction,
indicesWriteCmd := &cobra.Command{
Use: "write",
Short: "Write indices",
Long: "Rewrites the indices for the last (most recent) tile, or the tile specified by --tile.",
RunE: indicesWriteAction,
// NOTE(review): the Long text below refers to --tiles, but the flag is
// registered as --tile.
indicesWriteAllCmd := &cobra.Command{
Use: "write-all",
Short: "Write indices for all tiles.",
Long: "Rewrites the indices for all tiles, --tiles is ignored. Starts with latest tile and keeps moving to previous tiles until it finds a tile with no traces.",
RunE: indicesWriteAllAction,
// NOTE(review): "tile" is already a persistent flag on indicesCmd bound to the
// same variable; confirm this second, local registration is intended.
indicesWriteCmd.Flags().Int32Var((*int32)(&indicesTileFlag), "tile", -1, "The tile to query")
// "tiles" commands.
tilesCmd := &cobra.Command{
Use: "tiles [sub]",
tilesLast := &cobra.Command{
Use: "last",
Short: "Prints the offset of the last (most recent) tile.",
RunE: tilesLastAction,
// "traces" commands. --tile is bound to tracesTileFlag and --query to
// tracesQueryFlag.
tracesCmd := &cobra.Command{
Use: "traces [sub]",
tracesCmd.PersistentFlags().Int32Var((*int32)(&tracesTileFlag), "tile", -1, "The tile to query")
tracesCmd.PersistentFlags().StringVar(&tracesQueryFlag, "query", "", "The query to run. Defaults to the empty query which matches all traces.")
tracesListByIndexCmd := &cobra.Command{
Use: "list",
Short: "Prints the IDs of traces in the last (most recent) tile, or the tile specified by the --tile flag, that match --query.",
RunE: tracesListByIndexAction,
// "ingest" commands. --start, --end and --dryrun are local to force-reingest.
ingestCmd := &cobra.Command{
Use: "ingest [sub]",
ingestForceReingestCmd := &cobra.Command{
Use: "force-reingest",
Short: "Force re-ingestion of files.",
RunE: ingestForceReingestAction,
ingestForceReingestCmd.Flags().StringVar(&ingestStartFlag, "start", "", "Start the ingestion at this time, of the form: 2006-01-02. Default to one week ago.")
ingestForceReingestCmd.Flags().StringVar(&ingestEndFlag, "end", "", "Ingest up to this time, of the form: 2006-01-02. Defaults to now.")
ingestForceReingestCmd.Flags().BoolVar(&ingestDryrunFlag, "dryrun", false, "Just display the list of files to send.")
// Parse the command line and run the selected command.
if err := cmd.Execute(); err != nil {
func tilesLastAction(c *cobra.Command, args []string) error {
tileNumber, err := mustGetStore().GetLatestTile()
if err != nil {
return err
return nil
func tracesListByIndexAction(c *cobra.Command, args []string) error {
var tileNumber types.TileNumber
store := mustGetStore()
if tracesTileFlag == -1 {
var err error
tileNumber, err = store.GetLatestTile()
if err != nil {
return err
} else {
tileNumber = tracesTileFlag
values, err := url.ParseQuery(tracesQueryFlag)
if err != nil {
return err
q, err := query.New(values)
if err != nil {
return err
ts, err := store.QueryTracesByIndex(context.Background(), tileNumber, q)
if err != nil {
return err
for id, trace := range ts {
fmt.Println(id, trace)
return nil
func indicesWriteAction(c *cobra.Command, args []string) error {
store := mustGetStore()
var tileNumber types.TileNumber
if indicesTileFlag == -1 {
var err error
tileNumber, err = store.GetLatestTile()
if err != nil {
return fmt.Errorf("Failed to get latest tile: %s", err)
} else {
tileNumber = indicesTileFlag
return store.WriteIndices(context.Background(), tileNumber)
func indicesWriteAllAction(c *cobra.Command, args []string) error {
store := mustGetStore()
tileNumber, err := store.GetLatestTile()
if err != nil {
return fmt.Errorf("Failed to get latest tile: %s", err)
for {
if err := store.WriteIndices(context.Background(), tileNumber); err != nil {
return err
sklog.Infof("Wrote index for tile %d", tileNumber)
tileNumber = tileNumber.Prev()
count, err := store.TraceCount(context.Background(), tileNumber)
if err != nil {
return err
if count == 0 {
return nil
func indicesCountAction(c *cobra.Command, args []string) error {
store := mustGetStore()
var tileNumber types.TileNumber
if indicesTileFlag == -1 {
var err error
tileNumber, err = store.GetLatestTile()
if err != nil {
return fmt.Errorf("Failed to get latest tile: %s", err)
} else {
tileNumber = indicesTileFlag
count, err := store.CountIndices(context.Background(), tileNumber)
if err == nil {
return err
func createPubSubTopic(ctx context.Context, client *pubsub.Client, topicName string) error {
topic := client.Topic(topicName)
ok, err := topic.Exists(ctx)
if err != nil {
return err
if ok {
fmt.Printf("Topic %q already exists\n", topicName)
return nil
_, err = client.CreateTopic(ctx, topicName)
if err != nil {
return fmt.Errorf("Failed to create topic %q: %s", topicName, err)
return nil
func configCreatePubSubTopicsAction(c *cobra.Command, args []string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, instanceConfig.IngestionConfig.SourceConfig.Project)
if err != nil {
return err
if err := createPubSubTopic(ctx, client, instanceConfig.IngestionConfig.SourceConfig.Topic); err != nil {
return err
if instanceConfig.IngestionConfig.FileIngestionTopicName != "" {
if err := createPubSubTopic(ctx, client, instanceConfig.IngestionConfig.FileIngestionTopicName); err != nil {
return err
return nil
func ingestForceReingestAction(c *cobra.Command, args []string) error {
ctx := context.Background()
ts, err := auth.NewDefaultTokenSource(true, storage.ScopeReadOnly)
if err != nil {
return skerr.Wrap(err)
pubSubClient, err := pubsub.NewClient(ctx, instanceConfig.IngestionConfig.SourceConfig.Project, option.WithTokenSource(ts))
if err != nil {
return skerr.Wrap(err)
topic := pubSubClient.Topic(instanceConfig.IngestionConfig.SourceConfig.Topic)
now := time.Now()
startTime := now.Add(-7 * 24 * time.Hour)
if ingestStartFlag != "" {
startTime, err = time.Parse("2006-01-02", ingestStartFlag)
if err != nil {
return skerr.Wrap(err)
endTime := now
if ingestEndFlag != "" {
endTime, err = time.Parse("2006-01-02", ingestEndFlag)
if err != nil {
return skerr.Wrap(err)
client := httputils.DefaultClientConfig().WithTokenSource(ts).WithoutRetries().Client()
gcsClient, err := storage.NewClient(ctx, option.WithHTTPClient(client))
if err != nil {
return skerr.Wrap(err)
for _, prefix := range instanceConfig.IngestionConfig.SourceConfig.Sources {
u, err := url.Parse(prefix)
if err != nil {
return skerr.Wrap(err)
dirs := fileutil.GetHourlyDirs(u.Path[1:], startTime.Unix(), endTime.Unix())
for _, dir := range dirs {
sklog.Infof("Directory: %q", dir)
err := gcs.AllFilesInDir(gcsClient, u.Host, dir, func(item *storage.ObjectAttrs) {
// The PubSub event data is a JSON serialized storage.ObjectAttrs object.
// See
sklog.Infof("File: %q", item.Name)
b, err := json.Marshal(storage.ObjectAttrs{
Name: item.Name,
Bucket: u.Host,
if err != nil {
sklog.Errorf("Failed to serialize event: %s", err)
if ingestDryrunFlag {
fmt.Println(item.Name, item.Bucket)
topic.Publish(ctx, &pubsub.Message{
Data: b,
if err != nil {
return skerr.Wrap(err)
return nil