Skip to content

Commit

Permalink
Support environment variable substitution
Browse files Browse the repository at this point in the history
  • Loading branch information
jbeisen committed Nov 30, 2023
1 parent 4d3ff50 commit 96fcce8
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 14 deletions.
21 changes: 14 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions arroyo-connectors/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, ConnectionType, FieldType, SourceField, SourceFieldType,
};
use arroyo_rpc::primitive_to_sql;
use arroyo_types::string_to_map;
use arroyo_types::{string_to_map, substitute_env_vars};
use axum::response::sse::Event;
use blackhole::BlackholeConnector;
use fluvio::FluvioConnector;
Expand Down Expand Up @@ -344,8 +344,10 @@ fn construct_http_client(endpoint: &str, headers: Option<&String>) -> anyhow::Re
Ok((
TryInto::<HeaderName>::try_into(&k)
.map_err(|_| anyhow!("invalid header name {}", k))?,
TryInto::<HeaderValue>::try_into(&v)
.map_err(|_| anyhow!("invalid header value {}", v))?,
TryInto::<HeaderValue>::try_into(
substitute_env_vars(&v).map_err(|e| anyhow!(e))?,
)
.map_err(|_| anyhow!("invalid header value {}", v))?,
))
})
.collect();
Expand Down
2 changes: 1 addition & 1 deletion arroyo-connectors/src/polling_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl Connector for PollingHTTPConnector {
Err(err) => TestSourceMessage {
error: true,
done: true,
message: format!("{:?}", err),
message: format!("{:?}", err.root_cause()),
},
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ bincode = "2.0.0-rc.3"
serde = { version = "1.0", features = ["derive"] }
arrow = { workspace = true }
arrow-array = { workspace = true }
regex = "1.10.2"
47 changes: 47 additions & 0 deletions arroyo-types/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use arrow::datatypes::SchemaRef;
use arrow_array::RecordBatch;
use bincode::{config, Decode, Encode};
use regex::Regex;
use serde::ser::SerializeStruct;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
Expand Down Expand Up @@ -807,6 +808,7 @@ pub enum DateTruncPrecision {
}

use std::convert::TryFrom;
use std::sync::OnceLock;

impl TryFrom<&str> for DatePart {
type Error = String;
Expand Down Expand Up @@ -867,6 +869,28 @@ pub fn range_for_server(i: usize, n: usize) -> RangeInclusive<u64> {
start..=end
}

pub fn substitute_env_vars(input: &str) -> Result<String, String> {
// Regex to match patterns like {{ VAR_NAME }}
static RE: OnceLock<Regex> = OnceLock::new();
let re = RE.get_or_init(|| Regex::new(r"\{\{\s*(\w+)\s*\}\}").unwrap());

let mut result = input.to_string();

for caps in re.captures_iter(input) {
let var_name = caps.get(1).unwrap().as_str();
let full_match = caps.get(0).unwrap().as_str();

match env::var(var_name) {
Ok(value) => {
result = result.replace(full_match, &value);
}
Err(_) => return Err(format!("Environment variable {} not found", var_name)),
}
}

Ok(result)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -903,4 +927,27 @@ mod tests {
"u64::MAX is not in the correct range"
);
}

#[test]
fn test_no_placeholders() {
let input = "This is a test string with no placeholders";
assert_eq!(substitute_env_vars(input).unwrap(), input);
}

#[test]
fn test_with_placeholders() {
env::set_var("TEST_VAR", "environment variable");
let input = "This is a {{ TEST_VAR }}";
let expected = "This is a environment variable";
assert_eq!(substitute_env_vars(input).unwrap(), expected);
}

#[test]
fn test_multiple_placeholders() {
env::set_var("VAR1", "first");
env::set_var("VAR2", "second");
let input = "Here is the {{ VAR1 }} and here is the {{ VAR2 }}";
let expected = "Here is the first and here is the second";
assert_eq!(substitute_env_vars(input).unwrap(), expected);
}
}
5 changes: 3 additions & 2 deletions arroyo-worker/src/connectors/polling_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{marker::PhantomData, time::Duration};
use arroyo_macro::source_fn;
use arroyo_rpc::ControlMessage;
use arroyo_rpc::{grpc::TableDescriptor, OperatorConfig};
use arroyo_types::{string_to_map, Message, Record, UserError, Watermark};
use arroyo_types::{string_to_map, substitute_env_vars, Message, Record, UserError, Watermark};

use serde::{Deserialize, Serialize};
use tokio::select;
Expand Down Expand Up @@ -73,7 +73,8 @@ where
(
(&k).try_into()
.expect(&format!("invalid header name {}", k)),
(&v).try_into()
(substitute_env_vars(&v).unwrap())
.try_into()
.expect(&format!("invalid header value {}", v)),
)
})
Expand Down
2 changes: 1 addition & 1 deletion connector-schemas/polling_http/table.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"headers": {
"title": "Headers",
"type": "string",
"description": "Comma separated list of headers to send with the request",
"description": "Comma separated list of headers to send with the request. May contain [variables](https://doc.arroyo.dev/todo).",
"pattern": "([a-zA-Z0-9-]+: ?.+,)*([a-zA-Z0-9-]+: ?.+)",
"examples": ["Authentication: digest 1234,Content-Type: application/json"]
},
Expand Down

0 comments on commit 96fcce8

Please sign in to comment.