|  | package counters | 
|  |  | 
|  | /* | 
|  | This file contains implementations for various types of counters. | 
|  | */ | 
|  |  | 
|  | import ( | 
|  | "context" | 
|  | "encoding/gob" | 
|  | "fmt" | 
|  | "io" | 
|  | "sync" | 
|  | "time" | 
|  |  | 
|  | "cloud.google.com/go/storage" | 
|  | "go.skia.org/infra/go/gcs" | 
|  | "go.skia.org/infra/go/sklog" | 
|  | "go.skia.org/infra/go/util" | 
|  | ) | 
|  |  | 
// Indirections over the time package. The rest of this file calls these
// variables instead of time.AfterFunc / time.Now directly so that tests can
// swap in fakes.
var (
	timeAfterFunc = time.AfterFunc
	timeNowFunc   = time.Now
)
|  |  | 
|  | // PersistentAutoDecrementCounter is an AutoDecrementCounter which uses a file | 
|  | // in GCS to persist its value between program restarts. | 
|  | type PersistentAutoDecrementCounter struct { | 
|  | gcs      gcs.GCSClient | 
|  | times    []time.Time | 
|  | file     string | 
|  | mtx      sync.RWMutex | 
|  | duration time.Duration | 
|  | } | 
|  |  | 
|  | // Helper function for reading the PersistentAutoDecrementCounter's timings from | 
|  | // a backing file in GCS. | 
|  | func read(ctx context.Context, gcsClient gcs.GCSClient, path string) ([]time.Time, error) { | 
|  | times := []time.Time{} | 
|  | r, err := gcsClient.FileReader(ctx, path) | 
|  | if err == nil { | 
|  | defer util.Close(r) | 
|  | if err = gob.NewDecoder(r).Decode(×); err != nil { | 
|  | return nil, fmt.Errorf("Invalid or corrupted file for PersistentAutoDecrementCounter: %s", err) | 
|  | } | 
|  | } else if err != storage.ErrObjectNotExist { | 
|  | return nil, fmt.Errorf("Unable to read file for PersistentAutoDecrementCounter: %s", err) | 
|  | } | 
|  | return times, nil | 
|  | } | 
|  |  | 
|  | // NewPersistentAutoDecrementCounter returns a PersistentAutoDecrementCounter | 
|  | // instance using the given file. | 
|  | func NewPersistentAutoDecrementCounter(ctx context.Context, gcsClient gcs.GCSClient, path string, d time.Duration) (*PersistentAutoDecrementCounter, error) { | 
|  | times, err := read(ctx, gcsClient, path) | 
|  | if err != nil { | 
|  | return nil, err | 
|  | } | 
|  | c := &PersistentAutoDecrementCounter{ | 
|  | gcs:      gcsClient, | 
|  | times:    times, | 
|  | file:     path, | 
|  | duration: d, | 
|  | } | 
|  | // Write the file in case we didn't have one before. | 
|  | if err := c.write(ctx); err != nil { | 
|  | return nil, err | 
|  | } | 
|  | // Start timers for any existing counts. | 
|  | now := timeNowFunc().UTC() | 
|  | for _, t := range c.times { | 
|  | t := t | 
|  | timeAfterFunc(t.Sub(now), func() { | 
|  | c.decLogErr(ctx, t) | 
|  | }) | 
|  | } | 
|  | return c, nil | 
|  | } | 
|  |  | 
|  | // write the timings to the backing file. Assumes the caller holds a write lock. | 
|  | func (c *PersistentAutoDecrementCounter) write(ctx context.Context) error { | 
|  | return gcs.WithWriteFile(c.gcs, ctx, c.file, gcs.FILE_WRITE_OPTS_TEXT, func(w io.Writer) error { | 
|  | return gob.NewEncoder(w).Encode(c.times) | 
|  | }) | 
|  | } | 
|  |  | 
|  | // Inc increments the PersistentAutoDecrementCounter and schedules a decrement. | 
|  | func (c *PersistentAutoDecrementCounter) Inc(ctx context.Context) error { | 
|  | c.mtx.Lock() | 
|  | defer c.mtx.Unlock() | 
|  | decTime := timeNowFunc().UTC().Add(c.duration) | 
|  | c.times = append(c.times, decTime) | 
|  | timeAfterFunc(c.duration, func() { | 
|  | c.decLogErr(ctx, decTime) | 
|  | }) | 
|  | return c.write(ctx) | 
|  | } | 
|  |  | 
|  | // dec decrements the PersistentAutoDecrementCounter. | 
|  | func (c *PersistentAutoDecrementCounter) dec(ctx context.Context, t time.Time) error { | 
|  | c.mtx.Lock() | 
|  | defer c.mtx.Unlock() | 
|  | for i, x := range c.times { | 
|  | if x == t { | 
|  | c.times = append(c.times[:i], c.times[i+1:]...) | 
|  | return c.write(ctx) | 
|  | } | 
|  | } | 
|  | sklog.Debugf("PersistentAutoDecrementCounter: Nothing to delete; did we get reset?") | 
|  | return nil | 
|  | } | 
|  |  | 
|  | // decLogErr decrements the PersistentAutoDecrementCounter and logs any error. | 
|  | func (c *PersistentAutoDecrementCounter) decLogErr(ctx context.Context, t time.Time) { | 
|  | if err := c.dec(ctx, t); err != nil { | 
|  | sklog.Errorf("Failed to persist PersistentAutoDecrementCounter: %s", err) | 
|  | } | 
|  | } | 
|  |  | 
|  | // Get returns the current value of the PersistentAutoDecrementCounter. | 
|  | func (c *PersistentAutoDecrementCounter) Get() int64 { | 
|  | c.mtx.RLock() | 
|  | defer c.mtx.RUnlock() | 
|  | return int64(len(c.times)) | 
|  | } | 
|  |  | 
|  | // Reset resets the value of the PersistentAutoDecrementCounter to zero. | 
|  | func (c *PersistentAutoDecrementCounter) Reset(ctx context.Context) error { | 
|  | c.mtx.Lock() | 
|  | defer c.mtx.Unlock() | 
|  | sklog.Debugf("PersistentAutoDecrementCounter: reset.") | 
|  | c.times = []time.Time{} | 
|  | return c.write(ctx) | 
|  | } | 
|  |  | 
|  | // GetDecrementTimes returns a slice of time.Time which indicate *roughly* when | 
|  | // the counter will be decremented. This is informational only, and the caller | 
|  | // should not rely on the times to be perfectly accurate. | 
|  | func (c *PersistentAutoDecrementCounter) GetDecrementTimes() []time.Time { | 
|  | c.mtx.RLock() | 
|  | defer c.mtx.RUnlock() | 
|  | rv := make([]time.Time, len(c.times)) | 
|  | for i, t := range c.times { | 
|  | rv[i] = t | 
|  | } | 
|  | return rv | 
|  | } |