diff --git a/Cargo.lock b/Cargo.lock index 7324017a..20010e99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -61,6 +61,18 @@ dependencies = [ "aes", ] +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -76,12 +88,12 @@ version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b0e3b97a21e41ec5c19bfd9b4fc1f7086be104f8b988681230247ffc91cc8ed" dependencies = [ - "axum", + "axum 0.7.7", "axum-extra", "bytes 1.8.0", "cfg-if", "http 1.1.0", - "indexmap", + "indexmap 2.6.0", "schemars", "serde", "serde_json", @@ -92,6 +104,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "allocator-api2" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45862d1c77f2228b9e10bc609d5bc203d86ebc9b87ad8d5d5167a6c9abf739d9" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -291,7 +309,7 @@ version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df7a4168111d7eb622a31b214057b8509c0a7e1794f44c546d742330dc793972" dependencies = [ - "bindgen", + "bindgen 0.69.5", "cc", "cmake", "dunce", @@ -300,6 +318,34 @@ dependencies = [ "paste", ] +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core 0.3.4", + "bitflags 1.3.2", + "bytes 1.8.0", + "futures-util", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.31", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite 0.2.15", + "rustversion", + "serde", + "sync_wrapper 0.1.2", + "tower 0.4.13", + "tower-layer", + 
"tower-service", +] + [[package]] name = "axum" version = "0.7.7" @@ -307,7 +353,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "504e3947307ac8326a5437504c517c4b56716c9d98fac0028c2acc7ca47d70ae" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.4.5", "base64 0.22.1", "bytes 1.8.0", "futures-util", @@ -337,6 +383,23 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes 1.8.0", + "futures-util", + "http 0.2.12", + "http-body 0.4.6", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "axum-core" version = "0.4.5" @@ -364,8 +427,8 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73c3220b188aea709cf1b6c5f9b01c3bd936bb08bd2b5184a12b35ac8131b1f9" dependencies = [ - "axum", - "axum-core", + "axum 0.7.7", + "axum-core 0.4.5", "bytes 1.8.0", "futures-util", "headers", @@ -456,6 +519,38 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + +[[package]] +name = "bindgen" +version = "0.66.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b84e06fc203107bfbad243f4aba2af864eb7db3b1cf46ea0a023b0b433d2a7" +dependencies = [ + "bitflags 2.6.0", + "cexpr", + "clang-sys", + "lazy_static", + "lazycell", + "log", + "peeking_take_while", + "prettyplease", + "proc-macro2 1.0.88", + "quote 1.0.37", + "regex", + "rustc-hash 1.1.0", + "shlex", + "syn 2.0.87", + "which", +] + 
[[package]] name = "bindgen" version = "0.69.5" @@ -583,6 +678,9 @@ name = "bytes" version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" +dependencies = [ + "serde", +] [[package]] name = "cadeau" @@ -1012,7 +1110,7 @@ dependencies = [ "tap", "thiserror", "tokio 1.41.1", - "tokio-rustls", + "tokio-rustls 0.26.0", "tracing", "uuid", "win-api-wrappers", @@ -1038,7 +1136,7 @@ dependencies = [ "anyhow", "argon2", "async-trait", - "axum", + "axum 0.7.7", "axum-extra", "backoff", "bytes 1.8.0", @@ -1064,6 +1162,8 @@ dependencies = [ "ironrdp-pdu 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=7c268d863048d0a9182b3f7bf778668de8db4ccf)", "ironrdp-rdcleanpath", "jmux-proxy", + "job-queue", + "job-queue-libsql", "multibase", "network-scanner", "ngrok", @@ -1087,11 +1187,11 @@ dependencies = [ "thiserror", "time", "tokio 1.41.1", - "tokio-rustls", + "tokio-rustls 0.26.0", "tokio-test", "tokio-tungstenite", "tower 0.5.1", - "tower-http", + "tower-http 0.5.2", "tracing", "tracing-cov-mark", "transport", @@ -1142,7 +1242,7 @@ dependencies = [ "aide", "anyhow", "async-trait", - "axum", + "axum 0.7.7", "base16ct", "base64 0.22.1", "camino", @@ -1161,7 +1261,7 @@ dependencies = [ "sha1", "sha2", "tokio 1.41.1", - "tower-http", + "tower-http 0.5.2", "tower-service", "tracing", "walkdir", @@ -1459,6 +1559,24 @@ dependencies = [ "arrayvec", ] +[[package]] +name = "fallible-iterator" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" + +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fastrand" version = "2.1.1" @@ -1706,8 +1824,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -1755,7 +1875,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap", + "indexmap 2.6.0", "slab", "tokio 1.41.1", "tokio-util", @@ -1774,19 +1894,44 @@ dependencies = [ "futures-core", "futures-sink", "http 1.1.0", - "indexmap", + "indexmap 2.6.0", "slab", "tokio 1.41.1", "tokio-util", "tracing", ] +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", + "allocator-api2", +] + [[package]] name = "hashbrown" version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" +[[package]] +name = "hashlink" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" +dependencies = [ + "hashbrown 0.14.5", +] + [[package]] name = "headers" version = "0.4.0" @@ -1940,6 +2085,12 @@ dependencies = [ "pin-project-lite 0.2.15", ] +[[package]] +name = "http-range-header" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f" + [[package]] name = 
"http-range-header" version = "0.4.1" @@ -2009,6 +2160,24 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "399c78f9338483cb7e630c8474b07268983c6bd5acee012e4211f9f7bb21b070" +dependencies = [ + "futures-util", + "http 0.2.12", + "hyper 0.14.31", + "log", + "rustls 0.22.4", + "rustls-native-certs 0.7.3", + "rustls-pki-types", + "tokio 1.41.1", + "tokio-rustls 0.25.0", + "webpki-roots", +] + [[package]] name = "hyper-rustls" version = "0.27.3" @@ -2020,13 +2189,25 @@ dependencies = [ "hyper 1.5.0", "hyper-util", "rustls 0.23.15", - "rustls-native-certs", + "rustls-native-certs 0.8.0", "rustls-pki-types", "tokio 1.41.1", - "tokio-rustls", + "tokio-rustls 0.26.0", "tower-service", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper 0.14.31", + "pin-project-lite 0.2.15", + "tokio 1.41.1", + "tokio-io-timeout", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -2231,6 +2412,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.6.0" @@ -2238,7 +2429,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.0", "serde", ] @@ -2581,7 +2772,7 @@ dependencies = [ "ironrdp-svc 0.1.0 (git+https://github.com/Devolutions/IronRDP?rev=2e1a9ac88e38e7d92d893007bc25d0a05c365861)", "ironrdp-tokio", "tokio 1.41.1", - "tokio-rustls", + "tokio-rustls 0.26.0", 
"tracing", ] @@ -2678,7 +2869,7 @@ dependencies = [ "proxy-types", "proxy_cfg", "rustls 0.23.15", - "rustls-native-certs", + "rustls-native-certs 0.8.0", "rustls-pemfile 2.2.0", "seahorse", "sysinfo", @@ -2725,6 +2916,33 @@ dependencies = [ "tracing", ] +[[package]] +name = "job-queue" +version = "0.0.0" +dependencies = [ + "anyhow", + "async-trait", + "time", + "tracing", + "uuid", +] + +[[package]] +name = "job-queue-libsql" +version = "0.0.0" +dependencies = [ + "anyhow", + "async-trait", + "job-queue", + "libsql", + "serde", + "time", + "tracing", + "typed-builder", + "ulid", + "uuid", +] + [[package]] name = "jobserver" version = "0.1.32" @@ -2820,6 +3038,138 @@ dependencies = [ "redox_syscall", ] +[[package]] +name = "libsql" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe18646e4ef8db446bc3e3f5fb96131483203bc5f4998ff149f79a067530c01c" +dependencies = [ + "anyhow", + "async-stream", + "async-trait", + "base64 0.21.7", + "bincode", + "bitflags 2.6.0", + "bytes 1.8.0", + "fallible-iterator 0.3.0", + "futures", + "http 0.2.12", + "hyper 0.14.31", + "hyper-rustls 0.25.0", + "libsql-hrana", + "libsql-sqlite3-parser", + "libsql-sys", + "libsql_replication", + "parking_lot", + "serde", + "serde_json", + "thiserror", + "tokio 1.41.1", + "tokio-stream", + "tokio-util", + "tonic", + "tonic-web", + "tower 0.4.13", + "tower-http 0.4.4", + "tracing", + "uuid", + "zerocopy", +] + +[[package]] +name = "libsql-ffi" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f2a50a585a1184a43621a9133b7702ba5cb7a87ca5e704056b19d8005de6faf" +dependencies = [ + "bindgen 0.66.1", + "cc", +] + +[[package]] +name = "libsql-hrana" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aeaf5d19e365465e1c23d687a28c805d7462531b3f619f0ba49d3cf369890a3e" +dependencies = [ + "base64 0.21.7", + "bytes 1.8.0", + "prost", + "serde", +] + +[[package]] +name = 
"libsql-rusqlite" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae65c66088dcd309abbd5617ae046abac2a2ee0a7fdada5127353bd68e0a27ea" +dependencies = [ + "bitflags 2.6.0", + "fallible-iterator 0.2.0", + "fallible-streaming-iterator", + "hashlink", + "libsql-ffi", + "smallvec", +] + +[[package]] +name = "libsql-sqlite3-parser" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15a90128c708356af8f7d767c9ac2946692c9112b4f74f07b99a01a60680e413" +dependencies = [ + "bitflags 2.6.0", + "cc", + "fallible-iterator 0.3.0", + "indexmap 2.6.0", + "log", + "memchr", + "phf", + "phf_codegen", + "phf_shared", + "uncased", +] + +[[package]] +name = "libsql-sys" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c05b61c226781d6f5e26e3e7364617f19c0c1d5332035802e9229d6024cec05" +dependencies = [ + "bytes 1.8.0", + "libsql-ffi", + "libsql-rusqlite", + "once_cell", + "tracing", + "zerocopy", +] + +[[package]] +name = "libsql_replication" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cf40c4c2c01462da758272976de0a23d19b4e9c714db08efecf262d896655b5" +dependencies = [ + "aes", + "async-stream", + "async-trait", + "bytes 1.8.0", + "cbc", + "libsql-rusqlite", + "libsql-sys", + "parking_lot", + "prost", + "serde", + "thiserror", + "tokio 1.41.1", + "tokio-stream", + "tokio-util", + "tonic", + "tracing", + "uuid", + "zerocopy", +] + [[package]] name = "libudis86-sys" version = "0.2.1" @@ -3585,6 +3935,12 @@ dependencies = [ "thiserror", ] +[[package]] +name = "peeking_take_while" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" + [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -3600,6 +3956,45 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" 
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "phf" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_codegen" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8d39688d359e6b34654d328e262234662d16cc0f60ec8dcbe5e718709342a5a" +dependencies = [ + "phf_generator", + "phf_shared", +] + +[[package]] +name = "phf_generator" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48e4cc64c2ad9ebe670cb8fd69dd50ae301650392e81c05f9bfcb2d5bdbc24b0" +dependencies = [ + "phf_shared", + "rand", +] + +[[package]] +name = "phf_shared" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b" +dependencies = [ + "siphasher", + "uncased", +] + [[package]] name = "picky" version = "7.0.0-rc.9" @@ -4009,6 +4404,29 @@ dependencies = [ "unarray", ] +[[package]] +name = "prost" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" +dependencies = [ + "bytes 1.8.0", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2 1.0.88", + "quote 1.0.37", + "syn 2.0.87", +] + [[package]] name = "proxy-generators" version = "0.0.0" @@ -4318,7 +4736,7 @@ dependencies = [ "http-body 1.0.1", "http-body-util", "hyper 1.5.0", - "hyper-rustls", + "hyper-rustls 0.27.3", "hyper-util", "ipnet", "js-sys", @@ -4329,7 +4747,7 @@ dependencies = [ 
"pin-project-lite 0.2.15", "quinn", "rustls 0.23.15", - "rustls-native-certs", + "rustls-native-certs 0.8.0", "rustls-pemfile 2.2.0", "rustls-pki-types", "serde", @@ -4337,7 +4755,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper 1.0.1", "tokio 1.41.1", - "tokio-rustls", + "tokio-rustls 0.26.0", "tokio-util", "tower-service", "url", @@ -4534,6 +4952,20 @@ dependencies = [ "webpki", ] +[[package]] +name = "rustls" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" +dependencies = [ + "log", + "ring 0.17.8", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + [[package]] name = "rustls" version = "0.23.15" @@ -4561,6 +4993,19 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.2.0", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-native-certs" version = "0.8.0" @@ -4665,7 +5110,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09c024468a378b7e36765cd36702b7a90cc3cba11654f6685c8f233408e89e92" dependencies = [ "dyn-clone", - "indexmap", + "indexmap 2.6.0", "schemars_derive", "serde", "serde_json", @@ -4801,7 +5246,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8de514ef58196f1fc96dcaef80fe6170a1ce6215df9687a93fe8300e773fefc5" dependencies = [ "form_urlencoded", - "indexmap", + "indexmap 2.6.0", "itoa", "ryu", "serde", @@ -4835,7 +5280,7 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd34f36fe4c5ba9654417139a9b3a20d2e1de6012ee678ad14d240c22c78d8d6" dependencies = [ - "axum", + "axum 0.7.7", "futures", "percent-encoding", 
"serde", @@ -4869,7 +5314,7 @@ version = "0.9.34+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ - "indexmap", + "indexmap 2.6.0", "itoa", "ryu", "serde", @@ -4942,6 +5387,12 @@ dependencies = [ "rand_core", ] +[[package]] +name = "siphasher" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" + [[package]] name = "slab" version = "0.4.9" @@ -5386,6 +5837,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite 0.2.15", + "tokio 1.41.1", +] + [[package]] name = "tokio-macros" version = "2.4.0" @@ -5418,6 +5879,17 @@ dependencies = [ "tokio 1.41.1", ] +[[package]] +name = "tokio-rustls" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +dependencies = [ + "rustls 0.22.4", + "rustls-pki-types", + "tokio 1.41.1", +] + [[package]] name = "tokio-rustls" version = "0.26.0" @@ -5463,11 +5935,11 @@ dependencies = [ "log", "native-tls", "rustls 0.23.15", - "rustls-native-certs", + "rustls-native-certs 0.8.0", "rustls-pki-types", "tokio 1.41.1", "tokio-native-tls", - "tokio-rustls", + "tokio-rustls 0.26.0", "tungstenite", ] @@ -5512,13 +5984,60 @@ version = "0.22.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ae48d6208a266e853d946088ed816055e556cc6028c5e8e2b84d9fa5dd7c7f5" dependencies = [ - "indexmap", + "indexmap 2.6.0", "serde", "serde_spanned", "toml_datetime", "winnow", ] +[[package]] +name = "tonic" +version = "0.11.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" +dependencies = [ + "async-stream", + "async-trait", + "axum 0.6.20", + "base64 0.21.7", + "bytes 1.8.0", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.31", + "hyper-timeout", + "percent-encoding", + "pin-project 1.1.7", + "prost", + "tokio 1.41.1", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-web" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc3b0e1cedbf19fdfb78ef3d672cb9928e0a91a9cb4629cc0c916e8cff8aaaa1" +dependencies = [ + "base64 0.21.7", + "bytes 1.8.0", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.31", + "pin-project 1.1.7", + "tokio-stream", + "tonic", + "tower-http 0.4.4", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.3.1" @@ -5537,6 +6056,26 @@ dependencies = [ "tower-util", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project 1.1.7", + "pin-project-lite 0.2.15", + "rand", + "slab", + "tokio 1.41.1", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.5.1" @@ -5578,6 +6117,26 @@ dependencies = [ "tower-service", ] +[[package]] +name = "tower-http" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" +dependencies = [ + "bitflags 2.6.0", + "bytes 1.8.0", + "futures-core", + "futures-util", + "http 0.2.12", + "http-body 0.4.6", + "http-range-header 0.3.1", + "pin-project-lite 0.2.15", + "tower 0.4.13", + "tower-layer", + 
"tower-service", + "tracing", +] + [[package]] name = "tower-http" version = "0.5.2" @@ -5590,7 +6149,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "http-body-util", - "http-range-header", + "http-range-header 0.4.1", "httpdate", "mime", "mime_guess", @@ -5841,12 +6400,33 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "ulid" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04f903f293d11f31c0c29e4148f6dc0d033a7f80cebc0282bea147611667d289" +dependencies = [ + "getrandom", + "rand", + "uuid", + "web-time", +] + [[package]] name = "unarray" version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94" +[[package]] +name = "uncased" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1b88fcfe09e89d3866a5c11019378088af2d24c3fbd4f0543f96b479ec90697" +dependencies = [ + "version_check", +] + [[package]] name = "unicase" version = "2.8.0" @@ -5929,7 +6509,7 @@ version = "4.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c5afb1a60e207dca502682537fefcfd9921e71d0b83e9576060f09abc6efab23" dependencies = [ - "indexmap", + "indexmap 2.6.0", "serde", "serde_json", "serde_yaml", @@ -6121,6 +6701,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki" version = "0.22.4" @@ -6131,6 +6721,15 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "webpki-roots" +version = "0.26.6" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "841c67bff177718f1d4dfefde8d8f0e78f9b6589319ba88312f567fc5841a958" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "which" version = "4.4.2" diff --git a/crates/job-queue-libsql/Cargo.toml b/crates/job-queue-libsql/Cargo.toml new file mode 100644 index 00000000..849fdb90 --- /dev/null +++ b/crates/job-queue-libsql/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "job-queue-libsql" +version = "0.0.0" +edition = "2021" +authors = ["Devolutions Inc. "] +publish = false + +[lints] +workspace = true + +[dependencies] +anyhow = "1" +async-trait = "0.1" +job-queue = { path = "../job-queue" } +libsql = "0.6" +serde = "1" +time = { version = "0.3", default-features = false, features = ["std"] } +tracing = "0.1" +typed-builder = "0.19" +ulid = { version = "1.1", features = ["uuid"] } +uuid = "1.11" diff --git a/crates/job-queue-libsql/src/lib.rs b/crates/job-queue-libsql/src/lib.rs new file mode 100644 index 00000000..e2fe4b80 --- /dev/null +++ b/crates/job-queue-libsql/src/lib.rs @@ -0,0 +1,414 @@ +#[macro_use] +extern crate tracing; + +use anyhow::Context as _; +use async_trait::async_trait; +use job_queue::{DynJob, JobCtx, JobQueue, JobReader, RunnerWaker}; +use libsql::Connection; +use time::OffsetDateTime; +use ulid::Ulid; +use uuid::Uuid; + +pub use libsql; + +/// Implementation of [`JobQueue`] using libSQL as the backend +/// +/// This is inspired by 37signals' Solid Queue: +/// - +/// - +/// +/// And "How to build a job queue with Rust and PostgreSQL" on kerkour.com: +/// - +/// +/// We use the 'user_version' value to store the migration state. +/// It's a very lightweight approach as it is just an integer at a fixed offset in the SQLite file. 
+/// - +/// - +#[derive(typed_builder::TypedBuilder)] +pub struct LibSqlJobQueue { + runner_waker: RunnerWaker, + conn: Connection, + #[builder(default = 5)] + max_attempts: u32, +} + +#[derive(Debug, Clone, PartialEq)] +#[repr(u32)] +enum JobStatus { + Queued, + Running, +} + +impl LibSqlJobQueue { + async fn apply_pragmas(&self) -> anyhow::Result<()> { + // Inspiration was taken from https://briandouglas.ie/sqlite-defaults/ + const PRAGMAS: &[&str] = &[ + // https://www.sqlite.org/pragma.html#pragma_journal_mode + // Use a write-ahead log instead of a rollback journal to implement transactions. + "PRAGMA journal_mode = WAL", + // https://www.sqlite.org/pragma.html#pragma_synchronous + // TLDR: journal_mode WAL + synchronous NORMAL is a good combination. + // WAL mode is safe from corruption with synchronous=NORMAL + // The synchronous=NORMAL setting is a good choice for most applications running in WAL mode. + "PRAGMA synchronous = NORMAL", + // https://www.sqlite.org/pragma.html#pragma_busy_timeout + // Prevents SQLITE_BUSY errors by giving a timeout to wait for a locked resource before + // returning an error, useful for handling multiple concurrent accesses. + // 15 seconds is a good value for a backend application like a job queue. + "PRAGMA busy_timeout = 15000", + // https://www.sqlite.org/pragma.html#pragma_cache_size + // Reduce the number of disks reads by allowing more data to be cached in memory (3MB). + "PRAGMA cache_size = -3000", + // https://www.sqlite.org/pragma.html#pragma_auto_vacuum + // Reclaims disk space gradually as rows are deleted, instead of performing a full vacuum, + // reducing performance impact during database operations. 
+ "PRAGMA auto_vacuum = INCREMENTAL", + // https://www.sqlite.org/pragma.html#pragma_temp_store + // Store temporary tables and data in memory for better performance + "PRAGMA temp_store = MEMORY", + ]; + + for sql_query in PRAGMAS { + trace!(%sql_query, "PRAGMA query"); + + let mut rows = self + .conn + .query(sql_query, ()) + .await + .context("failed to execute SQL query")?; + + while let Ok(Some(row)) = rows.next().await { + trace!(?row, "PRAGMA row"); + } + } + + Ok(()) + } + + async fn migrate(&self) -> anyhow::Result<()> { + let user_version = self.query_user_version().await?; + + match MIGRATIONS.get(user_version..) { + Some(remaining) if !remaining.is_empty() => { + info!( + user_version, + migration_count = MIGRATIONS.len() - user_version, + "Start migration" + ); + + for (sql_query, migration_id) in remaining.iter().zip(user_version..MIGRATIONS.len()) { + trace!(migration_id, %sql_query, "Apply migration"); + + self.conn + .execute(sql_query, ()) + .await + .with_context(|| format!("failed to execute migration {}", migration_id))?; + + trace!(migration_id, "Applied migration"); + + self.update_user_version(migration_id + 1) + .await + .context("failed to update user version")?; + } + + info!("Migration complete"); + } + None => { + warn!(user_version, "user_version is set to an unexpected value"); + } + _ => { + debug!(user_version, "Database is already up to date"); + } + } + + Ok(()) + } + + async fn query_user_version(&self) -> anyhow::Result { + let sql_query = "PRAGMA user_version"; + + trace!(%sql_query, "Query user_version"); + + let row = self + .conn + .query(sql_query, ()) + .await + .context("failed to execute SQL query")? + .next() + .await + .context("failed to read the row")? 
+ .context("no row returned")?; + + let value = row.get::(0).context("failed to read user_version value")?; + + Ok(usize::try_from(value).expect("number not too big")) + } + + async fn update_user_version(&self, value: usize) -> anyhow::Result<()> { + let value = u64::try_from(value).expect("number not too big"); + + let sql_query = format!("PRAGMA user_version = {value}"); + + trace!(%sql_query, "Update user_version"); + + self.conn + .execute(&sql_query, ()) + .await + .context("failed to execute SQL query")?; + + Ok(()) + } +} + +#[async_trait] +impl JobQueue for LibSqlJobQueue { + async fn setup(&self) -> anyhow::Result<()> { + self.apply_pragmas().await?; + self.migrate().await?; + Ok(()) + } + + async fn reset_claimed_jobs(&self) -> anyhow::Result<()> { + let sql_query = "UPDATE job_queue SET status = :queued_status WHERE status = :running_status"; + + let params = ( + (":running_status", JobStatus::Running as u32), + (":queued_status", JobStatus::Queued as u32), + ); + + trace!(%sql_query, ?params, "Reset claimed jobs"); + + let changed_count = self + .conn + .execute(sql_query, params) + .await + .context("failed to execute SQL query")?; + + trace!(changed_count, "Jobs reset with success"); + + Ok(()) + } + + async fn push_job(&self, job: &DynJob, schedule_for: Option) -> anyhow::Result<()> { + let sql_query = "INSERT INTO job_queue + (id, scheduled_for, failed_attempts, status, name, def) + VALUES (:id, :scheduled_for, :failed_attempts, :status, :name, jsonb(:def))"; + + // UUID v4 only provides randomness, which leads to fragmentation. + // We use ULID instead to reduce index fragmentation. 
+ // https://github.com/ulid/spec + let id = Uuid::from(Ulid::new()).to_string(); + + let schedule_for = schedule_for.unwrap_or_else(|| OffsetDateTime::now_utc()); + + let params = ( + (":id", id), + (":scheduled_for", schedule_for.unix_timestamp()), + (":failed_attempts", 0), + (":status", JobStatus::Queued as u32), + (":name", job.name()), + (":def", job.write_json()?), + ); + + trace!(%sql_query, ?params, "Pushing a new job"); + + self.conn + .execute(sql_query, params) + .await + .context("failed to execute SQL query")?; + + // Notify the waker that a new job is ready for processing. + self.runner_waker.wake(); + + Ok(()) + } + + async fn claim_jobs(&self, reader: &dyn JobReader, number_of_jobs: usize) -> anyhow::Result> { + let number_of_jobs = u32::try_from(number_of_jobs).context("number_of_jobs is too big")?; + + // If we were using Postgres, we would need to use `FOR UPDATE SKIP LOCKED` + // in the SQL query to avoid blocking other readers/writers. + // For MySQL, this would be `FOR UPDATE NOWAIT` + // However, in SQLite / libSQL, there is only a single writer at a time. + // As such, this directive doesn't exist. 
+ + let sql_query = "UPDATE job_queue + SET status = :running_status + WHERE id IN ( + SELECT id + FROM job_queue + WHERE status = :queued_status AND failed_attempts < :max_attempts AND scheduled_for <= unixepoch() + ORDER BY id + LIMIT :number_of_jobs + ) + RETURNING id, failed_attempts, name, json(def) as def"; + + let params = ( + (":running_status", JobStatus::Running as u32), + (":queued_status", JobStatus::Queued as u32), + (":max_attempts", self.max_attempts), + (":number_of_jobs", number_of_jobs), + ); + + trace!(%sql_query, ?params, "Claiming jobs"); + + let mut rows = self + .conn + .query(sql_query, params) + .await + .context("failed to execute SQL query")?; + + let mut jobs = Vec::new(); + + loop { + let row = rows.next().await; + + let row = match row { + Ok(row) => row, + Err(error) => { + error!(%error, "Failed to get next row"); + break; + } + }; + + let Some(row) = row else { + break; + }; + + match libsql::de::from_row::<'_, JobModel>(&row) { + Ok(model) => match reader.read_json(&model.name, &model.def) { + Ok(job) => jobs.push(JobCtx { + id: model.id, + failed_attempts: model.failed_attempts, + job, + }), + Err(e) => { + error!( + error = format!("{e:#}"), + "Failed read job definition; delete the invalid job" + ); + let _ = self.delete_job(model.id).await; + } + }, + Err(error) => { + error!(%error, ?row, "Failed to read row"); + } + } + } + + return Ok(jobs); + + #[derive(serde::Deserialize, Debug, Clone)] + struct JobModel { + id: Uuid, + failed_attempts: u32, + name: String, + def: String, + } + } + + async fn delete_job(&self, id: Uuid) -> anyhow::Result<()> { + let sql_query = "DELETE FROM job_queue WHERE id = $1"; + let params = [id.to_string()]; + + trace!(%sql_query, ?params, "Deleting job"); + + self.conn + .execute(sql_query, params) + .await + .context("failed to execute SQL query")?; + + Ok(()) + } + + async fn fail_job(&self, id: Uuid, schedule_for: OffsetDateTime) -> anyhow::Result<()> { + let sql_query = "UPDATE job_queue + SET 
+            status = :queued_status,
+            failed_attempts = failed_attempts + 1,
+            scheduled_for = :scheduled_for
+        WHERE id = :id";
+
+        let params = (
+            (":queued_status", JobStatus::Queued as u32),
+            (":scheduled_for", schedule_for.unix_timestamp()),
+            (":id", id.to_string()),
+        );
+
+        trace!(%sql_query, ?params, "Marking job as failed");
+
+        self.conn
+            .execute(sql_query, params)
+            .await
+            .context("failed to execute SQL query")?;
+
+        Ok(())
+    }
+
+    async fn clear_failed(&self) -> anyhow::Result<()> {
+        let sql_query = "DELETE FROM job_queue WHERE failed_attempts >= $1";
+        let params = [self.max_attempts];
+
+        trace!(%sql_query, ?params, "Clearing failed jobs");
+
+        let deleted_count = self
+            .conn
+            .execute(sql_query, params)
+            .await
+            .context("failed to execute SQL query")?;
+
+        trace!(deleted_count, "Cleared failed jobs with success");
+
+        Ok(())
+    }
+
+    async fn next_scheduled_date(&self) -> anyhow::Result<Option<OffsetDateTime>> {
+        let sql_query = "SELECT scheduled_for
+            FROM job_queue
+            WHERE status = :queued_status AND failed_attempts < :max_attempts
+            ORDER BY scheduled_for ASC
+            LIMIT 1";
+
+        let params = (
+            (":queued_status", JobStatus::Queued as u32),
+            (":max_attempts", self.max_attempts),
+        );
+
+        trace!(%sql_query, ?params, "Fetching the earliest scheduled_for date");
+
+        let mut rows = self
+            .conn
+            .query(sql_query, params)
+            .await
+            .context("failed to execute SQL query")?;
+
+        let Some(row) = rows.next().await.context("failed to read the row")? else {
+            return Ok(None);
+        };
+
+        let scheduled_for = row.get::<i64>(0).context("failed to read scheduled_for value")?;
+        let scheduled_for =
+            OffsetDateTime::from_unix_timestamp(scheduled_for).context("invalid UNIX timestamp for scheduled_for")?;
+
+        Ok(Some(scheduled_for))
+    }
+}
+
+// Typically, migrations should not be modified once released, and we should only be appending to this list.
+const MIGRATIONS: &[&str] = &[
+    "CREATE TABLE job_queue (
+        id TEXT NOT NULL PRIMARY KEY,
+        created_at INT NOT NULL DEFAULT (unixepoch()),
+        updated_at INT NOT NULL DEFAULT (unixepoch()),
+        scheduled_for INT NOT NULL,
+        failed_attempts INT NOT NULL,
+        status INT NOT NULL,
+        name TEXT NOT NULL,
+        def BLOB NOT NULL
+    ) STRICT",
+    "CREATE TRIGGER update_job_updated_at_on_update AFTER UPDATE ON job_queue
+    BEGIN
+        UPDATE job_queue SET updated_at = unixepoch() WHERE id == NEW.id;
+    END",
+    "CREATE INDEX idx_scheduled_for ON job_queue(scheduled_for)",
+];
diff --git a/crates/job-queue/Cargo.toml b/crates/job-queue/Cargo.toml
new file mode 100644
index 00000000..ec87b437
--- /dev/null
+++ b/crates/job-queue/Cargo.toml
@@ -0,0 +1,16 @@
+[package]
+name = "job-queue"
+version = "0.0.0"
+edition = "2021"
+authors = ["Devolutions Inc. <infos@devolutions.net>"]
+publish = false
+
+[lints]
+workspace = true
+
+[dependencies]
+anyhow = "1"
+async-trait = "0.1"
+time = { version = "0.3", default-features = false }
+tracing = "0.1"
+uuid = "1.11"
diff --git a/crates/job-queue/src/lib.rs b/crates/job-queue/src/lib.rs
new file mode 100644
index 00000000..93232bd8
--- /dev/null
+++ b/crates/job-queue/src/lib.rs
@@ -0,0 +1,213 @@
+#[macro_use]
+extern crate tracing;
+
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use time::OffsetDateTime;
+use uuid::Uuid;
+
+pub type DynJob = Box<dyn Job>;
+
+pub type DynJobQueue = Arc<dyn JobQueue>;
+
+#[async_trait]
+pub trait Job: Send + Sync {
+    fn name(&self) -> &str;
+
+    fn write_json(&self) -> anyhow::Result<String>;
+
+    /// Run the associated job
+    ///
+    /// You should assume that the execution could be stopped at any point and write cancel-safe code.
+    async fn run(&mut self) -> anyhow::Result<()>;
+}
+
+pub trait JobReader: Send + Sync {
+    fn read_json(&self, name: &str, json: &str) -> anyhow::Result<DynJob>;
+}
+
+#[async_trait]
+pub trait JobQueue: Send + Sync {
+    /// Performs initial setup required before actually using the queue
+    ///
+    /// This function should be called first, before using any of the other functions.
+    async fn setup(&self) -> anyhow::Result<()>;
+
+    /// Resets the status for the jobs claimed
+    ///
+    /// Uses this at startup to re-enqueue jobs that didn't run to completion.
+    async fn reset_claimed_jobs(&self) -> anyhow::Result<()>;
+
+    /// Pushes a new job into the queue
+    ///
+    /// This function should ideally call `RunnerWaker::wake()` once the job is enqueued.
+    async fn push_job(&self, job: &DynJob, schedule_for: Option<OffsetDateTime>) -> anyhow::Result<()>;
+
+    /// Fetches at most `number_of_jobs` from the queue
+    async fn claim_jobs(&self, reader: &dyn JobReader, number_of_jobs: usize) -> anyhow::Result<Vec<JobCtx>>;
+
+    /// Removes a job from the queue
+    async fn delete_job(&self, job_id: Uuid) -> anyhow::Result<()>;
+
+    /// Marks a job as failed
+    ///
+    /// Failed jobs are re-queued to be tried again later.
+    async fn fail_job(&self, job_id: Uuid, schedule_for: OffsetDateTime) -> anyhow::Result<()>;
+
+    /// Removes jobs which can't be retried
+    async fn clear_failed(&self) -> anyhow::Result<()>;
+
+    /// Retrieves the closest future scheduled date
+    async fn next_scheduled_date(&self) -> anyhow::Result<Option<OffsetDateTime>>;
+}
+
+pub struct JobCtx {
+    pub id: Uuid,
+    pub failed_attempts: u32,
+    pub job: DynJob,
+}
+
+#[derive(Clone)]
+pub struct RunnerWaker(Arc<dyn Fn() + Send + Sync>);
+
+impl RunnerWaker {
+    pub fn new<F: Fn() + Send + Sync + 'static>(f: F) -> Self {
+        Self(Arc::new(f))
+    }
+
+    pub fn wake(&self) {
+        (self.0)()
+    }
+}
+
+pub type SpawnCallback = Box<dyn FnOnce(anyhow::Result<()>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send>;
+
+pub type DynFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
+
+pub struct JobRunner<'a> {
+    pub queue: DynJobQueue,
+    pub reader: &'a dyn JobReader,
+    pub spawn: &'a (dyn Fn(JobCtx, SpawnCallback) + Sync),
+    pub sleep: &'a (dyn Fn(std::time::Duration) -> DynFuture + Sync),
+    pub wait_notified: &'a (dyn Fn() -> DynFuture + Sync),
+    pub wait_notified_timeout: &'a (dyn Fn(std::time::Duration) -> DynFuture + Sync),
+    pub waker: RunnerWaker,
+    pub max_batch_size: usize,
+}
+
+impl JobRunner<'_> {
+    pub async fn run(self) {
+        use std::sync::atomic::{AtomicUsize, Ordering};
+        use std::time::{Duration, Instant};
+
+        const MINIMUM_WAIT_DURATION: Duration = Duration::from_millis(200);
+
+        let Self {
+            queue,
+            reader,
+            spawn,
+            sleep,
+            waker,
+            wait_notified,
+            wait_notified_timeout,
+            max_batch_size,
+        } = self;
+
+        let running_count: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
+
+        loop {
+            let batch_size = max_batch_size - running_count.load(Ordering::SeqCst);
+
+            let jobs = match queue.claim_jobs(reader, batch_size).await {
+                Ok(jobs) => jobs,
+                Err(e) => {
+                    error!(error = format!("{e:#}"), "Failed to pull jobs");
+                    (sleep)(Duration::from_secs(30)).await;
+                    continue;
+                }
+            };
+
+            trace!(number_of_jobs = jobs.len(), "Fetched jobs");
+
+            for job in jobs {
+                let job_id = job.id;
+                let failed_attempts = job.failed_attempts;
+
+                let callback = Box::new({
+                    let queue = Arc::clone(&queue);
+
let running_count = Arc::clone(&running_count);
+                    let waker = waker.clone();
+
+                    move |result: anyhow::Result<()>| {
+                        let fut = async move {
+                            match result {
+                                Ok(()) => {
+                                    if let Err(e) = queue.delete_job(job_id).await {
+                                        error!(error = format!("{e:#}"), "Failed to delete job");
+                                    }
+                                }
+                                Err(e) => {
+                                    warn!(error = format!("{e:#}"), %job_id, "Job failed");
+
+                                    let schedule_for =
+                                        OffsetDateTime::now_utc() + (1 << failed_attempts) * Duration::from_secs(30);
+
+                                    if let Err(e) = queue.fail_job(job_id, schedule_for).await {
+                                        error!(error = format!("{e:#}"), "Failed to mark job as failed")
+                                    }
+                                }
+                            }
+
+                            running_count.fetch_sub(1, Ordering::SeqCst);
+                            waker.wake();
+                        };
+
+                        (Box::new(fut) as Box<dyn Future<Output = ()> + Send>).into()
+                    }
+                });
+
+                (spawn)(job, callback);
+
+                running_count.fetch_add(1, Ordering::SeqCst);
+            }
+
+            let next_scheduled = if running_count.load(Ordering::SeqCst) < max_batch_size {
+                queue
+                    .next_scheduled_date()
+                    .await
+                    .ok()
+                    .flatten()
+                    .map(|date| date.unix_timestamp() - OffsetDateTime::now_utc().unix_timestamp())
+                    .inspect(|next_scheduled| trace!("Next task in {next_scheduled} seconds"))
+            } else {
+                None
+            };
+
+            let before_wait = Instant::now();
+
+            // Wait for something to happen.
+            // This could be a notification that a new job has been pushed, or that a running job is terminated.
+            if let Some(timeout) = next_scheduled {
+                // If the next task was scheduled in < 0 seconds, skip the wait step.
+                // This happens because there is a delay between the moment the jobs to run are claimed, and the moment
+                // we check for the next closest scheduled job.
+                if let Ok(timeout) = u64::try_from(timeout) {
+                    (wait_notified_timeout)(Duration::from_secs(timeout)).await;
+                }
+            } else {
+                (wait_notified)().await;
+            }
+
+            let elapsed = before_wait.elapsed();
+
+            // Make sure we wait a little bit to avoid overloading the database.
+ if elapsed < MINIMUM_WAIT_DURATION { + let sleep_duration = MINIMUM_WAIT_DURATION - elapsed; + (sleep)(sleep_duration).await; + } + } + } +} diff --git a/devolutions-gateway/Cargo.toml b/devolutions-gateway/Cargo.toml index 3a414bd0..167fb365 100644 --- a/devolutions-gateway/Cargo.toml +++ b/devolutions-gateway/Cargo.toml @@ -24,6 +24,8 @@ jmux-proxy = { path = "../crates/jmux-proxy" } devolutions-agent-shared = { path = "../crates/devolutions-agent-shared" } devolutions-gateway-task = { path = "../crates/devolutions-gateway-task" } devolutions-log = { path = "../crates/devolutions-log" } +job-queue = { path = "../crates/job-queue" } +job-queue-libsql = { path = "../crates/job-queue-libsql" } ironrdp-pdu = { version = "0.1", git = "https://github.com/Devolutions/IronRDP", rev = "7c268d863048d0a9182b3f7bf778668de8db4ccf", features = ["std"] } ironrdp-core = { version = "0.1", git = "https://github.com/Devolutions/IronRDP", rev = "7c268d863048d0a9182b3f7bf778668de8db4ccf", features = ["std"] } ironrdp-rdcleanpath = { version = "0.1", git = "https://github.com/Devolutions/IronRDP", rev = "7c268d863048d0a9182b3f7bf778668de8db4ccf" } diff --git a/devolutions-gateway/src/api/jrec.rs b/devolutions-gateway/src/api/jrec.rs index c7c6d288..6753ffb7 100644 --- a/devolutions-gateway/src/api/jrec.rs +++ b/devolutions-gateway/src/api/jrec.rs @@ -158,8 +158,8 @@ pub(crate) struct DeleteManyResult { /// Mass-deletes recordings stored on this instance /// -/// If you try to delete more than 1,000,000 recordings at once, you should split the list into multiple requests -/// to avoid timing out during the processing of the request. +/// If you try to delete more than 50,000 recordings at once, you should split the list into multiple requests. +/// Bigger payloads will be rejected with 413 Payload Too Large. 
/// /// The request processing consist in /// 1) checking if one of the recording is active, @@ -182,6 +182,7 @@ pub(crate) struct DeleteManyResult { (status = 401, description = "Invalid or missing authorization token"), (status = 403, description = "Insufficient permissions"), (status = 406, description = "A recording is still ongoing and can't be deleted yet (nothing is deleted)"), + (status = 413, description = "Request payload is too large"), ), security(("scope_token" = ["gateway.recording.delete"])), ))] @@ -189,6 +190,7 @@ async fn jrec_delete_many( State(DgwState { conf_handle, recordings, + job_queue_handle, .. }): State, _scope: RecordingDeleteScope, @@ -196,38 +198,36 @@ async fn jrec_delete_many( ) -> Result, HttpError> { use std::collections::HashSet; - const BLOCKING_THRESHOLD: usize = 100_000; + const THRESHOLD: usize = 50_000; + const CHUNK_SIZE: usize = 1_000; + + if delete_list.len() > THRESHOLD { + return Err(HttpErrorBuilder::new(StatusCode::PAYLOAD_TOO_LARGE).msg("delete list is too big")); + } let recording_path = conf_handle.get_conf().recording_path.clone(); let active_recordings = recordings.active_recordings.cloned(); - // When deleting many many recordings, check_preconditions may take more than 250ms to execute. - // For this reason, we defensively spawn a blocking task. + // Given the threshold of 50,000, it's high unlikely that check_preconditions takes more than 250ms to execute. + // It typically takes between 50ms and 100ms depending on the hardware. let ProcessResult { not_found_count, found_count, recording_paths, - } = if delete_list.len() > BLOCKING_THRESHOLD { - let join_handle = - tokio::task::spawn_blocking(move || process_request(delete_list, &recording_path, &active_recordings)); - join_handle.await.map_err(HttpError::internal().err())?? - } else { - process_request(delete_list, &recording_path, &active_recordings)? 
- }; + } = process_request(delete_list, &recording_path, &active_recordings)?; - // FIXME: It would be better to have a job queue for this kind of things in case the service is killed. - tokio::spawn({ - async move { - for (session_id, path) in recording_paths { - if let Err(error) = delete_recording(&path).await { - error!( - error = format!("{error:#}"), - "Failed to delete recording for session {session_id}" - ); - } - } - } - }); + for chunk in recording_paths.chunks(CHUNK_SIZE) { + job_queue_handle + .enqueue(DeleteRecordingsJob { + recording_paths: chunk.to_vec(), + }) + .await + .map_err( + HttpError::internal() + .with_msg("couldn't enqueue the deletion task") + .err(), + )?; + } let delete_many_result = DeleteManyResult { found_count, @@ -283,6 +283,39 @@ async fn jrec_delete_many( } } +#[derive(Deserialize, Serialize)] +pub struct DeleteRecordingsJob { + recording_paths: Vec<(Uuid, Utf8PathBuf)>, +} + +impl DeleteRecordingsJob { + pub const NAME: &'static str = "delete-recordings"; +} + +#[axum::async_trait] +impl job_queue::Job for DeleteRecordingsJob { + fn name(&self) -> &str { + Self::NAME + } + + fn write_json(&self) -> anyhow::Result { + serde_json::to_string(self).context("failed to serialize RemuxAction") + } + + async fn run(&mut self) -> anyhow::Result<()> { + for (session_id, path) in core::mem::take(&mut self.recording_paths) { + if let Err(error) = delete_recording(&path).await { + debug!( + error = format!("{error:#}"), + "Failed to delete recording for session {session_id}" + ); + } + } + + Ok(()) + } +} + async fn delete_recording(recording_path: &Utf8Path) -> anyhow::Result<()> { info!(%recording_path, "Delete recording"); diff --git a/devolutions-gateway/src/config.rs b/devolutions-gateway/src/config.rs index 7563a05e..466773cc 100644 --- a/devolutions-gateway/src/config.rs +++ b/devolutions-gateway/src/config.rs @@ -72,6 +72,7 @@ pub struct Conf { pub listeners: Vec, pub subscriber: Option, pub log_file: Utf8PathBuf, + pub 
job_queue_database: Utf8PathBuf, pub tls: Option, pub provisioner_public_key: PublicKey, pub provisioner_private_key: Option, @@ -221,6 +222,12 @@ impl Conf { .unwrap_or_else(|| Utf8PathBuf::from("gateway")) .pipe_ref(|path| normalize_data_path(path, &data_dir)); + let job_queue_database = conf_file + .job_queue_database + .clone() + .unwrap_or_else(|| Utf8PathBuf::from("job_queue.db")) + .pipe_ref(|path| normalize_data_path(path, &data_dir)); + let jrl_file = conf_file .jrl_file .clone() @@ -277,6 +284,7 @@ impl Conf { listeners, subscriber: conf_file.subscriber.clone(), log_file, + job_queue_database, tls, provisioner_public_key, provisioner_private_key, @@ -1000,6 +1008,10 @@ pub mod dto { #[serde(skip_serializing_if = "Option::is_none")] pub sogar: Option, + /// (Unstable) Path to the SQLite database file for the job queue + #[serde(skip_serializing_if = "Option::is_none")] + pub job_queue_database: Option, + /// (Unstable) Unsafe debug options for developers #[serde(rename = "__debug__", skip_serializing_if = "Option::is_none")] pub debug: Option, @@ -1049,6 +1061,7 @@ pub mod dto { recording_path: None, web_app: None, sogar: None, + job_queue_database: None, debug: None, rest: serde_json::Map::new(), } @@ -1076,7 +1089,7 @@ pub mod dto { match self { VerbosityProfile::Default => "info", VerbosityProfile::Debug => { - "info,devolutions_gateway=debug,devolutions_gateway::api=trace,jmux_proxy=debug,tower_http=trace" + "info,devolutions_gateway=debug,devolutions_gateway::api=trace,jmux_proxy=debug,tower_http=trace,job_queue=trace,job_queue_libsql=trace" } VerbosityProfile::Tls => { "info,devolutions_gateway=debug,devolutions_gateway::tls=trace,rustls=trace,tokio_rustls=debug" diff --git a/devolutions-gateway/src/job_queue.rs b/devolutions-gateway/src/job_queue.rs new file mode 100644 index 00000000..38893ebb --- /dev/null +++ b/devolutions-gateway/src/job_queue.rs @@ -0,0 +1,301 @@ +use std::sync::Arc; +use std::time::Duration; +use std::{future::Future, 
path::Path};
+
+use anyhow::Context as _;
+use axum::async_trait;
+use devolutions_gateway_task::{ChildTask, ShutdownSignal, Task};
+use job_queue::{DynJobQueue, Job, JobCtx, JobQueue, JobReader, JobRunner, RunnerWaker};
+use job_queue_libsql::libsql;
+use time::OffsetDateTime;
+use tokio::sync::{mpsc, Notify};
+
+pub struct JobQueueCtx {
+    notify_runner: Arc<Notify>,
+    runner_waker: RunnerWaker,
+    queue: DynJobQueue,
+    job_queue_rx: JobQueueReceiver,
+    pub job_queue_handle: JobQueueHandle,
+}
+
+pub struct JobMessage {
+    pub job: Box<dyn Job>,
+    pub schedule_for: Option<OffsetDateTime>,
+}
+
+#[derive(Clone)]
+pub struct JobQueueHandle(mpsc::Sender<JobMessage>);
+
+pub type JobQueueReceiver = mpsc::Receiver<JobMessage>;
+
+pub struct JobQueueTask {
+    queue: DynJobQueue,
+    job_queue_rx: JobQueueReceiver,
+}
+
+pub struct JobRunnerTask {
+    notify_runner: Arc<Notify>,
+    runner_waker: RunnerWaker,
+    queue: DynJobQueue,
+}
+
+impl JobQueueCtx {
+    pub async fn init(database_path: &Path) -> anyhow::Result<Self> {
+        let notify_runner = Arc::new(Notify::new());
+
+        let runner_waker = RunnerWaker::new({
+            let notify_runner = Arc::clone(&notify_runner);
+            move || notify_runner.notify_one()
+        });
+
+        let database = libsql::Builder::new_local(database_path)
+            .build()
+            .await
+            .context("build database")?;
+
+        let conn = database.connect().context("open database connection")?;
+
+        let queue = job_queue_libsql::LibSqlJobQueue::builder()
+            .runner_waker(runner_waker.clone())
+            .conn(conn)
+            .build();
+
+        let queue = Arc::new(queue);
+
+        queue.setup().await.context("queue setup")?;
+
+        queue
+            .reset_claimed_jobs()
+            .await
+            .context("failed to reset claimed jobs")?;
+
+        queue.clear_failed().await.context("failed to clear failed jobs")?;
+
+        let (handle, rx) = JobQueueHandle::new();
+
+        Ok(Self {
+            notify_runner,
+            runner_waker,
+            queue,
+            job_queue_rx: rx,
+            job_queue_handle: handle,
+        })
+    }
+}
+
+impl JobQueueHandle {
+    pub fn new() -> (Self, JobQueueReceiver) {
+        let (tx, rx) = mpsc::channel(512);
+        (Self(tx), rx)
+    }
+
+    pub fn 
blocking_enqueue<T: Job + 'static>(&self, job: T) -> anyhow::Result<()> {
+        self.0
+            .blocking_send(JobMessage {
+                job: Box::new(job),
+                schedule_for: None,
+            })
+            .context("couldn't enqueue job")
+    }
+
+    pub async fn enqueue<T: Job + 'static>(&self, job: T) -> anyhow::Result<()> {
+        self.0
+            .send(JobMessage {
+                job: Box::new(job),
+                schedule_for: None,
+            })
+            .await
+            .context("couldn't enqueue job")
+    }
+
+    pub fn blocking_schedule<T: Job + 'static>(
+        &self,
+        job: T,
+        schedule_for: OffsetDateTime,
+    ) -> anyhow::Result<()> {
+        self.0
+            .blocking_send(JobMessage {
+                job: Box::new(job),
+                schedule_for: Some(schedule_for),
+            })
+            .context("couldn't enqueue job")
+    }
+
+    pub async fn schedule<T: Job + 'static>(&self, job: T, schedule_for: OffsetDateTime) -> anyhow::Result<()> {
+        self.0
+            .send(JobMessage {
+                job: Box::new(job),
+                schedule_for: Some(schedule_for),
+            })
+            .await
+            .context("couldn't enqueue job")
+    }
+}
+
+impl JobQueueTask {
+    pub fn new(ctx: JobQueueCtx) -> Self {
+        Self {
+            queue: ctx.queue,
+            job_queue_rx: ctx.job_queue_rx,
+        }
+    }
+}
+
+#[async_trait]
+impl Task for JobQueueTask {
+    type Output = anyhow::Result<()>;
+
+    const NAME: &'static str = "job queue";
+
+    async fn run(self, shutdown_signal: ShutdownSignal) -> Self::Output {
+        job_queue_task(self, shutdown_signal).await
+    }
+}
+
+#[instrument(skip_all)]
+async fn job_queue_task(ctx: JobQueueTask, mut shutdown_signal: ShutdownSignal) -> anyhow::Result<()> {
+    debug!("Task started");
+
+    let JobQueueTask {
+        queue,
+        mut job_queue_rx,
+    } = ctx;
+
+    loop {
+        tokio::select! 
{
+            msg = job_queue_rx.recv() => {
+                let Some(msg) = msg else {
+                    debug!("All senders are dead");
+                    break;
+                };
+
+                ChildTask::spawn({
+                    let queue = Arc::clone(&queue);
+
+                    async move {
+                        for _ in 0..5 {
+                            match queue.push_job(&msg.job, msg.schedule_for).await {
+                                Ok(()) => break,
+                                Err(e) => {
+                                    warn!(error = format!("{e:#}"), "Failed to push job");
+                                    tokio::time::sleep(Duration::from_secs(20)).await;
+                                }
+                            }
+                        }
+                    }
+                })
+                .detach();
+            }
+            () = shutdown_signal.wait() => break,
+        }
+    }
+
+    debug!("Task terminated");
+
+    Ok(())
+}
+
+impl JobRunnerTask {
+    pub fn new(ctx: &JobQueueCtx) -> Self {
+        Self {
+            notify_runner: Arc::clone(&ctx.notify_runner),
+            runner_waker: RunnerWaker::clone(&ctx.runner_waker),
+            queue: Arc::clone(&ctx.queue),
+        }
+    }
+}
+
+#[async_trait]
+impl Task for JobRunnerTask {
+    type Output = anyhow::Result<()>;
+
+    const NAME: &'static str = "job queue";
+
+    async fn run(self, shutdown_signal: ShutdownSignal) -> Self::Output {
+        job_runner_task(self, shutdown_signal).await
+    }
+}
+
+#[instrument(skip_all)]
+async fn job_runner_task(ctx: JobRunnerTask, mut shutdown_signal: ShutdownSignal) -> anyhow::Result<()> {
+    debug!("Task started");
+
+    let JobRunnerTask {
+        notify_runner,
+        runner_waker,
+        queue,
+    } = ctx;
+
+    let reader = DgwJobReader;
+
+    let spawn = |mut ctx: JobCtx, callback: job_queue::SpawnCallback| {
+        tokio::spawn(async move {
+            let result = ctx.job.run().await;
+            (callback)(result).await;
+        });
+    };
+
+    let sleep =
+        |duration: Duration| (Box::new(tokio::time::sleep(duration)) as Box<dyn Future<Output = ()> + Send>).into();
+
+    let wait_notified = {
+        let notify_runner = Arc::clone(&notify_runner);
+        move || {
+            let notify_runner = Arc::clone(&notify_runner);
+            (Box::new(async move { notify_runner.notified().await }) as Box<dyn Future<Output = ()> + Send>).into()
+        }
+    };
+
+    let wait_notified_timeout = move |timeout: Duration| {
+        let notify_runner = Arc::clone(&notify_runner);
+        (Box::new(async move {
+            tokio::select! 
{
+                () = notify_runner.notified() => {}
+                () = tokio::time::sleep(timeout) => {}
+            }
+        }) as Box<dyn Future<Output = ()> + Send>)
+            .into()
+    };
+
+    let runner = JobRunner {
+        queue,
+        reader: &reader,
+        spawn: &spawn,
+        sleep: &sleep,
+        wait_notified: &wait_notified,
+        wait_notified_timeout: &wait_notified_timeout,
+        waker: runner_waker,
+        max_batch_size: 16,
+    };
+
+    tokio::select! {
+        () = runner.run() => {}
+        () = shutdown_signal.wait() => {}
+    }
+
+    debug!("Task terminated");
+
+    Ok(())
+}
+
+struct DgwJobReader;
+
+impl JobReader for DgwJobReader {
+    fn read_json(&self, name: &str, json: &str) -> anyhow::Result<DynJob> {
+        use crate::api::jrec::DeleteRecordingsJob;
+        use crate::recording::RemuxJob;
+
+        match name {
+            RemuxJob::NAME => {
+                let job: RemuxJob = serde_json::from_str(json).context("failed to deserialize RemuxJob")?;
+                Ok(Box::new(job))
+            }
+            DeleteRecordingsJob::NAME => {
+                let job: DeleteRecordingsJob =
+                    serde_json::from_str(json).context("failed to deserialize DeleteRecordingsJob")?;
+                Ok(Box::new(job))
+            }
+            _ => anyhow::bail!("unknown job name: {name}"),
+        }
+    }
+}
diff --git a/devolutions-gateway/src/lib.rs b/devolutions-gateway/src/lib.rs
index 559e774a..5caa8680 100644
--- a/devolutions-gateway/src/lib.rs
+++ b/devolutions-gateway/src/lib.rs
@@ -20,6 +20,7 @@ pub mod generic_client;
 pub mod http;
 pub mod interceptor;
 pub mod jmux;
+pub mod job_queue;
 pub mod listener;
 pub mod log;
 pub mod middleware;
@@ -48,6 +49,7 @@ pub struct DgwState {
     pub subscriber_tx: subscriber::SubscriberSender,
     pub shutdown_signal: devolutions_gateway_task::ShutdownSignal,
     pub recordings: recording::RecordingMessageSender,
+    pub job_queue_handle: job_queue::JobQueueHandle,
 }
 
 #[doc(hidden)]
@@ -55,6 +57,7 @@ pub struct MockHandles {
     pub session_manager_rx: session::SessionMessageReceiver,
     pub recording_manager_rx: recording::RecordingMessageReceiver,
     pub subscriber_rx: subscriber::SubscriberReceiver,
+    pub job_queue_rx: job_queue::JobQueueReceiver,
     pub shutdown_handle: 
devolutions_gateway_task::ShutdownHandle, } @@ -68,6 +71,7 @@ impl DgwState { let (recording_manager_handle, recording_manager_rx) = recording::recording_message_channel(); let (subscriber_tx, subscriber_rx) = subscriber::subscriber_channel(); let (shutdown_handle, shutdown_signal) = devolutions_gateway_task::ShutdownHandle::new(); + let (job_queue_handle, job_queue_rx) = job_queue::JobQueueHandle::new(); let state = Self { conf_handle, @@ -77,12 +81,14 @@ impl DgwState { subscriber_tx, shutdown_signal, recordings: recording_manager_handle, + job_queue_handle, }; let handles = MockHandles { session_manager_rx, recording_manager_rx, subscriber_rx, + job_queue_rx, shutdown_handle, }; diff --git a/devolutions-gateway/src/main.rs b/devolutions-gateway/src/main.rs index 00271b76..34551fa0 100644 --- a/devolutions-gateway/src/main.rs +++ b/devolutions-gateway/src/main.rs @@ -8,12 +8,13 @@ use rustls_cng as _; use utoipa as _; use { argon2 as _, async_trait as _, axum as _, axum_extra as _, backoff as _, bytes as _, camino as _, - devolutions_agent_shared as _, dlopen as _, dlopen_derive as _, etherparse as _, hostname as _, - http_body_util as _, hyper as _, hyper_util as _, ironrdp_pdu as _, ironrdp_rdcleanpath as _, jmux_proxy as _, - multibase as _, network_scanner as _, ngrok as _, nonempty as _, pcap_file as _, picky as _, picky_krb as _, - pin_project_lite as _, portpicker as _, reqwest as _, serde as _, serde_urlencoded as _, smol_str as _, - sysinfo as _, thiserror as _, time as _, tokio_rustls as _, tokio_tungstenite as _, tower as _, tower_http as _, - transport as _, tungstenite as _, typed_builder as _, url as _, uuid as _, zeroize as _, + devolutions_agent_shared as _, dlopen as _, dlopen_derive as _, dunce as _, etherparse as _, hostname as _, + http_body_util as _, hyper as _, hyper_util as _, ironrdp_core as _, ironrdp_pdu as _, ironrdp_rdcleanpath as _, + jmux_proxy as _, job_queue as _, job_queue_libsql as _, multibase as _, network_scanner as _, ngrok 
as _, + nonempty as _, pcap_file as _, picky as _, picky_krb as _, pin_project_lite as _, portpicker as _, reqwest as _, + serde as _, serde_urlencoded as _, smol_str as _, sysinfo as _, thiserror as _, time as _, tokio_rustls as _, + tokio_tungstenite as _, tower as _, tower_http as _, transport as _, tungstenite as _, typed_builder as _, + url as _, uuid as _, zeroize as _, }; // Used by tests. diff --git a/devolutions-gateway/src/recording.rs b/devolutions-gateway/src/recording.rs index cddafbf0..7d558532 100644 --- a/devolutions-gateway/src/recording.rs +++ b/devolutions-gateway/src/recording.rs @@ -18,6 +18,7 @@ use tokio::{fs, io}; use typed_builder::TypedBuilder; use uuid::Uuid; +use crate::job_queue::JobQueueHandle; use crate::session::SessionMessageSender; use crate::token::{JrecTokenClaims, RecordingFileType}; @@ -349,6 +350,7 @@ pub struct RecordingManagerTask { ongoing_recordings: HashMap, recordings_path: Utf8PathBuf, session_manager_handle: SessionMessageSender, + job_queue_handle: JobQueueHandle, } impl RecordingManagerTask { @@ -356,12 +358,14 @@ impl RecordingManagerTask { rx: RecordingMessageReceiver, recordings_path: Utf8PathBuf, session_manager_handle: SessionMessageSender, + job_queue_handle: JobQueueHandle, ) -> Self { Self { rx, ongoing_recordings: HashMap::new(), recordings_path, session_manager_handle, + job_queue_handle, } } @@ -469,7 +473,7 @@ impl RecordingManagerTask { Ok(recording_file) } - fn handle_disconnect(&mut self, id: Uuid) -> anyhow::Result<()> { + async fn handle_disconnect(&mut self, id: Uuid) -> anyhow::Result<()> { if let Some(ongoing) = self.ongoing_recordings.get_mut(&id) { if !matches!(ongoing.state, OnGoingRecordingState::Connected) { anyhow::bail!("a recording not connected can’t be disconnected (there is probably a bug)"); @@ -504,8 +508,13 @@ impl RecordingManagerTask { if recording_file_path.extension() == Some(RecordingFileType::WebM.extension()) { if cadeau::xmf::is_init() { debug!(%recording_file_path, "Enqueue 
video remuxing operation"); - // FIXME: It would be better to have a job queue for this kind of things in case the service is killed. - tokio::spawn(remux(recording_file_path)); + + let _ = self + .job_queue_handle + .enqueue(RemuxJob { + input_path: recording_file_path, + }) + .await; } else { debug!("Video remuxing was skipped because XMF native library is not loaded"); } @@ -628,7 +637,7 @@ async fn recording_manager_task( } }, RecordingManagerMessage::Disconnect { id } => { - if let Err(e) = manager.handle_disconnect(id) { + if let Err(e) = manager.handle_disconnect(id).await { error!(error = format!("{e:#}"), "handle_disconnect"); } @@ -691,7 +700,7 @@ async fn recording_manager_task( debug!(?msg, "Received message"); if let RecordingManagerMessage::Disconnect { id } = msg { - if let Err(e) = manager.handle_disconnect(id) { + if let Err(e) = manager.handle_disconnect(id).await { error!(error = format!("{e:#}"), "handle_disconnect"); } manager.ongoing_recordings.remove(&id); @@ -703,7 +712,32 @@ async fn recording_manager_task( Ok(()) } -pub async fn remux(input_path: Utf8PathBuf) { +#[derive(Deserialize, Serialize)] +pub struct RemuxJob { + input_path: Utf8PathBuf, +} + +impl RemuxJob { + pub const NAME: &'static str = "remux"; +} + +#[async_trait] +impl job_queue::Job for RemuxJob { + fn name(&self) -> &str { + Self::NAME + } + + fn write_json(&self) -> anyhow::Result { + serde_json::to_string(self).context("failed to serialize RemuxAction") + } + + async fn run(&mut self) -> anyhow::Result<()> { + remux(core::mem::take(&mut self.input_path)).await; + Ok(()) + } +} + +async fn remux(input_path: Utf8PathBuf) { // CPU-intensive operation potentially lasting much more than 100ms. 
match tokio::task::spawn_blocking(move || remux_impl(input_path)).await { Err(error) => error!(%error, "Couldn't join the CPU-intensive muxer task"), diff --git a/devolutions-gateway/src/service.rs b/devolutions-gateway/src/service.rs index a0fe4a18..898e8567 100644 --- a/devolutions-gateway/src/service.rs +++ b/devolutions-gateway/src/service.rs @@ -232,6 +232,9 @@ async fn spawn_tasks(conf_handle: ConfHandle) -> anyhow::Result { let (recording_manager_handle, recording_manager_rx) = recording_message_channel(); let (subscriber_tx, subscriber_rx) = subscriber_channel(); let mut tasks = Tasks::new(); + let job_queue_ctx = devolutions_gateway::job_queue::JobQueueCtx::init(conf.job_queue_database.as_std_path()) + .await + .context("failed to initialize job queue context")?; let state = DgwState { conf_handle: conf_handle.clone(), @@ -241,6 +244,7 @@ async fn spawn_tasks(conf_handle: ConfHandle) -> anyhow::Result { subscriber_tx: subscriber_tx.clone(), shutdown_signal: tasks.shutdown_signal.clone(), recordings: recording_manager_handle.clone(), + job_queue_handle: job_queue_ctx.job_queue_handle.clone(), }; conf.listeners @@ -294,8 +298,13 @@ async fn spawn_tasks(conf_handle: ConfHandle) -> anyhow::Result { recording_manager_rx, conf.recording_path.clone(), session_manager_handle, + job_queue_ctx.job_queue_handle.clone(), )); + tasks.register(devolutions_gateway::job_queue::JobRunnerTask::new(&job_queue_ctx)); + + tasks.register(devolutions_gateway::job_queue::JobQueueTask::new(job_queue_ctx)); + Ok(tasks) } diff --git a/devolutions-gateway/tests/config.rs b/devolutions-gateway/tests/config.rs index 31c04ffb..cc0691af 100644 --- a/devolutions-gateway/tests/config.rs +++ b/devolutions-gateway/tests/config.rs @@ -85,6 +85,7 @@ fn hub_sample() -> Sample { plugins: None, recording_path: None, sogar: None, + job_queue_database: None, ngrok: None, verbosity_profile: Some(VerbosityProfile::Tls), web_app: None, @@ -127,6 +128,7 @@ fn legacy_sample() -> Sample { plugins: None, 
recording_path: None, sogar: None, + job_queue_database: None, ngrok: None, verbosity_profile: None, web_app: None, @@ -168,6 +170,7 @@ fn system_store_sample() -> Sample { plugins: None, recording_path: None, sogar: None, + job_queue_database: None, ngrok: None, verbosity_profile: None, web_app: None, @@ -234,6 +237,7 @@ fn standalone_custom_auth_sample() -> Sample { plugins: None, recording_path: None, sogar: None, + job_queue_database: None, ngrok: None, verbosity_profile: None, web_app: Some(WebAppConf { @@ -307,6 +311,7 @@ fn standalone_no_auth_sample() -> Sample { plugins: None, recording_path: None, sogar: None, + job_queue_database: None, ngrok: None, verbosity_profile: None, web_app: Some(WebAppConf {