From 03df547406af8df4cf6b9b6fb80b1a651f031ee8 Mon Sep 17 00:00:00 2001 From: liyukun Date: Fri, 17 Nov 2023 11:53:11 +0800 Subject: [PATCH] feat: consume ack in a loop --- examples/sudt-transfer.rs | 116 ++++++++++++++++++++------------------ 1 file changed, 62 insertions(+), 54 deletions(-) diff --git a/examples/sudt-transfer.rs b/examples/sudt-transfer.rs index 106b6b2..c26c695 100644 --- a/examples/sudt-transfer.rs +++ b/examples/sudt-transfer.rs @@ -1,6 +1,6 @@ //! An example cli for sending/receiving SUDT with the sudt transfer module. -use std::{collections::HashMap, fmt, fs, path::PathBuf, pin::pin, time::Duration}; +use std::{collections::HashMap, fmt, fs, path::PathBuf, time::Duration}; use anyhow::{bail, ensure, Context, Result}; use bytes::Bytes; @@ -31,7 +31,6 @@ use forcerelay_ckb_sdk::{ assemble_send_packet_partial_transaction, assemble_write_ack_partial_transaction, }, }; -use futures::TryStreamExt; use prost::Message; use secp256k1::Secp256k1; use serde::Deserialize; @@ -177,62 +176,71 @@ async fn consume_ack( let client = CkbRpcClient::new(config.ckb_rpc_url.clone()); let sender = sender_lock_script.calc_script_hash().as_bytes().slice(..20); - let mut ack_packets = pin!( - PacketCell::subscribe(client.clone(), config.sdk_config.clone()) - .try_filter(|cell| futures::future::ready(cell.is_ack_packet())) - ); - // Filter for a packet that is sent to us. - let (p, pd) = loop { - let p = ack_packets.try_next().await?.context("no packet cell")?; - let pd = FungibleTokenPacketData::decode(&p.packet.packet.data[..])?; - if pd.sender == sender { - if p.packet.ack.as_deref() != Some(&[1]) { - println!("skipping packet {pd}"); - continue; + let packets = PacketCell::search(&client, &config.sdk_config, 100, &mut 0) + .await? + .into_iter() + .filter_map(|p| { + if !p.is_ack_packet() { + return None; } - break (p, pd); - } else { - println!("skipping packet {pd}"); - } - }; - println!("consuming packet ack\n{pd}\n{:?}", p.packet.packet); + let pd = FungibleTokenPacketData::decode(&p.packet.packet.data[..]).ok()?; + if pd.sender == sender { + if p.packet.ack.as_deref() != Some(&[1]) { + println!("skipping packet {pd}"); + return None; + } + Some((p, pd)) + } else { + println!("skipping packet\n{pd:?}\n{:?}", p.packet.packet); + None + } + }); - let sudt_type_hash = hex::decode(pd.denom).context("decode base denom")?; - let (sudt_transfer_dep, st_cell, st_cell_amount, _) = - get_st_cell_by_sudt_type_hash(&client, config, sudt_type_hash).await?; - let sudt_dep = get_type_dep_from_cell(&client, &st_cell) - .await - .context("get sudt dep")?; + let mut consumed = 0; + for (p, pd) in packets { + println!("consuming packet ack\n{pd}\n{:?}", p.packet.packet); - let user_input = get_capacity_input(&client, &sender_lock_script).await?; + let sudt_type_hash = hex::decode(pd.denom).context("decode base denom")?; + let (sudt_transfer_dep, st_cell, st_cell_amount, _) = + get_st_cell_by_sudt_type_hash(&client, config, sudt_type_hash).await?; + let sudt_dep = get_type_dep_from_cell(&client, &st_cell) + .await + .context("get sudt dep")?; - let packet_contract_cell = get_latest_cell_by_type_script( - &client, - config.sdk_config.packet_contract_type_script().into(), - ) - .await?; + let user_input = get_capacity_input(&client, &sender_lock_script).await?; - let (tx, envelope) = assemble_consume_ack_packet_partial_transaction( - simple_dep(packet_contract_cell.out_point.into()), - p, - )?; - // sighash placeholder witness. - let placeholder_witness = packed::WitnessArgs::new_builder() - .lock(Some(Bytes::from_static(&[0u8; 65])).pack()) - .build(); - let tx = tx - .input(simple_input(st_cell.out_point.into())) - .output(packed::CellOutput::from(st_cell.output)) - .output_data(sudt_amount_data(st_cell_amount).pack()) - .witness([].pack()) - .cell_dep(sudt_transfer_dep) - .cell_dep(sudt_dep) - // capacity input and witness. - .input(simple_input(user_input.out_point.into())) - .witness(placeholder_witness.as_bytes().pack()); - let tx = add_ibc_envelope(tx, &envelope).build(); - let tx = complete_tx(&config.ckb_rpc_url, &tx, sender_lock_script, sk)?; - send_transaction(&config.ckb_rpc_url, tx)?; + let packet_contract_cell = get_latest_cell_by_type_script( + &client, + config.sdk_config.packet_contract_type_script().into(), + ) + .await?; + + let (tx, envelope) = assemble_consume_ack_packet_partial_transaction( + simple_dep(packet_contract_cell.out_point.into()), + p, + )?; + // sighash placeholder witness. + let placeholder_witness = packed::WitnessArgs::new_builder() + .lock(Some(Bytes::from_static(&[0u8; 65])).pack()) + .build(); + let tx = tx + .input(simple_input(st_cell.out_point.into())) + .output(packed::CellOutput::from(st_cell.output)) + .output_data(sudt_amount_data(st_cell_amount).pack()) + .witness([].pack()) + .cell_dep(sudt_transfer_dep) + .cell_dep(sudt_dep) + // capacity input and witness. + .input(simple_input(user_input.out_point.into())) + .witness(placeholder_witness.as_bytes().pack()); + let tx = add_ibc_envelope(tx, &envelope).build(); + let tx = complete_tx(&config.ckb_rpc_url, &tx, sender_lock_script.clone(), sk)?; + send_transaction(&config.ckb_rpc_url, tx)?; + + consumed += 1; + } + + println!("consumed {consumed} packets"); Ok(()) } @@ -560,7 +568,7 @@ async fn receive( let sudt_type_hash = hex::decode(base_denom).context("decode base denom")?; let (sudt_transfer_dep, st_cell, st_cell_amount, sudt_type_script) = - get_st_cell_by_sudt_type_hash(&client, &config, sudt_type_hash).await?; + get_st_cell_by_sudt_type_hash(&client, config, sudt_type_hash).await?; let sudt_dep = get_type_dep_from_cell(&client, &st_cell) .await