From 1920ee0d44c45dcd1df2046c37fb60365be3067f Mon Sep 17 00:00:00 2001 From: Vinicius Fortuna Date: Fri, 19 Apr 2024 13:16:21 -0400 Subject: [PATCH 01/27] Collect connection address --- x/examples/test-connectivity/main.go | 43 ++++++++++++++++++++++++++-- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/x/examples/test-connectivity/main.go b/x/examples/test-connectivity/main.go index 45b4ea8d..c5df4718 100644 --- a/x/examples/test-connectivity/main.go +++ b/x/examples/test-connectivity/main.go @@ -31,6 +31,7 @@ import ( "time" "github.com/Jigsaw-Code/outline-sdk/dns" + "github.com/Jigsaw-Code/outline-sdk/transport" "github.com/Jigsaw-Code/outline-sdk/x/config" "github.com/Jigsaw-Code/outline-sdk/x/connectivity" "github.com/Jigsaw-Code/outline-sdk/x/report" @@ -47,6 +48,9 @@ type connectivityReport struct { // TODO(fortuna): add sanitized transport config. Transport string `json:"transport"` + // The address of the selected connection to the proxy server. + ConnectionAddress addressJSON `json:"connection_address"` + // Observations Time time.Time `json:"time"` DurationMs int64 `json:"duration_ms"` @@ -62,6 +66,19 @@ type errorJSON struct { Msg string `json:"msg,omitempty"` } +type addressJSON struct { + Host string `json:"host"` + Port string `json:"port"` +} + +func newAddressJSON(address string) (addressJSON, error) { + host, port, err := net.SplitHostPort(address) + if err != nil { + return addressJSON{}, err + } + return addressJSON{host, port}, nil +} + func makeErrorRecord(result *connectivity.ConnectivityError) *errorJSON { if result == nil { return nil @@ -168,15 +185,30 @@ func main() { for _, proto := range strings.Split(*protoFlag, ",") { proto = strings.TrimSpace(proto) var resolver dns.Resolver + var connectionAddress string switch proto { case "tcp": - streamDialer, err := configToDialer.NewStreamDialer(*transportFlag) + baseDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { + conn, err := (&transport.TCPDialer{}).DialStream(ctx, addr) + if conn != nil { + connectionAddress = conn.RemoteAddr().String() + } + return conn, err + }) + streamDialer, err := config.WrapStreamDialer(baseDialer, *transportFlag) if err != nil { log.Fatalf("Failed to create StreamDialer: %v", err) } resolver = dns.NewTCPResolver(streamDialer, resolverAddress) case "udp": - packetDialer, err := configToDialer.NewPacketDialer(*transportFlag) + baseDialer := transport.FuncPacketDialer(func(ctx context.Context, addr string) (net.Conn, error) { + conn, err := (&transport.UDPDialer{}).DialPacket(ctx, addr) + if conn != nil { + connectionAddress = conn.RemoteAddr().String() + } + return conn, err + }) + packetDialer, err := config.WrapPacketDialer(baseDialer, *transportFlag) if err != nil { log.Fatalf("Failed to create PacketDialer: %v", err) } @@ -198,7 +230,7 @@ func main() { if err != nil { log.Fatalf("Failed to sanitize config: %v", err) } - var r report.Report = connectivityReport{ + r := connectivityReport{ Resolver: resolverAddress, Proto: proto, Time: startTime.UTC().Truncate(time.Second), @@ -207,6 +239,11 @@ func main() { DurationMs: testDuration.Milliseconds(), Error: makeErrorRecord(result), } + connectionAddressJSON, err := newAddressJSON(connectionAddress) + if err == nil { + r.ConnectionAddress = connectionAddressJSON + } + if reportCollector != nil { err = reportCollector.Collect(context.Background(), r) if err != nil { From a50d0eb785fcb9d8280287aaf878b4fb8f611c9a Mon Sep 17 00:00:00 2001 From: Vinicius Fortuna Date: Fri, 19 Apr 2024 14:04:49 -0400 Subject: [PATCH 02/27] Try intercepting the transport dialer --- x/connectivity/connectivity.go | 51 ++++++++++++++++++++++++++++ x/examples/test-connectivity/main.go | 42 +++++++---------------- 2 files changed, 64 insertions(+), 29 deletions(-) diff --git a/x/connectivity/connectivity.go b/x/connectivity/connectivity.go index c6d26124..54e6f80f 100644 --- a/x/connectivity/connectivity.go +++ b/x/connectivity/connectivity.go @@ -18,13 +18,24 @@ import ( "context" "errors" "fmt" + "log" + "net" "syscall" "time" "github.com/Jigsaw-Code/outline-sdk/dns" + "github.com/Jigsaw-Code/outline-sdk/transport" "golang.org/x/net/dns/dnsmessage" ) +// ConnectivityResult captures the observed result of the connectivity test. +type ConnectivityResult struct { + // Address we connected to + ConnectionAddress string + // Observed error + Error *ConnectivityError +} + // ConnectivityError captures the observed error of the connectivity test. type ConnectivityError struct { // Which operation in the test that failed: "connect", "send" or "receive" @@ -64,6 +75,46 @@ func makeConnectivityError(op string, err error) *ConnectivityError { return &ConnectivityError{Op: op, PosixError: code, Err: err} } +func TestStreamConnectivityWithDNS(ctx context.Context, dialer transport.StreamDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { + result := &ConnectivityResult{} + captureDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { + conn, err := dialer.DialStream(ctx, addr) + if conn != nil { + result.ConnectionAddress = conn.RemoteAddr().String() + log.Println("address", result.ConnectionAddress) + } + return conn, err + }) + resolver := dns.NewTCPResolver(captureDialer, resolverAddress) + var err error + result.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) + if err != nil { + return nil, err + } + return result, nil +} + +func TestPacketConnectivityWithDNS(ctx context.Context, dialer transport.PacketDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { + result := &ConnectivityResult{} + captureDialer := transport.FuncPacketDialer(func(ctx context.Context, addr string) (net.Conn, error) { + conn, err := dialer.DialPacket(ctx, addr) + if conn != nil { + // This doesn't work with the PacketListenerDialer we use because it returns the address of the target, not the proxy. + // TODO(fortuna): make PLD use the first hop address or try something else. + result.ConnectionAddress = conn.RemoteAddr().String() + log.Println("address", result.ConnectionAddress) + } + return conn, err + }) + resolver := dns.NewUDPResolver(captureDialer, resolverAddress) + var err error + result.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) + if err != nil { + return nil, err + } + return result, nil +} + // TestConnectivityWithResolver tests weather we can get a response from the given [Resolver]. It can be used // to test connectivity of its underlying [transport.StreamDialer] or [transport.PacketDialer]. // Invalid tests that cannot assert connectivity will return (nil, error). diff --git a/x/examples/test-connectivity/main.go b/x/examples/test-connectivity/main.go index c5df4718..a3d2c424 100644 --- a/x/examples/test-connectivity/main.go +++ b/x/examples/test-connectivity/main.go @@ -30,7 +30,6 @@ import ( "strings" "time" - "github.com/Jigsaw-Code/outline-sdk/dns" "github.com/Jigsaw-Code/outline-sdk/transport" "github.com/Jigsaw-Code/outline-sdk/x/config" "github.com/Jigsaw-Code/outline-sdk/x/connectivity" @@ -184,48 +183,33 @@ func main() { resolverAddress := net.JoinHostPort(resolverHost, "53") for _, proto := range strings.Split(*protoFlag, ",") { proto = strings.TrimSpace(proto) - var resolver dns.Resolver - var connectionAddress string + var testResult *connectivity.ConnectivityResult + var testErr error + startTime := time.Now() switch proto { case "tcp": - baseDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { - conn, err := (&transport.TCPDialer{}).DialStream(ctx, addr) - if conn != nil { - connectionAddress = conn.RemoteAddr().String() - } - return conn, err - }) - streamDialer, err := config.WrapStreamDialer(baseDialer, *transportFlag) + dialer, err := config.WrapStreamDialer(&transport.TCPDialer{}, *transportFlag) if err != nil { log.Fatalf("Failed to create StreamDialer: %v", err) } - resolver = dns.NewTCPResolver(streamDialer, resolverAddress) + testResult, testErr = connectivity.TestStreamConnectivityWithDNS(context.Background(), dialer, resolverAddress, *domainFlag) case "udp": - baseDialer := transport.FuncPacketDialer(func(ctx context.Context, addr string) (net.Conn, error) { - conn, err := (&transport.UDPDialer{}).DialPacket(ctx, addr) - if conn != nil { - connectionAddress = conn.RemoteAddr().String() - } - return conn, err - }) - packetDialer, err := config.WrapPacketDialer(baseDialer, *transportFlag) + dialer, err := config.WrapPacketDialer(&transport.UDPDialer{}, *transportFlag) if err != nil { log.Fatalf("Failed to create PacketDialer: %v", err) } - resolver = dns.NewUDPResolver(packetDialer, resolverAddress) + testResult, testErr = connectivity.TestPacketConnectivityWithDNS(context.Background(), dialer, resolverAddress, *domainFlag) default: log.Fatalf(`Invalid proto %v. Must be "tcp" or "udp"`, proto) } - startTime := time.Now() - result, err := connectivity.TestConnectivityWithResolver(context.Background(), resolver, *domainFlag) - if err != nil { - log.Fatalf("Connectivity test failed to run: %v", err) + if testErr != nil { + log.Fatalf("Connectivity test failed to run: %v", testErr) } testDuration := time.Since(startTime) - if result == nil { + if testResult.Error == nil { success = true } - debugLog.Printf("Test %v %v result: %v", proto, resolverAddress, result) + debugLog.Printf("Test %v %v result: %v", proto, resolverAddress, testResult) sanitizedConfig, err := config.SanitizeConfig(*transportFlag) if err != nil { log.Fatalf("Failed to sanitize config: %v", err) @@ -237,9 +221,9 @@ func main() { // TODO(fortuna): Add sanitized config: Transport: sanitizedConfig, DurationMs: testDuration.Milliseconds(), - Error: makeErrorRecord(result), + Error: makeErrorRecord(testResult.Error), } - connectionAddressJSON, err := newAddressJSON(connectionAddress) + connectionAddressJSON, err := newAddressJSON(testResult.ConnectionAddress) if err == nil { r.ConnectionAddress = connectionAddressJSON } From f98ba8bafe00f64da09d588af64d8960b69a9d93 Mon Sep 17 00:00:00 2001 From: Vinicius Fortuna Date: Fri, 19 Apr 2024 14:52:31 -0400 Subject: [PATCH 03/27] Pass wrapper instead --- x/connectivity/connectivity.go | 33 +++++++++++++++++----------- x/examples/test-connectivity/main.go | 14 +++++------- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/x/connectivity/connectivity.go b/x/connectivity/connectivity.go index 54e6f80f..d607eb49 100644 --- a/x/connectivity/connectivity.go +++ b/x/connectivity/connectivity.go @@ -18,7 +18,6 @@ import ( "context" "errors" "fmt" - "log" "net" "syscall" "time" @@ -75,18 +74,22 @@ func makeConnectivityError(op string, err error) *ConnectivityError { return &ConnectivityError{Op: op, PosixError: code, Err: err} } -func TestStreamConnectivityWithDNS(ctx context.Context, dialer transport.StreamDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { +type WrapStreamDialer func(ctx context.Context, baseDialer transport.StreamDialer) (transport.StreamDialer, error) + +func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.StreamDialer, wrap WrapStreamDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { result := &ConnectivityResult{} - captureDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { - conn, err := dialer.DialStream(ctx, addr) + interceptDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { + conn, err := baseDialer.DialStream(ctx, addr) if conn != nil { result.ConnectionAddress = conn.RemoteAddr().String() - log.Println("address", result.ConnectionAddress) } return conn, err }) - resolver := dns.NewTCPResolver(captureDialer, resolverAddress) - var err error + dialer, err := wrap(ctx, interceptDialer) + if err != nil { + return nil, err + } + resolver := dns.NewTCPResolver(dialer, resolverAddress) result.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) if err != nil { return nil, err @@ -94,20 +97,24 @@ func TestStreamConnectivityWithDNS(ctx context.Context, dialer transport.StreamD return result, nil } -func TestPacketConnectivityWithDNS(ctx context.Context, dialer transport.PacketDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { +type WrapPacketDialer func(ctx context.Context, baseDialer transport.PacketDialer) (transport.PacketDialer, error) + +func TestPacketConnectivityWithDNS(ctx context.Context, baseDialer transport.PacketDialer, wrap WrapPacketDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { result := &ConnectivityResult{} - captureDialer := transport.FuncPacketDialer(func(ctx context.Context, addr string) (net.Conn, error) { - conn, err := dialer.DialPacket(ctx, addr) + interceptDialer := transport.FuncPacketDialer(func(ctx context.Context, addr string) (net.Conn, error) { + conn, err := baseDialer.DialPacket(ctx, addr) if conn != nil { // This doesn't work with the PacketListenerDialer we use because it returns the address of the target, not the proxy. // TODO(fortuna): make PLD use the first hop address or try something else. result.ConnectionAddress = conn.RemoteAddr().String() - log.Println("address", result.ConnectionAddress) } return conn, err }) - resolver := dns.NewUDPResolver(captureDialer, resolverAddress) - var err error + dialer, err := wrap(ctx, interceptDialer) + if err != nil { + return nil, err + } + resolver := dns.NewUDPResolver(dialer, resolverAddress) result.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) if err != nil { return nil, err diff --git a/x/examples/test-connectivity/main.go b/x/examples/test-connectivity/main.go index a3d2c424..cc9f7357 100644 --- a/x/examples/test-connectivity/main.go +++ b/x/examples/test-connectivity/main.go @@ -188,17 +188,15 @@ func main() { startTime := time.Now() switch proto { case "tcp": - dialer, err := config.WrapStreamDialer(&transport.TCPDialer{}, *transportFlag) - if err != nil { - log.Fatalf("Failed to create StreamDialer: %v", err) + wrap := func(ctx context.Context, baseDialer transport.StreamDialer) (transport.StreamDialer, error) { + return config.WrapStreamDialer(baseDialer, *transportFlag) } - testResult, testErr = connectivity.TestStreamConnectivityWithDNS(context.Background(), dialer, resolverAddress, *domainFlag) + testResult, testErr = connectivity.TestStreamConnectivityWithDNS(context.Background(), &transport.TCPDialer{}, wrap, resolverAddress, *domainFlag) case "udp": - dialer, err := config.WrapPacketDialer(&transport.UDPDialer{}, *transportFlag) - if err != nil { - log.Fatalf("Failed to create PacketDialer: %v", err) + wrap := func(ctx context.Context, baseDialer transport.PacketDialer) (transport.PacketDialer, error) { + return config.WrapPacketDialer(baseDialer, *transportFlag) } - testResult, testErr = connectivity.TestPacketConnectivityWithDNS(context.Background(), dialer, resolverAddress, *domainFlag) + testResult, testErr = connectivity.TestPacketConnectivityWithDNS(context.Background(), &transport.UDPDialer{}, wrap, resolverAddress, *domainFlag) default: log.Fatalf(`Invalid proto %v. Must be "tcp" or "udp"`, proto) } From c5ab95c42df7b585a9f144b9c26cd14d857eacf0 Mon Sep 17 00:00:00 2001 From: Vinicius Fortuna Date: Fri, 19 Apr 2024 15:01:01 -0400 Subject: [PATCH 04/27] Clean up --- x/connectivity/connectivity.go | 14 ++++++++------ x/examples/test-connectivity/main.go | 4 ++-- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/x/connectivity/connectivity.go b/x/connectivity/connectivity.go index d607eb49..bf8e7026 100644 --- a/x/connectivity/connectivity.go +++ b/x/connectivity/connectivity.go @@ -74,8 +74,10 @@ func makeConnectivityError(op string, err error) *ConnectivityError { return &ConnectivityError{Op: op, PosixError: code, Err: err} } -type WrapStreamDialer func(ctx context.Context, baseDialer transport.StreamDialer) (transport.StreamDialer, error) +type WrapStreamDialer func(baseDialer transport.StreamDialer) (transport.StreamDialer, error) +// TestStreamConnectivityWithDNS tests weather we can get a response from a DNS resolver at resolverAddress over a stream connection. It sends testDomain as the query. +// It uses the baseDialer to create a first-hop connection to the proxy, and the wrap to apply the transport. func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.StreamDialer, wrap WrapStreamDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { result := &ConnectivityResult{} interceptDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { @@ -85,7 +87,7 @@ func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.Str } return conn, err }) - dialer, err := wrap(ctx, interceptDialer) + dialer, err := wrap(interceptDialer) if err != nil { return nil, err } @@ -97,20 +99,20 @@ func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.Str return result, nil } -type WrapPacketDialer func(ctx context.Context, baseDialer transport.PacketDialer) (transport.PacketDialer, error) +type WrapPacketDialer func(baseDialer transport.PacketDialer) (transport.PacketDialer, error) +// TestPacketConnectivityWithDNS tests weather we can get a response from a DNS resolver at resolverAddress over a packet connection. It sends testDomain as the query. +// It uses the baseDialer to create a first-hop connection to the proxy, and the wrap to apply the transport. func TestPacketConnectivityWithDNS(ctx context.Context, baseDialer transport.PacketDialer, wrap WrapPacketDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { result := &ConnectivityResult{} interceptDialer := transport.FuncPacketDialer(func(ctx context.Context, addr string) (net.Conn, error) { conn, err := baseDialer.DialPacket(ctx, addr) if conn != nil { - // This doesn't work with the PacketListenerDialer we use because it returns the address of the target, not the proxy. - // TODO(fortuna): make PLD use the first hop address or try something else. result.ConnectionAddress = conn.RemoteAddr().String() } return conn, err }) - dialer, err := wrap(ctx, interceptDialer) + dialer, err := wrap(interceptDialer) if err != nil { return nil, err } diff --git a/x/examples/test-connectivity/main.go b/x/examples/test-connectivity/main.go index cc9f7357..7ad2cb56 100644 --- a/x/examples/test-connectivity/main.go +++ b/x/examples/test-connectivity/main.go @@ -188,12 +188,12 @@ func main() { startTime := time.Now() switch proto { case "tcp": - wrap := func(ctx context.Context, baseDialer transport.StreamDialer) (transport.StreamDialer, error) { + wrap := func(baseDialer transport.StreamDialer) (transport.StreamDialer, error) { return config.WrapStreamDialer(baseDialer, *transportFlag) } testResult, testErr = connectivity.TestStreamConnectivityWithDNS(context.Background(), &transport.TCPDialer{}, wrap, resolverAddress, *domainFlag) case "udp": - wrap := func(ctx context.Context, baseDialer transport.PacketDialer) (transport.PacketDialer, error) { + wrap := func(baseDialer transport.PacketDialer) (transport.PacketDialer, error) { return config.WrapPacketDialer(baseDialer, *transportFlag) } testResult, testErr = connectivity.TestPacketConnectivityWithDNS(context.Background(), &transport.UDPDialer{}, wrap, resolverAddress, *domainFlag) From f88036d41d2a89751eb1148e735691be7fcc5eef Mon Sep 17 00:00:00 2001 From: Vinicius Fortuna Date: Fri, 19 Apr 2024 15:07:44 -0400 Subject: [PATCH 05/27] Comment --- x/connectivity/connectivity.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x/connectivity/connectivity.go b/x/connectivity/connectivity.go index bf8e7026..9aa5476a 100644 --- a/x/connectivity/connectivity.go +++ b/x/connectivity/connectivity.go @@ -78,6 +78,7 @@ type WrapStreamDialer func(baseDialer transport.StreamDialer) (transport.StreamD // TestStreamConnectivityWithDNS tests weather we can get a response from a DNS resolver at resolverAddress over a stream connection. It sends testDomain as the query. // It uses the baseDialer to create a first-hop connection to the proxy, and the wrap to apply the transport. +// The baseDialer is typically TCPDialer, but it can be replaced for remote measurements. func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.StreamDialer, wrap WrapStreamDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { result := &ConnectivityResult{} interceptDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { @@ -103,6 +104,7 @@ type WrapPacketDialer func(baseDialer transport.PacketDialer) (transport.PacketD // TestPacketConnectivityWithDNS tests weather we can get a response from a DNS resolver at resolverAddress over a packet connection. It sends testDomain as the query. // It uses the baseDialer to create a first-hop connection to the proxy, and the wrap to apply the transport. +// The baseDialer is typically UDPDialer, but it can be replaced for remote measurements. func TestPacketConnectivityWithDNS(ctx context.Context, baseDialer transport.PacketDialer, wrap WrapPacketDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { result := &ConnectivityResult{} interceptDialer := transport.FuncPacketDialer(func(ctx context.Context, addr string) (net.Conn, error) { From 3d750e1d37e626c88dbdf8b81576e4b0d39508a7 Mon Sep 17 00:00:00 2001 From: Vinicius Fortuna Date: Tue, 23 Apr 2024 22:22:30 -0400 Subject: [PATCH 06/27] Collect connections --- x/connectivity/connectivity.go | 70 +++++++++++++++++++++------- x/examples/test-connectivity/main.go | 26 +++++++++-- 2 files changed, 76 insertions(+), 20 deletions(-) diff --git a/x/connectivity/connectivity.go b/x/connectivity/connectivity.go index 9aa5476a..175c9ede 100644 --- a/x/connectivity/connectivity.go +++ b/x/connectivity/connectivity.go @@ -30,7 +30,16 @@ import ( // ConnectivityResult captures the observed result of the connectivity test. type ConnectivityResult struct { // Address we connected to - ConnectionAddress string + Connections []ConnectionResult + // Address of the connection that was selected + SelectedAddress string + // Observed error + Error *ConnectivityError +} + +type ConnectionResult struct { + // Address we connected to + Address string // Observed error Error *ConnectivityError } @@ -80,11 +89,28 @@ type WrapStreamDialer func(baseDialer transport.StreamDialer) (transport.StreamD // It uses the baseDialer to create a first-hop connection to the proxy, and the wrap to apply the transport. // The baseDialer is typically TCPDialer, but it can be replaced for remote measurements. func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.StreamDialer, wrap WrapStreamDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { - result := &ConnectivityResult{} + testResult := &ConnectivityResult{ + Connections: make([]ConnectionResult, 0), + } interceptDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { - conn, err := baseDialer.DialStream(ctx, addr) - if conn != nil { - result.ConnectionAddress = conn.RemoteAddr().String() + host, port, err := net.SplitHostPort(addr) + if err != nil { + return nil, err + } + ips, err := (&net.Resolver{PreferGo: false}).LookupHost(ctx, host) + var conn transport.StreamConn + for _, ip := range ips { + addr := net.JoinHostPort(ip, port) + connResult := ConnectionResult{Address: addr} + conn, err = baseDialer.DialStream(ctx, addr) + if err != nil { + connResult.Error = makeConnectivityError("connect", err) + } + testResult.Connections = append(testResult.Connections, connResult) + if err == nil { + testResult.SelectedAddress = addr + break + } } return conn, err }) @@ -93,11 +119,11 @@ func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.Str return nil, err } resolver := dns.NewTCPResolver(dialer, resolverAddress) - result.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) + testResult.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) if err != nil { return nil, err } - return result, nil + return testResult, nil } type WrapPacketDialer func(baseDialer transport.PacketDialer) (transport.PacketDialer, error) @@ -106,11 +132,26 @@ type WrapPacketDialer func(baseDialer transport.PacketDialer) (transport.PacketD // It uses the baseDialer to create a first-hop connection to the proxy, and the wrap to apply the transport. // The baseDialer is typically UDPDialer, but it can be replaced for remote measurements. func TestPacketConnectivityWithDNS(ctx context.Context, baseDialer transport.PacketDialer, wrap WrapPacketDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { - result := &ConnectivityResult{} + testResult := &ConnectivityResult{} interceptDialer := transport.FuncPacketDialer(func(ctx context.Context, addr string) (net.Conn, error) { - conn, err := baseDialer.DialPacket(ctx, addr) - if conn != nil { - result.ConnectionAddress = conn.RemoteAddr().String() + host, port, err := net.SplitHostPort(addr) + if err != nil { + return nil, err + } + ips, err := (&net.Resolver{PreferGo: false}).LookupHost(ctx, host) + var conn net.Conn + for _, ip := range ips { + addr := net.JoinHostPort(ip, port) + connResult := ConnectionResult{Address: addr} + conn, err = baseDialer.DialPacket(ctx, addr) + if err != nil { + connResult.Error = makeConnectivityError("connect", err) + } + testResult.Connections = append(testResult.Connections, connResult) + if err == nil { + testResult.SelectedAddress = addr + break + } } return conn, err }) @@ -119,11 +160,8 @@ func TestPacketConnectivityWithDNS(ctx context.Context, baseDialer transport.Pac return nil, err } resolver := dns.NewUDPResolver(dialer, resolverAddress) - result.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) - if err != nil { - return nil, err - } - return result, nil + testResult.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) + return testResult, err } // TestConnectivityWithResolver tests weather we can get a response from the given [Resolver]. It can be used diff --git a/x/examples/test-connectivity/main.go b/x/examples/test-connectivity/main.go index 7ad2cb56..14e5a916 100644 --- a/x/examples/test-connectivity/main.go +++ b/x/examples/test-connectivity/main.go @@ -48,7 +48,8 @@ type connectivityReport struct { Transport string `json:"transport"` // The address of the selected connection to the proxy server. - ConnectionAddress addressJSON `json:"connection_address"` + Connections []connectionJSON `json:"connections"` + SelectedAddress *addressJSON `json:"selected_address,omitempty"` // Observations Time time.Time `json:"time"` @@ -56,6 +57,11 @@ type connectivityReport struct { Error *errorJSON `json:"error"` } +type connectionJSON struct { + Address *addressJSON `json:"address,omitempty"` + Error *errorJSON `json:"error"` +} + type errorJSON struct { // TODO: add Shadowsocks/Transport error Op string `json:"op,omitempty"` @@ -221,9 +227,21 @@ func main() { DurationMs: testDuration.Milliseconds(), Error: makeErrorRecord(testResult.Error), } - connectionAddressJSON, err := newAddressJSON(testResult.ConnectionAddress) - if err == nil { - r.ConnectionAddress = connectionAddressJSON + for _, cr := range testResult.Connections { + cj := connectionJSON{ + Error: makeErrorRecord(cr.Error), + } + addressJSON, err := newAddressJSON(cr.Address) + if err == nil { + cj.Address = &addressJSON + } + r.Connections = append(r.Connections, cj) + } + if testResult.SelectedAddress != "" { + selectedAddressJSON, err := newAddressJSON(testResult.SelectedAddress) + if err == nil { + r.SelectedAddress = &selectedAddressJSON + } } if reportCollector != nil { From a1732d24d15d81ffbe75638f668619cb9ffd7389 Mon Sep 17 00:00:00 2001 From: Vinicius Fortuna Date: Wed, 24 Apr 2024 12:01:58 -0400 Subject: [PATCH 07/27] Fix --- x/connectivity/connectivity.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/x/connectivity/connectivity.go b/x/connectivity/connectivity.go index 175c9ede..e7e9a958 100644 --- a/x/connectivity/connectivity.go +++ b/x/connectivity/connectivity.go @@ -102,7 +102,10 @@ func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.Str for _, ip := range ips { addr := net.JoinHostPort(ip, port) connResult := ConnectionResult{Address: addr} - conn, err = baseDialer.DialStream(ctx, addr) + deadline := time.Now().Add(1 * time.Second) + ipCtx, cancel := context.WithDeadline(ctx, deadline) + defer cancel() + conn, err = baseDialer.DialStream(ipCtx, addr) if err != nil { connResult.Error = makeConnectivityError("connect", err) } @@ -118,7 +121,13 @@ func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.Str if err != nil { return nil, err } - resolver := dns.NewTCPResolver(dialer, resolverAddress) + resolverConn, err := dialer.DialStream(ctx, resolverAddress) + if err != nil { + return nil, err + } + resolver := dns.NewTCPResolver(transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { + return resolverConn, nil + }), resolverAddress) testResult.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) if err != nil { return nil, err From 053da66eafd316a07df07341273f06b51dcdd9ae Mon Sep 17 00:00:00 2001 From: Vinicius Fortuna Date: Wed, 24 Apr 2024 12:02:12 -0400 Subject: [PATCH 08/27] Timeout --- x/connectivity/connectivity.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x/connectivity/connectivity.go b/x/connectivity/connectivity.go index e7e9a958..cfc1148e 100644 --- a/x/connectivity/connectivity.go +++ b/x/connectivity/connectivity.go @@ -102,7 +102,7 @@ func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.Str for _, ip := range ips { addr := net.JoinHostPort(ip, port) connResult := ConnectionResult{Address: addr} - deadline := time.Now().Add(1 * time.Second) + deadline := time.Now().Add(5 * time.Second) ipCtx, cancel := context.WithDeadline(ctx, deadline) defer cancel() conn, err = baseDialer.DialStream(ipCtx, addr) From e6f31d0f2eb1bcecde81ca295d114f59c63e1927 Mon Sep 17 00:00:00 2001 From: Vinicius Fortuna Date: Wed, 24 Apr 2024 16:25:23 -0400 Subject: [PATCH 09/27] To fix --- x/connectivity/connectivity.go | 36 ++++++++++++++++++---------- x/examples/test-connectivity/main.go | 11 ++++++--- 2 files changed, 32 insertions(+), 15 deletions(-) diff --git a/x/connectivity/connectivity.go b/x/connectivity/connectivity.go index cfc1148e..628856b7 100644 --- a/x/connectivity/connectivity.go +++ b/x/connectivity/connectivity.go @@ -29,21 +29,30 @@ import ( // ConnectivityResult captures the observed result of the connectivity test. type ConnectivityResult struct { - // Address we connected to - Connections []ConnectionResult + // The result of the initial connect attempt + Connect ConnectResult // Address of the connection that was selected SelectedAddress string // Observed error Error *ConnectivityError } -type ConnectionResult struct { - // Address we connected to - Address string +type ConnectResult struct { + // Address we dialed + DialedAddress string + // Address we selected + SelectedAddress string + // Lists each connection attempt + Attempts []ConnectionAttempt // Observed error Error *ConnectivityError } +type ConnectionAttempt struct { + Address string + Error error +} + // ConnectivityError captures the observed error of the connectivity test. type ConnectivityError struct { // Which operation in the test that failed: "connect", "send" or "receive" @@ -89,10 +98,12 @@ type WrapStreamDialer func(baseDialer transport.StreamDialer) (transport.StreamD // It uses the baseDialer to create a first-hop connection to the proxy, and the wrap to apply the transport. // The baseDialer is typically TCPDialer, but it can be replaced for remote measurements. func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.StreamDialer, wrap WrapStreamDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { - testResult := &ConnectivityResult{ - Connections: make([]ConnectionResult, 0), - } + testResult := &ConnectivityResult{} interceptDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { + connectResult := &testResult.Connect + // Captures the address of the first hop, before resolution. + connectResult.DialedAddress = addr + connectResult.Attempts = make([]ConnectionAttempt, 0) host, port, err := net.SplitHostPort(addr) if err != nil { return nil, err @@ -101,17 +112,18 @@ func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.Str var conn transport.StreamConn for _, ip := range ips { addr := net.JoinHostPort(ip, port) - connResult := ConnectionResult{Address: addr} + attemptResult := ConnectionAttempt{Address: addr} + // TODO: This is slow. Race and overlap attempts instead. deadline := time.Now().Add(5 * time.Second) ipCtx, cancel := context.WithDeadline(ctx, deadline) defer cancel() conn, err = baseDialer.DialStream(ipCtx, addr) if err != nil { - connResult.Error = makeConnectivityError("connect", err) + attemptResult.Error = err } - testResult.Connections = append(testResult.Connections, connResult) + connectResult.Attempts = append(connectResult.Attempts, attemptResult) if err == nil { - testResult.SelectedAddress = addr + connectResult.SelectedAddress = addr break } } diff --git a/x/examples/test-connectivity/main.go b/x/examples/test-connectivity/main.go index 14e5a916..cc4abe57 100644 --- a/x/examples/test-connectivity/main.go +++ b/x/examples/test-connectivity/main.go @@ -47,9 +47,9 @@ type connectivityReport struct { // TODO(fortuna): add sanitized transport config. Transport string `json:"transport"` - // The address of the selected connection to the proxy server. - Connections []connectionJSON `json:"connections"` - SelectedAddress *addressJSON `json:"selected_address,omitempty"` + // The result for the connection. + Connect connectAttemptJSON `json:"connect"` + SelectedAddress *addressJSON `json:"selected_address,omitempty"` // Observations Time time.Time `json:"time"` @@ -57,6 +57,11 @@ type connectivityReport struct { Error *errorJSON `json:"error"` } +type connectAttemptJSON struct { + Address *addressJSON `json:"address,omitempty"` + Attempts []connectAttemptJSON `json:"attempts,omitempty"` +} + type connectionJSON struct { Address *addressJSON `json:"address,omitempty"` Error *errorJSON `json:"error"` From cad18a9b65ef167a4b141e38be0d89a0f37a5138 Mon Sep 17 00:00:00 2001 From: amir gh Date: Thu, 2 May 2024 09:38:01 -0700 Subject: [PATCH 10/27] loop over intercept dialer --- x/connectivity/connectivity.go | 174 +++++++++++++++++++++------------ 1 file changed, 110 insertions(+), 64 deletions(-) diff --git a/x/connectivity/connectivity.go b/x/connectivity/connectivity.go index 628856b7..fb8e85d9 100644 --- a/x/connectivity/connectivity.go +++ b/x/connectivity/connectivity.go @@ -99,50 +99,71 @@ type WrapStreamDialer func(baseDialer transport.StreamDialer) (transport.StreamD // The baseDialer is typically TCPDialer, but it can be replaced for remote measurements. func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.StreamDialer, wrap WrapStreamDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { testResult := &ConnectivityResult{} - interceptDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { - connectResult := &testResult.Connect - // Captures the address of the first hop, before resolution. - connectResult.DialedAddress = addr - connectResult.Attempts = make([]ConnectionAttempt, 0) - host, port, err := net.SplitHostPort(addr) - if err != nil { - return nil, err - } - ips, err := (&net.Resolver{PreferGo: false}).LookupHost(ctx, host) - var conn transport.StreamConn - for _, ip := range ips { - addr := net.JoinHostPort(ip, port) - attemptResult := ConnectionAttempt{Address: addr} - // TODO: This is slow. Race and overlap attempts instead. - deadline := time.Now().Add(5 * time.Second) - ipCtx, cancel := context.WithDeadline(ctx, deadline) - defer cancel() - conn, err = baseDialer.DialStream(ipCtx, addr) + i := 0 + iterate := true + for iterate { + interceptDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { + connectResult := &testResult.Connect + // Captures the address of the first hop, before resolution. + connectResult.DialedAddress = addr + //connectResult.Attempts = make([]ConnectionAttempt, 0) + host, port, err := net.SplitHostPort(addr) + if err != nil { + return nil, err + } + ips, err := (&net.Resolver{PreferGo: false}).LookupHost(ctx, host) if err != nil { - attemptResult.Error = err + connectResult.Error = makeConnectivityError("resolve", err) + return nil, err } - connectResult.Attempts = append(connectResult.Attempts, attemptResult) - if err == nil { - connectResult.SelectedAddress = addr - break + var conn transport.StreamConn + if i < len(ips) { + ip := ips[i] + i++ + fmt.Printf("Trying address %v\n", ip) + addr := net.JoinHostPort(ip, port) + attemptResult := ConnectionAttempt{Address: addr} + // TODO: This is slow. Race and overlap attempts instead. + // abort loop as soon as one connection is successful + deadline := time.Now().Add(5 * time.Second) + ipCtx, cancel := context.WithDeadline(ctx, deadline) + defer cancel() + conn, err = baseDialer.DialStream(ipCtx, addr) + if err != nil { + attemptResult.Error = errors.Unwrap(err) + } + connectResult.Attempts = append(connectResult.Attempts, attemptResult) + if err == nil { + connectResult.SelectedAddress = addr + iterate = false + } + return conn, err + } else { + iterate = false + return nil, fmt.Errorf("all connect attempts failed. no more addresses to try") } + }) + dialer, err := wrap(interceptDialer) + if err != nil { + continue + //return testResult, err + } + deadline := time.Now().Add(5 * time.Second) + ipCtx, cancel := context.WithDeadline(ctx, deadline) + defer cancel() + resolverConn, err := dialer.DialStream(ipCtx, resolverAddress) + if err != nil { + testResult.Error = makeConnectivityError("connect", err) + continue + } + resolver := dns.NewTCPResolver(transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { + return resolverConn, nil + }), resolverAddress) + testResult.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) + if err != nil { + continue + //return testResult, err } - return conn, err - }) - dialer, err := wrap(interceptDialer) - if err != nil { - return nil, err - } - resolverConn, err := dialer.DialStream(ctx, resolverAddress) - if err != nil { - return nil, err - } - resolver := dns.NewTCPResolver(transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { - return resolverConn, nil - }), resolverAddress) - testResult.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) - if err != nil { - return nil, err } return testResult, nil } @@ -154,35 +175,60 @@ type WrapPacketDialer func(baseDialer transport.PacketDialer) (transport.PacketD // The baseDialer is typically UDPDialer, but it can be replaced for remote measurements. func TestPacketConnectivityWithDNS(ctx context.Context, baseDialer transport.PacketDialer, wrap WrapPacketDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { testResult := &ConnectivityResult{} - interceptDialer := transport.FuncPacketDialer(func(ctx context.Context, addr string) (net.Conn, error) { - host, port, err := net.SplitHostPort(addr) - if err != nil { - return nil, err - } - ips, err := (&net.Resolver{PreferGo: false}).LookupHost(ctx, host) - var conn net.Conn - for _, ip := range ips { - addr := net.JoinHostPort(ip, port) - connResult := ConnectionResult{Address: addr} - conn, err = baseDialer.DialPacket(ctx, addr) + connectResult := &testResult.Connect + i := 0 + iterate := true + for iterate { + interceptDialer := transport.FuncPacketDialer(func(ctx context.Context, addr string) (net.Conn, error) { + // Captures the address of the first hop, before resolution. + connectResult.DialedAddress = addr + host, port, err := net.SplitHostPort(addr) + if err != nil { + return nil, err + } + ips, err := (&net.Resolver{PreferGo: false}).LookupHost(ctx, host) if err != nil { - connResult.Error = makeConnectivityError("connect", err) + connectResult.Error = makeConnectivityError("resolve", err) + return nil, err } - testResult.Connections = append(testResult.Connections, connResult) - if err == nil { - testResult.SelectedAddress = addr - break + var conn net.Conn + if i < len(ips) { + ip := ips[i] + i++ + fmt.Printf("Trying address %v\n", ip) + addr := net.JoinHostPort(ip, port) + attemptResult := ConnectionAttempt{Address: addr} + conn, err = baseDialer.DialPacket(ctx, addr) + if err != nil { + attemptResult.Error = errors.Unwrap(err) + } + //testResult.Connections = append(testResult.Connections, connResult) + connectResult.Attempts = append(connectResult.Attempts, attemptResult) + if err == nil { + testResult.SelectedAddress = addr + //iterate = false + } + return conn, err + } else { + iterate = false + return nil, fmt.Errorf("all connect attempts failed. no more addresses to try") } + }) + dialer, err := wrap(interceptDialer) + if err != nil { + testResult.Error = makeConnectivityError("connect", err) + continue + //return nil, err + } + resolver := dns.NewUDPResolver(dialer, resolverAddress) + testResult.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) + if err != nil { + continue + //return testResult, err } - return conn, err - }) - dialer, err := wrap(interceptDialer) - if err != nil { - return nil, err } - resolver := dns.NewUDPResolver(dialer, resolverAddress) - testResult.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) - return testResult, err + // TODO: error is always being returned as nil; must change this + return testResult, nil } // TestConnectivityWithResolver tests weather we can get a response from the given [Resolver]. It can be used From c3df0ed7b7e522b5d3490c4b3a77c45005383277 Mon Sep 17 00:00:00 2001 From: amir gh Date: Thu, 2 May 2024 09:38:28 -0700 Subject: [PATCH 11/27] code cleanup --- x/examples/test-connectivity/main.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/x/examples/test-connectivity/main.go b/x/examples/test-connectivity/main.go index cc4abe57..8c5b0737 100644 --- a/x/examples/test-connectivity/main.go +++ b/x/examples/test-connectivity/main.go @@ -58,8 +58,8 @@ type connectivityReport struct { } type connectAttemptJSON struct { - Address *addressJSON `json:"address,omitempty"` - Attempts []connectAttemptJSON `json:"attempts,omitempty"` + Address *addressJSON `json:"address,omitempty"` + Attempts []connectionJSON `json:"attempts,omitempty"` } type connectionJSON struct { @@ -212,7 +212,8 @@ func main() { log.Fatalf(`Invalid proto %v. Must be "tcp" or "udp"`, proto) } if testErr != nil { - log.Fatalf("Connectivity test failed to run: %v", testErr) + //log.Fatalf("Connectivity test failed to run: %v", testErr) + debugLog.Printf("Connectivity test failed to run: %v", testErr) } testDuration := time.Since(startTime) if testResult.Error == nil { @@ -232,16 +233,18 @@ func main() { DurationMs: testDuration.Milliseconds(), Error: makeErrorRecord(testResult.Error), } - for _, cr := range testResult.Connections { - cj := connectionJSON{ - Error: makeErrorRecord(cr.Error), - } + for _, cr := range testResult.Connect.Attempts { + cj := connectionJSON{} addressJSON, err := newAddressJSON(cr.Address) if err == nil { cj.Address = &addressJSON } - r.Connections = append(r.Connections, cj) + if cr.Error != nil { + cj.Error = &errorJSON{Msg: cr.Error.Error()} + } + r.Connect.Attempts = append(r.Connect.Attempts, cj) } + //fmt.Println("setting selected address...") if testResult.SelectedAddress != "" { selectedAddressJSON, err := newAddressJSON(testResult.SelectedAddress) if err == nil { From 4fdf296e1f7c63eccb0cdfe075bd55de10b4b575 Mon Sep 17 00:00:00 2001 From: amir gh Date: Sat, 4 May 2024 19:33:58 -0700 Subject: [PATCH 12/27] changed connectivity result format --- x/connectivity/connectivity.go | 97 +++++++++++++++++++--------------- 1 file changed, 55 insertions(+), 42 deletions(-) diff --git a/x/connectivity/connectivity.go b/x/connectivity/connectivity.go index fb8e85d9..44ee9090 100644 --- a/x/connectivity/connectivity.go +++ b/x/connectivity/connectivity.go @@ -29,28 +29,26 @@ import ( // ConnectivityResult captures the observed result of the connectivity test. type ConnectivityResult struct { - // The result of the initial connect attempt - Connect ConnectResult - // Address of the connection that was selected - SelectedAddress string - // Observed error - Error *ConnectivityError -} - -type ConnectResult struct { - // Address we dialed - DialedAddress string - // Address we selected - SelectedAddress string // Lists each connection attempt Attempts []ConnectionAttempt - // Observed error + // Address of the connection that was selected + Endpoint string + // Start time of the main test + StartTime time.Time + // Duration of the main test + Duration time.Duration + // result error Error *ConnectivityError } type ConnectionAttempt struct { Address string - Error error + // Start time of the connection attempt + StartTime time.Time + // Duration of the connection attempt + Duration time.Duration + // Observed error + Error *ConnectivityError } // ConnectivityError captures the observed error of the connectivity test. @@ -99,21 +97,25 @@ type WrapStreamDialer func(baseDialer transport.StreamDialer) (transport.StreamD // The baseDialer is typically TCPDialer, but it can be replaced for remote measurements. func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.StreamDialer, wrap WrapStreamDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { testResult := &ConnectivityResult{} + testResult.StartTime = time.Now() + connectResult := &testResult.Attempts i := 0 iterate := true for iterate { + attempt := &ConnectionAttempt{} interceptDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { - connectResult := &testResult.Connect // Captures the address of the first hop, before resolution. - connectResult.DialedAddress = addr - //connectResult.Attempts = make([]ConnectionAttempt, 0) + attempt.StartTime = time.Now() + testResult.Endpoint = addr host, port, err := net.SplitHostPort(addr) if err != nil { + attempt.Duration = time.Since(attempt.StartTime) return nil, err } ips, err := (&net.Resolver{PreferGo: false}).LookupHost(ctx, host) if err != nil { - connectResult.Error = makeConnectivityError("resolve", err) + attempt.Duration = time.Since(attempt.StartTime) + attempt.Error = makeConnectivityError("resolve", err) return nil, err } var conn transport.StreamConn @@ -122,7 +124,7 @@ func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.Str i++ fmt.Printf("Trying address %v\n", ip) addr := net.JoinHostPort(ip, port) - attemptResult := ConnectionAttempt{Address: addr} + attempt.Address = addr // TODO: This is slow. Race and overlap attempts instead. // abort loop as soon as one connection is successful deadline := time.Now().Add(5 * time.Second) @@ -130,16 +132,17 @@ func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.Str defer cancel() conn, err = baseDialer.DialStream(ipCtx, addr) if err != nil { - attemptResult.Error = errors.Unwrap(err) - } - connectResult.Attempts = append(connectResult.Attempts, attemptResult) - if err == nil { - connectResult.SelectedAddress = addr - iterate = false + attempt.Duration = time.Since(attempt.StartTime) + return nil, err } + // if err == nil { + // //connectResult.SelectedAddress = addr + // iterate = false + // } return conn, err } else { iterate = false + attempt.Duration = time.Since(attempt.StartTime) return nil, fmt.Errorf("all connect attempts failed. no more addresses to try") } }) @@ -153,18 +156,24 @@ func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.Str defer cancel() resolverConn, err := dialer.DialStream(ipCtx, resolverAddress) if err != nil { - testResult.Error = makeConnectivityError("connect", err) + attempt.Duration = time.Since(attempt.StartTime) + attempt.Error = makeConnectivityError("connect", err) + *connectResult = append(*connectResult, *attempt) continue } resolver := dns.NewTCPResolver(transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { return resolverConn, nil }), resolverAddress) - testResult.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) + attempt.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) + attempt.Duration = time.Since(attempt.StartTime) + *connectResult = append(*connectResult, *attempt) if err != nil { continue //return testResult, err } } + testResult.Duration = time.Since(testResult.StartTime) + fmt.Printf("Test result: %v\n", testResult) return testResult, nil } @@ -175,20 +184,24 @@ type WrapPacketDialer func(baseDialer transport.PacketDialer) (transport.PacketD // The baseDialer is typically UDPDialer, but it can be replaced for remote measurements. func TestPacketConnectivityWithDNS(ctx context.Context, baseDialer transport.PacketDialer, wrap WrapPacketDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { testResult := &ConnectivityResult{} - connectResult := &testResult.Connect + connectResult := &testResult.Attempts i := 0 iterate := true for iterate { + attempt := &ConnectionAttempt{} interceptDialer := transport.FuncPacketDialer(func(ctx context.Context, addr string) (net.Conn, error) { + attempt.StartTime = time.Now() // Captures the address of the first hop, before resolution. - connectResult.DialedAddress = addr + testResult.Endpoint = addr host, port, err := net.SplitHostPort(addr) if err != nil { + attempt.Duration = time.Since(attempt.StartTime) return nil, err } ips, err := (&net.Resolver{PreferGo: false}).LookupHost(ctx, host) if err != nil { - connectResult.Error = makeConnectivityError("resolve", err) + attempt.Duration = time.Since(attempt.StartTime) + attempt.Error = makeConnectivityError("resolve", err) return nil, err } var conn net.Conn @@ -197,17 +210,16 @@ func TestPacketConnectivityWithDNS(ctx context.Context, baseDialer transport.Pac i++ fmt.Printf("Trying address %v\n", ip) addr := net.JoinHostPort(ip, port) - attemptResult := ConnectionAttempt{Address: addr} + attempt.Address = addr conn, err = baseDialer.DialPacket(ctx, addr) if err != nil { - attemptResult.Error = errors.Unwrap(err) - } - //testResult.Connections = append(testResult.Connections, connResult) - connectResult.Attempts = append(connectResult.Attempts, attemptResult) - if err == nil { - testResult.SelectedAddress = addr - //iterate = false + attempt.Duration = time.Since(attempt.StartTime) + return nil, err } + // if err == nil { + // testResult.SelectedAddress = addr + // //iterate = false + // } return conn, err } else { iterate = false @@ -216,12 +228,13 @@ func TestPacketConnectivityWithDNS(ctx context.Context, baseDialer transport.Pac }) dialer, err := wrap(interceptDialer) if err != nil { - testResult.Error = makeConnectivityError("connect", err) + attempt.Error = makeConnectivityError("wrap", err) + *connectResult = append(*connectResult, *attempt) continue - //return nil, err } resolver := dns.NewUDPResolver(dialer, resolverAddress) - testResult.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) + attempt.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) + *connectResult = append(*connectResult, *attempt) if err != nil { continue //return testResult, err From 01a1c7e7d50a98005d79415adeb9c0ec8e03f518 Mon Sep 17 00:00:00 2001 From: amir gh Date: Sat, 4 May 2024 19:34:20 -0700 Subject: [PATCH 13/27] updated json report format --- x/examples/test-connectivity/main.go | 43 ++++++++++++++++------------ 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/x/examples/test-connectivity/main.go b/x/examples/test-connectivity/main.go index 8c5b0737..0a96da6d 100644 --- a/x/examples/test-connectivity/main.go +++ b/x/examples/test-connectivity/main.go @@ -48,8 +48,7 @@ type connectivityReport struct { Transport string `json:"transport"` // The result for the connection. - Connect connectAttemptJSON `json:"connect"` - SelectedAddress *addressJSON `json:"selected_address,omitempty"` + Result connectivityResult `json:"result"` // Observations Time time.Time `json:"time"` @@ -57,14 +56,16 @@ type connectivityReport struct { Error *errorJSON `json:"error"` } -type connectAttemptJSON struct { - Address *addressJSON `json:"address,omitempty"` - Attempts []connectionJSON `json:"attempts,omitempty"` +type connectivityResult struct { + Endpoint *addressJSON `json:"endpoint,omitempty"` + Attempts []connectionAttemptJSON `json:"attempts,omitempty"` } -type connectionJSON struct { - Address *addressJSON `json:"address,omitempty"` - Error *errorJSON `json:"error"` +type connectionAttemptJSON struct { + Address *addressJSON `json:"address,omitempty"` + Time time.Time `json:"time"` + DurationMs int64 `json:"duration_ms"` + Error *errorJSON `json:"error"` } type errorJSON struct { @@ -233,24 +234,30 @@ func main() { DurationMs: testDuration.Milliseconds(), Error: makeErrorRecord(testResult.Error), } - for _, cr := range testResult.Connect.Attempts { - cj := connectionJSON{} + addressJSON, err := newAddressJSON(testResult.Endpoint) + if err == nil { + r.Result.Endpoint = &addressJSON + } + for _, cr := range testResult.Attempts { + cj := connectionAttemptJSON{} addressJSON, err := newAddressJSON(cr.Address) if err == nil { cj.Address = &addressJSON } + cj.Time = cr.StartTime.UTC().Truncate(time.Second) + cj.DurationMs = cr.Duration.Milliseconds() if cr.Error != nil { - cj.Error = &errorJSON{Msg: cr.Error.Error()} + cj.Error = makeErrorRecord(cr.Error) } - r.Connect.Attempts = append(r.Connect.Attempts, cj) + r.Result.Attempts = append(r.Result.Attempts, cj) } //fmt.Println("setting selected address...") - if testResult.SelectedAddress != "" { - selectedAddressJSON, err := newAddressJSON(testResult.SelectedAddress) - if err == nil { - r.SelectedAddress = &selectedAddressJSON - } - } + // if testResult.SelectedAddress != "" { + // selectedAddressJSON, err := newAddressJSON(testResult.SelectedAddress) + // if err == nil { + // r.SelectedAddress = &selectedAddressJSON + // } + // } if reportCollector != nil { err = reportCollector.Collect(context.Background(), r) From 91dae61e135442da4a33d4cedff3fce3f04f2f58 Mon Sep 17 00:00:00 2001 From: amir gh Date: Thu, 9 May 2024 21:34:41 -0700 Subject: [PATCH 14/27] making tcp tests run concurrently --- x/connectivity/connectivity.go | 165 +++++++++++++++++++-------------- 1 file changed, 95 insertions(+), 70 deletions(-) diff --git a/x/connectivity/connectivity.go b/x/connectivity/connectivity.go index 44ee9090..9e55a295 100644 --- a/x/connectivity/connectivity.go +++ b/x/connectivity/connectivity.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "net" + "sync" "syscall" "time" @@ -92,6 +93,8 @@ func makeConnectivityError(op string, err error) *ConnectivityError { type WrapStreamDialer func(baseDialer transport.StreamDialer) (transport.StreamDialer, error) +var ErrAllConnectAttemptsFailed = errors.New("all connect attempts failed.") + // TestStreamConnectivityWithDNS tests weather we can get a response from a DNS resolver at resolverAddress over a stream connection. It sends testDomain as the query. // It uses the baseDialer to create a first-hop connection to the proxy, and the wrap to apply the transport. // The baseDialer is typically TCPDialer, but it can be replaced for remote measurements. @@ -99,81 +102,103 @@ func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.Str testResult := &ConnectivityResult{} testResult.StartTime = time.Now() connectResult := &testResult.Attempts - i := 0 - iterate := true - for iterate { - attempt := &ConnectionAttempt{} - interceptDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { - // Captures the address of the first hop, before resolution. - attempt.StartTime = time.Now() - testResult.Endpoint = addr - host, port, err := net.SplitHostPort(addr) - if err != nil { - attempt.Duration = time.Since(attempt.StartTime) - return nil, err - } - ips, err := (&net.Resolver{PreferGo: false}).LookupHost(ctx, host) - if err != nil { - attempt.Duration = time.Since(attempt.StartTime) - attempt.Error = makeConnectivityError("resolve", err) - return nil, err - } - var conn transport.StreamConn - if i < len(ips) { - ip := ips[i] - i++ - fmt.Printf("Trying address %v\n", ip) - addr := net.JoinHostPort(ip, port) - attempt.Address = addr - // TODO: This is slow. Race and overlap attempts instead. - // abort loop as soon as one connection is successful - deadline := time.Now().Add(5 * time.Second) - ipCtx, cancel := context.WithDeadline(ctx, deadline) - defer cancel() - conn, err = baseDialer.DialStream(ipCtx, addr) + ipIndex := 0 + done := make(chan bool) + proceed := make(chan bool, 1) + var waitGroup sync.WaitGroup + var mutex sync.Mutex + // Create a new context for canceling goroutines + ctx, cancel := context.WithCancel(ctx) + defer cancel() + proceed <- true +loop: + for { + select { + case <-done: + break loop + case <-proceed: + waitGroup.Add(1) + attempt := &ConnectionAttempt{} + go func(attempt *ConnectionAttempt) { + defer waitGroup.Done() + attempt.StartTime = time.Now() + mutex.Lock() + interceptDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { + // Captures the address of the first hop, before resolution. + testResult.Endpoint = addr + host, port, err := net.SplitHostPort(addr) + if err != nil { + attempt.Duration = time.Since(attempt.StartTime) + cancel() + done <- true + return nil, err + } + ips, err := (&net.Resolver{PreferGo: false}).LookupHost(ctx, host) + if err != nil { + cancel() + done <- true + return nil, err + } + var conn transport.StreamConn + if ipIndex < len(ips) { + // proceed to setting up the next test + proceed <- true + ip := ips[ipIndex] + ipIndex++ + addr := net.JoinHostPort(ip, port) + attempt.Address = addr + deadline := time.Now().Add(5 * time.Second) + ipCtx, cancelWithDeadline := context.WithDeadline(ctx, deadline) + defer cancelWithDeadline() + conn, err = baseDialer.DialStream(ipCtx, addr) + if err != nil { + attempt.Duration = time.Since(attempt.StartTime) + return nil, err + } + return conn, err + } else { + // stop iterating + done <- true + attempt.Duration = time.Since(attempt.StartTime) + return nil, ErrAllConnectAttemptsFailed + } + }) + mutex.Unlock() + dialer, err := wrap(interceptDialer) if err != nil { + *connectResult = append(*connectResult, *attempt) + return + } + resolverConn, err := dialer.DialStream(ctx, resolverAddress) + if err != nil { + // do not include cencelled errors in the result + if errors.Is(err, context.Canceled) { + return + } + // do not include the all connect attempts failed error in the result + if errors.Is(err, ErrAllConnectAttemptsFailed) { + return + } attempt.Duration = time.Since(attempt.StartTime) - return nil, err + attempt.Error = makeConnectivityError("connect", err) + *connectResult = append(*connectResult, *attempt) + return } - // if err == nil { - // //connectResult.SelectedAddress = addr - // iterate = false - // } - return conn, err - } else { - iterate = false + resolver := dns.NewTCPResolver(transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { + return resolverConn, nil + }), resolverAddress) + attempt.Error, _ = TestConnectivityWithResolver(ctx, resolver, testDomain) attempt.Duration = time.Since(attempt.StartTime) - return nil, fmt.Errorf("all connect attempts failed. no more addresses to try") - } - }) - dialer, err := wrap(interceptDialer) - if err != nil { - continue - //return testResult, err - } - deadline := time.Now().Add(5 * time.Second) - ipCtx, cancel := context.WithDeadline(ctx, deadline) - defer cancel() - resolverConn, err := dialer.DialStream(ipCtx, resolverAddress) - if err != nil { - attempt.Duration = time.Since(attempt.StartTime) - attempt.Error = makeConnectivityError("connect", err) - *connectResult = append(*connectResult, *attempt) - continue - } - resolver := dns.NewTCPResolver(transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { - return resolverConn, nil - }), resolverAddress) - attempt.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) - attempt.Duration = time.Since(attempt.StartTime) - *connectResult = append(*connectResult, *attempt) - if err != nil { - continue - //return testResult, err + *connectResult = append(*connectResult, *attempt) + if attempt.Error == nil { + // test has succeeded; cancel the rest of the goroutines + cancel() + } + }(attempt) } } + waitGroup.Wait() testResult.Duration = time.Since(testResult.StartTime) - fmt.Printf("Test result: %v\n", testResult) return testResult, nil } @@ -187,6 +212,7 @@ func TestPacketConnectivityWithDNS(ctx context.Context, baseDialer transport.Pac connectResult := &testResult.Attempts i := 0 iterate := true + //var iterateMutex sync.Mutex for iterate { attempt := &ConnectionAttempt{} interceptDialer := transport.FuncPacketDialer(func(ctx context.Context, addr string) (net.Conn, error) { @@ -223,7 +249,7 @@ func TestPacketConnectivityWithDNS(ctx context.Context, baseDialer transport.Pac return conn, err } else { iterate = false - return nil, fmt.Errorf("all connect attempts failed. no more addresses to try") + return nil, ErrAllConnectAttemptsFailed } }) dialer, err := wrap(interceptDialer) @@ -237,7 +263,6 @@ func TestPacketConnectivityWithDNS(ctx context.Context, baseDialer transport.Pac *connectResult = append(*connectResult, *attempt) if err != nil { continue - //return testResult, err } } // TODO: error is always being returned as nil; must change this From 7b473941f366b09588c8e1ab76399ac466714d46 Mon Sep 17 00:00:00 2001 From: amir gh Date: Mon, 13 May 2024 08:49:05 -0700 Subject: [PATCH 15/27] close clean up / remove mutex --- x/connectivity/connectivity.go | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/x/connectivity/connectivity.go b/x/connectivity/connectivity.go index 9e55a295..d853bf6a 100644 --- a/x/connectivity/connectivity.go +++ b/x/connectivity/connectivity.go @@ -106,7 +106,7 @@ func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.Str done := make(chan bool) proceed := make(chan bool, 1) var waitGroup sync.WaitGroup - var mutex sync.Mutex + //var mutex sync.Mutex // Create a new context for canceling goroutines ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -122,13 +122,13 @@ loop: go func(attempt *ConnectionAttempt) { defer waitGroup.Done() attempt.StartTime = time.Now() - mutex.Lock() + //mutex.Lock() interceptDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { // Captures the address of the first hop, before resolution. testResult.Endpoint = addr host, port, err := net.SplitHostPort(addr) if err != nil { - attempt.Duration = time.Since(attempt.StartTime) + //attempt.Duration = time.Since(attempt.StartTime) cancel() done <- true return nil, err @@ -144,26 +144,26 @@ loop: // proceed to setting up the next test proceed <- true ip := ips[ipIndex] + fmt.Printf("Trying address %v\n", ip) ipIndex++ addr := net.JoinHostPort(ip, port) attempt.Address = addr - deadline := time.Now().Add(5 * time.Second) - ipCtx, cancelWithDeadline := context.WithDeadline(ctx, deadline) - defer cancelWithDeadline() + // TODO: pass timeout paramter as argument + ipCtx, cancelWithTimeout := context.WithTimeout(ctx, 5*time.Second) + defer cancelWithTimeout() conn, err = baseDialer.DialStream(ipCtx, addr) if err != nil { - attempt.Duration = time.Since(attempt.StartTime) return nil, err } return conn, err } else { // stop iterating done <- true - attempt.Duration = time.Since(attempt.StartTime) + //attempt.Duration = time.Since(attempt.StartTime) return nil, ErrAllConnectAttemptsFailed } }) - mutex.Unlock() + //mutex.Unlock() dialer, err := wrap(interceptDialer) if err != nil { *connectResult = append(*connectResult, *attempt) @@ -175,13 +175,16 @@ loop: if errors.Is(err, context.Canceled) { return } - // do not include the all connect attempts failed error in the result + // do not include ErrAllConnectAttemptsFailed type error in the attempt result if errors.Is(err, ErrAllConnectAttemptsFailed) { return } attempt.Duration = time.Since(attempt.StartTime) attempt.Error = makeConnectivityError("connect", err) *connectResult = append(*connectResult, *attempt) + // populate main test result error field with + // one of attempt errors if non of the attempts succeeded + testResult.Error = attempt.Error return } resolver := dns.NewTCPResolver(transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { @@ -191,9 +194,15 @@ loop: attempt.Duration = time.Since(attempt.StartTime) *connectResult = append(*connectResult, *attempt) if attempt.Error == nil { + testResult.Error = nil // test has succeeded; cancel the rest of the goroutines cancel() + } else { + // populate main test result error field with + // one of attempt errors if non of the attempts succeeded + testResult.Error = attempt.Error } + }(attempt) } } From fb791b1bbe6d0caf736ba861d48192c24db96b65 Mon Sep 17 00:00:00 2001 From: amir gh Date: Tue, 14 May 2024 15:34:57 -0700 Subject: [PATCH 16/27] add ip type tag to address json --- x/examples/test-connectivity/main.go | 30 +++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/x/examples/test-connectivity/main.go b/x/examples/test-connectivity/main.go index 0a96da6d..cf51ecef 100644 --- a/x/examples/test-connectivity/main.go +++ b/x/examples/test-connectivity/main.go @@ -78,16 +78,40 @@ type errorJSON struct { } type addressJSON struct { - Host string `json:"host"` - Port string `json:"port"` + Host string `json:"host"` + Port string `json:"port"` + AddressType IPType `json:"ip_type"` } +type IPType string + +const ( + IPv4 IPType = "v4" + IPv6 IPType = "v6" +) + func newAddressJSON(address string) (addressJSON, error) { host, port, err := net.SplitHostPort(address) if err != nil { return addressJSON{}, err } - return addressJSON{host, port}, nil + ipType, err := checkIPVersion(host) + if err != nil { + return addressJSON{}, err + } + return addressJSON{host, port, ipType}, nil +} + +func checkIPVersion(ipStr string) (IPType, error) { + ip := net.ParseIP(ipStr) + if ip == nil { + return "", fmt.Errorf("invalid ip address: %s", ipStr) + } + if ip.To4() != nil { + return IPv4, nil + } else { + return IPv6, nil + } } func makeErrorRecord(result *connectivity.ConnectivityError) *errorJSON { From 1fe55fafe29a589c476da91af8211de18df7de99 Mon Sep 17 00:00:00 2001 From: amir gh Date: Tue, 14 May 2024 15:35:23 -0700 Subject: [PATCH 17/27] polish comments and code --- x/connectivity/connectivity.go | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/x/connectivity/connectivity.go b/x/connectivity/connectivity.go index d853bf6a..9a54ed8b 100644 --- a/x/connectivity/connectivity.go +++ b/x/connectivity/connectivity.go @@ -93,14 +93,13 @@ func makeConnectivityError(op string, err error) *ConnectivityError { type WrapStreamDialer func(baseDialer transport.StreamDialer) (transport.StreamDialer, error) -var ErrAllConnectAttemptsFailed = errors.New("all connect attempts failed.") +var ErrAllConnectAttemptsFailed = errors.New("all connect attempts failed") // TestStreamConnectivityWithDNS tests weather we can get a response from a DNS resolver at resolverAddress over a stream connection. It sends testDomain as the query. // It uses the baseDialer to create a first-hop connection to the proxy, and the wrap to apply the transport. // The baseDialer is typically TCPDialer, but it can be replaced for remote measurements. func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.StreamDialer, wrap WrapStreamDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { testResult := &ConnectivityResult{} - testResult.StartTime = time.Now() connectResult := &testResult.Attempts ipIndex := 0 done := make(chan bool) @@ -111,6 +110,7 @@ func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.Str ctx, cancel := context.WithCancel(ctx) defer cancel() proceed <- true + testResult.StartTime = time.Now() loop: for { select { @@ -122,13 +122,11 @@ loop: go func(attempt *ConnectionAttempt) { defer waitGroup.Done() attempt.StartTime = time.Now() - //mutex.Lock() interceptDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { // Captures the address of the first hop, before resolution. testResult.Endpoint = addr host, port, err := net.SplitHostPort(addr) if err != nil { - //attempt.Duration = time.Since(attempt.StartTime) cancel() done <- true return nil, err @@ -144,7 +142,7 @@ loop: // proceed to setting up the next test proceed <- true ip := ips[ipIndex] - fmt.Printf("Trying address %v\n", ip) + //fmt.Printf("Trying address %v\n", ip) ipIndex++ addr := net.JoinHostPort(ip, port) attempt.Address = addr @@ -159,11 +157,9 @@ loop: } else { // stop iterating done <- true - //attempt.Duration = time.Since(attempt.StartTime) return nil, ErrAllConnectAttemptsFailed } }) - //mutex.Unlock() dialer, err := wrap(interceptDialer) if err != nil { *connectResult = append(*connectResult, *attempt) @@ -171,18 +167,18 @@ loop: } resolverConn, err := dialer.DialStream(ctx, resolverAddress) if err != nil { - // do not include cencelled errors in the result + // Do not include cencelled errors in the result if errors.Is(err, context.Canceled) { return } - // do not include ErrAllConnectAttemptsFailed type error in the attempt result + // Do not include ErrAllConnectAttemptsFailed type error in the attempt result if errors.Is(err, ErrAllConnectAttemptsFailed) { return } attempt.Duration = time.Since(attempt.StartTime) attempt.Error = makeConnectivityError("connect", err) *connectResult = append(*connectResult, *attempt) - // populate main test result error field with + // CHANGE: populate main test result error field with // one of attempt errors if non of the attempts succeeded testResult.Error = attempt.Error return @@ -190,6 +186,9 @@ loop: resolver := dns.NewTCPResolver(transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { return resolverConn, nil }), resolverAddress) + // I am igniring the error returned by TestConnectivityWithResolver + // because I am already capturing the error in the attempt. Not sure + // if this is the right approach. attempt.Error, _ = TestConnectivityWithResolver(ctx, resolver, testDomain) attempt.Duration = time.Since(attempt.StartTime) *connectResult = append(*connectResult, *attempt) @@ -198,7 +197,7 @@ loop: // test has succeeded; cancel the rest of the goroutines cancel() } else { - // populate main test result error field with + // CHANGE: populate main test result error field with // one of attempt errors if non of the attempts succeeded testResult.Error = attempt.Error } From bb1090e6945ce905ace81ce9ab88e1ce8676aa8d Mon Sep 17 00:00:00 2001 From: amir gh Date: Tue, 14 May 2024 20:09:58 -0700 Subject: [PATCH 18/27] make udp tests concurrent --- x/connectivity/connectivity.go | 169 ++++++++++++++++++++++----------- 1 file changed, 116 insertions(+), 53 deletions(-) diff --git a/x/connectivity/connectivity.go b/x/connectivity/connectivity.go index 9a54ed8b..d8be275f 100644 --- a/x/connectivity/connectivity.go +++ b/x/connectivity/connectivity.go @@ -105,7 +105,6 @@ func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.Str done := make(chan bool) proceed := make(chan bool, 1) var waitGroup sync.WaitGroup - //var mutex sync.Mutex // Create a new context for canceling goroutines ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -162,6 +161,8 @@ loop: }) dialer, err := wrap(interceptDialer) if err != nil { + attempt.Duration = time.Since(attempt.StartTime) + attempt.Error.Err = err *connectResult = append(*connectResult, *attempt) return } @@ -186,7 +187,7 @@ loop: resolver := dns.NewTCPResolver(transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) { return resolverConn, nil }), resolverAddress) - // I am igniring the error returned by TestConnectivityWithResolver + // I am ignoring the error returned by TestConnectivityWithResolver // because I am already capturing the error in the attempt. Not sure // if this is the right approach. attempt.Error, _ = TestConnectivityWithResolver(ctx, resolver, testDomain) @@ -201,7 +202,6 @@ loop: // one of attempt errors if non of the attempts succeeded testResult.Error = attempt.Error } - }(attempt) } } @@ -218,62 +218,119 @@ type WrapPacketDialer func(baseDialer transport.PacketDialer) (transport.PacketD func TestPacketConnectivityWithDNS(ctx context.Context, baseDialer transport.PacketDialer, wrap WrapPacketDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) { testResult := &ConnectivityResult{} connectResult := &testResult.Attempts - i := 0 - iterate := true - //var iterateMutex sync.Mutex - for iterate { - attempt := &ConnectionAttempt{} - interceptDialer := transport.FuncPacketDialer(func(ctx context.Context, addr string) (net.Conn, error) { - attempt.StartTime = time.Now() - // Captures the address of the first hop, before resolution. - testResult.Endpoint = addr - host, port, err := net.SplitHostPort(addr) - if err != nil { - attempt.Duration = time.Since(attempt.StartTime) - return nil, err - } - ips, err := (&net.Resolver{PreferGo: false}).LookupHost(ctx, host) - if err != nil { - attempt.Duration = time.Since(attempt.StartTime) - attempt.Error = makeConnectivityError("resolve", err) - return nil, err - } - var conn net.Conn - if i < len(ips) { - ip := ips[i] - i++ - fmt.Printf("Trying address %v\n", ip) - addr := net.JoinHostPort(ip, port) - attempt.Address = addr - conn, err = baseDialer.DialPacket(ctx, addr) + ipIndex := 0 + done := make(chan bool) + proceed := make(chan bool, 1) + var waitGroup sync.WaitGroup + // Create a new context for canceling goroutines + ctx, cancel := context.WithCancel(ctx) + defer cancel() + proceed <- true + testResult.StartTime = time.Now() +loop: + for { + select { + case <-done: + break loop + case <-proceed: + waitGroup.Add(1) + attempt := &ConnectionAttempt{} + go func(attempt *ConnectionAttempt) { + defer waitGroup.Done() + attempt.StartTime = time.Now() + interceptDialer := transport.FuncPacketDialer(func(ctx context.Context, addr string) (net.Conn, error) { + // Captures the address of the first hop, before resolution. + testResult.Endpoint = addr + host, port, err := net.SplitHostPort(addr) + if err != nil { + cancel() + done <- true + return nil, err + } + ips, err := (&net.Resolver{PreferGo: false}).LookupHost(ctx, host) + if err != nil { + cancel() + done <- true + return nil, err + } + var conn net.Conn + if ipIndex < len(ips) { + // proceed to setting up the next test + proceed <- true + ip := ips[ipIndex] + ipIndex++ + fmt.Printf("Trying address %v\n", ip) + addr = net.JoinHostPort(ip, port) + attempt.Address = addr + // TODO: pass timeout paramter as argument + //ipCtx, cancelWithTimeout := context.WithTimeout(ctx, 5*time.Second) + //defer cancelWithTimeout() + conn, err = baseDialer.DialPacket(ctx, addr) + if err != nil { + return nil, err + } + return conn, err + } else { + // stop iterating + done <- true + return nil, ErrAllConnectAttemptsFailed + } + }) + dialer, err := wrap(interceptDialer) + if err != nil { + fmt.Println("wrap failed...") + *connectResult = append(*connectResult, *attempt) + return + } + resolverConn, err := dialer.DialPacket(ctx, resolverAddress) if err != nil { + // Do not include cencelled errors in the result + if errors.Is(err, context.Canceled) { + fmt.Println("context is being cancelled...") + return + } + // Do not include ErrAllConnectAttemptsFailed type error in the attempt result + if errors.Is(err, ErrAllConnectAttemptsFailed) { + return + } attempt.Duration = time.Since(attempt.StartTime) - return nil, err + attempt.Error = makeConnectivityError("connect", err) + *connectResult = append(*connectResult, *attempt) + // CHANGE: populate main test result error field with + // one of attempt errors if non of the attempts succeeded + testResult.Error = attempt.Error + return } - // if err == nil { - // testResult.SelectedAddress = addr - // //iterate = false - // } - return conn, err - } else { - iterate = false - return nil, ErrAllConnectAttemptsFailed - } - }) - dialer, err := wrap(interceptDialer) - if err != nil { - attempt.Error = makeConnectivityError("wrap", err) - *connectResult = append(*connectResult, *attempt) - continue - } - resolver := dns.NewUDPResolver(dialer, resolverAddress) - attempt.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) - *connectResult = append(*connectResult, *attempt) - if err != nil { - continue + resolver := dns.NewUDPResolver(transport.FuncPacketDialer(func(ctx context.Context, addr string) (net.Conn, error) { + return resolverConn, nil + }), resolverAddress) + //resolver := dns.NewUDPResolver(dialer, resolverAddress) + attempt.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) + if err != nil { + fmt.Println("udp test failed...") + } + attempt.Duration = time.Since(attempt.StartTime) + *connectResult = append(*connectResult, *attempt) + if attempt.Error == nil { + testResult.Error = nil + // test has succeeded; cancel the rest of the goroutines + //done <- true + fmt.Println("success found aborting all tests...") + cancel() + //done <- true + //return + } else { + // CHANGE: populate main test result error field with + // one of attempt errors if non of the attempts succeeded + testResult.Error = attempt.Error + //return + } + }(attempt) } } // TODO: error is always being returned as nil; must change this + waitGroup.Wait() + testResult.Duration = time.Since(testResult.StartTime) return testResult, nil } @@ -301,6 +358,12 @@ func TestConnectivityWithResolver(ctx context.Context, resolver dns.Resolver, te if errors.Is(err, dns.ErrBadRequest) { return nil, err } + if errors.Is(err, ErrAllConnectAttemptsFailed) { + fmt.Println("all connect attempts failed...") + } + if errors.Is(err, context.Canceled) { + fmt.Println("context cancelled...") + } if errors.Is(err, dns.ErrDial) { return makeConnectivityError("connect", err), nil } else if errors.Is(err, dns.ErrSend) { From e4e49151b155463ea68068304d7914971a18d5b7 Mon Sep 17 00:00:00 2001 From: amir gh Date: Tue, 14 May 2024 20:10:26 -0700 Subject: [PATCH 19/27] cancel pending read/writes --- dns/resolver.go | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/dns/resolver.go b/dns/resolver.go index 92b83a97..dda59e7a 100644 --- a/dns/resolver.go +++ b/dns/resolver.go @@ -186,16 +186,16 @@ func queryDatagram(conn io.ReadWriter, q dnsmessage.Question) (*dnsmessage.Messa err = nil } if err != nil { - return nil, &nestedError{ErrReceive, errors.Join(returnErr, fmt.Errorf("read message failed: %w", err))} + return nil, &nestedError{ErrReceive, wrappErrors(returnErr, fmt.Errorf("read message failed: %w", err))} } var msg dnsmessage.Message if err := msg.Unpack(buf[:n]); err != nil { - returnErr = errors.Join(returnErr, err) + returnErr = wrappErrors(returnErr, err) // Ignore invalid packets that fail to parse. It could be injected. continue } if err := checkResponse(id, q, msg.Header, msg.Questions); err != nil { - returnErr = errors.Join(returnErr, err) + returnErr = wrappErrors(returnErr, err) continue } return &msg, nil @@ -268,6 +268,11 @@ func NewUDPResolver(pd transport.PacketDialer, resolverAddr string) Resolver { return nil, &nestedError{ErrDial, err} } defer conn.Close() + // force close connection is context is done/cancelled. + go func() { + <-ctx.Done() + conn.Close() + }() if deadline, ok := ctx.Deadline(); ok { conn.SetDeadline(deadline) } @@ -391,3 +396,14 @@ func NewHTTPSResolver(sd transport.StreamDialer, resolverAddr string, url string return &msg, nil }) } + +func wrappErrors(err1, err2 error) error { + switch { + case err1 == nil: + return err2 + case err2 == nil: + return err1 + default: + return fmt.Errorf("%v: %w", err1, err2) + } +} From cb83523572a4497e8b70491381e2c33a7cb1d4b2 Mon Sep 17 00:00:00 2001 From: amir gh Date: Wed, 15 May 2024 11:02:00 -0700 Subject: [PATCH 20/27] do not capture conn closed err --- x/connectivity/connectivity.go | 39 +++++++++++++++------------------- 1 file changed, 17 insertions(+), 22 deletions(-) diff --git a/x/connectivity/connectivity.go b/x/connectivity/connectivity.go index d8be275f..15663f57 100644 --- a/x/connectivity/connectivity.go +++ b/x/connectivity/connectivity.go @@ -93,7 +93,7 @@ func makeConnectivityError(op string, err error) *ConnectivityError { type WrapStreamDialer func(baseDialer transport.StreamDialer) (transport.StreamDialer, error) -var ErrAllConnectAttemptsFailed = errors.New("all connect attempts failed") +var ErrNoMoreIPs = errors.New("no more ips to try") // TestStreamConnectivityWithDNS tests weather we can get a response from a DNS resolver at resolverAddress over a stream connection. It sends testDomain as the query. // It uses the baseDialer to create a first-hop connection to the proxy, and the wrap to apply the transport. @@ -156,7 +156,7 @@ loop: } else { // stop iterating done <- true - return nil, ErrAllConnectAttemptsFailed + return nil, ErrNoMoreIPs } }) dialer, err := wrap(interceptDialer) @@ -172,8 +172,8 @@ loop: if errors.Is(err, context.Canceled) { return } - // Do not include ErrAllConnectAttemptsFailed type error in the attempt result - if errors.Is(err, ErrAllConnectAttemptsFailed) { + // Do not include ErrNoMoreIPs type error in the attempt result + if errors.Is(err, ErrNoMoreIPs) { return } attempt.Duration = time.Since(attempt.StartTime) @@ -259,12 +259,9 @@ loop: proceed <- true ip := ips[ipIndex] ipIndex++ - fmt.Printf("Trying address %v\n", ip) + //fmt.Printf("Trying address %v\n", ip) addr = net.JoinHostPort(ip, port) attempt.Address = addr - // TODO: pass timeout paramter as argument - //ipCtx, cancelWithTimeout := context.WithTimeout(ctx, 5*time.Second) - //defer cancelWithTimeout() conn, err = baseDialer.DialPacket(ctx, addr) if err != nil { return nil, err @@ -273,7 +270,7 @@ loop: } else { // stop iterating done <- true - return nil, ErrAllConnectAttemptsFailed + return nil, ErrNoMoreIPs } }) dialer, err := wrap(interceptDialer) @@ -285,12 +282,13 @@ loop: resolverConn, err := dialer.DialPacket(ctx, resolverAddress) if err != nil { // Do not include cencelled errors in the result + // This never gets triggered in PacketDialer since + // connect is not a blocking operation; we can remove it later if errors.Is(err, context.Canceled) { - fmt.Println("context is being cancelled...") return } - // Do not include ErrAllConnectAttemptsFailed type error in the attempt result - if errors.Is(err, ErrAllConnectAttemptsFailed) { + // Do not include ErrNoMoreIPs type error in the attempt result + if errors.Is(err, ErrNoMoreIPs) { return } attempt.Duration = time.Since(attempt.StartTime) @@ -307,18 +305,15 @@ loop: //resolver := dns.NewUDPResolver(dialer, resolverAddress) attempt.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) if err != nil { - fmt.Println("udp test failed...") + //fmt.Printf("Test failed: %v\n", err) + return } attempt.Duration = time.Since(attempt.StartTime) *connectResult = append(*connectResult, *attempt) if attempt.Error == nil { testResult.Error = nil // test has succeeded; cancel the rest of the goroutines - //done <- true - fmt.Println("success found aborting all tests...") cancel() - //done <- true - //return } else { // CHANGE: populate main test result error field with // one of attempt errors if non of the attempts succeeded @@ -356,13 +351,13 @@ func TestConnectivityWithResolver(ctx context.Context, resolver dns.Resolver, te _, err = resolver.Query(ctx, *q) if errors.Is(err, dns.ErrBadRequest) { + // maybe change this to include err in report? return nil, err } - if errors.Is(err, ErrAllConnectAttemptsFailed) { - fmt.Println("all connect attempts failed...") - } - if errors.Is(err, context.Canceled) { - fmt.Println("context cancelled...") + // If the connection is force cancelled, + // we don't want to report an error. + if errors.Is(err, net.ErrClosed) { + return nil, err } if errors.Is(err, dns.ErrDial) { return makeConnectivityError("connect", err), nil From bdbbc5e884b87ff63f7754d0fcd22d378a0d813d Mon Sep 17 00:00:00 2001 From: amir gh Date: Mon, 1 Jul 2024 15:36:41 -0700 Subject: [PATCH 21/27] improvement to concurrency --- x/connectivity/connectivity.go | 35 ++++++++++++++++++++++++---------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/x/connectivity/connectivity.go b/x/connectivity/connectivity.go index 15663f57..255bbee5 100644 --- a/x/connectivity/connectivity.go +++ b/x/connectivity/connectivity.go @@ -132,6 +132,7 @@ loop: } ips, err := (&net.Resolver{PreferGo: false}).LookupHost(ctx, host) if err != nil { + fmt.Println("LookupHost failed: ", err) cancel() done <- true return nil, err @@ -141,7 +142,7 @@ loop: // proceed to setting up the next test proceed <- true ip := ips[ipIndex] - //fmt.Printf("Trying address %v\n", ip) + fmt.Printf("Trying IP address %v\n", ip) ipIndex++ addr := net.JoinHostPort(ip, port) attempt.Address = addr @@ -155,21 +156,22 @@ loop: return conn, err } else { // stop iterating + // fmt.Println("No more IPs to try") done <- true return nil, ErrNoMoreIPs } }) dialer, err := wrap(interceptDialer) if err != nil { - attempt.Duration = time.Since(attempt.StartTime) - attempt.Error.Err = err - *connectResult = append(*connectResult, *attempt) + fmt.Printf("wrap failed: %v\n", err) + done <- true return } resolverConn, err := dialer.DialStream(ctx, resolverAddress) if err != nil { // Do not include cencelled errors in the result if errors.Is(err, context.Canceled) { + fmt.Println("Context cancelled") return } // Do not include ErrNoMoreIPs type error in the attempt result @@ -190,7 +192,10 @@ loop: // I am ignoring the error returned by TestConnectivityWithResolver // because I am already capturing the error in the attempt. Not sure // if this is the right approach. - attempt.Error, _ = TestConnectivityWithResolver(ctx, resolver, testDomain) + attempt.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) + if err != nil { + return + } attempt.Duration = time.Since(attempt.StartTime) *connectResult = append(*connectResult, *attempt) if attempt.Error == nil { @@ -259,7 +264,7 @@ loop: proceed <- true ip := ips[ipIndex] ipIndex++ - //fmt.Printf("Trying address %v\n", ip) + fmt.Printf("Trying address %v\n", ip) addr = net.JoinHostPort(ip, port) attempt.Address = addr conn, err = baseDialer.DialPacket(ctx, addr) @@ -275,8 +280,8 @@ loop: }) dialer, err := wrap(interceptDialer) if err != nil { - fmt.Println("wrap failed...") - *connectResult = append(*connectResult, *attempt) + fmt.Printf("wrap failed: %v\n", err) + done <- true return } resolverConn, err := dialer.DialPacket(ctx, resolverAddress) @@ -305,7 +310,7 @@ loop: //resolver := dns.NewUDPResolver(dialer, resolverAddress) attempt.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain) if err != nil { - //fmt.Printf("Test failed: %v\n", err) + fmt.Printf("Test failed: %v\n", err) return } attempt.Duration = time.Since(attempt.StartTime) @@ -337,7 +342,7 @@ loop: func TestConnectivityWithResolver(ctx context.Context, resolver dns.Resolver, testDomain string) (*ConnectivityError, error) { if _, ok := ctx.Deadline(); !ok { // Default deadline is 5 seconds. - deadline := time.Now().Add(5 * time.Second) + deadline := time.Now().Add(8 * time.Second) var cancel context.CancelFunc ctx, cancel = context.WithDeadline(ctx, deadline) // Releases the timer. @@ -350,15 +355,25 @@ func TestConnectivityWithResolver(ctx context.Context, resolver dns.Resolver, te _, err = resolver.Query(ctx, *q) + // This code block has an issue since + // it does not distinguish between a context deadline exeeded + // and context cancelled error. + // if ctx.Err() != nil { + // fmt.Println("Context error: ", ctx.Err()) + // return nil, ctx.Err() + // } + if errors.Is(err, dns.ErrBadRequest) { // maybe change this to include err in report? return nil, err } // If the connection is force cancelled, // we don't want to report an error. + // Fortuna suggestion: ctx.Err() is not nil if errors.Is(err, net.ErrClosed) { return nil, err } + if errors.Is(err, dns.ErrDial) { return makeConnectivityError("connect", err), nil } else if errors.Is(err, dns.ErrSend) { From 08e43817ae8a7047d13ca9c10bb1167d86a63ea9 Mon Sep 17 00:00:00 2001 From: amir gh Date: Mon, 1 Jul 2024 15:38:00 -0700 Subject: [PATCH 22/27] bug fix and code clean up --- x/examples/test-connectivity/main.go | 58 +++++++++++++++++++++------- 1 file changed, 45 insertions(+), 13 deletions(-) diff --git a/x/examples/test-connectivity/main.go b/x/examples/test-connectivity/main.go index cf51ecef..9e027ffd 100644 --- a/x/examples/test-connectivity/main.go +++ b/x/examples/test-connectivity/main.go @@ -20,7 +20,6 @@ import ( "errors" "flag" "fmt" - "io" "log" "net" "net/http" @@ -36,9 +35,9 @@ import ( "github.com/Jigsaw-Code/outline-sdk/x/report" ) -var debugLog log.Logger = *log.New(io.Discard, "", 0) +var debugLog log.Logger = *log.New(os.Stderr, "", 0) -// var errorLog log.Logger = *log.New(os.Stderr, "[ERROR] ", log.LstdFlags|log.Lmicroseconds|log.Lshortfile) +//var errorLog log.Logger = *log.New(os.Stderr, "[ERROR] ", log.LstdFlags|log.Lmicroseconds|log.Lshortfile) type connectivityReport struct { // Inputs @@ -80,14 +79,15 @@ type errorJSON struct { type addressJSON struct { Host string `json:"host"` Port string `json:"port"` - AddressType IPType `json:"ip_type"` + AddressType IPType `json:"ip_type,omitempty"` } type IPType string const ( - IPv4 IPType = "v4" - IPv6 IPType = "v6" + IPv4 IPType = "v4" + IPv6 IPType = "v6" + Domain IPType = "" ) func newAddressJSON(address string) (addressJSON, error) { @@ -97,7 +97,8 @@ func newAddressJSON(address string) (addressJSON, error) { } ipType, err := checkIPVersion(host) if err != nil { - return addressJSON{}, err + // If the address is not an IP, it is a domain name + return addressJSON{Host: host, Port: port, AddressType: Domain}, nil } return addressJSON{host, port, ipType}, nil } @@ -105,7 +106,7 @@ func newAddressJSON(address string) (addressJSON, error) { func checkIPVersion(ipStr string) (IPType, error) { ip := net.ParseIP(ipStr) if ip == nil { - return "", fmt.Errorf("invalid ip address: %s", ipStr) + return Domain, fmt.Errorf("invalid ip address: %s", ipStr) } if ip.To4() != nil { return IPv4, nil @@ -153,6 +154,7 @@ func init() { func main() { verboseFlag := flag.Bool("v", false, "Enable debug output") transportFlag := flag.String("transport", "", "Transport config") + baseTransportFlag := flag.String("base-transport", "", "Remote Transport config") domainFlag := flag.String("domain", "example.com.", "Domain name to resolve in the test") resolverFlag := flag.String("resolver", "8.8.8.8,2001:4860:4860::8888", "Comma-separated list of addresses of DNS resolver to use for the test") protoFlag := flag.String("proto", "tcp,udp", "Comma-separated list of the protocols to test. Must be \"tcp\", \"udp\", or a combination of them") @@ -179,6 +181,26 @@ func main() { debugLog = *log.New(os.Stderr, "[DEBUG] ", log.LstdFlags|log.Lmicroseconds|log.Lshortfile) } + baseDialer, err := config.NewDefaultConfigParser().WrapStreamDialer(&transport.TCPDialer{}, *baseTransportFlag) + if err != nil { + log.Fatalf("Could not create dialer: %v\n", err) + } + dialContext := func(ctx context.Context, network, addr string) (net.Conn, error) { + host, port, err := net.SplitHostPort(addr) + if err != nil { + return nil, fmt.Errorf("invalid address: %w", err) + } + if !strings.HasPrefix(network, "tcp") { + return nil, fmt.Errorf("protocol not supported: %v", network) + } + return baseDialer.DialStream(ctx, net.JoinHostPort(host, port)) + } + // TODO: provide timeout as input argument to cli + httpClient := &http.Client{ + Transport: &http.Transport{DialContext: dialContext}, + Timeout: 30 * time.Second, + } + var reportCollector report.Collector if *reportToFlag != "" { collectorURL, err := url.Parse(*reportToFlag) @@ -187,11 +209,11 @@ func main() { } remoteCollector := &report.RemoteCollector{ CollectorURL: collectorURL, - HttpClient: &http.Client{Timeout: 10 * time.Second}, + HttpClient: httpClient, } retryCollector := &report.RetryCollector{ Collector: remoteCollector, - MaxRetry: 3, + MaxRetry: 1, InitialDelay: 1 * time.Second, } reportCollector = &report.SamplingCollector{ @@ -224,15 +246,23 @@ func main() { startTime := time.Now() switch proto { case "tcp": + base, err := configParser.WrapStreamDialer(&transport.TCPDialer{}, *baseTransportFlag) + if err != nil { + log.Fatalf("Could not create base dialer: %v\n", err) + } wrap := func(baseDialer transport.StreamDialer) (transport.StreamDialer, error) { return config.WrapStreamDialer(baseDialer, *transportFlag) } - testResult, testErr = connectivity.TestStreamConnectivityWithDNS(context.Background(), &transport.TCPDialer{}, wrap, resolverAddress, *domainFlag) + testResult, testErr = connectivity.TestStreamConnectivityWithDNS(context.Background(), base, wrap, resolverAddress, *domainFlag) case "udp": + base, err := configParser.WrapPacketDialer(&transport.UDPDialer{}, *baseTransportFlag) + if err != nil { + log.Fatalf("Could not create base dialer: %v\n", err) + } wrap := func(baseDialer transport.PacketDialer) (transport.PacketDialer, error) { return config.WrapPacketDialer(baseDialer, *transportFlag) } - testResult, testErr = connectivity.TestPacketConnectivityWithDNS(context.Background(), &transport.UDPDialer{}, wrap, resolverAddress, *domainFlag) + testResult, testErr = connectivity.TestPacketConnectivityWithDNS(context.Background(), base, wrap, resolverAddress, *domainFlag) default: log.Fatalf(`Invalid proto %v. Must be "tcp" or "udp"`, proto) } @@ -283,7 +313,9 @@ func main() { // } // } - if reportCollector != nil { + if reportCollector != nil && r.Result.Attempts != nil { + // do not report if there are no attempts + fmt.Println("sending the report....") err = reportCollector.Collect(context.Background(), r) if err != nil { debugLog.Printf("Failed to collect report: %v\n", err) From 60eed451bc3a88625b19e5a92ca5ff9fb4d0bd52 Mon Sep 17 00:00:00 2001 From: amir gh Date: Tue, 30 Jul 2024 15:48:21 -0600 Subject: [PATCH 23/27] migrating to new config design --- x/examples/test-connectivity/main.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/x/examples/test-connectivity/main.go b/x/examples/test-connectivity/main.go index 9e027ffd..01541fbd 100644 --- a/x/examples/test-connectivity/main.go +++ b/x/examples/test-connectivity/main.go @@ -180,8 +180,8 @@ func main() { if *verboseFlag { debugLog = *log.New(os.Stderr, "[DEBUG] ", log.LstdFlags|log.Lmicroseconds|log.Lshortfile) } - - baseDialer, err := config.NewDefaultConfigParser().WrapStreamDialer(&transport.TCPDialer{}, *baseTransportFlag) + configToDialer := config.NewDefaultConfigToDialer() + baseDialer, err := configToDialer.NewStreamDialer(*baseTransportFlag) if err != nil { log.Fatalf("Could not create dialer: %v\n", err) } @@ -235,7 +235,6 @@ func main() { success := false jsonEncoder := json.NewEncoder(os.Stdout) jsonEncoder.SetEscapeHTML(false) - configToDialer := config.NewDefaultConfigToDialer() for _, resolverHost := range strings.Split(*resolverFlag, ",") { resolverHost := strings.TrimSpace(resolverHost) resolverAddress := net.JoinHostPort(resolverHost, "53") @@ -246,21 +245,23 @@ func main() { startTime := time.Now() switch proto { case "tcp": - base, err := configParser.WrapStreamDialer(&transport.TCPDialer{}, *baseTransportFlag) + base, err := configToDialer.NewStreamDialer(*baseTransportFlag) if err != nil { log.Fatalf("Could not create base dialer: %v\n", err) } wrap := func(baseDialer transport.StreamDialer) (transport.StreamDialer, error) { - return config.WrapStreamDialer(baseDialer, *transportFlag) + configToDialer.BaseStreamDialer = baseDialer + return configToDialer.NewStreamDialer(*transportFlag) } testResult, testErr = connectivity.TestStreamConnectivityWithDNS(context.Background(), base, wrap, resolverAddress, *domainFlag) case "udp": - base, err := configParser.WrapPacketDialer(&transport.UDPDialer{}, *baseTransportFlag) + base, err := configToDialer.NewPacketDialer(*baseTransportFlag) if err != nil { log.Fatalf("Could not create base dialer: %v\n", err) } wrap := func(baseDialer transport.PacketDialer) (transport.PacketDialer, error) { - return config.WrapPacketDialer(baseDialer, *transportFlag) + configToDialer.BasePacketDialer = baseDialer + return configToDialer.NewPacketDialer(*transportFlag) } testResult, testErr = connectivity.TestPacketConnectivityWithDNS(context.Background(), base, wrap, resolverAddress, *domainFlag) default: From c7832deb4cfb2cafd097b3965c39270dab7f26ba Mon Sep 17 00:00:00 2001 From: amir gh Date: Tue, 30 Jul 2024 15:53:34 -0600 Subject: [PATCH 24/27] removed commented code --- x/examples/test-connectivity/main.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/x/examples/test-connectivity/main.go b/x/examples/test-connectivity/main.go index 01541fbd..22f77135 100644 --- a/x/examples/test-connectivity/main.go +++ b/x/examples/test-connectivity/main.go @@ -306,13 +306,6 @@ func main() { } r.Result.Attempts = append(r.Result.Attempts, cj) } - //fmt.Println("setting selected address...") - // if testResult.SelectedAddress != "" { - // selectedAddressJSON, err := newAddressJSON(testResult.SelectedAddress) - // if err == nil { - // r.SelectedAddress = &selectedAddressJSON - // } - // } if reportCollector != nil && r.Result.Attempts != nil { // do not report if there are no attempts From dfd82c928080819246ecce25b9036468adc5f737 Mon Sep 17 00:00:00 2001 From: amir gh Date: Tue, 30 Jul 2024 16:01:47 -0600 Subject: [PATCH 25/27] reverse wrappErrors in dns --- dns/resolver.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/dns/resolver.go b/dns/resolver.go index dda59e7a..c8a3e69a 100644 --- a/dns/resolver.go +++ b/dns/resolver.go @@ -186,16 +186,16 @@ func queryDatagram(conn io.ReadWriter, q dnsmessage.Question) (*dnsmessage.Messa err = nil } if err != nil { - return nil, &nestedError{ErrReceive, wrappErrors(returnErr, fmt.Errorf("read message failed: %w", err))} + return nil, &nestedError{ErrReceive, errors.Join(returnErr, fmt.Errorf("read message failed: %w", err))} } var msg dnsmessage.Message if err := msg.Unpack(buf[:n]); err != nil { - returnErr = wrappErrors(returnErr, err) + returnErr = errors.Join(returnErr, err) // Ignore invalid packets that fail to parse. It could be injected. continue } if err := checkResponse(id, q, msg.Header, msg.Questions); err != nil { - returnErr = wrappErrors(returnErr, err) + returnErr = errors.Join(returnErr, err) continue } return &msg, nil @@ -397,13 +397,13 @@ func NewHTTPSResolver(sd transport.StreamDialer, resolverAddr string, url string }) } -func wrappErrors(err1, err2 error) error { - switch { - case err1 == nil: - return err2 - case err2 == nil: - return err1 - default: - return fmt.Errorf("%v: %w", err1, err2) - } -} +// func wrappErrors(err1, err2 error) error { +// switch { +// case err1 == nil: +// return err2 +// case err2 == nil: +// return err1 +// default: +// return fmt.Errorf("%v: %w", err1, err2) +// } +// } From b8134d8c80e443663d86f91aea81ab61fba4b483 Mon Sep 17 00:00:00 2001 From: amir gh Date: Fri, 9 Aug 2024 15:58:32 -0700 Subject: [PATCH 26/27] fix wrap issue --- x/examples/test-connectivity/main.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/x/examples/test-connectivity/main.go b/x/examples/test-connectivity/main.go index 22f77135..fbd97972 100644 --- a/x/examples/test-connectivity/main.go +++ b/x/examples/test-connectivity/main.go @@ -250,8 +250,9 @@ func main() { log.Fatalf("Could not create base dialer: %v\n", err) } wrap := func(baseDialer transport.StreamDialer) (transport.StreamDialer, error) { - configToDialer.BaseStreamDialer = baseDialer - return configToDialer.NewStreamDialer(*transportFlag) + c := config.NewDefaultConfigToDialer() + c.BaseStreamDialer = baseDialer + return c.NewStreamDialer(*transportFlag) } testResult, testErr = connectivity.TestStreamConnectivityWithDNS(context.Background(), base, wrap, resolverAddress, *domainFlag) case "udp": @@ -260,8 +261,9 @@ func main() { log.Fatalf("Could not create base dialer: %v\n", err) } wrap := func(baseDialer transport.PacketDialer) (transport.PacketDialer, error) { - configToDialer.BasePacketDialer = baseDialer - return configToDialer.NewPacketDialer(*transportFlag) + c := config.NewDefaultConfigToDialer() + c.BasePacketDialer = baseDialer + return c.NewPacketDialer(*transportFlag) } testResult, testErr = connectivity.TestPacketConnectivityWithDNS(context.Background(), base, wrap, resolverAddress, *domainFlag) default: From 7318a7e36af2d84d0e88f36df53066a8a9dcea37 Mon Sep 17 00:00:00 2001 From: amir gh Date: Fri, 9 Aug 2024 16:36:46 -0700 Subject: [PATCH 27/27] change time to start_time --- x/examples/test-connectivity/main.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/x/examples/test-connectivity/main.go b/x/examples/test-connectivity/main.go index fbd97972..4080ed26 100644 --- a/x/examples/test-connectivity/main.go +++ b/x/examples/test-connectivity/main.go @@ -50,7 +50,7 @@ type connectivityReport struct { Result connectivityResult `json:"result"` // Observations - Time time.Time `json:"time"` + StartTime time.Time `json:"start_time"` DurationMs int64 `json:"duration_ms"` Error *errorJSON `json:"error"` } @@ -62,7 +62,7 @@ type connectivityResult struct { type connectionAttemptJSON struct { Address *addressJSON `json:"address,omitempty"` - Time time.Time `json:"time"` + StartTime time.Time `json:"start_time"` DurationMs int64 `json:"duration_ms"` Error *errorJSON `json:"error"` } @@ -283,9 +283,9 @@ func main() { log.Fatalf("Failed to sanitize config: %v", err) } r := connectivityReport{ - Resolver: resolverAddress, - Proto: proto, - Time: startTime.UTC().Truncate(time.Second), + Resolver: resolverAddress, + Proto: proto, + StartTime: startTime.UTC().Truncate(time.Second), // TODO(fortuna): Add sanitized config: Transport: sanitizedConfig, DurationMs: testDuration.Milliseconds(), @@ -301,7 +301,7 @@ func main() { if err == nil { cj.Address = &addressJSON } - cj.Time = cr.StartTime.UTC().Truncate(time.Second) + cj.StartTime = cr.StartTime.UTC().Truncate(time.Second) cj.DurationMs = cr.Duration.Milliseconds() if cr.Error != nil { cj.Error = makeErrorRecord(cr.Error)