blob: 87101c785c8316971f8bdcc8b92cbc44418da74e [file] [log] [blame]
// Package sqlingestionstore contains a SQL-backed implementation of Store, which
// is meant as a quick "yes/no" to the question "Did we already ingest this file?" when polling
// for files missed during Pub/Sub ingestion.
package sqlingestionstore
import (
"context"
"crypto/md5"
"time"
"go.skia.org/infra/golden/go/ingestion"
lru "github.com/hashicorp/golang-lru"
"github.com/jackc/pgx/v4/pgxpool"
"go.skia.org/infra/go/skerr"
)
const (
cacheSize = 100_000
)
type sqlStore struct {
db *pgxpool.Pool
cache *lru.Cache
}
func New(db *pgxpool.Pool) *sqlStore {
cache, err := lru.New(cacheSize)
if err != nil {
panic(err) // should only cause error if size < 0
}
return &sqlStore{db: db, cache: cache}
}
// SetIngested implements the ingestion.Store interface.
// TODO(kjlubick) When the actual SQL ingestion works, change this to be a no-op (the ingesters
// themselves will write to this table) and WasIngested to target the SourceFiles table.
func (s *sqlStore) SetIngested(ctx context.Context, fileName string, ts time.Time) error {
sourceID := md5.Sum([]byte(fileName))
_, err := s.db.Exec(ctx, `
UPSERT INTO DeprecatedIngestedFiles (source_file_id, source_file, last_ingested)
VALUES ($1, $2, $3)`, sourceID[:], fileName, ts)
if err != nil {
return skerr.Wrapf(err, "Marking %s as ingested", fileName)
}
return nil
}
// WasIngested implements the ingestion.Store interface. It has a RAM cache to remember
// already ingested files (since an ingested file cannot become "uningested").
func (s *sqlStore) WasIngested(ctx context.Context, fileName string) (bool, error) {
if s.cache.Contains(fileName) {
return true, nil
}
sourceID := md5.Sum([]byte(fileName))
row := s.db.QueryRow(ctx, `SELECT count(*) FROM DeprecatedIngestedFiles where source_file_id = $1`, sourceID[:])
count := 0
err := row.Scan(&count)
if err != nil {
return false, skerr.Wrapf(err, "Looking for ingested file %s", fileName)
}
if count == 0 {
return false, nil
}
s.cache.Add(fileName, true)
return true, nil
}
// Verify sqlStore implements Store
var _ ingestion.Store = (*sqlStore)(nil)