mirror of
https://github.com/mainnika/mongox-go-driver.git
synced 2026-06-13 01:03:35 +00:00
Add preloader filter
Fix pipeline structure Fix preloader logic
This commit is contained in:
@@ -0,0 +1,140 @@
|
|||||||
|
package common
|
||||||
|
|
||||||
|
import (
|
||||||
|
"reflect"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/mainnika/mongox-go-driver/mongox"
|
||||||
|
"github.com/mainnika/mongox-go-driver/mongox/errors"
|
||||||
|
"github.com/mainnika/mongox-go-driver/mongox/query"
|
||||||
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo/options"
|
||||||
|
)
|
||||||
|
|
||||||
|
func createSimpleLoad(db *mongox.Database, target interface{}, composed *query.Query) (cursor *mongo.Cursor, err error) {
|
||||||
|
|
||||||
|
collection := db.GetCollectionOf(target)
|
||||||
|
opts := options.Find()
|
||||||
|
|
||||||
|
opts.Sort = composed.Sorter()
|
||||||
|
opts.Limit = composed.Limiter()
|
||||||
|
opts.Skip = composed.Skipper()
|
||||||
|
|
||||||
|
return collection.Find(db.Context(), composed.M(), opts)
|
||||||
|
}
|
||||||
|
|
||||||
|
func createAggregateLoad(db *mongox.Database, target interface{}, composed *query.Query) (cursor *mongo.Cursor, err error) {
|
||||||
|
|
||||||
|
collection := db.GetCollectionOf(target)
|
||||||
|
opts := options.Aggregate()
|
||||||
|
|
||||||
|
pipelineHead := primitive.A{primitive.M{"$match": composed.M()}}
|
||||||
|
pipelineTail := primitive.A{}
|
||||||
|
|
||||||
|
el := reflect.ValueOf(target).Elem()
|
||||||
|
elType := el.Type()
|
||||||
|
numField := elType.NumField()
|
||||||
|
_, preloads := composed.Preloader()
|
||||||
|
|
||||||
|
for i := 0; i < numField; i++ {
|
||||||
|
|
||||||
|
field := elType.Field(i)
|
||||||
|
tag := field.Tag
|
||||||
|
|
||||||
|
preloadTag, ok := tag.Lookup("preload")
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
jsonTag, ok := tag.Lookup("json")
|
||||||
|
if jsonTag == "-" {
|
||||||
|
return nil, errors.Malformedf("preload private field is impossible")
|
||||||
|
}
|
||||||
|
|
||||||
|
jsonData := strings.SplitN(jsonTag, ",", 2)
|
||||||
|
jsonName := field.Name
|
||||||
|
if len(jsonData) > 0 {
|
||||||
|
jsonName = strings.TrimSpace(jsonData[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
preloadData := strings.Split(preloadTag, ",")
|
||||||
|
if len(preloadData) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if len(preloadData) == 1 {
|
||||||
|
panic("there is no foreign field")
|
||||||
|
}
|
||||||
|
|
||||||
|
preloadName := strings.TrimSpace(preloadData[0])
|
||||||
|
if len(preloadName) == 0 {
|
||||||
|
preloadName = jsonName
|
||||||
|
}
|
||||||
|
|
||||||
|
foreignField := strings.TrimSpace(preloadData[1])
|
||||||
|
if len(foreignField) == 0 {
|
||||||
|
panic("there is no foreign field")
|
||||||
|
}
|
||||||
|
|
||||||
|
preloadLimiter := 100
|
||||||
|
if len(preloadData) > 2 {
|
||||||
|
|
||||||
|
stringLimit := strings.TrimSpace(preloadData[2])
|
||||||
|
intLimit := preloadLimiter
|
||||||
|
|
||||||
|
intLimit, err = strconv.Atoi(stringLimit)
|
||||||
|
if err == nil {
|
||||||
|
preloadLimiter = intLimit
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, preload := range preloads {
|
||||||
|
if preload != preloadName {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
isPtr := el.Field(i).Kind() == reflect.Ptr
|
||||||
|
isSlice := el.Field(i).Kind() == reflect.Slice
|
||||||
|
isIface := el.Field(i).CanInterface()
|
||||||
|
if (!isPtr && !isSlice) || !isIface {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
typ := el.Field(i).Type()
|
||||||
|
lookupCollection := db.GetCollectionOf(reflect.Zero(typ).Interface())
|
||||||
|
lookupVars := primitive.M{"selector": "$_id"}
|
||||||
|
lookupPipeline := primitive.A{
|
||||||
|
// todo: make match from composed query
|
||||||
|
primitive.M{"$match": primitive.M{"$expr": primitive.M{"$eq": primitive.A{"$" + foreignField, "$$selector"}}}},
|
||||||
|
}
|
||||||
|
|
||||||
|
if isSlice && preloadLimiter > 0 {
|
||||||
|
lookupPipeline = append(lookupPipeline, primitive.M{"$limit": preloadLimiter})
|
||||||
|
} else if !isSlice {
|
||||||
|
lookupPipeline = append(lookupPipeline, primitive.M{"$limit": 1})
|
||||||
|
}
|
||||||
|
|
||||||
|
pipelineTail = append(pipelineTail, primitive.M{
|
||||||
|
"$lookup": primitive.M{
|
||||||
|
"from": lookupCollection.Name(),
|
||||||
|
"let": lookupVars,
|
||||||
|
"pipeline": lookupPipeline,
|
||||||
|
"as": jsonName,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
if isSlice {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
pipelineTail = append(pipelineTail, primitive.M{
|
||||||
|
"$unwind": primitive.M{
|
||||||
|
"preserveNullAndEmptyArrays": true,
|
||||||
|
"path": "$" + jsonName,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return collection.Aggregate(db.Context(), append(pipelineHead, pipelineTail...), opts)
|
||||||
|
}
|
||||||
@@ -6,7 +6,7 @@ import (
|
|||||||
"github.com/mainnika/mongox-go-driver/mongox"
|
"github.com/mainnika/mongox-go-driver/mongox"
|
||||||
"github.com/mainnika/mongox-go-driver/mongox/errors"
|
"github.com/mainnika/mongox-go-driver/mongox/errors"
|
||||||
"github.com/mainnika/mongox-go-driver/mongox/query"
|
"github.com/mainnika/mongox-go-driver/mongox/query"
|
||||||
"go.mongodb.org/mongo-driver/mongo/options"
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
)
|
)
|
||||||
|
|
||||||
// LoadArray loads an array of documents from the database by query
|
// LoadArray loads an array of documents from the database by query
|
||||||
@@ -31,16 +31,17 @@ func LoadArray(db *mongox.Database, target interface{}, filters ...interface{})
|
|||||||
panic(errors.InternalErrorf("target slice should contain ptrs"))
|
panic(errors.InternalErrorf("target slice should contain ptrs"))
|
||||||
}
|
}
|
||||||
|
|
||||||
dummy := reflect.Zero(targetSliceElemT)
|
|
||||||
collection := db.GetCollectionOf(dummy.Interface())
|
|
||||||
opts := options.Find()
|
|
||||||
composed := query.Compose(filters...)
|
composed := query.Compose(filters...)
|
||||||
|
hasPreloader, _ := composed.Preloader()
|
||||||
|
|
||||||
opts.Sort = composed.Sorter()
|
var result *mongo.Cursor
|
||||||
opts.Limit = composed.Limiter()
|
var err error
|
||||||
opts.Skip = composed.Skipper()
|
|
||||||
|
|
||||||
result, err := collection.Find(db.Context(), composed.M(), opts)
|
if hasPreloader {
|
||||||
|
result, err = createAggregateLoad(db, target, composed)
|
||||||
|
} else {
|
||||||
|
result, err = createSimpleLoad(db, target, composed)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.InternalErrorf("can't create find result: %s", err)
|
return errors.InternalErrorf("can't create find result: %s", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,7 +7,6 @@ import (
|
|||||||
"github.com/mainnika/mongox-go-driver/mongox/errors"
|
"github.com/mainnika/mongox-go-driver/mongox/errors"
|
||||||
"github.com/mainnika/mongox-go-driver/mongox/query"
|
"github.com/mainnika/mongox-go-driver/mongox/query"
|
||||||
"go.mongodb.org/mongo-driver/mongo"
|
"go.mongodb.org/mongo-driver/mongo"
|
||||||
"go.mongodb.org/mongo-driver/mongo/options"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ManyLoader is a controller for a database cursor
|
// ManyLoader is a controller for a database cursor
|
||||||
@@ -43,15 +42,17 @@ func (l *ManyLoader) Close() error {
|
|||||||
// LoadMany function loads documents one by one into a target channel
|
// LoadMany function loads documents one by one into a target channel
|
||||||
func LoadMany(db *mongox.Database, target interface{}, filters ...interface{}) (*ManyLoader, error) {
|
func LoadMany(db *mongox.Database, target interface{}, filters ...interface{}) (*ManyLoader, error) {
|
||||||
|
|
||||||
collection := db.GetCollectionOf(target)
|
var cursor *mongo.Cursor
|
||||||
opts := options.Find()
|
var err error
|
||||||
|
|
||||||
composed := query.Compose(filters...)
|
composed := query.Compose(filters...)
|
||||||
|
hasPreloader, _ := composed.Preloader()
|
||||||
|
|
||||||
opts.Sort = composed.Sorter()
|
if hasPreloader {
|
||||||
opts.Limit = composed.Limiter()
|
cursor, err = createAggregateLoad(db, target, composed)
|
||||||
opts.Skip = composed.Skipper()
|
} else {
|
||||||
|
cursor, err = createSimpleLoad(db, target, composed)
|
||||||
cursor, err := collection.Find(db.Context(), composed.M(), opts)
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.InternalErrorf("can't create find result: %s", err)
|
return nil, errors.InternalErrorf("can't create find result: %s", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ func Push(q *Query, f interface{}) bool {
|
|||||||
ok = ok || applySort(q, f)
|
ok = ok || applySort(q, f)
|
||||||
ok = ok || applySkip(q, f)
|
ok = ok || applySkip(q, f)
|
||||||
ok = ok || applyProtection(q, f)
|
ok = ok || applyProtection(q, f)
|
||||||
|
ok = ok || applyPreloader(q, f)
|
||||||
|
|
||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
@@ -108,3 +109,13 @@ func applyProtection(q *Query, f interface{}) bool {
|
|||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func applyPreloader(q *Query, f interface{}) bool {
|
||||||
|
|
||||||
|
if f, ok := f.(Preloader); ok {
|
||||||
|
q.preloader = f
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|||||||
@@ -0,0 +1,17 @@
|
|||||||
|
package query
|
||||||
|
|
||||||
|
// Preloader is a filter to skip the result
|
||||||
|
type Preloader interface {
|
||||||
|
Preload() []string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Preload is a simple implementation of the Skipper filter
|
||||||
|
type Preload []string
|
||||||
|
|
||||||
|
var _ Preloader = Preload{}
|
||||||
|
|
||||||
|
// Preload returns a preload list
|
||||||
|
func (l Preload) Preload() []string {
|
||||||
|
|
||||||
|
return Preload(l)
|
||||||
|
}
|
||||||
@@ -12,6 +12,7 @@ type Query struct {
|
|||||||
limiter Limiter
|
limiter Limiter
|
||||||
sorter Sorter
|
sorter Sorter
|
||||||
skipper Skipper
|
skipper Skipper
|
||||||
|
preloader Preloader
|
||||||
}
|
}
|
||||||
|
|
||||||
// And function pushes the elem query to the $and array of the query
|
// And function pushes the elem query to the $and array of the query
|
||||||
@@ -63,6 +64,22 @@ func (q *Query) Skipper() *int64 {
|
|||||||
return q.skipper.Skip()
|
return q.skipper.Skip()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Preloader is a preloader list for a query
|
||||||
|
func (q *Query) Preloader() (empty bool, preloader []string) {
|
||||||
|
|
||||||
|
if q.preloader == nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
preloader = q.preloader.Preload()
|
||||||
|
|
||||||
|
if len(preloader) == 0 {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, preloader
|
||||||
|
}
|
||||||
|
|
||||||
// Empty checks the query for any content
|
// Empty checks the query for any content
|
||||||
func (q *Query) Empty() bool {
|
func (q *Query) Empty() bool {
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user