307 lines
11 KiB
Rust
307 lines
11 KiB
Rust
// Silences compiler warnings about `Work::func` and `child_entry_point`
// being unused when the target is not wasm.
|
|
#![cfg_attr(not(target_arch = "wasm32"), allow(dead_code))]
|
|
|
|
//! A small module that's intended to provide an example of creating a pool of
//! web workers which can be used to execute `rayon`-style work.
|
|
|
|
/// Formats its arguments like `format!` and forwards the resulting string to
/// the `log` function that is in scope at the call site.
macro_rules! console_log {
    ($($arg:tt)*) => {
        log(&format!($($arg)*))
    };
}
|
|
|
|
#[wasm_bindgen]
|
|
extern "C" {
|
|
#[wasm_bindgen(js_namespace = console)]
|
|
fn log(s: &str);
|
|
#[wasm_bindgen(js_namespace = console, js_name = log)]
|
|
fn logv(x: &JsValue);
|
|
}
|
|
use crate::ThreadEvent;
|
|
use std::cell::RefCell;
|
|
use std::collections::VecDeque;
|
|
use std::rc::Rc;
|
|
use std::sync::{Arc, Mutex};
|
|
use wasm_bindgen::prelude::*;
|
|
use wasm_bindgen::JsCast;
|
|
use web_sys::{CustomEvent, CustomEventInit, ErrorEvent, Event, Worker};
|
|
use web_sys::{DedicatedWorkerGlobalScope, MessageEvent};
|
|
|
|
#[derive(Clone, Debug)]
|
|
pub struct Sender {
|
|
pub event_queue: Arc<Mutex<VecDeque<ThreadEvent>>>,
|
|
}
|
|
|
|
impl Sender {
|
|
pub fn new() -> Self {
|
|
Self {
|
|
event_queue: Arc::new(Mutex::new(VecDeque::with_capacity(5))),
|
|
}
|
|
}
|
|
|
|
pub fn send(&self, e: crate::ThreadEvent) -> melib::Result<()> {
|
|
let mut event_queue_lck = self.event_queue.lock().unwrap();
|
|
event_queue_lck.push_back(e);
|
|
/*
|
|
let b = Box::new(e);
|
|
crate::js_console(&format!("dispatch thread event {:?}", b));
|
|
let ptr = Box::into_raw(b);
|
|
|
|
let event = CustomEvent::new_with_event_init_dict(
|
|
"meli-event",
|
|
CustomEventInit::new().detail(&JsValue::from(ptr as u32)),
|
|
)
|
|
.unwrap();
|
|
let window = web_sys::window().expect("no global `window` exists");
|
|
let document = window.document().expect("should have a document on window");
|
|
|
|
// Manufacture the element we're gonna append
|
|
let el = if let Some(val) = document.get_element_by_id("terminal") {
|
|
val
|
|
} else {
|
|
crate::js_console("COULD NOT GET ELEMENT BY ID #terminal !!!");
|
|
panic!();
|
|
};
|
|
crate::js_console(&format!(
|
|
"dispatch thread event result {:?}",
|
|
el.dispatch_event(&event)
|
|
));
|
|
*/
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[wasm_bindgen]
|
|
#[derive(Clone, Debug)]
|
|
pub struct WorkController {
|
|
pool_state: Rc<PoolState>,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
struct PoolState {
|
|
workers: RefCell<Vec<Worker>>,
|
|
callback: Closure<dyn FnMut(Event)>,
|
|
}
|
|
|
|
/// A unit of work handed to a web worker as a raw pointer.
///
/// It is boxed and leaked in `WorkController::execute` and reconstituted in
/// `child_entry_point`.
struct Work {
    /// The closure to run; called exactly once by `child_entry_point`.
    func: Box<dyn FnOnce() + Send + 'static>,
}
|
|
|
|
#[wasm_bindgen]
|
|
impl WorkController {
|
|
/// Creates a new `WorkController` which immediately creates `initial` workers.
|
|
///
|
|
/// The pool created here can be used over a long period of time, and it
|
|
/// will be initially primed with `initial` workers. Currently workers are
|
|
/// never released or gc'd until the whole pool is destroyed.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns any error that may happen while a JS web worker is created and a
|
|
/// message is sent to it.
|
|
#[wasm_bindgen(constructor)]
|
|
pub fn new(initial: usize) -> Result<WorkController, JsValue> {
|
|
let pool = WorkController {
|
|
pool_state: Rc::new(PoolState {
|
|
workers: RefCell::new(Vec::with_capacity(initial)),
|
|
callback: Closure::wrap(Box::new(|event: Event| {
|
|
console_log!("unhandled event: {}", event.type_());
|
|
logv(&event);
|
|
}) as Box<dyn FnMut(Event)>),
|
|
}),
|
|
};
|
|
for _ in 0..initial {
|
|
//let worker = pool.spawn()?;
|
|
//pool.pool_state.push(worker);
|
|
}
|
|
|
|
Ok(pool)
|
|
}
|
|
|
|
/// Unconditionally spawns a new worker
|
|
///
|
|
/// The worker isn't registered with this `WorkController` but is capable of
|
|
/// executing work for this wasm module.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns any error that may happen while a JS web worker is created and a
|
|
/// message is sent to it.
|
|
fn spawn(&self) -> Result<Worker, JsValue> {
|
|
console_log!("spawning new worker");
|
|
// TODO: what do do about `./worker.js`:
|
|
//
|
|
// * the path is only known by the bundler. How can we, as a
|
|
// library, know what's going on?
|
|
// * How do we not fetch a script N times? It internally then
|
|
// causes another script to get fetched N times...
|
|
let worker = Worker::new("./worker.js")?;
|
|
|
|
// With a worker spun up send it the module/memory so it can start
|
|
// instantiating the wasm module. Later it might receive further
|
|
// messages about code to run on the wasm module.
|
|
let array = js_sys::Array::new();
|
|
array.push(&wasm_bindgen::module());
|
|
array.push(&wasm_bindgen::memory());
|
|
worker.post_message(&array)?;
|
|
|
|
Ok(worker)
|
|
}
|
|
|
|
/// Fetches a worker from this pool, spawning one if necessary.
|
|
///
|
|
/// This will attempt to pull an already-spawned web worker from our cache
|
|
/// if one is available, otherwise it will spawn a new worker and return the
|
|
/// newly spawned worker.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns any error that may happen while a JS web worker is created and a
|
|
/// message is sent to it.
|
|
fn worker(&self) -> Result<Worker, JsValue> {
|
|
match self.pool_state.workers.borrow_mut().pop() {
|
|
Some(worker) => Ok(worker),
|
|
None => self.spawn(),
|
|
}
|
|
}
|
|
|
|
/// Executes the work `f` in a web worker, spawning a web worker if
|
|
/// necessary.
|
|
///
|
|
/// This will acquire a web worker and then send the closure `f` to the
|
|
/// worker to execute. The worker won't be usable for anything else while
|
|
/// `f` is executing, and no callbacks are registered for when the worker
|
|
/// finishes.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// Returns any error that may happen while a JS web worker is created and a
|
|
/// message is sent to it.
|
|
fn execute(&self, f: impl FnOnce() + Send + 'static) -> Result<Worker, JsValue> {
|
|
let worker = self.worker()?;
|
|
let work = Box::new(Work { func: Box::new(f) });
|
|
let ptr = Box::into_raw(work);
|
|
match worker.post_message(&JsValue::from(ptr as u32)) {
|
|
Ok(()) => Ok(worker),
|
|
Err(e) => {
|
|
unsafe {
|
|
drop(Box::from_raw(ptr));
|
|
}
|
|
Err(e)
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Configures an `onmessage` callback for the `worker` specified for the
|
|
/// web worker to be reclaimed and re-inserted into this pool when a message
|
|
/// is received.
|
|
///
|
|
/// Currently this `WorkController` abstraction is intended to execute one-off
|
|
/// style work where the work itself doesn't send any notifications and
|
|
/// whatn it's done the worker is ready to execute more work. This method is
|
|
/// used for all spawned workers to ensure that when the work is finished
|
|
/// the worker is reclaimed back into this pool.
|
|
fn reclaim_on_message(&self, worker: Worker) {
|
|
let state = Rc::downgrade(&self.pool_state);
|
|
let worker2 = worker.clone();
|
|
let reclaim_slot = Rc::new(RefCell::new(None));
|
|
let slot2 = reclaim_slot.clone();
|
|
let reclaim = Closure::wrap(Box::new(move |event: Event| {
|
|
if let Some(error) = event.dyn_ref::<ErrorEvent>() {
|
|
console_log!("error in worker: {}", error.message());
|
|
// TODO: this probably leaks memory somehow? It's sort of
|
|
// unclear what to do about errors in workers right now.
|
|
return;
|
|
}
|
|
|
|
// If this is a completion event then can deallocate our own
|
|
// callback by clearing out `slot2` which contains our own closure.
|
|
if let Some(_msg) = event.dyn_ref::<MessageEvent>() {
|
|
if let Some(state) = state.upgrade() {
|
|
state.push(worker2.clone());
|
|
}
|
|
*slot2.borrow_mut() = None;
|
|
return;
|
|
}
|
|
|
|
console_log!("unhandled event: {}", event.type_());
|
|
logv(&event);
|
|
// TODO: like above, maybe a memory leak here?
|
|
}) as Box<dyn FnMut(Event)>);
|
|
worker.set_onmessage(Some(reclaim.as_ref().unchecked_ref()));
|
|
*reclaim_slot.borrow_mut() = Some(reclaim);
|
|
}
|
|
}
|
|
|
|
impl WorkController {
|
|
/// Executes `f` in a web worker.
|
|
///
|
|
/// This pool manages a set of web workers to draw from, and `f` will be
|
|
/// spawned quickly into one if the worker is idle. If no idle workers are
|
|
/// available then a new web worker will be spawned.
|
|
///
|
|
/// Once `f` returns the worker assigned to `f` is automatically reclaimed
|
|
/// by this `WorkController`. This method provides no method of learning when
|
|
/// `f` completes, and for that you'll need to use `run_notify`.
|
|
///
|
|
/// # Errors
|
|
///
|
|
/// If an error happens while spawning a web worker or sending a message to
|
|
/// a web worker, that error is returned.
|
|
pub fn run(&self, f: impl FnOnce() + Send + 'static) -> Result<(), JsValue> {
|
|
let worker = self.execute(f)?;
|
|
self.reclaim_on_message(worker);
|
|
Ok(())
|
|
}
|
|
|
|
pub fn send(&self, e: crate::UIEvent) {
|
|
let b = Box::new(ThreadEvent::UIEvent(e));
|
|
crate::js_console(&format!("dispatch thread event {:?}", b));
|
|
let ptr = Box::into_raw(b);
|
|
|
|
let event = CustomEvent::new_with_event_init_dict(
|
|
"meli-event",
|
|
CustomEventInit::new().detail(&JsValue::from(ptr as u32)),
|
|
)
|
|
.unwrap();
|
|
let window = web_sys::window().expect("no global `window` exists");
|
|
let document = window.document().expect("should have a document on window");
|
|
|
|
// Manufacture the element we're gonna append
|
|
let el = if let Some(val) = document.get_element_by_id("terminal") {
|
|
val
|
|
} else {
|
|
crate::js_console("COULD NOT GET ELEMENT BY ID #terminal !!!");
|
|
panic!();
|
|
};
|
|
crate::js_console(&format!(
|
|
"dispatch thread event result {:?}",
|
|
el.dispatch_event(&event)
|
|
));
|
|
}
|
|
}
|
|
|
|
impl PoolState {
|
|
fn push(&self, worker: Worker) {
|
|
worker.set_onmessage(Some(self.callback.as_ref().unchecked_ref()));
|
|
worker.set_onerror(Some(self.callback.as_ref().unchecked_ref()));
|
|
let mut workers = self.workers.borrow_mut();
|
|
for prev in workers.iter() {
|
|
let prev: &JsValue = prev;
|
|
let worker: &JsValue = &worker;
|
|
assert!(prev != worker);
|
|
}
|
|
workers.push(worker);
|
|
}
|
|
}
|
|
|
|
/// Entry point invoked by `worker.js`, a bit of a hack but see the "TODO" above
|
|
/// about `worker.js` in general.
|
|
#[wasm_bindgen]
|
|
pub fn child_entry_point(ptr: u32) -> Result<(), JsValue> {
|
|
let ptr = unsafe { Box::from_raw(ptr as *mut Work) };
|
|
let global = js_sys::global().unchecked_into::<DedicatedWorkerGlobalScope>();
|
|
(ptr.func)();
|
|
global.post_message(&JsValue::undefined())?;
|
|
Ok(())
|
|
}
|