From c9f7b41e4720b2aef0c68f5c6e3361a859c1f4e3 Mon Sep 17 00:00:00 2001 From: Manos Pitsidianakis Date: Sun, 25 Aug 2019 10:46:25 +0300 Subject: [PATCH] imap: continuous payload delivery in async workers --- melib/src/async_workers.rs | 115 ++++++++------------------ melib/src/backends/imap.rs | 24 +++++- melib/src/backends/imap/operations.rs | 4 +- melib/src/backends/maildir/backend.rs | 1 + ui/src/conf/accounts.rs | 58 +++++++------ 5 files changed, 87 insertions(+), 115 deletions(-) diff --git a/melib/src/async_workers.rs b/melib/src/async_workers.rs index fc0a16fe5..bc7571a61 100644 --- a/melib/src/async_workers.rs +++ b/melib/src/async_workers.rs @@ -72,36 +72,20 @@ impl fmt::Debug for AsyncStatus { } /// A builder object for `Async` -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct AsyncBuilder { - payload_hook: Option () + Send + Sync>>, tx: chan::Sender>, rx: chan::Receiver>, } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Async { - pub value: Option, work: Work, active: bool, - payload_hook: Option () + Send + Sync>>, - link: Option, tx: chan::Sender>, rx: chan::Receiver>, } -impl std::fmt::Debug for Async { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!( - f, - "Async<{}> {{ active: {}, payload_hook: {} }}", - stringify!(T), - self.active, - self.payload_hook.is_some() - ) - } -} - impl Default for AsyncBuilder { fn default() -> Self { AsyncBuilder::::new() @@ -117,7 +101,6 @@ where AsyncBuilder { tx: sender, rx: receiver, - payload_hook: None, } } /// Returns the sender object of the promise's channel. @@ -132,32 +115,17 @@ where pub fn build(self, work: Box () + Send + Sync>) -> Async { Async { work: Work(Arc::new(work)), - value: None, tx: self.tx, rx: self.rx, - link: None, - payload_hook: None, active: false, } } - - pub fn add_payload_hook( - &mut self, - payload_hook: Option () + Send + Sync>>, - ) -> &mut Self { - self.payload_hook = payload_hook; - self - } } impl Async where T: Send + Sync, { - /// Consumes `self` and returns the computed value. Will panic if computation hasn't finished. - pub fn extract(self) -> T { - self.value.unwrap() - } pub fn work(&mut self) -> Option { if !self.active { self.active = true; @@ -175,21 +143,21 @@ where self.rx.clone() } /// Polls worker thread and returns result. - pub fn poll(&mut self) -> Result, ()> { - if self.value.is_some() { + pub fn poll_block(&mut self) -> Result, ()> { + if !self.active { return Ok(AsyncStatus::Finished); } - //self.tx.send(true); + let rx = &self.rx; - let result: T; chan_select! { - default => { - return Ok(AsyncStatus::NoUpdate); - }, rx.recv() -> r => { match r { - Some(AsyncStatus::Payload(payload)) => { - result = payload; + Some(p @ AsyncStatus::Payload(_)) => { + return Ok(p); + }, + Some(f @ AsyncStatus::Finished) => { + self.active = false; + return Ok(f); }, Some(a) => { return Ok(a); @@ -200,46 +168,35 @@ where } }, }; - self.value = Some(result); - if let Some(hook) = self.payload_hook.as_ref() { - hook(); + } + /// Polls worker thread and returns result. + pub fn poll(&mut self) -> Result, ()> { + if !self.active { + return Ok(AsyncStatus::Finished); } - Ok(AsyncStatus::Finished) - } - /// Blocks until thread joins. - pub fn join(&mut self) { - let result: T; let rx = &self.rx; - loop { - chan_select! { - rx.recv() -> r => { - match r { - Some(AsyncStatus::Payload(payload)) => { - result = payload; - break; - }, - _ => continue, + chan_select! { + default => { + return Ok(AsyncStatus::NoUpdate); + }, + rx.recv() -> r => { + match r { + Some(p @ AsyncStatus::Payload(_)) => { + return Ok(p); + }, + Some(f @ AsyncStatus::Finished) => { + self.active = false; + return Ok(f); + }, + Some(a) => { + return Ok(a); } + _ => { + return Err(()); + }, } - - } - } - self.value = Some(result); - } - - pub fn link(&mut self, other: Async) -> &mut Self { - let Async { - rx, - tx, - work, - value, - .. - } = other; - self.rx = rx; - self.tx = tx; - self.work = work; - self.value = value; - self + }, + }; } } diff --git a/melib/src/backends/imap.rs b/melib/src/backends/imap.rs index 36b1e51d5..7307336c5 100644 --- a/melib/src/backends/imap.rs +++ b/melib/src/backends/imap.rs @@ -569,6 +569,18 @@ macro_rules! get_conf_val { }; } +macro_rules! exit_on_error { + ($s:ident, $($result:expr)+) => { + $(if let Err(e) = $result { + eprintln!( + "IMAP error ({}): {}", + $s.name.as_str(), + e.to_string(), + ); + std::process::exit(1); + })+ + }; +} impl ImapType { pub fn new(s: &AccountSettings) -> Self { use std::io::prelude::*; @@ -591,11 +603,15 @@ impl ImapType { std::process::exit(1); }; - let mut socket = TcpStream::connect(&addr).unwrap(); + let mut socket = TcpStream::connect(&addr); let cmd_id = 0; - socket - .write_all(format!("M{} STARTTLS\r\n", cmd_id).as_bytes()) - .unwrap(); + exit_on_error!( + s, + socket + socket.as_mut().unwrap().write_all(format!("M{} STARTTLS\r\n", cmd_id).as_bytes()) + ); + let mut socket = socket.unwrap(); + // FIXME handle response properly let mut buf = vec![0; 1024]; let mut response = String::with_capacity(1024); let mut cap_flag = false; diff --git a/melib/src/backends/imap/operations.rs b/melib/src/backends/imap/operations.rs index c653a19f8..266bb14d2 100644 --- a/melib/src/backends/imap/operations.rs +++ b/melib/src/backends/imap/operations.rs @@ -63,8 +63,8 @@ impl BackendOp for ImapOp { let mut response = String::with_capacity(8 * 1024); { let mut conn = self.connection.lock().unwrap(); - conn.send_command(format!("SELECT {}", self.folder_path).as_bytes()); - conn.read_response(&mut response); + conn.send_command(format!("SELECT {}", self.folder_path).as_bytes())?; + conn.read_response(&mut response)?; conn.send_command(format!("UID FETCH {} (FLAGS RFC822)", self.uid).as_bytes())?; conn.read_response(&mut response)?; } diff --git a/melib/src/backends/maildir/backend.rs b/melib/src/backends/maildir/backend.rs index 615f1ceaa..60b7d263b 100644 --- a/melib/src/backends/maildir/backend.rs +++ b/melib/src/backends/maildir/backend.rs @@ -804,6 +804,7 @@ impl MaildirType { }; let result = thunk(); tx_final.send(AsyncStatus::Payload(result)); + tx_final.send(AsyncStatus::Finished); }; Box::new(closure) }; diff --git a/ui/src/conf/accounts.rs b/ui/src/conf/accounts.rs index 77002ac1d..29ced12d6 100644 --- a/ui/src/conf/accounts.rs +++ b/ui/src/conf/accounts.rs @@ -221,6 +221,7 @@ impl Account { let mut stack: StackVec = StackVec::new(); let mut tree: Vec = Vec::new(); + let mut collection: Collection = Collection::new(Default::default()); for (h, f) in ref_folders.iter() { if !settings.folder_confs.contains_key(f.path()) || settings.folder_confs[f.path()].subscribe.is_false() @@ -258,6 +259,7 @@ impl Account { *h, Account::new_worker(f.clone(), &mut backend, notify_fn.clone()), ); + collection.threads.insert(*h, Threads::default()); } tree.sort_unstable_by_key(|f| ref_folders[&f.hash].path()); @@ -298,7 +300,7 @@ impl Account { tree, address_book, sent_folder, - collection: Collection::new(Default::default()), + collection, workers, settings: settings.clone(), runtime_settings: settings, @@ -320,37 +322,39 @@ impl Account { let w = builder.build(Box::new(move || { let mut mailbox_handle = mailbox_handle.clone(); let work = mailbox_handle.work().unwrap(); - let rx = mailbox_handle.rx(); - let tx = mailbox_handle.tx(); - + debug!("AA"); std::thread::Builder::new() .spawn(move || { + debug!("A"); work.compute(); + debug!("B"); }) .unwrap(); + debug!("BB"); loop { - debug!("looping"); - chan_select! { - rx.recv() -> r => { - debug!("got {:?}", r); - match r { - Some(s @ AsyncStatus::Payload(_)) => { - our_tx.send(s); - debug!("notifying for {}", folder_hash); - notify_fn.notify(folder_hash); - } - Some(AsyncStatus::Finished) => { - debug!("exiting"); - return; - } - Some(s) => { - our_tx.send(s); - } - None => return, - } + debug!("LL"); + match debug!(mailbox_handle.poll_block()) { + Ok(s @ AsyncStatus::Payload(_)) => { + our_tx.send(s); + debug!("notifying for {}", folder_hash); + notify_fn.notify(folder_hash); + } + Ok(s @ AsyncStatus::Finished) => { + our_tx.send(s); + notify_fn.notify(folder_hash); + debug!("exiting"); + return; + } + Ok(s) => { + our_tx.send(s); + } + Err(_) => { + debug!("poll error"); + return; } } + debug!("DD"); } })); Some(w) @@ -535,7 +539,7 @@ impl Account { debug!("got payload in status for {}", folder_hash); self.load_mailbox(folder_hash, envs); } - Ok(AsyncStatus::Finished) if w.value.is_none() => { + Ok(AsyncStatus::Finished) => { debug!("got finished in status for {}", folder_hash); self.folders.entry(folder_hash).and_modify(|f| { let m = if let MailboxEntry::Parsing(m, _, _) = f { @@ -548,11 +552,6 @@ impl Account { self.workers.insert(folder_hash, None); } - Ok(AsyncStatus::Finished) if w.value.is_some() => { - let envs = w.value.take().unwrap(); - debug!("got payload in status for {}", folder_hash); - self.load_mailbox(folder_hash, envs); - } Ok(AsyncStatus::ProgressReport(n)) => { self.folders.entry(folder_hash).and_modify(|f| { if let MailboxEntry::Parsing(_, ref mut d, _) = f { @@ -565,7 +564,6 @@ impl Account { //return Err(0); } }, - Some(_) => return Ok(()), }; if self.folders[&folder_hash].is_available() || (self.folders[&folder_hash].is_parsing()