Browse Source

melib/imap: fix some LazyCountSet logic errors in sync

jmap-eventsource
Manos Pitsidianakis 1 year ago
parent
commit
f9ce5327c2
Signed by untrusted user: epilys GPG Key ID: 73627C2F690DF710
  1. 29
      melib/src/backends.rs
  2. 49
      melib/src/backends/imap.rs
  3. 26
      melib/src/backends/imap/cache.rs
  4. 26
      melib/src/backends/imap/cache/sync.rs
  5. 7
      melib/src/backends/imap/connection.rs

29
melib/src/backends.rs

@ -614,12 +614,22 @@ impl EnvelopeHashBatch {
}
}
#[derive(Debug, Default, Clone)]
#[derive(Default, Clone)]
pub struct LazyCountSet {
not_yet_seen: usize,
set: BTreeSet<EnvelopeHash>,
}
impl fmt::Debug for LazyCountSet {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("LazyCountSet")
.field("not_yet_seen", &self.not_yet_seen)
.field("set", &self.set.len())
.field("total_len", &self.len())
.finish()
}
}
impl LazyCountSet {
pub fn set_not_yet_seen(&mut self, new_val: usize) {
self.not_yet_seen = new_val;
@ -629,19 +639,21 @@ impl LazyCountSet {
if self.not_yet_seen == 0 {
false
} else {
self.not_yet_seen -= 1;
if !self.set.contains(&new_val) {
self.not_yet_seen -= 1;
}
self.set.insert(new_val);
true
}
}
pub fn insert_existing_set(&mut self, set: BTreeSet<EnvelopeHash>) -> bool {
debug!("insert_existing_set {:?}", &set);
if self.not_yet_seen < set.len() {
false
} else {
self.not_yet_seen -= set.len();
let old_len = self.set.len();
self.set.extend(set.into_iter());
self.not_yet_seen -= self.set.len() - old_len;
true
}
}
@ -662,21 +674,24 @@ impl LazyCountSet {
}
pub fn insert_set(&mut self, set: BTreeSet<EnvelopeHash>) {
debug!("insert__set {:?}", &set);
self.set.extend(set.into_iter());
}
pub fn remove(&mut self, new_val: EnvelopeHash) -> bool {
self.set.remove(&new_val)
pub fn remove(&mut self, env_hash: EnvelopeHash) -> bool {
self.set.remove(&env_hash)
}
}
#[test]
fn test_lazy_count_set() {
let mut new = LazyCountSet::default();
assert_eq!(new.len(), 0);
new.set_not_yet_seen(10);
assert_eq!(new.len(), 10);
for i in 0..10 {
assert!(new.insert_existing(i));
}
assert_eq!(new.len(), 10);
assert!(!new.insert_existing(10));
assert_eq!(new.len(), 10);
}

49
melib/src/backends/imap.rs

@ -1602,7 +1602,7 @@ async fn fetch_hlpr(state: &mut FetchState) -> Result<Vec<Envelope>> {
let f = &state.uid_store.mailboxes.lock().await[&state.mailbox_hash];
(f.exists.clone(), f.unseen.clone())
};
unseen.lock().unwrap().insert_existing_set(
unseen.lock().unwrap().insert_set(
cached_payload
.iter()
.filter_map(|env| {
@ -1614,9 +1614,10 @@ async fn fetch_hlpr(state: &mut FetchState) -> Result<Vec<Envelope>> {
})
.collect(),
);
mailbox_exists.lock().unwrap().insert_existing_set(
cached_payload.iter().map(|env| env.hash()).collect::<_>(),
);
mailbox_exists
.lock()
.unwrap()
.insert_set(cached_payload.iter().map(|env| env.hash()).collect::<_>());
return Ok(cached_payload);
}
}
@ -1656,26 +1657,21 @@ async fn fetch_hlpr(state: &mut FetchState) -> Result<Vec<Envelope>> {
return Ok(Vec::new());
}
let mut conn = connection.lock().await;
debug!("locked for fetch {}", mailbox_path);
let mut response = Vec::with_capacity(8 * 1024);
let max_uid_left = max_uid;
let chunk_size = 250;
let mut payload = vec![];
let mut envelopes = Vec::with_capacity(chunk_size);
conn.examine_mailbox(mailbox_hash, &mut response, false)
.await?;
if max_uid_left > 0 {
let mut envelopes = vec![];
debug!("{} max_uid_left= {}", mailbox_hash, max_uid_left);
let command = if max_uid_left == 1 {
"UID FETCH 1 (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] BODYSTRUCTURE)".to_string()
} else {
format!(
"UID FETCH {}:{} (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] BODYSTRUCTURE)",
std::cmp::max(
std::cmp::max(max_uid_left.saturating_sub(chunk_size), 1),
1
),
max_uid_left
)
};
@ -1689,13 +1685,13 @@ async fn fetch_hlpr(state: &mut FetchState) -> Result<Vec<Envelope>> {
mailbox_path
)
})?;
let (_, mut v, _) = protocol_parser::fetch_responses(&response)?;
debug!(
"fetch response is {} bytes and {} lines",
"fetch response is {} bytes and {} lines and has {} parsed Envelopes",
response.len(),
String::from_utf8_lossy(&response).lines().count()
String::from_utf8_lossy(&response).lines().count(),
v.len()
);
let (_, mut v, _) = protocol_parser::fetch_responses(&response)?;
debug!("responses len is {}", v.len());
for FetchResponse {
ref uid,
ref mut envelope,
@ -1734,10 +1730,10 @@ async fn fetch_hlpr(state: &mut FetchState) -> Result<Vec<Envelope>> {
}
let mut tag_lck = uid_store.tag_index.write().unwrap();
if let Some((flags, keywords)) = flags {
if !flags.intersects(Flag::SEEN) {
env.set_flags(*flags);
if !env.is_seen() {
our_unseen.insert(env.hash());
}
env.set_flags(*flags);
for f in keywords {
let hash = tag_hash!(f);
if !tag_lck.contains_key(&hash) {
@ -1799,30 +1795,25 @@ async fn fetch_hlpr(state: &mut FetchState) -> Result<Vec<Envelope>> {
.lock()
.unwrap()
.insert((mailbox_hash, uid), env.hash());
envelopes.push((uid, env));
envelopes.push(env);
}
debug!("sending payload for {}", mailbox_hash);
unseen
unseen.lock().unwrap().insert_existing_set(our_unseen);
mailbox_exists
.lock()
.unwrap()
.insert_existing_set(our_unseen.iter().cloned().collect());
mailbox_exists.lock().unwrap().insert_existing_set(
envelopes.iter().map(|(_, env)| env.hash()).collect::<_>(),
);
.insert_existing_set(envelopes.iter().map(|env| env.hash()).collect::<_>());
drop(conn);
payload.extend(envelopes.into_iter().map(|(_, env)| env));
}
if max_uid_left <= 1 {
unseen.lock().unwrap().set_not_yet_seen(0);
mailbox_exists.lock().unwrap().set_not_yet_seen(0);
*stage = FetchStage::Finished;
} else {
*stage = FetchStage::FreshFetch {
max_uid: std::cmp::max(
std::cmp::max(max_uid_left.saturating_sub(chunk_size), 1),
1,
),
max_uid: std::cmp::max(max_uid_left.saturating_sub(chunk_size + 1), 1),
};
}
return Ok(payload);
return Ok(envelopes);
}
FetchStage::Finished => {
return Ok(vec![]);

26
melib/src/backends/imap/cache.rs

@ -665,33 +665,7 @@ pub(super) async fn fetch_cached_envs(state: &mut FetchState) -> Result<Option<V
match debug!(conn.load_cache(mailbox_hash).await) {
None => return Ok(None),
Some(Ok(env_hashes)) => {
uid_store
.mailboxes
.lock()
.await
.entry(mailbox_hash)
.and_modify(|entry| {
entry
.exists
.lock()
.unwrap()
.insert_set(env_hashes.iter().cloned().collect());
let env_lck = uid_store.envelopes.lock().unwrap();
entry.unseen.lock().unwrap().insert_set(
env_hashes
.iter()
.filter_map(|h| {
if !env_lck[h].inner.is_seen() {
Some(*h)
} else {
None
}
})
.collect(),
);
});
let env_lck = uid_store.envelopes.lock().unwrap();
return Ok(Some(
env_hashes
.into_iter()

26
melib/src/backends/imap/cache/sync.rs

@ -237,11 +237,11 @@ impl ImapConnection {
unseen
.lock()
.unwrap()
.insert_existing_set(new_unseen.iter().cloned().collect());
.insert_set(new_unseen.iter().cloned().collect());
mailbox_exists
.lock()
.unwrap()
.insert_existing_set(payload.iter().map(|(_, env)| env.hash()).collect::<_>());
.insert_set(payload.iter().map(|(_, env)| env.hash()).collect::<_>());
// 3. tag2 UID FETCH 1:<lastseenuid> FLAGS
if max_uid == 0 {
self.send_command("UID FETCH 1:* FLAGS".as_bytes()).await?;
@ -535,11 +535,11 @@ impl ImapConnection {
unseen
.lock()
.unwrap()
.insert_existing_set(new_unseen.iter().cloned().collect());
.insert_set(new_unseen.iter().cloned().collect());
mailbox_exists
.lock()
.unwrap()
.insert_existing_set(payload.iter().map(|(_, env)| env.hash()).collect::<_>());
.insert_set(payload.iter().map(|(_, env)| env.hash()).collect::<_>());
// 3. tag2 UID FETCH 1:<lastseenuid> FLAGS
if cached_max_uid == 0 {
self.send_command(
@ -700,14 +700,16 @@ impl ImapConnection {
permissions.set_flags = !select_response.read_only;
permissions.rename_messages = !select_response.read_only;
permissions.delete_messages = !select_response.read_only;
mailbox_exists
.lock()
.unwrap()
.set_not_yet_seen(select_response.exists);
unseen
.lock()
.unwrap()
.set_not_yet_seen(select_response.unseen);
{
let mut mailbox_exists_lck = mailbox_exists.lock().unwrap();
mailbox_exists_lck.clear();
mailbox_exists_lck.set_not_yet_seen(select_response.exists);
}
{
let mut unseen_lck = unseen.lock().unwrap();
unseen_lck.clear();
unseen_lck.set_not_yet_seen(select_response.unseen);
}
}
if select_response.exists == 0 {
return Ok(select_response);

7
melib/src/backends/imap/connection.rs

@ -1017,15 +1017,10 @@ impl ImapConnection {
.await?;
self.read_response(&mut response, RequiredResponses::SEARCH)
.await?;
debug!("uid search response {:?}", &response);
let mut msn_index_lck = self.uid_store.msn_index.lock().unwrap();
let msn_index = msn_index_lck.entry(mailbox_hash).or_default();
let _ = msn_index.drain(low - 1..);
msn_index.extend(
debug!(protocol_parser::search_results(&response))?
.1
.into_iter(),
);
msn_index.extend(protocol_parser::search_results(&response)?.1.into_iter());
Ok(())
}
}

Loading…
Cancel
Save