melib/imap: turn some sync connections to unsync

Signed-off-by: Manos Pitsidianakis <manos@pitsidianak.is>
pull/371/head
Manos Pitsidianakis 2024-03-16 23:47:30 +02:00
parent 4e941a9e8b
commit ec01a4412a
Signed by: Manos Pitsidianakis
GPG Key ID: 7729C7707F7E09D0
1 changed files with 67 additions and 44 deletions

View File

@ -19,11 +19,15 @@
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use super::protocol_parser::{ImapLineSplit, ImapResponse, RequiredResponses, SelectResponse};
use crate::{
backends::{BackendEvent, MailboxHash, RefreshEvent},
email::parser::BytesExt,
error::*,
imap::{
protocol_parser::{self, ImapLineSplit, ImapResponse, RequiredResponses, SelectResponse},
Capabilities, ImapServerConf, UIDStore,
},
text::Truncate,
utils::{
connections::{std_net::connect as tcp_stream_connect, Connection},
futures::timeout,
@ -72,8 +76,6 @@ macro_rules! imap_log {
};
}
use super::{protocol_parser, Capabilities, ImapServerConf, UIDStore};
#[derive(Clone, Copy, Debug)]
pub enum SyncPolicy {
None,
@ -170,10 +172,12 @@ impl ImapStream {
.build()
.chain_err_kind(ErrorKind::Network(NetworkErrorKind::InvalidTLSConnection))?;
let addr = (path.as_str(), server_conf.server_port);
let mut socket = AsyncWrapper::new({
let conn = Connection::new_tcp(tcp_stream_connect(addr, server_conf.timeout)?);
let addr = (path.clone(), server_conf.server_port);
let timeout = server_conf.timeout;
let conn = Connection::new_tcp(
smol::unblock(move || tcp_stream_connect(addr, timeout)).await?,
);
#[cfg(feature = "imap-trace")]
{
conn.trace(true).with_id("imap")
@ -244,39 +248,50 @@ impl ImapStream {
}
{
// [ref:FIXME]: This is blocking
let socket = socket.into_inner()?;
let mut conn_result = connector.connect(path, socket);
if let Err(native_tls::HandshakeError::WouldBlock(midhandshake_stream)) =
conn_result
{
let mut midhandshake_stream = Some(midhandshake_stream);
loop {
match midhandshake_stream.take().unwrap().handshake() {
Ok(r) => {
conn_result = Ok(r);
break;
}
Err(native_tls::HandshakeError::WouldBlock(stream)) => {
midhandshake_stream = Some(stream);
}
p => {
p.chain_err_kind(ErrorKind::Network(
NetworkErrorKind::InvalidTLSConnection,
))?;
let path = Arc::new(path.to_string());
let conn = smol::unblock({
let socket = socket.into_inner()?;
let path = Arc::clone(&path);
move || {
let conn_result = connector.connect(&path, socket);
if let Err(native_tls::HandshakeError::WouldBlock(midhandshake_stream)) =
conn_result
{
let mut midhandshake_stream = Some(midhandshake_stream);
loop {
match midhandshake_stream.take().unwrap().handshake() {
Ok(r) => {
return Ok(r);
}
Err(native_tls::HandshakeError::WouldBlock(stream)) => {
midhandshake_stream = Some(stream);
}
Err(err) => {
return Err(Error::from(err).set_kind(ErrorKind::Network(
NetworkErrorKind::InvalidTLSConnection,
)));
}
}
}
}
conn_result.chain_err_kind(ErrorKind::Network(
NetworkErrorKind::InvalidTLSConnection,
))
}
}
AsyncWrapper::new(Connection::new_tls(conn_result.chain_err_summary(
|| format!("Could not initiate TLS negotiation to {}.", path),
)?))
.chain_err_summary(|| format!("Could not initiate TLS negotiation to {}.", path))?
})
.await
.chain_err_summary(|| format!("Could not initiate TLS negotiation to {}.", path))?;
AsyncWrapper::new(Connection::new_tls(conn))
.chain_err_summary(|| format!("{} connection failed.", path))
.chain_err_kind(ErrorKind::OSError)?
}
} else {
let addr = (path.as_str(), server_conf.server_port);
AsyncWrapper::new({
let conn = Connection::new_tcp(tcp_stream_connect(addr, server_conf.timeout)?);
let addr = (path.clone(), server_conf.server_port);
let timeout = server_conf.timeout;
let conn = Connection::new_tcp(
smol::unblock(move || tcp_stream_connect(addr, timeout)).await?,
);
#[cfg(feature = "imap-trace")]
{
conn.trace(true).with_id("imap")
@ -335,16 +350,24 @@ impl ImapStream {
.map(|(_, v)| v)
});
if capabilities.is_err() {
return Err(Error::new(format!(
"Could not connect to {}: expected CAPABILITY response but got:{}",
&server_conf.server_hostname,
String::from_utf8_lossy(&res)
))
.set_kind(ErrorKind::ProtocolError));
}
let capabilities = match capabilities {
Err(_err) => {
log::debug!(
"Could not connect to {}: expected CAPABILITY response but got: {} `{}`",
&server_conf.server_hostname,
_err,
String::from_utf8_lossy(&res)
);
return Err(Error::new(format!(
"Could not connect to {}: expected CAPABILITY response but got: `{}`",
&server_conf.server_hostname,
String::from_utf8_lossy(&res).as_ref().trim_at_boundary(40)
))
.set_kind(ErrorKind::ProtocolError));
}
Ok(v) => v,
};
let capabilities = capabilities.unwrap();
if !capabilities
.iter()
.any(|cap| cap.eq_ignore_ascii_case(b"IMAP4rev1"))
@ -533,9 +556,9 @@ impl ImapStream {
match self.protocol {
ImapProtocol::IMAP { .. } => {
if matches!(command.body, CommandBody::Login { .. }) {
imap_log!(trace, self, "sent: M{} LOGIN ..", self.cmd_id - 1);
imap_log!(trace, self, "sent: M{} LOGIN ..", self.cmd_id);
} else {
imap_log!(trace, self, "sent: M{} {:?}", self.cmd_id - 1, command.body);
imap_log!(trace, self, "sent: M{} {:?}", self.cmd_id, command.body);
}
}
ImapProtocol::ManageSieve => {}