-
Notifications
You must be signed in to change notification settings - Fork 12
/
coroutine_volatile.go
140 lines (120 loc) · 3.44 KB
/
coroutine_volatile.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
//go:build !durable
package coroutine
import (
"runtime"
"sync"
"unsafe"
)
// Durable is a constant which takes the values true or false depending on
// whether the program is built with the "durable" tag.
//
// This file is only compiled when the "durable" tag is absent (see the build
// constraint at the top of the file), hence the value false.
const Durable = false
// New creates a new coroutine which executes f as entry point.
//
// Because f does not produce a value, it is adapted into an entry point that
// returns the zero value of R before being handed to NewWithReturn.
func New[R, S any](f func()) Coroutine[R, S] {
	entry := func() R {
		var zero R
		f()
		return zero
	}
	return NewWithReturn[R, S](entry)
}
// NewWithReturn creates a new coroutine which executes f as entry point, and
// records the value returned by f as the result of the coroutine.
//
// The coroutine is backed by a dedicated goroutine which remains blocked until
// the first call to Next; f is skipped if the stop flag of the context is set
// by the time the goroutine is resumed.
func NewWithReturn[R, S any](f func() R) Coroutine[R, S] {
c := &Context[R, S]{
context: context[R]{
// Unbuffered channel used to hand control back and forth between the
// caller of Next and the goroutine backing the coroutine.
next: make(chan struct{}),
},
}
go func() {
// execute stores c on the goroutine stack before invoking the entry
// point, so that it can later be retrieved by calling load.
execute(c, func() {
defer func() {
// Mark completion and close the channel so that a caller blocked in
// Next observes the close and returns false.
c.done = true
close(c.next)
}()
// Wait for the first call to Next before running f.
<-c.next
if !c.stop {
c.result = f()
}
})
}()
return Coroutine[R, S]{ctx: c}
}
// Next resumes the coroutine and blocks until it either reaches its next
// yield point or runs to completion.
//
// It returns true when the coroutine stopped at a yield point; the program
// should then call Recv to obtain the value that the coroutine yielded, and
// Send to set the value that will be returned from the yield point. It returns
// false when the coroutine has completed, including when it had already
// completed before the call.
func (c Coroutine[R, S]) Next() bool {
	ctx := c.ctx
	if ctx.done {
		return false
	}

	// Wake up the goroutine backing the coroutine, then wait for it to hand
	// control back. A closed channel means the entry point has returned.
	ctx.next <- struct{}{}
	_, yielded := <-ctx.next
	return yielded
}
// context holds the coroutine state that is specific to the volatile
// (non-durable) implementation.
type context[R any] struct {
// next is the channel used to alternate execution between the caller of
// Next and the goroutine backing the coroutine; it is closed when the
// coroutine entry point returns.
next chan struct{}
}
// Yield suspends the coroutine at a yield point, publishing v in the recv
// slot of the context, and returns the content of the send slot once the
// coroutine has been resumed.
//
// Yield panics if the coroutine was already stopped when it is called. If the
// stop flag is set by the time the coroutine is resumed, the backing goroutine
// is terminated with runtime.Goexit and Yield never returns.
func (c *Context[R, S]) Yield(v R) S {
	if c.stop {
		panic("cannot yield from a coroutine that has been stopped")
	}

	// Publish the yielded value and reset the send slot to its zero value so
	// that a value left over from a previous yield point is not returned.
	var none S
	c.recv, c.send = v, none

	// Hand control back to the caller blocked in Next, then wait until the
	// following call to Next resumes this goroutine.
	c.next <- struct{}{}
	<-c.next

	if c.stop {
		runtime.Goexit()
	}
	return c.send
}
// Marshal always fails with ErrNotDurable and returns no data: this
// implementation is the one compiled when the "durable" build tag is absent.
func (c *Context[R, S]) Marshal() ([]byte, error) {
	var state []byte // nil: there is no serialized form to return
	return state, ErrNotDurable
}
// Unmarshal always fails with ErrNotDurable: this implementation is the one
// compiled when the "durable" build tag is absent. The input b is ignored.
func (c *Context[R, S]) Unmarshal(b []byte) error {
	_ = b // never inspected
	return ErrNotDurable
}
// The offset from the high address of the goroutine stack where the v argument
// of the execute function is stored.
//
// We use a once value to lazily initialize the offset when executing coroutines
// because we must compute the exact distance from the high stack address on the
// coroutine entry point code path. The write happens inside offsetOnce.Do, and
// sync.Once guarantees that it is visible to every goroutine whose call to Do
// has returned; reads of offset made after passing through execute therefore
// do not race with the write.
var (
// offset is the distance in bytes between the high address of the stack of
// a goroutine backing a coroutine and the v argument of execute.
offset uintptr
// offsetOnce guards the one-time computation of offset in execute.
offsetOnce sync.Once
)
// The load function returns the value passed as first argument to the call to
// execute that started the coroutine.
//
// The value is read from the current goroutine stack, offset bytes below its
// high address, which is where execute keeps its v argument. The result is
// only meaningful on a goroutine that entered through execute.
func load() any {
	addr := getg().stack.hi - offset
	return *(*any)(unsafe.Pointer(addr))
}
// The execute function is the entry point of coroutines, it keeps the
// coroutine context in v on the stack, registering it to be retrieved by
// calling load, and invokes f as the entry point.
//
// The coroutine continues execution until a yield point is reached or until
// the function passed as entry point returns.
//
// The function has go:nosplit because the address of its local variables must
// remain stable, and go:noinline so that v lives in a stack frame of its own.
//
//go:nosplit
//go:noinline
func execute(v any, f func()) {
// Address of the v argument, used to measure its distance from the high
// address of the goroutine stack.
p := unsafe.Pointer(&v)
offsetOnce.Do(func() {
g := getg()
// In volatile mode a new goroutine is started to back each coroutine,
// which means that we have control over the distance from the call to
// execute and the base pointer of the goroutine stack; we can store the
// offset in a global. The write is performed at most once, guarded by
// offsetOnce; the distance is expected to be the same for every
// coroutine since they all reach this point through the same code path.
offset = g.stack.hi - uintptr(p)
})
f()
// Keep the variable alive so we know that it will remain on the stack and
// won't be erased by the GC.
runtime.KeepAlive(v)
}