blob: 021409f25236e7d3dbe2454215b6f345aafe4aef [file] [log] [blame]
package btts
import (
"context"
"fmt"
"cloud.google.com/go/bigtable"
"go.skia.org/infra/go/paramtools"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/perf/go/btts/engine"
)
const (
// MAX_PARALLEL_PARAM_INDEX is the maxumum number of key=value pairs that
// can appear in a Query Plan.
MAX_PARALLEL_PARAM_INDEX = 200
)
// sizeOfPlan returns the number of key=value pairs in the ParamSet.
func sizeOfPlan(plan paramtools.ParamSet) int {
count := 0
for _, values := range plan {
count += len(values)
}
return count
}
// validatePlan takes a plan (a ParamSet of OPS encoded keys and values) and
// validates that it should run to completion. This will also error if the query
// is too large, i.e. would generate too many concurrent queries to BigTable.
func validatePlan(plan paramtools.ParamSet) error {
count := sizeOfPlan(plan)
if count > MAX_PARALLEL_PARAM_INDEX {
return fmt.Errorf("Plan is too large, found %d > %d key,value pairs.", count, MAX_PARALLEL_PARAM_INDEX)
}
return nil
}
// ExecutePlan takes a query plan and executes it over a table for the given
// TileKey. The result is a channel that will produce encoded keys in
// alphabetical order and will close after the query is done executing.
//
// An error will be returned if the query is invalid.
//
// The desc is a descriptive string to add to any error logs this function produces.
//
// See Query Engine in BIGTABLE.md for the design.
func ExecutePlan(ctx context.Context, plan paramtools.ParamSet, table *bigtable.Table, tileKey TileKey, desc string) (<-chan string, error) {
if err := validatePlan(plan); err != nil {
return nil, skerr.Wrapf(err, "Plan is invalid")
}
intersectChannels := make([]<-chan string, 0, len(plan))
for key, values := range plan {
unionChannels := make([]<-chan string, len(values))
for i, value := range values {
unionChannels[i] = ParamIndex(ctx, table, tileKey, key, value, desc)
}
intersectChannels = append(intersectChannels, engine.NewUnion(ctx, unionChannels))
}
out := engine.NewIntersect(ctx, intersectChannels)
return out, nil
}