Skip to content

Commit

Permalink
Implement resolve_canonical_bootstrap_servers_only
Browse files Browse the repository at this point in the history
Signed-off-by: George Brighton <[email protected]>
  • Loading branch information
gebn authored and dnwe committed Jul 26, 2023
1 parent ecf43f4 commit e6001bc
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 0 deletions.
60 changes: 60 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package sarama

import (
"context"
"errors"
"math"
"math/rand"
"net"
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"golang.org/x/net/proxy"
)

// Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
Expand Down Expand Up @@ -191,6 +196,14 @@ func NewClient(addrs []string, conf *Config) (Client, error) {
transactionCoordinators: make(map[string]int32),
}

if conf.Net.ResolveCanonicalBootstrapServers {
var err error
addrs, err = client.resolveCanonicalNames(addrs)
if err != nil {
return nil, err
}
}

client.randomizeSeedBrokers(addrs)

if conf.Metadata.Full {
Expand Down Expand Up @@ -1227,6 +1240,53 @@ func (client *client) findCoordinator(coordinatorKey string, coordinatorType Coo
return retry(Wrap(ErrOutOfBrokers, brokerErrors...))
}

func (client *client) resolveCanonicalNames(addrs []string) ([]string, error) {
ctx := context.Background()

dialer := client.Config().getDialer()
resolver := net.Resolver{
Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
// dial func should only be called once, so switching within is acceptable
switch d := dialer.(type) {
case proxy.ContextDialer:
return d.DialContext(ctx, network, address)
default:
// we have no choice but to ignore the context
return d.Dial(network, address)
}
},
}

canonicalAddrs := make(map[string]struct{}, len(addrs)) // dedupe as we go
for _, addr := range addrs {
host, port, err := net.SplitHostPort(addr)
if err != nil {
return nil, err // message includes addr
}

ips, err := resolver.LookupHost(ctx, host)
if err != nil {
return nil, err // message includes host
}
for _, ip := range ips {
ptrs, err := resolver.LookupAddr(ctx, ip)
if err != nil {
return nil, err // message includes ip
}

// unlike the Java client, we do not further check that PTRs resolve
ptr := strings.TrimSuffix(ptrs[0], ".") // trailing dot breaks GSSAPI
canonicalAddrs[net.JoinHostPort(ptr, port)] = struct{}{}
}
}

addrs = make([]string, 0, len(canonicalAddrs))
for addr := range canonicalAddrs {
addrs = append(addrs, addr)
}
return addrs, nil
}

// nopCloserClient embeds an existing Client, but disables
// the Close method (yet all other methods pass
// through unchanged). This is for use in larger structs
Expand Down
9 changes: 9 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ type Config struct {
ReadTimeout time.Duration // How long to wait for a response.
WriteTimeout time.Duration // How long to wait for a transmit.

// ResolveCanonicalBootstrapServers turns each bootstrap broker address
// into a set of IPs, then does a reverse lookup on each one to get its
// canonical hostname. This list of hostnames then replaces the
// original address list. Similar to the `client.dns.lookup` option in
// the JVM client, this is especially useful with GSSAPI, where it
// allows providing an alias record instead of individual broker
// hostnames. Defaults to false.
ResolveCanonicalBootstrapServers bool

TLS struct {
// Whether or not to use TLS when connecting to the broker
// (defaults to false).
Expand Down

0 comments on commit e6001bc

Please sign in to comment.