diff --git a/cmd/outline-ss-server/main.go b/cmd/outline-ss-server/main.go index e61150c7..bd2aa177 100644 --- a/cmd/outline-ss-server/main.go +++ b/cmd/outline-ss-server/main.go @@ -16,7 +16,6 @@ package main import ( "container/list" - "context" "flag" "fmt" "log/slog" @@ -29,9 +28,9 @@ import ( "syscall" "time" - "github.com/Jigsaw-Code/outline-sdk/transport" "github.com/Jigsaw-Code/outline-sdk/transport/shadowsocks" "github.com/Jigsaw-Code/outline-ss-server/ipinfo" + outline_prometheus "github.com/Jigsaw-Code/outline-ss-server/prometheus" "github.com/Jigsaw-Code/outline-ss-server/service" "github.com/lmittmann/tint" "github.com/prometheus/client_golang/prometheus" @@ -58,15 +57,16 @@ func init() { ) } -type SSServer struct { - stopConfig func() error - lnManager service.ListenerManager - natTimeout time.Duration - m *outlineMetrics - replayCache service.ReplayCache +type OutlineServer struct { + stopConfig func() error + lnManager service.ListenerManager + natTimeout time.Duration + serverMetrics *serverMetrics + serviceMetrics service.ServiceMetrics + replayCache service.ReplayCache } -func (s *SSServer) loadConfig(filename string) error { +func (s *OutlineServer) loadConfig(filename string) error { configData, err := os.ReadFile(filename) if err != nil { return fmt.Errorf("failed to read config file %s: %w", filename, err) @@ -120,32 +120,6 @@ func newCipherListFromConfig(config ServiceConfig) (service.CipherList, error) { return ciphers, nil } -func (s *SSServer) NewShadowsocksStreamHandler(ciphers service.CipherList) service.StreamHandler { - authFunc := service.NewShadowsocksStreamAuthenticator(ciphers, &s.replayCache, s.m.tcpServiceMetrics) - // TODO: Register initial data metrics at zero. - return service.NewStreamHandler(authFunc, tcpReadTimeout) -} - -func (s *SSServer) NewShadowsocksPacketHandler(ciphers service.CipherList) service.PacketHandler { - return service.NewPacketHandler(s.natTimeout, ciphers, s.m, s.m.udpServiceMetrics) -} - -func (s *SSServer) NewShadowsocksStreamHandlerFromConfig(config ServiceConfig) (service.StreamHandler, error) { - ciphers, err := newCipherListFromConfig(config) - if err != nil { - return nil, err - } - return s.NewShadowsocksStreamHandler(ciphers), nil -} - -func (s *SSServer) NewShadowsocksPacketHandlerFromConfig(config ServiceConfig) (service.PacketHandler, error) { - ciphers, err := newCipherListFromConfig(config) - if err != nil { - return nil, err - } - return s.NewShadowsocksPacketHandler(ciphers), nil -} - type listenerSet struct { manager service.ListenerManager listenerCloseFuncs map[string]func() error @@ -207,7 +181,7 @@ func (ls *listenerSet) Len() int { return len(ls.listenerCloseFuncs) } -func (s *SSServer) runConfig(config Config) (func() error, error) { +func (s *OutlineServer) runConfig(config Config) (func() error, error) { startErrCh := make(chan error) stopErrCh := make(chan error) stopCh := make(chan struct{}) @@ -243,31 +217,41 @@ func (s *SSServer) runConfig(config Config) (func() error, error) { ciphers := service.NewCipherList() ciphers.Update(cipherList) - sh := s.NewShadowsocksStreamHandler(ciphers) + ssService, err := service.NewShadowsocksService( + service.WithCiphers(ciphers), + service.WithNatTimeout(s.natTimeout), + service.WithMetrics(s.serviceMetrics), + service.WithReplayCache(&s.replayCache), + ) ln, err := lnSet.ListenStream(addr) if err != nil { return err } slog.Info("TCP service started.", "address", ln.Addr().String()) - go service.StreamServe(ln.AcceptStream, func(ctx context.Context, conn transport.StreamConn) { - connMetrics := s.m.AddOpenTCPConnection(conn) - sh.Handle(ctx, conn, connMetrics) - }) + go service.StreamServe(ln.AcceptStream, ssService.HandleStream) pc, err := lnSet.ListenPacket(addr) if err != nil { return err } slog.Info("UDP service started.", "address", pc.LocalAddr().String()) - ph := s.NewShadowsocksPacketHandler(ciphers) - go ph.Handle(pc) + go ssService.HandlePacket(pc) } for _, serviceConfig := range config.Services { - var ( - sh service.StreamHandler - ph service.PacketHandler + ciphers, err := newCipherListFromConfig(serviceConfig) + if err != nil { + return fmt.Errorf("failed to create cipher list from config: %v", err) + } + ssService, err := service.NewShadowsocksService( + service.WithCiphers(ciphers), + service.WithNatTimeout(s.natTimeout), + service.WithMetrics(s.serviceMetrics), + service.WithReplayCache(&s.replayCache), ) + if err != nil { + return err + } for _, lnConfig := range serviceConfig.Listeners { switch lnConfig.Type { case listenerTypeTCP: @@ -276,36 +260,21 @@ func (s *SSServer) runConfig(config Config) (func() error, error) { return err } slog.Info("TCP service started.", "address", ln.Addr().String()) - if sh == nil { - sh, err = s.NewShadowsocksStreamHandlerFromConfig(serviceConfig) - if err != nil { - return err - } - } - go service.StreamServe(ln.AcceptStream, func(ctx context.Context, conn transport.StreamConn) { - connMetrics := s.m.AddOpenTCPConnection(conn) - sh.Handle(ctx, conn, connMetrics) - }) + go service.StreamServe(ln.AcceptStream, ssService.HandleStream) case listenerTypeUDP: pc, err := lnSet.ListenPacket(lnConfig.Address) if err != nil { return err } slog.Info("UDP service started.", "address", pc.LocalAddr().String()) - if ph == nil { - ph, err = s.NewShadowsocksPacketHandlerFromConfig(serviceConfig) - if err != nil { - return err - } - } - go ph.Handle(pc) + go ssService.HandlePacket(pc) } } totalCipherCount += len(serviceConfig.Keys) } slog.Info("Loaded config.", "access_keys", totalCipherCount, "listeners", lnSet.Len()) - s.m.SetNumAccessKeys(totalCipherCount, lnSet.Len()) + s.serverMetrics.SetNumAccessKeys(totalCipherCount, lnSet.Len()) return nil }() @@ -327,7 +296,7 @@ func (s *SSServer) runConfig(config Config) (func() error, error) { } // Stop stops serving the current config. -func (s *SSServer) Stop() error { +func (s *OutlineServer) Stop() error { stopFunc := s.stopConfig if stopFunc == nil { return nil @@ -340,13 +309,14 @@ func (s *SSServer) Stop() error { return nil } -// RunSSServer starts a shadowsocks server running, and returns the server or an error. -func RunSSServer(filename string, natTimeout time.Duration, sm *outlineMetrics, replayHistory int) (*SSServer, error) { - server := &SSServer{ - lnManager: service.NewListenerManager(), - natTimeout: natTimeout, - m: sm, - replayCache: service.NewReplayCache(replayHistory), +// RunOutlineServer starts an Outline server running, and returns the server or an error. +func RunOutlineServer(filename string, natTimeout time.Duration, serverMetrics *serverMetrics, serviceMetrics service.ServiceMetrics, replayHistory int) (*OutlineServer, error) { + server := &OutlineServer{ + lnManager: service.NewListenerManager(), + natTimeout: natTimeout, + serverMetrics: serverMetrics, + serviceMetrics: serviceMetrics, + replayCache: service.NewReplayCache(replayHistory), } err := server.loadConfig(filename) if err != nil { @@ -424,14 +394,16 @@ func main() { } defer ip2info.Close() - metrics, err := newPrometheusOutlineMetrics(ip2info) + serverMetrics := newPrometheusServerMetrics() + serverMetrics.SetVersion(version) + serviceMetrics, err := outline_prometheus.NewServiceMetrics(ip2info) if err != nil { - slog.Error("Failed to create Outline Prometheus metrics. Aborting.", "err", err) + slog.Error("Failed to create Outline Prometheus service metrics. Aborting.", "err", err) } - metrics.SetBuildInfo(version) r := prometheus.WrapRegistererWithPrefix("shadowsocks_", prometheus.DefaultRegisterer) - r.MustRegister(metrics) - _, err = RunSSServer(flags.ConfigFile, flags.natTimeout, metrics, flags.replayHistory) + r.MustRegister(serverMetrics, serviceMetrics) + + _, err = RunOutlineServer(flags.ConfigFile, flags.natTimeout, serverMetrics, serviceMetrics, flags.replayHistory) if err != nil { slog.Error("Server failed to start. Aborting.", "err", err) } diff --git a/cmd/outline-ss-server/metrics.go b/cmd/outline-ss-server/metrics.go index 302f1036..32c9b0aa 100644 --- a/cmd/outline-ss-server/metrics.go +++ b/cmd/outline-ss-server/metrics.go @@ -15,499 +15,27 @@ package main import ( - "fmt" - "log/slog" - "net" - "net/netip" - "sync" "time" - "github.com/Jigsaw-Code/outline-ss-server/ipinfo" - "github.com/Jigsaw-Code/outline-ss-server/service" - "github.com/Jigsaw-Code/outline-ss-server/service/metrics" "github.com/prometheus/client_golang/prometheus" ) // `now` is stubbable for testing. var now = time.Now -func NewTimeToCipherVec(proto string) (prometheus.ObserverVec, error) { - vec := prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Name: "time_to_cipher_ms", - Help: "Time needed to find the cipher", - Buckets: []float64{0.1, 1, 10, 100, 1000}, - }, []string{"proto", "found_key"}) - return vec.CurryWith(map[string]string{"proto": proto}) -} - -type proxyCollector struct { - // NOTE: New metrics need to be added to `newProxyCollector()`, `Describe()` and `Collect()`. - dataBytesPerKey *prometheus.CounterVec - dataBytesPerLocation *prometheus.CounterVec -} - -func newProxyCollector(proto string) (*proxyCollector, error) { - dataBytesPerKey, err := prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "data_bytes", - Help: "Bytes transferred by the proxy, per access key", - }, []string{"proto", "dir", "access_key"}).CurryWith(map[string]string{"proto": proto}) - if err != nil { - return nil, err - } - dataBytesPerLocation, err := prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "data_bytes_per_location", - Help: "Bytes transferred by the proxy, per location", - }, []string{"proto", "dir", "location", "asn", "asorg"}).CurryWith(map[string]string{"proto": proto}) - if err != nil { - return nil, err - } - return &proxyCollector{ - dataBytesPerKey: dataBytesPerKey, - dataBytesPerLocation: dataBytesPerLocation, - }, nil -} - -func (c *proxyCollector) Describe(ch chan<- *prometheus.Desc) { - c.dataBytesPerKey.Describe(ch) - c.dataBytesPerLocation.Describe(ch) -} - -func (c *proxyCollector) Collect(ch chan<- prometheus.Metric) { - c.dataBytesPerKey.Collect(ch) - c.dataBytesPerLocation.Collect(ch) -} - -func (c *proxyCollector) addClientTarget(clientProxyBytes, proxyTargetBytes int64, accessKey string, clientInfo ipinfo.IPInfo) { - addIfNonZero(clientProxyBytes, c.dataBytesPerKey, "c>p", accessKey) - addIfNonZero(clientProxyBytes, c.dataBytesPerLocation, "c>p", clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN.Number), clientInfo.ASN.Organization) - addIfNonZero(proxyTargetBytes, c.dataBytesPerKey, "p>t", accessKey) - addIfNonZero(proxyTargetBytes, c.dataBytesPerLocation, "p>t", clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN.Number), clientInfo.ASN.Organization) -} - -func (c *proxyCollector) addTargetClient(targetProxyBytes, proxyClientBytes int64, accessKey string, clientInfo ipinfo.IPInfo) { - addIfNonZero(targetProxyBytes, c.dataBytesPerKey, "p 0 { - counterVec.WithLabelValues(lvs...).Add(float64(value)) - } -} - -func asnLabel(asn int) string { - if asn == 0 { - return "" - } - return fmt.Sprint(asn) -} - -// Converts a [net.Addr] to an [IPKey]. -func toIPKey(addr net.Addr, accessKey string) (*IPKey, error) { - hostname, _, err := net.SplitHostPort(addr.String()) - if err != nil { - return nil, fmt.Errorf("failed to create IPKey: %w", err) - } - ip, err := netip.ParseAddr(hostname) - if err != nil { - return nil, fmt.Errorf("failed to create IPKey: %w", err) - } - return &IPKey{ip, accessKey}, nil -} diff --git a/cmd/outline-ss-server/metrics_test.go b/cmd/outline-ss-server/metrics_test.go index 15a112bf..93cce446 100644 --- a/cmd/outline-ss-server/metrics_test.go +++ b/cmd/outline-ss-server/metrics_test.go @@ -21,7 +21,6 @@ import ( "time" "github.com/Jigsaw-Code/outline-ss-server/ipinfo" - "github.com/Jigsaw-Code/outline-ss-server/service/metrics" "github.com/op/go-logging" "github.com/prometheus/client_golang/prometheus" promtest "github.com/prometheus/client_golang/prometheus/testutil" @@ -63,166 +62,49 @@ func (c *fakeConn) RemoteAddr() net.Addr { } func TestMethodsDontPanic(t *testing.T) { - ssMetrics, _ := newPrometheusOutlineMetrics(nil) - proxyMetrics := metrics.ProxyMetrics{ - ClientProxy: 1, - ProxyTarget: 2, - TargetProxy: 3, - ProxyClient: 4, - } - addr := fakeAddr("127.0.0.1:9") - ssMetrics.SetBuildInfo("0.0.0-test") - ssMetrics.SetNumAccessKeys(20, 2) - - tcpMetrics := ssMetrics.AddOpenTCPConnection(&fakeConn{}) - tcpMetrics.AddAuthenticated("0") - tcpMetrics.AddClosed("OK", proxyMetrics, 10*time.Millisecond) - tcpMetrics.AddProbe("ERR_CIPHER", "eof", proxyMetrics.ClientProxy) - - udpMetrics := ssMetrics.AddUDPNatEntry(addr, "key-1") - udpMetrics.AddPacketFromClient("OK", 10, 20) - udpMetrics.AddPacketFromTarget("OK", 10, 20) - udpMetrics.RemoveNatEntry() - - ssMetrics.tcpServiceMetrics.AddCipherSearch(true, 10*time.Millisecond) - ssMetrics.udpServiceMetrics.AddCipherSearch(true, 10*time.Millisecond) -} - -func TestASNLabel(t *testing.T) { - require.Equal(t, "", asnLabel(0)) - require.Equal(t, "100", asnLabel(100)) + m := newPrometheusServerMetrics() + m.SetVersion("0.0.0-test") + m.SetNumAccessKeys(20, 2) } -func TestTunnelTime(t *testing.T) { - t.Run("PerKey", func(t *testing.T) { - setNow(time.Date(2010, 1, 2, 3, 4, 5, .0, time.Local)) - ssMetrics, _ := newPrometheusOutlineMetrics(nil) - reg := prometheus.NewPedanticRegistry() - reg.MustRegister(ssMetrics) - - connMetrics := ssMetrics.AddOpenTCPConnection(&fakeConn{}) - connMetrics.AddAuthenticated("key-1") - setNow(time.Date(2010, 1, 2, 3, 4, 20, .0, time.Local)) - - expected := strings.NewReader(` - # HELP tunnel_time_seconds Tunnel time, per access key. - # TYPE tunnel_time_seconds counter - tunnel_time_seconds{access_key="key-1"} 15 - `) - err := promtest.GatherAndCompare( - reg, - expected, - "tunnel_time_seconds", - ) - require.NoError(t, err, "unexpected metric value found") - }) - - t.Run("PerLocation", func(t *testing.T) { - setNow(time.Date(2010, 1, 2, 3, 4, 5, .0, time.Local)) - ssMetrics, _ := newPrometheusOutlineMetrics(&noopMap{}) - reg := prometheus.NewPedanticRegistry() - reg.MustRegister(ssMetrics) - - connMetrics := ssMetrics.AddOpenTCPConnection(&fakeConn{}) - connMetrics.AddAuthenticated("key-1") - setNow(time.Date(2010, 1, 2, 3, 4, 10, .0, time.Local)) - - expected := strings.NewReader(` - # HELP tunnel_time_seconds_per_location Tunnel time, per location. - # TYPE tunnel_time_seconds_per_location counter - tunnel_time_seconds_per_location{asn="",asorg="",location="XL"} 5 - `) - err := promtest.GatherAndCompare( - reg, - expected, - "tunnel_time_seconds_per_location", - ) - require.NoError(t, err, "unexpected metric value found") - }) -} - -func TestTunnelTimePerKeyDoesNotPanicOnUnknownClosedConnection(t *testing.T) { +func TestSetVersion(t *testing.T) { + m := newPrometheusServerMetrics() reg := prometheus.NewPedanticRegistry() - ssMetrics, _ := newPrometheusOutlineMetrics(nil) + reg.MustRegister(m) - connMetrics := ssMetrics.AddOpenTCPConnection(&fakeConn{}) - connMetrics.AddClosed("OK", metrics.ProxyMetrics{}, time.Minute) + m.SetVersion("0.0.0-test") err := promtest.GatherAndCompare( reg, - strings.NewReader(""), - "tunnel_time_seconds", + strings.NewReader(` + # HELP build_info Information on the outline-ss-server build + # TYPE build_info gauge + build_info{version="0.0.0-test"} 1 + `), + "build_info", ) - require.NoError(t, err, "unexpectedly found metric value") -} - -func BenchmarkOpenTCP(b *testing.B) { - ssMetrics, _ := newPrometheusOutlineMetrics(nil) - conn := &fakeConn{} - b.ResetTimer() - for i := 0; i < b.N; i++ { - ssMetrics.AddOpenTCPConnection(conn) - } -} - -func BenchmarkCloseTCP(b *testing.B) { - ssMetrics, _ := newPrometheusOutlineMetrics(nil) - connMetrics := ssMetrics.AddOpenTCPConnection(&fakeConn{}) - accessKey := "key 1" - status := "OK" - data := metrics.ProxyMetrics{} - duration := time.Minute - b.ResetTimer() - for i := 0; i < b.N; i++ { - connMetrics.AddAuthenticated(accessKey) - connMetrics.AddClosed(status, data, duration) - } + require.NoError(t, err, "unexpected metric value found") } -func BenchmarkProbe(b *testing.B) { - ssMetrics, _ := newPrometheusOutlineMetrics(nil) - connMetrics := ssMetrics.AddOpenTCPConnection(&fakeConn{}) - status := "ERR_REPLAY" - drainResult := "other" - data := metrics.ProxyMetrics{} - b.ResetTimer() - for i := 0; i < b.N; i++ { - connMetrics.AddProbe(status, drainResult, data.ClientProxy) - } -} +func TestSetNumAccessKeys(t *testing.T) { + m := newPrometheusServerMetrics() + reg := prometheus.NewPedanticRegistry() + reg.MustRegister(m) -func BenchmarkClientUDP(b *testing.B) { - ssMetrics, _ := newPrometheusOutlineMetrics(nil) - addr := fakeAddr("127.0.0.1:9") - accessKey := "key 1" - udpMetrics := ssMetrics.AddUDPNatEntry(addr, accessKey) - status := "OK" - size := int64(1000) - b.ResetTimer() - for i := 0; i < b.N; i++ { - udpMetrics.AddPacketFromClient(status, size, size) - } -} + m.SetNumAccessKeys(1, 2) -func BenchmarkTargetUDP(b *testing.B) { - ssMetrics, _ := newPrometheusOutlineMetrics(nil) - addr := fakeAddr("127.0.0.1:9") - accessKey := "key 1" - udpMetrics := ssMetrics.AddUDPNatEntry(addr, accessKey) - status := "OK" - size := int64(1000) - b.ResetTimer() - for i := 0; i < b.N; i++ { - udpMetrics.AddPacketFromTarget(status, size, size) - } -} - -func BenchmarkNAT(b *testing.B) { - ssMetrics, _ := newPrometheusOutlineMetrics(nil) - addr := fakeAddr("127.0.0.1:9") - b.ResetTimer() - for i := 0; i < b.N; i++ { - udpMetrics := ssMetrics.AddUDPNatEntry(addr, "key-0") - udpMetrics.RemoveNatEntry() - } + err := promtest.GatherAndCompare( + reg, + strings.NewReader(` + # HELP keys Count of access keys + # TYPE keys gauge + keys 1 + # HELP ports Count of open ports + # TYPE ports gauge + ports 2 + `), + "keys", + "ports", + ) + require.NoError(t, err, "unexpected metric value found") } diff --git a/cmd/outline-ss-server/server_test.go b/cmd/outline-ss-server/server_test.go index 20729a06..3854d778 100644 --- a/cmd/outline-ss-server/server_test.go +++ b/cmd/outline-ss-server/server_test.go @@ -17,16 +17,19 @@ package main import ( "testing" "time" + + "github.com/Jigsaw-Code/outline-ss-server/prometheus" ) -func TestRunSSServer(t *testing.T) { - m, err := newPrometheusOutlineMetrics(nil) +func TestRunOutlineServer(t *testing.T) { + serverMetrics := newPrometheusServerMetrics() + serviceMetrics, err := prometheus.NewServiceMetrics(nil) if err != nil { - t.Fatalf("Failed to create Prometheus metrics: %v", err) + t.Fatalf("Failed to create Prometheus service metrics: %v", err) } - server, err := RunSSServer("config_example.yml", 30*time.Second, m, 10000) + server, err := RunOutlineServer("config_example.yml", 30*time.Second, serverMetrics, serviceMetrics, 10000) if err != nil { - t.Fatalf("RunSSServer() error = %v", err) + t.Fatalf("RunOutlineServer() error = %v", err) } if err := server.Stop(); err != nil { t.Errorf("Error while stopping server: %v", err) diff --git a/prometheus/metrics.go b/prometheus/metrics.go new file mode 100644 index 00000000..c87277eb --- /dev/null +++ b/prometheus/metrics.go @@ -0,0 +1,578 @@ +// Copyright 2023 Jigsaw Operations LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package prometheus + +import ( + "fmt" + "log/slog" + "net" + "net/netip" + "sync" + "time" + + "github.com/Jigsaw-Code/outline-ss-server/ipinfo" + "github.com/Jigsaw-Code/outline-ss-server/service" + "github.com/Jigsaw-Code/outline-ss-server/service/metrics" + "github.com/prometheus/client_golang/prometheus" +) + +// `now` is stubbable for testing. +var now = time.Now + +func newTimeToCipherVec(proto string) (prometheus.ObserverVec, error) { + vec := prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "time_to_cipher_ms", + Help: "Time needed to find the cipher", + Buckets: []float64{0.1, 1, 10, 100, 1000}, + }, []string{"proto", "found_key"}) + return vec.CurryWith(map[string]string{"proto": proto}) +} + +type proxyCollector struct { + // NOTE: New metrics need to be added to `newProxyCollector()`, `Describe()` and `Collect()`. + dataBytesPerKey *prometheus.CounterVec + dataBytesPerLocation *prometheus.CounterVec +} + +func newProxyCollector(proto string) (*proxyCollector, error) { + dataBytesPerKey, err := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "data_bytes", + Help: "Bytes transferred by the proxy, per access key", + }, []string{"proto", "dir", "access_key"}).CurryWith(map[string]string{"proto": proto}) + if err != nil { + return nil, err + } + dataBytesPerLocation, err := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "data_bytes_per_location", + Help: "Bytes transferred by the proxy, per location", + }, []string{"proto", "dir", "location", "asn", "asorg"}).CurryWith(map[string]string{"proto": proto}) + if err != nil { + return nil, err + } + return &proxyCollector{ + dataBytesPerKey: dataBytesPerKey, + dataBytesPerLocation: dataBytesPerLocation, + }, nil +} + +func (c *proxyCollector) Describe(ch chan<- *prometheus.Desc) { + c.dataBytesPerKey.Describe(ch) + c.dataBytesPerLocation.Describe(ch) +} + +func (c *proxyCollector) Collect(ch chan<- prometheus.Metric) { + c.dataBytesPerKey.Collect(ch) + c.dataBytesPerLocation.Collect(ch) +} + +func (c *proxyCollector) addClientTarget(clientProxyBytes, proxyTargetBytes int64, accessKey string, clientInfo ipinfo.IPInfo) { + addIfNonZero(clientProxyBytes, c.dataBytesPerKey, "c>p", accessKey) + addIfNonZero(clientProxyBytes, c.dataBytesPerLocation, "c>p", clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN.Number), clientInfo.ASN.Organization) + addIfNonZero(proxyTargetBytes, c.dataBytesPerKey, "p>t", accessKey) + addIfNonZero(proxyTargetBytes, c.dataBytesPerLocation, "p>t", clientInfo.CountryCode.String(), asnLabel(clientInfo.ASN.Number), clientInfo.ASN.Organization) +} + +func (c *proxyCollector) addTargetClient(targetProxyBytes, proxyClientBytes int64, accessKey string, clientInfo ipinfo.IPInfo) { + addIfNonZero(targetProxyBytes, c.dataBytesPerKey, "p 0 { + counterVec.WithLabelValues(lvs...).Add(float64(value)) + } +} + +func asnLabel(asn int) string { + if asn == 0 { + return "" + } + return fmt.Sprint(asn) +} + +// Converts a [net.Addr] to an [IPKey]. +func toIPKey(addr net.Addr, accessKey string) (*IPKey, error) { + hostname, _, err := net.SplitHostPort(addr.String()) + if err != nil { + return nil, fmt.Errorf("failed to create IPKey: %w", err) + } + ip, err := netip.ParseAddr(hostname) + if err != nil { + return nil, fmt.Errorf("failed to create IPKey: %w", err) + } + return &IPKey{ip, accessKey}, nil +} diff --git a/prometheus/metrics_test.go b/prometheus/metrics_test.go new file mode 100644 index 00000000..5dfcf05a --- /dev/null +++ b/prometheus/metrics_test.go @@ -0,0 +1,226 @@ +// Copyright 2023 Jigsaw Operations LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package prometheus + +import ( + "net" + "strings" + "testing" + "time" + + "github.com/Jigsaw-Code/outline-ss-server/ipinfo" + "github.com/Jigsaw-Code/outline-ss-server/service/metrics" + "github.com/op/go-logging" + "github.com/prometheus/client_golang/prometheus" + promtest "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" +) + +type noopMap struct{} + +func (*noopMap) GetIPInfo(ip net.IP) (ipinfo.IPInfo, error) { + return ipinfo.IPInfo{}, nil +} + +type fakeAddr string + +func (a fakeAddr) String() string { return string(a) } +func (a fakeAddr) Network() string { return "" } + +// Sets the processing clock to be t until changed. +func setNow(t time.Time) { + now = func() time.Time { + return t + } +} + +func init() { + logging.SetLevel(logging.INFO, "") +} + +type fakeConn struct { + net.Conn +} + +func (c *fakeConn) LocalAddr() net.Addr { + return fakeAddr("127.0.0.1:9") +} + +func (c *fakeConn) RemoteAddr() net.Addr { + return fakeAddr("127.0.0.1:10") +} + +func TestMethodsDontPanic(t *testing.T) { + ssMetrics, _ := NewServiceMetrics(nil) + proxyMetrics := metrics.ProxyMetrics{ + ClientProxy: 1, + ProxyTarget: 2, + TargetProxy: 3, + ProxyClient: 4, + } + addr := fakeAddr("127.0.0.1:9") + + tcpMetrics := ssMetrics.AddOpenTCPConnection(&fakeConn{}) + tcpMetrics.AddAuthenticated("0") + tcpMetrics.AddClosed("OK", proxyMetrics, 10*time.Millisecond) + tcpMetrics.AddProbe("ERR_CIPHER", "eof", proxyMetrics.ClientProxy) + + udpMetrics := ssMetrics.AddUDPNatEntry(addr, "key-1") + udpMetrics.AddPacketFromClient("OK", 10, 20) + udpMetrics.AddPacketFromTarget("OK", 10, 20) + udpMetrics.RemoveNatEntry() + + ssMetrics.tcpServiceMetrics.AddCipherSearch(true, 10*time.Millisecond) + ssMetrics.udpServiceMetrics.AddCipherSearch(true, 10*time.Millisecond) +} + +func TestASNLabel(t *testing.T) { + require.Equal(t, "", asnLabel(0)) + require.Equal(t, "100", asnLabel(100)) +} + +func TestTunnelTime(t *testing.T) { + t.Run("PerKey", func(t *testing.T) { + setNow(time.Date(2010, 1, 2, 3, 4, 5, .0, time.Local)) + ssMetrics, _ := NewServiceMetrics(nil) + reg := prometheus.NewPedanticRegistry() + reg.MustRegister(ssMetrics) + + connMetrics := ssMetrics.AddOpenTCPConnection(&fakeConn{}) + connMetrics.AddAuthenticated("key-1") + setNow(time.Date(2010, 1, 2, 3, 4, 20, .0, time.Local)) + + expected := strings.NewReader(` + # HELP tunnel_time_seconds Tunnel time, per access key. + # TYPE tunnel_time_seconds counter + tunnel_time_seconds{access_key="key-1"} 15 + `) + err := promtest.GatherAndCompare( + reg, + expected, + "tunnel_time_seconds", + ) + require.NoError(t, err, "unexpected metric value found") + }) + + t.Run("PerLocation", func(t *testing.T) { + setNow(time.Date(2010, 1, 2, 3, 4, 5, .0, time.Local)) + ssMetrics, _ := NewServiceMetrics(&noopMap{}) + reg := prometheus.NewPedanticRegistry() + reg.MustRegister(ssMetrics) + + connMetrics := ssMetrics.AddOpenTCPConnection(&fakeConn{}) + connMetrics.AddAuthenticated("key-1") + setNow(time.Date(2010, 1, 2, 3, 4, 10, .0, time.Local)) + + expected := strings.NewReader(` + # HELP tunnel_time_seconds_per_location Tunnel time, per location. + # TYPE tunnel_time_seconds_per_location counter + tunnel_time_seconds_per_location{asn="",asorg="",location="XL"} 5 + `) + err := promtest.GatherAndCompare( + reg, + expected, + "tunnel_time_seconds_per_location", + ) + require.NoError(t, err, "unexpected metric value found") + }) +} + +func TestTunnelTimePerKeyDoesNotPanicOnUnknownClosedConnection(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + ssMetrics, _ := NewServiceMetrics(nil) + + connMetrics := ssMetrics.AddOpenTCPConnection(&fakeConn{}) + connMetrics.AddClosed("OK", metrics.ProxyMetrics{}, time.Minute) + + err := promtest.GatherAndCompare( + reg, + strings.NewReader(""), + "tunnel_time_seconds", + ) + require.NoError(t, err, "unexpectedly found metric value") +} + +func BenchmarkOpenTCP(b *testing.B) { + ssMetrics, _ := NewServiceMetrics(nil) + conn := &fakeConn{} + b.ResetTimer() + for i := 0; i < b.N; i++ { + ssMetrics.AddOpenTCPConnection(conn) + } +} + +func BenchmarkCloseTCP(b *testing.B) { + ssMetrics, _ := NewServiceMetrics(nil) + connMetrics := ssMetrics.AddOpenTCPConnection(&fakeConn{}) + accessKey := "key 1" + status := "OK" + data := metrics.ProxyMetrics{} + duration := time.Minute + b.ResetTimer() + for i := 0; i < b.N; i++ { + connMetrics.AddAuthenticated(accessKey) + connMetrics.AddClosed(status, data, duration) + } +} + +func BenchmarkProbe(b *testing.B) { + ssMetrics, _ := NewServiceMetrics(nil) + connMetrics := ssMetrics.AddOpenTCPConnection(&fakeConn{}) + status := "ERR_REPLAY" + drainResult := "other" + data := metrics.ProxyMetrics{} + b.ResetTimer() + for i := 0; i < b.N; i++ { + connMetrics.AddProbe(status, drainResult, data.ClientProxy) + } +} + +func BenchmarkClientUDP(b *testing.B) { + ssMetrics, _ := NewServiceMetrics(nil) + addr := fakeAddr("127.0.0.1:9") + accessKey := "key 1" + udpMetrics := ssMetrics.AddUDPNatEntry(addr, accessKey) + status := "OK" + size := int64(1000) + b.ResetTimer() + for i := 0; i < b.N; i++ { + udpMetrics.AddPacketFromClient(status, size, size) + } +} + +func BenchmarkTargetUDP(b *testing.B) { + ssMetrics, _ := NewServiceMetrics(nil) + addr := fakeAddr("127.0.0.1:9") + accessKey := "key 1" + udpMetrics := ssMetrics.AddUDPNatEntry(addr, accessKey) + status := "OK" + size := int64(1000) + b.ResetTimer() + for i := 0; i < b.N; i++ { + udpMetrics.AddPacketFromTarget(status, size, size) + } +} + +func BenchmarkNAT(b *testing.B) { + ssMetrics, _ := NewServiceMetrics(nil) + addr := fakeAddr("127.0.0.1:9") + b.ResetTimer() + for i := 0; i < b.N; i++ { + udpMetrics := ssMetrics.AddUDPNatEntry(addr, "key-0") + udpMetrics.RemoveNatEntry() + } +} diff --git a/service/shadowsocks.go b/service/shadowsocks.go index 72170801..f979bcce 100644 --- a/service/shadowsocks.go +++ b/service/shadowsocks.go @@ -14,13 +14,128 @@ package service -import "time" +import ( + "context" + "net" + "time" + + "github.com/Jigsaw-Code/outline-sdk/transport" +) + +const ( + // 59 seconds is most common timeout for servers that do not respond to invalid requests + tcpReadTimeout time.Duration = 59 * time.Second + + // A UDP NAT timeout of at least 5 minutes is recommended in RFC 4787 Section 4.3. + defaultNatTimeout time.Duration = 5 * time.Minute +) // ShadowsocksConnMetrics is used to report Shadowsocks related metrics on connections. type ShadowsocksConnMetrics interface { AddCipherSearch(accessKeyFound bool, timeToCipher time.Duration) } +type ServiceMetrics interface { + UDPMetrics + AddOpenTCPConnection(conn net.Conn) TCPConnMetrics + AddCipherSearch(proto string, accessKeyFound bool, timeToCipher time.Duration) +} + +type Service interface { + HandleStream(ctx context.Context, conn transport.StreamConn) + HandlePacket(conn net.PacketConn) +} + +// Option is a Shadowsocks service constructor option. +type Option func(s *ssService) + +type ssService struct { + metrics ServiceMetrics + ciphers CipherList + natTimeout time.Duration + replayCache *ReplayCache + + sh StreamHandler + ph PacketHandler +} + +// NewShadowsocksService creates a new service +func NewShadowsocksService(opts ...Option) (Service, error) { + s := &ssService{} + + for _, opt := range opts { + opt(s) + } + + if s.natTimeout == 0 { + s.natTimeout = defaultNatTimeout + } + + // TODO: Register initial data metrics at zero. + s.sh = NewStreamHandler( + NewShadowsocksStreamAuthenticator(s.ciphers, s.replayCache, &ssConnMetrics{ServiceMetrics: s.metrics, proto: "tcp"}), + tcpReadTimeout, + ) + s.ph = NewPacketHandler(s.natTimeout, s.ciphers, s.metrics, &ssConnMetrics{ServiceMetrics: s.metrics, proto: "udp"}) + + return s, nil +} + +// WithCiphers option function. +func WithCiphers(ciphers CipherList) Option { + return func(s *ssService) { + s.ciphers = ciphers + } +} + +// WithMetrics option function. +func WithMetrics(metrics ServiceMetrics) Option { + return func(s *ssService) { + s.metrics = metrics + } +} + +// WithReplayCache option function. +func WithReplayCache(replayCache *ReplayCache) Option { + return func(s *ssService) { + s.replayCache = replayCache + } +} + +// WithNatTimeout option function. +func WithNatTimeout(natTimeout time.Duration) Option { + return func(s *ssService) { + s.natTimeout = natTimeout + } +} + +// HandleStream handles a Shadowsocks stream-based connection. +func (s *ssService) HandleStream(ctx context.Context, conn transport.StreamConn) { + var connMetrics TCPConnMetrics + if s.metrics != nil { + connMetrics = s.metrics.AddOpenTCPConnection(conn) + } + s.sh.Handle(ctx, conn, connMetrics) +} + +// HandlePacket handles a Shadowsocks packet connection. +func (s *ssService) HandlePacket(conn net.PacketConn) { + s.ph.Handle(conn) +} + +type ssConnMetrics struct { + ServiceMetrics + proto string +} + +var _ ShadowsocksConnMetrics = (*ssConnMetrics)(nil) + +func (cm *ssConnMetrics) AddCipherSearch(accessKeyFound bool, timeToCipher time.Duration) { + if cm.ServiceMetrics != nil { + cm.ServiceMetrics.AddCipherSearch(cm.proto, accessKeyFound, timeToCipher) + } +} + // NoOpShadowsocksConnMetrics is a [ShadowsocksConnMetrics] that doesn't do anything. Useful in tests // or if you don't want to track metrics. type NoOpShadowsocksConnMetrics struct{}