Compare commits
2 Commits
master
...
lazy_fetch
Author | SHA1 | Date |
---|---|---|
Manos Pitsidianakis | 77e4488637 | |
Manos Pitsidianakis | 819d993f11 |
|
@ -310,13 +310,26 @@ pub trait MailBackend: ::std::fmt::Debug + Send + Sync {
|
|||
Ok(Box::pin(async { Ok(()) }))
|
||||
}
|
||||
|
||||
fn fetch_batch(&mut self, env_hashes: EnvelopeHashBatch) -> ResultFuture<()> {
|
||||
Err(MeliError::new("Unimplemented."))
|
||||
}
|
||||
|
||||
fn load(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()> {
|
||||
Err(MeliError::new("Unimplemented."))
|
||||
}
|
||||
|
||||
fn fetch(
|
||||
&mut self,
|
||||
mailbox_hash: MailboxHash,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>>;
|
||||
|
||||
fn refresh(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()>;
|
||||
fn watch(&self) -> ResultFuture<()>;
|
||||
|
||||
/// Return a [`Box<dyn BackendWatcher>`](BackendWatcher), to which you can register the
|
||||
/// mailboxes you are interested in for updates and then consume to spawn a watching `Future`.
|
||||
/// The `Future` sends events to the [`BackendEventConsumer`](BackendEventConsumer) supplied to
|
||||
/// the backend in its constructor method.
|
||||
fn watcher(&self) -> Result<Box<dyn BackendWatcher>>;
|
||||
fn mailboxes(&self) -> ResultFuture<HashMap<MailboxHash, Mailbox>>;
|
||||
fn operation(&self, hash: EnvelopeHash) -> Result<Box<dyn BackendOp>>;
|
||||
|
||||
|
@ -623,7 +636,7 @@ impl EnvelopeHashBatch {
|
|||
#[derive(Default, Clone)]
|
||||
pub struct LazyCountSet {
|
||||
not_yet_seen: usize,
|
||||
set: BTreeSet<EnvelopeHash>,
|
||||
pub set: BTreeSet<EnvelopeHash>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for LazyCountSet {
|
||||
|
@ -711,3 +724,31 @@ impl std::ops::Deref for IsSubscribedFn {
|
|||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
/// Urgency for the events of a single Mailbox.
|
||||
#[derive(Debug, Copy, Clone, PartialEq)]
|
||||
pub enum MailboxWatchUrgency {
|
||||
High,
|
||||
Medium,
|
||||
Low,
|
||||
}
|
||||
|
||||
/// Register the mailboxes you are interested in for updates and then consume with `spawn` to spawn
|
||||
/// a watching `Future`. The `Future` sends events to the
|
||||
/// [`BackendEventConsumer`](backends::BackendEventConsumer) supplied to the backend in its constructor
|
||||
/// method.
|
||||
pub trait BackendWatcher: ::std::fmt::Debug + Send + Sync {
|
||||
/// Whether the watcher's `Future` requires blocking I/O.
|
||||
fn is_blocking(&self) -> bool;
|
||||
fn register_mailbox(
|
||||
&mut self,
|
||||
mailbox_hash: MailboxHash,
|
||||
urgency: MailboxWatchUrgency,
|
||||
) -> Result<()>;
|
||||
fn set_polling_period(&mut self, period: Option<std::time::Duration>) -> Result<()>;
|
||||
fn spawn(self: Box<Self>) -> ResultFuture<()>;
|
||||
/// Use the [`Any`](std::any::Any) trait to get the underlying type implementing the
|
||||
/// [`BackendWatcher`](backends::BackendEventConsumer) trait.
|
||||
fn as_any(&self) -> &dyn Any;
|
||||
fn as_any_mut(&mut self) -> &mut dyn Any;
|
||||
}
|
||||
|
|
|
@ -370,13 +370,13 @@ impl MailBackend for ImapType {
|
|||
let main_conn = self.connection.clone();
|
||||
let uid_store = self.uid_store.clone();
|
||||
Ok(Box::pin(async move {
|
||||
let inbox = timeout(uid_store.timeout, uid_store.mailboxes.lock())
|
||||
let mut inbox = timeout(uid_store.timeout, uid_store.mailboxes.lock())
|
||||
.await?
|
||||
.get(&mailbox_hash)
|
||||
.map(std::clone::Clone::clone)
|
||||
.unwrap();
|
||||
let mut conn = timeout(uid_store.timeout, main_conn.lock()).await?;
|
||||
watch::examine_updates(inbox, &mut conn, &uid_store).await?;
|
||||
watch::ImapWatcher::examine_updates(&mut inbox, &mut conn, &uid_store).await?;
|
||||
Ok(())
|
||||
}))
|
||||
}
|
||||
|
@ -450,68 +450,16 @@ impl MailBackend for ImapType {
|
|||
}))
|
||||
}
|
||||
|
||||
fn watch(&self) -> ResultFuture<()> {
|
||||
fn watcher(&self) -> Result<Box<dyn BackendWatcher>> {
|
||||
let server_conf = self.server_conf.clone();
|
||||
let main_conn = self.connection.clone();
|
||||
let uid_store = self.uid_store.clone();
|
||||
Ok(Box::pin(async move {
|
||||
let has_idle: bool = match server_conf.protocol {
|
||||
ImapProtocol::IMAP {
|
||||
extension_use: ImapExtensionUse { idle, .. },
|
||||
} => {
|
||||
idle && uid_store
|
||||
.capabilities
|
||||
.lock()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|cap| cap.eq_ignore_ascii_case(b"IDLE"))
|
||||
}
|
||||
_ => false,
|
||||
};
|
||||
while let Err(err) = if has_idle {
|
||||
idle(ImapWatchKit {
|
||||
conn: ImapConnection::new_connection(&server_conf, uid_store.clone()),
|
||||
main_conn: main_conn.clone(),
|
||||
uid_store: uid_store.clone(),
|
||||
})
|
||||
.await
|
||||
} else {
|
||||
poll_with_examine(ImapWatchKit {
|
||||
conn: ImapConnection::new_connection(&server_conf, uid_store.clone()),
|
||||
main_conn: main_conn.clone(),
|
||||
uid_store: uid_store.clone(),
|
||||
})
|
||||
.await
|
||||
} {
|
||||
let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?;
|
||||
if err.kind.is_network() {
|
||||
uid_store.is_online.lock().unwrap().1 = Err(err.clone());
|
||||
} else {
|
||||
return Err(err);
|
||||
}
|
||||
debug!("Watch failure: {}", err.to_string());
|
||||
match timeout(uid_store.timeout, main_conn_lck.connect())
|
||||
.await
|
||||
.and_then(|res| res)
|
||||
{
|
||||
Err(err2) => {
|
||||
debug!("Watch reconnect attempt failed: {}", err2.to_string());
|
||||
}
|
||||
Ok(()) => {
|
||||
debug!("Watch reconnect attempt succesful");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
let account_hash = uid_store.account_hash;
|
||||
main_conn_lck.add_refresh_event(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash: 0,
|
||||
kind: RefreshEventKind::Failure(err.clone()),
|
||||
});
|
||||
return Err(err);
|
||||
}
|
||||
debug!("watch future returning");
|
||||
Ok(())
|
||||
Ok(Box::new(ImapWatcher {
|
||||
main_conn,
|
||||
uid_store,
|
||||
mailbox_hashes: BTreeSet::default(),
|
||||
polling_period: std::time::Duration::from_secs(5 * 60),
|
||||
server_conf,
|
||||
}))
|
||||
}
|
||||
|
||||
|
|
|
@ -588,7 +588,9 @@ impl ImapConnection {
|
|||
pub fn connect<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
|
||||
Box::pin(async move {
|
||||
if let (time, ref mut status @ Ok(())) = *self.uid_store.is_online.lock().unwrap() {
|
||||
if SystemTime::now().duration_since(time).unwrap_or_default() >= IMAP_PROTOCOL_TIMEOUT {
|
||||
if SystemTime::now().duration_since(time).unwrap_or_default()
|
||||
>= IMAP_PROTOCOL_TIMEOUT
|
||||
{
|
||||
let err = MeliError::new("Connection timed out").set_kind(ErrorKind::Timeout);
|
||||
*status = Err(err.clone());
|
||||
self.stream = Err(err);
|
||||
|
|
|
@ -23,201 +23,129 @@ use crate::backends::SpecialUsageMailbox;
|
|||
use std::sync::Arc;
|
||||
|
||||
/// Arguments for IMAP watching functions
|
||||
pub struct ImapWatchKit {
|
||||
pub conn: ImapConnection,
|
||||
#[derive(Debug)]
|
||||
pub struct ImapWatcher {
|
||||
pub main_conn: Arc<FutureMutex<ImapConnection>>,
|
||||
pub uid_store: Arc<UIDStore>,
|
||||
pub mailbox_hashes: BTreeSet<MailboxHash>,
|
||||
pub polling_period: std::time::Duration,
|
||||
pub server_conf: ImapServerConf,
|
||||
}
|
||||
|
||||
pub async fn poll_with_examine(kit: ImapWatchKit) -> Result<()> {
|
||||
debug!("poll with examine");
|
||||
let ImapWatchKit {
|
||||
mut conn,
|
||||
main_conn: _,
|
||||
uid_store,
|
||||
} = kit;
|
||||
conn.connect().await?;
|
||||
let mailboxes: HashMap<MailboxHash, ImapMailbox> = {
|
||||
let mailboxes_lck = timeout(uid_store.timeout, uid_store.mailboxes.lock()).await?;
|
||||
mailboxes_lck.clone()
|
||||
};
|
||||
loop {
|
||||
for (_, mailbox) in mailboxes.clone() {
|
||||
examine_updates(mailbox, &mut conn, &uid_store).await?;
|
||||
}
|
||||
//FIXME: make sleep duration configurable
|
||||
smol::Timer::after(std::time::Duration::from_secs(3 * 60)).await;
|
||||
impl BackendWatcher for ImapWatcher {
|
||||
fn is_blocking(&self) -> bool {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn idle(kit: ImapWatchKit) -> Result<()> {
|
||||
debug!("IDLE");
|
||||
/* IDLE only watches the connection's selected mailbox. We will IDLE on INBOX and every ~5
|
||||
* minutes wake up and poll the others */
|
||||
let ImapWatchKit {
|
||||
mut conn,
|
||||
main_conn,
|
||||
uid_store,
|
||||
} = kit;
|
||||
conn.connect().await?;
|
||||
let mailbox: ImapMailbox = match uid_store
|
||||
.mailboxes
|
||||
.lock()
|
||||
.await
|
||||
.values()
|
||||
.find(|f| f.parent.is_none() && (f.special_usage() == SpecialUsageMailbox::Inbox))
|
||||
.map(std::clone::Clone::clone)
|
||||
{
|
||||
Some(mailbox) => mailbox,
|
||||
None => {
|
||||
return Err(MeliError::new("INBOX mailbox not found in local mailbox index. meli may have not parsed the IMAP mailboxes correctly"));
|
||||
fn register_mailbox(
|
||||
&mut self,
|
||||
mailbox_hash: MailboxHash,
|
||||
_urgency: MailboxWatchUrgency,
|
||||
) -> Result<()> {
|
||||
self.mailbox_hashes.insert(mailbox_hash);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_polling_period(&mut self, period: Option<std::time::Duration>) -> Result<()> {
|
||||
if let Some(period) = period {
|
||||
self.polling_period = period;
|
||||
}
|
||||
};
|
||||
let mailbox_hash = mailbox.hash();
|
||||
let mut response = Vec::with_capacity(8 * 1024);
|
||||
let select_response = conn
|
||||
.examine_mailbox(mailbox_hash, &mut response, true)
|
||||
.await?
|
||||
.unwrap();
|
||||
{
|
||||
let mut uidvalidities = uid_store.uidvalidity.lock().unwrap();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
if let Some(v) = uidvalidities.get(&mailbox_hash) {
|
||||
if *v != select_response.uidvalidity {
|
||||
if uid_store.keep_offline_cache {
|
||||
#[cfg(not(feature = "sqlite3"))]
|
||||
let mut cache_handle = super::cache::DefaultCache::get(uid_store.clone())?;
|
||||
#[cfg(feature = "sqlite3")]
|
||||
let mut cache_handle = super::cache::Sqlite3Cache::get(uid_store.clone())?;
|
||||
cache_handle.clear(mailbox_hash, &select_response)?;
|
||||
fn spawn(mut self: Box<Self>) -> ResultFuture<()> {
|
||||
Ok(Box::pin(async move {
|
||||
let has_idle: bool = match self.server_conf.protocol {
|
||||
ImapProtocol::IMAP {
|
||||
extension_use: ImapExtensionUse { idle, .. },
|
||||
} => {
|
||||
idle && self
|
||||
.uid_store
|
||||
.capabilities
|
||||
.lock()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|cap| cap.eq_ignore_ascii_case(b"IDLE"))
|
||||
}
|
||||
conn.add_refresh_event(RefreshEvent {
|
||||
account_hash: uid_store.account_hash,
|
||||
mailbox_hash,
|
||||
kind: RefreshEventKind::Rescan,
|
||||
_ => false,
|
||||
};
|
||||
while let Err(err) = if has_idle {
|
||||
self.idle().await
|
||||
} else {
|
||||
self.poll_with_examine().await
|
||||
} {
|
||||
let mut main_conn_lck =
|
||||
timeout(self.uid_store.timeout, self.main_conn.lock()).await?;
|
||||
if err.kind.is_network() {
|
||||
self.uid_store.is_online.lock().unwrap().1 = Err(err.clone());
|
||||
} else {
|
||||
return Err(err);
|
||||
}
|
||||
debug!("Watch failure: {}", err.to_string());
|
||||
match timeout(self.uid_store.timeout, main_conn_lck.connect())
|
||||
.await
|
||||
.and_then(|res| res)
|
||||
{
|
||||
Err(err2) => {
|
||||
debug!("Watch reconnect attempt failed: {}", err2.to_string());
|
||||
}
|
||||
Ok(()) => {
|
||||
debug!("Watch reconnect attempt succesful");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
let account_hash = self.uid_store.account_hash;
|
||||
main_conn_lck.add_refresh_event(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash: 0,
|
||||
kind: RefreshEventKind::Failure(err.clone()),
|
||||
});
|
||||
/*
|
||||
uid_store.uid_index.lock().unwrap().clear();
|
||||
uid_store.hash_index.lock().unwrap().clear();
|
||||
uid_store.byte_cache.lock().unwrap().clear();
|
||||
*/
|
||||
return Err(err);
|
||||
}
|
||||
} else {
|
||||
uidvalidities.insert(mailbox_hash, select_response.uidvalidity);
|
||||
}
|
||||
debug!("watch future returning");
|
||||
Ok(())
|
||||
}))
|
||||
}
|
||||
let mailboxes: HashMap<MailboxHash, ImapMailbox> = {
|
||||
let mailboxes_lck = timeout(uid_store.timeout, uid_store.mailboxes.lock()).await?;
|
||||
mailboxes_lck.clone()
|
||||
};
|
||||
for (h, mailbox) in mailboxes.clone() {
|
||||
if mailbox_hash == h {
|
||||
continue;
|
||||
}
|
||||
examine_updates(mailbox, &mut conn, &uid_store).await?;
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
conn.send_command(b"IDLE").await?;
|
||||
let mut blockn = ImapBlockingConnection::from(conn);
|
||||
let mut watch = std::time::Instant::now();
|
||||
/* duration interval to send heartbeat */
|
||||
const _10_MINS: std::time::Duration = std::time::Duration::from_secs(10 * 60);
|
||||
/* duration interval to check other mailboxes for changes */
|
||||
const _5_MINS: std::time::Duration = std::time::Duration::from_secs(5 * 60);
|
||||
loop {
|
||||
let line = match timeout(Some(_10_MINS), blockn.as_stream()).await {
|
||||
Ok(Some(line)) => line,
|
||||
Ok(None) => {
|
||||
debug!("IDLE connection dropped: {:?}", &blockn.err());
|
||||
blockn.conn.connect().await?;
|
||||
let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?;
|
||||
main_conn_lck.connect().await?;
|
||||
continue;
|
||||
}
|
||||
Err(_) => {
|
||||
/* Timeout */
|
||||
blockn.conn.send_raw(b"DONE").await?;
|
||||
blockn
|
||||
.conn
|
||||
.read_response(&mut response, RequiredResponses::empty())
|
||||
.await?;
|
||||
blockn.conn.send_command(b"IDLE").await?;
|
||||
let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?;
|
||||
main_conn_lck.connect().await?;
|
||||
continue;
|
||||
|
||||
fn as_any_mut(&mut self) -> &mut dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl ImapWatcher {
|
||||
pub async fn idle(&mut self) -> Result<()> {
|
||||
debug!("IDLE");
|
||||
/* IDLE only watches the connection's selected mailbox. We will IDLE on INBOX and every X
|
||||
* minutes wake up and poll the others */
|
||||
let ImapWatcher {
|
||||
ref main_conn,
|
||||
ref uid_store,
|
||||
ref mailbox_hashes,
|
||||
ref polling_period,
|
||||
ref server_conf,
|
||||
..
|
||||
} = self;
|
||||
let mut connection = ImapConnection::new_connection(server_conf, uid_store.clone());
|
||||
connection.connect().await?;
|
||||
let mailbox_hash: MailboxHash = match uid_store
|
||||
.mailboxes
|
||||
.lock()
|
||||
.await
|
||||
.values()
|
||||
.find(|f| f.parent.is_none() && (f.special_usage() == SpecialUsageMailbox::Inbox))
|
||||
.map(|f| f.hash)
|
||||
{
|
||||
Some(h) => h,
|
||||
None => {
|
||||
return Err(MeliError::new("INBOX mailbox not found in local mailbox index. meli may have not parsed the IMAP mailboxes correctly"));
|
||||
}
|
||||
};
|
||||
let now = std::time::Instant::now();
|
||||
if now.duration_since(watch) >= _5_MINS {
|
||||
/* Time to poll all inboxes */
|
||||
let mut conn = timeout(uid_store.timeout, main_conn.lock()).await?;
|
||||
for (_h, mailbox) in mailboxes.clone() {
|
||||
examine_updates(mailbox, &mut conn, &uid_store).await?;
|
||||
}
|
||||
watch = now;
|
||||
}
|
||||
if line
|
||||
.split_rn()
|
||||
.filter(|l| {
|
||||
!l.starts_with(b"+ ")
|
||||
&& !l.starts_with(b"* ok")
|
||||
&& !l.starts_with(b"* ok")
|
||||
&& !l.starts_with(b"* Ok")
|
||||
&& !l.starts_with(b"* OK")
|
||||
})
|
||||
.count()
|
||||
== 0
|
||||
{
|
||||
continue;
|
||||
}
|
||||
{
|
||||
blockn.conn.send_raw(b"DONE").await?;
|
||||
blockn
|
||||
.conn
|
||||
.read_response(&mut response, RequiredResponses::empty())
|
||||
.await?;
|
||||
for l in line.split_rn().chain(response.split_rn()) {
|
||||
debug!("process_untagged {:?}", &l);
|
||||
if l.starts_with(b"+ ")
|
||||
|| l.starts_with(b"* ok")
|
||||
|| l.starts_with(b"* ok")
|
||||
|| l.starts_with(b"* Ok")
|
||||
|| l.starts_with(b"* OK")
|
||||
{
|
||||
debug!("ignore continuation mark");
|
||||
continue;
|
||||
}
|
||||
blockn.conn.process_untagged(l).await?;
|
||||
}
|
||||
blockn.conn.send_command(b"IDLE").await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn examine_updates(
|
||||
mailbox: ImapMailbox,
|
||||
conn: &mut ImapConnection,
|
||||
uid_store: &Arc<UIDStore>,
|
||||
) -> Result<()> {
|
||||
if mailbox.no_select {
|
||||
return Ok(());
|
||||
}
|
||||
let mailbox_hash = mailbox.hash();
|
||||
debug!("examining mailbox {} {}", mailbox_hash, mailbox.path());
|
||||
if let Some(new_envelopes) = conn.resync(mailbox_hash).await? {
|
||||
for env in new_envelopes {
|
||||
conn.add_refresh_event(RefreshEvent {
|
||||
mailbox_hash,
|
||||
account_hash: uid_store.account_hash,
|
||||
kind: RefreshEventKind::Create(Box::new(env)),
|
||||
});
|
||||
}
|
||||
} else {
|
||||
#[cfg(not(feature = "sqlite3"))]
|
||||
let mut cache_handle = super::cache::DefaultCache::get(uid_store.clone())?;
|
||||
#[cfg(feature = "sqlite3")]
|
||||
let mut cache_handle = super::cache::Sqlite3Cache::get(uid_store.clone())?;
|
||||
let mut response = Vec::with_capacity(8 * 1024);
|
||||
let select_response = conn
|
||||
let select_response = connection
|
||||
.examine_mailbox(mailbox_hash, &mut response, true)
|
||||
.await?
|
||||
.unwrap();
|
||||
|
@ -227,9 +155,13 @@ pub async fn examine_updates(
|
|||
if let Some(v) = uidvalidities.get(&mailbox_hash) {
|
||||
if *v != select_response.uidvalidity {
|
||||
if uid_store.keep_offline_cache {
|
||||
#[cfg(not(feature = "sqlite3"))]
|
||||
let mut cache_handle = super::cache::DefaultCache::get(uid_store.clone())?;
|
||||
#[cfg(feature = "sqlite3")]
|
||||
let mut cache_handle = super::cache::Sqlite3Cache::get(uid_store.clone())?;
|
||||
cache_handle.clear(mailbox_hash, &select_response)?;
|
||||
}
|
||||
conn.add_refresh_event(RefreshEvent {
|
||||
connection.add_refresh_event(RefreshEvent {
|
||||
account_hash: uid_store.account_hash,
|
||||
mailbox_hash,
|
||||
kind: RefreshEventKind::Rescan,
|
||||
|
@ -239,215 +171,384 @@ pub async fn examine_updates(
|
|||
uid_store.hash_index.lock().unwrap().clear();
|
||||
uid_store.byte_cache.lock().unwrap().clear();
|
||||
*/
|
||||
return Ok(());
|
||||
}
|
||||
} else {
|
||||
uidvalidities.insert(mailbox_hash, select_response.uidvalidity);
|
||||
}
|
||||
}
|
||||
if mailbox.is_cold() {
|
||||
/* Mailbox hasn't been loaded yet */
|
||||
let has_list_status: bool = conn
|
||||
.uid_store
|
||||
.capabilities
|
||||
.lock()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|cap| cap.eq_ignore_ascii_case(b"LIST-STATUS"));
|
||||
if has_list_status {
|
||||
conn.send_command(
|
||||
format!(
|
||||
"LIST \"{}\" \"\" RETURN (STATUS (MESSAGES UNSEEN))",
|
||||
mailbox.imap_path()
|
||||
)
|
||||
.as_bytes(),
|
||||
)
|
||||
.await?;
|
||||
conn.read_response(
|
||||
&mut response,
|
||||
RequiredResponses::LIST_REQUIRED | RequiredResponses::STATUS,
|
||||
)
|
||||
.await?;
|
||||
debug!(
|
||||
"list return status out: {}",
|
||||
String::from_utf8_lossy(&response)
|
||||
);
|
||||
for l in response.split_rn() {
|
||||
if !l.starts_with(b"*") {
|
||||
continue;
|
||||
}
|
||||
if let Ok(status) = protocol_parser::status_response(&l).map(|(_, v)| v) {
|
||||
if Some(mailbox_hash) == status.mailbox {
|
||||
if let Some(total) = status.messages {
|
||||
if let Ok(mut exists_lck) = mailbox.exists.lock() {
|
||||
exists_lck.clear();
|
||||
exists_lck.set_not_yet_seen(total);
|
||||
}
|
||||
}
|
||||
if let Some(total) = status.unseen {
|
||||
if let Ok(mut unseen_lck) = mailbox.unseen.lock() {
|
||||
unseen_lck.clear();
|
||||
unseen_lck.set_not_yet_seen(total);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
conn.send_command(b"SEARCH UNSEEN").await?;
|
||||
conn.read_response(&mut response, RequiredResponses::SEARCH)
|
||||
.await?;
|
||||
let unseen_count = protocol_parser::search_results(&response)?.1.len();
|
||||
if let Ok(mut exists_lck) = mailbox.exists.lock() {
|
||||
exists_lck.clear();
|
||||
exists_lck.set_not_yet_seen(select_response.exists);
|
||||
}
|
||||
if let Ok(mut unseen_lck) = mailbox.unseen.lock() {
|
||||
unseen_lck.clear();
|
||||
unseen_lck.set_not_yet_seen(unseen_count);
|
||||
}
|
||||
}
|
||||
mailbox.set_warm(true);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if select_response.recent > 0 {
|
||||
/* UID SEARCH RECENT */
|
||||
conn.send_command(b"UID SEARCH RECENT").await?;
|
||||
conn.read_response(&mut response, RequiredResponses::SEARCH)
|
||||
.await?;
|
||||
let v = protocol_parser::search_results(response.as_slice()).map(|(_, v)| v)?;
|
||||
if v.is_empty() {
|
||||
debug!(
|
||||
"search response was empty: {}",
|
||||
String::from_utf8_lossy(&response)
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
let mut cmd = "UID FETCH ".to_string();
|
||||
if v.len() == 1 {
|
||||
cmd.push_str(&v[0].to_string());
|
||||
} else {
|
||||
cmd.push_str(&v[0].to_string());
|
||||
for n in v.into_iter().skip(1) {
|
||||
cmd.push(',');
|
||||
cmd.push_str(&n.to_string());
|
||||
}
|
||||
}
|
||||
cmd.push_str(
|
||||
" (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] BODYSTRUCTURE)",
|
||||
);
|
||||
conn.send_command(cmd.as_bytes()).await?;
|
||||
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
|
||||
.await?;
|
||||
} else if select_response.exists > mailbox.exists.lock().unwrap().len() {
|
||||
conn.send_command(
|
||||
format!(
|
||||
"FETCH {}:* (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] BODYSTRUCTURE)",
|
||||
std::cmp::max(mailbox.exists.lock().unwrap().len(), 1)
|
||||
)
|
||||
.as_bytes(),
|
||||
)
|
||||
.await?;
|
||||
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
|
||||
.await?;
|
||||
} else {
|
||||
return Ok(());
|
||||
}
|
||||
debug!(
|
||||
"fetch response is {} bytes and {} lines",
|
||||
response.len(),
|
||||
String::from_utf8_lossy(&response).lines().count()
|
||||
);
|
||||
let (_, mut v, _) = protocol_parser::fetch_responses(&response)?;
|
||||
debug!("responses len is {}", v.len());
|
||||
for FetchResponse {
|
||||
ref uid,
|
||||
ref mut envelope,
|
||||
ref mut flags,
|
||||
ref references,
|
||||
..
|
||||
} in v.iter_mut()
|
||||
{
|
||||
let uid = uid.unwrap();
|
||||
let env = envelope.as_mut().unwrap();
|
||||
env.set_hash(generate_envelope_hash(&mailbox.imap_path(), &uid));
|
||||
if let Some(value) = references {
|
||||
env.set_references(value);
|
||||
}
|
||||
let mut tag_lck = uid_store.collection.tag_index.write().unwrap();
|
||||
if let Some((flags, keywords)) = flags {
|
||||
env.set_flags(*flags);
|
||||
if !env.is_seen() {
|
||||
mailbox.unseen.lock().unwrap().insert_new(env.hash());
|
||||
}
|
||||
mailbox.exists.lock().unwrap().insert_new(env.hash());
|
||||
for f in keywords {
|
||||
let hash = tag_hash!(f);
|
||||
if !tag_lck.contains_key(&hash) {
|
||||
tag_lck.insert(hash, f.to_string());
|
||||
}
|
||||
env.labels_mut().push(hash);
|
||||
}
|
||||
}
|
||||
}
|
||||
if uid_store.keep_offline_cache {
|
||||
if !cache_handle.mailbox_state(mailbox_hash)?.is_none() {
|
||||
cache_handle
|
||||
.insert_envelopes(mailbox_hash, &v)
|
||||
.chain_err_summary(|| {
|
||||
format!(
|
||||
"Could not save envelopes in cache for mailbox {}",
|
||||
mailbox.imap_path()
|
||||
)
|
||||
})?;
|
||||
}
|
||||
}
|
||||
|
||||
for FetchResponse { uid, envelope, .. } in v {
|
||||
if uid.is_none() || envelope.is_none() {
|
||||
let mailboxes: HashMap<MailboxHash, ImapMailbox> = {
|
||||
let mailboxes_lck = timeout(uid_store.timeout, uid_store.mailboxes.lock()).await?;
|
||||
let mut ret = mailboxes_lck.clone();
|
||||
ret.retain(|k, _| mailbox_hashes.contains(k));
|
||||
ret
|
||||
};
|
||||
for (h, mailbox) in mailboxes.iter() {
|
||||
if mailbox_hash == *h {
|
||||
continue;
|
||||
}
|
||||
let uid = uid.unwrap();
|
||||
if uid_store
|
||||
.uid_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.contains_key(&(mailbox_hash, uid))
|
||||
Self::examine_updates(mailbox, &mut connection, &uid_store).await?;
|
||||
}
|
||||
connection.send_command(b"IDLE").await?;
|
||||
let mut blockn = ImapBlockingConnection::from(connection);
|
||||
let mut watch = std::time::Instant::now();
|
||||
/* duration interval to send heartbeat */
|
||||
const _10_MINS: std::time::Duration = std::time::Duration::from_secs(10 * 60);
|
||||
/* duration interval to check other mailboxes for changes */
|
||||
loop {
|
||||
let line = match timeout(
|
||||
Some(std::cmp::min(*polling_period, _10_MINS)),
|
||||
blockn.as_stream(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Some(line)) => line,
|
||||
Ok(None) => {
|
||||
debug!("IDLE connection dropped: {:?}", &blockn.err());
|
||||
blockn.conn.connect().await?;
|
||||
let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?;
|
||||
main_conn_lck.connect().await?;
|
||||
continue;
|
||||
}
|
||||
Err(_) => {
|
||||
/* Timeout */
|
||||
blockn.conn.send_raw(b"DONE").await?;
|
||||
blockn
|
||||
.conn
|
||||
.read_response(&mut response, RequiredResponses::empty())
|
||||
.await?;
|
||||
blockn.conn.send_command(b"IDLE").await?;
|
||||
let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?;
|
||||
main_conn_lck.connect().await?;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let now = std::time::Instant::now();
|
||||
if now.duration_since(watch) >= *polling_period {
|
||||
/* Time to poll all inboxes */
|
||||
let mut conn = timeout(uid_store.timeout, main_conn.lock()).await?;
|
||||
for (_h, mailbox) in mailboxes.iter() {
|
||||
Self::examine_updates(mailbox, &mut conn, &uid_store).await?;
|
||||
}
|
||||
watch = now;
|
||||
}
|
||||
if line
|
||||
.split_rn()
|
||||
.filter(|l| {
|
||||
!l.starts_with(b"+ ")
|
||||
&& !l.starts_with(b"* ok")
|
||||
&& !l.starts_with(b"* ok")
|
||||
&& !l.starts_with(b"* Ok")
|
||||
&& !l.starts_with(b"* OK")
|
||||
})
|
||||
.count()
|
||||
== 0
|
||||
{
|
||||
continue;
|
||||
}
|
||||
let env = envelope.unwrap();
|
||||
debug!(
|
||||
"Create event {} {} {}",
|
||||
env.hash(),
|
||||
env.subject(),
|
||||
mailbox.path(),
|
||||
);
|
||||
uid_store
|
||||
.msn_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.entry(mailbox_hash)
|
||||
.or_default()
|
||||
.push(uid);
|
||||
uid_store
|
||||
.hash_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(env.hash(), (uid, mailbox_hash));
|
||||
uid_store
|
||||
.uid_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert((mailbox_hash, uid), env.hash());
|
||||
conn.add_refresh_event(RefreshEvent {
|
||||
account_hash: uid_store.account_hash,
|
||||
mailbox_hash,
|
||||
kind: Create(Box::new(env)),
|
||||
});
|
||||
{
|
||||
blockn.conn.send_raw(b"DONE").await?;
|
||||
blockn
|
||||
.conn
|
||||
.read_response(&mut response, RequiredResponses::empty())
|
||||
.await?;
|
||||
for l in line.split_rn().chain(response.split_rn()) {
|
||||
debug!("process_untagged {:?}", String::from_utf8_lossy(&l));
|
||||
if l.starts_with(b"+ ")
|
||||
|| l.starts_with(b"* ok")
|
||||
|| l.starts_with(b"* ok")
|
||||
|| l.starts_with(b"* Ok")
|
||||
|| l.starts_with(b"* OK")
|
||||
{
|
||||
debug!("ignore continuation mark");
|
||||
continue;
|
||||
}
|
||||
blockn.conn.process_untagged(l).await?;
|
||||
}
|
||||
blockn.conn.send_command(b"IDLE").await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
pub async fn poll_with_examine(&mut self) -> Result<()> {
|
||||
debug!("poll with examine");
|
||||
let ImapWatcher {
|
||||
ref mailbox_hashes,
|
||||
ref uid_store,
|
||||
ref polling_period,
|
||||
ref server_conf,
|
||||
..
|
||||
} = self;
|
||||
let mut connection = ImapConnection::new_connection(server_conf, uid_store.clone());
|
||||
connection.connect().await?;
|
||||
let mailboxes: HashMap<MailboxHash, ImapMailbox> = {
|
||||
let mailboxes_lck = timeout(uid_store.timeout, uid_store.mailboxes.lock()).await?;
|
||||
let mut ret = mailboxes_lck.clone();
|
||||
ret.retain(|k, _| mailbox_hashes.contains(k));
|
||||
ret
|
||||
};
|
||||
loop {
|
||||
for (_, mailbox) in mailboxes.iter() {
|
||||
Self::examine_updates(mailbox, &mut connection, &uid_store).await?;
|
||||
}
|
||||
crate::connections::sleep(*polling_period).await;
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn examine_updates(
|
||||
mailbox: &ImapMailbox,
|
||||
conn: &mut ImapConnection,
|
||||
uid_store: &Arc<UIDStore>,
|
||||
) -> Result<()> {
|
||||
if mailbox.no_select {
|
||||
return Ok(());
|
||||
}
|
||||
let mailbox_hash = mailbox.hash();
|
||||
debug!("examining mailbox {} {}", mailbox_hash, mailbox.path());
|
||||
if let Some(new_envelopes) = conn.resync(mailbox_hash).await? {
|
||||
for env in new_envelopes {
|
||||
conn.add_refresh_event(RefreshEvent {
|
||||
mailbox_hash,
|
||||
account_hash: uid_store.account_hash,
|
||||
kind: RefreshEventKind::Create(Box::new(env)),
|
||||
});
|
||||
}
|
||||
} else {
|
||||
#[cfg(not(feature = "sqlite3"))]
|
||||
let mut cache_handle = super::cache::DefaultCache::get(uid_store.clone())?;
|
||||
#[cfg(feature = "sqlite3")]
|
||||
let mut cache_handle = super::cache::Sqlite3Cache::get(uid_store.clone())?;
|
||||
let mut response = Vec::with_capacity(8 * 1024);
|
||||
let select_response = conn
|
||||
.examine_mailbox(mailbox_hash, &mut response, true)
|
||||
.await?
|
||||
.unwrap();
|
||||
{
|
||||
let mut uidvalidities = uid_store.uidvalidity.lock().unwrap();
|
||||
|
||||
if let Some(v) = uidvalidities.get(&mailbox_hash) {
|
||||
if *v != select_response.uidvalidity {
|
||||
if uid_store.keep_offline_cache {
|
||||
cache_handle.clear(mailbox_hash, &select_response)?;
|
||||
}
|
||||
conn.add_refresh_event(RefreshEvent {
|
||||
account_hash: uid_store.account_hash,
|
||||
mailbox_hash,
|
||||
kind: RefreshEventKind::Rescan,
|
||||
});
|
||||
/*
|
||||
uid_store.uid_index.lock().unwrap().clear();
|
||||
uid_store.hash_index.lock().unwrap().clear();
|
||||
uid_store.byte_cache.lock().unwrap().clear();
|
||||
*/
|
||||
return Ok(());
|
||||
}
|
||||
} else {
|
||||
uidvalidities.insert(mailbox_hash, select_response.uidvalidity);
|
||||
}
|
||||
}
|
||||
if mailbox.is_cold() {
|
||||
/* Mailbox hasn't been loaded yet */
|
||||
let has_list_status: bool = conn
|
||||
.uid_store
|
||||
.capabilities
|
||||
.lock()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.any(|cap| cap.eq_ignore_ascii_case(b"LIST-STATUS"));
|
||||
if has_list_status {
|
||||
conn.send_command(
|
||||
format!(
|
||||
"LIST \"{}\" \"\" RETURN (STATUS (MESSAGES UNSEEN))",
|
||||
mailbox.imap_path()
|
||||
)
|
||||
.as_bytes(),
|
||||
)
|
||||
.await?;
|
||||
conn.read_response(
|
||||
&mut response,
|
||||
RequiredResponses::LIST_REQUIRED | RequiredResponses::STATUS,
|
||||
)
|
||||
.await?;
|
||||
debug!(
|
||||
"list return status out: {}",
|
||||
String::from_utf8_lossy(&response)
|
||||
);
|
||||
for l in response.split_rn() {
|
||||
if !l.starts_with(b"*") {
|
||||
continue;
|
||||
}
|
||||
if let Ok(status) = protocol_parser::status_response(&l).map(|(_, v)| v) {
|
||||
if Some(mailbox_hash) == status.mailbox {
|
||||
if let Some(total) = status.messages {
|
||||
if let Ok(mut exists_lck) = mailbox.exists.lock() {
|
||||
exists_lck.clear();
|
||||
exists_lck.set_not_yet_seen(total);
|
||||
}
|
||||
}
|
||||
if let Some(total) = status.unseen {
|
||||
if let Ok(mut unseen_lck) = mailbox.unseen.lock() {
|
||||
unseen_lck.clear();
|
||||
unseen_lck.set_not_yet_seen(total);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
conn.send_command(b"SEARCH UNSEEN").await?;
|
||||
conn.read_response(&mut response, RequiredResponses::SEARCH)
|
||||
.await?;
|
||||
let unseen_count = protocol_parser::search_results(&response)?.1.len();
|
||||
if let Ok(mut exists_lck) = mailbox.exists.lock() {
|
||||
exists_lck.clear();
|
||||
exists_lck.set_not_yet_seen(select_response.exists);
|
||||
}
|
||||
if let Ok(mut unseen_lck) = mailbox.unseen.lock() {
|
||||
unseen_lck.clear();
|
||||
unseen_lck.set_not_yet_seen(unseen_count);
|
||||
}
|
||||
}
|
||||
mailbox.set_warm(true);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if select_response.recent > 0 {
|
||||
/* UID SEARCH RECENT */
|
||||
conn.send_command(b"UID SEARCH RECENT").await?;
|
||||
conn.read_response(&mut response, RequiredResponses::SEARCH)
|
||||
.await?;
|
||||
let v = protocol_parser::search_results(response.as_slice()).map(|(_, v)| v)?;
|
||||
if v.is_empty() {
|
||||
debug!(
|
||||
"search response was empty: {}",
|
||||
String::from_utf8_lossy(&response)
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
let mut cmd = "UID FETCH ".to_string();
|
||||
if v.len() == 1 {
|
||||
cmd.push_str(&v[0].to_string());
|
||||
} else {
|
||||
cmd.push_str(&v[0].to_string());
|
||||
for n in v.into_iter().skip(1) {
|
||||
cmd.push(',');
|
||||
cmd.push_str(&n.to_string());
|
||||
}
|
||||
}
|
||||
cmd.push_str(
|
||||
" (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] BODYSTRUCTURE)",
|
||||
);
|
||||
conn.send_command(cmd.as_bytes()).await?;
|
||||
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
|
||||
.await?;
|
||||
} else if select_response.exists > mailbox.exists.lock().unwrap().len() {
|
||||
conn.send_command(
|
||||
format!(
|
||||
"FETCH {}:* (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] BODYSTRUCTURE)",
|
||||
std::cmp::max(mailbox.exists.lock().unwrap().len(), 1)
|
||||
)
|
||||
.as_bytes(),
|
||||
)
|
||||
.await?;
|
||||
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
|
||||
.await?;
|
||||
} else {
|
||||
return Ok(());
|
||||
}
|
||||
debug!(
|
||||
"fetch response is {} bytes and {} lines",
|
||||
response.len(),
|
||||
String::from_utf8_lossy(&response).lines().count()
|
||||
);
|
||||
let (_, mut v, _) = protocol_parser::fetch_responses(&response)?;
|
||||
debug!("responses len is {}", v.len());
|
||||
for FetchResponse {
|
||||
ref uid,
|
||||
ref mut envelope,
|
||||
ref mut flags,
|
||||
ref references,
|
||||
..
|
||||
} in v.iter_mut()
|
||||
{
|
||||
let uid = uid.unwrap();
|
||||
let env = envelope.as_mut().unwrap();
|
||||
env.set_hash(generate_envelope_hash(&mailbox.imap_path(), &uid));
|
||||
if let Some(value) = references {
|
||||
env.set_references(value);
|
||||
}
|
||||
let mut tag_lck = uid_store.collection.tag_index.write().unwrap();
|
||||
if let Some((flags, keywords)) = flags {
|
||||
env.set_flags(*flags);
|
||||
if !env.is_seen() {
|
||||
mailbox.unseen.lock().unwrap().insert_new(env.hash());
|
||||
}
|
||||
mailbox.exists.lock().unwrap().insert_new(env.hash());
|
||||
for f in keywords {
|
||||
let hash = tag_hash!(f);
|
||||
if !tag_lck.contains_key(&hash) {
|
||||
tag_lck.insert(hash, f.to_string());
|
||||
}
|
||||
env.labels_mut().push(hash);
|
||||
}
|
||||
}
|
||||
}
|
||||
if uid_store.keep_offline_cache {
|
||||
if !cache_handle.mailbox_state(mailbox_hash)?.is_none() {
|
||||
cache_handle
|
||||
.insert_envelopes(mailbox_hash, &v)
|
||||
.chain_err_summary(|| {
|
||||
format!(
|
||||
"Could not save envelopes in cache for mailbox {}",
|
||||
mailbox.imap_path()
|
||||
)
|
||||
})?;
|
||||
}
|
||||
}
|
||||
|
||||
for FetchResponse { uid, envelope, .. } in v {
|
||||
if uid.is_none() || envelope.is_none() {
|
||||
continue;
|
||||
}
|
||||
let uid = uid.unwrap();
|
||||
if uid_store
|
||||
.uid_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.contains_key(&(mailbox_hash, uid))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
let env = envelope.unwrap();
|
||||
debug!(
|
||||
"Create event {} {} {}",
|
||||
env.hash(),
|
||||
env.subject(),
|
||||
mailbox.path(),
|
||||
);
|
||||
uid_store
|
||||
.msn_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.entry(mailbox_hash)
|
||||
.or_default()
|
||||
.push(uid);
|
||||
uid_store
|
||||
.hash_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(env.hash(), (uid, mailbox_hash));
|
||||
uid_store
|
||||
.uid_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert((mailbox_hash, uid), env.hash());
|
||||
conn.add_refresh_event(RefreshEvent {
|
||||
account_hash: uid_store.account_hash,
|
||||
mailbox_hash,
|
||||
kind: Create(Box::new(env)),
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -90,6 +90,8 @@ use objects::*;
|
|||
pub mod mailbox;
|
||||
use mailbox::*;
|
||||
|
||||
pub mod watch;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct EnvelopeCache {
|
||||
bytes: Option<String>,
|
||||
|
@ -313,6 +315,147 @@ impl MailBackend for JmapType {
|
|||
}))
|
||||
}
|
||||
|
||||
fn load(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()> {
|
||||
let store = self.store.clone();
|
||||
let connection = self.connection.clone();
|
||||
Ok(Box::pin(async move {
|
||||
{
|
||||
crate::connections::sleep(std::time::Duration::from_secs(2)).await;
|
||||
}
|
||||
let mailbox_id = store.mailboxes.read().unwrap()[&mailbox_hash].id.clone();
|
||||
let email_query_call: EmailQuery = EmailQuery::new(
|
||||
Query::new()
|
||||
.account_id(conn.mail_account_id().clone())
|
||||
.filter(Some(Filter::Condition(
|
||||
EmailFilterCondition::new()
|
||||
.in_mailbox(Some(mailbox_id))
|
||||
.into(),
|
||||
)))
|
||||
.position(0)
|
||||
.properties(Some(vec![
|
||||
"id".to_string(),
|
||||
"receivedAt".to_string(),
|
||||
"messageId".to_string(),
|
||||
"inReplyTo".to_string(),
|
||||
"hasAttachment".to_string(),
|
||||
"keywords".to_string(),
|
||||
])),
|
||||
)
|
||||
.collapse_threads(false);
|
||||
|
||||
let mut req = Request::new(conn.request_no.clone());
|
||||
let prev_seq = req.add_call(&email_query_call);
|
||||
|
||||
let email_call: EmailGet = EmailGet::new(
|
||||
Get::new()
|
||||
.ids(Some(JmapArgument::reference(
|
||||
prev_seq,
|
||||
EmailQuery::RESULT_FIELD_IDS,
|
||||
)))
|
||||
.account_id(conn.mail_account_id().clone()),
|
||||
);
|
||||
|
||||
req.add_call(&email_call);
|
||||
|
||||
let api_url = conn.session.lock().unwrap().api_url.clone();
|
||||
let mut res = conn
|
||||
.client
|
||||
.post_async(api_url.as_str(), serde_json::to_string(&req)?)
|
||||
.await?;
|
||||
|
||||
let res_text = res.text_async().await?;
|
||||
|
||||
let mut v: MethodResponse = serde_json::from_str(&res_text).unwrap();
|
||||
let e = GetResponse::<EmailObject>::try_from(v.method_responses.pop().unwrap())?;
|
||||
let query_response =
|
||||
QueryResponse::<EmailObject>::try_from(v.method_responses.pop().unwrap())?;
|
||||
store
|
||||
.mailboxes
|
||||
.write()
|
||||
.unwrap()
|
||||
.entry(mailbox_hash)
|
||||
.and_modify(|mbox| {
|
||||
*mbox.email_query_state.lock().unwrap() = Some(query_response.query_state);
|
||||
});
|
||||
let GetResponse::<EmailObject> { list, state, .. } = e;
|
||||
{
|
||||
let (is_empty, is_equal) = {
|
||||
let mailboxes_lck = conn.store.mailboxes.read().unwrap();
|
||||
mailboxes_lck
|
||||
.get(&mailbox_hash)
|
||||
.map(|mbox| {
|
||||
let current_state_lck = mbox.email_state.lock().unwrap();
|
||||
(
|
||||
current_state_lck.is_none(),
|
||||
current_state_lck.as_ref() != Some(&state),
|
||||
)
|
||||
})
|
||||
.unwrap_or((true, true))
|
||||
};
|
||||
if is_empty {
|
||||
let mut mailboxes_lck = conn.store.mailboxes.write().unwrap();
|
||||
debug!("{:?}: inserting state {}", EmailObject::NAME, &state);
|
||||
mailboxes_lck.entry(mailbox_hash).and_modify(|mbox| {
|
||||
*mbox.email_state.lock().unwrap() = Some(state);
|
||||
});
|
||||
} else if !is_equal {
|
||||
conn.email_changes(mailbox_hash).await?;
|
||||
}
|
||||
}
|
||||
let mut total = BTreeSet::default();
|
||||
let mut unread = BTreeSet::default();
|
||||
let new_envelopes: HashMap<EnvelopeHash, Envelope> = list
|
||||
.into_iter(|obj| {
|
||||
let env = store.add_envelope(obj);
|
||||
total.insert(env.hash());
|
||||
if !env.is_seen() {
|
||||
unread.insert(env.hash());
|
||||
}
|
||||
(env.hash(), env)
|
||||
})
|
||||
.collect();
|
||||
let mut mailboxes_lck = store.mailboxes.write().unwrap();
|
||||
mailboxes_lck.entry(mailbox_hash).and_modify(|mbox| {
|
||||
mbox.total_emails.lock().unwrap().insert_existing_set(total);
|
||||
mbox.unread_emails
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert_existing_set(unread);
|
||||
});
|
||||
let keys: BTreeSet<EnvelopeHash> = new_envelopes.keys().cloned().collect();
|
||||
collection.merge(new_envelopes, mailbox_hash, None);
|
||||
let mut envelopes_lck = collection.envelopes.write().unwrap();
|
||||
envelopes_lck.retain(|k, _| !keys.contains(k));
|
||||
|
||||
Ok(())
|
||||
}))
|
||||
}
|
||||
|
||||
fn fetch_batch(&mut self, env_hashes: EnvelopeHashBatch) -> ResultFuture<()> {
|
||||
todo!()
|
||||
/*
|
||||
let store = self.store.clone();
|
||||
let connection = self.connection.clone();
|
||||
Ok(Box::pin(async move {
|
||||
//crate::connections::sleep(std::time::Duration::from_secs(2)).await;
|
||||
debug!("fetch_batch {:?}", &env_hashes);
|
||||
let mut envelopes_lck = collection.envelopes.write().unwrap();
|
||||
for env_hash in env_hashes.iter() {
|
||||
if envelopes_lck.contains_key(&env_hash) {
|
||||
continue;
|
||||
}
|
||||
let index_lck = index.write().unwrap();
|
||||
let message = Message::find_message(&database, &index_lck[&env_hash])?;
|
||||
drop(index_lck);
|
||||
let env = message.into_envelope(&index, &collection.tag_index);
|
||||
envelopes_lck.insert(env_hash, env);
|
||||
}
|
||||
debug!("fetch_batch {:?} done", &env_hashes);
|
||||
Ok(())
|
||||
}))
|
||||
*/
|
||||
}
|
||||
|
||||
fn fetch(
|
||||
&mut self,
|
||||
mailbox_hash: MailboxHash,
|
||||
|
@ -341,32 +484,12 @@ impl MailBackend for JmapType {
|
|||
}))
|
||||
}
|
||||
|
||||
fn watch(&self) -> ResultFuture<()> {
|
||||
fn watcher(&self) -> Result<Box<dyn BackendWatcher>> {
|
||||
let connection = self.connection.clone();
|
||||
let store = self.store.clone();
|
||||
Ok(Box::pin(async move {
|
||||
{
|
||||
let mut conn = connection.lock().await;
|
||||
conn.connect().await?;
|
||||
}
|
||||
loop {
|
||||
{
|
||||
let mailbox_hashes = {
|
||||
store
|
||||
.mailboxes
|
||||
.read()
|
||||
.unwrap()
|
||||
.keys()
|
||||
.cloned()
|
||||
.collect::<SmallVec<[MailboxHash; 16]>>()
|
||||
};
|
||||
let conn = connection.lock().await;
|
||||
for mailbox_hash in mailbox_hashes {
|
||||
conn.email_changes(mailbox_hash).await?;
|
||||
}
|
||||
}
|
||||
crate::connections::sleep(std::time::Duration::from_secs(60)).await;
|
||||
}
|
||||
Ok(Box::new(watch::JmapWatcher {
|
||||
connection,
|
||||
mailbox_hashes: BTreeSet::default(),
|
||||
polling_period: std::time::Duration::from_secs(60),
|
||||
}))
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,83 @@
|
|||
/*
|
||||
* melib - JMAP
|
||||
*
|
||||
* Copyright 2020 Manos Pitsidianakis
|
||||
*
|
||||
* This file is part of meli.
|
||||
*
|
||||
* meli is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* meli is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with meli. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
use super::*;
|
||||
use std::collections::BTreeSet;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct JmapWatcher {
|
||||
pub mailbox_hashes: BTreeSet<MailboxHash>,
|
||||
pub polling_period: std::time::Duration,
|
||||
pub connection: Arc<FutureMutex<JmapConnection>>,
|
||||
}
|
||||
|
||||
impl BackendWatcher for JmapWatcher {
|
||||
fn is_blocking(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
fn register_mailbox(
|
||||
&mut self,
|
||||
mailbox_hash: MailboxHash,
|
||||
_urgency: MailboxWatchUrgency,
|
||||
) -> Result<()> {
|
||||
self.mailbox_hashes.insert(mailbox_hash);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_polling_period(&mut self, period: Option<std::time::Duration>) -> Result<()> {
|
||||
if let Some(period) = period {
|
||||
self.polling_period = period;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn spawn(self: Box<Self>) -> ResultFuture<()> {
|
||||
let JmapWatcher {
|
||||
mailbox_hashes,
|
||||
polling_period,
|
||||
connection,
|
||||
} = *self;
|
||||
Ok(Box::pin(async move {
|
||||
{
|
||||
let mut conn = connection.lock().await;
|
||||
conn.connect().await?;
|
||||
}
|
||||
loop {
|
||||
{
|
||||
let conn = connection.lock().await;
|
||||
for &mailbox_hash in &mailbox_hashes {
|
||||
conn.email_changes(mailbox_hash).await?;
|
||||
}
|
||||
}
|
||||
crate::connections::sleep(polling_period).await;
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn as_any_mut(&mut self) -> &mut dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
|
@ -26,6 +26,8 @@ pub use self::backend::*;
|
|||
mod stream;
|
||||
pub use stream::*;
|
||||
|
||||
pub mod watch;
|
||||
|
||||
use crate::backends::*;
|
||||
use crate::email::Flag;
|
||||
use crate::error::{MeliError, Result};
|
||||
|
|
|
@ -27,20 +27,13 @@ use crate::error::{ErrorKind, MeliError, Result};
|
|||
use crate::shellexpand::ShellExpandTrait;
|
||||
use crate::Collection;
|
||||
use futures::prelude::Stream;
|
||||
|
||||
extern crate notify;
|
||||
use self::notify::{watcher, DebouncedEvent, RecursiveMode, Watcher};
|
||||
use std::time::Duration;
|
||||
|
||||
use std::collections::{hash_map::DefaultHasher, HashMap, HashSet};
|
||||
use std::ffi::OsStr;
|
||||
use std::fs;
|
||||
use std::hash::{Hash, Hasher};
|
||||
use std::io::{self, Read, Write};
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
use std::path::{Component, Path, PathBuf};
|
||||
use std::sync::mpsc::channel;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
|
@ -339,494 +332,28 @@ impl MailBackend for MaildirType {
|
|||
}))
|
||||
}
|
||||
|
||||
fn watch(&self) -> ResultFuture<()> {
|
||||
let sender = self.event_consumer.clone();
|
||||
let (tx, rx) = channel();
|
||||
let mut watcher = watcher(tx, Duration::from_secs(2)).unwrap();
|
||||
fn watcher(&self) -> Result<Box<dyn BackendWatcher>> {
|
||||
let account_hash = {
|
||||
let mut hasher = DefaultHasher::default();
|
||||
hasher.write(self.name.as_bytes());
|
||||
hasher.finish()
|
||||
};
|
||||
let root_path = self.path.to_path_buf();
|
||||
watcher.watch(&root_path, RecursiveMode::Recursive).unwrap();
|
||||
let cache_dir = xdg::BaseDirectories::with_profile("meli", &self.name).unwrap();
|
||||
debug!("watching {:?}", root_path);
|
||||
let event_consumer = self.event_consumer.clone();
|
||||
let hash_indexes = self.hash_indexes.clone();
|
||||
let mailbox_index = self.mailbox_index.clone();
|
||||
let root_mailbox_hash: MailboxHash = self
|
||||
.mailboxes
|
||||
.values()
|
||||
.find(|m| m.parent.is_none())
|
||||
.map(|m| m.hash())
|
||||
.unwrap();
|
||||
let mailbox_counts = self
|
||||
.mailboxes
|
||||
.iter()
|
||||
.map(|(&k, v)| (k, (v.unseen.clone(), v.total.clone())))
|
||||
.collect::<HashMap<MailboxHash, (Arc<Mutex<usize>>, Arc<Mutex<usize>>)>>();
|
||||
Ok(Box::pin(async move {
|
||||
// Move `watcher` in the closure's scope so that it doesn't get dropped.
|
||||
let _watcher = watcher;
|
||||
let mut buf = Vec::with_capacity(4096);
|
||||
loop {
|
||||
match rx.recv() {
|
||||
/*
|
||||
* Event types:
|
||||
*
|
||||
* pub enum RefreshEventKind {
|
||||
* Update(EnvelopeHash, Envelope), // Old hash, new envelope
|
||||
* Create(Envelope),
|
||||
* Remove(EnvelopeHash),
|
||||
* Rescan,
|
||||
* }
|
||||
*/
|
||||
Ok(event) => match event {
|
||||
/* Create */
|
||||
DebouncedEvent::Create(mut pathbuf) => {
|
||||
debug!("DebouncedEvent::Create(path = {:?}", pathbuf);
|
||||
if path_is_new!(pathbuf) {
|
||||
debug!("path_is_new");
|
||||
/* This creates a Rename event that we will receive later */
|
||||
pathbuf = match move_to_cur(pathbuf) {
|
||||
Ok(p) => p,
|
||||
Err(e) => {
|
||||
debug!("error: {}", e.to_string());
|
||||
continue;
|
||||
}
|
||||
};
|
||||
}
|
||||
let mailbox_hash = get_path_hash!(pathbuf);
|
||||
let file_name = pathbuf
|
||||
.as_path()
|
||||
.strip_prefix(&root_path)
|
||||
.unwrap()
|
||||
.to_path_buf();
|
||||
if let Ok(env) = add_path_to_index(
|
||||
&hash_indexes,
|
||||
mailbox_hash,
|
||||
pathbuf.as_path(),
|
||||
&cache_dir,
|
||||
file_name,
|
||||
&mut buf,
|
||||
) {
|
||||
mailbox_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(env.hash(), mailbox_hash);
|
||||
debug!(
|
||||
"Create event {} {} {}",
|
||||
env.hash(),
|
||||
env.subject(),
|
||||
pathbuf.display()
|
||||
);
|
||||
if !env.is_seen() {
|
||||
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
|
||||
}
|
||||
*mailbox_counts[&mailbox_hash].1.lock().unwrap() += 1;
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: Create(Box::new(env)),
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
/* Update */
|
||||
DebouncedEvent::NoticeWrite(pathbuf) | DebouncedEvent::Write(pathbuf) => {
|
||||
debug!("DebouncedEvent::Write(path = {:?}", &pathbuf);
|
||||
let mailbox_hash = get_path_hash!(pathbuf);
|
||||
let mut hash_indexes_lock = hash_indexes.lock().unwrap();
|
||||
let index_lock =
|
||||
&mut hash_indexes_lock.entry(mailbox_hash).or_default();
|
||||
let file_name = pathbuf
|
||||
.as_path()
|
||||
.strip_prefix(&root_path)
|
||||
.unwrap()
|
||||
.to_path_buf();
|
||||
/* Linear search in hash_index to find old hash */
|
||||
let old_hash: EnvelopeHash = {
|
||||
if let Some((k, v)) =
|
||||
index_lock.iter_mut().find(|(_, v)| *v.buf == pathbuf)
|
||||
{
|
||||
*v = pathbuf.clone().into();
|
||||
*k
|
||||
} else {
|
||||
drop(hash_indexes_lock);
|
||||
/* Did we just miss a Create event? In any case, create
|
||||
* envelope. */
|
||||
if let Ok(env) = add_path_to_index(
|
||||
&hash_indexes,
|
||||
mailbox_hash,
|
||||
pathbuf.as_path(),
|
||||
&cache_dir,
|
||||
file_name,
|
||||
&mut buf,
|
||||
) {
|
||||
mailbox_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(env.hash(), mailbox_hash);
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: Create(Box::new(env)),
|
||||
}),
|
||||
);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let new_hash: EnvelopeHash = get_file_hash(pathbuf.as_path());
|
||||
let mut reader = io::BufReader::new(fs::File::open(&pathbuf)?);
|
||||
buf.clear();
|
||||
reader.read_to_end(&mut buf)?;
|
||||
if index_lock.get_mut(&new_hash).is_none() {
|
||||
debug!("write notice");
|
||||
if let Ok(mut env) =
|
||||
Envelope::from_bytes(buf.as_slice(), Some(pathbuf.flags()))
|
||||
{
|
||||
env.set_hash(new_hash);
|
||||
debug!("{}\t{:?}", new_hash, &pathbuf);
|
||||
debug!(
|
||||
"hash {}, path: {:?} couldn't be parsed",
|
||||
new_hash, &pathbuf
|
||||
);
|
||||
index_lock.insert(new_hash, pathbuf.into());
|
||||
|
||||
/* Send Write notice */
|
||||
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: Update(old_hash, Box::new(env)),
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
/* Remove */
|
||||
DebouncedEvent::NoticeRemove(pathbuf) | DebouncedEvent::Remove(pathbuf) => {
|
||||
debug!("DebouncedEvent::Remove(path = {:?}", pathbuf);
|
||||
let mailbox_hash = get_path_hash!(pathbuf);
|
||||
let mut hash_indexes_lock = hash_indexes.lock().unwrap();
|
||||
let index_lock = hash_indexes_lock.entry(mailbox_hash).or_default();
|
||||
let hash: EnvelopeHash = if let Some((k, _)) =
|
||||
index_lock.iter().find(|(_, v)| *v.buf == pathbuf)
|
||||
{
|
||||
*k
|
||||
} else {
|
||||
debug!("removed but not contained in index");
|
||||
continue;
|
||||
};
|
||||
if let Some(ref modif) = &index_lock[&hash].modified {
|
||||
match modif {
|
||||
PathMod::Path(path) => debug!(
|
||||
"envelope {} has modified path set {}",
|
||||
hash,
|
||||
path.display()
|
||||
),
|
||||
PathMod::Hash(hash) => debug!(
|
||||
"envelope {} has modified path set {}",
|
||||
hash,
|
||||
&index_lock[&hash].buf.display()
|
||||
),
|
||||
}
|
||||
index_lock.entry(hash).and_modify(|e| {
|
||||
e.removed = false;
|
||||
});
|
||||
continue;
|
||||
}
|
||||
{
|
||||
let mut lck = mailbox_counts[&mailbox_hash].1.lock().unwrap();
|
||||
*lck = lck.saturating_sub(1);
|
||||
}
|
||||
if !pathbuf.flags().contains(Flag::SEEN) {
|
||||
let mut lck = mailbox_counts[&mailbox_hash].0.lock().unwrap();
|
||||
*lck = lck.saturating_sub(1);
|
||||
}
|
||||
|
||||
index_lock.entry(hash).and_modify(|e| {
|
||||
e.removed = true;
|
||||
});
|
||||
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: Remove(hash),
|
||||
}),
|
||||
);
|
||||
}
|
||||
/* Envelope hasn't changed */
|
||||
DebouncedEvent::Rename(src, dest) => {
|
||||
debug!("DebouncedEvent::Rename(src = {:?}, dest = {:?})", src, dest);
|
||||
let mailbox_hash = get_path_hash!(src);
|
||||
let dest_mailbox = {
|
||||
let dest_mailbox = get_path_hash!(dest);
|
||||
if dest_mailbox == mailbox_hash {
|
||||
None
|
||||
} else {
|
||||
Some(dest_mailbox)
|
||||
}
|
||||
};
|
||||
let old_hash: EnvelopeHash = get_file_hash(src.as_path());
|
||||
let new_hash: EnvelopeHash = get_file_hash(dest.as_path());
|
||||
|
||||
let mut hash_indexes_lock = hash_indexes.lock().unwrap();
|
||||
let index_lock = hash_indexes_lock.entry(mailbox_hash).or_default();
|
||||
let old_flags = src.flags();
|
||||
let new_flags = dest.flags();
|
||||
let was_seen: bool = old_flags.contains(Flag::SEEN);
|
||||
let is_seen: bool = new_flags.contains(Flag::SEEN);
|
||||
|
||||
if index_lock.contains_key(&old_hash) && !index_lock[&old_hash].removed
|
||||
{
|
||||
debug!("contains_old_key");
|
||||
if let Some(dest_mailbox) = dest_mailbox {
|
||||
index_lock.entry(old_hash).and_modify(|e| {
|
||||
e.removed = true;
|
||||
});
|
||||
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: Remove(old_hash),
|
||||
}),
|
||||
);
|
||||
let file_name = dest
|
||||
.as_path()
|
||||
.strip_prefix(&root_path)
|
||||
.unwrap()
|
||||
.to_path_buf();
|
||||
drop(hash_indexes_lock);
|
||||
if let Ok(env) = add_path_to_index(
|
||||
&hash_indexes,
|
||||
dest_mailbox,
|
||||
dest.as_path(),
|
||||
&cache_dir,
|
||||
file_name,
|
||||
&mut buf,
|
||||
) {
|
||||
mailbox_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(env.hash(), dest_mailbox);
|
||||
debug!(
|
||||
"Create event {} {} {}",
|
||||
env.hash(),
|
||||
env.subject(),
|
||||
dest.display()
|
||||
);
|
||||
if !env.is_seen() {
|
||||
*mailbox_counts[&dest_mailbox].0.lock().unwrap() += 1;
|
||||
}
|
||||
*mailbox_counts[&dest_mailbox].1.lock().unwrap() += 1;
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash: dest_mailbox,
|
||||
kind: Create(Box::new(env)),
|
||||
}),
|
||||
);
|
||||
}
|
||||
} else {
|
||||
index_lock.entry(old_hash).and_modify(|e| {
|
||||
debug!(&e.modified);
|
||||
e.modified = Some(PathMod::Hash(new_hash));
|
||||
});
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: Rename(old_hash, new_hash),
|
||||
}),
|
||||
);
|
||||
if !was_seen && is_seen {
|
||||
let mut lck =
|
||||
mailbox_counts[&mailbox_hash].0.lock().unwrap();
|
||||
*lck = lck.saturating_sub(1);
|
||||
} else if was_seen && !is_seen {
|
||||
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
|
||||
}
|
||||
if old_flags != new_flags {
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: NewFlags(new_hash, (new_flags, vec![])),
|
||||
}),
|
||||
);
|
||||
}
|
||||
mailbox_index.lock().unwrap().insert(new_hash, mailbox_hash);
|
||||
index_lock.insert(new_hash, dest.into());
|
||||
}
|
||||
continue;
|
||||
} else if !index_lock.contains_key(&new_hash)
|
||||
&& index_lock
|
||||
.get(&old_hash)
|
||||
.map(|e| e.removed)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
if index_lock
|
||||
.get(&old_hash)
|
||||
.map(|e| e.removed)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
index_lock.entry(old_hash).and_modify(|e| {
|
||||
e.modified = Some(PathMod::Hash(new_hash));
|
||||
e.removed = false;
|
||||
});
|
||||
debug!("contains_old_key, key was marked as removed (by external source)");
|
||||
} else {
|
||||
debug!("not contains_new_key");
|
||||
}
|
||||
let file_name = dest
|
||||
.as_path()
|
||||
.strip_prefix(&root_path)
|
||||
.unwrap()
|
||||
.to_path_buf();
|
||||
debug!("filename = {:?}", file_name);
|
||||
drop(hash_indexes_lock);
|
||||
if let Ok(env) = add_path_to_index(
|
||||
&hash_indexes,
|
||||
dest_mailbox.unwrap_or(mailbox_hash),
|
||||
dest.as_path(),
|
||||
&cache_dir,
|
||||
file_name,
|
||||
&mut buf,
|
||||
) {
|
||||
mailbox_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(env.hash(), dest_mailbox.unwrap_or(mailbox_hash));
|
||||
debug!(
|
||||
"Create event {} {} {}",
|
||||
env.hash(),
|
||||
env.subject(),
|
||||
dest.display()
|
||||
);
|
||||
if !env.is_seen() {
|
||||
*mailbox_counts[&dest_mailbox.unwrap_or(mailbox_hash)]
|
||||
.0
|
||||
.lock()
|
||||
.unwrap() += 1;
|
||||
}
|
||||
*mailbox_counts[&dest_mailbox.unwrap_or(mailbox_hash)]
|
||||
.1
|
||||
.lock()
|
||||
.unwrap() += 1;
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash: dest_mailbox.unwrap_or(mailbox_hash),
|
||||
kind: Create(Box::new(env)),
|
||||
}),
|
||||
);
|
||||
continue;
|
||||
} else {
|
||||
debug!("not valid email");
|
||||
}
|
||||
} else if let Some(dest_mailbox) = dest_mailbox {
|
||||
drop(hash_indexes_lock);
|
||||
let file_name = dest
|
||||
.as_path()
|
||||
.strip_prefix(&root_path)
|
||||
.unwrap()
|
||||
.to_path_buf();
|
||||
if let Ok(env) = add_path_to_index(
|
||||
&hash_indexes,
|
||||
dest_mailbox,
|
||||
dest.as_path(),
|
||||
&cache_dir,
|
||||
file_name,
|
||||
&mut buf,
|
||||
) {
|
||||
mailbox_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(env.hash(), dest_mailbox);
|
||||
debug!(
|
||||
"Create event {} {} {}",
|
||||
env.hash(),
|
||||
env.subject(),
|
||||
dest.display()
|
||||
);
|
||||
if !env.is_seen() {
|
||||
*mailbox_counts[&dest_mailbox].0.lock().unwrap() += 1;
|
||||
}
|
||||
*mailbox_counts[&dest_mailbox].1.lock().unwrap() += 1;
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash: dest_mailbox,
|
||||
kind: Create(Box::new(env)),
|
||||
}),
|
||||
);
|
||||
}
|
||||
} else {
|
||||
if was_seen && !is_seen {
|
||||
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
|
||||
}
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: Rename(old_hash, new_hash),
|
||||
}),
|
||||
);
|
||||
debug!("contains_new_key");
|
||||
if old_flags != new_flags {
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: NewFlags(new_hash, (new_flags, vec![])),
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/* Maybe a re-read should be triggered here just to be safe.
|
||||
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash: get_path_hash!(dest),
|
||||
kind: Rescan,
|
||||
}));
|
||||
*/
|
||||
}
|
||||
/* Trigger rescan of mailbox */
|
||||
DebouncedEvent::Rescan => {
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash: root_mailbox_hash,
|
||||
kind: Rescan,
|
||||
}),
|
||||
);
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
Err(e) => debug!("watch error: {:?}", e),
|
||||
}
|
||||
}
|
||||
let mailboxes = self.mailboxes.clone();
|
||||
let root_path = self.path.to_path_buf();
|
||||
Ok(Box::new(super::watch::MaildirWatcher {
|
||||
account_hash,
|
||||
cache_dir,
|
||||
event_consumer,
|
||||
hash_indexes,
|
||||
mailbox_hashes: BTreeSet::default(),
|
||||
mailbox_index,
|
||||
mailboxes,
|
||||
polling_period: std::time::Duration::from_secs(2),
|
||||
root_path,
|
||||
}))
|
||||
}
|
||||
|
||||
|
@ -1335,49 +862,3 @@ impl MaildirType {
|
|||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn add_path_to_index(
|
||||
hash_index: &HashIndexes,
|
||||
mailbox_hash: MailboxHash,
|
||||
path: &Path,
|
||||
cache_dir: &xdg::BaseDirectories,
|
||||
file_name: PathBuf,
|
||||
buf: &mut Vec<u8>,
|
||||
) -> Result<Envelope> {
|
||||
debug!("add_path_to_index path {:?} filename{:?}", path, file_name);
|
||||
let env_hash = get_file_hash(path);
|
||||
{
|
||||
let mut map = hash_index.lock().unwrap();
|
||||
let map = map.entry(mailbox_hash).or_default();
|
||||
map.insert(env_hash, path.to_path_buf().into());
|
||||
debug!(
|
||||
"inserted {} in {} map, len={}",
|
||||
env_hash,
|
||||
mailbox_hash,
|
||||
map.len()
|
||||
);
|
||||
}
|
||||
let mut reader = io::BufReader::new(fs::File::open(&path)?);
|
||||
buf.clear();
|
||||
reader.read_to_end(buf)?;
|
||||
let mut env = Envelope::from_bytes(buf.as_slice(), Some(path.flags()))?;
|
||||
env.set_hash(env_hash);
|
||||
debug!(
|
||||
"add_path_to_index gen {}\t{}",
|
||||
env_hash,
|
||||
file_name.display()
|
||||
);
|
||||
if let Ok(cached) = cache_dir.place_cache_file(file_name) {
|
||||
debug!("putting in cache");
|
||||
/* place result in cache directory */
|
||||
let f = fs::File::create(cached)?;
|
||||
let metadata = f.metadata()?;
|
||||
let mut permissions = metadata.permissions();
|
||||
|
||||
permissions.set_mode(0o600); // Read/write for owner only.
|
||||
f.set_permissions(permissions)?;
|
||||
let writer = io::BufWriter::new(f);
|
||||
bincode::Options::serialize_into(bincode::config::DefaultOptions::new(), writer, &env)?;
|
||||
}
|
||||
Ok(env)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,608 @@
|
|||
/*
|
||||
* meli - mailbox module.
|
||||
*
|
||||
* Copyright 2017 - 2021 Manos Pitsidianakis
|
||||
*
|
||||
* This file is part of meli.
|
||||
*
|
||||
* meli is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* meli is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with meli. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
use super::*;
|
||||
use crate::backends::{RefreshEventKind::*, *};
|
||||
use std::collections::BTreeSet;
|
||||
use std::ffi::OsStr;
|
||||
use std::io;
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
extern crate notify;
|
||||
use notify::{watcher, DebouncedEvent, RecursiveMode, Watcher};
|
||||
use std::path::{Component, Path, PathBuf};
|
||||
use std::sync::mpsc::channel;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MaildirWatcher {
|
||||
pub account_hash: AccountHash,
|
||||
pub cache_dir: xdg::BaseDirectories,
|
||||
pub event_consumer: BackendEventConsumer,
|
||||
pub hash_indexes: HashIndexes,
|
||||
pub mailbox_hashes: BTreeSet<MailboxHash>,
|
||||
pub mailbox_index: Arc<Mutex<HashMap<EnvelopeHash, MailboxHash>>>,
|
||||
pub mailboxes: HashMap<MailboxHash, MaildirMailbox>,
|
||||
pub polling_period: std::time::Duration,
|
||||
pub root_path: PathBuf,
|
||||
}
|
||||
|
||||
impl BackendWatcher for MaildirWatcher {
|
||||
fn is_blocking(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn register_mailbox(
|
||||
&mut self,
|
||||
mailbox_hash: MailboxHash,
|
||||
_urgency: MailboxWatchUrgency,
|
||||
) -> Result<()> {
|
||||
self.mailbox_hashes.insert(mailbox_hash);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_polling_period(&mut self, period: Option<std::time::Duration>) -> Result<()> {
|
||||
if let Some(period) = period {
|
||||
self.polling_period = period;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn spawn(self: Box<Self>) -> ResultFuture<()> {
|
||||
let MaildirWatcher {
|
||||
account_hash,
|
||||
cache_dir,
|
||||
event_consumer: sender,
|
||||
hash_indexes,
|
||||
mailbox_hashes: _,
|
||||
mailbox_index,
|
||||
mailboxes,
|
||||
polling_period,
|
||||
root_path,
|
||||
} = *self;
|
||||
Ok(Box::pin(async move {
|
||||
let (tx, rx) = channel();
|
||||
let mut watcher = watcher(tx, polling_period).unwrap();
|
||||
watcher.watch(&root_path, RecursiveMode::Recursive).unwrap();
|
||||
debug!("watching {:?}", root_path);
|
||||
let root_mailbox_hash: MailboxHash = mailboxes
|
||||
.values()
|
||||
.find(|m| m.parent.is_none())
|
||||
.map(|m| m.hash())
|
||||
.unwrap();
|
||||
let mailbox_counts = mailboxes
|
||||
.iter()
|
||||
.map(|(&k, v)| (k, (v.unseen.clone(), v.total.clone())))
|
||||
.collect::<HashMap<MailboxHash, (Arc<Mutex<usize>>, Arc<Mutex<usize>>)>>();
|
||||
let mut buf = Vec::with_capacity(4096);
|
||||
loop {
|
||||
match rx.recv() {
|
||||
/*
|
||||
* Event types:
|
||||
*
|
||||
* pub enum RefreshEventKind {
|
||||
* Update(EnvelopeHash, Envelope), // Old hash, new envelope
|
||||
* Create(Envelope),
|
||||
* Remove(EnvelopeHash),
|
||||
* Rescan,
|
||||
* }
|
||||
*/
|
||||
Ok(event) => match event {
|
||||
/* Create */
|
||||
DebouncedEvent::Create(mut pathbuf) => {
|
||||
debug!("DebouncedEvent::Create(path = {:?}", pathbuf);
|
||||
if path_is_new!(pathbuf) {
|
||||
debug!("path_is_new");
|
||||
/* This creates a Rename event that we will receive later */
|
||||
pathbuf = match move_to_cur(pathbuf) {
|
||||
Ok(p) => p,
|
||||
Err(e) => {
|
||||
debug!("error: {}", e.to_string());
|
||||
continue;
|
||||
}
|
||||
};
|
||||
}
|
||||
let mailbox_hash = get_path_hash!(pathbuf);
|
||||
let file_name = pathbuf
|
||||
.as_path()
|
||||
.strip_prefix(&root_path)
|
||||
.unwrap()
|
||||
.to_path_buf();
|
||||
if let Ok(env) = add_path_to_index(
|
||||
&hash_indexes,
|
||||
mailbox_hash,
|
||||
pathbuf.as_path(),
|
||||
&cache_dir,
|
||||
file_name,
|
||||
&mut buf,
|
||||
) {
|
||||
mailbox_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(env.hash(), mailbox_hash);
|
||||
debug!(
|
||||
"Create event {} {} {}",
|
||||
env.hash(),
|
||||
env.subject(),
|
||||
pathbuf.display()
|
||||
);
|
||||
if !env.is_seen() {
|
||||
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
|
||||
}
|
||||
*mailbox_counts[&mailbox_hash].1.lock().unwrap() += 1;
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: Create(Box::new(env)),
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
/* Update */
|
||||
DebouncedEvent::NoticeWrite(pathbuf) | DebouncedEvent::Write(pathbuf) => {
|
||||
debug!("DebouncedEvent::Write(path = {:?}", &pathbuf);
|
||||
let mailbox_hash = get_path_hash!(pathbuf);
|
||||
let mut hash_indexes_lock = hash_indexes.lock().unwrap();
|
||||
let index_lock =
|
||||
&mut hash_indexes_lock.entry(mailbox_hash).or_default();
|
||||
let file_name = pathbuf
|
||||
.as_path()
|
||||
.strip_prefix(&root_path)
|
||||
.unwrap()
|
||||
.to_path_buf();
|
||||
/* Linear search in hash_index to find old hash */
|
||||
let old_hash: EnvelopeHash = {
|
||||
if let Some((k, v)) =
|
||||
index_lock.iter_mut().find(|(_, v)| *v.buf == pathbuf)
|
||||
{
|
||||
*v = pathbuf.clone().into();
|
||||
*k
|
||||
} else {
|
||||
drop(hash_indexes_lock);
|
||||
/* Did we just miss a Create event? In any case, create
|
||||
* envelope. */
|
||||
if let Ok(env) = add_path_to_index(
|
||||
&hash_indexes,
|
||||
mailbox_hash,
|
||||
pathbuf.as_path(),
|
||||
&cache_dir,
|
||||
file_name,
|
||||
&mut buf,
|
||||
) {
|
||||
mailbox_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(env.hash(), mailbox_hash);
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: Create(Box::new(env)),
|
||||
}),
|
||||
);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let new_hash: EnvelopeHash = get_file_hash(pathbuf.as_path());
|
||||
let mut reader = io::BufReader::new(fs::File::open(&pathbuf)?);
|
||||
buf.clear();
|
||||
reader.read_to_end(&mut buf)?;
|
||||
if index_lock.get_mut(&new_hash).is_none() {
|
||||
debug!("write notice");
|
||||
if let Ok(mut env) =
|
||||
Envelope::from_bytes(buf.as_slice(), Some(pathbuf.flags()))
|
||||
{
|
||||
env.set_hash(new_hash);
|
||||
debug!("{}\t{:?}", new_hash, &pathbuf);
|
||||
debug!(
|
||||
"hash {}, path: {:?} couldn't be parsed",
|
||||
new_hash, &pathbuf
|
||||
);
|
||||
index_lock.insert(new_hash, pathbuf.into());
|
||||
|
||||
/* Send Write notice */
|
||||
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: Update(old_hash, Box::new(env)),
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
/* Remove */
|
||||
DebouncedEvent::NoticeRemove(pathbuf) | DebouncedEvent::Remove(pathbuf) => {
|
||||
debug!("DebouncedEvent::Remove(path = {:?}", pathbuf);
|
||||
let mailbox_hash = get_path_hash!(pathbuf);
|
||||
let mut hash_indexes_lock = hash_indexes.lock().unwrap();
|
||||
let index_lock = hash_indexes_lock.entry(mailbox_hash).or_default();
|
||||
let hash: EnvelopeHash = if let Some((k, _)) =
|
||||
index_lock.iter().find(|(_, v)| *v.buf == pathbuf)
|
||||
{
|
||||
*k
|
||||
} else {
|
||||
debug!("removed but not contained in index");
|
||||
continue;
|
||||
};
|
||||
if let Some(ref modif) = &index_lock[&hash].modified {
|
||||
match modif {
|
||||
PathMod::Path(path) => debug!(
|
||||
"envelope {} has modified path set {}",
|
||||
hash,
|
||||
path.display()
|
||||
),
|
||||
PathMod::Hash(hash) => debug!(
|
||||
"envelope {} has modified path set {}",
|
||||
hash,
|
||||
&index_lock[&hash].buf.display()
|
||||
),
|
||||
}
|
||||
index_lock.entry(hash).and_modify(|e| {
|
||||
e.removed = false;
|
||||
});
|
||||
continue;
|
||||
}
|
||||
{
|
||||
let mut lck = mailbox_counts[&mailbox_hash].1.lock().unwrap();
|
||||
*lck = lck.saturating_sub(1);
|
||||
}
|
||||
if !pathbuf.flags().contains(Flag::SEEN) {
|
||||
let mut lck = mailbox_counts[&mailbox_hash].0.lock().unwrap();
|
||||
*lck = lck.saturating_sub(1);
|
||||
}
|
||||
|
||||
index_lock.entry(hash).and_modify(|e| {
|
||||
e.removed = true;
|
||||
});
|
||||
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: Remove(hash),
|
||||
}),
|
||||
);
|
||||
}
|
||||
/* Envelope hasn't changed */
|
||||
DebouncedEvent::Rename(src, dest) => {
|
||||
debug!("DebouncedEvent::Rename(src = {:?}, dest = {:?})", src, dest);
|
||||
let mailbox_hash = get_path_hash!(src);
|
||||
let dest_mailbox = {
|
||||
let dest_mailbox = get_path_hash!(dest);
|
||||
if dest_mailbox == mailbox_hash {
|
||||
None
|
||||
} else {
|
||||
Some(dest_mailbox)
|
||||
}
|
||||
};
|
||||
let old_hash: EnvelopeHash = get_file_hash(src.as_path());
|
||||
let new_hash: EnvelopeHash = get_file_hash(dest.as_path());
|
||||
|
||||
let mut hash_indexes_lock = hash_indexes.lock().unwrap();
|
||||
let index_lock = hash_indexes_lock.entry(mailbox_hash).or_default();
|
||||
let old_flags = src.flags();
|
||||
let new_flags = dest.flags();
|
||||
let was_seen: bool = old_flags.contains(Flag::SEEN);
|
||||
let is_seen: bool = new_flags.contains(Flag::SEEN);
|
||||
|
||||
if index_lock.contains_key(&old_hash) && !index_lock[&old_hash].removed
|
||||
{
|
||||
debug!("contains_old_key");
|
||||
if let Some(dest_mailbox) = dest_mailbox {
|
||||
index_lock.entry(old_hash).and_modify(|e| {
|
||||
e.removed = true;
|
||||
});
|
||||
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: Remove(old_hash),
|
||||
}),
|
||||
);
|
||||
let file_name = dest
|
||||
.as_path()
|
||||
.strip_prefix(&root_path)
|
||||
.unwrap()
|
||||
.to_path_buf();
|
||||
drop(hash_indexes_lock);
|
||||
if let Ok(env) = add_path_to_index(
|
||||
&hash_indexes,
|
||||
dest_mailbox,
|
||||
dest.as_path(),
|
||||
&cache_dir,
|
||||
file_name,
|
||||
&mut buf,
|
||||
) {
|
||||
mailbox_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(env.hash(), dest_mailbox);
|
||||
debug!(
|
||||
"Create event {} {} {}",
|
||||
env.hash(),
|
||||
env.subject(),
|
||||
dest.display()
|
||||
);
|
||||
if !env.is_seen() {
|
||||
*mailbox_counts[&dest_mailbox].0.lock().unwrap() += 1;
|
||||
}
|
||||
*mailbox_counts[&dest_mailbox].1.lock().unwrap() += 1;
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash: dest_mailbox,
|
||||
kind: Create(Box::new(env)),
|
||||
}),
|
||||
);
|
||||
}
|
||||
} else {
|
||||
index_lock.entry(old_hash).and_modify(|e| {
|
||||
debug!(&e.modified);
|
||||
e.modified = Some(PathMod::Hash(new_hash));
|
||||
});
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: Rename(old_hash, new_hash),
|
||||
}),
|
||||
);
|
||||
if !was_seen && is_seen {
|
||||
let mut lck =
|
||||
mailbox_counts[&mailbox_hash].0.lock().unwrap();
|
||||
*lck = lck.saturating_sub(1);
|
||||
} else if was_seen && !is_seen {
|
||||
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
|
||||
}
|
||||
if old_flags != new_flags {
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: NewFlags(new_hash, (new_flags, vec![])),
|
||||
}),
|
||||
);
|
||||
}
|
||||
mailbox_index.lock().unwrap().insert(new_hash, mailbox_hash);
|
||||
index_lock.insert(new_hash, dest.into());
|
||||
}
|
||||
continue;
|
||||
} else if !index_lock.contains_key(&new_hash)
|
||||
&& index_lock
|
||||
.get(&old_hash)
|
||||
.map(|e| e.removed)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
if index_lock
|
||||
.get(&old_hash)
|
||||
.map(|e| e.removed)
|
||||
.unwrap_or(false)
|
||||
{
|
||||
index_lock.entry(old_hash).and_modify(|e| {
|
||||
e.modified = Some(PathMod::Hash(new_hash));
|
||||
e.removed = false;
|
||||
});
|
||||
debug!("contains_old_key, key was marked as removed (by external source)");
|
||||
} else {
|
||||
debug!("not contains_new_key");
|
||||
}
|
||||
let file_name = dest
|
||||
.as_path()
|
||||
.strip_prefix(&root_path)
|
||||
.unwrap()
|
||||
.to_path_buf();
|
||||
debug!("filename = {:?}", file_name);
|
||||
drop(hash_indexes_lock);
|
||||
if let Ok(env) = add_path_to_index(
|
||||
&hash_indexes,
|
||||
dest_mailbox.unwrap_or(mailbox_hash),
|
||||
dest.as_path(),
|
||||
&cache_dir,
|
||||
file_name,
|
||||
&mut buf,
|
||||
) {
|
||||
mailbox_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(env.hash(), dest_mailbox.unwrap_or(mailbox_hash));
|
||||
debug!(
|
||||
"Create event {} {} {}",
|
||||
env.hash(),
|
||||
env.subject(),
|
||||
dest.display()
|
||||
);
|
||||
if !env.is_seen() {
|
||||
*mailbox_counts[&dest_mailbox.unwrap_or(mailbox_hash)]
|
||||
.0
|
||||
.lock()
|
||||
.unwrap() += 1;
|
||||
}
|
||||
*mailbox_counts[&dest_mailbox.unwrap_or(mailbox_hash)]
|
||||
.1
|
||||
.lock()
|
||||
.unwrap() += 1;
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash: dest_mailbox.unwrap_or(mailbox_hash),
|
||||
kind: Create(Box::new(env)),
|
||||
}),
|
||||
);
|
||||
continue;
|
||||
} else {
|
||||
debug!("not valid email");
|
||||
}
|
||||
} else if let Some(dest_mailbox) = dest_mailbox {
|
||||
drop(hash_indexes_lock);
|
||||
let file_name = dest
|
||||
.as_path()
|
||||
.strip_prefix(&root_path)
|
||||
.unwrap()
|
||||
.to_path_buf();
|
||||
if let Ok(env) = add_path_to_index(
|
||||
&hash_indexes,
|
||||
dest_mailbox,
|
||||
dest.as_path(),
|
||||
&cache_dir,
|
||||
file_name,
|
||||
&mut buf,
|
||||
) {
|
||||
mailbox_index
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(env.hash(), dest_mailbox);
|
||||
debug!(
|
||||
"Create event {} {} {}",
|
||||
env.hash(),
|
||||
env.subject(),
|
||||
dest.display()
|
||||
);
|
||||
if !env.is_seen() {
|
||||
*mailbox_counts[&dest_mailbox].0.lock().unwrap() += 1;
|
||||
}
|
||||
*mailbox_counts[&dest_mailbox].1.lock().unwrap() += 1;
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash: dest_mailbox,
|
||||
kind: Create(Box::new(env)),
|
||||
}),
|
||||
);
|
||||
}
|
||||
} else {
|
||||
if was_seen && !is_seen {
|
||||
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
|
||||
}
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: Rename(old_hash, new_hash),
|
||||
}),
|
||||
);
|
||||
debug!("contains_new_key");
|
||||
if old_flags != new_flags {
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: NewFlags(new_hash, (new_flags, vec![])),
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/* Maybe a re-read should be triggered here just to be safe.
|
||||
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash: get_path_hash!(dest),
|
||||
kind: Rescan,
|
||||
}));
|
||||
*/
|
||||
}
|
||||
/* Trigger rescan of mailbox */
|
||||
DebouncedEvent::Rescan => {
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash: root_mailbox_hash,
|
||||
kind: Rescan,
|
||||
}),
|
||||
);
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
Err(e) => debug!("watch error: {:?}", e),
|
||||
}
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn as_any_mut(&mut self) -> &mut dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
fn add_path_to_index(
|
||||
hash_index: &HashIndexes,
|
||||
mailbox_hash: MailboxHash,
|
||||
path: &Path,
|
||||
cache_dir: &xdg::BaseDirectories,
|
||||
file_name: PathBuf,
|
||||
buf: &mut Vec<u8>,
|
||||
) -> Result<Envelope> {
|
||||
debug!("add_path_to_index path {:?} filename{:?}", path, file_name);
|
||||
let env_hash = get_file_hash(path);
|
||||
{
|
||||
let mut map = hash_index.lock().unwrap();
|
||||
let map = map.entry(mailbox_hash).or_default();
|
||||
map.insert(env_hash, path.to_path_buf().into());
|
||||
debug!(
|
||||
"inserted {} in {} map, len={}",
|
||||
env_hash,
|
||||
mailbox_hash,
|
||||
map.len()
|
||||
);
|
||||
}
|
||||
let mut reader = io::BufReader::new(fs::File::open(&path)?);
|
||||
buf.clear();
|
||||
reader.read_to_end(buf)?;
|
||||
let mut env = Envelope::from_bytes(buf.as_slice(), Some(path.flags()))?;
|
||||
env.set_hash(env_hash);
|
||||
debug!(
|
||||
"add_path_to_index gen {}\t{}",
|
||||
env_hash,
|
||||
file_name.display()
|
||||
);
|
||||
if let Ok(cached) = cache_dir.place_cache_file(file_name) {
|
||||
debug!("putting in cache");
|
||||
/* place result in cache directory */
|
||||
let f = fs::File::create(cached)?;
|
||||
let metadata = f.metadata()?;
|
||||
let mut permissions = metadata.permissions();
|
||||
|
||||
permissions.set_mode(0o600); // Read/write for owner only.
|
||||
f.set_permissions(permissions)?;
|
||||
let writer = io::BufWriter::new(f);
|
||||
bincode::Options::serialize_into(bincode::config::DefaultOptions::new(), writer, &env)?;
|
||||
}
|
||||
Ok(env)
|
||||
}
|
|
@ -147,6 +147,8 @@ use std::sync::{Arc, Mutex, RwLock};
|
|||
|
||||
pub mod write;
|
||||
|
||||
pub mod watch;
|
||||
|
||||
pub type Offset = usize;
|
||||
pub type Length = usize;
|
||||
|
||||
|
@ -184,7 +186,7 @@ fn get_rw_lock_blocking(f: &File, path: &Path) -> Result<()> {
|
|||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct MboxMailbox {
|
||||
pub struct MboxMailbox {
|
||||
hash: MailboxHash,
|
||||
name: String,
|
||||
path: PathBuf,
|
||||
|
@ -950,157 +952,24 @@ impl MailBackend for MboxType {
|
|||
Err(MeliError::new("Unimplemented."))
|
||||
}
|
||||
|
||||
fn watch(&self) -> ResultFuture<()> {
|
||||
let sender = self.event_consumer.clone();
|
||||
let (tx, rx) = channel();
|
||||
let mut watcher = watcher(tx, std::time::Duration::from_secs(10))
|
||||
.map_err(|e| e.to_string())
|
||||
.map_err(MeliError::new)?;
|
||||
for f in self.mailboxes.lock().unwrap().values() {
|
||||
watcher
|
||||
.watch(&f.fs_path, RecursiveMode::Recursive)
|
||||
.map_err(|e| e.to_string())
|
||||
.map_err(MeliError::new)?;
|
||||
debug!("watching {:?}", f.fs_path.as_path());
|
||||
}
|
||||
fn watcher(&self) -> Result<Box<dyn BackendWatcher>> {
|
||||
let account_hash = {
|
||||
let mut hasher = DefaultHasher::new();
|
||||
hasher.write(self.account_name.as_bytes());
|
||||
hasher.finish()
|
||||
};
|
||||
let mailboxes = self.mailboxes.clone();
|
||||
let event_consumer = self.event_consumer.clone();
|
||||
let mailbox_index = self.mailbox_index.clone();
|
||||
let prefer_mbox_type = self.prefer_mbox_type;
|
||||
Ok(Box::pin(async move {
|
||||
loop {
|
||||
match rx.recv() {
|
||||
/*
|
||||
* Event types:
|
||||
*
|
||||
* pub enum RefreshEventKind {
|
||||
* Update(EnvelopeHash, Envelope), // Old hash, new envelope
|
||||
* Create(Envelope),
|
||||
* Remove(EnvelopeHash),
|
||||
* Rescan,
|
||||
* }
|
||||
*/
|
||||
Ok(event) => match event {
|
||||
/* Update */
|
||||
DebouncedEvent::NoticeWrite(pathbuf) | DebouncedEvent::Write(pathbuf) => {
|
||||
let mailbox_hash = get_path_hash!(&pathbuf);
|
||||
let file = match std::fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.open(&pathbuf)
|
||||
{
|
||||
Ok(f) => f,
|
||||
Err(_) => {
|
||||
continue;
|
||||
}
|
||||
};
|
||||
get_rw_lock_blocking(&file, &pathbuf)?;
|
||||
let mut mailbox_lock = mailboxes.lock().unwrap();
|
||||
let mut buf_reader = BufReader::new(file);
|
||||
let mut contents = Vec::new();
|
||||
if let Err(e) = buf_reader.read_to_end(&mut contents) {
|
||||
debug!(e);
|
||||
continue;
|
||||
};
|
||||
if contents.starts_with(mailbox_lock[&mailbox_hash].content.as_slice())
|
||||
{
|
||||
if let Ok((_, envelopes)) = mbox_parse(
|
||||
mailbox_lock[&mailbox_hash].index.clone(),
|
||||
&contents,
|
||||
mailbox_lock[&mailbox_hash].content.len(),
|
||||
prefer_mbox_type,
|
||||
) {
|
||||
let mut mailbox_index_lck = mailbox_index.lock().unwrap();
|
||||
for env in envelopes {
|
||||
mailbox_index_lck.insert(env.hash(), mailbox_hash);
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: RefreshEventKind::Create(Box::new(env)),
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: RefreshEventKind::Rescan,
|
||||
}),
|
||||
);
|
||||
}
|
||||
mailbox_lock
|
||||
.entry(mailbox_hash)
|
||||
.and_modify(|f| f.content = contents);
|
||||
}
|
||||
/* Remove */
|
||||
DebouncedEvent::NoticeRemove(pathbuf) | DebouncedEvent::Remove(pathbuf) => {
|
||||
if mailboxes
|
||||
.lock()
|
||||
.unwrap()
|
||||
.values()
|
||||
.any(|f| f.fs_path == pathbuf)
|
||||
{
|
||||
let mailbox_hash = get_path_hash!(&pathbuf);
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: RefreshEventKind::Failure(MeliError::new(format!(
|
||||
"mbox mailbox {} was removed.",
|
||||
pathbuf.display()
|
||||
))),
|
||||
}),
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
DebouncedEvent::Rename(src, dest) => {
|
||||
if mailboxes.lock().unwrap().values().any(|f| f.fs_path == src) {
|
||||
let mailbox_hash = get_path_hash!(&src);
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: RefreshEventKind::Failure(MeliError::new(format!(
|
||||
"mbox mailbox {} was renamed to {}.",
|
||||
src.display(),
|
||||
dest.display()
|
||||
))),
|
||||
}),
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
/* Trigger rescan of mailboxes */
|
||||
DebouncedEvent::Rescan => {
|
||||
for &mailbox_hash in mailboxes.lock().unwrap().keys() {
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: RefreshEventKind::Rescan,
|
||||
}),
|
||||
);
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
Err(e) => debug!("watch error: {:?}", e),
|
||||
}
|
||||
}
|
||||
let mailboxes = self.mailboxes.clone();
|
||||
let prefer_mbox_type = self.prefer_mbox_type.clone();
|
||||
Ok(Box::new(watch::MboxWatcher {
|
||||
account_hash,
|
||||
event_consumer,
|
||||
mailbox_hashes: BTreeSet::default(),
|
||||
mailbox_index,
|
||||
mailboxes,
|
||||
polling_period: std::time::Duration::from_secs(60),
|
||||
prefer_mbox_type,
|
||||
}))
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,223 @@
|
|||
/*
|
||||
* meli - mailbox module.
|
||||
*
|
||||
* Copyright 2017 - 2021 Manos Pitsidianakis
|
||||
*
|
||||
* This file is part of meli.
|
||||
*
|
||||
* meli is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* meli is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with meli. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
use super::*;
|
||||
use std::collections::BTreeSet;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MboxWatcher {
|
||||
pub account_hash: AccountHash,
|
||||
pub event_consumer: BackendEventConsumer,
|
||||
pub mailbox_hashes: BTreeSet<MailboxHash>,
|
||||
pub mailbox_index: Arc<Mutex<HashMap<EnvelopeHash, MailboxHash>>>,
|
||||
pub mailboxes: Arc<Mutex<HashMap<MailboxHash, MboxMailbox>>>,
|
||||
pub polling_period: std::time::Duration,
|
||||
pub prefer_mbox_type: Option<MboxFormat>,
|
||||
}
|
||||
|
||||
impl BackendWatcher for MboxWatcher {
|
||||
fn is_blocking(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn register_mailbox(
|
||||
&mut self,
|
||||
mailbox_hash: MailboxHash,
|
||||
_urgency: MailboxWatchUrgency,
|
||||
) -> Result<()> {
|
||||
self.mailbox_hashes.insert(mailbox_hash);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_polling_period(&mut self, period: Option<std::time::Duration>) -> Result<()> {
|
||||
if let Some(period) = period {
|
||||
self.polling_period = period;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn spawn(self: Box<Self>) -> ResultFuture<()> {
|
||||
let MboxWatcher {
|
||||
account_hash,
|
||||
event_consumer: sender,
|
||||
mailbox_hashes,
|
||||
mailbox_index,
|
||||
mailboxes,
|
||||
polling_period,
|
||||
prefer_mbox_type,
|
||||
} = *self;
|
||||
let (tx, rx) = channel();
|
||||
let mut watcher = watcher(tx, polling_period)
|
||||
.map_err(|e| e.to_string())
|
||||
.map_err(MeliError::new)?;
|
||||
for (_, f) in mailboxes
|
||||
.lock()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.filter(|(k, _)| mailbox_hashes.contains(k))
|
||||
{
|
||||
watcher
|
||||
.watch(&f.fs_path, RecursiveMode::Recursive)
|
||||
.map_err(|e| e.to_string())
|
||||
.map_err(MeliError::new)?;
|
||||
debug!("watching {:?}", f.fs_path.as_path());
|
||||
}
|
||||
Ok(Box::pin(async move {
|
||||
loop {
|
||||
match rx.recv() {
|
||||
/*
|
||||
* Event types:
|
||||
*
|
||||
* pub enum RefreshEventKind {
|
||||
* Update(EnvelopeHash, Envelope), // Old hash, new envelope
|
||||
* Create(Envelope),
|
||||
* Remove(EnvelopeHash),
|
||||
* Rescan,
|
||||
* }
|
||||
*/
|
||||
Ok(event) => match event {
|
||||
/* Update */
|
||||
DebouncedEvent::NoticeWrite(pathbuf) | DebouncedEvent::Write(pathbuf) => {
|
||||
let mailbox_hash = get_path_hash!(&pathbuf);
|
||||
let file = match std::fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.open(&pathbuf)
|
||||
{
|
||||
Ok(f) => f,
|
||||
Err(_) => {
|
||||
continue;
|
||||
}
|
||||
};
|
||||
get_rw_lock_blocking(&file, &pathbuf)?;
|
||||
let mut mailbox_lock = mailboxes.lock().unwrap();
|
||||
let mut buf_reader = BufReader::new(file);
|
||||
let mut contents = Vec::new();
|
||||
if let Err(e) = buf_reader.read_to_end(&mut contents) {
|
||||
debug!(e);
|
||||
continue;
|
||||
};
|
||||
if contents.starts_with(mailbox_lock[&mailbox_hash].content.as_slice())
|
||||
{
|
||||
if let Ok((_, envelopes)) = mbox_parse(
|
||||
mailbox_lock[&mailbox_hash].index.clone(),
|
||||
&contents,
|
||||
mailbox_lock[&mailbox_hash].content.len(),
|
||||
prefer_mbox_type,
|
||||
) {
|
||||
let mut mailbox_index_lck = mailbox_index.lock().unwrap();
|
||||
for env in envelopes {
|
||||
mailbox_index_lck.insert(env.hash(), mailbox_hash);
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: RefreshEventKind::Create(Box::new(env)),
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: RefreshEventKind::Rescan,
|
||||
}),
|
||||
);
|
||||
}
|
||||
mailbox_lock
|
||||
.entry(mailbox_hash)
|
||||
.and_modify(|f| f.content = contents);
|
||||
}
|
||||
/* Remove */
|
||||
DebouncedEvent::NoticeRemove(pathbuf) | DebouncedEvent::Remove(pathbuf) => {
|
||||
if mailboxes
|
||||
.lock()
|
||||
.unwrap()
|
||||
.values()
|
||||
.any(|f| f.fs_path == pathbuf)
|
||||
{
|
||||
let mailbox_hash = get_path_hash!(&pathbuf);
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: RefreshEventKind::Failure(MeliError::new(format!(
|
||||
"mbox mailbox {} was removed.",
|
||||
pathbuf.display()
|
||||
))),
|
||||
}),
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
DebouncedEvent::Rename(src, dest) => {
|
||||
if mailboxes.lock().unwrap().values().any(|f| f.fs_path == src) {
|
||||
let mailbox_hash = get_path_hash!(&src);
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: RefreshEventKind::Failure(MeliError::new(format!(
|
||||
"mbox mailbox {} was renamed to {}.",
|
||||
src.display(),
|
||||
dest.display()
|
||||
))),
|
||||
}),
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
/* Trigger rescan of mailboxes */
|
||||
DebouncedEvent::Rescan => {
|
||||
for &mailbox_hash in mailboxes.lock().unwrap().keys() {
|
||||
(sender)(
|
||||
account_hash,
|
||||
BackendEvent::Refresh(RefreshEvent {
|
||||
account_hash,
|
||||
mailbox_hash,
|
||||
kind: RefreshEventKind::Rescan,
|
||||
}),
|
||||
);
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
Err(e) => debug!("watch error: {:?}", e),
|
||||
}
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn as_any_mut(&mut self) -> &mut dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
|
@ -239,7 +239,7 @@ impl MailBackend for NntpType {
|
|||
}))
|
||||
}
|
||||
|
||||
fn watch(&self) -> ResultFuture<()> {
|
||||
fn watcher(&self) -> Result<Box<dyn BackendWatcher>> {
|
||||
Err(MeliError::new("Unimplemented."))
|
||||
}
|
||||
|
||||
|
|
|
@ -67,6 +67,8 @@ pub use tags::*;
|
|||
mod thread;
|
||||
pub use thread::*;
|
||||
|
||||
mod watch;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DbConnection {
|
||||
pub lib: Arc<libloading::Library>,
|
||||
|
@ -232,7 +234,7 @@ unsafe impl Send for NotmuchDb {}
|
|||
unsafe impl Sync for NotmuchDb {}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
struct NotmuchMailbox {
|
||||
pub struct NotmuchMailbox {
|
||||
hash: MailboxHash,
|
||||
children: Vec<MailboxHash>,
|
||||
parent: Option<MailboxHash>,
|
||||
|
@ -447,6 +449,82 @@ impl MailBackend for NotmuchDb {
|
|||
Ok(Box::pin(async { Ok(()) }))
|
||||
}
|
||||
|
||||
fn load(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()> {
|
||||
let database = NotmuchDb::new_connection(
|
||||
self.path.as_path(),
|
||||
self.revision_uuid.clone(),
|
||||
self.lib.clone(),
|
||||
false,
|
||||
)?;
|
||||
let mailboxes = self.mailboxes.clone();
|
||||
let index = self.index.clone();
|
||||
let mailbox_index = self.mailbox_index.clone();
|
||||
let collection = self.collection.clone();
|
||||
Ok(Box::pin(async move {
|
||||
{
|
||||
//crate::connections::sleep(std::time::Duration::from_secs(2)).await;
|
||||
}
|
||||
let mailboxes_lck = mailboxes.read().unwrap();
|
||||
let mailbox = mailboxes_lck.get(&mailbox_hash).unwrap();
|
||||
let query: Query = Query::new(&database, mailbox.query_str.as_str())?;
|
||||
let mut unseen_total = 0;
|
||||
let mut mailbox_index_lck = mailbox_index.write().unwrap();
|
||||
let new_envelopes: HashMap<EnvelopeHash, Envelope> = query
|
||||
.search()?
|
||||
.into_iter()
|
||||
.map(|m| {
|
||||
let env = m.into_envelope(&index, &collection.tag_index);
|
||||
mailbox_index_lck
|
||||
.entry(env.hash())
|
||||
.or_default()
|
||||
.push(mailbox_hash);
|
||||
if !env.is_seen() {
|
||||
unseen_total += 1;
|
||||
}
|
||||
(env.hash(), env)
|
||||
})
|
||||
.collect();
|
||||
{
|
||||
let mut total_lck = mailbox.total.lock().unwrap();
|
||||
let mut unseen_lck = mailbox.unseen.lock().unwrap();
|
||||
*total_lck = new_envelopes.len();
|
||||
*unseen_lck = unseen_total;
|
||||
}
|
||||
collection.merge(new_envelopes, mailbox_hash, None);
|
||||
let mut envelopes_lck = collection.envelopes.write().unwrap();
|
||||
envelopes_lck.retain(|&k, _| k % 2 == 0);
|
||||
Ok(())
|
||||
}))
|
||||
}
|
||||
|
||||
fn fetch_batch(&mut self, env_hashes: EnvelopeHashBatch) -> ResultFuture<()> {
|
||||
let database = NotmuchDb::new_connection(
|
||||
self.path.as_path(),
|
||||
self.revision_uuid.clone(),
|
||||
self.lib.clone(),
|
||||
false,
|
||||
)?;
|
||||
let index = self.index.clone();
|
||||
let collection = self.collection.clone();
|
||||
Ok(Box::pin(async move {
|
||||
//crate::connections::sleep(std::time::Duration::from_secs(2)).await;
|
||||
debug!("fetch_batch {:?}", &env_hashes);
|
||||
let mut envelopes_lck = collection.envelopes.write().unwrap();
|
||||
for env_hash in env_hashes.iter() {
|
||||
if envelopes_lck.contains_key(&env_hash) {
|
||||
continue;
|
||||
}
|
||||
let index_lck = index.write().unwrap();
|
||||
let message = Message::find_message(&database, &index_lck[&env_hash])?;
|
||||
drop(index_lck);
|
||||
let env = message.into_envelope(&index, &collection.tag_index);
|
||||
envelopes_lck.insert(env_hash, env);
|
||||
}
|
||||
debug!("fetch_batch {:?} done", &env_hashes);
|
||||
Ok(())
|
||||
}))
|
||||
}
|
||||
|
||||
fn fetch(
|
||||
&mut self,
|
||||
mailbox_hash: MailboxHash,
|
||||
|
@ -581,50 +659,29 @@ impl MailBackend for NotmuchDb {
|
|||
}))
|
||||
}
|
||||
|
||||
fn watch(&self) -> ResultFuture<()> {
|
||||
extern crate notify;
|
||||
use notify::{watcher, RecursiveMode, Watcher};
|
||||
|
||||
fn watcher(&self) -> Result<Box<dyn BackendWatcher>> {
|
||||
let account_hash = self.account_hash;
|
||||
let collection = self.collection.clone();
|
||||
let event_consumer = self.event_consumer.clone();
|
||||
let index = self.index.clone();
|
||||
let lib = self.lib.clone();
|
||||
let mailbox_index = self.mailbox_index.clone();
|
||||
let mailboxes = self.mailboxes.clone();
|
||||
let path = self.path.clone();
|
||||
let revision_uuid = self.revision_uuid.clone();
|
||||
let mailboxes = self.mailboxes.clone();
|
||||
let index = self.index.clone();
|
||||
let mailbox_index = self.mailbox_index.clone();
|
||||
let event_consumer = self.event_consumer.clone();
|
||||
|
||||
let (tx, rx) = std::sync::mpsc::channel();
|
||||
let mut watcher = watcher(tx, std::time::Duration::from_secs(2)).unwrap();
|
||||
watcher.watch(&self.path, RecursiveMode::Recursive).unwrap();
|
||||
Ok(Box::pin(async move {
|
||||
let _watcher = watcher;
|
||||
let rx = rx;
|
||||
loop {
|
||||
let _ = rx.recv().map_err(|err| err.to_string())?;
|
||||
{
|
||||
let mut database = NotmuchDb::new_connection(
|
||||
path.as_path(),
|
||||
revision_uuid.clone(),
|
||||
lib.clone(),
|
||||
false,
|
||||
)?;
|
||||
let new_revision_uuid = database.get_revision_uuid();
|
||||
if new_revision_uuid > *database.revision_uuid.read().unwrap() {
|
||||
database.refresh(
|
||||
mailboxes.clone(),
|
||||
index.clone(),
|
||||
mailbox_index.clone(),
|
||||
collection.tag_index.clone(),
|
||||
account_hash.clone(),
|
||||
event_consumer.clone(),
|
||||
new_revision_uuid,
|
||||
)?;
|
||||
*revision_uuid.write().unwrap() = new_revision_uuid;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Box::new(watch::NotmuchWatcher {
|
||||
account_hash,
|
||||
collection,
|
||||
event_consumer,
|
||||
index,
|
||||
lib,
|
||||
mailbox_hashes: BTreeSet::default(),
|
||||
mailbox_index,
|
||||
mailboxes,
|
||||
path,
|
||||
polling_period: std::time::Duration::from_secs(3),
|
||||
revision_uuid,
|
||||
}))
|
||||
}
|
||||
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
*/
|
||||
|
||||
use super::*;
|
||||
use crate::thread::{ThreadHash, ThreadNode, ThreadNodeHash};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Message<'m> {
|
||||
|
@ -188,22 +187,6 @@ impl<'m> Message<'m> {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn into_thread_node(&self) -> (ThreadNodeHash, ThreadNode) {
|
||||
(
|
||||
ThreadNodeHash::from(self.msg_id()),
|
||||
ThreadNode {
|
||||
message: Some(self.env_hash()),
|
||||
parent: None,
|
||||
other_mailbox: false,
|
||||
children: vec![],
|
||||
date: self.date(),
|
||||
show_subject: true,
|
||||
group: ThreadHash::new(),
|
||||
unseen: false,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
pub fn add_tag(&self, tag: &CStr) -> Result<()> {
|
||||
if let Err(err) = unsafe {
|
||||
try_call!(
|
||||
|
|
|
@ -0,0 +1,116 @@
|
|||
/*
|
||||
* meli - notmuch backend
|
||||
*
|
||||
* Copyright 2019 - 2021 Manos Pitsidianakis
|
||||
*
|
||||
* This file is part of meli.
|
||||
*
|
||||
* meli is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* meli is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with meli. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
use super::*;
|
||||
use std::collections::BTreeSet;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct NotmuchWatcher {
|
||||
pub account_hash: AccountHash,
|
||||
pub collection: Collection,
|
||||
pub event_consumer: BackendEventConsumer,
|
||||
pub index: Arc<RwLock<HashMap<EnvelopeHash, CString>>>,
|
||||
pub lib: Arc<libloading::Library>,
|
||||
pub mailbox_hashes: BTreeSet<MailboxHash>,
|
||||
pub mailbox_index: Arc<RwLock<HashMap<EnvelopeHash, SmallVec<[MailboxHash; 16]>>>>,
|
||||
pub mailboxes: Arc<RwLock<HashMap<MailboxHash, NotmuchMailbox>>>,
|
||||
pub path: PathBuf,
|
||||
pub polling_period: std::time::Duration,
|
||||
pub revision_uuid: Arc<RwLock<u64>>,
|
||||
}
|
||||
|
||||
impl BackendWatcher for NotmuchWatcher {
|
||||
fn is_blocking(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn register_mailbox(
|
||||
&mut self,
|
||||
mailbox_hash: MailboxHash,
|
||||
_urgency: MailboxWatchUrgency,
|
||||
) -> Result<()> {
|
||||
self.mailbox_hashes.insert(mailbox_hash);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_polling_period(&mut self, period: Option<std::time::Duration>) -> Result<()> {
|
||||
if let Some(period) = period {
|
||||
self.polling_period = period;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn spawn(self: Box<Self>) -> ResultFuture<()> {
|
||||
Ok(Box::pin(async move {
|
||||
extern crate notify;
|
||||
use notify::{watcher, RecursiveMode, Watcher};
|
||||
let NotmuchWatcher {
|
||||
account_hash,
|
||||
collection,
|
||||
event_consumer,
|
||||
index,
|
||||
lib,
|
||||
mailbox_hashes: _,
|
||||
mailbox_index,
|
||||
mailboxes,
|
||||
path,
|
||||
polling_period,
|
||||
revision_uuid,
|
||||
} = *self;
|
||||
|
||||
let (tx, rx) = std::sync::mpsc::channel();
|
||||
let mut watcher = watcher(tx, polling_period).unwrap();
|
||||
watcher.watch(&path, RecursiveMode::Recursive).unwrap();
|
||||
loop {
|
||||
let _ = rx.recv().map_err(|err| err.to_string())?;
|
||||
{
|
||||
let mut database = NotmuchDb::new_connection(
|
||||
path.as_path(),
|
||||
revision_uuid.clone(),
|
||||
lib.clone(),
|
||||
false,
|
||||
)?;
|
||||
let new_revision_uuid = database.get_revision_uuid();
|
||||
if new_revision_uuid > *database.revision_uuid.read().unwrap() {
|
||||
database.refresh(
|
||||
mailboxes.clone(),
|
||||
index.clone(),
|
||||
mailbox_index.clone(),
|
||||
collection.tag_index.clone(),
|
||||
account_hash.clone(),
|
||||
event_consumer.clone(),
|
||||
new_revision_uuid,
|
||||
)?;
|
||||
*revision_uuid.write().unwrap() = new_revision_uuid;
|
||||
}
|
||||
}
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn as_any_mut(&mut self) -> &mut dyn Any {
|
||||
self
|
||||
}
|
||||
}
|
|
@ -348,7 +348,7 @@ fn run_app(opt: Opt) -> Result<()> {
|
|||
state.register_component(Box::new(components::svg::SVGScreenshotFilter::new()));
|
||||
let window = Box::new(Tabbed::new(
|
||||
vec![
|
||||
Box::new(listing::Listing::new(&mut state.context)),
|
||||
Box::new(listing2::Listing::new(&mut state.context)),
|
||||
Box::new(ContactList::new(&state.context)),
|
||||
],
|
||||
&state.context,
|
||||
|
|
|
@ -27,6 +27,7 @@ use melib::email::{attachment_types::*, attachments::*};
|
|||
use melib::thread::ThreadNodeHash;
|
||||
|
||||
pub mod listing;
|
||||
pub mod listing2;
|
||||
pub use crate::listing::*;
|
||||
pub mod view;
|
||||
pub use crate::view::*;
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -163,6 +163,13 @@ pub enum JobRequest {
|
|||
Mailboxes {
|
||||
handle: JoinHandle<Result<HashMap<MailboxHash, Mailbox>>>,
|
||||
},
|
||||
Load {
|
||||
mailbox_hash: MailboxHash,
|
||||
handle: JoinHandle<Result<()>>,
|
||||
},
|
||||
FetchBatch {
|
||||
handle: JoinHandle<Result<()>>,
|
||||
},
|
||||
Fetch {
|
||||
mailbox_hash: MailboxHash,
|
||||
handle: JoinHandle<(
|
||||
|
@ -237,6 +244,8 @@ impl Drop for JobRequest {
|
|||
match self {
|
||||
JobRequest::Generic { handle, .. } |
|
||||
JobRequest::IsOnline { handle, .. } |
|
||||
JobRequest::Load { handle, .. } |
|
||||
JobRequest::FetchBatch { handle, .. } |
|
||||
JobRequest::Refresh { handle, .. } |
|
||||
JobRequest::SetFlags { handle, .. } |
|
||||
JobRequest::SaveMessage { handle, .. } |
|
||||
|
@ -275,6 +284,7 @@ impl core::fmt::Debug for JobRequest {
|
|||
match self {
|
||||
JobRequest::Generic { name, .. } => write!(f, "JobRequest::Generic({})", name),
|
||||
JobRequest::Mailboxes { .. } => write!(f, "JobRequest::Mailboxes"),
|
||||
JobRequest::FetchBatch { .. } => write!(f, "JobRequest::FetchBatch",),
|
||||
JobRequest::Fetch { mailbox_hash, .. } => {
|
||||
write!(f, "JobRequest::Fetch({})", mailbox_hash)
|
||||
}
|
||||
|
@ -302,6 +312,9 @@ impl core::fmt::Debug for JobRequest {
|
|||
JobRequest::SendMessageBackground { .. } => {
|
||||
write!(f, "JobRequest::SendMessageBackground")
|
||||
}
|
||||
JobRequest::Load { mailbox_hash, .. } => {
|
||||
write!(f, "JobRequest::Load({})", mailbox_hash)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -312,6 +325,8 @@ impl core::fmt::Display for JobRequest {
|
|||
JobRequest::Generic { name, .. } => write!(f, "{}", name),
|
||||
JobRequest::Mailboxes { .. } => write!(f, "Get mailbox list"),
|
||||
JobRequest::Fetch { .. } => write!(f, "Mailbox fetch"),
|
||||
JobRequest::FetchBatch { .. } => write!(f, "Fetch envelopes"),
|
||||
JobRequest::Load { .. } => write!(f, "Mailbox load"),
|
||||
JobRequest::IsOnline { .. } => write!(f, "Online status check"),
|
||||
JobRequest::Refresh { .. } => write!(f, "Refresh mailbox"),
|
||||
JobRequest::SetFlags { env_hashes, .. } => write!(
|
||||
|
@ -351,6 +366,15 @@ impl JobRequest {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn is_load(&self, mailbox_hash: MailboxHash) -> bool {
|
||||
match self {
|
||||
JobRequest::Load {
|
||||
mailbox_hash: h, ..
|
||||
} if *h == mailbox_hash => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_fetch(&self, mailbox_hash: MailboxHash) -> bool {
|
||||
match self {
|
||||
JobRequest::Fetch {
|
||||
|
@ -658,8 +682,7 @@ impl Account {
|
|||
{
|
||||
let total = entry.ref_mailbox.count().ok().unwrap_or((0, 0)).1;
|
||||
entry.status = MailboxStatus::Parsing(0, total);
|
||||
if let Ok(mailbox_job) = self.backend.write().unwrap().fetch(*h) {
|
||||
let mailbox_job = mailbox_job.into_future();
|
||||
if let Ok(mailbox_job) = self.backend.write().unwrap().load(*h) {
|
||||
let handle = if self.backend_capabilities.is_async {
|
||||
self.job_executor.spawn_specialized(mailbox_job)
|
||||
} else {
|
||||
|
@ -671,9 +694,10 @@ impl Account {
|
|||
StatusEvent::NewJob(job_id),
|
||||
)))
|
||||
.unwrap();
|
||||
debug!("JobRequest::Load {} {:?}", *h, job_id);
|
||||
self.active_jobs.insert(
|
||||
job_id,
|
||||
JobRequest::Fetch {
|
||||
JobRequest::Load {
|
||||
mailbox_hash: *h,
|
||||
handle,
|
||||
},
|
||||
|
@ -1068,9 +1092,23 @@ impl Account {
|
|||
}
|
||||
|
||||
if !self.active_jobs.values().any(|j| j.is_watch()) {
|
||||
match self.backend.read().unwrap().watch() {
|
||||
Ok(fut) => {
|
||||
let handle = if self.backend_capabilities.is_async {
|
||||
match self
|
||||
.backend
|
||||
.read()
|
||||
.unwrap()
|
||||
.watcher()
|
||||
.and_then(|mut watcher| {
|
||||
for (mailbox_hash, _) in self
|
||||
.mailbox_entries
|
||||
.iter()
|
||||
.filter(|(_, m)| m.conf.mailbox_conf.subscribe.is_true())
|
||||
{
|
||||
watcher.register_mailbox(*mailbox_hash, MailboxWatchUrgency::High)?;
|
||||
}
|
||||
Ok((watcher.is_blocking(), watcher.spawn()?))
|
||||
}) {
|
||||
Ok((is_blocking, fut)) => {
|
||||
let handle = if is_blocking {
|
||||
self.job_executor.spawn_specialized(fut)
|
||||
} else {
|
||||
self.job_executor.spawn_blocking(fut)
|
||||
|
@ -1121,6 +1159,73 @@ impl Account {
|
|||
self.hash
|
||||
}
|
||||
|
||||
pub fn fetch_batch(&mut self, env_hashes: EnvelopeHashBatch) -> Result<JobId> {
|
||||
debug!("account fetch_batch {:?}", &env_hashes);
|
||||
let job = self.backend.write().unwrap().fetch_batch(env_hashes)?;
|
||||
let handle = if self.backend_capabilities.is_async {
|
||||
self.job_executor.spawn_specialized(job)
|
||||
} else {
|
||||
self.job_executor.spawn_blocking(job)
|
||||
};
|
||||
let job_id = handle.job_id;
|
||||
self.insert_job(handle.job_id, JobRequest::FetchBatch { handle });
|
||||
Ok(job_id)
|
||||
}
|
||||
|
||||
pub fn load2(&mut self, mailbox_hash: MailboxHash) -> Result<Option<JobId>> {
|
||||
debug!("account load2({}", mailbox_hash);
|
||||
match self.mailbox_entries[&mailbox_hash].status {
|
||||
MailboxStatus::Available => Ok(None),
|
||||
MailboxStatus::Failed(ref err) => Err(err.clone()),
|
||||
MailboxStatus::Parsing(_, _) | MailboxStatus::None => {
|
||||
debug!("load2 find: ");
|
||||
if let Some(job_id) = self
|
||||
.active_jobs
|
||||
.iter()
|
||||
.find(|(id, j)| {
|
||||
debug!(id);
|
||||
debug!(j).is_load(mailbox_hash)
|
||||
})
|
||||
.map(|(j, _)| *j)
|
||||
{
|
||||
Ok(Some(job_id))
|
||||
} else {
|
||||
let mailbox_job = self.backend.write().unwrap().load(mailbox_hash);
|
||||
match mailbox_job {
|
||||
Ok(mailbox_job) => {
|
||||
let handle = if self.backend_capabilities.is_async {
|
||||
self.job_executor.spawn_specialized(mailbox_job)
|
||||
} else {
|
||||
self.job_executor.spawn_blocking(mailbox_job)
|
||||
};
|
||||
let job_id = handle.job_id;
|
||||
debug!("JobRequest::Load {} {:?}", mailbox_hash, handle.job_id);
|
||||
self.insert_job(
|
||||
handle.job_id,
|
||||
JobRequest::Load {
|
||||
mailbox_hash,
|
||||
handle,
|
||||
},
|
||||
);
|
||||
Ok(Some(job_id))
|
||||
}
|
||||
Err(err) => {
|
||||
self.mailbox_entries
|
||||
.entry(mailbox_hash)
|
||||
.and_modify(|entry| {
|
||||
entry.status = MailboxStatus::Failed(err.clone());
|
||||
});
|
||||
self.sender
|
||||
.send(ThreadEvent::UIEvent(UIEvent::StartupCheck(mailbox_hash)))
|
||||
.unwrap();
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn load(&mut self, mailbox_hash: MailboxHash) -> result::Result<(), usize> {
|
||||
if mailbox_hash == 0 {
|
||||
return Err(0);
|
||||
|
@ -1623,6 +1728,57 @@ impl Account {
|
|||
}
|
||||
}
|
||||
}
|
||||
JobRequest::Load {
|
||||
mailbox_hash,
|
||||
ref mut handle,
|
||||
..
|
||||
} => {
|
||||
debug!("got mailbox load for {}", mailbox_hash);
|
||||
match handle.chan.try_recv() {
|
||||
Err(_) => {
|
||||
/* canceled */
|
||||
return true;
|
||||
}
|
||||
Ok(None) => {
|
||||
return true;
|
||||
}
|
||||
Ok(Some(Ok(()))) => {
|
||||
self.mailbox_entries
|
||||
.entry(mailbox_hash)
|
||||
.and_modify(|entry| {
|
||||
entry.status = MailboxStatus::Available;
|
||||
});
|
||||
self.sender
|
||||
.send(ThreadEvent::UIEvent(UIEvent::MailboxUpdate((
|
||||
self.hash,
|
||||
mailbox_hash,
|
||||
))))
|
||||
.unwrap();
|
||||
return true;
|
||||
}
|
||||
Ok(Some(Err(err))) => {
|
||||
self.sender
|
||||
.send(ThreadEvent::UIEvent(UIEvent::Notification(
|
||||
Some(format!("{}: could not load mailbox", &self.name)),
|
||||
err.to_string(),
|
||||
Some(crate::types::NotificationType::Error(err.kind)),
|
||||
)))
|
||||
.expect("Could not send event on main channel");
|
||||
self.mailbox_entries
|
||||
.entry(mailbox_hash)
|
||||
.and_modify(|entry| {
|
||||
entry.status = MailboxStatus::Failed(err);
|
||||
});
|
||||
self.sender
|
||||
.send(ThreadEvent::UIEvent(UIEvent::MailboxUpdate((
|
||||
self.hash,
|
||||
mailbox_hash,
|
||||
))))
|
||||
.unwrap();
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
JobRequest::Fetch {
|
||||
mailbox_hash,
|
||||
ref mut handle,
|
||||
|
@ -2103,6 +2259,17 @@ impl Account {
|
|||
}
|
||||
}
|
||||
}
|
||||
JobRequest::FetchBatch { ref mut handle } => {
|
||||
if let Ok(Some(Err(err))) = handle.chan.try_recv() {
|
||||
self.sender
|
||||
.send(ThreadEvent::UIEvent(UIEvent::Notification(
|
||||
Some(format!("{}: envelope fetch failed", &self.name)),
|
||||
err.to_string(),
|
||||
Some(crate::types::NotificationType::Error(err.kind)),
|
||||
)))
|
||||
.expect("Could not send event on main channel");
|
||||
}
|
||||
}
|
||||
JobRequest::Watch { ref mut handle } => {
|
||||
debug!("JobRequest::Watch finished??? ");
|
||||
if let Ok(Some(Err(err))) = handle.chan.try_recv() {
|
||||
|
|
|
@ -393,7 +393,7 @@ impl State {
|
|||
.contains_key(&mailbox_hash)
|
||||
{
|
||||
if self.context.accounts[&account_hash]
|
||||
.load(mailbox_hash)
|
||||
.load2(mailbox_hash)
|
||||
.is_err()
|
||||
{
|
||||
self.context.replies.push_back(UIEvent::from(event));
|
||||
|
|
|
@ -285,6 +285,13 @@ pub mod segment_tree {
|
|||
max
|
||||
}
|
||||
|
||||
pub fn get(&self, index: usize) -> u8 {
|
||||
if self.array.is_empty() {
|
||||
return 0;
|
||||
}
|
||||
self.array[index]
|
||||
}
|
||||
|
||||
pub fn update(&mut self, pos: usize, value: u8) {
|
||||
let mut ctr = pos + self.array.len();
|
||||
|
||||
|
|
Loading…
Reference in New Issue