diff --git a/pkg/target/kinesis.go b/pkg/target/kinesis.go index 3e988149..843f11cd 100644 --- a/pkg/target/kinesis.go +++ b/pkg/target/kinesis.go @@ -21,10 +21,10 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" - "github.com/snowplow/snowbridge/pkg/batchtransform" "github.com/snowplow/snowbridge/pkg/common" "github.com/snowplow/snowbridge/pkg/models" "github.com/snowplow/snowbridge/pkg/target/targetiface" + batchtransform "github.com/snowplow/snowbridge/pkg/transform/batch" ) const ( @@ -131,7 +131,7 @@ func (kt *KinesisTarget) Write(messages []*models.Message) (*models.TargetWriteR kt.TargetStruct.AppendBatchTransforms = []batchtransform.BatchTransformationFunction{ chunkBatcherWithConfig(kt.requestMaxMessages, kt.MaximumAllowedMessageSizeBytes(), kinesisPutRecordsRequestByteLimit), } - kt.TargetStruct.Process = func(batch models.MessageBatch) (*models.TargetWriteResult, error) { + kt.TargetStruct.Process = func(batch *models.MessageBatch) (*models.TargetWriteResult, error) { return kt.process(batch.OriginalMessages) } return kt.TargetStruct.Write(messages) diff --git a/pkg/target/targetiface/target.go b/pkg/target/targetiface/target.go index 1c86640a..d445c720 100644 --- a/pkg/target/targetiface/target.go +++ b/pkg/target/targetiface/target.go @@ -27,7 +27,7 @@ type Target interface { } // TargetProcessFunc defines the API for each target's implementation to handle sending a batch of data. -type TargetProcessFunc func(models.MessageBatch) (*models.TargetWriteResult, error) +type TargetProcessFunc func(*models.MessageBatch) (*models.TargetWriteResult, error) // TargetStruct is an experiment type TargetStruct struct {