From a19080538473bfd9225f9b22e0ae969cb75bb9c6 Mon Sep 17 00:00:00 2001 From: Manos Pitsidianakis Date: Thu, 20 Aug 2020 01:55:24 +0300 Subject: [PATCH] melib/backends: Add BackendEvent enum --- melib/src/backends.rs | 86 +++++++++-------------- melib/src/backends/imap.rs | 46 +++++-------- melib/src/backends/imap/connection.rs | 12 ++-- melib/src/backends/imap/managesieve.rs | 9 ++- melib/src/backends/jmap.rs | 13 ++-- melib/src/backends/jmap/connection.rs | 19 ++---- melib/src/backends/maildir/backend.rs | 75 ++++++++++---------- melib/src/backends/mbox.rs | 88 ++++++++++++++---------- melib/src/backends/nntp.rs | 43 +++++------- melib/src/backends/nntp/connection.rs | 12 ++-- melib/src/backends/notmuch.rs | 56 +++++++++------ melib/src/lib.rs | 4 +- src/conf/accounts.rs | 94 ++++---------------------- src/plugins/backend.rs | 17 ++--- src/state.rs | 47 ++++++++++--- src/types.rs | 4 +- testing/src/imap_conn.rs | 8 ++- 17 files changed, 286 insertions(+), 347 deletions(-) diff --git a/melib/src/backends.rs b/melib/src/backends.rs index 322599c7..e6954ec0 100644 --- a/melib/src/backends.rs +++ b/melib/src/backends.rs @@ -86,6 +86,7 @@ pub type BackendCreator = Box< dyn Fn( &AccountSettings, Box bool + Send + Sync>, + BackendEventConsumer, ) -> Result>, >; @@ -122,7 +123,7 @@ impl Backends { b.register( "maildir".to_string(), Backend { - create_fn: Box::new(|| Box::new(|f, i| MaildirType::new(f, i))), + create_fn: Box::new(|| Box::new(|f, i, ev| MaildirType::new(f, i, ev))), validate_conf_fn: Box::new(MaildirType::validate_config), }, ); @@ -132,7 +133,7 @@ impl Backends { b.register( "mbox".to_string(), Backend { - create_fn: Box::new(|| Box::new(|f, i| MboxType::new(f, i))), + create_fn: Box::new(|| Box::new(|f, i, ev| MboxType::new(f, i, ev))), validate_conf_fn: Box::new(MboxType::validate_config), }, ); @@ -142,14 +143,14 @@ impl Backends { b.register( "imap".to_string(), Backend { - create_fn: Box::new(|| Box::new(|f, i| imap::ImapType::new(f, i))), + create_fn: Box::new(|| Box::new(|f, i, ev| imap::ImapType::new(f, i, ev))), validate_conf_fn: Box::new(imap::ImapType::validate_config), }, ); b.register( "nntp".to_string(), Backend { - create_fn: Box::new(|| Box::new(|f, i| nntp::NntpType::new(f, i))), + create_fn: Box::new(|| Box::new(|f, i, ev| nntp::NntpType::new(f, i, ev))), validate_conf_fn: Box::new(nntp::NntpType::validate_config), }, ); @@ -160,7 +161,7 @@ impl Backends { b.register( "notmuch".to_string(), Backend { - create_fn: Box::new(|| Box::new(|f, i| NotmuchDb::new(f, i))), + create_fn: Box::new(|| Box::new(|f, i, ev| NotmuchDb::new(f, i, ev))), validate_conf_fn: Box::new(NotmuchDb::validate_config), }, ); @@ -171,7 +172,7 @@ impl Backends { b.register( "jmap".to_string(), Backend { - create_fn: Box::new(|| Box::new(|f, i| jmap::JmapType::new(f, i))), + create_fn: Box::new(|| Box::new(|f, i, ev| jmap::JmapType::new(f, i, ev))), validate_conf_fn: Box::new(jmap::JmapType::validate_config), }, ); @@ -215,6 +216,17 @@ impl Backends { } } +#[derive(Debug, Clone)] +pub enum BackendEvent { + Notice { + description: Option, + content: String, + level: crate::LoggingLevel, + }, + Refresh(RefreshEvent), + //Job(Box> + Send + 'static>) +} + #[derive(Debug, Clone)] pub enum RefreshEventKind { Update(EnvelopeHash, Box), @@ -249,45 +261,25 @@ impl RefreshEvent { } } -/// A `RefreshEventConsumer` is a boxed closure that must be used to consume a `RefreshEvent` and -/// send it to a UI provided channel. We need this level of abstraction to provide an interface for -/// all users of mailbox refresh events. -pub struct RefreshEventConsumer(Box () + Send + Sync>); -impl RefreshEventConsumer { - pub fn new(b: Box () + Send + Sync>) -> Self { - RefreshEventConsumer(b) - } - pub fn send(&self, r: RefreshEvent) { - self.0(r); +#[derive(Clone)] +pub struct BackendEventConsumer(Arc () + Send + Sync>); +impl BackendEventConsumer { + pub fn new(b: Arc () + Send + Sync>) -> Self { + BackendEventConsumer(b) } } -impl fmt::Debug for RefreshEventConsumer { +impl fmt::Debug for BackendEventConsumer { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "RefreshEventConsumer") + write!(f, "BackendEventConsumer") } } -pub struct NotifyFn(Box () + Send + Sync>); +impl Deref for BackendEventConsumer { + type Target = dyn Fn(AccountHash, BackendEvent) -> () + Send + Sync; -impl fmt::Debug for NotifyFn { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "NotifyFn Box") - } -} - -impl From () + Send + Sync>> for NotifyFn { - fn from(kind: Box () + Send + Sync>) -> Self { - NotifyFn(kind) - } -} - -impl NotifyFn { - pub fn new(b: Box () + Send + Sync>) -> Self { - NotifyFn(b) - } - pub fn notify(&self, f: MailboxHash) { - self.0(f); + fn deref(&self) -> &Self::Target { + &(*self.0) } } @@ -325,26 +317,14 @@ pub trait MailBackend: ::std::fmt::Debug + Send + Sync { ) -> Result>> + Send + 'static>>> { Err(MeliError::new("Unimplemented.")) } - fn refresh( - &mut self, - _mailbox_hash: MailboxHash, - _sender: RefreshEventConsumer, - ) -> Result> { + fn refresh(&mut self, _mailbox_hash: MailboxHash) -> Result> { Err(MeliError::new("Unimplemented.")) } - fn refresh_async( - &mut self, - _mailbox_hash: MailboxHash, - _sender: RefreshEventConsumer, - ) -> ResultFuture<()> { + fn refresh_async(&mut self, _mailbox_hash: MailboxHash) -> ResultFuture<()> { Err(MeliError::new("Unimplemented.")) } - fn watch( - &self, - sender: RefreshEventConsumer, - work_context: WorkContext, - ) -> Result; - fn watch_async(&self, _sender: RefreshEventConsumer) -> ResultFuture<()> { + fn watch(&self, work_context: WorkContext) -> Result; + fn watch_async(&self) -> ResultFuture<()> { Err(MeliError::new("Unimplemented.")) } fn mailboxes(&self) -> Result>; diff --git a/melib/src/backends/imap.rs b/melib/src/backends/imap.rs index 6fce5a7b..a383b6a0 100644 --- a/melib/src/backends/imap.rs +++ b/melib/src/backends/imap.rs @@ -49,7 +49,6 @@ use futures::lock::Mutex as FutureMutex; use futures::stream::Stream; use std::collections::{hash_map::DefaultHasher, BTreeMap}; use std::collections::{BTreeSet, HashMap, HashSet}; -use std::future::Future; use std::hash::Hasher; use std::pin::Pin; use std::str::FromStr; @@ -153,16 +152,19 @@ pub struct UIDStore { mailboxes: Arc>>, is_online: Arc)>>, - refresh_events: Arc>>, - sender: Arc>>, + event_consumer: BackendEventConsumer, } -impl Default for UIDStore { - fn default() -> Self { +impl UIDStore { + fn new( + account_hash: AccountHash, + account_name: Arc, + event_consumer: BackendEventConsumer, + ) -> Self { UIDStore { - account_hash: 0, + account_hash, cache_headers: false, - account_name: Arc::new(String::new()), + account_name, capabilities: Default::default(), uidvalidity: Default::default(), hash_index: Default::default(), @@ -175,8 +177,7 @@ impl Default for UIDStore { Instant::now(), Err(MeliError::new("Account is uninitialised.")), ))), - refresh_events: Default::default(), - sender: Arc::new(RwLock::new(None)), + event_consumer, } } } @@ -288,13 +289,8 @@ impl MailBackend for ImapType { })) } - fn refresh_async( - &mut self, - mailbox_hash: MailboxHash, - sender: RefreshEventConsumer, - ) -> ResultFuture<()> { + fn refresh_async(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()> { let main_conn = self.connection.clone(); - *self.uid_store.sender.write().unwrap() = Some(sender); let uid_store = self.uid_store.clone(); Ok(Box::pin(async move { let inbox = timeout(Duration::from_secs(3), uid_store.mailboxes.lock()) @@ -383,27 +379,18 @@ impl MailBackend for ImapType { Err(MeliError::new("Unimplemented.")) } - fn refresh( - &mut self, - _mailbox_hash: MailboxHash, - _sender: RefreshEventConsumer, - ) -> Result> { + fn refresh(&mut self, _mailbox_hash: MailboxHash) -> Result> { Err(MeliError::new("Unimplemented.")) } - fn watch( - &self, - _sender: RefreshEventConsumer, - _work_context: WorkContext, - ) -> Result { + fn watch(&self, _work_context: WorkContext) -> Result { Err(MeliError::new("Unimplemented.")) } - fn watch_async(&self, sender: RefreshEventConsumer) -> ResultFuture<()> { + fn watch_async(&self) -> ResultFuture<()> { debug!("watch_async called"); let conn = ImapConnection::new_connection(&self.server_conf, self.uid_store.clone()); let main_conn = self.connection.clone(); - *self.uid_store.sender.write().unwrap() = Some(sender); let uid_store = self.uid_store.clone(); let has_idle: bool = match self.server_conf.protocol { ImapProtocol::IMAP { @@ -1132,6 +1119,7 @@ impl ImapType { pub fn new( s: &AccountSettings, is_subscribed: Box bool + Send + Sync>, + event_consumer: BackendEventConsumer, ) -> Result> { let server_hostname = get_conf_val!(s["server_hostname"])?; let server_username = get_conf_val!(s["server_username"])?; @@ -1184,10 +1172,8 @@ impl ImapType { }; let account_name = Arc::new(s.name().to_string()); let uid_store: Arc = Arc::new(UIDStore { - account_hash, cache_headers: get_conf_val!(s["X_header_caching"], false)?, - account_name, - ..UIDStore::default() + ..UIDStore::new(account_hash, account_name, event_consumer) }); let connection = ImapConnection::new_connection(&server_conf, uid_store.clone()); diff --git a/melib/src/backends/imap/connection.rs b/melib/src/backends/imap/connection.rs index c4a35450..db034f4e 100644 --- a/melib/src/backends/imap/connection.rs +++ b/melib/src/backends/imap/connection.rs @@ -758,14 +758,10 @@ impl ImapConnection { } pub fn add_refresh_event(&mut self, ev: crate::backends::RefreshEvent) { - if let Some(ref sender) = self.uid_store.sender.read().unwrap().as_ref() { - sender.send(ev); - for ev in self.uid_store.refresh_events.lock().unwrap().drain(..) { - sender.send(ev); - } - } else { - self.uid_store.refresh_events.lock().unwrap().push(ev); - } + (self.uid_store.event_consumer)( + self.uid_store.account_hash, + crate::backends::BackendEvent::Refresh(ev), + ); } pub async fn create_uid_msn_cache( diff --git a/melib/src/backends/imap/managesieve.rs b/melib/src/backends/imap/managesieve.rs index 18511bc5..1417d69b 100644 --- a/melib/src/backends/imap/managesieve.rs +++ b/melib/src/backends/imap/managesieve.rs @@ -85,7 +85,12 @@ pub trait ManageSieve { fn renamescript(&mut self) -> Result<()>; } -pub fn new_managesieve_connection(s: &AccountSettings) -> Result { +pub fn new_managesieve_connection( + account_hash: crate::backends::AccountHash, + account_name: String, + s: &AccountSettings, + event_consumer: crate::backends::BackendEventConsumer, +) -> Result { let server_hostname = get_conf_val!(s["server_hostname"])?; let server_username = get_conf_val!(s["server_username"])?; let server_password = get_conf_val!(s["server_password"])?; @@ -106,7 +111,7 @@ pub fn new_managesieve_connection(s: &AccountSettings) -> Result Instant::now(), Err(MeliError::new("Account is uninitialised.")), ))), - ..Default::default() + ..UIDStore::new(account_hash, Arc::new(account_name), event_consumer) }); Ok(ImapConnection::new_connection(&server_conf, uid_store)) } diff --git a/melib/src/backends/jmap.rs b/melib/src/backends/jmap.rs index 92cbf8cf..8c07f442 100644 --- a/melib/src/backends/jmap.rs +++ b/melib/src/backends/jmap.rs @@ -243,10 +243,8 @@ impl MailBackend for JmapType { })) } - fn watch_async(&self, sender: RefreshEventConsumer) -> ResultFuture<()> { - let conn = self.connection.clone(); + fn watch_async(&self) -> ResultFuture<()> { Ok(Box::pin(async move { - *conn.lock().await.sender.lock().unwrap() = Some(sender); Err(MeliError::from("JMAP watch for updates is unimplemented")) })) } @@ -360,11 +358,7 @@ impl MailBackend for JmapType { Err(MeliError::new("Unimplemented.")) } - fn watch( - &self, - _sender: RefreshEventConsumer, - _work_context: WorkContext, - ) -> Result { + fn watch(&self, _work_context: WorkContext) -> Result { Err(MeliError::new("Unimplemented.")) } @@ -535,6 +529,7 @@ impl JmapType { pub fn new( s: &AccountSettings, is_subscribed: Box bool + Send + Sync>, + event_consumer: BackendEventConsumer, ) -> Result> { let online = Arc::new(FutureMutex::new(( std::time::Instant::now(), @@ -553,6 +548,8 @@ impl JmapType { Ok(Box::new(JmapType { connection: Arc::new(FutureMutex::new(JmapConnection::new( &server_conf, + account_hash, + event_consumer, online.clone(), )?)), store: Arc::new(RwLock::new(Store::default())), diff --git a/melib/src/backends/jmap/connection.rs b/melib/src/backends/jmap/connection.rs index 4c17e93b..415795eb 100644 --- a/melib/src/backends/jmap/connection.rs +++ b/melib/src/backends/jmap/connection.rs @@ -30,14 +30,16 @@ pub struct JmapConnection { pub online_status: Arc)>>, pub server_conf: JmapServerConf, pub account_id: Arc>, + pub account_hash: AccountHash, pub method_call_states: Arc>>, - pub refresh_events: Arc>>, - pub sender: Arc>>, + pub event_consumer: BackendEventConsumer, } impl JmapConnection { pub fn new( server_conf: &JmapServerConf, + account_hash: AccountHash, + event_consumer: BackendEventConsumer, online_status: Arc)>>, ) -> Result { let client = HttpClient::builder() @@ -56,9 +58,9 @@ impl JmapConnection { online_status, server_conf, account_id: Arc::new(Mutex::new(String::new())), + account_hash, + event_consumer, method_call_states: Arc::new(Mutex::new(Default::default())), - refresh_events: Arc::new(Mutex::new(Default::default())), - sender: Arc::new(Mutex::new(Default::default())), }) } @@ -116,13 +118,6 @@ impl JmapConnection { } pub fn add_refresh_event(&self, event: RefreshEvent) { - if let Some(ref sender) = self.sender.lock().unwrap().as_ref() { - for event in self.refresh_events.lock().unwrap().drain(..) { - sender.send(event); - } - sender.send(event); - } else { - self.refresh_events.lock().unwrap().push(event); - } + (self.event_consumer)(self.account_hash, BackendEvent::Refresh(event)); } } diff --git a/melib/src/backends/maildir/backend.rs b/melib/src/backends/maildir/backend.rs index 09962aed..f159ce2f 100644 --- a/melib/src/backends/maildir/backend.rs +++ b/melib/src/backends/maildir/backend.rs @@ -111,6 +111,7 @@ pub struct MaildirType { mailboxes: HashMap, mailbox_index: Arc>>, hash_indexes: HashIndexes, + event_consumer: BackendEventConsumer, path: PathBuf, } @@ -229,7 +230,6 @@ impl MailBackend for MaildirType { fn refresh( &mut self, mailbox_hash: MailboxHash, - sender: RefreshEventConsumer, ) -> Result> { let w = AsyncBuilder::new(); let cache_dir = xdg::BaseDirectories::with_profile("meli", &self.name).unwrap(); @@ -238,6 +238,7 @@ impl MailBackend for MaildirType { hasher.write(self.name.as_bytes()); hasher.finish() }; + let sender = self.event_consumer.clone(); let handle = { let mailbox: &MaildirMailbox = &self.mailboxes[&mailbox_hash]; @@ -252,7 +253,7 @@ impl MailBackend for MaildirType { .set_name .send((std::thread::current().id(), name.clone())) .unwrap(); - let thunk = move |sender: &RefreshEventConsumer| { + let thunk = move |sender: &BackendEventConsumer| { debug!("refreshing"); let mut path = path.clone(); path.push("new"); @@ -312,11 +313,11 @@ impl MailBackend for MaildirType { let writer = io::BufWriter::new(f); bincode::serialize_into(writer, &e).unwrap(); } - sender.send(RefreshEvent { + (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { account_hash, mailbox_hash, kind: Create(Box::new(e)), - }); + })); } else { debug!( "DEBUG: hash {}, path: {} couldn't be parsed", @@ -326,21 +327,21 @@ impl MailBackend for MaildirType { continue; } } - for ev in current_hashes.into_iter().map(|h| RefreshEvent { + for ev in current_hashes.into_iter().map(|h| BackendEvent::Refresh(RefreshEvent { account_hash, mailbox_hash, kind: Remove(h), - }) { - sender.send(ev); + })) { + (sender)(account_hash, ev); } Ok(()) }; if let Err(err) = thunk(&sender) { - sender.send(RefreshEvent { + (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { account_hash, mailbox_hash, kind: Failure(err), - }); + })); } }) }; @@ -348,9 +349,9 @@ impl MailBackend for MaildirType { } fn watch( &self, - sender: RefreshEventConsumer, work_context: WorkContext, ) -> Result { + let sender = self.event_consumer.clone(); let (tx, rx) = channel(); let mut watcher = watcher(tx, Duration::from_secs(2)).unwrap(); let account_hash = { @@ -425,15 +426,15 @@ impl MailBackend for MaildirType { env.subject(), pathbuf.display() ); - if !env.is_seen() { - *mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1; - } - *mailbox_counts[&mailbox_hash].1.lock().unwrap() += 1; - sender.send(RefreshEvent { + if !env.is_seen() { + *mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1; + } + *mailbox_counts[&mailbox_hash].1.lock().unwrap() += 1; + (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { account_hash, mailbox_hash, kind: Create(Box::new(env)), - }); + })); } } /* Update */ @@ -469,11 +470,11 @@ impl MailBackend for MaildirType { file_name, ) { mailbox_index.lock().unwrap().insert(env.hash(),mailbox_hash); - sender.send(RefreshEvent { + (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { account_hash, mailbox_hash, kind: Create(Box::new(env)), - }); + })); } return; } @@ -496,11 +497,11 @@ impl MailBackend for MaildirType { /* Send Write notice */ - sender.send(RefreshEvent { + (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { account_hash, mailbox_hash, kind: Update(old_hash, Box::new(env)), - }); + })); } } } @@ -546,11 +547,11 @@ impl MailBackend for MaildirType { e.removed = true; }); - sender.send(RefreshEvent { + (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { account_hash, mailbox_hash, kind: Remove(hash), - }); + })); } /* Envelope hasn't changed */ DebouncedEvent::Rename(src, dest) => { @@ -577,11 +578,11 @@ impl MailBackend for MaildirType { debug!(&e.modified); e.modified = Some(PathMod::Hash(new_hash)); }); - sender.send(RefreshEvent { + (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { account_hash, mailbox_hash: get_path_hash!(dest), kind: Rename(old_hash, new_hash), - }); + })); if !was_seen && is_seen { let mut lck = mailbox_counts[&mailbox_hash].0.lock().unwrap(); *lck = lck.saturating_sub(1); @@ -589,11 +590,11 @@ impl MailBackend for MaildirType { *mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1; } if old_flags != new_flags { - sender.send(RefreshEvent { + (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { account_hash, mailbox_hash: get_path_hash!(dest), kind: NewFlags(new_hash, (new_flags, vec![])), - }); + })); } mailbox_index.lock().unwrap().insert(new_hash,get_path_hash!(dest) ); index_lock.insert(new_hash, dest.into()); @@ -642,11 +643,11 @@ impl MailBackend for MaildirType { *mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1; } *mailbox_counts[&mailbox_hash].1.lock().unwrap() += 1; - sender.send(RefreshEvent { + (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { account_hash, mailbox_hash, kind: Create(Box::new(env)), - }); + })); continue; } else { debug!("not valid email"); @@ -655,36 +656,36 @@ impl MailBackend for MaildirType { if was_seen && !is_seen { *mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1; } - sender.send(RefreshEvent { + (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { account_hash, mailbox_hash: get_path_hash!(dest), kind: Rename(old_hash, new_hash), - }); + })); debug!("contains_new_key"); if old_flags != new_flags { - sender.send(RefreshEvent { + (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { account_hash, mailbox_hash: get_path_hash!(dest), kind: NewFlags(new_hash, (new_flags, vec![])), - }); + })); } } /* Maybe a re-read should be triggered here just to be safe. - sender.send(RefreshEvent { + (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { account_hash, mailbox_hash: get_path_hash!(dest), kind: Rescan, - }); + })); */ } /* Trigger rescan of mailbox */ DebouncedEvent::Rescan => { - sender.send(RefreshEvent { + (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { account_hash, mailbox_hash: root_mailbox_hash, kind: Rescan, - }); + })); } _ => {} }, @@ -871,6 +872,7 @@ impl MaildirType { pub fn new( settings: &AccountSettings, is_subscribed: Box bool>, + event_consumer: BackendEventConsumer, ) -> Result> { let mut mailboxes: HashMap = Default::default(); fn recurse_mailboxes>( @@ -1012,6 +1014,7 @@ impl MaildirType { mailboxes, hash_indexes: Arc::new(Mutex::new(hash_indexes)), mailbox_index: Default::default(), + event_consumer, path: root_path, })) } diff --git a/melib/src/backends/mbox.rs b/melib/src/backends/mbox.rs index 16f0bcd9..c0224807 100644 --- a/melib/src/backends/mbox.rs +++ b/melib/src/backends/mbox.rs @@ -684,13 +684,14 @@ impl<'a> Iterator for MessageIterator<'a> { } /// Mbox backend -#[derive(Debug, Default)] +#[derive(Debug)] pub struct MboxType { account_name: String, path: PathBuf, mailbox_index: Arc>>, mailboxes: Arc>>, prefer_mbox_type: Option, + event_consumer: BackendEventConsumer, } impl MailBackend for MboxType { @@ -795,11 +796,8 @@ impl MailBackend for MboxType { Ok(w.build(handle)) } - fn watch( - &self, - sender: RefreshEventConsumer, - work_context: WorkContext, - ) -> Result { + fn watch(&self, work_context: WorkContext) -> Result { + let sender = self.event_consumer.clone(); let (tx, rx) = channel(); let mut watcher = watcher(tx, std::time::Duration::from_secs(10)) .map_err(|e| e.to_string()) @@ -873,19 +871,25 @@ impl MailBackend for MboxType { let mut mailbox_index_lck = mailbox_index.lock().unwrap(); for env in envelopes { mailbox_index_lck.insert(env.hash(), mailbox_hash); - sender.send(RefreshEvent { + (sender)( account_hash, - mailbox_hash, - kind: RefreshEventKind::Create(Box::new(env)), - }); + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: RefreshEventKind::Create(Box::new(env)), + }), + ); } } } else { - sender.send(RefreshEvent { + (sender)( account_hash, - mailbox_hash, - kind: RefreshEventKind::Rescan, - }); + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: RefreshEventKind::Rescan, + }), + ); } mailbox_lock .entry(mailbox_hash) @@ -901,14 +905,19 @@ impl MailBackend for MboxType { .any(|f| f.fs_path == pathbuf) { let mailbox_hash = get_path_hash!(&pathbuf); - sender.send(RefreshEvent { + (sender)( account_hash, - mailbox_hash, - kind: RefreshEventKind::Failure(MeliError::new(format!( - "mbox mailbox {} was removed.", - pathbuf.display() - ))), - }); + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: RefreshEventKind::Failure(MeliError::new( + format!( + "mbox mailbox {} was removed.", + pathbuf.display() + ), + )), + }), + ); return; } } @@ -920,26 +929,34 @@ impl MailBackend for MboxType { .any(|f| &f.fs_path == &src) { let mailbox_hash = get_path_hash!(&src); - sender.send(RefreshEvent { + (sender)( account_hash, - mailbox_hash, - kind: RefreshEventKind::Failure(MeliError::new(format!( - "mbox mailbox {} was renamed to {}.", - src.display(), - dest.display() - ))), - }); + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: RefreshEventKind::Failure(MeliError::new( + format!( + "mbox mailbox {} was renamed to {}.", + src.display(), + dest.display() + ), + )), + }), + ); return; } } /* Trigger rescan of mailboxes */ DebouncedEvent::Rescan => { for &mailbox_hash in mailboxes.lock().unwrap().keys() { - sender.send(RefreshEvent { + (sender)( account_hash, - mailbox_hash, - kind: RefreshEventKind::Rescan, - }); + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: RefreshEventKind::Rescan, + }), + ); } return; } @@ -1023,6 +1040,7 @@ impl MboxType { pub fn new( s: &AccountSettings, _is_subscribed: Box bool>, + event_consumer: BackendEventConsumer, ) -> Result> { let path = Path::new(s.root_mailbox.as_str()).expand(); if !path.exists() { @@ -1035,6 +1053,7 @@ impl MboxType { let prefer_mbox_type: String = get_conf_val!(s["prefer_mbox_type"], "auto".to_string())?; let ret = MboxType { account_name: s.name().to_string(), + event_consumer, path, prefer_mbox_type: match prefer_mbox_type.as_str() { "auto" => None, @@ -1050,7 +1069,8 @@ impl MboxType { ))) } }, - ..Default::default() + mailbox_index: Default::default(), + mailboxes: Default::default(), }; let name: String = ret .path diff --git a/melib/src/backends/nntp.rs b/melib/src/backends/nntp.rs index 2aa07343..6789e158 100644 --- a/melib/src/backends/nntp.rs +++ b/melib/src/backends/nntp.rs @@ -94,15 +94,19 @@ pub struct UIDStore { mailboxes: Arc>>, is_online: Arc)>>, - refresh_events: Arc>>, - sender: Arc>>, + event_consumer: BackendEventConsumer, } -impl Default for UIDStore { - fn default() -> Self { +impl UIDStore { + fn new( + account_hash: AccountHash, + account_name: Arc, + event_consumer: BackendEventConsumer, + ) -> Self { UIDStore { - account_hash: 0, - account_name: Arc::new(String::new()), + account_hash, + account_name, + event_consumer, offline_cache: false, capabilities: Default::default(), hash_index: Default::default(), @@ -112,8 +116,6 @@ impl Default for UIDStore { Instant::now(), Err(MeliError::new("Account is uninitialised.")), ))), - refresh_events: Default::default(), - sender: Arc::new(RwLock::new(None)), } } } @@ -209,11 +211,7 @@ impl MailBackend for NntpType { })) } - fn refresh_async( - &mut self, - _mailbox_hash: MailboxHash, - _sender: RefreshEventConsumer, - ) -> ResultFuture<()> { + fn refresh_async(&mut self, _mailbox_hash: MailboxHash) -> ResultFuture<()> { Err(MeliError::new("Unimplemented.")) } @@ -254,23 +252,15 @@ impl MailBackend for NntpType { Err(MeliError::new("Unimplemented.")) } - fn refresh( - &mut self, - _mailbox_hash: MailboxHash, - _sender: RefreshEventConsumer, - ) -> Result> { + fn refresh(&mut self, _mailbox_hash: MailboxHash) -> Result> { Err(MeliError::new("Unimplemented.")) } - fn watch( - &self, - _sender: RefreshEventConsumer, - _work_context: WorkContext, - ) -> Result { + fn watch(&self, _work_context: WorkContext) -> Result { Err(MeliError::new("Unimplemented.")) } - fn watch_async(&self, _sender: RefreshEventConsumer) -> ResultFuture<()> { + fn watch_async(&self) -> ResultFuture<()> { Err(MeliError::new("Unimplemented.")) } @@ -388,6 +378,7 @@ impl NntpType { pub fn new( s: &AccountSettings, is_subscribed: Box bool + Send + Sync>, + event_consumer: BackendEventConsumer, ) -> Result> { let server_hostname = get_conf_val!(s["server_hostname"])?; /*let server_username = get_conf_val!(s["server_username"], "")?; @@ -469,11 +460,9 @@ impl NntpType { ))); } let uid_store: Arc = Arc::new(UIDStore { - account_hash, - account_name, offline_cache: false, //get_conf_val!(s["X_header_caching"], false)?, mailboxes: Arc::new(FutureMutex::new(mailboxes)), - ..UIDStore::default() + ..UIDStore::new(account_hash, account_name, event_consumer) }); let connection = NntpConnection::new_connection(&server_conf, uid_store.clone()); diff --git a/melib/src/backends/nntp/connection.rs b/melib/src/backends/nntp/connection.rs index 5eaa8396..8d19b54b 100644 --- a/melib/src/backends/nntp/connection.rs +++ b/melib/src/backends/nntp/connection.rs @@ -476,14 +476,10 @@ impl NntpConnection { } pub fn add_refresh_event(&mut self, ev: crate::backends::RefreshEvent) { - if let Some(ref sender) = self.uid_store.sender.read().unwrap().as_ref() { - sender.send(ev); - for ev in self.uid_store.refresh_events.lock().unwrap().drain(..) { - sender.send(ev); - } - } else { - self.uid_store.refresh_events.lock().unwrap().push(ev); - } + (self.uid_store.event_consumer)( + self.uid_store.account_hash, + crate::backends::BackendEvent::Refresh(ev), + ); } pub async fn select_group( diff --git a/melib/src/backends/notmuch.rs b/melib/src/backends/notmuch.rs index 54e57196..c5b74c8a 100644 --- a/melib/src/backends/notmuch.rs +++ b/melib/src/backends/notmuch.rs @@ -107,6 +107,7 @@ pub struct NotmuchDb { tag_index: Arc>>, path: PathBuf, account_name: String, + event_consumer: BackendEventConsumer, save_messages_to: Option, } @@ -187,6 +188,7 @@ impl NotmuchDb { pub fn new( s: &AccountSettings, _is_subscribed: Box bool>, + event_consumer: BackendEventConsumer, ) -> Result> { let lib = Arc::new(libloading::Library::new("libnotmuch.so.5")?); let path = Path::new(s.root_mailbox.as_str()).expand(); @@ -239,6 +241,7 @@ impl NotmuchDb { mailboxes: Arc::new(RwLock::new(mailboxes)), save_messages_to: None, account_name: s.name().to_string(), + event_consumer, })) } @@ -424,14 +427,11 @@ impl MailBackend for NotmuchDb { Ok(w.build(handle)) } - fn watch( - &self, - sender: RefreshEventConsumer, - _work_context: WorkContext, - ) -> Result { + fn watch(&self, _work_context: WorkContext) -> Result { extern crate notify; use crate::backends::RefreshEventKind::*; use notify::{watcher, RecursiveMode, Watcher}; + let sender = self.event_consumer.clone(); let (tx, rx) = std::sync::mpsc::channel(); let mut watcher = watcher(tx, std::time::Duration::from_secs(2)).unwrap(); watcher.watch(&self.path, RecursiveMode::Recursive).unwrap(); @@ -463,7 +463,7 @@ impl MailBackend for NotmuchDb { .name(format!("watching {}", self.account_name)) .spawn(move || { let _watcher = watcher; - let c = move |sender: &RefreshEventConsumer| -> std::result::Result<(), MeliError> { + let c = move |sender: &BackendEventConsumer| -> std::result::Result<(), MeliError> { loop { let _ = rx.recv().map_err(|err| err.to_string())?; { @@ -512,11 +512,14 @@ impl MailBackend for NotmuchDb { } } for &mailbox_hash in mailbox_hashes { - sender.send(RefreshEvent { + (sender)( account_hash, - mailbox_hash, - kind: NewFlags(env_hash, tags.clone()), - }); + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: NewFlags(env_hash, tags.clone()), + }), + ); } } else { match notmuch_message_into_envelope( @@ -548,11 +551,14 @@ impl MailBackend for NotmuchDb { if !env.is_seen() { *unseen_lck += 1; } - sender.send(RefreshEvent { + (sender)( account_hash, - mailbox_hash, - kind: Create(Box::new(env.clone())), - }); + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: Create(Box::new(env.clone())), + }), + ); } } } @@ -587,11 +593,14 @@ impl MailBackend for NotmuchDb { let m = &mailboxes_lck[&mailbox_hash]; let mut total_lck = m.total.lock().unwrap(); *total_lck = total_lck.saturating_sub(1); - sender.send(RefreshEvent { + (sender)( account_hash, - mailbox_hash, - kind: Remove(env_hash), - }); + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: Remove(env_hash), + }), + ); } } } @@ -606,11 +615,14 @@ impl MailBackend for NotmuchDb { }; if let Err(err) = c(&sender) { - sender.send(RefreshEvent { + (sender)( account_hash, - mailbox_hash: 0, - kind: Failure(err), - }); + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash: 0, + kind: Failure(err), + }), + ); } })?; Ok(handle.thread().id()) diff --git a/melib/src/lib.rs b/melib/src/lib.rs index cf45e9c5..106f3e44 100644 --- a/melib/src/lib.rs +++ b/melib/src/lib.rs @@ -139,7 +139,9 @@ pub use smallvec; pub use futures; pub use smol; -pub use crate::backends::{Backends, RefreshEvent, RefreshEventConsumer, SpecialUsageMailbox}; +pub use crate::backends::{ + BackendEvent, BackendEventConsumer, Backends, RefreshEvent, SpecialUsageMailbox, +}; pub use crate::collection::*; pub use crate::conf::*; pub use crate::email::{Envelope, EnvelopeHash, Flag}; diff --git a/src/conf/accounts.rs b/src/conf/accounts.rs index b70458f2..333688f2 100644 --- a/src/conf/accounts.rs +++ b/src/conf/accounts.rs @@ -161,7 +161,6 @@ pub struct Account { pub active_job_instants: BTreeMap, sender: Sender, event_queue: VecDeque<(MailboxHash, RefreshEvent)>, - notify_fn: Arc, pub backend_capabilities: MailBackendCapabilities, } @@ -358,7 +357,7 @@ impl Account { work_context: WorkContext, job_executor: Arc, sender: Sender, - notify_fn: NotifyFn, + event_consumer: BackendEventConsumer, ) -> Result { let s = settings.clone(); let backend = map.get(settings.account().format())( @@ -372,8 +371,8 @@ impl Account { .iter() .any(|m| path.matches_glob(m)) }), + event_consumer, )?; - let notify_fn = Arc::new(notify_fn); let data_dir = xdg::BaseDirectories::with_profile("meli", &name).unwrap(); let mut address_book = AddressBook::with_account(&settings.account()); @@ -425,7 +424,6 @@ impl Account { collection: Default::default(), work_context, settings, - notify_fn, sender, job_executor, active_jobs, @@ -592,18 +590,14 @@ impl Account { .insert(std::time::Instant::now(), job_id); } } else { - entry.worker = match Account::new_worker( - &f, - &mut self.backend, - &self.work_context, - self.notify_fn.clone(), - ) { - Ok(v) => v, - Err(err) => { - entry.status = MailboxStatus::Failed(err); - None - } - }; + entry.worker = + match Account::new_worker(&f, &mut self.backend, &self.work_context) { + Ok(v) => v, + Err(err) => { + entry.status = MailboxStatus::Failed(err); + None + } + }; } } }); @@ -622,12 +616,9 @@ impl Account { mailbox: &Mailbox, backend: &Arc>>, work_context: &WorkContext, - notify_fn: Arc, ) -> Result { let mailbox_hash = mailbox.hash(); let mut mailbox_handle = backend.write().unwrap().fetch(mailbox_hash)?; - let mut builder = AsyncBuilder::new(); - let our_tx = builder.tx(); let priority = match mailbox.special_usage() { SpecialUsageMailbox::Inbox => 0, SpecialUsageMailbox::Sent => 1, @@ -644,51 +635,7 @@ impl Account { } }; - /* This polling closure needs to be 'static', that is to spawn its own thread instead of - * being assigned to a worker thread. Otherwise the polling closures could fill up the - * workers causing no actual parsing to be done. If we could yield from within the worker - * threads' closures this could be avoided, but it requires green threads. - */ - let name = format!("Parsing {}", mailbox.path()); - builder.set_priority(priority).set_is_static(true); - let mut w = builder.build(Box::new(move |work_context| { - let work = mailbox_handle.work().unwrap(); - work_context.new_work.send(work).unwrap(); - let thread_id = std::thread::current().id(); - work_context.set_name.send((thread_id, name)).unwrap(); - work_context - .set_status - .send((thread_id, "Waiting for subworkers..".to_string())) - .unwrap(); - - loop { - match debug!(mailbox_handle.poll_block()) { - Ok(s @ AsyncStatus::Payload(_)) => { - our_tx.send(s).unwrap(); - debug!("notifying for {}", mailbox_hash); - notify_fn.notify(mailbox_hash); - } - Ok(s @ AsyncStatus::Finished) => { - our_tx.send(s).unwrap(); - notify_fn.notify(mailbox_hash); - debug!("exiting"); - work_context.finished.send(thread_id).unwrap(); - return; - } - Ok(s) => { - our_tx.send(s).unwrap(); - } - Err(_) => { - debug!("poll error"); - return; - } - } - } - })); - if let Some(w) = w.work() { - work_context.new_work.send(w).unwrap(); - } - Ok(Some(w)) + todo!() } pub fn reload(&mut self, event: RefreshEvent, mailbox_hash: MailboxHash) -> Option { if !self.mailbox_entries[&mailbox_hash].status.is_available() { @@ -879,7 +826,6 @@ impl Account { &self.mailbox_entries[&mailbox_hash].ref_mailbox, &mut self.backend, &self.work_context, - self.notify_fn.clone(), ) { Ok(v) => v, Err(err) => { @@ -945,12 +891,8 @@ impl Account { .unwrap(); return Ok(()); } - let sender_ = self.sender.clone(); - let r = RefreshEventConsumer::new(Box::new(move |r| { - sender_.send(ThreadEvent::from(r)).unwrap(); - })); if self.backend_capabilities.is_async { - if let Ok(refresh_job) = self.backend.write().unwrap().refresh_async(mailbox_hash, r) { + if let Ok(refresh_job) = self.backend.write().unwrap().refresh_async(mailbox_hash) { let (rcvr, handle, job_id) = self.job_executor.spawn_specialized(refresh_job); self.sender .send(ThreadEvent::UIEvent(UIEvent::StatusEvent( @@ -963,7 +905,7 @@ impl Account { .insert(std::time::Instant::now(), job_id); } } else { - let mut h = self.backend.write().unwrap().refresh(mailbox_hash, r)?; + let mut h = self.backend.write().unwrap().refresh(mailbox_hash)?; self.work_context.new_work.send(h.work().unwrap()).unwrap(); } Ok(()) @@ -973,13 +915,9 @@ impl Account { return; } - let sender_ = self.sender.clone(); - let r = RefreshEventConsumer::new(Box::new(move |r| { - sender_.send(ThreadEvent::from(r)).unwrap(); - })); if self.backend_capabilities.is_async { if !self.active_jobs.values().any(|j| j.is_watch()) { - match self.backend.read().unwrap().watch_async(r) { + match self.backend.read().unwrap().watch_async() { Ok(fut) => { let (handle, job_id) = self.job_executor.spawn(fut); self.active_jobs.insert(job_id, JobRequest::Watch(handle)); @@ -998,7 +936,7 @@ impl Account { .backend .read() .unwrap() - .watch(r, self.work_context.clone()) + .watch(self.work_context.clone()) { Ok(id) => { self.sender @@ -1115,7 +1053,6 @@ impl Account { &self.mailbox_entries[&mailbox_hash].ref_mailbox, &mut self.backend, &self.work_context, - self.notify_fn.clone(), ) { Ok(v) => v, Err(err) => { @@ -1470,7 +1407,6 @@ impl Account { &mailboxes[&mailbox_hash], &mut self.backend, &self.work_context, - self.notify_fn.clone(), ) { Ok(v) => (MailboxStatus::Parsing(0, 0), v), Err(err) => (MailboxStatus::Failed(err), None), diff --git a/src/plugins/backend.rs b/src/plugins/backend.rs index 7794da7a..ed0413cf 100644 --- a/src/plugins/backend.rs +++ b/src/plugins/backend.rs @@ -191,18 +191,10 @@ impl MailBackend for PluginBackend { Ok(w.build(handle)) } - fn refresh( - &mut self, - _mailbox_hash: MailboxHash, - _sender: RefreshEventConsumer, - ) -> Result> { + fn refresh(&mut self, _mailbox_hash: MailboxHash) -> Result> { Err(MeliError::new("Unimplemented.")) } - fn watch( - &self, - _sender: RefreshEventConsumer, - _work_context: WorkContext, - ) -> Result { + fn watch(&self, _work_context: WorkContext) -> Result { Err(MeliError::new("Unimplemented.")) } @@ -249,6 +241,7 @@ impl PluginBackend { plugin: Plugin, _s: &AccountSettings, _is_subscribed: Box bool>, + _ev: melib::backends::BackendEventConsumer, ) -> Result> { if plugin.kind != PluginKind::Backend { return Err(MeliError::new(format!( @@ -284,10 +277,10 @@ impl PluginBackend { create_fn: Box::new(move || { let plugin = plugin.clone(); let listener = listener.try_clone().unwrap(); - Box::new(move |f, i| { + Box::new(move |f, i, ev| { let plugin = plugin.clone(); let listener = listener.try_clone().unwrap(); - PluginBackend::new(listener, plugin, f, i) + PluginBackend::new(listener, plugin, f, i, ev) }) }), validate_conf_fn: Box::new(|_| Ok(())), diff --git a/src/state.rs b/src/state.rs index 034ef97c..3ecec8d8 100644 --- a/src/state.rs +++ b/src/state.rs @@ -30,7 +30,7 @@ Input is received in the main loop from threads which listen on the stdin for us use super::*; use crate::plugins::PluginManager; -use melib::backends::{AccountHash, MailboxHash, NotifyFn}; +use melib::backends::{AccountHash, BackendEventConsumer}; use crate::jobs::JobExecutor; use crossbeam::channel::{unbounded, Receiver, Sender}; @@ -284,14 +284,16 @@ impl State { work_controller.get_context(), job_executor.clone(), sender.clone(), - NotifyFn::new(Box::new(move |f: MailboxHash| { - sender - .send(ThreadEvent::UIEvent(UIEvent::WorkerProgress( - account_hash, - f, - ))) - .unwrap(); - })), + BackendEventConsumer::new(Arc::new( + move |account_hash: AccountHash, ev: BackendEvent| { + sender + .send(ThreadEvent::UIEvent(UIEvent::BackendEvent( + account_hash, + ev, + ))) + .unwrap(); + }, + )), ) }) .collect::>>()? @@ -1028,8 +1030,31 @@ impl State { self.child = Some(child); return; } - UIEvent::WorkerProgress(account_hash, mailbox_hash) => { - let _ = self.context.accounts[&account_hash].load(mailbox_hash); + UIEvent::BackendEvent( + account_hash, + BackendEvent::Notice { + ref description, + ref content, + level, + }, + ) => { + log( + format!( + "{}: {}{}{}", + self.context.accounts[&account_hash].name(), + description.as_ref().map(|s| s.as_str()).unwrap_or(""), + if description.is_some() { ": " } else { "" }, + content.as_str() + ), + level, + ); + self.rcv_event(UIEvent::StatusEvent(StatusEvent::DisplayMessage( + content.to_string(), + ))); + return; + } + UIEvent::BackendEvent(_, BackendEvent::Refresh(refresh_event)) => { + self.refresh_event(refresh_event); return; } UIEvent::ChangeMode(m) => { diff --git a/src/types.rs b/src/types.rs index 20e69229..8b1c740a 100644 --- a/src/types.rs +++ b/src/types.rs @@ -40,7 +40,7 @@ use super::jobs::JobId; use super::terminal::*; use crate::components::{Component, ComponentId}; -use melib::backends::{AccountHash, MailboxHash}; +use melib::backends::{AccountHash, BackendEvent, MailboxHash}; use melib::{EnvelopeHash, RefreshEvent, ThreadHash}; use nix::unistd::Pid; use std::fmt; @@ -119,7 +119,7 @@ pub enum UIEvent { MailboxCreate((AccountHash, MailboxHash)), AccountStatusChange(AccountHash), ComponentKill(Uuid), - WorkerProgress(AccountHash, MailboxHash), + BackendEvent(AccountHash, BackendEvent), StartupCheck(MailboxHash), RefreshEvent(Box), EnvelopeUpdate(EnvelopeHash), diff --git a/testing/src/imap_conn.rs b/testing/src/imap_conn.rs index cd467623..00b66523 100644 --- a/testing/src/imap_conn.rs +++ b/testing/src/imap_conn.rs @@ -1,8 +1,8 @@ extern crate melib; use melib::backends::ImapType; -use melib::AccountSettings; use melib::Result; +use melib::{AccountSettings, BackendEventConsumer}; /// Opens an interactive shell on an IMAP server. Suggested use is with rlwrap(1) /// @@ -42,7 +42,11 @@ fn main() -> Result<()> { .collect(), ..Default::default() }; - let mut imap = ImapType::new(&set, Box::new(|_| true))?; + let mut imap = ImapType::new( + &set, + Box::new(|_| true), + BackendEventConsumer::new(std::sync::Arc::new(|_, _| ())), + )?; (imap.as_any_mut()) .downcast_mut::()