diff --git a/.vscode/settings.json b/.vscode/settings.json
index 433716a..0aa2dee 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -8,5 +8,8 @@
   "rust-analyzer.checkOnSave.extraArgs": [
     "--lib"
   ],
-  "python.pythonPath": "${workspaceFolder}/src/py/.venv/bin/python"
+  "python.pythonPath": "${workspaceFolder}/src/py/.venv/bin/python",
+  "rust-analyzer.server.extraEnv": {
+    "RUST_LOG": "debug"
+  }
 }
\ No newline at end of file
diff --git a/LICENSE b/LICENSE
index af96476..2fe5d6a 100644
--- a/LICENSE
+++ b/LICENSE
@@ -22,9 +22,13 @@ SOFTWARE.
 
 ---
 
-Portions of this software are derived from FlatGeobuf, which is licensed under the BSD 2-Clause License.
+Portions of this software are derived from the FlatGeobuf project,
+specifically the `packed_r_tree.rs` file, which is licensed under the
+BSD 2-Clause License. The original code can be found at:
+https://github.com/flatgeobuf/flatgeobuf/blob/master/src/rust/src/packed_r_tree.rs
 
 Copyright (c) 2018, Björn Harrtell, Postnummer Stockholm AB
+Copyright (c) 2024, Pirmin Kalberer (Rust implementation)
 All rights reserved.
 
 Redistribution and use in source and binary forms, with or without
diff --git a/src/rust/Cargo.toml b/src/rust/Cargo.toml
index 413c89f..c9617c9 100644
--- a/src/rust/Cargo.toml
+++ b/src/rust/Cargo.toml
@@ -8,6 +8,9 @@ absolute_path = "warn"
 nursery = { level = "warn", priority = -1 }
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+[features]
+default = []
+http = ["http-range-client", "bytes", "reqwest"]
 
 [dependencies]
 flatbuffers = "24.3.25"
@@ -18,6 +21,13 @@ serde_json = "1.0.133"
 anyhow = "1.0.95"
 fallible-streaming-iterator = "0.1.9"
 clap = "4.5.23"
+http-range-client = { version = "0.9.0", optional = true, default-features = false, features = [
+  "reqwest-async",
+] }
+reqwest = { version = "0.12.12", optional = true }
+bytes = { version = "1.9.0", optional = true }
+tracing = "0.1.41"
+tokio = "1.43.0"
 
 [lib]
 name = "flatcitybuf"
@@ -33,14 +43,12 @@ name = "flatcitybuf_cli"
 path = "src/main.rs"
 
 [dev-dependencies]
-criterion = { version = "0.5.1", features = ["html_reports"] }
+async-trait = "0.1.85"
+criterion = { version = "0.5.1", features = ["async_tokio", "html_reports"] }
 memory-stats = "1.2.0"
 pretty_assertions = "1.4.1"
+tokio = { version = "1.43.0", features = ["macros", "rt-multi-thread"] }
 
 [[bench]]
 name = "read"
 harness = false
-
-# [features]
-# default = ["http"]
-# http = ["http-range-client", "bytes", "reqwest"]
diff --git a/src/rust/makefile b/src/rust/makefile
index 3e6f6db..451315e 100644
--- a/src/rust/makefile
+++ b/src/rust/makefile
@@ -3,20 +3,20 @@
 .PHONY: pre-commit
 pre-commit:
 	cargo fmt
-	cargo clippy --fix --allow-dirty
-	cargo machete
-	cargo nextest run
+	cargo clippy --fix --allow-dirty --all-features
+	cargo machete --all-features
+	cargo nextest run --all-features
 	cargo check --all-features
-	cargo build --release
+	cargo build --release --all-features
 
 .PHONY: ser
 ser:
-	cargo run --bin flatcitybuf_cli ser -i tests/data/delft.city.jsonl -o temp/delft_attr.fcb
+	cargo run --all-features --bin flatcitybuf_cli ser -i tests/data/delft.city.jsonl -o temp/delft_attr.fcb
 	# cargo run --bin flatcitybuf_cli serialize -i tests/data/delft.city.jsonl -o temp/delft.fcb
 
 .PHONY: deser
 deser:
-	cargo run --bin flatcitybuf_cli deser -i temp/delft_attr.fcb -o temp/delft_attr.city.jsonl
+	cargo run --all-features --bin flatcitybuf_cli deser -i temp/delft_attr.fcb -o temp/delft_attr.city.jsonl
 	# cargo run --bin flatcitybuf_cli deserialize -i temp/small.fcb -o temp/small.city.jsonl
 
 .PHONY: bench
diff --git a/src/rust/src/bin/read.rs b/src/rust/src/bin/read.rs
index 0cf13e0..9fb3eec 100644
--- a/src/rust/src/bin/read.rs
+++ b/src/rust/src/bin/read.rs
@@ -42,112 +42,6 @@ fn read_file() -> Result<(), Box<dyn Error>> {
             .write_all(format!("{}\n", serde_json::to_string(feature).unwrap()).as_bytes())?;
     }
 
-    // let original_cjseq_file = manifest_dir
-    //     .join("tests")
-    //     .join("data")
-    //     .join("small.city.jsonl");
-    // let cjseq_file = File::open(original_cjseq_file)?;
-    // let cjseq_reader = BufReader::new(cjseq_file);
-    // let cj_seq = read_cityjson_from_reader(cjseq_reader, CJTypeKind::Seq)?;
-    // if let CJType::Seq(cj_seq) = cj_seq {
-    //     let CityJSONSeq {
-    //         cj: original_cj,
-    //         features: original_features,
-    //     } = cj_seq;
-
-    //     // Compare features
-    //     if original_features != features {
-    //         println!("features differ:");
-    //         if original_features.len() != features.len() {
-    //             println!(
-    //                 "length mismatch: {} != {}",
-    //                 original_features.len(),
-    //                 features.len()
-    //             );
-    //         } else {
-    //             for (i, (orig, new)) in original_features.iter().zip(features.iter()).enumerate() {
-    //                 if orig != new {
-    //                     println!("feature {} differs:", i);
-    //                     if orig.thetype != new.thetype {
-    //                         println!("  type mismatch: {} != {}", orig.thetype, new.thetype);
-    //                     }
-    //                     if orig.city_objects != new.city_objects {
-    //                         println!("  city_objects mismatch at index {}", i);
-    //                         //compare the first element of the city_objects
-    //                         let orig_first = orig.city_objects.iter().next().unwrap();
-    //                         let new_first = new.city_objects.get(orig_first.0).unwrap();
-
-    //                         println!("  key mismatch: {}", orig_first.0);
-    //                         if orig_first.1.geometry != new_first.geometry {
-    //                             println!(
-    //                                 "  geometry mismatch: {:?} != {:?}",
-    //                                 orig_first.1.geometry, new_first.geometry
-    //                             );
-    //                             //geometry is a vector of geometry, iterate over the elements and compare
-    //                             for (i, (orig_geom, new_geom)) in orig_first
-    //                                 .1
-    //                                 .geometry
-    //                                 .iter()
-    //                                 .zip(new_first.geometry.iter())
-    //                                 .enumerate()
-    //                             {
-    //                                 if orig_geom != new_geom {
-    //                                     println!(
-    //                                         "  geometry element {} mismatch: {:?} != {:?}",
-    //                                         i, orig_geom, new_geom
-    //                                     );
-    //                                 }
-    //                                 // compare each element of the geometry
-    //                                 for (j, (orig_elem, new_elem)) in
-    //                                     orig_geom.iter().zip(new_geom.iter()).enumerate()
-    //                                 {
-    //                                     if orig_elem != new_elem {
-    //                                         println!(
-    //                                             "  geometry element {} mismatch: {:?} != {:?}",
-    //                                             i, orig_elem, new_elem
-    //                                         );
-    //                                     }
-    //                                     if orig_elem.boundaries != new_elem.boundaries {
-    //                                         println!(
-    //                                             "  boundaries mismatch: {:?} != {:?}",
-    //                                             orig_elem.boundaries, new_elem.boundaries
-    //                                         );
-    //                                     }
-    //                                     if orig_elem.thetype != new_elem.thetype {
-    //                                         println!(
-    //                                             "  type mismatch: {:?} != {:?}",
-    //                                             orig_elem.thetype, new_elem.thetype
-    //                                         );
-    //                                     }
-    //                                     if orig_elem.lod != new_elem.lod {
-    //                                         println!(
-    //                                             "  lod mismatch: {:?} != {:?}",
-    //                                             orig_elem.lod, new_elem.lod
-    //                                         );
-    //                                     }
-    //                                 }
-    //                             }
-    //                         }
-    //                         if orig_first.1.geographical_extent != new_first.geographical_extent {
-    //                             println!(
-    //                                 "  geographical_extent mismatch: {:?} != {:?}",
-    //                                 orig_first.1.geographical_extent, new_first.geographical_extent
-    //                             );
-    //                         }
-    //                     }
-    //                     if orig.vertices != new.vertices {
-    //                         println!(
-    //                             "  vertices mismatch: {:?} != {:?}",
-    //                             orig.vertices, new.vertices
-    //                         );
-    //                     }
-    //                 }
-    //             }
-    //         }
-    //         panic!("features are not equal");
-    //     }
-    // }
-
     Ok(())
 }
diff --git a/src/rust/src/const_vars.rs b/src/rust/src/const_vars.rs
new file mode 100644
index 0000000..efa8b7e
--- /dev/null
+++ b/src/rust/src/const_vars.rs
@@ -0,0 +1,17 @@
+// Current version of FlatCityBuf
+pub(crate) const VERSION: u8 = 1;
+
+// Magic bytes for FlatCityBuf
+pub(crate) const MAGIC_BYTES: [u8; 8] = [b'f', b'c', b'b', VERSION, b'f', b'c', b'b', 0];
+
+// Maximum buffer size for the header
+pub(crate) const HEADER_MAX_BUFFER_SIZE: usize = 1024 * 1024 * 512; // 512MB
+
+// Size of the magic bytes
+pub(crate) const MAGIC_BYTES_SIZE: usize = 8;
+
+// Size of the header-size prefix (little-endian u32)
+pub(crate) const HEADER_SIZE_SIZE: usize = 4;
+
+// // Offset of header size
+// pub(crate) const HEADER_SIZE_OFFSET: usize = MAGIC_BYTES_SIZE + HEADER_SIZE_SIZE;
diff --git a/src/rust/src/http_reader/mock_http_range_client.rs b/src/rust/src/http_reader/mock_http_range_client.rs
new file mode 100644
index 0000000..7b5bb24
--- /dev/null
+++ b/src/rust/src/http_reader/mock_http_range_client.rs
@@ -0,0 +1,107 @@
+use crate::HttpFcbReader;
+use anyhow::Result;
+use bytes::Bytes;
+use http_range_client::AsyncHttpRangeClient;
+use std::fs::File;
+use std::io::{BufReader, Read, Seek, SeekFrom};
+use std::ops::Range;
+use std::path::PathBuf;
+use std::sync::{Arc, RwLock};
+use tracing::trace;
+
+impl HttpFcbReader<MockHttpRangeClient> {
+    /// NOTE: For debugging expediency, this test class often prefers panics over returning a result.
+    pub async fn mock_from_file(
+        path: &str,
+    ) -> Result<(
+        HttpFcbReader<MockHttpRangeClient>,
+        Arc<RwLock<RequestStats>>,
+    )> {
+        trace!("starting: opening http reader, reading header");
+
+        let stats = Arc::new(RwLock::new(RequestStats::new()));
+        let http_client = MockHttpRangeClient::new(path, stats.clone());
+        let client = http_range_client::AsyncBufferedHttpRangeClient::with(http_client, path);
+        Ok((Self::_open(client).await?, stats))
+    }
+}
+
+/// NOTE: For debugging expediency, this test class often prefers panics over returning a result.
+pub(crate) struct MockHttpRangeClient {
+    path: PathBuf,
+    stats: Arc<RwLock<RequestStats>>,
+}
+
+pub(crate) struct RequestStats {
+    pub request_count: u64,
+    pub bytes_requested: u64,
+}
+
+impl RequestStats {
+    fn new() -> Self {
+        Self {
+            request_count: 0,
+            bytes_requested: 0,
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl AsyncHttpRangeClient for MockHttpRangeClient {
+    async fn get_range(&self, url: &str, range: &str) -> http_range_client::Result<Bytes> {
+        assert_eq!(url, self.path.to_str().unwrap());
+
+        /// This is a hack, but we need the start and length of the range.
+        /// Since all we're given is the pre-formatted range string, we
+        /// need to parse it into its components.
+        ///
+        /// For expediency, this test code panics rather than returns a result.
+        fn parse_range_header(range: &str) -> Range<u64> {
+            let bytes = range.strip_prefix("bytes=").unwrap();
+            let parts: Vec<&str> = bytes.split('-').collect();
+            assert!(parts.len() == 2);
+            let start = parts[0].parse().expect("should have valid start range");
+            let end: u64 = parts[1].parse().expect("should have valid end range");
+            // Range headers are *inclusive*
+            start..(end + 1)
+        }
+
+        let range = parse_range_header(range);
+        let request_length = range.end - range.start;
+
+        let mut stats = self
+            .stats
+            .write()
+            .expect("test code does not handle actual concurrency");
+
+        stats.request_count += 1;
+        stats.bytes_requested += request_length;
+
+        let mut file_reader = BufReader::new(File::open(&self.path).unwrap());
+        file_reader
+            .seek(SeekFrom::Start(range.start))
+            .expect("unable to seek test reader");
+        let mut output = vec![0; request_length as usize];
+        file_reader
+            .read_exact(&mut output)
+            .expect("failed to read from test reader");
+        Ok(Bytes::from(output))
+    }
+
+    async fn head_response_header(
+        &self,
+        _url: &str,
+        _header: &str,
+    ) -> http_range_client::Result<Option<String>> {
+        unimplemented!()
+    }
+}
+
+impl MockHttpRangeClient {
+    fn new(path: &str, stats: Arc<RwLock<RequestStats>>) -> Self {
+        Self {
+            path: path.into(),
+            stats,
+        }
+    }
+}
diff --git a/src/rust/src/http_reader/mod.rs b/src/rust/src/http_reader/mod.rs
new file mode 100644
index 0000000..8a815f1
--- /dev/null
+++ b/src/rust/src/http_reader/mod.rs
@@ -0,0 +1,459 @@
+use crate::deserializer::to_cj_feature;
+use crate::packedrtree::{HttpRange, HttpSearchResultItem, NodeItem, PackedRTree};
+use crate::reader::city_buffer::FcbBuffer;
+use crate::{
+    check_magic_bytes, size_prefixed_root_as_city_feature, HEADER_MAX_BUFFER_SIZE, HEADER_SIZE_SIZE,
+};
+use crate::{header_generated::*, MAGIC_BYTES_SIZE};
+use anyhow::{anyhow, Result};
+use byteorder::{ByteOrder, LittleEndian};
+use bytes::{BufMut, Bytes, BytesMut};
+use cjseq::CityJSONFeature;
+use http_range_client::{
+    AsyncBufferedHttpRangeClient, AsyncHttpRangeClient, BufferedHttpRangeClient,
+};
+use std::collections::VecDeque;
+use std::ops::Range;
+use tracing::debug;
+use tracing::trace;
+
+#[cfg(test)]
+mod mock_http_range_client;
+
+// The largest request we'll speculatively make.
+// If a single huge feature requires more, we'll necessarily exceed this limit.
+const DEFAULT_HTTP_FETCH_SIZE: usize = 1_048_576; // 1MB
+
+/// FlatCityBuf dataset HTTP reader
+pub struct HttpFcbReader<T: AsyncHttpRangeClient> {
+    client: AsyncBufferedHttpRangeClient<T>,
+    // feature reading requires header access, therefore
+    // header_buf is included in the FcbBuffer struct.
+    fbs: FcbBuffer,
+}
+
+pub struct AsyncFeatureIter<T: AsyncHttpRangeClient> {
+    client: AsyncBufferedHttpRangeClient<T>,
+    // feature reading requires header access, therefore
+    // header_buf is included in the FcbBuffer struct.
+    fbs: FcbBuffer,
+    /// Which features to iterate
+    selection: FeatureSelection,
+    /// Number of selected features
+    count: usize,
+}
+
+impl HttpFcbReader<reqwest::Client> {
+    pub async fn open(url: &str) -> Result<HttpFcbReader<reqwest::Client>> {
+        trace!("starting: opening http reader, reading header");
+        let client = BufferedHttpRangeClient::new(url);
+        Self::_open(client).await
+    }
+}
+
+impl<T: AsyncHttpRangeClient> HttpFcbReader<T> {
+    pub async fn new(client: AsyncBufferedHttpRangeClient<T>) -> Result<HttpFcbReader<T>> {
+        Self::_open(client).await
+    }
+
+    async fn _open(mut client: AsyncBufferedHttpRangeClient<T>) -> Result<HttpFcbReader<T>> {
+        // Because we use a buffered HTTP reader, anything extra we fetch here can
+ // Immediately following the header is the optional spatial index, we deliberately fetch + // a small part of that to skip subsequent requests + let prefetch_index_bytes: usize = { + // The actual branching factor will be in the header, but since we don't have the header + // yet we guess. The consequence of getting this wrong isn't catastrophic, it just means + // we may be fetching slightly more than we need or that we make an extra request later. + let assumed_branching_factor = PackedRTree::DEFAULT_NODE_SIZE as usize; + + // NOTE: each layer is exponentially larger + let prefetched_layers: u32 = 3; + + (0..prefetched_layers) + .map(|i| assumed_branching_factor.pow(i) * std::mem::size_of::()) + .sum() + }; + + // In reality, the header is probably less than half this size, but better to overshoot and + // fetch an extra kb rather than have to issue a second request. + let assumed_header_size = 2024; + let min_req_size = assumed_header_size + prefetch_index_bytes; + client.set_min_req_size(min_req_size); + debug!("fetching header. min_req_size: {min_req_size} (assumed_header_size: {assumed_header_size}, prefetched_index_bytes: {prefetch_index_bytes})"); + let mut read_bytes = 0; + let bytes = client.get_range(read_bytes, MAGIC_BYTES_SIZE).await?; // to get magic bytes + if !check_magic_bytes(bytes) { + return Err(anyhow!("MissingMagicBytes")); + } + + read_bytes += MAGIC_BYTES_SIZE; + let mut bytes = BytesMut::from(client.get_range(read_bytes, HEADER_SIZE_SIZE).await?); + read_bytes += HEADER_SIZE_SIZE; + + let header_size = LittleEndian::read_u32(&bytes) as usize; + if header_size > HEADER_MAX_BUFFER_SIZE || header_size < 8 { + // minimum size check avoids panic in FlatBuffers header decoding + return Err(anyhow!("IllegalHeaderSize: {header_size}")); + } + + bytes.put(client.get_range(read_bytes, header_size).await?); + read_bytes += header_size; + + let header_buf = bytes.to_vec(); + + // verify flatbuffer + let _header = size_prefixed_root_as_header(&header_buf)?; + + trace!("completed: opening http reader"); + Ok(HttpFcbReader { + client, + fbs: FcbBuffer { + header_buf, + features_buf: Vec::new(), + }, + }) + } + + pub fn header(&self) -> Header { + self.fbs.header() + } + fn header_len(&self) -> usize { + MAGIC_BYTES_SIZE + self.fbs.header_buf.len() + } + /// Select all features. + pub async fn select_all(self) -> Result> { + let header = self.fbs.header(); + let count = header.features_count(); + // TODO: support reading with unknown feature count + let index_size = if header.index_node_size() > 0 { + PackedRTree::index_size(count as usize, header.index_node_size()) + } else { + 0 + }; + // Skip index + let feature_base = self.header_len() + index_size; + Ok(AsyncFeatureIter { + client: self.client, + fbs: self.fbs, + selection: FeatureSelection::SelectAll(SelectAll { + features_left: count, + pos: feature_base, + }), + count: count as usize, + }) + } + /// Select features within a bounding box. 
+    pub async fn select_bbox(
+        mut self,
+        min_x: f64,
+        min_y: f64,
+        max_x: f64,
+        max_y: f64,
+    ) -> Result<AsyncFeatureIter<T>> {
+        trace!("starting: select_bbox, traversing index");
+        // Read R-Tree index and build filter for features within bbox
+        let header = self.fbs.header();
+        if header.index_node_size() == 0 || header.features_count() == 0 {
+            return Err(anyhow!("NoIndex"));
+        }
+        let count = header.features_count() as usize;
+        let header_len = self.header_len();
+
+        // request up to this many extra bytes if it means we can eliminate an extra request
+        let combine_request_threshold = 256 * 1024;
+
+        let list = PackedRTree::http_stream_search(
+            &mut self.client,
+            header_len,
+            count,
+            PackedRTree::DEFAULT_NODE_SIZE,
+            min_x,
+            min_y,
+            max_x,
+            max_y,
+            combine_request_threshold,
+        )
+        .await?;
+        debug_assert!(
+            list.windows(2)
+                .all(|w| w[0].range.start() < w[1].range.start()),
+            "Since the tree is traversed breadth first, list should be sorted by construction."
+        );
+
+        let count = list.len();
+        let feature_batches = FeatureBatch::make_batches(list, combine_request_threshold).await?;
+        let selection = FeatureSelection::SelectBbox(SelectBbox { feature_batches });
+        trace!("completed: select_bbox");
+        Ok(AsyncFeatureIter {
+            client: self.client,
+            fbs: self.fbs,
+            selection,
+            count,
+        })
+    }
+}
+
+impl<T: AsyncHttpRangeClient> AsyncFeatureIter<T> {
+    pub fn header(&self) -> Header {
+        self.fbs.header()
+    }
+
+    /// Number of selected features (might be unknown)
+    pub fn features_count(&self) -> Option<usize> {
+        if self.count > 0 {
+            Some(self.count)
+        } else {
+            None
+        }
+    }
+
+    /// Read next feature
+    pub async fn next(&mut self) -> Result<Option<&FcbBuffer>> {
+        let Some(buffer) = self.selection.next_feature_buffer(&mut self.client).await? else {
+            return Ok(None);
+        };
+
+        // Not zero-copy
+        self.fbs.features_buf = buffer.to_vec();
+        // verify flatbuffer
+        let _feature = size_prefixed_root_as_city_feature(&self.fbs.features_buf)?;
+        Ok(Some(&self.fbs))
+    }
+
+    /// Return current feature
+    pub fn cur_feature(&self) -> &FcbBuffer {
+        &self.fbs
+    }
+
+    pub fn cur_cj_feature(&self) -> Result<CityJSONFeature> {
+        let cj_feature = to_cj_feature(self.cur_feature().feature(), self.header().columns())?;
+        Ok(cj_feature)
+    }
+
+    // pub async fn cj_features(&mut self) -> Result<Vec<CityJSONFeature>> {
+    //     let mut cj_features = Vec::new();
+    //     let columns = self.header().columns().unwrap_or_default().to_owned();
+
+    //     while let Some(feature) = self.next().await.map_err(|e| anyhow!("NoFeature: {e}"))? {
+    //         let cj_feature = to_cj_feature(feature.feature(), &columns)?;
+    //         cj_features.push(cj_feature);
+    //     }
+    //     Ok(cj_features)
+    // }
+}
+
+enum FeatureSelection {
+    SelectAll(SelectAll),
+    SelectBbox(SelectBbox),
+}
+
+impl FeatureSelection {
+    async fn next_feature_buffer<T: AsyncHttpRangeClient>(
+        &mut self,
+        client: &mut AsyncBufferedHttpRangeClient<T>,
+    ) -> Result<Option<Bytes>> {
+        match self {
+            FeatureSelection::SelectAll(select_all) => select_all.next_buffer(client).await,
+            FeatureSelection::SelectBbox(select_bbox) => select_bbox.next_buffer(client).await,
+        }
+    }
+}
+
+struct SelectAll {
+    /// Features left
+    features_left: u64,
+
+    /// How many bytes into the file we've read so far
+    pos: usize,
+}
+
+impl SelectAll {
+    async fn next_buffer<T: AsyncHttpRangeClient>(
+        &mut self,
+        client: &mut AsyncBufferedHttpRangeClient<T>,
+    ) -> Result<Option<Bytes>> {
+        client.min_req_size(DEFAULT_HTTP_FETCH_SIZE);
+
+        if self.features_left == 0 {
+            return Ok(None);
+        }
+        self.features_left -= 1;
+
+        let mut feature_buffer = BytesMut::from(client.get_range(self.pos, 4).await?);
+        self.pos += 4;
+        let feature_size = LittleEndian::read_u32(&feature_buffer) as usize;
+        feature_buffer.put(client.get_range(self.pos, feature_size).await?);
+        self.pos += feature_size;
+
+        Ok(Some(feature_buffer.freeze()))
+    }
+}
+
+struct SelectBbox {
+    /// Selected features
+    feature_batches: Vec<FeatureBatch>,
+}
+
+impl SelectBbox {
+    async fn next_buffer<T: AsyncHttpRangeClient>(
+        &mut self,
+        client: &mut AsyncBufferedHttpRangeClient<T>,
+    ) -> Result<Option<Bytes>> {
+        let mut next_buffer = None;
+        while next_buffer.is_none() {
+            let Some(feature_batch) = self.feature_batches.last_mut() else {
+                break;
+            };
+            let Some(buffer) = feature_batch.next_buffer(client).await? else {
+                // done with this batch
+                self.feature_batches
+                    .pop()
+                    .expect("already asserted feature_batches was non-empty");
+                continue;
+            };
+            next_buffer = Some(buffer)
+        }
+
+        Ok(next_buffer)
+    }
+}
+
+struct FeatureBatch {
+    /// The byte location of each feature within the file
+    feature_ranges: VecDeque<HttpRange>,
+}
+
+impl FeatureBatch {
+    async fn make_batches(
+        feature_ranges: Vec<HttpSearchResultItem>,
+        combine_request_threshold: usize,
+    ) -> Result<Vec<Self>> {
+        let mut batched_ranges = vec![];
+
+        for search_result_item in feature_ranges.into_iter() {
+            let Some(latest_batch) = batched_ranges.last_mut() else {
+                let mut new_batch = VecDeque::new();
+                new_batch.push_back(search_result_item.range);
+                batched_ranges.push(new_batch);
+                continue;
+            };
+
+            let previous_item = latest_batch.back().expect("we never push an empty batch");
+
+            let HttpRange::Range(Range { end: prev_end, .. }) = previous_item else {
+                debug_assert!(false, "This shouldn't happen. Only the very last feature is expected to have an unknown length");
+                let mut new_batch = VecDeque::new();
+                new_batch.push_back(search_result_item.range);
+                batched_ranges.push(new_batch);
+                continue;
+            };
+
+            let wasted_bytes = search_result_item.range.start() - prev_end;
+            if wasted_bytes < combine_request_threshold {
+                if wasted_bytes == 0 {
+                    trace!("adjacent feature");
+                } else {
+                    trace!("wasting {wasted_bytes} to avoid an extra request");
+                }
+                latest_batch.push_back(search_result_item.range)
+            } else {
+                trace!("creating a new request for batch rather than wasting {wasted_bytes} bytes");
+                let mut new_batch = VecDeque::new();
+                new_batch.push_back(search_result_item.range);
+                batched_ranges.push(new_batch);
+            }
+        }
+
+        let mut batches: Vec<_> = batched_ranges.into_iter().map(FeatureBatch::new).collect();
+        batches.reverse();
+        Ok(batches)
+    }
+
+    fn new(feature_ranges: VecDeque<HttpRange>) -> Self {
+        Self { feature_ranges }
+    }
+
+    /// When fetching new data, how many bytes should we fetch at once?
+    /// It's computed from the specific feature ranges of the batch
+    /// to balance the number of requests against wasted bytes and resident memory.
+    fn request_size(&self) -> usize {
+        let Some(first) = self.feature_ranges.front() else {
+            return 0;
+        };
+        let Some(last) = self.feature_ranges.back() else {
+            return 0;
+        };
+
+        // `last.length()` should only be None if this batch includes the final feature
+        // in the dataset. Since we can't infer its actual length, we'll fetch only
+        // the first 4 bytes of that feature buffer, which will tell us the actual length
+        // of the feature buffer for the subsequent request.
+        let last_feature_length = last.length().unwrap_or(4);
+
+        let covering_range = first.start()..last.start() + last_feature_length;
+
+        covering_range
+            .len()
+            // Since it's all held in memory, don't fetch more than DEFAULT_HTTP_FETCH_SIZE at a time
+            // unless necessary.
+            .min(DEFAULT_HTTP_FETCH_SIZE)
+    }
+
+    async fn next_buffer<T: AsyncHttpRangeClient>(
+        &mut self,
+        client: &mut AsyncBufferedHttpRangeClient<T>,
+    ) -> Result<Option<Bytes>> {
+        let request_size = self.request_size();
+        client.set_min_req_size(request_size);
+        let Some(feature_range) = self.feature_ranges.pop_front() else {
+            return Ok(None);
+        };
+
+        let mut pos = feature_range.start();
+        let mut feature_buffer = BytesMut::from(client.get_range(pos, 4).await?);
+        pos += 4;
+        let feature_size = LittleEndian::read_u32(&feature_buffer) as usize;
+        feature_buffer.put(client.get_range(pos, feature_size).await?);
+
+        Ok(Some(feature_buffer.freeze()))
+    }
+}
+
+// #[cfg(test)]
+// mod tests {
+//     use crate::HttpFcbReader;
+
+//     #[tokio::test]
+//     async fn fgb_max_request_size() {
+//         let (fgb, stats) = HttpFcbReader::mock_from_file("../../test/data/UScounties.fgb")
+//             .await
+//             .unwrap();
+
+//         {
+//             // The read guard needs to be in a scoped block, else we won't release the lock and the test will hang when
+//             // the actual FGB client code tries to update the stats.
+//             let stats = stats.read().unwrap();
+//             assert_eq!(stats.request_count, 1);
+//             // This number might change a little if the test data or logic changes, but they should be in the same ballpark.
+//             assert_eq!(stats.bytes_requested, 12944);
+//         }
+
+//         // This bbox covers a large swathe of the dataset. The idea is that at least one request should be limited by the
+//         // max request size `DEFAULT_HTTP_FETCH_SIZE`, but that we should still have a reasonable number of requests.
+//         let mut iter = fgb.select_bbox(-118.0, 42.0, -100.0, 47.0).await.unwrap();
+
+//         let mut feature_count = 0;
+//         while let Some(_feature) = iter.next().await.unwrap() {
+//             feature_count += 1;
+//         }
+//         assert_eq!(feature_count, 169);
+
+//         {
+//             // The read guard needs to be in a scoped block, else we won't release the lock and the test will hang when
+//             // the actual FGB client code tries to update the stats.
+//             let stats = stats.read().unwrap();
+//             // These numbers might change a little if the test data or logic changes, but they should be in the same ballpark.
+//             assert_eq!(stats.request_count, 5);
+//             assert_eq!(stats.bytes_requested, 2131152);
+//         }
+//     }
+// }
diff --git a/src/rust/src/lib.rs b/src/rust/src/lib.rs
index 1bf7d5a..57fb574 100644
--- a/src/rust/src/lib.rs
+++ b/src/rust/src/lib.rs
@@ -1,26 +1,27 @@
 #![allow(clippy::manual_range_contains)]
 mod cj_utils;
+mod const_vars;
 mod error;
 #[allow(dead_code, unused_imports, clippy::all, warnings)]
 mod feature_generated;
 #[allow(dead_code, unused_imports, clippy::all, warnings)]
 mod header_generated;
+mod http_reader;
 mod packedrtree;
 mod reader;
 mod writer;
 
 pub use cj_utils::*;
+pub(crate) use const_vars::*;
 pub use feature_generated::*;
 pub use header_generated::*;
+#[cfg(feature = "http")]
+pub use http_reader::*;
 pub use packedrtree::*;
 pub use reader::*;
 pub use writer::*;
 
-pub const VERSION: u8 = 1;
-pub(crate) const MAGIC_BYTES: [u8; 8] = [b'f', b'c', b'b', VERSION, b'f', b'c', b'b', 0];
-pub(crate) const HEADER_MAX_BUFFER_SIZE: usize = 1024 * 1024 * 512; // 512MB
-
 fn check_magic_bytes(bytes: &[u8]) -> bool {
     bytes[0..3] == MAGIC_BYTES[0..3] && bytes[4..7] == MAGIC_BYTES[4..7] && bytes[3] <= VERSION
 }
diff --git a/src/rust/src/packedrtree.rs b/src/rust/src/packedrtree.rs
index dc70d29..4d12a20 100644
--- a/src/rust/src/packedrtree.rs
+++ b/src/rust/src/packedrtree.rs
@@ -14,10 +14,11 @@
 use anyhow::Result;
 use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
-// #[cfg(feature = "http")]
-// use http_range_client::{
-//     AsyncBufferedHttpRangeClient, AsyncHttpRangeClient, BufferedHttpRangeClient,
-// };
+use core::f64;
+#[cfg(feature = "http")]
+use http_range_client::{
+    AsyncBufferedHttpRangeClient, AsyncHttpRangeClient, BufferedHttpRangeClient,
+};
 use std::cmp::{max, min};
 use std::collections::VecDeque;
 use std::io::{Cursor, Read, Seek, SeekFrom, Write};
 
@@ -30,10 +31,10 @@ use std::ops::Range;
 #[repr(C)]
 /// R-Tree node
 pub struct NodeItem {
-    pub min_x: i64,
-    pub min_y: i64,
-    pub max_x: i64,
-    pub max_y: i64,
+    pub min_x: f64,
+    pub min_y: f64,
+    pub max_x: f64,
+    pub max_y: f64,
     /// Byte offset in feature data section
     pub offset: u64,
 }
@@ -42,10 +43,10 @@ impl NodeItem {
     #[deprecated(
         note = "Use NodeItem::bounds instead if you're only using the node item for bounds checking"
     )]
-    pub fn new(min_x: i64, min_y: i64, max_x: i64, max_y: i64) -> NodeItem {
+    pub fn new(min_x: f64, min_y: f64, max_x: f64, max_y: f64) -> NodeItem {
         Self::bounds(min_x, min_y, max_x, max_y)
     }
 
-    pub fn bounds(min_x: i64, min_y: i64, max_x: i64, max_y: i64) -> NodeItem {
+    pub fn bounds(min_x: f64, min_y: f64, max_x: f64, max_y: f64) -> NodeItem {
         NodeItem {
             min_x,
             min_y,
@@ -57,20 +58,20 @@ impl NodeItem {
     pub fn create(offset: u64) -> NodeItem {
         NodeItem {
-            min_x: i64::MAX,
-            min_y: i64::MAX,
-            max_x: i64::MIN,
-            max_y: i64::MIN,
+            min_x: f64::INFINITY,
+            min_y: f64::INFINITY,
+            max_x: f64::NEG_INFINITY,
+            max_y: f64::NEG_INFINITY,
             offset,
         }
     }
 
     pub fn from_reader(mut rdr: impl Read) -> Result<Self> {
         Ok(NodeItem {
-            min_x: rdr.read_i64::<LittleEndian>()?,
-            min_y: rdr.read_i64::<LittleEndian>()?,
-            max_x: rdr.read_i64::<LittleEndian>()?,
-            max_y: rdr.read_i64::<LittleEndian>()?,
+            min_x: rdr.read_f64::<LittleEndian>()?,
+            min_y: rdr.read_f64::<LittleEndian>()?,
+            max_x: rdr.read_f64::<LittleEndian>()?,
+            max_y: rdr.read_f64::<LittleEndian>()?,
             offset: rdr.read_u64::<LittleEndian>()?,
         })
     }
 
@@ -80,19 +81,19 @@
     }
 
     pub fn write<W: Write>(&self, wtr: &mut W) -> std::io::Result<()> {
-        wtr.write_i64::<LittleEndian>(self.min_x)?;
-        wtr.write_i64::<LittleEndian>(self.min_y)?;
-        wtr.write_i64::<LittleEndian>(self.max_x)?;
-        wtr.write_i64::<LittleEndian>(self.max_y)?;
+        wtr.write_f64::<LittleEndian>(self.min_x)?;
+        wtr.write_f64::<LittleEndian>(self.min_y)?;
+        wtr.write_f64::<LittleEndian>(self.max_x)?;
+        wtr.write_f64::<LittleEndian>(self.max_y)?;
         wtr.write_u64::<LittleEndian>(self.offset)?;
         Ok(())
     }
 
-    pub fn width(&self) -> i64 {
+    pub fn width(&self) -> f64 {
         self.max_x - self.min_x
     }
 
-    pub fn height(&self) -> i64 {
+    pub fn height(&self) -> f64 {
         self.max_y - self.min_y
     }
 
@@ -116,7 +117,7 @@ impl NodeItem {
         }
     }
 
-    pub fn expand_xy(&mut self, x: i64, y: i64) {
+    pub fn expand_xy(&mut self, x: f64, y: f64) {
         if x < self.min_x {
             self.min_x = x;
         }
@@ -264,11 +265,9 @@ fn hilbert(x: u32, y: u32) -> u32 {
 
 fn hilbert_bbox(r: &NodeItem, hilbert_max: u32, extent: &NodeItem) -> u32 {
     // calculate bbox center and scale to hilbert_max
-    let x = (hilbert_max as f64 * ((r.min_x + r.max_x) / 2 - extent.min_x) as f64
-        / extent.width() as f64)
+    let x = (hilbert_max as f64 * ((r.min_x + r.max_x) / 2.0 - extent.min_x) / extent.width())
         .floor() as u32;
-    let y = (hilbert_max as f64 * ((r.min_y + r.max_y) / 2 - extent.min_y) as f64
-        / extent.height() as f64)
+    let y = (hilbert_max as f64 * ((r.min_y + r.max_y) / 2.0 - extent.min_y) / extent.height())
         .floor() as u32;
     hilbert(x, y)
 }
@@ -461,10 +460,10 @@ impl PackedRTree {
 
     pub fn search(
         &self,
-        min_x: i64,
-        min_y: i64,
-        max_x: i64,
-        max_y: i64,
+        min_x: f64,
+        min_y: f64,
+        max_x: f64,
+        max_y: f64,
     ) -> Result<Vec<SearchResultItem>> {
         let leaf_nodes_offset = self
             .level_bounds
@@ -507,10 +506,10 @@ impl PackedRTree {
         data: &mut R,
         num_items: usize,
         node_size: u16,
-        min_x: i64,
-        min_y: i64,
-        max_x: i64,
-        max_y: i64,
+        min_x: f64,
+        min_y: f64,
+        max_x: f64,
+        max_y: f64,
     ) -> Result<Vec<SearchResultItem>> {
         let bounds = NodeItem::bounds(min_x, min_y, max_x, max_y);
         let level_bounds = PackedRTree::generate_level_bounds(num_items, node_size);
@@ -578,6 +577,8 @@
         max_y: f64,
         combine_request_threshold: usize,
     ) -> Result<Vec<HttpSearchResultItem>> {
+        use tracing::debug;
+
         let bounds = NodeItem::bounds(min_x, min_y, max_x, max_y);
         if num_items == 0 {
             return Ok(vec![]);
@@ -828,24 +829,24 @@ mod tests {
     #[test]
     fn tree_2items() -> Result<()> {
         let mut nodes = Vec::new();
-        nodes.push(NodeItem::bounds(0, 0, 1, 1));
-        nodes.push(NodeItem::bounds(2, 2, 3, 3));
+        nodes.push(NodeItem::bounds(0.0, 0.0, 1.0, 1.0));
+        nodes.push(NodeItem::bounds(2.0, 2.0, 3.0, 3.0));
         let extent = calc_extent(&nodes);
-        assert_eq!(extent, NodeItem::bounds(0, 0, 3, 3));
-        assert!(nodes[0].intersects(&NodeItem::bounds(0, 0, 1, 1)));
-        assert!(nodes[1].intersects(&NodeItem::bounds(2, 2, 3, 3)));
+        assert_eq!(extent, NodeItem::bounds(0.0, 0.0, 3.0, 3.0));
+        assert!(nodes[0].intersects(&NodeItem::bounds(0.0, 0.0, 1.0, 1.0)));
+        assert!(nodes[1].intersects(&NodeItem::bounds(2.0, 2.0, 3.0, 3.0)));
         hilbert_sort(&mut nodes, &extent);
         let mut offset = 0;
         for node in &mut nodes {
             node.offset = offset;
             offset += size_of::<NodeItem>() as u64;
         }
-        assert!(nodes[1].intersects(&NodeItem::bounds(0, 0, 1, 1)));
-        assert!(nodes[0].intersects(&NodeItem::bounds(2, 2, 3, 3)));
+        assert!(nodes[1].intersects(&NodeItem::bounds(0.0, 0.0, 1.0, 1.0)));
+        assert!(nodes[0].intersects(&NodeItem::bounds(2.0, 2.0, 3.0, 3.0)));
         let tree = PackedRTree::build(&nodes, &extent, PackedRTree::DEFAULT_NODE_SIZE)?;
-        let list = tree.search(0, 0, 1, 1)?;
+        let list = tree.search(0.0, 0.0, 1.0, 1.0)?;
         assert_eq!(list.len(), 1);
-        assert!(nodes[list[0].index].intersects(&NodeItem::bounds(0, 0, 1, 1)));
+        assert!(nodes[list[0].index].intersects(&NodeItem::bounds(0.0, 0.0, 1.0, 1.0)));
         Ok(())
     }
diff --git a/src/rust/src/reader/city_buffer.rs b/src/rust/src/reader/city_buffer.rs
index a84e12d..fa1c6b2 100644
--- a/src/rust/src/reader/city_buffer.rs
+++ b/src/rust/src/reader/city_buffer.rs
@@ -1,5 +1,8 @@
+use crate::deserializer::to_cj_feature;
 use crate::feature_generated::*;
 use crate::header_generated::*;
+use anyhow::Result;
+use cjseq::CityJSONFeature;
 
 pub struct FcbBuffer {
     pub(crate) header_buf: Vec<u8>,
@@ -11,7 +14,14 @@ impl FcbBuffer {
         unsafe { size_prefixed_root_as_header_unchecked(&self.header_buf) }
     }
 
-    pub(crate) fn feature(&self) -> CityFeature {
+    pub fn feature(&self) -> CityFeature {
         unsafe { size_prefixed_root_as_city_feature_unchecked(&self.features_buf) }
     }
+
+    // TODO: think well if needed
+    pub fn cj_feature(&self) -> Result<CityJSONFeature> {
+        let fcb_feature = self.feature();
+        let root_attr_schema = self.header().columns();
+        to_cj_feature(fcb_feature, root_attr_schema)
+    }
 }
diff --git a/src/rust/src/reader/mod.rs b/src/rust/src/reader/mod.rs
index 5b1bf87..098cecb 100644
--- a/src/rust/src/reader/mod.rs
+++ b/src/rust/src/reader/mod.rs
@@ -1,6 +1,6 @@
 pub mod city_buffer;
 pub mod deserializer;
-use city_buffer::FcbBuffer;
+use city_buffer::*;
 use cjseq::CityJSONFeature;
 use deserializer::to_cj_feature;
 
@@ -128,7 +128,7 @@ impl<R: Read + Seek> FcbReader<R> {
             header.index_node_size(),
         )?;
         let (min_x, min_y, max_x, max_y) = (min_x as i64, min_y as i64, max_x as i64, max_y as i64);
-        let list = index.search(min_x, min_y, max_x, max_y)?;
+        let list = index.search(min_x as f64, min_y as f64, max_x as f64, max_y as f64)?;
         debug_assert!(
             list.windows(2).all(|w| w[0].offset < w[1].offset),
             "Since the tree is traversed breadth first, list should be sorted by construction."
@@ -174,10 +174,10 @@
             &mut self.reader,
             header.features_count() as usize,
             PackedRTree::DEFAULT_NODE_SIZE,
-            min_x,
-            min_y,
-            max_x,
-            max_y,
+            min_x as f64,
+            min_y as f64,
+            max_x as f64,
+            max_y as f64,
         )?;
         debug_assert!(
             list.windows(2).all(|w| w[0].offset < w[1].offset),
diff --git a/src/rust/src/writer/serializer.rs b/src/rust/src/writer/serializer.rs
index 8792bdf..061f0f5 100644
--- a/src/rust/src/writer/serializer.rs
+++ b/src/rust/src/writer/serializer.rs
@@ -297,10 +297,30 @@ pub fn to_fcb_city_feature<'a>(
                 .collect::<Vec<_>>(),
         ),
     );
-    let min_x = city_feature.vertices.iter().map(|v| v[0]).min().unwrap();
-    let min_y = city_feature.vertices.iter().map(|v| v[1]).min().unwrap();
-    let max_x = city_feature.vertices.iter().map(|v| v[0]).max().unwrap();
-    let max_y = city_feature.vertices.iter().map(|v| v[1]).max().unwrap();
+    let min_x = city_feature
+        .vertices
+        .iter()
+        .map(|v| v[0])
+        .min()
+        .unwrap_or(0) as f64;
+    let min_y = city_feature
+        .vertices
+        .iter()
+        .map(|v| v[1])
+        .min()
+        .unwrap_or(0) as f64;
+    let max_x = city_feature
+        .vertices
+        .iter()
+        .map(|v| v[0])
+        .max()
+        .unwrap_or(0) as f64;
+    let max_y = city_feature
+        .vertices
+        .iter()
+        .map(|v| v[1])
+        .max()
+        .unwrap_or(0) as f64;
 
     let bbox = NodeItem::new(min_x, min_y, max_x, max_y);
     (
diff --git a/src/rust/tests/http.rs b/src/rust/tests/http.rs
new file mode 100644
index 0000000..1b234c3
--- /dev/null
+++ b/src/rust/tests/http.rs
@@ -0,0 +1,52 @@
+use std::{
+    error::Error,
+    fs::File,
+    io::{BufWriter, Write},
+};
+
+use flatcitybuf::{deserializer::to_cj_metadata, HttpFcbReader};
+
+use anyhow::Result;
+
+async fn read_http_file(path: &str) -> Result<(), Box<dyn Error>> {
+    let http_reader = HttpFcbReader::open(path).await?;
+    let min_x = -200000.0;
+    let min_y = -200000.0;
+    let max_x = 200000.0;
+    let max_y = 200000.0;
+    let mut iter = http_reader.select_bbox(min_x, min_y, max_x, max_y).await?;
+    let header = iter.header();
+    let cj = to_cj_metadata(&header)?;
+
+    let mut writer = BufWriter::new(File::create("delft_http.city.jsonl")?);
+    writeln!(writer, "{}", serde_json::to_string(&cj)?)?;
+
+    let mut feat_num = 0;
+    let feat_count = header.features_count();
+    while let Some(feature) = iter.next().await? {
+        let cj_feature = feature.cj_feature()?;
+        writeln!(writer, "{}", serde_json::to_string(&cj_feature)?)?;
+
+        feat_num += 1;
+        if feat_num >= feat_count {
+            break;
+        }
+    }
+    // TODO: add more tests
+    Ok(())
+}
+
+mod http {
+    use anyhow::Result;
+
+    use crate::read_http_file;
+
+    #[tokio::test]
+    async fn test_read_http_file() -> Result<()> {
+        let res = read_http_file(
+            "https://github.com/HideBa/flatcitybuf-testing/raw/refs/heads/main/delft_attr.fcb",
+        )
+        .await;
+        assert!(res.is_ok());
+        Ok(())
+    }
+}
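
The constants in const_vars.rs together with HttpFcbReader::_open imply this file layout: 8 magic bytes ([b'f', b'c', b'b', VERSION, b'f', b'c', b'b', 0]), a little-endian u32 giving the header size, the size-prefixed FlatBuffers header, the optional packed R-tree index, and finally the feature buffers. A minimal sketch of reading that prelude from a local file; the helper name is hypothetical and only the layout mirrors the diff above:

use byteorder::{LittleEndian, ReadBytesExt};
use std::fs::File;
use std::io::Read;

/// Hypothetical helper: reads the FlatCityBuf prelude and returns the raw header bytes.
fn read_fcb_prelude(path: &str) -> std::io::Result<Vec<u8>> {
    let mut f = File::open(path)?;

    // 8 magic bytes; only the "fcb" prefix is checked in this sketch
    // (check_magic_bytes also compares the trailing marker and version byte).
    let mut magic = [0u8; 8];
    f.read_exact(&mut magic)?;
    assert_eq!(&magic[0..3], &b"fcb"[..], "not a FlatCityBuf file");

    // Little-endian u32 header size, then the FlatBuffers header itself.
    let header_size = f.read_u32::<LittleEndian>()? as usize;
    let mut header_buf = vec![0u8; header_size];
    f.read_exact(&mut header_buf)?;
    Ok(header_buf)
}

The HTTP path performs the same steps over range requests, rejecting the file early (MissingMagicBytes, IllegalHeaderSize) before trusting header_size.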
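
The 12,944 bytes asserted in the commented-out fgb_max_request_size test fall straight out of the prefetch arithmetic in _open(): three speculative index layers plus the assumed header size. A worked check, assuming PackedRTree::DEFAULT_NODE_SIZE is 16 (FlatGeobuf's default) and the 40-byte NodeItem defined in packedrtree.rs:

// 4 x f64 bounds + u64 offset = 40 bytes per node
const NODE_ITEM_SIZE: usize = 4 * std::mem::size_of::<f64>() + std::mem::size_of::<u64>();

fn main() {
    // Assumption: DEFAULT_NODE_SIZE == 16, as in FlatGeobuf.
    let assumed_branching_factor: usize = 16;
    let prefetch_index_bytes: usize = (0..3u32)
        .map(|i| assumed_branching_factor.pow(i) * NODE_ITEM_SIZE)
        .sum(); // (1 + 16 + 256) * 40 = 10_920
    let min_req_size = 2024 + prefetch_index_bytes; // assumed_header_size + index prefetch
    assert_eq!(min_req_size, 12_944); // matches the test's bytes_requested
    println!("first request: {min_req_size} bytes");
}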
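
FeatureBatch::make_batches coalesces index hits into shared requests whenever the gap to the previous feature range (the "wasted bytes") stays below combine_request_threshold. The rule is easier to see stripped of the HttpRange machinery; this standalone sketch (hypothetical helper, plain sorted Range<u64> values) applies the same decision:

use std::ops::Range;

/// Illustration only: start a new batch whenever the gap to the
/// previous range reaches the threshold; otherwise extend the batch.
fn batch_ranges(ranges: &[Range<u64>], threshold: u64) -> Vec<Vec<Range<u64>>> {
    let mut batches: Vec<Vec<Range<u64>>> = Vec::new();
    for r in ranges {
        match batches.last_mut() {
            // Close enough: waste a few bytes to save an HTTP request.
            Some(batch) if r.start - batch.last().unwrap().end < threshold => {
                batch.push(r.clone())
            }
            _ => batches.push(vec![r.clone()]),
        }
    }
    batches
}

fn main() {
    let ranges = [0..100, 150..300, 1_000_000..1_000_100];
    // With the 256 KiB threshold used by select_bbox, the first two ranges
    // share one request while the distant third gets its own.
    assert_eq!(batch_ranges(&ranges, 256 * 1024).len(), 2);
}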
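
Put together, a consumer of the new http feature looks like tests/http.rs above: open the reader, issue a bbox query, and stream only the matching features. A condensed sketch with a placeholder URL and bbox; it assumes the http feature is enabled and a tokio runtime is available:

use flatcitybuf::HttpFcbReader;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Placeholder URL; any range-request-capable server works.
    let reader = HttpFcbReader::open("https://example.com/delft_attr.fcb").await?;
    // Only index nodes and feature ranges intersecting the bbox are fetched,
    // batched so that nearby ranges share a single HTTP request.
    let mut iter = reader
        .select_bbox(-200_000.0, -200_000.0, 200_000.0, 200_000.0)
        .await?;
    while let Some(feature) = iter.next().await? {
        let cj = feature.cj_feature()?; // decode into a CityJSONFeature
        println!("{}", serde_json::to_string(&cj)?);
    }
    Ok(())
}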