Skip to content

Commit

Permalink
Merge branch 'main' into simple-producer-example
Browse files Browse the repository at this point in the history
  • Loading branch information
Gorgonx7 authored Oct 12, 2023
2 parents edf8b22 + 24f1249 commit 80243e6
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
2 changes: 1 addition & 1 deletion balance_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type BalanceStrategy interface {
// Example with two topics T1 and T2 with six partitions each (0..5) and two members (M1, M2):
//
// M1: {T1: [0, 1, 2], T2: [0, 1, 2]}
// M2: {T2: [3, 4, 5], T2: [3, 4, 5]}
// M2: {T1: [3, 4, 5], T2: [3, 4, 5]}
func NewBalanceStrategyRange() BalanceStrategy {
return &balanceStrategy{
name: RangeBalanceStrategyName,
Expand Down
16 changes: 12 additions & 4 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,14 @@ var (
}
)

var (
// This regex validates that a string complies with the pre kafka 1.0.0 format for version strings, for example 0.11.0.3
validPreKafka1Version = regexp.MustCompile(`^0\.\d+\.\d+\.\d+$`)

// This regex validates that a string complies with the post Kafka 1.0.0 format, for example 1.0.0
validPostKafka1Version = regexp.MustCompile(`^\d+\.\d+\.\d+$`)
)

// ParseKafkaVersion parses and returns kafka version or error from a string
func ParseKafkaVersion(s string) (KafkaVersion, error) {
if len(s) < 5 {
Expand All @@ -288,18 +296,18 @@ func ParseKafkaVersion(s string) (KafkaVersion, error) {
var major, minor, veryMinor, patch uint
var err error
if s[0] == '0' {
err = scanKafkaVersion(s, `^0\.\d+\.\d+\.\d+$`, "0.%d.%d.%d", [3]*uint{&minor, &veryMinor, &patch})
err = scanKafkaVersion(s, validPreKafka1Version, "0.%d.%d.%d", [3]*uint{&minor, &veryMinor, &patch})
} else {
err = scanKafkaVersion(s, `^\d+\.\d+\.\d+$`, "%d.%d.%d", [3]*uint{&major, &minor, &veryMinor})
err = scanKafkaVersion(s, validPostKafka1Version, "%d.%d.%d", [3]*uint{&major, &minor, &veryMinor})
}
if err != nil {
return DefaultVersion, err
}
return newKafkaVersion(major, minor, veryMinor, patch), nil
}

func scanKafkaVersion(s string, pattern string, format string, v [3]*uint) error {
if !regexp.MustCompile(pattern).MatchString(s) {
func scanKafkaVersion(s string, pattern *regexp.Regexp, format string, v [3]*uint) error {
if !pattern.MatchString(s) {
return fmt.Errorf("invalid version `%s`", s)
}
_, err := fmt.Sscanf(s, format, v[0], v[1], v[2])
Expand Down

0 comments on commit 80243e6

Please sign in to comment.