Skip to content

Commit

Permalink
[exporter/awskinesisexporter] Add stream ARN parameter support (open-…
Browse files Browse the repository at this point in the history
…telemetry#33891)

Resolves open-telemetry#33891

Signed-off-by: Hong Chen <[email protected]>
  • Loading branch information
HongChenTW committed Sep 20, 2024
1 parent 2cc8cea commit ab8166d
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 6 deletions.
27 changes: 27 additions & 0 deletions .chloggen/awskinesisexporter-cross-account.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awskinesisexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add stream ARN parameter for the AWS Kinesis Data Stream exporter.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33891]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
1 change: 1 addition & 0 deletions exporter/awskinesisexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
// AWSConfig contains AWS specific configuration such as awskinesis stream, region, etc.
type AWSConfig struct {
StreamName string `mapstructure:"stream_name"`
StreamARN string `mapstructure:"stream_arn"`
KinesisEndpoint string `mapstructure:"kinesis_endpoint"`
Region string `mapstructure:"region"`
Role string `mapstructure:"role"`
Expand Down
1 change: 1 addition & 0 deletions exporter/awskinesisexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func TestLoadConfig(t *testing.T) {
},
AWS: AWSConfig{
StreamName: "test-stream",
StreamARN: "arn:aws:kinesis:mars-1:123456789012:stream/test-stream",
KinesisEndpoint: "awskinesis.mars-1.aws.galactic",
Region: "mars-1",
Role: "arn:test-role",
Expand Down
1 change: 1 addition & 0 deletions exporter/awskinesisexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func createExporter(ctx context.Context, c component.Config, log *zap.Logger, op
producer, err := producer.NewBatcher(
options.NewKinesisClient(awsconf, kinesisOpts...),
conf.AWS.StreamName,
conf.AWS.StreamARN,
producer.WithLogger(log),
)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions exporter/awskinesisexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func TestCreatingExporter(t *testing.T) {
name: "Default configuration",
conf: applyConfigChanges(func(conf *Config) {
conf.AWS.StreamName = "example-test"
conf.AWS.StreamARN = "arn:aws:kinesis:us-west-2:123456789012:stream/example-test"
}),
validateNew: func(tb testing.TB) func(conf aws.Config, opts ...func(*kinesis.Options)) *kinesis.Client {
return func(conf aws.Config, opts ...func(*kinesis.Options)) *kinesis.Client {
Expand All @@ -45,6 +46,7 @@ func TestCreatingExporter(t *testing.T) {
name: "Apply different region",
conf: applyConfigChanges(func(conf *Config) {
conf.AWS.StreamName = "example-test"
conf.AWS.StreamName = "arn:aws:kinesis:us-east-1:123456789012:stream/example-test"
conf.AWS.Region = "us-east-1"
conf.AWS.Role = "example-role"
}),
Expand Down
15 changes: 13 additions & 2 deletions exporter/awskinesisexporter/internal/producer/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

type batcher struct {
stream *string
arn *string

client Kinesis
log *zap.Logger
Expand All @@ -32,12 +33,20 @@ var (
permanentErrInvalidArgument = new(*types.InvalidArgumentException)
)

func NewBatcher(kinesisAPI Kinesis, stream string, opts ...BatcherOptions) (Batcher, error) {
func NewBatcher(kinesisAPI Kinesis, stream string, arn string, opts ...BatcherOptions) (Batcher, error) {
be := &batcher{
stream: aws.String(stream),
client: kinesisAPI,
log: zap.NewNop(),
}

if stream != "" {
be.stream = aws.String(stream)
}

if arn != "" {
be.arn = aws.String(arn)
}

for _, opt := range opts {
if err := opt(be); err != nil {
return nil, err
Expand All @@ -50,6 +59,7 @@ func (b *batcher) Put(ctx context.Context, bt *batch.Batch) error {
for _, records := range bt.Chunk() {
out, err := b.client.PutRecords(ctx, &kinesis.PutRecordsInput{
StreamName: b.stream,
StreamARN: b.arn,
Records: records,
})

Expand All @@ -75,6 +85,7 @@ func (b *batcher) Put(ctx context.Context, bt *batch.Batch) error {
func (b *batcher) Ready(ctx context.Context) error {
_, err := b.client.DescribeStream(ctx, &kinesis.DescribeStreamInput{
StreamName: b.stream,
StreamARN: b.arn,
})
return err
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

func benchXEmptyMessages(b *testing.B, msgCount int) {
producer, err := producer.NewBatcher(SetPutRecordsOperation(SuccessfulPutRecordsOperation), "benchmark-stream",
producer.WithLogger(zaptest.NewLogger(b)),
"arn:aws:kinesis:mars-1:123456789012:stream/benchmark-stream", producer.WithLogger(zaptest.NewLogger(b)),
)

require.NoError(b, err, "Must have a valid producer")
Expand Down
26 changes: 23 additions & 3 deletions exporter/awskinesisexporter/internal/producer/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,32 @@ func TestBatchedExporter(t *testing.T) {

cases := []struct {
name string
arn string
PutRecordsOP func(*kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error)
shouldErr bool
isPermanent bool
}{
{name: "Successful put to kinesis", PutRecordsOP: SuccessfulPutRecordsOperation, shouldErr: false, isPermanent: false},
{name: "Invalid kinesis configuration", PutRecordsOP: HardFailedPutRecordsOperation, shouldErr: true, isPermanent: true},
{name: "Test throttled kinesis operation", PutRecordsOP: TransiantPutRecordsOperation(2), shouldErr: true, isPermanent: false},
{
name: "Successful put to kinesis",
arn: "arn:aws:kinesis:mars-1:123456789012:stream/test-stream",
PutRecordsOP: SuccessfulPutRecordsOperation,
shouldErr: false,
isPermanent: false,
},
{
name: "Invalid kinesis configuration",
arn: "arn:aws:kinesis:mars-1:123456789012:stream/test-stream",
PutRecordsOP: HardFailedPutRecordsOperation,
shouldErr: true,
isPermanent: true,
},
{
name: "Test throttled kinesis operation",
arn: "arn:aws:kinesis:mars-1:123456789012:stream/test-stream",
PutRecordsOP: TransiantPutRecordsOperation(2),
shouldErr: true,
isPermanent: false,
},
}

bt := batch.New()
Expand All @@ -88,6 +107,7 @@ func TestBatchedExporter(t *testing.T) {
be, err := producer.NewBatcher(
SetPutRecordsOperation(tc.PutRecordsOP),
tc.name,
tc.arn,
producer.WithLogger(zaptest.NewLogger(t)),
)
require.NoError(t, err, "Must not error when creating BatchedExporter")
Expand Down
1 change: 1 addition & 0 deletions exporter/awskinesisexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ awskinesis:
max_record_size: 1000
aws:
stream_name: test-stream
stream_arn: arn:aws:kinesis:mars-1:123456789012:stream/test-stream
region: mars-1
role: arn:test-role
kinesis_endpoint: awskinesis.mars-1.aws.galactic
Expand Down

0 comments on commit ab8166d

Please sign in to comment.