Commit 49c16cd
feat: multithread: made necessary modules public for use in multithreaded envs, implemented ChunkDecoder and provided example

feefladder committed Sep 18, 2024
1 parent feefadd commit 49c16cd
Showing 6 changed files with 318 additions and 32 deletions.
11 changes: 9 additions & 2 deletions Cargo.toml
@@ -18,7 +18,9 @@ exclude = ["tests/images/*", "tests/fuzz_images/*"]

[features]
async_decoder = ["dep:futures", "dep:async-trait"]
ehttp = ["dep:ehttp", "async_decoder"]
multithread = ["async_decoder"]
# only for async example reading a COG
ehttp = ["async_decoder", "dep:ehttp"]

[dependencies]
weezl = "0.1.0"
@@ -44,4 +46,9 @@ description = "Example showing use of async features using async http requests"
[[example]]
name = "async_http"
path="examples/async_http.rs"
-required-features=["ehttp"]
+required-features=["ehttp"]
+
+[package.metadata.example.multithread]
+name = "multithread_http"
+path="examples/multithread_http.rs"
+required-features=["ehttp", "async_decoder"]
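With this wiring, the multithread feature enables async_decoder, and the new example additionally needs ehttp; it would presumably be run with something like cargo run --example multithread_http --features ehttp,async_decoder. A minimal sketch (not part of this commit, names assumed) of how a downstream crate might gate its use of the newly exported API on a matching feature:

// Sketch (assumption): a downstream crate forwards its own "multithread" feature
// to tiff/multithread and only pulls in the cloneable decoder when it is enabled.
#[cfg(feature = "multithread")]
use tiff::decoder::ChunkDecoder;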
194 changes: 194 additions & 0 deletions examples/multithread_http.rs
@@ -0,0 +1,194 @@
// Special thanks to Alice for the help: https://users.rust-lang.org/t/63019/6
use std::io::{Result, SeekFrom};
use std::pin::Pin;
use std::sync::Arc;
use futures::{
future::BoxFuture,
io::{AsyncRead, AsyncSeek},
Future,
};
use tiff::decoder::Decoder;

// extern crate ehttp;

// Arc for sharing, see https://users.rust-lang.org/t/how-to-clone-a-boxed-closure/31035/9
// or https://stackoverflow.com/a/27883612/14681457
pub type F = Arc<
dyn Fn(u64, u64) -> BoxFuture<'static, std::io::Result<SeekOutput>> + Send + Sync,
>;
pub struct RangedStreamer {
pos: u64,
length: u64, // total size
state: State,
range_get: F,
min_request_size: usize, // requests have at least this size
}

/// This is a fake clone, that doesn't clone the currently pending task, but everything else
impl Clone for RangedStreamer {
fn clone(&self) -> Self {
RangedStreamer {
pos: self.pos,
length: self.length,
state: State::HasChunk(SeekOutput {
start: 0,
data: vec![],
}),
range_get: self.range_get.clone(),
min_request_size: self.min_request_size,
}
}
}

enum State {
HasChunk(SeekOutput),
Seeking(BoxFuture<'static, std::io::Result<SeekOutput>>),
}

#[derive(Debug, Clone)]
pub struct SeekOutput {
pub start: u64,
pub data: Vec<u8>,
}



impl RangedStreamer {
pub fn new(length: usize, min_request_size: usize, range_get: F) -> Self {
let length = length as u64;
Self {
pos: 0,
length,
state: State::HasChunk(SeekOutput {
start: 0,
data: vec![],
}),
range_get,
min_request_size,
}
}
}

// whether `test_interval` is inside `a` (start, length).
fn range_includes(a: (usize, usize), test_interval: (usize, usize)) -> bool {
if test_interval.0 < a.0 {
return false;
}
let test_end = test_interval.0 + test_interval.1;
let a_end = a.0 + a.1;
if test_end > a_end {
return false;
}
true
}

impl AsyncRead for RangedStreamer {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut [u8],
) -> std::task::Poll<Result<usize>> {
let requested_range = (self.pos as usize, buf.len());
let min_request_size = self.min_request_size;
match &mut self.state {
State::HasChunk(output) => {
let existing_range = (output.start as usize, output.data.len());
if range_includes(existing_range, requested_range) {
let offset = requested_range.0 - existing_range.0;
buf.copy_from_slice(&output.data[offset..offset + buf.len()]);
self.pos += buf.len() as u64;
std::task::Poll::Ready(Ok(buf.len()))
} else {
let start = requested_range.0 as u64;
let length = std::cmp::max(min_request_size, requested_range.1);
let future = (self.range_get)(start, length.try_into().unwrap());
self.state = State::Seeking(Box::pin(future));
self.poll_read(cx, buf)
}
}
State::Seeking(ref mut future) => match Pin::new(future).poll(cx) {
std::task::Poll::Ready(v) => {
match v {
Ok(output) => self.state = State::HasChunk(output),
Err(e) => return std::task::Poll::Ready(Err(e)),
};
self.poll_read(cx, buf)
}
std::task::Poll::Pending => std::task::Poll::Pending,
},
}
}
}

impl AsyncSeek for RangedStreamer {
fn poll_seek(
mut self: std::pin::Pin<&mut Self>,
_: &mut std::task::Context<'_>,
pos: std::io::SeekFrom,
) -> std::task::Poll<Result<u64>> {
match pos {
SeekFrom::Start(pos) => self.pos = pos,
SeekFrom::End(pos) => self.pos = (self.length as i64 + pos) as u64,
SeekFrom::Current(pos) => self.pos = (self.pos as i64 + pos) as u64,
};
std::task::Poll::Ready(Ok(self.pos))
}
}



#[tokio::main]
async fn main() {
let url = "https://isdasoil.s3.amazonaws.com/covariates/dem_30m/dem_30m.tif";
let Ok(url_head) = ehttp::fetch_async(ehttp::Request::head(url)).await else {println!("EPIC FAIL!"); return;};
let length = usize::from_str_radix(url_head.headers.get("content-length").unwrap(), 10).expect("Could not parse content length");
println!("head: {:?}", url_head);
let range_get = Arc::new(move |start: u64, length: u64| {
// let bucket = bucket.clone();
let url = url;
Box::pin(async move {
println!("requested: {} kb", length / 1024);
let mut request = ehttp::Request::get(url);
request.headers.insert("Range".to_string(), format!("bytes={:?}-{:?}",start,start+length));
let resp = ehttp::fetch_async(request).await.map_err(|e| std::io::Error::other(e))?;
if !resp.ok {
Err(std::io::Error::other(format!("Received invalid response: {:?}", resp.status)))
} else {
println!("received: {} kb", resp.bytes.len() / 1024);
Ok(SeekOutput {start, data: resp.bytes})
}
}) as BoxFuture<'static, std::io::Result<SeekOutput>>
});
let reader = RangedStreamer::new(length, 30*1024, range_get);

// this decoder will read all necessary tags
let decoder = Decoder::new_overview_async(reader, 0).await.expect("oh noes!");
println!("initialized decoder");
let cloneable_decoder = tiff::decoder::ChunkDecoder::from_decoder(decoder);

let mut handles = Vec::new();
for chunk in 42..69 {
let mut cloned_decoder = cloneable_decoder.clone();

let handle = tokio::spawn(async move {

let result = cloned_decoder.read_chunk_async(chunk).await;
match result {
Ok(data) => {
println!("Successfully read chunk {}", chunk);
Ok(data) // Return the data for collection
}
Err(e) => {
eprintln!("Error reading chunk {}: {:?}", chunk, e);
Err(e) // Return the error for handling
}
}
});
handles.push(handle);
}

let results = futures::future::join_all(handles).await;
for r in results {
println!("result: {:?}", r.expect("idk").expect("idk²").len())
}
}
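The range_get closure is the only part of RangedStreamer tied to HTTP. As a minimal sketch (not part of the commit), the same streamer could be backed by a local file for offline testing; the helper below is hypothetical, reuses the F and SeekOutput types and the imports defined above, and does blocking file I/O, which is acceptable for a test:

// Hypothetical helper (sketch only): serve byte ranges from a local file
// instead of HTTP, reusing the F and SeekOutput types defined above.
fn file_range_get(path: std::path::PathBuf) -> F {
    Arc::new(move |start: u64, length: u64| {
        let path = path.clone();
        Box::pin(async move {
            use std::io::{Read, Seek, SeekFrom};
            let mut file = std::fs::File::open(&path)?;
            file.seek(SeekFrom::Start(start))?;
            // Read up to `length` bytes starting at `start`; a short read near EOF is fine here.
            let mut data = vec![0u8; length as usize];
            let n = file.read(&mut data)?;
            data.truncate(n);
            Ok(SeekOutput { start, data })
        }) as BoxFuture<'static, std::io::Result<SeekOutput>>
    })
}

Note that the Clone impl above resets the buffered chunk, so every cloned RangedStreamer (and therefore every spawned task) re-fetches the ranges it needs; min_request_size (30 KiB in this example) is what keeps the number of tiny requests down.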
4 changes: 2 additions & 2 deletions src/decoder/async_decoder/mod.rs
@@ -453,7 +453,7 @@ impl<R: AsyncRead + AsyncSeek + RangeReader + Unpin + Send> Decoder<R> {
pub async fn read_chunk_async(&mut self, chunk_index: u32) -> TiffResult<DecodingResult> {
let data_dims = self.image().chunk_data_dimensions(chunk_index)?;

-let mut result = self.result_buffer(data_dims.0 as usize, data_dims.1 as usize)?;
+let mut result = Self::result_buffer(data_dims.0 as usize, data_dims.1 as usize, self.image(), &self.limits)?;

self.read_chunk_to_buffer_async(result.as_buffer(0), chunk_index, data_dims.0 as usize)
.await?;
@@ -465,7 +465,7 @@ impl<R: AsyncRead + AsyncSeek + RangeReader + Unpin + Send> Decoder<R> {
pub async fn read_image_async(&mut self) -> TiffResult<DecodingResult> {
let width = self.image().width;
let height = self.image().height;
-let mut result = self.result_buffer(width as usize, height as usize)?;
+let mut result = Self::result_buffer(usize::try_from(width)?, usize::try_from(height)?, self.image(), &self.limits)?;
if width == 0 || height == 0 {
return Ok(result);
}
14 changes: 6 additions & 8 deletions src/decoder/image.rs
@@ -10,14 +10,14 @@ use crate::{ColorType, TiffError, TiffFormatError, TiffResult, TiffUnsupportedEr
use std::io::{self, Cursor, Read, Seek};
use std::sync::Arc;

-#[derive(Debug)]
-pub(crate) struct StripDecodeState {
+#[derive(Debug, Clone)]
+pub struct StripDecodeState {
pub rows_per_strip: u32,
}

-#[derive(Debug)]
+#[derive(Debug, Clone)]
/// Computed values useful for tile decoding
-pub(crate) struct TileAttributes {
+pub struct TileAttributes {
pub image_width: usize,
pub image_height: usize,

@@ -58,8 +58,8 @@ impl TileAttributes {
}
}

-#[derive(Debug)]
-pub(crate) struct Image {
+#[derive(Debug, Clone)]
+pub struct Image {
pub ifd: Option<Directory>,
pub width: u32,
pub height: u32,
@@ -687,8 +687,6 @@ impl Image {
let row = &mut row[..data_row_bytes];
reader.read_exact(row)?;

println!("chunk={chunk_index}, index={i}");

// Skip horizontal padding
if chunk_row_bytes > data_row_bytes {
let len = u64::try_from(chunk_row_bytes - data_row_bytes)?;
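The derive and visibility changes above are what make the decoder state shareable: StripDecodeState, TileAttributes, and Image are now public and Clone, so parsed metadata can be duplicated for use across threads (the stray println was also removed). A minimal sketch of that idea (the helper is hypothetical, not part of the commit):

use tiff::decoder::image::Image;

// Hypothetical helper: give each worker its own copy of the parsed image
// metadata, which is possible now that Image derives Clone.
fn image_per_worker(image: &Image, workers: usize) -> Vec<Image> {
    (0..workers).map(|_| image.clone()).collect()
}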
59 changes: 39 additions & 20 deletions src/decoder/mod.rs
@@ -15,12 +15,16 @@ use crate::tags::{
use self::stream::{ByteOrder, EndianReader, SmartReader};

pub mod ifd;
-mod image;
-mod stream;
+pub mod image;
+pub mod stream;
mod tag_reader;

#[cfg(feature = "async_decoder")]
-mod async_decoder;
+pub mod async_decoder;
+#[cfg(feature = "multithread")]
+mod multithread_decoder;
+#[cfg(feature = "multithread")]
+pub use multithread_decoder::ChunkDecoder;

/// Result of a decoding process
#[derive(Debug)]
@@ -142,6 +146,21 @@ impl DecodingResult {
DecodingResult::I64(ref mut buf) => DecodingBuffer::I64(&mut buf[start..]),
}
}

pub fn len(&self) -> usize {
match self {
DecodingResult::U8(v) => v.len(),
DecodingResult::U16(v) => v.len(),
DecodingResult::U32(v) => v.len(),
DecodingResult::U64(v) => v.len(),
DecodingResult::F32(v) => v.len(),
DecodingResult::F64(v) => v.len(),
DecodingResult::I8(v) => v.len(),
DecodingResult::I16(v) => v.len(),
DecodingResult::I32(v) => v.len(),
DecodingResult::I64(v) => v.len(),
}
}
}

// A buffer for image decoding
@@ -435,7 +454,7 @@ impl<R> Decoder<R> {
self.image().colortype()
}

-fn image(&self) -> &Image {
+pub fn image(&self) -> &Image {
&self.image
}

@@ -491,38 +510,38 @@ impl<R> Decoder<R> {
}


-fn result_buffer(&self, width: usize, height: usize) -> TiffResult<DecodingResult> {
+fn result_buffer(width: usize, height: usize, image: &Image, limits: &Limits) -> TiffResult<DecodingResult> {
let buffer_size = match width
.checked_mul(height)
-.and_then(|x| x.checked_mul(self.image().samples_per_pixel()))
+.and_then(|x| x.checked_mul(image.samples_per_pixel()))
{
Some(s) => s,
None => return Err(TiffError::LimitsExceeded),
};

-let max_sample_bits = self.image().bits_per_sample;
-match self.image().sample_format {
+let max_sample_bits = image.bits_per_sample;
+match image.sample_format {
SampleFormat::Uint => match max_sample_bits {
-n if n <= 8 => DecodingResult::new_u8(buffer_size, &self.limits),
-n if n <= 16 => DecodingResult::new_u16(buffer_size, &self.limits),
-n if n <= 32 => DecodingResult::new_u32(buffer_size, &self.limits),
-n if n <= 64 => DecodingResult::new_u64(buffer_size, &self.limits),
+n if n <= 8 => DecodingResult::new_u8(buffer_size, &limits),
+n if n <= 16 => DecodingResult::new_u16(buffer_size, &limits),
+n if n <= 32 => DecodingResult::new_u32(buffer_size, &limits),
+n if n <= 64 => DecodingResult::new_u64(buffer_size, &limits),
n => Err(TiffError::UnsupportedError(
TiffUnsupportedError::UnsupportedBitsPerChannel(n),
)),
},
SampleFormat::IEEEFP => match max_sample_bits {
-32 => DecodingResult::new_f32(buffer_size, &self.limits),
-64 => DecodingResult::new_f64(buffer_size, &self.limits),
+32 => DecodingResult::new_f32(buffer_size, &limits),
+64 => DecodingResult::new_f64(buffer_size, &limits),
n => Err(TiffError::UnsupportedError(
TiffUnsupportedError::UnsupportedBitsPerChannel(n),
)),
},
SampleFormat::Int => match max_sample_bits {
-n if n <= 8 => DecodingResult::new_i8(buffer_size, &self.limits),
-n if n <= 16 => DecodingResult::new_i16(buffer_size, &self.limits),
-n if n <= 32 => DecodingResult::new_i32(buffer_size, &self.limits),
-n if n <= 64 => DecodingResult::new_i64(buffer_size, &self.limits),
+n if n <= 8 => DecodingResult::new_i8(buffer_size, &limits),
+n if n <= 16 => DecodingResult::new_i16(buffer_size, &limits),
+n if n <= 32 => DecodingResult::new_i32(buffer_size, &limits),
+n if n <= 64 => DecodingResult::new_i64(buffer_size, &limits),
n => Err(TiffError::UnsupportedError(
TiffUnsupportedError::UnsupportedBitsPerChannel(n),
)),
@@ -1036,7 +1055,7 @@ impl<R: Read + Seek> Decoder<R> {
pub fn read_chunk(&mut self, chunk_index: u32) -> TiffResult<DecodingResult> {
let data_dims = self.image().chunk_data_dimensions(chunk_index)?;

-let mut result = self.result_buffer(data_dims.0 as usize, data_dims.1 as usize)?;
+let mut result = Self::result_buffer(data_dims.0 as usize, data_dims.1 as usize, self.image(), &self.limits)?;

self.read_chunk_to_buffer(result.as_buffer(0), chunk_index, data_dims.0 as usize)?;

@@ -1047,7 +1066,7 @@ impl<R: Read + Seek> Decoder<R> {
pub fn read_image(&mut self) -> TiffResult<DecodingResult> {
let width = self.image().width;
let height = self.image().height;
-let mut result = self.result_buffer(width as usize, height as usize)?;
+let mut result = Self::result_buffer(width as usize, height as usize, self.image(), &self.limits)?;
if width == 0 || height == 0 {
return Ok(result);
}
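Two things change in this file besides making the submodules public: result_buffer becomes an associated function that takes the Image and Limits explicitly (presumably so a buffer can be allocated from shared state without borrowing a whole Decoder), and DecodingResult gains a len() method. A small sketch of the latter (the helper is hypothetical, not part of the commit):

use tiff::decoder::DecodingResult;

// Hypothetical helper: total decoded samples across per-chunk results, using the
// new variant-agnostic len().
fn total_samples(results: &[DecodingResult]) -> usize {
    results.iter().map(|r| r.len()).sum()
}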
Loading
