Compare commits
2 Commits
master
...
lazy_fetch
Author | SHA1 | Date |
---|---|---|
|
77e4488637 | |
|
819d993f11 |
|
@ -310,13 +310,26 @@ pub trait MailBackend: ::std::fmt::Debug + Send + Sync {
|
|||
Ok(Box::pin(async { Ok(()) }))
|
||||
}
|
||||
|
||||
fn fetch_batch(&mut self, env_hashes: EnvelopeHashBatch) -> ResultFuture<()> {
|
||||
Err(MeliError::new("Unimplemented."))
|
||||
}
|
||||
|
||||
fn load(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()> {
|
||||
Err(MeliError::new("Unimplemented."))
|
||||
}
|
||||
|
||||
fn fetch(
|
||||
&mut self,
|
||||
mailbox_hash: MailboxHash,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>>;
|
||||
|
||||
fn refresh(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()>;
|
||||
fn watch(&self) -> ResultFuture<()>;
|
||||
|
||||
/// Return a [`Box<dyn BackendWatcher>`](BackendWatcher), to which you can register the
|
||||
/// mailboxes you are interested in for updates and then consume to spawn a watching `Future`.
|
||||
/// The `Future` sends events to the [`BackendEventConsumer`](BackendEventConsumer) supplied to
|
||||
/// the backend in its constructor method.
|
||||
fn watcher(&self) -> Result<Box<dyn BackendWatcher>>;
|
||||
fn mailboxes(&self) -> ResultFuture<HashMap<MailboxHash, Mailbox>>;
|
||||
fn operation(&self, hash: EnvelopeHash) -> Result<Box<dyn BackendOp>>;
|
||||
|
||||
|
@ -623,7 +636,7 @@ impl EnvelopeHashBatch {
|
|||
#[derive(Default, Clone)]
|
||||
pub struct LazyCountSet {
|
||||
not_yet_seen: usize,
|
||||
set: BTreeSet<EnvelopeHash>,
|
||||
pub set: BTreeSet<EnvelopeHash>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for LazyCountSet {
|
||||
|
@ -711,3 +724,31 @@ impl std::ops::Deref for IsSubscribedFn {
|
|||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
/// Urgency for the events of a single Mailbox.
|
||||
#[derive(Debug, Copy, Clone, PartialEq)]
|
||||
pub enum MailboxWatchUrgency {
|
||||
High,
|
||||
Medium,
|
||||
Low,
|
||||
}
|
||||
|
||||
/// Register the mailboxes you are interested in for updates and then consume with `spawn` to spawn
|
||||
/// a watching `Future`. The `Future` sends events to the
|
||||
/// [`BackendEventConsumer`](backends::BackendEventConsumer) supplied to the backend in its constructor
|
||||
/// method.
|
||||
pub trait BackendWatcher: ::std::fmt::Debug + Send + Sync {
|
||||
/// Whether the watcher's `Future` requires blocking I/O.
|
||||
fn is_blocking(&self) -> bool;
|
||||
fn register_mailbox(
|
||||
&mut self,
|
||||
mailbox_hash: MailboxHash,
|
||||
urgency: MailboxWatchUrgency,
|
||||
) -> Result<()>;
|
||||
fn set_polling_period(&mut self, period: Option<std::time::Duration>) -> Result<()>;
|
||||
fn spawn(self: Box<Self>) -> ResultFuture<()>;
|
||||
/// Use the [`Any`](std::any::Any) trait to get the underlying type implementing the
|
||||
/// [`BackendWatcher`](backends::BackendEventConsumer) trait.
|
||||
fn as_any(&self) -> &dyn Any;
|
||||
fn as_any_mut(&mut self) -> &mut dyn Any;
|
||||
}
|
||||
|
|
|
@ -370,13 +370,13 @@ impl MailBackend for ImapType {
|
|||
let main_conn = self.connection.clone();
|
||||
let uid_store = self.uid_store.clone();
|
||||
Ok(Box::pin(async move {
|
||||
let inbox = timeout(uid_store.timeout, uid_store.mailboxes.lock())
|
||||
let mut inbox = timeout(uid_store.timeout, uid_store.mailboxes.lock())
|
||||
.await?
|
||||
.get(&mailbox_hash)
|
||||
.map(std::clone::Clone::clone)
|
||||
.unwrap();
|
||||
let mut conn = timeout(uid_store.timeout, main_conn.lock()).await?;
|
||||
watch::examine_updates(inbox, &mut conn, &uid_store).await?;
|
||||
watch::ImapWatcher::examine_updates(&mut inbox, &mut conn, &uid_store).await?;
|
||||
Ok(())
|
||||
}))
|
||||
}
|
||||
|
@ -450,68 +450,16 @@ impl MailBackend for ImapType {
|
|||
}))
|
||||
}
|
||||
|
||||
fn watch(&self) -> ResultFuture<()> {
|
||||
fn watcher(&self) -> Result<Box<dyn BackendWatcher>> {
|
||||
let server_conf = self.server_conf.clone();
|
||||
let main_conn = self.connection.clone();
|
||||
let uid_store = self.uid_store.clone();
|
||||
Ok(Box::pin(async move {
|
||||
let has_idle: bool = match server_conf.protocol {
|
||||
ImapProtocol::IMAP {
|
||||
extension_use: ImapExtensionUse { idle, .. },
|
||||
} => {
|
||||
idle && uid_store
|
||||
.capabilities
|
||||
.lock()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|cap| cap.eq_ignore_ascii_case(b"IDLE"))
|
||||
}
|
||||
_ => false,
|
||||
};
|
||||
while let Err(err) = if has_idle {
|
||||
idle(ImapWatchKit {
|
||||
conn: ImapConnection::new_connection(&server_conf, uid_store.clone()),
|
||||
main_conn: main_conn.clone(),
|
||||
uid_store: uid_store.clone(),
|
||||
})
|
||||
.await
|
||||
} else {
|
||||
poll_with_examine(ImapWatchKit {
|
||||
conn: ImapConnection::new_connection(&server_conf, uid_store.clone()),
|
||||
main_conn: main_conn.clone(),
|
||||
uid_store: uid_store.clone(),
|
||||
})
|
||||
.await
|
||||
} {
|
||||
let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?;
|
||||
if err.kind.is_network() {
|
||||
uid_store.is_online.lock().unwrap().1 = Err(err.clone());
|
||||
} else {
|
||||
return Err(err);
|
||||
}
|
||||
debug!("Watch failure: {}", err.to_string());
|
||||
match timeout(uid_store.timeout, main_conn_lck.connect())
|
||||
.await
|
||||
.and_then(|res| res)
|
||||
{
|
||||
Err(err2) => {
|
||||
debug!("Watch reconnect attempt failed: {}", err2.to_string());
|
||||
}
|
||||
Ok(()) => {
|
||||
debug!("Watch reconnect attempt succesful");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
let account_hash = uid_store.account_hash;
|
||||
main_conn_lck.add_refresh_event(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash: 0,
|
||||
kind: RefreshEventKind::Failure(err.clone()),
|
||||
});
|
||||
return Err(err);
|
||||
}
|
||||
debug!("watch future returning");
|
||||
Ok(())
|
||||
Ok(Box::new(ImapWatcher {
|
||||
main_conn,
|
||||
uid_store,
|
||||
mailbox_hashes: BTreeSet::default(),
|
||||
polling_period: std::time::Duration::from_secs(5 * 60),
|
||||
server_conf,
|
||||
}))
|
||||
}
|
||||
|
||||
|
|
|
@ -588,7 +588,9 @@ impl ImapConnection {
|
|||
pub fn connect<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
|
||||
Box::pin(async move {
|
||||
if let (time, ref mut status @ Ok(())) = *self.uid_store.is_online.lock().unwrap() {
|
||||
if SystemTime::now().duration_since(time).unwrap_or_default() >= IMAP_PROTOCOL_TIMEOUT {
|
||||
if SystemTime::now().duration_since(time).unwrap_or_default()
|
||||
>= IMAP_PROTOCOL_TIMEOUT
|
||||
{
|
||||
let err = MeliError::new("Connection timed out").set_kind(ErrorKind::Timeout);
|
||||
*status = Err(err.clone());
|
||||
self.stream = Err(err);
|
||||
|
|
|
@ -23,201 +23,129 @@ use crate::backends::SpecialUsageMailbox;
|
|||
use std::sync::Arc;
|
||||
|
||||
/// Arguments for IMAP watching functions
|
||||
pub struct ImapWatchKit {
|
||||
pub conn: ImapConnection,
|
||||
#[derive(Debug)]
|
||||
pub struct ImapWatcher {
|
||||
pub main_conn: Arc<FutureMutex<ImapConnection>>,
|
||||
pub uid_store: Arc<UIDStore>,
|
||||
pub mailbox_hashes: BTreeSet<MailboxHash>,
|
||||
pub polling_period: std::time::Duration,
|
||||
pub server_conf: ImapServerConf,
|
||||
}
|
||||
|
||||
pub async fn poll_with_examine(kit: ImapWatchKit) -> Result<()> {
|
||||
debug!("poll with examine");
|
||||
let ImapWatchKit {
|
||||
mut conn,
|
||||
main_conn: _,
|
||||
uid_store,
|
||||
} = kit;
|
||||
conn.connect().await?;
|
||||
let mailboxes: HashMap<MailboxHash, ImapMailbox> = {
|
||||
let mailboxes_lck = timeout(uid_store.timeout, uid_store.mailboxes.lock()).await?;
|
||||
mailboxes_lck.clone()
|
||||
};
|
||||
loop {
|
||||
for (_, mailbox) in mailboxes.clone() {
|
||||
examine_updates(mailbox, &mut conn, &uid_store).await?;
|
||||
}
|
||||
//FIXME: make sleep duration configurable
|
||||
smol::Timer::after(std::time::Duration::from_secs(3 * 60)).await;
|
||||
impl BackendWatcher for ImapWatcher {
|
||||
fn is_blocking(&self) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn idle(kit: ImapWatchKit) -> Result<()> {
|
||||
debug!("IDLE");
|
||||
/* IDLE only watches the connection's selected mailbox. We will IDLE on INBOX and every ~5
|
||||
* minutes wake up and poll the others */
|
||||
let ImapWatchKit {
|
||||
mut conn,
|
||||
main_conn,
|
||||
uid_store,
|
||||
} = kit;
|
||||
conn.connect().await?;
|
||||
let mailbox: ImapMailbox = match uid_store
|
||||
.mailboxes
|
||||
.lock()
|
||||
.await
|
||||
.values()
|
||||
.find(|f| f.parent.is_none() && (f.special_usage() == SpecialUsageMailbox::Inbox))
|
||||
.map(std::clone::Clone::clone)
|
||||
{
|
||||
Some(mailbox) => mailbox,
|
||||
None => {
|
||||
return Err(MeliError::new("INBOX mailbox not found in local mailbox index. meli may have not parsed the IMAP mailboxes correctly"));
|
||||
fn register_mailbox(
|
||||
&mut self,
|
||||
mailbox_hash: MailboxHash,
|
||||
_urgency: MailboxWatchUrgency,
|
||||
) -> Result<()> {
|
||||
self.mailbox_hashes.insert(mailbox_hash);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_polling_period(&mut self, period: Option<std::time::Duration>) -> Result<()> {
|
||||
if let Some(period) = period {
|
||||
self.polling_period = period;
|
||||
}
|
||||
};
|
||||
let mailbox_hash = mailbox.hash();
|
||||
let mut response = Vec::with_capacity(8 * 1024);
|
||||
let select_response = conn
|
||||
.examine_mailbox(mailbox_hash, &mut response, true)
|
||||
.await?
|
||||
.unwrap();
|
||||
{
|
||||
let mut uidvalidities = uid_store.uidvalidity.lock().unwrap();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
if let Some(v) = uidvalidities.get(&mailbox_hash) {
|
||||
if *v != select_response.uidvalidity {
|
||||
if uid_store.keep_offline_cache {
|
||||
#[cfg(not(feature = "sqlite3"))]
|
||||
let mut cache_handle = super::cache::DefaultCache::get(uid_store.clone())?;
|
||||
#[cfg(feature = "sqlite3")]
|
||||
let mut cache_handle = super::cache::Sqlite3Cache::get(uid_store.clone())?;
|
||||
cache_handle.clear(mailbox_hash, &select_response)?;
|
||||
fn spawn(mut self: Box<Self>) -> ResultFuture<()> {
|
||||
Ok(Box::pin(async move {
|
||||
let has_idle: bool = match self.server_conf.protocol {
|
||||
ImapProtocol::IMAP {
|
||||
extension_use: ImapExtensionUse { idle, .. },
|
||||
} => {
|
||||
idle && self
|
||||
.uid_store
|
||||
.capabilities
|
||||
.lock()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|cap| cap.eq_ignore_ascii_case(b"IDLE"))
|
||||
}
|
||||
conn.add_refresh_event(RefreshEvent {
|
||||
account_hash: uid_store.account_hash,
|
||||
mailbox_hash,
|
||||
kind: RefreshEventKind::Rescan,
|
||||
_ => false,
|
||||
};
|
||||
while let Err(err) = if has_idle {
|
||||
self.idle().await
|
||||
} else {
|
||||
self.poll_with_examine().await
|
||||
} {
|
||||
let mut main_conn_lck =
|
||||
timeout(self.uid_store.timeout, self.main_conn.lock()).await?;
|
||||
if err.kind.is_network() {
|
||||
self.uid_store.is_online.lock().unwrap().1 = Err(err.clone());
|
||||
} else {
|
||||
return Err(err);
|
||||
}
|
||||
debug!("Watch failure: {}", err.to_string());
|
||||
match timeout(self.uid_store.timeout, main_conn_lck.connect())
|
||||
.await
|
||||
.and_then(|res| res)
|
||||
{
|
||||
Err(err2) => {
|
||||
debug!("Watch reconnect attempt failed: {}", err2.to_string());
|
||||
}
|
||||
Ok(()) => {
|
||||
debug!("Watch reconnect attempt succesful");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
let account_hash = self.uid_store.account_hash;
|
||||
main_conn_lck.add_refresh_event(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash: 0,
|
||||
kind: RefreshEventKind::Failure(err.clone()),
|
||||
});
|
||||
/*
|
||||
uid_store.uid_index.lock().unwrap().clear();
|
||||
uid_store.hash_index.lock().unwrap().clear();
|
||||
uid_store.byte_cache.lock().unwrap().clear();
|
||||
*/
|
||||
return Err(err);
|
||||
}
|
||||
} else {
|
||||
uidvalidities.insert(mailbox_hash, select_response.uidvalidity);
|
||||
}
|
||||
debug!("watch future returning");
|
||||
Ok(())
|
||||
}))
|
||||
}
|
||||
let mailboxes: HashMap<MailboxHash, ImapMailbox> = {
|
||||
let mailboxes_lck = timeout(uid_store.timeout, uid_store.mailboxes.lock()).await?;
|
||||
mailboxes_lck.clone()
|
||||
};
|
||||
for (h, mailbox) in mailboxes.clone() {
|
||||
if mailbox_hash == h {
|
||||
continue;
|
||||
}
|
||||
examine_updates(mailbox, &mut conn, &uid_store).await?;
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
conn.send_command(b"IDLE").await?;
|
||||
let mut blockn = ImapBlockingConnection::from(conn);
|
||||
let mut watch = std::time::Instant::now();
|
||||
/* duration interval to send heartbeat */
|
||||
const _10_MINS: std::time::Duration = std::time::Duration::from_secs(10 * 60);
|
||||
/* duration interval to check other mailboxes for changes */
|
||||
const _5_MINS: std::time::Duration = std::time::Duration::from_secs(5 * 60);
|
||||
loop {
|
||||
let line = match timeout(Some(_10_MINS), blockn.as_stream()).await {
|
||||
Ok(Some(line)) => line,
|
||||
Ok(None) => {
|
||||
debug!("IDLE connection dropped: {:?}", &blockn.err());
|
||||
blockn.conn.connect().await?;
|
||||
let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?;
|
||||
main_conn_lck.connect().await?;
|
||||
continue;
|
||||
}
|
||||
Err(_) => {
|
||||
/* Timeout */
|
||||
blockn.conn.send_raw(b"DONE").await?;
|
||||
blockn
|
||||
.conn
|
||||
.read_response(&mut response, RequiredResponses::empty())
|
||||
.await?;
|
||||
blockn.conn.send_command(b"IDLE").await?;
|
||||
let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?;
|
||||
main_conn_lck.connect().await?;
|
||||
continue;
|
||||
|
||||
fn as_any_mut(&mut self) -> &mut dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl ImapWatcher {
|
||||
pub async fn idle(&mut self) -> Result<()> {
|
||||
debug!("IDLE");
|
||||
/* IDLE only watches the connection's selected mailbox. We will IDLE on INBOX and every X
|
||||
* minutes wake up and poll the others */
|
||||
let ImapWatcher {
|
||||
ref main_conn,
|
||||
ref uid_store,
|
||||
ref mailbox_hashes,
|
||||
ref polling_period,
|
||||
ref server_conf,
|
||||
..
|
||||
} = self;
|
||||
let mut connection = ImapConnection::new_connection(server_conf, uid_store.clone());
|
||||
connection.connect().await?;
|
||||
let mailbox_hash: MailboxHash = match uid_store
|
||||
.mailboxes
|
||||
.lock()
|
||||
.await
|
||||
.values()
|
||||
.find(|f| f.parent.is_none() && (f.special_usage() == SpecialUsageMailbox::Inbox))
|
||||
.map(|f| f.hash)
|
||||
{
|
||||
Some(h) => h,
|
||||
None => {
|
||||
return Err(MeliError::new("INBOX mailbox not found in local mailbox index. meli may have not parsed the IMAP mailboxes correctly"));
|
||||
}
|
||||
};
|
||||
let now = std::time::Instant::now();
|
||||
if now.duration_since(watch) >= _5_MINS {
|
||||
/* Time to poll all inboxes */
|
||||
let mut conn = timeout(uid_store.timeout, main_conn.lock()).await?;
|
||||
for (_h, mailbox) in mailboxes.clone() {
|
||||
examine_updates(mailbox, &mut conn, &uid_store).await?;
|
||||
}
|
||||
watch = now;
|
||||
}
|
||||
if line
|
||||
.split_rn()
|
||||
.filter(|l| {
|
||||
!l.starts_with(b"+ ")
|
||||
&& !l.starts_with(b"* ok")
|
||||
&& !l.starts_with(b"* ok")
|
||||
&& !l.starts_with(b"* Ok")
|
||||
&& !l.starts_with(b"* OK")
|
||||
})
|
||||
.count()
|
||||
== 0
|
||||
{
|
||||
continue;
|
||||
}
|
||||
{
|
||||
blockn.conn.send_raw(b"DONE").await?;
|
||||
blockn
|
||||
.conn
|
||||
.read_response(&mut response, RequiredResponses::empty())
|
||||
.await?;
|
||||
for l in line.split_rn().chain(response.split_rn()) {
|
||||
debug!("process_untagged {:?}", &l);
|
||||
if l.starts_with(b"+ ")
|
||||
|| l.starts_with(b"* ok")
|
||||
|| l.starts_with(b"* ok")
|
||||
|| l.starts_with(b"* Ok")
|
||||
|| l.starts_with(b"* OK")
|
||||
{
|
||||
debug!("ignore continuation mark");
|
||||
continue;
|
||||
}
|
||||
blockn.conn.process_untagged(l).await?;
|
||||
}
|
||||
blockn.conn.send_command(b"IDLE").await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn examine_updates(
|
||||
mailbox: ImapMailbox,
|
||||
conn: &mut ImapConnection,
|
||||
uid_store: &Arc<UIDStore>,
|
||||
) -> Result<()> {
|
||||
if mailbox.no_select {
|
||||
return Ok(());
|
||||
}
|
||||
let mailbox_hash = mailbox.hash();
|
||||
debug!("examining mailbox {} {}", mailbox_hash, mailbox.path());
|
||||
if let Some(new_envelopes) = conn.resync(mailbox_hash).await? {
|
||||
for env in new_envelopes {
|
||||
conn.add_refresh_event(RefreshEvent {
|
||||
mailbox_hash,
|
||||
account_hash: uid_store.account_hash,
|
||||
kind: RefreshEventKind::Create(Box::new(env)),
|
||||
});
|
||||
}
|
||||
} else {
|
||||
#[cfg(not(feature = "sqlite3"))]
|
||||
let mut cache_handle = super::cache::DefaultCache::get(uid_store.clone())?;
|
||||
#[cfg(feature = "sqlite3")]
|
||||
let mut cache_handle = super::cache::Sqlite3Cache::get(uid_store.clone())?;
|
||||
let mut response = Vec::with_capacity(8 * 1024);
|
||||
let select_response = conn
|
||||
let select_response = connection
|
||||
.examine_mailbox(mailbox_hash, &mut response, true)
|
||||
.await?
|
||||
.unwrap();
|
||||
|
@ -227,9 +155,13 @@ pub async fn examine_updates(
|
|||
if let Some(v) = uidvalidities.get(&mailbox_hash) {
|
||||
if *v != select_response.uidvalidity {
|
||||
if uid_store.keep_offline_cache {
|
||||
#[cfg(not(feature = "sqlite3"))]
|
||||
let mut cache_handle = super::cache::DefaultCache::get(uid_store.clone())?;
|
||||
#[cfg(feature = "sqlite3")]
|
||||
let mut cache_handle = super::cache::Sqlite3Cache::get(uid_store.clone())?;
|
||||
cache_handle.clear(mailbox_hash, &select_response)?;
|
||||
}
|
||||
conn.add_refresh_event(RefreshEvent {
|
||||
connection.add_refresh_event(RefreshEvent {
|
||||
account_hash: uid_store.account_hash,
|
||||
mailbox_hash,
|
||||
kind: RefreshEventKind::Rescan,
|
||||
|
@ -239,215 +171,384 @@ pub async fn examine_updates(
|
|||
uid_store.hash_index.lock().unwrap().clear();
|
||||
uid_store.byte_cache.lock().unwrap().clear();
|
||||
*/
|
||||
return Ok(());
|
||||
}
|
||||
} else {
|
||||
uidvalidities.insert(mailbox_hash, select_response.uidvalidity);
|
||||
}
|
||||
}
|
||||
if mailbox.is_cold() {
|
||||
/* Mailbox hasn't been loaded yet */
|
||||
let has_list_status: bool = conn
|
||||
.uid_store
|
||||
.capabilities
|
||||
.lock()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|cap| cap.eq_ignore_ascii_case(b"LIST-STATUS"));
|
||||
if has_list_status {
|
||||
conn.send_command(
|
||||
format!(
|
||||
"LIST \"{}\" \"\" RETURN (STATUS (MESSAGES UNSEEN))",
|
||||
mailbox.imap_path()
|
||||
)
|
||||
.as_bytes(),
|
||||
)
|
||||
.await?;
|
||||
conn.read_response(
|
||||
&mut response,
|
||||
RequiredResponses::LIST_REQUIRED | RequiredResponses::STATUS,
|
||||
)
|
||||
.await?;
|
||||
debug!(
|
||||
"list return status out: {}",
|
||||
String::from_utf8_lossy(&response)
|
||||
);
|
||||
for l in response.split_rn() {
|
||||
if !l.starts_with(b"*") {
|
||||
continue;
|
||||
}
|
||||
if let Ok(status) = protocol_parser::status_response(&l).map(|(_, v)| v) {
|
||||
if Some(mailbox_hash) == status.mailbox {
|
||||
if let Some(total) = status.messages {
|
||||
if let Ok(mut exists_lck) = mailbox.exists.lock() {
|
||||
exists_lck.clear();
|
||||
exists_lck.set_not_yet_seen(total);
|
||||
}
|
||||
}
|
||||
if let Some(total) = status.unseen {
|
||||
if let Ok(mut unseen_lck) = mailbox.unseen.lock() {
|
||||
unseen_lck.clear();
|
||||
unseen_lck.set_not_yet_seen(total);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
conn.send_command(b"SEARCH UNSEEN").await?;
|
||||
conn.read_response(&mut response, RequiredResponses::SEARCH)
|
||||
.await?;
|
||||
let unseen_count = protocol_parser::search_results(&response)?.1.len();
|
||||
if let Ok(mut exists_lck) = mailbox.exists.lock() {
|
||||
exists_lck.clear();
|
||||
exists_lck.set_not_yet_seen(select_response.exists);
|
||||
}
|
||||
if let Ok(mut unseen_lck) = mailbox.unseen.lock() {
|
||||
unseen_lck.clear();
|
||||
unseen_lck.set_not_yet_seen(unseen_count);
|
||||
}
|
||||
}
|
||||
mailbox.set_warm(true);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if select_response.recent > 0 {
|
||||
/* UID SEARCH RECENT */
|
||||
conn.send_command(b"UID SEARCH RECENT").await?;
|
||||
conn.read_response(&mut response, RequiredResponses::SEARCH)
|
||||
.await?;
|
||||
let v = protocol_parser::search_results(response.as_slice()).map(|(_, v)| v)?;
|
||||
if v.is_empty() {
|
||||
debug!(
|
||||
"search response was empty: {}",
|
||||
String::from_utf8_lossy(&response)
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
let mut cmd = "UID FETCH ".to_string();
|
||||
if v.len() == 1 {
|
||||
cmd.push_str(&v[0].to_string());
|
||||
} else {
|
||||
cmd.push_str(&v[0].to_string());
|
||||
for n in v.into_iter().skip(1) {
|
||||
cmd.push(',');
|
||||
cmd.push_str(&n.to_string());
|
||||
}
|
||||
}
|
||||
cmd.push_str(
|
||||
" (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] BODYSTRUCTURE)",
|
||||
);
|
||||
conn.send_command(cmd.as_bytes()).await?;
|
||||
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
|
||||
.await?;
|
||||
} else if select_response.exists > mailbox.exists.lock().unwrap().len() {
|
||||
conn.send_command(
|
||||
format!(
|
||||
"FETCH {}:* (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] BODYSTRUCTURE)",
|
||||
std::cmp::max(mailbox.exists.lock().unwrap().len(), 1)
|
||||
)
|
||||
.as_bytes(),
|
||||
)
|
||||
.await?;
|
||||
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
|
||||
.await?;
|
||||
} else {
|
||||
return Ok(());
|
||||
}
|
||||
debug!(
|
||||
"fetch response is {} bytes and {} lines",
|
||||
response.len(),
|
||||
String::from_utf8_lossy(&response).lines().count()
|
||||
);
|
||||
let (_, mut v, _) = protocol_parser::fetch_responses(&response)?;
|
||||
debug!("responses len is {}", v.len());
|
||||
for FetchResponse {
|
||||
ref uid,
|
||||
ref mut envelope,
|
||||
ref mut flags,
|
||||
ref references,
|
||||
..
|
||||
} in v.iter_mut()
|
||||
{
|
||||
let uid = uid.unwrap();
|
||||
let env = envelope.as_mut().unwrap();
|
||||
env.set_hash(generate_envelope_hash(&mailbox.imap_path(), &uid));
|
||||
if let Some(value) = references {
|
||||
env.set_references(value);
|
||||
}
|
||||
let mut tag_lck = uid_store.collection.tag_index.write().unwrap();
|
||||
if let Some((flags, keywords)) = flags {
|
||||
env.set_flags(*flags);
|
||||
if !env.is_seen() {
|
||||
mailbox.unseen.lock().unwrap().insert_new(env.hash());
|
||||
}
|
||||
mailbox.exists.lock().unwrap().insert_new(env.hash());
|
||||
for f in keywords {
|
||||
let hash = tag_hash!(f);
|
||||
if !tag_lck.contains_key(&hash) {
|
||||
tag_lck.insert(hash, f.to_string());
|
||||
}
|
||||
env.labels_mut().push(hash);
|
||||
}
|
||||
}
|
||||
}
|
||||
if uid_store.keep_offline_cache {
|
||||
if !cache_handle.mailbox_state(mailbox_hash)?.is_none() {
|
||||
cache_handle
|
||||
.insert_envelopes(mailbox_hash, &v)
|
||||
.chain_err_summary(|| {
|
||||
format!(
|
||||
"Could not save envelopes in cache for mailbox {}",
|
||||
mailbox.imap_path()
|
||||
)
|
||||
})?;
|
||||
}
|
||||
}
|
||||
|
||||
for FetchResponse { uid, envelope, .. } in v {
|
||||
if uid.is_none() || envelope.is_none() {
|
||||
let mailboxes: HashMap<MailboxHash, ImapMailbox> = {
|
||||
let mailboxes_lck = timeout(uid_store.timeout, uid_store.mailboxes.lock()).await?;
|
||||
let mut ret = mailboxes_lck.clone();
|
||||
ret.retain(|k, _| mailbox_hashes.contains(k));
|
||||
ret
|
||||
};
|
||||
for (h, mailbox) in mailboxes.iter() {
|
||||
if mailbox_hash == *h {
|
||||
continue;
|
||||
}
|
||||
let uid = uid.unwrap();
|
||||
if uid_store
|
||||
.uid_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.contains_key(&(mailbox_hash, uid))
|
||||
Self::examine_updates(mailbox, &mut connection, &uid_store).await?;
|
||||
}
|
||||
connection.send_command(b"IDLE").await?;
|
||||
let mut blockn = ImapBlockingConnection::from(connection);
|
||||
let mut watch = std::time::Instant::now();
|
||||
/* duration interval to send heartbeat */
|
||||
const _10_MINS: std::time::Duration = std::time::Duration::from_secs(10 * 60);
|
||||
/* duration interval to check other mailboxes for changes */
|
||||
loop {
|
||||
let line = match timeout(
|
||||
Some(std::cmp::min(*polling_period, _10_MINS)),
|
||||
blockn.as_stream(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Some(line)) => line,
|
||||
Ok(None) => {
|
||||
debug!("IDLE connection dropped: {:?}", &blockn.err());
|
||||
blockn.conn.connect().await?;
|
||||
let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?;
|
||||
main_conn_lck.connect().await?;
|
||||
continue;
|
||||
}
|
||||
Err(_) => {
|
||||
/* Timeout */
|
||||
blockn.conn.send_raw(b"DONE").await?;
|
||||
blockn
|
||||
.conn
|
||||
.read_response(&mut response, RequiredResponses::empty())
|
||||
.await?;
|
||||
blockn.conn.send_command(b"IDLE").await?;
|
||||
let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?;
|
||||
main_conn_lck.connect().await?;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let now = std::time::Instant::now();
|
||||
if now.duration_since(watch) >= *polling_period {
|
||||
/* Time to poll all inboxes */
|
||||
let mut conn = timeout(uid_store.timeout, main_conn.lock()).await?;
|
||||
for (_h, mailbox) in mailboxes.iter() {
|
||||
Self::examine_updates(mailbox, &mut conn, &uid_store).await?;
|
||||
}
|
||||
watch = now;
|
||||
}
|
||||
if line
|
||||
.split_rn()
|
||||
.filter(|l| {
|
||||
!l.starts_with(b"+ ")
|
||||
&& !l.starts_with(b"* ok")
|
||||
&& !l.starts_with(b"* ok")
|
||||
&& !l.starts_with(b"* Ok")
|
||||
&& !l.starts_with(b"* OK")
|
||||
})
|
||||
.count()
|
||||
== 0
|
||||
{
|
||||
continue;
|
||||
}
|
||||
let env = envelope.unwrap();
|
||||
debug!(
|
||||
"Create event {} {} {}",
|
||||
env.hash(),
|
||||
env.subject(),
|
||||
mailbox.path(),
|
||||
);
|
||||
uid_store
|
||||
.msn_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.entry(mailbox_hash)
|
||||
.or_default()
|
||||
.push(uid);
|
||||
uid_store
|
||||
.hash_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(env.hash(), (uid, mailbox_hash));
|
||||
uid_store
|
||||
.uid_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert((mailbox_hash, uid), env.hash());
|
||||
conn.add_refresh_event(RefreshEvent {
|
||||
account_hash: uid_store.account_hash,
|
||||
mailbox_hash,
|
||||
kind: Create(Box::new(env)),
|
||||
});
|
||||
{
|
||||
blockn.conn.send_raw(b"DONE").await?;
|
||||
blockn
|
||||
.conn
|
||||
.read_response(&mut response, RequiredResponses::empty())
|
||||
.await?;
|
||||
for l in line.split_rn().chain(response.split_rn()) {
|
||||
debug!("process_untagged {:?}", String::from_utf8_lossy(&l));
|
||||
if l.starts_with(b"+ ")
|
||||
|| l.starts_with(b"* ok")
|
||||
|| l.starts_with(b"* ok")
|
||||
|| l.starts_with(b"* Ok")
|
||||
|| l.starts_with(b"* OK")
|
||||
{
|
||||
debug!("ignore continuation mark");
|
||||
continue;
|
||||
}
|
||||
blockn.conn.process_untagged(l).await?;
|
||||
}
|
||||
blockn.conn.send_command(b"IDLE").await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
pub async fn poll_with_examine(&mut self) -> Result<()> {
|
||||
debug!("poll with examine");
|
||||
let ImapWatcher {
|
||||
ref mailbox_hashes,
|
||||
ref uid_store,
|
||||
ref polling_period,
|
||||
ref server_conf,
|
||||
..
|
||||
} = self;
|
||||
let mut connection = ImapConnection::new_connection(server_conf, uid_store.clone());
|
||||
connection.connect().await?;
|
||||
let mailboxes: HashMap<MailboxHash, ImapMailbox> = {
|
||||
let mailboxes_lck = timeout(uid_store.timeout, uid_store.mailboxes.lock()).await?;
|
||||
let mut ret = mailboxes_lck.clone();
|
||||
ret.retain(|k, _| mailbox_hashes.contains(k));
|
||||
ret
|
||||
};
|
||||
loop {
|
||||
for (_, mailbox) in mailboxes.iter() {
|
||||
Self::examine_updates(mailbox, &mut connection, &uid_store).await?;
|
||||
}
|
||||
crate::connections::sleep(*polling_period).await;
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn examine_updates(
|
||||
mailbox: &ImapMailbox,
|
||||
conn: &mut ImapConnection,
|
||||
uid_store: &Arc<UIDStore>,
|
||||
) -> Result<()> {
|
||||
if mailbox.no_select {
|
||||
return Ok(());
|
||||
}
|
||||
let mailbox_hash = mailbox.hash();
|
||||
debug!("examining mailbox {} {}", mailbox_hash, mailbox.path());
|
||||
if let Some(new_envelopes) = conn.resync(mailbox_hash).await? {
|
||||
for env in new_envelopes {
|
||||
conn.add_refresh_event(RefreshEvent {
|
||||
mailbox_hash,
|
||||
account_hash: uid_store.account_hash,
|
||||
kind: RefreshEventKind::Create(Box::new(env)),
|
||||
});
|
||||
}
|
||||
} else {
|
||||
#[cfg(not(feature = "sqlite3"))]
|
||||
let mut cache_handle = super::cache::DefaultCache::get(uid_store.clone())?;
|
||||
#[cfg(feature = "sqlite3")]
|
||||
let mut cache_handle = super::cache::Sqlite3Cache::get(uid_store.clone())?;
|
||||
let mut response = Vec::with_capacity(8 * 1024);
|
||||
let select_response = conn
|
||||
.examine_mailbox(mailbox_hash, &mut response, true)
|
||||
.await?
|
||||
.unwrap();
|
||||
{
|
||||
let mut uidvalidities = uid_store.uidvalidity.lock().unwrap();
|
||||
|
||||
if let Some(v) = uidvalidities.get(&mailbox_hash) {
|
||||
if *v != select_response.uidvalidity {
|
||||
if uid_store.keep_offline_cache {
|
||||
cache_handle.clear(mailbox_hash, &select_response)?;
|
||||
}
|
||||
conn.add_refresh_event(RefreshEvent {
|
||||
account_hash: uid_store.account_hash,
|
||||
mailbox_hash,
|
||||
kind: RefreshEventKind::Rescan,
|
||||
});
|
||||
/*
|
||||
uid_store.uid_index.lock().unwrap().clear();
|
||||
uid_store.hash_index.lock().unwrap().clear();
|
||||
uid_store.byte_cache.lock().unwrap().clear();
|
||||
*/
|
||||
return Ok(());
|
||||
}
|
||||
} else {
|
||||
uidvalidities.insert(mailbox_hash, select_response.uidvalidity);
|
||||
}
|
||||
}
|
||||
if mailbox.is_cold() {
|
||||
/* Mailbox hasn't been loaded yet */
|
||||
let has_list_status: bool = conn
|
||||
.uid_store
|
||||
.capabilities
|
||||
.lock()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|cap| cap.eq_ignore_ascii_case(b"LIST-STATUS"));
|
||||
if has_list_status {
|
||||
conn.send_command(
|
||||
format!(
|
||||
"LIST \"{}\" \"\" RETURN (STATUS (MESSAGES UNSEEN))",
|
||||
mailbox.imap_path()
|
||||
)
|
||||
.as_bytes(),
|
||||
)
|
||||
.await?;
|
||||
conn.read_response(
|
||||
&mut response,
|
||||
RequiredResponses::LIST_REQUIRED | RequiredResponses::STATUS,
|
||||
)
|
||||
.await?;
|
||||
debug!(
|
||||
"list return status out: {}",
|
||||
String::from_utf8_lossy(&response)
|
||||
);
|
||||
for l in response.split_rn() {
|
||||
if !l.starts_with(b"*") {
|
||||
continue;
|
||||
}
|
||||
if let Ok(status) = protocol_parser::status_response(&l).map(|(_, v)| v) {
|
||||
if Some(mailbox_hash) == status.mailbox {
|
||||
if let Some(total) = status.messages {
|
||||
if let Ok(mut exists_lck) = mailbox.exists.lock() {
|
||||
exists_lck.clear();
|
||||
exists_lck.set_not_yet_seen(total);
|
||||
}
|
||||
}
|
||||
if let Some(total) = status.unseen {
|
||||
if let Ok(mut unseen_lck) = mailbox.unseen.lock() {
|
||||
unseen_lck.clear();
|
||||
unseen_lck.set_not_yet_seen(total);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
conn.send_command(b"SEARCH UNSEEN").await?;
|
||||
conn.read_response(&mut response, RequiredResponses::SEARCH)
|
||||
.await?;
|
||||
let unseen_count = protocol_parser::search_results(&response)?.1.len();
|
||||
if let Ok(mut exists_lck) = mailbox.exists.lock() {
|
||||
exists_lck.clear();
|
||||
exists_lck.set_not_yet_seen(select_response.exists);
|
||||
}
|
||||
if let Ok(mut unseen_lck) = mailbox.unseen.lock() {
|
||||
unseen_lck.clear();
|
||||
unseen_lck.set_not_yet_seen(unseen_count);
|
||||
}
|
||||
}
|
||||
mailbox.set_warm(true);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if select_response.recent > 0 {
|
||||
/* UID SEARCH RECENT */
|
||||
conn.send_command(b"UID SEARCH RECENT").await?;
|
||||
conn.read_response(&mut response, RequiredResponses::SEARCH)
|
||||
.await?;
|
||||
let v = protocol_parser::search_results(response.as_slice()).map(|(_, v)| v)?;
|
||||
if v.is_empty() {
|
||||
debug!(
|
||||
"search response was empty: {}",
|
||||
String::from_utf8_lossy(&response)
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
let mut cmd = "UID FETCH ".to_string();
|
||||
if v.len() == 1 {
|
||||
cmd.push_str(&v[0].to_string());
|
||||
} else {
|
||||
cmd.push_str(&v[0].to_string());
|
||||
for n in v.into_iter().skip(1) {
|
||||
cmd.push(',');
|
||||
cmd.push_str(&n.to_string());
|
||||
}
|
||||
}
|
||||
cmd.push_str(
|
||||
" (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] BODYSTRUCTURE)",
|
||||
);
|
||||
conn.send_command(cmd.as_bytes()).await?;
|
||||
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
|
||||
.await?;
|
||||
} else if select_response.exists > mailbox.exists.lock().unwrap().len() {
|
||||
conn.send_command(
|
||||
format!(
|
||||
"FETCH {}:* (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] BODYSTRUCTURE)",
|
||||
std::cmp::max(mailbox.exists.lock().unwrap().len(), 1)
|
||||
)
|
||||
.as_bytes(),
|
||||
)
|
||||
.await?;
|
||||
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
|
||||
.await?;
|
||||
} else {
|
||||
return Ok(());
|
||||
}
|
||||
debug!(
|
||||
"fetch response is {} bytes and {} lines",
|
||||
response.len(),
|
||||
String::from_utf8_lossy(&response).lines().count()
|
||||
);
|
||||
let (_, mut v, _) = protocol_parser::fetch_responses(&response)?;
|
||||
debug!("responses len is {}", v.len());
|
||||
for FetchResponse {
|
||||
ref uid,
|
||||
ref mut envelope,
|
||||
ref mut flags,
|
||||
ref references,
|
||||
..
|
||||
} in v.iter_mut()
|
||||
{
|
||||
let uid = uid.unwrap();
|
||||
let env = envelope.as_mut().unwrap();
|
||||
env.set_hash(generate_envelope_hash(&mailbox.imap_path(), &uid));
|
||||
if let Some(value) = references {
|
||||
env.set_references(value);
|
||||
}
|
||||
let mut tag_lck = uid_store.collection.tag_index.write().unwrap();
|
||||
if let Some((flags, keywords)) = flags {
|
||||
env.set_flags(*flags);
|
||||
if !env.is_seen() {
|
||||
mailbox.unseen.lock().unwrap().insert_new(env.hash());
|
||||
}
|
||||
mailbox.exists.lock().unwrap().insert_new(env.hash());
|
||||
for f in keywords {
|
||||
let hash = tag_hash!(f);
|
||||
if !tag_lck.contains_key(&hash) {
|
||||
tag_lck.insert(hash, f.to_string());
|
||||
}
|
||||
env.labels_mut().push(hash);
|
||||
}
|
||||
}
|
||||
}
|
||||
if uid_store.keep_offline_cache {
|
||||
if !cache_handle.mailbox_state(mailbox_hash)?.is_none() {
|
||||
cache_handle
|
||||
.insert_envelopes(mailbox_hash, &v)
|
||||
.chain_err_summary(|| {
|
||||
format!(
|
||||
"Could not save envelopes in cache for mailbox {}",
|
||||
mailbox.imap_path()
|
||||
)
|
||||
})?;
|
||||
}
|
||||
}
|
||||
|
||||
for FetchResponse { uid, envelope, .. } in v {
|
||||
if uid.is_none() || envelope.is_none() {
|
||||
continue;
|
||||
}
|
||||
let uid = uid.unwrap();
|
||||
if uid_store
|
||||
.uid_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.contains_key(&(mailbox_hash, uid))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
let env = envelope.unwrap();
|
||||
debug!(
|
||||
"Create event {} {} {}",
|
||||
env.hash(),
|
||||
env.subject(),
|
||||
mailbox.path(),
|
||||
);
|
||||
uid_store
|
||||
.msn_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.entry(mailbox_hash)
|
||||
.or_default()
|
||||
.push(uid);
|
||||
uid_store
|
||||
.hash_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(env.hash(), (uid, mailbox_hash));
|
||||
uid_store
|
||||
.uid_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert((mailbox_hash, uid), env.hash());
|
||||
conn.add_refresh_event(RefreshEvent {
|
||||
account_hash: uid_store.account_hash,
|
||||
mailbox_hash,
|
||||
kind: Create(Box::new(env)),
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -90,6 +90,8 @@ use objects::*;
|
|||
pub mod mailbox;
|
||||
use mailbox::*;
|
||||
|
||||
pub mod watch;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct EnvelopeCache {
|
||||
bytes: Option<String>,
|
||||
|
@ -313,6 +315,147 @@ impl MailBackend for JmapType {
|
|||
}))
|
||||
}
|
||||
|
||||
fn load(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()> {
|
||||
let store = self.store.clone();
|
||||
let connection = self.connection.clone();
|
||||
Ok(Box::pin(async move {
|
||||
{
|
||||
crate::connections::sleep(std::time::Duration::from_secs(2)).await;
|
||||
}
|
||||
let mailbox_id = store.mailboxes.read().unwrap()[&mailbox_hash].id.clone();
|
||||
let email_query_call: EmailQuery = EmailQuery::new(
|
||||
Query::new()
|
||||
.account_id(conn.mail_account_id().clone())
|
||||
.filter(Some(Filter::Condition(
|
||||
EmailFilterCondition::new()
|
||||
.in_mailbox(Some(mailbox_id))
|
||||
.into(),
|
||||
)))
|
||||
.position(0)
|
||||
.properties(Some(vec![
|
||||
"id".to_string(),
|
||||
"receivedAt".to_string(),
|
||||
"messageId".to_string(),
|
||||
"inReplyTo".to_string(),
|
||||
"hasAttachment".to_string(),
|
||||
"keywords".to_string(),
|
||||
])),
|
||||
)
|
||||
.collapse_threads(false);
|
||||
|
||||
let mut req = Request::new(conn.request_no.clone());
|
||||
let prev_seq = req.add_call(&email_query_call);
|
||||
|
||||
let email_call: EmailGet = EmailGet::new(
|
||||
Get::new()
|
||||
.ids(Some(JmapArgument::reference(
|
||||
prev_seq,
|
||||
EmailQuery::RESULT_FIELD_IDS,
|
||||
)))
|
||||
.account_id(conn.mail_account_id().clone()),
|
||||
);
|
||||
|
||||
req.add_call(&email_call);
|
||||
|
||||
let api_url = conn.session.lock().unwrap().api_url.clone();
|
||||
let mut res = conn
|
||||
.client
|
||||
.post_async(api_url.as_str(), serde_json::to_string(&req)?)
|
||||
.await?;
|
||||
|
||||
let res_text = res.text_async().await?;
|
||||
|
||||
let mut v: MethodResponse = serde_json::from_str(&res_text).unwrap();
|
||||
let e = GetResponse::<EmailObject>::try_from(v.method_responses.pop().unwrap())?;
|
||||
let query_response =
|
||||
QueryResponse::<EmailObject>::try_from(v.method_responses.pop().unwrap())?;
|
||||
store
|
||||
.mailboxes
|
||||
.write()
|
||||
.unwrap()
|
||||
.entry(mailbox_hash)
|
||||
.and_modify(|mbox| {
|
||||
*mbox.email_query_state.lock().unwrap() = Some(query_response.query_state);
|
||||
});
|
||||
let GetResponse::<EmailObject> { list, state, .. } = e;
|
||||
{
|
||||
let (is_empty, is_equal) = {
|
||||
let mailboxes_lck = conn.store.mailboxes.read().unwrap();
|
||||
mailboxes_lck
|
||||
.get(&mailbox_hash)
|
||||
.map(|mbox| {
|
||||
let current_state_lck = mbox.email_state.lock().unwrap();
|
||||
(
|
||||
current_state_lck.is_none(),
|
||||
current_state_lck.as_ref() != Some(&state),
|
||||
)
|
||||
})
|
||||
.unwrap_or((true, true))
|
||||
};
|
||||
if is_empty {
|
||||
let mut mailboxes_lck = conn.store.mailboxes.write().unwrap();
|
||||
debug!("{:?}: inserting state {}", EmailObject::NAME, &state);
|
||||
mailboxes_lck.entry(mailbox_hash).and_modify(|mbox| {
|
||||
*mbox.email_state.lock().unwrap() = Some(state);
|
||||
});
|
||||
} else if !is_equal {
|
||||
conn.email_changes(mailbox_hash).await?;
|
||||
}
|
||||
}
|
||||
let mut total = BTreeSet::default();
|
||||
let mut unread = BTreeSet::default();
|
||||
let new_envelopes: HashMap<EnvelopeHash, Envelope> = list
|
||||
.into_iter(|obj| {
|
||||
let env = store.add_envelope(obj);
|
||||
total.insert(env.hash());
|
||||
if !env.is_seen() {
|
||||
unread.insert(env.hash());
|
||||
}
|
||||
(env.hash(), env)
|
||||
})
|
||||
.collect();
|
||||
let mut mailboxes_lck = store.mailboxes.write().unwrap();
|
||||
mailboxes_lck.entry(mailbox_hash).and_modify(|mbox| {
|
||||
mbox.total_emails.lock().unwrap().insert_existing_set(total);
|
||||
mbox.unread_emails
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert_existing_set(unread);
|
||||
});
|
||||
let keys: BTreeSet<EnvelopeHash> = new_envelopes.keys().cloned().collect();
|
||||
collection.merge(new_envelopes, mailbox_hash, None);
|
||||
let mut envelopes_lck = collection.envelopes.write().unwrap();
|
||||
envelopes_lck.retain(|k, _| !keys.contains(k));
|
||||
|
||||
Ok(())
|
||||
}))
|
||||
}
|
||||
|
||||
fn fetch_batch(&mut self, env_hashes: EnvelopeHashBatch) -> ResultFuture<()> {
|
||||
todo!()
|
||||
/*
|
||||
let store = self.store.clone();
|
||||
let connection = self.connection.clone();
|
||||
Ok(Box::pin(async move {
|
||||
//crate::connections::sleep(std::time::Duration::from_secs(2)).await;
|
||||
debug!("fetch_batch {:?}", &env_hashes);
|
||||
let mut envelopes_lck = collection.envelopes.write().unwrap();
|
||||
for env_hash in env_hashes.iter() {
|
||||
if envelopes_lck.contains_key(&env_hash) {
|
||||
continue;
|
||||
}
|
||||
let index_lck = index.write().unwrap();
|
||||
let message = Message::find_message(&database, &index_lck[&env_hash])?;
|
||||
drop(index_lck);
|
||||
let env = message.into_envelope(&index, &collection.tag_index);
|
||||
envelopes_lck.insert(env_hash, env);
|
||||
}
|
||||
debug!("fetch_batch {:?} done", &env_hashes);
|
||||
Ok(())
|
||||
}))
|
||||
*/
|
||||
}
|
||||
|
||||
fn fetch(
|
||||
&mut self,
|
||||
mailbox_hash: MailboxHash,
|
||||
|
@ -341,32 +484,12 @@ impl MailBackend for JmapType {
|
|||
}))
|
||||
}
|
||||
|
||||
fn watch(&self) -> ResultFuture<()> {
|
||||
fn watcher(&self) -> Result<Box<dyn BackendWatcher>> {
|
||||
let connection = self.connection.clone();
|
||||
let store = self.store.clone();
|
||||
Ok(Box::pin(async move {
|
||||
{
|
||||
let mut conn = connection.lock().await;
|
||||
conn.connect().await?;
|
||||
}
|
||||
loop {
|
||||
{
|
||||
let mailbox_hashes = {
|
||||
store
|
||||
.mailboxes
|
||||
.read()
|
||||
.unwrap()
|
||||
.keys()
|
||||
.cloned()
|
||||
.collect::<SmallVec<[MailboxHash; 16]>>()
|
||||
};
|
||||
let conn = connection.lock().await;
|
||||
for mailbox_hash in mailbox_hashes {
|
||||
conn.email_changes(mailbox_hash).await?;
|
||||
}
|
||||
}
|
||||
crate::connections::sleep(std::time::Duration::from_secs(60)).await;
|
||||
}
|
||||
Ok(Box::new(watch::JmapWatcher {
|
||||
connection,
|
||||
mailbox_hashes: BTreeSet::default(),
|
||||
polling_period: std::time::Duration::from_secs(60),
|
||||
}))
|
||||
}
|
||||
|
||||
|
|