imap_async: add operations

async
Manos Pitsidianakis 2020-06-29 00:16:07 +03:00
parent c82367e00d
commit 42419327f8
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
2 changed files with 154 additions and 150 deletions

View File

@ -26,8 +26,8 @@ mod protocol_parser;
pub use protocol_parser::{UntaggedResponse::*, *};
mod mailbox;
pub use mailbox::*;
//mod operations;
//pub use operations::*;
mod operations;
pub use operations::*;
mod connection;
pub use connection::*;
mod watch;
@ -195,7 +195,7 @@ impl MailBackend for ImapType {
loop {
let res = get_hlpr(&connection, mailbox_hash,&cached_hash_set, &can_create_flags, &mut our_unseen, &mut valid_hash_set, &uid_store, &mut max_uid).await?;
yield res;
if max_uid == Some(1) {
if max_uid == Some(1) || max_uid == Some(0) {
return;
}
@ -789,8 +789,6 @@ impl MailBackend for ImapType {
}
fn operation(&self, hash: EnvelopeHash) -> Result<Box<dyn BackendOp>> {
unimplemented!()
/*
let (uid, mailbox_hash) = if let Some(v) =
self.uid_store.hash_index.lock().unwrap().get(&hash)
{
@ -809,7 +807,6 @@ impl MailBackend for ImapType {
self.connection.clone(),
self.uid_store.clone(),
)))
*/
}
fn save(&self, bytes: &[u8], mailbox_hash: MailboxHash, flags: Option<Flag>) -> Result<()> {
@ -1639,15 +1636,13 @@ async fn get_hlpr(
}
if examine_response.exists == 0 {
if uid_store.cache_headers {
/*
for &env_hash in &cached_hash_set {
for &env_hash in cached_hash_set {
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: RefreshEventKind::Remove(env_hash),
});
}
*/
let _ = cache::save_envelopes(
uid_store.account_hash,
mailbox_hash,
@ -1713,7 +1708,6 @@ async fn get_hlpr(
mailbox_path
)
})?;
drop(conn);
debug!(
"fetch response is {} bytes and {} lines",
response.len(),
@ -1778,11 +1772,10 @@ async fn get_hlpr(
debug!("sending payload for {}", mailbox_hash);
if uid_store.cache_headers {
//FIXME
/*
cache::save_envelopes(
uid_store.account_hash,
mailbox_hash,
examine_response.uidvalidity,
uid_store.uidvalidity.lock().unwrap()[&mailbox_hash],
&envelopes
.iter()
.map(|(uid, env)| (*uid, env))
@ -1794,9 +1787,7 @@ async fn get_hlpr(
mailbox_path
)
})?;
*/
}
/*
for &env_hash in cached_hash_set.difference(&valid_hash_set) {
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
@ -1804,7 +1795,7 @@ async fn get_hlpr(
kind: RefreshEventKind::Remove(env_hash),
});
}
*/
drop(conn);
unseen
.lock()
.unwrap()

View File

@ -21,7 +21,7 @@
use super::*;
use crate::backends::BackendOp;
use crate::backends::*;
use crate::email::*;
use crate::error::{MeliError, Result};
use std::cell::Cell;
@ -37,7 +37,7 @@ pub struct ImapOp {
mailbox_path: String,
mailbox_hash: MailboxHash,
flags: Cell<Option<Flag>>,
connection: Arc<Mutex<ImapConnection>>,
connection: Arc<FutureMutex<ImapConnection>>,
uid_store: Arc<UIDStore>,
}
@ -46,7 +46,7 @@ impl ImapOp {
uid: usize,
mailbox_path: String,
mailbox_hash: MailboxHash,
connection: Arc<Mutex<ImapConnection>>,
connection: Arc<FutureMutex<ImapConnection>>,
uid_store: Arc<UIDStore>,
) -> Self {
ImapOp {
@ -77,164 +77,177 @@ impl BackendOp for ImapOp {
} else {
drop(cache);
drop(bytes_cache);
let mut response = String::with_capacity(8 * 1024);
{
let mut conn =
try_lock(&self.connection, Some(std::time::Duration::new(2, 0)))?;
conn.examine_mailbox(self.mailbox_hash, &mut response)?;
conn.send_command(format!("UID FETCH {} (FLAGS RFC822)", self.uid).as_bytes())?;
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)?;
}
debug!(
"fetch response is {} bytes and {} lines",
response.len(),
response.lines().collect::<Vec<&str>>().len()
);
let UidFetchResponse {
uid, flags, body, ..
} = protocol_parser::uid_fetch_response(&response)?.1;
assert_eq!(uid, self.uid);
assert!(body.is_some());
let mut bytes_cache = self.uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(self.uid).or_default();
if let Some((flags, _)) = flags {
self.flags.set(Some(flags));
cache.flags = Some(flags);
}
cache.bytes =
Some(unsafe { std::str::from_utf8_unchecked(body.unwrap()).to_string() });
self.bytes = cache.bytes.clone();
let ret: Result<()> = futures::executor::block_on(async {
let mut response = String::with_capacity(8 * 1024);
{
let mut conn = self.connection.lock().await;
conn.examine_mailbox(self.mailbox_hash, &mut response)
.await?;
conn.send_command(
format!("UID FETCH {} (FLAGS RFC822)", self.uid).as_bytes(),
)
.await?;
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await?;
}
debug!(
"fetch response is {} bytes and {} lines",
response.len(),
response.lines().collect::<Vec<&str>>().len()
);
let UidFetchResponse {
uid, flags, body, ..
} = protocol_parser::uid_fetch_response(&response)?.1;
assert_eq!(uid, self.uid);
assert!(body.is_some());
let mut bytes_cache = self.uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(self.uid).or_default();
if let Some((flags, _)) = flags {
self.flags.set(Some(flags));
cache.flags = Some(flags);
}
cache.bytes =
Some(unsafe { std::str::from_utf8_unchecked(body.unwrap()).to_string() });
self.bytes = cache.bytes.clone();
Ok(())
});
ret?;
}
}
Ok(self.bytes.as_ref().unwrap().as_bytes())
}
fn fetch_flags(&self) -> Flag {
macro_rules! or_return_default {
($expr:expr) => {
match $expr {
Ok(ok) => ok,
Err(_) => return Default::default(),
}
};
}
fn fetch_flags(&self) -> Result<Flag> {
if self.flags.get().is_some() {
return self.flags.get().unwrap();
return Ok(self.flags.get().unwrap());
}
let mut bytes_cache = or_return_default!(self.uid_store.byte_cache.lock());
let mut bytes_cache = self.uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(self.uid).or_default();
if cache.flags.is_some() {
self.flags.set(cache.flags);
} else {
let mut response = String::with_capacity(8 * 1024);
let mut conn = or_return_default!(try_lock(
&self.connection,
Some(std::time::Duration::new(2, 0))
));
or_return_default!(conn.examine_mailbox(self.mailbox_hash, &mut response));
or_return_default!(
futures::executor::block_on(async {
let mut response = String::with_capacity(8 * 1024);
let mut conn = self.connection.lock().await;
conn.examine_mailbox(self.mailbox_hash, &mut response)
.await?;
conn.send_command(format!("UID FETCH {} FLAGS", self.uid).as_bytes())
);
or_return_default!(conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED));
debug!(
"fetch response is {} bytes and {} lines",
response.len(),
response.lines().collect::<Vec<&str>>().len()
);
.await?;
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await?;
debug!(
"fetch response is {} bytes and {} lines",
response.len(),
response.lines().collect::<Vec<&str>>().len()
);
let v = protocol_parser::uid_fetch_flags_response(response.as_bytes())
.map(|(_, v)| v)
.map_err(MeliError::from)?;
if v.len() != 1 {
debug!("responses len is {}", v.len());
debug!(&response);
/* TODO: Trigger cache invalidation here. */
debug!(format!("message with UID {} was not found", self.uid));
return Err(MeliError::new(format!(
"Invalid/unexpected response: {:?}",
response
))
.set_summary(format!("message with UID {} was not found?", self.uid)));
}
let (uid, (flags, _)) = v[0];
assert_eq!(uid, self.uid);
cache.flags = Some(flags);
self.flags.set(Some(flags));
Ok(())
})?;
}
Ok(self.flags.get().unwrap())
}
fn set_flag(
&mut self,
flag: Flag,
value: bool,
) -> Result<Pin<Box<dyn Future<Output = Result<()>> + Send>>> {
let mut flags = self.fetch_flags()?;
flags.set(flag, value);
let mut response = String::with_capacity(8 * 1024);
let connection = self.connection.clone();
let mailbox_hash = self.mailbox_hash;
let uid = self.uid;
let uid_store = self.uid_store.clone();
Ok(Box::pin(async move {
let mut conn = connection.lock().await;
conn.select_mailbox(mailbox_hash, &mut response).await?;
debug!(&response);
conn.send_command(
format!(
"UID STORE {} FLAGS.SILENT ({})",
uid,
flags_to_imap_list!(flags)
)
.as_bytes(),
)
.await?;
conn.read_response(&mut response, RequiredResponses::STORE_REQUIRED)
.await?;
debug!(&response);
match protocol_parser::uid_fetch_flags_response(response.as_bytes())
.map(|(_, v)| v)
.map_err(MeliError::from)
{
Ok(v) => {
if v.len() != 1 {
if v.len() == 1 {
debug!("responses len is {}", v.len());
debug!(response);
/* TODO: Trigger cache invalidation here. */
debug!(format!("message with UID {} was not found", self.uid));
return Flag::default();
let (_uid, (flags, _)) = v[0];
assert_eq!(_uid, uid);
}
let (uid, (flags, _)) = v[0];
assert_eq!(uid, self.uid);
cache.flags = Some(flags);
self.flags.set(Some(flags));
}
Err(e) => or_return_default!(Err(e)),
Err(e) => Err(e)?,
}
}
self.flags.get().unwrap()
let mut bytes_cache = uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(uid).or_default();
cache.flags = Some(flags);
Ok(())
}))
}
fn set_flag(&mut self, f: Flag, value: bool) -> Result<()> {
let mut flags = self.fetch_flags();
flags.set(f, value);
fn set_tag(
&mut self,
tag: String,
value: bool,
) -> Result<Pin<Box<dyn Future<Output = Result<()>> + Send>>> {
let mut response = String::with_capacity(8 * 1024);
let mut conn = try_lock(&self.connection, Some(std::time::Duration::new(2, 0)))?;
conn.select_mailbox(self.mailbox_hash, &mut response)?;
debug!(&response);
conn.send_command(
format!(
"UID STORE {} FLAGS.SILENT ({})",
self.uid,
flags_to_imap_list!(flags)
let connection = self.connection.clone();
let mailbox_hash = self.mailbox_hash;
let uid = self.uid;
let uid_store = self.uid_store.clone();
Ok(Box::pin(async move {
let mut conn = connection.lock().await;
conn.select_mailbox(mailbox_hash, &mut response).await?;
conn.send_command(
format!(
"UID STORE {} {}FLAGS.SILENT ({})",
uid,
if value { "+" } else { "-" },
&tag
)
.as_bytes(),
)
.as_bytes(),
)?;
conn.read_response(&mut response, RequiredResponses::STORE_REQUIRED)?;
debug!(&response);
match protocol_parser::uid_fetch_flags_response(response.as_bytes())
.map(|(_, v)| v)
.map_err(MeliError::from)
{
Ok(v) => {
if v.len() == 1 {
debug!("responses len is {}", v.len());
let (uid, (flags, _)) = v[0];
assert_eq!(uid, self.uid);
self.flags.set(Some(flags));
}
}
Err(e) => Err(e)?,
}
let mut bytes_cache = self.uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(self.uid).or_default();
cache.flags = Some(flags);
Ok(())
}
fn set_tag(&mut self, tag: String, value: bool) -> Result<()> {
let mut response = String::with_capacity(8 * 1024);
let mut conn = try_lock(&self.connection, Some(std::time::Duration::new(2, 0)))?;
conn.select_mailbox(self.mailbox_hash, &mut response)?;
conn.send_command(
format!(
"UID STORE {} {}FLAGS.SILENT ({})",
self.uid,
if value { "+" } else { "-" },
&tag
)
.as_bytes(),
)?;
conn.read_response(&mut response, RequiredResponses::STORE_REQUIRED)?;
protocol_parser::uid_fetch_flags_response(response.as_bytes())
.map(|(_, v)| v)
.map_err(MeliError::from)?;
let hash = tag_hash!(tag);
if value {
self.uid_store.tag_index.write().unwrap().insert(hash, tag);
} else {
self.uid_store.tag_index.write().unwrap().remove(&hash);
}
if !envelope.labels().iter().any(|&h_| h_ == hash) {
.await?;
conn.read_response(&mut response, RequiredResponses::STORE_REQUIRED)
.await?;
protocol_parser::uid_fetch_flags_response(response.as_bytes())
.map(|(_, v)| v)
.map_err(MeliError::from)?;
let hash = tag_hash!(tag);
if value {
envelope.labels_mut().push(hash);
uid_store.tag_index.write().unwrap().insert(hash, tag);
} else {
uid_store.tag_index.write().unwrap().remove(&hash);
}
}
if !value {
if let Some(pos) = envelope.labels().iter().position(|&h_| h_ == hash) {
envelope.labels_mut().remove(pos);
}
}
Ok(())
Ok(())
}))
}
}