From 0da21e105219e08d7331525f69ac13038de93416 Mon Sep 17 00:00:00 2001 From: colmsnowplow Date: Wed, 19 Jun 2024 16:03:14 +0100 Subject: [PATCH] cleanup --- pkg/target/kinesis.go | 38 ++------------------------------------ 1 file changed, 2 insertions(+), 36 deletions(-) diff --git a/pkg/target/kinesis.go b/pkg/target/kinesis.go index 790239c7..3e988149 100644 --- a/pkg/target/kinesis.go +++ b/pkg/target/kinesis.go @@ -124,6 +124,8 @@ func AdaptKinesisTargetFunc(f func(c *KinesisTargetConfig) (*KinesisTarget, erro } } +// Write pushes all messages to the required target +// At present it configures and calls the TargetStruct's Write method - but we should refactor such that we just have one Write function func (kt *KinesisTarget) Write(messages []*models.Message) (*models.TargetWriteResult, error) { kt.TargetStruct.AppendBatchTransforms = []batchtransform.BatchTransformationFunction{ @@ -135,42 +137,6 @@ func (kt *KinesisTarget) Write(messages []*models.Message) (*models.TargetWriteR return kt.TargetStruct.Write(messages) } -// // Write pushes all messages to the required target -// // TODO: Should each put be in its own goroutine? -// func (kt *KinesisTarget) Write(messages []*models.Message) (*models.TargetWriteResult, error) { -// kt.log.Debugf("Writing %d messages to stream ...", len(messages)) - -// // TODO: Replace with new batch transformation -// chunks, oversized := models.GetChunkedMessages( -// messages, -// kt.requestMaxMessages, -// kt.MaximumAllowedMessageSizeBytes(), -// kinesisPutRecordsRequestByteLimit, -// ) - -// writeResult := &models.TargetWriteResult{ -// Oversized: oversized, -// } - -// var errResult error - -// for _, chunk := range chunks { -// res, err := kt.process(chunk) -// writeResult = writeResult.Append(res) - -// if err != nil { -// errResult = multierror.Append(errResult, err) -// } -// } - -// if errResult != nil { -// errResult = errors.Wrap(errResult, "Error writing messages to Kinesis stream") -// } - -// kt.log.Debugf("Successfully wrote %d/%d messages", writeResult.SentCount, writeResult.Total()) -// return writeResult, errResult -// } - func (kt *KinesisTarget) process(messages []*models.Message) (*models.TargetWriteResult, error) { messageCount := int64(len(messages)) kt.log.Debugf("Writing chunk of %d messages to stream ...", messageCount)