-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathmap.go
130 lines (119 loc) · 3.3 KB
/
map.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package hoff
import (
"context"
"fmt"
"sync"
"sync/atomic"
)
// Map applies fn to every element of arr and returns the produced values in
// the same order as their inputs.
//
// The returned slice is always non-nil and holds exactly len(arr) elements,
// so a nil or empty arr yields an empty slice.
func Map[In, Out any](arr []In, fn func(In) Out) []Out {
	mapped := make([]Out, len(arr))
	for idx := range arr {
		mapped[idx] = fn(arr[idx])
	}
	return mapped
}
// MapError is the fallible counterpart of `Map`: fn receives each element of
// arr together with its index and may return an error.
//
// Elements are processed sequentially in order. On the first error the
// iteration stops and MapError returns a nil slice plus an error that wraps
// the original one and reports the failing index and value. When every call
// succeeds it returns a non-nil slice of len(arr) mapped values and a nil
// error.
func MapError[In, Out any](arr []In, fn func(In, int) (Out, error)) ([]Out, error) {
	mapped := make([]Out, len(arr))
	for idx := 0; idx < len(arr); idx++ {
		// Copy the element before calling fn so the error message reports the
		// value that was actually handed to fn.
		item := arr[idx]
		value, err := fn(item, idx)
		if err != nil {
			return nil, fmt.Errorf("MapError got an error in index %d, value %v: %w", idx, item, err)
		}
		mapped[idx] = value
	}
	return mapped, nil
}
// MapContextError behaves like `MapError`, additionally handing ctx to every
// invocation of fn.
//
// Elements are processed sequentially in order and ctx is forwarded to fn
// unchanged; this function itself never inspects ctx. On the first error the
// iteration stops and a nil slice is returned together with an error that
// wraps the original one and reports the failing index and value. On success
// the result is a non-nil slice of len(arr) mapped values.
func MapContextError[In, Out any](
	ctx context.Context,
	arr []In,
	fn func(context.Context, In, int) (Out, error),
) ([]Out, error) {
	mapped := make([]Out, len(arr))
	for idx := 0; idx < len(arr); idx++ {
		// Copy the element before calling fn so the error message reports the
		// value that was actually handed to fn.
		item := arr[idx]
		value, err := fn(ctx, item, idx)
		if err != nil {
			return nil, fmt.Errorf("MapContextError got an error in index %d, value %v: %w", idx, item, err)
		}
		mapped[idx] = value
	}
	return mapped, nil
}
// MapConcurrentToResults is the concurrent variant of `MapContextError`: fn
// is invoked for every element of arr, each call in its own goroutine.
//
// It waits for all goroutines to finish and returns one result per input
// element, stored at the element's own index, so output order matches input
// order regardless of completion order. Each result carries the value fn
// returned and, if fn failed, an error wrapping the original one with the
// index and value. A panic inside fn is recovered and reported as an error on
// a result whose value is the zero value of Out.
func MapConcurrentToResults[In, Out any](
	ctx context.Context,
	arr []In,
	fn func(context.Context, In, int) (Out, error),
) Results[Out] {
	out := make(Results[Out], len(arr))

	var pending sync.WaitGroup
	pending.Add(len(arr))
	for idx, item := range arr {
		go func(ctx context.Context, item In, idx int) {
			defer pending.Done()
			// Turn a panic raised by fn into an error result for this slot.
			defer func() {
				if p := recover(); p != nil {
					var zero Out
					out[idx] = Result[Out]{
						Value: zero,
						Error: fmt.Errorf(
							"MapConcurrentToResults recovered from panic while processing index %d, value %v: %v", idx,
							item, p,
						),
					}
				}
			}()

			value, err := fn(ctx, item, idx)
			if err != nil {
				err = fmt.Errorf("MapConcurrentToResults got an error in index %d, value %v: %w", idx, item, err)
			}
			// Each goroutine writes only to its own index, so no lock is needed.
			out[idx] = Result[Out]{Value: value, Error: err}
		}(ctx, item, idx)
	}
	pending.Wait()

	return out
}
// MapConcurrentError is the same as `MapConcurrentToResults` but returns only the
// values and, if it happens, the first error.
//
// fn is invoked for every element of arr, each call in its own goroutine, and
// ctx is forwarded to fn unchanged (it is not cancelled on failure). When all
// calls succeed the mapped values are returned in input order. On the first
// reported failure, either an error returned by fn or a panic recovered from
// it, the function returns a nil slice and that error immediately, without
// waiting for the remaining goroutines; they run to completion in the
// background and their outcome is discarded.
func MapConcurrentError[In, Out any](
	ctx context.Context,
	arr []In,
	fn func(context.Context, In, int) (Out, error),
) ([]Out, error) {
	results := make([]Out, len(arr))

	// One buffer slot per worker: every worker sends at most once, so a send
	// never blocks, even after this function has already returned. The
	// channel is deliberately never closed, because the receiver cannot know
	// that all senders are done; it is garbage collected once unreachable.
	errs := make(chan error, len(arr))

	var shutdown uint32 // set to 1 by the first worker that fails or panics

	for i, elem := range arr {
		go func(ctx context.Context, elem In, i int) {
			// Report a panic raised by fn as an error, unless another worker
			// has already reported a failure.
			defer func() {
				r := recover()
				if r != nil && atomic.CompareAndSwapUint32(&shutdown, 0, 1) {
					errs <- fmt.Errorf(
						"MapConcurrentError recovered from panic while processing index %d, value %v: %v", i, elem, r,
					)
				}
			}()

			r, err := fn(ctx, elem, i)
			if err != nil {
				// Only the first failing worker reports; later failures are dropped.
				if atomic.CompareAndSwapUint32(&shutdown, 0, 1) {
					errs <- fmt.Errorf("MapConcurrentError got an error in index %d, value %v: %w", i, elem, err)
				}
				return
			}

			// Each worker writes only to its own index; the send below
			// publishes the write to the receiving goroutine.
			results[i] = r
			if atomic.LoadUint32(&shutdown) == 0 {
				errs <- nil
			}
		}(ctx, elem, i)
	}

	// Wait for one message per element, bailing out on the first failure.
	for range arr {
		if err := <-errs; err != nil {
			return nil, err
		}
	}
	return results, nil
}