Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(client): refactor duplicated replica+partition logic #2925

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 22 additions & 68 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,34 +363,19 @@ func (client *client) MetadataTopics() ([]string, error) {
}

func (client *client) Partitions(topic string) ([]int32, error) {
if client.Closed() {
return nil, ErrClosedClient
}

partitions := client.cachedPartitions(topic, allPartitions)

if len(partitions) == 0 {
err := client.RefreshMetadata(topic)
if err != nil {
return nil, err
}
partitions = client.cachedPartitions(topic, allPartitions)
}

// no partitions found after refresh metadata
if len(partitions) == 0 {
return nil, ErrUnknownTopicOrPartition
}

return partitions, nil
return client.getPartitions(topic, allPartitions)
}

func (client *client) WritablePartitions(topic string) ([]int32, error) {
return client.getPartitions(topic, writablePartitions)
}

func (client *client) getPartitions(topic string, pt partitionType) ([]int32, error) {
if client.Closed() {
return nil, ErrClosedClient
}

partitions := client.cachedPartitions(topic, writablePartitions)
partitions := client.cachedPartitions(topic, pt)

// len==0 catches when it's nil (no such topic) and the odd case when every single
// partition is undergoing leader election simultaneously. Callers have to be able to handle
Expand All @@ -403,7 +388,7 @@ func (client *client) WritablePartitions(topic string) ([]int32, error) {
if err != nil {
return nil, err
}
partitions = client.cachedPartitions(topic, writablePartitions)
partitions = client.cachedPartitions(topic, pt)
}

if partitions == nil {
Expand All @@ -414,56 +399,24 @@ func (client *client) WritablePartitions(topic string) ([]int32, error) {
}

func (client *client) Replicas(topic string, partitionID int32) ([]int32, error) {
if client.Closed() {
return nil, ErrClosedClient
}

metadata := client.cachedMetadata(topic, partitionID)

if metadata == nil {
err := client.RefreshMetadata(topic)
if err != nil {
return nil, err
}
metadata = client.cachedMetadata(topic, partitionID)
}

if metadata == nil {
return nil, ErrUnknownTopicOrPartition
}

if errors.Is(metadata.Err, ErrReplicaNotAvailable) {
return dupInt32Slice(metadata.Replicas), metadata.Err
}
return dupInt32Slice(metadata.Replicas), nil
return client.getReplicas(topic, partitionID, func(metadata *PartitionMetadata) []int32 {
return metadata.Replicas
})
}

func (client *client) InSyncReplicas(topic string, partitionID int32) ([]int32, error) {
if client.Closed() {
return nil, ErrClosedClient
}

metadata := client.cachedMetadata(topic, partitionID)

if metadata == nil {
err := client.RefreshMetadata(topic)
if err != nil {
return nil, err
}
metadata = client.cachedMetadata(topic, partitionID)
}

if metadata == nil {
return nil, ErrUnknownTopicOrPartition
}

if errors.Is(metadata.Err, ErrReplicaNotAvailable) {
return dupInt32Slice(metadata.Isr), metadata.Err
}
return dupInt32Slice(metadata.Isr), nil
return client.getReplicas(topic, partitionID, func(metadata *PartitionMetadata) []int32 {
return metadata.Isr
})
}

func (client *client) OfflineReplicas(topic string, partitionID int32) ([]int32, error) {
return client.getReplicas(topic, partitionID, func(metadata *PartitionMetadata) []int32 {
return metadata.OfflineReplicas
})
}

func (client *client) getReplicas(topic string, partitionID int32, extractor func(metadata *PartitionMetadata) []int32) ([]int32, error) {
if client.Closed() {
return nil, ErrClosedClient
}
Expand All @@ -482,10 +435,11 @@ func (client *client) OfflineReplicas(topic string, partitionID int32) ([]int32,
return nil, ErrUnknownTopicOrPartition
}

replicas := extractor(metadata)
if errors.Is(metadata.Err, ErrReplicaNotAvailable) {
return dupInt32Slice(metadata.OfflineReplicas), metadata.Err
return dupInt32Slice(replicas), metadata.Err
}
return dupInt32Slice(metadata.OfflineReplicas), nil
return dupInt32Slice(replicas), nil
}

func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
Expand Down
7 changes: 3 additions & 4 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,10 @@ func withRecover(fn func()) {
}

func safeAsyncClose(b *Broker) {
tmp := b // local var prevents clobbering in goroutine
go withRecover(func() {
if connected, _ := tmp.Connected(); connected {
if err := tmp.Close(); err != nil {
Logger.Println("Error closing broker", tmp.ID(), ":", err)
if connected, _ := b.Connected(); connected {
if err := b.Close(); err != nil {
Logger.Println("Error closing broker", b.ID(), ":", err)
}
}
})
Expand Down