BackendOp: return future in as_bytes()

memfd
Manos Pitsidianakis 2020-07-04 17:38:57 +03:00
parent 4721073bc3
commit b3876113aa
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
19 changed files with 562 additions and 571 deletions

View File

@ -436,7 +436,7 @@ pub trait MailBackend: ::std::fmt::Debug + Send + Sync {
/// let operation = Box::new(FooOp {});
/// ```
pub trait BackendOp: ::std::fmt::Debug + ::std::marker::Send {
fn as_bytes(&mut self) -> Result<&[u8]>;
fn as_bytes(&mut self) -> ResultFuture<Vec<u8>>;
fn fetch_flags(&self) -> ResultFuture<Flag>;
fn set_flag(&mut self, flag: Flag, value: bool) -> ResultFuture<()>;
fn set_tag(&mut self, tag: String, value: bool) -> ResultFuture<()>;
@ -458,7 +458,7 @@ impl ReadOnlyOp {
}
impl BackendOp for ReadOnlyOp {
fn as_bytes(&mut self) -> Result<&[u8]> {
fn as_bytes(&mut self) -> ResultFuture<Vec<u8>> {
self.op.as_bytes()
}
fn fetch_flags(&self) -> ResultFuture<Flag> {

View File

@ -31,12 +31,7 @@ use std::sync::{Arc, Mutex};
#[derive(Debug, Clone)]
pub struct ImapOp {
uid: usize,
bytes: Option<String>,
headers: Option<String>,
body: Option<String>,
mailbox_path: String,
mailbox_hash: MailboxHash,
flags: Arc<FutureMutex<Option<Flag>>>,
connection: Arc<Mutex<ImapConnection>>,
uid_store: Arc<UIDStore>,
}
@ -52,56 +47,43 @@ impl ImapOp {
ImapOp {
uid,
connection,
bytes: None,
headers: None,
body: None,
mailbox_path,
mailbox_hash,
flags: Arc::new(FutureMutex::new(None)),
uid_store,
}
}
}
impl BackendOp for ImapOp {
fn as_bytes(&mut self) -> Result<&[u8]> {
if self.bytes.is_none() {
fn as_bytes(&mut self) -> ResultFuture<Vec<u8>> {
let mut bytes_cache = self.uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(self.uid).or_default();
if cache.bytes.is_none() {
let mut response = String::with_capacity(8 * 1024);
{
let mut conn = try_lock(&self.connection, Some(std::time::Duration::new(2, 0)))?;
conn.examine_mailbox(self.mailbox_hash, &mut response)?;
conn.send_command(format!("UID FETCH {} (FLAGS RFC822)", self.uid).as_bytes())?;
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)?;
}
debug!(
"fetch response is {} bytes and {} lines",
response.len(),
response.lines().collect::<Vec<&str>>().len()
);
let UidFetchResponse {
uid, flags, body, ..
} = protocol_parser::uid_fetch_response(&response)?.1;
assert_eq!(uid, self.uid);
assert!(body.is_some());
let mut bytes_cache = self.uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(self.uid).or_default();
if cache.bytes.is_some() {
self.bytes = cache.bytes.clone();
} else {
drop(cache);
drop(bytes_cache);
let mut response = String::with_capacity(8 * 1024);
{
let mut conn =
try_lock(&self.connection, Some(std::time::Duration::new(2, 0)))?;
conn.examine_mailbox(self.mailbox_hash, &mut response, false)?;
conn.send_command(format!("UID FETCH {} (FLAGS RFC822)", self.uid).as_bytes())?;
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)?;
}
debug!(
"fetch response is {} bytes and {} lines",
response.len(),
response.lines().collect::<Vec<&str>>().len()
);
let UidFetchResponse {
uid, flags, body, ..
} = protocol_parser::uid_fetch_response(&response)?.1;
assert_eq!(uid, self.uid);
assert!(body.is_some());
let mut bytes_cache = self.uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(self.uid).or_default();
if let Some((flags, _)) = flags {
cache.flags = Some(flags);
}
cache.bytes =
Some(unsafe { std::str::from_utf8_unchecked(body.unwrap()).to_string() });
self.bytes = cache.bytes.clone();
if let Some((flags, _)) = flags {
cache.flags = Some(flags);
}
cache.bytes = Some(unsafe { std::str::from_utf8_unchecked(body.unwrap()).to_string() });
}
Ok(self.bytes.as_ref().unwrap().as_bytes())
let ret = cache.bytes.clone().unwrap().into_bytes();
Ok(Box::pin(async move { Ok(ret) }))
}
fn fetch_flags(&self) -> ResultFuture<Flag> {
@ -109,13 +91,9 @@ impl BackendOp for ImapOp {
let mailbox_hash = self.mailbox_hash;
let uid = self.uid;
let uid_store = self.uid_store.clone();
let flags = self.flags.clone();
let mut response = String::with_capacity(8 * 1024);
Ok(Box::pin(async move {
if let Some(val) = *flags.lock().await {
return Ok(val);
}
let exists_in_cache = {
let mut bytes_cache = uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(uid).or_default();
@ -158,8 +136,6 @@ impl BackendOp for ImapOp {
let val = cache.flags;
val
};
let mut f = flags.lock().await;
*f = val;
Ok(val.unwrap())
}
}))

View File

@ -376,9 +376,6 @@ impl MailBackend for ImapType {
};
Ok(Box::new(ImapOp::new(
uid,
self.uid_store.mailboxes.read().unwrap()[&mailbox_hash]
.imap_path()
.to_string(),
mailbox_hash,
self.connection.clone(),
self.uid_store.clone(),

View File

@ -30,12 +30,7 @@ use std::sync::Arc;
#[derive(Debug, Clone)]
pub struct ImapOp {
uid: usize,
bytes: Option<String>,
headers: Option<String>,
body: Option<String>,
mailbox_path: String,
mailbox_hash: MailboxHash,
flags: Arc<FutureMutex<Option<Flag>>>,
connection: Arc<FutureMutex<ImapConnection>>,
uid_store: Arc<UIDStore>,
}
@ -43,7 +38,6 @@ pub struct ImapOp {
impl ImapOp {
pub fn new(
uid: usize,
mailbox_path: String,
mailbox_hash: MailboxHash,
connection: Arc<FutureMutex<ImapConnection>>,
uid_store: Arc<UIDStore>,
@ -51,65 +45,63 @@ impl ImapOp {
ImapOp {
uid,
connection,
bytes: None,
headers: None,
body: None,
mailbox_path,
mailbox_hash,
flags: Arc::new(FutureMutex::new(None)),
uid_store,
}
}
}
impl BackendOp for ImapOp {
fn as_bytes(&mut self) -> Result<&[u8]> {
if self.bytes.is_none() {
let mut bytes_cache = self.uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(self.uid).or_default();
if cache.bytes.is_some() {
self.bytes = cache.bytes.clone();
} else {
drop(cache);
drop(bytes_cache);
let ret: Result<()> = futures::executor::block_on(async {
let mut response = String::with_capacity(8 * 1024);
{
let mut conn = self.connection.lock().await;
conn.examine_mailbox(self.mailbox_hash, &mut response, false)
.await?;
conn.send_command(
format!("UID FETCH {} (FLAGS RFC822)", self.uid).as_bytes(),
)
fn as_bytes(&mut self) -> ResultFuture<Vec<u8>> {
let mut response = String::with_capacity(8 * 1024);
let connection = self.connection.clone();
let mailbox_hash = self.mailbox_hash;
let uid = self.uid;
let uid_store = self.uid_store.clone();
Ok(Box::pin(async move {
let exists_in_cache = {
let mut bytes_cache = uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(uid).or_default();
cache.bytes.is_some()
};
if !exists_in_cache {
let mut response = String::with_capacity(8 * 1024);
{
let mut conn = connection.lock().await;
conn.examine_mailbox(mailbox_hash, &mut response, false)
.await?;
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await?;
}
debug!(
"fetch response is {} bytes and {} lines",
response.len(),
response.lines().collect::<Vec<&str>>().len()
);
let UidFetchResponse {
uid, flags, body, ..
} = protocol_parser::uid_fetch_response(&response)?.1;
assert_eq!(uid, self.uid);
assert!(body.is_some());
let mut bytes_cache = self.uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(self.uid).or_default();
if let Some((flags, _)) = flags {
//self.flags.set(Some(flags));
cache.flags = Some(flags);
}
cache.bytes =
Some(unsafe { std::str::from_utf8_unchecked(body.unwrap()).to_string() });
self.bytes = cache.bytes.clone();
Ok(())
});
ret?;
conn.send_command(format!("UID FETCH {} (FLAGS RFC822)", uid).as_bytes())
.await?;
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await?;
}
debug!(
"fetch response is {} bytes and {} lines",
response.len(),
response.lines().collect::<Vec<&str>>().len()
);
let UidFetchResponse {
uid: _uid,
flags: _flags,
body,
..
} = protocol_parser::uid_fetch_response(&response)?.1;
assert_eq!(_uid, uid);
assert!(body.is_some());
let mut bytes_cache = uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(uid).or_default();
if let Some((_flags, _)) = _flags {
//flags.lock().await.set(Some(_flags));
cache.flags = Some(_flags);
}
cache.bytes =
Some(unsafe { std::str::from_utf8_unchecked(body.unwrap()).to_string() });
}
}
Ok(self.bytes.as_ref().unwrap().as_bytes())
let mut bytes_cache = uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(uid).or_default();
let ret = cache.bytes.clone().unwrap().into_bytes();
Ok(ret)
}))
}
fn fetch_flags(&self) -> ResultFuture<Flag> {
@ -118,12 +110,8 @@ impl BackendOp for ImapOp {
let mailbox_hash = self.mailbox_hash;
let uid = self.uid;
let uid_store = self.uid_store.clone();
let flags = self.flags.clone();
Ok(Box::pin(async move {
if let Some(val) = *flags.lock().await {
return Ok(val);
}
let exists_in_cache = {
let mut bytes_cache = uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(uid).or_default();
@ -158,7 +146,6 @@ impl BackendOp for ImapOp {
}
let (_uid, (_flags, _)) = v[0];
assert_eq!(uid, uid);
*flags.lock().await = Some(_flags);
let mut bytes_cache = uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(uid).or_default();
cache.flags = Some(_flags);
@ -170,8 +157,6 @@ impl BackendOp for ImapOp {
let val = cache.flags;
val
};
let mut f = flags.lock().await;
*f = val;
Ok(val.unwrap())
}
}))

View File

@ -56,7 +56,7 @@ impl JmapOp {
}
impl BackendOp for JmapOp {
fn as_bytes(&mut self) -> Result<&[u8]> {
fn as_bytes(&mut self) -> ResultFuture<Vec<u8>> {
if self.bytes.is_none() {
let mut store_lck = self.store.write().unwrap();
if !(store_lck.byte_cache.contains_key(&self.hash)
@ -86,7 +86,8 @@ impl BackendOp for JmapOp {
}
self.bytes = store_lck.byte_cache[&self.hash].bytes.clone();
}
Ok(&self.bytes.as_ref().unwrap().as_bytes())
let ret = self.bytes.as_ref().unwrap().as_bytes().to_vec();
Ok(Box::pin(async move { Ok(ret) }))
}
fn fetch_flags(&self) -> ResultFuture<Flag> {

View File

@ -90,12 +90,13 @@ impl MaildirOp {
}
impl<'a> BackendOp for MaildirOp {
fn as_bytes(&mut self) -> Result<&[u8]> {
fn as_bytes(&mut self) -> ResultFuture<Vec<u8>> {
if self.slice.is_none() {
self.slice = Some(Mmap::open_path(self.path(), Protection::Read)?);
}
/* Unwrap is safe since we use ? above. */
Ok(unsafe { self.slice.as_ref().unwrap().as_slice() })
let ret = Ok((unsafe { self.slice.as_ref().unwrap().as_slice() }).to_vec());
Ok(Box::pin(async move { ret }))
}
fn fetch_flags(&self) -> ResultFuture<Flag> {

View File

@ -182,14 +182,16 @@ impl MboxOp {
}
impl BackendOp for MboxOp {
fn as_bytes(&mut self) -> Result<&[u8]> {
fn as_bytes(&mut self) -> ResultFuture<Vec<u8>> {
if self.slice.is_none() {
self.slice = Some(Mmap::open_path(&self.path, Protection::Read)?);
}
/* Unwrap is safe since we use ? above. */
Ok(unsafe {
let ret = Ok((unsafe {
&self.slice.as_ref().unwrap().as_slice()[self.offset..self.offset + self.length]
})
.to_vec());
Ok(Box::pin(async move { ret }))
}
fn fetch_flags(&self) -> ResultFuture<Flag> {

View File

@ -646,7 +646,7 @@ struct NotmuchOp {
}
impl BackendOp for NotmuchOp {
fn as_bytes(&mut self) -> Result<&[u8]> {
fn as_bytes(&mut self) -> ResultFuture<Vec<u8>> {
let mut message: *mut notmuch_message_t = std::ptr::null_mut();
let index_lck = self.index.write().unwrap();
unsafe {
@ -662,7 +662,8 @@ impl BackendOp for NotmuchOp {
let mut response = String::new();
f.read_to_string(&mut response)?;
self.bytes = Some(response);
Ok(self.bytes.as_ref().unwrap().as_bytes())
let ret = Ok(self.bytes.as_ref().unwrap().as_bytes().to_vec());
Ok(Box::pin(async move { ret }))
}
fn fetch_flags(&self) -> ResultFuture<Flag> {

View File

@ -229,8 +229,8 @@ impl Envelope {
pub fn from_token(mut operation: Box<dyn BackendOp>, hash: EnvelopeHash) -> Result<Envelope> {
let mut e = Envelope::new(hash);
e.flags = futures::executor::block_on(operation.fetch_flags()?)?;
let bytes = operation.as_bytes()?;
e.populate_headers(bytes)?;
let bytes = futures::executor::block_on(operation.as_bytes()?)?;
e.populate_headers(&bytes)?;
Ok(e)
}
@ -413,11 +413,6 @@ impl Envelope {
_strings.join(", ")
}
/// Requests bytes from backend and thus can fail
pub fn bytes(&self, mut operation: Box<dyn BackendOp>) -> Result<Vec<u8>> {
operation.as_bytes().map(|v| v.to_vec())
}
pub fn body_bytes(&self, bytes: &[u8]) -> Attachment {
let builder = AttachmentBuilder::new(bytes);
builder.build()
@ -439,8 +434,8 @@ impl Envelope {
/// Requests bytes from backend and thus can fail
pub fn body(&self, mut operation: Box<dyn BackendOp>) -> Result<Attachment> {
debug!("searching body for {:?}", self.message_id_display());
let file = operation.as_bytes()?;
Ok(self.body_bytes(file))
let bytes = futures::executor::block_on(operation.as_bytes()?)?;
Ok(self.body_bytes(&bytes))
}
pub fn subject(&self) -> Cow<str> {

View File

@ -138,8 +138,8 @@ impl Draft {
let mut ret = Draft::default();
//TODO: Inform user if error
{
let bytes = op.as_bytes().unwrap_or(&[]);
for (k, v) in envelope.headers(bytes).unwrap_or_else(|_| Vec::new()) {
let bytes = futures::executor::block_on(op.as_bytes()?)?;
for (k, v) in envelope.headers(&bytes).unwrap_or_else(|_| Vec::new()) {
if ignore_header(k.as_bytes()) {
continue;
}

View File

@ -238,9 +238,10 @@ impl Composer {
Some(NotificationType::ERROR),
));
}
Ok(mut op) => {
let parent_bytes = op.as_bytes();
ret.draft = Draft::new_reply(&parent_message, parent_bytes.unwrap());
Ok(op) => {
//FIXME
//let parent_bytes = op.as_bytes();
//ret.draft = Draft::new_reply(&parent_message, parent_bytes.unwrap());
}
}
let subject = parent_message.subject();

View File

@ -203,6 +203,8 @@ pub trait MailListingTrait: ListingTrait {
}
ListingAction::CopyTo(ref mailbox_path) => {
drop(envelope);
/*
* FIXME
match account
.mailbox_by_path(mailbox_path)
.and_then(|mailbox_hash| {
@ -219,10 +221,13 @@ pub trait MailListingTrait: ListingTrait {
}
Ok(fut) => {}
}
*/
continue;
}
ListingAction::CopyToOtherAccount(ref account_name, ref mailbox_path) => {
drop(envelope);
/*
* FIXME
if let Err(err) = op.as_bytes().map(|b| b.to_vec()).and_then(|bytes| {
let account_pos = context
.accounts
@ -245,10 +250,13 @@ pub trait MailListingTrait: ListingTrait {
));
return;
}
*/
continue;
}
ListingAction::MoveTo(ref mailbox_path) => {
drop(envelope);
/*
* FIXME
if let Err(err) =
account
.mailbox_by_path(mailbox_path)
@ -264,10 +272,12 @@ pub trait MailListingTrait: ListingTrait {
));
return;
}
*/
continue;
}
ListingAction::MoveToOtherAccount(ref account_name, ref mailbox_path) => {
drop(envelope);
/* FIXME
if let Err(err) = op
.as_bytes()
.map(|b| b.to_vec())
@ -302,26 +312,37 @@ pub trait MailListingTrait: ListingTrait {
));
return;
}
*/
continue;
}
ListingAction::Tag(Remove(ref tag_str)) => {
if let Err(err) = op.set_tag(tag_str.to_string(), false) {
context.replies.push_back(UIEvent::Notification(
Some("Could not set tag.".to_string()),
err.to_string(),
Some(NotificationType::ERROR),
));
return;
match op.set_tag(tag_str.to_string(), false) {
Err(err) => {
context.replies.push_back(UIEvent::Notification(
Some("Could not set tag.".to_string()),
err.to_string(),
Some(NotificationType::ERROR),
));
return;
}
Ok(fut) => {
//FIXME
}
}
}
ListingAction::Tag(Add(ref tag_str)) => {
if let Err(err) = op.set_tag(tag_str.to_string(), true) {
context.replies.push_back(UIEvent::Notification(
Some("Could not set tag.".to_string()),
err.to_string(),
Some(NotificationType::ERROR),
));
return;
match op.set_tag(tag_str.to_string(), true) {
Err(err) => {
context.replies.push_back(UIEvent::Notification(
Some("Could not set tag.".to_string()),
err.to_string(),
Some(NotificationType::ERROR),
));
return;
}
Ok(fut) => {
// FIXME
}
}
}
_ => unreachable!(),

View File

@ -214,7 +214,7 @@ impl MailListingTrait for PlainListing {
let env_hash = self.get_env_under_cursor(self.cursor_pos.2, context);
let temp = (self.new_cursor_pos.0, self.new_cursor_pos.1, env_hash);
if !force && old_cursor_pos == self.new_cursor_pos {
self.view.update(temp);
self.view.update(temp, context);
} else if self.unfocused {
self.view = MailView::new(temp, None, None, context);
}

View File

@ -632,7 +632,7 @@ impl Component for ThreadListing {
);
if let Some(ref mut v) = self.view {
v.update(coordinates);
v.update(coordinates, context);
} else {
self.view = Some(MailView::new(coordinates, None, None, context));
}

File diff suppressed because it is too large Load Diff

View File

@ -95,7 +95,6 @@ impl EnvelopeView {
&body,
Some(Box::new(|a: &Attachment, v: &mut Vec<u8>| {
if a.content_type().is_text_html() {
use std::io::Write;
let settings = &context.settings;
if let Some(filter_invocation) = settings.pager.html_filter.as_ref() {
let command_obj = Command::new("sh")

View File

@ -961,7 +961,7 @@ impl Component for ThreadView {
self.coordinates.1,
self.entries[self.current_pos()].msg_hash,
);
self.mailview.update(coordinates);
self.mailview.update(coordinates, context);
}
if self.entries.len() == 1 {

View File

@ -156,6 +156,7 @@ pub enum JobRequest {
DeleteMailbox(oneshot::Receiver<Result<HashMap<MailboxHash, Mailbox>>>),
//RenameMailbox,
Search,
AsBytes,
SetMailboxPermissions(MailboxHash, oneshot::Receiver<Result<()>>),
SetMailboxSubscription(MailboxHash, oneshot::Receiver<Result<()>>),
Watch(JoinHandle),
@ -175,6 +176,7 @@ impl core::fmt::Debug for JobRequest {
JobRequest::DeleteMailbox(_) => write!(f, "{}", "JobRequest::DeleteMailbox"),
//JobRequest::RenameMailbox,
JobRequest::Search => write!(f, "{}", "JobRequest::Search"),
JobRequest::AsBytes => write!(f, "{}", "JobRequest::AsBytes"),
JobRequest::SetMailboxPermissions(_, _) => {
write!(f, "{}", "JobRequest::SetMailboxPermissions")
}
@ -1599,7 +1601,7 @@ impl Account {
}
}
//JobRequest::RenameMailbox,
JobRequest::Search => {
JobRequest::Search | JobRequest::AsBytes => {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
StatusEvent::JobFinished(job_id.clone()),

View File

@ -296,22 +296,24 @@ struct PluginOp {
}
impl BackendOp for PluginOp {
fn as_bytes(&mut self) -> Result<&[u8]> {
if let Some(ref bytes) = self.bytes {
return Ok(bytes.as_bytes());
}
if let Ok(mut channel) = self.channel.try_lock() {
channel.write_ref(&rmpv::ValueRef::Ext(BACKEND_OP_FN, b"as_bytes"))?;
debug!(channel.expect_ack())?;
channel.write_ref(&rmpv::ValueRef::Integer(self.hash.into()))?;
debug!(channel.expect_ack())?;
let bytes: Result<PluginResult<String>> = channel.from_read();
self.bytes = Some(bytes.map(Into::into).and_then(std::convert::identity)?);
Ok(self.bytes.as_ref().map(String::as_bytes).unwrap())
} else {
Err(MeliError::new("busy"))
}
fn as_bytes(&mut self) -> ResultFuture<Vec<u8>> {
let hash = self.hash;
let channel = self.channel.clone();
Ok(Box::pin(async move {
if let Ok(mut channel) = channel.try_lock() {
channel.write_ref(&rmpv::ValueRef::Ext(BACKEND_OP_FN, b"as_bytes"))?;
debug!(channel.expect_ack())?;
channel.write_ref(&rmpv::ValueRef::Integer(hash.into()))?;
debug!(channel.expect_ack())?;
let bytes: Result<PluginResult<String>> = channel.from_read();
Ok(bytes
.map(Into::into)
.and_then(std::convert::identity)?
.into_bytes())
} else {
Err(MeliError::new("busy"))
}
}))
}
fn fetch_flags(&self) -> ResultFuture<Flag> {