Skip to content

Commit

Permalink
Replace VarStr object in schema
Browse files Browse the repository at this point in the history
  • Loading branch information
jbeisen committed Dec 4, 2023
1 parent d59c9fd commit 5a1d239
Show file tree
Hide file tree
Showing 12 changed files with 146 additions and 48 deletions.
38 changes: 28 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion arroyo-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ tracing = "0.1"
anyhow = "1.0.70"

# json-schema support
typify = "0.0.13"
typify = "0.0.14"
schemars = "0.8"

# metric querying
Expand Down
2 changes: 1 addition & 1 deletion arroyo-connectors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ serde_json = "1"

tokio = { version = "1", features = ["full"] }

typify = "0.0.13"
typify = "0.0.14"
schemars = "0.8"

tonic = {workspace = true}
Expand Down
6 changes: 3 additions & 3 deletions arroyo-connectors/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, ConnectionType, FieldType, SourceField, SourceFieldType,
};
use arroyo_rpc::primitive_to_sql;
use arroyo_types::{string_to_map, sub_env_vars};
use arroyo_types::string_to_map;
use axum::response::sse::Event;
use blackhole::BlackholeConnector;
use fluvio::FluvioConnector;
Expand Down Expand Up @@ -331,7 +331,7 @@ pub(crate) fn nullable_field(name: &str, field_type: SourceFieldType) -> SourceF
}
}

fn construct_http_client(endpoint: &str, headers: Option<&String>) -> anyhow::Result<Client> {
fn construct_http_client(endpoint: &str, headers: Option<String>) -> anyhow::Result<Client> {
if let Err(e) = reqwest::Url::parse(&endpoint) {
bail!("invalid endpoint '{}': {:?}", endpoint, e)
};
Expand All @@ -344,7 +344,7 @@ fn construct_http_client(endpoint: &str, headers: Option<&String>) -> anyhow::Re
Ok((
TryInto::<HeaderName>::try_into(&k)
.map_err(|_| anyhow!("invalid header name {}", k))?,
TryInto::<HeaderValue>::try_into(sub_env_vars(&v).map_err(|e| anyhow!(e))?)
TryInto::<HeaderValue>::try_into(&v)
.map_err(|_| anyhow!("invalid header value {}", v))?,
))
})
Expand Down
17 changes: 11 additions & 6 deletions arroyo-connectors/src/polling_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::convert::Infallible;

use anyhow::anyhow;
use arroyo_rpc::OperatorConfig;
use arroyo_types::string_to_map;
use arroyo_types::{string_to_map, VarStr};
use axum::response::sse::Event;
use reqwest::{Client, Request};
use tokio::sync::mpsc::Sender;
Expand All @@ -20,7 +20,10 @@ use super::Connector;

const TABLE_SCHEMA: &str = include_str!("../../connector-schemas/polling_http/table.json");

import_types!(schema = "../connector-schemas/polling_http/table.json");
import_types!(
schema = "../connector-schemas/polling_http/table.json",
replace = { VarStr = VarStr }
);
const ICON: &str = include_str!("../resources/http.svg");

pub struct PollingHTTPConnector {}
Expand Down Expand Up @@ -55,8 +58,10 @@ impl PollingHTTPConnector {
config: &PollingHttpTable,
tx: Sender<Result<Event, Infallible>>,
) -> anyhow::Result<()> {
let client =
construct_http_client(&config.endpoint, config.headers.as_ref().map(|t| &t.0))?;
let client = construct_http_client(
&config.endpoint,
config.headers.as_ref().map(|t| t.sub_env_vars()),
)?;
let req = Self::construct_test_request(&client, config)?;

tx.send(Ok(Event::default()
Expand Down Expand Up @@ -166,7 +171,7 @@ impl Connector for PollingHTTPConnector {
EmptyConfig {},
PollingHttpTable {
endpoint,
headers: headers.map(Headers),
headers: headers.map(|s| VarStr { raw_val: s }),
method,
body,
poll_interval_ms: interval,
Expand All @@ -187,7 +192,7 @@ impl Connector for PollingHTTPConnector {
let description = format!("PollingHTTPSource<{}>", table.endpoint);

if let Some(headers) = &table.headers {
string_to_map(headers).ok_or_else(|| {
string_to_map(&headers.sub_env_vars()).ok_or_else(|| {
anyhow!(
"Invalid format for headers; should be a \
comma-separated list of colon-separated key value pairs"
Expand Down
10 changes: 7 additions & 3 deletions arroyo-connectors/src/webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::convert::Infallible;
use anyhow::anyhow;
use arroyo_rpc::OperatorConfig;

use arroyo_types::VarStr;
use axum::response::sse::Event;
use reqwest::{Client, Request};
use serde_json::json;
Expand Down Expand Up @@ -47,8 +48,10 @@ impl WebhookConnector {
config: &WebhookTable,
tx: Sender<Result<Event, Infallible>>,
) -> anyhow::Result<()> {
let client =
construct_http_client(&config.endpoint, config.headers.as_ref().map(|t| &t.0))?;
let client = construct_http_client(
&config.endpoint,
config.headers.as_ref().map(|t| t.0.clone()),
)?;
let req = Self::construct_test_request(&client, config)?;

tx.send(Ok(Event::default()
Expand Down Expand Up @@ -183,7 +186,8 @@ impl Connector for WebhookConnector {
.map_err(|e| anyhow!("invalid value for 'headers' config: {:?}", e))?;

let table = WebhookTable { endpoint, headers };
let client = construct_http_client(&table.endpoint, table.headers.as_ref().map(|t| &t.0))?;
let client =
construct_http_client(&table.endpoint, table.headers.as_ref().map(|t| t.0.clone()))?;
let _ = Self::construct_test_request(&client, &table)?;

self.from_config(None, name, EmptyConfig {}, table, schema)
Expand Down
30 changes: 30 additions & 0 deletions arroyo-console/src/routes/connections/JsonForm.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -437,15 +437,20 @@ export function FormInner({
return value;
}

// console.log(JSON.stringify(schema, null, 2));

return (
<Stack spacing={6}>
<Text>{JSON.stringify(values)}</Text>
<Text>{JSON.stringify(errors)}</Text>
{Object.keys(schema.properties || {})
.filter(key => {
const property = schema.properties![key];
// @ts-ignore
return !property.deprecated ?? true;
})
.map(key => {
console.log('key', key);
const property = schema.properties![key];
const nextPath = (path ? `${path}.` : '') + key;
if (typeof property == 'object') {
Expand Down Expand Up @@ -603,7 +608,32 @@ export function FormInner({
/>
</Box>
);
} else if (property.$ref?.endsWith('VarStr')) {
const valPath = nextPath + '.raw_val';
return (
<Box border={'1px solid red'}>
<Text>{JSON.stringify(property)}</Text>
<StringWidget
path={valPath}
key={key}
title={property.title || key}
description={property.description}
required={schema.required?.includes(key)}
// @ts-ignore
maxLength={property.maxLength}
// @ts-ignore
placeholder={
property.examples ? (property.examples[0] as string) : undefined
}
value={traversePath(values, valPath)}
errors={errors}
onChange={onChange}
/>
</Box>
);
} else {
console.info('UNSUPPOrTED OBJECT', property);
console.info('defs: ', schema.$defs);
console.warn('Unsupported property', property);
return <></>;
}
Expand Down
2 changes: 1 addition & 1 deletion arroyo-sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ proc-macro2 = "1"
syn = {version = "2", features = ["full", "parsing"]}
tracing = "0.1.37"

typify = "0.0.13"
typify = "0.0.14"
schemars = "0.8"
serde_json_path = "0.6.3"
apache-avro = "0.16.0"
Expand Down
14 changes: 13 additions & 1 deletion arroyo-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,19 @@ pub fn range_for_server(i: usize, n: usize) -> RangeInclusive<u64> {
start..=end
}

pub fn sub_env_vars(input: &str) -> Result<String, String> {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VarStr {
pub raw_val: String,
}

impl VarStr {
pub fn sub_env_vars(&self) -> String {
// TODO: return result
sub_env_vars(&self.raw_val).unwrap()
}
}

fn sub_env_vars(input: &str) -> Result<String, String> {
// Regex to match patterns like {{ VAR_NAME }}
static RE: OnceLock<Regex> = OnceLock::new();
let re = RE.get_or_init(|| Regex::new(r"\{\{\s*(\w+)\s*\}\}").unwrap());
Expand Down
2 changes: 1 addition & 1 deletion arroyo-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ rdkafka-sys = "4.5.0"
eventsource-client = "0.11.0"
regex = "1.8.1"
anyhow = "1.0.71"
typify = "0.0.13"
typify = "0.0.14"
regress = "0.6.0"
tokio-tungstenite = { version = "0.20.1", features = ["native-tls"] }
fluvio = {version = "=0.21", features = ["openssl"]}
Expand Down
39 changes: 23 additions & 16 deletions arroyo-worker/src/connectors/polling_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{marker::PhantomData, time::Duration};
use arroyo_macro::source_fn;
use arroyo_rpc::ControlMessage;
use arroyo_rpc::{grpc::TableDescriptor, OperatorConfig};
use arroyo_types::{string_to_map, sub_env_vars, Message, Record, UserError, Watermark};
use arroyo_types::{string_to_map, Message, Record, UserError, VarStr, Watermark};

use serde::{Deserialize, Serialize};
use tokio::select;
Expand All @@ -26,8 +26,10 @@ use crate::{
SourceFinishType,
};

import_types!(schema = "../connector-schemas/polling_http/table.json");

import_types!(
schema = "../connector-schemas/polling_http/table.json",
replace = { VarStr = VarStr }
);
const DEFAULT_POLLING_INTERVAL: Duration = Duration::from_secs(1);
const MAX_BODY_SIZE: usize = 5 * 1024 * 1024; // 5M ought to be enough for anybody

Expand Down Expand Up @@ -66,19 +68,24 @@ where
let table: PollingHttpTable =
serde_json::from_value(config.table).expect("Invalid table config for WebhookSink");

let headers = string_to_map(table.headers.as_ref().map(|t| t.0.as_str()).unwrap_or(""))
.expect("Invalid header map")
.into_iter()
.map(|(k, v)| {
(
(&k).try_into()
.expect(&format!("invalid header name {}", k)),
(sub_env_vars(&v).unwrap())
.try_into()
.expect(&format!("invalid header value {}", v)),
)
})
.collect();
let headers = string_to_map(
&table
.headers
.as_ref()
.map(|t| t.sub_env_vars())
.unwrap_or("".to_string()),
)
.expect("Invalid header map")
.into_iter()
.map(|(k, v)| {
(
(&k).try_into()
.expect(&format!("invalid header name {}", k)),
(&v).try_into()
.expect(&format!("invalid header value {}", v)),
)
})
.collect();

let deserializer = DataDeserializer::new(
config
Expand Down
Loading

0 comments on commit 5a1d239

Please sign in to comment.