-
Notifications
You must be signed in to change notification settings - Fork 64
/
Copy pathservice_provider_client.go
93 lines (75 loc) · 1.47 KB
/
service_provider_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package goroslib
import (
"context"
"net"
"reflect"
"time"
"github.com/bluenviron/goroslib/v2/pkg/prototcp"
)
type serviceProviderClient struct {
sp *ServiceProvider
callerID string
nconn net.Conn
tconn *prototcp.Conn
ctx context.Context
ctxCancel func()
}
func newServiceProviderClient(
sp *ServiceProvider,
callerID string,
nconn net.Conn,
tconn *prototcp.Conn,
) *serviceProviderClient {
ctx, ctxCancel := context.WithCancel(sp.ctx)
spc := &serviceProviderClient{
sp: sp,
callerID: callerID,
nconn: nconn,
tconn: tconn,
ctx: ctx,
ctxCancel: ctxCancel,
}
if sp.conf.onClient != nil {
sp.conf.onClient()
}
sp.clientsWg.Add(1)
go spc.run()
return spc
}
func (spc *serviceProviderClient) run() {
defer spc.sp.clientsWg.Done()
readErr := make(chan error)
go func() {
readErr <- spc.runReader()
}()
select {
case <-readErr:
spc.nconn.Close()
case <-spc.ctx.Done():
spc.nconn.Close()
<-readErr
}
spc.ctxCancel()
select {
case spc.sp.clientClose <- spc:
case <-spc.sp.ctx.Done():
}
}
func (spc *serviceProviderClient) runReader() error {
spc.nconn.SetReadDeadline(time.Time{})
for {
req := reflect.New(reflect.TypeOf(spc.sp.srvReq)).Interface()
err := spc.tconn.ReadMessage(req)
if err != nil {
return err
}
select {
case spc.sp.clientRequest <- serviceProviderClientRequestReq{
spc: spc,
req: req,
}:
case <-spc.sp.ctx.Done():
return ErrProviderTerminated
}
}
}