diff --git a/melib/src/async_workers.rs b/melib/src/async_workers.rs deleted file mode 100644 index 7b45c09f..00000000 --- a/melib/src/async_workers.rs +++ /dev/null @@ -1,255 +0,0 @@ -/* - * meli - async module - * - * Copyright 2017 Manos Pitsidianakis - * - * This file is part of meli. - * - * meli is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * meli is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with meli. If not, see . - */ - -/*! - * Primitive Async/Wait implementation. - * - * To create an Async promise, create an AsyncBuilder. Ask for its channel receiver/sender with - * `tx` and `rx` methods to pass them in your worker's closure. Build an `Async` with your - * `JoinHandle`. The thread must communicate with the `Async` object via `AsyncStatus` - * messages. - * - * When `Async` receives `AsyncStatus::Finished` it joins the thread and takes its value which - * can be extracted with `extract`. - */ - -use crossbeam::{ - bounded, - channel::{Receiver, Sender}, - select, -}; -use std::fmt; - -#[derive(Clone, Debug)] -pub struct WorkContext { - pub new_work: Sender, - pub set_name: Sender<(std::thread::ThreadId, String)>, - pub set_status: Sender<(std::thread::ThreadId, String)>, - pub finished: Sender, -} - -pub struct Work { - priority: u64, - pub is_static: bool, - pub closure: Box () + Send + Sync>, -} - -impl Ord for Work { - fn cmp(&self, other: &Work) -> std::cmp::Ordering { - self.priority.cmp(&other.priority) - } -} - -impl PartialOrd for Work { - fn partial_cmp(&self, other: &Work) -> Option { - Some(self.priority.cmp(&other.priority)) - } -} - -impl PartialEq for Work { - fn eq(&self, other: &Work) -> bool { - self.priority == other.priority - } -} - -impl Eq for Work {} - -impl Work { - pub fn compute(self, work_context: WorkContext) { - (self.closure)(work_context); - } -} - -impl fmt::Debug for Work { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Work object") - } -} - -/// Messages to pass between `Async` owner and its worker thread. -#[derive(Clone)] -pub enum AsyncStatus { - NoUpdate, - Payload(T), - Finished, - ///The number may hold whatever meaning the user chooses. - ProgressReport(usize), -} - -impl fmt::Debug for AsyncStatus { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - AsyncStatus::NoUpdate => write!(f, "AsyncStatus::NoUpdate"), - AsyncStatus::Payload(_) => write!(f, "AsyncStatus::Payload(_)"), - AsyncStatus::Finished => write!(f, "AsyncStatus::Finished"), - AsyncStatus::ProgressReport(u) => write!(f, "AsyncStatus::ProgressReport({})", u), - } - } -} - -/// A builder object for `Async` -#[derive(Debug, Clone)] -pub struct AsyncBuilder { - tx: Sender>, - rx: Receiver>, - priority: u64, - is_static: bool, -} - -#[derive(Debug)] -pub struct Async { - work: Option, - active: bool, - tx: Sender>, - rx: Receiver>, -} - -impl Default for AsyncBuilder { - fn default() -> Self { - AsyncBuilder::::new() - } -} - -impl AsyncBuilder -where - T: Send + Sync, -{ - pub fn new() -> Self { - let (sender, receiver) = bounded(8 * ::std::mem::size_of::>()); - AsyncBuilder { - tx: sender, - rx: receiver, - priority: 0, - is_static: false, - } - } - /// Returns the sender object of the promise's channel. - pub fn tx(&mut self) -> Sender> { - self.tx.clone() - } - /// Returns the receiver object of the promise's channel. - pub fn rx(&mut self) -> Receiver> { - self.rx.clone() - } - - pub fn set_priority(&mut self, new_val: u64) -> &mut Self { - self.priority = new_val; - self - } - - pub fn set_is_static(&mut self, new_val: bool) -> &mut Self { - self.is_static = new_val; - self - } - - /// Returns an `Async` object that contains a `Thread` join handle that returns a `T` - pub fn build(self, work: Box () + Send + Sync>) -> Async { - Async { - work: Some(Work { - priority: self.priority, - is_static: self.is_static, - closure: work, - }), - tx: self.tx, - rx: self.rx, - active: false, - } - } -} - -impl Async -where - T: Send + Sync, -{ - pub fn work(&mut self) -> Option { - if !self.active { - self.active = true; - self.work.take() - } else { - None - } - } - /// Returns the sender object of the promise's channel. - pub fn tx(&mut self) -> Sender> { - self.tx.clone() - } - /// Returns the receiver object of the promise's channel. - pub fn rx(&mut self) -> Receiver> { - self.rx.clone() - } - /// Polls worker thread and returns result. - pub fn poll_block(&mut self) -> Result, ()> { - if !self.active { - return Ok(AsyncStatus::Finished); - } - - let rx = &self.rx; - select! { - recv(rx) -> r => { - match r { - Ok(p @ AsyncStatus::Payload(_)) => { - Ok(p) - }, - Ok(f @ AsyncStatus::Finished) => { - self.active = false; - Ok(f) - }, - Ok(a) => { - Ok(a) - } - Err(_) => { - Err(()) - }, - } - }, - } - } - /// Polls worker thread and returns result. - pub fn poll(&mut self) -> Result, ()> { - if !self.active { - return Ok(AsyncStatus::Finished); - } - - let rx = &self.rx; - select! { - default => { - Ok(AsyncStatus::NoUpdate) - }, - recv(rx) -> r => { - match r { - Ok(p @ AsyncStatus::Payload(_)) => { - Ok(p) - }, - Ok(f @ AsyncStatus::Finished) => { - self.active = false; - Ok(f) - }, - Ok(a) => { - Ok(a) - } - Err(_) => { - Err(()) - }, - } - }, - } - } -} diff --git a/melib/src/backends.rs b/melib/src/backends.rs index e6954ec0..62434bc5 100644 --- a/melib/src/backends.rs +++ b/melib/src/backends.rs @@ -49,7 +49,6 @@ pub mod mbox; pub use self::imap::ImapType; #[cfg(feature = "imap_backend")] pub use self::nntp::NntpType; -use crate::async_workers::*; use crate::conf::AccountSettings; use crate::error::{MeliError, Result}; @@ -304,31 +303,23 @@ pub type ResultFuture = Result> + Send pub trait MailBackend: ::std::fmt::Debug + Send + Sync { fn capabilities(&self) -> MailBackendCapabilities; - fn is_online(&self) -> Result<()> { + fn is_online(&self) -> ResultFuture<()> { Err(MeliError::new("Unimplemented.")) } - fn is_online_async(&self) -> ResultFuture<()> { - Err(MeliError::new("Unimplemented.")) - } - fn fetch(&mut self, mailbox_hash: MailboxHash) -> Result>>>; - fn fetch_async( + //fn fetch(&mut self, mailbox_hash: MailboxHash) -> Result>>>; + fn fetch( &mut self, _mailbox_hash: MailboxHash, ) -> Result>> + Send + 'static>>> { Err(MeliError::new("Unimplemented.")) } - fn refresh(&mut self, _mailbox_hash: MailboxHash) -> Result> { + fn refresh(&mut self, _mailbox_hash: MailboxHash) -> ResultFuture<()> { Err(MeliError::new("Unimplemented.")) } - fn refresh_async(&mut self, _mailbox_hash: MailboxHash) -> ResultFuture<()> { + fn watch(&self) -> ResultFuture<()> { Err(MeliError::new("Unimplemented.")) } - fn watch(&self, work_context: WorkContext) -> Result; - fn watch_async(&self) -> ResultFuture<()> { - Err(MeliError::new("Unimplemented.")) - } - fn mailboxes(&self) -> Result>; - fn mailboxes_async(&self) -> ResultFuture> { + fn mailboxes(&self) -> ResultFuture> { Err(MeliError::new("Unimplemented.")) } fn operation(&self, hash: EnvelopeHash) -> Result>; diff --git a/melib/src/backends/imap.rs b/melib/src/backends/imap.rs index a383b6a0..106c1bd5 100644 --- a/melib/src/backends/imap.rs +++ b/melib/src/backends/imap.rs @@ -36,11 +36,11 @@ mod cache; pub mod managesieve; mod untagged; -use crate::async_workers::{Async, WorkContext}; use crate::backends::{ RefreshEventKind::{self, *}, *, }; + use crate::conf::AccountSettings; use crate::connections::timeout; use crate::email::*; @@ -260,7 +260,7 @@ impl MailBackend for ImapType { } } - fn fetch_async( + fn fetch( &mut self, mailbox_hash: MailboxHash, ) -> Result>> + Send + 'static>>> { @@ -289,7 +289,7 @@ impl MailBackend for ImapType { })) } - fn refresh_async(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()> { + fn refresh(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()> { let main_conn = self.connection.clone(); let uid_store = self.uid_store.clone(); Ok(Box::pin(async move { @@ -304,7 +304,7 @@ impl MailBackend for ImapType { })) } - fn mailboxes_async(&self) -> ResultFuture> { + fn mailboxes(&self) -> ResultFuture> { let uid_store = self.uid_store.clone(); let connection = self.connection.clone(); Ok(Box::pin(async move { @@ -356,12 +356,12 @@ impl MailBackend for ImapType { })) } - fn is_online_async(&self) -> ResultFuture<()> { + fn is_online(&self) -> ResultFuture<()> { let connection = self.connection.clone(); Ok(Box::pin(async move { match timeout(std::time::Duration::from_secs(3), connection.lock()).await { Ok(mut conn) => { - debug!("is_online_async"); + debug!("is_online"); match debug!(timeout(std::time::Duration::from_secs(3), conn.connect()).await) { Ok(Ok(())) => Ok(()), Err(err) | Ok(Err(err)) => { @@ -375,20 +375,7 @@ impl MailBackend for ImapType { })) } - fn fetch(&mut self, _mailbox_hash: MailboxHash) -> Result>>> { - Err(MeliError::new("Unimplemented.")) - } - - fn refresh(&mut self, _mailbox_hash: MailboxHash) -> Result> { - Err(MeliError::new("Unimplemented.")) - } - - fn watch(&self, _work_context: WorkContext) -> Result { - Err(MeliError::new("Unimplemented.")) - } - - fn watch_async(&self) -> ResultFuture<()> { - debug!("watch_async called"); + fn watch(&self) -> ResultFuture<()> { let conn = ImapConnection::new_connection(&self.server_conf, self.uid_store.clone()); let main_conn = self.connection.clone(); let uid_store = self.uid_store.clone(); @@ -417,15 +404,11 @@ impl MailBackend for ImapType { } else { poll_with_examine(kit).await?; } - debug!("watch_async future returning"); + debug!("watch future returning"); Ok(()) })) } - fn mailboxes(&self) -> Result> { - Err(MeliError::new("Unimplemented.")) - } - fn operation(&self, hash: EnvelopeHash) -> Result> { let (uid, mailbox_hash) = if let Some(v) = self.uid_store.hash_index.lock().unwrap().get(&hash) @@ -748,7 +731,7 @@ impl MailBackend for ImapType { ) -> ResultFuture<(MailboxHash, HashMap)> { let uid_store = self.uid_store.clone(); let connection = self.connection.clone(); - let new_mailbox_fut = self.mailboxes_async(); + let new_mailbox_fut = self.mailboxes(); Ok(Box::pin(async move { /* Must transform path to something the IMAP server will accept * @@ -819,7 +802,7 @@ impl MailBackend for ImapType { ) -> ResultFuture> { let uid_store = self.uid_store.clone(); let connection = self.connection.clone(); - let new_mailbox_fut = self.mailboxes_async(); + let new_mailbox_fut = self.mailboxes(); Ok(Box::pin(async move { let imap_path: String; let no_select: bool; @@ -923,7 +906,7 @@ impl MailBackend for ImapType { ) -> ResultFuture { let uid_store = self.uid_store.clone(); let connection = self.connection.clone(); - let new_mailbox_fut = self.mailboxes_async(); + let new_mailbox_fut = self.mailboxes(); Ok(Box::pin(async move { let command: String; let mut response = String::with_capacity(8 * 1024); diff --git a/melib/src/backends/jmap.rs b/melib/src/backends/jmap.rs index 8c07f442..feb90b9b 100644 --- a/melib/src/backends/jmap.rs +++ b/melib/src/backends/jmap.rs @@ -19,7 +19,6 @@ * along with meli. If not, see . */ -use crate::async_workers::{Async, WorkContext}; use crate::backends::*; use crate::conf::AccountSettings; use crate::email::*; @@ -207,7 +206,7 @@ impl MailBackend for JmapType { CAPABILITIES } - fn is_online_async(&self) -> ResultFuture<()> { + fn is_online(&self) -> ResultFuture<()> { let online = self.online.clone(); Ok(Box::pin(async move { //match timeout(std::time::Duration::from_secs(3), connection.lock()).await { @@ -221,7 +220,7 @@ impl MailBackend for JmapType { })) } - fn fetch_async( + fn fetch( &mut self, mailbox_hash: MailboxHash, ) -> Result>> + Send + 'static>>> { @@ -243,13 +242,13 @@ impl MailBackend for JmapType { })) } - fn watch_async(&self) -> ResultFuture<()> { + fn watch(&self) -> ResultFuture<()> { Ok(Box::pin(async move { Err(MeliError::from("JMAP watch for updates is unimplemented")) })) } - fn mailboxes_async(&self) -> ResultFuture> { + fn mailboxes(&self) -> ResultFuture> { let mailboxes = self.mailboxes.clone(); let connection = self.connection.clone(); Ok(Box::pin(async move { @@ -354,18 +353,6 @@ impl MailBackend for JmapType { })) } - fn fetch(&mut self, _mailbox_hash: MailboxHash) -> Result>>> { - Err(MeliError::new("Unimplemented.")) - } - - fn watch(&self, _work_context: WorkContext) -> Result { - Err(MeliError::new("Unimplemented.")) - } - - fn mailboxes(&self) -> Result> { - Err(MeliError::new("Unimplemented.")) - } - fn rename_mailbox( &mut self, _mailbox_hash: MailboxHash, diff --git a/melib/src/backends/maildir/backend.rs b/melib/src/backends/maildir/backend.rs index f159ce2f..44739fc5 100644 --- a/melib/src/backends/maildir/backend.rs +++ b/melib/src/backends/maildir/backend.rs @@ -20,7 +20,6 @@ */ use super::{MaildirMailbox, MaildirOp, MaildirPathTrait}; -use crate::async_workers::*; use crate::backends::{RefreshEventKind::*, *}; use crate::conf::AccountSettings; use crate::email::{Envelope, EnvelopeHash, Flag}; @@ -40,10 +39,8 @@ use std::io::{self, Read, Write}; use std::ops::{Deref, DerefMut}; use std::os::unix::fs::PermissionsExt; use std::path::{Component, Path, PathBuf}; -use std::result; use std::sync::mpsc::channel; use std::sync::{Arc, Mutex}; -use std::thread; #[derive(Clone, Debug, PartialEq)] pub(super) enum PathMod { @@ -188,31 +185,20 @@ impl MailBackend for MaildirType { CAPABILITIES } - fn is_online(&self) -> Result<()> { - Ok(()) - } - - fn is_online_async(&self) -> ResultFuture<()> { + fn is_online(&self) -> ResultFuture<()> { Ok(Box::pin(async { Ok(()) })) } - fn mailboxes(&self) -> Result> { - Ok(self + fn mailboxes(&self) -> ResultFuture> { + let res = Ok(self .mailboxes .iter() .map(|(h, f)| (*h, BackendMailbox::clone(f))) - .collect()) - } - fn mailboxes_async(&self) -> ResultFuture> { - let res = self.mailboxes(); + .collect()); Ok(Box::pin(async { res })) } - fn fetch(&mut self, mailbox_hash: MailboxHash) -> Result>>> { - Ok(self.multicore(4, mailbox_hash)) - } - - fn fetch_async( + fn fetch( &mut self, mailbox_hash: MailboxHash, ) -> Result>> + Send + 'static>>> @@ -224,14 +210,19 @@ impl MailBackend for MaildirType { let root_path = self.path.to_path_buf(); let map = self.hash_indexes.clone(); let mailbox_index = self.mailbox_index.clone(); - super::stream::MaildirStream::new(&self.name, mailbox_hash, unseen, total, path, root_path, map, mailbox_index) + super::stream::MaildirStream::new( + &self.name, + mailbox_hash, + unseen, + total, + path, + root_path, + map, + mailbox_index, + ) } - fn refresh( - &mut self, - mailbox_hash: MailboxHash, - ) -> Result> { - let w = AsyncBuilder::new(); + fn refresh(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()> { let cache_dir = xdg::BaseDirectories::with_profile("meli", &self.name).unwrap(); let account_hash = { let mut hasher = DefaultHasher::default(); @@ -240,117 +231,116 @@ impl MailBackend for MaildirType { }; let sender = self.event_consumer.clone(); - let handle = { - let mailbox: &MaildirMailbox = &self.mailboxes[&mailbox_hash]; - let path: PathBuf = mailbox.fs_path().into(); - let name = format!("refresh {:?}", mailbox.name()); - let root_path = self.path.to_path_buf(); - let map = self.hash_indexes.clone(); - let mailbox_index = self.mailbox_index.clone(); + let mailbox: &MaildirMailbox = &self.mailboxes[&mailbox_hash]; + let path: PathBuf = mailbox.fs_path().into(); + let root_path = self.path.to_path_buf(); + let map = self.hash_indexes.clone(); + let mailbox_index = self.mailbox_index.clone(); - Box::new(move |work_context: crate::async_workers::WorkContext| { - work_context - .set_name - .send((std::thread::current().id(), name.clone())) - .unwrap(); - let thunk = move |sender: &BackendEventConsumer| { - debug!("refreshing"); - let mut path = path.clone(); - path.push("new"); - for d in path.read_dir()? { - if let Ok(p) = d { - move_to_cur(p.path()).ok().take(); - } + Ok(Box::pin(async move { + let thunk = move |sender: &BackendEventConsumer| { + debug!("refreshing"); + let mut path = path.clone(); + path.push("new"); + for d in path.read_dir()? { + if let Ok(p) = d { + move_to_cur(p.path()).ok().take(); } - path.pop(); + } + path.pop(); - path.push("cur"); - let iter = path.read_dir()?; - let count = path.read_dir()?.count(); - let mut files: Vec = Vec::with_capacity(count); - for e in iter { - let e = e.and_then(|x| { - let path = x.path(); - Ok(path) - })?; - files.push(e); - } - let mut current_hashes = { + path.push("cur"); + let iter = path.read_dir()?; + let count = path.read_dir()?.count(); + let mut files: Vec = Vec::with_capacity(count); + for e in iter { + let e = e.and_then(|x| { + let path = x.path(); + Ok(path) + })?; + files.push(e); + } + let mut current_hashes = { + let mut map = map.lock().unwrap(); + let map = map.entry(mailbox_hash).or_default(); + map.keys().cloned().collect::>() + }; + for file in files { + let hash = get_file_hash(&file); + { let mut map = map.lock().unwrap(); let map = map.entry(mailbox_hash).or_default(); - map.keys().cloned().collect::>() - }; - for file in files { - let hash = get_file_hash(&file); - { - let mut map = map.lock().unwrap(); - let map = map.entry(mailbox_hash).or_default(); - if map.contains_key(&hash) { - map.remove(&hash); - current_hashes.remove(&hash); - continue; - } - (*map).insert(hash, PathBuf::from(&file).into()); + if map.contains_key(&hash) { + map.remove(&hash); + current_hashes.remove(&hash); + continue; } - let op = Box::new(MaildirOp::new(hash, map.clone(), mailbox_hash)); - if let Ok(e) = Envelope::from_token(op, hash) { - mailbox_index.lock().unwrap().insert(e.hash(), mailbox_hash); - let file_name = file.strip_prefix(&root_path).unwrap().to_path_buf(); - if let Ok(cached) = cache_dir.place_cache_file(file_name) { - /* place result in cache directory */ - let f = match fs::File::create(cached) { - Ok(f) => f, - Err(e) => { - panic!("{}", e); - } - }; - let metadata = f.metadata().unwrap(); - let mut permissions = metadata.permissions(); + (*map).insert(hash, PathBuf::from(&file).into()); + } + let op = Box::new(MaildirOp::new(hash, map.clone(), mailbox_hash)); + if let Ok(e) = Envelope::from_token(op, hash) { + mailbox_index.lock().unwrap().insert(e.hash(), mailbox_hash); + let file_name = file.strip_prefix(&root_path).unwrap().to_path_buf(); + if let Ok(cached) = cache_dir.place_cache_file(file_name) { + /* place result in cache directory */ + let f = match fs::File::create(cached) { + Ok(f) => f, + Err(e) => { + panic!("{}", e); + } + }; + let metadata = f.metadata().unwrap(); + let mut permissions = metadata.permissions(); - permissions.set_mode(0o600); // Read/write for owner only. - f.set_permissions(permissions).unwrap(); + permissions.set_mode(0o600); // Read/write for owner only. + f.set_permissions(permissions).unwrap(); - let writer = io::BufWriter::new(f); - bincode::serialize_into(writer, &e).unwrap(); - } - (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { + let writer = io::BufWriter::new(f); + bincode::serialize_into(writer, &e).unwrap(); + } + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { account_hash, mailbox_hash, kind: Create(Box::new(e)), - })); - } else { - debug!( - "DEBUG: hash {}, path: {} couldn't be parsed", - hash, - file.as_path().display() - ); - continue; - } + }), + ); + } else { + debug!( + "DEBUG: hash {}, path: {} couldn't be parsed", + hash, + file.as_path().display() + ); + continue; } - for ev in current_hashes.into_iter().map(|h| BackendEvent::Refresh(RefreshEvent { + } + for ev in current_hashes.into_iter().map(|h| { + BackendEvent::Refresh(RefreshEvent { account_hash, mailbox_hash, kind: Remove(h), - })) { - (sender)(account_hash, ev); - } - Ok(()) - }; - if let Err(err) = thunk(&sender) { - (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { + }) + }) { + (sender)(account_hash, ev); + } + Ok(()) + }; + if let Err(err) = thunk(&sender) { + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { account_hash, mailbox_hash, kind: Failure(err), - })); - } - }) - }; - Ok(w.build(handle)) + }), + ); + } + Ok(()) + })) } - fn watch( - &self, - work_context: WorkContext, - ) -> Result { + + fn watch(&self) -> ResultFuture<()> { let sender = self.event_consumer.clone(); let (tx, rx) = channel(); let mut watcher = watcher(tx, Duration::from_secs(2)).unwrap(); @@ -365,335 +355,371 @@ impl MailBackend for MaildirType { debug!("watching {:?}", root_path); let hash_indexes = self.hash_indexes.clone(); let mailbox_index = self.mailbox_index.clone(); - let root_mailbox_hash: MailboxHash = self.mailboxes.values().find(|m| m.parent.is_none()).map(|m| m.hash()).unwrap(); + let root_mailbox_hash: MailboxHash = self + .mailboxes + .values() + .find(|m| m.parent.is_none()) + .map(|m| m.hash()) + .unwrap(); let mailbox_counts = self .mailboxes .iter() .map(|(&k, v)| (k, (v.unseen.clone(), v.total.clone()))) .collect::>, Arc>)>>(); - let handle = thread::Builder::new() - .name("mailbox watch".to_string()) - .spawn(move || { - // Move `watcher` in the closure's scope so that it doesn't get dropped. - let _watcher = watcher; - let _work_context = work_context; - loop { - match rx.recv() { - /* - * Event types: - * - * pub enum RefreshEventKind { - * Update(EnvelopeHash, Envelope), // Old hash, new envelope - * Create(Envelope), - * Remove(EnvelopeHash), - * Rescan, - * } - */ - Ok(event) => match event { - /* Create */ - DebouncedEvent::Create(mut pathbuf) => { - debug!("DebouncedEvent::Create(path = {:?}", pathbuf); - if path_is_new!(pathbuf) { - debug!("path_is_new"); - /* This creates a Rename event that we will receive later */ - pathbuf = match move_to_cur(pathbuf) { - Ok(p) => p, - Err(e) => { - debug!("error: {}", e.to_string()); - continue; - } - }; - - + Ok(Box::pin(async move { + // Move `watcher` in the closure's scope so that it doesn't get dropped. + let _watcher = watcher; + loop { + match rx.recv() { + /* + * Event types: + * + * pub enum RefreshEventKind { + * Update(EnvelopeHash, Envelope), // Old hash, new envelope + * Create(Envelope), + * Remove(EnvelopeHash), + * Rescan, + * } + */ + Ok(event) => match event { + /* Create */ + DebouncedEvent::Create(mut pathbuf) => { + debug!("DebouncedEvent::Create(path = {:?}", pathbuf); + if path_is_new!(pathbuf) { + debug!("path_is_new"); + /* This creates a Rename event that we will receive later */ + pathbuf = match move_to_cur(pathbuf) { + Ok(p) => p, + Err(e) => { + debug!("error: {}", e.to_string()); + continue; + } + }; + } + let mailbox_hash = get_path_hash!(pathbuf); + let file_name = pathbuf + .as_path() + .strip_prefix(&root_path) + .unwrap() + .to_path_buf(); + if let Some(env) = add_path_to_index( + &hash_indexes, + mailbox_hash, + pathbuf.as_path(), + &cache_dir, + file_name, + ) { + mailbox_index + .lock() + .unwrap() + .insert(env.hash(), mailbox_hash); + debug!( + "Create event {} {} {}", + env.hash(), + env.subject(), + pathbuf.display() + ); + if !env.is_seen() { + *mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1; } - let mailbox_hash = get_path_hash!(pathbuf); - let file_name = pathbuf + *mailbox_counts[&mailbox_hash].1.lock().unwrap() += 1; + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: Create(Box::new(env)), + }), + ); + } + } + /* Update */ + DebouncedEvent::NoticeWrite(pathbuf) | DebouncedEvent::Write(pathbuf) => { + debug!("DebouncedEvent::Write(path = {:?}", &pathbuf); + let mailbox_hash = get_path_hash!(pathbuf); + let mut hash_indexes_lock = hash_indexes.lock().unwrap(); + let index_lock = + &mut hash_indexes_lock.entry(mailbox_hash).or_default(); + let file_name = pathbuf + .as_path() + .strip_prefix(&root_path) + .unwrap() + .to_path_buf(); + /* Linear search in hash_index to find old hash */ + let old_hash: EnvelopeHash = { + if let Some((k, v)) = + index_lock.iter_mut().find(|(_, v)| *v.buf == pathbuf) + { + //TODO FIXME This doesn't make sense? + *v = pathbuf.clone().into(); + *k + } else { + drop(hash_indexes_lock); + /* Did we just miss a Create event? In any case, create + * envelope. */ + if let Some(env) = add_path_to_index( + &hash_indexes, + mailbox_hash, + pathbuf.as_path(), + &cache_dir, + file_name, + ) { + mailbox_index + .lock() + .unwrap() + .insert(env.hash(), mailbox_hash); + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: Create(Box::new(env)), + }), + ); + } + continue; + } + }; + let new_hash: EnvelopeHash = get_file_hash(pathbuf.as_path()); + if index_lock.get_mut(&new_hash).is_none() { + debug!("write notice"); + let op = Box::new(MaildirOp::new( + new_hash, + hash_indexes.clone(), + mailbox_hash, + )); + if let Ok(env) = Envelope::from_token(op, new_hash) { + debug!("{}\t{:?}", new_hash, &pathbuf); + debug!( + "hash {}, path: {:?} couldn't be parsed", + new_hash, &pathbuf + ); + index_lock.insert(new_hash, pathbuf.into()); + + /* Send Write notice */ + + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: Update(old_hash, Box::new(env)), + }), + ); + } + } + } + /* Remove */ + DebouncedEvent::NoticeRemove(pathbuf) | DebouncedEvent::Remove(pathbuf) => { + debug!("DebouncedEvent::Remove(path = {:?}", pathbuf); + let mailbox_hash = get_path_hash!(pathbuf); + let mut hash_indexes_lock = hash_indexes.lock().unwrap(); + let index_lock = hash_indexes_lock.entry(mailbox_hash).or_default(); + let hash: EnvelopeHash = if let Some((k, _)) = + index_lock.iter().find(|(_, v)| *v.buf == pathbuf) + { + *k + } else { + debug!("removed but not contained in index"); + continue; + }; + if let Some(ref modif) = &index_lock[&hash].modified { + match modif { + PathMod::Path(path) => debug!( + "envelope {} has modified path set {}", + hash, + path.display() + ), + PathMod::Hash(hash) => debug!( + "envelope {} has modified path set {}", + hash, + &index_lock[&hash].buf.display() + ), + } + index_lock.entry(hash).and_modify(|e| { + e.removed = false; + }); + continue; + } + *mailbox_counts[&mailbox_hash].1.lock().unwrap() -= 1; + if !pathbuf.flags().contains(Flag::SEEN) { + *mailbox_counts[&mailbox_hash].0.lock().unwrap() -= 1; + } + + index_lock.entry(hash).and_modify(|e| { + e.removed = true; + }); + + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: Remove(hash), + }), + ); + } + /* Envelope hasn't changed */ + DebouncedEvent::Rename(src, dest) => { + debug!("DebouncedEvent::Rename(src = {:?}, dest = {:?})", src, dest); + let mailbox_hash = get_path_hash!(src); + let old_hash: EnvelopeHash = get_file_hash(src.as_path()); + let new_hash: EnvelopeHash = get_file_hash(dest.as_path()); + + let mut hash_indexes_lock = hash_indexes.lock().unwrap(); + let index_lock = hash_indexes_lock.entry(mailbox_hash).or_default(); + let old_flags = src.flags(); + let new_flags = dest.flags(); + let was_seen: bool = old_flags.contains(Flag::SEEN); + let is_seen: bool = new_flags.contains(Flag::SEEN); + + if index_lock.contains_key(&old_hash) && !index_lock[&old_hash].removed + { + debug!("contains_old_key"); + index_lock.entry(old_hash).and_modify(|e| { + debug!(&e.modified); + e.modified = Some(PathMod::Hash(new_hash)); + }); + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash: get_path_hash!(dest), + kind: Rename(old_hash, new_hash), + }), + ); + if !was_seen && is_seen { + let mut lck = mailbox_counts[&mailbox_hash].0.lock().unwrap(); + *lck = lck.saturating_sub(1); + } else if was_seen && !is_seen { + *mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1; + } + if old_flags != new_flags { + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash: get_path_hash!(dest), + kind: NewFlags(new_hash, (new_flags, vec![])), + }), + ); + } + mailbox_index + .lock() + .unwrap() + .insert(new_hash, get_path_hash!(dest)); + index_lock.insert(new_hash, dest.into()); + continue; + } else if !index_lock.contains_key(&new_hash) + && index_lock + .get(&old_hash) + .map(|e| e.removed) + .unwrap_or(false) + { + if index_lock + .get(&old_hash) + .map(|e| e.removed) + .unwrap_or(false) + { + index_lock.entry(old_hash).and_modify(|e| { + e.modified = Some(PathMod::Hash(new_hash)); + e.removed = false; + }); + debug!("contains_old_key, key was marked as removed (by external source)"); + } else { + debug!("not contains_new_key"); + } + let file_name = dest .as_path() .strip_prefix(&root_path) .unwrap() .to_path_buf(); + debug!("filename = {:?}", file_name); + drop(hash_indexes_lock); if let Some(env) = add_path_to_index( &hash_indexes, mailbox_hash, - pathbuf.as_path(), + dest.as_path(), &cache_dir, file_name, ) { - mailbox_index.lock().unwrap().insert(env.hash(),mailbox_hash); + mailbox_index + .lock() + .unwrap() + .insert(env.hash(), mailbox_hash); debug!( "Create event {} {} {}", env.hash(), env.subject(), - pathbuf.display() + dest.display() ); if !env.is_seen() { *mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1; } *mailbox_counts[&mailbox_hash].1.lock().unwrap() += 1; - (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { + (sender)( account_hash, - mailbox_hash, - kind: Create(Box::new(env)), - })); - } - } - /* Update */ - DebouncedEvent::NoticeWrite(pathbuf) - | DebouncedEvent::Write(pathbuf) => { - debug!("DebouncedEvent::Write(path = {:?}", &pathbuf); - let mailbox_hash = get_path_hash!(pathbuf); - let mut hash_indexes_lock = hash_indexes.lock().unwrap(); - let index_lock = - &mut hash_indexes_lock.entry(mailbox_hash).or_default(); - let file_name = pathbuf - .as_path() - .strip_prefix(&root_path) - .unwrap() - .to_path_buf(); - /* Linear search in hash_index to find old hash */ - let old_hash: EnvelopeHash = { - if let Some((k, v)) = - index_lock.iter_mut().find(|(_, v)| *v.buf == pathbuf) - { - //TODO FIXME This doesn't make sense? - *v = pathbuf.clone().into(); - *k - } else { - drop(hash_indexes_lock); - /* Did we just miss a Create event? In any case, create - * envelope. */ - if let Some(env) = add_path_to_index( - &hash_indexes, - mailbox_hash, - pathbuf.as_path(), - &cache_dir, - file_name, - ) { - mailbox_index.lock().unwrap().insert(env.hash(),mailbox_hash); - (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash, - kind: Create(Box::new(env)), - })); - } - return; - } - }; - let new_hash: EnvelopeHash = get_file_hash(pathbuf.as_path()); - if index_lock.get_mut(&new_hash).is_none() { - debug!("write notice"); - let op = Box::new(MaildirOp::new( - new_hash, - hash_indexes.clone(), - mailbox_hash, - )); - if let Ok(env) = Envelope::from_token(op, new_hash) { - debug!("{}\t{:?}", new_hash, &pathbuf); - debug!( - "hash {}, path: {:?} couldn't be parsed", - new_hash, &pathbuf - ); - index_lock.insert(new_hash, pathbuf.into()); - - /* Send Write notice */ - - (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash, - kind: Update(old_hash, Box::new(env)), - })); - } - } - } - /* Remove */ - DebouncedEvent::NoticeRemove(pathbuf) - | DebouncedEvent::Remove(pathbuf) => { - debug!("DebouncedEvent::Remove(path = {:?}", pathbuf); - let mailbox_hash = get_path_hash!(pathbuf); - let mut hash_indexes_lock = hash_indexes.lock().unwrap(); - let index_lock = hash_indexes_lock.entry(mailbox_hash).or_default(); - let hash: EnvelopeHash = if let Some((k, _)) = - index_lock.iter().find(|(_, v)| *v.buf == pathbuf) - { - *k - } else { - debug!("removed but not contained in index"); - continue; - }; - if let Some(ref modif) = &index_lock[&hash].modified { - match modif { - PathMod::Path(path) => debug!( - "envelope {} has modified path set {}", - hash, - path.display() - ), - PathMod::Hash(hash) => debug!( - "envelope {} has modified path set {}", - hash, - &index_lock[&hash].buf.display() - ), - } - index_lock.entry(hash).and_modify(|e| { - e.removed = false; - }); - continue; - } - *mailbox_counts[&mailbox_hash].1.lock().unwrap() -= 1; - if !pathbuf.flags().contains(Flag::SEEN) { - *mailbox_counts[&mailbox_hash].0.lock().unwrap() -= 1; - } - - index_lock.entry(hash).and_modify(|e| { - e.removed = true; - }); - - (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash, - kind: Remove(hash), - })); - } - /* Envelope hasn't changed */ - DebouncedEvent::Rename(src, dest) => { - debug!( - "DebouncedEvent::Rename(src = {:?}, dest = {:?})", - src, dest - ); - let mailbox_hash = get_path_hash!(src); - let old_hash: EnvelopeHash = get_file_hash(src.as_path()); - let new_hash: EnvelopeHash = get_file_hash(dest.as_path()); - - let mut hash_indexes_lock = hash_indexes.lock().unwrap(); - let index_lock = hash_indexes_lock.entry(mailbox_hash).or_default(); - let old_flags = src.flags(); - let new_flags = dest.flags(); - let was_seen: bool = old_flags.contains(Flag::SEEN); - let is_seen: bool = new_flags.contains(Flag::SEEN); - - if index_lock.contains_key(&old_hash) - && !index_lock[&old_hash].removed - { - debug!("contains_old_key"); - index_lock.entry(old_hash).and_modify(|e| { - debug!(&e.modified); - e.modified = Some(PathMod::Hash(new_hash)); - }); - (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash: get_path_hash!(dest), - kind: Rename(old_hash, new_hash), - })); - if !was_seen && is_seen { - let mut lck = mailbox_counts[&mailbox_hash].0.lock().unwrap(); - *lck = lck.saturating_sub(1); - } else if was_seen && !is_seen { - *mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1; - } - if old_flags != new_flags { - (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash: get_path_hash!(dest), - kind: NewFlags(new_hash, (new_flags, vec![])), - })); - } - mailbox_index.lock().unwrap().insert(new_hash,get_path_hash!(dest) ); - index_lock.insert(new_hash, dest.into()); - continue; - } else if !index_lock.contains_key(&new_hash) - && index_lock - .get(&old_hash) - .map(|e| e.removed) - .unwrap_or(false) - { - if index_lock - .get(&old_hash) - .map(|e| e.removed) - .unwrap_or(false) - { - index_lock.entry(old_hash).and_modify(|e| { - e.modified = Some(PathMod::Hash(new_hash)); - e.removed = false; - }); - debug!("contains_old_key, key was marked as removed (by external source)"); - } else { - debug!("not contains_new_key"); - } - let file_name = dest - .as_path() - .strip_prefix(&root_path) - .unwrap() - .to_path_buf(); - debug!("filename = {:?}", file_name); - drop(hash_indexes_lock); - if let Some(env) = add_path_to_index( - &hash_indexes, - mailbox_hash, - dest.as_path(), - &cache_dir, - file_name, - ) { - mailbox_index.lock().unwrap().insert(env.hash(), mailbox_hash); - debug!( - "Create event {} {} {}", - env.hash(), - env.subject(), - dest.display() - ); - if !env.is_seen() { - *mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1; - } - *mailbox_counts[&mailbox_hash].1.lock().unwrap() += 1; - (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { + BackendEvent::Refresh(RefreshEvent { account_hash, mailbox_hash, kind: Create(Box::new(env)), - })); - continue; - } else { - debug!("not valid email"); - } + }), + ); + continue; } else { - if was_seen && !is_seen { - *mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1; - } - (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { + debug!("not valid email"); + } + } else { + if was_seen && !is_seen { + *mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1; + } + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { account_hash, mailbox_hash: get_path_hash!(dest), kind: Rename(old_hash, new_hash), - })); - debug!("contains_new_key"); - if old_flags != new_flags { - (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { + }), + ); + debug!("contains_new_key"); + if old_flags != new_flags { + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { account_hash, mailbox_hash: get_path_hash!(dest), kind: NewFlags(new_hash, (new_flags, vec![])), - })); - } + }), + ); } - - /* Maybe a re-read should be triggered here just to be safe. - (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash: get_path_hash!(dest), - kind: Rescan, - })); - */ } - /* Trigger rescan of mailbox */ - DebouncedEvent::Rescan => { - (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { + + /* Maybe a re-read should be triggered here just to be safe. + (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash: get_path_hash!(dest), + kind: Rescan, + })); + */ + } + /* Trigger rescan of mailbox */ + DebouncedEvent::Rescan => { + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { account_hash, mailbox_hash: root_mailbox_hash, kind: Rescan, - })); - } - _ => {} - }, - Err(e) => debug!("watch error: {:?}", e), - } + }), + ); + } + _ => {} + }, + Err(e) => debug!("watch error: {:?}", e), } - })?; - Ok(handle.thread().id()) + } + Ok(()) + })) } fn operation(&self, hash: EnvelopeHash) -> Result> { @@ -832,8 +858,8 @@ impl MailBackend for MaildirType { }; self.mailboxes.insert(mailbox_hash, new_mailbox); - let ret = Ok((mailbox_hash, self.mailboxes()?)); - Ok(Box::pin(async { ret })) + let ret = self.mailboxes()?; + Ok(Box::pin(async move { Ok((mailbox_hash, ret.await?)) })) } fn delete_mailbox( @@ -1019,7 +1045,12 @@ impl MaildirType { })) } - pub fn multicore(&mut self, cores: usize, mailbox_hash: MailboxHash) -> Async>> { + /* + pub fn multicore( + &mut self, + cores: usize, + mailbox_hash: MailboxHash, + ) -> Async>> { let mut w = AsyncBuilder::new(); let cache_dir = xdg::BaseDirectories::with_profile("meli", &self.name).unwrap(); @@ -1199,6 +1230,7 @@ impl MaildirType { }; w.build(handle) } + */ pub fn save_to_mailbox(mut path: PathBuf, bytes: Vec, flags: Option) -> Result<()> { for d in &["cur", "new", "tmp"] { diff --git a/melib/src/backends/mbox.rs b/melib/src/backends/mbox.rs index c0224807..7c150cba 100644 --- a/melib/src/backends/mbox.rs +++ b/melib/src/backends/mbox.rs @@ -23,7 +23,6 @@ * https://wiki2.dovecot.org/MailboxFormat/mbox */ -use crate::async_workers::{Async, AsyncBuilder, AsyncStatus, WorkContext}; use crate::backends::*; use crate::conf::AccountSettings; use crate::email::parser::BytesExt; @@ -707,96 +706,109 @@ impl MailBackend for MboxType { CAPABILITIES } - fn is_online(&self) -> Result<()> { - Ok(()) + fn is_online(&self) -> ResultFuture<()> { + Ok(Box::pin(async { Ok(()) })) } - fn fetch(&mut self, mailbox_hash: MailboxHash) -> Result>>> { - let mut w = AsyncBuilder::new(); - let handle = { - let tx = w.tx(); - let mailbox_index = self.mailbox_index.clone(); - let mailboxes = self.mailboxes.clone(); - let mailbox_path = mailboxes.lock().unwrap()[&mailbox_hash].fs_path.clone(); - let prefer_mbox_type = self.prefer_mbox_type; - let closure = move |_work_context| { - let tx = tx.clone(); - let file = match std::fs::OpenOptions::new() - .read(true) - .write(true) - .open(&mailbox_path) - { - Ok(f) => f, - Err(e) => { - tx.send(AsyncStatus::Payload(Err(MeliError::from(e)))) - .unwrap(); - return; - } - }; - get_rw_lock_blocking(&file); - let mut buf_reader = BufReader::new(file); - let mut contents = Vec::new(); - if let Err(e) = buf_reader.read_to_end(&mut contents) { - tx.send(AsyncStatus::Payload(Err(MeliError::from(e)))) - .unwrap(); - return; - }; - - let mailboxes_lck = mailboxes.lock().unwrap(); - let index = mailboxes_lck[&mailbox_hash].index.clone(); + fn fetch( + &mut self, + mailbox_hash: MailboxHash, + ) -> Result>> + Send + 'static>>> { + struct FetchState { + mailbox_hash: MailboxHash, + mailbox_index: Arc>>, + mailboxes: Arc>>, + prefer_mbox_type: Option, + offset: usize, + file_offset: usize, + contents: Vec, + } + impl FetchState { + async fn fetch(&mut self) -> Result>> { + let mailboxes_lck = self.mailboxes.lock().unwrap(); + let index = mailboxes_lck[&self.mailbox_hash].index.clone(); drop(mailboxes_lck); let mut message_iter = MessageIterator { index, - input: &contents.as_slice(), - offset: 0, - file_offset: 0, - reader: prefer_mbox_type, + input: &self.contents.as_slice(), + offset: self.offset, + file_offset: self.file_offset, + reader: self.prefer_mbox_type, }; - let mut err = None; - loop { - let mut payload = vec![]; - 'iter_for_loop: for _i in 0..150 { - match message_iter.next() { - Some(Ok(env)) => { - payload.push(env); - } - Some(Err(_err)) => { - debug!(&_err); - err = Some(_err); - } - None => { - break 'iter_for_loop; - } + let mut payload = vec![]; + let mut done = false; + 'iter_for_loop: for _i in 0..150 { + match message_iter.next() { + Some(Ok(env)) => { + payload.push(env); + } + Some(Err(_err)) => { + debug!(&_err); + } + None => { + done = true; + break 'iter_for_loop; } } - if !payload.is_empty() { - err = None; - } else { - break; - } - let mut mailbox_index_lck = mailbox_index.lock().unwrap(); - for env in &payload { - mailbox_index_lck.insert(env.hash(), mailbox_hash); - } - tx.send(AsyncStatus::Payload(Ok(payload))).unwrap(); - } - if let Some(err) = err { - tx.send(AsyncStatus::Payload(Err(err))).unwrap(); } + self.offset = message_iter.offset; + self.file_offset = message_iter.file_offset; { - let mut mailbox_lock = mailboxes.lock().unwrap(); - mailbox_lock - .entry(mailbox_hash) - .and_modify(|f| f.content = contents); + let mut mailbox_index_lck = self.mailbox_index.lock().unwrap(); + for env in &payload { + mailbox_index_lck.insert(env.hash(), self.mailbox_hash); + } } - tx.send(AsyncStatus::Finished).unwrap(); - }; - Box::new(closure) + if done { + if payload.is_empty() { + return Ok(None); + } else { + let mut mailbox_lock = self.mailboxes.lock().unwrap(); + let contents = std::mem::replace(&mut self.contents, vec![]); + mailbox_lock + .entry(self.mailbox_hash) + .and_modify(|f| f.content = contents); + Ok(Some(payload)) + } + } else { + Ok(Some(payload)) + } + } + } + let mailboxes = self.mailboxes.clone(); + + let mailbox_path = mailboxes.lock().unwrap()[&mailbox_hash].fs_path.clone(); + let file = std::fs::OpenOptions::new() + .read(true) + .write(true) + .open(&mailbox_path)?; + get_rw_lock_blocking(&file); + let mut buf_reader = BufReader::new(file); + let mut contents = Vec::new(); + buf_reader.read_to_end(&mut contents)?; + let mut state = FetchState { + mailbox_hash, + mailboxes, + mailbox_index: self.mailbox_index.clone(), + prefer_mbox_type: self.prefer_mbox_type, + contents, + offset: 0, + file_offset: 0, }; - Ok(w.build(handle)) + Ok(Box::pin(async_stream::try_stream! { + loop { + if let Some(res) = state.fetch().await.map_err(|err| { + debug!("fetch err {:?}", &err); + err})? { + yield res; + } else { + return; + } + } + })) } - fn watch(&self, work_context: WorkContext) -> Result { + fn watch(&self) -> ResultFuture<()> { let sender = self.event_consumer.clone(); let (tx, rx) = channel(); let mut watcher = watcher(tx, std::time::Duration::from_secs(10)) @@ -817,165 +829,154 @@ impl MailBackend for MboxType { let mailboxes = self.mailboxes.clone(); let mailbox_index = self.mailbox_index.clone(); let prefer_mbox_type = self.prefer_mbox_type; - let handle = std::thread::Builder::new() - .name(format!("watching {}", self.account_name,)) - .spawn(move || { - // Move `watcher` in the closure's scope so that it doesn't get dropped. - let _watcher = watcher; - let _work_context = work_context; - let mailboxes = mailboxes; - loop { - match rx.recv() { - /* - * Event types: - * - * pub enum RefreshEventKind { - * Update(EnvelopeHash, Envelope), // Old hash, new envelope - * Create(Envelope), - * Remove(EnvelopeHash), - * Rescan, - * } - */ - Ok(event) => match event { - /* Update */ - DebouncedEvent::NoticeWrite(pathbuf) - | DebouncedEvent::Write(pathbuf) => { - let mailbox_hash = get_path_hash!(&pathbuf); - let file = match std::fs::OpenOptions::new() - .read(true) - .write(true) - .open(&pathbuf) - { - Ok(f) => f, - Err(_) => { - continue; - } - }; - get_rw_lock_blocking(&file); - let mut mailbox_lock = mailboxes.lock().unwrap(); - let mut buf_reader = BufReader::new(file); - let mut contents = Vec::new(); - if let Err(e) = buf_reader.read_to_end(&mut contents) { - debug!(e); + Ok(Box::pin(async move { + loop { + match rx.recv() { + /* + * Event types: + * + * pub enum RefreshEventKind { + * Update(EnvelopeHash, Envelope), // Old hash, new envelope + * Create(Envelope), + * Remove(EnvelopeHash), + * Rescan, + * } + */ + Ok(event) => match event { + /* Update */ + DebouncedEvent::NoticeWrite(pathbuf) | DebouncedEvent::Write(pathbuf) => { + let mailbox_hash = get_path_hash!(&pathbuf); + let file = match std::fs::OpenOptions::new() + .read(true) + .write(true) + .open(&pathbuf) + { + Ok(f) => f, + Err(_) => { continue; - }; - if contents - .starts_with(mailbox_lock[&mailbox_hash].content.as_slice()) - { - if let Ok((_, envelopes)) = mbox_parse( - mailbox_lock[&mailbox_hash].index.clone(), - &contents, - mailbox_lock[&mailbox_hash].content.len(), - prefer_mbox_type, - ) { - let mut mailbox_index_lck = mailbox_index.lock().unwrap(); - for env in envelopes { - mailbox_index_lck.insert(env.hash(), mailbox_hash); - (sender)( + } + }; + get_rw_lock_blocking(&file); + let mut mailbox_lock = mailboxes.lock().unwrap(); + let mut buf_reader = BufReader::new(file); + let mut contents = Vec::new(); + if let Err(e) = buf_reader.read_to_end(&mut contents) { + debug!(e); + continue; + }; + if contents.starts_with(mailbox_lock[&mailbox_hash].content.as_slice()) + { + if let Ok((_, envelopes)) = mbox_parse( + mailbox_lock[&mailbox_hash].index.clone(), + &contents, + mailbox_lock[&mailbox_hash].content.len(), + prefer_mbox_type, + ) { + let mut mailbox_index_lck = mailbox_index.lock().unwrap(); + for env in envelopes { + mailbox_index_lck.insert(env.hash(), mailbox_hash); + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { account_hash, - BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash, - kind: RefreshEventKind::Create(Box::new(env)), - }), - ); - } + mailbox_hash, + kind: RefreshEventKind::Create(Box::new(env)), + }), + ); } - } else { - (sender)( - account_hash, - BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash, - kind: RefreshEventKind::Rescan, - }), - ); } - mailbox_lock - .entry(mailbox_hash) - .and_modify(|f| f.content = contents); - } - /* Remove */ - DebouncedEvent::NoticeRemove(pathbuf) - | DebouncedEvent::Remove(pathbuf) => { - if mailboxes - .lock() - .unwrap() - .values() - .any(|f| f.fs_path == pathbuf) - { - let mailbox_hash = get_path_hash!(&pathbuf); - (sender)( + } else { + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { account_hash, - BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash, - kind: RefreshEventKind::Failure(MeliError::new( - format!( - "mbox mailbox {} was removed.", - pathbuf.display() - ), - )), - }), - ); - return; - } + mailbox_hash, + kind: RefreshEventKind::Rescan, + }), + ); } - DebouncedEvent::Rename(src, dest) => { - if mailboxes - .lock() - .unwrap() - .values() - .any(|f| &f.fs_path == &src) - { - let mailbox_hash = get_path_hash!(&src); - (sender)( + mailbox_lock + .entry(mailbox_hash) + .and_modify(|f| f.content = contents); + } + /* Remove */ + DebouncedEvent::NoticeRemove(pathbuf) | DebouncedEvent::Remove(pathbuf) => { + if mailboxes + .lock() + .unwrap() + .values() + .any(|f| f.fs_path == pathbuf) + { + let mailbox_hash = get_path_hash!(&pathbuf); + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { account_hash, - BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash, - kind: RefreshEventKind::Failure(MeliError::new( - format!( - "mbox mailbox {} was renamed to {}.", - src.display(), - dest.display() - ), - )), - }), - ); - return; - } + mailbox_hash, + kind: RefreshEventKind::Failure(MeliError::new(format!( + "mbox mailbox {} was removed.", + pathbuf.display() + ))), + }), + ); + return Ok(()); } - /* Trigger rescan of mailboxes */ - DebouncedEvent::Rescan => { - for &mailbox_hash in mailboxes.lock().unwrap().keys() { - (sender)( + } + DebouncedEvent::Rename(src, dest) => { + if mailboxes + .lock() + .unwrap() + .values() + .any(|f| &f.fs_path == &src) + { + let mailbox_hash = get_path_hash!(&src); + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { account_hash, - BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash, - kind: RefreshEventKind::Rescan, - }), - ); - } - return; + mailbox_hash, + kind: RefreshEventKind::Failure(MeliError::new(format!( + "mbox mailbox {} was renamed to {}.", + src.display(), + dest.display() + ))), + }), + ); + return Ok(()); } - _ => {} - }, - Err(e) => debug!("watch error: {:?}", e), - } + } + /* Trigger rescan of mailboxes */ + DebouncedEvent::Rescan => { + for &mailbox_hash in mailboxes.lock().unwrap().keys() { + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: RefreshEventKind::Rescan, + }), + ); + } + return Ok(()); + } + _ => {} + }, + Err(e) => debug!("watch error: {:?}", e), } - })?; - Ok(handle.thread().id()) + } + Ok(()) + })) } - fn mailboxes(&self) -> Result> { - Ok(self + + fn mailboxes(&self) -> ResultFuture> { + let ret = Ok(self .mailboxes .lock() .unwrap() .iter() .map(|(h, f)| (*h, f.clone() as Mailbox)) - .collect()) + .collect()); + Ok(Box::pin(async { ret })) } fn operation(&self, env_hash: EnvelopeHash) -> Result> { diff --git a/melib/src/backends/nntp.rs b/melib/src/backends/nntp.rs index 6789e158..6ef152b1 100644 --- a/melib/src/backends/nntp.rs +++ b/melib/src/backends/nntp.rs @@ -32,7 +32,6 @@ pub use operations::*; mod connection; pub use connection::*; -use crate::async_workers::{Async, WorkContext}; use crate::backends::*; use crate::conf::AccountSettings; use crate::email::*; @@ -185,7 +184,7 @@ impl MailBackend for NntpType { } } - fn fetch_async( + fn fetch( &mut self, mailbox_hash: MailboxHash, ) -> Result>> + Send + 'static>>> { @@ -211,11 +210,11 @@ impl MailBackend for NntpType { })) } - fn refresh_async(&mut self, _mailbox_hash: MailboxHash) -> ResultFuture<()> { + fn refresh(&mut self, _mailbox_hash: MailboxHash) -> ResultFuture<()> { Err(MeliError::new("Unimplemented.")) } - fn mailboxes_async(&self) -> ResultFuture> { + fn mailboxes(&self) -> ResultFuture> { let uid_store = self.uid_store.clone(); let connection = self.connection.clone(); Ok(Box::pin(async move { @@ -229,12 +228,12 @@ impl MailBackend for NntpType { })) } - fn is_online_async(&self) -> ResultFuture<()> { + fn is_online(&self) -> ResultFuture<()> { let connection = self.connection.clone(); Ok(Box::pin(async move { match timeout(std::time::Duration::from_secs(3), connection.lock()).await { Ok(mut conn) => { - debug!("is_online_async"); + debug!("is_online"); match debug!(timeout(std::time::Duration::from_secs(3), conn.connect()).await) { Ok(Ok(())) => Ok(()), Err(err) | Ok(Err(err)) => { @@ -248,23 +247,7 @@ impl MailBackend for NntpType { })) } - fn fetch(&mut self, _mailbox_hash: MailboxHash) -> Result>>> { - Err(MeliError::new("Unimplemented.")) - } - - fn refresh(&mut self, _mailbox_hash: MailboxHash) -> Result> { - Err(MeliError::new("Unimplemented.")) - } - - fn watch(&self, _work_context: WorkContext) -> Result { - Err(MeliError::new("Unimplemented.")) - } - - fn watch_async(&self) -> ResultFuture<()> { - Err(MeliError::new("Unimplemented.")) - } - - fn mailboxes(&self) -> Result> { + fn watch(&self) -> ResultFuture<()> { Err(MeliError::new("Unimplemented.")) } diff --git a/melib/src/backends/notmuch.rs b/melib/src/backends/notmuch.rs index c5b74c8a..cd1d33a5 100644 --- a/melib/src/backends/notmuch.rs +++ b/melib/src/backends/notmuch.rs @@ -19,7 +19,6 @@ * along with meli. If not, see . */ -use crate::async_workers::{Async, AsyncBuilder, AsyncStatus, WorkContext}; use crate::backends::*; use crate::conf::AccountSettings; use crate::email::{Envelope, EnvelopeHash, Flag}; @@ -333,71 +332,56 @@ impl MailBackend for NotmuchDb { CAPABILITIES } - fn is_online(&self) -> Result<()> { - Ok(()) + fn is_online(&self) -> ResultFuture<()> { + Ok(Box::pin(async { Ok(()) })) } - fn fetch(&mut self, mailbox_hash: MailboxHash) -> Result>>> { - let mut w = AsyncBuilder::new(); - let database = NotmuchDb::new_connection(self.path.as_path(), self.lib.clone(), false); - let index = self.index.clone(); - let mailbox_index = self.mailbox_index.clone(); - let tag_index = self.tag_index.clone(); - let mailboxes = self.mailboxes.clone(); - let lib = self.lib.clone(); - let handle = { - let tx = w.tx(); - let closure = move |_work_context| { - if let Err(err) = database { - tx.send(AsyncStatus::Payload(Err(err))).unwrap(); - tx.send(AsyncStatus::Finished).unwrap(); - return; - } - let database = Arc::new(database.unwrap()); - let database_lck = database.inner.read().unwrap(); - let mailboxes_lck = mailboxes.read().unwrap(); - let mailbox = mailboxes_lck.get(&mailbox_hash).unwrap(); + fn fetch( + &mut self, + mailbox_hash: MailboxHash, + ) -> Result>> + Send + 'static>>> { + struct FetchState { + mailbox_hash: MailboxHash, + database: Arc, + index: Arc>>, + mailbox_index: Arc>>>, + mailboxes: Arc>>, + tag_index: Arc>>, + lib: Arc, + iter: std::vec::IntoIter, + } + impl FetchState { + async fn fetch(&mut self) -> Result>> { let mut unseen_count = 0; - let query: Query = - match Query::new(lib.clone(), &database_lck, mailbox.query_str.as_str()) { - Ok(q) => q, - Err(err) => { - tx.send(AsyncStatus::Payload(Err(err))).unwrap(); - tx.send(AsyncStatus::Finished).unwrap(); - return; - } - }; - let iter: Vec<::Item> = match query.search() { - Ok(i) => i.collect(), - Err(err) => { - tx.send(AsyncStatus::Payload(Err(err))).unwrap(); - tx.send(AsyncStatus::Finished).unwrap(); - return; - } - }; - { - let mut total_lck = mailbox.total.lock().unwrap(); - let mut unseen_lck = mailbox.unseen.lock().unwrap(); - *total_lck = iter.len(); - *unseen_lck = 0; - } let chunk_size = 250; - for chunk in iter.chunks(chunk_size) { - let mut ret: Vec = Vec::with_capacity(chunk_size); - let mut mailbox_index_lck = mailbox_index.write().unwrap(); - for &message in chunk { + let mut mailbox_index_lck = self.mailbox_index.write().unwrap(); + let mut ret: Vec = Vec::with_capacity(chunk_size); + let mut done: bool = false; + for _ in 0..chunk_size { + if let Some(message_id) = self.iter.next() { + let mut message: *mut notmuch_message_t = std::ptr::null_mut(); + unsafe { + call!(self.lib, notmuch_database_find_message)( + *self.database.inner.read().unwrap(), + message_id.as_ptr(), + &mut message as *mut _, + ) + }; + if message.is_null() { + continue; + } match notmuch_message_into_envelope( - lib.clone(), - index.clone(), - tag_index.clone(), - database.clone(), + self.lib.clone(), + self.index.clone(), + self.tag_index.clone(), + self.database.clone(), message, ) { Ok(env) => { mailbox_index_lck .entry(env.hash()) .or_default() - .push(mailbox_hash); + .push(self.mailbox_hash); if !env.is_seen() { unseen_count += 1; } @@ -406,235 +390,290 @@ impl MailBackend for NotmuchDb { Err(err) => { debug!("could not parse message {:?} {}", err, { let fs_path = unsafe { - call!(lib, notmuch_message_get_filename)(message) + call!(self.lib, notmuch_message_get_filename)(message) }; let c_str = unsafe { CStr::from_ptr(fs_path) }; String::from_utf8_lossy(c_str.to_bytes()) }); } } + } else { + done = true; + break; } - { - let mut unseen_lck = mailbox.unseen.lock().unwrap(); - *unseen_lck = unseen_count; - } - tx.send(AsyncStatus::Payload(Ok(ret))).unwrap(); } - tx.send(AsyncStatus::Finished).unwrap(); - }; - Box::new(closure) + { + let mailboxes_lck = self.mailboxes.read().unwrap(); + let mailbox = mailboxes_lck.get(&self.mailbox_hash).unwrap(); + let mut unseen_lck = mailbox.unseen.lock().unwrap(); + *unseen_lck += unseen_count; + } + if done && ret.is_empty() { + Ok(None) + } else { + Ok(Some(ret)) + } + } + } + let database = Arc::new(NotmuchDb::new_connection( + self.path.as_path(), + self.lib.clone(), + false, + )?); + let index = self.index.clone(); + let mailbox_index = self.mailbox_index.clone(); + let tag_index = self.tag_index.clone(); + let mailboxes = self.mailboxes.clone(); + let lib = self.lib.clone(); + let v: Vec; + { + let database_lck = database.inner.read().unwrap(); + let mailboxes_lck = mailboxes.read().unwrap(); + let mailbox = mailboxes_lck.get(&mailbox_hash).unwrap(); + let query: Query = + Query::new(self.lib.clone(), &database_lck, mailbox.query_str.as_str())?; + { + let mut total_lck = mailbox.total.lock().unwrap(); + let mut unseen_lck = mailbox.unseen.lock().unwrap(); + *total_lck = query.count()? as usize; + *unseen_lck = 0; + } + v = query + .search()? + .into_iter() + .map(|m| notmuch_message_insert(&lib, &index, m)) + .collect(); + } + + let mut state = FetchState { + mailbox_hash, + mailboxes, + database, + lib, + index, + mailbox_index, + tag_index, + iter: v.into_iter(), }; - Ok(w.build(handle)) + Ok(Box::pin(async_stream::try_stream! { + while let Some(res) = state.fetch().await.map_err(|err| { debug!("fetch err {:?}", &err); err})? { + yield res; + } + })) } - fn watch(&self, _work_context: WorkContext) -> Result { - extern crate notify; - use crate::backends::RefreshEventKind::*; - use notify::{watcher, RecursiveMode, Watcher}; - let sender = self.event_consumer.clone(); - let (tx, rx) = std::sync::mpsc::channel(); - let mut watcher = watcher(tx, std::time::Duration::from_secs(2)).unwrap(); - watcher.watch(&self.path, RecursiveMode::Recursive).unwrap(); - let path = self.path.clone(); - let lib = self.lib.clone(); - let tag_index = self.tag_index.clone(); - let index = self.index.clone(); - let account_hash = { - let mut hasher = DefaultHasher::new(); - hasher.write(self.account_name.as_bytes()); - hasher.finish() - }; - let mailbox_index = self.mailbox_index.clone(); - let mailboxes = self.mailboxes.clone(); - { - let database = NotmuchDb::new_connection(path.as_path(), lib.clone(), false)?; - let mut revision_uuid_lck = self.revision_uuid.write().unwrap(); - - *revision_uuid_lck = unsafe { - call!(lib, notmuch_database_get_revision)( - *database.inner.read().unwrap(), - std::ptr::null_mut(), - ) + /* + fn watch(&self) -> ResultFuture<()> { + extern crate notify; + use crate::backends::RefreshEventKind::*; + use notify::{watcher, RecursiveMode, Watcher}; + let sender = self.event_consumer.clone(); + let (tx, rx) = std::sync::mpsc::channel(); + let mut watcher = watcher(tx, std::time::Duration::from_secs(2)).unwrap(); + watcher.watch(&self.path, RecursiveMode::Recursive).unwrap(); + let path = self.path.clone(); + let lib = self.lib.clone(); + let tag_index = self.tag_index.clone(); + let index = self.index.clone(); + let account_hash = { + let mut hasher = DefaultHasher::new(); + hasher.write(self.account_name.as_bytes()); + hasher.finish() }; - } - let revision_uuid = self.revision_uuid.clone(); + let mailbox_index = self.mailbox_index.clone(); + let mailboxes = self.mailboxes.clone(); + { + let database = NotmuchDb::new_connection(path.as_path(), lib.clone(), false)?; + let mut revision_uuid_lck = self.revision_uuid.write().unwrap(); - let handle = std::thread::Builder::new() - .name(format!("watching {}", self.account_name)) - .spawn(move || { - let _watcher = watcher; - let c = move |sender: &BackendEventConsumer| -> std::result::Result<(), MeliError> { - loop { - let _ = rx.recv().map_err(|err| err.to_string())?; - { - let database = - NotmuchDb::new_connection(path.as_path(), lib.clone(), false)?; - let database_lck = database.inner.read().unwrap(); - let mut revision_uuid_lck = revision_uuid.write().unwrap(); + *revision_uuid_lck = unsafe { + call!(lib, notmuch_database_get_revision)( + *database.inner.read().unwrap(), + std::ptr::null_mut(), + ) + }; + } + let revision_uuid = self.revision_uuid.clone(); - let new_revision = unsafe { - call!(lib, notmuch_database_get_revision)( - *database_lck, - std::ptr::null_mut(), - ) - }; - if new_revision > *revision_uuid_lck { - let query_str = - format!("lastmod:{}..{}", *revision_uuid_lck, new_revision); - let query: Query = - Query::new(lib.clone(), &database_lck, &query_str)?; - drop(database_lck); - let iter = query.search()?; - let mut tag_lock = tag_index.write().unwrap(); - let mailbox_index_lck = mailbox_index.write().unwrap(); - let mailboxes_lck = mailboxes.read().unwrap(); - let database = Arc::new(database); - for message in iter { - let msg_id = unsafe { - call!(lib, notmuch_message_get_message_id)(message) - }; - let c_str = unsafe { CStr::from_ptr(msg_id) }; - let env_hash = { - let mut hasher = DefaultHasher::default(); - c_str.hash(&mut hasher); - hasher.finish() - }; - if let Some(mailbox_hashes) = mailbox_index_lck.get(&env_hash) { - let tags: (Flag, Vec) = - TagIterator::new(lib.clone(), message) - .collect_flags_and_tags(); - for tag in tags.1.iter() { - let mut hasher = DefaultHasher::new(); - hasher.write(tag.as_bytes()); - let num = hasher.finish(); - if !tag_lock.contains_key(&num) { - tag_lock.insert(num, tag.clone()); + let handle = std::thread::Builder::new() + .name(format!("watching {}", self.account_name)) + .spawn(move || { + let _watcher = watcher; + let c = move |sender: &BackendEventConsumer| -> std::result::Result<(), MeliError> { + loop { + let _ = rx.recv().map_err(|err| err.to_string())?; + { + let database = + NotmuchDb::new_connection(path.as_path(), lib.clone(), false)?; + let database_lck = database.inner.read().unwrap(); + let mut revision_uuid_lck = revision_uuid.write().unwrap(); + + let new_revision = unsafe { + call!(lib, notmuch_database_get_revision)( + *database_lck, + std::ptr::null_mut(), + ) + }; + if new_revision > *revision_uuid_lck { + let query_str = + format!("lastmod:{}..{}", *revision_uuid_lck, new_revision); + let query: Query = + Query::new(lib.clone(), &database_lck, &query_str)?; + drop(database_lck); + let iter = query.search()?; + let mut tag_lock = tag_index.write().unwrap(); + let mailbox_index_lck = mailbox_index.write().unwrap(); + let mailboxes_lck = mailboxes.read().unwrap(); + let database = Arc::new(database); + for message in iter { + let msg_id = unsafe { + call!(lib, notmuch_message_get_message_id)(message) + }; + let c_str = unsafe { CStr::from_ptr(msg_id) }; + let env_hash = { + let mut hasher = DefaultHasher::default(); + c_str.hash(&mut hasher); + hasher.finish() + }; + if let Some(mailbox_hashes) = mailbox_index_lck.get(&env_hash) { + let tags: (Flag, Vec) = + TagIterator::new(lib.clone(), message) + .collect_flags_and_tags(); + for tag in tags.1.iter() { + let mut hasher = DefaultHasher::new(); + hasher.write(tag.as_bytes()); + let num = hasher.finish(); + if !tag_lock.contains_key(&num) { + tag_lock.insert(num, tag.clone()); + } + } + for &mailbox_hash in mailbox_hashes { + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: NewFlags(env_hash, tags.clone()), + }), + ); + } + } else { + match notmuch_message_into_envelope( + lib.clone(), + index.clone(), + tag_index.clone(), + database.clone(), + message, + ) { + Ok(env) => { + for (&mailbox_hash, m) in mailboxes_lck.iter() { + let query_str = format!( + "{} id:{}", + m.query_str.as_str(), + c_str.to_string_lossy() + ); + let database_lck = + database.inner.read().unwrap(); + let query: Query = Query::new( + lib.clone(), + &database_lck, + &query_str, + )?; + if query.count().unwrap_or(0) > 0 { + let mut total_lck = m.total.lock().unwrap(); + let mut unseen_lck = + m.unseen.lock().unwrap(); + *total_lck += 1; + if !env.is_seen() { + *unseen_lck += 1; + } + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: Create(Box::new(env.clone())), + }), + ); + } + } + } + Err(err) => { + debug!("could not parse message {:?}", err); + } } } - for &mailbox_hash in mailbox_hashes { - (sender)( - account_hash, - BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash, - kind: NewFlags(env_hash, tags.clone()), - }), - ); - } - } else { - match notmuch_message_into_envelope( - lib.clone(), - index.clone(), - tag_index.clone(), - database.clone(), - message, - ) { - Ok(env) => { - for (&mailbox_hash, m) in mailboxes_lck.iter() { - let query_str = format!( - "{} id:{}", - m.query_str.as_str(), - c_str.to_string_lossy() - ); - let database_lck = - database.inner.read().unwrap(); - let query: Query = Query::new( - lib.clone(), - &database_lck, - &query_str, - )?; - if query.count().unwrap_or(0) > 0 { + } + drop(query); + let database_lck = database.inner.read().unwrap(); + index.write().unwrap().retain(|&env_hash, msg_id| { + let mut message: *mut notmuch_message_t = std::ptr::null_mut(); + if let Err(err) = unsafe { + try_call!( + lib, + call!(lib, notmuch_database_find_message)( + *database_lck, + msg_id.as_ptr(), + &mut message as *mut _, + ) + ) + } { + debug!(err); + false + } else { + if message.is_null() { + if let Some(mailbox_hashes) = + mailbox_index_lck.get(&env_hash) + { + for &mailbox_hash in mailbox_hashes { + let m = &mailboxes_lck[&mailbox_hash]; let mut total_lck = m.total.lock().unwrap(); - let mut unseen_lck = - m.unseen.lock().unwrap(); - *total_lck += 1; - if !env.is_seen() { - *unseen_lck += 1; - } + *total_lck = total_lck.saturating_sub(1); (sender)( account_hash, BackendEvent::Refresh(RefreshEvent { account_hash, mailbox_hash, - kind: Create(Box::new(env.clone())), + kind: Remove(env_hash), }), ); } } } - Err(err) => { - debug!("could not parse message {:?}", err); - } + !message.is_null() } - } - } - drop(query); - let database_lck = database.inner.read().unwrap(); - index.write().unwrap().retain(|&env_hash, msg_id| { - let mut message: *mut notmuch_message_t = std::ptr::null_mut(); - if let Err(err) = unsafe { - try_call!( - lib, - call!(lib, notmuch_database_find_message)( - *database_lck, - msg_id.as_ptr(), - &mut message as *mut _, - ) - ) - } { - debug!(err); - false - } else { - if message.is_null() { - if let Some(mailbox_hashes) = - mailbox_index_lck.get(&env_hash) - { - for &mailbox_hash in mailbox_hashes { - let m = &mailboxes_lck[&mailbox_hash]; - let mut total_lck = m.total.lock().unwrap(); - *total_lck = total_lck.saturating_sub(1); - (sender)( - account_hash, - BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash, - kind: Remove(env_hash), - }), - ); - } - } - } - !message.is_null() - } - }); + }); - *revision_uuid_lck = new_revision; + *revision_uuid_lck = new_revision; + } } } - } - }; + }; - if let Err(err) = c(&sender) { - (sender)( - account_hash, - BackendEvent::Refresh(RefreshEvent { + if let Err(err) = c(&sender) { + (sender)( account_hash, - mailbox_hash: 0, - kind: Failure(err), - }), - ); - } - })?; - Ok(handle.thread().id()) - } - fn mailboxes(&self) -> Result> { - Ok(self + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash: 0, + kind: Failure(err), + }), + ); + } + })?; + Ok(handle.thread().id()) + } + */ + + fn mailboxes(&self) -> ResultFuture> { + let ret = Ok(self .mailboxes .read() .unwrap() .iter() .map(|(k, f)| (*k, BackendMailbox::clone(f))) - .collect()) + .collect()); + Ok(Box::pin(async { ret })) } fn operation(&self, hash: EnvelopeHash) -> Result> { @@ -851,12 +890,13 @@ impl BackendOp for NotmuchOp { } } -pub struct MessageIterator { +pub struct MessageIterator<'query> { lib: Arc, messages: *mut notmuch_messages_t, + _ph: std::marker::PhantomData<*const Query<'query>>, } -impl Iterator for MessageIterator { +impl Iterator for MessageIterator<'_> { type Item = *mut notmuch_message_t; fn next(&mut self) -> Option { if self.messages.is_null() { @@ -1011,7 +1051,7 @@ impl<'s> Query<'s> { Ok(count) } - fn search(&self) -> Result { + fn search(&'s self) -> Result> { let mut messages: *mut notmuch_messages_t = std::ptr::null_mut(); let status = unsafe { call!(self.lib, notmuch_query_search_messages)(self.ptr, &mut messages as *mut _) @@ -1026,6 +1066,7 @@ impl<'s> Query<'s> { Ok(MessageIterator { messages, lib: self.lib.clone(), + _ph: std::marker::PhantomData, }) } } @@ -1038,6 +1079,23 @@ impl Drop for Query<'_> { } } +fn notmuch_message_insert( + lib: &libloading::Library, + index: &RwLock>, + message: *mut notmuch_message_t, +) -> CString { + let msg_id = unsafe { call!(lib, notmuch_message_get_message_id)(message) }; + let env_hash = { + let c_str = unsafe { CStr::from_ptr(msg_id) }; + let mut hasher = DefaultHasher::default(); + c_str.hash(&mut hasher); + hasher.finish() + }; + let c_str = unsafe { CStr::from_ptr(msg_id) }; + index.write().unwrap().insert(env_hash, c_str.into()); + c_str.into() +} + fn notmuch_message_into_envelope( lib: Arc, index: Arc>>, diff --git a/melib/src/lib.rs b/melib/src/lib.rs index 106f3e44..532aa65d 100644 --- a/melib/src/lib.rs +++ b/melib/src/lib.rs @@ -31,7 +31,6 @@ //! values (see module `thread`) //! //! Other exports are -//! - Thread management (see module `async_workers`) //! - Basic mail account configuration to use with `backends` (see module `conf`) //! - Parser combinators (see module `parsec`) //! - A `ShellExpandTrait` to expand paths like a shell. @@ -105,7 +104,6 @@ pub use self::logging::LoggingLevel::*; pub use self::logging::*; pub mod addressbook; -pub mod async_workers; pub mod backends; mod collection; pub mod conf; diff --git a/src/bin.rs b/src/bin.rs index d511f634..d22e4513 100644 --- a/src/bin.rs +++ b/src/bin.rs @@ -75,9 +75,6 @@ use crate::components::*; pub mod conf; use crate::conf::*; -pub mod workers; -use crate::workers::*; - #[cfg(feature = "sqlite3")] pub mod sqlite3; @@ -468,9 +465,6 @@ fn run_app(opt: Opt) -> Result<()> { state.check_accounts(); state.redraw(); }, - ThreadEvent::NewThread(id, name) => { - state.new_thread(id, name); - }, ThreadEvent::JobFinished(id) => { debug!("Job finished {}", id); for account in state.context.accounts.values_mut() { diff --git a/src/components/mail/status.rs b/src/components/mail/status.rs index 37951691..714660e2 100644 --- a/src/components/mail/status.rs +++ b/src/components/mail/status.rs @@ -50,7 +50,7 @@ impl Component for StatusPanel { self.draw_accounts(context); let (width, height) = self.content.size(); { - let (_, y) = write_string_to_grid( + let (_, _) = write_string_to_grid( "Worker threads", &mut self.content, self.theme_default.fg, @@ -59,6 +59,7 @@ impl Component for StatusPanel { ((1, 1), (width - 1, height - 1)), Some(1), ); + /* let mut y = y + 1; let work_controller = context.work_controller().threads.lock().unwrap(); let mut workers: Vec<&Worker> = work_controller.values().collect::>(); @@ -130,6 +131,7 @@ impl Component for StatusPanel { y = y_off + 1; } + */ } let (cols, rows) = (width!(area), height!(area)); self.cursor = ( diff --git a/src/conf.rs b/src/conf.rs index c85319c4..1f294f87 100644 --- a/src/conf.rs +++ b/src/conf.rs @@ -350,18 +350,7 @@ impl FileSettings { e.to_string() )) })?; - let mut backends = melib::backends::Backends::new(); - let plugin_manager = crate::plugins::PluginManager::new(); - for (_, p) in s.plugins.clone() { - if crate::plugins::PluginKind::Backend == p.kind() { - crate::plugins::backend::PluginBackend::register( - plugin_manager.listener(), - p.clone(), - &mut backends, - ); - } - } - + let backends = melib::backends::Backends::new(); let Themes { light: default_light, dark: default_dark, diff --git a/src/conf/accounts.rs b/src/conf/accounts.rs index 333688f2..29e7973e 100644 --- a/src/conf/accounts.rs +++ b/src/conf/accounts.rs @@ -26,7 +26,6 @@ use super::{AccountConf, FileMailboxConf}; use crate::jobs::{JobChannel, JobExecutor, JobId, JoinHandle}; use indexmap::IndexMap; -use melib::async_workers::{Async, AsyncBuilder, AsyncStatus, WorkContext}; use melib::backends::*; use melib::email::*; use melib::error::{MeliError, Result}; @@ -72,8 +71,6 @@ macro_rules! try_recv_timeout { }}; } -pub type Worker = Option>>>; - #[derive(Debug)] pub enum MailboxStatus { Available, @@ -112,7 +109,6 @@ pub struct MailboxEntry { pub name: String, pub ref_mailbox: Mailbox, pub conf: FileMailboxConf, - pub worker: Worker, } impl MailboxEntry { @@ -152,7 +148,6 @@ pub struct Account { sent_mailbox: Option, pub(crate) collection: Collection, pub(crate) address_book: AddressBook, - pub(crate) work_context: WorkContext, pub(crate) settings: AccountConf, pub(crate) backend: Arc>>, @@ -354,7 +349,6 @@ impl Account { name: String, mut settings: AccountConf, map: &Backends, - work_context: WorkContext, job_executor: Arc, sender: Sender, event_consumer: BackendEventConsumer, @@ -397,17 +391,18 @@ impl Account { let mut active_jobs = HashMap::default(); let mut active_job_instants = BTreeMap::default(); - if backend.capabilities().is_async { - if let Ok(mailboxes_job) = backend.mailboxes_async() { - if let Ok(online_job) = backend.is_online_async() { - let (rcvr, handle, job_id) = - job_executor.spawn_specialized(online_job.then(|_| mailboxes_job)); - active_jobs.insert(job_id, JobRequest::Mailboxes(handle, rcvr)); - active_job_instants.insert(std::time::Instant::now(), job_id); - } + if let Ok(mailboxes_job) = backend.mailboxes() { + if let Ok(online_job) = backend.is_online() { + let (rcvr, handle, job_id) = if backend.capabilities().is_async { + job_executor.spawn_specialized(online_job.then(|_| mailboxes_job)) + } else { + job_executor.spawn_blocking(online_job.then(|_| mailboxes_job)) + }; + active_jobs.insert(job_id, JobRequest::Mailboxes(handle, rcvr)); + active_job_instants.insert(std::time::Instant::now(), job_id); } } - let mut ret = Account { + Ok(Account { hash, name, is_online: if !backend.capabilities().is_remote { @@ -422,7 +417,6 @@ impl Account { address_book, sent_mailbox: Default::default(), collection: Default::default(), - work_context, settings, sender, job_executor, @@ -431,21 +425,10 @@ impl Account { event_queue: VecDeque::with_capacity(8), backend_capabilities: backend.capabilities(), backend: Arc::new(RwLock::new(backend)), - }; - - if !ret.backend_capabilities.is_remote && !ret.backend_capabilities.is_async { - ret.init(None)?; - } - - Ok(ret) + }) } - fn init(&mut self, ref_mailboxes: Option>) -> Result<()> { - let mut ref_mailboxes: HashMap = if let Some(v) = ref_mailboxes { - v - } else { - self.backend.read().unwrap().mailboxes()? - }; + fn init(&mut self, mut ref_mailboxes: HashMap) -> Result<()> { self.backend_capabilities = self.backend.read().unwrap().capabilities(); let mut mailbox_entries: IndexMap = IndexMap::with_capacity_and_hasher(ref_mailboxes.len(), Default::default()); @@ -493,7 +476,6 @@ impl Account { name: f.path().to_string(), status: MailboxStatus::None, conf: conf.clone(), - worker: None, }, ); } else { @@ -518,7 +500,6 @@ impl Account { name: f.path().to_string(), status: MailboxStatus::None, conf: new, - worker: None, }, ); } @@ -574,30 +555,22 @@ impl Account { { let total = entry.ref_mailbox.count().ok().unwrap_or((0, 0)).1; entry.status = MailboxStatus::Parsing(0, total); - if self.backend_capabilities.is_async { - if let Ok(mailbox_job) = self.backend.write().unwrap().fetch_async(*h) { - let mailbox_job = mailbox_job.into_future(); - let (rcvr, handle, job_id) = - self.job_executor.spawn_specialized(mailbox_job); - self.sender - .send(ThreadEvent::UIEvent(UIEvent::StatusEvent( - StatusEvent::NewJob(job_id), - ))) - .unwrap(); - self.active_jobs - .insert(job_id, JobRequest::Fetch(*h, handle, rcvr)); - self.active_job_instants - .insert(std::time::Instant::now(), job_id); - } - } else { - entry.worker = - match Account::new_worker(&f, &mut self.backend, &self.work_context) { - Ok(v) => v, - Err(err) => { - entry.status = MailboxStatus::Failed(err); - None - } - }; + if let Ok(mailbox_job) = self.backend.write().unwrap().fetch(*h) { + let mailbox_job = mailbox_job.into_future(); + let (rcvr, handle, job_id) = if self.backend_capabilities.is_async { + self.job_executor.spawn_specialized(mailbox_job) + } else { + self.job_executor.spawn_blocking(mailbox_job) + }; + self.sender + .send(ThreadEvent::UIEvent(UIEvent::StatusEvent( + StatusEvent::NewJob(job_id), + ))) + .unwrap(); + self.active_jobs + .insert(job_id, JobRequest::Fetch(*h, handle, rcvr)); + self.active_job_instants + .insert(std::time::Instant::now(), job_id); } } }); @@ -612,13 +585,9 @@ impl Account { Ok(()) } - fn new_worker( - mailbox: &Mailbox, - backend: &Arc>>, - work_context: &WorkContext, - ) -> Result { + fn new_worker(mailbox: &Mailbox, backend: &Arc>>) -> Result<()> { let mailbox_hash = mailbox.hash(); - let mut mailbox_handle = backend.write().unwrap().fetch(mailbox_hash)?; + let mailbox_handle = backend.write().unwrap().fetch(mailbox_hash)?; let priority = match mailbox.special_usage() { SpecialUsageMailbox::Inbox => 0, SpecialUsageMailbox::Sent => 1, @@ -822,31 +791,7 @@ impl Account { return Some(EnvelopeRemove(env_hash, thread_hash)); } RefreshEventKind::Rescan => { - let handle = match Account::new_worker( - &self.mailbox_entries[&mailbox_hash].ref_mailbox, - &mut self.backend, - &self.work_context, - ) { - Ok(v) => v, - Err(err) => { - let ret = Some(Notification( - None, - err.to_string(), - Some(crate::types::NotificationType::ERROR), - )); - self.mailbox_entries - .entry(mailbox_hash) - .and_modify(|entry| { - entry.status = MailboxStatus::Failed(err); - }); - return ret; - } - }; - self.mailbox_entries - .entry(mailbox_hash) - .and_modify(|entry| { - entry.worker = handle; - }); + self.watch(); } RefreshEventKind::Failure(err) => { debug!("RefreshEvent Failure: {}", err.to_string()); @@ -891,62 +836,40 @@ impl Account { .unwrap(); return Ok(()); } - if self.backend_capabilities.is_async { - if let Ok(refresh_job) = self.backend.write().unwrap().refresh_async(mailbox_hash) { - let (rcvr, handle, job_id) = self.job_executor.spawn_specialized(refresh_job); - self.sender - .send(ThreadEvent::UIEvent(UIEvent::StatusEvent( - StatusEvent::NewJob(job_id), - ))) - .unwrap(); - self.active_jobs - .insert(job_id, JobRequest::Refresh(mailbox_hash, handle, rcvr)); - self.active_job_instants - .insert(std::time::Instant::now(), job_id); - } - } else { - let mut h = self.backend.write().unwrap().refresh(mailbox_hash)?; - self.work_context.new_work.send(h.work().unwrap()).unwrap(); + if let Ok(refresh_job) = self.backend.write().unwrap().refresh(mailbox_hash) { + let (rcvr, handle, job_id) = if self.backend_capabilities.is_async { + self.job_executor.spawn_specialized(refresh_job) + } else { + self.job_executor.spawn_blocking(refresh_job) + }; + self.sender + .send(ThreadEvent::UIEvent(UIEvent::StatusEvent( + StatusEvent::NewJob(job_id), + ))) + .unwrap(); + self.active_jobs + .insert(job_id, JobRequest::Refresh(mailbox_hash, handle, rcvr)); + self.active_job_instants + .insert(std::time::Instant::now(), job_id); } Ok(()) } + pub fn watch(&mut self) { if self.settings.account().manual_refresh { return; } - if self.backend_capabilities.is_async { - if !self.active_jobs.values().any(|j| j.is_watch()) { - match self.backend.read().unwrap().watch_async() { - Ok(fut) => { - let (handle, job_id) = self.job_executor.spawn(fut); - self.active_jobs.insert(job_id, JobRequest::Watch(handle)); - } - Err(e) => { - self.sender - .send(ThreadEvent::UIEvent(UIEvent::StatusEvent( - StatusEvent::DisplayMessage(e.to_string()), - ))) - .unwrap(); - } + if !self.active_jobs.values().any(|j| j.is_watch()) { + match self.backend.read().unwrap().watch() { + Ok(fut) => { + let (_channel, handle, job_id) = if self.backend_capabilities.is_async { + self.job_executor.spawn_specialized(fut) + } else { + self.job_executor.spawn_blocking(fut) + }; + self.active_jobs.insert(job_id, JobRequest::Watch(handle)); } - } - } else { - match self - .backend - .read() - .unwrap() - .watch(self.work_context.clone()) - { - Ok(id) => { - self.sender - .send(ThreadEvent::NewThread( - id, - format!("watching {}", self.name()).into(), - )) - .unwrap(); - } - Err(e) => { self.sender .send(ThreadEvent::UIEvent(UIEvent::StatusEvent( @@ -994,95 +917,37 @@ impl Account { if mailbox_hash == 0 { return Err(0); } - loop { - match self - .mailbox_entries - .get_mut(&mailbox_hash) - .unwrap() - .worker - .as_mut() + match self.mailbox_entries[&mailbox_hash].status { + MailboxStatus::Available | MailboxStatus::Parsing(_, _) + if self + .collection + .mailboxes + .read() + .unwrap() + .contains_key(&mailbox_hash) => { - None => { - return match self.mailbox_entries[&mailbox_hash].status { - MailboxStatus::Available | MailboxStatus::Parsing(_, _) - if self - .collection - .mailboxes - .read() - .unwrap() - .contains_key(&mailbox_hash) => - { - Ok(()) + Ok(()) + } + MailboxStatus::None => { + if !self.active_jobs.values().any(|j| j.is_fetch(mailbox_hash)) { + let mailbox_job = self.backend.write().unwrap().fetch(mailbox_hash); + match mailbox_job { + Ok(mailbox_job) => { + let mailbox_job = mailbox_job.into_future(); + let (rcvr, handle, job_id) = if self.backend_capabilities.is_async { + self.job_executor.spawn_specialized(mailbox_job) + } else { + self.job_executor.spawn_blocking(mailbox_job) + }; + self.sender + .send(ThreadEvent::UIEvent(UIEvent::StatusEvent( + StatusEvent::NewJob(job_id), + ))) + .unwrap(); + self.active_jobs + .insert(job_id, JobRequest::Fetch(mailbox_hash, handle, rcvr)); } - MailboxStatus::None => { - if self.backend_capabilities.is_async { - if !self.active_jobs.values().any(|j| j.is_fetch(mailbox_hash)) { - let mailbox_job = - self.backend.write().unwrap().fetch_async(mailbox_hash); - match mailbox_job { - Ok(mailbox_job) => { - let mailbox_job = mailbox_job.into_future(); - let (rcvr, handle, job_id) = - self.job_executor.spawn_specialized(mailbox_job); - self.sender - .send(ThreadEvent::UIEvent(UIEvent::StatusEvent( - StatusEvent::NewJob(job_id), - ))) - .unwrap(); - self.active_jobs.insert( - job_id, - JobRequest::Fetch(mailbox_hash, handle, rcvr), - ); - } - Err(err) => { - self.mailbox_entries.entry(mailbox_hash).and_modify( - |entry| { - entry.status = MailboxStatus::Failed(err); - }, - ); - self.sender - .send(ThreadEvent::UIEvent(UIEvent::StartupCheck( - mailbox_hash, - ))) - .unwrap(); - } - } - } - } else if self.mailbox_entries[&mailbox_hash].worker.is_none() { - let handle = match Account::new_worker( - &self.mailbox_entries[&mailbox_hash].ref_mailbox, - &mut self.backend, - &self.work_context, - ) { - Ok(v) => v, - Err(err) => { - self.mailbox_entries.entry(mailbox_hash).and_modify( - |entry| { - entry.status = MailboxStatus::Failed(err); - }, - ); - return Err(0); - } - }; - self.mailbox_entries - .entry(mailbox_hash) - .and_modify(|entry| { - entry.worker = handle; - }); - } - self.collection.new_mailbox(mailbox_hash); - Err(0) - } - _ => Err(0), - }; - } - Some(ref mut w) => match debug!(w.poll()) { - Ok(AsyncStatus::NoUpdate) => { - break; - } - Ok(AsyncStatus::Payload(payload)) => { - debug!("got payload in status for {}", mailbox_hash); - if let Err(err) = payload { + Err(err) => { self.mailbox_entries .entry(mailbox_hash) .and_modify(|entry| { @@ -1091,85 +956,13 @@ impl Account { self.sender .send(ThreadEvent::UIEvent(UIEvent::StartupCheck(mailbox_hash))) .unwrap(); - return Err(0); } - let envelopes = payload - .unwrap() - .into_iter() - .map(|e| (e.hash(), e)) - .collect::>(); - self.mailbox_entries - .entry(mailbox_hash) - .and_modify(|entry| match entry.status { - MailboxStatus::None => { - entry.status = MailboxStatus::Parsing(envelopes.len(), 0); - } - MailboxStatus::Parsing(ref mut done, _) => { - *done += envelopes.len(); - } - MailboxStatus::Failed(_) => { - entry.status = MailboxStatus::Parsing(envelopes.len(), 0); - } - MailboxStatus::Available => {} - }); - if let Some(updated_mailboxes) = - self.collection - .merge(envelopes, mailbox_hash, self.sent_mailbox) - { - for f in updated_mailboxes { - self.sender - .send(ThreadEvent::UIEvent(UIEvent::StartupCheck(f))) - .unwrap(); - } - } - self.sender - .send(ThreadEvent::UIEvent(UIEvent::StartupCheck(mailbox_hash))) - .unwrap(); } - Ok(AsyncStatus::Finished) => { - debug!("got finished in status for {}", mailbox_hash); - self.mailbox_entries - .entry(mailbox_hash) - .and_modify(|entry| { - entry.status = MailboxStatus::Available; - entry.worker = None; - }); - self.sender - .send(ThreadEvent::UIEvent(UIEvent::MailboxUpdate(( - self.hash, - mailbox_hash, - )))) - .unwrap(); - } - Ok(AsyncStatus::ProgressReport(n)) => { - self.mailbox_entries - .entry(mailbox_hash) - .and_modify(|entry| match entry.status { - MailboxStatus::Parsing(ref mut d, _) => { - *d += n; - } - _ => {} - }); - //return Err(n); - } - _ => { - break; - } - }, - }; - } - if self.mailbox_entries[&mailbox_hash].status.is_available() - || (self.mailbox_entries[&mailbox_hash].status.is_parsing() - && self - .collection - .mailboxes - .read() - .unwrap() - .contains_key(&mailbox_hash)) - { - Ok(()) - } else { - Err(0) + } + self.collection.new_mailbox(mailbox_hash); + Err(0) + } + _ => Err(0), } } @@ -1403,14 +1196,7 @@ impl Account { parent.ref_mailbox = mailboxes.remove(&parent_hash).unwrap(); }); } - let (status, worker) = match Account::new_worker( - &mailboxes[&mailbox_hash], - &mut self.backend, - &self.work_context, - ) { - Ok(v) => (MailboxStatus::Parsing(0, 0), v), - Err(err) => (MailboxStatus::Failed(err), None), - }; + let status = MailboxStatus::Parsing(0, 0); self.mailbox_entries.insert( mailbox_hash, @@ -1418,7 +1204,6 @@ impl Account { name: mailboxes[&mailbox_hash].path().to_string(), status, conf: new, - worker, ref_mailbox: mailboxes.remove(&mailbox_hash).unwrap(), }, ); @@ -1486,8 +1271,6 @@ impl Account { &self.mailbox_entries, &mut self.mailboxes_order, ); - // FIXME Kill worker as well - // FIXME remove from settings as well Ok(format!( @@ -1577,34 +1360,21 @@ impl Account { { return self.is_online.clone(); } - if self.backend_capabilities.is_async { - if self.is_online.is_ok() && !timeout { - return Ok(()); - } - if !self.active_jobs.values().any(JobRequest::is_online) { - let online_job = self.backend.read().unwrap().is_online_async(); - if let Ok(online_job) = online_job { - let (rcvr, handle, job_id) = self.job_executor.spawn_specialized(online_job); - self.insert_job(job_id, JobRequest::IsOnline(handle, rcvr)); - } - } - return self.is_online.clone(); - } else { - let ret = self.backend.read().unwrap().is_online(); - if ret.is_ok() != self.is_online.is_ok() { - if ret.is_ok() { - self.last_online_request = std::time::Instant::now(); - self.init(None)?; - } - self.sender - .send(ThreadEvent::UIEvent(UIEvent::AccountStatusChange( - self.hash, - ))) - .unwrap(); - } - self.is_online = ret.clone(); - ret + if self.is_online.is_ok() && !timeout { + return Ok(()); } + if !self.active_jobs.values().any(JobRequest::is_online) { + let online_job = self.backend.read().unwrap().is_online(); + if let Ok(online_job) = online_job { + let (rcvr, handle, job_id) = if self.backend_capabilities.is_async { + self.job_executor.spawn_specialized(online_job) + } else { + self.job_executor.spawn_blocking(online_job) + }; + self.insert_job(job_id, JobRequest::IsOnline(handle, rcvr)); + } + } + return self.is_online.clone(); } pub fn search( @@ -1657,8 +1427,7 @@ impl Account { match self.active_jobs.remove(job_id).unwrap() { JobRequest::Mailboxes(_, ref mut chan) => { if let Some(mailboxes) = chan.try_recv().unwrap() { - if let Err(err) = mailboxes.and_then(|mailboxes| self.init(Some(mailboxes))) - { + if let Err(err) = mailboxes.and_then(|mailboxes| self.init(mailboxes)) { if err.kind.is_authentication() { self.sender .send(ThreadEvent::UIEvent(UIEvent::Notification( @@ -1670,9 +1439,7 @@ impl Account { self.is_online = Err(err); return true; } - if let Ok(mailboxes_job) = - self.backend.read().unwrap().mailboxes_async() - { + if let Ok(mailboxes_job) = self.backend.read().unwrap().mailboxes() { let (rcvr, handle, job_id) = self.job_executor.spawn_specialized(mailboxes_job); self.active_jobs @@ -1699,7 +1466,6 @@ impl Account { .entry(mailbox_hash) .and_modify(|entry| { entry.status = MailboxStatus::Available; - entry.worker = None; }); self.sender .send(ThreadEvent::UIEvent(UIEvent::MailboxUpdate(( @@ -1789,7 +1555,7 @@ impl Account { } self.is_online = is_online; } - if let Ok(online_job) = self.backend.read().unwrap().is_online_async() { + if let Ok(online_job) = self.backend.read().unwrap().is_online() { let (rcvr, handle, job_id) = self.job_executor.spawn_specialized(online_job); self.active_jobs diff --git a/src/jobs.rs b/src/jobs.rs index 62fde1d9..9023e833 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -157,8 +157,6 @@ impl JobExecutor { F: Future> + Send + 'static, { let job_id = JobId::new(); - let _job_id = job_id; - let __job_id = job_id; let finished_sender = self.sender.clone(); let injector = self.global_queue.clone(); // Create a task and schedule it for execution. @@ -166,11 +164,11 @@ impl JobExecutor { async move { let r = future.await; finished_sender - .send(ThreadEvent::JobFinished(__job_id)) + .send(ThreadEvent::JobFinished(job_id)) .unwrap(); r }, - move |task| injector.push(MeliTask { task, id: _job_id }), + move |task| injector.push(MeliTask { task, id: job_id }), (), ); task.schedule(); @@ -191,8 +189,6 @@ impl JobExecutor { let (sender, receiver) = oneshot::channel(); let finished_sender = self.sender.clone(); let job_id = JobId::new(); - let _job_id = job_id; - let __job_id = job_id; let injector = self.global_queue.clone(); // Create a task and schedule it for execution. let (task, handle) = async_task::spawn( @@ -200,11 +196,11 @@ impl JobExecutor { let res = future.await; let _ = sender.send(res); finished_sender - .send(ThreadEvent::JobFinished(__job_id)) + .send(ThreadEvent::JobFinished(job_id)) .unwrap(); Ok(()) }, - move |task| injector.push(MeliTask { task, id: _job_id }), + move |task| injector.push(MeliTask { task, id: job_id }), (), ); task.schedule(); diff --git a/src/managesieve.rs b/src/managesieve.rs index 5f23e7bf..55feb111 100644 --- a/src/managesieve.rs +++ b/src/managesieve.rs @@ -63,9 +63,6 @@ use crate::components::*; pub mod conf; use crate::conf::*; -pub mod workers; -use crate::workers::*; - #[cfg(feature = "sqlite3")] pub mod sqlite3; diff --git a/src/plugins.rs b/src/plugins.rs index 727733bc..39686688 100644 --- a/src/plugins.rs +++ b/src/plugins.rs @@ -28,7 +28,7 @@ use std::os::unix::net::{UnixListener, UnixStream}; use std::process::Stdio; use uuid::Uuid; -pub mod backend; +//pub mod backend; pub mod rpc; pub use rpc::*; diff --git a/src/plugins/backend.rs b/src/plugins/backend.rs index ed0413cf..1d18e423 100644 --- a/src/plugins/backend.rs +++ b/src/plugins/backend.rs @@ -191,17 +191,10 @@ impl MailBackend for PluginBackend { Ok(w.build(handle)) } - fn refresh(&mut self, _mailbox_hash: MailboxHash) -> Result> { - Err(MeliError::new("Unimplemented.")) - } - fn watch(&self, _work_context: WorkContext) -> Result { - Err(MeliError::new("Unimplemented.")) - } - - fn mailboxes(&self) -> Result> { + fn mailboxes(&self) -> ResultFuture> { let mut ret: HashMap = Default::default(); ret.insert(0, Mailbox::default()); - Ok(ret) + Ok(Box::pin(async { Ok(ret) })) } fn operation(&self, hash: EnvelopeHash) -> Result> { diff --git a/src/state.rs b/src/state.rs index 3ecec8d8..ac4faf8b 100644 --- a/src/state.rs +++ b/src/state.rs @@ -113,7 +113,6 @@ pub struct Context { sender: Sender, receiver: Receiver, input_thread: InputHandler, - work_controller: WorkController, job_executor: Arc, pub children: Vec, pub plugin_manager: PluginManager, @@ -167,10 +166,6 @@ impl Context { let idx = self.accounts.get_index_of(&account_hash).unwrap(); self.is_online_idx(idx) } - - pub fn work_controller(&self) -> &WorkController { - &self.work_controller - } } /// A State object to manage and own components and components of the UI. `State` is responsible for @@ -246,6 +241,7 @@ impl State { }; let mut plugin_manager = PluginManager::new(); for (_, p) in settings.plugins.clone() { + /* if crate::plugins::PluginKind::Backend == p.kind() { debug!("registering {:?}", &p); crate::plugins::backend::PluginBackend::register( @@ -254,6 +250,7 @@ impl State { &mut backends, ); } + */ plugin_manager.register(p)?; } @@ -261,7 +258,6 @@ impl State { let cols = termsize.0 as usize; let rows = termsize.1 as usize; - let work_controller = WorkController::new(sender.clone()); let job_executor = Arc::new(JobExecutor::new(sender.clone())); let accounts = { settings @@ -281,7 +277,6 @@ impl State { n.to_string(), a_s.clone(), &backends, - work_controller.get_context(), job_executor.clone(), sender.clone(), BackendEventConsumer::new(Arc::new( @@ -346,7 +341,6 @@ impl State { dirty_areas: VecDeque::with_capacity(5), replies: VecDeque::with_capacity(5), temp_files: Vec::new(), - work_controller, job_executor, children: vec![], plugin_manager, @@ -422,15 +416,6 @@ impl State { } } - pub fn new_thread(&mut self, id: thread::ThreadId, name: String) { - self.context - .work_controller - .static_threads - .lock() - .unwrap() - .insert(id, name.into()); - } - /// Switch back to the terminal's main screen (The command line the user sees before opening /// the application) pub fn switch_to_main_screen(&mut self) { diff --git a/src/types.rs b/src/types.rs index 8b1c740a..a3fa8873 100644 --- a/src/types.rs +++ b/src/types.rs @@ -44,7 +44,6 @@ use melib::backends::{AccountHash, BackendEvent, MailboxHash}; use melib::{EnvelopeHash, RefreshEvent, ThreadHash}; use nix::unistd::Pid; use std::fmt; -use std::thread; use uuid::Uuid; #[derive(Debug)] @@ -62,7 +61,6 @@ pub enum StatusEvent { /// to the main process. #[derive(Debug)] pub enum ThreadEvent { - NewThread(thread::ThreadId, String), /// User input. Input((Key, Vec)), /// User input and input as raw bytes. diff --git a/src/workers.rs b/src/workers.rs deleted file mode 100644 index c5846e00..00000000 --- a/src/workers.rs +++ /dev/null @@ -1,385 +0,0 @@ -/* - * meli - * - * Copyright 2017-2020 Manos Pitsidianakis - * - * This file is part of meli. - * - * meli is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * meli is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with meli. If not, see . - */ - -/*! Simple blocking job control. - */ -use crate::types::ThreadEvent; -use crossbeam::{ - channel::{bounded, unbounded, Sender}, - select, -}; -use melib::async_workers::{Work, WorkContext}; -use melib::datetime::{self, UnixTimestamp}; -use melib::text_processing::Truncate; -use std::collections::HashMap; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; -use std::sync::Mutex; -use std::thread; - -const MAX_WORKER: usize = 4; - -/// Representation of a worker thread for use in `WorkController`. These values are to be displayed -/// to the user. -#[derive(Debug)] -pub struct Worker { - pub name: String, - pub status: String, - pub heartbeat: UnixTimestamp, -} - -impl From for Worker { - fn from(val: String) -> Self { - Worker { - name: val, - status: String::new(), - heartbeat: datetime::now(), - } - } -} - -pub struct WorkController { - pub queue: WorkQueue, - thread_end_tx: Sender, - /// Worker threads that take up on jobs from self.queue - pub threads: Arc>>, - /// Special function threads that live indefinitely (eg watching a mailbox) - pub static_threads: Arc>>, - work_context: WorkContext, -} - -impl Drop for WorkController { - fn drop(&mut self) { - if let Ok(lock) = self.threads.lock() { - for _ in 0..lock.len() { - let _ = self.thread_end_tx.send(true); - } - } - } -} - -#[derive(Clone)] -pub struct WorkQueue { - inner: Arc>>, - new_jobs_tx: Sender, - work_context: WorkContext, -} - -impl WorkQueue { - fn new(new_jobs_tx: Sender, work_context: WorkContext) -> Self { - Self { - inner: Arc::new(Mutex::new(Vec::new())), - new_jobs_tx, - work_context, - } - } - - /// Blocks the current thread until work is available, then - /// gets the data required to perform that work. - /// - /// # Errors - /// Returns None if there is no more work in the queue. - /// - /// # Panics - /// Panics if the underlying mutex became poisoned. This is exceedingly - /// unlikely. - fn get_work(&self) -> Option { - // try to get a lock on the mutex. - let maybe_queue = self.inner.lock(); - if let Ok(mut queue) = maybe_queue { - if queue.is_empty() { - return None; - } else { - return Some(queue.swap_remove(0)); - } - } else { - // poisoned mutex, some other thread holding the mutex has panicked! - panic!("WorkQueue::get_work() tried to lock a poisoned mutex"); - } - } - - // Both the controller (main thread) and workers can use this - // function to add work to the queue. - - /// Blocks the current thread until work can be added, then - /// adds that work to the end of the queue. - /// Returns the amount of work now in the queue. - /// - /// # Panics - /// Panics if the underlying mutex became poisoned. This is exceedingly - /// unlikely. - pub fn add_work(&self, work: Work) { - if work.is_static { - self.work_context.new_work.send(work).unwrap(); - return; - } - - // As above, try to get a lock on the mutex. - if let Ok(mut queue) = self.inner.lock() { - /* Insert in position that maintains the queue sorted */ - let pos = match queue.binary_search_by(|probe| probe.cmp(&work)) { - Ok(p) => p, - Err(p) => p, - }; - queue.insert(pos, work); - - /* inform threads that new job is available */ - self.new_jobs_tx.send(true).unwrap(); - } else { - panic!("WorkQueue::add_work() tried to lock a poisoned mutex"); - } - } -} - -impl WorkController { - pub fn new(pulse: Sender) -> WorkController { - let (new_jobs_tx, new_jobs_rx) = unbounded(); - - /* create a channel for jobs to send new work to Controller thread */ - let (new_work_tx, new_work_rx) = unbounded(); - - /* create a channel for jobs to set their names */ - let (set_name_tx, set_name_rx) = unbounded(); - - /* create a channel for jobs to set their statuses */ - let (set_status_tx, set_status_rx) = unbounded(); - - /* create a channel for jobs to announce their demise */ - let (finished_tx, finished_rx) = unbounded(); - - /* each associated thread will hold a copy of this context item in order to communicate - * with the controller thread */ - let work_context = WorkContext { - new_work: new_work_tx, - set_name: set_name_tx, - set_status: set_status_tx, - finished: finished_tx, - }; - - let queue: WorkQueue = WorkQueue::new(new_jobs_tx, work_context.clone()); - - let active_threads = Arc::new(AtomicUsize::new(MAX_WORKER)); - // Create a SyncFlag to share whether or not there are more jobs to be done. - let (thread_end_tx, thread_end_rx) = bounded(1); - - let threads_lock: Arc>> = - Arc::new(Mutex::new(HashMap::default())); - - let static_threads_lock: Arc>> = - Arc::new(Mutex::new(HashMap::default())); - - let mut threads = threads_lock.lock().unwrap(); - /* spawn worker threads */ - for _ in 0..MAX_WORKER { - /* Each worker thread will wait on two channels: thread_end and new_jobs. thread_end - * informs the worker that it should quit and new_jobs informs that there is a new job - * available inside the queue. Only one worker will get each job, and others will - * go back to waiting on the channels */ - let thread_queue = queue.clone(); - - let active_threads = active_threads.clone(); - let thread_end_rx = thread_end_rx.clone(); - let new_jobs_rx = new_jobs_rx.clone(); - let new_jobs_rx = new_jobs_rx.clone(); - - let work_context = work_context.clone(); - let pulse = pulse.clone(); - - let handle = spawn_worker( - thread_queue, - active_threads, - thread_end_rx, - new_jobs_rx, - work_context, - pulse, - ); - - /* add the handle for the newly spawned thread to the list of handles */ - threads.insert(handle.thread().id(), String::from("idle-worker").into()); - } - /* drop lock */ - drop(threads); - - { - /* start controller thread */ - let threads_lock = threads_lock.clone(); - let _static_threads_lock = static_threads_lock.clone(); - let thread_queue = queue.clone(); - let thread_end_rx = thread_end_rx.clone(); - let work_context = work_context.clone(); - - let handle = thread::spawn(move || 'control_loop: loop { - select! { - recv(thread_end_rx) -> _ => { - debug!("received thread_end_rx, quitting"); - break 'control_loop; - }, - recv(new_work_rx) -> work => { - if let Ok(work) = work { - if work.is_static { - let work_context = work_context.clone(); - let handle = thread::spawn(move || work.compute(work_context)); - _static_threads_lock.lock().unwrap().insert(handle.thread().id(), String::new().into()); - } else { - if active_threads.load(Ordering::SeqCst) == 0 { - let handle = spawn_worker( - thread_queue.clone(), - active_threads.clone(), - thread_end_rx.clone(), - new_jobs_rx.clone(), - work_context.clone(), - pulse.clone(), - ); - - /* add the handle for the newly spawned thread to the list of handles */ - threads_lock.lock().unwrap().insert(handle.thread().id(), String::from("idle-worker").into()); - } - thread_queue.add_work(work); - } - } - } - recv(set_name_rx) -> new_name => { - if let Ok((thread_id, mut new_name)) = new_name { - new_name.truncate_at_boundary(256); - let mut threads = threads_lock.lock().unwrap(); - let mut static_threads = _static_threads_lock.lock().unwrap(); - let now = datetime::now(); - if threads.contains_key(&thread_id) { - threads.entry(thread_id).and_modify(|e| { - e.name = new_name; - e.heartbeat = now; - }); - } else if static_threads.contains_key(&thread_id) { - static_threads.entry(thread_id).and_modify(|e| { - - e.name = new_name; - e.heartbeat = now; - }); - } else { - static_threads.insert(thread_id, Worker { heartbeat: now, .. new_name.into() }); - static_threads.entry(thread_id).and_modify(|e| { - e.heartbeat = now; - }); - } - pulse.send(ThreadEvent::Pulse).unwrap(); - } - } - recv(set_status_rx) -> new_status => { - if let Ok((thread_id, mut new_status)) = new_status { - new_status.truncate_at_boundary(256); - let mut threads = threads_lock.lock().unwrap(); - let mut static_threads = _static_threads_lock.lock().unwrap(); - let now = datetime::now(); - if threads.contains_key(&thread_id) { - threads.entry(thread_id).and_modify(|e| { - e.status = new_status; - e.heartbeat = now; - }); - } else if static_threads.contains_key(&thread_id) { - static_threads.entry(thread_id).and_modify(|e| { - e.status = new_status; - e.heartbeat = now; - }); - debug!(&static_threads[&thread_id]); - } else { - static_threads.insert(thread_id, Worker { status: new_status, heartbeat: now, .. String::new().into() }); - } - pulse.send(ThreadEvent::Pulse).unwrap(); - } - } - recv(finished_rx) -> dead_thread_id => { - if let Ok(thread_id) = dead_thread_id { - let mut threads = threads_lock.lock().unwrap(); - let mut static_threads = _static_threads_lock.lock().unwrap(); - if threads.contains_key(&thread_id) { - threads.remove(&thread_id); - } else if static_threads.contains_key(&thread_id) { - static_threads.remove(&thread_id); - } else { - /* Nothing to do */ - } - pulse.send(ThreadEvent::Pulse).unwrap(); - } - } - } - }); - - let mut static_threads = static_threads_lock.lock().unwrap(); - static_threads.insert( - handle.thread().id(), - "WorkController-thread".to_string().into(), - ); - } - - WorkController { - queue, - thread_end_tx, - threads: threads_lock, - static_threads: static_threads_lock, - work_context, - } - } - - pub fn add_static_thread(&mut self, id: std::thread::ThreadId) { - self.static_threads - .lock() - .unwrap() - .insert(id, String::new().into()); - } - - pub fn get_context(&self) -> WorkContext { - self.work_context.clone() - } -} - -fn spawn_worker( - thread_queue: WorkQueue, - active_threads: Arc, - thread_end_rx: crossbeam::Receiver, - new_jobs_rx: crossbeam::Receiver, - work_context: WorkContext, - pulse: crossbeam::Sender, -) -> std::thread::JoinHandle<()> { - thread::spawn(move || 'work_loop: loop { - debug!("Waiting for work"); - select! { - recv(thread_end_rx) -> _ => { - debug!("received thread_end_rx, quitting"); - active_threads.fetch_sub(1, Ordering::SeqCst); - break 'work_loop; - }, - recv(new_jobs_rx) -> _ => { - active_threads.fetch_sub(1, Ordering::SeqCst); - while let Some(work) = thread_queue.get_work() { - debug!("Got some work"); - work.compute(work_context.clone()); - debug!("finished work"); - work_context.set_name.send((std::thread::current().id(), "idle-worker".to_string())).unwrap(); - work_context.set_status.send((std::thread::current().id(), "inactive".to_string())).unwrap(); - pulse.send(ThreadEvent::Pulse).unwrap(); - - std::thread::yield_now(); - } - active_threads.fetch_add(1, Ordering::SeqCst); - }, - } - }) -}