WIP maildir async

async
Manos Pitsidianakis 2020-06-22 11:29:36 +03:00
parent 4f3a98f90a
commit b72a1ca6d8
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
7 changed files with 290 additions and 2 deletions

65
Cargo.lock generated
View File

@ -27,6 +27,12 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cff77d8686867eceff3105329d4698d96c2391c176d5d03adc90c7389162b5b8"
[[package]]
name = "async-task"
version = "3.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c17772156ef2829aadc587461c7753af20b7e8db1529bc66855add962a3b35d3"
[[package]]
name = "autocfg"
version = "1.0.0"
@ -419,6 +425,21 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
[[package]]
name = "futures"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.5"
@ -426,6 +447,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
@ -434,12 +456,35 @@ version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399"
[[package]]
name = "futures-executor"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789"
[[package]]
name = "futures-macro"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39"
dependencies = [
"proc-macro-hack",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.5"
@ -461,12 +506,17 @@ version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project",
"pin-utils",
"proc-macro-hack",
"proc-macro-nested",
"slab",
]
@ -785,9 +835,11 @@ checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00"
name = "meli"
version = "0.5.1"
dependencies = [
"async-task",
"bincode",
"bitflags",
"crossbeam",
"futures",
"libc",
"linkify",
"melib",
@ -826,6 +878,7 @@ dependencies = [
"crossbeam",
"data-encoding",
"encoding",
"futures",
"libc",
"libloading",
"memmap",
@ -1219,6 +1272,18 @@ dependencies = [
"version_check",
]
[[package]]
name = "proc-macro-hack"
version = "0.5.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e0456befd48169b9f13ef0f0ad46d492cf9d2dbb918bcf38e01eed4ce3ec5e4"
[[package]]
name = "proc-macro-nested"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0afe1bd463b9e9ed51d0e0f0b50b6b146aec855c56fd182bb242388710a9b6de"
[[package]]
name = "proc-macro2"
version = "1.0.18"

View File

@ -53,7 +53,6 @@ structopt = { version = "0.3.14", default-features = false }
svg_crate = { version = "0.8.0", optional = true, package = "svg" }
futures = "0.3.5"
async-task = "3.0.0"
num_cpus = "1.12.0"
[build-dependencies]
syn = { version = "1.0.31", features = [] }

View File

@ -42,6 +42,7 @@ nix = "0.17.0"
rusqlite = {version = "0.20.0", optional = true }
libloading = "0.6.2"
futures = "0.3.5"
[features]
default = ["unicode_algorithms", "imap_backend", "maildir_backend", "mbox_backend", "vcard", "sqlite3"]

View File

@ -33,6 +33,8 @@ macro_rules! tag_hash {
#[cfg(feature = "imap_backend")]
pub mod imap;
//#[cfg(feature = "imap_backend")]
//pub mod imap2;
#[cfg(feature = "maildir_backend")]
pub mod maildir;
#[cfg(feature = "mbox_backend")]
@ -64,6 +66,8 @@ use std::fmt::Debug;
use std::ops::Deref;
use std::sync::{Arc, RwLock};
pub use futures::stream::Stream;
use std;
use std::collections::HashMap;
@ -284,6 +288,12 @@ pub trait MailBackend: ::std::fmt::Debug + Send + Sync {
fn is_online(&self) -> Result<()>;
fn connect(&mut self) {}
fn get(&mut self, mailbox: &Mailbox) -> Async<Result<Vec<Envelope>>>;
fn get_async(
&mut self,
mailbox: &Mailbox,
) -> Result<Box<dyn Stream<Item = Result<Vec<Envelope>>>>> {
Err(MeliError::new("Unimplemented."))
}
fn refresh(
&mut self,
_mailbox_hash: MailboxHash,

View File

@ -23,6 +23,10 @@
mod backend;
pub use self::backend::*;
mod stream;
pub use stream::*;
pub use futures::stream::Stream;
use crate::backends::*;
use crate::email::{Envelope, Flag};
use crate::error::{MeliError, Result};

View File

@ -24,6 +24,7 @@ use super::{
RefreshEventConsumer, RefreshEventKind::*,
};
use super::{MaildirMailbox, MaildirOp};
use futures::prelude::Stream;
use crate::async_workers::*;
use crate::conf::AccountSettings;
use crate::email::{Envelope, EnvelopeHash, Flag};
@ -159,7 +160,7 @@ pub(super) fn get_file_hash(file: &Path) -> EnvelopeHash {
hasher.finish()
}
fn move_to_cur(p: PathBuf) -> Result<PathBuf> {
pub fn move_to_cur(p: PathBuf) -> Result<PathBuf> {
let mut new = p.clone();
let file_name = p.to_string_lossy();
let slash_pos = file_name.bytes().rposition(|c| c == b'/').unwrap() + 1;
@ -192,6 +193,19 @@ impl MailBackend for MaildirType {
fn get(&mut self, mailbox: &Mailbox) -> Async<Result<Vec<Envelope>>> {
self.multicore(4, mailbox)
}
fn get_async(&mut self, mailbox: &Mailbox) -> Result<Box<dyn Stream<Item = Result<Vec<Envelope>>>>> {
let mailbox: &MaildirMailbox = &self.mailboxes[&self.owned_mailbox_idx(mailbox)];
let mailbox_hash = mailbox.hash();
let unseen = mailbox.unseen.clone();
let total = mailbox.total.clone();
let mut path: PathBuf = mailbox.fs_path().into();
let root_path = self.path.to_path_buf();
let map = self.hash_indexes.clone();
let mailbox_index = self.mailbox_index.clone();
super::stream::MaildirStream::new(&self.name, mailbox_hash, unseen, total, path, root_path, map, mailbox_index)
}
fn refresh(
&mut self,
mailbox_hash: MailboxHash,

View File

@ -0,0 +1,195 @@
/*
* meli - maildir async
*
* Copyright 2020 Manos Pitsidianakis
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use futures::task::{Context, Poll};
use core::pin::Pin;
use super::*;
use std::sync::{Arc,Mutex};
use std::io::{self, Read, Write};
use std::ops::{Deref, DerefMut};
use std::os::unix::fs::PermissionsExt;
use std::path::{Component, Path, PathBuf};
use std::result;
use std::sync::mpsc::channel;
use crate::backends::maildir::backend::move_to_cur;
use core::future::Future;
use futures::stream::FuturesUnordered;
pub struct MaildirStream {
mailbox_hash: MailboxHash,
unseen: Arc<Mutex<usize>>,
total: Arc<Mutex<usize>>,
path: PathBuf,
root_path: PathBuf,
map: HashIndexes,
mailbox_index: Arc<Mutex<HashMap<EnvelopeHash, MailboxHash>>>,
payloads: Pin<Box<FuturesUnordered<Pin<Box<dyn Future<Output = Result<Vec<Envelope>>>>>>>>,
}
impl MaildirStream {
pub fn new(name: &str, mailbox_hash: MailboxHash, unseen: Arc<Mutex<usize>>, total: Arc<Mutex<usize>>, mut path: PathBuf, root_path: PathBuf, map: HashIndexes, mailbox_index: Arc<Mutex<HashMap<EnvelopeHash, MailboxHash>>>) -> Result<Box<dyn Stream<Item = Result<Vec<Envelope>>>>> {
path.push("new");
for d in path.read_dir()? {
if let Ok(p) = d {
move_to_cur(p.path()).ok().take();
}
}
path.pop();
path.push("cur");
let iter = path.read_dir()?;
let count = path.read_dir()?.count();
let mut files: Vec<PathBuf> = Vec::with_capacity(count);
for e in iter {
let e = e.and_then(|x| {
let path = x.path();
Ok(path)
})?;
files.push(e);
}let payloads =Box::pin(
if !files.is_empty() {
let cores = 4_usize;
let chunk_size = if count / cores > 0 {
count / cores
} else {
count
};
files.chunks(chunk_size).map(|chunk| {
//Self::chunk(chunk, name, mailbox_hash, unseen, total, path, root_path, map, mailbox_index)})
let cache_dir = xdg::BaseDirectories::with_profile("meli", &name).unwrap();
Box::pin(Self::chunk(SmallVec::from(chunk), cache_dir, mailbox_hash, unseen.clone(), total.clone(), path.clone(), root_path.clone(), map.clone(), mailbox_index.clone())) as Pin<Box<dyn Future<Output = _>>>})
.collect::<_>()
} else { FuturesUnordered::new() });
Ok(Box::new(Self{mailbox_hash, unseen, total, path, root_path, map, mailbox_index, payloads}))
}
async fn chunk(chunk:SmallVec<[std::path::PathBuf; 2048]>, cache_dir:xdg::BaseDirectories, mailbox_hash: MailboxHash, unseen: Arc<Mutex<usize>>, total: Arc<Mutex<usize>>, path: PathBuf, root_path: PathBuf, map: HashIndexes, mailbox_index: Arc<Mutex<HashMap<EnvelopeHash, MailboxHash>>>) -> Result<Vec<Envelope>> {
let unseen = unseen.clone();
let total = total.clone();
let map = map.clone();
let mailbox_index = mailbox_index.clone();
let root_path = root_path.clone();
let len = chunk.len();
let size = if len <= 100 { 100 } else { (len / 100) * 100 };
let mut local_r: Vec<Envelope> =
Vec::with_capacity(chunk.len());
for c in chunk.chunks(size) {
let map = map.clone();
let mailbox_index = mailbox_index.clone();
let len = c.len();
for file in c {
/* Check if we have a cache file with this email's
* filename */
let file_name = PathBuf::from(file)
.strip_prefix(&root_path)
.unwrap()
.to_path_buf();
if let Some(cached) =
cache_dir.find_cache_file(&file_name)
{
/* Cached struct exists, try to load it */
let reader = io::BufReader::new(
fs::File::open(&cached).unwrap(),
);
let result: result::Result<Envelope, _> =
bincode::deserialize_from(reader);
if let Ok(env) = result {
let mut map = map.lock().unwrap();
let map = map.entry(mailbox_hash).or_default();
let hash = env.hash();
map.insert(hash, file.clone().into());
mailbox_index
.lock()
.unwrap()
.insert(hash, mailbox_hash);
if !env.is_seen() {
*unseen.lock().unwrap() += 1;
}
*total.lock().unwrap() += 1;
local_r.push(env);
continue;
}
};
let hash = get_file_hash(file);
{
let mut map = map.lock().unwrap();
let map = map.entry(mailbox_hash).or_default();
(*map).insert(hash, PathBuf::from(file).into());
}
let op = Box::new(MaildirOp::new(
hash,
map.clone(),
mailbox_hash,
));
if let Some(e) = Envelope::from_token(op, hash) {
mailbox_index
.lock()
.unwrap()
.insert(e.hash(), mailbox_hash);
if let Ok(cached) =
cache_dir.place_cache_file(file_name)
{
/* place result in cache directory */
let f = match fs::File::create(cached) {
Ok(f) => f,
Err(e) => {
panic!("{}", e);
}
};
let metadata = f.metadata().unwrap();
let mut permissions = metadata.permissions();
permissions.set_mode(0o600); // Read/write for owner only.
f.set_permissions(permissions).unwrap();
let writer = io::BufWriter::new(f);
bincode::serialize_into(writer, &e).unwrap();
}
if !e.is_seen() {
*unseen.lock().unwrap() += 1;
}
*total.lock().unwrap() += 1;
local_r.push(e);
} else {
debug!(
"DEBUG: hash {}, path: {} couldn't be parsed",
hash,
file.as_path().display()
);
continue;
}
}
}
Ok(local_r)
}
}
impl Stream for MaildirStream {
type Item = Result<Vec<Envelope>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>{
//todo!()
let payloads = self.payloads.as_mut();
payloads.poll_next(cx)
}
}