You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							524 lines
						
					
					
						
							13 KiB
						
					
					
				
			
		
		
	
	
							524 lines
						
					
					
						
							13 KiB
						
					
					
				| // Copyright 2011 The LevelDB-Go Authors. All rights reserved.
 | |
| // Use of this source code is governed by a BSD-style
 | |
| // license that can be found in the LICENSE file.
 | |
| 
 | |
| // Taken from: https://code.google.com/p/leveldb-go/source/browse/leveldb/record/record.go?r=1d5ccbe03246da926391ee12d1c6caae054ff4b0
 | |
| // License, authors and contributors information can be found at the URLs below, respectively:
 | |
| // 	https://code.google.com/p/leveldb-go/source/browse/LICENSE
 | |
| //	https://code.google.com/p/leveldb-go/source/browse/AUTHORS
 | |
| //  https://code.google.com/p/leveldb-go/source/browse/CONTRIBUTORS
 | |
| 
 | |
| // Package journal reads and writes sequences of journals. Each journal is a stream
 | |
| // of bytes that completes before the next journal starts.
 | |
| //
 | |
| // When reading, call Next to obtain an io.Reader for the next journal. Next will
 | |
| // return io.EOF when there are no more journals. It is valid to call Next
 | |
| // without reading the current journal to exhaustion.
 | |
| //
 | |
| // When writing, call Next to obtain an io.Writer for the next journal. Calling
 | |
| // Next finishes the current journal. Call Close to finish the final journal.
 | |
| //
 | |
| // Optionally, call Flush to finish the current journal and flush the underlying
 | |
| // writer without starting a new journal. To start a new journal after flushing,
 | |
| // call Next.
 | |
| //
 | |
| // Neither Readers nor Writers are safe to use concurrently.
 | |
| //
 | |
| // Example code:
 | |
| //	func read(r io.Reader) ([]string, error) {
 | |
| //		var ss []string
 | |
| //		journals := journal.NewReader(r, nil, true, true)
 | |
| //		for {
 | |
| //			j, err := journals.Next()
 | |
| //			if err == io.EOF {
 | |
| //				break
 | |
| //			}
 | |
| //			if err != nil {
 | |
| //				return nil, err
 | |
| //			}
 | |
| //			s, err := ioutil.ReadAll(j)
 | |
| //			if err != nil {
 | |
| //				return nil, err
 | |
| //			}
 | |
| //			ss = append(ss, string(s))
 | |
| //		}
 | |
| //		return ss, nil
 | |
| //	}
 | |
| //
 | |
| //	func write(w io.Writer, ss []string) error {
 | |
| //		journals := journal.NewWriter(w)
 | |
| //		for _, s := range ss {
 | |
| //			j, err := journals.Next()
 | |
| //			if err != nil {
 | |
| //				return err
 | |
| //			}
 | |
| //			if _, err := j.Write([]byte(s)); err != nil {
 | |
| //				return err
 | |
| //			}
 | |
| //		}
 | |
| //		return journals.Close()
 | |
| //	}
 | |
| //
 | |
| // The wire format is that the stream is divided into 32KiB blocks, and each
 | |
| // block contains a number of tightly packed chunks. Chunks cannot cross block
 | |
| // boundaries. The last block may be shorter than 32 KiB. Any unused bytes in a
 | |
| // block must be zero.
 | |
| //
 | |
| // A journal maps to one or more chunks. Each chunk has a 7 byte header (a 4
 | |
| // byte checksum, a 2 byte little-endian uint16 length, and a 1 byte chunk type)
 | |
| // followed by a payload. The checksum is over the chunk type and the payload.
 | |
| //
 | |
| // There are four chunk types: whether the chunk is the full journal, or the
 | |
| // first, middle or last chunk of a multi-chunk journal. A multi-chunk journal
 | |
| // has one first chunk, zero or more middle chunks, and one last chunk.
 | |
| //
 | |
| // The wire format allows for limited recovery in the face of data corruption:
 | |
| // on a format error (such as a checksum mismatch), the reader moves to the
 | |
| // next block and looks for the next full or first chunk.
 | |
| package journal
 | |
| 
 | |
| import (
 | |
| 	"encoding/binary"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 
 | |
| 	"github.com/syndtr/goleveldb/leveldb/errors"
 | |
| 	"github.com/syndtr/goleveldb/leveldb/storage"
 | |
| 	"github.com/syndtr/goleveldb/leveldb/util"
 | |
| )
 | |
| 
 | |
// Wire-format constants. They describe the on-disk layout of a journal and
// must not be changed.
const (
	// Chunk types, stored in the last byte of every chunk header.
	fullChunkType   = 1 // the chunk holds a whole journal
	firstChunkType  = 2 // first chunk of a multi-chunk journal
	middleChunkType = 3 // interior chunk of a multi-chunk journal
	lastChunkType   = 4 // final chunk of a multi-chunk journal

	// blockSize is the size of a block (32 KiB); chunks never cross a
	// block boundary.
	blockSize = 32 << 10

	// headerSize is the size of a chunk header: a 4 byte checksum, a 2 byte
	// length and a 1 byte chunk type.
	headerSize = 4 + 2 + 1
)
 | |
| 
 | |
| type flusher interface { // flusher is the optional capability NewWriter and Writer.Reset probe for on the underlying writer via a type assertion.
 | |
| 	Flush() error // Flush is called by Writer.Flush once the buffered journal data has been written.
 | |
| }
 | |
| 
 | |
// ErrCorrupted is the error reported for a corrupted block or chunk.
type ErrCorrupted struct {
	// Size is the byte count reported for the dropped block or chunk.
	Size int
	// Reason is a short description of what was wrong.
	Reason string
}

// Error implements the error interface. The message contains the reason
// followed by the size in bytes.
func (e *ErrCorrupted) Error() string {
	const format = "leveldb/journal: block/chunk corrupted: %s (%d bytes)"
	return fmt.Sprintf(format, e.Reason, e.Size)
}
 | |
| 
 | |
| // Dropper is the interface that wraps the Drop method. The journal reader
 | |
| // calls Drop with an *ErrCorrupted each time it drops a block or chunk.
 | |
| type Dropper interface {
 | |
| 	Drop(err error)
 | |
| }
 | |
| 
 | |
| // Reader reads journals from an underlying io.Reader.
 | |
| type Reader struct {
 | |
| 	// r is the underlying reader.
 | |
| 	r io.Reader
 | |
| 	// the dropper.
 | |
| 	dropper Dropper
 | |
| 	// strict flag.
 | |
| 	strict bool
 | |
| 	// checksum flag.
 | |
| 	checksum bool
 | |
| 	// seq is the sequence number of the current journal.
 | |
| 	seq int
 | |
| 	// buf[i:j] is the unread portion of the current chunk's payload.
 | |
| 	// The low bound, i, excludes the chunk header.
 | |
| 	i, j int
 | |
| 	// n is the number of bytes of buf that are valid. Once reading has started,
 | |
| 	// only the final block can have n < blockSize.
 | |
| 	n int
 | |
| 	// last is whether the current chunk is the last chunk of the journal.
 | |
| 	last bool
 | |
| 	// err is any accumulated error.
 | |
| 	err error
 | |
| 	// buf is the buffer.
 | |
| 	buf [blockSize]byte
 | |
| }
 | |
| 
 | |
| // NewReader returns a new reader. The dropper may be nil, and if
 | |
| // strict is true then corrupted or invalid chunk will halt the journal
 | |
| // reader entirely.
 | |
| func NewReader(r io.Reader, dropper Dropper, strict, checksum bool) *Reader {
 | |
| 	return &Reader{
 | |
| 		r:        r,
 | |
| 		dropper:  dropper,
 | |
| 		strict:   strict,
 | |
| 		checksum: checksum,
 | |
| 		last:     true,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| var errSkip = errors.New("leveldb/journal: skipped")
 | |
| 
 | |
| func (r *Reader) corrupt(n int, reason string, skip bool) error {
 | |
| 	if r.dropper != nil {
 | |
| 		r.dropper.Drop(&ErrCorrupted{n, reason})
 | |
| 	}
 | |
| 	if r.strict && !skip {
 | |
| 		r.err = errors.NewErrCorrupted(storage.FileDesc{}, &ErrCorrupted{n, reason})
 | |
| 		return r.err
 | |
| 	}
 | |
| 	return errSkip
 | |
| }
 | |
| 
 | |
| // nextChunk sets r.buf[r.i:r.j] to hold the next chunk's payload, reading the
 | |
| // next block into the buffer if necessary.
 | |
| func (r *Reader) nextChunk(first bool) error {
 | |
| 	for {
 | |
| 		if r.j+headerSize <= r.n {
 | |
| 			checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4])
 | |
| 			length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6])
 | |
| 			chunkType := r.buf[r.j+6]
 | |
| 			unprocBlock := r.n - r.j
 | |
| 			if checksum == 0 && length == 0 && chunkType == 0 {
 | |
| 				// Drop entire block.
 | |
| 				r.i = r.n
 | |
| 				r.j = r.n
 | |
| 				return r.corrupt(unprocBlock, "zero header", false)
 | |
| 			}
 | |
| 			if chunkType < fullChunkType || chunkType > lastChunkType {
 | |
| 				// Drop entire block.
 | |
| 				r.i = r.n
 | |
| 				r.j = r.n
 | |
| 				return r.corrupt(unprocBlock, fmt.Sprintf("invalid chunk type %#x", chunkType), false)
 | |
| 			}
 | |
| 			r.i = r.j + headerSize
 | |
| 			r.j = r.j + headerSize + int(length)
 | |
| 			if r.j > r.n {
 | |
| 				// Drop entire block.
 | |
| 				r.i = r.n
 | |
| 				r.j = r.n
 | |
| 				return r.corrupt(unprocBlock, "chunk length overflows block", false)
 | |
| 			} else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() {
 | |
| 				// Drop entire block.
 | |
| 				r.i = r.n
 | |
| 				r.j = r.n
 | |
| 				return r.corrupt(unprocBlock, "checksum mismatch", false)
 | |
| 			}
 | |
| 			if first && chunkType != fullChunkType && chunkType != firstChunkType {
 | |
| 				chunkLength := (r.j - r.i) + headerSize
 | |
| 				r.i = r.j
 | |
| 				// Report the error, but skip it.
 | |
| 				return r.corrupt(chunkLength, "orphan chunk", true)
 | |
| 			}
 | |
| 			r.last = chunkType == fullChunkType || chunkType == lastChunkType
 | |
| 			return nil
 | |
| 		}
 | |
| 
 | |
| 		// The last block.
 | |
| 		if r.n < blockSize && r.n > 0 {
 | |
| 			if !first {
 | |
| 				return r.corrupt(0, "missing chunk part", false)
 | |
| 			}
 | |
| 			r.err = io.EOF
 | |
| 			return r.err
 | |
| 		}
 | |
| 
 | |
| 		// Read block.
 | |
| 		n, err := io.ReadFull(r.r, r.buf[:])
 | |
| 		if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
 | |
| 			return err
 | |
| 		}
 | |
| 		if n == 0 {
 | |
| 			if !first {
 | |
| 				return r.corrupt(0, "missing chunk part", false)
 | |
| 			}
 | |
| 			r.err = io.EOF
 | |
| 			return r.err
 | |
| 		}
 | |
| 		r.i, r.j, r.n = 0, 0, n
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Next returns a reader for the next journal. It returns io.EOF if there are no
 | |
| // more journals. The reader returned becomes stale after the next Next call,
 | |
| // and should no longer be used. If strict is false, the reader will returns
 | |
| // io.ErrUnexpectedEOF error when found corrupted journal.
 | |
| func (r *Reader) Next() (io.Reader, error) {
 | |
| 	r.seq++
 | |
| 	if r.err != nil {
 | |
| 		return nil, r.err
 | |
| 	}
 | |
| 	r.i = r.j
 | |
| 	for {
 | |
| 		if err := r.nextChunk(true); err == nil {
 | |
| 			break
 | |
| 		} else if err != errSkip {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 	return &singleReader{r, r.seq, nil}, nil
 | |
| }
 | |
| 
 | |
| // Reset resets the journal reader, allows reuse of the journal reader. Reset returns
 | |
| // last accumulated error.
 | |
| func (r *Reader) Reset(reader io.Reader, dropper Dropper, strict, checksum bool) error {
 | |
| 	r.seq++
 | |
| 	err := r.err
 | |
| 	r.r = reader
 | |
| 	r.dropper = dropper
 | |
| 	r.strict = strict
 | |
| 	r.checksum = checksum
 | |
| 	r.i = 0
 | |
| 	r.j = 0
 | |
| 	r.n = 0
 | |
| 	r.last = true
 | |
| 	r.err = nil
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| type singleReader struct {
 | |
| 	r   *Reader
 | |
| 	seq int
 | |
| 	err error
 | |
| }
 | |
| 
 | |
| func (x *singleReader) Read(p []byte) (int, error) {
 | |
| 	r := x.r
 | |
| 	if r.seq != x.seq {
 | |
| 		return 0, errors.New("leveldb/journal: stale reader")
 | |
| 	}
 | |
| 	if x.err != nil {
 | |
| 		return 0, x.err
 | |
| 	}
 | |
| 	if r.err != nil {
 | |
| 		return 0, r.err
 | |
| 	}
 | |
| 	for r.i == r.j {
 | |
| 		if r.last {
 | |
| 			return 0, io.EOF
 | |
| 		}
 | |
| 		x.err = r.nextChunk(false)
 | |
| 		if x.err != nil {
 | |
| 			if x.err == errSkip {
 | |
| 				x.err = io.ErrUnexpectedEOF
 | |
| 			}
 | |
| 			return 0, x.err
 | |
| 		}
 | |
| 	}
 | |
| 	n := copy(p, r.buf[r.i:r.j])
 | |
| 	r.i += n
 | |
| 	return n, nil
 | |
| }
 | |
| 
 | |
| func (x *singleReader) ReadByte() (byte, error) {
 | |
| 	r := x.r
 | |
| 	if r.seq != x.seq {
 | |
| 		return 0, errors.New("leveldb/journal: stale reader")
 | |
| 	}
 | |
| 	if x.err != nil {
 | |
| 		return 0, x.err
 | |
| 	}
 | |
| 	if r.err != nil {
 | |
| 		return 0, r.err
 | |
| 	}
 | |
| 	for r.i == r.j {
 | |
| 		if r.last {
 | |
| 			return 0, io.EOF
 | |
| 		}
 | |
| 		x.err = r.nextChunk(false)
 | |
| 		if x.err != nil {
 | |
| 			if x.err == errSkip {
 | |
| 				x.err = io.ErrUnexpectedEOF
 | |
| 			}
 | |
| 			return 0, x.err
 | |
| 		}
 | |
| 	}
 | |
| 	c := r.buf[r.i]
 | |
| 	r.i++
 | |
| 	return c, nil
 | |
| }
 | |
| 
 | |
| // Writer writes journals to an underlying io.Writer.
 | |
| type Writer struct {
 | |
| 	// w is the underlying writer.
 | |
| 	w io.Writer
 | |
| 	// seq is the sequence number of the current journal.
 | |
| 	seq int
 | |
| 	// f is w as a flusher.
 | |
| 	f flusher
 | |
| 	// buf[i:j] is the bytes that will become the current chunk.
 | |
| 	// The low bound, i, includes the chunk header.
 | |
| 	i, j int
 | |
| 	// buf[:written] has already been written to w.
 | |
| 	// written is zero unless Flush has been called.
 | |
| 	written int
 | |
| 	// first is whether the current chunk is the first chunk of the journal.
 | |
| 	first bool
 | |
| 	// pending is whether a chunk is buffered but not yet written.
 | |
| 	pending bool
 | |
| 	// err is any accumulated error.
 | |
| 	err error
 | |
| 	// buf is the buffer.
 | |
| 	buf [blockSize]byte
 | |
| }
 | |
| 
 | |
| // NewWriter returns a new Writer.
 | |
| func NewWriter(w io.Writer) *Writer {
 | |
| 	f, _ := w.(flusher)
 | |
| 	return &Writer{
 | |
| 		w: w,
 | |
| 		f: f,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // fillHeader fills in the header for the pending chunk.
 | |
| func (w *Writer) fillHeader(last bool) {
 | |
| 	if w.i+headerSize > w.j || w.j > blockSize {
 | |
| 		panic("leveldb/journal: bad writer state")
 | |
| 	}
 | |
| 	if last {
 | |
| 		if w.first {
 | |
| 			w.buf[w.i+6] = fullChunkType
 | |
| 		} else {
 | |
| 			w.buf[w.i+6] = lastChunkType
 | |
| 		}
 | |
| 	} else {
 | |
| 		if w.first {
 | |
| 			w.buf[w.i+6] = firstChunkType
 | |
| 		} else {
 | |
| 			w.buf[w.i+6] = middleChunkType
 | |
| 		}
 | |
| 	}
 | |
| 	binary.LittleEndian.PutUint32(w.buf[w.i+0:w.i+4], util.NewCRC(w.buf[w.i+6:w.j]).Value())
 | |
| 	binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-headerSize))
 | |
| }
 | |
| 
 | |
| // writeBlock writes the buffered block to the underlying writer, and reserves
 | |
| // space for the next chunk's header.
 | |
| func (w *Writer) writeBlock() {
 | |
| 	_, w.err = w.w.Write(w.buf[w.written:])
 | |
| 	w.i = 0
 | |
| 	w.j = headerSize
 | |
| 	w.written = 0
 | |
| }
 | |
| 
 | |
| // writePending finishes the current journal and writes the buffer to the
 | |
| // underlying writer.
 | |
| func (w *Writer) writePending() {
 | |
| 	if w.err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 	if w.pending {
 | |
| 		w.fillHeader(true)
 | |
| 		w.pending = false
 | |
| 	}
 | |
| 	_, w.err = w.w.Write(w.buf[w.written:w.j])
 | |
| 	w.written = w.j
 | |
| }
 | |
| 
 | |
| // Close finishes the current journal and closes the writer.
 | |
| func (w *Writer) Close() error {
 | |
| 	w.seq++
 | |
| 	w.writePending()
 | |
| 	if w.err != nil {
 | |
| 		return w.err
 | |
| 	}
 | |
| 	w.err = errors.New("leveldb/journal: closed Writer")
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Flush finishes the current journal, writes to the underlying writer, and
 | |
| // flushes it if that writer implements interface{ Flush() error }.
 | |
| func (w *Writer) Flush() error {
 | |
| 	w.seq++
 | |
| 	w.writePending()
 | |
| 	if w.err != nil {
 | |
| 		return w.err
 | |
| 	}
 | |
| 	if w.f != nil {
 | |
| 		w.err = w.f.Flush()
 | |
| 		return w.err
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Reset resets the journal writer, allows reuse of the journal writer. Reset
 | |
| // will also closes the journal writer if not already.
 | |
| func (w *Writer) Reset(writer io.Writer) (err error) {
 | |
| 	w.seq++
 | |
| 	if w.err == nil {
 | |
| 		w.writePending()
 | |
| 		err = w.err
 | |
| 	}
 | |
| 	w.w = writer
 | |
| 	w.f, _ = writer.(flusher)
 | |
| 	w.i = 0
 | |
| 	w.j = 0
 | |
| 	w.written = 0
 | |
| 	w.first = false
 | |
| 	w.pending = false
 | |
| 	w.err = nil
 | |
| 	return
 | |
| }
 | |
| 
 | |
| // Next returns a writer for the next journal. The writer returned becomes stale
 | |
| // after the next Close, Flush or Next call, and should no longer be used.
 | |
| func (w *Writer) Next() (io.Writer, error) {
 | |
| 	w.seq++
 | |
| 	if w.err != nil {
 | |
| 		return nil, w.err
 | |
| 	}
 | |
| 	if w.pending {
 | |
| 		w.fillHeader(true)
 | |
| 	}
 | |
| 	w.i = w.j
 | |
| 	w.j = w.j + headerSize
 | |
| 	// Check if there is room in the block for the header.
 | |
| 	if w.j > blockSize {
 | |
| 		// Fill in the rest of the block with zeroes.
 | |
| 		for k := w.i; k < blockSize; k++ {
 | |
| 			w.buf[k] = 0
 | |
| 		}
 | |
| 		w.writeBlock()
 | |
| 		if w.err != nil {
 | |
| 			return nil, w.err
 | |
| 		}
 | |
| 	}
 | |
| 	w.first = true
 | |
| 	w.pending = true
 | |
| 	return singleWriter{w, w.seq}, nil
 | |
| }
 | |
| 
 | |
| type singleWriter struct {
 | |
| 	w   *Writer
 | |
| 	seq int
 | |
| }
 | |
| 
 | |
| func (x singleWriter) Write(p []byte) (int, error) {
 | |
| 	w := x.w
 | |
| 	if w.seq != x.seq {
 | |
| 		return 0, errors.New("leveldb/journal: stale writer")
 | |
| 	}
 | |
| 	if w.err != nil {
 | |
| 		return 0, w.err
 | |
| 	}
 | |
| 	n0 := len(p)
 | |
| 	for len(p) > 0 {
 | |
| 		// Write a block, if it is full.
 | |
| 		if w.j == blockSize {
 | |
| 			w.fillHeader(false)
 | |
| 			w.writeBlock()
 | |
| 			if w.err != nil {
 | |
| 				return 0, w.err
 | |
| 			}
 | |
| 			w.first = false
 | |
| 		}
 | |
| 		// Copy bytes into the buffer.
 | |
| 		n := copy(w.buf[w.j:], p)
 | |
| 		w.j += n
 | |
| 		p = p[n:]
 | |
| 	}
 | |
| 	return n0, nil
 | |
| }
 | |
| 
 |