-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathrequest.go
206 lines (170 loc) · 5.06 KB
/
request.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
// SPDX-FileCopyrightText: 2021 Henry Bubert
//
// SPDX-License-Identifier: MIT
package muxrpc
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"runtime/debug"
"strings"
"github.com/ssbc/go-luigi"
"github.com/ssbc/go-muxrpc/v2/codec"
)
// RequestEncoding hides the specifics of codec.Flag.
type RequestEncoding uint

// Binary, string and JSON are the three supported format types.
// Having both string and binary simply mirrors the JavaScript implementation.
const (
	TypeBinary RequestEncoding = iota
	TypeString
	TypeJSON
)

// IsValid reports whether rt is one of the known encodings
// (TypeBinary, TypeString or TypeJSON).
//
// RequestEncoding is an unsigned type, so a lower-bound check can never
// fail; only the upper bound has to be verified.
func (rt RequestEncoding) IsValid() bool {
	return rt <= TypeJSON
}
// asCodecFlag translates the encoding into the matching codec.Flag.
// TypeBinary maps to the zero flag, TypeString to codec.FlagString and
// TypeJSON to codec.FlagJSON. Any other value yields an error.
func (rt RequestEncoding) asCodecFlag() (codec.Flag, error) {
	if rt.IsValid() {
		switch rt {
		case TypeJSON:
			return codec.FlagJSON, nil
		case TypeString:
			return codec.FlagString, nil
		case TypeBinary:
			return 0, nil
		}
	}
	// both the validity guard and an unmatched switch end up here
	return 0, fmt.Errorf("muxrpc: invalid request encoding %d", rt)
}
// Method defines the name of the endpoint as a list of name segments.
type Method []string

// UnmarshalJSON decodes a method name from JSON.
//
// The regular form is an array of strings (i.e. ["whoami"]). If d does not
// decode as such, a bare JSON string is accepted as a fallback and turned
// into a single-element Method. If both attempts fail, the error of the
// string attempt is wrapped and returned, and *m is left untouched.
func (m *Method) UnmarshalJSON(d []byte) error {
	var segments []string
	if arrErr := json.Unmarshal(d, &segments); arrErr == nil {
		*m = segments
		return nil
	}

	// ugly 'manifest' hack. everything else is an array of strings (ie ['whoami'])
	var single string
	if strErr := json.Unmarshal(d, &single); strErr != nil {
		return fmt.Errorf("muxrpc/method: error decoding packet: %w", strErr)
	}

	*m = Method{single}
	return nil
}

// String returns the segments of the method joined by dots.
func (m Method) String() string {
	segments := []string(m)
	return strings.Join(segments, ".")
}
// Request assembles the state of an RPC call.
type Request struct {
// Stream is a legacy adapter for luigi-powered streams.
// It is excluded from JSON encoding and decoding.
Stream Stream `json:"-"`
// Method is the name of the called function (JSON key "name").
Method Method `json:"name"`
// RawArgs contains the call arguments as undecoded JSON (JSON key "args").
RawArgs json.RawMessage `json:"args"`
// Type is the type of the call, i.e. async, sink, source or duplex (JSON key "type").
Type CallType `json:"type"`
// luigi-less iterators; handed out by ResponseSink and ResponseSource.
sink *ByteSink
source *ByteSource
// same as packet.Req - the numerical identifier for the stream
id int32
// abort is used to stop producing more data on this request;
// the calling side might tell us they had enough of this stream.
abort context.CancelFunc
// remoteAddr is the value returned by RemoteAddr.
remoteAddr net.Addr
// endpoint is the value returned by Endpoint; CloseWithError also calls closeStream on it.
endpoint *rpc
}
// Endpoint returns the client instance to start new calls.
// Mostly useful inside handlers.
func (req Request) Endpoint() Endpoint {
	return req.endpoint
}
// RemoteAddr returns the netwrap'ed network address of the underlying
// connection. This is usually a pair of secretstream.Addr and TCP.
func (req Request) RemoteAddr() net.Addr {
	return req.remoteAddr
}
// ResponseSink returns the response writer for incoming source requests.
//
// It is only available on "source" and "duplex" calls; for any other call
// type an ErrWrongStreamType carrying the request's type is returned.
func (req *Request) ResponseSink() (*ByteSink, error) {
	switch req.Type {
	case "source", "duplex":
		return req.sink, nil
	default:
		return nil, ErrWrongStreamType{req.Type}
	}
}
// ResponseSource returns the reader for incoming data of sink or duplex calls.
//
// For any other call type an ErrWrongStreamType carrying the request's
// type is returned.
func (req *Request) ResponseSource() (*ByteSource, error) {
	switch req.Type {
	case "sink", "duplex":
		return req.source, nil
	default:
		return nil, ErrWrongStreamType{req.Type}
	}
}
// Args is a legacy stub to get the unmarshaled JSON arguments.
//
// Every call prints a deprecation warning to standard output and dumps the
// current stack via debug.PrintStack so remaining callers can be located.
//
// Deprecated: use RawArgs wherever possible.
func (req *Request) Args() []interface{} {
	fmt.Println("[muxrpc/deprecation] warning: please use RawArgs where ever possible")
	debug.PrintStack()

	var args []interface{}
	// Best effort: a decoding error is deliberately dropped and whatever
	// ended up in args (possibly nothing) is handed back as-is.
	_ = json.Unmarshal(req.RawArgs, &args)
	return args
}
// Return is a helper that sends the result value of an async (or sync) call.
//
// A value of type string is written as-is using TypeString; every other
// value is marshaled with encoding/json and written using TypeJSON.
// The ctx argument is currently not used by this method.
//
// An error is returned if the call type is neither "async" nor "sync",
// if marshaling v fails, or if writing to the sink fails.
func (req *Request) Return(ctx context.Context, v interface{}) error {
	switch req.Type {
	case "async", "sync":
		// these are the only call types that carry a single return value
	default:
		return fmt.Errorf("cannot return value on %q stream", req.Type)
	}

	var payload []byte
	if str, isString := v.(string); isString {
		req.sink.SetEncoding(TypeString)
		payload = []byte(str)
	} else {
		// the encoding is switched before marshaling, even if that then fails
		req.sink.SetEncoding(TypeJSON)
		encoded, err := json.Marshal(v)
		if err != nil {
			return fmt.Errorf("muxrpc: error marshaling return value: %w", err)
		}
		payload = encoded
	}

	_, err := req.sink.Write(payload)
	if err != nil {
		return fmt.Errorf("muxrpc: error writing return value: %w", err)
	}
	return nil
}
// CloseWithError is used to close an ongoing request, i.e. to instruct the
// remote to stop sending data or to notify it that a stream couldn't be
// fully filled because of an error.
//
// A nil cerr, io.EOF and luigi.EOS are treated as a regular end: the source
// is canceled with nil and the sink is closed without an error. Any other
// cerr is passed on to both. In every case the endpoint's closeStream is
// called with the original cerr afterwards. The returned error is always nil.
func (req *Request) CloseWithError(cerr error) error {
	switch {
	case cerr == nil, errors.Is(cerr, io.EOF), errors.Is(cerr, luigi.EOS{}):
		req.source.Cancel(nil)
		req.sink.Close()
	default:
		req.source.Cancel(cerr)
		req.sink.CloseWithError(cerr)
	}

	// this is a bit ugly but CloseWithError() is the function that HandlerMux uses when replying with "no such command"
	req.endpoint.closeStream(req, cerr)

	return nil
}
// Close closes the stream with io.EOF. It is shorthand for calling
// CloseWithError(io.EOF) and returns whatever that call returns.
func (req *Request) Close() error {
	reason := io.EOF
	return req.CloseWithError(reason)
}
// CallType is the type of a call. The values used in this file are
// "async", "sync", "source", "sink" and "duplex".
type CallType string

// Flags returns the packet flags of the respective call type:
// codec.FlagStream for "source", "sink" and "duplex" calls and the zero
// flag for everything else.
func (t CallType) Flags() codec.Flag {
	if t == "source" || t == "sink" || t == "duplex" {
		return codec.FlagStream
	}
	return 0
}