Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread tests #16

Open
wants to merge 2 commits into
base: refactor
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/wasm_thread_example/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/target
21 changes: 21 additions & 0 deletions examples/wasm_thread_example/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[package]
name = "wasm_thread_example"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
crate-type = ["cdylib", "rlib"]

[dependencies]
wasm_thread = { git = "https://github.com/chemicstry/wasm_thread.git", features = [
"es_modules",
], branch = "refactor" }
wasm-bindgen = { version = "0.2" }
wasm-bindgen-futures = "0.4.34"

# Dev dependencies
log = { version = "0.4" }
env_logger = { version = "0.8" }
console_error_panic_hook = { version = "0.1.6" }
console_log = { version = "1.0.0", features = ["color"] }
2 changes: 2 additions & 0 deletions examples/wasm_thread_example/pkg/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
snippets
wasm_thread_example*
12 changes: 12 additions & 0 deletions examples/wasm_thread_example/pkg/index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
<title>Page Title</title>
<meta name="viewport" content="width=device-width, initial-scale=1" />
</head>
<body>
<script src="./index.js" type="module"></script>
</body>
</html>
12 changes: 12 additions & 0 deletions examples/wasm_thread_example/pkg/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
const worker = new Worker("./worker.js", { type: "module" });

worker.postMessage({ type: "init" });
worker.onmessage = (event) => {
if (event.data.type !== "initResponse") return;

for (let i = 0; i < 100000; i++) {
worker.postMessage({ type: "encode" });
}

worker.postMessage({ type: "finish" });
};
17 changes: 17 additions & 0 deletions examples/wasm_thread_example/pkg/serve.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"headers": [
{
"source": "*",
"headers": [
{
"key": "Cross-Origin-Opener-Policy",
"value": "same-origin"
},
{
"key": "Cross-Origin-Embedder-Policy",
"value": "require-corp"
}
]
}
]
}
40 changes: 40 additions & 0 deletions examples/wasm_thread_example/pkg/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/// From https://github.com/rustwasm/wasm-bindgen/tree/main/examples/raytrace-parallel

// synchronously, using the browser, import out shim JS scripts
import init, { Encoder } from "./wasm_thread_example.js";

// Wait for the main thread to send us the shared module/memory. Once we've got
// it, initialize it all with the `wasm_bindgen` global we imported via
// `importScripts`.
//
// After our first message all subsequent messages are an entry point to run,
// so we just do that.
let encoder;
let lastCount;
let diffCount = 0;
self.onmessage = async (event) => {
switch (event.data.type) {
case "init": {
await init();
encoder = new Encoder();
self.postMessage({ type: "initResponse" });
break;
}
case "encode": {
encoder.increment();
const count = encoder.count();
if (count !== lastCount) {
lastCount = count;
console.log(count);
diffCount++;
}
break;
}
case "finish": {
const result = await encoder.stop();
console.log("diffCount", diffCount);
self.postMessage({ type: "finishResponse", result });
break;
}
}
};
74 changes: 74 additions & 0 deletions examples/wasm_thread_example/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use std::sync::{atomic::AtomicUsize, mpsc::Sender};

use wasm_bindgen::prelude::*;
use wasm_thread as thread;

#[wasm_bindgen]
pub struct Encoder {
counter_thread: Option<thread::JoinHandle<usize>>,
queue: Option<Sender<()>>,
}

static COUNT: AtomicUsize = AtomicUsize::new(0);

#[wasm_bindgen]
impl Encoder {
#[wasm_bindgen(constructor)]
pub fn new() -> Self {
std::panic::set_hook(Box::new(console_error_panic_hook::hook));
console_log::init_with_level(log::Level::Info).expect("Couldn't initialize logger");

let (queue, receiver) = std::sync::mpsc::channel();

wasm_thread::Builder::default()
.prefix(String::from("gif"))
.set_default();

log::info!("Creating counter thread");

let counter_thread = thread::Builder::new()
.name(String::from("counter"))
.spawn(move || {
log::info!("Counter thread started");
while let Ok(_) = receiver.recv() {
let _old_count = COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}

log::info!("Counter thread stopped");
COUNT.load(std::sync::atomic::Ordering::Relaxed)
})
.unwrap();

log::info!("Created counter thread");

Self {
counter_thread: Some(counter_thread),
queue: Some(queue),
}
}

pub fn count(&self) -> usize {
COUNT.load(std::sync::atomic::Ordering::Relaxed)
}

pub fn increment(&mut self) {
self.queue
.as_ref()
.expect("Encoder was closed")
.send(())
.unwrap();
}

pub async fn stop(&mut self) {
self.queue = None;

match self.counter_thread.take().unwrap().join_async().await {
Ok(count) => {
log::info!("Count: {}", count);
}
Err(e) => {
log::error!("Error: {:?}", e);
}
}
}
}
26 changes: 26 additions & 0 deletions examples/wasm_thread_example/wasm-pack.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/bin/sh

case $1 in
*"-r"*) # Matches -r or --release
CONF="--release -Z build-std-features=panic_immediate_abort"
BINDGEN=""
FILE_PATH="release"
;;
*)
CONF=""
BINDGEN="--keep-debug"
FILE_PATH="debug"
;;
esac

RUSTFLAGS='-C target-feature=+atomics,+bulk-memory,+mutable-globals' \
cargo +nightly build \
--target wasm32-unknown-unknown \
-Z build-std=std,panic_abort \
${CONF}

wasm-bindgen \
target/wasm32-unknown-unknown/${FILE_PATH}/wasm_thread_example.wasm \
${BINDGEN} \
--target web \
--out-dir ./pkg
74 changes: 74 additions & 0 deletions tests/native.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
//! Trivial tests to ensure that native thread API is unchanged.

use std::{
sync::atomic::{AtomicUsize, Ordering},
time::Duration,
};

use wasm_thread as thread;

#[test]
Expand Down Expand Up @@ -38,3 +43,72 @@ fn thread_scope() {
a.push(4);
assert_eq!(x, a.len());
}

#[test]
fn thread_messaging() {
use std::sync::mpsc::{channel, Receiver};

let (tx, rx) = channel();
static ATOMIC_COUNT: AtomicUsize = AtomicUsize::new(0);

fn reader_callback(rx: Receiver<String>) {
while let Ok(_) = rx.recv() {
let old_value = ATOMIC_COUNT.fetch_add(1, Ordering::Relaxed);
if old_value == usize::MAX {
break;
}
thread::sleep(Duration::from_millis(200));
}
}

let reader_thread = thread::Builder::new()
.name(String::from("reader"))
.spawn(|| reader_callback(rx))
.unwrap();

for i in 0..1000 {
tx.send(format!("message {}", i)).unwrap();
}

let _ = thread::spawn(move || {
drop(tx);
thread::sleep(Duration::from_millis(1100));

let value = ATOMIC_COUNT.load(Ordering::Relaxed);
std::assert_eq!(value, 6);
ATOMIC_COUNT.store(usize::MAX, Ordering::Relaxed);
})
.join()
.unwrap();

reader_thread.join().unwrap();
}

#[test]
fn thread_no_join() {
use std::sync::{atomic::AtomicBool, mpsc::channel};

let (tx, rx) = channel();
static ATOMIC_STARTED: AtomicBool = AtomicBool::new(false);

let _ = thread::Builder::new()
.name(String::from("polled"))
.spawn(move || {
ATOMIC_STARTED.store(true, Ordering::Relaxed);
rx.recv().unwrap();
})
.unwrap();

let _ = thread::Builder::new()
.name(String::from("awaiter"))
.spawn(move || {
thread::sleep(Duration::from_millis(1000));
let started = ATOMIC_STARTED.load(Ordering::Relaxed);
std::assert_eq!(started, true);
})
.unwrap()
.join()
.unwrap();

tx.send(()).unwrap();
}
76 changes: 75 additions & 1 deletion tests/wasm.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![cfg(target_arch = "wasm32")]

use core::{
sync::atomic::{AtomicBool, Ordering},
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
time::Duration,
};

Expand Down Expand Up @@ -97,3 +97,77 @@ async fn thread_scope_sync_block() {
.await
.unwrap();
}

#[wasm_bindgen_test]
async fn thread_messaging() {
use std::{
sync::mpsc::{channel, Receiver},
thread as std_thread,
};

let (tx, rx) = channel();
static ATOMIC_COUNT: AtomicUsize = AtomicUsize::new(0);

fn reader_callback(rx: Receiver<String>) {
while let Ok(_) = rx.recv() {
let old_value = ATOMIC_COUNT.fetch_add(1, Ordering::Relaxed);
if old_value == usize::MAX {
break;
}

std_thread::sleep(Duration::from_millis(200));
}
}

let reader_thread = thread::Builder::new()
.name(String::from("reader"))
.spawn(|| reader_callback(rx))
.unwrap();

for i in 0..1000 {
tx.send(format!("message {}", i)).unwrap();
}

let _ = thread::spawn(move || {
std_thread::sleep(Duration::from_millis(1100));

let value = ATOMIC_COUNT.load(Ordering::Relaxed);
std::assert_eq!(value, 6);
ATOMIC_COUNT.store(usize::MAX, Ordering::Relaxed);
})
.join_async()
.await
.unwrap();

reader_thread.join_async().await.unwrap();
}

#[wasm_bindgen_test]
async fn thread_no_join() {
use std::sync::mpsc::channel;

let (tx, rx) = channel();
static ATOMIC_STARTED: AtomicBool = AtomicBool::new(false);

let _ = thread::Builder::new()
.name(String::from("polled"))
.spawn(move || {
ATOMIC_STARTED.store(true, Ordering::Relaxed);
rx.recv().unwrap();
})
.unwrap();

let _ = thread::Builder::new()
.name(String::from("awaiter"))
.spawn(move || {
thread::sleep(Duration::from_millis(1000));
let started = ATOMIC_STARTED.load(Ordering::Relaxed);
std::assert_eq!(started, true);
})
.unwrap()
.join_async()
.await
.unwrap();

tx.send(()).unwrap();
}