Skip to content

Commit

Permalink
async procs wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jsadusk committed Jan 31, 2025
1 parent 94b4ccd commit 6c84e15
Showing 1 changed file with 148 additions and 10 deletions.
158 changes: 148 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ use std::io::{Read, Seek, SeekFrom, Write};
use std::path::Path;
use std::rc::Rc;
use std::str::Utf8Error;
use std::thread::JoinHandle;
use std::time::SystemTime;
use thiserror::Error;
//use pipe::{pipe, PipeReader, PipeWriter};
//use crossbeam_channel::{unbounded as channel};

use pipe::{pipe, PipeReader, PipeWriter};
use crossbeam_channel;
use std::thread;
emacs::plugin_is_GPL_compatible!();

use anyhow::{anyhow, bail, Context};
Expand Down Expand Up @@ -64,16 +65,39 @@ impl<Inner> ScopedStatic<Inner> {
}
}

/*struct AsyncSession {
enum ControlMsg {
Create(String, PipeReader, PipeWriter, PipeWriter),
Kill(u64),
}

enum ResponseMsg {
Alive,
Created(u64),
Died(u64, u64),
Error(HandlerError)
}

struct AsyncProcess {
pid: u64,
stdin: PipeWriter,
stdout: PipeReader,
stderr: PipeReader,
proc_obj: Value,
}

struct AsyncSession {
connection: String,
signal_pipe: PipeWriter,
control: crossbeam_channel::Sender<ControlMsg>,
response: crossbeam_channel::Rec
}*/
command: crossbeam_channel::Sender<ControlMsg>,
response: crossbeam_channel::Receiver<ResponseMsg>,
processes: HashMap<u64, AsyncProcess>,
thread: JoinHandle<()>,
}

thread_local! {
static SESSIONS: RefCell<HashMap<String, Rc<Session>>> = RefCell::new(HashMap::new());
static SFTPS: RefCell<HashMap<String, Rc<Sftp>>> = RefCell::new(HashMap::new());
//static ASYNC_SESSIONS: RefCell<HashMap<String, Rc<AsyncSession>>> = RefCell::new(HashMap::new());
static ASYNC_SESSIONS: RefCell<HashMap<String, Rc<AsyncSession>>> = RefCell::new(HashMap::new());
static CURRENT_ENV: ScopedStatic<Env> = ScopedStatic::default();
}

Expand Down Expand Up @@ -358,8 +382,12 @@ fn ssh_auth_callback(
.map_err(|e: anyhow::Error| SshError::Fatal(e.to_string()))
}

fn connection_str(user: &str, host: &str) -> String {
format!("{}@{}", user, host)
}

fn get_connection(user: &str, host: &str, env: &Env) -> HandlerResult<Rc<Session>> {
let connection_str = format!("{}@{}", user, host);
let connection_str = connection_str(user, host);
SESSIONS.with(|sessions| {
let mut sessions = sessions.try_borrow_mut()?;
if let Some(session) = sessions.get(&connection_str) {
Expand All @@ -379,7 +407,7 @@ fn get_connection(user: &str, host: &str, env: &Env) -> HandlerResult<Rc<Session
}

fn get_sftp(user: &str, host: &str, session: &Session) -> EmacsResult<Rc<Sftp>> {
let connection_str = format!("{}@{}", user, host);
let connection_str = connection_str(user, host);
SFTPS.with(|sftps| {
let mut sftps = sftps.try_borrow_mut()?;
if let Some(sftp) = sftps.get(&connection_str) {
Expand Down Expand Up @@ -1040,6 +1068,107 @@ fn process_file<'a>(
})
}

fn async_session_main(
signal_pipe: PipeReader,
command_reciever: crossbeam_channel::Receiver<ControlMsg>,
response_sender: crossbeam_channel::Sender<ResponseMsg>,
session: Session
) {

}

impl AsyncSession {
pub(crate) fn get(env: &env, user: &str, host: &str) -> HandlerResult<Rc<AsyncSession>> {
let connection_str = connection_str(user, host);
ASYNC_SESSIONS.with(|async_sessions| {
let async_sessions = async_sessions.try_borrow_mut()?;
if let Some(async_session) = async_sessions.get(connection_str) {
Some(async_session.clone())
} else {
let async_session = Rc::new(AsyncSession::new(env, connection_str)?);
async_sessions.insert(connection_str, async_session.clone());
Ok(async_session)
}
})
}

fn new(env: &Env, connection_str: String) -> HandlerResult<AsyncSession> {
env.message(format!("Creating async session {}", connection_str))?;
let (mut signal_read, mut signal_write) = pipe();
let (command_sender, command_reciever) = crossbeam_channel::unbounded();
let (response_sender, response_reciever) = crossbeam_channel::unbounded();
let session = Rc::new(init_connection(user, host, env)?);

let async_thread = thread::spawn(
move || {
async_session_main(signal_read, command_reciever, response_sender, session);
}
);


env.message(format!("Waiting for async session {}", connection_str))?;
match response_reciever.try_recv()? {
Alive => {
env.message(format!("Created async session {}", connection_str))?;
Ok(AsyncSession {
connection: connect_str,
signal_pipe: signal_write,
command: command_sender,
response: response_reciever,
processes: HashMap::new(),
thread: async_thread,
})
},
Error(err) => Err(err),
_ => Error("Unknown response on async session create")
}
}

fn make_process(&self, env: &Env, command: Vec<String>) -> HandlerResult<AsyncProcess> {
env.message(format!("Creating process on {}", connection))?;
let (mut stdin_read, mut stdin_write) = pipe();
let (mut stdout_read, mut stdout_write) = pipe();
let (mut stderr_read, mut stderr_write) = pipe();

let signal: [u8] = ['c'];
self.signal_pipe.write(&signal);
self.command.try_send(ControlMsg::Create(command, stdin_read, stdout_write, stderr_write))?;


env.message(format!("Waiting for process on {}", connection))?;
while true {
match self.response.try_recv()? {
Created(pid) => {
let proc: AsyncProcess = AsyncProcess::new(
env, pid, stderr_write, stdout_read, stderr_read
)?;
self.processes
Died(pid, code) =>
}
}
}
}

#[defun]
fn make_process<'a>(
env: &'a Env,
name: Value<'a>,
buffer: Value<'a>,
command: Value<'a>,
coding: Value<'a>,
type: Value<'a>,
query_flag: Value<'a>,
stopped: Value<'a>,
filter: Value<'a>,
sentinel: Value<'a>,
stderr: Value<'a>,
file_handler: Value<'a>,
) -> Value<'a> {
let dissected = env.default_directory()?;
let session = get_async_session(&dissected.user, &dissected.host, &env)?;

}

fn octal_permissions_to_string(permissions: u32) -> String {
let mut permissions: u32 = permissions;
let mut ls: Vec<char> = "----------".chars().collect();
Expand Down Expand Up @@ -1069,6 +1198,15 @@ fn octal_permissions_to_string(permissions: u32) -> String {

#[defun]
fn bare<'a>(env: &'a Env, filename1: Value<'a>) -> EmacsResult<Value<'a>> {
let myref = {
let mystr = String::new("hello");
&mystr
}

println!("{}", myref);



for _i in 0..10000 {
let message = String::from_lisp(filename1)?;
let message = message.into_lisp(env)?;
Expand Down

0 comments on commit 6c84e15

Please sign in to comment.