Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
cpetig committed Jan 12, 2025
1 parent f19d9d2 commit c9f748a
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 112 deletions.
2 changes: 1 addition & 1 deletion crates/cpp/tests/symmetric_stream/source/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::atomic::{AtomicU32, Ordering};

use wit_bindgen_symmetric_rt::{
async_support::Stream, register, symmetric_stream, CallbackState, EventSubscription,
async_support::Stream, register, CallbackState, EventSubscription,
};

static COUNT: AtomicU32 = AtomicU32::new(1);
Expand Down
33 changes: 0 additions & 33 deletions crates/symmetric_executor/rust-client/src/async_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ use crate::module::symmetric::runtime::symmetric_executor::{

pub use stream_support::{results, Stream, StreamHandle2, StreamReader, StreamWriter};

// pub use futures;

mod future_support;
// later make it non-pub
pub mod stream_support;
Expand Down Expand Up @@ -139,34 +137,3 @@ pub fn spawn(future: impl Future<Output = ()> + 'static + Send) {
let wait_for = unsafe { EventSubscription::from_handle(wait_for as usize) };
drop(wait_for);
}

// #[repr(transparent)]
// pub struct AddressSend(pub *mut ());
// unsafe impl Send for AddressSend {}
// unsafe impl Sync for StreamHandle2 {}

// this is used for reading?
// pub async unsafe fn await_stream_result(
// import: unsafe extern "C" fn(&Stream, Buffer) -> Buffer,
// stream: StreamHandle2,
// buffer: Buffer,
// ) -> Option<Buffer> {
// let stream_copy = stream.clone();
// let result = import(&stream, buffer);
// match result {
// results::BLOCKED => {
// let event =
// unsafe { subscribe_event_send_ptr(stream_support::read_ready_event(stream.0)) };
// event.reset();
// wait_on(event).await;
// let v = stream.read_result();
// if let results::CLOSED | results::CANCELED = v {
// None
// } else {
// Some(usize::try_from(v).unwrap())
// }
// }
// results::CLOSED | results::CANCELED => None,
// v => Some(usize::try_from(v).unwrap()),
// }
// }
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ impl<T> Drop for StreamWriter<T> {
pub struct StreamReader<T: 'static> {
handle: StreamHandle2,
future: Option<Pin<Box<dyn Future<Output = Option<Vec<T>>> + 'static + Send>>>,
// event: EventSubscription,
_phantom: PhantomData<T>,
}

Expand Down Expand Up @@ -176,7 +175,6 @@ impl<T: Unpin + Send> futures::stream::Stream for StreamReader<T> {
let mut buffer0 = iter::repeat_with(MaybeUninit::uninit)
.take(ceiling(4 * 1024, mem::size_of::<T>()))
.collect::<Vec<_>>();
// let stream_handle = handle;
let address = unsafe { Address::from_handle(buffer0.as_mut_ptr() as usize) };
let buffer = Buffer::new(address, buffer0.len() as u64);
handle.start_reading(buffer);
Expand All @@ -191,29 +189,6 @@ impl<T: Unpin + Send> futures::stream::Stream for StreamReader<T> {
} else {
None
}
// let result = if let Some(count) = {
// let poll_fn: unsafe fn(*mut Stream, *mut (), usize) -> isize = start_reading;
// let address = super::AddressSend(buffer.as_mut_ptr() as _);
// let count = unsafe {
// super::await_stream_result(poll_fn, stream_handle, address, buffer.len())
// .await
// };
// #[allow(unused)]
// if let Some(count) = count {
// let value = ();
// }
// count
// }
// T::read(&stream_handle, &mut buffer).await
// {
// buffer.truncate(count);
// Some(unsafe { mem::transmute::<Vec<MaybeUninit<T>>, Vec<T>>(buffer) })
// } else {
// None
// };
// todo!();
// Some(Vec::new())
// result
}) as Pin<Box<dyn Future<Output = _> + Send>>);
}

Expand All @@ -235,11 +210,7 @@ impl<T> Drop for StreamReader<T> {
}
}

// Stream handles are Send, so wrap them
// #[repr(transparent)]
pub type StreamHandle2 = Stream;
// unsafe impl Send for StreamHandle2 {}
// unsafe impl Sync for StreamHandle2 {}

pub fn new_stream<T: 'static>() -> (StreamWriter<T>, StreamReader<T>) {
let handle = Stream::new();
Expand Down
53 changes: 6 additions & 47 deletions crates/symmetric_executor/symmetric_stream/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::{
mem::transmute,
ptr::null_mut,
sync::{
atomic::{AtomicIsize, AtomicPtr, AtomicUsize, Ordering},
Expand All @@ -10,9 +9,7 @@ use std::{
use stream_impl::exports::symmetric::runtime::symmetric_stream::{
self, Address, GuestAddress, GuestBuffer, GuestStreamObj,
};
use stream_impl::symmetric::runtime::symmetric_executor::{
self, EventGenerator, EventSubscription,
};
use stream_impl::symmetric::runtime::symmetric_executor::EventGenerator;

mod stream_impl;

Expand Down Expand Up @@ -58,8 +55,6 @@ impl GuestBuffer for Buffer {

mod results {
pub const BLOCKED: isize = -1;
// pub const CLOSED: isize = isize::MIN;
// pub const CANCELED: isize = 0;
}

struct StreamInner {
Expand Down Expand Up @@ -99,71 +94,41 @@ impl GuestStreamObj for StreamObj {
let old_ready = self.0.ready_size.load(Ordering::Acquire);
if old_readya as usize == EOF_MARKER {
todo!();
// return old_ready;
}
assert!(old_ready == results::BLOCKED);
let old_size = self.0.read_size.swap(size as usize, Ordering::Acquire);
assert_eq!(old_size, 0);
let old_ptr = self.0.read_addr.swap(buf, Ordering::Release);
assert_eq!(old_ptr, std::ptr::null_mut());
self.write_ready_activate();
// unsafe { activate_event_send_ptr(write_evt) };
// results::BLOCKED
}

// fn read_ready_event(&self) -> symmetric_stream::EventGenerator {
// unsafe {
// symmetric_stream::EventGenerator::from_handle(self.read_ready_event_send as usize)
// }
// }

fn read_result(&self) -> Option<symmetric_stream::Buffer> {
let size = self.0.ready_size.swap(results::BLOCKED, Ordering::Acquire);
let addr = self.0.ready_addr.swap(null_mut(), Ordering::Acquire);
let addr = self.0.ready_addr.swap(null_mut(), Ordering::Relaxed);
let capacity = self.0.ready_capacity.swap(0, Ordering::Relaxed);
if addr as usize == EOF_MARKER {
None
} else {
Some(symmetric_stream::Buffer::new(Buffer {
addr,
capacity: size as usize,
capacity,
size: AtomicUsize::new(size as usize),
}))
}
}

// fn close_read(stream: symmetric_stream::StreamObj) -> () {
// let refs = unsafe { &mut *stream }
// .active_instances
// .fetch_sub(1, Ordering::AcqRel);
// if refs == 1 {
// let obj = Box::from_raw(stream);
// drop(EventGenerator::from_handle(
// obj.read_ready_event_send as usize,
// ));
// drop(EventGenerator::from_handle(
// obj.write_ready_event_send as usize,
// ));
// drop(obj);
// }
// }

fn is_ready_to_write(&self) -> bool {
!self.0.read_addr.load(Ordering::Acquire).is_null()
}

// fn write_ready_event(&self) -> symmetric_stream::EventGenerator {
// //self.write_ready_event_send.clone()
// unsafe {
// symmetric_stream::EventGenerator::from_handle(self.write_ready_event_send as usize)
// }
// }

fn start_writing(&self) -> symmetric_stream::Buffer {
let size = self.0.read_size.swap(0, Ordering::Acquire);
let addr = self
.0
.read_addr
.swap(core::ptr::null_mut(), Ordering::Release);
.swap(core::ptr::null_mut(), Ordering::Relaxed);
self.0.ready_capacity.store(size, Ordering::Release);
symmetric_stream::Buffer::new(Buffer {
addr,
capacity: size,
Expand All @@ -183,11 +148,9 @@ impl GuestStreamObj for StreamObj {
let _old_readya = self.0.ready_addr.swap(addr, Ordering::Release);
assert_eq!(old_ready, results::BLOCKED);
self.read_ready_activate();
// unsafe { activate_event_send_ptr(self.read_ready_event) };
}

fn clone(&self) -> symmetric_stream::StreamObj {
// let _= self.0.active_instances.fetch_add(1, Ordering::AcqRel);
symmetric_stream::StreamObj::new(StreamObj(Arc::clone(&self.0)))
}

Expand All @@ -206,10 +169,6 @@ impl GuestStreamObj for StreamObj {
fn read_ready_activate(&self) {
self.0.read_ready_event_send.activate();
}

// fn close_write(stream: symmetric_stream::StreamObj) -> () {
// todo!()
// }
}

const EOF_MARKER: usize = 1;
Expand Down
5 changes: 3 additions & 2 deletions crates/symmetric_executor/symmetric_stream/src/stream_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1023,8 +1023,9 @@ pub mod exports {
match result0 {
Some(e) => {
*arg1.add(0).cast::<u8>() = (1i32) as u8;
*arg1.add(core::mem::size_of::<*const u8>()).cast::<*mut u8>() =
(e).take_handle() as *mut u8;
*arg1
.add(core::mem::size_of::<*const u8>())
.cast::<*mut u8>() = (e).take_handle() as *mut u8;
}
None => {
*arg1.add(0).cast::<u8>() = (0i32) as u8;
Expand Down

0 comments on commit c9f748a

Please sign in to comment.