From f7c9f21575e004ccd3d21cdd4ea34ec034436272 Mon Sep 17 00:00:00 2001 From: Manos Pitsidianakis Date: Tue, 25 Aug 2020 12:49:31 +0300 Subject: [PATCH] melib/imap: add CONDSTORE support Closes #52 --- melib/src/backends/imap.rs | 594 +++++++-------- melib/src/backends/imap/cache.rs | 430 ++++++++--- melib/src/backends/imap/cache/sync.rs | 735 ++++++++++++++++++ melib/src/backends/imap/connection.rs | 102 ++- melib/src/backends/imap/operations.rs | 5 +- melib/src/backends/imap/protocol_parser.rs | 461 +++++------- melib/src/backends/imap/untagged.rs | 281 +++---- melib/src/backends/imap/watch.rs | 829 +++++---------------- melib/src/sqlite3.rs | 20 +- 9 files changed, 1992 insertions(+), 1465 deletions(-) create mode 100644 melib/src/backends/imap/cache/sync.rs diff --git a/melib/src/backends/imap.rs b/melib/src/backends/imap.rs index e5bbb1019..154d37e5f 100644 --- a/melib/src/backends/imap.rs +++ b/melib/src/backends/imap.rs @@ -33,6 +33,7 @@ pub use connection::*; mod watch; pub use watch::*; mod cache; +use cache::ModSequence; pub mod managesieve; mod untagged; @@ -60,6 +61,7 @@ pub type UID = usize; pub static SUPPORTED_CAPABILITIES: &[&str] = &[ #[cfg(feature = "deflate_compression")] "COMPRESS=DEFLATE", + "CONDSTORE", "ENABLE", "IDLE", "IMAP4REV1", @@ -140,10 +142,9 @@ macro_rules! get_conf_val { #[derive(Debug)] pub struct UIDStore { account_hash: AccountHash, - cache_headers: bool, account_name: Arc, + keep_offline_cache: bool, capabilities: Arc>, - uidvalidity: Arc>>, hash_index: Arc>>, uid_index: Arc>>, msn_index: Arc>>>, @@ -151,6 +152,13 @@ pub struct UIDStore { byte_cache: Arc>>, tag_index: Arc>>, + /* Offline caching */ + uidvalidity: Arc>>, + envelopes: Arc>>, + max_uids: Arc>>, + modseq: Arc>>, + reverse_modseq: Arc>>>, + highestmodseqs: Arc>>>, mailboxes: Arc>>, is_online: Arc)>>, event_consumer: BackendEventConsumer, @@ -164,10 +172,15 @@ impl UIDStore { ) -> Self { UIDStore { account_hash, - cache_headers: false, account_name, + keep_offline_cache: false, capabilities: Default::default(), uidvalidity: Default::default(), + envelopes: Default::default(), + max_uids: Default::default(), + modseq: Default::default(), + reverse_modseq: Default::default(), + highestmodseqs: Default::default(), hash_index: Default::default(), uid_index: Default::default(), msn_index: Default::default(), @@ -213,6 +226,7 @@ impl MailBackend for ImapType { idle, #[cfg(feature = "deflate_compression")] deflate, + condstore, }, } = self.server_conf.protocol { @@ -242,6 +256,15 @@ impl MailBackend for ImapType { }; } } + "CONDSTORE" => { + if condstore { + *status = MailBackendExtensionStatus::Enabled { comment: None }; + } else { + *status = MailBackendExtensionStatus::Supported { + comment: Some("Disabled by user configuration"), + }; + } + } _ => { if SUPPORTED_CAPABILITIES.contains(&name.as_str()) { *status = MailBackendExtensionStatus::Enabled { comment: None }; @@ -265,24 +288,30 @@ impl MailBackend for ImapType { &mut self, mailbox_hash: MailboxHash, ) -> Result>> + Send + 'static>>> { - let uid_store = self.uid_store.clone(); - let can_create_flags = self.can_create_flags.clone(); - let connection = self.connection.clone(); - let mut max_uid: Option = None; - let mut valid_hash_set: HashSet = HashSet::default(); - let mut our_unseen: BTreeSet = Default::default(); + let mut state = FetchState { + stage: if self.uid_store.keep_offline_cache { + FetchStage::InitialCache + } else { + FetchStage::InitialFresh + }, + connection: self.connection.clone(), + mailbox_hash, + can_create_flags: self.can_create_flags.clone(), + uid_store: self.uid_store.clone(), + }; + Ok(Box::pin(async_stream::try_stream! { { - let f = &uid_store.mailboxes.lock().await[&mailbox_hash]; + let f = &state.uid_store.mailboxes.lock().await[&mailbox_hash]; f.exists.lock().unwrap().clear(); f.unseen.lock().unwrap().clear(); }; - let (cached_hash_set, cached_payload) = fetch_cached_envs(mailbox_hash, &mut our_unseen, &uid_store)?; - yield cached_payload; loop { - let res = fetch_hlpr(&connection, mailbox_hash, &cached_hash_set, &can_create_flags, &mut our_unseen, &mut valid_hash_set, &uid_store, &mut max_uid).await?; + let res = fetch_hlpr(&mut state).await.map_err(|err| { + debug!("fetch_hlpr err {:?}", &err); + err})?; yield res; - if max_uid == Some(1) || max_uid == Some(0) { + if state.stage == FetchStage::Finished { return; } @@ -343,12 +372,9 @@ impl MailBackend for ImapType { mailboxes.retain(|_, f| (self.is_subscribed)(f.path())); */ let keys = mailboxes.keys().cloned().collect::>(); - let mut uid_lock = uid_store.uidvalidity.lock().unwrap(); for f in mailboxes.values_mut() { - uid_lock.entry(f.hash()).or_default(); f.children.retain(|c| keys.contains(c)); } - drop(uid_lock); Ok(mailboxes .iter() .filter(|(_, f)| f.is_subscribed) @@ -1133,6 +1159,7 @@ impl ImapType { let use_starttls = use_tls && get_conf_val!(s["use_starttls"], !(server_port == 993))?; let danger_accept_invalid_certs: bool = get_conf_val!(s["danger_accept_invalid_certs"], false)?; + let keep_offline_cache = get_conf_val!(s["offline_cache"], true)?; let server_conf = ImapServerConf { server_hostname: server_hostname.to_string(), server_username: server_username.to_string(), @@ -1144,6 +1171,7 @@ impl ImapType { protocol: ImapProtocol::IMAP { extension_use: ImapExtensionUse { idle: get_conf_val!(s["use_idle"], true)?, + condstore: get_conf_val!(s["use_condstore"], true)?, #[cfg(feature = "deflate_compression")] deflate: get_conf_val!(s["use_deflate"], true)?, }, @@ -1156,7 +1184,7 @@ impl ImapType { }; let account_name = Arc::new(s.name().to_string()); let uid_store: Arc = Arc::new(UIDStore { - cache_headers: get_conf_val!(s["X_header_caching"], false)?, + keep_offline_cache, ..UIDStore::new(account_hash, account_name, event_consumer) }); let connection = ImapConnection::new_connection(&server_conf, uid_store.clone()); @@ -1330,8 +1358,9 @@ impl ImapType { ))); } get_conf_val!(s["danger_accept_invalid_certs"], false)?; - get_conf_val!(s["X_header_caching"], false)?; + get_conf_val!(s["offline_cache"], true)?; get_conf_val!(s["use_idle"], true)?; + get_conf_val!(s["use_condstore"], true)?; #[cfg(feature = "deflate_compression")] get_conf_val!(s["use_deflate"], true)?; #[cfg(not(feature = "deflate_compression"))] @@ -1355,311 +1384,252 @@ impl ImapType { } } -fn fetch_cached_envs( - mailbox_hash: MailboxHash, - our_unseen: &mut BTreeSet, - uid_store: &UIDStore, -) -> Result<(HashSet, Vec)> { - if !uid_store.cache_headers { - return Ok((HashSet::default(), vec![])); - } - - let uidvalidities = uid_store.uidvalidity.lock().unwrap(); - - let v = if let Some(v) = uidvalidities.get(&mailbox_hash) { - v - } else { - return Ok((HashSet::default(), vec![])); - }; - let cached_envs: (cache::MaxUID, Vec<(UID, Envelope)>); - cache::save_envelopes(uid_store.account_hash, mailbox_hash, *v, &[]) - .chain_err_summary(|| "Could not save envelopes in cache in get()")?; - cached_envs = cache::fetch_envelopes(uid_store.account_hash, mailbox_hash, *v) - .chain_err_summary(|| "Could not get envelopes in cache in get()")?; - let (_max_uid, envelopes) = debug!(cached_envs); - let ret = envelopes.iter().map(|(_, env)| env.hash()).collect(); - let payload = if !envelopes.is_empty() { - let mut payload = vec![]; - for (uid, env) in envelopes { - if !env.is_seen() { - our_unseen.insert(env.hash()); - } - uid_store - .hash_index - .lock() - .unwrap() - .insert(env.hash(), (uid, mailbox_hash)); - uid_store - .uid_index - .lock() - .unwrap() - .insert((mailbox_hash, uid), env.hash()); - payload.push(env); - } - debug!("sending cached payload for {}", mailbox_hash); - - payload - } else { - vec![] - }; - Ok((ret, payload)) +#[derive(Debug, PartialEq, Copy, Clone)] +enum FetchStage { + InitialFresh, + InitialCache, + ResyncCache, + FreshFetch { max_uid: usize }, + Finished, } -async fn fetch_hlpr( - connection: &Arc>, +#[derive(Debug)] +struct FetchState { + stage: FetchStage, + connection: Arc>, mailbox_hash: MailboxHash, - cached_hash_set: &HashSet, - can_create_flags: &Arc>, - our_unseen: &mut BTreeSet, - valid_hash_set: &mut HashSet, - uid_store: &UIDStore, - max_uid: &mut Option, -) -> Result> { - let (permissions, mailbox_path, mailbox_exists, no_select, unseen) = { - let f = &uid_store.mailboxes.lock().await[&mailbox_hash]; - ( - f.permissions.clone(), - f.imap_path().to_string(), - f.exists.clone(), - f.no_select, - f.unseen.clone(), - ) - }; - if no_select { - *max_uid = Some(0); - return Ok(Vec::new()); - } - let mut conn = connection.lock().await; - debug!("locked for fetch {}", mailbox_path); - let mut response = String::with_capacity(8 * 1024); - let max_uid_left = if let Some(max_uid) = max_uid { - *max_uid - } else { - conn.create_uid_msn_cache(mailbox_hash, 1).await?; - /* first SELECT the mailbox to get READ/WRITE permissions (because EXAMINE only - * returns READ-ONLY for both cases) */ - conn.select_mailbox(mailbox_hash, &mut response, true) - .await - .chain_err_summary(|| format!("Could not select mailbox {}", mailbox_path))?; - let mut examine_response = - protocol_parser::select_response(&response).chain_err_summary(|| { - format!( - "Could not parse select response for mailbox {}", - mailbox_path - ) - })?; - *can_create_flags.lock().unwrap() = examine_response.can_create_flags; - debug!( - "mailbox: {} examine_response: {:?}", - mailbox_path, examine_response - ); - { - let mut uidvalidities = uid_store.uidvalidity.lock().unwrap(); + can_create_flags: Arc>, + uid_store: Arc, +} - let v = uidvalidities - .entry(mailbox_hash) - .or_insert(examine_response.uidvalidity); - if uid_store.cache_headers { - let _ = cache::save_envelopes( - uid_store.account_hash, - mailbox_hash, - examine_response.uidvalidity, - &[], - ); - } - *v = examine_response.uidvalidity; - let mut permissions = permissions.lock().unwrap(); - permissions.create_messages = !examine_response.read_only; - permissions.remove_messages = !examine_response.read_only; - permissions.set_flags = !examine_response.read_only; - permissions.rename_messages = !examine_response.read_only; - permissions.delete_messages = !examine_response.read_only; - mailbox_exists - .lock() - .unwrap() - .set_not_yet_seen(examine_response.exists); - } - if examine_response.exists == 0 { - if uid_store.cache_headers { - for &env_hash in cached_hash_set { - conn.add_refresh_event(RefreshEvent { - account_hash: uid_store.account_hash, - mailbox_hash, - kind: RefreshEventKind::Remove(env_hash), - }); +async fn fetch_hlpr(state: &mut FetchState) -> Result> { + debug!((state.mailbox_hash, &state.stage)); + loop { + match state.stage { + FetchStage::InitialFresh => { + let select_response = state + .connection + .lock() + .await + .init_mailbox(state.mailbox_hash) + .await?; + *state.can_create_flags.lock().unwrap() = select_response.can_create_flags; + if select_response.exists == 0 { + state.stage = FetchStage::Finished; + return Ok(Vec::new()); } - let _ = cache::save_envelopes( - uid_store.account_hash, - mailbox_hash, - examine_response.uidvalidity, - &[], - ); + state.stage = FetchStage::FreshFetch { + max_uid: select_response.uidnext - 1, + }; + continue; } - *max_uid = Some(0); - return Ok(Vec::new()); - } - /* reselecting the same mailbox with EXAMINE prevents expunging it */ - conn.examine_mailbox(mailbox_hash, &mut response, true) - .await?; - if examine_response.uidnext == 0 { - /* UIDNEXT shouldn't be 0, since exists != 0 at this point */ - conn.send_command(format!("STATUS \"{}\" (UIDNEXT)", mailbox_path).as_bytes()) - .await?; - conn.read_response(&mut response, RequiredResponses::STATUS) - .await?; - let (_, status) = protocol_parser::status_response(response.as_bytes())?; - if let Some(uidnext) = status.uidnext { - if uidnext == 0 { - return Err(MeliError::new( - "IMAP server error: zero UIDNEXt with nonzero exists.", - )); + FetchStage::InitialCache => { + if let Some(cached_payload) = cache::fetch_cached_envs(state).await? { + state.stage = FetchStage::ResyncCache; + debug!( + "fetch_hlpr fetch_cached_envs payload {} len for mailbox_hash {}", + cached_payload.len(), + state.mailbox_hash + ); + let (mailbox_exists, unseen) = { + let f = &state.uid_store.mailboxes.lock().await[&state.mailbox_hash]; + (f.exists.clone(), f.unseen.clone()) + }; + unseen.lock().unwrap().insert_existing_set( + cached_payload + .iter() + .filter_map(|env| { + if !env.is_seen() { + Some(env.hash()) + } else { + None + } + }) + .collect(), + ); + mailbox_exists.lock().unwrap().insert_existing_set( + cached_payload.iter().map(|env| env.hash()).collect::<_>(), + ); + return Ok(cached_payload); } - examine_response.uidnext = uidnext; - } else { - return Err(MeliError::new("IMAP server did not reply with UIDNEXT")); + state.stage = FetchStage::InitialFresh; + continue; } - } - *max_uid = Some(examine_response.uidnext - 1); - examine_response.uidnext - 1 - }; - let chunk_size = 600; + FetchStage::ResyncCache => { + let mailbox_hash = state.mailbox_hash; + let mut conn = state.connection.lock().await; + let res = debug!(conn.resync(mailbox_hash).await); + if let Ok(Some(payload)) = res { + state.stage = FetchStage::Finished; + return Ok(payload); + } + state.stage = FetchStage::InitialFresh; + continue; + } + FetchStage::FreshFetch { max_uid } => { + let FetchState { + ref mut stage, + ref connection, + mailbox_hash, + can_create_flags: _, + ref uid_store, + } = state; + let mailbox_hash = *mailbox_hash; + let mut our_unseen: BTreeSet = BTreeSet::default(); + let (mailbox_path, mailbox_exists, no_select, unseen) = { + let f = &uid_store.mailboxes.lock().await[&mailbox_hash]; + ( + f.imap_path().to_string(), + f.exists.clone(), + f.no_select, + f.unseen.clone(), + ) + }; + if no_select { + state.stage = FetchStage::Finished; + return Ok(Vec::new()); + } + let mut conn = connection.lock().await; + debug!("locked for fetch {}", mailbox_path); + let mut response = String::with_capacity(8 * 1024); + let max_uid_left = max_uid; + let chunk_size = 250; - let mut payload = vec![]; - conn.examine_mailbox(mailbox_hash, &mut response, false) - .await?; - if max_uid_left > 0 { - let mut envelopes = vec![]; - debug!("{} max_uid_left= {}", mailbox_hash, max_uid_left); - if max_uid_left == 1 { - debug!("UID FETCH 1 (UID FLAGS ENVELOPE BODYSTRUCTURE)"); - conn.send_command(b"UID FETCH 1 (UID FLAGS ENVELOPE BODYSTRUCTURE)") - .await?; - } else { - conn.send_command( - debug!(format!( - "UID FETCH {}:{} (UID FLAGS ENVELOPE BODYSTRUCTURE)", - std::cmp::max(std::cmp::max(max_uid_left.saturating_sub(chunk_size), 1), 1), - max_uid_left - )) - .as_bytes(), - ) - .await? - }; - conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) - .await - .chain_err_summary(|| { - format!( - "Could not parse fetch response for mailbox {}", - mailbox_path - ) - })?; - debug!( - "fetch response is {} bytes and {} lines", - response.len(), - response.lines().count() - ); - let (_, v, _) = protocol_parser::uid_fetch_responses(&response)?; - debug!("responses len is {}", v.len()); - for UidFetchResponse { - uid, - message_sequence_number, - flags, - envelope, - .. - } in v - { - let mut env = envelope.unwrap(); - let mut h = DefaultHasher::new(); - h.write_usize(uid); - h.write(mailbox_path.as_bytes()); - env.set_hash(h.finish()); - /* - debug!( - "env hash {} {} UID = {} MSN = {}", - env.hash(), - env.subject(), - uid, - message_sequence_number - ); - */ - valid_hash_set.insert(env.hash()); - let mut tag_lck = uid_store.tag_index.write().unwrap(); - if let Some((flags, keywords)) = flags { - if !flags.intersects(Flag::SEEN) { - our_unseen.insert(env.hash()); - } - env.set_flags(flags); - for f in keywords { - let hash = tag_hash!(f); - if !tag_lck.contains_key(&hash) { - tag_lck.insert(hash, f); + let mut payload = vec![]; + conn.examine_mailbox(mailbox_hash, &mut response, false) + .await?; + if max_uid_left > 0 { + let mut envelopes = vec![]; + debug!("{} max_uid_left= {}", mailbox_hash, max_uid_left); + let command = if max_uid_left == 1 { + "UID FETCH 1 (UID FLAGS ENVELOPE BODYSTRUCTURE)".to_string() + } else { + format!( + "UID FETCH {}:{} (UID FLAGS ENVELOPE BODYSTRUCTURE)", + std::cmp::max( + std::cmp::max(max_uid_left.saturating_sub(chunk_size), 1), + 1 + ), + max_uid_left + ) + }; + debug!("sending {:?}", &command); + conn.send_command(command.as_bytes()).await?; + conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) + .await + .chain_err_summary(|| { + format!( + "Could not parse fetch response for mailbox {}", + mailbox_path + ) + })?; + debug!( + "fetch response is {} bytes and {} lines", + response.len(), + response.lines().count() + ); + let (_, mut v, _) = protocol_parser::fetch_responses(&response)?; + debug!("responses len is {}", v.len()); + for FetchResponse { + ref uid, + ref mut envelope, + ref mut flags, + .. + } in v.iter_mut() + { + let uid = uid.unwrap(); + let env = envelope.as_mut().unwrap(); + env.set_hash(generate_envelope_hash(&mailbox_path, &uid)); + let mut tag_lck = uid_store.tag_index.write().unwrap(); + if let Some((flags, keywords)) = flags { + if !flags.intersects(Flag::SEEN) { + our_unseen.insert(env.hash()); + } + env.set_flags(*flags); + for f in keywords { + let hash = tag_hash!(f); + if !tag_lck.contains_key(&hash) { + tag_lck.insert(hash, f.to_string()); + } + env.labels_mut().push(hash); + } + } } - env.labels_mut().push(hash); + if uid_store.keep_offline_cache { + let mut cache_handle = cache::CacheHandle::get(uid_store.clone())?; + debug!(cache_handle + .insert_envelopes(mailbox_hash, &v) + .chain_err_summary(|| { + format!( + "Could not save envelopes in cache for mailbox {}", + mailbox_path + ) + }))?; + } + + for FetchResponse { + uid, + message_sequence_number, + envelope, + .. + } in v + { + let uid = uid.unwrap(); + let env = envelope.unwrap(); + /* + debug!( + "env hash {} {} UID = {} MSN = {}", + env.hash(), + env.subject(), + uid, + message_sequence_number + ); + */ + uid_store + .msn_index + .lock() + .unwrap() + .entry(mailbox_hash) + .or_default() + .insert(message_sequence_number - 1, uid); + uid_store + .hash_index + .lock() + .unwrap() + .insert(env.hash(), (uid, mailbox_hash)); + uid_store + .uid_index + .lock() + .unwrap() + .insert((mailbox_hash, uid), env.hash()); + envelopes.push((uid, env)); + } + debug!("sending payload for {}", mailbox_hash); + unseen + .lock() + .unwrap() + .insert_existing_set(our_unseen.iter().cloned().collect()); + mailbox_exists.lock().unwrap().insert_existing_set( + envelopes.iter().map(|(_, env)| env.hash()).collect::<_>(), + ); + drop(conn); + payload.extend(envelopes.into_iter().map(|(_, env)| env)); } + if max_uid_left <= 1 { + *stage = FetchStage::Finished; + } else { + *stage = FetchStage::FreshFetch { + max_uid: std::cmp::max( + std::cmp::max(max_uid_left.saturating_sub(chunk_size), 1), + 1, + ), + }; + } + return Ok(payload); + } + FetchStage::Finished => { + return Ok(vec![]); } - uid_store - .msn_index - .lock() - .unwrap() - .entry(mailbox_hash) - .or_default() - .insert(message_sequence_number - 1, uid); - uid_store - .hash_index - .lock() - .unwrap() - .insert(env.hash(), (uid, mailbox_hash)); - uid_store - .uid_index - .lock() - .unwrap() - .insert((mailbox_hash, uid), env.hash()); - envelopes.push((uid, env)); } - debug!("sending payload for {}", mailbox_hash); - if uid_store.cache_headers { - //FIXME - cache::save_envelopes( - uid_store.account_hash, - mailbox_hash, - uid_store.uidvalidity.lock().unwrap()[&mailbox_hash], - &envelopes - .iter() - .map(|(uid, env)| (*uid, env)) - .collect::>(), - ) - .chain_err_summary(|| { - format!( - "Could not save envelopes in cache for mailbox {}", - mailbox_path - ) - })?; - } - for &env_hash in cached_hash_set.difference(&valid_hash_set) { - conn.add_refresh_event(RefreshEvent { - account_hash: uid_store.account_hash, - mailbox_hash, - kind: RefreshEventKind::Remove(env_hash), - }); - } - unseen - .lock() - .unwrap() - .insert_set(our_unseen.iter().cloned().collect()); - mailbox_exists - .lock() - .unwrap() - .insert_existing_set(envelopes.iter().map(|(_, env)| env.hash()).collect::<_>()); - drop(conn); - payload.extend(envelopes.into_iter().map(|(_, env)| env)); } - *max_uid = if max_uid_left <= 1 { - Some(0) - } else { - Some(std::cmp::max( - std::cmp::max(max_uid_left.saturating_sub(chunk_size), 1), - 1, - )) - }; - Ok(payload) } diff --git a/melib/src/backends/imap/cache.rs b/melib/src/backends/imap/cache.rs index 816efa64e..b12ddc7ef 100644 --- a/melib/src/backends/imap/cache.rs +++ b/melib/src/backends/imap/cache.rs @@ -19,14 +19,46 @@ * along with meli. If not, see . */ -use super::UID; +use super::*; +mod sync; use crate::{ - backends::{AccountHash, MailboxHash}, - email::Envelope, + backends::MailboxHash, + email::{Envelope, EnvelopeHash}, error::*, }; -pub type MaxUID = UID; +use std::convert::TryFrom; + +#[derive(Debug, PartialEq, Hash, Eq, Ord, PartialOrd, Copy, Clone)] +pub struct ModSequence(pub std::num::NonZeroU64); + +impl TryFrom for ModSequence { + type Error = (); + fn try_from(val: i64) -> std::result::Result { + std::num::NonZeroU64::new(val as u64) + .map(|u| Ok(ModSequence(u))) + .unwrap_or(Err(())) + } +} + +impl core::fmt::Display for ModSequence { + fn fmt(&self, fmt: &mut core::fmt::Formatter) -> core::fmt::Result { + write!(fmt, "{}", &self.0) + } +} + +#[derive(Debug)] +pub struct CachedEnvelope { + pub inner: Envelope, + pub mailbox_hash: MailboxHash, + pub modsequence: Option, +} + +pub struct CacheHandle { + #[cfg(feature = "sqlite3")] + connection: crate::sqlite3::Connection, + uid_store: Arc, +} #[cfg(feature = "sqlite3")] pub use sqlite3_m::*; @@ -34,118 +66,220 @@ pub use sqlite3_m::*; #[cfg(feature = "sqlite3")] mod sqlite3_m { use super::*; - use crate::sqlite3; - const DB_NAME: &str = "header_cache.db"; - const INIT_SCRIPT: &str = "PRAGMA foreign_keys = true; + use crate::sqlite3::rusqlite::types::{ + FromSql, FromSqlError, FromSqlResult, ToSql, ToSqlOutput, + }; + use crate::sqlite3::{self, DatabaseDescription}; + const DB_DESCRIPTION: DatabaseDescription = DatabaseDescription { + name: "header_cache.db", + init_script: Some("PRAGMA foreign_keys = true; PRAGMA encoding = 'UTF-8'; CREATE TABLE IF NOT EXISTS envelopes ( mailbox_hash INTEGER, uid INTEGER, - validity INTEGER, - envelope BLOB NOT NULL UNIQUE, - PRIMARY KEY (mailbox_hash, uid, validity), - FOREIGN KEY (mailbox_hash, validity) REFERENCES uidvalidity(mailbox_hash, uid) ON DELETE CASCADE + modsequence INTEGER, + envelope BLOB NOT NULL, + PRIMARY KEY (mailbox_hash, uid), + FOREIGN KEY (mailbox_hash) REFERENCES uidvalidity(mailbox_hash) ON DELETE CASCADE ); CREATE TABLE IF NOT EXISTS uidvalidity ( uid INTEGER UNIQUE, mailbox_hash INTEGER UNIQUE, + highestmodseq INTEGER, PRIMARY KEY (mailbox_hash, uid) ); - CREATE INDEX IF NOT EXISTS envelope_idx ON envelopes(mailbox_hash, uid, validity); - CREATE INDEX IF NOT EXISTS uidvalidity_idx ON uidvalidity(mailbox_hash);"; + CREATE INDEX IF NOT EXISTS envelope_idx ON envelopes(mailbox_hash); + CREATE INDEX IF NOT EXISTS uidvalidity_idx ON uidvalidity(mailbox_hash);"), + version: 1, + }; - pub fn fetch_envelopes( - account_hash: AccountHash, - mailbox_hash: MailboxHash, - uidvalidity: usize, - ) -> Result<(MaxUID, Vec<(UID, Envelope)>)> { - let conn = sqlite3::open_or_create_db( - &format!("{}_{}", account_hash, DB_NAME), - Some(INIT_SCRIPT), - )?; - let mut stmt = conn - .prepare("SELECT MAX(uid) FROM envelopes WHERE mailbox_hash = ? AND validity = ?") - .unwrap(); - let max_uid: usize = stmt - .query_map( - sqlite3::params![mailbox_hash as i64, uidvalidity as i64], - |row| row.get(0).map(|u: i64| u as usize), - ) - .chain_err_summary(|| { - format!( - "Error while performing query {:?}", - "SELECT MAX(uid) FROM envelopes WHERE mailbox_hash = ? AND validity = ?" - ) - })? - .next() - .unwrap() - .unwrap_or(0); - let mut stmt = conn - .prepare("SELECT uid, envelope FROM envelopes WHERE mailbox_hash = ? AND validity = ?") - .unwrap(); - let results: Vec<(UID, Vec)> = stmt - .query_map( - sqlite3::params![mailbox_hash as i64, uidvalidity as i64], - |row| Ok((row.get::<_, i64>(0)? as usize, row.get(1)?)), - ) - .chain_err_summary(|| { - format!( - "Error while performing query {:?}", - "SELECT uid, envelope FROM envelopes WHERE mailbox_hash = ? AND validity = ?", - ) - })? - .collect::>()?; - debug!( - "imap cache max_uid: {} results len: {}", - max_uid, - results.len() - ); - Ok(( - max_uid, - results - .into_iter() - .map(|(uid, env)| { - Ok(( - uid, - bincode::deserialize(&env).map_err(|e| MeliError::new(e.to_string()))?, - )) - }) - .collect::>>()?, - )) + impl ToSql for ModSequence { + fn to_sql(&self) -> rusqlite::Result { + Ok(ToSqlOutput::from(self.0.get() as i64)) + } } - pub fn save_envelopes( - account_hash: AccountHash, - mailbox_hash: MailboxHash, - uidvalidity: usize, - envs: &[(UID, &Envelope)], - ) -> Result<()> { - let conn = - sqlite3::open_or_create_db(&format!("{}_{}", account_hash, DB_NAME), Some(INIT_SCRIPT)) + impl FromSql for ModSequence { + fn column_result(value: rusqlite::types::ValueRef) -> FromSqlResult { + let i: i64 = FromSql::column_result(value)?; + if i == 0 { + return Err(FromSqlError::OutOfRange(0)); + } + Ok(ModSequence::try_from(i).unwrap()) + } + } + + impl CacheHandle { + pub fn get(uid_store: Arc) -> Result { + Ok(Self { + connection: sqlite3::open_or_create_db( + &DB_DESCRIPTION, + Some(uid_store.account_name.as_str()), + )?, + uid_store, + }) + } + + pub fn mailbox_state( + &self, + mailbox_hash: MailboxHash, + ) -> Result)>> { + let mut stmt = self + .connection + .prepare("SELECT uid, highestmodseq FROM uidvalidity WHERE mailbox_hash = ?1;")?; + + let mut ret = stmt.query_map(sqlite3::params![mailbox_hash as i64], |row| { + Ok((row.get(0).map(|u: i64| u as usize)?, row.get(1)?)) + })?; + if let Some(row_res) = ret.next() { + Ok(Some(row_res?)) + } else { + Ok(None) + } + } + + pub fn clear( + &self, + mailbox_hash: MailboxHash, + new_uidvalidity: UID, + highestmodseq: Option, + ) -> Result<()> { + debug!("clear mailbox_hash {}", mailbox_hash); + debug!(new_uidvalidity); + debug!(&highestmodseq); + self.connection + .execute( + "DELETE FROM uidvalidity WHERE mailbox_hash = ?1", + sqlite3::params![mailbox_hash as i64], + ) .chain_err_summary(|| { format!( - "Could not create header_cache.db for account {}", - account_hash + "Could not clear cache of mailbox {} account {}", + mailbox_hash, self.uid_store.account_name ) })?; - conn.execute( - "INSERT OR REPLACE INTO uidvalidity (uid, mailbox_hash) VALUES (?1, ?2)", - sqlite3::params![uidvalidity as i64, mailbox_hash as i64], - ) - .chain_err_summary(|| { - format!( - "Could not insert uidvalidity {} in header_cache of account {}", - uidvalidity, account_hash + + self.connection.execute( + "INSERT OR IGNORE INTO uidvalidity (uid, highestmodseq, mailbox_hash) VALUES (?1, ?2, ?3)", + sqlite3::params![new_uidvalidity as i64, highestmodseq, mailbox_hash as i64], ) - })?; - for (uid, env) in envs { - conn.execute( - "INSERT OR REPLACE INTO envelopes (uid, mailbox_hash, validity, envelope) VALUES (?1, ?2, ?3, ?4)", - sqlite3::params![*uid as i64, mailbox_hash as i64, uidvalidity as i64, bincode::serialize(env).map_err(|e| MeliError::new(e.to_string()))?], - ).chain_err_summary(|| format!("Could not insert envelope with hash {} in header_cache of account {}", env.hash(), account_hash))?; + .chain_err_summary(|| { + format!( + "Could not insert uidvalidity {} in header_cache of account {}", + new_uidvalidity, self.uid_store.account_name + ) + })?; + Ok(()) + } + + pub fn envelopes(&self, mailbox_hash: MailboxHash) -> Result>> { + debug!("envelopes mailbox_hash {}", mailbox_hash); + if debug!(self.mailbox_state(mailbox_hash)?.is_none()) { + return Ok(None); + } + + let mut stmt = self.connection.prepare( + "SELECT uid, envelope, modsequence FROM envelopes WHERE mailbox_hash = ?1;", + )?; + + let ret: Vec<(UID, Envelope, Option)> = stmt + .query_map(sqlite3::params![mailbox_hash as i64], |row| { + Ok(( + row.get(0).map(|i: i64| i as usize)?, + row.get(1)?, + row.get(2)?, + )) + })? + .into_iter() + .collect::>()?; + let mut max_uid = 0; + let mut env_lck = self.uid_store.envelopes.lock().unwrap(); + let mut hash_index_lck = self.uid_store.hash_index.lock().unwrap(); + let mut uid_index_lck = self.uid_store.uid_index.lock().unwrap(); + let mut env_hashes = Vec::with_capacity(ret.len()); + for (uid, env, modseq) in ret { + env_hashes.push(env.hash()); + max_uid = std::cmp::max(max_uid, uid); + hash_index_lck.insert(env.hash(), (uid, mailbox_hash)); + uid_index_lck.insert((mailbox_hash, uid), env.hash()); + env_lck.insert( + env.hash(), + CachedEnvelope { + inner: env, + mailbox_hash, + modsequence: modseq, + }, + ); + } + self.uid_store + .max_uids + .lock() + .unwrap() + .insert(mailbox_hash, max_uid); + Ok(Some(env_hashes)) + } + + pub fn insert_envelopes( + &mut self, + mailbox_hash: MailboxHash, + fetches: &[FetchResponse<'_>], + ) -> Result<()> { + debug!( + "insert_envelopes mailbox_hash {} len {}", + mailbox_hash, + fetches.len() + ); + if self.mailbox_state(mailbox_hash)?.is_none() { + debug!(self.mailbox_state(mailbox_hash)?.is_none()); + let uidvalidity = self + .uid_store + .uidvalidity + .lock() + .unwrap() + .get(&mailbox_hash) + .cloned(); + let highestmodseq = self + .uid_store + .highestmodseqs + .lock() + .unwrap() + .get(&mailbox_hash) + .cloned(); + debug!(&uidvalidity); + debug!(&highestmodseq); + if let Some(uidvalidity) = uidvalidity { + debug!(self.clear( + mailbox_hash, + uidvalidity, + highestmodseq.and_then(|v| v.ok()), + ))?; + } + } + let Self { + ref mut connection, + ref uid_store, + } = self; + let tx = connection.transaction()?; + for item in fetches { + if let FetchResponse { + uid: Some(uid), + message_sequence_number: _, + modseq, + flags: _, + body: _, + envelope: Some(envelope), + } = item + { + tx.execute( + "INSERT OR REPLACE INTO envelopes (uid, mailbox_hash, modsequence, envelope) VALUES (?1, ?2, ?3, ?4)", + sqlite3::params![*uid as i64, mailbox_hash as i64, modseq, &envelope], + ).chain_err_summary(|| format!("Could not insert envelope {} {} in header_cache of account {}", envelope.message_id(), envelope.hash(), uid_store.account_name))?; + } + } + tx.commit()?; + Ok(()) } - Ok(()) } } @@ -155,20 +289,96 @@ pub use filesystem_m::*; #[cfg(not(feature = "sqlite3"))] mod filesystem_m { use super::*; - pub fn fetch_envelopes( - _account_hash: AccountHash, - _mailbox_hash: MailboxHash, - _uidvalidity: usize, - ) -> Result<(MaxUID, Vec<(UID, Envelope)>)> { - Ok((0, vec![])) - } + impl CacheHandle { + pub fn get(uid_store: Arc) -> Result { + Ok(Self { uid_store }) + } - pub fn save_envelopes( - _account_hash: AccountHash, - _mailbox_hash: MailboxHash, - _uidvalidity: usize, - _envs: &[(UID, &Envelope)], - ) -> Result<()> { - Ok(()) + pub fn mailbox_state( + &self, + _mailbox_hash: MailboxHash, + ) -> Result)>> { + Ok(None) + } + + pub fn clear( + &self, + _mailbox_hash: MailboxHash, + _new_uidvalidity: UID, + _highestmodseq: Option, + ) -> Result<()> { + Ok(()) + } + + pub fn envelopes(&self, _mailbox_hash: MailboxHash) -> Result>> { + Ok(None) + } + + pub fn insert_envelopes( + &mut self, + _mailbox_hash: MailboxHash, + _fetches: &[FetchResponse<'_>], + ) -> Result<()> { + Ok(()) + } + } +} + +pub(super) async fn fetch_cached_envs(state: &mut FetchState) -> Result>> { + let FetchState { + stage: _, + ref mut connection, + mailbox_hash, + can_create_flags: _, + ref uid_store, + } = state; + debug!(uid_store.keep_offline_cache); + let mailbox_hash = *mailbox_hash; + if !uid_store.keep_offline_cache { + return Ok(None); + } + { + let mut conn = connection.lock().await; + match debug!(conn.load_cache(mailbox_hash).await) { + None => return Ok(None), + Some(Ok(env_hashes)) => { + uid_store + .mailboxes + .lock() + .await + .entry(mailbox_hash) + .and_modify(|entry| { + entry + .exists + .lock() + .unwrap() + .insert_set(env_hashes.iter().cloned().collect()); + let env_lck = uid_store.envelopes.lock().unwrap(); + entry.unseen.lock().unwrap().insert_set( + env_hashes + .iter() + .filter_map(|h| { + if !env_lck[h].inner.is_seen() { + Some(*h) + } else { + None + } + }) + .collect(), + ); + }); + let env_lck = uid_store.envelopes.lock().unwrap(); + + return Ok(Some( + env_hashes + .into_iter() + .filter_map(|env_hash| { + env_lck.get(&env_hash).map(|c_env| c_env.inner.clone()) + }) + .collect::>(), + )); + } + Some(Err(err)) => return debug!(Err(err)), + } } } diff --git a/melib/src/backends/imap/cache/sync.rs b/melib/src/backends/imap/cache/sync.rs new file mode 100644 index 000000000..518bbe45e --- /dev/null +++ b/melib/src/backends/imap/cache/sync.rs @@ -0,0 +1,735 @@ +/* + * melib - IMAP + * + * Copyright 2020 Manos Pitsidianakis + * + * This file is part of meli. + * + * meli is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * meli is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with meli. If not, see . + */ + +use super::*; + +impl ImapConnection { + pub async fn resync(&mut self, mailbox_hash: MailboxHash) -> Result>> { + debug!("resync mailbox_hash {}", mailbox_hash); + debug!(&self.sync_policy); + if let SyncPolicy::None = self.sync_policy { + return Ok(None); + } + + let cache_handle = CacheHandle::get(self.uid_store.clone())?; + if cache_handle.mailbox_state(mailbox_hash)?.is_none() { + return Ok(None); + } + + self.select_mailbox(mailbox_hash, &mut String::new(), false) + .await?; + match self.sync_policy { + SyncPolicy::None => Ok(None), + SyncPolicy::Basic => self.resync_basic(cache_handle, mailbox_hash).await, + SyncPolicy::Condstore => self.resync_condstore(cache_handle, mailbox_hash).await, + SyncPolicy::CondstoreQresync => { + self.resync_condstoreqresync(cache_handle, mailbox_hash) + .await + } + } + } + + pub async fn load_cache( + &mut self, + mailbox_hash: MailboxHash, + ) -> Option>> { + debug!("load_cache {}", mailbox_hash); + let cache_handle = match CacheHandle::get(self.uid_store.clone()) { + Ok(v) => v, + Err(err) => return Some(Err(err)), + }; + let (uidvalidity, highestmodseq) = match debug!(cache_handle.mailbox_state(mailbox_hash)) { + Err(err) => return Some(Err(err)), + Ok(Some(v)) => v, + Ok(None) => { + return None; + } + }; + self.uid_store + .uidvalidity + .lock() + .unwrap() + .entry(mailbox_hash) + .or_insert(uidvalidity); + self.uid_store + .highestmodseqs + .lock() + .unwrap() + .entry(mailbox_hash) + .or_insert(highestmodseq.ok_or(())); + match debug!(cache_handle.envelopes(mailbox_hash)) { + Ok(Some(envs)) => Some(Ok(envs)), + Ok(None) => None, + Err(err) => Some(Err(err)), + } + } + + pub async fn build_cache( + &mut self, + cache_handle: &mut CacheHandle, + mailbox_hash: MailboxHash, + ) -> Result<()> { + debug!("build_cache {}", mailbox_hash); + let mut response = String::with_capacity(8 * 1024); + // 1 get uidvalidity, highestmodseq + self.select_mailbox(mailbox_hash, &mut response, true) + .await?; + let select_response = + protocol_parser::select_response(&response).chain_err_summary(|| { + format!( + "Could not parse select response for mailbox {}", + mailbox_hash + ) + })?; + self.uid_store + .uidvalidity + .lock() + .unwrap() + .insert(mailbox_hash, select_response.uidvalidity); + if let Some(v) = select_response.highestmodseq { + self.uid_store + .highestmodseqs + .lock() + .unwrap() + .insert(mailbox_hash, v); + } + cache_handle.clear( + mailbox_hash, + select_response.uidvalidity, + select_response.highestmodseq.and_then(|i| i.ok()), + )?; + self.send_command(b"UID FETCH 1:* (UID FLAGS ENVELOPE BODYSTRUCTURE)") + .await?; + self.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) + .await?; + let fetches = protocol_parser::fetch_responses(&response)?.1; + cache_handle.insert_envelopes(mailbox_hash, &fetches)?; + Ok(()) + } + + //rfc4549_Synchronization_Operations_for_Disconnected_IMAP4_Clients + pub async fn resync_basic( + &mut self, + cache_handle: CacheHandle, + mailbox_hash: MailboxHash, + ) -> Result>> { + let mut payload = vec![]; + debug!("resync_basic"); + debug!(self + .uid_store + .uidvalidity + .lock() + .unwrap() + .get(&mailbox_hash)); + debug!(self.uid_store.max_uids.lock().unwrap().get(&mailbox_hash)); + let mut response = String::with_capacity(8 * 1024); + let cached_uidvalidity = self + .uid_store + .uidvalidity + .lock() + .unwrap() + .get(&mailbox_hash) + .cloned(); + let cached_max_uid = self + .uid_store + .max_uids + .lock() + .unwrap() + .get(&mailbox_hash) + .cloned(); + // 3. tag2 UID FETCH 1: FLAGS + if cached_uidvalidity.is_none() || cached_max_uid.is_none() { + return Ok(None); + } + + let current_uidvalidity: UID = cached_uidvalidity.unwrap(); + let max_uid: UID = cached_max_uid.unwrap(); + let (mailbox_path, mailbox_exists, unseen) = { + let f = &self.uid_store.mailboxes.lock().await[&mailbox_hash]; + ( + f.imap_path().to_string(), + f.exists.clone(), + f.unseen.clone(), + ) + }; + let mut new_unseen = BTreeSet::default(); + debug!("current_uidvalidity is {}", current_uidvalidity); + debug!("max_uid is {}", max_uid); + self.select_mailbox(mailbox_hash, &mut response, true) + .await?; + let select_response = protocol_parser::select_response(&response)?; + debug!( + "select_response.uidvalidity is {}", + select_response.uidvalidity + ); + // 1. check UIDVALIDITY. If fail, discard cache and rebuild + if select_response.uidvalidity != current_uidvalidity { + cache_handle.clear( + mailbox_hash, + select_response.uidvalidity, + select_response.highestmodseq.and_then(|i| i.ok()), + )?; + return Ok(None); + } + + // 2. tag1 UID FETCH :* + self.send_command( + format!( + "UID FETCH {}:* (UID FLAGS ENVELOPE BODYSTRUCTURE)", + max_uid + 1 + ) + .as_bytes(), + ) + .await?; + self.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) + .await?; + debug!( + "fetch response is {} bytes and {} lines", + response.len(), + response.lines().count() + ); + let (_, mut v, _) = protocol_parser::fetch_responses(&response)?; + debug!("responses len is {}", v.len()); + for FetchResponse { + ref uid, + ref mut envelope, + ref mut flags, + .. + } in v.iter_mut() + { + let uid = uid.unwrap(); + let env = envelope.as_mut().unwrap(); + env.set_hash(generate_envelope_hash(&mailbox_path, &uid)); + let mut tag_lck = self.uid_store.tag_index.write().unwrap(); + if let Some((flags, keywords)) = flags { + if !flags.intersects(Flag::SEEN) { + new_unseen.insert(env.hash()); + } + env.set_flags(*flags); + for f in keywords { + let hash = tag_hash!(f); + if !tag_lck.contains_key(&hash) { + tag_lck.insert(hash, f.to_string()); + } + env.labels_mut().push(hash); + } + } + } + { + let mut cache_handle = cache::CacheHandle::get(self.uid_store.clone())?; + debug!(cache_handle + .insert_envelopes(mailbox_hash, &v) + .chain_err_summary(|| { + format!( + "Could not save envelopes in cache for mailbox {}", + mailbox_path + ) + }))?; + } + + for FetchResponse { + uid, + message_sequence_number: _, + envelope, + .. + } in v + { + let uid = uid.unwrap(); + let env = envelope.unwrap(); + /* + debug!( + "env hash {} {} UID = {} MSN = {}", + env.hash(), + env.subject(), + uid, + message_sequence_number + ); + */ + self.uid_store + .hash_index + .lock() + .unwrap() + .insert(env.hash(), (uid, mailbox_hash)); + self.uid_store + .uid_index + .lock() + .unwrap() + .insert((mailbox_hash, uid), env.hash()); + payload.push((uid, env)); + } + debug!("sending payload for {}", mailbox_hash); + unseen + .lock() + .unwrap() + .insert_existing_set(new_unseen.iter().cloned().collect()); + mailbox_exists + .lock() + .unwrap() + .insert_existing_set(payload.iter().map(|(_, env)| env.hash()).collect::<_>()); + // 3. tag2 UID FETCH 1: FLAGS + self.send_command(format!("UID FETCH 1:{} FLAGS", max_uid).as_bytes()) + .await?; + self.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) + .await?; + //1) update cached flags for old messages; + //2) find out which old messages got expunged; and + //3) build a mapping between message numbers and UIDs (for old messages). + let mut valid_envs = BTreeSet::default(); + let mut env_lck = self.uid_store.envelopes.lock().unwrap(); + let (_, v, _) = protocol_parser::fetch_responses(&response)?; + let mut refresh_events = vec![]; + for FetchResponse { uid, flags, .. } in v { + let uid = uid.unwrap(); + let env_hash = generate_envelope_hash(&mailbox_path, &uid); + valid_envs.insert(env_hash); + if !env_lck.contains_key(&env_hash) { + return Ok(None); + } + let (flags, tags) = flags.unwrap(); + if env_lck[&env_hash].inner.flags() != flags + || env_lck[&env_hash].inner.labels() + != &tags + .iter() + .map(|t| tag_hash!(t)) + .collect::>() + { + env_lck.entry(env_hash).and_modify(|entry| { + entry.inner.set_flags(flags); + entry.inner.labels_mut().clear(); + entry + .inner + .labels_mut() + .extend(tags.iter().map(|t| tag_hash!(t))); + }); + refresh_events.push(RefreshEvent { + mailbox_hash, + account_hash: self.uid_store.account_hash, + kind: RefreshEventKind::NewFlags(env_hash, (flags, tags)), + }); + } + } + for env_hash in valid_envs.difference( + &env_lck + .iter() + .filter_map(|(h, cenv)| { + if cenv.mailbox_hash == mailbox_hash { + Some(*h) + } else { + None + } + }) + .collect(), + ) { + env_lck.remove(env_hash); + refresh_events.push(RefreshEvent { + mailbox_hash, + account_hash: self.uid_store.account_hash, + kind: RefreshEventKind::Remove(*env_hash), + }); + } + drop(env_lck); + for ev in refresh_events { + self.add_refresh_event(ev); + } + Ok(Some(payload.into_iter().map(|(_, env)| env).collect())) + } + + //rfc4549_Synchronization_Operations_for_Disconnected_IMAP4_Clients + //Section 6.1 + pub async fn resync_condstore( + &mut self, + cache_handle: CacheHandle, + mailbox_hash: MailboxHash, + ) -> Result>> { + let mut payload = vec![]; + debug!("resync_condstore"); + debug!(self + .uid_store + .uidvalidity + .lock() + .unwrap() + .get(&mailbox_hash)); + debug!(self.uid_store.max_uids.lock().unwrap().get(&mailbox_hash)); + let mut response = String::with_capacity(8 * 1024); + let cached_uidvalidity = self + .uid_store + .uidvalidity + .lock() + .unwrap() + .get(&mailbox_hash) + .cloned(); + let cached_max_uid = self + .uid_store + .max_uids + .lock() + .unwrap() + .get(&mailbox_hash) + .cloned(); + let cached_highestmodseq = self + .uid_store + .highestmodseqs + .lock() + .unwrap() + .get(&mailbox_hash) + .cloned(); + if cached_uidvalidity.is_none() + || cached_max_uid.is_none() + || cached_highestmodseq.is_none() + { + // This means the mailbox is not cached. + return Ok(None); + } + let cached_uidvalidity: UID = cached_uidvalidity.unwrap(); + let cached_max_uid: UID = cached_max_uid.unwrap(); + let cached_highestmodseq: std::result::Result = + cached_highestmodseq.unwrap(); + if cached_highestmodseq.is_err() { + // No MODSEQ is available for __this__ mailbox, fallback to basic sync + return self.resync_basic(cache_handle, mailbox_hash).await; + } + let cached_highestmodseq: ModSequence = cached_highestmodseq.unwrap(); + + let (mailbox_path, mailbox_exists, unseen) = { + let f = &self.uid_store.mailboxes.lock().await[&mailbox_hash]; + ( + f.imap_path().to_string(), + f.exists.clone(), + f.unseen.clone(), + ) + }; + let mut new_unseen = BTreeSet::default(); + debug!("current_uidvalidity is {}", cached_uidvalidity); + debug!("max_uid is {}", cached_max_uid); + // 1. check UIDVALIDITY. If fail, discard cache and rebuild + self.select_mailbox(mailbox_hash, &mut response, true) + .await?; + let select_response = protocol_parser::select_response(&response)?; + debug!( + "select_response.uidvalidity is {}", + select_response.uidvalidity + ); + if select_response.uidvalidity != cached_uidvalidity { + // 1a) Check the mailbox UIDVALIDITY (see section 4.1 for more + //details) with SELECT/EXAMINE/STATUS. + // If the UIDVALIDITY value returned by the server differs, the + // client MUST + // * empty the local cache of that mailbox; + // * "forget" the cached HIGHESTMODSEQ value for the mailbox; + // * remove any pending "actions" that refer to UIDs in that + // mailbox (note that this doesn't affect actions performed on + // client-generated fake UIDs; see Section 5); and + // * skip steps 1b and 2-II; + cache_handle.clear( + mailbox_hash, + select_response.uidvalidity, + select_response.highestmodseq.and_then(|i| i.ok()), + )?; + return Ok(None); + } + if select_response.highestmodseq.is_none() + || select_response.highestmodseq.as_ref().unwrap().is_err() + { + if select_response.highestmodseq.as_ref().unwrap().is_err() { + self.uid_store + .highestmodseqs + .lock() + .unwrap() + .insert(mailbox_hash, Err(())); + } + return self.resync_basic(cache_handle, mailbox_hash).await; + } + let new_highestmodseq = select_response.highestmodseq.unwrap().unwrap(); + let mut refresh_events = vec![]; + // 1b) Check the mailbox HIGHESTMODSEQ. + // If the cached value is the same as the one returned by the server, skip fetching + // message flags on step 2-II, i.e., the client only has to find out which messages got + // expunged. + if cached_highestmodseq != new_highestmodseq { + /* Cache is synced, only figure out which messages got expunged */ + + // 2) Fetch the current "descriptors". + // I) Discover new messages. + + // II) Discover changes to old messages and flags for new messages + // using + // "FETCH 1:* (FLAGS) (CHANGEDSINCE )" or + // "SEARCH MODSEQ ". + + // 2. tag1 UID FETCH :* + self.send_command( + format!( + "UID FETCH {}:* (UID FLAGS ENVELOPE BODYSTRUCTURE) (CHANGEDSINCE {})", + cached_max_uid + 1, + cached_highestmodseq, + ) + .as_bytes(), + ) + .await?; + self.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) + .await?; + debug!( + "fetch response is {} bytes and {} lines", + response.len(), + response.lines().count() + ); + let (_, mut v, _) = protocol_parser::fetch_responses(&response)?; + debug!("responses len is {}", v.len()); + for FetchResponse { + ref uid, + ref mut envelope, + ref mut flags, + .. + } in v.iter_mut() + { + let uid = uid.unwrap(); + let env = envelope.as_mut().unwrap(); + env.set_hash(generate_envelope_hash(&mailbox_path, &uid)); + let mut tag_lck = self.uid_store.tag_index.write().unwrap(); + if let Some((flags, keywords)) = flags { + if !flags.intersects(Flag::SEEN) { + new_unseen.insert(env.hash()); + } + env.set_flags(*flags); + for f in keywords { + let hash = tag_hash!(f); + if !tag_lck.contains_key(&hash) { + tag_lck.insert(hash, f.to_string()); + } + env.labels_mut().push(hash); + } + } + } + { + let mut cache_handle = cache::CacheHandle::get(self.uid_store.clone())?; + debug!(cache_handle + .insert_envelopes(mailbox_hash, &v) + .chain_err_summary(|| { + format!( + "Could not save envelopes in cache for mailbox {}", + mailbox_path + ) + }))?; + } + + for FetchResponse { uid, envelope, .. } in v { + let uid = uid.unwrap(); + let env = envelope.unwrap(); + /* + debug!( + "env hash {} {} UID = {} MSN = {}", + env.hash(), + env.subject(), + uid, + message_sequence_number + ); + */ + self.uid_store + .hash_index + .lock() + .unwrap() + .insert(env.hash(), (uid, mailbox_hash)); + self.uid_store + .uid_index + .lock() + .unwrap() + .insert((mailbox_hash, uid), env.hash()); + payload.push((uid, env)); + } + debug!("sending payload for {}", mailbox_hash); + unseen + .lock() + .unwrap() + .insert_existing_set(new_unseen.iter().cloned().collect()); + mailbox_exists + .lock() + .unwrap() + .insert_existing_set(payload.iter().map(|(_, env)| env.hash()).collect::<_>()); + // 3. tag2 UID FETCH 1: FLAGS + self.send_command( + format!( + "UID FETCH 1:{} FLAGS (CHANGEDSINCE {})", + cached_max_uid, cached_highestmodseq + ) + .as_bytes(), + ) + .await?; + self.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) + .await?; + //1) update cached flags for old messages; + let mut env_lck = self.uid_store.envelopes.lock().unwrap(); + let (_, v, _) = protocol_parser::fetch_responses(&response)?; + for FetchResponse { uid, flags, .. } in v { + let uid = uid.unwrap(); + let env_hash = generate_envelope_hash(&mailbox_path, &uid); + if !env_lck.contains_key(&env_hash) { + return Ok(None); + } + let (flags, tags) = flags.unwrap(); + if env_lck[&env_hash].inner.flags() != flags + || env_lck[&env_hash].inner.labels() + != &tags + .iter() + .map(|t| tag_hash!(t)) + .collect::>() + { + env_lck.entry(env_hash).and_modify(|entry| { + entry.inner.set_flags(flags); + entry.inner.labels_mut().clear(); + entry + .inner + .labels_mut() + .extend(tags.iter().map(|t| tag_hash!(t))); + }); + refresh_events.push(RefreshEvent { + mailbox_hash, + account_hash: self.uid_store.account_hash, + kind: RefreshEventKind::NewFlags(env_hash, (flags, tags)), + }); + } + } + self.uid_store + .highestmodseqs + .lock() + .unwrap() + .insert(mailbox_hash, Ok(new_highestmodseq)); + } + let mut valid_envs = BTreeSet::default(); + // This should be UID SEARCH 1: but it's difficult to compare to cached UIDs at the + // point of calling this function + self.send_command(b"UID SEARCH ALL").await?; + self.read_response(&mut response, RequiredResponses::SEARCH) + .await?; + //1) update cached flags for old messages; + let (_, v) = protocol_parser::search_results(response.as_bytes())?; + for uid in v { + valid_envs.insert(generate_envelope_hash(&mailbox_path, &uid)); + } + let mut env_lck = self.uid_store.envelopes.lock().unwrap(); + for env_hash in valid_envs.difference( + &env_lck + .iter() + .filter_map(|(h, cenv)| { + if cenv.mailbox_hash == mailbox_hash { + Some(*h) + } else { + None + } + }) + .collect(), + ) { + env_lck.remove(env_hash); + refresh_events.push(RefreshEvent { + mailbox_hash, + account_hash: self.uid_store.account_hash, + kind: RefreshEventKind::Remove(*env_hash), + }); + } + drop(env_lck); + for ev in refresh_events { + self.add_refresh_event(ev); + } + Ok(Some(payload.into_iter().map(|(_, env)| env).collect())) + } + + //rfc7162_Quick Flag Changes Resynchronization (CONDSTORE)_and Quick Mailbox Resynchronization (QRESYNC) + pub async fn resync_condstoreqresync( + &mut self, + _cache_handle: CacheHandle, + _mailbox_hash: MailboxHash, + ) -> Result>> { + Ok(None) + } + + pub async fn init_mailbox(&mut self, mailbox_hash: MailboxHash) -> Result { + let mut response = String::with_capacity(8 * 1024); + let (mailbox_path, mailbox_exists, unseen, permissions) = { + let f = &self.uid_store.mailboxes.lock().await[&mailbox_hash]; + ( + f.imap_path().to_string(), + f.exists.clone(), + f.unseen.clone(), + f.permissions.clone(), + ) + }; + + self.create_uid_msn_cache(mailbox_hash, 1).await?; + /* first SELECT the mailbox to get READ/WRITE permissions (because EXAMINE only + * returns READ-ONLY for both cases) */ + let mut select_response = self + .select_mailbox(mailbox_hash, &mut response, true) + .await + .chain_err_summary(|| format!("Could not select mailbox {}", mailbox_path))? + .unwrap(); + debug!( + "mailbox: {} select_response: {:?}", + mailbox_path, select_response + ); + { + { + let mut uidvalidities = self.uid_store.uidvalidity.lock().unwrap(); + + let v = uidvalidities + .entry(mailbox_hash) + .or_insert(select_response.uidvalidity); + *v = select_response.uidvalidity; + } + let mut permissions = permissions.lock().unwrap(); + permissions.create_messages = !select_response.read_only; + permissions.remove_messages = !select_response.read_only; + permissions.set_flags = !select_response.read_only; + permissions.rename_messages = !select_response.read_only; + permissions.delete_messages = !select_response.read_only; + mailbox_exists + .lock() + .unwrap() + .set_not_yet_seen(select_response.exists); + unseen + .lock() + .unwrap() + .set_not_yet_seen(select_response.unseen); + } + if select_response.exists == 0 { + return Ok(select_response); + } + /* reselecting the same mailbox with EXAMINE prevents expunging it */ + self.examine_mailbox(mailbox_hash, &mut response, true) + .await?; + if select_response.uidnext == 0 { + /* UIDNEXT shouldn't be 0, since exists != 0 at this point */ + self.send_command(format!("STATUS \"{}\" (UIDNEXT)", mailbox_path).as_bytes()) + .await?; + self.read_response(&mut response, RequiredResponses::STATUS) + .await?; + let (_, status) = protocol_parser::status_response(response.as_bytes())?; + if let Some(uidnext) = status.uidnext { + if uidnext == 0 { + return Err(MeliError::new( + "IMAP server error: zero UIDNEXT with nonzero exists.", + )); + } + select_response.uidnext = uidnext; + } else { + return Err(MeliError::new("IMAP server did not reply with UIDNEXT")); + } + } + Ok(select_response) + } +} diff --git a/melib/src/backends/imap/connection.rs b/melib/src/backends/imap/connection.rs index 7379cea00..63a8bd335 100644 --- a/melib/src/backends/imap/connection.rs +++ b/melib/src/backends/imap/connection.rs @@ -19,8 +19,8 @@ * along with meli. If not, see . */ -use super::protocol_parser::{ImapLineSplit, ImapResponse, RequiredResponses}; -use crate::backends::MailboxHash; +use super::protocol_parser::{ImapLineSplit, ImapResponse, RequiredResponses, SelectResponse}; +use crate::backends::{MailboxHash, RefreshEvent}; use crate::connections::{lookup_ipv4, timeout, Connection}; use crate::email::parser::BytesExt; use crate::error::*; @@ -39,6 +39,16 @@ use std::time::{Duration, Instant}; use super::protocol_parser; use super::{Capabilities, ImapServerConf, UIDStore}; +#[derive(Debug, Clone, Copy)] +pub enum SyncPolicy { + None, + ///rfc4549 `Synch Ops for Disconnected IMAP4 Clients` https://tools.ietf.org/html/rfc4549 + Basic, + ///rfc7162 `IMAP Extensions: Quick Flag Changes Resynchronization (CONDSTORE) and Quick Mailbox Resynchronization (QRESYNC)` + Condstore, + CondstoreQresync, +} + #[derive(Debug, Clone, Copy)] pub enum ImapProtocol { IMAP { extension_use: ImapExtensionUse }, @@ -47,6 +57,7 @@ pub enum ImapProtocol { #[derive(Debug, Clone, Copy)] pub struct ImapExtensionUse { + pub condstore: bool, pub idle: bool, #[cfg(feature = "deflate_compression")] pub deflate: bool, @@ -55,6 +66,7 @@ pub struct ImapExtensionUse { impl Default for ImapExtensionUse { fn default() -> Self { Self { + condstore: true, idle: true, #[cfg(feature = "deflate_compression")] deflate: true, @@ -91,6 +103,7 @@ async fn try_await(cl: impl Future> + Send) -> Result<()> { pub struct ImapConnection { pub stream: Result, pub server_conf: ImapServerConf, + pub sync_policy: SyncPolicy, pub uid_store: Arc, } @@ -495,6 +508,11 @@ impl ImapConnection { ImapConnection { stream: Err(MeliError::new("Offline".to_string())), server_conf: server_conf.clone(), + sync_policy: if uid_store.keep_offline_cache { + SyncPolicy::Basic + } else { + SyncPolicy::None + }, uid_store, } } @@ -523,12 +541,34 @@ impl ImapConnection { ImapProtocol::IMAP { extension_use: ImapExtensionUse { + condstore, #[cfg(feature = "deflate_compression")] deflate, idle: _idle, }, - } => - { + } => { + if capabilities.contains(&b"CONDSTORE"[..]) && condstore { + match self.sync_policy { + SyncPolicy::None => { /* do nothing, sync is disabled */ } + _ => { + /* Upgrade to Condstore */ + let mut ret = String::new(); + if capabilities.contains(&b"ENABLE"[..]) { + self.send_command(b"ENABLE CONDSTORE").await?; + self.read_response(&mut ret, RequiredResponses::empty()) + .await?; + } else { + self.send_command( + b"STATUS INBOX (UIDNEXT UIDVALIDITY UNSEEN MESSAGES HIGHESTMODSEQ)", + ) + .await?; + self.read_response(&mut ret, RequiredResponses::empty()) + .await?; + } + self.sync_policy = SyncPolicy::Condstore; + } + } + } #[cfg(feature = "deflate_compression")] if capabilities.contains(&b"COMPRESS=DEFLATE"[..]) && deflate { let mut ret = String::new(); @@ -600,12 +640,28 @@ impl ImapConnection { ImapResponse::No(ref response_code) => { //FIXME return error debug!("Received NO response: {:?} {:?}", response_code, response); + (self.uid_store.event_consumer)( + self.uid_store.account_hash, + crate::backends::BackendEvent::Notice { + description: None, + content: response_code.to_string(), + level: crate::logging::LoggingLevel::ERROR, + }, + ); ret.push_str(&response); return r.into(); } ImapResponse::Bad(ref response_code) => { //FIXME return error debug!("Received BAD response: {:?} {:?}", response_code, response); + (self.uid_store.event_consumer)( + self.uid_store.account_hash, + crate::backends::BackendEvent::Notice { + description: None, + content: response_code.to_string(), + level: crate::logging::LoggingLevel::ERROR, + }, + ); ret.push_str(&response); return r.into(); } @@ -691,24 +747,31 @@ impl ImapConnection { mailbox_hash: MailboxHash, ret: &mut String, force: bool, - ) -> Result<()> { + ) -> Result> { if !force && self.stream.as_ref()?.current_mailbox == MailboxSelection::Select(mailbox_hash) { - return Ok(()); + return Ok(None); } - self.send_command( - format!( - "SELECT \"{}\"", - self.uid_store.mailboxes.lock().await[&mailbox_hash].imap_path() - ) - .as_bytes(), - ) - .await?; + let (imap_path, permissions) = { + let m = &self.uid_store.mailboxes.lock().await[&mailbox_hash]; + (m.imap_path().to_string(), m.permissions.clone()) + }; + self.send_command(format!("SELECT \"{}\"", imap_path).as_bytes()) + .await?; self.read_response(ret, RequiredResponses::SELECT_REQUIRED) .await?; debug!("select response {}", ret); + let select_response = protocol_parser::select_response(&ret)?; + { + let mut permissions = permissions.lock().unwrap(); + permissions.create_messages = !select_response.read_only; + permissions.remove_messages = !select_response.read_only; + permissions.set_flags = !select_response.read_only; + permissions.rename_messages = !select_response.read_only; + permissions.delete_messages = !select_response.read_only; + } self.stream.as_mut()?.current_mailbox = MailboxSelection::Select(mailbox_hash); - Ok(()) + Ok(Some(select_response)) } pub async fn examine_mailbox( @@ -716,11 +779,11 @@ impl ImapConnection { mailbox_hash: MailboxHash, ret: &mut String, force: bool, - ) -> Result<()> { + ) -> Result> { if !force && self.stream.as_ref()?.current_mailbox == MailboxSelection::Examine(mailbox_hash) { - return Ok(()); + return Ok(None); } self.send_command( format!( @@ -733,8 +796,9 @@ impl ImapConnection { self.read_response(ret, RequiredResponses::EXAMINE_REQUIRED) .await?; debug!("examine response {}", ret); + let select_response = protocol_parser::select_response(&ret)?; self.stream.as_mut()?.current_mailbox = MailboxSelection::Examine(mailbox_hash); - Ok(()) + Ok(Some(select_response)) } pub async fn unselect(&mut self) -> Result<()> { @@ -782,7 +846,7 @@ impl ImapConnection { Ok(()) } - pub fn add_refresh_event(&mut self, ev: crate::backends::RefreshEvent) { + pub fn add_refresh_event(&mut self, ev: RefreshEvent) { (self.uid_store.event_consumer)( self.uid_store.account_hash, crate::backends::BackendEvent::Refresh(ev), diff --git a/melib/src/backends/imap/operations.rs b/melib/src/backends/imap/operations.rs index 9c30213f7..b4c73cfb8 100644 --- a/melib/src/backends/imap/operations.rs +++ b/melib/src/backends/imap/operations.rs @@ -80,12 +80,13 @@ impl BackendOp for ImapOp { response.len(), response.lines().collect::>().len() ); - let UidFetchResponse { + let FetchResponse { uid: _uid, flags: _flags, body, .. - } = protocol_parser::uid_fetch_response(&response)?.1; + } = protocol_parser::fetch_response(&response)?.1; + let _uid = _uid.unwrap(); assert_eq!(_uid, uid); assert!(body.is_some()); let mut bytes_cache = uid_store.byte_cache.lock()?; diff --git a/melib/src/backends/imap/protocol_parser.rs b/melib/src/backends/imap/protocol_parser.rs index 444c6df07..c23f8f76e 100644 --- a/melib/src/backends/imap/protocol_parser.rs +++ b/melib/src/backends/imap/protocol_parser.rs @@ -29,7 +29,7 @@ use nom::{ character::complete::digit1, character::is_digit, combinator::{map, map_res, opt}, - multi::{fold_many1, length_data, many0, separated_list, separated_nonempty_list}, + multi::{fold_many1, length_data, many0, separated_nonempty_list}, sequence::{delimited, preceded}, }; use std::convert::TryFrom; @@ -449,46 +449,17 @@ pub fn list_mailbox_result(input: &[u8]) -> IResult<&[u8], ImapMailbox> { )) } -pub fn my_flags(input: &[u8]) -> IResult<&[u8], Flag> { - let (input, flags) = separated_list(tag(" "), preceded(tag("\\"), is_not(")")))(input)?; - let mut ret = Flag::default(); - for f in flags { - match f { - b"Answered" => { - ret.set(Flag::REPLIED, true); - } - b"Flagged" => { - ret.set(Flag::FLAGGED, true); - } - b"Deleted" => { - ret.set(Flag::TRASHED, true); - } - b"Seen" => { - ret.set(Flag::SEEN, true); - } - b"Draft" => { - ret.set(Flag::DRAFT, true); - } - f => { - debug!("unknown Flag token value: {}", unsafe { - std::str::from_utf8_unchecked(f) - }); - } - } - } - Ok((input, ret)) -} - -#[derive(Debug)] -pub struct UidFetchResponse<'a> { - pub uid: UID, +#[derive(Debug, Clone, PartialEq)] +pub struct FetchResponse<'a> { + pub uid: Option, pub message_sequence_number: usize, + pub modseq: Option, pub flags: Option<(Flag, Vec)>, pub body: Option<&'a [u8]>, pub envelope: Option, } -pub fn uid_fetch_response(input: &str) -> ImapParseResult> { +pub fn fetch_response(input: &str) -> ImapParseResult> { macro_rules! should_start_with { ($input:expr, $tag:literal) => { if !$input.starts_with($tag) { @@ -533,9 +504,10 @@ pub fn uid_fetch_response(input: &str) -> ImapParseResult> }; } - let mut ret = UidFetchResponse { - uid: 0, + let mut ret = FetchResponse { + uid: None, message_sequence_number: 0, + modseq: None, flags: None, body: None, envelope: None, @@ -564,7 +536,8 @@ pub fn uid_fetch_response(input: &str) -> ImapParseResult> )(input[i..].as_bytes()) { i += input.len() - i - rest.len(); - ret.uid = usize::from_str(unsafe { std::str::from_utf8_unchecked(uid) }).unwrap(); + ret.uid = + Some(usize::from_str(unsafe { std::str::from_utf8_unchecked(uid) }).unwrap()); } else { return debug!(Err(MeliError::new(format!( "Unexpected input while parsing UID FETCH response. Got: `{:.40}`", @@ -582,6 +555,23 @@ pub fn uid_fetch_response(input: &str) -> ImapParseResult> input )))); } + } else if input[i..].starts_with("MODSEQ (") { + i += "MODSEQ (".len(); + if let Ok((rest, modseq)) = take_while::<_, &[u8], (&[u8], nom::error::ErrorKind)>( + is_digit, + )(input[i..].as_bytes()) + { + i += (input.len() - i - rest.len()) + 1; + ret.modseq = u64::from_str(to_str!(modseq)) + .ok() + .and_then(std::num::NonZeroU64::new) + .map(ModSequence); + } else { + return debug!(Err(MeliError::new(format!( + "Unexpected input while parsing MODSEQ in UID FETCH response. Got: `{:.40}`", + input + )))); + } } else if input[i..].starts_with("RFC822 {") { i += "RFC822 ".len(); if let Ok((rest, body)) = @@ -668,12 +658,12 @@ pub fn uid_fetch_response(input: &str) -> ImapParseResult> Ok((&input[i..], ret, None)) } -pub fn uid_fetch_responses(mut input: &str) -> ImapParseResult>> { +pub fn fetch_responses(mut input: &str) -> ImapParseResult>> { let mut ret = Vec::new(); let mut alert: Option = None; - loop { - let next_response = uid_fetch_response(input); + while input.starts_with("* ") { + let next_response = fetch_response(input); match next_response { Ok((rest, el, el_alert)) => { if let Some(el_alert) = el_alert { @@ -686,9 +676,6 @@ pub fn uid_fetch_responses(mut input: &str) -> ImapParseResult { return Err(MeliError::new(format!( @@ -700,50 +687,17 @@ pub fn uid_fetch_responses(mut input: &str) -> ImapParseResult IResult<&[u8], Vec<(usize, Option<(Flag, Vec)>, &[u8])>> { - many0( - |input| -> IResult<&[u8], (usize, Option<(Flag, Vec)>, &[u8])> { - let (input, _) = tag("* ")(input)?; - let (input, _) = take_while(is_digit)(input)?; - let (input, result) = permutation(( - preceded( - alt((tag("UID "), tag(" UID "))), - map_res(digit1, |s| { - usize::from_str(unsafe { std::str::from_utf8_unchecked(s) }) - }), - ), - opt(preceded( - alt((tag("FLAGS "), tag(" FLAGS "))), - delimited(tag("("), byte_flags, tag(")")), - )), - length_data(delimited( - tag("{"), - map_res(digit1, |s| { - usize::from_str(unsafe { std::str::from_utf8_unchecked(s) }) - }), - tag("}\r\n"), - )), - ))(input.ltrim())?; - let (input, _) = tag(")\r\n")(input)?; - Ok((input, (result.0, result.1, result.2))) - }, - )(input) -} - pub fn uid_fetch_flags_responses( input: &[u8], ) -> IResult<&[u8], Vec<(usize, (Flag, Vec))>> { @@ -817,21 +771,11 @@ pub fn capabilities(input: &[u8]) -> IResult<&[u8], Vec<&[u8]>> { let (input, _) = take_until("\r\n")(input)?; let (input, _) = tag("\r\n")(input)?; Ok((input, ret)) - /* - pub capabilities<>, - do_parse!( - take_until!("CAPABILITY ") - >> tag!("CAPABILITY ") - >> ret: separated_nonempty_list_complete!(tag!(" "), is_not!(" ]\r\n")) - >> take_until!("\r\n") - >> tag!("\r\n") - >> ({ ret }) - ) - */ } /// This enum represents the server's untagged responses detailed in `7. Server Responses` of RFC 3501 INTERNET MESSAGE ACCESS PROTOCOL - VERSION 4rev1 -pub enum UntaggedResponse { +#[derive(Debug, PartialEq)] +pub enum UntaggedResponse<'s> { /// ```text /// 7.4.1. EXPUNGE Response /// @@ -891,52 +835,81 @@ pub enum UntaggedResponse { /// messages). /// ``` Recent(usize), - Fetch(usize, (Flag, Vec)), - UIDFetch(UID, (Flag, Vec)), + Fetch(FetchResponse<'s>), Bye { - reason: String, + reason: &'s str, }, } -pub fn untagged_responses(input: &[u8]) -> IResult<&[u8], Option> { +pub fn untagged_responses(input: &str) -> ImapParseResult>> { let orig_input = input; - let (input, _) = tag("* ")(input)?; - let (input, num) = map_res(digit1, |s| usize::from_str(to_str!(s)))(input)?; - let (input, _) = tag(" ")(input)?; - let (input, _tag) = take_until("\r\n")(input)?; - let (input, _) = tag("\r\n")(input)?; - debug!("Parse untagged response from {:?}", to_str!(orig_input)); - Ok((input, { - use UntaggedResponse::*; - match _tag { - b"EXPUNGE" => Some(Expunge(num)), - b"EXISTS" => Some(Exists(num)), - b"RECENT" => Some(Recent(num)), - _ if _tag.starts_with(b"FETCH ") => { - if to_str!(_tag).contains("UID") { - let (uid, flags) = uid_fetch_flags_response(orig_input)?.1; - Some(UIDFetch(uid, flags)) - } else { - let f = flags(unsafe { - std::str::from_utf8_unchecked(&_tag[b"FETCH (FLAGS (".len()..]) - }) - .map(|(_, flags)| Fetch(num, flags)); - if let Err(ref err) = f { - debug!( - "untagged_response malformed fetch: {} {}", - unsafe { std::str::from_utf8_unchecked(_tag) }, - err - ) - } - f.ok() + let (input, _) = tag::<_, &str, (&str, nom::error::ErrorKind)>("* ")(input)?; + let (input, num) = + map_res::<_, _, _, (&str, nom::error::ErrorKind), _, _, _>(digit1, |s| usize::from_str(s))( + input, + )?; + let (input, _) = tag::<_, &str, (&str, nom::error::ErrorKind)>(" ")(input)?; + let (input, _tag) = take_until::<_, &str, (&str, nom::error::ErrorKind)>("\r\n")(input)?; + let (input, _) = tag::<_, &str, (&str, nom::error::ErrorKind)>("\r\n")(input)?; + debug!("Parse untagged response from {:?}", orig_input); + Ok(( + input, + { + use UntaggedResponse::*; + match _tag { + "EXPUNGE" => Some(Expunge(num)), + "EXISTS" => Some(Exists(num)), + "RECENT" => Some(Recent(num)), + _ if _tag.starts_with("FETCH ") => Some(Fetch(fetch_response(orig_input)?.1)), + _ => { + debug!("unknown untagged_response: {}", _tag); + None } } - _ => { - debug!("unknown untagged_response: {}", to_str!(_tag)); - None - } - } - })) + }, + None, + )) +} + +#[test] +fn test_untagged_responses() { + use std::convert::TryInto; + use UntaggedResponse::*; + assert_eq!( + untagged_responses("* 2 EXISTS\r\n") + .map(|(_, v, _)| v) + .unwrap() + .unwrap(), + Exists(2) + ); + assert_eq!( + untagged_responses("* 1079 FETCH (UID 1103 MODSEQ (1365) FLAGS (\\Seen))\r\n") + .map(|(_, v, _)| v) + .unwrap() + .unwrap(), + Fetch(FetchResponse { + uid: Some(1103), + message_sequence_number: 1079, + modseq: Some(ModSequence(std::num::NonZeroU64::new(1365_u64).unwrap())), + flags: Some((Flag::SEEN, vec![])), + body: None, + envelope: None + }) + ); + assert_eq!( + untagged_responses("* 1 FETCH (FLAGS (\\Seen))\r\n") + .map(|(_, v, _)| v) + .unwrap() + .unwrap(), + Fetch(FetchResponse { + uid: None, + message_sequence_number: 1, + modseq: None, + flags: Some((Flag::SEEN, vec![])), + body: None, + envelope: None + }) + ); } pub fn search_results<'a>(input: &'a [u8]) -> IResult<&'a [u8], Vec> { @@ -991,7 +964,7 @@ fn test_imap_search() { ); } -#[derive(Debug, Default, Clone)] +#[derive(Debug, Default, PartialEq, Clone)] pub struct SelectResponse { pub exists: usize, pub recent: usize, @@ -1003,6 +976,7 @@ pub struct SelectResponse { /// if SELECT returns \* we can set arbritary flags permanently. pub can_create_flags: bool, pub read_only: bool, + pub highestmodseq: Option>, } /* @@ -1030,11 +1004,11 @@ pub struct SelectResponse { pub fn select_response(input: &str) -> Result { if input.contains("* OK") { let mut ret = SelectResponse::default(); - for l in input.split("\r\n") { - if l.starts_with("* ") && l.ends_with(" EXISTS") { - ret.exists = usize::from_str(&l["* ".len()..l.len() - " EXISTS".len()])?; - } else if l.starts_with("* ") && l.ends_with(" RECENT") { - ret.recent = usize::from_str(&l["* ".len()..l.len() - " RECENT".len()])?; + for l in input.split_rn() { + if l.starts_with("* ") && l.ends_with(" EXISTS\r\n") { + ret.exists = usize::from_str(&l["* ".len()..l.len() - " EXISTS\r\n".len()])?; + } else if l.starts_with("* ") && l.ends_with(" RECENT\r\n") { + ret.recent = usize::from_str(&l["* ".len()..l.len() - " RECENT\r\n".len()])?; } else if l.starts_with("* FLAGS (") { ret.flags = flags(&l["* FLAGS (".len()..l.len() - ")".len()]).map(|(_, v)| v)?; } else if l.starts_with("* OK [UNSEEN ") { @@ -1053,6 +1027,16 @@ pub fn select_response(input: &str) -> Result { ret.read_only = false; } else if l.contains("OK [READ-ONLY]") { ret.read_only = true; + } else if l.starts_with("* OK [HIGHESTMODSEQ ") { + let res: IResult<&str, &str> = take_until("]")(&l["* OK [HIGHESTMODSEQ ".len()..]); + let (_, highestmodseq) = res?; + ret.highestmodseq = Some( + std::num::NonZeroU64::new(u64::from_str(&highestmodseq)?) + .map(|u| Ok(ModSequence(u))) + .unwrap_or(Err(())), + ); + } else if l.starts_with("* OK [NOMODSEQ") { + ret.highestmodseq = Some(Err(())); } else if !l.is_empty() { debug!("select response: {}", l); } @@ -1064,6 +1048,76 @@ pub fn select_response(input: &str) -> Result { } } +#[test] +fn test_select_response() { + use std::convert::TryInto; + let r = "* FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft)\r\n* OK [PERMANENTFLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft \\*)] Flags permitted.\r\n* 45 EXISTS\r\n* 0 RECENT\r\n* OK [UNSEEN 16] First unseen.\r\n* OK [UIDVALIDITY 1554422056] UIDs valid\r\n* OK [UIDNEXT 50] Predicted next UID\r\n"; + + assert_eq!( + select_response(r).expect("Could not parse IMAP select response"), + SelectResponse { + exists: 45, + recent: 0, + flags: ( + Flag::REPLIED | Flag::SEEN | Flag::TRASHED | Flag::DRAFT | Flag::FLAGGED, + Vec::new() + ), + unseen: 16, + uidvalidity: 1554422056, + uidnext: 50, + permanentflags: ( + Flag::REPLIED | Flag::SEEN | Flag::TRASHED | Flag::DRAFT | Flag::FLAGGED, + vec!["*".into()] + ), + can_create_flags: true, + read_only: false, + highestmodseq: None + } + ); + let r = "* 172 EXISTS\r\n* 1 RECENT\r\n* OK [UNSEEN 12] Message 12 is first unseen\r\n* OK [UIDVALIDITY 3857529045] UIDs valid\r\n* OK [UIDNEXT 4392] Predicted next UID\r\n* FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft)\r\n* OK [PERMANENTFLAGS (\\Deleted \\Seen \\*)] Limited\r\n* OK [HIGHESTMODSEQ 715194045007]\r\n* A142 OK [READ-WRITE] SELECT completed\r\n"; + + assert_eq!( + select_response(r).expect("Could not parse IMAP select response"), + SelectResponse { + exists: 172, + recent: 1, + flags: ( + Flag::REPLIED | Flag::SEEN | Flag::TRASHED | Flag::DRAFT | Flag::FLAGGED, + Vec::new() + ), + unseen: 12, + uidvalidity: 3857529045, + uidnext: 4392, + permanentflags: (Flag::SEEN | Flag::TRASHED, vec!["*".into()]), + can_create_flags: true, + read_only: false, + highestmodseq: Some(Ok(ModSequence( + std::num::NonZeroU64::new(715194045007_u64).unwrap() + ))), + } + ); + let r = "* 172 EXISTS\r\n* 1 RECENT\r\n* OK [UNSEEN 12] Message 12 is first unseen\r\n* OK [UIDVALIDITY 3857529045] UIDs valid\r\n* OK [UIDNEXT 4392] Predicted next UID\r\n* FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft)\r\n* OK [PERMANENTFLAGS (\\Deleted \\Seen \\*)] Limited\r\n* OK [NOMODSEQ] Sorry, this mailbox format doesn't support modsequences\r\n* A142 OK [READ-WRITE] SELECT completed\r\n"; + + assert_eq!( + select_response(r).expect("Could not parse IMAP select response"), + SelectResponse { + exists: 172, + recent: 1, + flags: ( + Flag::REPLIED | Flag::SEEN | Flag::TRASHED | Flag::DRAFT | Flag::FLAGGED, + Vec::new() + ), + unseen: 12, + uidvalidity: 3857529045, + uidnext: 4392, + permanentflags: (Flag::SEEN | Flag::TRASHED, vec!["*".into()]), + can_create_flags: true, + read_only: false, + highestmodseq: Some(Err(())), + } + ); +} + pub fn flags(input: &str) -> IResult<&str, (Flag, Vec)> { let mut ret = Flag::default(); let mut keywords = Vec::new(); @@ -1209,69 +1263,6 @@ pub fn envelope(input: &[u8]) -> IResult<&[u8], Envelope> { env }), )) - /* - do_parse!( - tag!("(") - >> opt!(is_a!("\r\n\t ")) - >> date: quoted_or_nil - >> opt!(is_a!("\r\n\t ")) - >> subject: quoted_or_nil - >> opt!(is_a!("\r\n\t ")) - >> from: envelope_addresses - >> opt!(is_a!("\r\n\t ")) - >> sender: envelope_addresses - >> opt!(is_a!("\r\n\t ")) - >> reply_to: envelope_addresses - >> opt!(is_a!("\r\n\t ")) - >> to: envelope_addresses - >> opt!(is_a!("\r\n\t ")) - >> cc: envelope_addresses - >> opt!(is_a!("\r\n\t ")) - >> bcc: envelope_addresses - >> opt!(is_a!("\r\n\t ")) - >> in_reply_to: quoted_or_nil - >> opt!(is_a!("\r\n\t ")) - >> message_id: quoted_or_nil - >> opt!(is_a!("\r\n\t ")) - >> tag!(")") - >> ({ - let mut env = Envelope::new(0); - if let Some(date) = date { - env.set_date(&date); - if let Ok(d) = crate::email::parser::generic::date(env.date_as_str().as_bytes()) { - env.set_datetime(d); - } - } - - if let Some(subject) = subject { - env.set_subject(subject.to_vec()); - } - - if let Some(from) = from { - env.set_from(from); - } - if let Some(to) = to { - env.set_to(to); - } - - if let Some(cc) = cc { - env.set_cc(cc); - } - - if let Some(bcc) = bcc { - env.set_bcc(bcc); - } - if let Some(in_reply_to) = in_reply_to { - env.set_in_reply_to(&in_reply_to); - env.push_references(&in_reply_to); - } - - if let Some(message_id) = message_id { - env.set_message_id(&message_id); - } - env - }) - */ } /* Helper to build StrBuilder for Address structs */ @@ -1305,18 +1296,6 @@ pub fn envelope_addresses<'a>( }, map(tag("\"\""), |_| None), ))(input) - /* - alt_complete!(map!(tag!("NIL"), |_| None) | - do_parse!( - tag!("(") - >> envelopes: many1!(delimited!(ws!(tag!("(")), envelope_address, tag!(")"))) - >> tag!(")") - >> ({ - Some(envelopes) - }) - ) - )); - */ } // Parse an address in the format of the ENVELOPE structure eg @@ -1366,24 +1345,6 @@ pub fn envelope_address(input: &[u8]) -> IResult<&[u8], Address> { }, }), )) - /* - do_parse!( - name: alt_complete!(quoted | map!(tag!("NIL"), |_| Vec::new())) - >> is_a!("\r\n\t ") - >> alt_complete!(quoted| map!(tag!("NIL"), |_| Vec::new())) - >> is_a!("\r\n\t ") - >> mailbox_name: dbg_dmp!(alt_complete!(quoted | map!(tag!("NIL"), |_| Vec::new()))) - >> is_a!("\r\n\t ") - >> host_name: alt_complete!(quoted | map!(tag!("NIL"), |_| Vec::new())) - >> ({ - Address::Mailbox(MailboxAddress { - raw: format!("{}{}<{}@{}>", to_str!(&name), if name.is_empty() { "" } else { " " }, to_str!(&mailbox_name), to_str!(&host_name)).into_bytes(), - display_name: str_builder!(0, name.len()), - address_spec: str_builder!(if name.is_empty() { 1 } else { name.len() + 2 }, mailbox_name.len() + host_name.len() + 1), - }) - }) - )); - */ } // Read a literal ie a byte sequence prefixed with a tag containing its length delimited in {}s @@ -1427,9 +1388,6 @@ pub fn quoted(input: &[u8]) -> IResult<&[u8], Vec> { pub fn quoted_or_nil(input: &[u8]) -> IResult<&[u8], Option>> { alt((map(tag("NIL"), |_| None), map(quoted, |v| Some(v))))(input.ltrim()) - /* - alt_complete!(map!(ws!(tag!("NIL")), |_| None) | map!(quoted, |v| Some(v)))); - */ } pub fn uid_fetch_envelopes_response( @@ -1465,28 +1423,6 @@ pub fn uid_fetch_envelopes_response( })) }, )(input) - /* - many0!( - do_parse!( - tag!("* ") - >> take_while!(call!(is_digit)) - >> tag!(" FETCH (") - >> uid_flags: permutation!(preceded!(ws!(tag!("UID ")), map_res!(digit1, |s| { usize::from_str(unsafe { std::str::from_utf8_unchecked(s) }) })), opt!(preceded!(ws!(tag!("FLAGS ")), delimited!(tag!("("), byte_flags, tag!(")"))))) - >> tag!(" ENVELOPE ") - >> env: ws!(envelope) - >> tag!("BODYSTRUCTURE ") - >> bodystructure: take_until!(")\r\n") - >> tag!(")\r\n") - >> ({ - let mut env = env; - let has_attachments = bodystructure_has_attachments(bodystructure); - env.set_has_attachments(has_attachments); - (uid_flags.0, uid_flags.1, env) - }) - ) - ) - ); - */ } pub fn bodystructure_has_attachments(input: &[u8]) -> bool { @@ -1608,3 +1544,10 @@ fn astring_char_tokens(input: &[u8]) -> IResult<&[u8], &[u8]> { // FIXME is_not(" \r\n")(input) } + +pub fn generate_envelope_hash(mailbox_path: &str, uid: &UID) -> EnvelopeHash { + let mut h = DefaultHasher::new(); + h.write_usize(*uid); + h.write(mailbox_path.as_bytes()); + h.finish() +} diff --git a/melib/src/backends/imap/untagged.rs b/melib/src/backends/imap/untagged.rs index 43ae2b20f..3de81b20b 100644 --- a/melib/src/backends/imap/untagged.rs +++ b/melib/src/backends/imap/untagged.rs @@ -21,7 +21,7 @@ use super::{ImapConnection, MailboxSelection}; use crate::backends::imap::protocol_parser::{ - ImapLineSplit, RequiredResponses, UidFetchResponse, UntaggedResponse, + generate_envelope_hash, FetchResponse, ImapLineSplit, RequiredResponses, UntaggedResponse, }; use crate::backends::BackendMailbox; use crate::backends::{ @@ -60,7 +60,7 @@ impl ImapConnection { let mut response = String::with_capacity(8 * 1024); let untagged_response = - match super::protocol_parser::untagged_responses(line.as_bytes()).map(|(_, v)| v) { + match super::protocol_parser::untagged_responses(line).map(|(_, v, _)| v) { Ok(None) | Err(_) => { return Ok(false); } @@ -103,79 +103,79 @@ impl ImapConnection { /* UID FETCH ALL UID, cross-ref, then FETCH difference headers * */ debug!("exists {}", n); - if n > mailbox.exists.lock().unwrap().len() { - try_fail!( - mailbox_hash, - self.send_command( - &[ - b"FETCH", - format!("{}:{}", mailbox.exists.lock().unwrap().len() + 1, n).as_bytes(), - b"(UID FLAGS RFC822)", - ] - .join(&b' '), - ).await - self.read_response(&mut response, RequiredResponses::FETCH_REQUIRED).await - ); - match super::protocol_parser::uid_fetch_responses(&response) { - Ok((_, v, _)) => { - 'fetch_responses: for UidFetchResponse { - uid, flags, body, .. - } in v + try_fail!( + mailbox_hash, + self.send_command(format!("FETCH {} (UID FLAGS RFC822)", n).as_bytes()).await + self.read_response(&mut response, RequiredResponses::FETCH_REQUIRED).await + ); + match super::protocol_parser::fetch_responses(&response) { + Ok((_, v, _)) => { + 'fetch_responses: for FetchResponse { + uid, flags, body, .. + } in v + { + let uid = uid.unwrap(); + if self + .uid_store + .uid_index + .lock() + .unwrap() + .contains_key(&(mailbox_hash, uid)) { - if self - .uid_store + continue 'fetch_responses; + } + let env_hash = generate_envelope_hash(&mailbox.imap_path(), &uid); + self.uid_store + .msn_index + .lock() + .unwrap() + .entry(mailbox_hash) + .or_default() + .push(uid); + if let Ok(mut env) = + Envelope::from_bytes(body.unwrap(), flags.as_ref().map(|&(f, _)| f)) + { + env.set_hash(env_hash); + self.uid_store + .hash_index + .lock() + .unwrap() + .insert(env_hash, (uid, mailbox_hash)); + self.uid_store .uid_index .lock() .unwrap() - .contains_key(&(mailbox_hash, uid)) - { - continue 'fetch_responses; - } - if let Ok(mut env) = Envelope::from_bytes( - body.unwrap(), - flags.as_ref().map(|&(f, _)| f), - ) { - self.uid_store - .hash_index - .lock() - .unwrap() - .insert(env.hash(), (uid, mailbox_hash)); - self.uid_store - .uid_index - .lock() - .unwrap() - .insert((mailbox_hash, uid), env.hash()); - if let Some((_, keywords)) = flags { - let mut tag_lck = self.uid_store.tag_index.write().unwrap(); - for f in keywords { - let hash = tag_hash!(f); - if !tag_lck.contains_key(&hash) { - tag_lck.insert(hash, f); - } - env.labels_mut().push(hash); + .insert((mailbox_hash, uid), env_hash); + if let Some((_, keywords)) = flags { + let mut tag_lck = self.uid_store.tag_index.write().unwrap(); + for f in keywords { + let hash = tag_hash!(f); + if !tag_lck.contains_key(&hash) { + tag_lck.insert(hash, f); } + env.labels_mut().push(hash); } - debug!( - "Create event {} {} {}", - env.hash(), - env.subject(), - mailbox.path(), - ); - if !env.is_seen() { - mailbox.unseen.lock().unwrap().insert_new(env.hash()); - } - mailbox.exists.lock().unwrap().insert_new(env.hash()); - self.add_refresh_event(RefreshEvent { - account_hash: self.uid_store.account_hash, - mailbox_hash, - kind: Create(Box::new(env)), - }); } + debug!( + "Create event {} {} {}", + env.hash(), + env.subject(), + mailbox.path(), + ); + if !env.is_seen() { + mailbox.unseen.lock().unwrap().insert_new(env.hash()); + } + mailbox.exists.lock().unwrap().insert_new(env.hash()); + self.add_refresh_event(RefreshEvent { + account_hash: self.uid_store.account_hash, + mailbox_hash, + kind: Create(Box::new(env)), + }); } } - Err(e) => { - debug!(e); - } + } + Err(e) => { + debug!(e); } } } @@ -202,12 +202,13 @@ impl ImapConnection { self.read_response(&mut response, RequiredResponses::FETCH_REQUIRED).await ); debug!(&response); - match super::protocol_parser::uid_fetch_responses(&response) { + match super::protocol_parser::fetch_responses(&response) { Ok((_, v, _)) => { - for UidFetchResponse { + for FetchResponse { uid, flags, body, .. } in v { + let uid = uid.unwrap(); if !self .uid_store .uid_index @@ -278,69 +279,91 @@ impl ImapConnection { } } } - UntaggedResponse::UIDFetch(uid, flags) => { - debug!("fetch uid {} {:?}", uid, flags); - let lck = self.uid_store.uid_index.lock().unwrap(); - let env_hash = lck.get(&(mailbox_hash, uid)).copied(); - drop(lck); - if let Some(env_hash) = env_hash { - if !flags.0.intersects(crate::email::Flag::SEEN) { - mailbox.unseen.lock().unwrap().insert_new(env_hash); + UntaggedResponse::Fetch(FetchResponse { + uid, + message_sequence_number: msg_seq, + modseq, + flags, + body: _, + envelope: _, + }) => { + if let Some(modseq) = modseq { + if self + .uid_store + .reverse_modseq + .lock() + .unwrap() + .entry(mailbox_hash) + .or_default() + .contains_key(&modseq) + { + return Ok(true); + } + } + + if let Some(flags) = flags { + let uid = if let Some(uid) = uid { + uid } else { - mailbox.unseen.lock().unwrap().remove(env_hash); - } - self.add_refresh_event(RefreshEvent { - account_hash: self.uid_store.account_hash, - mailbox_hash, - kind: NewFlags(env_hash, flags), - }); - }; - } - UntaggedResponse::Fetch(msg_seq, flags) => { - /* a * {msg_seq} FETCH (FLAGS ({flags})) was received, so find out UID from msg_seq - * and send update - */ - debug!("fetch {} {:?}", msg_seq, flags); - try_fail!( - mailbox_hash, - self.send_command( - &[ - b"UID SEARCH", - format!("{}", msg_seq).as_bytes(), - ] - .join(&b' '), - ).await - self.read_response(&mut response, RequiredResponses::SEARCH).await - ); - debug!(&response); - match super::protocol_parser::search_results( - response.split_rn().next().unwrap_or("").as_bytes(), - ) - .map(|(_, v)| v) - { - Ok(mut v) => { - if let Some(uid) = v.pop() { - let lck = self.uid_store.uid_index.lock().unwrap(); - let env_hash = lck.get(&(mailbox_hash, uid)).copied(); - drop(lck); - if let Some(env_hash) = env_hash { - if !flags.0.intersects(crate::email::Flag::SEEN) { - mailbox.unseen.lock().unwrap().insert_new(env_hash); - } else { - mailbox.unseen.lock().unwrap().remove(env_hash); - } - self.add_refresh_event(RefreshEvent { - account_hash: self.uid_store.account_hash, - mailbox_hash, - kind: NewFlags(env_hash, flags), - }); - }; - } - } - Err(e) => { + try_fail!( + mailbox_hash, + self.send_command( + &[ + b"UID SEARCH", + format!("{}", msg_seq).as_bytes(), + ] + .join(&b' '), + ).await + self.read_response(&mut response, RequiredResponses::SEARCH).await + ); debug!(&response); - debug!(e); - } + match super::protocol_parser::search_results( + response.split_rn().next().unwrap_or("").as_bytes(), + ) + .map(|(_, v)| v) + { + Ok(mut v) if v.len() == 1 => v.pop().unwrap(), + Ok(_) => { + return Ok(false); + } + Err(e) => { + debug!(&response); + debug!(e); + return Ok(false); + } + } + }; + debug!("fetch uid {} {:?}", uid, flags); + let lck = self.uid_store.uid_index.lock().unwrap(); + let env_hash = lck.get(&(mailbox_hash, uid)).copied(); + drop(lck); + if let Some(env_hash) = env_hash { + if !flags.0.intersects(crate::email::Flag::SEEN) { + mailbox.unseen.lock().unwrap().insert_new(env_hash); + } else { + mailbox.unseen.lock().unwrap().remove(env_hash); + } + if let Some(modseq) = modseq { + self.uid_store + .reverse_modseq + .lock() + .unwrap() + .entry(mailbox_hash) + .or_default() + .insert(modseq, env_hash); + self.uid_store + .modseq + .lock() + .unwrap() + .insert(env_hash, modseq); + } + + self.add_refresh_event(RefreshEvent { + account_hash: self.uid_store.account_hash, + mailbox_hash, + kind: NewFlags(env_hash, flags), + }); + }; } } } diff --git a/melib/src/backends/imap/watch.rs b/melib/src/backends/imap/watch.rs index d452f24f5..367a10b7b 100644 --- a/melib/src/backends/imap/watch.rs +++ b/melib/src/backends/imap/watch.rs @@ -20,8 +20,6 @@ */ use super::*; use crate::backends::SpecialUsageMailbox; -use crate::email::parser::BytesExt; -use crate::email::parser::BytesIterExt; use std::sync::Arc; /// Arguments for IMAP watching functions @@ -31,25 +29,6 @@ pub struct ImapWatchKit { pub uid_store: Arc, } -macro_rules! exit_on_error { - ($conn:expr, $mailbox_hash:ident, $($result:expr)+) => { - $(if let Err(e) = $result { - *$conn.uid_store.is_online.lock().unwrap() = ( - Instant::now(), - Err(e.clone()), - ); - debug!("failure: {}", e.to_string()); - let account_hash = $conn.uid_store.account_hash; - $conn.add_refresh_event(RefreshEvent { - account_hash, - mailbox_hash: $mailbox_hash, - kind: RefreshEventKind::Failure(e.clone()), - }); - Err(e) - } else { Ok(()) }?;)+ - }; -} - pub async fn poll_with_examine(kit: ImapWatchKit) -> Result<()> { debug!("poll with examine"); let ImapWatchKit { @@ -61,13 +40,13 @@ pub async fn poll_with_examine(kit: ImapWatchKit) -> Result<()> { let mut response = String::with_capacity(8 * 1024); loop { let mailboxes: HashMap = { - let mailboxes_lck = uid_store.mailboxes.lock().await; + let mailboxes_lck = timeout(Duration::from_secs(3), uid_store.mailboxes.lock()).await?; mailboxes_lck.clone() }; for (_, mailbox) in mailboxes { examine_updates(mailbox, &mut conn, &uid_store).await?; } - let mut main_conn = main_conn.lock().await; + let mut main_conn = timeout(Duration::from_secs(3), main_conn.lock()).await?; main_conn.send_command(b"NOOP").await?; main_conn .read_response(&mut response, RequiredResponses::empty()) @@ -95,37 +74,24 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> { { Some(mailbox) => mailbox, None => { - let err = MeliError::new("INBOX mailbox not found in local mailbox index. meli may have not parsed the IMAP mailboxes correctly"); - debug!("failure: {}", err.to_string()); - conn.add_refresh_event(RefreshEvent { - account_hash: uid_store.account_hash, - mailbox_hash: 0, - kind: RefreshEventKind::Failure(err.clone()), - }); - return Err(err); + return Err(MeliError::new("INBOX mailbox not found in local mailbox index. meli may have not parsed the IMAP mailboxes correctly")); } }; let mailbox_hash = mailbox.hash(); - let uidvalidity; let mut response = String::with_capacity(8 * 1024); - exit_on_error!( - conn, - mailbox_hash, - conn.send_command(format!("SELECT \"{}\"", mailbox.imap_path()).as_bytes()) - .await - conn.read_response(&mut response, RequiredResponses::SELECT_REQUIRED) - .await - ); + conn.send_command(format!("SELECT \"{}\"", mailbox.imap_path()).as_bytes()) + .await?; + conn.read_response(&mut response, RequiredResponses::SELECT_REQUIRED) + .await?; debug!("select response {}", &response); { let mut prev_exists = mailbox.exists.lock().unwrap(); match protocol_parser::select_response(&response) { Ok(ok) => { { - uidvalidity = ok.uidvalidity; - let mut uidvalidities = uid_store.uidvalidity.lock().unwrap(); + let uidvalidities = uid_store.uidvalidity.lock().unwrap(); - if let Some(v) = uidvalidities.get_mut(&mailbox_hash) { + if let Some(v) = uidvalidities.get(&mailbox_hash) { if *v != ok.uidvalidity { conn.add_refresh_event(RefreshEvent { account_hash: uid_store.account_hash, @@ -138,7 +104,6 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> { uid_store.hash_index.lock().unwrap().clear(); uid_store.byte_cache.lock().unwrap().clear(); */ - *v = ok.uidvalidity; } } else { conn.add_refresh_event(RefreshEvent { @@ -146,15 +111,11 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> { mailbox_hash, kind: RefreshEventKind::Rescan, }); - conn.add_refresh_event(RefreshEvent { - account_hash: uid_store.account_hash, - mailbox_hash, - kind: RefreshEventKind::Failure(MeliError::new(format!( - "Unknown mailbox: {} {}", - mailbox.path(), - mailbox_hash - ))), - }); + return Err(MeliError::new(format!( + "Unknown mailbox: {} {}", + mailbox.path(), + mailbox_hash + ))); } } debug!(&ok); @@ -165,7 +126,7 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> { } }; } - exit_on_error!(conn, mailbox_hash, conn.send_command(b"IDLE").await); + conn.send_command(b"IDLE").await?; let mut blockn = ImapBlockingConnection::from(conn); let mut beat = std::time::Instant::now(); let mut watch = std::time::Instant::now(); @@ -176,358 +137,50 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> { while let Some(line) = blockn.as_stream().await { let now = std::time::Instant::now(); if now.duration_since(beat) >= _26_MINS { - let mut main_conn_lck = main_conn.lock().await; - exit_on_error!( - blockn.conn, - mailbox_hash, - blockn.conn.send_raw(b"DONE").await - blockn.conn.read_response(&mut response, RequiredResponses::empty()).await - blockn.conn.send_command(b"IDLE").await - main_conn_lck.send_command(b"NOOP").await - main_conn_lck.read_response(&mut response, RequiredResponses::empty()).await - ); + let mut main_conn_lck = timeout(Duration::from_secs(3), main_conn.lock()).await?; + blockn.conn.send_raw(b"DONE").await?; + blockn + .conn + .read_response(&mut response, RequiredResponses::empty()) + .await?; + blockn.conn.send_command(b"IDLE").await?; + main_conn_lck.send_command(b"NOOP").await?; + main_conn_lck + .read_response(&mut response, RequiredResponses::empty()) + .await?; beat = now; } if now.duration_since(watch) >= _5_MINS { /* Time to poll all inboxes */ - let mut conn = main_conn.lock().await; + let mut conn = timeout(Duration::from_secs(3), main_conn.lock()).await?; let mailboxes: HashMap = { - let mailboxes_lck = uid_store.mailboxes.lock().await; + let mailboxes_lck = + timeout(Duration::from_secs(3), uid_store.mailboxes.lock()).await?; mailboxes_lck.clone() }; for (_, mailbox) in mailboxes { - exit_on_error!( - conn, - mailbox_hash, - examine_updates(mailbox, &mut conn, &uid_store).await - ); + examine_updates(mailbox, &mut conn, &uid_store).await?; } watch = now; } - *uid_store.is_online.lock().unwrap() = (Instant::now(), Ok(())); - match protocol_parser::untagged_responses(line.as_slice()) - .map(|(_, v)| v) - .map_err(MeliError::from) { - Ok(Some(Recent(_r))) => { - let mut conn = main_conn.lock().await; - /* UID SEARCH RECENT */ - exit_on_error!( - conn, - mailbox_hash, - conn.examine_mailbox(mailbox_hash, &mut response, false).await - conn.send_command(b"UID SEARCH RECENT").await - conn.read_response(&mut response, RequiredResponses::SEARCH).await - ); - match protocol_parser::search_results_raw(response.as_bytes()) - .map(|(_, v)| v) - .map_err(MeliError::from) - { - Ok(&[]) => { - debug!("UID SEARCH RECENT returned no results"); - } - Ok(v) => { - exit_on_error!( - conn, - mailbox_hash, - conn.send_command( - &[&b"UID FETCH"[..], &v.trim().split(|b| b == &b' ').join(b','), &b"(FLAGS RFC822)"[..]] - .join(&b' '), - ).await - conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED).await - ); - debug!(&response); - match protocol_parser::uid_fetch_responses(&response) { - Ok((_, v, _)) => { - for UidFetchResponse { - uid, flags, body, .. - } in v - { - if !uid_store - .uid_index - .lock() - .unwrap() - .contains_key(&(mailbox_hash, uid)) - { - if let Ok(mut env) = Envelope::from_bytes( - /* unwrap() is safe since we ask for RFC822 in the - * above FETCH, thus uid_fetch_responses() if - * returns a successful parse, it will include the - * RFC822 response */ - body.unwrap(), - flags.as_ref().map(|&(f, _)| f), - ) { - uid_store - .hash_index - .lock() - .unwrap() - .insert(env.hash(), (uid, mailbox_hash)); - uid_store - .uid_index - .lock() - .unwrap() - .insert((mailbox_hash, uid), env.hash()); - debug!( - "Create event {} {} {}", - env.hash(), - env.subject(), - mailbox.path(), - ); - if let Some((_, keywords)) = flags { - let mut tag_lck = - uid_store.tag_index.write().unwrap(); - for f in keywords { - let hash = tag_hash!(f); - if !tag_lck.contains_key(&hash) { - tag_lck.insert(hash, f); - } - env.labels_mut().push(hash); - } - } - if !env.is_seen() { - mailbox - .unseen - .lock() - .unwrap() - .insert_new(env.hash()); - } - if uid_store.cache_headers { - cache::save_envelopes( - uid_store.account_hash, - mailbox_hash, - uidvalidity, - &[(uid, &env)], - )?; - } - mailbox.exists.lock().unwrap().insert_new(env.hash()); - - conn.add_refresh_event(RefreshEvent { - account_hash: uid_store.account_hash, - mailbox_hash, - kind: Create(Box::new(env)), - }); - } - } - } - } - Err(e) => { - debug!(e); - } - } - } - Err(e) => { - debug!( - "UID SEARCH RECENT err: {}\nresp: {}", - e.to_string(), - &response - ); - } - } - } - Ok(Some(Expunge(n))) => { - // The EXPUNGE response reports that the specified message sequence - // number has been permanently removed from the mailbox. The message - // sequence number for each successive message in the mailbox is - // immediately decremented by 1, and this decrement is reflected in - // message sequence numbers in subsequent responses (including other - // untagged EXPUNGE responses). - let mut conn = main_conn.lock().await; - let deleted_uid = uid_store - .msn_index - .lock() - .unwrap() - .entry(mailbox_hash) - .or_default() - .remove(n); - debug!("expunge {}, UID = {}", n, deleted_uid); - let deleted_hash: EnvelopeHash = uid_store - .uid_index - .lock() - .unwrap() - .remove(&(mailbox_hash, deleted_uid)) - .unwrap(); - uid_store.hash_index.lock().unwrap().remove(&deleted_hash); - conn.add_refresh_event(RefreshEvent { - account_hash: uid_store.account_hash, - mailbox_hash, - kind: Remove(deleted_hash), - }); - } - Ok(Some(Exists(n))) => { - let mut conn = main_conn.lock().await; - /* UID FETCH ALL UID, cross-ref, then FETCH difference headers - * */ - debug!("exists {}", n); - if n > mailbox.exists.lock().unwrap().len() { - exit_on_error!( - conn, - mailbox_hash, - conn.examine_mailbox(mailbox_hash, &mut response, false).await - conn.send_command( - &[ - b"FETCH", - format!("{}:{}", mailbox.exists.lock().unwrap().len() + 1, n).as_bytes(), - b"(UID FLAGS RFC822)", - ] - .join(&b' '), - ).await - conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED).await - ); - match protocol_parser::uid_fetch_responses(&response) { - Ok((_, v, _)) => { - 'fetch_responses_b: for UidFetchResponse { - uid, flags, body, .. - } in v - { - if uid_store - .uid_index - .lock() - .unwrap() - .contains_key(&(mailbox_hash, uid)) - { - continue 'fetch_responses_b; - } - if let Ok(mut env) = Envelope::from_bytes( - body.unwrap(), - flags.as_ref().map(|&(f, _)| f), - ) { - uid_store - .hash_index - .lock() - .unwrap() - .insert(env.hash(), (uid, mailbox_hash)); - uid_store - .uid_index - .lock() - .unwrap() - .insert((mailbox_hash, uid), env.hash()); - if let Some((_, keywords)) = flags { - let mut tag_lck = uid_store.tag_index.write().unwrap(); - for f in keywords { - let hash = tag_hash!(f); - if !tag_lck.contains_key(&hash) { - tag_lck.insert(hash, f); - } - env.labels_mut().push(hash); - } - } - debug!( - "Create event {} {} {}", - env.hash(), - env.subject(), - mailbox.path(), - ); - if !env.is_seen() { - mailbox.unseen.lock().unwrap().insert_new(env.hash()); - } - if uid_store.cache_headers { - cache::save_envelopes( - uid_store.account_hash, - mailbox_hash, - uidvalidity, - &[(uid, &env)], - )?; - } - mailbox.exists.lock().unwrap().insert_new(env.hash()); - - conn.add_refresh_event(RefreshEvent { - account_hash: uid_store.account_hash, - mailbox_hash, - kind: Create(Box::new(env)), - }); - } - } - } - Err(e) => { - debug!(e); - } - } - } - } - Ok(Some(UIDFetch(uid, flags))) => { - let res = uid_store - .uid_index - .lock() - .unwrap() - .get(&(mailbox_hash, uid)) - .map(|h| *h); - if let Some(env_hash) = res { - if !flags.0.intersects(crate::email::Flag::SEEN) { - mailbox.unseen.lock().unwrap().insert_new(env_hash); - } else { - mailbox.unseen.lock().unwrap().remove(env_hash); - } - let mut conn = main_conn.lock().await; - conn.add_refresh_event(RefreshEvent { - account_hash: uid_store.account_hash, - mailbox_hash, - kind: NewFlags(env_hash, flags), - }); - } - } - Ok(Some(Fetch(msg_seq, flags))) => { - /* a * {msg_seq} FETCH (FLAGS ({flags})) was received, so find out UID from msg_seq - * and send update - */ - let mut conn = main_conn.lock().await; - debug!("fetch {} {:?}", msg_seq, flags); - exit_on_error!( - conn, - mailbox_hash, - conn.examine_mailbox(mailbox_hash, &mut response, false).await - conn.send_command( - &[ - b"UID SEARCH", - format!("{}", msg_seq).as_bytes(), - ] - .join(&b' '), - ).await - conn.read_response(&mut response, RequiredResponses::SEARCH).await - ); - match search_results(response.split_rn().next().unwrap_or("").as_bytes()) - .map(|(_, v)| v) - { - Ok(mut v) => { - if let Some(uid) = v.pop() { - if let Some(env_hash) = uid_store - .uid_index - .lock() - .unwrap() - .get(&(mailbox_hash, uid)) - { - if !flags.0.intersects(crate::email::Flag::SEEN) { - mailbox.unseen.lock().unwrap().insert_new(*env_hash); - } else { - mailbox.unseen.lock().unwrap().remove(*env_hash); - } - conn.add_refresh_event(RefreshEvent { - account_hash: uid_store.account_hash, - mailbox_hash, - kind: NewFlags(*env_hash, flags), - }); - } - } - } - Err(e) => { - debug!(&response); - debug!(e); - } - } - } - Ok(Some(Bye { .. })) => break, - Ok(None) | Err(_) => {} + let mut conn = timeout(Duration::from_secs(3), main_conn.lock()).await?; + conn.process_untagged(to_str!(&line)).await?; } + *uid_store.is_online.lock().unwrap() = (Instant::now(), Ok(())); } debug!("IDLE connection dropped"); let err: &str = blockn.err().unwrap_or("Unknown reason."); - main_conn.lock().await.add_refresh_event(RefreshEvent { - account_hash: uid_store.account_hash, - mailbox_hash, - kind: RefreshEventKind::Failure(MeliError::new(format!( - "IDLE connection dropped: {}", - &err - ))), - }); + timeout(Duration::from_secs(3), main_conn.lock()) + .await? + .add_refresh_event(RefreshEvent { + account_hash: uid_store.account_hash, + mailbox_hash, + kind: RefreshEventKind::Failure(MeliError::new(format!( + "IDLE connection dropped: {}", + &err + ))), + }); Err(MeliError::new(format!("IDLE connection dropped: {}", err))) } @@ -538,260 +191,170 @@ pub async fn examine_updates( ) -> Result<()> { let mailbox_hash = mailbox.hash(); debug!("examining mailbox {} {}", mailbox_hash, mailbox.path()); - let mut response = String::with_capacity(8 * 1024); - exit_on_error!( - conn, - mailbox_hash, + if let Some(new_envelopes) = conn.resync(mailbox_hash).await? { + for env in new_envelopes { + conn.add_refresh_event(RefreshEvent { + mailbox_hash, + account_hash: uid_store.account_hash, + kind: RefreshEventKind::Create(Box::new(env)), + }); + } + } else { + let mut response = String::with_capacity(8 * 1024); conn.examine_mailbox(mailbox_hash, &mut response, true) - .await - ); - *uid_store.is_online.lock().unwrap() = (Instant::now(), Ok(())); - let uidvalidity; - match protocol_parser::select_response(&response) { - Ok(ok) => { - uidvalidity = ok.uidvalidity; - debug!(&ok); - { - let mut uidvalidities = uid_store.uidvalidity.lock().unwrap(); + .await?; + *uid_store.is_online.lock().unwrap() = (Instant::now(), Ok(())); + let select_response = protocol_parser::select_response(&response) + .chain_err_summary(|| "could not select mailbox")?; + debug!(&select_response); + { + let uidvalidities = uid_store.uidvalidity.lock().unwrap(); - if let Some(v) = uidvalidities.get_mut(&mailbox_hash) { - if *v != ok.uidvalidity { - conn.add_refresh_event(RefreshEvent { - account_hash: uid_store.account_hash, - mailbox_hash, - kind: RefreshEventKind::Rescan, - }); - /* - uid_store.uid_index.lock().unwrap().clear(); - uid_store.hash_index.lock().unwrap().clear(); - uid_store.byte_cache.lock().unwrap().clear(); - */ - *v = ok.uidvalidity; - } - } else { + if let Some(v) = uidvalidities.get(&mailbox_hash) { + if *v != select_response.uidvalidity { + let cache_handle = cache::CacheHandle::get(uid_store.clone())?; + cache_handle.clear( + mailbox_hash, + select_response.uidvalidity, + select_response.highestmodseq.and_then(|i| i.ok()), + )?; conn.add_refresh_event(RefreshEvent { account_hash: uid_store.account_hash, mailbox_hash, kind: RefreshEventKind::Rescan, }); - conn.add_refresh_event(RefreshEvent { - account_hash: uid_store.account_hash, - mailbox_hash, - kind: RefreshEventKind::Failure(MeliError::new(format!( - "Unknown mailbox: {} {}", - mailbox.path(), - mailbox_hash - ))), - }); + /* + uid_store.uid_index.lock().unwrap().clear(); + uid_store.hash_index.lock().unwrap().clear(); + uid_store.byte_cache.lock().unwrap().clear(); + */ + return Ok(()); } - } - let n = ok.exists; - if ok.recent > 0 { - { - /* UID SEARCH RECENT */ - exit_on_error!( - conn, - mailbox_hash, - conn.send_command(b"UID SEARCH RECENT").await - conn.read_response(&mut response, RequiredResponses::SEARCH).await - ); - match protocol_parser::search_results_raw(response.as_bytes()) - .map(|(_, v)| v) - .map_err(MeliError::from) - { - Ok(&[]) => { - debug!("UID SEARCH RECENT returned no results"); - } - Ok(v) => { - exit_on_error!( - conn, - mailbox_hash, - conn.send_command( - &[&b"UID FETCH"[..], &v.trim().split(|b| b == &b' ').join(b','), &b"(FLAGS RFC822)"[..]] - .join(&b' '), - ).await - conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED).await - ); - debug!(&response); - match protocol_parser::uid_fetch_responses(&response) { - Ok((_, v, _)) => { - 'fetch_responses_c: for UidFetchResponse { - uid, - flags, - body, - .. - } in v - { - if uid_store - .uid_index - .lock() - .unwrap() - .contains_key(&(mailbox_hash, uid)) - { - continue 'fetch_responses_c; - } - if let Ok(mut env) = Envelope::from_bytes( - body.unwrap(), - flags.as_ref().map(|&(f, _)| f), - ) { - uid_store - .hash_index - .lock() - .unwrap() - .insert(env.hash(), (uid, mailbox_hash)); - uid_store - .uid_index - .lock() - .unwrap() - .insert((mailbox_hash, uid), env.hash()); - debug!( - "Create event {} {} {}", - env.hash(), - env.subject(), - mailbox.path(), - ); - if let Some((_, keywords)) = flags { - let mut tag_lck = - uid_store.tag_index.write().unwrap(); - for f in keywords { - let hash = tag_hash!(f); - if !tag_lck.contains_key(&hash) { - tag_lck.insert(hash, f); - } - env.labels_mut().push(hash); - } - } - if !env.is_seen() { - mailbox - .unseen - .lock() - .unwrap() - .insert_new(env.hash()); - } - if uid_store.cache_headers { - cache::save_envelopes( - uid_store.account_hash, - mailbox_hash, - uidvalidity, - &[(uid, &env)], - )?; - } - let mut prev_exists = mailbox.exists.lock().unwrap(); - prev_exists.insert_new(env.hash()); - - conn.add_refresh_event(RefreshEvent { - account_hash: uid_store.account_hash, - mailbox_hash, - kind: Create(Box::new(env)), - }); - } - } - } - Err(e) => { - debug!(e); - } - } - } - Err(e) => { - debug!( - "UID SEARCH RECENT err: {}\nresp: {}", - e.to_string(), - &response - ); - } - } - } - } else if n > mailbox.exists.lock().unwrap().len() { - /* UID FETCH ALL UID, cross-ref, then FETCH difference headers - * */ - debug!("exists {}", n); - exit_on_error!( - conn, + } else { + conn.add_refresh_event(RefreshEvent { + account_hash: uid_store.account_hash, mailbox_hash, - conn.send_command( - &[ - b"FETCH", - format!("{}:{}", mailbox.exists.lock().unwrap().len() + 1, n).as_bytes(), - b"(UID FLAGS RFC822)", - ] - .join(&b' '), - ).await - conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED).await - ); - match protocol_parser::uid_fetch_responses(&response) { - Ok((_, v, _)) => { - 'fetch_responses_a: for UidFetchResponse { - uid, flags, body, .. - } in v - { - if uid_store - .uid_index - .lock() - .unwrap() - .contains_key(&(mailbox_hash, uid)) - { - continue 'fetch_responses_a; - } - if let Ok(mut env) = - Envelope::from_bytes(body.unwrap(), flags.as_ref().map(|&(f, _)| f)) - { - uid_store - .hash_index - .lock() - .unwrap() - .insert(env.hash(), (uid, mailbox_hash)); - uid_store - .uid_index - .lock() - .unwrap() - .insert((mailbox_hash, uid), env.hash()); - if let Some((_, keywords)) = flags { - let mut tag_lck = uid_store.tag_index.write().unwrap(); - for f in keywords { - let hash = tag_hash!(f); - if !tag_lck.contains_key(&hash) { - tag_lck.insert(hash, f); - } - env.labels_mut().push(hash); - } - } - debug!( - "Create event {} {} {}", - env.hash(), - env.subject(), - mailbox.path(), - ); - if !env.is_seen() { - mailbox.unseen.lock().unwrap().insert_new(env.hash()); - } - if uid_store.cache_headers { - cache::save_envelopes( - uid_store.account_hash, - mailbox_hash, - uidvalidity, - &[(uid, &env)], - )?; - } - mailbox.exists.lock().unwrap().insert_new(env.hash()); - - conn.add_refresh_event(RefreshEvent { - account_hash: uid_store.account_hash, - mailbox_hash, - kind: Create(Box::new(env)), - }); - } - } - } - Err(e) => { - debug!(e); - } - } + kind: RefreshEventKind::Rescan, + }); + return Err(MeliError::new(format!( + "Unknown mailbox: {} {}", + mailbox.path(), + mailbox_hash + ))); } } - Err(e) => { - debug!("{:?}", e); - return Err(e).chain_err_summary(|| "could not select mailbox"); + let mut cache_handle = cache::CacheHandle::get(uid_store.clone())?; + if debug!(select_response.recent > 0) { + /* UID SEARCH RECENT */ + conn.send_command(b"UID SEARCH RECENT").await?; + conn.read_response(&mut response, RequiredResponses::SEARCH) + .await?; + let v = protocol_parser::search_results(response.as_bytes()).map(|(_, v)| v)?; + if v.is_empty() { + debug!("search response was empty: {}", response); + return Ok(()); + } + let mut cmd = "UID FETCH ".to_string(); + if v.len() == 1 { + cmd.push_str(&v[0].to_string()); + } else { + cmd.push_str(&v[0].to_string()); + for n in v.into_iter().skip(1) { + cmd.push(','); + cmd.push_str(&n.to_string()); + } + } + cmd.push_str(" (UID FLAGS RFC822)"); + conn.send_command(cmd.as_bytes()).await?; + conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) + .await?; + } else if debug!(select_response.exists > mailbox.exists.lock().unwrap().len()) { + conn.send_command( + format!( + "FETCH {}:* (UID FLAGS RFC822)", + mailbox.exists.lock().unwrap().len() + ) + .as_bytes(), + ) + .await?; + conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) + .await?; + } else { + return Ok(()); } - }; + debug!(&response); + let (_, mut v, _) = protocol_parser::fetch_responses(&response)?; + for FetchResponse { + ref uid, + ref mut flags, + ref mut body, + ref mut envelope, + .. + } in v.iter_mut() + { + let uid = uid.unwrap(); + *envelope = Envelope::from_bytes(body.take().unwrap(), flags.as_ref().map(|&(f, _)| f)) + .map(|mut env| { + env.set_hash(generate_envelope_hash(&mailbox.imap_path(), &uid)); + if let Some((_, keywords)) = flags.take() { + let mut tag_lck = uid_store.tag_index.write().unwrap(); + for f in keywords { + let hash = tag_hash!(f); + if !tag_lck.contains_key(&hash) { + tag_lck.insert(hash, f); + } + env.labels_mut().push(hash); + } + } + env + }) + .map_err(|err| { + debug!("uid {} envelope parse error {}", uid, &err); + err + }) + .ok(); + } + if uid_store.keep_offline_cache { + cache_handle.insert_envelopes(mailbox_hash, &v)?; + } + 'fetch_responses_c: for FetchResponse { uid, envelope, .. } in v { + let uid = uid.unwrap(); + if uid_store + .uid_index + .lock() + .unwrap() + .contains_key(&(mailbox_hash, uid)) + { + continue 'fetch_responses_c; + } + if let Some(env) = envelope { + uid_store + .hash_index + .lock() + .unwrap() + .insert(env.hash(), (uid, mailbox_hash)); + uid_store + .uid_index + .lock() + .unwrap() + .insert((mailbox_hash, uid), env.hash()); + debug!( + "Create event {} {} {}", + env.hash(), + env.subject(), + mailbox.path(), + ); + if !env.is_seen() { + mailbox.unseen.lock().unwrap().insert_new(env.hash()); + } + mailbox.exists.lock().unwrap().insert_new(env.hash()); + conn.add_refresh_event(RefreshEvent { + account_hash: uid_store.account_hash, + mailbox_hash, + kind: Create(Box::new(env)), + }); + } + } + } Ok(()) } diff --git a/melib/src/sqlite3.rs b/melib/src/sqlite3.rs index 5eb640788..b001aae13 100644 --- a/melib/src/sqlite3.rs +++ b/melib/src/sqlite3.rs @@ -19,7 +19,8 @@ * along with meli. If not, see . */ -use crate::{error::*, logging::log}; +use crate::{error::*, logging::log, Envelope}; +use rusqlite::types::{FromSql, FromSqlError, FromSqlResult, ToSql, ToSqlOutput}; pub use rusqlite::{self, params, Connection}; use std::path::PathBuf; @@ -94,3 +95,20 @@ pub fn open_or_create_db( Ok(conn) } + +impl ToSql for Envelope { + fn to_sql(&self) -> rusqlite::Result { + let v: Vec = bincode::serialize(self).map_err(|e| { + rusqlite::Error::ToSqlConversionFailure(Box::new(MeliError::new(e.to_string()))) + })?; + Ok(ToSqlOutput::from(v)) + } +} + +impl FromSql for Envelope { + fn column_result(value: rusqlite::types::ValueRef) -> FromSqlResult { + let b: Vec = FromSql::column_result(value)?; + Ok(bincode::deserialize(&b) + .map_err(|e| FromSqlError::Other(Box::new(MeliError::new(e.to_string()))))?) + } +}