From 49313253b20eaa62555d9e2d76d96c148bcd698d Mon Sep 17 00:00:00 2001 From: HideBa Date: Wed, 22 Jan 2025 22:00:29 +0100 Subject: [PATCH] wip: re-structure folder and build wasm --- .gitignore | 5 +- .vscode/settings.json | 3 +- {src/rust/tests/data => examples}/.gitkeep | 0 src/rust/Cargo.toml | 50 +- src/rust/cli/Cargo.toml | 11 + src/rust/{ => cli}/src/main.rs | 2 +- src/rust/fcb_core/Cargo.toml | 73 +++ src/rust/{ => fcb_core}/benches/read.rs | 2 +- .../{ => fcb_core}/scripts/cjseq_to_fcb.sh | 0 src/rust/{ => fcb_core}/src/bin/read.rs | 4 +- src/rust/{ => fcb_core}/src/bin/write.rs | 8 +- src/rust/{ => fcb_core}/src/cj_utils.rs | 0 src/rust/fcb_core/src/const_vars.rs | 17 + src/rust/{ => fcb_core}/src/error.rs | 0 .../src/fb}/feature_generated.rs | 2 +- .../src/fb}/header_generated.rs | 0 src/rust/fcb_core/src/fb/mod.rs | 5 + .../src/http_reader/mock_http_range_client.rs | 107 ++++ .../{ => fcb_core}/src/http_reader/mod.rs | 26 +- .../fcb_core/src/http_reader/wasm_client.rs | 42 ++ src/rust/fcb_core/src/lib.rs | 29 + .../{ => fcb_core}/src/reader/city_buffer.rs | 9 +- .../{ => fcb_core}/src/reader/deserializer.rs | 3 +- .../{ => fcb_core}/src/reader/geom_decoder.rs | 4 +- src/rust/{ => fcb_core}/src/reader/mod.rs | 12 +- .../{ => fcb_core}/src/writer/attribute.rs | 2 +- .../src/writer/feature_writer.rs | 2 +- .../{ => fcb_core}/src/writer/geom_encoder.rs | 0 .../src/writer/header_writer.rs | 3 +- src/rust/{ => fcb_core}/src/writer/mod.rs | 4 +- .../{ => fcb_core}/src/writer/serializer.rs | 10 +- src/rust/{ => fcb_core}/tests/e2e.rs | 2 +- src/rust/{ => fcb_core}/tests/http.rs | 2 +- src/rust/{ => fcb_core}/tests/read.rs | 2 +- src/rust/{ => fcb_core}/tests/serde.rs | 4 +- src/rust/makefile | 7 +- src/rust/packed_rtree/Cargo.toml | 16 + .../src/lib.rs} | 14 +- src/rust/src/const_vars.rs | 17 - .../src/http_reader/mock_http_range_client.rs | 107 ---- src/rust/src/lib.rs | 27 - src/rust/wasm/Cargo.toml | 33 + src/rust/wasm/src/gloo_client.rs | 51 ++ src/rust/wasm/src/lib.rs | 571 ++++++++++++++++++ src/rust/wasm/src/range_client.rs | 39 ++ 45 files changed, 1089 insertions(+), 238 deletions(-) rename {src/rust/tests/data => examples}/.gitkeep (100%) create mode 100644 src/rust/cli/Cargo.toml rename src/rust/{ => cli}/src/main.rs (99%) create mode 100644 src/rust/fcb_core/Cargo.toml rename src/rust/{ => fcb_core}/benches/read.rs (99%) rename src/rust/{ => fcb_core}/scripts/cjseq_to_fcb.sh (100%) rename src/rust/{ => fcb_core}/src/bin/read.rs (95%) rename src/rust/{ => fcb_core}/src/bin/write.rs (88%) rename src/rust/{ => fcb_core}/src/cj_utils.rs (100%) create mode 100644 src/rust/fcb_core/src/const_vars.rs rename src/rust/{ => fcb_core}/src/error.rs (100%) rename src/rust/{src => fcb_core/src/fb}/feature_generated.rs (99%) rename src/rust/{src => fcb_core/src/fb}/header_generated.rs (100%) create mode 100644 src/rust/fcb_core/src/fb/mod.rs create mode 100644 src/rust/fcb_core/src/http_reader/mock_http_range_client.rs rename src/rust/{ => fcb_core}/src/http_reader/mod.rs (96%) create mode 100644 src/rust/fcb_core/src/http_reader/wasm_client.rs create mode 100644 src/rust/fcb_core/src/lib.rs rename src/rust/{ => fcb_core}/src/reader/city_buffer.rs (76%) rename src/rust/{ => fcb_core}/src/reader/deserializer.rs (99%) rename src/rust/{ => fcb_core}/src/reader/geom_decoder.rs (99%) rename src/rust/{ => fcb_core}/src/reader/mod.rs (97%) rename src/rust/{ => fcb_core}/src/writer/attribute.rs (99%) rename src/rust/{ => fcb_core}/src/writer/feature_writer.rs (98%) rename src/rust/{ => fcb_core}/src/writer/geom_encoder.rs (100%) rename src/rust/{ => fcb_core}/src/writer/header_writer.rs (97%) rename src/rust/{ => fcb_core}/src/writer/mod.rs (98%) rename src/rust/{ => fcb_core}/src/writer/serializer.rs (99%) rename src/rust/{ => fcb_core}/tests/e2e.rs (99%) rename src/rust/{ => fcb_core}/tests/http.rs (95%) rename src/rust/{ => fcb_core}/tests/read.rs (98%) rename src/rust/{ => fcb_core}/tests/serde.rs (99%) create mode 100644 src/rust/packed_rtree/Cargo.toml rename src/rust/{src/packedrtree.rs => packed_rtree/src/lib.rs} (99%) delete mode 100644 src/rust/src/const_vars.rs delete mode 100644 src/rust/src/http_reader/mock_http_range_client.rs create mode 100644 src/rust/wasm/Cargo.toml create mode 100644 src/rust/wasm/src/gloo_client.rs create mode 100644 src/rust/wasm/src/lib.rs create mode 100644 src/rust/wasm/src/range_client.rs diff --git a/.gitignore b/.gitignore index b48ed2f..addb1ce 100644 --- a/.gitignore +++ b/.gitignore @@ -58,6 +58,7 @@ __pycache__/ temp/ -src/rust/benchmark_data/ - +benchmark_data/ +src/rust/fcb_core/tests/data/ +!src/rust/fcb_core/tests/data/.gitkeep .cursorrules diff --git a/.vscode/settings.json b/.vscode/settings.json index 0aa2dee..c1cace6 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -11,5 +11,6 @@ "python.pythonPath": "${workspaceFolder}/src/py/.venv/bin/python", "rust-analyzer.server.extraEnv": { "RUST_LOG": "debug" - } + }, + "liveServer.settings.port": 5501 } \ No newline at end of file diff --git a/src/rust/tests/data/.gitkeep b/examples/.gitkeep similarity index 100% rename from src/rust/tests/data/.gitkeep rename to examples/.gitkeep diff --git a/src/rust/Cargo.toml b/src/rust/Cargo.toml index 195091e..d8175bf 100644 --- a/src/rust/Cargo.toml +++ b/src/rust/Cargo.toml @@ -3,49 +3,37 @@ name = "flatcitybuf" version = "0.1.0" edition = "2021" +[workspace] +members = ["cli", "fcb_core", "packed_rtree", "wasm"] +resolver = "2" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html -[features] -default = [] -http = ["http-range-client", "bytes", "reqwest"] - -[dependencies] +[workspace.dependencies] +async-trait = "0.1.85" flatbuffers = "24.3.25" byteorder = "1.5.0" cjseq = { git = "https://github.com/HideBa/cjseq", branch = "develop" } tempfile = "3.14.0" serde_json = "1.0.133" +serde = "1.0.200" anyhow = "1.0.95" fallible-streaming-iterator = "0.1.9" clap = "4.5.23" -http-range-client = { version = "0.9.0", optional = true, default-features = false, features = [ - "reqwest-async", -] } -reqwest = { version = "0.12.12", optional = true } -bytes = { version = "1.9.0", optional = true } +http-range-client = { version = "0.9.0", default-features = false } +reqwest = { version = "0.12.12" } tracing = "0.1.41" -tokio = "1.43.0" - -[lib] -name = "flatcitybuf" -path = "src/lib.rs" - - -[[bin]] -name = "read" -path = "src/bin/read.rs" - -[[bin]] -name = "flatcitybuf_cli" -path = "src/main.rs" - -[dev-dependencies] -async-trait = "0.1.85" +bytes = "1.9.0" criterion = { version = "0.5.1", features = ["async_tokio", "html_reports"] } memory-stats = "1.2.0" pretty_assertions = "1.4.1" tokio = { version = "1.43.0", features = ["macros", "rt-multi-thread"] } +getrandom = { version = "0.2.15", features = ["js"] } +gloo-net = "0.6.0" +js-sys = "0.3.77" +wasm-bindgen = "0.2" +wasm-bindgen-futures = "0.4.50" +serde-wasm-bindgen = "0.6.5" +console_error_panic_hook = "0.1.7" +log = "0.4" +console_log = "0.2" -[[bench]] -name = "read" -harness = false +[dependencies] diff --git a/src/rust/cli/Cargo.toml b/src/rust/cli/Cargo.toml new file mode 100644 index 0000000..28989b9 --- /dev/null +++ b/src/rust/cli/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "fcb_cli" +version = "0.1.0" +edition = "2021" + +[dependencies] +fcb_core = { path = "../fcb_core", default-features = false } +clap = { workspace = true } +anyhow = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } diff --git a/src/rust/src/main.rs b/src/rust/cli/src/main.rs similarity index 99% rename from src/rust/src/main.rs rename to src/rust/cli/src/main.rs index 0de27f3..68e29fa 100644 --- a/src/rust/src/main.rs +++ b/src/rust/cli/src/main.rs @@ -1,6 +1,6 @@ use anyhow::Result; use clap::{Parser, Subcommand}; -use flatcitybuf::{ +use fcb_core::{ attribute::{AttributeSchema, AttributeSchemaMethods}, deserializer, header_writer::HeaderWriterOptions, diff --git a/src/rust/fcb_core/Cargo.toml b/src/rust/fcb_core/Cargo.toml new file mode 100644 index 0000000..1f95e04 --- /dev/null +++ b/src/rust/fcb_core/Cargo.toml @@ -0,0 +1,73 @@ +[package] +name = "fcb_core" +version = "0.1.0" +edition = "2021" + + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +default = ["http"] +http = ["http-range-client", "reqwest", "bytes"] +wasm = [ + "wasm-bindgen", + "wasm-bindgen-futures", + "gloo-net", + "js-sys", + "getrandom", +] + +[dependencies] +bytes = { workspace = true, optional = true } +flatbuffers = { workspace = true } +byteorder = { workspace = true } +cjseq = { workspace = true } +tempfile = { workspace = true } +serde_json = { workspace = true } +anyhow = { workspace = true } +fallible-streaming-iterator = { workspace = true } +clap = { workspace = true } +tracing = { workspace = true } +packed_rtree = { path = "../packed_rtree" } +http-range-client = { workspace = true, default-features = false, optional = true, features = [ + "reqwest-async", +] } +reqwest = { workspace = true, optional = true } + +[target.'cfg(feature = "http")'.dependencies] +tokio = { workspace = true } +http-range-client = { workspace = true, default-features = false, optional = true, features = [ + "reqwest-async", +] } +reqwest = { workspace = true, optional = true } +getrandom = { workspace = true, optional = true, default-features = false, features = [ + "js", +] } +async-trait = { workspace = true, optional = true, default-features = false } +gloo-net = { workspace = true, optional = true } +js-sys = { workspace = true, optional = true } +wasm-bindgen = { workspace = true, optional = true } +wasm-bindgen-futures = { workspace = true, optional = true } + + +[lib] +name = "fcb_core" +path = "src/lib.rs" + +[[bin]] +name = "read" +path = "src/bin/read.rs" + + +[[bench]] +name = "read" +harness = false + +[dev-dependencies] +async-trait = { workspace = true } +memory-stats = { workspace = true } +pretty_assertions = { workspace = true } + +# [target.'cfg(not(feature = "wasm"))'.dev-dependencies] +# criterion = { workspace = true, features = ["async_tokio", "html_reports"] } +# [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] +# tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/src/rust/benches/read.rs b/src/rust/fcb_core/benches/read.rs similarity index 99% rename from src/rust/benches/read.rs rename to src/rust/fcb_core/benches/read.rs index 42a76f6..4b04ef3 100644 --- a/src/rust/benches/read.rs +++ b/src/rust/fcb_core/benches/read.rs @@ -1,7 +1,7 @@ use anyhow::Result; use cjseq::{CityJSON, CityJSONFeature}; use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion}; -use flatcitybuf::{FcbReader, GeometryType}; +use fcb_core::{FcbReader, GeometryType}; use std::{ collections::HashMap, fs::File, diff --git a/src/rust/scripts/cjseq_to_fcb.sh b/src/rust/fcb_core/scripts/cjseq_to_fcb.sh similarity index 100% rename from src/rust/scripts/cjseq_to_fcb.sh rename to src/rust/fcb_core/scripts/cjseq_to_fcb.sh diff --git a/src/rust/src/bin/read.rs b/src/rust/fcb_core/src/bin/read.rs similarity index 95% rename from src/rust/src/bin/read.rs rename to src/rust/fcb_core/src/bin/read.rs index 9fb3eec..bc4cc7e 100644 --- a/src/rust/src/bin/read.rs +++ b/src/rust/fcb_core/src/bin/read.rs @@ -1,5 +1,5 @@ -use flatcitybuf::deserializer::to_cj_metadata; -use flatcitybuf::FcbReader; +use fcb_core::deserializer::to_cj_metadata; +use fcb_core::FcbReader; use std::error::Error; use std::fs::File; use std::io::{BufReader, BufWriter, Write}; diff --git a/src/rust/src/bin/write.rs b/src/rust/fcb_core/src/bin/write.rs similarity index 88% rename from src/rust/src/bin/write.rs rename to src/rust/fcb_core/src/bin/write.rs index a6019f3..66d7f5f 100644 --- a/src/rust/src/bin/write.rs +++ b/src/rust/fcb_core/src/bin/write.rs @@ -1,6 +1,8 @@ -use flatcitybuf::attribute::{AttributeSchema, AttributeSchemaMethods}; -use flatcitybuf::header_writer::HeaderWriterOptions; -use flatcitybuf::{read_cityjson_from_reader, CJType, CJTypeKind, CityJSONSeq, FcbWriter}; +use fcb_core::{ + attribute::{AttributeSchema, AttributeSchemaMethods}, + header_writer::HeaderWriterOptions, + read_cityjson_from_reader, CJType, CJTypeKind, CityJSONSeq, FcbWriter, +}; use std::error::Error; use std::fs::File; use std::io::{BufReader, BufWriter}; diff --git a/src/rust/src/cj_utils.rs b/src/rust/fcb_core/src/cj_utils.rs similarity index 100% rename from src/rust/src/cj_utils.rs rename to src/rust/fcb_core/src/cj_utils.rs diff --git a/src/rust/fcb_core/src/const_vars.rs b/src/rust/fcb_core/src/const_vars.rs new file mode 100644 index 0000000..67eefce --- /dev/null +++ b/src/rust/fcb_core/src/const_vars.rs @@ -0,0 +1,17 @@ +// Current version of FlatCityBuf +pub const VERSION: u8 = 1; + +// Magic bytes for FlatCityBuf +pub const MAGIC_BYTES: [u8; 8] = [b'f', b'c', b'b', VERSION, b'f', b'c', b'b', 0]; + +// Maximum buffer size for header +pub const HEADER_MAX_BUFFER_SIZE: usize = 1024 * 1024 * 512; // 512MB + +// Size of magic bytes +pub const MAGIC_BYTES_SIZE: usize = 8; + +// Size of header size +pub const HEADER_SIZE_SIZE: usize = 4; + +// // Offset of header size +// pub(crate) const HEADER_SIZE_OFFSET: usize = MAGIC_BYTES_SIZE + HEADER_SIZE_SIZE; diff --git a/src/rust/src/error.rs b/src/rust/fcb_core/src/error.rs similarity index 100% rename from src/rust/src/error.rs rename to src/rust/fcb_core/src/error.rs diff --git a/src/rust/src/feature_generated.rs b/src/rust/fcb_core/src/fb/feature_generated.rs similarity index 99% rename from src/rust/src/feature_generated.rs rename to src/rust/fcb_core/src/fb/feature_generated.rs index ae39ab1..567914d 100644 --- a/src/rust/src/feature_generated.rs +++ b/src/rust/fcb_core/src/fb/feature_generated.rs @@ -2,7 +2,7 @@ // @generated -use crate::header_generated::*; +use crate::fb::*; use core::cmp::Ordering; use core::mem; diff --git a/src/rust/src/header_generated.rs b/src/rust/fcb_core/src/fb/header_generated.rs similarity index 100% rename from src/rust/src/header_generated.rs rename to src/rust/fcb_core/src/fb/header_generated.rs diff --git a/src/rust/fcb_core/src/fb/mod.rs b/src/rust/fcb_core/src/fb/mod.rs new file mode 100644 index 0000000..2e903cd --- /dev/null +++ b/src/rust/fcb_core/src/fb/mod.rs @@ -0,0 +1,5 @@ +pub mod feature_generated; +pub mod header_generated; + +pub use feature_generated::*; +pub use header_generated::*; diff --git a/src/rust/fcb_core/src/http_reader/mock_http_range_client.rs b/src/rust/fcb_core/src/http_reader/mock_http_range_client.rs new file mode 100644 index 0000000..e9e90de --- /dev/null +++ b/src/rust/fcb_core/src/http_reader/mock_http_range_client.rs @@ -0,0 +1,107 @@ +// use crate::http_reader::HttpFcbReader; +// use anyhow::Result; +// use bytes::Bytes; +// use http_range_client; +// use std::fs::File; +// use std::io::{BufReader, Read, Seek, SeekFrom}; +// use std::ops::Range; +// use std::path::PathBuf; +// use std::sync::{Arc, RwLock}; +// use tracing::trace; + +// impl HttpFcbReader { +// /// NOTE: For debugging expediency, this test class often prefers panics over returning a result. +// pub async fn mock_from_file( +// path: &str, +// ) -> Result<( +// HttpFcbReader, +// Arc>, +// )> { +// trace!("starting: opening http reader, reading header"); + +// let stats = Arc::new(RwLock::new(RequestStats::new())); +// let http_client = MockHttpRangeClient::new(path, stats.clone()); +// let client = http_range_client::AsyncBufferedHttpRangeClient::with(http_client, path); +// Ok((Self::_open(client).await?, stats)) +// } +// } + +// /// NOTE: For debugging expediency, this test class often prefers panics over returning a result. +// pub(crate) struct MockHttpRangeClient { +// path: PathBuf, +// stats: Arc>, +// } + +// pub(crate) struct RequestStats { +// pub request_count: u64, +// pub bytes_requested: u64, +// } + +// impl RequestStats { +// fn new() -> Self { +// Self { +// request_count: 0, +// bytes_requested: 0, +// } +// } +// } + +// #[async_trait::async_trait] +// impl AsyncHttpRangeClient for MockHttpRangeClient { +// async fn get_range(&self, url: &str, range: &str) -> http_range_client::Result { +// assert_eq!(url, self.path.to_str().unwrap()); + +// /// This is a hack, but we need the start and length of the range +// /// since all we're given is the pre-formatted range string, we +// /// need to parse it into its components +// /// +// /// For expediency, this test code panics rather than returns a result. +// fn parse_range_header(range: &str) -> Range { +// let bytes = range.strip_prefix("bytes=").unwrap(); +// let parts: Vec<&str> = bytes.split('-').collect(); +// assert!(parts.len() == 2); +// let start = parts[0].parse().expect("should have valid start range"); +// let end: u64 = parts[1].parse().expect("should have valid end range"); +// // Range headers are *inclusive* +// start..(end + 1) +// } + +// let range = parse_range_header(range); +// let request_length = range.end - range.start; + +// let mut stats = self +// .stats +// .write() +// .expect("test code does not handle actual concurrency"); + +// stats.request_count += 1; +// stats.bytes_requested += request_length; + +// let mut file_reader = BufReader::new(File::open(&self.path).unwrap()); +// file_reader +// .seek(SeekFrom::Start(range.start)) +// .expect("unable to seek test reader"); +// let mut output = vec![0; request_length as usize]; +// file_reader +// .read_exact(&mut output) +// .expect("failed to read from test reader"); +// Ok(Bytes::from(output)) +// } + +// async fn head_response_header( +// &self, +// _url: &str, +// _header: &str, +// ) -> http_range_client::Result> { +// unimplemented!() +// } +// } + +// impl MockHttpRangeClient { +// fn new(path: &str, stats: Arc>) -> Self { +// Self { +// path: path.into(), +// stats, +// } +// } +// } diff --git a/src/rust/src/http_reader/mod.rs b/src/rust/fcb_core/src/http_reader/mod.rs similarity index 96% rename from src/rust/src/http_reader/mod.rs rename to src/rust/fcb_core/src/http_reader/mod.rs index 8a815f1..1de833b 100644 --- a/src/rust/src/http_reader/mod.rs +++ b/src/rust/fcb_core/src/http_reader/mod.rs @@ -1,17 +1,23 @@ use crate::deserializer::to_cj_feature; -use crate::packedrtree::{HttpRange, HttpSearchResultItem, NodeItem, PackedRTree}; +use crate::fb::*; + use crate::reader::city_buffer::FcbBuffer; use crate::{ - check_magic_bytes, size_prefixed_root_as_city_feature, HEADER_MAX_BUFFER_SIZE, HEADER_SIZE_SIZE, + check_magic_bytes, size_prefixed_root_as_city_feature, HEADER_MAX_BUFFER_SIZE, + HEADER_SIZE_SIZE, MAGIC_BYTES_SIZE, }; -use crate::{header_generated::*, MAGIC_BYTES_SIZE}; use anyhow::{anyhow, Result}; use byteorder::{ByteOrder, LittleEndian}; use bytes::{BufMut, Bytes, BytesMut}; use cjseq::CityJSONFeature; -use http_range_client::{ - AsyncBufferedHttpRangeClient, AsyncHttpRangeClient, BufferedHttpRangeClient, -}; +#[cfg(feature = "http")] +use http_range_client::{AsyncBufferedHttpRangeClient, AsyncHttpRangeClient}; +use reqwest; + +#[cfg(feature = "http")] +use http_range_client::BufferedHttpRangeClient; + +use packed_rtree::{http::HttpRange, http::HttpSearchResultItem, NodeItem, PackedRTree}; use std::collections::VecDeque; use std::ops::Range; use tracing::debug; @@ -20,19 +26,22 @@ use tracing::trace; #[cfg(test)] mod mock_http_range_client; +// #[cfg(feature = "wasm")] +// mod wasm_client; + // The largest request we'll speculatively make. // If a single huge feature requires, we'll necessarily exceed this limit. const DEFAULT_HTTP_FETCH_SIZE: usize = 1_048_576; // 1MB /// FlatCityBuf dataset HTTP reader -pub struct HttpFcbReader { +pub struct HttpFcbReader { client: AsyncBufferedHttpRangeClient, // feature reading requires header access, therefore // header_buf is included in the FcbBuffer struct. fbs: FcbBuffer, } -pub struct AsyncFeatureIter { +pub struct AsyncFeatureIter { client: AsyncBufferedHttpRangeClient, // feature reading requires header access, therefore // header_buf is included in the FcbBuffer struct. @@ -43,6 +52,7 @@ pub struct AsyncFeatureIter { count: usize, } +#[cfg(feature = "http")] impl HttpFcbReader { pub async fn open(url: &str) -> Result> { println!("open===: {:?}", url); diff --git a/src/rust/fcb_core/src/http_reader/wasm_client.rs b/src/rust/fcb_core/src/http_reader/wasm_client.rs new file mode 100644 index 0000000..959a48f --- /dev/null +++ b/src/rust/fcb_core/src/http_reader/wasm_client.rs @@ -0,0 +1,42 @@ +// use crate::HttpFcbReader; +// use anyhow::Result; +// use bytes::Bytes; +// #[cfg(feature = "wasm")] +// use gloo_net::http::Request; +// use http_range_client::AsyncHttpRangeClient; +// use std::fs::File; +// use std::io::{BufReader, Read, Seek, SeekFrom}; +// use std::ops::Range; +// use std::path::PathBuf; +// use std::sync::{Arc, RwLock}; +// use tracing::trace; +// #[cfg(feature = "wasm")] +// use wasm_bindgen::prelude::*; + +// #[wasm_bindgen] +// #[cfg(feature = "wasm")] +// pub type HttpClient = GlooRequest; + +// #[cfg(feature = "wasm")] +// impl HttpFcbReader { +// pub async fn open(url: &str) -> Result> { +// trace!("starting: opening http reader, reading header"); + +// // let stats = Arc::new(RwLock::new(RequestStats::new())); +// // let http_client = MockHttpRangeClient::new(path, stats.clone()); +// // let client = http_range_client::AsyncBufferedHttpRangeClient::with(http_client, path); +// // Ok((Self::_open(client).await?, stats)) +// todo!("implement me") +// } +// } + +// #[cfg(feature = "wasm")] +// impl AsyncHttpRangeClient for GlooRequest { +// async fn get_range(&self, url: &str, range: &str) -> Result { +// todo!("implement me") +// } + +// async fn head_response_header(&self, url: &str, header: &str) -> Result> { +// todo!("implement me") +// } +// } diff --git a/src/rust/fcb_core/src/lib.rs b/src/rust/fcb_core/src/lib.rs new file mode 100644 index 0000000..9d9926b --- /dev/null +++ b/src/rust/fcb_core/src/lib.rs @@ -0,0 +1,29 @@ +#![allow(clippy::manual_range_contains)] + +mod cj_utils; +pub mod const_vars; +mod error; +mod fb; +#[allow(dead_code, unused_imports, clippy::all, warnings)] +#[cfg(feature = "http")] +#[cfg(feature = "wasm")] +mod http_reader; + +mod reader; +mod writer; + +pub use cj_utils::*; +pub use const_vars::*; +// pub use fb as fb_generated; +pub use fb::*; + +#[cfg(feature = "http")] +#[cfg(feature = "wasm")] +pub use http_reader::*; + +pub use reader::*; +pub use writer::*; + +pub fn check_magic_bytes(bytes: &[u8]) -> bool { + bytes[0..3] == MAGIC_BYTES[0..3] && bytes[4..7] == MAGIC_BYTES[4..7] && bytes[3] <= VERSION +} diff --git a/src/rust/src/reader/city_buffer.rs b/src/rust/fcb_core/src/reader/city_buffer.rs similarity index 76% rename from src/rust/src/reader/city_buffer.rs rename to src/rust/fcb_core/src/reader/city_buffer.rs index fa1c6b2..3a4d51a 100644 --- a/src/rust/src/reader/city_buffer.rs +++ b/src/rust/fcb_core/src/reader/city_buffer.rs @@ -1,16 +1,15 @@ use crate::deserializer::to_cj_feature; -use crate::feature_generated::*; -use crate::header_generated::*; +use crate::fb::*; use anyhow::Result; use cjseq::CityJSONFeature; pub struct FcbBuffer { - pub(crate) header_buf: Vec, - pub(crate) features_buf: Vec, + pub header_buf: Vec, + pub features_buf: Vec, } impl FcbBuffer { - pub(crate) fn header(&self) -> Header { + pub fn header(&self) -> Header { unsafe { size_prefixed_root_as_header_unchecked(&self.header_buf) } } diff --git a/src/rust/src/reader/deserializer.rs b/src/rust/fcb_core/src/reader/deserializer.rs similarity index 99% rename from src/rust/src/reader/deserializer.rs rename to src/rust/fcb_core/src/reader/deserializer.rs index 3c23101..82a08a7 100644 --- a/src/rust/src/reader/deserializer.rs +++ b/src/rust/fcb_core/src/reader/deserializer.rs @@ -1,9 +1,8 @@ use std::collections::HashMap; use crate::{ - feature_generated::{CityFeature, CityObjectType, Geometry, Vertex}, + fb::*, geom_decoder::{decode, decode_semantics}, - header_generated::*, }; use anyhow::{Context, Result}; use byteorder::{ByteOrder, LittleEndian}; diff --git a/src/rust/src/reader/geom_decoder.rs b/src/rust/fcb_core/src/reader/geom_decoder.rs similarity index 99% rename from src/rust/src/reader/geom_decoder.rs rename to src/rust/fcb_core/src/reader/geom_decoder.rs index 761c4f5..33494b4 100644 --- a/src/rust/src/reader/geom_decoder.rs +++ b/src/rust/fcb_core/src/reader/geom_decoder.rs @@ -3,7 +3,7 @@ use cjseq::{ SemanticsValues, }; -use crate::feature_generated::{GeometryType, SemanticObject, SemanticSurfaceType}; +use crate::fb::{GeometryType, SemanticObject, SemanticSurfaceType}; /// For semantics decoding, we only care about solids and shells. /// We stop recursing at d <= 2 which are surfaces, rings and points (meaning we just return semantic_indices). @@ -387,7 +387,7 @@ impl GeometryType { #[cfg(test)] mod tests { use crate::{ - feature_generated::{ + fb::feature_generated::{ root_as_city_feature, CityFeature, CityFeatureArgs, CityObject, CityObjectArgs, GeometryType, }, diff --git a/src/rust/src/reader/mod.rs b/src/rust/fcb_core/src/reader/mod.rs similarity index 97% rename from src/rust/src/reader/mod.rs rename to src/rust/fcb_core/src/reader/mod.rs index 098cecb..33ff9e1 100644 --- a/src/rust/src/reader/mod.rs +++ b/src/rust/fcb_core/src/reader/mod.rs @@ -4,11 +4,13 @@ use city_buffer::*; use cjseq::CityJSONFeature; use deserializer::to_cj_feature; -use crate::feature_generated::{size_prefixed_root_as_city_feature, CityFeature}; -use crate::{check_magic_bytes, PackedRTree, HEADER_MAX_BUFFER_SIZE}; -use crate::{header_generated::*, packedrtree}; +use crate::fb::{size_prefixed_root_as_city_feature, CityFeature}; +use crate::{ + check_magic_bytes, size_prefixed_root_as_header, Column, Header, HEADER_MAX_BUFFER_SIZE, +}; use anyhow::{anyhow, Result}; use fallible_streaming_iterator::FallibleStreamingIterator; +use packed_rtree::PackedRTree; use std::io::{self, Read, Seek, SeekFrom, Write}; pub mod geom_decoder; @@ -27,7 +29,7 @@ pub struct FeatureIter { // header_buf is included in the FgbFeature struct. buffer: FcbBuffer, /// Select>ed features or None if no bbox filter - item_filter: Option>, + item_filter: Option>, /// Number of selected features (None for undefined feature count) count: Option, /// Current feature number @@ -341,7 +343,7 @@ impl FeatureIter { reader: R, verify: bool, buffer: FcbBuffer, - item_filter: Option>, + item_filter: Option>, ) -> FeatureIter { let mut iter = FeatureIter { reader, diff --git a/src/rust/src/writer/attribute.rs b/src/rust/fcb_core/src/writer/attribute.rs similarity index 99% rename from src/rust/src/writer/attribute.rs rename to src/rust/fcb_core/src/writer/attribute.rs index 36b9387..b49289f 100644 --- a/src/rust/src/writer/attribute.rs +++ b/src/rust/fcb_core/src/writer/attribute.rs @@ -1,4 +1,4 @@ -use crate::header_generated::ColumnType; +use crate::fb::ColumnType; use byteorder::{ByteOrder, LittleEndian}; use serde_json::Value; use std::collections::HashMap; diff --git a/src/rust/src/writer/feature_writer.rs b/src/rust/fcb_core/src/writer/feature_writer.rs similarity index 98% rename from src/rust/src/writer/feature_writer.rs rename to src/rust/fcb_core/src/writer/feature_writer.rs index f3e8d75..3cafc8c 100644 --- a/src/rust/src/writer/feature_writer.rs +++ b/src/rust/fcb_core/src/writer/feature_writer.rs @@ -4,7 +4,7 @@ use crate::serializer::*; use super::attribute::AttributeSchema; -use crate::packedrtree::NodeItem; +use packed_rtree::NodeItem; /// A writer that converts CityJSON features to FlatBuffers format /// diff --git a/src/rust/src/writer/geom_encoder.rs b/src/rust/fcb_core/src/writer/geom_encoder.rs similarity index 100% rename from src/rust/src/writer/geom_encoder.rs rename to src/rust/fcb_core/src/writer/geom_encoder.rs diff --git a/src/rust/src/writer/header_writer.rs b/src/rust/fcb_core/src/writer/header_writer.rs similarity index 97% rename from src/rust/src/writer/header_writer.rs rename to src/rust/fcb_core/src/writer/header_writer.rs index a2b641d..3a73d4d 100644 --- a/src/rust/src/writer/header_writer.rs +++ b/src/rust/fcb_core/src/writer/header_writer.rs @@ -1,6 +1,7 @@ -use crate::{serializer::to_fcb_header, PackedRTree}; +use crate::serializer::to_fcb_header; use cjseq::CityJSON; use flatbuffers::FlatBufferBuilder; +use packed_rtree::PackedRTree; use super::attribute::AttributeSchema; diff --git a/src/rust/src/writer/mod.rs b/src/rust/fcb_core/src/writer/mod.rs similarity index 98% rename from src/rust/src/writer/mod.rs rename to src/rust/fcb_core/src/writer/mod.rs index 6c8bae2..8db60d1 100644 --- a/src/rust/src/writer/mod.rs +++ b/src/rust/fcb_core/src/writer/mod.rs @@ -1,9 +1,11 @@ -use crate::{calc_extent, hilbert_sort, NodeItem, PackedRTree, MAGIC_BYTES}; +use crate::MAGIC_BYTES; use anyhow::Result; use attribute::AttributeSchema; use cjseq::{CityJSON, CityJSONFeature}; use feature_writer::FeatureWriter; use header_writer::{HeaderWriter, HeaderWriterOptions}; +use packed_rtree::{calc_extent, hilbert_sort, NodeItem, PackedRTree}; + use std::fs::File; use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write}; diff --git a/src/rust/src/writer/serializer.rs b/src/rust/fcb_core/src/writer/serializer.rs similarity index 99% rename from src/rust/src/writer/serializer.rs rename to src/rust/fcb_core/src/writer/serializer.rs index 061f0f5..83b13a8 100644 --- a/src/rust/src/writer/serializer.rs +++ b/src/rust/fcb_core/src/writer/serializer.rs @@ -1,20 +1,20 @@ use crate::attribute::{encode_attributes_with_schema, AttributeSchema, AttributeSchemaMethods}; -use crate::feature_generated::{ +use crate::fb::{ CityFeature, CityFeatureArgs, CityObject, CityObjectArgs, CityObjectType, Geometry, GeometryArgs, GeometryType, SemanticObject, SemanticObjectArgs, SemanticSurfaceType, Vertex, }; -use crate::geom_encoder::encode; -use crate::header_generated::{ +use crate::fb::{ GeographicalExtent, Header, HeaderArgs, ReferenceSystem, ReferenceSystemArgs, Transform, Vector, }; -use crate::{Column, ColumnArgs, NodeItem}; - +use crate::geom_encoder::encode; +use crate::{Column, ColumnArgs}; use cjseq::{ CityJSON, CityJSONFeature, CityObject as CjCityObject, Geometry as CjGeometry, GeometryType as CjGeometryType, PointOfContact as CjPointOfContact, ReferenceSystem as CjReferenceSystem, Transform as CjTransform, }; use flatbuffers::FlatBufferBuilder; +use packed_rtree::NodeItem; use serde_json::Value; use super::geom_encoder::{GMBoundaries, GMSemantics}; diff --git a/src/rust/tests/e2e.rs b/src/rust/fcb_core/tests/e2e.rs similarity index 99% rename from src/rust/tests/e2e.rs rename to src/rust/fcb_core/tests/e2e.rs index b746df7..9692d9a 100644 --- a/src/rust/tests/e2e.rs +++ b/src/rust/fcb_core/tests/e2e.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use flatcitybuf::{ +use fcb_core::{ attribute::{AttributeSchema, AttributeSchemaMethods}, deserializer, header_writer::HeaderWriterOptions, diff --git a/src/rust/tests/http.rs b/src/rust/fcb_core/tests/http.rs similarity index 95% rename from src/rust/tests/http.rs rename to src/rust/fcb_core/tests/http.rs index 1b234c3..82bb359 100644 --- a/src/rust/tests/http.rs +++ b/src/rust/fcb_core/tests/http.rs @@ -4,7 +4,7 @@ use std::{ io::{BufWriter, Write}, }; -use flatcitybuf::{deserializer::to_cj_metadata, HttpFcbReader}; +use fcb_core::{deserializer::to_cj_metadata, HttpFcbReader}; use anyhow::Result; async fn read_http_file(path: &str) -> Result<(), Box> { diff --git a/src/rust/tests/read.rs b/src/rust/fcb_core/tests/read.rs similarity index 98% rename from src/rust/tests/read.rs rename to src/rust/fcb_core/tests/read.rs index ad8246b..43c0dad 100644 --- a/src/rust/tests/read.rs +++ b/src/rust/fcb_core/tests/read.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use flatcitybuf::FcbReader; +use fcb_core::FcbReader; use std::{fs::File, io::BufReader, path::PathBuf}; #[test] diff --git a/src/rust/tests/serde.rs b/src/rust/fcb_core/tests/serde.rs similarity index 99% rename from src/rust/tests/serde.rs rename to src/rust/fcb_core/tests/serde.rs index 00af82c..a21db5a 100644 --- a/src/rust/tests/serde.rs +++ b/src/rust/fcb_core/tests/serde.rs @@ -1,12 +1,12 @@ use anyhow::Result; -use flatbuffers::FlatBufferBuilder; -use flatcitybuf::{ +use fcb_core::{ attribute::{AttributeSchema, AttributeSchemaMethods}, deserializer::decode_attributes, root_as_city_feature, root_as_header, serializer::{to_columns, to_fcb_attribute}, CityFeature, CityFeatureArgs, CityObject, CityObjectArgs, Header, HeaderArgs, }; +use flatbuffers::FlatBufferBuilder; use serde_json::json; #[test] diff --git a/src/rust/makefile b/src/rust/makefile index 451315e..c62fefe 100644 --- a/src/rust/makefile +++ b/src/rust/makefile @@ -11,7 +11,7 @@ pre-commit: .PHONY: ser ser: - cargo run --all-features --bin flatcitybuf_cli ser -i tests/data/delft.city.jsonl -o temp/delft_attr.fcb + cargo run --all-features --bin flatcitybuf_cli ser -i fcb_core/tests/data/delft.city.jsonl -o temp/delft_attr.fcb # cargo run --bin flatcitybuf_cli serialize -i tests/data/delft.city.jsonl -o temp/delft.fcb .PHONY: deser @@ -22,3 +22,8 @@ deser: .PHONY: bench bench: cargo bench --bench read + +.PHONY: wasm-build +wasm-build: + wasm-pack build --target web --debug +# cargo build --target web --release \ No newline at end of file diff --git a/src/rust/packed_rtree/Cargo.toml b/src/rust/packed_rtree/Cargo.toml new file mode 100644 index 0000000..9f42b8b --- /dev/null +++ b/src/rust/packed_rtree/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "packed_rtree" +version = "0.1.0" +edition = "2021" + +[features] +default = ["http"] +http = ["http-range-client"] + +[dependencies] +anyhow = { workspace = true } +byteorder = { workspace = true } +http-range-client = { workspace = true, optional = true, default-features = false } +tracing = { workspace = true } + +[dev-dependencies] diff --git a/src/rust/src/packedrtree.rs b/src/rust/packed_rtree/src/lib.rs similarity index 99% rename from src/rust/src/packedrtree.rs rename to src/rust/packed_rtree/src/lib.rs index 4d12a20..a152c83 100644 --- a/src/rust/src/packedrtree.rs +++ b/src/rust/packed_rtree/src/lib.rs @@ -16,9 +16,7 @@ use anyhow::Result; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use core::f64; #[cfg(feature = "http")] -use http_range_client::{ - AsyncBufferedHttpRangeClient, AsyncHttpRangeClient, BufferedHttpRangeClient, -}; +use http_range_client::{AsyncBufferedHttpRangeClient, AsyncHttpRangeClient}; use std::cmp::{max, min}; use std::collections::VecDeque; use std::io::{Cursor, Read, Seek, SeekFrom, Write}; @@ -380,9 +378,9 @@ impl PackedRTree { } #[cfg(feature = "http")] - async fn read_http( + async fn read_http( &mut self, - client: &mut BufferedHttpRangeClient, + client: &mut AsyncBufferedHttpRangeClient, index_begin: usize, ) -> Result<()> { let min_req_size = self.size(); // read full index at once @@ -440,8 +438,8 @@ impl PackedRTree { } #[cfg(feature = "http")] - pub async fn from_http( - client: &mut BufferedHttpRangeClient, + pub async fn from_http( + client: &mut AsyncBufferedHttpRangeClient, index_begin: usize, num_items: usize, node_size: u16, @@ -730,7 +728,7 @@ impl PackedRTree { } #[cfg(feature = "http")] -pub(crate) mod http { +pub mod http { use std::ops::{Range, RangeFrom}; /// Byte range within a file. Suitable for an HTTP Range request. diff --git a/src/rust/src/const_vars.rs b/src/rust/src/const_vars.rs deleted file mode 100644 index efa8b7e..0000000 --- a/src/rust/src/const_vars.rs +++ /dev/null @@ -1,17 +0,0 @@ -// Current version of FlatCityBuf -pub(crate) const VERSION: u8 = 1; - -// Magic bytes for FlatCityBuf -pub(crate) const MAGIC_BYTES: [u8; 8] = [b'f', b'c', b'b', VERSION, b'f', b'c', b'b', 0]; - -// Maximum buffer size for header -pub(crate) const HEADER_MAX_BUFFER_SIZE: usize = 1024 * 1024 * 512; // 512MB - -// Size of magic bytes -pub(crate) const MAGIC_BYTES_SIZE: usize = 8; - -// Size of header size -pub(crate) const HEADER_SIZE_SIZE: usize = 4; - -// // Offset of header size -// pub(crate) const HEADER_SIZE_OFFSET: usize = MAGIC_BYTES_SIZE + HEADER_SIZE_SIZE; diff --git a/src/rust/src/http_reader/mock_http_range_client.rs b/src/rust/src/http_reader/mock_http_range_client.rs deleted file mode 100644 index 7b5bb24..0000000 --- a/src/rust/src/http_reader/mock_http_range_client.rs +++ /dev/null @@ -1,107 +0,0 @@ -use crate::HttpFcbReader; -use anyhow::Result; -use bytes::Bytes; -use http_range_client::AsyncHttpRangeClient; -use std::fs::File; -use std::io::{BufReader, Read, Seek, SeekFrom}; -use std::ops::Range; -use std::path::PathBuf; -use std::sync::{Arc, RwLock}; -use tracing::trace; - -impl HttpFcbReader { - /// NOTE: For debugging expediency, this test class often prefers panics over returning a result. - pub async fn mock_from_file( - path: &str, - ) -> Result<( - HttpFcbReader, - Arc>, - )> { - trace!("starting: opening http reader, reading header"); - - let stats = Arc::new(RwLock::new(RequestStats::new())); - let http_client = MockHttpRangeClient::new(path, stats.clone()); - let client = http_range_client::AsyncBufferedHttpRangeClient::with(http_client, path); - Ok((Self::_open(client).await?, stats)) - } -} - -/// NOTE: For debugging expediency, this test class often prefers panics over returning a result. -pub(crate) struct MockHttpRangeClient { - path: PathBuf, - stats: Arc>, -} - -pub(crate) struct RequestStats { - pub request_count: u64, - pub bytes_requested: u64, -} - -impl RequestStats { - fn new() -> Self { - Self { - request_count: 0, - bytes_requested: 0, - } - } -} - -#[async_trait::async_trait] -impl AsyncHttpRangeClient for MockHttpRangeClient { - async fn get_range(&self, url: &str, range: &str) -> http_range_client::Result { - assert_eq!(url, self.path.to_str().unwrap()); - - /// This is a hack, but we need the start and length of the range - /// since all we're given is the pre-formatted range string, we - /// need to parse it into its components - /// - /// For expediency, this test code panics rather than returns a result. - fn parse_range_header(range: &str) -> Range { - let bytes = range.strip_prefix("bytes=").unwrap(); - let parts: Vec<&str> = bytes.split('-').collect(); - assert!(parts.len() == 2); - let start = parts[0].parse().expect("should have valid start range"); - let end: u64 = parts[1].parse().expect("should have valid end range"); - // Range headers are *inclusive* - start..(end + 1) - } - - let range = parse_range_header(range); - let request_length = range.end - range.start; - - let mut stats = self - .stats - .write() - .expect("test code does not handle actual concurrency"); - - stats.request_count += 1; - stats.bytes_requested += request_length; - - let mut file_reader = BufReader::new(File::open(&self.path).unwrap()); - file_reader - .seek(SeekFrom::Start(range.start)) - .expect("unable to seek test reader"); - let mut output = vec![0; request_length as usize]; - file_reader - .read_exact(&mut output) - .expect("failed to read from test reader"); - Ok(Bytes::from(output)) - } - - async fn head_response_header( - &self, - _url: &str, - _header: &str, - ) -> http_range_client::Result> { - unimplemented!() - } -} - -impl MockHttpRangeClient { - fn new(path: &str, stats: Arc>) -> Self { - Self { - path: path.into(), - stats, - } - } -} diff --git a/src/rust/src/lib.rs b/src/rust/src/lib.rs index 57fb574..e69de29 100644 --- a/src/rust/src/lib.rs +++ b/src/rust/src/lib.rs @@ -1,27 +0,0 @@ -#![allow(clippy::manual_range_contains)] - -mod cj_utils; -mod const_vars; -mod error; -#[allow(dead_code, unused_imports, clippy::all, warnings)] -mod feature_generated; -#[allow(dead_code, unused_imports, clippy::all, warnings)] -mod header_generated; -mod http_reader; -mod packedrtree; -mod reader; -mod writer; - -pub use cj_utils::*; -pub(crate) use const_vars::*; -pub use feature_generated::*; -pub use header_generated::*; -#[cfg(feature = "http")] -pub use http_reader::*; -pub use packedrtree::*; -pub use reader::*; -pub use writer::*; - -fn check_magic_bytes(bytes: &[u8]) -> bool { - bytes[0..3] == MAGIC_BYTES[0..3] && bytes[4..7] == MAGIC_BYTES[4..7] && bytes[3] <= VERSION -} diff --git a/src/rust/wasm/Cargo.toml b/src/rust/wasm/Cargo.toml new file mode 100644 index 0000000..d5bc1b7 --- /dev/null +++ b/src/rust/wasm/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "wasm" +version = "0.1.0" +edition = "2021" + + +[lib] +crate-type = ["cdylib"] + +[dependencies] +packed_rtree = { path = "../packed_rtree" } +bytes = { workspace = true } +fcb_core = { path = "../fcb_core", features = ["wasm"] } +getrandom = { workspace = true, features = ["js"] } +gloo-net = { workspace = true } +http-range-client = { workspace = true, default-features = false } +js-sys = { workspace = true } +wasm-bindgen = { workspace = true } +wasm-bindgen-futures = { workspace = true } +byteorder = { workspace = true } +cjseq = { workspace = true } +tempfile = { workspace = true } +serde_json = { workspace = true } +serde = { workspace = true, features = ["derive"] } +anyhow = { workspace = true } +fallible-streaming-iterator = { workspace = true } +clap = { workspace = true } +tracing = { workspace = true } +serde-wasm-bindgen = { workspace = true } +async-trait = { workspace = true, default-features = false } +console_error_panic_hook = { workspace = true } +console_log = { workspace = true } +log = { workspace = true } diff --git a/src/rust/wasm/src/gloo_client.rs b/src/rust/wasm/src/gloo_client.rs new file mode 100644 index 0000000..a37b406 --- /dev/null +++ b/src/rust/wasm/src/gloo_client.rs @@ -0,0 +1,51 @@ +#![cfg(target_arch = "wasm32")] + +use async_trait::async_trait; +use bytes::Bytes; +use gloo_net::http::RequestBuilder as GlooRequest; +use http_range_client::{AsyncBufferedHttpRangeClient, AsyncHttpRangeClient, HttpError, Result}; + +pub struct WasmHttpClient {} + +impl WasmHttpClient { + pub fn new(url: &str) -> AsyncBufferedHttpRangeClient { + AsyncBufferedHttpRangeClient::with(WasmHttpClient {}, url) + } +} + +#[cfg(target_arch = "wasm32")] +#[async_trait(?Send)] +impl AsyncHttpRangeClient for WasmHttpClient { + async fn get_range(&self, url: &str, range: &str) -> Result { + let response = GlooRequest::new(url) + .header("Range", range) + .send() + .await + .map_err(|e| HttpError::HttpError(e.to_string()))?; + + if !response.ok() { + return Err(HttpError::HttpStatus(response.status())); + } + response + .binary() + .await + .map(Bytes::from) + .map_err(|e| HttpError::HttpError(e.to_string())) + } + + async fn head_response_header(&self, url: &str, header: &str) -> Result> { + let response = GlooRequest::new(url) + .send() + .await + .map_err(|e| HttpError::HttpError(format!("failed to send request: {}", e)))?; + if let Some(val) = response.headers().get(header) { + // let v = val + // .to_str() + // .map_err(|e| HttpError::HttpError(e.to_string()))?; + // Ok(Some(v.to_string())) + Ok(Some(val.to_string())) + } else { + Ok(None) + } + } +} diff --git a/src/rust/wasm/src/lib.rs b/src/rust/wasm/src/lib.rs new file mode 100644 index 0000000..febe9a3 --- /dev/null +++ b/src/rust/wasm/src/lib.rs @@ -0,0 +1,571 @@ +use console_error_panic_hook::set_once; +use console_log::init_with_level; +use fcb_core::deserializer::{to_cj_feature, to_cj_metadata}; +use fcb_core::{feature_generated, header_generated, size_prefixed_root_as_header, Header}; +use gloo_client::WasmHttpClient; +use gloo_net::http::{Request as GlooRequest, Response}; +use js_sys::Uint8Array; +use log::{error, info}; +use serde_wasm_bindgen::to_value; +use wasm_bindgen::prelude::*; + +use byteorder::{ByteOrder, LittleEndian}; +use bytes::{BufMut, Bytes, BytesMut}; +use cjseq::CityJSONFeature; +use fcb_core::city_buffer::FcbBuffer; +use fcb_core::{ + check_magic_bytes, size_prefixed_root_as_city_feature, HEADER_MAX_BUFFER_SIZE, + HEADER_SIZE_SIZE, MAGIC_BYTES_SIZE, +}; + +use std::fmt::Error; +use std::result::Result; + +use http_range_client::{ + AsyncBufferedHttpRangeClient, AsyncHttpRangeClient, BufferedHttpRangeClient, +}; + +use packed_rtree::{http::HttpRange, http::HttpSearchResultItem, NodeItem, PackedRTree}; +use std::collections::VecDeque; +use std::ops::Range; +use tracing::debug; +use tracing::trace; +mod gloo_client; + +// The largest request we'll speculatively make. +// If a single huge feature requires, we'll necessarily exceed this limit. +const DEFAULT_HTTP_FETCH_SIZE: usize = 1_048_576; // 1MB + +/// FlatCityBuf dataset HTTP reader +#[wasm_bindgen] +pub struct HttpFcbReader { + client: AsyncBufferedHttpRangeClient, + // feature reading requires header access, therefore + // header_buf is included in the FcbBuffer struct. + fbs: FcbBuffer, +} + +#[wasm_bindgen] +pub struct AsyncFeatureIter { + client: AsyncBufferedHttpRangeClient, + // feature reading requires header access, therefore + // header_buf is included in the FcbBuffer struct. + fbs: FcbBuffer, + /// Which features to iterate + selection: FeatureSelection, + /// Number of selected features + count: usize, +} + +// impl WasmFcbReader { +// pub async fn new(url: String) -> Result { +// let client = WasmHttpClient::new(url).await?; +// Self::_open(client).await +// } +// } + +#[wasm_bindgen] +impl HttpFcbReader { + #[wasm_bindgen(constructor, start)] + pub async fn new(url: String) -> Result { + println!("open===: {:?}", url); + console_error_panic_hook::set_once(); + init_with_level(log::Level::Debug).expect("Could not initialize logger"); + + trace!("starting: opening http reader, reading header"); + let client = WasmHttpClient::new(&url); + Self::_open(client).await + } + + async fn _open( + mut client: AsyncBufferedHttpRangeClient, + ) -> Result { + // Because we use a buffered HTTP reader, anything extra we fetch here can + // be utilized to skip subsequent fetches. + // Immediately following the header is the optional spatial index, we deliberately fetch + // a small part of that to skip subsequent requests + let prefetch_index_bytes: usize = { + // The actual branching factor will be in the header, but since we don't have the header + // yet we guess. The consequence of getting this wrong isn't catastrophic, it just means + // we may be fetching slightly more than we need or that we make an extra request later. + let assumed_branching_factor = PackedRTree::DEFAULT_NODE_SIZE as usize; + + // NOTE: each layer is exponentially larger + let prefetched_layers: u32 = 3; + + (0..prefetched_layers) + .map(|i| assumed_branching_factor.pow(i) * std::mem::size_of::()) + .sum() + }; + + // In reality, the header is probably less than half this size, but better to overshoot and + // fetch an extra kb rather than have to issue a second request. + let assumed_header_size = 2024; + let min_req_size = assumed_header_size + prefetch_index_bytes; + client.set_min_req_size(min_req_size); + debug!("fetching header. min_req_size: {min_req_size} (assumed_header_size: {assumed_header_size}, prefetched_index_bytes: {prefetch_index_bytes})"); + info!("fetching header. min_req_size: {min_req_size} (assumed_header_size: {assumed_header_size}, prefetched_index_bytes: {prefetch_index_bytes})"); + let mut read_bytes = 0; + let bytes = client + .get_range(read_bytes, MAGIC_BYTES_SIZE) + .await + .map_err(|e| JsValue::from_str(&e.to_string()))?; // to get magic bytes + if !check_magic_bytes(bytes) { + return Err(JsValue::from_str("MissingMagicBytes")); + } + debug!("checked magic bytes"); + + read_bytes += MAGIC_BYTES_SIZE; + let mut bytes = BytesMut::from( + client + .get_range(read_bytes, HEADER_SIZE_SIZE) + .await + .map_err(|e| JsValue::from_str(&e.to_string()))?, + ); + read_bytes += HEADER_SIZE_SIZE; + + let header_size = LittleEndian::read_u32(&bytes) as usize; + if header_size > HEADER_MAX_BUFFER_SIZE || header_size < 8 { + // minimum size check avoids panic in FlatBuffers header decoding + return Err(JsValue::from_str(&format!( + "IllegalHeaderSize: {header_size}" + ))); + } + info!("header_size: {header_size}"); + + bytes.put( + client + .get_range(read_bytes, header_size) + .await + .map_err(|e| JsValue::from_str(&e.to_string()))?, + ); + read_bytes += header_size; + + let header_buf = bytes.to_vec(); + + // verify flatbuffer + let header = size_prefixed_root_as_header(&header_buf) + .map_err(|e| JsValue::from_str(&e.to_string()))?; + info!("header:---------"); + info!("header: {:?}", to_cj_metadata(&header)); + trace!("completed: opening http reader"); + Ok(HttpFcbReader { + client, + fbs: FcbBuffer { + header_buf, + features_buf: Vec::new(), + }, + }) + } + + #[wasm_bindgen] + pub fn header(&self) -> Result { + let header = self.fbs.header(); + info!("header in the function: {:?}", to_cj_metadata(&header)); + let cj = to_cj_metadata(&header).map_err(|e| JsValue::from_str(&e.to_string()))?; + let jsval = to_value(&cj).map_err(|e| JsValue::from_str(&e.to_string()))?; + info!("jsval: {:?}", jsval); + Ok(jsval) + } + + fn header_len(&self) -> usize { + MAGIC_BYTES_SIZE + self.fbs.header_buf.len() + } + /// Select all features. + #[wasm_bindgen] + pub async fn select_all(self) -> Result { + let header = self.fbs.header(); + let count = header.features_count(); + // TODO: support reading with unknown feature count + let index_size = if header.index_node_size() > 0 { + PackedRTree::index_size(count as usize, header.index_node_size()) + } else { + 0 + }; + // Skip index + let feature_base = self.header_len() + index_size; + Ok(AsyncFeatureIter { + client: self.client, + fbs: self.fbs, + selection: FeatureSelection::SelectAll(SelectAll { + features_left: count, + pos: feature_base, + }), + count: count as usize, + }) + } + /// Select features within a bounding box. + #[wasm_bindgen] + pub async fn select_bbox( + mut self, + min_x: f64, + min_y: f64, + max_x: f64, + max_y: f64, + ) -> Result { + trace!("starting: select_bbox, traversing index"); + // Read R-Tree index and build filter for features within bbox + let header = self.fbs.header(); + if header.index_node_size() == 0 || header.features_count() == 0 { + return Err(JsValue::from_str("NoIndex")); + } + let count = header.features_count() as usize; + let header_len = self.header_len(); + + // request up to this many extra bytes if it means we can eliminate an extra request + let combine_request_threshold = 256 * 1024; + + let list = PackedRTree::http_stream_search( + &mut self.client, + header_len, + count, + PackedRTree::DEFAULT_NODE_SIZE, + min_x, + min_y, + max_x, + max_y, + combine_request_threshold, + ) + .await + .map_err(|e| JsValue::from_str(&e.to_string()))?; + debug_assert!( + list.windows(2) + .all(|w| w[0].range.start() < w[1].range.start()), + "Since the tree is traversed breadth first, list should be sorted by construction." + ); + + let count = list.len(); + let feature_batches = FeatureBatch::make_batches(list, combine_request_threshold) + .await + .map_err(|e| JsValue::from_str(&e.to_string()))?; + let selection = FeatureSelection::SelectBbox(SelectBbox { feature_batches }); + trace!("completed: select_bbox"); + Ok(AsyncFeatureIter { + client: self.client, + fbs: self.fbs, + selection, + count, + }) + } +} + +#[wasm_bindgen] +impl AsyncFeatureIter { + fn _header(&self) -> Header { + self.fbs.header() + } + #[wasm_bindgen] + pub fn header(&self) -> Result { + let header = self.fbs.header(); + let cj = to_cj_metadata(&header).map_err(|e| JsValue::from_str(&e.to_string()))?; + to_value(&cj).map_err(|e| JsValue::from_str(&e.to_string())) + } + /// Number of selected features (might be unknown) + #[wasm_bindgen] + pub fn features_count(&self) -> Option { + if self.count > 0 { + Some(self.count) + } else { + None + } + } + /// Read next feature + #[wasm_bindgen] + pub async fn next(&mut self) -> Result, JsValue> { + let Some(buffer) = self + .selection + .next_feature_buffer(&mut self.client) + .await + .map_err(|e| JsValue::from_str(&e.to_string()))? + else { + return Ok(None); + }; + + // Not zero-copy + self.fbs.features_buf = buffer.to_vec(); + // verify flatbuffer + let feature = size_prefixed_root_as_city_feature(&self.fbs.features_buf) + .map_err(|e| JsValue::from_str(&e.to_string()))?; + let cj_feature = to_cj_feature(feature, self._header().columns()) + .map_err(|e| JsValue::from_str(&e.to_string()))?; + Ok(Some(to_value(&cj_feature)?)) + } + /// Return current feature + pub fn cur_feature(&self) -> Result { + self.cur_feature() + } + + pub fn cur_cj_feature(&self) -> Result { + let cj_feature = to_cj_feature(self.fbs.feature(), self._header().columns()) + .map_err(|e| JsValue::from_str(&e.to_string()))?; + Ok(to_value(&cj_feature)?) + } +} + +enum FeatureSelection { + SelectAll(SelectAll), + SelectBbox(SelectBbox), +} + +impl FeatureSelection { + async fn next_feature_buffer( + &mut self, + client: &mut AsyncBufferedHttpRangeClient, + ) -> Result, Error> { + match self { + FeatureSelection::SelectAll(select_all) => select_all.next_buffer(client).await, + FeatureSelection::SelectBbox(select_bbox) => select_bbox.next_buffer(client).await, + } + } +} + +struct SelectAll { + /// Features left + features_left: u64, + + /// How many bytes into the file we've read so far + pos: usize, +} + +impl SelectAll { + async fn next_buffer( + &mut self, + client: &mut AsyncBufferedHttpRangeClient, + ) -> Result, Error> { + client.min_req_size(DEFAULT_HTTP_FETCH_SIZE); + + if self.features_left == 0 { + return Ok(None); + } + self.features_left -= 1; + + let mut feature_buffer = + BytesMut::from(client.get_range(self.pos, 4).await.map_err(|_| Error)?); + self.pos += 4; + let feature_size = LittleEndian::read_u32(&feature_buffer) as usize; + feature_buffer.put( + client + .get_range(self.pos, feature_size) + .await + .map_err(|_| Error)?, + ); + self.pos += feature_size; + + Ok(Some(feature_buffer.freeze())) + } +} + +struct SelectBbox { + /// Selected features + feature_batches: Vec, +} + +impl SelectBbox { + async fn next_buffer( + &mut self, + client: &mut AsyncBufferedHttpRangeClient, + ) -> Result, Error> { + let mut next_buffer = None; + while next_buffer.is_none() { + let Some(feature_batch) = self.feature_batches.last_mut() else { + break; + }; + let Some(buffer) = feature_batch.next_buffer(client).await? else { + // done with this batch + self.feature_batches + .pop() + .expect("already asserted feature_batches was non-empty"); + continue; + }; + next_buffer = Some(buffer) + } + + Ok(next_buffer) + } +} + +struct FeatureBatch { + /// The byte location of each feature within the file + feature_ranges: VecDeque, +} + +impl FeatureBatch { + async fn make_batches( + feature_ranges: Vec, + combine_request_threshold: usize, + ) -> Result, Error> { + let mut batched_ranges = vec![]; + + for search_result_item in feature_ranges.into_iter() { + let Some(latest_batch) = batched_ranges.last_mut() else { + let mut new_batch = VecDeque::new(); + new_batch.push_back(search_result_item.range); + batched_ranges.push(new_batch); + continue; + }; + + let previous_item = latest_batch.back().expect("we never push an empty batch"); + + let HttpRange::Range(Range { end: prev_end, .. }) = previous_item else { + debug_assert!(false, "This shouldn't happen. Only the very last feature is expected to have an unknown length"); + let mut new_batch = VecDeque::new(); + new_batch.push_back(search_result_item.range); + batched_ranges.push(new_batch); + continue; + }; + + let wasted_bytes = search_result_item.range.start() - prev_end; + if wasted_bytes < combine_request_threshold { + if wasted_bytes == 0 { + trace!("adjacent feature"); + } else { + trace!("wasting {wasted_bytes} to avoid an extra request"); + } + latest_batch.push_back(search_result_item.range) + } else { + trace!("creating a new request for batch rather than wasting {wasted_bytes} bytes"); + let mut new_batch = VecDeque::new(); + new_batch.push_back(search_result_item.range); + batched_ranges.push(new_batch); + } + } + + let mut batches: Vec<_> = batched_ranges.into_iter().map(FeatureBatch::new).collect(); + batches.reverse(); + Ok(batches) + } + + fn new(feature_ranges: VecDeque) -> Self { + Self { feature_ranges } + } + + /// When fetching new data, how many bytes should we fetch at once. + /// It was computed based on the specific feature ranges of the batch + /// to optimize number of requests vs. wasted bytes vs. resident memory + fn request_size(&self) -> usize { + let Some(first) = self.feature_ranges.front() else { + return 0; + }; + let Some(last) = self.feature_ranges.back() else { + return 0; + }; + + // `last.length()` should only be None if this batch includes the final feature + // in the dataset. Since we can't infer its actual length, we'll fetch only + // the first 4 bytes of that feature buffer, which will tell us the actual length + // of the feature buffer for the subsequent request. + let last_feature_length = last.length().unwrap_or(4); + + let covering_range = first.start()..last.start() + last_feature_length; + + covering_range + .len() + // Since it's all held in memory, don't fetch more than DEFAULT_HTTP_FETCH_SIZE at a time + // unless necessary. + .min(DEFAULT_HTTP_FETCH_SIZE) + } + + async fn next_buffer( + &mut self, + client: &mut AsyncBufferedHttpRangeClient, + ) -> Result, Error> { + let request_size = self.request_size(); + client.set_min_req_size(request_size); + let Some(feature_range) = self.feature_ranges.pop_front() else { + return Ok(None); + }; + + let mut pos = feature_range.start(); + let mut feature_buffer = BytesMut::from(client.get_range(pos, 4).await.map_err(|_| Error)?); + pos += 4; + let feature_size = LittleEndian::read_u32(&feature_buffer) as usize; + feature_buffer.put( + client + .get_range(pos, feature_size) + .await + .map_err(|_| Error)?, + ); + + Ok(Some(feature_buffer.freeze())) + } +} + +// #[cfg(test)] +// mod tests { +// use crate::HttpFcbReader; + +// #[tokio::test] +// async fn fgb_max_request_size() { +// let (fgb, stats) = HttpFcbReader::mock_from_file("../../test/data/UScounties.fgb") +// .await +// .unwrap(); + +// { +// // The read guard needs to be in a scoped block, else we won't release the lock and the test will hang when +// // the actual FGB client code tries to update the stats. +// let stats = stats.read().unwrap(); +// assert_eq!(stats.request_count, 1); +// // This number might change a little if the test data or logic changes, but they should be in the same ballpark. +// assert_eq!(stats.bytes_requested, 12944); +// } + +// // This bbox covers a large swathe of the dataset. The idea is that at least one request should be limited by the +// // max request size `DEFAULT_HTTP_FETCH_SIZE`, but that we should still have a reasonable number of requests. +// let mut iter = fgb.select_bbox(-118.0, 42.0, -100.0, 47.0).await.unwrap(); + +// let mut feature_count = 0; +// while let Some(_feature) = iter.next().await.unwrap() { +// feature_count += 1; +// } +// assert_eq!(feature_count, 169); + +// { +// // The read guard needs to be in a scoped block, else we won't release the lock and the test will hang when +// // the actual FGB client code tries to update the stats. +// let stats = stats.read().unwrap(); +// // These numbers might change a little if the test data or logic changes, but they should be in the same ballpark. +// assert_eq!(stats.request_count, 5); +// assert_eq!(stats.bytes_requested, 2131152); +// } +// } +// } + +// #[wasm_bindgen] +// pub async fn fetch_partial(url: &str, start: u64, end: u64) -> Result { +// // Construct the "Range" header, e.g. "bytes=0-1023" +// let range_header_value = format!("bytes={}-{}", start, end); + +// // Perform a GET request using the `fetch` API under the hood +// let response = Request::get(url) +// .header("Range", &range_header_value) +// .send() +// .await +// .map_err(|err| JsValue::from_str(&err.to_string()))?; + +// // If the server supports partial requests, often you'll get status 206 +// // However, some servers might return 200 if they don't handle partial fetches +// if !response.ok() { +// // We'll forward the status as an error +// return Err(JsValue::from_str(&format!( +// "HTTP status: {}", +// response.status() +// ))); +// } + +// // Retrieve the bytes from the response +// let bytes = response +// .binary() +// .await +// .map_err(|err| JsValue::from_str(&err.to_string()))?; + +// // Convert them to a JavaScript `Uint8Array` so JS code can read them +// let array = Uint8Array::from(bytes.as_slice()); + +// // Return the Uint8Array as a `JsValue` +// Ok(array.into()) +// } + +// #[wasm_bindgen] +// pub fn hello() { +// println!("Hello, world!"); +// } diff --git a/src/rust/wasm/src/range_client.rs b/src/rust/wasm/src/range_client.rs new file mode 100644 index 0000000..9250a4c --- /dev/null +++ b/src/rust/wasm/src/range_client.rs @@ -0,0 +1,39 @@ +// use crate::HttpFcbReader; +// use anyhow::Result; +// use bytes::Bytes; +// use gloo_net::http::Request; +// use http_range_client::AsyncHttpRangeClient; +// use std::fs::File; +// use std::io::{BufReader, Read, Seek, SeekFrom}; +// use std::ops::Range; +// use std::path::PathBuf; +// use std::sync::{Arc, RwLock}; + +// use wasm_bindgen::prelude::*; + +// #[wasm_bindgen] +// pub struct WasmHttpClient { +// inner: GlooRequest, +// } + +// // impl HttpFcbReader { +// // pub async fn open(url: &str) -> Result<> { +// // trace!("starting: opening http reader, reading header"); + +// // // let stats = Arc::new(RwLock::new(RequestStats::new())); +// // // let http_client = MockHttpRangeClient::new(path, stats.clone()); +// // // let client = http_range_client::AsyncBufferedHttpRangeClient::with(http_client, path); +// // // Ok((Self::_open(client).await?, stats)) +// // todo!("implement me") +// // } +// // } + +// impl AsyncHttpRangeClient for WasmHttpClient { +// async fn get_range(&self, url: &str, range: &str) -> Result { +// todo!("implement me") +// } + +// async fn head_response_header(&self, url: &str, header: &str) -> Result> { +// todo!("implement me") +// } +// }