jobs: save handle for each Job

If we save the JoinHandle for each task, we can cancel it in future
commits if we have to timeout network requests.
master
Manos Pitsidianakis 2020-07-15 11:02:53 +03:00
parent 08c462801d
commit ddafde7b37
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
8 changed files with 139 additions and 105 deletions

View File

@ -256,12 +256,12 @@ impl Composer {
)); ));
} }
Ok(fut) => { Ok(fut) => {
let (mut rcvr, job_id) = context.accounts[coordinates.0] let (mut rcvr, handle, job_id) = context.accounts[coordinates.0]
.job_executor .job_executor
.spawn_specialized(fut); .spawn_specialized(fut);
context.accounts[coordinates.0] context.accounts[coordinates.0]
.active_jobs .active_jobs
.insert(job_id, JobRequest::AsBytes); .insert(job_id, JobRequest::AsBytes(handle));
if let Ok(Some(parent_bytes)) = try_recv_timeout!(&mut rcvr) { if let Ok(Some(parent_bytes)) = try_recv_timeout!(&mut rcvr) {
match parent_bytes { match parent_bytes {
Err(err) => { Err(err) => {

View File

@ -176,10 +176,10 @@ pub trait MailListingTrait: ListingTrait {
)); ));
} }
Ok(fut) => { Ok(fut) => {
let (rcvr, job_id) = account.job_executor.spawn_specialized(fut); let (rcvr, handle, job_id) = account.job_executor.spawn_specialized(fut);
account account
.active_jobs .active_jobs
.insert(job_id, JobRequest::SetFlags(env_hash, rcvr)); .insert(job_id, JobRequest::SetFlags(env_hash, handle, rcvr));
} }
}, },
ListingAction::SetUnseen => match envelope.set_unseen(op) { ListingAction::SetUnseen => match envelope.set_unseen(op) {
@ -189,10 +189,10 @@ pub trait MailListingTrait: ListingTrait {
)); ));
} }
Ok(fut) => { Ok(fut) => {
let (rcvr, job_id) = account.job_executor.spawn_specialized(fut); let (rcvr, handle, job_id) = account.job_executor.spawn_specialized(fut);
account account
.active_jobs .active_jobs
.insert(job_id, JobRequest::SetFlags(env_hash, rcvr)); .insert(job_id, JobRequest::SetFlags(env_hash, handle, rcvr));
} }
}, },
ListingAction::Delete => { ListingAction::Delete => {
@ -207,10 +207,11 @@ pub trait MailListingTrait: ListingTrait {
return; return;
} }
Ok(fut) => { Ok(fut) => {
let (rcvr, job_id) = account.job_executor.spawn_specialized(fut); let (rcvr, handle, job_id) =
account.job_executor.spawn_specialized(fut);
account account
.active_jobs .active_jobs
.insert(job_id, JobRequest::DeleteMessage(env_hash, rcvr)); .insert(job_id, JobRequest::DeleteMessage(env_hash, handle, rcvr));
} }
} }
continue; continue;
@ -230,10 +231,12 @@ pub trait MailListingTrait: ListingTrait {
return; return;
} }
Ok(fut) => { Ok(fut) => {
let (rcvr, job_id) = account.job_executor.spawn_specialized(fut); let (rcvr, handle, job_id) =
account account.job_executor.spawn_specialized(fut);
.active_jobs account.active_jobs.insert(
.insert(job_id, JobRequest::SaveMessage(mailbox_hash, rcvr)); job_id,
JobRequest::SaveMessage(mailbox_hash, handle, rcvr),
);
} }
} }
continue; continue;
@ -253,10 +256,11 @@ pub trait MailListingTrait: ListingTrait {
})?; })?;
let account = &mut context.accounts[account_pos]; let account = &mut context.accounts[account_pos];
let mailbox_hash = account.mailbox_by_path(mailbox_path)?; let mailbox_hash = account.mailbox_by_path(mailbox_path)?;
let (rcvr, job_id) = account.job_executor.spawn_specialized(bytes_fut); let (rcvr, handle, job_id) =
account.job_executor.spawn_specialized(bytes_fut);
account account
.active_jobs .active_jobs
.insert(job_id, JobRequest::CopyTo(mailbox_hash, rcvr)); .insert(job_id, JobRequest::CopyTo(mailbox_hash, handle, rcvr));
Ok(()) Ok(())
}) { }) {
context.replies.push_back(UIEvent::Notification( context.replies.push_back(UIEvent::Notification(
@ -283,10 +287,12 @@ pub trait MailListingTrait: ListingTrait {
return; return;
} }
Ok(fut) => { Ok(fut) => {
let (rcvr, job_id) = account.job_executor.spawn_specialized(fut); let (rcvr, handle, job_id) =
account account.job_executor.spawn_specialized(fut);
.active_jobs account.active_jobs.insert(
.insert(job_id, JobRequest::SaveMessage(mailbox_hash, rcvr)); job_id,
JobRequest::SaveMessage(mailbox_hash, handle, rcvr),
);
} }
} }
continue; continue;
@ -306,10 +312,11 @@ pub trait MailListingTrait: ListingTrait {
})?; })?;
let account = &mut context.accounts[account_pos]; let account = &mut context.accounts[account_pos];
let mailbox_hash = account.mailbox_by_path(mailbox_path)?; let mailbox_hash = account.mailbox_by_path(mailbox_path)?;
let (rcvr, job_id) = account.job_executor.spawn_specialized(bytes_fut); let (rcvr, handle, job_id) =
account.job_executor.spawn_specialized(bytes_fut);
account account
.active_jobs .active_jobs
.insert(job_id, JobRequest::CopyTo(mailbox_hash, rcvr)); .insert(job_id, JobRequest::CopyTo(mailbox_hash, handle, rcvr));
Ok(()) Ok(())
}) { }) {
context.replies.push_back(UIEvent::Notification( context.replies.push_back(UIEvent::Notification(
@ -339,10 +346,11 @@ pub trait MailListingTrait: ListingTrait {
return; return;
} }
Ok(fut) => { Ok(fut) => {
let (rcvr, job_id) = account.job_executor.spawn_specialized(fut); let (rcvr, handle, job_id) =
account.job_executor.spawn_specialized(fut);
account account
.active_jobs .active_jobs
.insert(job_id, JobRequest::SetFlags(env_hash, rcvr)); .insert(job_id, JobRequest::SetFlags(env_hash, handle, rcvr));
} }
} }
} }
@ -357,10 +365,11 @@ pub trait MailListingTrait: ListingTrait {
return; return;
} }
Ok(fut) => { Ok(fut) => {
let (rcvr, job_id) = account.job_executor.spawn_specialized(fut); let (rcvr, handle, job_id) =
account.job_executor.spawn_specialized(fut);
account account
.active_jobs .active_jobs
.insert(job_id, JobRequest::SetFlags(env_hash, rcvr)); .insert(job_id, JobRequest::SetFlags(env_hash, handle, rcvr));
} }
} }
} }

View File

@ -1534,12 +1534,12 @@ impl Component for CompactListing {
self.cursor_pos.1, self.cursor_pos.1,
) { ) {
Ok(job) => { Ok(job) => {
let (chan, job_id) = context.accounts[self.cursor_pos.0] let (chan, handle, job_id) = context.accounts[self.cursor_pos.0]
.job_executor .job_executor
.spawn_specialized(job); .spawn_specialized(job);
context.accounts[self.cursor_pos.0] context.accounts[self.cursor_pos.0]
.active_jobs .active_jobs
.insert(job_id.clone(), crate::conf::accounts::JobRequest::Search); .insert(job_id, crate::conf::accounts::JobRequest::Search(handle));
self.search_job = Some((filter_term.to_string(), chan, job_id)); self.search_job = Some((filter_term.to_string(), chan, job_id));
} }
Err(err) => { Err(err) => {

View File

@ -1354,12 +1354,12 @@ impl Component for ConversationsListing {
self.cursor_pos.1, self.cursor_pos.1,
) { ) {
Ok(job) => { Ok(job) => {
let (chan, job_id) = context.accounts[self.cursor_pos.0] let (chan, handle, job_id) = context.accounts[self.cursor_pos.0]
.job_executor .job_executor
.spawn_specialized(job); .spawn_specialized(job);
context.accounts[self.cursor_pos.0] context.accounts[self.cursor_pos.0]
.active_jobs .active_jobs
.insert(job_id, crate::conf::accounts::JobRequest::Search); .insert(job_id, crate::conf::accounts::JobRequest::Search(handle));
self.search_job = Some((filter_term.to_string(), chan, job_id)); self.search_job = Some((filter_term.to_string(), chan, job_id));
} }
Err(err) => { Err(err) => {

View File

@ -22,7 +22,7 @@
use super::EntryStrings; use super::EntryStrings;
use super::*; use super::*;
use crate::components::utilities::PageMovement; use crate::components::utilities::PageMovement;
use crate::jobs::{oneshot, JobId}; use crate::jobs::{oneshot, JobId, JoinHandle};
use std::cmp; use std::cmp;
use std::iter::FromIterator; use std::iter::FromIterator;
@ -80,7 +80,7 @@ pub struct PlainListing {
_row_updates: SmallVec<[ThreadHash; 8]>, _row_updates: SmallVec<[ThreadHash; 8]>,
color_cache: ColorCache, color_cache: ColorCache,
active_jobs: HashMap<JobId, oneshot::Receiver<Result<()>>>, active_jobs: HashMap<JobId, (JoinHandle, oneshot::Receiver<Result<()>>)>,
movement: Option<PageMovement>, movement: Option<PageMovement>,
id: ComponentId, id: ComponentId,
} }
@ -1031,8 +1031,8 @@ impl PlainListing {
))); )));
} }
Ok(fut) => { Ok(fut) => {
let (handle, job_id) = account.job_executor.spawn_specialized(fut); let (rcvr, handle, job_id) = account.job_executor.spawn_specialized(fut);
self.active_jobs.insert(job_id, handle); self.active_jobs.insert(job_id, (handle, rcvr));
} }
} }
self.row_updates.push(env_hash); self.row_updates.push(env_hash);
@ -1284,12 +1284,13 @@ impl Component for PlainListing {
self.cursor_pos.1, self.cursor_pos.1,
) { ) {
Ok(job) => { Ok(job) => {
let (chan, job_id) = context.accounts[self.cursor_pos.0] let (chan, handle, job_id) = context.accounts[self.cursor_pos.0]
.job_executor .job_executor
.spawn_specialized(job); .spawn_specialized(job);
context.accounts[self.cursor_pos.0] context.accounts[self.cursor_pos.0].active_jobs.insert(
.active_jobs job_id,
.insert(job_id.clone(), crate::conf::accounts::JobRequest::Search); crate::conf::accounts::JobRequest::Search(handle),
);
self.search_job = Some((filter_term.to_string(), chan, job_id)); self.search_job = Some((filter_term.to_string(), chan, job_id));
} }
Err(err) => { Err(err) => {

View File

@ -193,13 +193,16 @@ impl MailView {
.and_then(|mut op| op.as_bytes()) .and_then(|mut op| op.as_bytes())
{ {
Ok(fut) => { Ok(fut) => {
let (mut chan, job_id) = account.job_executor.spawn_specialized(fut); let (mut chan, handle, job_id) =
account.job_executor.spawn_specialized(fut);
if let Ok(Some(bytes_result)) = try_recv_timeout!(&mut chan) { if let Ok(Some(bytes_result)) = try_recv_timeout!(&mut chan) {
self.state = MailViewState::Loaded { body: bytes_result }; self.state = MailViewState::Loaded { body: bytes_result };
} else { } else {
self.state = MailViewState::LoadingBody { job_id, chan }; self.state = MailViewState::LoadingBody { job_id, chan };
self.active_jobs.insert(job_id); self.active_jobs.insert(job_id);
account.active_jobs.insert(job_id, JobRequest::AsBytes); account
.active_jobs
.insert(job_id, JobRequest::AsBytes(handle));
context context
.replies .replies
.push_back(UIEvent::StatusEvent(StatusEvent::NewJob(job_id))); .push_back(UIEvent::StatusEvent(StatusEvent::NewJob(job_id)));
@ -218,10 +221,11 @@ impl MailView {
.and_then(|mut op| op.set_flag(Flag::SEEN, true)) .and_then(|mut op| op.set_flag(Flag::SEEN, true))
{ {
Ok(fut) => { Ok(fut) => {
let (rcvr, job_id) = account.job_executor.spawn_specialized(fut); let (rcvr, handle, job_id) = account.job_executor.spawn_specialized(fut);
account account.active_jobs.insert(
.active_jobs job_id,
.insert(job_id, JobRequest::SetFlags(self.coordinates.2, rcvr)); JobRequest::SetFlags(self.coordinates.2, handle, rcvr),
);
} }
Err(e) => { Err(e) => {
context.replies.push_back(UIEvent::StatusEvent( context.replies.push_back(UIEvent::StatusEvent(

View File

@ -157,53 +157,65 @@ pub struct Account {
} }
pub enum JobRequest { pub enum JobRequest {
Mailboxes(oneshot::Receiver<Result<HashMap<MailboxHash, Mailbox>>>), Mailboxes(
JoinHandle,
oneshot::Receiver<Result<HashMap<MailboxHash, Mailbox>>>,
),
Get( Get(
MailboxHash, MailboxHash,
JoinHandle,
oneshot::Receiver<( oneshot::Receiver<(
Option<Result<Vec<Envelope>>>, Option<Result<Vec<Envelope>>>,
Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>, Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>,
)>, )>,
), ),
IsOnline(oneshot::Receiver<Result<()>>), IsOnline(JoinHandle, oneshot::Receiver<Result<()>>),
Refresh(MailboxHash, oneshot::Receiver<Result<()>>), Refresh(MailboxHash, JoinHandle, oneshot::Receiver<Result<()>>),
SetFlags(EnvelopeHash, oneshot::Receiver<Result<()>>), SetFlags(EnvelopeHash, JoinHandle, oneshot::Receiver<Result<()>>),
SaveMessage(MailboxHash, oneshot::Receiver<Result<()>>), SaveMessage(MailboxHash, JoinHandle, oneshot::Receiver<Result<()>>),
CopyTo(MailboxHash, oneshot::Receiver<Result<Vec<u8>>>), SendMessage(JoinHandle, oneshot::Receiver<Result<()>>),
DeleteMessage(EnvelopeHash, oneshot::Receiver<Result<()>>), CopyTo(MailboxHash, JoinHandle, oneshot::Receiver<Result<Vec<u8>>>),
CreateMailbox(oneshot::Receiver<Result<(MailboxHash, HashMap<MailboxHash, Mailbox>)>>), DeleteMessage(EnvelopeHash, JoinHandle, oneshot::Receiver<Result<()>>),
DeleteMailbox(oneshot::Receiver<Result<HashMap<MailboxHash, Mailbox>>>), CreateMailbox(
JoinHandle,
oneshot::Receiver<Result<(MailboxHash, HashMap<MailboxHash, Mailbox>)>>,
),
DeleteMailbox(
JoinHandle,
oneshot::Receiver<Result<HashMap<MailboxHash, Mailbox>>>,
),
//RenameMailbox, //RenameMailbox,
Search, Search(JoinHandle),
AsBytes, AsBytes(JoinHandle),
SetMailboxPermissions(MailboxHash, oneshot::Receiver<Result<()>>), SetMailboxPermissions(MailboxHash, JoinHandle, oneshot::Receiver<Result<()>>),
SetMailboxSubscription(MailboxHash, oneshot::Receiver<Result<()>>), SetMailboxSubscription(MailboxHash, JoinHandle, oneshot::Receiver<Result<()>>),
Watch(JoinHandle), Watch(JoinHandle),
} }
impl core::fmt::Debug for JobRequest { impl core::fmt::Debug for JobRequest {
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
match self { match self {
JobRequest::Mailboxes(_) => write!(f, "JobRequest::Mailboxes"), JobRequest::Mailboxes(_, _) => write!(f, "JobRequest::Mailboxes"),
JobRequest::Get(hash, _) => write!(f, "JobRequest::Get({})", hash), JobRequest::Get(hash, _, _) => write!(f, "JobRequest::Get({})", hash),
JobRequest::IsOnline(_) => write!(f, "JobRequest::IsOnline"), JobRequest::IsOnline(_, _) => write!(f, "JobRequest::IsOnline"),
JobRequest::Refresh(_, _) => write!(f, "JobRequest::Refresh"), JobRequest::Refresh(_, _, _) => write!(f, "JobRequest::Refresh"),
JobRequest::SetFlags(_, _) => write!(f, "JobRequest::SetFlags"), JobRequest::SetFlags(_, _, _) => write!(f, "JobRequest::SetFlags"),
JobRequest::SaveMessage(_, _) => write!(f, "JobRequest::SaveMessage"), JobRequest::SaveMessage(_, _, _) => write!(f, "JobRequest::SaveMessage"),
JobRequest::CopyTo(_, _) => write!(f, "JobRequest::CopyTo"), JobRequest::CopyTo(_, _, _) => write!(f, "JobRequest::CopyTo"),
JobRequest::DeleteMessage(_, _) => write!(f, "JobRequest::DeleteMessage"), JobRequest::DeleteMessage(_, _, _) => write!(f, "JobRequest::DeleteMessage"),
JobRequest::CreateMailbox(_) => write!(f, "JobRequest::CreateMailbox"), JobRequest::CreateMailbox(_, _) => write!(f, "JobRequest::CreateMailbox"),
JobRequest::DeleteMailbox(_) => write!(f, "JobRequest::DeleteMailbox"), JobRequest::DeleteMailbox(_, _) => write!(f, "JobRequest::DeleteMailbox"),
//JobRequest::RenameMailbox, //JobRequest::RenameMailbox,
JobRequest::Search => write!(f, "JobRequest::Search"), JobRequest::Search(_) => write!(f, "JobRequest::Search"),
JobRequest::AsBytes => write!(f, "JobRequest::AsBytes"), JobRequest::AsBytes(_) => write!(f, "JobRequest::AsBytes"),
JobRequest::SetMailboxPermissions(_, _) => { JobRequest::SetMailboxPermissions(_, _, _) => {
write!(f, "JobRequest::SetMailboxPermissions") write!(f, "JobRequest::SetMailboxPermissions")
} }
JobRequest::SetMailboxSubscription(_, _) => { JobRequest::SetMailboxSubscription(_, _, _) => {
write!(f, "JobRequest::SetMailboxSubscription") write!(f, "JobRequest::SetMailboxSubscription")
} }
JobRequest::Watch(_) => write!(f, "JobRequest::Watch"), JobRequest::Watch(_) => write!(f, "JobRequest::Watch"),
JobRequest::SendMessage(_, _) => write!(f, "JobRequest::SendMessage"),
} }
} }
} }
@ -218,14 +230,14 @@ impl JobRequest {
fn is_get(&self, mailbox_hash: MailboxHash) -> bool { fn is_get(&self, mailbox_hash: MailboxHash) -> bool {
match self { match self {
JobRequest::Get(h, _) if *h == mailbox_hash => true, JobRequest::Get(h, _, _) if *h == mailbox_hash => true,
_ => false, _ => false,
} }
} }
fn is_online(&self) -> bool { fn is_online(&self) -> bool {
match self { match self {
JobRequest::IsOnline(_) => true, JobRequest::IsOnline(_, _) => true,
_ => false, _ => false,
} }
} }
@ -338,9 +350,9 @@ impl Account {
if backend.is_async() { if backend.is_async() {
if let Ok(mailboxes_job) = backend.mailboxes_async() { if let Ok(mailboxes_job) = backend.mailboxes_async() {
if let Ok(online_job) = backend.is_online_async() { if let Ok(online_job) = backend.is_online_async() {
let (rcvr, job_id) = let (rcvr, handle, job_id) =
job_executor.spawn_specialized(online_job.then(|_| mailboxes_job)); job_executor.spawn_specialized(online_job.then(|_| mailboxes_job));
active_jobs.insert(job_id, JobRequest::Mailboxes(rcvr)); active_jobs.insert(job_id, JobRequest::Mailboxes(handle, rcvr));
} }
} }
} }
@ -511,13 +523,15 @@ impl Account {
if self.is_async { if self.is_async {
if let Ok(mailbox_job) = self.backend.write().unwrap().get_async(&f) { if let Ok(mailbox_job) = self.backend.write().unwrap().get_async(&f) {
let mailbox_job = mailbox_job.into_future(); let mailbox_job = mailbox_job.into_future();
let (rcvr, job_id) = self.job_executor.spawn_specialized(mailbox_job); let (rcvr, handle, job_id) =
self.job_executor.spawn_specialized(mailbox_job);
self.sender self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent( .send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
StatusEvent::NewJob(job_id), StatusEvent::NewJob(job_id),
))) )))
.unwrap(); .unwrap();
self.active_jobs.insert(job_id, JobRequest::Get(*h, rcvr)); self.active_jobs
.insert(job_id, JobRequest::Get(*h, handle, rcvr));
} }
} else { } else {
entry.worker = match Account::new_worker( entry.worker = match Account::new_worker(
@ -872,14 +886,14 @@ impl Account {
})); }));
if self.is_async { if self.is_async {
if let Ok(refresh_job) = self.backend.write().unwrap().refresh_async(mailbox_hash, r) { if let Ok(refresh_job) = self.backend.write().unwrap().refresh_async(mailbox_hash, r) {
let (rcvr, job_id) = self.job_executor.spawn_specialized(refresh_job); let (rcvr, handle, job_id) = self.job_executor.spawn_specialized(refresh_job);
self.sender self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent( .send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
StatusEvent::NewJob(job_id), StatusEvent::NewJob(job_id),
))) )))
.unwrap(); .unwrap();
self.active_jobs self.active_jobs
.insert(job_id, JobRequest::Refresh(mailbox_hash, rcvr)); .insert(job_id, JobRequest::Refresh(mailbox_hash, handle, rcvr));
} }
} else { } else {
let mut h = self.backend.write().unwrap().refresh(mailbox_hash, r)?; let mut h = self.backend.write().unwrap().refresh(mailbox_hash, r)?;
@ -994,7 +1008,7 @@ impl Account {
) { ) {
Ok(mailbox_job) => { Ok(mailbox_job) => {
let mailbox_job = mailbox_job.into_future(); let mailbox_job = mailbox_job.into_future();
let (rcvr, job_id) = let (rcvr, handle, job_id) =
self.job_executor.spawn_specialized(mailbox_job); self.job_executor.spawn_specialized(mailbox_job);
self.sender self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent( .send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
@ -1003,7 +1017,7 @@ impl Account {
.unwrap(); .unwrap();
self.active_jobs.insert( self.active_jobs.insert(
job_id, job_id,
JobRequest::Get(mailbox_hash, rcvr), JobRequest::Get(mailbox_hash, handle, rcvr),
); );
} }
Err(err) => { Err(err) => {
@ -1196,14 +1210,14 @@ impl Account {
.write() .write()
.unwrap() .unwrap()
.save(bytes.to_vec(), mailbox_hash, flags)?; .save(bytes.to_vec(), mailbox_hash, flags)?;
let (rcvr, job_id) = self.job_executor.spawn_specialized(job); let (rcvr, handle, job_id) = self.job_executor.spawn_specialized(job);
self.sender self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent( .send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
StatusEvent::NewJob(job_id), StatusEvent::NewJob(job_id),
))) )))
.unwrap(); .unwrap();
self.active_jobs self.active_jobs
.insert(job_id, JobRequest::SaveMessage(mailbox_hash, rcvr)); .insert(job_id, JobRequest::SaveMessage(mailbox_hash, handle, rcvr));
Ok(()) Ok(())
} }
@ -1419,8 +1433,9 @@ impl Account {
} }
if !self.active_jobs.values().any(JobRequest::is_online) { if !self.active_jobs.values().any(JobRequest::is_online) {
if let Ok(online_job) = self.backend.read().unwrap().is_online_async() { if let Ok(online_job) = self.backend.read().unwrap().is_online_async() {
let (rcvr, job_id) = self.job_executor.spawn_specialized(online_job); let (rcvr, handle, job_id) = self.job_executor.spawn_specialized(online_job);
self.active_jobs.insert(job_id, JobRequest::IsOnline(rcvr)); self.active_jobs
.insert(job_id, JobRequest::IsOnline(handle, rcvr));
} }
} }
return self.is_online.clone(); return self.is_online.clone();
@ -1506,20 +1521,21 @@ impl Account {
pub fn process_event(&mut self, job_id: &JobId) -> bool { pub fn process_event(&mut self, job_id: &JobId) -> bool {
if self.active_jobs.contains_key(job_id) { if self.active_jobs.contains_key(job_id) {
match self.active_jobs.remove(job_id).unwrap() { match self.active_jobs.remove(job_id).unwrap() {
JobRequest::Mailboxes(mut chan) => { JobRequest::Mailboxes(_, mut chan) => {
if let Some(mailboxes) = chan.try_recv().unwrap() { if let Some(mailboxes) = chan.try_recv().unwrap() {
if mailboxes.is_err() || self.init(Some(mailboxes.unwrap())).is_err() { if mailboxes.is_err() || self.init(Some(mailboxes.unwrap())).is_err() {
if let Ok(mailboxes_job) = if let Ok(mailboxes_job) =
self.backend.read().unwrap().mailboxes_async() self.backend.read().unwrap().mailboxes_async()
{ {
let (rcvr, job_id) = let (rcvr, handle, job_id) =
self.job_executor.spawn_specialized(mailboxes_job); self.job_executor.spawn_specialized(mailboxes_job);
self.active_jobs.insert(job_id, JobRequest::Mailboxes(rcvr)); self.active_jobs
.insert(job_id, JobRequest::Mailboxes(handle, rcvr));
} }
} }
} }
} }
JobRequest::Get(mailbox_hash, mut chan) => { JobRequest::Get(mailbox_hash, _, mut chan) => {
self.sender self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent( .send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
StatusEvent::JobFinished(*job_id), StatusEvent::JobFinished(*job_id),
@ -1545,14 +1561,15 @@ impl Account {
.unwrap(); .unwrap();
return true; return true;
} }
let (rcvr, job_id) = self.job_executor.spawn_specialized(rest.into_future()); let (rcvr, handle, job_id) =
self.job_executor.spawn_specialized(rest.into_future());
self.sender self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent( .send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
StatusEvent::NewJob(job_id), StatusEvent::NewJob(job_id),
))) )))
.unwrap(); .unwrap();
self.active_jobs self.active_jobs
.insert(job_id, JobRequest::Get(mailbox_hash, rcvr)); .insert(job_id, JobRequest::Get(mailbox_hash, handle, rcvr));
let payload = payload.unwrap(); let payload = payload.unwrap();
if let Err(err) = payload { if let Err(err) = payload {
self.mailbox_entries self.mailbox_entries
@ -1592,7 +1609,7 @@ impl Account {
)))) ))))
.unwrap(); .unwrap();
} }
JobRequest::IsOnline(mut chan) => { JobRequest::IsOnline(_, mut chan) => {
let is_online = chan.try_recv().unwrap(); let is_online = chan.try_recv().unwrap();
if let Some(is_online) = is_online { if let Some(is_online) = is_online {
self.sender self.sender
@ -1610,11 +1627,13 @@ impl Account {
self.is_online = is_online; self.is_online = is_online;
} }
if let Ok(online_job) = self.backend.read().unwrap().is_online_async() { if let Ok(online_job) = self.backend.read().unwrap().is_online_async() {
let (rcvr, job_id) = self.job_executor.spawn_specialized(online_job); let (rcvr, handle, job_id) =
self.active_jobs.insert(job_id, JobRequest::IsOnline(rcvr)); self.job_executor.spawn_specialized(online_job);
self.active_jobs
.insert(job_id, JobRequest::IsOnline(handle, rcvr));
} }
} }
JobRequest::Refresh(_mailbox_hash, mut chan) => { JobRequest::Refresh(_mailbox_hash, _, mut chan) => {
let r = chan.try_recv().unwrap(); let r = chan.try_recv().unwrap();
if let Some(r) = r { if let Some(r) = r {
if r.is_ok() { if r.is_ok() {
@ -1635,7 +1654,7 @@ impl Account {
))) )))
.unwrap(); .unwrap();
} }
JobRequest::SetFlags(_, mut chan) => { JobRequest::SetFlags(_, _, mut chan) => {
let r = chan.try_recv().unwrap(); let r = chan.try_recv().unwrap();
if let Some(Err(err)) = r { if let Some(Err(err)) = r {
self.sender self.sender
@ -1647,7 +1666,7 @@ impl Account {
.expect("Could not send event on main channel"); .expect("Could not send event on main channel");
} }
} }
JobRequest::SaveMessage(_, mut chan) => { JobRequest::SaveMessage(_, _, mut chan) => {
let r = chan.try_recv().unwrap(); let r = chan.try_recv().unwrap();
if let Some(Err(err)) = r { if let Some(Err(err)) = r {
self.sender self.sender
@ -1659,7 +1678,7 @@ impl Account {
.expect("Could not send event on main channel"); .expect("Could not send event on main channel");
} }
} }
JobRequest::CopyTo(mailbox_hash, mut chan) => { JobRequest::CopyTo(mailbox_hash, _, mut chan) => {
if let Err(err) = chan if let Err(err) = chan
.try_recv() .try_recv()
.unwrap() .unwrap()
@ -1675,7 +1694,7 @@ impl Account {
.expect("Could not send event on main channel"); .expect("Could not send event on main channel");
} }
} }
JobRequest::DeleteMessage(_, mut chan) => { JobRequest::DeleteMessage(_, _, mut chan) => {
let r = chan.try_recv().unwrap(); let r = chan.try_recv().unwrap();
if let Some(Err(err)) = r { if let Some(Err(err)) = r {
self.sender self.sender
@ -1687,7 +1706,7 @@ impl Account {
.expect("Could not send event on main channel"); .expect("Could not send event on main channel");
} }
} }
JobRequest::CreateMailbox(mut chan) => { JobRequest::CreateMailbox(_, mut chan) => {
let r = chan.try_recv().unwrap(); let r = chan.try_recv().unwrap();
if let Some(r) = r { if let Some(r) = r {
self.sender self.sender
@ -1706,7 +1725,7 @@ impl Account {
.expect("Could not send event on main channel"); .expect("Could not send event on main channel");
} }
} }
JobRequest::DeleteMailbox(mut chan) => { JobRequest::DeleteMailbox(_, mut chan) => {
let r = chan.try_recv().unwrap(); let r = chan.try_recv().unwrap();
match r { match r {
Some(Err(err)) => { Some(Err(err)) => {
@ -1731,14 +1750,14 @@ impl Account {
} }
} }
//JobRequest::RenameMailbox, //JobRequest::RenameMailbox,
JobRequest::Search | JobRequest::AsBytes => { JobRequest::Search(_) | JobRequest::AsBytes(_) => {
self.sender self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent( .send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
StatusEvent::JobFinished(*job_id), StatusEvent::JobFinished(*job_id),
))) )))
.unwrap(); .unwrap();
} }
JobRequest::SetMailboxPermissions(_, mut chan) => { JobRequest::SetMailboxPermissions(_, _, mut chan) => {
let r = chan.try_recv().unwrap(); let r = chan.try_recv().unwrap();
match r { match r {
Some(Err(err)) => { Some(Err(err)) => {
@ -1768,7 +1787,7 @@ impl Account {
None => {} None => {}
} }
} }
JobRequest::SetMailboxSubscription(_, mut chan) => { JobRequest::SetMailboxSubscription(_, _, mut chan) => {
let r = chan.try_recv().unwrap(); let r = chan.try_recv().unwrap();
match r { match r {
Some(Err(err)) => { Some(Err(err)) => {

View File

@ -183,7 +183,7 @@ impl JobExecutor {
} }
///// Spawns a future on the executor. ///// Spawns a future on the executor.
pub fn spawn_specialized<F, R>(&self, future: F) -> (oneshot::Receiver<R>, JobId) pub fn spawn_specialized<F, R>(&self, future: F) -> (oneshot::Receiver<R>, JoinHandle, JobId)
where where
F: Future<Output = R> + Send + 'static, F: Future<Output = R> + Send + 'static,
R: Send + 'static, R: Send + 'static,
@ -195,13 +195,14 @@ impl JobExecutor {
let __job_id = job_id; let __job_id = job_id;
let injector = self.global_queue.clone(); let injector = self.global_queue.clone();
// Create a task and schedule it for execution. // Create a task and schedule it for execution.
let (task, _) = async_task::spawn( let (task, handle) = async_task::spawn(
async move { async move {
let res = future.await; let res = future.await;
let _ = sender.send(res); let _ = sender.send(res);
finished_sender finished_sender
.send(ThreadEvent::JobFinished(__job_id)) .send(ThreadEvent::JobFinished(__job_id))
.unwrap(); .unwrap();
Ok(())
}, },
move |task| injector.push(MeliTask { task, id: _job_id }), move |task| injector.push(MeliTask { task, id: _job_id }),
(), (),
@ -211,7 +212,7 @@ impl JobExecutor {
unparker.unpark(); unparker.unpark();
} }
(receiver, job_id) (receiver, JoinHandle(handle), job_id)
} }
} }