imap: fix cases that would block connection

Fix blocking if TLS negotiation can't start

Fix blocking if IDLE connection dies.
jmap
Manos Pitsidianakis 2019-11-10 13:30:33 +02:00
parent a907b9c21d
commit 580f0be8a4
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
2 changed files with 248 additions and 225 deletions

View File

@ -163,6 +163,8 @@ impl ImapStream {
}; };
let mut socket = TcpStream::connect(&addr)?; let mut socket = TcpStream::connect(&addr)?;
socket.set_read_timeout(Some(std::time::Duration::new(120, 0)))?;
socket.set_write_timeout(Some(std::time::Duration::new(120, 0)))?;
let cmd_id = 0; let cmd_id = 0;
if server_conf.use_starttls { if server_conf.use_starttls {
socket.write_all(format!("M{} STARTTLS\r\n", cmd_id).as_bytes())?; socket.write_all(format!("M{} STARTTLS\r\n", cmd_id).as_bytes())?;
@ -170,7 +172,10 @@ impl ImapStream {
let mut buf = vec![0; 1024]; let mut buf = vec![0; 1024];
let mut response = String::with_capacity(1024); let mut response = String::with_capacity(1024);
let mut cap_flag = false; let mut cap_flag = false;
loop { let mut broken = false;
let now = std::time::Instant::now();
while now.elapsed().as_secs() < 3 {
let len = socket.read(&mut buf)?; let len = socket.read(&mut buf)?;
response.push_str(unsafe { std::str::from_utf8_unchecked(&buf[0..len]) }); response.push_str(unsafe { std::str::from_utf8_unchecked(&buf[0..len]) });
if !cap_flag { if !cap_flag {
@ -190,15 +195,21 @@ impl ImapStream {
} }
} }
if cap_flag && response == "M0 OK Begin TLS negotiation now.\r\n" { if cap_flag && response == "M0 OK Begin TLS negotiation now.\r\n" {
broken = true;
break; break;
} }
} }
if !broken {
return Err(MeliError::new(format!(
"Could not initiate TLS negotiation to {}.",
path
)));
}
} }
socket socket
.set_nonblocking(true) .set_nonblocking(true)
.expect("set_nonblocking call failed"); .expect("set_nonblocking call failed");
socket.set_read_timeout(Some(std::time::Duration::new(120, 0)))?;
let stream = { let stream = {
let mut conn_result = connector.connect(path, socket); let mut conn_result = connector.connect(path, socket);
if let Err(native_tls::HandshakeError::WouldBlock(midhandshake_stream)) = conn_result { if let Err(native_tls::HandshakeError::WouldBlock(midhandshake_stream)) = conn_result {
@ -364,6 +375,18 @@ impl From<ImapConnection> for ImapBlockingConnection {
fn from(mut conn: ImapConnection) -> Self { fn from(mut conn: ImapConnection) -> Self {
conn.set_nonblocking(false) conn.set_nonblocking(false)
.expect("set_nonblocking call failed"); .expect("set_nonblocking call failed");
conn.stream.as_mut().map(|s| {
s.stream
.get_mut()
.set_write_timeout(Some(std::time::Duration::new(5 * 60, 0)))
.expect("set_write_timeout call failed")
});
conn.stream.as_mut().map(|s| {
s.stream
.get_mut()
.set_read_timeout(Some(std::time::Duration::new(5 * 60, 0)))
.expect("set_read_timeout call failed")
});
ImapBlockingConnection { ImapBlockingConnection {
buf: [0; 1024], buf: [0; 1024],
conn, conn,
@ -392,12 +415,8 @@ impl Iterator for ImapBlockingConnection {
} = self; } = self;
loop { loop {
if conn.stream.is_err() { if conn.stream.is_err() {
if let Ok((_, stream)) = ImapStream::new_connection(&conn.server_conf) { debug!(&conn.stream);
conn.stream = Ok(stream); return None;
} else {
debug!(&conn.stream);
return None;
}
} }
match conn.stream.as_mut().unwrap().stream.read(buf) { match conn.stream.as_mut().unwrap().stream.read(buf) {
Ok(0) => continue, Ok(0) => continue,

View File

@ -175,227 +175,113 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
let _26_mins = std::time::Duration::from_secs(26 * 60); let _26_mins = std::time::Duration::from_secs(26 * 60);
/* duration interval to check other folders for changes */ /* duration interval to check other folders for changes */
let _5_mins = std::time::Duration::from_secs(5 * 60); let _5_mins = std::time::Duration::from_secs(5 * 60);
loop { while let Some(line) = iter.next() {
while let Some(line) = iter.next() { let now = std::time::Instant::now();
let now = std::time::Instant::now(); if now.duration_since(beat) >= _26_mins {
if now.duration_since(beat) >= _26_mins { exit_on_error!(
exit_on_error!( sender,
sender, folder_hash,
folder_hash, work_context,
work_context, thread_id,
thread_id, iter.conn.set_nonblocking(true)
iter.conn.set_nonblocking(true) iter.conn.send_raw(b"DONE")
iter.conn.send_raw(b"DONE") iter.conn.read_response(&mut response)
iter.conn.read_response(&mut response) iter.conn.send_command(b"IDLE")
iter.conn.send_command(b"IDLE") iter.conn.set_nonblocking(false)
iter.conn.set_nonblocking(false) main_conn.lock().unwrap().send_command(b"NOOP")
main_conn.lock().unwrap().send_command(b"NOOP") main_conn.lock().unwrap().read_response(&mut response)
main_conn.lock().unwrap().read_response(&mut response) );
); beat = now;
beat = now; }
} if now.duration_since(watch) >= _5_mins {
if now.duration_since(watch) >= _5_mins { /* Time to poll the other inboxes */
/* Time to poll the other inboxes */ exit_on_error!(
exit_on_error!( sender,
sender, folder_hash,
folder_hash, work_context,
work_context, thread_id,
thread_id, iter.conn.set_nonblocking(true)
iter.conn.set_nonblocking(true) iter.conn.send_raw(b"DONE")
iter.conn.send_raw(b"DONE") iter.conn.read_response(&mut response)
iter.conn.read_response(&mut response) );
); for (hash, folder) in folders.lock().unwrap().iter() {
for (hash, folder) in folders.lock().unwrap().iter() { if *hash == folder_hash {
if *hash == folder_hash { /* Skip INBOX */
/* Skip INBOX */ continue;
continue;
}
work_context
.set_status
.send((
thread_id,
format!("examining `{}` for updates...", folder.path()),
))
.unwrap();
examine_updates(
folder,
&sender,
&mut iter.conn,
&hash_index,
&uid_index,
&work_context,
);
} }
work_context work_context
.set_status .set_status
.send((thread_id, "done examining mailboxes.".to_string())) .send((
thread_id,
format!("examining `{}` for updates...", folder.path()),
))
.unwrap(); .unwrap();
examine_updates(
folder,
&sender,
&mut iter.conn,
&hash_index,
&uid_index,
&work_context,
);
}
work_context
.set_status
.send((thread_id, "done examining mailboxes.".to_string()))
.unwrap();
exit_on_error!(
sender,
folder_hash,
work_context,
thread_id,
iter.conn.send_command(b"IDLE")
iter.conn.set_nonblocking(false)
main_conn.lock().unwrap().send_command(b"NOOP")
main_conn.lock().unwrap().read_response(&mut response)
);
watch = now;
}
match protocol_parser::untagged_responses(line.as_slice())
.to_full_result()
.map_err(MeliError::from)
{
Ok(Some(Recent(r))) => {
work_context
.set_status
.send((thread_id, format!("got `{} RECENT` notification", r)))
.unwrap();
/* UID SEARCH RECENT */
exit_on_error!( exit_on_error!(
sender, sender,
folder_hash, folder_hash,
work_context, work_context,
thread_id, thread_id,
iter.conn.send_command(b"IDLE") iter.conn.set_nonblocking(true)
iter.conn.set_nonblocking(false) iter.conn.send_raw(b"DONE")
main_conn.lock().unwrap().send_command(b"NOOP") iter.conn.read_response(&mut response)
main_conn.lock().unwrap().read_response(&mut response) iter.conn.send_command(b"UID SEARCH RECENT")
iter.conn.read_response(&mut response)
); );
watch = now; match protocol_parser::search_results_raw(response.as_bytes())
} .to_full_result()
match protocol_parser::untagged_responses(line.as_slice()) .map_err(MeliError::from)
.to_full_result() {
.map_err(MeliError::from) Ok(&[]) => {
{ debug!("UID SEARCH RECENT returned no results");
Ok(Some(Recent(r))) => {
work_context
.set_status
.send((thread_id, format!("got `{} RECENT` notification", r)))
.unwrap();
/* UID SEARCH RECENT */
exit_on_error!(
sender,
folder_hash,
work_context,
thread_id,
iter.conn.set_nonblocking(true)
iter.conn.send_raw(b"DONE")
iter.conn.read_response(&mut response)
iter.conn.send_command(b"UID SEARCH RECENT")
iter.conn.read_response(&mut response)
);
match protocol_parser::search_results_raw(response.as_bytes())
.to_full_result()
.map_err(MeliError::from)
{
Ok(&[]) => {
debug!("UID SEARCH RECENT returned no results");
}
Ok(v) => {
exit_on_error!(
sender,
folder_hash,
work_context,
thread_id,
iter.conn.send_command(
&[b"UID FETCH", v, b"(FLAGS RFC822.HEADER)"]
.join(&b' '),
)
iter.conn.read_response(&mut response)
);
debug!(&response);
match protocol_parser::uid_fetch_response(response.as_bytes())
.to_full_result()
.map_err(MeliError::from)
{
Ok(v) => {
let len = v.len();
let mut ctr = 0;
for (uid, flags, b) in v {
work_context
.set_status
.send((
thread_id,
format!("parsing {}/{} envelopes..", ctr, len),
))
.unwrap();
if let Ok(env) = Envelope::from_bytes(&b, flags) {
ctr += 1;
hash_index
.lock()
.unwrap()
.insert(env.hash(), (uid, folder_hash));
uid_index.lock().unwrap().insert(uid, env.hash());
debug!(
"Create event {} {} {}",
env.hash(),
env.subject(),
folder.path(),
);
sender.send(RefreshEvent {
hash: folder_hash,
kind: Create(Box::new(env)),
});
}
}
work_context
.set_status
.send((
thread_id,
format!("parsed {}/{} envelopes.", ctr, len),
))
.unwrap();
}
Err(e) => {
debug!(e);
}
}
}
Err(e) => {
debug!(
"UID SEARCH RECENT err: {}\nresp: {}",
e.to_string(),
&response
);
}
} }
exit_on_error!( Ok(v) => {
sender,
folder_hash,
work_context,
thread_id,
iter.conn.send_command(b"IDLE")
iter.conn.set_nonblocking(false)
);
}
Ok(Some(Expunge(n))) => {
work_context
.set_status
.send((thread_id, format!("got `{} EXPUNGED` notification", n)))
.unwrap();
debug!("expunge {}", n);
}
Ok(Some(Exists(n))) => {
exit_on_error!(
sender,
folder_hash,
work_context,
thread_id,
iter.conn.set_nonblocking(true)
iter.conn.send_raw(b"DONE")
iter.conn.read_response(&mut response)
);
/* UID FETCH ALL UID, cross-ref, then FETCH difference headers
* */
let mut prev_exists = folder.exists.lock().unwrap();
debug!("exists {}", n);
work_context
.set_status
.send((
thread_id,
format!(
"got `{} EXISTS` notification (EXISTS was previously {} for {}",
n,
*prev_exists,
folder.path()
),
))
.unwrap();
if n > *prev_exists {
exit_on_error!( exit_on_error!(
sender, sender,
folder_hash, folder_hash,
work_context, work_context,
thread_id, thread_id,
iter.conn.send_command( iter.conn.send_command(
&[ &[b"UID FETCH", v, b"(FLAGS RFC822.HEADER)"]
b"FETCH",
format!("{}:{}", *prev_exists + 1, n).as_bytes(),
b"(UID FLAGS RFC822.HEADER)",
]
.join(&b' '), .join(&b' '),
) )
iter.conn.read_response(&mut response) iter.conn.read_response(&mut response)
); );
debug!(&response);
match protocol_parser::uid_fetch_response(response.as_bytes()) match protocol_parser::uid_fetch_response(response.as_bytes())
.to_full_result() .to_full_result()
.map_err(MeliError::from) .map_err(MeliError::from)
@ -411,10 +297,6 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
format!("parsing {}/{} envelopes..", ctr, len), format!("parsing {}/{} envelopes..", ctr, len),
)) ))
.unwrap(); .unwrap();
if uid_index.lock().unwrap().contains_key(&uid) {
ctr += 1;
continue;
}
if let Ok(env) = Envelope::from_bytes(&b, flags) { if let Ok(env) = Envelope::from_bytes(&b, flags) {
ctr += 1; ctr += 1;
hash_index hash_index
@ -443,29 +325,151 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
debug!(e); debug!(e);
} }
} }
*prev_exists = n;
} else if n < *prev_exists {
*prev_exists = n;
} }
Err(e) => {
debug!(
"UID SEARCH RECENT err: {}\nresp: {}",
e.to_string(),
&response
);
}
}
exit_on_error!(
sender,
folder_hash,
work_context,
thread_id,
iter.conn.send_command(b"IDLE")
iter.conn.set_nonblocking(false)
);
}
Ok(Some(Expunge(n))) => {
work_context
.set_status
.send((thread_id, format!("got `{} EXPUNGED` notification", n)))
.unwrap();
debug!("expunge {}", n);
}
Ok(Some(Exists(n))) => {
exit_on_error!(
sender,
folder_hash,
work_context,
thread_id,
iter.conn.set_nonblocking(true)
iter.conn.send_raw(b"DONE")
iter.conn.read_response(&mut response)
);
/* UID FETCH ALL UID, cross-ref, then FETCH difference headers
* */
let mut prev_exists = folder.exists.lock().unwrap();
debug!("exists {}", n);
work_context
.set_status
.send((
thread_id,
format!(
"got `{} EXISTS` notification (EXISTS was previously {} for {}",
n,
*prev_exists,
folder.path()
),
))
.unwrap();
if n > *prev_exists {
exit_on_error!( exit_on_error!(
sender, sender,
folder_hash, folder_hash,
work_context, work_context,
thread_id, thread_id,
iter.conn.send_command(b"IDLE") iter.conn.send_command(
iter.conn.set_nonblocking(false) &[
b"FETCH",
format!("{}:{}", *prev_exists + 1, n).as_bytes(),
b"(UID FLAGS RFC822.HEADER)",
]
.join(&b' '),
)
iter.conn.read_response(&mut response)
); );
match protocol_parser::uid_fetch_response(response.as_bytes())
.to_full_result()
.map_err(MeliError::from)
{
Ok(v) => {
let len = v.len();
let mut ctr = 0;
for (uid, flags, b) in v {
work_context
.set_status
.send((
thread_id,
format!("parsing {}/{} envelopes..", ctr, len),
))
.unwrap();
if uid_index.lock().unwrap().contains_key(&uid) {
ctr += 1;
continue;
}
if let Ok(env) = Envelope::from_bytes(&b, flags) {
ctr += 1;
hash_index
.lock()
.unwrap()
.insert(env.hash(), (uid, folder_hash));
uid_index.lock().unwrap().insert(uid, env.hash());
debug!(
"Create event {} {} {}",
env.hash(),
env.subject(),
folder.path(),
);
sender.send(RefreshEvent {
hash: folder_hash,
kind: Create(Box::new(env)),
});
}
}
work_context
.set_status
.send((thread_id, format!("parsed {}/{} envelopes.", ctr, len)))
.unwrap();
}
Err(e) => {
debug!(e);
}
}
*prev_exists = n;
} else if n < *prev_exists {
*prev_exists = n;
} }
Ok(None) | Err(_) => {} exit_on_error!(
sender,
folder_hash,
work_context,
thread_id,
iter.conn.send_command(b"IDLE")
iter.conn.set_nonblocking(false)
);
} }
work_context Ok(None) | Err(_) => {}
.set_status
.send((thread_id, "IDLEing".to_string()))
.unwrap();
} }
work_context
.set_status
.send((thread_id, "IDLEing".to_string()))
.unwrap();
} }
Ok(()) debug!("IDLE exited");
work_context
.set_status
.send((thread_id, "IDLE exited".to_string()))
.unwrap();
sender.send(RefreshEvent {
hash: folder_hash,
kind: RefreshEventKind::Failure(MeliError::new("IDLE exited".to_string())),
});
Err(MeliError::new("IDLE exited".to_string()))
} }
fn examine_updates( fn examine_updates(