diff --git a/Cargo.lock b/Cargo.lock index f2b037c604..b16781dced 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -200,7 +200,7 @@ checksum = "7378575ff571966e99a744addeff0bff98b8ada0dedf1956d59e634db95eaac1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", "synstructure 0.13.1", ] @@ -223,7 +223,7 @@ checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -234,7 +234,7 @@ checksum = "c6fa2087f2753a7da8cc1c0dbfcf89579dd57458e36769de5ac750b4671737ca" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -277,7 +277,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.0", "http-body-util", - "hyper 1.2.0", + "hyper 1.3.1", "hyper-util", "itoa", "matchit", @@ -328,7 +328,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -343,7 +343,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.0", "http-body-util", - "hyper 1.2.0", + "hyper 1.3.1", "hyper-util", "pin-project-lite", "rustls", @@ -547,9 +547,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.0.94" +version = "1.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17f6e324229dc011159fcc089755d1e2e216a90d43a7dea6853ca740b84f35e7" +checksum = "d32a725bc159af97c3e629873bb9f88fb8cf8a4867175f76dc987815ea07c83b" [[package]] name = "cfg-if" @@ -652,7 +652,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -984,7 +984,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -1008,7 +1008,7 @@ dependencies = [ "proc-macro2", "quote", "strsim 0.10.0", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -1019,7 +1019,7 @@ checksum = "a668eda54683121533a393014d8692171709ff57a7d61f187b6e782719f8933f" dependencies = [ "darling_core", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -1105,7 +1105,7 @@ checksum = "5fe87ce4529967e0ba1dcf8450bab64d97dfd5010a6256187ffe2e43e6f0e049" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -1135,7 +1135,7 @@ checksum = "2bba3e9872d7c58ce7ef0fcf1844fcc3e23ef2a58377b50df35dd98e42a5726e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", "unicode-xid", ] @@ -1218,7 +1218,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -1363,7 +1363,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -1376,7 +1376,7 @@ dependencies = [ "num-traits", "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -1396,7 +1396,7 @@ checksum = "5c785274071b1b420972453b306eeca06acf4633829db4223b58a2a8c5953bc4" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -1486,9 +1486,9 @@ dependencies = [ [[package]] name = "fiat-crypto" -version = "0.2.7" +version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c007b1ae3abe1cb6f85a16305acd418b7ca6343b953633fee2b76d8f108b830f" +checksum = "38793c55593b33412e3ae40c2c9781ffaa6f438f6f8c10f24e71846fbd7ae01e" [[package]] name = "flume" @@ -1607,7 +1607,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -1876,9 +1876,9 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" [[package]] name = "hickory-proto" -version = "0.24.0" +version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "091a6fbccf4860009355e3efc52ff4acf37a63489aad7435372d44ceeb6fbbcf" +checksum = "07698b8420e2f0d6447a436ba999ec85d8fbf2a398bbd737b82cac4a2e96e512" dependencies = [ "async-trait", "cfg-if", @@ -1905,9 +1905,9 @@ dependencies = [ [[package]] name = "hickory-resolver" -version = "0.24.0" +version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35b8f021164e6a984c9030023544c57789c51760065cd510572fedcfb04164e8" +checksum = "28757f23aa75c98f254cf0405e6d8c25b831b32921b050a66692427679b1f243" dependencies = [ "cfg-if", "futures-util", @@ -1929,9 +1929,9 @@ dependencies = [ [[package]] name = "hickory-server" -version = "0.24.0" +version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fbbb45bc4dcb456445732c705e3cfdc7393b8bcae5c36ecec36b9d76bd67cb5" +checksum = "9be0e43c556b9b3fdb6c7c71a9a32153a2275d02419e3de809e520bfcfe40c37" dependencies = [ "async-trait", "bytes", @@ -2114,9 +2114,9 @@ dependencies = [ [[package]] name = "hyper" -version = "1.2.0" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "186548d73ac615b32a73aafe38fb4f56c0d340e110e5a200bcadbaf2e199263a" +checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d" dependencies = [ "bytes", "futures-channel", @@ -2157,7 +2157,7 @@ dependencies = [ "futures-util", "http 1.1.0", "http-body 1.0.0", - "hyper 1.2.0", + "hyper 1.3.1", "pin-project-lite", "socket2", "tokio", @@ -2322,7 +2322,9 @@ dependencies = [ "clap", "derive_more", "flume", - "futures", + "futures-buffered", + "futures-lite", + "futures-util", "genawaiter", "hex", "indicatif", @@ -2377,7 +2379,7 @@ dependencies = [ "proptest", "rand", "rand_core", - "redb 2.0.0", + "redb 2.1.0", "serde", "serde-error", "serde_json", @@ -2412,8 +2414,9 @@ dependencies = [ "chrono", "derive_more", "flume", - "futures", "futures-buffered", + "futures-lite", + "futures-util", "genawaiter", "hashlink", "hex", @@ -2433,7 +2436,7 @@ dependencies = [ "range-collections", "rcgen 0.12.1", "redb 1.5.1", - "redb 2.0.0", + "redb 2.1.0", "reflink-copy", "rustls", "self_cell", @@ -2466,7 +2469,8 @@ dependencies = [ "dirs-next", "duct", "flume", - "futures", + "futures-buffered", + "futures-lite", "hex", "human-time", "indicatif", @@ -2511,7 +2515,7 @@ dependencies = [ "clap", "derive_more", "dirs-next", - "futures", + "futures-lite", "governor", "hickory-proto", "hickory-resolver", @@ -2524,7 +2528,7 @@ dependencies = [ "parking_lot", "pkarr", "rcgen 0.12.1", - "redb 2.0.0", + "redb 2.1.0", "regex", "rustls", "rustls-pemfile 1.0.4", @@ -2554,7 +2558,7 @@ dependencies = [ "clap", "derive_more", "ed25519-dalek", - "futures", + "futures-lite", "genawaiter", "indexmap 2.2.6", "iroh-base", @@ -2595,7 +2599,7 @@ dependencies = [ "anyhow", "erased_set", "http-body-util", - "hyper 1.2.0", + "hyper 1.3.1", "hyper-util", "once_cell", "prometheus-client", @@ -2624,7 +2628,10 @@ dependencies = [ "derive_more", "duct", "flume", - "futures", + "futures-buffered", + "futures-lite", + "futures-sink", + "futures-util", "governor", "hex", "hickory-proto", @@ -2632,7 +2639,7 @@ dependencies = [ "hostname", "http 1.1.0", "http-body-util", - "hyper 1.2.0", + "hyper 1.3.1", "hyper-util", "igd-next", "iroh-base", @@ -2715,13 +2722,14 @@ dependencies = [ "derive_more", "ed25519-dalek", "flume", - "futures", + "futures-util", "hex", "iroh-base", "iroh-blake3", "iroh-metrics", "iroh-net", "iroh-test", + "lru", "num_enum", "postcard", "proptest", @@ -2730,7 +2738,7 @@ dependencies = [ "rand_chacha", "rand_core", "redb 1.5.1", - "redb 2.0.0", + "redb 2.1.0", "self_cell", "serde", "strum 0.25.0", @@ -2833,9 +2841,9 @@ checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" [[package]] name = "lock_api" -version = "0.4.11" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" dependencies = [ "autocfg", "scopeguard", @@ -3219,7 +3227,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -3356,9 +3364,9 @@ checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" [[package]] name = "parking_lot" -version = "0.12.1" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +checksum = "7e4af0ca4f6caed20e900d564c242b8e5d4903fdacf31d3daf527b66fe6f42fb" dependencies = [ "lock_api", "parking_lot_core", @@ -3366,15 +3374,15 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.9" +version = "0.9.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.5.1", "smallvec", - "windows-targets 0.48.5", + "windows-targets 0.52.5", ] [[package]] @@ -3439,7 +3447,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -3470,7 +3478,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -3575,7 +3583,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -3759,9 +3767,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.80" +version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a56dea16b0a29e94408b9aa5e2940a4eedbd128a1ba20e8f7ae60fd3d465af0e" +checksum = "3d1597b0c024618f09a9c3b8655b7e430397a36d23fdafec26d6965e9eec3eba" dependencies = [ "unicode-ident", ] @@ -3786,7 +3794,7 @@ checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -3826,14 +3834,16 @@ dependencies = [ [[package]] name = "quic-rpc" -version = "0.7.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30d960d0b328db80274cbcbaf4bf53728eff1f43df4fc36c34eb79a97753c7d9" +checksum = "b7af964f8ee0a3ff7f03b8bc3ffce8cd604d9c2e5805b45d1446f774c71fb07e" dependencies = [ "bincode", "educe", "flume", - "futures", + "futures-lite", + "futures-sink", + "futures-util", "pin-project", "quinn", "serde", @@ -4041,9 +4051,9 @@ dependencies = [ [[package]] name = "redb" -version = "2.0.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1100a056c5dcdd4e5513d5333385223b26ef1bf92f31eb38f407e8c20549256" +checksum = "ed7508e692a49b6b2290b56540384ccae9b1fb4d77065640b165835b56ffe3bb" dependencies = [ "libc", ] @@ -4057,6 +4067,15 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "redox_syscall" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "469052894dcb553421e483e4209ee581a45100d31b4018de03e5a7ad86374a7e" +dependencies = [ + "bitflags 2.5.0", +] + [[package]] name = "redox_users" version = "0.4.5" @@ -4085,18 +4104,18 @@ checksum = "5fddb4f8d99b0a2ebafc65a87a69a7b9875e4b1ae1f00db265d300ef7f28bccc" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] name = "reflink-copy" -version = "0.1.15" +version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52b1349400e2ffd64a9fb5ed9008e33c0b8ef86bd5bae8f73080839c7082f1d5" +checksum = "7c3138c30c59ed9b8572f82bed97ea591ecd7e45012566046cc39e72679cff22" dependencies = [ "cfg-if", "rustix", - "windows 0.54.0", + "windows 0.56.0", ] [[package]] @@ -4311,9 +4330,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.32" +version = "0.38.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65e04861e65f21776e67888bfbea442b3642beaa0138fdb1dd7a84a52dffdb89" +checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" dependencies = [ "bitflags 2.5.0", "errno", @@ -4367,9 +4386,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.4.1" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ecd36cc4259e3e4514335c4a138c6b43171a8d61d8f5c9348f9fc7529416f247" +checksum = "beb461507cee2c2ff151784c52762cf4d9ff6a61f3e80968600ed24fa837fa54" [[package]] name = "rustls-webpki" @@ -4525,9 +4544,9 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.197" +version = "1.0.198" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fb1c873e1b9b056a4dc4c0c198b24c3ffa059243875552b2bd0933b1aee4ce2" +checksum = "9846a40c979031340571da2545a4e5b7c4163bdae79b301d5f86d03979451fcc" dependencies = [ "serde_derive", ] @@ -4543,20 +4562,20 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.197" +version = "1.0.198" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" +checksum = "e88edab869b01783ba905e7d0153f9fc1a6505a96e4ad3018011eedb838566d9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] name = "serde_json" -version = "1.0.115" +version = "1.0.116" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12dc5c46daa8e9fdf4f5e71b6cf9a53f2487da0e86e55808e2d35539666497dd" +checksum = "3e17db7126d17feb94eb3fad46bf1a96b034e8aacbc2e775fe81505f8b0b2813" dependencies = [ "itoa", "ryu", @@ -4605,11 +4624,11 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.7.0" +version = "3.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee80b0e361bbf88fd2f6e242ccd19cfda072cb0faa6ae694ecee08199938569a" +checksum = "2c85f8e96d1d6857f13768fcbd895fcb06225510022a2774ed8b5150581847b0" dependencies = [ - "base64 0.21.7", + "base64 0.22.0", "chrono", "hex", "indexmap 1.9.3", @@ -4623,14 +4642,14 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.7.0" +version = "3.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6561dc161a9224638a31d876ccdfefbc1df91d3f3a8342eddb35f055d48c7655" +checksum = "c8b3a576c4eb2924262d5951a3b737ccaf16c931e39a2810c36f9a7e25575557" dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -4701,9 +4720,9 @@ dependencies = [ [[package]] name = "signal-hook-registry" -version = "1.4.1" +version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" dependencies = [ "libc", ] @@ -4875,7 +4894,7 @@ dependencies = [ "proc-macro2", "quote", "struct_iterable_internal", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -4893,7 +4912,7 @@ dependencies = [ "proc-macro2", "quote", "structmeta-derive", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -4904,7 +4923,7 @@ checksum = "a60bcaff7397072dca0017d1db428e30d5002e00b6847703e2e42005c95fbe00" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -4935,7 +4954,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -4948,7 +4967,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -5010,9 +5029,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.59" +version = "2.0.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a6531ffc7b071655e4ce2e04bd464c4830bb585a61cabb96cf808f05172615a" +checksum = "909518bc7b1c9b779f1bbf07f2929d35af9f0f37e47c6e9ef7f9dddc1e1821f3" dependencies = [ "proc-macro2", "quote", @@ -5062,7 +5081,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -5121,7 +5140,7 @@ dependencies = [ "proc-macro2", "quote", "structmeta", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -5140,22 +5159,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.58" +version = "1.0.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03468839009160513471e86a034bb2c5c0e4baae3b43f79ffc55c4a5427b3297" +checksum = "f0126ad08bff79f29fc3ae6a55cc72352056dfff61e3ff8bb7129476d44b23aa" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.58" +version = "1.0.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7" +checksum = "d1cd413b5d558b4c5bf3680e324a6fa5014e7b7c067a51e69dbdf47eb7148b66" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -5253,7 +5272,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -5349,7 +5368,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "toml_edit 0.22.9", + "toml_edit 0.22.12", ] [[package]] @@ -5374,9 +5393,9 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.22.9" +version = "0.22.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e40bb779c5187258fd7aad0eb68cb8706a0a81fa712fbea808ab43c4b8374c4" +checksum = "d3328d4f68a705b2a4498da1d580585d39a6510f98318a2cec3018a7ec61ddef" dependencies = [ "indexmap 2.2.6", "serde", @@ -5478,7 +5497,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] @@ -5725,7 +5744,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", "wasm-bindgen-shared", ] @@ -5759,7 +5778,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -5804,7 +5823,7 @@ version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44ab49fad634e88f55bf8f9bb3abd2f27d7204172a112c7c9987e01c1c94ea9" dependencies = [ - "redox_syscall", + "redox_syscall 0.4.1", "wasite", "web-sys", ] @@ -5833,11 +5852,11 @@ checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" [[package]] name = "winapi-util" -version = "0.1.6" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596" +checksum = "4d4cc384e1e73b93bafa6fb4f1df8c41695c8a91cf9c4c64358067d15a7b6c6b" dependencies = [ - "winapi", + "windows-sys 0.52.0", ] [[package]] @@ -5872,18 +5891,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" dependencies = [ "windows-core 0.52.0", - "windows-implement", - "windows-interface", + "windows-implement 0.52.0", + "windows-interface 0.52.0", "windows-targets 0.52.5", ] [[package]] name = "windows" -version = "0.54.0" +version = "0.56.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9252e5725dbed82865af151df558e754e4a3c2c30818359eb17465f1346a1b49" +checksum = "1de69df01bdf1ead2f4ac895dc77c9351aefff65b2f3db429a343f9cbf05e132" dependencies = [ - "windows-core 0.54.0", + "windows-core 0.56.0", "windows-targets 0.52.5", ] @@ -5907,10 +5926,12 @@ dependencies = [ [[package]] name = "windows-core" -version = "0.54.0" +version = "0.56.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12661b9c89351d684a50a8a643ce5f608e20243b9fb84687800163429f161d65" +checksum = "4698e52ed2d08f8658ab0c39512a7c00ee5fe2688c65f8c0a4f06750d729f2a6" dependencies = [ + "windows-implement 0.56.0", + "windows-interface 0.56.0", "windows-result", "windows-targets 0.52.5", ] @@ -5923,7 +5944,18 @@ checksum = "12168c33176773b86799be25e2a2ba07c7aab9968b37541f1094dbd7a60c8946" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", +] + +[[package]] +name = "windows-implement" +version = "0.56.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6fc35f58ecd95a9b71c4f2329b911016e6bec66b3f2e6a4aad86bd2e99e2f9b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.60", ] [[package]] @@ -5934,7 +5966,18 @@ checksum = "9d8dc32e0095a7eeccebd0e3f09e9509365ecb3fc6ac4d6f5f14a3f6392942d1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", +] + +[[package]] +name = "windows-interface" +version = "0.56.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08990546bf4edef8f431fa6326e032865f27138718c587dc21bc0265bbcb57cc" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.60", ] [[package]] @@ -6214,7 +6257,7 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.59", + "syn 2.0.60", ] [[package]] diff --git a/iroh-bytes/Cargo.toml b/iroh-bytes/Cargo.toml index de2048644c..f3c106e314 100644 --- a/iroh-bytes/Cargo.toml +++ b/iroh-bytes/Cargo.toml @@ -22,8 +22,8 @@ bytes = { version = "1.4", features = ["serde"] } chrono = "0.4.31" derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "deref", "deref_mut", "from", "try_into", "into"] } flume = "0.11" -futures = "0.3.25" futures-buffered = "0.2.4" +futures-lite = "2.3" genawaiter = { version = "0.99.1", features = ["futures03"] } hashlink = { version = "0.9.0", optional = true } hex = "0.4.3" @@ -54,6 +54,7 @@ tracing-futures = "0.2.5" http-body = "0.4.5" iroh-bytes = { path = ".", features = ["downloader"] } iroh-test = { path = "../iroh-test" } +futures-buffered = "0.2.4" proptest = "1.0.0" serde_json = "1.0.107" serde_test = "1.0.176" @@ -62,12 +63,14 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } rcgen = "0.12.0" rustls = { version = "0.21.11", default-features = false, features = ["quic"] } tempfile = "3.10.0" +futures-util = "0.3.30" [features] default = ["fs-store"] -downloader = ["iroh-net", "parking_lot", "tokio-util/time", "hashlink"] -fs-store = ["reflink-copy", "redb", "redb_v1", "tempfile"] -metrics = ["iroh-metrics"] +downloader = ["dep:iroh-net", "dep:parking_lot", "tokio-util/time", "dep:hashlink"] +fs-store = ["dep:reflink-copy", "redb", "dep:redb_v1", "dep:tempfile"] +metrics = ["dep:iroh-metrics"] +redb = ["dep:redb"] [[example]] name = "provide-bytes" diff --git a/iroh-bytes/examples/fetch-stream.rs b/iroh-bytes/examples/fetch-stream.rs index 307827cbf5..e3ee545691 100644 --- a/iroh-bytes/examples/fetch-stream.rs +++ b/iroh-bytes/examples/fetch-stream.rs @@ -12,7 +12,7 @@ use std::io; use bao_tree::io::fsm::BaoContentItem; use bytes::Bytes; -use futures::{Stream, StreamExt}; +use futures_lite::{Stream, StreamExt}; use genawaiter::sync::Co; use genawaiter::sync::Gen; use tokio::io::AsyncWriteExt; diff --git a/iroh-bytes/examples/provide-bytes.rs b/iroh-bytes/examples/provide-bytes.rs index 89777d7da4..d5bdaa3618 100644 --- a/iroh-bytes/examples/provide-bytes.rs +++ b/iroh-bytes/examples/provide-bytes.rs @@ -110,10 +110,10 @@ async fn main() -> Result<()> { #[derive(Clone)] struct MockEventSender; -use futures::future::FutureExt; +use futures_lite::future::FutureExt; impl iroh_bytes::provider::EventSender for MockEventSender { - fn send(&self, _event: iroh_bytes::provider::Event) -> futures::future::BoxFuture<()> { + fn send(&self, _event: iroh_bytes::provider::Event) -> futures_lite::future::Boxed<()> { async move {}.boxed() } } diff --git a/iroh-bytes/src/downloader.rs b/iroh-bytes/src/downloader.rs index 4a8e910655..481a2dc304 100644 --- a/iroh-bytes/src/downloader.rs +++ b/iroh-bytes/src/downloader.rs @@ -37,7 +37,7 @@ use std::{ time::Duration, }; -use futures::{future::LocalBoxFuture, FutureExt, StreamExt}; +use futures_lite::{future::BoxedLocal, Stream, StreamExt}; use hashlink::LinkedHashSet; use iroh_base::hash::{BlobFormat, Hash, HashAndFormat}; use iroh_net::{MagicEndpoint, NodeAddr, NodeId}; @@ -72,9 +72,7 @@ const SERVICE_CHANNEL_CAPACITY: usize = 128; pub struct IntentId(pub u64); /// Trait modeling a dialer. This allows for IO-less testing. -pub trait Dialer: - futures::Stream)> + Unpin -{ +pub trait Dialer: Stream)> + Unpin { /// Type of connections returned by the Dialer. type Connection: Clone; /// Dial a node. @@ -99,7 +97,7 @@ pub enum FailureAction { } /// Future of a get request. -type GetFut = LocalBoxFuture<'static, InternalDownloadResult>; +type GetFut = BoxedLocal; /// Trait modelling performing a single request over a connection. This allows for IO-less testing. pub trait Getter { @@ -307,7 +305,7 @@ impl std::future::Future for DownloadHandle { use std::task::Poll::*; // make it easier on holders of the handle to poll the result, removing the receiver error // from the middle - match self.receiver.poll_unpin(cx) { + match std::pin::Pin::new(&mut self.receiver).poll(cx) { Ready(Ok(result)) => Ready(result), Ready(Err(_recv_err)) => Ready(Err(DownloadError::ActorClosed)), Pending => Pending, diff --git a/iroh-bytes/src/downloader/get.rs b/iroh-bytes/src/downloader/get.rs index 2fb39c2900..6cf4edbcd8 100644 --- a/iroh-bytes/src/downloader/get.rs +++ b/iroh-bytes/src/downloader/get.rs @@ -4,7 +4,7 @@ use crate::{ get::{db::get_to_db, error::GetError}, store::Store, }; -use futures::FutureExt; +use futures_lite::FutureExt; #[cfg(feature = "metrics")] use iroh_metrics::{inc, inc_by}; diff --git a/iroh-bytes/src/downloader/progress.rs b/iroh-bytes/src/downloader/progress.rs index 47ec74154c..8a0114dda2 100644 --- a/iroh-bytes/src/downloader/progress.rs +++ b/iroh-bytes/src/downloader/progress.rs @@ -155,7 +155,7 @@ impl ProgressSender for BroadcastProgressSender { futs }; - let failed_senders = futures::future::join_all(futs).await; + let failed_senders = futures_buffered::join_all(futs).await; // remove senders where the receiver is dropped if failed_senders.iter().any(|s| s.is_some()) { let mut inner = self.shared.lock(); diff --git a/iroh-bytes/src/downloader/test.rs b/iroh-bytes/src/downloader/test.rs index b18b185ebc..bdf55cc423 100644 --- a/iroh-bytes/src/downloader/test.rs +++ b/iroh-bytes/src/downloader/test.rs @@ -1,11 +1,11 @@ #![cfg(test)] use anyhow::anyhow; -use futures::FutureExt; use std::{ sync::atomic::AtomicUsize, time::{Duration, Instant}, }; +use futures_util::future::FutureExt; use iroh_net::key::SecretKey; use crate::{ @@ -101,7 +101,7 @@ async fn deduplication() { handles.push(h); } assert!( - futures::future::join_all(handles) + futures_buffered::join_all(handles) .await .into_iter() .all(|r| r.is_ok()), @@ -175,7 +175,7 @@ async fn max_concurrent_requests_total() { } assert!( - futures::future::join_all(handles) + futures_buffered::join_all(handles) .await .into_iter() .all(|r| r.is_ok()), @@ -215,7 +215,7 @@ async fn max_concurrent_requests_per_peer() { handles.push(h); } - futures::future::join_all(handles).await; + futures_buffered::join_all(handles).await; } /// Tests concurrent progress reporting for multiple intents. @@ -301,7 +301,7 @@ async fn concurrent_progress() { done_tx.send(()).unwrap(); - let (res_a, res_b, res_c) = futures::future::join3(handle_a, handle_b, handle_c).await; + let (res_a, res_b, res_c) = tokio::join!(handle_a, handle_b, handle_c); res_a.unwrap(); res_b.unwrap(); res_c.unwrap(); @@ -359,7 +359,7 @@ async fn long_queue() { handles.push(h); } - let res = futures::future::join_all(handles).await; + let res = futures_buffered::join_all(handles).await; for res in res { res.expect("all downloads to succeed"); } diff --git a/iroh-bytes/src/downloader/test/dialer.rs b/iroh-bytes/src/downloader/test/dialer.rs index a68ba575fb..89a1af69b2 100644 --- a/iroh-bytes/src/downloader/test/dialer.rs +++ b/iroh-bytes/src/downloader/test/dialer.rs @@ -60,7 +60,7 @@ impl Dialer for TestingDialer { } } -impl futures::Stream for TestingDialer { +impl Stream for TestingDialer { type Item = (NodeId, anyhow::Result); fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/iroh-bytes/src/downloader/test/getter.rs b/iroh-bytes/src/downloader/test/getter.rs index 1581d84af6..378d26579e 100644 --- a/iroh-bytes/src/downloader/test/getter.rs +++ b/iroh-bytes/src/downloader/test/getter.rs @@ -1,9 +1,8 @@ //! Implementation of [`super::Getter`] used for testing. -use std::{sync::Arc, time::Duration}; - -use futures::future::BoxFuture; +use futures_lite::{future::Boxed as BoxFuture, FutureExt}; use parking_lot::RwLock; +use std::{sync::Arc, time::Duration}; use super::*; @@ -16,7 +15,7 @@ pub(super) type RequestHandlerFn = Arc< NodeId, BroadcastProgressSender, Duration, - ) -> BoxFuture<'static, InternalDownloadResult> + ) -> BoxFuture + Send + Sync + 'static, diff --git a/iroh-bytes/src/get/db.rs b/iroh-bytes/src/get/db.rs index 69510d336e..2efa03b6b8 100644 --- a/iroh-bytes/src/get/db.rs +++ b/iroh-bytes/src/get/db.rs @@ -1,18 +1,19 @@ //! Functions that use the iroh-bytes protocol in conjunction with a bao store. -use bao_tree::ChunkNum; -use futures::{Future, StreamExt}; + +use std::future::Future; +use std::io; +use std::num::NonZeroU64; + +use futures_lite::StreamExt; use iroh_base::hash::Hash; use iroh_base::rpc::RpcError; use serde::{Deserialize, Serialize}; +use crate::hashseq::parse_hash_seq; use crate::protocol::RangeSpec; +use crate::store::BaoBatchWriter; use crate::store::BaoBlobSize; use crate::store::FallibleProgressBatchWriter; -use std::io; -use std::num::NonZeroU64; - -use crate::hashseq::parse_hash_seq; -use crate::store::BaoBatchWriter; use crate::{ get::{ @@ -28,7 +29,7 @@ use crate::{ BlobFormat, HashAndFormat, }; use anyhow::anyhow; -use bao_tree::ChunkRanges; +use bao_tree::{ChunkNum, ChunkRanges}; use iroh_io::AsyncSliceReader; use tracing::trace; @@ -294,7 +295,7 @@ pub async fn blob_info(db: &D, hash: &Hash) -> io::Result(db: &D, hash_seq: &[Hash]) -> io::Result>> { - let items = futures::stream::iter(hash_seq) + let items = futures_lite::stream::iter(hash_seq) .then(|hash| blob_info(db, hash)) .collect::>(); items.await.into_iter().collect() diff --git a/iroh-bytes/src/provider.rs b/iroh-bytes/src/provider.rs index 3c23b3684b..52371cba3a 100644 --- a/iroh-bytes/src/provider.rs +++ b/iroh-bytes/src/provider.rs @@ -5,7 +5,7 @@ use std::time::Duration; use anyhow::{Context, Result}; use bao_tree::io::fsm::{encode_ranges_validated, Outboard}; use bao_tree::io::EncodeError; -use futures::future::BoxFuture; +use futures_lite::future::Boxed as BoxFuture; use iroh_base::rpc::RpcError; use iroh_io::stats::{ SliceReaderStats, StreamWriterStats, TrackingSliceReader, TrackingStreamWriter, diff --git a/iroh-bytes/src/store/bao_file.rs b/iroh-bytes/src/store/bao_file.rs index 64adc6adc0..1dc2f72a36 100644 --- a/iroh-bytes/src/store/bao_file.rs +++ b/iroh-bytes/src/store/bao_file.rs @@ -720,7 +720,7 @@ impl BaoBatchWriter for BaoFileWriter { #[cfg(test)] pub mod test_support { - use std::{io::Cursor, ops::Range}; + use std::{future::Future, io::Cursor, ops::Range}; use bao_tree::{ io::{ @@ -731,7 +731,8 @@ pub mod test_support { }, BlockSize, ChunkRanges, }; - use futures::{Future, Stream, StreamExt}; + use futures_lite::{Stream, StreamExt}; + use iroh_base::hash::Hash; use iroh_io::AsyncStreamReader; use rand::RngCore; use range_collections::RangeSet2; @@ -853,7 +854,7 @@ pub mod test_support { .chunks(mtu) .map(Bytes::copy_from_slice) .collect::>(); - futures::stream::iter(parts).then(move |part| async move { + futures_lite::stream::iter(parts).then(move |part| async move { tokio::time::sleep(delay).await; part }) @@ -872,7 +873,7 @@ mod tests { use std::io::Write; use bao_tree::{blake3, ChunkNum, ChunkRanges}; - use futures::StreamExt; + use futures_lite::StreamExt; use iroh_io::TokioStreamReader; use tests::test_support::{ decode_response_into_batch, local, make_wire_data, random_test_data, trickle, validate, diff --git a/iroh-bytes/src/store/fs.rs b/iroh-bytes/src/store/fs.rs index 799f115555..e0a4d192f0 100644 --- a/iroh-bytes/src/store/fs.rs +++ b/iroh-bytes/src/store/fs.rs @@ -78,14 +78,14 @@ use bao_tree::io::{ sync::{ReadAt, Size}, }; use bytes::Bytes; -use futures::{channel::oneshot, Stream, StreamExt}; +use futures_lite::{Stream, StreamExt}; use iroh_base::hash::{BlobFormat, Hash, HashAndFormat}; use iroh_io::AsyncSliceReader; use redb::{AccessGuard, DatabaseError, ReadableTable, StorageError}; use serde::{Deserialize, Serialize}; use smallvec::SmallVec; -use tokio::io::AsyncWriteExt; +use tokio::{io::AsyncWriteExt, sync::oneshot}; use tracing::trace_span; mod import_flat_store; @@ -1250,7 +1250,7 @@ pub(crate) enum OuterError { #[error("progress send error: {0}")] ProgressSend(#[from] ProgressSendError), #[error("recv error: {0}")] - Recv(#[from] oneshot::Canceled), + Recv(#[from] oneshot::error::RecvError), #[error("recv error: {0}")] FlumeRecv(#[from] flume::RecvError), #[error("join error: {0}")] diff --git a/iroh-bytes/src/store/fs/test_support.rs b/iroh-bytes/src/store/fs/test_support.rs index d6c715d145..10dd3530b4 100644 --- a/iroh-bytes/src/store/fs/test_support.rs +++ b/iroh-bytes/src/store/fs/test_support.rs @@ -7,7 +7,7 @@ use std::{ path::{Path, PathBuf}, }; -use futures::channel::oneshot; +use tokio::sync::oneshot; use super::{ tables::{ReadableTables, Tables}, diff --git a/iroh-bytes/src/store/fs/tests.rs b/iroh-bytes/src/store/fs/tests.rs index 5844b78738..901e7106ab 100644 --- a/iroh-bytes/src/store/fs/tests.rs +++ b/iroh-bytes/src/store/fs/tests.rs @@ -36,7 +36,7 @@ pub fn to_stream( .chunks(mtu) .map(Bytes::copy_from_slice) .collect::>(); - futures::stream::iter(parts) + futures_lite::stream::iter(parts) .then(move |part| async move { tokio::time::sleep(delay).await; io::Result::Ok(part) diff --git a/iroh-bytes/src/store/mem.rs b/iroh-bytes/src/store/mem.rs index 395485d606..47f75e9fd2 100644 --- a/iroh-bytes/src/store/mem.rs +++ b/iroh-bytes/src/store/mem.rs @@ -6,7 +6,7 @@ use bao_tree::{ BaoTree, }; use bytes::{Bytes, BytesMut}; -use futures::{Stream, StreamExt}; +use futures_lite::{Stream, StreamExt}; use iroh_base::hash::{BlobFormat, Hash, HashAndFormat}; use iroh_io::AsyncSliceReader; use std::{ diff --git a/iroh-bytes/src/store/readonly_mem.rs b/iroh-bytes/src/store/readonly_mem.rs index 905471b327..d2385aa7a2 100644 --- a/iroh-bytes/src/store/readonly_mem.rs +++ b/iroh-bytes/src/store/readonly_mem.rs @@ -24,7 +24,7 @@ use bao_tree::{ io::{outboard::PreOrderMemOutboard, sync::Outboard}, }; use bytes::Bytes; -use futures::Stream; +use futures_lite::Stream; use iroh_io::AsyncSliceReader; use tokio::io::AsyncWriteExt; diff --git a/iroh-bytes/src/store/traits.rs b/iroh-bytes/src/store/traits.rs index f8a88b0784..e0ec3e6b39 100644 --- a/iroh-bytes/src/store/traits.rs +++ b/iroh-bytes/src/store/traits.rs @@ -1,12 +1,12 @@ //! Traits for in-memory or persistent maps of blob with bao encoded outboards. -use std::{collections::BTreeSet, io, path::PathBuf}; +use std::{collections::BTreeSet, future::Future, io, path::PathBuf}; use bao_tree::{ io::fsm::{BaoContentItem, Outboard}, BaoTree, ChunkRanges, }; use bytes::Bytes; -use futures::{Future, Stream, StreamExt}; +use futures_lite::{Stream, StreamExt}; use genawaiter::rc::{Co, Gen}; use iroh_base::rpc::RpcError; use iroh_io::AsyncSliceReader; @@ -433,7 +433,7 @@ async fn validate_impl( total: complete.len() as u64, }) .await?; - let complete_result = futures::stream::iter(complete) + let complete_result = futures_lite::stream::iter(complete) .map(|hash| { let store = store.clone(); let tx = tx.clone(); @@ -482,7 +482,7 @@ async fn validate_impl( .buffered_unordered(validate_parallelism) .collect::>() .await; - let partial_result = futures::stream::iter(partial) + let partial_result = futures_lite::stream::iter(partial) .map(|hash| { let store = store.clone(); let tx = tx.clone(); diff --git a/iroh-bytes/src/util/progress.rs b/iroh-bytes/src/util/progress.rs index 5a21fb57e9..8915b1cfb2 100644 --- a/iroh-bytes/src/util/progress.rs +++ b/iroh-bytes/src/util/progress.rs @@ -1,10 +1,9 @@ //! Utilities for reporting progress. //! //! The main entry point is the [ProgressSender] trait. -use std::{io, marker::PhantomData, ops::Deref, sync::Arc}; +use std::{future::Future, io, marker::PhantomData, ops::Deref, sync::Arc}; use bytes::Bytes; -use futures::{future::BoxFuture, Future, FutureExt}; use iroh_io::AsyncSliceWriter; /// A general purpose progress sender. This should be usable for reporting progress @@ -58,7 +57,7 @@ use iroh_io::AsyncSliceWriter; /// operation that reports progress of type `B`. If you have a transformation for /// every `B` to an `A`, you can use the [ProgressSender::with_map] method to transform the message. /// -/// This is similar to the [futures::SinkExt::with] method. +/// This is similar to the `futures::SinkExt::with` method. /// /// # Filtering the message type /// @@ -147,13 +146,15 @@ impl std::fmt::Debug for BoxedProgressSender { } } +type BoxFuture<'a, T> = std::pin::Pin + Send + 'a>>; + /// Boxable progress sender trait BoxableProgressSender: IdGenerator + std::fmt::Debug + Send + Sync + 'static { /// Send a message and wait if the receiver is full. /// /// Use this to send important progress messages where delivery must be guaranteed. #[must_use] - fn send(&self, msg: T) -> BoxFuture>; + fn send(&self, msg: T) -> BoxFuture<'_, ProgressSendResult<()>>; /// Try to send a message and drop it if the receiver is full. /// @@ -169,8 +170,8 @@ trait BoxableProgressSender: IdGenerator + std::fmt::Debug + Send + Sync + 's impl BoxableProgressSender for BoxableProgressSenderWrapper { - fn send(&self, msg: I::Msg) -> BoxFuture> { - self.0.send(msg).boxed() + fn send(&self, msg: I::Msg) -> BoxFuture<'_, ProgressSendResult<()>> { + Box::pin(self.0.send(msg)) } fn try_send(&self, msg: I::Msg) -> ProgressSendResult<()> { diff --git a/iroh-cli/Cargo.toml b/iroh-cli/Cargo.toml index 32183f051c..04744d5533 100644 --- a/iroh-cli/Cargo.toml +++ b/iroh-cli/Cargo.toml @@ -33,7 +33,8 @@ derive_more = { version = "1.0.0-beta.1", features = ["display"] } dialoguer = { version = "0.11.0", default-features = false } dirs-next = "2.0.0" flume = "0.11.0" -futures = "0.3.30" +futures-buffered = "0.2.4" +futures-lite = "2.3" hex = "0.4.3" human-time = "0.1.6" indicatif = { version = "0.17", features = ["tokio"] } @@ -42,7 +43,7 @@ iroh-metrics = { version = "0.14.0", path = "../iroh-metrics" } parking_lot = "0.12.1" portable-atomic = "1" postcard = "1.0.8" -quic-rpc = { version = "0.7.0", features = ["flume-transport", "quinn-transport"] } +quic-rpc = { version = "0.8.0", features = ["flume-transport", "quinn-transport"] } quinn = "0.10.2" rand = "0.8.5" rustyline = "12.0.0" diff --git a/iroh-cli/src/commands/author.rs b/iroh-cli/src/commands/author.rs index 09880369bd..64a02dd603 100644 --- a/iroh-cli/src/commands/author.rs +++ b/iroh-cli/src/commands/author.rs @@ -1,7 +1,7 @@ use anyhow::{bail, Result}; use clap::Parser; use derive_more::FromStr; -use futures::TryStreamExt; +use futures_lite::StreamExt; use iroh::base::base32::fmt_short; use iroh::sync::{Author, AuthorId}; diff --git a/iroh-cli/src/commands/blob.rs b/iroh-cli/src/commands/blob.rs index c03c50d53d..875b703cbb 100644 --- a/iroh-cli/src/commands/blob.rs +++ b/iroh-cli/src/commands/blob.rs @@ -8,7 +8,7 @@ use std::{ use anyhow::{anyhow, bail, ensure, Context, Result}; use clap::Subcommand; use console::{style, Emoji}; -use futures::{Stream, StreamExt}; +use futures_lite::{Stream, StreamExt}; use indicatif::{ HumanBytes, HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressState, ProgressStyle, diff --git a/iroh-cli/src/commands/doc.rs b/iroh-cli/src/commands/doc.rs index 6e7815a5b6..59bcbb2940 100644 --- a/iroh-cli/src/commands/doc.rs +++ b/iroh-cli/src/commands/doc.rs @@ -10,7 +10,8 @@ use anyhow::{anyhow, bail, Context, Result}; use clap::Parser; use colored::Colorize; use dialoguer::Confirm; -use futures::{Stream, StreamExt, TryStreamExt}; +use futures_buffered::BufferedStreamExt; +use futures_lite::{Stream, StreamExt}; use indicatif::{HumanBytes, HumanDuration, MultiProgress, ProgressBar, ProgressStyle}; use iroh::base::{base32::fmt_short, node_addr::AddrInfoOptions}; use quic_rpc::ServiceConnection; @@ -808,8 +809,11 @@ where (String, u64, Option, u64), >::new())); - let _stats: Vec = blob_add_progress - .filter_map(|item| async { + let doc2 = doc.clone(); + let imp2 = task_imp.clone(); + + let _stats: Vec<_> = blob_add_progress + .filter_map(|item| { let item = match item.context("Error adding files") { Err(e) => return Some(Err(e)), Ok(item) => item, @@ -880,20 +884,22 @@ where } } }) - .try_chunks(1024) - .map_ok(|chunks| { - futures::stream::iter(chunks.into_iter().map(|(key, hash, size)| { - let doc = doc.clone(); - let imp = task_imp.clone(); - Ok(async move { - doc.set_hash(author_id, key, hash, size).await?; - imp.import_progress(); - anyhow::Ok(size) - }) - })) + .map(move |res| { + let doc = doc2.clone(); + let imp = imp2.clone(); + async move { + match res { + Ok((key, hash, size)) => { + let doc = doc.clone(); + doc.set_hash(author_id, key, hash, size).await?; + imp.import_progress(); + Ok(size) + } + Err(err) => Err(err), + } + } }) - .try_flatten() - .try_buffer_unordered(64) + .buffered_unordered(128) .try_collect() .await?; diff --git a/iroh-cli/src/commands/doctor.rs b/iroh-cli/src/commands/doctor.rs index b7e007c242..4840e5af06 100644 --- a/iroh-cli/src/commands/doctor.rs +++ b/iroh-cli/src/commands/doctor.rs @@ -14,7 +14,7 @@ use crate::config::{iroh_data_root, NodeConfig}; use anyhow::Context; use clap::Subcommand; -use futures::StreamExt; +use futures_lite::StreamExt; use indicatif::{HumanBytes, MultiProgress, ProgressBar}; use iroh::{ base::ticket::Ticket, @@ -236,7 +236,7 @@ fn update_pb( } }) } else { - tokio::spawn(futures::future::ready(())) + tokio::spawn(std::future::ready(())) } } @@ -366,7 +366,7 @@ impl Gui { .template("{spinner:.green} [{bar:80.cyan/blue}] {msg} {bytes}/{total_bytes} ({bytes_per_sec})").unwrap() .progress_chars("█▉▊▋▌▍▎▏ ")); let counters2 = counters.clone(); - let counter_task = AbortingJoinHandle(tokio::spawn(async move { + let counter_task = AbortingJoinHandle::from(tokio::spawn(async move { loop { Self::update_counters(&counters2); Self::update_connection_info(&conn_info, &endpoint, &node_id); diff --git a/iroh-cli/src/commands/node.rs b/iroh-cli/src/commands/node.rs index 1c456dd0c2..0238b514a6 100644 --- a/iroh-cli/src/commands/node.rs +++ b/iroh-cli/src/commands/node.rs @@ -5,7 +5,7 @@ use clap::Subcommand; use colored::Colorize; use comfy_table::Table; use comfy_table::{presets::NOTHING, Cell}; -use futures::{Stream, StreamExt}; +use futures_lite::{Stream, StreamExt}; use human_time::ToHumanTimeString; use iroh::client::Iroh; use iroh::net::{key::PublicKey, magic_endpoint::ConnectionInfo, magicsock::DirectAddrInfo}; diff --git a/iroh-cli/src/commands/start.rs b/iroh-cli/src/commands/start.rs index e529254c32..36a3db1480 100644 --- a/iroh-cli/src/commands/start.rs +++ b/iroh-cli/src/commands/start.rs @@ -1,9 +1,8 @@ -use std::{net::SocketAddr, path::Path, time::Duration}; +use std::{future::Future, net::SocketAddr, path::Path, time::Duration}; use crate::config::NodeConfig; use anyhow::Result; use colored::Colorize; -use futures::Future; use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle}; use iroh::node::Node; use iroh::{ @@ -89,7 +88,7 @@ where Ok(()) => { // keep the task open forever if not running in single-command mode if run_type == RunType::UntilStopped { - futures::future::pending().await + futures_lite::future::pending().await } Ok(()) } @@ -98,26 +97,18 @@ where .instrument(info_span!("command")) }); - let node2 = node.clone(); tokio::select! { biased; // always abort on signal-c _ = tokio::signal::ctrl_c(), if run_type != RunType::SingleCommandNoAbort => { command_task.abort(); - node.shutdown(); - node.await?; + node.shutdown().await?; } // abort if the command task finishes (will run forever if not in single-command mode) res = &mut command_task => { - node.shutdown(); - let _ = node.await; + let _ = node.shutdown().await; res??; } - // abort if the node future completes (shutdown called or error) - res = node2 => { - command_task.abort(); - res?; - } } Ok(()) } diff --git a/iroh-cli/src/commands/tag.rs b/iroh-cli/src/commands/tag.rs index f3b2d011f7..69edf005c3 100644 --- a/iroh-cli/src/commands/tag.rs +++ b/iroh-cli/src/commands/tag.rs @@ -1,7 +1,7 @@ use anyhow::Result; use bytes::Bytes; use clap::Subcommand; -use futures::StreamExt; +use futures_lite::StreamExt; use iroh::bytes::Tag; use iroh::{client::Iroh, rpc_protocol::ProviderService}; use quic_rpc::ServiceConnection; diff --git a/iroh-dns-server/Cargo.toml b/iroh-dns-server/Cargo.toml index e68225cc8e..b9c571116c 100644 --- a/iroh-dns-server/Cargo.toml +++ b/iroh-dns-server/Cargo.toml @@ -19,7 +19,7 @@ bytes = "1.5.0" clap = { version = "4.5.1", features = ["derive"] } derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "into", "from"] } dirs-next = "2.0.0" -futures = "0.3.30" +futures-lite = "2.3.0" governor = "0.6.3" hickory-proto = "0.24.0" hickory-server = { version = "0.24.0", features = ["dns-over-rustls"] } diff --git a/iroh-dns-server/src/http/tls.rs b/iroh-dns-server/src/http/tls.rs index 4e079aefd4..1133d498d5 100644 --- a/iroh-dns-server/src/http/tls.rs +++ b/iroh-dns-server/src/http/tls.rs @@ -10,7 +10,7 @@ use axum_server::{ accept::Accept, tls_rustls::{RustlsAcceptor, RustlsConfig}, }; -use futures::{future::BoxFuture, FutureExt}; +use futures_lite::{future::Boxed as BoxFuture, FutureExt}; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_rustls_acme::{axum::AxumAcceptor, caches::DirCache, AcmeConfig}; @@ -62,7 +62,7 @@ impl Acce { type Stream = tokio_rustls::server::TlsStream; type Service = S; - type Future = BoxFuture<'static, io::Result<(Self::Stream, Self::Service)>>; + type Future = BoxFuture>; fn accept(&self, stream: I, service: S) -> Self::Future { match self { diff --git a/iroh-dns-server/src/main.rs b/iroh-dns-server/src/main.rs index 6a7f88d673..511a4c58db 100644 --- a/iroh-dns-server/src/main.rs +++ b/iroh-dns-server/src/main.rs @@ -1,14 +1,16 @@ #![allow(unused_imports)] +use std::future::Future; +use std::net::{Ipv4Addr, SocketAddr}; +use std::path::PathBuf; + use anyhow::Result; use axum::{routing::get, Router}; use clap::Parser; -use futures::{Future, FutureExt}; +use futures_lite::FutureExt; use iroh_dns_server::{ config::Config, metrics::init_metrics, server::run_with_config_until_ctrl_c, }; -use std::net::{Ipv4Addr, SocketAddr}; -use std::path::PathBuf; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::{debug, debug_span, error, error_span, Instrument, Span}; diff --git a/iroh-gossip/Cargo.toml b/iroh-gossip/Cargo.toml index d37908ac32..ecbc550970 100644 --- a/iroh-gossip/Cargo.toml +++ b/iroh-gossip/Cargo.toml @@ -31,7 +31,7 @@ iroh-metrics = { version = "0.14.0", path = "../iroh-metrics" } iroh-base = { version = "0.14.0", path = "../iroh-base" } # net dependencies (optional) -futures = { version = "0.3.25", optional = true } +futures-lite = { version = "2.3", optional = true } iroh-net = { path = "../iroh-net", version = "0.14.0", optional = true, default-features = false } quinn = { version = "0.10", optional = true } tokio = { version = "1", optional = true, features = ["io-util", "sync", "rt", "macros", "net", "fs"] } @@ -47,7 +47,7 @@ url = "2.4.0" [features] default = ["net"] -net = ["futures", "iroh-net", "quinn", "tokio", "tokio-util"] +net = ["dep:futures-lite", "dep:iroh-net", "dep:quinn", "dep:tokio", "dep:tokio-util"] [[example]] name = "chat" diff --git a/iroh-gossip/src/net.rs b/iroh-gossip/src/net.rs index a31b9b5ba3..06f67911c6 100644 --- a/iroh-gossip/src/net.rs +++ b/iroh-gossip/src/net.rs @@ -2,7 +2,7 @@ use anyhow::{anyhow, Context}; use bytes::{Bytes, BytesMut}; -use futures::{stream::Stream, FutureExt}; +use futures_lite::stream::Stream; use genawaiter::sync::{Co, Gen}; use iroh_net::{ dialer::Dialer, key::PublicKey, magic_endpoint::get_remote_node_id, AddrInfo, MagicEndpoint, @@ -10,7 +10,7 @@ use iroh_net::{ }; use rand::rngs::StdRng; use rand_core::SeedableRng; -use std::{collections::HashMap, future::Future, sync::Arc, task::Poll, time::Instant}; +use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc, task::Poll, time::Instant}; use tokio::{ sync::{broadcast, mpsc, oneshot}, task::JoinHandle, @@ -276,7 +276,7 @@ impl Future for JoinTopicFut { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll { - let res = self.0.poll_unpin(cx); + let res = Pin::new(&mut self.0).poll(cx); match res { Poll::Pending => Poll::Pending, Poll::Ready(Err(_err)) => Poll::Ready(Err(anyhow!("gossip actor dropped"))), diff --git a/iroh-gossip/src/net/util.rs b/iroh-gossip/src/net/util.rs index a53cd4be12..1101300292 100644 --- a/iroh-gossip/src/net/util.rs +++ b/iroh-gossip/src/net/util.rs @@ -120,7 +120,7 @@ impl Timers { sleep.await; self.map.drain_until(instant) } - None => futures::future::pending().await, + None => std::future::pending().await, } } } diff --git a/iroh-net/Cargo.toml b/iroh-net/Cargo.toml index f63e03dffd..c4b5852e57 100644 --- a/iroh-net/Cargo.toml +++ b/iroh-net/Cargo.toml @@ -24,7 +24,10 @@ default-net = "0.20" der = { version = "0.7", features = ["alloc", "derive"] } derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "from", "try_into", "deref"] } flume = "0.11" -futures = "0.3.25" +futures-buffered = "0.2.4" +futures-lite = "2.3" +futures-sink = "0.3.25" +futures-util = "0.3.25" governor = "0.6.0" hex = "0.4.3" hickory-proto = "0.24.0" diff --git a/iroh-net/examples/connect-unreliable.rs b/iroh-net/examples/connect-unreliable.rs index fc4ebd1315..1a8ce141a7 100644 --- a/iroh-net/examples/connect-unreliable.rs +++ b/iroh-net/examples/connect-unreliable.rs @@ -9,7 +9,7 @@ use std::net::SocketAddr; use anyhow::Context; use clap::Parser; -use futures::StreamExt; +use futures_lite::StreamExt; use iroh_base::base32; use iroh_net::{ key::SecretKey, diff --git a/iroh-net/examples/connect.rs b/iroh-net/examples/connect.rs index 2ec735cbca..3d9146ac86 100644 --- a/iroh-net/examples/connect.rs +++ b/iroh-net/examples/connect.rs @@ -9,7 +9,7 @@ use std::net::SocketAddr; use anyhow::Context; use clap::Parser; -use futures::StreamExt; +use futures_lite::StreamExt; use iroh_base::base32; use iroh_net::relay::RelayUrl; use iroh_net::{key::SecretKey, relay::RelayMode, MagicEndpoint, NodeAddr}; diff --git a/iroh-net/examples/listen-unreliable.rs b/iroh-net/examples/listen-unreliable.rs index 4ed651ff1a..ce6467cd8a 100644 --- a/iroh-net/examples/listen-unreliable.rs +++ b/iroh-net/examples/listen-unreliable.rs @@ -4,7 +4,7 @@ //! run this example from the project root: //! $ cargo run --example listen-unreliable use anyhow::Context; -use futures::StreamExt; +use futures_lite::StreamExt; use iroh_base::base32; use iroh_net::{key::SecretKey, relay::RelayMode, MagicEndpoint}; use tracing::info; diff --git a/iroh-net/examples/listen.rs b/iroh-net/examples/listen.rs index 5049ac4343..a99a3b1f65 100644 --- a/iroh-net/examples/listen.rs +++ b/iroh-net/examples/listen.rs @@ -4,7 +4,7 @@ //! run this example from the project root: //! $ cargo run --example listen use anyhow::Context; -use futures::StreamExt; +use futures_lite::StreamExt; use iroh_base::base32; use iroh_net::{key::SecretKey, relay::RelayMode, MagicEndpoint}; use tracing::{debug, info}; diff --git a/iroh-net/src/bin/iroh-relay.rs b/iroh-net/src/bin/iroh-relay.rs index fcec8d2b8f..f9717a46a1 100644 --- a/iroh-net/src/bin/iroh-relay.rs +++ b/iroh-net/src/bin/iroh-relay.rs @@ -4,6 +4,7 @@ use std::{ borrow::Cow, + future::Future, net::{IpAddr, Ipv6Addr, SocketAddr}, path::{Path, PathBuf}, pin::Pin, @@ -12,7 +13,7 @@ use std::{ use anyhow::{anyhow, bail, Context as _, Result}; use clap::Parser; -use futures::{Future, StreamExt}; +use futures_lite::StreamExt; use http::{response::Builder as ResponseBuilder, HeaderMap}; use hyper::body::Incoming; use hyper::{Method, Request, Response, StatusCode}; diff --git a/iroh-net/src/dialer.rs b/iroh-net/src/dialer.rs index 1cd5854a38..f3b5e737b6 100644 --- a/iroh-net/src/dialer.rs +++ b/iroh-net/src/dialer.rs @@ -4,7 +4,7 @@ use std::{collections::HashMap, pin::Pin, task::Poll}; use crate::{key::PublicKey, MagicEndpoint, NodeAddr, NodeId}; use anyhow::anyhow; -use futures::future::BoxFuture; +use futures_lite::future::Boxed as BoxFuture; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::error; @@ -78,14 +78,14 @@ impl Dialer { } None => { error!("no more pending conns available"); - futures::future::pending().await + std::future::pending().await } } }; (node_id, res) } - true => futures::future::pending().await, + true => std::future::pending().await, } } @@ -95,7 +95,7 @@ impl Dialer { } } -impl futures::Stream for Dialer { +impl futures_lite::Stream for Dialer { type Item = (PublicKey, anyhow::Result); fn poll_next( @@ -117,4 +117,4 @@ impl futures::Stream for Dialer { } /// Future for a pending dial operation -pub type DialFuture = BoxFuture<'static, (PublicKey, anyhow::Result)>; +pub type DialFuture = BoxFuture<(PublicKey, anyhow::Result)>; diff --git a/iroh-net/src/discovery.rs b/iroh-net/src/discovery.rs index bb44aa14dd..b68c61e7bd 100644 --- a/iroh-net/src/discovery.rs +++ b/iroh-net/src/discovery.rs @@ -3,7 +3,7 @@ use std::time::Duration; use anyhow::{anyhow, ensure, Result}; -use futures::{stream::BoxStream, StreamExt}; +use futures_lite::stream::{Boxed as BoxStream, StreamExt}; use iroh_base::node_addr::NodeAddr; use tokio::{sync::oneshot, task::JoinHandle}; use tracing::{debug, error_span, warn, Instrument}; @@ -41,7 +41,7 @@ pub trait Discovery: std::fmt::Debug + Send + Sync { &self, _endpoint: MagicEndpoint, _node_id: NodeId, - ) -> Option>> { + ) -> Option>> { None } } @@ -107,12 +107,13 @@ impl Discovery for ConcurrentDiscovery { &self, endpoint: MagicEndpoint, node_id: NodeId, - ) -> Option>> { + ) -> Option>> { let streams = self .services .iter() .filter_map(|service| service.resolve(endpoint.clone(), node_id)); - let streams = futures::stream::select_all(streams); + + let streams = futures_buffered::Merge::from_iter(streams); Some(Box::pin(streams)) } } @@ -197,7 +198,7 @@ impl DiscoveryTask { fn create_stream( ep: &MagicEndpoint, node_id: NodeId, - ) -> Result>> { + ) -> Result>> { let discovery = ep .discovery() .ok_or_else(|| anyhow!("No discovery service configured"))?; @@ -288,7 +289,6 @@ mod tests { time::SystemTime, }; - use futures::stream; use parking_lot::Mutex; use rand::Rng; @@ -346,7 +346,7 @@ mod tests { &self, endpoint: MagicEndpoint, node_id: NodeId, - ) -> Option>> { + ) -> Option>> { let addr_info = match self.resolve_wrong { false => self.shared.nodes.lock().get(&node_id).cloned(), true => { @@ -378,9 +378,9 @@ mod tests { ); Ok(item) }; - stream::once(fut).boxed() + futures_lite::stream::once_future(fut).boxed() } - None => stream::empty().boxed(), + None => futures_lite::stream::empty().boxed(), }; Some(stream) } @@ -395,8 +395,8 @@ mod tests { &self, _endpoint: MagicEndpoint, _node_id: NodeId, - ) -> Option>> { - Some(stream::empty().boxed()) + ) -> Option>> { + Some(futures_lite::stream::empty().boxed()) } } diff --git a/iroh-net/src/discovery/dns.rs b/iroh-net/src/discovery/dns.rs index befb00cc48..e791edc7d3 100644 --- a/iroh-net/src/discovery/dns.rs +++ b/iroh-net/src/discovery/dns.rs @@ -1,13 +1,12 @@ //! DNS node discovery for iroh-net +use anyhow::Result; +use futures_lite::stream::Boxed as BoxStream; + use crate::{ discovery::{Discovery, DiscoveryItem}, - MagicEndpoint, NodeId, + dns, MagicEndpoint, NodeId, }; -use anyhow::Result; -use futures::{future::FutureExt, stream::BoxStream, StreamExt}; - -use crate::dns; /// The n0 testing DNS node origin pub const N0_DNS_NODE_ORIGIN: &str = "dns.iroh.link"; @@ -54,17 +53,19 @@ impl Discovery for DnsDiscovery { &self, ep: MagicEndpoint, node_id: NodeId, - ) -> Option>> { + ) -> Option>> { let resolver = ep.dns_resolver().clone(); + let origin_domain = self.origin_domain.clone(); let fut = async move { let node_addr = - dns::node_info::lookup_by_id(&resolver, &node_id, &self.origin_domain).await?; + dns::node_info::lookup_by_id(&resolver, &node_id, &origin_domain).await?; Ok(DiscoveryItem { provenance: "dns", last_updated: None, addr_info: node_addr.info, }) }; - Some(fut.into_stream().boxed()) + let stream = futures_lite::stream::once_future(fut); + Some(Box::pin(stream)) } } diff --git a/iroh-net/src/dns.rs b/iroh-net/src/dns.rs index e3d165e9ba..da1843ad38 100644 --- a/iroh-net/src/dns.rs +++ b/iroh-net/src/dns.rs @@ -107,7 +107,7 @@ pub async fn lookup_ipv4_ipv6( let ipv4 = tokio::time::timeout(timeout, ipv4); let ipv6 = tokio::time::timeout(timeout, ipv6); - let res = futures::future::join(ipv4, ipv6).await; + let res = tokio::join!(ipv4, ipv6); match res { (Ok(Ok(ipv4)), Ok(Ok(ipv6))) => { let res = ipv4 diff --git a/iroh-net/src/magic_endpoint.rs b/iroh-net/src/magic_endpoint.rs index f17a9f050d..5b2de7adab 100644 --- a/iroh-net/src/magic_endpoint.rs +++ b/iroh-net/src/magic_endpoint.rs @@ -4,7 +4,7 @@ use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; use anyhow::{anyhow, bail, ensure, Context, Result}; use derive_more::Debug; -use futures::StreamExt; +use futures_lite::StreamExt; use quinn_proto::VarInt; use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; use tracing::{debug, trace}; @@ -327,7 +327,7 @@ impl MagicEndpoint { /// /// To get the current endpoints, drop the stream after the first item was received: /// ``` - /// use futures::StreamExt; + /// use futures_lite::StreamExt; /// use iroh_net::MagicEndpoint; /// /// # let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index 55734012e1..d222a386e9 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -35,7 +35,7 @@ use std::{ use anyhow::{anyhow, Context as _, Result}; use bytes::Bytes; -use futures::{FutureExt, Stream}; +use futures_lite::{FutureExt, Stream}; use iroh_metrics::{inc, inc_by}; use quinn::AsyncUdpSocket; use rand::{seq::SliceRandom, Rng, SeedableRng}; @@ -876,7 +876,7 @@ impl Inner { dst_key: PublicKey, msg: &disco::Message, ) -> io::Result { - futures::future::poll_fn(move |cx| self.poll_send_disco_message_udp(dst, dst_key, msg, cx)) + std::future::poll_fn(move |cx| self.poll_send_disco_message_udp(dst, dst_key, msg, cx)) .await } @@ -1347,7 +1347,7 @@ impl MagicSock { /// /// To get the current endpoints, drop the stream after the first item was received: /// ``` - /// use futures::StreamExt; + /// use futures_lite::StreamExt; /// use iroh_net::magicsock::MagicSock; /// /// # let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); @@ -1783,7 +1783,7 @@ impl Actor { return; } if let Err(err) = - futures::future::poll_fn(|cx| self.inner.poll_handle_ping_actions(cx, &mut msgs)).await + std::future::poll_fn(|cx| self.inner.poll_handle_ping_actions(cx, &mut msgs)).await { debug!("failed to send pings: {err:?}"); } @@ -2579,7 +2579,7 @@ fn disco_message_sent(msg: &disco::Message) { #[cfg(test)] pub(crate) mod tests { use anyhow::Context; - use futures::StreamExt; + use futures_lite::StreamExt; use iroh_test::CallOnDrop; use rand::RngCore; diff --git a/iroh-net/src/magicsock/node_map.rs b/iroh-net/src/magicsock/node_map.rs index 18e0ba5df9..ccc78bf49e 100644 --- a/iroh-net/src/magicsock/node_map.rs +++ b/iroh-net/src/magicsock/node_map.rs @@ -9,7 +9,7 @@ use std::{ }; use anyhow::{ensure, Context as _}; -use futures::Stream; +use futures_lite::stream::Stream; use iroh_base::key::NodeId; use iroh_metrics::inc; use parking_lot::Mutex; diff --git a/iroh-net/src/magicsock/relay_actor.rs b/iroh-net/src/magicsock/relay_actor.rs index cf97db5419..29c982e64b 100644 --- a/iroh-net/src/magicsock/relay_actor.rs +++ b/iroh-net/src/magicsock/relay_actor.rs @@ -1,5 +1,6 @@ use std::{ collections::{BTreeMap, HashSet}, + future::Future, net::{IpAddr, SocketAddr}, sync::{atomic::Ordering, Arc}, time::{Duration, Instant}, @@ -8,7 +9,6 @@ use std::{ use anyhow::Context; use backoff::backoff::Backoff; use bytes::{Bytes, BytesMut}; -use futures::Future; use iroh_metrics::{inc, inc_by}; use tokio::{ sync::{mpsc, oneshot}, @@ -359,7 +359,7 @@ impl RelayActor { } async fn note_preferred(&self, my_url: &RelayUrl) { - futures::future::join_all(self.active_relay.iter().map(|(url, (s, _))| async move { + futures_buffered::join_all(self.active_relay.iter().map(|(url, (s, _))| async move { let is_preferred = url == my_url; s.send(ActiveRelayMessage::NotePreferred(is_preferred)) .await diff --git a/iroh-net/src/magicsock/timer.rs b/iroh-net/src/magicsock/timer.rs index d437aa852d..c3f6cbcd4f 100644 --- a/iroh-net/src/magicsock/timer.rs +++ b/iroh-net/src/magicsock/timer.rs @@ -1,6 +1,6 @@ +use std::future::Future; use std::time::Duration; -use futures::Future; use tokio::sync::mpsc; use tokio::task::JoinHandle; use tokio::time::{self, Instant}; diff --git a/iroh-net/src/magicsock/udp_conn.rs b/iroh-net/src/magicsock/udp_conn.rs index 9503b3a6ee..f28ccef80d 100644 --- a/iroh-net/src/magicsock/udp_conn.rs +++ b/iroh-net/src/magicsock/udp_conn.rs @@ -3,11 +3,10 @@ use std::{ io, net::SocketAddr, sync::Arc, - task::{Context, Poll}, + task::{ready, Context, Poll}, }; use anyhow::{bail, Context as _}; -use futures::ready; use quinn::AsyncUdpSocket; use tokio::io::Interest; use tracing::{debug, trace, warn}; diff --git a/iroh-net/src/net/interfaces/linux.rs b/iroh-net/src/net/interfaces/linux.rs index d0e4d6f23e..de15895d15 100644 --- a/iroh-net/src/net/interfaces/linux.rs +++ b/iroh-net/src/net/interfaces/linux.rs @@ -2,7 +2,7 @@ use anyhow::{anyhow, Result}; #[cfg(not(target_os = "android"))] -use futures::TryStreamExt; +use futures_util::TryStreamExt; use tokio::fs::File; use tokio::io::{AsyncBufReadExt, BufReader}; @@ -161,6 +161,7 @@ async fn iface_by_index(handle: &rtnetlink::Handle, index: u32) -> Result(&self, callback: F) -> Result where - F: Fn(bool) -> BoxFuture<'static, ()> + 'static + Sync + Send, + F: Fn(bool) -> BoxFuture<()> + 'static + Sync + Send, { let (s, r) = oneshot::channel(); self.actor_tx @@ -85,10 +85,10 @@ impl Monitor { #[cfg(test)] mod tests { - use futures::FutureExt; - use super::*; + use futures_util::FutureExt; + #[tokio::test] async fn test_smoke_monitor() { let _guard = iroh_test::logging::setup(); diff --git a/iroh-net/src/net/netmon/actor.rs b/iroh-net/src/net/netmon/actor.rs index f38fd2b3e6..6838817d73 100644 --- a/iroh-net/src/net/netmon/actor.rs +++ b/iroh-net/src/net/netmon/actor.rs @@ -5,7 +5,7 @@ use std::{ }; use anyhow::Result; -use futures::future::BoxFuture; +use futures_lite::future::Boxed as BoxFuture; use tokio::sync::{mpsc, oneshot}; use tracing::{debug, info, trace, warn}; @@ -70,7 +70,7 @@ pub(super) struct Actor { pub struct CallbackToken(u64); /// Callbacks that get notified about changes. -pub(super) type Callback = Box BoxFuture<'static, ()> + Sync + Send + 'static>; +pub(super) type Callback = Box BoxFuture<()> + Sync + Send + 'static>; pub(super) enum ActorMessage { Subscribe(Callback, oneshot::Sender), diff --git a/iroh-net/src/net/netmon/linux.rs b/iroh-net/src/net/netmon/linux.rs index 0cdc1a81ce..f1b98dec11 100644 --- a/iroh-net/src/net/netmon/linux.rs +++ b/iroh-net/src/net/netmon/linux.rs @@ -4,7 +4,7 @@ use std::{ }; use anyhow::Result; -use futures::StreamExt; +use futures_lite::StreamExt; use netlink_packet_core::NetlinkPayload; use netlink_packet_route::{address, constants::*, route, RtnlMessage}; use netlink_sys::{AsyncSocket, SocketAddr}; diff --git a/iroh-net/src/portmapper.rs b/iroh-net/src/portmapper.rs index c24473b817..a6e4b2ebbe 100644 --- a/iroh-net/src/portmapper.rs +++ b/iroh-net/src/portmapper.rs @@ -7,7 +7,7 @@ use std::{ }; use anyhow::{anyhow, Result}; -use futures::StreamExt; +use futures_lite::StreamExt; use tokio::sync::{mpsc, oneshot, watch}; use tracing::{debug, info_span, trace, Instrument}; diff --git a/iroh-net/src/portmapper/current_mapping.rs b/iroh-net/src/portmapper/current_mapping.rs index 917ffe80d4..0c2273ece3 100644 --- a/iroh-net/src/portmapper/current_mapping.rs +++ b/iroh-net/src/portmapper/current_mapping.rs @@ -1,13 +1,13 @@ //! Holds the current mapping value and ensures that any change is reported accordingly. use std::{ + future::Future, net::{Ipv4Addr, SocketAddrV4}, num::NonZeroU16, pin::Pin, task::Poll, }; -use futures::Future; use iroh_metrics::inc; use std::time::Duration; use tokio::{sync::watch, time}; @@ -164,7 +164,7 @@ impl CurrentMapping { } } -impl futures::Stream for CurrentMapping { +impl futures_lite::Stream for CurrentMapping { type Item = Event; fn poll_next( @@ -177,8 +177,9 @@ impl futures::Stream for CurrentMapping { #[cfg(test)] mod tests { + use futures_lite::StreamExt; + use super::*; - use futures::StreamExt; // for testing a mapping is simply an ip, port pair type M = (Ipv4Addr, NonZeroU16); diff --git a/iroh-net/src/relay/client.rs b/iroh-net/src/relay/client.rs index 5446e9fe9d..cbee0aea0a 100644 --- a/iroh-net/src/relay/client.rs +++ b/iroh-net/src/relay/client.rs @@ -5,7 +5,9 @@ use std::time::Duration; use anyhow::{anyhow, bail, ensure, Result}; use bytes::Bytes; -use futures::{Sink, SinkExt, StreamExt}; +use futures_lite::StreamExt; +use futures_sink::Sink; +use futures_util::sink::SinkExt; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::sync::mpsc; use tokio_util::codec::{FramedRead, FramedWrite}; diff --git a/iroh-net/src/relay/client_conn.rs b/iroh-net/src/relay/client_conn.rs index 37a5ce68e1..dd876c0ad9 100644 --- a/iroh-net/src/relay/client_conn.rs +++ b/iroh-net/src/relay/client_conn.rs @@ -4,7 +4,8 @@ use std::time::Duration; use anyhow::{Context, Result}; use bytes::Bytes; -use futures::{SinkExt, StreamExt}; +use futures_lite::StreamExt; +use futures_util::SinkExt; use tokio::sync::mpsc; use tokio_util::codec::Framed; use tokio_util::sync::CancellationToken; @@ -26,7 +27,7 @@ use super::{ /// The [`super::server::Server`] side representation of a [`super::client::Client`]'s connection #[derive(Debug)] pub(crate) struct ClientConnManager { - /// Static after construction, process-wide unique counter, incremented each time we accept + /// Static after construction, process-wide unique counter, incremented each time we accept pub(crate) conn_num: usize, // TODO: in the go impl, we have a ptr to the server & use that ptr to update stats diff --git a/iroh-net/src/relay/clients.rs b/iroh-net/src/relay/clients.rs index 796322bab6..7f510ababc 100644 --- a/iroh-net/src/relay/clients.rs +++ b/iroh-net/src/relay/clients.rs @@ -1,15 +1,15 @@ //! based on tailscale/derp/derp_server.go //! //! The "Server" side of the client. Uses the `ClientConnManager`. -use crate::key::PublicKey; use std::collections::{HashMap, HashSet}; -use futures::future::join_all; -use tokio::sync::mpsc; +use tokio::{sync::mpsc, task::JoinSet}; use iroh_metrics::inc; use tracing::{Instrument, Span}; +use crate::key::PublicKey; + use super::{ client_conn::{ClientConnBuilder, ClientConnManager}, metrics::Metrics, @@ -149,13 +149,15 @@ impl Clients { pub async fn shutdown(&mut self) { tracing::trace!("shutting down conn"); - let mut handles = Vec::new(); + let mut handles = JoinSet::default(); for (_, client) in self.inner.drain() { - handles.push(tokio::spawn( - async move { client.shutdown_await().await }.instrument(Span::current()), - )); + handles.spawn(async move { client.shutdown_await().await }.instrument(Span::current())); + } + while let Some(t) = handles.join_next().await { + if let Err(err) = t { + tracing::trace!("shutdown error: {:?}", err); + } } - join_all(handles).await; } /// Record that `src` sent or forwarded a packet to `dst` diff --git a/iroh-net/src/relay/codec.rs b/iroh-net/src/relay/codec.rs index 141ed85353..8b4e02f493 100644 --- a/iroh-net/src/relay/codec.rs +++ b/iroh-net/src/relay/codec.rs @@ -2,7 +2,9 @@ use std::time::Duration; use anyhow::{bail, ensure, Context}; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use futures::{Sink, SinkExt, Stream, StreamExt}; +use futures_lite::{Stream, StreamExt}; +use futures_sink::Sink; +use futures_util::SinkExt; use iroh_base::key::{Signature, PUBLIC_KEY_LENGTH}; use tokio_util::codec::{Decoder, Encoder}; diff --git a/iroh-net/src/relay/http/client.rs b/iroh-net/src/relay/http/client.rs index d96c896543..f522651d11 100644 --- a/iroh-net/src/relay/http/client.rs +++ b/iroh-net/src/relay/http/client.rs @@ -7,7 +7,7 @@ use std::time::Duration; use anyhow::bail; use bytes::Bytes; -use futures::future::BoxFuture; +use futures_lite::future::Boxed as BoxFuture; use hyper::body::Incoming; use hyper::header::UPGRADE; use hyper::upgrade::{Parts, Upgraded}; @@ -151,8 +151,7 @@ struct Actor { relay_client: Option<(RelayClient, RelayClientReceiver)>, is_closed: bool, #[debug("address family selector callback")] - address_family_selector: - Option BoxFuture<'static, bool> + Send + Sync + 'static>>, + address_family_selector: Option BoxFuture + Send + Sync + 'static>>, conn_gen: usize, url: RelayUrl, #[debug("TlsConnector")] @@ -191,8 +190,7 @@ pub struct ClientBuilder { /// Default is false is_preferred: bool, /// Default is None - address_family_selector: - Option BoxFuture<'static, bool> + Send + Sync + 'static>>, + address_family_selector: Option BoxFuture + Send + Sync + 'static>>, /// Default is false is_prober: bool, /// Expected PublicKey of the server @@ -243,7 +241,7 @@ impl ClientBuilder { /// work anyway, so we don't artificially delay the connection speed. pub fn address_family_selector(mut self, selector: S) -> Self where - S: Fn() -> BoxFuture<'static, bool> + Send + Sync + 'static, + S: Fn() -> BoxFuture + Send + Sync + 'static, { self.address_family_selector = Some(Box::new(selector)); self diff --git a/iroh-net/src/relay/http/server.rs b/iroh-net/src/relay/http/server.rs index b151212a1b..c9238ba147 100644 --- a/iroh-net/src/relay/http/server.rs +++ b/iroh-net/src/relay/http/server.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::future::Future; use std::net::SocketAddr; use std::pin::Pin; use std::sync::Arc; @@ -6,7 +7,7 @@ use std::sync::Arc; use anyhow::{bail, ensure, Context as _, Result}; use bytes::Bytes; use derive_more::Debug; -use futures::future::{Future, FutureExt}; +use futures_lite::FutureExt; use http::response::Builder as ResponseBuilder; use hyper::body::Incoming; use hyper::header::{HeaderValue, UPGRADE}; diff --git a/iroh-net/src/test_utils.rs b/iroh-net/src/test_utils.rs index 88ac5b11f9..9c5b5a2885 100644 --- a/iroh-net/src/test_utils.rs +++ b/iroh-net/src/test_utils.rs @@ -95,10 +95,11 @@ pub(crate) mod dns_and_pkarr_servers { #[cfg(test)] pub(crate) mod dns_server { + use std::future::Future; use std::net::{Ipv4Addr, SocketAddr}; use anyhow::{ensure, Result}; - use futures::{future::BoxFuture, Future}; + use futures_lite::future::Boxed as BoxFuture; use hickory_proto::{ op::{header::MessageType, Message}, serialize::binary::BinDecodable, @@ -118,9 +119,8 @@ pub(crate) mod dns_server { ) -> impl Future> + Send; } - pub type QueryHandlerFunction = Box< - dyn Fn(&Message, &mut Message) -> BoxFuture<'static, Result<()>> + Send + Sync + 'static, - >; + pub type QueryHandlerFunction = + Box BoxFuture> + Send + Sync + 'static>; impl QueryHandler for QueryHandlerFunction { fn resolve( &self, @@ -388,7 +388,7 @@ pub(crate) mod pkarr_dns_state { ) -> impl Future> + Send { const TTL: u32 = 30; let res = self.resolve_dns(query, reply, TTL); - futures::future::ready(res) + std::future::ready(res) } } } diff --git a/iroh-net/src/util.rs b/iroh-net/src/util.rs index f1eee67971..58165a9b8d 100644 --- a/iroh-net/src/util.rs +++ b/iroh-net/src/util.rs @@ -3,18 +3,22 @@ use std::{ future::Future, pin::Pin, + sync::Arc, task::{Context, Poll}, }; -use futures::FutureExt; +use futures_lite::future::Boxed as BoxFuture; +use futures_util::{future::Shared, FutureExt}; /// A join handle that owns the task it is running, and aborts it when dropped. #[derive(Debug, derive_more::Deref)] -pub struct AbortingJoinHandle(pub tokio::task::JoinHandle); +pub struct AbortingJoinHandle { + handle: tokio::task::JoinHandle, +} impl From> for AbortingJoinHandle { fn from(handle: tokio::task::JoinHandle) -> Self { - Self(handle) + Self { handle } } } @@ -22,13 +26,49 @@ impl Future for AbortingJoinHandle { type Output = std::result::Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.0.poll_unpin(cx) + Pin::new(&mut self.handle).poll(cx) } } impl Drop for AbortingJoinHandle { fn drop(&mut self) { - self.0.abort(); + self.handle.abort(); + } +} + +/// A join handle that owns the task it is running, and aborts it when dropped. +/// It is cloneable and will abort when the last instance is dropped. +#[derive(Debug, Clone)] +pub struct SharedAbortingJoinHandle { + fut: Shared>>, + abort: Arc, +} + +impl From> for SharedAbortingJoinHandle { + fn from(handle: tokio::task::JoinHandle) -> Self { + let abort = handle.abort_handle(); + let fut: BoxFuture> = + Box::pin(async move { handle.await.map_err(|e| e.to_string()) }); + Self { + fut: fut.shared(), + abort: Arc::new(abort), + } + } +} + +impl Future for SharedAbortingJoinHandle { + type Output = std::result::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.fut).poll(cx) + } +} + +impl Drop for SharedAbortingJoinHandle { + fn drop(&mut self) { + if Arc::strong_count(&self.abort) == 1 { + self.abort.abort(); + } } } diff --git a/iroh-sync/Cargo.toml b/iroh-sync/Cargo.toml index d808a7b0a7..faddc88aea 100644 --- a/iroh-sync/Cargo.toml +++ b/iroh-sync/Cargo.toml @@ -44,7 +44,8 @@ iroh-net = { version = "0.14.0", optional = true, path = "../iroh-net" } tokio-util = { version = "0.7", optional = true, features = ["codec", "io-util", "io"] } tokio-stream = { version = "0.1", optional = true, features = ["sync"]} quinn = { version = "0.10", optional = true } -futures = { version = "0.3", optional = true } +futures-util = { version = "0.3.25", optional = true } +lru = "0.12" self_cell = "1.0.3" [dev-dependencies] @@ -57,5 +58,5 @@ test-strategy = "0.3.1" [features] default = ["net", "metrics"] -net = ["iroh-net", "tokio/io-util", "tokio-stream", "tokio-util", "quinn", "futures"] -metrics = ["iroh-metrics"] +net = ["dep:iroh-net", "tokio/io-util", "dep:tokio-stream", "dep:tokio-util", "dep:quinn", "dep:futures-util"] +metrics = ["dep:iroh-metrics"] diff --git a/iroh-sync/src/net/codec.rs b/iroh-sync/src/net/codec.rs index 591380a785..6b9adffe0f 100644 --- a/iroh-sync/src/net/codec.rs +++ b/iroh-sync/src/net/codec.rs @@ -2,7 +2,7 @@ use std::future::Future; use anyhow::{anyhow, ensure}; use bytes::{Buf, BufMut, BytesMut}; -use futures::SinkExt; +use futures_util::SinkExt; use iroh_net::key::PublicKey; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; @@ -384,7 +384,7 @@ mod tests { &mut bob_writer, &mut bob_reader, bob_handle2, - |_namespace, _peer| futures::future::ready(AcceptOutcome::Allow), + |_namespace, _peer| std::future::ready(AcceptOutcome::Allow), alice_peer_id, ) .await @@ -600,7 +600,7 @@ mod tests { &mut bob_writer, &mut bob_reader, bob_handle, - |_namespace, _peer| futures::future::ready(AcceptOutcome::Allow), + |_namespace, _peer| std::future::ready(AcceptOutcome::Allow), alice_node_pubkey, ) .await diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index bb12caab5a..81c5bab4d2 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -21,7 +21,9 @@ bao-tree = { version = "0.13", features = ["tokio_fsm"], default-features = fals bytes = "1" derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "from", "try_into", "from_str"] } flume = "0.11" -futures = "0.3.25" +futures-buffered = "0.2.4" +futures-lite = "2.3" +futures-util = "0.3" genawaiter = { version = "0.99", default-features = false, features = ["futures03"] } hex = { version = "0.4.3" } iroh-bytes = { version = "0.14.0", path = "../iroh-bytes", features = ["downloader"] } @@ -35,7 +37,7 @@ iroh-sync = { version = "0.14.0", path = "../iroh-sync" } iroh-gossip = { version = "0.14.0", path = "../iroh-gossip" } parking_lot = "0.12.1" postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] } -quic-rpc = { version = "0.7.0", default-features = false, features = ["flume-transport", "quinn-transport"] } +quic-rpc = { version = "0.8.0", default-features = false, features = ["flume-transport", "quinn-transport"] } quinn = "0.10" rand = "0.8" serde = { version = "1", features = ["derive"] } diff --git a/iroh/examples/collection-provide.rs b/iroh/examples/collection-provide.rs index 1ef27561c3..3269ced626 100644 --- a/iroh/examples/collection-provide.rs +++ b/iroh/examples/collection-provide.rs @@ -66,6 +66,6 @@ async fn main() -> anyhow::Result<()> { println!("\tcargo run --example collection-fetch {}", ticket); // wait for the node to finish, this will block indefinitely // stop with SIGINT (ctrl+c) - node.await?; + node.shutdown().await?; Ok(()) } diff --git a/iroh/examples/hello-world-provide.rs b/iroh/examples/hello-world-provide.rs index 663895e5f4..0f0be772cd 100644 --- a/iroh/examples/hello-world-provide.rs +++ b/iroh/examples/hello-world-provide.rs @@ -48,6 +48,6 @@ async fn main() -> anyhow::Result<()> { println!("\t cargo run --example hello-world-fetch {}", ticket); // wait for the node to finish, this will block indefinitely // stop with SIGINT (ctrl+c) - node.await?; + node.shutdown().await?; Ok(()) } diff --git a/iroh/examples/rpc.rs b/iroh/examples/rpc.rs index 2aacfa758b..1d1aa9550f 100644 --- a/iroh/examples/rpc.rs +++ b/iroh/examples/rpc.rs @@ -40,7 +40,8 @@ where } // wait for the node to finish, this will block indefinitely // stop with SIGINT (ctrl+c) - node.await?; + node.shutdown().await?; + Ok(()) } diff --git a/iroh/src/client.rs b/iroh/src/client.rs index 95609c973b..840cb103b2 100644 --- a/iroh/src/client.rs +++ b/iroh/src/client.rs @@ -2,7 +2,7 @@ //! //! TODO: Contains only iroh sync related methods. Add other methods. -use futures::{Stream, StreamExt}; +use futures_lite::{Stream, StreamExt}; use quic_rpc::{RpcClient, ServiceConnection}; use crate::rpc_protocol::ProviderService; diff --git a/iroh/src/client/authors.rs b/iroh/src/client/authors.rs index 367d4b7e53..8f74d0d107 100644 --- a/iroh/src/client/authors.rs +++ b/iroh/src/client/authors.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use futures::{Stream, TryStreamExt}; +use futures_lite::{stream::StreamExt, Stream}; use iroh_sync::{Author, AuthorId}; use quic_rpc::{RpcClient, ServiceConnection}; @@ -29,7 +29,7 @@ where /// List document authors for which we have a secret key. pub async fn list(&self) -> Result>> { let stream = self.rpc.server_streaming(AuthorListRequest {}).await?; - Ok(flatten(stream).map_ok(|res| res.author_id)) + Ok(flatten(stream).map(|res| res.map(|res| res.author_id))) } /// Export the given author. @@ -69,15 +69,8 @@ mod tests { let author_id = node.authors.create().await?; - assert_eq!( - node.authors - .list() - .await? - .try_collect::>() - .await? - .len(), - 1 - ); + let authors: Vec<_> = node.authors.list().await?.try_collect().await?; + assert_eq!(authors.len(), 1); let author = node .authors @@ -85,24 +78,13 @@ mod tests { .await? .expect("should have author"); node.authors.delete(author_id).await?; - assert!(node - .authors - .list() - .await? - .try_collect::>() - .await? - .is_empty()); + let authors: Vec<_> = node.authors.list().await?.try_collect().await?; + assert!(authors.is_empty()); node.authors.import(author).await?; - assert_eq!( - node.authors - .list() - .await? - .try_collect::>() - .await? - .len(), - 1 - ); + + let authors: Vec<_> = node.authors.list().await?.try_collect().await?; + assert_eq!(authors.len(), 1); Ok(()) } diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs index f14c7b458b..5f7fe5561b 100644 --- a/iroh/src/client/blobs.rs +++ b/iroh/src/client/blobs.rs @@ -1,4 +1,5 @@ use std::{ + future::Future, io, path::PathBuf, pin::Pin, @@ -8,7 +9,8 @@ use std::{ use anyhow::{anyhow, Result}; use bytes::Bytes; -use futures::{Future, SinkExt, Stream, StreamExt, TryStreamExt}; +use futures_lite::{Stream, StreamExt}; +use futures_util::SinkExt; use iroh_base::{node_addr::AddrInfoOptions, ticket::BlobTicket}; use iroh_bytes::{ export::ExportProgress, @@ -180,7 +182,7 @@ where /// Write a blob by passing bytes. pub async fn add_bytes(&self, bytes: impl Into) -> anyhow::Result { - let input = futures::stream::once(futures::future::ready(Ok(bytes.into()))); + let input = futures_lite::stream::once(Ok(bytes.into())); self.add_stream(input, SetTagOption::Auto).await?.await } @@ -190,7 +192,7 @@ where bytes: impl Into, name: impl Into, ) -> anyhow::Result { - let input = futures::stream::once(futures::future::ready(Ok(bytes.into()))); + let input = futures_lite::stream::once(Ok(bytes.into())); self.add_stream(input, SetTagOption::Named(name.into())) .await? .await @@ -207,7 +209,7 @@ where .rpc .server_streaming(BlobValidateRequest { repair }) .await?; - Ok(stream.map_err(anyhow::Error::from)) + Ok(stream.map(|res| res.map_err(anyhow::Error::from))) } /// Validate hashes on the running node. @@ -221,14 +223,14 @@ where .rpc .server_streaming(BlobConsistencyCheckRequest { repair }) .await?; - Ok(stream.map_err(anyhow::Error::from)) + Ok(stream.map(|r| r.map_err(anyhow::Error::from))) } /// Download a blob from another node and add it to the local database. pub async fn download(&self, req: BlobDownloadRequest) -> Result { let stream = self.rpc.server_streaming(req).await?; Ok(BlobDownloadProgress::new( - stream.map_err(anyhow::Error::from), + stream.map(|res| res.map_err(anyhow::Error::from)), )) } @@ -255,7 +257,9 @@ where mode, }; let stream = self.rpc.server_streaming(req).await?; - Ok(BlobExportProgress::new(stream.map_err(anyhow::Error::from))) + Ok(BlobExportProgress::new( + stream.map(|r| r.map_err(anyhow::Error::from)), + )) } /// List all complete blobs. @@ -397,7 +401,7 @@ impl BlobAddProgress { impl Stream for BlobAddProgress { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.stream.poll_next_unpin(cx) + Pin::new(&mut self.stream).poll_next(cx) } } @@ -406,7 +410,7 @@ impl Future for BlobAddProgress { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match self.stream.poll_next_unpin(cx) { + match Pin::new(&mut self.stream).poll_next(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(None) => { return Poll::Ready(Err(anyhow!("Response stream ended prematurely"))) @@ -502,7 +506,7 @@ impl BlobDownloadProgress { impl Stream for BlobDownloadProgress { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.stream.poll_next_unpin(cx) + Pin::new(&mut self.stream).poll_next(cx) } } @@ -511,7 +515,7 @@ impl Future for BlobDownloadProgress { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match self.stream.poll_next_unpin(cx) { + match Pin::new(&mut self.stream).poll_next(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(None) => { return Poll::Ready(Err(anyhow!("Response stream ended prematurely"))) @@ -590,7 +594,7 @@ impl BlobExportProgress { impl Stream for BlobExportProgress { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.stream.poll_next_unpin(cx) + Pin::new(&mut self.stream).poll_next(cx) } } @@ -599,7 +603,7 @@ impl Future for BlobExportProgress { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match self.stream.poll_next_unpin(cx) { + match Pin::new(&mut self.stream).poll_next(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(None) => { return Poll::Ready(Err(anyhow!("Response stream ended prematurely"))) diff --git a/iroh/src/client/docs.rs b/iroh/src/client/docs.rs index e486b2c8a9..83bb9d002d 100644 --- a/iroh/src/client/docs.rs +++ b/iroh/src/client/docs.rs @@ -7,7 +7,7 @@ use std::{ use anyhow::{anyhow, Context as _, Result}; use bytes::Bytes; -use futures::{Stream, StreamExt, TryStreamExt}; +use futures_lite::{Stream, StreamExt}; use iroh_base::{key::PublicKey, node_addr::AddrInfoOptions}; use iroh_bytes::{export::ExportProgress, store::ExportMode, Hash}; use iroh_net::NodeAddr; @@ -72,7 +72,7 @@ where /// List all documents. pub async fn list(&self) -> Result>> { let stream = self.rpc.server_streaming(DocListRequest {}).await?; - Ok(flatten(stream).map_ok(|res| (res.id, res.capability))) + Ok(flatten(stream).map(|res| res.map(|res| (res.id, res.capability)))) } /// Get a [`Doc`] client for a single document. Return None if the document cannot be found. @@ -293,7 +293,7 @@ where query: query.into(), }) .await?; - Ok(flatten(stream).map_ok(|res| res.entry.into())) + Ok(flatten(stream).map(|res| res.map(|res| res.entry.into()))) } /// Get a single entry. @@ -343,11 +343,12 @@ where let stream = self .0 .rpc - .server_streaming(DocSubscribeRequest { doc_id: self.id() }) + .try_server_streaming(DocSubscribeRequest { doc_id: self.id() }) .await?; - Ok(flatten(stream) - .map_ok(|res| res.event.into()) - .map_err(Into::into)) + Ok(stream.map(|res| match res { + Ok(res) => Ok(res.event.into()), + Err(err) => Err(err.into()), + })) } /// Get status info for this document @@ -593,7 +594,7 @@ pub struct DocImportFileOutcome { impl Stream for DocImportFileProgress { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.stream.poll_next_unpin(cx) + Pin::new(&mut self.stream).poll_next(cx) } } @@ -658,8 +659,9 @@ pub struct DocExportFileOutcome { impl Stream for DocExportFileProgress { type Item = Result; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.stream.poll_next_unpin(cx) + Pin::new(&mut self.stream).poll_next(cx) } } diff --git a/iroh/src/client/node.rs b/iroh/src/client/node.rs index 30291400f4..6a4d15ff92 100644 --- a/iroh/src/client/node.rs +++ b/iroh/src/client/node.rs @@ -1,7 +1,7 @@ use std::collections::BTreeMap; use anyhow::Result; -use futures::{Stream, TryStreamExt}; +use futures_lite::{Stream, StreamExt}; use iroh_base::key::PublicKey; use iroh_net::magic_endpoint::ConnectionInfo; use quic_rpc::{RpcClient, ServiceConnection}; @@ -32,7 +32,7 @@ where /// Get information about the different connections we have made pub async fn connections(&self) -> Result>> { let stream = self.rpc.server_streaming(NodeConnectionsRequest {}).await?; - Ok(flatten(stream).map_ok(|res| res.conn_info)) + Ok(flatten(stream).map(|res| res.map(|res| res.conn_info))) } /// Get connection information about a node diff --git a/iroh/src/client/tags.rs b/iroh/src/client/tags.rs index db5db7d5b7..0d5a402a3c 100644 --- a/iroh/src/client/tags.rs +++ b/iroh/src/client/tags.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use futures::{Stream, TryStreamExt}; +use futures_lite::{Stream, StreamExt}; use iroh_bytes::Tag; use quic_rpc::{RpcClient, ServiceConnection}; @@ -18,7 +18,7 @@ where /// List all tags. pub async fn list(&self) -> Result>> { let stream = self.rpc.server_streaming(ListTagsRequest).await?; - Ok(stream.map_err(anyhow::Error::from)) + Ok(stream.map(|res| res.map_err(anyhow::Error::from))) } /// Delete a tag. diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 8bd8f01ca6..927e736599 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -6,16 +6,12 @@ //! //! To shut down the node, call [`Node::shutdown`]. use std::fmt::Debug; -use std::future::Future; use std::net::SocketAddr; use std::path::Path; -use std::pin::Pin; use std::sync::Arc; -use std::task::Poll; use anyhow::{anyhow, Result}; -use futures::future::{BoxFuture, Shared}; -use futures::{FutureExt, StreamExt}; +use futures_lite::{future::Boxed as BoxFuture, FutureExt, StreamExt}; use iroh_bytes::downloader::Downloader; use iroh_bytes::store::Store as BaoStore; use iroh_bytes::BlobFormat; @@ -30,7 +26,7 @@ use iroh_net::{ use quic_rpc::transport::flume::FlumeConnection; use quic_rpc::RpcClient; use tokio::sync::{mpsc, RwLock}; -use tokio::task::JoinError; +use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tokio_util::task::LocalPoolHandle; use tracing::debug; @@ -46,7 +42,7 @@ mod rpc_status; pub use builder::{Builder, GcPolicy, NodeDiscoveryConfig, StorageConfig}; pub use rpc_status::RpcStatus; -type EventCallback = Box BoxFuture<'static, ()> + 'static + Sync + Send>; +type EventCallback = Box BoxFuture<()> + 'static + Sync + Send>; #[derive(Default, derive_more::Debug, Clone)] struct Callbacks(#[debug("..")] Arc>>); @@ -67,8 +63,9 @@ impl Callbacks { impl iroh_bytes::provider::EventSender for Callbacks { fn send(&self, event: iroh_bytes::provider::Event) -> BoxFuture<()> { + let this = self.clone(); async move { - let cbs = self.0.read().await; + let cbs = this.0.read().await; for cb in &*cbs { cb(Event::ByteProvide(event.clone())).await; } @@ -90,7 +87,7 @@ impl iroh_bytes::provider::EventSender for Callbacks { #[derive(Debug, Clone)] pub struct Node { inner: Arc>, - task: Shared>>>, + task: Arc>, client: crate::client::mem::Iroh, } @@ -102,7 +99,7 @@ struct NodeInner { cancel_token: CancellationToken, controller: FlumeConnection, #[debug("callbacks: Sender>")] - cb_sender: mpsc::Sender BoxFuture<'static, ()> + Send + Sync + 'static>>, + cb_sender: mpsc::Sender BoxFuture<()> + Send + Sync + 'static>>, callbacks: Callbacks, #[allow(dead_code)] gc_task: Option>, @@ -191,7 +188,7 @@ impl Node { /// progress. /// /// Warning: The callback must complete quickly, as otherwise it will block ongoing work. - pub async fn subscribe BoxFuture<'static, ()> + Send + Sync + 'static>( + pub async fn subscribe BoxFuture<()> + Send + Sync + 'static>( &self, cb: F, ) -> Result<()> { @@ -236,12 +233,18 @@ impl Node { /// Aborts the node. /// /// This does not gracefully terminate currently: all connections are closed and - /// anything in-transit is lost. The task will stop running and awaiting this - /// [`Node`] will complete. + /// anything in-transit is lost. The task will stop running. + /// If this is the last copy of the `Node`, this will finish once the task is + /// fully shutdown. /// /// The shutdown behaviour will become more graceful in the future. - pub fn shutdown(&self) { + pub async fn shutdown(self) -> Result<()> { self.inner.cancel_token.cancel(); + + if let Ok(task) = Arc::try_unwrap(self.task) { + task.await?; + } + Ok(()) } /// Returns a token that can be used to cancel the node. @@ -250,15 +253,6 @@ impl Node { } } -/// The future completes when the spawned tokio task finishes. -impl Future for Node { - type Output = Result<(), Arc>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { - Pin::new(&mut self.task).poll(cx) - } -} - impl std::ops::Deref for Node { type Target = crate::client::mem::Iroh; @@ -405,8 +399,7 @@ mod tests { let iroh = Node::persistent(iroh_root.path()).await?.spawn().await?; let doc = iroh.docs.create().await?; drop(doc); - iroh.shutdown(); - iroh.await?; + iroh.shutdown().await?; } let iroh = Node::persistent(iroh_root.path()).await?.spawn().await?; diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index 917395c511..31a6bd631f 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -7,7 +7,7 @@ use std::{ }; use anyhow::{bail, Context, Result}; -use futures::{FutureExt, StreamExt, TryFutureExt}; +use futures_lite::StreamExt; use iroh_base::key::SecretKey; use iroh_bytes::{ downloader::Downloader, @@ -19,7 +19,6 @@ use iroh_net::{ discovery::{dns::DnsDiscovery, pkarr_publish::PkarrPublisher, ConcurrentDiscovery, Discovery}, magic_endpoint::get_alpn, relay::RelayMode, - util::AbortingJoinHandle, MagicEndpoint, }; use iroh_sync::net::SYNC_ALPN; @@ -407,7 +406,7 @@ where let db = self.blobs_store.clone(); let callbacks = callbacks.clone(); let task = lp.spawn_pinned(move || Self::gc_loop(db, sync_db, gc_period, callbacks)); - Some(AbortingJoinHandle(task)) + Some(task.into()) } else { None }; @@ -453,7 +452,7 @@ where let node = Node { inner, - task: task.map_err(Arc::new).boxed().shared(), + task: Arc::new(task), client, }; @@ -513,7 +512,8 @@ where _ = cancel_token.cancelled() => { // clean shutdown of the blobs db to close the write transaction handler.inner.db.shutdown().await; - if let Err(err) = handler.inner.sync.shutdown().await { + + if let Err(err) = handler.inner.sync.start_shutdown().await { warn!("sync shutdown error: {:?}", err); } break diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index 870620ab02..29941fc680 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -4,7 +4,8 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use anyhow::{anyhow, ensure, Result}; -use futures::{FutureExt, Stream, StreamExt}; +use futures_buffered::BufferedStreamExt; +use futures_lite::{Stream, StreamExt}; use genawaiter::sync::{Co, Gen}; use iroh_base::rpc::RpcResult; use iroh_bytes::downloader::{DownloadRequest, Downloader}; @@ -253,8 +254,8 @@ impl Handler { .await } DocSubscribe(msg) => { - chan.server_streaming(msg, handler, |handler, req| { - async move { handler.inner.sync.doc_subscribe(req) }.flatten_stream() + chan.try_server_streaming(msg, handler, |handler, req| async move { + handler.inner.sync.doc_subscribe(req).await }) .await } @@ -649,7 +650,6 @@ impl Handler { progress: flume::Sender, ) -> anyhow::Result<()> { use crate::rpc_protocol::WrapOption; - use futures::TryStreamExt; use iroh_bytes::store::ImportMode; use std::collections::BTreeMap; @@ -699,7 +699,7 @@ impl Handler { // import all files below root recursively let data_sources = crate::util::fs::scan_path(root, wrap)?; const IO_PARALLELISM: usize = 4; - let result: Vec<_> = futures::stream::iter(data_sources) + let result: Vec<_> = futures_lite::stream::iter(data_sources) .map(|source| { let import_progress = import_progress.clone(); let db = self.inner.db.clone(); @@ -717,8 +717,8 @@ impl Handler { io::Result::Ok((name, hash, size, tag)) } }) - .buffered(IO_PARALLELISM) - .try_collect::>() + .buffered_ordered(IO_PARALLELISM) + .try_collect() .await?; // create a collection @@ -805,7 +805,7 @@ impl Handler { } fn node_watch(self, _: NodeWatchRequest) -> impl Stream { - futures::stream::unfold((), |()| async move { + futures_lite::stream::unfold((), |()| async move { tokio::time::sleep(HEALTH_POLL_WAIT).await; Some(( NodeWatchResponse { diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index f19fe93e17..14ab810946 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -30,6 +30,7 @@ use iroh_sync::{ }; use quic_rpc::{ message::{BidiStreaming, BidiStreamingMsg, Msg, RpcMsg, ServerStreaming, ServerStreamingMsg}, + pattern::try_server_streaming::{StreamCreated, TryServerStreaming, TryServerStreamingMsg}, Service, }; use serde::{Deserialize, Serialize}; @@ -566,11 +567,13 @@ pub struct DocSubscribeRequest { } impl Msg for DocSubscribeRequest { - type Pattern = ServerStreaming; + type Pattern = TryServerStreaming; } -impl ServerStreamingMsg for DocSubscribeRequest { - type Response = RpcResult; +impl TryServerStreamingMsg for DocSubscribeRequest { + type Item = DocSubscribeResponse; + type ItemError = RpcError; + type CreateError = RpcError; } /// Response to [`DocSubscribeRequest`] @@ -1224,6 +1227,7 @@ pub enum ProviderResponse { DocGetDownloadPolicy(RpcResult), DocSetDownloadPolicy(RpcResult), DocGetSyncPeers(RpcResult), + StreamCreated(RpcResult), AuthorList(RpcResult), AuthorCreate(RpcResult), diff --git a/iroh/src/sync_engine.rs b/iroh/src/sync_engine.rs index 33ce2ec1e7..69d426ac9e 100644 --- a/iroh/src/sync_engine.rs +++ b/iroh/src/sync_engine.rs @@ -5,18 +5,15 @@ use std::{io, sync::Arc}; use anyhow::Result; -use futures::{ - future::{BoxFuture, FutureExt, Shared}, - Stream, TryStreamExt, -}; +use futures_lite::{Stream, StreamExt}; use iroh_bytes::downloader::Downloader; use iroh_bytes::{store::EntryStatus, Hash}; use iroh_gossip::net::Gossip; +use iroh_net::util::SharedAbortingJoinHandle; use iroh_net::{key::PublicKey, MagicEndpoint, NodeAddr}; use iroh_sync::{actor::SyncHandle, ContentStatus, ContentStatusCallback, Entry, NamespaceId}; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, oneshot}; -use tokio_stream::StreamExt; use tracing::{error, error_span, Instrument}; mod gossip; @@ -46,7 +43,7 @@ pub struct SyncEngine { pub(crate) endpoint: MagicEndpoint, pub(crate) sync: SyncHandle, to_live_actor: mpsc::Sender, - tasks_fut: Shared>, + actor_handle: SharedAbortingJoinHandle<()>, #[debug("ContentStatusCallback")] content_status_cb: ContentStatusCallback, } @@ -83,47 +80,26 @@ impl SyncEngine { live_actor_tx.clone(), to_gossip_actor, ); - let mut gossip_actor = GossipActor::new( + let gossip_actor = GossipActor::new( to_gossip_actor_recv, sync.clone(), gossip, live_actor_tx.clone(), ); - let live_actor_task = tokio::task::spawn( + let actor_handle = tokio::task::spawn( async move { - if let Err(err) = actor.run().await { + if let Err(err) = actor.run(gossip_actor).await { error!("sync actor failed: {err:?}"); } } .instrument(error_span!("sync", %me)), ); - let gossip_actor_task = tokio::task::spawn( - async move { - if let Err(err) = gossip_actor.run().await { - error!("gossip recv actor failed: {err:?}"); - } - } - .instrument(error_span!("sync", %me)), - ); - let tasks_fut = async move { - if let Err(err) = live_actor_task.await { - error!("Error while joining actor task: {err:?}"); - } - gossip_actor_task.abort(); - if let Err(err) = gossip_actor_task.await { - if !err.is_cancelled() { - error!("Error while joining gossip recv task task: {err:?}"); - } - } - } - .boxed() - .shared(); Self { endpoint, sync, to_live_actor: live_actor_tx, - tasks_fut, + actor_handle: actor_handle.into(), content_status_cb, } } @@ -177,49 +153,40 @@ impl SyncEngine { } /// Subscribe to replica and sync progress events. - pub fn subscribe( + pub async fn subscribe( &self, namespace: NamespaceId, - ) -> impl Stream> + Unpin + 'static { + ) -> Result> + Unpin + 'static> { let content_status_cb = self.content_status_cb.clone(); // Create a future that sends channel senders to the respective actors. // We clone `self` so that the future does not capture any lifetimes. let this = self.clone(); - let fut = async move { - // Subscribe to insert events from the replica. - let replica_events = { - let (s, r) = flume::bounded(SUBSCRIBE_CHANNEL_CAP); - this.sync.subscribe(namespace, s).await?; - r.into_stream() - .map(move |ev| LiveEvent::from_replica_event(ev, &content_status_cb)) - }; - // Subscribe to events from the [`live::Actor`]. - let sync_events = { - let (s, r) = flume::bounded(SUBSCRIBE_CHANNEL_CAP); - let (reply, reply_rx) = oneshot::channel(); - this.to_live_actor - .send(ToLiveActor::Subscribe { - namespace, - sender: s, - reply, - }) - .await?; - reply_rx.await??; - r.into_stream().map(|event| Ok(LiveEvent::from(event))) - }; + // Subscribe to insert events from the replica. + let a = { + let (s, r) = flume::bounded(SUBSCRIBE_CHANNEL_CAP); + this.sync.subscribe(namespace, s).await?; + r.into_stream() + .map(move |ev| LiveEvent::from_replica_event(ev, &content_status_cb)) + }; - // Merge the two receivers into a single stream. - let stream = replica_events.merge(sync_events); - // We need type annotations for the error type here. - Result::<_, anyhow::Error>::Ok(stream) + // Subscribe to events from the [`live::Actor`]. + let b = { + let (s, r) = flume::bounded(SUBSCRIBE_CHANNEL_CAP); + let (reply, reply_rx) = oneshot::channel(); + this.to_live_actor + .send(ToLiveActor::Subscribe { + namespace, + sender: s, + reply, + }) + .await?; + reply_rx.await??; + r.into_stream().map(|event| Ok(LiveEvent::from(event))) }; - // Flatten the future into a single stream. If the future errors, the error will be - // returned from the first call to [`Stream::next`]. - // We first pin the future so that the resulting stream is `Unpin`. - Box::pin(fut).into_stream().try_flatten() + Ok(a.or(b)) } /// Handle an incoming iroh-sync connection. @@ -230,10 +197,16 @@ impl SyncEngine { Ok(()) } + pub(crate) async fn start_shutdown(&self) -> Result<()> { + self.to_live_actor.send(ToLiveActor::Shutdown).await?; + Ok(()) + } + /// Shutdown the sync engine. - pub async fn shutdown(&self) -> Result<()> { + pub async fn shutdown(self) -> Result<()> { self.to_live_actor.send(ToLiveActor::Shutdown).await?; - self.tasks_fut.clone().await; + + self.actor_handle.await.map_err(|e| anyhow::anyhow!(e))?; Ok(()) } } diff --git a/iroh/src/sync_engine/gossip.rs b/iroh/src/sync_engine/gossip.rs index 7835c28232..86853cab38 100644 --- a/iroh/src/sync_engine/gossip.rs +++ b/iroh/src/sync_engine/gossip.rs @@ -1,7 +1,7 @@ use std::collections::HashSet; use anyhow::{anyhow, Context, Result}; -use futures::{stream::StreamExt, FutureExt}; +use futures_lite::StreamExt; use iroh_gossip::{ net::{Event, Gossip}, proto::TopicId, @@ -108,12 +108,16 @@ impl GossipActor { return Ok(false); } ToGossipActor::Join { namespace, peers } => { + let gossip = self.gossip.clone(); // join gossip for the topic to receive and send message - let fut = self - .gossip - .join(namespace.into(), peers) - .await? - .map(move |res| (namespace, res)); + let fut = async move { + let res = gossip.join(namespace.into(), peers).await; + let res = match res { + Ok(fut) => fut.await, + Err(err) => Err(err), + }; + (namespace, res) + }; self.want_join.insert(namespace); self.pending_joins.spawn(fut); } diff --git a/iroh/src/sync_engine/live.rs b/iroh/src/sync_engine/live.rs index 8a1f2a5997..db81473319 100644 --- a/iroh/src/sync_engine/live.rs +++ b/iroh/src/sync_engine/live.rs @@ -4,7 +4,7 @@ use std::collections::HashSet; use std::{collections::HashMap, time::SystemTime}; use anyhow::{Context, Result}; -use futures::FutureExt; +use futures_lite::FutureExt; use iroh_bytes::downloader::{DownloadError, DownloadRequest, Downloader}; use iroh_bytes::get::Stats; use iroh_bytes::HashAndFormat; @@ -25,9 +25,9 @@ use tokio::{ sync::{self, mpsc, oneshot}, task::JoinSet, }; -use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span}; +use tracing::{debug, error, error_span, info, instrument, trace, warn, Instrument, Span}; -use super::gossip::ToGossipActor; +use super::gossip::{GossipActor, ToGossipActor}; use super::state::{NamespaceStates, Origin, SyncReason}; /// An iroh-sync operation @@ -205,11 +205,22 @@ impl LiveActor { } /// Run the actor loop. - pub async fn run(&mut self) -> Result<()> { + pub async fn run(&mut self, mut gossip_actor: GossipActor) -> Result<()> { + let me = self.endpoint.node_id().fmt_short(); + let gossip_handle = tokio::task::spawn( + async move { + if let Err(err) = gossip_actor.run().await { + error!("gossip recv actor failed: {err:?}"); + } + } + .instrument(error_span!("sync", %me)), + ); + let res = self.run_inner().await; if let Err(err) = self.shutdown().await { error!(?err, "Error during shutdown"); } + gossip_handle.await?; res } @@ -812,7 +823,7 @@ impl Subscribers { async fn send(&mut self, event: Event) -> bool { let futs = self.0.iter().map(|sender| sender.send_async(event.clone())); - let res = futures::future::join_all(futs).await; + let res = futures_buffered::join_all(futs).await; // reverse the order so removing does not shift remaining indices for (i, res) in res.into_iter().enumerate().rev() { if res.is_err() { diff --git a/iroh/src/sync_engine/rpc.rs b/iroh/src/sync_engine/rpc.rs index 923fb6f17f..605f875f2b 100644 --- a/iroh/src/sync_engine/rpc.rs +++ b/iroh/src/sync_engine/rpc.rs @@ -1,7 +1,7 @@ //! This module contains an impl block on [`SyncEngine`] with handlers for RPC requests use anyhow::anyhow; -use futures::Stream; +use futures_lite::Stream; use iroh_bytes::{store::Store as BaoStore, BlobFormat}; use iroh_sync::{Author, NamespaceSecret}; use tokio_stream::StreamExt; @@ -150,15 +150,16 @@ impl SyncEngine { })) } - pub fn doc_subscribe( + pub async fn doc_subscribe( &self, req: DocSubscribeRequest, - ) -> impl Stream> { - let stream = self.subscribe(req.doc_id); - stream.map(|res| { - res.map(|event| DocSubscribeResponse { event }) + ) -> RpcResult>> { + let stream = self.subscribe(req.doc_id).await?; + + Ok(stream.map(|el| { + el.map(|event| DocSubscribeResponse { event }) .map_err(Into::into) - }) + })) } pub async fn doc_import(&self, req: DocImportRequest) -> RpcResult { diff --git a/iroh/tests/gc.rs b/iroh/tests/gc.rs index 734306cc00..fe27b4f93d 100644 --- a/iroh/tests/gc.rs +++ b/iroh/tests/gc.rs @@ -6,7 +6,7 @@ use std::{ use anyhow::Result; use bao_tree::{blake3, io::sync::Outboard, ChunkRanges}; use bytes::Bytes; -use futures::FutureExt; +use futures_lite::FutureExt; use iroh::node::{self, Node}; use rand::RngCore; @@ -127,8 +127,7 @@ async fn gc_basics() -> Result<()> { step(&evs).await; assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::NotFound); - node.shutdown(); - node.await?; + node.shutdown().await?; Ok(()) } @@ -186,8 +185,7 @@ async fn gc_hashseq_impl() -> Result<()> { assert_eq!(bao_store.entry_status(&h2).await?, EntryStatus::NotFound); assert_eq!(bao_store.entry_status(&hr).await?, EntryStatus::NotFound); - node.shutdown(); - node.await?; + node.shutdown().await?; Ok(()) } @@ -200,7 +198,8 @@ mod file { io::fsm::{BaoContentItem, ResponseDecoderNext}, BaoTree, }; - use futures::StreamExt; + + use futures_lite::StreamExt; use iroh_io::AsyncSliceReaderExt; use testdir::testdir; @@ -380,8 +379,8 @@ mod file { assert!(!path(&hr).exists()); assert!(!outboard_path(&hr).exists()); - node.shutdown(); - node.await?; + node.shutdown().await?; + Ok(()) } @@ -475,8 +474,7 @@ mod file { assert!(!path(&h1).exists()); assert!(!outboard_path(&h1).exists()); - node.shutdown(); - node.await?; + node.shutdown().await?; Ok(()) } @@ -517,8 +515,7 @@ mod file { assert!(dir.join(format!("data/{}.data", h.to_hex())).exists()); } - node.shutdown(); - node.await?; + node.shutdown().await?; Ok(()) } } diff --git a/iroh/tests/provide.rs b/iroh/tests/provide.rs index dffacb86cd..72bd326158 100644 --- a/iroh/tests/provide.rs +++ b/iroh/tests/provide.rs @@ -5,9 +5,9 @@ use std::{ time::{Duration, Instant}, }; -use anyhow::{anyhow, Context, Result}; +use anyhow::{anyhow, Result}; use bytes::Bytes; -use futures::FutureExt; +use futures_lite::FutureExt; use iroh::{ dial::Options, node::{Builder, Event}, @@ -169,7 +169,7 @@ async fn multiple_clients() -> Result<()> { })); } - futures::future::try_join_all(tasks).await?; + futures_buffered::try_join_all(tasks).await?; Ok(()) } @@ -263,8 +263,7 @@ where .await .expect("duration expired"); - node.shutdown(); - node.await?; + node.shutdown().await?; assert_events(events, num_blobs + 1); @@ -314,7 +313,7 @@ async fn test_server_close() { let child_hash = db.insert(b"hello there"); let collection = Collection::from_iter([("hello", child_hash)]); let hash = db.insert_many(collection.to_blobs()).unwrap(); - let mut node = test_node(db).spawn().await.unwrap(); + let node = test_node(db).spawn().await.unwrap(); let node_addr = node.local_endpoint_addresses().await.unwrap(); let peer_id = node.node_id(); @@ -337,11 +336,12 @@ async fn test_server_close() { loop { tokio::select! { biased; - res = &mut node => break res.context("provider failed"), maybe_event = events_recv.recv() => { match maybe_event { Some(event) => match event { - Event::ByteProvide(provider::Event::TransferCompleted { .. }) => node.shutdown(), + Event::ByteProvide(provider::Event::TransferCompleted { .. }) => { + return node.shutdown().await; + }, Event::ByteProvide(provider::Event::TransferAborted { .. }) => { break Err(anyhow!("transfer aborted")); } @@ -353,9 +353,9 @@ async fn test_server_close() { } } }) - .await - .expect("supervisor timeout") - .expect("supervisor failed"); + .await + .expect("supervisor timeout") + .expect("supervisor failed"); } /// create an in memory test database containing the given entries and an iroh collection of all entries diff --git a/iroh/tests/sync.rs b/iroh/tests/sync.rs index f3ff05b2be..c4bbf174d4 100644 --- a/iroh/tests/sync.rs +++ b/iroh/tests/sync.rs @@ -7,7 +7,8 @@ use std::{ use anyhow::{anyhow, bail, Context, Result}; use bytes::Bytes; -use futures::{FutureExt, Stream, StreamExt, TryStreamExt}; +use futures_lite::Stream; +use futures_util::{FutureExt, StreamExt, TryStreamExt}; use iroh::{ client::{mem::Doc, Entry, LiveEvent}, node::{Builder, Node}, @@ -59,7 +60,7 @@ async fn spawn_nodes( for i in 0..n { futs.push(spawn_node(i, &mut rng)); } - futures::future::join_all(futs).await.into_iter().collect() + futures_buffered::join_all(futs).await.into_iter().collect() } pub fn test_rng(seed: &[u8]) -> rand_chacha::ChaCha12Rng { @@ -118,7 +119,7 @@ async fn sync_simple() -> Result<()> { .await; for node in nodes { - node.shutdown(); + node.shutdown().await?; } Ok(()) } @@ -139,7 +140,7 @@ async fn sync_subscribe_no_sync() -> Result<()> { matches!(event, Some(Ok(LiveEvent::InsertLocal { .. }))), "expected InsertLocal but got {event:?}" ); - node.shutdown(); + node.shutdown().await?; Ok(()) } @@ -396,7 +397,7 @@ async fn sync_full_basic() -> Result<()> { info!("shutdown"); for node in nodes { - node.shutdown(); + node.shutdown().await?; } Ok(()) @@ -811,7 +812,7 @@ async fn sync_big() -> Result<()> { info!("shutdown"); for node in nodes { - node.shutdown(); + node.shutdown().await?; } Ok(()) @@ -858,7 +859,7 @@ async fn publish( async fn collect_futures( futs: impl IntoIterator>>, ) -> anyhow::Result> { - futures::future::join_all(futs) + futures_buffered::join_all(futs) .await .into_iter() .collect::>>() @@ -974,7 +975,7 @@ async fn doc_delete() -> Result<()> { tokio::time::sleep(Duration::from_millis(200)).await; let bytes = client.blobs.read_to_bytes(hash).await; assert!(bytes.is_err()); - node.shutdown(); + node.shutdown().await?; Ok(()) }