From ae417ce8ccbc65e26062fb53690ebf0f7b2853e4 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Sun, 21 Jan 2024 12:39:49 +0800 Subject: [PATCH] feat: collab plugin wasm (#150) * refactor: KVTransactionDB * refactor: KVTransactionDB * chore: add indexeddb * test: add indexeddb test * chore: impl stream * chore: add test * chore: add test * ci: add test ci * ci: add test ci --- .github/workflows/wasm_test.yml | 41 ++ Cargo.lock | 275 ++++++++++-- Makefile.toml | 23 +- collab-database/Cargo.toml | 6 +- collab-database/src/blocks/block.rs | 1 + collab-database/src/blocks/task_controller.rs | 2 +- collab-database/src/id_gen/gen.rs | 16 +- collab-database/src/rows/row.rs | 1 + collab-database/src/user/user_db.rs | 1 + collab-database/tests/database_test/helper.rs | 2 +- .../tests/database_test/restore_test.rs | 2 +- collab-database/tests/helper/util.rs | 2 +- .../tests/user_test/async_test/script.rs | 3 +- collab-database/tests/user_test/helper.rs | 2 +- collab-document/Cargo.toml | 2 +- .../tests/document/restore_test.rs | 2 +- collab-document/tests/util.rs | 2 +- collab-folder/tests/folder_test/util.rs | 4 +- collab-plugins/Cargo.toml | 16 +- .../src/cloud_storage/postgres/plugin.rs | 2 +- collab-plugins/src/lib.rs | 25 +- .../indexeddb/indexeddb_plugin.rs | 177 ++++++++ .../src/local_storage/indexeddb/kv_impl.rs | 413 ++++++++++++++++++ .../src/local_storage/indexeddb/mod.rs | 2 + collab-plugins/src/local_storage/kv/db.rs | 19 +- collab-plugins/src/local_storage/kv/doc.rs | 27 +- collab-plugins/src/local_storage/kv/error.rs | 20 +- collab-plugins/src/local_storage/kv/mod.rs | 2 +- collab-plugins/src/local_storage/kv/oid.rs | 22 +- .../src/local_storage/kv/snapshot.rs | 2 +- collab-plugins/src/local_storage/mod.rs | 36 +- .../src/local_storage/rocksdb/kv_impl.rs | 85 ++-- .../local_storage/rocksdb/rocksdb_plugin.rs | 1 + .../local_storage/rocksdb/snapshot_plugin.rs | 2 +- .../src/local_storage/storage_config.rs | 34 ++ collab-plugins/tests/disk/insert_test.rs | 1 + collab-plugins/tests/disk/range_test.rs | 4 +- collab-plugins/tests/disk/restore_test.rs | 5 +- collab-plugins/tests/disk/script.rs | 3 +- collab-plugins/tests/disk/util.rs | 2 +- collab-plugins/tests/main.rs | 6 +- collab-plugins/tests/web/edit_collab_test.rs | 106 +++++ collab-plugins/tests/web/indexeddb_test.rs | 97 ++++ collab-plugins/tests/web/mod.rs | 5 + collab-plugins/tests/web/setup_tests.js | 6 + collab-plugins/tests/web/test.md | 17 + collab/Cargo.toml | 3 + collab/src/core/transaction.rs | 48 +- collab/src/lib.rs | 16 + 49 files changed, 1425 insertions(+), 166 deletions(-) create mode 100644 .github/workflows/wasm_test.yml create mode 100644 collab-plugins/src/local_storage/indexeddb/indexeddb_plugin.rs create mode 100644 collab-plugins/src/local_storage/indexeddb/kv_impl.rs create mode 100644 collab-plugins/src/local_storage/indexeddb/mod.rs create mode 100644 collab-plugins/src/local_storage/storage_config.rs create mode 100644 collab-plugins/tests/web/edit_collab_test.rs create mode 100644 collab-plugins/tests/web/indexeddb_test.rs create mode 100644 collab-plugins/tests/web/mod.rs create mode 100644 collab-plugins/tests/web/setup_tests.js create mode 100644 collab-plugins/tests/web/test.md diff --git a/.github/workflows/wasm_test.yml b/.github/workflows/wasm_test.yml new file mode 100644 index 000000000..2cc4387c0 --- /dev/null +++ b/.github/workflows/wasm_test.yml @@ -0,0 +1,41 @@ +name: Wasm Tests + +on: + pull_request: + +env: + RUST_TOOLCHAIN: "1.75" + CARGO_MAKE_VERSION: "0.36.6" + +jobs: + test: + name: Run Wasm Tests + runs-on: ubuntu-latest + + steps: + - name: Check out code + uses: actions/checkout@v2 + + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + + - name: Install wasm-pack + uses: jetli/wasm-pack-action@v0.3.0 + with: + version: latest + + - uses: taiki-e/install-action@v2 + with: + tool: cargo-make@${{ env.CARGO_MAKE_VERSION }} + + - name: Install Node.js + uses: actions/setup-node@v2 + with: + node-version: '14' + + - name: Run Wasm Tests + run: cargo make wasm_test diff --git a/Cargo.lock b/Cargo.lock index eb678d22c..3f591c3bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,18 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "accessory" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "850bb534b9dc04744fbbb71d30ad6d25a7e4cf6dc33e223c81ef3a92ebab4e0b" +dependencies = [ + "macroific", + "proc-macro2", + "quote", + "syn 2.0.46", +] + [[package]] name = "adler" version = "1.0.2" @@ -67,6 +79,28 @@ dependencies = [ "serde_json", ] +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.46", +] + [[package]] name = "async-trait" version = "0.1.73" @@ -278,6 +312,7 @@ dependencies = [ "tokio-stream", "tracing", "tracing-subscriber", + "web-sys", "yrs", ] @@ -294,6 +329,7 @@ dependencies = [ "collab-plugins", "futures", "getrandom 0.2.9", + "js-sys", "lazy_static", "lru", "nanoid", @@ -384,6 +420,7 @@ version = "0.1.0" dependencies = [ "anyhow", "assert-json-diff", + "async-stream", "async-trait", "bincode", "bytes", @@ -395,6 +432,8 @@ dependencies = [ "futures", "futures-util", "getrandom 0.2.9", + "indexed_db_futures", + "js-sys", "lazy_static", "parking_lot", "rand 0.8.5", @@ -411,7 +450,12 @@ dependencies = [ "tokio-util", "tracing", "tracing-subscriber", + "tracing-wasm", "uuid", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-bindgen-test", + "web-sys", "yrs", ] @@ -451,6 +495,16 @@ dependencies = [ "yaml-rust", ] +[[package]] +name = "console_error_panic_hook" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06aeb73f470f66dcdbf7223caeebb85984942f22f1adb2a088cf9668146bbbc" +dependencies = [ + "cfg-if", + "wasm-bindgen", +] + [[package]] name = "constant_time_eq" version = "0.1.5" @@ -544,6 +598,18 @@ dependencies = [ "syn 2.0.46", ] +[[package]] +name = "delegate-display" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98a85201f233142ac819bbf6226e36d0b5e129a47bd325084674261c82d4cd66" +dependencies = [ + "macroific", + "proc-macro2", + "quote", + "syn 2.0.46", +] + [[package]] name = "digest" version = "0.10.7" @@ -582,6 +648,18 @@ dependencies = [ "libc", ] +[[package]] +name = "fancy_constructor" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f71f317e4af73b2f8f608fac190c52eac4b1879d2145df1db2fe48881ca69435" +dependencies = [ + "macroific", + "proc-macro2", + "quote", + "syn 2.0.46", +] + [[package]] name = "fastrand" version = "2.0.0" @@ -606,9 +684,9 @@ checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" [[package]] name = "futures" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" dependencies = [ "futures-channel", "futures-core", @@ -621,9 +699,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", "futures-sink", @@ -631,15 +709,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" [[package]] name = "futures-executor" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" dependencies = [ "futures-core", "futures-task", @@ -648,15 +726,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" [[package]] name = "futures-macro" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", @@ -665,21 +743,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] name = "futures-task" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-util" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ "futures-channel", "futures-core", @@ -784,6 +862,23 @@ dependencies = [ "cxx-build", ] +[[package]] +name = "indexed_db_futures" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6cc2083760572ee02385ab8b7c02c20925d2dd1f97a1a25a8737a238608f1152" +dependencies = [ + "accessory", + "cfg-if", + "delegate-display", + "fancy_constructor", + "js-sys", + "uuid", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "inout" version = "0.1.3" @@ -810,9 +905,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.61" +version = "0.3.67" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "445dde2150c55e483f3d8416706b97ec8e8237c307e5b7b4b8dd15e6af2a0730" +checksum = "9a1d36f1235bc969acba30b7f5990b864423a6068a10f7c90ae8f0112e3a59d1" dependencies = [ "wasm-bindgen", ] @@ -920,6 +1015,53 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "macroific" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f05c00ac596022625d01047c421a0d97d7f09a18e429187b341c201cb631b9dd" +dependencies = [ + "macroific_attr_parse", + "macroific_core", + "macroific_macro", +] + +[[package]] +name = "macroific_attr_parse" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd94d5da95b30ae6e10621ad02340909346ad91661f3f8c0f2b62345e46a2f67" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn 2.0.46", +] + +[[package]] +name = "macroific_core" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13198c120864097a565ccb3ff947672d969932b7975ebd4085732c9f09435e55" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.46", +] + +[[package]] +name = "macroific_macro" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c9853143cbed7f1e41dc39fee95f9b361bec65c8dc2a01bf609be01b61f5ae" +dependencies = [ + "macroific_attr_parse", + "macroific_core", + "proc-macro2", + "quote", + "syn 2.0.46", +] + [[package]] name = "matchers" version = "0.1.0" @@ -1293,6 +1435,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.1.0" @@ -1657,6 +1805,17 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "tracing-wasm" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4575c663a174420fa2d78f4108ff68f65bf2fbb7dd89f33749b6e826b3626e07" +dependencies = [ + "tracing", + "tracing-subscriber", + "wasm-bindgen", +] + [[package]] name = "typenum" version = "1.16.0" @@ -1677,12 +1836,13 @@ checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b" [[package]] name = "uuid" -version = "1.3.3" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "345444e32442451b267fc254ae85a209c64be56d2890e601a0c37ff0c3c5ecd2" +checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560" dependencies = [ "getrandom 0.2.9", "sha1_smol", + "wasm-bindgen", ] [[package]] @@ -1727,9 +1887,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.84" +version = "0.2.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31f8dcbc21f30d9b8f2ea926ecb58f6b91192c17e9d33594b3df58b2007ca53b" +checksum = "b1223296a201415c7fad14792dbefaace9bd52b62d33453ade1c5b5f07555406" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -1737,24 +1897,36 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.84" +version = "0.2.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95ce90fd5bcc06af55a641a86428ee4229e44e07033963a2290a8e241607ccb9" +checksum = "fcdc935b63408d58a32f8cc9738a0bffd8f05cc7c002086c6ef20b7312ad9dcd" dependencies = [ "bumpalo", "log", "once_cell", "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.46", "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bde2032aeb86bdfaecc8b261eef3cba735cc426c1f3a3416d1e0791be95fc461" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" -version = "0.2.84" +version = "0.2.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c21f77c0bedc37fd5dc21f897894a5ca01e7bb159884559461862ae90c0b4c5" +checksum = "3e4c238561b2d428924c49815533a8b9121c664599558a5d9ec51f8a1740a999" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -1762,22 +1934,57 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.84" +version = "0.2.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2aff81306fcac3c7515ad4e177f521b5c9a15f2b08f4e32d823066102f35a5f6" +checksum = "bae1abb6806dc1ad9e560ed242107c0f6c84335f1749dd4e8ddb012ebd5e25a7" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.46", "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-shared" -version = "0.2.84" +version = "0.2.90" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d91413b1c31d7539ba5ef2451af3f0b833a005eb27a631cec32bc0635a8602b" + +[[package]] +name = "wasm-bindgen-test" +version = "0.3.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "139bd73305d50e1c1c4333210c0db43d989395b64a237bd35c10ef3832a7f70c" +dependencies = [ + "console_error_panic_hook", + "js-sys", + "scoped-tls", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-bindgen-test-macro", +] + +[[package]] +name = "wasm-bindgen-test-macro" +version = "0.3.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70072aebfe5da66d2716002c729a14e4aec4da0e23cc2ea66323dac541c93928" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.46", +] + +[[package]] +name = "web-sys" +version = "0.3.67" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0046fef7e28c3804e5e38bfa31ea2a0f73905319b677e57ebe37e49358989b5d" +checksum = "58cd2333b6e0be7a39605f0e255892fd7418a682d8da8fe042fe25128794d2ed" +dependencies = [ + "js-sys", + "wasm-bindgen", +] [[package]] name = "winapi" diff --git a/Makefile.toml b/Makefile.toml index c54f8f622..f4ccd8567 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -83,7 +83,7 @@ script = [ """ #!/bin/bash BASE_DIR=$(pwd) - crates=("collab" "collab-document" "collab-folder" "collab-user") + crates=("collab" "collab-document" "collab-folder" "collab-user" "collab-plugins") # Iterate over each crate and build it for crate in "${crates[@]}"; do @@ -94,8 +94,27 @@ script = [ # Build the crate wasm-pack build --features="wasm_build" || { echo "Build failed for $crate"; exit 1; } + done + """ +] + +[tasks.wasm_test] +script_runner = "bash" +script = [ + """ + #!/bin/bash + BASE_DIR=$(pwd) + crates=("collab-plugins") + + # Iterate over each crate and build it + for crate in "${crates[@]}"; do + echo "🔥🔥🔥 Running $crate tests with wasm-pack..." - # No need to cd back since we use absolute paths + # Navigate to the crate directory + cd "$BASE_DIR/$crate" || { echo "Failed to enter directory $crate"; exit 1; } + + # Build the crate + wasm-pack test --headless --firefox --features="wasm_build" done """ ] diff --git a/collab-database/Cargo.toml b/collab-database/Cargo.toml index d430ffd16..3ea90b721 100644 --- a/collab-database/Cargo.toml +++ b/collab-database/Cargo.toml @@ -30,6 +30,10 @@ strum = "0.25" strum_macros = "0.25" getrandom = { version = "0.2", optional = true } + +[target.'cfg(target_arch = "wasm32")'.dependencies] +js-sys = "0.3" + [dev-dependencies] collab-plugins = { workspace = true } tempfile = "3.8.0" @@ -38,7 +42,7 @@ assert-json-diff = "2.0.2" lazy_static = "1.4.0" tracing-subscriber = { version = "0.3.3", features = ["env-filter"] } rand = "0.8.4" -futures = "0.3.18" +futures = "0.3.30" zip = "0.6.6" [features] diff --git a/collab-database/src/blocks/block.rs b/collab-database/src/blocks/block.rs index 7870d9485..6281d54b9 100644 --- a/collab-database/src/blocks/block.rs +++ b/collab-database/src/blocks/block.rs @@ -6,6 +6,7 @@ use collab::core::collab::{CollabDocState, MutexCollab}; use collab_entity::CollabType; use collab_plugins::local_storage::kv::doc::CollabKVAction; +use collab_plugins::local_storage::kv::KVTransactionDB; use collab_plugins::local_storage::CollabPersistenceConfig; use collab_plugins::CollabKVDB; use lru::LruCache; diff --git a/collab-database/src/blocks/task_controller.rs b/collab-database/src/blocks/task_controller.rs index b5cd7cc80..331e7245e 100644 --- a/collab-database/src/blocks/task_controller.rs +++ b/collab-database/src/blocks/task_controller.rs @@ -9,7 +9,7 @@ use collab::core::collab::{CollabDocState, MutexCollab}; use collab::core::origin::CollabOrigin; use collab_entity::CollabType; use collab_plugins::local_storage::kv::doc::CollabKVAction; -use collab_plugins::local_storage::kv::PersistenceError; +use collab_plugins::local_storage::kv::{KVTransactionDB, PersistenceError}; use collab_plugins::CollabKVDB; use tokio::sync::watch; diff --git a/collab-database/src/id_gen/gen.rs b/collab-database/src/id_gen/gen.rs index 92720e5bf..0f29c5354 100644 --- a/collab-database/src/id_gen/gen.rs +++ b/collab-database/src/id_gen/gen.rs @@ -1,4 +1,4 @@ -use std::time::SystemTime; +use collab_plugins::{if_native, if_wasm}; /// Equivalent to April 9, 2023 4:18:02 AM const EPOCH: u64 = 1637806706000; @@ -57,11 +57,19 @@ impl RowIDGen { } } - fn timestamp(&self) -> u64 { - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) + if_native! { + fn timestamp(&self) -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) .expect("Clock moved backwards!") .as_millis() as u64 + } + } + + if_wasm! { + fn timestamp(&self) -> u64 { + js_sys::Date::now() as u64 + } } } diff --git a/collab-database/src/rows/row.rs b/collab-database/src/rows/row.rs index 1659ef87e..86f46e922 100644 --- a/collab-database/src/rows/row.rs +++ b/collab-database/src/rows/row.rs @@ -10,6 +10,7 @@ use parking_lot::Mutex; use collab::core::value::YrsValueExtension; use collab_plugins::local_storage::kv::doc::CollabKVAction; +use collab_plugins::local_storage::kv::KVTransactionDB; use collab_plugins::CollabKVDB; use serde::{Deserialize, Serialize}; use tracing::error; diff --git a/collab-database/src/user/user_db.rs b/collab-database/src/user/user_db.rs index 78dcf0d57..599f3e3a0 100644 --- a/collab-database/src/user/user_db.rs +++ b/collab-database/src/user/user_db.rs @@ -12,6 +12,7 @@ use collab::preclude::{Collab, Update}; use collab_entity::CollabType; use collab_plugins::local_storage::kv::doc::CollabKVAction; use collab_plugins::local_storage::kv::snapshot::{CollabSnapshot, SnapshotAction}; +use collab_plugins::local_storage::kv::KVTransactionDB; use collab_plugins::local_storage::CollabPersistenceConfig; use collab_plugins::CollabKVDB; use parking_lot::Mutex; diff --git a/collab-database/tests/database_test/helper.rs b/collab-database/tests/database_test/helper.rs index 23ea8e9dd..66ddf6073 100644 --- a/collab-database/tests/database_test/helper.rs +++ b/collab-database/tests/database_test/helper.rs @@ -189,7 +189,7 @@ impl DatabaseTestBuilder { pub async fn build(self) -> DatabaseTest { let tempdir = TempDir::new().unwrap(); let path = tempdir.into_path(); - let collab_db = Arc::new(CollabKVDB::open_opt(path, false).unwrap()); + let collab_db = Arc::new(CollabKVDB::open(path).unwrap()); let collab = CollabBuilder::new(self.uid, &self.database_id) .with_device_id("1") .build() diff --git a/collab-database/tests/database_test/restore_test.rs b/collab-database/tests/database_test/restore_test.rs index 8b796575b..468fa17d1 100644 --- a/collab-database/tests/database_test/restore_test.rs +++ b/collab-database/tests/database_test/restore_test.rs @@ -127,7 +127,7 @@ const HISTORY_DOCUMENT_020: &str = "020_database"; #[tokio::test] async fn open_020_history_database_test() { let (_cleaner, db_path) = unzip_history_database_db(HISTORY_DOCUMENT_020).unwrap(); - let db = std::sync::Arc::new(CollabKVDB::open_opt(db_path, false).unwrap()); + let db = std::sync::Arc::new(CollabKVDB::open(db_path).unwrap()); let database_test = restore_database_from_db( 221439819971039232, "c0e69740-49f0-4790-a488-702e2750ba8d", diff --git a/collab-database/tests/helper/util.rs b/collab-database/tests/helper/util.rs index 9742a1fc1..58a2c1ac2 100644 --- a/collab-database/tests/helper/util.rs +++ b/collab-database/tests/helper/util.rs @@ -587,7 +587,7 @@ impl From for AnyMap { pub fn make_rocks_db() -> Arc { let tempdir = TempDir::new().unwrap(); let path = tempdir.into_path(); - Arc::new(CollabKVDB::open_opt(path, false).unwrap()) + Arc::new(CollabKVDB::open(path).unwrap()) } pub fn setup_log() { diff --git a/collab-database/tests/user_test/async_test/script.rs b/collab-database/tests/user_test/async_test/script.rs index 8dfc574f4..d671bed1d 100644 --- a/collab-database/tests/user_test/async_test/script.rs +++ b/collab-database/tests/user_test/async_test/script.rs @@ -10,6 +10,7 @@ use collab_database::rows::{Cells, CellsBuilder, RowId}; use collab_database::user::WorkspaceDatabase; use collab_database::views::{CreateDatabaseParams, OrderObjectPosition}; use collab_plugins::local_storage::kv::doc::CollabKVAction; +use collab_plugins::local_storage::kv::KVTransactionDB; use collab_plugins::local_storage::CollabPersistenceConfig; use collab_plugins::CollabKVDB; use serde_json::Value; @@ -72,7 +73,7 @@ impl DatabaseTest { pub async fn new(config: CollabPersistenceConfig) -> Self { let tempdir = TempDir::new().unwrap(); let db_path = tempdir.into_path(); - let collab_db = Arc::new(CollabKVDB::open_opt(db_path.clone(), false).unwrap()); + let collab_db = Arc::new(CollabKVDB::open(db_path.clone()).unwrap()); let workspace_database = workspace_database_with_db(1, Arc::downgrade(&collab_db), Some(config.clone())).await; Self { diff --git a/collab-database/tests/user_test/helper.rs b/collab-database/tests/user_test/helper.rs index 49df3101e..a893165ad 100644 --- a/collab-database/tests/user_test/helper.rs +++ b/collab-database/tests/user_test/helper.rs @@ -162,7 +162,7 @@ pub async fn user_database_test_with_db( pub async fn user_database_test_with_default_data(uid: i64) -> WorkspaceDatabaseTest { let tempdir = TempDir::new().unwrap(); let path = tempdir.into_path(); - let db = Arc::new(CollabKVDB::open_opt(path, false).unwrap()); + let db = Arc::new(CollabKVDB::open(path).unwrap()); let w_database = user_database_test_with_db(uid, db).await; w_database diff --git a/collab-document/Cargo.toml b/collab-document/Cargo.toml index 302c4d9bc..2556a0361 100644 --- a/collab-document/Cargo.toml +++ b/collab-document/Cargo.toml @@ -28,7 +28,7 @@ tempfile = "3.8.0" tracing-subscriber = { version = "0.3.3", features = ["env-filter"] } collab-plugins = { workspace = true } zip = "0.6.6" -futures = "0.3.17" +futures = "0.3.30" [features] diff --git a/collab-document/tests/document/restore_test.rs b/collab-document/tests/document/restore_test.rs index 01cc52085..d3c4332b7 100644 --- a/collab-document/tests/document/restore_test.rs +++ b/collab-document/tests/document/restore_test.rs @@ -94,7 +94,7 @@ const HISTORY_DOCUMENT_020: &str = "020_document"; #[tokio::test] async fn open_020_history_document_test() { let (_cleaner, db_path) = unzip_history_document_db(HISTORY_DOCUMENT_020).unwrap(); - let db = std::sync::Arc::new(CollabKVDB::open_opt(db_path, false).unwrap()); + let db = std::sync::Arc::new(CollabKVDB::open(db_path).unwrap()); let document = open_document_with_db( 221439819971039232, "631584ec-af71-42c3-94f4-89dcfdafb988", diff --git a/collab-document/tests/util.rs b/collab-document/tests/util.rs index 2a62c66e8..1ece44c32 100644 --- a/collab-document/tests/util.rs +++ b/collab-document/tests/util.rs @@ -130,7 +130,7 @@ pub async fn open_document_with_db(uid: i64, doc_id: &str, db: Arc) pub fn document_storage() -> Arc { let tempdir = TempDir::new().unwrap(); let path = tempdir.into_path(); - Arc::new(CollabKVDB::open_opt(path, false).unwrap()) + Arc::new(CollabKVDB::open(path).unwrap()) } fn setup_log() { diff --git a/collab-folder/tests/folder_test/util.rs b/collab-folder/tests/folder_test/util.rs index f2bada3b8..8b0120345 100644 --- a/collab-folder/tests/folder_test/util.rs +++ b/collab-folder/tests/folder_test/util.rs @@ -52,7 +52,7 @@ pub async fn create_folder_with_data( let tempdir = TempDir::new().unwrap(); let path = tempdir.into_path(); - let db = Arc::new(CollabKVDB::open_opt(path.clone(), false).unwrap()); + let db = Arc::new(CollabKVDB::open(path.clone()).unwrap()); let disk_plugin = RocksdbDiskPlugin::new( uid.as_i64(), workspace_id.to_string(), @@ -86,7 +86,7 @@ pub async fn create_folder_with_data( } pub async fn open_folder_with_db(uid: UserId, object_id: &str, db_path: PathBuf) -> FolderTest { - let db = Arc::new(CollabKVDB::open_opt(db_path.clone(), false).unwrap()); + let db = Arc::new(CollabKVDB::open(db_path.clone()).unwrap()); let disk_plugin = RocksdbDiskPlugin::new( uid.as_i64(), object_id.to_string(), diff --git a/collab-plugins/Cargo.toml b/collab-plugins/Cargo.toml index 35324706f..4f71ef27c 100644 --- a/collab-plugins/Cargo.toml +++ b/collab-plugins/Cargo.toml @@ -47,10 +47,22 @@ assert-json-diff = "2.0.2" tokio-util = { version = "0.7", features = ["codec"] } config = { version = "0.13.3", default-features = false, features = ["yaml"] } dotenv = "0.15.0" -futures = "0.3.17" +futures = "0.3" +[target.'cfg(target_arch = "wasm32")'.dependencies] +indexed_db_futures = { version = "0.4.1" } +js-sys = "0.3" +async-stream = "0.3.4" +futures = "0.3" +wasm-bindgen = "0.2" +web-sys = { version = "0.3.67", features = ["console", "Window"] } +wasm-bindgen-futures = "0.4.40" +tracing-wasm = "0.2" + +[target.'cfg(target_arch = "wasm32")'.dev-dependencies] +wasm-bindgen-test = "0.3.40" [features] default = [] postgres_plugin = ["rand"] -wasm_build = ["getrandom/js"] \ No newline at end of file +wasm_build = ["getrandom/js", "collab/async-plugin"] \ No newline at end of file diff --git a/collab-plugins/src/cloud_storage/postgres/plugin.rs b/collab-plugins/src/cloud_storage/postgres/plugin.rs index d98907c91..465250891 100644 --- a/collab-plugins/src/cloud_storage/postgres/plugin.rs +++ b/collab-plugins/src/cloud_storage/postgres/plugin.rs @@ -24,7 +24,7 @@ use crate::cloud_storage::remote_collab::{ }; use crate::cloud_storage::sink::{SinkConfig, SinkStrategy}; use crate::local_storage::kv::doc::CollabKVAction; -use crate::local_storage::kv::TransactionMutExt; +use crate::local_storage::kv::{KVTransactionDB, TransactionMutExt}; use crate::CollabKVDB; pub struct SupabaseDBPlugin { diff --git a/collab-plugins/src/lib.rs b/collab-plugins/src/lib.rs index b90190e98..351858836 100644 --- a/collab-plugins/src/lib.rs +++ b/collab-plugins/src/lib.rs @@ -1,8 +1,29 @@ pub mod local_storage; +#[macro_export] +macro_rules! if_native { + ($($item:item)*) => {$( + #[cfg(not(target_arch = "wasm32"))] + $item + )*} +} + +#[macro_export] +macro_rules! if_wasm { + ($($item:item)*) => {$( + #[cfg(target_arch = "wasm32")] + $item + )*} +} + #[cfg(all(feature = "postgres_plugin", not(target_arch = "wasm32")))] pub mod cloud_storage; pub mod connect_state; -#[cfg(not(target_arch = "wasm32"))] -pub type CollabKVDB = local_storage::rocksdb::kv_impl::RocksStore; +if_native! { + pub type CollabKVDB = local_storage::rocksdb::kv_impl::KVTransactionDBRocksdbImpl; +} + +if_wasm! { + pub type CollabKVDB = local_storage::indexeddb::kv_impl::CollabIndexeddb; +} diff --git a/collab-plugins/src/local_storage/indexeddb/indexeddb_plugin.rs b/collab-plugins/src/local_storage/indexeddb/indexeddb_plugin.rs new file mode 100644 index 000000000..c316e915a --- /dev/null +++ b/collab-plugins/src/local_storage/indexeddb/indexeddb_plugin.rs @@ -0,0 +1,177 @@ +use crate::local_storage::indexeddb::kv_impl::CollabIndexeddb; +use crate::local_storage::kv::keys::{make_doc_state_key, make_state_vector_key}; + +use async_stream::stream; +use async_trait::async_trait; +use collab::core::awareness::Awareness; + +use collab::core::origin::CollabOrigin; +use collab::preclude::CollabPlugin; +use collab_entity::CollabType; +use futures::stream::StreamExt; + +use crate::local_storage::kv::PersistenceError; +use collab::core::collab::make_yrs_doc; +use collab::core::transaction::DocTransactionExtension; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering::SeqCst; +use std::sync::{Arc, Weak}; +use tracing::{error, instrument}; +use wasm_bindgen_futures::spawn_local; +use yrs::{Doc, TransactionMut}; + +pub struct IndexeddbDiskPlugin { + uid: i64, + #[allow(dead_code)] + object_id: String, + #[allow(dead_code)] + collab_type: CollabType, + collab_db: Weak, + did_load: Arc, + edit_sender: DocEditStreamSender, +} + +impl IndexeddbDiskPlugin { + pub fn new( + uid: i64, + object_id: String, + collab_type: CollabType, + collab_db: Weak, + ) -> Self { + let did_load = Arc::new(AtomicBool::new(false)); + let (edit_sender, rx) = tokio::sync::mpsc::unbounded_channel(); + let edit_stream = DocEditStream::new(uid, &object_id, collab_db.clone(), rx); + spawn_local(edit_stream.run()); + Self { + uid, + object_id, + collab_type, + did_load, + collab_db, + edit_sender, + } + } + + #[instrument(skip_all)] + fn flush_doc(&self, db: Arc, object_id: &str) { + let uid = self.uid; + let object_id = object_id.to_string(); + spawn_local(async move { + let doc = make_yrs_doc(); + db.load_doc(uid, &object_id, doc.clone()).await.unwrap(); + let encoded_collab = doc.get_encoded_collab_v1(); + db.flush_doc(uid, &object_id, &encoded_collab) + .await + .unwrap(); + }); + } +} + +#[async_trait] +impl CollabPlugin for IndexeddbDiskPlugin { + async fn init(&self, object_id: &str, _origin: &CollabOrigin, doc: &Doc) { + if let Some(db) = self.collab_db.upgrade() { + let object_id = object_id.to_string(); + let doc = doc.clone(); + let uid = self.uid; + + spawn_local(async move { + match db.load_doc(uid, &object_id, doc.clone()).await { + Ok(_) => {}, + Err(err) => { + if err.is_record_not_found() { + let encoded_collab = doc.get_encoded_collab_v1(); + let f = || async move { + let doc_id = db.create_doc_id(uid, object_id).await?; + let doc_state_key = make_doc_state_key(doc_id); + let sv_key = make_state_vector_key(doc_id); + db.set_data(doc_state_key, encoded_collab.doc_state).await?; + db.set_data(sv_key, encoded_collab.state_vector).await?; + Ok::<(), PersistenceError>(()) + }; + if let Err(err) = f().await { + error!("failed to create doc_id: {:?}", err); + } + } else { + error!("failed to get encoded collab: {:?}", err); + } + }, + } + }); + } else { + tracing::warn!("collab_db is dropped"); + } + } + + fn receive_update(&self, _object_id: &str, _txn: &TransactionMut, update: &[u8]) { + // Only push update if the doc is loaded + if !self.did_load.load(SeqCst) { + return; + } + if let Err(err) = self.edit_sender.send(DocUpdate::Update(update.to_vec())) { + error!("failed to send update: {}", err); + } + } + + fn did_init(&self, _awareness: &Awareness, _object_id: &str, _last_sync_at: i64) { + self.did_load.store(true, SeqCst); + } + + fn flush(&self, object_id: &str, _doc: &Doc) { + if let Some(db) = self.collab_db.upgrade() { + self.flush_doc(db, object_id); + } + } +} + +type DocEditStreamSender = tokio::sync::mpsc::UnboundedSender; +type DocEditStreamReceiver = tokio::sync::mpsc::UnboundedReceiver; +struct DocEditStream { + uid: i64, + object_id: String, + collab_db: Weak, + receiver: Option, +} + +#[derive(Clone)] +enum DocUpdate { + Update(Vec), +} + +impl DocEditStream { + fn new( + uid: i64, + object_id: &str, + collab_db: Weak, + receiver: DocEditStreamReceiver, + ) -> Self { + Self { + uid, + object_id: object_id.to_string(), + collab_db, + receiver: Some(receiver), + } + } + + async fn run(mut self) { + let mut receiver = self.receiver.take().expect("Only take once"); + let stream = stream! { + while let Some(data) = receiver.recv().await { + yield data; + } + }; + stream + .for_each(|data| async { + match data { + DocUpdate::Update(update) => { + if let Some(db) = self.collab_db.upgrade() { + if let Err(err) = db.push_update(self.uid, &self.object_id, &update).await { + error!("failed to push update: {}", err); + } + } + }, + } + }) + .await; + } +} diff --git a/collab-plugins/src/local_storage/indexeddb/kv_impl.rs b/collab-plugins/src/local_storage/indexeddb/kv_impl.rs new file mode 100644 index 000000000..3d9ebadf8 --- /dev/null +++ b/collab-plugins/src/local_storage/indexeddb/kv_impl.rs @@ -0,0 +1,413 @@ +use crate::local_storage::kv::PersistenceError; +use collab::core::collab_plugin::EncodedCollab; +use indexed_db_futures::js_sys::wasm_bindgen::JsValue; +use indexed_db_futures::prelude::*; +use js_sys::{ArrayBuffer, Uint8Array}; + +use crate::local_storage::kv::keys::{ + clock_from_key, make_doc_end_key, make_doc_id_key, make_doc_start_key, make_doc_state_key, + make_doc_update_key, make_state_vector_key, Clock, DocID, DOC_ID_LEN, +}; +use crate::local_storage::kv::oid::{LOCAL_DOC_ID_GEN, OID}; +use anyhow::anyhow; +use collab::core::collab::TransactionMutExt; +use indexed_db_futures::web_sys::IdbKeyRange; +use std::sync::Arc; +use tokio::sync::RwLock; +use tracing::error; +use wasm_bindgen::JsCast; +use web_sys::console; +use yrs::updates::decoder::Decode; +use yrs::{Doc, Transact, Update}; + +pub struct CollabIndexeddb { + db: Arc>, +} + +unsafe impl Send for CollabIndexeddb {} +unsafe impl Sync for CollabIndexeddb {} + +const COLLAB_KV_STORE: &str = "collab_kv"; +impl CollabIndexeddb { + pub async fn new() -> Result { + let mut db_req = IdbDatabase::open_u32("appflowy_indexeddb", 1)?; + db_req.set_on_upgrade_needed(Some(|evt: &IdbVersionChangeEvent| -> Result<(), JsValue> { + if evt + .db() + .object_store_names() + .find(|n| n == COLLAB_KV_STORE) + .is_none() + { + evt.db().create_object_store(COLLAB_KV_STORE)?; + } + Ok(()) + })); + let db = Arc::new(RwLock::new(db_req.await?)); + Ok(Self { db }) + } + + pub async fn with_write_transaction( + &self, + f: impl FnOnce(&IdbTransactionActionImpl<'_>) -> Result, + ) -> Result { + let db_write_guard = self.db.write().await; + let txn = db_write_guard + .transaction_on_one_with_mode(COLLAB_KV_STORE, IdbTransactionMode::Readwrite)?; + let action_impl = IdbTransactionActionImpl::new(txn)?; + let output = f(&action_impl)?; + action_impl.tx.await.into_result()?; + Ok(output) + } + + pub async fn get_data( + &self, + store: &IdbObjectStore<'_>, + key: K, + ) -> Result, PersistenceError> + where + K: AsRef<[u8]>, + { + let js_key = to_js_key(key.as_ref()); + match store.get(&js_key)?.await? { + None => Err(PersistenceError::RecordNotFound(format!( + "object with given key:{:?} is not found", + js_key + ))), + Some(value) => Ok(Uint8Array::new(&value).to_vec()), + } + } + + pub async fn set_data(&self, key: K, value: V) -> Result<(), PersistenceError> + where + K: AsRef<[u8]>, + V: AsRef<[u8]>, + { + let write_guard = self.db.write().await; + let transaction = + write_guard.transaction_on_one_with_mode(COLLAB_KV_STORE, IdbTransactionMode::Readwrite)?; + let store = store_from_transaction(&transaction)?; + self.set_data_with_store(&store, key, value).await?; + transaction_result_to_result(transaction.await)?; + Ok(()) + } + + pub async fn set_data_with_store( + &self, + store: &IdbObjectStore<'_>, + key: K, + value: V, + ) -> Result<(), PersistenceError> + where + K: AsRef<[u8]>, + V: AsRef<[u8]>, + { + let js_key = to_js_key(key.as_ref()); + let js_value = to_js_key(value.as_ref()); + store.put_key_val(&js_key, &js_value)?.await?; + Ok(()) + } + + pub async fn create_doc( + &self, + uid: i64, + object_id: &str, + encoded_collab: &EncodedCollab, + ) -> Result<(), PersistenceError> { + let doc_id = self.create_doc_id(uid, object_id).await?; + let doc_state_key = make_doc_state_key(doc_id); + let sv_key = make_state_vector_key(doc_id); + + let read_guard = self.db.write().await; + let transaction = + read_guard.transaction_on_one_with_mode(COLLAB_KV_STORE, IdbTransactionMode::Readwrite)?; + let store = store_from_transaction(&transaction)?; + self + .set_data_with_store(&store, doc_state_key, &encoded_collab.doc_state) + .await?; + self + .set_data_with_store(&store, sv_key, &encoded_collab.state_vector) + .await?; + + transaction_result_to_result(transaction.await)?; + Ok(()) + } + + pub async fn load_doc( + &self, + uid: i64, + object_id: &str, + doc: Doc, + ) -> Result<(), PersistenceError> { + let read_guard = self.db.read().await; + let transaction = + read_guard.transaction_on_one_with_mode(COLLAB_KV_STORE, IdbTransactionMode::Readonly)?; + let store = store_from_transaction(&transaction)?; + let doc_id = self + .get_doc_id(&store, uid, object_id) + .await + .ok_or_else(|| { + PersistenceError::RecordNotFound(format!("doc_id for object_id:{} is not found", object_id)) + })?; + + let doc_state_key = make_doc_state_key(doc_id); + let doc_state = self.get_data(&store, doc_state_key).await?; + let updates = fetch_updates(&store, doc_id).await?; + + let mut txn = doc + .try_transact_mut() + .map_err(|err| PersistenceError::Internal(anyhow!("Transact mut fail. error: {:?}", err)))?; + let doc_state_update = Update::decode_v1(doc_state.as_ref()).map_err(PersistenceError::Yrs)?; + txn.try_apply_update(doc_state_update)?; + + for update in updates { + if let Ok(update) = Update::decode_v1(update.as_ref()) { + txn.try_apply_update(update)?; + } + } + + drop(txn); + Ok(()) + } + + pub async fn get_encoded_collab( + &self, + uid: i64, + object_id: &str, + ) -> Result { + let read_guard = self.db.read().await; + let transaction = + read_guard.transaction_on_one_with_mode(COLLAB_KV_STORE, IdbTransactionMode::Readonly)?; + let store = store_from_transaction(&transaction)?; + + let doc_id = self + .get_doc_id(&store, uid, object_id) + .await + .ok_or_else(|| { + PersistenceError::RecordNotFound(format!("doc_id for object_id:{} is not found", object_id)) + })?; + + let doc_state_key = make_doc_state_key(doc_id); + let sv_key = make_state_vector_key(doc_id); + + let doc_stata = self.get_data(&store, doc_state_key).await?; + let sv = self.get_data(&store, sv_key).await?; + + Ok(EncodedCollab::new_v1(sv, doc_stata)) + } + + pub async fn flush_doc( + &self, + uid: i64, + object_id: &str, + encoded: &EncodedCollab, + ) -> Result<(), PersistenceError> { + let read_guard = self.db.write().await; + let transaction = + read_guard.transaction_on_one_with_mode(COLLAB_KV_STORE, IdbTransactionMode::Readwrite)?; + let store = store_from_transaction(&transaction)?; + let doc_id = self + .get_doc_id(&store, uid, object_id) + .await + .ok_or_else(|| { + PersistenceError::RecordNotFound(format!("doc_id for object_id:{} is not found", object_id)) + })?; + + let start = to_js_key(make_doc_start_key(doc_id)); + let end = to_js_key(make_doc_end_key(doc_id)); + let key_range = IdbKeyRange::bound(&start, &end).map_err(|err| { + PersistenceError::Internal(anyhow!("Get last update key fail. error: {:?}", err)) + })?; + + let cursor_request = store + .open_cursor_with_range(&key_range)? + .await? + .ok_or_else(|| { + PersistenceError::Internal(anyhow!("Open cursor fail. error: {:?}", "cursor is none")) + })?; + + // Delete the first key + let _ = cursor_request.delete(); + while cursor_request.continue_cursor()?.await? { + console::log_1(&JsValue::from_str("delete cursor")); + if let Err(err) = cursor_request.delete() { + error!("failed to delete cursor: {:?}", err) + } + } + + let doc_state_key = make_doc_state_key(doc_id); + let sv_key = make_state_vector_key(doc_id); + self + .set_data_with_store(&store, doc_state_key, &encoded.doc_state) + .await?; + self + .set_data_with_store(&store, sv_key, &encoded.state_vector) + .await?; + + transaction_result_to_result(transaction.await)?; + Ok(()) + } + + pub async fn push_update( + &self, + uid: i64, + object_id: &str, + update: &[u8], + ) -> Result<(), PersistenceError> { + let write_guard = self.db.write().await; + let transaction = + write_guard.transaction_on_one_with_mode(COLLAB_KV_STORE, IdbTransactionMode::Readwrite)?; + let store = store_from_transaction(&transaction)?; + let doc_id = self + .get_doc_id(&store, uid, object_id) + .await + .ok_or_else(|| { + PersistenceError::RecordNotFound(format!("doc_id for object_id:{} is not found", object_id)) + })?; + self.put_update(&store, doc_id, update).await?; + transaction_result_to_result(transaction.await)?; + Ok(()) + } + + pub async fn get_all_updates( + &self, + uid: i64, + object_id: &str, + ) -> Result>, PersistenceError> { + let read_guard = self.db.read().await; + let transaction = + read_guard.transaction_on_one_with_mode(COLLAB_KV_STORE, IdbTransactionMode::Readonly)?; + let store = store_from_transaction(&transaction)?; + let doc_id = self + .get_doc_id(&store, uid, object_id) + .await + .ok_or_else(|| { + PersistenceError::RecordNotFound(format!("doc_id for object_id:{} is not found", object_id)) + })?; + + fetch_updates(&store, doc_id).await + } + + async fn put_update( + &self, + store: &IdbObjectStore<'_>, + id: OID, + update: &[u8], + ) -> Result<(), PersistenceError> { + let max_key = JsValue::from(Uint8Array::from( + make_doc_update_key(id, Clock::MAX).as_ref(), + )); + + let key_range = IdbKeyRange::upper_bound(&max_key).map_err(|err| { + PersistenceError::Internal(anyhow!("Get last update key fail. error: {:?}", err)) + })?; + let cursor = store + .open_cursor_with_range_and_direction(&key_range, IdbCursorDirection::Prev)? + .await? + .ok_or_else(|| { + PersistenceError::Internal(anyhow!("Open cursor fail. error: {:?}", "cursor is none")) + })?; + + let clock = cursor + .key() + .map(|key| { + let array_buffer = key.dyn_into::().unwrap(); + let uint8_array = Uint8Array::new(&array_buffer); + let mut vec = vec![0; uint8_array.length() as usize]; + uint8_array.copy_to(&mut vec); + let clock_byte = clock_from_key(&vec); + Clock::from_be_bytes(clock_byte.try_into().unwrap()) + }) + .unwrap_or_else(|| 0); + + let next_clock = clock + 1; + let update_key = make_doc_update_key(id, next_clock); + self.set_data_with_store(store, update_key, update).await?; + Ok(()) + } + + pub async fn get_doc_id( + &self, + store: &IdbObjectStore<'_>, + uid: i64, + object_id: &K, + ) -> Option + where + K: AsRef<[u8]> + ?Sized, + { + let uid_id_bytes = &uid.to_be_bytes(); + let key = make_doc_id_key(uid_id_bytes, object_id.as_ref()); + let value = self.get_data(store, key).await.ok()?; + let mut bytes = [0; DOC_ID_LEN]; + bytes[0..DOC_ID_LEN].copy_from_slice(value.as_ref()); + Some(OID::from_be_bytes(bytes)) + } + + pub async fn create_doc_id(&self, uid: i64, object_id: I) -> Result + where + I: AsRef<[u8]>, + { + let new_id = LOCAL_DOC_ID_GEN.lock().next_id(); + let key = make_doc_id_key(&uid.to_be_bytes(), object_id.as_ref()); + self.set_data(key, new_id.to_be_bytes()).await?; + Ok(new_id) + } +} + +fn to_js_key>(key: K) -> JsValue { + JsValue::from(Uint8Array::from(key.as_ref())) +} + +fn store_from_transaction<'a>( + txn: &'a IdbTransaction<'a>, +) -> Result, PersistenceError> { + txn + .object_store(COLLAB_KV_STORE) + .map_err(PersistenceError::from) +} + +pub struct IdbTransactionActionImpl<'a> { + tx: IdbTransaction<'a>, +} + +impl<'a> IdbTransactionActionImpl<'a> { + fn new(tx: IdbTransaction<'a>) -> Result { + Ok(Self { tx }) + } +} + +fn transaction_result_to_result(result: IdbTransactionResult) -> Result<(), PersistenceError> { + match result { + IdbTransactionResult::Success => Ok(()), + IdbTransactionResult::Error(err) => Err(PersistenceError::from(err)), + IdbTransactionResult::Abort => Err(PersistenceError::Internal(anyhow!("Transaction aborted"))), + } +} + +async fn fetch_updates( + store: &IdbObjectStore<'_>, + doc_id: DocID, +) -> Result>, PersistenceError> { + let start = to_js_key(make_doc_update_key(doc_id, 0).as_ref()); + let end = to_js_key(make_doc_update_key(doc_id, Clock::MAX).as_ref()); + let key_range = IdbKeyRange::bound(&start, &end).map_err(|err| { + PersistenceError::Internal(anyhow!("Get last update key fail. error: {:?}", err)) + })?; + let cursor_request = store.open_cursor_with_range(&key_range)?.await?; + if cursor_request.is_none() { + return Ok(Vec::new()); + } + + let cursor_request = cursor_request.unwrap(); + let mut js_values = Vec::new(); + js_values.push(cursor_request.value()); + while cursor_request.continue_cursor()?.await? { + js_values.push(cursor_request.value()); + } + + Ok( + js_values + .into_iter() + .map(|js_value| js_value.dyn_into::().unwrap().to_vec()) + .collect(), + ) +} diff --git a/collab-plugins/src/local_storage/indexeddb/mod.rs b/collab-plugins/src/local_storage/indexeddb/mod.rs new file mode 100644 index 000000000..819c785c9 --- /dev/null +++ b/collab-plugins/src/local_storage/indexeddb/mod.rs @@ -0,0 +1,2 @@ +pub mod indexeddb_plugin; +pub mod kv_impl; diff --git a/collab-plugins/src/local_storage/kv/db.rs b/collab-plugins/src/local_storage/kv/db.rs index 0722e7b33..d347cea0b 100644 --- a/collab-plugins/src/local_storage/kv/db.rs +++ b/collab-plugins/src/local_storage/kv/db.rs @@ -12,6 +12,23 @@ use crate::local_storage::kv::PersistenceError; use smallvec::SmallVec; use yrs::{TransactionMut, Update}; +pub trait KVTransactionDB: Send + Sync + 'static { + type TransactionAction<'a>; + + fn read_txn<'a, 'b>(&'b self) -> Self::TransactionAction<'a> + where + 'b: 'a; + + fn with_write_txn<'a, 'b, Output>( + &'b self, + f: impl FnOnce(&Self::TransactionAction<'a>) -> Result, + ) -> Result + where + 'b: 'a; + + fn flush(&self) -> Result<(), PersistenceError>; +} + pub trait KVStore<'a> { type Range: Iterator; type Entry: KVEntry; @@ -172,7 +189,7 @@ where Some(OID::from_be_bytes(bytes)) } -pub fn make_doc_id_for_key<'a, S>(store: &S, key: Key<20>) -> Result +pub fn insert_doc_id_for_key<'a, S>(store: &S, key: Key<20>) -> Result where S: KVStore<'a>, PersistenceError: From<>::Error>, diff --git a/collab-plugins/src/local_storage/kv/doc.rs b/collab-plugins/src/local_storage/kv/doc.rs index 49754a146..700b665ae 100644 --- a/collab-plugins/src/local_storage/kv/doc.rs +++ b/collab-plugins/src/local_storage/kv/doc.rs @@ -72,8 +72,8 @@ where Ok(()) } - fn is_exist + ?Sized + Debug>(&self, collab_id: i64, object_id: &K) -> bool { - get_doc_id(collab_id, self, object_id).is_some() + fn is_exist + ?Sized + Debug>(&self, uid: i64, object_id: &K) -> bool { + get_doc_id(uid, self, object_id).is_some() } /// Load the document from the database and apply the updates to the transaction. @@ -130,7 +130,10 @@ where Ok(update_count) } else { tracing::trace!("[Client] => {:?} not exist", object_id); - Err(PersistenceError::DocumentNotExist) + Err(PersistenceError::RecordNotFound(format!( + "doc with given object id: {:?} is not found", + object_id + ))) } } @@ -193,7 +196,10 @@ where "🔴Insert update failed. Can't find the doc for {:?}", object_id ); - Err(PersistenceError::DocumentNotExist) + Err(PersistenceError::RecordNotFound(format!( + "doc with given object id: {:?} is not found", + object_id + ))) }, Some(doc_id) => insert_doc_update(self, doc_id, object_id, update.to_vec()), } @@ -323,7 +329,10 @@ where } Ok(updates) } else { - Err(PersistenceError::DocumentNotExist) + Err(PersistenceError::RecordNotFound(format!( + "The document with given object id: {:?} is not found", + object_id.as_ref(), + ))) } } @@ -373,18 +382,18 @@ where Ok(did) } else { let key = make_doc_id_key(&uid.to_be_bytes(), object_id.as_ref()); - let new_did = make_doc_id_for_key(store, key)?; + let new_did = insert_doc_id_for_key(store, key)?; Ok(new_did) } } -fn get_doc_id<'a, K, S>(collab_id: i64, store: &S, object_id: &K) -> Option +fn get_doc_id<'a, K, S>(uid: i64, store: &S, object_id: &K) -> Option where S: KVStore<'a>, K: AsRef<[u8]> + ?Sized, { - let collab_id_bytes = &collab_id.to_be_bytes(); - let key = make_doc_id_key(collab_id_bytes, object_id.as_ref()); + let uid_id_bytes = &uid.to_be_bytes(); + let key = make_doc_id_key(uid_id_bytes, object_id.as_ref()); get_id_for_key(store, key) } diff --git a/collab-plugins/src/local_storage/kv/error.rs b/collab-plugins/src/local_storage/kv/error.rs index 873f87521..708c6c1e9 100644 --- a/collab-plugins/src/local_storage/kv/error.rs +++ b/collab-plugins/src/local_storage/kv/error.rs @@ -21,8 +21,8 @@ pub enum PersistenceError { #[error(transparent)] Bincode(#[from] bincode::Error), - #[error("The document is not exist")] - DocumentNotExist, + #[error("{0}")] + RecordNotFound(String), #[error("The document already exist")] DocumentAlreadyExist, @@ -42,10 +42,26 @@ pub enum PersistenceError { #[error("Can't find the latest update key")] LatestUpdateKeyNotExist, + #[error(transparent)] + Collab(#[from] collab::error::CollabError), + #[error(transparent)] Internal(#[from] anyhow::Error), } +impl PersistenceError { + pub fn is_record_not_found(&self) -> bool { + matches!(self, PersistenceError::RecordNotFound(_)) + } +} + +#[cfg(target_arch = "wasm32")] +impl From for PersistenceError { + fn from(value: indexed_db_futures::web_sys::DomException) -> Self { + PersistenceError::Internal(anyhow::anyhow!("DOMException: {:?}", value)) + } +} + #[cfg(not(target_arch = "wasm32"))] impl From for PersistenceError { fn from(value: rocksdb::Error) -> Self { diff --git a/collab-plugins/src/local_storage/kv/mod.rs b/collab-plugins/src/local_storage/kv/mod.rs index 59e3a1f5d..a4aecf66b 100644 --- a/collab-plugins/src/local_storage/kv/mod.rs +++ b/collab-plugins/src/local_storage/kv/mod.rs @@ -6,6 +6,6 @@ mod db; pub mod doc; pub mod error; pub mod keys; -mod oid; +pub mod oid; mod range; pub mod snapshot; diff --git a/collab-plugins/src/local_storage/kv/oid.rs b/collab-plugins/src/local_storage/kv/oid.rs index b39a2d5d4..d96ea3b15 100644 --- a/collab-plugins/src/local_storage/kv/oid.rs +++ b/collab-plugins/src/local_storage/kv/oid.rs @@ -1,8 +1,8 @@ #![allow(clippy::upper_case_acronyms)] +use crate::{if_native, if_wasm}; use lazy_static::lazy_static; use parking_lot::Mutex; -use std::time::SystemTime; const EPOCH: u64 = 1637806706000; const NODE_BITS: u64 = 8; @@ -25,6 +25,12 @@ pub struct DocIDGen { last_timestamp: u64, } +impl Default for DocIDGen { + fn default() -> Self { + Self::new() + } +} + impl DocIDGen { #[allow(dead_code)] pub fn new() -> DocIDGen { @@ -64,11 +70,19 @@ impl DocIDGen { } } - fn timestamp(&self) -> u64 { - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) + if_wasm! { + fn timestamp(&self) -> u64 { + js_sys::Date::now() as u64 + } + } + + if_native! { + fn timestamp(&self) -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) .expect("Clock moved backwards!") .as_millis() as u64 + } } } diff --git a/collab-plugins/src/local_storage/kv/snapshot.rs b/collab-plugins/src/local_storage/kv/snapshot.rs index 28f88f59c..156fae0bc 100644 --- a/collab-plugins/src/local_storage/kv/snapshot.rs +++ b/collab-plugins/src/local_storage/kv/snapshot.rs @@ -143,7 +143,7 @@ where Ok(snapshot_id) } else { let key = make_snapshot_id_key(&uid.to_be_bytes(), object_id.as_ref()); - let new_snapshot_id = make_doc_id_for_key(self, key)?; + let new_snapshot_id = insert_doc_id_for_key(self, key)?; Ok(new_snapshot_id) } } diff --git a/collab-plugins/src/local_storage/mod.rs b/collab-plugins/src/local_storage/mod.rs index 24ab602d8..7aa724de4 100644 --- a/collab-plugins/src/local_storage/mod.rs +++ b/collab-plugins/src/local_storage/mod.rs @@ -3,37 +3,9 @@ pub mod kv; #[cfg(not(target_arch = "wasm32"))] pub mod rocksdb; -#[derive(Clone)] -pub struct CollabPersistenceConfig { - /// Enable snapshot. Default is [false]. - pub enable_snapshot: bool, - /// Generate a snapshot every N updates - /// Default is 100. The value must be greater than 0. - pub snapshot_per_update: u32, -} +#[cfg(target_arch = "wasm32")] +pub mod indexeddb; -impl CollabPersistenceConfig { - pub fn new() -> Self { - Self::default() - } +mod storage_config; - pub fn enable_snapshot(mut self, enable_snapshot: bool) -> Self { - self.enable_snapshot = enable_snapshot; - self - } - - pub fn snapshot_per_update(mut self, snapshot_per_update: u32) -> Self { - debug_assert!(snapshot_per_update > 0); - self.snapshot_per_update = snapshot_per_update; - self - } -} - -impl Default for CollabPersistenceConfig { - fn default() -> Self { - Self { - enable_snapshot: true, - snapshot_per_update: 100, - } - } -} +pub use storage_config::*; diff --git a/collab-plugins/src/local_storage/rocksdb/kv_impl.rs b/collab-plugins/src/local_storage/rocksdb/kv_impl.rs index 5b0a8a8d5..5d2e8eec8 100644 --- a/collab-plugins/src/local_storage/rocksdb/kv_impl.rs +++ b/collab-plugins/src/local_storage/rocksdb/kv_impl.rs @@ -3,8 +3,7 @@ use std::ops::RangeBounds; use std::path::Path; use std::sync::Arc; -use crate::local_storage::kv::doc::CollabKVAction; -use crate::local_storage::kv::{KVEntry, KVStore, PersistenceError}; +use crate::local_storage::kv::{KVEntry, KVStore, KVTransactionDB, PersistenceError}; use rocksdb::Direction::Forward; use rocksdb::{ DBIteratorWithThreadMode, Direction, ErrorKind, IteratorMode, Options, ReadOptions, @@ -13,14 +12,15 @@ use rocksdb::{ }; #[derive(Clone)] -pub struct RocksStore { +pub struct KVTransactionDBRocksdbImpl { db: Arc, } -impl RocksStore { +impl KVTransactionDBRocksdbImpl { /// Open a new RocksDB database at the given path. /// If the database is corrupted, try to repair it. If it cannot be repaired, return an error. - pub fn open_opt(path: impl AsRef, auto_repair: bool) -> Result { + pub fn open(path: impl AsRef) -> Result { + let auto_repair = false; let txn_db_opts = TransactionDBOptions::default(); let mut db_opts = Options::default(); // This option sets the upper limit for the total number of background jobs (both flushes and compactions) @@ -109,19 +109,15 @@ impl RocksStore { Ok(Self { db: Arc::new(db) }) } +} - /// Open a new RocksDB database at the given path. - /// If the database is corrupted, try to repair it. If it cannot be repaired, return an error. - pub fn open(path: impl AsRef) -> Result { - Self::open_opt(path, false) - } +impl KVTransactionDB for KVTransactionDBRocksdbImpl { + type TransactionAction<'a> = RocksdbKVStoreImpl<'a, TransactionDB>; - pub fn flush(&self) -> Result<(), PersistenceError> { - Ok(()) - } - - /// Return a read transaction that accesses the database exclusively. - pub fn read_txn(&self) -> impl CollabKVAction<'_, Error = PersistenceError> { + fn read_txn<'a, 'b>(&'b self) -> Self::TransactionAction<'a> + where + 'b: 'a, + { let mut txn_options = TransactionOptions::default(); // Use snapshot to provides a consistent view of the data. This snapshot can then be used // to perform read operations, and the returned data will be consistent with the database @@ -131,33 +127,38 @@ impl RocksStore { let txn = self .db .transaction_opt(&WriteOptions::default(), &txn_options); - RocksKVStoreImpl::new(txn) + RocksdbKVStoreImpl::new(txn) } - /// Create a write transaction that accesses the database exclusively. - /// The transaction will be committed when the closure [F] returns. - pub fn with_write_txn(&self, f: F) -> Result + fn with_write_txn<'a, 'b, Output>( + &'b self, + f: impl FnOnce(&Self::TransactionAction<'a>) -> Result, + ) -> Result where - F: FnOnce(&RocksKVStoreImpl<'_, TransactionDB>) -> Result, + 'b: 'a, { let txn_options = TransactionOptions::default(); let txn = self .db .transaction_opt(&WriteOptions::default(), &txn_options); - let store = RocksKVStoreImpl::new(txn); + let store = RocksdbKVStoreImpl::new(txn); let result = f(&store)?; store.0.commit()?; Ok(result) } + + fn flush(&self) -> Result<(), PersistenceError> { + Ok(()) + } } -/// Implementation of [KVStore] for [RocksStore]. This is a wrapper around [Transaction]. +/// Implementation of [KVStore] for [KVTransactionDBRocksdbImpl]. This is a wrapper around [Transaction]. // pub struct RocksKVStoreImpl<'a, DB: Send + Sync>(Transaction<'a, DB>); -pub struct RocksKVStoreImpl<'a, DB: Send>(Transaction<'a, DB>); +pub struct RocksdbKVStoreImpl<'a, DB: Send>(Transaction<'a, DB>); -unsafe impl<'a, DB: Send> Send for RocksKVStoreImpl<'a, DB> {} +unsafe impl<'a, DB: Send> Send for RocksdbKVStoreImpl<'a, DB> {} -impl<'a, DB: Send + Sync> RocksKVStoreImpl<'a, DB> { +impl<'a, DB: Send + Sync> RocksdbKVStoreImpl<'a, DB> { pub fn new(txn: Transaction<'a, DB>) -> Self { Self(txn) } @@ -168,10 +169,10 @@ impl<'a, DB: Send + Sync> RocksKVStoreImpl<'a, DB> { } } -impl<'a, DB: Send + Sync> KVStore<'a> for RocksKVStoreImpl<'a, DB> { - type Range = RocksDBRange<'a, DB>; - type Entry = RocksDBEntry; - type Value = RocksDBVec; +impl<'a, DB: Send + Sync> KVStore<'a> for RocksdbKVStoreImpl<'a, DB> { + type Range = RocksdbRange<'a, DB>; + type Entry = RocksdbEntry; + type Value = Vec; type Error = PersistenceError; fn get>(&self, key: K) -> Result, Self::Error> { @@ -235,7 +236,7 @@ impl<'a, DB: Send + Sync> KVStore<'a> for RocksKVStoreImpl<'a, DB> { }; let iterator_mode = IteratorMode::From(from, Forward); let iter = self.0.iterator_opt(iterator_mode, opt); - Ok(RocksDBRange { + Ok(RocksdbRange { // Safe to transmute because the lifetime of the iterator is the same as the lifetime of the // transaction. inner: unsafe { std::mem::transmute(iter) }, @@ -248,29 +249,27 @@ impl<'a, DB: Send + Sync> KVStore<'a> for RocksKVStoreImpl<'a, DB> { let mut raw = self.0.raw_iterator_opt(opt); raw.seek_for_prev(key); if let Some((key, value)) = raw.item() { - Ok(Some(RocksDBEntry::new(key.to_vec(), value.to_vec()))) + Ok(Some(RocksdbEntry::new(key.to_vec(), value.to_vec()))) } else { Ok(None) } } } -impl<'a, DB: Send + Sync> From> for RocksKVStoreImpl<'a, DB> { +impl<'a, DB: Send + Sync> From> for RocksdbKVStoreImpl<'a, DB> { #[inline(always)] fn from(txn: Transaction<'a, DB>) -> Self { - RocksKVStoreImpl::new(txn) + RocksdbKVStoreImpl::new(txn) } } -pub type RocksDBVec = Vec; - -pub struct RocksDBRange<'a, DB> { +pub struct RocksdbRange<'a, DB> { inner: DBIteratorWithThreadMode<'a, Transaction<'a, DB>>, to: Vec, } -impl<'a, DB: Send + Sync> Iterator for RocksDBRange<'a, DB> { - type Item = RocksDBEntry; +impl<'a, DB: Send + Sync> Iterator for RocksdbRange<'a, DB> { + type Item = RocksdbEntry; fn next(&mut self) -> Option { let n = self.inner.next()?; @@ -278,7 +277,7 @@ impl<'a, DB: Send + Sync> Iterator for RocksDBRange<'a, DB> { if key.as_ref() >= self.to.as_slice() { None } else { - Some(RocksDBEntry::new(key.to_vec(), value.to_vec())) + Some(RocksdbEntry::new(key.to_vec(), value.to_vec())) } } else { None @@ -286,18 +285,18 @@ impl<'a, DB: Send + Sync> Iterator for RocksDBRange<'a, DB> { } } -pub struct RocksDBEntry { +pub struct RocksdbEntry { key: Vec, value: Vec, } -impl RocksDBEntry { +impl RocksdbEntry { pub fn new(key: Vec, value: Vec) -> Self { Self { key, value } } } -impl KVEntry for RocksDBEntry { +impl KVEntry for RocksdbEntry { fn key(&self) -> &[u8] { self.key.as_ref() } diff --git a/collab-plugins/src/local_storage/rocksdb/rocksdb_plugin.rs b/collab-plugins/src/local_storage/rocksdb/rocksdb_plugin.rs index 6450a9be7..fc2405a3c 100644 --- a/collab-plugins/src/local_storage/rocksdb/rocksdb_plugin.rs +++ b/collab-plugins/src/local_storage/rocksdb/rocksdb_plugin.rs @@ -16,6 +16,7 @@ use yrs::{Doc, ReadTxn, StateVector, Transact, TransactionMut}; use crate::local_storage::kv::doc::CollabKVAction; use crate::local_storage::kv::snapshot::SnapshotPersistence; +use crate::local_storage::kv::KVTransactionDB; use crate::local_storage::rocksdb::snapshot_plugin::CollabSnapshot; use crate::local_storage::CollabPersistenceConfig; diff --git a/collab-plugins/src/local_storage/rocksdb/snapshot_plugin.rs b/collab-plugins/src/local_storage/rocksdb/snapshot_plugin.rs index 8443ec4e8..22511ad9e 100644 --- a/collab-plugins/src/local_storage/rocksdb/snapshot_plugin.rs +++ b/collab-plugins/src/local_storage/rocksdb/snapshot_plugin.rs @@ -2,7 +2,7 @@ use std::sync::{Arc, Weak}; use crate::local_storage::kv::doc::CollabKVAction; use crate::local_storage::kv::snapshot::SnapshotPersistence; -use crate::local_storage::kv::PersistenceError; +use crate::local_storage::kv::{KVTransactionDB, PersistenceError}; use crate::CollabKVDB; use collab::preclude::Collab; use collab_entity::CollabType; diff --git a/collab-plugins/src/local_storage/storage_config.rs b/collab-plugins/src/local_storage/storage_config.rs new file mode 100644 index 000000000..478a1e9e0 --- /dev/null +++ b/collab-plugins/src/local_storage/storage_config.rs @@ -0,0 +1,34 @@ +#[derive(Clone)] +pub struct CollabPersistenceConfig { + /// Enable snapshot. Default is [false]. + pub enable_snapshot: bool, + /// Generate a snapshot every N updates + /// Default is 100. The value must be greater than 0. + pub snapshot_per_update: u32, +} + +impl CollabPersistenceConfig { + pub fn new() -> Self { + Self::default() + } + + pub fn enable_snapshot(mut self, enable_snapshot: bool) -> Self { + self.enable_snapshot = enable_snapshot; + self + } + + pub fn snapshot_per_update(mut self, snapshot_per_update: u32) -> Self { + debug_assert!(snapshot_per_update > 0); + self.snapshot_per_update = snapshot_per_update; + self + } +} + +impl Default for CollabPersistenceConfig { + fn default() -> Self { + Self { + enable_snapshot: true, + snapshot_per_update: 100, + } + } +} diff --git a/collab-plugins/tests/disk/insert_test.rs b/collab-plugins/tests/disk/insert_test.rs index f1d32560e..cbc275cc2 100644 --- a/collab-plugins/tests/disk/insert_test.rs +++ b/collab-plugins/tests/disk/insert_test.rs @@ -4,6 +4,7 @@ use assert_json_diff::assert_json_eq; use collab::preclude::CollabBuilder; use collab_entity::CollabType; use collab_plugins::local_storage::kv::doc::CollabKVAction; +use collab_plugins::local_storage::kv::KVTransactionDB; use collab_plugins::local_storage::CollabPersistenceConfig; use std::sync::Arc; diff --git a/collab-plugins/tests/disk/range_test.rs b/collab-plugins/tests/disk/range_test.rs index 1dc7ec6d4..e6e5ed772 100644 --- a/collab-plugins/tests/disk/range_test.rs +++ b/collab-plugins/tests/disk/range_test.rs @@ -4,7 +4,7 @@ use std::thread; use crate::disk::util::rocks_db; use collab_plugins::local_storage::kv::keys::{clock_from_key, make_doc_update_key, Clock}; -use collab_plugins::local_storage::kv::{KVEntry, KVStore}; +use collab_plugins::local_storage::kv::{KVEntry, KVStore, KVTransactionDB}; use smallvec::SmallVec; #[tokio::test] @@ -59,7 +59,7 @@ async fn rocks_id_test() { let txn = rocks_db.read_txn(); let value = txn.get([0, 0, 0, 0, 0, 0, 0, 2]).unwrap().unwrap(); - assert_eq!(value.as_ref(), &[0, 1, 3]); + assert_eq!(&value, &[0, 1, 3]); } #[tokio::test] diff --git a/collab-plugins/tests/disk/restore_test.rs b/collab-plugins/tests/disk/restore_test.rs index e5d8670b6..d52a4451f 100644 --- a/collab-plugins/tests/disk/restore_test.rs +++ b/collab-plugins/tests/disk/restore_test.rs @@ -2,6 +2,7 @@ use std::thread; use crate::disk::util::rocks_db; use collab_plugins::local_storage::kv::doc::CollabKVAction; +use collab_plugins::local_storage::kv::KVTransactionDB; use collab_plugins::CollabKVDB; use yrs::{Doc, GetString, Text, Transact}; @@ -33,7 +34,7 @@ async fn single_thread_test() { } drop(db); - let db = CollabKVDB::open_opt(path, false).unwrap(); + let db = CollabKVDB::open(path).unwrap(); for i in 0..100 { let oid = format!("doc_{}", i); let doc = Doc::new(); @@ -80,7 +81,7 @@ async fn rocks_multiple_thread_test() { } drop(db); - let db = CollabKVDB::open_opt(path, false).unwrap(); + let db = CollabKVDB::open(path).unwrap(); for i in 0..100 { let oid = format!("doc_{}", i); let doc = Doc::new(); diff --git a/collab-plugins/tests/disk/script.rs b/collab-plugins/tests/disk/script.rs index 3706e9a22..993264a97 100644 --- a/collab-plugins/tests/disk/script.rs +++ b/collab-plugins/tests/disk/script.rs @@ -11,6 +11,7 @@ use collab_plugins::local_storage::kv::doc::CollabKVAction; use collab_plugins::local_storage::rocksdb::rocksdb_plugin::RocksdbDiskPlugin; use collab_entity::CollabType; +use collab_plugins::local_storage::kv::KVTransactionDB; use collab_plugins::CollabKVDB; use tempfile::TempDir; @@ -70,7 +71,7 @@ impl CollabPersistenceTest { let tempdir = TempDir::new().unwrap(); let db_path = tempdir.into_path(); let uid = 1; - let db = Arc::new(CollabKVDB::open_opt(db_path.clone(), false).unwrap()); + let db = Arc::new(CollabKVDB::open(db_path.clone()).unwrap()); let cleaner = Cleaner::new(db_path); Self { uid, diff --git a/collab-plugins/tests/disk/util.rs b/collab-plugins/tests/disk/util.rs index a98c407db..b913252f7 100644 --- a/collab-plugins/tests/disk/util.rs +++ b/collab-plugins/tests/disk/util.rs @@ -7,5 +7,5 @@ pub fn rocks_db() -> (PathBuf, CollabKVDB) { let tempdir = TempDir::new().unwrap(); let path = tempdir.into_path(); let cloned_path = path.clone(); - (path, CollabKVDB::open_opt(cloned_path, false).unwrap()) + (path, CollabKVDB::open(cloned_path).unwrap()) } diff --git a/collab-plugins/tests/main.rs b/collab-plugins/tests/main.rs index 15fe9d24f..525c0ad0b 100644 --- a/collab-plugins/tests/main.rs +++ b/collab-plugins/tests/main.rs @@ -1,10 +1,12 @@ -use tracing_subscriber::util::SubscriberInitExt; - #[cfg(not(target_arch = "wasm32"))] mod disk; +#[cfg(target_arch = "wasm32")] +mod web; + #[cfg(not(target_arch = "wasm32"))] pub fn setup_log() { + use tracing_subscriber::util::SubscriberInitExt; static START: std::sync::Once = std::sync::Once::new(); START.call_once(|| { let level = "trace"; diff --git a/collab-plugins/tests/web/edit_collab_test.rs b/collab-plugins/tests/web/edit_collab_test.rs new file mode 100644 index 000000000..c19966a16 --- /dev/null +++ b/collab-plugins/tests/web/edit_collab_test.rs @@ -0,0 +1,106 @@ +use assert_json_diff::assert_json_eq; +use collab::core::collab::MutexCollab; +use collab::preclude::CollabBuilder; +use collab_entity::CollabType; +use collab_plugins::local_storage::indexeddb::indexeddb_plugin::IndexeddbDiskPlugin; +use collab_plugins::local_storage::indexeddb::kv_impl::CollabIndexeddb; +use js_sys::Promise; +use serde_json::json; +use std::sync::{Arc, Once}; +use uuid::Uuid; +use wasm_bindgen::prelude::*; +use wasm_bindgen_futures::JsFuture; +use wasm_bindgen_test::wasm_bindgen_test; +use web_sys::window; + +#[wasm_bindgen_test] +async fn edit_collab_with_indexeddb_test() { + setup_log(); + let object_id = Uuid::new_v4().to_string(); + let uid: i64 = 1; + let db = Arc::new(CollabIndexeddb::new().await.unwrap()); + let collab = create_collab(uid, object_id.clone(), &db).await; + collab.lock().insert("message", "hello world"); + let json_1 = collab.to_json_value(); + drop(collab); + + // sleep 2 secs to wait for the disk plugin to flush the data + sleep(2000).await; + let collab_from_disk = create_collab(uid, object_id.clone(), &db).await; + let json_2 = collab_from_disk.to_json_value(); + assert_json_eq!( + json_2, + json!({ + "message": "hello world" + }) + ); + assert_json_eq!(json_1, json_2); +} + +#[wasm_bindgen_test] +async fn flush_collab_with_indexeddb_test() { + setup_log(); + let object_id = Uuid::new_v4().to_string(); + let uid: i64 = 1; + let db = Arc::new(CollabIndexeddb::new().await.unwrap()); + let collab = create_collab(uid, object_id.clone(), &db).await; + collab.lock().insert("1", "a"); + sleep(100).await; + collab.lock().insert("2", "b"); + sleep(100).await; + collab.lock().insert("3", "c"); + sleep(100).await; + let json_1 = collab.to_json_value(); + collab.lock().flush(); + + // sleep 2 secs to wait for the disk plugin to flush the data + sleep(2000).await; + + // after flush, all the updates will be removed. Only the final doc state will be saved to disk + let updates = db.get_all_updates(uid, &object_id).await.unwrap(); + assert_eq!(updates.len(), 0); + + let collab_from_disk = create_collab(uid, object_id.clone(), &db).await; + let json_2 = collab_from_disk.to_json_value(); + assert_json_eq!(json_1, json_2); +} + +pub fn setup_log() { + static START: Once = Once::new(); + START.call_once(|| { + tracing_wasm::set_as_global_default(); + }); +} + +pub async fn create_collab( + uid: i64, + doc_id: String, + db: &Arc, +) -> Arc { + let collab = Arc::new( + CollabBuilder::new(uid, &doc_id) + .with_device_id("1") + .build() + .unwrap(), + ); + let disk_plugin = IndexeddbDiskPlugin::new(uid, doc_id, CollabType::Document, Arc::downgrade(db)); + collab.lock().add_plugin(Arc::new(disk_plugin)); + collab.lock().initialize().await; + sleep(1000).await; + collab +} + +async fn sleep(ms: i32) { + let promise = Promise::new(&mut |resolve, _| { + let closure = Closure::once_into_js(move || { + resolve.call0(&JsValue::NULL).unwrap(); + }); + + window() + .unwrap() + .set_timeout_with_callback_and_timeout_and_arguments_0(closure.as_ref().unchecked_ref(), ms) + .unwrap(); + }); + + JsFuture::from(promise).await.unwrap(); +} diff --git a/collab-plugins/tests/web/indexeddb_test.rs b/collab-plugins/tests/web/indexeddb_test.rs new file mode 100644 index 000000000..0e651f7c6 --- /dev/null +++ b/collab-plugins/tests/web/indexeddb_test.rs @@ -0,0 +1,97 @@ +use collab::core::collab_plugin::EncodedCollab; +use collab_plugins::local_storage::indexeddb::kv_impl::CollabIndexeddb; +use uuid::Uuid; +use wasm_bindgen_test::*; +use yrs::Doc; + +#[wasm_bindgen_test] +async fn indexeddb_put_and_get_encoded_collab_test() { + let db = CollabIndexeddb::new().await.unwrap(); + let object_id = Uuid::new_v4().to_string(); + let uid: i64 = 1; + let encoded_collab = EncodedCollab { + state_vector: vec![1, 2, 3].into(), + doc_state: vec![4, 5, 6].into(), + version: collab::core::collab_plugin::EncoderVersion::V1, + }; + + db.create_doc(uid, &object_id, &encoded_collab) + .await + .unwrap(); + let encoded_collab_from_db = db.get_encoded_collab(uid, &object_id).await.unwrap(); + + assert_eq!( + encoded_collab.state_vector, + encoded_collab_from_db.state_vector + ); + assert_eq!(encoded_collab.doc_state, encoded_collab_from_db.doc_state); +} + +#[wasm_bindgen_test] +async fn indexeddb_get_non_exist_encoded_collab_test() { + let db = CollabIndexeddb::new().await.unwrap(); + let object_id = Uuid::new_v4().to_string(); + let doc = Doc::new(); + let uid: i64 = 1; + let error = db.load_doc(uid, &object_id, doc).await.unwrap_err(); + assert!(error.is_record_not_found()); +} + +#[wasm_bindgen_test] +async fn indexeddb_push_update_test() { + let db = CollabIndexeddb::new().await.unwrap(); + let object_id = Uuid::new_v4().to_string(); + let uid: i64 = 1; + + db.create_doc_id(uid, &object_id).await.unwrap(); + let update_1 = vec![1, 2, 3]; + db.push_update(uid, &object_id, &update_1).await.unwrap(); + + let update_2 = vec![4, 5, 6]; + db.push_update(uid, &object_id, &update_2).await.unwrap(); + + let update_3 = vec![7, 8, 9]; + db.push_update(uid, &object_id, &update_3).await.unwrap(); + + let update_4 = vec![10, 11, 12]; + db.push_update(uid, &object_id, &update_4).await.unwrap(); + + let updates = db.get_all_updates(uid, &object_id).await.unwrap(); + assert_eq!(updates.len(), 4); + assert_eq!(updates[0], update_1); + assert_eq!(updates[1], update_2); + assert_eq!(updates[2], update_3); + assert_eq!(updates[3], update_4); +} + +#[wasm_bindgen_test] +async fn indexeddb_flush_doc_test() { + let db = CollabIndexeddb::new().await.unwrap(); + let object_id = Uuid::new_v4().to_string(); + let uid: i64 = 1; + + db.create_doc_id(uid, &object_id).await.unwrap(); + let update_1 = vec![1, 2, 3]; + db.push_update(uid, &object_id, &update_1).await.unwrap(); + + let update_2 = vec![4, 5, 6]; + db.push_update(uid, &object_id, &update_2).await.unwrap(); + + let update_3 = vec![7, 8, 9]; + db.push_update(uid, &object_id, &update_3).await.unwrap(); + + let update_4 = vec![10, 11, 12]; + db.push_update(uid, &object_id, &update_4).await.unwrap(); + + let encoded_collab = EncodedCollab { + state_vector: vec![1, 2, 3].into(), + doc_state: vec![4, 5, 6].into(), + version: collab::core::collab_plugin::EncoderVersion::V1, + }; + db.flush_doc(uid, &object_id, &encoded_collab) + .await + .unwrap(); + + let updates = db.get_all_updates(uid, &object_id).await.unwrap(); + assert_eq!(updates.len(), 0); +} diff --git a/collab-plugins/tests/web/mod.rs b/collab-plugins/tests/web/mod.rs new file mode 100644 index 000000000..821bd1efd --- /dev/null +++ b/collab-plugins/tests/web/mod.rs @@ -0,0 +1,5 @@ +use wasm_bindgen_test::wasm_bindgen_test_configure; +wasm_bindgen_test_configure!(run_in_browser); + +mod edit_collab_test; +mod indexeddb_test; diff --git a/collab-plugins/tests/web/setup_tests.js b/collab-plugins/tests/web/setup_tests.js new file mode 100644 index 000000000..5ea989ae2 --- /dev/null +++ b/collab-plugins/tests/web/setup_tests.js @@ -0,0 +1,6 @@ +function get_current_timestamp() { + return Date.now(); +} + +// Expose the function to the global scope so it's accessible to the WASM module +global.get_current_timestamp = get_current_timestamp; diff --git a/collab-plugins/tests/web/test.md b/collab-plugins/tests/web/test.md new file mode 100644 index 000000000..76a158426 --- /dev/null +++ b/collab-plugins/tests/web/test.md @@ -0,0 +1,17 @@ + +## Run clippy for web + +```shell +cargo clippy --target=wasm32-unknown-unknown --fix --allow-dirty --features="wasm_build" +``` + +## Run tests in Chrome +```shell +wasm-pack test --chrome --features="wasm_build" +``` + +## Build for web + +```shell +wasm-pack build --features="wasm_build" +``` \ No newline at end of file diff --git a/collab/Cargo.toml b/collab/Cargo.toml index 89a9b8e97..d3c31fd2f 100644 --- a/collab/Cargo.toml +++ b/collab/Cargo.toml @@ -22,6 +22,9 @@ async-trait.workspace = true bincode = "1.3.3" serde_repr = "0.1" +[target.'cfg(target_arch = "wasm32")'.dependencies] +web-sys = { version = "0.3.67"} + [dev-dependencies] tokio = { version = "1.26", features = ["rt"] } tempfile = "3.8.0" diff --git a/collab/src/core/transaction.rs b/collab/src/core/transaction.rs index c03aab815..2918a6d90 100644 --- a/collab/src/core/transaction.rs +++ b/collab/src/core/transaction.rs @@ -1,5 +1,5 @@ use std::thread::sleep; -use std::time::{Duration, Instant}; +use std::time::Duration; use crate::core::collab_plugin::EncodedCollab; use yrs::updates::encoder::Encode; @@ -15,7 +15,7 @@ use crate::error::CollabError; pub struct TransactionRetry<'a> { timeout: Duration, doc: &'a Doc, - start: Instant, + timer: Timer, retry_interval: Duration, } @@ -25,12 +25,12 @@ impl<'a> TransactionRetry<'a> { timeout: Duration::from_secs(2), retry_interval: Duration::from_millis(50), doc, - start: Instant::now(), + timer: Timer::start(), } } pub fn get_read_txn(&mut self) -> Transaction<'a> { - while self.start.elapsed() < self.timeout { + while self.timer.elapsed() < self.timeout { match self.doc.try_transact() { Ok(txn) => { return txn; @@ -45,7 +45,7 @@ impl<'a> TransactionRetry<'a> { } pub fn try_get_write_txn(&mut self) -> Result, CollabError> { - while self.start.elapsed() < self.timeout { + while self.timer.elapsed() < self.timeout { match self.doc.try_transact_mut() { Ok(txn) => { return Ok(txn); @@ -60,7 +60,7 @@ impl<'a> TransactionRetry<'a> { } pub fn get_write_txn_with(&mut self, origin: CollabOrigin) -> TransactionMut<'a> { - while self.start.elapsed() < self.timeout { + while self.timer.elapsed() < self.timeout { match self.doc.try_transact_mut_with(origin.clone()) { Ok(txn) => { return txn; @@ -78,7 +78,7 @@ impl<'a> TransactionRetry<'a> { &mut self, origin: CollabOrigin, ) -> Result, CollabError> { - while self.start.elapsed() < self.timeout { + while self.timer.elapsed() < self.timeout { match self.doc.try_transact_mut_with(origin.clone()) { Ok(txn) => { return Ok(txn); @@ -122,3 +122,37 @@ impl DocTransactionExtension for Doc { self.transact_mut() } } + +if_native! { + struct Timer { + start: std::time::Instant, + } + + impl Timer { + fn start() -> Self { + Self { start: std::time::Instant::now() } + } + + fn elapsed(&self) -> Duration { + self.start.elapsed() + } + } +} + +if_wasm! { + struct Timer { + start: f64, + } + + impl Timer { + fn start() -> Self { + Self { start: web_sys::js_sys::Date::now() } + } + + fn elapsed(&self) -> Duration { + let now = web_sys::js_sys::Date::now(); + let elapsed_ms = now - self.start; + Duration::from_millis(elapsed_ms as u64) + } + } +} diff --git a/collab/src/lib.rs b/collab/src/lib.rs index 84d8fa191..2a0c41cd6 100644 --- a/collab/src/lib.rs +++ b/collab/src/lib.rs @@ -1,3 +1,19 @@ +#[macro_export] +macro_rules! if_native { + ($($item:item)*) => {$( + #[cfg(not(target_arch = "wasm32"))] + $item + )*} +} + +#[macro_export] +macro_rules! if_wasm { + ($($item:item)*) => {$( + #[cfg(target_arch = "wasm32")] + $item + )*} +} + pub mod core; pub mod error; pub mod sync_protocol;