imap: allow conn to be offline and retry on demand

Split the TlsStream of the live IMAP connection to an enum to allow both
offline and online states. The connection is restarted if offline when
requested.
embed
Manos Pitsidianakis 2019-09-20 09:07:55 +03:00
parent b98ce8828c
commit 7dc3efaedd
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
2 changed files with 201 additions and 99 deletions

View File

@ -65,7 +65,6 @@ pub struct ImapType {
danger_accept_invalid_certs: bool, danger_accept_invalid_certs: bool,
connection: Arc<Mutex<ImapConnection>>, connection: Arc<Mutex<ImapConnection>>,
capabilities: Capabilities,
folders: FnvHashMap<FolderHash, ImapFolder>, folders: FnvHashMap<FolderHash, ImapFolder>,
hash_index: Arc<Mutex<FnvHashMap<EnvelopeHash, (UID, FolderHash)>>>, hash_index: Arc<Mutex<FnvHashMap<EnvelopeHash, (UID, FolderHash)>>>,
uid_index: Arc<Mutex<FnvHashMap<usize, EnvelopeHash>>>, uid_index: Arc<Mutex<FnvHashMap<usize, EnvelopeHash>>>,
@ -174,15 +173,19 @@ impl MailBackend for ImapType {
sender: RefreshEventConsumer, sender: RefreshEventConsumer,
work_context: WorkContext, work_context: WorkContext,
) -> Result<std::thread::ThreadId> { ) -> Result<std::thread::ThreadId> {
let has_idle: bool = self.capabilities.contains(&b"IDLE"[0..]); let has_idle: bool = self
.connection
.lock()
.unwrap()
.capabilities
.contains(&b"IDLE"[0..]);
let folders = self.folders.clone(); let folders = self.folders.clone();
let conn = ImapConnection::new_connection( let conn = ImapConnection::new_connection(
self.server_hostname.clone(), self.server_hostname.clone(),
self.server_username.clone(), self.server_username.clone(),
self.server_password.clone(), self.server_password.clone(),
self.danger_accept_invalid_certs, self.danger_accept_invalid_certs,
)? );
.1;
let main_conn = self.connection.clone(); let main_conn = self.connection.clone();
let hash_index = self.hash_index.clone(); let hash_index = self.hash_index.clone();
let uid_index = self.uid_index.clone(); let uid_index = self.uid_index.clone();
@ -385,31 +388,6 @@ macro_rules! get_conf_val {
}; };
} }
macro_rules! exit_on_error {
($s:ident, $($result:expr)+) => {
$(if let Err(e) = $result {
eprintln!(
"IMAP error ({}): {}",
$s.name.as_str(),
e.to_string(),
);
std::process::exit(1);
})+
};
($s:ident returning $result:expr) => {
match $result {
Err(e) => {
eprintln!(
"IMAP error ({}): {}",
$s.name.as_str(),
e.to_string(),
);
std::process::exit(1);
},
Ok(v) => v,
}
}
}
impl ImapType { impl ImapType {
pub fn new(s: &AccountSettings, is_subscribed: Box<dyn Fn(&str) -> bool>) -> Self { pub fn new(s: &AccountSettings, is_subscribed: Box<dyn Fn(&str) -> bool>) -> Self {
debug!(s); debug!(s);
@ -418,7 +396,12 @@ impl ImapType {
let server_password = get_conf_val!(s["server_password"]); let server_password = get_conf_val!(s["server_password"]);
let danger_accept_invalid_certs: bool = let danger_accept_invalid_certs: bool =
get_conf_val!(s["danger_accept_invalid_certs"], false); get_conf_val!(s["danger_accept_invalid_certs"], false);
let (capabilities, connection) = exit_on_error!(s returning ImapConnection::new_connection(server_hostname.to_string(), server_username.to_string(), server_password.to_string(), danger_accept_invalid_certs)); let connection = ImapConnection::new_connection(
server_hostname.to_string(),
server_username.to_string(),
server_password.to_string(),
danger_accept_invalid_certs,
);
let mut m = ImapType { let mut m = ImapType {
account_name: s.name().to_string(), account_name: s.name().to_string(),
@ -430,16 +413,9 @@ impl ImapType {
danger_accept_invalid_certs, danger_accept_invalid_certs,
hash_index: Default::default(), hash_index: Default::default(),
uid_index: Default::default(), uid_index: Default::default(),
capabilities,
byte_cache: Default::default(), byte_cache: Default::default(),
}; };
debug!(m
.capabilities
.iter()
.map(|s| String::from_utf8(s.to_vec()).unwrap())
.collect::<Vec<String>>());
m.folders = m.imap_folders(); m.folders = m.imap_folders();
m.folders.retain(|_, f| is_subscribed(f.path())); m.folders.retain(|_, f| is_subscribed(f.path()));
let keys = m let keys = m
@ -459,9 +435,7 @@ impl ImapType {
self.server_username.clone(), self.server_username.clone(),
self.server_password.clone(), self.server_password.clone(),
self.danger_accept_invalid_certs, self.danger_accept_invalid_certs,
) );
.unwrap()
.1;
let mut res = String::with_capacity(8 * 1024); let mut res = String::with_capacity(8 * 1024);
conn.read_response(&mut res).unwrap(); conn.read_response(&mut res).unwrap();
debug!("out: {}", &res); debug!("out: {}", &res);

View File

@ -21,7 +21,6 @@
use crate::email::parser::BytesExt; use crate::email::parser::BytesExt;
use crate::error::*; use crate::error::*;
use crate::logging::{LoggingLevel::*, *};
use std::io::Read; use std::io::Read;
use std::io::Write; use std::io::Write;
extern crate native_tls; extern crate native_tls;
@ -33,23 +32,29 @@ use std::net::SocketAddr;
use super::protocol_parser; use super::protocol_parser;
use super::Capabilities; use super::Capabilities;
#[derive(Debug)]
pub struct ImapStream {
cmd_id: usize,
stream: native_tls::TlsStream<std::net::TcpStream>,
}
#[derive(Debug)] #[derive(Debug)]
pub struct ImapConnection { pub struct ImapConnection {
pub cmd_id: usize, pub stream: Result<ImapStream>,
pub stream: native_tls::TlsStream<std::net::TcpStream>,
server_hostname: String, server_hostname: String,
server_username: String, server_username: String,
server_password: String, server_password: String,
danger_accept_invalid_certs: bool, danger_accept_invalid_certs: bool,
pub capabilities: Capabilities,
} }
impl Drop for ImapConnection { impl Drop for ImapStream {
fn drop(&mut self) { fn drop(&mut self) {
self.send_command(b"LOGOUT").ok().take(); self.send_command(b"LOGOUT").ok().take();
} }
} }
impl ImapConnection { impl ImapStream {
pub fn read_response(&mut self, ret: &mut String) -> Result<()> { pub fn read_response(&mut self, ret: &mut String) -> Result<()> {
let id = format!("M{} ", self.cmd_id - 1); let id = format!("M{} ", self.cmd_id - 1);
self.read_lines(ret, id) self.read_lines(ret, id)
@ -59,7 +64,6 @@ impl ImapConnection {
let mut buf: [u8; 1024] = [0; 1024]; let mut buf: [u8; 1024] = [0; 1024];
ret.clear(); ret.clear();
let mut last_line_idx: usize = 0; let mut last_line_idx: usize = 0;
let mut connection_tries = 0;
loop { loop {
match self.stream.read(&mut buf) { match self.stream.read(&mut buf) {
Ok(0) => break, Ok(0) => break,
@ -92,34 +96,7 @@ impl ImapConnection {
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
continue; continue;
} }
Err(_) if connection_tries == 0 => {
let server_hostname =
std::mem::replace(&mut self.server_hostname, String::new());
let server_username =
std::mem::replace(&mut self.server_username, String::new());
let server_password =
std::mem::replace(&mut self.server_password, String::new());
log(
format!(
"IMAP connection to `{}` failed. Retrying one more time...",
&server_hostname
),
ERROR,
);
*self = ImapConnection::new_connection(
server_hostname,
server_username,
server_password,
self.danger_accept_invalid_certs,
)?
.1;
connection_tries += 1;
}
Err(e) => { Err(e) => {
log(
format!("IMAP connection to `{}` failed.", &self.server_hostname),
FATAL,
);
return Err(MeliError::from(e)); return Err(MeliError::from(e));
} }
} }
@ -169,11 +146,11 @@ impl ImapConnection {
} }
pub fn new_connection( pub fn new_connection(
server_hostname: String, server_hostname: &str,
server_username: String, server_username: &str,
server_password: String, server_password: &str,
danger_accept_invalid_certs: bool, danger_accept_invalid_certs: bool,
) -> Result<(Capabilities, ImapConnection)> { ) -> Result<(Capabilities, ImapStream)> {
use std::io::prelude::*; use std::io::prelude::*;
use std::net::TcpStream; use std::net::TcpStream;
let path = &server_hostname; let path = &server_hostname;
@ -249,20 +226,9 @@ impl ImapConnection {
} }
conn_result? conn_result?
}; };
let mut ret = ImapConnection { let mut ret = ImapStream { cmd_id, stream };
cmd_id,
stream,
server_hostname,
server_username,
server_password,
danger_accept_invalid_certs,
};
ret.send_command( ret.send_command(
format!( format!("LOGIN \"{}\" \"{}\"", &server_username, &server_password).as_bytes(),
"LOGIN \"{}\" \"{}\"",
&ret.server_username, &ret.server_password
)
.as_bytes(),
)?; )?;
let mut res = String::with_capacity(8 * 1024); let mut res = String::with_capacity(8 * 1024);
ret.read_lines(&mut res, String::new())?; ret.read_lines(&mut res, String::new())?;
@ -273,6 +239,151 @@ impl ImapConnection {
} }
} }
impl ImapConnection {
pub fn new_connection(
server_hostname: String,
server_username: String,
server_password: String,
danger_accept_invalid_certs: bool,
) -> ImapConnection {
ImapConnection {
stream: Err(MeliError::new("Offline".to_string())),
server_hostname,
server_username,
server_password,
danger_accept_invalid_certs,
capabilities: Capabilities::default(),
}
}
pub fn read_response(&mut self, ret: &mut String) -> Result<()> {
if let Ok(ref mut stream) = self.stream {
return stream.read_response(ret);
}
let (capabilities, mut stream) = ImapStream::new_connection(
&self.server_hostname,
&self.server_username,
&self.server_password,
self.danger_accept_invalid_certs,
)?;
let ret = stream.read_response(ret);
if ret.is_ok() {
self.stream = Ok(stream);
self.capabilities = capabilities;
}
ret
}
pub fn read_lines(&mut self, ret: &mut String, termination_string: String) -> Result<()> {
if let Ok(ref mut stream) = self.stream {
return stream.read_lines(ret, termination_string);
}
let (capabilities, mut stream) = ImapStream::new_connection(
&self.server_hostname,
&self.server_username,
&self.server_password,
self.danger_accept_invalid_certs,
)?;
let ret = stream.read_lines(ret, termination_string);
if ret.is_ok() {
self.stream = Ok(stream);
self.capabilities = capabilities;
}
ret
}
pub fn wait_for_continuation_request(&mut self) -> Result<()> {
if let Ok(ref mut stream) = self.stream {
return stream.wait_for_continuation_request();
}
let (capabilities, mut stream) = ImapStream::new_connection(
&self.server_hostname,
&self.server_username,
&self.server_password,
self.danger_accept_invalid_certs,
)?;
let ret = stream.wait_for_continuation_request();
if ret.is_ok() {
self.stream = Ok(stream);
self.capabilities = capabilities;
}
ret
}
pub fn send_command(&mut self, command: &[u8]) -> Result<usize> {
if let Ok(ref mut stream) = self.stream {
return stream.send_command(command);
}
let (capabilities, mut stream) = ImapStream::new_connection(
&self.server_hostname,
&self.server_username,
&self.server_password,
self.danger_accept_invalid_certs,
)?;
let ret = stream.send_command(command);
if ret.is_ok() {
self.stream = Ok(stream);
self.capabilities = capabilities;
}
ret
}
pub fn send_literal(&mut self, data: &[u8]) -> Result<()> {
if let Ok(ref mut stream) = self.stream {
return stream.send_literal(data);
}
let (capabilities, mut stream) = ImapStream::new_connection(
&self.server_hostname,
&self.server_username,
&self.server_password,
self.danger_accept_invalid_certs,
)?;
let ret = stream.send_literal(data);
if ret.is_ok() {
self.stream = Ok(stream);
self.capabilities = capabilities;
}
ret
}
pub fn send_raw(&mut self, raw: &[u8]) -> Result<()> {
if let Ok(ref mut stream) = self.stream {
return stream.send_raw(raw);
}
let (capabilities, mut stream) = ImapStream::new_connection(
&self.server_hostname,
&self.server_username,
&self.server_password,
self.danger_accept_invalid_certs,
)?;
let ret = stream.send_raw(raw);
if ret.is_ok() {
self.stream = Ok(stream);
self.capabilities = capabilities;
}
ret
}
pub fn set_nonblocking(&mut self, val: bool) -> Result<()> {
if let Ok(ref mut stream) = self.stream {
return stream.set_nonblocking(val);
}
let (capabilities, mut stream) = ImapStream::new_connection(
&self.server_hostname,
&self.server_username,
&self.server_password,
self.danger_accept_invalid_certs,
)?;
let ret = stream.set_nonblocking(val);
if ret.is_ok() {
self.stream = Ok(stream);
self.capabilities = capabilities;
}
ret
}
}
pub struct ImapBlockingConnection { pub struct ImapBlockingConnection {
buf: [u8; 1024], buf: [u8; 1024],
result: Vec<u8>, result: Vec<u8>,
@ -282,9 +393,7 @@ pub struct ImapBlockingConnection {
impl From<ImapConnection> for ImapBlockingConnection { impl From<ImapConnection> for ImapBlockingConnection {
fn from(mut conn: ImapConnection) -> Self { fn from(mut conn: ImapConnection) -> Self {
conn.stream conn.set_nonblocking(false)
.get_mut()
.set_nonblocking(false)
.expect("set_nonblocking call failed"); .expect("set_nonblocking call failed");
ImapBlockingConnection { ImapBlockingConnection {
buf: [0; 1024], buf: [0; 1024],
@ -306,15 +415,34 @@ impl Iterator for ImapBlockingConnection {
fn next(&mut self) -> Option<Self::Item> { fn next(&mut self) -> Option<Self::Item> {
self.result.drain(0..self.prev_res_length); self.result.drain(0..self.prev_res_length);
self.prev_res_length = 0; self.prev_res_length = 0;
let ImapBlockingConnection {
ref mut prev_res_length,
ref mut result,
ref mut conn,
ref mut buf,
} = self;
loop { loop {
match self.conn.stream.read(&mut self.buf) { if conn.stream.is_err() {
if let Ok((_, stream)) = ImapStream::new_connection(
&conn.server_hostname,
&conn.server_username,
&conn.server_password,
conn.danger_accept_invalid_certs,
) {
conn.stream = Ok(stream);
} else {
debug!(&conn.stream);
return None;
}
}
match conn.stream.as_mut().unwrap().stream.read(buf) {
Ok(0) => continue, Ok(0) => continue,
Ok(b) => { Ok(b) => {
self.result.extend_from_slice(&self.buf[0..b]); result.extend_from_slice(&buf[0..b]);
debug!(unsafe { std::str::from_utf8_unchecked(&self.result) }); debug!(unsafe { std::str::from_utf8_unchecked(result) });
if let Some(pos) = self.result.find(b"\r\n") { if let Some(pos) = result.find(b"\r\n") {
self.prev_res_length = pos + b"\r\n".len(); *prev_res_length = pos + b"\r\n".len();
return Some(self.result[0..self.prev_res_length].to_vec()); return Some(result[0..*prev_res_length].to_vec());
} }
} }
Err(e) => { Err(e) => {