diff --git a/melib/src/backends/imap.rs b/melib/src/backends/imap.rs index 4e5cac23..46fd2ecd 100644 --- a/melib/src/backends/imap.rs +++ b/melib/src/backends/imap.rs @@ -33,6 +33,7 @@ pub use connection::*; mod watch; pub use watch::*; pub mod managesieve; +mod untagged; use crate::async_workers::{Async, AsyncBuilder, AsyncStatus, WorkContext}; use crate::backends::BackendOp; @@ -126,6 +127,12 @@ pub struct UIDStore { uid_index: Arc>>, byte_cache: Arc>>, + tag_index: Arc>>, + + mailboxes: Arc>>, + is_online: Arc)>>, + refresh_events: Arc>>, + sender: Arc>>, } impl Default for UIDStore { @@ -136,6 +143,14 @@ impl Default for UIDStore { hash_index: Default::default(), uid_index: Default::default(), byte_cache: Default::default(), + mailboxes: Arc::new(RwLock::new(Default::default())), + tag_index: Arc::new(RwLock::new(Default::default())), + is_online: Arc::new(Mutex::new(( + Instant::now(), + Err(MeliError::new("Account is uninitialised.")), + ))), + refresh_events: Default::default(), + sender: Arc::new(RwLock::new(None)), } } } @@ -143,21 +158,21 @@ impl Default for UIDStore { #[derive(Debug)] pub struct ImapType { account_name: String, - online: Arc)>>, is_subscribed: Arc, connection: Arc>, server_conf: ImapServerConf, uid_store: Arc, can_create_flags: Arc>, - tag_index: Arc>>, - - mailboxes: Arc>>, } #[inline(always)] -pub(self) fn try_lock(connection: &Arc>) -> Result> { +pub(self) fn try_lock( + connection: &Arc>, + dur: Option, +) -> Result> { let now = Instant::now(); - while Instant::now().duration_since(now) <= std::time::Duration::new(2, 0) { + while Instant::now().duration_since(now) <= dur.unwrap_or(std::time::Duration::from_millis(150)) + { if let Ok(guard) = connection.try_lock() { return Ok(guard); } @@ -167,18 +182,18 @@ pub(self) fn try_lock(connection: &Arc>) -> Result Result<()> { - if let Ok(mut g) = try_lock(&self.connection) { + if let Ok(mut g) = try_lock(&self.connection, None) { let _ = g.connect(); } - try_lock(&self.online)?.1.clone() + try_lock(&self.uid_store.is_online, None)?.1.clone() } fn connect(&mut self) { if self.is_online().is_err() { - if Instant::now().duration_since(self.online.lock().unwrap().0) + if Instant::now().duration_since(self.uid_store.is_online.lock().unwrap().0) >= std::time::Duration::new(2, 0) { - if let Ok(mut g) = try_lock(&self.connection) { + if let Ok(mut g) = try_lock(&self.connection, None) { let _ = g.connect(); } } @@ -190,11 +205,10 @@ impl MailBackend for ImapType { let handle = { let tx = w.tx(); let uid_store = self.uid_store.clone(); - let tag_index = self.tag_index.clone(); let can_create_flags = self.can_create_flags.clone(); let mailbox_hash = mailbox.hash(); let (permissions, mailbox_path, mailbox_exists, no_select, unseen) = { - let f = &self.mailboxes.read().unwrap()[&mailbox_hash]; + let f = &self.uid_store.mailboxes.read().unwrap()[&mailbox_hash]; ( f.permissions.clone(), f.imap_path().to_string(), @@ -214,13 +228,12 @@ impl MailBackend for ImapType { if let Err(err) = (move || { let tx = _tx; let mut response = String::with_capacity(8 * 1024); - let mut conn = try_lock(&connection)?; + let mut conn = connection.lock()?; debug!("locked for get {}", mailbox_path); /* first SELECT the mailbox to get READ/WRITE permissions (because EXAMINE only * returns READ-ONLY for both cases) */ - conn.send_command(format!("SELECT \"{}\"", mailbox_path).as_bytes())?; - conn.read_response(&mut response)?; + conn.select_mailbox(mailbox_hash, &mut response)?; let examine_response = protocol_parser::select_response(&response)?; *can_create_flags.lock().unwrap() = examine_response.can_create_flags; debug!( @@ -247,22 +260,27 @@ impl MailBackend for ImapType { *mailbox_exists = exists; } /* reselecting the same mailbox with EXAMINE prevents expunging it */ - conn.send_command(format!("EXAMINE \"{}\"", mailbox_path).as_bytes())?; - conn.read_response(&mut response)?; + conn.examine_mailbox(mailbox_hash, &mut response)?; - let mut tag_lck = tag_index.write().unwrap(); + let mut tag_lck = uid_store.tag_index.write().unwrap(); let mut our_unseen = 0; - while exists > 1 { + while exists > 0 { let mut envelopes = vec![]; - conn.send_command( - format!( - "UID FETCH {}:{} (UID FLAGS ENVELOPE BODYSTRUCTURE)", - std::cmp::max(exists.saturating_sub(500), 1), - exists - ) - .as_bytes(), - )?; - conn.read_response(&mut response)?; + debug!("{} exists= {}", mailbox_hash, exists); + if exists == 1 { + debug!("UID FETCH 1 (UID FLAGS ENVELOPE BODYSTRUCTURE)"); + conn.send_command(b"UID FETCH 1 (UID FLAGS ENVELOPE BODYSTRUCTURE)")?; + } else { + conn.send_command( + debug!(format!( + "UID FETCH {}:{} (UID FLAGS ENVELOPE BODYSTRUCTURE)", + std::cmp::max(exists.saturating_sub(500), 1), + exists + )) + .as_bytes(), + )? + }; + conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)?; debug!( "fetch response is {} bytes and {} lines", response.len(), @@ -304,14 +322,18 @@ impl MailBackend for ImapType { envelopes.push(env); } exists = std::cmp::max(exists.saturating_sub(500), 1); - debug!("sending payload"); + debug!("sending payload for {}", mailbox_hash); *unseen.lock().unwrap() = our_unseen; tx.send(AsyncStatus::Payload(Ok(envelopes))).unwrap(); + if exists == 1 { + break; + } } drop(conn); Ok(()) })() { + debug!("sending error payload for {}: {:?}", mailbox_hash, &err); tx.send(AsyncStatus::Payload(Err(err))).unwrap(); } tx.send(AsyncStatus::Finished).unwrap(); @@ -327,14 +349,15 @@ impl MailBackend for ImapType { sender: RefreshEventConsumer, ) -> Result> { let inbox = self + .uid_store .mailboxes .read() .unwrap() .get(&mailbox_hash) .map(std::clone::Clone::clone) .unwrap(); - let tag_index = self.tag_index.clone(); let main_conn = self.connection.clone(); + *self.uid_store.sender.write().unwrap() = Some(sender); let uid_store = self.uid_store.clone(); let account_name = self.account_name.clone(); let account_hash = { @@ -345,14 +368,20 @@ impl MailBackend for ImapType { let w = AsyncBuilder::new(); let closure = move |work_context: WorkContext| { let thread = std::thread::current(); - let mut conn = match try_lock(&main_conn) { + let mut conn = match try_lock(&main_conn, Some(std::time::Duration::new(2, 0))) { Ok(conn) => conn, Err(err) => { - sender.send(RefreshEvent { - account_hash, - mailbox_hash, - kind: RefreshEventKind::Failure(err.clone()), - }); + uid_store + .sender + .read() + .unwrap() + .as_ref() + .unwrap() + .send(RefreshEvent { + account_hash, + mailbox_hash, + kind: RefreshEventKind::Failure(err.clone()), + }); return; } @@ -369,17 +398,9 @@ impl MailBackend for ImapType { .set_status .send((thread.id(), "refresh".to_string())) .unwrap(); - watch::examine_updates( - account_hash, - &inbox, - &sender, - &mut conn, - &uid_store, - &work_context, - &tag_index, - ) - .ok() - .take(); + watch::examine_updates(account_hash, &inbox, &mut conn, &uid_store, &work_context) + .ok() + .take(); }; Ok(w.build(Box::new(closure))) } @@ -389,11 +410,9 @@ impl MailBackend for ImapType { sender: RefreshEventConsumer, work_context: WorkContext, ) -> Result { - let mailboxes = self.mailboxes.clone(); - let tag_index = self.tag_index.clone(); - let conn = ImapConnection::new_connection(&self.server_conf, self.online.clone()); + let conn = ImapConnection::new_connection(&self.server_conf, self.uid_store.clone()); let main_conn = self.connection.clone(); - let is_online = self.online.clone(); + *self.uid_store.sender.write().unwrap() = Some(sender); let uid_store = self.uid_store.clone(); let account_hash = { let mut hasher = DefaultHasher::new(); @@ -416,13 +435,9 @@ impl MailBackend for ImapType { .any(|cap| cap.eq_ignore_ascii_case(b"IDLE")); let kit = ImapWatchKit { conn, - is_online, main_conn, uid_store, - mailboxes, - sender, work_context, - tag_index, account_hash, }; if has_idle { @@ -436,7 +451,7 @@ impl MailBackend for ImapType { fn mailboxes(&self) -> Result> { { - let mailboxes = self.mailboxes.read().unwrap(); + let mailboxes = self.uid_store.mailboxes.read().unwrap(); if !mailboxes.is_empty() { return Ok(mailboxes .iter() @@ -444,8 +459,9 @@ impl MailBackend for ImapType { .collect()); } } - let mut mailboxes = self.mailboxes.write()?; - *mailboxes = ImapType::imap_mailboxes(&self.connection)?; + let new_mailboxes = ImapType::imap_mailboxes(&self.connection)?; + let mut mailboxes = self.uid_store.mailboxes.write()?; + *mailboxes = new_mailboxes; mailboxes.retain(|_, f| (self.is_subscribed)(f.path())); let keys = mailboxes.keys().cloned().collect::>(); let mut uid_lock = self.uid_store.uidvalidity.lock().unwrap(); @@ -465,18 +481,18 @@ impl MailBackend for ImapType { let (uid, mailbox_hash) = self.uid_store.hash_index.lock().unwrap()[&hash]; Box::new(ImapOp::new( uid, - self.mailboxes.read().unwrap()[&mailbox_hash] + self.uid_store.mailboxes.read().unwrap()[&mailbox_hash] .imap_path() .to_string(), + mailbox_hash, self.connection.clone(), self.uid_store.clone(), - self.tag_index.clone(), )) } fn save(&self, bytes: &[u8], mailbox: &str, flags: Option) -> Result<()> { let path = { - let mailboxes = self.mailboxes.read().unwrap(); + let mailboxes = self.uid_store.mailboxes.read().unwrap(); let f_result = mailboxes .values() @@ -499,7 +515,7 @@ impl MailBackend for ImapType { )))? }; let mut response = String::with_capacity(8 * 1024); - let mut conn = try_lock(&self.connection)?; + let mut conn = try_lock(&self.connection, Some(std::time::Duration::new(5, 0)))?; let flags = flags.unwrap_or(Flag::empty()); conn.send_command( format!( @@ -513,7 +529,7 @@ impl MailBackend for ImapType { // wait for "+ Ready for literal data" reply conn.wait_for_continuation_request()?; conn.send_literal(bytes)?; - conn.read_response(&mut response)?; + conn.read_response(&mut response, RequiredResponses::empty())?; Ok(()) } @@ -527,7 +543,7 @@ impl MailBackend for ImapType { fn tags(&self) -> Option>>> { if *self.can_create_flags.lock().unwrap() { - Some(self.tag_index.clone()) + Some(self.uid_store.tag_index.clone()) } else { None } @@ -551,7 +567,7 @@ impl MailBackend for ImapType { * decision is unpleasant for you. */ - let mut mailboxes = self.mailboxes.write().unwrap(); + let mut mailboxes = self.uid_store.mailboxes.write().unwrap(); for root_mailbox in mailboxes.values().filter(|f| f.parent.is_none()) { if path.starts_with(&root_mailbox.name) { debug!("path starts with {:?}", &root_mailbox); @@ -572,12 +588,12 @@ impl MailBackend for ImapType { let mut response = String::with_capacity(8 * 1024); { - let mut conn_lck = try_lock(&self.connection)?; + let mut conn_lck = try_lock(&self.connection, None)?; conn_lck.send_command(format!("CREATE \"{}\"", path,).as_bytes())?; - conn_lck.read_response(&mut response)?; + conn_lck.read_response(&mut response, RequiredResponses::empty())?; conn_lck.send_command(format!("SUBSCRIBE \"{}\"", path,).as_bytes())?; - conn_lck.read_response(&mut response)?; + conn_lck.read_response(&mut response, RequiredResponses::empty())?; } let ret: Result<()> = ImapResponse::from(&response).into(); ret?; @@ -591,42 +607,25 @@ impl MailBackend for ImapType { &mut self, mailbox_hash: MailboxHash, ) -> Result> { - let mut mailboxes = self.mailboxes.write().unwrap(); + let mailboxes = self.uid_store.mailboxes.read().unwrap(); let permissions = mailboxes[&mailbox_hash].permissions(); if !permissions.delete_mailbox { return Err(MeliError::new(format!("You do not have permission to delete `{}`. Set permissions for this mailbox are {}", mailboxes[&mailbox_hash].name(), permissions))); } let mut response = String::with_capacity(8 * 1024); { - let mut conn_lck = try_lock(&self.connection)?; - if !mailboxes[&mailbox_hash].no_select { + let mut conn_lck = try_lock(&self.connection, None)?; + if !mailboxes[&mailbox_hash].no_select && conn_lck.current_mailbox == Some(mailbox_hash) + { /* make sure mailbox is not selected before it gets deleted, otherwise * connection gets dropped by server */ - if conn_lck - .capabilities - .iter() - .any(|cap| cap.eq_ignore_ascii_case(b"UNSELECT")) - { - conn_lck.send_command( - format!("UNSELECT \"{}\"", mailboxes[&mailbox_hash].imap_path()).as_bytes(), - )?; - conn_lck.read_response(&mut response)?; - } else { - conn_lck.send_command( - format!("SELECT \"{}\"", mailboxes[&mailbox_hash].imap_path()).as_bytes(), - )?; - conn_lck.read_response(&mut response)?; - conn_lck.send_command( - format!("EXAMINE \"{}\"", mailboxes[&mailbox_hash].imap_path()).as_bytes(), - )?; - conn_lck.read_response(&mut response)?; - } + conn_lck.unselect()?; } if mailboxes[&mailbox_hash].is_subscribed() { conn_lck.send_command( format!("UNSUBSCRIBE \"{}\"", mailboxes[&mailbox_hash].imap_path()).as_bytes(), )?; - conn_lck.read_response(&mut response)?; + conn_lck.read_response(&mut response, RequiredResponses::empty())?; } conn_lck.send_command( @@ -636,24 +635,25 @@ impl MailBackend for ImapType { )) .as_bytes(), )?; - conn_lck.read_response(&mut response)?; + conn_lck.read_response(&mut response, RequiredResponses::empty())?; } let ret: Result<()> = ImapResponse::from(&response).into(); ret?; + let mut mailboxes = self.uid_store.mailboxes.write().unwrap(); mailboxes.clear(); drop(mailboxes); self.mailboxes().map_err(|err| format!("Mailbox delete was succesful (returned `{}`) but listing mailboxes afterwards returned `{}`", response, err).into()) } fn set_mailbox_subscription(&mut self, mailbox_hash: MailboxHash, new_val: bool) -> Result<()> { - let mut mailboxes = self.mailboxes.write().unwrap(); + let mut mailboxes = self.uid_store.mailboxes.write().unwrap(); if mailboxes[&mailbox_hash].is_subscribed() == new_val { return Ok(()); } let mut response = String::with_capacity(8 * 1024); { - let mut conn_lck = try_lock(&self.connection)?; + let mut conn_lck = try_lock(&self.connection, None)?; if new_val { conn_lck.send_command( format!("SUBSCRIBE \"{}\"", mailboxes[&mailbox_hash].imap_path()).as_bytes(), @@ -663,7 +663,7 @@ impl MailBackend for ImapType { format!("UNSUBSCRIBE \"{}\"", mailboxes[&mailbox_hash].imap_path()).as_bytes(), )?; } - conn_lck.read_response(&mut response)?; + conn_lck.read_response(&mut response, RequiredResponses::empty())?; } let ret: Result<()> = ImapResponse::from(&response).into(); @@ -680,7 +680,7 @@ impl MailBackend for ImapType { mailbox_hash: MailboxHash, mut new_path: String, ) -> Result { - let mut mailboxes = self.mailboxes.write().unwrap(); + let mut mailboxes = self.uid_store.mailboxes.write().unwrap(); let permissions = mailboxes[&mailbox_hash].permissions(); if !permissions.delete_mailbox { return Err(MeliError::new(format!("You do not have permission to rename mailbox `{}` (rename is equivalent to delete + create). Set permissions for this mailbox are {}", mailboxes[&mailbox_hash].name(), permissions))); @@ -693,7 +693,7 @@ impl MailBackend for ImapType { ); } { - let mut conn_lck = try_lock(&self.connection)?; + let mut conn_lck = try_lock(&self.connection, None)?; conn_lck.send_command( debug!(format!( "RENAME \"{}\" \"{}\"", @@ -702,7 +702,7 @@ impl MailBackend for ImapType { )) .as_bytes(), )?; - conn_lck.read_response(&mut response)?; + conn_lck.read_response(&mut response, RequiredResponses::empty())?; } let new_hash = get_path_hash!(new_path.as_str()); let ret: Result<()> = ImapResponse::from(&response).into(); @@ -711,7 +711,7 @@ impl MailBackend for ImapType { drop(mailboxes); self.mailboxes().map_err(|err| format!("Mailbox rename was succesful (returned `{}`) but listing mailboxes afterwards returned `{}`", response, err))?; Ok(BackendMailbox::clone( - &self.mailboxes.read().unwrap()[&new_hash], + &self.uid_store.mailboxes.read().unwrap()[&new_hash], )) } @@ -720,7 +720,7 @@ impl MailBackend for ImapType { mailbox_hash: MailboxHash, _val: crate::backends::MailboxPermissions, ) -> Result<()> { - let mailboxes = self.mailboxes.write().unwrap(); + let mailboxes = self.uid_store.mailboxes.write().unwrap(); let permissions = mailboxes[&mailbox_hash].permissions(); if !permissions.change_permissions { return Err(MeliError::new(format!("You do not have permission to change permissions for mailbox `{}`. Set permissions for this mailbox are {}", mailboxes[&mailbox_hash].name(), permissions))); @@ -829,15 +829,11 @@ impl MailBackend for ImapType { let mut query_str = String::new(); rec(&query, &mut query_str); - let mailboxes_lck = self.mailboxes.read()?; let mut response = String::with_capacity(8 * 1024); - let mut conn = try_lock(&self.connection)?; - conn.send_command( - format!("EXAMINE \"{}\"", mailboxes_lck[&mailbox_hash].imap_path()).as_bytes(), - )?; - conn.read_response(&mut response)?; + let mut conn = try_lock(&self.connection, Some(std::time::Duration::new(2, 0)))?; + conn.examine_mailbox(mailbox_hash, &mut response)?; conn.send_command(format!("UID SEARCH CHARSET UTF-8 {}", query_str).as_bytes())?; - conn.read_response(&mut response)?; + conn.read_response(&mut response, RequiredResponses::SEARCH)?; debug!(&response); let mut lines = response.lines(); @@ -901,11 +897,6 @@ impl ImapType { danger_accept_invalid_certs, protocol: ImapProtocol::IMAP, }; - let online = Arc::new(Mutex::new(( - Instant::now(), - Err(MeliError::new("Account is uninitialised.")), - ))); - let connection = ImapConnection::new_connection(&server_conf, online.clone()); let account_hash = { let mut hasher = DefaultHasher::new(); hasher.write(s.name.as_bytes()); @@ -915,27 +906,26 @@ impl ImapType { account_hash, ..UIDStore::default() }); + let connection = ImapConnection::new_connection(&server_conf, uid_store.clone()); Ok(Box::new(ImapType { account_name: s.name().to_string(), - online, server_conf, is_subscribed: Arc::new(IsSubscribedFn(is_subscribed)), can_create_flags: Arc::new(Mutex::new(false)), - tag_index: Arc::new(RwLock::new(Default::default())), - mailboxes: Arc::new(RwLock::new(Default::default())), connection: Arc::new(Mutex::new(connection)), uid_store, })) } pub fn shell(&mut self) { - let mut conn = ImapConnection::new_connection(&self.server_conf, self.online.clone()); + let mut conn = ImapConnection::new_connection(&self.server_conf, self.uid_store.clone()); conn.connect().unwrap(); let mut res = String::with_capacity(8 * 1024); conn.send_command(b"NOOP").unwrap(); - conn.read_response(&mut res).unwrap(); + conn.read_response(&mut res, RequiredResponses::empty()) + .unwrap(); let mut input = String::new(); loop { @@ -968,9 +958,9 @@ impl ImapType { ) -> Result> { let mut mailboxes: HashMap = Default::default(); let mut res = String::with_capacity(8 * 1024); - let mut conn = try_lock(&connection)?; + let mut conn = try_lock(&connection, Some(std::time::Duration::new(2, 0)))?; conn.send_command(b"LIST \"\" \"*\"")?; - conn.read_response(&mut res)?; + conn.read_response(&mut res, RequiredResponses::LIST_REQUIRED)?; debug!("out: {}", &res); let mut lines = res.lines(); /* Remove "M__ OK .." line */ @@ -1010,7 +1000,7 @@ impl ImapType { } mailboxes.retain(|_, v| v.hash != 0); conn.send_command(b"LSUB \"\" \"*\"")?; - conn.read_response(&mut res)?; + conn.read_response(&mut res, RequiredResponses::LSUB_REQUIRED)?; debug!("out: {}", &res); let mut lines = res.lines(); /* Remove "M__ OK .." line */ @@ -1033,7 +1023,7 @@ impl ImapType { } pub fn capabilities(&self) -> Vec { - try_lock(&self.connection) + try_lock(&self.connection, Some(std::time::Duration::new(2, 0))) .map(|c| { c.capabilities .iter() diff --git a/melib/src/backends/imap/connection.rs b/melib/src/backends/imap/connection.rs index 570abaf1..2fc7cc6d 100644 --- a/melib/src/backends/imap/connection.rs +++ b/melib/src/backends/imap/connection.rs @@ -19,7 +19,8 @@ * along with meli. If not, see . */ -use super::protocol_parser::{ImapLineSplit, ImapResponse}; +use super::protocol_parser::{ImapLineSplit, ImapResponse, RequiredResponses}; +use crate::backends::MailboxHash; use crate::email::parser::BytesExt; use crate::error::*; use std::io::Read; @@ -29,11 +30,11 @@ use native_tls::TlsConnector; use std::collections::HashSet; use std::iter::FromIterator; use std::net::SocketAddr; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::time::Instant; use super::protocol_parser; -use super::{Capabilities, ImapServerConf}; +use super::{Capabilities, ImapServerConf, UIDStore}; #[derive(Debug, Clone, Copy, PartialEq)] pub enum ImapProtocol { @@ -53,7 +54,8 @@ pub struct ImapConnection { pub stream: Result, pub server_conf: ImapServerConf, pub capabilities: Capabilities, - pub online: Arc)>>, + pub uid_store: Arc, + pub current_mailbox: Option, } impl Drop for ImapStream { @@ -390,18 +392,19 @@ impl ImapStream { impl ImapConnection { pub fn new_connection( server_conf: &ImapServerConf, - online: Arc)>>, + uid_store: Arc, ) -> ImapConnection { ImapConnection { stream: Err(MeliError::new("Offline".to_string())), server_conf: server_conf.clone(), capabilities: Capabilities::default(), - online, + uid_store, + current_mailbox: None, } } pub fn connect(&mut self) -> Result<()> { - if let (instant, ref mut status @ Ok(())) = *self.online.lock().unwrap() { + if let (instant, ref mut status @ Ok(())) = *self.uid_store.is_online.lock().unwrap() { if Instant::now().duration_since(instant) >= std::time::Duration::new(60 * 30, 0) { *status = Err(MeliError::new("Connection timed out")); self.stream = Err(MeliError::new("Connection timed out")); @@ -412,12 +415,12 @@ impl ImapConnection { } let new_stream = ImapStream::new_connection(&self.server_conf); if new_stream.is_err() { - *self.online.lock().unwrap() = ( + *self.uid_store.is_online.lock().unwrap() = ( Instant::now(), Err(new_stream.as_ref().unwrap_err().clone()), ); } else { - *self.online.lock().unwrap() = (Instant::now(), Ok(())); + *self.uid_store.is_online.lock().unwrap() = (Instant::now(), Ok(())); } let (capabilities, stream) = new_stream?; self.stream = Ok(stream); @@ -425,21 +428,43 @@ impl ImapConnection { Ok(()) } - pub fn read_response(&mut self, ret: &mut String) -> Result<()> { - self.try_send(|s| s.read_response(ret))?; + pub fn read_response( + &mut self, + ret: &mut String, + required_responses: RequiredResponses, + ) -> Result<()> { + let mut response = String::new(); + ret.clear(); + self.try_send(|s| s.read_response(&mut response))?; match self.server_conf.protocol { ImapProtocol::IMAP => { - let r: ImapResponse = ImapResponse::from(&ret); + let r: ImapResponse = ImapResponse::from(&response); if let ImapResponse::Bye(ref response_code) = r { self.stream = Err(MeliError::new(format!( "Offline: received BYE: {:?}", response_code ))); + ret.push_str(&response); + } else { + /*debug!( + "check every line for required_responses: {:#?}", + &required_responses + );*/ + for l in response.split_rn() { + /*debug!("check line: {}", &l);*/ + if required_responses.check(l) || !self.process_untagged(l)? { + ret.push_str(l); + } + } + //ret.push_str(&response); } r.into() } - ImapProtocol::ManageSieve => Ok(()), + ImapProtocol::ManageSieve => { + ret.push_str(&response); + Ok(()) + } } } @@ -471,7 +496,7 @@ impl ImapConnection { &mut self, mut action: impl FnMut(&mut ImapStream) -> Result<()>, ) -> Result<()> { - if let (instant, ref mut status @ Ok(())) = *self.online.lock().unwrap() { + if let (instant, ref mut status @ Ok(())) = *self.uid_store.is_online.lock().unwrap() { if Instant::now().duration_since(instant) >= std::time::Duration::new(60 * 30, 0) { *status = Err(MeliError::new("Connection timed out")); self.stream = Err(MeliError::new("Connection timed out")); @@ -484,18 +509,80 @@ impl ImapConnection { } let new_stream = ImapStream::new_connection(&self.server_conf); if new_stream.is_err() { - *self.online.lock().unwrap() = ( + *self.uid_store.is_online.lock().unwrap() = ( Instant::now(), Err(new_stream.as_ref().unwrap_err().clone()), ); } else { - *self.online.lock().unwrap() = (Instant::now(), Ok(())); + *self.uid_store.is_online.lock().unwrap() = (Instant::now(), Ok(())); } let (capabilities, stream) = new_stream?; self.stream = Ok(stream); self.capabilities = capabilities; Err(MeliError::new("Connection timed out")) } + + pub fn select_mailbox(&mut self, mailbox_hash: MailboxHash, ret: &mut String) -> Result<()> { + self.send_command( + format!( + "SELECT \"{}\"", + self.uid_store.mailboxes.read().unwrap()[&mailbox_hash].imap_path() + ) + .as_bytes(), + )?; + self.read_response(ret, RequiredResponses::SELECT_REQUIRED)?; + debug!("select response {}", ret); + self.current_mailbox = Some(mailbox_hash); + Ok(()) + } + + pub fn examine_mailbox(&mut self, mailbox_hash: MailboxHash, ret: &mut String) -> Result<()> { + self.send_command( + format!( + "EXAMINE \"{}\"", + self.uid_store.mailboxes.read().unwrap()[&mailbox_hash].imap_path() + ) + .as_bytes(), + )?; + self.read_response(ret, RequiredResponses::EXAMINE_REQUIRED)?; + debug!("examine response {}", ret); + self.current_mailbox = Some(mailbox_hash); + Ok(()) + } + + pub fn unselect(&mut self) -> Result<()> { + if let Some(mailbox_hash) = self.current_mailbox.take() { + let mut response = String::with_capacity(8 * 1024); + if self + .capabilities + .iter() + .any(|cap| cap.eq_ignore_ascii_case(b"UNSELECT")) + { + self.send_command(b"UNSELECT")?; + self.read_response(&mut response, RequiredResponses::empty())?; + } else { + /* `RFC3691 - UNSELECT Command` states: "[..] IMAP4 provides this + * functionality (via a SELECT command with a nonexistent mailbox name or + * reselecting the same mailbox with EXAMINE command)[..] + */ + + self.select_mailbox(mailbox_hash, &mut response)?; + self.examine_mailbox(mailbox_hash, &mut response)?; + } + } + Ok(()) + } + + pub fn add_refresh_event(&mut self, ev: crate::backends::RefreshEvent) { + if let Some(ref sender) = self.uid_store.sender.read().unwrap().as_ref() { + sender.send(ev); + for ev in self.uid_store.refresh_events.lock().unwrap().drain(..) { + sender.send(ev); + } + } else { + self.uid_store.refresh_events.lock().unwrap().push(ev); + } + } } pub struct ImapBlockingConnection { diff --git a/melib/src/backends/imap/managesieve.rs b/melib/src/backends/imap/managesieve.rs index f31489cb..0f731fc2 100644 --- a/melib/src/backends/imap/managesieve.rs +++ b/melib/src/backends/imap/managesieve.rs @@ -19,7 +19,7 @@ * along with meli. If not, see . */ -use super::{ImapConnection, ImapProtocol, ImapServerConf}; +use super::{ImapConnection, ImapProtocol, ImapServerConf, UIDStore}; use crate::conf::AccountSettings; use crate::error::{MeliError, Result}; use crate::get_conf_val; @@ -95,11 +95,14 @@ pub fn new_managesieve_connection(s: &AccountSettings) -> Result danger_accept_invalid_certs, protocol: ImapProtocol::ManageSieve, }; - let online = Arc::new(Mutex::new(( - Instant::now(), - Err(MeliError::new("Account is uninitialised.")), - ))); - Ok(ImapConnection::new_connection(&server_conf, online)) + let uid_store = Arc::new(UIDStore { + is_online: Arc::new(Mutex::new(( + Instant::now(), + Err(MeliError::new("Account is uninitialised.")), + ))), + ..Default::default() + }); + Ok(ImapConnection::new_connection(&server_conf, uid_store)) } impl ManageSieve for ImapConnection { diff --git a/melib/src/backends/imap/operations.rs b/melib/src/backends/imap/operations.rs index 746d5b5f..165537cb 100644 --- a/melib/src/backends/imap/operations.rs +++ b/melib/src/backends/imap/operations.rs @@ -35,19 +35,19 @@ pub struct ImapOp { headers: Option, body: Option, mailbox_path: String, + mailbox_hash: MailboxHash, flags: Cell>, connection: Arc>, uid_store: Arc, - tag_index: Arc>>, } impl ImapOp { pub fn new( uid: usize, mailbox_path: String, + mailbox_hash: MailboxHash, connection: Arc>, uid_store: Arc, - tag_index: Arc>>, ) -> Self { ImapOp { uid, @@ -56,16 +56,16 @@ impl ImapOp { headers: None, body: None, mailbox_path, + mailbox_hash, flags: Cell::new(None), uid_store, - tag_index, } } } impl BackendOp for ImapOp { fn description(&self) -> String { - unimplemented!(); + format!("Message in mailbox: {}", &self.mailbox_path) } fn as_bytes(&mut self) -> Result<&[u8]> { @@ -79,11 +79,11 @@ impl BackendOp for ImapOp { drop(bytes_cache); let mut response = String::with_capacity(8 * 1024); { - let mut conn = try_lock(&self.connection)?; - conn.send_command(format!("SELECT \"{}\"", &self.mailbox_path,).as_bytes())?; - conn.read_response(&mut response)?; + let mut conn = + try_lock(&self.connection, Some(std::time::Duration::new(2, 0)))?; + conn.examine_mailbox(self.mailbox_hash, &mut response)?; conn.send_command(format!("UID FETCH {} (FLAGS RFC822)", self.uid).as_bytes())?; - conn.read_response(&mut response)?; + conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)?; } debug!( "fetch response is {} bytes and {} lines", @@ -127,15 +127,15 @@ impl BackendOp for ImapOp { self.flags.set(cache.flags); } else { let mut response = String::with_capacity(8 * 1024); - let mut conn = or_return_default!(try_lock(&self.connection)); - or_return_default!( - conn.send_command(format!("EXAMINE \"{}\"", &self.mailbox_path,).as_bytes()) - ); - or_return_default!(conn.read_response(&mut response)); + let mut conn = or_return_default!(try_lock( + &self.connection, + Some(std::time::Duration::new(2, 0)) + )); + or_return_default!(conn.examine_mailbox(self.mailbox_hash, &mut response)); or_return_default!( conn.send_command(format!("UID FETCH {} FLAGS", self.uid).as_bytes()) ); - or_return_default!(conn.read_response(&mut response)); + or_return_default!(conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)); debug!( "fetch response is {} bytes and {} lines", response.len(), @@ -167,9 +167,8 @@ impl BackendOp for ImapOp { flags.set(f, value); let mut response = String::with_capacity(8 * 1024); - let mut conn = try_lock(&self.connection)?; - conn.send_command(format!("SELECT \"{}\"", &self.mailbox_path,).as_bytes())?; - conn.read_response(&mut response)?; + let mut conn = try_lock(&self.connection, Some(std::time::Duration::new(2, 0)))?; + conn.select_mailbox(self.mailbox_hash, &mut response)?; debug!(&response); conn.send_command( format!( @@ -179,7 +178,7 @@ impl BackendOp for ImapOp { ) .as_bytes(), )?; - conn.read_response(&mut response)?; + conn.read_response(&mut response, RequiredResponses::STORE_REQUIRED)?; debug!(&response); match protocol_parser::uid_fetch_flags_response(response.as_bytes()) .to_full_result() @@ -203,9 +202,8 @@ impl BackendOp for ImapOp { fn set_tag(&mut self, envelope: &mut Envelope, tag: String, value: bool) -> Result<()> { let mut response = String::with_capacity(8 * 1024); - let mut conn = try_lock(&self.connection)?; - conn.send_command(format!("SELECT \"{}\"", &self.mailbox_path,).as_bytes())?; - conn.read_response(&mut response)?; + let mut conn = try_lock(&self.connection, Some(std::time::Duration::new(2, 0)))?; + conn.select_mailbox(self.mailbox_hash, &mut response)?; conn.send_command( format!( "UID STORE {} {}FLAGS.SILENT ({})", @@ -215,15 +213,15 @@ impl BackendOp for ImapOp { ) .as_bytes(), )?; - conn.read_response(&mut response)?; + conn.read_response(&mut response, RequiredResponses::STORE_REQUIRED)?; protocol_parser::uid_fetch_flags_response(response.as_bytes()) .to_full_result() .map_err(MeliError::from)?; let hash = tag_hash!(tag); if value { - self.tag_index.write().unwrap().insert(hash, tag); + self.uid_store.tag_index.write().unwrap().insert(hash, tag); } else { - self.tag_index.write().unwrap().remove(&hash); + self.uid_store.tag_index.write().unwrap().remove(&hash); } if !envelope.labels().iter().any(|&h_| h_ == hash) { if value { diff --git a/melib/src/backends/imap/protocol_parser.rs b/melib/src/backends/imap/protocol_parser.rs index 9eb2e046..5a656fa8 100644 --- a/melib/src/backends/imap/protocol_parser.rs +++ b/melib/src/backends/imap/protocol_parser.rs @@ -25,6 +25,98 @@ use crate::get_path_hash; use nom::{digit, is_digit, rest, IResult}; use std::str::FromStr; +bitflags! { + #[derive(Default, Serialize, Deserialize)] + pub struct RequiredResponses: u64 { + const CAPABILITY = 0b0000_0000_0000_0001; + const BYE = 0b0000_0000_0000_0010; + const FLAGS = 0b0000_0000_0000_0100; + const EXISTS = 0b0000_0000_0000_1000; + const RECENT = 0b0000_0000_0001_0000; + const UNSEEN = 0b0000_0000_0010_0000; + const PERMANENTFLAGS = 0b0000_0000_0100_0000; + const UIDNEXT = 0b0000_0000_1000_0000; + const UIDVALIDITY = 0b0000_0001_0000_0000; + const LIST = 0b0000_0010_0000_0000; + const LSUB = 0b0000_0100_0000_0000; + const STATUS = 0b0000_1000_0000_0000; + const EXPUNGE = 0b0001_0000_0000_0000; + const SEARCH = 0b0010_0000_0000_0000; + const FETCH = 0b0100_0000_0000_0000; + const CAPABILITY_REQUIRED = Self::CAPABILITY.bits; + const LOGOUT_REQUIRED = Self::BYE.bits; + const SELECT_REQUIRED = Self::FLAGS.bits | Self::EXISTS.bits | Self::RECENT.bits | Self::UNSEEN.bits | Self::PERMANENTFLAGS.bits | Self::UIDNEXT.bits | Self::UIDVALIDITY.bits; + const EXAMINE_REQUIRED = Self::FLAGS.bits | Self::EXISTS.bits | Self::RECENT.bits | Self::UNSEEN.bits | Self::PERMANENTFLAGS.bits | Self::UIDNEXT.bits | Self::UIDVALIDITY.bits; + const LIST_REQUIRED = Self::LIST.bits; + const LSUB_REQUIRED = Self::LSUB.bits; + const FETCH_REQUIRED = Self::FETCH.bits; + const STORE_REQUIRED = Self::FETCH.bits; + } +} + +impl RequiredResponses { + pub fn check(&self, line: &str) -> bool { + if !line.starts_with("* ") { + return false; + } + let line = &line["* ".len()..]; + let mut ret = false; + if self.intersects(RequiredResponses::CAPABILITY) { + ret |= line.starts_with("CAPABILITY"); + } + if self.intersects(RequiredResponses::BYE) { + ret |= line.starts_with("BYE"); + } + if self.intersects(RequiredResponses::FLAGS) { + ret |= line.starts_with("FLAGS"); + } + if self.intersects(RequiredResponses::EXISTS) { + ret |= line.ends_with("EXISTS\r\n"); + } + if self.intersects(RequiredResponses::RECENT) { + ret |= line.ends_with("RECENT\r\n"); + } + if self.intersects(RequiredResponses::UNSEEN) { + ret |= line.starts_with("UNSEEN"); + } + if self.intersects(RequiredResponses::PERMANENTFLAGS) { + ret |= line.starts_with("PERMANENTFLAGS"); + } + if self.intersects(RequiredResponses::UIDNEXT) { + ret |= line.starts_with("UIDNEXT"); + } + if self.intersects(RequiredResponses::UIDVALIDITY) { + ret |= line.starts_with("UIDVALIDITY"); + } + if self.intersects(RequiredResponses::LIST) { + ret |= line.starts_with("LIST"); + } + if self.intersects(RequiredResponses::LSUB) { + ret |= line.starts_with("LSUB"); + } + if self.intersects(RequiredResponses::STATUS) { + ret |= line.starts_with("STATUS"); + } + if self.intersects(RequiredResponses::EXPUNGE) { + ret |= line.ends_with("EXPUNGE\r\n"); + } + if self.intersects(RequiredResponses::SEARCH) { + ret |= line.starts_with("SEARCH"); + } + if self.intersects(RequiredResponses::FETCH) { + let mut ptr = 0; + for i in 0..line.len() { + if !line.as_bytes()[i].is_ascii_digit() { + ptr = i; + break; + } + } + ret |= line[ptr..].trim_start().starts_with("FETCH"); + } + ret + } +} + #[derive(Debug)] pub struct Alert(String); pub type ImapParseResult<'a, T> = Result<(&'a str, T, Option)>; @@ -649,6 +741,9 @@ pub enum UntaggedResponse { /// ``` Recent(usize), Fetch(usize, (Flag, Vec)), + Bye { + reason: String, + }, } named!( diff --git a/melib/src/backends/imap/untagged.rs b/melib/src/backends/imap/untagged.rs new file mode 100644 index 00000000..eaa8a1ef --- /dev/null +++ b/melib/src/backends/imap/untagged.rs @@ -0,0 +1,298 @@ +/* + * meli - imap + * + * Copyright 2020 Manos Pitsidianakis + * + * This file is part of meli. + * + * meli is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * meli is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with meli. If not, see . + */ + +use super::ImapConnection; +use crate::backends::imap::protocol_parser::{ + ImapLineSplit, RequiredResponses, UidFetchResponse, UntaggedResponse, +}; +use crate::backends::BackendMailbox; +use crate::backends::{ + RefreshEvent, + RefreshEventKind::{self, *}, +}; +use crate::email::Envelope; +use crate::error::*; +use std::time::Instant; + +impl ImapConnection { + pub fn process_untagged(&mut self, line: &str) -> Result { + macro_rules! try_fail { + ($mailbox_hash: expr, $($result:expr)+) => { + $(if let Err(err) = $result { + *self.uid_store.is_online.lock().unwrap() = ( + Instant::now(), + Err(err.clone()), + ); + debug!("failure: {}", err.to_string()); + self.uid_store.refresh_events.lock().unwrap().push(RefreshEvent { + account_hash: self.uid_store.account_hash, + mailbox_hash: $mailbox_hash, + kind: RefreshEventKind::Failure(err.clone()), + }); + Err(err) + } else { Ok(()) }?;)+ + }; + } + //FIXME + let mailbox_hash = if let Some(mailbox_hash) = self.current_mailbox { + mailbox_hash + } else { + return Ok(false); + }; + let mailbox = + std::clone::Clone::clone(&self.uid_store.mailboxes.read().unwrap()[&mailbox_hash]); + + let mut response = String::with_capacity(8 * 1024); + let untagged_response = + match super::protocol_parser::untagged_responses(line.as_bytes()).to_full_result() { + Ok(None) | Err(_) => { + return Ok(false); + } + Ok(Some(r)) => r, + }; + match untagged_response { + UntaggedResponse::Bye { reason } => { + *self.uid_store.is_online.lock().unwrap() = + (std::time::Instant::now(), Err(reason.into())); + } + UntaggedResponse::Expunge(n) => { + debug!("expunge {}", n); + } + UntaggedResponse::Exists(n) => { + /* UID FETCH ALL UID, cross-ref, then FETCH difference headers + * */ + let mut prev_exists = mailbox.exists.lock().unwrap(); + debug!("exists {}", n); + if n > *prev_exists { + try_fail!( + mailbox_hash, + self.send_command( + &[ + b"FETCH", + format!("{}:{}", *prev_exists + 1, n).as_bytes(), + b"(UID FLAGS RFC822)", + ] + .join(&b' '), + ) + self.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) + ); + match super::protocol_parser::uid_fetch_responses(&response) { + Ok((_, v, _)) => { + 'fetch_responses: for UidFetchResponse { + uid, flags, body, .. + } in v + { + if self.uid_store.uid_index.lock().unwrap().contains_key(&uid) { + continue 'fetch_responses; + } + if let Ok(mut env) = Envelope::from_bytes( + body.unwrap(), + flags.as_ref().map(|&(f, _)| f), + ) { + self.uid_store + .hash_index + .lock() + .unwrap() + .insert(env.hash(), (uid, mailbox_hash)); + self.uid_store + .uid_index + .lock() + .unwrap() + .insert(uid, env.hash()); + if let Some((_, keywords)) = flags { + let mut tag_lck = self.uid_store.tag_index.write().unwrap(); + for f in keywords { + let hash = tag_hash!(f); + if !tag_lck.contains_key(&hash) { + tag_lck.insert(hash, f); + } + env.labels_mut().push(hash); + } + } + debug!( + "Create event {} {} {}", + env.hash(), + env.subject(), + mailbox.path(), + ); + if !env.is_seen() { + *mailbox.unseen.lock().unwrap() += 1; + } + self.uid_store.refresh_events.lock().unwrap().push( + RefreshEvent { + account_hash: self.uid_store.account_hash, + mailbox_hash, + kind: Create(Box::new(env)), + }, + ); + } + } + } + Err(e) => { + debug!(e); + } + } + *prev_exists = n; + } else if n < *prev_exists { + *prev_exists = n; + } + } + UntaggedResponse::Recent(_) => { + try_fail!( + mailbox_hash, + self.send_command(b"UID SEARCH RECENT") + self.read_response(&mut response, RequiredResponses::SEARCH) + ); + match super::protocol_parser::search_results_raw(response.as_bytes()) + .to_full_result() + .map_err(MeliError::from) + { + Ok(&[]) => { + debug!("UID SEARCH RECENT returned no results"); + } + Ok(v) => { + try_fail!( + mailbox_hash, + self.send_command( + &[b"UID FETCH", v, b"(FLAGS RFC822)"] + .join(&b' '), + ) + self.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) + ); + debug!(&response); + match super::protocol_parser::uid_fetch_responses(&response) { + Ok((_, v, _)) => { + for UidFetchResponse { + uid, flags, body, .. + } in v + { + *mailbox.exists.lock().unwrap() += 1; + if !self.uid_store.uid_index.lock().unwrap().contains_key(&uid) + { + if let Ok(mut env) = Envelope::from_bytes( + body.unwrap(), + flags.as_ref().map(|&(f, _)| f), + ) { + self.uid_store + .hash_index + .lock() + .unwrap() + .insert(env.hash(), (uid, mailbox_hash)); + self.uid_store + .uid_index + .lock() + .unwrap() + .insert(uid, env.hash()); + debug!( + "Create event {} {} {}", + env.hash(), + env.subject(), + mailbox.path(), + ); + if let Some((_, keywords)) = flags { + let mut tag_lck = + self.uid_store.tag_index.write().unwrap(); + for f in keywords { + let hash = tag_hash!(f); + if !tag_lck.contains_key(&hash) { + tag_lck.insert(hash, f); + } + env.labels_mut().push(hash); + } + } + if !env.is_seen() { + *mailbox.unseen.lock().unwrap() += 1; + } + + self.uid_store.refresh_events.lock().unwrap().push( + RefreshEvent { + account_hash: self.uid_store.account_hash, + mailbox_hash, + kind: Create(Box::new(env)), + }, + ); + } + } + } + } + Err(e) => { + debug!(e); + } + } + } + Err(e) => { + debug!( + "UID SEARCH RECENT err: {}\nresp: {}", + e.to_string(), + &response + ); + } + } + } + UntaggedResponse::Fetch(msg_seq, flags) => { + /* a * {msg_seq} FETCH (FLAGS ({flags})) was received, so find out UID from msg_seq + * and send update + */ + debug!("fetch {} {:?}", msg_seq, flags); + try_fail!( + mailbox_hash, + self.send_command( + &[ + b"UID SEARCH ", + format!("{}", msg_seq).as_bytes(), + ] + .join(&b' '), + ) + self.read_response(&mut response, RequiredResponses::SEARCH) + ); + debug!(&response); + match super::protocol_parser::search_results( + response.split_rn().next().unwrap_or("").as_bytes(), + ) + .to_full_result() + { + Ok(mut v) => { + if let Some(uid) = v.pop() { + if let Some(env_hash) = + self.uid_store.uid_index.lock().unwrap().get(&uid) + { + self.uid_store + .refresh_events + .lock() + .unwrap() + .push(RefreshEvent { + account_hash: self.uid_store.account_hash, + mailbox_hash, + kind: NewFlags(*env_hash, flags), + }); + } + } + } + Err(e) => { + debug!(&response); + debug!(e); + } + } + } + } + Ok(true) + } +} diff --git a/melib/src/backends/imap/watch.rs b/melib/src/backends/imap/watch.rs index f0b997fd..f6a4dec4 100644 --- a/melib/src/backends/imap/watch.rs +++ b/melib/src/backends/imap/watch.rs @@ -20,28 +20,30 @@ */ use super::*; use crate::backends::SpecialUsageMailbox; -use std::sync::{Arc, Mutex, RwLock}; +use crate::email::parser::BytesExt; +use crate::email::parser::BytesIterExt; +use std::sync::{Arc, Mutex}; /// Arguments for IMAP watching functions pub struct ImapWatchKit { pub account_hash: AccountHash, pub conn: ImapConnection, - pub is_online: Arc)>>, pub main_conn: Arc>, pub uid_store: Arc, - pub mailboxes: Arc>>, - pub sender: RefreshEventConsumer, pub work_context: WorkContext, - pub tag_index: Arc>>, } macro_rules! exit_on_error { - ($sender:expr, $account_hash:ident, $mailbox_hash:ident, $work_context:ident, $thread_id:ident, $($result:expr)+) => { + ($conn:expr, $account_hash:ident, $mailbox_hash:ident, $work_context:ident, $thread_id:ident, $($result:expr)+) => { $(if let Err(e) = $result { + *$conn.uid_store.is_online.lock().unwrap() = ( + Instant::now(), + Err(e.clone()), + ); debug!("failure: {}", e.to_string()); $work_context.set_status.send(($thread_id, e.to_string())).unwrap(); $work_context.finished.send($thread_id).unwrap(); - $sender.send(RefreshEvent { + $conn.add_refresh_event(RefreshEvent { account_hash: $account_hash, mailbox_hash: $mailbox_hash, kind: RefreshEventKind::Failure(e.clone()), @@ -54,18 +56,17 @@ macro_rules! exit_on_error { pub fn poll_with_examine(kit: ImapWatchKit) -> Result<()> { debug!("poll with examine"); let ImapWatchKit { - is_online, mut conn, main_conn, uid_store, - mailboxes, - sender, work_context, - tag_index, account_hash, } = kit; loop { - if super::try_lock(&is_online)?.1.is_ok() { + if super::try_lock(&uid_store.is_online, Some(std::time::Duration::new(10, 0)))? + .1 + .is_ok() + { break; } std::thread::sleep(std::time::Duration::from_millis(100)); @@ -79,7 +80,7 @@ pub fn poll_with_examine(kit: ImapWatchKit) -> Result<()> { .send((thread_id, "sleeping...".to_string())) .unwrap(); std::thread::sleep(std::time::Duration::from_millis(5 * 60 * 1000)); - let mailboxes = mailboxes.read()?; + let mailboxes = uid_store.mailboxes.read()?; for mailbox in mailboxes.values() { work_context .set_status @@ -88,19 +89,11 @@ pub fn poll_with_examine(kit: ImapWatchKit) -> Result<()> { format!("examining `{}` for updates...", mailbox.path()), )) .unwrap(); - examine_updates( - account_hash, - mailbox, - &sender, - &mut conn, - &uid_store, - &work_context, - &tag_index, - )?; + examine_updates(account_hash, mailbox, &mut conn, &uid_store, &work_context)?; } - let mut main_conn = super::try_lock(&main_conn)?; + let mut main_conn = super::try_lock(&main_conn, Some(std::time::Duration::new(10, 0)))?; main_conn.send_command(b"NOOP")?; - main_conn.read_response(&mut response)?; + main_conn.read_response(&mut response, RequiredResponses::empty())?; } } @@ -110,24 +103,24 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { * minutes wake up and poll the others */ let ImapWatchKit { mut conn, - is_online, main_conn, uid_store, - mailboxes, - sender, work_context, - tag_index, account_hash, } = kit; loop { - if super::try_lock(&is_online)?.1.is_ok() { + if super::try_lock(&uid_store.is_online, Some(std::time::Duration::new(10, 0)))? + .1 + .is_ok() + { break; } std::thread::sleep(std::time::Duration::from_millis(100)); } conn.connect()?; let thread_id: std::thread::ThreadId = std::thread::current().id(); - let mailbox: ImapMailbox = match mailboxes + let mailbox: ImapMailbox = match uid_store + .mailboxes .read() .unwrap() .values() @@ -142,7 +135,7 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { .set_status .send((thread_id, err.to_string())) .unwrap(); - sender.send(RefreshEvent { + conn.add_refresh_event(RefreshEvent { account_hash, mailbox_hash: 0, kind: RefreshEventKind::Failure(err.clone()), @@ -153,13 +146,13 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { let mailbox_hash = mailbox.hash(); let mut response = String::with_capacity(8 * 1024); exit_on_error!( - sender, - account_hash, + conn, + account_hash, mailbox_hash, work_context, thread_id, conn.send_command(format!("SELECT \"{}\"", mailbox.imap_path()).as_bytes()) - conn.read_response(&mut response) + conn.read_response(&mut response, RequiredResponses::SELECT_REQUIRED) ); debug!("select response {}", &response); { @@ -171,7 +164,7 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { if let Some(v) = uidvalidities.get_mut(&mailbox_hash) { if *v != ok.uidvalidity { - sender.send(RefreshEvent { + conn.add_refresh_event(RefreshEvent { account_hash, mailbox_hash, kind: RefreshEventKind::Rescan, @@ -185,12 +178,12 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { *v = ok.uidvalidity; } } else { - sender.send(RefreshEvent { + conn.add_refresh_event(RefreshEvent { account_hash, mailbox_hash, kind: RefreshEventKind::Rescan, }); - sender.send(RefreshEvent { + conn.add_refresh_event(RefreshEvent { account_hash, mailbox_hash, kind: RefreshEventKind::Failure(MeliError::new(format!( @@ -211,7 +204,7 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { }; } exit_on_error!( - sender, + conn, account_hash, mailbox_hash, work_context, @@ -232,27 +225,28 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { while let Some(line) = iter.next() { let now = std::time::Instant::now(); if now.duration_since(beat) >= _26_mins { - let mut main_conn_lck = super::try_lock(&main_conn)?; + let mut main_conn_lck = + super::try_lock(&main_conn, Some(std::time::Duration::new(10, 0)))?; exit_on_error!( - sender, - account_hash, + iter.conn, + account_hash, mailbox_hash, work_context, thread_id, iter.conn.set_nonblocking(true) iter.conn.send_raw(b"DONE") - iter.conn.read_response(&mut response) + iter.conn.read_response(&mut response, RequiredResponses::empty()) iter.conn.send_command(b"IDLE") iter.conn.set_nonblocking(false) main_conn_lck.send_command(b"NOOP") - main_conn_lck.read_response(&mut response) + main_conn_lck.read_response(&mut response, RequiredResponses::empty()) ); beat = now; } if now.duration_since(watch) >= _5_mins { /* Time to poll all inboxes */ - let mut conn = try_lock(&main_conn)?; - for mailbox in mailboxes.read().unwrap().values() { + let mut conn = try_lock(&main_conn, Some(std::time::Duration::new(10, 0)))?; + for mailbox in uid_store.mailboxes.read().unwrap().values() { work_context .set_status .send(( @@ -261,20 +255,12 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { )) .unwrap(); exit_on_error!( - sender, + conn, account_hash, mailbox_hash, work_context, thread_id, - examine_updates( - account_hash, - mailbox, - &sender, - &mut conn, - &uid_store, - &work_context, - &tag_index - ) + examine_updates(account_hash, mailbox, &mut conn, &uid_store, &work_context,) ); } work_context @@ -283,27 +269,27 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { .unwrap(); watch = now; } + *uid_store.is_online.lock().unwrap() = (Instant::now(), Ok(())); match protocol_parser::untagged_responses(line.as_slice()) .to_full_result() .map_err(MeliError::from) { Ok(Some(Recent(r))) => { - let mut conn = super::try_lock(&main_conn)?; + let mut conn = super::try_lock(&main_conn, Some(std::time::Duration::new(10, 0)))?; work_context .set_status .send((thread_id, format!("got `{} RECENT` notification", r))) .unwrap(); /* UID SEARCH RECENT */ exit_on_error!( - sender, - account_hash, + conn, + account_hash, mailbox_hash, work_context, thread_id, - conn.send_command(b"EXAMINE INBOX") - conn.read_response(&mut response) + conn.examine_mailbox(mailbox_hash, &mut response) conn.send_command(b"UID SEARCH RECENT") - conn.read_response(&mut response) + conn.read_response(&mut response, RequiredResponses::SEARCH) ); match protocol_parser::search_results_raw(response.as_bytes()) .to_full_result() @@ -314,16 +300,16 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { } Ok(v) => { exit_on_error!( - sender, - account_hash, + conn, + account_hash, mailbox_hash, work_context, thread_id, conn.send_command( - &[b"UID FETCH", v, b"(FLAGS RFC822)"] + &[&b"UID FETCH"[..], &v.trim().split(|b| b == &b' ').join(b','), &b"(FLAGS RFC822)"[..]] .join(&b' '), ) - conn.read_response(&mut response) + conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) ); debug!(&response); match protocol_parser::uid_fetch_responses(&response) { @@ -341,43 +327,50 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { format!("parsing {}/{} envelopes..", ctr, len), )) .unwrap(); - if let Ok(mut env) = Envelope::from_bytes( - body.unwrap(), - flags.as_ref().map(|&(f, _)| f), - ) { - ctr += 1; - uid_store - .hash_index - .lock() - .unwrap() - .insert(env.hash(), (uid, mailbox_hash)); - uid_store.uid_index.lock().unwrap().insert(uid, env.hash()); - debug!( - "Create event {} {} {}", - env.hash(), - env.subject(), - mailbox.path(), - ); - if let Some((_, keywords)) = flags { - let mut tag_lck = tag_index.write().unwrap(); - for f in keywords { - let hash = tag_hash!(f); - if !tag_lck.contains_key(&hash) { - tag_lck.insert(hash, f); + ctr += 1; + *mailbox.exists.lock().unwrap() += 1; + if !uid_store.uid_index.lock().unwrap().contains_key(&uid) { + if let Ok(mut env) = Envelope::from_bytes( + body.unwrap(), + flags.as_ref().map(|&(f, _)| f), + ) { + uid_store + .hash_index + .lock() + .unwrap() + .insert(env.hash(), (uid, mailbox_hash)); + uid_store + .uid_index + .lock() + .unwrap() + .insert(uid, env.hash()); + debug!( + "Create event {} {} {}", + env.hash(), + env.subject(), + mailbox.path(), + ); + if let Some((_, keywords)) = flags { + let mut tag_lck = + uid_store.tag_index.write().unwrap(); + for f in keywords { + let hash = tag_hash!(f); + if !tag_lck.contains_key(&hash) { + tag_lck.insert(hash, f); + } + env.labels_mut().push(hash); } - env.labels_mut().push(hash); } - } - if !env.is_seen() { - *mailbox.unseen.lock().unwrap() += 1; - } - *mailbox.exists.lock().unwrap() += 1; + if !env.is_seen() { + *mailbox.unseen.lock().unwrap() += 1; + } - sender.send(RefreshEvent { - account_hash, - mailbox_hash, - kind: Create(Box::new(env)), - }); + conn.add_refresh_event(RefreshEvent { + account_hash, + mailbox_hash, + kind: Create(Box::new(env)), + }); + } } } work_context @@ -407,7 +400,7 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { debug!("expunge {}", n); } Ok(Some(Exists(n))) => { - let mut conn = super::try_lock(&main_conn)?; + let mut conn = super::try_lock(&main_conn, Some(std::time::Duration::new(10, 0)))?; /* UID FETCH ALL UID, cross-ref, then FETCH difference headers * */ let mut prev_exists = mailbox.exists.lock().unwrap(); @@ -426,13 +419,12 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { .unwrap(); if n > *prev_exists { exit_on_error!( - sender, + conn, account_hash, mailbox_hash, work_context, thread_id, - conn.send_command(b"EXAMINE INBOX") - conn.read_response(&mut response) + conn.examine_mailbox(mailbox_hash, &mut response) conn.send_command( &[ b"FETCH", @@ -441,13 +433,13 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { ] .join(&b' '), ) - conn.read_response(&mut response) + conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) ); match protocol_parser::uid_fetch_responses(&response) { Ok((_, v, _)) => { let len = v.len(); let mut ctr = 0; - for UidFetchResponse { + 'fetch_responses_b: for UidFetchResponse { uid, flags, body, .. } in v { @@ -460,7 +452,7 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { .unwrap(); if uid_store.uid_index.lock().unwrap().contains_key(&uid) { ctr += 1; - continue; + continue 'fetch_responses_b; } if let Ok(mut env) = Envelope::from_bytes( body.unwrap(), @@ -474,7 +466,7 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { .insert(env.hash(), (uid, mailbox_hash)); uid_store.uid_index.lock().unwrap().insert(uid, env.hash()); if let Some((_, keywords)) = flags { - let mut tag_lck = tag_index.write().unwrap(); + let mut tag_lck = uid_store.tag_index.write().unwrap(); for f in keywords { let hash = tag_hash!(f); if !tag_lck.contains_key(&hash) { @@ -492,7 +484,7 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { if !env.is_seen() { *mailbox.unseen.lock().unwrap() += 1; } - sender.send(RefreshEvent { + conn.add_refresh_event(RefreshEvent { account_hash, mailbox_hash, kind: Create(Box::new(env)), @@ -518,16 +510,15 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { /* a * {msg_seq} FETCH (FLAGS ({flags})) was received, so find out UID from msg_seq * and send update */ - let mut conn = super::try_lock(&main_conn)?; + let mut conn = super::try_lock(&main_conn, Some(std::time::Duration::new(10, 0)))?; debug!("fetch {} {:?}", msg_seq, flags); exit_on_error!( - sender, + conn, account_hash, mailbox_hash, work_context, thread_id, - conn.send_command(b"EXAMINE INBOX") - conn.read_response(&mut response) + conn.examine_mailbox(mailbox_hash, &mut response) conn.send_command( &[ b"UID SEARCH ", @@ -535,7 +526,7 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { ] .join(&b' '), ) - conn.read_response(&mut response) + conn.read_response(&mut response, RequiredResponses::SEARCH) ); match search_results(response.split_rn().next().unwrap_or("").as_bytes()) .to_full_result() @@ -543,7 +534,7 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { Ok(mut v) => { if let Some(uid) = v.pop() { if let Some(env_hash) = uid_store.uid_index.lock().unwrap().get(&uid) { - sender.send(RefreshEvent { + conn.add_refresh_event(RefreshEvent { account_hash, mailbox_hash, kind: NewFlags(*env_hash, flags), @@ -557,6 +548,7 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { } } } + Ok(Some(Bye { .. })) => break, Ok(None) | Err(_) => {} } work_context @@ -571,7 +563,7 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { .send((thread_id, "IDLE connection dropped".to_string())) .unwrap(); work_context.finished.send(thread_id).unwrap(); - sender.send(RefreshEvent { + main_conn.lock().unwrap().add_refresh_event(RefreshEvent { account_hash, mailbox_hash, kind: RefreshEventKind::Failure(MeliError::new(format!( @@ -585,25 +577,23 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { pub fn examine_updates( account_hash: AccountHash, mailbox: &ImapMailbox, - sender: &RefreshEventConsumer, conn: &mut ImapConnection, uid_store: &Arc, work_context: &WorkContext, - tag_index: &Arc>>, ) -> Result<()> { let thread_id: std::thread::ThreadId = std::thread::current().id(); let mailbox_hash = mailbox.hash(); debug!("examining mailbox {} {}", mailbox_hash, mailbox.path()); let mut response = String::with_capacity(8 * 1024); exit_on_error!( - sender, + conn, account_hash, mailbox_hash, work_context, thread_id, - conn.send_command(format!("EXAMINE \"{}\"", mailbox.imap_path()).as_bytes()) - conn.read_response(&mut response) + conn.examine_mailbox(mailbox_hash, &mut response) ); + *uid_store.is_online.lock().unwrap() = (Instant::now(), Ok(())); match protocol_parser::select_response(&response) { Ok(ok) => { debug!(&ok); @@ -612,7 +602,7 @@ pub fn examine_updates( if let Some(v) = uidvalidities.get_mut(&mailbox_hash) { if *v != ok.uidvalidity { - sender.send(RefreshEvent { + conn.add_refresh_event(RefreshEvent { account_hash, mailbox_hash, kind: RefreshEventKind::Rescan, @@ -625,12 +615,12 @@ pub fn examine_updates( *v = ok.uidvalidity; } } else { - sender.send(RefreshEvent { + conn.add_refresh_event(RefreshEvent { account_hash, mailbox_hash, kind: RefreshEventKind::Rescan, }); - sender.send(RefreshEvent { + conn.add_refresh_event(RefreshEvent { account_hash, mailbox_hash, kind: RefreshEventKind::Failure(MeliError::new(format!( @@ -647,13 +637,13 @@ pub fn examine_updates( { /* UID SEARCH RECENT */ exit_on_error!( - sender, + conn, account_hash, mailbox_hash, work_context, thread_id, conn.send_command(b"UID SEARCH RECENT") - conn.read_response(&mut response) + conn.read_response(&mut response, RequiredResponses::SEARCH) ); match protocol_parser::search_results_raw(response.as_bytes()) .to_full_result() @@ -664,24 +654,30 @@ pub fn examine_updates( } Ok(v) => { exit_on_error!( - sender, + conn, account_hash, mailbox_hash, work_context, thread_id, conn.send_command( - &[b"UID FETCH", v, b"(FLAGS RFC822)"] + &[&b"UID FETCH"[..], &v.trim().split(|b| b == &b' ').join(b','), &b"(FLAGS RFC822)"[..]] .join(&b' '), ) - conn.read_response(&mut response) + conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) ); debug!(&response); match protocol_parser::uid_fetch_responses(&response) { Ok((_, v, _)) => { - for UidFetchResponse { - uid, flags, body, .. + 'fetch_responses_c: for UidFetchResponse { + uid, + flags, + body, + .. } in v { + if uid_store.uid_index.lock().unwrap().contains_key(&uid) { + continue 'fetch_responses_c; + } if let Ok(mut env) = Envelope::from_bytes( body.unwrap(), flags.as_ref().map(|&(f, _)| f), @@ -703,7 +699,8 @@ pub fn examine_updates( mailbox.path(), ); if let Some((_, keywords)) = flags { - let mut tag_lck = tag_index.write().unwrap(); + let mut tag_lck = + uid_store.tag_index.write().unwrap(); for f in keywords { let hash = tag_hash!(f); if !tag_lck.contains_key(&hash) { @@ -715,7 +712,7 @@ pub fn examine_updates( if !env.is_seen() { *mailbox.unseen.lock().unwrap() += 1; } - sender.send(RefreshEvent { + conn.add_refresh_event(RefreshEvent { account_hash, mailbox_hash, kind: Create(Box::new(env)), @@ -742,7 +739,7 @@ pub fn examine_updates( * */ debug!("exists {}", n); exit_on_error!( - sender, + conn, account_hash, mailbox_hash, work_context, @@ -755,14 +752,17 @@ pub fn examine_updates( ] .join(&b' '), ) - conn.read_response(&mut response) + conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) ); match protocol_parser::uid_fetch_responses(&response) { Ok((_, v, _)) => { - for UidFetchResponse { + 'fetch_responses_a: for UidFetchResponse { uid, flags, body, .. } in v { + if uid_store.uid_index.lock().unwrap().contains_key(&uid) { + continue 'fetch_responses_a; + } if let Ok(mut env) = Envelope::from_bytes(body.unwrap(), flags.as_ref().map(|&(f, _)| f)) { @@ -773,7 +773,7 @@ pub fn examine_updates( .insert(env.hash(), (uid, mailbox_hash)); uid_store.uid_index.lock().unwrap().insert(uid, env.hash()); if let Some((_, keywords)) = flags { - let mut tag_lck = tag_index.write().unwrap(); + let mut tag_lck = uid_store.tag_index.write().unwrap(); for f in keywords { let hash = tag_hash!(f); if !tag_lck.contains_key(&hash) { @@ -791,7 +791,7 @@ pub fn examine_updates( if !env.is_seen() { *mailbox.unseen.lock().unwrap() += 1; } - sender.send(RefreshEvent { + conn.add_refresh_event(RefreshEvent { account_hash, mailbox_hash, kind: Create(Box::new(env)), diff --git a/melib/src/email/parser.rs b/melib/src/email/parser.rs index 827550fe..b0380cfb 100644 --- a/melib/src/email/parser.rs +++ b/melib/src/email/parser.rs @@ -105,6 +105,22 @@ impl BytesExt for [u8] { } } +pub trait BytesIterExt { + fn join(&mut self, sep: u8) -> Vec; +} + +impl<'a, P: for<'r> FnMut(&'r u8) -> bool> BytesIterExt for std::slice::Split<'a, u8, P> { + fn join(&mut self, sep: u8) -> Vec { + self.fold(vec![], |mut acc, el| { + if !acc.is_empty() { + acc.push(sep); + } + acc.extend(el.iter()); + acc + }) + } +} + fn quoted_printable_byte(input: &[u8]) -> IResult<&[u8], u8> { if input.len() < 3 { IResult::Incomplete(Needed::Size(1))