diff --git a/melib/src/backends/jmap.rs b/melib/src/backends/jmap.rs index a16838c0d..31e58ab3f 100644 --- a/melib/src/backends/jmap.rs +++ b/melib/src/backends/jmap.rs @@ -27,12 +27,26 @@ use futures::lock::Mutex as FutureMutex; use isahc::prelude::HttpClient; use isahc::ResponseExt; use serde_json::Value; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{hash_map::DefaultHasher, BTreeMap, HashMap, HashSet}; use std::convert::TryFrom; +use std::hash::{Hash, Hasher}; use std::str::FromStr; use std::sync::{Arc, Mutex, RwLock}; use std::time::Instant; +macro_rules! tag_hash { + ($t:ident) => {{ + let mut hasher = DefaultHasher::default(); + $t.hash(&mut hasher); + hasher.finish() + }}; + ($t:literal) => {{ + let mut hasher = DefaultHasher::default(); + $t.hash(&mut hasher); + hasher.finish() + }}; +} + #[macro_export] macro_rules! _impl { ($(#[$outer:meta])*$field:ident : $t:ty) => { @@ -131,7 +145,7 @@ impl JmapServerConf { } } -struct IsSubscribedFn(Box bool + Send + Sync>); +pub struct IsSubscribedFn(Box bool + Send + Sync>); impl std::fmt::Debug for IsSubscribedFn { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { @@ -173,24 +187,116 @@ macro_rules! get_conf_val { }; } -#[derive(Debug, Default)] +#[derive(Debug)] pub struct Store { - byte_cache: HashMap, - id_store: HashMap, - blob_id_store: HashMap, + pub account_name: Arc, + pub account_hash: AccountHash, + pub account_id: Arc>, + pub byte_cache: Arc>>, + pub id_store: Arc>>, + pub reverse_id_store: Arc>>, + pub blob_id_store: Arc>>, + pub tag_index: Arc>>, + pub mailboxes: Arc>>, + pub mailboxes_index: Arc>>>, + pub object_set_states: Arc>>, + pub online_status: Arc)>>, + pub is_subscribed: Arc, + pub event_consumer: BackendEventConsumer, +} + +impl Store { + pub fn add_envelope(&self, obj: EmailObject) -> Envelope { + let mut tag_lck = self.tag_index.write().unwrap(); + let tags = obj + .keywords() + .keys() + .map(|tag| { + let tag_hash = { + let mut hasher = DefaultHasher::default(); + tag.hash(&mut hasher); + hasher.finish() + }; + if !tag_lck.contains_key(&tag_hash) { + tag_lck.insert(tag_hash, tag.to_string()); + } + tag_hash + }) + .collect::>(); + let id = obj.id.clone(); + let mailbox_ids = obj.mailbox_ids.clone(); + let blob_id = obj.blob_id.clone(); + drop(tag_lck); + let mut ret: Envelope = obj.into(); + + debug_assert_eq!(tag_hash!("$draft"), 6613915297903591176); + debug_assert_eq!(tag_hash!("$seen"), 1683863812294339685); + debug_assert_eq!(tag_hash!("$flagged"), 2714010747478170100); + debug_assert_eq!(tag_hash!("$answered"), 8940855303929342213); + debug_assert_eq!(tag_hash!("$junk"), 2656839745430720464); + debug_assert_eq!(tag_hash!("$notjunk"), 4091323799684325059); + let mut id_store_lck = self.id_store.lock().unwrap(); + let mut reverse_id_store_lck = self.reverse_id_store.lock().unwrap(); + let mut blob_id_store_lck = self.blob_id_store.lock().unwrap(); + let mailboxes_lck = self.mailboxes.read().unwrap(); + let mut mailboxes_index_lck = self.mailboxes_index.write().unwrap(); + for (mailbox_id, _) in mailbox_ids { + if let Some((mailbox_hash, _)) = mailboxes_lck.iter().find(|(_, m)| m.id == mailbox_id) + { + mailboxes_index_lck + .entry(*mailbox_hash) + .or_default() + .insert(ret.hash()); + } + } + reverse_id_store_lck.insert(id.clone(), ret.hash()); + id_store_lck.insert(ret.hash(), id); + blob_id_store_lck.insert(ret.hash(), blob_id); + for t in tags { + match t { + 6613915297903591176 => { + ret.set_flags(ret.flags() | Flag::DRAFT); + } + 1683863812294339685 => { + ret.set_flags(ret.flags() | Flag::SEEN); + } + 2714010747478170100 => { + ret.set_flags(ret.flags() | Flag::FLAGGED); + } + 8940855303929342213 => { + ret.set_flags(ret.flags() | Flag::REPLIED); + } + 2656839745430720464 | 4091323799684325059 => { /* ignore */ } + _ => ret.labels_mut().push(t), + } + } + ret + } + + pub fn remove_envelope( + &self, + obj_id: Id, + ) -> Option<(EnvelopeHash, SmallVec<[MailboxHash; 8]>)> { + let env_hash = self.reverse_id_store.lock().unwrap().remove(&obj_id)?; + self.id_store.lock().unwrap().remove(&env_hash); + self.blob_id_store.lock().unwrap().remove(&env_hash); + self.byte_cache.lock().unwrap().remove(&env_hash); + let mut mailbox_hashes = SmallVec::new(); + let mailboxes_lck = self.mailboxes.read().unwrap(); + for (k, set) in self.mailboxes_index.write().unwrap().iter_mut() { + if set.remove(&env_hash) { + mailbox_hashes.push(*k); + } + } + Some((env_hash, mailbox_hashes)) + } } #[derive(Debug)] pub struct JmapType { - account_name: String, - account_hash: AccountHash, - online: Arc)>>, - is_subscribed: Arc, server_conf: JmapServerConf, connection: Arc>, - store: Arc>, - tag_index: Arc>>, - mailboxes: Arc>>, + store: Arc, } impl MailBackend for JmapType { @@ -207,7 +313,7 @@ impl MailBackend for JmapType { } fn is_online(&self) -> ResultFuture<()> { - let online = self.online.clone(); + let online = self.store.online_status.clone(); Ok(Box::pin(async move { //match timeout(std::time::Duration::from_secs(3), connection.lock()).await { let online_lck = online.lock().await; @@ -224,9 +330,7 @@ impl MailBackend for JmapType { &mut self, mailbox_hash: MailboxHash, ) -> Result>> + Send + 'static>>> { - let mailboxes = self.mailboxes.clone(); let store = self.store.clone(); - let tag_index = self.tag_index.clone(); let connection = self.connection.clone(); Ok(Box::pin(async_stream::try_stream! { let mut conn = connection.lock().await; @@ -234,8 +338,6 @@ impl MailBackend for JmapType { let res = protocol::fetch( &conn, &store, - &tag_index, - &mailboxes, mailbox_hash, ).await?; yield res; @@ -243,7 +345,13 @@ impl MailBackend for JmapType { } fn refresh(&mut self, _mailbox_hash: MailboxHash) -> ResultFuture<()> { - Err(MeliError::new("Unimplemented.")) + let connection = self.connection.clone(); + Ok(Box::pin(async move { + let mut conn = connection.lock().await; + conn.connect().await?; + conn.email_changes().await?; + Ok(()) + })) } fn watch(&self) -> ResultFuture<()> { @@ -251,17 +359,18 @@ impl MailBackend for JmapType { } fn mailboxes(&self) -> ResultFuture> { - let mailboxes = self.mailboxes.clone(); + let store = self.store.clone(); let connection = self.connection.clone(); Ok(Box::pin(async move { let mut conn = connection.lock().await; conn.connect().await?; - if mailboxes.read().unwrap().is_empty() { + if store.mailboxes.read().unwrap().is_empty() { let new_mailboxes = debug!(protocol::get_mailboxes(&conn).await)?; - *mailboxes.write().unwrap() = new_mailboxes; + *store.mailboxes.write().unwrap() = new_mailboxes; } - let ret = mailboxes + let ret = store + .mailboxes .read() .unwrap() .iter() @@ -287,7 +396,7 @@ impl MailBackend for JmapType { mailbox_hash: MailboxHash, _flags: Option, ) -> ResultFuture<()> { - let mailboxes = self.mailboxes.clone(); + let store = self.store.clone(); let connection = self.connection.clone(); Ok(Box::pin(async move { let mut conn = connection.lock().await; @@ -305,7 +414,7 @@ impl MailBackend for JmapType { .await?; let mailbox_id: String = { - let mailboxes_lck = mailboxes.read().unwrap(); + let mailboxes_lck = store.mailboxes.read().unwrap(); if let Some(mailbox) = mailboxes_lck.get(&mailbox_hash) { mailbox.id.clone() } else { @@ -360,7 +469,7 @@ impl MailBackend for JmapType { } fn tags(&self) -> Option>>> { - Some(self.tag_index.clone()) + Some(self.store.tag_index.clone()) } fn as_any(&self) -> &dyn Any { @@ -376,9 +485,12 @@ impl MailBackend for JmapType { q: crate::search::Query, mailbox_hash: Option, ) -> ResultFuture> { + let store = self.store.clone(); let connection = self.connection.clone(); let filter = if let Some(mailbox_hash) = mailbox_hash { - let mailbox_id = self.mailboxes.read().unwrap()[&mailbox_hash].id.clone(); + let mailbox_id = self.store.mailboxes.read().unwrap()[&mailbox_hash] + .id + .clone(); let mut f = Filter::Condition( EmailFilterCondition::new() @@ -412,14 +524,13 @@ impl MailBackend for JmapType { let res_text = res.text_async().await?; let mut v: MethodResponse = serde_json::from_str(&res_text).unwrap(); - *conn.online_status.lock().await = (std::time::Instant::now(), Ok(())); + *store.online_status.lock().await = (std::time::Instant::now(), Ok(())); let m = QueryResponse::::try_from(v.method_responses.remove(0))?; let QueryResponse:: { ids, .. } = m; let ret = ids .into_iter() .map(|id| { - use std::hash::Hasher; - let mut h = std::collections::hash_map::DefaultHasher::new(); + let mut h = DefaultHasher::new(); h.write(id.as_bytes()); h.finish() }) @@ -450,12 +561,11 @@ impl MailBackend for JmapType { destination_mailbox_hash: MailboxHash, move_: bool, ) -> ResultFuture<()> { - let mailboxes = self.mailboxes.clone(); let store = self.store.clone(); let connection = self.connection.clone(); Ok(Box::pin(async move { let (source_mailbox_id, destination_mailbox_id) = { - let mailboxes_lck = mailboxes.read().unwrap(); + let mailboxes_lck = store.mailboxes.read().unwrap(); if !mailboxes_lck.contains_key(&source_mailbox_hash) { return Err(MeliError::new(format!( "Could not find source mailbox with hash {}", @@ -489,9 +599,8 @@ impl MailBackend for JmapType { ); } { - let store_lck = store.read().unwrap(); for env_hash in env_hashes.iter() { - if let Some(id) = store_lck.id_store.get(&env_hash) { + if let Some(id) = store.id_store.lock().unwrap().get(&env_hash) { ids.push(id.clone()); id_map.insert(id.clone(), env_hash); update_map.insert(id.clone(), serde_json::json!(update_keywords.clone())); @@ -517,7 +626,7 @@ impl MailBackend for JmapType { let res_text = res.text_async().await?; let mut v: MethodResponse = serde_json::from_str(&res_text).unwrap(); - *conn.online_status.lock().await = (std::time::Instant::now(), Ok(())); + *store.online_status.lock().await = (std::time::Instant::now(), Ok(())); let m = SetResponse::::try_from(v.method_responses.remove(0))?; if let Some(ids) = m.not_updated { if !ids.is_empty() { @@ -540,13 +649,10 @@ impl MailBackend for JmapType { mailbox_hash: MailboxHash, flags: SmallVec<[(std::result::Result, bool); 8]>, ) -> ResultFuture<()> { - let mailboxes = self.mailboxes.clone(); let store = self.store.clone(); - let account_hash = self.account_hash; - let tag_index = self.tag_index.clone(); let connection = self.connection.clone(); Ok(Box::pin(async move { - let mailbox_id = mailboxes.read().unwrap()[&mailbox_hash].id.clone(); + let mailbox_id = store.mailboxes.read().unwrap()[&mailbox_hash].id.clone(); let mut update_map: HashMap = HashMap::default(); let mut ids: Vec = Vec::with_capacity(env_hashes.rest.len() + 1); let mut id_map: HashMap = HashMap::default(); @@ -587,9 +693,8 @@ impl MailBackend for JmapType { } } { - let store_lck = store.read().unwrap(); for hash in env_hashes.iter() { - if let Some(id) = store_lck.id_store.get(&hash) { + if let Some(id) = store.id_store.lock().unwrap().get(&hash) { ids.push(id.clone()); id_map.insert(id.clone(), hash); update_map.insert(id.clone(), serde_json::json!(update_keywords.clone())); @@ -626,7 +731,7 @@ impl MailBackend for JmapType { */ //debug!("res_text = {}", &res_text); let mut v: MethodResponse = serde_json::from_str(&res_text).unwrap(); - *conn.online_status.lock().await = (std::time::Instant::now(), Ok(())); + *store.online_status.lock().await = (std::time::Instant::now(), Ok(())); let m = SetResponse::::try_from(v.method_responses.remove(0))?; if let Some(ids) = m.not_updated { return Err(MeliError::new( @@ -637,24 +742,45 @@ impl MailBackend for JmapType { )); } - let mut tag_index_lck = tag_index.write().unwrap(); - for (flag, value) in flags.iter() { - match flag { - Ok(f) => {} - Err(t) => { - if *value { - tag_index_lck.insert(tag_hash!(t), t.clone()); + { + let mut tag_index_lck = store.tag_index.write().unwrap(); + for (flag, value) in flags.iter() { + match flag { + Ok(f) => {} + Err(t) => { + if *value { + tag_index_lck.insert(tag_hash!(t), t.clone()); + } } } } + drop(tag_index_lck); } let e = GetResponse::::try_from(v.method_responses.pop().unwrap())?; let GetResponse:: { list, state, .. } = e; - //debug!(&list); + { + let c = store + .object_set_states + .lock() + .unwrap() + .get(&EmailObject::NAME) + .map(|prev_state| *prev_state == state); + if let Some(false) = c { + conn.email_changes().await?; + } else { + debug!("{:?}: inserting state {}", EmailObject::NAME, &state); + store + .object_set_states + .lock() + .unwrap() + .insert(EmailObject::NAME, state); + } + } + debug!(&list); for envobj in list { let env_hash = id_map[&envobj.id]; conn.add_refresh_event(RefreshEvent { - account_hash, + account_hash: store.account_hash, mailbox_hash, kind: RefreshEventKind::NewFlags( env_hash, @@ -673,34 +799,41 @@ impl JmapType { is_subscribed: Box bool + Send + Sync>, event_consumer: BackendEventConsumer, ) -> Result> { - let online = Arc::new(FutureMutex::new(( + let online_status = Arc::new(FutureMutex::new(( std::time::Instant::now(), Err(MeliError::new("Account is uninitialised.")), ))); let server_conf = JmapServerConf::new(s)?; let account_hash = { - use std::collections::hash_map::DefaultHasher; - use std::hash::Hasher; let mut hasher = DefaultHasher::new(); hasher.write(s.name.as_bytes()); hasher.finish() }; + let store = Arc::new(Store { + account_name: Arc::new(s.name.clone()), + account_hash, + account_id: Arc::new(Mutex::new(String::new())), + online_status, + event_consumer, + is_subscribed: Arc::new(IsSubscribedFn(is_subscribed)), + + byte_cache: Default::default(), + id_store: Default::default(), + reverse_id_store: Default::default(), + blob_id_store: Default::default(), + tag_index: Default::default(), + mailboxes: Default::default(), + mailboxes_index: Default::default(), + object_set_states: Default::default(), + }); Ok(Box::new(JmapType { connection: Arc::new(FutureMutex::new(JmapConnection::new( &server_conf, - account_hash, - event_consumer, - online.clone(), + store.clone(), )?)), - store: Arc::new(RwLock::new(Store::default())), - tag_index: Arc::new(RwLock::new(Default::default())), - mailboxes: Arc::new(RwLock::new(HashMap::default())), - account_name: s.name.clone(), - account_hash, - online, - is_subscribed: Arc::new(IsSubscribedFn(is_subscribed)), + store, server_conf, })) } diff --git a/melib/src/backends/jmap/connection.rs b/melib/src/backends/jmap/connection.rs index 415795ebb..af46bba64 100644 --- a/melib/src/backends/jmap/connection.rs +++ b/melib/src/backends/jmap/connection.rs @@ -27,21 +27,12 @@ pub struct JmapConnection { pub session: JmapSession, pub request_no: Arc>, pub client: Arc, - pub online_status: Arc)>>, pub server_conf: JmapServerConf, - pub account_id: Arc>, - pub account_hash: AccountHash, - pub method_call_states: Arc>>, - pub event_consumer: BackendEventConsumer, + pub store: Arc, } impl JmapConnection { - pub fn new( - server_conf: &JmapServerConf, - account_hash: AccountHash, - event_consumer: BackendEventConsumer, - online_status: Arc)>>, - ) -> Result { + pub fn new(server_conf: &JmapServerConf, store: Arc) -> Result { let client = HttpClient::builder() .timeout(std::time::Duration::from_secs(10)) .authentication(isahc::auth::Authentication::basic()) @@ -55,17 +46,13 @@ impl JmapConnection { session: Default::default(), request_no: Arc::new(Mutex::new(0)), client: Arc::new(client), - online_status, server_conf, - account_id: Arc::new(Mutex::new(String::new())), - account_hash, - event_consumer, - method_call_states: Arc::new(Mutex::new(Default::default())), + store, }) } pub async fn connect(&mut self) -> Result<()> { - if self.online_status.lock().await.1.is_ok() { + if self.store.online_status.lock().await.1.is_ok() { return Ok(()); } let mut jmap_session_resource_url = @@ -86,7 +73,7 @@ impl JmapConnection { let session: JmapSession = match serde_json::from_str(&res_text) { Err(err) => { let err = MeliError::new(format!("Could not connect to JMAP server endpoint for {}. Is your server hostname setting correct? (i.e. \"jmap.mailserver.org\") (Note: only session resource discovery via /.well-known/jmap is supported. DNS SRV records are not suppported.)\nReply from server: {}", &self.server_conf.server_hostname, &res_text)).set_source(Some(Arc::new(err))); - *self.online_status.lock().await = (Instant::now(), Err(err.clone())); + *self.store.online_status.lock().await = (Instant::now(), Err(err.clone())); return Err(err); } Ok(s) => s, @@ -96,7 +83,7 @@ impl JmapConnection { .contains_key("urn:ietf:params:jmap:core") { let err = MeliError::new(format!("Server {} did not return JMAP Core capability (urn:ietf:params:jmap:core). Returned capabilities were: {}", &self.server_conf.server_hostname, session.capabilities.keys().map(String::as_str).collect::>().join(", "))); - *self.online_status.lock().await = (Instant::now(), Err(err.clone())); + *self.store.online_status.lock().await = (Instant::now(), Err(err.clone())); return Err(err); } if !session @@ -104,11 +91,11 @@ impl JmapConnection { .contains_key("urn:ietf:params:jmap:mail") { let err = MeliError::new(format!("Server {} does not support JMAP Mail capability (urn:ietf:params:jmap:mail). Returned capabilities were: {}", &self.server_conf.server_hostname, session.capabilities.keys().map(String::as_str).collect::>().join(", "))); - *self.online_status.lock().await = (Instant::now(), Err(err.clone())); + *self.store.online_status.lock().await = (Instant::now(), Err(err.clone())); return Err(err); } - *self.online_status.lock().await = (Instant::now(), Ok(())); + *self.store.online_status.lock().await = (Instant::now(), Ok(())); self.session = session; Ok(()) } @@ -118,6 +105,128 @@ impl JmapConnection { } pub fn add_refresh_event(&self, event: RefreshEvent) { - (self.event_consumer)(self.account_hash, BackendEvent::Refresh(event)); + (self.store.event_consumer)(self.store.account_hash, BackendEvent::Refresh(event)); + } + + pub async fn email_changes(&self) -> Result<()> { + let mut current_state: String = { + let object_set_states_lck = self.store.object_set_states.lock().unwrap(); + let v = if let Some(prev_state) = debug!(object_set_states_lck.get(&EmailObject::NAME)) + { + prev_state.clone() + } else { + return Ok(()); + }; + drop(object_set_states_lck); + v + }; + loop { + + let email_changes_call: EmailChanges = EmailChanges::new( + Changes::::new() + .account_id(self.mail_account_id().to_string()) + .since_state(current_state.clone()), + ); + + let mut req = Request::new(self.request_no.clone()); + let prev_seq = req.add_call(&email_changes_call); + let email_get_call: EmailGet = EmailGet::new( + Get::new() + .ids(Some(JmapArgument::reference( + prev_seq, + ResultField::::new("created"), + ))) + .account_id(self.mail_account_id().to_string()), + ); + + req.add_call(&email_get_call); + + let mut res = self + .client + .post_async(&self.session.api_url, serde_json::to_string(&req)?) + .await?; + + let res_text = res.text_async().await?; + debug!(&res_text); + let mut v: MethodResponse = serde_json::from_str(&res_text).unwrap(); + let get_response = + GetResponse::::try_from(v.method_responses.pop().unwrap())?; + debug!(&get_response); + let GetResponse:: { list, .. } = get_response; + let mut mailbox_hashes: Vec> = + Vec::with_capacity(list.len()); + for envobj in &list { + let v = self + .store + .mailboxes + .read() + .unwrap() + .iter() + .filter(|(_, m)| envobj.mailbox_ids.contains_key(&m.id)) + .map(|(k, _)| *k) + .collect::>(); + mailbox_hashes.push(v); + } + + for (env, mailbox_hashes) in list + .into_iter() + .map(|obj| self.store.add_envelope(obj)) + .zip(mailbox_hashes) + { + for mailbox_hash in mailbox_hashes.iter().skip(1).cloned() { + self.add_refresh_event(RefreshEvent { + account_hash: self.store.account_hash, + mailbox_hash, + kind: RefreshEventKind::Create(Box::new(env.clone())), + }); + } + if let Some(mailbox_hash) = mailbox_hashes.first().cloned() { + self.add_refresh_event(RefreshEvent { + account_hash: self.store.account_hash, + mailbox_hash, + kind: RefreshEventKind::Create(Box::new(env)), + }); + } + } + + let changes_response = + ChangesResponse::::try_from(v.method_responses.pop().unwrap())?; + + let ChangesResponse:: { + account_id: _, + new_state, + old_state: _, + has_more_changes, + created: _, + updated, + destroyed, + _ph: _, + } = changes_response; + for (env_hash, mailbox_hashes) in destroyed + .into_iter() + .filter_map(|obj_id| self.store.remove_envelope(obj_id)) + { + for mailbox_hash in mailbox_hashes { + self.add_refresh_event(RefreshEvent { + account_hash: self.store.account_hash, + mailbox_hash, + kind: RefreshEventKind::Remove(env_hash), + }); + } + } + + if has_more_changes { + current_state = new_state; + } else { + self.store + .object_set_states + .lock() + .unwrap() + .insert(EmailObject::NAME, new_state); + break; + } + } + + Ok(()) } } diff --git a/melib/src/backends/jmap/mailbox.rs b/melib/src/backends/jmap/mailbox.rs index cbed47e4f..cf1987a46 100644 --- a/melib/src/backends/jmap/mailbox.rs +++ b/melib/src/backends/jmap/mailbox.rs @@ -28,7 +28,7 @@ pub struct JmapMailbox { pub name: String, pub path: String, pub hash: MailboxHash, - pub v: Vec, + pub children: Vec, pub id: String, pub is_subscribed: bool, pub my_rights: JmapRights, @@ -62,7 +62,7 @@ impl BackendMailbox for JmapMailbox { } fn children(&self) -> &[MailboxHash] { - &self.v + &self.children } fn parent(&self) -> Option { diff --git a/melib/src/backends/jmap/objects/email.rs b/melib/src/backends/jmap/objects/email.rs index bb9f8fb07..415b62bc3 100644 --- a/melib/src/backends/jmap/objects/email.rs +++ b/melib/src/backends/jmap/objects/email.rs @@ -770,3 +770,20 @@ impl EmailSet { EmailSet { set_call } } } + +#[derive(Serialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct EmailChanges { + #[serde(flatten)] + pub changes_call: Changes, +} + +impl Method for EmailChanges { + const NAME: &'static str = "Email/changes"; +} + +impl EmailChanges { + pub fn new(changes_call: Changes) -> Self { + EmailChanges { changes_call } + } +} diff --git a/melib/src/backends/jmap/operations.rs b/melib/src/backends/jmap/operations.rs index 0605b484b..6358c71e9 100644 --- a/melib/src/backends/jmap/operations.rs +++ b/melib/src/backends/jmap/operations.rs @@ -20,21 +20,21 @@ */ use super::*; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; /// `BackendOp` implementor for Imap #[derive(Debug, Clone)] pub struct JmapOp { hash: EnvelopeHash, connection: Arc>, - store: Arc>, + store: Arc, } impl JmapOp { pub fn new( hash: EnvelopeHash, connection: Arc>, - store: Arc>, + store: Arc, ) -> Self { JmapOp { hash, @@ -47,11 +47,9 @@ impl JmapOp { impl BackendOp for JmapOp { fn as_bytes(&mut self) -> ResultFuture> { { - let store_lck = self.store.read().unwrap(); - if store_lck.byte_cache.contains_key(&self.hash) - && store_lck.byte_cache[&self.hash].bytes.is_some() - { - let ret = store_lck.byte_cache[&self.hash].bytes.clone().unwrap(); + let byte_lck = self.store.byte_cache.lock().unwrap(); + if byte_lck.contains_key(&self.hash) && byte_lck[&self.hash].bytes.is_some() { + let ret = byte_lck[&self.hash].bytes.clone().unwrap(); return Ok(Box::pin(async move { Ok(ret.into_bytes()) })); } } @@ -59,7 +57,7 @@ impl BackendOp for JmapOp { let hash = self.hash; let connection = self.connection.clone(); Ok(Box::pin(async move { - let blob_id = store.read().unwrap().blob_id_store[&hash].clone(); + let blob_id = store.blob_id_store.lock().unwrap()[&hash].clone(); let mut conn = connection.lock().await; conn.connect().await?; let mut res = conn @@ -75,9 +73,9 @@ impl BackendOp for JmapOp { let res_text = res.text_async().await?; store - .write() - .unwrap() .byte_cache + .lock() + .unwrap() .entry(hash) .or_default() .bytes = Some(res_text.clone()); diff --git a/melib/src/backends/jmap/protocol.rs b/melib/src/backends/jmap/protocol.rs index 87e33b048..cf6b3d0cc 100644 --- a/melib/src/backends/jmap/protocol.rs +++ b/melib/src/backends/jmap/protocol.rs @@ -23,11 +23,7 @@ use super::mailbox::JmapMailbox; use super::*; use serde::Serialize; use serde_json::{json, Value}; -use smallvec::SmallVec; -use std::collections::hash_map::DefaultHasher; -use std::collections::HashMap; use std::convert::TryFrom; -use std::hash::{Hash, Hasher}; pub type Id = String; pub type UtcDate = String; @@ -43,19 +39,6 @@ macro_rules! get_request_no { }}; } -macro_rules! tag_hash { - ($t:ident) => {{ - let mut hasher = DefaultHasher::default(); - $t.hash(&mut hasher); - hasher.finish() - }}; - ($t:literal) => {{ - let mut hasher = DefaultHasher::default(); - $t.hash(&mut hasher); - hasher.finish() - }}; -} - pub trait Response { const NAME: &'static str; } @@ -120,12 +103,12 @@ pub async fn get_mailboxes(conn: &JmapConnection) -> Result::try_from(v.method_responses.remove(0))?; let GetResponse:: { list, account_id, .. } = m; - *conn.account_id.lock().unwrap() = account_id; + *conn.store.account_id.lock().unwrap() = account_id; Ok(list .into_iter() .map(|r| { @@ -149,7 +132,7 @@ pub async fn get_mailboxes(conn: &JmapConnection) -> Result R let res_text = res.text_async().await?; let mut v: MethodResponse = serde_json::from_str(&res_text).unwrap(); - *conn.online_status.lock().await = (std::time::Instant::now(), Ok(())); + *conn.store.online_status.lock().await = (std::time::Instant::now(), Ok(())); let m = QueryResponse::::try_from(v.method_responses.remove(0))?; let QueryResponse:: { ids, .. } = m; Ok(ids) } +/* pub async fn get_message(conn: &JmapConnection, ids: &[String]) -> Result> { let email_call: EmailGet = EmailGet::new( Get::new() @@ -219,15 +203,14 @@ pub async fn get_message(conn: &JmapConnection, ids: &[String]) -> Result>()) } +*/ pub async fn fetch( conn: &JmapConnection, - store: &Arc>, - tag_index: &Arc>>, - mailboxes: &Arc>>, + store: &Store, mailbox_hash: MailboxHash, ) -> Result> { - let mailbox_id = mailboxes.read().unwrap()[&mailbox_hash].id.clone(); + let mailbox_id = store.mailboxes.read().unwrap()[&mailbox_hash].id.clone(); let email_query_call: EmailQuery = EmailQuery::new( Query::new() .account_id(conn.mail_account_id().to_string()) @@ -265,76 +248,27 @@ pub async fn fetch( let e = GetResponse::::try_from(v.method_responses.pop().unwrap())?; let GetResponse:: { list, state, .. } = e; { - let mut states_lck = conn.method_call_states.lock().unwrap(); - - if let Some(prev_state) = states_lck.get_mut(&EmailGet::NAME) { - debug!("{:?}: prev_state was {}", EmailGet::NAME, prev_state); - - if *prev_state != state { /* FIXME Query Changes. */ } - - *prev_state = state; - debug!("{:?}: curr state is {}", EmailGet::NAME, prev_state); + let v = conn + .store + .object_set_states + .lock() + .unwrap() + .get(&EmailObject::NAME) + .map(|prev_state| *prev_state == state); + if let Some(false) = v { + conn.email_changes().await?; } else { - debug!("{:?}: inserting state {}", EmailGet::NAME, &state); - states_lck.insert(EmailGet::NAME, state); + debug!("{:?}: inserting state {}", EmailObject::NAME, &state); + conn.store + .object_set_states + .lock() + .unwrap() + .insert(EmailObject::NAME, state); } } - let mut tag_lck = tag_index.write().unwrap(); - let ids = list - .iter() - .map(|obj| { - let tags = obj - .keywords() - .keys() - .map(|tag| { - let tag_hash = { - let mut hasher = DefaultHasher::default(); - tag.hash(&mut hasher); - hasher.finish() - }; - if !tag_lck.contains_key(&tag_hash) { - tag_lck.insert(tag_hash, tag.to_string()); - } - tag_hash - }) - .collect::>(); - (tags, obj.id.clone(), obj.blob_id.clone()) - }) - .collect::, Id, Id)>>(); - drop(tag_lck); - let mut ret = list - .into_iter() - .map(std::convert::Into::into) - .collect::>(); - - let mut store_lck = store.write().unwrap(); - debug_assert_eq!(tag_hash!("$draft"), 6613915297903591176); - debug_assert_eq!(tag_hash!("$seen"), 1683863812294339685); - debug_assert_eq!(tag_hash!("$flagged"), 2714010747478170100); - debug_assert_eq!(tag_hash!("$answered"), 8940855303929342213); - debug_assert_eq!(tag_hash!("$junk"), 2656839745430720464); - debug_assert_eq!(tag_hash!("$notjunk"), 4091323799684325059); - for (env, (tags, id, blob_id)) in ret.iter_mut().zip(ids.into_iter()) { - store_lck.id_store.insert(env.hash(), id); - store_lck.blob_id_store.insert(env.hash(), blob_id); - for t in tags { - match t { - 6613915297903591176 => { - env.set_flags(env.flags() | Flag::DRAFT); - } - 1683863812294339685 => { - env.set_flags(env.flags() | Flag::SEEN); - } - 2714010747478170100 => { - env.set_flags(env.flags() | Flag::FLAGGED); - } - 8940855303929342213 => { - env.set_flags(env.flags() | Flag::REPLIED); - } - 2656839745430720464 | 4091323799684325059 => { /* ignore */ } - _ => env.labels_mut().push(t), - } - } + let mut ret = Vec::with_capacity(list.len()); + for obj in list { + ret.push(store.add_envelope(obj)); } Ok(ret) } diff --git a/melib/src/backends/jmap/rfc8620.rs b/melib/src/backends/jmap/rfc8620.rs index 2df2b8b23..863221ee3 100644 --- a/melib/src/backends/jmap/rfc8620.rs +++ b/melib/src/backends/jmap/rfc8620.rs @@ -472,7 +472,7 @@ pub struct ChangesResponse { pub updated: Vec, pub destroyed: Vec, #[serde(skip)] - _ph: PhantomData OBJ>, + pub _ph: PhantomData OBJ>, } impl std::convert::TryFrom<&RawValue> for ChangesResponse {