|
|
@ -10,21 +10,21 @@ import ( |
|
|
|
|
|
|
|
|
|
|
|
// StreamLoader is a controller for a database cursor
|
|
|
|
// StreamLoader is a controller for a database cursor
|
|
|
|
type StreamLoader struct { |
|
|
|
type StreamLoader struct { |
|
|
|
cur *mongox.Cursor |
|
|
|
cur *mongox.Cursor |
|
|
|
query *query.Query |
|
|
|
query *query.Query |
|
|
|
ctx context.Context |
|
|
|
ctx context.Context |
|
|
|
target interface{} |
|
|
|
ref interface{} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// DecodeNext loads next documents to a target or returns an error
|
|
|
|
// DecodeNextMsg decodes the next document to an interface or returns an error
|
|
|
|
func (l *StreamLoader) DecodeNext() (err error) { |
|
|
|
func (l *StreamLoader) DecodeNextMsg(i interface{}) (err error) { |
|
|
|
|
|
|
|
|
|
|
|
err = l.Next() |
|
|
|
err = l.Next() |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
return |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
err = l.Decode() |
|
|
|
err = l.DecodeMsg(i) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
return |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
@ -32,22 +32,22 @@ func (l *StreamLoader) DecodeNext() (err error) { |
|
|
|
return |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Decode function decodes the current cursor document into the target
|
|
|
|
// DecodeMsg decodes the current cursor document into an interface
|
|
|
|
func (l *StreamLoader) Decode() (err error) { |
|
|
|
func (l *StreamLoader) DecodeMsg(i interface{}) (err error) { |
|
|
|
|
|
|
|
|
|
|
|
if created := base.Reset(l.target); created { |
|
|
|
if created := base.Reset(i); created { |
|
|
|
err = l.query.OnDecode().Invoke(l.ctx, l.target) |
|
|
|
err = l.query.OnDecode().Invoke(l.ctx, i) |
|
|
|
} |
|
|
|
} |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
return |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
err = l.cur.Decode(l.target) |
|
|
|
err = l.cur.Decode(i) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
return |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
err = l.query.OnDecode().Invoke(l.ctx, l.target) |
|
|
|
err = l.query.OnDecode().Invoke(l.ctx, i) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
return |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
@ -79,7 +79,7 @@ func (l *StreamLoader) Cursor() (cursor *mongox.Cursor) { |
|
|
|
func (l *StreamLoader) Close() (err error) { |
|
|
|
func (l *StreamLoader) Close() (err error) { |
|
|
|
|
|
|
|
|
|
|
|
closerr := l.cur.Close(l.ctx) |
|
|
|
closerr := l.cur.Close(l.ctx) |
|
|
|
invokerr := l.query.OnClose().Invoke(l.ctx, l.target) |
|
|
|
invokerr := l.query.OnClose().Invoke(l.ctx, l.ref) |
|
|
|
|
|
|
|
|
|
|
|
if closerr != nil { |
|
|
|
if closerr != nil { |
|
|
|
err = closerr |
|
|
|
err = closerr |
|
|
|