BackendOp: change fetch_flags() retval to future

async
Manos Pitsidianakis 2020-06-30 19:36:02 +03:00
parent ed3e66cedf
commit 1ddde9ccba
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
12 changed files with 390 additions and 374 deletions

View File

@ -437,17 +437,9 @@ pub trait MailBackend: ::std::fmt::Debug + Send + Sync {
/// ```
pub trait BackendOp: ::std::fmt::Debug + ::std::marker::Send {
fn as_bytes(&mut self) -> Result<&[u8]>;
fn fetch_flags(&self) -> Result<Flag>;
fn set_flag(
&mut self,
flag: Flag,
value: bool,
) -> Result<Pin<Box<dyn Future<Output = Result<()>> + Send>>>;
fn set_tag(
&mut self,
tag: String,
value: bool,
) -> Result<Pin<Box<dyn Future<Output = Result<()>> + Send>>>;
fn fetch_flags(&self) -> ResultFuture<Flag>;
fn set_flag(&mut self, flag: Flag, value: bool) -> ResultFuture<()>;
fn set_tag(&mut self, tag: String, value: bool) -> ResultFuture<()>;
}
/// Wrapper for BackendOps that are to be set read-only.
@ -469,7 +461,7 @@ impl BackendOp for ReadOnlyOp {
fn as_bytes(&mut self) -> Result<&[u8]> {
self.op.as_bytes()
}
fn fetch_flags(&self) -> Result<Flag> {
fn fetch_flags(&self) -> ResultFuture<Flag> {
self.op.fetch_flags()
}
fn set_flag(&mut self, _flag: Flag, _value: bool) -> ResultFuture<()> {

View File

@ -20,11 +20,11 @@
*/
use super::*;
use futures::lock::Mutex as FutureMutex;
use crate::backends::*;
use crate::email::*;
use crate::error::{MeliError, Result};
use std::cell::Cell;
use std::sync::{Arc, Mutex};
/// `BackendOp` implementor for Imap
@ -36,7 +36,7 @@ pub struct ImapOp {
body: Option<String>,
mailbox_path: String,
mailbox_hash: MailboxHash,
flags: Cell<Option<Flag>>,
flags: Arc<FutureMutex<Option<Flag>>>,
connection: Arc<Mutex<ImapConnection>>,
uid_store: Arc<UIDStore>,
}
@ -57,7 +57,7 @@ impl ImapOp {
body: None,
mailbox_path,
mailbox_hash,
flags: Cell::new(None),
flags: Arc::new(FutureMutex::new(None)),
uid_store,
}
}
@ -104,44 +104,65 @@ impl BackendOp for ImapOp {
Ok(self.bytes.as_ref().unwrap().as_bytes())
}
fn fetch_flags(&self) -> Result<Flag> {
if self.flags.get().is_some() {
return Ok(self.flags.get().unwrap());
}
let mut bytes_cache = self.uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(self.uid).or_default();
if cache.flags.is_some() {
self.flags.set(cache.flags);
} else {
let mut response = String::with_capacity(8 * 1024);
let mut conn = try_lock(&self.connection, Some(std::time::Duration::new(2, 0)))?;
conn.examine_mailbox(self.mailbox_hash, &mut response, false)?;
conn.send_command(format!("UID FETCH {} FLAGS", self.uid).as_bytes())?;
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)?;
debug!(
"fetch response is {} bytes and {} lines",
response.len(),
response.lines().collect::<Vec<&str>>().len()
);
let v = protocol_parser::uid_fetch_flags_response(response.as_bytes())
.map(|(_, v)| v)
.map_err(MeliError::from)?;
if v.len() != 1 {
debug!("responses len is {}", v.len());
debug!(&response);
/* TODO: Trigger cache invalidation here. */
debug!(format!("message with UID {} was not found", self.uid));
return Err(
MeliError::new(format!("Invalid/unexpected response: {:?}", response))
.set_summary(format!("message with UID {} was not found?", self.uid)),
);
fn fetch_flags(&self) -> ResultFuture<Flag> {
let connection = self.connection.clone();
let mailbox_hash = self.mailbox_hash;
let uid = self.uid;
let uid_store = self.uid_store.clone();
let flags = self.flags.clone();
let mut response = String::with_capacity(8 * 1024);
Ok(Box::pin(async move {
if let Some(val) = *flags.lock().await {
return Ok(val);
}
let (uid, (flags, _)) = v[0];
assert_eq!(uid, self.uid);
cache.flags = Some(flags);
self.flags.set(Some(flags));
}
Ok(self.flags.get().unwrap())
let exists_in_cache = {
let mut bytes_cache = uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(uid).or_default();
cache.flags.is_some()
};
if !exists_in_cache {
let mut conn = try_lock(&connection, Some(std::time::Duration::new(2, 0)))?;
conn.examine_mailbox(mailbox_hash, &mut response, false)?;
conn.send_command(format!("UID FETCH {} FLAGS", uid).as_bytes())?;
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)?;
debug!(
"fetch response is {} bytes and {} lines",
response.len(),
response.lines().collect::<Vec<&str>>().len()
);
let v = protocol_parser::uid_fetch_flags_response(response.as_bytes())
.map(|(_, v)| v)
.map_err(MeliError::from)?;
if v.len() != 1 {
debug!("responses len is {}", v.len());
debug!(&response);
/* TODO: Trigger cache invalidation here. */
debug!(format!("message with UID {} was not found", uid));
return Err(MeliError::new(format!(
"Invalid/unexpected response: {:?}",
response
))
.set_summary(format!("message with UID {} was not found?", uid)));
}
let (_uid, (_flags, _)) = v[0];
assert_eq!(_uid, uid);
let mut bytes_cache = uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(uid).or_default();
cache.flags = Some(_flags);
}
{
let val = {
let mut bytes_cache = uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(uid).or_default();
let val = cache.flags;
val
};
let mut f = flags.lock().await;
*f = val;
Ok(val.unwrap())
}
}))
}
fn set_flag(
@ -149,8 +170,7 @@ impl BackendOp for ImapOp {
f: Flag,
value: bool,
) -> Result<Pin<Box<dyn Future<Output = Result<()>> + Send>>> {
let mut flags = self.fetch_flags()?;
flags.set(f, value);
let flags = self.fetch_flags()?;
let connection = self.connection.clone();
let mailbox_hash = self.mailbox_hash;
let uid = self.uid;
@ -158,6 +178,8 @@ impl BackendOp for ImapOp {
let mut response = String::with_capacity(8 * 1024);
Ok(Box::pin(async move {
let mut flags = flags.await?;
flags.set(f, value);
let mut conn = try_lock(&connection, Some(std::time::Duration::new(2, 0)))?;
conn.select_mailbox(mailbox_hash, &mut response, false)?;
debug!(&response);

View File

@ -387,46 +387,49 @@ impl MailBackend for ImapType {
fn save(
&self,
_bytes: Vec<u8>,
_mailbox_hash: MailboxHash,
_flags: Option<Flag>,
bytes: Vec<u8>,
mailbox_hash: MailboxHash,
flags: Option<Flag>,
) -> ResultFuture<()> {
unimplemented!()
/*
let path = {
let mailboxes = self.uid_store.mailboxes.read().unwrap();
let uid_store = self.uid_store.clone();
let connection = self.connection.clone();
Ok(Box::pin(async move {
let path = {
let mailboxes = uid_store.mailboxes.read().unwrap();
let mailbox = mailboxes.get(&mailbox_hash).ok_or(MeliError::new(format!(
"Mailbox with hash {} not found.",
mailbox_hash
)))?;
if !mailbox.permissions.lock().unwrap().create_messages {
return Err(MeliError::new(format!(
"You are not allowed to create messages in mailbox {}",
mailbox.path()
)));
}
let mailbox = mailboxes.get(&mailbox_hash).ok_or(MeliError::new(format!(
"Mailbox with hash {} not found.",
mailbox_hash
)))?;
if !mailbox.permissions.lock().unwrap().create_messages {
return Err(MeliError::new(format!(
"You are not allowed to create messages in mailbox {}",
mailbox.path()
)));
}
mailbox.imap_path().to_string()
};
let mut response = String::with_capacity(8 * 1024);
let mut conn = try_lock(&self.connection, Some(std::time::Duration::new(5, 0)))?;
let flags = flags.unwrap_or(Flag::empty());
conn.send_command(
format!(
"APPEND \"{}\" ({}) {{{}}}",
&path,
flags_to_imap_list!(flags),
bytes.len()
mailbox.imap_path().to_string()
};
let mut response = String::with_capacity(8 * 1024);
let mut conn = connection.lock().await;
let flags = flags.unwrap_or(Flag::empty());
conn.send_command(
format!(
"APPEND \"{}\" ({}) {{{}}}",
&path,
flags_to_imap_list!(flags),
bytes.len()
)
.as_bytes(),
)
.as_bytes(),
)?;
// wait for "+ Ready for literal data" reply
conn.wait_for_continuation_request()?;
conn.send_literal(bytes)?;
conn.read_response(&mut response, RequiredResponses::empty())?;
Ok(())
*/
.await?;
// wait for "+ Ready for literal data" reply
conn.wait_for_continuation_request().await?;
conn.send_literal(&bytes).await?;
conn.read_response(&mut response, RequiredResponses::empty())
.await?;
Ok(())
}))
}
fn as_any(&self) -> &dyn ::std::any::Any {
@ -447,201 +450,237 @@ impl MailBackend for ImapType {
fn create_mailbox(
&mut self,
_path: String,
mut path: String,
) -> ResultFuture<(MailboxHash, HashMap<MailboxHash, Mailbox>)> {
unimplemented!()
/*
/* Must transform path to something the IMAP server will accept
*
* Each root mailbox has a hierarchy delimeter reported by the LIST entry. All paths
* must use this delimeter to indicate children of this mailbox.
*
* A new root mailbox should have the default delimeter, which can be found out by issuing
* an empty LIST command as described in RFC3501:
* C: A101 LIST "" ""
* S: * LIST (\Noselect) "/" ""
*
* The default delimiter for us is '/' just like UNIX paths. I apologise if this
* decision is unpleasant for you.
*/
let uid_store = self.uid_store.clone();
let connection = self.connection.clone();
let new_mailbox_fut = self.mailboxes_async();
Ok(Box::pin(async move {
/* Must transform path to something the IMAP server will accept
*
* Each root mailbox has a hierarchy delimeter reported by the LIST entry. All paths
* must use this delimeter to indicate children of this mailbox.
*
* A new root mailbox should have the default delimeter, which can be found out by issuing
* an empty LIST command as described in RFC3501:
* C: A101 LIST "" ""
* S: * LIST (\Noselect) "/" ""
*
* The default delimiter for us is '/' just like UNIX paths. I apologise if this
* decision is unpleasant for you.
*/
let mut mailboxes = self.uid_store.mailboxes.write().unwrap();
for root_mailbox in mailboxes.values().filter(|f| f.parent.is_none()) {
if path.starts_with(&root_mailbox.name) {
debug!("path starts with {:?}", &root_mailbox);
path = path.replace(
'/',
(root_mailbox.separator as char).encode_utf8(&mut [0; 4]),
);
break;
{
let mailboxes = uid_store.mailboxes.write().unwrap();
for root_mailbox in mailboxes.values().filter(|f| f.parent.is_none()) {
if path.starts_with(&root_mailbox.name) {
debug!("path starts with {:?}", &root_mailbox);
path = path.replace(
'/',
(root_mailbox.separator as char).encode_utf8(&mut [0; 4]),
);
break;
}
}
if mailboxes.values().any(|f| f.path == path) {
return Err(MeliError::new(format!(
"Mailbox named `{}` already exists.",
path,
)));
}
}
}
if mailboxes.values().any(|f| f.path == path) {
return Err(MeliError::new(format!(
"Mailbox named `{}` in account `{}` already exists.",
path, self.account_name,
)));
}
let mut response = String::with_capacity(8 * 1024);
{
let mut conn_lck = connection.lock().await;
let mut response = String::with_capacity(8 * 1024);
{
let mut conn_lck = try_lock(&self.connection, None)?;
conn_lck.send_command(format!("CREATE \"{}\"", path,).as_bytes())?;
conn_lck.read_response(&mut response, RequiredResponses::empty())?;
conn_lck.send_command(format!("SUBSCRIBE \"{}\"", path,).as_bytes())?;
conn_lck.read_response(&mut response, RequiredResponses::empty())?;
}
let ret: Result<()> = ImapResponse::from(&response).into();
ret?;
let new_hash = get_path_hash!(path.as_str());
mailboxes.clear();
drop(mailboxes);
Ok((new_hash, self.mailboxes().map_err(|err| MeliError::new(format!("Mailbox create was succesful (returned `{}`) but listing mailboxes afterwards returned `{}`", response, err)))?))
*/
conn_lck
.send_command(format!("CREATE \"{}\"", path,).as_bytes())
.await?;
conn_lck
.read_response(&mut response, RequiredResponses::empty())
.await?;
conn_lck
.send_command(format!("SUBSCRIBE \"{}\"", path,).as_bytes())
.await?;
conn_lck
.read_response(&mut response, RequiredResponses::empty())
.await?;
}
let ret: Result<()> = ImapResponse::from(&response).into();
ret?;
let new_hash = get_path_hash!(path.as_str());
uid_store.mailboxes.write().unwrap().clear();
Ok((new_hash, new_mailbox_fut?.await.map_err(|err| MeliError::new(format!("Mailbox create was succesful (returned `{}`) but listing mailboxes afterwards returned `{}`", response, err)))?))
}))
}
fn delete_mailbox(
&mut self,
_mailbox_hash: MailboxHash,
mailbox_hash: MailboxHash,
) -> ResultFuture<HashMap<MailboxHash, Mailbox>> {
unimplemented!()
/*
let mailboxes = self.uid_store.mailboxes.read().unwrap();
let permissions = mailboxes[&mailbox_hash].permissions();
if !permissions.delete_mailbox {
return Err(MeliError::new(format!("You do not have permission to delete `{}`. Set permissions for this mailbox are {}", mailboxes[&mailbox_hash].name(), permissions)));
}
let mut response = String::with_capacity(8 * 1024);
{
let mut conn_lck = try_lock(&self.connection, None)?;
if !mailboxes[&mailbox_hash].no_select && conn_lck.current_mailbox == Some(mailbox_hash)
let uid_store = self.uid_store.clone();
let connection = self.connection.clone();
let new_mailbox_fut = self.mailboxes_async();
Ok(Box::pin(async move {
let imap_path: String;
let no_select: bool;
let is_subscribed: bool;
{
/* make sure mailbox is not selected before it gets deleted, otherwise
* connection gets dropped by server */
conn_lck.unselect()?;
}
if mailboxes[&mailbox_hash].is_subscribed() {
conn_lck.send_command(
format!("UNSUBSCRIBE \"{}\"", mailboxes[&mailbox_hash].imap_path()).as_bytes(),
)?;
conn_lck.read_response(&mut response, RequiredResponses::empty())?;
let mailboxes = uid_store.mailboxes.read().unwrap();
no_select = mailboxes[&mailbox_hash].no_select;
is_subscribed = mailboxes[&mailbox_hash].is_subscribed();
imap_path = mailboxes[&mailbox_hash].imap_path().to_string();
let permissions = mailboxes[&mailbox_hash].permissions();
if !permissions.delete_mailbox {
return Err(MeliError::new(format!("You do not have permission to delete `{}`. Set permissions for this mailbox are {}", mailboxes[&mailbox_hash].name(), permissions)));
}
}
let mut response = String::with_capacity(8 * 1024);
{
let mut conn_lck = connection.lock().await;
if !no_select
&& (conn_lck.current_mailbox == MailboxSelection::Examine(mailbox_hash)
|| conn_lck.current_mailbox == MailboxSelection::Select(mailbox_hash))
{
/* make sure mailbox is not selected before it gets deleted, otherwise
* connection gets dropped by server */
conn_lck.unselect().await?;
}
if is_subscribed {
conn_lck
.send_command(format!("UNSUBSCRIBE \"{}\"", &imap_path).as_bytes())
.await?;
conn_lck
.read_response(&mut response, RequiredResponses::empty())
.await?;
}
conn_lck.send_command(
debug!(format!(
"DELETE \"{}\"",
mailboxes[&mailbox_hash].imap_path()
))
.as_bytes(),
)?;
conn_lck.read_response(&mut response, RequiredResponses::empty())?;
}
let ret: Result<()> = ImapResponse::from(&response).into();
ret?;
let mut mailboxes = self.uid_store.mailboxes.write().unwrap();
mailboxes.clear();
drop(mailboxes);
self.mailboxes().map_err(|err| format!("Mailbox delete was succesful (returned `{}`) but listing mailboxes afterwards returned `{}`", response, err).into())
*/
conn_lck
.send_command(debug!(format!("DELETE \"{}\"", &imap_path,)).as_bytes())
.await?;
conn_lck
.read_response(&mut response, RequiredResponses::empty())
.await?;
}
let ret: Result<()> = ImapResponse::from(&response).into();
ret?;
uid_store.mailboxes.write().unwrap().clear();
new_mailbox_fut?.await.map_err(|err| format!("Mailbox delete was succesful (returned `{}`) but listing mailboxes afterwards returned `{}`", response, err).into())
}))
}
fn set_mailbox_subscription(
&mut self,
_mailbox_hash: MailboxHash,
_new_val: bool,
mailbox_hash: MailboxHash,
new_val: bool,
) -> ResultFuture<()> {
unimplemented!()
/*
let mut mailboxes = self.uid_store.mailboxes.write().unwrap();
if mailboxes[&mailbox_hash].is_subscribed() == new_val {
return Ok(());
}
let mut response = String::with_capacity(8 * 1024);
{
let mut conn_lck = try_lock(&self.connection, None)?;
if new_val {
conn_lck.send_command(
format!("SUBSCRIBE \"{}\"", mailboxes[&mailbox_hash].imap_path()).as_bytes(),
)?;
} else {
conn_lck.send_command(
format!("UNSUBSCRIBE \"{}\"", mailboxes[&mailbox_hash].imap_path()).as_bytes(),
)?;
let uid_store = self.uid_store.clone();
let connection = self.connection.clone();
Ok(Box::pin(async move {
let command: String;
{
let mailboxes = uid_store.mailboxes.write().unwrap();
if mailboxes[&mailbox_hash].is_subscribed() == new_val {
return Ok(());
}
command = format!("SUBSCRIBE \"{}\"", mailboxes[&mailbox_hash].imap_path());
}
conn_lck.read_response(&mut response, RequiredResponses::empty())?;
}
let ret: Result<()> = ImapResponse::from(&response).into();
if ret.is_ok() {
mailboxes.entry(mailbox_hash).and_modify(|entry| {
let _ = entry.set_is_subscribed(new_val);
});
}
ret
*/
let mut response = String::with_capacity(8 * 1024);
{
let mut conn_lck = connection.lock().await;
if new_val {
conn_lck.send_command(command.as_bytes()).await?;
} else {
conn_lck
.send_command(format!("UN{}", command).as_bytes())
.await?;
}
conn_lck
.read_response(&mut response, RequiredResponses::empty())
.await?;
}
let ret: Result<()> = ImapResponse::from(&response).into();
if ret.is_ok() {
uid_store
.mailboxes
.write()
.unwrap()
.entry(mailbox_hash)
.and_modify(|entry| {
let _ = entry.set_is_subscribed(new_val);
});
}
ret
}))
}
fn rename_mailbox(
&mut self,
_mailbox_hash: MailboxHash,
_new_path: String,
mailbox_hash: MailboxHash,
mut new_path: String,
) -> ResultFuture<Mailbox> {
unimplemented!()
/*
let mut mailboxes = self.uid_store.mailboxes.write().unwrap();
let permissions = mailboxes[&mailbox_hash].permissions();
if !permissions.delete_mailbox {
return Err(MeliError::new(format!("You do not have permission to rename mailbox `{}` (rename is equivalent to delete + create). Set permissions for this mailbox are {}", mailboxes[&mailbox_hash].name(), permissions)));
}
let mut response = String::with_capacity(8 * 1024);
if mailboxes[&mailbox_hash].separator != b'/' {
new_path = new_path.replace(
'/',
(mailboxes[&mailbox_hash].separator as char).encode_utf8(&mut [0; 4]),
);
}
{
let mut conn_lck = try_lock(&self.connection, None)?;
conn_lck.send_command(
debug!(format!(
let uid_store = self.uid_store.clone();
let connection = self.connection.clone();
let new_mailbox_fut = self.mailboxes_async();
Ok(Box::pin(async move {
let command: String;
let mut response = String::with_capacity(8 * 1024);
{
let mailboxes = uid_store.mailboxes.write().unwrap();
let permissions = mailboxes[&mailbox_hash].permissions();
if !permissions.delete_mailbox {
return Err(MeliError::new(format!("You do not have permission to rename mailbox `{}` (rename is equivalent to delete + create). Set permissions for this mailbox are {}", mailboxes[&mailbox_hash].name(), permissions)));
}
if mailboxes[&mailbox_hash].separator != b'/' {
new_path = new_path.replace(
'/',
(mailboxes[&mailbox_hash].separator as char).encode_utf8(&mut [0; 4]),
);
}
command = format!(
"RENAME \"{}\" \"{}\"",
mailboxes[&mailbox_hash].imap_path(),
new_path
))
.as_bytes(),
)?;
conn_lck.read_response(&mut response, RequiredResponses::empty())?;
}
let new_hash = get_path_hash!(new_path.as_str());
let ret: Result<()> = ImapResponse::from(&response).into();
ret?;
mailboxes.clear();
drop(mailboxes);
self.mailboxes().map_err(|err| format!("Mailbox rename was succesful (returned `{}`) but listing mailboxes afterwards returned `{}`", response, err))?;
Ok(BackendMailbox::clone(
&self.uid_store.mailboxes.read().unwrap()[&new_hash],
))
*/
);
}
{
let mut conn_lck = connection.lock().await;
conn_lck.send_command(debug!(command).as_bytes()).await?;
conn_lck
.read_response(&mut response, RequiredResponses::empty())
.await?;
}
let new_hash = get_path_hash!(new_path.as_str());
let ret: Result<()> = ImapResponse::from(&response).into();
ret?;
uid_store.mailboxes.write().unwrap().clear();
new_mailbox_fut?.await.map_err(|err| format!("Mailbox rename was succesful (returned `{}`) but listing mailboxes afterwards returned `{}`", response, err))?;
Ok(BackendMailbox::clone(
&uid_store.mailboxes.read().unwrap()[&new_hash],
))
}))
}
fn set_mailbox_permissions(
&mut self,
_mailbox_hash: MailboxHash,
_val: crate::backends::MailboxPermissions,
mailbox_hash: MailboxHash,
val: crate::backends::MailboxPermissions,
) -> ResultFuture<()> {
unimplemented!()
/*
let mailboxes = self.uid_store.mailboxes.write().unwrap();
let permissions = mailboxes[&mailbox_hash].permissions();
if !permissions.change_permissions {
return Err(MeliError::new(format!("You do not have permission to change permissions for mailbox `{}`. Set permissions for this mailbox are {}", mailboxes[&mailbox_hash].name(), permissions)));
}
let uid_store = self.uid_store.clone();
let connection = self.connection.clone();
Ok(Box::pin(async move {
let mailboxes = uid_store.mailboxes.write().unwrap();
let permissions = mailboxes[&mailbox_hash].permissions();
if !permissions.change_permissions {
return Err(MeliError::new(format!("You do not have permission to change permissions for mailbox `{}`. Set permissions for this mailbox are {}", mailboxes[&mailbox_hash].name(), permissions)));
}
Err(MeliError::new("Unimplemented."))
*/
Err(MeliError::new("Unimplemented."))
}))
}
fn search(
@ -953,20 +992,6 @@ impl ImapType {
Ok(debug!(mailboxes))
}
pub fn capabilities(&self) -> Vec<String> {
vec![]
/*
try_lock(&self.connection, Some(std::time::Duration::new(2, 0)))
.map(|c| {
c.capabilities
.iter()
.map(|c| String::from_utf8_lossy(c).into())
.collect::<Vec<String>>()
})
.unwrap_or_default()
*/
}
pub fn validate_config(s: &AccountSettings) -> Result<()> {
get_conf_val!(s["server_hostname"])?;
get_conf_val!(s["server_username"])?;

View File

@ -24,7 +24,6 @@ use super::*;
use crate::backends::*;
use crate::email::*;
use crate::error::{MeliError, Result};
use std::cell::Cell;
use std::sync::Arc;
/// `BackendOp` implementor for Imap
@ -36,7 +35,7 @@ pub struct ImapOp {
body: Option<String>,
mailbox_path: String,
mailbox_hash: MailboxHash,
flags: Cell<Option<Flag>>,
flags: Arc<FutureMutex<Option<Flag>>>,
connection: Arc<FutureMutex<ImapConnection>>,
uid_store: Arc<UIDStore>,
}
@ -57,7 +56,7 @@ impl ImapOp {
body: None,
mailbox_path,
mailbox_hash,
flags: Cell::new(None),
flags: Arc::new(FutureMutex::new(None)),
uid_store,
}
}
@ -99,7 +98,7 @@ impl BackendOp for ImapOp {
let mut bytes_cache = self.uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(self.uid).or_default();
if let Some((flags, _)) = flags {
self.flags.set(Some(flags));
//self.flags.set(Some(flags));
cache.flags = Some(flags);
}
cache.bytes =
@ -113,21 +112,28 @@ impl BackendOp for ImapOp {
Ok(self.bytes.as_ref().unwrap().as_bytes())
}
fn fetch_flags(&self) -> Result<Flag> {
if self.flags.get().is_some() {
return Ok(self.flags.get().unwrap());
}
let mut bytes_cache = self.uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(self.uid).or_default();
if cache.flags.is_some() {
self.flags.set(cache.flags);
} else {
futures::executor::block_on(async {
let mut response = String::with_capacity(8 * 1024);
let mut conn = self.connection.lock().await;
conn.examine_mailbox(self.mailbox_hash, &mut response, false)
fn fetch_flags(&self) -> ResultFuture<Flag> {
let mut response = String::with_capacity(8 * 1024);
let connection = self.connection.clone();
let mailbox_hash = self.mailbox_hash;
let uid = self.uid;
let uid_store = self.uid_store.clone();
let flags = self.flags.clone();
Ok(Box::pin(async move {
if let Some(val) = *flags.lock().await {
return Ok(val);
}
let exists_in_cache = {
let mut bytes_cache = uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(uid).or_default();
cache.flags.is_some()
};
if !exists_in_cache {
let mut conn = connection.lock().await;
conn.examine_mailbox(mailbox_hash, &mut response, false)
.await?;
conn.send_command(format!("UID FETCH {} FLAGS", self.uid).as_bytes())
conn.send_command(format!("UID FETCH {} FLAGS", uid).as_bytes())
.await?;
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await?;
@ -143,21 +149,32 @@ impl BackendOp for ImapOp {
debug!("responses len is {}", v.len());
debug!(&response);
/* TODO: Trigger cache invalidation here. */
debug!(format!("message with UID {} was not found", self.uid));
debug!(format!("message with UID {} was not found", uid));
return Err(MeliError::new(format!(
"Invalid/unexpected response: {:?}",
response
))
.set_summary(format!("message with UID {} was not found?", self.uid)));
.set_summary(format!("message with UID {} was not found?", uid)));
}
let (uid, (flags, _)) = v[0];
assert_eq!(uid, self.uid);
cache.flags = Some(flags);
self.flags.set(Some(flags));
Ok(())
})?;
}
Ok(self.flags.get().unwrap())
let (_uid, (_flags, _)) = v[0];
assert_eq!(uid, uid);
*flags.lock().await = Some(_flags);
let mut bytes_cache = uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(uid).or_default();
cache.flags = Some(_flags);
}
{
let val = {
let mut bytes_cache = uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(uid).or_default();
let val = cache.flags;
val
};
let mut f = flags.lock().await;
*f = val;
Ok(val.unwrap())
}
}))
}
fn set_flag(
@ -165,8 +182,7 @@ impl BackendOp for ImapOp {
flag: Flag,
value: bool,
) -> Result<Pin<Box<dyn Future<Output = Result<()>> + Send>>> {
let mut flags = self.fetch_flags()?;
flags.set(flag, value);
let flags = self.fetch_flags()?;
let mut response = String::with_capacity(8 * 1024);
let connection = self.connection.clone();
@ -174,6 +190,8 @@ impl BackendOp for ImapOp {
let uid = self.uid;
let uid_store = self.uid_store.clone();
Ok(Box::pin(async move {
let mut flags = flags.await?;
flags.set(flag, value);
let mut conn = connection.lock().await;
conn.select_mailbox(mailbox_hash, &mut response, false)
.await?;

View File

@ -21,7 +21,6 @@
use super::*;
use crate::backends::*;
use crate::error::Result;
use std::cell::Cell;
use std::sync::{Arc, RwLock};
@ -90,23 +89,15 @@ impl BackendOp for JmapOp {
Ok(&self.bytes.as_ref().unwrap().as_bytes())
}
fn fetch_flags(&self) -> Result<Flag> {
Ok(Flag::default())
fn fetch_flags(&self) -> ResultFuture<Flag> {
Ok(Box::pin(async { Ok(Flag::default()) }))
}
fn set_flag(
&mut self,
_f: Flag,
_value: bool,
) -> Result<Pin<Box<dyn Future<Output = Result<()>> + Send>>> {
fn set_flag(&mut self, _f: Flag, _value: bool) -> ResultFuture<()> {
Err(MeliError::new("Unimplemented"))
}
fn set_tag(
&mut self,
_tag: String,
_value: bool,
) -> Result<Pin<Box<dyn Future<Output = Result<()>> + Send>>> {
fn set_tag(&mut self, _tag: String, _value: bool) -> ResultFuture<()> {
Err(MeliError::new("Unimplemented"))
}
}

View File

@ -98,17 +98,14 @@ impl<'a> BackendOp for MaildirOp {
Ok(unsafe { self.slice.as_ref().unwrap().as_slice() })
}
fn fetch_flags(&self) -> Result<Flag> {
fn fetch_flags(&self) -> ResultFuture<Flag> {
let path = self.path();
Ok(path.flags())
let ret = Ok(path.flags());
Ok(Box::pin(async move { ret }))
}
fn set_flag(
&mut self,
f: Flag,
value: bool,
) -> Result<Pin<Box<dyn Future<Output = Result<()>> + Send>>> {
let mut flags = self.fetch_flags()?;
fn set_flag(&mut self, f: Flag, value: bool) -> ResultFuture<()> {
let mut flags = futures::executor::block_on(self.fetch_flags()?)?;
let old_hash = self.hash;
let mailbox_hash = self.mailbox_hash;
let hash_index = self.hash_index.clone();
@ -153,11 +150,7 @@ impl<'a> BackendOp for MaildirOp {
}))
}
fn set_tag(
&mut self,
_tag: String,
_value: bool,
) -> Result<Pin<Box<dyn Future<Output = Result<()>> + Send>>> {
fn set_tag(&mut self, _tag: String, _value: bool) -> ResultFuture<()> {
Err(MeliError::new("Maildir doesn't support tags."))
}
}

View File

@ -26,6 +26,7 @@ use crate::conf::AccountSettings;
use crate::email::{Envelope, EnvelopeHash, Flag};
use crate::error::{MeliError, Result};
use crate::shellexpand::ShellExpandTrait;
use futures::prelude::Stream;
extern crate notify;
use self::notify::{watcher, DebouncedEvent, RecursiveMode, Watcher};
@ -280,7 +281,7 @@ impl MailBackend for MaildirType {
(*map).insert(hash, PathBuf::from(&file).into());
}
let op = Box::new(MaildirOp::new(hash, map.clone(), mailbox_hash));
if let Some(e) = Envelope::from_token(op, hash) {
if let Ok(e) = Envelope::from_token(op, hash) {
mailbox_index.lock().unwrap().insert(e.hash(), mailbox_hash);
let file_name = PathBuf::from(file)
.strip_prefix(&root_path)
@ -477,7 +478,7 @@ impl MailBackend for MaildirType {
hash_indexes.clone(),
mailbox_hash,
));
if let Some(env) = Envelope::from_token(op, new_hash) {
if let Ok(env) = Envelope::from_token(op, new_hash) {
debug!("{}\t{:?}", new_hash, &pathbuf);
debug!(
"hash {}, path: {:?} couldn't be parsed",
@ -1040,7 +1041,7 @@ impl MaildirType {
map.clone(),
mailbox_hash,
));
if let Some(e) = Envelope::from_token(op, hash) {
if let Ok(e) = Envelope::from_token(op, hash) {
mailbox_index
.lock()
.unwrap()
@ -1220,7 +1221,7 @@ fn add_path_to_index(
);
}
let op = Box::new(MaildirOp::new(hash, hash_index.clone(), mailbox_hash));
if let Some(e) = Envelope::from_token(op, hash) {
if let Ok(e) = Envelope::from_token(op, hash) {
debug!("add_path_to_index gen {}\t{}", hash, file_name.display());
if let Ok(cached) = cache_dir.place_cache_file(file_name) {
debug!("putting in cache");

View File

@ -147,7 +147,7 @@ impl MaildirStream {
(*map).insert(hash, PathBuf::from(file).into());
}
let op = Box::new(MaildirOp::new(hash, map.clone(), mailbox_hash));
if let Some(e) = Envelope::from_token(op, hash) {
if let Ok(e) = Envelope::from_token(op, hash) {
mailbox_index.lock().unwrap().insert(e.hash(), mailbox_hash);
if let Ok(cached) = cache_dir.place_cache_file(file_name) {
/* place result in cache directory */

View File

@ -192,7 +192,7 @@ impl BackendOp for MboxOp {
})
}
fn fetch_flags(&self) -> Result<Flag> {
fn fetch_flags(&self) -> ResultFuture<Flag> {
let mut flags = Flag::empty();
let file = std::fs::OpenOptions::new()
.read(true)
@ -245,22 +245,14 @@ impl BackendOp for MboxOp {
}
}
}
Ok(flags)
Ok(Box::pin(async move { Ok(flags) }))
}
fn set_flag(
&mut self,
_flag: Flag,
_value: bool,
) -> Result<Pin<Box<dyn Future<Output = Result<()>> + Send>>> {
fn set_flag(&mut self, _flag: Flag, _value: bool) -> ResultFuture<()> {
Err(MeliError::new("Unimplemented."))
}
fn set_tag(
&mut self,
_tag: String,
_value: bool,
) -> Result<Pin<Box<dyn Future<Output = Result<()>> + Send>>> {
fn set_tag(&mut self, _tag: String, _value: bool) -> ResultFuture<()> {
Err(MeliError::new("mbox doesn't support tags."))
}
}

View File

@ -23,7 +23,7 @@ use crate::async_workers::{Async, AsyncBuilder, AsyncStatus, WorkContext};
use crate::backends::*;
use crate::conf::AccountSettings;
use crate::email::{Envelope, EnvelopeHash, Flag};
use crate::error::{MeliError, Result};
use crate::error::{MeliError, Result, ResultIntoMeliError};
use crate::shellexpand::ShellExpandTrait;
use smallvec::SmallVec;
use std::collections::{
@ -665,7 +665,7 @@ impl BackendOp for NotmuchOp {
Ok(self.bytes.as_ref().unwrap().as_bytes())
}
fn fetch_flags(&self) -> Result<Flag> {
fn fetch_flags(&self) -> ResultFuture<Flag> {
let mut message: *mut notmuch_message_t = std::ptr::null_mut();
let index_lck = self.index.write().unwrap();
unsafe {
@ -676,15 +676,11 @@ impl BackendOp for NotmuchOp {
)
};
let (flags, _tags) = TagIterator::new(self.lib.clone(), message).collect_flags_and_tags();
Ok(flags)
Ok(Box::pin(async move { Ok(flags) }))
}
fn set_flag(
&mut self,
f: Flag,
value: bool,
) -> Result<Pin<Box<dyn Future<Output = Result<()>> + Send>>> {
let mut flags = self.fetch_flags()?;
fn set_flag(&mut self, f: Flag, value: bool) -> ResultFuture<()> {
let mut flags = futures::executor::block_on(self.fetch_flags()?)?;
flags.set(f, value);
let mut message: *mut notmuch_message_t = std::ptr::null_mut();
let mut index_lck = self.index.write().unwrap();
@ -775,11 +771,7 @@ impl BackendOp for NotmuchOp {
Ok(Box::pin(async { Ok(()) }))
}
fn set_tag(
&mut self,
tag: String,
value: bool,
) -> Result<Pin<Box<dyn Future<Output = Result<()>> + Send>>> {
fn set_tag(&mut self, tag: String, value: bool) -> ResultFuture<()> {
let mut message: *mut notmuch_message_t = std::ptr::null_mut();
let index_lck = self.index.read().unwrap();
unsafe {
@ -1058,8 +1050,8 @@ fn notmuch_message_into_envelope(
env.set_flags(flags);
env
})
.ok_or_else(|| {
.chain_err_summary(|| {
index.write().unwrap().remove(&env_hash);
format!("could not parse path {:?}", c_str).into()
format!("could not parse path {:?}", c_str)
})
}

View File

@ -225,17 +225,15 @@ impl Envelope {
}
Err(MeliError::new("Couldn't parse mail."))
}
pub fn from_token(mut operation: Box<dyn BackendOp>, hash: EnvelopeHash) -> Option<Envelope> {
pub fn from_token(mut operation: Box<dyn BackendOp>, hash: EnvelopeHash) -> Result<Envelope> {
let mut e = Envelope::new(hash);
e.flags = operation.fetch_flags().unwrap_or_default();
if let Ok(bytes) = operation.as_bytes() {
let res = e.populate_headers(bytes).ok();
if res.is_some() {
return Some(e);
}
}
None
e.flags = futures::executor::block_on(operation.fetch_flags()?)?;
let bytes = operation.as_bytes()?;
e.populate_headers(bytes)?;
Ok(e)
}
pub fn hash(&self) -> EnvelopeHash {
self.hash
}

View File

@ -315,23 +315,15 @@ impl BackendOp for PluginOp {
}
}
fn fetch_flags(&self) -> Result<Flag> {
fn fetch_flags(&self) -> ResultFuture<Flag> {
Err(MeliError::new("Unimplemented."))
}
fn set_flag(
&mut self,
_f: Flag,
_value: bool,
) -> Result<Pin<Box<dyn Future<Output = Result<()>> + Send>>> {
fn set_flag(&mut self, _f: Flag, _value: bool) -> ResultFuture<()> {
Err(MeliError::new("Unimplemented."))
}
fn set_tag(
&mut self,
_tag: String,
_value: bool,
) -> Result<Pin<Box<dyn Future<Output = Result<()>> + Send>>> {
fn set_tag(&mut self, _tag: String, _value: bool) -> ResultFuture<()> {
Err(MeliError::new("Unimplemented."))
}
}