diff --git a/Cargo.lock b/Cargo.lock index a6c001fe0..8f72928b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -130,6 +130,18 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "anchor-attribute-access-control" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47fe28365b33e8334dd70ae2f34a43892363012fe239cf37d2ee91693575b1f8" +dependencies = [ + "anchor-syn 0.30.1", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "anchor-attribute-account" version = "0.29.0" @@ -143,6 +155,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "anchor-attribute-account" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c288d496168268d198d9b53ee9f4f9d260a55ba4df9877ea1d4486ad6109e0f" +dependencies = [ + "anchor-syn 0.30.1", + "bs58 0.5.0", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "anchor-attribute-constant" version = "0.29.0" @@ -154,6 +179,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "anchor-attribute-constant" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49b77b6948d0eeaaa129ce79eea5bbbb9937375a9241d909ca8fb9e006bb6e90" +dependencies = [ + "anchor-syn 0.30.1", + "quote", + "syn 1.0.109", +] + [[package]] name = "anchor-attribute-error" version = "0.29.0" @@ -165,6 +201,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "anchor-attribute-error" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d20bb569c5a557c86101b944721d865e1fd0a4c67c381d31a44a84f07f84828" +dependencies = [ + "anchor-syn 0.30.1", + "quote", + "syn 1.0.109", +] + [[package]] name = "anchor-attribute-event" version = "0.29.0" @@ -177,6 +224,18 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "anchor-attribute-event" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cebd8d0671a3a9dc3160c48598d652c34c77de6be4d44345b8b514323284d57" +dependencies = [ + "anchor-syn 0.30.1", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "anchor-attribute-program" version = "0.29.0" @@ -188,13 +247,30 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "anchor-attribute-program" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efb2a5eb0860e661ab31aff7bb5e0288357b176380e985bade4ccb395981b42d" +dependencies = [ + "anchor-lang-idl", + "anchor-syn 0.30.1", + "anyhow", + "bs58 0.5.0", + "heck 0.3.3", + "proc-macro2", + "quote", + "serde_json", + "syn 1.0.109", +] + [[package]] name = "anchor-client" version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb48c4a7911038da546dc752655a29fa49f6bd50ebc1edca218bac8da1012acd" dependencies = [ - "anchor-lang", + "anchor-lang 0.29.0", "anyhow", "futures", "regex", @@ -218,6 +294,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "anchor-derive-accounts" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04368b5abef4266250ca8d1d12f4dff860242681e4ec22b885dcfe354fd35aa1" +dependencies = [ + "anchor-syn 0.30.1", + "quote", + "syn 1.0.109", +] + [[package]] name = "anchor-derive-serde" version = "0.29.0" @@ -231,6 +318,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "anchor-derive-serde" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"e0bb0e0911ad4a70cab880cdd6287fe1e880a1a9d8e4e6defa8e9044b9796a6c" +dependencies = [ + "anchor-syn 0.30.1", + "borsh-derive-internal 0.10.3", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "anchor-derive-space" version = "0.29.0" @@ -242,6 +342,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "anchor-derive-space" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ef415ff156dc82e9ecb943189b0cb241b3a6bfc26a180234dc21bd3ef3ce0cb" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "anchor-gen" version = "0.3.1" @@ -290,15 +401,15 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35da4785497388af0553586d55ebdc08054a8b1724720ef2749d313494f2b8ad" dependencies = [ - "anchor-attribute-access-control", - "anchor-attribute-account", - "anchor-attribute-constant", - "anchor-attribute-error", - "anchor-attribute-event", - "anchor-attribute-program", - "anchor-derive-accounts", - "anchor-derive-serde", - "anchor-derive-space", + "anchor-attribute-access-control 0.29.0", + "anchor-attribute-account 0.29.0", + "anchor-attribute-constant 0.29.0", + "anchor-attribute-error 0.29.0", + "anchor-attribute-event 0.29.0", + "anchor-attribute-program 0.29.0", + "anchor-derive-accounts 0.29.0", + "anchor-derive-serde 0.29.0", + "anchor-derive-space 0.29.0", "arrayref", "base64 0.13.1", "bincode", @@ -309,15 +420,64 @@ dependencies = [ "thiserror", ] +[[package]] +name = "anchor-lang" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6620c9486d9d36a4389cab5e37dc34a42ed0bfaa62e6a75a2999ce98f8f2e373" +dependencies = [ + "anchor-attribute-access-control 0.30.1", + "anchor-attribute-account 0.30.1", + "anchor-attribute-constant 0.30.1", + "anchor-attribute-error 0.30.1", + "anchor-attribute-event 0.30.1", + "anchor-attribute-program 0.30.1", + "anchor-derive-accounts 0.30.1", + "anchor-derive-serde 0.30.1", + "anchor-derive-space 0.30.1", + "arrayref", + "base64 0.21.7", + "bincode", + "borsh 0.10.3", + "bytemuck", + "getrandom 0.2.10", + "solana-program", + "thiserror", +] + +[[package]] +name = "anchor-lang-idl" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31cf97b4e6f7d6144a05e435660fcf757dbc3446d38d0e2b851d11ed13625bba" +dependencies = [ + "anchor-lang-idl-spec", + "anyhow", + "heck 0.3.3", + "serde", + "serde_json", + "sha2 0.10.8", +] + +[[package]] +name = "anchor-lang-idl-spec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bdf143115440fe621bdac3a29a1f7472e09f6cd82b2aa569429a0c13f103838" +dependencies = [ + "anyhow", + "serde", +] + [[package]] name = "anchor-spl" version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c4fd6e43b2ca6220d2ef1641539e678bfc31b6cc393cf892b373b5997b6a39a" dependencies = [ - "anchor-lang", + "anchor-lang 0.29.0", "solana-program", - "spl-associated-token-account", + "spl-associated-token-account 2.3.0", "spl-token 4.0.0", "spl-token-2022 0.9.0", ] @@ -359,6 +519,24 @@ dependencies = [ "thiserror", ] +[[package]] +name = "anchor-syn" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f99daacb53b55cfd37ce14d6c9905929721137fd4c67bbab44a19802aecb622f" +dependencies = [ + "anyhow", + "bs58 0.5.0", + "heck 0.3.3", + "proc-macro2", + "quote", + "serde", + "serde_json", + "sha2 0.10.8", + 
"syn 1.0.109", + "thiserror", +] + [[package]] name = "android-tzdata" version = "0.1.1" @@ -1046,7 +1224,7 @@ dependencies = [ "aws-smithy-http-tower 0.51.0", "aws-smithy-types 0.51.0", "bytes", - "fastrand 1.8.0", + "fastrand", "http 0.2.11", "http-body 0.4.5", "hyper 0.14.28", @@ -1069,7 +1247,7 @@ dependencies = [ "aws-smithy-http-tower 0.54.4", "aws-smithy-types 0.54.4", "bytes", - "fastrand 1.8.0", + "fastrand", "http 0.2.11", "http-body 0.4.5", "pin-project-lite", @@ -1443,7 +1621,7 @@ dependencies = [ "rand_chacha 0.3.0", "rust_decimal", "serde", - "sha2 0.10.8", + "sha2 0.9.9", "thiserror", ] @@ -1950,7 +2128,7 @@ version = "0.1.0" source = "git+https://github.com/helium/helium-anchor-gen.git#3036b33793cfe54b20ab24761677493510d5bd50" dependencies = [ "anchor-gen", - "anchor-lang", + "anchor-lang 0.29.0", ] [[package]] @@ -2591,7 +2769,7 @@ version = "0.2.2" source = "git+https://github.com/helium/helium-anchor-gen.git#3036b33793cfe54b20ab24761677493510d5bd50" dependencies = [ "anchor-gen", - "anchor-lang", + "anchor-lang 0.29.0", ] [[package]] @@ -2963,22 +3141,13 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" -[[package]] -name = "exponential-backoff" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ffb309d235a642598183aeda8925e871e85dd5a433c2c877e69ff0a960f4c02" -dependencies = [ - "fastrand 2.3.0", -] - [[package]] name = "fanout" version = "0.1.0" source = "git+https://github.com/helium/helium-anchor-gen.git#3036b33793cfe54b20ab24761677493510d5bd50" dependencies = [ "anchor-gen", - "anchor-lang", + "anchor-lang 0.29.0", ] [[package]] @@ -2999,12 +3168,6 @@ dependencies = [ "instant", ] -[[package]] -name = "fastrand" -version = "2.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" - [[package]] name = "feature-probe" version = "0.1.1" @@ -3222,7 +3385,7 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48" dependencies = [ - "fastrand 1.8.0", + "fastrand", "futures-core", "futures-io", "memchr", @@ -3560,7 +3723,7 @@ version = "0.1.0" source = "git+https://github.com/helium/helium-anchor-gen.git#3036b33793cfe54b20ab24761677493510d5bd50" dependencies = [ "anchor-gen", - "anchor-lang", + "anchor-lang 0.29.0", "circuit-breaker", "data-credits", "fanout", @@ -3586,7 +3749,7 @@ dependencies = [ "bs58 0.5.0", "byteorder", "ed25519-compact", - "getrandom 0.2.10", + "getrandom 0.1.16", "k256", "lazy_static", "multihash", @@ -3607,13 +3770,13 @@ version = "0.2.11" source = "git+https://github.com/helium/helium-anchor-gen.git#3036b33793cfe54b20ab24761677493510d5bd50" dependencies = [ "anchor-gen", - "anchor-lang", + "anchor-lang 0.29.0", ] [[package]] name = "helium-lib" -version = "0.1.1" -source = "git+https://github.com/helium/helium-wallet-rs.git?branch=master#0a865a2cb5f4a537b055095f093169225eff8ae2" +version = "0.0.0" +source = "git+https://github.com/helium/helium-wallet-rs.git?branch=master#21208fcf8dcaaee955fa38798d9fa0da6b8d3f7b" dependencies = [ "anchor-client", "anchor-spl", @@ -3642,7 +3805,7 @@ dependencies = [ "solana-sdk", "solana-transaction-status", "spl-account-compression", - "spl-associated-token-account", + "spl-associated-token-account 3.0.2", "spl-memo", "thiserror", "tonic", @@ -3672,7 
+3835,7 @@ version = "0.1.8" source = "git+https://github.com/helium/helium-anchor-gen.git#3036b33793cfe54b20ab24761677493510d5bd50" dependencies = [ "anchor-gen", - "anchor-lang", + "anchor-lang 0.29.0", ] [[package]] @@ -3726,7 +3889,7 @@ version = "0.1.0" source = "git+https://github.com/helium/helium-anchor-gen.git#3036b33793cfe54b20ab24761677493510d5bd50" dependencies = [ "anchor-gen", - "anchor-lang", + "anchor-lang 0.29.0", ] [[package]] @@ -4542,7 +4705,7 @@ version = "0.2.0" source = "git+https://github.com/helium/helium-anchor-gen.git#3036b33793cfe54b20ab24761677493510d5bd50" dependencies = [ "anchor-gen", - "anchor-lang", + "anchor-lang 0.29.0", ] [[package]] @@ -4551,7 +4714,7 @@ version = "0.2.0" source = "git+https://github.com/helium/helium-anchor-gen.git#3036b33793cfe54b20ab24761677493510d5bd50" dependencies = [ "anchor-gen", - "anchor-lang", + "anchor-lang 0.29.0", ] [[package]] @@ -4931,7 +5094,7 @@ version = "0.1.3" source = "git+https://github.com/helium/helium-anchor-gen.git#3036b33793cfe54b20ab24761677493510d5bd50" dependencies = [ "anchor-gen", - "anchor-lang", + "anchor-lang 0.29.0", ] [[package]] @@ -5361,7 +5524,7 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "681030a937600a36906c185595136d26abfebb4aa9c65701cefcaf8578bb982b" dependencies = [ - "proc-macro-crate 3.1.0", + "proc-macro-crate 1.1.3", "proc-macro2", "quote", "syn 2.0.58", @@ -5787,7 +5950,7 @@ version = "0.2.1" source = "git+https://github.com/helium/helium-anchor-gen.git#3036b33793cfe54b20ab24761677493510d5bd50" dependencies = [ "anchor-gen", - "anchor-lang", + "anchor-lang 0.29.0", ] [[package]] @@ -5960,9 +6123,9 @@ dependencies = [ [[package]] name = "pyth-solana-receiver-sdk" version = "0.3.1" -source = "git+https://github.com/madninja/pyth-crosschain.git?branch=madninja%2Fcap_solana_dep#0b40f6d9599c502b2e1dab4262efba66459b8690" +source = "git+https://github.com/madninja/pyth-crosschain.git?branch=madninja%2Fcap_solana_dep#6576247294bde3ab7b62f7a2dfb4d4d48c401b35" dependencies = [ - "anchor-lang", + "anchor-lang 0.29.0", "hex", "pythnet-sdk", "solana-program", @@ -5971,9 +6134,9 @@ dependencies = [ [[package]] name = "pythnet-sdk" version = "2.3.0" -source = "git+https://github.com/madninja/pyth-crosschain.git?branch=madninja%2Fcap_solana_dep#0b40f6d9599c502b2e1dab4262efba66459b8690" +source = "git+https://github.com/madninja/pyth-crosschain.git?branch=madninja%2Fcap_solana_dep#6576247294bde3ab7b62f7a2dfb4d4d48c401b35" dependencies = [ - "anchor-lang", + "anchor-lang 0.30.1", "bincode", "borsh 0.10.3", "bytemuck", @@ -6436,7 +6599,7 @@ version = "0.2.0" source = "git+https://github.com/helium/helium-anchor-gen.git#3036b33793cfe54b20ab24761677493510d5bd50" dependencies = [ "anchor-gen", - "anchor-lang", + "anchor-lang 0.29.0", ] [[package]] @@ -7156,12 +7319,10 @@ dependencies = [ "async-trait", "chrono", "clap 4.4.8", - "exponential-backoff", "file-store", "futures", "helium-anchor-gen", "helium-crypto", - "helium-lib", "itertools", "metrics", "serde", @@ -7194,8 +7355,8 @@ dependencies = [ "solana-sdk", "spl-token 4.0.0", "spl-token-2022 1.0.0", - "spl-token-group-interface", - "spl-token-metadata-interface", + "spl-token-group-interface 0.1.0", + "spl-token-metadata-interface 0.2.0", "thiserror", "zstd", ] @@ -7799,7 +7960,7 @@ dependencies = [ "serde_json", "solana-account-decoder", "solana-sdk", - "spl-associated-token-account", + "spl-associated-token-account 2.3.0", "spl-memo", "spl-token 4.0.0", "spl-token-2022 1.0.0", @@ -7938,7 +8099,7 @@ 
version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85c43bd4455d9fb29b9e4f83c087ccffa2f6f41fecfc0549932ae391d00f3378" dependencies = [ - "anchor-lang", + "anchor-lang 0.29.0", "bytemuck", "spl-concurrent-merkle-tree", "spl-noop", @@ -7960,6 +8121,22 @@ dependencies = [ "thiserror", ] +[[package]] +name = "spl-associated-token-account" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2e688554bac5838217ffd1fab7845c573ff106b6336bf7d290db7c98d5a8efd" +dependencies = [ + "assert_matches", + "borsh 1.5.1", + "num-derive 0.4.2", + "num-traits", + "solana-program", + "spl-token 4.0.0", + "spl-token-2022 3.0.2", + "thiserror", +] + [[package]] name = "spl-concurrent-merkle-tree" version = "0.2.0" @@ -7979,7 +8156,18 @@ checksum = "daa600f2fe56f32e923261719bae640d873edadbc5237681a39b8e37bfd4d263" dependencies = [ "bytemuck", "solana-program", - "spl-discriminator-derive", + "spl-discriminator-derive 0.1.2", +] + +[[package]] +name = "spl-discriminator" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34d1814406e98b08c5cd02c1126f83fd407ad084adce0b05fda5730677822eac" +dependencies = [ + "bytemuck", + "solana-program", + "spl-discriminator-derive 0.2.0", ] [[package]] @@ -7989,7 +8177,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07fd7858fc4ff8fb0e34090e41d7eb06a823e1057945c26d480bfc21d2338a93" dependencies = [ "quote", - "spl-discriminator-syn", + "spl-discriminator-syn 0.1.2", + "syn 2.0.58", +] + +[[package]] +name = "spl-discriminator-derive" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9e8418ea6269dcfb01c712f0444d2c75542c04448b480e87de59d2865edc750" +dependencies = [ + "quote", + "spl-discriminator-syn 0.2.0", "syn 2.0.58", ] @@ -8006,6 +8205,19 @@ dependencies = [ "thiserror", ] +[[package]] +name = "spl-discriminator-syn" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c1f05593b7ca9eac7caca309720f2eafb96355e037e6d373b909a80fe7b69b9" +dependencies = [ + "proc-macro2", + "quote", + "sha2 0.10.8", + "syn 2.0.58", + "thiserror", +] + [[package]] name = "spl-memo" version = "4.0.0" @@ -8034,7 +8246,20 @@ dependencies = [ "bytemuck", "solana-program", "solana-zk-token-sdk", - "spl-program-error", + "spl-program-error 0.3.1", +] + +[[package]] +name = "spl-pod" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "046ce669f48cf2eca1ec518916d8725596bfb655beb1c74374cf71dc6cb773c9" +dependencies = [ + "borsh 1.5.1", + "bytemuck", + "solana-program", + "solana-zk-token-sdk", + "spl-program-error 0.4.1", ] [[package]] @@ -8046,7 +8271,20 @@ dependencies = [ "num-derive 0.4.2", "num-traits", "solana-program", - "spl-program-error-derive", + "spl-program-error-derive 0.3.2", + "thiserror", +] + +[[package]] +name = "spl-program-error" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49065093ea91f57b9b2bd81493ff705e2ad4e64507a07dbc02b085778e02770e" +dependencies = [ + "num-derive 0.4.2", + "num-traits", + "solana-program", + "spl-program-error-derive 0.4.1", "thiserror", ] @@ -8062,6 +8300,18 @@ dependencies = [ "syn 2.0.58", ] +[[package]] +name = "spl-program-error-derive" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6d375dd76c517836353e093c2dbb490938ff72821ab568b545fd30ab3256b3e" 
+dependencies = [ + "proc-macro2", + "quote", + "sha2 0.10.8", + "syn 2.0.58", +] + [[package]] name = "spl-tlv-account-resolution" version = "0.4.0" @@ -8070,10 +8320,10 @@ checksum = "062e148d3eab7b165582757453632ffeef490c02c86a48bfdb4988f63eefb3b9" dependencies = [ "bytemuck", "solana-program", - "spl-discriminator", - "spl-pod", - "spl-program-error", - "spl-type-length-value", + "spl-discriminator 0.1.1", + "spl-pod 0.1.1", + "spl-program-error 0.3.1", + "spl-type-length-value 0.3.1", ] [[package]] @@ -8084,10 +8334,24 @@ checksum = "56f335787add7fa711819f9e7c573f8145a5358a709446fe2d24bf2a88117c90" dependencies = [ "bytemuck", "solana-program", - "spl-discriminator", - "spl-pod", - "spl-program-error", - "spl-type-length-value", + "spl-discriminator 0.1.1", + "spl-pod 0.1.1", + "spl-program-error 0.3.1", + "spl-type-length-value 0.3.1", +] + +[[package]] +name = "spl-tlv-account-resolution" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cace91ba08984a41556efe49cbf2edca4db2f577b649da7827d3621161784bf8" +dependencies = [ + "bytemuck", + "solana-program", + "spl-discriminator 0.2.2", + "spl-pod 0.2.2", + "spl-program-error 0.4.1", + "spl-type-length-value 0.4.3", ] [[package]] @@ -8134,11 +8398,11 @@ dependencies = [ "solana-program", "solana-zk-token-sdk", "spl-memo", - "spl-pod", + "spl-pod 0.1.1", "spl-token 4.0.0", - "spl-token-metadata-interface", + "spl-token-metadata-interface 0.2.0", "spl-transfer-hook-interface 0.3.0", - "spl-type-length-value", + "spl-type-length-value 0.3.1", "thiserror", ] @@ -8157,12 +8421,36 @@ dependencies = [ "solana-security-txt", "solana-zk-token-sdk", "spl-memo", - "spl-pod", + "spl-pod 0.1.1", "spl-token 4.0.0", - "spl-token-group-interface", - "spl-token-metadata-interface", + "spl-token-group-interface 0.1.0", + "spl-token-metadata-interface 0.2.0", "spl-transfer-hook-interface 0.4.1", - "spl-type-length-value", + "spl-type-length-value 0.3.1", + "thiserror", +] + +[[package]] +name = "spl-token-2022" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5412f99ae7ee6e0afde00defaa354e6228e47e30c0e3adf553e2e01e6abb584" +dependencies = [ + "arrayref", + "bytemuck", + "num-derive 0.4.2", + "num-traits", + "num_enum 0.7.2", + "solana-program", + "solana-security-txt", + "solana-zk-token-sdk", + "spl-memo", + "spl-pod 0.2.2", + "spl-token 4.0.0", + "spl-token-group-interface 0.2.3", + "spl-token-metadata-interface 0.3.3", + "spl-transfer-hook-interface 0.6.3", + "spl-type-length-value 0.4.3", "thiserror", ] @@ -8174,9 +8462,22 @@ checksum = "b889509d49fa74a4a033ca5dae6c2307e9e918122d97e58562f5c4ffa795c75d" dependencies = [ "bytemuck", "solana-program", - "spl-discriminator", - "spl-pod", - "spl-program-error", + "spl-discriminator 0.1.1", + "spl-pod 0.1.1", + "spl-program-error 0.3.1", +] + +[[package]] +name = "spl-token-group-interface" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d419b5cfa3ee8e0f2386fd7e02a33b3ec8a7db4a9c7064a2ea24849dc4a273b6" +dependencies = [ + "bytemuck", + "solana-program", + "spl-discriminator 0.2.2", + "spl-pod 0.2.2", + "spl-program-error 0.4.1", ] [[package]] @@ -8187,10 +8488,24 @@ checksum = "4c16ce3ba6979645fb7627aa1e435576172dd63088dc7848cb09aa331fa1fe4f" dependencies = [ "borsh 0.10.3", "solana-program", - "spl-discriminator", - "spl-pod", - "spl-program-error", - "spl-type-length-value", + "spl-discriminator 0.1.1", + "spl-pod 0.1.1", + "spl-program-error 0.3.1", + 
"spl-type-length-value 0.3.1", +] + +[[package]] +name = "spl-token-metadata-interface" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30179c47e93625680dabb620c6e7931bd12d62af390f447bc7beb4a3a9b5feee" +dependencies = [ + "borsh 1.5.1", + "solana-program", + "spl-discriminator 0.2.2", + "spl-pod 0.2.2", + "spl-program-error 0.4.1", + "spl-type-length-value 0.4.3", ] [[package]] @@ -8202,11 +8517,11 @@ dependencies = [ "arrayref", "bytemuck", "solana-program", - "spl-discriminator", - "spl-pod", - "spl-program-error", + "spl-discriminator 0.1.1", + "spl-pod 0.1.1", + "spl-program-error 0.3.1", "spl-tlv-account-resolution 0.4.0", - "spl-type-length-value", + "spl-type-length-value 0.3.1", ] [[package]] @@ -8218,11 +8533,27 @@ dependencies = [ "arrayref", "bytemuck", "solana-program", - "spl-discriminator", - "spl-pod", - "spl-program-error", + "spl-discriminator 0.1.1", + "spl-pod 0.1.1", + "spl-program-error 0.3.1", "spl-tlv-account-resolution 0.5.2", - "spl-type-length-value", + "spl-type-length-value 0.3.1", +] + +[[package]] +name = "spl-transfer-hook-interface" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66a98359769cd988f7b35c02558daa56d496a7e3bd8626e61f90a7c757eedb9b" +dependencies = [ + "arrayref", + "bytemuck", + "solana-program", + "spl-discriminator 0.2.2", + "spl-pod 0.2.2", + "spl-program-error 0.4.1", + "spl-tlv-account-resolution 0.6.3", + "spl-type-length-value 0.4.3", ] [[package]] @@ -8233,9 +8564,22 @@ checksum = "8f9ebd75d29c5f48de5f6a9c114e08531030b75b8ac2c557600ac7da0b73b1e8" dependencies = [ "bytemuck", "solana-program", - "spl-discriminator", - "spl-pod", - "spl-program-error", + "spl-discriminator 0.1.1", + "spl-pod 0.1.1", + "spl-program-error 0.3.1", +] + +[[package]] +name = "spl-type-length-value" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "422ce13429dbd41d2cee8a73931c05fda0b0c8ca156a8b0c19445642550bb61a" +dependencies = [ + "bytemuck", + "solana-program", + "spl-discriminator 0.2.2", + "spl-pod 0.2.2", + "spl-program-error 0.4.1", ] [[package]] @@ -8501,7 +8845,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4" dependencies = [ "cfg-if", - "fastrand 1.8.0", + "fastrand", "libc", "redox_syscall 0.2.16", "remove_dir_all", @@ -8932,7 +9276,7 @@ version = "0.2.0" source = "git+https://github.com/helium/helium-anchor-gen.git#3036b33793cfe54b20ab24761677493510d5bd50" dependencies = [ "anchor-gen", - "anchor-lang", + "anchor-lang 0.29.0", ] [[package]] @@ -8975,7 +9319,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ "cfg-if", - "rand 0.8.5", + "rand 0.7.3", "static_assertions", ] @@ -9180,7 +9524,7 @@ version = "0.3.3" source = "git+https://github.com/helium/helium-anchor-gen.git#3036b33793cfe54b20ab24761677493510d5bd50" dependencies = [ "anchor-gen", - "anchor-lang", + "anchor-lang 0.29.0", ] [[package]] @@ -9645,7 +9989,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "sha2 0.10.8", + "sha2 0.9.9", "thiserror", "twox-hash", "xorf", diff --git a/Cargo.toml b/Cargo.toml index 70e7d76b0..2a963bbee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,7 +65,7 @@ sqlx = { version = "0", features = [ "runtime-tokio-rustls", ] } helium-anchor-gen = { git = 
"https://github.com/helium/helium-anchor-gen.git" } -helium-crypto = { version = "0.8.4", features = ["multisig", "solana"] } +helium-crypto = { version = "0.8.4", features = ["multisig"] } helium-lib = { git = "https://github.com/helium/helium-wallet-rs.git", branch = "master" } hextree = { git = "https://github.com/jaykickliter/HexTree", branch = "main", features = [ "disktree", @@ -128,13 +128,10 @@ sqlx = { git = "https://github.com/launchbadge/sqlx.git", rev = "42dd78fe931df65 # When attempting to test proto changes without needing to push a branch you can # patch the github url to point to your local proto repo. -# +# # Patching for beacon must point directly to the crate, it will not look in the # repo for sibling crates. -# +# # [patch.'https://github.com/helium/proto'] -# helium-proto = { path = "../proto" } -# beacon = { path = "../proto/beacon" } - -# [patch.'https://github.com/helium/helium-wallet-rs.git'] -# helium-lib = { path = "../helium-wallet-rs/helium-lib" } +# helium-proto = { path = "../../proto" } +# beacon = { path = "../../proto" } diff --git a/boost_manager/tests/integrations/updater_tests.rs b/boost_manager/tests/integrations/updater_tests.rs index ae8d57e0d..cc47c7b73 100644 --- a/boost_manager/tests/integrations/updater_tests.rs +++ b/boost_manager/tests/integrations/updater_tests.rs @@ -20,7 +20,7 @@ pub struct MockTransaction { pub struct MockSolanaConnection { submitted: Mutex>, - error: Option, + error: Option, } impl MockSolanaConnection { @@ -34,7 +34,7 @@ impl MockSolanaConnection { fn with_error(error: String) -> Self { Self { submitted: Mutex::new(vec![]), - error: Some(tonic::Status::internal(error)), + error: Some(error), } } } @@ -58,11 +58,7 @@ impl SolanaNetwork for MockSolanaConnection { self.error .as_ref() - .map(|err| { - Err(SolanaRpcError::HeliumLib(solana::error::Error::Grpc( - err.to_owned(), - ))) - }) + .map(|err| Err(SolanaRpcError::Test(err.to_owned()))) .unwrap_or(Ok(())) } diff --git a/iot_packet_verifier/src/balances.rs b/iot_packet_verifier/src/balances.rs index 51deee5b2..a5ab6cea3 100644 --- a/iot_packet_verifier/src/balances.rs +++ b/iot_packet_verifier/src/balances.rs @@ -12,7 +12,6 @@ use tokio::sync::Mutex; /// Caches balances fetched from the solana chain and debits made by the /// packet verifier. -#[derive(Clone)] pub struct BalanceCache { payer_accounts: BalanceStore, solana: S, @@ -56,10 +55,6 @@ impl BalanceCache { pub fn balances(&self) -> BalanceStore { self.payer_accounts.clone() } - - pub async fn get_payer_balance(&self, payer: &PublicKeyBinary) -> Option { - self.payer_accounts.lock().await.get(payer).cloned() - } } #[async_trait::async_trait] diff --git a/iot_packet_verifier/src/burner.rs b/iot_packet_verifier/src/burner.rs index beffd4509..3f19afd10 100644 --- a/iot_packet_verifier/src/burner.rs +++ b/iot_packet_verifier/src/burner.rs @@ -5,12 +5,10 @@ use crate::{ }, }; use futures::{future::LocalBoxFuture, TryFutureExt}; -use helium_crypto::PublicKeyBinary; -use solana::{burn::SolanaNetwork, sender, SolanaRpcError}; +use solana::{burn::SolanaNetwork, GetSignature, SolanaRpcError}; use std::time::Duration; use task_manager::ManagedTask; use tokio::time::{self, MissedTickBehavior}; -use tracing::Instrument; pub struct Burner { pending_tables: P, @@ -21,7 +19,7 @@ pub struct Burner { impl ManagedTask for Burner where - P: PendingTables, + P: PendingTables + Send + Sync + 'static, S: SolanaNetwork, { fn start_task( @@ -78,9 +76,9 @@ where loop { tokio::select! 
{ - biased; - _ = shutdown.clone() => break, - _ = burn_timer.tick() => { + biased; + _ = shutdown.clone() => break, + _ = burn_timer.tick() => { match self.burn().await { Ok(()) => continue, Err(err) => { @@ -96,147 +94,49 @@ where } pub async fn burn(&mut self) -> Result<(), BurnError> { - // There should only be a single pending txn at a time - let pending_txns = self.pending_tables.fetch_all_pending_txns().await?; - if !pending_txns.is_empty() { - tracing::info!(pending_txns = pending_txns.len(), "skipping burn"); - return Ok(()); - } - // Fetch the next payer and amount that should be burn. If no such burn // exists, perform no action. let Some(Burn { payer, amount }) = self.pending_tables.fetch_next_burn().await? else { - tracing::info!("no pending burns"); return Ok(()); }; tracing::info!(%amount, %payer, "Burning DC"); + // Create a burn transaction and execute it: let txn = self .solana .make_burn_transaction(&payer, amount) .await .map_err(BurnError::SolanaError)?; - - let store = BurnTxnStore::new( - self.pending_tables.clone(), - self.balances.clone(), - payer.clone(), - amount, - ); - - let burn_span = tracing::info_span!("burn_txn", %payer, amount); + self.pending_tables + .add_pending_transaction(&payer, amount, txn.get_signature()) + .await?; self.solana - .submit_transaction(&txn, &store) - .map_err(BurnError::SolanaError) - .instrument(burn_span) + .submit_transaction(&txn) .await - } -} - -pub struct BurnTxnStore { - pool: PT, - balances: BalanceStore, - payer: PublicKeyBinary, - amount: u64, -} - -impl BurnTxnStore { - pub fn new(pool: PT, balances: BalanceStore, payer: PublicKeyBinary, amount: u64) -> Self { - Self { - pool, - balances, - payer, - amount, - } - } -} - -#[async_trait::async_trait] -impl sender::TxnStore for BurnTxnStore { - async fn on_prepared(&self, txn: &solana::Transaction) -> sender::SenderResult<()> { - tracing::info!("txn prepared"); - - let signature = txn.get_signature(); - let add_pending = self - .pool - .add_pending_transaction(&self.payer, self.amount, signature); - - let Ok(()) = add_pending.await else { - tracing::error!("failed to add pending transcation"); - return Err(sender::SenderError::preparation( - "could not add pending transaction", - )); - }; - - Ok(()) - } - - async fn on_finalized(&self, txn: &solana::Transaction) { - tracing::info!("txn finalized"); - - let Ok(mut db_txn) = self.pool.begin().await else { - tracing::error!("failed to start finalized txn db transaction"); - return; - }; - - let signature = txn.get_signature(); - let Ok(()) = db_txn.remove_pending_transaction(signature).await else { - tracing::error!("failed to remove pending"); - return; - }; + .map_err(BurnError::SolanaError)?; - let Ok(()) = db_txn - .subtract_burned_amount(&self.payer, self.amount) - .await - else { - tracing::error!("failed to subtract burned amount"); - return; - }; + // Removing the pending transaction and subtract the burn amount + // now that we have confirmation that the burn transaction is confirmed + // on chain: + let mut pending_tables_txn = self.pending_tables.begin().await?; + pending_tables_txn + .remove_pending_transaction(txn.get_signature()) + .await?; + pending_tables_txn + .subtract_burned_amount(&payer, amount) + .await?; + pending_tables_txn.commit().await?; - // Subtract balances from map before submitted db txn let mut balance_lock = self.balances.lock().await; - let payer_account = balance_lock.get_mut(&self.payer).unwrap(); - // Reduce the pending burn amount and the payer's balance by the amount we've burned - 
payer_account.burned = payer_account.burned.saturating_sub(self.amount); - payer_account.balance = payer_account.balance.saturating_sub(self.amount); + let payer_account = balance_lock.get_mut(&payer).unwrap(); + // Reduce the pending burn amount and the payer's balance by the amount + // we've burned. + payer_account.burned = payer_account.burned.saturating_sub(amount); + payer_account.balance = payer_account.balance.saturating_sub(amount); - let Ok(()) = db_txn.commit().await else { - tracing::error!("failed to commit finalized txn db transaction"); - return; - }; + metrics::counter!("burned", "payer" => payer.to_string()).increment(amount); - metrics::counter!( - "burned", - "payer" => self.payer.to_string(), - "success" => "true" - ) - .increment(self.amount); - } - - async fn on_error(&self, txn: &solana::Transaction, err: sender::SenderError) { - tracing::warn!(?err, "txn failed"); - let Ok(mut db_txn) = self.pool.begin().await else { - tracing::error!("failed to start error transaction"); - return; - }; - - let signature = txn.get_signature(); - let Ok(()) = db_txn.remove_pending_transaction(signature).await else { - tracing::error!("failed to remove pending transaction on error"); - return; - }; - - let Ok(()) = db_txn.commit().await else { - tracing::error!("failed to commit on error transaction"); - return; - }; - - metrics::counter!( - "burned", - "payer" => self.payer.to_string(), - "success" => "false" - ) - .increment(self.amount); + Ok(()) } } diff --git a/iot_packet_verifier/src/daemon.rs b/iot_packet_verifier/src/daemon.rs index 0cd1ca285..b442e77e2 100644 --- a/iot_packet_verifier/src/daemon.rs +++ b/iot_packet_verifier/src/daemon.rs @@ -112,7 +112,7 @@ impl Cmd { bail!("Missing solana section in settings"); }; // Set up the solana RpcClient: - Some(SolanaRpc::new(solana_settings, solana::SubDao::Iot).await?) + Some(SolanaRpc::new(solana_settings).await?) 
} else { None }; diff --git a/iot_packet_verifier/src/pending.rs b/iot_packet_verifier/src/pending.rs index 97b2df272..561059651 100644 --- a/iot_packet_verifier/src/pending.rs +++ b/iot_packet_verifier/src/pending.rs @@ -4,12 +4,14 @@ use helium_crypto::PublicKeyBinary; use solana::{burn::SolanaNetwork, SolanaRpcError}; use solana_sdk::signature::Signature; use sqlx::{postgres::PgRow, FromRow, PgPool, Postgres, Row, Transaction}; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::Mutex; use crate::balances::BalanceStore; /// To avoid excessive burn transaction (which cost us money), we institute a minimum /// amount of Data Credits accounted for before we burn from a payer: -pub const BURN_THRESHOLD: i64 = 10_000; +const BURN_THRESHOLD: i64 = 10_000; #[async_trait] pub trait AddPendingBurn { @@ -21,7 +23,7 @@ pub trait AddPendingBurn { } #[async_trait] -pub trait PendingTables: Send + Sync + Clone + 'static { +pub trait PendingTables { type Transaction<'a>: PendingTablesTransaction<'a> + Send + Sync where Self: 'a; @@ -37,17 +39,6 @@ pub trait PendingTables: Send + Sync + Clone + 'static { payer: &PublicKeyBinary, amount: u64, signature: &Signature, - ) -> Result<(), sqlx::Error> { - self.do_add_pending_transaction(payer, amount, signature, Utc::now()) - .await - } - - async fn do_add_pending_transaction( - &self, - payer: &PublicKeyBinary, - amount: u64, - signature: &Signature, - time_of_submission: DateTime, ) -> Result<(), sqlx::Error>; async fn begin<'a>(&'a self) -> Result, sqlx::Error>; @@ -151,12 +142,11 @@ impl PendingTables for PgPool { .await } - async fn do_add_pending_transaction( + async fn add_pending_transaction( &self, payer: &PublicKeyBinary, amount: u64, signature: &Signature, - time_of_submission: DateTime, ) -> Result<(), sqlx::Error> { sqlx::query( r#" @@ -167,7 +157,7 @@ impl PendingTables for PgPool { .bind(signature.to_string()) .bind(payer) .bind(amount as i64) - .bind(time_of_submission) + .bind(Utc::now()) .execute(self) .await?; Ok(()) @@ -276,3 +266,229 @@ impl FromRow<'_, PgRow> for PendingTxn { }) } } + +#[async_trait] +impl AddPendingBurn for Arc>> { + async fn add_burned_amount( + &mut self, + payer: &PublicKeyBinary, + amount: u64, + ) -> Result<(), sqlx::Error> { + let mut map = self.lock().await; + *map.entry(payer.clone()).or_default() += amount; + Ok(()) + } +} + +#[derive(Clone)] +pub struct MockPendingTxn { + payer: PublicKeyBinary, + amount: u64, + time_of_submission: DateTime, +} + +#[derive(Default, Clone)] +pub struct MockPendingTables { + pub pending_txns: Arc>>, + pub pending_burns: Arc>>, +} + +#[async_trait] +impl PendingTables for MockPendingTables { + type Transaction<'a> = &'a MockPendingTables; + + async fn fetch_next_burn(&self) -> Result, sqlx::Error> { + Ok(self + .pending_burns + .lock() + .await + .iter() + .max_by_key(|(_, amount)| **amount) + .map(|(payer, &amount)| Burn { + payer: payer.clone(), + amount, + })) + } + + async fn fetch_all_pending_burns(&self) -> Result, sqlx::Error> { + Ok(self + .pending_burns + .lock() + .await + .clone() + .into_iter() + .map(|(payer, amount)| Burn { payer, amount }) + .collect()) + } + + async fn fetch_all_pending_txns(&self) -> Result, sqlx::Error> { + Ok(self + .pending_txns + .lock() + .await + .clone() + .into_iter() + .map(|(signature, mock)| PendingTxn { + signature, + payer: mock.payer, + amount: mock.amount, + time_of_submission: mock.time_of_submission, + }) + .collect()) + } + + async fn add_pending_transaction( + &self, + payer: &PublicKeyBinary, + amount: u64, + 
signature: &Signature, + ) -> Result<(), sqlx::Error> { + self.pending_txns.lock().await.insert( + *signature, + MockPendingTxn { + payer: payer.clone(), + amount, + time_of_submission: Utc::now(), + }, + ); + Ok(()) + } + + async fn begin<'a>(&'a self) -> Result, sqlx::Error> { + Ok(self) + } +} + +#[async_trait] +impl<'a> PendingTablesTransaction<'a> for &'a MockPendingTables { + async fn remove_pending_transaction( + &mut self, + signature: &Signature, + ) -> Result<(), sqlx::Error> { + self.pending_txns.lock().await.remove(signature); + Ok(()) + } + + async fn subtract_burned_amount( + &mut self, + payer: &PublicKeyBinary, + amount: u64, + ) -> Result<(), sqlx::Error> { + let mut map = self.pending_burns.lock().await; + let balance = map.get_mut(payer).unwrap(); + *balance -= amount; + Ok(()) + } + + async fn commit(self) -> Result<(), sqlx::Error> { + Ok(()) + } +} + +#[cfg(test)] +mod test { + + use crate::balances::PayerAccount; + + use super::*; + use std::collections::HashSet; + + #[derive(Clone)] + struct MockConfirmed(HashSet); + + #[async_trait] + impl SolanaNetwork for MockConfirmed { + type Transaction = Signature; + + #[allow(clippy::diverging_sub_expression)] + async fn payer_balance(&self, _payer: &PublicKeyBinary) -> Result { + unreachable!() + } + + #[allow(clippy::diverging_sub_expression)] + async fn make_burn_transaction( + &self, + _payer: &PublicKeyBinary, + _amount: u64, + ) -> Result { + unreachable!() + } + + #[allow(clippy::diverging_sub_expression)] + async fn submit_transaction( + &self, + _transaction: &Self::Transaction, + ) -> Result<(), SolanaRpcError> { + unreachable!() + } + + async fn confirm_transaction(&self, txn: &Signature) -> Result { + Ok(self.0.contains(txn)) + } + } + + #[tokio::test] + async fn test_confirm_pending_txns() { + let confirmed = Signature::new_unique(); + let unconfirmed = Signature::new_unique(); + let payer: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" + .parse() + .unwrap(); + let mut pending_txns = HashMap::new(); + const CONFIRMED_BURN_AMOUNT: u64 = 7; + const UNCONFIRMED_BURN_AMOUNT: u64 = 11; + pending_txns.insert( + confirmed, + MockPendingTxn { + payer: payer.clone(), + amount: CONFIRMED_BURN_AMOUNT, + time_of_submission: Utc::now() - Duration::minutes(1), + }, + ); + pending_txns.insert( + unconfirmed, + MockPendingTxn { + payer: payer.clone(), + amount: UNCONFIRMED_BURN_AMOUNT, + time_of_submission: Utc::now() - Duration::minutes(1), + }, + ); + let mut balances = HashMap::new(); + balances.insert( + payer.clone(), + PayerAccount { + balance: CONFIRMED_BURN_AMOUNT + UNCONFIRMED_BURN_AMOUNT, + burned: CONFIRMED_BURN_AMOUNT + UNCONFIRMED_BURN_AMOUNT, + }, + ); + let mut pending_burns = HashMap::new(); + pending_burns.insert( + payer.clone(), + CONFIRMED_BURN_AMOUNT + UNCONFIRMED_BURN_AMOUNT, + ); + let pending_txns = Arc::new(Mutex::new(pending_txns)); + let pending_burns = Arc::new(Mutex::new(pending_burns)); + let pending_tables = MockPendingTables { + pending_txns, + pending_burns, + }; + let mut confirmed_txns = HashSet::new(); + confirmed_txns.insert(confirmed); + let confirmed = MockConfirmed(confirmed_txns); + // Confirm and resolve transactions: + confirm_pending_txns(&pending_tables, &confirmed, &Arc::new(Mutex::new(balances))) + .await + .unwrap(); + // The amount left in the pending burns table should only be the unconfirmed + // burn amount: + assert_eq!( + *pending_tables + .pending_burns + .lock() + .await + .get(&payer) + .unwrap(), + UNCONFIRMED_BURN_AMOUNT, + ); + } +} diff 
--git a/iot_packet_verifier/src/verifier.rs b/iot_packet_verifier/src/verifier.rs index fb93900cf..6c5b24dc8 100644 --- a/iot_packet_verifier/src/verifier.rs +++ b/iot_packet_verifier/src/verifier.rs @@ -138,6 +138,21 @@ pub trait Debiter { ) -> Result, SolanaRpcError>; } +#[async_trait] +impl Debiter for Arc>> { + async fn debit_if_sufficient( + &self, + payer: &PublicKeyBinary, + amount: u64, + _trigger_balance_check_threshold: u64, + ) -> Result, SolanaRpcError> { + let map = self.lock().await; + let balance = map.get(payer).unwrap(); + // Don't debit the amount if we're mocking. That is a job for the burner. + Ok((*balance >= amount).then(|| balance.saturating_sub(amount))) + } +} + // TODO: Move these to a separate module pub struct Org { @@ -212,6 +227,14 @@ impl BalanceStore for crate::balances::BalanceStore { } } +#[async_trait] +// differs from the BalanceStore in the value stored in the contained HashMap; a u64 here instead of a Balance {} struct +impl BalanceStore for Arc>> { + async fn set_balance(&self, payer: &PublicKeyBinary, balance: u64) { + *self.lock().await.entry(payer.clone()).or_default() = balance; + } +} + #[derive(thiserror::Error, Debug)] pub enum MonitorError { #[error("Join error: {0}")] diff --git a/iot_packet_verifier/tests/integration_tests.rs b/iot_packet_verifier/tests/integration_tests.rs index bdb8a1c2b..b571bd106 100644 --- a/iot_packet_verifier/tests/integration_tests.rs +++ b/iot_packet_verifier/tests/integration_tests.rs @@ -13,24 +13,28 @@ use helium_proto::{ use iot_packet_verifier::{ balances::{BalanceCache, PayerAccount}, burner::Burner, - pending::{confirm_pending_txns, AddPendingBurn, Burn, PendingTables, BURN_THRESHOLD}, + pending::{confirm_pending_txns, AddPendingBurn, Burn, MockPendingTables, PendingTables}, verifier::{payload_size_to_dc, ConfigServer, ConfigServerError, Org, Verifier, BYTES_PER_DC}, }; use solana::{ - burn::{test_client::TestSolanaClientMap, SolanaNetwork}, - sender, Signature, + burn::{MockTransaction, SolanaNetwork}, + GetSignature, SolanaRpcError, }; +use solana_sdk::signature::Signature; use sqlx::PgPool; -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::Duration, +}; use tokio::sync::Mutex; -#[derive(Debug)] struct MockConfig { payer: PublicKeyBinary, enabled: bool, } -#[derive(Debug, Default, Clone)] +#[derive(Default, Clone)] struct MockConfigServer { payers: Arc>>, } @@ -157,37 +161,48 @@ fn invalid_packet(payload_size: u32, payload_hash: Vec) -> InvalidPacket { } } -#[sqlx::test] -async fn test_config_unlocking(pool: PgPool) -> anyhow::Result<()> { +#[derive(Clone)] +struct InstantlyBurnedBalance(Arc>>); + +#[async_trait] +impl AddPendingBurn for InstantlyBurnedBalance { + async fn add_burned_amount( + &mut self, + payer: &PublicKeyBinary, + amount: u64, + ) -> Result<(), sqlx::Error> { + let mut map = self.0.lock().await; + let balance = map.get_mut(payer).unwrap(); + *balance -= amount; + Ok(()) + } +} + +#[tokio::test] +async fn test_config_unlocking() { // Set up orgs: let orgs = MockConfigServer::default(); orgs.insert(0_u64, PublicKeyBinary::from(vec![0])).await; - // Set up balances: - let mut solana_network = TestSolanaClientMap::default(); - solana_network - .insert(PublicKeyBinary::from(vec![0]), 3) - .await; - + let mut solana_network = HashMap::new(); + solana_network.insert(PublicKeyBinary::from(vec![0]), 3); + let solana_network = Arc::new(Mutex::new(solana_network)); // Set up cache: - let mut txn = pool.begin().await?; - 
txn.add_burned_amount(&PublicKeyBinary::from(vec![0]), 3) - .await?; - txn.commit().await?; - let balances = BalanceCache::new(&pool, solana_network.clone()).await?; - + let mut cache = HashMap::new(); + cache.insert(PublicKeyBinary::from(vec![0]), 3); + let cache = Arc::new(Mutex::new(cache)); + let balances = InstantlyBurnedBalance(cache.clone()); // Set up verifier: let mut verifier = Verifier { - debiter: balances.clone(), + debiter: balances.0.clone(), config_server: orgs.clone(), }; let mut valid_packets = Vec::new(); let mut invalid_packets = Vec::new(); - let mut pending_burn_txn = pool.begin().await?; verifier .verify( 1, - &mut pending_burn_txn, + &mut balances.clone(), stream::iter(vec![ packet_report(0, 0, 24, vec![1], false), packet_report(0, 1, 48, vec![2], false), @@ -199,22 +214,21 @@ async fn test_config_unlocking(pool: PgPool) -> anyhow::Result<()> { .await .unwrap(); - pending_burn_txn.commit().await?; - - // Orgs start out locked assert!(!orgs.payers.lock().await.get(&0).unwrap().enabled); // Update the solana network: - solana_network - .set_payer_balance(&PublicKeyBinary::from(vec![0]), 50) - .await; + *solana_network + .lock() + .await + .get_mut(&PublicKeyBinary::from(vec![0])) + .unwrap() = 50; let (trigger, listener) = triggered::trigger(); // Calling monitor funds should re-enable the org and update the // verifier's cache let solana = solana_network.clone(); - let balance_cache = balances.balances().clone(); + let balance_cache = cache.clone(); let orgs_clone = orgs.clone(); tokio::spawn(async move { orgs_clone @@ -233,24 +247,20 @@ async fn test_config_unlocking(pool: PgPool) -> anyhow::Result<()> { // We should be re-enabled assert!(orgs.payers.lock().await.get(&0).unwrap().enabled); assert_eq!( - balances - .balances() + *cache .lock() .await .get(&PublicKeyBinary::from(vec![0])) - .unwrap() - .balance, - 50, - "balance is back to 50" + .unwrap(), + 50 ); trigger.trigger(); - let mut pending_burn_txn = pool.begin().await?; verifier .verify( 1, - &mut pending_burn_txn, + &mut balances.clone(), stream::iter(vec![ packet_report(0, 0, 24, vec![1], false), packet_report(0, 1, 48, vec![2], false), @@ -262,31 +272,20 @@ async fn test_config_unlocking(pool: PgPool) -> anyhow::Result<()> { .await .unwrap(); - pending_burn_txn.commit().await?; - // Still enabled: assert!(orgs.payers.lock().await.get(&0).unwrap().enabled); - let payer_account = balances - .balances() - .lock() - .await - .get(&PublicKeyBinary::from(vec![0])) - .cloned() - .unwrap(); assert_eq!( - payer_account.balance, 50, - "balance has not been deducted because no solana burn has been sent" - ); - assert_eq!( - payer_account.burned, 7, - "burned is previous sufficient amount plus new sufficient amount" + *cache + .lock() + .await + .get(&PublicKeyBinary::from(vec![0])) + .unwrap(), + 46 ); - - Ok(()) } -#[sqlx::test] -async fn test_verifier_free_packets(pool: PgPool) -> anyhow::Result<()> { +#[tokio::test] +async fn test_verifier_free_packets() { // Org packets let packets = vec![ packet_report(0, 0, 24, vec![4], true), @@ -301,9 +300,9 @@ async fn test_verifier_free_packets(pool: PgPool) -> anyhow::Result<()> { orgs.insert(0_u64, org_pubkey.clone()).await; // Set up balances: - let mut solana_network = TestSolanaClientMap::default(); - solana_network.insert(org_pubkey.clone(), 5).await; - let balances = BalanceCache::new(&pool, solana_network).await?; + let mut balances = HashMap::new(); + balances.insert(org_pubkey.clone(), 5); + let balances = InstantlyBurnedBalance(Arc::new(Mutex::new(balances))); 
// Set up output: let mut valid_packets = Vec::new(); @@ -311,22 +310,20 @@ async fn test_verifier_free_packets(pool: PgPool) -> anyhow::Result<()> { // Set up verifier: let mut verifier = Verifier { - debiter: balances.clone(), + debiter: balances.0.clone(), config_server: orgs, }; // Run the verifier: - let mut pending_burn_txn = pool.begin().await?; verifier .verify( 1, - &mut pending_burn_txn, + &mut balances.clone(), stream::iter(packets), &mut valid_packets, &mut invalid_packets, ) .await .unwrap(); - pending_burn_txn.commit().await?; // Verify packet reports: assert_eq!( @@ -343,18 +340,18 @@ async fn test_verifier_free_packets(pool: PgPool) -> anyhow::Result<()> { let payers = verifier.config_server.payers.lock().await; assert!(payers.get(&0).unwrap().enabled); - let payer_balance = balances - .get_payer_balance(&org_pubkey) - .await - .expect("known payer"); - assert_eq!(payer_balance.balance, 5, "balance should not have chnaged"); - assert_eq!(payer_balance.burned, 0, "nothing should be burned"); - - Ok(()) + assert_eq!( + verifier + .debiter + .payer_balance(&org_pubkey) + .await + .expect("unchanged balance"), + 5 + ); } -#[sqlx::test] -async fn test_verifier(pool: PgPool) -> anyhow::Result<()> { +#[tokio::test] +async fn test_verifier() { let packets = vec![ // Packets for first OUI packet_report(0, 0, 24, vec![1], false), @@ -376,43 +373,31 @@ async fn test_verifier(pool: PgPool) -> anyhow::Result<()> { orgs.insert(1_u64, PublicKeyBinary::from(vec![1])).await; orgs.insert(2_u64, PublicKeyBinary::from(vec![2])).await; // Set up balances: - let mut solana_network = TestSolanaClientMap::default(); - - // let mut balances = HashMap::new(); - solana_network - .insert(PublicKeyBinary::from(vec![0]), 3) - .await; - solana_network - .insert(PublicKeyBinary::from(vec![1]), 5) - .await; - solana_network - .insert(PublicKeyBinary::from(vec![2]), 2) - .await; - let balances = BalanceCache::new(&pool, solana_network.clone()).await?; - + let mut balances = HashMap::new(); + balances.insert(PublicKeyBinary::from(vec![0]), 3); + balances.insert(PublicKeyBinary::from(vec![1]), 5); + balances.insert(PublicKeyBinary::from(vec![2]), 2); + let balances = InstantlyBurnedBalance(Arc::new(Mutex::new(balances))); // Set up output: let mut valid_packets = Vec::new(); let mut invalid_packets = Vec::new(); - // Set up verifier: let mut verifier = Verifier { - debiter: balances, + debiter: balances.0.clone(), config_server: orgs, }; // Run the verifier: - let mut pending_burn_txn = pool.begin().await?; verifier .verify( 1, - &mut pending_burn_txn, + &mut balances.clone(), stream::iter(packets), &mut valid_packets, &mut invalid_packets, ) .await .unwrap(); - pending_burn_txn.commit().await?; // Verify packet reports: assert_eq!( @@ -437,31 +422,33 @@ async fn test_verifier(pool: PgPool) -> anyhow::Result<()> { assert!(!payers.get(&0).unwrap().enabled); assert!(payers.get(&1).unwrap().enabled); assert!(payers.get(&2).unwrap().enabled); - - Ok(()) } -#[sqlx::test] -async fn test_end_to_end(pool: PgPool) -> anyhow::Result<()> { +#[tokio::test] +async fn test_end_to_end() { let payer = PublicKeyBinary::from(vec![0]); - // Our balance and packet size has to surpass BURN_THRESHOLD - // for burning to consider the verified packets. 
- const STARTING_BALANCE: u64 = BURN_THRESHOLD as u64 * 3; - const LARGE_PACKET_SIZE: u32 = (BURN_THRESHOLD as u64 * BYTES_PER_DC) as u32; + // Pending tables: + let pending_burns: Arc>> = + Arc::new(Mutex::new(HashMap::new())); + let pending_tables = MockPendingTables { + pending_txns: Default::default(), + pending_burns: pending_burns.clone(), + }; // Solana network: - let mut solana_network = TestSolanaClientMap::default(); - solana_network.insert(payer.clone(), STARTING_BALANCE).await; + let mut solana_network = HashMap::new(); + solana_network.insert(payer.clone(), 3_u64); // Start with 3 data credits + let solana_network = Arc::new(Mutex::new(solana_network)); // Balance cache: - let balance_cache = BalanceCache::new(&pool, solana_network.clone()) + let balance_cache = BalanceCache::new(&pending_tables, solana_network.clone()) .await .unwrap(); // Burner: let mut burner = Burner::new( - pool.clone(), + pending_tables.clone(), &balance_cache, Duration::default(), // Burn period does not matter, we manually burn solana_network.clone(), @@ -477,28 +464,26 @@ async fn test_end_to_end(pool: PgPool) -> anyhow::Result<()> { // Set up verifier: let mut verifier = Verifier { - debiter: balance_cache.clone(), + debiter: balance_cache, config_server: orgs, }; - // Verify four packets, each costing one BURN_THRESHOLD. The last one should be invalid - let mut pending_burn_txn = pool.begin().await?; + // Verify four packets, each costing one DC. The last one should be invalid verifier .verify( 1, - &mut pending_burn_txn, + &mut pending_burns.clone(), stream::iter(vec![ - packet_report(0, 0, LARGE_PACKET_SIZE, vec![1], false), - packet_report(0, 1, LARGE_PACKET_SIZE, vec![2], false), - packet_report(0, 2, LARGE_PACKET_SIZE, vec![3], false), - packet_report(0, 3, LARGE_PACKET_SIZE, vec![4], false), + packet_report(0, 0, BYTES_PER_DC as u32, vec![1], false), + packet_report(0, 1, BYTES_PER_DC as u32, vec![2], false), + packet_report(0, 2, BYTES_PER_DC as u32, vec![3], false), + packet_report(0, 3, BYTES_PER_DC as u32, vec![4], false), ]), &mut valid_packets, &mut invalid_packets, ) .await .unwrap(); - pending_burn_txn.commit().await?; // Org 0 should be disabled now: assert!( @@ -515,27 +500,30 @@ async fn test_end_to_end(pool: PgPool) -> anyhow::Result<()> { assert_eq!( valid_packets, vec![ - valid_packet(0, LARGE_PACKET_SIZE, vec![1], true), - valid_packet(1000, LARGE_PACKET_SIZE, vec![2], true), - valid_packet(2000, LARGE_PACKET_SIZE, vec![3], true), + valid_packet(0, BYTES_PER_DC as u32, vec![1], true), + valid_packet(1000, BYTES_PER_DC as u32, vec![2], true), + valid_packet(2000, BYTES_PER_DC as u32, vec![3], true), ] ); // Last packet is invalid: assert_eq!( invalid_packets, - vec![invalid_packet(LARGE_PACKET_SIZE, vec![4])] + vec![invalid_packet(BYTES_PER_DC as u32, vec![4])] ); // Check current balance: - // Check that 3x the BURN_THRESHOLD DC are pending to be burned: - let balance = verifier - .debiter - .get_payer_balance(&payer) - .await - .expect("known payer"); - assert_eq!(balance.balance, STARTING_BALANCE); - assert_eq!(balance.burned, STARTING_BALANCE); + let balance = { + let balances = verifier.debiter.balances(); + let balances = balances.lock().await; + *balances.get(&payer).unwrap() + }; + assert_eq!(balance.balance, 3); + assert_eq!(balance.burned, 3); + + // Check that 3 DC are pending to be burned: + let pending_burn = *pending_burns.lock().await.get(&payer).unwrap(); + assert_eq!(pending_burn, 3); // Initiate the burn: burner.burn().await.unwrap(); @@ -550,25 +538,21 @@ 
async fn test_end_to_end(pool: PgPool) -> anyhow::Result<()> { assert_eq!(balance.burned, 0); // Pending burns should be empty as well: - let payer_balance = balance_cache - .get_payer_balance(&payer) - .await - .expect("known payer"); - assert_eq!(payer_balance.burned, 0, "pending was burned"); + let pending_burn = *pending_burns.lock().await.get(&payer).unwrap(); + assert_eq!(pending_burn, 0); // Additionally, the balance on the solana network should be zero: - let solana_balance = solana_network.get_payer_balance(&payer).await; - assert_eq!(solana_balance, 0, "solana balance"); + let solana_balance = *solana_network.lock().await.get(&payer).unwrap(); + assert_eq!(solana_balance, 0); // Attempting to validate one packet should fail now: valid_packets.clear(); invalid_packets.clear(); - let mut pending_burn_txn = pool.begin().await?; verifier .verify( 1, - &mut pending_burn_txn, + &mut pending_burns.clone(), stream::iter(vec![packet_report( 0, 4, @@ -581,7 +565,6 @@ async fn test_end_to_end(pool: PgPool) -> anyhow::Result<()> { ) .await .unwrap(); - pending_burn_txn.commit().await?; assert_eq!(valid_packets, vec![]); @@ -589,16 +572,51 @@ async fn test_end_to_end(pool: PgPool) -> anyhow::Result<()> { invalid_packets, vec![invalid_packet(BYTES_PER_DC as u32, vec![5])] ); +} - Ok(()) +#[derive(Clone)] +struct MockSolanaNetwork { + confirmed: Arc>>, + ledger: Arc>>, } -struct NoopStore; +impl MockSolanaNetwork { + fn new(ledger: HashMap) -> Self { + Self { + confirmed: Arc::new(Default::default()), + ledger: Arc::new(Mutex::new(ledger)), + } + } +} + +#[async_trait] +impl SolanaNetwork for MockSolanaNetwork { + type Transaction = MockTransaction; -#[async_trait::async_trait] -impl sender::TxnStore for NoopStore {} + async fn payer_balance(&self, payer: &PublicKeyBinary) -> Result { + self.ledger.payer_balance(payer).await + } + + async fn make_burn_transaction( + &self, + payer: &PublicKeyBinary, + amount: u64, + ) -> Result { + self.ledger.make_burn_transaction(payer, amount).await + } + + async fn submit_transaction(&self, txn: &MockTransaction) -> Result<(), SolanaRpcError> { + self.confirmed.lock().await.insert(txn.signature); + self.ledger.submit_transaction(txn).await + } + + async fn confirm_transaction(&self, txn: &Signature) -> Result { + Ok(self.confirmed.lock().await.contains(txn)) + } +} #[sqlx::test] +#[ignore] async fn test_pending_txns(pool: PgPool) -> anyhow::Result<()> { const CONFIRMED_BURN_AMOUNT: u64 = 7; const UNCONFIRMED_BURN_AMOUNT: u64 = 11; @@ -618,13 +636,7 @@ async fn test_pending_txns(pool: PgPool) -> anyhow::Result<()> { burned: CONFIRMED_BURN_AMOUNT + UNCONFIRMED_BURN_AMOUNT, }, ); - let mut solana_network = TestSolanaClientMap::default(); - solana_network - .insert( - payer.clone(), - CONFIRMED_BURN_AMOUNT + UNCONFIRMED_BURN_AMOUNT, - ) - .await; + let mock_network = MockSolanaNetwork::new(ledger); // Add both the burn amounts to the pending burns table { @@ -637,46 +649,30 @@ async fn test_pending_txns(pool: PgPool) -> anyhow::Result<()> { } // First transaction is confirmed - // Make submission time in past to bypass confirm txn sleep { - let txn = solana_network + let txn = mock_network .make_burn_transaction(&payer, CONFIRMED_BURN_AMOUNT) .await .unwrap(); - pool.do_add_pending_transaction( - &payer, - CONFIRMED_BURN_AMOUNT, - txn.get_signature(), - Utc::now() - chrono::Duration::minutes(2), - ) - .await - .unwrap(); - solana_network.add_confirmed(*txn.get_signature()).await; - solana_network - .submit_transaction(&txn, &NoopStore) + 
pool.add_pending_transaction(&payer, CONFIRMED_BURN_AMOUNT, txn.get_signature()) .await .unwrap(); + mock_network.submit_transaction(&txn).await.unwrap(); } // Second is unconfirmed - // Make submission time in past to bypass confirm txn sleep { - let txn = solana_network + let txn = mock_network .make_burn_transaction(&payer, UNCONFIRMED_BURN_AMOUNT) .await .unwrap(); - pool.do_add_pending_transaction( - &payer, - UNCONFIRMED_BURN_AMOUNT, - txn.get_signature(), - Utc::now() - chrono::Duration::minutes(3), - ) - .await - .unwrap(); + pool.add_pending_transaction(&payer, UNCONFIRMED_BURN_AMOUNT, txn.get_signature()) + .await + .unwrap(); } // Confirm pending transactions - confirm_pending_txns(&pool, &solana_network, &Arc::new(Mutex::new(cache))) + confirm_pending_txns(&pool, &mock_network, &Arc::new(Mutex::new(cache))) .await .unwrap(); @@ -690,91 +686,3 @@ async fn test_pending_txns(pool: PgPool) -> anyhow::Result<()> { Ok(()) } - -#[sqlx::test] -async fn will_not_burn_when_pending_txns(pool: PgPool) -> anyhow::Result<()> { - let payer = PublicKeyBinary::from(vec![1]); - - const BURN_ONE: u64 = 20_000; - const BURN_TWO: u64 = 30_000; - const INITIAL_BALANCE: u64 = 500_000; - - // Add pending burns for both payer - let mut transaction = pool.begin().await.unwrap(); - transaction.add_burned_amount(&payer, BURN_ONE).await?; - transaction.commit().await.unwrap(); - - // Mark payer_one burn as pending - let signature = Signature::new_unique(); - pool.do_add_pending_transaction( - &payer, - BURN_ONE, - &signature, - Utc::now() - chrono::Duration::minutes(2), - ) - .await?; - - // Make the Burner - let mut solana_network = TestSolanaClientMap::fail_on_send(); // panic if txn is sent - solana_network.insert(payer.clone(), INITIAL_BALANCE).await; - let balance_cache = BalanceCache::new(&pool, solana_network.clone()).await?; - let mut burner = Burner::new( - pool.clone(), - &balance_cache, - Duration::default(), - solana_network.clone(), - ); - - // Add more burn that is not part of pending txn - let mut transaction = pool.begin().await.unwrap(); - transaction.add_burned_amount(&payer, BURN_TWO).await?; - transaction.commit().await.unwrap(); - - // Do the burn - // Burning will fail if a txn is sent - burner.burn().await?; - - // pending burns contains full amount - let next_burn = pool.fetch_next_burn().await?.expect("pending burn"); - assert_eq!(next_burn.payer, payer); - assert_eq!(next_burn.amount, BURN_ONE + BURN_TWO); - - // Ensure state is unchanged, no burns were added - let pending_txns = pool.fetch_all_pending_txns().await?; - assert_eq!(1, pending_txns.len(), "single txn after burn"); - let pending = pending_txns.first().expect("pending txn"); - assert_eq!(pending.payer, payer); - assert_eq!(pending.amount, BURN_ONE); - - // =========================================== - // The Burner did not add a new transaction when there was one in flight. - // Now we confirm the inflight transaction and make sure it will send the - // next one. - - // Confirm the pending transaction - confirm_pending_txns(&pool, &solana_network, &balance_cache.balances()).await?; - - // The pending burn amount has been deducted - let next_burn = pool.fetch_next_burn().await?.expect("valid pending burn"); - assert_eq!(next_burn.payer, payer); - assert_eq!(next_burn.amount, BURN_TWO); - - // Recreate a Burner with a Solana client that allows sending. - // All burns should be done after this. 
- let mut solana_network = TestSolanaClientMap::default(); - solana_network.insert(payer.clone(), INITIAL_BALANCE).await; - let mut burner = Burner::new( - pool.clone(), - &balance_cache, - Duration::default(), - solana_network.clone(), - ); - burner.burn().await?; - - assert!( - pool.fetch_next_burn().await?.is_none(), - "nothing left to burn" - ); - - Ok(()) -} diff --git a/mobile_packet_verifier/migrations/7_pending_txns.sql b/mobile_packet_verifier/migrations/7_pending_txns.sql deleted file mode 100644 index 017e0102b..000000000 --- a/mobile_packet_verifier/migrations/7_pending_txns.sql +++ /dev/null @@ -1,6 +0,0 @@ -CREATE TABLE pending_txns ( - signature TEXT PRIMARY KEY, - payer TEXT NOT NULL, - amount BIGINT NOT NULL, - time_of_submission TIMESTAMPTZ NOT NULL -); diff --git a/mobile_packet_verifier/migrations/8_pending_data_transfer_sessions.sql b/mobile_packet_verifier/migrations/8_pending_data_transfer_sessions.sql deleted file mode 100644 index a25e9dbd0..000000000 --- a/mobile_packet_verifier/migrations/8_pending_data_transfer_sessions.sql +++ /dev/null @@ -1,11 +0,0 @@ -CREATE TABLE pending_data_transfer_sessions ( - pub_key TEXT NOT NULL, - payer TEXT NOT NULL, - uploaded_bytes BIGINT NOT NULL, - downloaded_bytes BIGINT NOT NULL, - rewardable_bytes BIGINT NOT NULL, - first_timestamp TIMESTAMPTZ NOT NULL, - last_timestamp TIMESTAMPTZ NOT NULL, - signature TEXT NOT NULL, - PRIMARY KEY(pub_key, payer) -); diff --git a/mobile_packet_verifier/src/accumulate.rs b/mobile_packet_verifier/src/accumulate.rs index 7879a7739..9de1c6669 100644 --- a/mobile_packet_verifier/src/accumulate.rs +++ b/mobile_packet_verifier/src/accumulate.rs @@ -37,8 +37,7 @@ pub async fn accumulate_sessions( continue; } - pending_burns::save_data_transfer_session_req(&mut *txn, &report.report, curr_file_ts) - .await?; + pending_burns::save(&mut *txn, &report.report, curr_file_ts).await?; } Ok(()) diff --git a/mobile_packet_verifier/src/burner.rs b/mobile_packet_verifier/src/burner.rs index a2eea0b57..1c5296d5a 100644 --- a/mobile_packet_verifier/src/burner.rs +++ b/mobile_packet_verifier/src/burner.rs @@ -1,24 +1,33 @@ -use anyhow::Context; -use chrono::{Duration, Utc}; +use std::time::Duration; + use file_store::file_sink::FileSinkClient; use helium_crypto::PublicKeyBinary; use helium_proto::services::packet_verifier::ValidDataTransferSession; -use solana::{burn::SolanaNetwork, sender}; -use sqlx::PgPool; +use solana::{burn::SolanaNetwork, GetSignature, SolanaRpcError}; +use sqlx::{Pool, Postgres}; use tracing::Instrument; -use crate::{pending_burns, pending_txns}; +use crate::pending_burns; pub struct Burner { valid_sessions: FileSinkClient, solana: S, + failed_retry_attempts: usize, + failed_check_interval: Duration, } impl Burner { - pub fn new(valid_sessions: FileSinkClient, solana: S) -> Self { + pub fn new( + valid_sessions: FileSinkClient, + solana: S, + failed_retry_attempts: usize, + failed_check_interval: Duration, + ) -> Self { Self { valid_sessions, solana, + failed_retry_attempts, + failed_check_interval, } } } @@ -27,56 +36,7 @@ impl Burner where S: SolanaNetwork, { - pub async fn confirm_and_burn(&self, pool: &PgPool) -> anyhow::Result<()> { - self.confirm_pending_txns(pool) - .await - .context("confirming pending txns")?; - self.burn(pool).await.context("burning")?; - Ok(()) - } - - pub async fn confirm_pending_txns(&self, pool: &PgPool) -> anyhow::Result<()> { - let pending = pending_txns::fetch_all_pending_txns(pool).await?; - tracing::info!(count = pending.len(), "confirming pending txns"); 
- - for pending in pending { - // Sleep for at least a minute since the time of submission to - // give the transaction plenty of time to be confirmed: - let time_since_submission = Utc::now() - pending.time_of_submission; - if Duration::minutes(1) > time_since_submission { - let delay = Duration::minutes(1) - time_since_submission; - tracing::info!(?pending, %delay, "waiting to confirm pending txn"); - tokio::time::sleep(delay.to_std()?).await; - } - - let signature = pending.signature; - let confirmed = self.solana.confirm_transaction(&signature).await?; - tracing::info!(?pending, confirmed, "confirming pending transaction"); - if confirmed { - let sessions = - pending_txns::get_pending_data_sessions_for_signature(pool, &signature).await?; - for session in sessions { - let _write = self - .valid_sessions - .write(ValidDataTransferSession::from(session), &[]) - .await?; - } - pending_txns::remove_pending_txn_success(pool, &signature).await?; - } else { - pending_txns::remove_pending_txn_failure(pool, &signature).await?; - } - } - - Ok(()) - } - - pub async fn burn(&self, pool: &PgPool) -> anyhow::Result<()> { - let pending_txns = pending_txns::pending_txn_count(pool).await?; - if pending_txns > 0 { - tracing::error!(pending_txns, "ignoring burn"); - return Ok(()); - } - + pub async fn burn(&self, pool: &Pool) -> anyhow::Result<()> { for payer_pending_burn in pending_burns::get_all_payer_burns(pool).await? { let payer = payer_pending_burn.payer; let total_dcs = payer_pending_burn.total_dcs; @@ -96,119 +56,119 @@ where tracing::info!(%total_dcs, %payer, "Burning DC"); let txn = self.solana.make_burn_transaction(&payer, total_dcs).await?; - let store = BurnTxnStore::new( - pool.clone(), - payer.clone(), - total_dcs, - sessions, - self.valid_sessions.clone(), - ); - - let burn_span = tracing::info_span!("burn_txn", %payer, amount = total_dcs); - self.solana - .submit_transaction(&txn, &store) - .instrument(burn_span) - .await? 
+ match self.solana.submit_transaction(&txn).await { + Ok(()) => { + handle_transaction_success( + pool, + payer, + total_dcs, + sessions, + &self.valid_sessions, + ) + .await?; + } + Err(err) => { + let span = tracing::info_span!( + "txn_confirmation", + signature = %txn.get_signature(), + %payer, + total_dcs, + max_attempts = self.failed_retry_attempts + ); + + // block on confirmation + self.transaction_confirmation_check(pool, err, txn, payer, total_dcs, sessions) + .instrument(span) + .await; + } + } } Ok(()) } -} -struct BurnTxnStore { - pool: PgPool, - payer: PublicKeyBinary, - amount: u64, - sessions: Vec, - valid_sessions: FileSinkClient, -} - -impl BurnTxnStore { - fn new( - pool: PgPool, + async fn transaction_confirmation_check( + &self, + pool: &Pool, + err: SolanaRpcError, + txn: S::Transaction, payer: PublicKeyBinary, - amount: u64, + total_dcs: u64, sessions: Vec, - valid_sessions: FileSinkClient, - ) -> Self { - Self { - pool, - payer, - amount, - sessions, - valid_sessions, - } - } -} - -#[async_trait::async_trait] -impl sender::TxnStore for BurnTxnStore { - async fn on_prepared(&self, txn: &solana::Transaction) -> sender::SenderResult<()> { - tracing::info!("txn prepared"); - - let signature = txn.get_signature(); - let add_pending = - pending_txns::add_pending_txn(&self.pool, &self.payer, self.amount, signature); - - match add_pending.await { - Ok(()) => {} - Err(err) => { - tracing::error!("failed to add pending transaction"); - return Err(sender::SenderError::preparation(&format!( - "could not add pending transaction: {err:?}" - ))); - } - } - - Ok(()) - } - - async fn on_finalized(&self, txn: &solana::Transaction) { - tracing::info!("txn finalized"); - metrics::counter!( - "burned", - "payer" => self.payer.to_string(), - "success" => "true" - ) - .increment(self.amount); - - // Delete from the data transfer session and write out to S3 - let remove_burn = pending_burns::delete_for_payer(&self.pool, &self.payer, self.amount); - if let Err(err) = remove_burn.await { - tracing::error!(?err, "failed to deduct finalized burn"); - } + ) { + tracing::warn!(?err, "starting txn confirmation check"); + // We don't know if the txn actually made it, maybe it did let signature = txn.get_signature(); - let remove_pending = pending_txns::remove_pending_txn_success(&self.pool, signature); - if let Err(err) = remove_pending.await { - tracing::error!(?err, "failed to remove successful pending txn"); - } - for session in self.sessions.iter() { - let session = session.to_owned(); - let write = self - .valid_sessions - .write(ValidDataTransferSession::from(session), &[]); - if let Err(err) = write.await { - tracing::error!(?err, "failed to write data session for finalized burn"); + let mut attempt = 0; + while attempt <= self.failed_retry_attempts { + tokio::time::sleep(self.failed_check_interval).await; + match self.solana.confirm_transaction(signature).await { + Ok(true) => { + tracing::debug!("txn confirmed on chain"); + let txn_success = handle_transaction_success( + pool, + payer, + total_dcs, + sessions, + &self.valid_sessions, + ) + .await; + if let Err(err) = txn_success { + tracing::error!(?err, "txn succeeded, something else failed"); + } + + return; + } + Ok(false) => { + tracing::info!(attempt, "txn not confirmed, yet..."); + attempt += 1; + continue; + } + Err(err) => { + // Client errors do not count against retry attempts + tracing::error!(?err, attempt, "failed to confirm txn"); + continue; + } } } - } - async fn on_error(&self, txn: &solana::Transaction, err: 
sender::SenderError) {
-        tracing::warn!(?err, "txn failed");
-
-        let signature = txn.get_signature();
-        let remove_pending = pending_txns::remove_pending_txn_failure(&self.pool, signature);
-        if let Err(err) = remove_pending.await {
-            tracing::error!(?err, "failed to remove failed pending txn");
-        }
+        tracing::warn!("failed to confirm txn");
 
+        // We have failed to burn data credits:
         metrics::counter!(
             "burned",
-            "payer" => self.payer.to_string(),
+            "payer" => payer.to_string(),
             "success" => "false"
         )
-        .increment(self.amount);
+        .increment(total_dcs);
     }
 }
+
+async fn handle_transaction_success(
+    pool: &Pool<Postgres>,
+    payer: PublicKeyBinary,
+    total_dcs: u64,
+    sessions: Vec<DataTransferSession>,
+    valid_sessions: &FileSinkClient<ValidDataTransferSession>,
+) -> Result<(), anyhow::Error> {
+    // We successfully managed to burn data credits:
+    metrics::counter!(
+        "burned",
+        "payer" => payer.to_string(),
+        "success" => "true"
+    )
+    .increment(total_dcs);
+
+    // Delete from the data transfer session and write out to S3
+    pending_burns::delete_for_payer(pool, &payer, total_dcs).await?;
+
+    for session in sessions {
+        valid_sessions
+            .write(ValidDataTransferSession::from(session), &[])
+            .await?;
+    }
+
+    Ok(())
+}
diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs
index 400bd97d5..6049a93d0 100644
--- a/mobile_packet_verifier/src/daemon.rs
+++ b/mobile_packet_verifier/src/daemon.rs
@@ -81,14 +81,14 @@ where
             biased;
             _ = &mut shutdown => return Ok(()),
             _ = sleep_until(burn_time) => {
-                match self.burner.confirm_and_burn(&self.pool).await {
+                // It's time to burn
+                match self.burner.burn(&self.pool).await {
                     Ok(_) => {
                         burn_time = Instant::now() + self.burn_period;
-                        tracing::info!(next_burn = ?self.burn_period, "successful burn")
                     }
-                    Err(err) => {
+                    Err(e) => {
                         burn_time = Instant::now() + self.min_burn_period;
-                        tracing::warn!(?err, next_burn = ?self.min_burn_period, "failed to confirm or burn");
+                        tracing::warn!("failed to burn: {e:?}, re-running burn in {:?}", self.min_burn_period);
                     }
                 }
             }
@@ -128,7 +128,7 @@ impl Cmd {
             bail!("Missing solana section in settings");
         };
         // Set up the solana RpcClient:
-        Some(SolanaRpc::new(solana_settings, solana::SubDao::Mobile).await?)
+        Some(SolanaRpc::new(solana_settings).await?)
} else { None }; @@ -157,7 +157,12 @@ impl Cmd { ) .await?; - let burner = Burner::new(valid_sessions, solana); + let burner = Burner::new( + valid_sessions, + solana, + settings.txn_confirmation_retry_attempts, + settings.txn_confirmation_check_interval, + ); let file_store = FileStore::from_settings(&settings.ingest).await?; diff --git a/mobile_packet_verifier/src/lib.rs b/mobile_packet_verifier/src/lib.rs index 629ff83f4..9ddb71634 100644 --- a/mobile_packet_verifier/src/lib.rs +++ b/mobile_packet_verifier/src/lib.rs @@ -9,16 +9,8 @@ pub mod burner; pub mod daemon; pub mod event_ids; pub mod pending_burns; -pub mod pending_txns; pub mod settings; -const BYTES_PER_DC: u64 = 20_000; - -pub fn bytes_to_dc(bytes: u64) -> u64 { - let bytes = bytes.max(BYTES_PER_DC); - bytes.div_ceil(BYTES_PER_DC) -} - pub struct MobileConfigClients { gateway_client: client::GatewayClient, auth_client: client::AuthorizationClient, @@ -55,15 +47,3 @@ impl MobileConfigResolverExt for MobileConfigClients { .unwrap_or_default() } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_bytes_to_dc() { - assert_eq!(1, bytes_to_dc(1)); - assert_eq!(1, bytes_to_dc(20_000)); - assert_eq!(2, bytes_to_dc(20_001)); - } -} diff --git a/mobile_packet_verifier/src/pending_burns.rs b/mobile_packet_verifier/src/pending_burns.rs index 94c8f725b..a360a4800 100644 --- a/mobile_packet_verifier/src/pending_burns.rs +++ b/mobile_packet_verifier/src/pending_burns.rs @@ -4,13 +4,11 @@ use chrono::{DateTime, Utc}; use file_store::{mobile_session::DataTransferSessionReq, traits::TimestampEncode}; use helium_crypto::PublicKeyBinary; use helium_proto::services::packet_verifier::ValidDataTransferSession; -use sqlx::{FromRow, Pool, Postgres, Row, Transaction}; - -use crate::bytes_to_dc; +use sqlx::{prelude::FromRow, Pool, Postgres, Row, Transaction}; const METRIC_NAME: &str = "pending_dc_burn"; -#[derive(Debug, FromRow, Clone)] +#[derive(FromRow)] pub struct DataTransferSession { pub_key: PublicKeyBinary, payer: PublicKeyBinary, @@ -44,7 +42,6 @@ impl From for ValidDataTransferSession { } } -#[derive(Debug)] pub struct PendingPayerBurn { pub payer: PublicKeyBinary, pub total_dcs: u64, @@ -114,60 +111,35 @@ pub async fn get_all_payer_burns(conn: &Pool) -> anyhow::Result, req: &DataTransferSessionReq, last_timestamp: DateTime, -) -> Result<(), sqlx::Error> { - save_data_transfer_session( - txn, - &DataTransferSession { - pub_key: req.data_transfer_usage.pub_key.clone(), - payer: req.data_transfer_usage.payer.clone(), - uploaded_bytes: req.data_transfer_usage.upload_bytes as i64, - downloaded_bytes: req.data_transfer_usage.download_bytes as i64, - rewardable_bytes: req.rewardable_bytes as i64, - // timestamps are the same upon ingest - first_timestamp: last_timestamp, - last_timestamp, - }, - ) - .await?; - +) -> anyhow::Result<()> { let dc_to_burn = bytes_to_dc(req.rewardable_bytes); - increment_metric(&req.data_transfer_usage.payer, dc_to_burn); - Ok(()) -} - -pub async fn save_data_transfer_session( - txn: &mut Transaction<'_, Postgres>, - data_transfer_session: &DataTransferSession, -) -> Result<(), sqlx::Error> { sqlx::query( r#" - INSERT INTO data_transfer_sessions - (pub_key, payer, uploaded_bytes, downloaded_bytes, rewardable_bytes, first_timestamp, last_timestamp) - VALUES - ($1, $2, $3, $4, $5, $6, $7) + INSERT INTO data_transfer_sessions (pub_key, payer, uploaded_bytes, downloaded_bytes, rewardable_bytes, first_timestamp, last_timestamp) + VALUES ($1, $2, $3, $4, $5, $6, $6) ON CONFLICT (pub_key, payer) DO UPDATE SET - 
uploaded_bytes = data_transfer_sessions.uploaded_bytes + EXCLUDED.uploaded_bytes, - downloaded_bytes = data_transfer_sessions.downloaded_bytes + EXCLUDED.downloaded_bytes, - rewardable_bytes = data_transfer_sessions.rewardable_bytes + EXCLUDED.rewardable_bytes, - first_timestamp = LEAST(data_transfer_sessions.first_timestamp, EXCLUDED.first_timestamp), - last_timestamp = GREATEST(data_transfer_sessions.last_timestamp, EXCLUDED.last_timestamp) + uploaded_bytes = data_transfer_sessions.uploaded_bytes + EXCLUDED.uploaded_bytes, + downloaded_bytes = data_transfer_sessions.downloaded_bytes + EXCLUDED.downloaded_bytes, + rewardable_bytes = data_transfer_sessions.rewardable_bytes + EXCLUDED.rewardable_bytes, + last_timestamp = GREATEST(data_transfer_sessions.last_timestamp, EXCLUDED.last_timestamp) "# ) - .bind(&data_transfer_session.pub_key) - .bind(&data_transfer_session.payer) - .bind(data_transfer_session.uploaded_bytes) - .bind(data_transfer_session.downloaded_bytes) - .bind(data_transfer_session.rewardable_bytes) - .bind(data_transfer_session.first_timestamp) - .bind(data_transfer_session.last_timestamp) + .bind(&req.data_transfer_usage.pub_key) + .bind(&req.data_transfer_usage.payer) + .bind(req.data_transfer_usage.upload_bytes as i64) + .bind(req.data_transfer_usage.download_bytes as i64) + .bind(req.rewardable_bytes as i64) + .bind(last_timestamp) .execute(txn) .await?; + increment_metric(&req.data_transfer_usage.payer, dc_to_burn); + Ok(()) } @@ -197,3 +169,22 @@ fn increment_metric(payer: &PublicKeyBinary, value: u64) { fn decrement_metric(payer: &PublicKeyBinary, value: u64) { metrics::gauge!(METRIC_NAME, "payer" => payer.to_string()).decrement(value as f64); } + +const BYTES_PER_DC: u64 = 20_000; + +fn bytes_to_dc(bytes: u64) -> u64 { + let bytes = bytes.max(BYTES_PER_DC); + bytes.div_ceil(BYTES_PER_DC) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_bytes_to_dc() { + assert_eq!(1, bytes_to_dc(1)); + assert_eq!(1, bytes_to_dc(20_000)); + assert_eq!(2, bytes_to_dc(20_001)); + } +} diff --git a/mobile_packet_verifier/src/pending_txns.rs b/mobile_packet_verifier/src/pending_txns.rs deleted file mode 100644 index 4f4572816..000000000 --- a/mobile_packet_verifier/src/pending_txns.rs +++ /dev/null @@ -1,182 +0,0 @@ -use chrono::{DateTime, Utc}; -use helium_crypto::PublicKeyBinary; -use solana::Signature; -use sqlx::{postgres::PgRow, FromRow, PgPool, Row}; - -use crate::pending_burns::{self, DataTransferSession}; - -#[derive(Debug)] -pub struct PendingTxn { - pub signature: Signature, - pub payer: PublicKeyBinary, - pub amount: u64, - pub time_of_submission: DateTime, -} - -impl FromRow<'_, PgRow> for PendingTxn { - fn from_row(row: &PgRow) -> sqlx::Result { - Ok(Self { - payer: row.try_get("payer")?, - amount: row.try_get::("amount")? as u64, - time_of_submission: row.try_get("time_of_submission")?, - signature: row - .try_get::("signature")? - .parse() - .map_err(|e| sqlx::Error::ColumnDecode { - index: "signature".to_string(), - source: Box::new(e), - })?, - }) - } -} - -pub async fn get_pending_data_sessions_for_signature( - conn: &PgPool, - signature: &Signature, -) -> anyhow::Result> { - let pending = sqlx::query_as( - r#" - SELECT * FROM pending_data_transfer_sessions - WHERE signature = $1 - "#, - ) - .bind(signature.to_string()) - .fetch_all(conn) - .await?; - Ok(pending) -} - -pub async fn pending_txn_count(conn: &PgPool) -> anyhow::Result { - // QUESTION: Pending Txns exists across two tables, - // `pending_data_transfer_sessions` and `pending_txns`. 
- // Do we want to be checking that both tables are empty? - let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM pending_txns") - .fetch_one(conn) - .await?; - Ok(count as usize) -} - -pub async fn add_pending_txn( - conn: &PgPool, - payer: &PublicKeyBinary, - amount: u64, - signature: &Signature, -) -> Result<(), sqlx::Error> { - do_add_pending_txn(conn, payer, amount, signature, Utc::now()).await?; - Ok(()) -} - -pub async fn do_add_pending_txn( - conn: &PgPool, - payer: &PublicKeyBinary, - amount: u64, - signature: &Signature, - time_of_submission: DateTime, -) -> Result<(), sqlx::Error> { - let mut txn = conn.begin().await?; - sqlx::query( - r#" - INSERT INTO pending_txns (signature, payer, amount, time_of_submission) - VALUES ($1, $2, $3, $4) - "#, - ) - .bind(signature.to_string()) - .bind(payer) - .bind(amount as i64) - .bind(time_of_submission) - .execute(&mut *txn) - .await?; - - sqlx::query( - r#" - WITH moved_rows AS ( - DELETE FROM data_transfer_sessions - WHERE payer = $1 - RETURNING * - ) - INSERT INTO pending_data_transfer_sessions ( - pub_key, - payer, - uploaded_bytes, - downloaded_bytes, - rewardable_bytes, - first_timestamp, - last_timestamp, - signature - ) - SELECT - pub_key, - payer, - uploaded_bytes, - downloaded_bytes, - rewardable_bytes, - first_timestamp, - last_timestamp, - $2 - FROM moved_rows; - "#, - ) - .bind(payer) - .bind(signature.to_string()) - .execute(&mut *txn) - .await?; - - txn.commit().await?; - Ok(()) -} - -pub async fn remove_pending_txn_failure( - conn: &PgPool, - signature: &Signature, -) -> Result<(), sqlx::Error> { - let mut txn = conn.begin().await?; - sqlx::query("DELETE FROM pending_txns WHERE signature = $1") - .bind(signature.to_string()) - .execute(&mut *txn) - .await?; - - // Move pending data sessions back to the main table - let transfer_sessions: Vec = sqlx::query_as( - r#" - DELETE FROM pending_data_transfer_sessions - WHERE signature = $1 - RETURNING * - "#, - ) - .bind(signature.to_string()) - .fetch_all(&mut *txn) - .await?; - - for session in transfer_sessions.iter() { - pending_burns::save_data_transfer_session(&mut txn, session).await?; - } - - txn.commit().await?; - - Ok(()) -} - -pub async fn remove_pending_txn_success( - conn: &PgPool, - signature: &Signature, -) -> Result<(), sqlx::Error> { - let mut txn = conn.begin().await?; - sqlx::query("DELETE FROM pending_txns WHERE signature = $1") - .bind(signature.to_string()) - .execute(&mut *txn) - .await?; - - sqlx::query("DELETE FROM pending_data_transfer_sessions WHERE signature = $1") - .bind(signature.to_string()) - .execute(&mut *txn) - .await?; - - txn.commit().await?; - Ok(()) -} - -pub async fn fetch_all_pending_txns(conn: &PgPool) -> Result, sqlx::Error> { - sqlx::query_as("SELECT * from pending_txns") - .fetch_all(conn) - .await -} diff --git a/mobile_packet_verifier/src/settings.rs b/mobile_packet_verifier/src/settings.rs index 2cd97bf26..08ebd4052 100644 --- a/mobile_packet_verifier/src/settings.rs +++ b/mobile_packet_verifier/src/settings.rs @@ -34,6 +34,17 @@ pub struct Settings { pub purger_interval: Duration, #[serde(with = "humantime_serde", default = "default_purger_max_age")] pub purger_max_age: Duration, + /// When a burn transaction is not a success, how many times should + /// we try to confirm the transaction before considering it a failure? 
+ #[serde(default = "default_txn_confirmation_retry_attempts")] + pub txn_confirmation_retry_attempts: usize, + /// When a burn transaction is not a success, how long should we + /// wait between trying to confirm if the transaction made it to Solana? + #[serde( + with = "humantime_serde", + default = "default_txn_confirmation_check_interval" + )] + pub txn_confirmation_check_interval: Duration, } fn default_purger_interval() -> Duration { @@ -44,6 +55,14 @@ fn default_purger_max_age() -> Duration { humantime::parse_duration("24 hours").unwrap() } +fn default_txn_confirmation_check_interval() -> Duration { + humantime::parse_duration("1 min").unwrap() +} + +fn default_txn_confirmation_retry_attempts() -> usize { + 5 +} + fn default_start_after() -> DateTime { DateTime::UNIX_EPOCH } diff --git a/mobile_packet_verifier/tests/integration_tests.rs b/mobile_packet_verifier/tests/integration_tests.rs deleted file mode 100644 index c237cd07f..000000000 --- a/mobile_packet_verifier/tests/integration_tests.rs +++ /dev/null @@ -1,408 +0,0 @@ -use chrono::Utc; -use file_store::{ - file_sink::{FileSinkClient, Message}, - mobile_session::{DataTransferEvent, DataTransferSessionReq}, -}; -use helium_crypto::PublicKeyBinary; -use helium_proto::services::{ - packet_verifier::ValidDataTransferSession, poc_mobile::DataTransferRadioAccessTechnology, -}; -use mobile_packet_verifier::{burner::Burner, bytes_to_dc, pending_burns, pending_txns}; -use solana::{ - burn::{test_client::TestSolanaClientMap, SolanaNetwork}, - Signature, -}; -use sqlx::PgPool; -use tokio::sync::mpsc::{self, error::TryRecvError}; - -#[sqlx::test] -fn burn_checks_for_sufficient_balance(pool: PgPool) -> anyhow::Result<()> { - let payer_insufficient = PublicKeyBinary::from(vec![1]); - let payer_sufficient = PublicKeyBinary::from(vec![2]); - const ORIGINAL_BALANCE: u64 = 10_000; - - // Initialize payers with balances - let mut solana_network = TestSolanaClientMap::default(); - solana_network - .insert(payer_insufficient.clone(), ORIGINAL_BALANCE) - .await; - solana_network - .insert(payer_sufficient.clone(), ORIGINAL_BALANCE) - .await; - - // Add Data Transfer Sessiosn for both payers - save_data_transfer_sessions( - &pool, - &[ - (&payer_insufficient, &payer_insufficient, 1_000_000_000), // exceed balance - (&payer_sufficient, &payer_sufficient, 1_000_000), // within balance - ], - ) - .await?; - - // Ensure we see 2 pending burns - let pre_burns = pending_burns::get_all_payer_burns(&pool).await?; - assert_eq!(pre_burns.len(), 2, "2 burns for 2 payers"); - - // Burn what we can - let mut burner = TestBurner::new(solana_network.clone()); - burner.burn(&pool).await?; - - // 1 burn succeeded, the other payer has insufficient balance - let burns = pending_burns::get_all_payer_burns(&pool).await?; - assert_eq!(burns.len(), 1, "1 burn left"); - - // Ensure no pending transactions - let pending = pending_txns::fetch_all_pending_txns(&pool).await?; - assert!(pending.is_empty(), "pending txn should be removed"); - - // Ensure balance for payers through solana mock - assert_eq!( - solana_network.get_payer_balance(&payer_insufficient).await, - ORIGINAL_BALANCE, - "original balance" - ); - assert!( - solana_network.get_payer_balance(&payer_sufficient).await < ORIGINAL_BALANCE, - "reduced balance" - ); - - // Ensure successful data transfer sessions were output - let written_sessions = burner.get_written_sessions(); - assert_eq!(written_sessions.len(), 1, "1 data transfer session written"); - - Ok(()) -} - -#[sqlx::test] -async fn 
test_confirm_pending_txns(pool: PgPool) -> anyhow::Result<()> { - let payer_one = PublicKeyBinary::from(vec![1]); - let payer_two = PublicKeyBinary::from(vec![2]); - - let mut solana_network = TestSolanaClientMap::default(); - solana_network.insert(payer_one.clone(), 10_000).await; - - save_data_transfer_sessions( - &pool, - &[ - (&payer_one, &payer_one, 1_000), - (&payer_two, &payer_two, 1_000), - ], - ) - .await?; - - let burns = pending_burns::get_all_payer_burns(&pool).await?; - assert_eq!(burns.len(), 2, "two burns for two payers"); - - // First transaction is confirmed - // Make submission time in past to bypass confirm txn sleep - let confirmed_signature = Signature::new_unique(); - pending_txns::do_add_pending_txn( - &pool, - &payer_one, - 1_000, - &confirmed_signature, - Utc::now() - chrono::Duration::minutes(2), - ) - .await?; - - // Second transaction is unconfirmed - // Make submission time in past to bypass confirm txn sleep - let unconfirmed_signature = Signature::new_unique(); - pending_txns::do_add_pending_txn( - &pool, - &payer_two, - 500, - &unconfirmed_signature, - Utc::now() - chrono::Duration::minutes(2), - ) - .await?; - - // Tell Mock Solana which txn to confirm - solana_network.add_confirmed(confirmed_signature).await; - // solana_network.add_confirmed(unconfirmed_txn).await; // uncomment for failure - - assert_eq!(pending_txns::fetch_all_pending_txns(&pool).await?.len(), 2); - let burner = TestBurner::new(solana_network); - burner.confirm_pending_txns(&pool).await?; - // confirmed and unconfirmed txns have been cleared - assert_eq!(pending_txns::fetch_all_pending_txns(&pool).await?.len(), 0); - - // The unconfirmed txn is moved back to ready for burning - let burns = pending_burns::get_all_payer_burns(&pool).await?; - assert_eq!(burns.len(), 1, "the unconfirmed txn has moved back to burn"); - - let payer_burn = &burns[0]; - assert_eq!(payer_burn.payer, payer_two); - assert_eq!(payer_burn.total_dcs, bytes_to_dc(2_000)); - assert_eq!(payer_burn.sessions.len(), 1); - - Ok(()) -} - -#[sqlx::test] -fn confirming_pending_txns_writes_out_sessions(pool: PgPool) -> anyhow::Result<()> { - // Insert a pending txn for some sessions. - // Insert more sessions after the pending txn. - // confirm txns and ensure sessions are written. - // ensure sessions not written for that pending txn are still there. 
- - let payer = PublicKeyBinary::from(vec![0]); - let pubkey_one = PublicKeyBinary::from(vec![1]); - let pubkey_two = PublicKeyBinary::from(vec![2]); - - // Add a transfer session for payer - save_data_transfer_sessions( - &pool, - &[(&payer, &pubkey_one, 1_000), (&payer, &pubkey_two, 1_000)], - ) - .await?; - - // Mark the session as pending - let signature = Signature::new_unique(); - pending_txns::do_add_pending_txn( - &pool, - &payer, - 1_000, - &signature, - Utc::now() - chrono::Duration::minutes(2), - ) - .await?; - - // Add another transfer session that should not be written out - save_data_transfer_sessions( - &pool, - &[(&payer, &pubkey_one, 5_000), (&payer, &pubkey_two, 5_000)], - ) - .await?; - - let solana_network = TestSolanaClientMap::default(); - let mut burner = TestBurner::new(solana_network); - burner.confirm_pending_txns(&pool).await?; - - // In flight session is written out - let written_sessions = burner.get_written_sessions(); - assert_eq!(written_sessions.len(), 2, "2 data transfer session written"); - - // Late added session is still waiting for burn - let payer_burns = pending_burns::get_all_payer_burns(&pool).await?; - assert_eq!(payer_burns.len(), 1); - - let payer_burn = &payer_burns[0]; - assert_eq!(payer_burn.payer, payer); - // DC is calculated for each session individually, then summed - assert_eq!( - payer_burn.total_dcs, - bytes_to_dc(5_000) + bytes_to_dc(5_000), - ); - assert_eq!(payer_burn.sessions.len(), 2); - - Ok(()) -} - -#[sqlx::test] -fn unconfirmed_pending_txn_moves_data_session_back_to_primary_table( - pool: PgPool, -) -> anyhow::Result<()> { - // After making a pending_txn, and the data sessions are moved for - // processing If the txn cannot be finalized, the data sessions that were - // going to be written need to be moved back to being considerd for burn. - - let var_name = PublicKeyBinary::from(vec![0]); - let payer = var_name; - let pubkey_one = PublicKeyBinary::from(vec![1]); - let pubkey_two = PublicKeyBinary::from(vec![2]); - - // Insert sessions - save_data_transfer_sessions( - &pool, - &[(&payer, &pubkey_one, 1_000), (&payer, &pubkey_two, 1_000)], - ) - .await?; - - // Mark as pending txns - let signature = Signature::new_unique(); - pending_txns::do_add_pending_txn( - &pool, - &payer, - 1_000, - &signature, - Utc::now() - chrono::Duration::minutes(2), - ) - .await?; - - // Insert more sessions - save_data_transfer_sessions( - &pool, - &[(&payer, &pubkey_one, 5_000), (&payer, &pubkey_two, 5_000)], - ) - .await?; - - // There are sessions for burning, but we cannot because there are also pending txns. 
- let payer_burns = pending_burns::get_all_payer_burns(&pool).await?; - assert_eq!( - payer_burns.len(), - 1, - "still have sessions ready for burning" - ); - assert_eq!( - payer_burns[0].total_dcs, - bytes_to_dc(5_000) + bytes_to_dc(5_000) - ); - - let txn_count = pending_txns::pending_txn_count(&pool).await?; - assert_eq!(txn_count, 1, "there should be a single pending txn"); - - // Fail the pending txns - let mut solana_network = TestSolanaClientMap::default(); - // Adding a random confirmed txn will cause other txns to not be considered finalized - solana_network.add_confirmed(Signature::new_unique()).await; - - let burner = TestBurner::new(solana_network); - burner.confirm_pending_txns(&pool).await?; - - // Sessions are merged with 2nd set of sessions for burning - let txn_count = pending_txns::pending_txn_count(&pool).await?; - assert_eq!(txn_count, 0, "should be no more pending txns"); - - // There is still only 1 payer burn, but the amount to burn contains both sets of sessions. - let payer_burns = pending_burns::get_all_payer_burns(&pool).await?; - assert_eq!(payer_burns.len(), 1, "still have 1 burn to go"); - assert_eq!( - payer_burns[0].total_dcs, - bytes_to_dc(5_000) + bytes_to_dc(5_000) - ); - - Ok(()) -} - -#[sqlx::test] -fn will_not_burn_when_pending_txns(pool: PgPool) -> anyhow::Result<()> { - // Trigger a burn when there are data sessions that can be burned _and_ pending txns. - // Nothing should happen until the pending_txns are gone. - - let payer = PublicKeyBinary::from(vec![0]); - let pubkey_one = PublicKeyBinary::from(vec![1]); - let pubkey_two = PublicKeyBinary::from(vec![2]); - - // Add a transfer session for payer - save_data_transfer_sessions( - &pool, - &[(&payer, &pubkey_one, 1_000), (&payer, &pubkey_two, 1_000)], - ) - .await?; - - // Mark the session as pending - let signature = Signature::new_unique(); - pending_txns::do_add_pending_txn( - &pool, - &payer, - 1_000, - &signature, - Utc::now() - chrono::Duration::minutes(2), - ) - .await?; - - // Add more sessions that are ready for burning - save_data_transfer_sessions( - &pool, - &[(&payer, &pubkey_one, 5_000), (&payer, &pubkey_two, 5_000)], - ) - .await?; - - // Burn does nothing because of pending transactions - let mut solana_network = TestSolanaClientMap::default(); - solana_network.insert(payer.clone(), 10_000).await; - - let mut burner = TestBurner::new(solana_network); - burner.burn(&pool).await?; - - // No sessions written because of pending txn - burner.assert_no_sessions_written(); - - // Remove pending burn. - // Data Transfer Sessions should go through now. 
- pending_txns::remove_pending_txn_success(&pool, &signature).await?; - burner.burn(&pool).await?; - - let written_sessions = burner.get_written_sessions(); - assert_eq!(written_sessions.len(), 2, "2 data transfer session written"); - - Ok(()) -} - -fn mk_data_transfer_session( - payer_key: &PublicKeyBinary, - pubkey: &PublicKeyBinary, - rewardable_bytes: u64, -) -> DataTransferSessionReq { - DataTransferSessionReq { - data_transfer_usage: DataTransferEvent { - pub_key: pubkey.clone(), - upload_bytes: rewardable_bytes / 2, - download_bytes: rewardable_bytes / 2, - radio_access_technology: DataTransferRadioAccessTechnology::Wlan, - event_id: "event-id".to_string(), - payer: payer_key.clone(), - timestamp: Utc::now(), - signature: vec![], - }, - rewardable_bytes, - pub_key: pubkey.clone(), - signature: vec![], - } -} - -async fn save_data_transfer_sessions( - pool: &PgPool, - sessions: &[(&PublicKeyBinary, &PublicKeyBinary, u64)], -) -> anyhow::Result<()> { - let mut txn = pool.begin().await?; - for (payer, pubkey, amount) in sessions { - let session = mk_data_transfer_session(payer, pubkey, *amount); - pending_burns::save_data_transfer_session_req(&mut txn, &session, Utc::now()).await?; - } - txn.commit().await?; - - Ok(()) -} - -struct TestBurner { - burner: Burner, - rx: mpsc::Receiver>, -} - -impl TestBurner { - fn new(solana: S) -> Self { - let (valid_sessions_tx, rx) = tokio::sync::mpsc::channel(10); - let valid_sessions = FileSinkClient::new(valid_sessions_tx, "test"); - Self { - burner: Burner::new(valid_sessions, solana), - rx, - } - } - - async fn burn(&self, pool: &PgPool) -> anyhow::Result<()> { - self.burner.burn(pool).await - } - - async fn confirm_pending_txns(&self, pool: &PgPool) -> anyhow::Result<()> { - self.burner.confirm_pending_txns(pool).await - } - - fn get_written_sessions(&mut self) -> Vec> { - let mut written_sessions = vec![]; - while let Ok(session) = self.rx.try_recv() { - written_sessions.push(session); - } - written_sessions - } - - fn assert_no_sessions_written(&mut self) { - match self.rx.try_recv() { - Ok(_) => panic!("nothing should be written"), - Err(TryRecvError::Disconnected) => panic!("file sink client was incorrectly dropped"), - Err(TryRecvError::Empty) => (), - } - } -} diff --git a/solana/Cargo.toml b/solana/Cargo.toml index 9ef10c7e8..b63b87618 100644 --- a/solana/Cargo.toml +++ b/solana/Cargo.toml @@ -8,24 +8,22 @@ license.workspace = true [dependencies] anyhow = { workspace = true } -async-trait = { workspace = true } +async-trait = {workspace = true} anchor-client = { workspace = true } -clap = { workspace = true } -chrono = { workspace = true } -file-store = { path = "../file_store" } -futures = { workspace = true } -helium-anchor-gen = { workspace = true } -helium-crypto = { workspace = true } -helium-lib = { workspace = true } -itertools = { workspace = true } -metrics = { workspace = true } -serde = { workspace = true } -sha2 = { workspace = true } -solana-client = { workspace = true } -solana-program = { workspace = true } -solana-sdk = { workspace = true } -spl-token = { workspace = true } -thiserror = { workspace = true } -tokio = { workspace = true } -tracing = { workspace = true } -exponential-backoff = "2.0.0" +clap = {workspace = true} +chrono = {workspace = true} +file-store = {path = "../file_store"} +futures = {workspace = true} +helium-anchor-gen = {workspace = true} +helium-crypto = {workspace = true} +itertools = {workspace = true} +metrics = {workspace = true} +serde = {workspace = true} +sha2 = {workspace = true} 
+solana-client = {workspace = true} +solana-program = {workspace = true} +solana-sdk = {workspace = true} +spl-token = {workspace = true} +thiserror = {workspace = true} +tokio = {workspace = true} +tracing = {workspace = true} diff --git a/solana/src/burn.rs b/solana/src/burn.rs index bafbbbaa6..bcdfcd9fc 100644 --- a/solana/src/burn.rs +++ b/solana/src/burn.rs @@ -1,18 +1,35 @@ -use crate::{ - read_keypair_from_file, sender, GetSignature, Keypair, SolanaRpcError, SubDao, Transaction, -}; +use crate::{send_with_retry, GetSignature, SolanaRpcError}; +use anchor_client::RequestBuilder; use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use helium_anchor_gen::{ + anchor_lang::{AccountDeserialize, ToAccountMetas}, + data_credits::{self, accounts, instruction}, + helium_sub_daos::{self, DaoV0, SubDaoV0}, +}; use helium_crypto::PublicKeyBinary; -use helium_lib::{client, dc, token, TransactionOpts}; +use itertools::Itertools; use serde::Deserialize; -use solana_client::rpc_config::RpcSendTransactionConfig; -use solana_sdk::{commitment_config::CommitmentConfig, signature::Signature}; -use std::str::FromStr; -use std::sync::Arc; +use sha2::{Digest, Sha256}; +use solana_client::{ + client_error::ClientError, nonblocking::rpc_client::RpcClient, rpc_response::Response, +}; +use solana_sdk::{ + commitment_config::CommitmentConfig, + compute_budget::ComputeBudgetInstruction, + program_pack::Pack, + pubkey::Pubkey, + signature::{read_keypair_file, Keypair, Signature}, + signer::Signer, + transaction::Transaction, +}; +use std::{collections::HashMap, str::FromStr}; +use std::{sync::Arc, time::SystemTime}; +use tokio::sync::Mutex; #[async_trait] -pub trait SolanaNetwork: Send + Sync + 'static { - type Transaction: Send + Sync + 'static; +pub trait SolanaNetwork: Clone + Send + Sync + 'static { + type Transaction: GetSignature + Send + Sync + 'static; async fn payer_balance(&self, payer: &PublicKeyBinary) -> Result; @@ -25,7 +42,6 @@ pub trait SolanaNetwork: Send + Sync + 'static { async fn submit_transaction( &self, transaction: &Self::Transaction, - store: &impl sender::TxnStore, ) -> Result<(), SolanaRpcError>; async fn confirm_transaction(&self, txn: &Signature) -> Result; @@ -34,15 +50,17 @@ pub trait SolanaNetwork: Send + Sync + 'static { #[derive(Debug, Deserialize)] pub struct Settings { rpc_url: String, + cluster: String, burn_keypair: String, + dc_mint: String, + dnt_mint: String, #[serde(default)] payers_to_monitor: Vec, - #[serde(default = "default_min_priority_fee")] + #[serde(default = "min_priority_fee")] min_priority_fee: u64, - skip_preflight: bool, } -fn default_min_priority_fee() -> u64 { +fn min_priority_fee() -> u64 { 1 } @@ -56,74 +74,79 @@ impl Settings { } } +#[derive(Clone)] pub struct SolanaRpc { - sub_dao: SubDao, - provider: client::SolanaRpcClient, - keypair: Keypair, + provider: Arc, + program_cache: BurnProgramCache, + cluster: String, + keypair: [u8; 64], payers_to_monitor: Vec, - transaction_opts: TransactionOpts, - skip_preflight: bool, + priority_fee: PriorityFee, + min_priority_fee: u64, } impl SolanaRpc { - pub async fn new(settings: &Settings, sub_dao: SubDao) -> Result, SolanaRpcError> { - let keypair = read_keypair_from_file(&settings.burn_keypair)?; - let provider = client::SolanaRpcClient::new_with_commitment( - settings.rpc_url.clone(), - CommitmentConfig::finalized(), - ); - - tracing::info!( - min_priority_fee = settings.min_priority_fee, - skip_preflight = settings.skip_preflight, - "initialize solana" - ); - + pub async fn new(settings: &Settings) -> 
Result, SolanaRpcError> { + let dc_mint = settings.dc_mint.parse()?; + let dnt_mint = settings.dnt_mint.parse()?; + let Ok(keypair) = read_keypair_file(&settings.burn_keypair) else { + return Err(SolanaRpcError::FailedToReadKeypairError( + settings.burn_keypair.to_owned(), + )); + }; + let provider = + RpcClient::new_with_commitment(settings.rpc_url.clone(), CommitmentConfig::finalized()); + let program_cache = BurnProgramCache::new(&provider, dc_mint, dnt_mint).await?; + if program_cache.dc_burn_authority != keypair.pubkey() { + return Err(SolanaRpcError::InvalidKeypair); + } Ok(Arc::new(Self { - sub_dao, - provider, - keypair, + cluster: settings.cluster.clone(), + provider: Arc::new(provider), + program_cache, + keypair: keypair.to_bytes(), payers_to_monitor: settings.payers_to_monitor()?, - skip_preflight: settings.skip_preflight, - transaction_opts: TransactionOpts { - min_priority_fee: settings.min_priority_fee, - ..Default::default() - }, + priority_fee: PriorityFee::default(), + min_priority_fee: settings.min_priority_fee, })) } } -impl AsRef for SolanaRpc { - fn as_ref(&self) -> &client::SolanaRpcClient { - &self.provider - } -} - #[async_trait] impl SolanaNetwork for SolanaRpc { type Transaction = Transaction; async fn payer_balance(&self, payer: &PublicKeyBinary) -> Result { - let delegated_dc_key = self.sub_dao.delegated_dc_key(payer); - let escrow_account = self.sub_dao.escrow_key(&delegated_dc_key); - - let amount = match token::balance_for_address(&self, &escrow_account).await? { - Some(token_balance) => token_balance.amount.amount, - None => { - tracing::info!(%payer, "Account not found, no balance"); - 0 + let ddc_key = delegated_data_credits(&self.program_cache.sub_dao, payer); + let (escrow_account, _) = Pubkey::find_program_address( + &["escrow_dc_account".as_bytes(), &ddc_key.to_bytes()], + &data_credits::ID, + ); + let account_data = match self + .provider + .get_account_with_commitment(&escrow_account, CommitmentConfig::finalized()) + .await? + { + Response { value: None, .. } => { + tracing::info!(%payer, "Account not found, therefore no balance"); + return Ok(0); } + Response { + value: Some(account), + .. + } => account.data, }; + let account_layout = spl_token::state::Account::unpack(account_data.as_slice())?; if self.payers_to_monitor.contains(payer) { metrics::gauge!( "balance", "payer" => payer.to_string() ) - .set(amount as f64); + .set(account_layout.amount as f64); } - Ok(amount) + Ok(account_layout.amount) } async fn make_burn_transaction( @@ -131,41 +154,119 @@ impl SolanaNetwork for SolanaRpc { payer: &PublicKeyBinary, amount: u64, ) -> Result { - let tx = dc::burn_delegated( - self, - self.sub_dao, - &self.keypair, - amount, - payer, - &self.transaction_opts, - ) - .await?; - tracing::info!( - amount, - %payer, - sub_dao = %self.sub_dao, - min_priority_fee = self.transaction_opts.min_priority_fee, - "created burn txn" + // Fetch the sub dao epoch info: + const EPOCH_LENGTH: u64 = 60 * 60 * 24; + let epoch = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH)? 
+ .as_secs() + / EPOCH_LENGTH; + let (sub_dao_epoch_info, _) = Pubkey::find_program_address( + &[ + "sub_dao_epoch_info".as_bytes(), + self.program_cache.sub_dao.as_ref(), + &epoch.to_le_bytes(), + ], + &helium_sub_daos::ID, + ); + + // Fetch escrow account + let ddc_key = delegated_data_credits(&self.program_cache.sub_dao, payer); + let (escrow_account, _) = Pubkey::find_program_address( + &["escrow_dc_account".as_bytes(), &ddc_key.to_bytes()], + &data_credits::ID, ); - Ok(tx.into()) + let accounts = accounts::BurnDelegatedDataCreditsV0 { + sub_dao_epoch_info, + dao: self.program_cache.dao, + sub_dao: self.program_cache.sub_dao, + account_payer: self.program_cache.account_payer, + data_credits: self.program_cache.data_credits, + delegated_data_credits: delegated_data_credits(&self.program_cache.sub_dao, payer), + token_program: spl_token::id(), + helium_sub_daos_program: helium_sub_daos::id(), + system_program: solana_program::system_program::id(), + dc_burn_authority: self.program_cache.dc_burn_authority, + dc_mint: self.program_cache.dc_mint, + escrow_account, + registrar: self.program_cache.registrar, + }; + + let priority_fee_accounts: Vec<_> = accounts + .to_account_metas(None) + .into_iter() + .map(|x| x.pubkey) + .unique() + .take(MAX_RECENT_PRIORITY_FEE_ACCOUNTS) + .collect(); + + // Get a new priority fee. Can't be done in Sync land + let priority_fee = self + .priority_fee + .get_estimate( + &self.provider, + &priority_fee_accounts, + self.min_priority_fee, + ) + .await?; + + tracing::info!(%priority_fee); + + // This is Sync land: anything async in here will error. + let instructions = { + let request = RequestBuilder::from( + data_credits::id(), + &self.cluster, + std::rc::Rc::new(Keypair::from_bytes(&self.keypair).unwrap()), + Some(CommitmentConfig::finalized()), + ); + + let args = instruction::BurnDelegatedDataCreditsV0 { + _args: data_credits::BurnDelegatedDataCreditsArgsV0 { amount }, + }; + + // As far as I can tell, the instructions function does not actually have any + // error paths. 
+ request + // Set priority fees: + .instruction(ComputeBudgetInstruction::set_compute_unit_limit(300_000)) + .instruction(ComputeBudgetInstruction::set_compute_unit_price( + priority_fee, + )) + // Create burn transaction + .accounts(accounts) + .args(args) + .instructions() + .unwrap() + }; + + let blockhash = self.provider.get_latest_blockhash().await?; + let signer = Keypair::from_bytes(&self.keypair).unwrap(); + + Ok(Transaction::new_signed_with_payer( + &instructions, + Some(&signer.pubkey()), + &[&signer], + blockhash, + )) } - async fn submit_transaction( - &self, - tx: &Self::Transaction, - store: &impl sender::TxnStore, - ) -> Result<(), SolanaRpcError> { - let config = RpcSendTransactionConfig { - skip_preflight: self.skip_preflight, + async fn submit_transaction(&self, tx: &Self::Transaction) -> Result<(), SolanaRpcError> { + let config = solana_client::rpc_config::RpcSendTransactionConfig { + skip_preflight: true, ..Default::default() }; - match sender::send_and_finalize(&self, tx, config, store).await { - Ok(_tracked) => { - let signature = tx.get_signature(); + match send_with_retry!(self + .provider + .send_and_confirm_transaction_with_spinner_and_config( + tx, + CommitmentConfig::finalized(), + config, + )) { + Ok(signature) => { tracing::info!( transaction = %signature, - "Data credit burn successful" + "Data credit burn successful", ); Ok(()) } @@ -194,6 +295,122 @@ impl SolanaNetwork for SolanaRpc { } } +#[derive(Default, Clone)] +pub struct PriorityFee { + last_estimate: Arc>, +} + +pub const MAX_RECENT_PRIORITY_FEE_ACCOUNTS: usize = 128; + +impl PriorityFee { + pub async fn get_estimate( + &self, + provider: &RpcClient, + accounts: &[Pubkey], + min_priority_fee: u64, + ) -> Result { + let mut last_estimate = self.last_estimate.lock().await; + match last_estimate.time_taken { + Some(time_taken) if (Utc::now() - time_taken) < chrono::Duration::minutes(15) => { + return Ok(last_estimate.fee_estimate) + } + _ => (), + } + // Find a new estimate + let time_taken = Utc::now(); + let recent_fees = provider.get_recent_prioritization_fees(accounts).await?; + let mut max_per_slot = Vec::new(); + for (slot, fees) in &recent_fees.into_iter().group_by(|x| x.slot) { + let Some(maximum) = fees.map(|x| x.prioritization_fee).max() else { + continue; + }; + max_per_slot.push((slot, maximum)); + } + // Only take the most recent 20 maximum fees: + max_per_slot.sort_by(|a, b| a.0.cmp(&b.0).reverse()); + let mut max_per_slot: Vec<_> = max_per_slot.into_iter().take(20).map(|x| x.1).collect(); + max_per_slot.sort(); + // Get the median: + let num_recent_fees = max_per_slot.len(); + let mid = num_recent_fees / 2; + let estimate = if num_recent_fees == 0 { + min_priority_fee + } else if num_recent_fees % 2 == 0 { + // If the number of samples is even, taken the mean of the two median fees + (max_per_slot[mid - 1] + max_per_slot[mid]) / 2 + } else { + max_per_slot[mid] + } + .max(min_priority_fee); + *last_estimate = LastEstimate::new(time_taken, estimate); + Ok(estimate) + } +} + +#[derive(Copy, Clone, Default)] +pub struct LastEstimate { + time_taken: Option>, + fee_estimate: u64, +} + +impl LastEstimate { + fn new(time_taken: DateTime, fee_estimate: u64) -> Self { + Self { + time_taken: Some(time_taken), + fee_estimate, + } + } +} + +/// Cached pubkeys for the burn program +#[derive(Clone)] +pub struct BurnProgramCache { + pub account_payer: Pubkey, + pub data_credits: Pubkey, + pub sub_dao: Pubkey, + pub dao: Pubkey, + pub dc_mint: Pubkey, + pub dc_burn_authority: Pubkey, + pub registrar: 
Pubkey, +} + +impl BurnProgramCache { + pub async fn new( + provider: &RpcClient, + dc_mint: Pubkey, + dnt_mint: Pubkey, + ) -> Result { + let (account_payer, _) = + Pubkey::find_program_address(&["account_payer".as_bytes()], &data_credits::ID); + let (data_credits, _) = + Pubkey::find_program_address(&["dc".as_bytes(), dc_mint.as_ref()], &data_credits::ID); + let (sub_dao, _) = Pubkey::find_program_address( + &["sub_dao".as_bytes(), dnt_mint.as_ref()], + &helium_sub_daos::ID, + ); + let (dao, dc_burn_authority) = { + let account_data = provider.get_account_data(&sub_dao).await?; + let mut account_data = account_data.as_ref(); + let sub_dao = SubDaoV0::try_deserialize(&mut account_data)?; + (sub_dao.dao, sub_dao.dc_burn_authority) + }; + let registrar = { + let account_data = provider.get_account_data(&dao).await?; + let mut account_data = account_data.as_ref(); + DaoV0::try_deserialize(&mut account_data)?.registrar + }; + Ok(Self { + account_payer, + data_credits, + sub_dao, + dao, + dc_mint, + dc_burn_authority, + registrar, + }) + } +} + const FIXED_BALANCE: u64 = 1_000_000_000; pub enum PossibleTransaction { @@ -239,11 +456,10 @@ impl SolanaNetwork for Option> { async fn submit_transaction( &self, transaction: &Self::Transaction, - store: &impl sender::TxnStore, ) -> Result<(), SolanaRpcError> { match (self, transaction) { (Some(ref rpc), PossibleTransaction::Transaction(ref txn)) => { - rpc.submit_transaction(txn, store).await? + rpc.submit_transaction(txn).await? } (None, PossibleTransaction::NoTransaction(_)) => (), _ => unreachable!(), @@ -260,162 +476,60 @@ impl SolanaNetwork for Option> { } } -pub mod test_client { - use std::{ - collections::{HashMap, HashSet}, - sync::Arc, - time::Instant, - }; - - use helium_crypto::PublicKeyBinary; - use helium_lib::keypair::Signature; - use solana_client::rpc_config::RpcSendTransactionConfig; - use tokio::sync::Mutex; - - use crate::{sender, SolanaRpcError, SolanaTransaction, Transaction}; - - use super::SolanaNetwork; - - #[derive(Debug, Clone)] - pub struct TestSolanaClientMap { - payer_balances: Arc>>, - txn_sig_to_payer: Arc>>, - confirm_all_txns: bool, - fail_on_submit_txn: bool, - confirmed_txns: Arc>>, - // Using the nanoseconds since the client was made as block height - block_height: Instant, - } +pub struct MockTransaction { + pub signature: Signature, + pub payer: PublicKeyBinary, + pub amount: u64, +} - impl Default for TestSolanaClientMap { - fn default() -> Self { - Self { - payer_balances: Default::default(), - txn_sig_to_payer: Default::default(), - block_height: Instant::now(), - confirm_all_txns: true, - fail_on_submit_txn: false, - confirmed_txns: Default::default(), - } - } +impl GetSignature for MockTransaction { + fn get_signature(&self) -> &Signature { + &self.signature } +} - impl TestSolanaClientMap { - pub fn fail_on_send() -> Self { - Self { - fail_on_submit_txn: true, - ..Default::default() - } - } - pub fn new(ledger: Arc>>) -> Self { - Self { - payer_balances: ledger, - txn_sig_to_payer: Default::default(), - block_height: Instant::now(), - confirm_all_txns: true, - fail_on_submit_txn: false, - confirmed_txns: Default::default(), - } - } - pub async fn insert(&mut self, payer: PublicKeyBinary, amount: u64) { - self.payer_balances.lock().await.insert(payer, amount); - } - - pub async fn add_confirmed(&mut self, signature: Signature) { - self.confirm_all_txns = false; - self.confirmed_txns.lock().await.insert(signature); - } - - pub async fn get_payer_balance(&self, payer: &PublicKeyBinary) -> u64 { - 
self.payer_balances - .lock() - .await - .get(payer) - .cloned() - .unwrap_or_default() - } +#[async_trait] +impl SolanaNetwork for Arc>> { + type Transaction = MockTransaction; - pub async fn set_payer_balance(&self, payer: &PublicKeyBinary, balance: u64) { - *self.payer_balances.lock().await.get_mut(payer).unwrap() = balance; - } + async fn payer_balance(&self, payer: &PublicKeyBinary) -> Result { + Ok(*self.lock().await.get(payer).unwrap()) } - #[async_trait::async_trait] - impl SolanaNetwork for TestSolanaClientMap { - type Transaction = Transaction; - - async fn payer_balance(&self, payer: &PublicKeyBinary) -> Result { - Ok(*self.payer_balances.lock().await.get(payer).unwrap()) - } - - async fn make_burn_transaction( - &self, - payer: &PublicKeyBinary, - amount: u64, - ) -> Result { - let mut inner = SolanaTransaction::default(); - - let sig = Signature::new_unique(); - // add signature -> (payer, amount) so we can subtract - self.txn_sig_to_payer - .lock() - .await - .insert(sig, (payer.clone(), amount)); - inner.signatures.push(sig); - - Ok(Transaction { - inner, - sent_block_height: 1, - }) - } - - async fn submit_transaction( - &self, - txn: &Transaction, - store: &impl sender::TxnStore, - ) -> Result<(), SolanaRpcError> { - if self.fail_on_submit_txn { - panic!("attempting to send transaction"); - } - // Test client must attempt to send for changes to take place - sender::send_and_finalize(self, txn, RpcSendTransactionConfig::default(), store) - .await?; - - let signature = txn.get_signature(); - if let Some((payer, amount)) = self.txn_sig_to_payer.lock().await.get(signature) { - *self.payer_balances.lock().await.get_mut(payer).unwrap() -= amount; - } - - Ok(()) - } + async fn make_burn_transaction( + &self, + payer: &PublicKeyBinary, + amount: u64, + ) -> Result { + Ok(MockTransaction { + signature: Signature::new_unique(), + payer: payer.clone(), + amount, + }) + } - async fn confirm_transaction(&self, signature: &Signature) -> Result { - if self.confirm_all_txns { - return Ok(true); - } - Ok(self.confirmed_txns.lock().await.contains(signature)) - } + async fn submit_transaction(&self, txn: &MockTransaction) -> Result<(), SolanaRpcError> { + *self.lock().await.get_mut(&txn.payer).unwrap() -= txn.amount; + Ok(()) } - #[async_trait::async_trait] - impl sender::SenderClientExt for TestSolanaClientMap { - async fn send_txn( - &self, - txn: &Transaction, - _config: RpcSendTransactionConfig, - ) -> Result { - Ok(*txn.get_signature()) - } - async fn finalize_signature( - &self, - _signature: &Signature, - ) -> Result<(), sender::SolanaClientError> { - Ok(()) - } - async fn get_block_height(&self) -> Result { - // Using the nanoseconds since the client was made as block height - let block_height = self.block_height.elapsed().as_nanos(); - Ok(block_height as u64) - } + async fn confirm_transaction(&self, _txn: &Signature) -> Result { + Ok(true) } } + +/// Returns the PDA for the Delegated Data Credits of the given `payer`. 
+
+/// Returns the PDA for the Delegated Data Credits of the given `payer`.
+pub fn delegated_data_credits(sub_dao: &Pubkey, payer: &PublicKeyBinary) -> Pubkey {
+    let mut hasher = Sha256::new();
+    hasher.update(payer.to_string());
+    let sha_digest = hasher.finalize();
+    let (ddc_key, _) = Pubkey::find_program_address(
+        &[
+            "delegated_data_credits".as_bytes(),
+            sub_dao.as_ref(),
+            &sha_digest,
+        ],
+        &data_credits::ID,
+    );
+    ddc_key
+}
diff --git a/solana/src/lib.rs b/solana/src/lib.rs
index c57fa43f1..526dabf3c 100644
--- a/solana/src/lib.rs
+++ b/solana/src/lib.rs
@@ -1,61 +1,33 @@
-use solana_client::{client_error::ClientError, rpc_client::SerializableTransaction};
+use solana_client::client_error::ClientError;
 use solana_sdk::pubkey::ParsePubkeyError;
-use solana_sdk::signature::read_keypair_file;
-use std::{path::Path, time::SystemTimeError};
-
-pub use helium_lib::{
-    dao::SubDao,
-    error,
-    keypair::{Keypair, Pubkey, Signature},
-};
-pub use solana_sdk::transaction::VersionedTransaction as SolanaTransaction;
-
-#[derive(serde::Serialize)]
-pub struct Transaction {
-    pub inner: SolanaTransaction,
-    pub sent_block_height: u64,
-}
-
-impl From<(SolanaTransaction, u64)> for Transaction {
-    fn from(value: (SolanaTransaction, u64)) -> Self {
-        Self {
-            inner: value.0,
-            sent_block_height: value.1,
-        }
-    }
-}
-
-impl SerializableTransaction for Transaction {
-    fn get_signature(&self) -> &Signature {
-        self.inner.get_signature()
-    }
-
-    fn get_recent_blockhash(&self) -> &solana_sdk::hash::Hash {
-        self.inner.get_recent_blockhash()
-    }
-
-    fn uses_durable_nonce(&self) -> bool {
-        self.inner.uses_durable_nonce()
-    }
-}
-
-impl Transaction {
-    pub fn get_signature(&self) -> &Signature {
-        self.inner.get_signature()
-    }
-}
+use solana_sdk::signature::Signature;
+use solana_sdk::transaction::Transaction;
+use std::time::SystemTimeError;
 
 pub mod burn;
 pub mod carrier;
-pub mod sender;
 pub mod start_boost;
 
-pub fn read_keypair_from_file<F: AsRef<Path>>(path: F) -> Result<Keypair, SolanaRpcError> {
-    let path = path.as_ref();
-    let keypair = read_keypair_file(path)
-        .map_err(|_err| SolanaRpcError::FailedToReadKeypairError(path.display().to_string()))?;
-    Ok(keypair.into())
+macro_rules! 
send_with_retry { + ($rpc:expr) => {{ + let mut attempt = 1; + loop { + match $rpc.await { + Ok(resp) => break Ok(resp), + Err(err) => { + if attempt < 5 { + attempt += 1; + tokio::time::sleep(std::time::Duration::from_secs(attempt)).await; + continue; + } else { + break Err(err); + } + } + } + } + }}; } +pub(crate) use send_with_retry; #[derive(thiserror::Error, Debug)] pub enum SolanaRpcError { @@ -77,12 +49,9 @@ pub enum SolanaRpcError { FailedToReadKeypairError(String), #[error("crypto error: {0}")] Crypto(#[from] helium_crypto::Error), - #[error("helium-lib error: {0}")] - HeliumLib(#[from] helium_lib::error::Error), - #[error("Parse Solana Pubkey from slice error: {0}")] - ParsePubkeyFromSliceError(#[from] std::array::TryFromSliceError), - #[error("Sender Error: {0}")] - Sender(#[from] sender::SenderError), + // TODO: Remove when fully integrated with helium-lib + #[error("Test Error")] + Test(String), } impl From for SolanaRpcError { @@ -101,14 +70,14 @@ pub trait GetSignature { fn get_signature(&self) -> &Signature; } -impl GetSignature for Signature { +impl GetSignature for Transaction { fn get_signature(&self) -> &Signature { - self + &self.signatures[0] } } -impl GetSignature for Transaction { +impl GetSignature for Signature { fn get_signature(&self) -> &Signature { - self.get_signature() + self } } diff --git a/solana/src/sender.rs b/solana/src/sender.rs deleted file mode 100644 index 0ade67d66..000000000 --- a/solana/src/sender.rs +++ /dev/null @@ -1,419 +0,0 @@ -use std::time::Duration; - -use exponential_backoff::Backoff; -use helium_lib::{client, keypair::Signature}; -use solana_client::rpc_config::RpcSendTransactionConfig; -use solana_sdk::commitment_config::CommitmentConfig; - -use crate::Transaction; - -pub type SolanaClientError = solana_client::client_error::ClientError; -pub type SenderResult = Result; - -#[derive(Debug, thiserror::Error)] -pub enum SenderError { - #[error("Txn Preparation error: {0}")] - Preparation(String), - #[error("Solana Client error: {0}")] - SolanaClient(Box), - #[error("Failed to send txn {attempt} times")] - Sending { attempt: usize }, - #[error("Failed to finalize txn")] - Finalize, -} - -impl From for SenderError { - fn from(err: SolanaClientError) -> Self { - Self::SolanaClient(Box::new(err)) - } -} - -impl SenderError { - pub fn preparation(msg: &str) -> Self { - Self::Preparation(msg.to_string()) - } -} - -pub async fn send_and_finalize( - client: &impl SenderClientExt, - txn: &Transaction, - config: RpcSendTransactionConfig, - store: &impl TxnStore, -) -> SenderResult<()> { - let sent_block_height = client.get_block_height().await?; - - store.on_prepared(txn).await?; - send_with_retry(client, txn, config, store).await?; - store.on_sent(txn).await; - - finalize_signature(client, txn, store, sent_block_height).await?; - store.on_finalized(txn).await; - - Ok(()) -} - -async fn send_with_retry( - client: &impl SenderClientExt, - txn: &Transaction, - config: RpcSendTransactionConfig, - store: &impl TxnStore, -) -> SenderResult<()> { - let backoff = store.make_backoff().into_iter(); - - for (attempt, duration) in backoff.enumerate() { - let attempt = attempt + 1; // 1-index loop - match client.send_txn(txn, config).await { - Ok(_sig) => return Ok(()), - Err(err) => match duration { - Some(duration) => { - store.on_sent_retry(txn, attempt).await; - tokio::time::sleep(duration).await; - } - None => { - store.on_error(txn, SenderError::Sending { attempt }).await; - return Err(err.into()); - } - }, - } - } - - unreachable!("Exceeded max 
attempts without returning") -} - -async fn finalize_signature( - client: &impl SenderClientExt, - txn: &Transaction, - store: &impl TxnStore, - sent_block_height: u64, -) -> SenderResult<()> { - const FINALIZATION_BLOCK_COUNT: u64 = 152; - - // Sleep until we're past the block where our transaction should be finalized. - loop { - let curr_block_height = client.get_block_height().await?; - if curr_block_height > sent_block_height + FINALIZATION_BLOCK_COUNT { - break; - } - tokio::time::sleep(Duration::from_secs(1)).await; - } - - let signature = txn.get_signature(); - if let Err(err) = client.finalize_signature(signature).await { - store.on_error(txn, SenderError::Finalize).await; - return Err(err.into()); - }; - - Ok(()) -} - -#[async_trait::async_trait] -pub trait SenderClientExt: Send + Sync { - async fn send_txn( - &self, - txn: &Transaction, - config: RpcSendTransactionConfig, - ) -> Result; - async fn finalize_signature(&self, signature: &Signature) -> Result<(), SolanaClientError>; - async fn get_block_height(&self) -> Result; -} - -#[async_trait::async_trait] -pub trait TxnStore: Send + Sync { - fn make_backoff(&self) -> Backoff { - Backoff::new(5, Duration::from_secs(1), Duration::from_secs(5)) - } - // Last chance to _not_ send a transaction. - async fn on_prepared(&self, _txn: &Transaction) -> SenderResult<()> { - Ok(()) - } - // The txn has been succesfully sent to Solana. - async fn on_sent(&self, _txn: &Transaction) { - tracing::info!("txn sent"); - } - // Sending the txn failed, and we're going to try again. - async fn on_sent_retry(&self, _txn: &Transaction, attempt: usize) { - tracing::info!(attempt, "txn retrying"); - } - // Txn's status has been successfully seen as Finalized. - async fn on_finalized(&self, _txn: &Transaction) {} - // Something went wrong during sending or finalizing. - // _err will be `SenderError::Sending` or `SenderError::Finalize`. - // The actual cause of the error will be returned to the sender. - async fn on_error(&self, _txn: &Transaction, _err: SenderError) {} -} - -#[async_trait::async_trait] -impl + Send + Sync> SenderClientExt for T { - async fn send_txn( - &self, - txn: &Transaction, - config: RpcSendTransactionConfig, - ) -> Result { - Ok(self - .as_ref() - .send_transaction_with_config(txn, config) - .await?) - } - - async fn finalize_signature(&self, signature: &Signature) -> Result<(), SolanaClientError> { - // Block height errors are handled in `finalize_signature`. - Ok(self - .as_ref() - .poll_for_signature_with_commitment(signature, CommitmentConfig::finalized()) - .await?) - } - - async fn get_block_height(&self) -> Result { - Ok(self.as_ref().get_block_height().await?) 
- } -} - -#[cfg(test)] -mod tests { - use std::{ - sync::{Arc, Mutex}, - time::Instant, - }; - - use solana_sdk::signer::SignerError; - - use super::*; - - #[derive(Default)] - struct MockTxnStore { - pub fail_prepared: bool, - pub calls: Arc>>, - } - - impl MockTxnStore { - fn fail_prepared() -> Self { - Self { - fail_prepared: true, - ..Default::default() - } - } - fn record_call(&self, method: String) { - self.calls.lock().unwrap().push(method); - } - } - - #[async_trait::async_trait] - impl TxnStore for MockTxnStore { - fn make_backoff(&self) -> Backoff { - Backoff::new(5, Duration::from_millis(10), Duration::from_millis(50)) - } - - async fn on_prepared(&self, txn: &Transaction) -> SenderResult<()> { - if self.fail_prepared { - return Err(SenderError::preparation("mock failure")); - } - let signature = txn.get_signature(); - self.record_call(format!("on_prepared: {signature}")); - Ok(()) - } - async fn on_sent(&self, txn: &Transaction) { - let signature = txn.get_signature(); - self.record_call(format!("on_sent: {signature}")); - } - async fn on_sent_retry(&self, txn: &Transaction, attempt: usize) { - let signature = txn.get_signature(); - self.record_call(format!("on_sent_retry: {attempt} {signature}")); - } - async fn on_finalized(&self, txn: &Transaction) { - let signature = txn.get_signature(); - self.record_call(format!("on_finalized: {signature}")) - } - async fn on_error(&self, txn: &Transaction, err: SenderError) { - let signature = txn.get_signature(); - self.record_call(format!("on_error: {signature} {err}")); - } - } - - struct MockClient { - pub sent_attempts: Mutex, - pub succeed_after_sent_attempts: usize, - pub finalize_success: bool, - pub block_height: Instant, - } - - impl MockClient { - fn succeed() -> Self { - Self { - sent_attempts: Mutex::new(0), - succeed_after_sent_attempts: 0, - finalize_success: true, - block_height: Instant::now(), - } - } - - fn succeed_after(succeed_after_sent_attempts: usize) -> Self { - Self { - sent_attempts: Mutex::new(0), - succeed_after_sent_attempts, - finalize_success: true, - block_height: Instant::now(), - } - } - } - - #[async_trait::async_trait] - impl SenderClientExt for MockClient { - async fn send_txn( - &self, - txn: &Transaction, - _config: RpcSendTransactionConfig, - ) -> Result { - let mut attempts = self.sent_attempts.lock().unwrap(); - *attempts += 1; - - if *attempts >= self.succeed_after_sent_attempts { - return Ok(*txn.get_signature()); - } - - // Fake Error - Err(SignerError::KeypairPubkeyMismatch.into()) - } - - async fn finalize_signature( - &self, - _signature: &Signature, - ) -> Result<(), SolanaClientError> { - if self.finalize_success { - return Ok(()); - } - // Fake Error - Err(SignerError::KeypairPubkeyMismatch.into()) - } - - async fn get_block_height(&self) -> Result { - // Using nanoseconds since test start as block_height - let block_height = self.block_height.elapsed().as_nanos(); - Ok(block_height as u64) - } - } - - fn mk_test_transaction() -> Transaction { - let mut inner = solana_sdk::transaction::Transaction::default(); - inner.signatures.push(Signature::new_unique()); - Transaction { - inner: inner.into(), - sent_block_height: 1, - } - } - - #[tokio::test] - async fn send_finalized_success() -> anyhow::Result<()> { - let tx = mk_test_transaction(); - let store = MockTxnStore::default(); - let client = MockClient::succeed(); - - send_and_finalize(&client, &tx, RpcSendTransactionConfig::default(), &store).await?; - - let signature = tx.get_signature(); - let calls = store.calls.lock().unwrap(); - 
assert_eq!( - *calls, - vec![ - format!("on_prepared: {signature}"), - format!("on_sent: {signature}"), - format!("on_finalized: {signature}") - ] - ); - - Ok(()) - } - - #[tokio::test] - async fn send_finalized_success_after_retry() -> anyhow::Result<()> { - let txn = mk_test_transaction(); - let store = MockTxnStore::default(); - let client = MockClient::succeed_after(5); - - send_and_finalize(&client, &txn, RpcSendTransactionConfig::default(), &store).await?; - - let signature = txn.get_signature(); - let calls = store.calls.lock().unwrap(); - assert_eq!( - *calls, - vec![ - format!("on_prepared: {signature}"), - format!("on_sent_retry: 1 {signature}"), - format!("on_sent_retry: 2 {signature}"), - format!("on_sent_retry: 3 {signature}"), - format!("on_sent_retry: 4 {signature}"), - format!("on_sent: {signature}"), - format!("on_finalized: {signature}") - ] - ); - - Ok(()) - } - - #[tokio::test] - async fn send_error_with_retry() -> anyhow::Result<()> { - let txn = mk_test_transaction(); - let store = MockTxnStore::default(); - let client = MockClient::succeed_after(999); - - let res = - send_and_finalize(&client, &txn, RpcSendTransactionConfig::default(), &store).await; - assert!(res.is_err()); - - let signature = txn.get_signature(); - let calls = store.calls.lock().unwrap(); - assert_eq!( - *calls, - vec![ - format!("on_prepared: {signature}"), - format!("on_sent_retry: 1 {signature}"), - format!("on_sent_retry: 2 {signature}"), - format!("on_sent_retry: 3 {signature}"), - format!("on_sent_retry: 4 {signature}"), - format!("on_error: {signature} Failed to send txn 5 times") - ] - ); - - Ok(()) - } - - #[tokio::test] - async fn send_success_finalize_error() -> anyhow::Result<()> { - let txn = mk_test_transaction(); - let store = MockTxnStore::default(); - let mut client = MockClient::succeed(); - client.finalize_success = false; - - let res = - send_and_finalize(&client, &txn, RpcSendTransactionConfig::default(), &store).await; - assert!(res.is_err()); - - let signature = txn.get_signature(); - let calls = store.calls.lock().unwrap(); - assert_eq!( - *calls, - vec![ - format!("on_prepared: {signature}"), - format!("on_sent: {signature}"), - format!("on_error: {signature} Failed to finalize txn") - ] - ); - - Ok(()) - } - - #[tokio::test] - async fn failed_preparation() -> anyhow::Result<()> { - let txn = mk_test_transaction(); - let store = MockTxnStore::fail_prepared(); - let client = MockClient::succeed(); - - let res = - send_and_finalize(&client, &txn, RpcSendTransactionConfig::default(), &store).await; - assert!(res.is_err()); - - let calls = store.calls.lock().unwrap(); - assert_eq!(*calls, Vec::::new()); - - Ok(()) - } -} diff --git a/solana/src/start_boost.rs b/solana/src/start_boost.rs index 1f90a27e4..1245ac2d6 100644 --- a/solana/src/start_boost.rs +++ b/solana/src/start_boost.rs @@ -1,37 +1,23 @@ -use crate::{read_keypair_from_file, GetSignature, SolanaRpcError, Transaction}; +use crate::{send_with_retry, GetSignature, SolanaRpcError}; +use anchor_client::RequestBuilder; use async_trait::async_trait; -use chrono::{DateTime, Utc}; use file_store::hex_boost::BoostedHexActivation; -use helium_lib::{ - boosting, client, - keypair::{Keypair, Signature}, - TransactionOpts, +use helium_anchor_gen::{ + anchor_lang::{InstructionData, ToAccountMetas}, + hexboosting::{self, accounts, instruction}, }; use serde::Deserialize; use solana_client::nonblocking::rpc_client::RpcClient; -use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signer::Signer}; +use 
+use solana_program::instruction::Instruction;
+use solana_sdk::{
+    commitment_config::CommitmentConfig,
+    pubkey::Pubkey,
+    signature::{read_keypair_file, Keypair, Signature},
+    signer::Signer,
+    transaction::Transaction,
+};
 use std::sync::Arc;
 
-macro_rules! send_with_retry {
-    ($rpc:expr) => {{
-        let mut attempt = 1;
-        loop {
-            match $rpc.await {
-                Ok(resp) => break Ok(resp),
-                Err(err) => {
-                    if attempt < 5 {
-                        attempt += 1;
-                        tokio::time::sleep(std::time::Duration::from_secs(attempt)).await;
-                        continue;
-                    } else {
-                        break Err(err);
-                    }
-                }
-            }
-        }
-    }};
-}
-
 #[async_trait]
 pub trait SolanaNetwork: Send + Sync + 'static {
     type Transaction: GetSignature + Send + Sync + 'static;
@@ -52,58 +38,36 @@ pub trait SolanaNetwork: Send + Sync + 'static {
 #[derive(Debug, Deserialize)]
 pub struct Settings {
     rpc_url: String,
+    cluster: String,
     start_authority_keypair: String,
 }
+
 pub struct SolanaRpc {
     provider: RpcClient,
-    keypair: Keypair,
+    cluster: String,
+    keypair: [u8; 64],
     start_authority: Pubkey,
-    transaction_opts: TransactionOpts,
 }
 
 impl SolanaRpc {
     pub async fn new(settings: &Settings) -> Result<Arc<Self>, SolanaRpcError> {
-        let keypair = read_keypair_from_file(&settings.start_authority_keypair)?;
-        let provider = client::SolanaRpcClient::new_with_commitment(
-            settings.rpc_url.clone(),
-            CommitmentConfig::finalized(),
-        );
-
+        let Ok(keypair) = read_keypair_file(&settings.start_authority_keypair) else {
+            return Err(SolanaRpcError::FailedToReadKeypairError(
+                settings.start_authority_keypair.to_owned(),
+            ));
+        };
+        let provider =
+            RpcClient::new_with_commitment(settings.rpc_url.clone(), CommitmentConfig::finalized());
+        let start_authority = keypair.pubkey();
         Ok(Arc::new(Self {
+            cluster: settings.cluster.clone(),
             provider,
-            start_authority: keypair.pubkey(),
-            keypair,
-            transaction_opts: TransactionOpts::default(),
+            keypair: keypair.to_bytes(),
+            start_authority,
         }))
     }
 }
 
-impl AsRef<client::SolanaRpcClient> for SolanaRpc {
-    fn as_ref(&self) -> &client::SolanaRpcClient {
-        &self.provider
-    }
-}
-
-struct BoostedHex<'a>(&'a Pubkey, &'a BoostedHexActivation);
-
-impl helium_lib::boosting::StartBoostingHex for BoostedHex<'_> {
-    fn start_authority(&self) -> Pubkey {
-        *self.0
-    }
-
-    fn boost_config(&self) -> Pubkey {
-        self.1.boost_config_pubkey.parse().unwrap()
-    }
-
-    fn boosted_hex(&self) -> Pubkey {
-        self.1.boosted_hex_pubkey.parse().unwrap()
-    }
-
-    fn activation_ts(&self) -> DateTime<Utc> {
-        self.1.activation_ts
-    }
-}
-
 #[async_trait]
 impl SolanaNetwork for SolanaRpc {
     type Transaction = Transaction;
@@ -112,15 +76,43 @@ impl SolanaNetwork for SolanaRpc {
         &self,
         batch: &[BoostedHexActivation],
     ) -> Result<Self::Transaction, SolanaRpcError> {
-        let tx = boosting::start_boost(
-            &self,
-            batch.iter().map(|b| BoostedHex(&self.start_authority, b)),
-            &self.keypair,
-            &self.transaction_opts,
-        )
-        .await?;
-
-        Ok(tx.into())
+        let instructions = {
+            let mut request = RequestBuilder::from(
+                hexboosting::id(),
+                &self.cluster,
+                std::rc::Rc::new(Keypair::from_bytes(&self.keypair).unwrap()),
+                Some(CommitmentConfig::finalized()),
+            );
+            for update in batch {
+                let account = accounts::StartBoostV0 {
+                    start_authority: self.start_authority,
+                    boost_config: update.boost_config_pubkey.parse()?,
+                    boosted_hex: update.boosted_hex_pubkey.parse()?,
+                };
+                let args = instruction::StartBoostV0 {
+                    _args: hexboosting::StartBoostArgsV0 {
+                        start_ts: update.activation_ts.timestamp(),
+                    },
+                };
+                let instruction = Instruction {
+                    program_id: hexboosting::id(),
+                    accounts: account.to_account_metas(None),
+                    data: args.data(),
+                };
+                request = request.instruction(instruction);
+            }
+            request.instructions().unwrap()
+        };
+        tracing::debug!("instructions: {:?}", instructions);
+        let blockhash = self.provider.get_latest_blockhash().await?;
+        let signer = Keypair::from_bytes(&self.keypair).unwrap();
+
+        Ok(Transaction::new_signed_with_payer(
+            &instructions,
+            Some(&signer.pubkey()),
+            &[&signer],
+            blockhash,
+        ))
     }
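A hedged sketch, not part of this diff, of a caller that relies only on the trait surface shown here: one signed transaction is built for the whole batch and then handed to submit_transaction. The helper name `activate_boosts` is hypothetical, and the transaction-building method name is assumed from the signature shown in the hunk above.

    // Hypothetical driver over the start_boost SolanaNetwork trait.
    async fn activate_boosts<S: SolanaNetwork>(
        solana: &S,
        batch: &[BoostedHexActivation],
    ) -> Result<(), SolanaRpcError> {
        if batch.is_empty() {
            return Ok(());
        }
        // Method name assumed; it is the batch -> Self::Transaction builder above.
        let txn = solana.make_start_boost_transaction(batch).await?;
        solana.submit_transaction(&txn).await
    }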
 
     async fn submit_transaction(&self, tx: &Self::Transaction) -> Result<(), SolanaRpcError> {