diff --git a/aggregator.go b/aggregator.go index 4959d1d..ea66394 100644 --- a/aggregator.go +++ b/aggregator.go @@ -31,16 +31,11 @@ func (a *Aggregator) Count() int { // Put record using `data` and `partitionKey`. This method is thread-safe. func (a *Aggregator) Put(data []byte, partitionKey string) { - // For now, all records in the aggregated record will have - // the same partition key. - // later, we will add shard-mapper same as the KPL use. - // see: https://github.com/a8m/kinesis-producer/issues/1 - if len(a.pkeys) == 0 { - a.pkeys = []string{partitionKey} - a.nbytes += len([]byte(partitionKey)) - } + a.pkeys = append(a.pkeys, partitionKey) + a.nbytes += len([]byte(partitionKey)) keyIndex := uint64(len(a.pkeys) - 1) + a.nbytes++ // protobuf message index and wire type a.nbytes += partitionKeyIndexSize a.buf = append(a.buf, &Record{ Data: data, diff --git a/aggregator_test.go b/aggregator_test.go index 827d631..0001194 100644 --- a/aggregator_test.go +++ b/aggregator_test.go @@ -22,7 +22,7 @@ func TestSizeAndCount(t *testing.T) { for i := 0; i < n; i++ { a.Put(data, pkey) } - assert(t, a.Size() == 5*n+5+8*n, "size should equal to the data and the partition-key") + assert(t, a.Size() == n+5*n+5*n+8*n, "size should equal to size of data, partition-keys, partition key indexes, and protobuf wire type") assert(t, a.Count() == n, "count should be equal to the number of Put calls") }