imap: continuous payload delivery in async workers

embed
Manos Pitsidianakis 2019-08-25 10:46:25 +03:00
parent c561814cd6
commit c9f7b41e47
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
5 changed files with 87 additions and 115 deletions

View File

@ -72,36 +72,20 @@ impl<T> fmt::Debug for AsyncStatus<T> {
} }
/// A builder object for `Async<T>` /// A builder object for `Async<T>`
#[derive(Clone)] #[derive(Debug, Clone)]
pub struct AsyncBuilder<T: Send + Sync> { pub struct AsyncBuilder<T: Send + Sync> {
payload_hook: Option<Arc<Fn() -> () + Send + Sync>>,
tx: chan::Sender<AsyncStatus<T>>, tx: chan::Sender<AsyncStatus<T>>,
rx: chan::Receiver<AsyncStatus<T>>, rx: chan::Receiver<AsyncStatus<T>>,
} }
#[derive(Clone)] #[derive(Clone, Debug)]
pub struct Async<T: Send + Sync> { pub struct Async<T: Send + Sync> {
pub value: Option<T>,
work: Work, work: Work,
active: bool, active: bool,
payload_hook: Option<Arc<dyn Fn() -> () + Send + Sync>>,
link: Option<T>,
tx: chan::Sender<AsyncStatus<T>>, tx: chan::Sender<AsyncStatus<T>>,
rx: chan::Receiver<AsyncStatus<T>>, rx: chan::Receiver<AsyncStatus<T>>,
} }
impl<T: Send + Sync> std::fmt::Debug for Async<T> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"Async<{}> {{ active: {}, payload_hook: {} }}",
stringify!(T),
self.active,
self.payload_hook.is_some()
)
}
}
impl<T: Send + Sync> Default for AsyncBuilder<T> { impl<T: Send + Sync> Default for AsyncBuilder<T> {
fn default() -> Self { fn default() -> Self {
AsyncBuilder::<T>::new() AsyncBuilder::<T>::new()
@ -117,7 +101,6 @@ where
AsyncBuilder { AsyncBuilder {
tx: sender, tx: sender,
rx: receiver, rx: receiver,
payload_hook: None,
} }
} }
/// Returns the sender object of the promise's channel. /// Returns the sender object of the promise's channel.
@ -132,32 +115,17 @@ where
pub fn build(self, work: Box<dyn Fn() -> () + Send + Sync>) -> Async<T> { pub fn build(self, work: Box<dyn Fn() -> () + Send + Sync>) -> Async<T> {
Async { Async {
work: Work(Arc::new(work)), work: Work(Arc::new(work)),
value: None,
tx: self.tx, tx: self.tx,
rx: self.rx, rx: self.rx,
link: None,
payload_hook: None,
active: false, active: false,
} }
} }
pub fn add_payload_hook(
&mut self,
payload_hook: Option<Arc<dyn Fn() -> () + Send + Sync>>,
) -> &mut Self {
self.payload_hook = payload_hook;
self
}
} }
impl<T> Async<T> impl<T> Async<T>
where where
T: Send + Sync, T: Send + Sync,
{ {
/// Consumes `self` and returns the computed value. Will panic if computation hasn't finished.
pub fn extract(self) -> T {
self.value.unwrap()
}
pub fn work(&mut self) -> Option<Work> { pub fn work(&mut self) -> Option<Work> {
if !self.active { if !self.active {
self.active = true; self.active = true;
@ -175,21 +143,21 @@ where
self.rx.clone() self.rx.clone()
} }
/// Polls worker thread and returns result. /// Polls worker thread and returns result.
pub fn poll(&mut self) -> Result<AsyncStatus<T>, ()> { pub fn poll_block(&mut self) -> Result<AsyncStatus<T>, ()> {
if self.value.is_some() { if !self.active {
return Ok(AsyncStatus::Finished); return Ok(AsyncStatus::Finished);
} }
//self.tx.send(true);
let rx = &self.rx; let rx = &self.rx;
let result: T;
chan_select! { chan_select! {
default => {
return Ok(AsyncStatus::NoUpdate);
},
rx.recv() -> r => { rx.recv() -> r => {
match r { match r {
Some(AsyncStatus::Payload(payload)) => { Some(p @ AsyncStatus::Payload(_)) => {
result = payload; return Ok(p);
},
Some(f @ AsyncStatus::Finished) => {
self.active = false;
return Ok(f);
}, },
Some(a) => { Some(a) => {
return Ok(a); return Ok(a);
@ -200,46 +168,35 @@ where
} }
}, },
}; };
self.value = Some(result); }
if let Some(hook) = self.payload_hook.as_ref() { /// Polls worker thread and returns result.
hook(); pub fn poll(&mut self) -> Result<AsyncStatus<T>, ()> {
if !self.active {
return Ok(AsyncStatus::Finished);
} }
Ok(AsyncStatus::Finished)
}
/// Blocks until thread joins.
pub fn join(&mut self) {
let result: T;
let rx = &self.rx; let rx = &self.rx;
loop { chan_select! {
chan_select! { default => {
rx.recv() -> r => { return Ok(AsyncStatus::NoUpdate);
match r { },
Some(AsyncStatus::Payload(payload)) => { rx.recv() -> r => {
result = payload; match r {
break; Some(p @ AsyncStatus::Payload(_)) => {
}, return Ok(p);
_ => continue, },
Some(f @ AsyncStatus::Finished) => {
self.active = false;
return Ok(f);
},
Some(a) => {
return Ok(a);
} }
_ => {
return Err(());
},
} }
},
} };
}
self.value = Some(result);
}
pub fn link(&mut self, other: Async<T>) -> &mut Self {
let Async {
rx,
tx,
work,
value,
..
} = other;
self.rx = rx;
self.tx = tx;
self.work = work;
self.value = value;
self
} }
} }

View File

@ -569,6 +569,18 @@ macro_rules! get_conf_val {
}; };
} }
macro_rules! exit_on_error {
($s:ident, $($result:expr)+) => {
$(if let Err(e) = $result {
eprintln!(
"IMAP error ({}): {}",
$s.name.as_str(),
e.to_string(),
);
std::process::exit(1);
})+
};
}
impl ImapType { impl ImapType {
pub fn new(s: &AccountSettings) -> Self { pub fn new(s: &AccountSettings) -> Self {
use std::io::prelude::*; use std::io::prelude::*;
@ -591,11 +603,15 @@ impl ImapType {
std::process::exit(1); std::process::exit(1);
}; };
let mut socket = TcpStream::connect(&addr).unwrap(); let mut socket = TcpStream::connect(&addr);
let cmd_id = 0; let cmd_id = 0;
socket exit_on_error!(
.write_all(format!("M{} STARTTLS\r\n", cmd_id).as_bytes()) s,
.unwrap(); socket
socket.as_mut().unwrap().write_all(format!("M{} STARTTLS\r\n", cmd_id).as_bytes())
);
let mut socket = socket.unwrap();
// FIXME handle response properly
let mut buf = vec![0; 1024]; let mut buf = vec![0; 1024];
let mut response = String::with_capacity(1024); let mut response = String::with_capacity(1024);
let mut cap_flag = false; let mut cap_flag = false;

View File

@ -63,8 +63,8 @@ impl BackendOp for ImapOp {
let mut response = String::with_capacity(8 * 1024); let mut response = String::with_capacity(8 * 1024);
{ {
let mut conn = self.connection.lock().unwrap(); let mut conn = self.connection.lock().unwrap();
conn.send_command(format!("SELECT {}", self.folder_path).as_bytes()); conn.send_command(format!("SELECT {}", self.folder_path).as_bytes())?;
conn.read_response(&mut response); conn.read_response(&mut response)?;
conn.send_command(format!("UID FETCH {} (FLAGS RFC822)", self.uid).as_bytes())?; conn.send_command(format!("UID FETCH {} (FLAGS RFC822)", self.uid).as_bytes())?;
conn.read_response(&mut response)?; conn.read_response(&mut response)?;
} }

View File

@ -804,6 +804,7 @@ impl MaildirType {
}; };
let result = thunk(); let result = thunk();
tx_final.send(AsyncStatus::Payload(result)); tx_final.send(AsyncStatus::Payload(result));
tx_final.send(AsyncStatus::Finished);
}; };
Box::new(closure) Box::new(closure)
}; };

View File

@ -221,6 +221,7 @@ impl Account {
let mut stack: StackVec<FolderHash> = StackVec::new(); let mut stack: StackVec<FolderHash> = StackVec::new();
let mut tree: Vec<FolderNode> = Vec::new(); let mut tree: Vec<FolderNode> = Vec::new();
let mut collection: Collection = Collection::new(Default::default());
for (h, f) in ref_folders.iter() { for (h, f) in ref_folders.iter() {
if !settings.folder_confs.contains_key(f.path()) if !settings.folder_confs.contains_key(f.path())
|| settings.folder_confs[f.path()].subscribe.is_false() || settings.folder_confs[f.path()].subscribe.is_false()
@ -258,6 +259,7 @@ impl Account {
*h, *h,
Account::new_worker(f.clone(), &mut backend, notify_fn.clone()), Account::new_worker(f.clone(), &mut backend, notify_fn.clone()),
); );
collection.threads.insert(*h, Threads::default());
} }
tree.sort_unstable_by_key(|f| ref_folders[&f.hash].path()); tree.sort_unstable_by_key(|f| ref_folders[&f.hash].path());
@ -298,7 +300,7 @@ impl Account {
tree, tree,
address_book, address_book,
sent_folder, sent_folder,
collection: Collection::new(Default::default()), collection,
workers, workers,
settings: settings.clone(), settings: settings.clone(),
runtime_settings: settings, runtime_settings: settings,
@ -320,37 +322,39 @@ impl Account {
let w = builder.build(Box::new(move || { let w = builder.build(Box::new(move || {
let mut mailbox_handle = mailbox_handle.clone(); let mut mailbox_handle = mailbox_handle.clone();
let work = mailbox_handle.work().unwrap(); let work = mailbox_handle.work().unwrap();
let rx = mailbox_handle.rx(); debug!("AA");
let tx = mailbox_handle.tx();
std::thread::Builder::new() std::thread::Builder::new()
.spawn(move || { .spawn(move || {
debug!("A");
work.compute(); work.compute();
debug!("B");
}) })
.unwrap(); .unwrap();
debug!("BB");
loop { loop {
debug!("looping"); debug!("LL");
chan_select! { match debug!(mailbox_handle.poll_block()) {
rx.recv() -> r => { Ok(s @ AsyncStatus::Payload(_)) => {
debug!("got {:?}", r); our_tx.send(s);
match r { debug!("notifying for {}", folder_hash);
Some(s @ AsyncStatus::Payload(_)) => { notify_fn.notify(folder_hash);
our_tx.send(s); }
debug!("notifying for {}", folder_hash); Ok(s @ AsyncStatus::Finished) => {
notify_fn.notify(folder_hash); our_tx.send(s);
} notify_fn.notify(folder_hash);
Some(AsyncStatus::Finished) => { debug!("exiting");
debug!("exiting"); return;
return; }
} Ok(s) => {
Some(s) => { our_tx.send(s);
our_tx.send(s); }
} Err(_) => {
None => return, debug!("poll error");
} return;
} }
} }
debug!("DD");
} }
})); }));
Some(w) Some(w)
@ -535,7 +539,7 @@ impl Account {
debug!("got payload in status for {}", folder_hash); debug!("got payload in status for {}", folder_hash);
self.load_mailbox(folder_hash, envs); self.load_mailbox(folder_hash, envs);
} }
Ok(AsyncStatus::Finished) if w.value.is_none() => { Ok(AsyncStatus::Finished) => {
debug!("got finished in status for {}", folder_hash); debug!("got finished in status for {}", folder_hash);
self.folders.entry(folder_hash).and_modify(|f| { self.folders.entry(folder_hash).and_modify(|f| {
let m = if let MailboxEntry::Parsing(m, _, _) = f { let m = if let MailboxEntry::Parsing(m, _, _) = f {
@ -548,11 +552,6 @@ impl Account {
self.workers.insert(folder_hash, None); self.workers.insert(folder_hash, None);
} }
Ok(AsyncStatus::Finished) if w.value.is_some() => {
let envs = w.value.take().unwrap();
debug!("got payload in status for {}", folder_hash);
self.load_mailbox(folder_hash, envs);
}
Ok(AsyncStatus::ProgressReport(n)) => { Ok(AsyncStatus::ProgressReport(n)) => {
self.folders.entry(folder_hash).and_modify(|f| { self.folders.entry(folder_hash).and_modify(|f| {
if let MailboxEntry::Parsing(_, ref mut d, _) = f { if let MailboxEntry::Parsing(_, ref mut d, _) = f {
@ -565,7 +564,6 @@ impl Account {
//return Err(0); //return Err(0);
} }
}, },
Some(_) => return Ok(()),
}; };
if self.folders[&folder_hash].is_available() if self.folders[&folder_hash].is_available()
|| (self.folders[&folder_hash].is_parsing() || (self.folders[&folder_hash].is_parsing()