meli/melib/src/backends/imap/watch.rs

450 lines
17 KiB
Rust

/*
* meli - imap module.
*
* Copyright 2019 Manos Pitsidianakis
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use super::*;
use crate::backends::SpecialUsageMailbox;
use std::sync::Arc;
/// Arguments for IMAP watching functions
pub struct ImapWatchKit {
pub conn: ImapConnection,
pub main_conn: Arc<FutureMutex<ImapConnection>>,
pub uid_store: Arc<UIDStore>,
}
pub async fn poll_with_examine(kit: ImapWatchKit) -> Result<()> {
debug!("poll with examine");
let ImapWatchKit {
mut conn,
main_conn: _,
uid_store,
} = kit;
conn.connect().await?;
let mailboxes: HashMap<MailboxHash, ImapMailbox> = {
let mailboxes_lck = timeout(uid_store.timeout, uid_store.mailboxes.lock()).await?;
mailboxes_lck.clone()
};
loop {
for (_, mailbox) in mailboxes.clone() {
examine_updates(mailbox, &mut conn, &uid_store).await?;
}
//FIXME: make sleep duration configurable
smol::Timer::after(std::time::Duration::from_secs(3 * 60)).await;
}
}
pub async fn idle(kit: ImapWatchKit) -> Result<()> {
debug!("IDLE");
/* IDLE only watches the connection's selected mailbox. We will IDLE on INBOX and every ~5
* minutes wake up and poll the others */
let ImapWatchKit {
mut conn,
main_conn,
uid_store,
} = kit;
conn.connect().await?;
let mailbox: ImapMailbox = match uid_store
.mailboxes
.lock()
.await
.values()
.find(|f| f.parent.is_none() && (f.special_usage() == SpecialUsageMailbox::Inbox))
.map(std::clone::Clone::clone)
{
Some(mailbox) => mailbox,
None => {
return Err(MeliError::new("INBOX mailbox not found in local mailbox index. meli may have not parsed the IMAP mailboxes correctly"));
}
};
let mailbox_hash = mailbox.hash();
let mut response = Vec::with_capacity(8 * 1024);
let select_response = conn
.examine_mailbox(mailbox_hash, &mut response, true)
.await?
.unwrap();
{
let mut uidvalidities = uid_store.uidvalidity.lock().unwrap();
if let Some(v) = uidvalidities.get(&mailbox_hash) {
if *v != select_response.uidvalidity {
if uid_store.keep_offline_cache {
#[cfg(not(feature = "sqlite3"))]
let mut cache_handle = super::cache::DefaultCache::get(uid_store.clone())?;
#[cfg(feature = "sqlite3")]
let mut cache_handle = super::cache::Sqlite3Cache::get(uid_store.clone())?;
cache_handle.clear(mailbox_hash, &select_response)?;
}
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: RefreshEventKind::Rescan,
});
/*
uid_store.uid_index.lock().unwrap().clear();
uid_store.hash_index.lock().unwrap().clear();
uid_store.byte_cache.lock().unwrap().clear();
*/
}
} else {
uidvalidities.insert(mailbox_hash, select_response.uidvalidity);
}
}
let mailboxes: HashMap<MailboxHash, ImapMailbox> = {
let mailboxes_lck = timeout(uid_store.timeout, uid_store.mailboxes.lock()).await?;
mailboxes_lck.clone()
};
for (h, mailbox) in mailboxes.clone() {
if mailbox_hash == h {
continue;
}
examine_updates(mailbox, &mut conn, &uid_store).await?;
}
conn.send_command(b"IDLE").await?;
let mut blockn = ImapBlockingConnection::from(conn);
let mut watch = std::time::Instant::now();
/* duration interval to send heartbeat */
const _10_MINS: std::time::Duration = std::time::Duration::from_secs(10 * 60);
/* duration interval to check other mailboxes for changes */
const _5_MINS: std::time::Duration = std::time::Duration::from_secs(5 * 60);
loop {
let line = match timeout(Some(_10_MINS), blockn.as_stream()).await {
Ok(Some(line)) => line,
Ok(None) => {
debug!("IDLE connection dropped: {:?}", &blockn.err());
blockn.conn.connect().await?;
let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?;
main_conn_lck.connect().await?;
continue;
}
Err(_) => {
/* Timeout */
blockn.conn.send_raw(b"DONE").await?;
blockn
.conn
.read_response(&mut response, RequiredResponses::empty())
.await?;
blockn.conn.send_command(b"IDLE").await?;
let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?;
main_conn_lck.connect().await?;
continue;
}
};
let now = std::time::Instant::now();
if now.duration_since(watch) >= _5_MINS {
/* Time to poll all inboxes */
let mut conn = timeout(uid_store.timeout, main_conn.lock()).await?;
for (_h, mailbox) in mailboxes.clone() {
examine_updates(mailbox, &mut conn, &uid_store).await?;
}
watch = now;
}
if line
.split_rn()
.filter(|l| {
!l.starts_with(b"+ ")
&& !l.starts_with(b"* ok")
&& !l.starts_with(b"* ok")
&& !l.starts_with(b"* Ok")
&& !l.starts_with(b"* OK")
})
.count()
== 0
{
continue;
}
{
blockn.conn.send_raw(b"DONE").await?;
blockn
.conn
.read_response(&mut response, RequiredResponses::empty())
.await?;
for l in line.split_rn().chain(response.split_rn()) {
debug!("process_untagged {:?}", &l);
if l.starts_with(b"+ ")
|| l.starts_with(b"* ok")
|| l.starts_with(b"* ok")
|| l.starts_with(b"* Ok")
|| l.starts_with(b"* OK")
{
debug!("ignore continuation mark");
continue;
}
blockn.conn.process_untagged(l).await?;
}
blockn.conn.send_command(b"IDLE").await?;
}
}
}
pub async fn examine_updates(
mailbox: ImapMailbox,
conn: &mut ImapConnection,
uid_store: &Arc<UIDStore>,
) -> Result<()> {
if mailbox.no_select {
return Ok(());
}
let mailbox_hash = mailbox.hash();
debug!("examining mailbox {} {}", mailbox_hash, mailbox.path());
if let Some(new_envelopes) = conn.resync(mailbox_hash).await? {
for env in new_envelopes {
conn.add_refresh_event(RefreshEvent {
mailbox_hash,
account_hash: uid_store.account_hash,
kind: RefreshEventKind::Create(Box::new(env)),
});
}
} else {
#[cfg(not(feature = "sqlite3"))]
let mut cache_handle = super::cache::DefaultCache::get(uid_store.clone())?;
#[cfg(feature = "sqlite3")]
let mut cache_handle = super::cache::Sqlite3Cache::get(uid_store.clone())?;
let mut response = Vec::with_capacity(8 * 1024);
let select_response = conn
.examine_mailbox(mailbox_hash, &mut response, true)
.await?
.unwrap();
{
let mut uidvalidities = uid_store.uidvalidity.lock().unwrap();
if let Some(v) = uidvalidities.get(&mailbox_hash) {
if *v != select_response.uidvalidity {
if uid_store.keep_offline_cache {
cache_handle.clear(mailbox_hash, &select_response)?;
}
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: RefreshEventKind::Rescan,
});
/*
uid_store.uid_index.lock().unwrap().clear();
uid_store.hash_index.lock().unwrap().clear();
uid_store.byte_cache.lock().unwrap().clear();
*/
return Ok(());
}
} else {
uidvalidities.insert(mailbox_hash, select_response.uidvalidity);
}
}
if mailbox.is_cold() {
/* Mailbox hasn't been loaded yet */
let has_list_status: bool = conn
.uid_store
.capabilities
.lock()
.unwrap()
.iter()
.any(|cap| cap.eq_ignore_ascii_case(b"LIST-STATUS"));
if has_list_status {
conn.send_command(
format!(
"LIST \"{}\" \"\" RETURN (STATUS (MESSAGES UNSEEN))",
mailbox.imap_path()
)
.as_bytes(),
)
.await?;
conn.read_response(
&mut response,
RequiredResponses::LIST_REQUIRED | RequiredResponses::STATUS,
)
.await?;
debug!(
"list return status out: {}",
String::from_utf8_lossy(&response)
);
for l in response.split_rn() {
if !l.starts_with(b"*") {
continue;
}
if let Ok(status) = protocol_parser::status_response(l).map(|(_, v)| v) {
if Some(mailbox_hash) == status.mailbox {
if let Some(total) = status.messages {
if let Ok(mut exists_lck) = mailbox.exists.lock() {
exists_lck.clear();
exists_lck.set_not_yet_seen(total);
}
}
if let Some(total) = status.unseen {
if let Ok(mut unseen_lck) = mailbox.unseen.lock() {
unseen_lck.clear();
unseen_lck.set_not_yet_seen(total);
}
}
break;
}
}
}
} else {
conn.send_command(b"SEARCH UNSEEN").await?;
conn.read_response(&mut response, RequiredResponses::SEARCH)
.await?;
let unseen_count = protocol_parser::search_results(&response)?.1.len();
if let Ok(mut exists_lck) = mailbox.exists.lock() {
exists_lck.clear();
exists_lck.set_not_yet_seen(select_response.exists);
}
if let Ok(mut unseen_lck) = mailbox.unseen.lock() {
unseen_lck.clear();
unseen_lck.set_not_yet_seen(unseen_count);
}
}
mailbox.set_warm(true);
return Ok(());
}
if select_response.recent > 0 {
/* UID SEARCH RECENT */
conn.send_command(b"UID SEARCH RECENT").await?;
conn.read_response(&mut response, RequiredResponses::SEARCH)
.await?;
let v = protocol_parser::search_results(response.as_slice()).map(|(_, v)| v)?;
if v.is_empty() {
debug!(
"search response was empty: {}",
String::from_utf8_lossy(&response)
);
return Ok(());
}
let mut cmd = "UID FETCH ".to_string();
cmd.push_str(&v[0].to_string());
if v.len() != 1 {
for n in v.into_iter().skip(1) {
cmd.push(',');
cmd.push_str(&n.to_string());
}
}
cmd.push_str(
" (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] BODYSTRUCTURE)",
);
conn.send_command(cmd.as_bytes()).await?;
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await?;
} else if select_response.exists > mailbox.exists.lock().unwrap().len() {
conn.send_command(
format!(
"FETCH {}:* (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] BODYSTRUCTURE)",
std::cmp::max(mailbox.exists.lock().unwrap().len(), 1)
)
.as_bytes(),
)
.await?;
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await?;
} else {
return Ok(());
}
debug!(
"fetch response is {} bytes and {} lines",
response.len(),
String::from_utf8_lossy(&response).lines().count()
);
let (_, mut v, _) = protocol_parser::fetch_responses(&response)?;
debug!("responses len is {}", v.len());
for FetchResponse {
ref uid,
ref mut envelope,
ref mut flags,
ref references,
..
} in v.iter_mut()
{
let uid = uid.unwrap();
let env = envelope.as_mut().unwrap();
env.set_hash(generate_envelope_hash(mailbox.imap_path(), &uid));
if let Some(value) = references {
env.set_references(value);
}
let mut tag_lck = uid_store.collection.tag_index.write().unwrap();
if let Some((flags, keywords)) = flags {
env.set_flags(*flags);
if !env.is_seen() {
mailbox.unseen.lock().unwrap().insert_new(env.hash());
}
mailbox.exists.lock().unwrap().insert_new(env.hash());
for f in keywords {
let hash = tag_hash!(f);
if !tag_lck.contains_key(&hash) {
tag_lck.insert(hash, f.to_string());
}
env.labels_mut().push(hash);
}
}
}
if uid_store.keep_offline_cache && cache_handle.mailbox_state(mailbox_hash)?.is_some() {
cache_handle
.insert_envelopes(mailbox_hash, &v)
.chain_err_summary(|| {
format!(
"Could not save envelopes in cache for mailbox {}",
mailbox.imap_path()
)
})?;
}
for FetchResponse { uid, envelope, .. } in v {
if uid.is_none() || envelope.is_none() {
continue;
}
let uid = uid.unwrap();
if uid_store
.uid_index
.lock()
.unwrap()
.contains_key(&(mailbox_hash, uid))
{
continue;
}
let env = envelope.unwrap();
debug!(
"Create event {} {} {}",
env.hash(),
env.subject(),
mailbox.path(),
);
uid_store
.msn_index
.lock()
.unwrap()
.entry(mailbox_hash)
.or_default()
.push(uid);
uid_store
.hash_index
.lock()
.unwrap()
.insert(env.hash(), (uid, mailbox_hash));
uid_store
.uid_index
.lock()
.unwrap()
.insert((mailbox_hash, uid), env.hash());
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
});
}
}
Ok(())
}