From 518ba0ee5c6b944c2056748e9aff29e0b5b05423 Mon Sep 17 00:00:00 2001 From: Luke Steensen Date: Thu, 19 Dec 2019 16:59:54 -0600 Subject: [PATCH] feat(new sink): Initial `aws_kinesis_firehose` sink (#1388) Signed-off-by: Luke Steensen --- .circleci/config.yml | 2 +- .github/semantic.yml | 1 + .meta/links.toml | 4 +- .meta/sinks/aws_kinesis_firehose.toml | 62 ++ .meta/sinks/aws_kinesis_streams.toml | 4 +- Cargo.lock | 15 + Cargo.toml | 7 +- README.md | 2 +- config/vector.spec.toml | 167 +++++ docker-compose.yml | 3 +- src/sinks/aws_kinesis_firehose.rs | 371 +++++++++++ src/sinks/elasticsearch.rs | 6 +- src/sinks/mod.rs | 1 + .../reference/sinks/aws_kinesis_firehose.md | 594 ++++++++++++++++++ .../sinks/aws_kinesis_firehose.md.erb | 27 + website/metadata.js | 22 + website/sidebars.js | 2 + 17 files changed, 1280 insertions(+), 10 deletions(-) create mode 100644 .meta/sinks/aws_kinesis_firehose.toml create mode 100644 src/sinks/aws_kinesis_firehose.rs create mode 100644 website/docs/reference/sinks/aws_kinesis_firehose.md create mode 100644 website/docs/reference/sinks/aws_kinesis_firehose.md.erb diff --git a/.circleci/config.yml b/.circleci/config.yml index d693afc0cd996..9f4ad95a0126c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -185,7 +185,7 @@ jobs: DATA_DIR: /tmp/localstack/data DEBUG: 1 PORT_WEB_UI: 8888 - SERVICES: kinesis,cloudwatch,elasticsearch + SERVICES: kinesis,cloudwatch,elasticsearch,firehose - image: minio/minio environment: MINIO_ACCESS_KEY: "test-access-key" diff --git a/.github/semantic.yml b/.github/semantic.yml index badb29e59f082..94e37175c2c19 100644 --- a/.github/semantic.yml +++ b/.github/semantic.yml @@ -80,6 +80,7 @@ scopes: # sinks - aws_cloudwatch_logs sink - aws_cloudwatch_metrics sink + - aws_kinesis_firehose sink - aws_kinesis_streams sink - aws_s3 sink - blackhole sink diff --git a/.meta/links.toml b/.meta/links.toml index b4d0e427249fa..b85483dab41d4 100644 --- a/.meta/links.toml +++ b/.meta/links.toml @@ -25,8 +25,10 @@ aws_cw_metrics_regions = "https://docs.aws.amazon.com/general/latest/gr/rande.ht aws_elasticsearch = "https://aws.amazon.com/elasticsearch-service/" aws_elasticsearch_regions = "https://docs.aws.amazon.com/general/latest/gr/rande.html#elasticsearch-service-regions" aws_kinesis_data_streams = "https://aws.amazon.com/kinesis/data-streams/" +aws_kinesis_data_firehose = "https://aws.amazon.com/kinesis/data-firehose/" aws_kinesis_partition_key = "https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecordsRequestEntry.html#Streams-Type-PutRecordsRequestEntry-PartitionKey" -aws_kinesis_service_limits = "https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html" +aws_kinesis_streams_service_limits = "https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html" +aws_kinesis_firehose_service_limits = "https://docs.aws.amazon.com/firehose/latest/dev/limits.html" aws_kinesis_split_shards = "https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-resharding-split.html" aws_s3 = "https://aws.amazon.com/s3/" aws_s3_regions = "https://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region" diff --git a/.meta/sinks/aws_kinesis_firehose.toml b/.meta/sinks/aws_kinesis_firehose.toml new file mode 100644 index 0000000000000..31d96a8a9a35a --- /dev/null +++ b/.meta/sinks/aws_kinesis_firehose.toml @@ -0,0 +1,62 @@ +[sinks.aws_kinesis_firehose] +title = "AWS Kinesis Firehose" +batch_size = 1049000 +batch_timeout = 1 +beta = true +buffer = true +common = false +delivery_guarantee = 
"at_least_once" +egress_method = "batching" +encodings = ["json", "text"] +function_category = "transmit" +healthcheck = true +input_types = ["log"] +request_rate_limit_duration_secs = 1 +request_rate_limit_num =5 +request_retry_attempts = 5 +request_retry_backoff_secs = 1 +request_in_flight_limit = 5 +request_timeout_secs = 30 +service_limits_short_link = "aws_kinesis_firehose_service_limits" +service_provider = "Amazon" +write_to_description = "[Amazon Web Service's Kinesis Data Firehose][urls.aws_kinesis_data_firehose] via the [`PutRecordBatch` API endpoint](https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html)" + +[sinks.aws_kinesis_firehose.options.region] +type = "string" +common = true +examples = ["us-east-1"] +null = false +description = "The [AWS region][urls.aws_cw_logs_regions] of the target Kinesis Firehose delivery stream resides." + +[sinks.aws_kinesis_firehose.options.stream_name] +type = "string" +common = true +examples = ["my-stream"] +null = false +description = "The [stream name][urls.aws_cw_logs_stream_name] of the target Kinesis Firehose delivery stream." + +[[sinks.aws_kinesis_firehose.output.examples]] +label = "Generic" +output.type = "http" +output.body = """\ +POST / HTTP/1.1 +Host: firehose.. +Content-Length: +Content-Type: application/x-amz-json-1.1 +Connection: Keep-Alive +X-Amz-Target: Firehose_20150804.PutRecordBatch +{ + "DeliveryStreamName": "", + "Records": [ + { + "Data": "", + }, + { + "Data": "", + }, + { + "Data": "", + }, + ] +}\ +""" diff --git a/.meta/sinks/aws_kinesis_streams.toml b/.meta/sinks/aws_kinesis_streams.toml index 469c772eae75b..66d8ee8f79c5e 100644 --- a/.meta/sinks/aws_kinesis_streams.toml +++ b/.meta/sinks/aws_kinesis_streams.toml @@ -17,7 +17,7 @@ request_retry_attempts = 5 request_retry_backoff_secs = 1 request_in_flight_limit = 5 request_timeout_secs = 30 -service_limits_short_link = "aws_kinesis_service_limits" +service_limits_short_link = "aws_kinesis_streams_service_limits" service_provider = "Amazon" write_to_description = "[Amazon Web Service's Kinesis Data Stream service][urls.aws_kinesis_data_streams] via the [`PutRecords` API endpoint](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html)" @@ -69,4 +69,4 @@ X-Amz-Target: Kinesis_20131202.PutRecords ], "StreamName": "" }\ -""" \ No newline at end of file +""" diff --git a/Cargo.lock b/Cargo.lock index 196a237950dbf..a5659fccc2df3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2630,6 +2630,19 @@ dependencies = [ "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "rusoto_firehose" +version = "0.41.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", + "rusoto_core 0.41.0 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.102 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.102 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.41 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "rusoto_kinesis" version = "0.41.0" @@ -4155,6 +4168,7 @@ dependencies = [ "rusoto_cloudwatch 0.41.0 (registry+https://github.com/rust-lang/crates.io-index)", "rusoto_core 0.41.0 (registry+https://github.com/rust-lang/crates.io-index)", "rusoto_credential 0.41.1 (registry+https://github.com/rust-lang/crates.io-index)", 
+ "rusoto_firehose 0.41.0 (registry+https://github.com/rust-lang/crates.io-index)", "rusoto_kinesis 0.41.0 (registry+https://github.com/rust-lang/crates.io-index)", "rusoto_logs 0.41.0 (registry+https://github.com/rust-lang/crates.io-index)", "rusoto_s3 0.41.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4660,6 +4674,7 @@ dependencies = [ "checksum rusoto_cloudwatch 0.41.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f470c278733d18760a5966a89227f4d9a06bb1d8260d676b011faa4d2aa82342" "checksum rusoto_core 0.41.0 (registry+https://github.com/rust-lang/crates.io-index)" = "351e97aedcc659bd03168ff7fd3dbb270b6ee812c0c51c7953d2ef6f0a119aa9" "checksum rusoto_credential 0.41.1 (registry+https://github.com/rust-lang/crates.io-index)" = "22a9b3b73099876f50d3de8e0974de71934ddca4d48d11268456b47c4d2fff87" +"checksum rusoto_firehose 0.41.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ef495217f457f6ad9a1d975fcbedc346b39708a6725118d58f704064e0252c81" "checksum rusoto_kinesis 0.41.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f88609598b24779b268f47c0d19780cf4865b0cbccd9f677bae75f7f7c5d4dcb" "checksum rusoto_logs 0.41.0 (registry+https://github.com/rust-lang/crates.io-index)" = "691f665f2ae401d4fddaac8d6a33c3e7c388e93494242ea937ebdddf9961f7f6" "checksum rusoto_s3 0.41.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b6c840fca030950caf0c9bea89eb35311aa5b01c0341fb0aaf28d347b5c416d7" diff --git a/Cargo.toml b/Cargo.toml index d636944efbe72..c4b964120dc45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,6 +70,7 @@ rusoto_logs = "0.41.0" rusoto_cloudwatch = "0.41.0" rusoto_kinesis = "0.41.0" rusoto_credential = "0.41.1" +rusoto_firehose = "0.41.0" # Tower tower = "0.1.1" @@ -204,11 +205,14 @@ docker = [ "docker-integration-tests", "ec2-metadata-integration-tests", "es-integration-tests", + "firehose-integration-tests", "gcp-pubsub-integration-tests", "kafka-integration-tests", "kinesis-integration-tests", "s3-integration-tests", - "splunk-integration-tests" + "splunk-integration-tests", + "docker-integration-tests", + "ec2-metadata-integration-tests", ] clickhouse-integration-tests = [] cloudwatch-logs-integration-tests = [] @@ -216,6 +220,7 @@ cloudwatch-metrics-integration-tests = [] docker-integration-tests = [] ec2-metadata-integration-tests = [] es-integration-tests = [] +firehose-integration-tests = [] gcp-pubsub-integration-tests = [] kafka-integration-tests = [] kinesis-integration-tests = [] diff --git a/README.md b/README.md index f33229c166644..678e29abe238c 100644 --- a/README.md +++ b/README.md @@ -62,7 +62,7 @@ simple and unified. 
* [**Sources**][docs.sources] - [docker][docs.sources.docker], [file][docs.sources.file], [journald][docs.sources.journald], [kafka][docs.sources.kafka], [syslog][docs.sources.syslog], [tcp][docs.sources.tcp], and [5 more...][docs.sources] * [**Transforms**][docs.transforms] - [json_parser][docs.transforms.json_parser], [log_to_metric][docs.transforms.log_to_metric], [lua][docs.transforms.lua], [regex_parser][docs.transforms.regex_parser], [sampler][docs.transforms.sampler], [tokenizer][docs.transforms.tokenizer], and [12 more...][docs.transforms] -* [**Sinks**][docs.sinks] - [aws_cloudwatch_logs][docs.sinks.aws_cloudwatch_logs], [aws_s3][docs.sinks.aws_s3], [clickhouse][docs.sinks.clickhouse], [elasticsearch][docs.sinks.elasticsearch], [gcp_pubsub][docs.sinks.gcp_pubsub], [http][docs.sinks.http], [kafka][docs.sinks.kafka], and [12 more...][docs.sinks] +* [**Sinks**][docs.sinks] - [aws_cloudwatch_logs][docs.sinks.aws_cloudwatch_logs], [aws_s3][docs.sinks.aws_s3], [clickhouse][docs.sinks.clickhouse], [elasticsearch][docs.sinks.elasticsearch], [gcp_pubsub][docs.sinks.gcp_pubsub], [http][docs.sinks.http], [kafka][docs.sinks.kafka], and [13 more...][docs.sinks] #### Administration diff --git a/config/vector.spec.toml b/config/vector.spec.toml index a8717842aad62..89f801b67651d 100644 --- a/config/vector.spec.toml +++ b/config/vector.spec.toml @@ -1640,6 +1640,173 @@ end healthcheck = true healthcheck = false +# Batches `log` events to Amazon Web Service's Kinesis Data Firehose via the `PutRecordBatch` API endpoint. +[sinks.aws_kinesis_firehose] + # + # General + # + + # The component type. This is a required field that tells Vector which + # component to use. The value _must_ be `aws_kinesis_firehose`. + # + # * required + # * type: string + # * must be: "aws_kinesis_firehose" + type = "aws_kinesis_firehose" + + # A list of upstream source or transform IDs. See configuration for more info. + # + # * required + # * type: [string] + inputs = ["my-source-id"] + + # The AWS region of the target Kinesis Firehose delivery stream resides. + # + # * required + # * type: string + region = "us-east-1" + + # The stream name of the target Kinesis Firehose delivery stream. + # + # * required + # * type: string + stream_name = "my-stream" + + # Enables/disables the sink healthcheck upon start. + # + # * optional + # * default: true + # * type: bool + healthcheck = true + healthcheck = false + + # + # requests + # + + # The encoding format used to serialize the events before outputting. + # + # * required + # * type: string + # * enum: "json" or "text" + encoding = "json" + encoding = "text" + + # + # Batching + # + + # The maximum size of a batch before it is flushed. + # + # * optional + # * default: 1049000 + # * type: int + # * unit: bytes + batch_size = 1049000 + + # The maximum age of a batch before it is flushed. + # + # * optional + # * default: 1 + # * type: int + # * unit: seconds + batch_timeout = 1 + + # + # Requests + # + + # The maximum number of in-flight requests allowed at any given time. + # + # * optional + # * default: 5 + # * type: int + request_in_flight_limit = 5 + + # The window used for the `request_rate_limit_num` option + # + # * optional + # * default: 1 + # * type: int + # * unit: seconds + request_rate_limit_duration_secs = 1 + + # The maximum number of requests allowed within the + # `request_rate_limit_duration_secs` window. + # + # * optional + # * default: 5 + # * type: int + request_rate_limit_num = 5 + + # The maximum number of retries to make for failed requests. 
+ # + # * optional + # * default: 5 + # * type: int + request_retry_attempts = 5 + + # The amount of time to wait before attempting a failed request again. + # + # * optional + # * default: 1 + # * type: int + # * unit: seconds + request_retry_backoff_secs = 1 + + # The maximum time a request can take before being aborted. It is highly + # recommended that you do not lower value below the service's internal timeout, + # as this could create orphaned requests, pile on retries, and result in + # deuplicate data downstream. + # + # * optional + # * default: 30 + # * type: int + # * unit: seconds + request_timeout_secs = 30 + + # + # Buffer + # + + [sinks.aws_kinesis_firehose.buffer] + # The buffer's type / location. `disk` buffers are persistent and will be + # retained between restarts. + # + # * optional + # * default: "memory" + # * type: string + # * enum: "memory" or "disk" + type = "memory" + type = "disk" + + # The maximum size of the buffer on the disk. + # + # * optional + # * no default + # * type: int + # * unit: bytes + # * relevant when type = "disk" + max_size = 104900000 + + # The maximum number of events allowed in the buffer. + # + # * optional + # * default: 500 + # * type: int + # * unit: events + # * relevant when type = "memory" + num_items = 500 + + # The behavior when the buffer becomes full. + # + # * optional + # * default: "block" + # * type: string + # * enum: "block" or "drop_newest" + when_full = "block" + when_full = "drop_newest" + # Batches `log` events to Amazon Web Service's Kinesis Data Stream service via the `PutRecords` API endpoint. [sinks.aws_kinesis_streams] # diff --git a/docker-compose.yml b/docker-compose.yml index f6c4e199274b4..696f4f9ce9952 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -6,8 +6,9 @@ services: - "4568:4568" - "4582:4582" - "4571:4571" + - "4573:4573" environment: - SERVICES: kinesis:4568,cloudwatch:4582,elasticsearch:4571 + SERVICES: kinesis:4568,cloudwatch:4582,elasticsearch:4571,firehose:4573 minio: image: minio/minio ports: diff --git a/src/sinks/aws_kinesis_firehose.rs b/src/sinks/aws_kinesis_firehose.rs new file mode 100644 index 0000000000000..8ca1d38ca1bfc --- /dev/null +++ b/src/sinks/aws_kinesis_firehose.rs @@ -0,0 +1,371 @@ +use crate::{ + dns::Resolver, + event::{self, Event}, + region::RegionOrEndpoint, + sinks::util::{retries::RetryLogic, BatchConfig, SinkExt, TowerRequestConfig}, + topology::config::{DataType, SinkConfig, SinkContext, SinkDescription}, +}; +use bytes::Bytes; +use futures::{stream::iter_ok, Future, Poll, Sink}; +use lazy_static::lazy_static; +use rusoto_core::{Region, RusotoError, RusotoFuture}; +use rusoto_firehose::{ + KinesisFirehose, KinesisFirehoseClient, ListDeliveryStreamsInput, PutRecordBatchError, + PutRecordBatchInput, PutRecordBatchOutput, Record, +}; +use serde::{Deserialize, Serialize}; +use snafu::Snafu; +use std::{convert::TryInto, fmt}; +use tower::Service; +use tracing_futures::{Instrument, Instrumented}; + +#[derive(Clone)] +pub struct KinesisFirehoseService { + client: KinesisFirehoseClient, + config: KinesisFirehoseSinkConfig, +} + +#[derive(Deserialize, Serialize, Debug, Clone, Default)] +#[serde(deny_unknown_fields)] +pub struct KinesisFirehoseSinkConfig { + pub stream_name: String, + #[serde(flatten)] + pub region: RegionOrEndpoint, + pub encoding: Encoding, + #[serde(default, flatten)] + pub batch: BatchConfig, + #[serde(flatten)] + pub request: TowerRequestConfig, +} + +lazy_static! 
{
+    static ref REQUEST_DEFAULTS: TowerRequestConfig = TowerRequestConfig {
+        request_timeout_secs: Some(30),
+        ..Default::default()
+    };
+}
+
+#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)]
+#[serde(rename_all = "snake_case")]
+#[derivative(Default)]
+pub enum Encoding {
+    #[derivative(Default)]
+    Text,
+    Json,
+}
+
+inventory::submit! {
+    SinkDescription::new::<KinesisFirehoseSinkConfig>("aws_kinesis_firehose")
+}
+
+#[typetag::serde(name = "aws_kinesis_firehose")]
+impl SinkConfig for KinesisFirehoseSinkConfig {
+    fn build(&self, cx: SinkContext) -> crate::Result<(super::RouterSink, super::Healthcheck)> {
+        let config = self.clone();
+        let healthcheck = healthcheck(self.clone(), cx.resolver())?;
+        let sink = KinesisFirehoseService::new(config, cx)?;
+        Ok((Box::new(sink), healthcheck))
+    }
+
+    fn input_type(&self) -> DataType {
+        DataType::Log
+    }
+
+    fn sink_type(&self) -> &'static str {
+        "aws_kinesis_firehose"
+    }
+}
+
+impl KinesisFirehoseService {
+    pub fn new(
+        config: KinesisFirehoseSinkConfig,
+        cx: SinkContext,
+    ) -> crate::Result<impl Sink<SinkItem = Event, SinkError = ()>> {
+        let client = create_client(config.region.clone().try_into()?, cx.resolver())?;
+
+        let batch = config.batch.unwrap_or(bytesize::mib(1u64), 1);
+        let request = config.request.unwrap_with(&REQUEST_DEFAULTS);
+        let encoding = config.encoding.clone();
+
+        let kinesis = KinesisFirehoseService { client, config };
+
+        let sink = request
+            .batch_sink(KinesisFirehoseRetryLogic, kinesis, cx.acker())
+            .batched_with_min(Vec::new(), &batch)
+            .with_flat_map(move |e| iter_ok(encode_event(e, &encoding)));
+
+        Ok(sink)
+    }
+}
+
+impl Service<Vec<Record>> for KinesisFirehoseService {
+    type Response = PutRecordBatchOutput;
+    type Error = RusotoError<PutRecordBatchError>;
+    type Future = Instrumented<RusotoFuture<PutRecordBatchOutput, PutRecordBatchError>>;
+
+    fn poll_ready(&mut self) -> Poll<(), Self::Error> {
+        Ok(().into())
+    }
+
+    fn call(&mut self, records: Vec<Record>) -> Self::Future {
+        debug!(
+            message = "sending records.",
+            events = %records.len(),
+        );
+
+        let request = PutRecordBatchInput {
+            records,
+            delivery_stream_name: self.config.stream_name.clone(),
+        };
+
+        self.client
+            .put_record_batch(request)
+            .instrument(info_span!("request"))
+    }
+}
+
+impl fmt::Debug for KinesisFirehoseService {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("KinesisFirehoseService")
+            .field("config", &self.config)
+            .finish()
+    }
+}
+
+#[derive(Debug, Clone)]
+struct KinesisFirehoseRetryLogic;
+
+impl RetryLogic for KinesisFirehoseRetryLogic {
+    type Error = RusotoError<PutRecordBatchError>;
+    type Response = PutRecordBatchOutput;
+
+    fn is_retriable_error(&self, error: &Self::Error) -> bool {
+        match error {
+            RusotoError::HttpDispatch(_) => true,
+            RusotoError::Service(PutRecordBatchError::ServiceUnavailable(_)) => true,
+            RusotoError::Unknown(res) if res.status.is_server_error() => true,
+            _ => false,
+        }
+    }
+}
+
+#[derive(Debug, Snafu)]
+enum HealthcheckError {
+    #[snafu(display("ListDeliveryStreams failed: {}", source))]
+    ListDeliveryStreamsFailed {
+        source: RusotoError<rusoto_firehose::ListDeliveryStreamsError>,
+    },
+    #[snafu(display("Stream names do not match, got {}, expected {}", name, stream_name))]
+    StreamNamesMismatch { name: String, stream_name: String },
+    #[snafu(display(
+        "Stream returned does not contain any streams that match {}",
+        stream_name
+    ))]
+    NoMatchingStreamName { stream_name: String },
+}
+
+fn healthcheck(
+    config: KinesisFirehoseSinkConfig,
+    resolver: Resolver,
+) -> crate::Result<super::Healthcheck> {
+    let client = create_client(config.region.try_into()?, resolver)?;
+    let stream_name = config.stream_name;
+
+    let fut = client
.list_delivery_streams(ListDeliveryStreamsInput {
+            exclusive_start_delivery_stream_name: Some(stream_name.clone()),
+            limit: Some(1),
+            delivery_stream_type: None,
+        })
+        .map_err(|source| HealthcheckError::ListDeliveryStreamsFailed { source }.into())
+        .and_then(move |res| Ok(res.delivery_stream_names.into_iter().next()))
+        .and_then(move |name| {
+            if let Some(name) = name {
+                if name == stream_name {
+                    Ok(())
+                } else {
+                    Err(HealthcheckError::StreamNamesMismatch { name, stream_name }.into())
+                }
+            } else {
+                Err(HealthcheckError::NoMatchingStreamName { stream_name }.into())
+            }
+        });
+
+    Ok(Box::new(fut))
+}
+
+fn create_client(region: Region, resolver: Resolver) -> crate::Result<KinesisFirehoseClient> {
+    use rusoto_credential::DefaultCredentialsProvider;
+
+    let p = DefaultCredentialsProvider::new()?;
+    let d = crate::sinks::util::rusoto::client(resolver)?;
+
+    Ok(KinesisFirehoseClient::new_with(d, p, region))
+}
+
+fn encode_event(event: Event, encoding: &Encoding) -> Option<Record> {
+    let log = event.into_log();
+    let data = match encoding {
+        Encoding::Json => {
+            serde_json::to_vec(&log.unflatten()).expect("Error encoding event as json.")
+        }
+
+        Encoding::Text => log
+            .get(&event::MESSAGE)
+            .map(|v| v.as_bytes().to_vec())
+            .unwrap_or_default(),
+    };
+
+    let data = Bytes::from(data);
+
+    Some(Record { data })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::event::{self, Event};
+    use std::collections::HashMap;
+
+    #[test]
+    fn firehose_encode_event_text() {
+        let message = "hello world".to_string();
+        let event = encode_event(message.clone().into(), &Encoding::Text).unwrap();
+
+        assert_eq!(&event.data[..], message.as_bytes());
+    }
+
+    #[test]
+    fn firehose_encode_event_json() {
+        let message = "hello world".to_string();
+        let mut event = Event::from(message.clone());
+        event
+            .as_mut_log()
+            .insert_explicit("key".into(), "value".into());
+        let event = encode_event(event, &Encoding::Json).unwrap();
+
+        let map: HashMap<String, String> = serde_json::from_slice(&event.data[..]).unwrap();
+
+        assert_eq!(map[&event::MESSAGE.to_string()], message);
+        assert_eq!(map["key"], "value".to_string());
+    }
+}
+
+#[cfg(feature = "firehose-integration-tests")]
+#[cfg(test)]
+mod integration_tests {
+    use super::*;
+    use crate::{
+        region::RegionOrEndpoint,
+        runtime,
+        sinks::elasticsearch::{ElasticSearchCommon, ElasticSearchConfig, Provider},
+        test_util::{random_events_with_stream, random_string},
+        topology::config::SinkContext,
+    };
+    use futures::Sink;
+    use rusoto_core::Region;
+    use rusoto_firehose::{
+        CreateDeliveryStreamInput, ElasticsearchDestinationConfiguration, KinesisFirehose,
+        KinesisFirehoseClient,
+    };
+    use serde_json::{json, Value};
+    use std::{thread, time::Duration};
+
+    #[test]
+    fn firehose_put_records() {
+        let stream = gen_stream();
+
+        let region = Region::Custom {
+            name: "localstack".into(),
+            endpoint: "http://localhost:4573".into(),
+        };
+
+        ensure_stream(region.clone(), stream.clone());
+
+        let config = KinesisFirehoseSinkConfig {
+            stream_name: stream.clone(),
+            region: RegionOrEndpoint::with_endpoint("http://localhost:4573".into()),
+            encoding: Encoding::Json, // required for ES destination w/ localstack
+            batch: BatchConfig {
+                batch_size: Some(2),
+                batch_timeout: None,
+            },
+            request: TowerRequestConfig {
+                request_timeout_secs: Some(10),
+                request_retry_attempts: Some(0),
+                ..Default::default()
+            },
+        };
+
+        let mut rt = runtime::Runtime::new().unwrap();
+        let cx = SinkContext::new_test(rt.executor());
+
+        let sink = KinesisFirehoseService::new(config, cx).unwrap();
+
+        let (input,
events) = random_events_with_stream(100, 100); + + let pump = sink.send_all(events); + let _ = rt.block_on(pump).unwrap(); + + thread::sleep(Duration::from_secs(1)); + + let config = ElasticSearchConfig { + provider: Some(Provider::Aws), + region: RegionOrEndpoint::with_endpoint("http://localhost:4571".into()), + index: Some(stream.clone()), + ..Default::default() + }; + let common = ElasticSearchCommon::parse_config(&config).expect("Config error"); + + let client = reqwest::Client::builder() + .build() + .expect("Could not build HTTP client"); + + let response = client + .get(&format!("{}/{}/_search", common.base_url, stream)) + .json(&json!({ + "query": { "query_string": { "query": "*" } } + })) + .send() + .unwrap() + .json::>() + .unwrap(); + + assert_eq!(input.len() as u64, response.total()); + let input = input + .into_iter() + .map(|rec| serde_json::to_value(rec.into_log().unflatten()).unwrap()) + .collect::>(); + for hit in response.into_hits() { + let event = hit.into_document().unwrap(); + assert!(input.contains(&event)); + } + } + + fn ensure_stream(region: Region, delivery_stream_name: String) { + let client = KinesisFirehoseClient::new(region); + + let es_config = ElasticsearchDestinationConfiguration { + index_name: delivery_stream_name.clone(), + domain_arn: "doesn't matter".into(), + role_arn: "doesn't matter".into(), + type_name: "doesn't matter".into(), + ..Default::default() + }; + + let req = CreateDeliveryStreamInput { + delivery_stream_name, + elasticsearch_destination_configuration: Some(es_config), + ..Default::default() + }; + + match client.create_delivery_stream(req).sync() { + Ok(_) => (), + Err(e) => println!("Unable to create the delivery stream {:?}", e), + }; + } + + fn gen_stream() -> String { + format!("test-{}", random_string(10).to_lowercase()) + } +} diff --git a/src/sinks/elasticsearch.rs b/src/sinks/elasticsearch.rs index f21a6695c9c7d..b76ae44cf4221 100644 --- a/src/sinks/elasticsearch.rs +++ b/src/sinks/elasticsearch.rs @@ -94,8 +94,8 @@ impl SinkConfig for ElasticSearchConfig { } } -struct ElasticSearchCommon { - base_url: String, +pub struct ElasticSearchCommon { + pub base_url: String, authorization: Option, region: Option, credentials: Option, @@ -117,7 +117,7 @@ enum ParseError { } impl ElasticSearchCommon { - fn parse_config(config: &ElasticSearchConfig) -> crate::Result { + pub fn parse_config(config: &ElasticSearchConfig) -> crate::Result { let authorization = config.basic_auth.as_ref().map(|auth| { let token = format!("{}:{}", auth.user, auth.password); format!("Basic {}", base64::encode(token.as_bytes())) diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index a925f22d28812..15e9d7b1872a8 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -3,6 +3,7 @@ use snafu::Snafu; pub mod aws_cloudwatch_logs; pub mod aws_cloudwatch_metrics; +pub mod aws_kinesis_firehose; pub mod aws_kinesis_streams; pub mod aws_s3; pub mod blackhole; diff --git a/website/docs/reference/sinks/aws_kinesis_firehose.md b/website/docs/reference/sinks/aws_kinesis_firehose.md new file mode 100644 index 0000000000000..511fc36bcd9f9 --- /dev/null +++ b/website/docs/reference/sinks/aws_kinesis_firehose.md @@ -0,0 +1,594 @@ +--- +delivery_guarantee: "at_least_once" +description: "The Vector `aws_kinesis_firehose` sink batches `log` events to Amazon Web Service's Kinesis Data Firehose via the `PutRecordBatch` API endpoint." 
+event_types: ["log"] +issues_url: https://github.com/timberio/vector/issues?q=is%3Aopen+is%3Aissue+label%3A%22sink%3A+aws_kinesis_firehose%22 +operating_systems: ["linux","macos","windows"] +sidebar_label: "aws_kinesis_firehose|[\"log\"]" +source_url: https://github.com/timberio/vector/tree/master/src/sinks/aws_kinesis_firehose.rs +status: "beta" +title: "AWS Kinesis Firehose Sink" +unsupported_operating_systems: [] +--- + +The Vector `aws_kinesis_firehose` sink [batches](#buffers--batches) [`log`][docs.data-model.log] events to [Amazon Web Service's Kinesis Data Firehose][urls.aws_kinesis_data_firehose] via the [`PutRecordBatch` API endpoint](https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html). + + + +## Configuration + +import Tabs from '@theme/Tabs'; + + + +import TabItem from '@theme/TabItem'; + + + +import CodeHeader from '@site/src/components/CodeHeader'; + + + +```toml +[sinks.my_sink_id] + # REQUIRED - General + type = "aws_kinesis_firehose" # example, must be: "aws_kinesis_firehose" + inputs = ["my-source-id"] # example + region = "us-east-1" # example + stream_name = "my-stream" # example + + # REQUIRED - requests + encoding = "json" # example, enum +``` + + + + + + +```toml +[sinks.my_sink_id] + # REQUIRED - General + type = "aws_kinesis_firehose" # example, must be: "aws_kinesis_firehose" + inputs = ["my-source-id"] # example + region = "us-east-1" # example + stream_name = "my-stream" # example + + # REQUIRED - requests + encoding = "json" # example, enum + + # OPTIONAL - General + healthcheck = true # default + + # OPTIONAL - Batching + batch_size = 1049000 # default, bytes + batch_timeout = 1 # default, seconds + + # OPTIONAL - Requests + request_in_flight_limit = 5 # default + request_rate_limit_duration_secs = 1 # default, seconds + request_rate_limit_num = 5 # default + request_retry_attempts = 5 # default + request_retry_backoff_secs = 1 # default, seconds + request_timeout_secs = 30 # default, seconds + + # OPTIONAL - Buffer + [sinks.my_sink_id.buffer] + type = "memory" # default, enum + max_size = 104900000 # example, no default, bytes, relevant when type = "disk" + num_items = 500 # default, events, relevant when type = "memory" + when_full = "block" # default, enum +``` + + + + + +## Options + +import Fields from '@site/src/components/Fields'; + +import Field from '@site/src/components/Field'; + + + + + + +### batch_size + +The maximum size of a batch before it is flushed. See [Buffers & Batches](#buffers--batches) for more info. + + + + + + + +### batch_timeout + +The maximum age of a batch before it is flushed. See [Buffers & Batches](#buffers--batches) for more info. + + + + + + + +### buffer + +Configures the sink specific buffer. + + + + + + +#### max_size + +The maximum size of the buffer on the disk. + + + + + + + +#### num_items + +The maximum number of [events][docs.data-model#event] allowed in the buffer. + + + + + + + +#### type + +The buffer's type / location. `disk` buffers are persistent and will be retained between restarts. + + + + + + + +#### when_full + +The behavior when the buffer becomes full. + + + + + + + + + + + + +### encoding + +The encoding format used to serialize the events before outputting. + + + + + + + +### healthcheck + +Enables/disables the sink healthcheck upon start. See [Health Checks](#health-checks) for more info. + + + + + + + +### region + +The [AWS region][urls.aws_cw_logs_regions] of the target Kinesis Firehose delivery stream resides. 
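+
+As a sketch of how this fits together, the example below shows the sink pointed at a
+public AWS region; the commented `endpoint` line is an assumption rather than an option
+documented above, based on the shared `RegionOrEndpoint` setting this sink has in common
+with Vector's other AWS sinks (the integration test in this patch targets a local
+localstack endpoint the same way):
+
+```toml
+[sinks.my_sink_id]
+  type = "aws_kinesis_firehose"
+  inputs = ["my-source-id"]
+  stream_name = "my-stream"
+  encoding = "json"
+
+  # Typical AWS usage: name the region that hosts the delivery stream.
+  region = "us-east-1"
+
+  # Assumed alternative for local testing: supply a custom endpoint instead of a
+  # region (mirrors the integration test, which targets localstack on port 4573).
+  # endpoint = "http://localhost:4573"
+```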
+
+### request_in_flight_limit
+
+The maximum number of in-flight requests allowed at any given time. See [Rate Limits](#rate-limits) for more info.
+
+### request_rate_limit_duration_secs
+
+The window used for the [`request_rate_limit_num`](#request_rate_limit_num) option. See [Rate Limits](#rate-limits) for more info.
+
+### request_rate_limit_num
+
+The maximum number of requests allowed within the [`request_rate_limit_duration_secs`](#request_rate_limit_duration_secs) window. See [Rate Limits](#rate-limits) for more info.
+
+### request_retry_attempts
+
+The maximum number of retries to make for failed requests. See [Retry Policy](#retry-policy) for more info.
+
+### request_retry_backoff_secs
+
+The amount of time to wait before attempting a failed request again. See [Retry Policy](#retry-policy) for more info.
+
+### request_timeout_secs
+
+The maximum time a request can take before being aborted. It is highly recommended that you do not lower this value below the service's internal timeout, as this could create orphaned requests, pile on retries, and result in duplicate data downstream.
+
+### stream_name
+
+The [stream name][urls.aws_cw_logs_stream_name] of the target Kinesis Firehose delivery stream.
+
+## Output
+
+The `aws_kinesis_firehose` sink [batches](#buffers--batches) [`log`][docs.data-model.log] events to [Amazon Web Service's Kinesis Data Firehose][urls.aws_kinesis_data_firehose] via the [`PutRecordBatch` API endpoint](https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html).
+Batches are flushed via the [`batch_size`](#batch_size) or
+[`batch_timeout`](#batch_timeout) options. You can learn more in the [buffers &
+batches](#buffers--batches) section.
+
+## How It Works
+
+### Buffers & Batches
+
+import SVG from 'react-inlinesvg';
+
+The `aws_kinesis_firehose` sink buffers & batches data as
+shown in the diagram above. You'll notice that Vector treats these concepts
+differently: instead of treating them as global concepts, Vector treats them
+as sink-specific concepts. This isolates sinks, ensuring service disruptions
+are contained and [delivery guarantees][docs.guarantees] are honored.
+
+*Batches* are flushed when 1 of 2 conditions are met:
+
+1. The batch age meets or exceeds the configured [`batch_timeout`](#batch_timeout) (default: `1 second`).
+2. The batch size meets or exceeds the configured [`batch_size`](#batch_size) (default: `1049000 bytes`).
+
+*Buffers* are controlled via the [`buffer.*`](#buffer) options.
+
+### Environment Variables
+
+Environment variables are supported through all of Vector's configuration.
+Simply add `${MY_ENV_VAR}` in your Vector configuration file and the variable
+will be replaced before being evaluated.
+
+You can learn more in the [Environment Variables][docs.configuration#environment-variables]
+section.
+
+### Health Checks
+
+Health checks ensure that the downstream service is accessible and ready to
+accept data. This check is performed upon sink initialization.
+If the health check fails, an error will be logged and Vector will proceed to
+start.
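+
+For this sink, the check calls the `ListDeliveryStreams` API and verifies that the
+configured [`stream_name`](#stream_name) is present, so the credentials in use must be
+allowed to make that call. The subsections below cover requiring or disabling the check;
+as a minimal sketch (the option values other than `healthcheck` are simply repeated from
+the examples above), disabling it looks like this:
+
+```toml
+[sinks.my_sink_id]
+  type = "aws_kinesis_firehose"
+  inputs = ["my-source-id"]
+  region = "us-east-1"
+  stream_name = "my-stream"
+  encoding = "json"
+
+  # Skip the ListDeliveryStreams-based check described above; delivery problems
+  # will then only surface once the sink starts sending batches.
+  healthcheck = false
+```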
+
+#### Require Health Checks
+
+If you'd like to exit immediately upon a health check failure, you can
+pass the `--require-healthy` flag:
+
+```bash
+vector --config /etc/vector/vector.toml --require-healthy
+```
+
+#### Disable Health Checks
+
+If you'd like to disable health checks for this sink, you can set the
+`healthcheck` option to `false`.
+
+### Rate Limits
+
+Vector offers a few levers to control the rate and volume of requests to the
+downstream service. Start with the [`request_rate_limit_duration_secs`](#request_rate_limit_duration_secs) and
+`request_rate_limit_num` options to ensure Vector does not exceed the specified
+number of requests in the specified window. You can further control the pace at
+which this window is saturated with the [`request_in_flight_limit`](#request_in_flight_limit) option, which
+will guarantee no more than the specified number of requests are in-flight at
+any given time.
+
+Please note, Vector's defaults are carefully chosen and it should be rare that
+you need to adjust these. If you find a good reason to do so, please share it
+with the Vector team by [opening an issue][urls.new_aws_kinesis_firehose_sink_issue].
+
+### Retry Policy
+
+Vector will retry failed requests (status == `429`, >= `500`, and != `501`).
+Other responses will _not_ be retried. You can control the number of retry
+attempts and backoff rate with the [`request_retry_attempts`](#request_retry_attempts) and
+`request_retry_backoff_secs` options.
+
+
+[docs.configuration#environment-variables]: /docs/setup/configuration/#environment-variables
+[docs.data-model#event]: /docs/about/data-model/#event
+[docs.data-model.log]: /docs/about/data-model/log/
+[docs.guarantees]: /docs/about/guarantees/
+[urls.aws_cw_logs_regions]: https://docs.aws.amazon.com/general/latest/gr/rande.html#cwl_region
+[urls.aws_cw_logs_stream_name]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html
+[urls.aws_kinesis_data_firehose]: https://aws.amazon.com/kinesis/data-firehose/
+[urls.new_aws_kinesis_firehose_sink_issue]: https://github.com/timberio/vector/issues/new?labels=sink%3A+aws_kinesis_firehose
diff --git a/website/docs/reference/sinks/aws_kinesis_firehose.md.erb b/website/docs/reference/sinks/aws_kinesis_firehose.md.erb
new file mode 100644
index 0000000000000..96488c403afa8
--- /dev/null
+++ b/website/docs/reference/sinks/aws_kinesis_firehose.md.erb
@@ -0,0 +1,27 @@
+<%- component = metadata.sinks.aws_kinesis_firehose -%>
+
+<%= component_header(component) %>
+
+## Configuration
+
+<%= component_config_example(component) %>
+
+## Options
+
+<%= options(component.specific_options_list, heading_depth: 3) %>
+
+<%- if component.env_vars_list.any?
-%> +## Env Vars + +<%= options(component.env_vars_list, heading_depth: 3) %> + +<%- end -%> +<%- if component.output -%> +## Output + +<%= component_output(component, component.output) %> + +<%- end -%> +## How It Works [[sort]] + +<%= component_sections(component) %> diff --git a/website/metadata.js b/website/metadata.js index 0c6073105dc81..ab985e68a4c11 100644 --- a/website/metadata.js +++ b/website/metadata.js @@ -17790,6 +17790,28 @@ module.exports = { ] }, + "aws_kinesis_firehose": { + "beta": true, + "delivery_guarantee": "at_least_once", + "description": "Batches log events to [Amazon Web Service's Kinesis Data Firehose][urls.aws_kinesis_data_firehose] via the [`PutRecordBatch` API endpoint](https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html).", + "event_types": [ + "log" + ], + "function_category": "transmit", + "id": "aws_kinesis_firehose_sink", + "name": "aws_kinesis_firehose", + "operating_systems": [ + "linux", + "macos", + "windows" + ], + "service_provider": "Amazon", + "status": "beta", + "type": "sink", + "unsupported_operating_systems": [ + + ] + }, "aws_kinesis_streams": { "beta": true, "delivery_guarantee": "at_least_once", diff --git a/website/sidebars.js b/website/sidebars.js index 83b0d540dbd0f..46f4a8555be82 100644 --- a/website/sidebars.js +++ b/website/sidebars.js @@ -192,6 +192,8 @@ module.exports = { "reference/sinks/aws_cloudwatch_metrics", + "reference/sinks/aws_kinesis_firehose", + "reference/sinks/aws_kinesis_streams", "reference/sinks/aws_s3",