From 4f3a98f90ad6f990b7ae471d9d160aa0a05fd04a Mon Sep 17 00:00:00 2001 From: Manos Pitsidianakis Date: Fri, 26 Jun 2020 18:31:37 +0300 Subject: [PATCH] Add job executor --- Cargo.toml | 3 + src/bin.rs | 5 + src/conf/accounts.rs | 3 + src/jobs1.rs | 242 +++++++++++++++++++++++++++++++++++++++++++ src/managesieve.rs | 1 + src/state.rs | 5 + src/types.rs | 2 + 7 files changed, 261 insertions(+) create mode 100644 src/jobs1.rs diff --git a/Cargo.toml b/Cargo.toml index 36822e4c..43a62165 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,9 @@ bitflags = "1.0" pcre2 = { version = "0.2.3", optional = true } structopt = { version = "0.3.14", default-features = false } svg_crate = { version = "0.8.0", optional = true, package = "svg" } +futures = "0.3.5" +async-task = "3.0.0" +num_cpus = "1.12.0" [build-dependencies] syn = { version = "1.0.31", features = [] } diff --git a/src/bin.rs b/src/bin.rs index 8ecbe24b..9b8ac7e2 100644 --- a/src/bin.rs +++ b/src/bin.rs @@ -80,6 +80,7 @@ use crate::workers::*; #[cfg(feature = "sqlite3")] pub mod sqlite3; +pub mod jobs1; pub mod mailcap; pub mod plugins; @@ -463,6 +464,10 @@ fn run_app(opt: Opt) -> Result<()> { ThreadEvent::NewThread(id, name) => { state.new_thread(id, name); }, + ThreadEvent::JobFinished(id) => { + debug!("Job finished {}", id); + //state.new_thread(id, name); + }, } }, recv(signal_recvr) -> sig => { diff --git a/src/conf/accounts.rs b/src/conf/accounts.rs index 1fe29380..2f7f0770 100644 --- a/src/conf/accounts.rs +++ b/src/conf/accounts.rs @@ -24,6 +24,7 @@ */ use super::{AccountConf, FileMailboxConf}; +use crate::jobs1::JobExecutor; use melib::async_workers::{Async, AsyncBuilder, AsyncStatus, WorkContext}; use melib::backends::{ AccountHash, BackendOp, Backends, MailBackend, Mailbox, MailboxHash, NotifyFn, ReadOnlyOp, @@ -41,6 +42,7 @@ use std::collections::{HashMap, HashSet}; use crate::types::UIEvent::{self, EnvelopeRemove, EnvelopeRename, EnvelopeUpdate, Notification}; use crate::{StatusEvent, ThreadEvent}; use crossbeam::Sender; +pub use futures::stream::Stream; use std::collections::VecDeque; use std::fs; use std::io; @@ -193,6 +195,7 @@ impl Account { mut settings: AccountConf, map: &Backends, work_context: WorkContext, + job_executor: &JobExecutor, sender: Sender, notify_fn: NotifyFn, ) -> Result { diff --git a/src/jobs1.rs b/src/jobs1.rs new file mode 100644 index 00000000..f9b002cc --- /dev/null +++ b/src/jobs1.rs @@ -0,0 +1,242 @@ +/* + * meli - jobs executor + * + * Copyright 2020 Manos Pitsidianakis + * + * This file is part of meli. + * + * meli is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * meli is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with meli. If not, see . + */ + +use std::future::Future; +use std::panic::catch_unwind; +use std::time::Duration; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::thread; +use uuid::Uuid; + +use crossbeam::channel; +use crossbeam::deque::{Injector, Steal, Stealer, Worker}; +use std::iter; +use crossbeam::Sender; +use crate::types::ThreadEvent; +use futures::channel::oneshot; +use crossbeam::sync::{Unparker, Parker}; + +type AsyncTask = async_task::Task<()>; + +fn find_task( + local: &Worker, + global: &Injector, + stealers: &[Stealer], +) -> Option { + // Pop a task from the local queue, if not empty. + local.pop().or_else(|| { + // Otherwise, we need to look for a task elsewhere. + iter::repeat_with(|| { + // Try stealing a batch of tasks from the global queue. + global.steal_batch_and_pop(local) + // Or try stealing a task from one of the other threads. + .or_else(|| stealers.iter().map(|s| s.steal()).collect()) + }) + // Loop while no task was stolen and any steal operation needs to be retried. + .find(|s| !s.is_retry()) + // Extract the stolen task, if there is one. + .and_then(|s| s.success()) + }) +} + +macro_rules! uuid_hash_type { + ($n:ident) => { + #[derive(PartialEq, Hash, Eq, Copy, Clone, Serialize, Deserialize, Default)] + pub struct $n(Uuid); + + impl core::fmt::Debug for $n { + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + write!(f, "{}", self.0.to_string()) + } + } + + impl core::fmt::Display for $n { + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + write!(f, "{}", self.0.to_string()) + } + } + + impl $n { + fn new() -> Self { + $n(Uuid::new_v4()) + } + pub fn null() -> Self { + $n(Uuid::nil()) + } + } + }; +} +uuid_hash_type!(JobId); + +/// A spawned future and its current state. +pub struct MeliTask { + task: AsyncTask, + id: JobId, +} + +pub struct JobExecutor { + active_jobs: Vec, + global_queue: Arc>, + workers: Vec>, + sender: Sender, + parkers: Vec, +} + +impl JobExecutor { +/// A queue that holds scheduled tasks. + pub fn new(sender: Sender) -> Self { + // Create a queue. + let mut ret = JobExecutor { + active_jobs: vec![], + global_queue: Arc::new(Injector::new()), + workers: vec![], + parkers: vec![], + sender, + }; + let mut workers = vec![]; + for _ in 0..num_cpus::get().max(1) { + let new_worker = Worker::new_fifo(); + ret.workers.push(new_worker.stealer()); + let p = Parker::new(); +ret.parkers.push(p.unparker().clone()); + workers.push((new_worker, p)); + } + + + + // Spawn executor threads the first time the queue is created. + for (i, (local, parker)) in workers.into_iter().enumerate() { + let sender = ret.sender.clone(); + let global = ret.global_queue.clone(); + let stealers = ret.workers.clone(); + thread::Builder::new() + .name(format!("meli executor thread {}", i)) + .spawn(move || { + loop { + parker.park_timeout(Duration::from_millis(100)); + let task = find_task(&local, &global, stealers.as_slice()); + if let Some(meli_task) = task { + let MeliTask { task, id} = meli_task; + debug!("Worker {} got task {:?}", i,id); + if let Ok(false) = catch_unwind(|| task.run()) { + debug!("Worker {} got result {:?}", i,id); + sender.send(ThreadEvent::JobFinished(id)).unwrap(); + } else { + debug!("Worker {} rescheduled {:?}", i,id); + } + } + } + }); + } + ret + } + /// Spawns a future on the executor. + pub fn spawn(&self, future: F) -> JoinHandle + where + F: Future + Send + 'static, + { + let job_id = JobId::new(); + let _job_id = job_id.clone(); + let injector = self.global_queue.clone(); + // Create a task and schedule it for execution. + let (task, handle) = async_task::spawn(async { + let _ = future.await; + }, move |task| injector.push(MeliTask { task, id: _job_id }), ()); + task.schedule(); + for unparker in self.parkers.iter() { + unparker.unpark(); + } + + // Return a join handle that retrieves the output of the future. + JoinHandle(handle) + } + +///// Spawns a future on the executor. +pub fn spawn_specialized(&self, future: F) -> (oneshot::Receiver, JobId) +where + F: Future + Send + 'static, + R: Send + 'static, +{ + let (sender, receiver) = oneshot::channel(); + let job_id = JobId::new(); + let _job_id = job_id.clone(); + let injector = self.global_queue.clone(); + // Create a task and schedule it for execution. + let (task, handle) = async_task::spawn(async { + let res = future.await; + sender.send(res); + }, move |task| injector.push(MeliTask { task, id: _job_id }), ()); + task.schedule(); + for unparker in self.parkers.iter() { + unparker.unpark(); + } + + (receiver, job_id) +} + +} + + +///// Spawns a future on the executor. +//fn spawn(future: F) -> JoinHandle +//where +// F: Future + Send + 'static, +// R: Send + 'static, +//{ +// // Create a task and schedule it for execution. +// let (task, handle) = async_task::spawn(future, |t| QUEUE.send(t).unwrap(), ()); +// task.schedule(); +// +// // Return a join handle that retrieves the output of the future. +// JoinHandle(handle) +//} + +/// Awaits the output of a spawned future. +pub struct JoinHandle( async_task::JoinHandle<(), ()>); + +impl Future for JoinHandle { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match Pin::new(&mut self.0).poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(output) => Poll::Ready(output.expect("task failed")), + } + } +} + +/* +fn _test() { + let executor = JobExecutor::new(); + futures::executor::block_on(async { + // Spawn a future. + let handle = executor.spawn(async { + println!("Running task..."); + panic!(); + }); + + // Await its output. + handle.await; + }); +} +*/ diff --git a/src/managesieve.rs b/src/managesieve.rs index 6b4a54ee..a99e8feb 100644 --- a/src/managesieve.rs +++ b/src/managesieve.rs @@ -69,6 +69,7 @@ use crate::workers::*; #[cfg(feature = "sqlite3")] pub mod sqlite3; +pub mod jobs1; pub mod mailcap; pub mod plugins; diff --git a/src/state.rs b/src/state.rs index 8b60e421..12df6e02 100644 --- a/src/state.rs +++ b/src/state.rs @@ -32,6 +32,7 @@ use super::*; use crate::plugins::PluginManager; use melib::backends::{AccountHash, MailboxHash, NotifyFn}; +use crate::jobs1::JobExecutor; use crossbeam::channel::{unbounded, Receiver, Sender}; use smallvec::SmallVec; use std::collections::HashMap; @@ -95,6 +96,7 @@ pub struct Context { receiver: Receiver, input: InputHandler, work_controller: WorkController, + job_executor: JobExecutor, pub children: Vec, pub plugin_manager: PluginManager, @@ -244,6 +246,7 @@ impl State { let mut account_hashes = HashMap::with_capacity_and_hasher(1, Default::default()); let work_controller = WorkController::new(sender.clone()); + let job_executor = JobExecutor::new(sender.clone()); let accounts: Vec = { let mut file_accs = settings .accounts @@ -271,6 +274,7 @@ impl State { a_s.clone(), &backends, work_controller.get_context(), + &job_executor, sender.clone(), NotifyFn::new(Box::new(move |f: MailboxHash| { sender @@ -331,6 +335,7 @@ impl State { replies: VecDeque::with_capacity(5), temp_files: Vec::new(), work_controller, + job_executor, children: vec![], plugin_manager, diff --git a/src/types.rs b/src/types.rs index 08244d4e..a9b08e06 100644 --- a/src/types.rs +++ b/src/types.rs @@ -36,6 +36,7 @@ mod helpers; pub use self::helpers::*; use super::execute::Action; +use super::jobs1::JobId; use super::terminal::*; use melib::backends::{AccountHash, MailboxHash}; @@ -68,6 +69,7 @@ pub enum ThreadEvent { /// A thread has updated some of its information Pulse, //Decode { _ }, // For gpg2 signature check + JobFinished(JobId), } impl From for ThreadEvent {