Skip to content

Commit

Permalink
rewrite get_stores using ContinuationUnfolder
Browse files Browse the repository at this point in the history
Signed-off-by: Leo Valais <[email protected]>
  • Loading branch information
leovalais committed Feb 5, 2025
1 parent 71c9f53 commit 28052f1
Showing 1 changed file with 41 additions and 50 deletions.
91 changes: 41 additions & 50 deletions fga/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,30 @@ impl Client {
Ok(client)
}

pub fn stores(&self) -> impl stream::TryStream<Ok = Store, Error = RequestFailure> {
ContinuationUnfolder::new(self.clone(), ()).stream(
|UnfoldArgs {
client,
continuation,
ctx: _ctx,
}| async move {
let (stores, continuation) = client
.get_stores(None, continuation.as_ref().map(String::as_str))
.await?;
Ok((
stores,
UnfoldNextState {
ctx: (),
continuation,
},
))
},
)
}

pub async fn find_store(&self, store_name: &str) -> Result<Option<Store>, RequestFailure> {
let stream = self
.get_stores(None)
.stores()
.try_filter(|Store { name, .. }| future::ready(name == store_name));
futures::pin_mut!(stream);
let store = stream.try_next().await?.into_iter().last();
Expand Down Expand Up @@ -298,65 +319,35 @@ impl Client {
.unwrap()
}

fn get_stores(
async fn get_stores(
&self,
page_size: Option<usize>,
) -> impl stream::TryStream<Ok = Store, Error = RequestFailure> {
continuation: Option<&str>,
) -> Result<(Vec<Store>, String), RequestFailure> {
#[derive(serde::Deserialize)]
struct Response {
stores: Vec<Store>,
continuation_token: Option<String>,
#[serde(default)]
continuation_token: String,
}

struct State {
client: Client,
continuation: Option<String>,
let mut url = self.base_url().join("stores").unwrap();
if let Some(continuation) = continuation {
url.query_pairs_mut()
.append_pair("continuation_token", continuation);
}
if let Some(page_size) = page_size {
url.query_pairs_mut()
.append_pair("page_size", page_size.to_string().as_str());
}
let response = self.inner.get(url).send().await?;

// will be factorized and commented
let stream = stream::try_unfold(
State {
client: self.clone(),
continuation: None,
},
move |State {
client,
continuation,
}| {
async move {
if continuation.as_ref().is_some_and(String::is_empty) {
return Ok(None);
}

let mut url = client.base_url().join("stores").unwrap();
if let Some(continuation) = continuation {
url.query_pairs_mut()
.append_pair("continuation_token", continuation.as_str());
}
if let Some(page_size) = page_size {
url.query_pairs_mut()
.append_pair("page_size", page_size.to_string().as_str());
}
let response = client.inner.get(url).send().await?;

let Response {
stores,
continuation_token,
} = response.error_for_status()?.json::<Response>().await?;
let continuation = continuation_token.unwrap_or_default();

let next_state = State {
client,
continuation: Some(continuation),
};
Ok::<_, RequestFailure>(Some((stores, next_state)))
}
},
);
let Response {
stores,
continuation_token,
} = response.error_for_status()?.json::<Response>().await?;

stream
.map_ok(|stores| stream::iter(stores.into_iter().map(Ok)))
.try_flatten()
Ok((stores, continuation_token))
}

async fn post_stores(&self, name: &str) -> Result<Store, RequestFailure> {
Expand Down

0 comments on commit 28052f1

Please sign in to comment.