Skip to content

Commit

Permalink
proper EOF implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
cpetig committed Jan 12, 2025
1 parent 1ef3ddb commit f19d9d2
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 151 deletions.
6 changes: 3 additions & 3 deletions crates/cpp/tests/symmetric_stream/main/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use wit_bindgen_symmetric_rt::{
async_support::{stream_support, Stream},
async_support::Stream,
symmetric_stream::{Address, Buffer},
CallbackState,
};
Expand All @@ -19,9 +19,9 @@ struct CallbackInfo {
extern "C" fn ready(arg: *mut ()) -> CallbackState {
let info = unsafe { &*arg.cast::<CallbackInfo>() };
let buffer = info.stream.read_result();
let len = buffer.get_size();
// unsafe { stream_support::read_amount(info.stream) };
if len > 0 {
if let Some(buffer) = buffer {
let len = buffer.get_size();
for i in 0..len as usize {
println!("data {}", info.data[i]);
}
Expand Down
4 changes: 2 additions & 2 deletions crates/cpp/tests/symmetric_stream/source/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ extern "C" fn timer_call(data: *mut ()) -> CallbackState {
assert!(size >= 1);
*unsafe { &mut *addr } = count;
buffer.set_size(1);
stream.finish_writing(buffer);
stream.finish_writing(Some(buffer));
}
let _ = stream.take_handle();
CallbackState::Ready
Expand All @@ -27,7 +27,7 @@ extern "C" fn write_ready(data: *mut ()) -> CallbackState {
if count > 5 {
let stream: Stream = unsafe { Stream::from_handle(data as usize) };
// EOF
stream.finish_writing(symmetric_stream::end_of_file());
stream.finish_writing(None);
CallbackState::Ready
} else {
if count == 1 {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
pub use crate::module::symmetric::runtime::symmetric_stream::StreamObj as Stream;
use crate::{
async_support::wait_on,
module::symmetric::runtime::symmetric_stream::{self, end_of_file},
symmetric_stream::{Address, Buffer},
EventGenerator,
};
use {
futures::sink::Sink,
Expand Down Expand Up @@ -104,7 +102,7 @@ impl<T: Unpin> Sink<Vec<T>> for StreamWriter<T> {
a.write(b);
}
buffer.set_size(item_len as u64);
stream.finish_writing(buffer);
stream.finish_writing(Some(buffer));
Ok(())
}

Expand All @@ -120,7 +118,7 @@ impl<T: Unpin> Sink<Vec<T>> for StreamWriter<T> {
impl<T> Drop for StreamWriter<T> {
fn drop(&mut self) {
if !self.handle.is_write_closed() {
self.handle.finish_writing(end_of_file());
self.handle.finish_writing(None);
}
}
}
Expand Down Expand Up @@ -186,9 +184,13 @@ impl<T: Unpin + Send> futures::stream::Stream for StreamReader<T> {
subsc.reset();
wait_on(subsc).await;
let buffer2 = handle.read_result();
let count = buffer2.get_size();
buffer0.truncate(count as usize);
Some(unsafe { mem::transmute::<Vec<MaybeUninit<T>>, Vec<T>>(buffer0) })
if let Some(buffer2) = buffer2 {
let count = buffer2.get_size();
buffer0.truncate(count as usize);
Some(unsafe { mem::transmute::<Vec<MaybeUninit<T>>, Vec<T>>(buffer0) })
} else {
None
}
// let result = if let Some(count) = {
// let poll_fn: unsafe fn(*mut Stream, *mut (), usize) -> isize = start_reading;
// let address = super::AddressSend(buffer.as_mut_ptr() as _);
Expand Down
105 changes: 57 additions & 48 deletions crates/symmetric_executor/rust-client/src/module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -807,8 +807,19 @@ pub mod symmetric {
}
impl StreamObj {
#[allow(unused_unsafe, clippy::all)]
pub fn read_result(&self) -> Buffer {
/// none is EOF
pub fn read_result(&self) -> Option<Buffer> {
unsafe {
#[cfg_attr(target_pointer_width = "64", repr(align(8)))]
#[cfg_attr(target_pointer_width = "32", repr(align(4)))]
struct RetArea(
[::core::mem::MaybeUninit<u8>; 2 * core::mem::size_of::<*const u8>()],
);
let mut ret_area = RetArea(
[::core::mem::MaybeUninit::uninit();
2 * core::mem::size_of::<*const u8>()],
);
let ptr0 = ret_area.0.as_mut_ptr().cast::<u8>();
#[link(wasm_import_module = "symmetric:runtime/[email protected]")]
extern "C" {
#[cfg_attr(
Expand All @@ -817,10 +828,25 @@ pub mod symmetric {
)]
fn symmetricX3AruntimeX2Fsymmetric_streamX400X2E1X2E0X00X5BmethodX5Dstream_objX2Eread_result(
_: *mut u8,
) -> *mut u8;
_: *mut u8,
);
}
symmetricX3AruntimeX2Fsymmetric_streamX400X2E1X2E0X00X5BmethodX5Dstream_objX2Eread_result((self).handle() as *mut u8, ptr0);
let l1 = i32::from(*ptr0.add(0).cast::<u8>());
match l1 {
0 => None,
1 => {
let e = {
let l2 = *ptr0
.add(core::mem::size_of::<*const u8>())
.cast::<*mut u8>();

Buffer::from_handle(l2 as usize)
};
Some(e)
}
_ => _rt::invalid_enum_discriminant(),
}
let ret = symmetricX3AruntimeX2Fsymmetric_streamX400X2E1X2E0X00X5BmethodX5Dstream_objX2Eread_result((self).handle() as *mut u8);
Buffer::from_handle(ret as usize)
}
}
}
Expand Down Expand Up @@ -884,8 +910,13 @@ pub mod symmetric {
}
impl StreamObj {
#[allow(unused_unsafe, clippy::all)]
pub fn finish_writing(&self, buffer: Buffer) -> () {
/// none is EOF
pub fn finish_writing(&self, buffer: Option<Buffer>) -> () {
unsafe {
let (result0_0, result0_1) = match &buffer {
Some(e) => (1i32, (e).take_handle() as *mut u8),
None => (0i32, core::ptr::null_mut()),
};
#[link(wasm_import_module = "symmetric:runtime/[email protected]")]
extern "C" {
#[cfg_attr(
Expand All @@ -894,10 +925,11 @@ pub mod symmetric {
)]
fn symmetricX3AruntimeX2Fsymmetric_streamX400X2E1X2E0X00X5BmethodX5Dstream_objX2Efinish_writing(
_: *mut u8,
_: i32,
_: *mut u8,
);
}
symmetricX3AruntimeX2Fsymmetric_streamX400X2E1X2E0X00X5BmethodX5Dstream_objX2Efinish_writing((self).handle() as *mut u8, (&buffer).take_handle() as *mut u8);
symmetricX3AruntimeX2Fsymmetric_streamX400X2E1X2E0X00X5BmethodX5Dstream_objX2Efinish_writing((self).handle() as *mut u8, result0_0, result0_1);
}
}
}
Expand All @@ -919,36 +951,6 @@ pub mod symmetric {
}
}
}
#[allow(unused_unsafe, clippy::all)]
/// special EOF buffer value (should be opaque)
pub fn end_of_file() -> Buffer {
unsafe {
#[link(wasm_import_module = "symmetric:runtime/[email protected]")]
extern "C" {
#[cfg_attr(target_arch = "wasm32", link_name = "end-of-file")]
fn symmetricX3AruntimeX2Fsymmetric_streamX400X2E1X2E0X00end_of_file(
) -> *mut u8;
}
let ret = symmetricX3AruntimeX2Fsymmetric_streamX400X2E1X2E0X00end_of_file();
Buffer::from_handle(ret as usize)
}
}
#[allow(unused_unsafe, clippy::all)]
pub fn is_end_of_file(obj: &Buffer) -> bool {
unsafe {
#[link(wasm_import_module = "symmetric:runtime/[email protected]")]
extern "C" {
#[cfg_attr(target_arch = "wasm32", link_name = "is-end-of-file")]
fn symmetricX3AruntimeX2Fsymmetric_streamX400X2E1X2E0X00is_end_of_file(
_: *mut u8,
) -> i32;
}
let ret = symmetricX3AruntimeX2Fsymmetric_streamX400X2E1X2E0X00is_end_of_file(
(obj).handle() as *mut u8,
);
_rt::bool_lift(ret as u8)
}
}
}
}
}
Expand Down Expand Up @@ -1088,14 +1090,21 @@ mod _rt {
self as i64
}
}
pub unsafe fn invalid_enum_discriminant<T>() -> T {
if cfg!(debug_assertions) {
panic!("invalid enum discriminant")
} else {
core::hint::unreachable_unchecked()
}
}
}

#[cfg(target_arch = "wasm32")]
#[unsafe(link_section = "component-type:wit-bindgen:0.37.0:symmetric:[email protected]:module:encoded world")]
#[doc(hidden)]
#[allow(clippy::octal_escapes)]
pub static __WIT_BINDGEN_COMPONENT_TYPE: [u8; 1733] = *b"\
\0asm\x0d\0\x01\0\0\x19\x16wit-component-encoding\x04\0\x07\xc8\x0c\x01A\x02\x01\
pub static __WIT_BINDGEN_COMPONENT_TYPE: [u8; 1716] = *b"\
\0asm\x0d\0\x01\0\0\x19\x16wit-component-encoding\x04\0\x07\xb7\x0c\x01A\x02\x01\
A\x05\x01B\x20\x04\0\x11callback-function\x03\x01\x04\0\x0dcallback-data\x03\x01\
\x04\0\x12event-subscription\x03\x01\x04\0\x0fevent-generator\x03\x01\x01m\x02\x07\
started\x0bnot-started\x04\0\x0bcall-status\x03\0\x04\x01m\x02\x07pending\x05rea\
Expand All @@ -1109,7 +1118,7 @@ od]event-generator.subscribe\x01\x11\x01@\x01\x04self\x10\x01\0\x04\0\x20[method
]event-generator.activate\x01\x12\x01@\0\x01\0\x04\0\x03run\x01\x13\x01i\0\x01i\x01\
\x01@\x03\x07trigger\x0a\x08callback\x14\x04data\x15\x01\0\x04\0\x08register\x01\
\x16\x03\0*symmetric:runtime/[email protected]\x05\0\x02\x03\0\0\x12event\
-subscription\x01B+\x02\x03\x02\x01\x01\x04\0\x12event-subscription\x03\0\0\x04\0\
-subscription\x01B*\x02\x03\x02\x01\x01\x04\0\x12event-subscription\x03\0\0\x04\0\
\x07address\x03\x01\x04\0\x06buffer\x03\x01\x04\0\x0astream-obj\x03\x01\x01i\x02\
\x01i\x03\x01@\x02\x04addr\x05\x08capacityw\0\x06\x04\0\x13[constructor]buffer\x01\
\x07\x01h\x03\x01@\x01\x04self\x08\0\x05\x04\0\x1a[method]buffer.get-address\x01\
Expand All @@ -1121,15 +1130,15 @@ d]buffer.capacity\x01\x0a\x01i\x04\x01@\0\0\x0c\x04\0\x17[constructor]stream-obj
\x01@\x02\x04self\x0e\x06buffer\x06\x01\0\x04\0\x20[method]stream-obj.start-read\
ing\x01\x11\x01@\x01\x04self\x0e\x01\0\x04\0'[method]stream-obj.write-ready-acti\
vate\x01\x12\x01i\x01\x01@\x01\x04self\x0e\0\x13\x04\0'[method]stream-obj.read-r\
eady-subscribe\x01\x14\x01@\x01\x04self\x0e\0\x06\x04\0\x1e[method]stream-obj.re\
ad-result\x01\x15\x04\0$[method]stream-obj.is-ready-to-write\x01\x10\x04\0([meth\
od]stream-obj.write-ready-subscribe\x01\x14\x04\0\x20[method]stream-obj.start-wr\
iting\x01\x15\x04\0![method]stream-obj.finish-writing\x01\x11\x04\0&[method]stre\
am-obj.read-ready-activate\x01\x12\x01@\0\0\x06\x04\0\x0bend-of-file\x01\x16\x01\
@\x01\x03obj\x08\0\x7f\x04\0\x0eis-end-of-file\x01\x17\x03\0(symmetric:runtime/s\
[email protected]\x05\x02\x04\0\x1esymmetric:runtime/[email protected]\x04\0\x0b\x0c\
\x01\0\x06module\x03\0\0\0G\x09producers\x01\x0cprocessed-by\x02\x0dwit-componen\
t\x070.223.0\x10wit-bindgen-rust\x060.37.0";
eady-subscribe\x01\x14\x01k\x06\x01@\x01\x04self\x0e\0\x15\x04\0\x1e[method]stre\
am-obj.read-result\x01\x16\x04\0$[method]stream-obj.is-ready-to-write\x01\x10\x04\
\0([method]stream-obj.write-ready-subscribe\x01\x14\x01@\x01\x04self\x0e\0\x06\x04\
\0\x20[method]stream-obj.start-writing\x01\x17\x01@\x02\x04self\x0e\x06buffer\x15\
\x01\0\x04\0![method]stream-obj.finish-writing\x01\x18\x04\0&[method]stream-obj.\
read-ready-activate\x01\x12\x03\0(symmetric:runtime/[email protected]\x05\x02\
\x04\0\x1esymmetric:runtime/[email protected]\x04\0\x0b\x0c\x01\0\x06module\x03\0\0\0\
G\x09producers\x01\x0cprocessed-by\x02\x0dwit-component\x070.223.0\x10wit-bindge\
n-rust\x060.37.0";

#[inline(never)]
#[doc(hidden)]
Expand Down
51 changes: 20 additions & 31 deletions crates/symmetric_executor/symmetric_stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ use stream_impl::exports::symmetric::runtime::symmetric_stream::{
use stream_impl::symmetric::runtime::symmetric_executor::{
self, EventGenerator, EventSubscription,
};
use wit_bindgen_symmetric_rt::symmetric_stream::{end_of_file, is_end_of_file};
//use wit_bindgen_symmetric_rt::{async_support::Stream, EventGenerator};

mod stream_impl;

Expand Down Expand Up @@ -60,8 +58,8 @@ impl GuestBuffer for Buffer {

mod results {
pub const BLOCKED: isize = -1;
pub const CLOSED: isize = isize::MIN;
pub const CANCELED: isize = 0;
// pub const CLOSED: isize = isize::MIN;
// pub const CANCELED: isize = 0;
}

struct StreamInner {
Expand All @@ -71,7 +69,7 @@ struct StreamInner {
read_size: AtomicUsize,
ready_addr: AtomicPtr<()>,
ready_size: AtomicIsize,
// active_instances: AtomicUsize,
ready_capacity: AtomicUsize,
}

struct StreamObj(Arc<StreamInner>);
Expand All @@ -85,20 +83,21 @@ impl GuestStreamObj for StreamObj {
read_size: AtomicUsize::new(0),
ready_addr: AtomicPtr::new(core::ptr::null_mut()),
ready_size: AtomicIsize::new(results::BLOCKED),
// active_instances: AtomicUsize::new(1),
ready_capacity: AtomicUsize::new(0),
};
Self(Arc::new(inner))
}

fn is_write_closed(&self) -> bool {
self.0.ready_size.load(Ordering::Acquire) == results::CLOSED
self.0.ready_addr.load(Ordering::Acquire) as usize == EOF_MARKER
}

fn start_reading(&self, buffer: symmetric_stream::Buffer) {
let buf = buffer.get::<Buffer>().get_address().take_handle() as *mut ();
let size = buffer.get::<Buffer>().capacity();
let old_readya = self.0.ready_addr.load(Ordering::Acquire);
let old_ready = self.0.ready_size.load(Ordering::Acquire);
if old_ready == results::CLOSED {
if old_readya as usize == EOF_MARKER {
todo!();
// return old_ready;
}
Expand All @@ -118,17 +117,18 @@ impl GuestStreamObj for StreamObj {
// }
// }

fn read_result(&self) -> symmetric_stream::Buffer {
fn read_result(&self) -> Option<symmetric_stream::Buffer> {
let size = self.0.ready_size.swap(results::BLOCKED, Ordering::Acquire);
let addr = self.0.ready_addr.swap(null_mut(), Ordering::Acquire);
if addr == EOF_MARKER {
end_of_file()
if addr as usize == EOF_MARKER {
None
} else {
symmetric_stream::Buffer::new(Buffer {
addr,
capacity: size as usize,
size: AtomicUsize::new(size as usize),
}) }
Some(symmetric_stream::Buffer::new(Buffer {
addr,
capacity: size as usize,
size: AtomicUsize::new(size as usize),
}))
}
}

// fn close_read(stream: symmetric_stream::StreamObj) -> () {
Expand Down Expand Up @@ -171,15 +171,13 @@ impl GuestStreamObj for StreamObj {
})
}

fn finish_writing(&self, buffer: symmetric_stream::Buffer) -> () {
let (elements, addr) = if buffer.handle() == EOF_MARKER {
// is_end_of_file(&buffer) {
let _ = buffer.take_handle();
(0, EOF_MARKER as usize as *mut ())
} else {
fn finish_writing(&self, buffer: Option<symmetric_stream::Buffer>) -> () {
let (elements, addr) = if let Some(buffer) = buffer {
let elements = buffer.get::<Buffer>().get_size() as isize;
let addr = buffer.get::<Buffer>().get_address().take_handle() as *mut ();
(elements, addr)
} else {
(0, EOF_MARKER as usize as *mut ())
};
let old_ready = self.0.ready_size.swap(elements as isize, Ordering::Relaxed);
let _old_readya = self.0.ready_addr.swap(addr, Ordering::Release);
Expand Down Expand Up @@ -222,13 +220,4 @@ impl symmetric_stream::Guest for Guest {
type Buffer = Buffer;

type StreamObj = StreamObj;

fn end_of_file() -> symmetric_stream::Buffer {
unsafe { symmetric_stream::Buffer::from_handle(EOF_MARKER) }
}

fn is_end_of_file(obj: symmetric_stream::BufferBorrow<'_>) -> bool {
let ptr: *mut () = unsafe { transmute(obj) };
ptr as usize == EOF_MARKER
}
}
Loading

0 comments on commit f19d9d2

Please sign in to comment.