diff --git a/examples/sudt-transfer.rs b/examples/sudt-transfer.rs index 3191070..e1d0b1e 100644 --- a/examples/sudt-transfer.rs +++ b/examples/sudt-transfer.rs @@ -177,37 +177,82 @@ async fn consume_ack( let client = CkbRpcClient::new(config.ckb_rpc_url.clone()); let sender = sender_lock_script.calc_script_hash().as_bytes().slice(..20); - let mut ack_packets = pin!( - PacketCell::subscribe(client.clone(), config.sdk_config.clone()) - .try_filter(|cell| futures::future::ready(cell.is_ack_packet())) - ); - // Filter for a packet that is sent to us. - let (p, pd) = loop { - let p = ack_packets.try_next().await?.context("no packet cell")?; - let pd = FungibleTokenPacketData::decode(&p.packet.packet.data[..])?; - if pd.sender == sender { - if p.packet.ack.as_deref() != Some(&[1]) { + let packets = PacketCell::search(&client, &config.sdk_config, 100, &mut 0) + .await? + .into_iter() + .filter_map(|p| { + if !p.is_ack_packet() { + return None; + } + let pd = FungibleTokenPacketData::decode(&p.packet.packet.data[..]).ok()?; + if pd.sender == sender { + if p.packet.ack.as_deref() != Some(&[1]) { + println!("skipping packet {pd}"); + return None; + } + Some((p, pd)) + } else { + println!("skipping packet\n{pd:?}\n{:?}", p.packet.packet); + None + } + }) + .collect::>(); + + if packets.is_empty() { + println!("no avaliable packets found, start waiting..."); + let mut ack_packets = pin!(PacketCell::subscribe( + client.clone(), + config.sdk_config.clone() + ) + .try_filter(|cell| futures::future::ready(cell.is_ack_packet()))); + // Filter for a packet that is sent to us. + let (p, pd) = loop { + let p = ack_packets.try_next().await?.context("no packet cell")?; + let pd = FungibleTokenPacketData::decode(&p.packet.packet.data[..])?; + if pd.sender == sender { + if p.packet.ack.as_deref() != Some(&[1]) { + println!("skipping packet {pd}"); + continue; + } + break (p, pd); + } else { println!("skipping packet {pd}"); - continue; } - break (p, pd); - } else { - println!("skipping packet {pd}"); + }; + consume_ack_single(p, pd, sk, &sender_lock_script, &client, config).await?; + } else { + let mut consumed = 0; + for (p, pd) in packets { + consume_ack_single(p, pd, sk, &sender_lock_script, &client, config).await?; + consumed += 1; } - }; + println!("consumed {consumed} packets"); + } + + Ok(()) +} + +async fn consume_ack_single( + p: PacketCell, + pd: FungibleTokenPacketData, + sk: secp256k1::SecretKey, + sender_lock_script: &packed::Script, + client: &CkbRpcClient, + config: &Config, +) -> Result<[u8; 32]> { println!("consuming packet ack\n{pd}\n{:?}", p.packet.packet); let sudt_type_hash = hex::decode(pd.denom).context("decode base denom")?; let (sudt_transfer_dep, st_cell, st_cell_amount, _) = - get_st_cell_by_sudt_type_hash(&client, config, sudt_type_hash).await?; - let sudt_dep = get_type_dep_from_cell(&client, &st_cell) + get_st_cell_by_sudt_type_hash(client, config, sudt_type_hash).await?; + let sudt_dep = get_type_dep_from_cell(client, &st_cell) .await .context("get sudt dep")?; - let user_input = get_capacity_input(&client, &sender_lock_script).await?; + let user_input = get_capacity_input(client, sender_lock_script).await?; let packet_contract_cell = get_latest_cell_by_type_script( - &client, + client, config.sdk_config.packet_contract_type_script().into(), ) .await?; @@ -231,10 +276,8 @@ async fn consume_ack( .input(simple_input(user_input.out_point.into())) .witness(placeholder_witness.as_bytes().pack()); let tx = add_ibc_envelope(tx, &envelope).build(); - let tx = complete_tx(&config.ckb_rpc_url, &tx, sender_lock_script, sk)?; - send_transaction(&config.ckb_rpc_url, tx)?; - - Ok(()) + let tx = complete_tx(&config.ckb_rpc_url, &tx, sender_lock_script.clone(), sk)?; + send_transaction(&config.ckb_rpc_url, tx) } fn get_sudt_type_script(config: &Config, sudt: &str) -> Result { @@ -537,51 +580,92 @@ async fn receive( .as_bytes() .slice(..20); - let mut recv_packets = pin!( - PacketCell::subscribe(client.clone(), config.sdk_config.clone()) - .try_filter(|cell| futures::future::ready(cell.is_recv_packet())) - ); - // Filter for a packet that is sent to us. - let (p, pd) = loop { - let p = recv_packets - .try_next() - .await? - .context("no packet cell for us")?; - let pd = FungibleTokenPacketData::decode(&p.packet.packet.data[..])?; - if pd.receiver == receiver { - break (p, pd); - } else { - println!("skipping packet {pd}"); + let packets = PacketCell::search(&client, &config.sdk_config, 100, &mut 0) + .await? + .into_iter() + .filter_map(|p| { + if !p.is_recv_packet() { + return None; + } + let pd = FungibleTokenPacketData::decode(&p.packet.packet.data[..]).ok()?; + if pd.receiver == receiver { + Some((p, pd)) + } else { + println!("skipping packet\n{pd:?}\n{:?}", p.packet.packet); + None + } + }) + .collect::>(); + + if packets.is_empty() { + println!("no avaliable packets found, start waiting..."); + let mut recv_packets = pin!(PacketCell::subscribe( + client.clone(), + config.sdk_config.clone() + ) + .try_filter(|cell| futures::future::ready(cell.is_recv_packet()))); + // Filter for a packet that is sent to us. + let (p, pd) = loop { + let p = recv_packets + .try_next() + .await? + .context("no packet cell for us")?; + let pd = FungibleTokenPacketData::decode(&p.packet.packet.data[..])?; + if pd.receiver == receiver { + break (p, pd); + } else { + println!("skipping packet {pd}"); + } + }; + receive_single(p, pd, sk, &receiver_lock_script, &client, config).await?; + } else { + let mut received = 0; + for (p, pd) in packets { + receive_single(p, pd, sk, &receiver_lock_script, &client, config).await?; + received += 1; } - }; - println!("receiving packet\n{pd}\n{:?}", p.packet.packet); + println!("received {received} packets"); + } + + Ok(()) +} + +async fn receive_single( + p: PacketCell, + pd: FungibleTokenPacketData, + sk: secp256k1::SecretKey, + receiver_lock_script: &packed::Script, + client: &CkbRpcClient, + config: &Config, +) -> Result<[u8; 32]> { + println!("receiving packet\n{pd:?}\n{:?}", p.packet.packet); let base_denom = pd.denom.split('/').last().context("get base denom")?; let sudt_type_hash = hex::decode(base_denom).context("decode base denom")?; let (sudt_transfer_dep, st_cell, st_cell_amount, sudt_type_script) = - get_st_cell_by_sudt_type_hash(&client, config, sudt_type_hash).await?; + get_st_cell_by_sudt_type_hash(client, config, sudt_type_hash).await?; - let sudt_dep = get_type_dep_from_cell(&client, &st_cell) + let sudt_dep = get_type_dep_from_cell(client, &st_cell) .await .context("get sudt dep")?; let axon_metadata_cell = get_latest_cell_by_type_script( - &client, + client, config.sdk_config.axon_metadata_type_script().into(), ) .await?; let channel_contract_cell = get_latest_cell_by_type_script( - &client, + client, config.sdk_config.channel_contract_type_script().into(), ) .await?; let packet_contract_cell = get_latest_cell_by_type_script( - &client, + client, config.sdk_config.packet_contract_type_script().into(), ) .await?; - let channel = IbcChannelCell::get_latest(&client, &config.sdk_config).await?; + let channel = IbcChannelCell::get_latest(client, &config.sdk_config).await?; let (tx, envelope) = assemble_write_ack_partial_transaction( simple_dep(axon_metadata_cell.out_point.into()), @@ -593,7 +677,7 @@ async fn receive( vec![1], )?; - let user_input = get_capacity_input(&client, &receiver_lock_script).await?; + let user_input = get_capacity_input(client, receiver_lock_script).await?; // sighash placeholder witness. let placeholder_witness = packed::WitnessArgs::new_builder() @@ -629,11 +713,9 @@ async fn receive( let tx = add_ibc_envelope(tx, &envelope).build(); - let tx = complete_tx(&config.ckb_rpc_url, &tx, receiver_lock_script, sk)?; + let tx = complete_tx(&config.ckb_rpc_url, &tx, receiver_lock_script.clone(), sk)?; - send_transaction(&config.ckb_rpc_url, tx)?; - - Ok(()) + send_transaction(&config.ckb_rpc_url, tx) } async fn get_capacity_input(