From bbedeed3e37b5d145977c5ffbd75788f9ca1f4d5 Mon Sep 17 00:00:00 2001 From: Manos Pitsidianakis Date: Sun, 5 Jul 2020 13:22:48 +0300 Subject: [PATCH] More imap async fixes --- Cargo.lock | 1 + melib/src/backends.rs | 3 + melib/src/backends/imap/operations.rs | 2 - melib/src/backends/imap/protocol_parser.rs | 27 +-- melib/src/backends/imap_async.rs | 109 +++++----- melib/src/backends/imap_async/connection.rs | 137 ++++++------ .../backends/imap_async/protocol_parser.rs | 75 ++++--- melib/src/backends/imap_async/untagged.rs | 2 +- melib/src/backends/imap_async/watch.rs | 46 ++-- melib/src/backends/jmap/operations.rs | 2 - src/components/mail/compose.rs | 113 ++++++++-- src/components/mail/listing.rs | 143 ++++++------ src/components/mail/listing/compact.rs | 2 +- src/components/mail/listing/conversations.rs | 204 ++++++++++-------- src/components/mail/listing/plain.rs | 181 +++++++++------- src/components/mail/view.rs | 8 +- src/conf/accounts.rs | 176 ++++++++++----- src/jobs.rs | 2 +- 18 files changed, 741 insertions(+), 492 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c4be75e7..c59a150e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -943,6 +943,7 @@ dependencies = [ "nix", "nom", "notify", + "pin-utils", "reqwest", "rusqlite", "serde", diff --git a/melib/src/backends.rs b/melib/src/backends.rs index e6e50ba6..a77d2708 100644 --- a/melib/src/backends.rs +++ b/melib/src/backends.rs @@ -329,6 +329,9 @@ pub trait MailBackend: ::std::fmt::Debug + Send + Sync { sender: RefreshEventConsumer, work_context: WorkContext, ) -> Result; + fn watch_async(&self, sender: RefreshEventConsumer) -> ResultFuture<()> { + Err(MeliError::new("Unimplemented.")) + } fn mailboxes(&self) -> Result>; fn mailboxes_async(&self) -> ResultFuture> { Err(MeliError::new("Unimplemented.")) diff --git a/melib/src/backends/imap/operations.rs b/melib/src/backends/imap/operations.rs index 824f486c..aeeb27b2 100644 --- a/melib/src/backends/imap/operations.rs +++ b/melib/src/backends/imap/operations.rs @@ -20,8 +20,6 @@ */ use super::*; -use futures::lock::Mutex as FutureMutex; - use crate::backends::*; use crate::email::*; use crate::error::{MeliError, Result}; diff --git a/melib/src/backends/imap/protocol_parser.rs b/melib/src/backends/imap/protocol_parser.rs index bd1545f4..99794171 100644 --- a/melib/src/backends/imap/protocol_parser.rs +++ b/melib/src/backends/imap/protocol_parser.rs @@ -658,13 +658,13 @@ pub fn uid_fetch_response_( let (input, _) = take_while(is_digit)(input)?; let (input, result) = permutation(( preceded( - tag("UID "), + alt((tag("UID "), tag(" UID "))), map_res(digit1, |s| { usize::from_str(unsafe { std::str::from_utf8_unchecked(s) }) }), ), opt(preceded( - tag("FLAGS "), + alt((tag("FLAGS "), tag(" FLAGS "))), delimited(tag("("), byte_flags, tag(")")), )), length_data(delimited( @@ -685,13 +685,16 @@ pub fn uid_fetch_flags_response(input: &[u8]) -> IResult<&[u8], Vec<(usize, (Fla many0(|input| -> IResult<&[u8], (usize, (Flag, Vec))> { let (input, _) = tag("* ")(input)?; let (input, _) = take_while(is_digit)(input)?; - let (input, _) = tag(" FETCH ( ")(input)?; + let (input, _) = tag(" FETCH (")(input)?; let (input, uid_flags) = permutation(( preceded( - tag("UID "), + alt((tag("UID "), tag(" UID "))), map_res(digit1, |s| usize::from_str(to_str!(s))), ), - preceded(tag("FLAGS "), delimited(tag("("), byte_flags, tag(")"))), + preceded( + alt((tag("FLAGS "), tag(" FLAGS "))), + delimited(tag("("), byte_flags, tag(")")), + ), ))(input.ltrim())?; let (input, _) = tag(")\r\n")(input)?; Ok((input, (uid_flags.0, uid_flags.1))) @@ -1356,13 +1359,13 @@ pub fn uid_fetch_envelopes_response( let (input, _) = tag(" FETCH (")(input)?; let (input, uid_flags) = permutation(( preceded( - tag("UID "), + alt((tag("UID "), tag(" UID "))), map_res(digit1, |s| { usize::from_str(unsafe { std::str::from_utf8_unchecked(s) }) }), ), opt(preceded( - tag("FLAGS "), + alt((tag("FLAGS "), tag(" FLAGS "))), delimited(tag("("), byte_flags, tag(")")), )), ))(input.ltrim())?; @@ -1424,31 +1427,31 @@ pub fn status_response(input: &[u8]) -> IResult<&[u8], StatusResponse> { let (input, _) = tag(" (")(input)?; let (input, result) = permutation(( opt(preceded( - tag("MESSAGES "), + alt((tag("MESSAGES "), tag(" MESSAGES "))), map_res(digit1, |s| { usize::from_str(unsafe { std::str::from_utf8_unchecked(s) }) }), )), opt(preceded( - tag("RECENT "), + alt((tag("RECENT "), tag(" RECENT "))), map_res(digit1, |s| { usize::from_str(unsafe { std::str::from_utf8_unchecked(s) }) }), )), opt(preceded( - tag("UIDNEXT "), + alt((tag("UIDNEXT "), tag(" UIDNEXT "))), map_res(digit1, |s| { usize::from_str(unsafe { std::str::from_utf8_unchecked(s) }) }), )), opt(preceded( - tag("UIDVALIDITY "), + alt((tag("UIDVALIDITY "), tag(" UIDVALIDITY "))), map_res(digit1, |s| { usize::from_str(unsafe { std::str::from_utf8_unchecked(s) }) }), )), opt(preceded( - tag("UNSEEN "), + alt((tag("UNSEEN "), tag(" UNSEEN "))), map_res(digit1, |s| { usize::from_str(unsafe { std::str::from_utf8_unchecked(s) }) }), diff --git a/melib/src/backends/imap_async.rs b/melib/src/backends/imap_async.rs index 9549e4c6..f1ebe9c9 100644 --- a/melib/src/backends/imap_async.rs +++ b/melib/src/backends/imap_async.rs @@ -135,7 +135,7 @@ pub struct UIDStore { byte_cache: Arc>>, tag_index: Arc>>, - mailboxes: Arc>>, + mailboxes: Arc>>, is_online: Arc)>>, refresh_events: Arc>>, sender: Arc>>, @@ -151,7 +151,7 @@ impl Default for UIDStore { uid_index: Default::default(), msn_index: Default::default(), byte_cache: Default::default(), - mailboxes: Arc::new(RwLock::new(Default::default())), + mailboxes: Arc::new(FutureMutex::new(Default::default())), tag_index: Arc::new(RwLock::new(Default::default())), is_online: Arc::new(Mutex::new(( Instant::now(), @@ -204,18 +204,17 @@ impl MailBackend for ImapType { mailbox_hash: MailboxHash, sender: RefreshEventConsumer, ) -> ResultFuture<()> { - let inbox = self - .uid_store - .mailboxes - .read() - .unwrap() - .get(&mailbox_hash) - .map(std::clone::Clone::clone) - .unwrap(); let main_conn = self.connection.clone(); *self.uid_store.sender.write().unwrap() = Some(sender); let uid_store = self.uid_store.clone(); Ok(Box::pin(async move { + let inbox = uid_store + .mailboxes + .lock() + .await + .get(&mailbox_hash) + .map(std::clone::Clone::clone) + .unwrap(); let mut conn = main_conn.lock().await; watch::examine_updates(&inbox, &mut conn, &uid_store).await?; Ok(()) @@ -227,7 +226,7 @@ impl MailBackend for ImapType { let connection = self.connection.clone(); Ok(Box::pin(async move { { - let mailboxes = uid_store.mailboxes.read().unwrap(); + let mailboxes = uid_store.mailboxes.lock().await; if !mailboxes.is_empty() { return Ok(mailboxes .iter() @@ -236,7 +235,7 @@ impl MailBackend for ImapType { } } let new_mailboxes = ImapType::imap_mailboxes(&connection).await?; - let mut mailboxes = uid_store.mailboxes.write()?; + let mut mailboxes = uid_store.mailboxes.lock().await; *mailboxes = new_mailboxes; /* let mut invalid_configs = vec![]; @@ -314,7 +313,7 @@ impl MailBackend for ImapType { _mailbox_hash: MailboxHash, _sender: RefreshEventConsumer, ) -> Result> { - unimplemented!() + Err(MeliError::new("Unimplemented.")) } fn watch( @@ -323,45 +322,39 @@ impl MailBackend for ImapType { _work_context: WorkContext, ) -> Result { Ok(std::thread::current().id()) - //Err(MeliError::new("Unimplemented.")) - //unimplemented!() - /* + } + + fn watch_async(&self, sender: RefreshEventConsumer) -> ResultFuture<()> { + debug!("watch_async called"); let conn = ImapConnection::new_connection(&self.server_conf, self.uid_store.clone()); let main_conn = self.connection.clone(); *self.uid_store.sender.write().unwrap() = Some(sender); let uid_store = self.uid_store.clone(); - let handle = std::thread::Builder::new() - .name(format!("{} imap connection", self.account_name.as_str(),)) - .spawn(move || { - let thread = std::thread::current(); - work_context - .set_status - .send((thread.id(), "watching".to_string())) - .unwrap(); - let has_idle: bool = main_conn - .lock() - .unwrap() - .capabilities - .iter() - .any(|cap| cap.eq_ignore_ascii_case(b"IDLE")); - //let kit = ImapWatchKit { - // conn, - // main_conn, - // uid_store, - // work_context, - //}; - //if has_idle { - // idle(kit).ok().take(); - //} else { - // poll_with_examine(kit).ok().take(); - //} - })?; - Ok(handle.thread().id()) - */ + Ok(Box::pin(async move { + let has_idle: bool = main_conn + .lock() + .await + .capabilities + .iter() + .any(|cap| cap.eq_ignore_ascii_case(b"IDLE")); + debug!(has_idle); + let kit = ImapWatchKit { + conn, + main_conn, + uid_store, + }; + if has_idle { + idle(kit).await?; + } else { + poll_with_examine(kit).await?; + } + debug!("watch_async future returning"); + Ok(()) + })) } fn mailboxes(&self) -> Result> { - unimplemented!() + Err(MeliError::new("Unimplemented.")) } fn operation(&self, hash: EnvelopeHash) -> Result> { @@ -392,7 +385,7 @@ impl MailBackend for ImapType { let connection = self.connection.clone(); Ok(Box::pin(async move { let path = { - let mailboxes = uid_store.mailboxes.read().unwrap(); + let mailboxes = uid_store.mailboxes.lock().await; let mailbox = mailboxes.get(&mailbox_hash).ok_or(MeliError::new(format!( "Mailbox with hash {} not found.", @@ -468,7 +461,7 @@ impl MailBackend for ImapType { */ { - let mailboxes = uid_store.mailboxes.write().unwrap(); + let mailboxes = uid_store.mailboxes.lock().await; for root_mailbox in mailboxes.values().filter(|f| f.parent.is_none()) { if path.starts_with(&root_mailbox.name) { debug!("path starts with {:?}", &root_mailbox); @@ -508,7 +501,7 @@ impl MailBackend for ImapType { let ret: Result<()> = ImapResponse::from(&response).into(); ret?; let new_hash = get_path_hash!(path.as_str()); - uid_store.mailboxes.write().unwrap().clear(); + uid_store.mailboxes.lock().await.clear(); Ok((new_hash, new_mailbox_fut?.await.map_err(|err| MeliError::new(format!("Mailbox create was succesful (returned `{}`) but listing mailboxes afterwards returned `{}`", response, err)))?)) })) } @@ -525,7 +518,7 @@ impl MailBackend for ImapType { let no_select: bool; let is_subscribed: bool; { - let mailboxes = uid_store.mailboxes.read().unwrap(); + let mailboxes = uid_store.mailboxes.lock().await; no_select = mailboxes[&mailbox_hash].no_select; is_subscribed = mailboxes[&mailbox_hash].is_subscribed(); imap_path = mailboxes[&mailbox_hash].imap_path().to_string(); @@ -563,7 +556,7 @@ impl MailBackend for ImapType { } let ret: Result<()> = ImapResponse::from(&response).into(); ret?; - uid_store.mailboxes.write().unwrap().clear(); + uid_store.mailboxes.lock().await.clear(); new_mailbox_fut?.await.map_err(|err| format!("Mailbox delete was succesful (returned `{}`) but listing mailboxes afterwards returned `{}`", response, err).into()) })) } @@ -578,7 +571,7 @@ impl MailBackend for ImapType { Ok(Box::pin(async move { let command: String; { - let mailboxes = uid_store.mailboxes.write().unwrap(); + let mailboxes = uid_store.mailboxes.lock().await; if mailboxes[&mailbox_hash].is_subscribed() == new_val { return Ok(()); } @@ -604,8 +597,8 @@ impl MailBackend for ImapType { if ret.is_ok() { uid_store .mailboxes - .write() - .unwrap() + .lock() + .await .entry(mailbox_hash) .and_modify(|entry| { let _ = entry.set_is_subscribed(new_val); @@ -627,7 +620,7 @@ impl MailBackend for ImapType { let command: String; let mut response = String::with_capacity(8 * 1024); { - let mailboxes = uid_store.mailboxes.write().unwrap(); + let mailboxes = uid_store.mailboxes.lock().await; let permissions = mailboxes[&mailbox_hash].permissions(); if !permissions.delete_mailbox { return Err(MeliError::new(format!("You do not have permission to rename mailbox `{}` (rename is equivalent to delete + create). Set permissions for this mailbox are {}", mailboxes[&mailbox_hash].name(), permissions))); @@ -654,10 +647,10 @@ impl MailBackend for ImapType { let new_hash = get_path_hash!(new_path.as_str()); let ret: Result<()> = ImapResponse::from(&response).into(); ret?; - uid_store.mailboxes.write().unwrap().clear(); + uid_store.mailboxes.lock().await.clear(); new_mailbox_fut?.await.map_err(|err| format!("Mailbox rename was succesful (returned `{}`) but listing mailboxes afterwards returned `{}`", response, err))?; Ok(BackendMailbox::clone( - &uid_store.mailboxes.read().unwrap()[&new_hash], + &uid_store.mailboxes.lock().await[&new_hash], )) })) } @@ -670,7 +663,7 @@ impl MailBackend for ImapType { let uid_store = self.uid_store.clone(); let connection = self.connection.clone(); Ok(Box::pin(async move { - let mailboxes = uid_store.mailboxes.write().unwrap(); + let mailboxes = uid_store.mailboxes.lock().await; let permissions = mailboxes[&mailbox_hash].permissions(); if !permissions.change_permissions { return Err(MeliError::new(format!("You do not have permission to change permissions for mailbox `{}`. Set permissions for this mailbox are {}", mailboxes[&mailbox_hash].name(), permissions))); @@ -1076,7 +1069,7 @@ async fn get_hlpr( max_uid: &mut Option, ) -> Result> { let (permissions, mailbox_path, mailbox_exists, no_select, unseen) = { - let f = &uid_store.mailboxes.read().unwrap()[&mailbox_hash]; + let f = &uid_store.mailboxes.lock().await[&mailbox_hash]; ( f.permissions.clone(), f.imap_path().to_string(), diff --git a/melib/src/backends/imap_async/connection.rs b/melib/src/backends/imap_async/connection.rs index e1653c1b..dbbf5639 100644 --- a/melib/src/backends/imap_async/connection.rs +++ b/melib/src/backends/imap_async/connection.rs @@ -569,7 +569,7 @@ impl ImapConnection { self.send_command( format!( "SELECT \"{}\"", - self.uid_store.mailboxes.read().unwrap()[&mailbox_hash].imap_path() + self.uid_store.mailboxes.lock().await[&mailbox_hash].imap_path() ) .as_bytes(), ) @@ -593,7 +593,7 @@ impl ImapConnection { self.send_command( format!( "EXAMINE \"{}\"", - self.uid_store.mailboxes.read().unwrap()[&mailbox_hash].imap_path() + self.uid_store.mailboxes.lock().await[&mailbox_hash].imap_path() ) .as_bytes(), ) @@ -679,27 +679,7 @@ pub struct ImapBlockingConnection { } impl From for ImapBlockingConnection { - fn from(_conn: ImapConnection) -> Self { - unimplemented!() - /* - conn.set_nonblocking(false) - .expect("set_nonblocking call failed"); - conn.stream - .as_mut() - .map(|s| { - s.stream - .set_write_timeout(Some(std::time::Duration::new(30, 0))) - .expect("set_write_timeout call failed") - }) - .expect("set_write_timeout call failed"); - conn.stream - .as_mut() - .map(|s| { - s.stream - .set_read_timeout(Some(std::time::Duration::new(30, 0))) - .expect("set_read_timeout call failed") - }) - .expect("set_read_timeout call failed"); + fn from(conn: ImapConnection) -> Self { ImapBlockingConnection { buf: [0; 1024], conn, @@ -707,7 +687,6 @@ impl From for ImapBlockingConnection { result: Vec::with_capacity(8 * 1024), err: None, } - */ } } @@ -719,69 +698,81 @@ impl ImapBlockingConnection { pub fn err(&self) -> Option<&str> { self.err.as_ref().map(String::as_str) } -} - -impl Iterator for ImapBlockingConnection { - type Item = Vec; - fn next(&mut self) -> Option { - unimplemented!() - /* + pub fn into_stream<'a>(&'a mut self) -> impl Future>> + 'a { self.result.drain(0..self.prev_res_length); self.prev_res_length = 0; + let mut break_flag = false; let mut prev_failure = None; - let ImapBlockingConnection { - ref mut prev_res_length, - ref mut result, - ref mut conn, - ref mut buf, - ref mut err, - } = self; - loop { - if conn.stream.is_err() { - debug!(&conn.stream); + async move { + if self.conn.stream.is_err() { + debug!(&self.conn.stream); return None; } - match conn.stream.as_mut().unwrap().stream.read(buf) { - Ok(0) => return None, - Ok(b) => { - result.extend_from_slice(&buf[0..b]); - debug!(unsafe { std::str::from_utf8_unchecked(result) }); - if let Some(pos) = result.find(b"\r\n") { - *prev_res_length = pos + b"\r\n".len(); - return Some(result[0..*prev_res_length].to_vec()); - } - prev_failure = None; + loop { + if let Some(y) = read(self, &mut break_flag, &mut prev_failure).await { + return Some(y); } - Err(e) - if e.kind() == std::io::ErrorKind::WouldBlock - || e.kind() == std::io::ErrorKind::Interrupted => - { - debug!(&e); - if let Some(prev_failure) = prev_failure.as_ref() { - if Instant::now().duration_since(*prev_failure) - >= std::time::Duration::new(60 * 5, 0) - { - *err = Some(e.to_string()); - return None; - } - } else { - prev_failure = Some(Instant::now()); - } - continue; - } - Err(e) => { - debug!(&conn.stream); - debug!(&e); - *err = Some(e.to_string()); + if break_flag { return None; } } } - */ } } +async fn read( + conn: &mut ImapBlockingConnection, + break_flag: &mut bool, + prev_failure: &mut Option, +) -> Option> { + let ImapBlockingConnection { + ref mut prev_res_length, + ref mut result, + ref mut conn, + ref mut buf, + ref mut err, + } = conn; + + match conn.stream.as_mut().unwrap().stream.read(buf).await { + Ok(0) => { + *break_flag = true; + } + Ok(b) => { + result.extend_from_slice(&buf[0..b]); + debug!(unsafe { std::str::from_utf8_unchecked(result) }); + if let Some(pos) = result.find(b"\r\n") { + *prev_res_length = pos + b"\r\n".len(); + return Some(result[0..*prev_res_length].to_vec()); + } + *prev_failure = None; + } + Err(e) + if e.kind() == std::io::ErrorKind::WouldBlock + || e.kind() == std::io::ErrorKind::Interrupted => + { + debug!(&e); + if let Some(prev_failure) = prev_failure.as_ref() { + if Instant::now().duration_since(*prev_failure) + >= std::time::Duration::new(60 * 5, 0) + { + *err = Some(e.to_string()); + *break_flag = true; + } + } else { + *prev_failure = Some(Instant::now()); + } + } + Err(e) => { + debug!(&conn.stream); + debug!(&e); + *err = Some(e.to_string()); + *break_flag = true; + } + } + None +} + fn lookup_ipv4(host: &str, port: u16) -> Result { use std::net::ToSocketAddrs; diff --git a/melib/src/backends/imap_async/protocol_parser.rs b/melib/src/backends/imap_async/protocol_parser.rs index dbcc29d9..71f5bc84 100644 --- a/melib/src/backends/imap_async/protocol_parser.rs +++ b/melib/src/backends/imap_async/protocol_parser.rs @@ -126,6 +126,26 @@ impl RequiredResponses { } } +#[test] +fn test_imap_required_responses() { + let mut ret = String::new(); + let required_responses = RequiredResponses::FETCH_REQUIRED; + let response = + &"* 1040 FETCH (UID 1064 FLAGS ())\r\nM15 OK Fetch completed (0.001 + 0.299 secs).\r\n" + [0..]; + for l in response.split_rn() { + /*debug!("check line: {}", &l);*/ + if required_responses.check(l) { + ret.push_str(l); + } + } + assert_eq!(&ret, "* 1040 FETCH (UID 1064 FLAGS ())\r\n"); + let v = protocol_parser::uid_fetch_flags_response(response.as_bytes()) + .unwrap() + .1; + assert_eq!(v.len(), 1); +} + #[derive(Debug)] pub struct Alert(String); pub type ImapParseResult<'a, T> = Result<(&'a str, T, Option)>; @@ -171,18 +191,18 @@ impl std::fmt::Display for ResponseCode { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { use ResponseCode::*; match self { - Alert(s)=> write!(fmt, "ALERT: {}", s), - Badcharset(None)=> write!(fmt, "Given charset is not supported by this server."), - Badcharset(Some(s))=> write!(fmt, "Given charset is not supported by this server. Supported ones are: {}", s), - Capability => write!(fmt, "Capability response"), - Parse(s) => write!(fmt, "Server error in parsing message headers: {}", s), - Permanentflags(s) => write!(fmt, "Mailbox supports these flags: {}", s), - ReadOnly=> write!(fmt, "This mailbox is selected read-only."), -ReadWrite => write!(fmt, "This mailbox is selected with read-write permissions."), - Trycreate => write!(fmt, "Failed to operate on the target mailbox because it doesn't exist. Try creating it first."), - Uidnext(uid) => write!(fmt, "Next UID value is {}", uid), - Uidvalidity(uid) => write!(fmt, "Next UIDVALIDITY value is {}", uid), - Unseen(uid) => write!(fmt, "First message without the \\Seen flag is {}", uid), + Alert(s)=> write!(fmt, "ALERT: {}", s), + Badcharset(None)=> write!(fmt, "Given charset is not supported by this server."), + Badcharset(Some(s))=> write!(fmt, "Given charset is not supported by this server. Supported ones are: {}", s), + Capability => write!(fmt, "Capability response"), + Parse(s) => write!(fmt, "Server error in parsing message headers: {}", s), + Permanentflags(s) => write!(fmt, "Mailbox supports these flags: {}", s), + ReadOnly=> write!(fmt, "This mailbox is selected read-only."), + ReadWrite => write!(fmt, "This mailbox is selected with read-write permissions."), + Trycreate => write!(fmt, "Failed to operate on the target mailbox because it doesn't exist. Try creating it first."), + Uidnext(uid) => write!(fmt, "Next UID value is {}", uid), + Uidvalidity(uid) => write!(fmt, "Next UIDVALIDITY value is {}", uid), + Unseen(uid) => write!(fmt, "First message without the \\Seen flag is {}", uid), } } } @@ -658,13 +678,13 @@ pub fn uid_fetch_response_( let (input, _) = take_while(is_digit)(input)?; let (input, result) = permutation(( preceded( - tag("UID "), + alt((tag("UID "), tag(" UID "))), map_res(digit1, |s| { usize::from_str(unsafe { std::str::from_utf8_unchecked(s) }) }), ), opt(preceded( - tag("FLAGS "), + alt((tag("FLAGS "), tag(" FLAGS "))), delimited(tag("("), byte_flags, tag(")")), )), length_data(delimited( @@ -684,15 +704,18 @@ pub fn uid_fetch_response_( pub fn uid_fetch_flags_response(input: &[u8]) -> IResult<&[u8], Vec<(usize, (Flag, Vec))>> { many0(|input| -> IResult<&[u8], (usize, (Flag, Vec))> { let (input, _) = tag("* ")(input)?; - let (input, _) = take_while(is_digit)(input)?; - let (input, _) = tag(" FETCH ( ")(input)?; + let (input, _msn) = take_while(is_digit)(input)?; + let (input, _) = tag(" FETCH (")(input)?; let (input, uid_flags) = permutation(( preceded( - tag("UID "), + alt((tag("UID "), tag(" UID "))), map_res(digit1, |s| usize::from_str(to_str!(s))), ), - preceded(tag("FLAGS "), delimited(tag("("), byte_flags, tag(")"))), - ))(input.ltrim())?; + preceded( + alt((tag("FLAGS "), tag(" FLAGS "))), + delimited(tag("("), byte_flags, tag(")")), + ), + ))(input)?; let (input, _) = tag(")\r\n")(input)?; Ok((input, (uid_flags.0, uid_flags.1))) })(input) @@ -1357,13 +1380,13 @@ pub fn uid_fetch_envelopes_response( let (input, _) = tag(" FETCH (")(input)?; let (input, uid_flags) = permutation(( preceded( - tag("UID "), + alt((tag("UID "), tag(" UID "))), map_res(digit1, |s| { usize::from_str(unsafe { std::str::from_utf8_unchecked(s) }) }), ), opt(preceded( - tag("FLAGS "), + alt((tag("FLAGS "), tag(" FLAGS "))), delimited(tag("("), byte_flags, tag(")")), )), ))(input.ltrim())?; @@ -1425,31 +1448,31 @@ pub fn status_response(input: &[u8]) -> IResult<&[u8], StatusResponse> { let (input, _) = tag(" (")(input)?; let (input, result) = permutation(( opt(preceded( - tag("MESSAGES "), + alt((tag("MESSAGES "), tag(" MESSAGES "))), map_res(digit1, |s| { usize::from_str(unsafe { std::str::from_utf8_unchecked(s) }) }), )), opt(preceded( - tag("RECENT "), + alt((tag("RECENT "), tag(" RECENT "))), map_res(digit1, |s| { usize::from_str(unsafe { std::str::from_utf8_unchecked(s) }) }), )), opt(preceded( - tag("UIDNEXT "), + alt((tag("UIDNEXT "), tag(" UIDNEXT "))), map_res(digit1, |s| { usize::from_str(unsafe { std::str::from_utf8_unchecked(s) }) }), )), opt(preceded( - tag("UIDVALIDITY "), + alt((tag("UIDVALIDITY "), tag(" UIDVALIDITY "))), map_res(digit1, |s| { usize::from_str(unsafe { std::str::from_utf8_unchecked(s) }) }), )), opt(preceded( - tag("UNSEEN "), + alt((tag("UNSEEN "), tag(" UNSEEN "))), map_res(digit1, |s| { usize::from_str(unsafe { std::str::from_utf8_unchecked(s) }) }), diff --git a/melib/src/backends/imap_async/untagged.rs b/melib/src/backends/imap_async/untagged.rs index 74e2ad27..1bf7f158 100644 --- a/melib/src/backends/imap_async/untagged.rs +++ b/melib/src/backends/imap_async/untagged.rs @@ -57,7 +57,7 @@ impl ImapConnection { MailboxSelection::None => return Ok(false), }; let mailbox = - std::clone::Clone::clone(&self.uid_store.mailboxes.read().unwrap()[&mailbox_hash]); + std::clone::Clone::clone(&self.uid_store.mailboxes.lock().await[&mailbox_hash]); let mut response = String::with_capacity(8 * 1024); let untagged_response = diff --git a/melib/src/backends/imap_async/watch.rs b/melib/src/backends/imap_async/watch.rs index 31644ada..89831b29 100644 --- a/melib/src/backends/imap_async/watch.rs +++ b/melib/src/backends/imap_async/watch.rs @@ -60,7 +60,7 @@ pub async fn poll_with_examine(kit: ImapWatchKit) -> Result<()> { conn.connect().await?; let mut response = String::with_capacity(8 * 1024); loop { - let mailboxes = uid_store.mailboxes.read()?; + let mailboxes = uid_store.mailboxes.lock().await; for mailbox in mailboxes.values() { examine_updates(mailbox, &mut conn, &uid_store).await?; } @@ -84,8 +84,8 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> { conn.connect().await?; let mailbox: ImapMailbox = match uid_store .mailboxes - .read() - .unwrap() + .lock() + .await .values() .find(|f| f.parent.is_none() && (f.special_usage() == SpecialUsageMailbox::Inbox)) .map(std::clone::Clone::clone) @@ -158,37 +158,38 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> { } Err(e) => { debug!("{:?}", e); - panic!("could not select mailbox"); + return Err(e).chain_err_summary(|| "could not select mailbox"); } }; } exit_on_error!(conn, mailbox_hash, conn.send_command(b"IDLE").await); - let mut iter = ImapBlockingConnection::from(conn); + let mut blockn = ImapBlockingConnection::from(conn); let mut beat = std::time::Instant::now(); let mut watch = std::time::Instant::now(); /* duration interval to send heartbeat */ - let _26_mins = std::time::Duration::from_secs(26 * 60); + const _26_MINS: std::time::Duration = std::time::Duration::from_secs(26 * 60); /* duration interval to check other mailboxes for changes */ - let _5_mins = std::time::Duration::from_secs(5 * 60); - while let Some(line) = iter.next() { + const _5_MINS: std::time::Duration = std::time::Duration::from_secs(5 * 60); + while let Some(line) = blockn.into_stream().await { let now = std::time::Instant::now(); - if now.duration_since(beat) >= _26_mins { + if now.duration_since(beat) >= _26_MINS { let mut main_conn_lck = main_conn.lock().await; exit_on_error!( - iter.conn, + blockn.conn, mailbox_hash, - iter.conn.send_raw(b"DONE").await - iter.conn.read_response(&mut response, RequiredResponses::empty()).await - iter.conn.send_command(b"IDLE").await + blockn.conn.send_raw(b"DONE").await + blockn.conn.read_response(&mut response, RequiredResponses::empty()).await + blockn.conn.send_command(b"IDLE").await main_conn_lck.send_command(b"NOOP").await main_conn_lck.read_response(&mut response, RequiredResponses::empty()).await ); beat = now; } - if now.duration_since(watch) >= _5_mins { + if now.duration_since(watch) >= _5_MINS { /* Time to poll all inboxes */ let mut conn = main_conn.lock().await; - for mailbox in uid_store.mailboxes.read().unwrap().values() { + let mailboxes = uid_store.mailboxes.lock().await; + for mailbox in mailboxes.values() { exit_on_error!( conn, mailbox_hash, @@ -243,6 +244,10 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> { .contains_key(&(mailbox_hash, uid)) { if let Ok(mut env) = Envelope::from_bytes( + /* unwrap() is safe since we ask for RFC822 in the + * above FETCH, thus uid_fetch_responses() if + * returns a successful parse, it will include the + * RFC822 response */ body.unwrap(), flags.as_ref().map(|&(f, _)| f), ) { @@ -346,9 +351,8 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> { let mut conn = main_conn.lock().await; /* UID FETCH ALL UID, cross-ref, then FETCH difference headers * */ - let mut prev_exists = mailbox.exists.lock().unwrap(); debug!("exists {}", n); - if n > prev_exists.len() { + if n > mailbox.exists.lock().unwrap().len() { exit_on_error!( conn, mailbox_hash, @@ -356,7 +360,7 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> { conn.send_command( &[ b"FETCH", - format!("{}:{}", prev_exists.len() + 1, n).as_bytes(), + format!("{}:{}", mailbox.exists.lock().unwrap().len() + 1, n).as_bytes(), b"(UID FLAGS RFC822)", ] .join(&b' '), @@ -418,7 +422,7 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> { &[(uid, &env)], )?; } - prev_exists.insert_new(env.hash()); + mailbox.exists.lock().unwrap().insert_new(env.hash()); conn.add_refresh_event(RefreshEvent { account_hash: uid_store.account_hash, @@ -488,7 +492,7 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> { } } debug!("IDLE connection dropped"); - let err: &str = iter.err().unwrap_or("Unknown reason."); + let err: &str = blockn.err().unwrap_or("Unknown reason."); main_conn.lock().await.add_refresh_event(RefreshEvent { account_hash: uid_store.account_hash, mailbox_hash, @@ -759,7 +763,7 @@ pub async fn examine_updates( } Err(e) => { debug!("{:?}", e); - panic!("could not select mailbox"); + return Err(e).chain_err_summary(|| "could not select mailbox"); } }; Ok(()) diff --git a/melib/src/backends/jmap/operations.rs b/melib/src/backends/jmap/operations.rs index a9d74ee4..1cfd018f 100644 --- a/melib/src/backends/jmap/operations.rs +++ b/melib/src/backends/jmap/operations.rs @@ -20,8 +20,6 @@ */ use super::*; - -use crate::error::Result; use std::cell::Cell; use std::sync::{Arc, RwLock}; diff --git a/src/components/mail/compose.rs b/src/components/mail/compose.rs index 6423a36b..16aca95a 100644 --- a/src/components/mail/compose.rs +++ b/src/components/mail/compose.rs @@ -21,9 +21,11 @@ use super::*; use melib::list_management; - -use crate::terminal::embed::EmbedGrid; use melib::Draft; + +use crate::conf::accounts::JobRequest; +use crate::jobs::{oneshot, JobId}; +use crate::terminal::embed::EmbedGrid; use nix::sys::wait::WaitStatus; use std::str::FromStr; use std::sync::{Arc, Mutex}; @@ -64,6 +66,7 @@ impl std::ops::DerefMut for EmbedStatus { #[derive(Debug)] pub struct Composer { reply_context: Option<(MailboxHash, EnvelopeHash)>, + reply_bytes_request: Option<(JobId, oneshot::Receiver>>)>, account_cursor: usize, cursor: Cursor, @@ -89,6 +92,7 @@ impl Default for Composer { pager.set_reflow(text_processing::Reflow::FormatFlowed); Composer { reply_context: None, + reply_bytes_request: None, account_cursor: 0, cursor: Cursor::Headers, @@ -229,21 +233,6 @@ impl Composer { } } } - - match account.operation(msg) { - Err(err) => { - context.replies.push_back(UIEvent::Notification( - None, - err.to_string(), - Some(NotificationType::ERROR), - )); - } - Ok(op) => { - //FIXME - //let parent_bytes = op.as_bytes(); - //ret.draft = Draft::new_reply(&parent_message, parent_bytes.unwrap()); - } - } let subject = parent_message.subject(); ret.draft.headers_mut().insert( "Subject".into(), @@ -254,6 +243,54 @@ impl Composer { }, ); + drop(parent_message); + drop(account); + match context.accounts[coordinates.0] + .operation(msg) + .and_then(|mut op| op.as_bytes()) + { + Err(err) => { + context.replies.push_back(UIEvent::Notification( + None, + err.to_string(), + Some(NotificationType::ERROR), + )); + } + Ok(fut) => { + let (mut rcvr, job_id) = context.accounts[coordinates.0] + .job_executor + .spawn_specialized(fut); + context.accounts[coordinates.0] + .active_jobs + .insert(job_id.clone(), JobRequest::AsBytes); + if let Ok(Some(parent_bytes)) = try_recv_timeout!(&mut rcvr) { + match parent_bytes { + Err(err) => { + context.replies.push_back(UIEvent::Notification( + None, + err.to_string(), + Some(NotificationType::ERROR), + )); + } + Ok(parent_bytes) => { + let env_hash = msg; + let parent_message = + context.accounts[coordinates.0].collection.get_env(env_hash); + let mut new_draft = Draft::new_reply(&parent_message, &parent_bytes); + new_draft + .headers_mut() + .extend(ret.draft.headers_mut().drain()); + new_draft + .attachments_mut() + .extend(ret.draft.attachments_mut().drain(..)); + ret.set_draft(new_draft); + } + } + } else { + ret.reply_bytes_request = Some((job_id, rcvr)); + } + } + } ret.account_cursor = coordinates.0; ret.reply_context = Some((coordinates.1, msg)); ret @@ -585,6 +622,48 @@ impl Component for Composer { fn process_event(&mut self, mut event: &mut UIEvent, context: &mut Context) -> bool { let shortcuts = self.get_shortcuts(context); match (&mut self.mode, &mut event) { + (_, UIEvent::StatusEvent(StatusEvent::JobFinished(ref job_id))) + if self + .reply_bytes_request + .as_ref() + .map(|(j, _)| j == job_id) + .unwrap_or(false) => + { + let bytes = self + .reply_bytes_request + .take() + .unwrap() + .1 + .try_recv() + .unwrap() + .unwrap(); + match bytes { + Ok(parent_bytes) => { + let env_hash = self.reply_context.unwrap().1; + let parent_message = context.accounts[self.account_cursor] + .collection + .get_env(env_hash); + let mut new_draft = Draft::new_reply(&parent_message, &parent_bytes); + new_draft + .headers_mut() + .extend(self.draft.headers_mut().drain()); + new_draft + .attachments_mut() + .extend(self.draft.attachments_mut().drain(..)); + self.set_draft(new_draft); + self.set_dirty(true); + self.initialized = false; + } + Err(err) => { + context.replies.push_back(UIEvent::Notification( + Some(format!("Failed to load parent envelope")), + err.to_string(), + Some(NotificationType::ERROR), + )); + } + } + return true; + } (ViewMode::Edit, _) => { if self.pager.process_event(event, context) { return true; diff --git a/src/components/mail/listing.rs b/src/components/mail/listing.rs index 9fe869cb..c9500ffd 100644 --- a/src/components/mail/listing.rs +++ b/src/components/mail/listing.rs @@ -20,6 +20,7 @@ */ use super::*; +use crate::conf::accounts::JobRequest; use crate::types::segment_tree::SegmentTree; use smallvec::SmallVec; use std::collections::{HashMap, HashSet}; @@ -175,7 +176,10 @@ pub trait MailListingTrait: ListingTrait { )); } Ok(fut) => { - //accout.job_executor.spawn_specialized(fut); + let (rcvr, job_id) = account.job_executor.spawn_specialized(fut); + account + .active_jobs + .insert(job_id, JobRequest::SetFlags(env_hash, rcvr)); } }, ListingAction::SetUnseen => match envelope.set_unseen(op) { @@ -184,7 +188,12 @@ pub trait MailListingTrait: ListingTrait { StatusEvent::DisplayMessage(e.to_string()), )); } - Ok(fut) => {} + Ok(fut) => { + let (rcvr, job_id) = account.job_executor.spawn_specialized(fut); + account + .active_jobs + .insert(job_id, JobRequest::SetFlags(env_hash, rcvr)); + } }, ListingAction::Delete => { drop(envelope); @@ -197,20 +206,21 @@ pub trait MailListingTrait: ListingTrait { )); return; } - Ok(fut) => {} + Ok(fut) => { + let (rcvr, job_id) = account.job_executor.spawn_specialized(fut); + account + .active_jobs + .insert(job_id, JobRequest::DeleteMessage(env_hash, rcvr)); + } } continue; } ListingAction::CopyTo(ref mailbox_path) => { drop(envelope); - /* - * FIXME match account .mailbox_by_path(mailbox_path) - .and_then(|mailbox_hash| { - op.as_bytes() - .and_then(|bytes| account.save(bytes, mailbox_hash, None)) - }) { + .and_then(|mailbox_hash| op.copy_to(mailbox_hash)) + { Err(err) => { context.replies.push_back(UIEvent::Notification( Some("Could not copy.".to_string()), @@ -219,16 +229,18 @@ pub trait MailListingTrait: ListingTrait { )); return; } - Ok(fut) => {} + Ok(fut) => { + let (rcvr, job_id) = account.job_executor.spawn_specialized(fut); + account + .active_jobs + .insert(job_id, JobRequest::SaveMessage(mailbox_hash, rcvr)); + } } - */ continue; } ListingAction::CopyToOtherAccount(ref account_name, ref mailbox_path) => { drop(envelope); - /* - * FIXME - if let Err(err) = op.as_bytes().map(|b| b.to_vec()).and_then(|bytes| { + if let Err(err) = op.as_bytes().and_then(|bytes_fut| { let account_pos = context .accounts .iter() @@ -241,7 +253,11 @@ pub trait MailListingTrait: ListingTrait { })?; let account = &mut context.accounts[account_pos]; let mailbox_hash = account.mailbox_by_path(mailbox_path)?; - account.save(&bytes, mailbox_hash, None) + let (rcvr, job_id) = account.job_executor.spawn_specialized(bytes_fut); + account + .active_jobs + .insert(job_id, JobRequest::CopyTo(mailbox_hash, rcvr)); + Ok(()) }) { context.replies.push_back(UIEvent::Notification( Some("Could not copy.".to_string()), @@ -250,21 +266,52 @@ pub trait MailListingTrait: ListingTrait { )); return; } - */ continue; } ListingAction::MoveTo(ref mailbox_path) => { drop(envelope); - /* - * FIXME - if let Err(err) = - account - .mailbox_by_path(mailbox_path) - .and_then(|mailbox_hash| { - op.as_bytes() - .and_then(|bytes| account.save(bytes, mailbox_hash, None)) - }) + match account + .mailbox_by_path(mailbox_path) + .and_then(|mailbox_hash| op.copy_to(mailbox_hash)) { + Err(err) => { + context.replies.push_back(UIEvent::Notification( + Some("Could not copy.".to_string()), + err.to_string(), + Some(NotificationType::ERROR), + )); + return; + } + Ok(fut) => { + let (rcvr, job_id) = account.job_executor.spawn_specialized(fut); + account + .active_jobs + .insert(job_id, JobRequest::SaveMessage(mailbox_hash, rcvr)); + } + } + continue; + } + ListingAction::MoveToOtherAccount(ref account_name, ref mailbox_path) => { + drop(envelope); + if let Err(err) = op.as_bytes().and_then(|bytes_fut| { + let account_pos = context + .accounts + .iter() + .position(|el| el.name() == account_name) + .ok_or_else(|| { + MeliError::new(format!( + "`{}` is not a valid account name", + account_name + )) + })?; + let account = &mut context.accounts[account_pos]; + let mailbox_hash = account.mailbox_by_path(mailbox_path)?; + let (rcvr, job_id) = account.job_executor.spawn_specialized(bytes_fut); + account + .active_jobs + .insert(job_id, JobRequest::CopyTo(mailbox_hash, rcvr)); + Ok(()) + }) { context.replies.push_back(UIEvent::Notification( Some("Could not copy.".to_string()), err.to_string(), @@ -272,46 +319,12 @@ pub trait MailListingTrait: ListingTrait { )); return; } - */ - continue; - } - ListingAction::MoveToOtherAccount(ref account_name, ref mailbox_path) => { - drop(envelope); - /* FIXME - if let Err(err) = op - .as_bytes() - .map(|b| b.to_vec()) - .and_then(|bytes| { - let account_pos = context - .accounts - .iter() - .position(|el| el.name() == account_name) - .ok_or_else(|| { - MeliError::new(format!( - "`{}` is not a valid account name", - account_name - )) - })?; - let account = &mut context.accounts[account_pos]; - let mailbox_hash = account.mailbox_by_path(mailbox_path)?; - account.save(&bytes, mailbox_hash, None) - }) - .and_then(|()| { - let account = &mut context.accounts[account_pos]; + /* account .delete(env_hash, mailbox_hash) .chain_err_summary(|| { "Envelope was copied but removal from original account failed" }) - }) - { - context.replies.push_back(UIEvent::Notification( - Some("Could not move.".to_string()), - err.to_string(), - Some(NotificationType::ERROR), - )); - return; - } */ continue; } @@ -326,7 +339,10 @@ pub trait MailListingTrait: ListingTrait { return; } Ok(fut) => { - //FIXME + let (rcvr, job_id) = account.job_executor.spawn_specialized(fut); + account + .active_jobs + .insert(job_id, JobRequest::SetFlags(env_hash, rcvr)); } } } @@ -341,7 +357,10 @@ pub trait MailListingTrait: ListingTrait { return; } Ok(fut) => { - // FIXME + let (rcvr, job_id) = account.job_executor.spawn_specialized(fut); + account + .active_jobs + .insert(job_id, JobRequest::SetFlags(env_hash, rcvr)); } } } diff --git a/src/components/mail/listing/compact.rs b/src/components/mail/listing/compact.rs index 7c770ee1..f2d1d49f 100644 --- a/src/components/mail/listing/compact.rs +++ b/src/components/mail/listing/compact.rs @@ -1559,7 +1559,7 @@ impl Component for CompactListing { .map(|(_, _, j)| j == job_id) .unwrap_or(false) => { - let (filter_term, mut rcvr, job_id) = self.search_job.take().unwrap(); + let (filter_term, mut rcvr, _job_id) = self.search_job.take().unwrap(); let results = rcvr.try_recv().unwrap().unwrap(); self.filter(filter_term, results, context); self.set_dirty(true); diff --git a/src/components/mail/listing/conversations.rs b/src/components/mail/listing/conversations.rs index f7dc2146..6c9fee75 100644 --- a/src/components/mail/listing/conversations.rs +++ b/src/components/mail/listing/conversations.rs @@ -21,6 +21,7 @@ use super::*; use crate::components::utilities::PageMovement; +use crate::jobs::{oneshot, JobId}; use std::iter::FromIterator; /// A list of all mail (`Envelope`s) in a `Mailbox`. On `\n` it opens the `Envelope` content in a @@ -38,6 +39,11 @@ pub struct ConversationsListing { /// Cache current view. content: CellBuffer, + search_job: Option<( + String, + oneshot::Receiver>>, + JobId, + )>, filter_term: String, filtered_selection: Vec, filtered_order: HashMap, @@ -693,82 +699,53 @@ impl ListingTrait for ConversationsListing { results: Result>, context: &Context, ) { - /* - if filter_term.is_empty() { - return; - } + if filter_term.is_empty() { + return; + } - self.order.clear(); - self.selection.clear(); - self.length = 0; - self.filtered_selection.clear(); - self.filtered_order.clear(); - self.filter_term = filter_term.to_string(); - self.row_updates.clear(); - for v in self.selection.values_mut() { - *v = false; - } + self.order.clear(); + self.selection.clear(); + self.length = 0; + self.filtered_selection.clear(); + self.filtered_order.clear(); + self.filter_term = filter_term.to_string(); + self.row_updates.clear(); + for v in self.selection.values_mut() { + *v = false; + } - let account = &context.accounts[self.cursor_pos.0]; - match account.search(&self.filter_term, self.sort, self.cursor_pos.1) { - Ok(results) => { - /* - let threads = &account.collection.threads[&self.cursor_pos.1]; - for env_hash in results { - if !account.collection.contains_key(&env_hash) { - continue; - } - let env_thread_node_hash = account.collection.get_env(env_hash).thread(); - if !threads.thread_nodes.contains_key(&env_thread_node_hash) { - continue; - } - let thread = - threads.find_group(threads.thread_nodes[&env_thread_node_hash].group); - if self.filtered_order.contains_key(&thread) { - continue; - } - if self.all_threads.contains(&thread) { - self.filtered_selection.push(thread); - self.filtered_order - .insert(thread, self.filtered_selection.len() - 1); - } + let account = &context.accounts[self.cursor_pos.0]; + match results { + Ok(results) => { + let threads = &account.collection.threads[&self.cursor_pos.1]; + for env_hash in results { + if !account.collection.contains_key(&env_hash) { + continue; } - if !self.filtered_selection.is_empty() { - threads.group_inner_sort_by( - &mut self.filtered_selection, - self.sort, - &context.accounts[self.cursor_pos.0].collection.envelopes, - ); - self.new_cursor_pos.2 = - std::cmp::min(self.filtered_selection.len() - 1, self.cursor_pos.2); - } else { - let default_cell = { - let mut ret = Cell::with_char(' '); - ret.set_fg(self.color_cache.theme_default.fg) - .set_bg(self.color_cache.theme_default.bg) - .set_attrs(self.color_cache.theme_default.attrs); - ret - }; - self.content = CellBuffer::new_with_context(0, 0, default_cell, context); + let env_thread_node_hash = account.collection.get_env(env_hash).thread(); + if !threads.thread_nodes.contains_key(&env_thread_node_hash) { + continue; + } + let thread = + threads.find_group(threads.thread_nodes[&env_thread_node_hash].group); + if self.filtered_order.contains_key(&thread) { + continue; + } + if self.all_threads.contains(&thread) { + self.filtered_selection.push(thread); + self.filtered_order + .insert(thread, self.filtered_selection.len() - 1); } - self.redraw_threads_list( - context, - Box::new(self.filtered_selection.clone().into_iter()) - as Box>, - ); - */ } - Err(e) => { - self.cursor_pos.2 = 0; - self.new_cursor_pos.2 = 0; - let message = format!( - "Encountered an error while searching for `{}`: {}.", - self.filter_term, e - ); - log( - format!("Failed to search for term {}: {}", self.filter_term, e), - ERROR, + if !self.filtered_selection.is_empty() { + threads.group_inner_sort_by( + &mut self.filtered_selection, + self.sort, + &context.accounts[self.cursor_pos.0].collection.envelopes, ); + self.new_cursor_pos.2 = + std::cmp::min(self.filtered_selection.len() - 1, self.cursor_pos.2); + } else { let default_cell = { let mut ret = Cell::with_char(' '); ret.set_fg(self.color_cache.theme_default.fg) @@ -776,20 +753,45 @@ impl ListingTrait for ConversationsListing { .set_attrs(self.color_cache.theme_default.attrs); ret }; - self.content = - CellBuffer::new_with_context(message.len(), 1, default_cell, context); - write_string_to_grid( - &message, - &mut self.content, - self.color_cache.theme_default.fg, - self.color_cache.theme_default.bg, - self.color_cache.theme_default.attrs, - ((0, 0), (message.len() - 1, 0)), - None, - ); + self.content = CellBuffer::new_with_context(0, 0, default_cell, context); } + self.redraw_threads_list( + context, + Box::new(self.filtered_selection.clone().into_iter()) + as Box>, + ); } - */ + Err(e) => { + self.cursor_pos.2 = 0; + self.new_cursor_pos.2 = 0; + let message = format!( + "Encountered an error while searching for `{}`: {}.", + self.filter_term, e + ); + log( + format!("Failed to search for term {}: {}", self.filter_term, e), + ERROR, + ); + let default_cell = { + let mut ret = Cell::with_char(' '); + ret.set_fg(self.color_cache.theme_default.fg) + .set_bg(self.color_cache.theme_default.bg) + .set_attrs(self.color_cache.theme_default.attrs); + ret + }; + self.content = + CellBuffer::new_with_context(message.len(), 1, default_cell, context); + write_string_to_grid( + &message, + &mut self.content, + self.color_cache.theme_default.fg, + self.color_cache.theme_default.bg, + self.color_cache.theme_default.attrs, + ((0, 0), (message.len() - 1, 0)), + None, + ); + } + } } fn set_movement(&mut self, mvm: PageMovement) { @@ -815,6 +817,7 @@ impl ConversationsListing { subsort: (SortField::Date, SortOrder::Desc), order: HashMap::default(), all_threads: HashSet::default(), + search_job: None, filter_term: String::new(), filtered_selection: Vec::new(), filtered_order: HashMap::default(), @@ -1345,8 +1348,29 @@ impl Component for ConversationsListing { } UIEvent::Action(ref action) => match action { Action::Listing(Search(ref filter_term)) if !self.unfocused => { - //self.filter(filter_term, context); - self.dirty = true; + match context.accounts[self.cursor_pos.0].search( + filter_term, + self.sort, + self.cursor_pos.1, + ) { + Ok(job) => { + let (chan, job_id) = context.accounts[self.cursor_pos.0] + .job_executor + .spawn_specialized(job); + context.accounts[self.cursor_pos.0] + .active_jobs + .insert(job_id.clone(), crate::conf::accounts::JobRequest::Search); + self.search_job = Some((filter_term.to_string(), chan, job_id)); + } + Err(err) => { + context.replies.push_back(UIEvent::Notification( + Some("Could not perform search".to_string()), + err.to_string(), + Some(crate::types::NotificationType::ERROR), + )); + } + }; + self.set_dirty(true); return true; } _ => {} @@ -1370,6 +1394,18 @@ impl Component for ConversationsListing { self.set_dirty(true); return true; } + UIEvent::StatusEvent(StatusEvent::JobFinished(ref job_id)) + if self + .search_job + .as_ref() + .map(|(_, _, j)| j == job_id) + .unwrap_or(false) => + { + let (filter_term, mut rcvr, _job_id) = self.search_job.take().unwrap(); + let results = rcvr.try_recv().unwrap().unwrap(); + self.filter(filter_term, results, context); + self.set_dirty(true); + } _ => {} } diff --git a/src/components/mail/listing/plain.rs b/src/components/mail/listing/plain.rs index 98e5ddcc..76a21774 100644 --- a/src/components/mail/listing/plain.rs +++ b/src/components/mail/listing/plain.rs @@ -59,6 +59,11 @@ pub struct PlainListing { /// Cache current view. data_columns: DataColumns, + search_job: Option<( + String, + oneshot::Receiver>>, + JobId, + )>, filter_term: String, filtered_selection: Vec, filtered_order: HashMap, @@ -587,71 +592,41 @@ impl ListingTrait for PlainListing { results: Result>, context: &Context, ) { - /* - if filter_term.is_empty() { - return; - } + if filter_term.is_empty() { + return; + } - self.order.clear(); - self.selection.clear(); - self.length = 0; - self.filtered_selection.clear(); - self.filtered_order.clear(); - self.filter_term = filter_term.to_string(); - self.row_updates.clear(); - for v in self.selection.values_mut() { - *v = false; - } + self.order.clear(); + self.selection.clear(); + self.length = 0; + self.filtered_selection.clear(); + self.filtered_order.clear(); + self.filter_term = filter_term.to_string(); + self.row_updates.clear(); + for v in self.selection.values_mut() { + *v = false; + } - let account = &context.accounts[self.cursor_pos.0]; - match account.search(&self.filter_term, self.sort, self.cursor_pos.1) { - Ok(results) => { - /* - for env_hash in results { - if !account.collection.contains_key(&env_hash) { - continue; - } - if self.filtered_order.contains_key(&env_hash) { - continue; - } - if self.all_envelopes.contains(&env_hash) { - self.filtered_selection.push(env_hash); - self.filtered_order - .insert(env_hash, self.filtered_selection.len() - 1); - } + let account = &context.accounts[self.cursor_pos.0]; + match results { + Ok(results) => { + for env_hash in results { + if !account.collection.contains_key(&env_hash) { + continue; } - if !self.filtered_selection.is_empty() { - self.new_cursor_pos.2 = - std::cmp::min(self.filtered_selection.len() - 1, self.cursor_pos.2); - } else { - let default_cell = { - let mut ret = Cell::with_char(' '); - ret.set_fg(self.color_cache.theme_default.fg) - .set_bg(self.color_cache.theme_default.bg) - .set_attrs(self.color_cache.theme_default.attrs); - ret - }; - self.data_columns.columns[0] = - CellBuffer::new_with_context(0, 0, default_cell, context); + if self.filtered_order.contains_key(&env_hash) { + continue; + } + if self.all_envelopes.contains(&env_hash) { + self.filtered_selection.push(env_hash); + self.filtered_order + .insert(env_hash, self.filtered_selection.len() - 1); } - self.redraw_list( - context, - Box::new(self.filtered_selection.clone().into_iter()) - as Box>, - ); - */ } - Err(e) => { - self.cursor_pos.2 = 0; - self.new_cursor_pos.2 = 0; - let message = format!( - "Encountered an error while searching for `{}`: {}.", - &self.filter_term, e - ); - log( - format!("Failed to search for term {}: {}", &self.filter_term, e), - ERROR, - ); + if !self.filtered_selection.is_empty() { + self.new_cursor_pos.2 = + std::cmp::min(self.filtered_selection.len() - 1, self.cursor_pos.2); + } else { let default_cell = { let mut ret = Cell::with_char(' '); ret.set_fg(self.color_cache.theme_default.fg) @@ -660,19 +635,45 @@ impl ListingTrait for PlainListing { ret }; self.data_columns.columns[0] = - CellBuffer::new_with_context(message.len(), 1, default_cell, context); - write_string_to_grid( - &message, - &mut self.data_columns.columns[0], - self.color_cache.theme_default.fg, - self.color_cache.theme_default.bg, - self.color_cache.theme_default.attrs, - ((0, 0), (message.len() - 1, 0)), - None, - ); + CellBuffer::new_with_context(0, 0, default_cell, context); } + self.redraw_list( + context, + Box::new(self.filtered_selection.clone().into_iter()) + as Box>, + ); } - */ + Err(e) => { + self.cursor_pos.2 = 0; + self.new_cursor_pos.2 = 0; + let message = format!( + "Encountered an error while searching for `{}`: {}.", + &self.filter_term, e + ); + log( + format!("Failed to search for term {}: {}", &self.filter_term, e), + ERROR, + ); + let default_cell = { + let mut ret = Cell::with_char(' '); + ret.set_fg(self.color_cache.theme_default.fg) + .set_bg(self.color_cache.theme_default.bg) + .set_attrs(self.color_cache.theme_default.attrs); + ret + }; + self.data_columns.columns[0] = + CellBuffer::new_with_context(message.len(), 1, default_cell, context); + write_string_to_grid( + &message, + &mut self.data_columns.columns[0], + self.color_cache.theme_default.fg, + self.color_cache.theme_default.bg, + self.color_cache.theme_default.attrs, + ((0, 0), (message.len() - 1, 0)), + None, + ); + } + } } fn set_movement(&mut self, mvm: PageMovement) { @@ -701,6 +702,7 @@ impl PlainListing { thread_node_hashes: HashMap::default(), order: HashMap::default(), filter_term: String::new(), + search_job: None, filtered_selection: Vec::new(), filtered_order: HashMap::default(), selection: HashMap::default(), @@ -1276,8 +1278,41 @@ impl Component for PlainListing { return true; } UIEvent::Action(Action::Listing(Search(ref filter_term))) if !self.unfocused => { - //self.filter(filter_term, context); - self.dirty = true; + match context.accounts[self.cursor_pos.0].search( + filter_term, + self.sort, + self.cursor_pos.1, + ) { + Ok(job) => { + let (chan, job_id) = context.accounts[self.cursor_pos.0] + .job_executor + .spawn_specialized(job); + context.accounts[self.cursor_pos.0] + .active_jobs + .insert(job_id.clone(), crate::conf::accounts::JobRequest::Search); + self.search_job = Some((filter_term.to_string(), chan, job_id)); + } + Err(err) => { + context.replies.push_back(UIEvent::Notification( + Some("Could not perform search".to_string()), + err.to_string(), + Some(crate::types::NotificationType::ERROR), + )); + } + }; + self.set_dirty(true); + } + UIEvent::StatusEvent(StatusEvent::JobFinished(ref job_id)) + if self + .search_job + .as_ref() + .map(|(_, _, j)| j == job_id) + .unwrap_or(false) => + { + let (filter_term, mut rcvr, _job_id) = self.search_job.take().unwrap(); + let results = rcvr.try_recv().unwrap().unwrap(); + self.filter(filter_term, results, context); + self.set_dirty(true); } _ => {} } diff --git a/src/components/mail/view.rs b/src/components/mail/view.rs index 77dd48d9..81aab19f 100644 --- a/src/components/mail/view.rs +++ b/src/components/mail/view.rs @@ -193,11 +193,15 @@ impl MailView { .and_then(|mut op| op.as_bytes()) { Ok(fut) => { - let (chan, job_id) = account.job_executor.spawn_specialized(fut); + let (mut chan, job_id) = account.job_executor.spawn_specialized(fut); debug!(&job_id); self.active_jobs.insert(job_id.clone()); account.active_jobs.insert(job_id, JobRequest::AsBytes); - self.state = MailViewState::LoadingBody { job_id, chan }; + if let Ok(Some(bytes_result)) = try_recv_timeout!(&mut chan) { + self.state = MailViewState::Loaded { body: bytes_result }; + } else { + self.state = MailViewState::LoadingBody { job_id, chan }; + } } Err(err) => { context.replies.push_back(UIEvent::StatusEvent( diff --git a/src/conf/accounts.rs b/src/conf/accounts.rs index 3222bc7d..f925232d 100644 --- a/src/conf/accounts.rs +++ b/src/conf/accounts.rs @@ -55,6 +55,22 @@ use std::pin::Pin; use std::result; use std::sync::{Arc, RwLock}; +#[macro_export] +macro_rules! try_recv_timeout { + ($oneshot:expr) => {{ + const _3_MS: std::time::Duration = std::time::Duration::from_millis(95); + let now = std::time::Instant::now(); + let mut res = Ok(None); + while now + _3_MS >= std::time::Instant::now() { + res = $oneshot.try_recv().map_err(|_| MeliError::new("canceled")); + if res.as_ref().map(|r| r.is_some()).unwrap_or(false) || res.is_err() { + break; + } + } + res + }}; +} + pub type Worker = Option>>>; #[derive(Debug)] @@ -151,6 +167,7 @@ pub enum JobRequest { Refresh(MailboxHash, oneshot::Receiver>), SetFlags(EnvelopeHash, oneshot::Receiver>), SaveMessage(MailboxHash, oneshot::Receiver>), + CopyTo(MailboxHash, oneshot::Receiver>>), DeleteMessage(EnvelopeHash, oneshot::Receiver>), CreateMailbox(oneshot::Receiver)>>), DeleteMailbox(oneshot::Receiver>>), @@ -171,6 +188,7 @@ impl core::fmt::Debug for JobRequest { JobRequest::Refresh(_, _) => write!(f, "{}", "JobRequest::Refresh"), JobRequest::SetFlags(_, _) => write!(f, "{}", "JobRequest::SetFlags"), JobRequest::SaveMessage(_, _) => write!(f, "{}", "JobRequest::SaveMessage"), + JobRequest::CopyTo(_, _) => write!(f, "{}", "JobRequest::CopyTo"), JobRequest::DeleteMessage(_, _) => write!(f, "{}", "JobRequest::DeleteMessage"), JobRequest::CreateMailbox(_) => write!(f, "{}", "JobRequest::CreateMailbox"), JobRequest::DeleteMailbox(_) => write!(f, "{}", "JobRequest::DeleteMailbox"), @@ -189,6 +207,13 @@ impl core::fmt::Debug for JobRequest { } impl JobRequest { + fn is_watch(&self) -> bool { + match self { + JobRequest::Watch(_) => true, + _ => false, + } + } + fn is_get(&self, mailbox_hash: MailboxHash) -> bool { match self { JobRequest::Get(h, _) if *h == mailbox_hash => true, @@ -316,10 +341,6 @@ impl Account { active_jobs.insert(job_id, JobRequest::Mailboxes(rcvr)); } } - if let Ok(online_job) = backend.is_online_async() { - let (rcvr, job_id) = job_executor.spawn_specialized(online_job); - active_jobs.insert(job_id, JobRequest::IsOnline(rcvr)); - } } Ok(Account { @@ -827,7 +848,7 @@ impl Account { } Ok(()) } - pub fn watch(&self) { + pub fn watch(&mut self) { if self.settings.account().manual_refresh { return; } @@ -836,27 +857,45 @@ impl Account { let r = RefreshEventConsumer::new(Box::new(move |r| { sender_.send(ThreadEvent::from(r)).unwrap(); })); - match self - .backend - .read() - .unwrap() - .watch(r, self.work_context.clone()) - { - Ok(id) => { - self.sender - .send(ThreadEvent::NewThread( - id, - format!("watching {}", self.name()).into(), - )) - .unwrap(); + if self.settings.conf.is_async { + if !self.active_jobs.values().any(|j| j.is_watch()) { + match self.backend.read().unwrap().watch_async(r) { + Ok(fut) => { + let (handle, job_id) = self.job_executor.spawn(fut); + self.active_jobs.insert(job_id, JobRequest::Watch(handle)); + } + Err(e) => { + self.sender + .send(ThreadEvent::UIEvent(UIEvent::StatusEvent( + StatusEvent::DisplayMessage(e.to_string()), + ))) + .unwrap(); + } + } } + } else { + match self + .backend + .read() + .unwrap() + .watch(r, self.work_context.clone()) + { + Ok(id) => { + self.sender + .send(ThreadEvent::NewThread( + id, + format!("watching {}", self.name()).into(), + )) + .unwrap(); + } - Err(e) => { - self.sender - .send(ThreadEvent::UIEvent(UIEvent::StatusEvent( - StatusEvent::DisplayMessage(e.to_string()), - ))) - .unwrap(); + Err(e) => { + self.sender + .send(ThreadEvent::UIEvent(UIEvent::StatusEvent( + StatusEvent::DisplayMessage(e.to_string()), + ))) + .unwrap(); + } } } } @@ -911,21 +950,35 @@ impl Account { MailboxStatus::None => { if self.settings.conf.is_async { if !self.active_jobs.values().any(|j| j.is_get(mailbox_hash)) { - if let Ok(mailbox_job) = - self.backend.write().unwrap().get_async( - &&self.mailbox_entries[&mailbox_hash].ref_mailbox, - ) - { - let mailbox_job = mailbox_job.into_future(); - let (rcvr, job_id) = - self.job_executor.spawn_specialized(mailbox_job); - self.sender - .send(ThreadEvent::UIEvent(UIEvent::StatusEvent( - StatusEvent::NewJob(job_id.clone()), - ))) - .unwrap(); - self.active_jobs - .insert(job_id, JobRequest::Get(mailbox_hash, rcvr)); + match self.backend.write().unwrap().get_async( + &&self.mailbox_entries[&mailbox_hash].ref_mailbox, + ) { + Ok(mailbox_job) => { + let mailbox_job = mailbox_job.into_future(); + let (rcvr, job_id) = + self.job_executor.spawn_specialized(mailbox_job); + self.sender + .send(ThreadEvent::UIEvent(UIEvent::StatusEvent( + StatusEvent::NewJob(job_id.clone()), + ))) + .unwrap(); + self.active_jobs.insert( + job_id, + JobRequest::Get(mailbox_hash, rcvr), + ); + } + Err(err) => { + self.mailbox_entries.entry(mailbox_hash).and_modify( + |entry| { + entry.status = MailboxStatus::Failed(err); + }, + ); + self.sender + .send(ThreadEvent::UIEvent(UIEvent::StartupCheck( + mailbox_hash, + ))) + .unwrap(); + } } } } else if self.mailbox_entries[&mailbox_hash].worker.is_none() { @@ -945,7 +998,7 @@ impl Account { Err(0) } _ => Err(0), - } + }; } Some(ref mut w) => match debug!(w.poll()) { Ok(AsyncStatus::NoUpdate) => { @@ -1105,27 +1158,18 @@ impl Account { Ok(()) } - pub fn delete(&mut self, env_hash: EnvelopeHash, mailbox_hash: MailboxHash) -> Result<()> { + pub fn delete( + &mut self, + env_hash: EnvelopeHash, + mailbox_hash: MailboxHash, + ) -> ResultFuture<()> { if self.settings.account.read_only() { return Err(MeliError::new(format!( "Account {} is read-only.", self.name.as_str() ))); } - let job = self - .backend - .write() - .unwrap() - .delete(env_hash, mailbox_hash)?; - let (rcvr, job_id) = self.job_executor.spawn_specialized(job); - self.sender - .send(ThreadEvent::UIEvent(UIEvent::StatusEvent( - StatusEvent::NewJob(job_id.clone()), - ))) - .unwrap(); - self.active_jobs - .insert(job_id, JobRequest::DeleteMessage(env_hash, rcvr)); - Ok(()) + self.backend.write().unwrap().delete(env_hash, mailbox_hash) } pub fn contains_key(&self, h: EnvelopeHash) -> bool { @@ -1343,7 +1387,7 @@ impl Account { pub fn search( &self, search_term: &str, - sort: (SortField, SortOrder), + _sort: (SortField, SortOrder), mailbox_hash: MailboxHash, ) -> ResultFuture> { use melib::parsec::Parser; @@ -1545,6 +1589,22 @@ impl Account { .expect("Could not send event on main channel"); } } + JobRequest::CopyTo(mailbox_hash, mut chan) => { + if let Err(err) = chan + .try_recv() + .unwrap() + .unwrap() + .and_then(|bytes| self.save(&bytes, mailbox_hash, None)) + { + self.sender + .send(ThreadEvent::UIEvent(UIEvent::Notification( + Some(format!("{}: could not save message", &self.name)), + err.to_string(), + Some(crate::types::NotificationType::ERROR), + ))) + .expect("Could not send event on main channel"); + } + } JobRequest::DeleteMessage(_, mut chan) => { let r = chan.try_recv().unwrap(); if let Some(Err(err)) = r { @@ -1668,7 +1728,9 @@ impl Account { None => {} } } - JobRequest::Watch(_) => {} + JobRequest::Watch(_) => { + debug!("JobRequest::Watch finished??? "); + } } true } else { diff --git a/src/jobs.rs b/src/jobs.rs index 152202db..3f3a8d49 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -142,7 +142,7 @@ impl JobExecutor { let MeliTask { task, id } = meli_task; debug!("Worker {} got task {:?}", i, id); let _ = catch_unwind(|| task.run()); - debug!("Worker {} got result {:?}", i, id); + debug!("Worker {} returned after {:?}", i, id); } }) .unwrap();