diff --git a/src/components/mail/compose.rs b/src/components/mail/compose.rs index 29f1f57f3..8fb0e4a55 100644 --- a/src/components/mail/compose.rs +++ b/src/components/mail/compose.rs @@ -25,7 +25,7 @@ use melib::list_management; use melib::Draft; use crate::conf::accounts::JobRequest; -use crate::jobs::{JobChannel, JobId, JoinHandle}; +use crate::jobs::JoinHandle; use crate::terminal::embed::EmbedGrid; use indexmap::IndexSet; use nix::sys::wait::WaitStatus; @@ -125,7 +125,7 @@ enum ViewMode { Embed, SelectRecipients(UIDialog
), Send(UIConfirmationDialog), - WaitingForSendResult(UIDialog, JoinHandle, JobId, JobChannel<()>), + WaitingForSendResult(UIDialog, JoinHandle>), } impl ViewMode { @@ -579,11 +579,6 @@ impl Component for Composer { context[self.account_hash].pgp.auto_sign )); } - if self.encrypt_mail.is_unset() { - self.encrypt_mail = ToggleFlag::InternalVal(*account_settings!( - context[self.account_hash].pgp.auto_encrypt - )); - } if !self.draft.headers().contains_key("From") || self.draft.headers()["From"].is_empty() { self.draft.set_header( @@ -763,7 +758,7 @@ impl Component for Composer { /* Let user choose whether to quit with/without saving or cancel */ s.draw(grid, center_area(area, s.content.size()), context); } - ViewMode::WaitingForSendResult(ref mut s, _, _, _) => { + ViewMode::WaitingForSendResult(ref mut s, _) => { /* Let user choose whether to wait for success or cancel */ s.draw(grid, center_area(area, s.content.size()), context); } @@ -796,7 +791,7 @@ impl Component for Composer { Flag::SEEN, ) { Ok(job) => { - let (chan, handle, job_id) = context.job_executor.spawn_blocking(job); + let handle = context.job_executor.spawn_blocking(job); self.mode = ViewMode::WaitingForSendResult( UIDialog::new( "Waiting for confirmation.. The tab will close automatically on successful submission.", @@ -812,7 +807,7 @@ impl Component for Composer { )) })), context, - ), handle, job_id, chan); + ), handle); } Err(err) => { context.replies.push_back(UIEvent::Notification( @@ -906,7 +901,7 @@ impl Component for Composer { } } ( - ViewMode::WaitingForSendResult(ref selector, _, _, _), + ViewMode::WaitingForSendResult(ref selector, _), UIEvent::FinishedUIDialog(id, result), ) if selector.id() == *id => { if let Some(key) = result.downcast_mut::() { @@ -919,12 +914,12 @@ impl Component for Composer { } 'n' => { self.set_dirty(true); - if let ViewMode::WaitingForSendResult(_, handle, job_id, chan) = + if let ViewMode::WaitingForSendResult(_, handle) = std::mem::replace(&mut self.mode, ViewMode::Edit) { context.accounts[&self.account_hash].active_jobs.insert( - job_id, - JobRequest::SendMessageBackground(handle, chan), + handle.job_id, + JobRequest::SendMessageBackground { handle }, ); } } @@ -934,10 +929,10 @@ impl Component for Composer { return true; } ( - ViewMode::WaitingForSendResult(_, _, ref our_job_id, ref mut chan), + ViewMode::WaitingForSendResult(_, ref mut handle), UIEvent::StatusEvent(StatusEvent::JobFinished(ref job_id)), - ) if *our_job_id == *job_id => { - let result = chan.try_recv().unwrap(); + ) if handle.job_id == *job_id => { + let result = handle.chan.try_recv().unwrap(); if let Some(Err(err)) = result { self.mode = ViewMode::Edit; context.replies.push_back(UIEvent::Notification( @@ -953,7 +948,7 @@ impl Component for Composer { } return true; } - (ViewMode::WaitingForSendResult(ref mut selector, _, _, _), _) => { + (ViewMode::WaitingForSendResult(ref mut selector, _), _) => { if selector.process_event(event, context) { return true; } @@ -1548,7 +1543,7 @@ pub fn send_draft( mailbox_type: SpecialUsageMailbox, flags: Flag, complete_in_background: bool, -) -> Result)>> { +) -> Result>>> { let format_flowed = *account_settings!(context[account_hash].composing.format_flowed); if sign_mail.is_true() { let mut content_type = ContentType::default(); diff --git a/src/components/mail/listing.rs b/src/components/mail/listing.rs index e347e86b8..698a7f73f 100644 --- a/src/components/mail/listing.rs +++ b/src/components/mail/listing.rs @@ -177,9 +177,9 @@ pub trait MailListingTrait: ListingTrait { )); } Ok(fut) => { - let (channel, handle, job_id) = account.job_executor.spawn_specialized(fut); + let handle = account.job_executor.spawn_specialized(fut); account - .insert_job(job_id, JobRequest::SetFlags(env_hashes, handle, channel)); + .insert_job(handle.job_id, JobRequest::SetFlags { env_hashes, handle }); } } } @@ -196,9 +196,9 @@ pub trait MailListingTrait: ListingTrait { )); } Ok(fut) => { - let (channel, handle, job_id) = account.job_executor.spawn_specialized(fut); + let handle = account.job_executor.spawn_specialized(fut); account - .insert_job(job_id, JobRequest::SetFlags(env_hashes, handle, channel)); + .insert_job(handle.job_id, JobRequest::SetFlags { env_hashes, handle }); } } } @@ -215,9 +215,9 @@ pub trait MailListingTrait: ListingTrait { )); } Ok(fut) => { - let (channel, handle, job_id) = account.job_executor.spawn_specialized(fut); + let handle = account.job_executor.spawn_specialized(fut); account - .insert_job(job_id, JobRequest::SetFlags(env_hashes, handle, channel)); + .insert_job(handle.job_id, JobRequest::SetFlags { env_hashes, handle }); } } } @@ -234,9 +234,9 @@ pub trait MailListingTrait: ListingTrait { )); } Ok(fut) => { - let (channel, handle, job_id) = account.job_executor.spawn_specialized(fut); + let handle = account.job_executor.spawn_specialized(fut); account - .insert_job(job_id, JobRequest::SetFlags(env_hashes, handle, channel)); + .insert_job(handle.job_id, JobRequest::SetFlags { env_hashes, handle }); } } } @@ -253,10 +253,10 @@ pub trait MailListingTrait: ListingTrait { )); } Ok(fut) => { - let (channel, handle, job_id) = account.job_executor.spawn_specialized(fut); + let handle = account.job_executor.spawn_specialized(fut); account.insert_job( - job_id, - JobRequest::DeleteMessages(env_hashes, handle, channel), + handle.job_id, + JobRequest::DeleteMessages { env_hashes, handle }, ); } } @@ -278,13 +278,12 @@ pub trait MailListingTrait: ListingTrait { )); } Ok(fut) => { - let (channel, handle, job_id) = account.job_executor.spawn_specialized(fut); + let handle = account.job_executor.spawn_specialized(fut); account.insert_job( - job_id, + handle.job_id, JobRequest::Generic { name: "message copying".into(), handle, - channel, on_finish: None, logging_level: melib::LoggingLevel::INFO, }, @@ -316,13 +315,12 @@ pub trait MailListingTrait: ListingTrait { )); } Ok(fut) => { - let (channel, handle, job_id) = account.job_executor.spawn_specialized(fut); + let handle = account.job_executor.spawn_specialized(fut); account.insert_job( - job_id, + handle.job_id, JobRequest::Generic { name: "message moving".into(), handle, - channel, on_finish: None, logging_level: melib::LoggingLevel::INFO, }, diff --git a/src/components/mail/listing/compact.rs b/src/components/mail/listing/compact.rs index 91d6342c9..7b9109523 100644 --- a/src/components/mail/listing/compact.rs +++ b/src/components/mail/listing/compact.rs @@ -22,7 +22,7 @@ use super::EntryStrings; use super::*; use crate::components::utilities::PageMovement; -use crate::jobs::{oneshot, JobId}; +use crate::jobs::JoinHandle; use std::cmp; use std::convert::TryInto; use std::iter::FromIterator; @@ -136,16 +136,8 @@ pub struct CompactListing { rows_drawn: SegmentTree, rows: Vec<((usize, (ThreadHash, EnvelopeHash)), EntryStrings)>, - search_job: Option<( - String, - oneshot::Receiver>>, - JobId, - )>, - select_job: Option<( - String, - oneshot::Receiver>>, - JobId, - )>, + search_job: Option<(String, JoinHandle>>)>, + select_job: Option<(String, JoinHandle>>)>, filter_term: String, filtered_selection: Vec, filtered_order: HashMap, @@ -1698,13 +1690,10 @@ impl Component for CompactListing { self.cursor_pos.1, ) { Ok(job) => { - let (chan, handle, job_id) = context.accounts[&self.cursor_pos.0] + let handle = context.accounts[&self.cursor_pos.0] .job_executor .spawn_specialized(job); - context.accounts[&self.cursor_pos.0] - .active_jobs - .insert(job_id, crate::conf::accounts::JobRequest::Search(handle)); - self.search_job = Some((filter_term.to_string(), chan, job_id)); + self.search_job = Some((filter_term.to_string(), handle)); } Err(err) => { context.replies.push_back(UIEvent::Notification( @@ -1723,16 +1712,13 @@ impl Component for CompactListing { self.cursor_pos.1, ) { Ok(job) => { - let (mut chan, handle, job_id) = context.accounts[&self.cursor_pos.0] + let mut handle = context.accounts[&self.cursor_pos.0] .job_executor .spawn_specialized(job); - if let Ok(Some(search_result)) = try_recv_timeout!(&mut chan) { + if let Ok(Some(search_result)) = try_recv_timeout!(&mut handle.chan) { self.select(search_term, search_result, context); } else { - context.accounts[&self.cursor_pos.0] - .active_jobs - .insert(job_id, crate::conf::accounts::JobRequest::Search(handle)); - self.select_job = Some((search_term.to_string(), chan, job_id)); + self.select_job = Some((search_term.to_string(), handle)); } } Err(err) => { @@ -1749,11 +1735,11 @@ impl Component for CompactListing { if self .search_job .as_ref() - .map(|(_, _, j)| j == job_id) + .map(|(_, j)| j == job_id) .unwrap_or(false) => { - let (filter_term, mut rcvr, _job_id) = self.search_job.take().unwrap(); - let results = rcvr.try_recv().unwrap().unwrap(); + let (filter_term, mut handle) = self.search_job.take().unwrap(); + let results = handle.chan.try_recv().unwrap().unwrap(); self.filter(filter_term, results, context); self.set_dirty(true); } @@ -1761,11 +1747,11 @@ impl Component for CompactListing { if self .select_job .as_ref() - .map(|(_, _, j)| j == job_id) + .map(|(_, j)| j == job_id) .unwrap_or(false) => { - let (search_term, mut rcvr, _job_id) = self.select_job.take().unwrap(); - let results = rcvr.try_recv().unwrap().unwrap(); + let (search_term, mut handle) = self.select_job.take().unwrap(); + let results = handle.chan.try_recv().unwrap().unwrap(); self.select(&search_term, results, context); self.set_dirty(true); } diff --git a/src/components/mail/listing/conversations.rs b/src/components/mail/listing/conversations.rs index 4a342d750..af24de61c 100644 --- a/src/components/mail/listing/conversations.rs +++ b/src/components/mail/listing/conversations.rs @@ -21,7 +21,7 @@ use super::*; use crate::components::utilities::PageMovement; -use crate::jobs::{oneshot, JobId}; +use crate::jobs::JoinHandle; use std::iter::FromIterator; macro_rules! row_attr { @@ -104,11 +104,7 @@ pub struct ConversationsListing { /// Cache current view. content: CellBuffer, - search_job: Option<( - String, - oneshot::Receiver>>, - JobId, - )>, + search_job: Option<(String, JoinHandle>>)>, filter_term: String, filtered_selection: Vec, filtered_order: HashMap, @@ -1555,13 +1551,10 @@ impl Component for ConversationsListing { self.cursor_pos.1, ) { Ok(job) => { - let (chan, handle, job_id) = context.accounts[&self.cursor_pos.0] + let handle = context.accounts[&self.cursor_pos.0] .job_executor .spawn_specialized(job); - context.accounts[&self.cursor_pos.0] - .active_jobs - .insert(job_id, crate::conf::accounts::JobRequest::Search(handle)); - self.search_job = Some((filter_term.to_string(), chan, job_id)); + self.search_job = Some((filter_term.to_string(), handle)); } Err(err) => { context.replies.push_back(UIEvent::Notification( @@ -1601,11 +1594,11 @@ impl Component for ConversationsListing { if self .search_job .as_ref() - .map(|(_, _, j)| j == job_id) + .map(|(_, j)| j == job_id) .unwrap_or(false) => { - let (filter_term, mut rcvr, _job_id) = self.search_job.take().unwrap(); - let results = rcvr.try_recv().unwrap().unwrap(); + let (filter_term, mut handle) = self.search_job.take().unwrap(); + let results = handle.chan.try_recv().unwrap().unwrap(); self.filter(filter_term, results, context); self.set_dirty(true); } diff --git a/src/components/mail/listing/plain.rs b/src/components/mail/listing/plain.rs index 7730e653f..326dbb485 100644 --- a/src/components/mail/listing/plain.rs +++ b/src/components/mail/listing/plain.rs @@ -22,7 +22,7 @@ use super::EntryStrings; use super::*; use crate::components::utilities::PageMovement; -use crate::jobs::{oneshot, JobId, JoinHandle}; +use crate::jobs::{JobId, JoinHandle}; use std::cmp; use std::iter::FromIterator; @@ -133,11 +133,7 @@ pub struct PlainListing { /// Cache current view. data_columns: DataColumns, - search_job: Option<( - String, - oneshot::Receiver>>, - JobId, - )>, + search_job: Option<(String, JoinHandle>>)>, filter_term: String, filtered_selection: Vec, filtered_order: HashMap, @@ -155,7 +151,7 @@ pub struct PlainListing { _row_updates: SmallVec<[ThreadHash; 8]>, color_cache: ColorCache, - active_jobs: HashMap>)>, + active_jobs: HashMap>>, movement: Option, id: ComponentId, } @@ -1074,8 +1070,8 @@ impl PlainListing { ))); } Ok(fut) => { - let (rcvr, handle, job_id) = account.job_executor.spawn_specialized(fut); - self.active_jobs.insert(job_id, (handle, rcvr)); + let handle = account.job_executor.spawn_specialized(fut); + self.active_jobs.insert(handle.job_id, handle); } } self.row_updates.push(env_hash); @@ -1337,12 +1333,10 @@ impl Component for PlainListing { self.cursor_pos.1, ) { Ok(job) => { - let (chan, handle, job_id) = context.accounts[&self.cursor_pos.0] + let handle = context.accounts[&self.cursor_pos.0] .job_executor .spawn_specialized(job); - context.accounts[&self.cursor_pos.0] - .insert_job(job_id, crate::conf::accounts::JobRequest::Search(handle)); - self.search_job = Some((filter_term.to_string(), chan, job_id)); + self.search_job = Some((filter_term.to_string(), handle)); } Err(err) => { context.replies.push_back(UIEvent::Notification( @@ -1358,11 +1352,11 @@ impl Component for PlainListing { if self .search_job .as_ref() - .map(|(_, _, j)| j == job_id) + .map(|(_, j)| j == job_id) .unwrap_or(false) => { - let (filter_term, mut rcvr, _job_id) = self.search_job.take().unwrap(); - let results = rcvr.try_recv().unwrap().unwrap(); + let (filter_term, mut handle) = self.search_job.take().unwrap(); + let results = handle.chan.try_recv().unwrap().unwrap(); self.filter(filter_term, results, context); self.set_dirty(true); } diff --git a/src/components/mail/status.rs b/src/components/mail/status.rs index ece3af215..bd409103d 100644 --- a/src/components/mail/status.rs +++ b/src/components/mail/status.rs @@ -580,11 +580,14 @@ impl Component for AccountStatus { None, ); if let JobRequest::DeleteMailbox { mailbox_hash, .. } - | JobRequest::SetMailboxPermissions(mailbox_hash, _, _) - | JobRequest::SetMailboxSubscription(mailbox_hash, _, _) - | JobRequest::CopyTo(mailbox_hash, _, _) - | JobRequest::Refresh(mailbox_hash, _, _) - | JobRequest::Fetch(mailbox_hash, _, _) = req + | JobRequest::SetMailboxPermissions { mailbox_hash, .. } + | JobRequest::SetMailboxSubscription { mailbox_hash, .. } + | JobRequest::CopyTo { + dest_mailbox_hash: mailbox_hash, + .. + } + | JobRequest::Refresh { mailbox_hash, .. } + | JobRequest::Fetch { mailbox_hash, .. } = req { write_string_to_grid( a.mailbox_entries[mailbox_hash].name(), diff --git a/src/components/mail/view.rs b/src/components/mail/view.rs index c7cae0fc8..62a03302b 100644 --- a/src/components/mail/view.rs +++ b/src/components/mail/view.rs @@ -21,7 +21,7 @@ use super::*; use crate::conf::accounts::JobRequest; -use crate::jobs::{oneshot, JobId}; +use crate::jobs::{JobId, JoinHandle}; use melib::email::attachment_types::ContentType; use melib::list_management; use melib::parser::BytesExt; @@ -105,8 +105,7 @@ pub enum AttachmentDisplay { SignedPending { inner: Attachment, display: Vec, - chan: - std::result::Result>, oneshot::Receiver>>>, + handle: std::result::Result>, JoinHandle>>>, job_id: JobId, }, SignedFailed { @@ -125,8 +124,7 @@ pub enum AttachmentDisplay { }, EncryptedPending { inner: Attachment, - chan: oneshot::Receiver)>>, - job_id: JobId, + handle: JoinHandle)>>, }, EncryptedFailed { inner: Attachment, @@ -177,8 +175,7 @@ enum MailViewState { pending_action: Option, }, LoadingBody { - job_id: JobId, - chan: oneshot::Receiver>>, + handle: JoinHandle>>, pending_action: Option, }, Error { @@ -282,8 +279,8 @@ impl MailView { .and_then(|mut op| op.as_bytes()) { Ok(fut) => { - let (mut chan, handle, job_id) = - account.job_executor.spawn_specialized(fut); + let mut handle = account.job_executor.spawn_specialized(fut); + let job_id = handle.job_id; pending_action = if let MailViewState::Init { ref mut pending_action, } = self.state @@ -292,7 +289,7 @@ impl MailView { } else { None }; - if let Ok(Some(bytes_result)) = try_recv_timeout!(&mut chan) { + if let Ok(Some(bytes_result)) = try_recv_timeout!(&mut handle.chan) { match bytes_result { Ok(bytes) => { if account @@ -333,12 +330,10 @@ impl MailView { } } else { self.state = MailViewState::LoadingBody { - job_id, - chan, + handle, pending_action: pending_action.take(), }; self.active_jobs.insert(job_id); - account.insert_job(job_id, JobRequest::AsBytes(handle)); } } Err(err) => { @@ -357,10 +352,13 @@ impl MailView { ); match job { Ok(fut) => { - let (rcvr, handle, job_id) = account.job_executor.spawn_specialized(fut); + let handle = account.job_executor.spawn_specialized(fut); account.insert_job( - job_id, - JobRequest::SetFlags(self.coordinates.2.into(), handle, rcvr), + handle.job_id, + JobRequest::SetFlags { + env_hashes: self.coordinates.2.into(), + handle, + }, ); } Err(e) => { @@ -439,7 +437,7 @@ impl MailView { SignedPending { inner: _, display, - chan: _, + handle: _, job_id: _, } => { acc.push_str("Waiting for signature verification.\n\n"); @@ -512,7 +510,7 @@ impl MailView { | SignedPending { inner, display: _, - chan: _, + handle: _, job_id: _, } | SignedUnverified { inner, display: _ } @@ -526,11 +524,7 @@ impl MailView { display: _, description: _, } - | EncryptedPending { - inner, - chan: _, - job_id: _, - } + | EncryptedPending { inner, handle: _ } | EncryptedFailed { inner, error: _ } | EncryptedSuccess { inner: _, @@ -663,9 +657,8 @@ impl MailView { a.clone(), Some(bin.to_string()), ); - let (chan, _handle, job_id) = - context.job_executor.spawn_blocking(verify_fut); - active_jobs.insert(job_id); + let handle = context.job_executor.spawn_blocking(verify_fut); + active_jobs.insert(handle.job_id); acc.push(AttachmentDisplay::SignedPending { inner: a.clone(), display: { @@ -673,8 +666,8 @@ impl MailView { rec(&parts[0], context, coordinates, &mut v, active_jobs); v }, - chan: Err(chan), - job_id, + job_id: handle.job_id, + handle: Err(handle), }); } else { #[cfg(not(feature = "gpgme"))] @@ -705,11 +698,12 @@ impl MailView { ctx.verify(sig, data) }) { Ok(verify_fut) => { - let (chan, _handle, job_id) = + let handle = context.job_executor.spawn_specialized(verify_fut); - active_jobs.insert(job_id); + active_jobs.insert(handle.job_id); acc.push(AttachmentDisplay::SignedPending { inner: a.clone(), + job_id: handle.job_id, display: { let mut v = vec![]; rec( @@ -721,8 +715,7 @@ impl MailView { ); v }, - chan: Ok(chan), - job_id, + handle: Ok(handle), }); } Err(error) => { @@ -774,13 +767,12 @@ impl MailView { Some(bin.to_string()), None, ); - let (chan, _handle, job_id) = + let handle = context.job_executor.spawn_blocking(decrypt_fut); - active_jobs.insert(job_id); + active_jobs.insert(handle.job_id); acc.push(AttachmentDisplay::EncryptedPending { inner: a.clone(), - chan, - job_id, + handle, }); } else { #[cfg(not(feature = "gpgme"))] @@ -796,14 +788,13 @@ impl MailView { ctx.decrypt(cipher) }) { Ok(decrypt_fut) => { - let (chan, _handle, job_id) = context + let handle = context .job_executor .spawn_specialized(decrypt_fut); - active_jobs.insert(job_id); + active_jobs.insert(handle.job_id); acc.push(AttachmentDisplay::EncryptedPending { inner: a.clone(), - chan, - job_id, + handle, }); } Err(error) => { @@ -875,7 +866,7 @@ impl MailView { | SignedPending { inner, display: _, - chan: _, + handle: _, job_id: _, } | SignedFailed { @@ -889,11 +880,7 @@ impl MailView { description: _, } | SignedUnverified { inner, display: _ } - | EncryptedPending { - inner, - chan: _, - job_id: _, - } + | EncryptedPending { inner, handle: _ } | EncryptedFailed { inner, error: _ } | EncryptedSuccess { inner: _, @@ -1482,11 +1469,10 @@ impl Component for MailView { { match self.state { MailViewState::LoadingBody { - job_id: ref id, - ref mut chan, + ref mut handle, pending_action: _, - } if job_id == id => { - let bytes_result = chan.try_recv().unwrap().unwrap(); + } if handle.job_id == *job_id => { + let bytes_result = handle.chan.try_recv().unwrap().unwrap(); match bytes_result { Ok(bytes) => { if context.accounts[&self.coordinates.0] @@ -1537,14 +1523,19 @@ impl Component for MailView { match d { AttachmentDisplay::SignedPending { inner, - chan, + handle, display, - job_id: id, - } if id == job_id => { + job_id: our_job_id, + } if *our_job_id == *job_id => { caught = true; self.initialised = false; - match chan { - Ok(chan) => match chan.try_recv().unwrap().unwrap() { + match handle.as_mut() { + Ok(handle) => match handle + .chan + .try_recv() + .unwrap() + .unwrap() + { Ok(()) => { *d = AttachmentDisplay::SignedVerified { inner: std::mem::replace( @@ -1566,7 +1557,12 @@ impl Component for MailView { }; } }, - Err(chan) => match chan.try_recv().unwrap().unwrap() { + Err(handle) => match handle + .chan + .try_recv() + .unwrap() + .unwrap() + { Ok(verify_bytes) => { *d = AttachmentDisplay::SignedVerified { inner: std::mem::replace( @@ -1593,14 +1589,12 @@ impl Component for MailView { }, } } - AttachmentDisplay::EncryptedPending { - inner, - chan, - job_id: id, - } if id == job_id => { + AttachmentDisplay::EncryptedPending { inner, handle } + if handle.job_id == *job_id => + { caught = true; self.initialised = false; - match chan.try_recv().unwrap().unwrap() { + match handle.chan.try_recv().unwrap().unwrap() { Ok((metadata, decrypted_bytes)) => { let plaintext = AttachmentBuilder::new(&decrypted_bytes) @@ -1700,7 +1694,7 @@ impl Component for MailView { let _ = sender.send(operation?.as_bytes()?.await); Ok(()) }; - let (channel, handle, job_id) = if context.accounts[&account_hash] + let handle = if context.accounts[&account_hash] .backend_capabilities .is_async { @@ -1713,11 +1707,10 @@ impl Component for MailView { .spawn_blocking(bytes_job) }; context.accounts[&account_hash].insert_job( - job_id, + handle.job_id, crate::conf::accounts::JobRequest::Generic { name: "fetch envelope".into(), handle, - channel, on_finish: Some(CallbackFn(Box::new(move |context: &mut Context| { let result = receiver.try_recv().unwrap().unwrap(); match result.and_then(|bytes| { diff --git a/src/conf/accounts.rs b/src/conf/accounts.rs index dc0346737..c93a5f2d1 100644 --- a/src/conf/accounts.rs +++ b/src/conf/accounts.rs @@ -24,7 +24,7 @@ */ use super::{AccountConf, FileMailboxConf}; -use crate::jobs::{JobChannel, JobExecutor, JobId, JoinHandle}; +use crate::jobs::{JobExecutor, JobId, JoinHandle}; use indexmap::IndexMap; use melib::backends::*; use melib::email::*; @@ -40,7 +40,6 @@ use std::collections::{HashMap, HashSet}; use crate::types::UIEvent::{self, EnvelopeRemove, EnvelopeRename, EnvelopeUpdate, Notification}; use crate::{StatusEvent, ThreadEvent}; use crossbeam::Sender; -use futures::channel::oneshot; use futures::future::FutureExt; pub use futures::stream::Stream; use futures::stream::StreamExt; @@ -160,87 +159,112 @@ pub struct Account { } pub enum JobRequest { - Mailboxes( - JoinHandle, - oneshot::Receiver>>, - ), - Fetch( - MailboxHash, - JoinHandle, - oneshot::Receiver<( + Mailboxes { + handle: JoinHandle>>, + }, + Fetch { + mailbox_hash: MailboxHash, + handle: JoinHandle<( Option>>, Pin>> + Send + 'static>>, )>, - ), + }, Generic { name: Cow<'static, str>, logging_level: melib::LoggingLevel, - handle: JoinHandle, - channel: JobChannel<()>, + handle: JoinHandle>, on_finish: Option, }, - IsOnline(JoinHandle, oneshot::Receiver>), - Refresh(MailboxHash, JoinHandle, oneshot::Receiver>), - SetFlags(EnvelopeHashBatch, JoinHandle, oneshot::Receiver>), + IsOnline { + handle: JoinHandle>, + }, + Refresh { + mailbox_hash: MailboxHash, + handle: JoinHandle>, + }, + SetFlags { + env_hashes: EnvelopeHashBatch, + handle: JoinHandle>, + }, SaveMessage { bytes: Vec, mailbox_hash: MailboxHash, - handle: JoinHandle, - channel: oneshot::Receiver>, + handle: JoinHandle>, }, SendMessage, - SendMessageBackground(JoinHandle, JobChannel<()>), - CopyTo(MailboxHash, JoinHandle, oneshot::Receiver>>), - DeleteMessages(EnvelopeHashBatch, JoinHandle, oneshot::Receiver>), + SendMessageBackground { + handle: JoinHandle>, + }, + CopyTo { + dest_mailbox_hash: MailboxHash, + handle: JoinHandle>>, + }, + DeleteMessages { + env_hashes: EnvelopeHashBatch, + handle: JoinHandle>, + }, CreateMailbox { path: String, - handle: JoinHandle, - channel: JobChannel<(MailboxHash, HashMap)>, + handle: JoinHandle)>>, }, DeleteMailbox { mailbox_hash: MailboxHash, - handle: JoinHandle, - channel: JobChannel>, + handle: JoinHandle>>, }, //RenameMailbox, - Search(JoinHandle), - AsBytes(JoinHandle), - SetMailboxPermissions(MailboxHash, JoinHandle, oneshot::Receiver>), - SetMailboxSubscription(MailboxHash, JoinHandle, oneshot::Receiver>), + Search { + handle: JoinHandle>, + }, + AsBytes { + handle: JoinHandle>, + }, + SetMailboxPermissions { + mailbox_hash: MailboxHash, + handle: JoinHandle>, + }, + SetMailboxSubscription { + mailbox_hash: MailboxHash, + handle: JoinHandle>, + }, Watch { - channel: oneshot::Receiver>, - handle: JoinHandle, + handle: JoinHandle>, }, } impl Drop for JobRequest { fn drop(&mut self) { match self { - JobRequest::Generic { handle, .. } => handle.0.cancel(), - JobRequest::Mailboxes(h, _) => h.0.cancel(), - JobRequest::Fetch(_, h, _) => h.0.cancel(), - JobRequest::IsOnline(h, _) => h.0.cancel(), - JobRequest::Refresh(_, h, _) => h.0.cancel(), - JobRequest::SetFlags(_, h, _) => h.0.cancel(), - JobRequest::SaveMessage { handle, .. } => handle.0.cancel(), - JobRequest::CopyTo(_, h, _) => h.0.cancel(), - JobRequest::DeleteMessages(_, h, _) => h.0.cancel(), - JobRequest::CreateMailbox { handle, .. } => handle.0.cancel(), - JobRequest::DeleteMailbox { handle, .. } => handle.0.cancel(), + JobRequest::Generic { handle, .. } | + JobRequest::IsOnline { handle, .. } | + JobRequest::Refresh { handle, .. } | + JobRequest::SetFlags { handle, .. } | + JobRequest::SaveMessage { handle, .. } | //JobRequest::RenameMailbox, - JobRequest::Search(h) => h.0.cancel(), - JobRequest::AsBytes(h) => h.0.cancel(), - JobRequest::SetMailboxPermissions(_, h, _) => { - h.0.cancel(); + JobRequest::Search { handle, .. } | + JobRequest::AsBytes { handle, .. } | + JobRequest::SetMailboxPermissions { handle, .. } | + JobRequest::SetMailboxSubscription { handle, .. } | + JobRequest::Watch { handle, .. } | + JobRequest::SendMessageBackground { handle, .. } => { + handle.cancel(); } - JobRequest::SetMailboxSubscription(_, h, _) => { - h.0.cancel(); + JobRequest::DeleteMessages { handle, .. } => { + handle.cancel(); } - JobRequest::Watch { handle, .. } => handle.0.cancel(), + JobRequest::CreateMailbox { handle, .. } => { + handle.cancel(); + } + JobRequest::DeleteMailbox { handle, .. } => { + handle.cancel(); + } + JobRequest::Fetch { handle, .. } => { + handle.cancel(); + } + JobRequest::Mailboxes { handle, .. } => { + handle.cancel(); + } + JobRequest::CopyTo { handle, .. } => { handle.cancel(); } JobRequest::SendMessage => {} - JobRequest::SendMessageBackground(h, _) => { - h.0.cancel(); - } } } } @@ -249,30 +273,32 @@ impl core::fmt::Debug for JobRequest { fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { match self { JobRequest::Generic { name, .. } => write!(f, "JobRequest::Generic({})", name), - JobRequest::Mailboxes(_, _) => write!(f, "JobRequest::Mailboxes"), - JobRequest::Fetch(hash, _, _) => write!(f, "JobRequest::Fetch({})", hash), - JobRequest::IsOnline(_, _) => write!(f, "JobRequest::IsOnline"), - JobRequest::Refresh(_, _, _) => write!(f, "JobRequest::Refresh"), - JobRequest::SetFlags(_, _, _) => write!(f, "JobRequest::SetFlags"), + JobRequest::Mailboxes { .. } => write!(f, "JobRequest::Mailboxes"), + JobRequest::Fetch { mailbox_hash, .. } => { + write!(f, "JobRequest::Fetch({})", mailbox_hash) + } + JobRequest::IsOnline { .. } => write!(f, "JobRequest::IsOnline"), + JobRequest::Refresh { .. } => write!(f, "JobRequest::Refresh"), + JobRequest::SetFlags { .. } => write!(f, "JobRequest::SetFlags"), JobRequest::SaveMessage { .. } => write!(f, "JobRequest::SaveMessage"), - JobRequest::CopyTo(_, _, _) => write!(f, "JobRequest::CopyTo"), - JobRequest::DeleteMessages(_, _, _) => write!(f, "JobRequest::DeleteMessages"), + JobRequest::CopyTo { .. } => write!(f, "JobRequest::CopyTo"), + JobRequest::DeleteMessages { .. } => write!(f, "JobRequest::DeleteMessages"), JobRequest::CreateMailbox { .. } => write!(f, "JobRequest::CreateMailbox"), JobRequest::DeleteMailbox { mailbox_hash, .. } => { write!(f, "JobRequest::DeleteMailbox({})", mailbox_hash) } //JobRequest::RenameMailbox, - JobRequest::Search(_) => write!(f, "JobRequest::Search"), - JobRequest::AsBytes(_) => write!(f, "JobRequest::AsBytes"), - JobRequest::SetMailboxPermissions(_, _, _) => { + JobRequest::Search { .. } => write!(f, "JobRequest::Search"), + JobRequest::AsBytes { .. } => write!(f, "JobRequest::AsBytes"), + JobRequest::SetMailboxPermissions { .. } => { write!(f, "JobRequest::SetMailboxPermissions") } - JobRequest::SetMailboxSubscription(_, _, _) => { + JobRequest::SetMailboxSubscription { .. } => { write!(f, "JobRequest::SetMailboxSubscription") } JobRequest::Watch { .. } => write!(f, "JobRequest::Watch"), JobRequest::SendMessage => write!(f, "JobRequest::SendMessage"), - JobRequest::SendMessageBackground(_, _) => { + JobRequest::SendMessageBackground { .. } => { write!(f, "JobRequest::SendMessageBackground") } } @@ -283,33 +309,33 @@ impl core::fmt::Display for JobRequest { fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { match self { JobRequest::Generic { name, .. } => write!(f, "{}", name), - JobRequest::Mailboxes(_, _) => write!(f, "Get mailbox list"), - JobRequest::Fetch(_, _, _) => write!(f, "Mailbox fetch"), - JobRequest::IsOnline(_, _) => write!(f, "Online status check"), - JobRequest::Refresh(_, _, _) => write!(f, "Refresh mailbox"), - JobRequest::SetFlags(batch, _, _) => write!( + JobRequest::Mailboxes { .. } => write!(f, "Get mailbox list"), + JobRequest::Fetch { .. } => write!(f, "Mailbox fetch"), + JobRequest::IsOnline { .. } => write!(f, "Online status check"), + JobRequest::Refresh { .. } => write!(f, "Refresh mailbox"), + JobRequest::SetFlags { env_hashes, .. } => write!( f, "Set flags for {} message{}", - batch.len(), - if batch.len() == 1 { "" } else { "s" } + env_hashes.len(), + if env_hashes.len() == 1 { "" } else { "s" } ), JobRequest::SaveMessage { .. } => write!(f, "Save message"), - JobRequest::CopyTo(_, _, _) => write!(f, "Copy message."), - JobRequest::DeleteMessages(batch, _, _) => write!( + JobRequest::CopyTo { .. } => write!(f, "Copy message."), + JobRequest::DeleteMessages { env_hashes, .. } => write!( f, "Delete {} message{}", - batch.len(), - if batch.len() == 1 { "" } else { "s" } + env_hashes.len(), + if env_hashes.len() == 1 { "" } else { "s" } ), JobRequest::CreateMailbox { path, .. } => write!(f, "Create mailbox {}", path), JobRequest::DeleteMailbox { .. } => write!(f, "Delete mailbox"), //JobRequest::RenameMailbox, - JobRequest::Search(_) => write!(f, "Search"), - JobRequest::AsBytes(_) => write!(f, "Message body fetch"), - JobRequest::SetMailboxPermissions(_, _, _) => write!(f, "Set mailbox permissions"), - JobRequest::SetMailboxSubscription(_, _, _) => write!(f, "Set mailbox subscription"), + JobRequest::Search { .. } => write!(f, "Search"), + JobRequest::AsBytes { .. } => write!(f, "Message body fetch"), + JobRequest::SetMailboxPermissions { .. } => write!(f, "Set mailbox permissions"), + JobRequest::SetMailboxSubscription { .. } => write!(f, "Set mailbox subscription"), JobRequest::Watch { .. } => write!(f, "Background watch"), - JobRequest::SendMessageBackground(_, _) | JobRequest::SendMessage => { + JobRequest::SendMessageBackground { .. } | JobRequest::SendMessage => { write!(f, "Sending message") } } @@ -326,14 +352,16 @@ impl JobRequest { pub fn is_fetch(&self, mailbox_hash: MailboxHash) -> bool { match self { - JobRequest::Fetch(h, _, _) if *h == mailbox_hash => true, + JobRequest::Fetch { + mailbox_hash: h, .. + } if *h == mailbox_hash => true, _ => false, } } pub fn is_online(&self) -> bool { match self { - JobRequest::IsOnline(_, _) => true, + JobRequest::IsOnline { .. } => true, _ => false, } } @@ -457,12 +485,13 @@ impl Account { let mut active_job_instants = BTreeMap::default(); if let Ok(mailboxes_job) = backend.mailboxes() { if let Ok(online_job) = backend.is_online() { - let (rcvr, handle, job_id) = if backend.capabilities().is_async { + let handle = if backend.capabilities().is_async { job_executor.spawn_specialized(online_job.then(|_| mailboxes_job)) } else { job_executor.spawn_blocking(online_job.then(|_| mailboxes_job)) }; - active_jobs.insert(job_id, JobRequest::Mailboxes(handle, rcvr)); + let job_id = handle.job_id; + active_jobs.insert(job_id, JobRequest::Mailboxes { handle }); active_job_instants.insert(std::time::Instant::now(), job_id); sender .send(ThreadEvent::UIEvent(UIEvent::StatusEvent( @@ -625,18 +654,24 @@ impl Account { entry.status = MailboxStatus::Parsing(0, total); if let Ok(mailbox_job) = self.backend.write().unwrap().fetch(*h) { let mailbox_job = mailbox_job.into_future(); - let (rcvr, handle, job_id) = if self.backend_capabilities.is_async { + let handle = if self.backend_capabilities.is_async { self.job_executor.spawn_specialized(mailbox_job) } else { self.job_executor.spawn_blocking(mailbox_job) }; + let job_id = handle.job_id; self.sender .send(ThreadEvent::UIEvent(UIEvent::StatusEvent( StatusEvent::NewJob(job_id), ))) .unwrap(); - self.active_jobs - .insert(job_id, JobRequest::Fetch(*h, handle, rcvr)); + self.active_jobs.insert( + job_id, + JobRequest::Fetch { + mailbox_hash: *h, + handle, + }, + ); self.active_job_instants .insert(std::time::Instant::now(), job_id); } @@ -695,10 +730,9 @@ impl Account { ); } Ok(job) => { - let (channel, handle, job_id) = - self.job_executor.spawn_blocking(job); + let handle = self.job_executor.spawn_blocking(job); self.insert_job( - job_id, + handle.job_id, JobRequest::Generic { name: format!( "Update envelope {} in sqlite3 cache", @@ -706,7 +740,6 @@ impl Account { ) .into(), handle, - channel, logging_level: melib::LoggingLevel::TRACE, on_finish: None, }, @@ -743,10 +776,9 @@ impl Account { ) }) { Ok(job) => { - let (channel, handle, job_id) = - self.job_executor.spawn_blocking(job); + let handle = self.job_executor.spawn_blocking(job); self.insert_job( - job_id, + handle.job_id, JobRequest::Generic { name: format!( "Update envelope {} in sqlite3 cache", @@ -755,7 +787,6 @@ impl Account { ) .into(), handle, - channel, logging_level: melib::LoggingLevel::TRACE, on_finish: None, }, @@ -803,10 +834,9 @@ impl Account { ); } Ok(job) => { - let (channel, handle, job_id) = - self.job_executor.spawn_blocking(job); + let handle = self.job_executor.spawn_blocking(job); self.insert_job( - job_id, + handle.job_id, JobRequest::Generic { name: format!( "Update envelope {} in sqlite3 cache", @@ -815,7 +845,6 @@ impl Account { ) .into(), handle, - channel, logging_level: melib::LoggingLevel::TRACE, on_finish: None, }, @@ -845,14 +874,13 @@ impl Account { }; #[cfg(feature = "sqlite3")] if self.settings.conf.search_backend == crate::conf::SearchBackend::Sqlite3 { - let (channel, handle, job_id) = - self.job_executor.spawn_blocking(crate::sqlite3::insert( - (*envelope).clone(), - self.backend.clone(), - self.name.clone(), - )); + let handle = self.job_executor.spawn_blocking(crate::sqlite3::insert( + (*envelope).clone(), + self.backend.clone(), + self.name.clone(), + )); self.insert_job( - job_id, + handle.job_id, JobRequest::Generic { name: format!( "Update envelope {} in sqlite3 cache", @@ -860,7 +888,6 @@ impl Account { ) .into(), handle, - channel, logging_level: melib::LoggingLevel::TRACE, on_finish: None, }, @@ -1006,12 +1033,18 @@ impl Account { } let refresh_job = self.backend.write().unwrap().refresh(mailbox_hash); if let Ok(refresh_job) = refresh_job { - let (rcvr, handle, job_id) = if self.backend_capabilities.is_async { + let handle = if self.backend_capabilities.is_async { self.job_executor.spawn_specialized(refresh_job) } else { self.job_executor.spawn_blocking(refresh_job) }; - self.insert_job(job_id, JobRequest::Refresh(mailbox_hash, handle, rcvr)); + self.insert_job( + handle.job_id, + JobRequest::Refresh { + mailbox_hash, + handle, + }, + ); } Ok(()) } @@ -1024,13 +1057,13 @@ impl Account { if !self.active_jobs.values().any(|j| j.is_watch()) { match self.backend.read().unwrap().watch() { Ok(fut) => { - let (channel, handle, job_id) = if self.backend_capabilities.is_async { + let handle = if self.backend_capabilities.is_async { self.job_executor.spawn_specialized(fut) } else { self.job_executor.spawn_blocking(fut) }; self.active_jobs - .insert(job_id, JobRequest::Watch { channel, handle }); + .insert(handle.job_id, JobRequest::Watch { handle }); } Err(e) => { self.sender @@ -1096,12 +1129,18 @@ impl Account { match mailbox_job { Ok(mailbox_job) => { let mailbox_job = mailbox_job.into_future(); - let (rcvr, handle, job_id) = if self.backend_capabilities.is_async { + let handle = if self.backend_capabilities.is_async { self.job_executor.spawn_specialized(mailbox_job) } else { self.job_executor.spawn_blocking(mailbox_job) }; - self.insert_job(job_id, JobRequest::Fetch(mailbox_hash, handle, rcvr)); + self.insert_job( + handle.job_id, + JobRequest::Fetch { + mailbox_hash, + handle, + }, + ); } Err(err) => { self.mailbox_entries @@ -1192,18 +1231,17 @@ impl Account { .unwrap() .save(bytes.to_vec(), mailbox_hash, flags)?; - let (channel, handle, job_id) = if self.backend_capabilities.is_async { + let handle = if self.backend_capabilities.is_async { self.job_executor.spawn_specialized(job) } else { self.job_executor.spawn_blocking(job) }; self.insert_job( - job_id, + handle.job_id, JobRequest::SaveMessage { bytes: bytes.to_vec(), mailbox_hash, handle, - channel, }, ); Ok(()) @@ -1214,7 +1252,7 @@ impl Account { message: String, send_mail: crate::conf::composing::SendMail, complete_in_background: bool, - ) -> Result)>> { + ) -> Result>>> { use crate::conf::composing::SendMail; use std::io::Write; use std::process::{Command, Stdio}; @@ -1262,18 +1300,18 @@ impl Account { } #[cfg(feature = "smtp")] SendMail::Smtp(conf) => { - let (chan, handle, job_id) = self.job_executor.spawn_specialized(async move { + let handle = self.job_executor.spawn_specialized(async move { let mut smtp_connection = melib::smtp::SmtpConnection::new_connection(conf).await?; smtp_connection.mail_transaction(&message, None).await }); if complete_in_background { - self.insert_job(job_id, JobRequest::SendMessageBackground(handle, chan)); + self.insert_job(handle.job_id, JobRequest::SendMessageBackground { handle }); return Ok(None); } else { - self.insert_job(job_id, JobRequest::SendMessage); + self.insert_job(handle.job_id, JobRequest::SendMessage); } - Ok(Some((job_id, handle, chan))) + Ok(Some(handle)) } } } @@ -1381,19 +1419,12 @@ impl Account { .write() .unwrap() .create_mailbox(path.to_string())?; - let (channel, handle, job_id) = if self.backend_capabilities.is_async { + let handle = if self.backend_capabilities.is_async { self.job_executor.spawn_specialized(job) } else { self.job_executor.spawn_blocking(job) }; - self.insert_job( - job_id, - JobRequest::CreateMailbox { - path, - handle, - channel, - }, - ); + self.insert_job(handle.job_id, JobRequest::CreateMailbox { path, handle }); Ok(()) } MailboxOperation::Delete(path) => { @@ -1403,17 +1434,16 @@ impl Account { let mailbox_hash = self.mailbox_by_path(&path)?; let job = self.backend.write().unwrap().delete_mailbox(mailbox_hash)?; - let (channel, handle, job_id) = if self.backend_capabilities.is_async { + let handle = if self.backend_capabilities.is_async { self.job_executor.spawn_specialized(job) } else { self.job_executor.spawn_blocking(job) }; self.insert_job( - job_id, + handle.job_id, JobRequest::DeleteMailbox { mailbox_hash, handle, - channel, }, ); Ok(()) @@ -1498,12 +1528,12 @@ impl Account { if !self.active_jobs.values().any(JobRequest::is_online) { let online_job = self.backend.read().unwrap().is_online(); if let Ok(online_job) = online_job { - let (rcvr, handle, job_id) = if self.backend_capabilities.is_async { + let handle = if self.backend_capabilities.is_async { self.job_executor.spawn_specialized(online_job) } else { self.job_executor.spawn_blocking(online_job) }; - self.insert_job(job_id, JobRequest::IsOnline(handle, rcvr)); + self.insert_job(handle.job_id, JobRequest::IsOnline { handle }); } } return self.is_online.clone(); @@ -1563,8 +1593,8 @@ impl Account { if self.active_jobs.contains_key(job_id) { match self.active_jobs.remove(job_id).unwrap() { - JobRequest::Mailboxes(_, ref mut chan) => { - if let Some(mailboxes) = chan.try_recv().unwrap() { + JobRequest::Mailboxes { ref mut handle } => { + if let Some(mailboxes) = handle.chan.try_recv().unwrap() { self.sender .send(ThreadEvent::UIEvent(UIEvent::AccountStatusChange( self.hash, @@ -1584,19 +1614,23 @@ impl Account { } let mailboxes_job = self.backend.read().unwrap().mailboxes(); if let Ok(mailboxes_job) = mailboxes_job { - let (rcvr, handle, job_id) = if self.backend_capabilities.is_async { + let handle = if self.backend_capabilities.is_async { self.job_executor.spawn_specialized(mailboxes_job) } else { self.job_executor.spawn_blocking(mailboxes_job) }; - self.insert_job(job_id, JobRequest::Mailboxes(handle, rcvr)); + self.insert_job(handle.job_id, JobRequest::Mailboxes { handle }); }; } } } - JobRequest::Fetch(mailbox_hash, _, ref mut chan) => { + JobRequest::Fetch { + mailbox_hash, + ref mut handle, + .. + } => { let (payload, rest): (Option>>, _) = - chan.try_recv().unwrap().unwrap(); + handle.chan.try_recv().unwrap().unwrap(); debug!("got payload in status for {}", mailbox_hash); if payload.is_none() { debug!("finished in status for {}", mailbox_hash); @@ -1613,12 +1647,18 @@ impl Account { .unwrap(); return true; } - let (rcvr, handle, job_id) = if self.backend_capabilities.is_async { + let handle = if self.backend_capabilities.is_async { self.job_executor.spawn_specialized(rest.into_future()) } else { self.job_executor.spawn_blocking(rest.into_future()) }; - self.insert_job(job_id, JobRequest::Fetch(mailbox_hash, handle, rcvr)); + self.insert_job( + handle.job_id, + JobRequest::Fetch { + mailbox_hash, + handle, + }, + ); let payload = payload.unwrap(); if let Err(err) = payload { self.sender @@ -1663,8 +1703,8 @@ impl Account { )))) .unwrap(); } - JobRequest::IsOnline(_, ref mut chan) => { - let is_online = chan.try_recv().unwrap(); + JobRequest::IsOnline { ref mut handle, .. } => { + let is_online = handle.chan.try_recv().unwrap(); if let Some(is_online) = is_online { self.sender .send(ThreadEvent::UIEvent(UIEvent::AccountStatusChange( @@ -1689,16 +1729,16 @@ impl Account { } let online_job = self.backend.read().unwrap().is_online(); if let Ok(online_job) = online_job { - let (rcvr, handle, job_id) = if self.backend_capabilities.is_async { + let handle = if self.backend_capabilities.is_async { self.job_executor.spawn_specialized(online_job) } else { self.job_executor.spawn_blocking(online_job) }; - self.insert_job(job_id, JobRequest::IsOnline(handle, rcvr)); + self.insert_job(handle.job_id, JobRequest::IsOnline { handle }); }; } - JobRequest::Refresh(_mailbox_hash, _, ref mut chan) => { - let r = chan.try_recv().unwrap(); + JobRequest::Refresh { ref mut handle, .. } => { + let r = handle.chan.try_recv().unwrap(); match r { Some(Ok(())) => { if self.is_online.is_err() @@ -1731,13 +1771,12 @@ impl Account { if !err.kind.is_authentication() { let online_job = self.backend.read().unwrap().is_online(); if let Ok(online_job) = online_job { - let (rcvr, handle, job_id) = - if self.backend_capabilities.is_async { - self.job_executor.spawn_specialized(online_job) - } else { - self.job_executor.spawn_blocking(online_job) - }; - self.insert_job(job_id, JobRequest::IsOnline(handle, rcvr)); + let handle = if self.backend_capabilities.is_async { + self.job_executor.spawn_specialized(online_job) + } else { + self.job_executor.spawn_blocking(online_job) + }; + self.insert_job(handle.job_id, JobRequest::IsOnline { handle }); }; } self.is_online = Err(err); @@ -1750,8 +1789,8 @@ impl Account { None => {} } } - JobRequest::SetFlags(_, _, ref mut chan) => { - let r = chan.try_recv().unwrap(); + JobRequest::SetFlags { ref mut handle, .. } => { + let r = handle.chan.try_recv().unwrap(); if let Some(Err(err)) = r { self.sender .send(ThreadEvent::UIEvent(UIEvent::Notification( @@ -1763,11 +1802,11 @@ impl Account { } } JobRequest::SaveMessage { - ref mut channel, + ref mut handle, ref bytes, .. } => { - let r = channel.try_recv().unwrap(); + let r = handle.chan.try_recv().unwrap(); if let Some(Err(err)) = r { melib::log(format!("Could not save message: {}", err), melib::ERROR); let file = crate::types::create_temp_file(bytes, None, None, false); @@ -1792,8 +1831,8 @@ impl Account { } } JobRequest::SendMessage => {} - JobRequest::SendMessageBackground(_, ref mut chan) => { - let r = chan.try_recv().unwrap(); + JobRequest::SendMessageBackground { ref mut handle, .. } => { + let r = handle.chan.try_recv().unwrap(); if let Some(Err(err)) = r { self.sender .send(ThreadEvent::UIEvent(UIEvent::Notification( @@ -1804,8 +1843,13 @@ impl Account { .expect("Could not send event on main channel"); } } - JobRequest::CopyTo(mailbox_hash, _, ref mut chan) => { - if let Err(err) = chan + JobRequest::CopyTo { + dest_mailbox_hash: mailbox_hash, + ref mut handle, + .. + } => { + if let Err(err) = handle + .chan .try_recv() .unwrap() .unwrap() @@ -1820,8 +1864,8 @@ impl Account { .expect("Could not send event on main channel"); } } - JobRequest::DeleteMessages(_, _, ref mut chan) => { - let r = chan.try_recv().unwrap(); + JobRequest::DeleteMessages { ref mut handle, .. } => { + let r = handle.chan.try_recv().unwrap(); if let Some(Err(err)) = r { self.sender .send(ThreadEvent::UIEvent(UIEvent::Notification( @@ -1834,10 +1878,10 @@ impl Account { } JobRequest::CreateMailbox { ref path, - ref mut channel, + ref mut handle, .. } => { - let r = channel.try_recv().unwrap(); + let r = handle.chan.try_recv().unwrap(); if let Some(r) = r { match r { Err(err) => { @@ -1918,10 +1962,10 @@ impl Account { } JobRequest::DeleteMailbox { mailbox_hash, - ref mut channel, + ref mut handle, .. } => { - let r = channel.try_recv().unwrap(); + let r = handle.chan.try_recv().unwrap(); match r { Some(Err(err)) => { self.sender @@ -1991,9 +2035,9 @@ impl Account { } } //JobRequest::RenameMailbox, - JobRequest::Search(_) | JobRequest::AsBytes(_) => {} - JobRequest::SetMailboxPermissions(_, _, ref mut chan) => { - let r = chan.try_recv().unwrap(); + JobRequest::Search { .. } | JobRequest::AsBytes { .. } => {} + JobRequest::SetMailboxPermissions { ref mut handle, .. } => { + let r = handle.chan.try_recv().unwrap(); match r { Some(Err(err)) => { self.sender @@ -2022,8 +2066,8 @@ impl Account { None => {} } } - JobRequest::SetMailboxSubscription(_, _, ref mut chan) => { - let r = chan.try_recv().unwrap(); + JobRequest::SetMailboxSubscription { ref mut handle, .. } => { + let r = handle.chan.try_recv().unwrap(); match r { Some(Err(err)) => { self.sender @@ -2052,12 +2096,9 @@ impl Account { None => {} } } - JobRequest::Watch { - ref mut channel, - handle: _, - } => { + JobRequest::Watch { ref mut handle } => { debug!("JobRequest::Watch finished??? "); - let r = channel.try_recv().unwrap(); + let r = handle.chan.try_recv().unwrap(); debug!("JobRequest::Watch {:?}", r); if let Some(Err(err)) = r { if err.kind.is_timeout() { @@ -2076,12 +2117,11 @@ impl Account { } JobRequest::Generic { ref name, - ref mut channel, - handle: _, + ref mut handle, ref mut on_finish, logging_level, } => { - let r = channel.try_recv().unwrap(); + let r = handle.chan.try_recv().unwrap(); match r { Some(Err(err)) => { self.sender diff --git a/src/jobs.rs b/src/jobs.rs index d82eff35e..63e0fcb4a 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -30,9 +30,7 @@ use melib::error::Result; use melib::smol; use std::future::Future; use std::panic::catch_unwind; -use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; use std::thread; use std::time::Duration; use uuid::Uuid; @@ -159,37 +157,8 @@ impl JobExecutor { ret } - /// Spawns a future on the executor. - pub fn spawn(&self, future: F) -> (JoinHandle, JobId) - where - F: Future> + Send + 'static, - { - let job_id = JobId::new(); - let finished_sender = self.sender.clone(); - let injector = self.global_queue.clone(); - // Create a task and schedule it for execution. - let (task, handle) = async_task::spawn( - async move { - let r = future.await; - finished_sender - .send(ThreadEvent::JobFinished(job_id)) - .unwrap(); - r - }, - move |task| injector.push(MeliTask { task, id: job_id }), - (), - ); - task.schedule(); - for unparker in self.parkers.iter() { - unparker.unpark(); - } - - // Return a join handle that retrieves the output of the future. - (JoinHandle(handle), job_id) - } - /// Spawns a future with a generic return value `R` - pub fn spawn_specialized(&self, future: F) -> (oneshot::Receiver, JoinHandle, JobId) + pub fn spawn_specialized(&self, future: F) -> JoinHandle where F: Future + Send + 'static, R: Send + 'static, @@ -216,11 +185,15 @@ impl JobExecutor { unparker.unpark(); } - (receiver, JoinHandle(handle), job_id) + JoinHandle { + inner: handle, + chan: receiver, + job_id, + } } /// Spawns a future with a generic return value `R` that might block on a new thread - pub fn spawn_blocking(&self, future: F) -> (oneshot::Receiver, JoinHandle, JobId) + pub fn spawn_blocking(&self, future: F) -> JoinHandle where F: Future + Send + 'static, R: Send + 'static, @@ -229,19 +202,39 @@ impl JobExecutor { } } -pub type JobChannel = oneshot::Receiver>; +pub type JobChannel = oneshot::Receiver; #[derive(Debug)] /// JoinHandle for the future that allows us to cancel the task. -pub struct JoinHandle(pub async_task::JoinHandle, ()>); +pub struct JoinHandle { + pub inner: async_task::JoinHandle, ()>, + pub chan: JobChannel, + pub job_id: JobId, +} +impl JoinHandle { + pub fn cancel(&self) { + self.inner.cancel() + } +} + +impl std::cmp::PartialEq for JoinHandle { + fn eq(&self, other: &JobId) -> bool { + self.job_id == *other + } +} + +/* +use std::pin::Pin; +use std::task::{Context, Poll}; impl Future for JoinHandle { type Output = Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match Pin::new(&mut self.0).poll(cx) { + match Pin::new(&mut self.inner).poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(output) => Poll::Ready(output.expect("task failed")), } } } +*/ diff --git a/src/state.rs b/src/state.rs index 0dff0a188..2d0fbee88 100644 --- a/src/state.rs +++ b/src/state.rs @@ -873,14 +873,12 @@ impl State { } match crate::sqlite3::index(&mut self.context, account_index) { Ok(job) => { - let (channel, handle, job_id) = - self.context.job_executor.spawn_blocking(job); + let handle = self.context.job_executor.spawn_blocking(job); self.context.accounts[account_index].active_jobs.insert( - job_id, + handle.job_id, crate::conf::accounts::JobRequest::Generic { name: "Message index rebuild".into(), handle, - channel, on_finish: None, logging_level: melib::LoggingLevel::INFO, },