From f394fde1434429aa7ff750665516097f946f1c1f Mon Sep 17 00:00:00 2001 From: Manos Pitsidianakis Date: Wed, 11 Sep 2019 17:57:55 +0300 Subject: [PATCH] add priority and info to jobs and workers jobs now have a priority given to them, in order to parse some mailboxes (eg INBOX, Sent) first. worker threads now can set their names and status --- melib/src/async_workers.rs | 65 ++++- melib/src/backends.rs | 6 +- melib/src/backends/imap.rs | 35 +-- melib/src/backends/imap/folder.rs | 2 +- melib/src/backends/imap/watch.rs | 148 ++++++++++- melib/src/backends/maildir/backend.rs | 24 +- melib/src/backends/mbox.rs | 15 +- src/bin.rs | 9 +- ui/src/conf/accounts.rs | 96 ++++++-- ui/src/state.rs | 55 +++-- ui/src/types.rs | 2 + ui/src/workers.rs | 341 +++++++++++++++----------- 12 files changed, 577 insertions(+), 221 deletions(-) diff --git a/melib/src/async_workers.rs b/melib/src/async_workers.rs index 3dfb36ef..b3d7b49d 100644 --- a/melib/src/async_workers.rs +++ b/melib/src/async_workers.rs @@ -40,11 +40,45 @@ use std::fmt; use std::sync::Arc; #[derive(Clone)] -pub struct Work(pub Arc () + Send + Sync>>); +pub struct WorkContext { + pub new_work: Sender, + pub set_name: Sender<(std::thread::ThreadId, String)>, + pub set_status: Sender<(std::thread::ThreadId, String)>, + pub finished: Sender, +} + +#[derive(Clone)] +pub struct Work { + priority: u64, + pub is_static: bool, + pub closure: Arc () + Send + Sync>>, + name: String, + status: String, +} + +impl Ord for Work { + fn cmp(&self, other: &Work) -> std::cmp::Ordering { + self.priority.cmp(&other.priority) + } +} + +impl PartialOrd for Work { + fn partial_cmp(&self, other: &Work) -> Option { + Some(self.priority.cmp(&other.priority)) + } +} + +impl PartialEq for Work { + fn eq(&self, other: &Work) -> bool { + self.priority == other.priority + } +} + +impl Eq for Work {} impl Work { - pub fn compute(&self) { - (self.0)(); + pub fn compute(&self, work_context: WorkContext) { + (self.closure)(work_context); } } @@ -80,6 +114,8 @@ impl fmt::Debug for AsyncStatus { pub struct AsyncBuilder { tx: Sender>, rx: Receiver>, + priority: u64, + is_static: bool, } #[derive(Clone, Debug)] @@ -105,6 +141,8 @@ where AsyncBuilder { tx: sender, rx: receiver, + priority: 0, + is_static: false, } } /// Returns the sender object of the promise's channel. @@ -115,10 +153,27 @@ where pub fn rx(&mut self) -> Receiver> { self.rx.clone() } + + pub fn set_priority(&mut self, new_val: u64) -> &mut Self { + self.priority = new_val; + self + } + + pub fn set_is_static(&mut self, new_val: bool) -> &mut Self { + self.is_static = new_val; + self + } + /// Returns an `Async` object that contains a `Thread` join handle that returns a `T` - pub fn build(self, work: Box () + Send + Sync>) -> Async { + pub fn build(self, work: Box () + Send + Sync>) -> Async { Async { - work: Work(Arc::new(work)), + work: Work { + priority: self.priority, + is_static: self.is_static, + closure: Arc::new(work), + name: String::new(), + status: String::new(), + }, tx: self.tx, rx: self.rx, active: false, diff --git a/melib/src/backends.rs b/melib/src/backends.rs index fe7ee1a4..57cee63a 100644 --- a/melib/src/backends.rs +++ b/melib/src/backends.rs @@ -163,7 +163,11 @@ type NewFolderName = String; pub trait MailBackend: ::std::fmt::Debug { fn get(&mut self, folder: &Folder) -> Async>>; - fn watch(&self, sender: RefreshEventConsumer) -> Result<()>; + fn watch( + &self, + sender: RefreshEventConsumer, + work_context: WorkContext, + ) -> Result; fn folders(&self) -> FnvHashMap; fn operation(&self, hash: EnvelopeHash, folder_hash: FolderHash) -> Box; diff --git a/melib/src/backends/imap.rs b/melib/src/backends/imap.rs index 9e44c43c..6e209742 100644 --- a/melib/src/backends/imap.rs +++ b/melib/src/backends/imap.rs @@ -33,7 +33,7 @@ pub use watch::*; extern crate native_tls; -use crate::async_workers::{Async, AsyncBuilder, AsyncStatus}; +use crate::async_workers::{Async, AsyncBuilder, AsyncStatus, WorkContext}; use crate::backends::BackendOp; use crate::backends::FolderHash; use crate::backends::RefreshEvent; @@ -64,7 +64,6 @@ pub struct ImapType { capabilities: FnvHashSet>, folders: FnvHashMap, - folder_connections: FnvHashMap>>, hash_index: Arc>>, uid_index: Arc>>, } @@ -89,7 +88,7 @@ impl MailBackend for ImapType { let folder_hash = folder.hash(); let folder_exists = self.folders[&folder_hash].exists.clone(); let connection = self.connection.clone(); - let closure = move || { + let closure = move |_work_context| { let connection = connection.clone(); let tx = tx.clone(); let mut response = String::with_capacity(8 * 1024); @@ -162,16 +161,25 @@ impl MailBackend for ImapType { w.build(handle) } - fn watch(&self, sender: RefreshEventConsumer) -> Result<()> { + fn watch( + &self, + sender: RefreshEventConsumer, + work_context: WorkContext, + ) -> Result { let has_idle: bool = self.capabilities.contains(&b"IDLE"[0..]); - let folders = self.imap_folders(); + let folders = self.folders.clone(); let conn = self.new_connection()?; let main_conn = self.connection.clone(); let hash_index = self.hash_index.clone(); let uid_index = self.uid_index.clone(); - std::thread::Builder::new() + let handle = std::thread::Builder::new() .name(format!("{} imap connection", self.account_name.as_str(),)) .spawn(move || { + let thread = std::thread::current(); + work_context + .set_status + .send((thread.id(), "watching".to_string())) + .unwrap(); let kit = ImapWatchKit { conn, main_conn, @@ -179,6 +187,7 @@ impl MailBackend for ImapType { uid_index, folders, sender, + work_context, }; if has_idle { idle(kit); @@ -186,7 +195,7 @@ impl MailBackend for ImapType { poll_with_examine(kit); } })?; - Ok(()) + Ok(handle.thread().id()) } fn folders(&self) -> FnvHashMap { @@ -194,7 +203,7 @@ impl MailBackend for ImapType { return self .folders .iter() - .map(|(h, f)| (*h, f.clone() as Folder)) + .map(|(h, f)| (*h, Box::new(Clone::clone(f)) as Folder)) .collect(); } @@ -241,7 +250,7 @@ impl MailBackend for ImapType { debug!(&folders); folders .iter() - .map(|(h, f)| (*h, f.clone() as Folder)) + .map(|(h, f)| (*h, Box::new(Clone::clone(f)) as Folder)) .collect() } @@ -489,7 +498,6 @@ impl ImapType { folders: Default::default(), connection: Arc::new(Mutex::new(ImapConnection { cmd_id, stream })), danger_accept_invalid_certs, - folder_connections: Default::default(), hash_index: Default::default(), uid_index: Default::default(), capabilities: Default::default(), @@ -532,13 +540,6 @@ impl ImapType { for f in m.folders.values_mut() { f.children.retain(|c| keys.contains(c)); } - /* - for f in m.folders.keys() { - m.folder_connections.insert( - *f, - Arc::new(Mutex::new(exit_on_error!(s returning m.new_connection()))), - ); - }*/ m } diff --git a/melib/src/backends/imap/folder.rs b/melib/src/backends/imap/folder.rs index 988fe27f..e1b27649 100644 --- a/melib/src/backends/imap/folder.rs +++ b/melib/src/backends/imap/folder.rs @@ -21,7 +21,7 @@ use crate::backends::{BackendFolder, Folder, FolderHash}; use std::sync::{Arc, Mutex}; -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct ImapFolder { pub(super) hash: FolderHash, pub(super) path: String, diff --git a/melib/src/backends/imap/watch.rs b/melib/src/backends/imap/watch.rs index 7819be07..29448c7b 100644 --- a/melib/src/backends/imap/watch.rs +++ b/melib/src/backends/imap/watch.rs @@ -29,12 +29,14 @@ pub struct ImapWatchKit { pub uid_index: Arc>>, pub folders: FnvHashMap, pub sender: RefreshEventConsumer, + pub work_context: WorkContext, } macro_rules! exit_on_error { - ($sender:expr, $folder_hash:ident, $($result:expr)+) => { + ($sender:expr, $folder_hash:ident, $work_context:ident, $thread_id:ident, $($result:expr)+) => { $(if let Err(e) = $result { debug!("failure: {}", e.to_string()); + $work_context.set_status.send(($thread_id, e.to_string())).unwrap(); $sender.send(RefreshEvent { hash: $folder_hash, kind: RefreshEventKind::Failure(e), @@ -53,12 +55,32 @@ pub fn poll_with_examine(kit: ImapWatchKit) { uid_index, folders, sender, + work_context, } = kit; let mut response = String::with_capacity(8 * 1024); + let thread_id: std::thread::ThreadId = std::thread::current().id(); loop { + work_context + .set_status + .send((thread_id, "sleeping...".to_string())) + .unwrap(); std::thread::sleep(std::time::Duration::from_millis(5 * 60 * 1000)); for folder in folders.values() { - examine_updates(folder, &sender, &mut conn, &hash_index, &uid_index); + work_context + .set_status + .send(( + thread_id, + format!("examining `{}` for updates...", folder.path()), + )) + .unwrap(); + examine_updates( + folder, + &sender, + &mut conn, + &hash_index, + &uid_index, + &work_context, + ); } let mut main_conn = main_conn.lock().unwrap(); main_conn.send_command(b"NOOP").unwrap(); @@ -77,7 +99,9 @@ pub fn idle(kit: ImapWatchKit) { uid_index, folders, sender, + work_context, } = kit; + let thread_id: std::thread::ThreadId = std::thread::current().id(); let folder: &ImapFolder = folders .values() .find(|f| f.parent.is_none() && f.path().eq_ignore_ascii_case("INBOX")) @@ -87,6 +111,8 @@ pub fn idle(kit: ImapWatchKit) { exit_on_error!( sender, folder_hash, + work_context, + thread_id, conn.read_response(&mut response) conn.send_command(format!("SELECT {}", folder.path()).as_bytes()) conn.read_response(&mut response) @@ -112,7 +138,17 @@ pub fn idle(kit: ImapWatchKit) { } }; } - exit_on_error!(sender, folder_hash, conn.send_command(b"IDLE")); + exit_on_error!( + sender, + folder_hash, + work_context, + thread_id, + conn.send_command(b"IDLE") + ); + work_context + .set_status + .send((thread_id, "IDLEing".to_string())) + .unwrap(); let mut iter = ImapBlockingConnection::from(conn); let mut beat = std::time::Instant::now(); let mut watch = std::time::Instant::now(); @@ -127,6 +163,8 @@ pub fn idle(kit: ImapWatchKit) { exit_on_error!( sender, folder_hash, + work_context, + thread_id, iter.conn.set_nonblocking(true) iter.conn.send_raw(b"DONE") iter.conn.read_response(&mut response) @@ -142,6 +180,8 @@ pub fn idle(kit: ImapWatchKit) { exit_on_error!( sender, folder_hash, + work_context, + thread_id, iter.conn.set_nonblocking(true) iter.conn.send_raw(b"DONE") iter.conn.read_response(&mut response) @@ -151,11 +191,31 @@ pub fn idle(kit: ImapWatchKit) { /* Skip INBOX */ continue; } - examine_updates(folder, &sender, &mut iter.conn, &hash_index, &uid_index); + work_context + .set_status + .send(( + thread_id, + format!("examining `{}` for updates...", folder.path()), + )) + .unwrap(); + examine_updates( + folder, + &sender, + &mut iter.conn, + &hash_index, + &uid_index, + &work_context, + ); } + work_context + .set_status + .send((thread_id, "done examining mailboxes.".to_string())) + .unwrap(); exit_on_error!( sender, folder_hash, + work_context, + thread_id, iter.conn.send_command(b"IDLE") iter.conn.set_nonblocking(false) main_conn.lock().unwrap().send_command(b"NOOP") @@ -167,11 +227,17 @@ pub fn idle(kit: ImapWatchKit) { .to_full_result() .map_err(MeliError::from) { - Ok(Some(Recent(_))) => { + Ok(Some(Recent(r))) => { + work_context + .set_status + .send((thread_id, format!("got `{} RECENT` notification", r))) + .unwrap(); /* UID SEARCH RECENT */ exit_on_error!( sender, folder_hash, + work_context, + thread_id, iter.conn.set_nonblocking(true) iter.conn.send_raw(b"DONE") iter.conn.read_response(&mut response) @@ -189,6 +255,8 @@ pub fn idle(kit: ImapWatchKit) { exit_on_error!( sender, folder_hash, + work_context, + thread_id, iter.conn.send_command( &[b"UID FETCH", v, b"(FLAGS RFC822.HEADER)"] .join(&b' '), @@ -201,8 +269,18 @@ pub fn idle(kit: ImapWatchKit) { .map_err(MeliError::from) { Ok(v) => { + let len = v.len(); + let mut ctr = 0; for (uid, flags, b) in v { + work_context + .set_status + .send(( + thread_id, + format!("parsing {}/{} envelopes..", ctr, len), + )) + .unwrap(); if let Ok(env) = Envelope::from_bytes(&b, flags) { + ctr += 1; hash_index .lock() .unwrap() @@ -220,6 +298,13 @@ pub fn idle(kit: ImapWatchKit) { }); } } + work_context + .set_status + .send(( + thread_id, + format!("parsed {}/{} envelopes.", ctr, len), + )) + .unwrap(); } Err(e) => { debug!(e); @@ -237,17 +322,25 @@ pub fn idle(kit: ImapWatchKit) { exit_on_error!( sender, folder_hash, + work_context, + thread_id, iter.conn.send_command(b"IDLE") iter.conn.set_nonblocking(false) ); } Ok(Some(Expunge(n))) => { + work_context + .set_status + .send((thread_id, format!("got `{} EXPUNGED` notification", n))) + .unwrap(); debug!("expunge {}", n); } Ok(Some(Exists(n))) => { exit_on_error!( sender, folder_hash, + work_context, + thread_id, iter.conn.set_nonblocking(true) iter.conn.send_raw(b"DONE") iter.conn.read_response(&mut response) @@ -256,10 +349,24 @@ pub fn idle(kit: ImapWatchKit) { * */ let mut prev_exists = folder.exists.lock().unwrap(); debug!("exists {}", n); + work_context + .set_status + .send(( + thread_id, + format!( + "got `{} EXISTS` notification (EXISTS was previously {} for {}", + n, + *prev_exists, + folder.path() + ), + )) + .unwrap(); if n > *prev_exists { exit_on_error!( sender, folder_hash, + work_context, + thread_id, iter.conn.send_command( &[ b"FETCH", @@ -275,11 +382,22 @@ pub fn idle(kit: ImapWatchKit) { .map_err(MeliError::from) { Ok(v) => { + let len = v.len(); + let mut ctr = 0; for (uid, flags, b) in v { + work_context + .set_status + .send(( + thread_id, + format!("parsing {}/{} envelopes..", ctr, len), + )) + .unwrap(); if uid_index.lock().unwrap().contains_key(&uid) { + ctr += 1; continue; } if let Ok(env) = Envelope::from_bytes(&b, flags) { + ctr += 1; hash_index .lock() .unwrap() @@ -297,6 +415,10 @@ pub fn idle(kit: ImapWatchKit) { }); } } + work_context + .set_status + .send((thread_id, format!("parsed {}/{} envelopes.", ctr, len))) + .unwrap(); } Err(e) => { debug!(e); @@ -310,12 +432,18 @@ pub fn idle(kit: ImapWatchKit) { exit_on_error!( sender, folder_hash, + work_context, + thread_id, iter.conn.send_command(b"IDLE") iter.conn.set_nonblocking(false) ); } Ok(None) | Err(_) => {} } + work_context + .set_status + .send((thread_id, "IDLEing".to_string())) + .unwrap(); } } } @@ -326,13 +454,17 @@ fn examine_updates( conn: &mut ImapConnection, hash_index: &Arc>>, uid_index: &Arc>>, + work_context: &WorkContext, ) { + let thread_id: std::thread::ThreadId = std::thread::current().id(); let folder_hash = folder.hash(); debug!("examining folder {} {}", folder_hash, folder.path()); let mut response = String::with_capacity(8 * 1024); exit_on_error!( sender, folder_hash, + work_context, + thread_id, conn.send_command(format!("EXAMINE {}", folder.path()).as_bytes()) conn.read_response(&mut response) ); @@ -354,6 +486,8 @@ fn examine_updates( exit_on_error!( sender, folder_hash, + work_context, + thread_id, conn.send_command(b"UID SEARCH RECENT") conn.read_response(&mut response) ); @@ -368,6 +502,8 @@ fn examine_updates( exit_on_error!( sender, folder_hash, + work_context, + thread_id, conn.send_command( &[b"UID FETCH", v, b"(FLAGS RFC822.HEADER)"] .join(&b' '), @@ -421,6 +557,8 @@ fn examine_updates( exit_on_error!( sender, folder_hash, + work_context, + thread_id, conn.send_command( &[ b"FETCH", diff --git a/melib/src/backends/maildir/backend.rs b/melib/src/backends/maildir/backend.rs index d06bfff9..272333bb 100644 --- a/melib/src/backends/maildir/backend.rs +++ b/melib/src/backends/maildir/backend.rs @@ -191,7 +191,11 @@ impl MailBackend for MaildirType { fn get(&mut self, folder: &Folder) -> Async>> { self.multicore(4, folder) } - fn watch(&self, sender: RefreshEventConsumer) -> Result<()> { + fn watch( + &self, + sender: RefreshEventConsumer, + work_context: WorkContext, + ) -> Result { let (tx, rx) = channel(); let mut watcher = watcher(tx, Duration::from_secs(2)).unwrap(); let root_path = self.path.to_path_buf(); @@ -199,11 +203,12 @@ impl MailBackend for MaildirType { let cache_dir = xdg::BaseDirectories::with_profile("meli", &self.name).unwrap(); debug!("watching {:?}", root_path); let hash_indexes = self.hash_indexes.clone(); - thread::Builder::new() + let handle = thread::Builder::new() .name("folder watch".to_string()) .spawn(move || { // Move `watcher` in the closure's scope so that it doesn't get dropped. let _watcher = watcher; + let _work_context = work_context; loop { match rx.recv() { /* @@ -457,7 +462,7 @@ impl MailBackend for MaildirType { } } })?; - Ok(()) + Ok(handle.thread().id()) } fn operation(&self, hash: EnvelopeHash, folder_hash: FolderHash) -> Box { @@ -676,8 +681,12 @@ impl MaildirType { let root_path = self.path.to_path_buf(); let map = self.hash_indexes.clone(); - let closure = move || { + let closure = move |work_context: crate::async_workers::WorkContext| { let name = name.clone(); + work_context + .set_name + .send((std::thread::current().id(), name.clone())) + .unwrap(); let root_path = root_path.clone(); let map = map.clone(); let tx = tx.clone(); @@ -798,6 +807,13 @@ impl MaildirType { for t in threads { let mut result = t.join().unwrap(); ret.append(&mut result); + work_context + .set_status + .send(( + std::thread::current().id(), + format!("parsing.. {}/{}", ret.len(), files.len()), + )) + .unwrap(); } }) .unwrap(); diff --git a/melib/src/backends/mbox.rs b/melib/src/backends/mbox.rs index a984602b..16368f5d 100644 --- a/melib/src/backends/mbox.rs +++ b/melib/src/backends/mbox.rs @@ -23,7 +23,7 @@ * https://wiki2.dovecot.org/MailboxFormat/mbox */ -use crate::async_workers::{Async, AsyncBuilder, AsyncStatus}; +use crate::async_workers::{Async, AsyncBuilder, AsyncStatus, WorkContext}; use crate::backends::BackendOp; use crate::backends::FolderHash; use crate::backends::{ @@ -374,7 +374,7 @@ impl MailBackend for MboxType { let folder_path = folder.path().to_string(); let folder_hash = folder.hash(); let folders = self.folders.clone(); - let closure = move || { + let closure = move |_work_context| { let tx = tx.clone(); let index = index.clone(); let file = match std::fs::OpenOptions::new() @@ -415,7 +415,11 @@ impl MailBackend for MboxType { w.build(handle) } - fn watch(&self, sender: RefreshEventConsumer) -> Result<()> { + fn watch( + &self, + sender: RefreshEventConsumer, + work_context: WorkContext, + ) -> Result { let (tx, rx) = channel(); let mut watcher = watcher(tx, std::time::Duration::from_secs(10)).unwrap(); for f in self.folders.lock().unwrap().values() { @@ -424,7 +428,7 @@ impl MailBackend for MboxType { } let index = self.index.clone(); let folders = self.folders.clone(); - std::thread::Builder::new() + let handle = std::thread::Builder::new() .name(format!( "watching {}", self.path.file_name().unwrap().to_str().unwrap() @@ -432,6 +436,7 @@ impl MailBackend for MboxType { .spawn(move || { // Move `watcher` in the closure's scope so that it doesn't get dropped. let _watcher = watcher; + let _work_context = work_context; let index = index; let folders = folders; loop { @@ -518,7 +523,7 @@ impl MailBackend for MboxType { } } })?; - Ok(()) + Ok(handle.thread().id()) } fn folders(&self) -> FnvHashMap { self.folders diff --git a/src/bin.rs b/src/bin.rs index 5c159fbb..b00be789 100644 --- a/src/bin.rs +++ b/src/bin.rs @@ -188,8 +188,6 @@ fn main() -> std::result::Result<(), std::io::Error> { let receiver = state.receiver(); - let worker_receiver = state.worker_receiver(); - /* Register some reasonably useful interfaces */ let window = Box::new(Tabbed::new(vec![ Box::new(listing::Listing::new(&state.context.accounts)), @@ -298,6 +296,9 @@ fn main() -> std::result::Result<(), std::io::Error> { state.rcv_event(e); state.render(); }, + ThreadEvent::Pulse => { + state.redraw(); + }, ThreadEvent::ThreadJoin(id) => { state.join(id); }, @@ -315,10 +316,6 @@ fn main() -> std::result::Result<(), std::io::Error> { _ => {} } }, - recv(worker_receiver) -> _ => { - /* Some worker thread finished their job, acknowledge - * it and move on*/ - }, } } // end of 'inner diff --git a/ui/src/conf/accounts.rs b/ui/src/conf/accounts.rs index 53d3423e..afd9ad3c 100644 --- a/ui/src/conf/accounts.rs +++ b/ui/src/conf/accounts.rs @@ -37,6 +37,8 @@ use melib::AddressBook; use melib::StackVec; use crate::types::UIEvent::{self, EnvelopeRemove, EnvelopeRename, EnvelopeUpdate, Notification}; +use crate::{workers::WorkController, StatusEvent, ThreadEvent}; +use crossbeam::Sender; use std::collections::VecDeque; use std::fs; use std::io; @@ -265,7 +267,7 @@ impl Account { ); workers.insert( *h, - Account::new_worker(f.clone(), &mut backend, notify_fn.clone()), + Account::new_worker(&settings, f.clone(), &mut backend, notify_fn.clone()), ); collection.threads.insert(*h, Threads::default()); } @@ -320,6 +322,7 @@ impl Account { } } fn new_worker( + settings: &AccountConf, folder: Folder, backend: &mut Box, notify_fn: Arc, @@ -328,21 +331,41 @@ impl Account { let mut builder = AsyncBuilder::new(); let our_tx = builder.tx(); let folder_hash = folder.hash(); - let w = builder.build(Box::new(move || { + let priority = match settings.folder_confs[folder.path()].usage { + Some(SpecialUseMailbox::Inbox) => 0, + Some(SpecialUseMailbox::Sent) => 1, + Some(SpecialUseMailbox::Drafts) | Some(SpecialUseMailbox::Trash) => 2, + Some(_) | None => { + 3 * folder + .path() + .split(if folder.path().contains('/') { + '/' + } else { + '.' + }) + .count() as u64 + } + }; + + /* This polling closure needs to be 'static', that is to spawn its own thread instead of + * being assigned to a worker thread. Otherwise the polling closures could fill up the + * workers causing no actual parsing to be done. If we could yield from within the worker + * threads' closures this could be avoided, but it requires green threads. + */ + builder.set_priority(priority).set_is_static(true); + let w = builder.build(Box::new(move |work_context| { + let name = format!("Parsing {}", folder.path()); let mut mailbox_handle = mailbox_handle.clone(); let work = mailbox_handle.work().unwrap(); - debug!("AA"); - std::thread::Builder::new() - .spawn(move || { - debug!("A"); - work.compute(); - debug!("B"); - }) + work_context.new_work.send(work).unwrap(); + let thread_id = std::thread::current().id(); + work_context.set_name.send((thread_id, name)).unwrap(); + work_context + .set_status + .send((thread_id, "Waiting for subworkers..".to_string())) .unwrap(); - debug!("BB"); loop { - debug!("LL"); match debug!(mailbox_handle.poll_block()) { Ok(s @ AsyncStatus::Payload(_)) => { our_tx.send(s).unwrap(); @@ -353,6 +376,7 @@ impl Account { our_tx.send(s).unwrap(); notify_fn.notify(folder_hash); debug!("exiting"); + work_context.finished.send(thread_id).unwrap(); return; } Ok(s) => { @@ -363,7 +387,6 @@ impl Account { return; } } - debug!("DD"); } })); Some(w) @@ -372,7 +395,11 @@ impl Account { &mut self, event: RefreshEvent, folder_hash: FolderHash, - sender: &crossbeam::channel::Sender, + context: ( + &mut WorkController, + &Sender, + &mut VecDeque, + ), ) -> Option { if !self.folders[&folder_hash].is_available() { self.event_queue.push_back((folder_hash, event)); @@ -396,6 +423,13 @@ impl Account { } RefreshEventKind::Create(envelope) => { let env_hash = envelope.hash(); + if self.collection.envelopes.contains_key(&env_hash) + && mailbox!(&folder_hash, self.folders) + .envelopes + .contains(&env_hash) + { + return None; + } mailbox!(&folder_hash, self.folders).insert(env_hash); self.collection.insert(*envelope, folder_hash); if self @@ -436,6 +470,7 @@ impl Account { RefreshEventKind::Rescan => { let ref_folders: FnvHashMap = self.backend.folders(); let handle = Account::new_worker( + &self.settings, ref_folders[&folder_hash].clone(), &mut self.backend, self.notify_fn.clone(), @@ -444,17 +479,40 @@ impl Account { } RefreshEventKind::Failure(e) => { debug!("RefreshEvent Failure: {}", e.to_string()); - let sender = sender.clone(); - self.watch(RefreshEventConsumer::new(Box::new(move |r| { - sender.send(crate::types::ThreadEvent::from(r)).unwrap(); - }))); + self.watch(context); } } } None } - pub fn watch(&self, r: RefreshEventConsumer) { - self.backend.watch(r).unwrap(); + pub fn watch( + &self, + context: ( + &mut WorkController, + &Sender, + &mut VecDeque, + ), + ) { + let (work_controller, sender, replies) = context; + let sender = sender.clone(); + let r = RefreshEventConsumer::new(Box::new(move |r| { + sender.send(ThreadEvent::from(r)).unwrap(); + })); + match self.backend.watch(r, work_controller.get_context()) { + Ok(id) => { + work_controller + .static_threads + .lock() + .unwrap() + .insert(id, format!("watching {}", self.name()).into()); + } + + Err(e) => { + replies.push_back(UIEvent::StatusEvent(StatusEvent::DisplayMessage( + e.to_string(), + ))); + } + } } pub fn len(&self) -> usize { diff --git a/ui/src/state.rs b/ui/src/state.rs index 70342abf..427394a1 100644 --- a/ui/src/state.rs +++ b/ui/src/state.rs @@ -89,6 +89,7 @@ pub struct Context { sender: Sender, receiver: Receiver, input: InputHandler, + work_controller: WorkController, pub temp_files: Vec, } @@ -117,6 +118,10 @@ impl Context { Err(n) => Err(n), } } + + pub fn work_controller(&self) -> &WorkController { + &self.work_controller + } } /// A State object to manage and own components and components of the UI. `State` is responsible for @@ -132,7 +137,6 @@ pub struct State { components: Vec>, pub context: Context, threads: FnvHashMap, thread::JoinHandle<()>)>, - work_controller: WorkController, } impl Drop for State { @@ -218,6 +222,7 @@ impl State { dirty_areas: VecDeque::with_capacity(5), replies: VecDeque::with_capacity(5), temp_files: Vec::new(), + work_controller: WorkController::new(sender.clone()), sender, receiver, @@ -227,13 +232,12 @@ impl State { }, }, threads: FnvHashMap::with_capacity_and_hasher(1, Default::default()), - work_controller: WorkController::new(), }; for a in s.context.accounts.iter_mut() { for worker in a.workers.values_mut() { if let Some(worker) = worker.as_mut() { if let Some(w) = worker.work() { - s.work_controller.queue.add_work(w); + s.context.work_controller.queue.add_work(w); } } } @@ -250,24 +254,35 @@ impl State { .unwrap(); s.flush(); debug!("inserting mailbox hashes:"); - for (x, account) in s.context.accounts.iter_mut().enumerate() { - for folder in account.backend.folders().values() { - debug!("hash & folder: {:?} {}", folder.hash(), folder.name()); - s.context.mailbox_hashes.insert(folder.hash(), x); + { + /* Account::watch() needs + * - work_controller to pass `work_context` to the watcher threads and then add them + * to the controller's static thread list, + * - sender to pass a RefreshEventConsumer closure to watcher threads for them to + * inform the main binary that refresh events arrived + * - replies to report any failures to the user + */ + let Context { + ref mut work_controller, + ref sender, + ref mut replies, + ref mut accounts, + ref mut mailbox_hashes, + .. + } = &mut s.context; + + for (x, account) in accounts.iter_mut().enumerate() { + for folder in account.backend.folders().values() { + debug!("hash & folder: {:?} {}", folder.hash(), folder.name()); + mailbox_hashes.insert(folder.hash(), x); + } + account.watch((work_controller, sender, replies)); } - let sender = s.context.sender.clone(); - account.watch(RefreshEventConsumer::new(Box::new(move |r| { - sender.send(ThreadEvent::from(r)).unwrap(); - }))); } s.restore_input(); s } - pub fn worker_receiver(&mut self) -> Receiver { - self.work_controller.results_rx() - } - /* * When we receive a folder hash from a watcher thread, * we match the hash to the index of the mailbox, request a reload @@ -280,8 +295,16 @@ impl State { self.context.replies.push_back(UIEvent::from(event)); return; } + let Context { + ref mut work_controller, + ref sender, + ref mut replies, + ref mut accounts, + .. + } = &mut self.context; + if let Some(notification) = - self.context.accounts[idxa].reload(event, hash, &self.context.sender) + accounts[idxa].reload(event, hash, (work_controller, sender, replies)) { if let UIEvent::Notification(_, _) = notification { self.context diff --git a/ui/src/types.rs b/ui/src/types.rs index 75413e42..174fa9ad 100644 --- a/ui/src/types.rs +++ b/ui/src/types.rs @@ -50,6 +50,8 @@ pub enum ThreadEvent { /// A watched folder has been refreshed. RefreshMailbox(Box), UIEvent(UIEvent), + /// A thread has updated some of its information + Pulse, //Decode { _ }, // For gpg2 signature check } diff --git a/ui/src/workers.rs b/ui/src/workers.rs index 4bdd1325..307ff6bd 100644 --- a/ui/src/workers.rs +++ b/ui/src/workers.rs @@ -1,93 +1,67 @@ +use crate::types::ThreadEvent; use crossbeam::{ - channel::{bounded, unbounded, Receiver, Sender}, + channel::{bounded, unbounded, Sender}, select, }; -use melib::async_workers::Work; -use std; - +use fnv::FnvHashMap; +use melib::async_workers::{Work, WorkContext}; +use std::sync::Arc; +use std::sync::Mutex; use std::thread; const MAX_WORKER: usize = 4; -pub struct WorkController { - pub queue: WorkQueue, - thread_end_tx: Sender, - results: Option>, - threads: Vec>, +/// Representation of a worker thread for use in `WorkController`. These values are to be displayed +/// to the user. +#[derive(Debug)] +pub struct Worker { + pub name: String, + pub status: String, } -impl WorkController { - pub fn results_rx(&mut self) -> Receiver { - self.results.take().unwrap() +impl From for Worker { + fn from(val: String) -> Self { + Worker { + name: val, + status: String::new(), + } } } +pub struct WorkController { + pub queue: WorkQueue, + thread_end_tx: Sender, + /// Worker threads that take up on jobs from self.queue + pub threads: Arc>>, + /// Special function threads that live indefinitely (eg watching a mailbox) + pub static_threads: Arc>>, + work_context: WorkContext, +} + impl Drop for WorkController { fn drop(&mut self) { - for _ in 0..self.threads.len() { + for _ in 0..self.threads.lock().unwrap().len() { self.thread_end_tx.send(true).unwrap(); } - /* - let threads = mem::replace(&mut self.threads, Vec::new()); - for handle in threads { - handle.join().unwrap(); - } - */ } } -// We need a way to keep track of what work needs to be done. -// This is a multi-source, multi-consumer queue which we call a -// WorkQueue. - -// To create this type, we wrap a mutex (std::sync::mutex) around a -// queue (technically a double-ended queue, std::collections::VecDeque). -// -// Mutex stands for MUTually EXclusive. It essentially ensures that only -// one thread has access to a given resource at one time. -use std::sync::Mutex; - -// A VecDeque is a double-ended queue, but we will only be using it in forward -// mode; that is, we will push onto the back and pull from the front. -use std::collections::VecDeque; - -// Finally we wrap the whole thing in Arc (Atomic Reference Counting) so that -// we can safely share it with other threads. Arc (std::sync::arc) is a lot -// like Rc (std::rc::Rc), in that it allows multiple references to some memory -// which is freed when no references remain, except that it is atomic, making -// it comparitively slow but able to be shared across the thread boundary. -use std::sync::Arc; - -// All three of these types are wrapped around a generic type T. -// T is required to be Send (a marker trait automatically implemented when -// it is safe to do so) because it denotes types that are safe to move between -// threads, which is the whole point of the WorkQueue. -// For this implementation, T is required to be Copy as well, for simplicity. - -/// A generic work queue for work elements which can be trivially copied. -/// Any producer of work can add elements and any worker can consume them. -/// WorkQueue derives Clone so that it can be distributed among threads. #[derive(Clone)] -pub struct WorkQueue { - inner: Arc>>, - new_jobs_tx: chan::Sender, +pub struct WorkQueue { + inner: Arc>>, + new_jobs_tx: Sender, + work_context: WorkContext, } -impl WorkQueue { - // Creating one of these by hand would be kind of a pain, - // so let's provide a convenience function. - - /// Creates a new WorkQueue, ready to be used. - fn new(new_jobs_tx: chan::Sender) -> Self { +impl WorkQueue { + fn new(new_jobs_tx: Sender, work_context: WorkContext) -> Self { Self { - inner: Arc::new(Mutex::new(VecDeque::new())), + inner: Arc::new(Mutex::new(Vec::new())), new_jobs_tx, + work_context, } } - // This is the function workers will use to acquire work from the queue. - // They will call it in a loop, checking to see if there is any work available. - /// Blocks the current thread until work is available, then /// gets the data required to perform that work. /// @@ -97,37 +71,22 @@ impl WorkQueue { /// # Panics /// Panics if the underlying mutex became poisoned. This is exceedingly /// unlikely. - fn get_work(&self) -> Option { - // Try to get a lock on the Mutex. If this fails, there is a - // problem with the mutex - it's poisoned, meaning that a thread that - // held the mutex lock panicked before releasing it. There is no way - // to guarantee that all its invariants are upheld, so we need to not - // use it in that case. + fn get_work(&self) -> Option { + // try to get a lock on the mutex. let maybe_queue = self.inner.lock(); - // A lot is going on here. self.inner is an Arc of Mutex. Arc can deref - // into its internal type, so we can call the methods of that inner - // type (Mutex) without dereferencing, so this is like - // *(self.inner).lock() - // but doesn't look awful. Mutex::lock() returns a - // Result>>. - - // Unwrapping with if let, we get a MutexGuard, which is an RAII guard - // that unlocks the Mutex when it goes out of scope. if let Ok(mut queue) = maybe_queue { - // queue is a MutexGuard, so this is like - // (*queue).pop_front() - // Returns Some(item) or None if there are no more items. - queue.pop_front() - - // The function has returned, so queue goes out of scope and the - // mutex unlocks. + if queue.is_empty() { + return None; + } else { + return Some(queue.swap_remove(0)); + } } else { - // There's a problem with the mutex. + // poisoned mutex, some other thread holding the mutex has panicked! panic!("WorkQueue::get_work() tried to lock a poisoned mutex"); } } - // Both the controller (main thread) and possibly workers can use this + // Both the controller (main thread) and workers can use this // function to add work to the queue. /// Blocks the current thread until work can be added, then @@ -137,16 +96,23 @@ impl WorkQueue { /// # Panics /// Panics if the underlying mutex became poisoned. This is exceedingly /// unlikely. - pub fn add_work(&self, work: T) -> usize { + pub fn add_work(&self, work: Work) { + if work.is_static { + self.work_context.new_work.send(work).unwrap(); + return; + } + // As above, try to get a lock on the mutex. if let Ok(mut queue) = self.inner.lock() { - // As above, we can use the MutexGuard> to access - // the internal VecDeque. - queue.push_back(work); + /* Insert in position that maintains the queue sorted */ + let pos = match queue.binary_search_by(|probe| probe.cmp(&work)) { + Ok(p) => p, + Err(p) => p, + }; + queue.insert(pos, work); + /* inform threads that new job is available */ self.new_jobs_tx.send(true).unwrap(); - // Now return the length of the queue. - queue.len() } else { panic!("WorkQueue::add_work() tried to lock a poisoned mutex"); } @@ -154,79 +120,77 @@ impl WorkQueue { } impl WorkController { - pub fn new() -> WorkController { + pub fn new(pulse: Sender) -> WorkController { let (new_jobs_tx, new_jobs_rx) = unbounded(); - // Create a new work queue to keep track of what work needs to be done. - // Note that the queue is internally mutable (or, rather, the Mutex is), - // but this binding doesn't need to be mutable. This isn't unsound because - // the Mutex ensures at runtime that no two references can be used; - // therefore no mutation can occur at the same time as aliasing. - let queue: WorkQueue = WorkQueue::new(new_jobs_tx); - // Create a MPSC (Multiple Producer, Single Consumer) channel. Every worker - // is a producer, the main thread is a consumer; the producers put their - // work into the channel when it's done. - let (results_tx, results_rx) = unbounded(); + /* create a channel for jobs to send new work to Controller thread */ + let (new_work_tx, new_work_rx) = unbounded(); + /* create a channel for jobs to set their names */ + let (set_name_tx, set_name_rx) = unbounded(); + + /* create a channel for jobs to set their statuses */ + let (set_status_tx, set_status_rx) = unbounded(); + + /* create a channel for jobs to announce their demise */ + let (finished_tx, finished_rx) = unbounded(); + + /* each associated thread will hold a copy of this context item in order to communicate + * with the controller thread */ + let work_context = WorkContext { + new_work: new_work_tx, + set_name: set_name_tx, + set_status: set_status_tx, + finished: finished_tx, + }; + + let queue: WorkQueue = WorkQueue::new(new_jobs_tx, work_context.clone()); // Create a SyncFlag to share whether or not there are more jobs to be done. - let (thread_end_tx, thread_end_rx) = bounded(::std::mem::size_of::()); + let (thread_end_tx, thread_end_rx) = bounded(1); - // This Vec will hold thread join handles to allow us to not exit while work - // is still being done. These handles provide a .join() method which blocks - // the current thread until the thread referred to by the handle exits. - let mut threads = Vec::new(); + let threads_lock: Arc>> = + Arc::new(Mutex::new(FnvHashMap::default())); + let static_threads_lock: Arc>> = + Arc::new(Mutex::new(FnvHashMap::default())); + + let mut threads = threads_lock.lock().unwrap(); + /* spawn worker threads */ for thread_num in 0..MAX_WORKER { - // Get a reference to the queue for the thread to use - // .clone() here doesn't clone the actual queue data, but rather the - // internal Arc produces a new reference for use in the new queue - // instance. + /* Each worker thread will wait on two channels: thread_end and new_jobs. thread_end + * informs the worker that it should quit and new_jobs informs that there is a new job + * available inside the queue. Only one worker will get each job, and others will + * go back to waiting on the channels */ let thread_queue = queue.clone(); - // Similarly, create a new transmitter for the thread to use - let thread_results_tx = results_tx.clone(); - let thread_end_rx = thread_end_rx.clone(); let new_jobs_rx = new_jobs_rx.clone(); + let new_jobs_rx = new_jobs_rx.clone(); + + let work_context = work_context.clone(); + let pulse = pulse.clone(); - // thread::spawn takes a closure (an anonymous function that "closes" - // over its environment). The move keyword means it takes ownership of - // those variables, meaning they can't be used again in the main thread. let handle = thread::spawn(move || { - // A varaible to keep track of how much work was done. let mut work_done = 0; 'work_loop: loop { debug!("Waiting for work"); - // Loop while there's expected to be work, looking for work. select! { recv(thread_end_rx) -> _ => { debug!("received thread_end_rx, quitting"); break 'work_loop; }, recv(new_jobs_rx) -> _ => { - // If work is available, do that work. while let Some(work) = thread_queue.get_work() { debug!("Got some work"); - // Do some work. - work.compute(); + work.compute(work_context.clone()); debug!("finished work"); - // Record that some work was done. work_done += 1; + work_context.set_name.send((std::thread::current().id(), "idle-worker".to_string())).unwrap(); + work_context.set_status.send((std::thread::current().id(), "inactive".to_string())).unwrap(); + pulse.send(ThreadEvent::Pulse).unwrap(); - // Send the work and the result of that work. - // - // Sending could fail. If so, there's no use in - // doing any more work, so abort. - thread_results_tx.send(true).unwrap(); - - // Signal to the operating system that now is a good time - // to give another thread a chance to run. - // - // This isn't strictly necessary - the OS can preemptively - // switch between threads, without asking - but it helps make - // sure that other threads do get a chance to get some work. std::thread::yield_now(); } continue 'work_loop; @@ -234,19 +198,112 @@ impl WorkController { } } - // Report the amount of work done. + /* report the amount of work done. */ debug!("Thread {} did {} jobs.", thread_num, work_done); }); - // Add the handle for the newly spawned thread to the list of handles - threads.push(handle); + /* add the handle for the newly spawned thread to the list of handles */ + threads.insert(handle.thread().id(), String::from("idle-worker").into()); + } + /* drop lock */ + drop(threads); + + { + /* start controller thread */ + let threads_lock = threads_lock.clone(); + let _static_threads_lock = static_threads_lock.clone(); + let thread_queue = queue.clone(); + let threads_lock = threads_lock.clone(); + let thread_end_rx = thread_end_rx.clone(); + let work_context = work_context.clone(); + + let handle = thread::spawn(move || 'control_loop: loop { + select! { + recv(thread_end_rx) -> _ => { + debug!("received thread_end_rx, quitting"); + break 'control_loop; + }, + recv(new_work_rx) -> work => { + if let Ok(work) = work { + if work.is_static { + let work_context = work_context.clone(); + let handle = thread::spawn(move || work.compute(work_context)); + _static_threads_lock.lock().unwrap().insert(handle.thread().id(), String::new().into()); + } else { + thread_queue.add_work(work); + } + } + } + recv(set_name_rx) -> new_name => { + if let Ok((thread_id, new_name)) = new_name { + let mut threads = threads_lock.lock().unwrap(); + let mut static_threads = _static_threads_lock.lock().unwrap(); + if threads.contains_key(&thread_id) { + threads.entry(thread_id).and_modify(|e| e.name = new_name); + } else if static_threads.contains_key(&thread_id) { + static_threads.entry(thread_id).and_modify(|e| e.name = new_name); + } else { + unreachable!() + } + pulse.send(ThreadEvent::Pulse).unwrap(); + } + } + recv(set_status_rx) -> new_status => { + if let Ok((thread_id, new_status)) = new_status { + let mut threads = threads_lock.lock().unwrap(); + let mut static_threads = _static_threads_lock.lock().unwrap(); + if threads.contains_key(&thread_id) { + threads.entry(thread_id).and_modify(|e| e.status = new_status); + } else if static_threads.contains_key(&thread_id) { + static_threads.entry(thread_id).and_modify(|e| e.status = new_status); + debug!(&static_threads[&thread_id]); + } else { + unreachable!() + } + pulse.send(ThreadEvent::Pulse).unwrap(); + } + } + recv(finished_rx) -> dead_thread_id => { + if let Ok(thread_id) = dead_thread_id { + let mut threads = threads_lock.lock().unwrap(); + let mut static_threads = _static_threads_lock.lock().unwrap(); + if threads.contains_key(&thread_id) { + threads.remove(&thread_id); + } else if static_threads.contains_key(&thread_id) { + static_threads.remove(&thread_id); + } else { + unreachable!() + } + pulse.send(ThreadEvent::Pulse).unwrap(); + } + } + } + }); + + let mut static_threads = static_threads_lock.lock().unwrap(); + static_threads.insert( + handle.thread().id(), + "WorkController-thread".to_string().into(), + ); } WorkController { queue, thread_end_tx, - results: Some(results_rx), - threads, + threads: threads_lock, + static_threads: static_threads_lock, + work_context, } } + + pub fn add_static_thread(&mut self, id: std::thread::ThreadId) { + self.static_threads + .lock() + .unwrap() + .insert(id, String::new().into()); + } + + pub fn get_context(&self) -> WorkContext { + self.work_context.clone() + } }