blob: b0c3a01fc9b4a6e129fa4c2a85b68c50e676f671 [file] [log] [blame]
// Package sqlculpritstore implements culprit.Store using SQL.
//
// Please see perf/sql/migrations for the database schema used.
package sqlculpritstore
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
"github.com/google/uuid"
"go.skia.org/infra/go/skerr"
"go.skia.org/infra/go/sql/pool"
"go.skia.org/infra/go/sql/sqlutil"
pb "go.skia.org/infra/perf/go/culprit/proto/v1"
"go.skia.org/infra/perf/go/culprit/sqlculpritstore/schema"
)
// CulpritStore is the SQL-backed implementation of the culprits.Store
// interface. Construct it with New.
type CulpritStore struct {
	// db is the pool.Pool handle through which every query and statement in
	// this store is issued.
	db pool.Pool
}
// New builds a *CulpritStore that issues all of its SQL through db.
// The returned error is always nil at present.
func New(db pool.Pool) (*CulpritStore, error) {
	store := &CulpritStore{db: db}
	return store, nil
}
// Get returns the Culprit objects whose ids are in the given set of ids.
//
// Ids that match no row are silently omitted, so the result may be shorter
// than ids. An empty ids slice returns (nil, nil) without touching the
// database.
func (s *CulpritStore) Get(ctx context.Context, ids []string) ([]*pb.Culprit, error) {
	if len(ids) == 0 {
		// "IN ()" is not portable SQL; there is nothing to look up anyway.
		return nil, nil
	}

	// Bind each id as a query parameter ($1, $2, ...) instead of splicing it
	// into the SQL text, so ids containing quotes cannot alter the query.
	placeholders := make([]string, len(ids))
	arguments := make([]interface{}, len(ids))
	for i, id := range ids {
		placeholders[i] = fmt.Sprintf("$%d", i+1)
		arguments[i] = id
	}
	statement := "SELECT id, host, project, ref, revision, anomaly_group_ids, issue_ids, group_issue_map FROM Culprits where id IN (" +
		strings.Join(placeholders, ", ") + ")"

	rows, err := s.db.Query(ctx, statement, arguments...)
	if err != nil {
		return nil, skerr.Wrapf(err, "Failed to query Culprit")
	}
	// Release the result set even when Scan or Unmarshal fails below.
	defer rows.Close()

	var resp []*pb.Culprit
	for rows.Next() {
		var (
			id, host, project, ref, revision string
			anomalyGroupIDs, issueIDs        []string
			groupIssueMapJSON                string
		)
		if err := rows.Scan(&id, &host, &project, &ref, &revision, &anomalyGroupIDs, &issueIDs, &groupIssueMapJSON); err != nil {
			return nil, skerr.Wrapf(err, "Failed to read Culprit results")
		}
		var groupIssueMap map[string]string
		if err := json.Unmarshal([]byte(groupIssueMapJSON), &groupIssueMap); err != nil {
			return nil, skerr.Wrapf(err, "Failed to unmarshal the group issue map string %s", groupIssueMapJSON)
		}
		resp = append(resp, &pb.Culprit{
			Commit:          &pb.Commit{Host: host, Project: project, Ref: ref, Revision: revision},
			AnomalyGroupIds: anomalyGroupIDs,
			IssueIds:        issueIDs,
			GroupIssueMap:   groupIssueMap,
		})
	}
	// Next() returning false can mean an error occurred mid-iteration.
	if err := rows.Err(); err != nil {
		return nil, skerr.Wrapf(err, "Failed to read Culprit results")
	}
	return resp, nil
}
// Upsert stores the given commits as Culprits linked to anomaly_group_id.
// If a Culprit row already exists for a commit, anomaly_group_id is merged
// into that row's anomaly_group_ids (without duplicating ids already present).
// Otherwise a new row is inserted with a freshly generated UUID.
//
// All commits must share the same host, project and ref, and none may be nil.
// The returned slice holds the ids of every row written: ids of pre-existing
// rows first, then the new UUIDs. Order within each group is unspecified
// because it comes from map iteration.
func (s *CulpritStore) Upsert(ctx context.Context, anomaly_group_id string, ip_commits []*pb.Commit) ([]string, error) {
	if len(ip_commits) == 0 || anomaly_group_id == "" {
		return nil, skerr.Fmt("no culprits/anomaly_group_id provided")
	}
	first := ip_commits[0]
	for _, commit := range ip_commits {
		if commit == nil {
			return nil, skerr.Fmt("nil commit provided")
		}
		if commit.Host != first.Host ||
			commit.Project != first.Project ||
			commit.Ref != first.Ref {
			return nil, skerr.Fmt(
				"all culprits should have same host, project and ref value")
		}
	}

	// Fetch the rows that already exist for any of the requested commits.
	whereCols := []string{"host", "project", "ref", "revision"}
	statement := "SELECT id, revision, anomaly_group_ids FROM Culprits WHERE " +
		sqlutil.WherePlaceholders(whereCols, len(ip_commits))
	arguments := make([]interface{}, 0, len(whereCols)*len(ip_commits))
	for _, commit := range ip_commits {
		arguments = append(arguments, commit.Host, commit.Project, commit.Ref, commit.Revision)
	}
	rows, err := s.db.Query(ctx, statement, arguments...)
	if err != nil {
		return nil, skerr.Wrapf(err, "Failed to query Culprit")
	}
	// Release the result set even if Scan fails below.
	defer rows.Close()

	// Start with one entry per requested revision carrying only the new group
	// id, then attach the row id and stored group ids of rows that exist.
	culpritsByRevision := convertProtoToSchema(anomaly_group_id, ip_commits)
	for rows.Next() {
		var rowID, rowRevision string
		var rowGroupIDs []string
		if err := rows.Scan(&rowID, &rowRevision, &rowGroupIDs); err != nil {
			return nil, skerr.Wrapf(err, "Failed to read Culprit results")
		}
		culprit, ok := culpritsByRevision[rowRevision]
		if !ok {
			// The WHERE clause only matches requested revisions; skip anything
			// unexpected rather than dereferencing a nil entry.
			continue
		}
		culprit.Id = rowID
		// Deduplicate so upserting the same group twice does not store it twice.
		culprit.AnomalyGroupIDs = removeDuplicateStr(append(culprit.AnomalyGroupIDs, rowGroupIDs...))
	}
	if err := rows.Err(); err != nil {
		return nil, skerr.Wrapf(err, "Failed to read Culprit results")
	}

	existing, fresh := splitIntoExistingAndNew(culpritsByRevision)
	now := time.Now().Unix()
	returnIDs := make([]string, 0, len(existing)+len(fresh))

	// Rewrite anomaly_group_ids on rows that already exist.
	if len(existing) > 0 {
		const colsPerRow = 3 // must match the number of columns in `statement`
		statement = "UPSERT INTO Culprits (id, anomaly_group_ids, last_modified) VALUES" +
			sqlutil.ValuesPlaceholders(colsPerRow, len(existing))
		arguments = make([]interface{}, 0, colsPerRow*len(existing))
		ids := make([]string, 0, len(existing))
		for _, culprit := range existing {
			arguments = append(arguments, culprit.Id, culprit.AnomalyGroupIDs, now)
			ids = append(ids, culprit.Id)
		}
		if _, err := s.db.Exec(ctx, statement, arguments...); err != nil {
			return nil, skerr.Wrapf(err, "Failed to upsert culprit")
		}
		returnIDs = append(returnIDs, ids...)
	}

	// Insert rows for commits that had no Culprit yet.
	if len(fresh) > 0 {
		const colsPerRow = 7 // must match the number of columns in `statement`
		statement = "INSERT INTO Culprits (id, host, project, ref, revision, anomaly_group_ids, last_modified) VALUES " +
			sqlutil.ValuesPlaceholders(colsPerRow, len(fresh))
		arguments = make([]interface{}, 0, colsPerRow*len(fresh))
		ids := make([]string, 0, len(fresh))
		for _, culprit := range fresh {
			id := uuid.NewString()
			arguments = append(arguments, id, culprit.Host,
				culprit.Project, culprit.Ref, culprit.Revision,
				culprit.AnomalyGroupIDs, now)
			ids = append(ids, id)
		}
		if _, err := s.db.Exec(ctx, statement, arguments...); err != nil {
			return nil, skerr.Wrapf(err, "Failed to insert culprit")
		}
		returnIDs = append(returnIDs, ids...)
	}
	return returnIDs, nil
}
// AddIssueId records that issue_id was filed for anomaly group group_id on
// the Culprit identified by id. It appends issue_id to the culprit's
// issue_ids (deduplicated) and sets group_issue_map[group_id] = issue_id.
//
// It returns an error if:
//   - no culprit has that id,
//   - group_id is not one of the culprit's anomaly groups, or
//   - group_id already has an issue recorded.
//
// It panics if more than one culprit is found for id.
func (s *CulpritStore) AddIssueId(ctx context.Context, id string, issue_id string, group_id string) error {
	culprits, err := s.Get(ctx, []string{id})
	if err != nil {
		return skerr.Wrapf(err, "Error fetching Culprit id")
	}
	if len(culprits) == 0 {
		return skerr.Fmt("No culprit found for id %s", id)
	} else if len(culprits) > 1 {
		panic(fmt.Sprintf("Database invariant broken. More than one culprits found for id : %s", id))
	}
	culprit := culprits[0]

	// Only groups already linked to this culprit may have an issue attached.
	groupIDExists := false
	for _, groupID := range culprit.AnomalyGroupIds {
		if groupID == group_id {
			groupIDExists = true
			break
		}
	}
	if !groupIDExists {
		return skerr.Fmt("adding issue %s for group %s which is not related to the culprit %s", issue_id, group_id, id)
	}

	groupIssueMap := culprit.GroupIssueMap
	if groupIssueMap == nil {
		groupIssueMap = map[string]string{}
	}
	if existingIssue, ok := groupIssueMap[group_id]; ok {
		return skerr.Fmt("group id %s has related issue already: %s", group_id, existingIssue)
	}
	groupIssueMap[group_id] = issue_id

	issueIDs := removeDuplicateStr(append(culprit.IssueIds, issue_id))

	groupIssueMapJSON, err := json.Marshal(groupIssueMap)
	if err != nil {
		return skerr.Wrapf(err, "Error marshal group issue map: %v", groupIssueMap)
	}

	statement := `
	UPDATE
		Culprits
	SET
		issue_ids=$1, group_issue_map=$2
	WHERE
		id=$3
	`
	if _, err := s.db.Exec(ctx, statement, issueIDs, groupIssueMapJSON, id); err != nil {
		return skerr.Wrapf(err, "error adding issue_id %s to culprit %s", issue_id, id)
	}
	return nil
}
// removeDuplicateStr returns a new slice containing the distinct elements of
// strSlice in order of first appearance. The result is never nil; an empty
// input yields an empty, non-nil slice.
func removeDuplicateStr(strSlice []string) []string {
	seen := make(map[string]struct{}, len(strSlice))
	out := make([]string, 0, len(strSlice))
	for _, s := range strSlice {
		if _, dup := seen[s]; dup {
			continue
		}
		seen[s] = struct{}{}
		out = append(out, s)
	}
	return out
}
// quotedSlice wraps each element of a in single quotes and joins the results
// with ", ". For example, ["a", "b"] becomes "'a', 'b'".
//
// Embedded single quotes are doubled ('' is the SQL escape for '), so every
// element stays a single string literal and cannot terminate it early. Prefer
// bound query parameters over this helper where possible.
func quotedSlice(a []string) string {
	q := make([]string, len(a))
	for i, s := range a {
		q[i] = "'" + strings.ReplaceAll(s, "'", "''") + "'"
	}
	return strings.Join(q, ", ")
}
// convertProtoToSchema builds one *schema.CulpritSchema per commit, keyed by
// the commit's revision. Each entry copies host/project/ref/revision from the
// commit, and its AnomalyGroupIDs holds only anomaly_group_id. When several
// commits share a revision, the last one in commits wins.
func convertProtoToSchema(anomaly_group_id string, commits []*pb.Commit) map[string]*schema.CulpritSchema {
	byRevision := make(map[string]*schema.CulpritSchema)
	for i := range commits {
		c := commits[i]
		byRevision[c.Revision] = &schema.CulpritSchema{
			Host:            c.Host,
			Project:         c.Project,
			Ref:             c.Ref,
			Revision:        c.Revision,
			AnomalyGroupIDs: []string{anomaly_group_id},
		}
	}
	return byRevision
}
// splitIntoExistingAndNew partitions a revision-keyed culprit map, such as the
// one convertProtoToSchema returns, into two maps keyed the same way:
//   - the first holds culprits whose Id is set (rows already in the database),
//   - the second holds culprits whose Id is empty (rows still to be inserted).
func splitIntoExistingAndNew(culprits map[string]*schema.CulpritSchema) (map[string]*schema.CulpritSchema, map[string]*schema.CulpritSchema) {
	existing := make(map[string]*schema.CulpritSchema)
	// Named "fresh" rather than "new" to avoid shadowing the builtin new().
	fresh := make(map[string]*schema.CulpritSchema)
	for revision, culprit := range culprits {
		if culprit.Id == "" {
			fresh[revision] = culprit
			continue
		}
		existing[revision] = culprit
	}
	return existing, fresh
}