diff --git a/fga/src/client.rs b/fga/src/client.rs index 995875c0005..ffebc31c788 100644 --- a/fga/src/client.rs +++ b/fga/src/client.rs @@ -86,9 +86,30 @@ impl Client { Ok(client) } + pub fn stores(&self) -> impl stream::TryStream { + ContinuationUnfolder::new(self.clone(), ()).stream( + |UnfoldArgs { + client, + continuation, + ctx: _ctx, + }| async move { + let (stores, continuation) = client + .get_stores(None, continuation.as_ref().map(String::as_str)) + .await?; + Ok(( + stores, + UnfoldNextState { + ctx: (), + continuation, + }, + )) + }, + ) + } + pub async fn find_store(&self, store_name: &str) -> Result, RequestFailure> { let stream = self - .get_stores(None) + .stores() .try_filter(|Store { name, .. }| future::ready(name == store_name)); futures::pin_mut!(stream); let store = stream.try_next().await?.into_iter().last(); @@ -298,65 +319,35 @@ impl Client { .unwrap() } - fn get_stores( + async fn get_stores( &self, page_size: Option, - ) -> impl stream::TryStream { + continuation: Option<&str>, + ) -> Result<(Vec, String), RequestFailure> { #[derive(serde::Deserialize)] struct Response { stores: Vec, - continuation_token: Option, + #[serde(default)] + continuation_token: String, } - struct State { - client: Client, - continuation: Option, + let mut url = self.base_url().join("stores").unwrap(); + if let Some(continuation) = continuation { + url.query_pairs_mut() + .append_pair("continuation_token", continuation); + } + if let Some(page_size) = page_size { + url.query_pairs_mut() + .append_pair("page_size", page_size.to_string().as_str()); } + let response = self.inner.get(url).send().await?; - // will be factorized and commented - let stream = stream::try_unfold( - State { - client: self.clone(), - continuation: None, - }, - move |State { - client, - continuation, - }| { - async move { - if continuation.as_ref().is_some_and(String::is_empty) { - return Ok(None); - } - - let mut url = client.base_url().join("stores").unwrap(); - if let Some(continuation) = continuation { - url.query_pairs_mut() - .append_pair("continuation_token", continuation.as_str()); - } - if let Some(page_size) = page_size { - url.query_pairs_mut() - .append_pair("page_size", page_size.to_string().as_str()); - } - let response = client.inner.get(url).send().await?; - - let Response { - stores, - continuation_token, - } = response.error_for_status()?.json::().await?; - let continuation = continuation_token.unwrap_or_default(); - - let next_state = State { - client, - continuation: Some(continuation), - }; - Ok::<_, RequestFailure>(Some((stores, next_state))) - } - }, - ); + let Response { + stores, + continuation_token, + } = response.error_for_status()?.json::().await?; - stream - .map_ok(|stores| stream::iter(stores.into_iter().map(Ok))) - .try_flatten() + Ok((stores, continuation_token)) } async fn post_stores(&self, name: &str) -> Result {