5 Commits

7 changed files with 66 additions and 32 deletions
+4 -2
View File
@@ -6,7 +6,7 @@ import (
) )
// Reset function creates new zero object for the target pointer // Reset function creates new zero object for the target pointer
func Reset(target interface{}) { func Reset(target interface{}) (created bool) {
type resetter interface { type resetter interface {
Reset() Reset()
@@ -15,7 +15,7 @@ func Reset(target interface{}) {
resettable, canReset := target.(resetter) resettable, canReset := target.(resetter)
if canReset { if canReset {
resettable.Reset() resettable.Reset()
return return false
} }
v := reflect.ValueOf(target) v := reflect.ValueOf(target)
@@ -27,4 +27,6 @@ func Reset(target interface{}) {
zero := reflect.Zero(t) zero := reflect.Zero(t)
v.Elem().Set(zero) v.Elem().Set(zero)
return true
} }
+24 -11
View File
@@ -70,32 +70,45 @@ func (d *Database) LoadArray(target interface{}, filters ...interface{}) (err er
return return
} }
for i = 0; result.Next(ctx); { for i = 0; result.Next(ctx); i++ {
var elem interface{} var elem interface{}
if targetSliceV.Len() == i { if i == targetSliceV.Len() {
value := reflect.New(targetSliceElemT.Elem()) value := reflect.New(targetSliceElemT.Elem())
err = result.Decode(value.Interface())
elem = value.Interface() elem = value.Interface()
if err == nil {
targetSliceV = reflect.Append(targetSliceV, value) err = composed.OnCreate().Invoke(ctx, elem)
if err != nil {
return
} }
err = result.Decode(elem)
if err != nil {
return
}
targetSliceV = reflect.Append(targetSliceV, value)
} else { } else {
elem = targetSliceV.Index(i).Interface() elem = targetSliceV.Index(i).Interface()
base.Reset(elem)
if created := base.Reset(elem); created {
err = composed.OnCreate().Invoke(ctx, elem)
}
if err != nil {
return
}
err = result.Decode(elem) err = result.Decode(elem)
} if err != nil {
if err != nil { return
return }
} }
err = composed.OnDecode().Invoke(ctx, elem) err = composed.OnDecode().Invoke(ctx, elem)
if err != nil { if err != nil {
return return
} }
i++
} }
targetSliceV = targetSliceV.Slice(0, i) targetSliceV = targetSliceV.Slice(0, i)
+6 -1
View File
@@ -56,7 +56,12 @@ func (d *Database) LoadOne(target interface{}, filters ...interface{}) (err erro
return mongox.ErrNoDocuments return mongox.ErrNoDocuments
} }
base.Reset(target) if created := base.Reset(target); created {
err = composed.OnCreate().Invoke(ctx, target)
}
if err != nil {
return
}
err = result.Decode(target) err = result.Decode(target)
if err != nil { if err != nil {
+1 -1
View File
@@ -30,7 +30,7 @@ func (d *Database) LoadStream(target interface{}, filters ...interface{}) (loade
return return
} }
loader = &StreamLoader{cur: cursor, ctx: ctx, target: target, query: composed} loader = &StreamLoader{cur: cursor, ctx: ctx, ref: target, query: composed}
return return
} }
+22 -15
View File
@@ -10,21 +10,21 @@ import (
// StreamLoader is a controller for a database cursor // StreamLoader is a controller for a database cursor
type StreamLoader struct { type StreamLoader struct {
cur *mongox.Cursor cur *mongox.Cursor
query *query.Query query *query.Query
ctx context.Context ctx context.Context
target interface{} ref interface{}
} }
// DecodeNext loads next documents to a target or returns an error // DecodeNextMsg decodes the next document to an interface or returns an error
func (l *StreamLoader) DecodeNext() (err error) { func (l *StreamLoader) DecodeNextMsg(i interface{}) (err error) {
err = l.Next() err = l.Next()
if err != nil { if err != nil {
return return
} }
err = l.Decode() err = l.DecodeMsg(i)
if err != nil { if err != nil {
return return
} }
@@ -32,17 +32,22 @@ func (l *StreamLoader) DecodeNext() (err error) {
return return
} }
// Decode function decodes the current cursor document into the target // DecodeMsg decodes the current cursor document into an interface
func (l *StreamLoader) Decode() (err error) { func (l *StreamLoader) DecodeMsg(i interface{}) (err error) {
base.Reset(l.target) if created := base.Reset(i); created {
err = l.query.OnDecode().Invoke(l.ctx, i)
err = l.cur.Decode(l.target) }
if err != nil { if err != nil {
return return
} }
err = l.query.OnDecode().Invoke(l.ctx, l.target) err = l.cur.Decode(i)
if err != nil {
return
}
err = l.query.OnDecode().Invoke(l.ctx, i)
if err != nil { if err != nil {
return return
} }
@@ -66,15 +71,16 @@ func (l *StreamLoader) Next() (err error) {
return return
} }
// Cursor returns the underlying cursor
func (l *StreamLoader) Cursor() (cursor *mongox.Cursor) { func (l *StreamLoader) Cursor() (cursor *mongox.Cursor) {
return l.cur return l.cur
} }
// Close cursor // Close stream loader and the underlying cursor
func (l *StreamLoader) Close() (err error) { func (l *StreamLoader) Close() (err error) {
closerr := l.cur.Close(l.ctx) closerr := l.cur.Close(l.ctx)
invokerr := l.query.OnClose().Invoke(l.ctx, l.target) invokerr := l.query.OnClose().Invoke(l.ctx, l.ref)
if closerr != nil { if closerr != nil {
err = closerr err = closerr
@@ -89,6 +95,7 @@ func (l *StreamLoader) Close() (err error) {
return return
} }
// Err returns the last error
func (l *StreamLoader) Err() (err error) { func (l *StreamLoader) Err() (err error) {
return l.cur.Err() return l.cur.Err()
} }
+2 -2
View File
@@ -35,8 +35,8 @@ type Database interface {
// StreamLoader is a interface to control database cursor // StreamLoader is a interface to control database cursor
type StreamLoader interface { type StreamLoader interface {
Cursor() (cursor *Cursor) Cursor() (cursor *Cursor)
DecodeNext() (err error) DecodeNextMsg(i interface{}) (err error)
Decode() (err error) DecodeMsg(i interface{}) (err error)
Next() (err error) Next() (err error)
Close() (err error) Close() (err error)
Err() (err error) Err() (err error)
+7
View File
@@ -17,6 +17,7 @@ type Query struct {
updater Updater updater Updater
ondecode Callbacks ondecode Callbacks
onclose Callbacks onclose Callbacks
oncreate Callbacks
} }
// And function pushes the elem query to the $and array of the query // And function pushes the elem query to the $and array of the query
@@ -118,10 +119,16 @@ func (q *Query) OnDecode() (callbacks Callbacks) {
return q.ondecode return q.ondecode
} }
// OnClose callback is called after the mongox ends a loading procedure
func (q *Query) OnClose() (callbacks Callbacks) { func (q *Query) OnClose() (callbacks Callbacks) {
return q.onclose return q.onclose
} }
// OnCreate callback is called if the mongox creates a new document instance during loading
func (q *Query) OnCreate() (callbacks Callbacks) {
return q.onclose
}
// Empty checks the query for any content // Empty checks the query for any content
func (q *Query) Empty() (isEmpty bool) { func (q *Query) Empty() (isEmpty bool) {
return len(q.m) == 0 return len(q.m) == 0