diff --git a/src/components/mail/listing/plain.rs b/src/components/mail/listing/plain.rs index 675fa8b6..e6677847 100644 --- a/src/components/mail/listing/plain.rs +++ b/src/components/mail/listing/plain.rs @@ -22,6 +22,7 @@ use super::EntryStrings; use super::*; use crate::components::utilities::PageMovement; +use crate::jobs1::{oneshot, JobId, JoinHandle}; use std::cmp; use std::iter::FromIterator; @@ -74,6 +75,7 @@ pub struct PlainListing { _row_updates: SmallVec<[ThreadHash; 8]>, color_cache: ColorCache, + active_jobs: HashMap>>, movement: Option, id: ComponentId, } @@ -701,6 +703,7 @@ impl PlainListing { unfocused: false, view: MailView::default(), color_cache: ColorCache::default(), + active_jobs: HashMap::default(), movement: None, id: ComponentId::new_v4(), @@ -997,23 +1000,29 @@ impl PlainListing { fn perform_action(&mut self, context: &mut Context, env_hash: EnvelopeHash, a: &ListingAction) { let account = &mut context.accounts[self.cursor_pos.0]; let hash = account.collection.get_env(env_hash).hash(); - if let Err(e) = account.operation(hash).and_then(|op| { + match account.operation(hash).and_then(|op| { let mut envelope: EnvelopeRefMut = account.collection.get_env_mut(env_hash); match a { ListingAction::SetSeen => envelope.set_seen(op), ListingAction::SetUnseen => envelope.set_unseen(op), ListingAction::Delete => { /* do nothing */ - Ok(()) + Err(MeliError::new("Delete is unimplemented")) } _ => unreachable!(), } }) { - context - .replies - .push_back(UIEvent::StatusEvent(StatusEvent::DisplayMessage( - e.to_string(), - ))); + Err(e) => { + context + .replies + .push_back(UIEvent::StatusEvent(StatusEvent::DisplayMessage( + e.to_string(), + ))); + } + Ok(fut) => { + let (handle, job_id) = account.job_executor.spawn_specialized(fut); + self.active_jobs.insert(job_id, handle); + } } self.row_updates.push(env_hash); } diff --git a/src/conf/accounts.rs b/src/conf/accounts.rs index fe873678..fc2540b4 100644 --- a/src/conf/accounts.rs +++ b/src/conf/accounts.rs @@ -147,7 +147,21 @@ enum JobRequest { )>, ), IsOnline(oneshot::Receiver>), - Refresh(oneshot::Receiver>), + Refresh(MailboxHash, oneshot::Receiver>), + SetFlags(EnvelopeHash, oneshot::Receiver>), + SaveMessage(MailboxHash, oneshot::Receiver>), + DeleteMessage(EnvelopeHash, oneshot::Receiver>), + CreateMailbox(oneshot::Receiver)>>), + DeleteMailbox(oneshot::Receiver>>), + //RenameMailbox, + Search( + crate::search::Query, + Option, + oneshot::Receiver>>, + ), + SetMailboxPermissions(MailboxHash, oneshot::Receiver>), + SetMailboxSubscription(MailboxHash, oneshot::Receiver>), + Watch(JoinHandle), } impl core::fmt::Debug for JobRequest { @@ -156,8 +170,21 @@ impl core::fmt::Debug for JobRequest { JobRequest::Mailboxes(_) => write!(f, "{}", "JobRequest::Mailboxes"), JobRequest::Get(hash, _) => write!(f, "JobRequest::Get({})", hash), JobRequest::IsOnline(_) => write!(f, "{}", "JobRequest::IsOnline"), - - JobRequest::Refresh(_) => write!(f, "{}", "JobRequest::Refresh"), + JobRequest::Refresh(_, _) => write!(f, "{}", "JobRequest::Refresh"), + JobRequest::SetFlags(_, _) => write!(f, "{}", "JobRequest::SetFlags"), + JobRequest::SaveMessage(_, _) => write!(f, "{}", "JobRequest::SaveMessage"), + JobRequest::DeleteMessage(_, _) => write!(f, "{}", "JobRequest::DeleteMessage"), + JobRequest::CreateMailbox(_) => write!(f, "{}", "JobRequest::CreateMailbox"), + JobRequest::DeleteMailbox(_) => write!(f, "{}", "JobRequest::DeleteMailbox"), + //JobRequest::RenameMailbox, + JobRequest::Search(_, _, _) => write!(f, "{}", "JobRequest::Search"), + JobRequest::SetMailboxPermissions(_, _) => { + write!(f, "{}", "JobRequest::SetMailboxPermissions") + } + JobRequest::SetMailboxSubscription(_, _) => { + write!(f, "{}", "JobRequest::SetMailboxSubscription") + } + JobRequest::Watch(_) => write!(f, "{}", "JobRequest::Watch"), } } } @@ -756,13 +783,20 @@ impl Account { .unwrap(); return Ok(()); } - let sender_ = self.sender.clone(); let r = RefreshEventConsumer::new(Box::new(move |r| { sender_.send(ThreadEvent::from(r)).unwrap(); })); - let mut h = self.backend.write().unwrap().refresh(mailbox_hash, r)?; - self.work_context.new_work.send(h.work().unwrap()).unwrap(); + if self.settings.conf.is_async { + if let Ok(refresh_job) = self.backend.write().unwrap().refresh_async(mailbox_hash, r) { + let (rcvr, job_id) = self.job_executor.spawn_specialized(refresh_job); + self.active_jobs + .insert(job_id, JobRequest::Refresh(mailbox_hash, rcvr)); + } + } else { + let mut h = self.backend.write().unwrap().refresh(mailbox_hash, r)?; + self.work_context.new_work.send(h.work().unwrap()).unwrap(); + } Ok(()) } pub fn watch(&self) { @@ -1426,7 +1460,181 @@ impl Account { self.active_jobs.insert(job_id, JobRequest::IsOnline(rcvr)); } } - _ => {} + JobRequest::Refresh(mailbox_hash, mut chan) => { + let r = debug!(chan.try_recv()).unwrap(); + if r.is_some() && r.unwrap().is_ok() { + self.is_online = true; + } + } + JobRequest::Refresh(_, mut chan) => { + let r = chan.try_recv().unwrap(); + if let Some(Err(err)) = r { + self.sender + .send(ThreadEvent::UIEvent(UIEvent::Notification( + Some(format!("{} refresh exited with error", &self.name)), + err.to_string(), + Some(crate::types::NotificationType::ERROR), + ))) + .expect("Could not send event on main channel"); + } + } + JobRequest::SetFlags(_, mut chan) => { + let r = chan.try_recv().unwrap(); + if let Some(Err(err)) = r { + self.sender + .send(ThreadEvent::UIEvent(UIEvent::Notification( + Some(format!("{}: could not set flag", &self.name)), + err.to_string(), + Some(crate::types::NotificationType::ERROR), + ))) + .expect("Could not send event on main channel"); + } + } + JobRequest::SaveMessage(_, mut chan) => { + let r = chan.try_recv().unwrap(); + if let Some(Err(err)) = r { + self.sender + .send(ThreadEvent::UIEvent(UIEvent::Notification( + Some(format!("{}: could not save message", &self.name)), + err.to_string(), + Some(crate::types::NotificationType::ERROR), + ))) + .expect("Could not send event on main channel"); + } + } + JobRequest::DeleteMessage(_, mut chan) => { + let r = chan.try_recv().unwrap(); + if let Some(Err(err)) = r { + self.sender + .send(ThreadEvent::UIEvent(UIEvent::Notification( + Some(format!("{}: could not delete message", &self.name)), + err.to_string(), + Some(crate::types::NotificationType::ERROR), + ))) + .expect("Could not send event on main channel"); + } + } + JobRequest::CreateMailbox(mut chan) => { + let r = chan.try_recv().unwrap(); + if let Some(r) = r { + self.sender + .send(match r { + Err(err) => ThreadEvent::UIEvent(UIEvent::Notification( + Some(format!("{}: could not create mailbox", &self.name)), + err.to_string(), + Some(crate::types::NotificationType::ERROR), + )), + Ok(_) => ThreadEvent::UIEvent(UIEvent::Notification( + Some(format!("Mailbox successfully created.")), + String::new(), + Some(crate::types::NotificationType::INFO), + )), + }) + .expect("Could not send event on main channel"); + } + } + JobRequest::DeleteMailbox(mut chan) => { + let r = chan.try_recv().unwrap(); + match r { + Some(Err(err)) => { + self.sender + .send(ThreadEvent::UIEvent(UIEvent::Notification( + Some(format!("{}: could not delete mailbox", &self.name)), + err.to_string(), + Some(crate::types::NotificationType::ERROR), + ))) + .expect("Could not send event on main channel"); + } + Some(Ok(_)) => { + self.sender + .send(ThreadEvent::UIEvent(UIEvent::Notification( + Some(format!("{}: mailbox deleted successfully", &self.name)), + String::new(), + Some(crate::types::NotificationType::INFO), + ))) + .expect("Could not send event on main channel"); + } + None => {} + } + } + //JobRequest::RenameMailbox, + JobRequest::Search(_, _, mut chan) => { + let r = chan.try_recv().unwrap(); + match r { + Some(Err(err)) => { + self.sender + .send(ThreadEvent::UIEvent(UIEvent::Notification( + Some(format!("{}: could not perform search", &self.name)), + err.to_string(), + Some(crate::types::NotificationType::ERROR), + ))) + .expect("Could not send event on main channel"); + } + Some(Ok(v)) => unimplemented!(), + None => {} + } + } + JobRequest::SetMailboxPermissions(_, mut chan) => { + let r = chan.try_recv().unwrap(); + match r { + Some(Err(err)) => { + self.sender + .send(ThreadEvent::UIEvent(UIEvent::Notification( + Some(format!( + "{}: could not set mailbox permissions", + &self.name + )), + err.to_string(), + Some(crate::types::NotificationType::ERROR), + ))) + .expect("Could not send event on main channel"); + } + Some(Ok(_)) => { + self.sender + .send(ThreadEvent::UIEvent(UIEvent::Notification( + Some(format!( + "{}: mailbox permissions set successfully", + &self.name + )), + String::new(), + Some(crate::types::NotificationType::INFO), + ))) + .expect("Could not send event on main channel"); + } + None => {} + } + } + JobRequest::SetMailboxSubscription(_, mut chan) => { + let r = chan.try_recv().unwrap(); + match r { + Some(Err(err)) => { + self.sender + .send(ThreadEvent::UIEvent(UIEvent::Notification( + Some(format!( + "{}: could not set mailbox subscription", + &self.name + )), + err.to_string(), + Some(crate::types::NotificationType::ERROR), + ))) + .expect("Could not send event on main channel"); + } + Some(Ok(_)) => { + self.sender + .send(ThreadEvent::UIEvent(UIEvent::Notification( + Some(format!( + "{}: mailbox subscription set successfully", + &self.name + )), + String::new(), + Some(crate::types::NotificationType::INFO), + ))) + .expect("Could not send event on main channel"); + } + None => {} + } + } + JobRequest::Watch(_) => {} } true } else { diff --git a/src/jobs1.rs b/src/jobs1.rs index 4cac95e8..e39d22ff 100644 --- a/src/jobs1.rs +++ b/src/jobs1.rs @@ -19,6 +19,7 @@ * along with meli. If not, see . */ +use melib::error::Result; use melib::smol; use std::future::Future; use std::panic::catch_unwind; @@ -34,7 +35,7 @@ use crossbeam::channel; use crossbeam::deque::{Injector, Steal, Stealer, Worker}; use crossbeam::sync::{Parker, Unparker}; use crossbeam::Sender; -use futures::channel::oneshot; +pub use futures::channel::oneshot; use once_cell::sync::Lazy; use std::iter; @@ -151,9 +152,9 @@ impl JobExecutor { ret } /// Spawns a future on the executor. - pub fn spawn(&self, future: F) -> JoinHandle + pub fn spawn(&self, future: F) -> (JoinHandle, JobId) where - F: Future + Send + 'static, + F: Future> + Send + 'static, { let job_id = JobId::new(); let _job_id = job_id.clone(); @@ -163,10 +164,11 @@ impl JobExecutor { // Create a task and schedule it for execution. let (task, handle) = async_task::spawn( async move { - let _ = future.await; + let r = future.await; finished_sender .send(ThreadEvent::JobFinished(__job_id)) .unwrap(); + r }, move |task| injector.push(MeliTask { task, id: _job_id }), (), @@ -177,7 +179,7 @@ impl JobExecutor { } // Return a join handle that retrieves the output of the future. - JoinHandle(handle) + (JoinHandle(handle), job_id) } ///// Spawns a future on the executor. @@ -227,11 +229,12 @@ impl JobExecutor { // JoinHandle(handle) //} +#[derive(Debug)] /// Awaits the output of a spawned future. -pub struct JoinHandle(async_task::JoinHandle<(), ()>); +pub struct JoinHandle(pub async_task::JoinHandle, ()>); impl Future for JoinHandle { - type Output = (); + type Output = Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match Pin::new(&mut self.0).poll(cx) {