Skip to content

Commit

Permalink
feat: wait new packets comming if no avaliable packets found at present
Browse files Browse the repository at this point in the history
  • Loading branch information
liyukun committed Nov 17, 2023
1 parent 03df547 commit ca40d82
Showing 1 changed file with 190 additions and 121 deletions.
311 changes: 190 additions & 121 deletions examples/sudt-transfer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! An example cli for sending/receiving SUDT with the sudt transfer module.

use std::{collections::HashMap, fmt, fs, path::PathBuf, time::Duration};
use std::{collections::HashMap, fmt, fs, path::PathBuf, pin::pin, time::Duration};

use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
Expand Down Expand Up @@ -31,6 +31,7 @@ use forcerelay_ckb_sdk::{
assemble_send_packet_partial_transaction, assemble_write_ack_partial_transaction,
},
};
use futures::TryStreamExt;
use prost::Message;
use secp256k1::Secp256k1;
use serde::Deserialize;
Expand Down Expand Up @@ -194,55 +195,89 @@ async fn consume_ack(
println!("skipping packet\n{pd:?}\n{:?}", p.packet.packet);
None
}
});
})
.collect::<Vec<_>>();

let mut consumed = 0;
for (p, pd) in packets {
println!("consuming packet ack\n{pd}\n{:?}", p.packet.packet);
if packets.is_empty() {
println!("no avaliable packets found, start waiting...");
let mut ack_packets = pin!(PacketCell::subscribe(
client.clone(),
config.sdk_config.clone()
)
.try_filter(|cell| futures::future::ready(cell.is_ack_packet())));
// Filter for a packet that is sent to us.
let (p, pd) = loop {
let p = ack_packets.try_next().await?.context("no packet cell")?;
let pd = FungibleTokenPacketData::decode(&p.packet.packet.data[..])?;
if pd.sender == sender {
if p.packet.ack.as_deref() != Some(&[1]) {
println!("skipping packet {pd}");
continue;
}
break (p, pd);
} else {
println!("skipping packet {pd}");
}
};
consume_ack_single(p, pd, sk, &sender_lock_script, &client, config).await?;
} else {
let mut consumed = 0;
for (p, pd) in packets {
consume_ack_single(p, pd, sk, &sender_lock_script, &client, config).await?;
consumed += 1;
}
println!("consumed {consumed} packets");
}

let sudt_type_hash = hex::decode(pd.denom).context("decode base denom")?;
let (sudt_transfer_dep, st_cell, st_cell_amount, _) =
get_st_cell_by_sudt_type_hash(&client, config, sudt_type_hash).await?;
let sudt_dep = get_type_dep_from_cell(&client, &st_cell)
.await
.context("get sudt dep")?;
Ok(())
}

let user_input = get_capacity_input(&client, &sender_lock_script).await?;
async fn consume_ack_single(
p: PacketCell,
pd: FungibleTokenPacketData,
sk: secp256k1::SecretKey,
sender_lock_script: &packed::Script,
client: &CkbRpcClient,
config: &Config,
) -> Result<[u8; 32]> {
println!("consuming packet ack\n{pd}\n{:?}", p.packet.packet);

let packet_contract_cell = get_latest_cell_by_type_script(
&client,
config.sdk_config.packet_contract_type_script().into(),
)
.await?;
let sudt_type_hash = hex::decode(pd.denom).context("decode base denom")?;
let (sudt_transfer_dep, st_cell, st_cell_amount, _) =
get_st_cell_by_sudt_type_hash(client, config, sudt_type_hash).await?;
let sudt_dep = get_type_dep_from_cell(client, &st_cell)
.await
.context("get sudt dep")?;

let (tx, envelope) = assemble_consume_ack_packet_partial_transaction(
simple_dep(packet_contract_cell.out_point.into()),
p,
)?;
// sighash placeholder witness.
let placeholder_witness = packed::WitnessArgs::new_builder()
.lock(Some(Bytes::from_static(&[0u8; 65])).pack())
.build();
let tx = tx
.input(simple_input(st_cell.out_point.into()))
.output(packed::CellOutput::from(st_cell.output))
.output_data(sudt_amount_data(st_cell_amount).pack())
.witness([].pack())
.cell_dep(sudt_transfer_dep)
.cell_dep(sudt_dep)
// capacity input and witness.
.input(simple_input(user_input.out_point.into()))
.witness(placeholder_witness.as_bytes().pack());
let tx = add_ibc_envelope(tx, &envelope).build();
let tx = complete_tx(&config.ckb_rpc_url, &tx, sender_lock_script.clone(), sk)?;
send_transaction(&config.ckb_rpc_url, tx)?;

consumed += 1;
}
let user_input = get_capacity_input(client, sender_lock_script).await?;

println!("consumed {consumed} packets");
let packet_contract_cell = get_latest_cell_by_type_script(
client,
config.sdk_config.packet_contract_type_script().into(),
)
.await?;

Ok(())
let (tx, envelope) = assemble_consume_ack_packet_partial_transaction(
simple_dep(packet_contract_cell.out_point.into()),
p,
)?;
// sighash placeholder witness.
let placeholder_witness = packed::WitnessArgs::new_builder()
.lock(Some(Bytes::from_static(&[0u8; 65])).pack())
.build();
let tx = tx
.input(simple_input(st_cell.out_point.into()))
.output(packed::CellOutput::from(st_cell.output))
.output_data(sudt_amount_data(st_cell_amount).pack())
.witness([].pack())
.cell_dep(sudt_transfer_dep)
.cell_dep(sudt_dep)
// capacity input and witness.
.input(simple_input(user_input.out_point.into()))
.witness(placeholder_witness.as_bytes().pack());
let tx = add_ibc_envelope(tx, &envelope).build();
let tx = complete_tx(&config.ckb_rpc_url, &tx, sender_lock_script.clone(), sk)?;
send_transaction(&config.ckb_rpc_url, tx)
}

fn get_sudt_type_script(config: &Config, sudt: &str) -> Result<packed::Script> {
Expand Down Expand Up @@ -559,94 +594,128 @@ async fn receive(
println!("skipping packet\n{pd:?}\n{:?}", p.packet.packet);
None
}
});
let mut received = 0;
for (p, pd) in packets {
println!("receiving packet\n{pd:?}\n{:?}", p.packet.packet);
})
.collect::<Vec<_>>();

let base_denom = pd.denom.split('/').last().context("get base denom")?;
let sudt_type_hash = hex::decode(base_denom).context("decode base denom")?;
if packets.is_empty() {
println!("no avaliable packets found, start waiting...");
let mut recv_packets = pin!(PacketCell::subscribe(
client.clone(),
config.sdk_config.clone()
)
.try_filter(|cell| futures::future::ready(cell.is_recv_packet())));
// Filter for a packet that is sent to us.
let (p, pd) = loop {
let p = recv_packets
.try_next()
.await?
.context("no packet cell for us")?;
let pd = FungibleTokenPacketData::decode(&p.packet.packet.data[..])?;
if pd.receiver == receiver {
break (p, pd);
} else {
println!("skipping packet {pd}");
}
};
receive_single(p, pd, sk, &receiver_lock_script, &client, config).await?;
} else {
let mut received = 0;
for (p, pd) in packets {
receive_single(p, pd, sk, &receiver_lock_script, &client, config).await?;
received += 1;
}
println!("received {received} packets");
}

let (sudt_transfer_dep, st_cell, st_cell_amount, sudt_type_script) =
get_st_cell_by_sudt_type_hash(&client, config, sudt_type_hash).await?;
Ok(())
}

let sudt_dep = get_type_dep_from_cell(&client, &st_cell)
.await
.context("get sudt dep")?;
async fn receive_single(
p: PacketCell,
pd: FungibleTokenPacketData,
sk: secp256k1::SecretKey,
receiver_lock_script: &packed::Script,
client: &CkbRpcClient,
config: &Config,
) -> Result<[u8; 32]> {
println!("receiving packet\n{pd:?}\n{:?}", p.packet.packet);

let axon_metadata_cell = get_latest_cell_by_type_script(
&client,
config.sdk_config.axon_metadata_type_script().into(),
)
.await?;
let channel_contract_cell = get_latest_cell_by_type_script(
&client,
config.sdk_config.channel_contract_type_script().into(),
)
.await?;
let packet_contract_cell = get_latest_cell_by_type_script(
&client,
config.sdk_config.packet_contract_type_script().into(),
)
.await?;
let channel = IbcChannelCell::get_latest(&client, &config.sdk_config).await?;

let (tx, envelope) = assemble_write_ack_partial_transaction(
simple_dep(axon_metadata_cell.out_point.into()),
simple_dep(channel_contract_cell.out_point.into()),
simple_dep(packet_contract_cell.out_point.into()),
&config.sdk_config,
channel,
p,
vec![1],
)?;

let user_input = get_capacity_input(&client, &receiver_lock_script).await?;

// sighash placeholder witness.
let placeholder_witness = packed::WitnessArgs::new_builder()
.lock(Some(Bytes::from_static(&[0u8; 65])).pack())
.build();
let tx = tx
// st-cell input/output.
.input(simple_input(st_cell.out_point.into()))
.output(packed::CellOutput::from(st_cell.output))
.output_data(
sudt_amount_data(
st_cell_amount
.checked_sub(pd.amount.into())
.context("st-cell amount not enough")?,
)
.pack(),
)
.witness([].pack())
.cell_dep(sudt_transfer_dep.clone())
.cell_dep(sudt_dep)
// sudt output.
.output(
packed::CellOutput::new_builder()
.lock(receiver_lock_script.clone())
.type_(Some(sudt_type_script).pack())
.build_exact_capacity(Capacity::bytes(16).unwrap())
.unwrap(),
)
.output_data(sudt_amount_data(pd.amount.into()).pack())
// capacity input and witness.
.input(simple_input(user_input.out_point.into()))
.witness(placeholder_witness.as_bytes().pack());
let base_denom = pd.denom.split('/').last().context("get base denom")?;
let sudt_type_hash = hex::decode(base_denom).context("decode base denom")?;

let tx = add_ibc_envelope(tx, &envelope).build();
let (sudt_transfer_dep, st_cell, st_cell_amount, sudt_type_script) =
get_st_cell_by_sudt_type_hash(client, config, sudt_type_hash).await?;

let tx = complete_tx(&config.ckb_rpc_url, &tx, receiver_lock_script.clone(), sk)?;
let sudt_dep = get_type_dep_from_cell(client, &st_cell)
.await
.context("get sudt dep")?;

send_transaction(&config.ckb_rpc_url, tx)?;
let axon_metadata_cell = get_latest_cell_by_type_script(
client,
config.sdk_config.axon_metadata_type_script().into(),
)
.await?;
let channel_contract_cell = get_latest_cell_by_type_script(
client,
config.sdk_config.channel_contract_type_script().into(),
)
.await?;
let packet_contract_cell = get_latest_cell_by_type_script(
client,
config.sdk_config.packet_contract_type_script().into(),
)
.await?;
let channel = IbcChannelCell::get_latest(client, &config.sdk_config).await?;

received += 1;
}
let (tx, envelope) = assemble_write_ack_partial_transaction(
simple_dep(axon_metadata_cell.out_point.into()),
simple_dep(channel_contract_cell.out_point.into()),
simple_dep(packet_contract_cell.out_point.into()),
&config.sdk_config,
channel,
p,
vec![1],
)?;

println!("received {received} packets");
let user_input = get_capacity_input(client, receiver_lock_script).await?;

Ok(())
// sighash placeholder witness.
let placeholder_witness = packed::WitnessArgs::new_builder()
.lock(Some(Bytes::from_static(&[0u8; 65])).pack())
.build();
let tx = tx
// st-cell input/output.
.input(simple_input(st_cell.out_point.into()))
.output(packed::CellOutput::from(st_cell.output))
.output_data(
sudt_amount_data(
st_cell_amount
.checked_sub(pd.amount.into())
.context("st-cell amount not enough")?,
)
.pack(),
)
.witness([].pack())
.cell_dep(sudt_transfer_dep.clone())
.cell_dep(sudt_dep)
// sudt output.
.output(
packed::CellOutput::new_builder()
.lock(receiver_lock_script.clone())
.type_(Some(sudt_type_script).pack())
.build_exact_capacity(Capacity::bytes(16).unwrap())
.unwrap(),
)
.output_data(sudt_amount_data(pd.amount.into()).pack())
// capacity input and witness.
.input(simple_input(user_input.out_point.into()))
.witness(placeholder_witness.as_bytes().pack());

let tx = add_ibc_envelope(tx, &envelope).build();

let tx = complete_tx(&config.ckb_rpc_url, &tx, receiver_lock_script.clone(), sk)?;

send_transaction(&config.ckb_rpc_url, tx)
}

async fn get_capacity_input(
Expand Down

0 comments on commit ca40d82

Please sign in to comment.