melib/imap: add CONDSTORE support

Closes #52
memfd
Manos Pitsidianakis 2020-08-25 12:49:31 +03:00
parent 1ca0bd0d96
commit f7c9f21575
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
9 changed files with 1992 additions and 1465 deletions

View File

@ -33,6 +33,7 @@ pub use connection::*;
mod watch;
pub use watch::*;
mod cache;
use cache::ModSequence;
pub mod managesieve;
mod untagged;
@ -60,6 +61,7 @@ pub type UID = usize;
pub static SUPPORTED_CAPABILITIES: &[&str] = &[
#[cfg(feature = "deflate_compression")]
"COMPRESS=DEFLATE",
"CONDSTORE",
"ENABLE",
"IDLE",
"IMAP4REV1",
@ -140,10 +142,9 @@ macro_rules! get_conf_val {
#[derive(Debug)]
pub struct UIDStore {
account_hash: AccountHash,
cache_headers: bool,
account_name: Arc<String>,
keep_offline_cache: bool,
capabilities: Arc<Mutex<Capabilities>>,
uidvalidity: Arc<Mutex<HashMap<MailboxHash, UID>>>,
hash_index: Arc<Mutex<HashMap<EnvelopeHash, (UID, MailboxHash)>>>,
uid_index: Arc<Mutex<HashMap<(MailboxHash, UID), EnvelopeHash>>>,
msn_index: Arc<Mutex<HashMap<MailboxHash, Vec<UID>>>>,
@ -151,6 +152,13 @@ pub struct UIDStore {
byte_cache: Arc<Mutex<HashMap<UID, EnvelopeCache>>>,
tag_index: Arc<RwLock<BTreeMap<u64, String>>>,
/* Offline caching */
uidvalidity: Arc<Mutex<HashMap<MailboxHash, UID>>>,
envelopes: Arc<Mutex<HashMap<EnvelopeHash, cache::CachedEnvelope>>>,
max_uids: Arc<Mutex<HashMap<MailboxHash, UID>>>,
modseq: Arc<Mutex<HashMap<EnvelopeHash, ModSequence>>>,
reverse_modseq: Arc<Mutex<HashMap<MailboxHash, BTreeMap<ModSequence, EnvelopeHash>>>>,
highestmodseqs: Arc<Mutex<HashMap<MailboxHash, std::result::Result<ModSequence, ()>>>>,
mailboxes: Arc<FutureMutex<HashMap<MailboxHash, ImapMailbox>>>,
is_online: Arc<Mutex<(Instant, Result<()>)>>,
event_consumer: BackendEventConsumer,
@ -164,10 +172,15 @@ impl UIDStore {
) -> Self {
UIDStore {
account_hash,
cache_headers: false,
account_name,
keep_offline_cache: false,
capabilities: Default::default(),
uidvalidity: Default::default(),
envelopes: Default::default(),
max_uids: Default::default(),
modseq: Default::default(),
reverse_modseq: Default::default(),
highestmodseqs: Default::default(),
hash_index: Default::default(),
uid_index: Default::default(),
msn_index: Default::default(),
@ -213,6 +226,7 @@ impl MailBackend for ImapType {
idle,
#[cfg(feature = "deflate_compression")]
deflate,
condstore,
},
} = self.server_conf.protocol
{
@ -242,6 +256,15 @@ impl MailBackend for ImapType {
};
}
}
"CONDSTORE" => {
if condstore {
*status = MailBackendExtensionStatus::Enabled { comment: None };
} else {
*status = MailBackendExtensionStatus::Supported {
comment: Some("Disabled by user configuration"),
};
}
}
_ => {
if SUPPORTED_CAPABILITIES.contains(&name.as_str()) {
*status = MailBackendExtensionStatus::Enabled { comment: None };
@ -265,24 +288,30 @@ impl MailBackend for ImapType {
&mut self,
mailbox_hash: MailboxHash,
) -> Result<Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>> {
let uid_store = self.uid_store.clone();
let can_create_flags = self.can_create_flags.clone();
let connection = self.connection.clone();
let mut max_uid: Option<usize> = None;
let mut valid_hash_set: HashSet<EnvelopeHash> = HashSet::default();
let mut our_unseen: BTreeSet<EnvelopeHash> = Default::default();
let mut state = FetchState {
stage: if self.uid_store.keep_offline_cache {
FetchStage::InitialCache
} else {
FetchStage::InitialFresh
},
connection: self.connection.clone(),
mailbox_hash,
can_create_flags: self.can_create_flags.clone(),
uid_store: self.uid_store.clone(),
};
Ok(Box::pin(async_stream::try_stream! {
{
let f = &uid_store.mailboxes.lock().await[&mailbox_hash];
let f = &state.uid_store.mailboxes.lock().await[&mailbox_hash];
f.exists.lock().unwrap().clear();
f.unseen.lock().unwrap().clear();
};
let (cached_hash_set, cached_payload) = fetch_cached_envs(mailbox_hash, &mut our_unseen, &uid_store)?;
yield cached_payload;
loop {
let res = fetch_hlpr(&connection, mailbox_hash, &cached_hash_set, &can_create_flags, &mut our_unseen, &mut valid_hash_set, &uid_store, &mut max_uid).await?;
let res = fetch_hlpr(&mut state).await.map_err(|err| {
debug!("fetch_hlpr err {:?}", &err);
err})?;
yield res;
if max_uid == Some(1) || max_uid == Some(0) {
if state.stage == FetchStage::Finished {
return;
}
@ -343,12 +372,9 @@ impl MailBackend for ImapType {
mailboxes.retain(|_, f| (self.is_subscribed)(f.path()));
*/
let keys = mailboxes.keys().cloned().collect::<HashSet<MailboxHash>>();
let mut uid_lock = uid_store.uidvalidity.lock().unwrap();
for f in mailboxes.values_mut() {
uid_lock.entry(f.hash()).or_default();
f.children.retain(|c| keys.contains(c));
}
drop(uid_lock);
Ok(mailboxes
.iter()
.filter(|(_, f)| f.is_subscribed)
@ -1133,6 +1159,7 @@ impl ImapType {
let use_starttls = use_tls && get_conf_val!(s["use_starttls"], !(server_port == 993))?;
let danger_accept_invalid_certs: bool =
get_conf_val!(s["danger_accept_invalid_certs"], false)?;
let keep_offline_cache = get_conf_val!(s["offline_cache"], true)?;
let server_conf = ImapServerConf {
server_hostname: server_hostname.to_string(),
server_username: server_username.to_string(),
@ -1144,6 +1171,7 @@ impl ImapType {
protocol: ImapProtocol::IMAP {
extension_use: ImapExtensionUse {
idle: get_conf_val!(s["use_idle"], true)?,
condstore: get_conf_val!(s["use_condstore"], true)?,
#[cfg(feature = "deflate_compression")]
deflate: get_conf_val!(s["use_deflate"], true)?,
},
@ -1156,7 +1184,7 @@ impl ImapType {
};
let account_name = Arc::new(s.name().to_string());
let uid_store: Arc<UIDStore> = Arc::new(UIDStore {
cache_headers: get_conf_val!(s["X_header_caching"], false)?,
keep_offline_cache,
..UIDStore::new(account_hash, account_name, event_consumer)
});
let connection = ImapConnection::new_connection(&server_conf, uid_store.clone());
@ -1330,8 +1358,9 @@ impl ImapType {
)));
}
get_conf_val!(s["danger_accept_invalid_certs"], false)?;
get_conf_val!(s["X_header_caching"], false)?;
get_conf_val!(s["offline_cache"], true)?;
get_conf_val!(s["use_idle"], true)?;
get_conf_val!(s["use_condstore"], true)?;
#[cfg(feature = "deflate_compression")]
get_conf_val!(s["use_deflate"], true)?;
#[cfg(not(feature = "deflate_compression"))]
@ -1355,311 +1384,252 @@ impl ImapType {
}
}
fn fetch_cached_envs(
mailbox_hash: MailboxHash,
our_unseen: &mut BTreeSet<EnvelopeHash>,
uid_store: &UIDStore,
) -> Result<(HashSet<EnvelopeHash>, Vec<Envelope>)> {
if !uid_store.cache_headers {
return Ok((HashSet::default(), vec![]));
}
let uidvalidities = uid_store.uidvalidity.lock().unwrap();
let v = if let Some(v) = uidvalidities.get(&mailbox_hash) {
v
} else {
return Ok((HashSet::default(), vec![]));
};
let cached_envs: (cache::MaxUID, Vec<(UID, Envelope)>);
cache::save_envelopes(uid_store.account_hash, mailbox_hash, *v, &[])
.chain_err_summary(|| "Could not save envelopes in cache in get()")?;
cached_envs = cache::fetch_envelopes(uid_store.account_hash, mailbox_hash, *v)
.chain_err_summary(|| "Could not get envelopes in cache in get()")?;
let (_max_uid, envelopes) = debug!(cached_envs);
let ret = envelopes.iter().map(|(_, env)| env.hash()).collect();
let payload = if !envelopes.is_empty() {
let mut payload = vec![];
for (uid, env) in envelopes {
if !env.is_seen() {
our_unseen.insert(env.hash());
}
uid_store
.hash_index
.lock()
.unwrap()
.insert(env.hash(), (uid, mailbox_hash));
uid_store
.uid_index
.lock()
.unwrap()
.insert((mailbox_hash, uid), env.hash());
payload.push(env);
}
debug!("sending cached payload for {}", mailbox_hash);
payload
} else {
vec![]
};
Ok((ret, payload))
#[derive(Debug, PartialEq, Copy, Clone)]
enum FetchStage {
InitialFresh,
InitialCache,
ResyncCache,
FreshFetch { max_uid: usize },
Finished,
}
async fn fetch_hlpr(
connection: &Arc<FutureMutex<ImapConnection>>,
#[derive(Debug)]
struct FetchState {
stage: FetchStage,
connection: Arc<FutureMutex<ImapConnection>>,
mailbox_hash: MailboxHash,
cached_hash_set: &HashSet<EnvelopeHash>,
can_create_flags: &Arc<Mutex<bool>>,
our_unseen: &mut BTreeSet<EnvelopeHash>,
valid_hash_set: &mut HashSet<EnvelopeHash>,
uid_store: &UIDStore,
max_uid: &mut Option<usize>,
) -> Result<Vec<Envelope>> {
let (permissions, mailbox_path, mailbox_exists, no_select, unseen) = {
let f = &uid_store.mailboxes.lock().await[&mailbox_hash];
(
f.permissions.clone(),
f.imap_path().to_string(),
f.exists.clone(),
f.no_select,
f.unseen.clone(),
)
};
if no_select {
*max_uid = Some(0);
return Ok(Vec::new());
}
let mut conn = connection.lock().await;
debug!("locked for fetch {}", mailbox_path);
let mut response = String::with_capacity(8 * 1024);
let max_uid_left = if let Some(max_uid) = max_uid {
*max_uid
} else {
conn.create_uid_msn_cache(mailbox_hash, 1).await?;
/* first SELECT the mailbox to get READ/WRITE permissions (because EXAMINE only
* returns READ-ONLY for both cases) */
conn.select_mailbox(mailbox_hash, &mut response, true)
.await
.chain_err_summary(|| format!("Could not select mailbox {}", mailbox_path))?;
let mut examine_response =
protocol_parser::select_response(&response).chain_err_summary(|| {
format!(
"Could not parse select response for mailbox {}",
mailbox_path
)
})?;
*can_create_flags.lock().unwrap() = examine_response.can_create_flags;
debug!(
"mailbox: {} examine_response: {:?}",
mailbox_path, examine_response
);
{
let mut uidvalidities = uid_store.uidvalidity.lock().unwrap();
can_create_flags: Arc<Mutex<bool>>,
uid_store: Arc<UIDStore>,
}
let v = uidvalidities
.entry(mailbox_hash)
.or_insert(examine_response.uidvalidity);
if uid_store.cache_headers {
let _ = cache::save_envelopes(
uid_store.account_hash,
mailbox_hash,
examine_response.uidvalidity,
&[],
);
}
*v = examine_response.uidvalidity;
let mut permissions = permissions.lock().unwrap();
permissions.create_messages = !examine_response.read_only;
permissions.remove_messages = !examine_response.read_only;
permissions.set_flags = !examine_response.read_only;
permissions.rename_messages = !examine_response.read_only;
permissions.delete_messages = !examine_response.read_only;
mailbox_exists
.lock()
.unwrap()
.set_not_yet_seen(examine_response.exists);
}
if examine_response.exists == 0 {
if uid_store.cache_headers {
for &env_hash in cached_hash_set {
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: RefreshEventKind::Remove(env_hash),
});
async fn fetch_hlpr(state: &mut FetchState) -> Result<Vec<Envelope>> {
debug!((state.mailbox_hash, &state.stage));
loop {
match state.stage {
FetchStage::InitialFresh => {
let select_response = state
.connection
.lock()
.await
.init_mailbox(state.mailbox_hash)
.await?;
*state.can_create_flags.lock().unwrap() = select_response.can_create_flags;
if select_response.exists == 0 {
state.stage = FetchStage::Finished;
return Ok(Vec::new());
}
let _ = cache::save_envelopes(
uid_store.account_hash,
mailbox_hash,
examine_response.uidvalidity,
&[],
);
state.stage = FetchStage::FreshFetch {
max_uid: select_response.uidnext - 1,
};
continue;
}
*max_uid = Some(0);
return Ok(Vec::new());
}
/* reselecting the same mailbox with EXAMINE prevents expunging it */
conn.examine_mailbox(mailbox_hash, &mut response, true)
.await?;
if examine_response.uidnext == 0 {
/* UIDNEXT shouldn't be 0, since exists != 0 at this point */
conn.send_command(format!("STATUS \"{}\" (UIDNEXT)", mailbox_path).as_bytes())
.await?;
conn.read_response(&mut response, RequiredResponses::STATUS)
.await?;
let (_, status) = protocol_parser::status_response(response.as_bytes())?;
if let Some(uidnext) = status.uidnext {
if uidnext == 0 {
return Err(MeliError::new(
"IMAP server error: zero UIDNEXt with nonzero exists.",
));
FetchStage::InitialCache => {
if let Some(cached_payload) = cache::fetch_cached_envs(state).await? {
state.stage = FetchStage::ResyncCache;
debug!(
"fetch_hlpr fetch_cached_envs payload {} len for mailbox_hash {}",
cached_payload.len(),
state.mailbox_hash
);
let (mailbox_exists, unseen) = {
let f = &state.uid_store.mailboxes.lock().await[&state.mailbox_hash];
(f.exists.clone(), f.unseen.clone())
};
unseen.lock().unwrap().insert_existing_set(
cached_payload
.iter()
.filter_map(|env| {
if !env.is_seen() {
Some(env.hash())
} else {
None
}
})
.collect(),
);
mailbox_exists.lock().unwrap().insert_existing_set(
cached_payload.iter().map(|env| env.hash()).collect::<_>(),
);
return Ok(cached_payload);
}
examine_response.uidnext = uidnext;
} else {
return Err(MeliError::new("IMAP server did not reply with UIDNEXT"));
state.stage = FetchStage::InitialFresh;
continue;
}
}
*max_uid = Some(examine_response.uidnext - 1);
examine_response.uidnext - 1
};
let chunk_size = 600;
FetchStage::ResyncCache => {
let mailbox_hash = state.mailbox_hash;
let mut conn = state.connection.lock().await;
let res = debug!(conn.resync(mailbox_hash).await);
if let Ok(Some(payload)) = res {
state.stage = FetchStage::Finished;
return Ok(payload);
}
state.stage = FetchStage::InitialFresh;
continue;
}
FetchStage::FreshFetch { max_uid } => {
let FetchState {
ref mut stage,
ref connection,
mailbox_hash,
can_create_flags: _,
ref uid_store,
} = state;
let mailbox_hash = *mailbox_hash;
let mut our_unseen: BTreeSet<EnvelopeHash> = BTreeSet::default();
let (mailbox_path, mailbox_exists, no_select, unseen) = {
let f = &uid_store.mailboxes.lock().await[&mailbox_hash];
(
f.imap_path().to_string(),
f.exists.clone(),
f.no_select,
f.unseen.clone(),
)
};
if no_select {
state.stage = FetchStage::Finished;
return Ok(Vec::new());
}
let mut conn = connection.lock().await;
debug!("locked for fetch {}", mailbox_path);
let mut response = String::with_capacity(8 * 1024);
let max_uid_left = max_uid;
let chunk_size = 250;
let mut payload = vec![];
conn.examine_mailbox(mailbox_hash, &mut response, false)
.await?;
if max_uid_left > 0 {
let mut envelopes = vec![];
debug!("{} max_uid_left= {}", mailbox_hash, max_uid_left);
if max_uid_left == 1 {
debug!("UID FETCH 1 (UID FLAGS ENVELOPE BODYSTRUCTURE)");
conn.send_command(b"UID FETCH 1 (UID FLAGS ENVELOPE BODYSTRUCTURE)")
.await?;
} else {
conn.send_command(
debug!(format!(
"UID FETCH {}:{} (UID FLAGS ENVELOPE BODYSTRUCTURE)",
std::cmp::max(std::cmp::max(max_uid_left.saturating_sub(chunk_size), 1), 1),
max_uid_left
))
.as_bytes(),
)
.await?
};
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await
.chain_err_summary(|| {
format!(
"Could not parse fetch response for mailbox {}",
mailbox_path
)
})?;
debug!(
"fetch response is {} bytes and {} lines",
response.len(),
response.lines().count()
);
let (_, v, _) = protocol_parser::uid_fetch_responses(&response)?;
debug!("responses len is {}", v.len());
for UidFetchResponse {
uid,
message_sequence_number,
flags,
envelope,
..
} in v
{
let mut env = envelope.unwrap();
let mut h = DefaultHasher::new();
h.write_usize(uid);
h.write(mailbox_path.as_bytes());
env.set_hash(h.finish());
/*
debug!(
"env hash {} {} UID = {} MSN = {}",
env.hash(),
env.subject(),
uid,
message_sequence_number
);
*/
valid_hash_set.insert(env.hash());
let mut tag_lck = uid_store.tag_index.write().unwrap();
if let Some((flags, keywords)) = flags {
if !flags.intersects(Flag::SEEN) {
our_unseen.insert(env.hash());
}
env.set_flags(flags);
for f in keywords {
let hash = tag_hash!(f);
if !tag_lck.contains_key(&hash) {
tag_lck.insert(hash, f);
let mut payload = vec![];
conn.examine_mailbox(mailbox_hash, &mut response, false)
.await?;
if max_uid_left > 0 {
let mut envelopes = vec![];
debug!("{} max_uid_left= {}", mailbox_hash, max_uid_left);
let command = if max_uid_left == 1 {
"UID FETCH 1 (UID FLAGS ENVELOPE BODYSTRUCTURE)".to_string()
} else {
format!(
"UID FETCH {}:{} (UID FLAGS ENVELOPE BODYSTRUCTURE)",
std::cmp::max(
std::cmp::max(max_uid_left.saturating_sub(chunk_size), 1),
1
),
max_uid_left
)
};
debug!("sending {:?}", &command);
conn.send_command(command.as_bytes()).await?;
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await
.chain_err_summary(|| {
format!(
"Could not parse fetch response for mailbox {}",
mailbox_path
)
})?;
debug!(
"fetch response is {} bytes and {} lines",
response.len(),
response.lines().count()
);
let (_, mut v, _) = protocol_parser::fetch_responses(&response)?;
debug!("responses len is {}", v.len());
for FetchResponse {
ref uid,
ref mut envelope,
ref mut flags,
..
} in v.iter_mut()
{
let uid = uid.unwrap();
let env = envelope.as_mut().unwrap();
env.set_hash(generate_envelope_hash(&mailbox_path, &uid));
let mut tag_lck = uid_store.tag_index.write().unwrap();
if let Some((flags, keywords)) = flags {
if !flags.intersects(Flag::SEEN) {
our_unseen.insert(env.hash());
}
env.set_flags(*flags);
for f in keywords {
let hash = tag_hash!(f);
if !tag_lck.contains_key(&hash) {
tag_lck.insert(hash, f.to_string());
}
env.labels_mut().push(hash);
}
}
}
env.labels_mut().push(hash);
if uid_store.keep_offline_cache {
let mut cache_handle = cache::CacheHandle::get(uid_store.clone())?;
debug!(cache_handle
.insert_envelopes(mailbox_hash, &v)
.chain_err_summary(|| {
format!(
"Could not save envelopes in cache for mailbox {}",
mailbox_path
)
}))?;
}
for FetchResponse {
uid,
message_sequence_number,
envelope,
..
} in v
{
let uid = uid.unwrap();
let env = envelope.unwrap();
/*
debug!(
"env hash {} {} UID = {} MSN = {}",
env.hash(),
env.subject(),
uid,
message_sequence_number
);
*/
uid_store
.msn_index
.lock()
.unwrap()
.entry(mailbox_hash)
.or_default()
.insert(message_sequence_number - 1, uid);
uid_store
.hash_index
.lock()
.unwrap()
.insert(env.hash(), (uid, mailbox_hash));
uid_store
.uid_index
.lock()
.unwrap()
.insert((mailbox_hash, uid), env.hash());
envelopes.push((uid, env));
}
debug!("sending payload for {}", mailbox_hash);
unseen
.lock()
.unwrap()
.insert_existing_set(our_unseen.iter().cloned().collect());
mailbox_exists.lock().unwrap().insert_existing_set(
envelopes.iter().map(|(_, env)| env.hash()).collect::<_>(),
);
drop(conn);
payload.extend(envelopes.into_iter().map(|(_, env)| env));
}
if max_uid_left <= 1 {
*stage = FetchStage::Finished;
} else {
*stage = FetchStage::FreshFetch {
max_uid: std::cmp::max(
std::cmp::max(max_uid_left.saturating_sub(chunk_size), 1),
1,
),
};
}
return Ok(payload);
}
FetchStage::Finished => {
return Ok(vec![]);
}
uid_store
.msn_index
.lock()
.unwrap()
.entry(mailbox_hash)
.or_default()
.insert(message_sequence_number - 1, uid);
uid_store
.hash_index
.lock()
.unwrap()
.insert(env.hash(), (uid, mailbox_hash));
uid_store
.uid_index
.lock()
.unwrap()
.insert((mailbox_hash, uid), env.hash());
envelopes.push((uid, env));
}
debug!("sending payload for {}", mailbox_hash);
if uid_store.cache_headers {
//FIXME
cache::save_envelopes(
uid_store.account_hash,
mailbox_hash,
uid_store.uidvalidity.lock().unwrap()[&mailbox_hash],
&envelopes
.iter()
.map(|(uid, env)| (*uid, env))
.collect::<SmallVec<[(UID, &Envelope); 1024]>>(),
)
.chain_err_summary(|| {
format!(
"Could not save envelopes in cache for mailbox {}",
mailbox_path
)
})?;
}
for &env_hash in cached_hash_set.difference(&valid_hash_set) {
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: RefreshEventKind::Remove(env_hash),
});
}
unseen
.lock()
.unwrap()
.insert_set(our_unseen.iter().cloned().collect());
mailbox_exists
.lock()
.unwrap()
.insert_existing_set(envelopes.iter().map(|(_, env)| env.hash()).collect::<_>());
drop(conn);
payload.extend(envelopes.into_iter().map(|(_, env)| env));
}
*max_uid = if max_uid_left <= 1 {
Some(0)
} else {
Some(std::cmp::max(
std::cmp::max(max_uid_left.saturating_sub(chunk_size), 1),
1,
))
};
Ok(payload)
}

View File

@ -19,14 +19,46 @@
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use super::UID;
use super::*;
mod sync;
use crate::{
backends::{AccountHash, MailboxHash},
email::Envelope,
backends::MailboxHash,
email::{Envelope, EnvelopeHash},
error::*,
};
pub type MaxUID = UID;
use std::convert::TryFrom;
#[derive(Debug, PartialEq, Hash, Eq, Ord, PartialOrd, Copy, Clone)]
pub struct ModSequence(pub std::num::NonZeroU64);
impl TryFrom<i64> for ModSequence {
type Error = ();
fn try_from(val: i64) -> std::result::Result<ModSequence, ()> {
std::num::NonZeroU64::new(val as u64)
.map(|u| Ok(ModSequence(u)))
.unwrap_or(Err(()))
}
}
impl core::fmt::Display for ModSequence {
fn fmt(&self, fmt: &mut core::fmt::Formatter) -> core::fmt::Result {
write!(fmt, "{}", &self.0)
}
}
#[derive(Debug)]
pub struct CachedEnvelope {
pub inner: Envelope,
pub mailbox_hash: MailboxHash,
pub modsequence: Option<ModSequence>,
}
pub struct CacheHandle {
#[cfg(feature = "sqlite3")]
connection: crate::sqlite3::Connection,
uid_store: Arc<UIDStore>,
}
#[cfg(feature = "sqlite3")]
pub use sqlite3_m::*;
@ -34,118 +66,220 @@ pub use sqlite3_m::*;
#[cfg(feature = "sqlite3")]
mod sqlite3_m {
use super::*;
use crate::sqlite3;
const DB_NAME: &str = "header_cache.db";
const INIT_SCRIPT: &str = "PRAGMA foreign_keys = true;
use crate::sqlite3::rusqlite::types::{
FromSql, FromSqlError, FromSqlResult, ToSql, ToSqlOutput,
};
use crate::sqlite3::{self, DatabaseDescription};
const DB_DESCRIPTION: DatabaseDescription = DatabaseDescription {
name: "header_cache.db",
init_script: Some("PRAGMA foreign_keys = true;
PRAGMA encoding = 'UTF-8';
CREATE TABLE IF NOT EXISTS envelopes (
mailbox_hash INTEGER,
uid INTEGER,
validity INTEGER,
envelope BLOB NOT NULL UNIQUE,
PRIMARY KEY (mailbox_hash, uid, validity),
FOREIGN KEY (mailbox_hash, validity) REFERENCES uidvalidity(mailbox_hash, uid) ON DELETE CASCADE
modsequence INTEGER,
envelope BLOB NOT NULL,
PRIMARY KEY (mailbox_hash, uid),
FOREIGN KEY (mailbox_hash) REFERENCES uidvalidity(mailbox_hash) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS uidvalidity (
uid INTEGER UNIQUE,
mailbox_hash INTEGER UNIQUE,
highestmodseq INTEGER,
PRIMARY KEY (mailbox_hash, uid)
);
CREATE INDEX IF NOT EXISTS envelope_idx ON envelopes(mailbox_hash, uid, validity);
CREATE INDEX IF NOT EXISTS uidvalidity_idx ON uidvalidity(mailbox_hash);";
CREATE INDEX IF NOT EXISTS envelope_idx ON envelopes(mailbox_hash);
CREATE INDEX IF NOT EXISTS uidvalidity_idx ON uidvalidity(mailbox_hash);"),
version: 1,
};
pub fn fetch_envelopes(
account_hash: AccountHash,
mailbox_hash: MailboxHash,
uidvalidity: usize,
) -> Result<(MaxUID, Vec<(UID, Envelope)>)> {
let conn = sqlite3::open_or_create_db(
&format!("{}_{}", account_hash, DB_NAME),
Some(INIT_SCRIPT),
)?;
let mut stmt = conn
.prepare("SELECT MAX(uid) FROM envelopes WHERE mailbox_hash = ? AND validity = ?")
.unwrap();
let max_uid: usize = stmt
.query_map(
sqlite3::params![mailbox_hash as i64, uidvalidity as i64],
|row| row.get(0).map(|u: i64| u as usize),
)
.chain_err_summary(|| {
format!(
"Error while performing query {:?}",
"SELECT MAX(uid) FROM envelopes WHERE mailbox_hash = ? AND validity = ?"
)
})?
.next()
.unwrap()
.unwrap_or(0);
let mut stmt = conn
.prepare("SELECT uid, envelope FROM envelopes WHERE mailbox_hash = ? AND validity = ?")
.unwrap();
let results: Vec<(UID, Vec<u8>)> = stmt
.query_map(
sqlite3::params![mailbox_hash as i64, uidvalidity as i64],
|row| Ok((row.get::<_, i64>(0)? as usize, row.get(1)?)),
)
.chain_err_summary(|| {
format!(
"Error while performing query {:?}",
"SELECT uid, envelope FROM envelopes WHERE mailbox_hash = ? AND validity = ?",
)
})?
.collect::<std::result::Result<_, _>>()?;
debug!(
"imap cache max_uid: {} results len: {}",
max_uid,
results.len()
);
Ok((
max_uid,
results
.into_iter()
.map(|(uid, env)| {
Ok((
uid,
bincode::deserialize(&env).map_err(|e| MeliError::new(e.to_string()))?,
))
})
.collect::<Result<Vec<(UID, Envelope)>>>()?,
))
impl ToSql for ModSequence {
fn to_sql(&self) -> rusqlite::Result<ToSqlOutput> {
Ok(ToSqlOutput::from(self.0.get() as i64))
}
}
pub fn save_envelopes(
account_hash: AccountHash,
mailbox_hash: MailboxHash,
uidvalidity: usize,
envs: &[(UID, &Envelope)],
) -> Result<()> {
let conn =
sqlite3::open_or_create_db(&format!("{}_{}", account_hash, DB_NAME), Some(INIT_SCRIPT))
impl FromSql for ModSequence {
fn column_result(value: rusqlite::types::ValueRef) -> FromSqlResult<Self> {
let i: i64 = FromSql::column_result(value)?;
if i == 0 {
return Err(FromSqlError::OutOfRange(0));
}
Ok(ModSequence::try_from(i).unwrap())
}
}
impl CacheHandle {
pub fn get(uid_store: Arc<UIDStore>) -> Result<Self> {
Ok(Self {
connection: sqlite3::open_or_create_db(
&DB_DESCRIPTION,
Some(uid_store.account_name.as_str()),
)?,
uid_store,
})
}
pub fn mailbox_state(
&self,
mailbox_hash: MailboxHash,
) -> Result<Option<(UID, Option<ModSequence>)>> {
let mut stmt = self
.connection
.prepare("SELECT uid, highestmodseq FROM uidvalidity WHERE mailbox_hash = ?1;")?;
let mut ret = stmt.query_map(sqlite3::params![mailbox_hash as i64], |row| {
Ok((row.get(0).map(|u: i64| u as usize)?, row.get(1)?))
})?;
if let Some(row_res) = ret.next() {
Ok(Some(row_res?))
} else {
Ok(None)
}
}
pub fn clear(
&self,
mailbox_hash: MailboxHash,
new_uidvalidity: UID,
highestmodseq: Option<ModSequence>,
) -> Result<()> {
debug!("clear mailbox_hash {}", mailbox_hash);
debug!(new_uidvalidity);
debug!(&highestmodseq);
self.connection
.execute(
"DELETE FROM uidvalidity WHERE mailbox_hash = ?1",
sqlite3::params![mailbox_hash as i64],
)
.chain_err_summary(|| {
format!(
"Could not create header_cache.db for account {}",
account_hash
"Could not clear cache of mailbox {} account {}",
mailbox_hash, self.uid_store.account_name
)
})?;
conn.execute(
"INSERT OR REPLACE INTO uidvalidity (uid, mailbox_hash) VALUES (?1, ?2)",
sqlite3::params![uidvalidity as i64, mailbox_hash as i64],
)
.chain_err_summary(|| {
format!(
"Could not insert uidvalidity {} in header_cache of account {}",
uidvalidity, account_hash
self.connection.execute(
"INSERT OR IGNORE INTO uidvalidity (uid, highestmodseq, mailbox_hash) VALUES (?1, ?2, ?3)",
sqlite3::params![new_uidvalidity as i64, highestmodseq, mailbox_hash as i64],
)
})?;
for (uid, env) in envs {
conn.execute(
"INSERT OR REPLACE INTO envelopes (uid, mailbox_hash, validity, envelope) VALUES (?1, ?2, ?3, ?4)",
sqlite3::params![*uid as i64, mailbox_hash as i64, uidvalidity as i64, bincode::serialize(env).map_err(|e| MeliError::new(e.to_string()))?],
).chain_err_summary(|| format!("Could not insert envelope with hash {} in header_cache of account {}", env.hash(), account_hash))?;
.chain_err_summary(|| {
format!(
"Could not insert uidvalidity {} in header_cache of account {}",
new_uidvalidity, self.uid_store.account_name
)
})?;
Ok(())
}
pub fn envelopes(&self, mailbox_hash: MailboxHash) -> Result<Option<Vec<EnvelopeHash>>> {
debug!("envelopes mailbox_hash {}", mailbox_hash);
if debug!(self.mailbox_state(mailbox_hash)?.is_none()) {
return Ok(None);
}
let mut stmt = self.connection.prepare(
"SELECT uid, envelope, modsequence FROM envelopes WHERE mailbox_hash = ?1;",
)?;
let ret: Vec<(UID, Envelope, Option<ModSequence>)> = stmt
.query_map(sqlite3::params![mailbox_hash as i64], |row| {
Ok((
row.get(0).map(|i: i64| i as usize)?,
row.get(1)?,
row.get(2)?,
))
})?
.into_iter()
.collect::<std::result::Result<_, _>>()?;
let mut max_uid = 0;
let mut env_lck = self.uid_store.envelopes.lock().unwrap();
let mut hash_index_lck = self.uid_store.hash_index.lock().unwrap();
let mut uid_index_lck = self.uid_store.uid_index.lock().unwrap();
let mut env_hashes = Vec::with_capacity(ret.len());
for (uid, env, modseq) in ret {
env_hashes.push(env.hash());
max_uid = std::cmp::max(max_uid, uid);
hash_index_lck.insert(env.hash(), (uid, mailbox_hash));
uid_index_lck.insert((mailbox_hash, uid), env.hash());
env_lck.insert(
env.hash(),
CachedEnvelope {
inner: env,
mailbox_hash,
modsequence: modseq,
},
);
}
self.uid_store
.max_uids
.lock()
.unwrap()
.insert(mailbox_hash, max_uid);
Ok(Some(env_hashes))
}
pub fn insert_envelopes(
&mut self,
mailbox_hash: MailboxHash,
fetches: &[FetchResponse<'_>],
) -> Result<()> {
debug!(
"insert_envelopes mailbox_hash {} len {}",
mailbox_hash,
fetches.len()
);
if self.mailbox_state(mailbox_hash)?.is_none() {
debug!(self.mailbox_state(mailbox_hash)?.is_none());
let uidvalidity = self
.uid_store
.uidvalidity
.lock()
.unwrap()
.get(&mailbox_hash)
.cloned();
let highestmodseq = self
.uid_store
.highestmodseqs
.lock()
.unwrap()
.get(&mailbox_hash)
.cloned();
debug!(&uidvalidity);
debug!(&highestmodseq);
if let Some(uidvalidity) = uidvalidity {
debug!(self.clear(
mailbox_hash,
uidvalidity,
highestmodseq.and_then(|v| v.ok()),
))?;
}
}
let Self {
ref mut connection,
ref uid_store,
} = self;
let tx = connection.transaction()?;
for item in fetches {
if let FetchResponse {
uid: Some(uid),
message_sequence_number: _,
modseq,
flags: _,
body: _,
envelope: Some(envelope),
} = item
{
tx.execute(
"INSERT OR REPLACE INTO envelopes (uid, mailbox_hash, modsequence, envelope) VALUES (?1, ?2, ?3, ?4)",
sqlite3::params![*uid as i64, mailbox_hash as i64, modseq, &envelope],
).chain_err_summary(|| format!("Could not insert envelope {} {} in header_cache of account {}", envelope.message_id(), envelope.hash(), uid_store.account_name))?;
}
}
tx.commit()?;
Ok(())
}
Ok(())
}
}
@ -155,20 +289,96 @@ pub use filesystem_m::*;
#[cfg(not(feature = "sqlite3"))]
mod filesystem_m {
use super::*;
pub fn fetch_envelopes(
_account_hash: AccountHash,
_mailbox_hash: MailboxHash,
_uidvalidity: usize,
) -> Result<(MaxUID, Vec<(UID, Envelope)>)> {
Ok((0, vec![]))
}
impl CacheHandle {
pub fn get(uid_store: Arc<UIDStore>) -> Result<Self> {
Ok(Self { uid_store })
}
pub fn save_envelopes(
_account_hash: AccountHash,
_mailbox_hash: MailboxHash,
_uidvalidity: usize,
_envs: &[(UID, &Envelope)],
) -> Result<()> {
Ok(())
pub fn mailbox_state(
&self,
_mailbox_hash: MailboxHash,
) -> Result<Option<(UID, Option<ModSequence>)>> {
Ok(None)
}
pub fn clear(
&self,
_mailbox_hash: MailboxHash,
_new_uidvalidity: UID,
_highestmodseq: Option<ModSequence>,
) -> Result<()> {
Ok(())
}
pub fn envelopes(&self, _mailbox_hash: MailboxHash) -> Result<Option<Vec<EnvelopeHash>>> {
Ok(None)
}
pub fn insert_envelopes(
&mut self,
_mailbox_hash: MailboxHash,
_fetches: &[FetchResponse<'_>],
) -> Result<()> {
Ok(())
}
}
}
pub(super) async fn fetch_cached_envs(state: &mut FetchState) -> Result<Option<Vec<Envelope>>> {
let FetchState {
stage: _,
ref mut connection,
mailbox_hash,
can_create_flags: _,
ref uid_store,
} = state;
debug!(uid_store.keep_offline_cache);
let mailbox_hash = *mailbox_hash;
if !uid_store.keep_offline_cache {
return Ok(None);
}
{
let mut conn = connection.lock().await;
match debug!(conn.load_cache(mailbox_hash).await) {
None => return Ok(None),
Some(Ok(env_hashes)) => {
uid_store
.mailboxes
.lock()
.await
.entry(mailbox_hash)
.and_modify(|entry| {
entry
.exists
.lock()
.unwrap()
.insert_set(env_hashes.iter().cloned().collect());
let env_lck = uid_store.envelopes.lock().unwrap();
entry.unseen.lock().unwrap().insert_set(
env_hashes
.iter()
.filter_map(|h| {
if !env_lck[h].inner.is_seen() {
Some(*h)
} else {
None
}
})
.collect(),
);
});
let env_lck = uid_store.envelopes.lock().unwrap();
return Ok(Some(
env_hashes
.into_iter()
.filter_map(|env_hash| {
env_lck.get(&env_hash).map(|c_env| c_env.inner.clone())
})
.collect::<Vec<Envelope>>(),
));
}
Some(Err(err)) => return debug!(Err(err)),
}
}
}

View File

@ -0,0 +1,735 @@
/*
* melib - IMAP
*
* Copyright 2020 Manos Pitsidianakis
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use super::*;
impl ImapConnection {
pub async fn resync(&mut self, mailbox_hash: MailboxHash) -> Result<Option<Vec<Envelope>>> {
debug!("resync mailbox_hash {}", mailbox_hash);
debug!(&self.sync_policy);
if let SyncPolicy::None = self.sync_policy {
return Ok(None);
}
let cache_handle = CacheHandle::get(self.uid_store.clone())?;
if cache_handle.mailbox_state(mailbox_hash)?.is_none() {
return Ok(None);
}
self.select_mailbox(mailbox_hash, &mut String::new(), false)
.await?;
match self.sync_policy {
SyncPolicy::None => Ok(None),
SyncPolicy::Basic => self.resync_basic(cache_handle, mailbox_hash).await,
SyncPolicy::Condstore => self.resync_condstore(cache_handle, mailbox_hash).await,
SyncPolicy::CondstoreQresync => {
self.resync_condstoreqresync(cache_handle, mailbox_hash)
.await
}
}
}
pub async fn load_cache(
&mut self,
mailbox_hash: MailboxHash,
) -> Option<Result<Vec<EnvelopeHash>>> {
debug!("load_cache {}", mailbox_hash);
let cache_handle = match CacheHandle::get(self.uid_store.clone()) {
Ok(v) => v,
Err(err) => return Some(Err(err)),
};
let (uidvalidity, highestmodseq) = match debug!(cache_handle.mailbox_state(mailbox_hash)) {
Err(err) => return Some(Err(err)),
Ok(Some(v)) => v,
Ok(None) => {
return None;
}
};
self.uid_store
.uidvalidity
.lock()
.unwrap()
.entry(mailbox_hash)
.or_insert(uidvalidity);
self.uid_store
.highestmodseqs
.lock()
.unwrap()
.entry(mailbox_hash)
.or_insert(highestmodseq.ok_or(()));
match debug!(cache_handle.envelopes(mailbox_hash)) {
Ok(Some(envs)) => Some(Ok(envs)),
Ok(None) => None,
Err(err) => Some(Err(err)),
}
}
pub async fn build_cache(
&mut self,
cache_handle: &mut CacheHandle,
mailbox_hash: MailboxHash,
) -> Result<()> {
debug!("build_cache {}", mailbox_hash);
let mut response = String::with_capacity(8 * 1024);
// 1 get uidvalidity, highestmodseq
self.select_mailbox(mailbox_hash, &mut response, true)
.await?;
let select_response =
protocol_parser::select_response(&response).chain_err_summary(|| {
format!(
"Could not parse select response for mailbox {}",
mailbox_hash
)
})?;
self.uid_store
.uidvalidity
.lock()
.unwrap()
.insert(mailbox_hash, select_response.uidvalidity);
if let Some(v) = select_response.highestmodseq {
self.uid_store
.highestmodseqs
.lock()
.unwrap()
.insert(mailbox_hash, v);
}
cache_handle.clear(
mailbox_hash,
select_response.uidvalidity,
select_response.highestmodseq.and_then(|i| i.ok()),
)?;
self.send_command(b"UID FETCH 1:* (UID FLAGS ENVELOPE BODYSTRUCTURE)")
.await?;
self.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await?;
let fetches = protocol_parser::fetch_responses(&response)?.1;
cache_handle.insert_envelopes(mailbox_hash, &fetches)?;
Ok(())
}
//rfc4549_Synchronization_Operations_for_Disconnected_IMAP4_Clients
pub async fn resync_basic(
&mut self,
cache_handle: CacheHandle,
mailbox_hash: MailboxHash,
) -> Result<Option<Vec<Envelope>>> {
let mut payload = vec![];
debug!("resync_basic");
debug!(self
.uid_store
.uidvalidity
.lock()
.unwrap()
.get(&mailbox_hash));
debug!(self.uid_store.max_uids.lock().unwrap().get(&mailbox_hash));
let mut response = String::with_capacity(8 * 1024);
let cached_uidvalidity = self
.uid_store
.uidvalidity
.lock()
.unwrap()
.get(&mailbox_hash)
.cloned();
let cached_max_uid = self
.uid_store
.max_uids
.lock()
.unwrap()
.get(&mailbox_hash)
.cloned();
// 3. tag2 UID FETCH 1:<lastseenuid> FLAGS
if cached_uidvalidity.is_none() || cached_max_uid.is_none() {
return Ok(None);
}
let current_uidvalidity: UID = cached_uidvalidity.unwrap();
let max_uid: UID = cached_max_uid.unwrap();
let (mailbox_path, mailbox_exists, unseen) = {
let f = &self.uid_store.mailboxes.lock().await[&mailbox_hash];
(
f.imap_path().to_string(),
f.exists.clone(),
f.unseen.clone(),
)
};
let mut new_unseen = BTreeSet::default();
debug!("current_uidvalidity is {}", current_uidvalidity);
debug!("max_uid is {}", max_uid);
self.select_mailbox(mailbox_hash, &mut response, true)
.await?;
let select_response = protocol_parser::select_response(&response)?;
debug!(
"select_response.uidvalidity is {}",
select_response.uidvalidity
);
// 1. check UIDVALIDITY. If fail, discard cache and rebuild
if select_response.uidvalidity != current_uidvalidity {
cache_handle.clear(
mailbox_hash,
select_response.uidvalidity,
select_response.highestmodseq.and_then(|i| i.ok()),
)?;
return Ok(None);
}
// 2. tag1 UID FETCH <lastseenuid+1>:* <descriptors>
self.send_command(
format!(
"UID FETCH {}:* (UID FLAGS ENVELOPE BODYSTRUCTURE)",
max_uid + 1
)
.as_bytes(),
)
.await?;
self.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await?;
debug!(
"fetch response is {} bytes and {} lines",
response.len(),
response.lines().count()
);
let (_, mut v, _) = protocol_parser::fetch_responses(&response)?;
debug!("responses len is {}", v.len());
for FetchResponse {
ref uid,
ref mut envelope,
ref mut flags,
..
} in v.iter_mut()
{
let uid = uid.unwrap();
let env = envelope.as_mut().unwrap();
env.set_hash(generate_envelope_hash(&mailbox_path, &uid));
let mut tag_lck = self.uid_store.tag_index.write().unwrap();
if let Some((flags, keywords)) = flags {
if !flags.intersects(Flag::SEEN) {
new_unseen.insert(env.hash());
}
env.set_flags(*flags);
for f in keywords {
let hash = tag_hash!(f);
if !tag_lck.contains_key(&hash) {
tag_lck.insert(hash, f.to_string());
}
env.labels_mut().push(hash);
}
}
}
{
let mut cache_handle = cache::CacheHandle::get(self.uid_store.clone())?;
debug!(cache_handle
.insert_envelopes(mailbox_hash, &v)
.chain_err_summary(|| {
format!(
"Could not save envelopes in cache for mailbox {}",
mailbox_path
)
}))?;
}
for FetchResponse {
uid,
message_sequence_number: _,
envelope,
..
} in v
{
let uid = uid.unwrap();
let env = envelope.unwrap();
/*
debug!(
"env hash {} {} UID = {} MSN = {}",
env.hash(),
env.subject(),
uid,
message_sequence_number
);
*/
self.uid_store
.hash_index
.lock()
.unwrap()
.insert(env.hash(), (uid, mailbox_hash));
self.uid_store
.uid_index
.lock()
.unwrap()
.insert((mailbox_hash, uid), env.hash());
payload.push((uid, env));
}
debug!("sending payload for {}", mailbox_hash);
unseen
.lock()
.unwrap()
.insert_existing_set(new_unseen.iter().cloned().collect());
mailbox_exists
.lock()
.unwrap()
.insert_existing_set(payload.iter().map(|(_, env)| env.hash()).collect::<_>());
// 3. tag2 UID FETCH 1:<lastseenuid> FLAGS
self.send_command(format!("UID FETCH 1:{} FLAGS", max_uid).as_bytes())
.await?;
self.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await?;
//1) update cached flags for old messages;
//2) find out which old messages got expunged; and
//3) build a mapping between message numbers and UIDs (for old messages).
let mut valid_envs = BTreeSet::default();
let mut env_lck = self.uid_store.envelopes.lock().unwrap();
let (_, v, _) = protocol_parser::fetch_responses(&response)?;
let mut refresh_events = vec![];
for FetchResponse { uid, flags, .. } in v {
let uid = uid.unwrap();
let env_hash = generate_envelope_hash(&mailbox_path, &uid);
valid_envs.insert(env_hash);
if !env_lck.contains_key(&env_hash) {
return Ok(None);
}
let (flags, tags) = flags.unwrap();
if env_lck[&env_hash].inner.flags() != flags
|| env_lck[&env_hash].inner.labels()
!= &tags
.iter()
.map(|t| tag_hash!(t))
.collect::<SmallVec<[u64; 8]>>()
{
env_lck.entry(env_hash).and_modify(|entry| {
entry.inner.set_flags(flags);
entry.inner.labels_mut().clear();
entry
.inner
.labels_mut()
.extend(tags.iter().map(|t| tag_hash!(t)));
});
refresh_events.push(RefreshEvent {
mailbox_hash,
account_hash: self.uid_store.account_hash,
kind: RefreshEventKind::NewFlags(env_hash, (flags, tags)),
});
}
}
for env_hash in valid_envs.difference(
&env_lck
.iter()
.filter_map(|(h, cenv)| {
if cenv.mailbox_hash == mailbox_hash {
Some(*h)
} else {
None
}
})
.collect(),
) {
env_lck.remove(env_hash);
refresh_events.push(RefreshEvent {
mailbox_hash,
account_hash: self.uid_store.account_hash,
kind: RefreshEventKind::Remove(*env_hash),
});
}
drop(env_lck);
for ev in refresh_events {
self.add_refresh_event(ev);
}
Ok(Some(payload.into_iter().map(|(_, env)| env).collect()))
}
//rfc4549_Synchronization_Operations_for_Disconnected_IMAP4_Clients
//Section 6.1
pub async fn resync_condstore(
&mut self,
cache_handle: CacheHandle,
mailbox_hash: MailboxHash,
) -> Result<Option<Vec<Envelope>>> {
let mut payload = vec![];
debug!("resync_condstore");
debug!(self
.uid_store
.uidvalidity
.lock()
.unwrap()
.get(&mailbox_hash));
debug!(self.uid_store.max_uids.lock().unwrap().get(&mailbox_hash));
let mut response = String::with_capacity(8 * 1024);
let cached_uidvalidity = self
.uid_store
.uidvalidity
.lock()
.unwrap()
.get(&mailbox_hash)
.cloned();
let cached_max_uid = self
.uid_store
.max_uids
.lock()
.unwrap()
.get(&mailbox_hash)
.cloned();
let cached_highestmodseq = self
.uid_store
.highestmodseqs
.lock()
.unwrap()
.get(&mailbox_hash)
.cloned();
if cached_uidvalidity.is_none()
|| cached_max_uid.is_none()
|| cached_highestmodseq.is_none()
{
// This means the mailbox is not cached.
return Ok(None);
}
let cached_uidvalidity: UID = cached_uidvalidity.unwrap();
let cached_max_uid: UID = cached_max_uid.unwrap();
let cached_highestmodseq: std::result::Result<ModSequence, ()> =
cached_highestmodseq.unwrap();
if cached_highestmodseq.is_err() {
// No MODSEQ is available for __this__ mailbox, fallback to basic sync
return self.resync_basic(cache_handle, mailbox_hash).await;
}
let cached_highestmodseq: ModSequence = cached_highestmodseq.unwrap();
let (mailbox_path, mailbox_exists, unseen) = {
let f = &self.uid_store.mailboxes.lock().await[&mailbox_hash];
(
f.imap_path().to_string(),
f.exists.clone(),
f.unseen.clone(),
)
};
let mut new_unseen = BTreeSet::default();
debug!("current_uidvalidity is {}", cached_uidvalidity);
debug!("max_uid is {}", cached_max_uid);
// 1. check UIDVALIDITY. If fail, discard cache and rebuild
self.select_mailbox(mailbox_hash, &mut response, true)
.await?;
let select_response = protocol_parser::select_response(&response)?;
debug!(
"select_response.uidvalidity is {}",
select_response.uidvalidity
);
if select_response.uidvalidity != cached_uidvalidity {
// 1a) Check the mailbox UIDVALIDITY (see section 4.1 for more
//details) with SELECT/EXAMINE/STATUS.
// If the UIDVALIDITY value returned by the server differs, the
// client MUST
// * empty the local cache of that mailbox;
// * "forget" the cached HIGHESTMODSEQ value for the mailbox;
// * remove any pending "actions" that refer to UIDs in that
// mailbox (note that this doesn't affect actions performed on
// client-generated fake UIDs; see Section 5); and
// * skip steps 1b and 2-II;
cache_handle.clear(
mailbox_hash,
select_response.uidvalidity,
select_response.highestmodseq.and_then(|i| i.ok()),
)?;
return Ok(None);
}
if select_response.highestmodseq.is_none()
|| select_response.highestmodseq.as_ref().unwrap().is_err()
{
if select_response.highestmodseq.as_ref().unwrap().is_err() {
self.uid_store
.highestmodseqs
.lock()
.unwrap()
.insert(mailbox_hash, Err(()));
}
return self.resync_basic(cache_handle, mailbox_hash).await;
}
let new_highestmodseq = select_response.highestmodseq.unwrap().unwrap();
let mut refresh_events = vec![];
// 1b) Check the mailbox HIGHESTMODSEQ.
// If the cached value is the same as the one returned by the server, skip fetching
// message flags on step 2-II, i.e., the client only has to find out which messages got
// expunged.
if cached_highestmodseq != new_highestmodseq {
/* Cache is synced, only figure out which messages got expunged */
// 2) Fetch the current "descriptors".
// I) Discover new messages.
// II) Discover changes to old messages and flags for new messages
// using
// "FETCH 1:* (FLAGS) (CHANGEDSINCE <cached-value>)" or
// "SEARCH MODSEQ <cached-value>".
// 2. tag1 UID FETCH <lastseenuid+1>:* <descriptors>
self.send_command(
format!(
"UID FETCH {}:* (UID FLAGS ENVELOPE BODYSTRUCTURE) (CHANGEDSINCE {})",
cached_max_uid + 1,
cached_highestmodseq,
)
.as_bytes(),
)
.await?;
self.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await?;
debug!(
"fetch response is {} bytes and {} lines",
response.len(),
response.lines().count()
);
let (_, mut v, _) = protocol_parser::fetch_responses(&response)?;
debug!("responses len is {}", v.len());
for FetchResponse {
ref uid,
ref mut envelope,
ref mut flags,
..
} in v.iter_mut()
{
let uid = uid.unwrap();
let env = envelope.as_mut().unwrap();
env.set_hash(generate_envelope_hash(&mailbox_path, &uid));
let mut tag_lck = self.uid_store.tag_index.write().unwrap();
if let Some((flags, keywords)) = flags {
if !flags.intersects(Flag::SEEN) {
new_unseen.insert(env.hash());
}
env.set_flags(*flags);
for f in keywords {
let hash = tag_hash!(f);
if !tag_lck.contains_key(&hash) {
tag_lck.insert(hash, f.to_string());
}
env.labels_mut().push(hash);
}
}
}
{
let mut cache_handle = cache::CacheHandle::get(self.uid_store.clone())?;
debug!(cache_handle
.insert_envelopes(mailbox_hash, &v)
.chain_err_summary(|| {
format!(
"Could not save envelopes in cache for mailbox {}",
mailbox_path
)
}))?;
}
for FetchResponse { uid, envelope, .. } in v {
let uid = uid.unwrap();
let env = envelope.unwrap();
/*
debug!(
"env hash {} {} UID = {} MSN = {}",
env.hash(),
env.subject(),
uid,
message_sequence_number
);
*/
self.uid_store
.hash_index
.lock()
.unwrap()
.insert(env.hash(), (uid, mailbox_hash));
self.uid_store
.uid_index
.lock()
.unwrap()
.insert((mailbox_hash, uid), env.hash());
payload.push((uid, env));
}
debug!("sending payload for {}", mailbox_hash);
unseen
.lock()
.unwrap()
.insert_existing_set(new_unseen.iter().cloned().collect());
mailbox_exists
.lock()
.unwrap()
.insert_existing_set(payload.iter().map(|(_, env)| env.hash()).collect::<_>());
// 3. tag2 UID FETCH 1:<lastseenuid> FLAGS
self.send_command(
format!(
"UID FETCH 1:{} FLAGS (CHANGEDSINCE {})",
cached_max_uid, cached_highestmodseq
)
.as_bytes(),
)
.await?;
self.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await?;
//1) update cached flags for old messages;
let mut env_lck = self.uid_store.envelopes.lock().unwrap();
let (_, v, _) = protocol_parser::fetch_responses(&response)?;
for FetchResponse { uid, flags, .. } in v {
let uid = uid.unwrap();
let env_hash = generate_envelope_hash(&mailbox_path, &uid);
if !env_lck.contains_key(&env_hash) {
return Ok(None);
}
let (flags, tags) = flags.unwrap();
if env_lck[&env_hash].inner.flags() != flags
|| env_lck[&env_hash].inner.labels()
!= &tags
.iter()
.map(|t| tag_hash!(t))
.collect::<SmallVec<[u64; 8]>>()
{
env_lck.entry(env_hash).and_modify(|entry| {
entry.inner.set_flags(flags);
entry.inner.labels_mut().clear();
entry
.inner
.labels_mut()
.extend(tags.iter().map(|t| tag_hash!(t)));
});
refresh_events.push(RefreshEvent {
mailbox_hash,
account_hash: self.uid_store.account_hash,
kind: RefreshEventKind::NewFlags(env_hash, (flags, tags)),
});
}
}
self.uid_store
.highestmodseqs
.lock()
.unwrap()
.insert(mailbox_hash, Ok(new_highestmodseq));
}
let mut valid_envs = BTreeSet::default();
// This should be UID SEARCH 1:<maxuid> but it's difficult to compare to cached UIDs at the
// point of calling this function
self.send_command(b"UID SEARCH ALL").await?;
self.read_response(&mut response, RequiredResponses::SEARCH)
.await?;
//1) update cached flags for old messages;
let (_, v) = protocol_parser::search_results(response.as_bytes())?;
for uid in v {
valid_envs.insert(generate_envelope_hash(&mailbox_path, &uid));
}
let mut env_lck = self.uid_store.envelopes.lock().unwrap();
for env_hash in valid_envs.difference(
&env_lck
.iter()
.filter_map(|(h, cenv)| {
if cenv.mailbox_hash == mailbox_hash {
Some(*h)
} else {
None
}
})
.collect(),
) {
env_lck.remove(env_hash);
refresh_events.push(RefreshEvent {
mailbox_hash,
account_hash: self.uid_store.account_hash,
kind: RefreshEventKind::Remove(*env_hash),
});
}
drop(env_lck);
for ev in refresh_events {
self.add_refresh_event(ev);
}
Ok(Some(payload.into_iter().map(|(_, env)| env).collect()))
}
//rfc7162_Quick Flag Changes Resynchronization (CONDSTORE)_and Quick Mailbox Resynchronization (QRESYNC)
pub async fn resync_condstoreqresync(
&mut self,
_cache_handle: CacheHandle,
_mailbox_hash: MailboxHash,
) -> Result<Option<Vec<Envelope>>> {
Ok(None)
}
pub async fn init_mailbox(&mut self, mailbox_hash: MailboxHash) -> Result<SelectResponse> {
let mut response = String::with_capacity(8 * 1024);
let (mailbox_path, mailbox_exists, unseen, permissions) = {
let f = &self.uid_store.mailboxes.lock().await[&mailbox_hash];
(
f.imap_path().to_string(),
f.exists.clone(),
f.unseen.clone(),
f.permissions.clone(),
)
};
self.create_uid_msn_cache(mailbox_hash, 1).await?;
/* first SELECT the mailbox to get READ/WRITE permissions (because EXAMINE only
* returns READ-ONLY for both cases) */
let mut select_response = self
.select_mailbox(mailbox_hash, &mut response, true)
.await
.chain_err_summary(|| format!("Could not select mailbox {}", mailbox_path))?
.unwrap();
debug!(
"mailbox: {} select_response: {:?}",
mailbox_path, select_response
);
{
{
let mut uidvalidities = self.uid_store.uidvalidity.lock().unwrap();
let v = uidvalidities
.entry(mailbox_hash)
.or_insert(select_response.uidvalidity);
*v = select_response.uidvalidity;
}
let mut permissions = permissions.lock().unwrap();
permissions.create_messages = !select_response.read_only;
permissions.remove_messages = !select_response.read_only;
permissions.set_flags = !select_response.read_only;
permissions.rename_messages = !select_response.read_only;
permissions.delete_messages = !select_response.read_only;
mailbox_exists
.lock()
.unwrap()
.set_not_yet_seen(select_response.exists);
unseen
.lock()
.unwrap()
.set_not_yet_seen(select_response.unseen);
}
if select_response.exists == 0 {
return Ok(select_response);
}
/* reselecting the same mailbox with EXAMINE prevents expunging it */
self.examine_mailbox(mailbox_hash, &mut response, true)
.await?;
if select_response.uidnext == 0 {
/* UIDNEXT shouldn't be 0, since exists != 0 at this point */
self.send_command(format!("STATUS \"{}\" (UIDNEXT)", mailbox_path).as_bytes())
.await?;
self.read_response(&mut response, RequiredResponses::STATUS)
.await?;
let (_, status) = protocol_parser::status_response(response.as_bytes())?;
if let Some(uidnext) = status.uidnext {
if uidnext == 0 {
return Err(MeliError::new(
"IMAP server error: zero UIDNEXT with nonzero exists.",
));
}
select_response.uidnext = uidnext;
} else {
return Err(MeliError::new("IMAP server did not reply with UIDNEXT"));
}
}
Ok(select_response)
}
}

View File

@ -19,8 +19,8 @@
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use super::protocol_parser::{ImapLineSplit, ImapResponse, RequiredResponses};
use crate::backends::MailboxHash;
use super::protocol_parser::{ImapLineSplit, ImapResponse, RequiredResponses, SelectResponse};
use crate::backends::{MailboxHash, RefreshEvent};
use crate::connections::{lookup_ipv4, timeout, Connection};
use crate::email::parser::BytesExt;
use crate::error::*;
@ -39,6 +39,16 @@ use std::time::{Duration, Instant};
use super::protocol_parser;
use super::{Capabilities, ImapServerConf, UIDStore};
#[derive(Debug, Clone, Copy)]
pub enum SyncPolicy {
None,
///rfc4549 `Synch Ops for Disconnected IMAP4 Clients` https://tools.ietf.org/html/rfc4549
Basic,
///rfc7162 `IMAP Extensions: Quick Flag Changes Resynchronization (CONDSTORE) and Quick Mailbox Resynchronization (QRESYNC)`
Condstore,
CondstoreQresync,
}
#[derive(Debug, Clone, Copy)]
pub enum ImapProtocol {
IMAP { extension_use: ImapExtensionUse },
@ -47,6 +57,7 @@ pub enum ImapProtocol {
#[derive(Debug, Clone, Copy)]
pub struct ImapExtensionUse {
pub condstore: bool,
pub idle: bool,
#[cfg(feature = "deflate_compression")]
pub deflate: bool,
@ -55,6 +66,7 @@ pub struct ImapExtensionUse {
impl Default for ImapExtensionUse {
fn default() -> Self {
Self {
condstore: true,
idle: true,
#[cfg(feature = "deflate_compression")]
deflate: true,
@ -91,6 +103,7 @@ async fn try_await(cl: impl Future<Output = Result<()>> + Send) -> Result<()> {
pub struct ImapConnection {
pub stream: Result<ImapStream>,
pub server_conf: ImapServerConf,
pub sync_policy: SyncPolicy,
pub uid_store: Arc<UIDStore>,
}
@ -495,6 +508,11 @@ impl ImapConnection {
ImapConnection {
stream: Err(MeliError::new("Offline".to_string())),
server_conf: server_conf.clone(),
sync_policy: if uid_store.keep_offline_cache {
SyncPolicy::Basic
} else {
SyncPolicy::None
},
uid_store,
}
}
@ -523,12 +541,34 @@ impl ImapConnection {
ImapProtocol::IMAP {
extension_use:
ImapExtensionUse {
condstore,
#[cfg(feature = "deflate_compression")]
deflate,
idle: _idle,
},
} =>
{
} => {
if capabilities.contains(&b"CONDSTORE"[..]) && condstore {
match self.sync_policy {
SyncPolicy::None => { /* do nothing, sync is disabled */ }
_ => {
/* Upgrade to Condstore */
let mut ret = String::new();
if capabilities.contains(&b"ENABLE"[..]) {
self.send_command(b"ENABLE CONDSTORE").await?;
self.read_response(&mut ret, RequiredResponses::empty())
.await?;
} else {
self.send_command(
b"STATUS INBOX (UIDNEXT UIDVALIDITY UNSEEN MESSAGES HIGHESTMODSEQ)",
)
.await?;
self.read_response(&mut ret, RequiredResponses::empty())
.await?;
}
self.sync_policy = SyncPolicy::Condstore;
}
}
}
#[cfg(feature = "deflate_compression")]
if capabilities.contains(&b"COMPRESS=DEFLATE"[..]) && deflate {
let mut ret = String::new();
@ -600,12 +640,28 @@ impl ImapConnection {
ImapResponse::No(ref response_code) => {
//FIXME return error
debug!("Received NO response: {:?} {:?}", response_code, response);
(self.uid_store.event_consumer)(
self.uid_store.account_hash,
crate::backends::BackendEvent::Notice {
description: None,
content: response_code.to_string(),
level: crate::logging::LoggingLevel::ERROR,
},
);
ret.push_str(&response);
return r.into();
}
ImapResponse::Bad(ref response_code) => {
//FIXME return error
debug!("Received BAD response: {:?} {:?}", response_code, response);
(self.uid_store.event_consumer)(
self.uid_store.account_hash,
crate::backends::BackendEvent::Notice {
description: None,
content: response_code.to_string(),
level: crate::logging::LoggingLevel::ERROR,
},
);
ret.push_str(&response);
return r.into();
}
@ -691,24 +747,31 @@ impl ImapConnection {
mailbox_hash: MailboxHash,
ret: &mut String,
force: bool,
) -> Result<()> {
) -> Result<Option<SelectResponse>> {
if !force && self.stream.as_ref()?.current_mailbox == MailboxSelection::Select(mailbox_hash)
{
return Ok(());
return Ok(None);
}
self.send_command(
format!(
"SELECT \"{}\"",
self.uid_store.mailboxes.lock().await[&mailbox_hash].imap_path()
)
.as_bytes(),
)
.await?;
let (imap_path, permissions) = {
let m = &self.uid_store.mailboxes.lock().await[&mailbox_hash];
(m.imap_path().to_string(), m.permissions.clone())
};
self.send_command(format!("SELECT \"{}\"", imap_path).as_bytes())
.await?;
self.read_response(ret, RequiredResponses::SELECT_REQUIRED)
.await?;
debug!("select response {}", ret);
let select_response = protocol_parser::select_response(&ret)?;
{
let mut permissions = permissions.lock().unwrap();
permissions.create_messages = !select_response.read_only;
permissions.remove_messages = !select_response.read_only;
permissions.set_flags = !select_response.read_only;
permissions.rename_messages = !select_response.read_only;
permissions.delete_messages = !select_response.read_only;
}
self.stream.as_mut()?.current_mailbox = MailboxSelection::Select(mailbox_hash);
Ok(())
Ok(Some(select_response))
}
pub async fn examine_mailbox(
@ -716,11 +779,11 @@ impl ImapConnection {
mailbox_hash: MailboxHash,
ret: &mut String,
force: bool,
) -> Result<()> {
) -> Result<Option<SelectResponse>> {
if !force
&& self.stream.as_ref()?.current_mailbox == MailboxSelection::Examine(mailbox_hash)
{
return Ok(());
return Ok(None);
}
self.send_command(
format!(
@ -733,8 +796,9 @@ impl ImapConnection {
self.read_response(ret, RequiredResponses::EXAMINE_REQUIRED)
.await?;
debug!("examine response {}", ret);
let select_response = protocol_parser::select_response(&ret)?;
self.stream.as_mut()?.current_mailbox = MailboxSelection::Examine(mailbox_hash);
Ok(())
Ok(Some(select_response))
}
pub async fn unselect(&mut self) -> Result<()> {
@ -782,7 +846,7 @@ impl ImapConnection {
Ok(())
}
pub fn add_refresh_event(&mut self, ev: crate::backends::RefreshEvent) {
pub fn add_refresh_event(&mut self, ev: RefreshEvent) {
(self.uid_store.event_consumer)(
self.uid_store.account_hash,
crate::backends::BackendEvent::Refresh(ev),

View File

@ -80,12 +80,13 @@ impl BackendOp for ImapOp {
response.len(),
response.lines().collect::<Vec<&str>>().len()
);
let UidFetchResponse {
let FetchResponse {
uid: _uid,
flags: _flags,
body,
..
} = protocol_parser::uid_fetch_response(&response)?.1;
} = protocol_parser::fetch_response(&response)?.1;
let _uid = _uid.unwrap();
assert_eq!(_uid, uid);
assert!(body.is_some());
let mut bytes_cache = uid_store.byte_cache.lock()?;

View File

@ -29,7 +29,7 @@ use nom::{
character::complete::digit1,
character::is_digit,
combinator::{map, map_res, opt},
multi::{fold_many1, length_data, many0, separated_list, separated_nonempty_list},
multi::{fold_many1, length_data, many0, separated_nonempty_list},
sequence::{delimited, preceded},
};
use std::convert::TryFrom;
@ -449,46 +449,17 @@ pub fn list_mailbox_result(input: &[u8]) -> IResult<&[u8], ImapMailbox> {
))
}
pub fn my_flags(input: &[u8]) -> IResult<&[u8], Flag> {
let (input, flags) = separated_list(tag(" "), preceded(tag("\\"), is_not(")")))(input)?;
let mut ret = Flag::default();
for f in flags {
match f {
b"Answered" => {
ret.set(Flag::REPLIED, true);
}
b"Flagged" => {
ret.set(Flag::FLAGGED, true);
}
b"Deleted" => {
ret.set(Flag::TRASHED, true);
}
b"Seen" => {
ret.set(Flag::SEEN, true);
}
b"Draft" => {
ret.set(Flag::DRAFT, true);
}
f => {
debug!("unknown Flag token value: {}", unsafe {
std::str::from_utf8_unchecked(f)
});
}
}
}
Ok((input, ret))
}
#[derive(Debug)]
pub struct UidFetchResponse<'a> {
pub uid: UID,
#[derive(Debug, Clone, PartialEq)]
pub struct FetchResponse<'a> {
pub uid: Option<UID>,
pub message_sequence_number: usize,
pub modseq: Option<ModSequence>,
pub flags: Option<(Flag, Vec<String>)>,
pub body: Option<&'a [u8]>,
pub envelope: Option<Envelope>,
}
pub fn uid_fetch_response(input: &str) -> ImapParseResult<UidFetchResponse<'_>> {
pub fn fetch_response(input: &str) -> ImapParseResult<FetchResponse<'_>> {
macro_rules! should_start_with {
($input:expr, $tag:literal) => {
if !$input.starts_with($tag) {
@ -533,9 +504,10 @@ pub fn uid_fetch_response(input: &str) -> ImapParseResult<UidFetchResponse<'_>>
};
}
let mut ret = UidFetchResponse {
uid: 0,
let mut ret = FetchResponse {
uid: None,
message_sequence_number: 0,
modseq: None,
flags: None,
body: None,
envelope: None,
@ -564,7 +536,8 @@ pub fn uid_fetch_response(input: &str) -> ImapParseResult<UidFetchResponse<'_>>
)(input[i..].as_bytes())
{
i += input.len() - i - rest.len();
ret.uid = usize::from_str(unsafe { std::str::from_utf8_unchecked(uid) }).unwrap();
ret.uid =
Some(usize::from_str(unsafe { std::str::from_utf8_unchecked(uid) }).unwrap());
} else {
return debug!(Err(MeliError::new(format!(
"Unexpected input while parsing UID FETCH response. Got: `{:.40}`",
@ -582,6 +555,23 @@ pub fn uid_fetch_response(input: &str) -> ImapParseResult<UidFetchResponse<'_>>
input
))));
}
} else if input[i..].starts_with("MODSEQ (") {
i += "MODSEQ (".len();
if let Ok((rest, modseq)) = take_while::<_, &[u8], (&[u8], nom::error::ErrorKind)>(
is_digit,
)(input[i..].as_bytes())
{
i += (input.len() - i - rest.len()) + 1;
ret.modseq = u64::from_str(to_str!(modseq))
.ok()
.and_then(std::num::NonZeroU64::new)
.map(ModSequence);
} else {
return debug!(Err(MeliError::new(format!(
"Unexpected input while parsing MODSEQ in UID FETCH response. Got: `{:.40}`",
input
))));
}
} else if input[i..].starts_with("RFC822 {") {
i += "RFC822 ".len();
if let Ok((rest, body)) =
@ -668,12 +658,12 @@ pub fn uid_fetch_response(input: &str) -> ImapParseResult<UidFetchResponse<'_>>
Ok((&input[i..], ret, None))
}
pub fn uid_fetch_responses(mut input: &str) -> ImapParseResult<Vec<UidFetchResponse<'_>>> {
pub fn fetch_responses(mut input: &str) -> ImapParseResult<Vec<FetchResponse<'_>>> {
let mut ret = Vec::new();
let mut alert: Option<Alert> = None;
loop {
let next_response = uid_fetch_response(input);
while input.starts_with("* ") {
let next_response = fetch_response(input);
match next_response {
Ok((rest, el, el_alert)) => {
if let Some(el_alert) = el_alert {
@ -686,9 +676,6 @@ pub fn uid_fetch_responses(mut input: &str) -> ImapParseResult<Vec<UidFetchRespo
}
input = rest;
ret.push(el);
if !input.starts_with("* ") {
break;
}
}
Err(err) => {
return Err(MeliError::new(format!(
@ -700,50 +687,17 @@ pub fn uid_fetch_responses(mut input: &str) -> ImapParseResult<Vec<UidFetchRespo
}
if !input.is_empty() && ret.is_empty() {
return Err(MeliError::new(format!(
"310Unexpected input while parsing UID FETCH responses: `{:.40}`",
input
)));
if let Ok(ImapResponse::Ok(_)) = ImapResponse::try_from(input) {
} else {
return Err(MeliError::new(format!(
"310Unexpected input while parsing UID FETCH responses: `{:.40}`",
input
)));
}
}
Ok((input, ret, None))
}
/*
*
* "* 1 FETCH (FLAGS (\Seen) UID 1 RFC822.HEADER {5224} "
*/
pub fn uid_fetch_response_(
input: &[u8],
) -> IResult<&[u8], Vec<(usize, Option<(Flag, Vec<String>)>, &[u8])>> {
many0(
|input| -> IResult<&[u8], (usize, Option<(Flag, Vec<String>)>, &[u8])> {
let (input, _) = tag("* ")(input)?;
let (input, _) = take_while(is_digit)(input)?;
let (input, result) = permutation((
preceded(
alt((tag("UID "), tag(" UID "))),
map_res(digit1, |s| {
usize::from_str(unsafe { std::str::from_utf8_unchecked(s) })
}),
),
opt(preceded(
alt((tag("FLAGS "), tag(" FLAGS "))),
delimited(tag("("), byte_flags, tag(")")),
)),
length_data(delimited(
tag("{"),
map_res(digit1, |s| {
usize::from_str(unsafe { std::str::from_utf8_unchecked(s) })
}),
tag("}\r\n"),
)),
))(input.ltrim())?;
let (input, _) = tag(")\r\n")(input)?;
Ok((input, (result.0, result.1, result.2)))
},
)(input)
}
pub fn uid_fetch_flags_responses(
input: &[u8],
) -> IResult<&[u8], Vec<(usize, (Flag, Vec<String>))>> {
@ -817,21 +771,11 @@ pub fn capabilities(input: &[u8]) -> IResult<&[u8], Vec<&[u8]>> {
let (input, _) = take_until("\r\n")(input)?;
let (input, _) = tag("\r\n")(input)?;
Ok((input, ret))
/*
pub capabilities<>,
do_parse!(
take_until!("CAPABILITY ")
>> tag!("CAPABILITY ")
>> ret: separated_nonempty_list_complete!(tag!(" "), is_not!(" ]\r\n"))
>> take_until!("\r\n")
>> tag!("\r\n")
>> ({ ret })
)
*/
}
/// This enum represents the server's untagged responses detailed in `7. Server Responses` of RFC 3501 INTERNET MESSAGE ACCESS PROTOCOL - VERSION 4rev1
pub enum UntaggedResponse {
#[derive(Debug, PartialEq)]
pub enum UntaggedResponse<'s> {
/// ```text
/// 7.4.1. EXPUNGE Response
///
@ -891,52 +835,81 @@ pub enum UntaggedResponse {
/// messages).
/// ```
Recent(usize),
Fetch(usize, (Flag, Vec<String>)),
UIDFetch(UID, (Flag, Vec<String>)),
Fetch(FetchResponse<'s>),
Bye {
reason: String,
reason: &'s str,
},
}
pub fn untagged_responses(input: &[u8]) -> IResult<&[u8], Option<UntaggedResponse>> {
pub fn untagged_responses(input: &str) -> ImapParseResult<Option<UntaggedResponse<'_>>> {
let orig_input = input;
let (input, _) = tag("* ")(input)?;
let (input, num) = map_res(digit1, |s| usize::from_str(to_str!(s)))(input)?;
let (input, _) = tag(" ")(input)?;
let (input, _tag) = take_until("\r\n")(input)?;
let (input, _) = tag("\r\n")(input)?;
debug!("Parse untagged response from {:?}", to_str!(orig_input));
Ok((input, {
use UntaggedResponse::*;
match _tag {
b"EXPUNGE" => Some(Expunge(num)),
b"EXISTS" => Some(Exists(num)),
b"RECENT" => Some(Recent(num)),
_ if _tag.starts_with(b"FETCH ") => {
if to_str!(_tag).contains("UID") {
let (uid, flags) = uid_fetch_flags_response(orig_input)?.1;
Some(UIDFetch(uid, flags))
} else {
let f = flags(unsafe {
std::str::from_utf8_unchecked(&_tag[b"FETCH (FLAGS (".len()..])
})
.map(|(_, flags)| Fetch(num, flags));
if let Err(ref err) = f {
debug!(
"untagged_response malformed fetch: {} {}",
unsafe { std::str::from_utf8_unchecked(_tag) },
err
)
}
f.ok()
let (input, _) = tag::<_, &str, (&str, nom::error::ErrorKind)>("* ")(input)?;
let (input, num) =
map_res::<_, _, _, (&str, nom::error::ErrorKind), _, _, _>(digit1, |s| usize::from_str(s))(
input,
)?;
let (input, _) = tag::<_, &str, (&str, nom::error::ErrorKind)>(" ")(input)?;
let (input, _tag) = take_until::<_, &str, (&str, nom::error::ErrorKind)>("\r\n")(input)?;
let (input, _) = tag::<_, &str, (&str, nom::error::ErrorKind)>("\r\n")(input)?;
debug!("Parse untagged response from {:?}", orig_input);
Ok((
input,
{
use UntaggedResponse::*;
match _tag {
"EXPUNGE" => Some(Expunge(num)),
"EXISTS" => Some(Exists(num)),
"RECENT" => Some(Recent(num)),
_ if _tag.starts_with("FETCH ") => Some(Fetch(fetch_response(orig_input)?.1)),
_ => {
debug!("unknown untagged_response: {}", _tag);
None
}
}
_ => {
debug!("unknown untagged_response: {}", to_str!(_tag));
None
}
}
}))
},
None,
))
}
#[test]
fn test_untagged_responses() {
use std::convert::TryInto;
use UntaggedResponse::*;
assert_eq!(
untagged_responses("* 2 EXISTS\r\n")
.map(|(_, v, _)| v)
.unwrap()
.unwrap(),
Exists(2)
);
assert_eq!(
untagged_responses("* 1079 FETCH (UID 1103 MODSEQ (1365) FLAGS (\\Seen))\r\n")
.map(|(_, v, _)| v)
.unwrap()
.unwrap(),
Fetch(FetchResponse {
uid: Some(1103),
message_sequence_number: 1079,
modseq: Some(ModSequence(std::num::NonZeroU64::new(1365_u64).unwrap())),
flags: Some((Flag::SEEN, vec![])),
body: None,
envelope: None
})
);
assert_eq!(
untagged_responses("* 1 FETCH (FLAGS (\\Seen))\r\n")
.map(|(_, v, _)| v)
.unwrap()
.unwrap(),
Fetch(FetchResponse {
uid: None,
message_sequence_number: 1,
modseq: None,
flags: Some((Flag::SEEN, vec![])),
body: None,
envelope: None
})
);
}
pub fn search_results<'a>(input: &'a [u8]) -> IResult<&'a [u8], Vec<usize>> {
@ -991,7 +964,7 @@ fn test_imap_search() {
);
}
#[derive(Debug, Default, Clone)]
#[derive(Debug, Default, PartialEq, Clone)]
pub struct SelectResponse {
pub exists: usize,
pub recent: usize,
@ -1003,6 +976,7 @@ pub struct SelectResponse {
/// if SELECT returns \* we can set arbritary flags permanently.
pub can_create_flags: bool,
pub read_only: bool,
pub highestmodseq: Option<std::result::Result<ModSequence, ()>>,
}
/*
@ -1030,11 +1004,11 @@ pub struct SelectResponse {
pub fn select_response(input: &str) -> Result<SelectResponse> {
if input.contains("* OK") {
let mut ret = SelectResponse::default();
for l in input.split("\r\n") {
if l.starts_with("* ") && l.ends_with(" EXISTS") {
ret.exists = usize::from_str(&l["* ".len()..l.len() - " EXISTS".len()])?;
} else if l.starts_with("* ") && l.ends_with(" RECENT") {
ret.recent = usize::from_str(&l["* ".len()..l.len() - " RECENT".len()])?;
for l in input.split_rn() {
if l.starts_with("* ") && l.ends_with(" EXISTS\r\n") {
ret.exists = usize::from_str(&l["* ".len()..l.len() - " EXISTS\r\n".len()])?;
} else if l.starts_with("* ") && l.ends_with(" RECENT\r\n") {
ret.recent = usize::from_str(&l["* ".len()..l.len() - " RECENT\r\n".len()])?;
} else if l.starts_with("* FLAGS (") {
ret.flags = flags(&l["* FLAGS (".len()..l.len() - ")".len()]).map(|(_, v)| v)?;
} else if l.starts_with("* OK [UNSEEN ") {
@ -1053,6 +1027,16 @@ pub fn select_response(input: &str) -> Result<SelectResponse> {
ret.read_only = false;
} else if l.contains("OK [READ-ONLY]") {
ret.read_only = true;
} else if l.starts_with("* OK [HIGHESTMODSEQ ") {
let res: IResult<&str, &str> = take_until("]")(&l["* OK [HIGHESTMODSEQ ".len()..]);
let (_, highestmodseq) = res?;
ret.highestmodseq = Some(
std::num::NonZeroU64::new(u64::from_str(&highestmodseq)?)
.map(|u| Ok(ModSequence(u)))
.unwrap_or(Err(())),
);
} else if l.starts_with("* OK [NOMODSEQ") {
ret.highestmodseq = Some(Err(()));
} else if !l.is_empty() {
debug!("select response: {}", l);
}
@ -1064,6 +1048,76 @@ pub fn select_response(input: &str) -> Result<SelectResponse> {
}
}
#[test]
fn test_select_response() {
use std::convert::TryInto;
let r = "* FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft)\r\n* OK [PERMANENTFLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft \\*)] Flags permitted.\r\n* 45 EXISTS\r\n* 0 RECENT\r\n* OK [UNSEEN 16] First unseen.\r\n* OK [UIDVALIDITY 1554422056] UIDs valid\r\n* OK [UIDNEXT 50] Predicted next UID\r\n";
assert_eq!(
select_response(r).expect("Could not parse IMAP select response"),
SelectResponse {
exists: 45,
recent: 0,
flags: (
Flag::REPLIED | Flag::SEEN | Flag::TRASHED | Flag::DRAFT | Flag::FLAGGED,
Vec::new()
),
unseen: 16,
uidvalidity: 1554422056,
uidnext: 50,
permanentflags: (
Flag::REPLIED | Flag::SEEN | Flag::TRASHED | Flag::DRAFT | Flag::FLAGGED,
vec!["*".into()]
),
can_create_flags: true,
read_only: false,
highestmodseq: None
}
);
let r = "* 172 EXISTS\r\n* 1 RECENT\r\n* OK [UNSEEN 12] Message 12 is first unseen\r\n* OK [UIDVALIDITY 3857529045] UIDs valid\r\n* OK [UIDNEXT 4392] Predicted next UID\r\n* FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft)\r\n* OK [PERMANENTFLAGS (\\Deleted \\Seen \\*)] Limited\r\n* OK [HIGHESTMODSEQ 715194045007]\r\n* A142 OK [READ-WRITE] SELECT completed\r\n";
assert_eq!(
select_response(r).expect("Could not parse IMAP select response"),
SelectResponse {
exists: 172,
recent: 1,
flags: (
Flag::REPLIED | Flag::SEEN | Flag::TRASHED | Flag::DRAFT | Flag::FLAGGED,
Vec::new()
),
unseen: 12,
uidvalidity: 3857529045,
uidnext: 4392,
permanentflags: (Flag::SEEN | Flag::TRASHED, vec!["*".into()]),
can_create_flags: true,
read_only: false,
highestmodseq: Some(Ok(ModSequence(
std::num::NonZeroU64::new(715194045007_u64).unwrap()
))),
}
);
let r = "* 172 EXISTS\r\n* 1 RECENT\r\n* OK [UNSEEN 12] Message 12 is first unseen\r\n* OK [UIDVALIDITY 3857529045] UIDs valid\r\n* OK [UIDNEXT 4392] Predicted next UID\r\n* FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft)\r\n* OK [PERMANENTFLAGS (\\Deleted \\Seen \\*)] Limited\r\n* OK [NOMODSEQ] Sorry, this mailbox format doesn't support modsequences\r\n* A142 OK [READ-WRITE] SELECT completed\r\n";
assert_eq!(
select_response(r).expect("Could not parse IMAP select response"),
SelectResponse {
exists: 172,
recent: 1,
flags: (
Flag::REPLIED | Flag::SEEN | Flag::TRASHED | Flag::DRAFT | Flag::FLAGGED,
Vec::new()
),
unseen: 12,
uidvalidity: 3857529045,
uidnext: 4392,
permanentflags: (Flag::SEEN | Flag::TRASHED, vec!["*".into()]),
can_create_flags: true,
read_only: false,
highestmodseq: Some(Err(())),
}
);
}
pub fn flags(input: &str) -> IResult<&str, (Flag, Vec<String>)> {
let mut ret = Flag::default();
let mut keywords = Vec::new();
@ -1209,69 +1263,6 @@ pub fn envelope(input: &[u8]) -> IResult<&[u8], Envelope> {
env
}),
))
/*
do_parse!(
tag!("(")
>> opt!(is_a!("\r\n\t "))
>> date: quoted_or_nil
>> opt!(is_a!("\r\n\t "))
>> subject: quoted_or_nil
>> opt!(is_a!("\r\n\t "))
>> from: envelope_addresses
>> opt!(is_a!("\r\n\t "))
>> sender: envelope_addresses
>> opt!(is_a!("\r\n\t "))
>> reply_to: envelope_addresses
>> opt!(is_a!("\r\n\t "))
>> to: envelope_addresses
>> opt!(is_a!("\r\n\t "))
>> cc: envelope_addresses
>> opt!(is_a!("\r\n\t "))
>> bcc: envelope_addresses
>> opt!(is_a!("\r\n\t "))
>> in_reply_to: quoted_or_nil
>> opt!(is_a!("\r\n\t "))
>> message_id: quoted_or_nil
>> opt!(is_a!("\r\n\t "))
>> tag!(")")
>> ({
let mut env = Envelope::new(0);
if let Some(date) = date {
env.set_date(&date);
if let Ok(d) = crate::email::parser::generic::date(env.date_as_str().as_bytes()) {
env.set_datetime(d);
}
}
if let Some(subject) = subject {
env.set_subject(subject.to_vec());
}
if let Some(from) = from {
env.set_from(from);
}
if let Some(to) = to {
env.set_to(to);
}
if let Some(cc) = cc {
env.set_cc(cc);
}
if let Some(bcc) = bcc {
env.set_bcc(bcc);
}
if let Some(in_reply_to) = in_reply_to {
env.set_in_reply_to(&in_reply_to);
env.push_references(&in_reply_to);
}
if let Some(message_id) = message_id {
env.set_message_id(&message_id);
}
env
})
*/
}
/* Helper to build StrBuilder for Address structs */
@ -1305,18 +1296,6 @@ pub fn envelope_addresses<'a>(
},
map(tag("\"\""), |_| None),
))(input)
/*
alt_complete!(map!(tag!("NIL"), |_| None) |
do_parse!(
tag!("(")
>> envelopes: many1!(delimited!(ws!(tag!("(")), envelope_address, tag!(")")))
>> tag!(")")
>> ({
Some(envelopes)
})
)
));
*/
}
// Parse an address in the format of the ENVELOPE structure eg
@ -1366,24 +1345,6 @@ pub fn envelope_address(input: &[u8]) -> IResult<&[u8], Address> {
},
}),
))
/*
do_parse!(
name: alt_complete!(quoted | map!(tag!("NIL"), |_| Vec::new()))
>> is_a!("\r\n\t ")
>> alt_complete!(quoted| map!(tag!("NIL"), |_| Vec::new()))
>> is_a!("\r\n\t ")
>> mailbox_name: dbg_dmp!(alt_complete!(quoted | map!(tag!("NIL"), |_| Vec::new())))
>> is_a!("\r\n\t ")
>> host_name: alt_complete!(quoted | map!(tag!("NIL"), |_| Vec::new()))
>> ({
Address::Mailbox(MailboxAddress {
raw: format!("{}{}<{}@{}>", to_str!(&name), if name.is_empty() { "" } else { " " }, to_str!(&mailbox_name), to_str!(&host_name)).into_bytes(),
display_name: str_builder!(0, name.len()),
address_spec: str_builder!(if name.is_empty() { 1 } else { name.len() + 2 }, mailbox_name.len() + host_name.len() + 1),
})
})
));
*/
}
// Read a literal ie a byte sequence prefixed with a tag containing its length delimited in {}s
@ -1427,9 +1388,6 @@ pub fn quoted(input: &[u8]) -> IResult<&[u8], Vec<u8>> {
pub fn quoted_or_nil(input: &[u8]) -> IResult<&[u8], Option<Vec<u8>>> {
alt((map(tag("NIL"), |_| None), map(quoted, |v| Some(v))))(input.ltrim())
/*
alt_complete!(map!(ws!(tag!("NIL")), |_| None) | map!(quoted, |v| Some(v))));
*/
}
pub fn uid_fetch_envelopes_response(
@ -1465,28 +1423,6 @@ pub fn uid_fetch_envelopes_response(
}))
},
)(input)
/*
many0!(
do_parse!(
tag!("* ")
>> take_while!(call!(is_digit))
>> tag!(" FETCH (")
>> uid_flags: permutation!(preceded!(ws!(tag!("UID ")), map_res!(digit1, |s| { usize::from_str(unsafe { std::str::from_utf8_unchecked(s) }) })), opt!(preceded!(ws!(tag!("FLAGS ")), delimited!(tag!("("), byte_flags, tag!(")")))))
>> tag!(" ENVELOPE ")
>> env: ws!(envelope)
>> tag!("BODYSTRUCTURE ")
>> bodystructure: take_until!(")\r\n")
>> tag!(")\r\n")
>> ({
let mut env = env;
let has_attachments = bodystructure_has_attachments(bodystructure);
env.set_has_attachments(has_attachments);
(uid_flags.0, uid_flags.1, env)
})
)
)
);
*/
}
pub fn bodystructure_has_attachments(input: &[u8]) -> bool {
@ -1608,3 +1544,10 @@ fn astring_char_tokens(input: &[u8]) -> IResult<&[u8], &[u8]> {
// FIXME
is_not(" \r\n")(input)
}
pub fn generate_envelope_hash(mailbox_path: &str, uid: &UID) -> EnvelopeHash {
let mut h = DefaultHasher::new();
h.write_usize(*uid);
h.write(mailbox_path.as_bytes());
h.finish()
}

View File

@ -21,7 +21,7 @@
use super::{ImapConnection, MailboxSelection};
use crate::backends::imap::protocol_parser::{
ImapLineSplit, RequiredResponses, UidFetchResponse, UntaggedResponse,
generate_envelope_hash, FetchResponse, ImapLineSplit, RequiredResponses, UntaggedResponse,
};
use crate::backends::BackendMailbox;
use crate::backends::{
@ -60,7 +60,7 @@ impl ImapConnection {
let mut response = String::with_capacity(8 * 1024);
let untagged_response =
match super::protocol_parser::untagged_responses(line.as_bytes()).map(|(_, v)| v) {
match super::protocol_parser::untagged_responses(line).map(|(_, v, _)| v) {
Ok(None) | Err(_) => {
return Ok(false);
}
@ -103,79 +103,79 @@ impl ImapConnection {
/* UID FETCH ALL UID, cross-ref, then FETCH difference headers
* */
debug!("exists {}", n);
if n > mailbox.exists.lock().unwrap().len() {
try_fail!(
mailbox_hash,
self.send_command(
&[
b"FETCH",
format!("{}:{}", mailbox.exists.lock().unwrap().len() + 1, n).as_bytes(),
b"(UID FLAGS RFC822)",
]
.join(&b' '),
).await
self.read_response(&mut response, RequiredResponses::FETCH_REQUIRED).await
);
match super::protocol_parser::uid_fetch_responses(&response) {
Ok((_, v, _)) => {
'fetch_responses: for UidFetchResponse {
uid, flags, body, ..
} in v
try_fail!(
mailbox_hash,
self.send_command(format!("FETCH {} (UID FLAGS RFC822)", n).as_bytes()).await
self.read_response(&mut response, RequiredResponses::FETCH_REQUIRED).await
);
match super::protocol_parser::fetch_responses(&response) {
Ok((_, v, _)) => {
'fetch_responses: for FetchResponse {
uid, flags, body, ..
} in v
{
let uid = uid.unwrap();
if self
.uid_store
.uid_index
.lock()
.unwrap()
.contains_key(&(mailbox_hash, uid))
{
if self
.uid_store
continue 'fetch_responses;
}
let env_hash = generate_envelope_hash(&mailbox.imap_path(), &uid);
self.uid_store
.msn_index
.lock()
.unwrap()
.entry(mailbox_hash)
.or_default()
.push(uid);
if let Ok(mut env) =
Envelope::from_bytes(body.unwrap(), flags.as_ref().map(|&(f, _)| f))
{
env.set_hash(env_hash);
self.uid_store
.hash_index
.lock()
.unwrap()
.insert(env_hash, (uid, mailbox_hash));
self.uid_store
.uid_index
.lock()
.unwrap()
.contains_key(&(mailbox_hash, uid))
{
continue 'fetch_responses;
}
if let Ok(mut env) = Envelope::from_bytes(
body.unwrap(),
flags.as_ref().map(|&(f, _)| f),
) {
self.uid_store
.hash_index
.lock()
.unwrap()
.insert(env.hash(), (uid, mailbox_hash));
self.uid_store
.uid_index
.lock()
.unwrap()
.insert((mailbox_hash, uid), env.hash());
if let Some((_, keywords)) = flags {
let mut tag_lck = self.uid_store.tag_index.write().unwrap();
for f in keywords {
let hash = tag_hash!(f);
if !tag_lck.contains_key(&hash) {
tag_lck.insert(hash, f);
}
env.labels_mut().push(hash);
.insert((mailbox_hash, uid), env_hash);
if let Some((_, keywords)) = flags {
let mut tag_lck = self.uid_store.tag_index.write().unwrap();
for f in keywords {
let hash = tag_hash!(f);
if !tag_lck.contains_key(&hash) {
tag_lck.insert(hash, f);
}
env.labels_mut().push(hash);
}
debug!(
"Create event {} {} {}",
env.hash(),
env.subject(),
mailbox.path(),
);
if !env.is_seen() {
mailbox.unseen.lock().unwrap().insert_new(env.hash());
}
mailbox.exists.lock().unwrap().insert_new(env.hash());
self.add_refresh_event(RefreshEvent {
account_hash: self.uid_store.account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
});
}
debug!(
"Create event {} {} {}",
env.hash(),
env.subject(),
mailbox.path(),
);
if !env.is_seen() {
mailbox.unseen.lock().unwrap().insert_new(env.hash());
}
mailbox.exists.lock().unwrap().insert_new(env.hash());
self.add_refresh_event(RefreshEvent {
account_hash: self.uid_store.account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
});
}
}
Err(e) => {
debug!(e);
}
}
Err(e) => {
debug!(e);
}
}
}
@ -202,12 +202,13 @@ impl ImapConnection {
self.read_response(&mut response, RequiredResponses::FETCH_REQUIRED).await
);
debug!(&response);
match super::protocol_parser::uid_fetch_responses(&response) {
match super::protocol_parser::fetch_responses(&response) {
Ok((_, v, _)) => {
for UidFetchResponse {
for FetchResponse {
uid, flags, body, ..
} in v
{
let uid = uid.unwrap();
if !self
.uid_store
.uid_index
@ -278,69 +279,91 @@ impl ImapConnection {
}
}
}
UntaggedResponse::UIDFetch(uid, flags) => {
debug!("fetch uid {} {:?}", uid, flags);
let lck = self.uid_store.uid_index.lock().unwrap();
let env_hash = lck.get(&(mailbox_hash, uid)).copied();
drop(lck);
if let Some(env_hash) = env_hash {
if !flags.0.intersects(crate::email::Flag::SEEN) {
mailbox.unseen.lock().unwrap().insert_new(env_hash);
UntaggedResponse::Fetch(FetchResponse {
uid,
message_sequence_number: msg_seq,
modseq,
flags,
body: _,
envelope: _,
}) => {
if let Some(modseq) = modseq {
if self
.uid_store
.reverse_modseq
.lock()
.unwrap()
.entry(mailbox_hash)
.or_default()
.contains_key(&modseq)
{
return Ok(true);
}
}
if let Some(flags) = flags {
let uid = if let Some(uid) = uid {
uid
} else {
mailbox.unseen.lock().unwrap().remove(env_hash);
}
self.add_refresh_event(RefreshEvent {
account_hash: self.uid_store.account_hash,
mailbox_hash,
kind: NewFlags(env_hash, flags),
});
};
}
UntaggedResponse::Fetch(msg_seq, flags) => {
/* a * {msg_seq} FETCH (FLAGS ({flags})) was received, so find out UID from msg_seq
* and send update
*/
debug!("fetch {} {:?}", msg_seq, flags);
try_fail!(
mailbox_hash,
self.send_command(
&[
b"UID SEARCH",
format!("{}", msg_seq).as_bytes(),
]
.join(&b' '),
).await
self.read_response(&mut response, RequiredResponses::SEARCH).await
);
debug!(&response);
match super::protocol_parser::search_results(
response.split_rn().next().unwrap_or("").as_bytes(),
)
.map(|(_, v)| v)
{
Ok(mut v) => {
if let Some(uid) = v.pop() {
let lck = self.uid_store.uid_index.lock().unwrap();
let env_hash = lck.get(&(mailbox_hash, uid)).copied();
drop(lck);
if let Some(env_hash) = env_hash {
if !flags.0.intersects(crate::email::Flag::SEEN) {
mailbox.unseen.lock().unwrap().insert_new(env_hash);
} else {
mailbox.unseen.lock().unwrap().remove(env_hash);
}
self.add_refresh_event(RefreshEvent {
account_hash: self.uid_store.account_hash,
mailbox_hash,
kind: NewFlags(env_hash, flags),
});
};
}
}
Err(e) => {
try_fail!(
mailbox_hash,
self.send_command(
&[
b"UID SEARCH",
format!("{}", msg_seq).as_bytes(),
]
.join(&b' '),
).await
self.read_response(&mut response, RequiredResponses::SEARCH).await
);
debug!(&response);
debug!(e);
}
match super::protocol_parser::search_results(
response.split_rn().next().unwrap_or("").as_bytes(),
)
.map(|(_, v)| v)
{
Ok(mut v) if v.len() == 1 => v.pop().unwrap(),
Ok(_) => {
return Ok(false);
}
Err(e) => {
debug!(&response);
debug!(e);
return Ok(false);
}
}
};
debug!("fetch uid {} {:?}", uid, flags);
let lck = self.uid_store.uid_index.lock().unwrap();
let env_hash = lck.get(&(mailbox_hash, uid)).copied();
drop(lck);
if let Some(env_hash) = env_hash {
if !flags.0.intersects(crate::email::Flag::SEEN) {
mailbox.unseen.lock().unwrap().insert_new(env_hash);
} else {
mailbox.unseen.lock().unwrap().remove(env_hash);
}
if let Some(modseq) = modseq {
self.uid_store
.reverse_modseq
.lock()
.unwrap()
.entry(mailbox_hash)
.or_default()
.insert(modseq, env_hash);
self.uid_store
.modseq
.lock()
.unwrap()
.insert(env_hash, modseq);
}
self.add_refresh_event(RefreshEvent {
account_hash: self.uid_store.account_hash,
mailbox_hash,
kind: NewFlags(env_hash, flags),
});
};
}
}
}

View File

@ -20,8 +20,6 @@
*/
use super::*;
use crate::backends::SpecialUsageMailbox;
use crate::email::parser::BytesExt;
use crate::email::parser::BytesIterExt;
use std::sync::Arc;
/// Arguments for IMAP watching functions
@ -31,25 +29,6 @@ pub struct ImapWatchKit {
pub uid_store: Arc<UIDStore>,
}
macro_rules! exit_on_error {
($conn:expr, $mailbox_hash:ident, $($result:expr)+) => {
$(if let Err(e) = $result {
*$conn.uid_store.is_online.lock().unwrap() = (
Instant::now(),
Err(e.clone()),
);
debug!("failure: {}", e.to_string());
let account_hash = $conn.uid_store.account_hash;
$conn.add_refresh_event(RefreshEvent {
account_hash,
mailbox_hash: $mailbox_hash,
kind: RefreshEventKind::Failure(e.clone()),
});
Err(e)
} else { Ok(()) }?;)+
};
}
pub async fn poll_with_examine(kit: ImapWatchKit) -> Result<()> {
debug!("poll with examine");
let ImapWatchKit {
@ -61,13 +40,13 @@ pub async fn poll_with_examine(kit: ImapWatchKit) -> Result<()> {
let mut response = String::with_capacity(8 * 1024);
loop {
let mailboxes: HashMap<MailboxHash, ImapMailbox> = {
let mailboxes_lck = uid_store.mailboxes.lock().await;
let mailboxes_lck = timeout(Duration::from_secs(3), uid_store.mailboxes.lock()).await?;
mailboxes_lck.clone()
};
for (_, mailbox) in mailboxes {
examine_updates(mailbox, &mut conn, &uid_store).await?;
}
let mut main_conn = main_conn.lock().await;
let mut main_conn = timeout(Duration::from_secs(3), main_conn.lock()).await?;
main_conn.send_command(b"NOOP").await?;
main_conn
.read_response(&mut response, RequiredResponses::empty())
@ -95,37 +74,24 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> {
{
Some(mailbox) => mailbox,
None => {
let err = MeliError::new("INBOX mailbox not found in local mailbox index. meli may have not parsed the IMAP mailboxes correctly");
debug!("failure: {}", err.to_string());
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash: 0,
kind: RefreshEventKind::Failure(err.clone()),
});
return Err(err);
return Err(MeliError::new("INBOX mailbox not found in local mailbox index. meli may have not parsed the IMAP mailboxes correctly"));
}
};
let mailbox_hash = mailbox.hash();
let uidvalidity;
let mut response = String::with_capacity(8 * 1024);
exit_on_error!(
conn,
mailbox_hash,
conn.send_command(format!("SELECT \"{}\"", mailbox.imap_path()).as_bytes())
.await
conn.read_response(&mut response, RequiredResponses::SELECT_REQUIRED)
.await
);
conn.send_command(format!("SELECT \"{}\"", mailbox.imap_path()).as_bytes())
.await?;
conn.read_response(&mut response, RequiredResponses::SELECT_REQUIRED)
.await?;
debug!("select response {}", &response);
{
let mut prev_exists = mailbox.exists.lock().unwrap();
match protocol_parser::select_response(&response) {
Ok(ok) => {
{
uidvalidity = ok.uidvalidity;
let mut uidvalidities = uid_store.uidvalidity.lock().unwrap();
let uidvalidities = uid_store.uidvalidity.lock().unwrap();
if let Some(v) = uidvalidities.get_mut(&mailbox_hash) {
if let Some(v) = uidvalidities.get(&mailbox_hash) {
if *v != ok.uidvalidity {
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
@ -138,7 +104,6 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> {
uid_store.hash_index.lock().unwrap().clear();
uid_store.byte_cache.lock().unwrap().clear();
*/
*v = ok.uidvalidity;
}
} else {
conn.add_refresh_event(RefreshEvent {
@ -146,15 +111,11 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> {
mailbox_hash,
kind: RefreshEventKind::Rescan,
});
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: RefreshEventKind::Failure(MeliError::new(format!(
"Unknown mailbox: {} {}",
mailbox.path(),
mailbox_hash
))),
});
return Err(MeliError::new(format!(
"Unknown mailbox: {} {}",
mailbox.path(),
mailbox_hash
)));
}
}
debug!(&ok);
@ -165,7 +126,7 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> {
}
};
}
exit_on_error!(conn, mailbox_hash, conn.send_command(b"IDLE").await);
conn.send_command(b"IDLE").await?;
let mut blockn = ImapBlockingConnection::from(conn);
let mut beat = std::time::Instant::now();
let mut watch = std::time::Instant::now();
@ -176,358 +137,50 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> {
while let Some(line) = blockn.as_stream().await {
let now = std::time::Instant::now();
if now.duration_since(beat) >= _26_MINS {
let mut main_conn_lck = main_conn.lock().await;
exit_on_error!(
blockn.conn,
mailbox_hash,
blockn.conn.send_raw(b"DONE").await
blockn.conn.read_response(&mut response, RequiredResponses::empty()).await
blockn.conn.send_command(b"IDLE").await
main_conn_lck.send_command(b"NOOP").await
main_conn_lck.read_response(&mut response, RequiredResponses::empty()).await
);
let mut main_conn_lck = timeout(Duration::from_secs(3), main_conn.lock()).await?;
blockn.conn.send_raw(b"DONE").await?;
blockn
.conn
.read_response(&mut response, RequiredResponses::empty())
.await?;
blockn.conn.send_command(b"IDLE").await?;
main_conn_lck.send_command(b"NOOP").await?;
main_conn_lck
.read_response(&mut response, RequiredResponses::empty())
.await?;
beat = now;
}
if now.duration_since(watch) >= _5_MINS {
/* Time to poll all inboxes */
let mut conn = main_conn.lock().await;
let mut conn = timeout(Duration::from_secs(3), main_conn.lock()).await?;
let mailboxes: HashMap<MailboxHash, ImapMailbox> = {
let mailboxes_lck = uid_store.mailboxes.lock().await;
let mailboxes_lck =
timeout(Duration::from_secs(3), uid_store.mailboxes.lock()).await?;
mailboxes_lck.clone()
};
for (_, mailbox) in mailboxes {
exit_on_error!(
conn,
mailbox_hash,
examine_updates(mailbox, &mut conn, &uid_store).await
);
examine_updates(mailbox, &mut conn, &uid_store).await?;
}
watch = now;
}
*uid_store.is_online.lock().unwrap() = (Instant::now(), Ok(()));
match protocol_parser::untagged_responses(line.as_slice())
.map(|(_, v)| v)
.map_err(MeliError::from)
{
Ok(Some(Recent(_r))) => {
let mut conn = main_conn.lock().await;
/* UID SEARCH RECENT */
exit_on_error!(
conn,
mailbox_hash,
conn.examine_mailbox(mailbox_hash, &mut response, false).await
conn.send_command(b"UID SEARCH RECENT").await
conn.read_response(&mut response, RequiredResponses::SEARCH).await
);
match protocol_parser::search_results_raw(response.as_bytes())
.map(|(_, v)| v)
.map_err(MeliError::from)
{
Ok(&[]) => {
debug!("UID SEARCH RECENT returned no results");
}
Ok(v) => {
exit_on_error!(
conn,
mailbox_hash,
conn.send_command(
&[&b"UID FETCH"[..], &v.trim().split(|b| b == &b' ').join(b','), &b"(FLAGS RFC822)"[..]]
.join(&b' '),
).await
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED).await
);
debug!(&response);
match protocol_parser::uid_fetch_responses(&response) {
Ok((_, v, _)) => {
for UidFetchResponse {
uid, flags, body, ..
} in v
{
if !uid_store
.uid_index
.lock()
.unwrap()
.contains_key(&(mailbox_hash, uid))
{
if let Ok(mut env) = Envelope::from_bytes(
/* unwrap() is safe since we ask for RFC822 in the
* above FETCH, thus uid_fetch_responses() if
* returns a successful parse, it will include the
* RFC822 response */
body.unwrap(),
flags.as_ref().map(|&(f, _)| f),
) {
uid_store
.hash_index
.lock()
.unwrap()
.insert(env.hash(), (uid, mailbox_hash));
uid_store
.uid_index
.lock()
.unwrap()
.insert((mailbox_hash, uid), env.hash());
debug!(
"Create event {} {} {}",
env.hash(),
env.subject(),
mailbox.path(),
);
if let Some((_, keywords)) = flags {
let mut tag_lck =
uid_store.tag_index.write().unwrap();
for f in keywords {
let hash = tag_hash!(f);
if !tag_lck.contains_key(&hash) {
tag_lck.insert(hash, f);
}
env.labels_mut().push(hash);
}
}
if !env.is_seen() {
mailbox
.unseen
.lock()
.unwrap()
.insert_new(env.hash());
}
if uid_store.cache_headers {
cache::save_envelopes(
uid_store.account_hash,
mailbox_hash,
uidvalidity,
&[(uid, &env)],
)?;
}
mailbox.exists.lock().unwrap().insert_new(env.hash());
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
});
}
}
}
}
Err(e) => {
debug!(e);
}
}
}
Err(e) => {
debug!(
"UID SEARCH RECENT err: {}\nresp: {}",
e.to_string(),
&response
);
}
}
}
Ok(Some(Expunge(n))) => {
// The EXPUNGE response reports that the specified message sequence
// number has been permanently removed from the mailbox. The message
// sequence number for each successive message in the mailbox is
// immediately decremented by 1, and this decrement is reflected in
// message sequence numbers in subsequent responses (including other
// untagged EXPUNGE responses).
let mut conn = main_conn.lock().await;
let deleted_uid = uid_store
.msn_index
.lock()
.unwrap()
.entry(mailbox_hash)
.or_default()
.remove(n);
debug!("expunge {}, UID = {}", n, deleted_uid);
let deleted_hash: EnvelopeHash = uid_store
.uid_index
.lock()
.unwrap()
.remove(&(mailbox_hash, deleted_uid))
.unwrap();
uid_store.hash_index.lock().unwrap().remove(&deleted_hash);
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: Remove(deleted_hash),
});
}
Ok(Some(Exists(n))) => {
let mut conn = main_conn.lock().await;
/* UID FETCH ALL UID, cross-ref, then FETCH difference headers
* */
debug!("exists {}", n);
if n > mailbox.exists.lock().unwrap().len() {
exit_on_error!(
conn,
mailbox_hash,
conn.examine_mailbox(mailbox_hash, &mut response, false).await
conn.send_command(
&[
b"FETCH",
format!("{}:{}", mailbox.exists.lock().unwrap().len() + 1, n).as_bytes(),
b"(UID FLAGS RFC822)",
]
.join(&b' '),
).await
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED).await
);
match protocol_parser::uid_fetch_responses(&response) {
Ok((_, v, _)) => {
'fetch_responses_b: for UidFetchResponse {
uid, flags, body, ..
} in v
{
if uid_store
.uid_index
.lock()
.unwrap()
.contains_key(&(mailbox_hash, uid))
{
continue 'fetch_responses_b;
}
if let Ok(mut env) = Envelope::from_bytes(
body.unwrap(),
flags.as_ref().map(|&(f, _)| f),
) {
uid_store
.hash_index
.lock()
.unwrap()
.insert(env.hash(), (uid, mailbox_hash));
uid_store
.uid_index
.lock()
.unwrap()
.insert((mailbox_hash, uid), env.hash());
if let Some((_, keywords)) = flags {
let mut tag_lck = uid_store.tag_index.write().unwrap();
for f in keywords {
let hash = tag_hash!(f);
if !tag_lck.contains_key(&hash) {
tag_lck.insert(hash, f);
}
env.labels_mut().push(hash);
}
}
debug!(
"Create event {} {} {}",
env.hash(),
env.subject(),
mailbox.path(),
);
if !env.is_seen() {
mailbox.unseen.lock().unwrap().insert_new(env.hash());
}
if uid_store.cache_headers {
cache::save_envelopes(
uid_store.account_hash,
mailbox_hash,
uidvalidity,
&[(uid, &env)],
)?;
}
mailbox.exists.lock().unwrap().insert_new(env.hash());
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
});
}
}
}
Err(e) => {
debug!(e);
}
}
}
}
Ok(Some(UIDFetch(uid, flags))) => {
let res = uid_store
.uid_index
.lock()
.unwrap()
.get(&(mailbox_hash, uid))
.map(|h| *h);
if let Some(env_hash) = res {
if !flags.0.intersects(crate::email::Flag::SEEN) {
mailbox.unseen.lock().unwrap().insert_new(env_hash);
} else {
mailbox.unseen.lock().unwrap().remove(env_hash);
}
let mut conn = main_conn.lock().await;
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: NewFlags(env_hash, flags),
});
}
}
Ok(Some(Fetch(msg_seq, flags))) => {
/* a * {msg_seq} FETCH (FLAGS ({flags})) was received, so find out UID from msg_seq
* and send update
*/
let mut conn = main_conn.lock().await;
debug!("fetch {} {:?}", msg_seq, flags);
exit_on_error!(
conn,
mailbox_hash,
conn.examine_mailbox(mailbox_hash, &mut response, false).await
conn.send_command(
&[
b"UID SEARCH",
format!("{}", msg_seq).as_bytes(),
]
.join(&b' '),
).await
conn.read_response(&mut response, RequiredResponses::SEARCH).await
);
match search_results(response.split_rn().next().unwrap_or("").as_bytes())
.map(|(_, v)| v)
{
Ok(mut v) => {
if let Some(uid) = v.pop() {
if let Some(env_hash) = uid_store
.uid_index
.lock()
.unwrap()
.get(&(mailbox_hash, uid))
{
if !flags.0.intersects(crate::email::Flag::SEEN) {
mailbox.unseen.lock().unwrap().insert_new(*env_hash);
} else {
mailbox.unseen.lock().unwrap().remove(*env_hash);
}
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: NewFlags(*env_hash, flags),
});
}
}
}
Err(e) => {
debug!(&response);
debug!(e);
}
}
}
Ok(Some(Bye { .. })) => break,
Ok(None) | Err(_) => {}
let mut conn = timeout(Duration::from_secs(3), main_conn.lock()).await?;
conn.process_untagged(to_str!(&line)).await?;
}
*uid_store.is_online.lock().unwrap() = (Instant::now(), Ok(()));
}
debug!("IDLE connection dropped");
let err: &str = blockn.err().unwrap_or("Unknown reason.");
main_conn.lock().await.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: RefreshEventKind::Failure(MeliError::new(format!(
"IDLE connection dropped: {}",
&err
))),
});
timeout(Duration::from_secs(3), main_conn.lock())
.await?
.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: RefreshEventKind::Failure(MeliError::new(format!(
"IDLE connection dropped: {}",
&err
))),
});
Err(MeliError::new(format!("IDLE connection dropped: {}", err)))
}
@ -538,260 +191,170 @@ pub async fn examine_updates(
) -> Result<()> {
let mailbox_hash = mailbox.hash();
debug!("examining mailbox {} {}", mailbox_hash, mailbox.path());
let mut response = String::with_capacity(8 * 1024);
exit_on_error!(
conn,
mailbox_hash,
if let Some(new_envelopes) = conn.resync(mailbox_hash).await? {
for env in new_envelopes {
conn.add_refresh_event(RefreshEvent {
mailbox_hash,
account_hash: uid_store.account_hash,
kind: RefreshEventKind::Create(Box::new(env)),
});
}
} else {
let mut response = String::with_capacity(8 * 1024);
conn.examine_mailbox(mailbox_hash, &mut response, true)
.await
);
*uid_store.is_online.lock().unwrap() = (Instant::now(), Ok(()));
let uidvalidity;
match protocol_parser::select_response(&response) {
Ok(ok) => {
uidvalidity = ok.uidvalidity;
debug!(&ok);
{
let mut uidvalidities = uid_store.uidvalidity.lock().unwrap();
.await?;
*uid_store.is_online.lock().unwrap() = (Instant::now(), Ok(()));
let select_response = protocol_parser::select_response(&response)
.chain_err_summary(|| "could not select mailbox")?;
debug!(&select_response);
{
let uidvalidities = uid_store.uidvalidity.lock().unwrap();
if let Some(v) = uidvalidities.get_mut(&mailbox_hash) {
if *v != ok.uidvalidity {
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: RefreshEventKind::Rescan,
});
/*
uid_store.uid_index.lock().unwrap().clear();
uid_store.hash_index.lock().unwrap().clear();
uid_store.byte_cache.lock().unwrap().clear();
*/
*v = ok.uidvalidity;
}
} else {
if let Some(v) = uidvalidities.get(&mailbox_hash) {
if *v != select_response.uidvalidity {
let cache_handle = cache::CacheHandle::get(uid_store.clone())?;
cache_handle.clear(
mailbox_hash,
select_response.uidvalidity,
select_response.highestmodseq.and_then(|i| i.ok()),
)?;
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: RefreshEventKind::Rescan,
});
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: RefreshEventKind::Failure(MeliError::new(format!(
"Unknown mailbox: {} {}",
mailbox.path(),
mailbox_hash
))),
});
/*
uid_store.uid_index.lock().unwrap().clear();
uid_store.hash_index.lock().unwrap().clear();
uid_store.byte_cache.lock().unwrap().clear();
*/
return Ok(());
}
}
let n = ok.exists;
if ok.recent > 0 {
{
/* UID SEARCH RECENT */
exit_on_error!(
conn,
mailbox_hash,
conn.send_command(b"UID SEARCH RECENT").await
conn.read_response(&mut response, RequiredResponses::SEARCH).await
);
match protocol_parser::search_results_raw(response.as_bytes())
.map(|(_, v)| v)
.map_err(MeliError::from)
{
Ok(&[]) => {
debug!("UID SEARCH RECENT returned no results");
}
Ok(v) => {
exit_on_error!(
conn,
mailbox_hash,
conn.send_command(
&[&b"UID FETCH"[..], &v.trim().split(|b| b == &b' ').join(b','), &b"(FLAGS RFC822)"[..]]
.join(&b' '),
).await
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED).await
);
debug!(&response);
match protocol_parser::uid_fetch_responses(&response) {
Ok((_, v, _)) => {
'fetch_responses_c: for UidFetchResponse {
uid,
flags,
body,
..
} in v
{
if uid_store
.uid_index
.lock()
.unwrap()
.contains_key(&(mailbox_hash, uid))
{
continue 'fetch_responses_c;
}
if let Ok(mut env) = Envelope::from_bytes(
body.unwrap(),
flags.as_ref().map(|&(f, _)| f),
) {
uid_store
.hash_index
.lock()
.unwrap()
.insert(env.hash(), (uid, mailbox_hash));
uid_store
.uid_index
.lock()
.unwrap()
.insert((mailbox_hash, uid), env.hash());
debug!(
"Create event {} {} {}",
env.hash(),
env.subject(),
mailbox.path(),
);
if let Some((_, keywords)) = flags {
let mut tag_lck =
uid_store.tag_index.write().unwrap();
for f in keywords {
let hash = tag_hash!(f);
if !tag_lck.contains_key(&hash) {
tag_lck.insert(hash, f);
}
env.labels_mut().push(hash);
}
}
if !env.is_seen() {
mailbox
.unseen
.lock()
.unwrap()
.insert_new(env.hash());
}
if uid_store.cache_headers {
cache::save_envelopes(
uid_store.account_hash,
mailbox_hash,
uidvalidity,
&[(uid, &env)],
)?;
}
let mut prev_exists = mailbox.exists.lock().unwrap();
prev_exists.insert_new(env.hash());
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
});
}
}
}
Err(e) => {
debug!(e);
}
}
}
Err(e) => {
debug!(
"UID SEARCH RECENT err: {}\nresp: {}",
e.to_string(),
&response
);
}
}
}
} else if n > mailbox.exists.lock().unwrap().len() {
/* UID FETCH ALL UID, cross-ref, then FETCH difference headers
* */
debug!("exists {}", n);
exit_on_error!(
conn,
} else {
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
conn.send_command(
&[
b"FETCH",
format!("{}:{}", mailbox.exists.lock().unwrap().len() + 1, n).as_bytes(),
b"(UID FLAGS RFC822)",
]
.join(&b' '),
).await
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED).await
);
match protocol_parser::uid_fetch_responses(&response) {
Ok((_, v, _)) => {
'fetch_responses_a: for UidFetchResponse {
uid, flags, body, ..
} in v
{
if uid_store
.uid_index
.lock()
.unwrap()
.contains_key(&(mailbox_hash, uid))
{
continue 'fetch_responses_a;
}
if let Ok(mut env) =
Envelope::from_bytes(body.unwrap(), flags.as_ref().map(|&(f, _)| f))
{
uid_store
.hash_index
.lock()
.unwrap()
.insert(env.hash(), (uid, mailbox_hash));
uid_store
.uid_index
.lock()
.unwrap()
.insert((mailbox_hash, uid), env.hash());
if let Some((_, keywords)) = flags {
let mut tag_lck = uid_store.tag_index.write().unwrap();
for f in keywords {
let hash = tag_hash!(f);
if !tag_lck.contains_key(&hash) {
tag_lck.insert(hash, f);
}
env.labels_mut().push(hash);
}
}
debug!(
"Create event {} {} {}",
env.hash(),
env.subject(),
mailbox.path(),
);
if !env.is_seen() {
mailbox.unseen.lock().unwrap().insert_new(env.hash());
}
if uid_store.cache_headers {
cache::save_envelopes(
uid_store.account_hash,
mailbox_hash,
uidvalidity,
&[(uid, &env)],
)?;
}
mailbox.exists.lock().unwrap().insert_new(env.hash());
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
});
}
}
}
Err(e) => {
debug!(e);
}
}
kind: RefreshEventKind::Rescan,
});
return Err(MeliError::new(format!(
"Unknown mailbox: {} {}",
mailbox.path(),
mailbox_hash
)));
}
}
Err(e) => {
debug!("{:?}", e);
return Err(e).chain_err_summary(|| "could not select mailbox");
let mut cache_handle = cache::CacheHandle::get(uid_store.clone())?;
if debug!(select_response.recent > 0) {
/* UID SEARCH RECENT */
conn.send_command(b"UID SEARCH RECENT").await?;
conn.read_response(&mut response, RequiredResponses::SEARCH)
.await?;
let v = protocol_parser::search_results(response.as_bytes()).map(|(_, v)| v)?;
if v.is_empty() {
debug!("search response was empty: {}", response);
return Ok(());
}
let mut cmd = "UID FETCH ".to_string();
if v.len() == 1 {
cmd.push_str(&v[0].to_string());
} else {
cmd.push_str(&v[0].to_string());
for n in v.into_iter().skip(1) {
cmd.push(',');
cmd.push_str(&n.to_string());
}
}
cmd.push_str(" (UID FLAGS RFC822)");
conn.send_command(cmd.as_bytes()).await?;
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await?;
} else if debug!(select_response.exists > mailbox.exists.lock().unwrap().len()) {
conn.send_command(
format!(
"FETCH {}:* (UID FLAGS RFC822)",
mailbox.exists.lock().unwrap().len()
)
.as_bytes(),
)
.await?;
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await?;
} else {
return Ok(());
}
};
debug!(&response);
let (_, mut v, _) = protocol_parser::fetch_responses(&response)?;
for FetchResponse {
ref uid,
ref mut flags,
ref mut body,
ref mut envelope,
..
} in v.iter_mut()
{
let uid = uid.unwrap();
*envelope = Envelope::from_bytes(body.take().unwrap(), flags.as_ref().map(|&(f, _)| f))
.map(|mut env| {
env.set_hash(generate_envelope_hash(&mailbox.imap_path(), &uid));
if let Some((_, keywords)) = flags.take() {
let mut tag_lck = uid_store.tag_index.write().unwrap();
for f in keywords {
let hash = tag_hash!(f);
if !tag_lck.contains_key(&hash) {
tag_lck.insert(hash, f);
}
env.labels_mut().push(hash);
}
}
env
})
.map_err(|err| {
debug!("uid {} envelope parse error {}", uid, &err);
err
})
.ok();
}
if uid_store.keep_offline_cache {
cache_handle.insert_envelopes(mailbox_hash, &v)?;
}
'fetch_responses_c: for FetchResponse { uid, envelope, .. } in v {
let uid = uid.unwrap();
if uid_store
.uid_index
.lock()
.unwrap()
.contains_key(&(mailbox_hash, uid))
{
continue 'fetch_responses_c;
}
if let Some(env) = envelope {
uid_store
.hash_index
.lock()
.unwrap()
.insert(env.hash(), (uid, mailbox_hash));
uid_store
.uid_index
.lock()
.unwrap()
.insert((mailbox_hash, uid), env.hash());
debug!(
"Create event {} {} {}",
env.hash(),
env.subject(),
mailbox.path(),
);
if !env.is_seen() {
mailbox.unseen.lock().unwrap().insert_new(env.hash());
}
mailbox.exists.lock().unwrap().insert_new(env.hash());
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
});
}
}
}
Ok(())
}

View File

@ -19,7 +19,8 @@
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use crate::{error::*, logging::log};
use crate::{error::*, logging::log, Envelope};
use rusqlite::types::{FromSql, FromSqlError, FromSqlResult, ToSql, ToSqlOutput};
pub use rusqlite::{self, params, Connection};
use std::path::PathBuf;
@ -94,3 +95,20 @@ pub fn open_or_create_db(
Ok(conn)
}
impl ToSql for Envelope {
fn to_sql(&self) -> rusqlite::Result<ToSqlOutput> {
let v: Vec<u8> = bincode::serialize(self).map_err(|e| {
rusqlite::Error::ToSqlConversionFailure(Box::new(MeliError::new(e.to_string())))
})?;
Ok(ToSqlOutput::from(v))
}
}
impl FromSql for Envelope {
fn column_result(value: rusqlite::types::ValueRef) -> FromSqlResult<Self> {
let b: Vec<u8> = FromSql::column_result(value)?;
Ok(bincode::deserialize(&b)
.map_err(|e| FromSqlError::Other(Box::new(MeliError::new(e.to_string()))))?)
}
}