/* * meli - jobs executor * * Copyright 2020 Manos Pitsidianakis * * This file is part of meli. * * meli is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * meli is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with meli. If not, see . */ //! Async job executor thread pool //! //! ## Usage //! ```no_run //! let (channel, handle, job_id) = job_executor.spawn(job); //! ``` use melib::smol; use melib::uuid::Uuid; use std::collections::HashMap; use std::future::Future; use std::panic::catch_unwind; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; use crate::types::{ThreadEvent, UIEvent}; use crossbeam::channel::Sender; use crossbeam::deque::{Injector, Stealer, Worker}; use crossbeam::sync::{Parker, Unparker}; pub use futures::channel::oneshot; use std::iter; type AsyncTask = async_task::Runnable; fn find_task(local: &Worker, global: &Injector, stealers: &[Stealer]) -> Option { // Pop a task from the local queue, if not empty. local.pop().or_else(|| { // Otherwise, we need to look for a task elsewhere. iter::repeat_with(|| { // Try stealing a batch of tasks from the global queue. global .steal_batch_and_pop(local) // Or try stealing a task from one of the other threads. .or_else(|| stealers.iter().map(|s| s.steal()).collect()) }) // Loop while no task was stolen and any steal operation needs to be retried. .find(|s| !s.is_retry()) // Extract the stolen task, if there is one. .and_then(|s| s.success()) }) } macro_rules! uuid_hash_type { ($n:ident) => { #[derive( PartialEq, Hash, Eq, Copy, Clone, Ord, PartialOrd, Serialize, Deserialize, Default, )] pub struct $n(Uuid); impl core::fmt::Debug for $n { fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { write!(f, "{}", self.0.to_string()) } } impl core::fmt::Display for $n { fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { write!(f, "{}", self.0.to_string()) } } impl $n { pub fn new() -> Self { $n(Uuid::new_v4()) } pub fn null() -> Self { $n(Uuid::nil()) } } }; } uuid_hash_type!(JobId); /// A spawned future and its current state. pub struct MeliTask { task: AsyncTask, id: JobId, timer: bool, } #[derive(Debug)] pub struct JobExecutor { global_queue: Arc>, workers: Vec>, sender: Sender, parkers: Vec, timers: Arc>>, } #[derive(Debug, Default)] struct TimerPrivate { /// Interval for periodic timer. interval: Duration, /// Time until next expiration. value: Duration, active: bool, handle: Option>, cancel: Arc>, } #[derive(Debug)] pub struct Timer { id: Uuid, job_executor: Arc, } impl Timer { pub fn id(&self) -> Uuid { self.id } pub fn rearm(&self) { self.job_executor.rearm(self.id); } pub fn disable(&self) { self.job_executor.disable_timer(self.id); } pub fn set_interval(&self, new_val: Duration) { self.job_executor.set_interval(self.id, new_val); } } impl Drop for Timer { fn drop(&mut self) { self.disable(); } } impl JobExecutor { /// A queue that holds scheduled tasks. pub fn new(sender: Sender) -> Self { // Create a queue. let mut ret = JobExecutor { global_queue: Arc::new(Injector::new()), workers: vec![], parkers: vec![], sender, timers: Arc::new(Mutex::new(HashMap::default())), }; let mut workers = vec![]; for _ in 0..num_cpus::get().max(1) { let new_worker = Worker::new_fifo(); ret.workers.push(new_worker.stealer()); let p = Parker::new(); ret.parkers.push(p.unparker().clone()); workers.push((new_worker, p)); } // Reactor thread thread::Builder::new() .name("meli-reactor".to_string()) .spawn(move || { let ex = smol::Executor::new(); futures::executor::block_on(ex.run(futures::future::pending::<()>())); }) .unwrap(); // Spawn executor threads the first time the queue is created. for (i, (local, parker)) in workers.into_iter().enumerate() { let global = ret.global_queue.clone(); let stealers = ret.workers.clone(); thread::Builder::new() .name(format!("meli-executor-{}", i)) .spawn(move || loop { parker.park_timeout(Duration::from_millis(100)); let task = find_task(&local, &global, stealers.as_slice()); if let Some(meli_task) = task { let MeliTask { task, id, timer } = meli_task; if !timer { debug!("Worker {} got task {:?}", i, id); } let _ = catch_unwind(|| task.run()); if !timer { debug!("Worker {} returned after {:?}", i, id); } } }) .unwrap(); } ret } /// Spawns a future with a generic return value `R` pub fn spawn_specialized(&self, future: F) -> JoinHandle where F: Future + Send + 'static, R: Send + 'static, { let (sender, receiver) = oneshot::channel(); let finished_sender = self.sender.clone(); let job_id = JobId::new(); let injector = self.global_queue.clone(); let cancel = Arc::new(Mutex::new(false)); let cancel2 = cancel.clone(); // Create a task and schedule it for execution. let (handle, task) = async_task::spawn( async move { let res = future.await; let _ = sender.send(res); finished_sender .send(ThreadEvent::JobFinished(job_id)) .unwrap(); }, move |task| { if *cancel.lock().unwrap() { return; } injector.push(MeliTask { task, id: job_id, timer: false, }) }, ); handle.schedule(); for unparker in self.parkers.iter() { unparker.unpark(); } JoinHandle { task: Arc::new(Mutex::new(Some(task))), cancel: cancel2, chan: receiver, job_id, } } /// Spawns a future with a generic return value `R` that might block on a new thread pub fn spawn_blocking(&self, future: F) -> JoinHandle where F: Future + Send + 'static, R: Send + 'static, { self.spawn_specialized(smol::unblock(move || futures::executor::block_on(future))) } pub fn create_timer(self: Arc, interval: Duration, value: Duration) -> Timer { let id = Uuid::new_v4(); let timer = TimerPrivate { interval, cancel: Arc::new(Mutex::new(false)), value, active: true, handle: None, }; self.timers.lock().unwrap().insert(id, timer); self.arm_timer(id, value); Timer { id, job_executor: self, } } pub fn rearm(&self, timer_id: Uuid) { let mut timers_lck = self.timers.lock().unwrap(); if let Some(timer) = timers_lck.get_mut(&timer_id) { let value = timer.value; drop(timers_lck); self.arm_timer(timer_id, value); } } fn arm_timer(&self, id: Uuid, value: Duration) { let job_id = JobId::new(); let sender = self.sender.clone(); let injector = self.global_queue.clone(); let timers = self.timers.clone(); let cancel = Arc::new(Mutex::new(false)); let cancel2 = cancel.clone(); let (task, handle) = async_task::spawn( async move { let mut value = value; loop { smol::Timer::after(value).await; sender .send(ThreadEvent::UIEvent(UIEvent::Timer(id))) .unwrap(); if let Some(interval) = timers.lock().unwrap().get(&id).and_then(|timer| { if timer.interval.as_millis() == 0 && timer.interval.as_secs() == 0 { None } else if timer.active { Some(timer.interval) } else { None } }) { value = interval; } else { break; } } }, move |task| { if *cancel.lock().unwrap() { return; } injector.push(MeliTask { task, id: job_id, timer: true, }) }, ); self.timers.lock().unwrap().entry(id).and_modify(|timer| { timer.handle = Some(handle); timer.cancel = cancel2; timer.active = true; }); task.schedule(); for unparker in self.parkers.iter() { unparker.unpark(); } } fn disable_timer(&self, id: Uuid) { let mut timers_lck = self.timers.lock().unwrap(); if let Some(timer) = timers_lck.get_mut(&id) { timer.active = false; *timer.cancel.lock().unwrap() = true; } } fn set_interval(&self, id: Uuid, new_val: Duration) { let mut timers_lck = self.timers.lock().unwrap(); if let Some(timer) = timers_lck.get_mut(&id) { timer.interval = new_val; } } } pub type JobChannel = oneshot::Receiver; #[derive(Debug)] /// JoinHandle for the future that allows us to cancel the task. pub struct JoinHandle { pub task: Arc>>>, pub chan: JobChannel, pub cancel: Arc>, pub job_id: JobId, } impl JoinHandle { pub fn cancel(&self) { *self.cancel.lock().unwrap() = true; } } impl std::cmp::PartialEq for JoinHandle { fn eq(&self, other: &JobId) -> bool { self.job_id == *other } } /* use std::pin::Pin; use std::task::{Context, Poll}; impl Future for JoinHandle { type Output = Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match Pin::new(&mut self.inner).poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(output) => Poll::Ready(output.expect("task failed")), } } } */