Skip to content

Commit

Permalink
Finish copy&paste sync code to async.
Browse files Browse the repository at this point in the history
  • Loading branch information
jmjoy committed Nov 17, 2019
1 parent b0fd83f commit 0570127
Show file tree
Hide file tree
Showing 9 changed files with 370 additions and 28 deletions.
8 changes: 7 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,10 @@ before_install:
- docker ps -a

env:
- RUST_LOG=fastcgi_client
- RUST_LOG=fastcgi_client

script:
- cargo build --verbose
- cargo test --verbose
- cargo build --verbose --no-default-features
- cargo test --verbose --no-default-features
12 changes: 7 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fastcgi-client"
version = "0.3.0"
version = "0.4.0"
authors = ["__JM_Joy__ <[email protected]>"]
edition = "2018"
description = "Fastcgi client implemented for Rust."
Expand All @@ -16,17 +16,19 @@ exclude = [
]

[dependencies]
byteorder = "1.3.1"
log = "0.4.6"
async-std = {version = "1.0.1", optional = true}
bufstream = "0.1.4"
byteorder = "1.3.1"
error-chain = "0.12.1"
async-std = {version = "0.99", optional = true}
log = "0.4.6"

[dev-dependencies]
async-attributes = "1.1.1"
env_logger = "0.6.1"

[build-dependencies]
toml = "0.5.1"

[features]
async-await = ["async-std"]
default = ["async_std"]
async_std = ["async-std"]
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

Fastcgi client implemented for Rust.

**Notice: This crate is not productive yet, please do not use in production.**

## Example

```rust
Expand Down
151 changes: 151 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ use log::info;
use std::collections::HashMap;
use std::io::{self, Read, Write};

#[cfg(feature = "async_std")]
use async_std::io::{prelude::*, Read as AsyncRead, Write as AsyncWrite};

/// Client for handling communication between fastcgi server.
pub struct Client<S: Read + Write + Send + Sync> {
keep_alive: bool,
Expand Down Expand Up @@ -161,3 +164,151 @@ impl<S: Read + Write + Send + Sync> Client<S> {
self.outputs.get_mut(&id).ok_or_else(|| ErrorKind::RequestIdNotFound(id).into())
}
}

#[cfg(feature = "async_std")]
/// Async client for handling communication between fastcgi server.
pub struct AsyncClient<S: AsyncRead + AsyncWrite + Send + Sync + Unpin> {
keep_alive: bool,
stream: Box<S>,
outputs: OutputMap,
}

#[cfg(feature = "async_std")]
impl<S: AsyncRead + AsyncWrite + Send + Sync + Unpin> AsyncClient<S> {
/// Construct a `AsyncClient` Object with stream (such as `async_std::net::TcpStream` or `async_std::os::unix::net::UnixStream`,
/// with buffered read/write for stream.
pub fn new(stream: S, keep_alive: bool) -> Self {
Self {
keep_alive,
stream: Box::new(stream),
outputs: HashMap::new(),
}
}

/// Send request and receive response from fastcgi server.
/// - `params` fastcgi params.
/// - `body` always the http post or put body.
///
/// return the output of fastcgi stdout and stderr.
pub async fn do_request<'a>(&mut self, params: &Params<'a>, body: &mut (dyn AsyncRead + Unpin)) -> ClientResult<&mut Output> {
let id = RequestIdGenerator.generate();
self.handle_request(id, params, body).await?;
self.handle_response(id).await?;
Ok(self.outputs.get_mut(&id).ok_or_else(|| ErrorKind::RequestIdNotFound(id))?)
}

async fn handle_request<'a>(&mut self, id: u16, params: &Params<'a>, body: &mut (dyn AsyncRead + Unpin)) -> ClientResult<()> {
let write_stream = &mut self.stream;

info!("[id = {}] Start handle request.", id);

let begin_request_rec = BeginRequestRec::new(id, Role::Responder, self.keep_alive)?;
info!("[id = {}] Send to stream: {:?}.", id, &begin_request_rec);
begin_request_rec.async_write_to_stream(write_stream).await?;

let param_pairs = ParamPairs::new(params);
info!("[id = {}] Params will be sent: {:?}.", id, &param_pairs);

Header::async_write_to_stream_batches(
RequestType::Params,
id,
write_stream,
&mut &param_pairs.to_content()?[..],
Some(|header| {
info!("[id = {}] Send to stream for Params: {:?}.", id, &header);
header
}),
)
.await?;

Header::async_write_to_stream_batches(
RequestType::Params,
id,
write_stream,
&mut async_std::io::empty(),
Some(|header| {
info!("[id = {}] Send to stream for Params: {:?}.", id, &header);
header
}),
)
.await?;

Header::async_write_to_stream_batches(
RequestType::Stdin,
id,
write_stream,
body,
Some(|header| {
info!("[id = {}] Send to stream for Stdin: {:?}.", id, &header);
header
}),
)
.await?;

Header::async_write_to_stream_batches(
RequestType::Stdin,
id,
write_stream,
&mut async_std::io::empty(),
Some(|header| {
info!("[id = {}] Send to stream for Stdin: {:?}.", id, &header);
header
}),
)
.await?;

write_stream.flush().await?;

Ok(())
}

async fn handle_response(&mut self, id: u16) -> ClientResult<()> {
self.init_output(id);

let global_end_request_rec: Option<EndRequestRec>;

loop {
let read_stream = &mut self.stream;
let header = Header::new_from_async_stream(read_stream).await?;
info!("[id = {}] Receive from stream: {:?}.", id, &header);

if header.request_id != id {
return Err(ErrorKind::ResponseNotFound(id).into());
}

match header.r#type {
RequestType::Stdout => {
let content = header.read_content_from_async_stream(read_stream).await?;
self.get_output_mut(id)?.set_stdout(content)
}
RequestType::Stderr => {
let content = header.read_content_from_async_stream(read_stream).await?;
self.get_output_mut(id)?.set_stderr(content)
}
RequestType::EndRequest => {
let end_request_rec = EndRequestRec::from_async_header(&header, read_stream).await?;
info!("[id = {}] Receive from stream: {:?}.", id, &end_request_rec);
global_end_request_rec = Some(end_request_rec);
break;
}
r#type => return Err(ErrorKind::UnknownRequestType(r#type).into()),
}
}

match global_end_request_rec {
Some(end_request_rec) => end_request_rec
.end_request
.protocol_status
.convert_to_client_result(end_request_rec.end_request.app_status),
None => unreachable!(),
}
}

fn init_output(&mut self, id: u16) {
self.outputs.insert(id, Default::default());
}

fn get_output_mut(&mut self, id: u16) -> ClientResult<&mut Output> {
self.outputs.get_mut(&id).ok_or_else(|| ErrorKind::RequestIdNotFound(id).into())
}
}
5 changes: 4 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@ mod params;

pub use crate::client::Client;
pub use crate::error::*;
pub use crate::meta::{Address, Output};
pub use crate::meta::Output;
pub use crate::params::Params;

#[cfg(feature = "async_std")]
pub use crate::client::AsyncClient;

/// Version of this crate.
pub const VERSION: &'static str = env!("LIB_BUILD_VERSION");
Loading

0 comments on commit 0570127

Please sign in to comment.