Skip to content

Commit

Permalink
Revert "Bugfixes (#72)"
Browse files Browse the repository at this point in the history
This reverts commit f5ea6be.
  • Loading branch information
Devdutt Shenoi committed Dec 14, 2022
1 parent 5dca975 commit 80eb6f1
Show file tree
Hide file tree
Showing 13 changed files with 109 additions and 200 deletions.
11 changes: 10 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions uplink/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ edition = "2021"
structopt = "0.3"
config = { version = "0.13.2", default-features = false, features = ["toml", "json"] }
log = "0.4"
simplelog = "0.12.0"
time = "*"
simplelog = "0.12"
disk = { path = "../disk" }
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", features = ["codec", "time"] }
Expand All @@ -31,6 +30,7 @@ sysinfo = "0.26"
lazy_static = "1.4.0"
regex = "1.6.0"
chrono = "0.4.19"
stdio-override = "0.1.3"

[build-dependencies]
vergen = { version = "7", features = ["git", "build", "time"] }
Expand Down
6 changes: 3 additions & 3 deletions uplink/src/base/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub struct Action {

#[derive(Debug, Serialize, Deserialize)]
pub struct ActionResponse {
pub action_id: String,
pub id: String,
// sequence number
pub sequence: u32,
// timestamp
Expand All @@ -46,7 +46,7 @@ impl ActionResponse {
.as_millis() as u64;

ActionResponse {
action_id: id.to_owned(),
id: id.to_owned(),
sequence: 0,
timestamp,
state: state.to_owned(),
Expand Down Expand Up @@ -95,7 +95,7 @@ impl From<&ActionResponse> for Payload {
sequence: resp.sequence,
timestamp: resp.timestamp,
payload: json!({
"id": resp.action_id,
"id": resp.id,
"state": resp.state,
"progress": resp.progress,
"errors": resp.errors
Expand Down
16 changes: 15 additions & 1 deletion uplink/src/base/middleware/tunshell.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use flume::Receiver;
use log::error;
Expand All @@ -23,6 +23,7 @@ pub struct TunshellSession {
echo_stdout: bool,
actions_rx: Receiver<Action>,
action_status: Stream<ActionResponse>,
last_process_done: Arc<Mutex<bool>>,
}

impl TunshellSession {
Expand All @@ -37,6 +38,7 @@ impl TunshellSession {
echo_stdout,
actions_rx: tunshell_rx,
action_status,
last_process_done: Arc::new(Mutex::new(true)),
}
}

Expand All @@ -57,6 +59,14 @@ impl TunshellSession {
pub async fn start(mut self) {
while let Ok(action) = self.actions_rx.recv_async().await {
let action_id = action.action_id.clone();
if !(*self.last_process_done.lock().unwrap()) {
let status = ActionResponse::failure(&action_id, "busy".to_owned());
if let Err(e) = self.action_status.fill(status).await {
error!("Failed to send status, Error = {:?}", e);
};

continue;
}

// println!("{:?}", keys);
let keys = match serde_json::from_str(&action.payload) {
Expand All @@ -73,9 +83,11 @@ impl TunshellSession {
};

let mut client = Client::new(self.config(keys), HostShell::new().unwrap());
let last_process_done = self.last_process_done.clone();
let mut status_tx = self.action_status.clone();

tokio::spawn(async move {
*last_process_done.lock().unwrap() = false;
let response = ActionResponse::progress(&action_id, "ShellSpawned", 100);
if let Err(e) = status_tx.fill(response).await {
error!("Failed to send status. Error {:?}", e);
Expand All @@ -100,6 +112,8 @@ impl TunshellSession {
if let Err(e) = send_status {
error!("Failed to send status. Error {:?}", e);
}

*last_process_done.lock().unwrap() = true;
});
}
}
Expand Down
1 change: 1 addition & 0 deletions uplink/src/base/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ pub struct Config {
pub max_inflight: u16,
pub actions: Vec<String>,
pub persistence: Option<Persistence>,
pub log_dir: Option<String>,
pub streams: HashMap<String, StreamConfig>,
pub action_status: StreamConfig,
pub serializer_metrics: Option<StreamConfig>,
Expand Down
35 changes: 0 additions & 35 deletions uplink/src/bin/push_data.rs

This file was deleted.

37 changes: 0 additions & 37 deletions uplink/src/bin/push_to_uplink.rs

This file was deleted.

6 changes: 6 additions & 0 deletions uplink/src/bin/test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
fn main() {
let logcat_re = regex::Regex::new(r#"^(\S+ \S+) (\w)/([^(\s]*).+?:\s*(.*)$"#).unwrap();
let line = "07-25 08:52:02.552 W//apex/com.android.adbd/bin/adbd( 367): type=1400 audit(0.0:21653): avc: denied { search } for comm=73796E6320737663203635 name=\"oem\" dev=\"dm-4\" ino=42 scontext=u:r:adbd:s0 tcontext=u:object_r:oemfs:s0 tclass=dir permissive=0";
let data = logcat_re.captures(line);
dbg!(data);
}
44 changes: 44 additions & 0 deletions uplink/src/bin/test_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use futures_util::SinkExt;
use serde::{Deserialize, Serialize};
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::net::TcpStream;
use tokio_stream::StreamExt;
use tokio_util::codec::{Framed, LinesCodec};
use uplink::Action;

#[derive(Debug, Serialize, Deserialize)]
struct Response {
stream: String,
sequence: u32,
timestamp: u64,
action_id: String,
state: String,
progress: u8,
errors: Vec<String>,
}

#[tokio::main]
async fn main() {
let port = std::env::args().nth(1).unwrap_or_else(|| "127.0.0.1:5555".to_string());
let stream = TcpStream::connect(port).await.unwrap();
let mut framed = Framed::new(stream, LinesCodec::new());
let mut idx = 1;
loop {
let action_s = framed.next().await.unwrap().unwrap();
println!("Received: {}", action_s);
let action = serde_json::from_str::<Action>(action_s.as_str()).unwrap();
idx += 1;
let response = Response {
stream: "action_status".to_string(),
sequence: idx,
timestamp: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64,
action_id: action.action_id,
state: "Completed".to_string(),
progress: 100,
errors: vec![],
};
let resp = serde_json::to_string(&response).unwrap();
println!("Sending: {}", resp);
framed.send(resp).await.unwrap();
}
}
57 changes: 0 additions & 57 deletions uplink/src/bin/wait_and_send.rs

This file was deleted.

32 changes: 7 additions & 25 deletions uplink/src/collector/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ use serde::{Deserialize, Serialize};

use std::fs::{create_dir_all, File};
use std::{io::Write, path::PathBuf};
use std::time::Duration;

use crate::base::{Authentication, Downloader};
use crate::{Action, ActionResponse, Stream};
Expand Down Expand Up @@ -120,7 +119,7 @@ impl FileDownloader {
}
None => client_builder,
}
.build()?;
.build()?;

// Create rendezvous channel with flume
let (download_tx, download_rx) = flume::bounded(0);
Expand Down Expand Up @@ -150,27 +149,10 @@ impl FileDownloader {
let action = self.download_rx.recv()?;
self.action_id = action.action_id.clone();

let mut error = None;
for _ in 0..3 {
match self.run(action.clone()).await {
Ok(_) => {
error = None;
break;
}
Err(e) => {
error!("Download failed: {e}\nretrying");
error = Some(e);
}
}
tokio::time::sleep(Duration::from_secs(30)).await;
}
match error {
None => {}
Some(e) => {
let status = ActionResponse::failure(&self.action_id, e.to_string())
.set_sequence(self.sequence());
self.send_status(status).await;
}
if let Err(e) = self.run(action).await {
let status = ActionResponse::failure(&self.action_id, e.to_string())
.set_sequence(self.sequence());
self.send_status(status).await;
}
}
}
Expand Down Expand Up @@ -253,9 +235,9 @@ impl FileDownloader {
// Calculate percentage on the basis of content_length if available,
// else increment 0..100 till task is completed.
let percentage = match content_length {
Some(content_length) => 100 * downloaded / content_length,
Some(content_length) => (50 * downloaded / content_length) % 101,
None => {
downloaded = (downloaded + 1) % 101;
downloaded = (downloaded + 1) % 50;
downloaded
}
};
Expand Down
Loading

0 comments on commit 80eb6f1

Please sign in to comment.