Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sudt-transfer cli receive more packets in one invocation #28

Merged
merged 3 commits into from
Nov 20, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 133 additions & 51 deletions examples/sudt-transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,37 +177,82 @@ async fn consume_ack(
let client = CkbRpcClient::new(config.ckb_rpc_url.clone());
let sender = sender_lock_script.calc_script_hash().as_bytes().slice(..20);

let mut ack_packets = pin!(
PacketCell::subscribe(client.clone(), config.sdk_config.clone())
.try_filter(|cell| futures::future::ready(cell.is_ack_packet()))
);
// Filter for a packet that is sent to us.
let (p, pd) = loop {
let p = ack_packets.try_next().await?.context("no packet cell")?;
let pd = FungibleTokenPacketData::decode(&p.packet.packet.data[..])?;
if pd.sender == sender {
if p.packet.ack.as_deref() != Some(&[1]) {
let packets = PacketCell::search(&client, &config.sdk_config, 100, &mut 0)
.await?
.into_iter()
.filter_map(|p| {
if !p.is_ack_packet() {
return None;
}
let pd = FungibleTokenPacketData::decode(&p.packet.packet.data[..]).ok()?;
if pd.sender == sender {
if p.packet.ack.as_deref() != Some(&[1]) {
println!("skipping packet {pd}");
return None;
}
Some((p, pd))
} else {
println!("skipping packet\n{pd:?}\n{:?}", p.packet.packet);
None
}
})
.collect::<Vec<_>>();

if packets.is_empty() {
println!("no avaliable packets found, start waiting...");
let mut ack_packets = pin!(PacketCell::subscribe(
client.clone(),
config.sdk_config.clone()
)
.try_filter(|cell| futures::future::ready(cell.is_ack_packet())));
// Filter for a packet that is sent to us.
let (p, pd) = loop {
let p = ack_packets.try_next().await?.context("no packet cell")?;
let pd = FungibleTokenPacketData::decode(&p.packet.packet.data[..])?;
if pd.sender == sender {
if p.packet.ack.as_deref() != Some(&[1]) {
println!("skipping packet {pd}");
continue;
}
break (p, pd);
} else {
println!("skipping packet {pd}");
continue;
}
break (p, pd);
} else {
println!("skipping packet {pd}");
};
consume_ack_single(p, pd, sk, &sender_lock_script, &client, config).await?;
} else {
let mut consumed = 0;
for (p, pd) in packets {
consume_ack_single(p, pd, sk, &sender_lock_script, &client, config).await?;
consumed += 1;
}
};
println!("consumed {consumed} packets");
}

Ok(())
}

async fn consume_ack_single(
p: PacketCell,
pd: FungibleTokenPacketData,
sk: secp256k1::SecretKey,
sender_lock_script: &packed::Script,
client: &CkbRpcClient,
config: &Config,
) -> Result<[u8; 32]> {
println!("consuming packet ack\n{pd}\n{:?}", p.packet.packet);

let sudt_type_hash = hex::decode(pd.denom).context("decode base denom")?;
let (sudt_transfer_dep, st_cell, st_cell_amount, _) =
get_st_cell_by_sudt_type_hash(&client, config, sudt_type_hash).await?;
let sudt_dep = get_type_dep_from_cell(&client, &st_cell)
get_st_cell_by_sudt_type_hash(client, config, sudt_type_hash).await?;
let sudt_dep = get_type_dep_from_cell(client, &st_cell)
.await
.context("get sudt dep")?;

let user_input = get_capacity_input(&client, &sender_lock_script).await?;
let user_input = get_capacity_input(client, sender_lock_script).await?;

let packet_contract_cell = get_latest_cell_by_type_script(
&client,
client,
config.sdk_config.packet_contract_type_script().into(),
)
.await?;
Expand All @@ -231,10 +276,8 @@ async fn consume_ack(
.input(simple_input(user_input.out_point.into()))
.witness(placeholder_witness.as_bytes().pack());
let tx = add_ibc_envelope(tx, &envelope).build();
let tx = complete_tx(&config.ckb_rpc_url, &tx, sender_lock_script, sk)?;
send_transaction(&config.ckb_rpc_url, tx)?;

Ok(())
let tx = complete_tx(&config.ckb_rpc_url, &tx, sender_lock_script.clone(), sk)?;
send_transaction(&config.ckb_rpc_url, tx)
}

fn get_sudt_type_script(config: &Config, sudt: &str) -> Result<packed::Script> {
Expand Down Expand Up @@ -537,51 +580,92 @@ async fn receive(
.as_bytes()
.slice(..20);

let mut recv_packets = pin!(
PacketCell::subscribe(client.clone(), config.sdk_config.clone())
.try_filter(|cell| futures::future::ready(cell.is_recv_packet()))
);
// Filter for a packet that is sent to us.
let (p, pd) = loop {
let p = recv_packets
.try_next()
.await?
.context("no packet cell for us")?;
let pd = FungibleTokenPacketData::decode(&p.packet.packet.data[..])?;
if pd.receiver == receiver {
break (p, pd);
} else {
println!("skipping packet {pd}");
let packets = PacketCell::search(&client, &config.sdk_config, 100, &mut 0)
.await?
.into_iter()
.filter_map(|p| {
if !p.is_recv_packet() {
return None;
}
let pd = FungibleTokenPacketData::decode(&p.packet.packet.data[..]).ok()?;
if pd.receiver == receiver {
Some((p, pd))
} else {
println!("skipping packet\n{pd:?}\n{:?}", p.packet.packet);
None
}
})
.collect::<Vec<_>>();

if packets.is_empty() {
println!("no avaliable packets found, start waiting...");
let mut recv_packets = pin!(PacketCell::subscribe(
client.clone(),
config.sdk_config.clone()
)
.try_filter(|cell| futures::future::ready(cell.is_recv_packet())));
// Filter for a packet that is sent to us.
let (p, pd) = loop {
let p = recv_packets
.try_next()
.await?
.context("no packet cell for us")?;
let pd = FungibleTokenPacketData::decode(&p.packet.packet.data[..])?;
if pd.receiver == receiver {
break (p, pd);
} else {
println!("skipping packet {pd}");
}
};
receive_single(p, pd, sk, &receiver_lock_script, &client, config).await?;
} else {
let mut received = 0;
for (p, pd) in packets {
receive_single(p, pd, sk, &receiver_lock_script, &client, config).await?;
received += 1;
}
};
println!("receiving packet\n{pd}\n{:?}", p.packet.packet);
println!("received {received} packets");
}

Ok(())
}

async fn receive_single(
p: PacketCell,
pd: FungibleTokenPacketData,
sk: secp256k1::SecretKey,
receiver_lock_script: &packed::Script,
client: &CkbRpcClient,
config: &Config,
) -> Result<[u8; 32]> {
println!("receiving packet\n{pd:?}\n{:?}", p.packet.packet);

let base_denom = pd.denom.split('/').last().context("get base denom")?;
let sudt_type_hash = hex::decode(base_denom).context("decode base denom")?;

let (sudt_transfer_dep, st_cell, st_cell_amount, sudt_type_script) =
get_st_cell_by_sudt_type_hash(&client, config, sudt_type_hash).await?;
get_st_cell_by_sudt_type_hash(client, config, sudt_type_hash).await?;

let sudt_dep = get_type_dep_from_cell(&client, &st_cell)
let sudt_dep = get_type_dep_from_cell(client, &st_cell)
.await
.context("get sudt dep")?;

let axon_metadata_cell = get_latest_cell_by_type_script(
&client,
client,
config.sdk_config.axon_metadata_type_script().into(),
)
.await?;
let channel_contract_cell = get_latest_cell_by_type_script(
&client,
client,
config.sdk_config.channel_contract_type_script().into(),
)
.await?;
let packet_contract_cell = get_latest_cell_by_type_script(
&client,
client,
config.sdk_config.packet_contract_type_script().into(),
)
.await?;
let channel = IbcChannelCell::get_latest(&client, &config.sdk_config).await?;
let channel = IbcChannelCell::get_latest(client, &config.sdk_config).await?;

let (tx, envelope) = assemble_write_ack_partial_transaction(
simple_dep(axon_metadata_cell.out_point.into()),
Expand All @@ -593,7 +677,7 @@ async fn receive(
vec![1],
)?;

let user_input = get_capacity_input(&client, &receiver_lock_script).await?;
let user_input = get_capacity_input(client, receiver_lock_script).await?;

// sighash placeholder witness.
let placeholder_witness = packed::WitnessArgs::new_builder()
Expand Down Expand Up @@ -629,11 +713,9 @@ async fn receive(

let tx = add_ibc_envelope(tx, &envelope).build();

let tx = complete_tx(&config.ckb_rpc_url, &tx, receiver_lock_script, sk)?;
let tx = complete_tx(&config.ckb_rpc_url, &tx, receiver_lock_script.clone(), sk)?;

send_transaction(&config.ckb_rpc_url, tx)?;

Ok(())
send_transaction(&config.ckb_rpc_url, tx)
}

async fn get_capacity_input(
Expand Down
Loading