From 00f5c4b9c03a86727193e49d9a47d4d2f4022059 Mon Sep 17 00:00:00 2001 From: Manos Pitsidianakis Date: Tue, 20 Oct 2020 23:27:10 +0300 Subject: [PATCH] melib/maildir: split parsing into big chunks --- melib/src/backends/maildir/backend.rs | 17 ++-- melib/src/backends/maildir/stream.rs | 130 ++++++++++++-------------- 2 files changed, 65 insertions(+), 82 deletions(-) diff --git a/melib/src/backends/maildir/backend.rs b/melib/src/backends/maildir/backend.rs index c5b5d96f..de0ba249 100644 --- a/melib/src/backends/maildir/backend.rs +++ b/melib/src/backends/maildir/backend.rs @@ -141,15 +141,6 @@ macro_rules! get_path_hash { } pub(super) fn get_file_hash(file: &Path) -> EnvelopeHash { - /* - let mut buf = Vec::with_capacity(2048); - let mut f = fs::File::open(&file).unwrap_or_else(|_| panic!("Can't open {}", file.display())); - f.read_to_end(&mut buf) - .unwrap_or_else(|_| panic!("Can't read {}", file.display())); - let mut hasher = DefaultHasher::default(); - hasher.write(&buf); - hasher.finish() - */ let mut hasher = DefaultHasher::default(); file.hash(&mut hasher); hasher.finish() @@ -546,9 +537,13 @@ impl MailBackend for MaildirType { }); continue; } - *mailbox_counts[&mailbox_hash].1.lock().unwrap() -= 1; + { + let mut lck = mailbox_counts[&mailbox_hash].1.lock().unwrap(); + *lck = lck.saturating_sub(1); + } if !pathbuf.flags().contains(Flag::SEEN) { - *mailbox_counts[&mailbox_hash].0.lock().unwrap() -= 1; + let mut lck = mailbox_counts[&mailbox_hash].0.lock().unwrap(); + *lck = lck.saturating_sub(1); } index_lock.entry(hash).and_modify(|e| { diff --git a/melib/src/backends/maildir/stream.rs b/melib/src/backends/maildir/stream.rs index be750f72..6a674d37 100644 --- a/melib/src/backends/maildir/stream.rs +++ b/melib/src/backends/maildir/stream.rs @@ -50,6 +50,7 @@ impl MaildirStream { map: HashIndexes, mailbox_index: Arc>>, ) -> Result>> + Send + 'static>>> { + let chunk_size = 2048; path.push("new"); for d in path.read_dir()? { if let Ok(p) = d { @@ -57,7 +58,6 @@ impl MaildirStream { } } path.pop(); - path.push("cur"); let iter = path.read_dir()?; let count = path.read_dir()?.count(); @@ -70,16 +70,9 @@ impl MaildirStream { files.push(e); } let payloads = Box::pin(if !files.is_empty() { - let cores = 4_usize; - let chunk_size = if count / cores > 0 { - count / cores - } else { - count - }; files .chunks(chunk_size) .map(|chunk| { - //Self::chunk(chunk, name, mailbox_hash, unseen, total, path, root_path, map, mailbox_index)}) let cache_dir = xdg::BaseDirectories::with_profile("meli", &name).unwrap(); Box::pin(Self::chunk( SmallVec::from(chunk), @@ -109,80 +102,76 @@ impl MaildirStream { map: HashIndexes, mailbox_index: Arc>>, ) -> Result> { - let len = chunk.len(); - let size = if len <= 100 { 100 } else { (len / 100) * 100 }; let mut local_r: Vec = Vec::with_capacity(chunk.len()); + let mut unseen_total: usize = 0; let mut buf = Vec::with_capacity(4096); - for c in chunk.chunks(size) { - let map = map.clone(); - for file in c { - /* Check if we have a cache file with this email's - * filename */ - let file_name = PathBuf::from(file) - .strip_prefix(&root_path) - .unwrap() - .to_path_buf(); - if let Some(cached) = cache_dir.find_cache_file(&file_name) { - /* Cached struct exists, try to load it */ - let reader = io::BufReader::new(fs::File::open(&cached).unwrap()); - let result: result::Result = bincode::deserialize_from(reader); - if let Ok(env) = result { - let mut map = map.lock().unwrap(); - let map = map.entry(mailbox_hash).or_default(); - let hash = env.hash(); - map.insert(hash, file.clone().into()); - mailbox_index.lock().unwrap().insert(hash, mailbox_hash); - if !env.is_seen() { - *unseen.lock().unwrap() += 1; - } - *total.lock().unwrap() += 1; - local_r.push(env); - continue; - } - }; - let env_hash = get_file_hash(file); - { + for file in chunk { + /* Check if we have a cache file with this email's + * filename */ + let file_name = PathBuf::from(&file) + .strip_prefix(&root_path) + .unwrap() + .to_path_buf(); + if let Some(cached) = cache_dir.find_cache_file(&file_name) { + /* Cached struct exists, try to load it */ + let reader = io::BufReader::new(fs::File::open(&cached)?); + let result: result::Result = bincode::deserialize_from(reader); + if let Ok(env) = result { let mut map = map.lock().unwrap(); let map = map.entry(mailbox_hash).or_default(); - (*map).insert(env_hash, PathBuf::from(file).into()); + let hash = env.hash(); + map.insert(hash, file.clone().into()); + mailbox_index.lock().unwrap().insert(hash, mailbox_hash); + if !env.is_seen() { + unseen_total += 1; + } + local_r.push(env); + continue; } - let mut reader = io::BufReader::new(fs::File::open(&file)?); - buf.clear(); - reader.read_to_end(&mut buf)?; - match Envelope::from_bytes(buf.as_slice(), Some(file.flags())) { - Ok(mut env) => { - env.set_hash(env_hash); - mailbox_index.lock().unwrap().insert(env_hash, mailbox_hash); - if let Ok(cached) = cache_dir.place_cache_file(file_name) { - /* place result in cache directory */ - let f = fs::File::create(cached)?; - let metadata = f.metadata()?; - let mut permissions = metadata.permissions(); + }; + let env_hash = get_file_hash(&file); + { + let mut map = map.lock().unwrap(); + let map = map.entry(mailbox_hash).or_default(); + map.insert(env_hash, PathBuf::from(&file).into()); + } + let mut reader = io::BufReader::new(fs::File::open(&file)?); + buf.clear(); + reader.read_to_end(&mut buf)?; + match Envelope::from_bytes(buf.as_slice(), Some(file.flags())) { + Ok(mut env) => { + env.set_hash(env_hash); + mailbox_index.lock().unwrap().insert(env_hash, mailbox_hash); + if let Ok(cached) = cache_dir.place_cache_file(file_name) { + /* place result in cache directory */ + let f = fs::File::create(cached)?; + let metadata = f.metadata()?; + let mut permissions = metadata.permissions(); - permissions.set_mode(0o600); // Read/write for owner only. - f.set_permissions(permissions)?; + permissions.set_mode(0o600); // Read/write for owner only. + f.set_permissions(permissions)?; - let writer = io::BufWriter::new(f); - bincode::serialize_into(writer, &env)?; - } - if !env.is_seen() { - *unseen.lock().unwrap() += 1; - } - *total.lock().unwrap() += 1; - local_r.push(env); + let writer = io::BufWriter::new(f); + bincode::serialize_into(writer, &env)?; } - Err(err) => { - debug!( - "DEBUG: hash {}, path: {} couldn't be parsed, {}", - env_hash, - file.as_path().display(), - err, - ); - continue; + if !env.is_seen() { + unseen_total += 1; } + local_r.push(env); + } + Err(err) => { + debug!( + "DEBUG: hash {}, path: {} couldn't be parsed, {}", + env_hash, + file.as_path().display(), + err, + ); + continue; } } } + *total.lock().unwrap() += local_r.len(); + *unseen.lock().unwrap() += unseen_total; Ok(local_r) } } @@ -190,7 +179,6 @@ impl MaildirStream { impl Stream for MaildirStream { type Item = Result>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - //todo!() let payloads = self.payloads.as_mut(); payloads.poll_next(cx) }