diff --git a/core/src/response.rs b/core/src/response.rs index 76208ffef..0f8961ecb 100644 --- a/core/src/response.rs +++ b/core/src/response.rs @@ -45,7 +45,7 @@ pub struct ProgressValues { pub bytes: usize, } -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, Clone)] pub struct SchemaField { pub name: String, #[serde(rename = "type")] diff --git a/driver/src/rest_api.rs b/driver/src/rest_api.rs index 8b0f23d50..133ed08f2 100644 --- a/driver/src/rest_api.rs +++ b/driver/src/rest_api.rs @@ -71,6 +71,7 @@ impl Connection for RestAPIConnection { async fn query_iter_ext(&self, sql: &str) -> Result { info!("query iter ext: {}", sql); let resp = self.client.start_query(sql).await?; + let resp = self.wait_for_schema(resp).await?; let (schema, rows) = RestAPIRows::from_response(self.client.clone(), resp)?; Ok(RowStatsIterator::new(Arc::new(schema), Box::pin(rows))) } @@ -199,16 +200,33 @@ impl<'o> RestAPIConnection { if !pre.data.is_empty() { return Ok(pre); } + // preserve schema since it is not included in the final response in old servers + let pre_schema = pre.schema.clone(); let mut result = pre; - // preserve schema since it is no included in the final response - let schema = result.schema; while let Some(next_uri) = result.next_uri { result = self.client.query_page(&result.id, &next_uri).await?; if !result.data.is_empty() { break; } } - result.schema = schema; + if result.schema.is_empty() { + result.schema = pre_schema; + } + Ok(result) + } + + async fn wait_for_schema(&self, pre: QueryResponse) -> Result { + if !pre.data.is_empty() || !pre.schema.is_empty() { + return Ok(pre); + } + let mut result = pre; + // preserve schema since it is no included in the final response + while let Some(next_uri) = result.next_uri { + result = self.client.query_page(&result.id, &next_uri).await?; + if !result.data.is_empty() || !result.schema.is_empty() { + break; + } + } Ok(result) }