From 7f460082276a02766b52edb54070b2a8e02bfff9 Mon Sep 17 00:00:00 2001 From: Nikita Tokarchuk Date: Fri, 6 Mar 2020 00:22:00 +0100 Subject: [PATCH] Merge common functions into database interface --- mongox/common/common.go | 162 ------------------ mongox/{common => database}/count.go | 9 +- mongox/database/database.go | 152 ++++++++++++++++ mongox/{common => database}/deletearray.go | 9 +- mongox/{common => database}/deleteone.go | 9 +- mongox/{common => database}/loadarray.go | 13 +- mongox/{common => database}/loadone.go | 11 +- mongox/database/loadstream.go | 33 ++++ mongox/{common => database}/saveone.go | 9 +- .../streamloader.go} | 41 ++--- mongox/mongox.go | 16 ++ 11 files changed, 242 insertions(+), 222 deletions(-) delete mode 100644 mongox/common/common.go rename mongox/{common => database}/count.go (68%) rename mongox/{common => database}/deletearray.go (81%) rename mongox/{common => database}/deleteone.go (78%) rename mongox/{common => database}/loadarray.go (80%) rename mongox/{common => database}/loadone.go (68%) create mode 100644 mongox/database/loadstream.go rename mongox/{common => database}/saveone.go (75%) rename mongox/{common/loadstream.go => database/streamloader.go} (57%) diff --git a/mongox/common/common.go b/mongox/common/common.go deleted file mode 100644 index f7ea8fb..0000000 --- a/mongox/common/common.go +++ /dev/null @@ -1,162 +0,0 @@ -package common - -import ( - "fmt" - "reflect" - "strconv" - "strings" - - "go.mongodb.org/mongo-driver/bson/primitive" - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" - - "github.com/mainnika/mongox-go-driver/v2/mongox" - "github.com/mainnika/mongox-go-driver/v2/mongox/query" -) - -func createSimpleLoad(db mongox.Database, target interface{}, composed *query.Query) (cursor *mongo.Cursor, err error) { - - collection := db.GetCollectionOf(target) - opts := options.Find() - - opts.Sort = composed.Sorter() - opts.Limit = composed.Limiter() - opts.Skip = composed.Skipper() - - return collection.Find(db.Context(), composed.M(), opts) -} - -func createAggregateLoad(db mongox.Database, target interface{}, composed *query.Query) (cursor *mongo.Cursor, err error) { - - collection := db.GetCollectionOf(target) - opts := options.Aggregate() - - pipeline := primitive.A{} - - if !composed.Empty() { - pipeline = append(pipeline, primitive.M{"$match": primitive.M{"$expr": composed.M()}}) - } - if composed.Sorter() != nil { - pipeline = append(pipeline, primitive.M{"$sort": composed.Sorter()}) - } - if composed.Skipper() != nil { - pipeline = append(pipeline, primitive.M{"$skip": *composed.Skipper()}) - } - if composed.Limiter() != nil { - pipeline = append(pipeline, primitive.M{"$limit": *composed.Limiter()}) - } - - el := reflect.ValueOf(target).Elem() - elType := el.Type() - numField := elType.NumField() - _, preloads := composed.Preloader() - - for i := 0; i < numField; i++ { - - field := elType.Field(i) - tag := field.Tag - - preloadTag, ok := tag.Lookup("preload") - if !ok { - continue - } - jsonTag, ok := tag.Lookup("json") - if jsonTag == "-" { - return nil, fmt.Errorf("preload private field is impossible") - } - - jsonData := strings.SplitN(jsonTag, ",", 2) - jsonName := field.Name - if len(jsonData) > 0 { - jsonName = strings.TrimSpace(jsonData[0]) - } - - preloadData := strings.Split(preloadTag, ",") - if len(preloadData) == 0 { - continue - } - if len(preloadData) == 1 { - panic("there is no foreign field") - } - - localField := strings.TrimSpace(preloadData[0]) - if len(localField) == 0 { - localField = "_id" - } - - foreignField := strings.TrimSpace(preloadData[1]) - if len(foreignField) == 0 { - panic("there is no foreign field") - } - - preloadLimiter := 100 - preloadReversed := false - if len(preloadData) > 2 { - stringLimit := strings.TrimSpace(preloadData[2]) - intLimit := preloadLimiter - - preloadReversed = strings.HasPrefix(stringLimit, "-") - if preloadReversed { - stringLimit = stringLimit[1:] - } - - intLimit, err = strconv.Atoi(stringLimit) - if err == nil { - preloadLimiter = intLimit - } - } - - for _, preload := range preloads { - if preload != jsonName { - continue - } - - isSlice := el.Field(i).Kind() == reflect.Slice - - typ := el.Field(i).Type() - if typ.Kind() == reflect.Slice { - typ = typ.Elem() - } - if typ.Kind() != reflect.Ptr { - panic("preload field should have ptr type") - } - - lookupCollection := db.GetCollectionOf(reflect.Zero(typ).Interface()) - lookupVars := primitive.M{"selector": "$" + localField} - lookupPipeline := primitive.A{ - primitive.M{"$match": primitive.M{"$expr": primitive.M{"$eq": primitive.A{"$" + foreignField, "$$selector"}}}}, - } - - if preloadReversed { - lookupPipeline = append(lookupPipeline, primitive.M{"$sort": primitive.M{"_id": -1}}) - } - if isSlice && preloadLimiter > 0 { - lookupPipeline = append(lookupPipeline, primitive.M{"$limit": preloadLimiter}) - } else if !isSlice { - lookupPipeline = append(lookupPipeline, primitive.M{"$limit": 1}) - } - - pipeline = append(pipeline, primitive.M{ - "$lookup": primitive.M{ - "from": lookupCollection.Name(), - "let": lookupVars, - "pipeline": lookupPipeline, - "as": jsonName, - }, - }) - - if isSlice { - continue - } - - pipeline = append(pipeline, primitive.M{ - "$unwind": primitive.M{ - "preserveNullAndEmptyArrays": true, - "path": "$" + jsonName, - }, - }) - } - } - - return collection.Aggregate(db.Context(), pipeline, opts) -} diff --git a/mongox/common/count.go b/mongox/database/count.go similarity index 68% rename from mongox/common/count.go rename to mongox/database/count.go index 825f78b..955efdc 100644 --- a/mongox/common/count.go +++ b/mongox/database/count.go @@ -1,4 +1,4 @@ -package common +package database import ( "fmt" @@ -6,22 +6,21 @@ import ( "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" - "github.com/mainnika/mongox-go-driver/v2/mongox" "github.com/mainnika/mongox-go-driver/v2/mongox/query" ) // Count function counts documents in the database by query // target is used only to get collection by tag so it'd be better to use nil ptr here -func Count(db mongox.Database, target interface{}, filters ...interface{}) (int64, error) { +func (d *Database) Count(target interface{}, filters ...interface{}) (int64, error) { - collection := db.GetCollectionOf(target) + collection := d.GetCollectionOf(target) opts := options.Count() composed := query.Compose(filters...) opts.Limit = composed.Limiter() opts.Skip = composed.Skipper() - result, err := collection.CountDocuments(db.Context(), composed.M(), opts) + result, err := collection.CountDocuments(d.Context(), composed.M(), opts) if err == mongo.ErrNoDocuments { return 0, err } diff --git a/mongox/database/database.go b/mongox/database/database.go index 14019b0..a6c6eee 100644 --- a/mongox/database/database.go +++ b/mongox/database/database.go @@ -4,10 +4,15 @@ import ( "context" "fmt" "reflect" + "strconv" + "strings" + "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" "github.com/mainnika/mongox-go-driver/v2/mongox" + "github.com/mainnika/mongox-go-driver/v2/mongox/query" ) // Database handler @@ -80,3 +85,150 @@ func (d *Database) GetCollectionOf(document interface{}) mongox.MongoCollection panic(fmt.Errorf("document %v does not have a collection tag", document)) } + +func (d *Database) createSimpleLoad(target interface{}, composed *query.Query) (cursor *mongo.Cursor, err error) { + + collection := d.GetCollectionOf(target) + opts := options.Find() + + opts.Sort = composed.Sorter() + opts.Limit = composed.Limiter() + opts.Skip = composed.Skipper() + + return collection.Find(d.Context(), composed.M(), opts) +} + +func (d *Database) createAggregateLoad(target interface{}, composed *query.Query) (cursor *mongo.Cursor, err error) { + + collection := d.GetCollectionOf(target) + opts := options.Aggregate() + + pipeline := primitive.A{} + + if !composed.Empty() { + pipeline = append(pipeline, primitive.M{"$match": primitive.M{"$expr": composed.M()}}) + } + if composed.Sorter() != nil { + pipeline = append(pipeline, primitive.M{"$sort": composed.Sorter()}) + } + if composed.Skipper() != nil { + pipeline = append(pipeline, primitive.M{"$skip": *composed.Skipper()}) + } + if composed.Limiter() != nil { + pipeline = append(pipeline, primitive.M{"$limit": *composed.Limiter()}) + } + + el := reflect.ValueOf(target).Elem() + elType := el.Type() + numField := elType.NumField() + _, preloads := composed.Preloader() + + for i := 0; i < numField; i++ { + + field := elType.Field(i) + tag := field.Tag + + preloadTag, ok := tag.Lookup("preload") + if !ok { + continue + } + jsonTag, ok := tag.Lookup("json") + if jsonTag == "-" { + return nil, fmt.Errorf("preload private field is impossible") + } + + jsonData := strings.SplitN(jsonTag, ",", 2) + jsonName := field.Name + if len(jsonData) > 0 { + jsonName = strings.TrimSpace(jsonData[0]) + } + + preloadData := strings.Split(preloadTag, ",") + if len(preloadData) == 0 { + continue + } + if len(preloadData) == 1 { + panic("there is no foreign field") + } + + localField := strings.TrimSpace(preloadData[0]) + if len(localField) == 0 { + localField = "_id" + } + + foreignField := strings.TrimSpace(preloadData[1]) + if len(foreignField) == 0 { + panic("there is no foreign field") + } + + preloadLimiter := 100 + preloadReversed := false + if len(preloadData) > 2 { + stringLimit := strings.TrimSpace(preloadData[2]) + intLimit := preloadLimiter + + preloadReversed = strings.HasPrefix(stringLimit, "-") + if preloadReversed { + stringLimit = stringLimit[1:] + } + + intLimit, err = strconv.Atoi(stringLimit) + if err == nil { + preloadLimiter = intLimit + } + } + + for _, preload := range preloads { + if preload != jsonName { + continue + } + + isSlice := el.Field(i).Kind() == reflect.Slice + + typ := el.Field(i).Type() + if typ.Kind() == reflect.Slice { + typ = typ.Elem() + } + if typ.Kind() != reflect.Ptr { + panic("preload field should have ptr type") + } + + lookupCollection := d.GetCollectionOf(reflect.Zero(typ).Interface()) + lookupVars := primitive.M{"selector": "$" + localField} + lookupPipeline := primitive.A{ + primitive.M{"$match": primitive.M{"$expr": primitive.M{"$eq": primitive.A{"$" + foreignField, "$$selector"}}}}, + } + + if preloadReversed { + lookupPipeline = append(lookupPipeline, primitive.M{"$sort": primitive.M{"_id": -1}}) + } + if isSlice && preloadLimiter > 0 { + lookupPipeline = append(lookupPipeline, primitive.M{"$limit": preloadLimiter}) + } else if !isSlice { + lookupPipeline = append(lookupPipeline, primitive.M{"$limit": 1}) + } + + pipeline = append(pipeline, primitive.M{ + "$lookup": primitive.M{ + "from": lookupCollection.Name(), + "let": lookupVars, + "pipeline": lookupPipeline, + "as": jsonName, + }, + }) + + if isSlice { + continue + } + + pipeline = append(pipeline, primitive.M{ + "$unwind": primitive.M{ + "preserveNullAndEmptyArrays": true, + "path": "$" + jsonName, + }, + }) + } + } + + return collection.Aggregate(d.Context(), pipeline, opts) +} diff --git a/mongox/common/deletearray.go b/mongox/database/deletearray.go similarity index 81% rename from mongox/common/deletearray.go rename to mongox/database/deletearray.go index 2102409..86dc23b 100644 --- a/mongox/common/deletearray.go +++ b/mongox/database/deletearray.go @@ -1,4 +1,4 @@ -package common +package database import ( "fmt" @@ -7,12 +7,11 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo/options" - "github.com/mainnika/mongox-go-driver/v2/mongox" "github.com/mainnika/mongox-go-driver/v2/mongox/base" ) // DeleteArray removes documents list from a database by their ids -func DeleteArray(db mongox.Database, target interface{}) error { +func (d *Database) DeleteArray(target interface{}) error { targetV := reflect.ValueOf(target) targetT := targetV.Type() @@ -35,7 +34,7 @@ func DeleteArray(db mongox.Database, target interface{}) error { zeroElem := reflect.Zero(targetSliceElemT) targetLen := targetSliceV.Len() - collection := db.GetCollectionOf(zeroElem.Interface()) + collection := d.GetCollectionOf(zeroElem.Interface()) opts := options.Delete() ids := primitive.A{} @@ -48,7 +47,7 @@ func DeleteArray(db mongox.Database, target interface{}) error { return fmt.Errorf("can't delete zero elements") } - result, err := collection.DeleteMany(db.Context(), primitive.M{"_id": primitive.M{"$in": ids}}, opts) + result, err := collection.DeleteMany(d.Context(), primitive.M{"_id": primitive.M{"$in": ids}}, opts) if err != nil { return fmt.Errorf("can't create find and delete result: %w", err) } diff --git a/mongox/common/deleteone.go b/mongox/database/deleteone.go similarity index 78% rename from mongox/common/deleteone.go rename to mongox/database/deleteone.go index 280b2ed..f24e5f2 100644 --- a/mongox/common/deleteone.go +++ b/mongox/database/deleteone.go @@ -1,4 +1,4 @@ -package common +package database import ( "fmt" @@ -8,15 +8,14 @@ import ( "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" - "github.com/mainnika/mongox-go-driver/v2/mongox" "github.com/mainnika/mongox-go-driver/v2/mongox/base" "github.com/mainnika/mongox-go-driver/v2/mongox/query" ) // DeleteOne removes a document from a database and then returns it into target -func DeleteOne(db mongox.Database, target interface{}, filters ...interface{}) error { +func (d *Database) DeleteOne(target interface{}, filters ...interface{}) error { - collection := db.GetCollectionOf(target) + collection := d.GetCollectionOf(target) opts := &options.FindOneAndDeleteOptions{} composed := query.Compose(filters...) protected := base.GetProtection(target) @@ -33,7 +32,7 @@ func DeleteOne(db mongox.Database, target interface{}, filters ...interface{}) e protected.V = time.Now().Unix() } - result := collection.FindOneAndDelete(db.Context(), composed.M(), opts) + result := collection.FindOneAndDelete(d.Context(), composed.M(), opts) if result.Err() != nil { return fmt.Errorf("can't create find one and delete result: %w", result.Err()) } diff --git a/mongox/common/loadarray.go b/mongox/database/loadarray.go similarity index 80% rename from mongox/common/loadarray.go rename to mongox/database/loadarray.go index 30c9c42..84dc393 100644 --- a/mongox/common/loadarray.go +++ b/mongox/database/loadarray.go @@ -1,4 +1,4 @@ -package common +package database import ( "fmt" @@ -6,13 +6,12 @@ import ( "go.mongodb.org/mongo-driver/mongo" - "github.com/mainnika/mongox-go-driver/v2/mongox" "github.com/mainnika/mongox-go-driver/v2/mongox/base" "github.com/mainnika/mongox-go-driver/v2/mongox/query" ) // LoadArray loads an array of documents from the database by query -func LoadArray(db mongox.Database, target interface{}, filters ...interface{}) error { +func (d *Database) LoadArray(target interface{}, filters ...interface{}) error { targetV := reflect.ValueOf(target) targetT := targetV.Type() @@ -41,18 +40,18 @@ func LoadArray(db mongox.Database, target interface{}, filters ...interface{}) e var err error if hasPreloader { - result, err = createAggregateLoad(db, zeroElem.Interface(), composed) + result, err = d.createAggregateLoad(zeroElem.Interface(), composed) } else { - result, err = createSimpleLoad(db, zeroElem.Interface(), composed) + result, err = d.createSimpleLoad(zeroElem.Interface(), composed) } if err != nil { return fmt.Errorf("can't create find result: %w", err) } - defer result.Close(db.Context()) + defer result.Close(d.Context()) var i int - for i = 0; result.Next(db.Context()); { + for i = 0; result.Next(d.Context()); { if targetSliceV.Len() == i { elem := reflect.New(targetSliceElemT.Elem()) if err = result.Decode(elem.Interface()); err == nil { diff --git a/mongox/common/loadone.go b/mongox/database/loadone.go similarity index 68% rename from mongox/common/loadone.go rename to mongox/database/loadone.go index b8ecd56..7b6a167 100644 --- a/mongox/common/loadone.go +++ b/mongox/database/loadone.go @@ -1,17 +1,16 @@ -package common +package database import ( "fmt" "go.mongodb.org/mongo-driver/mongo" - "github.com/mainnika/mongox-go-driver/v2/mongox" "github.com/mainnika/mongox-go-driver/v2/mongox/base" "github.com/mainnika/mongox-go-driver/v2/mongox/query" ) // LoadOne function loads a first single target document by a query -func LoadOne(db mongox.Database, target interface{}, filters ...interface{}) error { +func (d *Database) LoadOne(target interface{}, filters ...interface{}) error { composed := query.Compose(append(filters, query.Limit(1))...) hasPreloader, _ := composed.Preloader() @@ -20,15 +19,15 @@ func LoadOne(db mongox.Database, target interface{}, filters ...interface{}) err var err error if hasPreloader { - result, err = createAggregateLoad(db, target, composed) + result, err = d.createAggregateLoad(target, composed) } else { - result, err = createSimpleLoad(db, target, composed) + result, err = d.createSimpleLoad(target, composed) } if err != nil { return fmt.Errorf("can't create find result: %w", err) } - hasNext := result.Next(db.Context()) + hasNext := result.Next(d.Context()) if result.Err() != nil { return err } diff --git a/mongox/database/loadstream.go b/mongox/database/loadstream.go new file mode 100644 index 0000000..181a264 --- /dev/null +++ b/mongox/database/loadstream.go @@ -0,0 +1,33 @@ +package database + +import ( + "fmt" + + "go.mongodb.org/mongo-driver/mongo" + + "github.com/mainnika/mongox-go-driver/v2/mongox" + "github.com/mainnika/mongox-go-driver/v2/mongox/query" +) + +// LoadStream function loads documents one by one into a target channel +func (d *Database) LoadStream(target interface{}, filters ...interface{}) (mongox.StreamLoader, error) { + + var cursor *mongo.Cursor + var err error + + composed := query.Compose(filters...) + hasPreloader, _ := composed.Preloader() + + if hasPreloader { + cursor, err = d.createAggregateLoad(target, composed) + } else { + cursor, err = d.createSimpleLoad(target, composed) + } + if err != nil { + return nil, fmt.Errorf("can't create find result: %w", err) + } + + l := &StreamLoader{Cursor: cursor, ctx: d.Context(), target: target} + + return l, nil +} diff --git a/mongox/common/saveone.go b/mongox/database/saveone.go similarity index 75% rename from mongox/common/saveone.go rename to mongox/database/saveone.go index eb6d221..e0d660b 100644 --- a/mongox/common/saveone.go +++ b/mongox/database/saveone.go @@ -1,4 +1,4 @@ -package common +package database import ( "time" @@ -7,15 +7,14 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo/options" - "github.com/mainnika/mongox-go-driver/v2/mongox" "github.com/mainnika/mongox-go-driver/v2/mongox/base" "github.com/mainnika/mongox-go-driver/v2/mongox/query" ) // SaveOne saves a single source document to the database -func SaveOne(db mongox.Database, source interface{}) error { +func (d *Database) SaveOne(source interface{}) error { - collection := db.GetCollectionOf(source) + collection := d.GetCollectionOf(source) opts := options.FindOneAndReplace() id := base.GetID(source) protected := base.GetProtection(source) @@ -30,7 +29,7 @@ func SaveOne(db mongox.Database, source interface{}) error { protected.V = time.Now().Unix() } - result := collection.FindOneAndReplace(db.Context(), composed.M(), source, opts) + result := collection.FindOneAndReplace(d.Context(), composed.M(), source, opts) if result.Err() != nil { return result.Err() } diff --git a/mongox/common/loadstream.go b/mongox/database/streamloader.go similarity index 57% rename from mongox/common/loadstream.go rename to mongox/database/streamloader.go index 5cf2aaa..895709c 100644 --- a/mongox/common/loadstream.go +++ b/mongox/database/streamloader.go @@ -1,4 +1,4 @@ -package common +package database import ( "context" @@ -6,9 +6,7 @@ import ( "go.mongodb.org/mongo-driver/mongo" - "github.com/mainnika/mongox-go-driver/v2/mongox" "github.com/mainnika/mongox-go-driver/v2/mongox/base" - "github.com/mainnika/mongox-go-driver/v2/mongox/query" ) // StreamLoader is a controller for a database cursor @@ -32,7 +30,19 @@ func (l *StreamLoader) DecodeNext() error { base.Reset(l.target) - err := l.Decode(l.target) + err := l.Cursor.Decode(l.target) + if err != nil { + return fmt.Errorf("can't decode desult: %w", err) + } + + return nil +} + +func (l *StreamLoader) Decode() error { + + base.Reset(l.target) + + err := l.Cursor.Decode(l.target) if err != nil { return fmt.Errorf("can't decode desult: %w", err) } @@ -60,26 +70,3 @@ func (l *StreamLoader) Close() error { return l.Cursor.Close(l.ctx) } - -// LoadStream function loads documents one by one into a target channel -func LoadStream(db mongox.Database, target interface{}, filters ...interface{}) (*StreamLoader, error) { - - var cursor *mongo.Cursor - var err error - - composed := query.Compose(filters...) - hasPreloader, _ := composed.Preloader() - - if hasPreloader { - cursor, err = createAggregateLoad(db, target, composed) - } else { - cursor, err = createSimpleLoad(db, target, composed) - } - if err != nil { - return nil, fmt.Errorf("can't create find result: %w", err) - } - - l := &StreamLoader{Cursor: cursor, ctx: db.Context(), target: target} - - return l, nil -} diff --git a/mongox/mongox.go b/mongox/mongox.go index 422002d..04ce522 100644 --- a/mongox/mongox.go +++ b/mongox/mongox.go @@ -16,6 +16,22 @@ type Database interface { Name() string New(ctx context.Context) Database GetCollectionOf(document interface{}) MongoCollection + Count(target interface{}, filters ...interface{}) (int64, error) + DeleteArray(target interface{}) error + DeleteOne(target interface{}, filters ...interface{}) error + LoadArray(target interface{}, filters ...interface{}) error + LoadOne(target interface{}, filters ...interface{}) error + LoadStream(target interface{}, filters ...interface{}) (StreamLoader, error) + SaveOne(source interface{}) error +} + +// StreamLoader is a interface to control database cursor +type StreamLoader interface { + DecodeNext() error + Decode() error + Next() error + Close() error + Err() error } // MongoClient is the mongo client interface