From 1d1dd349382c13a3d3b7549bccdef2490a69bbdf Mon Sep 17 00:00:00 2001 From: Roger Peppe Date: Wed, 9 Oct 2019 19:58:55 +0100 Subject: [PATCH] change consume CLI usage Currently the consume subcommand usage uses flags to specify both the topic and the offsets to be consumed. Given that both are very commonly specified, using flags seems overly verbose, so we change to using positional arguments instead. Instead of the current: hkt consume -topic my-topic -offsets all=newest-30m: the new usage is: hkt consume my-topic all=newest-30m: The KT_TOPIC environment variable is no longer recognised by the hkt command. --- README.md | 2 +- consume.go | 36 +++++++++++++++++-------- consume_test.go | 10 ++----- group.go | 5 ++-- produce.go | 5 ++-- produce_test.go | 10 ++----- testdata/consume-with-key.txt | 6 ++--- testdata/jsonmessages-consume-error.txt | 2 +- testdata/jsonmessages.txt | 2 +- testdata/multipartition.txt | 4 +-- testdata/system.txt | 2 +- 11 files changed, 42 insertions(+), 42 deletions(-) diff --git a/README.md b/README.md index 6275aab..869e1e1 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ Some reasons why you might be interested: * Modify consumer group offsets (e.g., resetting or manually setting offsets per topic and per partition). * JSON output for easy consumption with tools like [kp](https://github.com/echojc/kp) or [jq](https://stedolan.github.io/jq/). * JSON input to facilitate automation via tools like [jsonify](https://github.com/fgeller/jsonify). -* Configure brokers and topic via environment variables `KT_BROKERS` and `KT_TOPIC` for a shell session. +* Configure brokers with the `KT_BROKERS` environment variable. * Fast start up time. * No buffering of output. * Binary keys and payloads can be passed and presented in base64 or hex encoding. diff --git a/consume.go b/consume.go index 5be58f2..ff84161 100644 --- a/consume.go +++ b/consume.go @@ -49,8 +49,6 @@ func (cmd *consumeCmd) addFlags(flags *flag.FlagSet) { cmd.commonFlags.addFlags(flags) cmd.partitioners = []string{"sarama"} flags.Var(listFlag{&cmd.partitioners}, "partitioners", "Comma-separated list of partitioners to consider when using the key flag. See below for details") - flags.StringVar(&cmd.topic, "topic", "", "Topic to consume (required).") - flags.StringVar(&cmd.offsets, "offsets", "", "Specifies what messages to read by partition and offset range (defaults to all).") flags.DurationVar(&cmd.timeout, "timeout", time.Duration(0), "Timeout after not reading messages (default 0 to disable).") flags.StringVar(&cmd.keyStr, "key", "", "Print only messages with this key. Note: this relies on the producer using one of the partitioning algorithms specified with the -partitioners argument") flags.BoolVar(&cmd.pretty, "pretty", true, "Control output pretty printing.") @@ -59,7 +57,7 @@ func (cmd *consumeCmd) addFlags(flags *flag.FlagSet) { flags.StringVar(&cmd.keyCodecType, "keycodec", "string", "Present message key as (string|hex|base64), defaults to string.") flags.Usage = func() { - fmt.Fprintln(os.Stderr, "Usage of consume:") + fmt.Fprintln(os.Stderr, "Usage: hkt consume [flags] TOPIC [OFFSETS]") flags.PrintDefaults() fmt.Fprintln(os.Stderr, consumeDocString) } @@ -67,14 +65,24 @@ func (cmd *consumeCmd) addFlags(flags *flag.FlagSet) { func (cmd *consumeCmd) environFlags() map[string]string { return map[string]string{ - "topic": "KT_TOPIC", "brokers": "KT_BROKERS", } } func (cmd *consumeCmd) run(args []string) error { - if len(args) > 0 { - return fmt.Errorf("unexpected argument to consume command") + if len(args) < 1 { + return fmt.Errorf("consume: no topic specified in first argument") + } + if len(args) > 2 { + return fmt.Errorf("unexpected extra arguments to consume command") + } + cmd.topic = args[0] + if cmd.topic == "" { + return fmt.Errorf("empty topic name") + } + offsetsStr := "all" + if len(args) > 1 { + offsetsStr = args[1] } if cmd.verbose { sarama.Logger = log.New(os.Stderr, "", log.LstdFlags) @@ -92,7 +100,7 @@ func (cmd *consumeCmd) run(args []string) error { if err != nil { return fmt.Errorf("bad -keycodec argument: %v", err) } - offsets, err := parseOffsets(cmd.offsets, time.Now()) + offsets, err := parseOffsets(offsetsStr, time.Now()) if err != nil { return err } @@ -442,6 +450,9 @@ var consumeDocString = ` The consume command reads messages from a Kafka topic and prints them to the standard output. +If the OFFSETS argument isn't provided, it defaults to "all" (all messages from +the topic are returned). + The messages will be printed as a stream of JSON objects in the following form: { @@ -461,8 +472,8 @@ For example: {"partition":0,"key":"k1","value":{"foo":1234},"time":"2019-10-08T01:01:01Z"} -The values for -topic and -brokers can also be set via environment variables KT_TOPIC and KT_BROKERS respectively. -The values supplied on the command line have priority over environment variable values. +The value for -brokers can also be set with the environment variable KT_BROKERS. +The value supplied on the command line takes precedence over the environment variable. KEY SEARCH @@ -515,9 +526,12 @@ the given topic. - partition is the numeric identifier for a partition. You can use "all" to specify a default interval for all partitions. - - start is the included offset or time where consumption should start. + - start is the included offset or time where consumption should start; + it defaults to "oldest". - - end is the included offset or time where consumption should end. + - end is the included offset or time where consumption should end; + it defaults to "newest" when the -f flag isn't provided, or the + maximum possible offset if it is. An offset may be specified as: diff --git a/consume_test.go b/consume_test.go index 5bd7267..7f0d492 100644 --- a/consume_test.go +++ b/consume_test.go @@ -783,13 +783,11 @@ func TestConsumeParseArgsUsesEnvVar(t *testing.T) { c := qt.New(t) defer c.Done() - c.Setenv("KT_TOPIC", "test-topic") c.Setenv("KT_BROKERS", "hans:2000") cmd0, _, err := parseCmd("hkt", "consume") c.Assert(err, qt.Equals, nil) cmd := cmd0.(*consumeCmd) - c.Assert(cmd.topic, qt.Equals, "test-topic") c.Assert(cmd.brokers, qt.DeepEquals, []string{"hans:2000"}) } @@ -798,12 +796,10 @@ func TestConsumeParseArgsDefault(t *testing.T) { c := qt.New(t) defer c.Done() - c.Setenv("KT_TOPIC", "") c.Setenv("KT_BROKERS", "") - cmd0, _, err := parseCmd("hkt", "consume", "-topic", "test-topic") + cmd0, _, err := parseCmd("hkt", "consume") c.Assert(err, qt.Equals, nil) cmd := cmd0.(*consumeCmd) - c.Assert(cmd.topic, qt.Equals, "test-topic") c.Assert(cmd.brokers, qt.DeepEquals, []string{"localhost:9092"}) } @@ -812,13 +808,11 @@ func TestConsumeParseArgsFlagsOverrideEnv(t *testing.T) { defer c.Done() // command line arg wins - c.Setenv("KT_TOPIC", "BLUBB") c.Setenv("KT_BROKERS", "BLABB") - cmd0, _, err := parseCmd("hkt", "consume", "-topic", "test-topic", "-brokers", "hans:2000") + cmd0, _, err := parseCmd("hkt", "consume", "-brokers", "hans:2000") c.Assert(err, qt.Equals, nil) cmd := cmd0.(*consumeCmd) - c.Assert(cmd.topic, qt.Equals, "test-topic") c.Assert(cmd.brokers, qt.DeepEquals, []string{"hans:2000"}) } diff --git a/group.go b/group.go index 558d06b..96d297b 100644 --- a/group.go +++ b/group.go @@ -71,7 +71,6 @@ func (cmd *groupCmd) addFlags(flags *flag.FlagSet) { func (cmd *groupCmd) environFlags() map[string]string { return map[string]string{ - "topic": "KT_TOPIC", "brokers": "KT_BROKERS", } } @@ -390,8 +389,8 @@ func (cmd *groupCmd) connect(broker *sarama.Broker) error { } var groupDocString = ` -The values for -topic and -brokers can also be set via environment variables KT_TOPIC and KT_BROKERS respectively. -The values supplied on the command line win over environment variable values. +The value for -brokers can also be set with the environment variable KT_BROKERS. +The value supplied on the command line takes precedence over the environment variable. The group command can be used to list groups, their offsets and lag and to reset a group's offset. diff --git a/produce.go b/produce.go index b027ced..610109b 100644 --- a/produce.go +++ b/produce.go @@ -79,7 +79,6 @@ func (cmd *produceCmd) addFlags(flags *flag.FlagSet) { func (cmd *produceCmd) environFlags() map[string]string { return map[string]string{ - "topic": "KT_TOPIC", "brokers": "KT_BROKERS", } } @@ -261,8 +260,8 @@ func (p producerPartitioner) MessageRequiresConsistency(m *sarama.ProducerMessag } var produceDocString = ` -The values for -topic and -brokers can also be set via environment variables KT_TOPIC and KT_BROKERS respectively. -The values supplied on the command line win over environment variable values. +The value for -brokers can also be set with the environment variable KT_BROKERS. +The value supplied on the command line takes precedence over the environment variable. Input is read from stdin and separated by newlines. diff --git a/produce_test.go b/produce_test.go index 7ece5c1..e150937 100644 --- a/produce_test.go +++ b/produce_test.go @@ -13,14 +13,12 @@ func TestProduceParseArgsUsesEnvVar(t *testing.T) { c := qt.New(t) defer c.Done() - c.Setenv("KT_TOPIC", "test-topic") c.Setenv("KT_BROKERS", "hans:2000") cmd0, _, err := parseCmd("hkt", "produce") c.Assert(err, qt.Equals, nil) cmd := cmd0.(*produceCmd) - c.Assert(cmd.topic, qt.Equals, "test-topic") c.Assert(cmd.brokers, qt.DeepEquals, []string{"hans:2000"}) } @@ -29,13 +27,11 @@ func TestProduceParseArgsDefault(t *testing.T) { c := qt.New(t) defer c.Done() - c.Setenv("KT_TOPIC", "") c.Setenv("KT_BROKERS", "") - cmd0, _, err := parseCmd("hkt", "produce", "-topic", "test-topic") + cmd0, _, err := parseCmd("hkt", "produce") c.Assert(err, qt.Equals, nil) cmd := cmd0.(*produceCmd) - c.Assert(cmd.topic, qt.Equals, "test-topic") c.Assert(cmd.brokers, qt.DeepEquals, []string{"localhost:9092"}) } @@ -44,13 +40,11 @@ func TestProduceParseArgsFlagsOverrideEnv(t *testing.T) { defer c.Done() // command line arg wins - c.Setenv("KT_TOPIC", "BLUBB") c.Setenv("KT_BROKERS", "BLABB") - cmd0, _, err := parseCmd("hkt", "produce", "-topic", "test-topic", "-brokers", "hans:2000") + cmd0, _, err := parseCmd("hkt", "produce", "-brokers", "hans:2000") c.Assert(err, qt.Equals, nil) cmd := cmd0.(*produceCmd) - c.Assert(cmd.topic, qt.Equals, "test-topic") c.Assert(cmd.brokers, qt.DeepEquals, []string{"hans:2000"}) } diff --git a/testdata/consume-with-key.txt b/testdata/consume-with-key.txt index a094481..d3c81d0 100644 --- a/testdata/consume-with-key.txt +++ b/testdata/consume-with-key.txt @@ -8,16 +8,16 @@ kt produce -valuecodec string -partitioner std -topic $topic # consuming with just the default (sarama) partitioner will only # get messages from the partition chosen by that. -kt consume -valuecodec string -key k1 -topic $topic +kt consume -valuecodec string -key k1 $topic cmp stdout consume-1-stdout.json # adding another partitioner will pull in that partition too -kt consume -valuecodec string -key k1 -partitioners sarama,std -topic $topic +kt consume -valuecodec string -key k1 -partitioners sarama,std $topic cmp stdout consume-2-stdout.json # specifying "all" will pull in the message that was sent to the # manually chosen partition too -kt consume -valuecodec string -key k1 -partitioners all -topic $topic +kt consume -valuecodec string -key k1 -partitioners all $topic cmp stdout consume-3-stdout.json -- topic-detail.json -- diff --git a/testdata/jsonmessages-consume-error.txt b/testdata/jsonmessages-consume-error.txt index 3d2d3e8..ef7f0f2 100644 --- a/testdata/jsonmessages-consume-error.txt +++ b/testdata/jsonmessages-consume-error.txt @@ -3,7 +3,7 @@ kt admin -createtopic $topic -topicdetail topic-detail.json stdin produce-stdin.json kt produce -valuecodec string -topic $topic -kt consume -topic $topic +kt consume $topic cmp stdout consume-stdout.json cmp stderr consume-stderr diff --git a/testdata/jsonmessages.txt b/testdata/jsonmessages.txt index 0e751f1..1184024 100644 --- a/testdata/jsonmessages.txt +++ b/testdata/jsonmessages.txt @@ -3,7 +3,7 @@ kt admin -createtopic $topic -topicdetail topic-detail.json stdin produce-stdin.json kt produce -topic $topic -kt consume -topic $topic +kt consume $topic cmp stdout consume-stdout.json -- topic-detail.json -- diff --git a/testdata/multipartition.txt b/testdata/multipartition.txt index 5ec736e..16aee66 100644 --- a/testdata/multipartition.txt +++ b/testdata/multipartition.txt @@ -3,10 +3,10 @@ kt admin -createtopic $topic -topicdetail topic-detail.json stdin produce-stdin.json kt produce -valuecodec string -topic $topic -kt consume -valuecodec string -topic $topic +kt consume -valuecodec string $topic cmp stdout consume-1-stdout.json -kt consume -valuecodec string -offsets 3=1:newest-1 -topic $topic +kt consume -valuecodec string $topic 3=1:newest-1 cmp stdout consume-2-stdout.json -- topic-detail.json -- diff --git a/testdata/system.txt b/testdata/system.txt index 87c5bac..0e78a24 100644 --- a/testdata/system.txt +++ b/testdata/system.txt @@ -9,7 +9,7 @@ kt produce -valuecodec string -topic $topic ! stdout . # 2 -kt consume -valuecodec string -f -topic $topic -timeout 500ms +kt consume -valuecodec string -f -timeout 500ms $topic stderr 'consuming from partition 0 timed out after 500ms' cmpenvjson stdout '{"value": "hello 1", "key": "boom", "partition": 0, "offset": 0, "time": "$now"}'