imap_async: add force parameter to {examine,select}_mailbox()

async
Manos Pitsidianakis 2020-06-29 17:55:51 +03:00
parent 21051fa862
commit adb9061adc
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
4 changed files with 54 additions and 65 deletions

View File

@ -1480,7 +1480,7 @@ async fn get_initial_max_uid(
conn.create_uid_msn_cache(mailbox_hash, 1).await?; conn.create_uid_msn_cache(mailbox_hash, 1).await?;
/* first SELECT the mailbox to get READ/WRITE permissions (because EXAMINE only /* first SELECT the mailbox to get READ/WRITE permissions (because EXAMINE only
* returns READ-ONLY for both cases) */ * returns READ-ONLY for both cases) */
conn.select_mailbox(mailbox_hash, &mut response) conn.select_mailbox(mailbox_hash, &mut response, true)
.await .await
.chain_err_summary(|| format!("Could not select mailbox {}", mailbox_path))?; .chain_err_summary(|| format!("Could not select mailbox {}", mailbox_path))?;
let mut examine_response = let mut examine_response =
@ -1541,7 +1541,8 @@ async fn get_initial_max_uid(
return Ok(0); return Ok(0);
} }
/* reselecting the same mailbox with EXAMINE prevents expunging it */ /* reselecting the same mailbox with EXAMINE prevents expunging it */
conn.examine_mailbox(mailbox_hash, &mut response).await?; conn.examine_mailbox(mailbox_hash, &mut response, false)
.await?;
if examine_response.uidnext == 0 { if examine_response.uidnext == 0 {
/* UIDNEXT shouldn't be 0, since exists != 0 at this point */ /* UIDNEXT shouldn't be 0, since exists != 0 at this point */
conn.send_command(format!("STATUS \"{}\" (UIDNEXT)", mailbox_path).as_bytes()) conn.send_command(format!("STATUS \"{}\" (UIDNEXT)", mailbox_path).as_bytes())
@ -1592,7 +1593,7 @@ async fn get_hlpr(
conn.create_uid_msn_cache(mailbox_hash, 1).await?; conn.create_uid_msn_cache(mailbox_hash, 1).await?;
/* first SELECT the mailbox to get READ/WRITE permissions (because EXAMINE only /* first SELECT the mailbox to get READ/WRITE permissions (because EXAMINE only
* returns READ-ONLY for both cases) */ * returns READ-ONLY for both cases) */
conn.select_mailbox(mailbox_hash, &mut response) conn.select_mailbox(mailbox_hash, &mut response, true)
.await .await
.chain_err_summary(|| format!("Could not select mailbox {}", mailbox_path))?; .chain_err_summary(|| format!("Could not select mailbox {}", mailbox_path))?;
let mut examine_response = let mut examine_response =
@ -1654,7 +1655,8 @@ async fn get_hlpr(
return Ok(Vec::new()); return Ok(Vec::new());
} }
/* reselecting the same mailbox with EXAMINE prevents expunging it */ /* reselecting the same mailbox with EXAMINE prevents expunging it */
conn.examine_mailbox(mailbox_hash, &mut response).await?; conn.examine_mailbox(mailbox_hash, &mut response, true)
.await?;
if examine_response.uidnext == 0 { if examine_response.uidnext == 0 {
/* UIDNEXT shouldn't be 0, since exists != 0 at this point */ /* UIDNEXT shouldn't be 0, since exists != 0 at this point */
conn.send_command(format!("STATUS \"{}\" (UIDNEXT)", mailbox_path).as_bytes()) conn.send_command(format!("STATUS \"{}\" (UIDNEXT)", mailbox_path).as_bytes())
@ -1676,12 +1678,11 @@ async fn get_hlpr(
*max_uid = Some(examine_response.uidnext - 1); *max_uid = Some(examine_response.uidnext - 1);
examine_response.uidnext - 1 examine_response.uidnext - 1
}; };
let chunk_size = 200; let chunk_size = 600;
let mut payload = vec![]; let mut payload = vec![];
if conn.current_mailbox != Some(mailbox_hash) { conn.examine_mailbox(mailbox_hash, &mut response, false)
conn.examine_mailbox(mailbox_hash, &mut response).await?; .await?;
}
if max_uid_left > 0 { if max_uid_left > 0 {
let mut envelopes = vec![]; let mut envelopes = vec![];
debug!("{} max_uid_left= {}", mailbox_hash, max_uid_left); debug!("{} max_uid_left= {}", mailbox_hash, max_uid_left);

View File

@ -24,8 +24,6 @@ use crate::backends::MailboxHash;
use crate::connections::Connection; use crate::connections::Connection;
use crate::email::parser::BytesExt; use crate::email::parser::BytesExt;
use crate::error::*; use crate::error::*;
use std::io::Read;
use std::io::Write;
extern crate native_tls; extern crate native_tls;
use futures::io::{AsyncReadExt, AsyncWriteExt}; use futures::io::{AsyncReadExt, AsyncWriteExt};
use native_tls::TlsConnector; use native_tls::TlsConnector;
@ -52,13 +50,26 @@ pub struct ImapStream {
protocol: ImapProtocol, protocol: ImapProtocol,
} }
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum MailboxSelection {
None,
Select(MailboxHash),
Examine(MailboxHash),
}
impl MailboxSelection {
pub fn take(&mut self) -> Self {
std::mem::replace(self, MailboxSelection::None)
}
}
#[derive(Debug)] #[derive(Debug)]
pub struct ImapConnection { pub struct ImapConnection {
pub stream: Result<ImapStream>, pub stream: Result<ImapStream>,
pub server_conf: ImapServerConf, pub server_conf: ImapServerConf,
pub capabilities: Capabilities, pub capabilities: Capabilities,
pub uid_store: Arc<UIDStore>, pub uid_store: Arc<UIDStore>,
pub current_mailbox: Option<MailboxHash>, pub current_mailbox: MailboxSelection,
} }
impl Drop for ImapStream { impl Drop for ImapStream {
@ -427,7 +438,7 @@ impl ImapConnection {
server_conf: server_conf.clone(), server_conf: server_conf.clone(),
capabilities: Capabilities::default(), capabilities: Capabilities::default(),
uid_store, uid_store,
current_mailbox: None, current_mailbox: MailboxSelection::None,
} }
} }
@ -545,44 +556,15 @@ impl ImapConnection {
Ok(()) Ok(())
} }
/*
pub fn try_send(
&mut self,
mut action: impl FnMut(&mut ImapStream) -> Result<()>,
) -> Result<()> {
if let (instant, ref mut status @ Ok(())) = *self.uid_store.is_online.lock().unwrap() {
if Instant::now().duration_since(instant) >= std::time::Duration::new(60 * 30, 0) {
*status = Err(MeliError::new("Connection timed out"));
self.stream = Err(MeliError::new("Connection timed out"));
}
}
if let Ok(ref mut stream) = self.stream {
if let Ok(_) = action(stream).await {
self.uid_store.is_online.lock().unwrap().0 = Instant::now();
return Ok(());
}
}
let new_stream = ImapStream::new_connection(&self.server_conf).await;
if new_stream.is_err() {
*self.uid_store.is_online.lock().unwrap() = (
Instant::now(),
Err(new_stream.as_ref().unwrap_err().clone()),
);
} else {
*self.uid_store.is_online.lock().unwrap() = (Instant::now(), Ok(()));
}
let (capabilities, stream) = new_stream?;
self.stream = Ok(stream);
self.capabilities = capabilities;
Err(MeliError::new("Connection timed out"))
}
*/
pub async fn select_mailbox( pub async fn select_mailbox(
&mut self, &mut self,
mailbox_hash: MailboxHash, mailbox_hash: MailboxHash,
ret: &mut String, ret: &mut String,
force: bool,
) -> Result<()> { ) -> Result<()> {
if !force && self.current_mailbox == MailboxSelection::Select(mailbox_hash) {
return Ok(());
}
self.send_command( self.send_command(
format!( format!(
"SELECT \"{}\"", "SELECT \"{}\"",
@ -594,7 +576,7 @@ impl ImapConnection {
self.read_response(ret, RequiredResponses::SELECT_REQUIRED) self.read_response(ret, RequiredResponses::SELECT_REQUIRED)
.await?; .await?;
debug!("select response {}", ret); debug!("select response {}", ret);
self.current_mailbox = Some(mailbox_hash); self.current_mailbox = MailboxSelection::Select(mailbox_hash);
Ok(()) Ok(())
} }
@ -602,7 +584,11 @@ impl ImapConnection {
&mut self, &mut self,
mailbox_hash: MailboxHash, mailbox_hash: MailboxHash,
ret: &mut String, ret: &mut String,
force: bool,
) -> Result<()> { ) -> Result<()> {
if !force && self.current_mailbox == MailboxSelection::Examine(mailbox_hash) {
return Ok(());
}
self.send_command( self.send_command(
format!( format!(
"EXAMINE \"{}\"", "EXAMINE \"{}\"",
@ -614,12 +600,14 @@ impl ImapConnection {
self.read_response(ret, RequiredResponses::EXAMINE_REQUIRED) self.read_response(ret, RequiredResponses::EXAMINE_REQUIRED)
.await?; .await?;
debug!("examine response {}", ret); debug!("examine response {}", ret);
self.current_mailbox = Some(mailbox_hash); self.current_mailbox = MailboxSelection::Examine(mailbox_hash);
Ok(()) Ok(())
} }
pub async fn unselect(&mut self) -> Result<()> { pub async fn unselect(&mut self) -> Result<()> {
if let Some(mailbox_hash) = self.current_mailbox.take() { match self.current_mailbox.take() {
MailboxSelection::Examine(mailbox_hash) |
MailboxSelection::Select(mailbox_hash) => {
let mut response = String::with_capacity(8 * 1024); let mut response = String::with_capacity(8 * 1024);
if self if self
.capabilities .capabilities
@ -635,9 +623,11 @@ impl ImapConnection {
* reselecting the same mailbox with EXAMINE command)[..] * reselecting the same mailbox with EXAMINE command)[..]
*/ */
self.select_mailbox(mailbox_hash, &mut response).await?; self.select_mailbox(mailbox_hash, &mut response, true).await?;
self.examine_mailbox(mailbox_hash, &mut response).await?; self.examine_mailbox(mailbox_hash, &mut response, true).await?;
} }
},
MailboxSelection::None => {},
} }
Ok(()) Ok(())
} }
@ -660,13 +650,8 @@ impl ImapConnection {
) -> Result<()> { ) -> Result<()> {
debug_assert!(low > 0); debug_assert!(low > 0);
let mut response = String::new(); let mut response = String::new();
if self self.examine_mailbox(mailbox_hash, &mut response, false)
.current_mailbox .await?;
.map(|h| h != mailbox_hash)
.unwrap_or(true)
{
self.examine_mailbox(mailbox_hash, &mut response).await?;
}
self.send_command(format!("UID SEARCH {}:*", low).as_bytes()) self.send_command(format!("UID SEARCH {}:*", low).as_bytes())
.await?; .await?;
self.read_response(&mut response, RequiredResponses::SEARCH) self.read_response(&mut response, RequiredResponses::SEARCH)

View File

@ -81,7 +81,7 @@ impl BackendOp for ImapOp {
let mut response = String::with_capacity(8 * 1024); let mut response = String::with_capacity(8 * 1024);
{ {
let mut conn = self.connection.lock().await; let mut conn = self.connection.lock().await;
conn.examine_mailbox(self.mailbox_hash, &mut response) conn.examine_mailbox(self.mailbox_hash, &mut response, false)
.await?; .await?;
conn.send_command( conn.send_command(
format!("UID FETCH {} (FLAGS RFC822)", self.uid).as_bytes(), format!("UID FETCH {} (FLAGS RFC822)", self.uid).as_bytes(),
@ -129,7 +129,7 @@ impl BackendOp for ImapOp {
futures::executor::block_on(async { futures::executor::block_on(async {
let mut response = String::with_capacity(8 * 1024); let mut response = String::with_capacity(8 * 1024);
let mut conn = self.connection.lock().await; let mut conn = self.connection.lock().await;
conn.examine_mailbox(self.mailbox_hash, &mut response) conn.examine_mailbox(self.mailbox_hash, &mut response, false)
.await?; .await?;
conn.send_command(format!("UID FETCH {} FLAGS", self.uid).as_bytes()) conn.send_command(format!("UID FETCH {} FLAGS", self.uid).as_bytes())
.await?; .await?;
@ -179,7 +179,8 @@ impl BackendOp for ImapOp {
let uid_store = self.uid_store.clone(); let uid_store = self.uid_store.clone();
Ok(Box::pin(async move { Ok(Box::pin(async move {
let mut conn = connection.lock().await; let mut conn = connection.lock().await;
conn.select_mailbox(mailbox_hash, &mut response).await?; conn.select_mailbox(mailbox_hash, &mut response, false)
.await?;
debug!(&response); debug!(&response);
conn.send_command( conn.send_command(
format!( format!(
@ -225,7 +226,8 @@ impl BackendOp for ImapOp {
let uid_store = self.uid_store.clone(); let uid_store = self.uid_store.clone();
Ok(Box::pin(async move { Ok(Box::pin(async move {
let mut conn = connection.lock().await; let mut conn = connection.lock().await;
conn.select_mailbox(mailbox_hash, &mut response).await?; conn.select_mailbox(mailbox_hash, &mut response, false)
.await?;
conn.send_command( conn.send_command(
format!( format!(
"UID STORE {} {}FLAGS.SILENT ({})", "UID STORE {} {}FLAGS.SILENT ({})",

View File

@ -219,7 +219,7 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> {
conn, conn,
mailbox_hash, mailbox_hash,
thread_id, thread_id,
conn.examine_mailbox(mailbox_hash, &mut response).await conn.examine_mailbox(mailbox_hash, &mut response, false).await
conn.send_command(b"UID SEARCH RECENT").await conn.send_command(b"UID SEARCH RECENT").await
conn.read_response(&mut response, RequiredResponses::SEARCH).await conn.read_response(&mut response, RequiredResponses::SEARCH).await
); );
@ -368,7 +368,7 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> {
conn, conn,
mailbox_hash, mailbox_hash,
thread_id, thread_id,
conn.examine_mailbox(mailbox_hash, &mut response).await conn.examine_mailbox(mailbox_hash, &mut response, false).await
conn.send_command( conn.send_command(
&[ &[
b"FETCH", b"FETCH",
@ -464,7 +464,7 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> {
conn, conn,
mailbox_hash, mailbox_hash,
thread_id, thread_id,
conn.examine_mailbox(mailbox_hash, &mut response).await conn.examine_mailbox(mailbox_hash, &mut response, false).await
conn.send_command( conn.send_command(
&[ &[
b"UID SEARCH ", b"UID SEARCH ",
@ -533,7 +533,8 @@ pub async fn examine_updates(
conn, conn,
mailbox_hash, mailbox_hash,
thread_id, thread_id,
conn.examine_mailbox(mailbox_hash, &mut response).await conn.examine_mailbox(mailbox_hash, &mut response, true)
.await
); );
*uid_store.is_online.lock().unwrap() = (Instant::now(), Ok(())); *uid_store.is_online.lock().unwrap() = (Instant::now(), Ok(()));
let uidvalidity; let uidvalidity;