Skip to content

Commit

Permalink
lots of progress
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Dec 4, 2023
1 parent 175343c commit 963e5ba
Show file tree
Hide file tree
Showing 9 changed files with 491 additions and 301 deletions.
155 changes: 66 additions & 89 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 13 additions & 5 deletions arroyo-api/src/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async fn compile_sql<'e, E>(
parallelism: usize,
auth_data: &AuthData,
tx: &E,
) -> anyhow::Result<CompiledSql>
) -> anyhow::Result<Option<CompiledSql>>
where
E: GenericClient,
{
Expand Down Expand Up @@ -293,7 +293,8 @@ pub(crate) async fn create_pipeline<'a>(
tx,
)
.await
.map_err(|e| bad_request(e.to_string()))?;
.map_err(|e| bad_request(e.to_string()))?
.ok_or_else(|| bad_request("The provided SQL does not contain a query"))?;
text = Some(sql.query);
udfs = Some(api_udfs);
is_preview = sql.preview;
Expand Down Expand Up @@ -469,7 +470,7 @@ pub async fn validate_query(

let pipeline_graph_validation_result =
match compile_sql(validate_query_post.query, &udfs, 1, &auth_data, &client).await {
Ok(CompiledSql { mut program, .. }) => {
Ok(Some(CompiledSql { mut program, .. })) => {
optimizations::optimize(&mut program.graph);
let nodes = program
.graph
Expand Down Expand Up @@ -499,12 +500,19 @@ pub async fn validate_query(

QueryValidationResult {
graph: Some(PipelineGraph { nodes, edges }),
errors: None,
errors: vec![],
missing_query: false,
}
}
Ok(None) => QueryValidationResult {
graph: None,
errors: vec![],
missing_query: true,
},
Err(e) => QueryValidationResult {
graph: None,
errors: Some(vec![e.to_string()]),
errors: vec![e.to_string()],
missing_query: false,
},
};

Expand Down
10 changes: 7 additions & 3 deletions arroyo-rpc/src/api_types/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::api_types::udfs::Udf;
use crate::grpc as grpc_proto;
use crate::grpc::api as api_proto;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use utoipa::ToSchema;

#[derive(Serialize, Deserialize, Clone, Debug, ToSchema)]
Expand All @@ -15,7 +16,9 @@ pub struct ValidateQueryPost {
#[serde(rename_all = "camelCase")]
pub struct QueryValidationResult {
pub graph: Option<PipelineGraph>,
pub errors: Option<Vec<String>>,
#[serde(default)]
pub errors: Vec<String>,
pub missing_query: bool,
}

#[derive(Serialize, Deserialize, Clone, Debug, ToSchema)]
Expand Down Expand Up @@ -165,7 +168,7 @@ pub struct OutputData {
pub operator_id: String,
pub timestamp: u64,
pub key: String,
pub value: String,
pub value: Value,
}

impl From<grpc_proto::OutputData> for OutputData {
Expand All @@ -174,7 +177,8 @@ impl From<grpc_proto::OutputData> for OutputData {
operator_id: value.operator_id,
timestamp: value.timestamp,
key: value.key,
value: value.value,
value: serde_json::from_str(&value.value)
.expect("Received non-JSON data from web sink"),
}
}
}
1 change: 1 addition & 0 deletions arroyo-sql-macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ fn get_pipeline_module(
},
)
.unwrap()
.unwrap()
.program;

let function = program.make_graph_function();
Expand Down
Loading

0 comments on commit 963e5ba

Please sign in to comment.