diff --git a/kafka/adminapi.go b/kafka/adminapi.go index 07fa5617e..6e612def1 100644 --- a/kafka/adminapi.go +++ b/kafka/adminapi.go @@ -995,26 +995,30 @@ type AlterUserScramCredentialsResult struct { Errors map[string]Error } -// OffsetSpec specifies desired offsets while using ListOffsets. +// OffsetSpec specifies the desired offsets for ListOffsets. Use one of the +// defined constants or NewOffsetSpecForTimestamp. type OffsetSpec int64 const ( - // MaxTimestampOffsetSpec is used to describe the offset with the Max Timestamp which may be different then LatestOffsetSpec as Timestamp can be set client side. - MaxTimestampOffsetSpec = OffsetSpec(C.RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP) - // EarliestOffsetSpec is used to describe the earliest offset for the TopicPartition. - EarliestOffsetSpec = OffsetSpec(C.RD_KAFKA_OFFSET_SPEC_EARLIEST) - // LatestOffsetSpec is used to describe the latest offset for the TopicPartition. - LatestOffsetSpec = OffsetSpec(C.RD_KAFKA_OFFSET_SPEC_LATEST) + // MaxTimestampOffsetSpec retrieves the offset with the largest Timestamp. + // This may be different then LatestOffsetSpec as Timestamps can be set client side. + MaxTimestampOffsetSpec OffsetSpec = C.RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP + // EarliestOffsetSpec retrieves the earliest offset for the TopicPartition. + EarliestOffsetSpec OffsetSpec = C.RD_KAFKA_OFFSET_SPEC_EARLIEST + // LatestOffsetSpec retrieves the latest offset for the TopicPartition. + LatestOffsetSpec OffsetSpec = C.RD_KAFKA_OFFSET_SPEC_LATEST ) -// NewOffsetSpecForTimestamp creates an OffsetSpec corresponding to the timestamp. -func NewOffsetSpecForTimestamp(timestamp int64) OffsetSpec { - return OffsetSpec(timestamp) +// NewOffsetSpecForTimestamp creates an OffsetSpec to retrieve the earliest offset whose +// timestamp is greater than or equal to the timestamp in Unix milliseconds. +func NewOffsetSpecForTimestamp(timestamp_ms int64) OffsetSpec { + return OffsetSpec(timestamp_ms) } -// ListOffsetsResultInfo describes the result of ListOffsets request for a Topic Partition. +// ListOffsetsResultInfo describes the result of a ListOffsets request for one Topic Partition. type ListOffsetsResultInfo struct { - Offset Offset + Offset Offset + // Timestamp is in Unix milliseconds. Timestamp int64 LeaderEpoch *int32 Error Error @@ -3218,15 +3222,16 @@ func (a *AdminClient) DescribeUserScramCredentials( return result, nil } -// ListOffsets describe offsets for the -// specified TopicPartiton based on an OffsetSpec. +// ListOffsets returns offsets for a set of +// TopicPartitions based on an OffsetSpec. // // Parameters: // // - `ctx` - context with the maximum amount of time to block, or nil for // indefinite. -// - `topicPartitionOffsets` - a map from TopicPartition to OffsetSpec, it -// holds either the OffsetSpec enum value or timestamp. Must not be nil. +// - `topicPartitionOffsets` - a map from TopicPartition to OffsetSpec. An +// OffsetSpec is either a defined enum value or a timestamp created by +// NewOffsetSpecForTimestamp. Must not be nil. // - `options` - ListOffsetsAdminOption options. // // Returns a ListOffsetsResult. diff --git a/kafka/consumer.go b/kafka/consumer.go index efc54d00e..61ce32a3f 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -736,7 +736,7 @@ func (c *Consumer) GetWatermarkOffsets(topic string, partition int32) (low, high // // The timestamps to query are represented as `.Offset` in the `times` // argument and the looked up offsets are represented as `.Offset` in the returned -// `offsets` list. +// `offsets` list. Timestamps are in Unix milliseconds. // // The function will block for at most timeoutMs milliseconds. // diff --git a/kafka/error.go b/kafka/error.go index 06c94bb4e..86401ab2d 100644 --- a/kafka/error.go +++ b/kafka/error.go @@ -31,7 +31,8 @@ import ( "unsafe" ) -// Error provides a Kafka-specific error container +// Error provides a Kafka-specific error container. Check that Code() != ErrorNoError +// when embedded in result structs. type Error struct { code ErrorCode str string @@ -85,8 +86,8 @@ func newErrorFromCErrorDestroy(cError *C.rd_kafka_error_t) Error { return newErrorFromCError(cError) } -// Error returns a human readable representation of an Error -// Same as Error.String() +// Error returns a human readable representation of an Error. +// Same as Error.String(). func (e Error) Error() string { return e.String() } @@ -107,7 +108,7 @@ func (e Error) String() string { return errstr } -// Code returns the ErrorCode of an Error +// Code returns the ErrorCode of this Error. func (e Error) Code() ErrorCode { return e.code } diff --git a/kafka/error_gen.go b/kafka/error_gen.go index b90af61a0..478e36fee 100644 --- a/kafka/error_gen.go +++ b/kafka/error_gen.go @@ -73,7 +73,8 @@ func WriteErrorCodes(f *os.File) { */ import "C" -// ErrorCode is the integer representation of local and broker error codes +// ErrorCode is the integer representation of local and broker error codes. +// ErrNoError (0) represents success. type ErrorCode int // String returns a human readable representation of an error code diff --git a/kafka/offset.go b/kafka/offset.go index c0d27f95e..0d568775b 100644 --- a/kafka/offset.go +++ b/kafka/offset.go @@ -129,7 +129,7 @@ func OffsetTail(relativeOffset Offset) Offset { // // The timestamps to query are represented as `.Offset` in the `times` // argument and the looked up offsets are represented as `.Offset` in the returned -// `offsets` list. +// `offsets` list. Timestamps are in Unix milliseconds. // // The function will block for at most timeoutMs milliseconds. // diff --git a/kafka/producer.go b/kafka/producer.go index 04003c9c7..dc7f3488a 100644 --- a/kafka/producer.go +++ b/kafka/producer.go @@ -726,7 +726,7 @@ func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutM // // The timestamps to query are represented as `.Offset` in the `times` // argument and the looked up offsets are represented as `.Offset` in the returned -// `offsets` list. +// `offsets` list. Timestamps are in Unix milliseconds. // // The function will block for at most timeoutMs milliseconds. //