From 44fdc0765ea06e0b6f31ba4bd7d2502e13ad74e0 Mon Sep 17 00:00:00 2001 From: Manos Pitsidianakis Date: Wed, 29 Jul 2020 14:27:43 +0300 Subject: [PATCH] conf/accounts.rs: add 30s job timeout --- src/conf/accounts.rs | 91 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 75 insertions(+), 16 deletions(-) diff --git a/src/conf/accounts.rs b/src/conf/accounts.rs index 5da3f4ca..a867dda1 100644 --- a/src/conf/accounts.rs +++ b/src/conf/accounts.rs @@ -140,6 +140,7 @@ pub struct Account { name: String, hash: AccountHash, pub is_online: Result<()>, + pub last_online_request: std::time::Instant, pub(crate) mailbox_entries: HashMap, pub(crate) mailboxes_order: Vec, tree: Vec, @@ -201,6 +202,38 @@ pub enum JobRequest { Watch(JoinHandle), } +impl Drop for JobRequest { + fn drop(&mut self) { + match self { + JobRequest::Generic { handle, .. } => handle.0.cancel(), + JobRequest::Mailboxes(h, _) => h.0.cancel(), + JobRequest::Fetch(_, h, _) => h.0.cancel(), + JobRequest::IsOnline(h, _) => h.0.cancel(), + JobRequest::Refresh(_, h, _) => h.0.cancel(), + JobRequest::SetFlags(_, h, _) => h.0.cancel(), + JobRequest::SaveMessage(_, h, _) => h.0.cancel(), + JobRequest::CopyTo(_, h, _) => h.0.cancel(), + JobRequest::DeleteMessages(_, h, _) => h.0.cancel(), + JobRequest::CreateMailbox(h, _) => h.0.cancel(), + JobRequest::DeleteMailbox(h, _) => h.0.cancel(), + //JobRequest::RenameMailbox, + JobRequest::Search(h) => h.0.cancel(), + JobRequest::AsBytes(h) => h.0.cancel(), + JobRequest::SetMailboxPermissions(_, h, _) => { + h.0.cancel(); + } + JobRequest::SetMailboxSubscription(_, h, _) => { + h.0.cancel(); + } + JobRequest::Watch(h) => h.0.cancel(), + JobRequest::SendMessage => {} + JobRequest::SendMessageBackground(h, _) => { + h.0.cancel(); + } + } + } +} + impl core::fmt::Debug for JobRequest { fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { match self { @@ -380,6 +413,7 @@ impl Account { } else { Err(MeliError::new("Attempting connection.")) }, + last_online_request: std::time::Instant::now(), mailbox_entries: Default::default(), mailboxes_order: Default::default(), tree: Default::default(), @@ -1570,8 +1604,30 @@ impl Account { return Ok(()); } + let mut timeout = false; + let mut drain: SmallVec<[std::time::Instant; 16]> = SmallVec::new(); + const ONLINE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30); + for (_instant, j) in self + .active_job_instants + .range(..std::time::Instant::now() - ONLINE_TIMEOUT) + { + if self.active_jobs.contains_key(j) { + debug!("timeout for {} {:?}", j, self.active_jobs[j]); + let req = self.active_jobs.remove(j).unwrap(); + self.sender + .send(ThreadEvent::UIEvent(UIEvent::StatusEvent( + StatusEvent::JobCanceled(*j), + ))) + .unwrap(); + timeout |= !req.is_watch(); + } + drain.push(*_instant); + } + for j in drain { + self.active_job_instants.remove(&j); + } if self.backend_capabilities.is_async { - if self.is_online.is_ok() { + if self.is_online.is_ok() && !timeout { return Ok(()); } if !self.active_jobs.values().any(JobRequest::is_online) { @@ -1586,6 +1642,7 @@ impl Account { let ret = self.backend.read().unwrap().is_online(); if ret.is_ok() != self.is_online.is_ok() { if ret.is_ok() { + self.last_online_request = std::time::Instant::now(); self.init(None)?; } self.sender @@ -1648,7 +1705,7 @@ impl Account { pub fn process_event(&mut self, job_id: &JobId) -> bool { if self.active_jobs.contains_key(job_id) { match self.active_jobs.remove(job_id).unwrap() { - JobRequest::Mailboxes(_, mut chan) => { + JobRequest::Mailboxes(_, ref mut chan) => { if let Some(mailboxes) = chan.try_recv().unwrap() { if mailboxes.is_err() || self.init(Some(mailboxes.unwrap())).is_err() { if let Ok(mailboxes_job) = @@ -1664,7 +1721,7 @@ impl Account { } } } - JobRequest::Fetch(mailbox_hash, _, mut chan) => { + JobRequest::Fetch(mailbox_hash, _, ref mut chan) => { self.sender .send(ThreadEvent::UIEvent(UIEvent::StatusEvent( StatusEvent::JobFinished(*job_id), @@ -1740,7 +1797,7 @@ impl Account { )))) .unwrap(); } - JobRequest::IsOnline(_, mut chan) => { + JobRequest::IsOnline(_, ref mut chan) => { let is_online = chan.try_recv().unwrap(); if let Some(is_online) = is_online { self.sender @@ -1752,6 +1809,7 @@ impl Account { if self.is_online.is_err() { self.watch(); } + self.last_online_request = std::time::Instant::now(); self.is_online = Ok(()); return true; } @@ -1766,7 +1824,7 @@ impl Account { .insert(std::time::Instant::now(), job_id); } } - JobRequest::Refresh(_mailbox_hash, _, mut chan) => { + JobRequest::Refresh(_mailbox_hash, _, ref mut chan) => { let r = chan.try_recv().unwrap(); if let Some(r) = r { if r.is_ok() { @@ -1774,6 +1832,7 @@ impl Account { self.watch(); } } + self.last_online_request = std::time::Instant::now(); self.is_online = Ok(()); self.sender .send(ThreadEvent::UIEvent(UIEvent::AccountStatusChange( @@ -1787,7 +1846,7 @@ impl Account { ))) .unwrap(); } - JobRequest::SetFlags(_, _, mut chan) => { + JobRequest::SetFlags(_, _, ref mut chan) => { let r = chan.try_recv().unwrap(); if let Some(Err(err)) = r { self.sender @@ -1799,7 +1858,7 @@ impl Account { .expect("Could not send event on main channel"); } } - JobRequest::SaveMessage(_, _, mut chan) => { + JobRequest::SaveMessage(_, _, ref mut chan) => { let r = chan.try_recv().unwrap(); if let Some(Err(err)) = r { self.sender @@ -1818,7 +1877,7 @@ impl Account { ))) .unwrap(); } - JobRequest::SendMessageBackground(_, mut chan) => { + JobRequest::SendMessageBackground(_, ref mut chan) => { let r = chan.try_recv().unwrap(); if let Some(Err(err)) = r { self.sender @@ -1835,7 +1894,7 @@ impl Account { ))) .unwrap(); } - JobRequest::CopyTo(mailbox_hash, _, mut chan) => { + JobRequest::CopyTo(mailbox_hash, _, ref mut chan) => { if let Err(err) = chan .try_recv() .unwrap() @@ -1851,7 +1910,7 @@ impl Account { .expect("Could not send event on main channel"); } } - JobRequest::DeleteMessages(_, _, mut chan) => { + JobRequest::DeleteMessages(_, _, ref mut chan) => { let r = chan.try_recv().unwrap(); if let Some(Err(err)) = r { self.sender @@ -1863,7 +1922,7 @@ impl Account { .expect("Could not send event on main channel"); } } - JobRequest::CreateMailbox(_, mut chan) => { + JobRequest::CreateMailbox(_, ref mut chan) => { let r = chan.try_recv().unwrap(); if let Some(r) = r { self.sender @@ -1882,7 +1941,7 @@ impl Account { .expect("Could not send event on main channel"); } } - JobRequest::DeleteMailbox(_, mut chan) => { + JobRequest::DeleteMailbox(_, ref mut chan) => { let r = chan.try_recv().unwrap(); match r { Some(Err(err)) => { @@ -1914,7 +1973,7 @@ impl Account { ))) .unwrap(); } - JobRequest::SetMailboxPermissions(_, _, mut chan) => { + JobRequest::SetMailboxPermissions(_, _, ref mut chan) => { let r = chan.try_recv().unwrap(); match r { Some(Err(err)) => { @@ -1944,7 +2003,7 @@ impl Account { None => {} } } - JobRequest::SetMailboxSubscription(_, _, mut chan) => { + JobRequest::SetMailboxSubscription(_, _, ref mut chan) => { let r = chan.try_recv().unwrap(); match r { Some(Err(err)) => { @@ -1978,8 +2037,8 @@ impl Account { debug!("JobRequest::Watch finished??? "); } JobRequest::Generic { - name, - mut channel, + ref name, + ref mut channel, handle: _, } => { let r = channel.try_recv().unwrap();