Skip to content

Commit

Permalink
Refactor to send request and response without channels.
Browse files Browse the repository at this point in the history
  • Loading branch information
FireMasterK committed Dec 14, 2023
1 parent bfd4ac6 commit 5408d34
Showing 1 changed file with 86 additions and 115 deletions.
201 changes: 86 additions & 115 deletions reqwest-jni/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use jni::sys::jobject;
use jni::JNIEnv;
use reqwest::{Client, Method, Url};
use tokio::runtime::Runtime;
use tokio::sync::mpsc;

static RUNTIME: OnceLock<Runtime> = OnceLock::new();
static CLIENT: OnceLock<Client> = OnceLock::new();
Expand Down Expand Up @@ -131,126 +130,98 @@ pub extern "system" fn Java_rocks_kavin_reqwest4j_ReqwestUtils_fetch(

let runtime = RUNTIME.get().unwrap();

// Create a channel for communication between tasks
let (tx, mut rx) = mpsc::channel(1);
let (err_tx, mut err_rx) = mpsc::channel(1);

runtime.spawn(async move {
// send request
let response = request.send().await;

match response {
Ok(response) => {
// Send response to the processing task
tx.send(response).await.unwrap();
}
Err(error) => {
// Send error to the error handling task
err_tx.send(error).await.unwrap();
}
}
});

// send request in a async task
{
let jvm = Arc::clone(&jvm);
let future = Arc::clone(&future);
runtime.spawn(async move {
// Receive the response from the first task
let response = rx.recv().await;

if response.is_none() {
return;
}

let response = response.unwrap();

// get response
let status = response.status().as_u16() as i32;

let final_url = response.url().to_string();

let response_headers = response.headers().clone();

let body = response.bytes().await.unwrap_or_default().to_vec();

let mut env = jvm.attach_current_thread().unwrap();

let final_url = env.new_string(final_url).unwrap();

let body = env.byte_array_from_slice(&body).unwrap();

let headers = env.new_object("java/util/HashMap", "()V", &[]).unwrap();
let headers: JMap = JMap::from_env(&mut env, &headers).unwrap();

response_headers.iter().for_each(|(key, value)| {
let key = env.new_string(key.as_str()).unwrap();
let value = env.new_string(value.to_str().unwrap()).unwrap();
headers
.put(&mut env, &JObject::from(key), &JObject::from(value))
.unwrap();
});

// return response to CompletableFuture
let response = env
.new_object(
"rocks/kavin/reqwest4j/Response",
"(ILjava/util/Map;[BLjava/lang/String;)V",
&[
status.into(),
(&headers).into(),
(&body).into(),
(&final_url).into(),
],
)
.unwrap();

let future = future.as_obj();
env.call_method(
future,
"complete",
"(Ljava/lang/Object;)Z",
&[(&response).into()],
)
.unwrap();
});
}

{
let jvm = Arc::clone(&jvm);
let future = Arc::clone(&future);
runtime.spawn(async move {
// Receive the error from the first task
let error = err_rx.recv().await;

if error.is_none() {
return;
// send request
let response = request.send().await;

match response {
Ok(response) => {
// get response
let status = response.status().as_u16() as i32;

let final_url = response.url().to_string();

let response_headers = response.headers().clone();

let body = response.bytes().await.unwrap_or_default().to_vec();

// send response in a blocking task
runtime.spawn_blocking(move || {
let mut env = jvm.attach_current_thread().unwrap();

let final_url = env.new_string(final_url).unwrap();

let body = env.byte_array_from_slice(&body).unwrap();

let headers = env.new_object("java/util/HashMap", "()V", &[]).unwrap();
let headers: JMap = JMap::from_env(&mut env, &headers).unwrap();

response_headers.iter().for_each(|(key, value)| {
let key = env.new_string(key.as_str()).unwrap();
let value = env.new_string(value.to_str().unwrap()).unwrap();
headers
.put(&mut env, &JObject::from(key), &JObject::from(value))
.unwrap();
});

// return response to CompletableFuture
let response = env
.new_object(
"rocks/kavin/reqwest4j/Response",
"(ILjava/util/Map;[BLjava/lang/String;)V",
&[
status.into(),
(&headers).into(),
(&body).into(),
(&final_url).into(),
],
)
.unwrap();

let future = future.as_obj();
env.call_method(
future,
"complete",
"(Ljava/lang/Object;)Z",
&[(&response).into()],
)
.unwrap();
});
}
Err(error) => {
// send error in a blocking task
runtime.spawn_blocking(move || {
let mut env = jvm.attach_current_thread().unwrap();

let error = error.to_string();
let error = env.new_string(error).unwrap();
// create Exception
let exception = env
.new_object(
"java/lang/Exception",
"(Ljava/lang/String;)V",
&[(&error).into()],
)
.unwrap();

let future = future.as_obj();

// pass error to CompletableFuture
env.call_method(
future,
"completeExceptionally",
"(Ljava/lang/Throwable;)Z",
&[(&exception).into()],
)
.unwrap();
});
}
}

let error = error.unwrap();

let mut env = jvm.attach_current_thread().unwrap();

let error = error.to_string();
let error = env.new_string(error).unwrap();
// create Exception
let exception = env
.new_object(
"java/lang/Exception",
"(Ljava/lang/String;)V",
&[(&error).into()],
)
.unwrap();

let future = future.as_obj();

// pass error to CompletableFuture
env.call_method(
future,
"completeExceptionally",
"(Ljava/lang/Throwable;)Z",
&[(&exception).into()],
)
.unwrap();
});
}

Expand Down

0 comments on commit 5408d34

Please sign in to comment.