Compare commits

...

2 Commits

Author SHA1 Message Date
Manos Pitsidianakis 77e4488637
lazy_fetch WIP 2021-01-15 19:13:34 +02:00
Manos Pitsidianakis 819d993f11
melib/backends: replace watch() with watcher(), BackendWatcher trait
```
Return a [`Box<dyn BackendWatcher>`](BackendWatcher), to which you can
register the mailboxes you are interested in for updates and then
consume to spawn a watching `Future`.  The `Future` sends events to the
[`BackendEventConsumer`](BackendEventConsumer) supplied to the backend
in its constructor method.
```

Watching mailboxes for updates is more flexible now that you can
explicitly register mailboxes and set polling period.
2021-01-15 19:12:09 +02:00
21 changed files with 6099 additions and 1213 deletions

View File

@ -310,13 +310,26 @@ pub trait MailBackend: ::std::fmt::Debug + Send + Sync {
Ok(Box::pin(async { Ok(()) }))
}
fn fetch_batch(&mut self, env_hashes: EnvelopeHashBatch) -> ResultFuture<()> {
Err(MeliError::new("Unimplemented."))
}
fn load(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()> {
Err(MeliError::new("Unimplemented."))
}
fn fetch(
&mut self,
mailbox_hash: MailboxHash,
) -> Result<Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>>;
fn refresh(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()>;
fn watch(&self) -> ResultFuture<()>;
/// Return a [`Box<dyn BackendWatcher>`](BackendWatcher), to which you can register the
/// mailboxes you are interested in for updates and then consume to spawn a watching `Future`.
/// The `Future` sends events to the [`BackendEventConsumer`](BackendEventConsumer) supplied to
/// the backend in its constructor method.
fn watcher(&self) -> Result<Box<dyn BackendWatcher>>;
fn mailboxes(&self) -> ResultFuture<HashMap<MailboxHash, Mailbox>>;
fn operation(&self, hash: EnvelopeHash) -> Result<Box<dyn BackendOp>>;
@ -623,7 +636,7 @@ impl EnvelopeHashBatch {
#[derive(Default, Clone)]
pub struct LazyCountSet {
not_yet_seen: usize,
set: BTreeSet<EnvelopeHash>,
pub set: BTreeSet<EnvelopeHash>,
}
impl fmt::Debug for LazyCountSet {
@ -711,3 +724,31 @@ impl std::ops::Deref for IsSubscribedFn {
&self.0
}
}
/// Urgency for the events of a single Mailbox.
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum MailboxWatchUrgency {
High,
Medium,
Low,
}
/// Register the mailboxes you are interested in for updates and then consume with `spawn` to spawn
/// a watching `Future`. The `Future` sends events to the
/// [`BackendEventConsumer`](backends::BackendEventConsumer) supplied to the backend in its constructor
/// method.
pub trait BackendWatcher: ::std::fmt::Debug + Send + Sync {
/// Whether the watcher's `Future` requires blocking I/O.
fn is_blocking(&self) -> bool;
fn register_mailbox(
&mut self,
mailbox_hash: MailboxHash,
urgency: MailboxWatchUrgency,
) -> Result<()>;
fn set_polling_period(&mut self, period: Option<std::time::Duration>) -> Result<()>;
fn spawn(self: Box<Self>) -> ResultFuture<()>;
/// Use the [`Any`](std::any::Any) trait to get the underlying type implementing the
/// [`BackendWatcher`](backends::BackendEventConsumer) trait.
fn as_any(&self) -> &dyn Any;
fn as_any_mut(&mut self) -> &mut dyn Any;
}

View File

@ -370,13 +370,13 @@ impl MailBackend for ImapType {
let main_conn = self.connection.clone();
let uid_store = self.uid_store.clone();
Ok(Box::pin(async move {
let inbox = timeout(uid_store.timeout, uid_store.mailboxes.lock())
let mut inbox = timeout(uid_store.timeout, uid_store.mailboxes.lock())
.await?
.get(&mailbox_hash)
.map(std::clone::Clone::clone)
.unwrap();
let mut conn = timeout(uid_store.timeout, main_conn.lock()).await?;
watch::examine_updates(inbox, &mut conn, &uid_store).await?;
watch::ImapWatcher::examine_updates(&mut inbox, &mut conn, &uid_store).await?;
Ok(())
}))
}
@ -450,68 +450,16 @@ impl MailBackend for ImapType {
}))
}
fn watch(&self) -> ResultFuture<()> {
fn watcher(&self) -> Result<Box<dyn BackendWatcher>> {
let server_conf = self.server_conf.clone();
let main_conn = self.connection.clone();
let uid_store = self.uid_store.clone();
Ok(Box::pin(async move {
let has_idle: bool = match server_conf.protocol {
ImapProtocol::IMAP {
extension_use: ImapExtensionUse { idle, .. },
} => {
idle && uid_store
.capabilities
.lock()
.unwrap()
.iter()
.any(|cap| cap.eq_ignore_ascii_case(b"IDLE"))
}
_ => false,
};
while let Err(err) = if has_idle {
idle(ImapWatchKit {
conn: ImapConnection::new_connection(&server_conf, uid_store.clone()),
main_conn: main_conn.clone(),
uid_store: uid_store.clone(),
})
.await
} else {
poll_with_examine(ImapWatchKit {
conn: ImapConnection::new_connection(&server_conf, uid_store.clone()),
main_conn: main_conn.clone(),
uid_store: uid_store.clone(),
})
.await
} {
let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?;
if err.kind.is_network() {
uid_store.is_online.lock().unwrap().1 = Err(err.clone());
} else {
return Err(err);
}
debug!("Watch failure: {}", err.to_string());
match timeout(uid_store.timeout, main_conn_lck.connect())
.await
.and_then(|res| res)
{
Err(err2) => {
debug!("Watch reconnect attempt failed: {}", err2.to_string());
}
Ok(()) => {
debug!("Watch reconnect attempt succesful");
continue;
}
}
let account_hash = uid_store.account_hash;
main_conn_lck.add_refresh_event(RefreshEvent {
account_hash,
mailbox_hash: 0,
kind: RefreshEventKind::Failure(err.clone()),
});
return Err(err);
}
debug!("watch future returning");
Ok(())
Ok(Box::new(ImapWatcher {
main_conn,
uid_store,
mailbox_hashes: BTreeSet::default(),
polling_period: std::time::Duration::from_secs(5 * 60),
server_conf,
}))
}

View File

@ -588,7 +588,9 @@ impl ImapConnection {
pub fn connect<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
Box::pin(async move {
if let (time, ref mut status @ Ok(())) = *self.uid_store.is_online.lock().unwrap() {
if SystemTime::now().duration_since(time).unwrap_or_default() >= IMAP_PROTOCOL_TIMEOUT {
if SystemTime::now().duration_since(time).unwrap_or_default()
>= IMAP_PROTOCOL_TIMEOUT
{
let err = MeliError::new("Connection timed out").set_kind(ErrorKind::Timeout);
*status = Err(err.clone());
self.stream = Err(err);

View File

@ -23,201 +23,129 @@ use crate::backends::SpecialUsageMailbox;
use std::sync::Arc;
/// Arguments for IMAP watching functions
pub struct ImapWatchKit {
pub conn: ImapConnection,
#[derive(Debug)]
pub struct ImapWatcher {
pub main_conn: Arc<FutureMutex<ImapConnection>>,
pub uid_store: Arc<UIDStore>,
pub mailbox_hashes: BTreeSet<MailboxHash>,
pub polling_period: std::time::Duration,
pub server_conf: ImapServerConf,
}
pub async fn poll_with_examine(kit: ImapWatchKit) -> Result<()> {
debug!("poll with examine");
let ImapWatchKit {
mut conn,
main_conn: _,
uid_store,
} = kit;
conn.connect().await?;
let mailboxes: HashMap<MailboxHash, ImapMailbox> = {
let mailboxes_lck = timeout(uid_store.timeout, uid_store.mailboxes.lock()).await?;
mailboxes_lck.clone()
};
loop {
for (_, mailbox) in mailboxes.clone() {
examine_updates(mailbox, &mut conn, &uid_store).await?;
}
//FIXME: make sleep duration configurable
smol::Timer::after(std::time::Duration::from_secs(3 * 60)).await;
impl BackendWatcher for ImapWatcher {
fn is_blocking(&self) -> bool {
false
}
}
pub async fn idle(kit: ImapWatchKit) -> Result<()> {
debug!("IDLE");
/* IDLE only watches the connection's selected mailbox. We will IDLE on INBOX and every ~5
* minutes wake up and poll the others */
let ImapWatchKit {
mut conn,
main_conn,
uid_store,
} = kit;
conn.connect().await?;
let mailbox: ImapMailbox = match uid_store
.mailboxes
.lock()
.await
.values()
.find(|f| f.parent.is_none() && (f.special_usage() == SpecialUsageMailbox::Inbox))
.map(std::clone::Clone::clone)
{
Some(mailbox) => mailbox,
None => {
return Err(MeliError::new("INBOX mailbox not found in local mailbox index. meli may have not parsed the IMAP mailboxes correctly"));
fn register_mailbox(
&mut self,
mailbox_hash: MailboxHash,
_urgency: MailboxWatchUrgency,
) -> Result<()> {
self.mailbox_hashes.insert(mailbox_hash);
Ok(())
}
fn set_polling_period(&mut self, period: Option<std::time::Duration>) -> Result<()> {
if let Some(period) = period {
self.polling_period = period;
}
};
let mailbox_hash = mailbox.hash();
let mut response = Vec::with_capacity(8 * 1024);
let select_response = conn
.examine_mailbox(mailbox_hash, &mut response, true)
.await?
.unwrap();
{
let mut uidvalidities = uid_store.uidvalidity.lock().unwrap();
Ok(())
}
if let Some(v) = uidvalidities.get(&mailbox_hash) {
if *v != select_response.uidvalidity {
if uid_store.keep_offline_cache {
#[cfg(not(feature = "sqlite3"))]
let mut cache_handle = super::cache::DefaultCache::get(uid_store.clone())?;
#[cfg(feature = "sqlite3")]
let mut cache_handle = super::cache::Sqlite3Cache::get(uid_store.clone())?;
cache_handle.clear(mailbox_hash, &select_response)?;
fn spawn(mut self: Box<Self>) -> ResultFuture<()> {
Ok(Box::pin(async move {
let has_idle: bool = match self.server_conf.protocol {
ImapProtocol::IMAP {
extension_use: ImapExtensionUse { idle, .. },
} => {
idle && self
.uid_store
.capabilities
.lock()
.unwrap()
.iter()
.any(|cap| cap.eq_ignore_ascii_case(b"IDLE"))
}
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: RefreshEventKind::Rescan,
_ => false,
};
while let Err(err) = if has_idle {
self.idle().await
} else {
self.poll_with_examine().await
} {
let mut main_conn_lck =
timeout(self.uid_store.timeout, self.main_conn.lock()).await?;
if err.kind.is_network() {
self.uid_store.is_online.lock().unwrap().1 = Err(err.clone());
} else {
return Err(err);
}
debug!("Watch failure: {}", err.to_string());
match timeout(self.uid_store.timeout, main_conn_lck.connect())
.await
.and_then(|res| res)
{
Err(err2) => {
debug!("Watch reconnect attempt failed: {}", err2.to_string());
}
Ok(()) => {
debug!("Watch reconnect attempt succesful");
continue;
}
}
let account_hash = self.uid_store.account_hash;
main_conn_lck.add_refresh_event(RefreshEvent {
account_hash,
mailbox_hash: 0,
kind: RefreshEventKind::Failure(err.clone()),
});
/*
uid_store.uid_index.lock().unwrap().clear();
uid_store.hash_index.lock().unwrap().clear();
uid_store.byte_cache.lock().unwrap().clear();
*/
return Err(err);
}
} else {
uidvalidities.insert(mailbox_hash, select_response.uidvalidity);
}
debug!("watch future returning");
Ok(())
}))
}
let mailboxes: HashMap<MailboxHash, ImapMailbox> = {
let mailboxes_lck = timeout(uid_store.timeout, uid_store.mailboxes.lock()).await?;
mailboxes_lck.clone()
};
for (h, mailbox) in mailboxes.clone() {
if mailbox_hash == h {
continue;
}
examine_updates(mailbox, &mut conn, &uid_store).await?;
fn as_any(&self) -> &dyn Any {
self
}
conn.send_command(b"IDLE").await?;
let mut blockn = ImapBlockingConnection::from(conn);
let mut watch = std::time::Instant::now();
/* duration interval to send heartbeat */
const _10_MINS: std::time::Duration = std::time::Duration::from_secs(10 * 60);
/* duration interval to check other mailboxes for changes */
const _5_MINS: std::time::Duration = std::time::Duration::from_secs(5 * 60);
loop {
let line = match timeout(Some(_10_MINS), blockn.as_stream()).await {
Ok(Some(line)) => line,
Ok(None) => {
debug!("IDLE connection dropped: {:?}", &blockn.err());
blockn.conn.connect().await?;
let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?;
main_conn_lck.connect().await?;
continue;
}
Err(_) => {
/* Timeout */
blockn.conn.send_raw(b"DONE").await?;
blockn
.conn
.read_response(&mut response, RequiredResponses::empty())
.await?;
blockn.conn.send_command(b"IDLE").await?;
let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?;
main_conn_lck.connect().await?;
continue;
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
}
impl ImapWatcher {
pub async fn idle(&mut self) -> Result<()> {
debug!("IDLE");
/* IDLE only watches the connection's selected mailbox. We will IDLE on INBOX and every X
* minutes wake up and poll the others */
let ImapWatcher {
ref main_conn,
ref uid_store,
ref mailbox_hashes,
ref polling_period,
ref server_conf,
..
} = self;
let mut connection = ImapConnection::new_connection(server_conf, uid_store.clone());
connection.connect().await?;
let mailbox_hash: MailboxHash = match uid_store
.mailboxes
.lock()
.await
.values()
.find(|f| f.parent.is_none() && (f.special_usage() == SpecialUsageMailbox::Inbox))
.map(|f| f.hash)
{
Some(h) => h,
None => {
return Err(MeliError::new("INBOX mailbox not found in local mailbox index. meli may have not parsed the IMAP mailboxes correctly"));
}
};
let now = std::time::Instant::now();
if now.duration_since(watch) >= _5_MINS {
/* Time to poll all inboxes */
let mut conn = timeout(uid_store.timeout, main_conn.lock()).await?;
for (_h, mailbox) in mailboxes.clone() {
examine_updates(mailbox, &mut conn, &uid_store).await?;
}
watch = now;
}
if line
.split_rn()
.filter(|l| {
!l.starts_with(b"+ ")
&& !l.starts_with(b"* ok")
&& !l.starts_with(b"* ok")
&& !l.starts_with(b"* Ok")
&& !l.starts_with(b"* OK")
})
.count()
== 0
{
continue;
}
{
blockn.conn.send_raw(b"DONE").await?;
blockn
.conn
.read_response(&mut response, RequiredResponses::empty())
.await?;
for l in line.split_rn().chain(response.split_rn()) {
debug!("process_untagged {:?}", &l);
if l.starts_with(b"+ ")
|| l.starts_with(b"* ok")
|| l.starts_with(b"* ok")
|| l.starts_with(b"* Ok")
|| l.starts_with(b"* OK")
{
debug!("ignore continuation mark");
continue;
}
blockn.conn.process_untagged(l).await?;
}
blockn.conn.send_command(b"IDLE").await?;
}
}
}
pub async fn examine_updates(
mailbox: ImapMailbox,
conn: &mut ImapConnection,
uid_store: &Arc<UIDStore>,
) -> Result<()> {
if mailbox.no_select {
return Ok(());
}
let mailbox_hash = mailbox.hash();
debug!("examining mailbox {} {}", mailbox_hash, mailbox.path());
if let Some(new_envelopes) = conn.resync(mailbox_hash).await? {
for env in new_envelopes {
conn.add_refresh_event(RefreshEvent {
mailbox_hash,
account_hash: uid_store.account_hash,
kind: RefreshEventKind::Create(Box::new(env)),
});
}
} else {
#[cfg(not(feature = "sqlite3"))]
let mut cache_handle = super::cache::DefaultCache::get(uid_store.clone())?;
#[cfg(feature = "sqlite3")]
let mut cache_handle = super::cache::Sqlite3Cache::get(uid_store.clone())?;
let mut response = Vec::with_capacity(8 * 1024);
let select_response = conn
let select_response = connection
.examine_mailbox(mailbox_hash, &mut response, true)
.await?
.unwrap();
@ -227,9 +155,13 @@ pub async fn examine_updates(
if let Some(v) = uidvalidities.get(&mailbox_hash) {
if *v != select_response.uidvalidity {
if uid_store.keep_offline_cache {
#[cfg(not(feature = "sqlite3"))]
let mut cache_handle = super::cache::DefaultCache::get(uid_store.clone())?;
#[cfg(feature = "sqlite3")]
let mut cache_handle = super::cache::Sqlite3Cache::get(uid_store.clone())?;
cache_handle.clear(mailbox_hash, &select_response)?;
}
conn.add_refresh_event(RefreshEvent {
connection.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: RefreshEventKind::Rescan,
@ -239,215 +171,384 @@ pub async fn examine_updates(
uid_store.hash_index.lock().unwrap().clear();
uid_store.byte_cache.lock().unwrap().clear();
*/
return Ok(());
}
} else {
uidvalidities.insert(mailbox_hash, select_response.uidvalidity);
}
}
if mailbox.is_cold() {
/* Mailbox hasn't been loaded yet */
let has_list_status: bool = conn
.uid_store
.capabilities
.lock()
.unwrap()
.iter()
.any(|cap| cap.eq_ignore_ascii_case(b"LIST-STATUS"));
if has_list_status {
conn.send_command(
format!(
"LIST \"{}\" \"\" RETURN (STATUS (MESSAGES UNSEEN))",
mailbox.imap_path()
)
.as_bytes(),
)
.await?;
conn.read_response(
&mut response,
RequiredResponses::LIST_REQUIRED | RequiredResponses::STATUS,
)
.await?;
debug!(
"list return status out: {}",
String::from_utf8_lossy(&response)
);
for l in response.split_rn() {
if !l.starts_with(b"*") {
continue;
}
if let Ok(status) = protocol_parser::status_response(&l).map(|(_, v)| v) {
if Some(mailbox_hash) == status.mailbox {
if let Some(total) = status.messages {
if let Ok(mut exists_lck) = mailbox.exists.lock() {
exists_lck.clear();
exists_lck.set_not_yet_seen(total);
}
}
if let Some(total) = status.unseen {
if let Ok(mut unseen_lck) = mailbox.unseen.lock() {
unseen_lck.clear();
unseen_lck.set_not_yet_seen(total);
}
}
break;
}
}
}
} else {
conn.send_command(b"SEARCH UNSEEN").await?;
conn.read_response(&mut response, RequiredResponses::SEARCH)
.await?;
let unseen_count = protocol_parser::search_results(&response)?.1.len();
if let Ok(mut exists_lck) = mailbox.exists.lock() {
exists_lck.clear();
exists_lck.set_not_yet_seen(select_response.exists);
}
if let Ok(mut unseen_lck) = mailbox.unseen.lock() {
unseen_lck.clear();
unseen_lck.set_not_yet_seen(unseen_count);
}
}
mailbox.set_warm(true);
return Ok(());
}
if select_response.recent > 0 {
/* UID SEARCH RECENT */
conn.send_command(b"UID SEARCH RECENT").await?;
conn.read_response(&mut response, RequiredResponses::SEARCH)
.await?;
let v = protocol_parser::search_results(response.as_slice()).map(|(_, v)| v)?;
if v.is_empty() {
debug!(
"search response was empty: {}",
String::from_utf8_lossy(&response)
);
return Ok(());
}
let mut cmd = "UID FETCH ".to_string();
if v.len() == 1 {
cmd.push_str(&v[0].to_string());
} else {
cmd.push_str(&v[0].to_string());
for n in v.into_iter().skip(1) {
cmd.push(',');
cmd.push_str(&n.to_string());
}
}
cmd.push_str(
" (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] BODYSTRUCTURE)",
);
conn.send_command(cmd.as_bytes()).await?;
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await?;
} else if select_response.exists > mailbox.exists.lock().unwrap().len() {
conn.send_command(
format!(
"FETCH {}:* (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] BODYSTRUCTURE)",
std::cmp::max(mailbox.exists.lock().unwrap().len(), 1)
)
.as_bytes(),
)
.await?;
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await?;
} else {
return Ok(());
}
debug!(
"fetch response is {} bytes and {} lines",
response.len(),
String::from_utf8_lossy(&response).lines().count()
);
let (_, mut v, _) = protocol_parser::fetch_responses(&response)?;
debug!("responses len is {}", v.len());
for FetchResponse {
ref uid,
ref mut envelope,
ref mut flags,
ref references,
..
} in v.iter_mut()
{
let uid = uid.unwrap();
let env = envelope.as_mut().unwrap();
env.set_hash(generate_envelope_hash(&mailbox.imap_path(), &uid));
if let Some(value) = references {
env.set_references(value);
}
let mut tag_lck = uid_store.collection.tag_index.write().unwrap();
if let Some((flags, keywords)) = flags {
env.set_flags(*flags);
if !env.is_seen() {
mailbox.unseen.lock().unwrap().insert_new(env.hash());
}
mailbox.exists.lock().unwrap().insert_new(env.hash());
for f in keywords {
let hash = tag_hash!(f);
if !tag_lck.contains_key(&hash) {
tag_lck.insert(hash, f.to_string());
}
env.labels_mut().push(hash);
}
}
}
if uid_store.keep_offline_cache {
if !cache_handle.mailbox_state(mailbox_hash)?.is_none() {
cache_handle
.insert_envelopes(mailbox_hash, &v)
.chain_err_summary(|| {
format!(
"Could not save envelopes in cache for mailbox {}",
mailbox.imap_path()
)
})?;
}
}
for FetchResponse { uid, envelope, .. } in v {
if uid.is_none() || envelope.is_none() {
let mailboxes: HashMap<MailboxHash, ImapMailbox> = {
let mailboxes_lck = timeout(uid_store.timeout, uid_store.mailboxes.lock()).await?;
let mut ret = mailboxes_lck.clone();
ret.retain(|k, _| mailbox_hashes.contains(k));
ret
};
for (h, mailbox) in mailboxes.iter() {
if mailbox_hash == *h {
continue;
}
let uid = uid.unwrap();
if uid_store
.uid_index
.lock()
.unwrap()
.contains_key(&(mailbox_hash, uid))
Self::examine_updates(mailbox, &mut connection, &uid_store).await?;
}
connection.send_command(b"IDLE").await?;
let mut blockn = ImapBlockingConnection::from(connection);
let mut watch = std::time::Instant::now();
/* duration interval to send heartbeat */
const _10_MINS: std::time::Duration = std::time::Duration::from_secs(10 * 60);
/* duration interval to check other mailboxes for changes */
loop {
let line = match timeout(
Some(std::cmp::min(*polling_period, _10_MINS)),
blockn.as_stream(),
)
.await
{
Ok(Some(line)) => line,
Ok(None) => {
debug!("IDLE connection dropped: {:?}", &blockn.err());
blockn.conn.connect().await?;
let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?;
main_conn_lck.connect().await?;
continue;
}
Err(_) => {
/* Timeout */
blockn.conn.send_raw(b"DONE").await?;
blockn
.conn
.read_response(&mut response, RequiredResponses::empty())
.await?;
blockn.conn.send_command(b"IDLE").await?;
let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?;
main_conn_lck.connect().await?;
continue;
}
};
let now = std::time::Instant::now();
if now.duration_since(watch) >= *polling_period {
/* Time to poll all inboxes */
let mut conn = timeout(uid_store.timeout, main_conn.lock()).await?;
for (_h, mailbox) in mailboxes.iter() {
Self::examine_updates(mailbox, &mut conn, &uid_store).await?;
}
watch = now;
}
if line
.split_rn()
.filter(|l| {
!l.starts_with(b"+ ")
&& !l.starts_with(b"* ok")
&& !l.starts_with(b"* ok")
&& !l.starts_with(b"* Ok")
&& !l.starts_with(b"* OK")
})
.count()
== 0
{
continue;
}
let env = envelope.unwrap();
debug!(
"Create event {} {} {}",
env.hash(),
env.subject(),
mailbox.path(),
);
uid_store
.msn_index
.lock()
.unwrap()
.entry(mailbox_hash)
.or_default()
.push(uid);
uid_store
.hash_index
.lock()
.unwrap()
.insert(env.hash(), (uid, mailbox_hash));
uid_store
.uid_index
.lock()
.unwrap()
.insert((mailbox_hash, uid), env.hash());
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
});
{
blockn.conn.send_raw(b"DONE").await?;
blockn
.conn
.read_response(&mut response, RequiredResponses::empty())
.await?;
for l in line.split_rn().chain(response.split_rn()) {
debug!("process_untagged {:?}", String::from_utf8_lossy(&l));
if l.starts_with(b"+ ")
|| l.starts_with(b"* ok")
|| l.starts_with(b"* ok")
|| l.starts_with(b"* Ok")
|| l.starts_with(b"* OK")
{
debug!("ignore continuation mark");
continue;
}
blockn.conn.process_untagged(l).await?;
}
blockn.conn.send_command(b"IDLE").await?;
}
}
}
Ok(())
pub async fn poll_with_examine(&mut self) -> Result<()> {
debug!("poll with examine");
let ImapWatcher {
ref mailbox_hashes,
ref uid_store,
ref polling_period,
ref server_conf,
..
} = self;
let mut connection = ImapConnection::new_connection(server_conf, uid_store.clone());
connection.connect().await?;
let mailboxes: HashMap<MailboxHash, ImapMailbox> = {
let mailboxes_lck = timeout(uid_store.timeout, uid_store.mailboxes.lock()).await?;
let mut ret = mailboxes_lck.clone();
ret.retain(|k, _| mailbox_hashes.contains(k));
ret
};
loop {
for (_, mailbox) in mailboxes.iter() {
Self::examine_updates(mailbox, &mut connection, &uid_store).await?;
}
crate::connections::sleep(*polling_period).await;
}
}
pub async fn examine_updates(
mailbox: &ImapMailbox,
conn: &mut ImapConnection,
uid_store: &Arc<UIDStore>,
) -> Result<()> {
if mailbox.no_select {
return Ok(());
}
let mailbox_hash = mailbox.hash();
debug!("examining mailbox {} {}", mailbox_hash, mailbox.path());
if let Some(new_envelopes) = conn.resync(mailbox_hash).await? {
for env in new_envelopes {
conn.add_refresh_event(RefreshEvent {
mailbox_hash,
account_hash: uid_store.account_hash,
kind: RefreshEventKind::Create(Box::new(env)),
});
}
} else {
#[cfg(not(feature = "sqlite3"))]
let mut cache_handle = super::cache::DefaultCache::get(uid_store.clone())?;
#[cfg(feature = "sqlite3")]
let mut cache_handle = super::cache::Sqlite3Cache::get(uid_store.clone())?;
let mut response = Vec::with_capacity(8 * 1024);
let select_response = conn
.examine_mailbox(mailbox_hash, &mut response, true)
.await?
.unwrap();
{
let mut uidvalidities = uid_store.uidvalidity.lock().unwrap();
if let Some(v) = uidvalidities.get(&mailbox_hash) {
if *v != select_response.uidvalidity {
if uid_store.keep_offline_cache {
cache_handle.clear(mailbox_hash, &select_response)?;
}
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: RefreshEventKind::Rescan,
});
/*
uid_store.uid_index.lock().unwrap().clear();
uid_store.hash_index.lock().unwrap().clear();
uid_store.byte_cache.lock().unwrap().clear();
*/
return Ok(());
}
} else {
uidvalidities.insert(mailbox_hash, select_response.uidvalidity);
}
}
if mailbox.is_cold() {
/* Mailbox hasn't been loaded yet */
let has_list_status: bool = conn
.uid_store
.capabilities
.lock()
.unwrap()
.iter()
.any(|cap| cap.eq_ignore_ascii_case(b"LIST-STATUS"));
if has_list_status {
conn.send_command(
format!(
"LIST \"{}\" \"\" RETURN (STATUS (MESSAGES UNSEEN))",
mailbox.imap_path()
)
.as_bytes(),
)
.await?;
conn.read_response(
&mut response,
RequiredResponses::LIST_REQUIRED | RequiredResponses::STATUS,
)
.await?;
debug!(
"list return status out: {}",
String::from_utf8_lossy(&response)
);
for l in response.split_rn() {
if !l.starts_with(b"*") {
continue;
}
if let Ok(status) = protocol_parser::status_response(&l).map(|(_, v)| v) {
if Some(mailbox_hash) == status.mailbox {
if let Some(total) = status.messages {
if let Ok(mut exists_lck) = mailbox.exists.lock() {
exists_lck.clear();
exists_lck.set_not_yet_seen(total);
}
}
if let Some(total) = status.unseen {
if let Ok(mut unseen_lck) = mailbox.unseen.lock() {
unseen_lck.clear();
unseen_lck.set_not_yet_seen(total);
}
}
break;
}
}
}
} else {
conn.send_command(b"SEARCH UNSEEN").await?;
conn.read_response(&mut response, RequiredResponses::SEARCH)
.await?;
let unseen_count = protocol_parser::search_results(&response)?.1.len();
if let Ok(mut exists_lck) = mailbox.exists.lock() {
exists_lck.clear();
exists_lck.set_not_yet_seen(select_response.exists);
}
if let Ok(mut unseen_lck) = mailbox.unseen.lock() {
unseen_lck.clear();
unseen_lck.set_not_yet_seen(unseen_count);
}
}
mailbox.set_warm(true);
return Ok(());
}
if select_response.recent > 0 {
/* UID SEARCH RECENT */
conn.send_command(b"UID SEARCH RECENT").await?;
conn.read_response(&mut response, RequiredResponses::SEARCH)
.await?;
let v = protocol_parser::search_results(response.as_slice()).map(|(_, v)| v)?;
if v.is_empty() {
debug!(
"search response was empty: {}",
String::from_utf8_lossy(&response)
);
return Ok(());
}
let mut cmd = "UID FETCH ".to_string();
if v.len() == 1 {
cmd.push_str(&v[0].to_string());
} else {
cmd.push_str(&v[0].to_string());
for n in v.into_iter().skip(1) {
cmd.push(',');
cmd.push_str(&n.to_string());
}
}
cmd.push_str(
" (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] BODYSTRUCTURE)",
);
conn.send_command(cmd.as_bytes()).await?;
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await?;
} else if select_response.exists > mailbox.exists.lock().unwrap().len() {
conn.send_command(
format!(
"FETCH {}:* (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] BODYSTRUCTURE)",
std::cmp::max(mailbox.exists.lock().unwrap().len(), 1)
)
.as_bytes(),
)
.await?;
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await?;
} else {
return Ok(());
}
debug!(
"fetch response is {} bytes and {} lines",
response.len(),
String::from_utf8_lossy(&response).lines().count()
);
let (_, mut v, _) = protocol_parser::fetch_responses(&response)?;
debug!("responses len is {}", v.len());
for FetchResponse {
ref uid,
ref mut envelope,
ref mut flags,
ref references,
..
} in v.iter_mut()
{
let uid = uid.unwrap();
let env = envelope.as_mut().unwrap();
env.set_hash(generate_envelope_hash(&mailbox.imap_path(), &uid));
if let Some(value) = references {
env.set_references(value);
}
let mut tag_lck = uid_store.collection.tag_index.write().unwrap();
if let Some((flags, keywords)) = flags {
env.set_flags(*flags);
if !env.is_seen() {
mailbox.unseen.lock().unwrap().insert_new(env.hash());
}
mailbox.exists.lock().unwrap().insert_new(env.hash());
for f in keywords {
let hash = tag_hash!(f);
if !tag_lck.contains_key(&hash) {
tag_lck.insert(hash, f.to_string());
}
env.labels_mut().push(hash);
}
}
}
if uid_store.keep_offline_cache {
if !cache_handle.mailbox_state(mailbox_hash)?.is_none() {
cache_handle
.insert_envelopes(mailbox_hash, &v)
.chain_err_summary(|| {
format!(
"Could not save envelopes in cache for mailbox {}",
mailbox.imap_path()
)
})?;
}
}
for FetchResponse { uid, envelope, .. } in v {
if uid.is_none() || envelope.is_none() {
continue;
}
let uid = uid.unwrap();
if uid_store
.uid_index
.lock()
.unwrap()
.contains_key(&(mailbox_hash, uid))
{
continue;
}
let env = envelope.unwrap();
debug!(
"Create event {} {} {}",
env.hash(),
env.subject(),
mailbox.path(),
);
uid_store
.msn_index
.lock()
.unwrap()
.entry(mailbox_hash)
.or_default()
.push(uid);
uid_store
.hash_index
.lock()
.unwrap()
.insert(env.hash(), (uid, mailbox_hash));
uid_store
.uid_index
.lock()
.unwrap()
.insert((mailbox_hash, uid), env.hash());
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
});
}
}
Ok(())
}
}

View File

@ -90,6 +90,8 @@ use objects::*;
pub mod mailbox;
use mailbox::*;
pub mod watch;
#[derive(Debug, Default)]
pub struct EnvelopeCache {
bytes: Option<String>,
@ -313,6 +315,147 @@ impl MailBackend for JmapType {
}))
}
fn load(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()> {
let store = self.store.clone();
let connection = self.connection.clone();
Ok(Box::pin(async move {
{
crate::connections::sleep(std::time::Duration::from_secs(2)).await;
}
let mailbox_id = store.mailboxes.read().unwrap()[&mailbox_hash].id.clone();
let email_query_call: EmailQuery = EmailQuery::new(
Query::new()
.account_id(conn.mail_account_id().clone())
.filter(Some(Filter::Condition(
EmailFilterCondition::new()
.in_mailbox(Some(mailbox_id))
.into(),
)))
.position(0)
.properties(Some(vec![
"id".to_string(),
"receivedAt".to_string(),
"messageId".to_string(),
"inReplyTo".to_string(),
"hasAttachment".to_string(),
"keywords".to_string(),
])),
)
.collapse_threads(false);
let mut req = Request::new(conn.request_no.clone());
let prev_seq = req.add_call(&email_query_call);
let email_call: EmailGet = EmailGet::new(
Get::new()
.ids(Some(JmapArgument::reference(
prev_seq,
EmailQuery::RESULT_FIELD_IDS,
)))
.account_id(conn.mail_account_id().clone()),
);
req.add_call(&email_call);
let api_url = conn.session.lock().unwrap().api_url.clone();
let mut res = conn
.client
.post_async(api_url.as_str(), serde_json::to_string(&req)?)
.await?;
let res_text = res.text_async().await?;
let mut v: MethodResponse = serde_json::from_str(&res_text).unwrap();
let e = GetResponse::<EmailObject>::try_from(v.method_responses.pop().unwrap())?;
let query_response =
QueryResponse::<EmailObject>::try_from(v.method_responses.pop().unwrap())?;
store
.mailboxes
.write()
.unwrap()
.entry(mailbox_hash)
.and_modify(|mbox| {
*mbox.email_query_state.lock().unwrap() = Some(query_response.query_state);
});
let GetResponse::<EmailObject> { list, state, .. } = e;
{
let (is_empty, is_equal) = {
let mailboxes_lck = conn.store.mailboxes.read().unwrap();
mailboxes_lck
.get(&mailbox_hash)
.map(|mbox| {
let current_state_lck = mbox.email_state.lock().unwrap();
(
current_state_lck.is_none(),
current_state_lck.as_ref() != Some(&state),
)
})
.unwrap_or((true, true))
};
if is_empty {
let mut mailboxes_lck = conn.store.mailboxes.write().unwrap();
debug!("{:?}: inserting state {}", EmailObject::NAME, &state);
mailboxes_lck.entry(mailbox_hash).and_modify(|mbox| {
*mbox.email_state.lock().unwrap() = Some(state);
});
} else if !is_equal {
conn.email_changes(mailbox_hash).await?;
}
}
let mut total = BTreeSet::default();
let mut unread = BTreeSet::default();
let new_envelopes: HashMap<EnvelopeHash, Envelope> = list
.into_iter(|obj| {
let env = store.add_envelope(obj);
total.insert(env.hash());
if !env.is_seen() {
unread.insert(env.hash());
}
(env.hash(), env)
})
.collect();
let mut mailboxes_lck = store.mailboxes.write().unwrap();
mailboxes_lck.entry(mailbox_hash).and_modify(|mbox| {
mbox.total_emails.lock().unwrap().insert_existing_set(total);
mbox.unread_emails
.lock()
.unwrap()
.insert_existing_set(unread);
});
let keys: BTreeSet<EnvelopeHash> = new_envelopes.keys().cloned().collect();
collection.merge(new_envelopes, mailbox_hash, None);
let mut envelopes_lck = collection.envelopes.write().unwrap();
envelopes_lck.retain(|k, _| !keys.contains(k));
Ok(())
}))
}
fn fetch_batch(&mut self, env_hashes: EnvelopeHashBatch) -> ResultFuture<()> {
todo!()
/*
let store = self.store.clone();
let connection = self.connection.clone();
Ok(Box::pin(async move {
//crate::connections::sleep(std::time::Duration::from_secs(2)).await;
debug!("fetch_batch {:?}", &env_hashes);
let mut envelopes_lck = collection.envelopes.write().unwrap();
for env_hash in env_hashes.iter() {
if envelopes_lck.contains_key(&env_hash) {
continue;
}
let index_lck = index.write().unwrap();
let message = Message::find_message(&database, &index_lck[&env_hash])?;
drop(index_lck);
let env = message.into_envelope(&index, &collection.tag_index);
envelopes_lck.insert(env_hash, env);
}
debug!("fetch_batch {:?} done", &env_hashes);
Ok(())
}))
*/
}
fn fetch(
&mut self,
mailbox_hash: MailboxHash,
@ -341,32 +484,12 @@ impl MailBackend for JmapType {
}))
}
fn watch(&self) -> ResultFuture<()> {
fn watcher(&self) -> Result<Box<dyn BackendWatcher>> {
let connection = self.connection.clone();
let store = self.store.clone();
Ok(Box::pin(async move {
{
let mut conn = connection.lock().await;
conn.connect().await?;
}
loop {
{
let mailbox_hashes = {
store
.mailboxes
.read()
.unwrap()
.keys()
.cloned()
.collect::<SmallVec<[MailboxHash; 16]>>()
};
let conn = connection.lock().await;
for mailbox_hash in mailbox_hashes {
conn.email_changes(mailbox_hash).await?;
}
}
crate::connections::sleep(std::time::Duration::from_secs(60)).await;
}
Ok(Box::new(watch::JmapWatcher {
connection,
mailbox_hashes: BTreeSet::default(),
polling_period: std::time::Duration::from_secs(60),
}))
}