From b72a1ca6d8a432fb7347c805f33a00390aa6a8ae Mon Sep 17 00:00:00 2001 From: Manos Pitsidianakis Date: Mon, 22 Jun 2020 11:29:36 +0300 Subject: [PATCH] WIP maildir async --- Cargo.lock | 65 +++++++++ Cargo.toml | 1 - melib/Cargo.toml | 1 + melib/src/backends.rs | 10 ++ melib/src/backends/maildir.rs | 4 + melib/src/backends/maildir/backend.rs | 16 ++- melib/src/backends/maildir/stream.rs | 195 ++++++++++++++++++++++++++ 7 files changed, 290 insertions(+), 2 deletions(-) create mode 100644 melib/src/backends/maildir/stream.rs diff --git a/Cargo.lock b/Cargo.lock index b0ee1af34..b085c30dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -27,6 +27,12 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cff77d8686867eceff3105329d4698d96c2391c176d5d03adc90c7389162b5b8" +[[package]] +name = "async-task" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c17772156ef2829aadc587461c7753af20b7e8db1529bc66855add962a3b35d3" + [[package]] name = "autocfg" version = "1.0.0" @@ -419,6 +425,21 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" +[[package]] +name = "futures" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.5" @@ -426,6 +447,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -434,12 +456,35 @@ version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399" +[[package]] +name = "futures-executor" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789" +[[package]] +name = "futures-macro" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39" +dependencies = [ + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.5" @@ -461,12 +506,17 @@ version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6" dependencies = [ + "futures-channel", "futures-core", "futures-io", + "futures-macro", + "futures-sink", "futures-task", "memchr", "pin-project", "pin-utils", + "proc-macro-hack", + "proc-macro-nested", "slab", ] @@ -785,9 +835,11 @@ checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" name = "meli" version = "0.5.1" dependencies = [ + "async-task", "bincode", "bitflags", "crossbeam", + "futures", "libc", "linkify", "melib", @@ -826,6 +878,7 @@ dependencies = [ "crossbeam", "data-encoding", "encoding", + "futures", "libc", "libloading", "memmap", @@ -1219,6 +1272,18 @@ dependencies = [ "version_check", ] +[[package]] +name = "proc-macro-hack" +version = "0.5.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e0456befd48169b9f13ef0f0ad46d492cf9d2dbb918bcf38e01eed4ce3ec5e4" + +[[package]] +name = "proc-macro-nested" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0afe1bd463b9e9ed51d0e0f0b50b6b146aec855c56fd182bb242388710a9b6de" + [[package]] name = "proc-macro2" version = "1.0.18" diff --git a/Cargo.toml b/Cargo.toml index 43a621655..44708afc6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,7 +53,6 @@ structopt = { version = "0.3.14", default-features = false } svg_crate = { version = "0.8.0", optional = true, package = "svg" } futures = "0.3.5" async-task = "3.0.0" -num_cpus = "1.12.0" [build-dependencies] syn = { version = "1.0.31", features = [] } diff --git a/melib/Cargo.toml b/melib/Cargo.toml index bff932bd4..9ea702b9b 100644 --- a/melib/Cargo.toml +++ b/melib/Cargo.toml @@ -42,6 +42,7 @@ nix = "0.17.0" rusqlite = {version = "0.20.0", optional = true } libloading = "0.6.2" +futures = "0.3.5" [features] default = ["unicode_algorithms", "imap_backend", "maildir_backend", "mbox_backend", "vcard", "sqlite3"] diff --git a/melib/src/backends.rs b/melib/src/backends.rs index 2eb12fe7a..76efa8b8f 100644 --- a/melib/src/backends.rs +++ b/melib/src/backends.rs @@ -33,6 +33,8 @@ macro_rules! tag_hash { #[cfg(feature = "imap_backend")] pub mod imap; +//#[cfg(feature = "imap_backend")] +//pub mod imap2; #[cfg(feature = "maildir_backend")] pub mod maildir; #[cfg(feature = "mbox_backend")] @@ -64,6 +66,8 @@ use std::fmt::Debug; use std::ops::Deref; use std::sync::{Arc, RwLock}; +pub use futures::stream::Stream; + use std; use std::collections::HashMap; @@ -284,6 +288,12 @@ pub trait MailBackend: ::std::fmt::Debug + Send + Sync { fn is_online(&self) -> Result<()>; fn connect(&mut self) {} fn get(&mut self, mailbox: &Mailbox) -> Async>>; + fn get_async( + &mut self, + mailbox: &Mailbox, + ) -> Result>>>> { + Err(MeliError::new("Unimplemented.")) + } fn refresh( &mut self, _mailbox_hash: MailboxHash, diff --git a/melib/src/backends/maildir.rs b/melib/src/backends/maildir.rs index 5e8c770e5..cb234199b 100644 --- a/melib/src/backends/maildir.rs +++ b/melib/src/backends/maildir.rs @@ -23,6 +23,10 @@ mod backend; pub use self::backend::*; +mod stream; +pub use stream::*; + +pub use futures::stream::Stream; use crate::backends::*; use crate::email::{Envelope, Flag}; use crate::error::{MeliError, Result}; diff --git a/melib/src/backends/maildir/backend.rs b/melib/src/backends/maildir/backend.rs index c3f80dbee..e00f50681 100644 --- a/melib/src/backends/maildir/backend.rs +++ b/melib/src/backends/maildir/backend.rs @@ -24,6 +24,7 @@ use super::{ RefreshEventConsumer, RefreshEventKind::*, }; use super::{MaildirMailbox, MaildirOp}; +use futures::prelude::Stream; use crate::async_workers::*; use crate::conf::AccountSettings; use crate::email::{Envelope, EnvelopeHash, Flag}; @@ -159,7 +160,7 @@ pub(super) fn get_file_hash(file: &Path) -> EnvelopeHash { hasher.finish() } -fn move_to_cur(p: PathBuf) -> Result { +pub fn move_to_cur(p: PathBuf) -> Result { let mut new = p.clone(); let file_name = p.to_string_lossy(); let slash_pos = file_name.bytes().rposition(|c| c == b'/').unwrap() + 1; @@ -192,6 +193,19 @@ impl MailBackend for MaildirType { fn get(&mut self, mailbox: &Mailbox) -> Async>> { self.multicore(4, mailbox) } + + fn get_async(&mut self, mailbox: &Mailbox) -> Result>>>> { + let mailbox: &MaildirMailbox = &self.mailboxes[&self.owned_mailbox_idx(mailbox)]; + let mailbox_hash = mailbox.hash(); + let unseen = mailbox.unseen.clone(); + let total = mailbox.total.clone(); + let mut path: PathBuf = mailbox.fs_path().into(); + let root_path = self.path.to_path_buf(); + let map = self.hash_indexes.clone(); + let mailbox_index = self.mailbox_index.clone(); + super::stream::MaildirStream::new(&self.name, mailbox_hash, unseen, total, path, root_path, map, mailbox_index) + } + fn refresh( &mut self, mailbox_hash: MailboxHash, diff --git a/melib/src/backends/maildir/stream.rs b/melib/src/backends/maildir/stream.rs new file mode 100644 index 000000000..cbe9d5b72 --- /dev/null +++ b/melib/src/backends/maildir/stream.rs @@ -0,0 +1,195 @@ +/* + * meli - maildir async + * + * Copyright 2020 Manos Pitsidianakis + * + * This file is part of meli. + * + * meli is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * meli is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with meli. If not, see . + */ + + +use futures::task::{Context, Poll}; +use core::pin::Pin; +use super::*; +use std::sync::{Arc,Mutex}; +use std::io::{self, Read, Write}; +use std::ops::{Deref, DerefMut}; +use std::os::unix::fs::PermissionsExt; +use std::path::{Component, Path, PathBuf}; +use std::result; +use std::sync::mpsc::channel; +use crate::backends::maildir::backend::move_to_cur; +use core::future::Future; +use futures::stream::FuturesUnordered; + +pub struct MaildirStream { + mailbox_hash: MailboxHash, + unseen: Arc>, + total: Arc>, + path: PathBuf, + root_path: PathBuf, + map: HashIndexes, + mailbox_index: Arc>>, + payloads: Pin>>>>>>>, + } + +impl MaildirStream { + pub fn new(name: &str, mailbox_hash: MailboxHash, unseen: Arc>, total: Arc>, mut path: PathBuf, root_path: PathBuf, map: HashIndexes, mailbox_index: Arc>>) -> Result>>>> { + path.push("new"); + for d in path.read_dir()? { + if let Ok(p) = d { + move_to_cur(p.path()).ok().take(); + } + } + path.pop(); + + path.push("cur"); + let iter = path.read_dir()?; + let count = path.read_dir()?.count(); + let mut files: Vec = Vec::with_capacity(count); + for e in iter { + let e = e.and_then(|x| { + let path = x.path(); + Ok(path) + })?; + files.push(e); + }let payloads =Box::pin( + if !files.is_empty() { + let cores = 4_usize; + let chunk_size = if count / cores > 0 { + count / cores + } else { + count + }; + files.chunks(chunk_size).map(|chunk| { + //Self::chunk(chunk, name, mailbox_hash, unseen, total, path, root_path, map, mailbox_index)}) + let cache_dir = xdg::BaseDirectories::with_profile("meli", &name).unwrap(); + Box::pin(Self::chunk(SmallVec::from(chunk), cache_dir, mailbox_hash, unseen.clone(), total.clone(), path.clone(), root_path.clone(), map.clone(), mailbox_index.clone())) as Pin>>}) + .collect::<_>() + + + } else { FuturesUnordered::new() }); + Ok(Box::new(Self{mailbox_hash, unseen, total, path, root_path, map, mailbox_index, payloads})) + } + + async fn chunk(chunk:SmallVec<[std::path::PathBuf; 2048]>, cache_dir:xdg::BaseDirectories, mailbox_hash: MailboxHash, unseen: Arc>, total: Arc>, path: PathBuf, root_path: PathBuf, map: HashIndexes, mailbox_index: Arc>>) -> Result> { + let unseen = unseen.clone(); + let total = total.clone(); + let map = map.clone(); + let mailbox_index = mailbox_index.clone(); + let root_path = root_path.clone(); + let len = chunk.len(); + let size = if len <= 100 { 100 } else { (len / 100) * 100 }; + let mut local_r: Vec = + Vec::with_capacity(chunk.len()); + for c in chunk.chunks(size) { + let map = map.clone(); + let mailbox_index = mailbox_index.clone(); + let len = c.len(); + for file in c { + /* Check if we have a cache file with this email's + * filename */ + let file_name = PathBuf::from(file) + .strip_prefix(&root_path) + .unwrap() + .to_path_buf(); + if let Some(cached) = + cache_dir.find_cache_file(&file_name) + { + /* Cached struct exists, try to load it */ + let reader = io::BufReader::new( + fs::File::open(&cached).unwrap(), + ); + let result: result::Result = + bincode::deserialize_from(reader); + if let Ok(env) = result { + let mut map = map.lock().unwrap(); + let map = map.entry(mailbox_hash).or_default(); + let hash = env.hash(); + map.insert(hash, file.clone().into()); + mailbox_index + .lock() + .unwrap() + .insert(hash, mailbox_hash); + if !env.is_seen() { + *unseen.lock().unwrap() += 1; + } + *total.lock().unwrap() += 1; + local_r.push(env); + continue; + } + }; + let hash = get_file_hash(file); + { + let mut map = map.lock().unwrap(); + let map = map.entry(mailbox_hash).or_default(); + (*map).insert(hash, PathBuf::from(file).into()); + } + let op = Box::new(MaildirOp::new( + hash, + map.clone(), + mailbox_hash, + )); + if let Some(e) = Envelope::from_token(op, hash) { + mailbox_index + .lock() + .unwrap() + .insert(e.hash(), mailbox_hash); + if let Ok(cached) = + cache_dir.place_cache_file(file_name) + { + /* place result in cache directory */ + let f = match fs::File::create(cached) { + Ok(f) => f, + Err(e) => { + panic!("{}", e); + } + }; + let metadata = f.metadata().unwrap(); + let mut permissions = metadata.permissions(); + + permissions.set_mode(0o600); // Read/write for owner only. + f.set_permissions(permissions).unwrap(); + + let writer = io::BufWriter::new(f); + bincode::serialize_into(writer, &e).unwrap(); + } + if !e.is_seen() { + *unseen.lock().unwrap() += 1; + } + *total.lock().unwrap() += 1; + local_r.push(e); + } else { + debug!( + "DEBUG: hash {}, path: {} couldn't be parsed", + hash, + file.as_path().display() + ); + continue; + } + } + } + Ok(local_r) + } +} + +impl Stream for MaildirStream { + type Item = Result>; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>{ + //todo!() + let payloads = self.payloads.as_mut(); + payloads.poll_next(cx) + } +}