diff --git a/melib/src/backends/imap/connection.rs b/melib/src/backends/imap/connection.rs index 19c367eb..7904f67d 100644 --- a/melib/src/backends/imap/connection.rs +++ b/melib/src/backends/imap/connection.rs @@ -163,6 +163,8 @@ impl ImapStream { }; let mut socket = TcpStream::connect(&addr)?; + socket.set_read_timeout(Some(std::time::Duration::new(120, 0)))?; + socket.set_write_timeout(Some(std::time::Duration::new(120, 0)))?; let cmd_id = 0; if server_conf.use_starttls { socket.write_all(format!("M{} STARTTLS\r\n", cmd_id).as_bytes())?; @@ -170,7 +172,10 @@ impl ImapStream { let mut buf = vec![0; 1024]; let mut response = String::with_capacity(1024); let mut cap_flag = false; - loop { + let mut broken = false; + let now = std::time::Instant::now(); + + while now.elapsed().as_secs() < 3 { let len = socket.read(&mut buf)?; response.push_str(unsafe { std::str::from_utf8_unchecked(&buf[0..len]) }); if !cap_flag { @@ -190,15 +195,21 @@ impl ImapStream { } } if cap_flag && response == "M0 OK Begin TLS negotiation now.\r\n" { + broken = true; break; } } + if !broken { + return Err(MeliError::new(format!( + "Could not initiate TLS negotiation to {}.", + path + ))); + } } socket .set_nonblocking(true) .expect("set_nonblocking call failed"); - socket.set_read_timeout(Some(std::time::Duration::new(120, 0)))?; let stream = { let mut conn_result = connector.connect(path, socket); if let Err(native_tls::HandshakeError::WouldBlock(midhandshake_stream)) = conn_result { @@ -364,6 +375,18 @@ impl From for ImapBlockingConnection { fn from(mut conn: ImapConnection) -> Self { conn.set_nonblocking(false) .expect("set_nonblocking call failed"); + conn.stream.as_mut().map(|s| { + s.stream + .get_mut() + .set_write_timeout(Some(std::time::Duration::new(5 * 60, 0))) + .expect("set_write_timeout call failed") + }); + conn.stream.as_mut().map(|s| { + s.stream + .get_mut() + .set_read_timeout(Some(std::time::Duration::new(5 * 60, 0))) + .expect("set_read_timeout call failed") + }); ImapBlockingConnection { buf: [0; 1024], conn, @@ -392,12 +415,8 @@ impl Iterator for ImapBlockingConnection { } = self; loop { if conn.stream.is_err() { - if let Ok((_, stream)) = ImapStream::new_connection(&conn.server_conf) { - conn.stream = Ok(stream); - } else { - debug!(&conn.stream); - return None; - } + debug!(&conn.stream); + return None; } match conn.stream.as_mut().unwrap().stream.read(buf) { Ok(0) => continue, diff --git a/melib/src/backends/imap/watch.rs b/melib/src/backends/imap/watch.rs index 59291ba9..62c4eae4 100644 --- a/melib/src/backends/imap/watch.rs +++ b/melib/src/backends/imap/watch.rs @@ -175,227 +175,113 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { let _26_mins = std::time::Duration::from_secs(26 * 60); /* duration interval to check other folders for changes */ let _5_mins = std::time::Duration::from_secs(5 * 60); - loop { - while let Some(line) = iter.next() { - let now = std::time::Instant::now(); - if now.duration_since(beat) >= _26_mins { - exit_on_error!( - sender, - folder_hash, - work_context, - thread_id, - iter.conn.set_nonblocking(true) - iter.conn.send_raw(b"DONE") - iter.conn.read_response(&mut response) - iter.conn.send_command(b"IDLE") - iter.conn.set_nonblocking(false) - main_conn.lock().unwrap().send_command(b"NOOP") - main_conn.lock().unwrap().read_response(&mut response) - ); - beat = now; - } - if now.duration_since(watch) >= _5_mins { - /* Time to poll the other inboxes */ - exit_on_error!( - sender, - folder_hash, - work_context, - thread_id, - iter.conn.set_nonblocking(true) - iter.conn.send_raw(b"DONE") - iter.conn.read_response(&mut response) - ); - for (hash, folder) in folders.lock().unwrap().iter() { - if *hash == folder_hash { - /* Skip INBOX */ - continue; - } - work_context - .set_status - .send(( - thread_id, - format!("examining `{}` for updates...", folder.path()), - )) - .unwrap(); - examine_updates( - folder, - &sender, - &mut iter.conn, - &hash_index, - &uid_index, - &work_context, - ); + while let Some(line) = iter.next() { + let now = std::time::Instant::now(); + if now.duration_since(beat) >= _26_mins { + exit_on_error!( + sender, + folder_hash, + work_context, + thread_id, + iter.conn.set_nonblocking(true) + iter.conn.send_raw(b"DONE") + iter.conn.read_response(&mut response) + iter.conn.send_command(b"IDLE") + iter.conn.set_nonblocking(false) + main_conn.lock().unwrap().send_command(b"NOOP") + main_conn.lock().unwrap().read_response(&mut response) + ); + beat = now; + } + if now.duration_since(watch) >= _5_mins { + /* Time to poll the other inboxes */ + exit_on_error!( + sender, + folder_hash, + work_context, + thread_id, + iter.conn.set_nonblocking(true) + iter.conn.send_raw(b"DONE") + iter.conn.read_response(&mut response) + ); + for (hash, folder) in folders.lock().unwrap().iter() { + if *hash == folder_hash { + /* Skip INBOX */ + continue; } work_context .set_status - .send((thread_id, "done examining mailboxes.".to_string())) + .send(( + thread_id, + format!("examining `{}` for updates...", folder.path()), + )) .unwrap(); + examine_updates( + folder, + &sender, + &mut iter.conn, + &hash_index, + &uid_index, + &work_context, + ); + } + work_context + .set_status + .send((thread_id, "done examining mailboxes.".to_string())) + .unwrap(); + exit_on_error!( + sender, + folder_hash, + work_context, + thread_id, + iter.conn.send_command(b"IDLE") + iter.conn.set_nonblocking(false) + main_conn.lock().unwrap().send_command(b"NOOP") + main_conn.lock().unwrap().read_response(&mut response) + ); + watch = now; + } + match protocol_parser::untagged_responses(line.as_slice()) + .to_full_result() + .map_err(MeliError::from) + { + Ok(Some(Recent(r))) => { + work_context + .set_status + .send((thread_id, format!("got `{} RECENT` notification", r))) + .unwrap(); + /* UID SEARCH RECENT */ exit_on_error!( sender, folder_hash, work_context, thread_id, - iter.conn.send_command(b"IDLE") - iter.conn.set_nonblocking(false) - main_conn.lock().unwrap().send_command(b"NOOP") - main_conn.lock().unwrap().read_response(&mut response) + iter.conn.set_nonblocking(true) + iter.conn.send_raw(b"DONE") + iter.conn.read_response(&mut response) + iter.conn.send_command(b"UID SEARCH RECENT") + iter.conn.read_response(&mut response) ); - watch = now; - } - match protocol_parser::untagged_responses(line.as_slice()) - .to_full_result() - .map_err(MeliError::from) - { - Ok(Some(Recent(r))) => { - work_context - .set_status - .send((thread_id, format!("got `{} RECENT` notification", r))) - .unwrap(); - /* UID SEARCH RECENT */ - exit_on_error!( - sender, - folder_hash, - work_context, - thread_id, - iter.conn.set_nonblocking(true) - iter.conn.send_raw(b"DONE") - iter.conn.read_response(&mut response) - iter.conn.send_command(b"UID SEARCH RECENT") - iter.conn.read_response(&mut response) - ); - match protocol_parser::search_results_raw(response.as_bytes()) - .to_full_result() - .map_err(MeliError::from) - { - Ok(&[]) => { - debug!("UID SEARCH RECENT returned no results"); - } - Ok(v) => { - exit_on_error!( - sender, - folder_hash, - work_context, - thread_id, - iter.conn.send_command( - &[b"UID FETCH", v, b"(FLAGS RFC822.HEADER)"] - .join(&b' '), - ) - iter.conn.read_response(&mut response) - ); - debug!(&response); - match protocol_parser::uid_fetch_response(response.as_bytes()) - .to_full_result() - .map_err(MeliError::from) - { - Ok(v) => { - let len = v.len(); - let mut ctr = 0; - for (uid, flags, b) in v { - work_context - .set_status - .send(( - thread_id, - format!("parsing {}/{} envelopes..", ctr, len), - )) - .unwrap(); - if let Ok(env) = Envelope::from_bytes(&b, flags) { - ctr += 1; - hash_index - .lock() - .unwrap() - .insert(env.hash(), (uid, folder_hash)); - uid_index.lock().unwrap().insert(uid, env.hash()); - debug!( - "Create event {} {} {}", - env.hash(), - env.subject(), - folder.path(), - ); - sender.send(RefreshEvent { - hash: folder_hash, - kind: Create(Box::new(env)), - }); - } - } - work_context - .set_status - .send(( - thread_id, - format!("parsed {}/{} envelopes.", ctr, len), - )) - .unwrap(); - } - Err(e) => { - debug!(e); - } - } - } - Err(e) => { - debug!( - "UID SEARCH RECENT err: {}\nresp: {}", - e.to_string(), - &response - ); - } + match protocol_parser::search_results_raw(response.as_bytes()) + .to_full_result() + .map_err(MeliError::from) + { + Ok(&[]) => { + debug!("UID SEARCH RECENT returned no results"); } - exit_on_error!( - sender, - folder_hash, - work_context, - thread_id, - iter.conn.send_command(b"IDLE") - iter.conn.set_nonblocking(false) - ); - } - Ok(Some(Expunge(n))) => { - work_context - .set_status - .send((thread_id, format!("got `{} EXPUNGED` notification", n))) - .unwrap(); - debug!("expunge {}", n); - } - Ok(Some(Exists(n))) => { - exit_on_error!( - sender, - folder_hash, - work_context, - thread_id, - iter.conn.set_nonblocking(true) - iter.conn.send_raw(b"DONE") - iter.conn.read_response(&mut response) - ); - /* UID FETCH ALL UID, cross-ref, then FETCH difference headers - * */ - let mut prev_exists = folder.exists.lock().unwrap(); - debug!("exists {}", n); - work_context - .set_status - .send(( - thread_id, - format!( - "got `{} EXISTS` notification (EXISTS was previously {} for {}", - n, - *prev_exists, - folder.path() - ), - )) - .unwrap(); - if n > *prev_exists { + Ok(v) => { exit_on_error!( sender, folder_hash, work_context, thread_id, iter.conn.send_command( - &[ - b"FETCH", - format!("{}:{}", *prev_exists + 1, n).as_bytes(), - b"(UID FLAGS RFC822.HEADER)", - ] + &[b"UID FETCH", v, b"(FLAGS RFC822.HEADER)"] .join(&b' '), - ) + ) iter.conn.read_response(&mut response) ); + debug!(&response); match protocol_parser::uid_fetch_response(response.as_bytes()) .to_full_result() .map_err(MeliError::from) @@ -411,10 +297,6 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { format!("parsing {}/{} envelopes..", ctr, len), )) .unwrap(); - if uid_index.lock().unwrap().contains_key(&uid) { - ctr += 1; - continue; - } if let Ok(env) = Envelope::from_bytes(&b, flags) { ctr += 1; hash_index @@ -443,29 +325,151 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { debug!(e); } } - - *prev_exists = n; - } else if n < *prev_exists { - *prev_exists = n; } + Err(e) => { + debug!( + "UID SEARCH RECENT err: {}\nresp: {}", + e.to_string(), + &response + ); + } + } + exit_on_error!( + sender, + folder_hash, + work_context, + thread_id, + iter.conn.send_command(b"IDLE") + iter.conn.set_nonblocking(false) + ); + } + Ok(Some(Expunge(n))) => { + work_context + .set_status + .send((thread_id, format!("got `{} EXPUNGED` notification", n))) + .unwrap(); + debug!("expunge {}", n); + } + Ok(Some(Exists(n))) => { + exit_on_error!( + sender, + folder_hash, + work_context, + thread_id, + iter.conn.set_nonblocking(true) + iter.conn.send_raw(b"DONE") + iter.conn.read_response(&mut response) + ); + /* UID FETCH ALL UID, cross-ref, then FETCH difference headers + * */ + let mut prev_exists = folder.exists.lock().unwrap(); + debug!("exists {}", n); + work_context + .set_status + .send(( + thread_id, + format!( + "got `{} EXISTS` notification (EXISTS was previously {} for {}", + n, + *prev_exists, + folder.path() + ), + )) + .unwrap(); + if n > *prev_exists { exit_on_error!( sender, folder_hash, work_context, thread_id, - iter.conn.send_command(b"IDLE") - iter.conn.set_nonblocking(false) + iter.conn.send_command( + &[ + b"FETCH", + format!("{}:{}", *prev_exists + 1, n).as_bytes(), + b"(UID FLAGS RFC822.HEADER)", + ] + .join(&b' '), + ) + iter.conn.read_response(&mut response) ); + match protocol_parser::uid_fetch_response(response.as_bytes()) + .to_full_result() + .map_err(MeliError::from) + { + Ok(v) => { + let len = v.len(); + let mut ctr = 0; + for (uid, flags, b) in v { + work_context + .set_status + .send(( + thread_id, + format!("parsing {}/{} envelopes..", ctr, len), + )) + .unwrap(); + if uid_index.lock().unwrap().contains_key(&uid) { + ctr += 1; + continue; + } + if let Ok(env) = Envelope::from_bytes(&b, flags) { + ctr += 1; + hash_index + .lock() + .unwrap() + .insert(env.hash(), (uid, folder_hash)); + uid_index.lock().unwrap().insert(uid, env.hash()); + debug!( + "Create event {} {} {}", + env.hash(), + env.subject(), + folder.path(), + ); + sender.send(RefreshEvent { + hash: folder_hash, + kind: Create(Box::new(env)), + }); + } + } + work_context + .set_status + .send((thread_id, format!("parsed {}/{} envelopes.", ctr, len))) + .unwrap(); + } + Err(e) => { + debug!(e); + } + } + + *prev_exists = n; + } else if n < *prev_exists { + *prev_exists = n; } - Ok(None) | Err(_) => {} + exit_on_error!( + sender, + folder_hash, + work_context, + thread_id, + iter.conn.send_command(b"IDLE") + iter.conn.set_nonblocking(false) + ); } - work_context - .set_status - .send((thread_id, "IDLEing".to_string())) - .unwrap(); + Ok(None) | Err(_) => {} } + work_context + .set_status + .send((thread_id, "IDLEing".to_string())) + .unwrap(); } - Ok(()) + debug!("IDLE exited"); + work_context + .set_status + .send((thread_id, "IDLE exited".to_string())) + .unwrap(); + sender.send(RefreshEvent { + hash: folder_hash, + kind: RefreshEventKind::Failure(MeliError::new("IDLE exited".to_string())), + }); + Err(MeliError::new("IDLE exited".to_string())) } fn examine_updates(