Add job executor
parent
de201b5d6c
commit
4f3a98f90a
|
@ -51,6 +51,9 @@ bitflags = "1.0"
|
||||||
pcre2 = { version = "0.2.3", optional = true }
|
pcre2 = { version = "0.2.3", optional = true }
|
||||||
structopt = { version = "0.3.14", default-features = false }
|
structopt = { version = "0.3.14", default-features = false }
|
||||||
svg_crate = { version = "0.8.0", optional = true, package = "svg" }
|
svg_crate = { version = "0.8.0", optional = true, package = "svg" }
|
||||||
|
futures = "0.3.5"
|
||||||
|
async-task = "3.0.0"
|
||||||
|
num_cpus = "1.12.0"
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
syn = { version = "1.0.31", features = [] }
|
syn = { version = "1.0.31", features = [] }
|
||||||
|
|
|
@ -80,6 +80,7 @@ use crate::workers::*;
|
||||||
#[cfg(feature = "sqlite3")]
|
#[cfg(feature = "sqlite3")]
|
||||||
pub mod sqlite3;
|
pub mod sqlite3;
|
||||||
|
|
||||||
|
pub mod jobs1;
|
||||||
pub mod mailcap;
|
pub mod mailcap;
|
||||||
pub mod plugins;
|
pub mod plugins;
|
||||||
|
|
||||||
|
@ -463,6 +464,10 @@ fn run_app(opt: Opt) -> Result<()> {
|
||||||
ThreadEvent::NewThread(id, name) => {
|
ThreadEvent::NewThread(id, name) => {
|
||||||
state.new_thread(id, name);
|
state.new_thread(id, name);
|
||||||
},
|
},
|
||||||
|
ThreadEvent::JobFinished(id) => {
|
||||||
|
debug!("Job finished {}", id);
|
||||||
|
//state.new_thread(id, name);
|
||||||
|
},
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
recv(signal_recvr) -> sig => {
|
recv(signal_recvr) -> sig => {
|
||||||
|
|
|
@ -24,6 +24,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
use super::{AccountConf, FileMailboxConf};
|
use super::{AccountConf, FileMailboxConf};
|
||||||
|
use crate::jobs1::JobExecutor;
|
||||||
use melib::async_workers::{Async, AsyncBuilder, AsyncStatus, WorkContext};
|
use melib::async_workers::{Async, AsyncBuilder, AsyncStatus, WorkContext};
|
||||||
use melib::backends::{
|
use melib::backends::{
|
||||||
AccountHash, BackendOp, Backends, MailBackend, Mailbox, MailboxHash, NotifyFn, ReadOnlyOp,
|
AccountHash, BackendOp, Backends, MailBackend, Mailbox, MailboxHash, NotifyFn, ReadOnlyOp,
|
||||||
|
@ -41,6 +42,7 @@ use std::collections::{HashMap, HashSet};
|
||||||
use crate::types::UIEvent::{self, EnvelopeRemove, EnvelopeRename, EnvelopeUpdate, Notification};
|
use crate::types::UIEvent::{self, EnvelopeRemove, EnvelopeRename, EnvelopeUpdate, Notification};
|
||||||
use crate::{StatusEvent, ThreadEvent};
|
use crate::{StatusEvent, ThreadEvent};
|
||||||
use crossbeam::Sender;
|
use crossbeam::Sender;
|
||||||
|
pub use futures::stream::Stream;
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::fs;
|
use std::fs;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
@ -193,6 +195,7 @@ impl Account {
|
||||||
mut settings: AccountConf,
|
mut settings: AccountConf,
|
||||||
map: &Backends,
|
map: &Backends,
|
||||||
work_context: WorkContext,
|
work_context: WorkContext,
|
||||||
|
job_executor: &JobExecutor,
|
||||||
sender: Sender<ThreadEvent>,
|
sender: Sender<ThreadEvent>,
|
||||||
notify_fn: NotifyFn,
|
notify_fn: NotifyFn,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
|
|
|
@ -0,0 +1,242 @@
|
||||||
|
/*
|
||||||
|
* meli - jobs executor
|
||||||
|
*
|
||||||
|
* Copyright 2020 Manos Pitsidianakis
|
||||||
|
*
|
||||||
|
* This file is part of meli.
|
||||||
|
*
|
||||||
|
* meli is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation, either version 3 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* meli is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with meli. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
use std::future::Future;
|
||||||
|
use std::panic::catch_unwind;
|
||||||
|
use std::time::Duration;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
use std::thread;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use crossbeam::channel;
|
||||||
|
use crossbeam::deque::{Injector, Steal, Stealer, Worker};
|
||||||
|
use std::iter;
|
||||||
|
use crossbeam::Sender;
|
||||||
|
use crate::types::ThreadEvent;
|
||||||
|
use futures::channel::oneshot;
|
||||||
|
use crossbeam::sync::{Unparker, Parker};
|
||||||
|
|
||||||
|
type AsyncTask = async_task::Task<()>;
|
||||||
|
|
||||||
|
fn find_task<T>(
|
||||||
|
local: &Worker<T>,
|
||||||
|
global: &Injector<T>,
|
||||||
|
stealers: &[Stealer<T>],
|
||||||
|
) -> Option<T> {
|
||||||
|
// Pop a task from the local queue, if not empty.
|
||||||
|
local.pop().or_else(|| {
|
||||||
|
// Otherwise, we need to look for a task elsewhere.
|
||||||
|
iter::repeat_with(|| {
|
||||||
|
// Try stealing a batch of tasks from the global queue.
|
||||||
|
global.steal_batch_and_pop(local)
|
||||||
|
// Or try stealing a task from one of the other threads.
|
||||||
|
.or_else(|| stealers.iter().map(|s| s.steal()).collect())
|
||||||
|
})
|
||||||
|
// Loop while no task was stolen and any steal operation needs to be retried.
|
||||||
|
.find(|s| !s.is_retry())
|
||||||
|
// Extract the stolen task, if there is one.
|
||||||
|
.and_then(|s| s.success())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
macro_rules! uuid_hash_type {
|
||||||
|
($n:ident) => {
|
||||||
|
#[derive(PartialEq, Hash, Eq, Copy, Clone, Serialize, Deserialize, Default)]
|
||||||
|
pub struct $n(Uuid);
|
||||||
|
|
||||||
|
impl core::fmt::Debug for $n {
|
||||||
|
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
|
||||||
|
write!(f, "{}", self.0.to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl core::fmt::Display for $n {
|
||||||
|
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
|
||||||
|
write!(f, "{}", self.0.to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl $n {
|
||||||
|
fn new() -> Self {
|
||||||
|
$n(Uuid::new_v4())
|
||||||
|
}
|
||||||
|
pub fn null() -> Self {
|
||||||
|
$n(Uuid::nil())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
uuid_hash_type!(JobId);
|
||||||
|
|
||||||
|
/// A spawned future and its current state.
|
||||||
|
pub struct MeliTask {
|
||||||
|
task: AsyncTask,
|
||||||
|
id: JobId,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct JobExecutor {
|
||||||
|
active_jobs: Vec<JobId>,
|
||||||
|
global_queue: Arc<Injector<MeliTask>>,
|
||||||
|
workers: Vec<Stealer<MeliTask>>,
|
||||||
|
sender: Sender<ThreadEvent>,
|
||||||
|
parkers: Vec<Unparker>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl JobExecutor {
|
||||||
|
/// A queue that holds scheduled tasks.
|
||||||
|
pub fn new(sender: Sender<ThreadEvent>) -> Self {
|
||||||
|
// Create a queue.
|
||||||
|
let mut ret = JobExecutor {
|
||||||
|
active_jobs: vec![],
|
||||||
|
global_queue: Arc::new(Injector::new()),
|
||||||
|
workers: vec![],
|
||||||
|
parkers: vec![],
|
||||||
|
sender,
|
||||||
|
};
|
||||||
|
let mut workers = vec![];
|
||||||
|
for _ in 0..num_cpus::get().max(1) {
|
||||||
|
let new_worker = Worker::new_fifo();
|
||||||
|
ret.workers.push(new_worker.stealer());
|
||||||
|
let p = Parker::new();
|
||||||
|
ret.parkers.push(p.unparker().clone());
|
||||||
|
workers.push((new_worker, p));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
// Spawn executor threads the first time the queue is created.
|
||||||
|
for (i, (local, parker)) in workers.into_iter().enumerate() {
|
||||||
|
let sender = ret.sender.clone();
|
||||||
|
let global = ret.global_queue.clone();
|
||||||
|
let stealers = ret.workers.clone();
|
||||||
|
thread::Builder::new()
|
||||||
|
.name(format!("meli executor thread {}", i))
|
||||||
|
.spawn(move || {
|
||||||
|
loop {
|
||||||
|
parker.park_timeout(Duration::from_millis(100));
|
||||||
|
let task = find_task(&local, &global, stealers.as_slice());
|
||||||
|
if let Some(meli_task) = task {
|
||||||
|
let MeliTask { task, id} = meli_task;
|
||||||
|
debug!("Worker {} got task {:?}", i,id);
|
||||||
|
if let Ok(false) = catch_unwind(|| task.run()) {
|
||||||
|
debug!("Worker {} got result {:?}", i,id);
|
||||||
|
sender.send(ThreadEvent::JobFinished(id)).unwrap();
|
||||||
|
} else {
|
||||||
|
debug!("Worker {} rescheduled {:?}", i,id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
ret
|
||||||
|
}
|
||||||
|
/// Spawns a future on the executor.
|
||||||
|
pub fn spawn<F>(&self, future: F) -> JoinHandle
|
||||||
|
where
|
||||||
|
F: Future<Output = ()> + Send + 'static,
|
||||||
|
{
|
||||||
|
let job_id = JobId::new();
|
||||||
|
let _job_id = job_id.clone();
|
||||||
|
let injector = self.global_queue.clone();
|
||||||
|
// Create a task and schedule it for execution.
|
||||||
|
let (task, handle) = async_task::spawn(async {
|
||||||
|
let _ = future.await;
|
||||||
|
}, move |task| injector.push(MeliTask { task, id: _job_id }), ());
|
||||||
|
task.schedule();
|
||||||
|
for unparker in self.parkers.iter() {
|
||||||
|
unparker.unpark();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return a join handle that retrieves the output of the future.
|
||||||
|
JoinHandle(handle)
|
||||||
|
}
|
||||||
|
|
||||||
|
///// Spawns a future on the executor.
|
||||||
|
pub fn spawn_specialized<F, R>(&self, future: F) -> (oneshot::Receiver<R>, JobId)
|
||||||
|
where
|
||||||
|
F: Future<Output = R> + Send + 'static,
|
||||||
|
R: Send + 'static,
|
||||||
|
{
|
||||||
|
let (sender, receiver) = oneshot::channel();
|
||||||
|
let job_id = JobId::new();
|
||||||
|
let _job_id = job_id.clone();
|
||||||
|
let injector = self.global_queue.clone();
|
||||||
|
// Create a task and schedule it for execution.
|
||||||
|
let (task, handle) = async_task::spawn(async {
|
||||||
|
let res = future.await;
|
||||||
|
sender.send(res);
|
||||||
|
}, move |task| injector.push(MeliTask { task, id: _job_id }), ());
|
||||||
|
task.schedule();
|
||||||
|
for unparker in self.parkers.iter() {
|
||||||
|
unparker.unpark();
|
||||||
|
}
|
||||||
|
|
||||||
|
(receiver, job_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
///// Spawns a future on the executor.
|
||||||
|
//fn spawn<F, R>(future: F) -> JoinHandle<R>
|
||||||
|
//where
|
||||||
|
// F: Future<Output = R> + Send + 'static,
|
||||||
|
// R: Send + 'static,
|
||||||
|
//{
|
||||||
|
// // Create a task and schedule it for execution.
|
||||||
|
// let (task, handle) = async_task::spawn(future, |t| QUEUE.send(t).unwrap(), ());
|
||||||
|
// task.schedule();
|
||||||
|
//
|
||||||
|
// // Return a join handle that retrieves the output of the future.
|
||||||
|
// JoinHandle(handle)
|
||||||
|
//}
|
||||||
|
|
||||||
|
/// Awaits the output of a spawned future.
|
||||||
|
pub struct JoinHandle( async_task::JoinHandle<(), ()>);
|
||||||
|
|
||||||
|
impl Future for JoinHandle {
|
||||||
|
type Output = ();
|
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
match Pin::new(&mut self.0).poll(cx) {
|
||||||
|
Poll::Pending => Poll::Pending,
|
||||||
|
Poll::Ready(output) => Poll::Ready(output.expect("task failed")),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
fn _test() {
|
||||||
|
let executor = JobExecutor::new();
|
||||||
|
futures::executor::block_on(async {
|
||||||
|
// Spawn a future.
|
||||||
|
let handle = executor.spawn(async {
|
||||||
|
println!("Running task...");
|
||||||
|
panic!();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Await its output.
|
||||||
|
handle.await;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
*/
|
|
@ -69,6 +69,7 @@ use crate::workers::*;
|
||||||
#[cfg(feature = "sqlite3")]
|
#[cfg(feature = "sqlite3")]
|
||||||
pub mod sqlite3;
|
pub mod sqlite3;
|
||||||
|
|
||||||
|
pub mod jobs1;
|
||||||
pub mod mailcap;
|
pub mod mailcap;
|
||||||
pub mod plugins;
|
pub mod plugins;
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,7 @@ use super::*;
|
||||||
use crate::plugins::PluginManager;
|
use crate::plugins::PluginManager;
|
||||||
use melib::backends::{AccountHash, MailboxHash, NotifyFn};
|
use melib::backends::{AccountHash, MailboxHash, NotifyFn};
|
||||||
|
|
||||||
|
use crate::jobs1::JobExecutor;
|
||||||
use crossbeam::channel::{unbounded, Receiver, Sender};
|
use crossbeam::channel::{unbounded, Receiver, Sender};
|
||||||
use smallvec::SmallVec;
|
use smallvec::SmallVec;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
@ -95,6 +96,7 @@ pub struct Context {
|
||||||
receiver: Receiver<ThreadEvent>,
|
receiver: Receiver<ThreadEvent>,
|
||||||
input: InputHandler,
|
input: InputHandler,
|
||||||
work_controller: WorkController,
|
work_controller: WorkController,
|
||||||
|
job_executor: JobExecutor,
|
||||||
pub children: Vec<std::process::Child>,
|
pub children: Vec<std::process::Child>,
|
||||||
pub plugin_manager: PluginManager,
|
pub plugin_manager: PluginManager,
|
||||||
|
|
||||||
|
@ -244,6 +246,7 @@ impl State {
|
||||||
|
|
||||||
let mut account_hashes = HashMap::with_capacity_and_hasher(1, Default::default());
|
let mut account_hashes = HashMap::with_capacity_and_hasher(1, Default::default());
|
||||||
let work_controller = WorkController::new(sender.clone());
|
let work_controller = WorkController::new(sender.clone());
|
||||||
|
let job_executor = JobExecutor::new(sender.clone());
|
||||||
let accounts: Vec<Account> = {
|
let accounts: Vec<Account> = {
|
||||||
let mut file_accs = settings
|
let mut file_accs = settings
|
||||||
.accounts
|
.accounts
|
||||||
|
@ -271,6 +274,7 @@ impl State {
|
||||||
a_s.clone(),
|
a_s.clone(),
|
||||||
&backends,
|
&backends,
|
||||||
work_controller.get_context(),
|
work_controller.get_context(),
|
||||||
|
&job_executor,
|
||||||
sender.clone(),
|
sender.clone(),
|
||||||
NotifyFn::new(Box::new(move |f: MailboxHash| {
|
NotifyFn::new(Box::new(move |f: MailboxHash| {
|
||||||
sender
|
sender
|
||||||
|
@ -331,6 +335,7 @@ impl State {
|
||||||
replies: VecDeque::with_capacity(5),
|
replies: VecDeque::with_capacity(5),
|
||||||
temp_files: Vec::new(),
|
temp_files: Vec::new(),
|
||||||
work_controller,
|
work_controller,
|
||||||
|
job_executor,
|
||||||
children: vec![],
|
children: vec![],
|
||||||
plugin_manager,
|
plugin_manager,
|
||||||
|
|
||||||
|
|
|
@ -36,6 +36,7 @@ mod helpers;
|
||||||
pub use self::helpers::*;
|
pub use self::helpers::*;
|
||||||
|
|
||||||
use super::execute::Action;
|
use super::execute::Action;
|
||||||
|
use super::jobs1::JobId;
|
||||||
use super::terminal::*;
|
use super::terminal::*;
|
||||||
|
|
||||||
use melib::backends::{AccountHash, MailboxHash};
|
use melib::backends::{AccountHash, MailboxHash};
|
||||||
|
@ -68,6 +69,7 @@ pub enum ThreadEvent {
|
||||||
/// A thread has updated some of its information
|
/// A thread has updated some of its information
|
||||||
Pulse,
|
Pulse,
|
||||||
//Decode { _ }, // For gpg2 signature check
|
//Decode { _ }, // For gpg2 signature check
|
||||||
|
JobFinished(JobId),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<RefreshEvent> for ThreadEvent {
|
impl From<RefreshEvent> for ThreadEvent {
|
||||||
|
|
Loading…
Reference in New Issue