add priority and info to jobs and workers

jobs now have a priority given to them, in order to parse some mailboxes
(eg INBOX, Sent) first.

worker threads now can set their names and status
embed
Manos Pitsidianakis 2019-09-11 17:57:55 +03:00
parent fd38dbed48
commit f394fde143
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
12 changed files with 577 additions and 221 deletions

View File

@ -40,11 +40,45 @@ use std::fmt;
use std::sync::Arc; use std::sync::Arc;
#[derive(Clone)] #[derive(Clone)]
pub struct Work(pub Arc<Box<dyn Fn() -> () + Send + Sync>>); pub struct WorkContext {
pub new_work: Sender<Work>,
pub set_name: Sender<(std::thread::ThreadId, String)>,
pub set_status: Sender<(std::thread::ThreadId, String)>,
pub finished: Sender<std::thread::ThreadId>,
}
#[derive(Clone)]
pub struct Work {
priority: u64,
pub is_static: bool,
pub closure: Arc<Box<dyn Fn(WorkContext) -> () + Send + Sync>>,
name: String,
status: String,
}
impl Ord for Work {
fn cmp(&self, other: &Work) -> std::cmp::Ordering {
self.priority.cmp(&other.priority)
}
}
impl PartialOrd for Work {
fn partial_cmp(&self, other: &Work) -> Option<std::cmp::Ordering> {
Some(self.priority.cmp(&other.priority))
}
}
impl PartialEq for Work {
fn eq(&self, other: &Work) -> bool {
self.priority == other.priority
}
}
impl Eq for Work {}
impl Work { impl Work {
pub fn compute(&self) { pub fn compute(&self, work_context: WorkContext) {
(self.0)(); (self.closure)(work_context);
} }
} }
@ -80,6 +114,8 @@ impl<T> fmt::Debug for AsyncStatus<T> {
pub struct AsyncBuilder<T: Send + Sync> { pub struct AsyncBuilder<T: Send + Sync> {
tx: Sender<AsyncStatus<T>>, tx: Sender<AsyncStatus<T>>,
rx: Receiver<AsyncStatus<T>>, rx: Receiver<AsyncStatus<T>>,
priority: u64,
is_static: bool,
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -105,6 +141,8 @@ where
AsyncBuilder { AsyncBuilder {
tx: sender, tx: sender,
rx: receiver, rx: receiver,
priority: 0,
is_static: false,
} }
} }
/// Returns the sender object of the promise's channel. /// Returns the sender object of the promise's channel.
@ -115,10 +153,27 @@ where
pub fn rx(&mut self) -> Receiver<AsyncStatus<T>> { pub fn rx(&mut self) -> Receiver<AsyncStatus<T>> {
self.rx.clone() self.rx.clone()
} }
pub fn set_priority(&mut self, new_val: u64) -> &mut Self {
self.priority = new_val;
self
}
pub fn set_is_static(&mut self, new_val: bool) -> &mut Self {
self.is_static = new_val;
self
}
/// Returns an `Async<T>` object that contains a `Thread` join handle that returns a `T` /// Returns an `Async<T>` object that contains a `Thread` join handle that returns a `T`
pub fn build(self, work: Box<dyn Fn() -> () + Send + Sync>) -> Async<T> { pub fn build(self, work: Box<dyn Fn(WorkContext) -> () + Send + Sync>) -> Async<T> {
Async { Async {
work: Work(Arc::new(work)), work: Work {
priority: self.priority,
is_static: self.is_static,
closure: Arc::new(work),
name: String::new(),
status: String::new(),
},
tx: self.tx, tx: self.tx,
rx: self.rx, rx: self.rx,
active: false, active: false,

View File

@ -163,7 +163,11 @@ type NewFolderName = String;
pub trait MailBackend: ::std::fmt::Debug { pub trait MailBackend: ::std::fmt::Debug {
fn get(&mut self, folder: &Folder) -> Async<Result<Vec<Envelope>>>; fn get(&mut self, folder: &Folder) -> Async<Result<Vec<Envelope>>>;
fn watch(&self, sender: RefreshEventConsumer) -> Result<()>; fn watch(
&self,
sender: RefreshEventConsumer,
work_context: WorkContext,
) -> Result<std::thread::ThreadId>;
fn folders(&self) -> FnvHashMap<FolderHash, Folder>; fn folders(&self) -> FnvHashMap<FolderHash, Folder>;
fn operation(&self, hash: EnvelopeHash, folder_hash: FolderHash) -> Box<dyn BackendOp>; fn operation(&self, hash: EnvelopeHash, folder_hash: FolderHash) -> Box<dyn BackendOp>;

View File

@ -33,7 +33,7 @@ pub use watch::*;
extern crate native_tls; extern crate native_tls;
use crate::async_workers::{Async, AsyncBuilder, AsyncStatus}; use crate::async_workers::{Async, AsyncBuilder, AsyncStatus, WorkContext};
use crate::backends::BackendOp; use crate::backends::BackendOp;
use crate::backends::FolderHash; use crate::backends::FolderHash;
use crate::backends::RefreshEvent; use crate::backends::RefreshEvent;
@ -64,7 +64,6 @@ pub struct ImapType {
capabilities: FnvHashSet<Vec<u8>>, capabilities: FnvHashSet<Vec<u8>>,
folders: FnvHashMap<FolderHash, ImapFolder>, folders: FnvHashMap<FolderHash, ImapFolder>,
folder_connections: FnvHashMap<FolderHash, Arc<Mutex<ImapConnection>>>,
hash_index: Arc<Mutex<FnvHashMap<EnvelopeHash, (UID, FolderHash)>>>, hash_index: Arc<Mutex<FnvHashMap<EnvelopeHash, (UID, FolderHash)>>>,
uid_index: Arc<Mutex<FnvHashMap<usize, EnvelopeHash>>>, uid_index: Arc<Mutex<FnvHashMap<usize, EnvelopeHash>>>,
} }
@ -89,7 +88,7 @@ impl MailBackend for ImapType {
let folder_hash = folder.hash(); let folder_hash = folder.hash();
let folder_exists = self.folders[&folder_hash].exists.clone(); let folder_exists = self.folders[&folder_hash].exists.clone();
let connection = self.connection.clone(); let connection = self.connection.clone();
let closure = move || { let closure = move |_work_context| {
let connection = connection.clone(); let connection = connection.clone();
let tx = tx.clone(); let tx = tx.clone();
let mut response = String::with_capacity(8 * 1024); let mut response = String::with_capacity(8 * 1024);
@ -162,16 +161,25 @@ impl MailBackend for ImapType {
w.build(handle) w.build(handle)
} }
fn watch(&self, sender: RefreshEventConsumer) -> Result<()> { fn watch(
&self,
sender: RefreshEventConsumer,
work_context: WorkContext,
) -> Result<std::thread::ThreadId> {
let has_idle: bool = self.capabilities.contains(&b"IDLE"[0..]); let has_idle: bool = self.capabilities.contains(&b"IDLE"[0..]);
let folders = self.imap_folders(); let folders = self.folders.clone();
let conn = self.new_connection()?; let conn = self.new_connection()?;
let main_conn = self.connection.clone(); let main_conn = self.connection.clone();
let hash_index = self.hash_index.clone(); let hash_index = self.hash_index.clone();
let uid_index = self.uid_index.clone(); let uid_index = self.uid_index.clone();
std::thread::Builder::new() let handle = std::thread::Builder::new()
.name(format!("{} imap connection", self.account_name.as_str(),)) .name(format!("{} imap connection", self.account_name.as_str(),))
.spawn(move || { .spawn(move || {
let thread = std::thread::current();
work_context
.set_status
.send((thread.id(), "watching".to_string()))
.unwrap();
let kit = ImapWatchKit { let kit = ImapWatchKit {
conn, conn,
main_conn, main_conn,
@ -179,6 +187,7 @@ impl MailBackend for ImapType {
uid_index, uid_index,
folders, folders,
sender, sender,
work_context,
}; };
if has_idle { if has_idle {
idle(kit); idle(kit);
@ -186,7 +195,7 @@ impl MailBackend for ImapType {
poll_with_examine(kit); poll_with_examine(kit);
} }
})?; })?;
Ok(()) Ok(handle.thread().id())
} }
fn folders(&self) -> FnvHashMap<FolderHash, Folder> { fn folders(&self) -> FnvHashMap<FolderHash, Folder> {
@ -194,7 +203,7 @@ impl MailBackend for ImapType {
return self return self
.folders .folders
.iter() .iter()
.map(|(h, f)| (*h, f.clone() as Folder)) .map(|(h, f)| (*h, Box::new(Clone::clone(f)) as Folder))
.collect(); .collect();
} }
@ -241,7 +250,7 @@ impl MailBackend for ImapType {
debug!(&folders); debug!(&folders);
folders folders
.iter() .iter()
.map(|(h, f)| (*h, f.clone() as Folder)) .map(|(h, f)| (*h, Box::new(Clone::clone(f)) as Folder))
.collect() .collect()
} }
@ -489,7 +498,6 @@ impl ImapType {
folders: Default::default(), folders: Default::default(),
connection: Arc::new(Mutex::new(ImapConnection { cmd_id, stream })), connection: Arc::new(Mutex::new(ImapConnection { cmd_id, stream })),
danger_accept_invalid_certs, danger_accept_invalid_certs,
folder_connections: Default::default(),
hash_index: Default::default(), hash_index: Default::default(),
uid_index: Default::default(), uid_index: Default::default(),
capabilities: Default::default(), capabilities: Default::default(),
@ -532,13 +540,6 @@ impl ImapType {
for f in m.folders.values_mut() { for f in m.folders.values_mut() {
f.children.retain(|c| keys.contains(c)); f.children.retain(|c| keys.contains(c));
} }
/*
for f in m.folders.keys() {
m.folder_connections.insert(
*f,
Arc::new(Mutex::new(exit_on_error!(s returning m.new_connection()))),
);
}*/
m m
} }

View File

@ -21,7 +21,7 @@
use crate::backends::{BackendFolder, Folder, FolderHash}; use crate::backends::{BackendFolder, Folder, FolderHash};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
#[derive(Debug, Default)] #[derive(Debug, Default, Clone)]
pub struct ImapFolder { pub struct ImapFolder {
pub(super) hash: FolderHash, pub(super) hash: FolderHash,
pub(super) path: String, pub(super) path: String,

View File

@ -29,12 +29,14 @@ pub struct ImapWatchKit {
pub uid_index: Arc<Mutex<FnvHashMap<usize, EnvelopeHash>>>, pub uid_index: Arc<Mutex<FnvHashMap<usize, EnvelopeHash>>>,
pub folders: FnvHashMap<FolderHash, ImapFolder>, pub folders: FnvHashMap<FolderHash, ImapFolder>,
pub sender: RefreshEventConsumer, pub sender: RefreshEventConsumer,
pub work_context: WorkContext,
} }
macro_rules! exit_on_error { macro_rules! exit_on_error {
($sender:expr, $folder_hash:ident, $($result:expr)+) => { ($sender:expr, $folder_hash:ident, $work_context:ident, $thread_id:ident, $($result:expr)+) => {
$(if let Err(e) = $result { $(if let Err(e) = $result {
debug!("failure: {}", e.to_string()); debug!("failure: {}", e.to_string());
$work_context.set_status.send(($thread_id, e.to_string())).unwrap();
$sender.send(RefreshEvent { $sender.send(RefreshEvent {
hash: $folder_hash, hash: $folder_hash,
kind: RefreshEventKind::Failure(e), kind: RefreshEventKind::Failure(e),
@ -53,12 +55,32 @@ pub fn poll_with_examine(kit: ImapWatchKit) {
uid_index, uid_index,
folders, folders,
sender, sender,
work_context,
} = kit; } = kit;
let mut response = String::with_capacity(8 * 1024); let mut response = String::with_capacity(8 * 1024);
let thread_id: std::thread::ThreadId = std::thread::current().id();
loop { loop {
work_context
.set_status
.send((thread_id, "sleeping...".to_string()))
.unwrap();
std::thread::sleep(std::time::Duration::from_millis(5 * 60 * 1000)); std::thread::sleep(std::time::Duration::from_millis(5 * 60 * 1000));
for folder in folders.values() { for folder in folders.values() {
examine_updates(folder, &sender, &mut conn, &hash_index, &uid_index); work_context
.set_status
.send((
thread_id,
format!("examining `{}` for updates...", folder.path()),
))
.unwrap();
examine_updates(
folder,
&sender,
&mut conn,
&hash_index,
&uid_index,
&work_context,
);
} }
let mut main_conn = main_conn.lock().unwrap(); let mut main_conn = main_conn.lock().unwrap();
main_conn.send_command(b"NOOP").unwrap(); main_conn.send_command(b"NOOP").unwrap();
@ -77,7 +99,9 @@ pub fn idle(kit: ImapWatchKit) {
uid_index, uid_index,
folders, folders,
sender, sender,
work_context,
} = kit; } = kit;
let thread_id: std::thread::ThreadId = std::thread::current().id();
let folder: &ImapFolder = folders let folder: &ImapFolder = folders
.values() .values()
.find(|f| f.parent.is_none() && f.path().eq_ignore_ascii_case("INBOX")) .find(|f| f.parent.is_none() && f.path().eq_ignore_ascii_case("INBOX"))
@ -87,6 +111,8 @@ pub fn idle(kit: ImapWatchKit) {
exit_on_error!( exit_on_error!(
sender, sender,
folder_hash, folder_hash,
work_context,
thread_id,
conn.read_response(&mut response) conn.read_response(&mut response)
conn.send_command(format!("SELECT {}", folder.path()).as_bytes()) conn.send_command(format!("SELECT {}", folder.path()).as_bytes())
conn.read_response(&mut response) conn.read_response(&mut response)
@ -112,7 +138,17 @@ pub fn idle(kit: ImapWatchKit) {
} }
}; };
} }
exit_on_error!(sender, folder_hash, conn.send_command(b"IDLE")); exit_on_error!(
sender,
folder_hash,
work_context,
thread_id,
conn.send_command(b"IDLE")
);
work_context
.set_status
.send((thread_id, "IDLEing".to_string()))
.unwrap();
let mut iter = ImapBlockingConnection::from(conn); let mut iter = ImapBlockingConnection::from(conn);
let mut beat = std::time::Instant::now(); let mut beat = std::time::Instant::now();
let mut watch = std::time::Instant::now(); let mut watch = std::time::Instant::now();
@ -127,6 +163,8 @@ pub fn idle(kit: ImapWatchKit) {
exit_on_error!( exit_on_error!(
sender, sender,
folder_hash, folder_hash,
work_context,
thread_id,
iter.conn.set_nonblocking(true) iter.conn.set_nonblocking(true)
iter.conn.send_raw(b"DONE") iter.conn.send_raw(b"DONE")
iter.conn.read_response(&mut response) iter.conn.read_response(&mut response)
@ -142,6 +180,8 @@ pub fn idle(kit: ImapWatchKit) {
exit_on_error!( exit_on_error!(
sender, sender,
folder_hash, folder_hash,
work_context,
thread_id,
iter.conn.set_nonblocking(true) iter.conn.set_nonblocking(true)
iter.conn.send_raw(b"DONE") iter.conn.send_raw(b"DONE")
iter.conn.read_response(&mut response) iter.conn.read_response(&mut response)
@ -151,11 +191,31 @@ pub fn idle(kit: ImapWatchKit) {
/* Skip INBOX */ /* Skip INBOX */
continue; continue;
} }
examine_updates(folder, &sender, &mut iter.conn, &hash_index, &uid_index); work_context
.set_status
.send((
thread_id,
format!("examining `{}` for updates...", folder.path()),
))
.unwrap();
examine_updates(
folder,
&sender,
&mut iter.conn,
&hash_index,
&uid_index,
&work_context,
);
} }
work_context
.set_status
.send((thread_id, "done examining mailboxes.".to_string()))
.unwrap();
exit_on_error!( exit_on_error!(
sender, sender,
folder_hash, folder_hash,
work_context,
thread_id,
iter.conn.send_command(b"IDLE") iter.conn.send_command(b"IDLE")
iter.conn.set_nonblocking(false) iter.conn.set_nonblocking(false)
main_conn.lock().unwrap().send_command(b"NOOP") main_conn.lock().unwrap().send_command(b"NOOP")
@ -167,11 +227,17 @@ pub fn idle(kit: ImapWatchKit) {
.to_full_result() .to_full_result()
.map_err(MeliError::from) .map_err(MeliError::from)
{ {
Ok(Some(Recent(_))) => { Ok(Some(Recent(r))) => {
work_context
.set_status
.send((thread_id, format!("got `{} RECENT` notification", r)))
.unwrap();
/* UID SEARCH RECENT */ /* UID SEARCH RECENT */
exit_on_error!( exit_on_error!(
sender, sender,
folder_hash, folder_hash,
work_context,
thread_id,
iter.conn.set_nonblocking(true) iter.conn.set_nonblocking(true)
iter.conn.send_raw(b"DONE") iter.conn.send_raw(b"DONE")
iter.conn.read_response(&mut response) iter.conn.read_response(&mut response)
@ -189,6 +255,8 @@ pub fn idle(kit: ImapWatchKit) {
exit_on_error!( exit_on_error!(
sender, sender,
folder_hash, folder_hash,
work_context,
thread_id,
iter.conn.send_command( iter.conn.send_command(
&[b"UID FETCH", v, b"(FLAGS RFC822.HEADER)"] &[b"UID FETCH", v, b"(FLAGS RFC822.HEADER)"]
.join(&b' '), .join(&b' '),
@ -201,8 +269,18 @@ pub fn idle(kit: ImapWatchKit) {
.map_err(MeliError::from) .map_err(MeliError::from)
{ {
Ok(v) => { Ok(v) => {
let len = v.len();
let mut ctr = 0;
for (uid, flags, b) in v { for (uid, flags, b) in v {
work_context
.set_status
.send((
thread_id,
format!("parsing {}/{} envelopes..", ctr, len),
))
.unwrap();
if let Ok(env) = Envelope::from_bytes(&b, flags) { if let Ok(env) = Envelope::from_bytes(&b, flags) {
ctr += 1;
hash_index hash_index
.lock() .lock()
.unwrap() .unwrap()
@ -220,6 +298,13 @@ pub fn idle(kit: ImapWatchKit) {
}); });
} }
} }
work_context
.set_status
.send((
thread_id,
format!("parsed {}/{} envelopes.", ctr, len),
))
.unwrap();
} }
Err(e) => { Err(e) => {
debug!(e); debug!(e);
@ -237,17 +322,25 @@ pub fn idle(kit: ImapWatchKit) {
exit_on_error!( exit_on_error!(
sender, sender,
folder_hash, folder_hash,
work_context,
thread_id,
iter.conn.send_command(b"IDLE") iter.conn.send_command(b"IDLE")
iter.conn.set_nonblocking(false) iter.conn.set_nonblocking(false)
); );
} }
Ok(Some(Expunge(n))) => { Ok(Some(Expunge(n))) => {
work_context
.set_status
.send((thread_id, format!("got `{} EXPUNGED` notification", n)))
.unwrap();
debug!("expunge {}", n); debug!("expunge {}", n);
} }
Ok(Some(Exists(n))) => { Ok(Some(Exists(n))) => {
exit_on_error!( exit_on_error!(
sender, sender,
folder_hash, folder_hash,
work_context,
thread_id,
iter.conn.set_nonblocking(true) iter.conn.set_nonblocking(true)
iter.conn.send_raw(b"DONE") iter.conn.send_raw(b"DONE")
iter.conn.read_response(&mut response) iter.conn.read_response(&mut response)
@ -256,10 +349,24 @@ pub fn idle(kit: ImapWatchKit) {
* */ * */
let mut prev_exists = folder.exists.lock().unwrap(); let mut prev_exists = folder.exists.lock().unwrap();
debug!("exists {}", n); debug!("exists {}", n);
work_context
.set_status
.send((
thread_id,
format!(
"got `{} EXISTS` notification (EXISTS was previously {} for {}",
n,
*prev_exists,
folder.path()
),
))
.unwrap();
if n > *prev_exists { if n > *prev_exists {
exit_on_error!( exit_on_error!(
sender, sender,
folder_hash, folder_hash,
work_context,
thread_id,
iter.conn.send_command( iter.conn.send_command(
&[ &[
b"FETCH", b"FETCH",
@ -275,11 +382,22 @@ pub fn idle(kit: ImapWatchKit) {
.map_err(MeliError::from) .map_err(MeliError::from)
{ {
Ok(v) => { Ok(v) => {
let len = v.len();
let mut ctr = 0;
for (uid, flags, b) in v { for (uid, flags, b) in v {
work_context
.set_status
.send((
thread_id,
format!("parsing {}/{} envelopes..", ctr, len),
))
.unwrap();
if uid_index.lock().unwrap().contains_key(&uid) { if uid_index.lock().unwrap().contains_key(&uid) {
ctr += 1;
continue; continue;
} }
if let Ok(env) = Envelope::from_bytes(&b, flags) { if let Ok(env) = Envelope::from_bytes(&b, flags) {
ctr += 1;
hash_index hash_index
.lock() .lock()
.unwrap() .unwrap()
@ -297,6 +415,10 @@ pub fn idle(kit: ImapWatchKit) {
}); });
} }
} }
work_context
.set_status
.send((thread_id, format!("parsed {}/{} envelopes.", ctr, len)))
.unwrap();
} }
Err(e) => { Err(e) => {
debug!(e); debug!(e);
@ -310,12 +432,18 @@ pub fn idle(kit: ImapWatchKit) {
exit_on_error!( exit_on_error!(
sender, sender,
folder_hash, folder_hash,
work_context,
thread_id,
iter.conn.send_command(b"IDLE") iter.conn.send_command(b"IDLE")
iter.conn.set_nonblocking(false) iter.conn.set_nonblocking(false)
); );
} }
Ok(None) | Err(_) => {} Ok(None) | Err(_) => {}
} }
work_context
.set_status
.send((thread_id, "IDLEing".to_string()))
.unwrap();
} }
} }
} }
@ -326,13 +454,17 @@ fn examine_updates(
conn: &mut ImapConnection, conn: &mut ImapConnection,
hash_index: &Arc<Mutex<FnvHashMap<EnvelopeHash, (UID, FolderHash)>>>, hash_index: &Arc<Mutex<FnvHashMap<EnvelopeHash, (UID, FolderHash)>>>,
uid_index: &Arc<Mutex<FnvHashMap<usize, EnvelopeHash>>>, uid_index: &Arc<Mutex<FnvHashMap<usize, EnvelopeHash>>>,
work_context: &WorkContext,
) { ) {
let thread_id: std::thread::ThreadId = std::thread::current().id();
let folder_hash = folder.hash(); let folder_hash = folder.hash();
debug!("examining folder {} {}", folder_hash, folder.path()); debug!("examining folder {} {}", folder_hash, folder.path());
let mut response = String::with_capacity(8 * 1024); let mut response = String::with_capacity(8 * 1024);
exit_on_error!( exit_on_error!(
sender, sender,
folder_hash, folder_hash,
work_context,
thread_id,
conn.send_command(format!("EXAMINE {}", folder.path()).as_bytes()) conn.send_command(format!("EXAMINE {}", folder.path()).as_bytes())
conn.read_response(&mut response) conn.read_response(&mut response)
); );
@ -354,6 +486,8 @@ fn examine_updates(
exit_on_error!( exit_on_error!(
sender, sender,
folder_hash, folder_hash,
work_context,
thread_id,
conn.send_command(b"UID SEARCH RECENT") conn.send_command(b"UID SEARCH RECENT")
conn.read_response(&mut response) conn.read_response(&mut response)
); );
@ -368,6 +502,8 @@ fn examine_updates(
exit_on_error!( exit_on_error!(
sender, sender,
folder_hash, folder_hash,
work_context,
thread_id,
conn.send_command( conn.send_command(
&[b"UID FETCH", v, b"(FLAGS RFC822.HEADER)"] &[b"UID FETCH", v, b"(FLAGS RFC822.HEADER)"]
.join(&b' '), .join(&b' '),
@ -421,6 +557,8 @@ fn examine_updates(
exit_on_error!( exit_on_error!(
sender, sender,
folder_hash, folder_hash,
work_context,
thread_id,
conn.send_command( conn.send_command(
&[ &[
b"FETCH", b"FETCH",

View File

@ -191,7 +191,11 @@ impl MailBackend for MaildirType {
fn get(&mut self, folder: &Folder) -> Async<Result<Vec<Envelope>>> { fn get(&mut self, folder: &Folder) -> Async<Result<Vec<Envelope>>> {
self.multicore(4, folder) self.multicore(4, folder)
} }
fn watch(&self, sender: RefreshEventConsumer) -> Result<()> { fn watch(
&self,
sender: RefreshEventConsumer,
work_context: WorkContext,
) -> Result<std::thread::ThreadId> {
let (tx, rx) = channel(); let (tx, rx) = channel();
let mut watcher = watcher(tx, Duration::from_secs(2)).unwrap(); let mut watcher = watcher(tx, Duration::from_secs(2)).unwrap();
let root_path = self.path.to_path_buf(); let root_path = self.path.to_path_buf();
@ -199,11 +203,12 @@ impl MailBackend for MaildirType {
let cache_dir = xdg::BaseDirectories::with_profile("meli", &self.name).unwrap(); let cache_dir = xdg::BaseDirectories::with_profile("meli", &self.name).unwrap();
debug!("watching {:?}", root_path); debug!("watching {:?}", root_path);
let hash_indexes = self.hash_indexes.clone(); let hash_indexes = self.hash_indexes.clone();
thread::Builder::new() let handle = thread::Builder::new()
.name("folder watch".to_string()) .name("folder watch".to_string())
.spawn(move || { .spawn(move || {
// Move `watcher` in the closure's scope so that it doesn't get dropped. // Move `watcher` in the closure's scope so that it doesn't get dropped.
let _watcher = watcher; let _watcher = watcher;
let _work_context = work_context;
loop { loop {
match rx.recv() { match rx.recv() {
/* /*
@ -457,7 +462,7 @@ impl MailBackend for MaildirType {
} }
} }
})?; })?;
Ok(()) Ok(handle.thread().id())
} }
fn operation(&self, hash: EnvelopeHash, folder_hash: FolderHash) -> Box<dyn BackendOp> { fn operation(&self, hash: EnvelopeHash, folder_hash: FolderHash) -> Box<dyn BackendOp> {
@ -676,8 +681,12 @@ impl MaildirType {
let root_path = self.path.to_path_buf(); let root_path = self.path.to_path_buf();
let map = self.hash_indexes.clone(); let map = self.hash_indexes.clone();
let closure = move || { let closure = move |work_context: crate::async_workers::WorkContext| {
let name = name.clone(); let name = name.clone();
work_context
.set_name
.send((std::thread::current().id(), name.clone()))
.unwrap();
let root_path = root_path.clone(); let root_path = root_path.clone();
let map = map.clone(); let map = map.clone();
let tx = tx.clone(); let tx = tx.clone();
@ -798,6 +807,13 @@ impl MaildirType {
for t in threads { for t in threads {
let mut result = t.join().unwrap(); let mut result = t.join().unwrap();
ret.append(&mut result); ret.append(&mut result);
work_context
.set_status
.send((
std::thread::current().id(),
format!("parsing.. {}/{}", ret.len(), files.len()),
))
.unwrap();
} }
}) })
.unwrap(); .unwrap();

View File

@ -23,7 +23,7 @@
* https://wiki2.dovecot.org/MailboxFormat/mbox * https://wiki2.dovecot.org/MailboxFormat/mbox
*/ */
use crate::async_workers::{Async, AsyncBuilder, AsyncStatus}; use crate::async_workers::{Async, AsyncBuilder, AsyncStatus, WorkContext};
use crate::backends::BackendOp; use crate::backends::BackendOp;
use crate::backends::FolderHash; use crate::backends::FolderHash;
use crate::backends::{ use crate::backends::{
@ -374,7 +374,7 @@ impl MailBackend for MboxType {
let folder_path = folder.path().to_string(); let folder_path = folder.path().to_string();
let folder_hash = folder.hash(); let folder_hash = folder.hash();
let folders = self.folders.clone(); let folders = self.folders.clone();
let closure = move || { let closure = move |_work_context| {
let tx = tx.clone(); let tx = tx.clone();
let index = index.clone(); let index = index.clone();
let file = match std::fs::OpenOptions::new() let file = match std::fs::OpenOptions::new()
@ -415,7 +415,11 @@ impl MailBackend for MboxType {
w.build(handle) w.build(handle)
} }
fn watch(&self, sender: RefreshEventConsumer) -> Result<()> { fn watch(
&self,
sender: RefreshEventConsumer,
work_context: WorkContext,
) -> Result<std::thread::ThreadId> {
let (tx, rx) = channel(); let (tx, rx) = channel();
let mut watcher = watcher(tx, std::time::Duration::from_secs(10)).unwrap(); let mut watcher = watcher(tx, std::time::Duration::from_secs(10)).unwrap();
for f in self.folders.lock().unwrap().values() { for f in self.folders.lock().unwrap().values() {
@ -424,7 +428,7 @@ impl MailBackend for MboxType {
} }
let index = self.index.clone(); let index = self.index.clone();
let folders = self.folders.clone(); let folders = self.folders.clone();
std::thread::Builder::new() let handle = std::thread::Builder::new()
.name(format!( .name(format!(
"watching {}", "watching {}",
self.path.file_name().unwrap().to_str().unwrap() self.path.file_name().unwrap().to_str().unwrap()
@ -432,6 +436,7 @@ impl MailBackend for MboxType {
.spawn(move || { .spawn(move || {
// Move `watcher` in the closure's scope so that it doesn't get dropped. // Move `watcher` in the closure's scope so that it doesn't get dropped.
let _watcher = watcher; let _watcher = watcher;
let _work_context = work_context;
let index = index; let index = index;
let folders = folders; let folders = folders;
loop { loop {
@ -518,7 +523,7 @@ impl MailBackend for MboxType {
} }
} }
})?; })?;
Ok(()) Ok(handle.thread().id())
} }
fn folders(&self) -> FnvHashMap<FolderHash, Folder> { fn folders(&self) -> FnvHashMap<FolderHash, Folder> {
self.folders self.folders

View File

@ -188,8 +188,6 @@ fn main() -> std::result::Result<(), std::io::Error> {
let receiver = state.receiver(); let receiver = state.receiver();
let worker_receiver = state.worker_receiver();
/* Register some reasonably useful interfaces */ /* Register some reasonably useful interfaces */
let window = Box::new(Tabbed::new(vec![ let window = Box::new(Tabbed::new(vec![
Box::new(listing::Listing::new(&state.context.accounts)), Box::new(listing::Listing::new(&state.context.accounts)),
@ -298,6 +296,9 @@ fn main() -> std::result::Result<(), std::io::Error> {
state.rcv_event(e); state.rcv_event(e);
state.render(); state.render();
}, },
ThreadEvent::Pulse => {
state.redraw();
},
ThreadEvent::ThreadJoin(id) => { ThreadEvent::ThreadJoin(id) => {
state.join(id); state.join(id);
}, },
@ -315,10 +316,6 @@ fn main() -> std::result::Result<(), std::io::Error> {
_ => {} _ => {}
} }
}, },
recv(worker_receiver) -> _ => {
/* Some worker thread finished their job, acknowledge
* it and move on*/
},
} }
} // end of 'inner } // end of 'inner

View File

@ -37,6 +37,8 @@ use melib::AddressBook;
use melib::StackVec; use melib::StackVec;
use crate::types::UIEvent::{self, EnvelopeRemove, EnvelopeRename, EnvelopeUpdate, Notification}; use crate::types::UIEvent::{self, EnvelopeRemove, EnvelopeRename, EnvelopeUpdate, Notification};
use crate::{workers::WorkController, StatusEvent, ThreadEvent};
use crossbeam::Sender;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::fs; use std::fs;
use std::io; use std::io;
@ -265,7 +267,7 @@ impl Account {
); );
workers.insert( workers.insert(
*h, *h,
Account::new_worker(f.clone(), &mut backend, notify_fn.clone()), Account::new_worker(&settings, f.clone(), &mut backend, notify_fn.clone()),
); );
collection.threads.insert(*h, Threads::default()); collection.threads.insert(*h, Threads::default());
} }
@ -320,6 +322,7 @@ impl Account {
} }
} }
fn new_worker( fn new_worker(
settings: &AccountConf,
folder: Folder, folder: Folder,
backend: &mut Box<dyn MailBackend>, backend: &mut Box<dyn MailBackend>,
notify_fn: Arc<NotifyFn>, notify_fn: Arc<NotifyFn>,
@ -328,21 +331,41 @@ impl Account {
let mut builder = AsyncBuilder::new(); let mut builder = AsyncBuilder::new();
let our_tx = builder.tx(); let our_tx = builder.tx();
let folder_hash = folder.hash(); let folder_hash = folder.hash();
let w = builder.build(Box::new(move || { let priority = match settings.folder_confs[folder.path()].usage {
Some(SpecialUseMailbox::Inbox) => 0,
Some(SpecialUseMailbox::Sent) => 1,
Some(SpecialUseMailbox::Drafts) | Some(SpecialUseMailbox::Trash) => 2,
Some(_) | None => {
3 * folder
.path()
.split(if folder.path().contains('/') {
'/'
} else {
'.'
})
.count() as u64
}
};
/* This polling closure needs to be 'static', that is to spawn its own thread instead of
* being assigned to a worker thread. Otherwise the polling closures could fill up the
* workers causing no actual parsing to be done. If we could yield from within the worker
* threads' closures this could be avoided, but it requires green threads.
*/
builder.set_priority(priority).set_is_static(true);
let w = builder.build(Box::new(move |work_context| {
let name = format!("Parsing {}", folder.path());
let mut mailbox_handle = mailbox_handle.clone(); let mut mailbox_handle = mailbox_handle.clone();
let work = mailbox_handle.work().unwrap(); let work = mailbox_handle.work().unwrap();
debug!("AA"); work_context.new_work.send(work).unwrap();
std::thread::Builder::new() let thread_id = std::thread::current().id();
.spawn(move || { work_context.set_name.send((thread_id, name)).unwrap();
debug!("A"); work_context
work.compute(); .set_status
debug!("B"); .send((thread_id, "Waiting for subworkers..".to_string()))
})
.unwrap(); .unwrap();
debug!("BB");
loop { loop {
debug!("LL");
match debug!(mailbox_handle.poll_block()) { match debug!(mailbox_handle.poll_block()) {
Ok(s @ AsyncStatus::Payload(_)) => { Ok(s @ AsyncStatus::Payload(_)) => {
our_tx.send(s).unwrap(); our_tx.send(s).unwrap();
@ -353,6 +376,7 @@ impl Account {
our_tx.send(s).unwrap(); our_tx.send(s).unwrap();
notify_fn.notify(folder_hash); notify_fn.notify(folder_hash);
debug!("exiting"); debug!("exiting");
work_context.finished.send(thread_id).unwrap();
return; return;
} }
Ok(s) => { Ok(s) => {
@ -363,7 +387,6 @@ impl Account {
return; return;
} }
} }
debug!("DD");
} }
})); }));
Some(w) Some(w)
@ -372,7 +395,11 @@ impl Account {
&mut self, &mut self,
event: RefreshEvent, event: RefreshEvent,
folder_hash: FolderHash, folder_hash: FolderHash,
sender: &crossbeam::channel::Sender<crate::types::ThreadEvent>, context: (
&mut WorkController,
&Sender<ThreadEvent>,
&mut VecDeque<UIEvent>,
),
) -> Option<UIEvent> { ) -> Option<UIEvent> {
if !self.folders[&folder_hash].is_available() { if !self.folders[&folder_hash].is_available() {
self.event_queue.push_back((folder_hash, event)); self.event_queue.push_back((folder_hash, event));
@ -396,6 +423,13 @@ impl Account {
} }
RefreshEventKind::Create(envelope) => { RefreshEventKind::Create(envelope) => {
let env_hash = envelope.hash(); let env_hash = envelope.hash();
if self.collection.envelopes.contains_key(&env_hash)
&& mailbox!(&folder_hash, self.folders)
.envelopes
.contains(&env_hash)
{
return None;
}
mailbox!(&folder_hash, self.folders).insert(env_hash); mailbox!(&folder_hash, self.folders).insert(env_hash);
self.collection.insert(*envelope, folder_hash); self.collection.insert(*envelope, folder_hash);
if self if self
@ -436,6 +470,7 @@ impl Account {
RefreshEventKind::Rescan => { RefreshEventKind::Rescan => {
let ref_folders: FnvHashMap<FolderHash, Folder> = self.backend.folders(); let ref_folders: FnvHashMap<FolderHash, Folder> = self.backend.folders();
let handle = Account::new_worker( let handle = Account::new_worker(
&self.settings,
ref_folders[&folder_hash].clone(), ref_folders[&folder_hash].clone(),
&mut self.backend, &mut self.backend,
self.notify_fn.clone(), self.notify_fn.clone(),
@ -444,17 +479,40 @@ impl Account {
} }
RefreshEventKind::Failure(e) => { RefreshEventKind::Failure(e) => {
debug!("RefreshEvent Failure: {}", e.to_string()); debug!("RefreshEvent Failure: {}", e.to_string());
let sender = sender.clone(); self.watch(context);
self.watch(RefreshEventConsumer::new(Box::new(move |r| {
sender.send(crate::types::ThreadEvent::from(r)).unwrap();
})));
} }
} }
} }
None None
} }
pub fn watch(&self, r: RefreshEventConsumer) { pub fn watch(
self.backend.watch(r).unwrap(); &self,
context: (
&mut WorkController,
&Sender<ThreadEvent>,
&mut VecDeque<UIEvent>,
),
) {
let (work_controller, sender, replies) = context;
let sender = sender.clone();
let r = RefreshEventConsumer::new(Box::new(move |r| {
sender.send(ThreadEvent::from(r)).unwrap();
}));
match self.backend.watch(r, work_controller.get_context()) {
Ok(id) => {
work_controller
.static_threads
.lock()
.unwrap()
.insert(id, format!("watching {}", self.name()).into());
}
Err(e) => {
replies.push_back(UIEvent::StatusEvent(StatusEvent::DisplayMessage(
e.to_string(),
)));
}
}
} }
pub fn len(&self) -> usize { pub fn len(&self) -> usize {

View File

@ -89,6 +89,7 @@ pub struct Context {
sender: Sender<ThreadEvent>, sender: Sender<ThreadEvent>,
receiver: Receiver<ThreadEvent>, receiver: Receiver<ThreadEvent>,
input: InputHandler, input: InputHandler,
work_controller: WorkController,
pub temp_files: Vec<File>, pub temp_files: Vec<File>,
} }
@ -117,6 +118,10 @@ impl Context {
Err(n) => Err(n), Err(n) => Err(n),
} }
} }
pub fn work_controller(&self) -> &WorkController {
&self.work_controller
}
} }
/// A State object to manage and own components and components of the UI. `State` is responsible for /// A State object to manage and own components and components of the UI. `State` is responsible for
@ -132,7 +137,6 @@ pub struct State {
components: Vec<Box<dyn Component>>, components: Vec<Box<dyn Component>>,
pub context: Context, pub context: Context,
threads: FnvHashMap<thread::ThreadId, (Sender<bool>, thread::JoinHandle<()>)>, threads: FnvHashMap<thread::ThreadId, (Sender<bool>, thread::JoinHandle<()>)>,
work_controller: WorkController,
} }
impl Drop for State { impl Drop for State {
@ -218,6 +222,7 @@ impl State {
dirty_areas: VecDeque::with_capacity(5), dirty_areas: VecDeque::with_capacity(5),
replies: VecDeque::with_capacity(5), replies: VecDeque::with_capacity(5),
temp_files: Vec::new(), temp_files: Vec::new(),
work_controller: WorkController::new(sender.clone()),
sender, sender,
receiver, receiver,
@ -227,13 +232,12 @@ impl State {
}, },
}, },
threads: FnvHashMap::with_capacity_and_hasher(1, Default::default()), threads: FnvHashMap::with_capacity_and_hasher(1, Default::default()),
work_controller: WorkController::new(),
}; };
for a in s.context.accounts.iter_mut() { for a in s.context.accounts.iter_mut() {
for worker in a.workers.values_mut() { for worker in a.workers.values_mut() {
if let Some(worker) = worker.as_mut() { if let Some(worker) = worker.as_mut() {
if let Some(w) = worker.work() { if let Some(w) = worker.work() {
s.work_controller.queue.add_work(w); s.context.work_controller.queue.add_work(w);
} }
} }
} }
@ -250,24 +254,35 @@ impl State {
.unwrap(); .unwrap();
s.flush(); s.flush();
debug!("inserting mailbox hashes:"); debug!("inserting mailbox hashes:");
for (x, account) in s.context.accounts.iter_mut().enumerate() { {
for folder in account.backend.folders().values() { /* Account::watch() needs
debug!("hash & folder: {:?} {}", folder.hash(), folder.name()); * - work_controller to pass `work_context` to the watcher threads and then add them
s.context.mailbox_hashes.insert(folder.hash(), x); * to the controller's static thread list,
* - sender to pass a RefreshEventConsumer closure to watcher threads for them to
* inform the main binary that refresh events arrived
* - replies to report any failures to the user
*/
let Context {
ref mut work_controller,
ref sender,
ref mut replies,
ref mut accounts,
ref mut mailbox_hashes,
..
} = &mut s.context;
for (x, account) in accounts.iter_mut().enumerate() {
for folder in account.backend.folders().values() {
debug!("hash & folder: {:?} {}", folder.hash(), folder.name());
mailbox_hashes.insert(folder.hash(), x);
}
account.watch((work_controller, sender, replies));
} }
let sender = s.context.sender.clone();
account.watch(RefreshEventConsumer::new(Box::new(move |r| {
sender.send(ThreadEvent::from(r)).unwrap();
})));
} }
s.restore_input(); s.restore_input();
s s
} }
pub fn worker_receiver(&mut self) -> Receiver<bool> {
self.work_controller.results_rx()
}
/* /*
* When we receive a folder hash from a watcher thread, * When we receive a folder hash from a watcher thread,
* we match the hash to the index of the mailbox, request a reload * we match the hash to the index of the mailbox, request a reload
@ -280,8 +295,16 @@ impl State {
self.context.replies.push_back(UIEvent::from(event)); self.context.replies.push_back(UIEvent::from(event));
return; return;
} }
let Context {
ref mut work_controller,
ref sender,
ref mut replies,
ref mut accounts,
..
} = &mut self.context;
if let Some(notification) = if let Some(notification) =
self.context.accounts[idxa].reload(event, hash, &self.context.sender) accounts[idxa].reload(event, hash, (work_controller, sender, replies))
{ {
if let UIEvent::Notification(_, _) = notification { if let UIEvent::Notification(_, _) = notification {
self.context self.context

View File

@ -50,6 +50,8 @@ pub enum ThreadEvent {
/// A watched folder has been refreshed. /// A watched folder has been refreshed.
RefreshMailbox(Box<RefreshEvent>), RefreshMailbox(Box<RefreshEvent>),
UIEvent(UIEvent), UIEvent(UIEvent),
/// A thread has updated some of its information
Pulse,
//Decode { _ }, // For gpg2 signature check //Decode { _ }, // For gpg2 signature check
} }

View File

@ -1,93 +1,67 @@
use crate::types::ThreadEvent;
use crossbeam::{ use crossbeam::{
channel::{bounded, unbounded, Receiver, Sender}, channel::{bounded, unbounded, Sender},
select, select,
}; };
use melib::async_workers::Work; use fnv::FnvHashMap;
use std; use melib::async_workers::{Work, WorkContext};
use std::sync::Arc;
use std::sync::Mutex;
use std::thread; use std::thread;
const MAX_WORKER: usize = 4; const MAX_WORKER: usize = 4;
pub struct WorkController { /// Representation of a worker thread for use in `WorkController`. These values are to be displayed
pub queue: WorkQueue<Work>, /// to the user.
thread_end_tx: Sender<bool>, #[derive(Debug)]
results: Option<Receiver<bool>>, pub struct Worker {
threads: Vec<std::thread::JoinHandle<()>>, pub name: String,
pub status: String,
} }
impl WorkController { impl From<String> for Worker {
pub fn results_rx(&mut self) -> Receiver<bool> { fn from(val: String) -> Self {
self.results.take().unwrap() Worker {
name: val,
status: String::new(),
}
} }
} }
pub struct WorkController {
pub queue: WorkQueue,
thread_end_tx: Sender<bool>,
/// Worker threads that take up on jobs from self.queue
pub threads: Arc<Mutex<FnvHashMap<thread::ThreadId, Worker>>>,
/// Special function threads that live indefinitely (eg watching a mailbox)
pub static_threads: Arc<Mutex<FnvHashMap<thread::ThreadId, Worker>>>,
work_context: WorkContext,
}
impl Drop for WorkController { impl Drop for WorkController {
fn drop(&mut self) { fn drop(&mut self) {
for _ in 0..self.threads.len() { for _ in 0..self.threads.lock().unwrap().len() {
self.thread_end_tx.send(true).unwrap(); self.thread_end_tx.send(true).unwrap();
} }
/*
let threads = mem::replace(&mut self.threads, Vec::new());
for handle in threads {
handle.join().unwrap();
}
*/
} }
} }
// We need a way to keep track of what work needs to be done.
// This is a multi-source, multi-consumer queue which we call a
// WorkQueue.
// To create this type, we wrap a mutex (std::sync::mutex) around a
// queue (technically a double-ended queue, std::collections::VecDeque).
//
// Mutex stands for MUTually EXclusive. It essentially ensures that only
// one thread has access to a given resource at one time.
use std::sync::Mutex;
// A VecDeque is a double-ended queue, but we will only be using it in forward
// mode; that is, we will push onto the back and pull from the front.
use std::collections::VecDeque;
// Finally we wrap the whole thing in Arc (Atomic Reference Counting) so that
// we can safely share it with other threads. Arc (std::sync::arc) is a lot
// like Rc (std::rc::Rc), in that it allows multiple references to some memory
// which is freed when no references remain, except that it is atomic, making
// it comparitively slow but able to be shared across the thread boundary.
use std::sync::Arc;
// All three of these types are wrapped around a generic type T.
// T is required to be Send (a marker trait automatically implemented when
// it is safe to do so) because it denotes types that are safe to move between
// threads, which is the whole point of the WorkQueue.
// For this implementation, T is required to be Copy as well, for simplicity.
/// A generic work queue for work elements which can be trivially copied.
/// Any producer of work can add elements and any worker can consume them.
/// WorkQueue derives Clone so that it can be distributed among threads.
#[derive(Clone)] #[derive(Clone)]
pub struct WorkQueue<T: Send> { pub struct WorkQueue {
inner: Arc<Mutex<VecDeque<T>>>, inner: Arc<Mutex<Vec<Work>>>,
new_jobs_tx: chan::Sender<bool>, new_jobs_tx: Sender<bool>,
work_context: WorkContext,
} }
impl<T: Send> WorkQueue<T> { impl WorkQueue {
// Creating one of these by hand would be kind of a pain, fn new(new_jobs_tx: Sender<bool>, work_context: WorkContext) -> Self {
// so let's provide a convenience function.
/// Creates a new WorkQueue, ready to be used.
fn new(new_jobs_tx: chan::Sender<bool>) -> Self {
Self { Self {
inner: Arc::new(Mutex::new(VecDeque::new())), inner: Arc::new(Mutex::new(Vec::new())),
new_jobs_tx, new_jobs_tx,
work_context,
} }
} }
// This is the function workers will use to acquire work from the queue.
// They will call it in a loop, checking to see if there is any work available.
/// Blocks the current thread until work is available, then /// Blocks the current thread until work is available, then
/// gets the data required to perform that work. /// gets the data required to perform that work.
/// ///
@ -97,37 +71,22 @@ impl<T: Send> WorkQueue<T> {
/// # Panics /// # Panics
/// Panics if the underlying mutex became poisoned. This is exceedingly /// Panics if the underlying mutex became poisoned. This is exceedingly
/// unlikely. /// unlikely.
fn get_work(&self) -> Option<T> { fn get_work(&self) -> Option<Work> {
// Try to get a lock on the Mutex. If this fails, there is a // try to get a lock on the mutex.
// problem with the mutex - it's poisoned, meaning that a thread that
// held the mutex lock panicked before releasing it. There is no way
// to guarantee that all its invariants are upheld, so we need to not
// use it in that case.
let maybe_queue = self.inner.lock(); let maybe_queue = self.inner.lock();
// A lot is going on here. self.inner is an Arc of Mutex. Arc can deref
// into its internal type, so we can call the methods of that inner
// type (Mutex) without dereferencing, so this is like
// *(self.inner).lock()
// but doesn't look awful. Mutex::lock() returns a
// Result<MutexGuard<VecDeque<T>>>.
// Unwrapping with if let, we get a MutexGuard, which is an RAII guard
// that unlocks the Mutex when it goes out of scope.
if let Ok(mut queue) = maybe_queue { if let Ok(mut queue) = maybe_queue {
// queue is a MutexGuard<VecDeque>, so this is like if queue.is_empty() {
// (*queue).pop_front() return None;
// Returns Some(item) or None if there are no more items. } else {
queue.pop_front() return Some(queue.swap_remove(0));
}
// The function has returned, so queue goes out of scope and the
// mutex unlocks.
} else { } else {
// There's a problem with the mutex. // poisoned mutex, some other thread holding the mutex has panicked!
panic!("WorkQueue::get_work() tried to lock a poisoned mutex"); panic!("WorkQueue::get_work() tried to lock a poisoned mutex");
} }
} }
// Both the controller (main thread) and possibly workers can use this // Both the controller (main thread) and workers can use this
// function to add work to the queue. // function to add work to the queue.
/// Blocks the current thread until work can be added, then /// Blocks the current thread until work can be added, then
@ -137,16 +96,23 @@ impl<T: Send> WorkQueue<T> {
/// # Panics /// # Panics
/// Panics if the underlying mutex became poisoned. This is exceedingly /// Panics if the underlying mutex became poisoned. This is exceedingly
/// unlikely. /// unlikely.
pub fn add_work(&self, work: T) -> usize { pub fn add_work(&self, work: Work) {
if work.is_static {
self.work_context.new_work.send(work).unwrap();
return;
}
// As above, try to get a lock on the mutex. // As above, try to get a lock on the mutex.
if let Ok(mut queue) = self.inner.lock() { if let Ok(mut queue) = self.inner.lock() {
// As above, we can use the MutexGuard<VecDeque<T>> to access /* Insert in position that maintains the queue sorted */
// the internal VecDeque. let pos = match queue.binary_search_by(|probe| probe.cmp(&work)) {
queue.push_back(work); Ok(p) => p,
Err(p) => p,
};
queue.insert(pos, work);
/* inform threads that new job is available */
self.new_jobs_tx.send(true).unwrap(); self.new_jobs_tx.send(true).unwrap();
// Now return the length of the queue.
queue.len()
} else { } else {
panic!("WorkQueue::add_work() tried to lock a poisoned mutex"); panic!("WorkQueue::add_work() tried to lock a poisoned mutex");
} }
@ -154,79 +120,77 @@ impl<T: Send> WorkQueue<T> {
} }
impl WorkController { impl WorkController {
pub fn new() -> WorkController { pub fn new(pulse: Sender<ThreadEvent>) -> WorkController {
let (new_jobs_tx, new_jobs_rx) = unbounded(); let (new_jobs_tx, new_jobs_rx) = unbounded();
// Create a new work queue to keep track of what work needs to be done.
// Note that the queue is internally mutable (or, rather, the Mutex is),
// but this binding doesn't need to be mutable. This isn't unsound because
// the Mutex ensures at runtime that no two references can be used;
// therefore no mutation can occur at the same time as aliasing.
let queue: WorkQueue<Work> = WorkQueue::new(new_jobs_tx);
// Create a MPSC (Multiple Producer, Single Consumer) channel. Every worker /* create a channel for jobs to send new work to Controller thread */
// is a producer, the main thread is a consumer; the producers put their let (new_work_tx, new_work_rx) = unbounded();
// work into the channel when it's done.
let (results_tx, results_rx) = unbounded();
/* create a channel for jobs to set their names */
let (set_name_tx, set_name_rx) = unbounded();
/* create a channel for jobs to set their statuses */
let (set_status_tx, set_status_rx) = unbounded();
/* create a channel for jobs to announce their demise */
let (finished_tx, finished_rx) = unbounded();
/* each associated thread will hold a copy of this context item in order to communicate
* with the controller thread */
let work_context = WorkContext {
new_work: new_work_tx,
set_name: set_name_tx,
set_status: set_status_tx,
finished: finished_tx,
};
let queue: WorkQueue = WorkQueue::new(new_jobs_tx, work_context.clone());
// Create a SyncFlag to share whether or not there are more jobs to be done. // Create a SyncFlag to share whether or not there are more jobs to be done.
let (thread_end_tx, thread_end_rx) = bounded(::std::mem::size_of::<bool>()); let (thread_end_tx, thread_end_rx) = bounded(1);
// This Vec will hold thread join handles to allow us to not exit while work let threads_lock: Arc<Mutex<FnvHashMap<thread::ThreadId, Worker>>> =
// is still being done. These handles provide a .join() method which blocks Arc::new(Mutex::new(FnvHashMap::default()));
// the current thread until the thread referred to by the handle exits.
let mut threads = Vec::new();
let static_threads_lock: Arc<Mutex<FnvHashMap<thread::ThreadId, Worker>>> =
Arc::new(Mutex::new(FnvHashMap::default()));
let mut threads = threads_lock.lock().unwrap();
/* spawn worker threads */
for thread_num in 0..MAX_WORKER { for thread_num in 0..MAX_WORKER {
// Get a reference to the queue for the thread to use /* Each worker thread will wait on two channels: thread_end and new_jobs. thread_end
// .clone() here doesn't clone the actual queue data, but rather the * informs the worker that it should quit and new_jobs informs that there is a new job
// internal Arc produces a new reference for use in the new queue * available inside the queue. Only one worker will get each job, and others will
// instance. * go back to waiting on the channels */
let thread_queue = queue.clone(); let thread_queue = queue.clone();
// Similarly, create a new transmitter for the thread to use
let thread_results_tx = results_tx.clone();
let thread_end_rx = thread_end_rx.clone(); let thread_end_rx = thread_end_rx.clone();
let new_jobs_rx = new_jobs_rx.clone(); let new_jobs_rx = new_jobs_rx.clone();
let new_jobs_rx = new_jobs_rx.clone();
let work_context = work_context.clone();
let pulse = pulse.clone();
// thread::spawn takes a closure (an anonymous function that "closes"
// over its environment). The move keyword means it takes ownership of
// those variables, meaning they can't be used again in the main thread.
let handle = thread::spawn(move || { let handle = thread::spawn(move || {
// A varaible to keep track of how much work was done.
let mut work_done = 0; let mut work_done = 0;
'work_loop: loop { 'work_loop: loop {
debug!("Waiting for work"); debug!("Waiting for work");
// Loop while there's expected to be work, looking for work.
select! { select! {
recv(thread_end_rx) -> _ => { recv(thread_end_rx) -> _ => {
debug!("received thread_end_rx, quitting"); debug!("received thread_end_rx, quitting");
break 'work_loop; break 'work_loop;
}, },
recv(new_jobs_rx) -> _ => { recv(new_jobs_rx) -> _ => {
// If work is available, do that work.
while let Some(work) = thread_queue.get_work() { while let Some(work) = thread_queue.get_work() {
debug!("Got some work"); debug!("Got some work");
// Do some work. work.compute(work_context.clone());
work.compute();
debug!("finished work"); debug!("finished work");
// Record that some work was done.
work_done += 1; work_done += 1;
work_context.set_name.send((std::thread::current().id(), "idle-worker".to_string())).unwrap();
work_context.set_status.send((std::thread::current().id(), "inactive".to_string())).unwrap();
pulse.send(ThreadEvent::Pulse).unwrap();
// Send the work and the result of that work.
//
// Sending could fail. If so, there's no use in
// doing any more work, so abort.
thread_results_tx.send(true).unwrap();
// Signal to the operating system that now is a good time
// to give another thread a chance to run.
//
// This isn't strictly necessary - the OS can preemptively
// switch between threads, without asking - but it helps make
// sure that other threads do get a chance to get some work.
std::thread::yield_now(); std::thread::yield_now();
} }
continue 'work_loop; continue 'work_loop;
@ -234,19 +198,112 @@ impl WorkController {
} }
} }
// Report the amount of work done. /* report the amount of work done. */
debug!("Thread {} did {} jobs.", thread_num, work_done); debug!("Thread {} did {} jobs.", thread_num, work_done);
}); });
// Add the handle for the newly spawned thread to the list of handles /* add the handle for the newly spawned thread to the list of handles */
threads.push(handle); threads.insert(handle.thread().id(), String::from("idle-worker").into());
}
/* drop lock */
drop(threads);
{
/* start controller thread */
let threads_lock = threads_lock.clone();
let _static_threads_lock = static_threads_lock.clone();
let thread_queue = queue.clone();
let threads_lock = threads_lock.clone();
let thread_end_rx = thread_end_rx.clone();
let work_context = work_context.clone();
let handle = thread::spawn(move || 'control_loop: loop {
select! {
recv(thread_end_rx) -> _ => {
debug!("received thread_end_rx, quitting");
break 'control_loop;
},
recv(new_work_rx) -> work => {
if let Ok(work) = work {
if work.is_static {
let work_context = work_context.clone();
let handle = thread::spawn(move || work.compute(work_context));
_static_threads_lock.lock().unwrap().insert(handle.thread().id(), String::new().into());
} else {
thread_queue.add_work(work);
}
}
}
recv(set_name_rx) -> new_name => {
if let Ok((thread_id, new_name)) = new_name {
let mut threads = threads_lock.lock().unwrap();
let mut static_threads = _static_threads_lock.lock().unwrap();
if threads.contains_key(&thread_id) {
threads.entry(thread_id).and_modify(|e| e.name = new_name);
} else if static_threads.contains_key(&thread_id) {
static_threads.entry(thread_id).and_modify(|e| e.name = new_name);
} else {
unreachable!()
}
pulse.send(ThreadEvent::Pulse).unwrap();
}
}
recv(set_status_rx) -> new_status => {
if let Ok((thread_id, new_status)) = new_status {
let mut threads = threads_lock.lock().unwrap();
let mut static_threads = _static_threads_lock.lock().unwrap();
if threads.contains_key(&thread_id) {
threads.entry(thread_id).and_modify(|e| e.status = new_status);
} else if static_threads.contains_key(&thread_id) {
static_threads.entry(thread_id).and_modify(|e| e.status = new_status);
debug!(&static_threads[&thread_id]);
} else {
unreachable!()
}
pulse.send(ThreadEvent::Pulse).unwrap();
}
}
recv(finished_rx) -> dead_thread_id => {
if let Ok(thread_id) = dead_thread_id {
let mut threads = threads_lock.lock().unwrap();
let mut static_threads = _static_threads_lock.lock().unwrap();
if threads.contains_key(&thread_id) {
threads.remove(&thread_id);
} else if static_threads.contains_key(&thread_id) {
static_threads.remove(&thread_id);
} else {
unreachable!()
}
pulse.send(ThreadEvent::Pulse).unwrap();
}
}
}
});
let mut static_threads = static_threads_lock.lock().unwrap();
static_threads.insert(
handle.thread().id(),
"WorkController-thread".to_string().into(),
);
} }
WorkController { WorkController {
queue, queue,
thread_end_tx, thread_end_tx,
results: Some(results_rx), threads: threads_lock,
threads, static_threads: static_threads_lock,
work_context,
} }
} }
pub fn add_static_thread(&mut self, id: std::thread::ThreadId) {
self.static_threads
.lock()
.unwrap()
.insert(id, String::new().into());
}
pub fn get_context(&self) -> WorkContext {
self.work_context.clone()
}
} }