From c9365350dd83116cd80b6ba86ee0a1c7d22fecd1 Mon Sep 17 00:00:00 2001 From: Sophie Tauchert Date: Thu, 14 Jul 2022 12:48:26 +0200 Subject: [PATCH] Implement async reader --- Cargo.toml | 17 + src/lib.rs | 2 + src/reader.rs | 4 + src/reader/async_reader.rs | 698 +++++++++++++++++++++++++++++++++++++ 4 files changed, 721 insertions(+) create mode 100644 src/reader/async_reader.rs diff --git a/Cargo.toml b/Cargo.toml index e13202e2..c88d0aaa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,8 @@ document-features = { version = "0.2", optional = true } encoding_rs = { version = "0.8", optional = true } serde = { version = "1.0", optional = true } memchr = "2.5" +tokio = { version = "1.19", optional = true, default-features = false, features = ["io-util"] } +async-recursion = { version = "1.0", optional = true } [dev-dependencies] criterion = "0.3" @@ -23,6 +25,8 @@ pretty_assertions = "1.2" regex = "1" serde = { version = "1.0", features = ["derive"] } serde-value = "0.7" +tokio = { version = "1.20", default-features = false, features = ["macros", "rt-multi-thread"] } +tokio-test = "0.4" [lib] bench = false @@ -101,6 +105,19 @@ serialize = ["serde"] ## Enables support for recognizing all [HTML 5 entities](https://dev.w3.org/html5/html-author/charref) escape-html = [] +## Enables support for asynchronous reading from `tokio`'s IO-Traits. +## +## This can be used for example with `Reader::from_async_reader(read)` where `read` +## is some type implementing `tokio::io::AsyncBufRead`. +async = ["tokio", "async-recursion"] + +## Enables support for asynchronous reading from files using `tokio`. This feature +## also automatically enables the `async` feature as well. +## +## This can be used for example with `Reader::from_file_async(path)` where `path` +## is a file path. 
+async-fs = ["async", "tokio/fs"] + [package.metadata.docs.rs] all-features = true diff --git a/src/lib.rs b/src/lib.rs index 26436786..be8903c0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -158,5 +158,7 @@ mod writer; #[cfg(feature = "serialize")] pub use crate::errors::serialize::DeError; pub use crate::errors::{Error, Result}; +#[cfg(feature = "async")] +pub use crate::reader::AsyncReader; pub use crate::reader::{Decoder, IoReader, Reader, SliceReader}; pub use crate::writer::{ElementWriter, Writer}; diff --git a/src/reader.rs b/src/reader.rs index baf68b08..ce14a08d 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -13,9 +13,13 @@ use crate::name::{LocalName, NamespaceResolver, QName, ResolveResult}; use memchr; +#[cfg(feature = "async")] +mod async_reader; mod io_reader; mod slice_reader; +#[cfg(feature = "async")] +pub use self::async_reader::AsyncReader; pub use self::io_reader::IoReader; pub use self::slice_reader::SliceReader; diff --git a/src/reader/async_reader.rs b/src/reader/async_reader.rs new file mode 100644 index 00000000..30ea2580 --- /dev/null +++ b/src/reader/async_reader.rs @@ -0,0 +1,698 @@ +//! This is an implementation of [`Reader`] for reading from an [`AsyncRead`] or [`AsyncBufRead`] +//! as underlying byte stream. This reader fully implements async/await so reading can use +//! non-blocking I/O. + +use std::ops::{Deref, DerefMut}; +use std::path::Path; + +use async_recursion::async_recursion; +#[cfg(feature = "async-fs")] +use tokio::fs::File; +use tokio::io::{self, AsyncBufRead, AsyncBufReadExt, AsyncRead, BufReader}; + +use crate::events::{BytesText, Event}; +use crate::name::{QName, ResolveResult}; +use crate::{Error, Result}; + +#[cfg(feature = "encoding")] +use super::{detect_encoding, EncodingRef}; +use super::{is_whitespace, BangType, InnerReader, ReadElementState, Reader, TagState}; + +/// A struct for handling reading functions based on reading from an [`AsyncBufRead`]. 
+#[derive(Debug, Clone)] +pub struct AsyncReader(R); + +impl Deref for AsyncReader { + type Target = R; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for AsyncReader { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl InnerReader for AsyncReader { + type Reader = R; + + fn into_inner(self) -> Self::Reader { + self.0 + } +} + +/// Private reading functions. +impl AsyncReader { + #[inline] + async fn read_bytes_until<'buf>( + &mut self, + byte: u8, + buf: &'buf mut Vec, + position: &mut usize, + ) -> Result> { + let mut read = 0; + let mut done = false; + let start = buf.len(); + while !done { + let used = { + let available = match self.fill_buf().await { + Ok(n) if n.is_empty() => break, + Ok(n) => n, + Err(e) if e.kind() == io::ErrorKind::Interrupted => continue, + Err(e) => { + *position += read; + return Err(Error::Io(e)); + } + }; + + match memchr::memchr(byte, available) { + Some(i) => { + buf.extend_from_slice(&available[..i]); + done = true; + i + 1 + } + None => { + buf.extend_from_slice(available); + available.len() + } + } + }; + self.consume(used); + read += used; + } + *position += read; + + if read == 0 { + Ok(None) + } else { + Ok(Some(&buf[start..])) + } + } + + async fn read_bang_element<'buf>( + &mut self, + buf: &'buf mut Vec, + position: &mut usize, + ) -> Result> { + // Peeked one bang ('!') before being called, so it's guaranteed to + // start with it. 
+ let start = buf.len(); + let mut read = 1; + buf.push(b'!'); + self.consume(1); + + let bang_type = BangType::new(self.peek_one().await?)?; + + loop { + match self.fill_buf().await { + // Note: Do not update position, so the error points to + // somewhere sane rather than at the EOF + Ok(n) if n.is_empty() => return Err(bang_type.to_err()), + Ok(available) => { + if let Some((consumed, used)) = bang_type.parse(available, read) { + buf.extend_from_slice(consumed); + + self.consume(used); + read += used; + + *position += read; + break; + } else { + buf.extend_from_slice(available); + + let used = available.len(); + self.consume(used); + read += used; + } + } + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue, + Err(e) => { + *position += read; + return Err(Error::Io(e)); + } + } + } + + if read == 0 { + Ok(None) + } else { + Ok(Some((bang_type, &buf[start..]))) + } + } + + #[inline] + async fn read_element<'buf>( + &mut self, + buf: &'buf mut Vec, + position: &mut usize, + ) -> Result> { + let mut state = ReadElementState::Elem; + let mut read = 0; + + let start = buf.len(); + loop { + match self.fill_buf().await { + Ok(n) if n.is_empty() => break, + Ok(available) => { + if let Some((consumed, used)) = state.change(available) { + buf.extend_from_slice(consumed); + + self.consume(used); + read += used; + + *position += read; + break; + } else { + buf.extend_from_slice(available); + + let used = available.len(); + self.consume(used); + read += used; + } + } + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue, + Err(e) => { + *position += read; + return Err(Error::Io(e)); + } + }; + } + + if read == 0 { + Ok(None) + } else { + Ok(Some(&buf[start..])) + } + } + + /// Consume and discard all the whitespace until the next non-whitespace + /// character or EOF. 
+ async fn skip_whitespace(&mut self, position: &mut usize) -> Result<()> { + loop { + break match self.fill_buf().await { + Ok(n) => { + let count = n.iter().position(|b| !is_whitespace(*b)).unwrap_or(n.len()); + if count > 0 { + self.consume(count); + *position += count; + continue; + } else { + Ok(()) + } + } + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue, + Err(e) => Err(Error::Io(e)), + }; + } + } + + /// Consume and discard one character if it matches the given byte. Return + /// true if it matched. + async fn skip_one(&mut self, byte: u8, position: &mut usize) -> Result { + match self.peek_one().await? { + Some(b) if b == byte => { + *position += 1; + self.consume(1); + Ok(true) + } + _ => Ok(false), + } + } + + /// Return one character without consuming it, so that future `read_*` calls + /// will still include it. On EOF, return None. + async fn peek_one(&mut self) -> Result> { + loop { + break match self.fill_buf().await { + Ok(n) if n.is_empty() => Ok(None), + Ok(n) => Ok(Some(n[0])), + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue, + Err(e) => Err(Error::Io(e)), + }; + } + } +} + +/// Private functions for a [`Reader`] based on an [`AsyncReader`]. +impl Reader> { + /// Read text into the given buffer, and return an event that borrows from + /// either that buffer or from the input itself, based on the type of the + /// reader. + #[async_recursion] + async fn read_event_impl<'buf>(&mut self, buf: &'buf mut Vec) -> Result> { + let event = match self.tag_state { + TagState::Init => self.read_until_open(buf, true).await, + TagState::Closed => self.read_until_open(buf, false).await, + TagState::Opened => self.read_until_close(buf).await, + TagState::Empty => self.close_expanded_empty(), + TagState::Exit => return Ok(Event::Eof), + }; + match event { + Err(_) | Ok(Event::Eof) => self.tag_state = TagState::Exit, + _ => {} + } + event + } + + /// Read until '<' is found and moves reader to an `Opened` state. 
+ /// + /// Return a `StartText` event if `first` is `true` and a `Text` event otherwise + async fn read_until_open<'buf>( + &mut self, + buf: &'buf mut Vec, + first: bool, + ) -> Result> { + self.tag_state = TagState::Opened; + + if self.trim_text_start { + self.reader.skip_whitespace(&mut self.buf_position).await?; + } + + // If we already at the `<` symbol, do not try to return an empty Text event + if self.reader.skip_one(b'<', &mut self.buf_position).await? { + return self.read_event_impl(buf).await; + } + + match self + .reader + .read_bytes_until(b'<', buf, &mut self.buf_position) + .await + { + Ok(Some(bytes)) => { + #[cfg(feature = "encoding")] + if first && self.encoding.can_be_refined() { + if let Some(encoding) = detect_encoding(bytes) { + self.encoding = EncodingRef::BomDetected(encoding); + } + } + + let content = if self.trim_text_end { + // Skip the ending '< + let len = bytes + .iter() + .rposition(|&b| !is_whitespace(b)) + .map_or_else(|| bytes.len(), |p| p + 1); + &bytes[..len] + } else { + bytes + }; + + Ok(if first { + Event::StartText(BytesText::from_escaped(content).into()) + } else { + Event::Text(BytesText::from_escaped(content)) + }) + } + Ok(None) => Ok(Event::Eof), + Err(e) => Err(e), + } + } + + /// Private function to read until `>` is found. This function expects that + /// it was called just after encounter a `<` symbol. 
+ async fn read_until_close<'buf>(&mut self, buf: &'buf mut Vec) -> Result> { + self.tag_state = TagState::Closed; + + match self.reader.peek_one().await { + // ` match self + .reader + .read_bang_element(buf, &mut self.buf_position) + .await + { + Ok(None) => Ok(Event::Eof), + Ok(Some((bang_type, bytes))) => self.read_bang(bang_type, bytes), + Err(e) => Err(e), + }, + // ` match self + .reader + .read_bytes_until(b'>', buf, &mut self.buf_position) + .await + { + Ok(None) => Ok(Event::Eof), + Ok(Some(bytes)) => self.read_end(bytes), + Err(e) => Err(e), + }, + // ` match self + .reader + .read_bytes_until(b'>', buf, &mut self.buf_position) + .await + { + Ok(None) => Ok(Event::Eof), + Ok(Some(bytes)) => self.read_question_mark(bytes), + Err(e) => Err(e), + }, + // `<...` - opening or self-closed tag + Ok(Some(_)) => match self.reader.read_element(buf, &mut self.buf_position).await { + Ok(None) => Ok(Event::Eof), + Ok(Some(bytes)) => self.read_start(bytes), + Err(e) => Err(e), + }, + Ok(None) => Ok(Event::Eof), + Err(e) => Err(e), + } + } +} + +/// Builder for reading from a file. Gated behind the `async-fs` feature. +#[cfg(feature = "async-fs")] +impl Reader>> { + /// Creates an XML reader from a file path. + pub async fn from_file_async>(path: P) -> Result { + let file = File::open(path).await.map_err(Error::Io)?; + let reader = BufReader::new(file); + Ok(Self::from_reader_internal(AsyncReader(reader))) + } +} + +/// Builder for reading from any [`BufRead`]. +impl Reader> { + /// Creates an XML reader from any type implementing [`Read`]. + pub fn from_async_reader(reader: R) -> Self { + Self::from_reader_internal(AsyncReader(reader)) + } +} + +/// Builder for reading from any [`Read`]. +impl Reader>> { + /// Creates an XML reader from any type implementing [`Read`]. 
+ pub fn from_async_unbuffered_reader(reader: R) -> Self { + Self::from_reader_internal(AsyncReader(BufReader::new(reader))) + } +} + +/// Public reading methods for a [`Reader`] based on an [`AsyncReader`]. +impl Reader> { + /// Reads the next `Event` asynchronously. + /// + /// This is the main entry point for reading XML `Event`s when using an async reader. + /// + /// `Event`s borrow `buf` and can be converted to own their data if needed (uses `Cow` + /// internally). + /// + /// Having the possibility to control the internal buffers gives you some additional benefits + /// such as: + /// + /// - Reduce the number of allocations by reusing the same buffer. For constrained systems, + /// you can call `buf.clear()` once you are done with processing the event (typically at the + /// end of your loop). + /// - Reserve the buffer length if you know the file size (using `Vec::with_capacity`). + /// + /// # Examples + /// + /// ``` + /// # tokio_test::block_on(async move { + /// use quick_xml::Reader; + /// use quick_xml::events::Event; + /// + /// let xml = r#" + /// Test + /// Test 2 + /// "#; + /// // This explicitly uses `from_reader(xml.as_bytes())` to use a buffered reader instead of + /// // relying on the zero-copy optimizations for reading from byte slices. 
+ /// let mut reader = Reader::from_async_reader(xml.as_bytes()); + /// reader.trim_text(true); + /// let mut count = 0; + /// let mut buf = Vec::new(); + /// let mut txt = Vec::new(); + /// loop { + /// match reader.read_event_into_async(&mut buf).await { + /// Ok(Event::Start(_)) => count += 1, + /// Ok(Event::Text(e)) => txt.push(e.decode_and_unescape(&reader).unwrap().into_owned()), + /// Err(e) => panic!("Error at position {}: {:?}", reader.buffer_position(), e), + /// Ok(Event::Eof) => break, + /// _ => (), + /// } + /// buf.clear(); + /// } + /// println!("Found {} start events", count); + /// println!("Text events: {:?}", txt); + /// # }); + /// ``` + #[inline] + pub async fn read_event_into_async<'buf>( + &mut self, + buf: &'buf mut Vec, + ) -> Result> { + self.read_event_impl(buf).await + } + + /// Reads asynchronously until end element is found using provided buffer as + /// intermediate storage for events content. This function is supposed to be + /// called after you already read a [`Start`] event. + /// + /// Manages nested cases where parent and child elements have the same name. + /// + /// If corresponding [`End`] event will not be found, the [`Error::UnexpectedEof`] + /// will be returned. In particularly, that error will be returned if you call + /// this method without consuming the corresponding [`Start`] event first. + /// + /// If your reader created from a string slice or byte array slice, it is + /// better to use [`read_to_end()`] method, because it will not copy bytes + /// into intermediate buffer. + /// + /// The provided `buf` buffer will be filled only by one event content at time. + /// Before reading of each event the buffer will be cleared. If you know an + /// appropriate size of each event, you can preallocate the buffer to reduce + /// number of reallocations. + /// + /// The `end` parameter should contain name of the end element _in the reader + /// encoding_. 
It is good practice to always get that parameter using + /// the [`BytesStart::to_end()`] method. + /// + /// The correctness of the skipped events is not checked if you disabled + /// the [`check_end_names`] option. + /// + /// # Namespaces + /// + /// While the [`Reader`] does not support namespace resolution, namespaces + /// do not change the algorithm for comparing names. Although the names + /// `a:name` and `b:name`, where both prefixes `a` and `b` resolve to the + /// same namespace, are semantically equivalent, `</b:name>` cannot close + /// `<a:name>`, because according to [the specification] + /// + /// > The end of every element that begins with a **start-tag** MUST be marked + /// > by an **end-tag** containing a name that echoes the element's type as + /// > given in the **start-tag** + /// + /// # Examples + /// + /// This example shows how you can skip XML content after you read the + /// start event. + /// + /// ``` + /// # tokio_test::block_on(async move { + /// # use pretty_assertions::assert_eq; + /// use quick_xml::events::{BytesStart, Event}; + /// use quick_xml::Reader; + /// + /// let mut reader = Reader::from_async_reader(r#" + /// <outer> + /// <inner> + /// <inner></inner> + /// <inner/> + /// <outer></outer> + /// <outer/> + /// </inner> + /// </outer> + /// "#.as_bytes()); + /// reader.trim_text(true); + /// let mut buf = Vec::new(); + /// + /// let start = BytesStart::borrowed_name(b"outer"); + /// let end = start.to_end().into_owned(); + /// + /// // First, we read a start event... + /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Start(start)); + /// + /// // ...then, we can skip all events up to the corresponding end event. + /// // This call will correctly handle nested elements. + /// // Note, however, that this method does not handle namespaces. 
+ /// reader.read_to_end_into_async(end.name(), &mut buf).await.unwrap(); + /// + /// // At the end we should get an Eof event, because we ate the whole XML + /// assert_eq!(reader.read_event_into_async(&mut buf).await.unwrap(), Event::Eof); + /// # }); + /// ``` + /// + /// [`Start`]: Event::Start + /// [`End`]: Event::End + /// [`read_to_end()`]: Self::read_to_end + /// [`check_end_names`]: Self::check_end_names + /// [the specification]: https://www.w3.org/TR/xml11/#dt-etag + pub async fn read_to_end_into_async<'_self, 'buf, 'name>( + &'_self mut self, + end: QName<'name>, + buf: &'buf mut Vec, + ) -> Result<()> { + let mut depth = 0; + loop { + buf.clear(); + match self.read_event_into_async(buf).await { + Err(e) => return Err(e), + + Ok(Event::Start(e)) if e.name() == end => depth += 1, + Ok(Event::End(e)) if e.name() == end => { + if depth == 0 { + return Ok(()); + } + depth -= 1; + } + Ok(Event::Eof) => { + let name = self.decoder().decode(end.as_ref()); + return Err(Error::UnexpectedEof(format!("", name))); + } + _ => (), + } + } + } + + /// Reads optional text between start and end tags asnychronously. + /// + /// If the next event is a [`Text`] event, returns the decoded and unescaped content as a + /// `String`. If the next event is an [`End`] event, returns the empty string. In all other + /// cases, returns an error. + /// + /// Any text will be decoded using the XML encoding specified in the XML declaration (or UTF-8 + /// if none is specified). 
+ /// + /// # Examples + /// + /// ``` + /// # tokio_test::block_on(async move { + /// # use pretty_assertions::assert_eq; + /// use quick_xml::Reader; + /// use quick_xml::events::Event; + /// + /// let mut xml = Reader::from_async_reader(b" + /// <b> + /// + /// " as &[u8]); + /// xml.trim_text(true); + /// let mut buf = Vec::new(); + /// + /// let expected = ["", ""]; + /// for &content in expected.iter() { + /// match xml.read_event_into_async(&mut buf).await { + /// Ok(Event::Start(ref e)) => { + /// assert_eq!(&xml.read_text_into_async(e.name(), &mut Vec::new()).await.unwrap(), content); + /// }, + /// e => panic!("Expecting Start event, found {:?}", e), + /// } + /// buf.clear(); + /// } + /// # }); + /// ``` + /// + /// [`Text`]: Event::Text + /// [`End`]: Event::End + pub async fn read_text_into_async<'_self, 'name, 'buf>( + &'_self mut self, + end: QName<'name>, + buf: &'buf mut Vec, + ) -> Result { + let s = match self.read_event_into_async(buf).await { + Err(e) => return Err(e), + + Ok(Event::Text(e)) => e.decode_and_unescape(self)?.into_owned(), + Ok(Event::End(e)) if e.name() == end => return Ok("".to_string()), + Ok(Event::Eof) => return Err(Error::UnexpectedEof("Text".to_string())), + _ => return Err(Error::TextNotFound), + }; + self.read_to_end_into_async(end, buf).await?; + Ok(s) + } + + /// Reads the next event and resolves its namespace (if applicable) asynchronously. 
+ /// + /// # Examples + /// + /// ``` + /// # tokio_test::block_on(async move { + /// use std::str::from_utf8; + /// use quick_xml::Reader; + /// use quick_xml::events::Event; + /// use quick_xml::name::ResolveResult::*; + /// + /// let xml = r#" + /// Test + /// Test 2 + /// "#; + /// let mut reader = Reader::from_async_reader(xml.as_bytes()); + /// reader.trim_text(true); + /// let mut count = 0; + /// let mut buf = Vec::new(); + /// let mut ns_buf = Vec::new(); + /// let mut txt = Vec::new(); + /// loop { + /// match reader.read_namespaced_event_async(&mut buf, &mut ns_buf).await { + /// Ok((Bound(ns), Event::Start(e))) => { + /// count += 1; + /// match (ns.as_ref(), e.local_name().as_ref()) { + /// (b"www.xxxx", b"tag1") => (), + /// (b"www.yyyy", b"tag2") => (), + /// (ns, n) => panic!("Namespace and local name mismatch"), + /// } + /// println!("Resolved namespace: {:?}", ns); + /// } + /// Ok((Unbound, Event::Start(_))) => { + /// panic!("Element not in any namespace") + /// }, + /// Ok((Unknown(p), Event::Start(_))) => { + /// panic!("Undeclared namespace prefix {:?}", String::from_utf8(p)) + /// } + /// Ok((_, Event::Text(e))) => { + /// txt.push(e.decode_and_unescape(&reader).unwrap().into_owned()) + /// }, + /// Err(e) => panic!("Error at position {}: {:?}", reader.buffer_position(), e), + /// Ok((_, Event::Eof)) => break, + /// _ => (), + /// } + /// buf.clear(); + /// } + /// println!("Found {} start events", count); + /// println!("Text events: {:?}", txt); + /// # }); + /// ``` + pub async fn read_namespaced_event_async<'b, 'ns>( + &mut self, + buf: &'b mut Vec, + namespace_buffer: &'ns mut Vec, + ) -> Result<(ResolveResult<'ns>, Event<'b>)> { + if self.pending_pop { + self.ns_resolver.pop(namespace_buffer); + } + self.pending_pop = false; + let event = self.read_event_into_async(buf).await; + self.resolve_namespaced_event_inner(event, namespace_buffer) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::reader::test::check; + + fn 
input_from_bytes(bytes: &[u8]) -> AsyncReader<&[u8]> { + AsyncReader(bytes) + } + + fn reader_from_str(s: &str) -> Reader> { + Reader::from_async_reader(s.as_bytes()) + } + + #[allow(dead_code)] + fn reader_from_bytes(s: &[u8]) -> Reader> { + Reader::from_async_reader(s) + } + + check!(#[tokio::test] async { + let mut buf = Vec::new(); await + }); +}