//  Copyright (c) 2014 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// 		http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bleve

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"sync"
	"sync/atomic"
	"time"

	"github.com/blevesearch/bleve/document"
	"github.com/blevesearch/bleve/index"
	"github.com/blevesearch/bleve/index/store"
	"github.com/blevesearch/bleve/index/upsidedown"
	"github.com/blevesearch/bleve/mapping"
	"github.com/blevesearch/bleve/registry"
	"github.com/blevesearch/bleve/search"
	"github.com/blevesearch/bleve/search/collector"
	"github.com/blevesearch/bleve/search/facet"
	"github.com/blevesearch/bleve/search/highlight"
)

// indexImpl is the concrete Index implementation, tying together the
// underlying index.Index, its mapping, and index statistics.
type indexImpl struct {
	path  string
	name  string
	meta  *indexMeta
	i     index.Index
	m     mapping.IndexMapping
	mutex sync.RWMutex
	open  bool
	stats *IndexStat
}

const storePath = "store"

var mappingInternalKey = []byte("_mapping")

// SearchQueryStartCallbackKey is the context key under which a
// SearchQueryStartCallbackFn may be supplied to SearchInContext.
const SearchQueryStartCallbackKey = "_search_query_start_callback_key"

// SearchQueryEndCallbackKey is the context key under which a
// SearchQueryEndCallbackFn may be supplied to SearchInContext.
const SearchQueryEndCallbackKey = "_search_query_end_callback_key"

// SearchQueryStartCallbackFn is invoked before a search executes, with an
// estimate of the memory (in bytes) the search will need; returning a
// non-nil error aborts the search.
type SearchQueryStartCallbackFn func(size uint64) error

// SearchQueryEndCallbackFn is invoked when a search returns, carrying the
// same memory estimate that was passed to the start callback.
type SearchQueryEndCallbackFn func(size uint64) error
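
// Example (a minimal sketch; ctx, idx, req and the 64 MiB limit are
// illustrative, not part of this package):
//
//	cb := SearchQueryStartCallbackFn(func(size uint64) error {
//		if size > 64<<20 {
//			return fmt.Errorf("search estimated to need %d bytes, rejecting", size)
//		}
//		return nil
//	})
//	ctx := context.WithValue(context.Background(), SearchQueryStartCallbackKey, cb)
//	res, err := idx.SearchInContext(ctx, req)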

func indexStorePath(path string) string {
	return path + string(os.PathSeparator) + storePath
}

// newIndexUsing creates a new index at the specified path, using the
// provided mapping, index type, kvstore and kvconfig. An empty path
// results in an in-memory, non-persisted index.
func newIndexUsing(path string, mapping mapping.IndexMapping, indexType string, kvstore string, kvconfig map[string]interface{}) (*indexImpl, error) {
	// first validate the mapping
	err := mapping.Validate()
	if err != nil {
		return nil, err
	}

	if kvconfig == nil {
		kvconfig = map[string]interface{}{}
	}

	if kvstore == "" {
		return nil, fmt.Errorf("bleve not configured for file based indexing")
	}

	rv := indexImpl{
		path: path,
		name: path,
		m:    mapping,
		meta: newIndexMeta(indexType, kvstore, kvconfig),
	}
	rv.stats = &IndexStat{i: &rv}
	// at this point there is hope that we can be successful, so save index meta
	if path != "" {
		err = rv.meta.Save(path)
		if err != nil {
			return nil, err
		}
		kvconfig["create_if_missing"] = true
		kvconfig["error_if_exists"] = true
		kvconfig["path"] = indexStorePath(path)
	} else {
		kvconfig["path"] = ""
	}

	// open the index
	indexTypeConstructor := registry.IndexTypeConstructorByName(rv.meta.IndexType)
	if indexTypeConstructor == nil {
		return nil, ErrorUnknownIndexType
	}

	rv.i, err = indexTypeConstructor(rv.meta.Storage, kvconfig, Config.analysisQueue)
	if err != nil {
		return nil, err
	}
	err = rv.i.Open()
	if err != nil {
		if err == index.ErrorUnknownStorageType {
			return nil, ErrorUnknownStorageType
		}
		return nil, err
	}

	// now persist the mapping
	mappingBytes, err := json.Marshal(mapping)
	if err != nil {
		return nil, err
	}
	err = rv.i.SetInternal(mappingInternalKey, mappingBytes)
	if err != nil {
		return nil, err
	}

	// mark the index as open
	rv.mutex.Lock()
	defer rv.mutex.Unlock()
	rv.open = true
	indexStats.Register(&rv)
	return &rv, nil
}

// openIndexUsing opens an existing index at the specified path, applying
// any runtimeConfig entries on top of the stored kv configuration.
func openIndexUsing(path string, runtimeConfig map[string]interface{}) (rv *indexImpl, err error) {
	rv = &indexImpl{
		path: path,
		name: path,
	}
	rv.stats = &IndexStat{i: rv}

	rv.meta, err = openIndexMeta(path)
	if err != nil {
		return nil, err
	}

	// backwards compatibility if index type is missing
	if rv.meta.IndexType == "" {
		rv.meta.IndexType = upsidedown.Name
	}

	storeConfig := rv.meta.Config
	if storeConfig == nil {
		storeConfig = map[string]interface{}{}
	}

	storeConfig["path"] = indexStorePath(path)
	storeConfig["create_if_missing"] = false
	storeConfig["error_if_exists"] = false
	for rck, rcv := range runtimeConfig {
		storeConfig[rck] = rcv
	}

	// open the index
	indexTypeConstructor := registry.IndexTypeConstructorByName(rv.meta.IndexType)
	if indexTypeConstructor == nil {
		return nil, ErrorUnknownIndexType
	}

	rv.i, err = indexTypeConstructor(rv.meta.Storage, storeConfig, Config.analysisQueue)
	if err != nil {
		return nil, err
	}
	err = rv.i.Open()
	if err != nil {
		if err == index.ErrorUnknownStorageType {
			return nil, ErrorUnknownStorageType
		}
		return nil, err
	}

	// now load the mapping
	indexReader, err := rv.i.Reader()
	if err != nil {
		return nil, err
	}
	defer func() {
		if cerr := indexReader.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}()

	mappingBytes, err := indexReader.GetInternal(mappingInternalKey)
	if err != nil {
		return nil, err
	}

	var im *mapping.IndexMappingImpl
	err = json.Unmarshal(mappingBytes, &im)
	if err != nil {
		return nil, fmt.Errorf("error parsing mapping JSON: %v\nmapping contents:\n%s", err, string(mappingBytes))
	}

	// mark the index as open
	rv.mutex.Lock()
	defer rv.mutex.Unlock()
	rv.open = true

	// validate the mapping
	err = im.Validate()
	if err != nil {
		// note: even if the mapping is invalid,
		// we still return an open, usable index
		return rv, err
	}

	rv.m = im
	indexStats.Register(rv)
	return rv, err
}

// Advanced returns implementation internals
// necessary ONLY for advanced usage.
func (i *indexImpl) Advanced() (index.Index, store.KVStore, error) {
	s, err := i.i.Advanced()
	if err != nil {
		return nil, nil, err
	}
	return i.i, s, nil
}

// Mapping returns the IndexMapping in use by this
// Index.
func (i *indexImpl) Mapping() mapping.IndexMapping {
	return i.m
}

// Index the object with the specified identifier.
// The IndexMapping for this index will determine
// how the object is indexed.
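//
// A minimal sketch of indexing a document (idx is an Index obtained from
// New or Open; the identifier and data are illustrative):
//
//	if err := idx.Index("doc-1", map[string]interface{}{"name": "alice"}); err != nil {
//		// handle error
//	}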
func (i *indexImpl) Index(id string, data interface{}) (err error) {
	if id == "" {
		return ErrorEmptyID
	}

	i.mutex.RLock()
	defer i.mutex.RUnlock()

	if !i.open {
		return ErrorIndexClosed
	}

	doc := document.NewDocument(id)
	err = i.m.MapDocument(doc, data)
	if err != nil {
		return
	}
	err = i.i.Update(doc)
	return
}

// IndexAdvanced takes a document.Document object, skips the mapping,
// and indexes it directly.
func (i *indexImpl) IndexAdvanced(doc *document.Document) (err error) {
	if doc.ID == "" {
		return ErrorEmptyID
	}

	i.mutex.RLock()
	defer i.mutex.RUnlock()

	if !i.open {
		return ErrorIndexClosed
	}

	err = i.i.Update(doc)
	return
}

// Delete entries for the specified identifier from
// the index.
func (i *indexImpl) Delete(id string) (err error) {
	if id == "" {
		return ErrorEmptyID
	}

	i.mutex.RLock()
	defer i.mutex.RUnlock()

	if !i.open {
		return ErrorIndexClosed
	}

	err = i.i.Delete(id)
	return
}

// Batch executes multiple Index and Delete
// operations at the same time.  There are often
// significant performance benefits when performing
// operations in a batch.
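//
// A minimal sketch (idx is an Index obtained from New or Open; the
// identifiers and data are illustrative):
//
//	b := idx.NewBatch()
//	if err := b.Index("doc-1", map[string]interface{}{"name": "alice"}); err != nil {
//		// handle mapping error
//	}
//	b.Delete("doc-2")
//	if err := idx.Batch(b); err != nil {
//		// handle error
//	}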
func (i *indexImpl) Batch(b *Batch) error {
	i.mutex.RLock()
	defer i.mutex.RUnlock()

	if !i.open {
		return ErrorIndexClosed
	}

	return i.i.Batch(b.internal)
}

// Document is used to find the values of all the
// stored fields for a document in the index.  These
// stored fields are put back into a Document object
// and returned.
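//
// A minimal sketch of reading the stored fields back (idx and the
// identifier are illustrative):
//
//	doc, err := idx.Document("doc-1")
//	if err == nil && doc != nil {
//		for _, f := range doc.Fields {
//			fmt.Printf("%s = %q\n", f.Name(), f.Value())
//		}
//	}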
func (i *indexImpl) Document(id string) (doc *document.Document, err error) {
	i.mutex.RLock()
	defer i.mutex.RUnlock()

	if !i.open {
		return nil, ErrorIndexClosed
	}
	indexReader, err := i.i.Reader()
	if err != nil {
		return nil, err
	}
	defer func() {
		if cerr := indexReader.Close(); err == nil && cerr != nil {
			err = cerr
		}
	}()

	doc, err = indexReader.Document(id)
	if err != nil {
		return nil, err
	}
	return doc, nil
}

// DocCount returns the number of documents in the
// index.
func (i *indexImpl) DocCount() (count uint64, err error) {
	i.mutex.RLock()
	defer i.mutex.RUnlock()

	if !i.open {
		return 0, ErrorIndexClosed
	}

	// open a reader for this search
	indexReader, err := i.i.Reader()
	if err != nil {
		return 0, fmt.Errorf("error opening index reader %v", err)
	}
	defer func() {
		if cerr := indexReader.Close(); err == nil && cerr != nil {
			err = cerr
		}
	}()

	count, err = indexReader.DocCount()
	return
}

// Search executes a search request operation.
// Returns a SearchResult object or an error.
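//
// A minimal sketch (idx and the query text are illustrative):
//
//	query := NewMatchQuery("beer")
//	req := NewSearchRequest(query)
//	req.Size = 10
//	res, err := idx.Search(req)
//	if err == nil {
//		fmt.Printf("%d matches\n", res.Total)
//	}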
func (i *indexImpl) Search(req *SearchRequest) (sr *SearchResult, err error) {
	return i.SearchInContext(context.Background(), req)
}

var documentMatchEmptySize int
var searchContextEmptySize int
var facetResultEmptySize int
var documentEmptySize int

func init() {
	var dm search.DocumentMatch
	documentMatchEmptySize = dm.Size()

	var sc search.SearchContext
	searchContextEmptySize = sc.Size()

	var fr search.FacetResult
	facetResultEmptySize = fr.Size()

	var d document.Document
	documentEmptySize = d.Size()
}

// memNeededForSearch is a helper function that returns an estimate of RAM
// needed to execute a search request.
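//
// As a rough illustration (the numbers are assumptions, not measurements):
// for req.Size=10, req.From=0 with the skip cap not reached, backingSize is
// 11 and numDocMatches is 11 plus the searcher's DocumentMatch pool size;
// the estimate then sums the collector size, the pre-allocated pool
// (searchContextEmptySize + numDocMatches*documentMatchEmptySize), the
// searcher size, (numDocMatches+1) empty DocumentMatches for results, the
// static SearchResult/SearchStatus sizes, and any facet or stored-field
// overhead the request implies.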
func memNeededForSearch(req *SearchRequest,
	searcher search.Searcher,
	topnCollector *collector.TopNCollector) uint64 {

	// the collector keeps up to size+from hits in its backing store, but
	// pre-allocation is capped at PreAllocSizeSkipCap entries
	backingSize := req.Size + req.From + 1
	if req.Size+req.From > collector.PreAllocSizeSkipCap {
		backingSize = collector.PreAllocSizeSkipCap + 1
	}
	numDocMatches := backingSize + searcher.DocumentMatchPoolSize()

	estimate := 0

	// overhead: the collector's own size, in bytes
	estimate += topnCollector.Size()

	// pre-allocating the DocumentMatchPool
	estimate += searchContextEmptySize + numDocMatches*documentMatchEmptySize

	// searcher overhead
	estimate += searcher.Size()

	// overhead from the results, plus lowestMatchOutsideResults
	estimate += (numDocMatches + 1) * documentMatchEmptySize

	// additional overhead from SearchResult
	estimate += reflectStaticSizeSearchResult + reflectStaticSizeSearchStatus

	// overhead from facet results
	if req.Facets != nil {
		estimate += len(req.Facets) * facetResultEmptySize
	}

	// overhead of loading stored documents for field retrieval / highlighting
	if len(req.Fields) > 0 || req.Highlight != nil {
		// Size + From => number of hits
		estimate += (req.Size + req.From) * documentEmptySize
	}

	return uint64(estimate)
}

// SearchInContext executes a search request operation within the provided
// Context. Returns a SearchResult object or an error.
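//
// The context is consulted by the collector while hits are gathered, so a
// search can be bounded or cancelled via the usual context mechanisms
// (sketch; idx, req and the timeout are illustrative):
//
//	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
//	defer cancel()
//	res, err := idx.SearchInContext(ctx, req)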
func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr *SearchResult, err error) {
	i.mutex.RLock()
	defer i.mutex.RUnlock()

	searchStart := time.Now()

	if !i.open {
		return nil, ErrorIndexClosed
	}

	collector := collector.NewTopNCollector(req.Size, req.From, req.Sort)

	// open a reader for this search
	indexReader, err := i.i.Reader()
	if err != nil {
		return nil, fmt.Errorf("error opening index reader %v", err)
	}
	defer func() {
		if cerr := indexReader.Close(); err == nil && cerr != nil {
			err = cerr
		}
	}()

	searcher, err := req.Query.Searcher(indexReader, i.m, search.SearcherOptions{
		Explain:            req.Explain,
		IncludeTermVectors: req.IncludeLocations || req.Highlight != nil,
		Score:              req.Score,
	})
	if err != nil {
		return nil, err
	}
	defer func() {
		if serr := searcher.Close(); err == nil && serr != nil {
			err = serr
		}
	}()

	if req.Facets != nil {
		facetsBuilder := search.NewFacetsBuilder(indexReader)
		for facetName, facetRequest := range req.Facets {
			if facetRequest.NumericRanges != nil {
				// build numeric range facet
				facetBuilder := facet.NewNumericFacetBuilder(facetRequest.Field, facetRequest.Size)
				for _, nr := range facetRequest.NumericRanges {
					facetBuilder.AddRange(nr.Name, nr.Min, nr.Max)
				}
				facetsBuilder.Add(facetName, facetBuilder)
			} else if facetRequest.DateTimeRanges != nil {
				// build date range facet
				facetBuilder := facet.NewDateTimeFacetBuilder(facetRequest.Field, facetRequest.Size)
				dateTimeParser := i.m.DateTimeParserNamed("")
				for _, dr := range facetRequest.DateTimeRanges {
					start, end := dr.ParseDates(dateTimeParser)
					facetBuilder.AddRange(dr.Name, start, end)
				}
				facetsBuilder.Add(facetName, facetBuilder)
			} else {
				// build terms facet
				facetBuilder := facet.NewTermsFacetBuilder(facetRequest.Field, facetRequest.Size)
				facetsBuilder.Add(facetName, facetBuilder)
			}
		}
		collector.SetFacetsBuilder(facetsBuilder)
	}

	memNeeded := memNeededForSearch(req, searcher, collector)
	if cb := ctx.Value(SearchQueryStartCallbackKey); cb != nil {
		if cbF, ok := cb.(SearchQueryStartCallbackFn); ok {
			err = cbF(memNeeded)
		}
	}
	if err != nil {
		return nil, err
	}

	if cb := ctx.Value(SearchQueryEndCallbackKey); cb != nil {
		if cbF, ok := cb.(SearchQueryEndCallbackFn); ok {
			defer func() {
				_ = cbF(memNeeded)
			}()
		}
	}

	err = collector.Collect(ctx, searcher, indexReader)
	if err != nil {
		return nil, err
	}

	hits := collector.Results()

	var highlighter highlight.Highlighter

	if req.Highlight != nil {
		// get the right highlighter
		highlighter, err = Config.Cache.HighlighterNamed(Config.DefaultHighlighter)
		if err != nil {
			return nil, err
		}
		if req.Highlight.Style != nil {
			highlighter, err = Config.Cache.HighlighterNamed(*req.Highlight.Style)
			if err != nil {
				return nil, err
			}
		}
		if highlighter == nil {
			return nil, fmt.Errorf("no highlighter named `%s` registered", *req.Highlight.Style)
		}
	}

	for _, hit := range hits {
		if len(req.Fields) > 0 || highlighter != nil {
			doc, err := indexReader.Document(hit.ID)
			if err == nil && doc != nil {
				if len(req.Fields) > 0 {
					fieldsToLoad := deDuplicate(req.Fields)
					for _, f := range fieldsToLoad {
						for _, docF := range doc.Fields {
							if f == "*" || docF.Name() == f {
								var value interface{}
								switch docF := docF.(type) {
								case *document.TextField:
									value = string(docF.Value())
								case *document.NumericField:
									num, err := docF.Number()
									if err == nil {
										value = num
									}
								case *document.DateTimeField:
									datetime, err := docF.DateTime()
									if err == nil {
										value = datetime.Format(time.RFC3339)
									}
								case *document.BooleanField:
									boolean, err := docF.Boolean()
									if err == nil {
										value = boolean
									}
								case *document.GeoPointField:
									lon, err := docF.Lon()
									if err == nil {
										lat, err := docF.Lat()
										if err == nil {
											value = []float64{lon, lat}
										}
									}
								}
								if value != nil {
									hit.AddFieldValue(docF.Name(), value)
								}
							}
						}
					}
				}
				if highlighter != nil {
					highlightFields := req.Highlight.Fields
					if highlightFields == nil {
						// add all fields with matches
						highlightFields = make([]string, 0, len(hit.Locations))
						for k := range hit.Locations {
							highlightFields = append(highlightFields, k)
						}
					}
					for _, hf := range highlightFields {
						highlighter.BestFragmentsInField(hit, doc, hf, 1)
					}
				}
			} else if doc == nil {
				// unexpected case, a doc ID that was found as a search hit
				// was unable to be found during document lookup
				return nil, ErrorIndexReadInconsistency
			}
		}
		if i.name != "" {
			hit.Index = i.name
		}
	}

	atomic.AddUint64(&i.stats.searches, 1)
	searchDuration := time.Since(searchStart)
	atomic.AddUint64(&i.stats.searchTime, uint64(searchDuration))

	if Config.SlowSearchLogThreshold > 0 &&
		searchDuration > Config.SlowSearchLogThreshold {
		logger.Printf("slow search took %s - %v", searchDuration, req)
	}

	return &SearchResult{
		Status: &SearchStatus{
			Total:      1,
			Successful: 1,
		},
		Request:  req,
		Hits:     hits,
		Total:    collector.Total(),
		MaxScore: collector.MaxScore(),
		Took:     searchDuration,
		Facets:   collector.FacetResults(),
	}, nil
}

// Fields returns the names of all the fields this
// Index has operated on.
func (i *indexImpl) Fields() (fields []string, err error) {
	i.mutex.RLock()
	defer i.mutex.RUnlock()

	if !i.open {
		return nil, ErrorIndexClosed
	}

	indexReader, err := i.i.Reader()
	if err != nil {
		return nil, err
	}
	defer func() {
		if cerr := indexReader.Close(); err == nil && cerr != nil {
			err = cerr
		}
	}()

	fields, err = indexReader.Fields()
	if err != nil {
		return nil, err
	}
	return fields, nil
}

// FieldDict returns a term dictionary for the specified field. The caller
// must close the returned FieldDict to release the underlying reader and
// the read lock held on the index.
func (i *indexImpl) FieldDict(field string) (index.FieldDict, error) {
	i.mutex.RLock()

	if !i.open {
		i.mutex.RUnlock()
		return nil, ErrorIndexClosed
	}

	indexReader, err := i.i.Reader()
	if err != nil {
		i.mutex.RUnlock()
		return nil, err
	}

	fieldDict, err := indexReader.FieldDict(field)
	if err != nil {
		i.mutex.RUnlock()
		return nil, err
	}

	return &indexImplFieldDict{
		index:       i,
		indexReader: indexReader,
		fieldDict:   fieldDict,
	}, nil
}

// FieldDictRange returns a term dictionary for the specified field,
// restricted to terms between startTerm and endTerm. The caller must close
// the returned FieldDict to release the underlying reader and read lock.
func (i *indexImpl) FieldDictRange(field string, startTerm []byte, endTerm []byte) (index.FieldDict, error) {
	i.mutex.RLock()

	if !i.open {
		i.mutex.RUnlock()
		return nil, ErrorIndexClosed
	}

	indexReader, err := i.i.Reader()
	if err != nil {
		i.mutex.RUnlock()
		return nil, err
	}

	fieldDict, err := indexReader.FieldDictRange(field, startTerm, endTerm)
	if err != nil {
		i.mutex.RUnlock()
		return nil, err
	}

	return &indexImplFieldDict{
		index:       i,
		indexReader: indexReader,
		fieldDict:   fieldDict,
	}, nil
}

// FieldDictPrefix returns a term dictionary for the specified field,
// restricted to terms that begin with termPrefix. The caller must close
// the returned FieldDict to release the underlying reader and read lock.
func (i *indexImpl) FieldDictPrefix(field string, termPrefix []byte) (index.FieldDict, error) {
	i.mutex.RLock()

	if !i.open {
		i.mutex.RUnlock()
		return nil, ErrorIndexClosed
	}

	indexReader, err := i.i.Reader()
	if err != nil {
		i.mutex.RUnlock()
		return nil, err
	}

	fieldDict, err := indexReader.FieldDictPrefix(field, termPrefix)
	if err != nil {
		i.mutex.RUnlock()
		return nil, err
	}

	return &indexImplFieldDict{
		index:       i,
		indexReader: indexReader,
		fieldDict:   fieldDict,
	}, nil
}
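
// Iterating a dictionary returned by FieldDict, FieldDictRange, or
// FieldDictPrefix looks roughly like this (idx and the field name are
// illustrative):
//
//	dict, err := idx.FieldDict("name")
//	if err != nil {
//		// handle error
//	}
//	defer dict.Close()
//	for {
//		entry, nerr := dict.Next()
//		if nerr != nil {
//			// handle error
//			break
//		}
//		if entry == nil {
//			break // dictionary exhausted
//		}
//		fmt.Println(entry.Term, entry.Count)
//	}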

// Close releases the resources used by this index. Subsequent index and
// search operations return ErrorIndexClosed.
func (i *indexImpl) Close() error {
	i.mutex.Lock()
	defer i.mutex.Unlock()

	indexStats.UnRegister(i)

	i.open = false
	return i.i.Close()
}

func (i *indexImpl) Stats() *IndexStat {
	return i.stats
}

func (i *indexImpl) StatsMap() map[string]interface{} {
	return i.stats.statsMap()
}

// GetInternal retrieves the value stored in the index's internal storage
// under the given key; internal storage is kept separate from the indexed
// document data.
func (i *indexImpl) GetInternal(key []byte) (val []byte, err error) {
	i.mutex.RLock()
	defer i.mutex.RUnlock()

	if !i.open {
		return nil, ErrorIndexClosed
	}

	reader, err := i.i.Reader()
	if err != nil {
		return nil, err
	}
	defer func() {
		if cerr := reader.Close(); err == nil && cerr != nil {
			err = cerr
		}
	}()

	val, err = reader.GetInternal(key)
	if err != nil {
		return nil, err
	}
	return val, nil
}

// SetInternal stores a value in the index's internal storage under the
// given key.
func (i *indexImpl) SetInternal(key, val []byte) error {
	i.mutex.RLock()
	defer i.mutex.RUnlock()

	if !i.open {
		return ErrorIndexClosed
	}

	return i.i.SetInternal(key, val)
}

// DeleteInternal removes any value stored in the index's internal storage
// under the given key.
func (i *indexImpl) DeleteInternal(key []byte) error {
	i.mutex.RLock()
	defer i.mutex.RUnlock()

	if !i.open {
		return ErrorIndexClosed
	}

	return i.i.DeleteInternal(key)
}

// NewBatch creates a new empty batch.
func (i *indexImpl) NewBatch() *Batch {
	return &Batch{
		index:    i,
		internal: index.NewBatch(),
	}
}

// Name returns the name of the index (by default, the index path).
func (i *indexImpl) Name() string {
	return i.name
}

// SetName assigns a new name to the index and re-registers its stats
// under that name.
func (i *indexImpl) SetName(name string) {
	indexStats.UnRegister(i)
	i.name = name
	indexStats.Register(i)
}

// indexImplFieldDict wraps a field dictionary together with the reader it
// came from and the owning index; Close releases the reader and the read
// lock acquired when the dictionary was opened.
type indexImplFieldDict struct {
	index       *indexImpl
	indexReader index.IndexReader
	fieldDict   index.FieldDict
}

func (f *indexImplFieldDict) Next() (*index.DictEntry, error) {
	return f.fieldDict.Next()
}

func (f *indexImplFieldDict) Close() error {
	defer f.index.mutex.RUnlock()
	err := f.fieldDict.Close()
	if err != nil {
		return err
	}
	return f.indexReader.Close()
}

// deDuplicate removes duplicate entries from a slice of strings,
// preserving order.
func deDuplicate(fields []string) []string {
	entries := make(map[string]struct{})
	ret := []string{}
	for _, entry := range fields {
		if _, exists := entries[entry]; !exists {
			entries[entry] = struct{}{}
			ret = append(ret, entry)
		}
	}
	return ret
}