You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
388 lines
8.5 KiB
388 lines
8.5 KiB
6 years ago
|
package couchbase
|
||
|
|
||
|
import (
|
||
|
"errors"
|
||
|
"sync/atomic"
|
||
|
"time"
|
||
|
|
||
|
"github.com/couchbase/gomemcached"
|
||
|
"github.com/couchbase/gomemcached/client"
|
||
|
"github.com/couchbase/goutils/logging"
|
||
|
)
|
||
|
|
||
|
// GenericMcdAuthHandler is a kind of AuthHandler that performs
|
||
|
// special auth exchange (like non-standard auth, possibly followed by
|
||
|
// select-bucket).
|
||
|
type GenericMcdAuthHandler interface {
|
||
|
AuthHandler
|
||
|
AuthenticateMemcachedConn(host string, conn *memcached.Client) error
|
||
|
}
|
||
|
|
||
|
var (
	// TimeoutError is raised when a connection can't be retrieved from a pool.
	// NOTE(review): GetWithTimeout in this file returns ErrTimeout rather
	// than this value — confirm which one callers are expected to test for.
	TimeoutError = errors.New("timeout waiting to build connection")

	// errClosedPool is returned when the pool's connection channel has
	// already been closed.
	errClosedPool = errors.New("the connection pool is closed")

	// errNoPool is returned by methods invoked on a nil *connectionPool.
	errNoPool = errors.New("no connection pool")
)
|
||
|
|
||
|
var (
	// ConnPoolTimeout is the default timeout for retrieving a connection
	// from the pool; Get passes it to GetWithTimeout.
	ConnPoolTimeout = 30 * 24 * time.Hour

	// ConnCloserInterval is the cycle time of the overflow connection
	// closer.
	ConnCloserInterval = 30 * time.Second

	// ConnPoolAvailWaitTime is the amount of time to wait for an existing
	// connection from the pool before considering the creation of a new
	// one.
	ConnPoolAvailWaitTime = time.Millisecond
)
|
||
|
|
||
|
type connectionPool struct {
|
||
|
host string
|
||
|
mkConn func(host string, ah AuthHandler) (*memcached.Client, error)
|
||
|
auth AuthHandler
|
||
|
connections chan *memcached.Client
|
||
|
createsem chan bool
|
||
|
bailOut chan bool
|
||
|
poolSize int
|
||
|
connCount uint64
|
||
|
inUse bool
|
||
|
}
|
||
|
|
||
|
func newConnectionPool(host string, ah AuthHandler, closer bool, poolSize, poolOverflow int) *connectionPool {
|
||
|
connSize := poolSize
|
||
|
if closer {
|
||
|
connSize += poolOverflow
|
||
|
}
|
||
|
rv := &connectionPool{
|
||
|
host: host,
|
||
|
connections: make(chan *memcached.Client, connSize),
|
||
|
createsem: make(chan bool, poolSize+poolOverflow),
|
||
|
mkConn: defaultMkConn,
|
||
|
auth: ah,
|
||
|
poolSize: poolSize,
|
||
|
}
|
||
|
if closer {
|
||
|
rv.bailOut = make(chan bool, 1)
|
||
|
go rv.connCloser()
|
||
|
}
|
||
|
return rv
|
||
|
}
|
||
|
|
||
|
// ConnPoolCallback is notified whenever connections are acquired from a pool.
|
||
|
var ConnPoolCallback func(host string, source string, start time.Time, err error)
|
||
|
|
||
|
func defaultMkConn(host string, ah AuthHandler) (*memcached.Client, error) {
|
||
|
var features memcached.Features
|
||
|
|
||
|
conn, err := memcached.Connect("tcp", host)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
if TCPKeepalive == true {
|
||
|
conn.SetKeepAliveOptions(time.Duration(TCPKeepaliveInterval) * time.Second)
|
||
|
}
|
||
|
|
||
|
if EnableMutationToken == true {
|
||
|
features = append(features, memcached.FeatureMutationToken)
|
||
|
}
|
||
|
if EnableDataType == true {
|
||
|
features = append(features, memcached.FeatureDataType)
|
||
|
}
|
||
|
|
||
|
if EnableXattr == true {
|
||
|
features = append(features, memcached.FeatureXattr)
|
||
|
}
|
||
|
|
||
|
if len(features) > 0 {
|
||
|
if DefaultTimeout > 0 {
|
||
|
conn.SetDeadline(getDeadline(noDeadline, DefaultTimeout))
|
||
|
}
|
||
|
|
||
|
res, err := conn.EnableFeatures(features)
|
||
|
|
||
|
if DefaultTimeout > 0 {
|
||
|
conn.SetDeadline(noDeadline)
|
||
|
}
|
||
|
|
||
|
if err != nil && isTimeoutError(err) {
|
||
|
conn.Close()
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
if err != nil || res.Status != gomemcached.SUCCESS {
|
||
|
logging.Warnf("Unable to enable features %v", err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if gah, ok := ah.(GenericMcdAuthHandler); ok {
|
||
|
err = gah.AuthenticateMemcachedConn(host, conn)
|
||
|
if err != nil {
|
||
|
conn.Close()
|
||
|
return nil, err
|
||
|
}
|
||
|
return conn, nil
|
||
|
}
|
||
|
name, pass, bucket := ah.GetCredentials()
|
||
|
if name != "default" {
|
||
|
_, err = conn.Auth(name, pass)
|
||
|
if err != nil {
|
||
|
conn.Close()
|
||
|
return nil, err
|
||
|
}
|
||
|
// Select bucket (Required for cb_auth creds)
|
||
|
// Required when doing auth with _admin credentials
|
||
|
if bucket != "" && bucket != name {
|
||
|
_, err = conn.SelectBucket(bucket)
|
||
|
if err != nil {
|
||
|
conn.Close()
|
||
|
return nil, err
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
return conn, nil
|
||
|
}
|
||
|
|
||
|
func (cp *connectionPool) Close() (err error) {
|
||
|
defer func() {
|
||
|
if recover() != nil {
|
||
|
err = errors.New("connectionPool.Close error")
|
||
|
}
|
||
|
}()
|
||
|
if cp.bailOut != nil {
|
||
|
|
||
|
// defensively, we won't wait if the channel is full
|
||
|
select {
|
||
|
case cp.bailOut <- false:
|
||
|
default:
|
||
|
}
|
||
|
}
|
||
|
close(cp.connections)
|
||
|
for c := range cp.connections {
|
||
|
c.Close()
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func (cp *connectionPool) Node() string {
|
||
|
return cp.host
|
||
|
}
|
||
|
|
||
|
func (cp *connectionPool) GetWithTimeout(d time.Duration) (rv *memcached.Client, err error) {
|
||
|
if cp == nil {
|
||
|
return nil, errNoPool
|
||
|
}
|
||
|
|
||
|
path := ""
|
||
|
|
||
|
if ConnPoolCallback != nil {
|
||
|
defer func(path *string, start time.Time) {
|
||
|
ConnPoolCallback(cp.host, *path, start, err)
|
||
|
}(&path, time.Now())
|
||
|
}
|
||
|
|
||
|
path = "short-circuit"
|
||
|
|
||
|
// short-circuit available connetions.
|
||
|
select {
|
||
|
case rv, isopen := <-cp.connections:
|
||
|
if !isopen {
|
||
|
return nil, errClosedPool
|
||
|
}
|
||
|
atomic.AddUint64(&cp.connCount, 1)
|
||
|
return rv, nil
|
||
|
default:
|
||
|
}
|
||
|
|
||
|
t := time.NewTimer(ConnPoolAvailWaitTime)
|
||
|
defer t.Stop()
|
||
|
|
||
|
// Try to grab an available connection within 1ms
|
||
|
select {
|
||
|
case rv, isopen := <-cp.connections:
|
||
|
path = "avail1"
|
||
|
if !isopen {
|
||
|
return nil, errClosedPool
|
||
|
}
|
||
|
atomic.AddUint64(&cp.connCount, 1)
|
||
|
return rv, nil
|
||
|
case <-t.C:
|
||
|
// No connection came around in time, let's see
|
||
|
// whether we can get one or build a new one first.
|
||
|
t.Reset(d) // Reuse the timer for the full timeout.
|
||
|
select {
|
||
|
case rv, isopen := <-cp.connections:
|
||
|
path = "avail2"
|
||
|
if !isopen {
|
||
|
return nil, errClosedPool
|
||
|
}
|
||
|
atomic.AddUint64(&cp.connCount, 1)
|
||
|
return rv, nil
|
||
|
case cp.createsem <- true:
|
||
|
path = "create"
|
||
|
// Build a connection if we can't get a real one.
|
||
|
// This can potentially be an overflow connection, or
|
||
|
// a pooled connection.
|
||
|
rv, err := cp.mkConn(cp.host, cp.auth)
|
||
|
if err != nil {
|
||
|
// On error, release our create hold
|
||
|
<-cp.createsem
|
||
|
} else {
|
||
|
atomic.AddUint64(&cp.connCount, 1)
|
||
|
}
|
||
|
return rv, err
|
||
|
case <-t.C:
|
||
|
return nil, ErrTimeout
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (cp *connectionPool) Get() (*memcached.Client, error) {
|
||
|
return cp.GetWithTimeout(ConnPoolTimeout)
|
||
|
}
|
||
|
|
||
|
func (cp *connectionPool) Return(c *memcached.Client) {
|
||
|
if c == nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if cp == nil {
|
||
|
c.Close()
|
||
|
}
|
||
|
|
||
|
if c.IsHealthy() {
|
||
|
defer func() {
|
||
|
if recover() != nil {
|
||
|
// This happens when the pool has already been
|
||
|
// closed and we're trying to return a
|
||
|
// connection to it anyway. Just close the
|
||
|
// connection.
|
||
|
c.Close()
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
select {
|
||
|
case cp.connections <- c:
|
||
|
default:
|
||
|
<-cp.createsem
|
||
|
c.Close()
|
||
|
}
|
||
|
} else {
|
||
|
<-cp.createsem
|
||
|
c.Close()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// give the ability to discard a connection from a pool
|
||
|
// useful for ditching connections to the wrong node after a rebalance
|
||
|
func (cp *connectionPool) Discard(c *memcached.Client) {
|
||
|
<-cp.createsem
|
||
|
c.Close()
|
||
|
}
|
||
|
|
||
|
// asynchronous connection closer
|
||
|
func (cp *connectionPool) connCloser() {
|
||
|
var connCount uint64
|
||
|
|
||
|
t := time.NewTimer(ConnCloserInterval)
|
||
|
defer t.Stop()
|
||
|
|
||
|
for {
|
||
|
connCount = cp.connCount
|
||
|
|
||
|
// we don't exist anymore! bail out!
|
||
|
select {
|
||
|
case <-cp.bailOut:
|
||
|
return
|
||
|
case <-t.C:
|
||
|
}
|
||
|
t.Reset(ConnCloserInterval)
|
||
|
|
||
|
// no overflow connections open or sustained requests for connections
|
||
|
// nothing to do until the next cycle
|
||
|
if len(cp.connections) <= cp.poolSize ||
|
||
|
ConnCloserInterval/ConnPoolAvailWaitTime < time.Duration(cp.connCount-connCount) {
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
// close overflow connections now that they are not needed
|
||
|
for c := range cp.connections {
|
||
|
select {
|
||
|
case <-cp.bailOut:
|
||
|
return
|
||
|
default:
|
||
|
}
|
||
|
|
||
|
// bail out if close did not work out
|
||
|
if !cp.connCleanup(c) {
|
||
|
return
|
||
|
}
|
||
|
if len(cp.connections) <= cp.poolSize {
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// close connection with recovery on error
|
||
|
func (cp *connectionPool) connCleanup(c *memcached.Client) (rv bool) {
|
||
|
|
||
|
// just in case we are closing a connection after
|
||
|
// bailOut has been sent but we haven't yet read it
|
||
|
defer func() {
|
||
|
if recover() != nil {
|
||
|
rv = false
|
||
|
}
|
||
|
}()
|
||
|
rv = true
|
||
|
|
||
|
c.Close()
|
||
|
<-cp.createsem
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func (cp *connectionPool) StartTapFeed(args *memcached.TapArguments) (*memcached.TapFeed, error) {
|
||
|
if cp == nil {
|
||
|
return nil, errNoPool
|
||
|
}
|
||
|
mc, err := cp.Get()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
// A connection can't be used after TAP; Dont' count it against the
|
||
|
// connection pool capacity
|
||
|
<-cp.createsem
|
||
|
|
||
|
return mc.StartTapFeed(*args)
|
||
|
}
|
||
|
|
||
|
// DEFAULT_WINDOW_SIZE is 20 MiB expressed in bytes (20 * 1024 * 1024).
const DEFAULT_WINDOW_SIZE = 20 << 20
|
||
|
|
||
|
func (cp *connectionPool) StartUprFeed(name string, sequence uint32, dcp_buffer_size uint32, data_chan_size int) (*memcached.UprFeed, error) {
|
||
|
if cp == nil {
|
||
|
return nil, errNoPool
|
||
|
}
|
||
|
mc, err := cp.Get()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
// A connection can't be used after it has been allocated to UPR;
|
||
|
// Dont' count it against the connection pool capacity
|
||
|
<-cp.createsem
|
||
|
|
||
|
uf, err := mc.NewUprFeed()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
if err := uf.UprOpen(name, sequence, dcp_buffer_size); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
if err := uf.StartFeedWithConfig(data_chan_size); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
return uf, nil
|
||
|
}
|