-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsort.go
71 lines (60 loc) · 1.46 KB
/
sort.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package flame
import (
"sort"
"golang.org/x/exp/constraints"
)
/**************************/
// Sorter
/**************************/
// AddKeySort creates a SortNode, registers it in w.Nodes, and returns it.
//
// The node starts with an empty queue (initial capacity 10) and no input
// channel; use Connect to attach an upstream emitter.
func AddKeySort[X constraints.Ordered, Y any](w *Workflow) Node[KeyValue[X, Y], KeyValue[X, Y]] {
	node := &SortNode[X, Y]{
		Queue: make([]KeyValue[X, Y], 0, 10),
	}
	w.Nodes = append(w.Nodes, node)
	return node
}
// SortNode buffers every KeyValue it receives on Input, sorts the buffered
// items by Key, and then sends them to each channel in Outputs.
// It implements sort.Interface over Queue (see Len, Less, Swap).
type SortNode[X constraints.Ordered, Y any] struct {
// Input is the channel items are read from until it is closed; set by Connect.
Input chan KeyValue[X, Y]
// Queue holds the items received so far and is sorted in place by start.
Queue []KeyValue[X, Y]
// Outputs are the channels created by GetOutput; each one receives every
// sorted item and is closed once all items have been sent.
Outputs []chan KeyValue[X, Y]
}
// Swap exchanges the queued items at positions i and j.
// It is part of sort.Interface.
func (s *SortNode[X, Y]) Swap(i, j int) {
	q := s.Queue
	q[i], q[j] = q[j], q[i]
}
// Less reports whether the queued item at index i has a smaller Key than the
// item at index j, which yields an ascending sort by key.
// It is part of sort.Interface.
func (s *SortNode[X, Y]) Less(i, j int) bool {
	left, right := s.Queue[i].Key, s.Queue[j].Key
	return left < right
}
// Len returns the number of items currently held in the queue.
// It is part of sort.Interface.
func (s *SortNode[X, Y]) Len() int {
	queued := s.Queue
	return len(queued)
}
// start launches the node's goroutine and registers it with wf.WaitGroup.
//
// The goroutine runs three phases in order:
//  1. collect: append every item received on Input to Queue until Input is
//     closed (skipped entirely when Input is nil);
//  2. sort: order Queue ascending by Key via sort.Sort;
//  3. emit: send each item, in sorted order, to every channel in Outputs,
//     then close all of those channels.
//
// wf.WaitGroup.Done is called once the outputs have been closed.
func (s *SortNode[X, Y]) start(wf *Workflow) {
	wf.WaitGroup.Add(1)
	go func() {
		// A nil channel would block forever on receive, so only drain a
		// connected input.
		if in := s.Input; in != nil {
			for {
				item, open := <-in
				if !open {
					break
				}
				s.Queue = append(s.Queue, item)
			}
		}

		// Nothing can be emitted before the whole input has been seen.
		sort.Sort(s)

		for _, item := range s.Queue {
			for _, out := range s.Outputs {
				out <- item
			}
		}

		// Signal downstream consumers that no more items will arrive.
		for _, out := range s.Outputs {
			close(out)
		}
		wf.WaitGroup.Done()
	}()
}
// Connect sets the node's Input to the channel returned by e.GetOutput().
// Calling it again replaces the previously assigned input channel.
func (s *SortNode[X, Y]) Connect(e Emitter[KeyValue[X, Y]]) {
	s.Input = e.GetOutput()
}
// GetOutput creates a new unbuffered channel, appends it to Outputs, and
// returns it. Each call yields a distinct channel; start sends every sorted
// item to each registered channel and closes it afterwards.
func (s *SortNode[X, Y]) GetOutput() chan KeyValue[X, Y] {
	out := make(chan KeyValue[X, Y])
	s.Outputs = append(s.Outputs, out)
	return out
}