JobRequest: add more variants

async
Manos Pitsidianakis 2020-06-29 00:18:24 +03:00
parent 42419327f8
commit 21051fa862
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
3 changed files with 241 additions and 21 deletions

View File

@ -22,6 +22,7 @@
use super::EntryStrings;
use super::*;
use crate::components::utilities::PageMovement;
use crate::jobs1::{oneshot, JobId, JoinHandle};
use std::cmp;
use std::iter::FromIterator;
@ -74,6 +75,7 @@ pub struct PlainListing {
_row_updates: SmallVec<[ThreadHash; 8]>,
color_cache: ColorCache,
active_jobs: HashMap<JobId, oneshot::Receiver<Result<()>>>,
movement: Option<PageMovement>,
id: ComponentId,
}
@ -701,6 +703,7 @@ impl PlainListing {
unfocused: false,
view: MailView::default(),
color_cache: ColorCache::default(),
active_jobs: HashMap::default(),
movement: None,
id: ComponentId::new_v4(),
@ -997,23 +1000,29 @@ impl PlainListing {
fn perform_action(&mut self, context: &mut Context, env_hash: EnvelopeHash, a: &ListingAction) {
let account = &mut context.accounts[self.cursor_pos.0];
let hash = account.collection.get_env(env_hash).hash();
if let Err(e) = account.operation(hash).and_then(|op| {
match account.operation(hash).and_then(|op| {
let mut envelope: EnvelopeRefMut = account.collection.get_env_mut(env_hash);
match a {
ListingAction::SetSeen => envelope.set_seen(op),
ListingAction::SetUnseen => envelope.set_unseen(op),
ListingAction::Delete => {
/* do nothing */
Ok(())
Err(MeliError::new("Delete is unimplemented"))
}
_ => unreachable!(),
}
}) {
context
.replies
.push_back(UIEvent::StatusEvent(StatusEvent::DisplayMessage(
e.to_string(),
)));
Err(e) => {
context
.replies
.push_back(UIEvent::StatusEvent(StatusEvent::DisplayMessage(
e.to_string(),
)));
}
Ok(fut) => {
let (handle, job_id) = account.job_executor.spawn_specialized(fut);
self.active_jobs.insert(job_id, handle);
}
}
self.row_updates.push(env_hash);
}

View File

@ -147,7 +147,21 @@ enum JobRequest {
)>,
),
IsOnline(oneshot::Receiver<Result<()>>),
Refresh(oneshot::Receiver<Result<()>>),
Refresh(MailboxHash, oneshot::Receiver<Result<()>>),
SetFlags(EnvelopeHash, oneshot::Receiver<Result<()>>),
SaveMessage(MailboxHash, oneshot::Receiver<Result<()>>),
DeleteMessage(EnvelopeHash, oneshot::Receiver<Result<()>>),
CreateMailbox(oneshot::Receiver<Result<(MailboxHash, HashMap<MailboxHash, Mailbox>)>>),
DeleteMailbox(oneshot::Receiver<Result<HashMap<MailboxHash, Mailbox>>>),
//RenameMailbox,
Search(
crate::search::Query,
Option<MailboxHash>,
oneshot::Receiver<Result<SmallVec<[EnvelopeHash; 512]>>>,
),
SetMailboxPermissions(MailboxHash, oneshot::Receiver<Result<()>>),
SetMailboxSubscription(MailboxHash, oneshot::Receiver<Result<()>>),
Watch(JoinHandle),
}
impl core::fmt::Debug for JobRequest {
@ -156,8 +170,21 @@ impl core::fmt::Debug for JobRequest {
JobRequest::Mailboxes(_) => write!(f, "{}", "JobRequest::Mailboxes"),
JobRequest::Get(hash, _) => write!(f, "JobRequest::Get({})", hash),
JobRequest::IsOnline(_) => write!(f, "{}", "JobRequest::IsOnline"),
JobRequest::Refresh(_) => write!(f, "{}", "JobRequest::Refresh"),
JobRequest::Refresh(_, _) => write!(f, "{}", "JobRequest::Refresh"),
JobRequest::SetFlags(_, _) => write!(f, "{}", "JobRequest::SetFlags"),
JobRequest::SaveMessage(_, _) => write!(f, "{}", "JobRequest::SaveMessage"),
JobRequest::DeleteMessage(_, _) => write!(f, "{}", "JobRequest::DeleteMessage"),
JobRequest::CreateMailbox(_) => write!(f, "{}", "JobRequest::CreateMailbox"),
JobRequest::DeleteMailbox(_) => write!(f, "{}", "JobRequest::DeleteMailbox"),
//JobRequest::RenameMailbox,
JobRequest::Search(_, _, _) => write!(f, "{}", "JobRequest::Search"),
JobRequest::SetMailboxPermissions(_, _) => {
write!(f, "{}", "JobRequest::SetMailboxPermissions")
}
JobRequest::SetMailboxSubscription(_, _) => {
write!(f, "{}", "JobRequest::SetMailboxSubscription")
}
JobRequest::Watch(_) => write!(f, "{}", "JobRequest::Watch"),
}
}
}
@ -756,13 +783,20 @@ impl Account {
.unwrap();
return Ok(());
}
let sender_ = self.sender.clone();
let r = RefreshEventConsumer::new(Box::new(move |r| {
sender_.send(ThreadEvent::from(r)).unwrap();
}));
let mut h = self.backend.write().unwrap().refresh(mailbox_hash, r)?;
self.work_context.new_work.send(h.work().unwrap()).unwrap();
if self.settings.conf.is_async {
if let Ok(refresh_job) = self.backend.write().unwrap().refresh_async(mailbox_hash, r) {
let (rcvr, job_id) = self.job_executor.spawn_specialized(refresh_job);
self.active_jobs
.insert(job_id, JobRequest::Refresh(mailbox_hash, rcvr));
}
} else {
let mut h = self.backend.write().unwrap().refresh(mailbox_hash, r)?;
self.work_context.new_work.send(h.work().unwrap()).unwrap();
}
Ok(())
}
pub fn watch(&self) {
@ -1426,7 +1460,181 @@ impl Account {
self.active_jobs.insert(job_id, JobRequest::IsOnline(rcvr));
}
}
_ => {}
JobRequest::Refresh(mailbox_hash, mut chan) => {
let r = debug!(chan.try_recv()).unwrap();
if r.is_some() && r.unwrap().is_ok() {
self.is_online = true;
}
}
JobRequest::Refresh(_, mut chan) => {
let r = chan.try_recv().unwrap();
if let Some(Err(err)) = r {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!("{} refresh exited with error", &self.name)),
err.to_string(),
Some(crate::types::NotificationType::ERROR),
)))
.expect("Could not send event on main channel");
}
}
JobRequest::SetFlags(_, mut chan) => {
let r = chan.try_recv().unwrap();
if let Some(Err(err)) = r {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!("{}: could not set flag", &self.name)),
err.to_string(),
Some(crate::types::NotificationType::ERROR),
)))
.expect("Could not send event on main channel");
}
}
JobRequest::SaveMessage(_, mut chan) => {
let r = chan.try_recv().unwrap();
if let Some(Err(err)) = r {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!("{}: could not save message", &self.name)),
err.to_string(),
Some(crate::types::NotificationType::ERROR),
)))
.expect("Could not send event on main channel");
}
}
JobRequest::DeleteMessage(_, mut chan) => {
let r = chan.try_recv().unwrap();
if let Some(Err(err)) = r {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!("{}: could not delete message", &self.name)),
err.to_string(),
Some(crate::types::NotificationType::ERROR),
)))
.expect("Could not send event on main channel");
}
}
JobRequest::CreateMailbox(mut chan) => {
let r = chan.try_recv().unwrap();
if let Some(r) = r {
self.sender
.send(match r {
Err(err) => ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!("{}: could not create mailbox", &self.name)),
err.to_string(),
Some(crate::types::NotificationType::ERROR),
)),
Ok(_) => ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!("Mailbox successfully created.")),
String::new(),
Some(crate::types::NotificationType::INFO),
)),
})
.expect("Could not send event on main channel");
}
}
JobRequest::DeleteMailbox(mut chan) => {
let r = chan.try_recv().unwrap();
match r {
Some(Err(err)) => {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!("{}: could not delete mailbox", &self.name)),
err.to_string(),
Some(crate::types::NotificationType::ERROR),
)))
.expect("Could not send event on main channel");
}
Some(Ok(_)) => {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!("{}: mailbox deleted successfully", &self.name)),
String::new(),
Some(crate::types::NotificationType::INFO),
)))
.expect("Could not send event on main channel");
}
None => {}
}
}
//JobRequest::RenameMailbox,
JobRequest::Search(_, _, mut chan) => {
let r = chan.try_recv().unwrap();
match r {
Some(Err(err)) => {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!("{}: could not perform search", &self.name)),
err.to_string(),
Some(crate::types::NotificationType::ERROR),
)))
.expect("Could not send event on main channel");
}
Some(Ok(v)) => unimplemented!(),
None => {}
}
}
JobRequest::SetMailboxPermissions(_, mut chan) => {
let r = chan.try_recv().unwrap();
match r {
Some(Err(err)) => {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!(
"{}: could not set mailbox permissions",
&self.name
)),
err.to_string(),
Some(crate::types::NotificationType::ERROR),
)))
.expect("Could not send event on main channel");
}
Some(Ok(_)) => {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!(
"{}: mailbox permissions set successfully",
&self.name
)),
String::new(),
Some(crate::types::NotificationType::INFO),
)))
.expect("Could not send event on main channel");
}
None => {}
}
}
JobRequest::SetMailboxSubscription(_, mut chan) => {
let r = chan.try_recv().unwrap();
match r {
Some(Err(err)) => {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!(
"{}: could not set mailbox subscription",
&self.name
)),
err.to_string(),
Some(crate::types::NotificationType::ERROR),
)))
.expect("Could not send event on main channel");
}
Some(Ok(_)) => {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!(
"{}: mailbox subscription set successfully",
&self.name
)),
String::new(),
Some(crate::types::NotificationType::INFO),
)))
.expect("Could not send event on main channel");
}
None => {}
}
}
JobRequest::Watch(_) => {}
}
true
} else {

View File

@ -19,6 +19,7 @@
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use melib::error::Result;
use melib::smol;
use std::future::Future;
use std::panic::catch_unwind;
@ -34,7 +35,7 @@ use crossbeam::channel;
use crossbeam::deque::{Injector, Steal, Stealer, Worker};
use crossbeam::sync::{Parker, Unparker};
use crossbeam::Sender;
use futures::channel::oneshot;
pub use futures::channel::oneshot;
use once_cell::sync::Lazy;
use std::iter;
@ -151,9 +152,9 @@ impl JobExecutor {
ret
}
/// Spawns a future on the executor.
pub fn spawn<F>(&self, future: F) -> JoinHandle
pub fn spawn<F>(&self, future: F) -> (JoinHandle, JobId)
where
F: Future<Output = ()> + Send + 'static,
F: Future<Output = Result<()>> + Send + 'static,
{
let job_id = JobId::new();
let _job_id = job_id.clone();
@ -163,10 +164,11 @@ impl JobExecutor {
// Create a task and schedule it for execution.
let (task, handle) = async_task::spawn(
async move {
let _ = future.await;
let r = future.await;
finished_sender
.send(ThreadEvent::JobFinished(__job_id))
.unwrap();
r
},
move |task| injector.push(MeliTask { task, id: _job_id }),
(),
@ -177,7 +179,7 @@ impl JobExecutor {
}
// Return a join handle that retrieves the output of the future.
JoinHandle(handle)
(JoinHandle(handle), job_id)
}
///// Spawns a future on the executor.
@ -227,11 +229,12 @@ impl JobExecutor {
// JoinHandle(handle)
//}
#[derive(Debug)]
/// Awaits the output of a spawned future.
pub struct JoinHandle(async_task::JoinHandle<(), ()>);
pub struct JoinHandle(pub async_task::JoinHandle<Result<()>, ()>);
impl Future for JoinHandle {
type Output = ();
type Output = Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.0).poll(cx) {