Add OpenObserve as an officially supported sink #21531

Open
wants to merge 10 commits into
base: master
Changes from 9 commits
1 change: 1 addition & 0 deletions .github/actions/spelling/allow.txt
@@ -142,6 +142,7 @@ Nextbook
Nextcloud
OVH
Odys
openobserve
Openpeak
Oppo
Ovi
3 changes: 3 additions & 0 deletions .github/workflows/changes.yml
@@ -104,6 +104,8 @@ on:
value: ${{ jobs.int_tests.outputs.nginx }}
opentelemetry:
value: ${{ jobs.int_tests.outputs.opentelemetry }}
openobserve:
value: ${{ jobs.int_tests.outputs.openobserve }}
postgres:
value: ${{ jobs.int_tests.outputs.postgres }}
prometheus:
@@ -230,6 +232,7 @@ jobs:
nats: ${{ steps.filter.outputs.nats }}
nginx: ${{ steps.filter.outputs.nginx }}
opentelemetry: ${{ steps.filter.outputs.opentelemetry }}
openobserve: ${{ steps.filter.outputs.openobserve }}
postgres: ${{ steps.filter.outputs.postgres }}
prometheus: ${{ steps.filter.outputs.prometheus }}
pulsar: ${{ steps.filter.outputs.pulsar }}
10 changes: 10 additions & 0 deletions .github/workflows/ci-integration-review.yml
@@ -388,6 +388,16 @@ jobs:
max_attempts: 3
command: bash scripts/ci-int-e2e-test.sh int opentelemetry

- name: openobserve
if: ${{ startsWith(github.event.review.body, '/ci-run-integration-openobserve')
|| startsWith(github.event.review.body, '/ci-run-integration-all')
|| startsWith(github.event.review.body, '/ci-run-all') }}
uses: nick-fields/retry@v3
with:
timeout_minutes: 30
max_attempts: 3
command: bash scripts/ci-int-e2e-test.sh int openobserve

- name: postgres
if: ${{ startsWith(github.event.review.body, '/ci-run-integration-postgres')
|| startsWith(github.event.review.body, '/ci-run-integration-all')
10 changes: 10 additions & 0 deletions .github/workflows/integration.yml
@@ -81,6 +81,7 @@ jobs:
|| needs.changes.outputs.nats == 'true'
|| needs.changes.outputs.nginx == 'true'
|| needs.changes.outputs.opentelemetry == 'true'
|| needs.changes.outputs.openobserve == 'true'
|| needs.changes.outputs.postgres == 'true'
|| needs.changes.outputs.prometheus == 'true'
|| needs.changes.outputs.pulsar == 'true'
@@ -355,6 +356,15 @@ jobs:
max_attempts: 3
command: bash scripts/ci-int-e2e-test.sh int opentelemetry

- if: (github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.openobserve == 'true') &&
(github.event_name != 'pull_request' || env.PR_HAS_ACCESS_TO_SECRETS == 'true')
name: openobserve
uses: nick-fields/retry@v3
with:
timeout_minutes: 30
max_attempts: 3
command: bash scripts/ci-int-e2e-test.sh int openobserve

- if: ${{ github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.postgres == 'true' }}
name: postgres
uses: nick-fields/retry@v3
4 changes: 4 additions & 0 deletions Cargo.toml
@@ -709,6 +709,7 @@ sinks-logs = [
"sinks-new_relic",
"sinks-papertrail",
"sinks-pulsar",
"sinks-openobserve",
Contributor

I don't see this used anywhere else.

Author

I was following #21514 (comment). Could you guide me on where to open a PR so that this can be added to https://vector.dev/guides/?

Contributor

Hi @chaitanya-sistla, thank you for pointing me to this. That proposal makes sense to me.

Since there is no precedent, let's think about it a bit:

  1. Wrap an existing sink, hardcode the configuration specific to OpenObserve, and expose it as a new sink.
  2. Add a guide to the existing http docs, see example here.
  3. Add a community guide on how to use it.

Overall, I am happy with any of these options. But if we go with (1), we might need a few iterations since there is no precedent. Option (2) is more straightforward. For option (3), the guides live in website/content/en/guides.

Also, I would like to cc @jszwedko for awareness.

Author

Thank you @pront. I would like to follow the first approach if that is ideal. Does my PR reflect option (1)? Please let me know if I need to make any changes.

Member

I think there is precedent for (1). An example would be the recent changes to the Axiom sink: #21362

Contributor

Regarding the CLA, can you try from a different browser or after clearing your cookies? It seems like the GitHub sign-in failed for you.

Author

Thanks. I tried other browsers, but none of them shows an Agree option anywhere; each one automatically redirects me to GitHub once I sign in with my GitHub account.

Member

Chaitanya Sistla seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account.

I think the issue might be that your commits don't have an email address associated. Could you set an email address via git config user.email, then use git rebase to squash the commits, and then run git commit --amend --reset-author to add the email address to them? The email you use should be associated with your GitHub account.

Member

Hello @pront thank you. Yes I did and made the file changes, does this relate? I would love to make changes in addition but since we don't need any changes in type, I just modified the needed files.

The changes here only modify documentation. We need to actually implement another sink to follow existing precedent (i.e. write Rust code). See the axiom sink for an example: https://github.com/vectordotdev/vector/blob/master/src/sinks/axiom.rs. That sink also just wraps the http sink as you would do. Let me know if that is unclear!

Author

Thank you, the CLA issue is now resolved. Let me look into this.

"sinks-redis",
"sinks-sematext",
"sinks-socket",
@@ -772,6 +773,7 @@ sinks-mqtt = ["dep:rumqttc"]
sinks-nats = ["dep:async-nats", "dep:nkeys"]
sinks-new_relic_logs = ["sinks-http"]
sinks-new_relic = []
sinks-openobserve = ["sinks-http"]
sinks-papertrail = ["dep:syslog"]
sinks-prometheus = ["dep:base64", "dep:prost", "vector-lib/prometheus"]
sinks-pulsar = ["dep:apache-avro", "dep:pulsar", "dep:lru"]
@@ -821,6 +823,7 @@ all-integration-tests = [
"nats-integration-tests",
"nginx-integration-tests",
"opentelemetry-integration-tests",
"openobserve-integration-tests",
"postgresql_metrics-integration-tests",
"prometheus-integration-tests",
"pulsar-integration-tests",
@@ -885,6 +888,7 @@ mqtt-integration-tests = ["sinks-mqtt"]
nats-integration-tests = ["sinks-nats", "sources-nats"]
nginx-integration-tests = ["sources-nginx_metrics"]
opentelemetry-integration-tests = ["sources-opentelemetry", "dep:prost"]
openobserve-integration-tests = ["sinks-openobserve"]
Contributor

I don't see any use of openobserve-integration-tests in this PR. Can we delete it here and from the GitHub workflows?


yes

postgresql_metrics-integration-tests = ["sources-postgresql_metrics"]
prometheus-integration-tests = ["sinks-prometheus", "sources-prometheus", "sinks-influxdb"]
pulsar-integration-tests = ["sinks-pulsar", "sources-pulsar"]
2 changes: 1 addition & 1 deletion Makefile
@@ -367,7 +367,7 @@ test-integration: ## Runs all integration tests
test-integration: test-integration-amqp test-integration-appsignal test-integration-aws test-integration-axiom test-integration-azure test-integration-chronicle test-integration-clickhouse
test-integration: test-integration-databend test-integration-docker-logs test-integration-elasticsearch
test-integration: test-integration-eventstoredb test-integration-fluent test-integration-gcp test-integration-greptimedb test-integration-humio test-integration-http-client test-integration-influxdb
test-integration: test-integration-kafka test-integration-logstash test-integration-loki test-integration-mongodb test-integration-nats
test-integration: test-integration-kafka test-integration-logstash test-integration-loki test-integration-mongodb test-integration-nats test-integration-openobserve
Contributor

I don't see any use of test-integration-openobserve in this PR. Can we delete it?


sure

test-integration: test-integration-nginx test-integration-opentelemetry test-integration-postgres test-integration-prometheus test-integration-pulsar
test-integration: test-integration-redis test-integration-splunk test-integration-dnstap test-integration-datadog-agent test-integration-datadog-logs test-integration-e2e-datadog-logs
test-integration: test-integration-datadog-traces test-integration-shutdown
2 changes: 2 additions & 0 deletions src/sinks/mod.rs
@@ -82,6 +82,8 @@ pub mod nats;
pub mod new_relic;
#[cfg(feature = "sinks-webhdfs")]
pub mod opendal_common;
#[cfg(feature = "sinks-openobserve")]
pub mod openobserve;
#[cfg(feature = "sinks-papertrail")]
pub mod papertrail;
#[cfg(feature = "sinks-prometheus")]
133 changes: 133 additions & 0 deletions src/sinks/openobserve.rs
@@ -0,0 +1,133 @@
use http::Uri;
use vector_lib::codecs::encoding::{FramingConfig, JsonSerializerConfig, SerializerConfig};
use vector_lib::configurable::configurable_component;

use crate::{
    codecs::{EncodingConfig, EncodingConfigWithFraming},
    config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
    http::{Auth, MaybeAuth},
    sinks::{
        http::config::{HttpMethod, HttpSinkConfig},
        util::{
            http::RequestConfig, BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings,
            UriSerde,
        },
        Healthcheck, VectorSink,
    },
    tls::TlsConfig,
};

/// Configuration for the `openobserve` sink.
#[configurable_component(sink("openobserve", "Deliver log events to OpenObserve."))]
#[derive(Clone, Debug)]
pub struct OpenObserveConfig {
    /// The OpenObserve endpoint to send data to.
    #[serde(default = "default_endpoint")]
    #[configurable(metadata(docs::examples = "http://localhost:5080/api/default/default/_json"))]
    uri: UriSerde,

    /// The user and password to authenticate with the OpenObserve endpoint.
    #[configurable(derived)]
    auth: Option<Auth>,

    #[configurable(derived)]
    #[serde(default)]
    request: RequestConfig,

    /// The compression algorithm to use.
    #[configurable(derived)]
    #[serde(default = "Compression::gzip_default")]
    compression: Compression,

    #[configurable(derived)]
    encoding: EncodingConfig,

    /// The batch settings for the sink.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    /// Controls how acknowledgements are handled for this sink.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,

    /// The TLS settings for the connection.
    ///
    /// Optional, constrains TLS settings for this sink.
    #[configurable(derived)]
    tls: Option<TlsConfig>,
}

impl GenerateConfig for OpenObserveConfig {
    fn generate_config() -> toml::Value {
        // The field is `auth` (lowercase) and is a table selected by `strategy`,
        // not a single string.
        toml::from_str(
            r#"
            uri = "http://localhost:5080/api/default/default/_json"
            auth.strategy = "basic"
            auth.user = "[email protected]"
            auth.password = "your_ingestion_password"
            encoding.codec = "json"
        "#,
        )
        .unwrap()
    }
}

fn default_endpoint() -> UriSerde {
    UriSerde {
        uri: Uri::from_static("http://localhost:5080/api/default/default/_json"),
        auth: None,
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "openobserve")]
impl SinkConfig for OpenObserveConfig {
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let request = self.request.clone();

        // OpenObserve supports a native HTTP ingest endpoint. This configuration wraps
Contributor

I would move this above SinkConfig and rephrase as:

/// This sink wraps the Vector HTTP sink to provide official support for OpenObserve's
/// native HTTP ingest endpoint. By doing so, it maintains a distinct configuration for 
/// the OpenObserve sink, separate from the Vector HTTP sink. This approach ensures
/// that future changes to OpenObserve's interface can be accommodated without impacting 
/// the underlying Vector HTTP sink.


done

        // the vector HTTP sink to provide official support for OpenObserve. This sink
        // allows the vector OpenObserve sink to be maintained independently of the vector
        // HTTP sink configuration and makes it possible to accommodate any future changes
        // to the interface.
        let http_sink_config = HttpSinkConfig {
            uri: self.uri.clone(),
            compression: self.compression,
            auth: self.auth.choose_one(&self.uri.auth)?,
            method: HttpMethod::Post,
            tls: self.tls.clone(),
            request,
            acknowledgements: self.acknowledgements,
            batch: self.batch,
            headers: None,
            encoding: EncodingConfigWithFraming::new(
                Some(FramingConfig::Bytes),
                SerializerConfig::Json(JsonSerializerConfig::default()),
                self.encoding.transformer(),
            ),
            payload_prefix: "".into(), // No prefix is added to the payload
            payload_suffix: "".into(), // No suffix is added to the payload
        };

        http_sink_config.build(cx).await
    }

    fn input(&self) -> Input {
        Input::new(self.encoding.config().input_type() & DataType::Log)
    }

    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}

#[cfg(test)]
mod test {
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<super::OpenObserveConfig>();
    }
}
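
For readers unfamiliar with the wrapper pattern discussed in the review thread, the following is a minimal, self-contained sketch of the idea behind build(): a vendor-specific config is translated into a generic HTTP config, with the vendor-specific choices fixed in one place. Every type and function name here (BasicAuth, HttpConfig, VendorConfig, choose_one, to_http_config) is a hypothetical stand-in, not Vector's actual API. The credential check assumes, as the `choose_one` call in the diff suggests, that supplying credentials both in the URI and in the `auth` option is rejected.

```rust
// Hypothetical, simplified stand-ins for the real configuration types.
#[derive(Clone, Debug, PartialEq)]
struct BasicAuth {
    user: String,
    password: String,
}

/// Stand-in for the generic HTTP sink configuration.
#[derive(Debug, PartialEq)]
struct HttpConfig {
    uri: String,
    method: &'static str,
    codec: &'static str,
    auth: Option<BasicAuth>,
}

/// Stand-in for the vendor-specific configuration exposed to users.
#[derive(Clone, Debug)]
struct VendorConfig {
    uri: String,
    /// Credentials embedded in the URI, if any.
    uri_auth: Option<BasicAuth>,
    /// Credentials given through a dedicated `auth` option, if any.
    auth: Option<BasicAuth>,
}

/// Accept credentials from at most one source (assumed behaviour).
fn choose_one(
    a: &Option<BasicAuth>,
    b: &Option<BasicAuth>,
) -> Result<Option<BasicAuth>, String> {
    match (a, b) {
        (Some(_), Some(_)) => Err("credentials were provided in two places".to_string()),
        _ => Ok(a.clone().or_else(|| b.clone())),
    }
}

impl VendorConfig {
    /// Translate the vendor config into the generic one. The method and codec
    /// are hardcoded here, so users of the wrapper cannot change them.
    fn to_http_config(&self) -> Result<HttpConfig, String> {
        Ok(HttpConfig {
            uri: self.uri.clone(),
            method: "POST",
            codec: "json",
            auth: choose_one(&self.auth, &self.uri_auth)?,
        })
    }
}
```

Because the translation lives in one function, a later change to the vendor's ingest interface would only touch the wrapper, which is the motivation given in the review comment above.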