Don't unwrap try_recv() on async jobs channels

Job might have been canceled.
jmap-eventsource
Manos Pitsidianakis 2020-12-24 21:56:24 +02:00
parent b2e853dd7b
commit ed826357a3
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
7 changed files with 213 additions and 174 deletions

View File

@ -1115,19 +1115,26 @@ impl Component for Composer {
ViewMode::WaitingForSendResult(_, ref mut handle),
UIEvent::StatusEvent(StatusEvent::JobFinished(ref job_id)),
) if handle.job_id == *job_id => {
let result = handle.chan.try_recv().unwrap();
if let Some(Err(err)) = result {
self.mode = ViewMode::Edit;
context.replies.push_back(UIEvent::Notification(
None,
err.to_string(),
Some(NotificationType::Error(err.kind)),
));
self.set_dirty(true);
} else {
context
.replies
.push_back(UIEvent::Action(Tab(Kill(self.id))));
match handle
.chan
.try_recv()
.map_err(|_: futures::channel::oneshot::Canceled| {
MeliError::new("Job was canceled")
}) {
Err(err) | Ok(Some(Err(err))) => {
self.mode = ViewMode::Edit;
context.replies.push_back(UIEvent::Notification(
None,
err.to_string(),
Some(NotificationType::Error(err.kind)),
));
self.set_dirty(true);
}
Ok(None) | Ok(Some(Ok(()))) => {
context
.replies
.push_back(UIEvent::Action(Tab(Kill(self.id))));
}
}
return false;
}

View File

@ -119,8 +119,10 @@ impl Component for KeySelection {
..
} => match event {
UIEvent::StatusEvent(StatusEvent::JobFinished(ref id)) if *id == handle.job_id => {
match handle.chan.try_recv().unwrap().unwrap() {
Ok(keys) => {
match handle.chan.try_recv() {
Err(_) => { /* Job was canceled */ }
Ok(None) => { /* something happened, perhaps a worker thread panicked */ }
Ok(Some(Ok(keys))) => {
if keys.is_empty() {
let id = progress_spinner.id();
if allow_remote_lookup.is_true() {
@ -192,7 +194,7 @@ impl Component for KeySelection {
widget.set_dirty(true);
*self = KeySelection::Loaded { widget, keys };
}
Err(err) => {
Ok(Some(Err(err))) => {
*self = KeySelection::Error {
err,
id: ComponentId::new_v4(),

View File

@ -1870,8 +1870,11 @@ impl Component for CompactListing {
.unwrap_or(false) =>
{
let (filter_term, mut handle) = self.search_job.take().unwrap();
let results = handle.chan.try_recv().unwrap().unwrap();
self.filter(filter_term, results, context);
match handle.chan.try_recv() {
Err(_) => { /* search was canceled */ }
Ok(None) => { /* something happened, perhaps a worker thread panicked */ }
Ok(Some(results)) => self.filter(filter_term, results, context),
}
self.set_dirty(true);
}
UIEvent::StatusEvent(StatusEvent::JobFinished(ref job_id))
@ -1882,8 +1885,11 @@ impl Component for CompactListing {
.unwrap_or(false) =>
{
let (search_term, mut handle) = self.select_job.take().unwrap();
let results = handle.chan.try_recv().unwrap().unwrap();
self.select(&search_term, results, context);
match handle.chan.try_recv() {
Err(_) => { /* search was canceled */ }
Ok(None) => { /* something happened, perhaps a worker thread panicked */ }
Ok(Some(results)) => self.select(&search_term, results, context),
}
self.set_dirty(true);
}
_ => {}

View File

@ -1715,8 +1715,11 @@ impl Component for ConversationsListing {
.unwrap_or(false) =>
{
let (filter_term, mut handle) = self.search_job.take().unwrap();
let results = handle.chan.try_recv().unwrap().unwrap();
self.filter(filter_term, results, context);
match handle.chan.try_recv() {
Err(_) => { /* search was canceled */ }
Ok(None) => { /* something happened, perhaps a worker thread panicked */ }
Ok(Some(results)) => self.filter(filter_term, results, context),
}
self.set_dirty(true);
}
_ => {}

View File

@ -1362,8 +1362,11 @@ impl Component for PlainListing {
.unwrap_or(false) =>
{
let (filter_term, mut handle) = self.search_job.take().unwrap();
let results = handle.chan.try_recv().unwrap().unwrap();
self.filter(filter_term, results, context);
match handle.chan.try_recv() {
Err(_) => { /* search was canceled */ }
Ok(None) => { /* something happened, perhaps a worker thread panicked */ }
Ok(Some(results)) => self.filter(filter_term, results, context),
}
self.set_dirty(true);
}
_ => {}

View File

@ -1607,9 +1607,11 @@ impl Component for MailView {
ref mut handle,
pending_action: _,
} if handle.job_id == *job_id => {
let bytes_result = handle.chan.try_recv().unwrap().unwrap();
match bytes_result {
Ok(bytes) => {
match handle.chan.try_recv() {
Err(_) => { /* Job was canceled */ }
Ok(None) => { /* something happened, perhaps a worker thread panicked */
}
Ok(Some(Ok(bytes))) => {
if context.accounts[&self.coordinates.0]
.collection
.get_env(self.coordinates.2)
@ -1642,7 +1644,7 @@ impl Component for MailView {
body_text,
};
}
Err(err) => {
Ok(Some(Err(err))) => {
self.state = MailViewState::Error { err };
}
}
@ -1664,8 +1666,11 @@ impl Component for MailView {
} if *our_job_id == *job_id => {
caught = true;
self.initialised = false;
match handle.chan.try_recv().unwrap().unwrap() {
Ok(()) => {
match handle.chan.try_recv() {
Err(_) => { /* Job was canceled */ }
Ok(None) => { /* something happened, perhaps a worker thread panicked */
}
Ok(Some(Ok(()))) => {
*d = AttachmentDisplay::SignedVerified {
inner: std::mem::replace(
inner,
@ -1675,7 +1680,7 @@ impl Component for MailView {
description: String::new(),
};
}
Err(error) => {
Ok(Some(Err(error))) => {
*d = AttachmentDisplay::SignedFailed {
inner: std::mem::replace(
inner,
@ -1692,8 +1697,11 @@ impl Component for MailView {
{
caught = true;
self.initialised = false;
match handle.chan.try_recv().unwrap().unwrap() {
Ok((metadata, decrypted_bytes)) => {
match handle.chan.try_recv() {
Err(_) => { /* Job was canceled */ }
Ok(None) => { /* something happened, perhaps a worker thread panicked */
}
Ok(Some(Ok((metadata, decrypted_bytes)))) => {
let plaintext =
AttachmentBuilder::new(&decrypted_bytes)
.build();
@ -1713,7 +1721,7 @@ impl Component for MailView {
description: format!("{:?}", metadata),
};
}
Err(error) => {
Ok(Some(Err(error))) => {
*d = AttachmentDisplay::EncryptedFailed {
inner: std::mem::replace(
inner,
@ -1814,34 +1822,40 @@ impl Component for MailView {
name: "fetch envelope".into(),
handle,
on_finish: Some(CallbackFn(Box::new(move |context: &mut Context| {
let result = receiver.try_recv().unwrap().unwrap();
match result.and_then(|bytes| {
Composer::edit(account_hash, env_hash, &bytes, context)
}) {
Ok(composer) => {
context.replies.push_back(UIEvent::Action(Tab(New(Some(
Box::new(composer),
)))));
match receiver.try_recv() {
Err(_) => { /* Job was canceled */ }
Ok(None) => { /* something happened, perhaps a worker thread panicked */
}
Err(err) => {
let err_string = format!(
"Failed to open envelope {}: {}",
context.accounts[&account_hash]
.collection
.envelopes
.read()
.unwrap()
.get(&env_hash)
.map(|env| env.message_id_display())
.unwrap_or_else(|| "Not found".into()),
err.to_string()
);
log(&err_string, ERROR);
context.replies.push_back(UIEvent::Notification(
Some("Failed to open e-mail".to_string()),
err_string,
Some(NotificationType::Error(err.kind)),
));
Ok(Some(result)) => {
match result.and_then(|bytes| {
Composer::edit(account_hash, env_hash, &bytes, context)
}) {
Ok(composer) => {
context.replies.push_back(UIEvent::Action(Tab(New(Some(
Box::new(composer),
)))));
}
Err(err) => {
let err_string = format!(
"Failed to open envelope {}: {}",
context.accounts[&account_hash]
.collection
.envelopes
.read()
.unwrap()
.get(&env_hash)
.map(|env| env.message_id_display())
.unwrap_or_else(|| "Not found".into()),
err.to_string()
);
log(&err_string, ERROR);
context.replies.push_back(UIEvent::Notification(
Some("Failed to open e-mail".to_string()),
err_string,
Some(NotificationType::Error(err.kind)),
));
}
}
}
}
}))),

View File

@ -1585,7 +1585,7 @@ impl Account {
if self.active_jobs.contains_key(job_id) {
match self.active_jobs.remove(job_id).unwrap() {
JobRequest::Mailboxes { ref mut handle } => {
if let Some(mailboxes) = handle.chan.try_recv().unwrap() {
if let Ok(Some(mailboxes)) = handle.chan.try_recv() {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::AccountStatusChange(
self.hash,
@ -1620,83 +1620,91 @@ impl Account {
ref mut handle,
..
} => {
let (payload, rest): (Option<Result<Vec<Envelope>>>, _) =
handle.chan.try_recv().unwrap().unwrap();
debug!("got payload in status for {}", mailbox_hash);
if payload.is_none() {
debug!("finished in status for {}", mailbox_hash);
self.mailbox_entries
.entry(mailbox_hash)
.and_modify(|entry| {
entry.status = MailboxStatus::Available;
});
self.sender
.send(ThreadEvent::UIEvent(UIEvent::MailboxUpdate((
self.hash,
mailbox_hash,
))))
.unwrap();
return true;
}
let handle = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(rest.into_future())
} else {
self.job_executor.spawn_blocking(rest.into_future())
};
self.insert_job(
handle.job_id,
JobRequest::Fetch {
mailbox_hash,
handle,
},
);
let payload = payload.unwrap();
if let Err(err) = payload {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!("{}: could not fetch mailbox", &self.name)),
err.to_string(),
Some(crate::types::NotificationType::Error(err.kind)),
)))
.expect("Could not send event on main channel");
self.mailbox_entries
.entry(mailbox_hash)
.and_modify(|entry| {
entry.status = MailboxStatus::Failed(err);
});
self.sender
.send(ThreadEvent::UIEvent(UIEvent::MailboxUpdate((
self.hash,
mailbox_hash,
))))
.unwrap();
return true;
}
let envelopes = payload
.unwrap()
.into_iter()
.map(|e| (e.hash(), e))
.collect::<HashMap<EnvelopeHash, Envelope>>();
if let Some(updated_mailboxes) =
self.collection
.merge(envelopes, mailbox_hash, self.sent_mailbox)
{
for f in updated_mailboxes {
match handle.chan.try_recv() {
Err(_) => {
/* canceled */
return true;
}
Ok(None) => {
return true;
}
Ok(Some((None, _))) => {
debug!("finished in status for {}", mailbox_hash);
self.mailbox_entries
.entry(mailbox_hash)
.and_modify(|entry| {
entry.status = MailboxStatus::Available;
});
self.sender
.send(ThreadEvent::UIEvent(UIEvent::MailboxUpdate((self.hash, f))))
.send(ThreadEvent::UIEvent(UIEvent::MailboxUpdate((
self.hash,
mailbox_hash,
))))
.unwrap();
return true;
}
Ok(Some((Some(Err(err)), _))) => {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!("{}: could not fetch mailbox", &self.name)),
err.to_string(),
Some(crate::types::NotificationType::Error(err.kind)),
)))
.expect("Could not send event on main channel");
self.mailbox_entries
.entry(mailbox_hash)
.and_modify(|entry| {
entry.status = MailboxStatus::Failed(err);
});
self.sender
.send(ThreadEvent::UIEvent(UIEvent::MailboxUpdate((
self.hash,
mailbox_hash,
))))
.unwrap();
return true;
}
Ok(Some((Some(Ok(payload)), rest))) => {
let handle = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(rest.into_future())
} else {
self.job_executor.spawn_blocking(rest.into_future())
};
self.insert_job(
handle.job_id,
JobRequest::Fetch {
mailbox_hash,
handle,
},
);
let envelopes = payload
.into_iter()
.map(|e| (e.hash(), e))
.collect::<HashMap<EnvelopeHash, Envelope>>();
if let Some(updated_mailboxes) =
self.collection
.merge(envelopes, mailbox_hash, self.sent_mailbox)
{
for f in updated_mailboxes {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::MailboxUpdate((
self.hash, f,
))))
.unwrap();
}
}
self.sender
.send(ThreadEvent::UIEvent(UIEvent::MailboxUpdate((
self.hash,
mailbox_hash,
))))
.unwrap();
}
}
self.sender
.send(ThreadEvent::UIEvent(UIEvent::MailboxUpdate((
self.hash,
mailbox_hash,
))))
.unwrap();
}
JobRequest::IsOnline { ref mut handle, .. } => {
let is_online = handle.chan.try_recv().unwrap();
if let Some(is_online) = is_online {
if let Ok(Some(is_online)) = handle.chan.try_recv() {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::AccountStatusChange(
self.hash,
@ -1729,9 +1737,10 @@ impl Account {
};
}
JobRequest::Refresh { ref mut handle, .. } => {
let r = handle.chan.try_recv().unwrap();
match r {
Some(Ok(())) => {
match handle.chan.try_recv() {
Err(_) => { /* canceled */ }
Ok(None) => {}
Ok(Some(Ok(()))) => {
if self.is_online.is_err()
&& !self
.is_online
@ -1758,7 +1767,7 @@ impl Account {
.unwrap();
}
}
Some(Err(err)) => {
Ok(Some(Err(err))) => {
if !err.kind.is_authentication() {
let online_job = self.backend.read().unwrap().is_online();
if let Ok(online_job) = online_job {
@ -1777,12 +1786,10 @@ impl Account {
)))
.unwrap();
}
None => {}
}
}
JobRequest::SetFlags { ref mut handle, .. } => {
let r = handle.chan.try_recv().unwrap();
if let Some(Err(err)) = r {
if let Ok(Some(Err(err))) = handle.chan.try_recv() {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!("{}: could not set flag", &self.name)),
@ -1797,8 +1804,7 @@ impl Account {
ref bytes,
..
} => {
let r = handle.chan.try_recv().unwrap();
if let Some(Err(err)) = r {
if let Ok(Some(Err(err))) = handle.chan.try_recv() {
melib::log(format!("Could not save message: {}", err), melib::ERROR);
let file = crate::types::create_temp_file(bytes, None, None, false);
debug!("message saved in {}", file.path.display());
@ -1823,8 +1829,7 @@ impl Account {
}
JobRequest::SendMessage => {}
JobRequest::SendMessageBackground { ref mut handle, .. } => {
let r = handle.chan.try_recv().unwrap();
if let Some(Err(err)) = r {
if let Ok(Some(Err(err))) = handle.chan.try_recv() {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some("Could not send message".to_string()),
@ -1839,12 +1844,15 @@ impl Account {
ref mut handle,
..
} => {
if let Err(err) = handle
if let Ok(Some(Err(err))) = handle
.chan
.try_recv()
.unwrap()
.unwrap()
.and_then(|bytes| self.save(&bytes, mailbox_hash, None))
.map_err(|_: futures::channel::oneshot::Canceled| {
MeliError::new("Job was canceled")
})
.map(|r| {
r.map(|r| r.and_then(|bytes| self.save(&bytes, mailbox_hash, None)))
})
{
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
@ -1856,8 +1864,7 @@ impl Account {
}
}
JobRequest::DeleteMessages { ref mut handle, .. } => {
let r = handle.chan.try_recv().unwrap();
if let Some(Err(err)) = r {
if let Ok(Some(Err(err))) = handle.chan.try_recv() {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!("{}: could not delete message", &self.name)),
@ -1872,8 +1879,7 @@ impl Account {
ref mut handle,
..
} => {
let r = handle.chan.try_recv().unwrap();
if let Some(r) = r {
if let Ok(Some(r)) = handle.chan.try_recv() {
match r {
Err(err) => {
self.sender
@ -1956,9 +1962,10 @@ impl Account {
ref mut handle,
..
} => {
let r = handle.chan.try_recv().unwrap();
match r {
Some(Err(err)) => {
match handle.chan.try_recv() {
Err(_) => { /* canceled */ }
Ok(None) => {}
Ok(Some(Err(err))) => {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!("{}: could not delete mailbox", &self.name)),
@ -1967,7 +1974,7 @@ impl Account {
)))
.expect("Could not send event on main channel");
}
Some(Ok(mut mailboxes)) => {
Ok(Some(Ok(mut mailboxes))) => {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::MailboxDelete((
self.hash,
@ -2022,15 +2029,15 @@ impl Account {
)))
.expect("Could not send event on main channel");
}
None => {}
}
}
//JobRequest::RenameMailbox,
JobRequest::Search { .. } | JobRequest::AsBytes { .. } => {}
JobRequest::SetMailboxPermissions { ref mut handle, .. } => {
let r = handle.chan.try_recv().unwrap();
match r {
Some(Err(err)) => {
match handle.chan.try_recv() {
Err(_) => { /* canceled */ }
Ok(None) => {}
Ok(Some(Err(err))) => {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!(
@ -2042,7 +2049,7 @@ impl Account {
)))
.expect("Could not send event on main channel");
}
Some(Ok(_)) => {
Ok(Some(Ok(_))) => {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!(
@ -2054,13 +2061,13 @@ impl Account {
)))
.expect("Could not send event on main channel");
}
None => {}
}
}
JobRequest::SetMailboxSubscription { ref mut handle, .. } => {
let r = handle.chan.try_recv().unwrap();
match r {
Some(Err(err)) => {
match handle.chan.try_recv() {
Err(_) => { /* canceled */ }
Ok(None) => {}
Ok(Some(Err(err))) => {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!(
@ -2072,7 +2079,7 @@ impl Account {
)))
.expect("Could not send event on main channel");
}
Some(Ok(_)) => {
Ok(Some(Ok(_))) => {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!(
@ -2084,14 +2091,11 @@ impl Account {
)))
.expect("Could not send event on main channel");
}
None => {}
}
}
JobRequest::Watch { ref mut handle } => {
debug!("JobRequest::Watch finished??? ");
let r = handle.chan.try_recv().unwrap();
debug!("JobRequest::Watch {:?}", r);
if let Some(Err(err)) = r {
if let Ok(Some(Err(err))) = handle.chan.try_recv() {
if err.kind.is_timeout() {
self.watch();
} else {
@ -2112,9 +2116,8 @@ impl Account {
ref mut on_finish,
logging_level,
} => {
let r = handle.chan.try_recv().unwrap();
match r {
Some(Err(err)) => {
match handle.chan.try_recv() {
Ok(Some(Err(err))) => {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!("{}: {} failed", &self.name, name,)),
@ -2123,7 +2126,7 @@ impl Account {
)))
.expect("Could not send event on main channel");
}
Some(Ok(())) if on_finish.is_none() => {
Ok(Some(Ok(()))) if on_finish.is_none() => {
if logging_level <= melib::LoggingLevel::INFO {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
@ -2134,7 +2137,8 @@ impl Account {
.expect("Could not send event on main channel");
}
}
Some(Ok(())) | None => {}
Err(_) => { /* canceled */ }
Ok(Some(Ok(()))) | Ok(None) => {}
}
if on_finish.is_some() {
self.sender