You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							1477 lines
						
					
					
						
							38 KiB
						
					
					
				
			
		
		
	
	
							1477 lines
						
					
					
						
							38 KiB
						
					
					
				| /*
 | |
| Package couchbase provides a smart client for go.
 | |
| 
 | |
| Usage:
 | |
| 
 | |
|  client, err := couchbase.Connect("http://myserver:8091/")
 | |
|  handleError(err)
 | |
|  pool, err := client.GetPool("default")
 | |
|  handleError(err)
 | |
|  bucket, err := pool.GetBucket("MyAwesomeBucket")
 | |
|  handleError(err)
 | |
|  ...
 | |
| 
 | |
| or a shortcut for the bucket directly
 | |
| 
 | |
|  bucket, err := couchbase.GetBucket("http://myserver:8091/", "default", "default")
 | |
| 
 | |
| in any case, you can specify authentication credentials using
 | |
| standard URL userinfo syntax:
 | |
| 
 | |
|  b, err := couchbase.GetBucket("http://bucketname:bucketpass@myserver:8091/",
 | |
|          "default", "bucket")
 | |
| */
 | |
| package couchbase
 | |
| 
 | |
| import (
 | |
| 	"encoding/binary"
 | |
| 	"encoding/json"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"runtime"
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 	"unsafe"
 | |
| 
 | |
| 	"github.com/couchbase/gomemcached"
 | |
| 	"github.com/couchbase/gomemcached/client" // package name is 'memcached'
 | |
| 	"github.com/couchbase/goutils/logging"
 | |
| )
 | |
| 
 | |
// MutationToken carries the vbucket id, vbucket UUID and sequence number
// that the write helpers in this file decode from a response's extras.
type MutationToken struct {
	VBid  uint16 // vbucket id
	Guard uint64 // vbucket UUID (vbuuid)
	Value uint64 // sequence number
}
 | |
| 
 | |
var (
	// MaxBulkRetries is the maximum number of times to retry a chunk of a
	// bulk get on error.
	MaxBulkRetries = 5000

	// backOffDuration is the base wait handed to backOff between retries.
	backOffDuration = 100 * time.Millisecond

	// MaxBackOffRetries bounds the number of backed-off retries. backOff
	// waits attempt*backOffDuration, so 25 attempts add up to roughly 30s.
	MaxBackOffRetries = 25

	// SlowServerCallWarningThreshold, when set to a nonzero duration, makes
	// Do() and ViewCustom() log a warning if the call takes longer than that.
	SlowServerCallWarningThreshold time.Duration
)
 | |
| 
 | |
| func slowLog(startTime time.Time, format string, args ...interface{}) {
 | |
| 	if elapsed := time.Now().Sub(startTime); elapsed > SlowServerCallWarningThreshold {
 | |
| 		pc, _, _, _ := runtime.Caller(2)
 | |
| 		caller := runtime.FuncForPC(pc).Name()
 | |
| 		logging.Infof("go-couchbase: "+format+" in "+caller+" took "+elapsed.String(), args...)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Return true if error is KEY_ENOENT. Required by cbq-engine
 | |
| func IsKeyEExistsError(err error) bool {
 | |
| 
 | |
| 	res, ok := err.(*gomemcached.MCResponse)
 | |
| 	if ok && res.Status == gomemcached.KEY_EEXISTS {
 | |
| 		return true
 | |
| 	}
 | |
| 
 | |
| 	return false
 | |
| }
 | |
| 
 | |
| // Return true if error is KEY_ENOENT. Required by cbq-engine
 | |
| func IsKeyNoEntError(err error) bool {
 | |
| 
 | |
| 	res, ok := err.(*gomemcached.MCResponse)
 | |
| 	if ok && res.Status == gomemcached.KEY_ENOENT {
 | |
| 		return true
 | |
| 	}
 | |
| 
 | |
| 	return false
 | |
| }
 | |
| 
 | |
| // Return true if error suggests a bucket refresh is required. Required by cbq-engine
 | |
| func IsRefreshRequired(err error) bool {
 | |
| 
 | |
| 	res, ok := err.(*gomemcached.MCResponse)
 | |
| 	if ok && (res.Status == gomemcached.NO_BUCKET || res.Status == gomemcached.NOT_MY_VBUCKET) {
 | |
| 		return true
 | |
| 	}
 | |
| 
 | |
| 	return false
 | |
| }
 | |
| 
 | |
// ClientOpCallback, when non-nil, is invoked by the operations in this file
// with the operation name, the key, the time the operation started and the
// resulting error.
var ClientOpCallback func(opname, k string, start time.Time, err error)
 | |
| 
 | |
| // Do executes a function on a memcached connection to the node owning key "k"
 | |
| //
 | |
| // Note that this automatically handles transient errors by replaying
 | |
| // your function on a "not-my-vbucket" error, so don't assume
 | |
| // your command will only be executed only once.
 | |
| func (b *Bucket) Do(k string, f func(mc *memcached.Client, vb uint16) error) (err error) {
 | |
| 	return b.Do2(k, f, true)
 | |
| }
 | |
| 
 | |
| func (b *Bucket) Do2(k string, f func(mc *memcached.Client, vb uint16) error, deadline bool) (err error) {
 | |
| 	if SlowServerCallWarningThreshold > 0 {
 | |
| 		defer slowLog(time.Now(), "call to Do(%q)", k)
 | |
| 	}
 | |
| 
 | |
| 	vb := b.VBHash(k)
 | |
| 	maxTries := len(b.Nodes()) * 2
 | |
| 	for i := 0; i < maxTries; i++ {
 | |
| 		conn, pool, err := b.getConnectionToVBucket(vb)
 | |
| 		if err != nil {
 | |
| 			if isConnError(err) && backOff(i, maxTries, backOffDuration, true) {
 | |
| 				b.Refresh()
 | |
| 				continue
 | |
| 			}
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		if deadline && DefaultTimeout > 0 {
 | |
| 			conn.SetDeadline(getDeadline(noDeadline, DefaultTimeout))
 | |
| 			err = f(conn, uint16(vb))
 | |
| 			conn.SetDeadline(noDeadline)
 | |
| 		} else {
 | |
| 			err = f(conn, uint16(vb))
 | |
| 		}
 | |
| 
 | |
| 		var retry bool
 | |
| 		discard := isOutOfBoundsError(err)
 | |
| 
 | |
| 		// MB-30967 / MB-31001 implement back off for transient errors
 | |
| 		if resp, ok := err.(*gomemcached.MCResponse); ok {
 | |
| 			switch resp.Status {
 | |
| 			case gomemcached.NOT_MY_VBUCKET:
 | |
| 				b.Refresh()
 | |
| 				// MB-28842: in case of NMVB, check if the node is still part of the map
 | |
| 				// and ditch the connection if it isn't.
 | |
| 				discard = b.checkVBmap(pool.Node())
 | |
| 				retry = true
 | |
| 			case gomemcached.NOT_SUPPORTED:
 | |
| 				discard = true
 | |
| 				retry = true
 | |
| 			case gomemcached.ENOMEM:
 | |
| 				fallthrough
 | |
| 			case gomemcached.TMPFAIL:
 | |
| 				retry = backOff(i, maxTries, backOffDuration, true)
 | |
| 			default:
 | |
| 				retry = false
 | |
| 			}
 | |
| 		} else if err != nil && isConnError(err) && backOff(i, maxTries, backOffDuration, true) {
 | |
| 			retry = true
 | |
| 		}
 | |
| 
 | |
| 		if discard {
 | |
| 			pool.Discard(conn)
 | |
| 		} else {
 | |
| 			pool.Return(conn)
 | |
| 		}
 | |
| 
 | |
| 		if !retry {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return fmt.Errorf("unable to complete action after %v attemps", maxTries)
 | |
| }
 | |
| 
 | |
// GatheredStats is the outcome of a stats request against one server.
type GatheredStats struct {
	Server string            // server name the stats were requested from
	Stats  map[string]string // stat key -> value as returned by the server
	Err    error             // error from obtaining a connection or fetching the stats
}
 | |
| 
 | |
| func getStatsParallel(sn string, b *Bucket, offset int, which string,
 | |
| 	ch chan<- GatheredStats) {
 | |
| 	pool := b.getConnPool(offset)
 | |
| 	var gatheredStats GatheredStats
 | |
| 
 | |
| 	conn, err := pool.Get()
 | |
| 	defer func() {
 | |
| 		pool.Return(conn)
 | |
| 		ch <- gatheredStats
 | |
| 	}()
 | |
| 
 | |
| 	if err != nil {
 | |
| 		gatheredStats = GatheredStats{Server: sn, Err: err}
 | |
| 	} else {
 | |
| 		sm, err := conn.StatsMap(which)
 | |
| 		gatheredStats = GatheredStats{Server: sn, Stats: sm, Err: err}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // GetStats gets a set of stats from all servers.
 | |
| //
 | |
| // Returns a map of server ID -> map of stat key to map value.
 | |
| func (b *Bucket) GetStats(which string) map[string]map[string]string {
 | |
| 	rv := map[string]map[string]string{}
 | |
| 	for server, gs := range b.GatherStats(which) {
 | |
| 		if len(gs.Stats) > 0 {
 | |
| 			rv[server] = gs.Stats
 | |
| 		}
 | |
| 	}
 | |
| 	return rv
 | |
| }
 | |
| 
 | |
| // GatherStats returns a map of server ID -> GatheredStats from all servers.
 | |
| func (b *Bucket) GatherStats(which string) map[string]GatheredStats {
 | |
| 	vsm := b.VBServerMap()
 | |
| 	if vsm.ServerList == nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// Go grab all the things at once.
 | |
| 	ch := make(chan GatheredStats, len(vsm.ServerList))
 | |
| 	for i, sn := range vsm.ServerList {
 | |
| 		go getStatsParallel(sn, b, i, which, ch)
 | |
| 	}
 | |
| 
 | |
| 	// Gather the results
 | |
| 	rv := map[string]GatheredStats{}
 | |
| 	for range vsm.ServerList {
 | |
| 		gs := <-ch
 | |
| 		rv[gs.Server] = gs
 | |
| 	}
 | |
| 	return rv
 | |
| }
 | |
| 
 | |
| // Get bucket count through the bucket stats
 | |
| func (b *Bucket) GetCount(refresh bool) (count int64, err error) {
 | |
| 	if refresh {
 | |
| 		b.Refresh()
 | |
| 	}
 | |
| 
 | |
| 	var cnt int64
 | |
| 	for _, gs := range b.GatherStats("") {
 | |
| 		if len(gs.Stats) > 0 {
 | |
| 			cnt, err = strconv.ParseInt(gs.Stats["curr_items"], 10, 64)
 | |
| 			if err != nil {
 | |
| 				return 0, err
 | |
| 			}
 | |
| 			count += cnt
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return count, nil
 | |
| }
 | |
| 
 | |
| // Get bucket document size through the bucket stats
 | |
| func (b *Bucket) GetSize(refresh bool) (size int64, err error) {
 | |
| 	if refresh {
 | |
| 		b.Refresh()
 | |
| 	}
 | |
| 
 | |
| 	var sz int64
 | |
| 	for _, gs := range b.GatherStats("") {
 | |
| 		if len(gs.Stats) > 0 {
 | |
| 			sz, err = strconv.ParseInt(gs.Stats["ep_value_size"], 10, 64)
 | |
| 			if err != nil {
 | |
| 				return 0, err
 | |
| 			}
 | |
| 			size += sz
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return size, nil
 | |
| }
 | |
| 
 | |
// isAuthError reports whether err's message contains "Auth failure".
// A nil error is never an auth error.
func isAuthError(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "Auth failure")
}
 | |
| 
 | |
// IsReadTimeOutError reports whether err's message contains "read tcp" or
// "i/o timeout". A nil error yields false.
func IsReadTimeOutError(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	return strings.Contains(msg, "read tcp") ||
		strings.Contains(msg, "i/o timeout")
}
 | |
| 
 | |
// isTimeoutError reports whether err's message contains "i/o timeout",
// "connection timed out" or "no route to host". A nil error yields false.
func isTimeoutError(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	return strings.Contains(msg, "i/o timeout") ||
		strings.Contains(msg, "connection timed out") ||
		strings.Contains(msg, "no route to host")
}
 | |
| 
 | |
// isConnError reports whether err is a connection-level error that is not
// considered fatal for our fetch loop: io.EOF, or a message containing
// "broken pipe", "connection reset", "connection refused" or
// "connection pool is closed". A nil error yields false.
func isConnError(err error) bool {
	if err == nil {
		return false
	}
	if err == io.EOF {
		return true
	}
	msg := err.Error()
	return strings.Contains(msg, "broken pipe") ||
		strings.Contains(msg, "connection reset") ||
		strings.Contains(msg, "connection refused") ||
		strings.Contains(msg, "connection pool is closed")
}
 | |
| 
 | |
// isOutOfBoundsError reports whether err is non-nil and its message contains
// "Out of Bounds error".
func isOutOfBoundsError(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "Out of Bounds error")
}
 | |
| 
 | |
// getDeadline picks the deadline to apply to an operation: reqDeadline when
// it is set (non-zero) or when duration is not positive; otherwise the
// current time plus duration.
func getDeadline(reqDeadline time.Time, duration time.Duration) time.Time {
	if !reqDeadline.IsZero() || duration <= 0 {
		return reqDeadline
	}
	return time.Now().Add(duration)
}
 | |
| 
 | |
// backOff decides whether another attempt is allowed and, if so, sleeps
// before returning.
//
// It returns false without sleeping once attempt has reached maxAttempts.
// Otherwise it returns true, after sleeping for duration (or for
// attempt*duration when exponential is set; despite the parameter name the
// growth is linear in attempt). Attempt 0 never sleeps.
func backOff(attempt, maxAttempts int, duration time.Duration, exponential bool) bool {
	if attempt >= maxAttempts {
		return false
	}
	if attempt <= 0 {
		// 0th attempt returns immediately
		return true
	}

	wait := duration
	if exponential {
		wait = time.Duration(attempt) * duration
	}
	time.Sleep(wait)
	return true
}
 | |
| 
 | |
| func (b *Bucket) doBulkGet(vb uint16, keys []string, reqDeadline time.Time,
 | |
| 	ch chan<- map[string]*gomemcached.MCResponse, ech chan<- error, subPaths []string,
 | |
| 	eStatus *errorStatus) {
 | |
| 	if SlowServerCallWarningThreshold > 0 {
 | |
| 		defer slowLog(time.Now(), "call to doBulkGet(%d, %d keys)", vb, len(keys))
 | |
| 	}
 | |
| 
 | |
| 	rv := _STRING_MCRESPONSE_POOL.Get()
 | |
| 	attempts := 0
 | |
| 	backOffAttempts := 0
 | |
| 	done := false
 | |
| 	bname := b.Name
 | |
| 	for ; attempts < MaxBulkRetries && !done && !eStatus.errStatus; attempts++ {
 | |
| 
 | |
| 		if len(b.VBServerMap().VBucketMap) < int(vb) {
 | |
| 			//fatal
 | |
| 			err := fmt.Errorf("vbmap smaller than requested for %v", bname)
 | |
| 			logging.Errorf("go-couchbase: %v vb %d vbmap len %d", err.Error(), vb, len(b.VBServerMap().VBucketMap))
 | |
| 			ech <- err
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		masterID := b.VBServerMap().VBucketMap[vb][0]
 | |
| 
 | |
| 		if masterID < 0 {
 | |
| 			// fatal
 | |
| 			err := fmt.Errorf("No master node available for %v vb %d", bname, vb)
 | |
| 			logging.Errorf("%v", err.Error())
 | |
| 			ech <- err
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		// This stack frame exists to ensure we can clean up
 | |
| 		// connection at a reasonable time.
 | |
| 		err := func() error {
 | |
| 			pool := b.getConnPool(masterID)
 | |
| 			conn, err := pool.Get()
 | |
| 			if err != nil {
 | |
| 				if isAuthError(err) || isTimeoutError(err) {
 | |
| 					logging.Errorf("Fatal Error %v : %v", bname, err)
 | |
| 					ech <- err
 | |
| 					return err
 | |
| 				} else if isConnError(err) {
 | |
| 					if !backOff(backOffAttempts, MaxBackOffRetries, backOffDuration, true) {
 | |
| 						logging.Errorf("Connection Error %v : %v", bname, err)
 | |
| 						ech <- err
 | |
| 						return err
 | |
| 					}
 | |
| 					b.Refresh()
 | |
| 					backOffAttempts++
 | |
| 				}
 | |
| 				logging.Infof("Pool Get returned %v: %v", bname, err)
 | |
| 				// retry
 | |
| 				return nil
 | |
| 			}
 | |
| 
 | |
| 			conn.SetDeadline(getDeadline(reqDeadline, DefaultTimeout))
 | |
| 			err = conn.GetBulk(vb, keys, rv, subPaths)
 | |
| 			conn.SetDeadline(noDeadline)
 | |
| 
 | |
| 			discard := false
 | |
| 			defer func() {
 | |
| 				if discard {
 | |
| 					pool.Discard(conn)
 | |
| 				} else {
 | |
| 					pool.Return(conn)
 | |
| 				}
 | |
| 			}()
 | |
| 
 | |
| 			switch err.(type) {
 | |
| 			case *gomemcached.MCResponse:
 | |
| 				notSMaxTries := len(b.Nodes()) * 2
 | |
| 				st := err.(*gomemcached.MCResponse).Status
 | |
| 				if st == gomemcached.NOT_MY_VBUCKET || (st == gomemcached.NOT_SUPPORTED && attempts < notSMaxTries) {
 | |
| 					b.Refresh()
 | |
| 					discard = b.checkVBmap(pool.Node())
 | |
| 					return nil // retry
 | |
| 				} else if st == gomemcached.EBUSY || st == gomemcached.LOCKED {
 | |
| 					if (attempts % (MaxBulkRetries / 100)) == 0 {
 | |
| 						logging.Infof("Retrying Memcached error (%v) FOR %v(vbid:%d, keys:<ud>%v</ud>)",
 | |
| 							err.Error(), bname, vb, keys)
 | |
| 					}
 | |
| 					return nil // retry
 | |
| 				} else if (st == gomemcached.ENOMEM || st == gomemcached.TMPFAIL) && backOff(backOffAttempts, MaxBackOffRetries, backOffDuration, true) {
 | |
| 					// MB-30967 / MB-31001 use backoff for TMPFAIL too
 | |
| 					backOffAttempts++
 | |
| 					logging.Infof("Retrying Memcached error (%v) FOR %v(vbid:%d, keys:<ud>%v</ud>)",
 | |
| 						err.Error(), bname, vb, keys)
 | |
| 					return nil // retry
 | |
| 				}
 | |
| 				ech <- err
 | |
| 				return err
 | |
| 			case error:
 | |
| 				if isOutOfBoundsError(err) {
 | |
| 					// We got an out of bound error, retry the operation
 | |
| 					discard = true
 | |
| 					return nil
 | |
| 				} else if isConnError(err) && backOff(backOffAttempts, MaxBackOffRetries, backOffDuration, true) {
 | |
| 					backOffAttempts++
 | |
| 					logging.Errorf("Connection Error: %s. Refreshing bucket %v (vbid:%v,keys:<ud>%v</ud>)",
 | |
| 						err.Error(), bname, vb, keys)
 | |
| 					discard = true
 | |
| 					b.Refresh()
 | |
| 					return nil // retry
 | |
| 				}
 | |
| 				ech <- err
 | |
| 				ch <- rv
 | |
| 				return err
 | |
| 			}
 | |
| 
 | |
| 			done = true
 | |
| 			return nil
 | |
| 		}()
 | |
| 
 | |
| 		if err != nil {
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if attempts >= MaxBulkRetries {
 | |
| 		err := fmt.Errorf("bulkget exceeded MaxBulkRetries for %v(vbid:%d,keys:<ud>%v</ud>)", bname, vb, keys)
 | |
| 		logging.Errorf("%v", err.Error())
 | |
| 		ech <- err
 | |
| 	}
 | |
| 
 | |
| 	ch <- rv
 | |
| }
 | |
| 
 | |
// errorStatus is a flag shared by all the per-vbucket fetches of one bulk
// get. errorCollector sets it on the first error that is not KEY_ENOENT;
// processBulkGet and doBulkGet read it to stop early.
type errorStatus struct {
	errStatus bool // true once the group has seen such an error
}
 | |
| 
 | |
| type vbBulkGet struct {
 | |
| 	b           *Bucket
 | |
| 	ch          chan<- map[string]*gomemcached.MCResponse
 | |
| 	ech         chan<- error
 | |
| 	k           uint16
 | |
| 	keys        []string
 | |
| 	reqDeadline time.Time
 | |
| 	wg          *sync.WaitGroup
 | |
| 	subPaths    []string
 | |
| 	groupError  *errorStatus
 | |
| }
 | |
| 
 | |
| const _NUM_CHANNELS = 5
 | |
| 
 | |
| var _NUM_CHANNEL_WORKERS = (runtime.NumCPU() + 1) / 2
 | |
| var DefaultDialTimeout = time.Duration(0)
 | |
| var DefaultTimeout = time.Duration(0)
 | |
| var noDeadline = time.Time{}
 | |
| 
 | |
| // Buffer 4k requests per worker
 | |
| var _VB_BULK_GET_CHANNELS []chan *vbBulkGet
 | |
| 
 | |
| func InitBulkGet() {
 | |
| 
 | |
| 	DefaultDialTimeout = 20 * time.Second
 | |
| 	DefaultTimeout = 120 * time.Second
 | |
| 
 | |
| 	memcached.SetDefaultDialTimeout(DefaultDialTimeout)
 | |
| 
 | |
| 	_VB_BULK_GET_CHANNELS = make([]chan *vbBulkGet, _NUM_CHANNELS)
 | |
| 
 | |
| 	for i := 0; i < _NUM_CHANNELS; i++ {
 | |
| 		channel := make(chan *vbBulkGet, 16*1024*_NUM_CHANNEL_WORKERS)
 | |
| 		_VB_BULK_GET_CHANNELS[i] = channel
 | |
| 
 | |
| 		for j := 0; j < _NUM_CHANNEL_WORKERS; j++ {
 | |
| 			go vbBulkGetWorker(channel)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func vbBulkGetWorker(ch chan *vbBulkGet) {
 | |
| 	defer func() {
 | |
| 		// Workers cannot panic and die
 | |
| 		recover()
 | |
| 		go vbBulkGetWorker(ch)
 | |
| 	}()
 | |
| 
 | |
| 	for vbg := range ch {
 | |
| 		vbDoBulkGet(vbg)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func vbDoBulkGet(vbg *vbBulkGet) {
 | |
| 	defer vbg.wg.Done()
 | |
| 	defer func() {
 | |
| 		// Workers cannot panic and die
 | |
| 		recover()
 | |
| 	}()
 | |
| 	vbg.b.doBulkGet(vbg.k, vbg.keys, vbg.reqDeadline, vbg.ch, vbg.ech, vbg.subPaths, vbg.groupError)
 | |
| }
 | |
| 
 | |
// _ERR_CHAN_FULL is reported when a bulk-get work queue has no room for
// another request.
var _ERR_CHAN_FULL = errors.New("Data request queue full, aborting query.")
 | |
| 
 | |
| func (b *Bucket) processBulkGet(kdm map[uint16][]string, reqDeadline time.Time,
 | |
| 	ch chan<- map[string]*gomemcached.MCResponse, ech chan<- error, subPaths []string,
 | |
| 	eStatus *errorStatus) {
 | |
| 
 | |
| 	defer close(ch)
 | |
| 	defer close(ech)
 | |
| 
 | |
| 	wg := &sync.WaitGroup{}
 | |
| 
 | |
| 	for k, keys := range kdm {
 | |
| 
 | |
| 		// GetBulk() group has error donot Queue items for this group
 | |
| 		if eStatus.errStatus {
 | |
| 			break
 | |
| 		}
 | |
| 
 | |
| 		vbg := &vbBulkGet{
 | |
| 			b:           b,
 | |
| 			ch:          ch,
 | |
| 			ech:         ech,
 | |
| 			k:           k,
 | |
| 			keys:        keys,
 | |
| 			reqDeadline: reqDeadline,
 | |
| 			wg:          wg,
 | |
| 			subPaths:    subPaths,
 | |
| 			groupError:  eStatus,
 | |
| 		}
 | |
| 
 | |
| 		wg.Add(1)
 | |
| 
 | |
| 		// Random int
 | |
| 		// Right shift to avoid 8-byte alignment, and take low bits
 | |
| 		c := (uintptr(unsafe.Pointer(vbg)) >> 4) % _NUM_CHANNELS
 | |
| 
 | |
| 		select {
 | |
| 		case _VB_BULK_GET_CHANNELS[c] <- vbg:
 | |
| 			// No-op
 | |
| 		default:
 | |
| 			// Buffer full, abandon the bulk get
 | |
| 			ech <- _ERR_CHAN_FULL
 | |
| 			wg.Add(-1)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Wait for my vb bulk gets
 | |
| 	wg.Wait()
 | |
| }
 | |
| 
 | |
// multiError is a non-empty list of errors reported as a single error.
type multiError []error

// Error summarizes the list as its length plus the message of the first
// error. It panics when the list is empty.
func (m multiError) Error() string {
	count := len(m)
	if count == 0 {
		panic("Error of none")
	}

	first := m[0].Error()
	return fmt.Sprintf("{%v errors, starting with %v}", count, first)
}
 | |
| 
 | |
| // Convert a stream of errors from ech into a multiError (or nil) and
 | |
| // send down eout.
 | |
| //
 | |
| // At least one send is guaranteed on eout, but two is possible, so
 | |
| // buffer the out channel appropriately.
 | |
| func errorCollector(ech <-chan error, eout chan<- error, eStatus *errorStatus) {
 | |
| 	defer func() { eout <- nil }()
 | |
| 	var errs multiError
 | |
| 	for e := range ech {
 | |
| 		if !eStatus.errStatus && !IsKeyNoEntError(e) {
 | |
| 			eStatus.errStatus = true
 | |
| 		}
 | |
| 
 | |
| 		errs = append(errs, e)
 | |
| 	}
 | |
| 
 | |
| 	if len(errs) > 0 {
 | |
| 		eout <- errs
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Fetches multiple keys concurrently, with []byte values
 | |
| //
 | |
| // This is a wrapper around GetBulk which converts all values returned
 | |
| // by GetBulk from raw memcached responses into []byte slices.
 | |
| // Returns one document for duplicate keys
 | |
| func (b *Bucket) GetBulkRaw(keys []string) (map[string][]byte, error) {
 | |
| 
 | |
| 	resp, eout := b.getBulk(keys, noDeadline, nil)
 | |
| 
 | |
| 	rv := make(map[string][]byte, len(keys))
 | |
| 	for k, av := range resp {
 | |
| 		rv[k] = av.Body
 | |
| 	}
 | |
| 
 | |
| 	b.ReleaseGetBulkPools(resp)
 | |
| 	return rv, eout
 | |
| 
 | |
| }
 | |
| 
 | |
| // GetBulk fetches multiple keys concurrently.
 | |
| //
 | |
| // Unlike more convenient GETs, the entire response is returned in the
 | |
| // map array for each key.  Keys that were not found will not be included in
 | |
| // the map.
 | |
| 
 | |
| func (b *Bucket) GetBulk(keys []string, reqDeadline time.Time, subPaths []string) (map[string]*gomemcached.MCResponse, error) {
 | |
| 	return b.getBulk(keys, reqDeadline, subPaths)
 | |
| }
 | |
| 
 | |
| func (b *Bucket) ReleaseGetBulkPools(rv map[string]*gomemcached.MCResponse) {
 | |
| 	_STRING_MCRESPONSE_POOL.Put(rv)
 | |
| }
 | |
| 
 | |
| func (b *Bucket) getBulk(keys []string, reqDeadline time.Time, subPaths []string) (map[string]*gomemcached.MCResponse, error) {
 | |
| 	kdm := _VB_STRING_POOL.Get()
 | |
| 	defer _VB_STRING_POOL.Put(kdm)
 | |
| 	for _, k := range keys {
 | |
| 		if k != "" {
 | |
| 			vb := uint16(b.VBHash(k))
 | |
| 			a, ok1 := kdm[vb]
 | |
| 			if !ok1 {
 | |
| 				a = _STRING_POOL.Get()
 | |
| 			}
 | |
| 			kdm[vb] = append(a, k)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	eout := make(chan error, 2)
 | |
| 	groupErrorStatus := &errorStatus{}
 | |
| 
 | |
| 	// processBulkGet will own both of these channels and
 | |
| 	// guarantee they're closed before it returns.
 | |
| 	ch := make(chan map[string]*gomemcached.MCResponse)
 | |
| 	ech := make(chan error)
 | |
| 
 | |
| 	go errorCollector(ech, eout, groupErrorStatus)
 | |
| 	go b.processBulkGet(kdm, reqDeadline, ch, ech, subPaths, groupErrorStatus)
 | |
| 
 | |
| 	var rv map[string]*gomemcached.MCResponse
 | |
| 
 | |
| 	for m := range ch {
 | |
| 		if rv == nil {
 | |
| 			rv = m
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		for k, v := range m {
 | |
| 			rv[k] = v
 | |
| 		}
 | |
| 		_STRING_MCRESPONSE_POOL.Put(m)
 | |
| 	}
 | |
| 
 | |
| 	return rv, <-eout
 | |
| }
 | |
| 
 | |
// WriteOptions is the set of option flags available for the Write
// method.  They are ORed together to specify the desired request.
type WriteOptions int

const (
	// Raw specifies that the value is raw []byte or nil; don't
	// JSON-encode it.
	Raw WriteOptions = 1 << iota
	// AddOnly indicates an item should only be written if it
	// doesn't exist, otherwise ErrKeyExists is returned.
	AddOnly
	// Persist causes the operation to block until the server
	// confirms the item is persisted.
	Persist
	// Indexable causes the operation to block until it's available via the index.
	Indexable
	// Append indicates the given value should be appended to the
	// existing value for the given key.
	Append
)

// optNames pairs every known option flag with its display name, in the
// order the names appear in WriteOptions.String.
var optNames = []struct {
	opt  WriteOptions
	name string
}{
	{Raw, "raw"},
	{AddOnly, "addonly"},
	{Persist, "persist"},
	{Indexable, "indexable"},
	{Append, "append"},
}

// String renders the option set as the names of its known flags joined by
// "|". Any unknown bits that remain are appended in hexadecimal, and an
// empty set is rendered as "0x0".
func (w WriteOptions) String() string {
	var parts []string
	remaining := w
	for i := range optNames {
		entry := optNames[i]
		if remaining&entry.opt == 0 {
			continue
		}
		parts = append(parts, entry.name)
		remaining &^= entry.opt
	}
	if remaining != 0 || len(parts) == 0 {
		parts = append(parts, fmt.Sprintf("0x%x", int(remaining)))
	}
	return strings.Join(parts, "|")
}
 | |
| 
 | |
// ErrKeyExists is returned from Write with the AddOnly flag when the key
// already exists in the bucket.
var ErrKeyExists = errors.New("key exists")
 | |
| 
 | |
| // General-purpose value setter.
 | |
| //
 | |
| // The Set, Add and Delete methods are just wrappers around this.  The
 | |
| // interpretation of `v` depends on whether the `Raw` option is
 | |
| // given. If it is, v must be a byte array or nil. (A nil value causes
 | |
| // a delete.) If `Raw` is not given, `v` will be marshaled as JSON
 | |
| // before being written. It must be JSON-marshalable and it must not
 | |
| // be nil.
 | |
| func (b *Bucket) Write(k string, flags, exp int, v interface{},
 | |
| 	opt WriteOptions) (err error) {
 | |
| 
 | |
| 	if ClientOpCallback != nil {
 | |
| 		defer func(t time.Time) {
 | |
| 			ClientOpCallback(fmt.Sprintf("Write(%v)", opt), k, t, err)
 | |
| 		}(time.Now())
 | |
| 	}
 | |
| 
 | |
| 	var data []byte
 | |
| 	if opt&Raw == 0 {
 | |
| 		data, err = json.Marshal(v)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	} else if v != nil {
 | |
| 		data = v.([]byte)
 | |
| 	}
 | |
| 
 | |
| 	var res *gomemcached.MCResponse
 | |
| 	err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
 | |
| 		if opt&AddOnly != 0 {
 | |
| 			res, err = memcached.UnwrapMemcachedError(
 | |
| 				mc.Add(vb, k, flags, exp, data))
 | |
| 			if err == nil && res.Status != gomemcached.SUCCESS {
 | |
| 				if res.Status == gomemcached.KEY_EEXISTS {
 | |
| 					err = ErrKeyExists
 | |
| 				} else {
 | |
| 					err = res
 | |
| 				}
 | |
| 			}
 | |
| 		} else if opt&Append != 0 {
 | |
| 			res, err = mc.Append(vb, k, data)
 | |
| 		} else if data == nil {
 | |
| 			res, err = mc.Del(vb, k)
 | |
| 		} else {
 | |
| 			res, err = mc.Set(vb, k, flags, exp, data)
 | |
| 		}
 | |
| 
 | |
| 		return err
 | |
| 	})
 | |
| 
 | |
| 	if err == nil && (opt&(Persist|Indexable) != 0) {
 | |
| 		err = b.WaitForPersistence(k, res.Cas, data == nil)
 | |
| 	}
 | |
| 
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| func (b *Bucket) WriteWithMT(k string, flags, exp int, v interface{},
 | |
| 	opt WriteOptions) (mt *MutationToken, err error) {
 | |
| 
 | |
| 	if ClientOpCallback != nil {
 | |
| 		defer func(t time.Time) {
 | |
| 			ClientOpCallback(fmt.Sprintf("WriteWithMT(%v)", opt), k, t, err)
 | |
| 		}(time.Now())
 | |
| 	}
 | |
| 
 | |
| 	var data []byte
 | |
| 	if opt&Raw == 0 {
 | |
| 		data, err = json.Marshal(v)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	} else if v != nil {
 | |
| 		data = v.([]byte)
 | |
| 	}
 | |
| 
 | |
| 	var res *gomemcached.MCResponse
 | |
| 	err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
 | |
| 		if opt&AddOnly != 0 {
 | |
| 			res, err = memcached.UnwrapMemcachedError(
 | |
| 				mc.Add(vb, k, flags, exp, data))
 | |
| 			if err == nil && res.Status != gomemcached.SUCCESS {
 | |
| 				if res.Status == gomemcached.KEY_EEXISTS {
 | |
| 					err = ErrKeyExists
 | |
| 				} else {
 | |
| 					err = res
 | |
| 				}
 | |
| 			}
 | |
| 		} else if opt&Append != 0 {
 | |
| 			res, err = mc.Append(vb, k, data)
 | |
| 		} else if data == nil {
 | |
| 			res, err = mc.Del(vb, k)
 | |
| 		} else {
 | |
| 			res, err = mc.Set(vb, k, flags, exp, data)
 | |
| 		}
 | |
| 
 | |
| 		if len(res.Extras) >= 16 {
 | |
| 			vbuuid := uint64(binary.BigEndian.Uint64(res.Extras[0:8]))
 | |
| 			seqNo := uint64(binary.BigEndian.Uint64(res.Extras[8:16]))
 | |
| 			mt = &MutationToken{VBid: vb, Guard: vbuuid, Value: seqNo}
 | |
| 		}
 | |
| 
 | |
| 		return err
 | |
| 	})
 | |
| 
 | |
| 	if err == nil && (opt&(Persist|Indexable) != 0) {
 | |
| 		err = b.WaitForPersistence(k, res.Cas, data == nil)
 | |
| 	}
 | |
| 
 | |
| 	return mt, err
 | |
| }
 | |
| 
 | |
| // Set a value in this bucket with Cas and return the new Cas value
 | |
| func (b *Bucket) Cas(k string, exp int, cas uint64, v interface{}) (uint64, error) {
 | |
| 	return b.WriteCas(k, 0, exp, cas, v, 0)
 | |
| }
 | |
| 
 | |
| // Set a value in this bucket with Cas without json encoding it
 | |
| func (b *Bucket) CasRaw(k string, exp int, cas uint64, v interface{}) (uint64, error) {
 | |
| 	return b.WriteCas(k, 0, exp, cas, v, Raw)
 | |
| }
 | |
| 
 | |
| func (b *Bucket) WriteCas(k string, flags, exp int, cas uint64, v interface{},
 | |
| 	opt WriteOptions) (newCas uint64, err error) {
 | |
| 
 | |
| 	if ClientOpCallback != nil {
 | |
| 		defer func(t time.Time) {
 | |
| 			ClientOpCallback(fmt.Sprintf("Write(%v)", opt), k, t, err)
 | |
| 		}(time.Now())
 | |
| 	}
 | |
| 
 | |
| 	var data []byte
 | |
| 	if opt&Raw == 0 {
 | |
| 		data, err = json.Marshal(v)
 | |
| 		if err != nil {
 | |
| 			return 0, err
 | |
| 		}
 | |
| 	} else if v != nil {
 | |
| 		data = v.([]byte)
 | |
| 	}
 | |
| 
 | |
| 	var res *gomemcached.MCResponse
 | |
| 	err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
 | |
| 		res, err = mc.SetCas(vb, k, flags, exp, cas, data)
 | |
| 		return err
 | |
| 	})
 | |
| 
 | |
| 	if err == nil && (opt&(Persist|Indexable) != 0) {
 | |
| 		err = b.WaitForPersistence(k, res.Cas, data == nil)
 | |
| 	}
 | |
| 
 | |
| 	return res.Cas, err
 | |
| }
 | |
| 
 | |
| // Extended CAS operation. These functions will return the mutation token, i.e vbuuid & guard
 | |
| func (b *Bucket) CasWithMeta(k string, flags int, exp int, cas uint64, v interface{}) (uint64, *MutationToken, error) {
 | |
| 	return b.WriteCasWithMT(k, flags, exp, cas, v, 0)
 | |
| }
 | |
| 
 | |
| func (b *Bucket) CasWithMetaRaw(k string, flags int, exp int, cas uint64, v interface{}) (uint64, *MutationToken, error) {
 | |
| 	return b.WriteCasWithMT(k, flags, exp, cas, v, Raw)
 | |
| }
 | |
| 
 | |
| func (b *Bucket) WriteCasWithMT(k string, flags, exp int, cas uint64, v interface{},
 | |
| 	opt WriteOptions) (newCas uint64, mt *MutationToken, err error) {
 | |
| 
 | |
| 	if ClientOpCallback != nil {
 | |
| 		defer func(t time.Time) {
 | |
| 			ClientOpCallback(fmt.Sprintf("Write(%v)", opt), k, t, err)
 | |
| 		}(time.Now())
 | |
| 	}
 | |
| 
 | |
| 	var data []byte
 | |
| 	if opt&Raw == 0 {
 | |
| 		data, err = json.Marshal(v)
 | |
| 		if err != nil {
 | |
| 			return 0, nil, err
 | |
| 		}
 | |
| 	} else if v != nil {
 | |
| 		data = v.([]byte)
 | |
| 	}
 | |
| 
 | |
| 	var res *gomemcached.MCResponse
 | |
| 	err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
 | |
| 		res, err = mc.SetCas(vb, k, flags, exp, cas, data)
 | |
| 		return err
 | |
| 	})
 | |
| 
 | |
| 	if err != nil {
 | |
| 		return 0, nil, err
 | |
| 	}
 | |
| 
 | |
| 	// check for extras
 | |
| 	if len(res.Extras) >= 16 {
 | |
| 		vbuuid := uint64(binary.BigEndian.Uint64(res.Extras[0:8]))
 | |
| 		seqNo := uint64(binary.BigEndian.Uint64(res.Extras[8:16]))
 | |
| 		vb := b.VBHash(k)
 | |
| 		mt = &MutationToken{VBid: uint16(vb), Guard: vbuuid, Value: seqNo}
 | |
| 	}
 | |
| 
 | |
| 	if err == nil && (opt&(Persist|Indexable) != 0) {
 | |
| 		err = b.WaitForPersistence(k, res.Cas, data == nil)
 | |
| 	}
 | |
| 
 | |
| 	return res.Cas, mt, err
 | |
| }
 | |
| 
 | |
| // Set a value in this bucket.
 | |
| // The value will be serialized into a JSON document.
 | |
| func (b *Bucket) Set(k string, exp int, v interface{}) error {
 | |
| 	return b.Write(k, 0, exp, v, 0)
 | |
| }
 | |
| 
 | |
| // Set a value in this bucket with with flags
 | |
| func (b *Bucket) SetWithMeta(k string, flags int, exp int, v interface{}) (*MutationToken, error) {
 | |
| 	return b.WriteWithMT(k, flags, exp, v, 0)
 | |
| }
 | |
| 
 | |
| // SetRaw sets a value in this bucket without JSON encoding it.
 | |
| func (b *Bucket) SetRaw(k string, exp int, v []byte) error {
 | |
| 	return b.Write(k, 0, exp, v, Raw)
 | |
| }
 | |
| 
 | |
| // Add adds a value to this bucket; like Set except that nothing
 | |
| // happens if the key exists.  The value will be serialized into a
 | |
| // JSON document.
 | |
| func (b *Bucket) Add(k string, exp int, v interface{}) (added bool, err error) {
 | |
| 	err = b.Write(k, 0, exp, v, AddOnly)
 | |
| 	if err == ErrKeyExists {
 | |
| 		return false, nil
 | |
| 	}
 | |
| 	return (err == nil), err
 | |
| }
 | |
| 
 | |
| // AddRaw adds a value to this bucket; like SetRaw except that nothing
 | |
| // happens if the key exists.  The value will be stored as raw bytes.
 | |
| func (b *Bucket) AddRaw(k string, exp int, v []byte) (added bool, err error) {
 | |
| 	err = b.Write(k, 0, exp, v, AddOnly|Raw)
 | |
| 	if err == ErrKeyExists {
 | |
| 		return false, nil
 | |
| 	}
 | |
| 	return (err == nil), err
 | |
| }
 | |
| 
 | |
| // Add adds a value to this bucket; like Set except that nothing
 | |
| // happens if the key exists.  The value will be serialized into a
 | |
| // JSON document.
 | |
| func (b *Bucket) AddWithMT(k string, exp int, v interface{}) (added bool, mt *MutationToken, err error) {
 | |
| 	mt, err = b.WriteWithMT(k, 0, exp, v, AddOnly)
 | |
| 	if err == ErrKeyExists {
 | |
| 		return false, mt, nil
 | |
| 	}
 | |
| 	return (err == nil), mt, err
 | |
| }
 | |
| 
 | |
| // AddRaw adds a value to this bucket; like SetRaw except that nothing
 | |
| // happens if the key exists.  The value will be stored as raw bytes.
 | |
| func (b *Bucket) AddRawWithMT(k string, exp int, v []byte) (added bool, mt *MutationToken, err error) {
 | |
| 	mt, err = b.WriteWithMT(k, 0, exp, v, AddOnly|Raw)
 | |
| 	if err == ErrKeyExists {
 | |
| 		return false, mt, nil
 | |
| 	}
 | |
| 	return (err == nil), mt, err
 | |
| }
 | |
| 
 | |
| // Append appends raw data to an existing item.
 | |
| func (b *Bucket) Append(k string, data []byte) error {
 | |
| 	return b.Write(k, 0, 0, data, Append|Raw)
 | |
| }
 | |
| 
 | |
| func (b *Bucket) GetsMCFromCollection(collUid uint32, key string, reqDeadline time.Time) (*gomemcached.MCResponse, error) {
 | |
| 	var err error
 | |
| 	var response *gomemcached.MCResponse
 | |
| 
 | |
| 	if key == "" {
 | |
| 		return nil, nil
 | |
| 	}
 | |
| 
 | |
| 	if ClientOpCallback != nil {
 | |
| 		defer func(t time.Time) { ClientOpCallback("GetsMCFromCollection", key, t, err) }(time.Now())
 | |
| 	}
 | |
| 
 | |
| 	err = b.Do2(key, func(mc *memcached.Client, vb uint16) error {
 | |
| 		var err1 error
 | |
| 
 | |
| 		mc.SetDeadline(getDeadline(reqDeadline, DefaultTimeout))
 | |
| 		_, err1 = mc.SelectBucket(b.Name)
 | |
| 		if err1 != nil {
 | |
| 			mc.SetDeadline(noDeadline)
 | |
| 			return err1
 | |
| 		}
 | |
| 
 | |
| 		mc.SetDeadline(getDeadline(reqDeadline, DefaultTimeout))
 | |
| 		response, err1 = mc.GetFromCollection(vb, collUid, key)
 | |
| 		if err1 != nil {
 | |
| 			mc.SetDeadline(noDeadline)
 | |
| 			return err1
 | |
| 		}
 | |
| 
 | |
| 		return nil
 | |
| 	}, false)
 | |
| 
 | |
| 	return response, err
 | |
| }
 | |
| 
 | |
| // Returns collectionUid, manifestUid, error.
 | |
| func (b *Bucket) GetCollectionCID(scope string, collection string, reqDeadline time.Time) (uint32, uint32, error) {
 | |
| 	var err error
 | |
| 	var response *gomemcached.MCResponse
 | |
| 
 | |
| 	if ClientOpCallback != nil {
 | |
| 		defer func(t time.Time) { ClientOpCallback("GetCollectionCID", scope+"."+collection, t, err) }(time.Now())
 | |
| 	}
 | |
| 
 | |
| 	var key = "DUMMY" // Contact any server.
 | |
| 	var manifestUid uint32
 | |
| 	var collUid uint32
 | |
| 	err = b.Do2(key, func(mc *memcached.Client, vb uint16) error {
 | |
| 		var err1 error
 | |
| 
 | |
| 		mc.SetDeadline(getDeadline(reqDeadline, DefaultTimeout))
 | |
| 		_, err1 = mc.SelectBucket(b.Name)
 | |
| 		if err1 != nil {
 | |
| 			mc.SetDeadline(noDeadline)
 | |
| 			return err1
 | |
| 		}
 | |
| 
 | |
| 		response, err1 = mc.CollectionsGetCID(scope, collection)
 | |
| 		if err1 != nil {
 | |
| 			mc.SetDeadline(noDeadline)
 | |
| 			return err1
 | |
| 		}
 | |
| 
 | |
| 		manifestUid = binary.BigEndian.Uint32(response.Extras[4:8])
 | |
| 		collUid = binary.BigEndian.Uint32(response.Extras[8:12])
 | |
| 
 | |
| 		return nil
 | |
| 	}, false)
 | |
| 
 | |
| 	return collUid, manifestUid, err
 | |
| }
 | |
| 
 | |
| // Get a value straight from Memcached
 | |
| func (b *Bucket) GetsMC(key string, reqDeadline time.Time) (*gomemcached.MCResponse, error) {
 | |
| 	var err error
 | |
| 	var response *gomemcached.MCResponse
 | |
| 
 | |
| 	if key == "" {
 | |
| 		return nil, nil
 | |
| 	}
 | |
| 
 | |
| 	if ClientOpCallback != nil {
 | |
| 		defer func(t time.Time) { ClientOpCallback("GetsMC", key, t, err) }(time.Now())
 | |
| 	}
 | |
| 
 | |
| 	err = b.Do2(key, func(mc *memcached.Client, vb uint16) error {
 | |
| 		var err1 error
 | |
| 
 | |
| 		mc.SetDeadline(getDeadline(reqDeadline, DefaultTimeout))
 | |
| 		response, err1 = mc.Get(vb, key)
 | |
| 		mc.SetDeadline(noDeadline)
 | |
| 		if err1 != nil {
 | |
| 			return err1
 | |
| 		}
 | |
| 		return nil
 | |
| 	}, false)
 | |
| 	return response, err
 | |
| }
 | |
| 
 | |
| // Get a value through the subdoc API
 | |
| func (b *Bucket) GetsSubDoc(key string, reqDeadline time.Time, subPaths []string) (*gomemcached.MCResponse, error) {
 | |
| 	var err error
 | |
| 	var response *gomemcached.MCResponse
 | |
| 
 | |
| 	if key == "" {
 | |
| 		return nil, nil
 | |
| 	}
 | |
| 
 | |
| 	if ClientOpCallback != nil {
 | |
| 		defer func(t time.Time) { ClientOpCallback("GetsSubDoc", key, t, err) }(time.Now())
 | |
| 	}
 | |
| 
 | |
| 	err = b.Do2(key, func(mc *memcached.Client, vb uint16) error {
 | |
| 		var err1 error
 | |
| 
 | |
| 		mc.SetDeadline(getDeadline(reqDeadline, DefaultTimeout))
 | |
| 		response, err1 = mc.GetSubdoc(vb, key, subPaths)
 | |
| 		mc.SetDeadline(noDeadline)
 | |
| 		if err1 != nil {
 | |
| 			return err1
 | |
| 		}
 | |
| 		return nil
 | |
| 	}, false)
 | |
| 	return response, err
 | |
| }
 | |
| 
 | |
| // GetsRaw gets a raw value from this bucket including its CAS
 | |
| // counter and flags.
 | |
| func (b *Bucket) GetsRaw(k string) (data []byte, flags int,
 | |
| 	cas uint64, err error) {
 | |
| 
 | |
| 	if ClientOpCallback != nil {
 | |
| 		defer func(t time.Time) { ClientOpCallback("GetsRaw", k, t, err) }(time.Now())
 | |
| 	}
 | |
| 
 | |
| 	err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
 | |
| 		res, err := mc.Get(vb, k)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		cas = res.Cas
 | |
| 		if len(res.Extras) >= 4 {
 | |
| 			flags = int(binary.BigEndian.Uint32(res.Extras))
 | |
| 		}
 | |
| 		data = res.Body
 | |
| 		return nil
 | |
| 	})
 | |
| 	return
 | |
| }
 | |
| 
 | |
| // Gets gets a value from this bucket, including its CAS counter.  The
 | |
| // value is expected to be a JSON stream and will be deserialized into
 | |
| // rv.
 | |
| func (b *Bucket) Gets(k string, rv interface{}, caso *uint64) error {
 | |
| 	data, _, cas, err := b.GetsRaw(k)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if caso != nil {
 | |
| 		*caso = cas
 | |
| 	}
 | |
| 	return json.Unmarshal(data, rv)
 | |
| }
 | |
| 
 | |
| // Get a value from this bucket.
 | |
| // The value is expected to be a JSON stream and will be deserialized
 | |
| // into rv.
 | |
| func (b *Bucket) Get(k string, rv interface{}) error {
 | |
| 	return b.Gets(k, rv, nil)
 | |
| }
 | |
| 
 | |
| // GetRaw gets a raw value from this bucket.  No marshaling is performed.
 | |
| func (b *Bucket) GetRaw(k string) ([]byte, error) {
 | |
| 	d, _, _, err := b.GetsRaw(k)
 | |
| 	return d, err
 | |
| }
 | |
| 
 | |
| // GetAndTouchRaw gets a raw value from this bucket including its CAS
 | |
| // counter and flags, and updates the expiry on the doc.
 | |
| func (b *Bucket) GetAndTouchRaw(k string, exp int) (data []byte,
 | |
| 	cas uint64, err error) {
 | |
| 
 | |
| 	if ClientOpCallback != nil {
 | |
| 		defer func(t time.Time) { ClientOpCallback("GetsRaw", k, t, err) }(time.Now())
 | |
| 	}
 | |
| 
 | |
| 	err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
 | |
| 		res, err := mc.GetAndTouch(vb, k, exp)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		cas = res.Cas
 | |
| 		data = res.Body
 | |
| 		return nil
 | |
| 	})
 | |
| 	return data, cas, err
 | |
| }
 | |
| 
 | |
| // GetMeta returns the meta values for a key
 | |
| func (b *Bucket) GetMeta(k string, flags *int, expiry *int, cas *uint64, seqNo *uint64) (err error) {
 | |
| 
 | |
| 	if ClientOpCallback != nil {
 | |
| 		defer func(t time.Time) { ClientOpCallback("GetsMeta", k, t, err) }(time.Now())
 | |
| 	}
 | |
| 
 | |
| 	err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
 | |
| 		res, err := mc.GetMeta(vb, k)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		*cas = res.Cas
 | |
| 		if len(res.Extras) >= 8 {
 | |
| 			*flags = int(binary.BigEndian.Uint32(res.Extras[4:]))
 | |
| 		}
 | |
| 
 | |
| 		if len(res.Extras) >= 12 {
 | |
| 			*expiry = int(binary.BigEndian.Uint32(res.Extras[8:]))
 | |
| 		}
 | |
| 
 | |
| 		if len(res.Extras) >= 20 {
 | |
| 			*seqNo = uint64(binary.BigEndian.Uint64(res.Extras[12:]))
 | |
| 		}
 | |
| 
 | |
| 		return nil
 | |
| 	})
 | |
| 
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // Delete a key from this bucket.
 | |
| func (b *Bucket) Delete(k string) error {
 | |
| 	return b.Write(k, 0, 0, nil, Raw)
 | |
| }
 | |
| 
 | |
| // Incr increments the value at a given key by amt and defaults to def if no value present.
 | |
| func (b *Bucket) Incr(k string, amt, def uint64, exp int) (val uint64, err error) {
 | |
| 	if ClientOpCallback != nil {
 | |
| 		defer func(t time.Time) { ClientOpCallback("Incr", k, t, err) }(time.Now())
 | |
| 	}
 | |
| 
 | |
| 	var rv uint64
 | |
| 	err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
 | |
| 		res, err := mc.Incr(vb, k, amt, def, exp)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		rv = res
 | |
| 		return nil
 | |
| 	})
 | |
| 	return rv, err
 | |
| }
 | |
| 
 | |
| // Decr decrements the value at a given key by amt and defaults to def if no value present
 | |
| func (b *Bucket) Decr(k string, amt, def uint64, exp int) (val uint64, err error) {
 | |
| 	if ClientOpCallback != nil {
 | |
| 		defer func(t time.Time) { ClientOpCallback("Decr", k, t, err) }(time.Now())
 | |
| 	}
 | |
| 
 | |
| 	var rv uint64
 | |
| 	err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
 | |
| 		res, err := mc.Decr(vb, k, amt, def, exp)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		rv = res
 | |
| 		return nil
 | |
| 	})
 | |
| 	return rv, err
 | |
| }
 | |
| 
 | |
| // Wrapper around memcached.CASNext()
 | |
| func (b *Bucket) casNext(k string, exp int, state *memcached.CASState) bool {
 | |
| 	if ClientOpCallback != nil {
 | |
| 		defer func(t time.Time) {
 | |
| 			ClientOpCallback("casNext", k, t, state.Err)
 | |
| 		}(time.Now())
 | |
| 	}
 | |
| 
 | |
| 	keepGoing := false
 | |
| 	state.Err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
 | |
| 		keepGoing = mc.CASNext(vb, k, exp, state)
 | |
| 		return state.Err
 | |
| 	})
 | |
| 	return keepGoing && state.Err == nil
 | |
| }
 | |
| 
 | |
// An UpdateFunc is a callback function to update a document.
// It is handed the current raw contents and returns the updated raw
// contents; a non-nil error stops the update and is returned to the caller.
type UpdateFunc func(current []byte) (updated []byte, err error)
 | |
| 
 | |
// UpdateCancel is the value to return as the error from an UpdateFunc to
// cancel the Update operation. It is defined as memcached.CASQuit.
const UpdateCancel = memcached.CASQuit
 | |
| 
 | |
| // Update performs a Safe update of a document, avoiding conflicts by
 | |
| // using CAS.
 | |
| //
 | |
| // The callback function will be invoked with the current raw document
 | |
| // contents (or nil if the document doesn't exist); it should return
 | |
| // the updated raw contents (or nil to delete.)  If it decides not to
 | |
| // change anything it can return UpdateCancel as the error.
 | |
| //
 | |
| // If another writer modifies the document between the get and the
 | |
| // set, the callback will be invoked again with the newer value.
 | |
| func (b *Bucket) Update(k string, exp int, callback UpdateFunc) error {
 | |
| 	_, err := b.update(k, exp, callback)
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // internal version of Update that returns a CAS value
 | |
| func (b *Bucket) update(k string, exp int, callback UpdateFunc) (newCas uint64, err error) {
 | |
| 	var state memcached.CASState
 | |
| 	for b.casNext(k, exp, &state) {
 | |
| 		var err error
 | |
| 		if state.Value, err = callback(state.Value); err != nil {
 | |
| 			return 0, err
 | |
| 		}
 | |
| 	}
 | |
| 	return state.Cas, state.Err
 | |
| }
 | |
| 
 | |
// A WriteUpdateFunc is a callback function to update a document.
// It is like UpdateFunc but additionally returns the WriteOptions that
// WriteUpdate should honour for the write.
type WriteUpdateFunc func(current []byte) (updated []byte, opt WriteOptions, err error)
 | |
| 
 | |
| // WriteUpdate performs a Safe update of a document, avoiding
 | |
| // conflicts by using CAS.  WriteUpdate is like Update, except that
 | |
| // the callback can return a set of WriteOptions, of which Persist and
 | |
| // Indexable are recognized: these cause the call to wait until the
 | |
| // document update has been persisted to disk and/or become available
 | |
| // to index.
 | |
| func (b *Bucket) WriteUpdate(k string, exp int, callback WriteUpdateFunc) error {
 | |
| 	var writeOpts WriteOptions
 | |
| 	var deletion bool
 | |
| 	// Wrap the callback in an UpdateFunc we can pass to Update:
 | |
| 	updateCallback := func(current []byte) (updated []byte, err error) {
 | |
| 		update, opt, err := callback(current)
 | |
| 		writeOpts = opt
 | |
| 		deletion = (update == nil)
 | |
| 		return update, err
 | |
| 	}
 | |
| 	cas, err := b.update(k, exp, updateCallback)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	// If callback asked, wait for persistence or indexability:
 | |
| 	if writeOpts&(Persist|Indexable) != 0 {
 | |
| 		err = b.WaitForPersistence(k, cas, deletion)
 | |
| 	}
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // Observe observes the current state of a document.
 | |
| func (b *Bucket) Observe(k string) (result memcached.ObserveResult, err error) {
 | |
| 	if ClientOpCallback != nil {
 | |
| 		defer func(t time.Time) { ClientOpCallback("Observe", k, t, err) }(time.Now())
 | |
| 	}
 | |
| 
 | |
| 	err = b.Do(k, func(mc *memcached.Client, vb uint16) error {
 | |
| 		result, err = mc.Observe(vb, k)
 | |
| 		return err
 | |
| 	})
 | |
| 	return
 | |
| }
 | |
| 
 | |
// Sentinel errors returned from WaitForPersistence (or Write, if the
// Persistent or Indexable flag is used).
var (
	// ErrOverwritten reports that the value has been overwritten by
	// another before being persisted.
	ErrOverwritten = errors.New("overwritten")

	// ErrTimeout reports that the value hasn't been persisted by the
	// timeout interval.
	ErrTimeout = errors.New("timeout")
)
 | |
| 
 | |
| // WaitForPersistence waits for an item to be considered durable.
 | |
| //
 | |
| // Besides transport errors, ErrOverwritten may be returned if the
 | |
| // item is overwritten before it reaches durability.  ErrTimeout may
 | |
| // occur if the item isn't found durable in a reasonable amount of
 | |
| // time.
 | |
| func (b *Bucket) WaitForPersistence(k string, cas uint64, deletion bool) error {
 | |
| 	timeout := 10 * time.Second
 | |
| 	sleepDelay := 5 * time.Millisecond
 | |
| 	start := time.Now()
 | |
| 	for {
 | |
| 		time.Sleep(sleepDelay)
 | |
| 		sleepDelay += sleepDelay / 2 // multiply delay by 1.5 every time
 | |
| 
 | |
| 		result, err := b.Observe(k)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		if persisted, overwritten := result.CheckPersistence(cas, deletion); overwritten {
 | |
| 			return ErrOverwritten
 | |
| 		} else if persisted {
 | |
| 			return nil
 | |
| 		}
 | |
| 
 | |
| 		if result.PersistenceTime > 0 {
 | |
| 			timeout = 2 * result.PersistenceTime
 | |
| 		}
 | |
| 		if time.Since(start) >= timeout-sleepDelay {
 | |
| 			return ErrTimeout
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
// _STRING_MCRESPONSE_POOL is the package-level pool created by
// gomemcached.NewStringMCResponsePool(16).
// NOTE(review): the meaning of the 16 argument is defined by gomemcached —
// presumably a size hint matching the other pools in this file; confirm.
var _STRING_MCRESPONSE_POOL = gomemcached.NewStringMCResponsePool(16)
 | |
| 
 | |
// stringPool hands out reusable []string slices backed by a sync.Pool.
// size is the capacity given to freshly allocated slices and the lower
// bound on the capacity of slices accepted back by Put.
type stringPool struct {
	pool *sync.Pool
	size int
}

// newStringPool builds a stringPool whose new slices are empty with
// capacity size.
func newStringPool(size int) *stringPool {
	return &stringPool{
		size: size,
		pool: &sync.Pool{
			New: func() interface{} {
				return make([]string, 0, size)
			},
		},
	}
}

// Get returns a []string taken from the pool.
func (sp *stringPool) Get() []string {
	return sp.pool.Get().([]string)
}

// Put returns s to the pool truncated to length zero. Nil slices and
// slices whose capacity is below size or above twice size are dropped
// instead of being pooled.
func (sp *stringPool) Put(s []string) {
	if s == nil {
		return
	}
	if c := cap(s); c < sp.size || c > 2*sp.size {
		return
	}

	sp.pool.Put(s[:0])
}

// _STRING_POOL is the package-level stringPool of capacity-16 slices.
var _STRING_POOL = newStringPool(16)

// vbStringPool hands out reusable map[uint16][]string values backed by a
// sync.Pool; strPool is where the maps' slice values are returned on Put.
type vbStringPool struct {
	pool    *sync.Pool
	strPool *stringPool
}

// newVBStringPool builds a vbStringPool whose new maps are created with
// size as the make hint and whose slice values are recycled through sp.
func newVBStringPool(size int, sp *stringPool) *vbStringPool {
	return &vbStringPool{
		strPool: sp,
		pool: &sync.Pool{
			New: func() interface{} {
				return make(map[uint16][]string, size)
			},
		},
	}
}

// Get returns a map taken from the pool.
func (vp *vbStringPool) Get() map[uint16][]string {
	return vp.pool.Get().(map[uint16][]string)
}

// Put empties m, offering every slice value to strPool, and then returns
// the emptied map to the pool. A nil map is ignored.
func (vp *vbStringPool) Put(m map[uint16][]string) {
	if m == nil {
		return
	}

	for vb, keys := range m {
		delete(m, vb)
		vp.strPool.Put(keys)
	}

	vp.pool.Put(m)
}

// _VB_STRING_POOL is the package-level vbStringPool; its slices are
// recycled through _STRING_POOL.
var _VB_STRING_POOL = newVBStringPool(16, _STRING_POOL)
 | |
| 
 |