diff --git a/melib/src/backends/imap.rs b/melib/src/backends/imap.rs index ef6465ea..3b01e3ba 100644 --- a/melib/src/backends/imap.rs +++ b/melib/src/backends/imap.rs @@ -140,9 +140,23 @@ pub struct ImapType { mailboxes: Arc>>, } +#[inline(always)] +pub(self) fn try_lock(connection: &Arc>) -> Result> { + let now = Instant::now(); + while Instant::now().duration_since(now) <= std::time::Duration::new(2, 0) { + if let Ok(guard) = connection.try_lock() { + return Ok(guard); + } + } + Err("Connection timeout".into()) +} + impl MailBackend for ImapType { fn is_online(&self) -> Result<()> { - self.online.lock().unwrap().1.clone() + if let Ok(mut g) = try_lock(&self.connection) { + let _ = g.connect(); + } + try_lock(&self.online)?.1.clone() } fn connect(&mut self) { @@ -150,7 +164,9 @@ impl MailBackend for ImapType { if Instant::now().duration_since(self.online.lock().unwrap().0) >= std::time::Duration::new(2, 0) { - let _ = self.connection.lock().unwrap().connect(); + if let Ok(mut g) = try_lock(&self.connection) { + let _ = g.connect(); + } } } } @@ -184,7 +200,7 @@ impl MailBackend for ImapType { if let Err(err) = (move || { let tx = _tx; let mut response = String::with_capacity(8 * 1024); - let mut conn = connection.lock()?; + let mut conn = try_lock(&connection)?; debug!("locked for get {}", mailbox_path); /* first SELECT the mailbox to get READ/WRITE permissions (because EXAMINE only @@ -296,7 +312,6 @@ impl MailBackend for ImapType { mailbox_hash: MailboxHash, sender: RefreshEventConsumer, ) -> Result> { - self.connection.lock().unwrap().connect()?; let inbox = self .mailboxes .read() @@ -311,7 +326,17 @@ impl MailBackend for ImapType { let w = AsyncBuilder::new(); let closure = move |work_context: WorkContext| { let thread = std::thread::current(); - let mut conn = main_conn.lock().unwrap(); + let mut conn = match try_lock(&main_conn) { + Ok(conn) => conn, + Err(err) => { + sender.send(RefreshEvent { + hash: mailbox_hash, + kind: RefreshEventKind::Failure(err.clone()), + }); + + return; + } + }; work_context .set_name .send(( @@ -450,7 +475,7 @@ impl MailBackend for ImapType { )))? }; let mut response = String::with_capacity(8 * 1024); - let mut conn = self.connection.lock().unwrap(); + let mut conn = try_lock(&self.connection)?; let flags = flags.unwrap_or(Flag::empty()); conn.send_command( format!( @@ -523,7 +548,7 @@ impl MailBackend for ImapType { let mut response = String::with_capacity(8 * 1024); { - let mut conn_lck = self.connection.lock()?; + let mut conn_lck = try_lock(&self.connection)?; conn_lck.send_command(format!("CREATE \"{}\"", path,).as_bytes())?; conn_lck.read_response(&mut response)?; @@ -549,7 +574,7 @@ impl MailBackend for ImapType { } let mut response = String::with_capacity(8 * 1024); { - let mut conn_lck = self.connection.lock()?; + let mut conn_lck = try_lock(&self.connection)?; if !mailboxes[&mailbox_hash].no_select { /* make sure mailbox is not selected before it gets deleted, otherwise * connection gets dropped by server */ @@ -604,7 +629,7 @@ impl MailBackend for ImapType { let mut response = String::with_capacity(8 * 1024); { - let mut conn_lck = self.connection.lock()?; + let mut conn_lck = try_lock(&self.connection)?; if new_val { conn_lck.send_command( format!("SUBSCRIBE \"{}\"", mailboxes[&mailbox_hash].imap_path()).as_bytes(), @@ -644,7 +669,7 @@ impl MailBackend for ImapType { ); } { - let mut conn_lck = self.connection.lock()?; + let mut conn_lck = try_lock(&self.connection)?; conn_lck.send_command( debug!(format!( "RENAME \"{}\" \"{}\"", @@ -787,7 +812,7 @@ impl ImapType { ) -> Result> { let mut mailboxes: FnvHashMap = Default::default(); let mut res = String::with_capacity(8 * 1024); - let mut conn = connection.lock().unwrap(); + let mut conn = try_lock(&connection)?; conn.send_command(b"LIST \"\" \"*\"")?; conn.read_response(&mut res)?; debug!("out: {}", &res); @@ -852,13 +877,14 @@ impl ImapType { } pub fn capabilities(&self) -> Vec { - self.connection - .lock() - .unwrap() - .capabilities - .iter() - .map(|c| String::from_utf8_lossy(c).into()) - .collect::>() + try_lock(&self.connection) + .map(|c| { + c.capabilities + .iter() + .map(|c| String::from_utf8_lossy(c).into()) + .collect::>() + }) + .unwrap_or_default() } pub fn search( @@ -868,7 +894,7 @@ impl ImapType { ) -> Result> { let mailboxes_lck = self.mailboxes.read()?; let mut response = String::with_capacity(8 * 1024); - let mut conn = self.connection.lock()?; + let mut conn = try_lock(&self.connection)?; conn.send_command( format!("EXAMINE \"{}\"", mailboxes_lck[&mailbox_hash].imap_path()).as_bytes(), )?; diff --git a/melib/src/backends/imap/connection.rs b/melib/src/backends/imap/connection.rs index 58eb2857..31246b50 100644 --- a/melib/src/backends/imap/connection.rs +++ b/melib/src/backends/imap/connection.rs @@ -83,7 +83,7 @@ impl ImapStream { ))); }; - let mut socket = TcpStream::connect(&addr)?; + let mut socket = TcpStream::connect_timeout(&addr, std::time::Duration::new(4, 0))?; socket.set_read_timeout(Some(std::time::Duration::new(4, 0)))?; socket.set_write_timeout(Some(std::time::Duration::new(4, 0)))?; let cmd_id = 1; @@ -577,7 +577,11 @@ impl Iterator for ImapBlockingConnection { return Some(result[0..*prev_res_length].to_vec()); } } - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + Err(e) + if e.kind() == std::io::ErrorKind::WouldBlock + || e.kind() == std::io::ErrorKind::Interrupted => + { + debug!(&e); continue; } Err(e) => { diff --git a/melib/src/backends/imap/operations.rs b/melib/src/backends/imap/operations.rs index 703cf5a4..746d5b5f 100644 --- a/melib/src/backends/imap/operations.rs +++ b/melib/src/backends/imap/operations.rs @@ -79,7 +79,7 @@ impl BackendOp for ImapOp { drop(bytes_cache); let mut response = String::with_capacity(8 * 1024); { - let mut conn = self.connection.lock().unwrap(); + let mut conn = try_lock(&self.connection)?; conn.send_command(format!("SELECT \"{}\"", &self.mailbox_path,).as_bytes())?; conn.read_response(&mut response)?; conn.send_command(format!("UID FETCH {} (FLAGS RFC822)", self.uid).as_bytes())?; @@ -110,22 +110,32 @@ impl BackendOp for ImapOp { } fn fetch_flags(&self) -> Flag { + macro_rules! or_return_default { + ($expr:expr) => { + match $expr { + Ok(ok) => ok, + Err(_) => return Default::default(), + } + }; + } if self.flags.get().is_some() { return self.flags.get().unwrap(); } - let mut bytes_cache = self.uid_store.byte_cache.lock().unwrap(); + let mut bytes_cache = or_return_default!(self.uid_store.byte_cache.lock()); let cache = bytes_cache.entry(self.uid).or_default(); if cache.flags.is_some() { self.flags.set(cache.flags); } else { let mut response = String::with_capacity(8 * 1024); - let mut conn = self.connection.lock().unwrap(); - conn.send_command(format!("EXAMINE \"{}\"", &self.mailbox_path,).as_bytes()) - .unwrap(); - conn.read_response(&mut response).unwrap(); - conn.send_command(format!("UID FETCH {} FLAGS", self.uid).as_bytes()) - .unwrap(); - conn.read_response(&mut response).unwrap(); + let mut conn = or_return_default!(try_lock(&self.connection)); + or_return_default!( + conn.send_command(format!("EXAMINE \"{}\"", &self.mailbox_path,).as_bytes()) + ); + or_return_default!(conn.read_response(&mut response)); + or_return_default!( + conn.send_command(format!("UID FETCH {} FLAGS", self.uid).as_bytes()) + ); + or_return_default!(conn.read_response(&mut response)); debug!( "fetch response is {} bytes and {} lines", response.len(), @@ -146,7 +156,7 @@ impl BackendOp for ImapOp { cache.flags = Some(flags); self.flags.set(Some(flags)); } - Err(e) => Err(e).unwrap(), + Err(e) => or_return_default!(Err(e)), } } self.flags.get().unwrap() @@ -157,7 +167,7 @@ impl BackendOp for ImapOp { flags.set(f, value); let mut response = String::with_capacity(8 * 1024); - let mut conn = self.connection.lock().unwrap(); + let mut conn = try_lock(&self.connection)?; conn.send_command(format!("SELECT \"{}\"", &self.mailbox_path,).as_bytes())?; conn.read_response(&mut response)?; debug!(&response); @@ -183,7 +193,7 @@ impl BackendOp for ImapOp { self.flags.set(Some(flags)); } } - Err(e) => Err(e).unwrap(), + Err(e) => Err(e)?, } let mut bytes_cache = self.uid_store.byte_cache.lock()?; let cache = bytes_cache.entry(self.uid).or_default(); @@ -193,7 +203,7 @@ impl BackendOp for ImapOp { fn set_tag(&mut self, envelope: &mut Envelope, tag: String, value: bool) -> Result<()> { let mut response = String::with_capacity(8 * 1024); - let mut conn = self.connection.lock().unwrap(); + let mut conn = try_lock(&self.connection)?; conn.send_command(format!("SELECT \"{}\"", &self.mailbox_path,).as_bytes())?; conn.read_response(&mut response)?; conn.send_command( diff --git a/melib/src/backends/imap/watch.rs b/melib/src/backends/imap/watch.rs index 1dbda0de..685fc17a 100644 --- a/melib/src/backends/imap/watch.rs +++ b/melib/src/backends/imap/watch.rs @@ -62,7 +62,7 @@ pub fn poll_with_examine(kit: ImapWatchKit) -> Result<()> { tag_index, } = kit; loop { - if is_online.lock().unwrap().1.is_ok() { + if super::try_lock(&is_online)?.1.is_ok() { break; } std::thread::sleep(std::time::Duration::from_millis(100)); @@ -76,7 +76,7 @@ pub fn poll_with_examine(kit: ImapWatchKit) -> Result<()> { .send((thread_id, "sleeping...".to_string())) .unwrap(); std::thread::sleep(std::time::Duration::from_millis(5 * 60 * 1000)); - let mailboxes = mailboxes.read().unwrap(); + let mailboxes = mailboxes.read()?; for mailbox in mailboxes.values() { work_context .set_status @@ -94,9 +94,9 @@ pub fn poll_with_examine(kit: ImapWatchKit) -> Result<()> { &tag_index, )?; } - let mut main_conn = main_conn.lock().unwrap(); - main_conn.send_command(b"NOOP").unwrap(); - main_conn.read_response(&mut response).unwrap(); + let mut main_conn = super::try_lock(&main_conn)?; + main_conn.send_command(b"NOOP")?; + main_conn.read_response(&mut response)?; } } @@ -115,7 +115,7 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { tag_index, } = kit; loop { - if is_online.lock().unwrap().1.is_ok() { + if super::try_lock(&is_online)?.1.is_ok() { break; } std::thread::sleep(std::time::Duration::from_millis(100)); @@ -169,9 +169,11 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { kind: RefreshEventKind::Rescan, }); *prev_exists = 0; + /* uid_store.uid_index.lock().unwrap().clear(); uid_store.hash_index.lock().unwrap().clear(); uid_store.byte_cache.lock().unwrap().clear(); + */ *v = ok.uidvalidity; } } else { @@ -219,6 +221,7 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { while let Some(line) = iter.next() { let now = std::time::Instant::now(); if now.duration_since(beat) >= _26_mins { + let mut main_conn_lck = super::try_lock(&main_conn)?; exit_on_error!( sender, mailbox_hash, @@ -229,14 +232,14 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { iter.conn.read_response(&mut response) iter.conn.send_command(b"IDLE") iter.conn.set_nonblocking(false) - main_conn.lock().unwrap().send_command(b"NOOP") - main_conn.lock().unwrap().read_response(&mut response) + main_conn_lck.send_command(b"NOOP") + main_conn_lck.read_response(&mut response) ); beat = now; } if now.duration_since(watch) >= _5_mins { /* Time to poll all inboxes */ - let mut conn = main_conn.lock().unwrap(); + let mut conn = try_lock(&main_conn)?; for mailbox in mailboxes.read().unwrap().values() { work_context .set_status @@ -271,7 +274,7 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { .map_err(MeliError::from) { Ok(Some(Recent(r))) => { - let mut conn = main_conn.lock().unwrap(); + let mut conn = super::try_lock(&main_conn)?; work_context .set_status .send((thread_id, format!("got `{} RECENT` notification", r))) @@ -387,7 +390,7 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { debug!("expunge {}", n); } Ok(Some(Exists(n))) => { - let mut conn = main_conn.lock().unwrap(); + let mut conn = super::try_lock(&main_conn)?; /* UID FETCH ALL UID, cross-ref, then FETCH difference headers * */ let mut prev_exists = mailbox.exists.lock().unwrap(); @@ -496,7 +499,7 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { /* a * {msg_seq} FETCH (FLAGS ({flags})) was received, so find out UID from msg_seq * and send update */ - let mut conn = main_conn.lock().unwrap(); + let mut conn = super::try_lock(&main_conn)?; debug!("fetch {} {:?}", msg_seq, flags); exit_on_error!( sender, @@ -589,9 +592,11 @@ pub fn examine_updates( hash: mailbox_hash, kind: RefreshEventKind::Rescan, }); + /* uid_store.uid_index.lock().unwrap().clear(); uid_store.hash_index.lock().unwrap().clear(); uid_store.byte_cache.lock().unwrap().clear(); + */ *v = ok.uidvalidity; } } else {