From 0570127e6aa5d0ec05a4f2702fa1db2d0025df2a Mon Sep 17 00:00:00 2001 From: __JM_Joy__ <918734043@qq.com> Date: Sun, 17 Nov 2019 16:44:54 +0800 Subject: [PATCH] Finish copy&paste sync code to async. --- .travis.yml | 8 +- Cargo.toml | 12 +-- README.md | 2 - src/client.rs | 151 ++++++++++++++++++++++++++++++++ src/lib.rs | 5 +- src/meta.rs | 116 +++++++++++++++++++++--- tests/async_client_post_test.rs | 49 +++++++++++ tests/async_client_test.rs | 45 ++++++++++ tests/common.rs | 10 +-- 9 files changed, 370 insertions(+), 28 deletions(-) create mode 100644 tests/async_client_post_test.rs create mode 100644 tests/async_client_test.rs diff --git a/.travis.yml b/.travis.yml index 7c2efea..0f92b06 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,4 +12,10 @@ before_install: - docker ps -a env: - - RUST_LOG=fastcgi_client \ No newline at end of file + - RUST_LOG=fastcgi_client + +script: + - cargo build --verbose + - cargo test --verbose + - cargo build --verbose --no-default-features + - cargo test --verbose --no-default-features diff --git a/Cargo.toml b/Cargo.toml index 67f9d82..5f0f15a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "fastcgi-client" -version = "0.3.0" +version = "0.4.0" authors = ["__JM_Joy__ <918734043@qq.com>"] edition = "2018" description = "Fastcgi client implemented for Rust." @@ -16,17 +16,19 @@ exclude = [ ] [dependencies] -byteorder = "1.3.1" -log = "0.4.6" +async-std = {version = "1.0.1", optional = true} bufstream = "0.1.4" +byteorder = "1.3.1" error-chain = "0.12.1" -async-std = {version = "0.99", optional = true} +log = "0.4.6" [dev-dependencies] +async-attributes = "1.1.1" env_logger = "0.6.1" [build-dependencies] toml = "0.5.1" [features] -async-await = ["async-std"] +default = ["async_std"] +async_std = ["async-std"] diff --git a/README.md b/README.md index e9b1d82..295fc8a 100644 --- a/README.md +++ b/README.md @@ -8,8 +8,6 @@ Fastcgi client implemented for Rust. -**Notice: This crate is not productive yet, please do not use in production.** - ## Example ```rust diff --git a/src/client.rs b/src/client.rs index d249037..c1901f1 100644 --- a/src/client.rs +++ b/src/client.rs @@ -8,6 +8,9 @@ use log::info; use std::collections::HashMap; use std::io::{self, Read, Write}; +#[cfg(feature = "async_std")] +use async_std::io::{prelude::*, Read as AsyncRead, Write as AsyncWrite}; + /// Client for handling communication between fastcgi server. pub struct Client { keep_alive: bool, @@ -161,3 +164,151 @@ impl Client { self.outputs.get_mut(&id).ok_or_else(|| ErrorKind::RequestIdNotFound(id).into()) } } + +#[cfg(feature = "async_std")] +/// Async client for handling communication between fastcgi server. +pub struct AsyncClient { + keep_alive: bool, + stream: Box, + outputs: OutputMap, +} + +#[cfg(feature = "async_std")] +impl AsyncClient { + /// Construct a `AsyncClient` Object with stream (such as `async_std::net::TcpStream` or `async_std::os::unix::net::UnixStream`, + /// with buffered read/write for stream. + pub fn new(stream: S, keep_alive: bool) -> Self { + Self { + keep_alive, + stream: Box::new(stream), + outputs: HashMap::new(), + } + } + + /// Send request and receive response from fastcgi server. + /// - `params` fastcgi params. + /// - `body` always the http post or put body. + /// + /// return the output of fastcgi stdout and stderr. + pub async fn do_request<'a>(&mut self, params: &Params<'a>, body: &mut (dyn AsyncRead + Unpin)) -> ClientResult<&mut Output> { + let id = RequestIdGenerator.generate(); + self.handle_request(id, params, body).await?; + self.handle_response(id).await?; + Ok(self.outputs.get_mut(&id).ok_or_else(|| ErrorKind::RequestIdNotFound(id))?) + } + + async fn handle_request<'a>(&mut self, id: u16, params: &Params<'a>, body: &mut (dyn AsyncRead + Unpin)) -> ClientResult<()> { + let write_stream = &mut self.stream; + + info!("[id = {}] Start handle request.", id); + + let begin_request_rec = BeginRequestRec::new(id, Role::Responder, self.keep_alive)?; + info!("[id = {}] Send to stream: {:?}.", id, &begin_request_rec); + begin_request_rec.async_write_to_stream(write_stream).await?; + + let param_pairs = ParamPairs::new(params); + info!("[id = {}] Params will be sent: {:?}.", id, ¶m_pairs); + + Header::async_write_to_stream_batches( + RequestType::Params, + id, + write_stream, + &mut ¶m_pairs.to_content()?[..], + Some(|header| { + info!("[id = {}] Send to stream for Params: {:?}.", id, &header); + header + }), + ) + .await?; + + Header::async_write_to_stream_batches( + RequestType::Params, + id, + write_stream, + &mut async_std::io::empty(), + Some(|header| { + info!("[id = {}] Send to stream for Params: {:?}.", id, &header); + header + }), + ) + .await?; + + Header::async_write_to_stream_batches( + RequestType::Stdin, + id, + write_stream, + body, + Some(|header| { + info!("[id = {}] Send to stream for Stdin: {:?}.", id, &header); + header + }), + ) + .await?; + + Header::async_write_to_stream_batches( + RequestType::Stdin, + id, + write_stream, + &mut async_std::io::empty(), + Some(|header| { + info!("[id = {}] Send to stream for Stdin: {:?}.", id, &header); + header + }), + ) + .await?; + + write_stream.flush().await?; + + Ok(()) + } + + async fn handle_response(&mut self, id: u16) -> ClientResult<()> { + self.init_output(id); + + let global_end_request_rec: Option; + + loop { + let read_stream = &mut self.stream; + let header = Header::new_from_async_stream(read_stream).await?; + info!("[id = {}] Receive from stream: {:?}.", id, &header); + + if header.request_id != id { + return Err(ErrorKind::ResponseNotFound(id).into()); + } + + match header.r#type { + RequestType::Stdout => { + let content = header.read_content_from_async_stream(read_stream).await?; + self.get_output_mut(id)?.set_stdout(content) + } + RequestType::Stderr => { + let content = header.read_content_from_async_stream(read_stream).await?; + self.get_output_mut(id)?.set_stderr(content) + } + RequestType::EndRequest => { + let end_request_rec = EndRequestRec::from_async_header(&header, read_stream).await?; + info!("[id = {}] Receive from stream: {:?}.", id, &end_request_rec); + global_end_request_rec = Some(end_request_rec); + break; + } + r#type => return Err(ErrorKind::UnknownRequestType(r#type).into()), + } + } + + match global_end_request_rec { + Some(end_request_rec) => end_request_rec + .end_request + .protocol_status + .convert_to_client_result(end_request_rec.end_request.app_status), + None => unreachable!(), + } + } + + fn init_output(&mut self, id: u16) { + self.outputs.insert(id, Default::default()); + } + + fn get_output_mut(&mut self, id: u16) -> ClientResult<&mut Output> { + self.outputs.get_mut(&id).ok_or_else(|| ErrorKind::RequestIdNotFound(id).into()) + } +} diff --git a/src/lib.rs b/src/lib.rs index 30bce21..8556d21 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,8 +55,11 @@ mod params; pub use crate::client::Client; pub use crate::error::*; -pub use crate::meta::{Address, Output}; +pub use crate::meta::Output; pub use crate::params::Params; +#[cfg(feature = "async_std")] +pub use crate::client::AsyncClient; + /// Version of this crate. pub const VERSION: &'static str = env!("LIB_BUILD_VERSION"); diff --git a/src/meta.rs b/src/meta.rs index aaa881b..5c9d2a7 100644 --- a/src/meta.rs +++ b/src/meta.rs @@ -10,6 +10,12 @@ use std::io::{self, Read, Write}; use std::mem::size_of; use std::ops::{Deref, DerefMut}; +#[cfg(feature = "async_std")] +use async_std::io::{ + prelude::{ReadExt as AsyncReadExt, *}, + Read as AsyncRead, Write as AsyncWrite, +}; + pub(crate) const VERSION_1: u8 = 1; pub(crate) const MAX_LENGTH: usize = 0xffff; pub(crate) const HEADER_LEN: usize = size_of::
(); @@ -100,6 +106,38 @@ impl Header { Ok(()) } + #[cfg(feature = "async_std")] + pub(crate) async fn async_write_to_stream_batches( + r#type: RequestType, + request_id: u16, + writer: &mut (dyn AsyncWrite + Unpin), + content: &mut (dyn AsyncRead + Unpin), + before_write: Option, + ) -> io::Result<()> + where + F: Fn(Header) -> Header, + { + let mut buf: [u8; MAX_LENGTH] = [0; MAX_LENGTH]; + let mut had_writen = false; + + loop { + let read = content.read(&mut buf).await?; + if had_writen && (read == 0 || read < MAX_LENGTH) { + break; + } + + let buf = &buf[..read]; + let mut header = Self::new(r#type.clone(), request_id, buf); + if let Some(ref f) = before_write { + header = f(header); + } + header.write_to_async_stream(writer, buf).await?; + + had_writen = true; + } + Ok(()) + } + fn new(r#type: RequestType, request_id: u16, content: &[u8]) -> Self { let content_length = min(content.len(), MAX_LENGTH) as u16; Self { @@ -128,6 +166,23 @@ impl Header { Ok(()) } + #[cfg(feature = "async_std")] + async fn write_to_async_stream(self, writer: &mut (dyn AsyncWrite + Unpin), content: &[u8]) -> io::Result<()> { + let mut buf: Vec = Vec::new(); + buf.push(self.version); + buf.push(self.r#type as u8); + buf.write_u16::(self.request_id)?; + buf.write_u16::(self.content_length)?; + buf.push(self.padding_length); + buf.push(self.reserved); + + writer.write_all(&buf).await?; + writer.write_all(content).await?; + writer.write_all(&vec![0; self.padding_length as usize]).await?; + + Ok(()) + } + pub(crate) fn new_from_stream(reader: &mut dyn Read) -> io::Result { let mut buf: [u8; HEADER_LEN] = [0; HEADER_LEN]; reader.read_exact(&mut buf)?; @@ -142,6 +197,21 @@ impl Header { }) } + #[cfg(feature = "async_std")] + pub(crate) async fn new_from_async_stream(reader: &mut (dyn AsyncRead + Unpin)) -> io::Result { + let mut buf: [u8; HEADER_LEN] = [0; HEADER_LEN]; + reader.read_exact(&mut buf).await?; + + Ok(Self { + version: buf[0], + r#type: RequestType::from_u8(buf[1]), + request_id: (&buf[2..4]).read_u16::()?, + content_length: (&buf[4..6]).read_u16::()?, + padding_length: buf[6], + reserved: buf[7], + }) + } + pub(crate) fn read_content_from_stream(&self, reader: &mut dyn Read) -> io::Result> { let mut buf = vec![0; self.content_length as usize]; reader.read_exact(&mut buf)?; @@ -149,6 +219,15 @@ impl Header { reader.read_exact(&mut padding_buf)?; Ok(buf) } + + #[cfg(feature = "async_std")] + pub(crate) async fn read_content_from_async_stream(&self, reader: &mut (dyn AsyncRead + Unpin)) -> io::Result> { + let mut buf = vec![0; self.content_length as usize]; + reader.read_exact(&mut buf).await?; + let mut padding_buf = vec![0; self.padding_length as usize]; + reader.read_exact(&mut padding_buf).await?; + Ok(buf) + } } #[derive(Debug, Clone, Copy)] @@ -206,6 +285,11 @@ impl BeginRequestRec { pub(crate) fn write_to_stream(self, writer: &mut dyn Write) -> io::Result<()> { self.header.write_to_stream(writer, &self.content) } + + #[cfg(feature = "async_std")] + pub(crate) async fn async_write_to_stream(self, writer: &mut (dyn AsyncWrite + Unpin)) -> io::Result<()> { + self.header.write_to_async_stream(writer, &self.content).await + } } impl Debug for BeginRequestRec { @@ -359,7 +443,7 @@ impl EndRequestRec { let app_status = content.read_u32::()?; let protocol_status = ProtocolStatus::from_u8(content.read_u8()?); let mut reserved: [u8; 3] = [0; 3]; - content.read_exact(&mut reserved)?; + Read::read_exact(&mut content, &mut reserved)?; Ok(Self { header, @@ -370,19 +454,25 @@ impl EndRequestRec { }, }) } -} -/// Fastcgi server address. -#[derive(Debug)] -pub enum Address<'a> { - /// Tcp connection info. - /// - `0` host. - /// - `1` port. - Tcp(&'a str, u16), - #[cfg(unix)] - /// Unix socket info. - /// - `0` path of unix-sock file. - UnixSock(&'a str), + #[cfg(feature = "async_std")] + pub(crate) async fn from_async_header(header: &Header, reader: &mut (dyn AsyncRead + Unpin)) -> io::Result { + let header = header.clone(); + let mut content = &*header.read_content_from_async_stream(reader).await?; + let app_status = content.read_u32::()?; + let protocol_status = ProtocolStatus::from_u8(content.read_u8()?); + let mut reserved: [u8; 3] = [0; 3]; + AsyncReadExt::read_exact(&mut content, &mut reserved).await?; + + Ok(Self { + header, + end_request: EndRequest { + app_status, + protocol_status, + reserved, + }, + }) + } } pub(crate) type OutputMap = HashMap; diff --git a/tests/async_client_post_test.rs b/tests/async_client_post_test.rs new file mode 100644 index 0000000..93cec82 --- /dev/null +++ b/tests/async_client_post_test.rs @@ -0,0 +1,49 @@ +#![cfg(feature = "async_std")] + +use async_std::net::TcpStream; +use fastcgi_client::{AsyncClient, Params}; +use std::env::current_dir; + +mod common; + +#[async_attributes::test] +async fn test() { + common::setup(); + + let stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap(); + let mut client = AsyncClient::new(stream, false); + + let document_root = current_dir().unwrap().join("tests").join("php"); + let document_root = document_root.to_str().unwrap(); + let script_name = current_dir().unwrap().join("tests").join("php").join("post.php"); + let script_name = script_name.to_str().unwrap(); + + let body = b"p1=3&p2=4"; + let len = format!("{}", body.len()); + + let params = Params::with_predefine() + .set_request_method("POST") + .set_document_root(document_root) + .set_script_name("/post.php") + .set_script_filename(script_name) + .set_request_uri("/post.php?g1=1&g2=2") + .set_query_string("g1=1&g2=2") + .set_document_uri("/post.php") + .set_remote_addr("127.0.0.1") + .set_remote_port("12345") + .set_server_addr("127.0.0.1") + .set_server_port("80") + .set_server_name("jmjoy-pc") + .set_content_type("application/x-www-form-urlencoded") + .set_content_length(&len); + let output = client.do_request(¶ms, &mut &body[..]).await.unwrap(); + + let stdout = String::from_utf8(output.get_stdout().unwrap_or(Default::default())).unwrap(); + assert!(stdout.contains("Content-type: text/html; charset=UTF-8")); + assert!(stdout.contains("\r\n\r\n")); + assert!(stdout.contains("1234")); + + let stderr = String::from_utf8(output.get_stderr().unwrap_or(Default::default())).unwrap(); + let stderr = dbg!(stderr); + assert!(stderr.contains("PHP message: PHP Fatal error: Uncaught Exception: TEST")); +} diff --git a/tests/async_client_test.rs b/tests/async_client_test.rs new file mode 100644 index 0000000..19b38ca --- /dev/null +++ b/tests/async_client_test.rs @@ -0,0 +1,45 @@ +#![cfg(feature = "async_std")] + +use async_std::io::{self, Read, Write}; +use async_std::net::TcpStream; +use fastcgi_client::{AsyncClient, Params}; +use std::env::current_dir; + +mod common; + +#[async_attributes::test] +async fn test() { + common::setup(); + + let stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap(); + test_client(&mut AsyncClient::new(stream, false)).await; +} + +async fn test_client(client: &mut AsyncClient) { + let document_root = current_dir().unwrap().join("tests").join("php"); + let document_root = document_root.to_str().unwrap(); + let script_name = current_dir().unwrap().join("tests").join("php").join("index.php"); + let script_name = script_name.to_str().unwrap(); + + let params = Params::with_predefine() + .set_request_method("GET") + .set_document_root(document_root) + .set_script_name("/index.php") + .set_script_filename(script_name) + .set_request_uri("/index.php") + .set_document_uri("/index.php") + .set_remote_addr("127.0.0.1") + .set_remote_port("12345") + .set_server_addr("127.0.0.1") + .set_server_port("80") + .set_server_name("jmjoy-pc") + .set_content_type("") + .set_content_length("0"); + let output = client.do_request(¶ms, &mut io::empty()).await.unwrap(); + + let stdout = String::from_utf8(output.get_stdout().unwrap_or(Default::default())).unwrap(); + assert!(stdout.contains("Content-type: text/html; charset=UTF-8")); + assert!(stdout.contains("\r\n\r\n")); + assert!(stdout.contains("hello")); + assert_eq!(output.get_stderr(), None); +} diff --git a/tests/common.rs b/tests/common.rs index b7d58b7..b866c2f 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -1,12 +1,10 @@ -use std::sync::{Once, ONCE_INIT}; +use std::sync::Once; -#[allow(dead_code)] -static INIT: Once = ONCE_INIT; +static START: Once = Once::new(); /// Setup function that is only run once, even if called multiple times. -#[allow(dead_code)] -pub(crate) fn setup() { - INIT.call_once(|| { +pub fn setup() { + START.call_once(|| { env_logger::init(); }); }