You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							261 lines
						
					
					
						
							6.0 KiB
						
					
					
				
			
		
		
	
	
							261 lines
						
					
					
						
							6.0 KiB
						
					
					
				| // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
 | |
| // All rights reserved.
 | |
| //
 | |
| // Use of this source code is governed by a BSD-style license that can be
 | |
| // found in the LICENSE file.
 | |
| 
 | |
| package leveldb
 | |
| 
 | |
| import (
 | |
| 	"encoding/binary"
 | |
| 	"fmt"
 | |
| 
 | |
| 	"github.com/syndtr/goleveldb/leveldb/errors"
 | |
| 	"github.com/syndtr/goleveldb/leveldb/memdb"
 | |
| 	"github.com/syndtr/goleveldb/leveldb/storage"
 | |
| )
 | |
| 
 | |
| // ErrBatchCorrupted records reason of batch corruption.
 | |
| type ErrBatchCorrupted struct {
 | |
| 	Reason string
 | |
| }
 | |
| 
 | |
| func (e *ErrBatchCorrupted) Error() string {
 | |
| 	return fmt.Sprintf("leveldb: batch corrupted: %s", e.Reason)
 | |
| }
 | |
| 
 | |
| func newErrBatchCorrupted(reason string) error {
 | |
| 	return errors.NewErrCorrupted(storage.FileDesc{}, &ErrBatchCorrupted{reason})
 | |
| }
 | |
| 
 | |
| const (
 | |
| 	batchHdrLen  = 8 + 4
 | |
| 	batchGrowRec = 3000
 | |
| )
 | |
| 
 | |
| // BatchReplay wraps basic batch operations.
 | |
| type BatchReplay interface {
 | |
| 	Put(key, value []byte)
 | |
| 	Delete(key []byte)
 | |
| }
 | |
| 
 | |
| // Batch is a write batch.
 | |
| type Batch struct {
 | |
| 	data       []byte
 | |
| 	rLen, bLen int
 | |
| 	seq        uint64
 | |
| 	sync       bool
 | |
| }
 | |
| 
 | |
| func (b *Batch) grow(n int) {
 | |
| 	off := len(b.data)
 | |
| 	if off == 0 {
 | |
| 		off = batchHdrLen
 | |
| 		if b.data != nil {
 | |
| 			b.data = b.data[:off]
 | |
| 		}
 | |
| 	}
 | |
| 	if cap(b.data)-off < n {
 | |
| 		if b.data == nil {
 | |
| 			b.data = make([]byte, off, off+n)
 | |
| 		} else {
 | |
| 			odata := b.data
 | |
| 			div := 1
 | |
| 			if b.rLen > batchGrowRec {
 | |
| 				div = b.rLen / batchGrowRec
 | |
| 			}
 | |
| 			b.data = make([]byte, off, off+n+(off-batchHdrLen)/div)
 | |
| 			copy(b.data, odata)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (b *Batch) appendRec(kt keyType, key, value []byte) {
 | |
| 	n := 1 + binary.MaxVarintLen32 + len(key)
 | |
| 	if kt == keyTypeVal {
 | |
| 		n += binary.MaxVarintLen32 + len(value)
 | |
| 	}
 | |
| 	b.grow(n)
 | |
| 	off := len(b.data)
 | |
| 	data := b.data[:off+n]
 | |
| 	data[off] = byte(kt)
 | |
| 	off++
 | |
| 	off += binary.PutUvarint(data[off:], uint64(len(key)))
 | |
| 	copy(data[off:], key)
 | |
| 	off += len(key)
 | |
| 	if kt == keyTypeVal {
 | |
| 		off += binary.PutUvarint(data[off:], uint64(len(value)))
 | |
| 		copy(data[off:], value)
 | |
| 		off += len(value)
 | |
| 	}
 | |
| 	b.data = data[:off]
 | |
| 	b.rLen++
 | |
| 	//  Include 8-byte ikey header
 | |
| 	b.bLen += len(key) + len(value) + 8
 | |
| }
 | |
| 
 | |
| // Put appends 'put operation' of the given key/value pair to the batch.
 | |
| // It is safe to modify the contents of the argument after Put returns.
 | |
| func (b *Batch) Put(key, value []byte) {
 | |
| 	b.appendRec(keyTypeVal, key, value)
 | |
| }
 | |
| 
 | |
| // Delete appends 'delete operation' of the given key to the batch.
 | |
| // It is safe to modify the contents of the argument after Delete returns.
 | |
| func (b *Batch) Delete(key []byte) {
 | |
| 	b.appendRec(keyTypeDel, key, nil)
 | |
| }
 | |
| 
 | |
| // Dump dumps batch contents. The returned slice can be loaded into the
 | |
| // batch using Load method.
 | |
| // The returned slice is not its own copy, so the contents should not be
 | |
| // modified.
 | |
| func (b *Batch) Dump() []byte {
 | |
| 	return b.encode()
 | |
| }
 | |
| 
 | |
| // Load loads given slice into the batch. Previous contents of the batch
 | |
| // will be discarded.
 | |
| // The given slice will not be copied and will be used as batch buffer, so
 | |
| // it is not safe to modify the contents of the slice.
 | |
| func (b *Batch) Load(data []byte) error {
 | |
| 	return b.decode(0, data)
 | |
| }
 | |
| 
 | |
| // Replay replays batch contents.
 | |
| func (b *Batch) Replay(r BatchReplay) error {
 | |
| 	return b.decodeRec(func(i int, kt keyType, key, value []byte) error {
 | |
| 		switch kt {
 | |
| 		case keyTypeVal:
 | |
| 			r.Put(key, value)
 | |
| 		case keyTypeDel:
 | |
| 			r.Delete(key)
 | |
| 		}
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // Len returns number of records in the batch.
 | |
| func (b *Batch) Len() int {
 | |
| 	return b.rLen
 | |
| }
 | |
| 
 | |
| // Reset resets the batch.
 | |
| func (b *Batch) Reset() {
 | |
| 	b.data = b.data[:0]
 | |
| 	b.seq = 0
 | |
| 	b.rLen = 0
 | |
| 	b.bLen = 0
 | |
| 	b.sync = false
 | |
| }
 | |
| 
 | |
| func (b *Batch) init(sync bool) {
 | |
| 	b.sync = sync
 | |
| }
 | |
| 
 | |
| func (b *Batch) append(p *Batch) {
 | |
| 	if p.rLen > 0 {
 | |
| 		b.grow(len(p.data) - batchHdrLen)
 | |
| 		b.data = append(b.data, p.data[batchHdrLen:]...)
 | |
| 		b.rLen += p.rLen
 | |
| 	}
 | |
| 	if p.sync {
 | |
| 		b.sync = true
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // size returns sums of key/value pair length plus 8-bytes ikey.
 | |
| func (b *Batch) size() int {
 | |
| 	return b.bLen
 | |
| }
 | |
| 
 | |
| func (b *Batch) encode() []byte {
 | |
| 	b.grow(0)
 | |
| 	binary.LittleEndian.PutUint64(b.data, b.seq)
 | |
| 	binary.LittleEndian.PutUint32(b.data[8:], uint32(b.rLen))
 | |
| 
 | |
| 	return b.data
 | |
| }
 | |
| 
 | |
| func (b *Batch) decode(prevSeq uint64, data []byte) error {
 | |
| 	if len(data) < batchHdrLen {
 | |
| 		return newErrBatchCorrupted("too short")
 | |
| 	}
 | |
| 
 | |
| 	b.seq = binary.LittleEndian.Uint64(data)
 | |
| 	if b.seq < prevSeq {
 | |
| 		return newErrBatchCorrupted("invalid sequence number")
 | |
| 	}
 | |
| 	b.rLen = int(binary.LittleEndian.Uint32(data[8:]))
 | |
| 	if b.rLen < 0 {
 | |
| 		return newErrBatchCorrupted("invalid records length")
 | |
| 	}
 | |
| 	// No need to be precise at this point, it won't be used anyway
 | |
| 	b.bLen = len(data) - batchHdrLen
 | |
| 	b.data = data
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (b *Batch) decodeRec(f func(i int, kt keyType, key, value []byte) error) error {
 | |
| 	off := batchHdrLen
 | |
| 	for i := 0; i < b.rLen; i++ {
 | |
| 		if off >= len(b.data) {
 | |
| 			return newErrBatchCorrupted("invalid records length")
 | |
| 		}
 | |
| 
 | |
| 		kt := keyType(b.data[off])
 | |
| 		if kt > keyTypeVal {
 | |
| 			panic(kt)
 | |
| 			return newErrBatchCorrupted("bad record: invalid type")
 | |
| 		}
 | |
| 		off++
 | |
| 
 | |
| 		x, n := binary.Uvarint(b.data[off:])
 | |
| 		off += n
 | |
| 		if n <= 0 || off+int(x) > len(b.data) {
 | |
| 			return newErrBatchCorrupted("bad record: invalid key length")
 | |
| 		}
 | |
| 		key := b.data[off : off+int(x)]
 | |
| 		off += int(x)
 | |
| 		var value []byte
 | |
| 		if kt == keyTypeVal {
 | |
| 			x, n := binary.Uvarint(b.data[off:])
 | |
| 			off += n
 | |
| 			if n <= 0 || off+int(x) > len(b.data) {
 | |
| 				return newErrBatchCorrupted("bad record: invalid value length")
 | |
| 			}
 | |
| 			value = b.data[off : off+int(x)]
 | |
| 			off += int(x)
 | |
| 		}
 | |
| 
 | |
| 		if err := f(i, kt, key, value); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (b *Batch) memReplay(to *memdb.DB) error {
 | |
| 	var ikScratch []byte
 | |
| 	return b.decodeRec(func(i int, kt keyType, key, value []byte) error {
 | |
| 		ikScratch = makeInternalKey(ikScratch, key, b.seq+uint64(i), kt)
 | |
| 		return to.Put(ikScratch, value)
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func (b *Batch) memDecodeAndReplay(prevSeq uint64, data []byte, to *memdb.DB) error {
 | |
| 	if err := b.decode(prevSeq, data); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	return b.memReplay(to)
 | |
| }
 | |
| 
 | |
| func (b *Batch) revertMemReplay(to *memdb.DB) error {
 | |
| 	var ikScratch []byte
 | |
| 	return b.decodeRec(func(i int, kt keyType, key, value []byte) error {
 | |
| 		ikScratch := makeInternalKey(ikScratch, key, b.seq+uint64(i), kt)
 | |
| 		return to.Delete(ikScratch)
 | |
| 	})
 | |
| }
 | |
| 
 |