Skip to content

Commit

Permalink
wait until we've got the max change id before starting to buffer changes
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromegn committed Nov 15, 2023
1 parent 5815b85 commit 67dfe7c
Showing 1 changed file with 23 additions and 19 deletions.
42 changes: 23 additions & 19 deletions crates/corro-agent/src/api/public/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,47 +392,45 @@ pub async fn catch_up_sub(
debug!("catching up sub {} from: {from:?}", matcher.id());
let (ready_tx, ready_rx) = oneshot::channel();

let forward_task = tokio::spawn(forward_sub_to_sender(
Some(ready_rx),
sub_rx,
evt_tx.clone(),
));

let last_change_id = {
let mut buf = BytesMut::new();

let mut buf = BytesMut::new();
let (last_change_id, forward_task) = {
let mut conn = matcher.pool().get().await?;

let res = block_in_place(|| {
let max_change_id = {
let (max_change_id, forward_task) = {
let tx = conn.transaction()?; // read transaction
let mut max_change_id = matcher.max_change_id(&tx)?;
let max_change_id = matcher.max_change_id(&tx)?;
let forward_task = tokio::spawn(forward_sub_to_sender(
Some(ready_rx),
sub_rx,
evt_tx.clone(),
));
match from {
Some(from) => {
catch_up_sub_from(&tx, &matcher, from, &mut buf, &evt_tx)?;
debug!("sub caught up to their 'from' of {from:?}");
}
None => {
let new_max_change_id = catch_up_sub_anew(&tx, &matcher, &evt_tx)?;
if new_max_change_id != max_change_id {
warn!(sub_id = %matcher.id(), "wrong assumption about sqlite transaction, max change id differed! prev: {max_change_id:?}, new: {new_max_change_id:?}");
max_change_id = new_max_change_id;
}
catch_up_sub_anew(&tx, &matcher, &evt_tx)?;
// if new_max_change_id != max_change_id {
// warn!(sub_id = %matcher.id(), "wrong assumption about sqlite transaction, max change id differed! prev: {max_change_id:?}, new: {new_max_change_id:?}");
// max_change_id = new_max_change_id;
// }
debug!("sub caught up from scratch");
}
}
max_change_id
(max_change_id, forward_task)
};
if from.is_none() {
let tx = conn.transaction()?;
// make sure we're all caught up to the max change id seen in this tx
catch_up_sub_from(&tx, &matcher, max_change_id, &mut buf, &evt_tx)?;
}
Ok(max_change_id)
Ok((max_change_id, forward_task))
});

match res {
Ok(last_change_id) => last_change_id,
Ok(res) => res,
Err(e) => {
match e {
CatchUpError::Sqlite(e) => {
Expand All @@ -456,6 +454,12 @@ pub async fn catch_up_sub(

if let Err(_e) = ready_tx.send(last_change_id) {
warn!("subscriber catch up readiness receiver was gone, aborting...");
_ = evt_tx
.send(error_to_query_event_bytes(
&mut buf,
"internal error: could not send last change id into channel",
))
.await;
return Ok(());
}

Expand Down

0 comments on commit 67dfe7c

Please sign in to comment.