conf/accounts.rs: add 30s job timeout

memfd
Manos Pitsidianakis 2020-07-29 14:27:43 +03:00
parent 5c038887db
commit 44fdc0765e
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
1 changed files with 75 additions and 16 deletions

View File

@ -140,6 +140,7 @@ pub struct Account {
name: String,
hash: AccountHash,
pub is_online: Result<()>,
pub last_online_request: std::time::Instant,
pub(crate) mailbox_entries: HashMap<MailboxHash, MailboxEntry>,
pub(crate) mailboxes_order: Vec<MailboxHash>,
tree: Vec<MailboxNode>,
@ -201,6 +202,38 @@ pub enum JobRequest {
Watch(JoinHandle),
}
impl Drop for JobRequest {
fn drop(&mut self) {
match self {
JobRequest::Generic { handle, .. } => handle.0.cancel(),
JobRequest::Mailboxes(h, _) => h.0.cancel(),
JobRequest::Fetch(_, h, _) => h.0.cancel(),
JobRequest::IsOnline(h, _) => h.0.cancel(),
JobRequest::Refresh(_, h, _) => h.0.cancel(),
JobRequest::SetFlags(_, h, _) => h.0.cancel(),
JobRequest::SaveMessage(_, h, _) => h.0.cancel(),
JobRequest::CopyTo(_, h, _) => h.0.cancel(),
JobRequest::DeleteMessages(_, h, _) => h.0.cancel(),
JobRequest::CreateMailbox(h, _) => h.0.cancel(),
JobRequest::DeleteMailbox(h, _) => h.0.cancel(),
//JobRequest::RenameMailbox,
JobRequest::Search(h) => h.0.cancel(),
JobRequest::AsBytes(h) => h.0.cancel(),
JobRequest::SetMailboxPermissions(_, h, _) => {
h.0.cancel();
}
JobRequest::SetMailboxSubscription(_, h, _) => {
h.0.cancel();
}
JobRequest::Watch(h) => h.0.cancel(),
JobRequest::SendMessage => {}
JobRequest::SendMessageBackground(h, _) => {
h.0.cancel();
}
}
}
}
impl core::fmt::Debug for JobRequest {
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
match self {
@ -380,6 +413,7 @@ impl Account {
} else {
Err(MeliError::new("Attempting connection."))
},
last_online_request: std::time::Instant::now(),
mailbox_entries: Default::default(),
mailboxes_order: Default::default(),
tree: Default::default(),
@ -1570,8 +1604,30 @@ impl Account {
return Ok(());
}
let mut timeout = false;
let mut drain: SmallVec<[std::time::Instant; 16]> = SmallVec::new();
const ONLINE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
for (_instant, j) in self
.active_job_instants
.range(..std::time::Instant::now() - ONLINE_TIMEOUT)
{
if self.active_jobs.contains_key(j) {
debug!("timeout for {} {:?}", j, self.active_jobs[j]);
let req = self.active_jobs.remove(j).unwrap();
self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
StatusEvent::JobCanceled(*j),
)))
.unwrap();
timeout |= !req.is_watch();
}
drain.push(*_instant);
}
for j in drain {
self.active_job_instants.remove(&j);
}
if self.backend_capabilities.is_async {
if self.is_online.is_ok() {
if self.is_online.is_ok() && !timeout {
return Ok(());
}
if !self.active_jobs.values().any(JobRequest::is_online) {
@ -1586,6 +1642,7 @@ impl Account {
let ret = self.backend.read().unwrap().is_online();
if ret.is_ok() != self.is_online.is_ok() {
if ret.is_ok() {
self.last_online_request = std::time::Instant::now();
self.init(None)?;
}
self.sender
@ -1648,7 +1705,7 @@ impl Account {
pub fn process_event(&mut self, job_id: &JobId) -> bool {
if self.active_jobs.contains_key(job_id) {
match self.active_jobs.remove(job_id).unwrap() {
JobRequest::Mailboxes(_, mut chan) => {
JobRequest::Mailboxes(_, ref mut chan) => {
if let Some(mailboxes) = chan.try_recv().unwrap() {
if mailboxes.is_err() || self.init(Some(mailboxes.unwrap())).is_err() {
if let Ok(mailboxes_job) =
@ -1664,7 +1721,7 @@ impl Account {
}
}
}
JobRequest::Fetch(mailbox_hash, _, mut chan) => {
JobRequest::Fetch(mailbox_hash, _, ref mut chan) => {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
StatusEvent::JobFinished(*job_id),
@ -1740,7 +1797,7 @@ impl Account {
))))
.unwrap();
}
JobRequest::IsOnline(_, mut chan) => {
JobRequest::IsOnline(_, ref mut chan) => {
let is_online = chan.try_recv().unwrap();
if let Some(is_online) = is_online {
self.sender
@ -1752,6 +1809,7 @@ impl Account {
if self.is_online.is_err() {
self.watch();
}
self.last_online_request = std::time::Instant::now();
self.is_online = Ok(());
return true;
}
@ -1766,7 +1824,7 @@ impl Account {
.insert(std::time::Instant::now(), job_id);
}
}
JobRequest::Refresh(_mailbox_hash, _, mut chan) => {
JobRequest::Refresh(_mailbox_hash, _, ref mut chan) => {
let r = chan.try_recv().unwrap();
if let Some(r) = r {
if r.is_ok() {
@ -1774,6 +1832,7 @@ impl Account {
self.watch();
}
}
self.last_online_request = std::time::Instant::now();
self.is_online = Ok(());
self.sender
.send(ThreadEvent::UIEvent(UIEvent::AccountStatusChange(
@ -1787,7 +1846,7 @@ impl Account {
)))
.unwrap();
}
JobRequest::SetFlags(_, _, mut chan) => {
JobRequest::SetFlags(_, _, ref mut chan) => {
let r = chan.try_recv().unwrap();
if let Some(Err(err)) = r {
self.sender
@ -1799,7 +1858,7 @@ impl Account {
.expect("Could not send event on main channel");
}
}
JobRequest::SaveMessage(_, _, mut chan) => {
JobRequest::SaveMessage(_, _, ref mut chan) => {
let r = chan.try_recv().unwrap();
if let Some(Err(err)) = r {
self.sender
@ -1818,7 +1877,7 @@ impl Account {
)))
.unwrap();
}
JobRequest::SendMessageBackground(_, mut chan) => {
JobRequest::SendMessageBackground(_, ref mut chan) => {
let r = chan.try_recv().unwrap();
if let Some(Err(err)) = r {
self.sender
@ -1835,7 +1894,7 @@ impl Account {
)))
.unwrap();
}
JobRequest::CopyTo(mailbox_hash, _, mut chan) => {
JobRequest::CopyTo(mailbox_hash, _, ref mut chan) => {
if let Err(err) = chan
.try_recv()
.unwrap()
@ -1851,7 +1910,7 @@ impl Account {
.expect("Could not send event on main channel");
}
}
JobRequest::DeleteMessages(_, _, mut chan) => {
JobRequest::DeleteMessages(_, _, ref mut chan) => {
let r = chan.try_recv().unwrap();
if let Some(Err(err)) = r {
self.sender
@ -1863,7 +1922,7 @@ impl Account {
.expect("Could not send event on main channel");
}
}
JobRequest::CreateMailbox(_, mut chan) => {
JobRequest::CreateMailbox(_, ref mut chan) => {
let r = chan.try_recv().unwrap();
if let Some(r) = r {
self.sender
@ -1882,7 +1941,7 @@ impl Account {
.expect("Could not send event on main channel");
}
}
JobRequest::DeleteMailbox(_, mut chan) => {
JobRequest::DeleteMailbox(_, ref mut chan) => {
let r = chan.try_recv().unwrap();
match r {
Some(Err(err)) => {
@ -1914,7 +1973,7 @@ impl Account {
)))
.unwrap();
}
JobRequest::SetMailboxPermissions(_, _, mut chan) => {
JobRequest::SetMailboxPermissions(_, _, ref mut chan) => {
let r = chan.try_recv().unwrap();
match r {
Some(Err(err)) => {
@ -1944,7 +2003,7 @@ impl Account {
None => {}
}
}
JobRequest::SetMailboxSubscription(_, _, mut chan) => {
JobRequest::SetMailboxSubscription(_, _, ref mut chan) => {
let r = chan.try_recv().unwrap();
match r {
Some(Err(err)) => {
@ -1978,8 +2037,8 @@ impl Account {
debug!("JobRequest::Watch finished??? ");
}
JobRequest::Generic {
name,
mut channel,
ref name,
ref mut channel,
handle: _,
} => {
let r = channel.try_recv().unwrap();