From 819d993f11f24a8db8800925725dd635861e42f0 Mon Sep 17 00:00:00 2001
From: Manos Pitsidianakis
Date: Thu, 7 Jan 2021 15:24:14 +0200
Subject: [PATCH] melib/backends: replace watch() with watcher(), BackendWatcher trait

```
Return a [`Box<dyn BackendWatcher>`](BackendWatcher) to which you can register
the mailboxes you are interested in for updates, and which you then consume to
spawn a watching `Future`. The `Future` sends events to the
[`BackendEventConsumer`](BackendEventConsumer) supplied to the backend in its
constructor method.
```

Watching mailboxes for updates is more flexible now: you can explicitly
register mailboxes and set the polling period.
---
 melib/src/backends.rs                 |  35 +-
 melib/src/backends/imap.rs            |  70 +--
 melib/src/backends/imap/connection.rs |   4 +-
 melib/src/backends/imap/watch.rs      | 859 ++++++++++++++------------
 melib/src/backends/jmap.rs            |  32 +-
 melib/src/backends/jmap/watch.rs      |  83 +++
 melib/src/backends/maildir.rs         |   2 +
 melib/src/backends/maildir/backend.rs | 549 +---------------
 melib/src/backends/maildir/watch.rs   | 608 ++++++++++++++++++
 melib/src/backends/mbox.rs            | 161 +----
 melib/src/backends/mbox/watch.rs      | 223 +++++++
 melib/src/backends/nntp.rs            |   2 +-
 melib/src/backends/notmuch.rs         |  59 +-
 melib/src/backends/notmuch/watch.rs   | 116 ++++
 src/conf/accounts.rs                  |  20 +-
 15 files changed, 1633 insertions(+), 1190 deletions(-)
 create mode 100644 melib/src/backends/jmap/watch.rs
 create mode 100644 melib/src/backends/maildir/watch.rs
 create mode 100644 melib/src/backends/mbox/watch.rs
 create mode 100644 melib/src/backends/notmuch/watch.rs

diff --git a/melib/src/backends.rs b/melib/src/backends.rs
index 9b86a24df..09eb1d566 100644
--- a/melib/src/backends.rs
+++ b/melib/src/backends.rs
@@ -316,7 +316,12 @@ pub trait MailBackend: ::std::fmt::Debug + Send + Sync {
     ) -> Result<Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>>;

     fn refresh(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()>;
-    fn watch(&self) -> ResultFuture<()>;
+
+    /// Return a [`Box<dyn BackendWatcher>`](BackendWatcher) to which you can register the
+    /// mailboxes you are interested in for updates, and which you then consume to spawn a
+    /// watching `Future`. The `Future` sends events to the
+    /// [`BackendEventConsumer`](BackendEventConsumer) supplied to the backend in its constructor method.
+    fn watcher(&self) -> Result<Box<dyn BackendWatcher>>;
     fn mailboxes(&self) -> ResultFuture<HashMap<MailboxHash, Mailbox>>;

     fn operation(&self, hash: EnvelopeHash) -> Result<Box<dyn BackendOp>>;
@@ -711,3 +716,31 @@ impl std::ops::Deref for IsSubscribedFn {
         &self.0
     }
 }
+
+/// Urgency for the events of a single Mailbox.
+#[derive(Debug, Copy, Clone, PartialEq)]
+pub enum MailboxWatchUrgency {
+    High,
+    Medium,
+    Low,
+}
+
+/// Register the mailboxes you are interested in for updates, then consume the watcher with
+/// `spawn` to get a watching `Future`. The `Future` sends events to the
+/// [`BackendEventConsumer`](backends::BackendEventConsumer) supplied to the backend in its
+/// constructor method.
+pub trait BackendWatcher: ::std::fmt::Debug + Send + Sync {
+    /// Whether the watcher's `Future` requires blocking I/O.
+    fn is_blocking(&self) -> bool;
+    fn register_mailbox(
+        &mut self,
+        mailbox_hash: MailboxHash,
+        urgency: MailboxWatchUrgency,
+    ) -> Result<()>;
+    fn set_polling_period(&mut self, period: Option<std::time::Duration>) -> Result<()>;
+    fn spawn(self: Box<Self>) -> ResultFuture<()>;
+    /// Use the [`Any`](std::any::Any) trait to get the underlying type implementing the
+    /// [`BackendWatcher`](backends::BackendWatcher) trait.
+ fn as_any(&self) -> &dyn Any; + fn as_any_mut(&mut self) -> &mut dyn Any; +} diff --git a/melib/src/backends/imap.rs b/melib/src/backends/imap.rs index 062ac3cca..f68344ef6 100644 --- a/melib/src/backends/imap.rs +++ b/melib/src/backends/imap.rs @@ -370,13 +370,13 @@ impl MailBackend for ImapType { let main_conn = self.connection.clone(); let uid_store = self.uid_store.clone(); Ok(Box::pin(async move { - let inbox = timeout(uid_store.timeout, uid_store.mailboxes.lock()) + let mut inbox = timeout(uid_store.timeout, uid_store.mailboxes.lock()) .await? .get(&mailbox_hash) .map(std::clone::Clone::clone) .unwrap(); let mut conn = timeout(uid_store.timeout, main_conn.lock()).await?; - watch::examine_updates(inbox, &mut conn, &uid_store).await?; + watch::ImapWatcher::examine_updates(&mut inbox, &mut conn, &uid_store).await?; Ok(()) })) } @@ -450,68 +450,16 @@ impl MailBackend for ImapType { })) } - fn watch(&self) -> ResultFuture<()> { + fn watcher(&self) -> Result> { let server_conf = self.server_conf.clone(); let main_conn = self.connection.clone(); let uid_store = self.uid_store.clone(); - Ok(Box::pin(async move { - let has_idle: bool = match server_conf.protocol { - ImapProtocol::IMAP { - extension_use: ImapExtensionUse { idle, .. }, - } => { - idle && uid_store - .capabilities - .lock() - .unwrap() - .iter() - .any(|cap| cap.eq_ignore_ascii_case(b"IDLE")) - } - _ => false, - }; - while let Err(err) = if has_idle { - idle(ImapWatchKit { - conn: ImapConnection::new_connection(&server_conf, uid_store.clone()), - main_conn: main_conn.clone(), - uid_store: uid_store.clone(), - }) - .await - } else { - poll_with_examine(ImapWatchKit { - conn: ImapConnection::new_connection(&server_conf, uid_store.clone()), - main_conn: main_conn.clone(), - uid_store: uid_store.clone(), - }) - .await - } { - let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?; - if err.kind.is_network() { - uid_store.is_online.lock().unwrap().1 = Err(err.clone()); - } else { - return Err(err); - } - debug!("Watch failure: {}", err.to_string()); - match timeout(uid_store.timeout, main_conn_lck.connect()) - .await - .and_then(|res| res) - { - Err(err2) => { - debug!("Watch reconnect attempt failed: {}", err2.to_string()); - } - Ok(()) => { - debug!("Watch reconnect attempt succesful"); - continue; - } - } - let account_hash = uid_store.account_hash; - main_conn_lck.add_refresh_event(RefreshEvent { - account_hash, - mailbox_hash: 0, - kind: RefreshEventKind::Failure(err.clone()), - }); - return Err(err); - } - debug!("watch future returning"); - Ok(()) + Ok(Box::new(ImapWatcher { + main_conn, + uid_store, + mailbox_hashes: BTreeSet::default(), + polling_period: std::time::Duration::from_secs(5 * 60), + server_conf, })) } diff --git a/melib/src/backends/imap/connection.rs b/melib/src/backends/imap/connection.rs index 9b1ce79a1..edbb7343d 100644 --- a/melib/src/backends/imap/connection.rs +++ b/melib/src/backends/imap/connection.rs @@ -588,7 +588,9 @@ impl ImapConnection { pub fn connect<'a>(&'a mut self) -> Pin> + Send + 'a>> { Box::pin(async move { if let (time, ref mut status @ Ok(())) = *self.uid_store.is_online.lock().unwrap() { - if SystemTime::now().duration_since(time).unwrap_or_default() >= IMAP_PROTOCOL_TIMEOUT { + if SystemTime::now().duration_since(time).unwrap_or_default() + >= IMAP_PROTOCOL_TIMEOUT + { let err = MeliError::new("Connection timed out").set_kind(ErrorKind::Timeout); *status = Err(err.clone()); self.stream = Err(err); diff --git a/melib/src/backends/imap/watch.rs 
b/melib/src/backends/imap/watch.rs index 2c9c82b61..85acbf3c6 100644 --- a/melib/src/backends/imap/watch.rs +++ b/melib/src/backends/imap/watch.rs @@ -23,201 +23,129 @@ use crate::backends::SpecialUsageMailbox; use std::sync::Arc; /// Arguments for IMAP watching functions -pub struct ImapWatchKit { - pub conn: ImapConnection, +#[derive(Debug)] +pub struct ImapWatcher { pub main_conn: Arc>, pub uid_store: Arc, + pub mailbox_hashes: BTreeSet, + pub polling_period: std::time::Duration, + pub server_conf: ImapServerConf, } -pub async fn poll_with_examine(kit: ImapWatchKit) -> Result<()> { - debug!("poll with examine"); - let ImapWatchKit { - mut conn, - main_conn: _, - uid_store, - } = kit; - conn.connect().await?; - let mailboxes: HashMap = { - let mailboxes_lck = timeout(uid_store.timeout, uid_store.mailboxes.lock()).await?; - mailboxes_lck.clone() - }; - loop { - for (_, mailbox) in mailboxes.clone() { - examine_updates(mailbox, &mut conn, &uid_store).await?; - } - //FIXME: make sleep duration configurable - smol::Timer::after(std::time::Duration::from_secs(3 * 60)).await; +impl BackendWatcher for ImapWatcher { + fn is_blocking(&self) -> bool { + false } -} -pub async fn idle(kit: ImapWatchKit) -> Result<()> { - debug!("IDLE"); - /* IDLE only watches the connection's selected mailbox. We will IDLE on INBOX and every ~5 - * minutes wake up and poll the others */ - let ImapWatchKit { - mut conn, - main_conn, - uid_store, - } = kit; - conn.connect().await?; - let mailbox: ImapMailbox = match uid_store - .mailboxes - .lock() - .await - .values() - .find(|f| f.parent.is_none() && (f.special_usage() == SpecialUsageMailbox::Inbox)) - .map(std::clone::Clone::clone) - { - Some(mailbox) => mailbox, - None => { - return Err(MeliError::new("INBOX mailbox not found in local mailbox index. meli may have not parsed the IMAP mailboxes correctly")); + fn register_mailbox( + &mut self, + mailbox_hash: MailboxHash, + _urgency: MailboxWatchUrgency, + ) -> Result<()> { + self.mailbox_hashes.insert(mailbox_hash); + Ok(()) + } + + fn set_polling_period(&mut self, period: Option) -> Result<()> { + if let Some(period) = period { + self.polling_period = period; } - }; - let mailbox_hash = mailbox.hash(); - let mut response = Vec::with_capacity(8 * 1024); - let select_response = conn - .examine_mailbox(mailbox_hash, &mut response, true) - .await? - .unwrap(); - { - let mut uidvalidities = uid_store.uidvalidity.lock().unwrap(); + Ok(()) + } - if let Some(v) = uidvalidities.get(&mailbox_hash) { - if *v != select_response.uidvalidity { - if uid_store.keep_offline_cache { - #[cfg(not(feature = "sqlite3"))] - let mut cache_handle = super::cache::DefaultCache::get(uid_store.clone())?; - #[cfg(feature = "sqlite3")] - let mut cache_handle = super::cache::Sqlite3Cache::get(uid_store.clone())?; - cache_handle.clear(mailbox_hash, &select_response)?; + fn spawn(mut self: Box) -> ResultFuture<()> { + Ok(Box::pin(async move { + let has_idle: bool = match self.server_conf.protocol { + ImapProtocol::IMAP { + extension_use: ImapExtensionUse { idle, .. 
}, + } => { + idle && self + .uid_store + .capabilities + .lock() + .unwrap() + .iter() + .any(|cap| cap.eq_ignore_ascii_case(b"IDLE")) } - conn.add_refresh_event(RefreshEvent { - account_hash: uid_store.account_hash, - mailbox_hash, - kind: RefreshEventKind::Rescan, + _ => false, + }; + while let Err(err) = if has_idle { + self.idle().await + } else { + self.poll_with_examine().await + } { + let mut main_conn_lck = + timeout(self.uid_store.timeout, self.main_conn.lock()).await?; + if err.kind.is_network() { + self.uid_store.is_online.lock().unwrap().1 = Err(err.clone()); + } else { + return Err(err); + } + debug!("Watch failure: {}", err.to_string()); + match timeout(self.uid_store.timeout, main_conn_lck.connect()) + .await + .and_then(|res| res) + { + Err(err2) => { + debug!("Watch reconnect attempt failed: {}", err2.to_string()); + } + Ok(()) => { + debug!("Watch reconnect attempt succesful"); + continue; + } + } + let account_hash = self.uid_store.account_hash; + main_conn_lck.add_refresh_event(RefreshEvent { + account_hash, + mailbox_hash: 0, + kind: RefreshEventKind::Failure(err.clone()), }); - /* - uid_store.uid_index.lock().unwrap().clear(); - uid_store.hash_index.lock().unwrap().clear(); - uid_store.byte_cache.lock().unwrap().clear(); - */ + return Err(err); } - } else { - uidvalidities.insert(mailbox_hash, select_response.uidvalidity); - } + debug!("watch future returning"); + Ok(()) + })) } - let mailboxes: HashMap = { - let mailboxes_lck = timeout(uid_store.timeout, uid_store.mailboxes.lock()).await?; - mailboxes_lck.clone() - }; - for (h, mailbox) in mailboxes.clone() { - if mailbox_hash == h { - continue; - } - examine_updates(mailbox, &mut conn, &uid_store).await?; + + fn as_any(&self) -> &dyn Any { + self } - conn.send_command(b"IDLE").await?; - let mut blockn = ImapBlockingConnection::from(conn); - let mut watch = std::time::Instant::now(); - /* duration interval to send heartbeat */ - const _10_MINS: std::time::Duration = std::time::Duration::from_secs(10 * 60); - /* duration interval to check other mailboxes for changes */ - const _5_MINS: std::time::Duration = std::time::Duration::from_secs(5 * 60); - loop { - let line = match timeout(Some(_10_MINS), blockn.as_stream()).await { - Ok(Some(line)) => line, - Ok(None) => { - debug!("IDLE connection dropped: {:?}", &blockn.err()); - blockn.conn.connect().await?; - let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?; - main_conn_lck.connect().await?; - continue; - } - Err(_) => { - /* Timeout */ - blockn.conn.send_raw(b"DONE").await?; - blockn - .conn - .read_response(&mut response, RequiredResponses::empty()) - .await?; - blockn.conn.send_command(b"IDLE").await?; - let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?; - main_conn_lck.connect().await?; - continue; + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} + +impl ImapWatcher { + pub async fn idle(&mut self) -> Result<()> { + debug!("IDLE"); + /* IDLE only watches the connection's selected mailbox. We will IDLE on INBOX and every X + * minutes wake up and poll the others */ + let ImapWatcher { + ref main_conn, + ref uid_store, + ref mailbox_hashes, + ref polling_period, + ref server_conf, + .. 
+ } = self; + let mut connection = ImapConnection::new_connection(server_conf, uid_store.clone()); + connection.connect().await?; + let mailbox_hash: MailboxHash = match uid_store + .mailboxes + .lock() + .await + .values() + .find(|f| f.parent.is_none() && (f.special_usage() == SpecialUsageMailbox::Inbox)) + .map(|f| f.hash) + { + Some(h) => h, + None => { + return Err(MeliError::new("INBOX mailbox not found in local mailbox index. meli may have not parsed the IMAP mailboxes correctly")); } }; - let now = std::time::Instant::now(); - if now.duration_since(watch) >= _5_MINS { - /* Time to poll all inboxes */ - let mut conn = timeout(uid_store.timeout, main_conn.lock()).await?; - for (_h, mailbox) in mailboxes.clone() { - examine_updates(mailbox, &mut conn, &uid_store).await?; - } - watch = now; - } - if line - .split_rn() - .filter(|l| { - !l.starts_with(b"+ ") - && !l.starts_with(b"* ok") - && !l.starts_with(b"* ok") - && !l.starts_with(b"* Ok") - && !l.starts_with(b"* OK") - }) - .count() - == 0 - { - continue; - } - { - blockn.conn.send_raw(b"DONE").await?; - blockn - .conn - .read_response(&mut response, RequiredResponses::empty()) - .await?; - for l in line.split_rn().chain(response.split_rn()) { - debug!("process_untagged {:?}", &l); - if l.starts_with(b"+ ") - || l.starts_with(b"* ok") - || l.starts_with(b"* ok") - || l.starts_with(b"* Ok") - || l.starts_with(b"* OK") - { - debug!("ignore continuation mark"); - continue; - } - blockn.conn.process_untagged(l).await?; - } - blockn.conn.send_command(b"IDLE").await?; - } - } -} - -pub async fn examine_updates( - mailbox: ImapMailbox, - conn: &mut ImapConnection, - uid_store: &Arc, -) -> Result<()> { - if mailbox.no_select { - return Ok(()); - } - let mailbox_hash = mailbox.hash(); - debug!("examining mailbox {} {}", mailbox_hash, mailbox.path()); - if let Some(new_envelopes) = conn.resync(mailbox_hash).await? { - for env in new_envelopes { - conn.add_refresh_event(RefreshEvent { - mailbox_hash, - account_hash: uid_store.account_hash, - kind: RefreshEventKind::Create(Box::new(env)), - }); - } - } else { - #[cfg(not(feature = "sqlite3"))] - let mut cache_handle = super::cache::DefaultCache::get(uid_store.clone())?; - #[cfg(feature = "sqlite3")] - let mut cache_handle = super::cache::Sqlite3Cache::get(uid_store.clone())?; let mut response = Vec::with_capacity(8 * 1024); - let select_response = conn + let select_response = connection .examine_mailbox(mailbox_hash, &mut response, true) .await? 
.unwrap(); @@ -227,9 +155,13 @@ pub async fn examine_updates( if let Some(v) = uidvalidities.get(&mailbox_hash) { if *v != select_response.uidvalidity { if uid_store.keep_offline_cache { + #[cfg(not(feature = "sqlite3"))] + let mut cache_handle = super::cache::DefaultCache::get(uid_store.clone())?; + #[cfg(feature = "sqlite3")] + let mut cache_handle = super::cache::Sqlite3Cache::get(uid_store.clone())?; cache_handle.clear(mailbox_hash, &select_response)?; } - conn.add_refresh_event(RefreshEvent { + connection.add_refresh_event(RefreshEvent { account_hash: uid_store.account_hash, mailbox_hash, kind: RefreshEventKind::Rescan, @@ -239,215 +171,384 @@ pub async fn examine_updates( uid_store.hash_index.lock().unwrap().clear(); uid_store.byte_cache.lock().unwrap().clear(); */ - return Ok(()); } } else { uidvalidities.insert(mailbox_hash, select_response.uidvalidity); } } - if mailbox.is_cold() { - /* Mailbox hasn't been loaded yet */ - let has_list_status: bool = conn - .uid_store - .capabilities - .lock() - .unwrap() - .iter() - .any(|cap| cap.eq_ignore_ascii_case(b"LIST-STATUS")); - if has_list_status { - conn.send_command( - format!( - "LIST \"{}\" \"\" RETURN (STATUS (MESSAGES UNSEEN))", - mailbox.imap_path() - ) - .as_bytes(), - ) - .await?; - conn.read_response( - &mut response, - RequiredResponses::LIST_REQUIRED | RequiredResponses::STATUS, - ) - .await?; - debug!( - "list return status out: {}", - String::from_utf8_lossy(&response) - ); - for l in response.split_rn() { - if !l.starts_with(b"*") { - continue; - } - if let Ok(status) = protocol_parser::status_response(&l).map(|(_, v)| v) { - if Some(mailbox_hash) == status.mailbox { - if let Some(total) = status.messages { - if let Ok(mut exists_lck) = mailbox.exists.lock() { - exists_lck.clear(); - exists_lck.set_not_yet_seen(total); - } - } - if let Some(total) = status.unseen { - if let Ok(mut unseen_lck) = mailbox.unseen.lock() { - unseen_lck.clear(); - unseen_lck.set_not_yet_seen(total); - } - } - break; - } - } - } - } else { - conn.send_command(b"SEARCH UNSEEN").await?; - conn.read_response(&mut response, RequiredResponses::SEARCH) - .await?; - let unseen_count = protocol_parser::search_results(&response)?.1.len(); - if let Ok(mut exists_lck) = mailbox.exists.lock() { - exists_lck.clear(); - exists_lck.set_not_yet_seen(select_response.exists); - } - if let Ok(mut unseen_lck) = mailbox.unseen.lock() { - unseen_lck.clear(); - unseen_lck.set_not_yet_seen(unseen_count); - } - } - mailbox.set_warm(true); - return Ok(()); - } - - if select_response.recent > 0 { - /* UID SEARCH RECENT */ - conn.send_command(b"UID SEARCH RECENT").await?; - conn.read_response(&mut response, RequiredResponses::SEARCH) - .await?; - let v = protocol_parser::search_results(response.as_slice()).map(|(_, v)| v)?; - if v.is_empty() { - debug!( - "search response was empty: {}", - String::from_utf8_lossy(&response) - ); - return Ok(()); - } - let mut cmd = "UID FETCH ".to_string(); - if v.len() == 1 { - cmd.push_str(&v[0].to_string()); - } else { - cmd.push_str(&v[0].to_string()); - for n in v.into_iter().skip(1) { - cmd.push(','); - cmd.push_str(&n.to_string()); - } - } - cmd.push_str( - " (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] BODYSTRUCTURE)", - ); - conn.send_command(cmd.as_bytes()).await?; - conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) - .await?; - } else if select_response.exists > mailbox.exists.lock().unwrap().len() { - conn.send_command( - format!( - "FETCH {}:* (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS 
(REFERENCES)] BODYSTRUCTURE)", - std::cmp::max(mailbox.exists.lock().unwrap().len(), 1) - ) - .as_bytes(), - ) - .await?; - conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) - .await?; - } else { - return Ok(()); - } - debug!( - "fetch response is {} bytes and {} lines", - response.len(), - String::from_utf8_lossy(&response).lines().count() - ); - let (_, mut v, _) = protocol_parser::fetch_responses(&response)?; - debug!("responses len is {}", v.len()); - for FetchResponse { - ref uid, - ref mut envelope, - ref mut flags, - ref references, - .. - } in v.iter_mut() - { - let uid = uid.unwrap(); - let env = envelope.as_mut().unwrap(); - env.set_hash(generate_envelope_hash(&mailbox.imap_path(), &uid)); - if let Some(value) = references { - env.set_references(value); - } - let mut tag_lck = uid_store.collection.tag_index.write().unwrap(); - if let Some((flags, keywords)) = flags { - env.set_flags(*flags); - if !env.is_seen() { - mailbox.unseen.lock().unwrap().insert_new(env.hash()); - } - mailbox.exists.lock().unwrap().insert_new(env.hash()); - for f in keywords { - let hash = tag_hash!(f); - if !tag_lck.contains_key(&hash) { - tag_lck.insert(hash, f.to_string()); - } - env.labels_mut().push(hash); - } - } - } - if uid_store.keep_offline_cache { - if !cache_handle.mailbox_state(mailbox_hash)?.is_none() { - cache_handle - .insert_envelopes(mailbox_hash, &v) - .chain_err_summary(|| { - format!( - "Could not save envelopes in cache for mailbox {}", - mailbox.imap_path() - ) - })?; - } - } - - for FetchResponse { uid, envelope, .. } in v { - if uid.is_none() || envelope.is_none() { + let mailboxes: HashMap = { + let mailboxes_lck = timeout(uid_store.timeout, uid_store.mailboxes.lock()).await?; + let mut ret = mailboxes_lck.clone(); + ret.retain(|k, _| mailbox_hashes.contains(k)); + ret + }; + for (h, mailbox) in mailboxes.iter() { + if mailbox_hash == *h { continue; } - let uid = uid.unwrap(); - if uid_store - .uid_index - .lock() - .unwrap() - .contains_key(&(mailbox_hash, uid)) + Self::examine_updates(mailbox, &mut connection, &uid_store).await?; + } + connection.send_command(b"IDLE").await?; + let mut blockn = ImapBlockingConnection::from(connection); + let mut watch = std::time::Instant::now(); + /* duration interval to send heartbeat */ + const _10_MINS: std::time::Duration = std::time::Duration::from_secs(10 * 60); + /* duration interval to check other mailboxes for changes */ + loop { + let line = match timeout( + Some(std::cmp::min(*polling_period, _10_MINS)), + blockn.as_stream(), + ) + .await + { + Ok(Some(line)) => line, + Ok(None) => { + debug!("IDLE connection dropped: {:?}", &blockn.err()); + blockn.conn.connect().await?; + let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?; + main_conn_lck.connect().await?; + continue; + } + Err(_) => { + /* Timeout */ + blockn.conn.send_raw(b"DONE").await?; + blockn + .conn + .read_response(&mut response, RequiredResponses::empty()) + .await?; + blockn.conn.send_command(b"IDLE").await?; + let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?; + main_conn_lck.connect().await?; + continue; + } + }; + let now = std::time::Instant::now(); + if now.duration_since(watch) >= *polling_period { + /* Time to poll all inboxes */ + let mut conn = timeout(uid_store.timeout, main_conn.lock()).await?; + for (_h, mailbox) in mailboxes.iter() { + Self::examine_updates(mailbox, &mut conn, &uid_store).await?; + } + watch = now; + } + if line + .split_rn() + .filter(|l| { + !l.starts_with(b"+ ") + && 
!l.starts_with(b"* ok") + && !l.starts_with(b"* ok") + && !l.starts_with(b"* Ok") + && !l.starts_with(b"* OK") + }) + .count() + == 0 { continue; } - let env = envelope.unwrap(); - debug!( - "Create event {} {} {}", - env.hash(), - env.subject(), - mailbox.path(), - ); - uid_store - .msn_index - .lock() - .unwrap() - .entry(mailbox_hash) - .or_default() - .push(uid); - uid_store - .hash_index - .lock() - .unwrap() - .insert(env.hash(), (uid, mailbox_hash)); - uid_store - .uid_index - .lock() - .unwrap() - .insert((mailbox_hash, uid), env.hash()); - conn.add_refresh_event(RefreshEvent { - account_hash: uid_store.account_hash, - mailbox_hash, - kind: Create(Box::new(env)), - }); + { + blockn.conn.send_raw(b"DONE").await?; + blockn + .conn + .read_response(&mut response, RequiredResponses::empty()) + .await?; + for l in line.split_rn().chain(response.split_rn()) { + debug!("process_untagged {:?}", String::from_utf8_lossy(&l)); + if l.starts_with(b"+ ") + || l.starts_with(b"* ok") + || l.starts_with(b"* ok") + || l.starts_with(b"* Ok") + || l.starts_with(b"* OK") + { + debug!("ignore continuation mark"); + continue; + } + blockn.conn.process_untagged(l).await?; + } + blockn.conn.send_command(b"IDLE").await?; + } } } - Ok(()) + pub async fn poll_with_examine(&mut self) -> Result<()> { + debug!("poll with examine"); + let ImapWatcher { + ref mailbox_hashes, + ref uid_store, + ref polling_period, + ref server_conf, + .. + } = self; + let mut connection = ImapConnection::new_connection(server_conf, uid_store.clone()); + connection.connect().await?; + let mailboxes: HashMap = { + let mailboxes_lck = timeout(uid_store.timeout, uid_store.mailboxes.lock()).await?; + let mut ret = mailboxes_lck.clone(); + ret.retain(|k, _| mailbox_hashes.contains(k)); + ret + }; + loop { + for (_, mailbox) in mailboxes.iter() { + Self::examine_updates(mailbox, &mut connection, &uid_store).await?; + } + crate::connections::sleep(*polling_period).await; + } + } + + pub async fn examine_updates( + mailbox: &ImapMailbox, + conn: &mut ImapConnection, + uid_store: &Arc, + ) -> Result<()> { + if mailbox.no_select { + return Ok(()); + } + let mailbox_hash = mailbox.hash(); + debug!("examining mailbox {} {}", mailbox_hash, mailbox.path()); + if let Some(new_envelopes) = conn.resync(mailbox_hash).await? { + for env in new_envelopes { + conn.add_refresh_event(RefreshEvent { + mailbox_hash, + account_hash: uid_store.account_hash, + kind: RefreshEventKind::Create(Box::new(env)), + }); + } + } else { + #[cfg(not(feature = "sqlite3"))] + let mut cache_handle = super::cache::DefaultCache::get(uid_store.clone())?; + #[cfg(feature = "sqlite3")] + let mut cache_handle = super::cache::Sqlite3Cache::get(uid_store.clone())?; + let mut response = Vec::with_capacity(8 * 1024); + let select_response = conn + .examine_mailbox(mailbox_hash, &mut response, true) + .await? 
+ .unwrap(); + { + let mut uidvalidities = uid_store.uidvalidity.lock().unwrap(); + + if let Some(v) = uidvalidities.get(&mailbox_hash) { + if *v != select_response.uidvalidity { + if uid_store.keep_offline_cache { + cache_handle.clear(mailbox_hash, &select_response)?; + } + conn.add_refresh_event(RefreshEvent { + account_hash: uid_store.account_hash, + mailbox_hash, + kind: RefreshEventKind::Rescan, + }); + /* + uid_store.uid_index.lock().unwrap().clear(); + uid_store.hash_index.lock().unwrap().clear(); + uid_store.byte_cache.lock().unwrap().clear(); + */ + return Ok(()); + } + } else { + uidvalidities.insert(mailbox_hash, select_response.uidvalidity); + } + } + if mailbox.is_cold() { + /* Mailbox hasn't been loaded yet */ + let has_list_status: bool = conn + .uid_store + .capabilities + .lock() + .unwrap() + .iter() + .any(|cap| cap.eq_ignore_ascii_case(b"LIST-STATUS")); + if has_list_status { + conn.send_command( + format!( + "LIST \"{}\" \"\" RETURN (STATUS (MESSAGES UNSEEN))", + mailbox.imap_path() + ) + .as_bytes(), + ) + .await?; + conn.read_response( + &mut response, + RequiredResponses::LIST_REQUIRED | RequiredResponses::STATUS, + ) + .await?; + debug!( + "list return status out: {}", + String::from_utf8_lossy(&response) + ); + for l in response.split_rn() { + if !l.starts_with(b"*") { + continue; + } + if let Ok(status) = protocol_parser::status_response(&l).map(|(_, v)| v) { + if Some(mailbox_hash) == status.mailbox { + if let Some(total) = status.messages { + if let Ok(mut exists_lck) = mailbox.exists.lock() { + exists_lck.clear(); + exists_lck.set_not_yet_seen(total); + } + } + if let Some(total) = status.unseen { + if let Ok(mut unseen_lck) = mailbox.unseen.lock() { + unseen_lck.clear(); + unseen_lck.set_not_yet_seen(total); + } + } + break; + } + } + } + } else { + conn.send_command(b"SEARCH UNSEEN").await?; + conn.read_response(&mut response, RequiredResponses::SEARCH) + .await?; + let unseen_count = protocol_parser::search_results(&response)?.1.len(); + if let Ok(mut exists_lck) = mailbox.exists.lock() { + exists_lck.clear(); + exists_lck.set_not_yet_seen(select_response.exists); + } + if let Ok(mut unseen_lck) = mailbox.unseen.lock() { + unseen_lck.clear(); + unseen_lck.set_not_yet_seen(unseen_count); + } + } + mailbox.set_warm(true); + return Ok(()); + } + + if select_response.recent > 0 { + /* UID SEARCH RECENT */ + conn.send_command(b"UID SEARCH RECENT").await?; + conn.read_response(&mut response, RequiredResponses::SEARCH) + .await?; + let v = protocol_parser::search_results(response.as_slice()).map(|(_, v)| v)?; + if v.is_empty() { + debug!( + "search response was empty: {}", + String::from_utf8_lossy(&response) + ); + return Ok(()); + } + let mut cmd = "UID FETCH ".to_string(); + if v.len() == 1 { + cmd.push_str(&v[0].to_string()); + } else { + cmd.push_str(&v[0].to_string()); + for n in v.into_iter().skip(1) { + cmd.push(','); + cmd.push_str(&n.to_string()); + } + } + cmd.push_str( + " (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] BODYSTRUCTURE)", + ); + conn.send_command(cmd.as_bytes()).await?; + conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) + .await?; + } else if select_response.exists > mailbox.exists.lock().unwrap().len() { + conn.send_command( + format!( + "FETCH {}:* (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] BODYSTRUCTURE)", + std::cmp::max(mailbox.exists.lock().unwrap().len(), 1) + ) + .as_bytes(), + ) + .await?; + conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) + .await?; + } else { + 
return Ok(()); + } + debug!( + "fetch response is {} bytes and {} lines", + response.len(), + String::from_utf8_lossy(&response).lines().count() + ); + let (_, mut v, _) = protocol_parser::fetch_responses(&response)?; + debug!("responses len is {}", v.len()); + for FetchResponse { + ref uid, + ref mut envelope, + ref mut flags, + ref references, + .. + } in v.iter_mut() + { + let uid = uid.unwrap(); + let env = envelope.as_mut().unwrap(); + env.set_hash(generate_envelope_hash(&mailbox.imap_path(), &uid)); + if let Some(value) = references { + env.set_references(value); + } + let mut tag_lck = uid_store.collection.tag_index.write().unwrap(); + if let Some((flags, keywords)) = flags { + env.set_flags(*flags); + if !env.is_seen() { + mailbox.unseen.lock().unwrap().insert_new(env.hash()); + } + mailbox.exists.lock().unwrap().insert_new(env.hash()); + for f in keywords { + let hash = tag_hash!(f); + if !tag_lck.contains_key(&hash) { + tag_lck.insert(hash, f.to_string()); + } + env.labels_mut().push(hash); + } + } + } + if uid_store.keep_offline_cache { + if !cache_handle.mailbox_state(mailbox_hash)?.is_none() { + cache_handle + .insert_envelopes(mailbox_hash, &v) + .chain_err_summary(|| { + format!( + "Could not save envelopes in cache for mailbox {}", + mailbox.imap_path() + ) + })?; + } + } + + for FetchResponse { uid, envelope, .. } in v { + if uid.is_none() || envelope.is_none() { + continue; + } + let uid = uid.unwrap(); + if uid_store + .uid_index + .lock() + .unwrap() + .contains_key(&(mailbox_hash, uid)) + { + continue; + } + let env = envelope.unwrap(); + debug!( + "Create event {} {} {}", + env.hash(), + env.subject(), + mailbox.path(), + ); + uid_store + .msn_index + .lock() + .unwrap() + .entry(mailbox_hash) + .or_default() + .push(uid); + uid_store + .hash_index + .lock() + .unwrap() + .insert(env.hash(), (uid, mailbox_hash)); + uid_store + .uid_index + .lock() + .unwrap() + .insert((mailbox_hash, uid), env.hash()); + conn.add_refresh_event(RefreshEvent { + account_hash: uid_store.account_hash, + mailbox_hash, + kind: Create(Box::new(env)), + }); + } + } + Ok(()) + } } diff --git a/melib/src/backends/jmap.rs b/melib/src/backends/jmap.rs index 29939d053..106bfa24f 100644 --- a/melib/src/backends/jmap.rs +++ b/melib/src/backends/jmap.rs @@ -90,6 +90,8 @@ use objects::*; pub mod mailbox; use mailbox::*; +pub mod watch; + #[derive(Debug, Default)] pub struct EnvelopeCache { bytes: Option, @@ -341,32 +343,12 @@ impl MailBackend for JmapType { })) } - fn watch(&self) -> ResultFuture<()> { + fn watcher(&self) -> Result> { let connection = self.connection.clone(); - let store = self.store.clone(); - Ok(Box::pin(async move { - { - let mut conn = connection.lock().await; - conn.connect().await?; - } - loop { - { - let mailbox_hashes = { - store - .mailboxes - .read() - .unwrap() - .keys() - .cloned() - .collect::>() - }; - let conn = connection.lock().await; - for mailbox_hash in mailbox_hashes { - conn.email_changes(mailbox_hash).await?; - } - } - crate::connections::sleep(std::time::Duration::from_secs(60)).await; - } + Ok(Box::new(watch::JmapWatcher { + connection, + mailbox_hashes: BTreeSet::default(), + polling_period: std::time::Duration::from_secs(60), })) } diff --git a/melib/src/backends/jmap/watch.rs b/melib/src/backends/jmap/watch.rs new file mode 100644 index 000000000..6b9403239 --- /dev/null +++ b/melib/src/backends/jmap/watch.rs @@ -0,0 +1,83 @@ +/* + * melib - JMAP + * + * Copyright 2020 Manos Pitsidianakis + * + * This file is part of meli. 
+ * + * meli is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * meli is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with meli. If not, see . + */ + +use super::*; +use std::collections::BTreeSet; + +#[derive(Debug)] +pub struct JmapWatcher { + pub mailbox_hashes: BTreeSet, + pub polling_period: std::time::Duration, + pub connection: Arc>, +} + +impl BackendWatcher for JmapWatcher { + fn is_blocking(&self) -> bool { + false + } + + fn register_mailbox( + &mut self, + mailbox_hash: MailboxHash, + _urgency: MailboxWatchUrgency, + ) -> Result<()> { + self.mailbox_hashes.insert(mailbox_hash); + Ok(()) + } + + fn set_polling_period(&mut self, period: Option) -> Result<()> { + if let Some(period) = period { + self.polling_period = period; + } + Ok(()) + } + + fn spawn(self: Box) -> ResultFuture<()> { + let JmapWatcher { + mailbox_hashes, + polling_period, + connection, + } = *self; + Ok(Box::pin(async move { + { + let mut conn = connection.lock().await; + conn.connect().await?; + } + loop { + { + let conn = connection.lock().await; + for &mailbox_hash in &mailbox_hashes { + conn.email_changes(mailbox_hash).await?; + } + } + crate::connections::sleep(polling_period).await; + } + })) + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} diff --git a/melib/src/backends/maildir.rs b/melib/src/backends/maildir.rs index 2090a7d7a..cbd2f535a 100644 --- a/melib/src/backends/maildir.rs +++ b/melib/src/backends/maildir.rs @@ -26,6 +26,8 @@ pub use self::backend::*; mod stream; pub use stream::*; +pub mod watch; + use crate::backends::*; use crate::email::Flag; use crate::error::{MeliError, Result}; diff --git a/melib/src/backends/maildir/backend.rs b/melib/src/backends/maildir/backend.rs index 870d189a3..40abaf0f0 100644 --- a/melib/src/backends/maildir/backend.rs +++ b/melib/src/backends/maildir/backend.rs @@ -27,20 +27,13 @@ use crate::error::{ErrorKind, MeliError, Result}; use crate::shellexpand::ShellExpandTrait; use crate::Collection; use futures::prelude::Stream; - -extern crate notify; -use self::notify::{watcher, DebouncedEvent, RecursiveMode, Watcher}; -use std::time::Duration; - use std::collections::{hash_map::DefaultHasher, HashMap, HashSet}; -use std::ffi::OsStr; use std::fs; use std::hash::{Hash, Hasher}; use std::io::{self, Read, Write}; use std::ops::{Deref, DerefMut}; use std::os::unix::fs::PermissionsExt; -use std::path::{Component, Path, PathBuf}; -use std::sync::mpsc::channel; +use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; #[derive(Clone, Debug, PartialEq)] @@ -339,494 +332,28 @@ impl MailBackend for MaildirType { })) } - fn watch(&self) -> ResultFuture<()> { - let sender = self.event_consumer.clone(); - let (tx, rx) = channel(); - let mut watcher = watcher(tx, Duration::from_secs(2)).unwrap(); + fn watcher(&self) -> Result> { let account_hash = { let mut hasher = DefaultHasher::default(); hasher.write(self.name.as_bytes()); hasher.finish() }; - let root_path = self.path.to_path_buf(); - watcher.watch(&root_path, RecursiveMode::Recursive).unwrap(); let 
cache_dir = xdg::BaseDirectories::with_profile("meli", &self.name).unwrap(); - debug!("watching {:?}", root_path); + let event_consumer = self.event_consumer.clone(); let hash_indexes = self.hash_indexes.clone(); let mailbox_index = self.mailbox_index.clone(); - let root_mailbox_hash: MailboxHash = self - .mailboxes - .values() - .find(|m| m.parent.is_none()) - .map(|m| m.hash()) - .unwrap(); - let mailbox_counts = self - .mailboxes - .iter() - .map(|(&k, v)| (k, (v.unseen.clone(), v.total.clone()))) - .collect::>, Arc>)>>(); - Ok(Box::pin(async move { - // Move `watcher` in the closure's scope so that it doesn't get dropped. - let _watcher = watcher; - let mut buf = Vec::with_capacity(4096); - loop { - match rx.recv() { - /* - * Event types: - * - * pub enum RefreshEventKind { - * Update(EnvelopeHash, Envelope), // Old hash, new envelope - * Create(Envelope), - * Remove(EnvelopeHash), - * Rescan, - * } - */ - Ok(event) => match event { - /* Create */ - DebouncedEvent::Create(mut pathbuf) => { - debug!("DebouncedEvent::Create(path = {:?}", pathbuf); - if path_is_new!(pathbuf) { - debug!("path_is_new"); - /* This creates a Rename event that we will receive later */ - pathbuf = match move_to_cur(pathbuf) { - Ok(p) => p, - Err(e) => { - debug!("error: {}", e.to_string()); - continue; - } - }; - } - let mailbox_hash = get_path_hash!(pathbuf); - let file_name = pathbuf - .as_path() - .strip_prefix(&root_path) - .unwrap() - .to_path_buf(); - if let Ok(env) = add_path_to_index( - &hash_indexes, - mailbox_hash, - pathbuf.as_path(), - &cache_dir, - file_name, - &mut buf, - ) { - mailbox_index - .lock() - .unwrap() - .insert(env.hash(), mailbox_hash); - debug!( - "Create event {} {} {}", - env.hash(), - env.subject(), - pathbuf.display() - ); - if !env.is_seen() { - *mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1; - } - *mailbox_counts[&mailbox_hash].1.lock().unwrap() += 1; - (sender)( - account_hash, - BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash, - kind: Create(Box::new(env)), - }), - ); - } - } - /* Update */ - DebouncedEvent::NoticeWrite(pathbuf) | DebouncedEvent::Write(pathbuf) => { - debug!("DebouncedEvent::Write(path = {:?}", &pathbuf); - let mailbox_hash = get_path_hash!(pathbuf); - let mut hash_indexes_lock = hash_indexes.lock().unwrap(); - let index_lock = - &mut hash_indexes_lock.entry(mailbox_hash).or_default(); - let file_name = pathbuf - .as_path() - .strip_prefix(&root_path) - .unwrap() - .to_path_buf(); - /* Linear search in hash_index to find old hash */ - let old_hash: EnvelopeHash = { - if let Some((k, v)) = - index_lock.iter_mut().find(|(_, v)| *v.buf == pathbuf) - { - *v = pathbuf.clone().into(); - *k - } else { - drop(hash_indexes_lock); - /* Did we just miss a Create event? In any case, create - * envelope. 
*/ - if let Ok(env) = add_path_to_index( - &hash_indexes, - mailbox_hash, - pathbuf.as_path(), - &cache_dir, - file_name, - &mut buf, - ) { - mailbox_index - .lock() - .unwrap() - .insert(env.hash(), mailbox_hash); - (sender)( - account_hash, - BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash, - kind: Create(Box::new(env)), - }), - ); - } - continue; - } - }; - let new_hash: EnvelopeHash = get_file_hash(pathbuf.as_path()); - let mut reader = io::BufReader::new(fs::File::open(&pathbuf)?); - buf.clear(); - reader.read_to_end(&mut buf)?; - if index_lock.get_mut(&new_hash).is_none() { - debug!("write notice"); - if let Ok(mut env) = - Envelope::from_bytes(buf.as_slice(), Some(pathbuf.flags())) - { - env.set_hash(new_hash); - debug!("{}\t{:?}", new_hash, &pathbuf); - debug!( - "hash {}, path: {:?} couldn't be parsed", - new_hash, &pathbuf - ); - index_lock.insert(new_hash, pathbuf.into()); - - /* Send Write notice */ - - (sender)( - account_hash, - BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash, - kind: Update(old_hash, Box::new(env)), - }), - ); - } - } - } - /* Remove */ - DebouncedEvent::NoticeRemove(pathbuf) | DebouncedEvent::Remove(pathbuf) => { - debug!("DebouncedEvent::Remove(path = {:?}", pathbuf); - let mailbox_hash = get_path_hash!(pathbuf); - let mut hash_indexes_lock = hash_indexes.lock().unwrap(); - let index_lock = hash_indexes_lock.entry(mailbox_hash).or_default(); - let hash: EnvelopeHash = if let Some((k, _)) = - index_lock.iter().find(|(_, v)| *v.buf == pathbuf) - { - *k - } else { - debug!("removed but not contained in index"); - continue; - }; - if let Some(ref modif) = &index_lock[&hash].modified { - match modif { - PathMod::Path(path) => debug!( - "envelope {} has modified path set {}", - hash, - path.display() - ), - PathMod::Hash(hash) => debug!( - "envelope {} has modified path set {}", - hash, - &index_lock[&hash].buf.display() - ), - } - index_lock.entry(hash).and_modify(|e| { - e.removed = false; - }); - continue; - } - { - let mut lck = mailbox_counts[&mailbox_hash].1.lock().unwrap(); - *lck = lck.saturating_sub(1); - } - if !pathbuf.flags().contains(Flag::SEEN) { - let mut lck = mailbox_counts[&mailbox_hash].0.lock().unwrap(); - *lck = lck.saturating_sub(1); - } - - index_lock.entry(hash).and_modify(|e| { - e.removed = true; - }); - - (sender)( - account_hash, - BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash, - kind: Remove(hash), - }), - ); - } - /* Envelope hasn't changed */ - DebouncedEvent::Rename(src, dest) => { - debug!("DebouncedEvent::Rename(src = {:?}, dest = {:?})", src, dest); - let mailbox_hash = get_path_hash!(src); - let dest_mailbox = { - let dest_mailbox = get_path_hash!(dest); - if dest_mailbox == mailbox_hash { - None - } else { - Some(dest_mailbox) - } - }; - let old_hash: EnvelopeHash = get_file_hash(src.as_path()); - let new_hash: EnvelopeHash = get_file_hash(dest.as_path()); - - let mut hash_indexes_lock = hash_indexes.lock().unwrap(); - let index_lock = hash_indexes_lock.entry(mailbox_hash).or_default(); - let old_flags = src.flags(); - let new_flags = dest.flags(); - let was_seen: bool = old_flags.contains(Flag::SEEN); - let is_seen: bool = new_flags.contains(Flag::SEEN); - - if index_lock.contains_key(&old_hash) && !index_lock[&old_hash].removed - { - debug!("contains_old_key"); - if let Some(dest_mailbox) = dest_mailbox { - index_lock.entry(old_hash).and_modify(|e| { - e.removed = true; - }); - - (sender)( - account_hash, - BackendEvent::Refresh(RefreshEvent { - account_hash, - 
mailbox_hash, - kind: Remove(old_hash), - }), - ); - let file_name = dest - .as_path() - .strip_prefix(&root_path) - .unwrap() - .to_path_buf(); - drop(hash_indexes_lock); - if let Ok(env) = add_path_to_index( - &hash_indexes, - dest_mailbox, - dest.as_path(), - &cache_dir, - file_name, - &mut buf, - ) { - mailbox_index - .lock() - .unwrap() - .insert(env.hash(), dest_mailbox); - debug!( - "Create event {} {} {}", - env.hash(), - env.subject(), - dest.display() - ); - if !env.is_seen() { - *mailbox_counts[&dest_mailbox].0.lock().unwrap() += 1; - } - *mailbox_counts[&dest_mailbox].1.lock().unwrap() += 1; - (sender)( - account_hash, - BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash: dest_mailbox, - kind: Create(Box::new(env)), - }), - ); - } - } else { - index_lock.entry(old_hash).and_modify(|e| { - debug!(&e.modified); - e.modified = Some(PathMod::Hash(new_hash)); - }); - (sender)( - account_hash, - BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash, - kind: Rename(old_hash, new_hash), - }), - ); - if !was_seen && is_seen { - let mut lck = - mailbox_counts[&mailbox_hash].0.lock().unwrap(); - *lck = lck.saturating_sub(1); - } else if was_seen && !is_seen { - *mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1; - } - if old_flags != new_flags { - (sender)( - account_hash, - BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash, - kind: NewFlags(new_hash, (new_flags, vec![])), - }), - ); - } - mailbox_index.lock().unwrap().insert(new_hash, mailbox_hash); - index_lock.insert(new_hash, dest.into()); - } - continue; - } else if !index_lock.contains_key(&new_hash) - && index_lock - .get(&old_hash) - .map(|e| e.removed) - .unwrap_or(false) - { - if index_lock - .get(&old_hash) - .map(|e| e.removed) - .unwrap_or(false) - { - index_lock.entry(old_hash).and_modify(|e| { - e.modified = Some(PathMod::Hash(new_hash)); - e.removed = false; - }); - debug!("contains_old_key, key was marked as removed (by external source)"); - } else { - debug!("not contains_new_key"); - } - let file_name = dest - .as_path() - .strip_prefix(&root_path) - .unwrap() - .to_path_buf(); - debug!("filename = {:?}", file_name); - drop(hash_indexes_lock); - if let Ok(env) = add_path_to_index( - &hash_indexes, - dest_mailbox.unwrap_or(mailbox_hash), - dest.as_path(), - &cache_dir, - file_name, - &mut buf, - ) { - mailbox_index - .lock() - .unwrap() - .insert(env.hash(), dest_mailbox.unwrap_or(mailbox_hash)); - debug!( - "Create event {} {} {}", - env.hash(), - env.subject(), - dest.display() - ); - if !env.is_seen() { - *mailbox_counts[&dest_mailbox.unwrap_or(mailbox_hash)] - .0 - .lock() - .unwrap() += 1; - } - *mailbox_counts[&dest_mailbox.unwrap_or(mailbox_hash)] - .1 - .lock() - .unwrap() += 1; - (sender)( - account_hash, - BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash: dest_mailbox.unwrap_or(mailbox_hash), - kind: Create(Box::new(env)), - }), - ); - continue; - } else { - debug!("not valid email"); - } - } else if let Some(dest_mailbox) = dest_mailbox { - drop(hash_indexes_lock); - let file_name = dest - .as_path() - .strip_prefix(&root_path) - .unwrap() - .to_path_buf(); - if let Ok(env) = add_path_to_index( - &hash_indexes, - dest_mailbox, - dest.as_path(), - &cache_dir, - file_name, - &mut buf, - ) { - mailbox_index - .lock() - .unwrap() - .insert(env.hash(), dest_mailbox); - debug!( - "Create event {} {} {}", - env.hash(), - env.subject(), - dest.display() - ); - if !env.is_seen() { - *mailbox_counts[&dest_mailbox].0.lock().unwrap() += 1; - } - 
*mailbox_counts[&dest_mailbox].1.lock().unwrap() += 1; - (sender)( - account_hash, - BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash: dest_mailbox, - kind: Create(Box::new(env)), - }), - ); - } - } else { - if was_seen && !is_seen { - *mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1; - } - (sender)( - account_hash, - BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash, - kind: Rename(old_hash, new_hash), - }), - ); - debug!("contains_new_key"); - if old_flags != new_flags { - (sender)( - account_hash, - BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash, - kind: NewFlags(new_hash, (new_flags, vec![])), - }), - ); - } - } - - /* Maybe a re-read should be triggered here just to be safe. - (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash: get_path_hash!(dest), - kind: Rescan, - })); - */ - } - /* Trigger rescan of mailbox */ - DebouncedEvent::Rescan => { - (sender)( - account_hash, - BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash: root_mailbox_hash, - kind: Rescan, - }), - ); - } - _ => {} - }, - Err(e) => debug!("watch error: {:?}", e), - } - } + let mailboxes = self.mailboxes.clone(); + let root_path = self.path.to_path_buf(); + Ok(Box::new(super::watch::MaildirWatcher { + account_hash, + cache_dir, + event_consumer, + hash_indexes, + mailbox_hashes: BTreeSet::default(), + mailbox_index, + mailboxes, + polling_period: std::time::Duration::from_secs(2), + root_path, })) } @@ -1335,49 +862,3 @@ impl MaildirType { Ok(()) } } - -fn add_path_to_index( - hash_index: &HashIndexes, - mailbox_hash: MailboxHash, - path: &Path, - cache_dir: &xdg::BaseDirectories, - file_name: PathBuf, - buf: &mut Vec, -) -> Result { - debug!("add_path_to_index path {:?} filename{:?}", path, file_name); - let env_hash = get_file_hash(path); - { - let mut map = hash_index.lock().unwrap(); - let map = map.entry(mailbox_hash).or_default(); - map.insert(env_hash, path.to_path_buf().into()); - debug!( - "inserted {} in {} map, len={}", - env_hash, - mailbox_hash, - map.len() - ); - } - let mut reader = io::BufReader::new(fs::File::open(&path)?); - buf.clear(); - reader.read_to_end(buf)?; - let mut env = Envelope::from_bytes(buf.as_slice(), Some(path.flags()))?; - env.set_hash(env_hash); - debug!( - "add_path_to_index gen {}\t{}", - env_hash, - file_name.display() - ); - if let Ok(cached) = cache_dir.place_cache_file(file_name) { - debug!("putting in cache"); - /* place result in cache directory */ - let f = fs::File::create(cached)?; - let metadata = f.metadata()?; - let mut permissions = metadata.permissions(); - - permissions.set_mode(0o600); // Read/write for owner only. - f.set_permissions(permissions)?; - let writer = io::BufWriter::new(f); - bincode::Options::serialize_into(bincode::config::DefaultOptions::new(), writer, &env)?; - } - Ok(env) -} diff --git a/melib/src/backends/maildir/watch.rs b/melib/src/backends/maildir/watch.rs new file mode 100644 index 000000000..ca26078b6 --- /dev/null +++ b/melib/src/backends/maildir/watch.rs @@ -0,0 +1,608 @@ +/* + * meli - mailbox module. + * + * Copyright 2017 - 2021 Manos Pitsidianakis + * + * This file is part of meli. + * + * meli is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. 
+ * + * meli is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with meli. If not, see . + */ + +use super::*; +use crate::backends::{RefreshEventKind::*, *}; +use std::collections::BTreeSet; +use std::ffi::OsStr; +use std::io; +use std::os::unix::fs::PermissionsExt; +extern crate notify; +use notify::{watcher, DebouncedEvent, RecursiveMode, Watcher}; +use std::path::{Component, Path, PathBuf}; +use std::sync::mpsc::channel; + +#[derive(Debug)] +pub struct MaildirWatcher { + pub account_hash: AccountHash, + pub cache_dir: xdg::BaseDirectories, + pub event_consumer: BackendEventConsumer, + pub hash_indexes: HashIndexes, + pub mailbox_hashes: BTreeSet, + pub mailbox_index: Arc>>, + pub mailboxes: HashMap, + pub polling_period: std::time::Duration, + pub root_path: PathBuf, +} + +impl BackendWatcher for MaildirWatcher { + fn is_blocking(&self) -> bool { + true + } + + fn register_mailbox( + &mut self, + mailbox_hash: MailboxHash, + _urgency: MailboxWatchUrgency, + ) -> Result<()> { + self.mailbox_hashes.insert(mailbox_hash); + Ok(()) + } + + fn set_polling_period(&mut self, period: Option) -> Result<()> { + if let Some(period) = period { + self.polling_period = period; + } + Ok(()) + } + + fn spawn(self: Box) -> ResultFuture<()> { + let MaildirWatcher { + account_hash, + cache_dir, + event_consumer: sender, + hash_indexes, + mailbox_hashes: _, + mailbox_index, + mailboxes, + polling_period, + root_path, + } = *self; + Ok(Box::pin(async move { + let (tx, rx) = channel(); + let mut watcher = watcher(tx, polling_period).unwrap(); + watcher.watch(&root_path, RecursiveMode::Recursive).unwrap(); + debug!("watching {:?}", root_path); + let root_mailbox_hash: MailboxHash = mailboxes + .values() + .find(|m| m.parent.is_none()) + .map(|m| m.hash()) + .unwrap(); + let mailbox_counts = mailboxes + .iter() + .map(|(&k, v)| (k, (v.unseen.clone(), v.total.clone()))) + .collect::>, Arc>)>>(); + let mut buf = Vec::with_capacity(4096); + loop { + match rx.recv() { + /* + * Event types: + * + * pub enum RefreshEventKind { + * Update(EnvelopeHash, Envelope), // Old hash, new envelope + * Create(Envelope), + * Remove(EnvelopeHash), + * Rescan, + * } + */ + Ok(event) => match event { + /* Create */ + DebouncedEvent::Create(mut pathbuf) => { + debug!("DebouncedEvent::Create(path = {:?}", pathbuf); + if path_is_new!(pathbuf) { + debug!("path_is_new"); + /* This creates a Rename event that we will receive later */ + pathbuf = match move_to_cur(pathbuf) { + Ok(p) => p, + Err(e) => { + debug!("error: {}", e.to_string()); + continue; + } + }; + } + let mailbox_hash = get_path_hash!(pathbuf); + let file_name = pathbuf + .as_path() + .strip_prefix(&root_path) + .unwrap() + .to_path_buf(); + if let Ok(env) = add_path_to_index( + &hash_indexes, + mailbox_hash, + pathbuf.as_path(), + &cache_dir, + file_name, + &mut buf, + ) { + mailbox_index + .lock() + .unwrap() + .insert(env.hash(), mailbox_hash); + debug!( + "Create event {} {} {}", + env.hash(), + env.subject(), + pathbuf.display() + ); + if !env.is_seen() { + *mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1; + } + *mailbox_counts[&mailbox_hash].1.lock().unwrap() += 1; + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: Create(Box::new(env)), + }), + 
); + } + } + /* Update */ + DebouncedEvent::NoticeWrite(pathbuf) | DebouncedEvent::Write(pathbuf) => { + debug!("DebouncedEvent::Write(path = {:?}", &pathbuf); + let mailbox_hash = get_path_hash!(pathbuf); + let mut hash_indexes_lock = hash_indexes.lock().unwrap(); + let index_lock = + &mut hash_indexes_lock.entry(mailbox_hash).or_default(); + let file_name = pathbuf + .as_path() + .strip_prefix(&root_path) + .unwrap() + .to_path_buf(); + /* Linear search in hash_index to find old hash */ + let old_hash: EnvelopeHash = { + if let Some((k, v)) = + index_lock.iter_mut().find(|(_, v)| *v.buf == pathbuf) + { + *v = pathbuf.clone().into(); + *k + } else { + drop(hash_indexes_lock); + /* Did we just miss a Create event? In any case, create + * envelope. */ + if let Ok(env) = add_path_to_index( + &hash_indexes, + mailbox_hash, + pathbuf.as_path(), + &cache_dir, + file_name, + &mut buf, + ) { + mailbox_index + .lock() + .unwrap() + .insert(env.hash(), mailbox_hash); + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: Create(Box::new(env)), + }), + ); + } + continue; + } + }; + let new_hash: EnvelopeHash = get_file_hash(pathbuf.as_path()); + let mut reader = io::BufReader::new(fs::File::open(&pathbuf)?); + buf.clear(); + reader.read_to_end(&mut buf)?; + if index_lock.get_mut(&new_hash).is_none() { + debug!("write notice"); + if let Ok(mut env) = + Envelope::from_bytes(buf.as_slice(), Some(pathbuf.flags())) + { + env.set_hash(new_hash); + debug!("{}\t{:?}", new_hash, &pathbuf); + debug!( + "hash {}, path: {:?} couldn't be parsed", + new_hash, &pathbuf + ); + index_lock.insert(new_hash, pathbuf.into()); + + /* Send Write notice */ + + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: Update(old_hash, Box::new(env)), + }), + ); + } + } + } + /* Remove */ + DebouncedEvent::NoticeRemove(pathbuf) | DebouncedEvent::Remove(pathbuf) => { + debug!("DebouncedEvent::Remove(path = {:?}", pathbuf); + let mailbox_hash = get_path_hash!(pathbuf); + let mut hash_indexes_lock = hash_indexes.lock().unwrap(); + let index_lock = hash_indexes_lock.entry(mailbox_hash).or_default(); + let hash: EnvelopeHash = if let Some((k, _)) = + index_lock.iter().find(|(_, v)| *v.buf == pathbuf) + { + *k + } else { + debug!("removed but not contained in index"); + continue; + }; + if let Some(ref modif) = &index_lock[&hash].modified { + match modif { + PathMod::Path(path) => debug!( + "envelope {} has modified path set {}", + hash, + path.display() + ), + PathMod::Hash(hash) => debug!( + "envelope {} has modified path set {}", + hash, + &index_lock[&hash].buf.display() + ), + } + index_lock.entry(hash).and_modify(|e| { + e.removed = false; + }); + continue; + } + { + let mut lck = mailbox_counts[&mailbox_hash].1.lock().unwrap(); + *lck = lck.saturating_sub(1); + } + if !pathbuf.flags().contains(Flag::SEEN) { + let mut lck = mailbox_counts[&mailbox_hash].0.lock().unwrap(); + *lck = lck.saturating_sub(1); + } + + index_lock.entry(hash).and_modify(|e| { + e.removed = true; + }); + + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: Remove(hash), + }), + ); + } + /* Envelope hasn't changed */ + DebouncedEvent::Rename(src, dest) => { + debug!("DebouncedEvent::Rename(src = {:?}, dest = {:?})", src, dest); + let mailbox_hash = get_path_hash!(src); + let dest_mailbox = { + let dest_mailbox = get_path_hash!(dest); + if dest_mailbox == mailbox_hash { + None + } else { + 
Some(dest_mailbox) + } + }; + let old_hash: EnvelopeHash = get_file_hash(src.as_path()); + let new_hash: EnvelopeHash = get_file_hash(dest.as_path()); + + let mut hash_indexes_lock = hash_indexes.lock().unwrap(); + let index_lock = hash_indexes_lock.entry(mailbox_hash).or_default(); + let old_flags = src.flags(); + let new_flags = dest.flags(); + let was_seen: bool = old_flags.contains(Flag::SEEN); + let is_seen: bool = new_flags.contains(Flag::SEEN); + + if index_lock.contains_key(&old_hash) && !index_lock[&old_hash].removed + { + debug!("contains_old_key"); + if let Some(dest_mailbox) = dest_mailbox { + index_lock.entry(old_hash).and_modify(|e| { + e.removed = true; + }); + + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: Remove(old_hash), + }), + ); + let file_name = dest + .as_path() + .strip_prefix(&root_path) + .unwrap() + .to_path_buf(); + drop(hash_indexes_lock); + if let Ok(env) = add_path_to_index( + &hash_indexes, + dest_mailbox, + dest.as_path(), + &cache_dir, + file_name, + &mut buf, + ) { + mailbox_index + .lock() + .unwrap() + .insert(env.hash(), dest_mailbox); + debug!( + "Create event {} {} {}", + env.hash(), + env.subject(), + dest.display() + ); + if !env.is_seen() { + *mailbox_counts[&dest_mailbox].0.lock().unwrap() += 1; + } + *mailbox_counts[&dest_mailbox].1.lock().unwrap() += 1; + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash: dest_mailbox, + kind: Create(Box::new(env)), + }), + ); + } + } else { + index_lock.entry(old_hash).and_modify(|e| { + debug!(&e.modified); + e.modified = Some(PathMod::Hash(new_hash)); + }); + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: Rename(old_hash, new_hash), + }), + ); + if !was_seen && is_seen { + let mut lck = + mailbox_counts[&mailbox_hash].0.lock().unwrap(); + *lck = lck.saturating_sub(1); + } else if was_seen && !is_seen { + *mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1; + } + if old_flags != new_flags { + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: NewFlags(new_hash, (new_flags, vec![])), + }), + ); + } + mailbox_index.lock().unwrap().insert(new_hash, mailbox_hash); + index_lock.insert(new_hash, dest.into()); + } + continue; + } else if !index_lock.contains_key(&new_hash) + && index_lock + .get(&old_hash) + .map(|e| e.removed) + .unwrap_or(false) + { + if index_lock + .get(&old_hash) + .map(|e| e.removed) + .unwrap_or(false) + { + index_lock.entry(old_hash).and_modify(|e| { + e.modified = Some(PathMod::Hash(new_hash)); + e.removed = false; + }); + debug!("contains_old_key, key was marked as removed (by external source)"); + } else { + debug!("not contains_new_key"); + } + let file_name = dest + .as_path() + .strip_prefix(&root_path) + .unwrap() + .to_path_buf(); + debug!("filename = {:?}", file_name); + drop(hash_indexes_lock); + if let Ok(env) = add_path_to_index( + &hash_indexes, + dest_mailbox.unwrap_or(mailbox_hash), + dest.as_path(), + &cache_dir, + file_name, + &mut buf, + ) { + mailbox_index + .lock() + .unwrap() + .insert(env.hash(), dest_mailbox.unwrap_or(mailbox_hash)); + debug!( + "Create event {} {} {}", + env.hash(), + env.subject(), + dest.display() + ); + if !env.is_seen() { + *mailbox_counts[&dest_mailbox.unwrap_or(mailbox_hash)] + .0 + .lock() + .unwrap() += 1; + } + *mailbox_counts[&dest_mailbox.unwrap_or(mailbox_hash)] + .1 + .lock() + .unwrap() += 1; + (sender)( + account_hash, 
+ BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash: dest_mailbox.unwrap_or(mailbox_hash), + kind: Create(Box::new(env)), + }), + ); + continue; + } else { + debug!("not valid email"); + } + } else if let Some(dest_mailbox) = dest_mailbox { + drop(hash_indexes_lock); + let file_name = dest + .as_path() + .strip_prefix(&root_path) + .unwrap() + .to_path_buf(); + if let Ok(env) = add_path_to_index( + &hash_indexes, + dest_mailbox, + dest.as_path(), + &cache_dir, + file_name, + &mut buf, + ) { + mailbox_index + .lock() + .unwrap() + .insert(env.hash(), dest_mailbox); + debug!( + "Create event {} {} {}", + env.hash(), + env.subject(), + dest.display() + ); + if !env.is_seen() { + *mailbox_counts[&dest_mailbox].0.lock().unwrap() += 1; + } + *mailbox_counts[&dest_mailbox].1.lock().unwrap() += 1; + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash: dest_mailbox, + kind: Create(Box::new(env)), + }), + ); + } + } else { + if was_seen && !is_seen { + *mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1; + } + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: Rename(old_hash, new_hash), + }), + ); + debug!("contains_new_key"); + if old_flags != new_flags { + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: NewFlags(new_hash, (new_flags, vec![])), + }), + ); + } + } + + /* Maybe a re-read should be triggered here just to be safe. + (sender)(account_hash, BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash: get_path_hash!(dest), + kind: Rescan, + })); + */ + } + /* Trigger rescan of mailbox */ + DebouncedEvent::Rescan => { + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash: root_mailbox_hash, + kind: Rescan, + }), + ); + } + _ => {} + }, + Err(e) => debug!("watch error: {:?}", e), + } + } + })) + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} + +fn add_path_to_index( + hash_index: &HashIndexes, + mailbox_hash: MailboxHash, + path: &Path, + cache_dir: &xdg::BaseDirectories, + file_name: PathBuf, + buf: &mut Vec, +) -> Result { + debug!("add_path_to_index path {:?} filename{:?}", path, file_name); + let env_hash = get_file_hash(path); + { + let mut map = hash_index.lock().unwrap(); + let map = map.entry(mailbox_hash).or_default(); + map.insert(env_hash, path.to_path_buf().into()); + debug!( + "inserted {} in {} map, len={}", + env_hash, + mailbox_hash, + map.len() + ); + } + let mut reader = io::BufReader::new(fs::File::open(&path)?); + buf.clear(); + reader.read_to_end(buf)?; + let mut env = Envelope::from_bytes(buf.as_slice(), Some(path.flags()))?; + env.set_hash(env_hash); + debug!( + "add_path_to_index gen {}\t{}", + env_hash, + file_name.display() + ); + if let Ok(cached) = cache_dir.place_cache_file(file_name) { + debug!("putting in cache"); + /* place result in cache directory */ + let f = fs::File::create(cached)?; + let metadata = f.metadata()?; + let mut permissions = metadata.permissions(); + + permissions.set_mode(0o600); // Read/write for owner only. 
+ f.set_permissions(permissions)?; + let writer = io::BufWriter::new(f); + bincode::Options::serialize_into(bincode::config::DefaultOptions::new(), writer, &env)?; + } + Ok(env) +} diff --git a/melib/src/backends/mbox.rs b/melib/src/backends/mbox.rs index fbc3cbb4c..981a65340 100644 --- a/melib/src/backends/mbox.rs +++ b/melib/src/backends/mbox.rs @@ -147,6 +147,8 @@ use std::sync::{Arc, Mutex, RwLock}; pub mod write; +pub mod watch; + pub type Offset = usize; pub type Length = usize; @@ -184,7 +186,7 @@ fn get_rw_lock_blocking(f: &File, path: &Path) -> Result<()> { } #[derive(Debug)] -struct MboxMailbox { +pub struct MboxMailbox { hash: MailboxHash, name: String, path: PathBuf, @@ -950,157 +952,24 @@ impl MailBackend for MboxType { Err(MeliError::new("Unimplemented.")) } - fn watch(&self) -> ResultFuture<()> { - let sender = self.event_consumer.clone(); - let (tx, rx) = channel(); - let mut watcher = watcher(tx, std::time::Duration::from_secs(10)) - .map_err(|e| e.to_string()) - .map_err(MeliError::new)?; - for f in self.mailboxes.lock().unwrap().values() { - watcher - .watch(&f.fs_path, RecursiveMode::Recursive) - .map_err(|e| e.to_string()) - .map_err(MeliError::new)?; - debug!("watching {:?}", f.fs_path.as_path()); - } + fn watcher(&self) -> Result> { let account_hash = { let mut hasher = DefaultHasher::new(); hasher.write(self.account_name.as_bytes()); hasher.finish() }; - let mailboxes = self.mailboxes.clone(); + let event_consumer = self.event_consumer.clone(); let mailbox_index = self.mailbox_index.clone(); - let prefer_mbox_type = self.prefer_mbox_type; - Ok(Box::pin(async move { - loop { - match rx.recv() { - /* - * Event types: - * - * pub enum RefreshEventKind { - * Update(EnvelopeHash, Envelope), // Old hash, new envelope - * Create(Envelope), - * Remove(EnvelopeHash), - * Rescan, - * } - */ - Ok(event) => match event { - /* Update */ - DebouncedEvent::NoticeWrite(pathbuf) | DebouncedEvent::Write(pathbuf) => { - let mailbox_hash = get_path_hash!(&pathbuf); - let file = match std::fs::OpenOptions::new() - .read(true) - .write(true) - .open(&pathbuf) - { - Ok(f) => f, - Err(_) => { - continue; - } - }; - get_rw_lock_blocking(&file, &pathbuf)?; - let mut mailbox_lock = mailboxes.lock().unwrap(); - let mut buf_reader = BufReader::new(file); - let mut contents = Vec::new(); - if let Err(e) = buf_reader.read_to_end(&mut contents) { - debug!(e); - continue; - }; - if contents.starts_with(mailbox_lock[&mailbox_hash].content.as_slice()) - { - if let Ok((_, envelopes)) = mbox_parse( - mailbox_lock[&mailbox_hash].index.clone(), - &contents, - mailbox_lock[&mailbox_hash].content.len(), - prefer_mbox_type, - ) { - let mut mailbox_index_lck = mailbox_index.lock().unwrap(); - for env in envelopes { - mailbox_index_lck.insert(env.hash(), mailbox_hash); - (sender)( - account_hash, - BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash, - kind: RefreshEventKind::Create(Box::new(env)), - }), - ); - } - } - } else { - (sender)( - account_hash, - BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash, - kind: RefreshEventKind::Rescan, - }), - ); - } - mailbox_lock - .entry(mailbox_hash) - .and_modify(|f| f.content = contents); - } - /* Remove */ - DebouncedEvent::NoticeRemove(pathbuf) | DebouncedEvent::Remove(pathbuf) => { - if mailboxes - .lock() - .unwrap() - .values() - .any(|f| f.fs_path == pathbuf) - { - let mailbox_hash = get_path_hash!(&pathbuf); - (sender)( - account_hash, - BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash, - kind: 
RefreshEventKind::Failure(MeliError::new(format!( - "mbox mailbox {} was removed.", - pathbuf.display() - ))), - }), - ); - return Ok(()); - } - } - DebouncedEvent::Rename(src, dest) => { - if mailboxes.lock().unwrap().values().any(|f| f.fs_path == src) { - let mailbox_hash = get_path_hash!(&src); - (sender)( - account_hash, - BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash, - kind: RefreshEventKind::Failure(MeliError::new(format!( - "mbox mailbox {} was renamed to {}.", - src.display(), - dest.display() - ))), - }), - ); - return Ok(()); - } - } - /* Trigger rescan of mailboxes */ - DebouncedEvent::Rescan => { - for &mailbox_hash in mailboxes.lock().unwrap().keys() { - (sender)( - account_hash, - BackendEvent::Refresh(RefreshEvent { - account_hash, - mailbox_hash, - kind: RefreshEventKind::Rescan, - }), - ); - } - return Ok(()); - } - _ => {} - }, - Err(e) => debug!("watch error: {:?}", e), - } - } + let mailboxes = self.mailboxes.clone(); + let prefer_mbox_type = self.prefer_mbox_type.clone(); + Ok(Box::new(watch::MboxWatcher { + account_hash, + event_consumer, + mailbox_hashes: BTreeSet::default(), + mailbox_index, + mailboxes, + polling_period: std::time::Duration::from_secs(60), + prefer_mbox_type, })) } diff --git a/melib/src/backends/mbox/watch.rs b/melib/src/backends/mbox/watch.rs new file mode 100644 index 000000000..81036feb2 --- /dev/null +++ b/melib/src/backends/mbox/watch.rs @@ -0,0 +1,223 @@ +/* + * meli - mailbox module. + * + * Copyright 2017 - 2021 Manos Pitsidianakis + * + * This file is part of meli. + * + * meli is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * meli is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with meli. If not, see . 
+ */ + +use super::*; +use std::collections::BTreeSet; + +#[derive(Debug)] +pub struct MboxWatcher { + pub account_hash: AccountHash, + pub event_consumer: BackendEventConsumer, + pub mailbox_hashes: BTreeSet, + pub mailbox_index: Arc>>, + pub mailboxes: Arc>>, + pub polling_period: std::time::Duration, + pub prefer_mbox_type: Option, +} + +impl BackendWatcher for MboxWatcher { + fn is_blocking(&self) -> bool { + true + } + + fn register_mailbox( + &mut self, + mailbox_hash: MailboxHash, + _urgency: MailboxWatchUrgency, + ) -> Result<()> { + self.mailbox_hashes.insert(mailbox_hash); + Ok(()) + } + + fn set_polling_period(&mut self, period: Option) -> Result<()> { + if let Some(period) = period { + self.polling_period = period; + } + Ok(()) + } + + fn spawn(self: Box) -> ResultFuture<()> { + let MboxWatcher { + account_hash, + event_consumer: sender, + mailbox_hashes, + mailbox_index, + mailboxes, + polling_period, + prefer_mbox_type, + } = *self; + let (tx, rx) = channel(); + let mut watcher = watcher(tx, polling_period) + .map_err(|e| e.to_string()) + .map_err(MeliError::new)?; + for (_, f) in mailboxes + .lock() + .unwrap() + .iter() + .filter(|(k, _)| mailbox_hashes.contains(k)) + { + watcher + .watch(&f.fs_path, RecursiveMode::Recursive) + .map_err(|e| e.to_string()) + .map_err(MeliError::new)?; + debug!("watching {:?}", f.fs_path.as_path()); + } + Ok(Box::pin(async move { + loop { + match rx.recv() { + /* + * Event types: + * + * pub enum RefreshEventKind { + * Update(EnvelopeHash, Envelope), // Old hash, new envelope + * Create(Envelope), + * Remove(EnvelopeHash), + * Rescan, + * } + */ + Ok(event) => match event { + /* Update */ + DebouncedEvent::NoticeWrite(pathbuf) | DebouncedEvent::Write(pathbuf) => { + let mailbox_hash = get_path_hash!(&pathbuf); + let file = match std::fs::OpenOptions::new() + .read(true) + .write(true) + .open(&pathbuf) + { + Ok(f) => f, + Err(_) => { + continue; + } + }; + get_rw_lock_blocking(&file, &pathbuf)?; + let mut mailbox_lock = mailboxes.lock().unwrap(); + let mut buf_reader = BufReader::new(file); + let mut contents = Vec::new(); + if let Err(e) = buf_reader.read_to_end(&mut contents) { + debug!(e); + continue; + }; + if contents.starts_with(mailbox_lock[&mailbox_hash].content.as_slice()) + { + if let Ok((_, envelopes)) = mbox_parse( + mailbox_lock[&mailbox_hash].index.clone(), + &contents, + mailbox_lock[&mailbox_hash].content.len(), + prefer_mbox_type, + ) { + let mut mailbox_index_lck = mailbox_index.lock().unwrap(); + for env in envelopes { + mailbox_index_lck.insert(env.hash(), mailbox_hash); + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: RefreshEventKind::Create(Box::new(env)), + }), + ); + } + } + } else { + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: RefreshEventKind::Rescan, + }), + ); + } + mailbox_lock + .entry(mailbox_hash) + .and_modify(|f| f.content = contents); + } + /* Remove */ + DebouncedEvent::NoticeRemove(pathbuf) | DebouncedEvent::Remove(pathbuf) => { + if mailboxes + .lock() + .unwrap() + .values() + .any(|f| f.fs_path == pathbuf) + { + let mailbox_hash = get_path_hash!(&pathbuf); + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: RefreshEventKind::Failure(MeliError::new(format!( + "mbox mailbox {} was removed.", + pathbuf.display() + ))), + }), + ); + return Ok(()); + } + } + DebouncedEvent::Rename(src, dest) => { + if 
mailboxes.lock().unwrap().values().any(|f| f.fs_path == src) { + let mailbox_hash = get_path_hash!(&src); + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: RefreshEventKind::Failure(MeliError::new(format!( + "mbox mailbox {} was renamed to {}.", + src.display(), + dest.display() + ))), + }), + ); + return Ok(()); + } + } + /* Trigger rescan of mailboxes */ + DebouncedEvent::Rescan => { + for &mailbox_hash in mailboxes.lock().unwrap().keys() { + (sender)( + account_hash, + BackendEvent::Refresh(RefreshEvent { + account_hash, + mailbox_hash, + kind: RefreshEventKind::Rescan, + }), + ); + } + return Ok(()); + } + _ => {} + }, + Err(e) => debug!("watch error: {:?}", e), + } + } + })) + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} diff --git a/melib/src/backends/nntp.rs b/melib/src/backends/nntp.rs index c3176e262..2993a635c 100644 --- a/melib/src/backends/nntp.rs +++ b/melib/src/backends/nntp.rs @@ -239,7 +239,7 @@ impl MailBackend for NntpType { })) } - fn watch(&self) -> ResultFuture<()> { + fn watcher(&self) -> Result> { Err(MeliError::new("Unimplemented.")) } diff --git a/melib/src/backends/notmuch.rs b/melib/src/backends/notmuch.rs index d2de93673..19ca08c07 100644 --- a/melib/src/backends/notmuch.rs +++ b/melib/src/backends/notmuch.rs @@ -67,6 +67,8 @@ pub use tags::*; mod thread; pub use thread::*; +mod watch; + #[derive(Debug)] pub struct DbConnection { pub lib: Arc, @@ -232,7 +234,7 @@ unsafe impl Send for NotmuchDb {} unsafe impl Sync for NotmuchDb {} #[derive(Debug, Clone, Default)] -struct NotmuchMailbox { +pub struct NotmuchMailbox { hash: MailboxHash, children: Vec, parent: Option, @@ -581,50 +583,29 @@ impl MailBackend for NotmuchDb { })) } - fn watch(&self) -> ResultFuture<()> { - extern crate notify; - use notify::{watcher, RecursiveMode, Watcher}; - + fn watcher(&self) -> Result> { let account_hash = self.account_hash; let collection = self.collection.clone(); + let event_consumer = self.event_consumer.clone(); + let index = self.index.clone(); let lib = self.lib.clone(); + let mailbox_index = self.mailbox_index.clone(); + let mailboxes = self.mailboxes.clone(); let path = self.path.clone(); let revision_uuid = self.revision_uuid.clone(); - let mailboxes = self.mailboxes.clone(); - let index = self.index.clone(); - let mailbox_index = self.mailbox_index.clone(); - let event_consumer = self.event_consumer.clone(); - let (tx, rx) = std::sync::mpsc::channel(); - let mut watcher = watcher(tx, std::time::Duration::from_secs(2)).unwrap(); - watcher.watch(&self.path, RecursiveMode::Recursive).unwrap(); - Ok(Box::pin(async move { - let _watcher = watcher; - let rx = rx; - loop { - let _ = rx.recv().map_err(|err| err.to_string())?; - { - let mut database = NotmuchDb::new_connection( - path.as_path(), - revision_uuid.clone(), - lib.clone(), - false, - )?; - let new_revision_uuid = database.get_revision_uuid(); - if new_revision_uuid > *database.revision_uuid.read().unwrap() { - database.refresh( - mailboxes.clone(), - index.clone(), - mailbox_index.clone(), - collection.tag_index.clone(), - account_hash.clone(), - event_consumer.clone(), - new_revision_uuid, - )?; - *revision_uuid.write().unwrap() = new_revision_uuid; - } - } - } + Ok(Box::new(watch::NotmuchWatcher { + account_hash, + collection, + event_consumer, + index, + lib, + mailbox_hashes: BTreeSet::default(), + mailbox_index, + mailboxes, + path, + polling_period: std::time::Duration::from_secs(3), + 
revision_uuid, })) } diff --git a/melib/src/backends/notmuch/watch.rs b/melib/src/backends/notmuch/watch.rs new file mode 100644 index 000000000..caedc5f55 --- /dev/null +++ b/melib/src/backends/notmuch/watch.rs @@ -0,0 +1,116 @@ +/* + * meli - notmuch backend + * + * Copyright 2019 - 2021 Manos Pitsidianakis + * + * This file is part of meli. + * + * meli is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * meli is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with meli. If not, see . + */ + +use super::*; +use std::collections::BTreeSet; + +#[derive(Debug)] +pub struct NotmuchWatcher { + pub account_hash: AccountHash, + pub collection: Collection, + pub event_consumer: BackendEventConsumer, + pub index: Arc>>, + pub lib: Arc, + pub mailbox_hashes: BTreeSet, + pub mailbox_index: Arc>>>, + pub mailboxes: Arc>>, + pub path: PathBuf, + pub polling_period: std::time::Duration, + pub revision_uuid: Arc>, +} + +impl BackendWatcher for NotmuchWatcher { + fn is_blocking(&self) -> bool { + true + } + + fn register_mailbox( + &mut self, + mailbox_hash: MailboxHash, + _urgency: MailboxWatchUrgency, + ) -> Result<()> { + self.mailbox_hashes.insert(mailbox_hash); + Ok(()) + } + + fn set_polling_period(&mut self, period: Option) -> Result<()> { + if let Some(period) = period { + self.polling_period = period; + } + Ok(()) + } + + fn spawn(self: Box) -> ResultFuture<()> { + Ok(Box::pin(async move { + extern crate notify; + use notify::{watcher, RecursiveMode, Watcher}; + let NotmuchWatcher { + account_hash, + collection, + event_consumer, + index, + lib, + mailbox_hashes: _, + mailbox_index, + mailboxes, + path, + polling_period, + revision_uuid, + } = *self; + + let (tx, rx) = std::sync::mpsc::channel(); + let mut watcher = watcher(tx, polling_period).unwrap(); + watcher.watch(&path, RecursiveMode::Recursive).unwrap(); + loop { + let _ = rx.recv().map_err(|err| err.to_string())?; + { + let mut database = NotmuchDb::new_connection( + path.as_path(), + revision_uuid.clone(), + lib.clone(), + false, + )?; + let new_revision_uuid = database.get_revision_uuid(); + if new_revision_uuid > *database.revision_uuid.read().unwrap() { + database.refresh( + mailboxes.clone(), + index.clone(), + mailbox_index.clone(), + collection.tag_index.clone(), + account_hash.clone(), + event_consumer.clone(), + new_revision_uuid, + )?; + *revision_uuid.write().unwrap() = new_revision_uuid; + } + } + } + })) + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} diff --git a/src/conf/accounts.rs b/src/conf/accounts.rs index be2a75ac0..5299b42ad 100644 --- a/src/conf/accounts.rs +++ b/src/conf/accounts.rs @@ -1068,9 +1068,23 @@ impl Account { } if !self.active_jobs.values().any(|j| j.is_watch()) { - match self.backend.read().unwrap().watch() { - Ok(fut) => { - let handle = if self.backend_capabilities.is_async { + match self + .backend + .read() + .unwrap() + .watcher() + .and_then(|mut watcher| { + for (mailbox_hash, _) in self + .mailbox_entries + .iter() + .filter(|(_, m)| m.conf.mailbox_conf.subscribe.is_true()) + { + 
watcher.register_mailbox(*mailbox_hash, MailboxWatchUrgency::High)?; + } + Ok((watcher.is_blocking(), watcher.spawn()?)) + }) { + Ok((is_blocking, fut)) => { + let handle = if !is_blocking { + self.job_executor.spawn_specialized(fut) + } else { + self.job_executor.spawn_blocking(fut)