melib/email: remove Envelope::from_token

memfd
Manos Pitsidianakis 2020-08-26 00:00:38 +03:00
parent 9e9be0b5f3
commit 4217839155
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
6 changed files with 249 additions and 362 deletions

View File

@ -27,6 +27,7 @@ use crate::error::{MeliError, Result};
use crate::shellexpand::ShellExpandTrait; use crate::shellexpand::ShellExpandTrait;
use futures::prelude::Stream; use futures::prelude::Stream;
use memmap::{Mmap, Protection};
extern crate notify; extern crate notify;
use self::notify::{watcher, DebouncedEvent, RecursiveMode, Watcher}; use self::notify::{watcher, DebouncedEvent, RecursiveMode, Watcher};
use std::time::Duration; use std::time::Duration;
@ -277,33 +278,34 @@ impl MailBackend for MaildirType {
} }
(*map).insert(hash, PathBuf::from(&file).into()); (*map).insert(hash, PathBuf::from(&file).into());
} }
let op = Box::new(MaildirOp::new(hash, map.clone(), mailbox_hash)); if let Ok(mut env) = Envelope::from_bytes(
if let Ok(e) = Envelope::from_token(op, hash) { unsafe { &Mmap::open_path(&file, Protection::Read)?.as_slice() },
mailbox_index.lock().unwrap().insert(e.hash(), mailbox_hash); None,
) {
env.set_hash(hash);
mailbox_index
.lock()
.unwrap()
.insert(env.hash(), mailbox_hash);
let file_name = file.strip_prefix(&root_path).unwrap().to_path_buf(); let file_name = file.strip_prefix(&root_path).unwrap().to_path_buf();
if let Ok(cached) = cache_dir.place_cache_file(file_name) { if let Ok(cached) = cache_dir.place_cache_file(file_name) {
/* place result in cache directory */ /* place result in cache directory */
let f = match fs::File::create(cached) { let f = fs::File::create(cached)?;
Ok(f) => f, let metadata = f.metadata()?;
Err(e) => {
panic!("{}", e);
}
};
let metadata = f.metadata().unwrap();
let mut permissions = metadata.permissions(); let mut permissions = metadata.permissions();
permissions.set_mode(0o600); // Read/write for owner only. permissions.set_mode(0o600); // Read/write for owner only.
f.set_permissions(permissions).unwrap(); f.set_permissions(permissions)?;
let writer = io::BufWriter::new(f); let writer = io::BufWriter::new(f);
bincode::serialize_into(writer, &e).unwrap(); bincode::serialize_into(writer, &env)?;
} }
(sender)( (sender)(
account_hash, account_hash,
BackendEvent::Refresh(RefreshEvent { BackendEvent::Refresh(RefreshEvent {
account_hash, account_hash,
mailbox_hash, mailbox_hash,
kind: Create(Box::new(e)), kind: Create(Box::new(env)),
}), }),
); );
} else { } else {
@ -402,7 +404,7 @@ impl MailBackend for MaildirType {
.strip_prefix(&root_path) .strip_prefix(&root_path)
.unwrap() .unwrap()
.to_path_buf(); .to_path_buf();
if let Some(env) = add_path_to_index( if let Ok(env) = add_path_to_index(
&hash_indexes, &hash_indexes,
mailbox_hash, mailbox_hash,
pathbuf.as_path(), pathbuf.as_path(),
@ -457,7 +459,7 @@ impl MailBackend for MaildirType {
drop(hash_indexes_lock); drop(hash_indexes_lock);
/* Did we just miss a Create event? In any case, create /* Did we just miss a Create event? In any case, create
* envelope. */ * envelope. */
if let Some(env) = add_path_to_index( if let Ok(env) = add_path_to_index(
&hash_indexes, &hash_indexes,
mailbox_hash, mailbox_hash,
pathbuf.as_path(), pathbuf.as_path(),
@ -483,12 +485,13 @@ impl MailBackend for MaildirType {
let new_hash: EnvelopeHash = get_file_hash(pathbuf.as_path()); let new_hash: EnvelopeHash = get_file_hash(pathbuf.as_path());
if index_lock.get_mut(&new_hash).is_none() { if index_lock.get_mut(&new_hash).is_none() {
debug!("write notice"); debug!("write notice");
let op = Box::new(MaildirOp::new( if let Ok(mut env) = Envelope::from_bytes(
new_hash, unsafe {
hash_indexes.clone(), &Mmap::open_path(&pathbuf, Protection::Read)?.as_slice()
mailbox_hash, },
)); None,
if let Ok(env) = Envelope::from_token(op, new_hash) { ) {
env.set_hash(new_hash);
debug!("{}\t{:?}", new_hash, &pathbuf); debug!("{}\t{:?}", new_hash, &pathbuf);
debug!( debug!(
"hash {}, path: {:?} couldn't be parsed", "hash {}, path: {:?} couldn't be parsed",
@ -636,7 +639,7 @@ impl MailBackend for MaildirType {
.to_path_buf(); .to_path_buf();
debug!("filename = {:?}", file_name); debug!("filename = {:?}", file_name);
drop(hash_indexes_lock); drop(hash_indexes_lock);
if let Some(env) = add_path_to_index( if let Ok(env) = add_path_to_index(
&hash_indexes, &hash_indexes,
mailbox_hash, mailbox_hash,
dest.as_path(), dest.as_path(),
@ -1048,193 +1051,6 @@ impl MaildirType {
})) }))
} }
/*
pub fn multicore(
&mut self,
cores: usize,
mailbox_hash: MailboxHash,
) -> Async<Result<Vec<Envelope>>> {
let mut w = AsyncBuilder::new();
let cache_dir = xdg::BaseDirectories::with_profile("meli", &self.name).unwrap();
let handle = {
let tx = w.tx();
let mailbox: &MaildirMailbox = &self.mailboxes[&mailbox_hash];
let unseen = mailbox.unseen.clone();
let total = mailbox.total.clone();
let tx_final = w.tx();
let mut path: PathBuf = mailbox.fs_path().into();
let name = format!("parsing {:?}", mailbox.name());
let root_path = self.path.to_path_buf();
let map = self.hash_indexes.clone();
let mailbox_index = self.mailbox_index.clone();
let closure = move |work_context: crate::async_workers::WorkContext| {
work_context
.set_name
.send((std::thread::current().id(), name.clone()))
.unwrap();
let mut thunk = move || {
path.push("new");
for d in path.read_dir()? {
if let Ok(p) = d {
move_to_cur(p.path()).ok().take();
}
}
path.pop();
path.push("cur");
let iter = path.read_dir()?;
let count = path.read_dir()?.count();
let mut files: Vec<PathBuf> = Vec::with_capacity(count);
let mut ret = Vec::with_capacity(count);
for e in iter {
let e = e.and_then(|x| {
let path = x.path();
Ok(path)
})?;
files.push(e);
}
if !files.is_empty() {
crossbeam::scope(|scope| {
let mut threads = Vec::with_capacity(cores);
let chunk_size = if count / cores > 0 {
count / cores
} else {
count
};
for chunk in files.chunks(chunk_size) {
let unseen = unseen.clone();
let total = total.clone();
let cache_dir = cache_dir.clone();
let tx = tx.clone();
let map = map.clone();
let mailbox_index = mailbox_index.clone();
let root_path = root_path.clone();
let s = scope.builder().name(name.clone()).spawn(move |_| {
let len = chunk.len();
let size = if len <= 100 { 100 } else { (len / 100) * 100 };
let mut local_r: Vec<Envelope> =
Vec::with_capacity(chunk.len());
for c in chunk.chunks(size) {
//thread::yield_now();
let map = map.clone();
let mailbox_index = mailbox_index.clone();
let len = c.len();
for file in c {
/* Check if we have a cache file with this email's
* filename */
let file_name = PathBuf::from(file)
.strip_prefix(&root_path)
.unwrap()
.to_path_buf();
if let Some(cached) =
cache_dir.find_cache_file(&file_name)
{
/* Cached struct exists, try to load it */
let reader = io::BufReader::new(
fs::File::open(&cached).unwrap(),
);
let result: result::Result<Envelope, _> =
bincode::deserialize_from(reader);
if let Ok(env) = result {
let mut map = map.lock().unwrap();
let map = map.entry(mailbox_hash).or_default();
let hash = env.hash();
map.insert(hash, file.clone().into());
mailbox_index
.lock()
.unwrap()
.insert(hash, mailbox_hash);
if !env.is_seen() {
*unseen.lock().unwrap() += 1;
}
*total.lock().unwrap() += 1;
local_r.push(env);
continue;
}
};
let hash = get_file_hash(file);
{
let mut map = map.lock().unwrap();
let map = map.entry(mailbox_hash).or_default();
(*map).insert(hash, PathBuf::from(file).into());
}
let op = Box::new(MaildirOp::new(
hash,
map.clone(),
mailbox_hash,
));
if let Ok(e) = Envelope::from_token(op, hash) {
mailbox_index
.lock()
.unwrap()
.insert(e.hash(), mailbox_hash);
if let Ok(cached) =
cache_dir.place_cache_file(file_name)
{
/* place result in cache directory */
let f = match fs::File::create(cached) {
Ok(f) => f,
Err(e) => {
panic!("{}", e);
}
};
let metadata = f.metadata().unwrap();
let mut permissions = metadata.permissions();
permissions.set_mode(0o600); // Read/write for owner only.
f.set_permissions(permissions).unwrap();
let writer = io::BufWriter::new(f);
bincode::serialize_into(writer, &e).unwrap();
}
if !e.is_seen() {
*unseen.lock().unwrap() += 1;
}
*total.lock().unwrap() += 1;
local_r.push(e);
} else {
debug!(
"DEBUG: hash {}, path: {} couldn't be parsed",
hash,
file.as_path().display()
);
continue;
}
}
tx.send(AsyncStatus::ProgressReport(len)).unwrap();
}
local_r
});
threads.push(s.unwrap());
}
for t in threads {
let mut result = t.join().unwrap();
ret.append(&mut result);
work_context
.set_status
.send((
std::thread::current().id(),
format!("parsing.. {}/{}", ret.len(), files.len()),
))
.unwrap();
}
})
.unwrap();
}
Ok(ret)
};
let result = thunk();
tx_final.send(AsyncStatus::Payload(result)).unwrap();
tx_final.send(AsyncStatus::Finished).unwrap();
};
Box::new(closure)
};
w.build(handle)
}
*/
pub fn save_to_mailbox(mut path: PathBuf, bytes: Vec<u8>, flags: Option<Flag>) -> Result<()> { pub fn save_to_mailbox(mut path: PathBuf, bytes: Vec<u8>, flags: Option<Flag>) -> Result<()> {
for d in &["cur", "new", "tmp"] { for d in &["cur", "new", "tmp"] {
path.push(d); path.push(d);
@ -1330,49 +1146,42 @@ fn add_path_to_index(
path: &Path, path: &Path,
cache_dir: &xdg::BaseDirectories, cache_dir: &xdg::BaseDirectories,
file_name: PathBuf, file_name: PathBuf,
) -> Option<Envelope> { ) -> Result<Envelope> {
let env: Envelope;
debug!("add_path_to_index path {:?} filename{:?}", path, file_name); debug!("add_path_to_index path {:?} filename{:?}", path, file_name);
let hash = get_file_hash(path); let env_hash = get_file_hash(path);
{ {
let mut map = hash_index.lock().unwrap(); let mut map = hash_index.lock().unwrap();
let map = map.entry(mailbox_hash).or_default(); let map = map.entry(mailbox_hash).or_default();
map.insert(hash, path.to_path_buf().into()); map.insert(env_hash, path.to_path_buf().into());
debug!( debug!(
"inserted {} in {} map, len={}", "inserted {} in {} map, len={}",
hash, env_hash,
mailbox_hash, mailbox_hash,
map.len() map.len()
); );
} }
let op = Box::new(MaildirOp::new(hash, hash_index.clone(), mailbox_hash)); //Mmap::open_path(self.path(), Protection::Read)?
if let Ok(e) = Envelope::from_token(op, hash) { let mut env = Envelope::from_bytes(
debug!("add_path_to_index gen {}\t{}", hash, file_name.display()); unsafe { &Mmap::open_path(path, Protection::Read)?.as_slice() },
if let Ok(cached) = cache_dir.place_cache_file(file_name) { None,
debug!("putting in cache"); )?;
/* place result in cache directory */ env.set_hash(env_hash);
let f = match fs::File::create(cached) { debug!(
Ok(f) => f, "add_path_to_index gen {}\t{}",
Err(e) => { env_hash,
panic!("{}", e); file_name.display()
} );
}; if let Ok(cached) = cache_dir.place_cache_file(file_name) {
let metadata = f.metadata().unwrap(); debug!("putting in cache");
let mut permissions = metadata.permissions(); /* place result in cache directory */
let f = fs::File::create(cached)?;
let metadata = f.metadata()?;
let mut permissions = metadata.permissions();
permissions.set_mode(0o600); // Read/write for owner only. permissions.set_mode(0o600); // Read/write for owner only.
f.set_permissions(permissions).unwrap(); f.set_permissions(permissions)?;
let writer = io::BufWriter::new(f); let writer = io::BufWriter::new(f);
bincode::serialize_into(writer, &e).unwrap(); bincode::serialize_into(writer, &env)?;
}
env = e;
} else {
debug!(
"DEBUG: hash {}, path: {} couldn't be parsed in `add_path_to_index`",
hash,
path.display()
);
return None;
} }
Some(env) Ok(env)
} }

View File

@ -25,6 +25,7 @@ use core::future::Future;
use core::pin::Pin; use core::pin::Pin;
use futures::stream::{FuturesUnordered, StreamExt}; use futures::stream::{FuturesUnordered, StreamExt};
use futures::task::{Context, Poll}; use futures::task::{Context, Poll};
use memmap::{Mmap, Protection};
use std::io::{self}; use std::io::{self};
use std::os::unix::fs::PermissionsExt; use std::os::unix::fs::PermissionsExt;
use std::path::PathBuf; use std::path::PathBuf;
@ -32,7 +33,11 @@ use std::result;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
pub struct MaildirStream { pub struct MaildirStream {
payloads: Pin<Box<FuturesUnordered<Pin<Box<dyn Future<Output = Result<Vec<Envelope>>> + Send + 'static>>>>>, payloads: Pin<
Box<
FuturesUnordered<Pin<Box<dyn Future<Output = Result<Vec<Envelope>>> + Send + 'static>>>,
>,
>,
} }
impl MaildirStream { impl MaildirStream {
@ -135,44 +140,46 @@ impl MaildirStream {
continue; continue;
} }
}; };
let hash = get_file_hash(file); let env_hash = get_file_hash(file);
{ {
let mut map = map.lock().unwrap(); let mut map = map.lock().unwrap();
let map = map.entry(mailbox_hash).or_default(); let map = map.entry(mailbox_hash).or_default();
(*map).insert(hash, PathBuf::from(file).into()); (*map).insert(env_hash, PathBuf::from(file).into());
} }
let op = Box::new(MaildirOp::new(hash, map.clone(), mailbox_hash)); match Envelope::from_bytes(
if let Ok(e) = Envelope::from_token(op, hash) { unsafe { &Mmap::open_path(&file, Protection::Read)?.as_slice() },
mailbox_index.lock().unwrap().insert(e.hash(), mailbox_hash); None,
if let Ok(cached) = cache_dir.place_cache_file(file_name) { ) {
/* place result in cache directory */ Ok(mut env) => {
let f = match fs::File::create(cached) { env.set_hash(env_hash);
Ok(f) => f, mailbox_index.lock().unwrap().insert(env_hash, mailbox_hash);
Err(e) => { if let Ok(cached) = cache_dir.place_cache_file(file_name) {
panic!("{}", e); /* place result in cache directory */
} let f = fs::File::create(cached)?;
}; let metadata = f.metadata()?;
let metadata = f.metadata().unwrap(); let mut permissions = metadata.permissions();
let mut permissions = metadata.permissions();
permissions.set_mode(0o600); // Read/write for owner only. permissions.set_mode(0o600); // Read/write for owner only.
f.set_permissions(permissions).unwrap(); f.set_permissions(permissions)?;
let writer = io::BufWriter::new(f); let writer = io::BufWriter::new(f);
bincode::serialize_into(writer, &e).unwrap(); bincode::serialize_into(writer, &env)?;
}
if !env.is_seen() {
*unseen.lock().unwrap() += 1;
}
*total.lock().unwrap() += 1;
local_r.push(env);
} }
if !e.is_seen() { Err(err) => {
*unseen.lock().unwrap() += 1; debug!(
"DEBUG: hash {}, path: {} couldn't be parsed, {}",
env_hash,
file.as_path().display(),
err,
);
continue;
} }
*total.lock().unwrap() += 1;
local_r.push(e);
} else {
debug!(
"DEBUG: hash {}, path: {} couldn't be parsed",
hash,
file.as_path().display()
);
continue;
} }
} }
} }

View File

@ -41,7 +41,6 @@ mod headers;
pub mod signatures; pub mod signatures;
pub use headers::*; pub use headers::*;
use crate::backends::BackendOp;
use crate::datetime::UnixTimestamp; use crate::datetime::UnixTimestamp;
use crate::error::{MeliError, Result}; use crate::error::{MeliError, Result};
use crate::thread::ThreadNodeHash; use crate::thread::ThreadNodeHash;
@ -217,14 +216,6 @@ impl Envelope {
Err(MeliError::new("Couldn't parse mail.")) Err(MeliError::new("Couldn't parse mail."))
} }
pub fn from_token(mut operation: Box<dyn BackendOp>, hash: EnvelopeHash) -> Result<Envelope> {
let mut e = Envelope::new(hash);
e.flags = futures::executor::block_on(operation.fetch_flags()?)?;
let bytes = futures::executor::block_on(operation.as_bytes()?)?;
e.populate_headers(&bytes)?;
Ok(e)
}
pub fn hash(&self) -> EnvelopeHash { pub fn hash(&self) -> EnvelopeHash {
self.hash self.hash
} }
@ -487,13 +478,6 @@ impl Envelope {
}) })
} }
/// Requests bytes from backend and thus can fail
pub fn body(&self, mut operation: Box<dyn BackendOp>) -> Result<Attachment> {
debug!("searching body for {:?}", self.message_id_display());
let bytes = futures::executor::block_on(operation.as_bytes()?)?;
Ok(self.body_bytes(&bytes))
}
pub fn subject(&self) -> Cow<str> { pub fn subject(&self) -> Cow<str> {
match self.subject { match self.subject {
Some(ref s) => Cow::from(s), Some(ref s) => Cow::from(s),

View File

@ -269,6 +269,13 @@ impl From<std::ffi::NulError> for MeliError {
} }
} }
impl From<Box<bincode::ErrorKind>> for MeliError {
#[inline]
fn from(kind: Box<bincode::ErrorKind>) -> MeliError {
MeliError::new(format!("{}", kind)).set_source(Some(Arc::new(kind)))
}
}
impl From<nix::Error> for MeliError { impl From<nix::Error> for MeliError {
#[inline] #[inline]
fn from(kind: nix::Error) -> MeliError { fn from(kind: nix::Error) -> MeliError {

View File

@ -606,47 +606,98 @@ impl Account {
RefreshEventKind::Update(old_hash, envelope) => { RefreshEventKind::Update(old_hash, envelope) => {
#[cfg(feature = "sqlite3")] #[cfg(feature = "sqlite3")]
{ {
if let Err(err) = crate::sqlite3::remove(old_hash).and_then(|_| { match crate::sqlite3::remove(old_hash).map(|_| {
crate::sqlite3::insert(&envelope, &self.backend, &self.name) crate::sqlite3::insert(
(*envelope).clone(),
self.backend.clone(),
self.name.clone(),
)
}) { }) {
melib::log( Err(err) => {
format!( melib::log(
"Failed to update envelope {} in cache: {}", format!(
envelope.message_id_display(), "Failed to update envelope {} in cache: {}",
err.to_string() envelope.message_id_display(),
), err.to_string()
melib::ERROR, ),
); melib::ERROR,
);
}
Ok(job) => {
let (channel, handle, job_id) =
self.job_executor.spawn_blocking(job);
self.insert_job(
job_id,
JobRequest::Generic {
name: format!(
"Update envelope {} in sqlite3 cache",
envelope.message_id_display()
)
.into(),
handle,
channel,
on_finish: None,
},
);
}
} }
} }
self.collection.update(old_hash, *envelope, mailbox_hash); self.collection.update(old_hash, *envelope, mailbox_hash);
return Some(EnvelopeUpdate(old_hash)); return Some(EnvelopeUpdate(old_hash));
} }
RefreshEventKind::NewFlags(env_hash, (flags, tags)) => { RefreshEventKind::NewFlags(env_hash, (flags, tags)) => {
let mut envelopes = self.collection.envelopes.write().unwrap(); self.collection
envelopes.entry(env_hash).and_modify(|entry| { .envelopes
entry.labels_mut().clear(); .write()
entry .unwrap()
.labels_mut() .entry(env_hash)
.extend(tags.into_iter().map(|h| tag_hash!(h))); .and_modify(|entry| {
entry.set_flags(flags); entry.labels_mut().clear();
}); entry
.labels_mut()
.extend(tags.into_iter().map(|h| tag_hash!(h)));
entry.set_flags(flags);
});
#[cfg(feature = "sqlite3")] #[cfg(feature = "sqlite3")]
{ {
if let Err(err) = crate::sqlite3::remove(env_hash).and_then(|_| { match crate::sqlite3::remove(env_hash).map(|_| {
crate::sqlite3::insert(&envelopes[&env_hash], &self.backend, &self.name) crate::sqlite3::insert(
self.collection.envelopes.read().unwrap()[&env_hash].clone(),
self.backend.clone(),
self.name.clone(),
)
}) { }) {
melib::log( Ok(job) => {
format!( let (channel, handle, job_id) =
"Failed to update envelope {} in cache: {}", self.job_executor.spawn_blocking(job);
envelopes[&env_hash].message_id_display(), self.insert_job(
err.to_string() job_id,
), JobRequest::Generic {
melib::ERROR, name: format!(
); "Update envelope {} in sqlite3 cache",
self.collection.envelopes.read().unwrap()[&env_hash]
.message_id_display()
)
.into(),
handle,
channel,
on_finish: None,
},
);
}
Err(err) => {
melib::log(
format!(
"Failed to update envelope {} in cache: {}",
self.collection.envelopes.read().unwrap()[&env_hash]
.message_id_display(),
err.to_string()
),
melib::ERROR,
);
}
} }
} }
drop(envelopes);
self.collection.update_flags(env_hash, mailbox_hash); self.collection.update_flags(env_hash, mailbox_hash);
return Some(EnvelopeUpdate(env_hash)); return Some(EnvelopeUpdate(env_hash));
} }
@ -657,19 +708,42 @@ impl Account {
} }
#[cfg(feature = "sqlite3")] #[cfg(feature = "sqlite3")]
{ {
let envelopes = self.collection.envelopes.read(); match crate::sqlite3::remove(old_hash).map(|_| {
let envelopes = envelopes.unwrap(); crate::sqlite3::insert(
if let Err(err) = crate::sqlite3::remove(old_hash).and_then(|_| { self.collection.envelopes.read().unwrap()[&new_hash].clone(),
crate::sqlite3::insert(&envelopes[&new_hash], &self.backend, &self.name) self.backend.clone(),
self.name.clone(),
)
}) { }) {
melib::log( Err(err) => {
format!( melib::log(
"Failed to update envelope {} in cache: {}", format!(
&envelopes[&new_hash].message_id_display(), "Failed to update envelope {} in cache: {}",
err.to_string() &self.collection.envelopes.read().unwrap()[&new_hash]
), .message_id_display(),
melib::ERROR, err.to_string()
); ),
melib::ERROR,
);
}
Ok(job) => {
let (channel, handle, job_id) =
self.job_executor.spawn_blocking(job);
self.insert_job(
job_id,
JobRequest::Generic {
name: format!(
"Update envelope {} in sqlite3 cache",
self.collection.envelopes.read().unwrap()[&new_hash]
.message_id_display()
)
.into(),
handle,
channel,
on_finish: None,
},
);
}
} }
} }
return Some(EnvelopeRename(old_hash, new_hash)); return Some(EnvelopeRename(old_hash, new_hash));
@ -694,18 +768,25 @@ impl Account {
}; };
#[cfg(feature = "sqlite3")] #[cfg(feature = "sqlite3")]
{ {
if let Err(err) = let (channel, handle, job_id) =
crate::sqlite3::insert(&envelope, &self.backend, &self.name) self.job_executor.spawn_blocking(crate::sqlite3::insert(
{ (*envelope).clone(),
melib::log( self.backend.clone(),
format!( self.name.clone(),
"Failed to insert envelope {} in cache: {}", ));
envelope.message_id_display(), self.insert_job(
err.to_string() job_id,
), JobRequest::Generic {
melib::ERROR, name: format!(
); "Update envelope {} in sqlite3 cache",
} envelope.message_id_display()
)
.into(),
handle,
channel,
on_finish: None,
},
);
} }
if self.collection.insert(*envelope, mailbox_hash) { if self.collection.insert(*envelope, mailbox_hash) {

View File

@ -140,10 +140,10 @@ pub fn db_path() -> Result<PathBuf> {
//} //}
// //
// //
pub fn insert( pub async fn insert(
envelope: &Envelope, envelope: Envelope,
backend: &Arc<RwLock<Box<dyn MailBackend>>>, backend: Arc<RwLock<Box<dyn MailBackend>>>,
acc_name: &str, acc_name: String,
) -> Result<()> { ) -> Result<()> {
let db_path = db_path()?; let db_path = db_path()?;
if !db_path.exists() { if !db_path.exists() {
@ -153,11 +153,14 @@ pub fn insert(
} }
let conn = melib_sqlite3::open_db(db_path)?; let conn = melib_sqlite3::open_db(db_path)?;
let backend_lck = backend.read().unwrap();
let body = match backend_lck let op = backend
.operation(envelope.hash()) .read()
.and_then(|op| envelope.body(op)) .unwrap()
{ .operation(envelope.hash())?
.as_bytes()?;
let body = match op.await.map(|bytes| envelope.body_bytes(&bytes)) {
Ok(body) => body.text(), Ok(body) => body.text(),
Err(err) => { Err(err) => {
debug!( debug!(
@ -314,25 +317,21 @@ pub fn index(context: &mut crate::state::Context, account_index: usize) -> Resul
); );
for chunk in env_hashes.chunks(200) { for chunk in env_hashes.chunks(200) {
ctr += chunk.len(); ctr += chunk.len();
let envelopes_lck = acc_mutex.read().unwrap();
let backend_lck = backend_mutex.read().unwrap();
for env_hash in chunk { for env_hash in chunk {
let mut op = backend_mutex.read().unwrap().operation(*env_hash)?;
let bytes = op
.as_bytes()?
.await
.chain_err_summary(|| format!("Failed to open envelope {}", env_hash))?;
let envelopes_lck = acc_mutex.read().unwrap();
if let Some(e) = envelopes_lck.get(&env_hash) { if let Some(e) = envelopes_lck.get(&env_hash) {
let body = backend_lck let body = e.body_bytes(&bytes).text().replace('\0', "");
.operation(e.hash())
.and_then(|op| e.body(op))
.chain_err_summary(|| {
format!("Failed to open envelope {}", e.message_id_display(),)
})?
.text()
.replace('\0', "");
conn.execute("INSERT OR REPLACE INTO envelopes (account_id, hash, date, _from, _to, cc, bcc, subject, message_id, in_reply_to, _references, flags, has_attachments, body_text, timestamp) conn.execute("INSERT OR REPLACE INTO envelopes (account_id, hash, date, _from, _to, cc, bcc, subject, message_id, in_reply_to, _references, flags, has_attachments, body_text, timestamp)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)", VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)",
params![account_id, e.hash().to_be_bytes().to_vec(), e.date_as_str(), e.field_from_to_string(), e.field_to_to_string(), e.field_cc_to_string(), e.field_bcc_to_string(), e.subject().into_owned().trim_end_matches('\u{0}'), e.message_id_display().to_string(), e.in_reply_to_display().map(|f| f.to_string()).unwrap_or(String::new()), e.field_references_to_string(), i64::from(e.flags().bits()), if e.has_attachments() { 1 } else { 0 }, body, e.date().to_be_bytes().to_vec()], params![account_id, e.hash().to_be_bytes().to_vec(), e.date_as_str(), e.field_from_to_string(), e.field_to_to_string(), e.field_cc_to_string(), e.field_bcc_to_string(), e.subject().into_owned().trim_end_matches('\u{0}'), e.message_id_display().to_string(), e.in_reply_to_display().map(|f| f.to_string()).unwrap_or(String::new()), e.field_references_to_string(), i64::from(e.flags().bits()), if e.has_attachments() { 1 } else { 0 }, body, e.date().to_be_bytes().to_vec()],
).chain_err_summary(|| format!( "Failed to insert envelope {}", e.message_id_display()))?; ).chain_err_summary(|| format!( "Failed to insert envelope {}", e.message_id_display()))?;
} }
} }
drop(envelopes_lck);
let sleep_dur = std::time::Duration::from_millis(20); let sleep_dur = std::time::Duration::from_millis(20);
std::thread::sleep(sleep_dur); std::thread::sleep(sleep_dur);
} }