diff --git a/.config/nextest.toml b/.config/nextest.toml new file mode 100644 index 00000000..a4a1e566 --- /dev/null +++ b/.config/nextest.toml @@ -0,0 +1,20 @@ +[profile.default] +retries = 0 +test-threads = "num-cpus" +threads-required = 1 + +[profile.ci] +retries = { backoff = "exponential", count = 3, delay = "30s", jitter = true, max-delay = "300s" } +failure-output = "immediate-final" +fail-fast = false + +[test-groups] +serial = { max-threads = 1 } + +[[profile.default.overrides]] +filter = 'test(/_serial$/)' +test-group = 'serial' + +[[profile.ci.overrides]] +filter = 'test(/_serial$/)' +test-group = 'serial' diff --git a/.dockerignore b/.dockerignore index 337fd68e..ce53d187 100644 --- a/.dockerignore +++ b/.dockerignore @@ -2,9 +2,10 @@ !**/Cargo.toml !**/Cargo.lock +!**/build.rs !**/src !**/config !**/migrations -!.env !diesel.toml -!**/wits + +homestar-functions/src diff --git a/.envrc b/.envrc index 5a4b3d00..2b74e72f 100644 --- a/.envrc +++ b/.envrc @@ -1,5 +1,5 @@ use_flake -export RUST_LOG=homestar_runtime=debug,libp2p=info,libp2p_gossipsub::behaviour=debug +export RUST_LOG=homestar_runtime=debug,libp2p=info,libp2p_gossipsub::behaviour=debug,tarpc=info,tower_http=debug export RUST_BACKTRACE=full export RUSTFLAGS="--cfg tokio_unstable" diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 1f3df710..15b575aa 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -29,26 +29,11 @@ jobs: - name: Cache Project uses: Swatinem/rust-cache@v2 - - name: Generate Code coverage - env: - CARGO_INCREMENTAL: '0' - LLVM_PROFILE_FILE: "homestar-%p-%m.profraw" - RUSTFLAGS: '-Zprofile -Ccodegen-units=1 -Cinline-threshold=0 -Clink-dead-code -Coverflow-checks=off -Cpanic=abort -Zpanic_abort_tests' - RUSTDOCFLAGS: '-Zprofile -Ccodegen-units=1 -Cinline-threshold=0 -Clink-dead-code -Coverflow-checks=off -Cpanic=abort -Zpanic_abort_tests' - # covering nexttest's doc-test issue - run: cargo test --all-features - - - name: Install grcov - run: "curl -L https://github.com/mozilla/grcov/releases/download/v0.8.12/grcov-x86_64-unknown-linux-gnu.tar.bz2 | tar jxf -" - - - name: Run grcov - run: "./grcov . --llvm --binary-path target/debug/ -s . 
-t lcov --branch --ignore-not-existing --ignore '/*' -o lcov.info" - - - name: Install covfix - run: cargo install --force rust-covfix - - - name: Run covfix - run: rust-covfix lcov.info -o lcov.info --verbose + - name: Install cargo-llvm-cov + uses: taiki-e/install-action@cargo-llvm-cov + + - name: Generate Code Coverage + run: cargo llvm-cov --all-features --workspace --doctests --lcov --output-path lcov.info - name: Upload to codecov.io uses: codecov/codecov-action@v3 diff --git a/.github/workflows/tests_and_checks.yml b/.github/workflows/tests_and_checks.yml index f6beef45..b4662563 100644 --- a/.github/workflows/tests_and_checks.yml +++ b/.github/workflows/tests_and_checks.yml @@ -95,7 +95,7 @@ jobs: uses: taiki-e/install-action@nextest - name: Run Tests - run: cargo nextest run --all-features + run: cargo nextest run --profile ci --all-features - name: Run Doc Tests run: cargo test --doc @@ -128,7 +128,7 @@ jobs: uses: taiki-e/install-action@nextest - name: Run Tests - run: cargo nextest run --no-default-features + run: cargo nextest run --profile ci --no-default-features run-docs: runs-on: ubuntu-latest diff --git a/.gitignore b/.gitignore index ba1448f6..b7a825ff 100644 --- a/.gitignore +++ b/.gitignore @@ -21,5 +21,10 @@ private homestar-guest-wasm/out homestar-wasm/out +# daemon +homestar.err +homestar.out +homestar.pid + # locks homestar-wasm/Cargo.lock diff --git a/Cargo.lock b/Cargo.lock index 1ad31c9b..57d10744 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -119,15 +119,6 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" -[[package]] -name = "ansi_term" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" -dependencies = [ - "winapi", -] - [[package]] name = "anstream" version = "0.3.2" @@ -210,6 +201,21 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "155a5a185e42c6b77ac7b88a15143d930a9e9727a5b7b77eed417404ab15c247" +[[package]] +name = "assert_cmd" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88903cb14723e4d4003335bb7f8a14f27691649105346a0f0957466c096adfe6" +dependencies = [ + "anstyle", + "bstr", + "doc-comment", + "predicates", + "predicates-core", + "predicates-tree", + "wait-timeout", +] + [[package]] name = "async-io" version = "1.13.0" @@ -354,6 +360,15 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "backtrace-ext" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "537beee3be4a18fb023b570f80e3ae28003db9167a751266b259926e25539d50" +dependencies = [ + "backtrace", +] + [[package]] name = "base-x" version = "0.2.11" @@ -573,6 +588,17 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "bstr" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6798148dccfbff0fae41c7574d2fa8f1ef3492fba0face179de5d8d447d67b05" +dependencies = [ + "memchr", + "regex-automata 0.3.2", + "serde", +] + [[package]] name = "bumpalo" version = "3.13.0" @@ -610,6 +636,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "bytecount" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c" + [[package]] name = "bytemuck" version = "1.13.1" @@ -1306,6 +1338,15 @@ 
dependencies = [ "zeroize", ] +[[package]] +name = "daemonize" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab8bfdaacb3c887a54d41bdf48d3af8873b3f5566469f8ba21b92057509f116e" +dependencies = [ + "libc", +] + [[package]] name = "dagga" version = "0.2.1" @@ -1456,6 +1497,12 @@ dependencies = [ "syn 2.0.26", ] +[[package]] +name = "difflib" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8" + [[package]] name = "digest" version = "0.9.0" @@ -1570,6 +1617,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "educe" +version = "0.4.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "079044df30bb07de7d846d41a184c4b00e66ebdac93ee459253474f3a47e50ae" +dependencies = [ + "enum-ordinalize", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "either" version = "1.8.1" @@ -1620,6 +1679,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "enum-ordinalize" +version = "3.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4f76552f53cefc9a7f64987c3701b99d982f7690606fd67de1d09712fbf52f1" +dependencies = [ + "num-bigint", + "num-traits", + "proc-macro2", + "quote", + "syn 2.0.26", +] + [[package]] name = "env_logger" version = "0.10.0" @@ -1754,6 +1826,15 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "float-cmp" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98de4bbd547a563b716d8dfa9aad1cb19bfab00f4fa09a6a4ed21dbcf44ce9c4" +dependencies = [ + "num-traits", +] + [[package]] name = "flume" version = "0.10.14" @@ -2190,16 +2271,19 @@ name = "homestar-core" version = "0.1.0" dependencies = [ "anyhow", + "async-recursion", "byte-unit", "criterion", "diesel", "enum-as-inner 0.6.0", "enum-assoc", + "futures", "generic-array", "indexmap 2.0.0", "json", "libipld", "libsqlite3-sys", + "once_cell", "proptest", "rand 0.8.5", "serde", @@ -2224,8 +2308,8 @@ dependencies = [ name = "homestar-runtime" version = "0.1.0" dependencies = [ - "ansi_term", "anyhow", + "assert_cmd", "async-trait", "axum", "bincode 2.0.0-rc.3", @@ -2236,6 +2320,7 @@ dependencies = [ "console-subscriber", "criterion", "crossbeam", + "daemonize", "dagga", "dashmap", "diesel", @@ -2257,25 +2342,37 @@ dependencies = [ "libipld", "libp2p", "libsqlite3-sys", + "miette", "nix 0.26.2", + "once_cell", "openssl", + "predicates", "proptest", "rand 0.8.5", "reqwest", + "retry", "sec1", "semver", "serde", "serde_ipld_dagcbor", "serde_with", + "serial_test", + "stream-cancel", + "strum 0.25.0", + "sysinfo", + "tabled", + "tarpc", "thiserror", "tokio", "tokio-tungstenite 0.19.0", + "tokio-util", "tracing", "tracing-appender", "tracing-logfmt", "tracing-subscriber", "tryhard", "url", + "wait-timeout", ] [[package]] @@ -2676,6 +2773,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "is_ci" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "616cde7c720bb2bb5824a224687d8f77bfd38922027f01d825cd7453be5099fb" + [[package]] name = "itertools" version = "0.10.5" @@ -3468,6 +3571,38 @@ dependencies = [ "autocfg", ] +[[package]] +name = "miette" +version = "5.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59bb584eaeeab6bd0226ccf3509a69d7936d148cf3d036ad350abe35e8c6856e" +dependencies = [ + "backtrace", + "backtrace-ext", + "is-terminal", + "miette-derive", + 
"once_cell", + "owo-colors", + "supports-color", + "supports-hyperlinks", + "supports-unicode", + "terminal_size", + "textwrap", + "thiserror", + "unicode-width", +] + +[[package]] +name = "miette-derive" +version = "5.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49e7bc1560b95a3c4a25d03de42fe76ca718ab92d1a22a55b9b4cf67b3ae635c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.26", +] + [[package]] name = "migrations_internals" version = "2.1.0" @@ -3796,6 +3931,21 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "normalize-line-endings" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" + +[[package]] +name = "ntapi" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" +dependencies = [ + "winapi", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -3806,6 +3956,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-bigint" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + [[package]] name = "num-integer" version = "0.1.45" @@ -3940,6 +4101,49 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69d6c3d7288a106c0a363e4b0e8d308058d56902adefb16f4936f417ffef086e" +dependencies = [ + "opentelemetry_api", + "opentelemetry_sdk", +] + +[[package]] +name = "opentelemetry_api" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c24f96e21e7acc813c7a8394ee94978929db2bcc46cf6b5014fc612bf7760c22" +dependencies = [ + "futures-channel", + "futures-util", + "indexmap 1.9.3", + "js-sys", + "once_cell", + "pin-project-lite 0.2.10", + "thiserror", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ca41c4933371b61c2a2f214bf16931499af4ec90543604ec828f7a625c09113" +dependencies = [ + "async-trait", + "crossbeam-channel", + "futures-channel", + "futures-executor", + "futures-util", + "once_cell", + "opentelemetry_api", + "percent-encoding", + "rand 0.8.5", + "thiserror", +] + [[package]] name = "ordered-multimap" version = "0.4.3" @@ -3956,6 +4160,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "owo-colors" +version = "3.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" + [[package]] name = "packed_simd_2" version = "0.3.8" @@ -3966,6 +4176,16 @@ dependencies = [ "libm 0.1.4", ] +[[package]] +name = "papergrid" +version = "0.9.1" +source = "git+https://github.com/zhiburt/tabled.git#0f9b50134f689b104e33d417052baef671472815" +dependencies = [ + "bytecount", + "fnv", + "unicode-width", +] + [[package]] name = "parking" version = "2.1.0" @@ -4212,6 +4432,37 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "predicates" 
+version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09963355b9f467184c04017ced4a2ba2d75cbcb4e7462690d388233253d4b1a9" +dependencies = [ + "anstyle", + "difflib", + "float-cmp", + "itertools 0.10.5", + "normalize-line-endings", + "predicates-core", + "regex", +] + +[[package]] +name = "predicates-core" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b794032607612e7abeb4db69adb4e33590fa6cf1149e95fd7cb00e634b92f174" + +[[package]] +name = "predicates-tree" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368ba315fb8c5052ab692e68a0eefec6ec57b23a36959c14496f0b0df2c0cecf" +dependencies = [ + "predicates-core", + "termtree", +] + [[package]] name = "proc-macro-crate" version = "0.1.5" @@ -4719,6 +4970,15 @@ dependencies = [ "quick-error", ] +[[package]] +name = "retry" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9166d72162de3575f950507683fac47e30f6f2c3836b71b7fbc61aa517c9c5f4" +dependencies = [ + "rand 0.8.5", +] + [[package]] name = "ring" version = "0.16.20" @@ -5144,6 +5404,31 @@ dependencies = [ "syn 2.0.26", ] +[[package]] +name = "serial_test" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e56dd856803e253c8f298af3f4d7eb0ae5e23a737252cd90bb4f3b435033b2d" +dependencies = [ + "dashmap", + "futures", + "lazy_static", + "log", + "parking_lot", + "serial_test_derive", +] + +[[package]] +name = "serial_test_derive" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91d129178576168c589c9ec973feedf7d3126c01ac2bf08795109aa35b69fb8f" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.26", +] + [[package]] name = "sha-1" version = "0.9.8" @@ -5274,6 +5559,12 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" +[[package]] +name = "smawk" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f67ad224767faa3c7d8b6d91985b78e70a1324408abcb1cfcc2be4c06bc06043" + [[package]] name = "snafu" version = "0.7.4" @@ -5413,6 +5704,17 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "stream-cancel" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b0a9eb2715209fb8cc0d942fcdff45674bfc9f0090a0d897e85a22955ad159b" +dependencies = [ + "futures-core", + "pin-project", + "tokio", +] + [[package]] name = "strsim" version = "0.10.0" @@ -5425,6 +5727,15 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f" +[[package]] +name = "strum" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" +dependencies = [ + "strum_macros", +] + [[package]] name = "strum_macros" version = "0.25.1" @@ -5444,6 +5755,34 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" +[[package]] +name = "supports-color" +version = "2.0.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "4950e7174bffabe99455511c39707310e7e9b440364a2fcb1cc21521be57b354" +dependencies = [ + "is-terminal", + "is_ci", +] + +[[package]] +name = "supports-hyperlinks" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f84231692eb0d4d41e4cdd0cabfdd2e6cd9e255e65f80c9aa7c98dd502b4233d" +dependencies = [ + "is-terminal", +] + +[[package]] +name = "supports-unicode" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b6c2cb240ab5dd21ed4906895ee23fe5a48acdbd15a3ce388e7b62a9b66baf7" +dependencies = [ + "is-terminal", +] + [[package]] name = "syn" version = "1.0.109" @@ -5497,6 +5836,21 @@ dependencies = [ "walkdir", ] +[[package]] +name = "sysinfo" +version = "0.29.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b949f01f9c23823744b71e0060472ecbde578ef68cc2a9e46d114efd77c3034" +dependencies = [ + "cfg-if", + "core-foundation-sys", + "libc", + "ntapi", + "once_cell", + "rayon", + "winapi", +] + [[package]] name = "system-configuration" version = "0.5.1" @@ -5534,6 +5888,29 @@ dependencies = [ "winx 0.36.1", ] +[[package]] +name = "tabled" +version = "0.12.2" +source = "git+https://github.com/zhiburt/tabled.git#0f9b50134f689b104e33d417052baef671472815" +dependencies = [ + "papergrid", + "tabled_derive", + "unicode-width", +] + +[[package]] +name = "tabled_derive" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99f688a08b54f4f02f0a3c382aefdb7884d3d69609f785bd253dc033243e3fe4" +dependencies = [ + "heck", + "proc-macro-error", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "tap" version = "1.0.1" @@ -5546,6 +5923,41 @@ version = "0.12.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b1c7f239eb94671427157bd93b3694320f3668d4e1eff08c7285366fd777fac" +[[package]] +name = "tarpc" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f41bce44d290df0598ae4b9cd6ea7f58f651fd3aa4af1b26060c4fa32b08af7" +dependencies = [ + "anyhow", + "fnv", + "futures", + "humantime", + "opentelemetry", + "pin-project", + "rand 0.8.5", + "serde", + "static_assertions", + "tarpc-plugins", + "thiserror", + "tokio", + "tokio-serde", + "tokio-util", + "tracing", + "tracing-opentelemetry", +] + +[[package]] +name = "tarpc-plugins" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ee42b4e559f17bce0385ebf511a7beb67d5cc33c12c96b7f4e9789919d9c10f" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "tempfile" version = "3.6.0" @@ -5569,6 +5981,33 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "terminal_size" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "633c1a546cee861a1a6d0dc69ebeca693bf4296661ba7852b9d21d159e0506df" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "termtree" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" + +[[package]] +name = "textwrap" +version = "0.15.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7b3e525a49ec206798b40326a44121291b530c963cfb01018f63e135bac543d" +dependencies = [ + "smawk", + "unicode-linebreak", + "unicode-width", +] + [[package]] name = "thiserror" 
version = "1.0.43" @@ -5713,6 +6152,21 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-serde" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "911a61637386b789af998ee23f50aa30d5fd7edcec8d6d3dedae5e5815205466" +dependencies = [ + "bincode 1.3.3", + "bytes", + "educe", + "futures-core", + "futures-sink", + "pin-project", + "serde", +] + [[package]] name = "tokio-stream" version = "0.1.14" @@ -5758,6 +6212,7 @@ dependencies = [ "futures-core", "futures-sink", "pin-project-lite 0.2.10", + "slab", "tokio", "tracing", ] @@ -5933,6 +6388,19 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21ebb87a95ea13271332df069020513ab70bdb5637ca42d6e492dc3bbbad48de" +dependencies = [ + "once_cell", + "opentelemetry", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "tracing-subscriber" version = "0.3.17" @@ -6091,7 +6559,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "strum", + "strum 0.24.1", "strum_macros", "unsigned-varint", "url", @@ -6142,6 +6610,16 @@ version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22049a19f4a68748a168c0fc439f9516686aa045927ff767eca0a85101fb6e73" +[[package]] +name = "unicode-linebreak" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5faade31a542b8b35855fff6e8def199853b2da8da256da52f52f1316ee3137" +dependencies = [ + "hashbrown 0.12.3", + "regex", +] + [[package]] name = "unicode-normalization" version = "0.1.22" diff --git a/Cargo.toml b/Cargo.toml index a3f7821a..d080e4a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,11 +22,13 @@ async-trait = "0.1" byte-unit = { version = "4.0", default-features = false } enum-assoc = " 1.1" enum-as-inner = "0.6" +futures = "0.3" libipld = { version = "0.16", features = ["serde-codec"] } rand = "0.8" serde_ipld_dagcbor = "0.3" thiserror = "1.0" -tokio = { version = "1.29", features = ["fs", "io-util", "io-std", "macros", "rt", "rt-multi-thread", "signal", "tracing"] } +tokio = { version = "1.29", features = ["fs", "io-util", "io-std", "macros", "rt", + "rt-multi-thread", "signal", "sync", "tracing"] } tracing = "0.1" ucan = "0.4" diff --git a/deny.toml b/deny.toml index 6d3e47b7..2d1fff76 100644 --- a/deny.toml +++ b/deny.toml @@ -194,6 +194,7 @@ unknown-git = "deny" allow-registry = ["https://github.com/rust-lang/crates.io-index"] # List of URLs for allowed Git repositories allow-git = [ + "https://github.com/zhiburt/tabled.git", "https://github.com/ucan-wg/rs-ucan", "https://github.com/bytecodealliance/preview2-prototyping", "https://github.com/bytecodealliance/wasmtime", diff --git a/docker/Dockerfile b/docker/Dockerfile index bda5558c..8a2e7404 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -9,6 +9,9 @@ FROM --platform=$BUILDPLATFORM messense/rust-musl-cross:aarch64-musl as builder- ARG TARGETARCH FROM builder-$TARGETARCH as builder +ARG database_url="homestar.db" +ENV DATABASE_URL=${database_url} + RUN adduser --disabled-password --disabled-login --gecos "" --no-create-home homestar RUN cargo init @@ -22,7 +25,7 @@ RUN cargo init --lib homestar-core && \ RUN echo "fn main() {}" > ./homestar-runtime/src/main.rs # copy cargo.* -COPY Cargo.lock Cargo.toml .env diesel.toml ./ +COPY Cargo.lock Cargo.toml diesel.toml ./ COPY ../homestar-core/Cargo.toml ./homestar-core/ COPY 
../homestar-functions/Cargo.toml ./homestar-functions/ COPY ../homestar-wasm/Cargo.toml ./homestar-wasm/ @@ -31,9 +34,10 @@ COPY ../homestar-runtime/Cargo.toml ../homestar-runtime/migrations ./homestar-ru # cache depencies RUN mkdir .cargo RUN cargo vendor > .cargo/config -RUN cargo install diesel_cli --no-default-features --features "sqlite-bundled" -RUN diesel setup -RUN diesel migration run +RUN --mount=type=cache,target=$CARGO_HOME/registry \ + cargo install diesel_cli --no-default-features --features "sqlite-bundled" +RUN diesel setup --database-url $DATABASE_URL +RUN diesel migration run --migration-dir ./homestar-runtime/migrations RUN --mount=type=cache,id=cargo,target=$CARGO_HOME/registry \ --mount=type=cache,id=git,target=$CARGO_HOME/.git \ --mount=type=cache,id=target,target=./target,sharing=locked \ @@ -43,30 +47,47 @@ RUN --mount=type=cache,id=cargo,target=$CARGO_HOME/registry \ COPY . ./ # final build for release -RUN cargo build --workspace --target $CARGO_BUILD_TARGET --release -RUN musl-strip ./target/$CARGO_BUILD_TARGET/release/homestar-runtime -RUN mv ./target/$CARGO_BUILD_TARGET/release/homestar-runtime /usr/local/bin/homestar-runtime +RUN cargo build -p homestar-runtime --target $CARGO_BUILD_TARGET --release --offline RUN mv ./*.db /etc/ +RUN musl-strip ./target/$CARGO_BUILD_TARGET/release/homestar-runtime +RUN mv ./target/$CARGO_BUILD_TARGET/release/homestar-runtime /usr/local/bin/homestar-runtime RUN mv ./homestar-runtime/config /etc/config RUN mv $CARGO_HOME/bin/diesel /usr/local/bin/diesel +RUN chmod +x /usr/local/bin/diesel FROM scratch ARG backtrace=0 -ARG log_level=info +ARG log_levels=homestar_runtime=info,libp2p=info,tarpc=info ENV RUST_BACKTRACE=${backtrace} \ - RUST_LOG=${log_level} + RUST_LOG=${log_levels} -COPY ../homestar-runtime/migrations .env diesel.toml ./ -COPY --from=builder /usr/local/bin/homestar-runtime ./homestar -COPY --from=builder /usr/local/bin/diesel /usr/local/bin/diesel -COPY --from=builder /etc/*.db ./ -COPY --from=builder /etc/config ./config COPY --from=builder /etc/passwd /etc/passwd COPY --from=builder /etc/group /etc/group USER homestar:homestar +ENV HOME=/home/runner USER=homestar GROUP=homestar +WORKDIR /home/runner + +COPY --chown=homestar:homestar diesel.toml ./ +COPY --chown=homestar:homestar ../homestar-runtime/migrations ./migrations +COPY --chown=homestar:homestar --from=builder /usr/local/bin/homestar-runtime ./homestar +COPY --chown=homestar:homestar --from=builder /usr/local/bin/diesel /usr/local/bin/diesel +COPY --chown=homestar:homestar --from=builder /etc/*.db ./ +COPY --chown=homestar:homestar --from=builder /etc/config ./config + +ARG database_url="homestar.db" +ARG rpc_host="127.0.0.1" +ARG rpc_port=3030 +ARG ws_port=1337 + +ENV DATABASE_URL=${database_url} \ + HOMESTAR__NODE__NETWORK__RPC_HOST=${rpc_host} \ + HOMESTAR__NODE__NETWORK__RPC_PORT=${rpc_port} \ + HOMESTAR__NODE__NETWORK__WS_PORT=${ws_port} + +EXPOSE ${rpc_port} ${ws_port} -ENTRYPOINT ["./homestar"] +ENTRYPOINT ["./homestar", "start", "--db", "homestar.db"] diff --git a/flake.nix b/flake.nix index 0120ead9..4ce6725c 100644 --- a/flake.nix +++ b/flake.nix @@ -77,6 +77,10 @@ cargo doc --no-deps --document-private-items --open ''; + docAll = pkgs.writeScriptBin "doc-all" '' + cargo doc --document-private-items --open + ''; + compileWasm = pkgs.writeScriptBin "compile-wasm" '' cargo build -p homestar-functions --target wasm32-unknown-unknown --release ''; @@ -160,6 +164,7 @@ db dbReset doc + docAll compileWasm (builtins.map (arch: dockerBuild arch) ["amd64" 
"arm64"]) (builtins.map (cmd: xFunc cmd) ["build" "check" "run" "clippy"]) diff --git a/homestar-core/Cargo.toml b/homestar-core/Cargo.toml index 7a4e9e21..756f7088 100644 --- a/homestar-core/Cargo.toml +++ b/homestar-core/Cargo.toml @@ -22,14 +22,17 @@ doctest = true # return to version.workspace = true after the following issue is fixed: # https://github.com/DevinR528/cargo-sort/issues/47 anyhow = { workspace = true } +async-recursion = "1.0" byte-unit = { workspace = true } diesel = { version = "2.0", features = ["sqlite"] } enum-as-inner = { workspace = true } enum-assoc = { workspace = true } +futures = { workspace = true } generic-array = { version = "0.14", features = ["serde"] } indexmap = "2.0" libipld = { workspace = true } libsqlite3-sys = { version = "0.26", features = ["bundled"] } +once_cell = { version = "1.18", optional = true } proptest = { version = "1.2", optional = true } rand = { workspace = true } serde = { version = "1.0", features = ["derive"] } @@ -47,7 +50,7 @@ json = "0.12" [features] default = [] -test_utils = ["proptest"] +test_utils = ["once_cell", "proptest"] [package.metadata.docs.rs] all-features = true diff --git a/homestar-core/src/test_utils/mod.rs b/homestar-core/src/test_utils/mod.rs index cb6162ab..479873bc 100644 --- a/homestar-core/src/test_utils/mod.rs +++ b/homestar-core/src/test_utils/mod.rs @@ -2,7 +2,8 @@ #[cfg(feature = "test_utils")] pub mod cid; -/// Random value generator for sampling data. +#[cfg(feature = "test_utils")] +pub mod ports; #[cfg(feature = "test_utils")] mod rvg; #[cfg(feature = "test_utils")] diff --git a/homestar-core/src/test_utils/ports.rs b/homestar-core/src/test_utils/ports.rs new file mode 100644 index 00000000..362c2868 --- /dev/null +++ b/homestar-core/src/test_utils/ports.rs @@ -0,0 +1,14 @@ +//! Monotonic ports. + +use once_cell::sync::OnceCell; +use rand::Rng; +use std::sync::atomic::{AtomicUsize, Ordering}; + +static PORTS: OnceCell = OnceCell::new(); + +/// Return a unique port(in runtime) for test +pub fn get_port() -> usize { + PORTS + .get_or_init(|| AtomicUsize::new(rand::thread_rng().gen_range(3000..3800))) + .fetch_add(1, Ordering::Relaxed) +} diff --git a/homestar-core/src/test_utils/rvg.rs b/homestar-core/src/test_utils/rvg.rs index fd94aebb..eb40b7c5 100644 --- a/homestar-core/src/test_utils/rvg.rs +++ b/homestar-core/src/test_utils/rvg.rs @@ -1,3 +1,4 @@ +/// Random value generator for sampling data. 
use proptest::{ collection::vec, strategy::{Strategy, ValueTree}, diff --git a/homestar-core/src/workflow/input.rs b/homestar-core/src/workflow/input.rs index 4fc83d55..2e607453 100644 --- a/homestar-core/src/workflow/input.rs +++ b/homestar-core/src/workflow/input.rs @@ -11,9 +11,11 @@ use crate::workflow::{ pointer::{Await, AwaitResult, ERR_BRANCH, OK_BRANCH, PTR_BRANCH}, InstructionResult, Pointer, }; +use async_recursion::async_recursion; +use futures::{future, future::BoxFuture}; use libipld::{serde::from_ipld, Cid, Ipld}; use serde::{Deserialize, Serialize}; -use std::{collections::btree_map::BTreeMap, result::Result}; +use std::{collections::btree_map::BTreeMap, result::Result, sync::Arc}; mod parse; pub use parse::*; @@ -65,13 +67,16 @@ where /// [awaited promises]: Await /// [inputs]: Input /// [resolving Ipld links]: resolve_links - pub fn resolve(self, lookup_fn: F) -> Result + pub async fn resolve<'a, F>(self, lookup_fn: F) -> Result where - F: FnMut(Cid) -> Result, ResolveError> + Clone, + F: Fn(Cid) -> BoxFuture<'a, Result, ResolveError>> + + Clone + + Send + + Sync, Ipld: From, { let inputs = resolve_args(self.0, lookup_fn); - Ok(Args(inputs)) + Ok(Args(inputs.await)) } } @@ -144,26 +149,29 @@ impl Input { /// [awaited promises]: Await /// [inputs]: Input /// [resolving Ipld links]: resolve_links - pub fn resolve(self, mut lookup_fn: F) -> Input + pub async fn resolve<'a, F>(self, lookup_fn: F) -> Input where - F: FnMut(Cid) -> Result, ResolveError> + Clone, + F: Fn(Cid) -> BoxFuture<'a, Result, ResolveError>> + + Clone + + Send + + Sync, Ipld: From, { match self { Input::Ipld(ipld) => { if let Ok(await_promise) = Await::try_from(&ipld) { - if let Ok(func_ret) = lookup_fn(await_promise.instruction_cid()) { + if let Ok(func_ret) = lookup_fn(await_promise.instruction_cid()).await { Input::Arg(func_ret) } else { Input::Deferred(await_promise) } } else { - Input::Ipld(resolve_links(ipld, lookup_fn)) + Input::Ipld(resolve_links(ipld, lookup_fn.into()).await) } } Input::Arg(ref _arg) => self, Input::Deferred(await_promise) => { - if let Ok(func_ret) = lookup_fn(await_promise.instruction_cid()) { + if let Ok(func_ret) = lookup_fn(await_promise.instruction_cid()).await { Input::Arg(func_ret) } else { Input::Deferred(await_promise) @@ -229,65 +237,81 @@ where } } -fn resolve_args(args: Vec>, lookup_fn: F) -> Vec> +async fn resolve_args<'a, T, F>(args: Vec>, lookup_fn: F) -> Vec> where - F: FnMut(Cid) -> Result, ResolveError> + Clone, + F: Fn(Cid) -> BoxFuture<'a, Result, ResolveError>> + Clone + Send + Sync, Ipld: From, { let args = args.into_iter().map(|v| v.resolve(lookup_fn.clone())); - args.collect() + future::join_all(args).await.into_iter().collect() } /// Resolve [awaited promises] for *only* [Ipld] data, given a lookup function. 
/// /// [awaited promises]: Await -pub fn resolve_links(ipld: Ipld, mut lookup_fn: F) -> Ipld +#[async_recursion] +pub async fn resolve_links<'a, T, F>(ipld: Ipld, lookup_fn: Arc) -> Ipld where - F: FnMut(Cid) -> Result, ResolveError> + Clone, + F: Fn(Cid) -> BoxFuture<'a, Result, ResolveError>> + Clone + Sync + Send, Ipld: From, { match ipld { Ipld::Map(m) => { - let btree = m.into_iter().map(|(k, v)| match v { - Ipld::Link(cid) => { - if let Ok(func_ret) = lookup_fn(cid) { - if k.eq(PTR_BRANCH) { - (k, func_ret.into()) + let futures = m.into_iter().map(|(k, v)| async { + match v { + Ipld::Link(cid) => { + let mut f = Arc::clone(&lookup_fn); + if let Ok(func_ret) = Arc::make_mut(&mut f)(cid).await { + if k.eq(PTR_BRANCH) { + (k, func_ret.into()) + } else { + (k, func_ret.into_inner().into()) + } } else { - (k, func_ret.into_inner().into()) + (k, v) } - } else { - (k, v) } + Ipld::Map(ref m) => { + let resolved = resolve_links(Ipld::Map(m.clone()), lookup_fn.clone()).await; + (k, resolved) + } + Ipld::List(ref l) => { + let resolved = + resolve_links(Ipld::List(l.clone()), lookup_fn.clone()).await; + (k, resolved) + } + _ => (k, v), } - Ipld::Map(ref m) => { - let resolved = resolve_links(Ipld::Map(m.clone()), lookup_fn.clone()); - (k, resolved) - } - Ipld::List(ref l) => { - let resolved = resolve_links(Ipld::List(l.clone()), lookup_fn.clone()); - (k, resolved) - } - _ => (k, v), }); - - Ipld::Map(btree.collect::>()) + let resolved_results = future::join_all(futures).await; + Ipld::Map( + resolved_results + .into_iter() + .collect::>(), + ) } Ipld::List(l) => { - let list = l.into_iter().map(|v| match v { - Ipld::Link(cid) => { - if let Ok(func_ret) = lookup_fn(cid) { - func_ret.into_inner().into() - } else { - v + let futures = l.into_iter().map(|v| async { + match v { + Ipld::Link(cid) => { + let mut f = Arc::clone(&lookup_fn); + if let Ok(func_ret) = Arc::make_mut(&mut f)(cid).await { + func_ret.into_inner().into() + } else { + v + } } + Ipld::Map(ref m) => { + resolve_links(Ipld::Map(m.clone()), lookup_fn.clone()).await + } + Ipld::List(ref l) => { + resolve_links(Ipld::List(l.clone()), lookup_fn.clone()).await + } + _ => v, } - Ipld::Map(ref m) => resolve_links(Ipld::Map(m.clone()), lookup_fn.clone()), - Ipld::List(ref l) => resolve_links(Ipld::List(l.clone()), lookup_fn.clone()), - _ => v, }); - - Ipld::List(list.collect()) + let resolved_results = future::join_all(futures).await; + Ipld::List(resolved_results) } _ => ipld, } diff --git a/homestar-runtime/Cargo.toml b/homestar-runtime/Cargo.toml index 64563a11..08da9f87 100644 --- a/homestar-runtime/Cargo.toml +++ b/homestar-runtime/Cargo.toml @@ -26,7 +26,6 @@ doc = false bench = false [dependencies] -ansi_term = { version = "0.12", optional = true, default-features = false } # return to version.workspace = true after the following issue is fixed: # https://github.com/DevinR528/cargo-sort/issues/47 anyhow = { workspace = true } @@ -34,19 +33,21 @@ async-trait = "0.1" axum = { version = "0.6", features = ["ws", "headers"], optional = true } bincode = { version = "2.0.0-rc.3", features = ["serde"] } byte-unit = { workspace = true } -clap = { version = "4.3", features = ["derive", "color", "help"] } +clap = { version = "4.3", features = ["derive", "color", "help", "env"] } concat-in-place = "1.1" config = "0.13" console-subscriber = { version = "0.1", default-features = false, features = [ "parking_lot" ], optional = true } crossbeam = "0.8" +daemonize = "0.5" dagga = "0.2" dashmap = "5.5" diesel = { version = "2.1", features = 
["sqlite", "r2d2", "returning_clauses_for_sqlite_3_35"] } +diesel_migrations = "2.1" dotenvy = "0.15" enum-assoc = { workspace = true } evmap = "10.0" fnv = "1.0" -futures = "0.3" +futures = { workspace = true } headers = "0.3" homestar-core = { version = "0.1", path = "../homestar-core" } homestar-wasm = { version = "0.1", path = "../homestar-wasm" } @@ -60,6 +61,7 @@ libp2p = { version = "0.52", default-features = false, features = ["kad", "reque "identify", "ed25519", "secp256k1", "mdns", "gossipsub", "request-response", "tokio", "dns", "tcp", "noise", "cbor", "yamux", "websocket"] } libsqlite3-sys = { version = "0.26", features = ["bundled"] } +miette = { version = "5.10", features = ["fancy"] } openssl = { version = "0.10", features = ["vendored"] } proptest = { version = "1.2", optional = true } rand = "0.8" @@ -69,8 +71,13 @@ semver = "1.0" serde = { version = "1.0", features = ["derive"] } serde_ipld_dagcbor = { workspace = true } serde_with = { version = "3.1", features = ["base64"] } +stream-cancel = "0.8" +strum = { version = "0.25", features = ["derive"] } +tabled = { git = "https://github.com/zhiburt/tabled.git" } +tarpc = { version = "0.33", features = ["serde-transport", "serde-transport-bincode", "tcp"] } thiserror = { workspace = true } tokio = { workspace = true } +tokio-util = { version = "0.7", features = ["time"] } tracing = { workspace = true } tracing-appender = "0.2" tracing-logfmt = "0.3" @@ -78,22 +85,30 @@ tracing-subscriber = { version = "0.3", features = ["env-filter", "parking_lot", tryhard = "0.5" url = "2.4" +[target.'cfg(not(windows))'.dependencies] +daemonize = "0.5" + [dev-dependencies] +assert_cmd = "2.0" criterion = "0.5" -diesel_migrations = "2.1" homestar-core = { version = "0.1", path = "../homestar-core", features = [ "test_utils" ] } json = "0.12" nix = "0.26" +once_cell = "1.18" +predicates = "3.0" rand = { workspace = true } +retry = "2.0" +serial_test = "2.0" +sysinfo = "0.29" tokio-tungstenite = "0.19" +wait-timeout = "0.2" [features] default = ["ipfs", "websocket-server"] -ansi-logs = ["ansi_term"] -console = ["console-subscriber"] -ipfs = ["ipfs-api", "ipfs-api-backend-hyper"] -test_utils = ["proptest"] -websocket-server = ["axum"] +console = ["dep:console-subscriber"] +ipfs = ["dep:ipfs-api", "dep:ipfs-api-backend-hyper"] +test_utils = ["dep:proptest"] +websocket-server = ["dep:axum"] [package.metadata.docs.rs] all-features = true diff --git a/homestar-runtime/build.rs b/homestar-runtime/build.rs new file mode 100644 index 00000000..3a8149ef --- /dev/null +++ b/homestar-runtime/build.rs @@ -0,0 +1,3 @@ +fn main() { + println!("cargo:rerun-if-changed=migrations"); +} diff --git a/homestar-runtime/src/cli.rs b/homestar-runtime/src/cli.rs index 2fcca268..ae92e313 100644 --- a/homestar-runtime/src/cli.rs +++ b/homestar-runtime/src/cli.rs @@ -1,7 +1,19 @@ //! CLI commands/arguments. +use crate::network::rpc::Client; +use anyhow::anyhow; use clap::Parser; +use std::{ + net::{IpAddr, SocketAddr}, + path::PathBuf, + str::FromStr, +}; +mod error; +mod show; +pub use error::Error; + +const TMP_DIR: &str = "/tmp"; const HELP_TEMPLATE: &str = "{about} {version} USAGE: @@ -14,15 +26,6 @@ USAGE: #[derive(Parser, Debug)] #[command(author, version, about, long_about = None, help_template = HELP_TEMPLATE)] pub struct Cli { - /// Optional runtime configuration file, otherwise use defaults. - #[arg( - short = 'c', - long = "config", - value_name = "CONFIG", - help = "runtime configuration file" - )] - pub runtime_config: Option, - /// Homestar [Command]. 
#[clap(subcommand)]
     pub command: Command,
@@ -31,6 +34,125 @@ pub struct Cli {
 /// CLI Argument types.
 #[derive(Debug, Parser)]
 pub enum Command {
-    /// Start the Runtime with the Homestar runner.
-    Start,
+    /// Start the Homestar runtime.
+    Start {
+        /// Database url, defaults to sqlite://homestar.db.
+        #[arg(
+            long = "db",
+            value_name = "DB",
+            env = "DATABASE_URL",
+            help = "SQLite database url"
+        )]
+        database_url: Option<String>,
+        /// Optional runtime configuration file, otherwise use defaults.
+        #[arg(
+            short = 'c',
+            long = "config",
+            value_name = "CONFIG",
+            help = "runtime configuration file"
+        )]
+        runtime_config: Option<PathBuf>,
+        /// Daemonize the runtime, false by default.
+        #[arg(
+            short = 'd',
+            long = "daemonize",
+            default_value_t = false,
+            help = "daemonize the runtime"
+        )]
+        daemonize: bool,
+        /// Directory to place daemon files, defaults to /tmp.
+        #[arg(
+            long = "daemon_dir",
+            default_value = TMP_DIR,
+            value_hint = clap::ValueHint::DirPath,
+            value_name = "DIR",
+            help = "directory to place daemon files"
+        )]
+        daemon_dir: PathBuf,
+    },
+    /// Stop the Homestar runtime.
+    Stop {
+        #[arg(
+            long = "host",
+            default_value_t = String::from("::1"),
+            value_hint = clap::ValueHint::Hostname
+        )]
+        /// RPC Homestar runtime host to stop.
+        host: String,
+        #[arg(short = 'p', long = "port", default_value_t = 3030)]
+        /// RPC Homestar runtime port to stop.
+        port: u16,
+    },
+    /// Ping the Homestar runtime.
+    Ping {
+        #[arg(
+            long = "host",
+            default_value_t = String::from("::1"),
+            value_hint = clap::ValueHint::Hostname
+        )]
+        /// RPC Homestar runtime host to ping.
+        host: String,
+        #[arg(short = 'p', long = "port", default_value_t = 3030)]
+        /// RPC Homestar runtime port to ping.
+        port: u16,
+    },
+    /// Run a workflow, given a workflow json file.
+    Run {
+        /// Path to workflow json file.
+        #[arg(
+            short='w',
+            long = "workflow",
+            value_hint = clap::ValueHint::FilePath,
+            value_name = "FILE",
+            help = "path to workflow file"
+        )]
+        workflow: PathBuf,
+    },
+}
+
+impl Command {
+    fn name(&self) -> &'static str {
+        match self {
+            Command::Start { .. } => "start",
+            Command::Stop { .. } => "stop",
+            Command::Ping { .. } => "ping",
+            Command::Run { .. } => "run",
+        }
+    }
+
+    /// Handle CLI commands related to [Client] RPC calls.
+    #[allow(clippy::unnecessary_wraps)]
+    pub fn handle_rpc_command(&self) -> Result<(), Error> {
+        // Spin up a new tokio runtime on the current thread.
+        let rt = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?;
+
+        match self {
+            Command::Ping { host, port } => {
+                let host = IpAddr::from_str(host).map_err(anyhow::Error::new)?;
+                let addr = SocketAddr::new(host, *port);
+                let response = rt.block_on(async {
+                    let client = Client::new(addr).await?;
+                    let response = client.ping().await?;
+                    Ok::<String, Error>(response)
+                })?;
+
+                show::Ping::table(addr, response).echo()?;
+                Ok(())
+            }
+            Command::Stop { host, port } => {
+                let host = IpAddr::from_str(host).map_err(anyhow::Error::new)?;
+                let addr = SocketAddr::new(host, *port);
+                rt.block_on(async {
+                    let client = Client::new(addr).await?;
+                    let _ = client.stop().await?;
+                    Ok::<(), Error>(())
+                })?;
+
+                Ok(())
+            }
+            _ => Err(anyhow!("Invalid command {}", self.name()).into()),
+        }
+    }
+}
diff --git a/homestar-runtime/src/cli/error.rs b/homestar-runtime/src/cli/error.rs
new file mode 100644
index 00000000..c9e154de
--- /dev/null
+++ b/homestar-runtime/src/cli/error.rs
@@ -0,0 +1,37 @@
+//! Error type for CLI / CLI-interaction.
+ +use miette::{miette, Diagnostic}; +use std::io; +use tarpc::client::RpcError; + +/// Error types for CLI / CLI-interaction. +#[derive(thiserror::Error, Debug, Diagnostic)] +pub enum Error { + /// Generic CLI error. + #[error("{error_message}")] + CliError { + /// Error message. + error_message: String, + }, + /// Propagated RPC error. + #[error(transparent)] + RpcError(#[from] RpcError), + /// Propagated IO error. + #[error("error writing data to console: {0}")] + WriteError(#[from] io::Error), +} + +impl Error { + /// Create a new [Error]. + pub fn new(err: miette::ErrReport) -> Self { + Error::CliError { + error_message: err.to_string(), + } + } +} + +impl From for Error { + fn from(e: anyhow::Error) -> Self { + Error::new(miette!(e.to_string())) + } +} diff --git a/homestar-runtime/src/cli/show.rs b/homestar-runtime/src/cli/show.rs new file mode 100644 index 00000000..89aafa90 --- /dev/null +++ b/homestar-runtime/src/cli/show.rs @@ -0,0 +1,64 @@ +use std::{ + io::{self, Write}, + net::SocketAddr, +}; +use tabled::{ + settings::{ + object::Rows, + style::{BorderColor, BorderSpanCorrection}, + themes::Colorization, + Alignment, Color, Modify, Panel, Style, + }, + Table, Tabled, +}; + +const TABLE_TITLE: &str = "homestar(╯°□°)╯"; + +/// Output response wrapper. +pub(crate) struct Output(String); + +impl Output { + /// Print ouput response to console via [io::stdout]. + pub(crate) fn echo(&self) -> Result<(), io::Error> { + let stdout = io::stdout(); + let mut handle = io::BufWriter::new(stdout); + writeln!(handle, "{}", self.0) + } +} + +/// Ping response for display. +#[derive(Tabled)] +pub(crate) struct Ping { + address: SocketAddr, + response: String, +} + +trait ApplyStyle { + fn default(&mut self) -> Output; +} + +impl ApplyStyle for Table { + fn default(&mut self) -> Output { + let table = self + .with(Style::modern()) + .with(Panel::header(TABLE_TITLE)) + .with(Modify::new(Rows::first()).with(Alignment::left())) + .with(Colorization::exact([Color::FG_WHITE], Rows::first())) + .with(Colorization::exact( + [Color::FG_BRIGHT_GREEN], + Rows::single(1), + )) + .with(BorderColor::filled(Color::FG_WHITE)) + .with(BorderSpanCorrection) + .to_string(); + + Output(table) + } +} + +impl Ping { + /// Display a singleton table of a `ping` response. + pub(crate) fn table(address: SocketAddr, response: String) -> Output { + Table::new(vec![Self { address, response }]).default() + } +} diff --git a/homestar-runtime/src/daemon.rs b/homestar-runtime/src/daemon.rs new file mode 100644 index 00000000..fa40400d --- /dev/null +++ b/homestar-runtime/src/daemon.rs @@ -0,0 +1,23 @@ +//! Daemonize the Homestar runtime. + +use anyhow::Result; +use std::path::PathBuf; + +const PID_FILE: &str = "homestar.pid"; + +/// Start the Homestar runtime as a daemon. +#[cfg(not(windows))] +pub fn start(dir: PathBuf) -> Result<()> { + daemonize::Daemonize::new() + .working_directory(dir.canonicalize()?) + .pid_file(PID_FILE) + .start()?; + + Ok(()) +} + +/// Start the Homestar runtime as a daemon. 
+#[cfg(windows)] +pub fn start(dir: PathBuf) -> Result<()> { + Err(anyhow::anyhow!("Daemonizing is not supported on Windows")) +} diff --git a/homestar-runtime/src/db.rs b/homestar-runtime/src/db.rs index 3fe2a4b9..40a117a9 100644 --- a/homestar-runtime/src/db.rs +++ b/homestar-runtime/src/db.rs @@ -13,14 +13,18 @@ use byte_unit::{AdjustedByte, Byte, ByteUnit}; use diesel::{ prelude::*, r2d2::{self, CustomizeConnection, ManageConnection}, - BelongingToDsl, RunQueryDsl, SqliteConnection, + BelongingToDsl, Connection as SingleConnection, RunQueryDsl, SqliteConnection, }; +use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; use dotenvy::dotenv; use homestar_core::workflow::Pointer; use libipld::Cid; use std::{env, sync::Arc, time::Duration}; use tokio::fs; +use tracing::info; +const ENV: &str = "DATABASE_URL"; +const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations/"); const PRAGMAS: &str = " PRAGMA journal_mode = WAL; -- better write-concurrency PRAGMA synchronous = NORMAL; -- fsync only in critical moments @@ -51,23 +55,41 @@ impl Clone for Db { } impl Db { - fn url() -> String { - dotenv().ok(); - env::var("DATABASE_URL").expect("DATABASE_URL must be set") - } - /// Get size of SQlite file in megabytes (via async call). pub async fn size() -> Result { - let len = fs::metadata(Db::url()).await?.len(); + let url = env::var(ENV)?; + let len = fs::metadata(url).await?.len(); let byte = Byte::from_bytes(len); let byte_unit = byte.get_adjusted_unit(ByteUnit::MB); Ok(byte_unit) } + + /// Get database url. + /// + /// Contains a minimal side-effect to set the env if not already set. + pub fn set_url(database_url: Option) -> Option { + database_url.map_or_else( + || dotenv().ok().and_then(|_| env::var(ENV).ok()), + |url| { + env::set_var(ENV, &url); + Some(url) + }, + ) + } } /// Database trait for working with different Sqlite connection pool and /// connection configurations. pub trait Database: Send + Sync + Clone { + /// Test a Sqlite connection to the database and run pending migrations. + fn setup(url: &str) -> Result { + info!("Using database at {:?}", url); + let mut connection = SqliteConnection::establish(url)?; + let _ = connection.run_pending_migrations(MIGRATIONS); + + Ok(connection) + } + /// Establish a pooled connection to Sqlite database. fn setup_connection_pool(settings: &settings::Node) -> Result where @@ -189,7 +211,9 @@ pub trait Database: Send + Sync + Clone { impl Database for Db { fn setup_connection_pool(settings: &settings::Node) -> Result { - let manager = r2d2::ConnectionManager::::new(Db::url()); + let database_url = env::var(ENV)?; + Self::setup(&database_url)?; + let manager = r2d2::ConnectionManager::::new(database_url); // setup PRAGMAs manager diff --git a/homestar-runtime/src/event_handler.rs b/homestar-runtime/src/event_handler.rs index 1b86ba06..543a9356 100644 --- a/homestar-runtime/src/event_handler.rs +++ b/homestar-runtime/src/event_handler.rs @@ -15,7 +15,7 @@ use std::{sync::Arc, time::Duration}; use swarm_event::ResponseEvent; use tokio::{select, sync::mpsc}; -pub(crate) mod channel; +pub mod channel; pub(crate) mod error; pub(crate) mod event; pub(crate) mod swarm_event; @@ -37,8 +37,7 @@ where } /// Event loop handler for [libp2p] network events and commands. 
-#[allow(dead_code)]
-#[allow(missing_debug_implementations)]
+#[allow(missing_debug_implementations, dead_code)]
 pub(crate) struct EventHandler {
     receipt_quorum: usize,
     workflow_quorum: usize,
diff --git a/homestar-runtime/src/event_handler/channel.rs b/homestar-runtime/src/event_handler/channel.rs
index 53079d92..43bc80d4 100644
--- a/homestar-runtime/src/event_handler/channel.rs
+++ b/homestar-runtime/src/event_handler/channel.rs
@@ -4,31 +4,31 @@ use crossbeam::channel;
 /// Sender for a bounded [crossbeam::channel].
-pub(crate) type BoundedChannelSender<T> = channel::Sender<T>;
+pub type BoundedChannelSender<T> = channel::Sender<T>;
 /// Receiver for a bounded [crossbeam::channel].
-#[allow(dead_code)]
-pub(crate) type BoundedChannelReceiver<T> = channel::Receiver<T>;
+pub type BoundedChannelReceiver<T> = channel::Receiver<T>;
 /// A bounded [crossbeam::channel] with a sender and receiver.
+#[allow(dead_code)]
 #[derive(Debug, Clone)]
-pub(crate) struct BoundedChannel<T> {
+pub struct BoundedChannel<T> {
     /// Sender for the channel.
-    pub(crate) tx: channel::Sender<T>,
+    tx: channel::Sender<T>,
     /// Receiver for the channel.
-    pub(crate) rx: channel::Receiver<T>,
+    rx: channel::Receiver<T>,
 }
 impl<T> BoundedChannel<T> {
     /// Create a new [BoundedChannel] with a given capacity.
-    pub(crate) fn new(capacity: usize) -> Self {
+    pub fn with(capacity: usize) -> (BoundedChannelSender<T>, BoundedChannelReceiver<T>) {
         let (tx, rx) = channel::bounded(capacity);
-        Self { tx, rx }
+        (tx, rx)
     }
     /// Create a oneshot (1) [BoundedChannel].
-    pub(crate) fn oneshot() -> Self {
+    pub fn oneshot() -> (BoundedChannelSender<T>, BoundedChannelReceiver<T>) {
         let (tx, rx) = channel::bounded(1);
-        Self { tx, rx }
+        (tx, rx)
     }
 }
diff --git a/homestar-runtime/src/event_handler/error.rs b/homestar-runtime/src/event_handler/error.rs
index 28d1ad68..6f55f29c 100644
--- a/homestar-runtime/src/event_handler/error.rs
+++ b/homestar-runtime/src/event_handler/error.rs
@@ -26,11 +26,11 @@ pub(crate) enum RequestResponseError {
 impl RequestResponseError {
     /// Encode the error into a byte vector via [bincode].
     pub(crate) fn encode(&self) -> Result<Vec<u8>> {
-        bincode::encode_to_vec(self, bincode::config::standard()).map_err(anyhow::Error::msg)
+        bincode::encode_to_vec(self, bincode::config::standard()).map_err(anyhow::Error::new)
     }
     /// Decode the error from a byte vector via [bincode].
     pub(crate) fn decode(bytes: &[u8]) -> Result<(Self, usize)> {
-        bincode::decode_from_slice(bytes, bincode::config::standard()).map_err(anyhow::Error::msg)
+        bincode::decode_from_slice(bytes, bincode::config::standard()).map_err(anyhow::Error::new)
     }
 }
diff --git a/homestar-runtime/src/event_handler/event.rs b/homestar-runtime/src/event_handler/event.rs
index cc8b302e..bd60d617 100644
--- a/homestar-runtime/src/event_handler/event.rs
+++ b/homestar-runtime/src/event_handler/event.rs
@@ -94,6 +94,7 @@ impl Event {
                 );
             }
             Event::Shutdown(tx) => {
+                info!("event_handler server shutting down");
                 event_handler.shutdown().await;
                 let _ = tx.send(());
             }
@@ -178,7 +179,7 @@ impl Captured {
                     Record::new(instruction_bytes, receipt_bytes.to_vec()),
                     receipt_quorum,
                 )
-                .map_err(anyhow::Error::msg)?;
+                .map_err(anyhow::Error::new)?;
             // Store workflow_receipt join information.
let _ = Db::store_workflow_receipt(self.workflow.cid, receipt_cid, conn); @@ -192,7 +193,7 @@ impl Captured { .behaviour_mut() .kademlia .start_providing(Key::new(&workflow_cid_bytes)) - .map_err(anyhow::Error::msg)?; + .map_err(anyhow::Error::new)?; let key = RequestResponseKey::new(self.workflow.cid.to_string(), CapsuleTag::Workflow); @@ -208,7 +209,7 @@ impl Captured { Record::new(workflow_cid_bytes, workflow_bytes), workflow_quorum, ) - .map_err(anyhow::Error::msg)?; + .map_err(anyhow::Error::new)?; // TODO: Handle Workflow Complete / Num of Tasks finished. diff --git a/homestar-runtime/src/event_handler/swarm_event.rs b/homestar-runtime/src/event_handler/swarm_event.rs index 5a56f48d..0b69d64d 100644 --- a/homestar-runtime/src/event_handler/swarm_event.rs +++ b/homestar-runtime/src/event_handler/swarm_event.rs @@ -181,11 +181,11 @@ async fn handle_swarm_event( }, sender, )) => { - let channel = BoundedChannel::oneshot(); + let (tx, rx) = BoundedChannel::oneshot(); if let Ok(cid) = Cid::try_from(cid_str.as_str()) { if let Err(err) = event_handler.sender().try_send(Event::GetProviders( - QueryRecord::with(cid, CapsuleTag::Workflow, channel.tx), + QueryRecord::with(cid, CapsuleTag::Workflow, tx), )) { error!(err=?err, "error opening channel to get providers"); @@ -193,7 +193,7 @@ async fn handle_swarm_event( return; } - match channel.rx.recv_deadline( + match rx.recv_deadline( Instant::now() + event_handler.p2p_provider_timeout, ) { Ok(ResponseEvent::Providers(Ok(providers))) => { @@ -202,10 +202,10 @@ async fn handle_swarm_event( cid_str.to_string(), CapsuleTag::Workflow, ); - let channel = BoundedChannel::oneshot(); + let (tx, _rx) = BoundedChannel::oneshot(); if let Err(err) = event_handler.sender().try_send( Event::OutboundRequest(PeerRequest::with( - peer, request, channel.tx, + peer, request, tx, )), ) { error!(err=?err, "error sending outbound request"); diff --git a/homestar-runtime/src/lib.rs b/homestar-runtime/src/lib.rs index a3d2e78e..42938158 100644 --- a/homestar-runtime/src/lib.rs +++ b/homestar-runtime/src/lib.rs @@ -18,10 +18,11 @@ //! [homestar-wasm]: homestar_wasm pub mod cli; +pub mod daemon; pub mod db; mod event_handler; -pub mod logger; -mod network; +mod logger; +pub mod network; mod receipt; pub mod runner; mod scheduler; @@ -31,8 +32,8 @@ mod worker; pub mod workflow; pub use db::Db; -#[cfg(feature = "websocket-server")] -pub use network::ws; +pub use event_handler::channel; +pub use logger::*; pub use receipt::{Receipt, RECEIPT_TAG, VERSION_KEY}; pub use runner::Runner; pub use settings::Settings; diff --git a/homestar-runtime/src/logger.rs b/homestar-runtime/src/logger.rs index 0580ad43..03742988 100644 --- a/homestar-runtime/src/logger.rs +++ b/homestar-runtime/src/logger.rs @@ -1,34 +1,48 @@ //! Logger initialization. -use anyhow::Result; +use std::{io, path::PathBuf}; +use tracing_appender::non_blocking::{NonBlocking, WorkerGuard}; use tracing_subscriber::{layer::SubscriberExt as _, prelude::*, EnvFilter}; +const LOG_FILE: &str = "homestar.log"; const DIRECTIVE_EXPECT: &str = "Invalid tracing directive"; -/// Initialize a [tracing_subscriber::Registry] with a [logfmt] layer. -/// -/// [logfmt]: -pub fn init(writer: tracing_appender::non_blocking::NonBlocking) -> Result<()> { +/// Logger interface. +#[derive(Debug)] +pub struct Logger; +/// File-logger interface. +#[derive(Debug)] +pub struct FileLogger; + +impl Logger { + /// Initialize a [tracing_subscriber::Registry] with a [logfmt] layer and + /// write to [io::stdout]. 
+ /// + /// [logfmt]: + pub fn init() -> WorkerGuard { + let (writer, guard) = tracing_appender::non_blocking(io::stdout()); + init(writer, guard) + } +} + +impl FileLogger { + /// Initialize a [tracing_subscriber::Registry] with a [logfmt] layer and + /// write to file. + /// + /// [logfmt]: + pub fn init(dir: PathBuf) -> WorkerGuard { + let file_appender = tracing_appender::rolling::daily(dir, LOG_FILE); + let (writer, guard) = tracing_appender::non_blocking(file_appender); + init(writer, guard) + } +} + +fn init(writer: NonBlocking, guard: WorkerGuard) -> WorkerGuard { let format_layer = tracing_subscriber::fmt::layer() .event_format(tracing_logfmt::EventsFormatter::default()) .fmt_fields(tracing_logfmt::FieldsFormatter::default()) .with_writer(writer); - #[cfg(all(feature = "console", tokio_unstable))] - let filter = EnvFilter::try_from_default_env() - .unwrap_or_else(|_| { - EnvFilter::new("info") - .add_directive("libp2p=info".parse().expect(DIRECTIVE_EXPECT)) - .add_directive( - "libp2p_gossipsub::behaviour=debug" - .parse() - .expect(DIRECTIVE_EXPECT), - ) - }) - .add_directive("tokio=trace".parse().expect(DIRECTIVE_EXPECT)) - .add_directive("runtime=trace".parse().expect(DIRECTIVE_EXPECT)); - - #[cfg(any(not(feature = "console"), not(tokio_unstable)))] let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| { EnvFilter::new("info") .add_directive("libp2p=info".parse().expect(DIRECTIVE_EXPECT)) @@ -37,8 +51,15 @@ pub fn init(writer: tracing_appender::non_blocking::NonBlocking) -> Result<()> { .parse() .expect(DIRECTIVE_EXPECT), ) + .add_directive("tarpc=info".parse().expect(DIRECTIVE_EXPECT)) + .add_directive("tower_http=info".parse().expect(DIRECTIVE_EXPECT)) }); + #[cfg(all(feature = "console", tokio_unstable))] + let filter = filter + .add_directive("tokio=trace".parse().expect(DIRECTIVE_EXPECT)) + .add_directive("runtime=trace".parse().expect(DIRECTIVE_EXPECT)); + let registry = tracing_subscriber::Registry::default() .with(filter) .with(format_layer); @@ -57,5 +78,5 @@ pub fn init(writer: tracing_appender::non_blocking::NonBlocking) -> Result<()> { registry.init(); } - Ok(()) + guard } diff --git a/homestar-runtime/src/main.rs b/homestar-runtime/src/main.rs index f137cbd3..599c96ce 100644 --- a/homestar-runtime/src/main.rs +++ b/homestar-runtime/src/main.rs @@ -1,89 +1,50 @@ -use anyhow::Result; use clap::Parser; -#[cfg(feature = "websocket-server")] -use homestar_runtime::ws; use homestar_runtime::{ cli::{Cli, Command}, + daemon, db::Database, - logger, Db, Runner, Settings, + Db, FileLogger, Logger, Runner, Settings, }; -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; -use tokio::{runtime, select, time}; +use miette::Result; +use std::sync::Arc; use tracing::info; -fn main() { - let (stdout_writer, _stdout_guard) = tracing_appender::non_blocking(std::io::stdout()); - logger::init(stdout_writer).expect("Failed to initialize logger"); - +fn main() -> Result<()> { let cli = Cli::parse(); - let settings = if let Some(file) = cli.runtime_config { - Settings::load_from_file(file) - } else { - Settings::load() - } - .expect("Failed to load settings"); - - info!("starting with settings: {:?}", settings,); - - let runtime = runtime::Builder::new_multi_thread() - .enable_all() - .thread_name_fn(|| { - static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); - let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); - format!("runtime-{}", id) - }) - .build() - .expect("Failed to start multi-threaded runtime"); - - let db = Db::setup_connection_pool(settings.node()) - 
.expect("Failed to setup database connection pool"); - match cli.command { - Command::Start => { - runtime - .block_on(runner(Arc::new(settings), db)) - .expect("Failed to run initialization"); - } - } - - drop(runtime); -} - -async fn runner(settings: Arc, db: impl Database + 'static) -> Result<()> { - let mut runner = Runner::start(settings.clone(), db).await?; - - loop { - select! { - biased; - Ok(_event) = runner.command_receiver() => info!("Connected to the Network"), - _ = Runner::shutdown_signal() => { - info!("gracefully shutting down runner"); - let drain_timeout = time::Instant::now() + settings.node().shutdown_timeout(); - - select! { - Ok(()) = runner.shutdown() => { - #[cfg(feature = "websocket-server")] - match runner.ws_receiver().recv() { - Ok(ws::WsMessage::GracefulShutdown) => (), - Err(err) => info!(error=?err, "runner shutdown complete, but with error"), - } - info!("runner shutdown complete"); - break; - }, - _ = time::sleep_until(drain_timeout) => { - info!("shutdown timeout reached, shutting down runner anyway"); - break; - } - } + Command::Start { + runtime_config, + daemonize, + daemon_dir, + database_url, + } => { + let settings = if let Some(file) = runtime_config { + Settings::load_from_file(file) + } else { + Settings::load() } + .expect("Failed to load settings"); + + let _guard = if daemonize { + daemon::start(daemon_dir.clone()).expect("Failed to daemonize homestar runner"); + FileLogger::init(daemon_dir) + } else { + Logger::init() + }; + + info!("starting with settings: {:?}", settings,); + Db::set_url(database_url).expect("Failed to set DB url"); + let db = Db::setup_connection_pool(settings.node()).expect("Failed to setup DB pool"); + + info!("starting Homestar runtime..."); + let settings = Arc::new(settings); + let runner = Runner::start(settings.clone(), db).expect("Failed to start server"); + runner + .serve(settings) + .expect("Failed to run server runtime"); } + cmd => cmd.handle_rpc_command()?, } - - //drop(db); - Ok(()) } diff --git a/homestar-runtime/src/network/mod.rs b/homestar-runtime/src/network/mod.rs index 646f8495..d87c42d8 100644 --- a/homestar-runtime/src/network/mod.rs +++ b/homestar-runtime/src/network/mod.rs @@ -1,15 +1,16 @@ //! [libp2p], [websocket], and [ipfs] networking interfaces. //! -//! [libp2p]: -//! [websocket]: ws -//! [ipfs]: ipfs +//! [libp2p]: libp2p +//! [websocket]: axum::extract::ws +//! [ipfs]: ipfs_api #[cfg(feature = "ipfs")] pub(crate) mod ipfs; pub(crate) mod pubsub; +pub mod rpc; pub(crate) mod swarm; #[cfg(feature = "websocket-server")] -pub mod ws; +pub(crate) mod ws; #[cfg(feature = "ipfs")] pub(crate) use ipfs::IpfsCli; diff --git a/homestar-runtime/src/network/rpc.rs b/homestar-runtime/src/network/rpc.rs new file mode 100644 index 00000000..e5f70da7 --- /dev/null +++ b/homestar-runtime/src/network/rpc.rs @@ -0,0 +1,181 @@ +//! RPC server implementation. + +use crate::{ + channel::{BoundedChannel, BoundedChannelReceiver, BoundedChannelSender}, + settings, +}; +use futures::{future, StreamExt}; +use std::{io, net::SocketAddr, path::PathBuf, sync::Arc}; +use stream_cancel::Valved; +use tarpc::{ + client::{self, RpcError}, + context, + server::{self, incoming::Incoming, Channel}, + tokio_serde::formats::Bincode, +}; +use tokio::{ + runtime::Handle, + select, + sync::{mpsc, oneshot}, +}; +use tracing::{info, warn}; + +/// Message type for messages sent back from the +/// websocket server to the [runner] for example. 
+///
+/// [runner]: crate::Runner
+#[derive(Debug)]
+pub(crate) enum ServerMessage {
+    /// Notify the [Runner] that the RPC server was given a `stop` command.
+    ///
+    /// [Runner]: crate::Runner
+    ShutdownCmd,
+    /// Message sent by the [Runner] to start a graceful shutdown.
+    ///
+    /// [Runner]: crate::Runner
+    GracefulShutdown(oneshot::Sender<()>),
+}
+
+/// RPC interface definition for CLI-server interaction.
+#[tarpc::service]
+pub(crate) trait Interface {
+    /// Run a workflow given a path to a workflow file.
+    async fn run(workflow_file: PathBuf);
+    /// Ping the server.
+    async fn ping() -> String;
+    /// Stop the server.
+    async fn stop() -> Result<(), String>;
+}
+
+/// RPC server state information.
+#[derive(Debug, Clone)]
+pub(crate) struct Server {
+    /// [SocketAddr] of the RPC server.
+    pub(crate) addr: SocketAddr,
+    /// Sender for messages to be sent to the RPC server.
+    pub(crate) sender: Arc<BoundedChannelSender<ServerMessage>>,
+    /// Receiver for messages sent to the RPC server.
+    pub(crate) receiver: BoundedChannelReceiver<ServerMessage>,
+    /// Sender for messages to be sent to the [Runner].
+    ///
+    /// [Runner]: crate::Runner
+    pub(crate) runner_sender: Arc<mpsc::Sender<ServerMessage>>,
+    /// Maximum number of connections to the RPC server.
+    pub(crate) max_connections: usize,
+}
+
+/// RPC client wrapper.
+#[derive(Debug, Clone)]
+pub struct Client(InterfaceClient);
+
+/// RPC server request handler.
+#[derive(Debug, Clone)]
+#[allow(dead_code)]
+struct ServerHandler {
+    addr: SocketAddr,
+    runner_sender: Arc<mpsc::Sender<ServerMessage>>,
+}
+
+impl ServerHandler {
+    fn new(addr: SocketAddr, runner_sender: Arc<mpsc::Sender<ServerMessage>>) -> Self {
+        Self {
+            addr,
+            runner_sender,
+        }
+    }
+}
+
+#[tarpc::server]
+impl Interface for ServerHandler {
+    async fn run(self, _: context::Context, _workflow_file: PathBuf) {}
+    async fn ping(self, _: context::Context) -> String {
+        "pong".into()
+    }
+    async fn stop(self, _: context::Context) -> Result<(), String> {
+        let _ = self.runner_sender.send(ServerMessage::ShutdownCmd).await;
+        Ok(())
+    }
+}
+
+impl Server {
+    /// Create a new instance of the RPC server.
+    pub(crate) fn new(
+        settings: settings::Network,
+        runner_sender: Arc<mpsc::Sender<ServerMessage>>,
+    ) -> Self {
+        let (tx, rx) = BoundedChannel::oneshot();
+        Self {
+            addr: SocketAddr::new(settings.rpc_host, settings.rpc_port),
+            sender: tx.into(),
+            receiver: rx,
+            runner_sender,
+            max_connections: settings.rpc_max_connections,
+        }
+    }
+
+    /// Return an RPC server channel sender.
+    pub(crate) fn sender(&self) -> Arc<BoundedChannelSender<ServerMessage>> {
+        self.sender.clone()
+    }
+
+    /// Start the RPC server and connect the client.
+    pub(crate) async fn spawn(self, runtime_handle: Handle) -> anyhow::Result<()> {
+        let mut listener = tarpc::serde_transport::tcp::listen(self.addr, Bincode::default).await?;
+        listener.config_mut().max_frame_length(usize::MAX);
+
+        info!("RPC server listening on {}", self.addr);
+
+        // Set up a valved listener so the accept loop can be cancelled on shutdown.
+        let (exit, incoming) = Valved::new(listener);
+
+        runtime_handle.spawn(async move {
+            let fut = incoming
+                // Ignore accept errors.
+                .filter_map(|r| future::ready(r.ok()))
+                .map(server::BaseChannel::with_defaults)
+                // Limit channels to 1 per IP.
+                .max_channels_per_key(1, |t| t.transport().peer_addr().unwrap_or(self.addr).ip())
+                .map(|channel| {
+                    let handler = ServerHandler::new(self.addr, self.runner_sender.clone());
+                    channel.execute(handler.serve())
+                })
+                .buffer_unordered(self.max_connections)
+                .for_each(|_| async {});
+
+            select!
{ + biased; + Ok(msg) = tokio::task::spawn_blocking(move || self.receiver.recv()) => + if let Ok(ServerMessage::GracefulShutdown(tx)) = msg { + info!("RPC server shutting down"); + drop(exit); + let _ = tx.send(()); + }, + _ = fut => warn!("RPC server exited unexpectedly"), + } + }); + + Ok(()) + } +} + +impl Client { + /// Instantiate a new [Client] with a [tcp] connection to a running Homestar + /// runner/server. + /// + /// [tcp]: tarpc::serde_transport::tcp + pub async fn new(addr: SocketAddr) -> Result { + let transport = tarpc::serde_transport::tcp::connect(addr, Bincode::default).await?; + let client = InterfaceClient::new(client::Config::default(), transport).spawn(); + Ok(Client(client)) + } + + /// Ping the server. + pub async fn ping(&self) -> Result { + self.0.ping(context::current()).await + } + + /// Stop the server. + pub async fn stop(&self) -> Result, RpcError> { + self.0.stop(context::current()).await + } +} diff --git a/homestar-runtime/src/network/ws.rs b/homestar-runtime/src/network/ws.rs index 1ed174ef..31c7b688 100644 --- a/homestar-runtime/src/network/ws.rs +++ b/homestar-runtime/src/network/ws.rs @@ -1,11 +1,11 @@ //! Sets up a websocket server for sending and receiving messages from browser //! clients. -use crate::{event_handler::channel::BoundedChannelSender, runner::Runner, settings}; +use crate::settings; use anyhow::{anyhow, Result}; use axum::{ extract::{ - ws::{self, Message, WebSocketUpgrade}, + ws::{self, Message as AxumMsg, WebSocketUpgrade}, ConnectInfo, State, TypedHeader, }, response::IntoResponse, @@ -19,47 +19,41 @@ use std::{ str::FromStr, sync::Arc, }; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, mpsc, oneshot}; use tracing::{debug, info}; /// Type alias for websocket sender. -pub(crate) type WsSender = Arc>; +pub(crate) type Sender = Arc>; /// Message type for messages sent back from the /// websocket server to the [runner] for example. /// /// [runner]: crate::Runner -#[derive(Debug, Clone, PartialEq)] -pub enum WsMessage { +#[derive(Debug)] +pub(crate) enum Message { /// Notify the listener that the websocket server is shutting down /// gracefully. - GracefulShutdown, + GracefulShutdown(oneshot::Sender<()>), } -/// WebSocket state information. +/// WebSocket server state information. #[allow(dead_code, missing_debug_implementations)] #[derive(Clone)] -pub(crate) struct WebSocketServer { +pub(crate) struct Server { addr: SocketAddr, - msg_sender: WsSender, - runner_sender: Arc>, + msg_sender: Arc, } -impl WebSocketServer { +impl Server { /// Setup bounded, MPMC channel for runtime to send and received messages /// through the websocket connection(s). - pub(crate) fn setup_channel( - capacity: usize, - ) -> (broadcast::Sender, broadcast::Receiver) { + fn setup_channel(capacity: usize) -> (broadcast::Sender, broadcast::Receiver) { broadcast::channel(capacity) } - /// Start the websocket server given settings. 
- pub(crate) async fn start( - settings: settings::Network, - ws_sender: WsSender, - runner_sender: Arc>, - ) -> Result<()> { + pub(crate) fn new(settings: settings::Network) -> Result { + let (sender, _receiver) = Self::setup_channel(settings.websocket_capacity); + let host = IpAddr::from_str(&settings.websocket_host.to_string())?; let port_setting = settings.websocket_port; let addr = if port_available(host, port_setting) { @@ -71,33 +65,41 @@ impl WebSocketServer { SocketAddr::from((host, port)) }; - let ws_state = Self { + Ok(Self { addr, - msg_sender: ws_sender, - runner_sender: runner_sender.clone(), - }; - let app = Router::new().route("/", get(ws_handler).with_state(ws_state.clone())); + msg_sender: Arc::new(sender.into()), + }) + } - info!("websocket server listening on {}", addr); + /// Start the websocket server given settings. + pub(crate) async fn start(self, mut rx: mpsc::Receiver) -> Result<()> { + let app = Router::new().route("/", get(ws_handler).with_state(self.clone())); + info!("websocket server listening on {}", self.addr); - axum::Server::bind(&addr) + axum::Server::bind(&self.addr) .serve(app.into_make_service_with_connect_info::()) .with_graceful_shutdown(async { - let _ = Runner::shutdown_signal().await; - info!("websocket server shutting down"); - drop(ws_state.msg_sender); - let _ = runner_sender.send(WsMessage::GracefulShutdown); + if let Some(Message::GracefulShutdown(tx)) = rx.recv().await { + info!("websocket server shutting down"); + let _ = tx.send(()); + } }) .await?; Ok(()) } + + /// Get websocket message sender for broadcasting messages to websocket + /// clients. + pub(crate) fn sender(&self) -> Arc { + self.msg_sender.clone() + } } async fn ws_handler( ws: WebSocketUpgrade, user_agent: Option>, - State(state): State, + State(state): State, ConnectInfo(addr): ConnectInfo, ) -> impl IntoResponse { let user_agent = if let Some(TypedHeader(user_agent)) = user_agent { @@ -112,12 +114,12 @@ async fn ws_handler( ws.on_upgrade(move |socket| handle_socket(socket, state)) } -async fn handle_socket(mut socket: ws::WebSocket, state: WebSocketServer) { +async fn handle_socket(mut socket: ws::WebSocket, state: Server) { let addr = state.addr; // Send a ping (unsupported by some browsers) just to kick things off and // get a response. - if socket.send(Message::Ping(vec![1, 2, 3])).await.is_ok() { + if socket.send(AxumMsg::Ping(vec![1, 2, 3])).await.is_ok() { debug!("Pinged {}...", addr); } else { info!("Could not send ping {}!", addr); @@ -150,7 +152,7 @@ async fn handle_socket(mut socket: ws::WebSocket, state: WebSocketServer) { while let Ok(msg) = subscribed_rx.recv().await { // In any websocket error, break loop. if socket_sender - .send(Message::Binary(msg.into())) + .send(AxumMsg::Binary(msg.into())) .await .is_err() { @@ -182,15 +184,15 @@ async fn handle_socket(mut socket: ws::WebSocket, state: WebSocketServer) { /// Process [messages]. 
/// /// [messages]: Message -async fn process_message(msg: Message, addr: SocketAddr) -> ControlFlow<(), ()> { +async fn process_message(msg: AxumMsg, addr: SocketAddr) -> ControlFlow<(), ()> { match msg { - Message::Text(t) => { + AxumMsg::Text(t) => { info!(">>> {} sent str: {:?}", addr, t); } - Message::Binary(d) => { + AxumMsg::Binary(d) => { info!(">>> {} sent {} bytes: {:?}", addr, d.len(), d); } - Message::Close(c) => { + AxumMsg::Close(c) => { if let Some(cf) = c { info!( ">>> {} sent close with code {} and reason `{}`", @@ -202,13 +204,13 @@ async fn process_message(msg: Message, addr: SocketAddr) -> ControlFlow<(), ()> return ControlFlow::Break(()); } - Message::Pong(v) => { + AxumMsg::Pong(v) => { info!(">>> {} sent pong with {:?}", addr, v); } - // You should never need to manually handle Message::Ping, as axum's websocket library + // You should never need to manually handle AxumMsg::Ping, as axum's websocket library // will do so for you automagically by replying with Pong and copying the v according to // spec. But if you need the contents of the pings you can see them here. - Message::Ping(v) => { + AxumMsg::Ping(v) => { info!(">>> {} sent ping with {:?}", addr, v); } } @@ -222,18 +224,14 @@ fn port_available(host: IpAddr, port: u16) -> bool { #[cfg(test)] mod test { use super::*; - use crate::{event_handler::channel::BoundedChannel, settings::Settings}; + use crate::settings::Settings; #[tokio::test] async fn ws_connect() { - let settings = Arc::new(Settings::load().unwrap()); - let (tx, _rx) = WebSocketServer::setup_channel(10); - let ch = BoundedChannel::oneshot(); - tokio::spawn(WebSocketServer::start( - settings.node().network().clone(), - tx.into(), - ch.tx.into(), - )); + let settings = Settings::load().unwrap(); + let state = Server::new(settings.node.network).unwrap(); + let (_ws_tx, ws_rx) = mpsc::channel(1); + tokio::spawn(state.start(ws_rx)); tokio_tungstenite::connect_async("ws://localhost:1337".to_string()) .await diff --git a/homestar-runtime/src/runner.rs b/homestar-runtime/src/runner.rs index f5f90a47..934df874 100644 --- a/homestar-runtime/src/runner.rs +++ b/homestar-runtime/src/runner.rs @@ -1,32 +1,40 @@ //! General [Runner] interface for working across multiple workers //! and executing workflows. +#[cfg(feature = "websocket-server")] +use crate::network::ws; #[cfg(feature = "ipfs")] use crate::network::IpfsCli; use crate::{ + channel::BoundedChannelSender, db::Database, event_handler::{Event, EventHandler}, - network::swarm, + network::{rpc, swarm}, Settings, }; -#[cfg(feature = "websocket-server")] -use crate::{ - event_handler::channel::{BoundedChannel, BoundedChannelReceiver}, - network::ws::{self, WebSocketServer}, -}; use anyhow::Result; use dashmap::DashMap; use libipld::Cid; +#[cfg(not(test))] +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use tokio::{ - select, + runtime, select, signal::unix::{signal, SignalKind}, sync::{mpsc, oneshot}, - task::AbortHandle, + task::{AbortHandle, JoinHandle}, + time, }; +use tokio_util::time::DelayQueue; use tracing::info; -/// Type alias for a [DashMap] containing running task information. +#[cfg(not(test))] +const HOMESTAR_THREAD: &str = "homestar-runtime"; + +/// Type alias for a [DashMap] containing running worker [JoinHandle]s. +pub type RunningWorkerSet = DashMap>>; + +/// Type alias for a [DashMap] containing running task [AbortHandle]s. pub type RunningTaskSet = DashMap>; /// Trait for managing a [DashMap] of running task information. 
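As an aside, a minimal sketch of the bookkeeping pattern behind the `RunningTaskSet` / `RunningWorkerSet` aliases above: handles are grouped per workflow `Cid` in a `DashMap`, and garbage collection drops finished handles and then empty entries. The generic parameters shown here are assumptions for illustration, not the crate's exact definitions.

    // Illustrative sketch only; the real aliases live in homestar-runtime's runner.rs.
    use dashmap::DashMap;
    use libipld::Cid;
    use tokio::task::AbortHandle;

    // Assumed shape: task abort-handles grouped by workflow Cid.
    type RunningTasks = DashMap<Cid, Vec<AbortHandle>>;

    fn gc(tasks: &RunningTasks) {
        // Drop handles whose tasks finished, then drop workflows with no handles left.
        tasks.retain(|_cid, handles| {
            handles.retain(|handle| !handle.is_finished());
            !handles.is_empty()
        });
    }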
@@ -50,15 +58,17 @@ impl ModifiedSet for RunningTaskSet { /// /// [Workflows]: homestar_core::Workflow #[cfg(feature = "websocket-server")] +#[allow(dead_code)] #[derive(Debug)] pub struct Runner { - command_sender: oneshot::Sender, - command_receiver: oneshot::Receiver, + message_buffer_len: usize, event_sender: Arc>, - running_set: RunningTaskSet, - #[allow(dead_code)] - ws_sender: ws::WsSender, - ws_receiver: BoundedChannelReceiver, + expiration_queue: DelayQueue, + running_tasks: RunningTaskSet, + running_workers: RunningWorkerSet, + runtime: tokio::runtime::Runtime, + ws_msg_sender: Arc, + ws_mpsc_sender: mpsc::Sender, } /// Runner interface. @@ -66,122 +76,180 @@ pub struct Runner { /// /// [Workflows]: homestar_core::Workflow #[cfg(not(feature = "websocket-server"))] +#[allow(dead_code)] #[derive(Debug)] pub struct Runner { - command_sender: oneshot::Sender, - command_receiver: oneshot::Receiver, + message_buffer_len: usize, event_sender: Arc>, - running_set: RunningTaskSet, + expiration_queue: DelayQueue, + running_tasks: RunningTaskSet, + running_workers: RunningWorkerSet, + runtime: tokio::runtime::Runtime, } impl Runner { - /// Start the Homestar runner context. - pub async fn start(settings: Arc, db: impl Database + 'static) -> Result { - let (command_sender, command_receiver) = oneshot::channel(); - let map = DashMap::new(); - let swarm = swarm::new(settings.node()).await?; + /// Setup bounded, MPSC channel for top-level RPC communication. + pub(crate) fn setup_channel( + capacity: usize, + ) -> ( + mpsc::Sender, + mpsc::Receiver, + ) { + mpsc::channel(capacity) + } - let event_handler = EventHandler::new(swarm, db.clone(), settings.node()); - let event_sender = event_handler.sender(); + /// Initialize and start the Homestar [Runner] / runtime. + #[cfg(not(test))] + pub fn start(settings: Arc, db: impl Database + 'static) -> Result { + let runtime = runtime::Builder::new_multi_thread() + .enable_all() + .thread_name_fn(|| { + static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); + let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); + format!("{HOMESTAR_THREAD}-{id}") + }) + .build()?; - #[cfg(feature = "ipfs")] - tokio::spawn({ - let ipfs = IpfsCli::default(); - event_handler.start(ipfs) - }); + let runner = Self::init(settings, db, runtime)?; - #[cfg(not(feature = "ipfs"))] - tokio::spawn(event_handler.start()); + Ok(runner) + } - #[cfg(feature = "websocket-server")] - { - // Setup websocket communication. - let (tx, _rx) = - WebSocketServer::setup_channel(settings.node().network().websocket_capacity); - let ws_tx = Arc::new(tx); - let ws_channel = BoundedChannel::oneshot(); - let oneshot_sender = ws_channel.tx; - let oneshot_receiver = ws_channel.rx; - - tokio::spawn({ - let settings = settings.node().network().clone(); - WebSocketServer::start(settings, ws_tx.clone(), oneshot_sender.into()) - }); + /// Initialize and start the Homestar [Runner] / runtime. 
+ #[cfg(test)] + pub fn start(settings: Arc, db: impl Database + 'static) -> Result { + let runtime = runtime::Builder::new_current_thread() + .enable_all() + .build()?; - Ok(Self { - command_sender, - command_receiver, - event_sender, - running_set: map, - ws_sender: ws_tx, - ws_receiver: oneshot_receiver, - }) - } + let runner = Self::init(settings, db, runtime)?; - #[cfg(not(feature = "websocket-server"))] - Ok(Self { - command_sender, - command_receiver, - event_sender, - running_set: map, - }) + Ok(runner) } - /// Sequence for shutting down a [Runner], including: - /// a) event-handler channels, - /// b) Running workers - /// c) [Runner] channels. - pub async fn shutdown(&mut self) -> Result<()> { - let (shutdown_sender, shutdown_receiver) = oneshot::channel(); - self.event_sender - .send(Event::Shutdown(shutdown_sender)) - .await?; - - shutdown_receiver.await?; + /// Listen loop for [Runner] signals and messages. + pub fn serve(self, settings: Arc) -> Result<()> { + let (tx, mut rx) = Self::setup_channel(self.message_buffer_len); + let shutdown_timeout = settings.node.shutdown_timeout; + let rpc_server = rpc::Server::new(settings.node.network.clone(), tx.into()); + let rpc_sender = rpc_server.sender(); + self.runtime + .block_on(rpc_server.spawn(self.runtime.handle().clone()))?; + + let shutdown_time_left = self.runtime.block_on(async { + loop { + select! { + biased; + // Duplicate inner-shutdown code here, as tokio::select! + // doesn't allow for either-or patterns like matches. + Some(rpc::ServerMessage::ShutdownCmd) = rx.recv() => { + info!("RPC shutdown signal received, shutting down runner"); + let now = time::Instant::now(); + let drain_timeout = now + shutdown_timeout; + select! { + Ok(()) = self.shutdown(rpc_sender) => { + break now.elapsed(); + }, + _ = time::sleep_until(drain_timeout) => { + info!("shutdown timeout reached, shutting down runner anyway"); + break now.elapsed(); + } + } + }, + _ = Self::shutdown_signal() => { + info!("gracefully shutting down runner"); + + let now = time::Instant::now(); + let drain_timeout = now + shutdown_timeout; + select! { + Ok(()) = self.shutdown(rpc_sender) => { + break now.elapsed(); + }, + _ = time::sleep_until(drain_timeout) => { + info!("shutdown timeout reached, shutting down runner anyway"); + break now.elapsed(); + } + } + } + } + } + }); - // TODO: shutdown workers + if shutdown_time_left < shutdown_timeout { + self.runtime + .shutdown_timeout(shutdown_timeout - shutdown_time_left); + info!("runner shutdown complete"); + } - info!("shutting down runner's channels"); - self.command_receiver.close(); - self.command_sender.closed().await; Ok(()) } - /// Captures shutdown signals for [Runner] and other sub-processes like - /// the webSocket server. - pub async fn shutdown_signal() -> Result<()> { - let mut sigint = signal(SignalKind::interrupt())?; - let mut sigterm = signal(SignalKind::terminate())?; - - select! { - _ = tokio::signal::ctrl_c() => info!("CTRL-C received, shutting down"), - _ = sigint.recv() => info!("SIGINT received, shutting down"), - _ = sigterm.recv() => info!("SIGTERM received, shutting down"), - } + /// [mpsc::Sender] of the event-handler. + /// + /// [EventHandler]: crate::EventHandler + pub fn event_sender(&self) -> Arc> { + self.event_sender.clone() + } - Ok(()) + /// [tokio::sync::broadcast::Sender] for sending messages through the + /// webSocket server to subscribers. 
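The `serve` loop above pairs each shutdown path with a drain deadline before tearing the runtime down. A minimal sketch of that `select!`-with-deadline pattern, where `do_shutdown` is a hypothetical stand-in for the runner's real teardown sequence:

    use std::time::Duration;
    use tokio::{select, time};

    async fn shutdown_with_deadline(timeout: Duration) {
        let deadline = time::Instant::now() + timeout;
        select! {
            _ = do_shutdown() => { /* drained cleanly within the deadline */ }
            _ = time::sleep_until(deadline) => {
                // Deadline hit: stop waiting and shut down anyway.
            }
        }
    }

    async fn do_shutdown() { /* placeholder for channel and worker teardown */ }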
+ #[cfg(feature = "websocket-server")] + pub fn ws_msg_sender(&self) -> &ws::Sender { + &self.ws_msg_sender } - /// Garbage-collect task [AbortHandle]s in the [RunningTaskSet]. - pub fn gc(&mut self) { - self.running_set.retain(|_cid, handles| { + /// Garbage-collect task [AbortHandle]s in the [RunningTaskSet] and + /// workers in the [RunningWorkerSet]. + #[allow(dead_code)] + pub(crate) fn gc(&mut self) { + self.running_tasks.retain(|_cid, handles| { handles.retain(|handle| !handle.is_finished()); !handles.is_empty() }); + + self.running_workers + .retain(|_cid, handle| !handle.is_finished()); } - /// Garbage-collect task [AbortHandle]s in the [RunningTaskSet] for a specific - /// workflow [Cid], running on a worker. - pub fn gc_worker(&mut self, cid: Cid) { - if let Some(mut handles) = self.running_set.get_mut(&cid) { + /// Garbage-collect task [AbortHandle]s in the [RunningTaskSet] and a + /// worker's [JoinHandle] in the [RunningWorkerSet] for a specific workflow + /// [Cid], running on a worker. + #[allow(dead_code)] + pub(crate) fn gc_worker(&mut self, cid: Cid) { + if let Some(mut handles) = self.running_tasks.get_mut(&cid) { handles.retain(|handle| !handle.is_finished()); } - self.running_set.retain(|_cid, handles| !handles.is_empty()); + + self.running_tasks + .retain(|_cid, handles| !handles.is_empty()); + + if let Some(handle) = self.running_workers.get_mut(&cid) { + if handle.is_finished() { + self.running_workers.remove(&cid); + } + } + } + + /// Abort all workers. + #[allow(dead_code)] + pub(crate) fn abort_workers(&mut self) { + self.running_workers + .iter_mut() + .for_each(|handle| handle.abort()); + } + + /// Abort a specific worker given a [Cid]. + #[allow(dead_code)] + pub(crate) fn abort_worker(&mut self, cid: Cid) { + if let Some(handle) = self.running_workers.get_mut(&cid) { + handle.abort() + } } /// Abort all tasks running within all workers. - pub fn abort_all_tasks(&mut self) { - self.running_set.iter_mut().for_each(|handles| { + #[allow(dead_code)] + pub(crate) fn abort_tasks(&mut self) { + self.running_tasks.iter_mut().for_each(|handles| { for abort_handle in &*handles { abort_handle.abort(); } @@ -189,136 +257,247 @@ impl Runner { } /// Abort a specific worker's tasks given a [Cid]. - pub fn abort_worker_tasks(&mut self, cid: Cid) { - if let Some(handles) = self.running_set.get_mut(&cid) { + #[allow(dead_code)] + pub(crate) fn abort_worker_tasks(&mut self, cid: Cid) { + if let Some(handles) = self.running_tasks.get_mut(&cid) { for abort_handle in &*handles { abort_handle.abort(); } } } - /// [mpsc::Sender] of the event-handler. - /// - /// [EventHandler]: crate::EventHandler - pub fn event_sender(&self) -> Arc> { - self.event_sender.clone() - } + /// Captures shutdown signals for [Runner]. + async fn shutdown_signal() -> Result<()> { + let mut sigint = signal(SignalKind::interrupt())?; + let mut sigterm = signal(SignalKind::terminate())?; - /// [tokio::sync::broadcast::Sender] for sending messages through the - /// webSocket server to subscribers. - #[cfg(feature = "websocket-server")] - pub fn ws_sender(&self) -> &ws::WsSender { - &self.ws_sender - } + select! { + _ = tokio::signal::ctrl_c() => info!("CTRL-C received, shutting down"), + _ = sigint.recv() => info!("SIGINT received, shutting down"), + _ = sigterm.recv() => info!("SIGTERM received, shutting down"), + } - /// Channel for receiving [messages] back from the - /// webSocket server. 
- /// - /// [messages]: ws::WsMessage - #[cfg(feature = "websocket-server")] - pub fn ws_receiver(&mut self) -> &mut BoundedChannelReceiver { - &mut self.ws_receiver + Ok(()) } - /// [oneshot::Sender] for sending commands to the [Runner]. - pub fn command_sender(&self) -> &oneshot::Sender { - &self.command_sender + /// Sequence for shutting down a [Runner], including: + /// a) event-handler channels, + /// b) Running workers + /// c) [Runner] channels. + async fn shutdown( + &self, + rpc_sender: Arc>, + ) -> Result<()> { + let (shutdown_sender, shutdown_receiver) = oneshot::channel(); + rpc_sender.try_send(rpc::ServerMessage::GracefulShutdown(shutdown_sender))?; + shutdown_receiver.await?; + + let (shutdown_sender, shutdown_receiver) = oneshot::channel(); + self.event_sender + .send(Event::Shutdown(shutdown_sender)) + .await?; + shutdown_receiver.await?; + + #[cfg(feature = "websocket-server")] + { + let (shutdown_sender, shutdown_receiver) = oneshot::channel(); + self.ws_mpsc_sender + .send(ws::Message::GracefulShutdown(shutdown_sender)) + .await?; + shutdown_receiver.await?; + } + + // TODO: shutdown workers + + Ok(()) } - /// [oneshot::Receiver] for Runner to receive commands. - pub fn command_receiver(&mut self) -> &mut oneshot::Receiver { - &mut self.command_receiver + fn init( + settings: Arc, + db: impl Database + 'static, + runtime: tokio::runtime::Runtime, + ) -> Result { + let swarm = runtime.block_on(swarm::new(settings.node()))?; + + let event_handler = EventHandler::new(swarm, db, settings.node()); + let event_sender = event_handler.sender(); + + #[cfg(feature = "ipfs")] + let _event_handler_hdl = runtime.spawn({ + let ipfs = IpfsCli::default(); + event_handler.start(ipfs) + }); + + #[cfg(not(feature = "ipfs"))] + let _event_handler_hdl = runtime.spawn(event_handler.start()); + + #[cfg(feature = "websocket-server")] + { + // Setup websocket communication. 
+ let ws_server = ws::Server::new(settings.node.network.clone())?; + let ws_msg_tx = ws_server.sender(); + + let (ws_tx, ws_rx) = mpsc::channel(settings.node.network.websocket_capacity); + let _ws_hdl = runtime.spawn(ws_server.start(ws_rx)); + + Ok(Self { + message_buffer_len: settings.node.network.events_buffer_len, + event_sender, + expiration_queue: DelayQueue::new(), + running_tasks: DashMap::new(), + running_workers: DashMap::new(), + runtime, + ws_msg_sender: ws_msg_tx, + ws_mpsc_sender: ws_tx, + }) + } + + #[cfg(not(feature = "websocket-server"))] + Ok(Self { + message_buffer_len: settings.node.network.events_buffer_len, + event_sender, + expiration_queue: DelayQueue::new(), + running_tasks: DashMap::new(), + running_workers: DashMap::new(), + runtime, + }) } } #[cfg(test)] mod test { use super::*; + use crate::network::rpc::Client; use homestar_core::test_utils; use rand::thread_rng; - use std::{ - sync::atomic::{AtomicUsize, Ordering}, - time::Duration, - }; + use std::net::SocketAddr; + use tokio::net::TcpStream; - static ATOMIC_PORT: AtomicUsize = AtomicUsize::new(444); - - async fn setup() -> Runner { + fn setup() -> (Runner, Settings) { let mut settings = Settings::load().unwrap(); - let _ = ATOMIC_PORT.fetch_add(1, Ordering::SeqCst) as u16; - settings.node.network.websocket_port = ATOMIC_PORT.load(Ordering::SeqCst) as u16; - let db = crate::test_utils::db::MemoryDb::setup_connection_pool(settings.node()).unwrap(); + settings.node.network.websocket_port = test_utils::ports::get_port() as u16; + settings.node.network.rpc_port = test_utils::ports::get_port() as u16; + let db = crate::test_utils::db::MemoryDb::setup_connection_pool(&settings.node).unwrap(); - Runner::start(settings.into(), db).await.unwrap() + let runner = Runner::start(settings.clone().into(), db).unwrap(); + (runner, settings) } - #[tokio::test] - async fn shutdown() { - let mut runner = setup().await; + #[test] + fn shutdown() { + let (runner, settings) = setup(); + + let (tx, _rx) = Runner::setup_channel(1); + let rpc_server = rpc::Server::new(settings.node.network.clone(), Arc::new(tx)); + let rpc_sender = rpc_server.sender(); + + let addr = SocketAddr::new( + settings.node.network.rpc_host, + settings.node.network.rpc_port, + ); - tokio::spawn(async move { - tokio::time::sleep(Duration::from_millis(100)).await; - // Send SIGINT signal - let _ = nix::sys::signal::kill(nix::unistd::getpid(), nix::sys::signal::Signal::SIGINT); + runner.runtime.block_on(async { + rpc_server + .spawn(runner.runtime.handle().clone()) + .await + .unwrap(); + + let _stream = TcpStream::connect(addr).await.expect("Connection error"); + let _another_stream = TcpStream::connect(addr).await.expect("Connection error"); }); - select! { - result = Runner::shutdown_signal() => { - assert!(result.is_ok()); - select! 
{ - Ok(()) = runner.shutdown() => { - assert!(runner.command_sender().is_closed()); - #[cfg(feature = "websocket-server")] - assert_eq!(runner.ws_receiver().recv().unwrap(), ws::WsMessage::GracefulShutdown); + runner.runtime.block_on(async { + match runner.shutdown(rpc_sender).await { + Ok(()) => { + // with shutdown, we should not be able to connect to the server(s) + let stream_error = TcpStream::connect(addr).await; + assert!(stream_error.is_err()); + assert!(matches!( + stream_error.unwrap_err().kind(), + std::io::ErrorKind::ConnectionRefused + )); + + #[cfg(feature = "websocket-server")] + { + let ws_error = + tokio_tungstenite::connect_async("ws://localhost:1337".to_string()) + .await; + assert!(ws_error.is_err()); } } + _ => panic!("Shutdown failed."), } - } + }); } - #[tokio::test] - async fn abort_all_tasks() { - let mut runner = setup().await; + #[test] + fn spawn_rpc_server_and_ping() { + let (runner, settings) = setup(); - let mut set = tokio::task::JoinSet::new(); + let (tx, _rx) = Runner::setup_channel(1); + let rpc_server = rpc::Server::new(settings.node.network.clone(), Arc::new(tx)); + + runner + .runtime + .block_on(rpc_server.spawn(runner.runtime.handle().clone())) + .unwrap(); - for i in 0..3 { - let handle = set.spawn(async move { i }); - runner.running_set.append_or_insert( - test_utils::cid::generate_cid(&mut thread_rng()), - vec![handle], + runner.runtime.spawn(async move { + let addr = SocketAddr::new( + settings.node.network.rpc_host, + settings.node.network.rpc_port, ); - } - runner.abort_all_tasks(); - assert!(!runner.running_set.is_empty()); + let client = Client::new(addr).await.unwrap(); + let response = client.ping().await.unwrap(); + assert_eq!(response, "pong".to_string()); + }); + } + + #[test] + fn abort_all_tasks() { + let (mut runner, _) = setup(); + let mut set = tokio::task::JoinSet::new(); + + runner.runtime.block_on(async { + for i in 0..3 { + let handle = set.spawn(async move { i }); + runner.running_tasks.append_or_insert( + test_utils::cid::generate_cid(&mut thread_rng()), + vec![handle], + ); + } + + while set.join_next().await.is_some() {} + }); + + runner.abort_tasks(); + assert!(!runner.running_tasks.is_empty()); - while set.join_next().await.is_some() {} runner.gc(); - assert!(runner.running_set.is_empty()); + assert!(runner.running_tasks.is_empty()); } - #[tokio::test] - async fn abort_one_task() { - let mut runner = setup().await; - + #[test] + fn abort_one_task() { + let (mut runner, _) = setup(); let mut set = tokio::task::JoinSet::new(); let mut cids = vec![]; + runner.runtime.block_on(async { + for i in 0..3 { + let handle = set.spawn(async move { i }); + let cid = test_utils::cid::generate_cid(&mut thread_rng()); + runner.running_tasks.append_or_insert(cid, vec![handle]); + cids.push(cid); + } - for i in 0..3 { - let handle = set.spawn(async move { i }); - let cid = test_utils::cid::generate_cid(&mut thread_rng()); - runner.running_set.append_or_insert(cid, vec![handle]); - cids.push(cid); - } + while set.join_next().await.is_some() {} + }); runner.abort_worker_tasks(cids[0]); - assert!(runner.running_set.len() == 3); - - while set.join_next().await.is_some() {} + assert!(runner.running_tasks.len() == 3); runner.gc_worker(cids[0]); - - assert!(runner.running_set.len() == 2); + assert!(runner.running_tasks.len() == 2); } } diff --git a/homestar-runtime/src/scheduler.rs b/homestar-runtime/src/scheduler.rs index 8e5efbf2..cc0e0c1d 100644 --- a/homestar-runtime/src/scheduler.rs +++ b/homestar-runtime/src/scheduler.rs @@ -26,7 +26,7 @@ 
use homestar_wasm::io::Arg; use indexmap::IndexMap; use libipld::Cid; use std::{ops::ControlFlow, str::FromStr, sync::Arc, time::Instant}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, RwLock}; use tracing::debug; type Schedule<'a> = Vec, usize>>>; @@ -54,7 +54,7 @@ pub(crate) struct ExecutionGraph<'a> { #[derive(Debug, Clone, Default)] pub(crate) struct TaskScheduler<'a> { /// In-memory map of task/instruction results. - pub(crate) linkmap: Arc>>, + pub(crate) linkmap: Arc>>>, /// [ExecutionGraph] of what's been run so far for a [Workflow] of `batched` /// [Tasks]. /// @@ -133,22 +133,17 @@ impl<'a> TaskScheduler<'a> { } Err(_) => { debug!("receipt not available in the database"); - let channel = BoundedChannel::new(pointers_len); + let (tx, rx) = BoundedChannel::with(pointers_len); for ptr in &pointers { - let _ = - event_sender.try_send(Event::FindRecord(QueryRecord::with( - ptr.cid(), - CapsuleTag::Receipt, - channel.tx.clone(), - ))); + let _ = event_sender.try_send(Event::FindRecord( + QueryRecord::with(ptr.cid(), CapsuleTag::Receipt, tx.clone()), + )); } let mut linkmap = LinkMap::>::new(); let mut counter = 0; while let Ok(ResponseEvent::Found(Ok(FoundEvent::Receipt(found)))) = - channel - .rx - .recv_deadline(Instant::now() + settings.p2p_check_timeout) + rx.recv_deadline(Instant::now() + settings.p2p_check_timeout) { if pointers.contains(&Pointer::new(found.cid())) { if let Ok(cid) = found.instruction().try_into() { @@ -182,7 +177,7 @@ impl<'a> TaskScheduler<'a> { }; Ok(Self { - linkmap: Arc::new(linkmap), + linkmap: Arc::new(linkmap.into()), ran: Some(schedule), run: pivot, resume_step: step, @@ -190,7 +185,7 @@ impl<'a> TaskScheduler<'a> { }) } _ => Ok(Self { - linkmap: Arc::new(LinkMap::>::new()), + linkmap: Arc::new(LinkMap::>::new().into()), ran: None, run: schedule, resume_step: None, @@ -251,7 +246,7 @@ mod test { .await .unwrap(); - assert!(scheduler.linkmap.is_empty()); + assert!(scheduler.linkmap.read().await.is_empty()); assert!(scheduler.ran.is_none()); assert_eq!(scheduler.run.len(), 2); assert_eq!(scheduler.resume_step, None); @@ -314,9 +309,11 @@ mod test { let ran = scheduler.ran.as_ref().unwrap(); - assert_eq!(scheduler.linkmap.len(), 1); + assert_eq!(scheduler.linkmap.read().await.len(), 1); assert!(scheduler .linkmap + .read() + .await .contains_key(&instruction1.to_cid().unwrap())); assert_eq!(ran.len(), 1); assert_eq!(scheduler.run.len(), 1); @@ -395,12 +392,16 @@ mod test { let ran = scheduler.ran.as_ref().unwrap(); - assert_eq!(scheduler.linkmap.len(), 1); + assert_eq!(scheduler.linkmap.read().await.len(), 1); assert!(!scheduler .linkmap + .read() + .await .contains_key(&instruction1.to_cid().unwrap())); assert!(scheduler .linkmap + .read() + .await .contains_key(&instruction2.to_cid().unwrap())); assert_eq!(ran.len(), 2); assert!(scheduler.run.is_empty()); diff --git a/homestar-runtime/src/settings.rs b/homestar-runtime/src/settings.rs index b0e2185e..a569c5b7 100644 --- a/homestar-runtime/src/settings.rs +++ b/homestar-runtime/src/settings.rs @@ -7,9 +7,10 @@ use libp2p::{identity, identity::secp256k1}; use rand::{Rng, SeedableRng}; use sec1::der::Decode; use serde::Deserialize; -use serde_with::{base64::Base64, serde_as, DurationSeconds}; +use serde_with::{base64::Base64, serde_as, DisplayFromStr, DurationSeconds}; use std::{ io::Read, + net::{IpAddr, Ipv6Addr}, path::{Path, PathBuf}, time::Duration, }; @@ -60,7 +61,7 @@ pub struct Node { #[derive(Clone, Debug, Deserialize, PartialEq)] #[serde(default)] pub struct Network { - /// Buffer-length for 
events channel.
+    /// Buffer-length for event(s) / command(s) channels.
     pub(crate) events_buffer_len: usize,
     /// Address for [Swarm] to listen on.
     ///
@@ -81,6 +82,13 @@ pub struct Network {
     pub(crate) pubsub_idle_timeout: Duration,
     /// Quorum for receipt records on the DHT.
     pub(crate) receipt_quorum: usize,
+    /// RPC-server host.
+    #[serde_as(as = "DisplayFromStr")]
+    pub(crate) rpc_host: IpAddr,
+    /// RPC-server max-concurrent connections.
+    pub(crate) rpc_max_connections: usize,
+    /// RPC-server port.
+    pub(crate) rpc_port: u16,
     /// Transport connection timeout.
     #[serde_as(as = "DurationSeconds")]
     pub(crate) transport_connection_timeout: Duration,
@@ -167,6 +175,9 @@ impl Default for Network {
             pubsub_heartbeat: Duration::new(60, 0),
             pubsub_idle_timeout: Duration::new(60 * 60 * 24, 0),
             receipt_quorum: 2,
+            rpc_host: IpAddr::V6(Ipv6Addr::LOCALHOST),
+            rpc_max_connections: 10,
+            rpc_port: 3030,
             transport_connection_timeout: Duration::new(20, 0),
             websocket_host: Uri::from_static("127.0.0.1"),
             websocket_port: 1337,
@@ -273,12 +284,17 @@ impl PubkeyConfig {
 impl Settings {
     /// Load settings.
+    ///
+    /// Inject environment variables, naming them to match the settings
+    /// hierarchy, e.g. HOMESTAR__NODE__DB__MAX_POOL_SIZE=10.
+    ///
+    /// Use two underscores as the separator, as defined below.
     pub fn load() -> Result<Self, ConfigError> {
+        #[cfg(test)]
         let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("config/settings.toml");
-        // inject environment variables naming them properly on the settings
-        // e.g. [database] url="foo"
-        // would be injected with environment variable HOMESTAR_DATABASE_URL="foo"
-        // use one underscore as defined by the separator below
+        #[cfg(not(test))]
+        let path = PathBuf::from("config/settings.toml");
+
         Self::build(path)
     }
@@ -290,7 +306,14 @@ impl Settings {
     fn build(path: PathBuf) -> Result<Self, ConfigError> {
         let s = Config::builder()
-            .add_source(File::with_name(&path.as_path().display().to_string()))
+            .add_source(File::with_name(
+                &path
+                    .canonicalize()
+                    .map_err(|e| ConfigError::NotFound(e.to_string()))?
+ .as_path() + .display() + .to_string(), + )) .add_source(Environment::with_prefix("HOMESTAR").separator("__")) .build()?; s.try_deserialize() @@ -303,27 +326,37 @@ mod test { use crate::Settings; #[test] - fn test_defaults() { + fn defaults() { let settings = Settings::load().unwrap(); - let node_settings = settings.node(); + let node_settings = settings.node; let default_settings = Node { shutdown_timeout: Duration::from_secs(20), ..Default::default() }; - assert_eq!(node_settings, &default_settings); + assert_eq!(node_settings, default_settings); } #[test] - fn test_defaults_with_modification() { + fn defaults_with_modification() { let settings = Settings::build("fixtures/settings.toml".into()).unwrap(); - let mut default_modded_settings = Node::default(); + let mut default_modded_settings = Node::default(); default_modded_settings.network.events_buffer_len = 1000; default_modded_settings.network.websocket_port = 9999; default_modded_settings.shutdown_timeout = Duration::from_secs(20); - assert_eq!(settings.node(), &default_modded_settings); + + assert_eq!(settings.node, default_modded_settings); + } + + #[test] + fn overriding_env() { + std::env::set_var("HOMESTAR__NODE__NETWORK__RPC_PORT", "2046"); + std::env::set_var("HOMESTAR__NODE__DB__MAX_POOL_SIZE", "1"); + let settings = Settings::build("fixtures/settings.toml".into()).unwrap(); + assert_eq!(settings.node.network.rpc_port, 2046); + assert_eq!(settings.node.db.max_pool_size, 1); } #[test] diff --git a/homestar-runtime/src/test_utils/event.rs b/homestar-runtime/src/test_utils/event.rs index ce39fa79..8b55861e 100644 --- a/homestar-runtime/src/test_utils/event.rs +++ b/homestar-runtime/src/test_utils/event.rs @@ -3,5 +3,5 @@ use tokio::sync::mpsc; /// Create an [mpsc::Sender], [mpsc::Receiver] pair for [Event]s. 
pub fn setup_channel(settings: Settings) -> (mpsc::Sender, mpsc::Receiver) { - mpsc::channel(settings.node().network.events_buffer_len) + mpsc::channel(settings.node.network.events_buffer_len) } diff --git a/homestar-runtime/src/worker.rs b/homestar-runtime/src/worker.rs index 3397e131..e083b207 100644 --- a/homestar-runtime/src/worker.rs +++ b/homestar-runtime/src/worker.rs @@ -43,8 +43,11 @@ use homestar_wasm::{ }; use indexmap::IndexMap; use libipld::{Cid, Ipld}; -use std::{collections::BTreeMap, sync::Arc, thread, time::Instant, vec}; -use tokio::{sync::mpsc, task::JoinSet}; +use std::{collections::BTreeMap, sync::Arc, time::Instant, vec}; +use tokio::{ + sync::{mpsc, RwLock}, + task::JoinSet, +}; use tracing::{debug, error}; #[cfg(feature = "ipfs")] use tryhard::RetryFutureConfig; @@ -137,51 +140,51 @@ impl<'a> Worker<'a> { #[allow(dead_code)] pub(crate) async fn run( self, - db: impl Database + Sync, - running_set: &mut RunningTaskSet, + db: impl Database + 'static, + running_tasks: &'a mut RunningTaskSet, ) -> Result<()> { - self.run_queue(db, running_set).await + self.run_queue(db, running_tasks).await } async fn run_queue( mut self, - db: impl Database + Sync, - running_set: &mut RunningTaskSet, + db: impl Database + 'static, + running_tasks: &'a mut RunningTaskSet, ) -> Result<()> { - fn insert_into_map(mut map: Arc>, key: Cid, value: T) + async fn insert_into_map(map: Arc>>, key: Cid, value: T) where T: Clone, { - Arc::make_mut(&mut map) + map.write() + .await .entry(key) .or_insert_with(|| value.clone()); } - fn resolve_cid( + async fn resolve_cid( cid: Cid, workflow_settings: Arc, - linkmap: &Arc>>, - db: &impl Database, - event_sender: &Arc>, + linkmap: Arc>>>, + db: impl Database, + event_sender: Arc>, ) -> Result, ResolveError> { - if let Some(result) = linkmap.get(&cid) { + if let Some(result) = linkmap.read().await.get(&cid) { Ok(result.to_owned()) } else { match Db::find_instruction(Pointer::new(cid), &mut db.conn()?) { Ok(found) => Ok(found.output_as_arg()), Err(_) => { debug!("no related instruction receipt found in the DB"); - let channel = BoundedChannel::oneshot(); + let (tx, rx) = BoundedChannel::oneshot(); event_sender .try_send(Event::FindRecord(QueryRecord::with( cid, CapsuleTag::Receipt, - channel.tx, + tx, ))) .map_err(|err| ResolveError::TransportError(err.to_string()))?; - let found = match channel - .rx + let found = match rx .recv_deadline(Instant::now() + workflow_settings.p2p_timeout) { Ok(ResponseEvent::Found(Ok(FoundEvent::Receipt(found)))) => found, @@ -200,13 +203,12 @@ impl<'a> Worker<'a> { let found_result = found.output_as_arg(); // Store the result in the linkmap for use in next iterations. - insert_into_map(Arc::clone(linkmap), cid, found_result.clone()); + insert_into_map(linkmap.clone(), cid, found_result.clone()).await; Ok(found_result) } } } } - for batch in self.scheduler.run.into_iter() { let (mut task_set, handles) = batch.into_iter().try_fold( (TaskSet::new(), vec![]), @@ -237,28 +239,23 @@ impl<'a> Worker<'a> { let state = State::default(); let mut wasm_ctx = WasmContext::new(state)?; - let resolved = args.resolve(|cid| { - // Resolve Cid in a separate native threads, - // under a `std::thread::scope`. - thread::scope(|scope| { - let handle = scope.spawn(|| { - resolve_cid( - cid, - self.workflow_settings.clone(), - &self.scheduler.linkmap, - &db, - &self.event_sender, - ) - }); - - handle.join().map_err(|_| { - anyhow!("failed to join thread for resolving Cid: {cid}") - })? 
- }) - })?; + let db = db.clone(); + let settings = self.workflow_settings.clone(); + let linkmap = self.scheduler.linkmap.clone(); + let event_sender = self.event_sender.clone(); + + let resolved = args.resolve(move |cid| { + Box::pin(resolve_cid( + cid, + settings.clone(), + linkmap.clone(), + db.clone(), + event_sender.clone(), + )) + }); let handle = task_set.spawn(async move { - match wasm_ctx.run(wasm, &fun, resolved).await { + match wasm_ctx.run(wasm, &fun, resolved.await?).await { Ok(output) => { Ok((output, instruction_ptr, invocation_ptr, meta)) } @@ -278,7 +275,7 @@ impl<'a> Worker<'a> { )?; // Concurrently add handles to Runner's running set. - running_set.append_or_insert(self.workflow_info.cid(), handles); + running_tasks.append_or_insert(self.workflow_info.cid(), handles); while let Some(res) = task_set.join_next().await { let (executed, instruction_ptr, invocation_ptr, meta) = res??; @@ -293,7 +290,7 @@ impl<'a> Worker<'a> { ); let mut receipt = Receipt::try_with(instruction_ptr, &invocation_receipt)?; - Arc::make_mut(&mut Arc::clone(&self.scheduler.linkmap)).insert( + self.scheduler.linkmap.write().await.insert( Cid::try_from(receipt.instruction())?, receipt.output_as_arg(), ); @@ -312,12 +309,12 @@ impl<'a> Worker<'a> { let stored_receipt = Db::store_receipt(receipt, &mut db.conn()?)?; // send internal event - let channel = BoundedChannel::oneshot(); + let (tx, _rx) = BoundedChannel::oneshot(); self.event_sender .try_send(Event::CapturedReceipt(Captured::with( stored_receipt, self.workflow_info.clone(), - channel.tx, + tx, )))? } } @@ -524,7 +521,7 @@ mod test { .await .unwrap(); - assert!(worker.scheduler.linkmap.is_empty()); + assert!(worker.scheduler.linkmap.read().await.is_empty()); assert!(worker.scheduler.ran.is_none()); assert_eq!(worker.scheduler.run.len(), 2); assert_eq!(worker.scheduler.resume_step, None); @@ -533,12 +530,12 @@ mod test { #[cfg(feature = "ipfs")] { - let mut running_set = DashMap::new(); + let mut running_tasks = DashMap::new(); let worker_workflow_cid = worker.workflow_info.cid; - worker.run(db.clone(), &mut running_set).await.unwrap(); - assert_eq!(running_set.len(), 1); - assert!(running_set.contains_key(&worker_workflow_cid)); - assert_eq!(running_set.get(&worker_workflow_cid).unwrap().len(), 2); + worker.run(db.clone(), &mut running_tasks).await.unwrap(); + assert_eq!(running_tasks.len(), 1); + assert!(running_tasks.contains_key(&worker_workflow_cid)); + assert_eq!(running_tasks.get(&worker_workflow_cid).unwrap().len(), 2); // first time check DHT for workflow info let workflow_info_event = rx.recv().await.unwrap(); @@ -693,10 +690,12 @@ mod test { .await .unwrap(); - assert_eq!(worker.scheduler.linkmap.len(), 1); + assert_eq!(worker.scheduler.linkmap.read().await.len(), 1); assert!(worker .scheduler .linkmap + .read() + .await .contains_key(&instruction1.to_cid().unwrap())); assert_eq!(worker.scheduler.ran.as_ref().unwrap().len(), 1); assert_eq!(worker.scheduler.run.len(), 1); @@ -706,12 +705,12 @@ mod test { #[cfg(feature = "ipfs")] { - let mut running_set = DashMap::new(); + let mut running_tasks = DashMap::new(); let worker_workflow_cid = worker.workflow_info.cid; - worker.run(db.clone(), &mut running_set).await.unwrap(); - assert_eq!(running_set.len(), 1); - assert!(running_set.contains_key(&worker_workflow_cid)); - assert_eq!(running_set.get(&worker_workflow_cid).unwrap().len(), 1); + worker.run(db.clone(), &mut running_tasks).await.unwrap(); + assert_eq!(running_tasks.len(), 1); + 
assert!(running_tasks.contains_key(&worker_workflow_cid)); + assert_eq!(running_tasks.get(&worker_workflow_cid).unwrap().len(), 1); // we should have received 1 receipt let next_run_receipt = rx.recv().await.unwrap(); @@ -863,14 +862,18 @@ mod test { .await .unwrap(); - assert_eq!(worker.scheduler.linkmap.len(), 1); + assert_eq!(worker.scheduler.linkmap.read().await.len(), 1); assert!(!worker .scheduler .linkmap + .read() + .await .contains_key(&instruction1.to_cid().unwrap())); assert!(worker .scheduler .linkmap + .read() + .await .contains_key(&instruction2.to_cid().unwrap())); assert_eq!(worker.scheduler.ran.as_ref().unwrap().len(), 2); assert!(worker.scheduler.run.is_empty()); diff --git a/homestar-runtime/src/workflow/info.rs b/homestar-runtime/src/workflow/info.rs index 8963a7a5..72b90037 100644 --- a/homestar-runtime/src/workflow/info.rs +++ b/homestar-runtime/src/workflow/info.rs @@ -219,14 +219,14 @@ impl Info { conn: Option<&'a mut Connection>, handle_timeout_fn: impl FnOnce(Cid, Option<&'a mut Connection>) -> Result, ) -> Result { - let channel = BoundedChannel::oneshot(); + let (tx, rx) = BoundedChannel::oneshot(); event_sender.try_send(Event::FindRecord(QueryRecord::with( workflow_cid, CapsuleTag::Workflow, - channel.tx, + tx, )))?; - match channel.rx.recv_deadline(Instant::now() + p2p_timeout) { + match rx.recv_deadline(Instant::now() + p2p_timeout) { Ok(ResponseEvent::Found(Ok(FoundEvent::Workflow(workflow_info)))) => { // store workflow from info if let Some(conn) = conn { diff --git a/homestar-runtime/tests/cli.rs b/homestar-runtime/tests/cli.rs new file mode 100644 index 00000000..bbe741b7 --- /dev/null +++ b/homestar-runtime/tests/cli.rs @@ -0,0 +1,234 @@ +use anyhow::{Context, Result}; +use assert_cmd::{crate_name, prelude::*}; +use nix::{ + sys::signal::{self, Signal}, + unistd::Pid, +}; +use once_cell::sync::Lazy; +use predicates::prelude::*; +use retry::{delay::Fixed, retry}; +use serial_test::serial; +use std::{ + fs, + net::{IpAddr, Ipv6Addr, Shutdown, SocketAddr, TcpStream}, + path::PathBuf, + process::{Command, Stdio}, + time::Duration, +}; +use sysinfo::{PidExt, ProcessExt, SystemExt}; +use wait_timeout::ChildExt; + +static BIN: Lazy = Lazy::new(|| assert_cmd::cargo::cargo_bin(crate_name!())); + +fn stop_bin() -> Result<()> { + Command::new(BIN.as_os_str()) + .arg("stop") + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status() + .context("Failed to stop Homestar server")?; + Ok(()) +} + +#[test] +#[serial] +fn test_help_serial() -> Result<()> { + let _ = stop_bin(); + Command::new(BIN.as_os_str()) + .arg("help") + .assert() + .success() + .stdout(predicate::str::contains("start")) + .stdout(predicate::str::contains("stop")) + .stdout(predicate::str::contains("ping")) + .stdout(predicate::str::contains("run")) + .stdout(predicate::str::contains("help")) + .stdout(predicate::str::contains("version")); + + Command::new(BIN.as_os_str()) + .arg("-h") + .assert() + .success() + .stdout(predicate::str::contains("start")) + .stdout(predicate::str::contains("stop")) + .stdout(predicate::str::contains("ping")) + .stdout(predicate::str::contains("run")) + .stdout(predicate::str::contains("help")) + .stdout(predicate::str::contains("version")); + let _ = stop_bin(); + + Ok(()) +} + +#[test] +#[serial] +fn test_version_serial() -> Result<()> { + let _ = stop_bin(); + Command::new(BIN.as_os_str()) + .arg("--version") + .assert() + .success() + .stdout(predicate::str::contains(format!( + "{} {}", + crate_name!(), + env!("CARGO_PKG_VERSION") + ))); + let _ = stop_bin(); + 
+ Ok(()) +} + +#[test] +#[serial] +fn test_server_not_running_serial() -> Result<()> { + let _ = stop_bin(); + + Command::new(BIN.as_os_str()) + .arg("ping") + .assert() + .failure() + .stderr(predicate::str::contains("Connection refused")); + + Command::new(BIN.as_os_str()) + .arg("ping") + .arg("--host") + .arg("::1") + .assert() + .failure() + .stderr(predicate::str::contains("Connection refused")); + + Command::new(BIN.as_os_str()) + .arg("ping") + .arg("--host") + .arg("::2") + .assert() + .failure() + .stderr( + predicate::str::contains("No route to host") + .or(predicate::str::contains("Network is unreachable")), + ); + + Command::new(BIN.as_os_str()) + .arg("stop") + .assert() + .failure() + .stderr( + predicate::str::contains("Connection refused") + .or(predicate::str::contains("server was already shutdown")), + ); + let _ = stop_bin(); + + Ok(()) +} + +#[test] +#[serial] +fn test_server_serial() -> Result<()> { + let _ = stop_bin(); + + Command::new(BIN.as_os_str()) + .arg("start") + .arg("-db") + .arg("homestar.db") + .assert() + .failure(); + + let mut homestar_proc = Command::new(BIN.as_os_str()) + .arg("start") + .arg("--db") + .arg("homestar.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + let socket = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 3030); + let result = retry(Fixed::from_millis(500), || { + TcpStream::connect(socket).map(|stream| stream.shutdown(Shutdown::Both)) + }); + + if result.is_err() { + homestar_proc.kill().unwrap(); + panic!("Homestar server/runtime failed to start in time"); + } + + Command::new(BIN.as_os_str()) + .arg("ping") + .assert() + .success() + .stdout(predicate::str::contains("::1")) + .stdout(predicate::str::contains("pong")); + + Command::new(BIN.as_os_str()) + .arg("ping") + .arg("-p") + .arg("9999") + .assert() + .failure() + .stderr(predicate::str::contains("Connection refused")); + + let _ = Command::new(BIN.as_os_str()).arg("stop").output(); + + if let Ok(None) = homestar_proc.try_wait() { + let _status_code = match homestar_proc.wait_timeout(Duration::from_secs(1)).unwrap() { + Some(status) => status.code(), + None => { + homestar_proc.kill().unwrap(); + homestar_proc.wait().unwrap().code() + } + }; + } + let _ = stop_bin(); + + Ok(()) +} + +#[test] +#[serial] +fn test_daemon_serial() -> Result<()> { + let _ = stop_bin(); + + Command::new(BIN.as_os_str()) + .arg("start") + .arg("-d") + .env("DATABASE_URL", "homestar.db") + .stdout(Stdio::piped()) + .assert() + .success(); + + let system = sysinfo::System::new_all(); + let pid = system + .processes_by_exact_name("homestar-runtime") + .collect::>() + .first() + .map(|p| p.pid().as_u32()) + .unwrap_or( + fs::read_to_string("/tmp/homestar.pid") + .expect("Should have a PID file") + .trim() + .parse::() + .unwrap(), + ); + + let socket = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 3030); + let result = retry(Fixed::from_millis(500), || { + TcpStream::connect(socket).map(|stream| stream.shutdown(Shutdown::Both)) + }); + + if result.is_err() { + panic!("Homestar server/runtime failed to start in time"); + } + + Command::new(BIN.as_os_str()) + .arg("ping") + .assert() + .success() + .stdout(predicate::str::contains("::1")) + .stdout(predicate::str::contains("pong")); + + let _result = signal::kill(Pid::from_raw(pid.try_into().unwrap()), Signal::SIGTERM); + + Command::new(BIN.as_os_str()).arg("ping").assert().failure(); + let _ = stop_bin(); + + Ok(()) +} diff --git a/homestar-wasm/tests/execute_wasm.rs b/homestar-wasm/tests/execute_wasm.rs index d6ba1212..375c4f4c 100644 
--- a/homestar-wasm/tests/execute_wasm.rs +++ b/homestar-wasm/tests/execute_wasm.rs @@ -307,10 +307,13 @@ async fn test_execute_wasms_in_seq_with_threaded_result() { // Short-circuit resolve with known value. let resolved = parsed .resolve(|_| { - Ok(InstructionResult::Ok(Arg::Value( - wasmtime::component::Val::String("RoundRound".into()), - ))) + Box::pin(async { + Ok(InstructionResult::Ok(Arg::Value( + wasmtime::component::Val::String("RoundRound".into()), + ))) + }) }) + .await .unwrap(); let res2 = env2.execute(resolved).await.unwrap(); @@ -376,10 +379,13 @@ async fn test_execute_wasms_with_multiple_inits() { // Short-circuit resolve with known value. let resolved = parsed .resolve(|_| { - Ok(InstructionResult::Ok(Arg::Ipld(Ipld::String( - "RoundRound".into(), - )))) + Box::pin(async { + Ok(InstructionResult::Ok(Arg::Value( + wasmtime::component::Val::String("RoundRound".into()), + ))) + }) }) + .await .unwrap(); let res2 = env2.execute(resolved).await.unwrap();
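Finally, the `execute_wasm` test changes above follow the resolver's new asynchronous signature: the closure handed to `resolve` now returns a pinned, boxed future, and `resolve` itself is awaited. A small sketch of that closure shape using only std types; the helper names here are illustrative, not homestar-wasm's actual API:

    use std::{future::Future, pin::Pin};

    // Assumed future type for a resolver; homestar-wasm's real alias may differ.
    type ResolveFut = Pin<Box<dyn Future<Output = Result<String, String>> + Send>>;

    // Build a resolver closure in the Box::pin(async { .. }) style shown above.
    fn make_resolver() -> impl Fn(String) -> ResolveFut {
        |cid: String| -> ResolveFut {
            Box::pin(async move {
                // A hypothetical async lookup keyed by cid.
                Ok(format!("resolved:{cid}"))
            })
        }
    }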