From e40cac29e814a54707c53a1b5dc8df8b5102aea4 Mon Sep 17 00:00:00 2001 From: Nikita Tokarchuk Date: Sun, 9 Dec 2018 17:20:28 +0100 Subject: [PATCH] LoadStream function --- mongox/common/loadstream.go | 67 +++++++++++++++++++++++++++++++++++++ mongox/query/limit.go | 17 ++++++++++ 2 files changed, 84 insertions(+) create mode 100644 mongox/common/loadstream.go create mode 100644 mongox/query/limit.go diff --git a/mongox/common/loadstream.go b/mongox/common/loadstream.go new file mode 100644 index 0000000..3a12b53 --- /dev/null +++ b/mongox/common/loadstream.go @@ -0,0 +1,67 @@ +package common + +import ( + "reflect" + + "github.com/mainnika/mongox-go-driver/mongox" + "github.com/mainnika/mongox-go-driver/mongox/errors" + "github.com/mainnika/mongox-go-driver/mongox/query" + "github.com/mongodb/mongo-go-driver/mongo/options" +) + +// LoadStream function loads documents one by one into a target channel +func LoadStream(db *mongox.Database, target interface{}, composed *query.Query) error { + + targetV := reflect.ValueOf(target) + targetT := targetV.Type() + + targetK := targetV.Kind() + if targetK != reflect.Chan { + panic(errors.InternalErrorf("target is not a chan")) + } + if targetT.Elem().Kind() != reflect.Ptr { + panic(errors.InternalErrorf("chan element should be a document ptr")) + } + + dummy := reflect.Zero(targetT.Elem()) + collection := db.GetCollectionOf(dummy.Interface()) + opts := &options.FindOptions{} + + if composed.Sorter() != nil { + opts.Sort = composed.Sorter().Sort() + } + if composed.Limiter() != nil { + limit := int64(composed.Limiter().Limit()) + opts.Limit = &limit + } + + result, err := collection.Find(db.Context(), composed.M(), opts) + if err != nil { + return errors.InternalErrorf("can't create find result: %s", err) + } + + go func() { + defer result.Close(db.Context()) + + for { + elem, ok := targetV.Recv() + if !ok { + break + } + + if result.Next(db.Context()) != true { + targetV.Send(dummy) + break + } + + if result.Decode(elem.Interface()) != nil { + targetV.Send(dummy) + break + } + + targetV.Send(elem) + } + }() + + return nil +} diff --git a/mongox/query/limit.go b/mongox/query/limit.go new file mode 100644 index 0000000..076ac37 --- /dev/null +++ b/mongox/query/limit.go @@ -0,0 +1,17 @@ +package query + +// Limiter is a filter to limit the result +type Limiter interface { + Limit() int +} + +// Limit is a simple implementation of the Limiter filter +type Limit int + +var _ Limiter = Limit(0) + +// Limit returns a limit +func (l Limit) Limit() int { + + return int(l) +}