Skip to content

Commit

Permalink
Add response stream. (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmjoy authored Jun 13, 2023
1 parent 6f68214 commit 0b3a6b3
Show file tree
Hide file tree
Showing 10 changed files with 504 additions and 27 deletions.
2 changes: 1 addition & 1 deletion .rustfmt.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

fn_args_layout = "Compressed"
fn_params_layout = "Compressed"
format_code_in_doc_comments = true
format_macro_bodies = true
format_macro_matchers = true
Expand Down
74 changes: 73 additions & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ use crate::{
meta::{BeginRequestRec, EndRequestRec, Header, ParamPairs, RequestType, Role},
params::Params,
request::Request,
response::ResponseStream,
ClientError, ClientResult, Response,
};
use std::marker::PhantomData;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tracing::debug;

/// I refer to nignx fastcgi implementation, found the request id is always 1.
/// I refer to nginx fastcgi implementation, found the request id is always 1.
///
/// <https://github.com/nginx/nginx/blob/f7ea8c76b55f730daa3b63f5511feb564b44d901/src/http/modules/ngx_http_fastcgi_module.c>
const REQUEST_ID: u16 = 1;
Expand Down Expand Up @@ -51,6 +52,40 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<S, ShortConn> {
) -> ClientResult<Response> {
self.inner_execute(request).await
}

/// Send request and receive response stream from fastcgi server, under
/// short connection mode.
///
/// # Examples
///
/// ```
/// use fastcgi_client::{response::Content, Client, Params, Request};
/// use tokio::{io, net::TcpStream};
///
/// async fn stream() {
/// let stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
/// let client = Client::new(stream);
/// let mut stream = client
/// .execute_once_stream(Request::new(Params::default(), &mut io::empty()))
/// .await
/// .unwrap();
///
/// while let Some(content) = stream.next().await {
/// let content = content.unwrap();
///
/// match content {
/// Content::Stdout(out) => todo!(),
/// Content::Stderr(out) => todo!(),
/// }
/// }
/// }
/// ```
pub async fn execute_once_stream<'a, I: AsyncRead + Unpin>(
mut self, request: Request<'_, I>,
) -> ClientResult<ResponseStream<S>> {
Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
Ok(ResponseStream::new(self.stream, REQUEST_ID))
}
}

impl<S: AsyncRead + AsyncWrite + Unpin> Client<S, KeepAlive> {
Expand All @@ -70,6 +105,43 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<S, KeepAlive> {
) -> ClientResult<Response> {
self.inner_execute(request).await
}

/// Send request and receive response stream from fastcgi server, under
/// keep alive connection mode.
///
/// # Examples
///
/// ```
/// use fastcgi_client::{response::Content, Client, Params, Request};
/// use tokio::{io, net::TcpStream};
///
/// async fn stream() {
/// let stream = TcpStream::connect(("127.0.0.1", 9000)).await.unwrap();
/// let mut client = Client::new_keep_alive(stream);
///
/// for _ in (0..3) {
/// let mut stream = client
/// .execute_stream(Request::new(Params::default(), &mut io::empty()))
/// .await
/// .unwrap();
///
/// while let Some(content) = stream.next().await {
/// let content = content.unwrap();
///
/// match content {
/// Content::Stdout(out) => todo!(),
/// Content::Stderr(out) => todo!(),
/// }
/// }
/// }
/// }
/// ```
pub async fn execute_stream<I: AsyncRead + Unpin>(
&mut self, request: Request<'_, I>,
) -> ClientResult<ResponseStream<&mut S>> {
Self::handle_request(&mut self.stream, REQUEST_ID, request.params, request.stdin).await?;
Ok(ResponseStream::new(&mut self.stream, REQUEST_ID))
}
}

impl<S: AsyncRead + AsyncWrite + Unpin, M: Mode> Client<S, M> {
Expand Down
40 changes: 26 additions & 14 deletions src/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ impl Header {
W: AsyncWrite + Unpin,
{
let mut buf: [u8; MAX_LENGTH] = [0; MAX_LENGTH];
let mut had_writen = false;
let mut had_written = false;

loop {
let read = content.read(&mut buf).await?;
if had_writen && read == 0 {
if had_written && read == 0 {
break;
}

Expand All @@ -106,7 +106,7 @@ impl Header {
}
header.write_to_stream(writer, buf).await?;

had_writen = true;
had_written = true;
}
Ok(())
}
Expand Down Expand Up @@ -147,14 +147,19 @@ impl Header {
let mut buf: [u8; HEADER_LEN] = [0; HEADER_LEN];
reader.read_exact(&mut buf).await?;

Ok(Self {
Ok(Self::new_from_buf(&buf))
}

#[inline]
pub(crate) fn new_from_buf(buf: &[u8; HEADER_LEN]) -> Self {
Self {
version: buf[0],
r#type: RequestType::from_u8(buf[1]),
request_id: (&buf[2..4]).read_u16().await?,
content_length: (&buf[4..6]).read_u16().await?,
request_id: be_buf_to_u16(&buf[2..4]),
content_length: be_buf_to_u16(&buf[4..6]),
padding_length: buf[6],
reserved: buf[7],
})
}
}

pub(crate) async fn read_content_from_stream<R: AsyncRead + Unpin>(
Expand Down Expand Up @@ -386,19 +391,26 @@ impl EndRequestRec {
header: &Header, reader: &mut R,
) -> io::Result<Self> {
let header = header.clone();
let mut content = &*header.read_content_from_stream(reader).await?;
let app_status = content.read_u32().await?;
let protocol_status = ProtocolStatus::from_u8(content.read_u8().await?);
let mut reserved: [u8; 3] = [0; 3];
AsyncReadExt::read_exact(&mut content, &mut reserved).await?;
let content = &*header.read_content_from_stream(reader).await?;
Ok(Self::new_from_buf(header, content))
}

Ok(Self {
pub(crate) fn new_from_buf(header: Header, buf: &[u8]) -> Self {
let app_status = u32::from_be_bytes(<[u8; 4]>::try_from(&buf[0..4]).unwrap());
let protocol_status =
ProtocolStatus::from_u8(u8::from_be_bytes(<[u8; 1]>::try_from(&buf[4..5]).unwrap()));
let reserved = <[u8; 3]>::try_from(&buf[5..8]).unwrap();
Self {
header,
end_request: EndRequest {
app_status,
protocol_status,
reserved,
},
})
}
}
}

fn be_buf_to_u16(buf: &[u8]) -> u16 {
u16::from_be_bytes(<[u8; 2]>::try_from(buf).unwrap())
}
183 changes: 182 additions & 1 deletion src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{fmt, fmt::Debug, str};
use crate::{
meta::{EndRequestRec, Header, RequestType},
ClientError, ClientResult,
};
use std::{cmp::min, fmt, fmt::Debug, str};
use tokio::io::{AsyncRead, AsyncReadExt};
use tracing::debug;

/// Output of fastcgi request, contains STDOUT and STDERR.
#[derive(Default, Clone)]
Expand All @@ -30,3 +36,178 @@ impl Debug for Response {
.finish()
}
}

pub enum Content<'a> {
Stdout(&'a [u8]),
Stderr(&'a [u8]),
}

#[derive(PartialEq)]
enum ReadStep {
Content,
Padding,
}

/// Generated by
/// [Client::execute_once_stream](crate::client::Client::execute_once_stream) or
/// [Client::execute_stream](crate::client::Client::execute_stream).
///
/// The [ResponseStream] does not implement `futures::Stream`, because
/// `futures::Stream` does not yet support GAT, so manually provide the
/// [next](ResponseStream::next) method, which support the `while let` syntax.
pub struct ResponseStream<S: AsyncRead + Unpin> {
stream: S,
id: u16,

ended: bool,

header: Option<Header>,

content_buf: Vec<u8>,
content_read: usize,

read_step: ReadStep,
}

impl<S: AsyncRead + Unpin> ResponseStream<S> {
#[inline]
pub(crate) fn new(stream: S, id: u16) -> Self {
Self {
stream,
id,
ended: false,
header: None,
content_buf: vec![0; 4096],
content_read: 0,
read_step: ReadStep::Content,
}
}

pub async fn next(&mut self) -> Option<ClientResult<Content<'_>>> {
if self.ended {
return None;
}

loop {
if self.header.is_none() {
match Header::new_from_stream(&mut self.stream).await {
Ok(header) => {
self.header = Some(header);
}
Err(err) => {
self.ended = true;
return Some(Err(err.into()));
}
};
}

let header = self.header.as_ref().unwrap();

match header.r#type.clone() {
RequestType::Stdout => match self.read_step {
ReadStep::Content => {
return self
.read_to_content(
header.content_length as usize,
Content::Stdout,
Self::prepare_for_read_padding,
)
.await;
}
ReadStep::Padding => {
self.read_to_content(
header.padding_length as usize,
Content::Stdout,
Self::prepare_for_read_header,
)
.await;
continue;
}
},
RequestType::Stderr => match self.read_step {
ReadStep::Content => {
return self
.read_to_content(
header.content_length as usize,
Content::Stderr,
Self::prepare_for_read_padding,
)
.await;
}
ReadStep::Padding => {
self.read_to_content(
header.padding_length as usize,
Content::Stderr,
Self::prepare_for_read_header,
)
.await;
continue;
}
},
RequestType::EndRequest => {
let end_request_rec =
match EndRequestRec::from_header(header, &mut self.stream).await {
Ok(rec) => rec,
Err(err) => {
self.ended = true;
return Some(Err(err.into()));
}
};
debug!(id = self.id, ?end_request_rec, "Receive from stream.");

self.ended = true;

return match end_request_rec
.end_request
.protocol_status
.convert_to_client_result(end_request_rec.end_request.app_status)
{
Ok(_) => None,
Err(err) => Some(Err(err)),
};
}
r#type => {
self.ended = true;
return Some(Err(ClientError::UnknownRequestType {
request_type: r#type,
}));
}
}
}
}

async fn read_to_content<'a, T: 'a>(
&'a mut self, length: usize, content_fn: impl FnOnce(&'a [u8]) -> T,
prepare_for_next_fn: impl FnOnce(&mut Self),
) -> Option<ClientResult<T>> {
let content_len = self.content_buf.len();
let read = match self
.stream
.read(&mut self.content_buf[..min(content_len, length - self.content_read)])
.await
{
Ok(read) => read,
Err(err) => {
self.ended = true;
return Some(Err(err.into()));
}
};

self.content_read += read;
if self.content_read >= length {
self.content_read = 0;
prepare_for_next_fn(self);
}

Some(Ok(content_fn(&self.content_buf[..read])))
}

fn prepare_for_read_padding(&mut self) {
self.read_step = ReadStep::Padding;
}

fn prepare_for_read_header(&mut self) {
self.header = None;
self.read_step = ReadStep::Content;
}
}
Loading

0 comments on commit 0b3a6b3

Please sign in to comment.