Skip to content

Commit

Permalink
feat: support http range request to fetch subset of file (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
HideBa authored Jan 19, 2025
1 parent dc224a2 commit 73734b9
Show file tree
Hide file tree
Showing 14 changed files with 756 additions and 180 deletions.
5 changes: 4 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,8 @@
"rust-analyzer.checkOnSave.extraArgs": [
"--lib"
],
"python.pythonPath": "${workspaceFolder}/src/py/.venv/bin/python"
"python.pythonPath": "${workspaceFolder}/src/py/.venv/bin/python",
"rust-analyzer.server.extraEnv": {
"RUST_LOG": "debug"
}
}
6 changes: 5 additions & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@ SOFTWARE.

---

Portions of this software are derived from FlatGeobuf, which is licensed under the BSD 2-Clause License.
Portions of this software are derived from the FlatGeobuf project,
specifically the `packed_r_tree.rs` file, which is licensed under the
BSD 2-Clause License. The original code can be found at:
https://github.com/flatgeobuf/flatgeobuf/blob/master/src/rust/src/packed_r_tree.rs

Copyright (c) 2018, Björn Harrtell, Postnummer Stockholm AB
Copyright (c) 2024, Pirmin Kalberer <[email protected]> (Rust implementation)
All rights reserved.

Redistribution and use in source and binary forms, with or without
Expand Down
18 changes: 13 additions & 5 deletions src/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ absolute_path = "warn"
nursery = { level = "warn", priority = -1 }

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = []
http = ["http-range-client", "bytes", "reqwest"]

[dependencies]
flatbuffers = "24.3.25"
Expand All @@ -18,6 +21,13 @@ serde_json = "1.0.133"
anyhow = "1.0.95"
fallible-streaming-iterator = "0.1.9"
clap = "4.5.23"
http-range-client = { version = "0.9.0", optional = true, default-features = false, features = [
"reqwest-async",
] }
reqwest = { version = "0.12.12", optional = true }
bytes = { version = "1.9.0", optional = true }
tracing = "0.1.41"
tokio = "1.43.0"

[lib]
name = "flatcitybuf"
Expand All @@ -33,14 +43,12 @@ name = "flatcitybuf_cli"
path = "src/main.rs"

[dev-dependencies]
criterion = { version = "0.5.1", features = ["html_reports"] }
async-trait = "0.1.85"
criterion = { version = "0.5.1", features = ["async_tokio", "html_reports"] }
memory-stats = "1.2.0"
pretty_assertions = "1.4.1"
tokio = { version = "1.43.0", features = ["macros", "rt-multi-thread"] }

[[bench]]
name = "read"
harness = false

# [features]
# default = ["http"]
# http = ["http-range-client", "bytes", "reqwest"]
12 changes: 6 additions & 6 deletions src/rust/makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,20 @@
.PHONY: pre-commit
pre-commit:
cargo fmt
cargo clippy --fix --allow-dirty
cargo machete
cargo nextest run
cargo clippy --fix --allow-dirty --all-features
cargo machete --all-features
cargo nextest run --all-features
cargo check --all-features
cargo build --release
cargo build --release --all-features

.PHONY: ser
ser:
cargo run --bin flatcitybuf_cli ser -i tests/data/delft.city.jsonl -o temp/delft_attr.fcb
cargo run --all-features --bin flatcitybuf_cli ser -i tests/data/delft.city.jsonl -o temp/delft_attr.fcb
# cargo run --bin flatcitybuf_cli serialize -i tests/data/delft.city.jsonl -o temp/delft.fcb

.PHONY: deser
deser:
cargo run --bin flatcitybuf_cli deser -i temp/delft_attr.fcb -o temp/delft_attr.city.jsonl
cargo run --all-features --bin flatcitybuf_cli deser -i temp/delft_attr.fcb -o temp/delft_attr.city.jsonl
# cargo run --bin flatcitybuf_cli deserialize -i temp/small.fcb -o temp/small.city.jsonl

.PHONY: bench
Expand Down
106 changes: 0 additions & 106 deletions src/rust/src/bin/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,112 +42,6 @@ fn read_file() -> Result<(), Box<dyn Error>> {
.write_all(format!("{}\n", serde_json::to_string(feature).unwrap()).as_bytes())?;
}

// let original_cjseq_file = manifest_dir
// .join("tests")
// .join("data")
// .join("small.city.jsonl");
// let cjseq_file = File::open(original_cjseq_file)?;
// let cjseq_reader = BufReader::new(cjseq_file);
// let cj_seq = read_cityjson_from_reader(cjseq_reader, CJTypeKind::Seq)?;
// if let CJType::Seq(cj_seq) = cj_seq {
// let CityJSONSeq {
// cj: original_cj,
// features: original_features,
// } = cj_seq;

// // Compare features
// if original_features != features {
// println!("features differ:");
// if original_features.len() != features.len() {
// println!(
// "length mismatch: {} != {}",
// original_features.len(),
// features.len()
// );
// } else {
// for (i, (orig, new)) in original_features.iter().zip(features.iter()).enumerate() {
// if orig != new {
// println!("feature {} differs:", i);
// if orig.thetype != new.thetype {
// println!(" type mismatch: {} != {}", orig.thetype, new.thetype);
// }
// if orig.city_objects != new.city_objects {
// println!(" city_objects mismatch at index {}", i);
// //compare the first element of the city_objects
// let orig_first = orig.city_objects.iter().next().unwrap();
// let new_first = new.city_objects.get(orig_first.0).unwrap();

// println!(" key mismatch: {}", orig_first.0);
// if orig_first.1.geometry != new_first.geometry {
// println!(
// " geometry mismatch: {:?} != {:?}",
// orig_first.1.geometry, new_first.geometry
// );
// //geometry is a vector of geometry, iterate over the elements and compare
// for (i, (orig_geom, new_geom)) in orig_first
// .1
// .geometry
// .iter()
// .zip(new_first.geometry.iter())
// .enumerate()
// {
// if orig_geom != new_geom {
// println!(
// " geometry element {} mismatch: {:?} != {:?}",
// i, orig_geom, new_geom
// );
// }
// // compare each element of the geometry
// for (j, (orig_elem, new_elem)) in
// orig_geom.iter().zip(new_geom.iter()).enumerate()
// {
// if orig_elem != new_elem {
// println!(
// " geometry element {} mismatch: {:?} != {:?}",
// i, orig_elem, new_elem
// );
// }
// if orig_elem.boundaries != new_elem.boundaries {
// println!(
// " boundaries mismatch: {:?} != {:?}",
// orig_elem.boundaries, new_elem.boundaries
// );
// }
// if orig_elem.thetype != new_elem.thetype {
// println!(
// " type mismatch: {:?} != {:?}",
// orig_elem.thetype, new_elem.thetype
// );
// }
// if orig_elem.lod != new_elem.lod {
// println!(
// " lod mismatch: {:?} != {:?}",
// orig_elem.lod, new_elem.lod
// );
// }
// }
// }
// }
// if orig_first.1.geographical_extent != new_first.geographical_extent {
// println!(
// " geographical_extent mismatch: {:?} != {:?}",
// orig_first.1.geographical_extent, new_first.geographical_extent
// );
// }
// }
// if orig.vertices != new.vertices {
// println!(
// " vertices mismatch: {:?} != {:?}",
// orig.vertices, new.vertices
// );
// }
// }
// }
// }
// panic!("features are not equal");
// }
// }

Ok(())
}

Expand Down
17 changes: 17 additions & 0 deletions src/rust/src/const_vars.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Current version of FlatCityBuf
pub(crate) const VERSION: u8 = 1;

// Magic bytes for FlatCityBuf
pub(crate) const MAGIC_BYTES: [u8; 8] = [b'f', b'c', b'b', VERSION, b'f', b'c', b'b', 0];

// Maximum buffer size for header
pub(crate) const HEADER_MAX_BUFFER_SIZE: usize = 1024 * 1024 * 512; // 512MB

// Size of magic bytes
pub(crate) const MAGIC_BYTES_SIZE: usize = 8;

// Size of header size
pub(crate) const HEADER_SIZE_SIZE: usize = 4;

// // Offset of header size
// pub(crate) const HEADER_SIZE_OFFSET: usize = MAGIC_BYTES_SIZE + HEADER_SIZE_SIZE;
107 changes: 107 additions & 0 deletions src/rust/src/http_reader/mock_http_range_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
use crate::HttpFcbReader;
use anyhow::Result;
use bytes::Bytes;
use http_range_client::AsyncHttpRangeClient;
use std::fs::File;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::ops::Range;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use tracing::trace;

impl HttpFcbReader<MockHttpRangeClient> {
/// NOTE: For debugging expediency, this test class often prefers panics over returning a result.
pub async fn mock_from_file(
path: &str,
) -> Result<(
HttpFcbReader<MockHttpRangeClient>,
Arc<RwLock<RequestStats>>,
)> {
trace!("starting: opening http reader, reading header");

let stats = Arc::new(RwLock::new(RequestStats::new()));
let http_client = MockHttpRangeClient::new(path, stats.clone());
let client = http_range_client::AsyncBufferedHttpRangeClient::with(http_client, path);
Ok((Self::_open(client).await?, stats))
}
}

/// NOTE: For debugging expediency, this test class often prefers panics over returning a result.
pub(crate) struct MockHttpRangeClient {
path: PathBuf,
stats: Arc<RwLock<RequestStats>>,
}

pub(crate) struct RequestStats {
pub request_count: u64,
pub bytes_requested: u64,
}

impl RequestStats {
fn new() -> Self {
Self {
request_count: 0,
bytes_requested: 0,
}
}
}

#[async_trait::async_trait]
impl AsyncHttpRangeClient for MockHttpRangeClient {
async fn get_range(&self, url: &str, range: &str) -> http_range_client::Result<Bytes> {
assert_eq!(url, self.path.to_str().unwrap());

/// This is a hack, but we need the start and length of the range
/// since all we're given is the pre-formatted range string, we
/// need to parse it into its components
///
/// For expediency, this test code panics rather than returns a result.
fn parse_range_header(range: &str) -> Range<u64> {
let bytes = range.strip_prefix("bytes=").unwrap();
let parts: Vec<&str> = bytes.split('-').collect();
assert!(parts.len() == 2);
let start = parts[0].parse().expect("should have valid start range");
let end: u64 = parts[1].parse().expect("should have valid end range");
// Range headers are *inclusive*
start..(end + 1)
}

let range = parse_range_header(range);
let request_length = range.end - range.start;

let mut stats = self
.stats
.write()
.expect("test code does not handle actual concurrency");

stats.request_count += 1;
stats.bytes_requested += request_length;

let mut file_reader = BufReader::new(File::open(&self.path).unwrap());
file_reader
.seek(SeekFrom::Start(range.start))
.expect("unable to seek test reader");
let mut output = vec![0; request_length as usize];
file_reader
.read_exact(&mut output)
.expect("failed to read from test reader");
Ok(Bytes::from(output))
}

async fn head_response_header(
&self,
_url: &str,
_header: &str,
) -> http_range_client::Result<Option<String>> {
unimplemented!()
}
}

impl MockHttpRangeClient {
fn new(path: &str, stats: Arc<RwLock<RequestStats>>) -> Self {
Self {
path: path.into(),
stats,
}
}
}
Loading

0 comments on commit 73734b9

Please sign in to comment.