Browse Source

melib/maildir: split parsing into big chunks

jmap-eventsource
Manos Pitsidianakis 1 year ago
parent
commit
00f5c4b9c0
Signed by: epilys GPG Key ID: 73627C2F690DF710
  1. 17
      melib/src/backends/maildir/backend.rs
  2. 130
      melib/src/backends/maildir/stream.rs

17
melib/src/backends/maildir/backend.rs

@ -141,15 +141,6 @@ macro_rules! get_path_hash {
}
pub(super) fn get_file_hash(file: &Path) -> EnvelopeHash {
/*
let mut buf = Vec::with_capacity(2048);
let mut f = fs::File::open(&file).unwrap_or_else(|_| panic!("Can't open {}", file.display()));
f.read_to_end(&mut buf)
.unwrap_or_else(|_| panic!("Can't read {}", file.display()));
let mut hasher = DefaultHasher::default();
hasher.write(&buf);
hasher.finish()
*/
let mut hasher = DefaultHasher::default();
file.hash(&mut hasher);
hasher.finish()
@ -546,9 +537,13 @@ impl MailBackend for MaildirType {
});
continue;
}
*mailbox_counts[&mailbox_hash].1.lock().unwrap() -= 1;
{
let mut lck = mailbox_counts[&mailbox_hash].1.lock().unwrap();
*lck = lck.saturating_sub(1);
}
if !pathbuf.flags().contains(Flag::SEEN) {
*mailbox_counts[&mailbox_hash].0.lock().unwrap() -= 1;
let mut lck = mailbox_counts[&mailbox_hash].0.lock().unwrap();
*lck = lck.saturating_sub(1);
}
index_lock.entry(hash).and_modify(|e| {

130
melib/src/backends/maildir/stream.rs

@ -50,6 +50,7 @@ impl MaildirStream {
map: HashIndexes,
mailbox_index: Arc<Mutex<HashMap<EnvelopeHash, MailboxHash>>>,
) -> Result<Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>> {
let chunk_size = 2048;
path.push("new");
for d in path.read_dir()? {
if let Ok(p) = d {
@ -57,7 +58,6 @@ impl MaildirStream {
}
}
path.pop();
path.push("cur");
let iter = path.read_dir()?;
let count = path.read_dir()?.count();
@ -70,16 +70,9 @@ impl MaildirStream {
files.push(e);
}
let payloads = Box::pin(if !files.is_empty() {
let cores = 4_usize;
let chunk_size = if count / cores > 0 {
count / cores
} else {
count
};
files
.chunks(chunk_size)
.map(|chunk| {
//Self::chunk(chunk, name, mailbox_hash, unseen, total, path, root_path, map, mailbox_index)})
let cache_dir = xdg::BaseDirectories::with_profile("meli", &name).unwrap();
Box::pin(Self::chunk(
SmallVec::from(chunk),
@ -109,80 +102,76 @@ impl MaildirStream {
map: HashIndexes,
mailbox_index: Arc<Mutex<HashMap<EnvelopeHash, MailboxHash>>>,
) -> Result<Vec<Envelope>> {
let len = chunk.len();
let size = if len <= 100 { 100 } else { (len / 100) * 100 };
let mut local_r: Vec<Envelope> = Vec::with_capacity(chunk.len());
let mut unseen_total: usize = 0;
let mut buf = Vec::with_capacity(4096);
for c in chunk.chunks(size) {
let map = map.clone();
for file in c {
/* Check if we have a cache file with this email's
* filename */
let file_name = PathBuf::from(file)
.strip_prefix(&root_path)
.unwrap()
.to_path_buf();
if let Some(cached) = cache_dir.find_cache_file(&file_name) {
/* Cached struct exists, try to load it */
let reader = io::BufReader::new(fs::File::open(&cached).unwrap());
let result: result::Result<Envelope, _> = bincode::deserialize_from(reader);
if let Ok(env) = result {
let mut map = map.lock().unwrap();
let map = map.entry(mailbox_hash).or_default();
let hash = env.hash();
map.insert(hash, file.clone().into());
mailbox_index.lock().unwrap().insert(hash, mailbox_hash);
if !env.is_seen() {
*unseen.lock().unwrap() += 1;
}
*total.lock().unwrap() += 1;
local_r.push(env);
continue;
}
};
let env_hash = get_file_hash(file);
{
for file in chunk {
/* Check if we have a cache file with this email's
* filename */
let file_name = PathBuf::from(&file)
.strip_prefix(&root_path)
.unwrap()
.to_path_buf();
if let Some(cached) = cache_dir.find_cache_file(&file_name) {
/* Cached struct exists, try to load it */
let reader = io::BufReader::new(fs::File::open(&cached)?);
let result: result::Result<Envelope, _> = bincode::deserialize_from(reader);
if let Ok(env) = result {
let mut map = map.lock().unwrap();
let map = map.entry(mailbox_hash).or_default();
(*map).insert(env_hash, PathBuf::from(file).into());
let hash = env.hash();
map.insert(hash, file.clone().into());
mailbox_index.lock().unwrap().insert(hash, mailbox_hash);
if !env.is_seen() {
unseen_total += 1;
}
local_r.push(env);
continue;
}
let mut reader = io::BufReader::new(fs::File::open(&file)?);
buf.clear();
reader.read_to_end(&mut buf)?;
match Envelope::from_bytes(buf.as_slice(), Some(file.flags())) {
Ok(mut env) => {
env.set_hash(env_hash);
mailbox_index.lock().unwrap().insert(env_hash, mailbox_hash);
if let Ok(cached) = cache_dir.place_cache_file(file_name) {
/* place result in cache directory */
let f = fs::File::create(cached)?;
let metadata = f.metadata()?;
let mut permissions = metadata.permissions();
};
let env_hash = get_file_hash(&file);
{
let mut map = map.lock().unwrap();
let map = map.entry(mailbox_hash).or_default();
map.insert(env_hash, PathBuf::from(&file).into());
}
let mut reader = io::BufReader::new(fs::File::open(&file)?);
buf.clear();
reader.read_to_end(&mut buf)?;
match Envelope::from_bytes(buf.as_slice(), Some(file.flags())) {
Ok(mut env) => {
env.set_hash(env_hash);
mailbox_index.lock().unwrap().insert(env_hash, mailbox_hash);
if let Ok(cached) = cache_dir.place_cache_file(file_name) {
/* place result in cache directory */
let f = fs::File::create(cached)?;
let metadata = f.metadata()?;
let mut permissions = metadata.permissions();
permissions.set_mode(0o600); // Read/write for owner only.
f.set_permissions(permissions)?;
permissions.set_mode(0o600); // Read/write for owner only.
f.set_permissions(permissions)?;
let writer = io::BufWriter::new(f);
bincode::serialize_into(writer, &env)?;
}
if !env.is_seen() {
*unseen.lock().unwrap() += 1;
}
*total.lock().unwrap() += 1;
local_r.push(env);
let writer = io::BufWriter::new(f);
bincode::serialize_into(writer, &env)?;
}
Err(err) => {
debug!(
"DEBUG: hash {}, path: {} couldn't be parsed, {}",
env_hash,
file.as_path().display(),
err,
);
continue;
if !env.is_seen() {
unseen_total += 1;
}
local_r.push(env);
}
Err(err) => {
debug!(
"DEBUG: hash {}, path: {} couldn't be parsed, {}",
env_hash,
file.as_path().display(),
err,
);
continue;
}
}
}
*total.lock().unwrap() += local_r.len();
*unseen.lock().unwrap() += unseen_total;
Ok(local_r)
}
}
@ -190,7 +179,6 @@ impl MaildirStream {
impl Stream for MaildirStream {
type Item = Result<Vec<Envelope>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
//todo!()
let payloads = self.payloads.as_mut();
payloads.poll_next(cx)
}

Loading…
Cancel
Save