Prepare for v3

This commit is contained in:
2023-06-10 00:44:20 +02:00
parent 7c3e50e783
commit 23029ae710
42 changed files with 880 additions and 719 deletions
+23
View File
@@ -0,0 +1,23 @@
package database
import (
"context"
)
type ctxDatabaseKey struct{}
// GetFromContext function extracts the request data from context
func GetFromContext(ctx context.Context) (q *Database, ok bool) {
q, ok = ctx.Value(ctxDatabaseKey{}).(*Database)
if !ok {
return nil, false
}
return q, true
}
// WithContext creates the new context with a database attached
func WithContext(ctx context.Context, q *Database) (withQuery context.Context) {
db := NewDatabase(ctx, q.Client(), q.Name())
return context.WithValue(ctx, ctxDatabaseKey{}, db)
}
+13 -6
View File
@@ -9,22 +9,29 @@ import (
// Count function counts documents in the database by query
// target is used only to get collection by tag so it'd be better to use nil ptr here
func (d *Database) Count(target interface{}, filters ...interface{}) (result int64, err error) {
composed, err := query.Compose(filters...)
if err != nil {
return
return -1, err
}
collection := d.GetCollectionOf(target)
collection, err := d.GetCollectionOf(target)
if err != nil {
return -1, err
}
ctx := query.WithContext(d.Context(), composed)
m := composed.M()
opts := options.Count()
opts.Limit = composed.Limiter()
opts.Skip = composed.Skipper()
result, err = collection.CountDocuments(ctx, composed.M(), opts)
defer func() { _ = composed.OnClose().Invoke(ctx, target) }()
_ = composed.OnClose().Invoke(ctx, target)
result, err = collection.CountDocuments(ctx, m, opts)
if err != nil {
return -1, err
}
return
return result, nil
}
+188
View File
@@ -0,0 +1,188 @@
package database
import (
"fmt"
"reflect"
"strconv"
"strings"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/mainnika/mongox-go-driver/v2/mongox"
"github.com/mainnika/mongox-go-driver/v2/mongox/query"
)
func (d *Database) createCursor(target interface{}, composed *query.Query) (cursor *mongox.Cursor, err error) {
_, hasPreloader := composed.Preloader()
if hasPreloader {
return d.createAggregateCursor(target, composed)
}
return d.createSimpleCursor(target, composed)
}
func (d *Database) createSimpleCursor(target interface{}, composed *query.Query) (cursor *mongox.Cursor, err error) {
collection, err := d.GetCollectionOf(target)
if err != nil {
return nil, err
}
opts := options.Find()
opts.Sort = composed.Sorter()
opts.Limit = composed.Limiter()
opts.Skip = composed.Skipper()
ctx := d.Context()
m := composed.M()
return collection.Find(ctx, m, opts)
}
func (d *Database) createAggregateCursor(target interface{}, composed *query.Query) (cursor *mongox.Cursor, err error) {
collection, err := d.GetCollectionOf(target)
if err != nil {
return nil, err
}
pipeline := primitive.A{}
if !composed.Empty() {
pipeline = append(pipeline, primitive.M{"$match": composed.M()})
}
if composed.Sorter() != nil {
pipeline = append(pipeline, primitive.M{"$sort": composed.Sorter()})
}
if composed.Skipper() != nil {
pipeline = append(pipeline, primitive.M{"$skip": *composed.Skipper()})
}
if composed.Limiter() != nil {
pipeline = append(pipeline, primitive.M{"$limit": *composed.Limiter()})
}
el := reflect.ValueOf(target)
elType := el.Type()
if elType.Kind() == reflect.Ptr {
elType = elType.Elem()
}
numField := elType.NumField()
preloads, _ := composed.Preloader()
for i := 0; i < numField; i++ {
field := elType.Field(i)
tag := field.Tag
preloadTag, ok := tag.Lookup("preload")
if !ok {
continue
}
jsonTag, _ := tag.Lookup("json")
if jsonTag == "-" {
return nil, fmt.Errorf("%w: private field is not preloadable", mongox.ErrMalformedBase)
}
jsonData := strings.SplitN(jsonTag, ",", 2)
jsonName := field.Name
if len(jsonData) > 0 {
jsonName = strings.TrimSpace(jsonData[0])
}
preloadData := strings.Split(preloadTag, ",")
if len(preloadData) == 0 {
continue
}
if len(preloadData) == 1 {
return nil, fmt.Errorf("%w: foreign field is not specified", mongox.ErrMalformedBase)
}
foreignField := strings.TrimSpace(preloadData[1])
if len(foreignField) == 0 {
return nil, fmt.Errorf("%w: foreign field is empty", mongox.ErrMalformedBase)
}
localField := strings.TrimSpace(preloadData[0])
if len(localField) == 0 {
localField = "_id"
}
preloadLimiter := 100
preloadReversed := false
if len(preloadData) > 2 {
stringLimit := strings.TrimSpace(preloadData[2])
intLimit := preloadLimiter
preloadReversed = strings.HasPrefix(stringLimit, "-")
if preloadReversed {
stringLimit = stringLimit[1:]
}
intLimit, err = strconv.Atoi(stringLimit)
if err == nil {
preloadLimiter = intLimit
} else {
return nil, fmt.Errorf("%w: preload limit should be an integer", mongox.ErrMalformedBase)
}
}
for _, preload := range preloads {
if preload != jsonName {
continue
}
field := elType.Field(i)
fieldType := field.Type
isSlice := fieldType.Kind() == reflect.Slice
if isSlice {
fieldType = fieldType.Elem()
}
isPtr := fieldType.Kind() != reflect.Ptr
if isPtr {
return nil, fmt.Errorf("%w: preload field should have ptr type", mongox.ErrMalformedBase)
}
lookupCollection, err := d.GetCollectionOf(reflect.Zero(fieldType).Interface())
if err != nil {
return nil, err
}
lookupVars := primitive.M{"selector": "$" + localField}
lookupPipeline := primitive.A{
primitive.M{"$match": primitive.M{"$expr": primitive.M{"$eq": primitive.A{"$" + foreignField, "$$selector"}}}},
}
if preloadReversed {
lookupPipeline = append(lookupPipeline, primitive.M{"$sort": primitive.M{"_id": -1}})
}
if isSlice && preloadLimiter > 0 {
lookupPipeline = append(lookupPipeline, primitive.M{"$limit": preloadLimiter})
} else if !isSlice {
lookupPipeline = append(lookupPipeline, primitive.M{"$limit": 1})
}
pipeline = append(pipeline, primitive.M{
"$lookup": primitive.M{
"from": lookupCollection.Name(),
"let": lookupVars,
"pipeline": lookupPipeline,
"as": jsonName,
},
})
if isSlice {
continue
}
pipeline = append(pipeline, primitive.M{
"$unwind": primitive.M{
"preserveNullAndEmptyArrays": true,
"path": "$" + jsonName,
},
})
}
}
ctx := d.Context()
opts := options.Aggregate()
return collection.Aggregate(ctx, pipeline, opts)
}
+22 -200
View File
@@ -2,34 +2,27 @@ package database
import (
"context"
"fmt"
"reflect"
"strconv"
"strings"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/mainnika/mongox-go-driver/v2/mongox"
"github.com/mainnika/mongox-go-driver/v2/mongox/query"
)
// Database handler
type Database struct {
client *mongox.Client
dbname string
name string
ctx context.Context
}
// NewDatabase function creates new database instance with mongo client and empty context
func NewDatabase(client *mongox.Client, dbname string) (db mongox.Database) {
func NewDatabase(ctx context.Context, client *mongox.Client, name string) (db mongox.Database) {
db = &Database{
client: client,
dbname: dbname,
name: name,
ctx: ctx,
}
return
return db
}
// Client function returns a mongo client
@@ -37,213 +30,42 @@ func (d *Database) Client() (client *mongox.Client) {
return d.client
}
// Name function returns a database name
func (d *Database) Name() (name string) {
return d.name
}
// Context function returns a context
func (d *Database) Context() (ctx context.Context) {
ctx = d.ctx
if ctx == nil {
ctx = context.Background()
}
return
}
// Name function returns a database name
func (d *Database) Name() (name string) {
return d.dbname
}
// New function creates new database context with same client
func (d *Database) New(ctx context.Context) (db mongox.Database) {
if ctx == nil {
ctx = context.Background()
}
db = &Database{
client: d.client,
dbname: d.dbname,
ctx: ctx,
}
return
return ctx
}
// GetCollectionOf returns the collection object by the «collection» tag of the given document;
// the «collection» tag should exists, e.g.:
// type Foobar struct {
// base.ObjectID `bson:",inline" json:",inline" collection:"foobars"`
// ...
// Will panic if there is no «collection» tag
func (d *Database) GetCollectionOf(document interface{}) (collection *mongox.Collection) {
//
// example:
// type Foobar struct {
// base.ObjectID `bson:",inline" json:",inline" collection:"foobars"`
// ...
func (d *Database) GetCollectionOf(document interface{}) (collection *mongox.Collection, err error) {
el := reflect.TypeOf(document).Elem()
numField := el.NumField()
databaseName := d.name
for i := 0; i < numField; i++ {
field := el.Field(i)
tag := field.Tag
found, ok := tag.Lookup("collection")
if !ok {
collectionName, found := tag.Lookup("collection")
if !found {
continue
}
return d.client.Database(d.dbname).Collection(found)
return d.client.Database(databaseName).Collection(collectionName), nil
}
panic(fmt.Errorf("document %v does not have a collection tag", document))
}
func (d *Database) createSimpleLoad(target interface{}, composed *query.Query) (cursor *mongox.Cursor, err error) {
collection := d.GetCollectionOf(target)
opts := options.Find()
opts.Sort = composed.Sorter()
opts.Limit = composed.Limiter()
opts.Skip = composed.Skipper()
return collection.Find(d.Context(), composed.M(), opts)
}
func (d *Database) createAggregateLoad(target interface{}, composed *query.Query) (cursor *mongox.Cursor, err error) {
collection := d.GetCollectionOf(target)
opts := options.Aggregate()
pipeline := primitive.A{}
if !composed.Empty() {
pipeline = append(pipeline, primitive.M{"$match": composed.M()})
}
if composed.Sorter() != nil {
pipeline = append(pipeline, primitive.M{"$sort": composed.Sorter()})
}
if composed.Skipper() != nil {
pipeline = append(pipeline, primitive.M{"$skip": *composed.Skipper()})
}
if composed.Limiter() != nil {
pipeline = append(pipeline, primitive.M{"$limit": *composed.Limiter()})
}
el := reflect.ValueOf(target)
elType := el.Type()
if elType.Kind() == reflect.Ptr {
elType = elType.Elem()
}
numField := elType.NumField()
preloads, _ := composed.Preloader()
for i := 0; i < numField; i++ {
field := elType.Field(i)
tag := field.Tag
preloadTag, ok := tag.Lookup("preload")
if !ok {
continue
}
jsonTag, _ := tag.Lookup("json")
if jsonTag == "-" {
panic(fmt.Errorf("preload private field is impossible"))
}
jsonData := strings.SplitN(jsonTag, ",", 2)
jsonName := field.Name
if len(jsonData) > 0 {
jsonName = strings.TrimSpace(jsonData[0])
}
preloadData := strings.Split(preloadTag, ",")
if len(preloadData) == 0 {
continue
}
if len(preloadData) == 1 {
panic(fmt.Errorf("there is no foreign field"))
}
localField := strings.TrimSpace(preloadData[0])
if len(localField) == 0 {
localField = "_id"
}
foreignField := strings.TrimSpace(preloadData[1])
if len(foreignField) == 0 {
panic(fmt.Errorf("there is no foreign field"))
}
preloadLimiter := 100
preloadReversed := false
if len(preloadData) > 2 {
stringLimit := strings.TrimSpace(preloadData[2])
intLimit := preloadLimiter
preloadReversed = strings.HasPrefix(stringLimit, "-")
if preloadReversed {
stringLimit = stringLimit[1:]
}
intLimit, err = strconv.Atoi(stringLimit)
if err == nil {
preloadLimiter = intLimit
}
}
for _, preload := range preloads {
if preload != jsonName {
continue
}
field := elType.Field(i)
fieldType := field.Type
isSlice := fieldType.Kind() == reflect.Slice
if isSlice {
fieldType = fieldType.Elem()
}
isPtr := fieldType.Kind() != reflect.Ptr
if isPtr {
panic(fmt.Errorf("preload field should have ptr type"))
}
lookupCollection := d.GetCollectionOf(reflect.Zero(fieldType).Interface())
lookupVars := primitive.M{"selector": "$" + localField}
lookupPipeline := primitive.A{
primitive.M{"$match": primitive.M{"$expr": primitive.M{"$eq": primitive.A{"$" + foreignField, "$$selector"}}}},
}
if preloadReversed {
lookupPipeline = append(lookupPipeline, primitive.M{"$sort": primitive.M{"_id": -1}})
}
if isSlice && preloadLimiter > 0 {
lookupPipeline = append(lookupPipeline, primitive.M{"$limit": preloadLimiter})
} else if !isSlice {
lookupPipeline = append(lookupPipeline, primitive.M{"$limit": 1})
}
pipeline = append(pipeline, primitive.M{
"$lookup": primitive.M{
"from": lookupCollection.Name(),
"let": lookupVars,
"pipeline": lookupPipeline,
"as": jsonName,
},
})
if isSlice {
continue
}
pipeline = append(pipeline, primitive.M{
"$unwind": primitive.M{
"preserveNullAndEmptyArrays": true,
"path": "$" + jsonName,
},
})
}
}
return collection.Aggregate(d.Context(), pipeline, opts)
return nil, mongox.ErrNoCollection
}
+25 -25
View File
@@ -13,7 +13,6 @@ import (
// DeleteArray removes documents list from a database by their ids
func (d *Database) DeleteArray(target interface{}, filters ...interface{}) (err error) {
targetV := reflect.ValueOf(target)
targetT := targetV.Type()
@@ -36,42 +35,43 @@ func (d *Database) DeleteArray(target interface{}, filters ...interface{}) (err
zeroElem := reflect.Zero(targetSliceElemT)
targetLen := targetSliceV.Len()
collection, err := d.GetCollectionOf(zeroElem.Interface())
if err != nil {
return err
}
composed, err := query.Compose(filters...)
if err != nil {
return
return err
}
collection := d.GetCollectionOf(zeroElem.Interface())
ids := primitive.A{}
ctx := query.WithContext(d.Context(), composed)
if targetLen > 0 {
var ids primitive.A
for i := 0; i < targetLen; i++ {
elem := targetSliceV.Index(i)
elemID, err := base.GetID(elem.Interface())
if err != nil {
return err
}
for i := 0; i < targetLen; i++ {
elem := targetSliceV.Index(i)
ids = append(ids, base.GetID(elem.Interface()))
}
defer func() {
invokerr := composed.OnClose().Invoke(ctx, target)
if err == nil {
err = invokerr
ids = append(ids, elemID)
}
return
}()
if len(ids) == 0 {
return fmt.Errorf("can't delete zero elements")
composed.And(primitive.M{"_id": primitive.M{"$in": ids}})
}
composed.And(primitive.M{"_id": primitive.M{"$in": ids}})
ctx := query.WithContext(d.Context(), composed)
m := composed.M()
opts := options.Delete()
result, err := collection.DeleteMany(ctx, composed.M(), options.Delete())
defer func() { _ = composed.OnClose().Invoke(ctx, target) }()
result, err := collection.DeleteMany(ctx, m, opts)
if err != nil {
return
return fmt.Errorf("while deleting array: %w", err)
}
if result.DeletedCount != int64(targetLen) {
err = fmt.Errorf("can't verify delete result: removed count mismatch %d != %d", result.DeletedCount, targetLen)
return fmt.Errorf("deleted count mismatch %d != %d", result.DeletedCount, targetLen)
}
return
return nil
}
+27 -24
View File
@@ -2,6 +2,7 @@ package database
import (
"fmt"
"github.com/mainnika/mongox-go-driver/v2/mongox/base/protection"
"github.com/modern-go/reflect2"
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -13,51 +14,53 @@ import (
// DeleteOne removes a document from a database and then returns it into target
func (d *Database) DeleteOne(target interface{}, filters ...interface{}) (err error) {
composed, err := query.Compose(filters...)
if err != nil {
return
return err
}
collection := d.GetCollectionOf(target)
protected := base.GetProtection(target)
ctx := query.WithContext(d.Context(), composed)
opts := options.FindOneAndDelete()
opts.Sort = composed.Sorter()
collection, err := d.GetCollectionOf(target)
if err != nil {
return err
}
if !reflect2.IsNil(target) {
composed.And(primitive.M{"_id": base.GetID(target)})
targetID, err := base.GetID(target)
if err != nil {
return err
}
composed.And(primitive.M{"_id": targetID})
}
protected := protection.Get(target)
if protected != nil {
query.Push(composed, protected)
_, err := query.Push(composed, protected)
if err != nil {
return err
}
protected.Restate()
}
defer func() {
invokerr := composed.OnClose().Invoke(ctx, target)
if err == nil {
err = invokerr
}
ctx := query.WithContext(d.Context(), composed)
m := composed.M()
opts := options.FindOneAndDelete()
opts.Sort = composed.Sorter()
return
}()
defer func() { _ = composed.OnClose().Invoke(ctx, target) }()
result := collection.FindOneAndDelete(ctx, composed.M(), opts)
result := collection.FindOneAndDelete(ctx, m, opts)
if result.Err() != nil {
return fmt.Errorf("can't create find one and delete result: %w", result.Err())
}
err = result.Decode(target)
if err != nil {
return
return fmt.Errorf("can't decode find one and delete result: %w", err)
}
err = composed.OnDecode().Invoke(ctx, target)
if err != nil {
return
}
_ = composed.OnDecode().Invoke(ctx, target)
return
return nil
}
+13 -10
View File
@@ -14,18 +14,21 @@ import (
)
// IndexEnsure function ensures index in mongo collection of document
// `index:""` -- https://docs.mongodb.com/manual/indexes/#create-an-index
// `index:"-"` -- (descending)
// `index:"-,+foo,+-bar"` -- https://docs.mongodb.com/manual/core/index-compound
// `index:"-,+foo,+-bar,unique"` -- https://docs.mongodb.com/manual/core/index-unique
// `index:"-,+foo,+-bar,unique,allowNull"` -- https://docs.mongodb.com/manual/core/index-partial
// `index:"-,unique,allowNull,expireAfter=86400"` -- https://docs.mongodb.com/manual/core/index-ttl
// `index:"-,unique,allowNull,expireAfter={{.Expire}}"` -- evaluate index as a golang template with `cfg` arguments
//
// `index:""` -- https://docs.mongodb.com/manual/indexes/#create-an-index
// `index:"-"` -- (descending)
// `index:"-,+foo,+-bar"` -- https://docs.mongodb.com/manual/core/index-compound
// `index:"-,+foo,+-bar,unique"` -- https://docs.mongodb.com/manual/core/index-unique
// `index:"-,+foo,+-bar,unique,allowNull"` -- https://docs.mongodb.com/manual/core/index-partial
// `index:"-,unique,allowNull,expireAfter=86400"` -- https://docs.mongodb.com/manual/core/index-ttl
// `index:"-,unique,allowNull,expireAfter={{.Expire}}"` -- evaluate index as a golang template with `cfg` arguments
func (d *Database) IndexEnsure(cfg interface{}, document interface{}) (err error) {
el := reflect.ValueOf(document).Elem().Type()
numField := el.NumField()
documents := d.GetCollectionOf(document)
collection, err := d.GetCollectionOf(document)
if err != nil {
return err
}
for i := 0; i < numField; i++ {
@@ -126,7 +129,7 @@ func (d *Database) IndexEnsure(cfg interface{}, document interface{}) (err error
}
}
_, err = documents.Indexes().CreateOne(d.Context(), mongo.IndexModel{Keys: index, Options: opts})
_, err = collection.Indexes().CreateOne(d.Context(), mongo.IndexModel{Keys: index, Options: opts})
if err != nil {
return
}
+5 -1
View File
@@ -1,6 +1,7 @@
package database_test
import (
"github.com/stretchr/testify/require"
"testing"
"github.com/stretchr/testify/assert"
@@ -142,7 +143,10 @@ func TestDatabase_Ensure(t *testing.T) {
err = db.IndexEnsure(tt.settings, tt.doc)
assert.NoError(t, err)
indexes, _ := db.GetCollectionOf(tt.doc).Indexes().List(db.Context())
collection, err := db.GetCollectionOf(tt.doc)
require.NoError(t, err)
indexes, _ := collection.Indexes().List(db.Context())
index := new(map[string]interface{})
indexes.Next(db.Context()) // skip _id_
+21 -50
View File
@@ -4,14 +4,12 @@ import (
"fmt"
"reflect"
"github.com/mainnika/mongox-go-driver/v2/mongox"
"github.com/mainnika/mongox-go-driver/v2/mongox/base"
"github.com/mainnika/mongox-go-driver/v2/mongox/query"
)
// LoadArray loads an array of documents from the database by query
func (d *Database) LoadArray(target interface{}, filters ...interface{}) (err error) {
targetV := reflect.ValueOf(target)
targetT := targetV.Type()
@@ -31,61 +29,36 @@ func (d *Database) LoadArray(target interface{}, filters ...interface{}) (err er
panic(fmt.Errorf("target slice should contain ptrs"))
}
zeroElem := reflect.Zero(targetSliceElemT)
composed, err := query.Compose(filters...)
if err != nil {
return
return err
}
zeroElem := reflect.Zero(targetSliceElemT)
_, hasPreloader := composed.Preloader()
ctx := query.WithContext(d.Context(), composed)
var result *mongox.Cursor
var i int
defer func() { _ = composed.OnClose().Invoke(ctx, target) }()
defer func() {
if result != nil {
closerr := result.Close(ctx)
if err == nil {
err = closerr
}
}
invokerr := composed.OnClose().Invoke(ctx, target)
if err == nil {
err = invokerr
}
return
}()
if hasPreloader {
result, err = d.createAggregateLoad(zeroElem.Interface(), composed)
} else {
result, err = d.createSimpleLoad(zeroElem.Interface(), composed)
}
cur, err := d.createCursor(zeroElem.Interface(), composed)
if err != nil {
err = fmt.Errorf("can't create find result: %w", err)
return
return fmt.Errorf("can't create find result: %w", err)
}
for i = 0; result.Next(ctx); i++ {
defer func() { _ = cur.Close(ctx) }()
var i int
for i = 0; cur.Next(ctx); i++ {
var elem interface{}
if i == targetSliceV.Len() {
value := reflect.New(targetSliceElemT.Elem())
elem = value.Interface()
err = composed.OnCreate().Invoke(ctx, elem)
if err != nil {
return
}
_ = composed.OnCreate().Invoke(ctx, elem)
err = result.Decode(elem)
err = cur.Decode(elem)
if err != nil {
return
return err
}
targetSliceV = reflect.Append(targetSliceV, value)
@@ -93,26 +66,24 @@ func (d *Database) LoadArray(target interface{}, filters ...interface{}) (err er
elem = targetSliceV.Index(i).Interface()
if created := base.Reset(elem); created {
err = composed.OnCreate().Invoke(ctx, elem)
}
if err != nil {
return
_ = composed.OnCreate().Invoke(ctx, elem)
}
err = result.Decode(elem)
err = cur.Decode(elem)
if err != nil {
return
return err
}
}
err = composed.OnDecode().Invoke(ctx, elem)
if err != nil {
return
}
_ = composed.OnDecode().Invoke(ctx, elem)
}
err = cur.Err()
if err != nil {
return err
}
targetSliceV = targetSliceV.Slice(0, i)
targetV.Elem().Set(targetSliceV)
return
return nil
}
+13 -44
View File
@@ -1,8 +1,6 @@
package database
import (
"fmt"
"github.com/mainnika/mongox-go-driver/v2/mongox"
"github.com/mainnika/mongox-go-driver/v2/mongox/base"
"github.com/mainnika/mongox-go-driver/v2/mongox/query"
@@ -10,68 +8,39 @@ import (
// LoadOne function loads a first single target document by a query
func (d *Database) LoadOne(target interface{}, filters ...interface{}) (err error) {
composed, err := query.Compose(append(filters, query.Limit(1))...)
if err != nil {
return
return err
}
_, hasPreloader := composed.Preloader()
ctx := query.WithContext(d.Context(), composed)
var result *mongox.Cursor
defer func() { _ = composed.OnClose().Invoke(ctx, target) }()
defer func() {
if result != nil {
closerr := result.Close(ctx)
if err == nil {
err = closerr
}
}
invokerr := composed.OnClose().Invoke(ctx, target)
if err == nil {
err = invokerr
}
return
}()
if hasPreloader {
result, err = d.createAggregateLoad(target, composed)
} else {
result, err = d.createSimpleLoad(target, composed)
}
cur, err := d.createCursor(target, composed)
if err != nil {
return fmt.Errorf("can't create find result: %w", err)
return err
}
defer func() { _ = cur.Close(ctx) }()
hasNext := result.Next(ctx)
if result.Err() != nil {
err = result.Err()
return
hasNext := cur.Next(ctx)
if cur.Err() != nil {
return cur.Err()
}
if !hasNext {
return mongox.ErrNoDocuments
}
if created := base.Reset(target); created {
err = composed.OnCreate().Invoke(ctx, target)
}
if err != nil {
return
_ = composed.OnCreate().Invoke(ctx, target)
}
err = result.Decode(target)
err = cur.Decode(target)
if err != nil {
return
return err
}
err = composed.OnDecode().Invoke(ctx, target)
if err != nil {
return
}
_ = composed.OnDecode().Invoke(ctx, target)
return
return nil
}
+4 -15
View File
@@ -1,36 +1,25 @@
package database
import (
"fmt"
"github.com/mainnika/mongox-go-driver/v2/mongox"
"github.com/mainnika/mongox-go-driver/v2/mongox/query"
)
// LoadStream function loads documents one by one into a target channel
func (d *Database) LoadStream(target interface{}, filters ...interface{}) (loader mongox.StreamLoader, err error) {
composed, err := query.Compose(filters...)
if err != nil {
return
}
_, hasPreloader := composed.Preloader()
ctx := query.WithContext(d.Context(), composed)
var cursor *mongox.Cursor
if hasPreloader {
cursor, err = d.createAggregateLoad(target, composed)
} else {
cursor, err = d.createSimpleLoad(target, composed)
}
cur, err := d.createCursor(target, composed)
if err != nil {
err = fmt.Errorf("can't create find result: %w", err)
return
return nil, err
}
loader = &StreamLoader{cur: cursor, ctx: ctx, query: composed}
loader = &StreamLoader{cur: cur, ctx: ctx, query: composed}
return
return loader, nil
}
+18 -18
View File
@@ -1,6 +1,7 @@
package database
import (
"github.com/mainnika/mongox-go-driver/v2/mongox/base/protection"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo/options"
@@ -10,38 +11,37 @@ import (
// SaveOne saves a single source document to the database
func (d *Database) SaveOne(source interface{}, filters ...interface{}) (err error) {
collection, err := d.GetCollectionOf(source)
if err != nil {
return err
}
composed, err := query.Compose(filters...)
if err != nil {
return
return err
}
collection := d.GetCollectionOf(source)
id := base.GetID(source)
protected := base.GetProtection(source)
ctx := query.WithContext(d.Context(), composed)
id, err := base.GetID(source)
if err != nil {
return err
}
composed.And(primitive.M{"_id": id})
opts := options.FindOneAndReplace()
opts.SetUpsert(true)
opts.SetReturnDocument(options.After)
protected := protection.Get(source)
if protected != nil {
query.Push(composed, protected)
protected.Restate()
}
defer func() {
invokerr := composed.OnClose().Invoke(ctx, source)
if err == nil {
err = invokerr
}
ctx := query.WithContext(d.Context(), composed)
m := composed.M()
opts := options.FindOneAndReplace()
opts.SetUpsert(true)
opts.SetReturnDocument(options.After)
return
}()
defer func() { _ = composed.OnClose().Invoke(ctx, source) }()
result := collection.FindOneAndReplace(ctx, composed.M(), source, opts)
result := collection.FindOneAndReplace(ctx, m, source, opts)
if result.Err() != nil {
return result.Err()
}
+17 -27
View File
@@ -17,7 +17,6 @@ type StreamLoader struct {
// DecodeNextMsg decodes the next document to an interface or returns an error
func (l *StreamLoader) DecodeNextMsg(i interface{}) (err error) {
err = l.Next()
if err != nil {
return
@@ -33,41 +32,35 @@ func (l *StreamLoader) DecodeNextMsg(i interface{}) (err error) {
// DecodeMsg decodes the current cursor document into an interface
func (l *StreamLoader) DecodeMsg(i interface{}) (err error) {
if created := base.Reset(i); created {
err = l.query.OnDecode().Invoke(l.ctx, i)
_ = l.query.OnDecode().Invoke(l.ctx, i)
}
if err != nil {
return
return err
}
err = l.cur.Decode(i)
if err != nil {
return
return err
}
err = l.query.OnDecode().Invoke(l.ctx, i)
if err != nil {
return
}
_ = l.query.OnDecode().Invoke(l.ctx, i)
return
return nil
}
// Next loads next documents but doesn't perform decoding
func (l *StreamLoader) Next() (err error) {
hasNext := l.cur.Next(l.ctx)
err = l.cur.Err()
if err != nil {
return
return err
}
if !hasNext {
err = mongox.ErrNoDocuments
return mongox.ErrNoDocuments
}
return
return nil
}
// Cursor returns the underlying cursor
@@ -77,24 +70,21 @@ func (l *StreamLoader) Cursor() (cursor *mongox.Cursor) {
// Close stream loader and the underlying cursor
func (l *StreamLoader) Close() (err error) {
defer func() { _ = l.query.OnClose().Invoke(l.ctx, nil) }()
closerr := l.cur.Close(l.ctx)
invokerr := l.query.OnClose().Invoke(l.ctx, nil)
if closerr != nil {
err = closerr
return
err = l.cur.Close(l.ctx)
if err != nil {
return err
}
if invokerr != nil {
err = invokerr
return
}
return
return nil
}
// Err returns the last error
func (l *StreamLoader) Err() (err error) {
return l.cur.Err()
}
func (l *StreamLoader) Context() (ctx context.Context) {
return l.ctx
}
+22 -30
View File
@@ -1,72 +1,64 @@
package database
import (
"github.com/mainnika/mongox-go-driver/v2/mongox/base/protection"
"github.com/modern-go/reflect2"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/mainnika/mongox-go-driver/v2/mongox/base"
"github.com/mainnika/mongox-go-driver/v2/mongox/query"
)
// UpdateOne updates a single document in the database and loads it into target
func (d *Database) UpdateOne(target interface{}, filters ...interface{}) (err error) {
composed, err := query.Compose(filters...)
if err != nil {
return
return err
}
updaterDoc, err := composed.Updater()
update, err := composed.Updater()
if err != nil {
return
return err
}
collection := d.GetCollectionOf(target)
protected := base.GetProtection(target)
ctx := query.WithContext(d.Context(), composed)
opts := options.FindOneAndUpdate()
opts.SetReturnDocument(options.After)
protected := protection.Get(target)
if protected != nil {
if !protected.X.IsZero() {
query.Push(composed, protected)
}
protected.Restate()
setCmd, _ := updaterDoc["$set"].(primitive.M)
setCmd, _ := update["$set"].(primitive.M)
if reflect2.IsNil(setCmd) {
setCmd = primitive.M{}
}
protected.PutToDocument(setCmd)
updaterDoc["$set"] = setCmd
protected.Inject(setCmd)
update["$set"] = setCmd
}
defer func() {
invokerr := composed.OnClose().Invoke(ctx, target)
if err == nil {
err = invokerr
}
collection, err := d.GetCollectionOf(target)
if err != nil {
return err
}
return
}()
ctx := query.WithContext(d.Context(), composed)
m := composed.M()
opts := options.FindOneAndUpdate()
opts.SetReturnDocument(options.After)
result := collection.FindOneAndUpdate(ctx, composed.M(), updaterDoc, opts)
defer func() { _ = composed.OnClose().Invoke(ctx, target) }()
result := collection.FindOneAndUpdate(ctx, m, update, opts)
if result.Err() != nil {
return result.Err()
}
err = result.Decode(target)
if err != nil {
return
return err
}
err = composed.OnDecode().Invoke(ctx, target)
if err != nil {
return
}
_ = composed.OnDecode().Invoke(ctx, target)
return
return nil
}