From 770222fe7315a19cc672161ec1b685b831fe0d2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20CORTIER?= Date: Thu, 14 Nov 2024 12:52:35 +0900 Subject: [PATCH 1/4] feat(dgw): persistent job queue for crash resistance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This year we added some background tasks in the Gateway that should not be canceled, or if they are, should be restarted later. Essentially two tasks: mass deletion of recordings (relatively important, but it's always possible to launch indexing in DVLS in case of a problem) and remuxing recordings to webm format (good to have). If the service is killed in the middle of one of these operations, we should resume execution on the next startup. This persistent job queue is implemented using Turso’s libSQL. Using libSQL (or SQLite) for implementing the queue allows us to benefit from all the work put into implementing a reliable, secure and performant disk-based database instead of attempting to implement our own ad-hoc storage and debugging it forever. Inspiration was taken from 37signals' Solid Queue: - https://dev.37signals.com/introducing-solid-queue/ - https://github.com/rails/solid_queue/ And "How to build a job queue with Rust and PostgreSQL" from kerkour.com: - https://kerkour.com/rust-job-queue-with-postgresql The 'user_version' value, which is a SQLite PRAGMA, is used to keep track of the migration state. It's a very lightweight approach as it is just an integer at a fixed offset in the SQLite file. - https://sqlite.org/pragma.html#pragma_user_version - https://www.sqlite.org/fileformat.html#user_version_number Introducing Turso’s libSQL, as opposed to SQLite, will serve us for "Recording Farms" in the future. We’ll want instances of the same Recording Farm to coordinate. At this point, we’ll want to use Turso's libSQL network database feature. Indeed, putting the SQLite database file on a virtual filesystem is not recommended. 
This can lead to corruption and data loss. Turso will allow us to have a local mode for the simplest setups, and a network and distributed mode for Recording Farms when we get there. --- Cargo.lock | 659 +++++++++++++++++++++++++-- crates/job-queue-libsql/Cargo.toml | 20 + crates/job-queue-libsql/src/lib.rs | 319 +++++++++++++ crates/job-queue/Cargo.toml | 15 + crates/job-queue/src/lib.rs | 182 ++++++++ devolutions-gateway/Cargo.toml | 2 + devolutions-gateway/src/api/jrec.rs | 83 +++- devolutions-gateway/src/config.rs | 15 +- devolutions-gateway/src/job_queue.rs | 248 ++++++++++ devolutions-gateway/src/lib.rs | 6 + devolutions-gateway/src/main.rs | 13 +- devolutions-gateway/src/recording.rs | 46 +- devolutions-gateway/src/service.rs | 12 + devolutions-gateway/tests/config.rs | 5 + 14 files changed, 1556 insertions(+), 69 deletions(-) create mode 100644 crates/job-queue-libsql/Cargo.toml create mode 100644 crates/job-queue-libsql/src/lib.rs create mode 100644 crates/job-queue/Cargo.toml create mode 100644 crates/job-queue/src/lib.rs create mode 100644 devolutions-gateway/src/job_queue.rs diff --git a/Cargo.lock b/Cargo.lock index 7324017a5..c7179a00f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -61,6 +61,18 @@ dependencies = [ "aes", ] +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -76,12 +88,12 @@ version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b0e3b97a21e41ec5c19bfd9b4fc1f7086be104f8b988681230247ffc91cc8ed" dependencies = [ - "axum", + "axum 0.7.7", "axum-extra", "bytes 1.8.0", "cfg-if", "http 1.1.0", - "indexmap", + "indexmap 2.6.0", "schemars", "serde", "serde_json", @@ -92,6 +104,12 @@ dependencies = [ "tracing", ] +[[package]] +name 
= "allocator-api2" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45862d1c77f2228b9e10bc609d5bc203d86ebc9b87ad8d5d5167a6c9abf739d9" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -291,7 +309,7 @@ version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df7a4168111d7eb622a31b214057b8509c0a7e1794f44c546d742330dc793972" dependencies = [ - "bindgen", + "bindgen 0.69.5", "cc", "cmake", "dunce", @@ -300,6 +318,34 @@ dependencies = [ "paste", ] +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core 0.3.4", + "bitflags 1.3.2", + "bytes 1.8.0", + "futures-util", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.31", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite 0.2.15", + "rustversion", + "serde", + "sync_wrapper 0.1.2", + "tower 0.4.13", + "tower-layer", + "tower-service", +] + [[package]] name = "axum" version = "0.7.7" @@ -307,7 +353,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "504e3947307ac8326a5437504c517c4b56716c9d98fac0028c2acc7ca47d70ae" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.4.5", "base64 0.22.1", "bytes 1.8.0", "futures-util", @@ -337,6 +383,23 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes 1.8.0", + "futures-util", + "http 0.2.12", + "http-body 0.4.6", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "axum-core" version = "0.4.5" @@ -364,8 +427,8 @@ version = "0.9.4" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "73c3220b188aea709cf1b6c5f9b01c3bd936bb08bd2b5184a12b35ac8131b1f9" dependencies = [ - "axum", - "axum-core", + "axum 0.7.7", + "axum-core 0.4.5", "bytes 1.8.0", "futures-util", "headers", @@ -456,6 +519,38 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + +[[package]] +name = "bindgen" +version = "0.66.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b84e06fc203107bfbad243f4aba2af864eb7db3b1cf46ea0a023b0b433d2a7" +dependencies = [ + "bitflags 2.6.0", + "cexpr", + "clang-sys", + "lazy_static", + "lazycell", + "log", + "peeking_take_while", + "prettyplease", + "proc-macro2 1.0.88", + "quote 1.0.37", + "regex", + "rustc-hash 1.1.0", + "shlex", + "syn 2.0.87", + "which", +] + [[package]] name = "bindgen" version = "0.69.5" @@ -583,6 +678,9 @@ name = "bytes" version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" +dependencies = [ + "serde", +] [[package]] name = "cadeau" @@ -1012,7 +1110,7 @@ dependencies = [ "tap", "thiserror", "tokio 1.41.1", - "tokio-rustls", + "tokio-rustls 0.26.0", "tracing", "uuid", "win-api-wrappers", @@ -1038,7 +1136,7 @@ dependencies = [ "anyhow", "argon2", "async-trait", - "axum", + "axum 0.7.7", "axum-extra", "backoff", "bytes 1.8.0", @@ -1064,6 +1162,8 @@ dependencies = [ "ironrdp-pdu 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", "ironrdp-rdcleanpath", "jmux-proxy", + "job-queue", + "job-queue-libsql", "multibase", "network-scanner", "ngrok", @@ 
-1087,11 +1187,11 @@ dependencies = [ "thiserror", "time", "tokio 1.41.1", - "tokio-rustls", + "tokio-rustls 0.26.0", "tokio-test", "tokio-tungstenite", "tower 0.5.1", - "tower-http", + "tower-http 0.5.2", "tracing", "tracing-cov-mark", "transport", @@ -1142,7 +1242,7 @@ dependencies = [ "aide", "anyhow", "async-trait", - "axum", + "axum 0.7.7", "base16ct", "base64 0.22.1", "camino", @@ -1161,7 +1261,7 @@ dependencies = [ "sha1", "sha2", "tokio 1.41.1", - "tower-http", + "tower-http 0.5.2", "tower-service", "tracing", "walkdir", @@ -1459,6 +1559,24 @@ dependencies = [ "arrayvec", ] +[[package]] +name = "fallible-iterator" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" + +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fastrand" version = "2.1.1" @@ -1706,8 +1824,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -1755,7 +1875,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap", + "indexmap 2.6.0", "slab", "tokio 1.41.1", "tokio-util", @@ -1774,19 +1894,44 @@ dependencies = [ "futures-core", "futures-sink", "http 1.1.0", - "indexmap", + "indexmap 2.6.0", "slab", "tokio 1.41.1", "tokio-util", "tracing", ] +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", + "allocator-api2", +] + [[package]] name = "hashbrown" version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" +[[package]] +name = "hashlink" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" +dependencies = [ + "hashbrown 0.14.5", +] + [[package]] name = "headers" version = "0.4.0" @@ -1940,6 +2085,12 @@ dependencies = [ "pin-project-lite 0.2.15", ] +[[package]] +name = "http-range-header" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f" + [[package]] name = "http-range-header" version = "0.4.1" @@ -2009,6 +2160,24 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "399c78f9338483cb7e630c8474b07268983c6bd5acee012e4211f9f7bb21b070" +dependencies = [ + "futures-util", + "http 0.2.12", + "hyper 0.14.31", + "log", + "rustls 0.22.4", + "rustls-native-certs 0.7.3", + "rustls-pki-types", + "tokio 1.41.1", + "tokio-rustls 0.25.0", + "webpki-roots", +] + [[package]] name = "hyper-rustls" version = "0.27.3" @@ -2020,13 +2189,25 @@ dependencies = [ "hyper 1.5.0", "hyper-util", "rustls 0.23.15", - "rustls-native-certs", + "rustls-native-certs 0.8.0", "rustls-pki-types", "tokio 1.41.1", - "tokio-rustls", + "tokio-rustls 0.26.0", "tower-service", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper 0.14.31", + "pin-project-lite 0.2.15", + "tokio 1.41.1", + "tokio-io-timeout", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -2231,6 +2412,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.6.0" @@ -2238,7 +2429,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.0", "serde", ] @@ -2581,7 +2772,7 @@ dependencies = [ "ironrdp-svc 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", "ironrdp-tokio", "tokio 1.41.1", - "tokio-rustls", + "tokio-rustls 0.26.0", "tracing", ] @@ -2678,7 +2869,7 @@ dependencies = [ "proxy-types", "proxy_cfg", "rustls 0.23.15", - "rustls-native-certs", + "rustls-native-certs 0.8.0", "rustls-pemfile 2.2.0", "seahorse", "sysinfo", @@ -2725,6 +2916,31 @@ dependencies = [ "tracing", ] +[[package]] +name = "job-queue" +version = "0.0.0" +dependencies = [ + "anyhow", + "async-trait", + "tracing", + "uuid", +] + +[[package]] +name = "job-queue-libsql" +version = "0.0.0" +dependencies = [ + "anyhow", + "async-trait", + "job-queue", + "libsql", + "serde", + "tracing", + "typed-builder", + "ulid", + "uuid", +] + [[package]] name = "jobserver" version = "0.1.32" @@ -2820,6 +3036,138 @@ dependencies = [ "redox_syscall", ] +[[package]] +name = "libsql" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"fe18646e4ef8db446bc3e3f5fb96131483203bc5f4998ff149f79a067530c01c" +dependencies = [ + "anyhow", + "async-stream", + "async-trait", + "base64 0.21.7", + "bincode", + "bitflags 2.6.0", + "bytes 1.8.0", + "fallible-iterator 0.3.0", + "futures", + "http 0.2.12", + "hyper 0.14.31", + "hyper-rustls 0.25.0", + "libsql-hrana", + "libsql-sqlite3-parser", + "libsql-sys", + "libsql_replication", + "parking_lot", + "serde", + "serde_json", + "thiserror", + "tokio 1.41.1", + "tokio-stream", + "tokio-util", + "tonic", + "tonic-web", + "tower 0.4.13", + "tower-http 0.4.4", + "tracing", + "uuid", + "zerocopy", +] + +[[package]] +name = "libsql-ffi" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f2a50a585a1184a43621a9133b7702ba5cb7a87ca5e704056b19d8005de6faf" +dependencies = [ + "bindgen 0.66.1", + "cc", +] + +[[package]] +name = "libsql-hrana" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aeaf5d19e365465e1c23d687a28c805d7462531b3f619f0ba49d3cf369890a3e" +dependencies = [ + "base64 0.21.7", + "bytes 1.8.0", + "prost", + "serde", +] + +[[package]] +name = "libsql-rusqlite" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae65c66088dcd309abbd5617ae046abac2a2ee0a7fdada5127353bd68e0a27ea" +dependencies = [ + "bitflags 2.6.0", + "fallible-iterator 0.2.0", + "fallible-streaming-iterator", + "hashlink", + "libsql-ffi", + "smallvec", +] + +[[package]] +name = "libsql-sqlite3-parser" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15a90128c708356af8f7d767c9ac2946692c9112b4f74f07b99a01a60680e413" +dependencies = [ + "bitflags 2.6.0", + "cc", + "fallible-iterator 0.3.0", + "indexmap 2.6.0", + "log", + "memchr", + "phf", + "phf_codegen", + "phf_shared", + "uncased", +] + +[[package]] +name = "libsql-sys" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "5c05b61c226781d6f5e26e3e7364617f19c0c1d5332035802e9229d6024cec05" +dependencies = [ + "bytes 1.8.0", + "libsql-ffi", + "libsql-rusqlite", + "once_cell", + "tracing", + "zerocopy", +] + +[[package]] +name = "libsql_replication" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cf40c4c2c01462da758272976de0a23d19b4e9c714db08efecf262d896655b5" +dependencies = [ + "aes", + "async-stream", + "async-trait", + "bytes 1.8.0", + "cbc", + "libsql-rusqlite", + "libsql-sys", + "parking_lot", + "prost", + "serde", + "thiserror", + "tokio 1.41.1", + "tokio-stream", + "tokio-util", + "tonic", + "tracing", + "uuid", + "zerocopy", +] + [[package]] name = "libudis86-sys" version = "0.2.1" @@ -3585,6 +3933,12 @@ dependencies = [ "thiserror", ] +[[package]] +name = "peeking_take_while" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" + [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -3600,6 +3954,45 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "phf" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_codegen" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8d39688d359e6b34654d328e262234662d16cc0f60ec8dcbe5e718709342a5a" +dependencies = [ + "phf_generator", + "phf_shared", +] + +[[package]] +name = "phf_generator" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48e4cc64c2ad9ebe670cb8fd69dd50ae301650392e81c05f9bfcb2d5bdbc24b0" +dependencies = [ + "phf_shared", + "rand", +] + +[[package]] +name = 
"phf_shared" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b" +dependencies = [ + "siphasher", + "uncased", +] + [[package]] name = "picky" version = "7.0.0-rc.9" @@ -4009,6 +4402,29 @@ dependencies = [ "unarray", ] +[[package]] +name = "prost" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" +dependencies = [ + "bytes 1.8.0", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2 1.0.88", + "quote 1.0.37", + "syn 2.0.87", +] + [[package]] name = "proxy-generators" version = "0.0.0" @@ -4318,7 +4734,7 @@ dependencies = [ "http-body 1.0.1", "http-body-util", "hyper 1.5.0", - "hyper-rustls", + "hyper-rustls 0.27.3", "hyper-util", "ipnet", "js-sys", @@ -4329,7 +4745,7 @@ dependencies = [ "pin-project-lite 0.2.15", "quinn", "rustls 0.23.15", - "rustls-native-certs", + "rustls-native-certs 0.8.0", "rustls-pemfile 2.2.0", "rustls-pki-types", "serde", @@ -4337,7 +4753,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper 1.0.1", "tokio 1.41.1", - "tokio-rustls", + "tokio-rustls 0.26.0", "tokio-util", "tower-service", "url", @@ -4534,6 +4950,20 @@ dependencies = [ "webpki", ] +[[package]] +name = "rustls" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" +dependencies = [ + "log", + "ring 0.17.8", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + [[package]] name = "rustls" version = "0.23.15" @@ -4561,6 +4991,19 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = 
"rustls-native-certs" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.2.0", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-native-certs" version = "0.8.0" @@ -4665,7 +5108,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09c024468a378b7e36765cd36702b7a90cc3cba11654f6685c8f233408e89e92" dependencies = [ "dyn-clone", - "indexmap", + "indexmap 2.6.0", "schemars_derive", "serde", "serde_json", @@ -4801,7 +5244,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8de514ef58196f1fc96dcaef80fe6170a1ce6215df9687a93fe8300e773fefc5" dependencies = [ "form_urlencoded", - "indexmap", + "indexmap 2.6.0", "itoa", "ryu", "serde", @@ -4835,7 +5278,7 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd34f36fe4c5ba9654417139a9b3a20d2e1de6012ee678ad14d240c22c78d8d6" dependencies = [ - "axum", + "axum 0.7.7", "futures", "percent-encoding", "serde", @@ -4869,7 +5312,7 @@ version = "0.9.34+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ - "indexmap", + "indexmap 2.6.0", "itoa", "ryu", "serde", @@ -4942,6 +5385,12 @@ dependencies = [ "rand_core", ] +[[package]] +name = "siphasher" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" + [[package]] name = "slab" version = "0.4.9" @@ -5386,6 +5835,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite 0.2.15", + "tokio 1.41.1", +] + [[package]] name = "tokio-macros" version = "2.4.0" @@ -5418,6 +5877,17 @@ dependencies = [ "tokio 1.41.1", ] +[[package]] +name = "tokio-rustls" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +dependencies = [ + "rustls 0.22.4", + "rustls-pki-types", + "tokio 1.41.1", +] + [[package]] name = "tokio-rustls" version = "0.26.0" @@ -5463,11 +5933,11 @@ dependencies = [ "log", "native-tls", "rustls 0.23.15", - "rustls-native-certs", + "rustls-native-certs 0.8.0", "rustls-pki-types", "tokio 1.41.1", "tokio-native-tls", - "tokio-rustls", + "tokio-rustls 0.26.0", "tungstenite", ] @@ -5512,13 +5982,60 @@ version = "0.22.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ae48d6208a266e853d946088ed816055e556cc6028c5e8e2b84d9fa5dd7c7f5" dependencies = [ - "indexmap", + "indexmap 2.6.0", "serde", "serde_spanned", "toml_datetime", "winnow", ] +[[package]] +name = "tonic" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" +dependencies = [ + "async-stream", + "async-trait", + "axum 0.6.20", + "base64 0.21.7", + "bytes 1.8.0", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.31", + "hyper-timeout", + "percent-encoding", + "pin-project 1.1.7", + "prost", + "tokio 1.41.1", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-web" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc3b0e1cedbf19fdfb78ef3d672cb9928e0a91a9cb4629cc0c916e8cff8aaaa1" +dependencies = [ + "base64 0.21.7", + "bytes 1.8.0", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.31", + 
"pin-project 1.1.7", + "tokio-stream", + "tonic", + "tower-http 0.4.4", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.3.1" @@ -5537,6 +6054,26 @@ dependencies = [ "tower-util", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project 1.1.7", + "pin-project-lite 0.2.15", + "rand", + "slab", + "tokio 1.41.1", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.5.1" @@ -5578,6 +6115,26 @@ dependencies = [ "tower-service", ] +[[package]] +name = "tower-http" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" +dependencies = [ + "bitflags 2.6.0", + "bytes 1.8.0", + "futures-core", + "futures-util", + "http 0.2.12", + "http-body 0.4.6", + "http-range-header 0.3.1", + "pin-project-lite 0.2.15", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower-http" version = "0.5.2" @@ -5590,7 +6147,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "http-body-util", - "http-range-header", + "http-range-header 0.4.1", "httpdate", "mime", "mime_guess", @@ -5841,12 +6398,33 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "ulid" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04f903f293d11f31c0c29e4148f6dc0d033a7f80cebc0282bea147611667d289" +dependencies = [ + "getrandom", + "rand", + "uuid", + "web-time", +] + [[package]] name = "unarray" version = "0.1.4" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94" +[[package]] +name = "uncased" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1b88fcfe09e89d3866a5c11019378088af2d24c3fbd4f0543f96b479ec90697" +dependencies = [ + "version_check", +] + [[package]] name = "unicase" version = "2.8.0" @@ -5929,7 +6507,7 @@ version = "4.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c5afb1a60e207dca502682537fefcfd9921e71d0b83e9576060f09abc6efab23" dependencies = [ - "indexmap", + "indexmap 2.6.0", "serde", "serde_json", "serde_yaml", @@ -6121,6 +6699,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki" version = "0.22.4" @@ -6131,6 +6719,15 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "webpki-roots" +version = "0.26.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "841c67bff177718f1d4dfefde8d8f0e78f9b6589319ba88312f567fc5841a958" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "which" version = "4.4.2" diff --git a/crates/job-queue-libsql/Cargo.toml b/crates/job-queue-libsql/Cargo.toml new file mode 100644 index 000000000..05d9dc527 --- /dev/null +++ b/crates/job-queue-libsql/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "job-queue-libsql" +version = "0.0.0" +edition = "2021" +authors = ["Devolutions Inc. 
"] +publish = false + +[lints] +workspace = true + +[dependencies] +job-queue = { path = "../job-queue" } +typed-builder = "0.19" +serde = "1" +ulid = { version = "1.1", features = ["uuid"] } +uuid = "1.11" +anyhow = "1" +async-trait = "0.1" +tracing = "0.1" +libsql = "0.6" diff --git a/crates/job-queue-libsql/src/lib.rs b/crates/job-queue-libsql/src/lib.rs new file mode 100644 index 000000000..65bad6d6b --- /dev/null +++ b/crates/job-queue-libsql/src/lib.rs @@ -0,0 +1,319 @@ +#[macro_use] +extern crate tracing; + +use anyhow::Context as _; +use async_trait::async_trait; +use job_queue::{DynJob, JobCtx, JobQueue, JobReader, RunnerWaker}; +use libsql::Connection; +use ulid::Ulid; +use uuid::Uuid; + +pub use libsql; + +/// Implementation of [`JobQueue`] using libSQL as the backend +/// +/// This is inspired by 37signals' Solid Queue: +/// - +/// - +/// +/// And "How to build a job queue with Rust and PostgreSQL" on kerkour.com: +/// - +/// +/// We use the 'user_version' value to store the migration state. +/// It's a very lightweight approach as it is just an integer at a fixed offset in the SQLite file. +/// - +/// - +#[derive(typed_builder::TypedBuilder)] +pub struct LibSqlJobQueue { + instance_id: Uuid, + runner_waker: RunnerWaker, + conn: Connection, + #[builder(default = 5)] + max_attempts: u32, +} + +#[derive(Debug, Clone, PartialEq)] +#[repr(u32)] +enum JobStatus { + Queued, + Running, +} + +impl LibSqlJobQueue { + async fn query_user_version(&self) -> anyhow::Result { + let sql_query = "PRAGMA user_version"; + + trace!(%sql_query, "Query user_version"); + + let row = self + .conn + .query(sql_query, ()) + .await + .context("failed to execute SQL query")? + .next() + .await + .context("failed to read the row")? 
+ .context("no row returned")?; + + let value = row.get::(0).context("failed to read user_version value")?; + + Ok(usize::try_from(value).expect("number not too big")) + } + + async fn update_user_version(&self, value: usize) -> anyhow::Result<()> { + let value = u64::try_from(value).expect("number not too big"); + + let sql_query = format!("PRAGMA user_version = {value}"); + + trace!(%sql_query, "Update user_version"); + + self.conn + .execute(&sql_query, ()) + .await + .context("failed to execute SQL query")?; + + Ok(()) + } +} + +#[async_trait] +impl JobQueue for LibSqlJobQueue { + async fn migrate(&self) -> anyhow::Result<()> { + const MIGRATIONS: &[&str] = &["CREATE TABLE job_queue ( + id UUID NOT NULL PRIMARY KEY, + instance_id UUID NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + failed_attempts INT NOT NULL, + status INT NOT NULL, + name TEXT NOT NULL, + def JSONB NOT NULL + ); + + CREATE TRIGGER update_job_updated_at_on_update AFTER UPDATE ON job_queue + BEGIN + UPDATE job_queue SET updated_at = CURRENT_TIMESTAMP WHERE rowid == NEW.rowid; + END;"]; + + let user_version = self.query_user_version().await?; + + match MIGRATIONS.get(user_version..) 
{ + Some(remaining) if !remaining.is_empty() => { + info!( + user_version, + migration_count = MIGRATIONS.len() - user_version, + "Start migration" + ); + + for (sql_query, migration_id) in remaining.iter().zip(user_version..MIGRATIONS.len()) { + trace!(migration_id, %sql_query, "Apply migration"); + + self.conn + .execute(sql_query, ()) + .await + .with_context(|| format!("failed to execute migration {}", migration_id))?; + + trace!(migration_id, "Applied migration"); + + self.update_user_version(migration_id + 1) + .await + .context("failed to update user version")?; + } + + info!("Migration complete"); + } + None => { + warn!(user_version, "user_version is set to an unexpected value"); + } + _ => { + debug!(user_version, "Database is already up to date"); + } + } + + Ok(()) + } + + async fn reset_claimed_jobs(&self) -> anyhow::Result<()> { + let sql_query = "UPDATE job_queue SET status = :queued_status WHERE status = :running_status"; + + let params = ( + (":running_status", JobStatus::Running as u32), + (":queued_status", JobStatus::Queued as u32), + ); + + trace!(%sql_query, ?params, "Reset claimed jobs"); + + let changed_count = self + .conn + .execute(sql_query, params) + .await + .context("failed to execute SQL query")?; + + trace!(changed_count, "Jobs reset with success"); + + Ok(()) + } + + async fn push_job(&self, job: &DynJob) -> anyhow::Result<()> { + let sql_query = "INSERT INTO job_queue + (id, instance_id, failed_attempts, status, name, def) + VALUES (:id, :instance_id, :failed_attempts, :status, :name, jsonb(:def))"; + + // UUID v4 provides no other information than randomness which cause fragmentation. + // Reduce index fragmentation by using ULID instead. 
+ // https://github.com/ulid/spec + let id = Uuid::from(Ulid::new()); + + let params = ( + (":id", id.to_string()), + (":instance_id", self.instance_id.to_string()), + (":failed_attempts", 0), + (":status", JobStatus::Queued as u32), + (":name", job.name()), + (":def", job.write_json()?), + ); + + trace!(%sql_query, ?params, "Pushing a new job"); + + self.conn + .execute(sql_query, params) + .await + .context("failed to execute SQL query")?; + + // Notify the waker that a new job is ready for processing. + self.runner_waker.wake(); + + Ok(()) + } + + async fn claim_jobs(&self, reader: &dyn JobReader, number_of_jobs: usize) -> anyhow::Result> { + let number_of_jobs = u32::try_from(number_of_jobs).context("number_of_jobs is too big")?; + + // If we were using Postgres, we would need to use `FOR UPDATE SKIP LOCKED` + // in the SQL query to avoid blocking other readers/writers. + // For MySQL, this would be `FOR UPDATE NOWAIT` + // However, in SQLite / libSQL, there is only a single writer at a time. + // As such, this directive doesn't exist. 
+ + let sql_query = "UPDATE job_queue + SET status = :new_status + WHERE id IN ( + SELECT id + FROM job_queue + WHERE instance_id = :instance_id AND status = :current_status AND failed_attempts < :max_attempts + ORDER BY id + LIMIT :number_of_jobs + ) + RETURNING id, name, json(def) as def"; + + let params = ( + (":new_status", JobStatus::Running as u32), + (":instance_id", self.instance_id.to_string()), + (":current_status", JobStatus::Queued as u32), + (":max_attempts", self.max_attempts), + (":number_of_jobs", number_of_jobs), + ); + + trace!(%sql_query, ?params, "Claiming jobs"); + + let mut rows = self + .conn + .query(sql_query, params) + .await + .context("failed to execute SQL query")?; + + let mut jobs = Vec::new(); + + loop { + let row = rows.next().await; + + let row = match row { + Ok(row) => row, + Err(error) => { + error!(%error, "Failed to get next row"); + break; + } + }; + + let Some(row) = row else { + break; + }; + + match libsql::de::from_row::<'_, JobModel>(&row) { + Ok(model) => match reader.read_json(&model.name, &model.def) { + Ok(job) => jobs.push(JobCtx { id: model.id, job }), + Err(e) => { + error!( + error = format!("{e:#}"), + "Failed read job definition; delete the invalid job" + ); + let _ = self.delete_job(model.id).await; + } + }, + Err(error) => { + error!(%error, ?row, "Failed to read row"); + } + } + } + + return Ok(jobs); + + #[derive(serde::Deserialize, Debug, Clone)] + struct JobModel { + id: Uuid, + name: String, + def: String, + } + } + + async fn delete_job(&self, id: Uuid) -> anyhow::Result<()> { + let sql_query = "DELETE FROM job_queue WHERE id = $1"; + let params = [id.to_string()]; + + trace!(%sql_query, ?params, "Deleting job"); + + self.conn + .execute(sql_query, params) + .await + .context("failed to execute SQL query")?; + + Ok(()) + } + + async fn fail_job(&self, id: Uuid) -> anyhow::Result<()> { + let sql_query = "UPDATE job_queue + SET status = :new_status, failed_attempts = failed_attempts + 1 + WHERE id = :id"; 
+ let params = ((":new_status", JobStatus::Queued as u32), (":id", id.to_string())); + + trace!(%sql_query, ?params, "Marking job as failed"); + + self.conn + .execute(sql_query, params) + .await + .context("failed to execute SQL query")?; + + Ok(()) + } + + async fn clear_failed(&self) -> anyhow::Result<()> { + let sql_query = "DELETE FROM job_queue WHERE instance_id = :instance_id AND failed_attempts >= :max_attempts"; + + let params = ( + (":instance_id", self.instance_id.to_string()), + (":max_attempts", self.max_attempts), + ); + + trace!(%sql_query, ?params, "Clearing failed jobs"); + + let deleted_count = self + .conn + .execute(sql_query, params) + .await + .context("failed to execute SQL query")?; + + trace!(deleted_count, "Cleared failed jobs with success"); + + Ok(()) + } +} diff --git a/crates/job-queue/Cargo.toml b/crates/job-queue/Cargo.toml new file mode 100644 index 000000000..ca46a673a --- /dev/null +++ b/crates/job-queue/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "job-queue" +version = "0.0.0" +edition = "2021" +authors = ["Devolutions Inc. "] +publish = false + +[lints] +workspace = true + +[dependencies] +uuid = "1.11" +anyhow = "1" +async-trait = "0.1" +tracing = "0.1" diff --git a/crates/job-queue/src/lib.rs b/crates/job-queue/src/lib.rs new file mode 100644 index 000000000..d419d1219 --- /dev/null +++ b/crates/job-queue/src/lib.rs @@ -0,0 +1,182 @@ +#[macro_use] +extern crate tracing; + +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; + +use async_trait::async_trait; +use uuid::Uuid; + +pub type DynJob = Box; + +pub type DynJobQueue = Arc; + +#[async_trait] +pub trait Job: Send + Sync { + fn name(&self) -> &str; + + fn write_json(&self) -> anyhow::Result; + + /// Run the associated job + /// + /// You should assume that the execution could be stopped at any point and write cancel-safe code. 
+    async fn run(&mut self) -> anyhow::Result<()>; +} + +pub trait JobReader: Send + Sync { + fn read_json(&self, name: &str, json: &str) -> anyhow::Result; +} + +#[async_trait] +pub trait JobQueue: Send + Sync { + /// Performs migrations as required + /// + /// This function should be called first, before using any of the other functions. + async fn migrate(&self) -> anyhow::Result<()>; + + /// Resets the status for the jobs claimed + /// + /// Use this at startup to re-enqueue jobs that didn't run to completion. + async fn reset_claimed_jobs(&self) -> anyhow::Result<()>; + + /// Pushes a new job into the queue + /// + /// This function should ideally call `RunnerWaker::wake()` once the job is enqueued. + async fn push_job(&self, job: &DynJob) -> anyhow::Result<()>; + + /// Fetches at most `number_of_jobs` from the queue + async fn claim_jobs(&self, reader: &dyn JobReader, number_of_jobs: usize) -> anyhow::Result>; + + /// Removes a job from the queue + async fn delete_job(&self, job_id: Uuid) -> anyhow::Result<()>; + + /// Marks a job as failed + /// + /// Failed jobs are re-queued to be tried again later. 
+ async fn fail_job(&self, job_id: Uuid) -> anyhow::Result<()>; + + /// Removes jobs which can't be retried + async fn clear_failed(&self) -> anyhow::Result<()>; +} + +pub struct JobCtx { + pub id: Uuid, + pub job: DynJob, +} + +#[derive(Clone)] +pub struct RunnerWaker(Arc); + +impl RunnerWaker { + pub fn new(f: F) -> Self { + Self(Arc::new(f)) + } + + pub fn wake(&self) { + (self.0)() + } +} + +pub type SpawnCallback = Box) -> Pin + Send>> + Send>; + +pub type DynFuture = Pin + Send>>; + +pub struct JobRunner<'a> { + pub queue: DynJobQueue, + pub reader: &'a dyn JobReader, + pub spawn: &'a (dyn Fn(JobCtx, SpawnCallback) + Sync), + pub sleep: &'a (dyn Fn(std::time::Duration) -> DynFuture + Sync), + pub wait_notified: &'a (dyn Fn() -> DynFuture + Sync), + pub waker: RunnerWaker, + pub max_batch_size: usize, +} + +impl JobRunner<'_> { + pub async fn run(self) { + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::time::{Duration, Instant}; + + const MINIMUM_WAIT_DURATION: Duration = Duration::from_millis(200); + + let Self { + queue, + reader, + spawn, + sleep, + waker, + wait_notified, + max_batch_size, + } = self; + + let running_count: Arc = Arc::new(AtomicUsize::new(0)); + + loop { + let batch_size = max_batch_size - running_count.load(Ordering::SeqCst); + + let jobs = match queue.claim_jobs(reader, batch_size).await { + Ok(jobs) => jobs, + Err(e) => { + error!(error = format!("{e:#}"), "Failed to pull jobs"); + (sleep)(Duration::from_secs(10)).await; + continue; + } + }; + + let number_of_jobs = jobs.len(); + if number_of_jobs > 0 { + trace!(number_of_jobs, "Fetched jobs"); + } + + for job in jobs { + let job_id = job.id; + + let callback = Box::new({ + let queue = Arc::clone(&queue); + let running_count = Arc::clone(&running_count); + let waker = waker.clone(); + + move |result: anyhow::Result<()>| { + let fut = async move { + match result { + Ok(()) => { + if let Err(e) = queue.delete_job(job_id).await { + error!(error = format!("{e:#}"), "Failed to 
delete job"); + } + } + Err(e) => { + warn!(error = format!("{e:#}"), %job_id, "Job failed"); + + if let Err(e) = queue.fail_job(job_id).await { + error!(error = format!("{e:#}"), "Failed to mark job as failed") + } + } + } + + running_count.fetch_sub(1, Ordering::SeqCst); + waker.wake(); + }; + + (Box::new(fut) as Box + Send>).into() + } + }); + + (spawn)(job, callback); + + running_count.fetch_add(1, Ordering::SeqCst); + } + + // Wait for something to happen. + // This could be a notification that a new job has been pushed, or that a running job is terminated. + let before_wait = Instant::now(); + (wait_notified)().await; + let elapsed = before_wait.elapsed(); + + // Make sure we wait a little bit to avoid overloading the database. + if elapsed < MINIMUM_WAIT_DURATION { + let sleep_duration = MINIMUM_WAIT_DURATION - elapsed; + (sleep)(sleep_duration).await; + } + } + } +} diff --git a/devolutions-gateway/Cargo.toml b/devolutions-gateway/Cargo.toml index 3a414bd0c..167fb365a 100644 --- a/devolutions-gateway/Cargo.toml +++ b/devolutions-gateway/Cargo.toml @@ -24,6 +24,8 @@ jmux-proxy = { path = "../crates/jmux-proxy" } devolutions-agent-shared = { path = "../crates/devolutions-agent-shared" } devolutions-gateway-task = { path = "../crates/devolutions-gateway-task" } devolutions-log = { path = "../crates/devolutions-log" } +job-queue = { path = "../crates/job-queue" } +job-queue-libsql = { path = "../crates/job-queue-libsql" } ironrdp-pdu = { version = "0.1", git = "https://github.com/Devolutions/IronRDP", rev = "7c268d863048d0a9182b3f7bf778668de8db4ccf", features = ["std"] } ironrdp-core = { version = "0.1", git = "https://github.com/Devolutions/IronRDP", rev = "7c268d863048d0a9182b3f7bf778668de8db4ccf", features = ["std"] } ironrdp-rdcleanpath = { version = "0.1", git = "https://github.com/Devolutions/IronRDP", rev = "7c268d863048d0a9182b3f7bf778668de8db4ccf" } diff --git a/devolutions-gateway/src/api/jrec.rs b/devolutions-gateway/src/api/jrec.rs index 
c7c6d2881..6753ffb79 100644 --- a/devolutions-gateway/src/api/jrec.rs +++ b/devolutions-gateway/src/api/jrec.rs @@ -158,8 +158,8 @@ pub(crate) struct DeleteManyResult { /// Mass-deletes recordings stored on this instance /// -/// If you try to delete more than 1,000,000 recordings at once, you should split the list into multiple requests -/// to avoid timing out during the processing of the request. +/// If you try to delete more than 50,000 recordings at once, you should split the list into multiple requests. +/// Bigger payloads will be rejected with 413 Payload Too Large. /// /// The request processing consist in /// 1) checking if one of the recording is active, @@ -182,6 +182,7 @@ pub(crate) struct DeleteManyResult { (status = 401, description = "Invalid or missing authorization token"), (status = 403, description = "Insufficient permissions"), (status = 406, description = "A recording is still ongoing and can't be deleted yet (nothing is deleted)"), + (status = 413, description = "Request payload is too large"), ), security(("scope_token" = ["gateway.recording.delete"])), ))] @@ -189,6 +190,7 @@ async fn jrec_delete_many( State(DgwState { conf_handle, recordings, + job_queue_handle, .. }): State, _scope: RecordingDeleteScope, @@ -196,38 +198,36 @@ async fn jrec_delete_many( ) -> Result, HttpError> { use std::collections::HashSet; - const BLOCKING_THRESHOLD: usize = 100_000; + const THRESHOLD: usize = 50_000; + const CHUNK_SIZE: usize = 1_000; + + if delete_list.len() > THRESHOLD { + return Err(HttpErrorBuilder::new(StatusCode::PAYLOAD_TOO_LARGE).msg("delete list is too big")); + } let recording_path = conf_handle.get_conf().recording_path.clone(); let active_recordings = recordings.active_recordings.cloned(); - // When deleting many many recordings, check_preconditions may take more than 250ms to execute. - // For this reason, we defensively spawn a blocking task. 
+    // Given the threshold of 50,000, it's highly unlikely that check_preconditions takes more than 250ms to execute. + // It typically takes between 50ms and 100ms depending on the hardware. let ProcessResult { not_found_count, found_count, recording_paths, - } = if delete_list.len() > BLOCKING_THRESHOLD { - let join_handle = - tokio::task::spawn_blocking(move || process_request(delete_list, &recording_path, &active_recordings)); - join_handle.await.map_err(HttpError::internal().err())?? - } else { - process_request(delete_list, &recording_path, &active_recordings)? - }; + } = process_request(delete_list, &recording_path, &active_recordings)?; - // FIXME: It would be better to have a job queue for this kind of things in case the service is killed. - tokio::spawn({ - async move { - for (session_id, path) in recording_paths { - if let Err(error) = delete_recording(&path).await { - error!( - error = format!("{error:#}"), - "Failed to delete recording for session {session_id}" - ); - } - } - } - }); + for chunk in recording_paths.chunks(CHUNK_SIZE) { + job_queue_handle + .enqueue(DeleteRecordingsJob { + recording_paths: chunk.to_vec(), + }) + .await + .map_err( + HttpError::internal() + .with_msg("couldn't enqueue the deletion task") + .err(), + )?; + } let delete_many_result = DeleteManyResult { found_count, @@ -283,6 +283,39 @@ async fn jrec_delete_many( } } +#[derive(Deserialize, Serialize)] +pub struct DeleteRecordingsJob { + recording_paths: Vec<(Uuid, Utf8PathBuf)>, +} + +impl DeleteRecordingsJob { + pub const NAME: &'static str = "delete-recordings"; +} + +#[axum::async_trait] +impl job_queue::Job for DeleteRecordingsJob { + fn name(&self) -> &str { + Self::NAME + } + + fn write_json(&self) -> anyhow::Result { + serde_json::to_string(self).context("failed to serialize RemuxAction") + } + + async fn run(&mut self) -> anyhow::Result<()> { + for (session_id, path) in core::mem::take(&mut self.recording_paths) { + if let Err(error) = delete_recording(&path).await { + 
debug!( + error = format!("{error:#}"), + "Failed to delete recording for session {session_id}" + ); + } + } + + Ok(()) + } +} + async fn delete_recording(recording_path: &Utf8Path) -> anyhow::Result<()> { info!(%recording_path, "Delete recording"); diff --git a/devolutions-gateway/src/config.rs b/devolutions-gateway/src/config.rs index 7563a05e3..466773cc5 100644 --- a/devolutions-gateway/src/config.rs +++ b/devolutions-gateway/src/config.rs @@ -72,6 +72,7 @@ pub struct Conf { pub listeners: Vec, pub subscriber: Option, pub log_file: Utf8PathBuf, + pub job_queue_database: Utf8PathBuf, pub tls: Option, pub provisioner_public_key: PublicKey, pub provisioner_private_key: Option, @@ -221,6 +222,12 @@ impl Conf { .unwrap_or_else(|| Utf8PathBuf::from("gateway")) .pipe_ref(|path| normalize_data_path(path, &data_dir)); + let job_queue_database = conf_file + .job_queue_database + .clone() + .unwrap_or_else(|| Utf8PathBuf::from("job_queue.db")) + .pipe_ref(|path| normalize_data_path(path, &data_dir)); + let jrl_file = conf_file .jrl_file .clone() @@ -277,6 +284,7 @@ impl Conf { listeners, subscriber: conf_file.subscriber.clone(), log_file, + job_queue_database, tls, provisioner_public_key, provisioner_private_key, @@ -1000,6 +1008,10 @@ pub mod dto { #[serde(skip_serializing_if = "Option::is_none")] pub sogar: Option, + /// (Unstable) Path to the SQLite database file for the job queue + #[serde(skip_serializing_if = "Option::is_none")] + pub job_queue_database: Option, + /// (Unstable) Unsafe debug options for developers #[serde(rename = "__debug__", skip_serializing_if = "Option::is_none")] pub debug: Option, @@ -1049,6 +1061,7 @@ pub mod dto { recording_path: None, web_app: None, sogar: None, + job_queue_database: None, debug: None, rest: serde_json::Map::new(), } @@ -1076,7 +1089,7 @@ pub mod dto { match self { VerbosityProfile::Default => "info", VerbosityProfile::Debug => { - 
"info,devolutions_gateway=debug,devolutions_gateway::api=trace,jmux_proxy=debug,tower_http=trace" + "info,devolutions_gateway=debug,devolutions_gateway::api=trace,jmux_proxy=debug,tower_http=trace,job_queue=trace,job_queue_libsql=trace" } VerbosityProfile::Tls => { "info,devolutions_gateway=debug,devolutions_gateway::tls=trace,rustls=trace,tokio_rustls=debug" diff --git a/devolutions-gateway/src/job_queue.rs b/devolutions-gateway/src/job_queue.rs new file mode 100644 index 000000000..353802550 --- /dev/null +++ b/devolutions-gateway/src/job_queue.rs @@ -0,0 +1,248 @@ +use std::sync::Arc; +use std::time::Duration; +use std::{future::Future, path::Path}; + +use anyhow::Context as _; +use axum::async_trait; +use devolutions_gateway_task::{ChildTask, ShutdownSignal, Task}; +use job_queue::{DynJobQueue, Job, JobCtx, JobQueue, JobReader, JobRunner, RunnerWaker}; +use job_queue_libsql::libsql; +use tokio::sync::{mpsc, Notify}; +use uuid::Uuid; + +pub struct JobQueueCtx { + notify_runner: Arc, + runner_waker: RunnerWaker, + queue: DynJobQueue, + job_queue_rx: JobQueueReceiver, + pub job_queue_handle: JobQueueHandle, +} + +#[derive(Clone)] +pub struct JobQueueHandle(mpsc::Sender>); + +pub type JobQueueReceiver = mpsc::Receiver>; + +pub struct JobQueueTask { + queue: DynJobQueue, + job_queue_rx: JobQueueReceiver, +} + +pub struct JobRunnerTask { + notify_runner: Arc, + runner_waker: RunnerWaker, + queue: DynJobQueue, +} + +impl JobQueueCtx { + pub async fn init(gateway_id: Uuid, database_path: &Path) -> anyhow::Result { + let notify_runner = Arc::new(Notify::new()); + + let runner_waker = RunnerWaker::new({ + let notify_runner = Arc::clone(¬ify_runner); + move || notify_runner.notify_one() + }); + + let database = libsql::Builder::new_local(database_path) + .build() + .await + .context("build database")?; + + let conn = database.connect().context("open database connection")?; + + let queue = job_queue_libsql::LibSqlJobQueue::builder() + .instance_id(gateway_id) + 
.runner_waker(runner_waker.clone()) + .conn(conn) + .build(); + + let queue = Arc::new(queue); + + queue.migrate().await.context("database migration")?; + + queue + .reset_claimed_jobs() + .await + .context("failed to reset claimed jobs")?; + + queue.clear_failed().await.context("failed to clear failed jobs")?; + + let (handle, rx) = JobQueueHandle::new(); + + Ok(Self { + notify_runner, + runner_waker, + queue, + job_queue_rx: rx, + job_queue_handle: handle, + }) + } +} + +impl JobQueueHandle { + pub fn new() -> (Self, JobQueueReceiver) { + let (tx, rx) = mpsc::channel(512); + (Self(tx), rx) + } + + pub fn blocking_enqueue(&self, job: T) -> anyhow::Result<()> { + self.0.blocking_send(Box::new(job)).context("couldn't enqueue job") + } + + pub async fn enqueue(&self, job: T) -> anyhow::Result<()> { + self.0.send(Box::new(job)).await.context("couldn't enqueue job") + } +} + +impl JobQueueTask { + pub fn new(ctx: JobQueueCtx) -> Self { + Self { + queue: ctx.queue, + job_queue_rx: ctx.job_queue_rx, + } + } +} + +#[async_trait] +impl Task for JobQueueTask { + type Output = anyhow::Result<()>; + + const NAME: &'static str = "job queue"; + + async fn run(self, shutdown_signal: ShutdownSignal) -> Self::Output { + job_queue_task(self, shutdown_signal).await + } +} + +#[instrument(skip_all)] +async fn job_queue_task(ctx: JobQueueTask, mut shutdown_signal: ShutdownSignal) -> anyhow::Result<()> { + debug!("Task started"); + + let JobQueueTask { + queue, + mut job_queue_rx, + } = ctx; + + loop { + tokio::select! 
{ + job = job_queue_rx.recv() => { + let Some(job) = job else { + debug!("All senders are dead"); + break; + }; + + ChildTask::spawn({ + let queue = Arc::clone(&queue); + + async move { + for _ in 0..5 { + match queue.push_job(&job).await { + Ok(()) => break, + Err(e) => { + warn!(error = format!("{e:#}"), "Failed to push job"); + tokio::time::sleep(Duration::from_secs(20)).await; + } + } + } + } + }) + .detach(); + } + () = shutdown_signal.wait() => break, + } + } + + debug!("Task terminated"); + + Ok(()) +} + +impl JobRunnerTask { + pub fn new(ctx: &JobQueueCtx) -> Self { + Self { + notify_runner: Arc::clone(&ctx.notify_runner), + runner_waker: RunnerWaker::clone(&ctx.runner_waker), + queue: Arc::clone(&ctx.queue), + } + } +} + +#[async_trait] +impl Task for JobRunnerTask { + type Output = anyhow::Result<()>; + + const NAME: &'static str = "job queue"; + + async fn run(self, shutdown_signal: ShutdownSignal) -> Self::Output { + job_runner_task(self, shutdown_signal).await + } +} + +#[instrument(skip_all)] +async fn job_runner_task(ctx: JobRunnerTask, mut shutdown_signal: ShutdownSignal) -> anyhow::Result<()> { + debug!("Task started"); + + let JobRunnerTask { + notify_runner, + runner_waker, + queue, + } = ctx; + + let reader = DgwJobReader; + + let spawn = |mut ctx: JobCtx, callback: job_queue::SpawnCallback| { + tokio::spawn(async move { + let result = ctx.job.run().await; + (callback)(result).await; + }); + }; + + let sleep = + |duration: Duration| (Box::new(tokio::time::sleep(duration)) as Box + Send>).into(); + + let wait_notified = move || { + let notify_runner = Arc::clone(¬ify_runner); + (Box::new(async move { notify_runner.notified().await }) as Box + Send>).into() + }; + + let runner = JobRunner { + queue, + reader: &reader, + spawn: &spawn, + sleep: &sleep, + wait_notified: &wait_notified, + waker: runner_waker, + max_batch_size: 3, + }; + + tokio::select! 
{ + () = runner.run() => {} + () = shutdown_signal.wait() => {} + } + + debug!("Task terminated"); + + Ok(()) +} + +struct DgwJobReader; + +impl JobReader for DgwJobReader { + fn read_json(&self, name: &str, json: &str) -> anyhow::Result { + use crate::api::jrec::DeleteRecordingsJob; + use crate::recording::RemuxJob; + + match name { + RemuxJob::NAME => { + let job: RemuxJob = serde_json::from_str(json).context("failed to deserialize RemuxJob")?; + Ok(Box::new(job)) + } + DeleteRecordingsJob::NAME => { + let job: DeleteRecordingsJob = + serde_json::from_str(json).context("failed to deserialize DeleteRecordingsJob")?; + Ok(Box::new(job)) + } + _ => anyhow::bail!("unknown job name: {name}"), + } + } +} diff --git a/devolutions-gateway/src/lib.rs b/devolutions-gateway/src/lib.rs index 559e774a7..5caa8680a 100644 --- a/devolutions-gateway/src/lib.rs +++ b/devolutions-gateway/src/lib.rs @@ -20,6 +20,7 @@ pub mod generic_client; pub mod http; pub mod interceptor; pub mod jmux; +pub mod job_queue; pub mod listener; pub mod log; pub mod middleware; @@ -48,6 +49,7 @@ pub struct DgwState { pub subscriber_tx: subscriber::SubscriberSender, pub shutdown_signal: devolutions_gateway_task::ShutdownSignal, pub recordings: recording::RecordingMessageSender, + pub job_queue_handle: job_queue::JobQueueHandle, } #[doc(hidden)] @@ -55,6 +57,7 @@ pub struct MockHandles { pub session_manager_rx: session::SessionMessageReceiver, pub recording_manager_rx: recording::RecordingMessageReceiver, pub subscriber_rx: subscriber::SubscriberReceiver, + pub job_queue_rx: job_queue::JobQueueReceiver, pub shutdown_handle: devolutions_gateway_task::ShutdownHandle, } @@ -68,6 +71,7 @@ impl DgwState { let (recording_manager_handle, recording_manager_rx) = recording::recording_message_channel(); let (subscriber_tx, subscriber_rx) = subscriber::subscriber_channel(); let (shutdown_handle, shutdown_signal) = devolutions_gateway_task::ShutdownHandle::new(); + let (job_queue_handle, job_queue_rx) = 
job_queue::JobQueueHandle::new(); let state = Self { conf_handle, @@ -77,12 +81,14 @@ impl DgwState { subscriber_tx, shutdown_signal, recordings: recording_manager_handle, + job_queue_handle, }; let handles = MockHandles { session_manager_rx, recording_manager_rx, subscriber_rx, + job_queue_rx, shutdown_handle, }; diff --git a/devolutions-gateway/src/main.rs b/devolutions-gateway/src/main.rs index 00271b760..34551fa0d 100644 --- a/devolutions-gateway/src/main.rs +++ b/devolutions-gateway/src/main.rs @@ -8,12 +8,13 @@ use rustls_cng as _; use utoipa as _; use { argon2 as _, async_trait as _, axum as _, axum_extra as _, backoff as _, bytes as _, camino as _, - devolutions_agent_shared as _, dlopen as _, dlopen_derive as _, etherparse as _, hostname as _, - http_body_util as _, hyper as _, hyper_util as _, ironrdp_pdu as _, ironrdp_rdcleanpath as _, jmux_proxy as _, - multibase as _, network_scanner as _, ngrok as _, nonempty as _, pcap_file as _, picky as _, picky_krb as _, - pin_project_lite as _, portpicker as _, reqwest as _, serde as _, serde_urlencoded as _, smol_str as _, - sysinfo as _, thiserror as _, time as _, tokio_rustls as _, tokio_tungstenite as _, tower as _, tower_http as _, - transport as _, tungstenite as _, typed_builder as _, url as _, uuid as _, zeroize as _, + devolutions_agent_shared as _, dlopen as _, dlopen_derive as _, dunce as _, etherparse as _, hostname as _, + http_body_util as _, hyper as _, hyper_util as _, ironrdp_core as _, ironrdp_pdu as _, ironrdp_rdcleanpath as _, + jmux_proxy as _, job_queue as _, job_queue_libsql as _, multibase as _, network_scanner as _, ngrok as _, + nonempty as _, pcap_file as _, picky as _, picky_krb as _, pin_project_lite as _, portpicker as _, reqwest as _, + serde as _, serde_urlencoded as _, smol_str as _, sysinfo as _, thiserror as _, time as _, tokio_rustls as _, + tokio_tungstenite as _, tower as _, tower_http as _, transport as _, tungstenite as _, typed_builder as _, + url as _, uuid as _, zeroize 
as _, }; // Used by tests. diff --git a/devolutions-gateway/src/recording.rs b/devolutions-gateway/src/recording.rs index cddafbf00..7d558532a 100644 --- a/devolutions-gateway/src/recording.rs +++ b/devolutions-gateway/src/recording.rs @@ -18,6 +18,7 @@ use tokio::{fs, io}; use typed_builder::TypedBuilder; use uuid::Uuid; +use crate::job_queue::JobQueueHandle; use crate::session::SessionMessageSender; use crate::token::{JrecTokenClaims, RecordingFileType}; @@ -349,6 +350,7 @@ pub struct RecordingManagerTask { ongoing_recordings: HashMap, recordings_path: Utf8PathBuf, session_manager_handle: SessionMessageSender, + job_queue_handle: JobQueueHandle, } impl RecordingManagerTask { @@ -356,12 +358,14 @@ impl RecordingManagerTask { rx: RecordingMessageReceiver, recordings_path: Utf8PathBuf, session_manager_handle: SessionMessageSender, + job_queue_handle: JobQueueHandle, ) -> Self { Self { rx, ongoing_recordings: HashMap::new(), recordings_path, session_manager_handle, + job_queue_handle, } } @@ -469,7 +473,7 @@ impl RecordingManagerTask { Ok(recording_file) } - fn handle_disconnect(&mut self, id: Uuid) -> anyhow::Result<()> { + async fn handle_disconnect(&mut self, id: Uuid) -> anyhow::Result<()> { if let Some(ongoing) = self.ongoing_recordings.get_mut(&id) { if !matches!(ongoing.state, OnGoingRecordingState::Connected) { anyhow::bail!("a recording not connected can’t be disconnected (there is probably a bug)"); @@ -504,8 +508,13 @@ impl RecordingManagerTask { if recording_file_path.extension() == Some(RecordingFileType::WebM.extension()) { if cadeau::xmf::is_init() { debug!(%recording_file_path, "Enqueue video remuxing operation"); - // FIXME: It would be better to have a job queue for this kind of things in case the service is killed. 
- tokio::spawn(remux(recording_file_path)); + + let _ = self + .job_queue_handle + .enqueue(RemuxJob { + input_path: recording_file_path, + }) + .await; } else { debug!("Video remuxing was skipped because XMF native library is not loaded"); } @@ -628,7 +637,7 @@ async fn recording_manager_task( } }, RecordingManagerMessage::Disconnect { id } => { - if let Err(e) = manager.handle_disconnect(id) { + if let Err(e) = manager.handle_disconnect(id).await { error!(error = format!("{e:#}"), "handle_disconnect"); } @@ -691,7 +700,7 @@ async fn recording_manager_task( debug!(?msg, "Received message"); if let RecordingManagerMessage::Disconnect { id } = msg { - if let Err(e) = manager.handle_disconnect(id) { + if let Err(e) = manager.handle_disconnect(id).await { error!(error = format!("{e:#}"), "handle_disconnect"); } manager.ongoing_recordings.remove(&id); @@ -703,7 +712,32 @@ async fn recording_manager_task( Ok(()) } -pub async fn remux(input_path: Utf8PathBuf) { +#[derive(Deserialize, Serialize)] +pub struct RemuxJob { + input_path: Utf8PathBuf, +} + +impl RemuxJob { + pub const NAME: &'static str = "remux"; +} + +#[async_trait] +impl job_queue::Job for RemuxJob { + fn name(&self) -> &str { + Self::NAME + } + + fn write_json(&self) -> anyhow::Result { + serde_json::to_string(self).context("failed to serialize RemuxAction") + } + + async fn run(&mut self) -> anyhow::Result<()> { + remux(core::mem::take(&mut self.input_path)).await; + Ok(()) + } +} + +async fn remux(input_path: Utf8PathBuf) { // CPU-intensive operation potentially lasting much more than 100ms. 
match tokio::task::spawn_blocking(move || remux_impl(input_path)).await { Err(error) => error!(%error, "Couldn't join the CPU-intensive muxer task"), diff --git a/devolutions-gateway/src/service.rs b/devolutions-gateway/src/service.rs index a0fe4a18f..89fd0e83b 100644 --- a/devolutions-gateway/src/service.rs +++ b/devolutions-gateway/src/service.rs @@ -232,6 +232,12 @@ async fn spawn_tasks(conf_handle: ConfHandle) -> anyhow::Result { let (recording_manager_handle, recording_manager_rx) = recording_message_channel(); let (subscriber_tx, subscriber_rx) = subscriber_channel(); let mut tasks = Tasks::new(); + let job_queue_ctx = devolutions_gateway::job_queue::JobQueueCtx::init( + conf.id.unwrap_or_else(uuid::Uuid::max), // FIXME: make Gateway ID non optional. + conf.job_queue_database.as_std_path(), + ) + .await + .context("failed to initialize job queue context")?; let state = DgwState { conf_handle: conf_handle.clone(), @@ -241,6 +247,7 @@ async fn spawn_tasks(conf_handle: ConfHandle) -> anyhow::Result { subscriber_tx: subscriber_tx.clone(), shutdown_signal: tasks.shutdown_signal.clone(), recordings: recording_manager_handle.clone(), + job_queue_handle: job_queue_ctx.job_queue_handle.clone(), }; conf.listeners @@ -294,8 +301,13 @@ async fn spawn_tasks(conf_handle: ConfHandle) -> anyhow::Result { recording_manager_rx, conf.recording_path.clone(), session_manager_handle, + job_queue_ctx.job_queue_handle.clone(), )); + tasks.register(devolutions_gateway::job_queue::JobRunnerTask::new(&job_queue_ctx)); + + tasks.register(devolutions_gateway::job_queue::JobQueueTask::new(job_queue_ctx)); + Ok(tasks) } diff --git a/devolutions-gateway/tests/config.rs b/devolutions-gateway/tests/config.rs index 31c04ffb3..cc0691af6 100644 --- a/devolutions-gateway/tests/config.rs +++ b/devolutions-gateway/tests/config.rs @@ -85,6 +85,7 @@ fn hub_sample() -> Sample { plugins: None, recording_path: None, sogar: None, + job_queue_database: None, ngrok: None, verbosity_profile: 
Some(VerbosityProfile::Tls), web_app: None, @@ -127,6 +128,7 @@ fn legacy_sample() -> Sample { plugins: None, recording_path: None, sogar: None, + job_queue_database: None, ngrok: None, verbosity_profile: None, web_app: None, @@ -168,6 +170,7 @@ fn system_store_sample() -> Sample { plugins: None, recording_path: None, sogar: None, + job_queue_database: None, ngrok: None, verbosity_profile: None, web_app: None, @@ -234,6 +237,7 @@ fn standalone_custom_auth_sample() -> Sample { plugins: None, recording_path: None, sogar: None, + job_queue_database: None, ngrok: None, verbosity_profile: None, web_app: Some(WebAppConf { @@ -307,6 +311,7 @@ fn standalone_no_auth_sample() -> Sample { plugins: None, recording_path: None, sogar: None, + job_queue_database: None, ngrok: None, verbosity_profile: None, web_app: Some(WebAppConf { From 592703065a6d5f24f2342a88b77071b227ad52cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20CORTIER?= Date: Thu, 14 Nov 2024 22:10:11 +0900 Subject: [PATCH 2/4] . --- crates/job-queue-libsql/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/job-queue-libsql/src/lib.rs b/crates/job-queue-libsql/src/lib.rs index 65bad6d6b..31c4b2495 100644 --- a/crates/job-queue-libsql/src/lib.rs +++ b/crates/job-queue-libsql/src/lib.rs @@ -159,8 +159,8 @@ impl JobQueue for LibSqlJobQueue { (id, instance_id, failed_attempts, status, name, def) VALUES (:id, :instance_id, :failed_attempts, :status, :name, jsonb(:def))"; - // UUID v4 provides no other information than randomness which cause fragmentation. - // Reduce index fragmentation by using ULID instead. + // UUID v4 only provides randomness, which leads to fragmentation. + // We use ULID instead to reduce index fragmentation. 
// https://github.com/ulid/spec let id = Uuid::from(Ulid::new()); From ebb72df32b97c6efb2e72d38d12e4ae060c73f2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20CORTIER?= Date: Fri, 15 Nov 2024 16:52:50 +0900 Subject: [PATCH 3/4] Allow jobs to be scheduled at a specific date in the future --- Cargo.lock | 3 + crates/job-queue-libsql/Cargo.toml | 11 +- crates/job-queue-libsql/src/lib.rs | 241 +++++++++++++++++++-------- crates/job-queue/Cargo.toml | 3 +- crates/job-queue/src/lib.rs | 55 ++++-- devolutions-gateway/src/job_queue.rs | 81 +++++++-- devolutions-gateway/src/service.rs | 9 +- 7 files changed, 292 insertions(+), 111 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c7179a00f..3771701dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1175,6 +1175,7 @@ dependencies = [ "pin-project-lite 0.2.15", "portpicker", "proptest", + "rand", "reqwest", "rstest", "rustls-cng", @@ -2922,6 +2923,7 @@ version = "0.0.0" dependencies = [ "anyhow", "async-trait", + "time", "tracing", "uuid", ] @@ -2935,6 +2937,7 @@ dependencies = [ "job-queue", "libsql", "serde", + "time", "tracing", "typed-builder", "ulid", diff --git a/crates/job-queue-libsql/Cargo.toml b/crates/job-queue-libsql/Cargo.toml index 05d9dc527..849fdb90d 100644 --- a/crates/job-queue-libsql/Cargo.toml +++ b/crates/job-queue-libsql/Cargo.toml @@ -9,12 +9,13 @@ publish = false workspace = true [dependencies] +anyhow = "1" +async-trait = "0.1" job-queue = { path = "../job-queue" } -typed-builder = "0.19" +libsql = "0.6" serde = "1" +time = { version = "0.3", default-features = false, features = ["std"] } +tracing = "0.1" +typed-builder = "0.19" ulid = { version = "1.1", features = ["uuid"] } uuid = "1.11" -anyhow = "1" -async-trait = "0.1" -tracing = "0.1" -libsql = "0.6" diff --git a/crates/job-queue-libsql/src/lib.rs b/crates/job-queue-libsql/src/lib.rs index 31c4b2495..e2fe4b802 100644 --- a/crates/job-queue-libsql/src/lib.rs +++ b/crates/job-queue-libsql/src/lib.rs @@ -5,6 +5,7 @@ use anyhow::Context as 
_; use async_trait::async_trait; use job_queue::{DynJob, JobCtx, JobQueue, JobReader, RunnerWaker}; use libsql::Connection; +use time::OffsetDateTime; use ulid::Ulid; use uuid::Uuid; @@ -25,7 +26,6 @@ pub use libsql; /// - #[derive(typed_builder::TypedBuilder)] pub struct LibSqlJobQueue { - instance_id: Uuid, runner_waker: RunnerWaker, conn: Connection, #[builder(default = 5)] @@ -40,61 +40,52 @@ enum JobStatus { } impl LibSqlJobQueue { - async fn query_user_version(&self) -> anyhow::Result { - let sql_query = "PRAGMA user_version"; - - trace!(%sql_query, "Query user_version"); - - let row = self - .conn - .query(sql_query, ()) - .await - .context("failed to execute SQL query")? - .next() - .await - .context("failed to read the row")? - .context("no row returned")?; - - let value = row.get::(0).context("failed to read user_version value")?; - - Ok(usize::try_from(value).expect("number not too big")) - } - - async fn update_user_version(&self, value: usize) -> anyhow::Result<()> { - let value = u64::try_from(value).expect("number not too big"); - - let sql_query = format!("PRAGMA user_version = {value}"); - - trace!(%sql_query, "Update user_version"); - - self.conn - .execute(&sql_query, ()) - .await - .context("failed to execute SQL query")?; + async fn apply_pragmas(&self) -> anyhow::Result<()> { + // Inspiration was taken from https://briandouglas.ie/sqlite-defaults/ + const PRAGMAS: &[&str] = &[ + // https://www.sqlite.org/pragma.html#pragma_journal_mode + // Use a write-ahead log instead of a rollback journal to implement transactions. + "PRAGMA journal_mode = WAL", + // https://www.sqlite.org/pragma.html#pragma_synchronous + // TLDR: journal_mode WAL + synchronous NORMAL is a good combination. + // WAL mode is safe from corruption with synchronous=NORMAL + // The synchronous=NORMAL setting is a good choice for most applications running in WAL mode. 
+ "PRAGMA synchronous = NORMAL", + // https://www.sqlite.org/pragma.html#pragma_busy_timeout + // Prevents SQLITE_BUSY errors by giving a timeout to wait for a locked resource before + // returning an error, useful for handling multiple concurrent accesses. + // 15 seconds is a good value for a backend application like a job queue. + "PRAGMA busy_timeout = 15000", + // https://www.sqlite.org/pragma.html#pragma_cache_size + // Reduce the number of disks reads by allowing more data to be cached in memory (3MB). + "PRAGMA cache_size = -3000", + // https://www.sqlite.org/pragma.html#pragma_auto_vacuum + // Reclaims disk space gradually as rows are deleted, instead of performing a full vacuum, + // reducing performance impact during database operations. + "PRAGMA auto_vacuum = INCREMENTAL", + // https://www.sqlite.org/pragma.html#pragma_temp_store + // Store temporary tables and data in memory for better performance + "PRAGMA temp_store = MEMORY", + ]; + + for sql_query in PRAGMAS { + trace!(%sql_query, "PRAGMA query"); + + let mut rows = self + .conn + .query(sql_query, ()) + .await + .context("failed to execute SQL query")?; + + while let Ok(Some(row)) = rows.next().await { + trace!(?row, "PRAGMA row"); + } + } Ok(()) } -} -#[async_trait] -impl JobQueue for LibSqlJobQueue { async fn migrate(&self) -> anyhow::Result<()> { - const MIGRATIONS: &[&str] = &["CREATE TABLE job_queue ( - id UUID NOT NULL PRIMARY KEY, - instance_id UUID NOT NULL, - created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - failed_attempts INT NOT NULL, - status INT NOT NULL, - name TEXT NOT NULL, - def JSONB NOT NULL - ); - - CREATE TRIGGER update_job_updated_at_on_update AFTER UPDATE ON job_queue - BEGIN - UPDATE job_queue SET updated_at = CURRENT_TIMESTAMP WHERE rowid == NEW.rowid; - END;"]; - let user_version = self.query_user_version().await?; match MIGRATIONS.get(user_version..) 
{ @@ -133,6 +124,50 @@ impl JobQueue for LibSqlJobQueue { Ok(()) } + async fn query_user_version(&self) -> anyhow::Result { + let sql_query = "PRAGMA user_version"; + + trace!(%sql_query, "Query user_version"); + + let row = self + .conn + .query(sql_query, ()) + .await + .context("failed to execute SQL query")? + .next() + .await + .context("failed to read the row")? + .context("no row returned")?; + + let value = row.get::(0).context("failed to read user_version value")?; + + Ok(usize::try_from(value).expect("number not too big")) + } + + async fn update_user_version(&self, value: usize) -> anyhow::Result<()> { + let value = u64::try_from(value).expect("number not too big"); + + let sql_query = format!("PRAGMA user_version = {value}"); + + trace!(%sql_query, "Update user_version"); + + self.conn + .execute(&sql_query, ()) + .await + .context("failed to execute SQL query")?; + + Ok(()) + } +} + +#[async_trait] +impl JobQueue for LibSqlJobQueue { + async fn setup(&self) -> anyhow::Result<()> { + self.apply_pragmas().await?; + self.migrate().await?; + Ok(()) + } + async fn reset_claimed_jobs(&self) -> anyhow::Result<()> { let sql_query = "UPDATE job_queue SET status = :queued_status WHERE status = :running_status"; @@ -154,19 +189,21 @@ impl JobQueue for LibSqlJobQueue { Ok(()) } - async fn push_job(&self, job: &DynJob) -> anyhow::Result<()> { + async fn push_job(&self, job: &DynJob, schedule_for: Option) -> anyhow::Result<()> { let sql_query = "INSERT INTO job_queue - (id, instance_id, failed_attempts, status, name, def) - VALUES (:id, :instance_id, :failed_attempts, :status, :name, jsonb(:def))"; + (id, scheduled_for, failed_attempts, status, name, def) + VALUES (:id, :scheduled_for, :failed_attempts, :status, :name, jsonb(:def))"; // UUID v4 only provides randomness, which leads to fragmentation. // We use ULID instead to reduce index fragmentation. 
// https://github.com/ulid/spec - let id = Uuid::from(Ulid::new()); + let id = Uuid::from(Ulid::new()).to_string(); + + let schedule_for = schedule_for.unwrap_or_else(|| OffsetDateTime::now_utc()); let params = ( - (":id", id.to_string()), - (":instance_id", self.instance_id.to_string()), + (":id", id), + (":scheduled_for", schedule_for.unix_timestamp()), (":failed_attempts", 0), (":status", JobStatus::Queued as u32), (":name", job.name()), @@ -196,20 +233,19 @@ impl JobQueue for LibSqlJobQueue { // As such, this directive doesn't exist. let sql_query = "UPDATE job_queue - SET status = :new_status + SET status = :running_status WHERE id IN ( SELECT id FROM job_queue - WHERE instance_id = :instance_id AND status = :current_status AND failed_attempts < :max_attempts + WHERE status = :queued_status AND failed_attempts < :max_attempts AND scheduled_for <= unixepoch() ORDER BY id LIMIT :number_of_jobs ) - RETURNING id, name, json(def) as def"; + RETURNING id, failed_attempts, name, json(def) as def"; let params = ( - (":new_status", JobStatus::Running as u32), - (":instance_id", self.instance_id.to_string()), - (":current_status", JobStatus::Queued as u32), + (":running_status", JobStatus::Running as u32), + (":queued_status", JobStatus::Queued as u32), (":max_attempts", self.max_attempts), (":number_of_jobs", number_of_jobs), ); @@ -241,7 +277,11 @@ impl JobQueue for LibSqlJobQueue { match libsql::de::from_row::<'_, JobModel>(&row) { Ok(model) => match reader.read_json(&model.name, &model.def) { - Ok(job) => jobs.push(JobCtx { id: model.id, job }), + Ok(job) => jobs.push(JobCtx { + id: model.id, + failed_attempts: model.failed_attempts, + job, + }), Err(e) => { error!( error = format!("{e:#}"), @@ -261,6 +301,7 @@ impl JobQueue for LibSqlJobQueue { #[derive(serde::Deserialize, Debug, Clone)] struct JobModel { id: Uuid, + failed_attempts: u32, name: String, def: String, } @@ -280,11 +321,19 @@ impl JobQueue for LibSqlJobQueue { Ok(()) } - async fn fail_job(&self, id: 
Uuid) -> anyhow::Result<()> { + async fn fail_job(&self, id: Uuid, schedule_for: OffsetDateTime) -> anyhow::Result<()> { let sql_query = "UPDATE job_queue - SET status = :new_status, failed_attempts = failed_attempts + 1 + SET + status = :queued_status, + failed_attempts = failed_attempts + 1, + scheduled_for = :scheduled_for WHERE id = :id"; - let params = ((":new_status", JobStatus::Queued as u32), (":id", id.to_string())); + + let params = ( + (":queued_status", JobStatus::Queued as u32), + (":scheduled_for", schedule_for.unix_timestamp()), + (":id", id.to_string()), + ); trace!(%sql_query, ?params, "Marking job as failed"); @@ -297,12 +346,8 @@ impl JobQueue for LibSqlJobQueue { } async fn clear_failed(&self) -> anyhow::Result<()> { - let sql_query = "DELETE FROM job_queue WHERE instance_id = :instance_id AND failed_attempts >= :max_attempts"; - - let params = ( - (":instance_id", self.instance_id.to_string()), - (":max_attempts", self.max_attempts), - ); + let sql_query = "DELETE FROM job_queue WHERE failed_attempts >= $1"; + let params = [self.max_attempts]; trace!(%sql_query, ?params, "Clearing failed jobs"); @@ -316,4 +361,54 @@ impl JobQueue for LibSqlJobQueue { Ok(()) } + + async fn next_scheduled_date(&self) -> anyhow::Result> { + let sql_query = "SELECT scheduled_for + FROM job_queue + WHERE status = :queued_status AND failed_attempts < :max_attempts + ORDER BY scheduled_for ASC + LIMIT 1"; + + let params = ( + (":queued_status", JobStatus::Queued as u32), + (":max_attempts", self.max_attempts), + ); + + trace!(%sql_query, ?params, "Fetching the earliest scheduled_for date"); + + let mut rows = self + .conn + .query(sql_query, params) + .await + .context("failed to execute SQL query")?; + + let Some(row) = rows.next().await.context("failed to read the row")? 
else { + return Ok(None); + }; + + let scheduled_for = row.get::(0).context("failed to read scheduled_for value")?; + let scheduled_for = + OffsetDateTime::from_unix_timestamp(scheduled_for).context("invalid UNIX timestamp for scheduled_for")?; + + Ok(Some(scheduled_for)) + } } + +// Typically, migrations should not be modified once released, and we should only be appending to this list. +const MIGRATIONS: &[&str] = &[ + "CREATE TABLE job_queue ( + id TEXT NOT NULL PRIMARY KEY, + created_at INT NOT NULL DEFAULT (unixepoch()), + updated_at INT NOT NULL DEFAULT (unixepoch()), + scheduled_for INT NOT NULL, + failed_attempts INT NOT NULL, + status INT NOT NULL, + name TEXT NOT NULL, + def BLOB NOT NULL + ) STRICT", + "CREATE TRIGGER update_job_updated_at_on_update AFTER UPDATE ON job_queue + BEGIN + UPDATE job_queue SET updated_at = unixepoch() WHERE id == NEW.id; + END", + "CREATE INDEX idx_scheduled_for ON job_queue(scheduled_for)", +]; diff --git a/crates/job-queue/Cargo.toml b/crates/job-queue/Cargo.toml index ca46a673a..ec87b437f 100644 --- a/crates/job-queue/Cargo.toml +++ b/crates/job-queue/Cargo.toml @@ -9,7 +9,8 @@ publish = false workspace = true [dependencies] -uuid = "1.11" anyhow = "1" async-trait = "0.1" +time = { version = "0.3", default-features = false } tracing = "0.1" +uuid = "1.11" diff --git a/crates/job-queue/src/lib.rs b/crates/job-queue/src/lib.rs index d419d1219..93232bd86 100644 --- a/crates/job-queue/src/lib.rs +++ b/crates/job-queue/src/lib.rs @@ -6,6 +6,7 @@ use std::pin::Pin; use std::sync::Arc; use async_trait::async_trait; +use time::OffsetDateTime; use uuid::Uuid; pub type DynJob = Box; @@ -30,10 +31,10 @@ pub trait JobReader: Send + Sync { #[async_trait] pub trait JobQueue: Send + Sync { - /// Performs migrations as required + /// Performs initial setup required before actually using the queue /// /// This function should be called first, before using any of the other functions. 
- async fn migrate(&self) -> anyhow::Result<()>; + async fn setup(&self) -> anyhow::Result<()>; /// Resets the status for the jobs claimed /// @@ -43,7 +44,7 @@ pub trait JobQueue: Send + Sync { /// Pushes a new job into the queue /// /// This function should ideally call `RunnerWaker::wake()` once the job is enqueued. - async fn push_job(&self, job: &DynJob) -> anyhow::Result<()>; + async fn push_job(&self, job: &DynJob, schedule_for: Option) -> anyhow::Result<()>; /// Fetches at most `number_of_jobs` from the queue async fn claim_jobs(&self, reader: &dyn JobReader, number_of_jobs: usize) -> anyhow::Result>; @@ -54,14 +55,18 @@ pub trait JobQueue: Send + Sync { /// Marks a job as failed /// /// Failed jobs are re-queued to be tried again later. - async fn fail_job(&self, job_id: Uuid) -> anyhow::Result<()>; + async fn fail_job(&self, job_id: Uuid, schedule_for: OffsetDateTime) -> anyhow::Result<()>; /// Removes jobs which can't be retried async fn clear_failed(&self) -> anyhow::Result<()>; + + /// Retrieves the closest future scheduled date + async fn next_scheduled_date(&self) -> anyhow::Result>; } pub struct JobCtx { pub id: Uuid, + pub failed_attempts: u32, pub job: DynJob, } @@ -88,6 +93,7 @@ pub struct JobRunner<'a> { pub spawn: &'a (dyn Fn(JobCtx, SpawnCallback) + Sync), pub sleep: &'a (dyn Fn(std::time::Duration) -> DynFuture + Sync), pub wait_notified: &'a (dyn Fn() -> DynFuture + Sync), + pub wait_notified_timeout: &'a (dyn Fn(std::time::Duration) -> DynFuture + Sync), pub waker: RunnerWaker, pub max_batch_size: usize, } @@ -106,6 +112,7 @@ impl JobRunner<'_> { sleep, waker, wait_notified, + wait_notified_timeout, max_batch_size, } = self; @@ -118,18 +125,16 @@ impl JobRunner<'_> { Ok(jobs) => jobs, Err(e) => { error!(error = format!("{e:#}"), "Failed to pull jobs"); - (sleep)(Duration::from_secs(10)).await; + (sleep)(Duration::from_secs(30)).await; continue; } }; - let number_of_jobs = jobs.len(); - if number_of_jobs > 0 { - trace!(number_of_jobs, 
"Fetched jobs"); - } + trace!(number_of_jobs = jobs.len(), "Fetched jobs"); for job in jobs { let job_id = job.id; + let failed_attempts = job.failed_attempts; let callback = Box::new({ let queue = Arc::clone(&queue); @@ -147,7 +152,10 @@ impl JobRunner<'_> { Err(e) => { warn!(error = format!("{e:#}"), %job_id, "Job failed"); - if let Err(e) = queue.fail_job(job_id).await { + let schedule_for = + OffsetDateTime::now_utc() + (1 << failed_attempts) * Duration::from_secs(30); + + if let Err(e) = queue.fail_job(job_id, schedule_for).await { error!(error = format!("{e:#}"), "Failed to mark job as failed") } } @@ -166,10 +174,33 @@ impl JobRunner<'_> { running_count.fetch_add(1, Ordering::SeqCst); } + let next_scheduled = if running_count.load(Ordering::SeqCst) < max_batch_size { + queue + .next_scheduled_date() + .await + .ok() + .flatten() + .map(|date| date.unix_timestamp() - OffsetDateTime::now_utc().unix_timestamp()) + .inspect(|next_scheduled| trace!("Next task in {next_scheduled} seconds")) + } else { + None + }; + + let before_wait = Instant::now(); + // Wait for something to happen. // This could be a notification that a new job has been pushed, or that a running job is terminated. - let before_wait = Instant::now(); - (wait_notified)().await; + if let Some(timeout) = next_scheduled { + // If the next task was scheduled in < 0 seconds, skip the wait step. + // This happens because there is a delay between the moment the jobs to run are claimed, and the moment + // we check for the next closest scheduled job. + if let Ok(timeout) = u64::try_from(timeout) { + (wait_notified_timeout)(Duration::from_secs(timeout)).await; + } + } else { + (wait_notified)().await; + } + let elapsed = before_wait.elapsed(); // Make sure we wait a little bit to avoid overloading the database. 
diff --git a/devolutions-gateway/src/job_queue.rs b/devolutions-gateway/src/job_queue.rs index 353802550..f8678a5b3 100644 --- a/devolutions-gateway/src/job_queue.rs +++ b/devolutions-gateway/src/job_queue.rs @@ -7,8 +7,8 @@ use axum::async_trait; use devolutions_gateway_task::{ChildTask, ShutdownSignal, Task}; use job_queue::{DynJobQueue, Job, JobCtx, JobQueue, JobReader, JobRunner, RunnerWaker}; use job_queue_libsql::libsql; +use time::OffsetDateTime; use tokio::sync::{mpsc, Notify}; -use uuid::Uuid; pub struct JobQueueCtx { notify_runner: Arc, @@ -18,10 +18,15 @@ pub struct JobQueueCtx { pub job_queue_handle: JobQueueHandle, } +pub struct JobMessage { + pub job: Box, + pub schedule_for: Option, +} + #[derive(Clone)] -pub struct JobQueueHandle(mpsc::Sender>); +pub struct JobQueueHandle(mpsc::Sender); -pub type JobQueueReceiver = mpsc::Receiver>; +pub type JobQueueReceiver = mpsc::Receiver; pub struct JobQueueTask { queue: DynJobQueue, @@ -35,7 +40,7 @@ pub struct JobRunnerTask { } impl JobQueueCtx { - pub async fn init(gateway_id: Uuid, database_path: &Path) -> anyhow::Result { + pub async fn init(database_path: &Path) -> anyhow::Result { let notify_runner = Arc::new(Notify::new()); let runner_waker = RunnerWaker::new({ @@ -51,14 +56,13 @@ impl JobQueueCtx { let conn = database.connect().context("open database connection")?; let queue = job_queue_libsql::LibSqlJobQueue::builder() - .instance_id(gateway_id) .runner_waker(runner_waker.clone()) .conn(conn) .build(); let queue = Arc::new(queue); - queue.migrate().await.context("database migration")?; + queue.setup().await.context("database migration")?; queue .reset_claimed_jobs() @@ -86,11 +90,45 @@ impl JobQueueHandle { } pub fn blocking_enqueue(&self, job: T) -> anyhow::Result<()> { - self.0.blocking_send(Box::new(job)).context("couldn't enqueue job") + self.0 + .blocking_send(JobMessage { + job: Box::new(job), + schedule_for: None, + }) + .context("couldn't enqueue job") } pub async fn enqueue(&self, job: T) -> 
anyhow::Result<()> { - self.0.send(Box::new(job)).await.context("couldn't enqueue job") + self.0 + .send(JobMessage { + job: Box::new(job), + schedule_for: None, + }) + .await + .context("couldn't enqueue job") + } + + pub async fn blocking_schedule( + &self, + job: T, + schedule_for: OffsetDateTime, + ) -> anyhow::Result<()> { + self.0 + .blocking_send(JobMessage { + job: Box::new(job), + schedule_for: Some(schedule_for), + }) + .context("couldn't enqueue job") + } + + pub async fn schedule(&self, job: T, schedule_for: OffsetDateTime) -> anyhow::Result<()> { + self.0 + .send(JobMessage { + job: Box::new(job), + schedule_for: Some(schedule_for), + }) + .await + .context("couldn't enqueue job") } } @@ -125,8 +163,8 @@ async fn job_queue_task(ctx: JobQueueTask, mut shutdown_signal: ShutdownSignal) loop { tokio::select! { - job = job_queue_rx.recv() => { - let Some(job) = job else { + msg = job_queue_rx.recv() => { + let Some(msg) = msg else { debug!("All senders are dead"); break; }; @@ -136,7 +174,7 @@ async fn job_queue_task(ctx: JobQueueTask, mut shutdown_signal: ShutdownSignal) async move { for _ in 0..5 { - match queue.push_job(&job).await { + match queue.push_job(&msg.job, msg.schedule_for).await { Ok(()) => break, Err(e) => { warn!(error = format!("{e:#}"), "Failed to push job"); @@ -200,9 +238,23 @@ async fn job_runner_task(ctx: JobRunnerTask, mut shutdown_signal: ShutdownSignal let sleep = |duration: Duration| (Box::new(tokio::time::sleep(duration)) as Box + Send>).into(); - let wait_notified = move || { + let wait_notified = { let notify_runner = Arc::clone(¬ify_runner); - (Box::new(async move { notify_runner.notified().await }) as Box + Send>).into() + move || { + let notify_runner = Arc::clone(¬ify_runner); + (Box::new(async move { notify_runner.notified().await }) as Box + Send>).into() + } + }; + + let wait_notified_timeout = move |timeout: Duration| { + let notify_runner = Arc::clone(¬ify_runner); + (Box::new(async move { + tokio::select! 
{ + () = notify_runner.notified() => {} + () = tokio::time::sleep(timeout) => {} + } + }) as Box + Send>) + .into() }; let runner = JobRunner { @@ -211,8 +263,9 @@ async fn job_runner_task(ctx: JobRunnerTask, mut shutdown_signal: ShutdownSignal spawn: &spawn, sleep: &sleep, wait_notified: &wait_notified, + wait_notified_timeout: &wait_notified_timeout, waker: runner_waker, - max_batch_size: 3, + max_batch_size: 16, }; tokio::select! { diff --git a/devolutions-gateway/src/service.rs b/devolutions-gateway/src/service.rs index 89fd0e83b..898e8567c 100644 --- a/devolutions-gateway/src/service.rs +++ b/devolutions-gateway/src/service.rs @@ -232,12 +232,9 @@ async fn spawn_tasks(conf_handle: ConfHandle) -> anyhow::Result { let (recording_manager_handle, recording_manager_rx) = recording_message_channel(); let (subscriber_tx, subscriber_rx) = subscriber_channel(); let mut tasks = Tasks::new(); - let job_queue_ctx = devolutions_gateway::job_queue::JobQueueCtx::init( - conf.id.unwrap_or_else(uuid::Uuid::max), // FIXME: make Gateway ID non optional. - conf.job_queue_database.as_std_path(), - ) - .await - .context("failed to initialize job queue context")?; + let job_queue_ctx = devolutions_gateway::job_queue::JobQueueCtx::init(conf.job_queue_database.as_std_path()) + .await + .context("failed to initialize job queue context")?; let state = DgwState { conf_handle: conf_handle.clone(), From 67638484c600895594c6673dbf2ca27545084e09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20CORTIER?= Date: Fri, 15 Nov 2024 16:56:28 +0900 Subject: [PATCH 4/4] . 
--- Cargo.lock | 1 - devolutions-gateway/src/job_queue.rs | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3771701dc..20010e99b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1175,7 +1175,6 @@ dependencies = [ "pin-project-lite 0.2.15", "portpicker", "proptest", - "rand", "reqwest", "rstest", "rustls-cng", diff --git a/devolutions-gateway/src/job_queue.rs b/devolutions-gateway/src/job_queue.rs index f8678a5b3..38893ebb0 100644 --- a/devolutions-gateway/src/job_queue.rs +++ b/devolutions-gateway/src/job_queue.rs @@ -62,7 +62,7 @@ impl JobQueueCtx { let queue = Arc::new(queue); - queue.setup().await.context("database migration")?; + queue.setup().await.context("queue setup")?; queue .reset_claimed_jobs()