-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaggregate.go
87 lines (79 loc) · 2.44 KB
/
aggregate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package easymongo
import (
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// AggregationQuery describes an aggregation pipeline to run against a
// collection. It embeds *Query and adds the aggregation-specific settings
// that aggregateOptions copies into options.AggregateOptions.
type AggregationQuery struct {
*Query
// allowDiskUse is forwarded as AggregateOptions.AllowDiskUse; nil leaves it unset.
allowDiskUse *bool
// batchSize is forwarded as AggregateOptions.BatchSize; nil leaves it unset.
batchSize *int32
// bypassDocumentValidation is forwarded as
// AggregateOptions.BypassDocumentValidation; nil leaves it unset.
bypassDocumentValidation *bool
// Collation, comment and hint are not stored on this struct;
// aggregateOptions reads them from the embedded Query instead.
}
// Aggregate starts an AggregationQuery for the given pipeline.
// Nothing is sent to the server here; the pipeline is executed once
// Cursor(), All() or One() is called on the returned query, e.g.
// coll.Aggregate(pipeline).All(&results) or coll.Aggregate(pipeline).One(&result).
func (c *Collection) Aggregate(pipeline interface{}) *AggregationQuery {
	aq := new(AggregationQuery)
	aq.Query = c.query(pipeline)
	return aq
}
// Cursor runs the aggregation with the options configured on this query and
// hands back the raw mongo.Cursor so the caller can iterate it directly.
// That is mostly useful for large result sets; when the documents are simply
// needed in memory, use .One() or .All() instead.
//
// Any error from the driver is passed through the collection's handleErr
// before being returned.
func (p *AggregationQuery) Cursor() (*mongo.Cursor, error) {
	mongoColl := p.collection.mongoColl

	ctx, cancel := p.getContext()
	defer cancel()

	cur, aggErr := mongoColl.Aggregate(ctx, p.filter, p.aggregateOptions())
	return cur, p.collection.handleErr(aggErr)
}
// aggregateOptions assembles the driver-level options.AggregateOptions from
// the aggregation-specific fields on p and the shared settings (collation,
// timeout, comment, hint) held by the embedded Query.
func (p *AggregationQuery) aggregateOptions() *options.AggregateOptions {
	aggOpts := new(options.AggregateOptions)

	// Settings that only exist on aggregations.
	aggOpts.AllowDiskUse = p.allowDiskUse
	aggOpts.BatchSize = p.batchSize
	aggOpts.BypassDocumentValidation = p.bypassDocumentValidation

	// Settings inherited from the embedded Query.
	aggOpts.Collation = p.Query.collation
	aggOpts.MaxTime = p.Query.timeout
	aggOpts.Comment = p.Query.comment

	// Hint stays unset unless the query carries one.
	if hint := p.Query.hintIndices; hint != nil {
		aggOpts.Hint = *hint
	}
	return aggOpts
}
// All executes the aggregation and decodes every resulting document into
// result. result is passed unchanged to mongo.Cursor.All, so it must be
// whatever that method accepts (per the driver documentation, a pointer to a
// slice).
//
// It returns the error from Cursor() if the aggregation could not be
// started; otherwise it returns the outcome of cursor.All routed through the
// collection's handleErr, matching how Cursor() reports driver errors.
func (p *AggregationQuery) All(result interface{}) error {
	cursor, err := p.Cursor()
	if err != nil {
		// Already processed by handleErr inside Cursor().
		return err
	}

	ctx, cancelFunc := p.getContext()
	defer cancelFunc()

	// Previously a failure here was returned raw and handleErr only ever saw
	// nil; pass the real result through so iteration/decoding errors get the
	// same treatment as errors from Aggregate itself.
	return p.collection.handleErr(cursor.All(ctx, result))
}
// One executes the aggregation and decodes the first resulting document into
// result.
//
// It returns:
//   - the error from Cursor() if the aggregation could not be started;
//   - the cursor's own error (via handleErr) if advancing the cursor failed;
//   - ErrNoDocuments if the aggregation produced no documents;
//   - the decode error (via handleErr) if the document could not be decoded
//     into result;
//   - nil on success.
//
// The cursor is always closed before One returns.
func (p *AggregationQuery) One(result interface{}) error {
	cursor, err := p.Cursor()
	if err != nil {
		// Already processed by handleErr inside Cursor().
		return err
	}

	ctx, cancelFunc := p.getContext()
	defer cancelFunc()
	// Registered after cancelFunc so it runs first (LIFO), while ctx is still
	// usable. Without this the cursor was never released.
	defer func() {
		// Best-effort cleanup: a Close failure must not mask the primary result.
		_ = cursor.Close(ctx)
	}()

	if !cursor.Next(ctx) {
		// Next reports false both when the cursor is exhausted and when it
		// failed; only the former means "no documents".
		if err = cursor.Err(); err != nil {
			return p.collection.handleErr(err)
		}
		return ErrNoDocuments
	}

	return p.collection.handleErr(cursor.Decode(result))
}