Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Draft PR for improving connectivity testing to capture resolved IPs and all attempts #228

Draft
wants to merge 27 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions dns/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,11 @@ func NewUDPResolver(pd transport.PacketDialer, resolverAddr string) Resolver {
return nil, &nestedError{ErrDial, err}
}
defer conn.Close()
// force close connection is context is done/cancelled.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fortuna I have to add this here to close the connection for pending receives (in case of timeouts) when another attempt succeeds. I managed to do this with the following change. I am basically cancelling the passed context in connectivity.go and checking if context is done. This is the best I came up with but I am not sure if there's a more elegant way to accomplish this.

go func() {
<-ctx.Done()
conn.Close()
}()
if deadline, ok := ctx.Deadline(); ok {
conn.SetDeadline(deadline)
}
Expand Down Expand Up @@ -391,3 +396,14 @@ func NewHTTPSResolver(sd transport.StreamDialer, resolverAddr string, url string
return &msg, nil
})
}

// func wrappErrors(err1, err2 error) error {
// switch {
// case err1 == nil:
// return err2
// case err2 == nil:
// return err1
// default:
// return fmt.Errorf("%v: %w", err1, err2)
// }
// }
288 changes: 287 additions & 1 deletion x/connectivity/connectivity.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,40 @@ import (
"context"
"errors"
"fmt"
"net"
"sync"
"syscall"
"time"

"github.com/Jigsaw-Code/outline-sdk/dns"
"github.com/Jigsaw-Code/outline-sdk/transport"
"golang.org/x/net/dns/dnsmessage"
)

// ConnectivityResult captures the observed result of the connectivity test.
type ConnectivityResult struct {
// Lists each connection attempt
Attempts []ConnectionAttempt
// Address of the connection that was selected
Endpoint string
// Start time of the main test
StartTime time.Time
// Duration of the main test
Duration time.Duration
// result error
Error *ConnectivityError
}

type ConnectionAttempt struct {
Address string
// Start time of the connection attempt
StartTime time.Time
// Duration of the connection attempt
Duration time.Duration
// Observed error
Error *ConnectivityError
}

// ConnectivityError captures the observed error of the connectivity test.
type ConnectivityError struct {
// Which operation in the test that failed: "connect", "send" or "receive"
Expand Down Expand Up @@ -64,6 +91,249 @@ func makeConnectivityError(op string, err error) *ConnectivityError {
return &ConnectivityError{Op: op, PosixError: code, Err: err}
}

type WrapStreamDialer func(baseDialer transport.StreamDialer) (transport.StreamDialer, error)

var ErrNoMoreIPs = errors.New("no more ips to try")

// TestStreamConnectivityWithDNS tests weather we can get a response from a DNS resolver at resolverAddress over a stream connection. It sends testDomain as the query.
// It uses the baseDialer to create a first-hop connection to the proxy, and the wrap to apply the transport.
// The baseDialer is typically TCPDialer, but it can be replaced for remote measurements.
func TestStreamConnectivityWithDNS(ctx context.Context, baseDialer transport.StreamDialer, wrap WrapStreamDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) {
testResult := &ConnectivityResult{}
connectResult := &testResult.Attempts
ipIndex := 0
done := make(chan bool)
proceed := make(chan bool, 1)
var waitGroup sync.WaitGroup
// Create a new context for canceling goroutines
ctx, cancel := context.WithCancel(ctx)
defer cancel()
proceed <- true
testResult.StartTime = time.Now()
loop:
for {
select {
case <-done:
break loop
case <-proceed:
waitGroup.Add(1)
attempt := &ConnectionAttempt{}
go func(attempt *ConnectionAttempt) {
defer waitGroup.Done()
attempt.StartTime = time.Now()
interceptDialer := transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) {
// Captures the address of the first hop, before resolution.
testResult.Endpoint = addr
host, port, err := net.SplitHostPort(addr)
if err != nil {
cancel()
done <- true
return nil, err
}
ips, err := (&net.Resolver{PreferGo: false}).LookupHost(ctx, host)
if err != nil {
fmt.Println("LookupHost failed: ", err)
cancel()
done <- true
return nil, err
}
var conn transport.StreamConn
if ipIndex < len(ips) {
// proceed to setting up the next test
proceed <- true
ip := ips[ipIndex]
fmt.Printf("Trying IP address %v\n", ip)
ipIndex++
addr := net.JoinHostPort(ip, port)
attempt.Address = addr
// TODO: pass timeout paramter as argument
ipCtx, cancelWithTimeout := context.WithTimeout(ctx, 5*time.Second)
defer cancelWithTimeout()
conn, err = baseDialer.DialStream(ipCtx, addr)
if err != nil {
return nil, err
}
return conn, err
} else {
// stop iterating
// fmt.Println("No more IPs to try")
done <- true
return nil, ErrNoMoreIPs
}
})
dialer, err := wrap(interceptDialer)
if err != nil {
fmt.Printf("wrap failed: %v\n", err)
done <- true
return
}
resolverConn, err := dialer.DialStream(ctx, resolverAddress)
if err != nil {
// Do not include cencelled errors in the result
if errors.Is(err, context.Canceled) {
fmt.Println("Context cancelled")
return
}
// Do not include ErrNoMoreIPs type error in the attempt result
if errors.Is(err, ErrNoMoreIPs) {
return
}
attempt.Duration = time.Since(attempt.StartTime)
attempt.Error = makeConnectivityError("connect", err)
*connectResult = append(*connectResult, *attempt)
// CHANGE: populate main test result error field with
// one of attempt errors if non of the attempts succeeded
testResult.Error = attempt.Error
return
}
resolver := dns.NewTCPResolver(transport.FuncStreamDialer(func(ctx context.Context, addr string) (transport.StreamConn, error) {
return resolverConn, nil
}), resolverAddress)
// I am ignoring the error returned by TestConnectivityWithResolver
// because I am already capturing the error in the attempt. Not sure
// if this is the right approach.
attempt.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain)
if err != nil {
return
}
attempt.Duration = time.Since(attempt.StartTime)
*connectResult = append(*connectResult, *attempt)
if attempt.Error == nil {
testResult.Error = nil
// test has succeeded; cancel the rest of the goroutines
cancel()
} else {
// CHANGE: populate main test result error field with
// one of attempt errors if non of the attempts succeeded
testResult.Error = attempt.Error
}
}(attempt)
}
}
waitGroup.Wait()
testResult.Duration = time.Since(testResult.StartTime)
return testResult, nil
}

type WrapPacketDialer func(baseDialer transport.PacketDialer) (transport.PacketDialer, error)

// TestPacketConnectivityWithDNS tests weather we can get a response from a DNS resolver at resolverAddress over a packet connection. It sends testDomain as the query.
// It uses the baseDialer to create a first-hop connection to the proxy, and the wrap to apply the transport.
// The baseDialer is typically UDPDialer, but it can be replaced for remote measurements.
func TestPacketConnectivityWithDNS(ctx context.Context, baseDialer transport.PacketDialer, wrap WrapPacketDialer, resolverAddress string, testDomain string) (*ConnectivityResult, error) {
testResult := &ConnectivityResult{}
connectResult := &testResult.Attempts
ipIndex := 0
done := make(chan bool)
proceed := make(chan bool, 1)
var waitGroup sync.WaitGroup
// Create a new context for canceling goroutines
ctx, cancel := context.WithCancel(ctx)
defer cancel()
proceed <- true
testResult.StartTime = time.Now()
loop:
for {
select {
case <-done:
break loop
case <-proceed:
waitGroup.Add(1)
attempt := &ConnectionAttempt{}
go func(attempt *ConnectionAttempt) {
defer waitGroup.Done()
attempt.StartTime = time.Now()
interceptDialer := transport.FuncPacketDialer(func(ctx context.Context, addr string) (net.Conn, error) {
// Captures the address of the first hop, before resolution.
testResult.Endpoint = addr
host, port, err := net.SplitHostPort(addr)
if err != nil {
cancel()
done <- true
return nil, err
}
ips, err := (&net.Resolver{PreferGo: false}).LookupHost(ctx, host)
if err != nil {
cancel()
done <- true
return nil, err
}
var conn net.Conn
if ipIndex < len(ips) {
// proceed to setting up the next test
proceed <- true
ip := ips[ipIndex]
ipIndex++
fmt.Printf("Trying address %v\n", ip)
addr = net.JoinHostPort(ip, port)
attempt.Address = addr
conn, err = baseDialer.DialPacket(ctx, addr)
if err != nil {
return nil, err
}
return conn, err
} else {
// stop iterating
done <- true
return nil, ErrNoMoreIPs
}
})
dialer, err := wrap(interceptDialer)
if err != nil {
fmt.Printf("wrap failed: %v\n", err)
done <- true
return
}
resolverConn, err := dialer.DialPacket(ctx, resolverAddress)
if err != nil {
// Do not include cencelled errors in the result
// This never gets triggered in PacketDialer since
// connect is not a blocking operation; we can remove it later
if errors.Is(err, context.Canceled) {
return
}
// Do not include ErrNoMoreIPs type error in the attempt result
if errors.Is(err, ErrNoMoreIPs) {
return
}
attempt.Duration = time.Since(attempt.StartTime)
attempt.Error = makeConnectivityError("connect", err)
*connectResult = append(*connectResult, *attempt)
// CHANGE: populate main test result error field with
// one of attempt errors if non of the attempts succeeded
testResult.Error = attempt.Error
return
}
resolver := dns.NewUDPResolver(transport.FuncPacketDialer(func(ctx context.Context, addr string) (net.Conn, error) {
return resolverConn, nil
}), resolverAddress)
//resolver := dns.NewUDPResolver(dialer, resolverAddress)
attempt.Error, err = TestConnectivityWithResolver(ctx, resolver, testDomain)
if err != nil {
fmt.Printf("Test failed: %v\n", err)
return
}
attempt.Duration = time.Since(attempt.StartTime)
*connectResult = append(*connectResult, *attempt)
if attempt.Error == nil {
testResult.Error = nil
// test has succeeded; cancel the rest of the goroutines
cancel()
} else {
// CHANGE: populate main test result error field with
// one of attempt errors if non of the attempts succeeded
testResult.Error = attempt.Error
//return
}
}(attempt)
}
}
// TODO: error is always being returned as nil; must change this
waitGroup.Wait()
testResult.Duration = time.Since(testResult.StartTime)
return testResult, nil
}

// TestConnectivityWithResolver tests weather we can get a response from the given [Resolver]. It can be used
// to test connectivity of its underlying [transport.StreamDialer] or [transport.PacketDialer].
// Invalid tests that cannot assert connectivity will return (nil, error).
Expand All @@ -72,7 +342,7 @@ func makeConnectivityError(op string, err error) *ConnectivityError {
func TestConnectivityWithResolver(ctx context.Context, resolver dns.Resolver, testDomain string) (*ConnectivityError, error) {
if _, ok := ctx.Deadline(); !ok {
// Default deadline is 5 seconds.
deadline := time.Now().Add(5 * time.Second)
deadline := time.Now().Add(8 * time.Second)
var cancel context.CancelFunc
ctx, cancel = context.WithDeadline(ctx, deadline)
// Releases the timer.
Expand All @@ -85,9 +355,25 @@ func TestConnectivityWithResolver(ctx context.Context, resolver dns.Resolver, te

_, err = resolver.Query(ctx, *q)

// This code block has an issue since
// it does not distinguish between a context deadline exeeded
// and context cancelled error.
// if ctx.Err() != nil {
// fmt.Println("Context error: ", ctx.Err())
// return nil, ctx.Err()
// }

if errors.Is(err, dns.ErrBadRequest) {
// maybe change this to include err in report?
return nil, err
}
// If the connection is force cancelled,
// we don't want to report an error.
// Fortuna suggestion: ctx.Err() is not nil
if errors.Is(err, net.ErrClosed) {
return nil, err
}

if errors.Is(err, dns.ErrDial) {
return makeConnectivityError("connect", err), nil
} else if errors.Is(err, dns.ErrSend) {
Expand Down
Loading
Loading