You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
gitea/vendor/github.com/couchbase/gomemcached/client/mc.go

1443 lines
39 KiB

// Package memcached provides a memcached binary protocol client.
package memcached
import (
"crypto/tls"
"encoding/binary"
"fmt"
"github.com/couchbase/gomemcached"
"github.com/couchbase/goutils/logging"
"github.com/couchbase/goutils/scramsha"
"github.com/pkg/errors"
"io"
"math"
"net"
"strings"
"sync"
"sync/atomic"
"time"
)
type ClientIface interface {
Add(vb uint16, key string, flags int, exp int, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error)
Append(vb uint16, key string, data []byte, context ...*ClientContext) (*gomemcached.MCResponse, error)
Auth(user, pass string) (*gomemcached.MCResponse, error)
AuthList() (*gomemcached.MCResponse, error)
AuthPlain(user, pass string) (*gomemcached.MCResponse, error)
AuthScramSha(user, pass string) (*gomemcached.MCResponse, error)
CASNext(vb uint16, k string, exp int, state *CASState) bool
CAS(vb uint16, k string, f CasFunc, initexp int) (*gomemcached.MCResponse, error)
CollectionsGetCID(scope string, collection string) (*gomemcached.MCResponse, error)
CollectionEnabled() bool
Close() error
Decr(vb uint16, key string, amt, def uint64, exp int, context ...*ClientContext) (uint64, error)
Del(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error)
EnableMutationToken() (*gomemcached.MCResponse, error)
EnableFeatures(features Features) (*gomemcached.MCResponse, error)
Get(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error)
GetAllVbSeqnos(vbSeqnoMap map[uint16]uint64, context ...*ClientContext) (map[uint16]uint64, error)
GetAndTouch(vb uint16, key string, exp int, context ...*ClientContext) (*gomemcached.MCResponse, error)
GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, subPaths []string, context ...*ClientContext) error
GetCollectionsManifest() (*gomemcached.MCResponse, error)
GetMeta(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error)
GetRandomDoc(context ...*ClientContext) (*gomemcached.MCResponse, error)
GetSubdoc(vb uint16, key string, subPaths []string, context ...*ClientContext) (*gomemcached.MCResponse, error)
Hijack() io.ReadWriteCloser
Incr(vb uint16, key string, amt, def uint64, exp int, context ...*ClientContext) (uint64, error)
Observe(vb uint16, key string) (result ObserveResult, err error)
ObserveSeq(vb uint16, vbuuid uint64) (result *ObserveSeqResult, err error)
Receive() (*gomemcached.MCResponse, error)
ReceiveWithDeadline(deadline time.Time) (*gomemcached.MCResponse, error)
Send(req *gomemcached.MCRequest) (rv *gomemcached.MCResponse, err error)
Set(vb uint16, key string, flags int, exp int, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error)
SetKeepAliveOptions(interval time.Duration)
SetReadDeadline(t time.Time)
SetDeadline(t time.Time)
SelectBucket(bucket string) (*gomemcached.MCResponse, error)
SetCas(vb uint16, key string, flags int, exp int, cas uint64, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error)
Stats(key string) ([]StatValue, error)
StatsMap(key string) (map[string]string, error)
StatsMapForSpecifiedStats(key string, statsMap map[string]string) error
Transmit(req *gomemcached.MCRequest) error
TransmitWithDeadline(req *gomemcached.MCRequest, deadline time.Time) error
TransmitResponse(res *gomemcached.MCResponse) error
UprGetFailoverLog(vb []uint16) (map[uint16]*FailoverLog, error)
// UprFeed Related
NewUprFeed() (*UprFeed, error)
NewUprFeedIface() (UprFeedIface, error)
NewUprFeedWithConfig(ackByClient bool) (*UprFeed, error)
NewUprFeedWithConfigIface(ackByClient bool) (UprFeedIface, error)
}
type ClientContext struct {
// Collection-based context
CollId uint32
// VB-state related context
// nil means not used in this context
VbState *VbStateType
}
type VbStateType uint8
const (
VbAlive VbStateType = 0x00
VbActive VbStateType = 0x01
VbReplica VbStateType = 0x02
VbPending VbStateType = 0x03
VbDead VbStateType = 0x04
)
func (context *ClientContext) InitExtras(req *gomemcached.MCRequest, client *Client) {
if req == nil || client == nil {
return
}
var bytesToAllocate int
switch req.Opcode {
case gomemcached.GET_ALL_VB_SEQNOS:
if context.VbState != nil {
bytesToAllocate += 4
}
if client.CollectionEnabled() {
if context.VbState == nil {
bytesToAllocate += 8
} else {
bytesToAllocate += 4
}
}
}
if bytesToAllocate > 0 {
req.Extras = make([]byte, bytesToAllocate)
}
}
const bufsize = 1024
var UnHealthy uint32 = 0
var Healthy uint32 = 1
type Features []Feature
type Feature uint16
const FeatureTcpNoDelay = Feature(0x03)
const FeatureMutationToken = Feature(0x04) // XATTR bit in data type field with dcp mutations
const FeatureXattr = Feature(0x06)
const FeatureXerror = Feature(0x07)
const FeatureCollections = Feature(0x12)
const FeatureSnappyCompression = Feature(0x0a)
const FeatureDataType = Feature(0x0b)
type memcachedConnection interface {
io.ReadWriteCloser
SetReadDeadline(time.Time) error
SetDeadline(time.Time) error
}
// The Client itself.
type Client struct {
conn memcachedConnection
// use uint32 type so that it can be accessed through atomic APIs
healthy uint32
opaque uint32
hdrBuf []byte
collectionsEnabled uint32
deadline time.Time
}
var (
DefaultDialTimeout = time.Duration(0) // No timeout
DefaultWriteTimeout = time.Duration(0) // No timeout
dialFun = func(prot, dest string) (net.Conn, error) {
return net.DialTimeout(prot, dest, DefaultDialTimeout)
}
)
// Connect to a memcached server.
func Connect(prot, dest string) (rv *Client, err error) {
conn, err := dialFun(prot, dest)
if err != nil {
return nil, err
}
return Wrap(conn)
}
// Connect to a memcached server using TLS.
func ConnectTLS(prot, dest string, config *tls.Config) (rv *Client, err error) {
conn, err := tls.Dial(prot, dest, config)
if err != nil {
return nil, err
}
return Wrap(conn)
}
func SetDefaultTimeouts(dial, read, write time.Duration) {
DefaultDialTimeout = dial
DefaultWriteTimeout = write
}
func SetDefaultDialTimeout(dial time.Duration) {
DefaultDialTimeout = dial
}
func (c *Client) SetKeepAliveOptions(interval time.Duration) {
tcpConn, ok := c.conn.(*net.TCPConn)
if ok {
tcpConn.SetKeepAlive(true)
tcpConn.SetKeepAlivePeriod(interval)
}
}
func (c *Client) SetReadDeadline(t time.Time) {
c.conn.SetReadDeadline(t)
}
func (c *Client) SetDeadline(t time.Time) {
if t.Equal(c.deadline) {
return
}
c.conn.SetDeadline(t)
c.deadline = t
}
// Wrap an existing transport.
func Wrap(conn memcachedConnection) (rv *Client, err error) {
client := &Client{
conn: conn,
hdrBuf: make([]byte, gomemcached.HDR_LEN),
opaque: uint32(1),
}
client.setHealthy(true)
return client, nil
}
// Close the connection when you're done.
func (c *Client) Close() error {
return c.conn.Close()
}
// IsHealthy returns true unless the client is belived to have
// difficulty communicating to its server.
//
// This is useful for connection pools where we want to
// non-destructively determine that a connection may be reused.
func (c Client) IsHealthy() bool {
healthyState := atomic.LoadUint32(&c.healthy)
return healthyState == Healthy
}
// Send a custom request and get the response.
func (c *Client) Send(req *gomemcached.MCRequest) (rv *gomemcached.MCResponse, err error) {
err = c.Transmit(req)
if err != nil {
return
}
resp, _, err := getResponse(c.conn, c.hdrBuf)
c.setHealthy(!gomemcached.IsFatal(err))
return resp, err
}
// Transmit send a request, but does not wait for a response.
func (c *Client) Transmit(req *gomemcached.MCRequest) error {
if DefaultWriteTimeout > 0 {
c.conn.(net.Conn).SetWriteDeadline(time.Now().Add(DefaultWriteTimeout))
}
_, err := transmitRequest(c.conn, req)
// clear write deadline to avoid interference with future write operations
if DefaultWriteTimeout > 0 {
c.conn.(net.Conn).SetWriteDeadline(time.Time{})
}
if err != nil {
c.setHealthy(false)
}
return err
}
func (c *Client) TransmitWithDeadline(req *gomemcached.MCRequest, deadline time.Time) error {
c.conn.(net.Conn).SetWriteDeadline(deadline)
_, err := transmitRequest(c.conn, req)
// clear write deadline to avoid interference with future write operations
c.conn.(net.Conn).SetWriteDeadline(time.Time{})
if err != nil {
c.setHealthy(false)
}
return err
}
// TransmitResponse send a response, does not wait.
func (c *Client) TransmitResponse(res *gomemcached.MCResponse) error {
if DefaultWriteTimeout > 0 {
c.conn.(net.Conn).SetWriteDeadline(time.Now().Add(DefaultWriteTimeout))
}
_, err := transmitResponse(c.conn, res)
// clear write deadline to avoid interference with future write operations
if DefaultWriteTimeout > 0 {
c.conn.(net.Conn).SetWriteDeadline(time.Time{})
}
if err != nil {
c.setHealthy(false)
}
return err
}
// Receive a response
func (c *Client) Receive() (*gomemcached.MCResponse, error) {
resp, _, err := getResponse(c.conn, c.hdrBuf)
if err != nil && resp.Status != gomemcached.KEY_ENOENT && resp.Status != gomemcached.EBUSY {
c.setHealthy(false)
}
return resp, err
}
func (c *Client) ReceiveWithDeadline(deadline time.Time) (*gomemcached.MCResponse, error) {
c.conn.(net.Conn).SetReadDeadline(deadline)
resp, _, err := getResponse(c.conn, c.hdrBuf)
// Clear read deadline to avoid interference with future read operations.
c.conn.(net.Conn).SetReadDeadline(time.Time{})
if err != nil && resp.Status != gomemcached.KEY_ENOENT && resp.Status != gomemcached.EBUSY {
c.setHealthy(false)
}
return resp, err
}
func appendMutationToken(bytes []byte) []byte {
bytes = append(bytes, 0, 0)
binary.BigEndian.PutUint16(bytes[len(bytes)-2:], uint16(0x04))
return bytes
}
//Send a hello command to enable MutationTokens
func (c *Client) EnableMutationToken() (*gomemcached.MCResponse, error) {
var payload []byte
payload = appendMutationToken(payload)
return c.Send(&gomemcached.MCRequest{
Opcode: gomemcached.HELLO,
Key: []byte("GoMemcached"),
Body: payload,
})
}
//Send a hello command to enable specific features
func (c *Client) EnableFeatures(features Features) (*gomemcached.MCResponse, error) {
var payload []byte
collectionsEnabled := 0
for _, feature := range features {
if feature == FeatureCollections {
collectionsEnabled = 1
}
payload = append(payload, 0, 0)
binary.BigEndian.PutUint16(payload[len(payload)-2:], uint16(feature))
}
rv, err := c.Send(&gomemcached.MCRequest{
Opcode: gomemcached.HELLO,
Key: []byte("GoMemcached"),
Body: payload,
})
if err == nil && collectionsEnabled != 0 {
atomic.StoreUint32(&c.collectionsEnabled, uint32(collectionsEnabled))
}
return rv, err
}
// Sets collection info for a request
func (c *Client) setCollection(req *gomemcached.MCRequest, context ...*ClientContext) error {
req.CollIdLen = 0
collectionId := uint32(0)
if len(context) > 0 {
collectionId = context[0].CollId
}
// if the optional collection is specified, it must be default for clients that haven't turned on collections
if atomic.LoadUint32(&c.collectionsEnabled) == 0 {
if collectionId != 0 {
return fmt.Errorf("Client does not use collections but a collection was specified")
}
} else {
req.CollIdLen = binary.PutUvarint(req.CollId[:], uint64(collectionId))
}
return nil
}
// Sets collection info in extras
func (c *Client) setExtrasCollection(req *gomemcached.MCRequest, context ...*ClientContext) error {
collectionId := uint32(0)
if len(context) > 0 {
collectionId = context[0].CollId
}
// if the optional collection is specified, it must be default for clients that haven't turned on collections
if atomic.LoadUint32(&c.collectionsEnabled) == 0 {
if collectionId != 0 {
return fmt.Errorf("Client does not use collections but a collection was specified")
}
} else {
req.Extras = make([]byte, 4)
binary.BigEndian.PutUint32(req.Extras, collectionId)
}
return nil
}
func (c *Client) setVbSeqnoContext(req *gomemcached.MCRequest, context ...*ClientContext) error {
if len(context) == 0 || req == nil {
return nil
}
switch req.Opcode {
case gomemcached.GET_ALL_VB_SEQNOS:
if len(context) == 0 {
return nil
}
if len(req.Extras) == 0 {
context[0].InitExtras(req, c)
}
if context[0].VbState != nil {
binary.BigEndian.PutUint32(req.Extras, uint32(*(context[0].VbState)))
}
if c.CollectionEnabled() {
binary.BigEndian.PutUint32(req.Extras[4:8], context[0].CollId)
}
return nil
default:
return fmt.Errorf("setVbState Not supported for opcode: %v", req.Opcode.String())
}
}
// Get the value for a key.
func (c *Client) Get(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error) {
req := &gomemcached.MCRequest{
Opcode: gomemcached.GET,
VBucket: vb,
Key: []byte(key),
}
err := c.setCollection(req, context...)
if err != nil {
return nil, err
}
return c.Send(req)
}
// Get the xattrs, doc value for the input key
func (c *Client) GetSubdoc(vb uint16, key string, subPaths []string, context ...*ClientContext) (*gomemcached.MCResponse, error) {
extraBuf, valueBuf := GetSubDocVal(subPaths)
req := &gomemcached.MCRequest{
Opcode: gomemcached.SUBDOC_MULTI_LOOKUP,
VBucket: vb,
Key: []byte(key),
Extras: extraBuf,
Body: valueBuf,
}
err := c.setCollection(req, context...)
if err != nil {
return nil, err
}
res, err := c.Send(req)
if err != nil && IfResStatusError(res) {
return res, err
}
return res, nil
}
// Retrieve the collections manifest.
func (c *Client) GetCollectionsManifest() (*gomemcached.MCResponse, error) {
res, err := c.Send(&gomemcached.MCRequest{
Opcode: gomemcached.GET_COLLECTIONS_MANIFEST,
})
if err != nil && IfResStatusError(res) {
return res, err
}
return res, nil
}
// Retrieve the collections manifest.
func (c *Client) CollectionsGetCID(scope string, collection string) (*gomemcached.MCResponse, error) {
res, err := c.Send(&gomemcached.MCRequest{
Opcode: gomemcached.COLLECTIONS_GET_CID,
Key: []byte(scope + "." + collection),
})
if err != nil && IfResStatusError(res) {
return res, err
}
return res, nil
}
func (c *Client) CollectionEnabled() bool {
return atomic.LoadUint32(&c.collectionsEnabled) > 0
}
// Get the value for a key, and update expiry
func (c *Client) GetAndTouch(vb uint16, key string, exp int, context ...*ClientContext) (*gomemcached.MCResponse, error) {
extraBuf := make([]byte, 4)
binary.BigEndian.PutUint32(extraBuf[0:], uint32(exp))
req := &gomemcached.MCRequest{
Opcode: gomemcached.GAT,
VBucket: vb,
Key: []byte(key),
Extras: extraBuf,
}
err := c.setCollection(req, context...)
if err != nil {
return nil, err
}
return c.Send(req)
}
// Get metadata for a key
func (c *Client) GetMeta(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error) {
req := &gomemcached.MCRequest{
Opcode: gomemcached.GET_META,
VBucket: vb,
Key: []byte(key),
}
err := c.setCollection(req, context...)
if err != nil {
return nil, err
}
return c.Send(req)
}
// Del deletes a key.
func (c *Client) Del(vb uint16, key string, context ...*ClientContext) (*gomemcached.MCResponse, error) {
req := &gomemcached.MCRequest{
Opcode: gomemcached.DELETE,
VBucket: vb,
Key: []byte(key),
}
err := c.setCollection(req, context...)
if err != nil {
return nil, err
}
return c.Send(req)
}
// Get a random document
func (c *Client) GetRandomDoc(context ...*ClientContext) (*gomemcached.MCResponse, error) {
req := &gomemcached.MCRequest{
Opcode: 0xB6,
}
err := c.setExtrasCollection(req, context...)
if err != nil {
return nil, err
}
return c.Send(req)
}
// AuthList lists SASL auth mechanisms.
func (c *Client) AuthList() (*gomemcached.MCResponse, error) {
return c.Send(&gomemcached.MCRequest{
Opcode: gomemcached.SASL_LIST_MECHS})
}
// Auth performs SASL PLAIN authentication against the server.
func (c *Client) Auth(user, pass string) (*gomemcached.MCResponse, error) {
res, err := c.AuthList()
if err != nil {
return res, err
}
authMech := string(res.Body)
if strings.Index(authMech, "PLAIN") != -1 {
return c.AuthPlain(user, pass)
}
return nil, fmt.Errorf("auth mechanism PLAIN not supported")
}
// AuthScramSha performs SCRAM-SHA authentication against the server.
func (c *Client) AuthScramSha(user, pass string) (*gomemcached.MCResponse, error) {
res, err := c.AuthList()
if err != nil {
return nil, errors.Wrap(err, "Unable to obtain list of methods.")
}
methods := string(res.Body)
method, err := scramsha.BestMethod(methods)
if err != nil {
return nil, errors.Wrap(err,
"Unable to select SCRAM-SHA method.")
}
s, err := scramsha.NewScramSha(method)
if err != nil {
return nil, errors.Wrap(err, "Unable to initialize scramsha.")
}
logging.Infof("Using %v authentication for user %v%v%v", method, gomemcached.UdTagBegin, user, gomemcached.UdTagEnd)
message, err := s.GetStartRequest(user)
if err != nil {
return nil, errors.Wrapf(err,
"Error building start request for user %s.", user)
}
startRequest := &gomemcached.MCRequest{
Opcode: 0x21,
Key: []byte(method),
Body: []byte(message)}
startResponse, err := c.Send(startRequest)
if err != nil {
return nil, errors.Wrap(err, "Error sending start request.")
}
err = s.HandleStartResponse(string(startResponse.Body))
if err != nil {
return nil, errors.Wrap(err, "Error handling start response.")
}
message = s.GetFinalRequest(pass)
// send step request
finalRequest := &gomemcached.MCRequest{
Opcode: 0x22,
Key: []byte(method),
Body: []byte(message)}
finalResponse, err := c.Send(finalRequest)
if err != nil {
return nil, errors.Wrap(err, "Error sending final request.")
}
err = s.HandleFinalResponse(string(finalResponse.Body))
if err != nil {
return nil, errors.Wrap(err, "Error handling final response.")
}
return finalResponse, nil
}
func (c *Client) AuthPlain(user, pass string) (*gomemcached.MCResponse, error) {
logging.Infof("Using plain authentication for user %v%v%v", gomemcached.UdTagBegin, user, gomemcached.UdTagEnd)
return c.Send(&gomemcached.MCRequest{
Opcode: gomemcached.SASL_AUTH,
Key: []byte("PLAIN"),
Body: []byte(fmt.Sprintf("\x00%s\x00%s", user, pass))})
}
// select bucket
func (c *Client) SelectBucket(bucket string) (*gomemcached.MCResponse, error) {
return c.Send(&gomemcached.MCRequest{
Opcode: gomemcached.SELECT_BUCKET,
Key: []byte(bucket)})
}
func (c *Client) store(opcode gomemcached.CommandCode, vb uint16,
key string, flags int, exp int, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) {
req := &gomemcached.MCRequest{
Opcode: opcode,
VBucket: vb,
Key: []byte(key),
Cas: 0,
Opaque: 0,
Extras: []byte{0, 0, 0, 0, 0, 0, 0, 0},
Body: body}
err := c.setCollection(req, context...)
if err != nil {
return nil, err
}
binary.BigEndian.PutUint64(req.Extras, uint64(flags)<<32|uint64(exp))
return c.Send(req)
}
func (c *Client) storeCas(opcode gomemcached.CommandCode, vb uint16,
key string, flags int, exp int, cas uint64, body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) {
req := &gomemcached.MCRequest{
Opcode: opcode,
VBucket: vb,
Key: []byte(key),
Cas: cas,
Opaque: 0,
Extras: []byte{0, 0, 0, 0, 0, 0, 0, 0},
Body: body}
err := c.setCollection(req, context...)
if err != nil {
return nil, err
}
binary.BigEndian.PutUint64(req.Extras, uint64(flags)<<32|uint64(exp))
return c.Send(req)
}
// Incr increments the value at the given key.
func (c *Client) Incr(vb uint16, key string,
amt, def uint64, exp int, context ...*ClientContext) (uint64, error) {
req := &gomemcached.MCRequest{
Opcode: gomemcached.INCREMENT,
VBucket: vb,
Key: []byte(key),
Extras: make([]byte, 8+8+4),
}
err := c.setCollection(req, context...)
if err != nil {
return 0, err
}
binary.BigEndian.PutUint64(req.Extras[:8], amt)
binary.BigEndian.PutUint64(req.Extras[8:16], def)
binary.BigEndian.PutUint32(req.Extras[16:20], uint32(exp))
resp, err := c.Send(req)
if err != nil {
return 0, err
}
return binary.BigEndian.Uint64(resp.Body), nil
}
// Decr decrements the value at the given key.
func (c *Client) Decr(vb uint16, key string,
amt, def uint64, exp int, context ...*ClientContext) (uint64, error) {
req := &gomemcached.MCRequest{
Opcode: gomemcached.DECREMENT,
VBucket: vb,
Key: []byte(key),
Extras: make([]byte, 8+8+4),
}
err := c.setCollection(req, context...)
if err != nil {
return 0, err
}
binary.BigEndian.PutUint64(req.Extras[:8], amt)
binary.BigEndian.PutUint64(req.Extras[8:16], def)
binary.BigEndian.PutUint32(req.Extras[16:20], uint32(exp))
resp, err := c.Send(req)
if err != nil {
return 0, err
}
return binary.BigEndian.Uint64(resp.Body), nil
}
// Add a value for a key (store if not exists).
func (c *Client) Add(vb uint16, key string, flags int, exp int,
body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) {
return c.store(gomemcached.ADD, vb, key, flags, exp, body, context...)
}
// Set the value for a key.
func (c *Client) Set(vb uint16, key string, flags int, exp int,
body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) {
return c.store(gomemcached.SET, vb, key, flags, exp, body, context...)
}
// SetCas set the value for a key with cas
func (c *Client) SetCas(vb uint16, key string, flags int, exp int, cas uint64,
body []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) {
return c.storeCas(gomemcached.SET, vb, key, flags, exp, cas, body, context...)
}
// Append data to the value of a key.
func (c *Client) Append(vb uint16, key string, data []byte, context ...*ClientContext) (*gomemcached.MCResponse, error) {
req := &gomemcached.MCRequest{
Opcode: gomemcached.APPEND,
VBucket: vb,
Key: []byte(key),
Cas: 0,
Opaque: 0,
Body: data}
err := c.setCollection(req, context...)
if err != nil {
return nil, err
}
return c.Send(req)
}
// GetBulk gets keys in bulk
func (c *Client) GetBulk(vb uint16, keys []string, rv map[string]*gomemcached.MCResponse, subPaths []string, context ...*ClientContext) error {
stopch := make(chan bool)
var wg sync.WaitGroup
defer func() {
close(stopch)
wg.Wait()
}()
if (math.MaxInt32 - c.opaque) < (uint32(len(keys)) + 1) {
c.opaque = uint32(1)
}
opStart := c.opaque
errch := make(chan error, 2)
wg.Add(1)
go func() {
defer func() {
if r := recover(); r != nil {
logging.Infof("Recovered in f %v", r)
}
errch <- nil
wg.Done()
}()
ok := true
for ok {
select {
case <-stopch:
return
default:
res, err := c.Receive()
if err != nil && IfResStatusError(res) {
if res == nil || res.Status != gomemcached.KEY_ENOENT {
errch <- err
return
}
// continue receiving in case of KEY_ENOENT
} else if res.Opcode == gomemcached.GET ||
res.Opcode == gomemcached.SUBDOC_GET ||
res.Opcode == gomemcached.SUBDOC_MULTI_LOOKUP {
opaque := res.Opaque - opStart
if opaque < 0 || opaque >= uint32(len(keys)) {
// Every now and then we seem to be seeing an invalid opaque
// value returned from the server. When this happens log the error
// and the calling function will retry the bulkGet. MB-15140
logging.Errorf(" Invalid opaque Value. Debug info : Res.opaque : %v(%v), Keys %v, Response received %v \n key list %v this key %v", res.Opaque, opaque, len(keys), res, keys, string(res.Body))
errch <- fmt.Errorf("Out of Bounds error")
return
}
rv[keys[opaque]] = res
}
if res.Opcode == gomemcached.NOOP {
ok = false
}
}
}
}()
memcachedReqPkt := &gomemcached.MCRequest{
Opcode: gomemcached.GET,
VBucket: vb,
}
err := c.setCollection(memcachedReqPkt, context...)
if err != nil {
return err
}
if len(subPaths) > 0 {
extraBuf, valueBuf := GetSubDocVal(subPaths)
memcachedReqPkt.Opcode = gomemcached.SUBDOC_MULTI_LOOKUP
memcachedReqPkt.Extras = extraBuf
memcachedReqPkt.Body = valueBuf
}
for _, k := range keys { // Start of Get request
memcachedReqPkt.Key = []byte(k)
memcachedReqPkt.Opaque = c.opaque
err := c.Transmit(memcachedReqPkt)
if err != nil {
logging.Errorf(" Transmit failed in GetBulkAll %v", err)
return err
}
c.opaque++
} // End of Get request
// finally transmit a NOOP
err = c.Transmit(&gomemcached.MCRequest{
Opcode: gomemcached.NOOP,
VBucket: vb,
Opaque: c.opaque,
})
if err != nil {
logging.Errorf(" Transmit of NOOP failed %v", err)
return err
}
c.opaque++
return <-errch
}
func GetSubDocVal(subPaths []string) (extraBuf, valueBuf []byte) {
var ops []string
totalBytesLen := 0
num := 1
for _, v := range subPaths {
totalBytesLen = totalBytesLen + len([]byte(v))
ops = append(ops, v)
num = num + 1
}
// Xattr retrieval - subdoc multi get
// Set deleted true only if it is not expiration
if len(subPaths) != 1 || subPaths[0] != "$document.exptime" {
extraBuf = append(extraBuf, uint8(0x04))
}
valueBuf = make([]byte, num*4+totalBytesLen)
//opcode for subdoc get
op := gomemcached.SUBDOC_GET
// Calculate path total bytes
// There are 2 ops - get xattrs - both input and $document and get whole doc
valIter := 0
for _, v := range ops {
pathBytes := []byte(v)
valueBuf[valIter+0] = uint8(op)
// SubdocFlagXattrPath indicates that the path refers to
// an Xattr rather than the document body.
valueBuf[valIter+1] = uint8(gomemcached.SUBDOC_FLAG_XATTR)
// 2 byte key
binary.BigEndian.PutUint16(valueBuf[valIter+2:], uint16(len(pathBytes)))
// Then n bytes path
copy(valueBuf[valIter+4:], pathBytes)
valIter = valIter + 4 + len(pathBytes)
}
return
}
// ObservedStatus is the type reported by the Observe method
type ObservedStatus uint8
// Observation status values.
const (
ObservedNotPersisted = ObservedStatus(0x00) // found, not persisted
ObservedPersisted = ObservedStatus(0x01) // found, persisted
ObservedNotFound = ObservedStatus(0x80) // not found (or a persisted delete)
ObservedLogicallyDeleted = ObservedStatus(0x81) // pending deletion (not persisted yet)
)
// ObserveResult represents the data obtained by an Observe call
type ObserveResult struct {
Status ObservedStatus // Whether the value has been persisted/deleted
Cas uint64 // Current value's CAS
PersistenceTime time.Duration // Node's average time to persist a value
ReplicationTime time.Duration // Node's average time to replicate a value
}
// Observe gets the persistence/replication/CAS state of a key
func (c *Client) Observe(vb uint16, key string) (result ObserveResult, err error) {
// http://www.couchbase.com/wiki/display/couchbase/Observe
body := make([]byte, 4+len(key))
binary.BigEndian.PutUint16(body[0:2], vb)
binary.BigEndian.PutUint16(body[2:4], uint16(len(key)))
copy(body[4:4+len(key)], key)
res, err := c.Send(&gomemcached.MCRequest{
Opcode: gomemcached.OBSERVE,
VBucket: vb,
Body: body,
})
if err != nil {
return
}
// Parse the response data from the body:
if len(res.Body) < 2+2+1 {
err = io.ErrUnexpectedEOF
return
}
outVb := binary.BigEndian.Uint16(res.Body[0:2])
keyLen := binary.BigEndian.Uint16(res.Body[2:4])
if len(res.Body) < 2+2+int(keyLen)+1+8 {
err = io.ErrUnexpectedEOF
return
}
outKey := string(res.Body[4 : 4+keyLen])
if outVb != vb || outKey != key {
err = fmt.Errorf("observe returned wrong vbucket/key: %d/%q", outVb, outKey)
return
}
result.Status = ObservedStatus(res.Body[4+keyLen])
result.Cas = binary.BigEndian.Uint64(res.Body[5+keyLen:])
// The response reuses the Cas field to store time statistics:
result.PersistenceTime = time.Duration(res.Cas>>32) * time.Millisecond
result.ReplicationTime = time.Duration(res.Cas&math.MaxUint32) * time.Millisecond
return
}
// CheckPersistence checks whether a stored value has been persisted to disk yet.
func (result ObserveResult) CheckPersistence(cas uint64, deletion bool) (persisted bool, overwritten bool) {
switch {
case result.Status == ObservedNotFound && deletion:
persisted = true
case result.Cas != cas:
overwritten = true
case result.Status == ObservedPersisted:
persisted = true
}
return
}
// Sequence number based Observe Implementation
type ObserveSeqResult struct {
Failover uint8 // Set to 1 if a failover took place
VbId uint16 // vbucket id
Vbuuid uint64 // vucket uuid
LastPersistedSeqNo uint64 // last persisted sequence number
CurrentSeqNo uint64 // current sequence number
OldVbuuid uint64 // Old bucket vbuuid
LastSeqNo uint64 // last sequence number received before failover
}
func (c *Client) ObserveSeq(vb uint16, vbuuid uint64) (result *ObserveSeqResult, err error) {
// http://www.couchbase.com/wiki/display/couchbase/Observe
body := make([]byte, 8)
binary.BigEndian.PutUint64(body[0:8], vbuuid)
res, err := c.Send(&gomemcached.MCRequest{
Opcode: gomemcached.OBSERVE_SEQNO,
VBucket: vb,
Body: body,
Opaque: 0x01,
})
if err != nil {
return
}
if res.Status != gomemcached.SUCCESS {
return nil, fmt.Errorf(" Observe returned error %v", res.Status)
}
// Parse the response data from the body:
if len(res.Body) < (1 + 2 + 8 + 8 + 8) {
err = io.ErrUnexpectedEOF
return
}
result = &ObserveSeqResult{}
result.Failover = res.Body[0]
result.VbId = binary.BigEndian.Uint16(res.Body[1:3])
result.Vbuuid = binary.BigEndian.Uint64(res.Body[3:11])
result.LastPersistedSeqNo = binary.BigEndian.Uint64(res.Body[11:19])
result.CurrentSeqNo = binary.BigEndian.Uint64(res.Body[19:27])
// in case of failover processing we can have old vbuuid and the last persisted seq number
if result.Failover == 1 && len(res.Body) >= (1+2+8+8+8+8+8) {
result.OldVbuuid = binary.BigEndian.Uint64(res.Body[27:35])
result.LastSeqNo = binary.BigEndian.Uint64(res.Body[35:43])
}
return
}
// CasOp is the type of operation to perform on this CAS loop.
type CasOp uint8
const (
// CASStore instructs the server to store the new value normally
CASStore = CasOp(iota)
// CASQuit instructs the client to stop attempting to CAS, leaving value untouched
CASQuit
// CASDelete instructs the server to delete the current value
CASDelete
)
// User specified termination is returned as an error.
func (c CasOp) Error() string {
switch c {
case CASStore:
return "CAS store"
case CASQuit:
return "CAS quit"
case CASDelete:
return "CAS delete"
}
panic("Unhandled value")
}
//////// CAS TRANSFORM
// CASState tracks the state of CAS over several operations.
//
// This is used directly by CASNext and indirectly by CAS
type CASState struct {
initialized bool // false on the first call to CASNext, then true
Value []byte // Current value of key; update in place to new value
Cas uint64 // Current CAS value of key
Exists bool // Does a value exist for the key? (If not, Value will be nil)
Err error // Error, if any, after CASNext returns false
resp *gomemcached.MCResponse
}
// CASNext is a non-callback, loop-based version of CAS method.
//
// Usage is like this:
//
// var state memcached.CASState
// for client.CASNext(vb, key, exp, &state) {
// state.Value = some_mutation(state.Value)
// }
// if state.Err != nil { ... }
func (c *Client) CASNext(vb uint16, k string, exp int, state *CASState) bool {
if state.initialized {
if !state.Exists {
// Adding a new key:
if state.Value == nil {
state.Cas = 0
return false // no-op (delete of non-existent value)
}
state.resp, state.Err = c.Add(vb, k, 0, exp, state.Value)
} else {
// Updating / deleting a key:
req := &gomemcached.MCRequest{
Opcode: gomemcached.DELETE,
VBucket: vb,
Key: []byte(k),
Cas: state.Cas}
if state.Value != nil {
req.Opcode = gomemcached.SET
req.Opaque = 0
req.Extras = []byte{0, 0, 0, 0, 0, 0, 0, 0}
req.Body = state.Value
flags := 0
binary.BigEndian.PutUint64(req.Extras, uint64(flags)<<32|uint64(exp))
}
state.resp, state.Err = c.Send(req)
}
// If the response status is KEY_EEXISTS or NOT_STORED there's a conflict and we'll need to
// get the new value (below). Otherwise, we're done (either success or failure) so return:
if !(state.resp != nil && (state.resp.Status == gomemcached.KEY_EEXISTS ||
state.resp.Status == gomemcached.NOT_STORED)) {
state.Cas = state.resp.Cas
return false // either success or fatal error
}
}
// Initial call, or after a conflict: GET the current value and CAS and return them:
state.initialized = true
if state.resp, state.Err = c.Get(vb, k); state.Err == nil {
state.Exists = true
state.Value = state.resp.Body
state.Cas = state.resp.Cas
} else if state.resp != nil && state.resp.Status == gomemcached.KEY_ENOENT {
state.Err = nil
state.Exists = false
state.Value = nil
state.Cas = 0
} else {
return false // fatal error
}
return true // keep going...
}
// CasFunc is type type of function to perform a CAS transform.
//
// Input is the current value, or nil if no value exists.
// The function should return the new value (if any) to set, and the store/quit/delete operation.
type CasFunc func(current []byte) ([]byte, CasOp)
// CAS performs a CAS transform with the given function.
//
// If the value does not exist, a nil current value will be sent to f.
func (c *Client) CAS(vb uint16, k string, f CasFunc,
initexp int) (*gomemcached.MCResponse, error) {
var state CASState
for c.CASNext(vb, k, initexp, &state) {
newValue, operation := f(state.Value)
if operation == CASQuit || (operation == CASDelete && state.Value == nil) {
return nil, operation
}
state.Value = newValue
}
return state.resp, state.Err
}
// StatValue is one of the stats returned from the Stats method.
type StatValue struct {
// The stat key
Key string
// The stat value
Val string
}
// Stats requests server-side stats.
//
// Use "" as the stat key for toplevel stats.
func (c *Client) Stats(key string) ([]StatValue, error) {
rv := make([]StatValue, 0, 128)
req := &gomemcached.MCRequest{
Opcode: gomemcached.STAT,
Key: []byte(key),
Opaque: 918494,
}
err := c.Transmit(req)
if err != nil {
return rv, err
}
for {
res, _, err := getResponse(c.conn, c.hdrBuf)
if err != nil {
return rv, err
}
k := string(res.Key)
if k == "" {
break
}
rv = append(rv, StatValue{
Key: k,
Val: string(res.Body),
})
}
return rv, nil
}
// StatsMap requests server-side stats similarly to Stats, but returns
// them as a map.
//
// Use "" as the stat key for toplevel stats.
func (c *Client) StatsMap(key string) (map[string]string, error) {
rv := make(map[string]string)
req := &gomemcached.MCRequest{
Opcode: gomemcached.STAT,
Key: []byte(key),
Opaque: 918494,
}
err := c.Transmit(req)
if err != nil {
return rv, err
}
for {
res, _, err := getResponse(c.conn, c.hdrBuf)
if err != nil {
return rv, err
}
k := string(res.Key)
if k == "" {
break
}
rv[k] = string(res.Body)
}
return rv, nil
}
// instead of returning a new statsMap, simply populate passed in statsMap, which contains all the keys
// for which stats needs to be retrieved
func (c *Client) StatsMapForSpecifiedStats(key string, statsMap map[string]string) error {
// clear statsMap
for key, _ := range statsMap {
statsMap[key] = ""
}
req := &gomemcached.MCRequest{
Opcode: gomemcached.STAT,
Key: []byte(key),
Opaque: 918494,
}
err := c.Transmit(req)
if err != nil {
return err
}
for {
res, _, err := getResponse(c.conn, c.hdrBuf)
if err != nil {
return err
}
k := string(res.Key)
if k == "" {
break
}
if _, ok := statsMap[k]; ok {
statsMap[k] = string(res.Body)
}
}
return nil
}
// UprGetFailoverLog for given list of vbuckets.
func (mc *Client) UprGetFailoverLog(vb []uint16) (map[uint16]*FailoverLog, error) {
rq := &gomemcached.MCRequest{
Opcode: gomemcached.UPR_FAILOVERLOG,
Opaque: opaqueFailover,
}
failoverLogs := make(map[uint16]*FailoverLog)
for _, vBucket := range vb {
rq.VBucket = vBucket
if err := mc.Transmit(rq); err != nil {
return nil, err
}
res, err := mc.Receive()
if err != nil {
return nil, fmt.Errorf("failed to receive %s", err.Error())
} else if res.Opcode != gomemcached.UPR_FAILOVERLOG || res.Status != gomemcached.SUCCESS {
return nil, fmt.Errorf("unexpected #opcode %v", res.Opcode)
}
flog, err := parseFailoverLog(res.Body)
if err != nil {
return nil, fmt.Errorf("unable to parse failover logs for vb %d", vb)
}
failoverLogs[vBucket] = flog
}
return failoverLogs, nil
}
// Hijack exposes the underlying connection from this client.
//
// It also marks the connection as unhealthy since the client will
// have lost control over the connection and can't otherwise verify
// things are in good shape for connection pools.
func (c *Client) Hijack() io.ReadWriteCloser {
c.setHealthy(false)
return c.conn
}
func (c *Client) setHealthy(healthy bool) {
healthyState := UnHealthy
if healthy {
healthyState = Healthy
}
atomic.StoreUint32(&c.healthy, healthyState)
}
func IfResStatusError(response *gomemcached.MCResponse) bool {
return response == nil ||
(response.Status != gomemcached.SUBDOC_BAD_MULTI &&
response.Status != gomemcached.SUBDOC_PATH_NOT_FOUND &&
response.Status != gomemcached.SUBDOC_MULTI_PATH_FAILURE_DELETED)
}
func (c *Client) Conn() io.ReadWriteCloser {
return c.conn
}
// Since the binary request supports only a single collection at a time, it is possible
// that this may be called multiple times in succession by callers to get vbSeqnos for
// multiple collections. Thus, caller could pass in a non-nil map so the gomemcached
// client won't need to allocate new map for each call to prevent too much GC
// NOTE: If collection is enabled and context is not given, KV will still return stats for default collection
func (c *Client) GetAllVbSeqnos(vbSeqnoMap map[uint16]uint64, context ...*ClientContext) (map[uint16]uint64, error) {
rq := &gomemcached.MCRequest{
Opcode: gomemcached.GET_ALL_VB_SEQNOS,
Opaque: opaqueGetSeqno,
}
err := c.setVbSeqnoContext(rq, context...)
if err != nil {
return vbSeqnoMap, err
}
err = c.Transmit(rq)
if err != nil {
return vbSeqnoMap, err
}
res, err := c.Receive()
if err != nil {
return vbSeqnoMap, fmt.Errorf("failed to receive: %v", err)
}
vbSeqnosList, err := parseGetSeqnoResp(res.Body)
if err != nil {
logging.Errorf("Unable to parse : err: %v\n", err)
return vbSeqnoMap, err
}
if vbSeqnoMap == nil {
vbSeqnoMap = make(map[uint16]uint64)
}
combineMapWithReturnedList(vbSeqnoMap, vbSeqnosList)
return vbSeqnoMap, nil
}
func combineMapWithReturnedList(vbSeqnoMap map[uint16]uint64, list *VBSeqnos) {
if list == nil {
return
}
// If the map contains exactly the existing vbs in the list, no need to modify
needToCleanupMap := true
if len(vbSeqnoMap) == 0 {
needToCleanupMap = false
} else if len(vbSeqnoMap) == len(*list) {
needToCleanupMap = false
for _, pair := range *list {
_, vbExists := vbSeqnoMap[uint16(pair[0])]
if !vbExists {
needToCleanupMap = true
break
}
}
}
if needToCleanupMap {
var vbsToDelete []uint16
for vbInSeqnoMap, _ := range vbSeqnoMap {
// If a vb in the seqno map doesn't exist in the returned list, need to clean up
// to ensure returning an accurate result
found := false
var vbno uint16
for _, pair := range *list {
vbno = uint16(pair[0])
if vbno == vbInSeqnoMap {
found = true
break
} else if vbno > vbInSeqnoMap {
// definitely not in the list
break
}
}
if !found {
vbsToDelete = append(vbsToDelete, vbInSeqnoMap)
}
}
for _, vbno := range vbsToDelete {
delete(vbSeqnoMap, vbno)
}
}
// Set the map with data from the list
for _, pair := range *list {
vbno := uint16(pair[0])
seqno := pair[1]
vbSeqnoMap[vbno] = seqno
}
}