You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							188 lines
						
					
					
						
							4.8 KiB
						
					
					
				
			
		
		
	
	
							188 lines
						
					
					
						
							4.8 KiB
						
					
					
				| //  Copyright (c) 2017 Couchbase, Inc.
 | |
| //
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| // 		http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package vellum
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| )
 | |
| 
 | |
| // MergeFunc is used to choose the new value for a key when merging a slice
 | |
| // of iterators, and the same key is observed with multiple values.
 | |
| // Values presented to the MergeFunc will be in the same order as the
 | |
| // original slice creating the MergeIterator.  This allows some MergeFunc
 | |
| // implementations to prioritize one iterator over another.
 | |
| type MergeFunc func([]uint64) uint64
 | |
| 
 | |
| // MergeIterator implements the Iterator interface by traversing a slice
 | |
| // of iterators and merging the contents of them.  If the same key exists
 | |
| // in mulitipe underlying iterators, a user-provided MergeFunc will be
 | |
| // invoked to choose the new value.
 | |
| type MergeIterator struct {
 | |
| 	itrs   []Iterator
 | |
| 	f      MergeFunc
 | |
| 	currKs [][]byte
 | |
| 	currVs []uint64
 | |
| 
 | |
| 	lowK    []byte
 | |
| 	lowV    uint64
 | |
| 	lowIdxs []int
 | |
| 
 | |
| 	mergeV []uint64
 | |
| }
 | |
| 
 | |
| // NewMergeIterator creates a new MergeIterator over the provided slice of
 | |
| // Iterators and with the specified MergeFunc to resolve duplicate keys.
 | |
| func NewMergeIterator(itrs []Iterator, f MergeFunc) (*MergeIterator, error) {
 | |
| 	rv := &MergeIterator{
 | |
| 		itrs:    itrs,
 | |
| 		f:       f,
 | |
| 		currKs:  make([][]byte, len(itrs)),
 | |
| 		currVs:  make([]uint64, len(itrs)),
 | |
| 		lowIdxs: make([]int, 0, len(itrs)),
 | |
| 		mergeV:  make([]uint64, 0, len(itrs)),
 | |
| 	}
 | |
| 	rv.init()
 | |
| 	if rv.lowK == nil {
 | |
| 		return rv, ErrIteratorDone
 | |
| 	}
 | |
| 	return rv, nil
 | |
| }
 | |
| 
 | |
| func (m *MergeIterator) init() {
 | |
| 	for i, itr := range m.itrs {
 | |
| 		m.currKs[i], m.currVs[i] = itr.Current()
 | |
| 	}
 | |
| 	m.updateMatches()
 | |
| }
 | |
| 
 | |
| func (m *MergeIterator) updateMatches() {
 | |
| 	if len(m.itrs) < 1 {
 | |
| 		return
 | |
| 	}
 | |
| 	m.lowK = m.currKs[0]
 | |
| 	m.lowIdxs = m.lowIdxs[:0]
 | |
| 	m.lowIdxs = append(m.lowIdxs, 0)
 | |
| 	for i := 1; i < len(m.itrs); i++ {
 | |
| 		if m.currKs[i] == nil {
 | |
| 			continue
 | |
| 		}
 | |
| 		cmp := bytes.Compare(m.currKs[i], m.lowK)
 | |
| 		if m.lowK == nil || cmp < 0 {
 | |
| 			// reached a new low
 | |
| 			m.lowK = m.currKs[i]
 | |
| 			m.lowIdxs = m.lowIdxs[:0]
 | |
| 			m.lowIdxs = append(m.lowIdxs, i)
 | |
| 		} else if cmp == 0 {
 | |
| 			m.lowIdxs = append(m.lowIdxs, i)
 | |
| 		}
 | |
| 	}
 | |
| 	if len(m.lowIdxs) > 1 {
 | |
| 		// merge multiple values
 | |
| 		m.mergeV = m.mergeV[:0]
 | |
| 		for _, vi := range m.lowIdxs {
 | |
| 			m.mergeV = append(m.mergeV, m.currVs[vi])
 | |
| 		}
 | |
| 		m.lowV = m.f(m.mergeV)
 | |
| 	} else if len(m.lowIdxs) == 1 {
 | |
| 		m.lowV = m.currVs[m.lowIdxs[0]]
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Current returns the key and value currently pointed to by this iterator.
 | |
| // If the iterator is not pointing at a valid value (because Iterator/Next/Seek)
 | |
| // returned an error previously, it may return nil,0.
 | |
| func (m *MergeIterator) Current() ([]byte, uint64) {
 | |
| 	return m.lowK, m.lowV
 | |
| }
 | |
| 
 | |
| // Next advances this iterator to the next key/value pair.  If there is none,
 | |
| // then ErrIteratorDone is returned.
 | |
| func (m *MergeIterator) Next() error {
 | |
| 	// move all the current low iterators to next
 | |
| 	for _, vi := range m.lowIdxs {
 | |
| 		err := m.itrs[vi].Next()
 | |
| 		if err != nil && err != ErrIteratorDone {
 | |
| 			return err
 | |
| 		}
 | |
| 		m.currKs[vi], m.currVs[vi] = m.itrs[vi].Current()
 | |
| 	}
 | |
| 	m.updateMatches()
 | |
| 	if m.lowK == nil {
 | |
| 		return ErrIteratorDone
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Seek advances this iterator to the specified key/value pair.  If this key
 | |
| // is not in the FST, Current() will return the next largest key.  If this
 | |
| // seek operation would go past the last key, then ErrIteratorDone is returned.
 | |
| func (m *MergeIterator) Seek(key []byte) error {
 | |
| 	for i := range m.itrs {
 | |
| 		err := m.itrs[i].Seek(key)
 | |
| 		if err != nil && err != ErrIteratorDone {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 	m.updateMatches()
 | |
| 	if m.lowK == nil {
 | |
| 		return ErrIteratorDone
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Close will attempt to close all the underlying Iterators.  If any errors
 | |
| // are encountered, the first will be returned.
 | |
| func (m *MergeIterator) Close() error {
 | |
| 	var rv error
 | |
| 	for i := range m.itrs {
 | |
| 		// close all iterators, return first error if any
 | |
| 		err := m.itrs[i].Close()
 | |
| 		if rv == nil {
 | |
| 			rv = err
 | |
| 		}
 | |
| 	}
 | |
| 	return rv
 | |
| }
 | |
| 
 | |
| // MergeMin chooses the minimum value
 | |
| func MergeMin(vals []uint64) uint64 {
 | |
| 	rv := vals[0]
 | |
| 	for _, v := range vals[1:] {
 | |
| 		if v < rv {
 | |
| 			rv = v
 | |
| 		}
 | |
| 	}
 | |
| 	return rv
 | |
| }
 | |
| 
 | |
| // MergeMax chooses the maximum value
 | |
| func MergeMax(vals []uint64) uint64 {
 | |
| 	rv := vals[0]
 | |
| 	for _, v := range vals[1:] {
 | |
| 		if v > rv {
 | |
| 			rv = v
 | |
| 		}
 | |
| 	}
 | |
| 	return rv
 | |
| }
 | |
| 
 | |
| // MergeSum sums the values
 | |
| func MergeSum(vals []uint64) uint64 {
 | |
| 	rv := vals[0]
 | |
| 	for _, v := range vals[1:] {
 | |
| 		rv += v
 | |
| 	}
 | |
| 	return rv
 | |
| }
 | |
| 
 |