Allow common redis and leveldb connections (#12385)
* Allow common redis and leveldb connections

  Prevents multiple reopening of redis and leveldb connections to the same
  place by sharing connections. Further allows for a more configurable redis
  connection type using the redisURI and a leveldbURI scheme.

  Signed-off-by: Andrew Thornton <art27@cantab.net>

* add unit-test

  Signed-off-by: Andrew Thornton <art27@cantab.net>

* as per @lunny

  Signed-off-by: Andrew Thornton <art27@cantab.net>

* add test

  Signed-off-by: Andrew Thornton <art27@cantab.net>

* Update modules/cache/cache_redis.go
* Update modules/queue/queue_disk.go
* Update modules/cache/cache_redis.go
* Update modules/cache/cache_redis.go
* Update modules/queue/unique_queue_disk.go
* Update modules/queue/queue_disk.go
* Update modules/queue/unique_queue_disk.go
* Update modules/session/redis.go

Co-authored-by: techknowlogick <techknowlogick@gitea.io>
Co-authored-by: Lauris BH <lauris@nix.lv>
parent f404bdde9b
commit 7f8e3192cd
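
For orientation before the diff: a minimal sketch of how the shared manager is meant to be used. The import path and addresses are illustrative assumptions, not part of this commit.

package main

import (
	"fmt"

	"code.gitea.io/gitea/modules/nosql"
)

func main() {
	// The old comma-delimited style and the new RedisURI style normalize to
	// the same key, so the manager returns one shared client instead of
	// reopening a second connection to the same place.
	a := nosql.GetManager().GetRedisClient("network=tcp,addr=127.0.0.1:6379,db=0")
	b := nosql.GetManager().GetRedisClient("redis://127.0.0.1:6379/0")
	fmt.Println(a == b) // true: both names resolve to the same shared holder

	// Close decrements a reference count; the underlying connection is
	// only torn down once every user has closed it.
	_ = a.Close()
	_ = b.Close()
}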
modules/nosql/leveldb.go
@@ -0,0 +1,25 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package nosql

import "net/url"

// ToLevelDBURI converts old style connections to a LevelDBURI
//
// A LevelDBURI matches the pattern:
//
// leveldb://path[?[option=value]*]
//
// We have previously just provided the path but this prevents passing other options
func ToLevelDBURI(connection string) *url.URL {
	uri, err := url.Parse(connection)
	if err == nil && uri.Scheme == "leveldb" {
		return uri
	}
	uri, _ = url.Parse("leveldb://common")
	uri.Host = ""
	uri.Path = connection
	return uri
}
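
A quick illustration of the conversion (the example paths are mine, not from the diff): a bare path is wrapped into the leveldb scheme, while a string that already parses as a leveldb:// URI passes through untouched.

package main

import (
	"fmt"

	"code.gitea.io/gitea/modules/nosql"
)

func main() {
	// A bare path is wrapped into the common leveldb scheme...
	uri := nosql.ToLevelDBURI("/data/gitea/queues/common")
	fmt.Println(uri.String()) // leveldb:///data/gitea/queues/common

	// ...while an existing leveldb:// URI is returned unchanged.
	uri = nosql.ToLevelDBURI("leveldb:///data/gitea/queues/common?nosync=true")
	fmt.Println(uri.Scheme, uri.Query().Get("nosync")) // leveldb true
}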
modules/nosql/manager.go
@@ -0,0 +1,71 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package nosql

import (
	"strconv"
	"sync"
	"time"

	"github.com/go-redis/redis/v7"
	"github.com/syndtr/goleveldb/leveldb"
)

var manager *Manager

// Manager is the nosql connection manager
type Manager struct {
	mutex sync.Mutex

	RedisConnections   map[string]*redisClientHolder
	LevelDBConnections map[string]*levelDBHolder
}

type redisClientHolder struct {
	redis.UniversalClient
	name  []string
	count int64
}

func (r *redisClientHolder) Close() error {
	return manager.CloseRedisClient(r.name[0])
}

type levelDBHolder struct {
	name  []string
	count int64
	db    *leveldb.DB
}

func init() {
	_ = GetManager()
}

// GetManager returns a Manager and initializes one as singleton if there's none yet
func GetManager() *Manager {
	if manager == nil {
		manager = &Manager{
			RedisConnections:   make(map[string]*redisClientHolder),
			LevelDBConnections: make(map[string]*levelDBHolder),
		}
	}
	return manager
}

func valToTimeDuration(vs []string) (result time.Duration) {
	var err error
	for _, v := range vs {
		result, err = time.ParseDuration(v)
		if err != nil {
			var val int
			val, err = strconv.Atoi(v)
			result = time.Duration(val)
		}
		if err == nil {
			return
		}
	}
	return
}
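
One subtlety in valToTimeDuration worth noting: when a value fails time.ParseDuration but parses as a bare integer, it becomes a time.Duration count of nanoseconds, not seconds. That is presumably why ToRedisURI below appends an "s" to plain-integer idle_timeout values. A quick sketch of the distinction:

package main

import (
	"fmt"
	"time"
)

func main() {
	d, _ := time.ParseDuration("180s")
	fmt.Println(d)                  // 3m0s
	fmt.Println(time.Duration(180)) // 180ns - a bare integer is nanoseconds
}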
modules/nosql/manager_leveldb.go
@@ -0,0 +1,151 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package nosql

import (
	"path"
	"strconv"
	"strings"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/errors"
	"github.com/syndtr/goleveldb/leveldb/opt"
)

// CloseLevelDB closes a levelDB
func (m *Manager) CloseLevelDB(connection string) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	db, ok := m.LevelDBConnections[connection]
	if !ok {
		connection = ToLevelDBURI(connection).String()
		db, ok = m.LevelDBConnections[connection]
	}
	if !ok {
		return nil
	}

	db.count--
	if db.count > 0 {
		return nil
	}

	for _, name := range db.name {
		delete(m.LevelDBConnections, name)
	}
	return db.db.Close()
}

// GetLevelDB gets a levelDB for a particular connection
func (m *Manager) GetLevelDB(connection string) (*leveldb.DB, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	db, ok := m.LevelDBConnections[connection]
	if ok {
		db.count++

		return db.db, nil
	}
	dataDir := connection
	uri := ToLevelDBURI(connection)
	db = &levelDBHolder{
		name: []string{connection, uri.String()},
	}

	dataDir = path.Join(uri.Host, uri.Path)
	opts := &opt.Options{}
	for k, v := range uri.Query() {
		switch replacer.Replace(strings.ToLower(k)) {
		case "blockcachecapacity":
			opts.BlockCacheCapacity, _ = strconv.Atoi(v[0])
		case "blockcacheevictremoved":
			opts.BlockCacheEvictRemoved, _ = strconv.ParseBool(v[0])
		case "blockrestartinterval":
			opts.BlockRestartInterval, _ = strconv.Atoi(v[0])
		case "blocksize":
			opts.BlockSize, _ = strconv.Atoi(v[0])
		case "compactionexpandlimitfactor":
			opts.CompactionExpandLimitFactor, _ = strconv.Atoi(v[0])
		case "compactiongpoverlapsfactor":
			opts.CompactionGPOverlapsFactor, _ = strconv.Atoi(v[0])
		case "compactionl0trigger":
			opts.CompactionL0Trigger, _ = strconv.Atoi(v[0])
		case "compactionsourcelimitfactor":
			opts.CompactionSourceLimitFactor, _ = strconv.Atoi(v[0])
		case "compactiontablesize":
			opts.CompactionTableSize, _ = strconv.Atoi(v[0])
		case "compactiontablesizemultiplier":
			opts.CompactionTableSizeMultiplier, _ = strconv.ParseFloat(v[0], 64)
		case "compactiontablesizemultiplierperlevel":
			for _, val := range v {
				f, _ := strconv.ParseFloat(val, 64)
				opts.CompactionTableSizeMultiplierPerLevel = append(opts.CompactionTableSizeMultiplierPerLevel, f)
			}
		case "compactiontotalsize":
			opts.CompactionTotalSize, _ = strconv.Atoi(v[0])
		case "compactiontotalsizemultiplier":
			opts.CompactionTotalSizeMultiplier, _ = strconv.ParseFloat(v[0], 64)
		case "compactiontotalsizemultiplierperlevel":
			for _, val := range v {
				f, _ := strconv.ParseFloat(val, 64)
				opts.CompactionTotalSizeMultiplierPerLevel = append(opts.CompactionTotalSizeMultiplierPerLevel, f)
			}
		case "compression":
			val, _ := strconv.Atoi(v[0])
			opts.Compression = opt.Compression(val)
		case "disablebufferpool":
			opts.DisableBufferPool, _ = strconv.ParseBool(v[0])
		case "disableblockcache":
			opts.DisableBlockCache, _ = strconv.ParseBool(v[0])
		case "disablecompactionbackoff":
			opts.DisableCompactionBackoff, _ = strconv.ParseBool(v[0])
		case "disablelargebatchtransaction":
			opts.DisableLargeBatchTransaction, _ = strconv.ParseBool(v[0])
		case "errorifexist":
			opts.ErrorIfExist, _ = strconv.ParseBool(v[0])
		case "errorifmissing":
			opts.ErrorIfMissing, _ = strconv.ParseBool(v[0])
		case "iteratorsamplingrate":
			opts.IteratorSamplingRate, _ = strconv.Atoi(v[0])
		case "nosync":
			opts.NoSync, _ = strconv.ParseBool(v[0])
		case "nowritemerge":
			opts.NoWriteMerge, _ = strconv.ParseBool(v[0])
		case "openfilescachecapacity":
			opts.OpenFilesCacheCapacity, _ = strconv.Atoi(v[0])
		case "readonly":
			opts.ReadOnly, _ = strconv.ParseBool(v[0])
		case "strict":
			val, _ := strconv.Atoi(v[0])
			opts.Strict = opt.Strict(val)
		case "writebuffer":
			opts.WriteBuffer, _ = strconv.Atoi(v[0])
		case "writel0pausetrigger":
			opts.WriteL0PauseTrigger, _ = strconv.Atoi(v[0])
		case "writel0slowdowntrigger":
			opts.WriteL0SlowdownTrigger, _ = strconv.Atoi(v[0])
		case "clientname":
			db.name = append(db.name, v[0])
		}
	}

	var err error
	db.db, err = leveldb.OpenFile(dataDir, opts)
	if err != nil {
		if !errors.IsCorrupted(err) {
			return nil, err
		}
		db.db, err = leveldb.RecoverFile(dataDir, opts)
		if err != nil {
			return nil, err
		}
	}

	for _, name := range db.name {
		m.LevelDBConnections[name] = db
	}
	db.count++
	return db.db, nil
}
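
Because the switch above matches keys lower-cased with "_" and "-" stripped (via the shared replacer), goleveldb's opt.Options can be tuned straight from the connection string. A hypothetical example; the path and option values are illustrative:

package main

import (
	"log"

	"code.gitea.io/gitea/modules/nosql"
)

func main() {
	conn := "leveldb:///var/lib/gitea/queues/common?write_buffer=8388608&no-sync=true"
	db, err := nosql.GetManager().GetLevelDB(conn)
	if err != nil {
		log.Fatal(err)
	}
	// Release through the manager so the shared reference count is honoured;
	// closing db directly would pull it out from under any other holder.
	defer nosql.GetManager().CloseLevelDB(conn)

	_ = db // use the *leveldb.DB as usual
}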
modules/nosql/manager_redis.go
@@ -0,0 +1,205 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package nosql

import (
	"crypto/tls"
	"path"
	"strconv"
	"strings"

	"github.com/go-redis/redis/v7"
)

var replacer = strings.NewReplacer("_", "", "-", "")

// CloseRedisClient closes a redis client
func (m *Manager) CloseRedisClient(connection string) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	client, ok := m.RedisConnections[connection]
	if !ok {
		connection = ToRedisURI(connection).String()
		client, ok = m.RedisConnections[connection]
	}
	if !ok {
		return nil
	}

	client.count--
	if client.count > 0 {
		return nil
	}

	for _, name := range client.name {
		delete(m.RedisConnections, name)
	}
	return client.UniversalClient.Close()
}

// GetRedisClient gets a redis client for a particular connection
func (m *Manager) GetRedisClient(connection string) redis.UniversalClient {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	client, ok := m.RedisConnections[connection]
	if ok {
		client.count++
		return client
	}

	uri := ToRedisURI(connection)
	client, ok = m.RedisConnections[uri.String()]
	if ok {
		client.count++
		return client
	}
	client = &redisClientHolder{
		name: []string{connection, uri.String()},
	}

	opts := &redis.UniversalOptions{}
	tlsConfig := &tls.Config{}

	// Handle username/password
	if password, ok := uri.User.Password(); ok {
		opts.Password = password
		// Username does not appear to be handled by redis.Options
		opts.Username = uri.User.Username()
	} else if uri.User.Username() != "" {
		// assume this is the password
		opts.Password = uri.User.Username()
	}

	// Now handle the uri query sets
	for k, v := range uri.Query() {
		switch replacer.Replace(strings.ToLower(k)) {
		case "addr":
			opts.Addrs = append(opts.Addrs, v...)
		case "addrs":
			opts.Addrs = append(opts.Addrs, strings.Split(v[0], ",")...)
		case "username":
			opts.Username = v[0]
		case "password":
			opts.Password = v[0]
		case "database":
			fallthrough
		case "db":
			opts.DB, _ = strconv.Atoi(v[0])
		case "maxretries":
			opts.MaxRetries, _ = strconv.Atoi(v[0])
		case "minretrybackoff":
			opts.MinRetryBackoff = valToTimeDuration(v)
		case "maxretrybackoff":
			opts.MaxRetryBackoff = valToTimeDuration(v)
		case "timeout":
			timeout := valToTimeDuration(v)
			if timeout != 0 {
				if opts.DialTimeout == 0 {
					opts.DialTimeout = timeout
				}
				if opts.ReadTimeout == 0 {
					opts.ReadTimeout = timeout
				}
			}
		case "dialtimeout":
			opts.DialTimeout = valToTimeDuration(v)
		case "readtimeout":
			opts.ReadTimeout = valToTimeDuration(v)
		case "writetimeout":
			opts.WriteTimeout = valToTimeDuration(v)
		case "poolsize":
			opts.PoolSize, _ = strconv.Atoi(v[0])
		case "minidleconns":
			opts.MinIdleConns, _ = strconv.Atoi(v[0])
		case "pooltimeout":
			opts.PoolTimeout = valToTimeDuration(v)
		case "idletimeout":
			opts.IdleTimeout = valToTimeDuration(v)
		case "idlecheckfrequency":
			opts.IdleCheckFrequency = valToTimeDuration(v)
		case "maxredirects":
			opts.MaxRedirects, _ = strconv.Atoi(v[0])
		case "readonly":
			opts.ReadOnly, _ = strconv.ParseBool(v[0])
		case "routebylatency":
			opts.RouteByLatency, _ = strconv.ParseBool(v[0])
		case "routerandomly":
			opts.RouteRandomly, _ = strconv.ParseBool(v[0])
		case "sentinelmasterid":
			fallthrough
		case "mastername":
			opts.MasterName = v[0]
		case "skipverify":
			fallthrough
		case "insecureskipverify":
			insecureSkipVerify, _ := strconv.ParseBool(v[0])
			tlsConfig.InsecureSkipVerify = insecureSkipVerify
		case "clientname":
			client.name = append(client.name, v[0])
		}
	}

	switch uri.Scheme {
	case "redis+sentinels":
		fallthrough
	case "rediss+sentinel":
		opts.TLSConfig = tlsConfig
		fallthrough
	case "redis+sentinel":
		if uri.Host != "" {
			opts.Addrs = append(opts.Addrs, strings.Split(uri.Host, ",")...)
		}
		if uri.Path != "" {
			if db, err := strconv.Atoi(uri.Path); err == nil {
				opts.DB = db
			}
		}

		client.UniversalClient = redis.NewFailoverClient(opts.Failover())
	case "redis+clusters":
		fallthrough
	case "rediss+cluster":
		opts.TLSConfig = tlsConfig
		fallthrough
	case "redis+cluster":
		if uri.Host != "" {
			opts.Addrs = append(opts.Addrs, strings.Split(uri.Host, ",")...)
		}
		if uri.Path != "" {
			if db, err := strconv.Atoi(uri.Path); err == nil {
				opts.DB = db
			}
		}
		client.UniversalClient = redis.NewClusterClient(opts.Cluster())
	case "redis+socket":
		simpleOpts := opts.Simple()
		simpleOpts.Network = "unix"
		simpleOpts.Addr = path.Join(uri.Host, uri.Path)
		client.UniversalClient = redis.NewClient(simpleOpts)
	case "rediss":
		opts.TLSConfig = tlsConfig
		fallthrough
	case "redis":
		if uri.Host != "" {
			opts.Addrs = append(opts.Addrs, strings.Split(uri.Host, ",")...)
		}
		if uri.Path != "" {
			if db, err := strconv.Atoi(uri.Path); err == nil {
				opts.DB = db
			}
		}
		client.UniversalClient = redis.NewClient(opts.Simple())
	default:
		return nil
	}

	for _, name := range client.name {
		m.RedisConnections[name] = client
	}

	client.count++

	return client
}
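
Taken together, the scheme switch above accepts connection strings like the following. The hosts and names are hypothetical examples, not from the diff:

redis://:password@127.0.0.1:6379/0?pool_size=100&idle_timeout=180s   -> plain client
rediss://redis.example.com:6380/0?skipverify=true                    -> plain client over TLS
redis+sentinel://:password@s1:26379,s2:26379/0?mastername=mymaster   -> failover client
redis+cluster://c1:6379,c2:6379,c3:6379/0                            -> cluster client
redis+socket:///var/run/redis/redis.sock?db=0                        -> client over a unix socket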
modules/nosql/redis.go
@@ -0,0 +1,102 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package nosql

import (
	"net/url"
	"strconv"
	"strings"
)

// This file contains common redis connection functions

// ToRedisURI converts old style connections to a RedisURI
//
// A RedisURI matches the pattern:
//
// redis://[username:password@]host[:port][/database][?[option=value]*]
// rediss://[username:password@]host[:port][/database][?[option=value]*]
// redis+socket://[username:password@]path[/database][?[option=value]*]
// redis+sentinel://[password@]host1[:port1][,host2[:port2]][,hostN[:portN]][/database][?[option=value]*]
// redis+cluster://[password@]host1[:port1][,host2[:port2]][,hostN[:portN]][/database][?[option=value]*]
//
// We have previously used a URI like:
// addrs=127.0.0.1:6379 db=0
// network=tcp,addr=127.0.0.1:6379,password=macaron,db=0,pool_size=100,idle_timeout=180
//
// We need to convert this old style to the new style
func ToRedisURI(connection string) *url.URL {
	uri, err := url.Parse(connection)
	if err == nil && strings.HasPrefix(uri.Scheme, "redis") {
		// OK we're going to assume that this is a reasonable redis URI
		return uri
	}

	// Let's set a nice default
	uri, _ = url.Parse("redis://127.0.0.1:6379/0")
	network := "tcp"
	query := uri.Query()

	// OK so there are two types: Space delimited and Comma delimited
	// Let's assume that we have a space delimited string - as this is the most common
	fields := strings.Fields(connection)
	if len(fields) == 1 {
		// It's a comma delimited string, then...
		fields = strings.Split(connection, ",")
	}
	for _, f := range fields {
		items := strings.SplitN(f, "=", 2)
		if len(items) < 2 {
			continue
		}
		switch strings.ToLower(items[0]) {
		case "network":
			if items[1] == "unix" {
				uri.Scheme = "redis+socket"
			}
			network = items[1]
		case "addrs":
			uri.Host = items[1]
			// now we need to handle the clustering
			if strings.Contains(items[1], ",") && network == "tcp" {
				uri.Scheme = "redis+cluster"
			}
		case "addr":
			uri.Host = items[1]
		case "password":
			uri.User = url.UserPassword(uri.User.Username(), items[1])
		case "username":
			password, set := uri.User.Password()
			if !set {
				uri.User = url.User(items[1])
			} else {
				uri.User = url.UserPassword(items[1], password)
			}
		case "db":
			uri.Path = "/" + items[1]
		case "idle_timeout":
			_, err := strconv.Atoi(items[1])
			if err == nil {
				query.Add("idle_timeout", items[1]+"s")
			} else {
				query.Add("idle_timeout", items[1])
			}
		default:
			// Other options become query params
			query.Add(items[0], items[1])
		}
	}

	// Finally we need to fix up the Host if we have a unix socket
	if uri.Scheme == "redis+socket" {
		query.Set("db", uri.Path)
		uri.Path = uri.Host
		uri.Host = ""
	}
	uri.RawQuery = query.Encode()

	return uri
}
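
For example (my own trace of the code above, not from the diff), an old unix-socket string converts with the path moved from Host to Path and the db folded into the query:

package main

import (
	"fmt"

	"code.gitea.io/gitea/modules/nosql"
)

func main() {
	uri := nosql.ToRedisURI("network=unix,addr=/tmp/redis.sock,db=0")
	fmt.Println(uri.String()) // redis+socket:///tmp/redis.sock?db=%2F0
}

Note that the db value carries its leading slash through the fix-up, so it appears URL-encoded in the query.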
modules/nosql/redis_test.go
@@ -0,0 +1,35 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package nosql

import (
	"testing"
)

func TestToRedisURI(t *testing.T) {
	tests := []struct {
		name       string
		connection string
		want       string
	}{
		{
			name:       "old_default",
			connection: "addrs=127.0.0.1:6379 db=0",
			want:       "redis://127.0.0.1:6379/0",
		},
		{
			name:       "old_macaron_session_default",
			connection: "network=tcp,addr=127.0.0.1:6379,password=macaron,db=0,pool_size=100,idle_timeout=180",
			want:       "redis://:macaron@127.0.0.1:6379/0?idle_timeout=180s&pool_size=100",
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := ToRedisURI(tt.connection); got == nil || got.String() != tt.want {
				t.Errorf(`ToRedisURI(%q) = %s, want %s`, tt.connection, got.String(), tt.want)
			}
		})
	}
}
@@ -1 +0,0 @@
ignore
@@ -1 +0,0 @@
ignore
vendor/github.com/go-redis/redis/.travis.yml
@@ -1,19 +0,0 @@
sudo: false
language: go

services:
  - redis-server

go:
  - 1.9.x
  - 1.10.x
  - 1.11.x
  - tip

matrix:
  allow_failures:
    - go: tip

install:
  - go get github.com/onsi/ginkgo
  - go get github.com/onsi/gomega
vendor/github.com/go-redis/redis/CHANGELOG.md
@@ -1,25 +0,0 @@
# Changelog

## Unreleased

- Cluster and Ring pipelines process commands for each node in its own goroutine.

## v6.14

- Added Options.MinIdleConns.
- Added Options.MaxConnAge.
- PoolStats.FreeConns is renamed to PoolStats.IdleConns.
- Add Client.Do to simplify creating custom commands.
- Add Cmd.String, Cmd.Int, Cmd.Int64, Cmd.Uint64, Cmd.Float64, and Cmd.Bool helpers.
- Lower memory usage.

## v6.13

- Ring got new options called `HashReplicas` and `Hash`. It is recommended to set `HashReplicas = 1000` for better keys distribution between shards.
- Cluster client was optimized to use much less memory when reloading cluster state.
- PubSub.ReceiveMessage is re-worked to not use ReceiveTimeout so it does not lose data when a timeout occurs. In most cases it is recommended to use PubSub.Channel instead.
- Dialer.KeepAlive is set to 5 minutes by default.

## v6.12

- ClusterClient got a new option called `ClusterSlots` which allows building a cluster of normal Redis Servers that don't have cluster mode enabled. See https://godoc.org/github.com/go-redis/redis#example-NewClusterClient--ManualSetup
vendor/github.com/go-redis/redis/internal/error.go
@@ -1,89 +0,0 @@
package internal

import (
	"io"
	"net"
	"strings"

	"github.com/go-redis/redis/internal/proto"
)

func IsRetryableError(err error, retryTimeout bool) bool {
	if err == nil {
		return false
	}
	if err == io.EOF {
		return true
	}
	if netErr, ok := err.(net.Error); ok {
		if netErr.Timeout() {
			return retryTimeout
		}
		return true
	}
	s := err.Error()
	if s == "ERR max number of clients reached" {
		return true
	}
	if strings.HasPrefix(s, "LOADING ") {
		return true
	}
	if strings.HasPrefix(s, "READONLY ") {
		return true
	}
	if strings.HasPrefix(s, "CLUSTERDOWN ") {
		return true
	}
	return false
}

func IsRedisError(err error) bool {
	_, ok := err.(proto.RedisError)
	return ok
}

func IsBadConn(err error, allowTimeout bool) bool {
	if err == nil {
		return false
	}
	if IsRedisError(err) {
		// #790
		return IsReadOnlyError(err)
	}
	if allowTimeout {
		if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
			return false
		}
	}
	return true
}

func IsMovedError(err error) (moved bool, ask bool, addr string) {
	if !IsRedisError(err) {
		return
	}

	s := err.Error()
	if strings.HasPrefix(s, "MOVED ") {
		moved = true
	} else if strings.HasPrefix(s, "ASK ") {
		ask = true
	} else {
		return
	}

	ind := strings.LastIndex(s, " ")
	if ind == -1 {
		return false, false, ""
	}
	addr = s[ind+1:]
	return
}

func IsLoadingError(err error) bool {
	return strings.HasPrefix(err.Error(), "LOADING ")
}

func IsReadOnlyError(err error) bool {
	return strings.HasPrefix(err.Error(), "READONLY ")
}
vendor/github.com/go-redis/redis/internal/log.go
@@ -1,15 +0,0 @@
package internal

import (
	"fmt"
	"log"
)

var Logger *log.Logger

func Logf(s string, args ...interface{}) {
	if Logger == nil {
		return
	}
	Logger.Output(2, fmt.Sprintf(s, args...))
}
vendor/github.com/go-redis/redis/internal/pool/conn.go
@@ -1,93 +0,0 @@
package pool

import (
	"net"
	"sync/atomic"
	"time"

	"github.com/go-redis/redis/internal/proto"
)

var noDeadline = time.Time{}

type Conn struct {
	netConn net.Conn

	rd       *proto.Reader
	rdLocked bool
	wr       *proto.Writer

	InitedAt time.Time
	pooled   bool
	usedAt   atomic.Value
}

func NewConn(netConn net.Conn) *Conn {
	cn := &Conn{
		netConn: netConn,
	}
	cn.rd = proto.NewReader(netConn)
	cn.wr = proto.NewWriter(netConn)
	cn.SetUsedAt(time.Now())
	return cn
}

func (cn *Conn) UsedAt() time.Time {
	return cn.usedAt.Load().(time.Time)
}

func (cn *Conn) SetUsedAt(tm time.Time) {
	cn.usedAt.Store(tm)
}

func (cn *Conn) SetNetConn(netConn net.Conn) {
	cn.netConn = netConn
	cn.rd.Reset(netConn)
	cn.wr.Reset(netConn)
}

func (cn *Conn) setReadTimeout(timeout time.Duration) error {
	now := time.Now()
	cn.SetUsedAt(now)
	if timeout > 0 {
		return cn.netConn.SetReadDeadline(now.Add(timeout))
	}
	return cn.netConn.SetReadDeadline(noDeadline)
}

func (cn *Conn) setWriteTimeout(timeout time.Duration) error {
	now := time.Now()
	cn.SetUsedAt(now)
	if timeout > 0 {
		return cn.netConn.SetWriteDeadline(now.Add(timeout))
	}
	return cn.netConn.SetWriteDeadline(noDeadline)
}

func (cn *Conn) Write(b []byte) (int, error) {
	return cn.netConn.Write(b)
}

func (cn *Conn) RemoteAddr() net.Addr {
	return cn.netConn.RemoteAddr()
}

func (cn *Conn) WithReader(timeout time.Duration, fn func(rd *proto.Reader) error) error {
	_ = cn.setReadTimeout(timeout)
	return fn(cn.rd)
}

func (cn *Conn) WithWriter(timeout time.Duration, fn func(wr *proto.Writer) error) error {
	_ = cn.setWriteTimeout(timeout)

	firstErr := fn(cn.wr)
	err := cn.wr.Flush()
	if err != nil && firstErr == nil {
		firstErr = err
	}
	return firstErr
}

func (cn *Conn) Close() error {
	return cn.netConn.Close()
}
vendor/github.com/go-redis/redis/internal/pool/pool_single.go
@@ -1,53 +0,0 @@
package pool

type SingleConnPool struct {
	cn *Conn
}

var _ Pooler = (*SingleConnPool)(nil)

func NewSingleConnPool(cn *Conn) *SingleConnPool {
	return &SingleConnPool{
		cn: cn,
	}
}

func (p *SingleConnPool) NewConn() (*Conn, error) {
	panic("not implemented")
}

func (p *SingleConnPool) CloseConn(*Conn) error {
	panic("not implemented")
}

func (p *SingleConnPool) Get() (*Conn, error) {
	return p.cn, nil
}

func (p *SingleConnPool) Put(cn *Conn) {
	if p.cn != cn {
		panic("p.cn != cn")
	}
}

func (p *SingleConnPool) Remove(cn *Conn) {
	if p.cn != cn {
		panic("p.cn != cn")
	}
}

func (p *SingleConnPool) Len() int {
	return 1
}

func (p *SingleConnPool) IdleLen() int {
	return 0
}

func (p *SingleConnPool) Stats() *Stats {
	return nil
}

func (p *SingleConnPool) Close() error {
	return nil
}
vendor/github.com/go-redis/redis/internal/util.go
@@ -1,29 +0,0 @@
package internal

import "github.com/go-redis/redis/internal/util"

func ToLower(s string) string {
	if isLower(s) {
		return s
	}

	b := make([]byte, len(s))
	for i := range b {
		c := s[i]
		if c >= 'A' && c <= 'Z' {
			c += 'a' - 'A'
		}
		b[i] = c
	}
	return util.BytesToString(b)
}

func isLower(s string) bool {
	for i := 0; i < len(s); i++ {
		c := s[i]
		if c >= 'A' && c <= 'Z' {
			return false
		}
	}
	return true
}
vendor/github.com/go-redis/redis/redis.go
@@ -1,580 +0,0 @@
package redis

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/go-redis/redis/internal"
	"github.com/go-redis/redis/internal/pool"
	"github.com/go-redis/redis/internal/proto"
)

// Nil reply Redis returns when key does not exist.
const Nil = proto.Nil

func init() {
	SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile))
}

func SetLogger(logger *log.Logger) {
	internal.Logger = logger
}

type baseClient struct {
	opt      *Options
	connPool pool.Pooler
	limiter  Limiter

	process           func(Cmder) error
	processPipeline   func([]Cmder) error
	processTxPipeline func([]Cmder) error

	onClose func() error // hook called when client is closed
}

func (c *baseClient) init() {
	c.process = c.defaultProcess
	c.processPipeline = c.defaultProcessPipeline
	c.processTxPipeline = c.defaultProcessTxPipeline
}

func (c *baseClient) String() string {
	return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB)
}

func (c *baseClient) newConn() (*pool.Conn, error) {
	cn, err := c.connPool.NewConn()
	if err != nil {
		return nil, err
	}

	if cn.InitedAt.IsZero() {
		if err := c.initConn(cn); err != nil {
			_ = c.connPool.CloseConn(cn)
			return nil, err
		}
	}

	return cn, nil
}

func (c *baseClient) getConn() (*pool.Conn, error) {
	if c.limiter != nil {
		err := c.limiter.Allow()
		if err != nil {
			return nil, err
		}
	}

	cn, err := c._getConn()
	if err != nil {
		if c.limiter != nil {
			c.limiter.ReportResult(err)
		}
		return nil, err
	}
	return cn, nil
}

func (c *baseClient) _getConn() (*pool.Conn, error) {
	cn, err := c.connPool.Get()
	if err != nil {
		return nil, err
	}

	if cn.InitedAt.IsZero() {
		err := c.initConn(cn)
		if err != nil {
			c.connPool.Remove(cn)
			return nil, err
		}
	}

	return cn, nil
}

func (c *baseClient) releaseConn(cn *pool.Conn, err error) {
	if c.limiter != nil {
		c.limiter.ReportResult(err)
	}

	if internal.IsBadConn(err, false) {
		c.connPool.Remove(cn)
	} else {
		c.connPool.Put(cn)
	}
}

func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) {
	if c.limiter != nil {
		c.limiter.ReportResult(err)
	}

	if err == nil || internal.IsRedisError(err) {
		c.connPool.Put(cn)
	} else {
		c.connPool.Remove(cn)
	}
}

func (c *baseClient) initConn(cn *pool.Conn) error {
	cn.InitedAt = time.Now()

	if c.opt.Password == "" &&
		c.opt.DB == 0 &&
		!c.opt.readOnly &&
		c.opt.OnConnect == nil {
		return nil
	}

	conn := newConn(c.opt, cn)
	_, err := conn.Pipelined(func(pipe Pipeliner) error {
		if c.opt.Password != "" {
			pipe.Auth(c.opt.Password)
		}

		if c.opt.DB > 0 {
			pipe.Select(c.opt.DB)
		}

		if c.opt.readOnly {
			pipe.ReadOnly()
		}

		return nil
	})
	if err != nil {
		return err
	}

	if c.opt.OnConnect != nil {
		return c.opt.OnConnect(conn)
	}
	return nil
}

// Do creates a Cmd from the args and processes the cmd.
func (c *baseClient) Do(args ...interface{}) *Cmd {
	cmd := NewCmd(args...)
	_ = c.Process(cmd)
	return cmd
}

// WrapProcess wraps function that processes Redis commands.
func (c *baseClient) WrapProcess(
	fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error,
) {
	c.process = fn(c.process)
}

func (c *baseClient) Process(cmd Cmder) error {
	return c.process(cmd)
}

func (c *baseClient) defaultProcess(cmd Cmder) error {
	for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
		if attempt > 0 {
			time.Sleep(c.retryBackoff(attempt))
		}

		cn, err := c.getConn()
		if err != nil {
			cmd.setErr(err)
			if internal.IsRetryableError(err, true) {
				continue
			}
			return err
		}

		err = cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
			return writeCmd(wr, cmd)
		})
		if err != nil {
			c.releaseConn(cn, err)
			cmd.setErr(err)
			if internal.IsRetryableError(err, true) {
				continue
			}
			return err
		}

		err = cn.WithReader(c.cmdTimeout(cmd), func(rd *proto.Reader) error {
			return cmd.readReply(rd)
		})
		c.releaseConn(cn, err)
		if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) {
			continue
		}

		return err
	}

	return cmd.Err()
}

func (c *baseClient) retryBackoff(attempt int) time.Duration {
	return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}

func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
	if timeout := cmd.readTimeout(); timeout != nil {
		t := *timeout
		if t == 0 {
			return 0
		}
		return t + 10*time.Second
	}
	return c.opt.ReadTimeout
}

// Close closes the client, releasing any open resources.
//
// It is rare to Close a Client, as the Client is meant to be
// long-lived and shared between many goroutines.
func (c *baseClient) Close() error {
	var firstErr error
	if c.onClose != nil {
		if err := c.onClose(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	if err := c.connPool.Close(); err != nil && firstErr == nil {
		firstErr = err
	}
	return firstErr
}

func (c *baseClient) getAddr() string {
	return c.opt.Addr
}

func (c *baseClient) WrapProcessPipeline(
	fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
) {
	c.processPipeline = fn(c.processPipeline)
	c.processTxPipeline = fn(c.processTxPipeline)
}

func (c *baseClient) defaultProcessPipeline(cmds []Cmder) error {
	return c.generalProcessPipeline(cmds, c.pipelineProcessCmds)
}

func (c *baseClient) defaultProcessTxPipeline(cmds []Cmder) error {
	return c.generalProcessPipeline(cmds, c.txPipelineProcessCmds)
}

type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error)

func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) error {
	for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
		if attempt > 0 {
			time.Sleep(c.retryBackoff(attempt))
		}

		cn, err := c.getConn()
		if err != nil {
			setCmdsErr(cmds, err)
			return err
		}

		canRetry, err := p(cn, cmds)
		c.releaseConnStrict(cn, err)

		if !canRetry || !internal.IsRetryableError(err, true) {
			break
		}
	}
	return cmdsFirstErr(cmds)
}

func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
	err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
		return writeCmd(wr, cmds...)
	})
	if err != nil {
		setCmdsErr(cmds, err)
		return true, err
	}

	err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
		return pipelineReadCmds(rd, cmds)
	})
	return true, err
}

func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
	for _, cmd := range cmds {
		err := cmd.readReply(rd)
		if err != nil && !internal.IsRedisError(err) {
			return err
		}
	}
	return nil
}

func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
	err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
		return txPipelineWriteMulti(wr, cmds)
	})
	if err != nil {
		setCmdsErr(cmds, err)
		return true, err
	}

	err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
		err := txPipelineReadQueued(rd, cmds)
		if err != nil {
			setCmdsErr(cmds, err)
			return err
		}
		return pipelineReadCmds(rd, cmds)
	})
	return false, err
}

func txPipelineWriteMulti(wr *proto.Writer, cmds []Cmder) error {
	multiExec := make([]Cmder, 0, len(cmds)+2)
	multiExec = append(multiExec, NewStatusCmd("MULTI"))
	multiExec = append(multiExec, cmds...)
	multiExec = append(multiExec, NewSliceCmd("EXEC"))
	return writeCmd(wr, multiExec...)
}

func txPipelineReadQueued(rd *proto.Reader, cmds []Cmder) error {
	// Parse queued replies.
	var statusCmd StatusCmd
	err := statusCmd.readReply(rd)
	if err != nil {
		return err
	}

	for range cmds {
		err = statusCmd.readReply(rd)
		if err != nil && !internal.IsRedisError(err) {
			return err
		}
	}

	// Parse number of replies.
	line, err := rd.ReadLine()
	if err != nil {
		if err == Nil {
			err = TxFailedErr
		}
		return err
	}

	switch line[0] {
	case proto.ErrorReply:
		return proto.ParseErrorReply(line)
	case proto.ArrayReply:
		// ok
	default:
		err := fmt.Errorf("redis: expected '*', but got line %q", line)
		return err
	}

	return nil
}

//------------------------------------------------------------------------------

// Client is a Redis client representing a pool of zero or more
// underlying connections. It's safe for concurrent use by multiple
// goroutines.
type Client struct {
	baseClient
	cmdable

	ctx context.Context
}

// NewClient returns a client to the Redis Server specified by Options.
func NewClient(opt *Options) *Client {
	opt.init()

	c := Client{
		baseClient: baseClient{
			opt:      opt,
			connPool: newConnPool(opt),
		},
	}
	c.baseClient.init()
	c.init()

	return &c
}

func (c *Client) init() {
	c.cmdable.setProcessor(c.Process)
}

func (c *Client) Context() context.Context {
	if c.ctx != nil {
		return c.ctx
	}
	return context.Background()
}

func (c *Client) WithContext(ctx context.Context) *Client {
	if ctx == nil {
		panic("nil context")
	}
	c2 := c.clone()
	c2.ctx = ctx
	return c2
}

func (c *Client) clone() *Client {
	cp := *c
	cp.init()
	return &cp
}

// Options returns read-only Options that were used to create the client.
func (c *Client) Options() *Options {
	return c.opt
}

func (c *Client) SetLimiter(l Limiter) *Client {
	c.limiter = l
	return c
}

type PoolStats pool.Stats

// PoolStats returns connection pool stats.
func (c *Client) PoolStats() *PoolStats {
	stats := c.connPool.Stats()
	return (*PoolStats)(stats)
}

func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
	return c.Pipeline().Pipelined(fn)
}

func (c *Client) Pipeline() Pipeliner {
	pipe := Pipeline{
		exec: c.processPipeline,
	}
	pipe.statefulCmdable.setProcessor(pipe.Process)
	return &pipe
}

func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
	return c.TxPipeline().Pipelined(fn)
}

// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *Client) TxPipeline() Pipeliner {
	pipe := Pipeline{
		exec: c.processTxPipeline,
	}
	pipe.statefulCmdable.setProcessor(pipe.Process)
	return &pipe
}

func (c *Client) pubSub() *PubSub {
	pubsub := &PubSub{
		opt: c.opt,

		newConn: func(channels []string) (*pool.Conn, error) {
			return c.newConn()
		},
		closeConn: c.connPool.CloseConn,
	}
	pubsub.init()
	return pubsub
}

// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create empty subscription.
// Note that this method does not wait on a response from Redis, so the
// subscription may not be active immediately. To force the connection to wait,
// you may call the Receive() method on the returned *PubSub like so:
//
//    sub := client.Subscribe(queryResp)
//    iface, err := sub.Receive()
//    if err != nil {
//        // handle error
//    }
//
//    // Should be *Subscription, but others are possible if other actions have been
//    // taken on sub since it was created.
//    switch iface.(type) {
//    case *Subscription:
//        // subscribe succeeded
//    case *Message:
//        // received first message
//    case *Pong:
//        // pong received
//    default:
//        // handle error
//    }
//
//    ch := sub.Channel()
func (c *Client) Subscribe(channels ...string) *PubSub {
	pubsub := c.pubSub()
	if len(channels) > 0 {
		_ = pubsub.Subscribe(channels...)
	}
	return pubsub
}

// PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create empty subscription.
func (c *Client) PSubscribe(channels ...string) *PubSub {
	pubsub := c.pubSub()
	if len(channels) > 0 {
		_ = pubsub.PSubscribe(channels...)
	}
	return pubsub
}

//------------------------------------------------------------------------------

// Conn is like Client, but its pool contains single connection.
type Conn struct {
	baseClient
	statefulCmdable
}

func newConn(opt *Options, cn *pool.Conn) *Conn {
	c := Conn{
		baseClient: baseClient{
			opt:      opt,
			connPool: pool.NewSingleConnPool(cn),
		},
	}
	c.baseClient.init()
	c.statefulCmdable.setProcessor(c.Process)
	return &c
}

func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
	return c.Pipeline().Pipelined(fn)
}

func (c *Conn) Pipeline() Pipeliner {
	pipe := Pipeline{
		exec: c.processPipeline,
	}
	pipe.statefulCmdable.setProcessor(pipe.Process)
	return &pipe
}

func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
	return c.TxPipeline().Pipelined(fn)
}

// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *Conn) TxPipeline() Pipeliner {
	pipe := Pipeline{
		exec: c.processTxPipeline,
	}
	pipe.statefulCmdable.setProcessor(pipe.Process)
	return &pipe
}
vendor/github.com/go-redis/redis/v7/.golangci.yml
@@ -0,0 +1,15 @@
run:
  concurrency: 8
  deadline: 5m
  tests: false
linters:
  enable-all: true
  disable:
    - funlen
    - gochecknoglobals
    - gocognit
    - goconst
    - godox
    - gosec
    - maligned
    - wsl
||||
dist: xenial |
||||
language: go |
||||
|
||||
services: |
||||
- redis-server |
||||
|
||||
go: |
||||
- 1.12.x |
||||
- 1.13.x |
||||
- tip |
||||
|
||||
matrix: |
||||
allow_failures: |
||||
- go: tip |
||||
|
||||
env: |
||||
- GO111MODULE=on |
||||
|
||||
go_import_path: github.com/go-redis/redis |
||||
|
||||
before_install: |
||||
- curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(go env GOPATH)/bin v1.21.0 |
vendor/github.com/go-redis/redis/v7/CHANGELOG.md
@@ -0,0 +1,46 @@
# Changelog

## v7.2

- Existing `HMSet` is renamed to `HSet` and old deprecated `HMSet` is restored for Redis 3 users.

## v7.1

- Existing `Cmd.String` is renamed to `Cmd.Text`. New `Cmd.String` implements `fmt.Stringer` interface.

## v7

- *Important*. Tx.Pipeline now returns a non-transactional pipeline. Use Tx.TxPipeline for a transactional pipeline.
- WrapProcess is replaced with a more convenient AddHook that has access to context.Context.
- WithContext can no longer be used to create a shallow copy of the client.
- New methods ProcessContext, DoContext, and ExecContext.
- Client respects Context.Deadline when setting net.Conn deadline.
- Client listens on Context.Done while waiting for a connection from the pool and returns an error when the context is cancelled.
- Add PubSub.ChannelWithSubscriptions that sends `*Subscription` in addition to `*Message` to allow detecting reconnections.
- `time.Time` is now marshalled in RFC3339 format. `rdb.Get("foo").Time()` helper is added to parse the time.
- `SetLimiter` is removed and `Options.Limiter` is added instead.
- `HMSet` is deprecated as of Redis v4.

## v6.15

- Cluster and Ring pipelines process commands for each node in its own goroutine.

## v6.14

- Added Options.MinIdleConns.
- Added Options.MaxConnAge.
- PoolStats.FreeConns is renamed to PoolStats.IdleConns.
- Add Client.Do to simplify creating custom commands.
- Add Cmd.String, Cmd.Int, Cmd.Int64, Cmd.Uint64, Cmd.Float64, and Cmd.Bool helpers.
- Lower memory usage.

## v6.13

- Ring got new options called `HashReplicas` and `Hash`. It is recommended to set `HashReplicas = 1000` for better keys distribution between shards.
- Cluster client was optimized to use much less memory when reloading cluster state.
- PubSub.ReceiveMessage is re-worked to not use ReceiveTimeout so it does not lose data when a timeout occurs. In most cases it is recommended to use PubSub.Channel instead.
- Dialer.KeepAlive is set to 5 minutes by default.

## v6.12

- ClusterClient got a new option called `ClusterSlots` which allows building a cluster of normal Redis Servers that don't have cluster mode enabled. See https://godoc.org/github.com/go-redis/redis#example-NewClusterClient--ManualSetup
832 changes:  vendor/github.com/go-redis/redis/cluster.go → vendor/github.com/go-redis/redis/v7/cluster.go (generated, vendored; diff suppressed because it is too large)
826 changes:  vendor/github.com/go-redis/redis/command.go → vendor/github.com/go-redis/redis/v7/command.go (generated, vendored; diff suppressed because it is too large)
1196 changes: vendor/github.com/go-redis/redis/commands.go → vendor/github.com/go-redis/redis/v7/commands.go (generated, vendored; diff suppressed because it is too large)
0 changes:    vendor/github.com/go-redis/redis/doc.go → vendor/github.com/go-redis/redis/v7/doc.go (generated, vendored)
vendor/github.com/go-redis/redis/v7/error.go
@@ -0,0 +1,108 @@
package redis

import (
	"context"
	"io"
	"net"
	"strings"

	"github.com/go-redis/redis/v7/internal/pool"
	"github.com/go-redis/redis/v7/internal/proto"
)

var ErrClosed = pool.ErrClosed

type Error interface {
	error

	// RedisError is a no-op function but
	// serves to distinguish types that are Redis
	// errors from ordinary errors: a type is a
	// Redis error if it has a RedisError method.
	RedisError()
}

var _ Error = proto.RedisError("")

func isRetryableError(err error, retryTimeout bool) bool {
	switch err {
	case nil, context.Canceled, context.DeadlineExceeded:
		return false
	case io.EOF:
		return true
	}
	if netErr, ok := err.(net.Error); ok {
		if netErr.Timeout() {
			return retryTimeout
		}
		return true
	}

	s := err.Error()
	if s == "ERR max number of clients reached" {
		return true
	}
	if strings.HasPrefix(s, "LOADING ") {
		return true
	}
	if strings.HasPrefix(s, "READONLY ") {
		return true
	}
	if strings.HasPrefix(s, "CLUSTERDOWN ") {
		return true
	}
	return false
}

func isRedisError(err error) bool {
	_, ok := err.(proto.RedisError)
	return ok
}

func isBadConn(err error, allowTimeout bool) bool {
	if err == nil {
		return false
	}
	if isRedisError(err) {
		// Close connections in read only state in case domain addr is used
		// and domain resolves to a different Redis Server. See #790.
		return isReadOnlyError(err)
	}
	if allowTimeout {
		if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
			return false
		}
	}
	return true
}

func isMovedError(err error) (moved bool, ask bool, addr string) {
	if !isRedisError(err) {
		return
	}

	s := err.Error()
	switch {
	case strings.HasPrefix(s, "MOVED "):
		moved = true
	case strings.HasPrefix(s, "ASK "):
		ask = true
	default:
		return
	}

	ind := strings.LastIndex(s, " ")
	if ind == -1 {
		return false, false, ""
	}
	addr = s[ind+1:]
	return
}

func isLoadingError(err error) bool {
	return strings.HasPrefix(err.Error(), "LOADING ")
}

func isReadOnlyError(err error) bool {
	return strings.HasPrefix(err.Error(), "READONLY ")
}
vendor/github.com/go-redis/redis/v7/go.mod
@@ -0,0 +1,15 @@
module github.com/go-redis/redis/v7

require (
	github.com/golang/protobuf v1.3.2 // indirect
	github.com/kr/pretty v0.1.0 // indirect
	github.com/onsi/ginkgo v1.10.1
	github.com/onsi/gomega v1.7.0
	golang.org/x/net v0.0.0-20190923162816-aa69164e4478 // indirect
	golang.org/x/sys v0.0.0-20191010194322-b09406accb47 // indirect
	golang.org/x/text v0.3.2 // indirect
	gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
	gopkg.in/yaml.v2 v2.2.4 // indirect
)

go 1.11
vendor/github.com/go-redis/redis/v7/go.sum
@@ -0,0 +1,47 @@
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo=
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20191010194322-b09406accb47 h1:/XfQ9z7ib8eEJX2hdgFTZJ/ntt0swNk5oYBziWeTCvY=
golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
vendor/github.com/go-redis/redis/v7/internal/log.go
@@ -0,0 +1,8 @@
package internal

import (
	"log"
	"os"
)

var Logger = log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile)
@ -0,0 +1,118 @@
package pool

import (
	"context"
	"net"
	"sync/atomic"
	"time"

	"github.com/go-redis/redis/v7/internal/proto"
)

var noDeadline = time.Time{}

type Conn struct {
	netConn net.Conn

	rd *proto.Reader
	wr *proto.Writer

	Inited    bool
	pooled    bool
	createdAt time.Time
	usedAt    int64 // atomic
}

func NewConn(netConn net.Conn) *Conn {
	cn := &Conn{
		netConn:   netConn,
		createdAt: time.Now(),
	}
	cn.rd = proto.NewReader(netConn)
	cn.wr = proto.NewWriter(netConn)
	cn.SetUsedAt(time.Now())
	return cn
}

func (cn *Conn) UsedAt() time.Time {
	unix := atomic.LoadInt64(&cn.usedAt)
	return time.Unix(unix, 0)
}

func (cn *Conn) SetUsedAt(tm time.Time) {
	atomic.StoreInt64(&cn.usedAt, tm.Unix())
}

func (cn *Conn) SetNetConn(netConn net.Conn) {
	cn.netConn = netConn
	cn.rd.Reset(netConn)
	cn.wr.Reset(netConn)
}

func (cn *Conn) Write(b []byte) (int, error) {
	return cn.netConn.Write(b)
}

func (cn *Conn) RemoteAddr() net.Addr {
	return cn.netConn.RemoteAddr()
}

func (cn *Conn) WithReader(ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error) error {
	err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout))
	if err != nil {
		return err
	}
	return fn(cn.rd)
}

func (cn *Conn) WithWriter(
	ctx context.Context, timeout time.Duration, fn func(wr *proto.Writer) error,
) error {
	err := cn.netConn.SetWriteDeadline(cn.deadline(ctx, timeout))
	if err != nil {
		return err
	}

	if cn.wr.Buffered() > 0 {
		cn.wr.Reset(cn.netConn)
	}

	err = fn(cn.wr)
	if err != nil {
		return err
	}

	return cn.wr.Flush()
}

func (cn *Conn) Close() error {
	return cn.netConn.Close()
}

func (cn *Conn) deadline(ctx context.Context, timeout time.Duration) time.Time {
	tm := time.Now()
	cn.SetUsedAt(tm)

	if timeout > 0 {
		tm = tm.Add(timeout)
	}

	if ctx != nil {
		deadline, ok := ctx.Deadline()
		if ok {
			if timeout == 0 {
				return deadline
			}
			if deadline.Before(tm) {
				return deadline
			}
			return tm
		}
	}

	if timeout > 0 {
		return tm
	}

	return noDeadline
}
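The deadline helper above picks whichever of the timeout-derived deadline and the context deadline expires first. A standalone sketch of the same selection rule, with illustrative names, runnable on its own:

package main

import (
	"context"
	"fmt"
	"time"
)

// earliestDeadline mirrors Conn.deadline: prefer whichever of the
// timeout-derived deadline and the context deadline comes first.
func earliestDeadline(ctx context.Context, timeout time.Duration) time.Time {
	tm := time.Now()
	if timeout > 0 {
		tm = tm.Add(timeout)
	}
	if deadline, ok := ctx.Deadline(); ok {
		if timeout == 0 || deadline.Before(tm) {
			return deadline
		}
		return tm
	}
	if timeout > 0 {
		return tm
	}
	return time.Time{} // no deadline at all
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	// With a 3s timeout but a 1s context, the context deadline wins.
	fmt.Println(earliestDeadline(ctx, 3*time.Second))
}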
@ -0,0 +1,208 @@
package pool

import (
	"context"
	"fmt"
	"sync/atomic"
)

const (
	stateDefault = 0
	stateInited  = 1
	stateClosed  = 2
)

type BadConnError struct {
	wrapped error
}

var _ error = (*BadConnError)(nil)

func (e BadConnError) Error() string {
	s := "redis: Conn is in a bad state"
	if e.wrapped != nil {
		s += ": " + e.wrapped.Error()
	}
	return s
}

func (e BadConnError) Unwrap() error {
	return e.wrapped
}

type SingleConnPool struct {
	pool  Pooler
	level int32 // atomic

	state uint32 // atomic
	ch    chan *Conn

	_badConnError atomic.Value
}

var _ Pooler = (*SingleConnPool)(nil)

func NewSingleConnPool(pool Pooler) *SingleConnPool {
	p, ok := pool.(*SingleConnPool)
	if !ok {
		p = &SingleConnPool{
			pool: pool,
			ch:   make(chan *Conn, 1),
		}
	}
	atomic.AddInt32(&p.level, 1)
	return p
}

func (p *SingleConnPool) SetConn(cn *Conn) {
	if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) {
		p.ch <- cn
	} else {
		panic("not reached")
	}
}

func (p *SingleConnPool) NewConn(ctx context.Context) (*Conn, error) {
	return p.pool.NewConn(ctx)
}

func (p *SingleConnPool) CloseConn(cn *Conn) error {
	return p.pool.CloseConn(cn)
}

func (p *SingleConnPool) Get(ctx context.Context) (*Conn, error) {
	// In worst case this races with Close which is not a very common operation.
	for i := 0; i < 1000; i++ {
		switch atomic.LoadUint32(&p.state) {
		case stateDefault:
			cn, err := p.pool.Get(ctx)
			if err != nil {
				return nil, err
			}
			if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) {
				return cn, nil
			}
			p.pool.Remove(cn, ErrClosed)
		case stateInited:
			if err := p.badConnError(); err != nil {
				return nil, err
			}
			cn, ok := <-p.ch
			if !ok {
				return nil, ErrClosed
			}
			return cn, nil
		case stateClosed:
			return nil, ErrClosed
		default:
			panic("not reached")
		}
	}
	return nil, fmt.Errorf("redis: SingleConnPool.Get: infinite loop")
}

func (p *SingleConnPool) Put(cn *Conn) {
	defer func() {
		if recover() != nil {
			p.freeConn(cn)
		}
	}()
	p.ch <- cn
}

func (p *SingleConnPool) freeConn(cn *Conn) {
	if err := p.badConnError(); err != nil {
		p.pool.Remove(cn, err)
	} else {
		p.pool.Put(cn)
	}
}

func (p *SingleConnPool) Remove(cn *Conn, reason error) {
	defer func() {
		if recover() != nil {
			p.pool.Remove(cn, ErrClosed)
		}
	}()
	p._badConnError.Store(BadConnError{wrapped: reason})
	p.ch <- cn
}

func (p *SingleConnPool) Len() int {
	switch atomic.LoadUint32(&p.state) {
	case stateDefault:
		return 0
	case stateInited:
		return 1
	case stateClosed:
		return 0
	default:
		panic("not reached")
	}
}

func (p *SingleConnPool) IdleLen() int {
	return len(p.ch)
}

func (p *SingleConnPool) Stats() *Stats {
	return &Stats{}
}

func (p *SingleConnPool) Close() error {
	level := atomic.AddInt32(&p.level, -1)
	if level > 0 {
		return nil
	}

	for i := 0; i < 1000; i++ {
		state := atomic.LoadUint32(&p.state)
		if state == stateClosed {
			return ErrClosed
		}
		if atomic.CompareAndSwapUint32(&p.state, state, stateClosed) {
			close(p.ch)
			cn, ok := <-p.ch
			if ok {
				p.freeConn(cn)
			}
			return nil
		}
	}

	return fmt.Errorf("redis: SingleConnPool.Close: infinite loop")
}

func (p *SingleConnPool) Reset() error {
	if p.badConnError() == nil {
		return nil
	}

	select {
	case cn, ok := <-p.ch:
		if !ok {
			return ErrClosed
		}
		p.pool.Remove(cn, ErrClosed)
		p._badConnError.Store(BadConnError{wrapped: nil})
	default:
		return fmt.Errorf("redis: SingleConnPool does not have a Conn")
	}

	if !atomic.CompareAndSwapUint32(&p.state, stateInited, stateDefault) {
		state := atomic.LoadUint32(&p.state)
		return fmt.Errorf("redis: invalid SingleConnPool state: %d", state)
	}

	return nil
}

func (p *SingleConnPool) badConnError() error {
	if v := p._badConnError.Load(); v != nil {
		err := v.(BadConnError)
		if err.wrapped != nil {
			return err
		}
	}
	return nil
}
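BadConnError implements Unwrap, so callers can reach the underlying cause with the standard errors helpers. A minimal sketch of why that matters, using a local mirror type and an illustrative wrapped error:

package main

import (
	"errors"
	"fmt"
	"io"
)

// badConnError mirrors the shape of pool.BadConnError for illustration.
type badConnError struct{ wrapped error }

func (e badConnError) Error() string {
	return "redis: Conn is in a bad state: " + e.wrapped.Error()
}
func (e badConnError) Unwrap() error { return e.wrapped }

func main() {
	err := badConnError{wrapped: io.EOF}
	// errors.Is walks the Unwrap chain, so the original cause stays visible.
	fmt.Println(errors.Is(err, io.EOF)) // true
}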
@ -0,0 +1,56 @@
package internal

import (
	"context"
	"time"

	"github.com/go-redis/redis/v7/internal/util"
)

func Sleep(ctx context.Context, dur time.Duration) error {
	t := time.NewTimer(dur)
	defer t.Stop()

	select {
	case <-t.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func ToLower(s string) string {
	if isLower(s) {
		return s
	}

	b := make([]byte, len(s))
	for i := range b {
		c := s[i]
		if c >= 'A' && c <= 'Z' {
			c += 'a' - 'A'
		}
		b[i] = c
	}
	return util.BytesToString(b)
}

func isLower(s string) bool {
	for i := 0; i < len(s); i++ {
		c := s[i]
		if c >= 'A' && c <= 'Z' {
			return false
		}
	}
	return true
}

func Unwrap(err error) error {
	u, ok := err.(interface {
		Unwrap() error
	})
	if !ok {
		return nil
	}
	return u.Unwrap()
}
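Sleep here is an interruptible alternative to time.Sleep: it returns early with ctx.Err() when the context is cancelled, which is what makes the retry loops later in this diff abort promptly. A self-contained sketch of the same pattern:

package main

import (
	"context"
	"fmt"
	"time"
)

// sleep mirrors internal.Sleep: block for dur, but abort if ctx ends first.
func sleep(ctx context.Context, dur time.Duration) error {
	t := time.NewTimer(dur)
	defer t.Stop()
	select {
	case <-t.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	// The 1s sleep is cut short by the 100ms context.
	fmt.Println(sleep(ctx, time.Second)) // context deadline exceeded
}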
260 vendor/github.com/go-redis/redis/pubsub.go → vendor/github.com/go-redis/redis/v7/pubsub.go (generated, vendored)
@ -0,0 +1,758 @@
package redis

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/go-redis/redis/v7/internal"
	"github.com/go-redis/redis/v7/internal/pool"
	"github.com/go-redis/redis/v7/internal/proto"
)

// Nil reply returned by Redis when key does not exist.
const Nil = proto.Nil

func SetLogger(logger *log.Logger) {
	internal.Logger = logger
}

//------------------------------------------------------------------------------

type Hook interface {
	BeforeProcess(ctx context.Context, cmd Cmder) (context.Context, error)
	AfterProcess(ctx context.Context, cmd Cmder) error

	BeforeProcessPipeline(ctx context.Context, cmds []Cmder) (context.Context, error)
	AfterProcessPipeline(ctx context.Context, cmds []Cmder) error
}

type hooks struct {
	hooks []Hook
}

func (hs *hooks) lock() {
	hs.hooks = hs.hooks[:len(hs.hooks):len(hs.hooks)]
}

func (hs hooks) clone() hooks {
	clone := hs
	clone.lock()
	return clone
}

func (hs *hooks) AddHook(hook Hook) {
	hs.hooks = append(hs.hooks, hook)
}

func (hs hooks) process(
	ctx context.Context, cmd Cmder, fn func(context.Context, Cmder) error,
) error {
	ctx, err := hs.beforeProcess(ctx, cmd)
	if err != nil {
		cmd.SetErr(err)
		return err
	}

	cmdErr := fn(ctx, cmd)

	if err := hs.afterProcess(ctx, cmd); err != nil {
		cmd.SetErr(err)
		return err
	}

	return cmdErr
}

func (hs hooks) beforeProcess(ctx context.Context, cmd Cmder) (context.Context, error) {
	for _, h := range hs.hooks {
		var err error
		ctx, err = h.BeforeProcess(ctx, cmd)
		if err != nil {
			return nil, err
		}
	}
	return ctx, nil
}

func (hs hooks) afterProcess(ctx context.Context, cmd Cmder) error {
	var firstErr error
	for _, h := range hs.hooks {
		err := h.AfterProcess(ctx, cmd)
		if err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func (hs hooks) processPipeline(
	ctx context.Context, cmds []Cmder, fn func(context.Context, []Cmder) error,
) error {
	ctx, err := hs.beforeProcessPipeline(ctx, cmds)
	if err != nil {
		setCmdsErr(cmds, err)
		return err
	}

	cmdsErr := fn(ctx, cmds)

	if err := hs.afterProcessPipeline(ctx, cmds); err != nil {
		setCmdsErr(cmds, err)
		return err
	}

	return cmdsErr
}

func (hs hooks) beforeProcessPipeline(ctx context.Context, cmds []Cmder) (context.Context, error) {
	for _, h := range hs.hooks {
		var err error
		ctx, err = h.BeforeProcessPipeline(ctx, cmds)
		if err != nil {
			return nil, err
		}
	}
	return ctx, nil
}

func (hs hooks) afterProcessPipeline(ctx context.Context, cmds []Cmder) error {
	var firstErr error
	for _, h := range hs.hooks {
		err := h.AfterProcessPipeline(ctx, cmds)
		if err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func (hs hooks) processTxPipeline(
	ctx context.Context, cmds []Cmder, fn func(context.Context, []Cmder) error,
) error {
	cmds = wrapMultiExec(cmds)
	return hs.processPipeline(ctx, cmds, fn)
}
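The Hook interface is the extension point this machinery drives: BeforeProcess can enrich the context, AfterProcess sees the finished command. A hypothetical timing hook as a sketch (timingHook and startKey are illustrative names, and a local Redis at localhost:6379 is assumed):

package main

import (
	"context"
	"log"
	"time"

	"github.com/go-redis/redis/v7"
)

// timingHook logs how long each command takes.
type timingHook struct{}

// startKey carries the start time through the context between hooks.
type startKey struct{}

func (timingHook) BeforeProcess(ctx context.Context, cmd redis.Cmder) (context.Context, error) {
	return context.WithValue(ctx, startKey{}, time.Now()), nil
}

func (timingHook) AfterProcess(ctx context.Context, cmd redis.Cmder) error {
	if start, ok := ctx.Value(startKey{}).(time.Time); ok {
		log.Printf("%s took %s", cmd.Name(), time.Since(start))
	}
	return nil
}

func (timingHook) BeforeProcessPipeline(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
	return ctx, nil
}

func (timingHook) AfterProcessPipeline(ctx context.Context, cmds []redis.Cmder) error {
	return nil
}

func main() {
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	defer client.Close()
	client.AddHook(timingHook{})
	client.Ping()
}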

//------------------------------------------------------------------------------

type baseClient struct {
	opt      *Options
	connPool pool.Pooler

	onClose func() error // hook called when client is closed
}

func newBaseClient(opt *Options, connPool pool.Pooler) *baseClient {
	return &baseClient{
		opt:      opt,
		connPool: connPool,
	}
}

func (c *baseClient) clone() *baseClient {
	clone := *c
	return &clone
}

func (c *baseClient) withTimeout(timeout time.Duration) *baseClient {
	opt := c.opt.clone()
	opt.ReadTimeout = timeout
	opt.WriteTimeout = timeout

	clone := c.clone()
	clone.opt = opt

	return clone
}

func (c *baseClient) String() string {
	return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB)
}

func (c *baseClient) newConn(ctx context.Context) (*pool.Conn, error) {
	cn, err := c.connPool.NewConn(ctx)
	if err != nil {
		return nil, err
	}

	err = c.initConn(ctx, cn)
	if err != nil {
		_ = c.connPool.CloseConn(cn)
		return nil, err
	}

	return cn, nil
}

func (c *baseClient) getConn(ctx context.Context) (*pool.Conn, error) {
	if c.opt.Limiter != nil {
		err := c.opt.Limiter.Allow()
		if err != nil {
			return nil, err
		}
	}

	cn, err := c._getConn(ctx)
	if err != nil {
		if c.opt.Limiter != nil {
			c.opt.Limiter.ReportResult(err)
		}
		return nil, err
	}
	return cn, nil
}

func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) {
	cn, err := c.connPool.Get(ctx)
	if err != nil {
		return nil, err
	}

	err = c.initConn(ctx, cn)
	if err != nil {
		c.connPool.Remove(cn, err)
		if err := internal.Unwrap(err); err != nil {
			return nil, err
		}
		return nil, err
	}

	return cn, nil
}

func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
	if cn.Inited {
		return nil
	}
	cn.Inited = true

	if c.opt.Password == "" &&
		c.opt.DB == 0 &&
		!c.opt.readOnly &&
		c.opt.OnConnect == nil {
		return nil
	}

	connPool := pool.NewSingleConnPool(nil)
	connPool.SetConn(cn)
	conn := newConn(ctx, c.opt, connPool)

	_, err := conn.Pipelined(func(pipe Pipeliner) error {
		if c.opt.Password != "" {
			if c.opt.Username != "" {
				pipe.AuthACL(c.opt.Username, c.opt.Password)
			} else {
				pipe.Auth(c.opt.Password)
			}
		}

		if c.opt.DB > 0 {
			pipe.Select(c.opt.DB)
		}

		if c.opt.readOnly {
			pipe.ReadOnly()
		}

		return nil
	})
	if err != nil {
		return err
	}

	if c.opt.OnConnect != nil {
		return c.opt.OnConnect(conn)
	}
	return nil
}

func (c *baseClient) releaseConn(cn *pool.Conn, err error) {
	if c.opt.Limiter != nil {
		c.opt.Limiter.ReportResult(err)
	}

	if isBadConn(err, false) {
		c.connPool.Remove(cn, err)
	} else {
		c.connPool.Put(cn)
	}
}

func (c *baseClient) withConn(
	ctx context.Context, fn func(context.Context, *pool.Conn) error,
) error {
	cn, err := c.getConn(ctx)
	if err != nil {
		return err
	}
	defer func() {
		c.releaseConn(cn, err)
	}()

	err = fn(ctx, cn)
	return err
}

func (c *baseClient) process(ctx context.Context, cmd Cmder) error {
	err := c._process(ctx, cmd)
	if err != nil {
		cmd.SetErr(err)
		return err
	}
	return nil
}

func (c *baseClient) _process(ctx context.Context, cmd Cmder) error {
	var lastErr error
	for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				return err
			}
		}

		retryTimeout := true
		lastErr = c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
			err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
				return writeCmd(wr, cmd)
			})
			if err != nil {
				return err
			}

			err = cn.WithReader(ctx, c.cmdTimeout(cmd), cmd.readReply)
			if err != nil {
				retryTimeout = cmd.readTimeout() == nil
				return err
			}

			return nil
		})
		if lastErr == nil || !isRetryableError(lastErr, retryTimeout) {
			return lastErr
		}
	}
	return lastErr
}
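_process retries up to MaxRetries times, sleeping a context-aware backoff between attempts and only retrying errors deemed retryable. A generic standalone sketch of that loop shape (the backoff formula and the retryable check here are illustrative, not the library's exact ones):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errTemporary = errors.New("temporary failure")

// withRetries mirrors the shape of baseClient._process: attempt the
// operation, then back off (respecting ctx) before each retry.
func withRetries(ctx context.Context, maxRetries int, op func() error) error {
	var lastErr error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if attempt > 0 {
			backoff := time.Duration(attempt) * 10 * time.Millisecond // illustrative
			t := time.NewTimer(backoff)
			select {
			case <-t.C:
			case <-ctx.Done():
				t.Stop()
				return ctx.Err()
			}
		}
		lastErr = op()
		if lastErr == nil || !errors.Is(lastErr, errTemporary) {
			return lastErr
		}
	}
	return lastErr
}

func main() {
	calls := 0
	err := withRetries(context.Background(), 3, func() error {
		calls++
		if calls < 3 {
			return errTemporary
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}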

func (c *baseClient) retryBackoff(attempt int) time.Duration {
	return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}

func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
	if timeout := cmd.readTimeout(); timeout != nil {
		t := *timeout
		if t == 0 {
			return 0
		}
		return t + 10*time.Second
	}
	return c.opt.ReadTimeout
}

// Close closes the client, releasing any open resources.
//
// It is rare to Close a Client, as the Client is meant to be
// long-lived and shared between many goroutines.
func (c *baseClient) Close() error {
	var firstErr error
	if c.onClose != nil {
		if err := c.onClose(); err != nil {
			firstErr = err
		}
	}
	if err := c.connPool.Close(); err != nil && firstErr == nil {
		firstErr = err
	}
	return firstErr
}

func (c *baseClient) getAddr() string {
	return c.opt.Addr
}

func (c *baseClient) processPipeline(ctx context.Context, cmds []Cmder) error {
	return c.generalProcessPipeline(ctx, cmds, c.pipelineProcessCmds)
}

func (c *baseClient) processTxPipeline(ctx context.Context, cmds []Cmder) error {
	return c.generalProcessPipeline(ctx, cmds, c.txPipelineProcessCmds)
}

type pipelineProcessor func(context.Context, *pool.Conn, []Cmder) (bool, error)

func (c *baseClient) generalProcessPipeline(
	ctx context.Context, cmds []Cmder, p pipelineProcessor,
) error {
	err := c._generalProcessPipeline(ctx, cmds, p)
	if err != nil {
		setCmdsErr(cmds, err)
		return err
	}
	return cmdsFirstErr(cmds)
}

func (c *baseClient) _generalProcessPipeline(
	ctx context.Context, cmds []Cmder, p pipelineProcessor,
) error {
	var lastErr error
	for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				return err
			}
		}

		var canRetry bool
		lastErr = c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
			var err error
			canRetry, err = p(ctx, cn, cmds)
			return err
		})
		if lastErr == nil || !canRetry || !isRetryableError(lastErr, true) {
			return lastErr
		}
	}
	return lastErr
}

func (c *baseClient) pipelineProcessCmds(
	ctx context.Context, cn *pool.Conn, cmds []Cmder,
) (bool, error) {
	err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
		return writeCmds(wr, cmds)
	})
	if err != nil {
		return true, err
	}

	err = cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
		return pipelineReadCmds(rd, cmds)
	})
	return true, err
}

func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
	for _, cmd := range cmds {
		err := cmd.readReply(rd)
		if err != nil && !isRedisError(err) {
			return err
		}
	}
	return nil
}

func (c *baseClient) txPipelineProcessCmds(
	ctx context.Context, cn *pool.Conn, cmds []Cmder,
) (bool, error) {
	err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
		return writeCmds(wr, cmds)
	})
	if err != nil {
		return true, err
	}

	err = cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
		statusCmd := cmds[0].(*StatusCmd)
		// Trim multi and exec.
		cmds = cmds[1 : len(cmds)-1]

		err := txPipelineReadQueued(rd, statusCmd, cmds)
		if err != nil {
			return err
		}

		return pipelineReadCmds(rd, cmds)
	})
	return false, err
}

func wrapMultiExec(cmds []Cmder) []Cmder {
	if len(cmds) == 0 {
		panic("not reached")
	}
	cmds = append(cmds, make([]Cmder, 2)...)
	copy(cmds[1:], cmds[:len(cmds)-2])
	cmds[0] = NewStatusCmd("multi")
	cmds[len(cmds)-1] = NewSliceCmd("exec")
	return cmds
}
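wrapMultiExec grows the slice by two, shifts the original commands right by one, then book-ends them with MULTI and EXEC. The same slice manipulation demonstrated on plain strings, runnable as-is:

package main

import "fmt"

// wrap mirrors wrapMultiExec: append two empty slots, shift the
// originals right by one, then set MULTI first and EXEC last.
func wrap(cmds []string) []string {
	cmds = append(cmds, make([]string, 2)...)
	copy(cmds[1:], cmds[:len(cmds)-2])
	cmds[0] = "MULTI"
	cmds[len(cmds)-1] = "EXEC"
	return cmds
}

func main() {
	fmt.Println(wrap([]string{"INCR a", "INCR b"}))
	// Output: [MULTI INCR a INCR b EXEC]
}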

func txPipelineReadQueued(rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder) error {
	// Parse queued replies.
	if err := statusCmd.readReply(rd); err != nil {
		return err
	}

	for range cmds {
		if err := statusCmd.readReply(rd); err != nil && !isRedisError(err) {
			return err
		}
	}

	// Parse number of replies.
	line, err := rd.ReadLine()
	if err != nil {
		if err == Nil {
			err = TxFailedErr
		}
		return err
	}

	switch line[0] {
	case proto.ErrorReply:
		return proto.ParseErrorReply(line)
	case proto.ArrayReply:
		// ok
	default:
		err := fmt.Errorf("redis: expected '*', but got line %q", line)
		return err
	}

	return nil
}

//------------------------------------------------------------------------------

// Client is a Redis client representing a pool of zero or more
// underlying connections. It's safe for concurrent use by multiple
// goroutines.
type Client struct {
	*baseClient
	cmdable
	hooks
	ctx context.Context
}

// NewClient returns a client to the Redis Server specified by Options.
func NewClient(opt *Options) *Client {
	opt.init()

	c := Client{
		baseClient: newBaseClient(opt, newConnPool(opt)),
		ctx:        context.Background(),
	}
	c.cmdable = c.Process

	return &c
}
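Typical construction and use, assuming a Redis server reachable at localhost:6379:

package main

import (
	"fmt"

	"github.com/go-redis/redis/v7"
)

func main() {
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	defer client.Close()

	// Ping goes through Client.Process and therefore any registered hooks.
	pong, err := client.Ping().Result()
	fmt.Println(pong, err) // PONG <nil>
}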

func (c *Client) clone() *Client {
	clone := *c
	clone.cmdable = clone.Process
	clone.hooks.lock()
	return &clone
}

func (c *Client) WithTimeout(timeout time.Duration) *Client {
	clone := c.clone()
	clone.baseClient = c.baseClient.withTimeout(timeout)
	return clone
}

func (c *Client) Context() context.Context {
	return c.ctx
}

func (c *Client) WithContext(ctx context.Context) *Client {
	if ctx == nil {
		panic("nil context")
	}
	clone := c.clone()
	clone.ctx = ctx
	return clone
}

func (c *Client) Conn() *Conn {
	return newConn(c.ctx, c.opt, pool.NewSingleConnPool(c.connPool))
}

// Do creates a Cmd from the args and processes the cmd.
func (c *Client) Do(args ...interface{}) *Cmd {
	return c.DoContext(c.ctx, args...)
}

func (c *Client) DoContext(ctx context.Context, args ...interface{}) *Cmd {
	cmd := NewCmd(args...)
	_ = c.ProcessContext(ctx, cmd)
	return cmd
}

func (c *Client) Process(cmd Cmder) error {
	return c.ProcessContext(c.ctx, cmd)
}

func (c *Client) ProcessContext(ctx context.Context, cmd Cmder) error {
	return c.hooks.process(ctx, cmd, c.baseClient.process)
}

func (c *Client) processPipeline(ctx context.Context, cmds []Cmder) error {
	return c.hooks.processPipeline(ctx, cmds, c.baseClient.processPipeline)
}

func (c *Client) processTxPipeline(ctx context.Context, cmds []Cmder) error {
	return c.hooks.processTxPipeline(ctx, cmds, c.baseClient.processTxPipeline)
}

// Options returns read-only Options that were used to create the client.
func (c *Client) Options() *Options {
	return c.opt
}

type PoolStats pool.Stats

// PoolStats returns connection pool stats.
func (c *Client) PoolStats() *PoolStats {
	stats := c.connPool.Stats()
	return (*PoolStats)(stats)
}

func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
	return c.Pipeline().Pipelined(fn)
}

func (c *Client) Pipeline() Pipeliner {
	pipe := Pipeline{
		ctx:  c.ctx,
		exec: c.processPipeline,
	}
	pipe.init()
	return &pipe
}

func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
	return c.TxPipeline().Pipelined(fn)
}

// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *Client) TxPipeline() Pipeliner {
	pipe := Pipeline{
		ctx:  c.ctx,
		exec: c.processTxPipeline,
	}
	pipe.init()
	return &pipe
}

func (c *Client) pubSub() *PubSub {
	pubsub := &PubSub{
		opt: c.opt,

		newConn: func(channels []string) (*pool.Conn, error) {
			return c.newConn(context.TODO())
		},
		closeConn: c.connPool.CloseConn,
	}
	pubsub.init()
	return pubsub
}

// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create an empty subscription.
// Note that this method does not wait on a response from Redis, so the
// subscription may not be active immediately. To force the connection to wait,
// you may call the Receive() method on the returned *PubSub like so:
//
//    sub := client.Subscribe(queryResp)
//    iface, err := sub.Receive()
//    if err != nil {
//        // handle error
//    }
//
//    // Should be *Subscription, but others are possible if other actions have been
//    // taken on sub since it was created.
//    switch iface.(type) {
//    case *Subscription:
//        // subscribe succeeded
//    case *Message:
//        // received first message
//    case *Pong:
//        // pong received
//    default:
//        // handle error
//    }
//
//    ch := sub.Channel()
func (c *Client) Subscribe(channels ...string) *PubSub {
	pubsub := c.pubSub()
	if len(channels) > 0 {
		_ = pubsub.Subscribe(channels...)
	}
	return pubsub
}

// PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create an empty subscription.
func (c *Client) PSubscribe(channels ...string) *PubSub {
	pubsub := c.pubSub()
	if len(channels) > 0 {
		_ = pubsub.PSubscribe(channels...)
	}
	return pubsub
}

//------------------------------------------------------------------------------

type conn struct {
	baseClient
	cmdable
	statefulCmdable
}

// Conn is like Client, but its pool contains a single connection.
type Conn struct {
	*conn
	ctx context.Context
}

func newConn(ctx context.Context, opt *Options, connPool pool.Pooler) *Conn {
	c := Conn{
		conn: &conn{
			baseClient: baseClient{
				opt:      opt,
				connPool: connPool,
			},
		},
		ctx: ctx,
	}
	c.cmdable = c.Process
	c.statefulCmdable = c.Process
	return &c
}

func (c *Conn) Process(cmd Cmder) error {
	return c.ProcessContext(c.ctx, cmd)
}

func (c *Conn) ProcessContext(ctx context.Context, cmd Cmder) error {
	return c.baseClient.process(ctx, cmd)
}

func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
	return c.Pipeline().Pipelined(fn)
}

func (c *Conn) Pipeline() Pipeliner {
	pipe := Pipeline{
		ctx:  c.ctx,
		exec: c.processPipeline,
	}
	pipe.init()
	return &pipe
}

func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
	return c.TxPipeline().Pipelined(fn)
}

// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *Conn) TxPipeline() Pipeliner {
	pipe := Pipeline{
		ctx:  c.ctx,
		exec: c.processTxPipeline,
	}
	pipe.init()
	return &pipe
}
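For completeness, a hedged usage sketch of the transactional pipeline shown above (assumes a reachable server at localhost:6379; the keys are illustrative):

package main

import (
	"fmt"

	"github.com/go-redis/redis/v7"
)

func main() {
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	defer client.Close()

	// Queued commands are wrapped in MULTI/EXEC by processTxPipeline.
	cmds, err := client.TxPipelined(func(pipe redis.Pipeliner) error {
		pipe.Incr("counter")
		pipe.Set("greeting", "hello", 0)
		return nil
	})
	fmt.Println(len(cmds), err)
}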
292 vendor/github.com/go-redis/redis/ring.go → vendor/github.com/go-redis/redis/v7/ring.go (generated, vendored)
91 vendor/github.com/go-redis/redis/tx.go → vendor/github.com/go-redis/redis/v7/tx.go (generated, vendored)