Skip to content

Commit

Permalink
Merge pull request #15 from rogpeppe-contrib/017-simplify-consume-cli
Browse files Browse the repository at this point in the history
change consume CLI usage
  • Loading branch information
rogpeppe authored Oct 14, 2019
2 parents 0686380 + 1d1dd34 commit be6a612
Show file tree
Hide file tree
Showing 11 changed files with 42 additions and 42 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Some reasons why you might be interested:
* Modify consumer group offsets (e.g., resetting or manually setting offsets per topic and per partition).
* JSON output for easy consumption with tools like [kp](https://github.com/echojc/kp) or [jq](https://stedolan.github.io/jq/).
* JSON input to facilitate automation via tools like [jsonify](https://github.com/fgeller/jsonify).
* Configure brokers and topic via environment variables `KT_BROKERS` and `KT_TOPIC` for a shell session.
* Configure brokers with the `KT_BROKERS` environment variable.
* Fast start up time.
* No buffering of output.
* Binary keys and payloads can be passed and presented in base64 or hex encoding.
Expand Down
36 changes: 25 additions & 11 deletions consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ func (cmd *consumeCmd) addFlags(flags *flag.FlagSet) {
cmd.commonFlags.addFlags(flags)
cmd.partitioners = []string{"sarama"}
flags.Var(listFlag{&cmd.partitioners}, "partitioners", "Comma-separated list of partitioners to consider when using the key flag. See below for details")
flags.StringVar(&cmd.topic, "topic", "", "Topic to consume (required).")
flags.StringVar(&cmd.offsets, "offsets", "", "Specifies what messages to read by partition and offset range (defaults to all).")
flags.DurationVar(&cmd.timeout, "timeout", time.Duration(0), "Timeout after not reading messages (default 0 to disable).")
flags.StringVar(&cmd.keyStr, "key", "", "Print only messages with this key. Note: this relies on the producer using one of the partitioning algorithms specified with the -partitioners argument")
flags.BoolVar(&cmd.pretty, "pretty", true, "Control output pretty printing.")
Expand All @@ -59,22 +57,32 @@ func (cmd *consumeCmd) addFlags(flags *flag.FlagSet) {
flags.StringVar(&cmd.keyCodecType, "keycodec", "string", "Present message key as (string|hex|base64), defaults to string.")

flags.Usage = func() {
fmt.Fprintln(os.Stderr, "Usage of consume:")
fmt.Fprintln(os.Stderr, "Usage: hkt consume [flags] TOPIC [OFFSETS]")
flags.PrintDefaults()
fmt.Fprintln(os.Stderr, consumeDocString)
}
}

func (cmd *consumeCmd) environFlags() map[string]string {
return map[string]string{
"topic": "KT_TOPIC",
"brokers": "KT_BROKERS",
}
}

func (cmd *consumeCmd) run(args []string) error {
if len(args) > 0 {
return fmt.Errorf("unexpected argument to consume command")
if len(args) < 1 {
return fmt.Errorf("consume: no topic specified in first argument")
}
if len(args) > 2 {
return fmt.Errorf("unexpected extra arguments to consume command")
}
cmd.topic = args[0]
if cmd.topic == "" {
return fmt.Errorf("empty topic name")
}
offsetsStr := "all"
if len(args) > 1 {
offsetsStr = args[1]
}
if cmd.verbose {
sarama.Logger = log.New(os.Stderr, "", log.LstdFlags)
Expand All @@ -92,7 +100,7 @@ func (cmd *consumeCmd) run(args []string) error {
if err != nil {
return fmt.Errorf("bad -keycodec argument: %v", err)
}
offsets, err := parseOffsets(cmd.offsets, time.Now())
offsets, err := parseOffsets(offsetsStr, time.Now())
if err != nil {
return err
}
Expand Down Expand Up @@ -442,6 +450,9 @@ var consumeDocString = `
The consume command reads messages from a Kafka topic and prints them
to the standard output.
If the OFFSETS argument isn't provided, it defaults to "all" (all messages from
the topic are returned).
The messages will be printed as a stream of JSON objects in the following form:
{
Expand All @@ -461,8 +472,8 @@ For example:
{"partition":0,"key":"k1","value":{"foo":1234},"time":"2019-10-08T01:01:01Z"}
The values for -topic and -brokers can also be set via environment variables KT_TOPIC and KT_BROKERS respectively.
The values supplied on the command line have priority over environment variable values.
The value for -brokers can also be set with the environment variable KT_BROKERS.
The value supplied on the command line takes precedence over the environment variable.
KEY SEARCH
Expand Down Expand Up @@ -515,9 +526,12 @@ the given topic.
- partition is the numeric identifier for a partition. You can use "all" to
specify a default interval for all partitions.
- start is the included offset or time where consumption should start.
- start is the included offset or time where consumption should start;
it defaults to "oldest".
- end is the included offset or time where consumption should end.
- end is the included offset or time where consumption should end;
it defaults to "newest" when the -f flag isn't provided, or the
maximum possible offset if it is.
An offset may be specified as:
Expand Down
10 changes: 2 additions & 8 deletions consume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,13 +783,11 @@ func TestConsumeParseArgsUsesEnvVar(t *testing.T) {
c := qt.New(t)
defer c.Done()

c.Setenv("KT_TOPIC", "test-topic")
c.Setenv("KT_BROKERS", "hans:2000")

cmd0, _, err := parseCmd("hkt", "consume")
c.Assert(err, qt.Equals, nil)
cmd := cmd0.(*consumeCmd)
c.Assert(cmd.topic, qt.Equals, "test-topic")
c.Assert(cmd.brokers, qt.DeepEquals, []string{"hans:2000"})
}

Expand All @@ -798,12 +796,10 @@ func TestConsumeParseArgsDefault(t *testing.T) {
c := qt.New(t)
defer c.Done()

c.Setenv("KT_TOPIC", "")
c.Setenv("KT_BROKERS", "")
cmd0, _, err := parseCmd("hkt", "consume", "-topic", "test-topic")
cmd0, _, err := parseCmd("hkt", "consume")
c.Assert(err, qt.Equals, nil)
cmd := cmd0.(*consumeCmd)
c.Assert(cmd.topic, qt.Equals, "test-topic")
c.Assert(cmd.brokers, qt.DeepEquals, []string{"localhost:9092"})
}

Expand All @@ -812,13 +808,11 @@ func TestConsumeParseArgsFlagsOverrideEnv(t *testing.T) {
defer c.Done()

// command line arg wins
c.Setenv("KT_TOPIC", "BLUBB")
c.Setenv("KT_BROKERS", "BLABB")

cmd0, _, err := parseCmd("hkt", "consume", "-topic", "test-topic", "-brokers", "hans:2000")
cmd0, _, err := parseCmd("hkt", "consume", "-brokers", "hans:2000")
c.Assert(err, qt.Equals, nil)
cmd := cmd0.(*consumeCmd)
c.Assert(cmd.topic, qt.Equals, "test-topic")
c.Assert(cmd.brokers, qt.DeepEquals, []string{"hans:2000"})
}

Expand Down
5 changes: 2 additions & 3 deletions group.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ func (cmd *groupCmd) addFlags(flags *flag.FlagSet) {

func (cmd *groupCmd) environFlags() map[string]string {
return map[string]string{
"topic": "KT_TOPIC",
"brokers": "KT_BROKERS",
}
}
Expand Down Expand Up @@ -390,8 +389,8 @@ func (cmd *groupCmd) connect(broker *sarama.Broker) error {
}

var groupDocString = `
The values for -topic and -brokers can also be set via environment variables KT_TOPIC and KT_BROKERS respectively.
The values supplied on the command line win over environment variable values.
The value for -brokers can also be set with the environment variable KT_BROKERS.
The value supplied on the command line takes precedence over the environment variable.
The group command can be used to list groups, their offsets and lag and to reset a group's offset.
Expand Down
5 changes: 2 additions & 3 deletions produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ func (cmd *produceCmd) addFlags(flags *flag.FlagSet) {

func (cmd *produceCmd) environFlags() map[string]string {
return map[string]string{
"topic": "KT_TOPIC",
"brokers": "KT_BROKERS",
}
}
Expand Down Expand Up @@ -261,8 +260,8 @@ func (p producerPartitioner) MessageRequiresConsistency(m *sarama.ProducerMessag
}

var produceDocString = `
The values for -topic and -brokers can also be set via environment variables KT_TOPIC and KT_BROKERS respectively.
The values supplied on the command line win over environment variable values.
The value for -brokers can also be set with the environment variable KT_BROKERS.
The value supplied on the command line takes precedence over the environment variable.
Input is read from stdin and separated by newlines.
Expand Down
10 changes: 2 additions & 8 deletions produce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@ func TestProduceParseArgsUsesEnvVar(t *testing.T) {
c := qt.New(t)
defer c.Done()

c.Setenv("KT_TOPIC", "test-topic")
c.Setenv("KT_BROKERS", "hans:2000")

cmd0, _, err := parseCmd("hkt", "produce")
c.Assert(err, qt.Equals, nil)
cmd := cmd0.(*produceCmd)

c.Assert(cmd.topic, qt.Equals, "test-topic")
c.Assert(cmd.brokers, qt.DeepEquals, []string{"hans:2000"})
}

Expand All @@ -29,13 +27,11 @@ func TestProduceParseArgsDefault(t *testing.T) {
c := qt.New(t)
defer c.Done()

c.Setenv("KT_TOPIC", "")
c.Setenv("KT_BROKERS", "")

cmd0, _, err := parseCmd("hkt", "produce", "-topic", "test-topic")
cmd0, _, err := parseCmd("hkt", "produce")
c.Assert(err, qt.Equals, nil)
cmd := cmd0.(*produceCmd)
c.Assert(cmd.topic, qt.Equals, "test-topic")
c.Assert(cmd.brokers, qt.DeepEquals, []string{"localhost:9092"})
}

Expand All @@ -44,13 +40,11 @@ func TestProduceParseArgsFlagsOverrideEnv(t *testing.T) {
defer c.Done()

// command line arg wins
c.Setenv("KT_TOPIC", "BLUBB")
c.Setenv("KT_BROKERS", "BLABB")

cmd0, _, err := parseCmd("hkt", "produce", "-topic", "test-topic", "-brokers", "hans:2000")
cmd0, _, err := parseCmd("hkt", "produce", "-brokers", "hans:2000")
c.Assert(err, qt.Equals, nil)
cmd := cmd0.(*produceCmd)
c.Assert(cmd.topic, qt.Equals, "test-topic")
c.Assert(cmd.brokers, qt.DeepEquals, []string{"hans:2000"})
}

Expand Down
6 changes: 3 additions & 3 deletions testdata/consume-with-key.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ kt produce -valuecodec string -partitioner std -topic $topic

# consuming with just the default (sarama) partitioner will only
# get messages from the partition chosen by that.
kt consume -valuecodec string -key k1 -topic $topic
kt consume -valuecodec string -key k1 $topic
cmp stdout consume-1-stdout.json

# adding another partitioner will pull in that partition too
kt consume -valuecodec string -key k1 -partitioners sarama,std -topic $topic
kt consume -valuecodec string -key k1 -partitioners sarama,std $topic
cmp stdout consume-2-stdout.json

# specifying "all" will pull in the message that was sent to the
# manually chosen partition too
kt consume -valuecodec string -key k1 -partitioners all -topic $topic
kt consume -valuecodec string -key k1 -partitioners all $topic
cmp stdout consume-3-stdout.json

-- topic-detail.json --
Expand Down
2 changes: 1 addition & 1 deletion testdata/jsonmessages-consume-error.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ kt admin -createtopic $topic -topicdetail topic-detail.json
stdin produce-stdin.json
kt produce -valuecodec string -topic $topic

kt consume -topic $topic
kt consume $topic
cmp stdout consume-stdout.json
cmp stderr consume-stderr

Expand Down
2 changes: 1 addition & 1 deletion testdata/jsonmessages.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ kt admin -createtopic $topic -topicdetail topic-detail.json
stdin produce-stdin.json
kt produce -topic $topic

kt consume -topic $topic
kt consume $topic
cmp stdout consume-stdout.json

-- topic-detail.json --
Expand Down
4 changes: 2 additions & 2 deletions testdata/multipartition.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ kt admin -createtopic $topic -topicdetail topic-detail.json
stdin produce-stdin.json
kt produce -valuecodec string -topic $topic

kt consume -valuecodec string -topic $topic
kt consume -valuecodec string $topic
cmp stdout consume-1-stdout.json

kt consume -valuecodec string -offsets 3=1:newest-1 -topic $topic
kt consume -valuecodec string $topic 3=1:newest-1
cmp stdout consume-2-stdout.json

-- topic-detail.json --
Expand Down
2 changes: 1 addition & 1 deletion testdata/system.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ kt produce -valuecodec string -topic $topic
! stdout .

# 2
kt consume -valuecodec string -f -topic $topic -timeout 500ms
kt consume -valuecodec string -f -timeout 500ms $topic
stderr 'consuming from partition 0 timed out after 500ms'
cmpenvjson stdout '{"value": "hello 1", "key": "boom", "partition": 0, "offset": 0, "time": "$now"}'

Expand Down

0 comments on commit be6a612

Please sign in to comment.