diff --git a/melib/src/backends/notmuch.rs b/melib/src/backends/notmuch.rs index 469a3e07c..27e43c4f4 100644 --- a/melib/src/backends/notmuch.rs +++ b/melib/src/backends/notmuch.rs @@ -33,9 +33,11 @@ use fnv::FnvHashMap; use smallvec::SmallVec; use std::collections::hash_map::DefaultHasher; use std::collections::BTreeMap; -use std::ffi::{CStr, CString}; +use std::error::Error; +use std::ffi::{CStr, CString, OsStr}; use std::hash::{Hash, Hasher}; use std::io::Read; +use std::os::unix::ffi::OsStrExt; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex, RwLock}; @@ -59,6 +61,35 @@ macro_rules! call { }}; } +#[derive(Debug)] +pub struct NotmuchError(String); + +impl std::fmt::Display for NotmuchError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + std::fmt::Display::fmt(&self.0, f) + } +} + +impl Error for NotmuchError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + None + } +} + +macro_rules! try_call { + ($lib:expr, $call:expr) => {{ + let status = unsafe { $call }; + if status == _notmuch_status_NOTMUCH_STATUS_SUCCESS { + Ok(()) + } else { + let c_str = unsafe { call!($lib, notmuch_status_to_string)(status) }; + Err(NotmuchError(unsafe { + CStr::from_ptr(c_str).to_string_lossy().into_owned() + })) + } + }}; +} + impl Drop for DbConnection { fn drop(&mut self) { let inner = self.inner.write().unwrap(); @@ -72,8 +103,9 @@ impl Drop for DbConnection { #[derive(Debug)] pub struct NotmuchDb { lib: Arc, + revision_uuid: Arc>, mailboxes: Arc>>, - index: Arc>>, + index: Arc>>, tag_index: Arc>>, path: PathBuf, save_messages_to: Option, @@ -198,6 +230,7 @@ impl NotmuchDb { } Ok(Box::new(NotmuchDb { lib, + revision_uuid: Arc::new(RwLock::new(0)), path, index: Arc::new(RwLock::new(Default::default())), tag_index: Arc::new(RwLock::new(Default::default())), @@ -228,33 +261,14 @@ impl NotmuchDb { } pub fn search(&self, query_s: &str) -> Result> { - let database = self.new_connection(false)?; + let database = Self::new_connection(self.path.as_path(), self.lib.clone(), false)?; let database_lck = database.inner.read().unwrap(); - let query_str = std::ffi::CString::new(query_s).unwrap(); - let query: *mut notmuch_query_t = - unsafe { call!(self.lib, notmuch_query_create)(*database_lck, query_str.as_ptr()) }; - if query.is_null() { - return Err(MeliError::new("Could not create query. Out of memory?")); - } - let mut messages: *mut notmuch_messages_t = std::ptr::null_mut(); - let status = unsafe { - call!(self.lib, notmuch_query_search_messages)(query, &mut messages as *mut _) - }; - if status != 0 { - return Err(MeliError::new(format!( - "Search for {} returned {}", - query_s, status, - ))); - } - assert!(!messages.is_null()); - let iter = MessageIterator { - messages, - lib: self.lib.clone(), - }; + let query: Query = Query::new(self.lib.clone(), &database_lck, query_s)?; let mut ret = SmallVec::new(); + let iter = query.search()?; for message in iter { - let fs_path = unsafe { call!(self.lib, notmuch_message_get_filename)(message) }; - let c_str = unsafe { CStr::from_ptr(fs_path) }; + let msg_id = unsafe { call!(self.lib, notmuch_message_get_message_id)(message) }; + let c_str = unsafe { CStr::from_ptr(msg_id) }; let env_hash = { let mut hasher = DefaultHasher::default(); c_str.hash(&mut hasher); @@ -263,18 +277,19 @@ impl NotmuchDb { ret.push(env_hash); } - unsafe { - call!(self.lib, notmuch_query_destroy)(query); - } Ok(ret) } - fn new_connection(&self, write: bool) -> Result { - let path_c = std::ffi::CString::new(self.path.to_str().unwrap()).unwrap(); + fn new_connection( + path: &Path, + lib: Arc, + write: bool, + ) -> Result { + let path_c = std::ffi::CString::new(path.to_str().unwrap()).unwrap(); let path_ptr = path_c.as_ptr(); let mut database: *mut notmuch_database_t = std::ptr::null_mut(); let status = unsafe { - call!(self.lib, notmuch_database_open)( + call!(lib, notmuch_database_open)( path_ptr, if write { notmuch_database_mode_t_NOTMUCH_DATABASE_MODE_READ_WRITE @@ -287,13 +302,13 @@ impl NotmuchDb { if status != 0 { return Err(MeliError::new(format!( "Could not open notmuch database at path {}. notmuch_database_open returned {}.", - self.path.display(), + path.display(), status ))); } assert!(!database.is_null()); Ok(DbConnection { - lib: self.lib.clone(), + lib, inner: Arc::new(RwLock::new(database)), database_ph: std::marker::PhantomData, }) @@ -307,7 +322,7 @@ impl MailBackend for NotmuchDb { fn get(&mut self, mailbox: &Mailbox) -> Async>> { let mut w = AsyncBuilder::new(); let mailbox_hash = mailbox.hash(); - let database = self.new_connection(false); + let database = NotmuchDb::new_connection(self.path.as_path(), self.lib.clone(), false); let index = self.index.clone(); let tag_index = self.tag_index.clone(); let mailboxes = self.mailboxes.clone(); @@ -325,35 +340,22 @@ impl MailBackend for NotmuchDb { let database_lck = database.inner.read().unwrap(); let mut mailboxes_lck = mailboxes.write().unwrap(); let mailbox = mailboxes_lck.get_mut(&mailbox_hash).unwrap(); - let query_str = std::ffi::CString::new(mailbox.query_str.as_str()).unwrap(); - let query: *mut notmuch_query_t = - unsafe { call!(lib, notmuch_query_create)(*database_lck, query_str.as_ptr()) }; - if query.is_null() { - tx.send(AsyncStatus::Payload(Err(MeliError::new( - "Could not create query. Out of memory?", - )))) - .unwrap(); - tx.send(AsyncStatus::Finished).unwrap(); - return; - } - let mut messages: *mut notmuch_messages_t = std::ptr::null_mut(); - let status = unsafe { - call!(lib, notmuch_query_search_messages)(query, &mut messages as *mut _) - }; - if status != 0 { - tx.send(AsyncStatus::Payload(Err(MeliError::new(format!( - "Search for {} returned {}", - mailbox.query_str.as_str(), - status, - ))))) - .unwrap(); - tx.send(AsyncStatus::Finished).unwrap(); - return; - } - assert!(!messages.is_null()); - let iter = MessageIterator { - messages, - lib: lib.clone(), + let query: Query = + match Query::new(lib.clone(), &database_lck, mailbox.query_str.as_str()) { + Ok(q) => q, + Err(err) => { + tx.send(AsyncStatus::Payload(Err(err))).unwrap(); + tx.send(AsyncStatus::Finished).unwrap(); + return; + } + }; + let iter = match query.search() { + Ok(i) => i, + Err(err) => { + tx.send(AsyncStatus::Payload(Err(err))).unwrap(); + tx.send(AsyncStatus::Finished).unwrap(); + return; + } }; for message in iter { let mut response = String::new(); @@ -375,13 +377,15 @@ impl MailBackend for NotmuchDb { debug!("could not read fs_path {:?} {}", fs_path, e); continue; } - let c_str = unsafe { CStr::from_ptr(fs_path) }; let env_hash = { + let msg_id = unsafe { call!(lib, notmuch_message_get_message_id)(message) }; + let c_str = unsafe { CStr::from_ptr(msg_id) }; let mut hasher = DefaultHasher::default(); c_str.hash(&mut hasher); hasher.finish() }; - index.write().unwrap().insert(env_hash, c_str); + let c_str = unsafe { CStr::from_ptr(fs_path) }; + index.write().unwrap().insert(env_hash, c_str.into()); let op = Box::new(NotmuchOp { database: database.clone(), lib: lib.clone(), @@ -392,10 +396,7 @@ impl MailBackend for NotmuchDb { }); if let Some(mut env) = Envelope::from_token(op, env_hash) { let mut tag_lock = tag_index.write().unwrap(); - for tag in (TagIterator { - tags: unsafe { call!(lib, notmuch_message_get_tags)(message) }, - lib: lib.clone(), - }) { + for tag in TagIterator::new(lib.clone(), message) { let tag = tag.to_string_lossy().into_owned(); let mut hasher = DefaultHasher::new(); @@ -412,9 +413,6 @@ impl MailBackend for NotmuchDb { index.write().unwrap().remove(&env_hash); } } - unsafe { - call!(lib, notmuch_query_destroy)(query); - } tx.send(AsyncStatus::Payload(Ok(ret))).unwrap(); tx.send(AsyncStatus::Finished).unwrap(); }; @@ -425,15 +423,71 @@ impl MailBackend for NotmuchDb { fn watch( &self, - _sender: RefreshEventConsumer, + sender: RefreshEventConsumer, _work_context: WorkContext, ) -> Result { + extern crate notify; + use crate::backends::{RefreshEvent, RefreshEventKind::*}; + use notify::{watcher, RecursiveMode, Watcher}; + let (tx, rx) = std::sync::mpsc::channel(); + let mut watcher = watcher(tx, std::time::Duration::from_secs(2)).unwrap(); + watcher.watch(&self.path, RecursiveMode::Recursive).unwrap(); + let path = self.path.clone(); + let lib = self.lib.clone(); + { + let database = NotmuchDb::new_connection(path.as_path(), lib.clone(), false)?; + let mut revision_uuid_lck = self.revision_uuid.write().unwrap(); + + *revision_uuid_lck = unsafe { + call!(lib, notmuch_database_get_revision)( + *database.inner.read().unwrap(), + std::ptr::null_mut(), + ) + }; + debug!(*revision_uuid_lck); + } + let revision_uuid = self.revision_uuid.clone(); + let handle = std::thread::Builder::new() .name(format!( "watching {}", self.path.file_name().unwrap().to_str().unwrap() )) - .spawn(move || {})?; + .spawn(move || { + let _watcher = watcher; + let c = move || -> std::result::Result<(), MeliError> { + loop { + let _ = rx.recv().map_err(|err| err.to_string())?; + let database = + NotmuchDb::new_connection(path.as_path(), lib.clone(), false)?; + let database_lck = database.inner.read().unwrap(); + let mut revision_uuid_lck = revision_uuid.write().unwrap(); + + let new_revision = unsafe { + call!(lib, notmuch_database_get_revision)( + *database_lck, + std::ptr::null_mut(), + ) + }; + if new_revision > *revision_uuid_lck { + debug!(new_revision); + debug!(*revision_uuid_lck); + let query_str = + format!("lastmod:{}..{}", *revision_uuid_lck, new_revision); + let query: Query = Query::new(lib.clone(), &database_lck, &query_str)?; + let iter = query.search()?; + *revision_uuid_lck = new_revision; + } + } + }; + + if let Err(err) = c() { + sender.send(RefreshEvent { + hash: 0, + kind: Failure(err.into()), + }); + } + })?; Ok(handle.thread().id()) } fn mailboxes(&self) -> Result> { @@ -447,7 +501,9 @@ impl MailBackend for NotmuchDb { } fn operation(&self, hash: EnvelopeHash) -> Box { Box::new(NotmuchOp { - database: Arc::new(self.new_connection(true).unwrap()), + database: Arc::new( + Self::new_connection(self.path.as_path(), self.lib.clone(), true).unwrap(), + ), lib: self.lib.clone(), hash, index: self.index.clone(), @@ -477,7 +533,7 @@ impl MailBackend for NotmuchDb { #[derive(Debug)] struct NotmuchOp { hash: EnvelopeHash, - index: Arc>>, + index: Arc>>, tag_index: Arc>>, database: Arc, bytes: Option, @@ -491,7 +547,7 @@ impl BackendOp for NotmuchOp { fn as_bytes(&mut self) -> Result<&[u8]> { let path = &self.index.read().unwrap()[&self.hash]; - let mut f = std::fs::File::open(path.to_str().unwrap())?; + let mut f = std::fs::File::open(&OsStr::from_bytes(path.to_bytes()))?; let mut response = String::new(); f.read_to_string(&mut response)?; self.bytes = Some(response); @@ -500,22 +556,21 @@ impl BackendOp for NotmuchOp { fn fetch_flags(&self) -> Flag { let mut flag = Flag::default(); - let path = self.index.read().unwrap()[&self.hash].to_str().unwrap(); - if !path.contains(":2,") { - return flag; - } - - for f in path.chars().rev() { + let path = &self.index.read().unwrap()[&self.hash]; + for f in path.to_bytes().iter().rev() { match f { - ',' => break, - 'D' => flag |= Flag::DRAFT, - 'F' => flag |= Flag::FLAGGED, - 'P' => flag |= Flag::PASSED, - 'R' => flag |= Flag::REPLIED, - 'S' => flag |= Flag::SEEN, - 'T' => flag |= Flag::TRASHED, + b',' => break, + b'D' => flag |= Flag::DRAFT, + b'F' => flag |= Flag::FLAGGED, + b'P' => flag |= Flag::PASSED, + b'R' => flag |= Flag::REPLIED, + b'S' => flag |= Flag::SEEN, + b'T' => flag |= Flag::TRASHED, _ => { - debug!("DEBUG: in fetch_flags, path is {}", path); + debug!( + "DEBUG: in fetch_flags, unknown flag '{}' path is {:?}", + f, path + ); } } } @@ -540,11 +595,8 @@ impl BackendOp for NotmuchOp { ))); } - let tags = (TagIterator { - tags: unsafe { call!(self.lib, notmuch_message_get_tags)(message) }, - lib: self.lib.clone(), - }) - .collect::>(); + // TODO: freeze/thaw message. + let tags = TagIterator::new(self.lib.clone(), message).collect::>(); debug!(&tags); macro_rules! cstr { @@ -553,32 +605,34 @@ impl BackendOp for NotmuchOp { }; } macro_rules! add_tag { - ($l:literal) => { - unsafe { - if tags.contains(&cstr!($l)) { - return Ok(()); - } - if call!(self.lib, notmuch_message_add_tag)(message, cstr!($l).as_ptr()) - != _notmuch_status_NOTMUCH_STATUS_SUCCESS - { - return Err(MeliError::new("Could not set tag.")); - } + ($l:literal) => {{ + if tags.contains(unsafe { &cstr!($l) }) { + return Ok(()); } - }; + if let Err(err) = try_call!( + self.lib, + call!(self.lib, notmuch_message_add_tag)(message, cstr!($l).as_ptr()) + ) { + return Err( + MeliError::new("Could not set tag.").set_source(Some(Arc::new(err))) + ); + } + }}; } macro_rules! remove_tag { - ($l:literal) => { - unsafe { - if !tags.contains(&cstr!($l)) { - return Ok(()); - } - if call!(self.lib, notmuch_message_remove_tag)(message, cstr!($l).as_ptr()) - != _notmuch_status_NOTMUCH_STATUS_SUCCESS - { - return Err(MeliError::new("Could not set tag.")); - } + ($l:literal) => {{ + if !tags.contains(unsafe { &cstr!($l) }) { + return Ok(()); } - }; + if let Err(err) = try_call!( + self.lib, + call!(self.lib, notmuch_message_remove_tag)(message, cstr!($l).as_ptr()) + ) { + return Err( + MeliError::new("Could not set tag.").set_source(Some(Arc::new(err))) + ); + } + }}; } match f { @@ -598,23 +652,18 @@ impl BackendOp for NotmuchOp { } /* Update message filesystem path. */ - if unsafe { call!(self.lib, notmuch_message_tags_to_maildir_flags)(message) } - != _notmuch_status_NOTMUCH_STATUS_SUCCESS - { - return Err(MeliError::new("Could not set tag.")); + if let Err(err) = try_call!( + self.lib, + call!(self.lib, notmuch_message_tags_to_maildir_flags)(message) + ) { + return Err(MeliError::new("Could not set tag.").set_source(Some(Arc::new(err)))); } let fs_path = unsafe { call!(self.lib, notmuch_message_get_filename)(message) }; let c_str = unsafe { CStr::from_ptr(fs_path) }; if let Some(p) = index_lck.get_mut(&self.hash) { - *p = c_str; + *p = c_str.into(); } - let new_hash = { - let mut hasher = DefaultHasher::default(); - c_str.hash(&mut hasher); - hasher.finish() - }; - index_lck.insert(new_hash, c_str); Ok(()) } @@ -636,25 +685,25 @@ impl BackendOp for NotmuchOp { ))); } if value { - if unsafe { + if let Err(err) = try_call!( + self.lib, call!(self.lib, notmuch_message_add_tag)( message, CString::new(tag.as_str()).unwrap().as_ptr(), ) - } != _notmuch_status_NOTMUCH_STATUS_SUCCESS - { - return Err(MeliError::new("Could not set tag.")); + ) { + return Err(MeliError::new("Could not set tag.").set_source(Some(Arc::new(err)))); } debug!("added tag {}", &tag); } else { - if unsafe { + if let Err(err) = try_call!( + self.lib, call!(self.lib, notmuch_message_remove_tag)( message, CString::new(tag.as_str()).unwrap().as_ptr(), ) - } != _notmuch_status_NOTMUCH_STATUS_SUCCESS - { - return Err(MeliError::new("Could not set tag.")); + ) { + return Err(MeliError::new("Could not set tag.").set_source(Some(Arc::new(err)))); } debug!("removed tag {}", &tag); } @@ -706,6 +755,15 @@ pub struct TagIterator { tags: *mut notmuch_tags_t, } +impl TagIterator { + fn new(lib: Arc, message: *mut notmuch_message_t) -> Self { + TagIterator { + tags: unsafe { call!(lib, notmuch_message_get_tags)(message) }, + lib, + } + } +} + impl Iterator for TagIterator { type Item = &'static CStr; fn next(&mut self) -> Option { @@ -723,3 +781,55 @@ impl Iterator for TagIterator { } } } + +pub struct Query<'s> { + lib: Arc, + ptr: *mut notmuch_query_t, + query_str: &'s str, +} + +impl<'s> Query<'s> { + fn new( + lib: Arc, + database: &*mut notmuch_database_t, + query_str: &'s str, + ) -> Result { + let query_cstr = std::ffi::CString::new(query_str)?; + let query: *mut notmuch_query_t = + unsafe { call!(lib, notmuch_query_create)(*database, query_cstr.as_ptr()) }; + if query.is_null() { + return Err(MeliError::new("Could not create query. Out of memory?")); + } + Ok(Query { + lib, + ptr: query, + query_str, + }) + } + + fn search(&self) -> Result { + let mut messages: *mut notmuch_messages_t = std::ptr::null_mut(); + let status = unsafe { + call!(self.lib, notmuch_query_search_messages)(self.ptr, &mut messages as *mut _) + }; + if status != 0 { + return Err(MeliError::new(format!( + "Search for {} returned {}", + self.query_str, status, + ))); + } + assert!(!messages.is_null()); + Ok(MessageIterator { + messages, + lib: self.lib.clone(), + }) + } +} + +impl Drop for Query<'_> { + fn drop(&mut self) { + unsafe { + call!(self.lib, notmuch_query_destroy)(self.ptr); + } + } +}