Skip to content

Commit

Permalink
Merge branch 'master' into sbruens/deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
sbruens committed Sep 18, 2024
2 parents 9712cd5 + b561f49 commit 136432d
Show file tree
Hide file tree
Showing 7 changed files with 1,023 additions and 775 deletions.
126 changes: 49 additions & 77 deletions cmd/outline-ss-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package main

import (
"container/list"
"context"
"flag"
"fmt"
"log/slog"
Expand All @@ -29,9 +28,9 @@ import (
"syscall"
"time"

"github.com/Jigsaw-Code/outline-sdk/transport"
"github.com/Jigsaw-Code/outline-sdk/transport/shadowsocks"
"github.com/Jigsaw-Code/outline-ss-server/ipinfo"
outline_prometheus "github.com/Jigsaw-Code/outline-ss-server/prometheus"
"github.com/Jigsaw-Code/outline-ss-server/service"
"github.com/lmittmann/tint"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -58,15 +57,16 @@ func init() {
)
}

type SSServer struct {
stopConfig func() error
lnManager service.ListenerManager
natTimeout time.Duration
m *outlineMetrics
replayCache service.ReplayCache
type OutlineServer struct {
stopConfig func() error
lnManager service.ListenerManager
natTimeout time.Duration
serverMetrics *serverMetrics
serviceMetrics service.ServiceMetrics
replayCache service.ReplayCache
}

func (s *SSServer) loadConfig(filename string) error {
func (s *OutlineServer) loadConfig(filename string) error {
configData, err := os.ReadFile(filename)
if err != nil {
return fmt.Errorf("failed to read config file %s: %w", filename, err)
Expand Down Expand Up @@ -120,32 +120,6 @@ func newCipherListFromConfig(config ServiceConfig) (service.CipherList, error) {
return ciphers, nil
}

func (s *SSServer) NewShadowsocksStreamHandler(ciphers service.CipherList) service.StreamHandler {
authFunc := service.NewShadowsocksStreamAuthenticator(ciphers, &s.replayCache, s.m.tcpServiceMetrics)
// TODO: Register initial data metrics at zero.
return service.NewStreamHandler(authFunc, tcpReadTimeout)
}

func (s *SSServer) NewShadowsocksPacketHandler(ciphers service.CipherList) service.PacketHandler {
return service.NewPacketHandler(s.natTimeout, ciphers, s.m, s.m.udpServiceMetrics)
}

func (s *SSServer) NewShadowsocksStreamHandlerFromConfig(config ServiceConfig) (service.StreamHandler, error) {
ciphers, err := newCipherListFromConfig(config)
if err != nil {
return nil, err
}
return s.NewShadowsocksStreamHandler(ciphers), nil
}

func (s *SSServer) NewShadowsocksPacketHandlerFromConfig(config ServiceConfig) (service.PacketHandler, error) {
ciphers, err := newCipherListFromConfig(config)
if err != nil {
return nil, err
}
return s.NewShadowsocksPacketHandler(ciphers), nil
}

type listenerSet struct {
manager service.ListenerManager
listenerCloseFuncs map[string]func() error
Expand Down Expand Up @@ -207,7 +181,7 @@ func (ls *listenerSet) Len() int {
return len(ls.listenerCloseFuncs)
}

func (s *SSServer) runConfig(config Config) (func() error, error) {
func (s *OutlineServer) runConfig(config Config) (func() error, error) {
startErrCh := make(chan error)
stopErrCh := make(chan error)
stopCh := make(chan struct{})
Expand Down Expand Up @@ -243,31 +217,41 @@ func (s *SSServer) runConfig(config Config) (func() error, error) {
ciphers := service.NewCipherList()
ciphers.Update(cipherList)

sh := s.NewShadowsocksStreamHandler(ciphers)
ssService, err := service.NewShadowsocksService(
service.WithCiphers(ciphers),
service.WithNatTimeout(s.natTimeout),
service.WithMetrics(s.serviceMetrics),
service.WithReplayCache(&s.replayCache),
)
ln, err := lnSet.ListenStream(addr)
if err != nil {
return err
}
slog.Info("TCP service started.", "address", ln.Addr().String())
go service.StreamServe(ln.AcceptStream, func(ctx context.Context, conn transport.StreamConn) {
connMetrics := s.m.AddOpenTCPConnection(conn)
sh.Handle(ctx, conn, connMetrics)
})
go service.StreamServe(ln.AcceptStream, ssService.HandleStream)

pc, err := lnSet.ListenPacket(addr)
if err != nil {
return err
}
slog.Info("UDP service started.", "address", pc.LocalAddr().String())
ph := s.NewShadowsocksPacketHandler(ciphers)
go ph.Handle(pc)
go ssService.HandlePacket(pc)
}

for _, serviceConfig := range config.Services {
var (
sh service.StreamHandler
ph service.PacketHandler
ciphers, err := newCipherListFromConfig(serviceConfig)
if err != nil {
return fmt.Errorf("failed to create cipher list from config: %v", err)
}
ssService, err := service.NewShadowsocksService(
service.WithCiphers(ciphers),
service.WithNatTimeout(s.natTimeout),
service.WithMetrics(s.serviceMetrics),
service.WithReplayCache(&s.replayCache),
)
if err != nil {
return err
}
for _, lnConfig := range serviceConfig.Listeners {
switch lnConfig.Type {
case listenerTypeTCP:
Expand All @@ -276,36 +260,21 @@ func (s *SSServer) runConfig(config Config) (func() error, error) {
return err
}
slog.Info("TCP service started.", "address", ln.Addr().String())
if sh == nil {
sh, err = s.NewShadowsocksStreamHandlerFromConfig(serviceConfig)
if err != nil {
return err
}
}
go service.StreamServe(ln.AcceptStream, func(ctx context.Context, conn transport.StreamConn) {
connMetrics := s.m.AddOpenTCPConnection(conn)
sh.Handle(ctx, conn, connMetrics)
})
go service.StreamServe(ln.AcceptStream, ssService.HandleStream)
case listenerTypeUDP:
pc, err := lnSet.ListenPacket(lnConfig.Address)
if err != nil {
return err
}
slog.Info("UDP service started.", "address", pc.LocalAddr().String())
if ph == nil {
ph, err = s.NewShadowsocksPacketHandlerFromConfig(serviceConfig)
if err != nil {
return err
}
}
go ph.Handle(pc)
go ssService.HandlePacket(pc)
}
}
totalCipherCount += len(serviceConfig.Keys)
}

slog.Info("Loaded config.", "access_keys", totalCipherCount, "listeners", lnSet.Len())
s.m.SetNumAccessKeys(totalCipherCount, lnSet.Len())
s.serverMetrics.SetNumAccessKeys(totalCipherCount, lnSet.Len())
return nil
}()

Expand All @@ -327,7 +296,7 @@ func (s *SSServer) runConfig(config Config) (func() error, error) {
}

// Stop stops serving the current config.
func (s *SSServer) Stop() error {
func (s *OutlineServer) Stop() error {
stopFunc := s.stopConfig
if stopFunc == nil {
return nil
Expand All @@ -340,13 +309,14 @@ func (s *SSServer) Stop() error {
return nil
}

// RunSSServer starts a shadowsocks server running, and returns the server or an error.
func RunSSServer(filename string, natTimeout time.Duration, sm *outlineMetrics, replayHistory int) (*SSServer, error) {
server := &SSServer{
lnManager: service.NewListenerManager(),
natTimeout: natTimeout,
m: sm,
replayCache: service.NewReplayCache(replayHistory),
// RunOutlineServer starts an Outline server running, and returns the server or an error.
func RunOutlineServer(filename string, natTimeout time.Duration, serverMetrics *serverMetrics, serviceMetrics service.ServiceMetrics, replayHistory int) (*OutlineServer, error) {
server := &OutlineServer{
lnManager: service.NewListenerManager(),
natTimeout: natTimeout,
serverMetrics: serverMetrics,
serviceMetrics: serviceMetrics,
replayCache: service.NewReplayCache(replayHistory),
}
err := server.loadConfig(filename)
if err != nil {
Expand Down Expand Up @@ -424,14 +394,16 @@ func main() {
}
defer ip2info.Close()

metrics, err := newPrometheusOutlineMetrics(ip2info)
serverMetrics := newPrometheusServerMetrics()
serverMetrics.SetVersion(version)
serviceMetrics, err := outline_prometheus.NewServiceMetrics(ip2info)
if err != nil {
slog.Error("Failed to create Outline Prometheus metrics. Aborting.", "err", err)
slog.Error("Failed to create Outline Prometheus service metrics. Aborting.", "err", err)
}
metrics.SetBuildInfo(version)
r := prometheus.WrapRegistererWithPrefix("shadowsocks_", prometheus.DefaultRegisterer)
r.MustRegister(metrics)
_, err = RunSSServer(flags.ConfigFile, flags.natTimeout, metrics, flags.replayHistory)
r.MustRegister(serverMetrics, serviceMetrics)

_, err = RunOutlineServer(flags.ConfigFile, flags.natTimeout, serverMetrics, serviceMetrics, flags.replayHistory)
if err != nil {
slog.Error("Server failed to start. Aborting.", "err", err)
}
Expand Down
Loading

0 comments on commit 136432d

Please sign in to comment.