From b2857955e49d02cbd4dfcbb3366c360a776a33ad Mon Sep 17 00:00:00 2001 From: Manos Pitsidianakis Date: Sun, 10 May 2020 22:28:21 +0300 Subject: [PATCH] notmuch: add NewFlags, Remove and Create events --- melib/src/backends/notmuch.rs | 405 +++++++++++++++++++++++++--------- 1 file changed, 306 insertions(+), 99 deletions(-) diff --git a/melib/src/backends/notmuch.rs b/melib/src/backends/notmuch.rs index 549eb3556..9023c23e2 100644 --- a/melib/src/backends/notmuch.rs +++ b/melib/src/backends/notmuch.rs @@ -107,6 +107,7 @@ pub struct NotmuchDb { revision_uuid: Arc>, mailboxes: Arc>>, index: Arc>>, + mailbox_index: Arc>>>, tag_index: Arc>>, path: PathBuf, account_name: String, @@ -230,11 +231,13 @@ impl NotmuchDb { ))); } } + Ok(Box::new(NotmuchDb { lib, revision_uuid: Arc::new(RwLock::new(0)), path, index: Arc::new(RwLock::new(Default::default())), + mailbox_index: Arc::new(RwLock::new(Default::default())), tag_index: Arc::new(RwLock::new(Default::default())), mailboxes: Arc::new(RwLock::new(mailboxes)), @@ -327,6 +330,7 @@ impl MailBackend for NotmuchDb { let mailbox_hash = mailbox.hash(); let database = NotmuchDb::new_connection(self.path.as_path(), self.lib.clone(), false); let index = self.index.clone(); + let mailbox_index = self.mailbox_index.clone(); let tag_index = self.tag_index.clone(); let mailboxes = self.mailboxes.clone(); let lib = self.lib.clone(); @@ -360,60 +364,25 @@ impl MailBackend for NotmuchDb { return; } }; + let mut mailbox_index_lck = mailbox_index.write().unwrap(); for message in iter { - let mut response = String::new(); - let fs_path = unsafe { call!(lib, notmuch_message_get_filename)(message) }; - let mut f = match std::fs::File::open(unsafe { - CStr::from_ptr(fs_path) - .to_string_lossy() - .into_owned() - .as_str() - }) { - Ok(f) => f, - Err(e) => { - debug!("could not open fs_path {:?} {}", fs_path, e); - continue; + match notmuch_message_into_envelope( + lib.clone(), + index.clone(), + tag_index.clone(), + database.clone(), + message, + ) { + Ok(env) => { + mailbox_index_lck + .entry(env.hash()) + .or_default() + .push(mailbox_hash); + ret.push(env); } - }; - response.clear(); - if let Err(e) = f.read_to_string(&mut response) { - debug!("could not read fs_path {:?} {}", fs_path, e); - continue; - } - let env_hash = { - let msg_id = unsafe { call!(lib, notmuch_message_get_message_id)(message) }; - let c_str = unsafe { CStr::from_ptr(msg_id) }; - let mut hasher = DefaultHasher::default(); - c_str.hash(&mut hasher); - hasher.finish() - }; - let c_str = unsafe { CStr::from_ptr(fs_path) }; - index.write().unwrap().insert(env_hash, c_str.into()); - let op = Box::new(NotmuchOp { - database: database.clone(), - lib: lib.clone(), - hash: env_hash, - index: index.clone(), - bytes: Some(response), - tag_index: tag_index.clone(), - }); - if let Some(mut env) = Envelope::from_token(op, env_hash) { - let mut tag_lock = tag_index.write().unwrap(); - for tag in TagIterator::new(lib.clone(), message) { - let tag = tag.to_string_lossy().into_owned(); - - let mut hasher = DefaultHasher::new(); - hasher.write(tag.as_bytes()); - let num = hasher.finish(); - if !tag_lock.contains_key(&num) { - tag_lock.insert(num, tag); - } - env.labels_mut().push(num); + Err(err) => { + debug!("could not parse message {:?}", err); } - ret.push(env); - } else { - debug!("could not parse path {:?}", c_str); - index.write().unwrap().remove(&env_hash); } } tx.send(AsyncStatus::Payload(Ok(ret))).unwrap(); @@ -437,11 +406,15 @@ impl MailBackend for NotmuchDb { watcher.watch(&self.path, RecursiveMode::Recursive).unwrap(); let path = self.path.clone(); let lib = self.lib.clone(); + let tag_index = self.tag_index.clone(); + let index = self.index.clone(); let account_hash = { let mut hasher = DefaultHasher::new(); hasher.write(self.account_name.as_bytes()); hasher.finish() }; + let mailbox_index = self.mailbox_index.clone(); + let mailboxes = self.mailboxes.clone(); { let database = NotmuchDb::new_connection(path.as_path(), lib.clone(), false)?; let mut revision_uuid_lck = self.revision_uuid.write().unwrap(); @@ -452,7 +425,6 @@ impl MailBackend for NotmuchDb { std::ptr::null_mut(), ) }; - debug!(*revision_uuid_lck); } let revision_uuid = self.revision_uuid.clone(); @@ -460,33 +432,137 @@ impl MailBackend for NotmuchDb { .name(format!("watching {}", self.account_name)) .spawn(move || { let _watcher = watcher; - let c = move || -> std::result::Result<(), MeliError> { + let c = move |sender: &RefreshEventConsumer| -> std::result::Result<(), MeliError> { loop { let _ = rx.recv().map_err(|err| err.to_string())?; - let database = - NotmuchDb::new_connection(path.as_path(), lib.clone(), false)?; - let database_lck = database.inner.read().unwrap(); - let mut revision_uuid_lck = revision_uuid.write().unwrap(); + { + let database = + NotmuchDb::new_connection(path.as_path(), lib.clone(), false)?; + let database_lck = database.inner.read().unwrap(); + let mut revision_uuid_lck = revision_uuid.write().unwrap(); - let new_revision = unsafe { - call!(lib, notmuch_database_get_revision)( - *database_lck, - std::ptr::null_mut(), - ) - }; - if new_revision > *revision_uuid_lck { - debug!(new_revision); - debug!(*revision_uuid_lck); - let query_str = - format!("lastmod:{}..{}", *revision_uuid_lck, new_revision); - let query: Query = Query::new(lib.clone(), &database_lck, &query_str)?; - let iter = query.search()?; - *revision_uuid_lck = new_revision; + let new_revision = unsafe { + call!(lib, notmuch_database_get_revision)( + *database_lck, + std::ptr::null_mut(), + ) + }; + if new_revision > *revision_uuid_lck { + let query_str = + format!("lastmod:{}..{}", *revision_uuid_lck, new_revision); + let query: Query = + Query::new(lib.clone(), &database_lck, &query_str)?; + drop(database_lck); + let iter = query.search()?; + let mut tag_lock = tag_index.write().unwrap(); + let mailbox_index_lck = mailbox_index.write().unwrap(); + let mailboxes_lck = mailboxes.read().unwrap(); + let database = Arc::new(database); + for message in iter { + let msg_id = unsafe { + call!(lib, notmuch_message_get_message_id)(message) + }; + let c_str = unsafe { CStr::from_ptr(msg_id) }; + let env_hash = { + let mut hasher = DefaultHasher::default(); + c_str.hash(&mut hasher); + hasher.finish() + }; + if let Some(mailbox_hashes) = mailbox_index_lck.get(&env_hash) { + let tags: (Flag, Vec) = + TagIterator::new(lib.clone(), message) + .collect_flags_and_tags(); + for tag in tags.1.iter() { + let mut hasher = DefaultHasher::new(); + hasher.write(tag.as_bytes()); + let num = hasher.finish(); + if !tag_lock.contains_key(&num) { + tag_lock.insert(num, tag.clone()); + } + } + for &mailbox_hash in mailbox_hashes { + sender.send(RefreshEvent { + account_hash, + mailbox_hash, + kind: NewFlags(env_hash, tags.clone()), + }); + } + } else { + match notmuch_message_into_envelope( + lib.clone(), + index.clone(), + tag_index.clone(), + database.clone(), + message, + ) { + Ok(env) => { + for (&mailbox_hash, m) in mailboxes_lck.iter() { + let query_str = format!( + "{} id:{}", + m.query_str.as_str(), + c_str.to_string_lossy() + ); + let database_lck = + database.inner.read().unwrap(); + let query: Query = Query::new( + lib.clone(), + &database_lck, + &query_str, + )?; + if query.count().unwrap_or(0) > 0 { + sender.send(RefreshEvent { + account_hash, + mailbox_hash, + kind: Create(Box::new(env.clone())), + }); + } + } + } + Err(err) => { + debug!("could not parse message {:?}", err); + } + } + } + } + drop(query); + let database_lck = database.inner.read().unwrap(); + index.write().unwrap().retain(|&env_hash, msg_id| { + let mut message: *mut notmuch_message_t = std::ptr::null_mut(); + if let Err(err) = try_call!( + lib, + call!(lib, notmuch_database_find_message)( + *database_lck, + msg_id.as_ptr(), + &mut message as *mut _, + ) + ) { + debug!(err); + false + } else { + if message.is_null() { + if let Some(mailbox_hashes) = + mailbox_index_lck.get(&env_hash) + { + for &mailbox_hash in mailbox_hashes { + sender.send(RefreshEvent { + account_hash, + mailbox_hash, + kind: Remove(env_hash), + }); + } + } + } + !message.is_null() + } + }); + + *revision_uuid_lck = new_revision; + } } } }; - if let Err(err) = c() { + if let Err(err) = c(&sender) { sender.send(RefreshEvent { account_hash, mailbox_hash: 0, @@ -552,8 +628,18 @@ impl BackendOp for NotmuchOp { } fn as_bytes(&mut self) -> Result<&[u8]> { - let path = &self.index.read().unwrap()[&self.hash]; - let mut f = std::fs::File::open(&OsStr::from_bytes(path.to_bytes()))?; + let mut message: *mut notmuch_message_t = std::ptr::null_mut(); + let index_lck = self.index.write().unwrap(); + unsafe { + call!(self.lib, notmuch_database_find_message)( + *self.database.inner.read().unwrap(), + index_lck[&self.hash].as_ptr(), + &mut message as *mut _, + ) + }; + let fs_path = unsafe { call!(self.lib, notmuch_message_get_filename)(message) }; + let c_str = unsafe { CStr::from_ptr(fs_path) }; + let mut f = std::fs::File::open(&OsStr::from_bytes(c_str.to_bytes()))?; let mut response = String::new(); f.read_to_string(&mut response)?; self.bytes = Some(response); @@ -561,34 +647,24 @@ impl BackendOp for NotmuchOp { } fn fetch_flags(&self) -> Flag { - let mut flag = Flag::default(); - let path = &self.index.read().unwrap()[&self.hash]; - for f in path.to_bytes().iter().rev() { - match f { - b',' => break, - b'D' => flag |= Flag::DRAFT, - b'F' => flag |= Flag::FLAGGED, - b'P' => flag |= Flag::PASSED, - b'R' => flag |= Flag::REPLIED, - b'S' => flag |= Flag::SEEN, - b'T' => flag |= Flag::TRASHED, - _ => { - debug!( - "DEBUG: in fetch_flags, unknown flag '{}' path is {:?}", - f, path - ); - } - } - } - - flag + let mut message: *mut notmuch_message_t = std::ptr::null_mut(); + let index_lck = self.index.write().unwrap(); + unsafe { + call!(self.lib, notmuch_database_find_message)( + *self.database.inner.read().unwrap(), + index_lck[&self.hash].as_ptr(), + &mut message as *mut _, + ) + }; + let (flags, _tags) = TagIterator::new(self.lib.clone(), message).collect_flags_and_tags(); + flags } fn set_flag(&mut self, _envelope: &mut Envelope, f: Flag, value: bool) -> Result<()> { let mut message: *mut notmuch_message_t = std::ptr::null_mut(); let mut index_lck = self.index.write().unwrap(); unsafe { - call!(self.lib, notmuch_database_find_message_by_filename)( + call!(self.lib, notmuch_database_find_message)( *self.database.inner.read().unwrap(), index_lck[&self.hash].as_ptr(), &mut message as *mut _, @@ -665,8 +741,8 @@ impl BackendOp for NotmuchOp { return Err(MeliError::new("Could not set tag.").set_source(Some(Arc::new(err)))); } - let fs_path = unsafe { call!(self.lib, notmuch_message_get_filename)(message) }; - let c_str = unsafe { CStr::from_ptr(fs_path) }; + let msg_id = unsafe { call!(self.lib, notmuch_message_get_message_id)(message) }; + let c_str = unsafe { CStr::from_ptr(msg_id) }; if let Some(p) = index_lck.get_mut(&self.hash) { *p = c_str.into(); } @@ -678,7 +754,7 @@ impl BackendOp for NotmuchOp { let mut message: *mut notmuch_message_t = std::ptr::null_mut(); let index_lck = self.index.read().unwrap(); unsafe { - call!(self.lib, notmuch_database_find_message_by_filename)( + call!(self.lib, notmuch_database_find_message)( *self.database.inner.read().unwrap(), index_lck[&self.hash].as_ptr(), &mut message as *mut _, @@ -759,6 +835,7 @@ impl Iterator for MessageIterator { pub struct TagIterator { lib: Arc, tags: *mut notmuch_tags_t, + message: *mut notmuch_message_t, } impl TagIterator { @@ -766,8 +843,75 @@ impl TagIterator { TagIterator { tags: unsafe { call!(lib, notmuch_message_get_tags)(message) }, lib, + message, } } + + fn collect_flags_and_tags(self) -> (Flag, Vec) { + fn flags(path: &CStr) -> Flag { + let mut flag = Flag::default(); + let mut ptr = path.to_bytes().len().saturating_sub(1); + let mut is_valid = true; + while !path.to_bytes()[..ptr + 1].ends_with(b":2,") { + match path.to_bytes()[ptr] { + b'D' => flag |= Flag::DRAFT, + b'F' => flag |= Flag::FLAGGED, + b'P' => flag |= Flag::PASSED, + b'R' => flag |= Flag::REPLIED, + b'S' => flag |= Flag::SEEN, + b'T' => flag |= Flag::TRASHED, + _ => { + is_valid = false; + break; + } + } + if ptr == 0 { + is_valid = false; + break; + } + ptr -= 1; + } + + if !is_valid { + return Flag::default(); + } + + flag + } + let fs_path = unsafe { call!(self.lib, notmuch_message_get_filename)(self.message) }; + let c_str = unsafe { CStr::from_ptr(fs_path) }; + + let tags = self.collect::>(); + let mut flag = Flag::default(); + let mut vec = vec![]; + for t in tags { + match t.to_bytes() { + b"draft" => { + flag.set(Flag::DRAFT, true); + } + b"flagged" => { + flag.set(Flag::FLAGGED, true); + } + b"passed" => { + flag.set(Flag::PASSED, true); + } + b"replied" => { + flag.set(Flag::REPLIED, true); + } + b"unread" => { + flag.set(Flag::SEEN, false); + } + b"trashed" => { + flag.set(Flag::TRASHED, true); + } + _other => { + vec.push(t.to_string_lossy().into_owned()); + } + } + } + + (flag | flags(c_str), vec) + } } impl Iterator for TagIterator { @@ -813,6 +957,16 @@ impl<'s> Query<'s> { }) } + fn count(&self) -> Result { + let mut count = 0_u32; + try_call!( + self.lib, + call!(self.lib, notmuch_query_count_messages)(self.ptr, &mut count as *mut _) + ) + .map_err(|err| err.0)?; + Ok(count) + } + fn search(&self) -> Result { let mut messages: *mut notmuch_messages_t = std::ptr::null_mut(); let status = unsafe { @@ -839,3 +993,56 @@ impl Drop for Query<'_> { } } } + +fn notmuch_message_into_envelope( + lib: Arc, + index: Arc>>, + tag_index: Arc>>, + database: Arc, + message: *mut notmuch_message_t, +) -> Result { + let mut response = String::new(); + let fs_path = unsafe { call!(lib, notmuch_message_get_filename)(message) }; + let c_str = unsafe { CStr::from_ptr(fs_path) }; + let mut f = std::fs::File::open(&OsStr::from_bytes(c_str.to_bytes()))?; + f.read_to_string(&mut response)?; + let msg_id = unsafe { call!(lib, notmuch_message_get_message_id)(message) }; + let env_hash = { + let c_str = unsafe { CStr::from_ptr(msg_id) }; + let mut hasher = DefaultHasher::default(); + c_str.hash(&mut hasher); + hasher.finish() + }; + { + let c_str = unsafe { CStr::from_ptr(msg_id) }; + index.write().unwrap().insert(env_hash, c_str.into()); + } + let op = Box::new(NotmuchOp { + database, + lib: lib.clone(), + hash: env_hash, + index: index.clone(), + bytes: Some(response), + tag_index: tag_index.clone(), + }); + Envelope::from_token(op, env_hash) + .map(|mut env| { + let mut tag_lock = tag_index.write().unwrap(); + let (flags, tags) = TagIterator::new(lib.clone(), message).collect_flags_and_tags(); + for tag in tags { + let mut hasher = DefaultHasher::new(); + hasher.write(tag.as_bytes()); + let num = hasher.finish(); + if !tag_lock.contains_key(&num) { + tag_lock.insert(num, tag); + } + env.labels_mut().push(num); + } + env.set_flags(flags); + env + }) + .ok_or_else(|| { + index.write().unwrap().remove(&env_hash); + format!("could not parse path {:?}", c_str).into() + }) +}