From bfc36f63de4d532d012cb37221688eb4858dc168 Mon Sep 17 00:00:00 2001 From: Manos Pitsidianakis Date: Sat, 14 Sep 2019 15:57:28 +0300 Subject: [PATCH] imap: add byte cache for Envelopes in IMAP backend --- melib/src/backends/imap.rs | 33 +++- melib/src/backends/imap/operations.rs | 260 +++++++++++++++----------- melib/src/backends/imap/watch.rs | 1 - 3 files changed, 181 insertions(+), 113 deletions(-) diff --git a/melib/src/backends/imap.rs b/melib/src/backends/imap.rs index 6e209742..2f81537a 100644 --- a/melib/src/backends/imap.rs +++ b/melib/src/backends/imap.rs @@ -53,19 +53,30 @@ use std::str::FromStr; use std::sync::{Arc, Mutex}; pub type UID = usize; +#[derive(Debug, Default)] +pub struct EnvelopeCache { + bytes: Option, + headers: Option, + body: Option, + flags: Option, +} + +type Capabilities = FnvHashSet>; #[derive(Debug)] pub struct ImapType { account_name: String, server_hostname: String, server_username: String, server_password: String, - connection: Arc>, danger_accept_invalid_certs: bool, + connection: Arc>, - capabilities: FnvHashSet>, + capabilities: Capabilities, folders: FnvHashMap, hash_index: Arc>>, uid_index: Arc>>, + + byte_cache: Arc>>, } impl MailBackend for ImapType { @@ -106,7 +117,7 @@ impl MailBackend for ImapType { .map_err(MeliError::from); exit_on_error!(&tx, examine_response); let mut exists: usize = match examine_response.unwrap() { - SelectResponse::Ok(ok) => ok.exists, + SelectResponse::Ok(ok) => ok.uidnext - 1, SelectResponse::Bad(b) => b.exists, }; { @@ -117,7 +128,7 @@ impl MailBackend for ImapType { while exists > 1 { let mut envelopes = vec![]; exit_on_error!(&tx, - conn.send_command(format!("UID FETCH {}:{} (FLAGS ENVELOPE)", std::cmp::max(exists.saturating_sub(20000), 1), exists).as_bytes()) + conn.send_command(format!("UID FETCH {}:{} (UID FLAGS ENVELOPE)", std::cmp::max(exists.saturating_sub(20000), 1), exists).as_bytes()) conn.read_response(&mut response) ); debug!( @@ -136,6 +147,9 @@ impl MailBackend for ImapType { h.write_usize(uid); h.write(folder_path.as_bytes()); env.set_hash(h.finish()); + if let Some(flags) = flags { + env.set_flags(flags); + } hash_index .lock() .unwrap() @@ -260,6 +274,7 @@ impl MailBackend for ImapType { uid, self.folders[&folder_hash].path().to_string(), self.connection.clone(), + self.byte_cache.clone(), )) } @@ -501,6 +516,7 @@ impl ImapType { hash_index: Default::default(), uid_index: Default::default(), capabilities: Default::default(), + byte_cache: Default::default(), }; let mut res = String::with_capacity(8 * 1024); @@ -555,7 +571,14 @@ impl ImapType { match io::stdin().read_line(&mut input) { Ok(_) => { conn.send_command(input.as_bytes()).unwrap(); - conn.read_response(&mut res).unwrap(); + conn.read_lines(&mut res, String::new()).unwrap(); + if input.trim() == "IDLE" { + let mut iter = ImapBlockingConnection::from(conn); + while let Some(line) = iter.next() { + debug!("out: {}", unsafe { std::str::from_utf8_unchecked(&line) }); + } + conn = iter.into_conn(); + } debug!("out: {}", &res); if input.trim().eq_ignore_ascii_case("logout") { break; diff --git a/melib/src/backends/imap/operations.rs b/melib/src/backends/imap/operations.rs index 266bb14d..ade4b70c 100644 --- a/melib/src/backends/imap/operations.rs +++ b/melib/src/backends/imap/operations.rs @@ -37,10 +37,16 @@ pub struct ImapOp { folder_path: String, flags: Cell>, connection: Arc>, + byte_cache: Arc>>, } impl ImapOp { - pub fn new(uid: usize, folder_path: String, connection: Arc>) -> Self { + pub fn new( + uid: usize, + folder_path: String, + connection: Arc>, + byte_cache: Arc>>, + ) -> Self { ImapOp { uid, connection, @@ -49,6 +55,7 @@ impl ImapOp { body: None, folder_path, flags: Cell::new(None), + byte_cache, } } } @@ -60,40 +67,49 @@ impl BackendOp for ImapOp { fn as_bytes(&mut self) -> Result<&[u8]> { if self.bytes.is_none() { - let mut response = String::with_capacity(8 * 1024); - { - let mut conn = self.connection.lock().unwrap(); - conn.send_command(format!("SELECT {}", self.folder_path).as_bytes())?; - conn.read_response(&mut response)?; - conn.send_command(format!("UID FETCH {} (FLAGS RFC822)", self.uid).as_bytes())?; - conn.read_response(&mut response)?; - } - debug!( - "fetch response is {} bytes and {} lines", - response.len(), - response.lines().collect::>().len() - ); - match protocol_parser::uid_fetch_response(response.as_bytes()) - .to_full_result() - .map_err(MeliError::from) - { - Ok(v) => { - if v.len() != 1 { - debug!("responses len is {}", v.len()); - /* TODO: Trigger cache invalidation here. */ - return Err(MeliError::new(format!( - "message with UID {} was not found", - self.uid - ))); - } - let (uid, flags, b) = v[0]; - assert_eq!(uid, self.uid); - if flags.is_some() { - self.flags.set(flags); - } - self.bytes = Some(unsafe { std::str::from_utf8_unchecked(b).to_string() }); + let mut bytes_cache = self.byte_cache.lock()?; + let cache = bytes_cache.entry(self.uid).or_default(); + if cache.bytes.is_some() { + self.bytes = cache.bytes.clone(); + } else { + let mut response = String::with_capacity(8 * 1024); + { + let mut conn = self.connection.lock().unwrap(); + conn.send_command(format!("SELECT {}", self.folder_path).as_bytes())?; + conn.read_response(&mut response)?; + conn.send_command(format!("UID FETCH {} (FLAGS RFC822)", self.uid).as_bytes())?; + conn.read_response(&mut response)?; } - Err(e) => return Err(e), + debug!( + "fetch response is {} bytes and {} lines", + response.len(), + response.lines().collect::>().len() + ); + match protocol_parser::uid_fetch_response(response.as_bytes()) + .to_full_result() + .map_err(MeliError::from) + { + Ok(v) => { + if v.len() != 1 { + debug!("responses len is {}", v.len()); + /* TODO: Trigger cache invalidation here. */ + return Err(MeliError::new(format!( + "message with UID {} was not found", + self.uid + ))); + } + let (uid, flags, b) = v[0]; + assert_eq!(uid, self.uid); + if flags.is_some() { + self.flags.set(flags); + cache.flags = flags; + } + cache.bytes = Some(unsafe { std::str::from_utf8_unchecked(b).to_string() }); + } + Err(e) => return Err(e), + } + + self.bytes = cache.bytes.clone(); } } Ok(self.bytes.as_ref().unwrap().as_bytes()) @@ -106,39 +122,50 @@ impl BackendOp for ImapOp { return Ok(result); } if self.headers.is_none() { - let mut response = String::with_capacity(8 * 1024); - let mut conn = self.connection.lock().unwrap(); - conn.send_command(format!("UID FETCH {} (FLAGS RFC822.HEADER)", self.uid).as_bytes())?; - conn.read_response(&mut response)?; - debug!( - "fetch response is {} bytes and {} lines", - response.len(), - response.lines().collect::>().len() - ); - match protocol_parser::uid_fetch_response(response.as_bytes()) - .to_full_result() - .map_err(MeliError::from) - { - Ok(v) => { - if v.len() != 1 { - debug!("responses len is {}", v.len()); - /* TODO: Trigger cache invalidation here. */ - return Err(MeliError::new(format!( - "message with UID {} was not found", - self.uid - ))); + let mut bytes_cache = self.byte_cache.lock()?; + let cache = bytes_cache.entry(self.uid).or_default(); + if cache.headers.is_some() { + self.headers = cache.headers.clone(); + } else { + let mut response = String::with_capacity(8 * 1024); + let mut conn = self.connection.lock().unwrap(); + conn.send_command( + format!("UID FETCH {} (FLAGS RFC822.HEADER)", self.uid).as_bytes(), + )?; + conn.read_response(&mut response)?; + debug!( + "fetch response is {} bytes and {} lines", + response.len(), + response.lines().collect::>().len() + ); + match protocol_parser::uid_fetch_response(response.as_bytes()) + .to_full_result() + .map_err(MeliError::from) + { + Ok(v) => { + if v.len() != 1 { + debug!("responses len is {}", v.len()); + /* TODO: Trigger cache invalidation here. */ + return Err(MeliError::new(format!( + "message with UID {} was not found", + self.uid + ))); + } + let (uid, flags, b) = v[0]; + assert_eq!(uid, self.uid); + if flags.is_some() { + self.flags.set(flags); + cache.flags = flags; + } + cache.headers = + Some(unsafe { std::str::from_utf8_unchecked(b).to_string() }); } - let (uid, flags, b) = v[0]; - assert_eq!(uid, self.uid); - if flags.is_some() { - self.flags.set(flags); - } - self.body = Some(unsafe { std::str::from_utf8_unchecked(b).to_string() }); + Err(e) => return Err(e), } - Err(e) => return Err(e), + self.headers = cache.headers.clone(); } } - Ok(self.body.as_ref().unwrap().as_bytes()) + Ok(self.headers.as_ref().unwrap().as_bytes()) } fn fetch_body(&mut self) -> Result<&[u8]> { @@ -148,10 +175,64 @@ impl BackendOp for ImapOp { return Ok(result); } if self.body.is_none() { + let mut bytes_cache = self.byte_cache.lock()?; + let cache = bytes_cache.entry(self.uid).or_default(); + if cache.body.is_some() { + self.body = cache.body.clone(); + } else { + let mut response = String::with_capacity(8 * 1024); + let mut conn = self.connection.lock().unwrap(); + conn.send_command( + format!("UID FETCH {} (FLAGS RFC822.TEXT)", self.uid).as_bytes(), + )?; + conn.read_response(&mut response)?; + debug!( + "fetch response is {} bytes and {} lines", + response.len(), + response.lines().collect::>().len() + ); + match protocol_parser::uid_fetch_response(response.as_bytes()) + .to_full_result() + .map_err(MeliError::from) + { + Ok(v) => { + if v.len() != 1 { + debug!("responses len is {}", v.len()); + /* TODO: Trigger cache invalidation here. */ + return Err(MeliError::new(format!( + "message with UID {} was not found", + self.uid + ))); + } + let (uid, flags, b) = v[0]; + assert_eq!(uid, self.uid); + if flags.is_some() { + self.flags.set(flags); + } + cache.body = Some(unsafe { std::str::from_utf8_unchecked(b).to_string() }); + } + Err(e) => return Err(e), + } + self.body = cache.body.clone(); + } + } + Ok(self.body.as_ref().unwrap().as_bytes()) + } + + fn fetch_flags(&self) -> Flag { + if self.flags.get().is_some() { + return self.flags.get().unwrap(); + } + let mut bytes_cache = self.byte_cache.lock().unwrap(); + let cache = bytes_cache.entry(self.uid).or_default(); + if cache.flags.is_some() { + self.flags.set(cache.flags); + } else { let mut response = String::with_capacity(8 * 1024); let mut conn = self.connection.lock().unwrap(); - conn.send_command(format!("UID FETCH {} (FLAGS RFC822.TEXT)", self.uid).as_bytes())?; - conn.read_response(&mut response)?; + conn.send_command(format!("UID FETCH {} FLAGS", self.uid).as_bytes()) + .unwrap(); + conn.read_response(&mut response).unwrap(); debug!( "fetch response is {} bytes and {} lines", response.len(), @@ -165,56 +246,18 @@ impl BackendOp for ImapOp { if v.len() != 1 { debug!("responses len is {}", v.len()); /* TODO: Trigger cache invalidation here. */ - return Err(MeliError::new(format!( - "message with UID {} was not found", - self.uid - ))); + panic!(format!("message with UID {} was not found", self.uid)); } - let (uid, flags, b) = v[0]; + let (uid, flags, _) = v[0]; assert_eq!(uid, self.uid); if flags.is_some() { + cache.flags = flags; self.flags.set(flags); } - self.body = Some(unsafe { std::str::from_utf8_unchecked(b).to_string() }); } - Err(e) => return Err(e), + Err(e) => Err(e).unwrap(), } } - Ok(self.body.as_ref().unwrap().as_bytes()) - } - - fn fetch_flags(&self) -> Flag { - if self.flags.get().is_some() { - return self.flags.get().unwrap(); - } - let mut response = String::with_capacity(8 * 1024); - let mut conn = self.connection.lock().unwrap(); - conn.send_command(format!("UID FETCH {} FLAGS", self.uid).as_bytes()) - .unwrap(); - conn.read_response(&mut response).unwrap(); - debug!( - "fetch response is {} bytes and {} lines", - response.len(), - response.lines().collect::>().len() - ); - match protocol_parser::uid_fetch_response(response.as_bytes()) - .to_full_result() - .map_err(MeliError::from) - { - Ok(v) => { - if v.len() != 1 { - debug!("responses len is {}", v.len()); - /* TODO: Trigger cache invalidation here. */ - panic!(format!("message with UID {} was not found", self.uid)); - } - let (uid, flags, _) = v[0]; - assert_eq!(uid, self.uid); - if flags.is_some() { - self.flags.set(flags); - } - } - Err(e) => Err(e).unwrap(), - } self.flags.get().unwrap() } @@ -252,6 +295,9 @@ impl BackendOp for ImapOp { } conn.send_command(format!("EXAMINE \"{}\"", &self.folder_path,).as_bytes())?; conn.read_response(&mut response)?; + let mut bytes_cache = self.byte_cache.lock()?; + let cache = bytes_cache.entry(self.uid).or_default(); + cache.flags = Some(flag); Ok(()) } } diff --git a/melib/src/backends/imap/watch.rs b/melib/src/backends/imap/watch.rs index 29448c7b..f3dfab78 100644 --- a/melib/src/backends/imap/watch.rs +++ b/melib/src/backends/imap/watch.rs @@ -113,7 +113,6 @@ pub fn idle(kit: ImapWatchKit) { folder_hash, work_context, thread_id, - conn.read_response(&mut response) conn.send_command(format!("SELECT {}", folder.path()).as_bytes()) conn.read_response(&mut response) );