-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathexecute.go
215 lines (197 loc) · 6.77 KB
/
execute.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
package goqux
import (
"context"
"fmt"
"reflect"
"github.com/doug-martin/goqu/v9"
"github.com/georgysavva/scany/v2/pgxscan"
"github.com/iancoleman/strcase"
)
// PaginationOptions configures how SelectPagination walks a result set.
type PaginationOptions struct {
	// PageSize is the number of rows requested per page. SelectPagination uses
	// 10 when it is called without any PaginationOptions.
	PageSize uint
	// KeySet lists the columns used for keyset filtering; when set,
	// SelectPagination adds a WithKeySet option to every page query. The names
	// are not validated, so a wrong name makes the query fail. When KeySet is
	// not set, pagination falls back to offsets.
	KeySet []string
}
// PageIterator fetches a single page for the Paginator it is handed. It returns
// the rows of that page, a flag that makes the Paginator stop iterating when it
// is true, and any error raised while fetching.
type PageIterator[T any] func(p *Paginator[T]) ([]T, bool, error)

// Paginator pages through a result set of T by repeatedly invoking a
// PageIterator.
type Paginator[T any] struct {
	// hasNext stays true until an iterator clears it after seeing the last page.
	hasNext bool
	// iterator produces each page and may update the cursor fields below.
	iterator PageIterator[T]
	// offset is the cursor used by offset-based iterators.
	offset uint
	// values is the cursor used by keyset-based iterators.
	values []any
	// stop is latched by NextPage once the iterator asks to stop.
	stop bool
}

// NewPaginator returns a Paginator that has not fetched anything yet and will
// use iterator to produce every page.
func NewPaginator[T any](iterator PageIterator[T]) *Paginator[T] {
	return &Paginator[T]{
		hasNext:  true,
		iterator: iterator,
	}
}

// HasMorePages reports whether another call to NextPage may yield data: the
// last page has not been seen and the iterator has not asked to stop.
func (p *Paginator[T]) HasMorePages() bool {
	return p.hasNext && !p.stop
}

// NextPage fetches the next page through the iterator and returns its rows
// together with the iterator's error.
//
// Once HasMorePages reports false, NextPage returns (nil, nil) without calling
// the iterator again, so a caller that over-iterates does not re-run the query
// for a page it has already received.
func (p *Paginator[T]) NextPage() ([]T, error) {
	if !p.HasMorePages() {
		return nil, nil
	}
	data, shouldStop, err := p.iterator(p)
	if shouldStop {
		p.stop = true
	}
	return data, err
}
// Select builds a query with BuildSelect for tableName, handing it a pointer to
// a zero T and the given options, runs it on querier and scans every returned
// row into a []T.
//
// A build error is returned as-is; an execution or scan error is wrapped. On
// success the returned slice is non-nil even when no row matched.
func Select[T any](ctx context.Context, querier pgxscan.Querier, tableName string, options ...SelectOption) ([]T, error) {
	var model T
	stmt, params, buildErr := BuildSelect(tableName, &model, options...)
	if buildErr != nil {
		return nil, buildErr
	}

	rows := []T{}
	scanErr := pgxscan.Select(ctx, querier, &rows, stmt, params...)
	if scanErr != nil {
		return nil, fmt.Errorf("goqux: failed to select: %w", scanErr)
	}
	return rows, nil
}
// SelectOne builds a query with BuildSelect for tableName, using the given
// options plus WithSelectLimit(1), runs it on querier and scans a single row
// into a T.
//
// A build error is returned as-is; an execution or scan error (including the
// one pgxscan.Get reports when no row is returned) is wrapped.
func SelectOne[T any](ctx context.Context, querier pgxscan.Querier, tableName string, options ...SelectOption) (T, error) {
	var result T
	// Cap the capacity so append always copies: appending straight onto the
	// variadic slice could overwrite an element of the caller's backing array
	// when that slice has spare capacity.
	limited := append(options[:len(options):len(options)], WithSelectLimit(1))
	query, args, err := BuildSelect(tableName, new(T), limited...)
	if err != nil {
		return result, err
	}
	if err := pgxscan.Get(ctx, querier, &result, query, args...); err != nil {
		return result, fmt.Errorf("goqux: failed to select: %w", err)
	}
	return result, nil
}
// SelectPagination returns a Paginator whose pages are fetched with Select from
// tableName using options.
//
// When paginationOptions carries at least one KeySet column, every page query
// gets a WithKeySet option built from those columns and the key values of the
// last row of the previous page; each KeySet entry must therefore name an
// exported field of T (or of the struct T points to). Otherwise pages are
// addressed with WithSelectOffset. A nil paginationOptions or a zero PageSize
// selects the default page size of 10.
//
// Iteration ends after the first page that holds fewer rows than the page
// size. The returned error is always nil; query errors surface from
// Paginator.NextPage.
func SelectPagination[T any](ctx context.Context, querier pgxscan.Querier, tableName string, paginationOptions *PaginationOptions, options ...SelectOption) (*Paginator[T], error) {
	// Resolve the settings into locals so the caller's struct is never mutated
	// and the documented default also applies to a zero PageSize.
	var (
		pageSize uint = 10
		keySet   []string
	)
	if paginationOptions != nil {
		keySet = paginationOptions.KeySet
		if paginationOptions.PageSize > 0 {
			pageSize = paginationOptions.PageSize
		}
	}
	// One test decides the mode for both building the query and advancing the
	// cursor; an empty, non-nil KeySet means offset pagination.
	useKeySet := len(keySet) > 0

	return NewPaginator(func(p *Paginator[T]) ([]T, bool, error) {
		// Assemble this page's options in a fresh slice so nothing is written
		// into the caller's backing array.
		pageOptions := make([]SelectOption, 0, len(options)+2)
		pageOptions = append(pageOptions, options...)
		if useKeySet {
			pageOptions = append(pageOptions, WithKeySet(keySet, p.values))
		} else {
			pageOptions = append(pageOptions, WithSelectOffset(p.offset))
		}
		pageOptions = append(pageOptions, WithSelectLimit(pageSize))

		results, err := Select[T](ctx, querier, tableName, pageOptions...)
		if err != nil {
			return nil, false, fmt.Errorf("goqux: failed to select: %w", err)
		}
		// A short (or empty) page is the last one; pageSize is at least 1 here.
		if uint(len(results)) < pageSize {
			p.hasNext = false
			return results, false, nil
		}
		if !useKeySet {
			p.offset += pageSize
			return results, false, nil
		}

		// Remember the key values of the last row as the cursor for the next
		// page. The cursor holds one value per key column, not one per row.
		lastRow := results[len(results)-1]
		last := reflect.Indirect(reflect.ValueOf(lastRow))
		if last.Kind() != reflect.Struct {
			return nil, false, fmt.Errorf("goqux: keyset pagination needs struct results, got %T", lastRow)
		}
		values := make([]any, len(keySet))
		for i, name := range keySet {
			field := last.FieldByName(name)
			if !field.IsValid() || !field.CanInterface() {
				return nil, false, fmt.Errorf("goqux: keyset field %q is not an exported field of %T", name, lastRow)
			}
			values[i] = field.Interface()
		}
		p.values = values
		return results, false, nil
	}), nil
}
// QueryKeySetPagination returns a Paginator that walks the rows of sd using
// keyset pagination.
//
// For every page the dataset's limit is set to pageSize and its offset and
// ordering are cleared; the rows are then ordered ascending by the snake_case
// form of each keyset entry and, from the second page on, filtered to rows
// whose key columns are greater than those of the last row already returned.
// Each keyset entry must name an exported field of T (or of the struct T
// points to), because the cursor is read from that field by reflection.
//
// An empty keyset is rejected with an error. Iteration ends after the first
// page that is empty or holds fewer rows than pageSize.
//
// NOTE(review): with more than one key column the filter is a conjunction of
// independent `>` comparisons, not a row-value comparison, so rows that tie on
// an earlier column can be skipped — confirm this is intended.
func QueryKeySetPagination[T any](ctx context.Context, querier pgxscan.Querier, sd *goqu.SelectDataset, pageSize uint, keyset []string) (*Paginator[T], error) {
	if len(keyset) == 0 {
		return nil, fmt.Errorf("goqux: keyset is required for pagination")
	}
	// The column names do not change between pages, so derive them once.
	columns := make([]string, len(keyset))
	for i, name := range keyset {
		columns[i] = strcase.ToSnake(name)
	}

	return NewPaginator(func(p *Paginator[T]) ([]T, bool, error) {
		// Derive each page from the pristine dataset. Reassigning sd here would
		// pile up another set of WHERE clauses (and arguments) on every page.
		page := sd.Limit(pageSize).ClearOffset().ClearOrder()
		for i, column := range columns {
			if p.values != nil {
				page = page.Where(goqu.C(column).Gt(p.values[i]))
			}
			page = page.OrderAppend(goqu.C(column).Asc())
		}

		query, args, err := page.ToSQL()
		if err != nil {
			return nil, false, fmt.Errorf("goqux: failed to build select query: %w", err)
		}
		rows, err := querier.Query(ctx, query, args...)
		if err != nil {
			return nil, false, fmt.Errorf("querier: failed to select: %w", err)
		}
		results := make([]T, 0)
		if err := pgxscan.ScanAll(&results, rows); err != nil {
			return nil, false, fmt.Errorf("dbscan: failed to scan: %w", err)
		}
		if len(results) == 0 || uint(len(results)) < pageSize {
			p.hasNext = false
			return results, false, nil
		}

		// Remember the key values of the last row as the cursor for the next
		// page. The cursor holds one value per key column, not one per row.
		lastRow := results[len(results)-1]
		last := reflect.Indirect(reflect.ValueOf(lastRow))
		if last.Kind() != reflect.Struct {
			return nil, false, fmt.Errorf("goqux: keyset pagination needs struct results, got %T", lastRow)
		}
		values := make([]any, len(keyset))
		for i, name := range keyset {
			field := last.FieldByName(name)
			if !field.IsValid() || !field.CanInterface() {
				return nil, false, fmt.Errorf("goqux: keyset field %q is not an exported field of %T", name, lastRow)
			}
			values[i] = field.Interface()
		}
		p.values = values
		return results, false, nil
	}), nil
}
// Delete builds a statement with BuildDelete for tableName and the given
// options, runs it on querier and scans every row the statement returns into a
// []T.
//
// A build error is returned as-is; an execution or scan error is wrapped. On
// success the returned slice is non-nil even when the statement returned no
// rows.
func Delete[T any](ctx context.Context, querier pgxscan.Querier, tableName string, options ...DeleteOption) ([]T, error) {
	stmt, params, buildErr := BuildDelete(tableName, options...)
	if buildErr != nil {
		return nil, buildErr
	}

	deleted := []T{}
	scanErr := pgxscan.Select(ctx, querier, &deleted, stmt, params...)
	if scanErr != nil {
		return nil, fmt.Errorf("goqux: failed to delete: %w", scanErr)
	}
	return deleted, nil
}
// Update builds a statement with BuildUpdate for tableName from updateValue and
// the given options, runs it on querier and scans every row the statement
// returns into a []T.
//
// A build error is returned as-is; an execution or scan error is wrapped. On
// success the returned slice is non-nil even when the statement returned no
// rows.
func Update[T any](ctx context.Context, querier pgxscan.Querier, tableName string, updateValue any, options ...UpdateOption) ([]T, error) {
	stmt, params, buildErr := BuildUpdate(tableName, updateValue, options...)
	if buildErr != nil {
		return nil, buildErr
	}

	updated := []T{}
	scanErr := pgxscan.Select(ctx, querier, &updated, stmt, params...)
	if scanErr != nil {
		return nil, fmt.Errorf("goqux: failed to update: %w", scanErr)
	}
	return updated, nil
}
// Insert builds a statement with BuildInsert for tableName from the single
// insertValue and the given options, runs it on querier and scans the one row
// the statement returns into a T.
//
// A build error is returned as-is. When the statement yields no row (as
// reported by pgxscan.NotFound) both results are nil; any other execution or
// scan error is wrapped.
func Insert[T any](ctx context.Context, querier pgxscan.Querier, tableName string, insertValue any, options ...InsertOption) (*T, error) {
	stmt, params, buildErr := BuildInsert(tableName, []any{insertValue}, options...)
	if buildErr != nil {
		return nil, buildErr
	}

	inserted := new(T)
	scanErr := pgxscan.Get(ctx, querier, inserted, stmt, params...)
	switch {
	case scanErr == nil:
		return inserted, nil
	case pgxscan.NotFound(scanErr):
		// Nothing came back from the statement; this is not treated as an error.
		return nil, nil
	default:
		return nil, fmt.Errorf("goqux: failed to insert: %w", scanErr)
	}
}
// InsertMany builds a statement with BuildInsert for tableName from
// insertValues and the given options, runs it on querier and scans every row
// the statement returns into a []T.
//
// A build error is returned as-is; an execution or scan error is wrapped. On
// success the returned slice is non-nil even when the statement returned no
// rows.
func InsertMany[T any](ctx context.Context, querier pgxscan.Querier, tableName string, insertValues []any, options ...InsertOption) ([]T, error) {
	stmt, params, buildErr := BuildInsert(tableName, insertValues, options...)
	if buildErr != nil {
		return nil, buildErr
	}

	inserted := []T{}
	scanErr := pgxscan.Select(ctx, querier, &inserted, stmt, params...)
	if scanErr != nil {
		return nil, fmt.Errorf("goqux: failed to insert many: %w", scanErr)
	}
	return inserted, nil
}