Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ws_reverse_proxy): support dynamic route #20

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,12 @@ func main() {
}
```

| Configuration | Default | Description |
|----------------|---------------------------|------------------------------|
| `WithDirector` | `nil` | customize the forward header |
| `WithDialer` | `gorillaws.DefaultDialer` | for dialer customization |
| `WithUpgrader` | `hzws.HertzUpgrader` | for upgrader customization |
| Configuration | Default | Description |
|--------------------|---------------------------|-------------------------------------------------------------|
| `WithDirector` | `nil` | customize the forward header |
| `WithDialer` | `gorillaws.DefaultDialer` | for dialer customization |
| `WithUpgrader` | `hzws.HertzUpgrader` | for upgrader customization |
| `WithDynamicRoute` | `false` | enable dynamic route (proxy url = handler url + target url) |

### More info
See [example](https://github.com/cloudwego/hertz-examples)
9 changes: 6 additions & 3 deletions ws_reverse_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
"net/http"

"github.com/bytedance/gopkg/util/gopool"

"github.com/cloudwego/hertz/pkg/app"
"github.com/cloudwego/hertz/pkg/common/hlog"
"github.com/cloudwego/hertz/pkg/protocol"
Expand Down Expand Up @@ -81,9 +80,13 @@ func (w *WSReverseProxy) ServeHTTP(ctx context.Context, c *app.RequestContext) {
if w.options.Director != nil {
w.options.Director(ctx, c, forwardHeader)
}
connBackend, respBackend, err := w.options.Dialer.Dial(w.target, forwardHeader)
target := w.target
if w.options.DynamicRoute {
target = w.target + b2s(c.Path())
}
connBackend, respBackend, err := w.options.Dialer.Dial(target, forwardHeader)
if err != nil {
hlog.CtxErrorf(ctx, "can not dial to remote backend(%v): %v", w.target, err)
hlog.CtxErrorf(ctx, "can not dial to remote backend(%v): %v", target, err)
if respBackend != nil {
if err = wsCopyResponse(&c.Response, respBackend); err != nil {
hlog.CtxErrorf(ctx, "can not copy response: %v", err)
Expand Down
16 changes: 13 additions & 3 deletions ws_reverse_proxy_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ type Director func(ctx context.Context, c *app.RequestContext, forwardHeader htt
type Option func(o *Options)

type Options struct {
Director Director
Dialer *websocket.Dialer
Upgrader *hzws.HertzUpgrader
Director Director
Dialer *websocket.Dialer
Upgrader *hzws.HertzUpgrader
DynamicRoute bool
}

var DefaultOptions = &Options{
Expand All @@ -40,6 +41,7 @@ var DefaultOptions = &Options{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
},
DynamicRoute: false,
}

func newOptions(opts ...Option) *Options {
Expand Down Expand Up @@ -79,3 +81,11 @@ func WithUpgrader(upgrader *hzws.HertzUpgrader) Option {
o.Upgrader = upgrader
}
}

// WithDynamicRoute enable dynamic route
// proxy url = handler url + target url
func WithDynamicRoute() Option {
return func(o *Options) {
o.DynamicRoute = true
}
}
3 changes: 3 additions & 0 deletions ws_reverse_proxy_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,18 @@ func TestOptions(t *testing.T) {
WithDirector(director),
WithDialer(dialer),
WithUpgrader(upgrader),
WithDynamicRoute(),
)
assert.DeepEqual(t, fmt.Sprintf("%p", director), fmt.Sprintf("%p", options.Director))
assert.DeepEqual(t, fmt.Sprintf("%p", dialer), fmt.Sprintf("%p", options.Dialer))
assert.DeepEqual(t, fmt.Sprintf("%p", upgrader), fmt.Sprintf("%p", options.Upgrader))
assert.DeepEqual(t, true, options.DynamicRoute)
}

func TestDefaultOptions(t *testing.T) {
options := newOptions()
assert.Nil(t, options.Director)
assert.DeepEqual(t, DefaultOptions.Dialer, options.Dialer)
assert.DeepEqual(t, DefaultOptions.Upgrader, options.Upgrader)
assert.DeepEqual(t, DefaultOptions.DynamicRoute, options.DynamicRoute)
}
131 changes: 131 additions & 0 deletions ws_reverse_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,134 @@ func TestProxy(t *testing.T) {
assert.DeepEqual(t, websocket.TextMessage, msgType)
assert.DeepEqual(t, msg, string(data))
}

var dynamicBackendURL = "ws://127.0.0.1:9001/api"

func TestProxyWithDynamicRoute(t *testing.T) {
// websocket proxy
supportedSubProtocols := []string{"test-protocol"}
upgrader := &hzws.HertzUpgrader{
ReadBufferSize: 4096,
WriteBufferSize: 4096,
CheckOrigin: func(c *app.RequestContext) bool {
return true
},
Subprotocols: supportedSubProtocols,
}

// enable dynamic route option
proxy := NewWSReverseProxy(dynamicBackendURL, WithUpgrader(upgrader), WithDynamicRoute())

// proxy server
ps := server.Default(server.WithHostPorts(":9000"))
ps.NoHijackConnPool = true
ps.GET("/test", proxy.ServeHTTP)
ps.GET("/test2", proxy.ServeHTTP)
go ps.Spin()

time.Sleep(time.Second * 1)

go func() {
// backend server
bs := server.Default(server.WithHostPorts(":9001"))
bs.NoHijackConnPool = true
bs.GET("/api/test", func(ctx context.Context, c *app.RequestContext) {
// Don't upgrade if original host header isn't preserved
host := string(c.Host())
if host != "127.0.0.1:9000" {
hlog.Errorf("Host header set incorrectly. Expecting 127.0.0.1:9000 got %s", host)
return
}

if err := upgrader.Upgrade(c, func(conn *hzws.Conn) {
msgType, msg, err := conn.ReadMessage()
assert.Nil(t, err)

if err = conn.WriteMessage(msgType, msg); err != nil {
return
}
}); err != nil {
hlog.Errorf("upgrade error: %v", err)
return
}
})
bs.GET("/api/test2", func(ctx context.Context, c *app.RequestContext) {
// Don't upgrade if original host header isn't preserved
host := string(c.Host())
if host != "127.0.0.1:9000" {
hlog.Errorf("Host header set incorrectly. Expecting 127.0.0.1:9000 got %s", host)
return
}

if err := upgrader.Upgrade(c, func(conn *hzws.Conn) {
msgType, msg, err := conn.ReadMessage()
assert.Nil(t, err)

if err = conn.WriteMessage(msgType, msg); err != nil {
return
}
}); err != nil {
hlog.Errorf("upgrade error: %v", err)
return
}
})
bs.Spin()
}()

time.Sleep(time.Second * 1)

// only one is supported by the server
clientSubProtocols := []string{"test-protocol", "test-notsupported"}
h := http.Header{}
for _, subproto := range clientSubProtocols {
h.Add("Sec-WebSocket-Protocol", subproto)
}

// client
conn, resp, err := websocket.DefaultDialer.Dial("ws://127.0.0.1:9000/test", h)
assert.Nil(t, err)
conn2, resp2, err := websocket.DefaultDialer.Dial("ws://127.0.0.1:9000/test2", h)
assert.Nil(t, err)

// check if the server really accepted the correct protocol
in := func(desired string) bool {
for _, proto := range resp.Header[http.CanonicalHeaderKey("Sec-WebSocket-Protocol")] {
if desired == proto {
return true
}
}
return false
}
in2 := func(desired string) bool {
for _, proto := range resp2.Header[http.CanonicalHeaderKey("Sec-WebSocket-Protocol")] {
if desired == proto {
return true
}
}
return false
}

assert.True(t, in("test-protocol"))
assert.True(t, in2("test-protocol"))
assert.False(t, in("test-notsupported"))
assert.False(t, in2("test-notsupported"))

// now write a message and send it to the proxy
msg := "hello world"
err = conn.WriteMessage(websocket.TextMessage, []byte(msg))
assert.Nil(t, err)

msg2 := "hello world2"
err = conn2.WriteMessage(websocket.TextMessage, []byte(msg2))
assert.Nil(t, err)

msgType, data, err := conn.ReadMessage()
assert.Nil(t, err)
assert.DeepEqual(t, websocket.TextMessage, msgType)
assert.DeepEqual(t, msg, string(data))

msgType2, data2, err := conn2.ReadMessage()
assert.Nil(t, err)
assert.DeepEqual(t, websocket.TextMessage, msgType2)
assert.DeepEqual(t, msg2, string(data2))
}
Loading