feat(new sink): Initial aws_kinesis_firehose sink (#1388)
Signed-off-by: Luke Steensen <[email protected]>
lukesteensen authored Dec 19, 2019
1 parent dda55ba commit 518ba0e
Showing 17 changed files with 1,280 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -185,7 +185,7 @@ jobs:
DATA_DIR: /tmp/localstack/data
DEBUG: 1
PORT_WEB_UI: 8888
SERVICES: kinesis,cloudwatch,elasticsearch
SERVICES: kinesis,cloudwatch,elasticsearch,firehose
- image: minio/minio
environment:
MINIO_ACCESS_KEY: "test-access-key"
1 change: 1 addition & 0 deletions .github/semantic.yml
@@ -80,6 +80,7 @@ scopes:
# sinks
- aws_cloudwatch_logs sink
- aws_cloudwatch_metrics sink
- aws_kinesis_firehose sink
- aws_kinesis_streams sink
- aws_s3 sink
- blackhole sink
4 changes: 3 additions & 1 deletion .meta/links.toml
@@ -25,8 +25,10 @@ aws_cw_metrics_regions = "https://docs.aws.amazon.com/general/latest/gr/rande.ht
aws_elasticsearch = "https://aws.amazon.com/elasticsearch-service/"
aws_elasticsearch_regions = "https://docs.aws.amazon.com/general/latest/gr/rande.html#elasticsearch-service-regions"
aws_kinesis_data_streams = "https://aws.amazon.com/kinesis/data-streams/"
aws_kinesis_data_firehose = "https://aws.amazon.com/kinesis/data-firehose/"
aws_kinesis_partition_key = "https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecordsRequestEntry.html#Streams-Type-PutRecordsRequestEntry-PartitionKey"
aws_kinesis_service_limits = "https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html"
aws_kinesis_streams_service_limits = "https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html"
aws_kinesis_firehose_service_limits = "https://docs.aws.amazon.com/firehose/latest/dev/limits.html"
aws_kinesis_split_shards = "https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-resharding-split.html"
aws_s3 = "https://aws.amazon.com/s3/"
aws_s3_regions = "https://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region"
62 changes: 62 additions & 0 deletions .meta/sinks/aws_kinesis_firehose.toml
@@ -0,0 +1,62 @@
[sinks.aws_kinesis_firehose]
title = "AWS Kinesis Firehose"
batch_size = 1049000
batch_timeout = 1
beta = true
buffer = true
common = false
delivery_guarantee = "at_least_once"
egress_method = "batching"
encodings = ["json", "text"]
function_category = "transmit"
healthcheck = true
input_types = ["log"]
request_rate_limit_duration_secs = 1
request_rate_limit_num = 5
request_retry_attempts = 5
request_retry_backoff_secs = 1
request_in_flight_limit = 5
request_timeout_secs = 30
service_limits_short_link = "aws_kinesis_firehose_service_limits"
service_provider = "Amazon"
write_to_description = "[Amazon Web Services' Kinesis Data Firehose][urls.aws_kinesis_data_firehose] via the [`PutRecordBatch` API endpoint](https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html)"

[sinks.aws_kinesis_firehose.options.region]
type = "string"
common = true
examples = ["us-east-1"]
null = false
description = "The [AWS region][urls.aws_cw_logs_regions] of the target Kinesis Firehose delivery stream resides."

[sinks.aws_kinesis_firehose.options.stream_name]
type = "string"
common = true
examples = ["my-stream"]
null = false
description = "The [stream name][urls.aws_cw_logs_stream_name] of the target Kinesis Firehose delivery stream."

[[sinks.aws_kinesis_firehose.output.examples]]
label = "Generic"
output.type = "http"
output.body = """\
POST / HTTP/1.1
Host: firehose.<region>.<domain>
Content-Length: <byte_size>
Content-Type: application/x-amz-json-1.1
Connection: Keep-Alive
X-Amz-Target: Firehose_20150804.PutRecordBatch
{
    "DeliveryStreamName": "<stream_name>",
    "Records": [
        {
            "Data": "<base64_encoded_log>"
        },
        {
            "Data": "<base64_encoded_log>"
        },
        {
            "Data": "<base64_encoded_log>"
        }
    ]
}\
"""
4 changes: 2 additions & 2 deletions .meta/sinks/aws_kinesis_streams.toml
@@ -17,7 +17,7 @@ request_retry_attempts = 5
request_retry_backoff_secs = 1
request_in_flight_limit = 5
request_timeout_secs = 30
service_limits_short_link = "aws_kinesis_service_limits"
service_limits_short_link = "aws_kinesis_streams_service_limits"
service_provider = "Amazon"
write_to_description = "[Amazon Web Services' Kinesis Data Stream service][urls.aws_kinesis_data_streams] via the [`PutRecords` API endpoint](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html)"

@@ -69,4 +69,4 @@ X-Amz-Target: Kinesis_20131202.PutRecords
],
"StreamName": "<stream_name>"
}\
"""
"""
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

7 changes: 6 additions & 1 deletion Cargo.toml
@@ -70,6 +70,7 @@ rusoto_logs = "0.41.0"
rusoto_cloudwatch = "0.41.0"
rusoto_kinesis = "0.41.0"
rusoto_credential = "0.41.1"
rusoto_firehose = "0.41.0"

# Tower
tower = "0.1.1"
@@ -204,18 +205,22 @@ docker = [
"docker-integration-tests",
"ec2-metadata-integration-tests",
"es-integration-tests",
"firehose-integration-tests",
"gcp-pubsub-integration-tests",
"kafka-integration-tests",
"kinesis-integration-tests",
"s3-integration-tests",
"splunk-integration-tests"
"splunk-integration-tests",
"docker-integration-tests",
"ec2-metadata-integration-tests",
]
clickhouse-integration-tests = []
cloudwatch-logs-integration-tests = []
cloudwatch-metrics-integration-tests = []
docker-integration-tests = []
ec2-metadata-integration-tests = []
es-integration-tests = []
firehose-integration-tests = []
gcp-pubsub-integration-tests = []
kafka-integration-tests = []
kinesis-integration-tests = []
2 changes: 1 addition & 1 deletion README.md
@@ -62,7 +62,7 @@ simple and unified.

* [**Sources**][docs.sources] - [docker][docs.sources.docker], [file][docs.sources.file], [journald][docs.sources.journald], [kafka][docs.sources.kafka], [syslog][docs.sources.syslog], [tcp][docs.sources.tcp], and [5 more...][docs.sources]
* [**Transforms**][docs.transforms] - [json_parser][docs.transforms.json_parser], [log_to_metric][docs.transforms.log_to_metric], [lua][docs.transforms.lua], [regex_parser][docs.transforms.regex_parser], [sampler][docs.transforms.sampler], [tokenizer][docs.transforms.tokenizer], and [12 more...][docs.transforms]
* [**Sinks**][docs.sinks] - [aws_cloudwatch_logs][docs.sinks.aws_cloudwatch_logs], [aws_s3][docs.sinks.aws_s3], [clickhouse][docs.sinks.clickhouse], [elasticsearch][docs.sinks.elasticsearch], [gcp_pubsub][docs.sinks.gcp_pubsub], [http][docs.sinks.http], [kafka][docs.sinks.kafka], and [12 more...][docs.sinks]
* [**Sinks**][docs.sinks] - [aws_cloudwatch_logs][docs.sinks.aws_cloudwatch_logs], [aws_s3][docs.sinks.aws_s3], [clickhouse][docs.sinks.clickhouse], [elasticsearch][docs.sinks.elasticsearch], [gcp_pubsub][docs.sinks.gcp_pubsub], [http][docs.sinks.http], [kafka][docs.sinks.kafka], and [13 more...][docs.sinks]

#### Administration

167 changes: 167 additions & 0 deletions config/vector.spec.toml
@@ -1640,6 +1640,173 @@ end
healthcheck = true
healthcheck = false

# Batches `log` events to Amazon Web Services' Kinesis Data Firehose via the `PutRecordBatch` API endpoint.
[sinks.aws_kinesis_firehose]
#
# General
#

# The component type. This is a required field that tells Vector which
# component to use. The value _must_ be `aws_kinesis_firehose`.
#
# * required
# * type: string
# * must be: "aws_kinesis_firehose"
type = "aws_kinesis_firehose"

# A list of upstream source or transform IDs. See configuration for more info.
#
# * required
# * type: [string]
inputs = ["my-source-id"]

# The AWS region where the target Kinesis Firehose delivery stream resides.
#
# * required
# * type: string
region = "us-east-1"

# The stream name of the target Kinesis Firehose delivery stream.
#
# * required
# * type: string
stream_name = "my-stream"

# Enables/disables the sink healthcheck upon start.
#
# * optional
# * default: true
# * type: bool
healthcheck = true
healthcheck = false

#
# requests
#

# The encoding format used to serialize the events before outputting.
#
# * required
# * type: string
# * enum: "json" or "text"
encoding = "json"
encoding = "text"

#
# Batching
#

# The maximum size of a batch before it is flushed.
#
# * optional
# * default: 1049000
# * type: int
# * unit: bytes
batch_size = 1049000

# The maximum age of a batch before it is flushed.
#
# * optional
# * default: 1
# * type: int
# * unit: seconds
batch_timeout = 1

#
# Requests
#

# The maximum number of in-flight requests allowed at any given time.
#
# * optional
# * default: 5
# * type: int
request_in_flight_limit = 5

# The window used for the `request_rate_limit_num` option.
#
# * optional
# * default: 1
# * type: int
# * unit: seconds
request_rate_limit_duration_secs = 1

# The maximum number of requests allowed within the
# `request_rate_limit_duration_secs` window.
#
# * optional
# * default: 5
# * type: int
request_rate_limit_num = 5

# The maximum number of retries to make for failed requests.
#
# * optional
# * default: 5
# * type: int
request_retry_attempts = 5

# The amount of time to wait before attempting a failed request again.
#
# * optional
# * default: 1
# * type: int
# * unit: seconds
request_retry_backoff_secs = 1

# The maximum time a request can take before being aborted. It is highly
# recommended that you do not lower this value below the service's internal
# timeout, as this could create orphaned requests, pile on retries, and
# result in duplicate data downstream.
#
# * optional
# * default: 30
# * type: int
# * unit: seconds
request_timeout_secs = 30

#
# Buffer
#

[sinks.aws_kinesis_firehose.buffer]
# The buffer's type / location. `disk` buffers are persistent and will be
# retained between restarts.
#
# * optional
# * default: "memory"
# * type: string
# * enum: "memory" or "disk"
type = "memory"
type = "disk"

# The maximum size of the buffer on the disk.
#
# * optional
# * no default
# * type: int
# * unit: bytes
# * relevant when type = "disk"
max_size = 104900000

# The maximum number of events allowed in the buffer.
#
# * optional
# * default: 500
# * type: int
# * unit: events
# * relevant when type = "memory"
num_items = 500

# The behavior when the buffer becomes full.
#
# * optional
# * default: "block"
# * type: string
# * enum: "block" or "drop_newest"
when_full = "block"
when_full = "drop_newest"

# Batches `log` events to Amazon Web Services' Kinesis Data Stream service via the `PutRecords` API endpoint.
[sinks.aws_kinesis_streams]
#
3 changes: 2 additions & 1 deletion docker-compose.yml
@@ -6,8 +6,9 @@ services:
- "4568:4568"
- "4582:4582"
- "4571:4571"
- "4573:4573"
environment:
SERVICES: kinesis:4568,cloudwatch:4582,elasticsearch:4571
SERVICES: kinesis:4568,cloudwatch:4582,elasticsearch:4571,firehose:4573
minio:
image: minio/minio
ports: