| // Package redisutil provides helper types that use Redis as a backend stored |
| // instead of RAM. |
| // Such caches are available after restarts and can be shared among |
| // multiple machines. |
| package redisutil |
| |
| import ( |
| "errors" |
| "fmt" |
| "reflect" |
| "strconv" |
| "time" |
| |
| "github.com/garyburd/redigo/redis" |
| "github.com/skia-dev/glog" |
| "go.skia.org/infra/go/util" |
| ) |
| |
| const ( |
| // Constants that identify that value type for serialization/deserialization. |
| BYTES_TYPE = "b" |
| STRING_TYPE = "s" |
| INT_TYPE = "i" |
| JSON_TYPE = "j" |
| CODEC_TYPE = "c" |
| ) |
| |
| // RedisLRUCache is a Redis backed LRU cache. |
| type RedisLRUCache struct { |
| indexSetKey string |
| keyPrefix string |
| codec util.LRUCodec |
| pool *redis.Pool |
| } |
| |
| // NewRedisLRUCache returns a new Redis backed cache that complies with the |
| // util.LRUCache interface. |
| // TODO(stephana): This is a misnomer since NewRedisLRUCache does not |
| // expunge items automatically based on a timestamp. This needs to be fixed. |
| func NewRedisLRUCache(serverAddr string, db int, id string, codec util.LRUCodec) util.LRUCache { |
| return &RedisLRUCache{ |
| keyPrefix: id + ":", |
| indexSetKey: id + ":idx", |
| codec: codec, |
| pool: &redis.Pool{ |
| MaxIdle: 1000, |
| MaxActive: 0, |
| IdleTimeout: time.Minute * 20, |
| Dial: func() (redis.Conn, error) { |
| c, err := redis.Dial("tcp", serverAddr) |
| if err != nil { |
| return nil, err |
| } |
| _, err = c.Do("SELECT", db) |
| if err != nil { |
| return nil, err |
| } |
| |
| return c, err |
| }, |
| }, |
| } |
| } |
| |
| // Add, see the diff.LRUCache interface for details. |
| func (c *RedisLRUCache) Add(key, value interface{}) bool { |
| prefixedKey, rawKey, err := c.encodeKey(key) |
| if err != nil { |
| glog.Errorf("Unable to create redis key: %s", err) |
| return false |
| } |
| |
| byteVal, err := c.encodeVal(value) |
| if err != nil { |
| glog.Errorf("Unable to create redis value: %s", err) |
| return false |
| } |
| |
| conn := c.pool.Get() |
| defer util.Close(conn) |
| |
| util.LogErr(conn.Send("MULTI")) |
| util.LogErr(conn.Send("SET", prefixedKey, byteVal)) |
| util.LogErr(conn.Send("SADD", c.indexSetKey, rawKey)) |
| _, err = conn.Do("EXEC") |
| if err != nil { |
| glog.Errorf("Unable to add key: %s", err) |
| return false |
| } |
| return true |
| } |
| |
| // Get, see the diff.LRUCache interface for details. |
| func (c *RedisLRUCache) Get(key interface{}) (interface{}, bool) { |
| prefixedKey, _, err := c.encodeKey(key) |
| if err != nil { |
| glog.Errorf("Unable to create redis key %s", err) |
| } |
| |
| conn := c.pool.Get() |
| defer util.Close(conn) |
| |
| ret, err := c.decodeVal(redis.Bytes(conn.Do("GET", prefixedKey))) |
| if err != nil { |
| // Only log an error if it's not a missing value. |
| if err != redis.ErrNil { |
| glog.Errorf("Unable to get key %s: %s", prefixedKey, err) |
| } |
| return nil, false |
| } |
| return ret, true |
| } |
| |
| // Remove, see the diff.LRUCache interface for details. |
| func (c *RedisLRUCache) Remove(key interface{}) { |
| conn := c.pool.Get() |
| defer util.Close(conn) |
| |
| prefixedKey, rawKey, err := c.encodeKey(key) |
| if err != nil { |
| glog.Errorf("Unable to create redis key %s", err) |
| } |
| |
| util.LogErr(conn.Send("MULTI")) |
| util.LogErr(conn.Send("DEL", prefixedKey)) |
| util.LogErr(conn.Send("SREM", c.indexSetKey, rawKey)) |
| _, err = conn.Do("EXEC") |
| if err != nil { |
| glog.Errorf("Error deleting key:%s", err) |
| } |
| } |
| |
| // Purge clears the cache. |
| func (c *RedisLRUCache) Purge() { |
| for _, k := range c.Keys() { |
| c.Remove(k) |
| } |
| } |
| |
| // Keys returns all current keys in the cache. |
| func (c *RedisLRUCache) Keys() []interface{} { |
| conn := c.pool.Get() |
| defer util.Close(conn) |
| |
| ret, err := redis.Values(conn.Do("SMEMBERS", c.indexSetKey)) |
| if err != nil { |
| glog.Errorf("Unable to get keys: %s", err) |
| return nil |
| } |
| |
| result := make([]interface{}, len(ret)) |
| for i, v := range ret { |
| temp, ok := v.([]byte) |
| if !ok { |
| glog.Errorf("Unable to decode key: %v", v) |
| return nil |
| } |
| result[i] = c.decodeKey(temp) |
| } |
| return result |
| } |
| |
| // Len, see the diff.LRUCache interface for details. |
| func (c *RedisLRUCache) Len() int { |
| conn := c.pool.Get() |
| defer util.Close(conn) |
| |
| ret, err := redis.Int(conn.Do("SCARD", c.indexSetKey)) |
| if err != nil { |
| glog.Errorf("Unable to get length: %s", err) |
| return 0 |
| } |
| return ret |
| } |
| |
| func (c *RedisLRUCache) encodeVal(val interface{}) ([]byte, error) { |
| var resultVal []byte |
| |
| switch testVal := val.(type) { |
| // Use []byte directly. |
| case []byte: |
| resultVal = []byte(BYTES_TYPE + string(testVal)) |
| // Cast strings to []byte. |
| case string: |
| resultVal = []byte(STRING_TYPE + testVal) |
| case int: |
| resultVal = []byte(INT_TYPE + strconv.Itoa(testVal)) |
| default: |
| // If we have a codec then decode it. |
| if c.codec == nil { |
| return nil, fmt.Errorf("Values cannot be of type: %v", reflect.TypeOf(val)) |
| } |
| bytesVal, err := c.codec.Encode(val) |
| if err != nil { |
| return nil, fmt.Errorf("Unable to encode %v. Got error: %s", val, err) |
| } |
| resultVal = []byte(CODEC_TYPE + string(bytesVal)) |
| } |
| return resultVal, nil |
| } |
| |
| func (c *RedisLRUCache) decodeVal(val []byte, err error) (interface{}, error) { |
| if err != nil { |
| return nil, err |
| } |
| |
| switch string(val[:1]) { |
| case BYTES_TYPE: |
| return val[1:], nil |
| case STRING_TYPE: |
| return string(val[1:]), nil |
| case INT_TYPE: |
| ret, err := strconv.ParseInt(string(val[1:]), 10, 0) |
| return int(ret), err |
| case CODEC_TYPE: |
| if c.codec == nil { |
| return nil, errors.New("No codec defined to decode byte array") |
| } |
| return c.codec.Decode(val[1:]) |
| default: |
| return nil, fmt.Errorf("Unable to decode value: %s", string(val)) |
| } |
| } |
| |
| func (c *RedisLRUCache) encodeKey(key interface{}) (string, string, error) { |
| keyStr, ok := key.(string) |
| if !ok { |
| return "", "", errors.New("Key values have to be of type 'strings'") |
| } |
| return c.keyPrefix + keyStr, keyStr, nil |
| } |
| |
| func (c *RedisLRUCache) decodeKey(key []byte) interface{} { |
| return string(key) |
| } |