Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unit tests for the kafka api protocol, at the byte level #18

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/KafkaNetClient/Common/BigEndianBinaryReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public BigEndianBinaryReader(byte[] payload) : this(payload, 0, payload.Length)
}

public long Length { get { return base.BaseStream.Length; } }
public long Position { get { return base.BaseStream.Position; } set { base.BaseStream.Position = 0; } }
public long Position { get { return base.BaseStream.Position; } set { base.BaseStream.Position = value; } }
public bool HasData { get { return base.BaseStream.Position < base.BaseStream.Length; } }

public bool Available(int dataSize)
Expand Down Expand Up @@ -110,6 +110,11 @@ public override UInt64 ReadUInt64()
return EndianAwareRead(8, BitConverter.ToUInt64);
}

public byte[] ReadBytes()
{
return ReadIntPrefixedBytes();
}

public string ReadInt16String()
{
var size = ReadInt16();
Expand Down
12 changes: 12 additions & 0 deletions src/KafkaNetClient/Common/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -256,5 +256,17 @@ public static Exception ExtractException(this Task task)

return new ApplicationException("Unknown exception occured.");
}

private static readonly DateTime UnixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);

public static long ToUnixEpochMilliseconds(this DateTime pointInTime)
{
return pointInTime > UnixEpoch ? (long)(pointInTime - UnixEpoch).TotalMilliseconds : 0L;
}

public static DateTime FromUnixEpochMilliseconds(this long milliseconds)
{
return UnixEpoch.AddMilliseconds(milliseconds);
}
}
}
4 changes: 4 additions & 0 deletions src/KafkaNetClient/Interfaces/IKafkaRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public interface IKafkaRequest<out T>
/// </summary>
ApiKeyRequestType ApiKey { get; }

/// <summary>
/// This is a numeric version number for the api request. It allows the server to properly interpret the request as the protocol evolves. Responses will always be in the format corresponding to the request version.
/// </summary>
short ApiVersion { get; set; }
/// <summary>
/// Encode this request into the Kafka wire protocol.
/// </summary>
Expand Down
10 changes: 7 additions & 3 deletions src/KafkaNetClient/Protocol/BaseRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ public abstract class BaseRequest
/// https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
/// </summary>
protected const int ReplicaId = -1;

protected const Int16 ApiVersion = 0;
private string _clientId = "Kafka-Net";
private int _correlationId = 1;
private short _apiVersion = 0;

/// <summary>
/// Descriptive name of the source of the messages sent to kafka
Expand All @@ -30,6 +29,11 @@ public abstract class BaseRequest
/// </summary>
public int CorrelationId { get { return _correlationId; } set { _correlationId = value; } }

/// <summary>
/// This is a numeric version number for the api request. It allows the server to properly interpret the request as the protocol evolves. Responses will always be in the format corresponding to the request version.
/// </summary>
public short ApiVersion { get { return _apiVersion; } set { _apiVersion = value; } }

/// <summary>
/// Flag which tells the broker call to expect a response for this request.
/// </summary>
Expand All @@ -44,7 +48,7 @@ public static KafkaMessagePacker EncodeHeader<T>(IKafkaRequest<T> request)
{
return new KafkaMessagePacker()
.Pack(((Int16)request.ApiKey))
.Pack(ApiVersion)
.Pack(request.ApiVersion)
.Pack(request.CorrelationId)
.Pack(request.ClientId, StringPrefixEncoding.Int16);
}
Expand Down
8 changes: 6 additions & 2 deletions src/KafkaNetClient/Protocol/FetchRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public KafkaDataPayload Encode()

public IEnumerable<FetchResponse> Decode(byte[] payload)
{
return DecodeFetchResponse(payload);
return DecodeFetchResponse(ApiVersion, payload);
}

private KafkaDataPayload EncodeFetchRequest(FetchRequest request)
Expand Down Expand Up @@ -80,12 +80,16 @@ private KafkaDataPayload EncodeFetchRequest(FetchRequest request)
}
}

private IEnumerable<FetchResponse> DecodeFetchResponse(byte[] data)
private IEnumerable<FetchResponse> DecodeFetchResponse(int version, byte[] data)
{
using (var stream = new BigEndianBinaryReader(data))
{
var correlationId = stream.ReadInt32();

if (version >= 1) {
var throttleTime = stream.ReadInt32();
}

var topicCount = stream.ReadInt32();
for (int i = 0; i < topicCount; i++)
{
Expand Down
30 changes: 24 additions & 6 deletions src/KafkaNetClient/Protocol/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ public class Message

/// <summary>
/// Attribute value outside message body used for added codec/compression info.
///
/// The lowest 3 bits contain the compression codec used for the message.
/// The fourth lowest bit represents the timestamp type. 0 stands for CreateTime and 1 stands for LogAppendTime. The producer should always set this bit to 0. (since 0.10.0)
/// All other bits should be set to 0.
/// </summary>
public byte Attribute { get; set; }

Expand All @@ -53,6 +57,10 @@ public class Message
/// The message body contents. Can contain compress message set.
/// </summary>
public byte[] Value { get; set; }
/// <summary>
/// This is the timestamp of the message. The timestamp type is indicated in the attributes. Unit is milliseconds since beginning of the epoch (midnight Jan 1, 1970 (UTC)).
/// </summary>
public DateTime Timestamp { get; set; }

/// <summary>
/// Construct an empty message.
Expand Down Expand Up @@ -139,11 +147,14 @@ public static byte[] EncodeMessage(Message message)
{
using (var stream = new KafkaMessagePacker())
{
return stream.Pack(message.MagicNumber)
.Pack(message.Attribute)
.Pack(message.Key)
.Pack(message.Value)
.CrcPayload();
stream.Pack(message.MagicNumber)
.Pack(message.Attribute);
if (message.MagicNumber >= 1) {
stream.Pack(message.Timestamp.ToUnixEpochMilliseconds());
}
return stream.Pack(message.Key)
.Pack(message.Value)
.CrcPayload();
}
}

Expand All @@ -167,9 +178,16 @@ public static IEnumerable<Message> DecodeMessage(long offset, byte[] payload)
Meta = new MessageMetadata { Offset = offset },
MagicNumber = stream.ReadByte(),
Attribute = stream.ReadByte(),
Key = stream.ReadIntPrefixedBytes()
};

if (message.MagicNumber >= 1) {
var timestamp = stream.ReadInt64();
if (timestamp >= 0) {
message.Timestamp = timestamp.FromUnixEpochMilliseconds();
}
}
message.Key = stream.ReadIntPrefixedBytes();

var codec = (MessageCodec)(ProtocolConstants.AttributeCodeMask & message.Attribute);
switch (codec)
{
Expand Down
23 changes: 20 additions & 3 deletions src/KafkaNetClient/Protocol/OffsetCommitRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ namespace KafkaNet.Protocol
public class OffsetCommitRequest : BaseRequest, IKafkaRequest<OffsetCommitResponse>
{
public ApiKeyRequestType ApiKey { get { return ApiKeyRequestType.OffsetCommit; } }
public int GenerationId { get; set; }
public string MemberId { get; set; }
public string ConsumerGroup { get; set; }
public TimeSpan? OffsetRetention { get; set; }
public List<OffsetCommit> OffsetCommits { get; set; }

public KafkaDataPayload Encode()
Expand All @@ -31,6 +34,18 @@ private KafkaDataPayload EncodeOffsetCommitRequest(OffsetCommitRequest request)

using (var message = EncodeHeader(request).Pack(request.ConsumerGroup, StringPrefixEncoding.Int16))
{
if (request.ApiVersion >= 1) {
message.Pack(request.GenerationId)
.Pack(request.MemberId, StringPrefixEncoding.Int16);
}
if (request.ApiVersion >= 2) {
if (request.OffsetRetention.HasValue) {
message.Pack((long) request.OffsetRetention.Value.TotalMilliseconds);
} else {
message.Pack(-1L);
}
}

var topicGroups = request.OffsetCommits.GroupBy(x => x.Topic).ToList();
message.Pack(topicGroups.Count);

Expand All @@ -45,9 +60,11 @@ private KafkaDataPayload EncodeOffsetCommitRequest(OffsetCommitRequest request)
foreach (var commit in partition)
{
message.Pack(partition.Key)
.Pack(commit.Offset)
.Pack(commit.TimeStamp)
.Pack(commit.Metadata, StringPrefixEncoding.Int16);
.Pack(commit.Offset);
if (ApiVersion == 1) {
message.Pack(commit.TimeStamp);
}
message.Pack(commit.Metadata, StringPrefixEncoding.Int16);
}
}
}
Expand Down
24 changes: 21 additions & 3 deletions src/KafkaNetClient/Protocol/ProduceRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public KafkaDataPayload Encode()

public IEnumerable<ProduceResponse> Decode(byte[] payload)
{
return DecodeProduceResponse(payload);
return DecodeProduceResponse(ApiVersion, payload);
}

#region Protocol...
Expand Down Expand Up @@ -100,7 +100,7 @@ private KafkaDataPayload EncodeProduceRequest(ProduceRequest request)
}
}

private CompressedMessageResult CreateGzipCompressedMessage(IEnumerable<Message> messages)
private static CompressedMessageResult CreateGzipCompressedMessage(IEnumerable<Message> messages)
{
var messageSet = Message.EncodeMessageSet(messages);

Expand All @@ -119,7 +119,7 @@ private CompressedMessageResult CreateGzipCompressedMessage(IEnumerable<Message>
};
}

private IEnumerable<ProduceResponse> DecodeProduceResponse(byte[] data)
private static IEnumerable<ProduceResponse> DecodeProduceResponse(int version, byte[] data)
{
using (var stream = new BigEndianBinaryReader(data))
{
Expand All @@ -141,9 +141,20 @@ private IEnumerable<ProduceResponse> DecodeProduceResponse(byte[] data)
Offset = stream.ReadInt64()
};

if (version >= 2) {
var milliseconds = stream.ReadInt64();
if (milliseconds >= 0) {
response.Timestamp = milliseconds.FromUnixEpochMilliseconds();
}
}

yield return response;
}
}

if (version >= 2) {
var throttleTime = stream.ReadInt32();
}
}
}

Expand Down Expand Up @@ -177,6 +188,13 @@ public class ProduceResponse : IBaseResponse
/// The offset number to commit as completed.
/// </summary>
public long Offset { get; set; }
/// <summary>
/// If LogAppendTime is used for the topic, this is the timestamp assigned by the broker to the message set.
/// All the messages in the message set have the same timestamp.
/// If CreateTime is used, this field is always -1. The producer can assume the timestamp of the messages in the
/// produce request has been accepted by the broker if there is no error code returned.
/// </summary>
public DateTime? Timestamp { get; set; }

public override bool Equals(object obj)
{
Expand Down
103 changes: 102 additions & 1 deletion src/KafkaNetClient/Protocol/Protocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ public enum ErrorResponseCode : short
/// </summary>
OffsetMetadataTooLargeCode = 12,

/// <summary>
/// The server disconnected before a response was received.
/// </summary>
NetworkException = 13,

/// <summary>
/// The broker returns this error code for an offset fetch request if it is still loading offsets (after a leader change for that offsets topic partition).
/// </summary>
Expand All @@ -140,7 +145,103 @@ public enum ErrorResponseCode : short
/// <summary>
/// The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is not a coordinator for.
/// </summary>
NotCoordinatorForConsumerCode = 16
NotCoordinatorForConsumerCode = 16,

/// <summary>
/// The request attempted to perform an operation on an invalid topic.
/// </summary>
InvalidTopic = 17,

/// <summary>
/// The request included message batch larger than the configured segment size on the server.
/// </summary>
RecordListTooLarge = 18,

/// <summary>
/// Messages are rejected since there are fewer in-sync replicas than required.
/// </summary>
NotEnoughReplicas = 19,

/// <summary>
/// Messages are written to the log, but to fewer in-sync replicas than required.
/// </summary>
NotEnoughReplicasAfterAppend = 20,

/// <summary>
/// Produce request specified an invalid value for required acks.
/// </summary>
InvalidRequiredAcks = 21,

/// <summary>
/// Specified group generation id is not valid.
/// </summary>
IllegalGeneration = 22,

/// <summary>
/// The group member's supported protocols are incompatible with those of existing members.
/// </summary>
InconsistentGroupProtocol = 23,

/// <summary>
/// The configured groupId is invalid.
/// </summary>
InvalidGroupId = 24,

/// <summary>
/// The coordinator is not aware of this member.
/// </summary>
UnknownMemberId = 25,

/// <summary>
/// The session timeout is not within the range allowed by the broker (as configured
/// by group.min.session.timeout.ms and group.max.session.timeout.ms).
/// </summary>
InvalidSessionTimeout = 26,

/// <summary>
/// The group is rebalancing, so a rejoin is needed.
/// </summary>
RebalanceInProgress = 27,

/// <summary>
/// The committing offset data size is not valid
/// </summary>
InvalidCommitOffsetSize = 28,

/// <summary>
/// Not authorized to access topic.
/// </summary>
TopicAuthorizationFailed = 29,

/// <summary>
/// Not authorized to access group.
/// </summary>
GroupAuthorizationFailed = 30,

/// <summary>
/// Cluster authorization failed.
/// </summary>
ClusterAuthorizationFailed = 31,

/// <summary>
/// The timestamp of the message is out of acceptable range.
/// </summary>
InvalidTimestamp = 32,

/// <summary>
/// The broker does not support the requested SASL mechanism.
/// </summary>
UnsupportedSaslMechanism = 33,

/// <summary>
/// Request is not valid given the current SASL state.
/// </summary>
IllegalSaslState = 34,

/// <summary>
/// The version of API is not supported.
/// </summary>
UnsupportedVersion = 35
}

/// <summary>
Expand Down
Loading