Skip to content

Commit

Permalink
feat: Added kinesis exporter and respective tests. (#2575)
Browse files Browse the repository at this point in the history
* Kinesis exporter basic implementation

* This fixes issue #2557

Kinesis exporter implementation with tests

* Added example how to export data from kinesis

* Kinesis exporter page

* Added relay configuration
Changed partition key field to be calculatable value based on event data with default lambda resolving into value 'default'
Fixed tests improved docs

* Improved kinesisexporter package code test coverage to 84.5%
Added workaround to skip sending empty dataset

* comment fix

* Revert "comment fix"

This reverts commit cb8faea.

* only test
  • Loading branch information
baitcode authored Oct 31, 2024
1 parent 28e398a commit 2a249db
Show file tree
Hide file tree
Showing 15 changed files with 775 additions and 4 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,7 @@ go-feature-flag-relay-proxy/
node_modules/
oryxBuildBinary

./.sonarlint
./.sonarlint

# Localstack cache
volume
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ It collects all variation events and can save these events in several locations:
- **Local file** *- create local files with the variation usages.*
- **Log** *- use your logger to write the variation usages.*
- **AWS S3** *- export your variation usages to S3.*
- **AWS Kinesis** *- publish your variation usages to AWS Kinesis Stream.*
- **Google Cloud Storage** *- export your variation usages to Google Cloud Storage.*
- **Webhook** *- export your variation usages by calling a webhook.*
- **AWS SQS** *- export your variation usages by sending events to SQS.*
Expand Down
8 changes: 7 additions & 1 deletion cmd/relayproxy/config/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type ExporterConf struct {
Kafka kafkaexporter.Settings `mapstructure:"kafka" koanf:"kafka"`
ProjectID string `mapstructure:"projectID" koanf:"projectid"`
Topic string `mapstructure:"topic" koanf:"topic"`
StreamArn string `mapstructure:"streamArn" koanf:"streamarn"`
StreamName string `mapstructure:"streamName" koanf:"streamname"`
}

func (c *ExporterConf) IsValid() error {
Expand All @@ -40,6 +42,9 @@ func (c *ExporterConf) IsValid() error {
if (c.Kind == S3Exporter || c.Kind == GoogleStorageExporter) && c.Bucket == "" {
return fmt.Errorf("invalid exporter: no \"bucket\" property found for kind \"%s\"", c.Kind)
}
if (c.Kind == KinesisExporter) && (c.StreamArn == "" && c.StreamName == "") {
return fmt.Errorf("invalid exporter: no \"streamArn\" or \"streamName\" property found for kind \"%s\"", c.Kind)
}
if c.Kind == WebhookExporter && c.EndpointURL == "" {
return fmt.Errorf("invalid exporter: no \"endpointUrl\" property found for kind \"%s\"", c.Kind)
}
Expand Down Expand Up @@ -70,6 +75,7 @@ const (
WebhookExporter ExporterKind = "webhook"
LogExporter ExporterKind = "log"
S3Exporter ExporterKind = "s3"
KinesisExporter ExporterKind = "kinesis"
GoogleStorageExporter ExporterKind = "googleStorage"
SQSExporter ExporterKind = "sqs"
KafkaExporter ExporterKind = "kafka"
Expand All @@ -80,7 +86,7 @@ const (
func (r ExporterKind) IsValid() error {
switch r {
case FileExporter, WebhookExporter, LogExporter, S3Exporter, GoogleStorageExporter, SQSExporter, KafkaExporter,
PubSubExporter:
PubSubExporter, KinesisExporter:
return nil
}
return fmt.Errorf("invalid exporter: kind \"%s\" is not supported", r)
Expand Down
18 changes: 18 additions & 0 deletions cmd/relayproxy/config/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func TestExporterConf_IsValid(t *testing.T) {
QueueURL string
ProjectID string
Topic string
StreamName string
}
tests := []struct {
name string
Expand Down Expand Up @@ -60,6 +61,14 @@ func TestExporterConf_IsValid(t *testing.T) {
wantErr: true,
errValue: "invalid exporter: no \"bucket\" property found for kind \"s3\"",
},
{
name: "kind kinesis without stream",
fields: fields{
Kind: "kinesis",
},
wantErr: true,
errValue: "invalid exporter: no \"streamArn\" or \"streamName\" property found for kind \"kinesis\"",
},
{
name: "kind googleStorage without bucket",
fields: fields{
Expand Down Expand Up @@ -150,6 +159,14 @@ func TestExporterConf_IsValid(t *testing.T) {
},
wantErr: false,
},
{
name: "kind Kinesis valid",
fields: fields{
Kind: "kinesis",
StreamName: "test-stream",
},
wantErr: false,
},
{
name: "kind SQS with queueURL",
fields: fields{
Expand Down Expand Up @@ -204,6 +221,7 @@ func TestExporterConf_IsValid(t *testing.T) {
QueueURL: tt.fields.QueueURL,
ProjectID: tt.fields.ProjectID,
Topic: tt.fields.Topic,
StreamName: tt.fields.StreamName,
}
err := c.IsValid()
assert.Equal(t, tt.wantErr, err != nil)
Expand Down
15 changes: 14 additions & 1 deletion cmd/relayproxy/service/gofeatureflag.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/thomaspoignant/go-feature-flag/exporter/fileexporter"
"github.com/thomaspoignant/go-feature-flag/exporter/gcstorageexporter"
"github.com/thomaspoignant/go-feature-flag/exporter/kafkaexporter"
"github.com/thomaspoignant/go-feature-flag/exporter/kinesisexporter"
"github.com/thomaspoignant/go-feature-flag/exporter/logsexporter"
"github.com/thomaspoignant/go-feature-flag/exporter/pubsubexporter"
"github.com/thomaspoignant/go-feature-flag/exporter/s3exporterv2"
Expand Down Expand Up @@ -260,6 +261,19 @@ func createExporter(c *config.ExporterConf) (exporter.CommonExporter, error) {
ParquetCompressionCodec: parquetCompressionCodec,
AwsConfig: &awsConfig,
}, nil
case config.KinesisExporter:
awsConfig, err := awsConf.LoadDefaultConfig(context.Background())
if err != nil {
return nil, err
}
return &kinesisexporter.Exporter{
Format: format,
AwsConfig: &awsConfig,
Settings: kinesisexporter.NewSettings(
kinesisexporter.WithStreamArn(c.StreamArn),
kinesisexporter.WithStreamName(c.StreamName),
),
}, nil
case config.GoogleStorageExporter:
return &gcstorageexporter.Exporter{
Bucket: c.Bucket,
Expand All @@ -274,7 +288,6 @@ func createExporter(c *config.ExporterConf) (exporter.CommonExporter, error) {
if err != nil {
return nil, err
}

return &sqsexporter.Exporter{
QueueURL: c.QueueURL,
AwsConfig: &awsConfig,
Expand Down
21 changes: 21 additions & 0 deletions cmd/relayproxy/service/gofeatureflag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/thomaspoignant/go-feature-flag/exporter/fileexporter"
"github.com/thomaspoignant/go-feature-flag/exporter/gcstorageexporter"
"github.com/thomaspoignant/go-feature-flag/exporter/kafkaexporter"
"github.com/thomaspoignant/go-feature-flag/exporter/kinesisexporter"
"github.com/thomaspoignant/go-feature-flag/exporter/logsexporter"
"github.com/thomaspoignant/go-feature-flag/exporter/pubsubexporter"
"github.com/thomaspoignant/go-feature-flag/exporter/s3exporterv2"
Expand Down Expand Up @@ -372,6 +373,26 @@ func Test_initExporter(t *testing.T) {
},
wantType: &kafkaexporter.Exporter{},
},
{
name: "AWS Kinesis Exporter",
wantErr: assert.NoError,
conf: &config.ExporterConf{
Kind: "kinesis",
StreamName: "my-stream",
},
want: ffclient.DataExporter{
FlushInterval: 10 * time.Millisecond,
MaxEventInMemory: config.DefaultExporter.MaxEventInMemory,
Exporter: &kinesisexporter.Exporter{
Format: config.DefaultExporter.Format,
Settings: kinesisexporter.NewSettings(
kinesisexporter.WithStreamArn("my-stream"),
),
},
},
wantType: &kinesisexporter.Exporter{},
skipCompleteValidation: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
47 changes: 47 additions & 0 deletions examples/data_export_kinesis/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Kafka exporter example

This example contains everything you need to export the usage of your flag to **`AWS Kinesis`**.

## How to setup the example
_All commands should be run in the root level of the repository._

1. Start a kafka server by running:

```shell
docker-compose -f examples/data_export_kinesis/docker-compose.yml up
```

2. Create python virtualenv & Install awslocal
```shell
mkvirtualenv go-features-flag-kinesis
# or activate existent
workon go-features-flag-kinesis
pip install awscli awscli-local
```

2. Create a topic in Kinesis:

```shell
awslocal kinesis create-stream --stream-name test-stream --region us-east-1
```

3. Update dependencies:

```shell
make vendor
```

4. Run the example application:

```shell
go run ./examples/data_export_kinesis/main.go
```
_If you check the logs, you should see the events being sent 1 by 1 to kinesis._

5. Read the items in the topic:

```shell
SHARD_ITERATOR=$(awslocal kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name test-stream --query 'ShardIterator' --region us-east-1)

awslocal kinesis get-records --shard-iterator $SHARD_ITERATOR --region us-east-1
```
13 changes: 13 additions & 0 deletions examples/data_export_kinesis/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
version: '2'
services:
localstack:
container_name: "${LOCALSTACK_DOCKER_NAME:-localstack-main}"
image: localstack/localstack
ports:
- "127.0.0.1:4566:4566" # LocalStack Gateway
- "127.0.0.1:4510-4559:4510-4559" # external services port range
environment:
- DEBUG=${DEBUG:-0}
volumes:
- "${LOCALSTACK_VOLUME_DIR:-./volume}:/var/lib/localstack"
- "/var/run/docker.sock:/var/run/docker.sock"
22 changes: 22 additions & 0 deletions examples/data_export_kinesis/flags.goff.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
new-admin-access:
variations:
default_var: false
false_var: false
true_var: true
defaultRule:
percentage:
false_var: 70
true_var: 30

flag-only-for-admin:
variations:
default_var: false
false_var: false
true_var: true
targeting:
- query: admin eq true
percentage:
false_var: 0
true_var: 100
defaultRule:
variation: default_var
82 changes: 82 additions & 0 deletions examples/data_export_kinesis/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package main

import (
"context"
"log"
"log/slog"
"time"

"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"

kex "github.com/thomaspoignant/go-feature-flag/exporter/kinesisexporter"

ffclient "github.com/thomaspoignant/go-feature-flag"
"github.com/thomaspoignant/go-feature-flag/ffcontext"
"github.com/thomaspoignant/go-feature-flag/retriever/fileretriever"
)

func main() {
/*
1. Start a kinesis server by running `docker-compose -f examples/data_export_kinesis/docker-compose.yml up`
2. Create a stream: docker exec $(docker ps | grep cp-kinesis | awk '{print $1}') kafka-topics --create --topic go-feature-flag-events --bootstrap-server localhost:9092
3. Run this example
-> if you check the logs, you should see the events being sent 1 by 1 to kafka.
4. Read the items in the topic: docker exec $(docker ps | grep cp-kafka | awk '{print $1}') kafka-console-consumer --bootstrap-server localhost:9092 --topic go-feature-flag-events --from-beginning
*/

config, err := config.LoadDefaultConfig(
context.Background(),
config.WithCredentialsProvider(
credentials.NewStaticCredentialsProvider("test", "test", ""),
),
config.WithRegion("us-east-1"),
config.WithBaseEndpoint("http://localhost:4566"),
)

if err != nil {
panic("Can't instantiate localstack connection credentials")
}

err = ffclient.Init(ffclient.Config{
PollingInterval: 10 * time.Second,
LeveledLogger: slog.Default(),
Context: context.Background(),
Retriever: &fileretriever.Retriever{
Path: "examples/data_export_kinesis/flags.goff.yaml",
},
DataExporter: ffclient.DataExporter{
FlushInterval: 1 * time.Second,
MaxEventInMemory: 100,
Exporter: &kex.Exporter{
Settings: kex.NewSettings(
kex.WithStreamName("test-stream"),
kex.WithExplicitHashKey("0"),
),
AwsConfig: &config,
},
},
})

// Check init errors.
if err != nil {
log.Fatal(err)
}
// defer closing ffclient
defer ffclient.Close()

// create users
user1 := ffcontext.
NewEvaluationContextBuilder("aea2fdc1-b9a0-417a-b707-0c9083de68e3").
AddCustom("anonymous", true).
Build()
user2 := ffcontext.NewEvaluationContext("332460b9-a8aa-4f7a-bc5d-9cc33632df9a")

_, _ = ffclient.BoolVariation("new-admin-access", user1, false)
_, _ = ffclient.BoolVariation("new-admin-access", user2, false)
_, _ = ffclient.StringVariation("unknown-flag", user1, "defaultValue")
_, _ = ffclient.JSONVariation("unknown-flag-2", user1, map[string]interface{}{"test": "toto"})
_, _ = ffclient.BoolVariation("new-admin-access", user1, false)
_, _ = ffclient.BoolVariation("new-admin-access", user2, false)

}
Loading

0 comments on commit 2a249db

Please sign in to comment.