notmuch: cache messages by msg-id, not path

memfd
Manos Pitsidianakis 2020-05-09 14:27:20 +03:00
parent 3ea1ce5454
commit b5b9982d9e
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
1 changed files with 248 additions and 138 deletions

View File

@ -33,9 +33,11 @@ use fnv::FnvHashMap;
use smallvec::SmallVec;
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::ffi::{CStr, CString};
use std::error::Error;
use std::ffi::{CStr, CString, OsStr};
use std::hash::{Hash, Hasher};
use std::io::Read;
use std::os::unix::ffi::OsStrExt;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock};
@ -59,6 +61,35 @@ macro_rules! call {
}};
}
#[derive(Debug)]
pub struct NotmuchError(String);
impl std::fmt::Display for NotmuchError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
std::fmt::Display::fmt(&self.0, f)
}
}
impl Error for NotmuchError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
None
}
}
macro_rules! try_call {
($lib:expr, $call:expr) => {{
let status = unsafe { $call };
if status == _notmuch_status_NOTMUCH_STATUS_SUCCESS {
Ok(())
} else {
let c_str = unsafe { call!($lib, notmuch_status_to_string)(status) };
Err(NotmuchError(unsafe {
CStr::from_ptr(c_str).to_string_lossy().into_owned()
}))
}
}};
}
impl Drop for DbConnection {
fn drop(&mut self) {
let inner = self.inner.write().unwrap();
@ -72,8 +103,9 @@ impl Drop for DbConnection {
#[derive(Debug)]
pub struct NotmuchDb {
lib: Arc<libloading::Library>,
revision_uuid: Arc<RwLock<u64>>,
mailboxes: Arc<RwLock<FnvHashMap<MailboxHash, NotmuchMailbox>>>,
index: Arc<RwLock<FnvHashMap<EnvelopeHash, &'static CStr>>>,
index: Arc<RwLock<FnvHashMap<EnvelopeHash, CString>>>,
tag_index: Arc<RwLock<BTreeMap<u64, String>>>,
path: PathBuf,
save_messages_to: Option<PathBuf>,
@ -198,6 +230,7 @@ impl NotmuchDb {
}
Ok(Box::new(NotmuchDb {
lib,
revision_uuid: Arc::new(RwLock::new(0)),
path,
index: Arc::new(RwLock::new(Default::default())),
tag_index: Arc::new(RwLock::new(Default::default())),
@ -228,33 +261,14 @@ impl NotmuchDb {
}
pub fn search(&self, query_s: &str) -> Result<SmallVec<[EnvelopeHash; 512]>> {
let database = self.new_connection(false)?;
let database = Self::new_connection(self.path.as_path(), self.lib.clone(), false)?;
let database_lck = database.inner.read().unwrap();
let query_str = std::ffi::CString::new(query_s).unwrap();
let query: *mut notmuch_query_t =
unsafe { call!(self.lib, notmuch_query_create)(*database_lck, query_str.as_ptr()) };
if query.is_null() {
return Err(MeliError::new("Could not create query. Out of memory?"));
}
let mut messages: *mut notmuch_messages_t = std::ptr::null_mut();
let status = unsafe {
call!(self.lib, notmuch_query_search_messages)(query, &mut messages as *mut _)
};
if status != 0 {
return Err(MeliError::new(format!(
"Search for {} returned {}",
query_s, status,
)));
}
assert!(!messages.is_null());
let iter = MessageIterator {
messages,
lib: self.lib.clone(),
};
let query: Query = Query::new(self.lib.clone(), &database_lck, query_s)?;
let mut ret = SmallVec::new();
let iter = query.search()?;
for message in iter {
let fs_path = unsafe { call!(self.lib, notmuch_message_get_filename)(message) };
let c_str = unsafe { CStr::from_ptr(fs_path) };
let msg_id = unsafe { call!(self.lib, notmuch_message_get_message_id)(message) };
let c_str = unsafe { CStr::from_ptr(msg_id) };
let env_hash = {
let mut hasher = DefaultHasher::default();
c_str.hash(&mut hasher);
@ -263,18 +277,19 @@ impl NotmuchDb {
ret.push(env_hash);
}
unsafe {
call!(self.lib, notmuch_query_destroy)(query);
}
Ok(ret)
}
fn new_connection(&self, write: bool) -> Result<DbConnection> {
let path_c = std::ffi::CString::new(self.path.to_str().unwrap()).unwrap();
fn new_connection(
path: &Path,
lib: Arc<libloading::Library>,
write: bool,
) -> Result<DbConnection> {
let path_c = std::ffi::CString::new(path.to_str().unwrap()).unwrap();
let path_ptr = path_c.as_ptr();
let mut database: *mut notmuch_database_t = std::ptr::null_mut();
let status = unsafe {
call!(self.lib, notmuch_database_open)(
call!(lib, notmuch_database_open)(
path_ptr,
if write {
notmuch_database_mode_t_NOTMUCH_DATABASE_MODE_READ_WRITE
@ -287,13 +302,13 @@ impl NotmuchDb {
if status != 0 {
return Err(MeliError::new(format!(
"Could not open notmuch database at path {}. notmuch_database_open returned {}.",
self.path.display(),
path.display(),
status
)));
}
assert!(!database.is_null());
Ok(DbConnection {
lib: self.lib.clone(),
lib,
inner: Arc::new(RwLock::new(database)),
database_ph: std::marker::PhantomData,
})
@ -307,7 +322,7 @@ impl MailBackend for NotmuchDb {
fn get(&mut self, mailbox: &Mailbox) -> Async<Result<Vec<Envelope>>> {
let mut w = AsyncBuilder::new();
let mailbox_hash = mailbox.hash();
let database = self.new_connection(false);
let database = NotmuchDb::new_connection(self.path.as_path(), self.lib.clone(), false);
let index = self.index.clone();
let tag_index = self.tag_index.clone();
let mailboxes = self.mailboxes.clone();
@ -325,35 +340,22 @@ impl MailBackend for NotmuchDb {
let database_lck = database.inner.read().unwrap();
let mut mailboxes_lck = mailboxes.write().unwrap();
let mailbox = mailboxes_lck.get_mut(&mailbox_hash).unwrap();
let query_str = std::ffi::CString::new(mailbox.query_str.as_str()).unwrap();
let query: *mut notmuch_query_t =
unsafe { call!(lib, notmuch_query_create)(*database_lck, query_str.as_ptr()) };
if query.is_null() {
tx.send(AsyncStatus::Payload(Err(MeliError::new(
"Could not create query. Out of memory?",
))))
.unwrap();
tx.send(AsyncStatus::Finished).unwrap();
return;
}
let mut messages: *mut notmuch_messages_t = std::ptr::null_mut();
let status = unsafe {
call!(lib, notmuch_query_search_messages)(query, &mut messages as *mut _)
};
if status != 0 {
tx.send(AsyncStatus::Payload(Err(MeliError::new(format!(
"Search for {} returned {}",
mailbox.query_str.as_str(),
status,
)))))
.unwrap();
tx.send(AsyncStatus::Finished).unwrap();
return;
}
assert!(!messages.is_null());
let iter = MessageIterator {
messages,
lib: lib.clone(),
let query: Query =
match Query::new(lib.clone(), &database_lck, mailbox.query_str.as_str()) {
Ok(q) => q,
Err(err) => {
tx.send(AsyncStatus::Payload(Err(err))).unwrap();
tx.send(AsyncStatus::Finished).unwrap();
return;
}
};
let iter = match query.search() {
Ok(i) => i,
Err(err) => {
tx.send(AsyncStatus::Payload(Err(err))).unwrap();
tx.send(AsyncStatus::Finished).unwrap();
return;
}
};
for message in iter {
let mut response = String::new();
@ -375,13 +377,15 @@ impl MailBackend for NotmuchDb {
debug!("could not read fs_path {:?} {}", fs_path, e);
continue;
}
let c_str = unsafe { CStr::from_ptr(fs_path) };
let env_hash = {
let msg_id = unsafe { call!(lib, notmuch_message_get_message_id)(message) };
let c_str = unsafe { CStr::from_ptr(msg_id) };
let mut hasher = DefaultHasher::default();
c_str.hash(&mut hasher);
hasher.finish()
};
index.write().unwrap().insert(env_hash, c_str);
let c_str = unsafe { CStr::from_ptr(fs_path) };
index.write().unwrap().insert(env_hash, c_str.into());
let op = Box::new(NotmuchOp {
database: database.clone(),
lib: lib.clone(),
@ -392,10 +396,7 @@ impl MailBackend for NotmuchDb {
});
if let Some(mut env) = Envelope::from_token(op, env_hash) {
let mut tag_lock = tag_index.write().unwrap();
for tag in (TagIterator {
tags: unsafe { call!(lib, notmuch_message_get_tags)(message) },
lib: lib.clone(),
}) {
for tag in TagIterator::new(lib.clone(), message) {
let tag = tag.to_string_lossy().into_owned();
let mut hasher = DefaultHasher::new();
@ -412,9 +413,6 @@ impl MailBackend for NotmuchDb {
index.write().unwrap().remove(&env_hash);
}
}
unsafe {
call!(lib, notmuch_query_destroy)(query);
}
tx.send(AsyncStatus::Payload(Ok(ret))).unwrap();
tx.send(AsyncStatus::Finished).unwrap();
};
@ -425,15 +423,71 @@ impl MailBackend for NotmuchDb {
fn watch(
&self,
_sender: RefreshEventConsumer,
sender: RefreshEventConsumer,
_work_context: WorkContext,
) -> Result<std::thread::ThreadId> {
extern crate notify;
use crate::backends::{RefreshEvent, RefreshEventKind::*};
use notify::{watcher, RecursiveMode, Watcher};
let (tx, rx) = std::sync::mpsc::channel();
let mut watcher = watcher(tx, std::time::Duration::from_secs(2)).unwrap();
watcher.watch(&self.path, RecursiveMode::Recursive).unwrap();
let path = self.path.clone();
let lib = self.lib.clone();
{
let database = NotmuchDb::new_connection(path.as_path(), lib.clone(), false)?;
let mut revision_uuid_lck = self.revision_uuid.write().unwrap();
*revision_uuid_lck = unsafe {
call!(lib, notmuch_database_get_revision)(
*database.inner.read().unwrap(),
std::ptr::null_mut(),
)
};
debug!(*revision_uuid_lck);
}
let revision_uuid = self.revision_uuid.clone();
let handle = std::thread::Builder::new()
.name(format!(
"watching {}",
self.path.file_name().unwrap().to_str().unwrap()
))
.spawn(move || {})?;
.spawn(move || {
let _watcher = watcher;
let c = move || -> std::result::Result<(), MeliError> {
loop {
let _ = rx.recv().map_err(|err| err.to_string())?;
let database =
NotmuchDb::new_connection(path.as_path(), lib.clone(), false)?;
let database_lck = database.inner.read().unwrap();
let mut revision_uuid_lck = revision_uuid.write().unwrap();
let new_revision = unsafe {
call!(lib, notmuch_database_get_revision)(
*database_lck,
std::ptr::null_mut(),
)
};
if new_revision > *revision_uuid_lck {
debug!(new_revision);
debug!(*revision_uuid_lck);
let query_str =
format!("lastmod:{}..{}", *revision_uuid_lck, new_revision);
let query: Query = Query::new(lib.clone(), &database_lck, &query_str)?;
let iter = query.search()?;
*revision_uuid_lck = new_revision;
}
}
};
if let Err(err) = c() {
sender.send(RefreshEvent {
hash: 0,
kind: Failure(err.into()),
});
}
})?;
Ok(handle.thread().id())
}
fn mailboxes(&self) -> Result<FnvHashMap<MailboxHash, Mailbox>> {
@ -447,7 +501,9 @@ impl MailBackend for NotmuchDb {
}
fn operation(&self, hash: EnvelopeHash) -> Box<dyn BackendOp> {
Box::new(NotmuchOp {
database: Arc::new(self.new_connection(true).unwrap()),
database: Arc::new(
Self::new_connection(self.path.as_path(), self.lib.clone(), true).unwrap(),
),
lib: self.lib.clone(),
hash,
index: self.index.clone(),
@ -477,7 +533,7 @@ impl MailBackend for NotmuchDb {
#[derive(Debug)]
struct NotmuchOp {
hash: EnvelopeHash,
index: Arc<RwLock<FnvHashMap<EnvelopeHash, &'static CStr>>>,
index: Arc<RwLock<FnvHashMap<EnvelopeHash, CString>>>,
tag_index: Arc<RwLock<BTreeMap<u64, String>>>,
database: Arc<DbConnection>,
bytes: Option<String>,
@ -491,7 +547,7 @@ impl BackendOp for NotmuchOp {
fn as_bytes(&mut self) -> Result<&[u8]> {
let path = &self.index.read().unwrap()[&self.hash];
let mut f = std::fs::File::open(path.to_str().unwrap())?;
let mut f = std::fs::File::open(&OsStr::from_bytes(path.to_bytes()))?;
let mut response = String::new();
f.read_to_string(&mut response)?;
self.bytes = Some(response);
@ -500,22 +556,21 @@ impl BackendOp for NotmuchOp {
fn fetch_flags(&self) -> Flag {
let mut flag = Flag::default();
let path = self.index.read().unwrap()[&self.hash].to_str().unwrap();
if !path.contains(":2,") {
return flag;
}
for f in path.chars().rev() {
let path = &self.index.read().unwrap()[&self.hash];
for f in path.to_bytes().iter().rev() {
match f {
',' => break,
'D' => flag |= Flag::DRAFT,
'F' => flag |= Flag::FLAGGED,
'P' => flag |= Flag::PASSED,
'R' => flag |= Flag::REPLIED,
'S' => flag |= Flag::SEEN,
'T' => flag |= Flag::TRASHED,
b',' => break,
b'D' => flag |= Flag::DRAFT,
b'F' => flag |= Flag::FLAGGED,
b'P' => flag |= Flag::PASSED,
b'R' => flag |= Flag::REPLIED,
b'S' => flag |= Flag::SEEN,
b'T' => flag |= Flag::TRASHED,
_ => {
debug!("DEBUG: in fetch_flags, path is {}", path);
debug!(
"DEBUG: in fetch_flags, unknown flag '{}' path is {:?}",
f, path
);
}
}
}
@ -540,11 +595,8 @@ impl BackendOp for NotmuchOp {
)));
}
let tags = (TagIterator {
tags: unsafe { call!(self.lib, notmuch_message_get_tags)(message) },
lib: self.lib.clone(),
})
.collect::<Vec<&CStr>>();
// TODO: freeze/thaw message.
let tags = TagIterator::new(self.lib.clone(), message).collect::<Vec<&CStr>>();
debug!(&tags);
macro_rules! cstr {
@ -553,32 +605,34 @@ impl BackendOp for NotmuchOp {
};
}
macro_rules! add_tag {
($l:literal) => {
unsafe {
if tags.contains(&cstr!($l)) {
return Ok(());
}
if call!(self.lib, notmuch_message_add_tag)(message, cstr!($l).as_ptr())
!= _notmuch_status_NOTMUCH_STATUS_SUCCESS
{
return Err(MeliError::new("Could not set tag."));
}
($l:literal) => {{
if tags.contains(unsafe { &cstr!($l) }) {
return Ok(());
}
};
if let Err(err) = try_call!(
self.lib,
call!(self.lib, notmuch_message_add_tag)(message, cstr!($l).as_ptr())
) {
return Err(
MeliError::new("Could not set tag.").set_source(Some(Arc::new(err)))
);
}
}};
}
macro_rules! remove_tag {
($l:literal) => {
unsafe {
if !tags.contains(&cstr!($l)) {
return Ok(());
}
if call!(self.lib, notmuch_message_remove_tag)(message, cstr!($l).as_ptr())
!= _notmuch_status_NOTMUCH_STATUS_SUCCESS
{
return Err(MeliError::new("Could not set tag."));
}
($l:literal) => {{
if !tags.contains(unsafe { &cstr!($l) }) {
return Ok(());
}
};
if let Err(err) = try_call!(
self.lib,
call!(self.lib, notmuch_message_remove_tag)(message, cstr!($l).as_ptr())
) {
return Err(
MeliError::new("Could not set tag.").set_source(Some(Arc::new(err)))
);
}
}};
}
match f {
@ -598,23 +652,18 @@ impl BackendOp for NotmuchOp {
}
/* Update message filesystem path. */
if unsafe { call!(self.lib, notmuch_message_tags_to_maildir_flags)(message) }
!= _notmuch_status_NOTMUCH_STATUS_SUCCESS
{
return Err(MeliError::new("Could not set tag."));
if let Err(err) = try_call!(
self.lib,
call!(self.lib, notmuch_message_tags_to_maildir_flags)(message)
) {
return Err(MeliError::new("Could not set tag.").set_source(Some(Arc::new(err))));
}
let fs_path = unsafe { call!(self.lib, notmuch_message_get_filename)(message) };
let c_str = unsafe { CStr::from_ptr(fs_path) };
if let Some(p) = index_lck.get_mut(&self.hash) {
*p = c_str;
*p = c_str.into();
}
let new_hash = {
let mut hasher = DefaultHasher::default();
c_str.hash(&mut hasher);
hasher.finish()
};
index_lck.insert(new_hash, c_str);
Ok(())
}
@ -636,25 +685,25 @@ impl BackendOp for NotmuchOp {
)));
}
if value {
if unsafe {
if let Err(err) = try_call!(
self.lib,
call!(self.lib, notmuch_message_add_tag)(
message,
CString::new(tag.as_str()).unwrap().as_ptr(),
)
} != _notmuch_status_NOTMUCH_STATUS_SUCCESS
{
return Err(MeliError::new("Could not set tag."));
) {
return Err(MeliError::new("Could not set tag.").set_source(Some(Arc::new(err))));
}
debug!("added tag {}", &tag);
} else {
if unsafe {
if let Err(err) = try_call!(
self.lib,
call!(self.lib, notmuch_message_remove_tag)(
message,
CString::new(tag.as_str()).unwrap().as_ptr(),
)
} != _notmuch_status_NOTMUCH_STATUS_SUCCESS
{
return Err(MeliError::new("Could not set tag."));
) {
return Err(MeliError::new("Could not set tag.").set_source(Some(Arc::new(err))));
}
debug!("removed tag {}", &tag);
}
@ -706,6 +755,15 @@ pub struct TagIterator {
tags: *mut notmuch_tags_t,
}
impl TagIterator {
fn new(lib: Arc<libloading::Library>, message: *mut notmuch_message_t) -> Self {
TagIterator {
tags: unsafe { call!(lib, notmuch_message_get_tags)(message) },
lib,
}
}
}
impl Iterator for TagIterator {
type Item = &'static CStr;
fn next(&mut self) -> Option<Self::Item> {
@ -723,3 +781,55 @@ impl Iterator for TagIterator {
}
}
}
pub struct Query<'s> {
lib: Arc<libloading::Library>,
ptr: *mut notmuch_query_t,
query_str: &'s str,
}
impl<'s> Query<'s> {
fn new(
lib: Arc<libloading::Library>,
database: &*mut notmuch_database_t,
query_str: &'s str,
) -> Result<Self> {
let query_cstr = std::ffi::CString::new(query_str)?;
let query: *mut notmuch_query_t =
unsafe { call!(lib, notmuch_query_create)(*database, query_cstr.as_ptr()) };
if query.is_null() {
return Err(MeliError::new("Could not create query. Out of memory?"));
}
Ok(Query {
lib,
ptr: query,
query_str,
})
}
fn search(&self) -> Result<MessageIterator> {
let mut messages: *mut notmuch_messages_t = std::ptr::null_mut();
let status = unsafe {
call!(self.lib, notmuch_query_search_messages)(self.ptr, &mut messages as *mut _)
};
if status != 0 {
return Err(MeliError::new(format!(
"Search for {} returned {}",
self.query_str, status,
)));
}
assert!(!messages.is_null());
Ok(MessageIterator {
messages,
lib: self.lib.clone(),
})
}
}
impl Drop for Query<'_> {
fn drop(&mut self) {
unsafe {
call!(self.lib, notmuch_query_destroy)(self.ptr);
}
}
}