Browse Source

lazy_fetch WIP

lazy_fetch
Manos Pitsidianakis 11 months ago
parent
commit
77e4488637
Signed by untrusted user: epilys GPG Key ID: 73627C2F690DF710
  1. 10
      melib/src/backends.rs
  2. 141
      melib/src/backends/jmap.rs
  3. 76
      melib/src/backends/notmuch.rs
  4. 17
      melib/src/backends/notmuch/message.rs
  5. 2
      src/bin.rs
  6. 1
      src/components/mail.rs
  7. 4074
      src/components/mail/listing2.rs
  8. 159
      src/conf/accounts.rs
  9. 2
      src/state.rs
  10. 7
      src/types.rs

10
melib/src/backends.rs

@ -310,6 +310,14 @@ pub trait MailBackend: ::std::fmt::Debug + Send + Sync {
Ok(Box::pin(async { Ok(()) }))
}
fn fetch_batch(&mut self, env_hashes: EnvelopeHashBatch) -> ResultFuture<()> {
Err(MeliError::new("Unimplemented."))
}
fn load(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()> {
Err(MeliError::new("Unimplemented."))
}
fn fetch(
&mut self,
mailbox_hash: MailboxHash,
@ -628,7 +636,7 @@ impl EnvelopeHashBatch {
#[derive(Default, Clone)]
pub struct LazyCountSet {
not_yet_seen: usize,
set: BTreeSet<EnvelopeHash>,
pub set: BTreeSet<EnvelopeHash>,
}
impl fmt::Debug for LazyCountSet {

141
melib/src/backends/jmap.rs

@ -315,6 +315,147 @@ impl MailBackend for JmapType {
}))
}
fn load(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()> {
let store = self.store.clone();
let connection = self.connection.clone();
Ok(Box::pin(async move {
{
crate::connections::sleep(std::time::Duration::from_secs(2)).await;
}
let mailbox_id = store.mailboxes.read().unwrap()[&mailbox_hash].id.clone();
let email_query_call: EmailQuery = EmailQuery::new(
Query::new()
.account_id(conn.mail_account_id().clone())
.filter(Some(Filter::Condition(
EmailFilterCondition::new()
.in_mailbox(Some(mailbox_id))
.into(),
)))
.position(0)
.properties(Some(vec![
"id".to_string(),
"receivedAt".to_string(),
"messageId".to_string(),
"inReplyTo".to_string(),
"hasAttachment".to_string(),
"keywords".to_string(),
])),
)
.collapse_threads(false);
let mut req = Request::new(conn.request_no.clone());
let prev_seq = req.add_call(&email_query_call);
let email_call: EmailGet = EmailGet::new(
Get::new()
.ids(Some(JmapArgument::reference(
prev_seq,
EmailQuery::RESULT_FIELD_IDS,
)))
.account_id(conn.mail_account_id().clone()),
);
req.add_call(&email_call);
let api_url = conn.session.lock().unwrap().api_url.clone();
let mut res = conn
.client
.post_async(api_url.as_str(), serde_json::to_string(&req)?)
.await?;
let res_text = res.text_async().await?;
let mut v: MethodResponse = serde_json::from_str(&res_text).unwrap();
let e = GetResponse::<EmailObject>::try_from(v.method_responses.pop().unwrap())?;
let query_response =
QueryResponse::<EmailObject>::try_from(v.method_responses.pop().unwrap())?;
store
.mailboxes
.write()
.unwrap()
.entry(mailbox_hash)
.and_modify(|mbox| {
*mbox.email_query_state.lock().unwrap() = Some(query_response.query_state);
});
let GetResponse::<EmailObject> { list, state, .. } = e;
{
let (is_empty, is_equal) = {
let mailboxes_lck = conn.store.mailboxes.read().unwrap();
mailboxes_lck
.get(&mailbox_hash)
.map(|mbox| {
let current_state_lck = mbox.email_state.lock().unwrap();
(
current_state_lck.is_none(),
current_state_lck.as_ref() != Some(&state),
)
})
.unwrap_or((true, true))
};
if is_empty {
let mut mailboxes_lck = conn.store.mailboxes.write().unwrap();
debug!("{:?}: inserting state {}", EmailObject::NAME, &state);
mailboxes_lck.entry(mailbox_hash).and_modify(|mbox| {
*mbox.email_state.lock().unwrap() = Some(state);
});
} else if !is_equal {
conn.email_changes(mailbox_hash).await?;
}
}
let mut total = BTreeSet::default();
let mut unread = BTreeSet::default();
let new_envelopes: HashMap<EnvelopeHash, Envelope> = list
.into_iter(|obj| {
let env = store.add_envelope(obj);
total.insert(env.hash());
if !env.is_seen() {
unread.insert(env.hash());
}
(env.hash(), env)
})
.collect();
let mut mailboxes_lck = store.mailboxes.write().unwrap();
mailboxes_lck.entry(mailbox_hash).and_modify(|mbox| {
mbox.total_emails.lock().unwrap().insert_existing_set(total);
mbox.unread_emails
.lock()
.unwrap()
.insert_existing_set(unread);
});
let keys: BTreeSet<EnvelopeHash> = new_envelopes.keys().cloned().collect();
collection.merge(new_envelopes, mailbox_hash, None);
let mut envelopes_lck = collection.envelopes.write().unwrap();
envelopes_lck.retain(|k, _| !keys.contains(k));
Ok(())
}))
}
fn fetch_batch(&mut self, env_hashes: EnvelopeHashBatch) -> ResultFuture<()> {
todo!()
/*
let store = self.store.clone();
let connection = self.connection.clone();
Ok(Box::pin(async move {
//crate::connections::sleep(std::time::Duration::from_secs(2)).await;
debug!("fetch_batch {:?}", &env_hashes);
let mut envelopes_lck = collection.envelopes.write().unwrap();
for env_hash in env_hashes.iter() {
if envelopes_lck.contains_key(&env_hash) {
continue;
}
let index_lck = index.write().unwrap();
let message = Message::find_message(&database, &index_lck[&env_hash])?;
drop(index_lck);
let env = message.into_envelope(&index, &collection.tag_index);
envelopes_lck.insert(env_hash, env);
}
debug!("fetch_batch {:?} done", &env_hashes);
Ok(())
}))
*/
}
fn fetch(
&mut self,
mailbox_hash: MailboxHash,

76
melib/src/backends/notmuch.rs

@ -449,6 +449,82 @@ impl MailBackend for NotmuchDb {
Ok(Box::pin(async { Ok(()) }))
}
fn load(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()> {
let database = NotmuchDb::new_connection(
self.path.as_path(),
self.revision_uuid.clone(),
self.lib.clone(),
false,
)?;
let mailboxes = self.mailboxes.clone();
let index = self.index.clone();
let mailbox_index = self.mailbox_index.clone();
let collection = self.collection.clone();
Ok(Box::pin(async move {
{
//crate::connections::sleep(std::time::Duration::from_secs(2)).await;
}
let mailboxes_lck = mailboxes.read().unwrap();
let mailbox = mailboxes_lck.get(&mailbox_hash).unwrap();
let query: Query = Query::new(&database, mailbox.query_str.as_str())?;
let mut unseen_total = 0;
let mut mailbox_index_lck = mailbox_index.write().unwrap();
let new_envelopes: HashMap<EnvelopeHash, Envelope> = query
.search()?
.into_iter()
.map(|m| {
let env = m.into_envelope(&index, &collection.tag_index);
mailbox_index_lck
.entry(env.hash())
.or_default()
.push(mailbox_hash);
if !env.is_seen() {
unseen_total += 1;
}
(env.hash(), env)
})
.collect();
{
let mut total_lck = mailbox.total.lock().unwrap();
let mut unseen_lck = mailbox.unseen.lock().unwrap();
*total_lck = new_envelopes.len();
*unseen_lck = unseen_total;
}
collection.merge(new_envelopes, mailbox_hash, None);
let mut envelopes_lck = collection.envelopes.write().unwrap();
envelopes_lck.retain(|&k, _| k % 2 == 0);
Ok(())
}))
}
fn fetch_batch(&mut self, env_hashes: EnvelopeHashBatch) -> ResultFuture<()> {
let database = NotmuchDb::new_connection(
self.path.as_path(),
self.revision_uuid.clone(),
self.lib.clone(),
false,
)?;
let index = self.index.clone();
let collection = self.collection.clone();
Ok(Box::pin(async move {
//crate::connections::sleep(std::time::Duration::from_secs(2)).await;
debug!("fetch_batch {:?}", &env_hashes);
let mut envelopes_lck = collection.envelopes.write().unwrap();
for env_hash in env_hashes.iter() {
if envelopes_lck.contains_key(&env_hash) {
continue;
}
let index_lck = index.write().unwrap();
let message = Message::find_message(&database, &index_lck[&env_hash])?;
drop(index_lck);
let env = message.into_envelope(&index, &collection.tag_index);
envelopes_lck.insert(env_hash, env);
}
debug!("fetch_batch {:?} done", &env_hashes);
Ok(())
}))
}
fn fetch(
&mut self,
mailbox_hash: MailboxHash,

17
melib/src/backends/notmuch/message.rs

@ -20,7 +20,6 @@
*/
use super::*;
use crate::thread::{ThreadHash, ThreadNode, ThreadNodeHash};
#[derive(Clone)]
pub struct Message<'m> {
@ -188,22 +187,6 @@ impl<'m> Message<'m> {
}
}
pub fn into_thread_node(&self) -> (ThreadNodeHash, ThreadNode) {
(
ThreadNodeHash::from(self.msg_id()),
ThreadNode {
message: Some(self.env_hash()),
parent: None,
other_mailbox: false,
children: vec![],
date: self.date(),
show_subject: true,
group: ThreadHash::new(),
unseen: false,
},
)
}
pub fn add_tag(&self, tag: &CStr) -> Result<()> {
if let Err(err) = unsafe {
try_call!(

2
src/bin.rs

@ -348,7 +348,7 @@ fn run_app(opt: Opt) -> Result<()> {
state.register_component(Box::new(components::svg::SVGScreenshotFilter::new()));
let window = Box::new(Tabbed::new(
vec![
Box::new(listing::Listing::new(&mut state.context)),
Box::new(listing2::Listing::new(&mut state.context)),
Box::new(ContactList::new(&state.context)),
],
&state.context,

1
src/components/mail.rs

@ -27,6 +27,7 @@ use melib::email::{attachment_types::*, attachments::*};
use melib::thread::ThreadNodeHash;
pub mod listing;
pub mod listing2;
pub use crate::listing::*;
pub mod view;
pub use crate::view::*;

4074
src/components/mail/listing2.rs
File diff suppressed because it is too large
View File

159
src/conf/accounts.rs

@ -163,6 +163,13 @@ pub enum JobRequest {
Mailboxes {
handle: JoinHandle<Result<HashMap<MailboxHash, Mailbox>>>,
},
Load {
mailbox_hash: MailboxHash,
handle: JoinHandle<Result<()>>,
},
FetchBatch {
handle: JoinHandle<Result<()>>,
},
Fetch {
mailbox_hash: MailboxHash,
handle: JoinHandle<(
@ -237,6 +244,8 @@ impl Drop for JobRequest {
match self {
JobRequest::Generic { handle, .. } |
JobRequest::IsOnline { handle, .. } |
JobRequest::Load { handle, .. } |
JobRequest::FetchBatch { handle, .. } |
JobRequest::Refresh { handle, .. } |
JobRequest::SetFlags { handle, .. } |
JobRequest::SaveMessage { handle, .. } |
@ -275,6 +284,7 @@ impl core::fmt::Debug for JobRequest {
match self {
JobRequest::Generic { name, .. } => write!(f, "JobRequest::Generic({})", name),
JobRequest::Mailboxes { .. } => write!(f, "JobRequest::Mailboxes"),
JobRequest::FetchBatch { .. } => write!(f, "JobRequest::FetchBatch",),
JobRequest::Fetch { mailbox_hash, .. } => {
write!(f, "JobRequest::Fetch({})", mailbox_hash)
}
@ -302,6 +312,9 @@ impl core::fmt::Debug for JobRequest {
JobRequest::SendMessageBackground { .. } => {
write!(f, "JobRequest::SendMessageBackground")
}
JobRequest::Load { mailbox_hash, .. } => {
write!(f, "JobRequest::Load({})", mailbox_hash)
}
}
}
}
@ -312,6 +325,8 @@ impl core::fmt::Display for JobRequest {
JobRequest::Generic { name, .. } => write!(f, "{}", name),
JobRequest::Mailboxes { .. } => write!(f, "Get mailbox list"),
JobRequest::Fetch { .. } => write!(f, "Mailbox fetch"),
JobRequest::FetchBatch { .. } => write!(f, "Fetch envelopes"),
JobRequest::Load { .. } => write!(f, "Mailbox load"),
JobRequest::IsOnline { .. } => write!(f, "Online status check"),
JobRequest::Refresh { .. } => write!(f, "Refresh mailbox"),
JobRequest::SetFlags { env_hashes, .. } => write!(
@ -351,6 +366,15 @@ impl JobRequest {
}
}
pub fn is_load(&self, mailbox_hash: MailboxHash) -> bool {
match self {
JobRequest::Load {
mailbox_hash: h, ..
} if *h == mailbox_hash => true,
_ => false,
}
}
pub fn is_fetch(&self, mailbox_hash: MailboxHash) -> bool {
match self {
JobRequest::Fetch {
@ -658,8 +682,7 @@ impl Account {
{
let total = entry.ref_mailbox.count().ok().unwrap_or((0, 0)).1;
entry.status = MailboxStatus::Parsing(0, total);
if let Ok(mailbox_job) = self.backend.write().unwrap().fetch(*h) {
let mailbox_job = mailbox_job.into_future();
if let Ok(mailbox_job) = self.backend.write().unwrap().load(*h) {
let handle = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(mailbox_job)
} else {
@ -671,9 +694,10 @@ impl Account {
StatusEvent::NewJob(job_id),
)))
.unwrap();
debug!("JobRequest::Load {} {:?}", *h, job_id);
self.active_jobs.insert(
job_id,
JobRequest::Fetch {
JobRequest::Load {
mailbox_hash: *h,
handle,
},
@ -1135,6 +1159,73 @@ impl Account {
self.hash
}
pub fn fetch_batch(&mut self, env_hashes: EnvelopeHashBatch) -> Result<JobId> {
debug!("account fetch_batch {:?}", &env_hashes);
let job = self.backend.write().unwrap().fetch_batch(env_hashes)?;
let handle = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(job)
} else {
self.job_executor.spawn_blocking(job)
};
let job_id = handle.job_id;
self.insert_job(handle.job_id, JobRequest::FetchBatch { handle });
Ok(job_id)
}
pub fn load2(&mut self, mailbox_hash: MailboxHash) -> Result<Option<JobId>> {
debug!("account load2({}", mailbox_hash);
match self.mailbox_entries[&mailbox_hash].status {
MailboxStatus::Available => Ok(None),
MailboxStatus::Failed(ref err) => Err(err.clone()),
MailboxStatus::Parsing(_, _) | MailboxStatus::None => {
debug!("load2 find: ");
if let Some(job_id) = self
.active_jobs
.iter()
.find(|(id, j)| {
debug!(id);
debug!(j).is_load(mailbox_hash)
})
.map(|(j, _)| *j)
{
Ok(Some(job_id))
} else {
let mailbox_job = self.backend.write().unwrap().load(mailbox_hash);
match mailbox_job {
Ok(mailbox_job) => {
let handle = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(mailbox_job)
} else {
self.job_executor.spawn_blocking(mailbox_job)
};
let job_id = handle.job_id;
debug!("JobRequest::Load {} {:?}", mailbox_hash, handle.job_id);
self.insert_job(
handle.job_id,
JobRequest::Load {
mailbox_hash,
handle,
},
);
Ok(Some(job_id))
}
Err(err) => {
self.mailbox_entries
.entry(mailbox_hash)
.and_modify(|entry| {
entry.status = MailboxStatus::Failed(err.clone());
});
self.sender
.send(ThreadEvent::UIEvent(UIEvent::StartupCheck(mailbox_hash)))
.unwrap();
Err(err)
}
}
}
}
}
}
pub fn load(&mut self, mailbox_hash: MailboxHash) -> result::Result<(), usize> {
if mailbox_hash == 0 {
return Err(0);
@ -1637,6 +1728,57 @@ impl Account {
}
}
}
JobRequest::Load {
mailbox_hash,
ref mut handle,
..
} => {
debug!("got mailbox load for {}", mailbox_hash);
match handle.chan.try_recv() {
Err(_) => {
/* canceled */
return true;
}
Ok(None) => {
return true;
}
Ok(Some(Ok(()))) => {
self.mailbox_entries
.entry(mailbox_hash)
.and_modify(|entry| {
entry.status = MailboxStatus::Available;
});
self.sender
.send(ThreadEvent::UIEvent(UIEvent::MailboxUpdate((
self.hash,
mailbox_hash,
))))
.unwrap();
return true;
}
Ok(Some(Err(err))) => {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!("{}: could not load mailbox", &self.name)),
err.to_string(),
Some(crate::types::NotificationType::Error(err.kind)),
)))
.expect("Could not send event on main channel");
self.mailbox_entries
.entry(mailbox_hash)
.and_modify(|entry| {
entry.status = MailboxStatus::Failed(err);
});
self.sender
.send(ThreadEvent::UIEvent(UIEvent::MailboxUpdate((
self.hash,
mailbox_hash,
))))
.unwrap();
return true;
}
}
}
JobRequest::Fetch {
mailbox_hash,
ref mut handle,
@ -2117,6 +2259,17 @@ impl Account {
}
}
}
JobRequest::FetchBatch { ref mut handle } => {
if let Ok(Some(Err(err))) = handle.chan.try_recv() {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!("{}: envelope fetch failed", &self.name)),
err.to_string(),
Some(crate::types::NotificationType::Error(err.kind)),
)))
.expect("Could not send event on main channel");
}
}
JobRequest::Watch { ref mut handle } => {
debug!("JobRequest::Watch finished??? ");
if let Ok(Some(Err(err))) = handle.chan.try_recv() {

2
src/state.rs

@ -393,7 +393,7 @@ impl State {
.contains_key(&mailbox_hash)
{
if self.context.accounts[&account_hash]
.load(mailbox_hash)
.load2(mailbox_hash)
.is_err()
{
self.context.replies.push_back(UIEvent::from(event));

7
src/types.rs

@ -285,6 +285,13 @@ pub mod segment_tree {
max
}
pub fn get(&self, index: usize) -> u8 {
if self.array.is_empty() {
return 0;
}
self.array[index]
}
pub fn update(&mut self, pos: usize, value: u8) {
let mut ctr = pos + self.array.len();

Loading…
Cancel
Save