From e80ccc3f1cd8c73ade665aee5463933ecf6f55a0 Mon Sep 17 00:00:00 2001 From: sbruens Date: Fri, 6 Sep 2024 12:16:48 -0400 Subject: [PATCH 01/13] refactor: create re-usable service that can be re-used by Caddy --- cmd/outline-ss-server/main.go | 114 ++--- cmd/outline-ss-server/metrics.go | 550 +----------------------- cmd/outline-ss-server/metrics_test.go | 184 ++------- cmd/outline-ss-server/server_test.go | 9 +- prometheus/metrics.go | 574 ++++++++++++++++++++++++++ prometheus/metrics_test.go | 226 ++++++++++ service/service.go | 130 ++++++ 7 files changed, 1025 insertions(+), 762 deletions(-) create mode 100644 prometheus/metrics.go create mode 100644 prometheus/metrics_test.go create mode 100644 service/service.go diff --git a/cmd/outline-ss-server/main.go b/cmd/outline-ss-server/main.go index e61150c7..6fd84dfa 100644 --- a/cmd/outline-ss-server/main.go +++ b/cmd/outline-ss-server/main.go @@ -16,7 +16,6 @@ package main import ( "container/list" - "context" "flag" "fmt" "log/slog" @@ -29,9 +28,9 @@ import ( "syscall" "time" - "github.com/Jigsaw-Code/outline-sdk/transport" "github.com/Jigsaw-Code/outline-sdk/transport/shadowsocks" "github.com/Jigsaw-Code/outline-ss-server/ipinfo" + outline_prometheus "github.com/Jigsaw-Code/outline-ss-server/prometheus" "github.com/Jigsaw-Code/outline-ss-server/service" "github.com/lmittmann/tint" "github.com/prometheus/client_golang/prometheus" @@ -59,11 +58,12 @@ func init() { } type SSServer struct { - stopConfig func() error - lnManager service.ListenerManager - natTimeout time.Duration - m *outlineMetrics - replayCache service.ReplayCache + stopConfig func() error + lnManager service.ListenerManager + natTimeout time.Duration + serverMetrics *serverMetrics + serviceMetrics service.ServiceMetrics + replayCache service.ReplayCache } func (s *SSServer) loadConfig(filename string) error { @@ -120,32 +120,6 @@ func newCipherListFromConfig(config ServiceConfig) (service.CipherList, error) { return ciphers, nil } -func (s *SSServer) NewShadowsocksStreamHandler(ciphers service.CipherList) service.StreamHandler { - authFunc := service.NewShadowsocksStreamAuthenticator(ciphers, &s.replayCache, s.m.tcpServiceMetrics) - // TODO: Register initial data metrics at zero. - return service.NewStreamHandler(authFunc, tcpReadTimeout) -} - -func (s *SSServer) NewShadowsocksPacketHandler(ciphers service.CipherList) service.PacketHandler { - return service.NewPacketHandler(s.natTimeout, ciphers, s.m, s.m.udpServiceMetrics) -} - -func (s *SSServer) NewShadowsocksStreamHandlerFromConfig(config ServiceConfig) (service.StreamHandler, error) { - ciphers, err := newCipherListFromConfig(config) - if err != nil { - return nil, err - } - return s.NewShadowsocksStreamHandler(ciphers), nil -} - -func (s *SSServer) NewShadowsocksPacketHandlerFromConfig(config ServiceConfig) (service.PacketHandler, error) { - ciphers, err := newCipherListFromConfig(config) - if err != nil { - return nil, err - } - return s.NewShadowsocksPacketHandler(ciphers), nil -} - type listenerSet struct { manager service.ListenerManager listenerCloseFuncs map[string]func() error @@ -243,31 +217,41 @@ func (s *SSServer) runConfig(config Config) (func() error, error) { ciphers := service.NewCipherList() ciphers.Update(cipherList) - sh := s.NewShadowsocksStreamHandler(ciphers) + ssService, err := service.NewService( + service.WithCiphers(ciphers), + service.WithNatTimeout(s.natTimeout), + service.WithMetrics(s.serviceMetrics), + service.WithReplayCache(&s.replayCache), + ) ln, err := lnSet.ListenStream(addr) if err != nil { return err } slog.Info("TCP service started.", "address", ln.Addr().String()) - go service.StreamServe(ln.AcceptStream, func(ctx context.Context, conn transport.StreamConn) { - connMetrics := s.m.AddOpenTCPConnection(conn) - sh.Handle(ctx, conn, connMetrics) - }) + go service.StreamServe(ln.AcceptStream, ssService.HandleStream) pc, err := lnSet.ListenPacket(addr) if err != nil { return err } slog.Info("UDP service started.", "address", pc.LocalAddr().String()) - ph := s.NewShadowsocksPacketHandler(ciphers) - go ph.Handle(pc) + go ssService.HandlePacket(pc) } for _, serviceConfig := range config.Services { - var ( - sh service.StreamHandler - ph service.PacketHandler + ciphers, err := newCipherListFromConfig(serviceConfig) + if err != nil { + return fmt.Errorf("failed to create cipher list from config: %v", err) + } + ssService, err := service.NewService( + service.WithCiphers(ciphers), + service.WithNatTimeout(s.natTimeout), + service.WithMetrics(s.serviceMetrics), + service.WithReplayCache(&s.replayCache), ) + if err != nil { + return err + } for _, lnConfig := range serviceConfig.Listeners { switch lnConfig.Type { case listenerTypeTCP: @@ -276,36 +260,21 @@ func (s *SSServer) runConfig(config Config) (func() error, error) { return err } slog.Info("TCP service started.", "address", ln.Addr().String()) - if sh == nil { - sh, err = s.NewShadowsocksStreamHandlerFromConfig(serviceConfig) - if err != nil { - return err - } - } - go service.StreamServe(ln.AcceptStream, func(ctx context.Context, conn transport.StreamConn) { - connMetrics := s.m.AddOpenTCPConnection(conn) - sh.Handle(ctx, conn, connMetrics) - }) + go service.StreamServe(ln.AcceptStream, ssService.HandleStream) case listenerTypeUDP: pc, err := lnSet.ListenPacket(lnConfig.Address) if err != nil { return err } slog.Info("UDP service started.", "address", pc.LocalAddr().String()) - if ph == nil { - ph, err = s.NewShadowsocksPacketHandlerFromConfig(serviceConfig) - if err != nil { - return err - } - } - go ph.Handle(pc) + go ssService.HandlePacket(pc) } } totalCipherCount += len(serviceConfig.Keys) } slog.Info("Loaded config.", "access_keys", totalCipherCount, "listeners", lnSet.Len()) - s.m.SetNumAccessKeys(totalCipherCount, lnSet.Len()) + s.serverMetrics.SetNumAccessKeys(totalCipherCount, lnSet.Len()) return nil }() @@ -341,12 +310,13 @@ func (s *SSServer) Stop() error { } // RunSSServer starts a shadowsocks server running, and returns the server or an error. -func RunSSServer(filename string, natTimeout time.Duration, sm *outlineMetrics, replayHistory int) (*SSServer, error) { +func RunSSServer(filename string, natTimeout time.Duration, serverMetrics *serverMetrics, serviceMetrics service.ServiceMetrics, replayHistory int) (*SSServer, error) { server := &SSServer{ - lnManager: service.NewListenerManager(), - natTimeout: natTimeout, - m: sm, - replayCache: service.NewReplayCache(replayHistory), + lnManager: service.NewListenerManager(), + natTimeout: natTimeout, + serverMetrics: serverMetrics, + serviceMetrics: serviceMetrics, + replayCache: service.NewReplayCache(replayHistory), } err := server.loadConfig(filename) if err != nil { @@ -424,14 +394,16 @@ func main() { } defer ip2info.Close() - metrics, err := newPrometheusOutlineMetrics(ip2info) + serverMetrics := newPrometheusServerMetrics() + serverMetrics.SetVersion(version) + serviceMetrics, err := outline_prometheus.NewServiceMetrics(ip2info) if err != nil { - slog.Error("Failed to create Outline Prometheus metrics. Aborting.", "err", err) + slog.Error("Failed to create Outline Prometheus service metrics. Aborting.", "err", err) } - metrics.SetBuildInfo(version) r := prometheus.WrapRegistererWithPrefix("shadowsocks_", prometheus.DefaultRegisterer) - r.MustRegister(metrics) - _, err = RunSSServer(flags.ConfigFile, flags.natTimeout, metrics, flags.replayHistory) + r.MustRegister(serverMetrics, serviceMetrics) + + _, err = RunSSServer(flags.ConfigFile, flags.natTimeout, serverMetrics, serviceMetrics, flags.replayHistory) if err != nil { slog.Error("Server failed to start. Aborting.", "err", err) } diff --git a/cmd/outline-ss-server/metrics.go b/cmd/outline-ss-server/metrics.go index 5766163a..32c9b0aa 100644 --- a/cmd/outline-ss-server/metrics.go +++ b/cmd/outline-ss-server/metrics.go @@ -15,495 +15,27 @@ package main import ( - "fmt" - "log/slog" - "net" - "net/netip" - "sync" "time" - "github.com/Jigsaw-Code/outline-ss-server/ipinfo" - "github.com/Jigsaw-Code/outline-ss-server/service" - "github.com/Jigsaw-Code/outline-ss-server/service/metrics" "github.com/prometheus/client_golang/prometheus" ) // `now` is stubbable for testing. var now = time.Now -func NewTimeToCipherVec(proto string) (prometheus.ObserverVec, error) { - vec := prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Name: "time_to_cipher_ms", - Help: "Time needed to find the cipher", - Buckets: []float64{0.1, 1, 10, 100, 1000}, - }, []string{"proto", "found_key"}) - return vec.CurryWith(map[string]string{"proto": proto}) -} - -type proxyCollector struct { - // NOTE: New metrics need to be added to `newProxyCollector()`, `Describe()` and `Collect()`. - dataBytesPerKey *prometheus.CounterVec - dataBytesPerLocation *prometheus.CounterVec -} - -func newProxyCollector(proto string) (*proxyCollector, error) { - dataBytesPerKey, err := prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "data_bytes", - Help: "Bytes transferred by the proxy, per access key", - }, []string{"proto", "dir", "access_key"}).CurryWith(map[string]string{"proto": proto}) - if err != nil { - return nil, err - } - dataBytesPerLocation, err := prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "data_bytes_per_location", - Help: "Bytes transferred by the proxy, per location", - }, []string{"proto", "dir", "location", "asn", "asorg"}).CurryWith(map[string]string{"proto": proto}) - if err != nil { - return nil, err - } - return &proxyCollector{ - dataBytesPerKey: dataBytesPerKey, - dataBytesPerLocation: dataBytesPerLocation, - }, nil -} - -func (c *proxyCollector) Describe(ch chan<- *prometheus.Desc) { - c.dataBytesPerKey.Describe(ch) - c.dataBytesPerLocation.Describe(ch) -} - -func (c *proxyCollector) Collect(ch chan<- prometheus.Metric) { - c.dataBytesPerKey.Collect(ch) - c.dataBytesPerLocation.Collect(ch) -} - -func (c *proxyCollector) addClientTarget(clientProxyBytes, proxyTargetBytes int64, accessKey string, clientInfo ipinfo.IPInfo) { - addIfNonZero(clientProxyBytes, c.dataBytesPerKey, "c>p", accessKey) - addIfNonZero(clientProxyBytes, c.dataBytesPerLocation, "c>p", clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN.Number), clientInfo.ASN.Organization) - addIfNonZero(proxyTargetBytes, c.dataBytesPerKey, "p>t", accessKey) - addIfNonZero(proxyTargetBytes, c.dataBytesPerLocation, "p>t", clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN.Number), clientInfo.ASN.Organization) -} - -func (c *proxyCollector) addTargetClient(targetProxyBytes, proxyClientBytes int64, accessKey string, clientInfo ipinfo.IPInfo) { - addIfNonZero(targetProxyBytes, c.dataBytesPerKey, "p 0 { - counterVec.WithLabelValues(lvs...).Add(float64(value)) - } -} - -func asnLabel(asn int) string { - if asn == 0 { - return "" - } - return fmt.Sprint(asn) -} - -// Converts a [net.Addr] to an [IPKey]. -func toIPKey(addr net.Addr, accessKey string) (*IPKey, error) { - hostname, _, err := net.SplitHostPort(addr.String()) - if err != nil { - return nil, fmt.Errorf("failed to create IPKey: %w", err) - } - ip, err := netip.ParseAddr(hostname) - if err != nil { - return nil, fmt.Errorf("failed to create IPKey: %w", err) - } - return &IPKey{ip, accessKey}, nil -} diff --git a/cmd/outline-ss-server/metrics_test.go b/cmd/outline-ss-server/metrics_test.go index 15a112bf..93cce446 100644 --- a/cmd/outline-ss-server/metrics_test.go +++ b/cmd/outline-ss-server/metrics_test.go @@ -21,7 +21,6 @@ import ( "time" "github.com/Jigsaw-Code/outline-ss-server/ipinfo" - "github.com/Jigsaw-Code/outline-ss-server/service/metrics" "github.com/op/go-logging" "github.com/prometheus/client_golang/prometheus" promtest "github.com/prometheus/client_golang/prometheus/testutil" @@ -63,166 +62,49 @@ func (c *fakeConn) RemoteAddr() net.Addr { } func TestMethodsDontPanic(t *testing.T) { - ssMetrics, _ := newPrometheusOutlineMetrics(nil) - proxyMetrics := metrics.ProxyMetrics{ - ClientProxy: 1, - ProxyTarget: 2, - TargetProxy: 3, - ProxyClient: 4, - } - addr := fakeAddr("127.0.0.1:9") - ssMetrics.SetBuildInfo("0.0.0-test") - ssMetrics.SetNumAccessKeys(20, 2) - - tcpMetrics := ssMetrics.AddOpenTCPConnection(&fakeConn{}) - tcpMetrics.AddAuthenticated("0") - tcpMetrics.AddClosed("OK", proxyMetrics, 10*time.Millisecond) - tcpMetrics.AddProbe("ERR_CIPHER", "eof", proxyMetrics.ClientProxy) - - udpMetrics := ssMetrics.AddUDPNatEntry(addr, "key-1") - udpMetrics.AddPacketFromClient("OK", 10, 20) - udpMetrics.AddPacketFromTarget("OK", 10, 20) - udpMetrics.RemoveNatEntry() - - ssMetrics.tcpServiceMetrics.AddCipherSearch(true, 10*time.Millisecond) - ssMetrics.udpServiceMetrics.AddCipherSearch(true, 10*time.Millisecond) -} - -func TestASNLabel(t *testing.T) { - require.Equal(t, "", asnLabel(0)) - require.Equal(t, "100", asnLabel(100)) + m := newPrometheusServerMetrics() + m.SetVersion("0.0.0-test") + m.SetNumAccessKeys(20, 2) } -func TestTunnelTime(t *testing.T) { - t.Run("PerKey", func(t *testing.T) { - setNow(time.Date(2010, 1, 2, 3, 4, 5, .0, time.Local)) - ssMetrics, _ := newPrometheusOutlineMetrics(nil) - reg := prometheus.NewPedanticRegistry() - reg.MustRegister(ssMetrics) - - connMetrics := ssMetrics.AddOpenTCPConnection(&fakeConn{}) - connMetrics.AddAuthenticated("key-1") - setNow(time.Date(2010, 1, 2, 3, 4, 20, .0, time.Local)) - - expected := strings.NewReader(` - # HELP tunnel_time_seconds Tunnel time, per access key. - # TYPE tunnel_time_seconds counter - tunnel_time_seconds{access_key="key-1"} 15 - `) - err := promtest.GatherAndCompare( - reg, - expected, - "tunnel_time_seconds", - ) - require.NoError(t, err, "unexpected metric value found") - }) - - t.Run("PerLocation", func(t *testing.T) { - setNow(time.Date(2010, 1, 2, 3, 4, 5, .0, time.Local)) - ssMetrics, _ := newPrometheusOutlineMetrics(&noopMap{}) - reg := prometheus.NewPedanticRegistry() - reg.MustRegister(ssMetrics) - - connMetrics := ssMetrics.AddOpenTCPConnection(&fakeConn{}) - connMetrics.AddAuthenticated("key-1") - setNow(time.Date(2010, 1, 2, 3, 4, 10, .0, time.Local)) - - expected := strings.NewReader(` - # HELP tunnel_time_seconds_per_location Tunnel time, per location. - # TYPE tunnel_time_seconds_per_location counter - tunnel_time_seconds_per_location{asn="",asorg="",location="XL"} 5 - `) - err := promtest.GatherAndCompare( - reg, - expected, - "tunnel_time_seconds_per_location", - ) - require.NoError(t, err, "unexpected metric value found") - }) -} - -func TestTunnelTimePerKeyDoesNotPanicOnUnknownClosedConnection(t *testing.T) { +func TestSetVersion(t *testing.T) { + m := newPrometheusServerMetrics() reg := prometheus.NewPedanticRegistry() - ssMetrics, _ := newPrometheusOutlineMetrics(nil) + reg.MustRegister(m) - connMetrics := ssMetrics.AddOpenTCPConnection(&fakeConn{}) - connMetrics.AddClosed("OK", metrics.ProxyMetrics{}, time.Minute) + m.SetVersion("0.0.0-test") err := promtest.GatherAndCompare( reg, - strings.NewReader(""), - "tunnel_time_seconds", + strings.NewReader(` + # HELP build_info Information on the outline-ss-server build + # TYPE build_info gauge + build_info{version="0.0.0-test"} 1 + `), + "build_info", ) - require.NoError(t, err, "unexpectedly found metric value") -} - -func BenchmarkOpenTCP(b *testing.B) { - ssMetrics, _ := newPrometheusOutlineMetrics(nil) - conn := &fakeConn{} - b.ResetTimer() - for i := 0; i < b.N; i++ { - ssMetrics.AddOpenTCPConnection(conn) - } -} - -func BenchmarkCloseTCP(b *testing.B) { - ssMetrics, _ := newPrometheusOutlineMetrics(nil) - connMetrics := ssMetrics.AddOpenTCPConnection(&fakeConn{}) - accessKey := "key 1" - status := "OK" - data := metrics.ProxyMetrics{} - duration := time.Minute - b.ResetTimer() - for i := 0; i < b.N; i++ { - connMetrics.AddAuthenticated(accessKey) - connMetrics.AddClosed(status, data, duration) - } + require.NoError(t, err, "unexpected metric value found") } -func BenchmarkProbe(b *testing.B) { - ssMetrics, _ := newPrometheusOutlineMetrics(nil) - connMetrics := ssMetrics.AddOpenTCPConnection(&fakeConn{}) - status := "ERR_REPLAY" - drainResult := "other" - data := metrics.ProxyMetrics{} - b.ResetTimer() - for i := 0; i < b.N; i++ { - connMetrics.AddProbe(status, drainResult, data.ClientProxy) - } -} +func TestSetNumAccessKeys(t *testing.T) { + m := newPrometheusServerMetrics() + reg := prometheus.NewPedanticRegistry() + reg.MustRegister(m) -func BenchmarkClientUDP(b *testing.B) { - ssMetrics, _ := newPrometheusOutlineMetrics(nil) - addr := fakeAddr("127.0.0.1:9") - accessKey := "key 1" - udpMetrics := ssMetrics.AddUDPNatEntry(addr, accessKey) - status := "OK" - size := int64(1000) - b.ResetTimer() - for i := 0; i < b.N; i++ { - udpMetrics.AddPacketFromClient(status, size, size) - } -} + m.SetNumAccessKeys(1, 2) -func BenchmarkTargetUDP(b *testing.B) { - ssMetrics, _ := newPrometheusOutlineMetrics(nil) - addr := fakeAddr("127.0.0.1:9") - accessKey := "key 1" - udpMetrics := ssMetrics.AddUDPNatEntry(addr, accessKey) - status := "OK" - size := int64(1000) - b.ResetTimer() - for i := 0; i < b.N; i++ { - udpMetrics.AddPacketFromTarget(status, size, size) - } -} - -func BenchmarkNAT(b *testing.B) { - ssMetrics, _ := newPrometheusOutlineMetrics(nil) - addr := fakeAddr("127.0.0.1:9") - b.ResetTimer() - for i := 0; i < b.N; i++ { - udpMetrics := ssMetrics.AddUDPNatEntry(addr, "key-0") - udpMetrics.RemoveNatEntry() - } + err := promtest.GatherAndCompare( + reg, + strings.NewReader(` + # HELP keys Count of access keys + # TYPE keys gauge + keys 1 + # HELP ports Count of open ports + # TYPE ports gauge + ports 2 + `), + "keys", + "ports", + ) + require.NoError(t, err, "unexpected metric value found") } diff --git a/cmd/outline-ss-server/server_test.go b/cmd/outline-ss-server/server_test.go index 20729a06..05999486 100644 --- a/cmd/outline-ss-server/server_test.go +++ b/cmd/outline-ss-server/server_test.go @@ -17,14 +17,17 @@ package main import ( "testing" "time" + + "github.com/Jigsaw-Code/outline-ss-server/prometheus" ) func TestRunSSServer(t *testing.T) { - m, err := newPrometheusOutlineMetrics(nil) + serverMetrics := newPrometheusServerMetrics() + serviceMetrics, err := prometheus.NewServiceMetrics(nil) if err != nil { - t.Fatalf("Failed to create Prometheus metrics: %v", err) + t.Fatalf("Failed to create Prometheus service metrics: %v", err) } - server, err := RunSSServer("config_example.yml", 30*time.Second, m, 10000) + server, err := RunSSServer("config_example.yml", 30*time.Second, serverMetrics, serviceMetrics, 10000) if err != nil { t.Fatalf("RunSSServer() error = %v", err) } diff --git a/prometheus/metrics.go b/prometheus/metrics.go new file mode 100644 index 00000000..186cba7c --- /dev/null +++ b/prometheus/metrics.go @@ -0,0 +1,574 @@ +// Copyright 2023 Jigsaw Operations LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package prometheus + +import ( + "fmt" + "log/slog" + "net" + "net/netip" + "sync" + "time" + + "github.com/Jigsaw-Code/outline-ss-server/ipinfo" + "github.com/Jigsaw-Code/outline-ss-server/service" + "github.com/Jigsaw-Code/outline-ss-server/service/metrics" + "github.com/prometheus/client_golang/prometheus" +) + +// `now` is stubbable for testing. +var now = time.Now + +func newTimeToCipherVec(proto string) (prometheus.ObserverVec, error) { + vec := prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "time_to_cipher_ms", + Help: "Time needed to find the cipher", + Buckets: []float64{0.1, 1, 10, 100, 1000}, + }, []string{"proto", "found_key"}) + return vec.CurryWith(map[string]string{"proto": proto}) +} + +type proxyCollector struct { + // NOTE: New metrics need to be added to `newProxyCollector()`, `Describe()` and `Collect()`. + dataBytesPerKey *prometheus.CounterVec + dataBytesPerLocation *prometheus.CounterVec +} + +func newProxyCollector(proto string) (*proxyCollector, error) { + dataBytesPerKey, err := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "data_bytes", + Help: "Bytes transferred by the proxy, per access key", + }, []string{"proto", "dir", "access_key"}).CurryWith(map[string]string{"proto": proto}) + if err != nil { + return nil, err + } + dataBytesPerLocation, err := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "data_bytes_per_location", + Help: "Bytes transferred by the proxy, per location", + }, []string{"proto", "dir", "location", "asn", "asorg"}).CurryWith(map[string]string{"proto": proto}) + if err != nil { + return nil, err + } + return &proxyCollector{ + dataBytesPerKey: dataBytesPerKey, + dataBytesPerLocation: dataBytesPerLocation, + }, nil +} + +func (c *proxyCollector) Describe(ch chan<- *prometheus.Desc) { + c.dataBytesPerKey.Describe(ch) + c.dataBytesPerLocation.Describe(ch) +} + +func (c *proxyCollector) Collect(ch chan<- prometheus.Metric) { + c.dataBytesPerKey.Collect(ch) + c.dataBytesPerLocation.Collect(ch) +} + +func (c *proxyCollector) addClientTarget(clientProxyBytes, proxyTargetBytes int64, accessKey string, clientInfo ipinfo.IPInfo) { + addIfNonZero(clientProxyBytes, c.dataBytesPerKey, "c>p", accessKey) + addIfNonZero(clientProxyBytes, c.dataBytesPerLocation, "c>p", clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN.Number), clientInfo.ASN.Organization) + addIfNonZero(proxyTargetBytes, c.dataBytesPerKey, "p>t", accessKey) + addIfNonZero(proxyTargetBytes, c.dataBytesPerLocation, "p>t", clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN.Number), clientInfo.ASN.Organization) +} + +func (c *proxyCollector) addTargetClient(targetProxyBytes, proxyClientBytes int64, accessKey string, clientInfo ipinfo.IPInfo) { + addIfNonZero(targetProxyBytes, c.dataBytesPerKey, "p 0 { + counterVec.WithLabelValues(lvs...).Add(float64(value)) + } +} + +func asnLabel(asn int) string { + if asn == 0 { + return "" + } + return fmt.Sprint(asn) +} + +// Converts a [net.Addr] to an [IPKey]. +func toIPKey(addr net.Addr, accessKey string) (*IPKey, error) { + hostname, _, err := net.SplitHostPort(addr.String()) + if err != nil { + return nil, fmt.Errorf("failed to create IPKey: %w", err) + } + ip, err := netip.ParseAddr(hostname) + if err != nil { + return nil, fmt.Errorf("failed to create IPKey: %w", err) + } + return &IPKey{ip, accessKey}, nil +} diff --git a/prometheus/metrics_test.go b/prometheus/metrics_test.go new file mode 100644 index 00000000..5dfcf05a --- /dev/null +++ b/prometheus/metrics_test.go @@ -0,0 +1,226 @@ +// Copyright 2023 Jigsaw Operations LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package prometheus + +import ( + "net" + "strings" + "testing" + "time" + + "github.com/Jigsaw-Code/outline-ss-server/ipinfo" + "github.com/Jigsaw-Code/outline-ss-server/service/metrics" + "github.com/op/go-logging" + "github.com/prometheus/client_golang/prometheus" + promtest "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" +) + +type noopMap struct{} + +func (*noopMap) GetIPInfo(ip net.IP) (ipinfo.IPInfo, error) { + return ipinfo.IPInfo{}, nil +} + +type fakeAddr string + +func (a fakeAddr) String() string { return string(a) } +func (a fakeAddr) Network() string { return "" } + +// Sets the processing clock to be t until changed. +func setNow(t time.Time) { + now = func() time.Time { + return t + } +} + +func init() { + logging.SetLevel(logging.INFO, "") +} + +type fakeConn struct { + net.Conn +} + +func (c *fakeConn) LocalAddr() net.Addr { + return fakeAddr("127.0.0.1:9") +} + +func (c *fakeConn) RemoteAddr() net.Addr { + return fakeAddr("127.0.0.1:10") +} + +func TestMethodsDontPanic(t *testing.T) { + ssMetrics, _ := NewServiceMetrics(nil) + proxyMetrics := metrics.ProxyMetrics{ + ClientProxy: 1, + ProxyTarget: 2, + TargetProxy: 3, + ProxyClient: 4, + } + addr := fakeAddr("127.0.0.1:9") + + tcpMetrics := ssMetrics.AddOpenTCPConnection(&fakeConn{}) + tcpMetrics.AddAuthenticated("0") + tcpMetrics.AddClosed("OK", proxyMetrics, 10*time.Millisecond) + tcpMetrics.AddProbe("ERR_CIPHER", "eof", proxyMetrics.ClientProxy) + + udpMetrics := ssMetrics.AddUDPNatEntry(addr, "key-1") + udpMetrics.AddPacketFromClient("OK", 10, 20) + udpMetrics.AddPacketFromTarget("OK", 10, 20) + udpMetrics.RemoveNatEntry() + + ssMetrics.tcpServiceMetrics.AddCipherSearch(true, 10*time.Millisecond) + ssMetrics.udpServiceMetrics.AddCipherSearch(true, 10*time.Millisecond) +} + +func TestASNLabel(t *testing.T) { + require.Equal(t, "", asnLabel(0)) + require.Equal(t, "100", asnLabel(100)) +} + +func TestTunnelTime(t *testing.T) { + t.Run("PerKey", func(t *testing.T) { + setNow(time.Date(2010, 1, 2, 3, 4, 5, .0, time.Local)) + ssMetrics, _ := NewServiceMetrics(nil) + reg := prometheus.NewPedanticRegistry() + reg.MustRegister(ssMetrics) + + connMetrics := ssMetrics.AddOpenTCPConnection(&fakeConn{}) + connMetrics.AddAuthenticated("key-1") + setNow(time.Date(2010, 1, 2, 3, 4, 20, .0, time.Local)) + + expected := strings.NewReader(` + # HELP tunnel_time_seconds Tunnel time, per access key. + # TYPE tunnel_time_seconds counter + tunnel_time_seconds{access_key="key-1"} 15 + `) + err := promtest.GatherAndCompare( + reg, + expected, + "tunnel_time_seconds", + ) + require.NoError(t, err, "unexpected metric value found") + }) + + t.Run("PerLocation", func(t *testing.T) { + setNow(time.Date(2010, 1, 2, 3, 4, 5, .0, time.Local)) + ssMetrics, _ := NewServiceMetrics(&noopMap{}) + reg := prometheus.NewPedanticRegistry() + reg.MustRegister(ssMetrics) + + connMetrics := ssMetrics.AddOpenTCPConnection(&fakeConn{}) + connMetrics.AddAuthenticated("key-1") + setNow(time.Date(2010, 1, 2, 3, 4, 10, .0, time.Local)) + + expected := strings.NewReader(` + # HELP tunnel_time_seconds_per_location Tunnel time, per location. + # TYPE tunnel_time_seconds_per_location counter + tunnel_time_seconds_per_location{asn="",asorg="",location="XL"} 5 + `) + err := promtest.GatherAndCompare( + reg, + expected, + "tunnel_time_seconds_per_location", + ) + require.NoError(t, err, "unexpected metric value found") + }) +} + +func TestTunnelTimePerKeyDoesNotPanicOnUnknownClosedConnection(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + ssMetrics, _ := NewServiceMetrics(nil) + + connMetrics := ssMetrics.AddOpenTCPConnection(&fakeConn{}) + connMetrics.AddClosed("OK", metrics.ProxyMetrics{}, time.Minute) + + err := promtest.GatherAndCompare( + reg, + strings.NewReader(""), + "tunnel_time_seconds", + ) + require.NoError(t, err, "unexpectedly found metric value") +} + +func BenchmarkOpenTCP(b *testing.B) { + ssMetrics, _ := NewServiceMetrics(nil) + conn := &fakeConn{} + b.ResetTimer() + for i := 0; i < b.N; i++ { + ssMetrics.AddOpenTCPConnection(conn) + } +} + +func BenchmarkCloseTCP(b *testing.B) { + ssMetrics, _ := NewServiceMetrics(nil) + connMetrics := ssMetrics.AddOpenTCPConnection(&fakeConn{}) + accessKey := "key 1" + status := "OK" + data := metrics.ProxyMetrics{} + duration := time.Minute + b.ResetTimer() + for i := 0; i < b.N; i++ { + connMetrics.AddAuthenticated(accessKey) + connMetrics.AddClosed(status, data, duration) + } +} + +func BenchmarkProbe(b *testing.B) { + ssMetrics, _ := NewServiceMetrics(nil) + connMetrics := ssMetrics.AddOpenTCPConnection(&fakeConn{}) + status := "ERR_REPLAY" + drainResult := "other" + data := metrics.ProxyMetrics{} + b.ResetTimer() + for i := 0; i < b.N; i++ { + connMetrics.AddProbe(status, drainResult, data.ClientProxy) + } +} + +func BenchmarkClientUDP(b *testing.B) { + ssMetrics, _ := NewServiceMetrics(nil) + addr := fakeAddr("127.0.0.1:9") + accessKey := "key 1" + udpMetrics := ssMetrics.AddUDPNatEntry(addr, accessKey) + status := "OK" + size := int64(1000) + b.ResetTimer() + for i := 0; i < b.N; i++ { + udpMetrics.AddPacketFromClient(status, size, size) + } +} + +func BenchmarkTargetUDP(b *testing.B) { + ssMetrics, _ := NewServiceMetrics(nil) + addr := fakeAddr("127.0.0.1:9") + accessKey := "key 1" + udpMetrics := ssMetrics.AddUDPNatEntry(addr, accessKey) + status := "OK" + size := int64(1000) + b.ResetTimer() + for i := 0; i < b.N; i++ { + udpMetrics.AddPacketFromTarget(status, size, size) + } +} + +func BenchmarkNAT(b *testing.B) { + ssMetrics, _ := NewServiceMetrics(nil) + addr := fakeAddr("127.0.0.1:9") + b.ResetTimer() + for i := 0; i < b.N; i++ { + udpMetrics := ssMetrics.AddUDPNatEntry(addr, "key-0") + udpMetrics.RemoveNatEntry() + } +} diff --git a/service/service.go b/service/service.go new file mode 100644 index 00000000..ae8f625d --- /dev/null +++ b/service/service.go @@ -0,0 +1,130 @@ +// Copyright 2024 The Outline Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package service + +import ( + "context" + "fmt" + "net" + "time" + + "github.com/Jigsaw-Code/outline-sdk/transport" +) + +const ( + // 59 seconds is most common timeout for servers that do not respond to invalid requests + tcpReadTimeout time.Duration = 59 * time.Second + + // A UDP NAT timeout of at least 5 minutes is recommended in RFC 4787 Section 4.3. + defaultNatTimeout time.Duration = 5 * time.Minute +) + +type ServiceMetrics interface { + UDPMetrics + AddOpenTCPConnection(conn net.Conn) TCPConnMetrics + AddCipherSearch(proto string, accessKeyFound bool, timeToCipher time.Duration) +} + +type Service interface { + HandleStream(ctx context.Context, conn transport.StreamConn) + HandlePacket(conn net.PacketConn) +} + +// Option user's option. +type Option func(s *ssService) error + +type ssService struct { + m ServiceMetrics + ciphers CipherList + natTimeout time.Duration + replayCache *ReplayCache + + sh StreamHandler + ph PacketHandler +} + +func NewService(opts ...Option) (Service, error) { + s := &ssService{} + + for _, opt := range opts { + if err := opt(s); err != nil { + return nil, fmt.Errorf("failed to create new service: %v", err) + } + } + + if s.natTimeout == 0 { + s.natTimeout = defaultNatTimeout + } + return s, nil +} + +// WithCiphers option function. +func WithCiphers(ciphers CipherList) Option { + return func(s *ssService) error { + s.ciphers = ciphers + return nil + } +} + +// WithMetrics option function. +func WithMetrics(metrics ServiceMetrics) Option { + return func(s *ssService) error { + s.m = metrics + return nil + } +} + +// WithReplayCache option function. +func WithReplayCache(replayCache *ReplayCache) Option { + return func(s *ssService) error { + s.replayCache = replayCache + return nil + } +} + +func WithNatTimeout(natTimeout time.Duration) Option { + return func(s *ssService) error { + s.natTimeout = natTimeout + return nil + } +} + +func (s *ssService) HandleStream(ctx context.Context, conn transport.StreamConn) { + if s.sh == nil { + authFunc := NewShadowsocksStreamAuthenticator(s.ciphers, s.replayCache, &ssConnMetrics{ServiceMetrics: s.m, proto: "tcp"}) + // TODO: Register initial data metrics at zero. + s.sh = NewStreamHandler(authFunc, tcpReadTimeout) + } + connMetrics := s.m.AddOpenTCPConnection(conn) + s.sh.Handle(ctx, conn, connMetrics) +} + +func (s *ssService) HandlePacket(conn net.PacketConn) { + if s.ph == nil { + s.ph = NewPacketHandler(s.natTimeout, s.ciphers, s.m, &ssConnMetrics{ServiceMetrics: s.m, proto: "udp"}) + } + s.ph.Handle(conn) +} + +type ssConnMetrics struct { + ServiceMetrics + proto string +} + +var _ ShadowsocksConnMetrics = (*ssConnMetrics)(nil) + +func (cm *ssConnMetrics) AddCipherSearch(accessKeyFound bool, timeToCipher time.Duration) { + cm.ServiceMetrics.AddCipherSearch(cm.proto, accessKeyFound, timeToCipher) +} From 6e330fac5763c9d7ef1e23e724a3eb027c2a9c77 Mon Sep 17 00:00:00 2001 From: sbruens Date: Fri, 6 Sep 2024 12:45:32 -0400 Subject: [PATCH 02/13] Remove need to return errors in opt functions. --- service/service.go | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/service/service.go b/service/service.go index ae8f625d..065fc22d 100644 --- a/service/service.go +++ b/service/service.go @@ -16,7 +16,6 @@ package service import ( "context" - "fmt" "net" "time" @@ -43,7 +42,7 @@ type Service interface { } // Option user's option. -type Option func(s *ssService) error +type Option func(s *ssService) type ssService struct { m ServiceMetrics @@ -59,9 +58,7 @@ func NewService(opts ...Option) (Service, error) { s := &ssService{} for _, opt := range opts { - if err := opt(s); err != nil { - return nil, fmt.Errorf("failed to create new service: %v", err) - } + opt(s) } if s.natTimeout == 0 { @@ -72,32 +69,28 @@ func NewService(opts ...Option) (Service, error) { // WithCiphers option function. func WithCiphers(ciphers CipherList) Option { - return func(s *ssService) error { + return func(s *ssService) { s.ciphers = ciphers - return nil } } // WithMetrics option function. func WithMetrics(metrics ServiceMetrics) Option { - return func(s *ssService) error { + return func(s *ssService) { s.m = metrics - return nil } } // WithReplayCache option function. func WithReplayCache(replayCache *ReplayCache) Option { - return func(s *ssService) error { + return func(s *ssService) { s.replayCache = replayCache - return nil } } func WithNatTimeout(natTimeout time.Duration) Option { - return func(s *ssService) error { + return func(s *ssService) { s.natTimeout = natTimeout - return nil } } From 2738b45f5749b1c1e05692a1b19743aeead79cc4 Mon Sep 17 00:00:00 2001 From: sbruens Date: Fri, 6 Sep 2024 12:52:38 -0400 Subject: [PATCH 03/13] Move the service into `shadowsocks.go`. --- cmd/outline-ss-server/main.go | 4 +- service/service.go | 123 ---------------------------------- service/shadowsocks.go | 112 ++++++++++++++++++++++++++++++- 3 files changed, 113 insertions(+), 126 deletions(-) delete mode 100644 service/service.go diff --git a/cmd/outline-ss-server/main.go b/cmd/outline-ss-server/main.go index 6fd84dfa..543d036d 100644 --- a/cmd/outline-ss-server/main.go +++ b/cmd/outline-ss-server/main.go @@ -217,7 +217,7 @@ func (s *SSServer) runConfig(config Config) (func() error, error) { ciphers := service.NewCipherList() ciphers.Update(cipherList) - ssService, err := service.NewService( + ssService, err := service.NewShadowsocksService( service.WithCiphers(ciphers), service.WithNatTimeout(s.natTimeout), service.WithMetrics(s.serviceMetrics), @@ -243,7 +243,7 @@ func (s *SSServer) runConfig(config Config) (func() error, error) { if err != nil { return fmt.Errorf("failed to create cipher list from config: %v", err) } - ssService, err := service.NewService( + ssService, err := service.NewShadowsocksService( service.WithCiphers(ciphers), service.WithNatTimeout(s.natTimeout), service.WithMetrics(s.serviceMetrics), diff --git a/service/service.go b/service/service.go deleted file mode 100644 index 065fc22d..00000000 --- a/service/service.go +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright 2024 The Outline Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package service - -import ( - "context" - "net" - "time" - - "github.com/Jigsaw-Code/outline-sdk/transport" -) - -const ( - // 59 seconds is most common timeout for servers that do not respond to invalid requests - tcpReadTimeout time.Duration = 59 * time.Second - - // A UDP NAT timeout of at least 5 minutes is recommended in RFC 4787 Section 4.3. - defaultNatTimeout time.Duration = 5 * time.Minute -) - -type ServiceMetrics interface { - UDPMetrics - AddOpenTCPConnection(conn net.Conn) TCPConnMetrics - AddCipherSearch(proto string, accessKeyFound bool, timeToCipher time.Duration) -} - -type Service interface { - HandleStream(ctx context.Context, conn transport.StreamConn) - HandlePacket(conn net.PacketConn) -} - -// Option user's option. -type Option func(s *ssService) - -type ssService struct { - m ServiceMetrics - ciphers CipherList - natTimeout time.Duration - replayCache *ReplayCache - - sh StreamHandler - ph PacketHandler -} - -func NewService(opts ...Option) (Service, error) { - s := &ssService{} - - for _, opt := range opts { - opt(s) - } - - if s.natTimeout == 0 { - s.natTimeout = defaultNatTimeout - } - return s, nil -} - -// WithCiphers option function. -func WithCiphers(ciphers CipherList) Option { - return func(s *ssService) { - s.ciphers = ciphers - } -} - -// WithMetrics option function. -func WithMetrics(metrics ServiceMetrics) Option { - return func(s *ssService) { - s.m = metrics - } -} - -// WithReplayCache option function. -func WithReplayCache(replayCache *ReplayCache) Option { - return func(s *ssService) { - s.replayCache = replayCache - } -} - -func WithNatTimeout(natTimeout time.Duration) Option { - return func(s *ssService) { - s.natTimeout = natTimeout - } -} - -func (s *ssService) HandleStream(ctx context.Context, conn transport.StreamConn) { - if s.sh == nil { - authFunc := NewShadowsocksStreamAuthenticator(s.ciphers, s.replayCache, &ssConnMetrics{ServiceMetrics: s.m, proto: "tcp"}) - // TODO: Register initial data metrics at zero. - s.sh = NewStreamHandler(authFunc, tcpReadTimeout) - } - connMetrics := s.m.AddOpenTCPConnection(conn) - s.sh.Handle(ctx, conn, connMetrics) -} - -func (s *ssService) HandlePacket(conn net.PacketConn) { - if s.ph == nil { - s.ph = NewPacketHandler(s.natTimeout, s.ciphers, s.m, &ssConnMetrics{ServiceMetrics: s.m, proto: "udp"}) - } - s.ph.Handle(conn) -} - -type ssConnMetrics struct { - ServiceMetrics - proto string -} - -var _ ShadowsocksConnMetrics = (*ssConnMetrics)(nil) - -func (cm *ssConnMetrics) AddCipherSearch(accessKeyFound bool, timeToCipher time.Duration) { - cm.ServiceMetrics.AddCipherSearch(cm.proto, accessKeyFound, timeToCipher) -} diff --git a/service/shadowsocks.go b/service/shadowsocks.go index 97329c3a..87814df8 100644 --- a/service/shadowsocks.go +++ b/service/shadowsocks.go @@ -14,9 +14,119 @@ package service -import "time" +import ( + "context" + "net" + "time" + + "github.com/Jigsaw-Code/outline-sdk/transport" +) + +const ( + // 59 seconds is most common timeout for servers that do not respond to invalid requests + tcpReadTimeout time.Duration = 59 * time.Second + + // A UDP NAT timeout of at least 5 minutes is recommended in RFC 4787 Section 4.3. + defaultNatTimeout time.Duration = 5 * time.Minute +) // ShadowsocksConnMetrics is used to report Shadowsocks related metrics on connections. type ShadowsocksConnMetrics interface { AddCipherSearch(accessKeyFound bool, timeToCipher time.Duration) } + +type ServiceMetrics interface { + UDPMetrics + AddOpenTCPConnection(conn net.Conn) TCPConnMetrics + AddCipherSearch(proto string, accessKeyFound bool, timeToCipher time.Duration) +} + +type Service interface { + HandleStream(ctx context.Context, conn transport.StreamConn) + HandlePacket(conn net.PacketConn) +} + +// Option is a Shadowsocks service constructor option. +type Option func(s *ssService) + +type ssService struct { + m ServiceMetrics + ciphers CipherList + natTimeout time.Duration + replayCache *ReplayCache + + sh StreamHandler + ph PacketHandler +} + +// NewShadowsocksService creates a new service +func NewShadowsocksService(opts ...Option) (Service, error) { + s := &ssService{} + + for _, opt := range opts { + opt(s) + } + + if s.natTimeout == 0 { + s.natTimeout = defaultNatTimeout + } + return s, nil +} + +// WithCiphers option function. +func WithCiphers(ciphers CipherList) Option { + return func(s *ssService) { + s.ciphers = ciphers + } +} + +// WithMetrics option function. +func WithMetrics(metrics ServiceMetrics) Option { + return func(s *ssService) { + s.m = metrics + } +} + +// WithReplayCache option function. +func WithReplayCache(replayCache *ReplayCache) Option { + return func(s *ssService) { + s.replayCache = replayCache + } +} + +// WithNatTimeout option function. +func WithNatTimeout(natTimeout time.Duration) Option { + return func(s *ssService) { + s.natTimeout = natTimeout + } +} + +// HandleStream handles a Shadowsocks stream-based connection. +func (s *ssService) HandleStream(ctx context.Context, conn transport.StreamConn) { + if s.sh == nil { + authFunc := NewShadowsocksStreamAuthenticator(s.ciphers, s.replayCache, &ssConnMetrics{ServiceMetrics: s.m, proto: "tcp"}) + // TODO: Register initial data metrics at zero. + s.sh = NewStreamHandler(authFunc, tcpReadTimeout) + } + connMetrics := s.m.AddOpenTCPConnection(conn) + s.sh.Handle(ctx, conn, connMetrics) +} + +// HandlePacket handles a Shadowsocks packet connection. +func (s *ssService) HandlePacket(conn net.PacketConn) { + if s.ph == nil { + s.ph = NewPacketHandler(s.natTimeout, s.ciphers, s.m, &ssConnMetrics{ServiceMetrics: s.m, proto: "udp"}) + } + s.ph.Handle(conn) +} + +type ssConnMetrics struct { + ServiceMetrics + proto string +} + +var _ ShadowsocksConnMetrics = (*ssConnMetrics)(nil) + +func (cm *ssConnMetrics) AddCipherSearch(accessKeyFound bool, timeToCipher time.Duration) { + cm.ServiceMetrics.AddCipherSearch(cm.proto, accessKeyFound, timeToCipher) +} From e6686d1aea8be1d4bc017c4423ead7fd11750410 Mon Sep 17 00:00:00 2001 From: sbruens Date: Wed, 11 Sep 2024 11:08:35 -0400 Subject: [PATCH 04/13] Move initialization of handlers to the constructor. --- service/shadowsocks.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/service/shadowsocks.go b/service/shadowsocks.go index 87814df8..cb274600 100644 --- a/service/shadowsocks.go +++ b/service/shadowsocks.go @@ -70,6 +70,14 @@ func NewShadowsocksService(opts ...Option) (Service, error) { if s.natTimeout == 0 { s.natTimeout = defaultNatTimeout } + + // TODO: Register initial data metrics at zero. + s.sh = NewStreamHandler( + NewShadowsocksStreamAuthenticator(s.ciphers, s.replayCache, &ssConnMetrics{ServiceMetrics: s.m, proto: "tcp"}), + tcpReadTimeout, + ) + s.ph = NewPacketHandler(s.natTimeout, s.ciphers, s.m, &ssConnMetrics{ServiceMetrics: s.m, proto: "udp"}) + return s, nil } @@ -103,20 +111,12 @@ func WithNatTimeout(natTimeout time.Duration) Option { // HandleStream handles a Shadowsocks stream-based connection. func (s *ssService) HandleStream(ctx context.Context, conn transport.StreamConn) { - if s.sh == nil { - authFunc := NewShadowsocksStreamAuthenticator(s.ciphers, s.replayCache, &ssConnMetrics{ServiceMetrics: s.m, proto: "tcp"}) - // TODO: Register initial data metrics at zero. - s.sh = NewStreamHandler(authFunc, tcpReadTimeout) - } connMetrics := s.m.AddOpenTCPConnection(conn) s.sh.Handle(ctx, conn, connMetrics) } // HandlePacket handles a Shadowsocks packet connection. func (s *ssService) HandlePacket(conn net.PacketConn) { - if s.ph == nil { - s.ph = NewPacketHandler(s.natTimeout, s.ciphers, s.m, &ssConnMetrics{ServiceMetrics: s.m, proto: "udp"}) - } s.ph.Handle(conn) } From 1259af8d312fe0676856301c6961b848e96cc967 Mon Sep 17 00:00:00 2001 From: sbruens Date: Wed, 11 Sep 2024 15:25:14 -0400 Subject: [PATCH 05/13] Pass a `list.List` instead of a `CipherList`. --- cmd/outline-ss-server/main.go | 14 ++++---------- service/shadowsocks.go | 9 ++++++--- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/cmd/outline-ss-server/main.go b/cmd/outline-ss-server/main.go index 543d036d..d87a42e6 100644 --- a/cmd/outline-ss-server/main.go +++ b/cmd/outline-ss-server/main.go @@ -93,12 +93,12 @@ func (s *SSServer) loadConfig(filename string) error { return nil } -func newCipherListFromConfig(config ServiceConfig) (service.CipherList, error) { +func newCipherListFromConfig(config ServiceConfig) (*list.List, error) { type cipherKey struct { cipher string secret string } - cipherList := list.New() + ciphers := list.New() existingCiphers := make(map[cipherKey]bool) for _, keyConfig := range config.Keys { key := cipherKey{keyConfig.Cipher, keyConfig.Secret} @@ -111,12 +111,9 @@ func newCipherListFromConfig(config ServiceConfig) (service.CipherList, error) { return nil, fmt.Errorf("failed to create encyption key for key %v: %w", keyConfig.ID, err) } entry := service.MakeCipherEntry(keyConfig.ID, cryptoKey, keyConfig.Secret) - cipherList.PushBack(&entry) + ciphers.PushBack(&entry) existingCiphers[key] = true } - ciphers := service.NewCipherList() - ciphers.Update(cipherList) - return ciphers, nil } @@ -214,11 +211,8 @@ func (s *SSServer) runConfig(config Config) (func() error, error) { for portNum, cipherList := range portCiphers { addr := net.JoinHostPort("::", strconv.Itoa(portNum)) - ciphers := service.NewCipherList() - ciphers.Update(cipherList) - ssService, err := service.NewShadowsocksService( - service.WithCiphers(ciphers), + service.WithCiphers(cipherList), service.WithNatTimeout(s.natTimeout), service.WithMetrics(s.serviceMetrics), service.WithReplayCache(&s.replayCache), diff --git a/service/shadowsocks.go b/service/shadowsocks.go index cb274600..227fc81c 100644 --- a/service/shadowsocks.go +++ b/service/shadowsocks.go @@ -15,6 +15,7 @@ package service import ( + "container/list" "context" "net" "time" @@ -61,7 +62,9 @@ type ssService struct { // NewShadowsocksService creates a new service func NewShadowsocksService(opts ...Option) (Service, error) { - s := &ssService{} + s := &ssService{ + ciphers: NewCipherList(), + } for _, opt := range opts { opt(s) @@ -82,9 +85,9 @@ func NewShadowsocksService(opts ...Option) (Service, error) { } // WithCiphers option function. -func WithCiphers(ciphers CipherList) Option { +func WithCiphers(ciphers *list.List) Option { return func(s *ssService) { - s.ciphers = ciphers + s.ciphers.Update(ciphers) } } From 3a64e35b9236ad0564927750253b38a7c1cf7f05 Mon Sep 17 00:00:00 2001 From: sbruens Date: Wed, 11 Sep 2024 15:27:59 -0400 Subject: [PATCH 06/13] Rename `SSServer` to `OutlineServer`. --- cmd/outline-ss-server/main.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/cmd/outline-ss-server/main.go b/cmd/outline-ss-server/main.go index d87a42e6..76a17051 100644 --- a/cmd/outline-ss-server/main.go +++ b/cmd/outline-ss-server/main.go @@ -57,7 +57,7 @@ func init() { ) } -type SSServer struct { +type OutlineServer struct { stopConfig func() error lnManager service.ListenerManager natTimeout time.Duration @@ -66,7 +66,7 @@ type SSServer struct { replayCache service.ReplayCache } -func (s *SSServer) loadConfig(filename string) error { +func (s *OutlineServer) loadConfig(filename string) error { configData, err := os.ReadFile(filename) if err != nil { return fmt.Errorf("failed to read config file %s: %w", filename, err) @@ -178,7 +178,7 @@ func (ls *listenerSet) Len() int { return len(ls.listenerCloseFuncs) } -func (s *SSServer) runConfig(config Config) (func() error, error) { +func (s *OutlineServer) runConfig(config Config) (func() error, error) { startErrCh := make(chan error) stopErrCh := make(chan error) stopCh := make(chan struct{}) @@ -290,7 +290,7 @@ func (s *SSServer) runConfig(config Config) (func() error, error) { } // Stop stops serving the current config. -func (s *SSServer) Stop() error { +func (s *OutlineServer) Stop() error { stopFunc := s.stopConfig if stopFunc == nil { return nil @@ -303,9 +303,9 @@ func (s *SSServer) Stop() error { return nil } -// RunSSServer starts a shadowsocks server running, and returns the server or an error. -func RunSSServer(filename string, natTimeout time.Duration, serverMetrics *serverMetrics, serviceMetrics service.ServiceMetrics, replayHistory int) (*SSServer, error) { - server := &SSServer{ +// RunOutlineServer starts an Outline server running, and returns the server or an error. +func RunOutlineServer(filename string, natTimeout time.Duration, serverMetrics *serverMetrics, serviceMetrics service.ServiceMetrics, replayHistory int) (*OutlineServer, error) { + server := &OutlineServer{ lnManager: service.NewListenerManager(), natTimeout: natTimeout, serverMetrics: serverMetrics, @@ -397,7 +397,7 @@ func main() { r := prometheus.WrapRegistererWithPrefix("shadowsocks_", prometheus.DefaultRegisterer) r.MustRegister(serverMetrics, serviceMetrics) - _, err = RunSSServer(flags.ConfigFile, flags.natTimeout, serverMetrics, serviceMetrics, flags.replayHistory) + _, err = RunOutlineServer(flags.ConfigFile, flags.natTimeout, serverMetrics, serviceMetrics, flags.replayHistory) if err != nil { slog.Error("Server failed to start. Aborting.", "err", err) } From 39da61b64587ddab369c572795702984f2531664 Mon Sep 17 00:00:00 2001 From: sbruens Date: Thu, 12 Sep 2024 13:36:00 -0400 Subject: [PATCH 07/13] refactor: make connection metrics optional --- internal/integration_test/integration_test.go | 14 +++++----- service/tcp.go | 23 +++++++--------- service/udp.go | 27 ++++++++++++------- 3 files changed, 34 insertions(+), 30 deletions(-) diff --git a/internal/integration_test/integration_test.go b/internal/integration_test/integration_test.go index f847f761..d2626eff 100644 --- a/internal/integration_test/integration_test.go +++ b/internal/integration_test/integration_test.go @@ -130,7 +130,6 @@ func TestTCPEcho(t *testing.T) { } replayCache := service.NewReplayCache(5) const testTimeout = 200 * time.Millisecond - testMetrics := &statusMetrics{} authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, &replayCache, &fakeShadowsocksMetrics{}) handler := service.NewStreamHandler(authFunc, testTimeout) handler.SetTargetDialer(&transport.TCPDialer{}) @@ -138,7 +137,7 @@ func TestTCPEcho(t *testing.T) { go func() { service.StreamServe( func() (transport.StreamConn, error) { return proxyListener.AcceptTCP() }, - func(ctx context.Context, conn transport.StreamConn) { handler.Handle(ctx, conn, testMetrics) }, + func(ctx context.Context, conn transport.StreamConn) { handler.Handle(ctx, conn, nil) }, ) done <- struct{}{} }() @@ -192,11 +191,14 @@ func (m *fakeShadowsocksMetrics) AddCipherSearch(accessKeyFound bool, timeToCiph } type statusMetrics struct { - service.NoOpTCPConnMetrics sync.Mutex statuses []string } +var _ service.TCPConnMetrics = (*statusMetrics)(nil) + +func (m *statusMetrics) AddAuthenticated(accessKey string) {} +func (m *statusMetrics) AddProbe(status, drainResult string, clientProxyBytes int64) {} func (m *statusMetrics) AddClosed(status string, data metrics.ProxyMetrics, duration time.Duration) { m.Lock() m.statuses = append(m.statuses, status) @@ -399,7 +401,6 @@ func BenchmarkTCPThroughput(b *testing.B) { b.Fatal(err) } const testTimeout = 200 * time.Millisecond - testMetrics := &service.NoOpTCPConnMetrics{} authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, nil, &fakeShadowsocksMetrics{}) handler := service.NewStreamHandler(authFunc, testTimeout) handler.SetTargetDialer(&transport.TCPDialer{}) @@ -407,7 +408,7 @@ func BenchmarkTCPThroughput(b *testing.B) { go func() { service.StreamServe( service.WrapStreamAcceptFunc(proxyListener.AcceptTCP), - func(ctx context.Context, conn transport.StreamConn) { handler.Handle(ctx, conn, testMetrics) }, + func(ctx context.Context, conn transport.StreamConn) { handler.Handle(ctx, conn, nil) }, ) done <- struct{}{} }() @@ -466,7 +467,6 @@ func BenchmarkTCPMultiplexing(b *testing.B) { } replayCache := service.NewReplayCache(service.MaxCapacity) const testTimeout = 200 * time.Millisecond - testMetrics := &service.NoOpTCPConnMetrics{} authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, &replayCache, &fakeShadowsocksMetrics{}) handler := service.NewStreamHandler(authFunc, testTimeout) handler.SetTargetDialer(&transport.TCPDialer{}) @@ -474,7 +474,7 @@ func BenchmarkTCPMultiplexing(b *testing.B) { go func() { service.StreamServe( service.WrapStreamAcceptFunc(proxyListener.AcceptTCP), - func(ctx context.Context, conn transport.StreamConn) { handler.Handle(ctx, conn, testMetrics) }, + func(ctx context.Context, conn transport.StreamConn) { handler.Handle(ctx, conn, nil) }, ) done <- struct{}{} }() diff --git a/service/tcp.go b/service/tcp.go index 8637663b..5e66261c 100644 --- a/service/tcp.go +++ b/service/tcp.go @@ -253,7 +253,9 @@ func (h *streamHandler) Handle(ctx context.Context, clientConn transport.StreamC status = connError.Status slog.LogAttrs(nil, slog.LevelDebug, "TCP: Error", slog.String("msg", connError.Message), slog.Any("cause", connError.Cause)) } - connMetrics.AddClosed(status, proxyMetrics, connDuration) + if connMetrics != nil { + connMetrics.AddClosed(status, proxyMetrics, connDuration) + } measuredClientConn.Close() // Closing after the metrics are added aids integration testing. slog.LogAttrs(nil, slog.LevelDebug, "TCP: Done.", slog.String("status", status), slog.Duration("duration", connDuration)) } @@ -325,7 +327,9 @@ func (h *streamHandler) handleConnection(ctx context.Context, outerConn transpor h.absorbProbe(outerConn, connMetrics, authErr.Status, proxyMetrics) return authErr } - connMetrics.AddAuthenticated(id) + if connMetrics != nil { + connMetrics.AddAuthenticated(id) + } // Read target address and dial it. tgtAddr, err := getProxyRequest(innerConn) @@ -355,7 +359,9 @@ func (h *streamHandler) absorbProbe(clientConn io.ReadCloser, connMetrics TCPCon _, drainErr := io.Copy(io.Discard, clientConn) // drain socket drainResult := drainErrToString(drainErr) slog.LogAttrs(nil, slog.LevelDebug, "Drain error.", slog.Any("err", drainErr), slog.String("result", drainResult)) - connMetrics.AddProbe(status, drainResult, proxyMetrics.ClientProxy) + if connMetrics != nil { + connMetrics.AddProbe(status, drainResult, proxyMetrics.ClientProxy) + } } func drainErrToString(drainErr error) string { @@ -369,14 +375,3 @@ func drainErrToString(drainErr error) string { return "other" } } - -// NoOpTCPConnMetrics is a [TCPConnMetrics] that doesn't do anything. Useful in tests -// or if you don't want to track metrics. -type NoOpTCPConnMetrics struct{} - -var _ TCPConnMetrics = (*NoOpTCPConnMetrics)(nil) - -func (m *NoOpTCPConnMetrics) AddAuthenticated(accessKey string) {} -func (m *NoOpTCPConnMetrics) AddClosed(status string, data metrics.ProxyMetrics, duration time.Duration) { -} -func (m *NoOpTCPConnMetrics) AddProbe(status, drainResult string, clientProxyBytes int64) {} diff --git a/service/udp.go b/service/udp.go index 2d08a709..9cb4a462 100644 --- a/service/udp.go +++ b/service/udp.go @@ -198,7 +198,7 @@ func (h *packetHandler) Handle(clientConn net.PacketConn) { slog.LogAttrs(nil, slog.LevelDebug, "UDP: Error", slog.String("msg", connError.Message), slog.Any("cause", connError.Cause)) status = connError.Status } - if targetConn != nil { + if targetConn != nil && targetConn.metrics != nil { targetConn.metrics.AddPacketFromClient(status, int64(clientProxyBytes), int64(proxyTargetBytes)) } } @@ -299,11 +299,13 @@ type natmap struct { running *sync.WaitGroup } -func newNATmap(timeout time.Duration, sm UDPMetrics, running *sync.WaitGroup) *natmap { - m := &natmap{metrics: sm, running: running} - m.keyConn = make(map[string]*natconn) - m.timeout = timeout - return m +func newNATmap(timeout time.Duration, metrics UDPMetrics, running *sync.WaitGroup) *natmap { + return &natmap{ + metrics: metrics, + running: running, + keyConn: make(map[string]*natconn), + timeout: timeout, + } } func (m *natmap) Get(key string) *natconn { @@ -341,13 +343,18 @@ func (m *natmap) del(key string) net.PacketConn { } func (m *natmap) Add(clientAddr net.Addr, clientConn net.PacketConn, cryptoKey *shadowsocks.EncryptionKey, targetConn net.PacketConn, keyID string) *natconn { - connMetrics := m.metrics.AddUDPNatEntry(clientAddr, keyID) + var connMetrics UDPConnMetrics + if m.metrics != nil { + connMetrics = m.metrics.AddUDPNatEntry(clientAddr, keyID) + } entry := m.set(clientAddr.String(), targetConn, cryptoKey, keyID, connMetrics) m.running.Add(1) go func() { timedCopy(clientAddr, clientConn, entry, keyID) - connMetrics.RemoveNatEntry() + if connMetrics != nil { + connMetrics.RemoveNatEntry() + } if pc := m.del(clientAddr.String()); pc != nil { pc.Close() } @@ -443,7 +450,9 @@ func timedCopy(clientAddr net.Addr, clientConn net.PacketConn, targetConn *natco if expired { break } - targetConn.metrics.AddPacketFromTarget(status, int64(bodyLen), int64(proxyClientBytes)) + if targetConn.metrics != nil { + targetConn.metrics.AddPacketFromTarget(status, int64(bodyLen), int64(proxyClientBytes)) + } } } From 9d126f9b8336ba8736149eea1883dc1205e2a184 Mon Sep 17 00:00:00 2001 From: sbruens Date: Mon, 16 Sep 2024 16:46:46 -0400 Subject: [PATCH 08/13] Revert "Pass a `list.List` instead of a `CipherList`." This reverts commit 1259af8d312fe0676856301c6961b848e96cc967. --- cmd/outline-ss-server/main.go | 14 ++++++++++---- service/shadowsocks.go | 9 +++------ 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/cmd/outline-ss-server/main.go b/cmd/outline-ss-server/main.go index 76a17051..bd2aa177 100644 --- a/cmd/outline-ss-server/main.go +++ b/cmd/outline-ss-server/main.go @@ -93,12 +93,12 @@ func (s *OutlineServer) loadConfig(filename string) error { return nil } -func newCipherListFromConfig(config ServiceConfig) (*list.List, error) { +func newCipherListFromConfig(config ServiceConfig) (service.CipherList, error) { type cipherKey struct { cipher string secret string } - ciphers := list.New() + cipherList := list.New() existingCiphers := make(map[cipherKey]bool) for _, keyConfig := range config.Keys { key := cipherKey{keyConfig.Cipher, keyConfig.Secret} @@ -111,9 +111,12 @@ func newCipherListFromConfig(config ServiceConfig) (*list.List, error) { return nil, fmt.Errorf("failed to create encyption key for key %v: %w", keyConfig.ID, err) } entry := service.MakeCipherEntry(keyConfig.ID, cryptoKey, keyConfig.Secret) - ciphers.PushBack(&entry) + cipherList.PushBack(&entry) existingCiphers[key] = true } + ciphers := service.NewCipherList() + ciphers.Update(cipherList) + return ciphers, nil } @@ -211,8 +214,11 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) { for portNum, cipherList := range portCiphers { addr := net.JoinHostPort("::", strconv.Itoa(portNum)) + ciphers := service.NewCipherList() + ciphers.Update(cipherList) + ssService, err := service.NewShadowsocksService( - service.WithCiphers(cipherList), + service.WithCiphers(ciphers), service.WithNatTimeout(s.natTimeout), service.WithMetrics(s.serviceMetrics), service.WithReplayCache(&s.replayCache), diff --git a/service/shadowsocks.go b/service/shadowsocks.go index 227fc81c..cb274600 100644 --- a/service/shadowsocks.go +++ b/service/shadowsocks.go @@ -15,7 +15,6 @@ package service import ( - "container/list" "context" "net" "time" @@ -62,9 +61,7 @@ type ssService struct { // NewShadowsocksService creates a new service func NewShadowsocksService(opts ...Option) (Service, error) { - s := &ssService{ - ciphers: NewCipherList(), - } + s := &ssService{} for _, opt := range opts { opt(s) @@ -85,9 +82,9 @@ func NewShadowsocksService(opts ...Option) (Service, error) { } // WithCiphers option function. -func WithCiphers(ciphers *list.List) Option { +func WithCiphers(ciphers CipherList) Option { return func(s *ssService) { - s.ciphers.Update(ciphers) + s.ciphers = ciphers } } From 213903d58ff806b9bfa8df37d54b443b97841fa6 Mon Sep 17 00:00:00 2001 From: sbruens Date: Mon, 16 Sep 2024 17:01:13 -0400 Subject: [PATCH 09/13] Create noop metrics if nil. --- service/tcp.go | 26 +++++++++++++++++--------- service/udp.go | 26 ++++++++++++++------------ 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/service/tcp.go b/service/tcp.go index 5e66261c..5554454c 100644 --- a/service/tcp.go +++ b/service/tcp.go @@ -241,6 +241,9 @@ func StreamServe(accept StreamAcceptFunc, handle StreamHandleFunc) { } func (h *streamHandler) Handle(ctx context.Context, clientConn transport.StreamConn, connMetrics TCPConnMetrics) { + if connMetrics == nil { + connMetrics = &NoOpTCPConnMetrics{} + } var proxyMetrics metrics.ProxyMetrics measuredClientConn := metrics.MeasureConn(clientConn, &proxyMetrics.ProxyClient, &proxyMetrics.ClientProxy) connStart := time.Now() @@ -253,9 +256,7 @@ func (h *streamHandler) Handle(ctx context.Context, clientConn transport.StreamC status = connError.Status slog.LogAttrs(nil, slog.LevelDebug, "TCP: Error", slog.String("msg", connError.Message), slog.Any("cause", connError.Cause)) } - if connMetrics != nil { - connMetrics.AddClosed(status, proxyMetrics, connDuration) - } + connMetrics.AddClosed(status, proxyMetrics, connDuration) measuredClientConn.Close() // Closing after the metrics are added aids integration testing. slog.LogAttrs(nil, slog.LevelDebug, "TCP: Done.", slog.String("status", status), slog.Duration("duration", connDuration)) } @@ -327,9 +328,7 @@ func (h *streamHandler) handleConnection(ctx context.Context, outerConn transpor h.absorbProbe(outerConn, connMetrics, authErr.Status, proxyMetrics) return authErr } - if connMetrics != nil { - connMetrics.AddAuthenticated(id) - } + connMetrics.AddAuthenticated(id) // Read target address and dial it. tgtAddr, err := getProxyRequest(innerConn) @@ -359,9 +358,7 @@ func (h *streamHandler) absorbProbe(clientConn io.ReadCloser, connMetrics TCPCon _, drainErr := io.Copy(io.Discard, clientConn) // drain socket drainResult := drainErrToString(drainErr) slog.LogAttrs(nil, slog.LevelDebug, "Drain error.", slog.Any("err", drainErr), slog.String("result", drainResult)) - if connMetrics != nil { - connMetrics.AddProbe(status, drainResult, proxyMetrics.ClientProxy) - } + connMetrics.AddProbe(status, drainResult, proxyMetrics.ClientProxy) } func drainErrToString(drainErr error) string { @@ -375,3 +372,14 @@ func drainErrToString(drainErr error) string { return "other" } } + +// NoOpTCPConnMetrics is a [TCPConnMetrics] that doesn't do anything. Useful in tests +// or if you don't want to track metrics. +type NoOpTCPConnMetrics struct{} + +var _ TCPConnMetrics = (*NoOpTCPConnMetrics)(nil) + +func (m *NoOpTCPConnMetrics) AddAuthenticated(accessKey string) {} +func (m *NoOpTCPConnMetrics) AddClosed(status string, data metrics.ProxyMetrics, duration time.Duration) { +} +func (m *NoOpTCPConnMetrics) AddProbe(status, drainResult string, clientProxyBytes int64) {} diff --git a/service/udp.go b/service/udp.go index 9cb4a462..444237e3 100644 --- a/service/udp.go +++ b/service/udp.go @@ -89,7 +89,16 @@ type packetHandler struct { // NewPacketHandler creates a UDPService func NewPacketHandler(natTimeout time.Duration, cipherList CipherList, m UDPMetrics, ssMetrics ShadowsocksConnMetrics) PacketHandler { - return &packetHandler{natTimeout: natTimeout, ciphers: cipherList, m: m, ssm: ssMetrics, targetIPValidator: onet.RequirePublicIP} + if m == nil { + m = &NoOpUDPMetrics{} + } + return &packetHandler{ + natTimeout: natTimeout, + ciphers: cipherList, + m: m, + ssm: ssMetrics, + targetIPValidator: onet.RequirePublicIP, + } } // PacketHandler is a running UDP shadowsocks proxy that can be stopped. @@ -198,7 +207,7 @@ func (h *packetHandler) Handle(clientConn net.PacketConn) { slog.LogAttrs(nil, slog.LevelDebug, "UDP: Error", slog.String("msg", connError.Message), slog.Any("cause", connError.Cause)) status = connError.Status } - if targetConn != nil && targetConn.metrics != nil { + if targetConn != nil { targetConn.metrics.AddPacketFromClient(status, int64(clientProxyBytes), int64(proxyTargetBytes)) } } @@ -343,18 +352,13 @@ func (m *natmap) del(key string) net.PacketConn { } func (m *natmap) Add(clientAddr net.Addr, clientConn net.PacketConn, cryptoKey *shadowsocks.EncryptionKey, targetConn net.PacketConn, keyID string) *natconn { - var connMetrics UDPConnMetrics - if m.metrics != nil { - connMetrics = m.metrics.AddUDPNatEntry(clientAddr, keyID) - } + connMetrics := m.metrics.AddUDPNatEntry(clientAddr, keyID) entry := m.set(clientAddr.String(), targetConn, cryptoKey, keyID, connMetrics) m.running.Add(1) go func() { timedCopy(clientAddr, clientConn, entry, keyID) - if connMetrics != nil { - connMetrics.RemoveNatEntry() - } + connMetrics.RemoveNatEntry() if pc := m.del(clientAddr.String()); pc != nil { pc.Close() } @@ -450,9 +454,7 @@ func timedCopy(clientAddr net.Addr, clientConn net.PacketConn, targetConn *natco if expired { break } - if targetConn.metrics != nil { - targetConn.metrics.AddPacketFromTarget(status, int64(bodyLen), int64(proxyClientBytes)) - } + targetConn.metrics.AddPacketFromTarget(status, int64(bodyLen), int64(proxyClientBytes)) } } From e5e8549083751a50177477ac453894deda964641 Mon Sep 17 00:00:00 2001 From: sbruens Date: Mon, 16 Sep 2024 17:02:32 -0400 Subject: [PATCH 10/13] Revert some more changes. --- internal/integration_test/integration_test.go | 14 +++++++------- service/udp.go | 12 +++++------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/internal/integration_test/integration_test.go b/internal/integration_test/integration_test.go index d2626eff..f847f761 100644 --- a/internal/integration_test/integration_test.go +++ b/internal/integration_test/integration_test.go @@ -130,6 +130,7 @@ func TestTCPEcho(t *testing.T) { } replayCache := service.NewReplayCache(5) const testTimeout = 200 * time.Millisecond + testMetrics := &statusMetrics{} authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, &replayCache, &fakeShadowsocksMetrics{}) handler := service.NewStreamHandler(authFunc, testTimeout) handler.SetTargetDialer(&transport.TCPDialer{}) @@ -137,7 +138,7 @@ func TestTCPEcho(t *testing.T) { go func() { service.StreamServe( func() (transport.StreamConn, error) { return proxyListener.AcceptTCP() }, - func(ctx context.Context, conn transport.StreamConn) { handler.Handle(ctx, conn, nil) }, + func(ctx context.Context, conn transport.StreamConn) { handler.Handle(ctx, conn, testMetrics) }, ) done <- struct{}{} }() @@ -191,14 +192,11 @@ func (m *fakeShadowsocksMetrics) AddCipherSearch(accessKeyFound bool, timeToCiph } type statusMetrics struct { + service.NoOpTCPConnMetrics sync.Mutex statuses []string } -var _ service.TCPConnMetrics = (*statusMetrics)(nil) - -func (m *statusMetrics) AddAuthenticated(accessKey string) {} -func (m *statusMetrics) AddProbe(status, drainResult string, clientProxyBytes int64) {} func (m *statusMetrics) AddClosed(status string, data metrics.ProxyMetrics, duration time.Duration) { m.Lock() m.statuses = append(m.statuses, status) @@ -401,6 +399,7 @@ func BenchmarkTCPThroughput(b *testing.B) { b.Fatal(err) } const testTimeout = 200 * time.Millisecond + testMetrics := &service.NoOpTCPConnMetrics{} authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, nil, &fakeShadowsocksMetrics{}) handler := service.NewStreamHandler(authFunc, testTimeout) handler.SetTargetDialer(&transport.TCPDialer{}) @@ -408,7 +407,7 @@ func BenchmarkTCPThroughput(b *testing.B) { go func() { service.StreamServe( service.WrapStreamAcceptFunc(proxyListener.AcceptTCP), - func(ctx context.Context, conn transport.StreamConn) { handler.Handle(ctx, conn, nil) }, + func(ctx context.Context, conn transport.StreamConn) { handler.Handle(ctx, conn, testMetrics) }, ) done <- struct{}{} }() @@ -467,6 +466,7 @@ func BenchmarkTCPMultiplexing(b *testing.B) { } replayCache := service.NewReplayCache(service.MaxCapacity) const testTimeout = 200 * time.Millisecond + testMetrics := &service.NoOpTCPConnMetrics{} authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, &replayCache, &fakeShadowsocksMetrics{}) handler := service.NewStreamHandler(authFunc, testTimeout) handler.SetTargetDialer(&transport.TCPDialer{}) @@ -474,7 +474,7 @@ func BenchmarkTCPMultiplexing(b *testing.B) { go func() { service.StreamServe( service.WrapStreamAcceptFunc(proxyListener.AcceptTCP), - func(ctx context.Context, conn transport.StreamConn) { handler.Handle(ctx, conn, nil) }, + func(ctx context.Context, conn transport.StreamConn) { handler.Handle(ctx, conn, testMetrics) }, ) done <- struct{}{} }() diff --git a/service/udp.go b/service/udp.go index 444237e3..0215358c 100644 --- a/service/udp.go +++ b/service/udp.go @@ -308,13 +308,11 @@ type natmap struct { running *sync.WaitGroup } -func newNATmap(timeout time.Duration, metrics UDPMetrics, running *sync.WaitGroup) *natmap { - return &natmap{ - metrics: metrics, - running: running, - keyConn: make(map[string]*natconn), - timeout: timeout, - } +func newNATmap(timeout time.Duration, sm UDPMetrics, running *sync.WaitGroup) *natmap { + m := &natmap{metrics: sm, running: running} + m.keyConn = make(map[string]*natconn) + m.timeout = timeout + return m } func (m *natmap) Get(key string) *natconn { From 724260e98b2b2bb90ed35666cf73117a7df334c1 Mon Sep 17 00:00:00 2001 From: sbruens Date: Mon, 16 Sep 2024 17:17:33 -0400 Subject: [PATCH 11/13] Use a noop metrics struct if no metrics provided. --- service/shadowsocks.go | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/service/shadowsocks.go b/service/shadowsocks.go index cb274600..0f391f26 100644 --- a/service/shadowsocks.go +++ b/service/shadowsocks.go @@ -50,7 +50,7 @@ type Service interface { type Option func(s *ssService) type ssService struct { - m ServiceMetrics + metrics ServiceMetrics ciphers CipherList natTimeout time.Duration replayCache *ReplayCache @@ -70,13 +70,16 @@ func NewShadowsocksService(opts ...Option) (Service, error) { if s.natTimeout == 0 { s.natTimeout = defaultNatTimeout } + if s.metrics == nil { + s.metrics = &NoOpShadowsocksMetrics{} + } // TODO: Register initial data metrics at zero. s.sh = NewStreamHandler( - NewShadowsocksStreamAuthenticator(s.ciphers, s.replayCache, &ssConnMetrics{ServiceMetrics: s.m, proto: "tcp"}), + NewShadowsocksStreamAuthenticator(s.ciphers, s.replayCache, &ssConnMetrics{ServiceMetrics: s.metrics, proto: "tcp"}), tcpReadTimeout, ) - s.ph = NewPacketHandler(s.natTimeout, s.ciphers, s.m, &ssConnMetrics{ServiceMetrics: s.m, proto: "udp"}) + s.ph = NewPacketHandler(s.natTimeout, s.ciphers, s.metrics, &ssConnMetrics{ServiceMetrics: s.metrics, proto: "udp"}) return s, nil } @@ -91,7 +94,7 @@ func WithCiphers(ciphers CipherList) Option { // WithMetrics option function. func WithMetrics(metrics ServiceMetrics) Option { return func(s *ssService) { - s.m = metrics + s.metrics = metrics } } @@ -111,7 +114,7 @@ func WithNatTimeout(natTimeout time.Duration) Option { // HandleStream handles a Shadowsocks stream-based connection. func (s *ssService) HandleStream(ctx context.Context, conn transport.StreamConn) { - connMetrics := s.m.AddOpenTCPConnection(conn) + connMetrics := s.metrics.AddOpenTCPConnection(conn) s.sh.Handle(ctx, conn, connMetrics) } @@ -130,3 +133,16 @@ var _ ShadowsocksConnMetrics = (*ssConnMetrics)(nil) func (cm *ssConnMetrics) AddCipherSearch(accessKeyFound bool, timeToCipher time.Duration) { cm.ServiceMetrics.AddCipherSearch(cm.proto, accessKeyFound, timeToCipher) } + +type NoOpShadowsocksMetrics struct { + NoOpUDPMetrics +} + +var _ ServiceMetrics = (*NoOpShadowsocksMetrics)(nil) + +func (m *NoOpShadowsocksMetrics) AddOpenTCPConnection(conn net.Conn) TCPConnMetrics { + return &NoOpTCPConnMetrics{} +} + +func (m *NoOpShadowsocksMetrics) AddCipherSearch(proto string, accessKeyFound bool, timeToCipher time.Duration) { +} From 655c3cce291ae1ff177a555ce81593c906d26509 Mon Sep 17 00:00:00 2001 From: sbruens Date: Mon, 16 Sep 2024 17:24:16 -0400 Subject: [PATCH 12/13] Add noop implementation for `ShadowsocksConnMetrics`. --- service/shadowsocks.go | 9 +++++++++ service/tcp.go | 3 +++ service/udp.go | 11 +++++++---- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/service/shadowsocks.go b/service/shadowsocks.go index 97329c3a..72170801 100644 --- a/service/shadowsocks.go +++ b/service/shadowsocks.go @@ -20,3 +20,12 @@ import "time" type ShadowsocksConnMetrics interface { AddCipherSearch(accessKeyFound bool, timeToCipher time.Duration) } + +// NoOpShadowsocksConnMetrics is a [ShadowsocksConnMetrics] that doesn't do anything. Useful in tests +// or if you don't want to track metrics. +type NoOpShadowsocksConnMetrics struct{} + +var _ ShadowsocksConnMetrics = (*NoOpShadowsocksConnMetrics)(nil) + +func (m *NoOpShadowsocksConnMetrics) AddCipherSearch(accessKeyFound bool, timeToCipher time.Duration) { +} diff --git a/service/tcp.go b/service/tcp.go index 5554454c..6a12afc0 100644 --- a/service/tcp.go +++ b/service/tcp.go @@ -117,6 +117,9 @@ type StreamAuthenticateFunc func(clientConn transport.StreamConn) (string, trans // NewShadowsocksStreamAuthenticator creates a stream authenticator that uses Shadowsocks. // TODO(fortuna): Offer alternative transports. func NewShadowsocksStreamAuthenticator(ciphers CipherList, replayCache *ReplayCache, metrics ShadowsocksConnMetrics) StreamAuthenticateFunc { + if metrics == nil { + metrics = &NoOpShadowsocksConnMetrics{} + } return func(clientConn transport.StreamConn) (string, transport.StreamConn, *onet.ConnectionError) { // Find the cipher and acess key id. cipherEntry, clientReader, clientSalt, timeToCipher, keyErr := findAccessKey(clientConn, remoteIP(clientConn), ciphers) diff --git a/service/udp.go b/service/udp.go index 0215358c..39091239 100644 --- a/service/udp.go +++ b/service/udp.go @@ -92,11 +92,14 @@ func NewPacketHandler(natTimeout time.Duration, cipherList CipherList, m UDPMetr if m == nil { m = &NoOpUDPMetrics{} } + if ssMetrics == nil { + ssMetrics = &NoOpShadowsocksConnMetrics{} + } return &packetHandler{ - natTimeout: natTimeout, - ciphers: cipherList, - m: m, - ssm: ssMetrics, + natTimeout: natTimeout, + ciphers: cipherList, + m: m, + ssm: ssMetrics, targetIPValidator: onet.RequirePublicIP, } } From c2bae13ea167e9aa3ab62658c42bfb95956b12f5 Mon Sep 17 00:00:00 2001 From: sbruens Date: Mon, 16 Sep 2024 17:45:18 -0400 Subject: [PATCH 13/13] Resolve nil metrics. --- service/shadowsocks.go | 25 +++++++------------------ 1 file changed, 7 insertions(+), 18 deletions(-) diff --git a/service/shadowsocks.go b/service/shadowsocks.go index 0f391f26..aacd3152 100644 --- a/service/shadowsocks.go +++ b/service/shadowsocks.go @@ -70,9 +70,6 @@ func NewShadowsocksService(opts ...Option) (Service, error) { if s.natTimeout == 0 { s.natTimeout = defaultNatTimeout } - if s.metrics == nil { - s.metrics = &NoOpShadowsocksMetrics{} - } // TODO: Register initial data metrics at zero. s.sh = NewStreamHandler( @@ -114,7 +111,10 @@ func WithNatTimeout(natTimeout time.Duration) Option { // HandleStream handles a Shadowsocks stream-based connection. func (s *ssService) HandleStream(ctx context.Context, conn transport.StreamConn) { - connMetrics := s.metrics.AddOpenTCPConnection(conn) + var connMetrics TCPConnMetrics + if s.metrics != nil { + connMetrics = s.metrics.AddOpenTCPConnection(conn) + } s.sh.Handle(ctx, conn, connMetrics) } @@ -131,18 +131,7 @@ type ssConnMetrics struct { var _ ShadowsocksConnMetrics = (*ssConnMetrics)(nil) func (cm *ssConnMetrics) AddCipherSearch(accessKeyFound bool, timeToCipher time.Duration) { - cm.ServiceMetrics.AddCipherSearch(cm.proto, accessKeyFound, timeToCipher) -} - -type NoOpShadowsocksMetrics struct { - NoOpUDPMetrics -} - -var _ ServiceMetrics = (*NoOpShadowsocksMetrics)(nil) - -func (m *NoOpShadowsocksMetrics) AddOpenTCPConnection(conn net.Conn) TCPConnMetrics { - return &NoOpTCPConnMetrics{} -} - -func (m *NoOpShadowsocksMetrics) AddCipherSearch(proto string, accessKeyFound bool, timeToCipher time.Duration) { + if cm.ServiceMetrics != nil { + cm.ServiceMetrics.AddCipherSearch(cm.proto, accessKeyFound, timeToCipher) + } }