From b2c14abd6e77a8ebfc3ee644ed35f77d1fb40cd0 Mon Sep 17 00:00:00 2001 From: Manos Pitsidianakis Date: Sun, 9 Aug 2020 09:47:01 +0300 Subject: [PATCH] melib/jmap: add {flag,tag} set support Closes #61 --- melib/src/backends/jmap.rs | 177 ++++++++++++++- melib/src/backends/jmap/connection.rs | 15 ++ melib/src/backends/jmap/objects/email.rs | 17 ++ melib/src/backends/jmap/protocol.rs | 24 ++ melib/src/backends/jmap/rfc8620.rs | 273 +++++++++++++++++++++++ 5 files changed, 504 insertions(+), 2 deletions(-) diff --git a/melib/src/backends/jmap.rs b/melib/src/backends/jmap.rs index c2b93061..793255ea 100644 --- a/melib/src/backends/jmap.rs +++ b/melib/src/backends/jmap.rs @@ -27,6 +27,7 @@ use crate::error::{MeliError, Result}; use futures::lock::Mutex as FutureMutex; use isahc::prelude::HttpClient; use isahc::ResponseExt; +use serde_json::Value; use std::collections::{BTreeMap, HashMap}; use std::convert::TryFrom; use std::str::FromStr; @@ -183,6 +184,7 @@ pub struct Store { #[derive(Debug)] pub struct JmapType { account_name: String, + account_hash: AccountHash, online: Arc)>>, is_subscribed: Arc, server_conf: JmapServerConf, @@ -240,8 +242,12 @@ impl MailBackend for JmapType { })) } - fn watch_async(&self, _sender: RefreshEventConsumer) -> ResultFuture<()> { - Err(MeliError::from("JMAP watch for updates is unimplemented")) + fn watch_async(&self, sender: RefreshEventConsumer) -> ResultFuture<()> { + let conn = self.connection.clone(); + Ok(Box::pin(async move { + *conn.lock().await.sender.lock().unwrap() = Some(sender); + Err(MeliError::from("JMAP watch for updates is unimplemented")) + })) } fn mailboxes_async(&self) -> ResultFuture> { @@ -364,6 +370,164 @@ impl MailBackend for JmapType { fn mailboxes(&self) -> Result> { Err(MeliError::new("Unimplemented.")) } + + fn rename_mailbox( + &mut self, + _mailbox_hash: MailboxHash, + _new_path: String, + ) -> ResultFuture { + Err(MeliError::new("Unimplemented.")) + } + + fn create_mailbox( + &mut self, + _path: String, + ) -> ResultFuture<(MailboxHash, HashMap)> { + Err(MeliError::new("Unimplemented.")) + } + + fn copy_messages( + &mut self, + _env_hashes: EnvelopeHashBatch, + _source_mailbox_hash: MailboxHash, + _destination_mailbox_hash: MailboxHash, + _move_: bool, + _destination_flags: Option, + ) -> ResultFuture<()> { + Err(MeliError::new("Unimplemented.")) + } + + fn set_flags( + &mut self, + env_hashes: EnvelopeHashBatch, + mailbox_hash: MailboxHash, + flags: SmallVec<[(std::result::Result, bool); 8]>, + ) -> ResultFuture<()> { + let mailboxes = self.mailboxes.clone(); + let store = self.store.clone(); + let account_hash = self.account_hash; + let tag_index = self.tag_index.clone(); + let connection = self.connection.clone(); + Ok(Box::pin(async move { + let mailbox_id = mailboxes.read().unwrap()[&mailbox_hash].id.clone(); + let mut update_map: HashMap = HashMap::default(); + let mut ids: Vec = Vec::with_capacity(env_hashes.rest.len() + 1); + let mut id_map: HashMap = HashMap::default(); + let mut update_keywords: HashMap = HashMap::default(); + for (flag, value) in flags.iter() { + match flag { + Ok(f) => { + update_keywords.insert( + format!( + "keywords/{}", + match *f { + Flag::DRAFT => "$draft", + Flag::FLAGGED => "$flagged", + Flag::SEEN => "$seen", + Flag::REPLIED => "$answered", + Flag::TRASHED => "$junk", + Flag::PASSED => "$passed", + _ => continue, //FIXME + } + ), + if *value { + serde_json::json!(true) + } else { + serde_json::json!(null) + }, + ); + } + Err(t) => { + update_keywords.insert( + format!("keywords/{}", t), + if *value { + serde_json::json!(true) + } else { + serde_json::json!(null) + }, + ); + } + } + } + { + let store_lck = store.read().unwrap(); + for hash in env_hashes.iter() { + if let Some(id) = store_lck.id_store.get(&hash) { + ids.push(id.clone()); + id_map.insert(id.clone(), hash); + update_map.insert(id.clone(), serde_json::json!(update_keywords.clone())); + } + } + } + let conn = connection.lock().await; + + let email_set_call: EmailSet = EmailSet::new( + Set::::new() + .account_id(conn.mail_account_id().to_string()) + .update(Some(update_map)), + ); + + let mut req = Request::new(conn.request_no.clone()); + let prev_seq = req.add_call(&email_set_call); + let email_call: EmailGet = EmailGet::new( + Get::new() + .ids(Some(JmapArgument::Value(ids))) + .account_id(conn.mail_account_id().to_string()) + .properties(Some(vec!["keywords".to_string()])), + ); + + req.add_call(&email_call); + //debug!(serde_json::to_string(&req)?); + let mut res = conn + .client + .post_async(&conn.session.api_url, serde_json::to_string(&req)?) + .await?; + + let res_text = res.text_async().await?; + /* + *{"methodResponses":[["Email/set",{"notUpdated":null,"notDestroyed":null,"oldState":"86","newState":"87","accountId":"u148940c7","updated":{"M045926eed54b11423918f392":{"id":"M045926eed54b11423918f392"}},"created":null,"destroyed":null,"notCreated":null},"m3"]],"sessionState":"cyrus-0;p-5;vfs-0"} + */ + //debug!("res_text = {}", &res_text); + let mut v: MethodResponse = serde_json::from_str(&res_text).unwrap(); + *conn.online_status.lock().await = (std::time::Instant::now(), Ok(())); + let m = SetResponse::::try_from(v.method_responses.remove(0))?; + if let Some(ids) = m.not_updated { + return Err(MeliError::new( + ids.into_iter() + .map(|err| err.to_string()) + .collect::>() + .join(","), + )); + } + + let mut tag_index_lck = tag_index.write().unwrap(); + for (flag, value) in flags.iter() { + match flag { + Ok(f) => {} + Err(t) => { + if *value { + tag_index_lck.insert(tag_hash!(t), t.clone()); + } + } + } + } + let e = GetResponse::::try_from(v.method_responses.pop().unwrap())?; + let GetResponse:: { list, state, .. } = e; + //debug!(&list); + for envobj in list { + let env_hash = id_map[&envobj.id]; + conn.add_refresh_event(RefreshEvent { + account_hash, + mailbox_hash, + kind: RefreshEventKind::NewFlags( + env_hash, + protocol::keywords_to_flags(envobj.keywords().keys().cloned().collect()), + ), + }); + } + Ok(()) + })) + } } impl JmapType { @@ -377,6 +541,14 @@ impl JmapType { ))); let server_conf = JmapServerConf::new(s)?; + let account_hash = { + use std::collections::hash_map::DefaultHasher; + use std::hash::Hasher; + let mut hasher = DefaultHasher::new(); + hasher.write(s.name.as_bytes()); + hasher.finish() + }; + Ok(Box::new(JmapType { connection: Arc::new(FutureMutex::new(JmapConnection::new( &server_conf, @@ -386,6 +558,7 @@ impl JmapType { tag_index: Arc::new(RwLock::new(Default::default())), mailboxes: Arc::new(RwLock::new(HashMap::default())), account_name: s.name.clone(), + account_hash, online, is_subscribed: Arc::new(IsSubscribedFn(is_subscribed)), server_conf, diff --git a/melib/src/backends/jmap/connection.rs b/melib/src/backends/jmap/connection.rs index f3e3ab38..4c17e93b 100644 --- a/melib/src/backends/jmap/connection.rs +++ b/melib/src/backends/jmap/connection.rs @@ -31,6 +31,8 @@ pub struct JmapConnection { pub server_conf: JmapServerConf, pub account_id: Arc>, pub method_call_states: Arc>>, + pub refresh_events: Arc>>, + pub sender: Arc>>, } impl JmapConnection { @@ -55,6 +57,8 @@ impl JmapConnection { server_conf, account_id: Arc::new(Mutex::new(String::new())), method_call_states: Arc::new(Mutex::new(Default::default())), + refresh_events: Arc::new(Mutex::new(Default::default())), + sender: Arc::new(Mutex::new(Default::default())), }) } @@ -110,4 +114,15 @@ impl JmapConnection { pub fn mail_account_id(&self) -> &Id { &self.session.primary_accounts["urn:ietf:params:jmap:mail"] } + + pub fn add_refresh_event(&self, event: RefreshEvent) { + if let Some(ref sender) = self.sender.lock().unwrap().as_ref() { + for event in self.refresh_events.lock().unwrap().drain(..) { + sender.send(event); + } + sender.send(event); + } else { + self.refresh_events.lock().unwrap().push(event); + } + } } diff --git a/melib/src/backends/jmap/objects/email.rs b/melib/src/backends/jmap/objects/email.rs index 49f1350a..f5f529cc 100644 --- a/melib/src/backends/jmap/objects/email.rs +++ b/melib/src/backends/jmap/objects/email.rs @@ -752,3 +752,20 @@ fn test_jmap_query() { ); assert_eq!(*request_no.lock().unwrap(), 1); } + +#[derive(Serialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct EmailSet { + #[serde(flatten)] + pub set_call: Set, +} + +impl Method for EmailSet { + const NAME: &'static str = "Email/set"; +} + +impl EmailSet { + pub fn new(set_call: Set) -> Self { + EmailSet { set_call } + } +} diff --git a/melib/src/backends/jmap/protocol.rs b/melib/src/backends/jmap/protocol.rs index be982c0d..87e33b04 100644 --- a/melib/src/backends/jmap/protocol.rs +++ b/melib/src/backends/jmap/protocol.rs @@ -338,3 +338,27 @@ pub async fn fetch( } Ok(ret) } + +pub fn keywords_to_flags(keywords: Vec) -> (Flag, Vec) { + let mut f = Flag::default(); + let mut tags = vec![]; + for k in keywords { + match k.as_str() { + "$draft" => { + f |= Flag::DRAFT; + } + "$seen" => { + f |= Flag::SEEN; + } + "$flagged" => { + f |= Flag::FLAGGED; + } + "$answered" => { + f |= Flag::REPLIED; + } + "$junk" | "$notjunk" => { /* ignore */ } + _ => tags.push(k), + } + } + (f, tags) +} diff --git a/melib/src/backends/jmap/rfc8620.rs b/melib/src/backends/jmap/rfc8620.rs index 787dcb2d..d75185be 100644 --- a/melib/src/backends/jmap/rfc8620.rs +++ b/melib/src/backends/jmap/rfc8620.rs @@ -357,6 +357,15 @@ pub struct ResultField, OBJ: Object> { pub _ph: PhantomData (OBJ, M)>, } +impl, OBJ: Object> ResultField { + pub fn new(field: &'static str) -> Self { + ResultField { + field, + _ph: PhantomData, + } + } +} + // error[E0723]: trait bounds other than `Sized` on const fn parameters are unstable // --> melib/src/backends/jmap/rfc8620.rs:626:6 // | @@ -485,6 +494,270 @@ impl ChangesResponse { _impl!(get_mut destroyed_mut, destroyed: Vec); } +///#`set` +/// +///Modifying the state of Foo objects on the server is done via the +///"Foo/set" method. This encompasses creating, updating, and +///destroying Foo records. This allows the server to sort out ordering +///and dependencies that may exist if doing multiple operations at once +///(for example, to ensure there is always a minimum number of a certain +///record type). +#[derive(Deserialize, Serialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct Set +where + OBJ: std::fmt::Debug + Serialize, +{ + #[serde(skip_serializing_if = "String::is_empty")] + ///o accountId: "Id" + /// + /// The id of the account to use. + pub account_id: String, + ///o ifInState: "String|null" + /// + /// This is a state string as returned by the "Foo/get" method + /// (representing the state of all objects of this type in the + /// account). If supplied, the string must match the current state; + /// otherwise, the method will be aborted and a "stateMismatch" error + /// returned. If null, any changes will be applied to the current + /// state. + pub if_in_state: Option, + ///o create: "Id[Foo]|null" + /// + /// A map of a *creation id* (a temporary id set by the client) to Foo + /// objects, or null if no objects are to be created. + /// + /// The Foo object type definition may define default values for + /// properties. Any such property may be omitted by the client. + /// + /// The client MUST omit any properties that may only be set by the + /// server (for example, the "id" property on most object types). + /// + pub create: Option>, + ///o update: "Id[PatchObject]|null" + /// + /// A map of an id to a Patch object to apply to the current Foo + /// object with that id, or null if no objects are to be updated. + /// + /// A *PatchObject* is of type "String[*]" and represents an unordered + /// set of patches. The keys are a path in JSON Pointer format + /// [RFC6901], with an implicit leading "/" (i.e., prefix each key + /// with "/" before applying the JSON Pointer evaluation algorithm). + /// + /// All paths MUST also conform to the following restrictions; if + /// there is any violation, the update MUST be rejected with an + /// "invalidPatch" error: + /// * The pointer MUST NOT reference inside an array (i.e., you MUST + /// NOT insert/delete from an array; the array MUST be replaced in + /// its entirety instead). + /// + /// * All parts prior to the last (i.e., the value after the final + /// slash) MUST already exist on the object being patched. + /// + /// * There MUST NOT be two patches in the PatchObject where the + /// pointer of one is the prefix of the pointer of the other, e.g., + /// "alerts/1/offset" and "alerts". + /// + /// The value associated with each pointer determines how to apply + /// that patch: + /// + /// * If null, set to the default value if specified for this + /// property; otherwise, remove the property from the patched + /// object. If the key is not present in the parent, this a no-op. + /// + /// * Anything else: The value to set for this property (this may be + /// a replacement or addition to the object being patched). + /// + /// Any server-set properties MAY be included in the patch if their + /// value is identical to the current server value (before applying + /// the patches to the object). Otherwise, the update MUST be + /// rejected with an "invalidProperties" SetError. + /// + /// This patch definition is designed such that an entire Foo object + /// is also a valid PatchObject. The client may choose to optimise + /// network usage by just sending the diff or may send the whole + /// object; the server processes it the same either way. + pub update: Option>, + ///o destroy: "Id[]|null" + /// + /// A list of ids for Foo objects to permanently delete, or null if no + /// objects are to be destroyed. + pub destroy: Option>, +} + +impl Set +where + OBJ: std::fmt::Debug + Serialize, +{ + pub fn new() -> Self { + Self { + account_id: String::new(), + if_in_state: None, + create: None, + update: None, + destroy: None, + } + } + _impl!(account_id: String); + _impl!( + ///o ifInState: "String|null" + /// + /// This is a state string as returned by the "Foo/get" method + /// (representing the state of all objects of this type in the + /// account). If supplied, the string must match the current state; + /// otherwise, the method will be aborted and a "stateMismatch" error + /// returned. If null, any changes will be applied to the current + /// state. + if_in_state: Option + ); + _impl!(update: Option>); +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct SetResponse { + ///o accountId: "Id" + /// + /// The id of the account used for the call. + pub account_id: String, + ///o oldState: "String|null" + /// + /// The state string that would have been returned by "Foo/get" before + /// making the requested changes, or null if the server doesn't know + /// what the previous state string was. + pub old_state: String, + ///o newState: "String" + /// + /// The state string that will now be returned by "Foo/get". + pub new_state: String, + ///o created: "Id[Foo]|null" + /// + /// A map of the creation id to an object containing any properties of + /// the created Foo object that were not sent by the client. This + /// includes all server-set properties (such as the "id" in most + /// object types) and any properties that were omitted by the client + /// and thus set to a default by the server. + /// + /// This argument is null if no Foo objects were successfully created. + pub created: Option>, + ///o updated: "Id[Foo|null]|null" + /// + /// The keys in this map are the ids of all Foos that were + /// successfully updated. + /// + /// The value for each id is a Foo object containing any property that + /// changed in a way *not* explicitly requested by the PatchObject + /// sent to the server, or null if none. This lets the client know of + /// any changes to server-set or computed properties. + /// + /// This argument is null if no Foo objects were successfully updated. + pub updated: Option>>, + ///o destroyed: "Id[]|null" + /// + /// A list of Foo ids for records that were successfully destroyed, or + /// null if none. + pub destroyed: Option>, + ///o notCreated: "Id[SetError]|null" + /// + /// A map of the creation id to a SetError object for each record that + /// failed to be created, or null if all successful. + pub not_created: Option>, + ///o notUpdated: "Id[SetError]|null" + /// + /// A map of the Foo id to a SetError object for each record that + /// failed to be updated, or null if all successful. + pub not_updated: Option>, + ///o notDestroyed: "Id[SetError]|null" + /// + /// A map of the Foo id to a SetError object for each record that + /// failed to be destroyed, or null if all successful.// + pub not_destroyed: Option>, +} + +impl std::convert::TryFrom<&RawValue> for SetResponse { + type Error = crate::error::MeliError; + fn try_from(t: &RawValue) -> Result, crate::error::MeliError> { + let res: (String, SetResponse, String) = serde_json::from_str(t.get())?; + assert_eq!(&res.0, &format!("{}/set", OBJ::NAME)); + Ok(res.1) + } +} + +#[derive(Deserialize, Serialize, Debug)] +#[serde(rename_all = "camelCase")] +#[serde(tag = "type", content = "description")] +pub enum SetError { + ///(create; update; destroy). The create/update/destroy would violate an ACL or other permissions policy. + Forbidden(Option), + ///(create; update). The create would exceed a server- defined limit on the number or total size of objects of this type. + OverQuota(Option), + + ///(create; update). The create/update would result in an object that exceeds a server-defined limit for the maximum size of a single object of this type. + TooLarge(Option), + + ///(create). Too many objects of this type have been created recently, and a server-defined rate limit has been reached. It may work if tried again later. + RateLimit(Option), + + ///(update; destroy). The id given to update/destroy cannot be found. + NotFound(Option), + + ///(update). The PatchObject given to update the record was not a valid patch (see the patch description). + InvalidPatch(Option), + + ///(update). The client requested that an object be both updated and destroyed in the same /set request, and the server has decided to therefore ignore the update. + WillDestroy(Option), + ///(create; update). The record given is invalid in some way. + InvalidProperties { + description: Option, + properties: Vec, + }, + ///(create; destroy). This is a singleton type, so you cannot create another one or destroy the existing one. + Singleton(Option), + RequestTooLarge(Option), + StateMismatch(Option), +} + +impl core::fmt::Display for SetError { + fn fmt(&self, fmt: &mut core::fmt::Formatter) -> core::fmt::Result { + use SetError::*; + match self { + Forbidden(Some(description)) => write!(fmt, "Forbidden: {}", description), + Forbidden(None) => write!(fmt, "Forbidden"), + OverQuota(Some(description)) => write!(fmt, "OverQuota: {}", description), + OverQuota(None) => write!(fmt, "OverQuota"), + TooLarge(Some(description)) => write!(fmt, "TooLarge: {}", description), + TooLarge(None) => write!(fmt, "TooLarge"), + RateLimit(Some(description)) => write!(fmt, "RateLimit: {}", description), + RateLimit(None) => write!(fmt, "RateLimit"), + NotFound(Some(description)) => write!(fmt, "NotFound: {}", description), + NotFound(None) => write!(fmt, "NotFound"), + InvalidPatch(Some(description)) => write!(fmt, "InvalidPatch: {}", description), + InvalidPatch(None) => write!(fmt, "InvalidPatch"), + WillDestroy(Some(description)) => write!(fmt, "WillDestroy: {}", description), + WillDestroy(None) => write!(fmt, "WillDestroy"), + InvalidProperties { + description: Some(description), + properties, + } => write!( + fmt, + "InvalidProperties: {}, {}", + description, + properties.join(",") + ), + InvalidProperties { + description: None, + properties, + } => write!(fmt, "InvalidProperties: {}", properties.join(",")), + Singleton(Some(description)) => write!(fmt, "Singleton: {}", description), + Singleton(None) => write!(fmt, "Singleton"), + RequestTooLarge(Some(description)) => write!(fmt, "RequestTooLarge: {}", description), + RequestTooLarge(None) => write!(fmt, "RequestTooLarge"), + StateMismatch(Some(description)) => write!(fmt, "StateMismatch: {}", description), + StateMismatch(None) => write!(fmt, "StateMismatch"), + } + } +} + pub fn download_request_format( session: &JmapSession, account_id: &Id,