Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
colmsnowplow committed Jun 19, 2024
1 parent 3ef65b9 commit 0da21e1
Showing 1 changed file with 2 additions and 36 deletions.
38 changes: 2 additions & 36 deletions pkg/target/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ func AdaptKinesisTargetFunc(f func(c *KinesisTargetConfig) (*KinesisTarget, erro
}
}

// Write pushes all messages to the required target
// At present it configures and calls the TargetStruct's Write method - but we should refactor such that we just have one Write function
func (kt *KinesisTarget) Write(messages []*models.Message) (*models.TargetWriteResult, error) {

kt.TargetStruct.AppendBatchTransforms = []batchtransform.BatchTransformationFunction{
Expand All @@ -135,42 +137,6 @@ func (kt *KinesisTarget) Write(messages []*models.Message) (*models.TargetWriteR
return kt.TargetStruct.Write(messages)
}

// // Write pushes all messages to the required target
// // TODO: Should each put be in its own goroutine?
// func (kt *KinesisTarget) Write(messages []*models.Message) (*models.TargetWriteResult, error) {
// kt.log.Debugf("Writing %d messages to stream ...", len(messages))

// // TODO: Replace with new batch transformation
// chunks, oversized := models.GetChunkedMessages(
// messages,
// kt.requestMaxMessages,
// kt.MaximumAllowedMessageSizeBytes(),
// kinesisPutRecordsRequestByteLimit,
// )

// writeResult := &models.TargetWriteResult{
// Oversized: oversized,
// }

// var errResult error

// for _, chunk := range chunks {
// res, err := kt.process(chunk)
// writeResult = writeResult.Append(res)

// if err != nil {
// errResult = multierror.Append(errResult, err)
// }
// }

// if errResult != nil {
// errResult = errors.Wrap(errResult, "Error writing messages to Kinesis stream")
// }

// kt.log.Debugf("Successfully wrote %d/%d messages", writeResult.SentCount, writeResult.Total())
// return writeResult, errResult
// }

func (kt *KinesisTarget) process(messages []*models.Message) (*models.TargetWriteResult, error) {
messageCount := int64(len(messages))
kt.log.Debugf("Writing chunk of %d messages to stream ...", messageCount)
Expand Down

0 comments on commit 0da21e1

Please sign in to comment.