From 4f21b056f9c136a12db17527c91d5795ecf2cf60 Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Wed, 21 Aug 2019 16:09:26 -0700 Subject: [PATCH 1/3] Retain user record partition keys (#1) --- aggregator.go | 18 ++++++++++-------- aggregator_test.go | 2 +- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/aggregator.go b/aggregator.go index 4959d1d..3c0e120 100644 --- a/aggregator.go +++ b/aggregator.go @@ -31,14 +31,16 @@ func (a *Aggregator) Count() int { // Put record using `data` and `partitionKey`. This method is thread-safe. func (a *Aggregator) Put(data []byte, partitionKey string) { - // For now, all records in the aggregated record will have - // the same partition key. - // later, we will add shard-mapper same as the KPL use. - // see: https://github.com/a8m/kinesis-producer/issues/1 - if len(a.pkeys) == 0 { - a.pkeys = []string{partitionKey} - a.nbytes += len([]byte(partitionKey)) - } + // a8m: For now, all records in the aggregated record will have + // the same partition key. + // later, we will add shard-mapper same as the KPL use. + // see: https://github.com/a8m/kinesis-producer/issues/1 + // jawang35: In this fork I'm allowing user records to retain + // their individual partition keys with the + // understanding that shard-mapping is not implemented + // like it is in the KPL. + a.pkeys = append(a.pkeys, partitionKey) + a.nbytes += len([]byte(partitionKey)) keyIndex := uint64(len(a.pkeys) - 1) a.nbytes += partitionKeyIndexSize diff --git a/aggregator_test.go b/aggregator_test.go index 827d631..419d87b 100644 --- a/aggregator_test.go +++ b/aggregator_test.go @@ -22,7 +22,7 @@ func TestSizeAndCount(t *testing.T) { for i := 0; i < n; i++ { a.Put(data, pkey) } - assert(t, a.Size() == 5*n+5+8*n, "size should equal to the data and the partition-key") + assert(t, a.Size() == 5*n+5*n+8*n, "size should equal to the data and the partition-keys") assert(t, a.Count() == n, "count should be equal to the number of Put calls") } From 5a22c0c944b4cdc5c03b5ce09a54f3d7f6bb7f7d Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Mon, 26 Aug 2019 12:02:54 -0700 Subject: [PATCH 2/3] Fix max record size check (#2) * Fix max record size check * Add message index and wire type to protobuf message size * Revert >= change --- aggregator.go | 9 +-------- aggregator_test.go | 2 +- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/aggregator.go b/aggregator.go index 3c0e120..21c1d4c 100644 --- a/aggregator.go +++ b/aggregator.go @@ -31,18 +31,11 @@ func (a *Aggregator) Count() int { // Put record using `data` and `partitionKey`. This method is thread-safe. func (a *Aggregator) Put(data []byte, partitionKey string) { - // a8m: For now, all records in the aggregated record will have - // the same partition key. - // later, we will add shard-mapper same as the KPL use. - // see: https://github.com/a8m/kinesis-producer/issues/1 - // jawang35: In this fork I'm allowing user records to retain - // their individual partition keys with the - // understanding that shard-mapping is not implemented - // like it is in the KPL. a.pkeys = append(a.pkeys, partitionKey) a.nbytes += len([]byte(partitionKey)) keyIndex := uint64(len(a.pkeys) - 1) + a.nbytes++ // message index and wire type a.nbytes += partitionKeyIndexSize a.buf = append(a.buf, &Record{ Data: data, diff --git a/aggregator_test.go b/aggregator_test.go index 419d87b..61a934a 100644 --- a/aggregator_test.go +++ b/aggregator_test.go @@ -22,7 +22,7 @@ func TestSizeAndCount(t *testing.T) { for i := 0; i < n; i++ { a.Put(data, pkey) } - assert(t, a.Size() == 5*n+5*n+8*n, "size should equal to the data and the partition-keys") + assert(t, a.Size() == n+5*n+5*n+8*n, "size should equal to the data and the partition-keys") assert(t, a.Count() == n, "count should be equal to the number of Put calls") } From 1958d2258fd8dd6fd7b83104c860ee068df1468e Mon Sep 17 00:00:00 2001 From: jawang35 Date: Mon, 26 Aug 2019 13:06:45 -0700 Subject: [PATCH 3/3] Update comments --- aggregator.go | 2 +- aggregator_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/aggregator.go b/aggregator.go index 21c1d4c..ea66394 100644 --- a/aggregator.go +++ b/aggregator.go @@ -35,7 +35,7 @@ func (a *Aggregator) Put(data []byte, partitionKey string) { a.nbytes += len([]byte(partitionKey)) keyIndex := uint64(len(a.pkeys) - 1) - a.nbytes++ // message index and wire type + a.nbytes++ // protobuf message index and wire type a.nbytes += partitionKeyIndexSize a.buf = append(a.buf, &Record{ Data: data, diff --git a/aggregator_test.go b/aggregator_test.go index 61a934a..0001194 100644 --- a/aggregator_test.go +++ b/aggregator_test.go @@ -22,7 +22,7 @@ func TestSizeAndCount(t *testing.T) { for i := 0; i < n; i++ { a.Put(data, pkey) } - assert(t, a.Size() == n+5*n+5*n+8*n, "size should equal to the data and the partition-keys") + assert(t, a.Size() == n+5*n+5*n+8*n, "size should equal to size of data, partition-keys, partition key indexes, and protobuf wire type") assert(t, a.Count() == n, "count should be equal to the number of Put calls") }