Skip to content

Commit

Permalink
Replace VarStr object in schema
Browse files Browse the repository at this point in the history
  • Loading branch information
jbeisen committed Dec 5, 2023
1 parent d59c9fd commit 900b18b
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 47 deletions.
6 changes: 3 additions & 3 deletions arroyo-connectors/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, ConnectionType, FieldType, SourceField, SourceFieldType,
};
use arroyo_rpc::primitive_to_sql;
use arroyo_types::{string_to_map, sub_env_vars};
use arroyo_types::string_to_map;
use axum::response::sse::Event;
use blackhole::BlackholeConnector;
use fluvio::FluvioConnector;
Expand Down Expand Up @@ -331,7 +331,7 @@ pub(crate) fn nullable_field(name: &str, field_type: SourceFieldType) -> SourceF
}
}

fn construct_http_client(endpoint: &str, headers: Option<&String>) -> anyhow::Result<Client> {
fn construct_http_client(endpoint: &str, headers: Option<String>) -> anyhow::Result<Client> {
if let Err(e) = reqwest::Url::parse(&endpoint) {
bail!("invalid endpoint '{}': {:?}", endpoint, e)
};
Expand All @@ -344,7 +344,7 @@ fn construct_http_client(endpoint: &str, headers: Option<&String>) -> anyhow::Re
Ok((
TryInto::<HeaderName>::try_into(&k)
.map_err(|_| anyhow!("invalid header name {}", k))?,
TryInto::<HeaderValue>::try_into(sub_env_vars(&v).map_err(|e| anyhow!(e))?)
TryInto::<HeaderValue>::try_into(&v)
.map_err(|_| anyhow!("invalid header value {}", v))?,
))
})
Expand Down
25 changes: 19 additions & 6 deletions arroyo-connectors/src/polling_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::convert::Infallible;

use anyhow::anyhow;
use arroyo_rpc::OperatorConfig;
use arroyo_types::string_to_map;
use arroyo_types::{string_to_map, VarStr};
use axum::response::sse::Event;
use reqwest::{Client, Request};
use tokio::sync::mpsc::Sender;
Expand All @@ -20,7 +20,10 @@ use super::Connector;

const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/polling_http/table.json");

import_types!(schema = "../connector-schemas/polling_http/table.json");
import_types!(
schema = "../connector-schemas/polling_http/table.json",
replace = { VarStr = VarStr }
);
const ICON: &str = include_str!("../resources/http.svg");

pub struct PollingHTTPConnector {}
Expand Down Expand Up @@ -55,8 +58,18 @@ impl PollingHTTPConnector {
config: &PollingHttpTable,
tx: Sender<Result<Event, Infallible>>,
) -> anyhow::Result<()> {
let client =
construct_http_client(&config.endpoint, config.headers.as_ref().map(|t| &t.0))?;
let headers = match &config.headers {
Some(headers) => {
let s = match headers.sub_env_vars() {
Ok(s) => s,
Err(e) => return Err(anyhow!(e)),
};
Some(s)
}
None => None,
};

let client = construct_http_client(&config.endpoint, headers)?;
let req = Self::construct_test_request(&client, config)?;

tx.send(Ok(Event::default()
Expand Down Expand Up @@ -166,7 +179,7 @@ impl Connector for PollingHTTPConnector {
EmptyConfig {},
PollingHttpTable {
endpoint,
headers: headers.map(Headers),
headers: headers.map(|s| VarStr { raw_val: s }),
method,
body,
poll_interval_ms: interval,
Expand All @@ -187,7 +200,7 @@ impl Connector for PollingHTTPConnector {
let description = format!("PollingHTTPSource<{}>", table.endpoint);

if let Some(headers) = &table.headers {
string_to_map(headers).ok_or_else(|| {
string_to_map(&headers.sub_env_vars().map_err(|e| anyhow!(e))?).ok_or_else(|| {
anyhow!(
"Invalid format for headers; should be a \
comma-separated list of colon-separated key value pairs"
Expand Down
10 changes: 7 additions & 3 deletions arroyo-connectors/src/webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::convert::Infallible;
use anyhow::anyhow;
use arroyo_rpc::OperatorConfig;

use arroyo_types::VarStr;
use axum::response::sse::Event;
use reqwest::{Client, Request};
use serde_json::json;
Expand Down Expand Up @@ -47,8 +48,10 @@ impl WebhookConnector {
config: &WebhookTable,
tx: Sender<Result<Event, Infallible>>,
) -> anyhow::Result<()> {
let client =
construct_http_client(&config.endpoint, config.headers.as_ref().map(|t| &t.0))?;
let client = construct_http_client(
&config.endpoint,
config.headers.as_ref().map(|t| t.0.clone()),
)?;
let req = Self::construct_test_request(&client, config)?;

tx.send(Ok(Event::default()
Expand Down Expand Up @@ -183,7 +186,8 @@ impl Connector for WebhookConnector {
.map_err(|e| anyhow!("invalid value for 'headers' config: {:?}", e))?;

let table = WebhookTable { endpoint, headers };
let client = construct_http_client(&table.endpoint, table.headers.as_ref().map(|t| &t.0))?;
let client =
construct_http_client(&table.endpoint, table.headers.as_ref().map(|t| t.0.clone()))?;
let _ = Self::construct_test_request(&client, &table)?;

self.from_config(None, name, EmptyConfig {}, table, schema)
Expand Down
25 changes: 25 additions & 0 deletions arroyo-console/src/routes/connections/JsonForm.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,8 @@ export function FormInner({
return value;
}

// console.log(JSON.stringify(schema, null, 2));

return (
<Stack spacing={6}>
{Object.keys(schema.properties || {})
Expand Down Expand Up @@ -603,6 +605,29 @@ export function FormInner({
/>
</Box>
);
} else if (property.$ref?.endsWith('VarStr')) {
const valPath = nextPath + '.raw_val';
return (
<Box border={'1px solid red'}>
<Text>{JSON.stringify(property)}</Text>
<StringWidget
path={valPath}
key={key}
title={property.title || key}
description={property.description}
required={schema.required?.includes(key)}
// @ts-ignore
maxLength={property.maxLength}
// @ts-ignore
placeholder={
property.examples ? (property.examples[0] as string) : undefined
}
value={traversePath(values, valPath)}
errors={errors}
onChange={onChange}
/>
</Box>
);
} else {
console.warn('Unsupported property', property);
return <></>;
Expand Down
35 changes: 21 additions & 14 deletions arroyo-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -869,26 +869,33 @@ pub fn range_for_server(i: usize, n: usize) -> RangeInclusive<u64> {
start..=end
}

pub fn sub_env_vars(input: &str) -> Result<String, String> {
// Regex to match patterns like {{ VAR_NAME }}
static RE: OnceLock<Regex> = OnceLock::new();
let re = RE.get_or_init(|| Regex::new(r"\{\{\s*(\w+)\s*\}\}").unwrap());
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VarStr {
pub raw_val: String,
}

impl VarStr {
pub fn sub_env_vars(&self) -> Result<String, String> {
// Regex to match patterns like {{ VAR_NAME }}
static RE: OnceLock<Regex> = OnceLock::new();
let re = RE.get_or_init(|| Regex::new(r"\{\{\s*(\w+)\s*\}\}").unwrap());

let mut result = input.to_string();
let mut result = self.raw_val.to_string();

for caps in re.captures_iter(input) {
let var_name = caps.get(1).unwrap().as_str();
let full_match = caps.get(0).unwrap().as_str();
for caps in re.captures_iter(&self.raw_val) {
let var_name = caps.get(1).unwrap().as_str();
let full_match = caps.get(0).unwrap().as_str();

match env::var(var_name) {
Ok(value) => {
result = result.replace(full_match, &value);
match env::var(var_name) {
Ok(value) => {
result = result.replace(full_match, &value);
}
Err(_) => return Err(format!("Environment variable {} not found", var_name)),
}
Err(_) => return Err(format!("Environment variable {} not found", var_name)),
}
}

Ok(result)
Ok(result)
}
}

#[cfg(test)]
Expand Down
39 changes: 23 additions & 16 deletions arroyo-worker/src/connectors/polling_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{marker::PhantomData, time::Duration};
use arroyo_macro::source_fn;
use arroyo_rpc::ControlMessage;
use arroyo_rpc::{grpc::TableDescriptor, OperatorConfig};
use arroyo_types::{string_to_map, sub_env_vars, Message, Record, UserError, Watermark};
use arroyo_types::{string_to_map, Message, Record, UserError, VarStr, Watermark};

use serde::{Deserialize, Serialize};
use tokio::select;
Expand All @@ -26,8 +26,10 @@ use crate::{
SourceFinishType,
};

import_types!(schema = "../connector-schemas/polling_http/table.json");

import_types!(
schema = "../connector-schemas/polling_http/table.json",
replace = { VarStr = VarStr }
);
const DEFAULT_POLLING_INTERVAL: Duration = Duration::from_secs(1);
const MAX_BODY_SIZE: usize = 5 * 1024 * 1024; // 5M ought to be enough for anybody

Expand Down Expand Up @@ -66,19 +68,24 @@ where
let table: PollingHttpTable =
serde_json::from_value(config.table).expect("Invalid table config for WebhookSink");

let headers = string_to_map(table.headers.as_ref().map(|t| t.0.as_str()).unwrap_or(""))
.expect("Invalid header map")
.into_iter()
.map(|(k, v)| {
(
(&k).try_into()
.expect(&format!("invalid header name {}", k)),
(sub_env_vars(&v).unwrap())
.try_into()
.expect(&format!("invalid header value {}", v)),
)
})
.collect();
let headers = string_to_map(
&table
.headers
.as_ref()
.map(|t| t.sub_env_vars().unwrap_or_else(|e| panic!("{}", e)))
.unwrap_or("".to_string()),
)
.expect("Invalid header map")
.into_iter()
.map(|(k, v)| {
(
(&k).try_into()
.expect(&format!("invalid header name {}", k)),
(&v).try_into()
.expect(&format!("invalid header value {}", v)),
)
})
.collect();

let deserializer = DataDeserializer::new(
config
Expand Down
32 changes: 27 additions & 5 deletions connector-schemas/polling_http/table.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,24 @@
{
"type": "object",
"title": "PollingHTTPTable",
"$defs": {
"VarStr": {
"type": "object",
"properties": {
"raw_val": {
"type": "string",
"title": "Raw Value",
"description": "The raw value. May contain [variables](https://doc.arroyo.dev/todo).",
"examples": [
"{{ MY_ENV_VAR }}"
]
}
},
"required": [
"raw_val"
]
}
},
"properties": {
"endpoint": {
"title": "Endpoint",
Expand All @@ -11,10 +29,10 @@
},
"headers": {
"title": "Headers",
"type": "string",
"type": "object",
"description": "Comma separated list of headers to send with the request. May contain [variables](https://doc.arroyo.dev/todo).",
"pattern": "([a-zA-Z0-9-]+: ?.+,)*([a-zA-Z0-9-]+: ?.+)",
"examples": ["Authentication: digest 1234,Content-Type: application/json"]
"examples": ["Authentication: digest 1234,Content-Type: application/json"],
"$ref": "#/$defs/VarStr"
},
"method": {
"title": "Method",
Expand All @@ -26,7 +44,9 @@
"PUT",
"PATCH"
],
"examples": ["GET"]
"examples": [
"GET"
]
},
"body": {
"title": "Body",
Expand All @@ -37,7 +57,9 @@
"title": "Polling Interval (ms)",
"type": "integer",
"description": "Number of milliseconds to wait between successful polls of the HTTP endpoint",
"examples": ["1000"]
"examples": [
"1000"
]
},
"emit_behavior": {
"title": "Emit Behavior",
Expand Down

0 comments on commit 900b18b

Please sign in to comment.