/* * meli - imap module. * * Copyright 2019 Manos Pitsidianakis * * This file is part of meli. * * meli is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * meli is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with meli. If not, see . */ use super::*; use crate::backends::SpecialUsageMailbox; use std::sync::{Arc, Mutex, RwLock}; /// Arguments for IMAP watching functions pub struct ImapWatchKit { pub conn: ImapConnection, pub is_online: Arc)>>, pub main_conn: Arc>, pub uid_store: Arc, pub mailboxes: Arc>>, pub sender: RefreshEventConsumer, pub work_context: WorkContext, pub tag_index: Arc>>, } macro_rules! exit_on_error { ($sender:expr, $mailbox_hash:ident, $work_context:ident, $thread_id:ident, $($result:expr)+) => { $(if let Err(e) = $result { debug!("failure: {}", e.to_string()); $work_context.set_status.send(($thread_id, e.to_string())).unwrap(); $work_context.finished.send($thread_id).unwrap(); $sender.send(RefreshEvent { hash: $mailbox_hash, kind: RefreshEventKind::Failure(e.clone()), }); Err(e) } else { Ok(()) }?;)+ }; } pub fn poll_with_examine(kit: ImapWatchKit) -> Result<()> { debug!("poll with examine"); let ImapWatchKit { is_online, mut conn, main_conn, uid_store, mailboxes, sender, work_context, tag_index, } = kit; loop { if is_online.lock().unwrap().1.is_ok() { break; } std::thread::sleep(std::time::Duration::from_millis(100)); } conn.connect()?; let mut response = String::with_capacity(8 * 1024); let thread_id: std::thread::ThreadId = std::thread::current().id(); loop { work_context .set_status .send((thread_id, "sleeping...".to_string())) .unwrap(); std::thread::sleep(std::time::Duration::from_millis(5 * 60 * 1000)); let mailboxes = mailboxes.read().unwrap(); for mailbox in mailboxes.values() { work_context .set_status .send(( thread_id, format!("examining `{}` for updates...", mailbox.path()), )) .unwrap(); examine_updates( mailbox, &sender, &mut conn, &uid_store, &work_context, &tag_index, )?; } let mut main_conn = main_conn.lock().unwrap(); main_conn.send_command(b"NOOP").unwrap(); main_conn.read_response(&mut response).unwrap(); } } pub fn idle(kit: ImapWatchKit) -> Result<()> { debug!("IDLE"); /* IDLE only watches the connection's selected mailbox. We will IDLE on INBOX and every ~5 * minutes wake up and poll the others */ let ImapWatchKit { mut conn, is_online, main_conn, uid_store, mailboxes, sender, work_context, tag_index, } = kit; loop { if is_online.lock().unwrap().1.is_ok() { break; } std::thread::sleep(std::time::Duration::from_millis(100)); } conn.connect()?; let thread_id: std::thread::ThreadId = std::thread::current().id(); let mailbox: ImapMailbox = match mailboxes .read() .unwrap() .values() .find(|f| f.parent.is_none() && (f.special_usage() == SpecialUsageMailbox::Inbox)) .map(std::clone::Clone::clone) { Some(mailbox) => mailbox, None => { let err = MeliError::new("INBOX mailbox not found in local mailbox index. meli may have not parsed the IMAP mailboxes correctly"); debug!("failure: {}", err.to_string()); work_context .set_status .send((thread_id, err.to_string())) .unwrap(); sender.send(RefreshEvent { hash: 0, kind: RefreshEventKind::Failure(err.clone()), }); return Err(err); } }; let mailbox_hash = mailbox.hash(); let mut response = String::with_capacity(8 * 1024); exit_on_error!( sender, mailbox_hash, work_context, thread_id, conn.send_command(format!("SELECT \"{}\"", mailbox.imap_path()).as_bytes()) conn.read_response(&mut response) ); debug!("select response {}", &response); { let mut prev_exists = mailbox.exists.lock().unwrap(); *prev_exists = match protocol_parser::select_response(&response) { Ok(ok) => { { let mut uidvalidities = uid_store.uidvalidity.lock().unwrap(); if let Some(v) = uidvalidities.get_mut(&mailbox_hash) { if *v != ok.uidvalidity { sender.send(RefreshEvent { hash: mailbox_hash, kind: RefreshEventKind::Rescan, }); *prev_exists = 0; uid_store.uid_index.lock().unwrap().clear(); uid_store.hash_index.lock().unwrap().clear(); uid_store.byte_cache.lock().unwrap().clear(); *v = ok.uidvalidity; } } else { sender.send(RefreshEvent { hash: mailbox_hash, kind: RefreshEventKind::Rescan, }); sender.send(RefreshEvent { hash: mailbox_hash, kind: RefreshEventKind::Failure(MeliError::new(format!( "Unknown mailbox: {} {}", mailbox.path(), mailbox_hash ))), }); } } debug!(&ok); ok.exists } Err(e) => { debug!("{:?}", e); panic!("could not select mailbox"); } }; } exit_on_error!( sender, mailbox_hash, work_context, thread_id, conn.send_command(b"IDLE") ); work_context .set_status .send((thread_id, "IDLEing".to_string())) .unwrap(); let mut iter = ImapBlockingConnection::from(conn); let mut beat = std::time::Instant::now(); let mut watch = std::time::Instant::now(); /* duration interval to send heartbeat */ let _26_mins = std::time::Duration::from_secs(26 * 60); /* duration interval to check other mailboxes for changes */ let _5_mins = std::time::Duration::from_secs(5 * 60); while let Some(line) = iter.next() { let now = std::time::Instant::now(); if now.duration_since(beat) >= _26_mins { exit_on_error!( sender, mailbox_hash, work_context, thread_id, iter.conn.set_nonblocking(true) iter.conn.send_raw(b"DONE") iter.conn.read_response(&mut response) iter.conn.send_command(b"IDLE") iter.conn.set_nonblocking(false) main_conn.lock().unwrap().send_command(b"NOOP") main_conn.lock().unwrap().read_response(&mut response) ); beat = now; } if now.duration_since(watch) >= _5_mins { /* Time to poll all inboxes */ let mut conn = main_conn.lock().unwrap(); for mailbox in mailboxes.read().unwrap().values() { work_context .set_status .send(( thread_id, format!("examining `{}` for updates...", mailbox.path()), )) .unwrap(); exit_on_error!( sender, mailbox_hash, work_context, thread_id, examine_updates( mailbox, &sender, &mut conn, &uid_store, &work_context, &tag_index ) ); } work_context .set_status .send((thread_id, "done examining mailboxes.".to_string())) .unwrap(); watch = now; } match protocol_parser::untagged_responses(line.as_slice()) .to_full_result() .map_err(MeliError::from) { Ok(Some(Recent(r))) => { let mut conn = main_conn.lock().unwrap(); work_context .set_status .send((thread_id, format!("got `{} RECENT` notification", r))) .unwrap(); /* UID SEARCH RECENT */ exit_on_error!( sender, mailbox_hash, work_context, thread_id, conn.send_command(b"EXAMINE INBOX") conn.read_response(&mut response) conn.send_command(b"UID SEARCH RECENT") conn.read_response(&mut response) ); match protocol_parser::search_results_raw(response.as_bytes()) .to_full_result() .map_err(MeliError::from) { Ok(&[]) => { debug!("UID SEARCH RECENT returned no results"); } Ok(v) => { exit_on_error!( sender, mailbox_hash, work_context, thread_id, conn.send_command( &[b"UID FETCH", v, b"(FLAGS RFC822.HEADER)"] .join(&b' '), ) conn.read_response(&mut response) ); debug!(&response); match protocol_parser::uid_fetch_responses(&response) { Ok((_, v, _)) => { let len = v.len(); let mut ctr = 0; for UidFetchResponse { uid, flags, body, .. } in v { work_context .set_status .send(( thread_id, format!("parsing {}/{} envelopes..", ctr, len), )) .unwrap(); if let Ok(mut env) = Envelope::from_bytes( body.unwrap(), flags.as_ref().map(|&(f, _)| f), ) { ctr += 1; uid_store .hash_index .lock() .unwrap() .insert(env.hash(), (uid, mailbox_hash)); uid_store.uid_index.lock().unwrap().insert(uid, env.hash()); debug!( "Create event {} {} {}", env.hash(), env.subject(), mailbox.path(), ); if let Some((_, keywords)) = flags { let mut tag_lck = tag_index.write().unwrap(); for f in keywords { let hash = tag_hash!(f); if !tag_lck.contains_key(&hash) { tag_lck.insert(hash, f); } env.labels_mut().push(hash); } } if !env.is_seen() { *mailbox.unseen.lock().unwrap() += 1; } *mailbox.exists.lock().unwrap() += 1; sender.send(RefreshEvent { hash: mailbox_hash, kind: Create(Box::new(env)), }); } } work_context .set_status .send((thread_id, format!("parsed {}/{} envelopes.", ctr, len))) .unwrap(); } Err(e) => { debug!(e); } } } Err(e) => { debug!( "UID SEARCH RECENT err: {}\nresp: {}", e.to_string(), &response ); } } } Ok(Some(Expunge(n))) => { work_context .set_status .send((thread_id, format!("got `{} EXPUNGED` notification", n))) .unwrap(); debug!("expunge {}", n); } Ok(Some(Exists(n))) => { let mut conn = main_conn.lock().unwrap(); /* UID FETCH ALL UID, cross-ref, then FETCH difference headers * */ let mut prev_exists = mailbox.exists.lock().unwrap(); debug!("exists {}", n); work_context .set_status .send(( thread_id, format!( "got `{} EXISTS` notification (EXISTS was previously {} for {}", n, *prev_exists, mailbox.path() ), )) .unwrap(); if n > *prev_exists { exit_on_error!( sender, mailbox_hash, work_context, thread_id, conn.send_command(b"EXAMINE INBOX") conn.read_response(&mut response) conn.send_command( &[ b"FETCH", format!("{}:{}", *prev_exists + 1, n).as_bytes(), b"(UID FLAGS RFC822.HEADER)", ] .join(&b' '), ) conn.read_response(&mut response) ); match protocol_parser::uid_fetch_responses(&response) { Ok((_, v, _)) => { let len = v.len(); let mut ctr = 0; for UidFetchResponse { uid, flags, body, .. } in v { work_context .set_status .send(( thread_id, format!("parsing {}/{} envelopes..", ctr, len), )) .unwrap(); if uid_store.uid_index.lock().unwrap().contains_key(&uid) { ctr += 1; continue; } if let Ok(mut env) = Envelope::from_bytes( body.unwrap(), flags.as_ref().map(|&(f, _)| f), ) { ctr += 1; uid_store .hash_index .lock() .unwrap() .insert(env.hash(), (uid, mailbox_hash)); uid_store.uid_index.lock().unwrap().insert(uid, env.hash()); if let Some((_, keywords)) = flags { let mut tag_lck = tag_index.write().unwrap(); for f in keywords { let hash = tag_hash!(f); if !tag_lck.contains_key(&hash) { tag_lck.insert(hash, f); } env.labels_mut().push(hash); } } debug!( "Create event {} {} {}", env.hash(), env.subject(), mailbox.path(), ); if !env.is_seen() { *mailbox.unseen.lock().unwrap() += 1; } sender.send(RefreshEvent { hash: mailbox_hash, kind: Create(Box::new(env)), }); } } work_context .set_status .send((thread_id, format!("parsed {}/{} envelopes.", ctr, len))) .unwrap(); } Err(e) => { debug!(e); } } *prev_exists = n; } else if n < *prev_exists { *prev_exists = n; } } Ok(None) | Err(_) => {} } work_context .set_status .send((thread_id, "IDLEing".to_string())) .unwrap(); } debug!("IDLE connection dropped"); let err: &str = iter.err().unwrap_or("Unknown reason."); work_context .set_status .send((thread_id, "IDLE connection dropped".to_string())) .unwrap(); work_context.finished.send(thread_id).unwrap(); sender.send(RefreshEvent { hash: mailbox_hash, kind: RefreshEventKind::Failure(MeliError::new(format!( "IDLE connection dropped: {}", &err ))), }); Err(MeliError::new(format!("IDLE connection dropped: {}", err))) } pub fn examine_updates( mailbox: &ImapMailbox, sender: &RefreshEventConsumer, conn: &mut ImapConnection, uid_store: &Arc, work_context: &WorkContext, tag_index: &Arc>>, ) -> Result<()> { let thread_id: std::thread::ThreadId = std::thread::current().id(); let mailbox_hash = mailbox.hash(); debug!("examining mailbox {} {}", mailbox_hash, mailbox.path()); let mut response = String::with_capacity(8 * 1024); exit_on_error!( sender, mailbox_hash, work_context, thread_id, conn.send_command(format!("EXAMINE \"{}\"", mailbox.imap_path()).as_bytes()) conn.read_response(&mut response) ); match protocol_parser::select_response(&response) { Ok(ok) => { debug!(&ok); { let mut uidvalidities = uid_store.uidvalidity.lock().unwrap(); if let Some(v) = uidvalidities.get_mut(&mailbox_hash) { if *v != ok.uidvalidity { sender.send(RefreshEvent { hash: mailbox_hash, kind: RefreshEventKind::Rescan, }); uid_store.uid_index.lock().unwrap().clear(); uid_store.hash_index.lock().unwrap().clear(); uid_store.byte_cache.lock().unwrap().clear(); *v = ok.uidvalidity; } } else { sender.send(RefreshEvent { hash: mailbox_hash, kind: RefreshEventKind::Rescan, }); sender.send(RefreshEvent { hash: mailbox_hash, kind: RefreshEventKind::Failure(MeliError::new(format!( "Unknown mailbox: {} {}", mailbox.path(), mailbox_hash ))), }); } } let mut prev_exists = mailbox.exists.lock().unwrap(); let n = ok.exists; if ok.recent > 0 { { /* UID SEARCH RECENT */ exit_on_error!( sender, mailbox_hash, work_context, thread_id, conn.send_command(b"UID SEARCH RECENT") conn.read_response(&mut response) ); match protocol_parser::search_results_raw(response.as_bytes()) .to_full_result() .map_err(MeliError::from) { Ok(&[]) => { debug!("UID SEARCH RECENT returned no results"); } Ok(v) => { exit_on_error!( sender, mailbox_hash, work_context, thread_id, conn.send_command( &[b"UID FETCH", v, b"(FLAGS RFC822.HEADER)"] .join(&b' '), ) conn.read_response(&mut response) ); debug!(&response); match protocol_parser::uid_fetch_responses(&response) { Ok((_, v, _)) => { for UidFetchResponse { uid, flags, body, .. } in v { if let Ok(mut env) = Envelope::from_bytes( body.unwrap(), flags.as_ref().map(|&(f, _)| f), ) { uid_store .hash_index .lock() .unwrap() .insert(env.hash(), (uid, mailbox_hash)); uid_store .uid_index .lock() .unwrap() .insert(uid, env.hash()); debug!( "Create event {} {} {}", env.hash(), env.subject(), mailbox.path(), ); if let Some((_, keywords)) = flags { let mut tag_lck = tag_index.write().unwrap(); for f in keywords { let hash = tag_hash!(f); if !tag_lck.contains_key(&hash) { tag_lck.insert(hash, f); } env.labels_mut().push(hash); } } if !env.is_seen() { *mailbox.unseen.lock().unwrap() += 1; } sender.send(RefreshEvent { hash: mailbox_hash, kind: Create(Box::new(env)), }); } } } Err(e) => { debug!(e); } } } Err(e) => { debug!( "UID SEARCH RECENT err: {}\nresp: {}", e.to_string(), &response ); } } } } else if n > *prev_exists { /* UID FETCH ALL UID, cross-ref, then FETCH difference headers * */ debug!("exists {}", n); exit_on_error!( sender, mailbox_hash, work_context, thread_id, conn.send_command( &[ b"FETCH", format!("{}:{}", *prev_exists + 1, n).as_bytes(), b"(UID FLAGS RFC822.HEADER)", ] .join(&b' '), ) conn.read_response(&mut response) ); match protocol_parser::uid_fetch_responses(&response) { Ok((_, v, _)) => { for UidFetchResponse { uid, flags, body, .. } in v { if let Ok(mut env) = Envelope::from_bytes(body.unwrap(), flags.as_ref().map(|&(f, _)| f)) { uid_store .hash_index .lock() .unwrap() .insert(env.hash(), (uid, mailbox_hash)); uid_store.uid_index.lock().unwrap().insert(uid, env.hash()); if let Some((_, keywords)) = flags { let mut tag_lck = tag_index.write().unwrap(); for f in keywords { let hash = tag_hash!(f); if !tag_lck.contains_key(&hash) { tag_lck.insert(hash, f); } env.labels_mut().push(hash); } } debug!( "Create event {} {} {}", env.hash(), env.subject(), mailbox.path(), ); if !env.is_seen() { *mailbox.unseen.lock().unwrap() += 1; } sender.send(RefreshEvent { hash: mailbox_hash, kind: Create(Box::new(env)), }); } } } Err(e) => { debug!(e); } } *prev_exists = n; } else if n < *prev_exists { *prev_exists = n; } } Err(e) => { debug!("{:?}", e); panic!("could not select mailbox"); } }; Ok(()) }