parent
425f4b9930
commit
36cc0d4212
|
@ -58,7 +58,7 @@ use self::maildir::MaildirType;
|
|||
use self::mbox::MboxType;
|
||||
use super::email::{Envelope, EnvelopeHash, Flag};
|
||||
use std::any::Any;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::fmt;
|
||||
use std::fmt::Debug;
|
||||
use std::ops::Deref;
|
||||
|
@ -617,3 +617,70 @@ impl EnvelopeHashBatch {
|
|||
1 + self.rest.len()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct LazyCountSet {
|
||||
not_yet_seen: usize,
|
||||
set: BTreeSet<EnvelopeHash>,
|
||||
}
|
||||
|
||||
impl LazyCountSet {
|
||||
pub fn set_not_yet_seen(&mut self, new_val: usize) {
|
||||
self.not_yet_seen = new_val;
|
||||
}
|
||||
|
||||
pub fn insert_existing(&mut self, new_val: EnvelopeHash) -> bool {
|
||||
if self.not_yet_seen == 0 {
|
||||
false
|
||||
} else {
|
||||
self.not_yet_seen -= 1;
|
||||
self.set.insert(new_val);
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert_existing_set(&mut self, set: BTreeSet<EnvelopeHash>) -> bool {
|
||||
debug!("insert_existing_set {:?}", &set);
|
||||
if self.not_yet_seen < set.len() {
|
||||
false
|
||||
} else {
|
||||
self.not_yet_seen -= set.len();
|
||||
self.set.extend(set.into_iter());
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn len(&self) -> usize {
|
||||
self.set.len() + self.not_yet_seen
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn clear(&mut self) {
|
||||
self.set.clear();
|
||||
self.not_yet_seen = 0;
|
||||
}
|
||||
|
||||
pub fn insert_new(&mut self, new_val: EnvelopeHash) {
|
||||
self.set.insert(new_val);
|
||||
}
|
||||
|
||||
pub fn insert_set(&mut self, set: BTreeSet<EnvelopeHash>) {
|
||||
debug!("insert__set {:?}", &set);
|
||||
self.set.extend(set.into_iter());
|
||||
}
|
||||
|
||||
pub fn remove(&mut self, new_val: EnvelopeHash) -> bool {
|
||||
self.set.remove(&new_val)
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_lazy_count_set() {
|
||||
let mut new = LazyCountSet::default();
|
||||
new.set_not_yet_seen(10);
|
||||
for i in 0..10 {
|
||||
assert!(new.insert_existing(i));
|
||||
}
|
||||
assert!(!new.insert_existing(10));
|
||||
}
|
||||
|
|
|
@ -21,80 +21,11 @@
|
|||
|
||||
use super::protocol_parser::SelectResponse;
|
||||
use crate::backends::{
|
||||
BackendMailbox, Mailbox, MailboxHash, MailboxPermissions, SpecialUsageMailbox,
|
||||
BackendMailbox, LazyCountSet, Mailbox, MailboxHash, MailboxPermissions, SpecialUsageMailbox,
|
||||
};
|
||||
use crate::email::EnvelopeHash;
|
||||
use crate::error::*;
|
||||
use std::collections::BTreeSet;
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct LazyCountSet {
|
||||
not_yet_seen: usize,
|
||||
set: BTreeSet<EnvelopeHash>,
|
||||
}
|
||||
|
||||
impl LazyCountSet {
|
||||
pub fn set_not_yet_seen(&mut self, new_val: usize) {
|
||||
self.not_yet_seen = new_val;
|
||||
}
|
||||
|
||||
pub fn insert_existing(&mut self, new_val: EnvelopeHash) -> bool {
|
||||
if self.not_yet_seen == 0 {
|
||||
false
|
||||
} else {
|
||||
self.not_yet_seen -= 1;
|
||||
self.set.insert(new_val);
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert_existing_set(&mut self, set: BTreeSet<EnvelopeHash>) -> bool {
|
||||
debug!("insert_existing_set {:?}", &set);
|
||||
if self.not_yet_seen < set.len() {
|
||||
false
|
||||
} else {
|
||||
self.not_yet_seen -= set.len();
|
||||
self.set.extend(set.into_iter());
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn len(&self) -> usize {
|
||||
self.set.len() + self.not_yet_seen
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn clear(&mut self) {
|
||||
self.set.clear();
|
||||
self.not_yet_seen = 0;
|
||||
}
|
||||
|
||||
pub fn insert_new(&mut self, new_val: EnvelopeHash) {
|
||||
self.set.insert(new_val);
|
||||
}
|
||||
|
||||
pub fn insert_set(&mut self, set: BTreeSet<EnvelopeHash>) {
|
||||
debug!("insert__set {:?}", &set);
|
||||
self.set.extend(set.into_iter());
|
||||
}
|
||||
|
||||
pub fn remove(&mut self, new_val: EnvelopeHash) -> bool {
|
||||
self.set.remove(&new_val)
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_lazy_count_set() {
|
||||
let mut new = LazyCountSet::default();
|
||||
new.set_not_yet_seen(10);
|
||||
for i in 0..10 {
|
||||
assert!(new.insert_existing(i));
|
||||
}
|
||||
assert!(!new.insert_existing(10));
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct ImapMailbox {
|
||||
pub hash: MailboxHash,
|
||||
|
|
|
@ -199,7 +199,6 @@ pub struct Store {
|
|||
pub tag_index: Arc<RwLock<BTreeMap<u64, String>>>,
|
||||
pub mailboxes: Arc<RwLock<HashMap<MailboxHash, JmapMailbox>>>,
|
||||
pub mailboxes_index: Arc<RwLock<HashMap<MailboxHash, HashSet<EnvelopeHash>>>>,
|
||||
pub email_state: Arc<Mutex<State<EmailObject>>>,
|
||||
pub mailbox_state: Arc<Mutex<State<MailboxObject>>>,
|
||||
pub online_status: Arc<FutureMutex<(Instant, Result<()>)>>,
|
||||
pub is_subscribed: Arc<IsSubscribedFn>,
|
||||
|
@ -283,7 +282,6 @@ impl Store {
|
|||
self.blob_id_store.lock().unwrap().remove(&env_hash);
|
||||
self.byte_cache.lock().unwrap().remove(&env_hash);
|
||||
let mut mailbox_hashes = SmallVec::new();
|
||||
let mailboxes_lck = self.mailboxes.read().unwrap();
|
||||
for (k, set) in self.mailboxes_index.write().unwrap().iter_mut() {
|
||||
if set.remove(&env_hash) {
|
||||
mailbox_hashes.push(*k);
|
||||
|
@ -345,12 +343,12 @@ impl MailBackend for JmapType {
|
|||
}))
|
||||
}
|
||||
|
||||
fn refresh(&mut self, _mailbox_hash: MailboxHash) -> ResultFuture<()> {
|
||||
fn refresh(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()> {
|
||||
let connection = self.connection.clone();
|
||||
Ok(Box::pin(async move {
|
||||
let mut conn = connection.lock().await;
|
||||
conn.connect().await?;
|
||||
conn.email_changes().await?;
|
||||
conn.email_changes(mailbox_hash).await?;
|
||||
Ok(())
|
||||
}))
|
||||
}
|
||||
|
@ -646,7 +644,6 @@ impl MailBackend for JmapType {
|
|||
let store = self.store.clone();
|
||||
let connection = self.connection.clone();
|
||||
Ok(Box::pin(async move {
|
||||
let mailbox_id = store.mailboxes.read().unwrap()[&mailbox_hash].id.clone();
|
||||
let mut update_map: HashMap<Id<EmailObject>, Value> = HashMap::default();
|
||||
let mut ids: Vec<Id<EmailObject>> = Vec::with_capacity(env_hashes.rest.len() + 1);
|
||||
let mut id_map: HashMap<Id<EmailObject>, EnvelopeHash> = HashMap::default();
|
||||
|
@ -695,7 +692,7 @@ impl MailBackend for JmapType {
|
|||
}
|
||||
}
|
||||
}
|
||||
let mut conn = connection.lock().await;
|
||||
let conn = connection.lock().await;
|
||||
|
||||
let email_set_call: EmailSet = EmailSet::new(
|
||||
Set::<EmailObject>::new()
|
||||
|
@ -704,7 +701,7 @@ impl MailBackend for JmapType {
|
|||
);
|
||||
|
||||
let mut req = Request::new(conn.request_no.clone());
|
||||
let prev_seq = req.add_call(&email_set_call);
|
||||
req.add_call(&email_set_call);
|
||||
let email_call: EmailGet = EmailGet::new(
|
||||
Get::new()
|
||||
.ids(Some(JmapArgument::Value(ids)))
|
||||
|
@ -740,7 +737,7 @@ impl MailBackend for JmapType {
|
|||
let mut tag_index_lck = store.tag_index.write().unwrap();
|
||||
for (flag, value) in flags.iter() {
|
||||
match flag {
|
||||
Ok(f) => {}
|
||||
Ok(_) => {}
|
||||
Err(t) => {
|
||||
if *value {
|
||||
tag_index_lck.insert(tag_hash!(t), t.clone());
|
||||
|
@ -754,14 +751,26 @@ impl MailBackend for JmapType {
|
|||
let GetResponse::<EmailObject> { list, state, .. } = e;
|
||||
{
|
||||
let (is_empty, is_equal) = {
|
||||
let current_state_lck = conn.store.email_state.lock().unwrap();
|
||||
(current_state_lck.is_empty(), *current_state_lck != state)
|
||||
let mailboxes_lck = conn.store.mailboxes.read().unwrap();
|
||||
mailboxes_lck
|
||||
.get(&mailbox_hash)
|
||||
.map(|mbox| {
|
||||
let current_state_lck = mbox.email_state.lock().unwrap();
|
||||
(
|
||||
current_state_lck.is_some(),
|
||||
current_state_lck.as_ref() != Some(&state),
|
||||
)
|
||||
})
|
||||
.unwrap_or((true, true))
|
||||
};
|
||||
if is_empty {
|
||||
let mut mailboxes_lck = conn.store.mailboxes.write().unwrap();
|
||||
debug!("{:?}: inserting state {}", EmailObject::NAME, &state);
|
||||
*conn.store.email_state.lock().unwrap() = state;
|
||||
mailboxes_lck.entry(mailbox_hash).and_modify(|mbox| {
|
||||
*mbox.email_state.lock().unwrap() = Some(state);
|
||||
});
|
||||
} else if !is_equal {
|
||||
conn.email_changes().await?;
|
||||
conn.email_changes(mailbox_hash).await?;
|
||||
}
|
||||
}
|
||||
debug!(&list);
|
||||
|
@ -813,7 +822,6 @@ impl JmapType {
|
|||
tag_index: Default::default(),
|
||||
mailboxes: Default::default(),
|
||||
mailboxes_index: Default::default(),
|
||||
email_state: Default::default(),
|
||||
mailbox_state: Default::default(),
|
||||
});
|
||||
|
||||
|
|
|
@ -108,12 +108,19 @@ impl JmapConnection {
|
|||
(self.store.event_consumer)(self.store.account_hash, BackendEvent::Refresh(event));
|
||||
}
|
||||
|
||||
pub async fn email_changes(&self) -> Result<()> {
|
||||
let mut current_state: State<EmailObject> = self.store.email_state.lock().unwrap().clone();
|
||||
if current_state.is_empty() {
|
||||
debug!("{:?}: has no saved state", EmailObject::NAME);
|
||||
pub async fn email_changes(&self, mailbox_hash: MailboxHash) -> Result<()> {
|
||||
let mut current_state: State<EmailObject> = if let Some(s) = self
|
||||
.store
|
||||
.mailboxes
|
||||
.read()
|
||||
.unwrap()
|
||||
.get(&mailbox_hash)
|
||||
.and_then(|mbox| mbox.email_state.lock().unwrap().clone())
|
||||
{
|
||||
s
|
||||
} else {
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
loop {
|
||||
|
||||
let email_changes_call: EmailChanges = EmailChanges::new(
|
||||
|
@ -134,7 +141,36 @@ impl JmapConnection {
|
|||
);
|
||||
|
||||
req.add_call(&email_get_call);
|
||||
|
||||
if let Some(mailbox) = self.store.mailboxes.read().unwrap().get(&mailbox_hash) {
|
||||
if let Some(email_query_state) = mailbox.email_query_state.lock().unwrap().clone() {
|
||||
let email_query_changes_call = EmailQueryChanges::new(
|
||||
QueryChanges::new(self.mail_account_id().clone(), email_query_state)
|
||||
.filter(Some(Filter::Condition(
|
||||
EmailFilterCondition::new()
|
||||
.in_mailbox(Some(mailbox.id.clone()))
|
||||
.into(),
|
||||
))),
|
||||
);
|
||||
let seq_no = req.add_call(&email_query_changes_call);
|
||||
let email_get_call: EmailGet = EmailGet::new(
|
||||
Get::new()
|
||||
.ids(Some(JmapArgument::reference(
|
||||
seq_no,
|
||||
ResultField::<EmailQueryChanges, EmailObject>::new("removed"),
|
||||
)))
|
||||
.account_id(self.mail_account_id().clone())
|
||||
.properties(Some(vec![
|
||||
"keywords".to_string(),
|
||||
"mailboxIds".to_string(),
|
||||
])),
|
||||
);
|
||||
req.add_call(&email_get_call);
|
||||
} else {
|
||||
return Ok(());
|
||||
}
|
||||
} else {
|
||||
return Ok(());
|
||||
}
|
||||
let mut res = self
|
||||
.client
|
||||
.post_async(&self.session.api_url, serde_json::to_string(&req)?)
|
||||
|
@ -143,79 +179,159 @@ impl JmapConnection {
|
|||
let res_text = res.text_async().await?;
|
||||
debug!(&res_text);
|
||||
let mut v: MethodResponse = serde_json::from_str(&res_text).unwrap();
|
||||
let get_response =
|
||||
GetResponse::<EmailObject>::try_from(v.method_responses.pop().unwrap())?;
|
||||
debug!(&get_response);
|
||||
let GetResponse::<EmailObject> { list, .. } = get_response;
|
||||
let changes_response =
|
||||
ChangesResponse::<EmailObject>::try_from(v.method_responses.pop().unwrap())?;
|
||||
ChangesResponse::<EmailObject>::try_from(v.method_responses.remove(0))?;
|
||||
if changes_response.new_state == current_state {
|
||||
return Ok(());
|
||||
}
|
||||
let get_response = GetResponse::<EmailObject>::try_from(v.method_responses.remove(0))?;
|
||||
|
||||
let mut mailbox_hashes: Vec<SmallVec<[MailboxHash; 8]>> =
|
||||
Vec::with_capacity(list.len());
|
||||
for envobj in &list {
|
||||
let v = self
|
||||
.store
|
||||
.mailboxes
|
||||
.read()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.filter(|(_, m)| envobj.mailbox_ids.contains_key(&m.id))
|
||||
.map(|(k, _)| *k)
|
||||
.collect::<SmallVec<[MailboxHash; 8]>>();
|
||||
mailbox_hashes.push(v);
|
||||
}
|
||||
|
||||
for (env, mailbox_hashes) in list
|
||||
.into_iter()
|
||||
.map(|obj| self.store.add_envelope(obj))
|
||||
.zip(mailbox_hashes)
|
||||
{
|
||||
for mailbox_hash in mailbox_hashes.iter().skip(1).cloned() {
|
||||
self.add_refresh_event(RefreshEvent {
|
||||
account_hash: self.store.account_hash,
|
||||
mailbox_hash,
|
||||
kind: RefreshEventKind::Create(Box::new(env.clone())),
|
||||
});
|
||||
/* process get response */
|
||||
let GetResponse::<EmailObject> { list, .. } = get_response;
|
||||
|
||||
let mut mailbox_hashes: Vec<SmallVec<[MailboxHash; 8]>> =
|
||||
Vec::with_capacity(list.len());
|
||||
for envobj in &list {
|
||||
let v = self
|
||||
.store
|
||||
.mailboxes
|
||||
.read()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.filter(|(_, m)| envobj.mailbox_ids.contains_key(&m.id))
|
||||
.map(|(k, _)| *k)
|
||||
.collect::<SmallVec<[MailboxHash; 8]>>();
|
||||
mailbox_hashes.push(v);
|
||||
}
|
||||
if let Some(mailbox_hash) = mailbox_hashes.first().cloned() {
|
||||
for (env, mailbox_hashes) in list
|
||||
.into_iter()
|
||||
.map(|obj| self.store.add_envelope(obj))
|
||||
.zip(mailbox_hashes)
|
||||
{
|
||||
for mailbox_hash in mailbox_hashes.iter().skip(1).cloned() {
|
||||
let mut mailboxes_lck = self.store.mailboxes.write().unwrap();
|
||||
mailboxes_lck.entry(mailbox_hash).and_modify(|mbox| {
|
||||
if !env.is_seen() {
|
||||
mbox.unread_emails.lock().unwrap().insert_new(env.hash());
|
||||
}
|
||||
mbox.total_emails.lock().unwrap().insert_new(env.hash());
|
||||
});
|
||||
self.add_refresh_event(RefreshEvent {
|
||||
account_hash: self.store.account_hash,
|
||||
mailbox_hash,
|
||||
kind: RefreshEventKind::Create(Box::new(env.clone())),
|
||||
});
|
||||
}
|
||||
if let Some(mailbox_hash) = mailbox_hashes.first().cloned() {
|
||||
let mut mailboxes_lck = self.store.mailboxes.write().unwrap();
|
||||
mailboxes_lck.entry(mailbox_hash).and_modify(|mbox| {
|
||||
if !env.is_seen() {
|
||||
mbox.unread_emails.lock().unwrap().insert_new(env.hash());
|
||||
}
|
||||
mbox.total_emails.lock().unwrap().insert_new(env.hash());
|
||||
});
|
||||
self.add_refresh_event(RefreshEvent {
|
||||
account_hash: self.store.account_hash,
|
||||
mailbox_hash,
|
||||
kind: RefreshEventKind::Create(Box::new(env)),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
let reverse_id_store_lck = self.store.reverse_id_store.lock().unwrap();
|
||||
let response = v.method_responses.remove(0);
|
||||
match EmailQueryChangesResponse::try_from(response) {
|
||||
Ok(EmailQueryChangesResponse {
|
||||
collapse_threads: _,
|
||||
query_changes_response:
|
||||
QueryChangesResponse {
|
||||
account_id: _,
|
||||
old_query_state,
|
||||
new_query_state,
|
||||
total: _,
|
||||
removed,
|
||||
added,
|
||||
},
|
||||
}) if old_query_state != new_query_state => {
|
||||
self.store
|
||||
.mailboxes
|
||||
.write()
|
||||
.unwrap()
|
||||
.entry(mailbox_hash)
|
||||
.and_modify(|mbox| {
|
||||
*mbox.email_query_state.lock().unwrap() = Some(new_query_state);
|
||||
});
|
||||
/* If the "filter" or "sort" includes a mutable property, the server
|
||||
MUST include all Foos in the current results for which this
|
||||
property may have changed. The position of these may have moved
|
||||
in the results, so they must be reinserted by the client to ensure
|
||||
its query cache is correct. */
|
||||
for email_obj_id in removed
|
||||
.into_iter()
|
||||
.filter(|id| !added.iter().any(|item| item.id == *id))
|
||||
{
|
||||
if let Some(env_hash) = reverse_id_store_lck.get(&email_obj_id) {
|
||||
let mut mailboxes_lck = self.store.mailboxes.write().unwrap();
|
||||
mailboxes_lck.entry(mailbox_hash).and_modify(|mbox| {
|
||||
mbox.unread_emails.lock().unwrap().remove(*env_hash);
|
||||
mbox.total_emails.lock().unwrap().insert_new(*env_hash);
|
||||
});
|
||||
self.add_refresh_event(RefreshEvent {
|
||||
account_hash: self.store.account_hash,
|
||||
mailbox_hash,
|
||||
kind: RefreshEventKind::Remove(*env_hash),
|
||||
});
|
||||
}
|
||||
}
|
||||
for AddedItem {
|
||||
id: _email_obj_id,
|
||||
index: _,
|
||||
} in added
|
||||
{
|
||||
// FIXME
|
||||
}
|
||||
}
|
||||
Ok(_) => {}
|
||||
Err(err) => {
|
||||
debug!(mailbox_hash);
|
||||
debug!(err);
|
||||
}
|
||||
}
|
||||
let GetResponse::<EmailObject> { list, .. } =
|
||||
GetResponse::<EmailObject>::try_from(v.method_responses.remove(0))?;
|
||||
let mut mailboxes_lck = self.store.mailboxes.write().unwrap();
|
||||
for envobj in list {
|
||||
if let Some(env_hash) = reverse_id_store_lck.get(&envobj.id) {
|
||||
let new_flags =
|
||||
protocol::keywords_to_flags(envobj.keywords().keys().cloned().collect());
|
||||
mailboxes_lck.entry(mailbox_hash).and_modify(|mbox| {
|
||||
if new_flags.0.contains(Flag::SEEN) {
|
||||
mbox.unread_emails.lock().unwrap().remove(*env_hash);
|
||||
} else {
|
||||
mbox.unread_emails.lock().unwrap().insert_new(*env_hash);
|
||||
}
|
||||
});
|
||||
self.add_refresh_event(RefreshEvent {
|
||||
account_hash: self.store.account_hash,
|
||||
mailbox_hash,
|
||||
kind: RefreshEventKind::Create(Box::new(env)),
|
||||
kind: RefreshEventKind::NewFlags(*env_hash, new_flags),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let ChangesResponse::<EmailObject> {
|
||||
account_id: _,
|
||||
new_state,
|
||||
old_state: _,
|
||||
has_more_changes,
|
||||
created: _,
|
||||
updated,
|
||||
destroyed,
|
||||
_ph: _,
|
||||
} = changes_response;
|
||||
for (env_hash, mailbox_hashes) in destroyed
|
||||
.into_iter()
|
||||
.filter_map(|obj_id| self.store.remove_envelope(obj_id))
|
||||
{
|
||||
for mailbox_hash in mailbox_hashes {
|
||||
self.add_refresh_event(RefreshEvent {
|
||||
account_hash: self.store.account_hash,
|
||||
mailbox_hash,
|
||||
kind: RefreshEventKind::Remove(env_hash),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if has_more_changes {
|
||||
current_state = new_state;
|
||||
drop(mailboxes_lck);
|
||||
if changes_response.has_more_changes {
|
||||
current_state = changes_response.new_state;
|
||||
} else {
|
||||
*self.store.email_state.lock().unwrap() = new_state;
|
||||
self.store
|
||||
.mailboxes
|
||||
.write()
|
||||
.unwrap()
|
||||
.entry(mailbox_hash)
|
||||
.and_modify(|mbox| {
|
||||
*mbox.email_state.lock().unwrap() = Some(changes_response.new_state);
|
||||
});
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
*/
|
||||
|
||||
use super::*;
|
||||
use crate::backends::{MailboxPermissions, SpecialUsageMailbox};
|
||||
use crate::backends::{LazyCountSet, MailboxPermissions, SpecialUsageMailbox};
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
|
@ -36,11 +36,13 @@ pub struct JmapMailbox {
|
|||
pub parent_hash: Option<MailboxHash>,
|
||||
pub role: Option<String>,
|
||||
pub sort_order: u64,
|
||||
pub total_emails: Arc<Mutex<u64>>,
|
||||
pub total_emails: Arc<Mutex<LazyCountSet>>,
|
||||
pub total_threads: u64,
|
||||
pub unread_emails: Arc<Mutex<u64>>,
|
||||
pub unread_emails: Arc<Mutex<LazyCountSet>>,
|
||||
pub unread_threads: u64,
|
||||
pub usage: Arc<RwLock<SpecialUsageMailbox>>,
|
||||
pub email_state: Arc<Mutex<Option<State<EmailObject>>>>,
|
||||
pub email_query_state: Arc<Mutex<Option<String>>>,
|
||||
}
|
||||
|
||||
impl BackendMailbox for JmapMailbox {
|
||||
|
@ -109,8 +111,8 @@ impl BackendMailbox for JmapMailbox {
|
|||
|
||||
fn count(&self) -> Result<(usize, usize)> {
|
||||
Ok((
|
||||
*self.unread_emails.lock()? as usize,
|
||||
*self.total_emails.lock()? as usize,
|
||||
self.unread_emails.lock()?.len(),
|
||||
self.total_emails.lock()?.len(),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ use crate::backends::jmap::rfc8620::bool_false;
|
|||
use crate::email::address::{Address, MailboxAddress};
|
||||
use core::marker::PhantomData;
|
||||
use serde::de::{Deserialize, Deserializer};
|
||||
use serde_json::value::RawValue;
|
||||
use serde_json::Value;
|
||||
use std::collections::hash_map::DefaultHasher;
|
||||
use std::collections::HashMap;
|
||||
|
@ -391,21 +392,6 @@ impl Object for EmailObject {
|
|||
const NAME: &'static str = "Email";
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct EmailQueryResponse {
|
||||
pub account_id: Id<Account>,
|
||||
pub can_calculate_changes: bool,
|
||||
pub collapse_threads: bool,
|
||||
// FIXME
|
||||
pub filter: String,
|
||||
pub ids: Vec<Id<EmailObject>>,
|
||||
pub position: u64,
|
||||
pub query_state: String,
|
||||
pub sort: Option<String>,
|
||||
pub total: usize,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct EmailQuery {
|
||||
|
@ -800,3 +786,40 @@ impl EmailChanges {
|
|||
EmailChanges { changes_call }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct EmailQueryChanges {
|
||||
#[serde(flatten)]
|
||||
pub query_changes_call: QueryChanges<Filter<EmailFilterCondition, EmailObject>, EmailObject>,
|
||||
}
|
||||
|
||||
impl Method<EmailObject> for EmailQueryChanges {
|
||||
const NAME: &'static str = "Email/queryChanges";
|
||||
}
|
||||
|
||||
impl EmailQueryChanges {
|
||||
pub fn new(
|
||||
query_changes_call: QueryChanges<Filter<EmailFilterCondition, EmailObject>, EmailObject>,
|
||||
) -> Self {
|
||||
EmailQueryChanges { query_changes_call }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Debug)]
|
||||
pub struct EmailQueryChangesResponse {
|
||||
///o The "collapseThreads" argument that was used with "Email/query".
|
||||
#[serde(default = "bool_false")]
|
||||
pub collapse_threads: bool,
|
||||
#[serde(flatten)]
|
||||
pub query_changes_response: QueryChangesResponse<EmailObject>,
|
||||
}
|
||||
|
||||
impl std::convert::TryFrom<&RawValue> for EmailQueryChangesResponse {
|
||||
type Error = crate::error::MeliError;
|
||||
fn try_from(t: &RawValue) -> Result<EmailQueryChangesResponse> {
|
||||
let res: (String, EmailQueryChangesResponse, String) = serde_json::from_str(t.get())?;
|
||||
assert_eq!(&res.0, "Email/queryChanges");
|
||||
Ok(res.1)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ use super::mailbox::JmapMailbox;
|
|||
use super::*;
|
||||
use serde::Serialize;
|
||||
use serde_json::{json, Value};
|
||||
use std::convert::TryFrom;
|
||||
use std::convert::{TryFrom, TryInto};
|
||||
|
||||
pub type UtcDate = String;
|
||||
|
||||
|
@ -124,6 +124,12 @@ pub async fn get_mailboxes(conn: &JmapConnection) -> Result<HashMap<MailboxHash,
|
|||
unread_emails,
|
||||
unread_threads,
|
||||
} = r;
|
||||
let mut total_emails_set = LazyCountSet::default();
|
||||
total_emails_set.set_not_yet_seen(total_emails.try_into().unwrap_or(0));
|
||||
let total_emails = total_emails_set;
|
||||
let mut unread_emails_set = LazyCountSet::default();
|
||||
unread_emails_set.set_not_yet_seen(unread_emails.try_into().unwrap_or(0));
|
||||
let unread_emails = unread_emails_set;
|
||||
let hash = id.into_hash();
|
||||
let parent_hash = parent_id.clone().map(|id| id.into_hash());
|
||||
(
|
||||
|
@ -145,6 +151,8 @@ pub async fn get_mailboxes(conn: &JmapConnection) -> Result<HashMap<MailboxHash,
|
|||
total_threads,
|
||||
unread_emails: Arc::new(Mutex::new(unread_emails)),
|
||||
unread_threads,
|
||||
email_state: Arc::new(Mutex::new(None)),
|
||||
email_query_state: Arc::new(Mutex::new(None)),
|
||||
},
|
||||
)
|
||||
})
|
||||
|
@ -250,23 +258,59 @@ pub async fn fetch(
|
|||
|
||||
let mut v: MethodResponse = serde_json::from_str(&res_text).unwrap();
|
||||
let e = GetResponse::<EmailObject>::try_from(v.method_responses.pop().unwrap())?;
|
||||
let query_response = QueryResponse::<EmailObject>::try_from(v.method_responses.pop().unwrap())?;
|
||||
store
|
||||
.mailboxes
|
||||
.write()
|
||||
.unwrap()
|
||||
.entry(mailbox_hash)
|
||||
.and_modify(|mbox| {
|
||||
*mbox.email_query_state.lock().unwrap() = Some(query_response.query_state);
|
||||
});
|
||||
let GetResponse::<EmailObject> { list, state, .. } = e;
|
||||
{
|
||||
let (is_empty, is_equal) = {
|
||||
let current_state_lck = conn.store.email_state.lock().unwrap();
|
||||
(current_state_lck.is_empty(), *current_state_lck != state)
|
||||
let mailboxes_lck = conn.store.mailboxes.read().unwrap();
|
||||
mailboxes_lck
|
||||
.get(&mailbox_hash)
|
||||
.map(|mbox| {
|
||||
let current_state_lck = mbox.email_state.lock().unwrap();
|
||||
(
|
||||
current_state_lck.is_none(),
|
||||
current_state_lck.as_ref() != Some(&state),
|
||||
)
|
||||
})
|
||||
.unwrap_or((true, true))
|
||||
};
|
||||
if is_empty {
|
||||
let mut mailboxes_lck = conn.store.mailboxes.write().unwrap();
|
||||
debug!("{:?}: inserting state {}", EmailObject::NAME, &state);
|
||||
*conn.store.email_state.lock().unwrap() = state;
|
||||
mailboxes_lck.entry(mailbox_hash).and_modify(|mbox| {
|
||||
*mbox.email_state.lock().unwrap() = Some(state);
|
||||
});
|
||||
} else if !is_equal {
|
||||
conn.email_changes().await?;
|
||||
conn.email_changes(mailbox_hash).await?;
|
||||
}
|
||||
}
|
||||
let mut total = BTreeSet::default();
|
||||
let mut unread = BTreeSet::default();
|
||||
let mut ret = Vec::with_capacity(list.len());
|
||||
for obj in list {
|
||||
ret.push(store.add_envelope(obj));
|
||||
let env = store.add_envelope(obj);
|
||||
total.insert(env.hash());
|
||||
if !env.is_seen() {
|
||||
unread.insert(env.hash());
|
||||
}
|
||||
ret.push(env);
|
||||
}
|
||||
let mut mailboxes_lck = store.mailboxes.write().unwrap();
|
||||
mailboxes_lck.entry(mailbox_hash).and_modify(|mbox| {
|
||||
mbox.total_emails.lock().unwrap().insert_existing_set(total);
|
||||
mbox.unread_emails
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert_existing_set(unread);
|
||||
});
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
|
|
|
@ -1013,3 +1013,161 @@ pub struct UploadResponse {
|
|||
/// The size of the file in octets.
|
||||
pub size: usize,
|
||||
}
|
||||
|
||||
/// #`queryChanges`
|
||||
///
|
||||
/// The "Foo/queryChanges" method allows a client to efficiently update
|
||||
/// the state of a cached query to match the new state on the server. It
|
||||
/// takes the following arguments:
|
||||
#[derive(Serialize, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct QueryChanges<F: FilterTrait<OBJ>, OBJ: Object>
|
||||
where
|
||||
OBJ: std::fmt::Debug + Serialize,
|
||||
{
|
||||
pub account_id: Id<Account>,
|
||||
pub filter: Option<F>,
|
||||
pub sort: Option<Comparator<OBJ>>,
|
||||
///sinceQueryState: "String"
|
||||
///
|
||||
///The current state of the query in the client. This is the string
|
||||
///that was returned as the "queryState" argument in the "Foo/query"
|
||||
///response with the same sort/filter. The server will return the
|
||||
///changes made to the query since this state.
|
||||
pub since_query_state: String,
|
||||
///o maxChanges: "UnsignedInt|null"
|
||||
///
|
||||
///The maximum number of changes to return in the response. See
|
||||
///error descriptions below for more details.
|
||||
pub max_changes: Option<usize>,
|
||||
///o upToId: "Id|null"
|
||||
///
|
||||
///The last (highest-index) id the client currently has cached from
|
||||
///the query results. When there are a large number of results, in a
|
||||
///common case, the client may have only downloaded and cached a
|
||||
///small subset from the beginning of the results. If the sort and
|
||||
///filter are both only on immutable properties, this allows the
|
||||
///server to omit changes after this point in the results, which can
|
||||
///significantly increase efficiency. If they are not immutable,
|
||||
///this argument is ignored.
|
||||
pub up_to_id: Option<Id<OBJ>>,
|
||||
|
||||
///o calculateTotal: "Boolean" (default: false)
|
||||
///
|
||||
///Does the client wish to know the total number of results now in
|
||||
///the query? This may be slow and expensive for servers to
|
||||
///calculate, particularly with complex filters, so clients should
|
||||
///take care to only request the total when needed.
|
||||
#[serde(default = "bool_false")]
|
||||
pub calculate_total: bool,
|
||||
|
||||
#[serde(skip)]
|
||||
_ph: PhantomData<fn() -> OBJ>,
|
||||
}
|
||||
|
||||
impl<F: FilterTrait<OBJ>, OBJ: Object> QueryChanges<F, OBJ>
|
||||
where
|
||||
OBJ: std::fmt::Debug + Serialize,
|
||||
{
|
||||
pub fn new(account_id: Id<Account>, since_query_state: String) -> Self {
|
||||
Self {
|
||||
account_id,
|
||||
filter: None,
|
||||
sort: None,
|
||||
since_query_state,
|
||||
max_changes: None,
|
||||
up_to_id: None,
|
||||
calculate_total: false,
|
||||
_ph: PhantomData,
|
||||
}
|
||||
}
|
||||
_impl!(filter: Option<F>);
|
||||
_impl!(sort: Option<Comparator<OBJ>>);
|
||||
_impl!(max_changes: Option<usize>);
|
||||
_impl!(up_to_id: Option<Id<OBJ>>);
|
||||
_impl!(calculate_total: bool);
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct QueryChangesResponse<OBJ: Object> {
|
||||
/// The id of the account used for the call.
|
||||
pub account_id: Id<Account>,
|
||||
/// This is the "sinceQueryState" argument echoed back; that is, the state from which the server is returning changes.
|
||||
pub old_query_state: String,
|
||||
///This is the state the query will be in after applying the set of changes to the old state.
|
||||
pub new_query_state: String,
|
||||
/// The total number of Foos in the results (given the "filter"). This argument MUST be omitted if the "calculateTotal" request argument is not true.
|
||||
#[serde(default)]
|
||||
pub total: Option<usize>,
|
||||
///The "id" for every Foo that was in the query results in the old
|
||||
///state and that is not in the results in the new state.
|
||||
|
||||
///If the server cannot calculate this exactly, the server MAY return
|
||||
///the ids of extra Foos in addition that may have been in the old
|
||||
///results but are not in the new results.
|
||||
|
||||
///If the sort and filter are both only on immutable properties and
|
||||
///an "upToId" is supplied and exists in the results, any ids that
|
||||
///were removed but have a higher index than "upToId" SHOULD be
|
||||
///omitted.
|
||||
|
||||
///If the "filter" or "sort" includes a mutable property, the server
|
||||
///MUST include all Foos in the current results for which this
|
||||
///property may have changed. The position of these may have moved
|
||||
///in the results, so they must be reinserted by the client to ensure
|
||||
///its query cache is correct.
|
||||
pub removed: Vec<Id<OBJ>>,
|
||||
///The id and index in the query results (in the new state) for every
|
||||
///Foo that has been added to the results since the old state AND
|
||||
///every Foo in the current results that was included in the
|
||||
///"removed" array (due to a filter or sort based upon a mutable
|
||||
///property).
|
||||
|
||||
///If the sort and filter are both only on immutable properties and
|
||||
///an "upToId" is supplied and exists in the results, any ids that
|
||||
///were added but have a higher index than "upToId" SHOULD be
|
||||
///omitted.
|
||||
|
||||
///The array MUST be sorted in order of index, with the lowest index
|
||||
///first.
|
||||
|
||||
///An *AddedItem* object has the following properties:
|
||||
|
||||
///* id: "Id"
|
||||
|
||||
///* index: "UnsignedInt"
|
||||
|
||||
///The result of this is that if the client has a cached sparse array of
|
||||
///Foo ids corresponding to the results in the old state, then:
|
||||
|
||||
///fooIds = [ "id1", "id2", null, null, "id3", "id4", null, null, null ]
|
||||
|
||||
///If it *splices out* all ids in the removed array that it has in its
|
||||
///cached results, then:
|
||||
|
||||
/// removed = [ "id2", "id31", ... ];
|
||||
/// fooIds => [ "id1", null, null, "id3", "id4", null, null, null ]
|
||||
|
||||
///and *splices in* (one by one in order, starting with the lowest
|
||||
///index) all of the ids in the added array:
|
||||
|
||||
///added = [{ id: "id5", index: 0, ... }];
|
||||
///fooIds => [ "id5", "id1", null, null, "id3", "id4", null, null, null ]
|
||||
|
||||
///and *truncates* or *extends* to the new total length, then the
|
||||
///results will now be in the new state.
|
||||
|
||||
///Note: splicing in adds the item at the given index, incrementing the
|
||||
///index of all items previously at that or a higher index. Splicing
|
||||
///out is the inverse, removing the item and decrementing the index of
|
||||
///every item after it in the array.
|
||||
pub added: Vec<AddedItem<OBJ>>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct AddedItem<OBJ: Object> {
|
||||
pub id: Id<OBJ>,
|
||||
pub index: usize,
|
||||
}
|
||||
|
|
|
@ -18,9 +18,8 @@
|
|||
* You should have received a copy of the GNU General Public License
|
||||
* along with meli. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
use crate::backends::imap::LazyCountSet;
|
||||
use crate::backends::{
|
||||
BackendMailbox, Mailbox, MailboxHash, MailboxPermissions, SpecialUsageMailbox,
|
||||
BackendMailbox, LazyCountSet, Mailbox, MailboxHash, MailboxPermissions, SpecialUsageMailbox,
|
||||
};
|
||||
use crate::error::*;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
|
Loading…
Reference in New Issue