You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							207 lines
						
					
					
						
							5.1 KiB
						
					
					
				
			
		
		
	
	
							207 lines
						
					
					
						
							5.1 KiB
						
					
					
				| // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
 | |
| // All rights reserved.
 | |
| //
 | |
| // Use of this source code is governed by a BSD-style license that can be
 | |
| // found in the LICENSE file.
 | |
| 
 | |
| package leveldb
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"os"
 | |
| 	"sync"
 | |
| 
 | |
| 	"github.com/syndtr/goleveldb/leveldb/errors"
 | |
| 	"github.com/syndtr/goleveldb/leveldb/journal"
 | |
| 	"github.com/syndtr/goleveldb/leveldb/opt"
 | |
| 	"github.com/syndtr/goleveldb/leveldb/storage"
 | |
| )
 | |
| 
 | |
| // ErrManifestCorrupted records manifest corruption.
 | |
| type ErrManifestCorrupted struct {
 | |
| 	Field  string
 | |
| 	Reason string
 | |
| }
 | |
| 
 | |
| func (e *ErrManifestCorrupted) Error() string {
 | |
| 	return fmt.Sprintf("leveldb: manifest corrupted (field '%s'): %s", e.Field, e.Reason)
 | |
| }
 | |
| 
 | |
| func newErrManifestCorrupted(fd storage.FileDesc, field, reason string) error {
 | |
| 	return errors.NewErrCorrupted(fd, &ErrManifestCorrupted{field, reason})
 | |
| }
 | |
| 
 | |
| // session represent a persistent database session.
 | |
| type session struct {
 | |
| 	// Need 64-bit alignment.
 | |
| 	stNextFileNum    int64 // current unused file number
 | |
| 	stJournalNum     int64 // current journal file number; need external synchronization
 | |
| 	stPrevJournalNum int64 // prev journal file number; no longer used; for compatibility with older version of leveldb
 | |
| 	stTempFileNum    int64
 | |
| 	stSeqNum         uint64 // last mem compacted seq; need external synchronization
 | |
| 
 | |
| 	stor     storage.Storage
 | |
| 	storLock storage.Lock
 | |
| 	o        *cachedOptions
 | |
| 	icmp     *iComparer
 | |
| 	tops     *tOps
 | |
| 
 | |
| 	manifest       *journal.Writer
 | |
| 	manifestWriter storage.Writer
 | |
| 	manifestFd     storage.FileDesc
 | |
| 
 | |
| 	stCompPtrs []internalKey // compaction pointers; need external synchronization
 | |
| 	stVersion  *version      // current version
 | |
| 	vmu        sync.Mutex
 | |
| }
 | |
| 
 | |
| // Creates new initialized session instance.
 | |
| func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) {
 | |
| 	if stor == nil {
 | |
| 		return nil, os.ErrInvalid
 | |
| 	}
 | |
| 	storLock, err := stor.Lock()
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 	s = &session{
 | |
| 		stor:     stor,
 | |
| 		storLock: storLock,
 | |
| 	}
 | |
| 	s.setOptions(o)
 | |
| 	s.tops = newTableOps(s)
 | |
| 	s.setVersion(newVersion(s))
 | |
| 	s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed")
 | |
| 	return
 | |
| }
 | |
| 
 | |
| // Close session.
 | |
| func (s *session) close() {
 | |
| 	s.tops.close()
 | |
| 	if s.manifest != nil {
 | |
| 		s.manifest.Close()
 | |
| 	}
 | |
| 	if s.manifestWriter != nil {
 | |
| 		s.manifestWriter.Close()
 | |
| 	}
 | |
| 	s.manifest = nil
 | |
| 	s.manifestWriter = nil
 | |
| 	s.stVersion = nil
 | |
| }
 | |
| 
 | |
| // Release session lock.
 | |
| func (s *session) release() {
 | |
| 	s.storLock.Release()
 | |
| }
 | |
| 
 | |
| // Create a new database session; need external synchronization.
 | |
| func (s *session) create() error {
 | |
| 	// create manifest
 | |
| 	return s.newManifest(nil, nil)
 | |
| }
 | |
| 
 | |
| // Recover a database session; need external synchronization.
 | |
| func (s *session) recover() (err error) {
 | |
| 	defer func() {
 | |
| 		if os.IsNotExist(err) {
 | |
| 			// Don't return os.ErrNotExist if the underlying storage contains
 | |
| 			// other files that belong to LevelDB. So the DB won't get trashed.
 | |
| 			if fds, _ := s.stor.List(storage.TypeAll); len(fds) > 0 {
 | |
| 				err = &errors.ErrCorrupted{Fd: storage.FileDesc{Type: storage.TypeManifest}, Err: &errors.ErrMissingFiles{}}
 | |
| 			}
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	fd, err := s.stor.GetMeta()
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	reader, err := s.stor.Open(fd)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 	defer reader.Close()
 | |
| 
 | |
| 	var (
 | |
| 		// Options.
 | |
| 		strict = s.o.GetStrict(opt.StrictManifest)
 | |
| 
 | |
| 		jr      = journal.NewReader(reader, dropper{s, fd}, strict, true)
 | |
| 		rec     = &sessionRecord{}
 | |
| 		staging = s.stVersion.newStaging()
 | |
| 	)
 | |
| 	for {
 | |
| 		var r io.Reader
 | |
| 		r, err = jr.Next()
 | |
| 		if err != nil {
 | |
| 			if err == io.EOF {
 | |
| 				err = nil
 | |
| 				break
 | |
| 			}
 | |
| 			return errors.SetFd(err, fd)
 | |
| 		}
 | |
| 
 | |
| 		err = rec.decode(r)
 | |
| 		if err == nil {
 | |
| 			// save compact pointers
 | |
| 			for _, r := range rec.compPtrs {
 | |
| 				s.setCompPtr(r.level, internalKey(r.ikey))
 | |
| 			}
 | |
| 			// commit record to version staging
 | |
| 			staging.commit(rec)
 | |
| 		} else {
 | |
| 			err = errors.SetFd(err, fd)
 | |
| 			if strict || !errors.IsCorrupted(err) {
 | |
| 				return
 | |
| 			}
 | |
| 			s.logf("manifest error: %v (skipped)", errors.SetFd(err, fd))
 | |
| 		}
 | |
| 		rec.resetCompPtrs()
 | |
| 		rec.resetAddedTables()
 | |
| 		rec.resetDeletedTables()
 | |
| 	}
 | |
| 
 | |
| 	switch {
 | |
| 	case !rec.has(recComparer):
 | |
| 		return newErrManifestCorrupted(fd, "comparer", "missing")
 | |
| 	case rec.comparer != s.icmp.uName():
 | |
| 		return newErrManifestCorrupted(fd, "comparer", fmt.Sprintf("mismatch: want '%s', got '%s'", s.icmp.uName(), rec.comparer))
 | |
| 	case !rec.has(recNextFileNum):
 | |
| 		return newErrManifestCorrupted(fd, "next-file-num", "missing")
 | |
| 	case !rec.has(recJournalNum):
 | |
| 		return newErrManifestCorrupted(fd, "journal-file-num", "missing")
 | |
| 	case !rec.has(recSeqNum):
 | |
| 		return newErrManifestCorrupted(fd, "seq-num", "missing")
 | |
| 	}
 | |
| 
 | |
| 	s.manifestFd = fd
 | |
| 	s.setVersion(staging.finish())
 | |
| 	s.setNextFileNum(rec.nextFileNum)
 | |
| 	s.recordCommited(rec)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Commit session; need external synchronization.
 | |
| func (s *session) commit(r *sessionRecord) (err error) {
 | |
| 	v := s.version()
 | |
| 	defer v.release()
 | |
| 
 | |
| 	// spawn new version based on current version
 | |
| 	nv := v.spawn(r)
 | |
| 
 | |
| 	if s.manifest == nil {
 | |
| 		// manifest journal writer not yet created, create one
 | |
| 		err = s.newManifest(r, nv)
 | |
| 	} else {
 | |
| 		err = s.flushManifest(r)
 | |
| 	}
 | |
| 
 | |
| 	// finally, apply new version if no error rise
 | |
| 	if err == nil {
 | |
| 		s.setVersion(nv)
 | |
| 	}
 | |
| 
 | |
| 	return
 | |
| }
 | |
| 
 |