From f9ce5327c23e3a2bcf661baaa35145cc7aac5880 Mon Sep 17 00:00:00 2001 From: Manos Pitsidianakis Date: Tue, 24 Nov 2020 00:23:45 +0200 Subject: [PATCH] melib/imap: fix some LazyCountSet logic errors in sync --- melib/src/backends.rs | 29 +++++++++++---- melib/src/backends/imap.rs | 53 +++++++++++---------------- melib/src/backends/imap/cache.rs | 26 ------------- melib/src/backends/imap/cache/sync.rs | 26 +++++++------ melib/src/backends/imap/connection.rs | 7 +--- 5 files changed, 59 insertions(+), 82 deletions(-) diff --git a/melib/src/backends.rs b/melib/src/backends.rs index d64a008df..8e8e8f81f 100644 --- a/melib/src/backends.rs +++ b/melib/src/backends.rs @@ -614,12 +614,22 @@ impl EnvelopeHashBatch { } } -#[derive(Debug, Default, Clone)] +#[derive(Default, Clone)] pub struct LazyCountSet { not_yet_seen: usize, set: BTreeSet, } +impl fmt::Debug for LazyCountSet { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("LazyCountSet") + .field("not_yet_seen", &self.not_yet_seen) + .field("set", &self.set.len()) + .field("total_len", &self.len()) + .finish() + } +} + impl LazyCountSet { pub fn set_not_yet_seen(&mut self, new_val: usize) { self.not_yet_seen = new_val; @@ -629,19 +639,21 @@ impl LazyCountSet { if self.not_yet_seen == 0 { false } else { - self.not_yet_seen -= 1; + if !self.set.contains(&new_val) { + self.not_yet_seen -= 1; + } self.set.insert(new_val); true } } pub fn insert_existing_set(&mut self, set: BTreeSet) -> bool { - debug!("insert_existing_set {:?}", &set); if self.not_yet_seen < set.len() { false } else { - self.not_yet_seen -= set.len(); + let old_len = self.set.len(); self.set.extend(set.into_iter()); + self.not_yet_seen -= self.set.len() - old_len; true } } @@ -662,21 +674,24 @@ impl LazyCountSet { } pub fn insert_set(&mut self, set: BTreeSet) { - debug!("insert__set {:?}", &set); self.set.extend(set.into_iter()); } - pub fn remove(&mut self, new_val: EnvelopeHash) -> bool { - self.set.remove(&new_val) + pub fn remove(&mut self, env_hash: EnvelopeHash) -> bool { + self.set.remove(&env_hash) } } #[test] fn test_lazy_count_set() { let mut new = LazyCountSet::default(); + assert_eq!(new.len(), 0); new.set_not_yet_seen(10); + assert_eq!(new.len(), 10); for i in 0..10 { assert!(new.insert_existing(i)); } + assert_eq!(new.len(), 10); assert!(!new.insert_existing(10)); + assert_eq!(new.len(), 10); } diff --git a/melib/src/backends/imap.rs b/melib/src/backends/imap.rs index ed53521c0..33208c6aa 100644 --- a/melib/src/backends/imap.rs +++ b/melib/src/backends/imap.rs @@ -1602,7 +1602,7 @@ async fn fetch_hlpr(state: &mut FetchState) -> Result> { let f = &state.uid_store.mailboxes.lock().await[&state.mailbox_hash]; (f.exists.clone(), f.unseen.clone()) }; - unseen.lock().unwrap().insert_existing_set( + unseen.lock().unwrap().insert_set( cached_payload .iter() .filter_map(|env| { @@ -1614,9 +1614,10 @@ async fn fetch_hlpr(state: &mut FetchState) -> Result> { }) .collect(), ); - mailbox_exists.lock().unwrap().insert_existing_set( - cached_payload.iter().map(|env| env.hash()).collect::<_>(), - ); + mailbox_exists + .lock() + .unwrap() + .insert_set(cached_payload.iter().map(|env| env.hash()).collect::<_>()); return Ok(cached_payload); } } @@ -1656,26 +1657,21 @@ async fn fetch_hlpr(state: &mut FetchState) -> Result> { return Ok(Vec::new()); } let mut conn = connection.lock().await; - debug!("locked for fetch {}", mailbox_path); let mut response = Vec::with_capacity(8 * 1024); let max_uid_left = max_uid; let chunk_size = 250; - let mut payload = vec![]; + let mut envelopes = Vec::with_capacity(chunk_size); conn.examine_mailbox(mailbox_hash, &mut response, false) .await?; if max_uid_left > 0 { - let mut envelopes = vec![]; debug!("{} max_uid_left= {}", mailbox_hash, max_uid_left); let command = if max_uid_left == 1 { "UID FETCH 1 (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] BODYSTRUCTURE)".to_string() } else { format!( "UID FETCH {}:{} (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] BODYSTRUCTURE)", - std::cmp::max( std::cmp::max(max_uid_left.saturating_sub(chunk_size), 1), - 1 - ), max_uid_left ) }; @@ -1689,13 +1685,13 @@ async fn fetch_hlpr(state: &mut FetchState) -> Result> { mailbox_path ) })?; - debug!( - "fetch response is {} bytes and {} lines", - response.len(), - String::from_utf8_lossy(&response).lines().count() - ); let (_, mut v, _) = protocol_parser::fetch_responses(&response)?; - debug!("responses len is {}", v.len()); + debug!( + "fetch response is {} bytes and {} lines and has {} parsed Envelopes", + response.len(), + String::from_utf8_lossy(&response).lines().count(), + v.len() + ); for FetchResponse { ref uid, ref mut envelope, @@ -1734,10 +1730,10 @@ async fn fetch_hlpr(state: &mut FetchState) -> Result> { } let mut tag_lck = uid_store.tag_index.write().unwrap(); if let Some((flags, keywords)) = flags { - if !flags.intersects(Flag::SEEN) { + env.set_flags(*flags); + if !env.is_seen() { our_unseen.insert(env.hash()); } - env.set_flags(*flags); for f in keywords { let hash = tag_hash!(f); if !tag_lck.contains_key(&hash) { @@ -1799,30 +1795,25 @@ async fn fetch_hlpr(state: &mut FetchState) -> Result> { .lock() .unwrap() .insert((mailbox_hash, uid), env.hash()); - envelopes.push((uid, env)); + envelopes.push(env); } - debug!("sending payload for {}", mailbox_hash); - unseen + unseen.lock().unwrap().insert_existing_set(our_unseen); + mailbox_exists .lock() .unwrap() - .insert_existing_set(our_unseen.iter().cloned().collect()); - mailbox_exists.lock().unwrap().insert_existing_set( - envelopes.iter().map(|(_, env)| env.hash()).collect::<_>(), - ); + .insert_existing_set(envelopes.iter().map(|env| env.hash()).collect::<_>()); drop(conn); - payload.extend(envelopes.into_iter().map(|(_, env)| env)); } if max_uid_left <= 1 { + unseen.lock().unwrap().set_not_yet_seen(0); + mailbox_exists.lock().unwrap().set_not_yet_seen(0); *stage = FetchStage::Finished; } else { *stage = FetchStage::FreshFetch { - max_uid: std::cmp::max( - std::cmp::max(max_uid_left.saturating_sub(chunk_size), 1), - 1, - ), + max_uid: std::cmp::max(max_uid_left.saturating_sub(chunk_size + 1), 1), }; } - return Ok(payload); + return Ok(envelopes); } FetchStage::Finished => { return Ok(vec![]); diff --git a/melib/src/backends/imap/cache.rs b/melib/src/backends/imap/cache.rs index 338e1e44d..e0d4be064 100644 --- a/melib/src/backends/imap/cache.rs +++ b/melib/src/backends/imap/cache.rs @@ -665,33 +665,7 @@ pub(super) async fn fetch_cached_envs(state: &mut FetchState) -> Result return Ok(None), Some(Ok(env_hashes)) => { - uid_store - .mailboxes - .lock() - .await - .entry(mailbox_hash) - .and_modify(|entry| { - entry - .exists - .lock() - .unwrap() - .insert_set(env_hashes.iter().cloned().collect()); - let env_lck = uid_store.envelopes.lock().unwrap(); - entry.unseen.lock().unwrap().insert_set( - env_hashes - .iter() - .filter_map(|h| { - if !env_lck[h].inner.is_seen() { - Some(*h) - } else { - None - } - }) - .collect(), - ); - }); let env_lck = uid_store.envelopes.lock().unwrap(); - return Ok(Some( env_hashes .into_iter() diff --git a/melib/src/backends/imap/cache/sync.rs b/melib/src/backends/imap/cache/sync.rs index c26629e00..b91cee088 100644 --- a/melib/src/backends/imap/cache/sync.rs +++ b/melib/src/backends/imap/cache/sync.rs @@ -237,11 +237,11 @@ impl ImapConnection { unseen .lock() .unwrap() - .insert_existing_set(new_unseen.iter().cloned().collect()); + .insert_set(new_unseen.iter().cloned().collect()); mailbox_exists .lock() .unwrap() - .insert_existing_set(payload.iter().map(|(_, env)| env.hash()).collect::<_>()); + .insert_set(payload.iter().map(|(_, env)| env.hash()).collect::<_>()); // 3. tag2 UID FETCH 1: FLAGS if max_uid == 0 { self.send_command("UID FETCH 1:* FLAGS".as_bytes()).await?; @@ -535,11 +535,11 @@ impl ImapConnection { unseen .lock() .unwrap() - .insert_existing_set(new_unseen.iter().cloned().collect()); + .insert_set(new_unseen.iter().cloned().collect()); mailbox_exists .lock() .unwrap() - .insert_existing_set(payload.iter().map(|(_, env)| env.hash()).collect::<_>()); + .insert_set(payload.iter().map(|(_, env)| env.hash()).collect::<_>()); // 3. tag2 UID FETCH 1: FLAGS if cached_max_uid == 0 { self.send_command( @@ -700,14 +700,16 @@ impl ImapConnection { permissions.set_flags = !select_response.read_only; permissions.rename_messages = !select_response.read_only; permissions.delete_messages = !select_response.read_only; - mailbox_exists - .lock() - .unwrap() - .set_not_yet_seen(select_response.exists); - unseen - .lock() - .unwrap() - .set_not_yet_seen(select_response.unseen); + { + let mut mailbox_exists_lck = mailbox_exists.lock().unwrap(); + mailbox_exists_lck.clear(); + mailbox_exists_lck.set_not_yet_seen(select_response.exists); + } + { + let mut unseen_lck = unseen.lock().unwrap(); + unseen_lck.clear(); + unseen_lck.set_not_yet_seen(select_response.unseen); + } } if select_response.exists == 0 { return Ok(select_response); diff --git a/melib/src/backends/imap/connection.rs b/melib/src/backends/imap/connection.rs index 27f3edce6..69aa622a0 100644 --- a/melib/src/backends/imap/connection.rs +++ b/melib/src/backends/imap/connection.rs @@ -1017,15 +1017,10 @@ impl ImapConnection { .await?; self.read_response(&mut response, RequiredResponses::SEARCH) .await?; - debug!("uid search response {:?}", &response); let mut msn_index_lck = self.uid_store.msn_index.lock().unwrap(); let msn_index = msn_index_lck.entry(mailbox_hash).or_default(); let _ = msn_index.drain(low - 1..); - msn_index.extend( - debug!(protocol_parser::search_results(&response))? - .1 - .into_iter(), - ); + msn_index.extend(protocol_parser::search_results(&response)?.1.into_iter()); Ok(()) } }