Refactor job structs into JoinHandle

Put oneshot::channel<R> into JoinHandle<R>
jmap-eventsource
Manos Pitsidianakis 2020-10-09 19:34:55 +03:00
parent 4dd8474c30
commit a4b78532b7
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
10 changed files with 381 additions and 388 deletions

View File

@ -25,7 +25,7 @@ use melib::list_management;
use melib::Draft;
use crate::conf::accounts::JobRequest;
use crate::jobs::{JobChannel, JobId, JoinHandle};
use crate::jobs::JoinHandle;
use crate::terminal::embed::EmbedGrid;
use indexmap::IndexSet;
use nix::sys::wait::WaitStatus;
@ -125,7 +125,7 @@ enum ViewMode {
Embed,
SelectRecipients(UIDialog<Address>),
Send(UIConfirmationDialog),
WaitingForSendResult(UIDialog<char>, JoinHandle, JobId, JobChannel<()>),
WaitingForSendResult(UIDialog<char>, JoinHandle<Result<()>>),
}
impl ViewMode {
@ -579,11 +579,6 @@ impl Component for Composer {
context[self.account_hash].pgp.auto_sign
));
}
if self.encrypt_mail.is_unset() {
self.encrypt_mail = ToggleFlag::InternalVal(*account_settings!(
context[self.account_hash].pgp.auto_encrypt
));
}
if !self.draft.headers().contains_key("From") || self.draft.headers()["From"].is_empty()
{
self.draft.set_header(
@ -763,7 +758,7 @@ impl Component for Composer {
/* Let user choose whether to quit with/without saving or cancel */
s.draw(grid, center_area(area, s.content.size()), context);
}
ViewMode::WaitingForSendResult(ref mut s, _, _, _) => {
ViewMode::WaitingForSendResult(ref mut s, _) => {
/* Let user choose whether to wait for success or cancel */
s.draw(grid, center_area(area, s.content.size()), context);
}
@ -796,7 +791,7 @@ impl Component for Composer {
Flag::SEEN,
) {
Ok(job) => {
let (chan, handle, job_id) = context.job_executor.spawn_blocking(job);
let handle = context.job_executor.spawn_blocking(job);
self.mode = ViewMode::WaitingForSendResult(
UIDialog::new(
"Waiting for confirmation.. The tab will close automatically on successful submission.",
@ -812,7 +807,7 @@ impl Component for Composer {
))
})),
context,
), handle, job_id, chan);
), handle);
}
Err(err) => {
context.replies.push_back(UIEvent::Notification(
@ -906,7 +901,7 @@ impl Component for Composer {
}
}
(
ViewMode::WaitingForSendResult(ref selector, _, _, _),
ViewMode::WaitingForSendResult(ref selector, _),
UIEvent::FinishedUIDialog(id, result),
) if selector.id() == *id => {
if let Some(key) = result.downcast_mut::<char>() {
@ -919,12 +914,12 @@ impl Component for Composer {
}
'n' => {
self.set_dirty(true);
if let ViewMode::WaitingForSendResult(_, handle, job_id, chan) =
if let ViewMode::WaitingForSendResult(_, handle) =
std::mem::replace(&mut self.mode, ViewMode::Edit)
{
context.accounts[&self.account_hash].active_jobs.insert(
job_id,
JobRequest::SendMessageBackground(handle, chan),
handle.job_id,
JobRequest::SendMessageBackground { handle },
);
}
}
@ -934,10 +929,10 @@ impl Component for Composer {
return true;
}
(
ViewMode::WaitingForSendResult(_, _, ref our_job_id, ref mut chan),
ViewMode::WaitingForSendResult(_, ref mut handle),
UIEvent::StatusEvent(StatusEvent::JobFinished(ref job_id)),
) if *our_job_id == *job_id => {
let result = chan.try_recv().unwrap();
) if handle.job_id == *job_id => {
let result = handle.chan.try_recv().unwrap();
if let Some(Err(err)) = result {
self.mode = ViewMode::Edit;
context.replies.push_back(UIEvent::Notification(
@ -953,7 +948,7 @@ impl Component for Composer {
}
return true;
}
(ViewMode::WaitingForSendResult(ref mut selector, _, _, _), _) => {
(ViewMode::WaitingForSendResult(ref mut selector, _), _) => {
if selector.process_event(event, context) {
return true;
}
@ -1548,7 +1543,7 @@ pub fn send_draft(
mailbox_type: SpecialUsageMailbox,
flags: Flag,
complete_in_background: bool,
) -> Result<Option<(JobId, JoinHandle, JobChannel<()>)>> {
) -> Result<Option<JoinHandle<Result<()>>>> {
let format_flowed = *account_settings!(context[account_hash].composing.format_flowed);
if sign_mail.is_true() {
let mut content_type = ContentType::default();

View File

@ -177,9 +177,9 @@ pub trait MailListingTrait: ListingTrait {
));
}
Ok(fut) => {
let (channel, handle, job_id) = account.job_executor.spawn_specialized(fut);
let handle = account.job_executor.spawn_specialized(fut);
account
.insert_job(job_id, JobRequest::SetFlags(env_hashes, handle, channel));
.insert_job(handle.job_id, JobRequest::SetFlags { env_hashes, handle });
}
}
}
@ -196,9 +196,9 @@ pub trait MailListingTrait: ListingTrait {
));
}
Ok(fut) => {
let (channel, handle, job_id) = account.job_executor.spawn_specialized(fut);
let handle = account.job_executor.spawn_specialized(fut);
account
.insert_job(job_id, JobRequest::SetFlags(env_hashes, handle, channel));
.insert_job(handle.job_id, JobRequest::SetFlags { env_hashes, handle });
}
}
}
@ -215,9 +215,9 @@ pub trait MailListingTrait: ListingTrait {
));
}
Ok(fut) => {
let (channel, handle, job_id) = account.job_executor.spawn_specialized(fut);
let handle = account.job_executor.spawn_specialized(fut);
account
.insert_job(job_id, JobRequest::SetFlags(env_hashes, handle, channel));
.insert_job(handle.job_id, JobRequest::SetFlags { env_hashes, handle });
}
}
}
@ -234,9 +234,9 @@ pub trait MailListingTrait: ListingTrait {
));
}
Ok(fut) => {
let (channel, handle, job_id) = account.job_executor.spawn_specialized(fut);
let handle = account.job_executor.spawn_specialized(fut);
account
.insert_job(job_id, JobRequest::SetFlags(env_hashes, handle, channel));
.insert_job(handle.job_id, JobRequest::SetFlags { env_hashes, handle });
}
}
}
@ -253,10 +253,10 @@ pub trait MailListingTrait: ListingTrait {
));
}
Ok(fut) => {
let (channel, handle, job_id) = account.job_executor.spawn_specialized(fut);
let handle = account.job_executor.spawn_specialized(fut);
account.insert_job(
job_id,
JobRequest::DeleteMessages(env_hashes, handle, channel),
handle.job_id,
JobRequest::DeleteMessages { env_hashes, handle },
);
}
}
@ -278,13 +278,12 @@ pub trait MailListingTrait: ListingTrait {
));
}
Ok(fut) => {
let (channel, handle, job_id) = account.job_executor.spawn_specialized(fut);
let handle = account.job_executor.spawn_specialized(fut);
account.insert_job(
job_id,
handle.job_id,
JobRequest::Generic {
name: "message copying".into(),
handle,
channel,
on_finish: None,
logging_level: melib::LoggingLevel::INFO,
},
@ -316,13 +315,12 @@ pub trait MailListingTrait: ListingTrait {
));
}
Ok(fut) => {
let (channel, handle, job_id) = account.job_executor.spawn_specialized(fut);
let handle = account.job_executor.spawn_specialized(fut);
account.insert_job(
job_id,
handle.job_id,
JobRequest::Generic {
name: "message moving".into(),
handle,
channel,
on_finish: None,
logging_level: melib::LoggingLevel::INFO,
},

View File

@ -22,7 +22,7 @@
use super::EntryStrings;
use super::*;
use crate::components::utilities::PageMovement;
use crate::jobs::{oneshot, JobId};
use crate::jobs::JoinHandle;
use std::cmp;
use std::convert::TryInto;
use std::iter::FromIterator;
@ -136,16 +136,8 @@ pub struct CompactListing {
rows_drawn: SegmentTree,
rows: Vec<((usize, (ThreadHash, EnvelopeHash)), EntryStrings)>,
search_job: Option<(
String,
oneshot::Receiver<Result<SmallVec<[EnvelopeHash; 512]>>>,
JobId,
)>,
select_job: Option<(
String,
oneshot::Receiver<Result<SmallVec<[EnvelopeHash; 512]>>>,
JobId,
)>,
search_job: Option<(String, JoinHandle<Result<SmallVec<[EnvelopeHash; 512]>>>)>,
select_job: Option<(String, JoinHandle<Result<SmallVec<[EnvelopeHash; 512]>>>)>,
filter_term: String,
filtered_selection: Vec<ThreadHash>,
filtered_order: HashMap<ThreadHash, usize>,
@ -1698,13 +1690,10 @@ impl Component for CompactListing {
self.cursor_pos.1,
) {
Ok(job) => {
let (chan, handle, job_id) = context.accounts[&self.cursor_pos.0]
let handle = context.accounts[&self.cursor_pos.0]
.job_executor
.spawn_specialized(job);
context.accounts[&self.cursor_pos.0]
.active_jobs
.insert(job_id, crate::conf::accounts::JobRequest::Search(handle));
self.search_job = Some((filter_term.to_string(), chan, job_id));
self.search_job = Some((filter_term.to_string(), handle));
}
Err(err) => {
context.replies.push_back(UIEvent::Notification(
@ -1723,16 +1712,13 @@ impl Component for CompactListing {
self.cursor_pos.1,
) {
Ok(job) => {
let (mut chan, handle, job_id) = context.accounts[&self.cursor_pos.0]
let mut handle = context.accounts[&self.cursor_pos.0]
.job_executor
.spawn_specialized(job);
if let Ok(Some(search_result)) = try_recv_timeout!(&mut chan) {
if let Ok(Some(search_result)) = try_recv_timeout!(&mut handle.chan) {
self.select(search_term, search_result, context);
} else {
context.accounts[&self.cursor_pos.0]
.active_jobs
.insert(job_id, crate::conf::accounts::JobRequest::Search(handle));
self.select_job = Some((search_term.to_string(), chan, job_id));
self.select_job = Some((search_term.to_string(), handle));
}
}
Err(err) => {
@ -1749,11 +1735,11 @@ impl Component for CompactListing {
if self
.search_job
.as_ref()
.map(|(_, _, j)| j == job_id)
.map(|(_, j)| j == job_id)
.unwrap_or(false) =>
{
let (filter_term, mut rcvr, _job_id) = self.search_job.take().unwrap();
let results = rcvr.try_recv().unwrap().unwrap();
let (filter_term, mut handle) = self.search_job.take().unwrap();
let results = handle.chan.try_recv().unwrap().unwrap();
self.filter(filter_term, results, context);
self.set_dirty(true);
}
@ -1761,11 +1747,11 @@ impl Component for CompactListing {
if self
.select_job
.as_ref()
.map(|(_, _, j)| j == job_id)
.map(|(_, j)| j == job_id)
.unwrap_or(false) =>
{
let (search_term, mut rcvr, _job_id) = self.select_job.take().unwrap();
let results = rcvr.try_recv().unwrap().unwrap();
let (search_term, mut handle) = self.select_job.take().unwrap();
let results = handle.chan.try_recv().unwrap().unwrap();
self.select(&search_term, results, context);
self.set_dirty(true);
}

View File

@ -21,7 +21,7 @@
use super::*;
use crate::components::utilities::PageMovement;
use crate::jobs::{oneshot, JobId};
use crate::jobs::JoinHandle;
use std::iter::FromIterator;
macro_rules! row_attr {
@ -104,11 +104,7 @@ pub struct ConversationsListing {
/// Cache current view.
content: CellBuffer,
search_job: Option<(
String,
oneshot::Receiver<Result<SmallVec<[EnvelopeHash; 512]>>>,
JobId,
)>,
search_job: Option<(String, JoinHandle<Result<SmallVec<[EnvelopeHash; 512]>>>)>,
filter_term: String,
filtered_selection: Vec<ThreadHash>,
filtered_order: HashMap<ThreadHash, usize>,
@ -1555,13 +1551,10 @@ impl Component for ConversationsListing {
self.cursor_pos.1,
) {
Ok(job) => {
let (chan, handle, job_id) = context.accounts[&self.cursor_pos.0]
let handle = context.accounts[&self.cursor_pos.0]
.job_executor
.spawn_specialized(job);
context.accounts[&self.cursor_pos.0]
.active_jobs
.insert(job_id, crate::conf::accounts::JobRequest::Search(handle));
self.search_job = Some((filter_term.to_string(), chan, job_id));
self.search_job = Some((filter_term.to_string(), handle));
}
Err(err) => {
context.replies.push_back(UIEvent::Notification(
@ -1601,11 +1594,11 @@ impl Component for ConversationsListing {
if self
.search_job
.as_ref()
.map(|(_, _, j)| j == job_id)
.map(|(_, j)| j == job_id)
.unwrap_or(false) =>
{
let (filter_term, mut rcvr, _job_id) = self.search_job.take().unwrap();
let results = rcvr.try_recv().unwrap().unwrap();
let (filter_term, mut handle) = self.search_job.take().unwrap();
let results = handle.chan.try_recv().unwrap().unwrap();
self.filter(filter_term, results, context);
self.set_dirty(true);
}

View File

@ -22,7 +22,7 @@
use super::EntryStrings;
use super::*;
use crate::components::utilities::PageMovement;
use crate::jobs::{oneshot, JobId, JoinHandle};
use crate::jobs::{JobId, JoinHandle};
use std::cmp;
use std::iter::FromIterator;
@ -133,11 +133,7 @@ pub struct PlainListing {
/// Cache current view.
data_columns: DataColumns,
search_job: Option<(
String,
oneshot::Receiver<Result<SmallVec<[EnvelopeHash; 512]>>>,
JobId,
)>,
search_job: Option<(String, JoinHandle<Result<SmallVec<[EnvelopeHash; 512]>>>)>,
filter_term: String,
filtered_selection: Vec<EnvelopeHash>,
filtered_order: HashMap<EnvelopeHash, usize>,
@ -155,7 +151,7 @@ pub struct PlainListing {
_row_updates: SmallVec<[ThreadHash; 8]>,
color_cache: ColorCache,
active_jobs: HashMap<JobId, (JoinHandle, oneshot::Receiver<Result<()>>)>,
active_jobs: HashMap<JobId, JoinHandle<Result<()>>>,
movement: Option<PageMovement>,
id: ComponentId,
}
@ -1074,8 +1070,8 @@ impl PlainListing {
)));
}
Ok(fut) => {
let (rcvr, handle, job_id) = account.job_executor.spawn_specialized(fut);
self.active_jobs.insert(job_id, (handle, rcvr));
let handle = account.job_executor.spawn_specialized(fut);
self.active_jobs.insert(handle.job_id, handle);
}
}
self.row_updates.push(env_hash);
@ -1337,12 +1333,10 @@ impl Component for PlainListing {
self.cursor_pos.1,
) {
Ok(job) => {
let (chan, handle, job_id) = context.accounts[&self.cursor_pos.0]
let handle = context.accounts[&self.cursor_pos.0]
.job_executor
.spawn_specialized(job);
context.accounts[&self.cursor_pos.0]
.insert_job(job_id, crate::conf::accounts::JobRequest::Search(handle));
self.search_job = Some((filter_term.to_string(), chan, job_id));
self.search_job = Some((filter_term.to_string(), handle));
}
Err(err) => {
context.replies.push_back(UIEvent::Notification(
@ -1358,11 +1352,11 @@ impl Component for PlainListing {
if self
.search_job
.as_ref()
.map(|(_, _, j)| j == job_id)
.map(|(_, j)| j == job_id)
.unwrap_or(false) =>
{
let (filter_term, mut rcvr, _job_id) = self.search_job.take().unwrap();
let results = rcvr.try_recv().unwrap().unwrap();
let (filter_term, mut handle) = self.search_job.take().unwrap();
let results = handle.chan.try_recv().unwrap().unwrap();
self.filter(filter_term, results, context);
self.set_dirty(true);
}

View File

@ -580,11 +580,14 @@ impl Component for AccountStatus {
None,
);
if let JobRequest::DeleteMailbox { mailbox_hash, .. }
| JobRequest::SetMailboxPermissions(mailbox_hash, _, _)
| JobRequest::SetMailboxSubscription(mailbox_hash, _, _)
| JobRequest::CopyTo(mailbox_hash, _, _)
| JobRequest::Refresh(mailbox_hash, _, _)
| JobRequest::Fetch(mailbox_hash, _, _) = req
| JobRequest::SetMailboxPermissions { mailbox_hash, .. }
| JobRequest::SetMailboxSubscription { mailbox_hash, .. }
| JobRequest::CopyTo {
dest_mailbox_hash: mailbox_hash,
..
}
| JobRequest::Refresh { mailbox_hash, .. }
| JobRequest::Fetch { mailbox_hash, .. } = req
{
write_string_to_grid(
a.mailbox_entries[mailbox_hash].name(),

View File

@ -21,7 +21,7 @@
use super::*;
use crate::conf::accounts::JobRequest;
use crate::jobs::{oneshot, JobId};
use crate::jobs::{JobId, JoinHandle};
use melib::email::attachment_types::ContentType;
use melib::list_management;
use melib::parser::BytesExt;
@ -105,8 +105,7 @@ pub enum AttachmentDisplay {
SignedPending {
inner: Attachment,
display: Vec<AttachmentDisplay>,
chan:
std::result::Result<oneshot::Receiver<Result<()>>, oneshot::Receiver<Result<Vec<u8>>>>,
handle: std::result::Result<JoinHandle<Result<()>>, JoinHandle<Result<Vec<u8>>>>,
job_id: JobId,
},
SignedFailed {
@ -125,8 +124,7 @@ pub enum AttachmentDisplay {
},
EncryptedPending {
inner: Attachment,
chan: oneshot::Receiver<Result<(melib::pgp::DecryptionMetadata, Vec<u8>)>>,
job_id: JobId,
handle: JoinHandle<Result<(melib::pgp::DecryptionMetadata, Vec<u8>)>>,
},
EncryptedFailed {
inner: Attachment,
@ -177,8 +175,7 @@ enum MailViewState {
pending_action: Option<PendingReplyAction>,
},
LoadingBody {
job_id: JobId,
chan: oneshot::Receiver<Result<Vec<u8>>>,
handle: JoinHandle<Result<Vec<u8>>>,
pending_action: Option<PendingReplyAction>,
},
Error {
@ -282,8 +279,8 @@ impl MailView {
.and_then(|mut op| op.as_bytes())
{
Ok(fut) => {
let (mut chan, handle, job_id) =
account.job_executor.spawn_specialized(fut);
let mut handle = account.job_executor.spawn_specialized(fut);
let job_id = handle.job_id;
pending_action = if let MailViewState::Init {
ref mut pending_action,
} = self.state
@ -292,7 +289,7 @@ impl MailView {
} else {
None
};
if let Ok(Some(bytes_result)) = try_recv_timeout!(&mut chan) {
if let Ok(Some(bytes_result)) = try_recv_timeout!(&mut handle.chan) {
match bytes_result {
Ok(bytes) => {
if account
@ -333,12 +330,10 @@ impl MailView {
}
} else {
self.state = MailViewState::LoadingBody {
job_id,
chan,
handle,
pending_action: pending_action.take(),
};
self.active_jobs.insert(job_id);
account.insert_job(job_id, JobRequest::AsBytes(handle));
}
}
Err(err) => {
@ -357,10 +352,13 @@ impl MailView {
);
match job {
Ok(fut) => {
let (rcvr, handle, job_id) = account.job_executor.spawn_specialized(fut);
let handle = account.job_executor.spawn_specialized(fut);
account.insert_job(
job_id,
JobRequest::SetFlags(self.coordinates.2.into(), handle, rcvr),
handle.job_id,
JobRequest::SetFlags {
env_hashes: self.coordinates.2.into(),
handle,
},
);
}
Err(e) => {
@ -439,7 +437,7 @@ impl MailView {
SignedPending {
inner: _,
display,
chan: _,
handle: _,
job_id: _,
} => {
acc.push_str("Waiting for signature verification.\n\n");
@ -512,7 +510,7 @@ impl MailView {
| SignedPending {
inner,
display: _,
chan: _,
handle: _,
job_id: _,
}
| SignedUnverified { inner, display: _ }
@ -526,11 +524,7 @@ impl MailView {
display: _,
description: _,
}
| EncryptedPending {
inner,
chan: _,
job_id: _,
}
| EncryptedPending { inner, handle: _ }
| EncryptedFailed { inner, error: _ }
| EncryptedSuccess {
inner: _,
@ -663,9 +657,8 @@ impl MailView {
a.clone(),
Some(bin.to_string()),
);
let (chan, _handle, job_id) =
context.job_executor.spawn_blocking(verify_fut);
active_jobs.insert(job_id);
let handle = context.job_executor.spawn_blocking(verify_fut);
active_jobs.insert(handle.job_id);
acc.push(AttachmentDisplay::SignedPending {
inner: a.clone(),
display: {
@ -673,8 +666,8 @@ impl MailView {
rec(&parts[0], context, coordinates, &mut v, active_jobs);
v
},
chan: Err(chan),
job_id,
job_id: handle.job_id,
handle: Err(handle),
});
} else {
#[cfg(not(feature = "gpgme"))]
@ -705,11 +698,12 @@ impl MailView {
ctx.verify(sig, data)
}) {
Ok(verify_fut) => {
let (chan, _handle, job_id) =
let handle =
context.job_executor.spawn_specialized(verify_fut);
active_jobs.insert(job_id);
active_jobs.insert(handle.job_id);
acc.push(AttachmentDisplay::SignedPending {
inner: a.clone(),
job_id: handle.job_id,
display: {
let mut v = vec![];
rec(
@ -721,8 +715,7 @@ impl MailView {
);
v
},
chan: Ok(chan),
job_id,
handle: Ok(handle),
});
}
Err(error) => {
@ -774,13 +767,12 @@ impl MailView {
Some(bin.to_string()),
None,
);
let (chan, _handle, job_id) =
let handle =
context.job_executor.spawn_blocking(decrypt_fut);
active_jobs.insert(job_id);
active_jobs.insert(handle.job_id);
acc.push(AttachmentDisplay::EncryptedPending {
inner: a.clone(),
chan,
job_id,
handle,
});
} else {
#[cfg(not(feature = "gpgme"))]
@ -796,14 +788,13 @@ impl MailView {
ctx.decrypt(cipher)
}) {
Ok(decrypt_fut) => {
let (chan, _handle, job_id) = context
let handle = context
.job_executor
.spawn_specialized(decrypt_fut);
active_jobs.insert(job_id);
active_jobs.insert(handle.job_id);
acc.push(AttachmentDisplay::EncryptedPending {
inner: a.clone(),
chan,
job_id,
handle,
});
}
Err(error) => {
@ -875,7 +866,7 @@ impl MailView {
| SignedPending {
inner,
display: _,
chan: _,
handle: _,
job_id: _,
}
| SignedFailed {
@ -889,11 +880,7 @@ impl MailView {
description: _,
}
| SignedUnverified { inner, display: _ }
| EncryptedPending {
inner,
chan: _,
job_id: _,
}
| EncryptedPending { inner, handle: _ }
| EncryptedFailed { inner, error: _ }
| EncryptedSuccess {
inner: _,
@ -1482,11 +1469,10 @@ impl Component for MailView {
{
match self.state {
MailViewState::LoadingBody {
job_id: ref id,
ref mut chan,
ref mut handle,
pending_action: _,
} if job_id == id => {
let bytes_result = chan.try_recv().unwrap().unwrap();
} if handle.job_id == *job_id => {
let bytes_result = handle.chan.try_recv().unwrap().unwrap();
match bytes_result {
Ok(bytes) => {
if context.accounts[&self.coordinates.0]
@ -1537,14 +1523,19 @@ impl Component for MailView {
match d {
AttachmentDisplay::SignedPending {
inner,
chan,
handle,
display,
job_id: id,
} if id == job_id => {
job_id: our_job_id,
} if *our_job_id == *job_id => {
caught = true;
self.initialised = false;
match chan {
Ok(chan) => match chan.try_recv().unwrap().unwrap() {
match handle.as_mut() {
Ok(handle) => match handle
.chan
.try_recv()
.unwrap()
.unwrap()
{
Ok(()) => {
*d = AttachmentDisplay::SignedVerified {
inner: std::mem::replace(
@ -1566,7 +1557,12 @@ impl Component for MailView {
};
}
},
Err(chan) => match chan.try_recv().unwrap().unwrap() {
Err(handle) => match handle
.chan
.try_recv()
.unwrap()
.unwrap()
{
Ok(verify_bytes) => {
*d = AttachmentDisplay::SignedVerified {
inner: std::mem::replace(
@ -1593,14 +1589,12 @@ impl Component for MailView {
},
}
}
AttachmentDisplay::EncryptedPending {
inner,
chan,
job_id: id,
} if id == job_id => {
AttachmentDisplay::EncryptedPending { inner, handle }
if handle.job_id == *job_id =>
{
caught = true;
self.initialised = false;
match chan.try_recv().unwrap().unwrap() {
match handle.chan.try_recv().unwrap().unwrap() {
Ok((metadata, decrypted_bytes)) => {
let plaintext =
AttachmentBuilder::new(&decrypted_bytes)
@ -1700,7 +1694,7 @@ impl Component for MailView {
let _ = sender.send(operation?.as_bytes()?.await);
Ok(())
};
let (channel, handle, job_id) = if context.accounts[&account_hash]
let handle = if context.accounts[&account_hash]
.backend_capabilities
.is_async
{
@ -1713,11 +1707,10 @@ impl Component for MailView {
.spawn_blocking(bytes_job)
};
context.accounts[&account_hash].insert_job(
job_id,
handle.job_id,
crate::conf::accounts::JobRequest::Generic {
name: "fetch envelope".into(),
handle,
channel,
on_finish: Some(CallbackFn(Box::new(move |context: &mut Context| {
let result = receiver.try_recv().unwrap().unwrap();
match result.and_then(|bytes| {

View File

@ -24,7 +24,7 @@
*/
use super::{AccountConf, FileMailboxConf};
use crate::jobs::{JobChannel, JobExecutor, JobId, JoinHandle};
use crate::jobs::{JobExecutor, JobId, JoinHandle};
use indexmap::IndexMap;
use melib::backends::*;
use melib::email::*;
@ -40,7 +40,6 @@ use std::collections::{HashMap, HashSet};
use crate::types::UIEvent::{self, EnvelopeRemove, EnvelopeRename, EnvelopeUpdate, Notification};
use crate::{StatusEvent, ThreadEvent};
use crossbeam::Sender;
use futures::channel::oneshot;
use futures::future::FutureExt;
pub use futures::stream::Stream;
use futures::stream::StreamExt;
@ -160,87 +159,112 @@ pub struct Account {
}
pub enum JobRequest {
Mailboxes(
JoinHandle,
oneshot::Receiver<Result<HashMap<MailboxHash, Mailbox>>>,
),
Fetch(
MailboxHash,
JoinHandle,
oneshot::Receiver<(
Mailboxes {
handle: JoinHandle<Result<HashMap<MailboxHash, Mailbox>>>,
},
Fetch {
mailbox_hash: MailboxHash,
handle: JoinHandle<(
Option<Result<Vec<Envelope>>>,
Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>,
)>,
),
},
Generic {
name: Cow<'static, str>,
logging_level: melib::LoggingLevel,
handle: JoinHandle,
channel: JobChannel<()>,
handle: JoinHandle<Result<()>>,
on_finish: Option<crate::types::CallbackFn>,
},
IsOnline(JoinHandle, oneshot::Receiver<Result<()>>),
Refresh(MailboxHash, JoinHandle, oneshot::Receiver<Result<()>>),
SetFlags(EnvelopeHashBatch, JoinHandle, oneshot::Receiver<Result<()>>),
IsOnline {
handle: JoinHandle<Result<()>>,
},
Refresh {
mailbox_hash: MailboxHash,
handle: JoinHandle<Result<()>>,
},
SetFlags {
env_hashes: EnvelopeHashBatch,
handle: JoinHandle<Result<()>>,
},
SaveMessage {
bytes: Vec<u8>,
mailbox_hash: MailboxHash,
handle: JoinHandle,
channel: oneshot::Receiver<Result<()>>,
handle: JoinHandle<Result<()>>,
},
SendMessage,
SendMessageBackground(JoinHandle, JobChannel<()>),
CopyTo(MailboxHash, JoinHandle, oneshot::Receiver<Result<Vec<u8>>>),
DeleteMessages(EnvelopeHashBatch, JoinHandle, oneshot::Receiver<Result<()>>),
SendMessageBackground {
handle: JoinHandle<Result<()>>,
},
CopyTo {
dest_mailbox_hash: MailboxHash,
handle: JoinHandle<Result<Vec<u8>>>,
},
DeleteMessages {
env_hashes: EnvelopeHashBatch,
handle: JoinHandle<Result<()>>,
},
CreateMailbox {
path: String,
handle: JoinHandle,
channel: JobChannel<(MailboxHash, HashMap<MailboxHash, Mailbox>)>,
handle: JoinHandle<Result<(MailboxHash, HashMap<MailboxHash, Mailbox>)>>,
},
DeleteMailbox {
mailbox_hash: MailboxHash,
handle: JoinHandle,
channel: JobChannel<HashMap<MailboxHash, Mailbox>>,
handle: JoinHandle<Result<HashMap<MailboxHash, Mailbox>>>,
},
//RenameMailbox,
Search(JoinHandle),
AsBytes(JoinHandle),
SetMailboxPermissions(MailboxHash, JoinHandle, oneshot::Receiver<Result<()>>),
SetMailboxSubscription(MailboxHash, JoinHandle, oneshot::Receiver<Result<()>>),
Search {
handle: JoinHandle<Result<()>>,
},
AsBytes {
handle: JoinHandle<Result<()>>,
},
SetMailboxPermissions {
mailbox_hash: MailboxHash,
handle: JoinHandle<Result<()>>,
},
SetMailboxSubscription {
mailbox_hash: MailboxHash,
handle: JoinHandle<Result<()>>,
},
Watch {
channel: oneshot::Receiver<Result<()>>,
handle: JoinHandle,
handle: JoinHandle<Result<()>>,
},
}
impl Drop for JobRequest {
fn drop(&mut self) {
match self {
JobRequest::Generic { handle, .. } => handle.0.cancel(),
JobRequest::Mailboxes(h, _) => h.0.cancel(),
JobRequest::Fetch(_, h, _) => h.0.cancel(),
JobRequest::IsOnline(h, _) => h.0.cancel(),
JobRequest::Refresh(_, h, _) => h.0.cancel(),
JobRequest::SetFlags(_, h, _) => h.0.cancel(),
JobRequest::SaveMessage { handle, .. } => handle.0.cancel(),
JobRequest::CopyTo(_, h, _) => h.0.cancel(),
JobRequest::DeleteMessages(_, h, _) => h.0.cancel(),
JobRequest::CreateMailbox { handle, .. } => handle.0.cancel(),
JobRequest::DeleteMailbox { handle, .. } => handle.0.cancel(),
JobRequest::Generic { handle, .. } |
JobRequest::IsOnline { handle, .. } |
JobRequest::Refresh { handle, .. } |
JobRequest::SetFlags { handle, .. } |
JobRequest::SaveMessage { handle, .. } |
//JobRequest::RenameMailbox,
JobRequest::Search(h) => h.0.cancel(),
JobRequest::AsBytes(h) => h.0.cancel(),
JobRequest::SetMailboxPermissions(_, h, _) => {
h.0.cancel();
JobRequest::Search { handle, .. } |
JobRequest::AsBytes { handle, .. } |
JobRequest::SetMailboxPermissions { handle, .. } |
JobRequest::SetMailboxSubscription { handle, .. } |
JobRequest::Watch { handle, .. } |
JobRequest::SendMessageBackground { handle, .. } => {
handle.cancel();
}
JobRequest::SetMailboxSubscription(_, h, _) => {
h.0.cancel();
JobRequest::DeleteMessages { handle, .. } => {
handle.cancel();
}
JobRequest::Watch { handle, .. } => handle.0.cancel(),
JobRequest::CreateMailbox { handle, .. } => {
handle.cancel();
}
JobRequest::DeleteMailbox { handle, .. } => {
handle.cancel();
}
JobRequest::Fetch { handle, .. } => {
handle.cancel();
}
JobRequest::Mailboxes { handle, .. } => {
handle.cancel();
}
JobRequest::CopyTo { handle, .. } => { handle.cancel(); }
JobRequest::SendMessage => {}
JobRequest::SendMessageBackground(h, _) => {
h.0.cancel();
}
}
}
}
@ -249,30 +273,32 @@ impl core::fmt::Debug for JobRequest {
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
match self {
JobRequest::Generic { name, .. } => write!(f, "JobRequest::Generic({})", name),
JobRequest::Mailboxes(_, _) => write!(f, "JobRequest::Mailboxes"),
JobRequest::Fetch(hash, _, _) => write!(f, "JobRequest::Fetch({})", hash),
JobRequest::IsOnline(_, _) => write!(f, "JobRequest::IsOnline"),
JobRequest::Refresh(_, _, _) => write!(f, "JobRequest::Refresh"),
JobRequest::SetFlags(_, _, _) => write!(f, "JobRequest::SetFlags"),
JobRequest::Mailboxes { .. } => write!(f, "JobRequest::Mailboxes"),
JobRequest::Fetch { mailbox_hash, .. } => {
write!(f, "JobRequest::Fetch({})", mailbox_hash)
}
JobRequest::IsOnline { .. } => write!(f, "JobRequest::IsOnline"),
JobRequest::Refresh { .. } => write!(f, "JobRequest::Refresh"),
JobRequest::SetFlags { .. } => write!(f, "JobRequest::SetFlags"),
JobRequest::SaveMessage { .. } => write!(f, "JobRequest::SaveMessage"),
JobRequest::CopyTo(_, _, _) => write!(f, "JobRequest::CopyTo"),
JobRequest::DeleteMessages(_, _, _) => write!(f, "JobRequest::DeleteMessages"),
JobRequest::CopyTo { .. } => write!(f, "JobRequest::CopyTo"),
JobRequest::DeleteMessages { .. } => write!(f, "JobRequest::DeleteMessages"),
JobRequest::CreateMailbox { .. } => write!(f, "JobRequest::CreateMailbox"),
JobRequest::DeleteMailbox { mailbox_hash, .. } => {
write!(f, "JobRequest::DeleteMailbox({})", mailbox_hash)
}
//JobRequest::RenameMailbox,
JobRequest::Search(_) => write!(f, "JobRequest::Search"),
JobRequest::AsBytes(_) => write!(f, "JobRequest::AsBytes"),
JobRequest::SetMailboxPermissions(_, _, _) => {
JobRequest::Search { .. } => write!(f, "JobRequest::Search"),
JobRequest::AsBytes { .. } => write!(f, "JobRequest::AsBytes"),
JobRequest::SetMailboxPermissions { .. } => {
write!(f, "JobRequest::SetMailboxPermissions")
}
JobRequest::SetMailboxSubscription(_, _, _) => {
JobRequest::SetMailboxSubscription { .. } => {
write!(f, "JobRequest::SetMailboxSubscription")
}
JobRequest::Watch { .. } => write!(f, "JobRequest::Watch"),
JobRequest::SendMessage => write!(f, "JobRequest::SendMessage"),
JobRequest::SendMessageBackground(_, _) => {
JobRequest::SendMessageBackground { .. } => {
write!(f, "JobRequest::SendMessageBackground")
}
}
@ -283,33 +309,33 @@ impl core::fmt::Display for JobRequest {
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
match self {
JobRequest::Generic { name, .. } => write!(f, "{}", name),
JobRequest::Mailboxes(_, _) => write!(f, "Get mailbox list"),
JobRequest::Fetch(_, _, _) => write!(f, "Mailbox fetch"),
JobRequest::IsOnline(_, _) => write!(f, "Online status check"),
JobRequest::Refresh(_, _, _) => write!(f, "Refresh mailbox"),
JobRequest::SetFlags(batch, _, _) => write!(
JobRequest::Mailboxes { .. } => write!(f, "Get mailbox list"),
JobRequest::Fetch { .. } => write!(f, "Mailbox fetch"),
JobRequest::IsOnline { .. } => write!(f, "Online status check"),
JobRequest::Refresh { .. } => write!(f, "Refresh mailbox"),
JobRequest::SetFlags { env_hashes, .. } => write!(
f,
"Set flags for {} message{}",
batch.len(),
if batch.len() == 1 { "" } else { "s" }
env_hashes.len(),
if env_hashes.len() == 1 { "" } else { "s" }
),
JobRequest::SaveMessage { .. } => write!(f, "Save message"),
JobRequest::CopyTo(_, _, _) => write!(f, "Copy message."),
JobRequest::DeleteMessages(batch, _, _) => write!(
JobRequest::CopyTo { .. } => write!(f, "Copy message."),
JobRequest::DeleteMessages { env_hashes, .. } => write!(
f,
"Delete {} message{}",
batch.len(),
if batch.len() == 1 { "" } else { "s" }
env_hashes.len(),
if env_hashes.len() == 1 { "" } else { "s" }
),
JobRequest::CreateMailbox { path, .. } => write!(f, "Create mailbox {}", path),
JobRequest::DeleteMailbox { .. } => write!(f, "Delete mailbox"),
//JobRequest::RenameMailbox,
JobRequest::Search(_) => write!(f, "Search"),
JobRequest::AsBytes(_) => write!(f, "Message body fetch"),
JobRequest::SetMailboxPermissions(_, _, _) => write!(f, "Set mailbox permissions"),
JobRequest::SetMailboxSubscription(_, _, _) => write!(f, "Set mailbox subscription"),
JobRequest::Search { .. } => write!(f, "Search"),
JobRequest::AsBytes { .. } => write!(f, "Message body fetch"),
JobRequest::SetMailboxPermissions { .. } => write!(f, "Set mailbox permissions"),
JobRequest::SetMailboxSubscription { .. } => write!(f, "Set mailbox subscription"),
JobRequest::Watch { .. } => write!(f, "Background watch"),
JobRequest::SendMessageBackground(_, _) | JobRequest::SendMessage => {
JobRequest::SendMessageBackground { .. } | JobRequest::SendMessage => {
write!(f, "Sending message")
}
}
@ -326,14 +352,16 @@ impl JobRequest {
pub fn is_fetch(&self, mailbox_hash: MailboxHash) -> bool {
match self {
JobRequest::Fetch(h, _, _) if *h == mailbox_hash => true,
JobRequest::Fetch {
mailbox_hash: h, ..
} if *h == mailbox_hash => true,
_ => false,
}
}
pub fn is_online(&self) -> bool {
match self {
JobRequest::IsOnline(_, _) => true,
JobRequest::IsOnline { .. } => true,
_ => false,
}
}
@ -457,12 +485,13 @@ impl Account {
let mut active_job_instants = BTreeMap::default();
if let Ok(mailboxes_job) = backend.mailboxes() {
if let Ok(online_job) = backend.is_online() {
let (rcvr, handle, job_id) = if backend.capabilities().is_async {
let handle = if backend.capabilities().is_async {
job_executor.spawn_specialized(online_job.then(|_| mailboxes_job))
} else {
job_executor.spawn_blocking(online_job.then(|_| mailboxes_job))
};
active_jobs.insert(job_id, JobRequest::Mailboxes(handle, rcvr));
let job_id = handle.job_id;
active_jobs.insert(job_id, JobRequest::Mailboxes { handle });
active_job_instants.insert(std::time::Instant::now(), job_id);
sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
@ -625,18 +654,24 @@ impl Account {
entry.status = MailboxStatus::Parsing(0, total);
if let Ok(mailbox_job) = self.backend.write().unwrap().fetch(*h) {
let mailbox_job = mailbox_job.into_future();
let (rcvr, handle, job_id) = if self.backend_capabilities.is_async {
let handle = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(mailbox_job)
} else {
self.job_executor.spawn_blocking(mailbox_job)
};
let job_id = handle.job_id;
self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
StatusEvent::NewJob(job_id),
)))
.unwrap();
self.active_jobs
.insert(job_id, JobRequest::Fetch(*h, handle, rcvr));
self.active_jobs.insert(
job_id,
JobRequest::Fetch {
mailbox_hash: *h,
handle,
},
);
self.active_job_instants
.insert(std::time::Instant::now(), job_id);
}
@ -695,10 +730,9 @@ impl Account {
);
}
Ok(job) => {
let (channel, handle, job_id) =
self.job_executor.spawn_blocking(job);
let handle = self.job_executor.spawn_blocking(job);
self.insert_job(
job_id,
handle.job_id,
JobRequest::Generic {
name: format!(
"Update envelope {} in sqlite3 cache",
@ -706,7 +740,6 @@ impl Account {
)
.into(),
handle,
channel,
logging_level: melib::LoggingLevel::TRACE,
on_finish: None,
},
@ -743,10 +776,9 @@ impl Account {
)
}) {
Ok(job) => {
let (channel, handle, job_id) =
self.job_executor.spawn_blocking(job);
let handle = self.job_executor.spawn_blocking(job);
self.insert_job(
job_id,
handle.job_id,
JobRequest::Generic {
name: format!(
"Update envelope {} in sqlite3 cache",
@ -755,7 +787,6 @@ impl Account {
)
.into(),
handle,
channel,
logging_level: melib::LoggingLevel::TRACE,
on_finish: None,
},
@ -803,10 +834,9 @@ impl Account {
);
}
Ok(job) => {
let (channel, handle, job_id) =
self.job_executor.spawn_blocking(job);
let handle = self.job_executor.spawn_blocking(job);
self.insert_job(
job_id,
handle.job_id,
JobRequest::Generic {
name: format!(
"Update envelope {} in sqlite3 cache",
@ -815,7 +845,6 @@ impl Account {
)
.into(),
handle,
channel,
logging_level: melib::LoggingLevel::TRACE,
on_finish: None,
},
@ -845,14 +874,13 @@ impl Account {
};
#[cfg(feature = "sqlite3")]
if self.settings.conf.search_backend == crate::conf::SearchBackend::Sqlite3 {
let (channel, handle, job_id) =
self.job_executor.spawn_blocking(crate::sqlite3::insert(
(*envelope).clone(),
self.backend.clone(),
self.name.clone(),
));
let handle = self.job_executor.spawn_blocking(crate::sqlite3::insert(
(*envelope).clone(),
self.backend.clone(),
self.name.clone(),
));
self.insert_job(
job_id,
handle.job_id,
JobRequest::Generic {
name: format!(
"Update envelope {} in sqlite3 cache",
@ -860,7 +888,6 @@ impl Account {
)
.into(),
handle,
channel,
logging_level: melib::LoggingLevel::TRACE,
on_finish: None,
},
@ -1006,12 +1033,18 @@ impl Account {
}
let refresh_job = self.backend.write().unwrap().refresh(mailbox_hash);
if let Ok(refresh_job) = refresh_job {
let (rcvr, handle, job_id) = if self.backend_capabilities.is_async {
let handle = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(refresh_job)
} else {
self.job_executor.spawn_blocking(refresh_job)
};
self.insert_job(job_id, JobRequest::Refresh(mailbox_hash, handle, rcvr));
self.insert_job(
handle.job_id,
JobRequest::Refresh {
mailbox_hash,
handle,
},
);
}
Ok(())
}
@ -1024,13 +1057,13 @@ impl Account {
if !self.active_jobs.values().any(|j| j.is_watch()) {
match self.backend.read().unwrap().watch() {
Ok(fut) => {
let (channel, handle, job_id) = if self.backend_capabilities.is_async {
let handle = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(fut)
} else {
self.job_executor.spawn_blocking(fut)
};
self.active_jobs
.insert(job_id, JobRequest::Watch { channel, handle });
.insert(handle.job_id, JobRequest::Watch { handle });
}
Err(e) => {
self.sender
@ -1096,12 +1129,18 @@ impl Account {
match mailbox_job {
Ok(mailbox_job) => {
let mailbox_job = mailbox_job.into_future();
let (rcvr, handle, job_id) = if self.backend_capabilities.is_async {
let handle = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(mailbox_job)
} else {
self.job_executor.spawn_blocking(mailbox_job)
};
self.insert_job(job_id, JobRequest::Fetch(mailbox_hash, handle, rcvr));
self.insert_job(
handle.job_id,
JobRequest::Fetch {
mailbox_hash,
handle,
},
);
}
Err(err) => {
self.mailbox_entries
@ -1192,18 +1231,17 @@ impl Account {
.unwrap()
.save(bytes.to_vec(), mailbox_hash, flags)?;
let (channel, handle, job_id) = if self.backend_capabilities.is_async {
let handle = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(job)
} else {
self.job_executor.spawn_blocking(job)
};
self.insert_job(
job_id,
handle.job_id,
JobRequest::SaveMessage {
bytes: bytes.to_vec(),
mailbox_hash,
handle,
channel,
},
);
Ok(())
@ -1214,7 +1252,7 @@ impl Account {
message: String,
send_mail: crate::conf::composing::SendMail,
complete_in_background: bool,
) -> Result<Option<(JobId, JoinHandle, JobChannel<()>)>> {
) -> Result<Option<JoinHandle<Result<()>>>> {
use crate::conf::composing::SendMail;
use std::io::Write;
use std::process::{Command, Stdio};
@ -1262,18 +1300,18 @@ impl Account {
}
#[cfg(feature = "smtp")]
SendMail::Smtp(conf) => {
let (chan, handle, job_id) = self.job_executor.spawn_specialized(async move {
let handle = self.job_executor.spawn_specialized(async move {
let mut smtp_connection =
melib::smtp::SmtpConnection::new_connection(conf).await?;
smtp_connection.mail_transaction(&message, None).await
});
if complete_in_background {
self.insert_job(job_id, JobRequest::SendMessageBackground(handle, chan));
self.insert_job(handle.job_id, JobRequest::SendMessageBackground { handle });
return Ok(None);
} else {
self.insert_job(job_id, JobRequest::SendMessage);
self.insert_job(handle.job_id, JobRequest::SendMessage);
}
Ok(Some((job_id, handle, chan)))
Ok(Some(handle))
}
}
}
@ -1381,19 +1419,12 @@ impl Account {
.write()
.unwrap()
.create_mailbox(path.to_string())?;
let (channel, handle, job_id) = if self.backend_capabilities.is_async {
let handle = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(job)
} else {
self.job_executor.spawn_blocking(job)
};
self.insert_job(
job_id,
JobRequest::CreateMailbox {
path,
handle,
channel,
},
);
self.insert_job(handle.job_id, JobRequest::CreateMailbox { path, handle });
Ok(())
}
MailboxOperation::Delete(path) => {
@ -1403,17 +1434,16 @@ impl Account {
let mailbox_hash = self.mailbox_by_path(&path)?;
let job = self.backend.write().unwrap().delete_mailbox(mailbox_hash)?;
let (channel, handle, job_id) = if self.backend_capabilities.is_async {
let handle = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(job)
} else {
self.job_executor.spawn_blocking(job)
};
self.insert_job(
job_id,
handle.job_id,
JobRequest::DeleteMailbox {
mailbox_hash,
handle,
channel,
},
);
Ok(())
@ -1498,12 +1528,12 @@ impl Account {
if !self.active_jobs.values().any(JobRequest::is_online) {
let online_job = self.backend.read().unwrap().is_online();
if let Ok(online_job) = online_job {
let (rcvr, handle, job_id) = if self.backend_capabilities.is_async {
let handle = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(online_job)
} else {
self.job_executor.spawn_blocking(online_job)
};
self.insert_job(job_id, JobRequest::IsOnline(handle, rcvr));
self.insert_job(handle.job_id, JobRequest::IsOnline { handle });
}
}
return self.is_online.clone();
@ -1563,8 +1593,8 @@ impl Account {
if self.active_jobs.contains_key(job_id) {
match self.active_jobs.remove(job_id).unwrap() {
JobRequest::Mailboxes(_, ref mut chan) => {
if let Some(mailboxes) = chan.try_recv().unwrap() {
JobRequest::Mailboxes { ref mut handle } => {
if let Some(mailboxes) = handle.chan.try_recv().unwrap() {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::AccountStatusChange(
self.hash,
@ -1584,19 +1614,23 @@ impl Account {
}
let mailboxes_job = self.backend.read().unwrap().mailboxes();
if let Ok(mailboxes_job) = mailboxes_job {
let (rcvr, handle, job_id) = if self.backend_capabilities.is_async {
let handle = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(mailboxes_job)
} else {
self.job_executor.spawn_blocking(mailboxes_job)
};
self.insert_job(job_id, JobRequest::Mailboxes(handle, rcvr));
self.insert_job(handle.job_id, JobRequest::Mailboxes { handle });
};
}
}
}
JobRequest::Fetch(mailbox_hash, _, ref mut chan) => {
JobRequest::Fetch {
mailbox_hash,
ref mut handle,
..
} => {
let (payload, rest): (Option<Result<Vec<Envelope>>>, _) =
chan.try_recv().unwrap().unwrap();
handle.chan.try_recv().unwrap().unwrap();
debug!("got payload in status for {}", mailbox_hash);
if payload.is_none() {
debug!("finished in status for {}", mailbox_hash);
@ -1613,12 +1647,18 @@ impl Account {
.unwrap();
return true;
}
let (rcvr, handle, job_id) = if self.backend_capabilities.is_async {
let handle = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(rest.into_future())
} else {
self.job_executor.spawn_blocking(rest.into_future())
};
self.insert_job(job_id, JobRequest::Fetch(mailbox_hash, handle, rcvr));
self.insert_job(
handle.job_id,
JobRequest::Fetch {
mailbox_hash,
handle,
},
);
let payload = payload.unwrap();
if let Err(err) = payload {
self.sender
@ -1663,8 +1703,8 @@ impl Account {
))))
.unwrap();
}
JobRequest::IsOnline(_, ref mut chan) => {
let is_online = chan.try_recv().unwrap();
JobRequest::IsOnline { ref mut handle, .. } => {
let is_online = handle.chan.try_recv().unwrap();
if let Some(is_online) = is_online {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::AccountStatusChange(
@ -1689,16 +1729,16 @@ impl Account {
}
let online_job = self.backend.read().unwrap().is_online();
if let Ok(online_job) = online_job {
let (rcvr, handle, job_id) = if self.backend_capabilities.is_async {
let handle = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(online_job)
} else {
self.job_executor.spawn_blocking(online_job)
};
self.insert_job(job_id, JobRequest::IsOnline(handle, rcvr));
self.insert_job(handle.job_id, JobRequest::IsOnline { handle });
};
}
JobRequest::Refresh(_mailbox_hash, _, ref mut chan) => {
let r = chan.try_recv().unwrap();
JobRequest::Refresh { ref mut handle, .. } => {
let r = handle.chan.try_recv().unwrap();
match r {
Some(Ok(())) => {
if self.is_online.is_err()
@ -1731,13 +1771,12 @@ impl Account {
if !err.kind.is_authentication() {
let online_job = self.backend.read().unwrap().is_online();
if let Ok(online_job) = online_job {
let (rcvr, handle, job_id) =
if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(online_job)
} else {
self.job_executor.spawn_blocking(online_job)
};
self.insert_job(job_id, JobRequest::IsOnline(handle, rcvr));
let handle = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(online_job)
} else {
self.job_executor.spawn_blocking(online_job)
};
self.insert_job(handle.job_id, JobRequest::IsOnline { handle });
};
}
self.is_online = Err(err);
@ -1750,8 +1789,8 @@ impl Account {
None => {}
}
}
JobRequest::SetFlags(_, _, ref mut chan) => {
let r = chan.try_recv().unwrap();
JobRequest::SetFlags { ref mut handle, .. } => {
let r = handle.chan.try_recv().unwrap();
if let Some(Err(err)) = r {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
@ -1763,11 +1802,11 @@ impl Account {
}
}
JobRequest::SaveMessage {
ref mut channel,
ref mut handle,
ref bytes,
..
} => {
let r = channel.try_recv().unwrap();
let r = handle.chan.try_recv().unwrap();
if let Some(Err(err)) = r {
melib::log(format!("Could not save message: {}", err), melib::ERROR);
let file = crate::types::create_temp_file(bytes, None, None, false);
@ -1792,8 +1831,8 @@ impl Account {
}
}
JobRequest::SendMessage => {}
JobRequest::SendMessageBackground(_, ref mut chan) => {
let r = chan.try_recv().unwrap();
JobRequest::SendMessageBackground { ref mut handle, .. } => {
let r = handle.chan.try_recv().unwrap();
if let Some(Err(err)) = r {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
@ -1804,8 +1843,13 @@ impl Account {
.expect("Could not send event on main channel");
}
}
JobRequest::CopyTo(mailbox_hash, _, ref mut chan) => {
if let Err(err) = chan
JobRequest::CopyTo {
dest_mailbox_hash: mailbox_hash,
ref mut handle,
..
} => {
if let Err(err) = handle
.chan
.try_recv()
.unwrap()
.unwrap()
@ -1820,8 +1864,8 @@ impl Account {
.expect("Could not send event on main channel");
}
}
JobRequest::DeleteMessages(_, _, ref mut chan) => {
let r = chan.try_recv().unwrap();
JobRequest::DeleteMessages { ref mut handle, .. } => {
let r = handle.chan.try_recv().unwrap();
if let Some(Err(err)) = r {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
@ -1834,10 +1878,10 @@ impl Account {
}
JobRequest::CreateMailbox {
ref path,
ref mut channel,
ref mut handle,
..
} => {
let r = channel.try_recv().unwrap();
let r = handle.chan.try_recv().unwrap();
if let Some(r) = r {
match r {
Err(err) => {
@ -1918,10 +1962,10 @@ impl Account {
}
JobRequest::DeleteMailbox {
mailbox_hash,
ref mut channel,
ref mut handle,
..
} => {
let r = channel.try_recv().unwrap();
let r = handle.chan.try_recv().unwrap();
match r {
Some(Err(err)) => {
self.sender
@ -1991,9 +2035,9 @@ impl Account {
}
}
//JobRequest::RenameMailbox,
JobRequest::Search(_) | JobRequest::AsBytes(_) => {}
JobRequest::SetMailboxPermissions(_, _, ref mut chan) => {
let r = chan.try_recv().unwrap();
JobRequest::Search { .. } | JobRequest::AsBytes { .. } => {}
JobRequest::SetMailboxPermissions { ref mut handle, .. } => {
let r = handle.chan.try_recv().unwrap();
match r {
Some(Err(err)) => {
self.sender
@ -2022,8 +2066,8 @@ impl Account {
None => {}
}
}
JobRequest::SetMailboxSubscription(_, _, ref mut chan) => {
let r = chan.try_recv().unwrap();
JobRequest::SetMailboxSubscription { ref mut handle, .. } => {
let r = handle.chan.try_recv().unwrap();
match r {
Some(Err(err)) => {
self.sender
@ -2052,12 +2096,9 @@ impl Account {
None => {}
}
}
JobRequest::Watch {
ref mut channel,
handle: _,
} => {
JobRequest::Watch { ref mut handle } => {
debug!("JobRequest::Watch finished??? ");
let r = channel.try_recv().unwrap();
let r = handle.chan.try_recv().unwrap();
debug!("JobRequest::Watch {:?}", r);
if let Some(Err(err)) = r {
if err.kind.is_timeout() {
@ -2076,12 +2117,11 @@ impl Account {
}
JobRequest::Generic {
ref name,
ref mut channel,
handle: _,
ref mut handle,
ref mut on_finish,
logging_level,
} => {
let r = channel.try_recv().unwrap();
let r = handle.chan.try_recv().unwrap();
match r {
Some(Err(err)) => {
self.sender

View File

@ -30,9 +30,7 @@ use melib::error::Result;
use melib::smol;
use std::future::Future;
use std::panic::catch_unwind;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::thread;
use std::time::Duration;
use uuid::Uuid;
@ -159,37 +157,8 @@ impl JobExecutor {
ret
}
/// Spawns a future on the executor.
pub fn spawn<F>(&self, future: F) -> (JoinHandle, JobId)
where
F: Future<Output = Result<()>> + Send + 'static,
{
let job_id = JobId::new();
let finished_sender = self.sender.clone();
let injector = self.global_queue.clone();
// Create a task and schedule it for execution.
let (task, handle) = async_task::spawn(
async move {
let r = future.await;
finished_sender
.send(ThreadEvent::JobFinished(job_id))
.unwrap();
r
},
move |task| injector.push(MeliTask { task, id: job_id }),
(),
);
task.schedule();
for unparker in self.parkers.iter() {
unparker.unpark();
}
// Return a join handle that retrieves the output of the future.
(JoinHandle(handle), job_id)
}
/// Spawns a future with a generic return value `R`
pub fn spawn_specialized<F, R>(&self, future: F) -> (oneshot::Receiver<R>, JoinHandle, JobId)
pub fn spawn_specialized<F, R>(&self, future: F) -> JoinHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
@ -216,11 +185,15 @@ impl JobExecutor {
unparker.unpark();
}
(receiver, JoinHandle(handle), job_id)
JoinHandle {
inner: handle,
chan: receiver,
job_id,
}
}
/// Spawns a future with a generic return value `R` that might block on a new thread
pub fn spawn_blocking<F, R>(&self, future: F) -> (oneshot::Receiver<R>, JoinHandle, JobId)
pub fn spawn_blocking<F, R>(&self, future: F) -> JoinHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
@ -229,19 +202,39 @@ impl JobExecutor {
}
}
pub type JobChannel<T> = oneshot::Receiver<Result<T>>;
pub type JobChannel<T> = oneshot::Receiver<T>;
#[derive(Debug)]
/// JoinHandle for the future that allows us to cancel the task.
pub struct JoinHandle(pub async_task::JoinHandle<Result<()>, ()>);
pub struct JoinHandle<T> {
pub inner: async_task::JoinHandle<Result<()>, ()>,
pub chan: JobChannel<T>,
pub job_id: JobId,
}
impl<T> JoinHandle<T> {
pub fn cancel(&self) {
self.inner.cancel()
}
}
impl<T> std::cmp::PartialEq<JobId> for JoinHandle<T> {
fn eq(&self, other: &JobId) -> bool {
self.job_id == *other
}
}
/*
use std::pin::Pin;
use std::task::{Context, Poll};
impl Future for JoinHandle {
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.0).poll(cx) {
match Pin::new(&mut self.inner).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(output) => Poll::Ready(output.expect("task failed")),
}
}
}
*/

View File

@ -873,14 +873,12 @@ impl State {
}
match crate::sqlite3::index(&mut self.context, account_index) {
Ok(job) => {
let (channel, handle, job_id) =
self.context.job_executor.spawn_blocking(job);
let handle = self.context.job_executor.spawn_blocking(job);
self.context.accounts[account_index].active_jobs.insert(
job_id,
handle.job_id,
crate::conf::accounts::JobRequest::Generic {
name: "Message index rebuild".into(),
handle,
channel,
on_finish: None,
logging_level: melib::LoggingLevel::INFO,
},