Compare commits

...

2 Commits

Author SHA1 Message Date
Manos Pitsidianakis 77e4488637
lazy_fetch WIP 2021-01-15 19:13:34 +02:00
Manos Pitsidianakis 819d993f11
melib/backends: replace watch() with watcher(), BackendWatcher trait
```
Return a [`Box<dyn BackendWatcher>`](BackendWatcher), to which you can
register the mailboxes you are interested in for updates and then
consume to spawn a watching `Future`.  The `Future` sends events to the
[`BackendEventConsumer`](BackendEventConsumer) supplied to the backend
in its constructor method.
```

Watching mailboxes for updates is more flexible now that you can
explicitly register mailboxes and set the polling period.
2021-01-15 19:12:09 +02:00
21 changed files with 6099 additions and 1213 deletions

View File

@ -310,13 +310,26 @@ pub trait MailBackend: ::std::fmt::Debug + Send + Sync {
Ok(Box::pin(async { Ok(()) }))
}
fn fetch_batch(&mut self, env_hashes: EnvelopeHashBatch) -> ResultFuture<()> {
Err(MeliError::new("Unimplemented."))
}
fn load(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()> {
Err(MeliError::new("Unimplemented."))
}
fn fetch(
&mut self,
mailbox_hash: MailboxHash,
) -> Result<Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>>;
fn refresh(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()>;
fn watch(&self) -> ResultFuture<()>;
/// Return a [`Box<dyn BackendWatcher>`](BackendWatcher), to which you can register the
/// mailboxes you are interested in for updates and then consume to spawn a watching `Future`.
/// The `Future` sends events to the [`BackendEventConsumer`](BackendEventConsumer) supplied to
/// the backend in its constructor method.
fn watcher(&self) -> Result<Box<dyn BackendWatcher>>;
fn mailboxes(&self) -> ResultFuture<HashMap<MailboxHash, Mailbox>>;
fn operation(&self, hash: EnvelopeHash) -> Result<Box<dyn BackendOp>>;
@ -623,7 +636,7 @@ impl EnvelopeHashBatch {
#[derive(Default, Clone)]
pub struct LazyCountSet {
not_yet_seen: usize,
set: BTreeSet<EnvelopeHash>,
pub set: BTreeSet<EnvelopeHash>,
}
impl fmt::Debug for LazyCountSet {
@ -711,3 +724,31 @@ impl std::ops::Deref for IsSubscribedFn {
&self.0
}
}
/// Urgency for the events of a single Mailbox.
///
/// A hint for how eagerly a registered mailbox should be watched for
/// updates. NOTE(review): implementors in this changeset currently ignore
/// it (`ImapWatcher::register_mailbox` discards the `urgency` argument) —
/// semantics still to be pinned down.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum MailboxWatchUrgency {
    /// Highest urgency.
    High,
    /// Default urgency.
    Medium,
    /// Lowest urgency.
    Low,
}
/// Register the mailboxes you are interested in for updates and then consume with `spawn` to spawn
/// a watching `Future`. The `Future` sends events to the
/// [`BackendEventConsumer`](backends::BackendEventConsumer) supplied to the backend in its constructor
/// method.
pub trait BackendWatcher: ::std::fmt::Debug + Send + Sync {
    /// Whether the watcher's `Future` requires blocking I/O.
    fn is_blocking(&self) -> bool;
    /// Add `mailbox_hash` to the set of mailboxes watched for updates.
    ///
    /// `urgency` is a hint for how eagerly the mailbox should be watched;
    /// implementations may ignore it.
    fn register_mailbox(
        &mut self,
        mailbox_hash: MailboxHash,
        urgency: MailboxWatchUrgency,
    ) -> Result<()>;
    /// Set the interval used when the watcher falls back to periodic
    /// polling. NOTE(review): the IMAP implementation treats `None` as
    /// "leave the current period unchanged" — confirm that is the intended
    /// trait-wide contract.
    fn set_polling_period(&mut self, period: Option<std::time::Duration>) -> Result<()>;
    /// Consume the watcher and return the watching `Future`.
    fn spawn(self: Box<Self>) -> ResultFuture<()>;
    /// Use the [`Any`](std::any::Any) trait to get the underlying type implementing the
    /// [`BackendWatcher`](backends::BackendWatcher) trait.
    fn as_any(&self) -> &dyn Any;
    /// Mutable counterpart of [`BackendWatcher::as_any`].
    fn as_any_mut(&mut self) -> &mut dyn Any;
}

View File

@ -370,13 +370,13 @@ impl MailBackend for ImapType {
let main_conn = self.connection.clone();
let uid_store = self.uid_store.clone();
Ok(Box::pin(async move {
let inbox = timeout(uid_store.timeout, uid_store.mailboxes.lock())
let mut inbox = timeout(uid_store.timeout, uid_store.mailboxes.lock())
.await?
.get(&mailbox_hash)
.map(std::clone::Clone::clone)
.unwrap();
let mut conn = timeout(uid_store.timeout, main_conn.lock()).await?;
watch::examine_updates(inbox, &mut conn, &uid_store).await?;
watch::ImapWatcher::examine_updates(&mut inbox, &mut conn, &uid_store).await?;
Ok(())
}))
}
@ -450,68 +450,16 @@ impl MailBackend for ImapType {
}))
}
fn watch(&self) -> ResultFuture<()> {
fn watcher(&self) -> Result<Box<dyn BackendWatcher>> {
let server_conf = self.server_conf.clone();
let main_conn = self.connection.clone();
let uid_store = self.uid_store.clone();
Ok(Box::pin(async move {
let has_idle: bool = match server_conf.protocol {
ImapProtocol::IMAP {
extension_use: ImapExtensionUse { idle, .. },
} => {
idle && uid_store
.capabilities
.lock()
.unwrap()
.iter()
.any(|cap| cap.eq_ignore_ascii_case(b"IDLE"))
}
_ => false,
};
while let Err(err) = if has_idle {
idle(ImapWatchKit {
conn: ImapConnection::new_connection(&server_conf, uid_store.clone()),
main_conn: main_conn.clone(),
uid_store: uid_store.clone(),
})
.await
} else {
poll_with_examine(ImapWatchKit {
conn: ImapConnection::new_connection(&server_conf, uid_store.clone()),
main_conn: main_conn.clone(),
uid_store: uid_store.clone(),
})
.await
} {
let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?;
if err.kind.is_network() {
uid_store.is_online.lock().unwrap().1 = Err(err.clone());
} else {
return Err(err);
}
debug!("Watch failure: {}", err.to_string());
match timeout(uid_store.timeout, main_conn_lck.connect())
.await
.and_then(|res| res)
{
Err(err2) => {
debug!("Watch reconnect attempt failed: {}", err2.to_string());
}
Ok(()) => {
debug!("Watch reconnect attempt succesful");
continue;
}
}
let account_hash = uid_store.account_hash;
main_conn_lck.add_refresh_event(RefreshEvent {
account_hash,
mailbox_hash: 0,
kind: RefreshEventKind::Failure(err.clone()),
});
return Err(err);
}
debug!("watch future returning");
Ok(())
Ok(Box::new(ImapWatcher {
main_conn,
uid_store,
mailbox_hashes: BTreeSet::default(),
polling_period: std::time::Duration::from_secs(5 * 60),
server_conf,
}))
}

View File

@ -588,7 +588,9 @@ impl ImapConnection {
pub fn connect<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
Box::pin(async move {
if let (time, ref mut status @ Ok(())) = *self.uid_store.is_online.lock().unwrap() {
if SystemTime::now().duration_since(time).unwrap_or_default() >= IMAP_PROTOCOL_TIMEOUT {
if SystemTime::now().duration_since(time).unwrap_or_default()
>= IMAP_PROTOCOL_TIMEOUT
{
let err = MeliError::new("Connection timed out").set_kind(ErrorKind::Timeout);
*status = Err(err.clone());
self.stream = Err(err);

View File

@ -23,201 +23,129 @@ use crate::backends::SpecialUsageMailbox;
use std::sync::Arc;
/// Arguments for IMAP watching functions
pub struct ImapWatchKit {
pub conn: ImapConnection,
#[derive(Debug)]
pub struct ImapWatcher {
pub main_conn: Arc<FutureMutex<ImapConnection>>,
pub uid_store: Arc<UIDStore>,
pub mailbox_hashes: BTreeSet<MailboxHash>,
pub polling_period: std::time::Duration,
pub server_conf: ImapServerConf,
}
pub async fn poll_with_examine(kit: ImapWatchKit) -> Result<()> {
debug!("poll with examine");
let ImapWatchKit {
mut conn,
main_conn: _,
uid_store,
} = kit;
conn.connect().await?;
let mailboxes: HashMap<MailboxHash, ImapMailbox> = {
let mailboxes_lck = timeout(uid_store.timeout, uid_store.mailboxes.lock()).await?;
mailboxes_lck.clone()
};
loop {
for (_, mailbox) in mailboxes.clone() {
examine_updates(mailbox, &mut conn, &uid_store).await?;
}
//FIXME: make sleep duration configurable
smol::Timer::after(std::time::Duration::from_secs(3 * 60)).await;
impl BackendWatcher for ImapWatcher {
fn is_blocking(&self) -> bool {
false
}
}
pub async fn idle(kit: ImapWatchKit) -> Result<()> {
debug!("IDLE");
/* IDLE only watches the connection's selected mailbox. We will IDLE on INBOX and every ~5
* minutes wake up and poll the others */
let ImapWatchKit {
mut conn,
main_conn,
uid_store,
} = kit;
conn.connect().await?;
let mailbox: ImapMailbox = match uid_store
.mailboxes
.lock()
.await
.values()
.find(|f| f.parent.is_none() && (f.special_usage() == SpecialUsageMailbox::Inbox))
.map(std::clone::Clone::clone)
{
Some(mailbox) => mailbox,
None => {
return Err(MeliError::new("INBOX mailbox not found in local mailbox index. meli may have not parsed the IMAP mailboxes correctly"));
fn register_mailbox(
&mut self,
mailbox_hash: MailboxHash,
_urgency: MailboxWatchUrgency,
) -> Result<()> {
self.mailbox_hashes.insert(mailbox_hash);
Ok(())
}
fn set_polling_period(&mut self, period: Option<std::time::Duration>) -> Result<()> {
if let Some(period) = period {
self.polling_period = period;
}
};
let mailbox_hash = mailbox.hash();
let mut response = Vec::with_capacity(8 * 1024);
let select_response = conn
.examine_mailbox(mailbox_hash, &mut response, true)
.await?
.unwrap();
{
let mut uidvalidities = uid_store.uidvalidity.lock().unwrap();
Ok(())
}
if let Some(v) = uidvalidities.get(&mailbox_hash) {
if *v != select_response.uidvalidity {
if uid_store.keep_offline_cache {
#[cfg(not(feature = "sqlite3"))]
let mut cache_handle = super::cache::DefaultCache::get(uid_store.clone())?;
#[cfg(feature = "sqlite3")]
let mut cache_handle = super::cache::Sqlite3Cache::get(uid_store.clone())?;
cache_handle.clear(mailbox_hash, &select_response)?;
fn spawn(mut self: Box<Self>) -> ResultFuture<()> {
Ok(Box::pin(async move {
let has_idle: bool = match self.server_conf.protocol {
ImapProtocol::IMAP {
extension_use: ImapExtensionUse { idle, .. },
} => {
idle && self
.uid_store
.capabilities
.lock()
.unwrap()
.iter()
.any(|cap| cap.eq_ignore_ascii_case(b"IDLE"))
}
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: RefreshEventKind::Rescan,
_ => false,
};
while let Err(err) = if has_idle {
self.idle().await
} else {
self.poll_with_examine().await
} {
let mut main_conn_lck =
timeout(self.uid_store.timeout, self.main_conn.lock()).await?;
if err.kind.is_network() {
self.uid_store.is_online.lock().unwrap().1 = Err(err.clone());
} else {
return Err(err);
}
debug!("Watch failure: {}", err.to_string());
match timeout(self.uid_store.timeout, main_conn_lck.connect())
.await
.and_then(|res| res)
{
Err(err2) => {
debug!("Watch reconnect attempt failed: {}", err2.to_string());
}
Ok(()) => {
debug!("Watch reconnect attempt succesful");
continue;
}
}
let account_hash = self.uid_store.account_hash;
main_conn_lck.add_refresh_event(RefreshEvent {
account_hash,
mailbox_hash: 0,
kind: RefreshEventKind::Failure(err.clone()),
});
/*
uid_store.uid_index.lock().unwrap().clear();
uid_store.hash_index.lock().unwrap().clear();
uid_store.byte_cache.lock().unwrap().clear();
*/
return Err(err);
}
} else {
uidvalidities.insert(mailbox_hash, select_response.uidvalidity);
}
debug!("watch future returning");
Ok(())
}))
}
let mailboxes: HashMap<MailboxHash, ImapMailbox> = {
let mailboxes_lck = timeout(uid_store.timeout, uid_store.mailboxes.lock()).await?;
mailboxes_lck.clone()
};
for (h, mailbox) in mailboxes.clone() {
if mailbox_hash == h {
continue;
}
examine_updates(mailbox, &mut conn, &uid_store).await?;
fn as_any(&self) -> &dyn Any {
self
}
conn.send_command(b"IDLE").await?;
let mut blockn = ImapBlockingConnection::from(conn);
let mut watch = std::time::Instant::now();
/* duration interval to send heartbeat */
const _10_MINS: std::time::Duration = std::time::Duration::from_secs(10 * 60);
/* duration interval to check other mailboxes for changes */
const _5_MINS: std::time::Duration = std::time::Duration::from_secs(5 * 60);
loop {
let line = match timeout(Some(_10_MINS), blockn.as_stream()).await {
Ok(Some(line)) => line,
Ok(None) => {
debug!("IDLE connection dropped: {:?}", &blockn.err());
blockn.conn.connect().await?;
let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?;
main_conn_lck.connect().await?;
continue;
}
Err(_) => {
/* Timeout */
blockn.conn.send_raw(b"DONE").await?;
blockn
.conn
.read_response(&mut response, RequiredResponses::empty())
.await?;
blockn.conn.send_command(b"IDLE").await?;
let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?;
main_conn_lck.connect().await?;
continue;
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
}
impl ImapWatcher {
pub async fn idle(&mut self) -> Result<()> {
debug!("IDLE");
/* IDLE only watches the connection's selected mailbox. We will IDLE on INBOX and every X
* minutes wake up and poll the others */
let ImapWatcher {
ref main_conn,
ref uid_store,
ref mailbox_hashes,
ref polling_period,
ref server_conf,
..
} = self;
let mut connection = ImapConnection::new_connection(server_conf, uid_store.clone());
connection.connect().await?;
let mailbox_hash: MailboxHash = match uid_store
.mailboxes
.lock()
.await
.values()
.find(|f| f.parent.is_none() && (f.special_usage() == SpecialUsageMailbox::Inbox))
.map(|f| f.hash)
{
Some(h) => h,
None => {
return Err(MeliError::new("INBOX mailbox not found in local mailbox index. meli may have not parsed the IMAP mailboxes correctly"));
}
};
let now = std::time::Instant::now();
if now.duration_since(watch) >= _5_MINS {
/* Time to poll all inboxes */
let mut conn = timeout(uid_store.timeout, main_conn.lock()).await?;
for (_h, mailbox) in mailboxes.clone() {
examine_updates(mailbox, &mut conn, &uid_store).await?;
}
watch = now;
}
if line
.split_rn()
.filter(|l| {
!l.starts_with(b"+ ")
&& !l.starts_with(b"* ok")
&& !l.starts_with(b"* ok")
&& !l.starts_with(b"* Ok")
&& !l.starts_with(b"* OK")
})
.count()
== 0
{
continue;
}
{
blockn.conn.send_raw(b"DONE").await?;
blockn
.conn
.read_response(&mut response, RequiredResponses::empty())
.await?;
for l in line.split_rn().chain(response.split_rn()) {
debug!("process_untagged {:?}", &l);
if l.starts_with(b"+ ")
|| l.starts_with(b"* ok")
|| l.starts_with(b"* ok")
|| l.starts_with(b"* Ok")
|| l.starts_with(b"* OK")
{
debug!("ignore continuation mark");
continue;
}
blockn.conn.process_untagged(l).await?;
}
blockn.conn.send_command(b"IDLE").await?;
}
}
}
pub async fn examine_updates(
mailbox: ImapMailbox,
conn: &mut ImapConnection,
uid_store: &Arc<UIDStore>,
) -> Result<()> {
if mailbox.no_select {
return Ok(());
}
let mailbox_hash = mailbox.hash();
debug!("examining mailbox {} {}", mailbox_hash, mailbox.path());
if let Some(new_envelopes) = conn.resync(mailbox_hash).await? {
for env in new_envelopes {
conn.add_refresh_event(RefreshEvent {
mailbox_hash,
account_hash: uid_store.account_hash,
kind: RefreshEventKind::Create(Box::new(env)),
});
}
} else {
#[cfg(not(feature = "sqlite3"))]
let mut cache_handle = super::cache::DefaultCache::get(uid_store.clone())?;
#[cfg(feature = "sqlite3")]
let mut cache_handle = super::cache::Sqlite3Cache::get(uid_store.clone())?;
let mut response = Vec::with_capacity(8 * 1024);
let select_response = conn
let select_response = connection
.examine_mailbox(mailbox_hash, &mut response, true)
.await?
.unwrap();
@ -227,9 +155,13 @@ pub async fn examine_updates(
if let Some(v) = uidvalidities.get(&mailbox_hash) {
if *v != select_response.uidvalidity {
if uid_store.keep_offline_cache {
#[cfg(not(feature = "sqlite3"))]
let mut cache_handle = super::cache::DefaultCache::get(uid_store.clone())?;
#[cfg(feature = "sqlite3")]
let mut cache_handle = super::cache::Sqlite3Cache::get(uid_store.clone())?;
cache_handle.clear(mailbox_hash, &select_response)?;
}
conn.add_refresh_event(RefreshEvent {
connection.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: RefreshEventKind::Rescan,
@ -239,215 +171,384 @@ pub async fn examine_updates(
uid_store.hash_index.lock().unwrap().clear();
uid_store.byte_cache.lock().unwrap().clear();
*/
return Ok(());
}
} else {
uidvalidities.insert(mailbox_hash, select_response.uidvalidity);
}
}
if mailbox.is_cold() {
/* Mailbox hasn't been loaded yet */
let has_list_status: bool = conn
.uid_store
.capabilities
.lock()
.unwrap()
.iter()
.any(|cap| cap.eq_ignore_ascii_case(b"LIST-STATUS"));
if has_list_status {
conn.send_command(
format!(
"LIST \"{}\" \"\" RETURN (STATUS (MESSAGES UNSEEN))",
mailbox.imap_path()
)
.as_bytes(),
)
.await?;
conn.read_response(
&mut response,
RequiredResponses::LIST_REQUIRED | RequiredResponses::STATUS,
)
.await?;
debug!(
"list return status out: {}",
String::from_utf8_lossy(&response)
);
for l in response.split_rn() {
if !l.starts_with(b"*") {
continue;
}
if let Ok(status) = protocol_parser::status_response(&l).map(|(_, v)| v) {
if Some(mailbox_hash) == status.mailbox {
if let Some(total) = status.messages {
if let Ok(mut exists_lck) = mailbox.exists.lock() {
exists_lck.clear();
exists_lck.set_not_yet_seen(total);
}
}
if let Some(total) = status.unseen {
if let Ok(mut unseen_lck) = mailbox.unseen.lock() {
unseen_lck.clear();
unseen_lck.set_not_yet_seen(total);
}
}
break;
}
}
}
} else {
conn.send_command(b"SEARCH UNSEEN").await?;
conn.read_response(&mut response, RequiredResponses::SEARCH)
.await?;
let unseen_count = protocol_parser::search_results(&response)?.1.len();
if let Ok(mut exists_lck) = mailbox.exists.lock() {
exists_lck.clear();
exists_lck.set_not_yet_seen(select_response.exists);
}
if let Ok(mut unseen_lck) = mailbox.unseen.lock() {
unseen_lck.clear();
unseen_lck.set_not_yet_seen(unseen_count);
}
}
mailbox.set_warm(true);
return Ok(());
}
if select_response.recent > 0 {
/* UID SEARCH RECENT */
conn.send_command(b"UID SEARCH RECENT").await?;
conn.read_response(&mut response, RequiredResponses::SEARCH)
.await?;
let v = protocol_parser::search_results(response.as_slice()).map(|(_, v)| v)?;
if v.is_empty() {
debug!(
"search response was empty: {}",
String::from_utf8_lossy(&response)
);
return Ok(());
}
let mut cmd = "UID FETCH ".to_string();
if v.len() == 1 {
cmd.push_str(&v[0].to_string());
} else {
cmd.push_str(&v[0].to_string());
for n in v.into_iter().skip(1) {
cmd.push(',');
cmd.push_str(&n.to_string());
}
}
cmd.push_str(
" (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] BODYSTRUCTURE)",
);
conn.send_command(cmd.as_bytes()).await?;
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await?;
} else if select_response.exists > mailbox.exists.lock().unwrap().len() {
conn.send_command(
format!(
"FETCH {}:* (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] BODYSTRUCTURE)",
std::cmp::max(mailbox.exists.lock().unwrap().len(), 1)
)
.as_bytes(),
)
.await?;
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await?;
} else {
return Ok(());
}
debug!(
"fetch response is {} bytes and {} lines",
response.len(),
String::from_utf8_lossy(&response).lines().count()
);
let (_, mut v, _) = protocol_parser::fetch_responses(&response)?;
debug!("responses len is {}", v.len());
for FetchResponse {
ref uid,
ref mut envelope,
ref mut flags,
ref references,
..
} in v.iter_mut()
{
let uid = uid.unwrap();
let env = envelope.as_mut().unwrap();
env.set_hash(generate_envelope_hash(&mailbox.imap_path(), &uid));
if let Some(value) = references {
env.set_references(value);
}
let mut tag_lck = uid_store.collection.tag_index.write().unwrap();
if let Some((flags, keywords)) = flags {
env.set_flags(*flags);
if !env.is_seen() {
mailbox.unseen.lock().unwrap().insert_new(env.hash());
}
mailbox.exists.lock().unwrap().insert_new(env.hash());
for f in keywords {
let hash = tag_hash!(f);
if !tag_lck.contains_key(&hash) {
tag_lck.insert(hash, f.to_string());
}
env.labels_mut().push(hash);
}
}
}
if uid_store.keep_offline_cache {
if !cache_handle.mailbox_state(mailbox_hash)?.is_none() {
cache_handle
.insert_envelopes(mailbox_hash, &v)
.chain_err_summary(|| {
format!(
"Could not save envelopes in cache for mailbox {}",
mailbox.imap_path()
)
})?;
}
}
for FetchResponse { uid, envelope, .. } in v {
if uid.is_none() || envelope.is_none() {
let mailboxes: HashMap<MailboxHash, ImapMailbox> = {
let mailboxes_lck = timeout(uid_store.timeout, uid_store.mailboxes.lock()).await?;
let mut ret = mailboxes_lck.clone();
ret.retain(|k, _| mailbox_hashes.contains(k));
ret
};
for (h, mailbox) in mailboxes.iter() {
if mailbox_hash == *h {
continue;
}
let uid = uid.unwrap();
if uid_store
.uid_index
.lock()
.unwrap()
.contains_key(&(mailbox_hash, uid))
Self::examine_updates(mailbox, &mut connection, &uid_store).await?;
}
connection.send_command(b"IDLE").await?;
let mut blockn = ImapBlockingConnection::from(connection);
let mut watch = std::time::Instant::now();
/* duration interval to send heartbeat */
const _10_MINS: std::time::Duration = std::time::Duration::from_secs(10 * 60);
/* duration interval to check other mailboxes for changes */
loop {
let line = match timeout(
Some(std::cmp::min(*polling_period, _10_MINS)),
blockn.as_stream(),
)
.await
{
Ok(Some(line)) => line,
Ok(None) => {
debug!("IDLE connection dropped: {:?}", &blockn.err());
blockn.conn.connect().await?;
let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?;
main_conn_lck.connect().await?;
continue;
}
Err(_) => {
/* Timeout */
blockn.conn.send_raw(b"DONE").await?;
blockn
.conn
.read_response(&mut response, RequiredResponses::empty())
.await?;
blockn.conn.send_command(b"IDLE").await?;
let mut main_conn_lck = timeout(uid_store.timeout, main_conn.lock()).await?;
main_conn_lck.connect().await?;
continue;
}
};
let now = std::time::Instant::now();
if now.duration_since(watch) >= *polling_period {
/* Time to poll all inboxes */
let mut conn = timeout(uid_store.timeout, main_conn.lock()).await?;
for (_h, mailbox) in mailboxes.iter() {
Self::examine_updates(mailbox, &mut conn, &uid_store).await?;
}
watch = now;
}
if line
.split_rn()
.filter(|l| {
!l.starts_with(b"+ ")
&& !l.starts_with(b"* ok")
&& !l.starts_with(b"* ok")
&& !l.starts_with(b"* Ok")
&& !l.starts_with(b"* OK")
})
.count()
== 0
{
continue;
}
let env = envelope.unwrap();
debug!(
"Create event {} {} {}",
env.hash(),
env.subject(),
mailbox.path(),
);
uid_store
.msn_index
.lock()
.unwrap()
.entry(mailbox_hash)
.or_default()
.push(uid);
uid_store
.hash_index
.lock()
.unwrap()
.insert(env.hash(), (uid, mailbox_hash));
uid_store
.uid_index
.lock()
.unwrap()
.insert((mailbox_hash, uid), env.hash());
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
});
{
blockn.conn.send_raw(b"DONE").await?;
blockn
.conn
.read_response(&mut response, RequiredResponses::empty())
.await?;
for l in line.split_rn().chain(response.split_rn()) {
debug!("process_untagged {:?}", String::from_utf8_lossy(&l));
if l.starts_with(b"+ ")
|| l.starts_with(b"* ok")
|| l.starts_with(b"* ok")
|| l.starts_with(b"* Ok")
|| l.starts_with(b"* OK")
{
debug!("ignore continuation mark");
continue;
}
blockn.conn.process_untagged(l).await?;
}
blockn.conn.send_command(b"IDLE").await?;
}
}
}
Ok(())
/// Poll every registered mailbox for changes on a fixed interval.
///
/// Opens a dedicated connection and repeatedly runs
/// [`Self::examine_updates`] over the watched mailboxes, sleeping
/// `polling_period` between rounds. The loop never exits normally; the
/// function only returns early when an operation fails (via `?`).
pub async fn poll_with_examine(&mut self) -> Result<()> {
    debug!("poll with examine");
    // Use a fresh connection for polling instead of the shared main one.
    let mut conn = ImapConnection::new_connection(&self.server_conf, self.uid_store.clone());
    conn.connect().await?;
    // Snapshot the watched subset of mailboxes once, up front.
    let watched: HashMap<MailboxHash, ImapMailbox> = {
        let mailboxes_lck =
            timeout(self.uid_store.timeout, self.uid_store.mailboxes.lock()).await?;
        let mut subset = mailboxes_lck.clone();
        subset.retain(|k, _| self.mailbox_hashes.contains(k));
        subset
    };
    loop {
        for mailbox in watched.values() {
            Self::examine_updates(mailbox, &mut conn, &self.uid_store).await?;
        }
        crate::connections::sleep(self.polling_period).await;
    }
}
/// Check `mailbox` for new messages and state changes over `conn`, emitting
/// `RefreshEvent`s (Create/Rescan) for anything found.
///
/// Fast path: if `conn.resync` can produce new envelopes from cached state,
/// only Create events are emitted. Otherwise the mailbox is EXAMINEd and
/// compared against the stored UIDVALIDITY, unseen/exists counters and UID
/// index.
pub async fn examine_updates(
mailbox: &ImapMailbox,
conn: &mut ImapConnection,
uid_store: &Arc<UIDStore>,
) -> Result<()> {
// Non-selectable mailboxes (\Noselect) cannot be EXAMINEd.
if mailbox.no_select {
return Ok(());
}
let mailbox_hash = mailbox.hash();
debug!("examining mailbox {} {}", mailbox_hash, mailbox.path());
// Fast path: resync from the offline cache; on success just forward the
// new envelopes as Create events.
if let Some(new_envelopes) = conn.resync(mailbox_hash).await? {
for env in new_envelopes {
conn.add_refresh_event(RefreshEvent {
mailbox_hash,
account_hash: uid_store.account_hash,
kind: RefreshEventKind::Create(Box::new(env)),
});
}
} else {
// Slow path: EXAMINE the mailbox and diff against local state.
#[cfg(not(feature = "sqlite3"))]
let mut cache_handle = super::cache::DefaultCache::get(uid_store.clone())?;
#[cfg(feature = "sqlite3")]
let mut cache_handle = super::cache::Sqlite3Cache::get(uid_store.clone())?;
let mut response = Vec::with_capacity(8 * 1024);
let select_response = conn
.examine_mailbox(mailbox_hash, &mut response, true)
.await?
.unwrap();
{
// If UIDVALIDITY changed, all cached UIDs are invalid: clear the
// cache (if enabled) and ask the frontend for a full Rescan.
let mut uidvalidities = uid_store.uidvalidity.lock().unwrap();
if let Some(v) = uidvalidities.get(&mailbox_hash) {
if *v != select_response.uidvalidity {
if uid_store.keep_offline_cache {
cache_handle.clear(mailbox_hash, &select_response)?;
}
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: RefreshEventKind::Rescan,
});
/*
uid_store.uid_index.lock().unwrap().clear();
uid_store.hash_index.lock().unwrap().clear();
uid_store.byte_cache.lock().unwrap().clear();
*/
return Ok(());
}
} else {
// First time seeing this mailbox: record its UIDVALIDITY.
uidvalidities.insert(mailbox_hash, select_response.uidvalidity);
}
}
if mailbox.is_cold() {
/* Mailbox hasn't been loaded yet */
// For a cold (never fetched) mailbox only update the message/unseen
// counters, cheaply via LIST-STATUS when the server supports it,
// otherwise via SEARCH UNSEEN + the EXAMINE exists count.
let has_list_status: bool = conn
.uid_store
.capabilities
.lock()
.unwrap()
.iter()
.any(|cap| cap.eq_ignore_ascii_case(b"LIST-STATUS"));
if has_list_status {
conn.send_command(
format!(
"LIST \"{}\" \"\" RETURN (STATUS (MESSAGES UNSEEN))",
mailbox.imap_path()
)
.as_bytes(),
)
.await?;
conn.read_response(
&mut response,
RequiredResponses::LIST_REQUIRED | RequiredResponses::STATUS,
)
.await?;
debug!(
"list return status out: {}",
String::from_utf8_lossy(&response)
);
for l in response.split_rn() {
if !l.starts_with(b"*") {
continue;
}
if let Ok(status) = protocol_parser::status_response(&l).map(|(_, v)| v) {
if Some(mailbox_hash) == status.mailbox {
if let Some(total) = status.messages {
if let Ok(mut exists_lck) = mailbox.exists.lock() {
exists_lck.clear();
exists_lck.set_not_yet_seen(total);
}
}
if let Some(total) = status.unseen {
if let Ok(mut unseen_lck) = mailbox.unseen.lock() {
unseen_lck.clear();
unseen_lck.set_not_yet_seen(total);
}
}
break;
}
}
}
} else {
conn.send_command(b"SEARCH UNSEEN").await?;
conn.read_response(&mut response, RequiredResponses::SEARCH)
.await?;
let unseen_count = protocol_parser::search_results(&response)?.1.len();
if let Ok(mut exists_lck) = mailbox.exists.lock() {
exists_lck.clear();
exists_lck.set_not_yet_seen(select_response.exists);
}
if let Ok(mut unseen_lck) = mailbox.unseen.lock() {
unseen_lck.clear();
unseen_lck.set_not_yet_seen(unseen_count);
}
}
mailbox.set_warm(true);
return Ok(());
}
// Warm mailbox: fetch whatever looks new — RECENT messages if any,
// otherwise the tail beyond the locally known exists count.
if select_response.recent > 0 {
/* UID SEARCH RECENT */
conn.send_command(b"UID SEARCH RECENT").await?;
conn.read_response(&mut response, RequiredResponses::SEARCH)
.await?;
let v = protocol_parser::search_results(response.as_slice()).map(|(_, v)| v)?;
if v.is_empty() {
debug!(
"search response was empty: {}",
String::from_utf8_lossy(&response)
);
return Ok(());
}
// Build "UID FETCH uid1,uid2,… (…)" for the RECENT uids.
let mut cmd = "UID FETCH ".to_string();
if v.len() == 1 {
cmd.push_str(&v[0].to_string());
} else {
cmd.push_str(&v[0].to_string());
for n in v.into_iter().skip(1) {
cmd.push(',');
cmd.push_str(&n.to_string());
}
}
cmd.push_str(
" (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] BODYSTRUCTURE)",
);
conn.send_command(cmd.as_bytes()).await?;
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await?;
} else if select_response.exists > mailbox.exists.lock().unwrap().len() {
// Server has more messages than we know about: fetch from the last
// known sequence number onwards (minimum 1).
conn.send_command(
format!(
"FETCH {}:* (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] BODYSTRUCTURE)",
std::cmp::max(mailbox.exists.lock().unwrap().len(), 1)
)
.as_bytes(),
)
.await?;
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await?;
} else {
// Nothing new.
return Ok(());
}
debug!(
"fetch response is {} bytes and {} lines",
response.len(),
String::from_utf8_lossy(&response).lines().count()
);
let (_, mut v, _) = protocol_parser::fetch_responses(&response)?;
debug!("responses len is {}", v.len());
// First pass: hash the envelopes, apply flags/keywords and update the
// exists/unseen counters and the tag index.
for FetchResponse {
ref uid,
ref mut envelope,
ref mut flags,
ref references,
..
} in v.iter_mut()
{
let uid = uid.unwrap();
let env = envelope.as_mut().unwrap();
env.set_hash(generate_envelope_hash(&mailbox.imap_path(), &uid));
if let Some(value) = references {
env.set_references(value);
}
let mut tag_lck = uid_store.collection.tag_index.write().unwrap();
if let Some((flags, keywords)) = flags {
env.set_flags(*flags);
if !env.is_seen() {
mailbox.unseen.lock().unwrap().insert_new(env.hash());
}
mailbox.exists.lock().unwrap().insert_new(env.hash());
for f in keywords {
let hash = tag_hash!(f);
if !tag_lck.contains_key(&hash) {
tag_lck.insert(hash, f.to_string());
}
env.labels_mut().push(hash);
}
}
}
// Persist the fetched envelopes to the offline cache, but only for
// mailboxes that already have cached state.
if uid_store.keep_offline_cache {
if !cache_handle.mailbox_state(mailbox_hash)?.is_none() {
cache_handle
.insert_envelopes(mailbox_hash, &v)
.chain_err_summary(|| {
format!(
"Could not save envelopes in cache for mailbox {}",
mailbox.imap_path()
)
})?;
}
}
// Second pass: register genuinely new envelopes in the UID/hash/MSN
// indexes and emit Create events; skip uids we already track.
for FetchResponse { uid, envelope, .. } in v {
if uid.is_none() || envelope.is_none() {
continue;
}
let uid = uid.unwrap();
if uid_store
.uid_index
.lock()
.unwrap()
.contains_key(&(mailbox_hash, uid))
{
continue;
}
let env = envelope.unwrap();
debug!(
"Create event {} {} {}",
env.hash(),
env.subject(),
mailbox.path(),
);
uid_store
.msn_index
.lock()
.unwrap()
.entry(mailbox_hash)
.or_default()
.push(uid);
uid_store
.hash_index
.lock()
.unwrap()
.insert(env.hash(), (uid, mailbox_hash));
uid_store
.uid_index
.lock()
.unwrap()
.insert((mailbox_hash, uid), env.hash());
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
});
}
}
Ok(())
}
}

View File

@ -90,6 +90,8 @@ use objects::*;
pub mod mailbox;
use mailbox::*;
pub mod watch;
#[derive(Debug, Default)]
pub struct EnvelopeCache {
bytes: Option<String>,
@ -313,6 +315,147 @@ impl MailBackend for JmapType {
}))
}
/// Load the contents of `mailbox_hash` via a JMAP `Email/query` +
/// `Email/get` request pair; the `Email/get` call back-references the
/// query's result ids, then the returned envelopes are added to the store
/// and the mailbox counters updated.
///
/// NOTE(review): WIP ("lazy_fetch WIP" commit) — this body does not
/// compile as written:
/// * `conn` is used throughout but never bound; `connection` is cloned
///   into the future and presumably needs to be locked into a `conn`.
/// * `collection` (used at the end) is not in scope here.
/// * `list.into_iter(|obj| …)` passes a closure to `into_iter`; probably
///   `.into_iter().map(|obj| …)` was intended.
fn load(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()> {
let store = self.store.clone();
let connection = self.connection.clone();
Ok(Box::pin(async move {
{
// NOTE(review): artificial 2s delay — debugging leftover? Confirm
// before merging.
crate::connections::sleep(std::time::Duration::from_secs(2)).await;
}
let mailbox_id = store.mailboxes.read().unwrap()[&mailbox_hash].id.clone();
// Email/query: all messages in this mailbox, with a minimal property
// set (ids, dates, threading headers, keywords).
let email_query_call: EmailQuery = EmailQuery::new(
Query::new()
.account_id(conn.mail_account_id().clone())
.filter(Some(Filter::Condition(
EmailFilterCondition::new()
.in_mailbox(Some(mailbox_id))
.into(),
)))
.position(0)
.properties(Some(vec![
"id".to_string(),
"receivedAt".to_string(),
"messageId".to_string(),
"inReplyTo".to_string(),
"hasAttachment".to_string(),
"keywords".to_string(),
])),
)
.collapse_threads(false);
let mut req = Request::new(conn.request_no.clone());
let prev_seq = req.add_call(&email_query_call);
// Email/get referencing the ids produced by the query call above,
// so both run in a single round trip.
let email_call: EmailGet = EmailGet::new(
Get::new()
.ids(Some(JmapArgument::reference(
prev_seq,
EmailQuery::RESULT_FIELD_IDS,
)))
.account_id(conn.mail_account_id().clone()),
);
req.add_call(&email_call);
let api_url = conn.session.lock().unwrap().api_url.clone();
let mut res = conn
.client
.post_async(api_url.as_str(), serde_json::to_string(&req)?)
.await?;
let res_text = res.text_async().await?;
// NOTE(review): `.unwrap()` on server-supplied JSON — should be
// propagated as an error instead of panicking.
let mut v: MethodResponse = serde_json::from_str(&res_text).unwrap();
// Responses are popped in reverse order: Email/get first, then
// Email/query.
let e = GetResponse::<EmailObject>::try_from(v.method_responses.pop().unwrap())?;
let query_response =
QueryResponse::<EmailObject>::try_from(v.method_responses.pop().unwrap())?;
store
.mailboxes
.write()
.unwrap()
.entry(mailbox_hash)
.and_modify(|mbox| {
*mbox.email_query_state.lock().unwrap() = Some(query_response.query_state);
});
let GetResponse::<EmailObject> { list, state, .. } = e;
{
// Compare the server's Email state with the one we have stored:
// store it if we had none, or fetch changes if it moved.
let (is_empty, is_equal) = {
let mailboxes_lck = conn.store.mailboxes.read().unwrap();
mailboxes_lck
.get(&mailbox_hash)
.map(|mbox| {
let current_state_lck = mbox.email_state.lock().unwrap();
(
current_state_lck.is_none(),
current_state_lck.as_ref() != Some(&state),
)
})
.unwrap_or((true, true))
};
if is_empty {
let mut mailboxes_lck = conn.store.mailboxes.write().unwrap();
debug!("{:?}: inserting state {}", EmailObject::NAME, &state);
mailboxes_lck.entry(mailbox_hash).and_modify(|mbox| {
*mbox.email_state.lock().unwrap() = Some(state);
});
} else if !is_equal {
conn.email_changes(mailbox_hash).await?;
}
}
// Convert the fetched objects into envelopes, tracking total/unread
// hashes as we go.
let mut total = BTreeSet::default();
let mut unread = BTreeSet::default();
let new_envelopes: HashMap<EnvelopeHash, Envelope> = list
.into_iter(|obj| {
let env = store.add_envelope(obj);
total.insert(env.hash());
if !env.is_seen() {
unread.insert(env.hash());
}
(env.hash(), env)
})
.collect();
let mut mailboxes_lck = store.mailboxes.write().unwrap();
mailboxes_lck.entry(mailbox_hash).and_modify(|mbox| {
mbox.total_emails.lock().unwrap().insert_existing_set(total);
mbox.unread_emails
.lock()
.unwrap()
.insert_existing_set(unread);
});
let keys: BTreeSet<EnvelopeHash> = new_envelopes.keys().cloned().collect();
collection.merge(new_envelopes, mailbox_hash, None);
// NOTE(review): retaining only hashes NOT in `keys` drops the
// envelopes that were just merged — intentional for lazy_fetch
// (keep hashes, evict bodies)? Confirm.
let mut envelopes_lck = collection.envelopes.write().unwrap();
envelopes_lck.retain(|k, _| !keys.contains(k));
Ok(())
}))
}
fn fetch_batch(&mut self, env_hashes: EnvelopeHashBatch) -> ResultFuture<()> {
todo!()
/*
let store = self.store.clone();
let connection = self.connection.clone();
Ok(Box::pin(async move {
//crate::connections::sleep(std::time::Duration::from_secs(2)).await;
debug!("fetch_batch {:?}", &env_hashes);
let mut envelopes_lck = collection.envelopes.write().unwrap();
for env_hash in env_hashes.iter() {
if envelopes_lck.contains_key(&env_hash) {
continue;
}
let index_lck = index.write().unwrap();
let message = Message::find_message(&database, &index_lck[&env_hash])?;
drop(index_lck);
let env = message.into_envelope(&index, &collection.tag_index);
envelopes_lck.insert(env_hash, env);
}
debug!("fetch_batch {:?} done", &env_hashes);
Ok(())
}))
*/
}
fn fetch(
&mut self,
mailbox_hash: MailboxHash,
@ -341,32 +484,12 @@ impl MailBackend for JmapType {
}))
}
fn watch(&self) -> ResultFuture<()> {
fn watcher(&self) -> Result<Box<dyn BackendWatcher>> {
let connection = self.connection.clone();
let store = self.store.clone();
Ok(Box::pin(async move {
{
let mut conn = connection.lock().await;
conn.connect().await?;
}
loop {
{
let mailbox_hashes = {
store
.mailboxes
.read()
.unwrap()
.keys()
.cloned()
.collect::<SmallVec<[MailboxHash; 16]>>()
};
let conn = connection.lock().await;
for mailbox_hash in mailbox_hashes {
conn.email_changes(mailbox_hash).await?;
}
}
crate::connections::sleep(std::time::Duration::from_secs(60)).await;
}
Ok(Box::new(watch::JmapWatcher {
connection,
mailbox_hashes: BTreeSet::default(),
polling_period: std::time::Duration::from_secs(60),
}))
}

View File

@ -0,0 +1,83 @@
/*
* melib - JMAP
*
* Copyright 2020 Manos Pitsidianakis
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use super::*;
use std::collections::BTreeSet;
/// Polling-based watcher for a JMAP account.
///
/// Constructed by the backend's `watcher()` method; register the mailboxes of
/// interest with [`BackendWatcher::register_mailbox`] and then consume it with
/// [`BackendWatcher::spawn`] to obtain the polling `Future`.
#[derive(Debug)]
pub struct JmapWatcher {
    /// Mailboxes to poll for changes, filled in via `register_mailbox`.
    pub mailbox_hashes: BTreeSet<MailboxHash>,
    /// Interval between successive `email_changes` polling rounds
    /// (the constructor defaults this to 60 seconds).
    pub polling_period: std::time::Duration,
    /// Shared async connection to the JMAP server.
    pub connection: Arc<FutureMutex<JmapConnection>>,
}
impl BackendWatcher for JmapWatcher {
    /// JMAP watching is polling over async HTTP; it never needs to block a
    /// dedicated thread.
    fn is_blocking(&self) -> bool {
        false
    }

    /// Add `mailbox_hash` to the set of mailboxes polled for changes.
    /// The urgency hint is currently unused.
    fn register_mailbox(
        &mut self,
        mailbox_hash: MailboxHash,
        _urgency: MailboxWatchUrgency,
    ) -> Result<()> {
        let _ = self.mailbox_hashes.insert(mailbox_hash);
        Ok(())
    }

    /// Override the polling interval; `None` leaves the current value
    /// untouched.
    fn set_polling_period(&mut self, period: Option<std::time::Duration>) -> Result<()> {
        match period {
            Some(new_period) => {
                self.polling_period = new_period;
                Ok(())
            }
            None => Ok(()),
        }
    }

    /// Consume the watcher and return the polling loop: connect once, then
    /// repeatedly query `email_changes` for every registered mailbox,
    /// sleeping `polling_period` between rounds.
    fn spawn(self: Box<Self>) -> ResultFuture<()> {
        let Self {
            mailbox_hashes,
            polling_period,
            connection,
        } = *self;
        Ok(Box::pin(async move {
            connection.lock().await.connect().await?;
            loop {
                {
                    let conn = connection.lock().await;
                    for mailbox_hash in mailbox_hashes.iter().copied() {
                        conn.email_changes(mailbox_hash).await?;
                    }
                }
                crate::connections::sleep(polling_period).await;
            }
        }))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}

View File

@ -26,6 +26,8 @@ pub use self::backend::*;
mod stream;
pub use stream::*;
pub mod watch;
use crate::backends::*;
use crate::email::Flag;
use crate::error::{MeliError, Result};

View File

@ -27,20 +27,13 @@ use crate::error::{ErrorKind, MeliError, Result};
use crate::shellexpand::ShellExpandTrait;
use crate::Collection;
use futures::prelude::Stream;
extern crate notify;
use self::notify::{watcher, DebouncedEvent, RecursiveMode, Watcher};
use std::time::Duration;
use std::collections::{hash_map::DefaultHasher, HashMap, HashSet};
use std::ffi::OsStr;
use std::fs;
use std::hash::{Hash, Hasher};
use std::io::{self, Read, Write};
use std::ops::{Deref, DerefMut};
use std::os::unix::fs::PermissionsExt;
use std::path::{Component, Path, PathBuf};
use std::sync::mpsc::channel;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
#[derive(Clone, Debug, PartialEq)]
@ -339,494 +332,28 @@ impl MailBackend for MaildirType {
}))
}
fn watch(&self) -> ResultFuture<()> {
let sender = self.event_consumer.clone();
let (tx, rx) = channel();
let mut watcher = watcher(tx, Duration::from_secs(2)).unwrap();
fn watcher(&self) -> Result<Box<dyn BackendWatcher>> {
let account_hash = {
let mut hasher = DefaultHasher::default();
hasher.write(self.name.as_bytes());
hasher.finish()
};
let root_path = self.path.to_path_buf();
watcher.watch(&root_path, RecursiveMode::Recursive).unwrap();
let cache_dir = xdg::BaseDirectories::with_profile("meli", &self.name).unwrap();
debug!("watching {:?}", root_path);
let event_consumer = self.event_consumer.clone();
let hash_indexes = self.hash_indexes.clone();
let mailbox_index = self.mailbox_index.clone();
let root_mailbox_hash: MailboxHash = self
.mailboxes
.values()
.find(|m| m.parent.is_none())
.map(|m| m.hash())
.unwrap();
let mailbox_counts = self
.mailboxes
.iter()
.map(|(&k, v)| (k, (v.unseen.clone(), v.total.clone())))
.collect::<HashMap<MailboxHash, (Arc<Mutex<usize>>, Arc<Mutex<usize>>)>>();
Ok(Box::pin(async move {
// Move `watcher` in the closure's scope so that it doesn't get dropped.
let _watcher = watcher;
let mut buf = Vec::with_capacity(4096);
loop {
match rx.recv() {
/*
* Event types:
*
* pub enum RefreshEventKind {
* Update(EnvelopeHash, Envelope), // Old hash, new envelope
* Create(Envelope),
* Remove(EnvelopeHash),
* Rescan,
* }
*/
Ok(event) => match event {
/* Create */
DebouncedEvent::Create(mut pathbuf) => {
debug!("DebouncedEvent::Create(path = {:?}", pathbuf);
if path_is_new!(pathbuf) {
debug!("path_is_new");
/* This creates a Rename event that we will receive later */
pathbuf = match move_to_cur(pathbuf) {
Ok(p) => p,
Err(e) => {
debug!("error: {}", e.to_string());
continue;
}
};
}
let mailbox_hash = get_path_hash!(pathbuf);
let file_name = pathbuf
.as_path()
.strip_prefix(&root_path)
.unwrap()
.to_path_buf();
if let Ok(env) = add_path_to_index(
&hash_indexes,
mailbox_hash,
pathbuf.as_path(),
&cache_dir,
file_name,
&mut buf,
) {
mailbox_index
.lock()
.unwrap()
.insert(env.hash(), mailbox_hash);
debug!(
"Create event {} {} {}",
env.hash(),
env.subject(),
pathbuf.display()
);
if !env.is_seen() {
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
}
*mailbox_counts[&mailbox_hash].1.lock().unwrap() += 1;
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
}),
);
}
}
/* Update */
DebouncedEvent::NoticeWrite(pathbuf) | DebouncedEvent::Write(pathbuf) => {
debug!("DebouncedEvent::Write(path = {:?}", &pathbuf);
let mailbox_hash = get_path_hash!(pathbuf);
let mut hash_indexes_lock = hash_indexes.lock().unwrap();
let index_lock =
&mut hash_indexes_lock.entry(mailbox_hash).or_default();
let file_name = pathbuf
.as_path()
.strip_prefix(&root_path)
.unwrap()
.to_path_buf();
/* Linear search in hash_index to find old hash */
let old_hash: EnvelopeHash = {
if let Some((k, v)) =
index_lock.iter_mut().find(|(_, v)| *v.buf == pathbuf)
{
*v = pathbuf.clone().into();
*k
} else {
drop(hash_indexes_lock);
/* Did we just miss a Create event? In any case, create
* envelope. */
if let Ok(env) = add_path_to_index(
&hash_indexes,
mailbox_hash,
pathbuf.as_path(),
&cache_dir,
file_name,
&mut buf,
) {
mailbox_index
.lock()
.unwrap()
.insert(env.hash(), mailbox_hash);
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
}),
);
}
continue;
}
};
let new_hash: EnvelopeHash = get_file_hash(pathbuf.as_path());
let mut reader = io::BufReader::new(fs::File::open(&pathbuf)?);
buf.clear();
reader.read_to_end(&mut buf)?;
if index_lock.get_mut(&new_hash).is_none() {
debug!("write notice");
if let Ok(mut env) =
Envelope::from_bytes(buf.as_slice(), Some(pathbuf.flags()))
{
env.set_hash(new_hash);
debug!("{}\t{:?}", new_hash, &pathbuf);
debug!(
"hash {}, path: {:?} couldn't be parsed",
new_hash, &pathbuf
);
index_lock.insert(new_hash, pathbuf.into());
/* Send Write notice */
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Update(old_hash, Box::new(env)),
}),
);
}
}
}
/* Remove */
DebouncedEvent::NoticeRemove(pathbuf) | DebouncedEvent::Remove(pathbuf) => {
debug!("DebouncedEvent::Remove(path = {:?}", pathbuf);
let mailbox_hash = get_path_hash!(pathbuf);
let mut hash_indexes_lock = hash_indexes.lock().unwrap();
let index_lock = hash_indexes_lock.entry(mailbox_hash).or_default();
let hash: EnvelopeHash = if let Some((k, _)) =
index_lock.iter().find(|(_, v)| *v.buf == pathbuf)
{
*k
} else {
debug!("removed but not contained in index");
continue;
};
if let Some(ref modif) = &index_lock[&hash].modified {
match modif {
PathMod::Path(path) => debug!(
"envelope {} has modified path set {}",
hash,
path.display()
),
PathMod::Hash(hash) => debug!(
"envelope {} has modified path set {}",
hash,
&index_lock[&hash].buf.display()
),
}
index_lock.entry(hash).and_modify(|e| {
e.removed = false;
});
continue;
}
{
let mut lck = mailbox_counts[&mailbox_hash].1.lock().unwrap();
*lck = lck.saturating_sub(1);
}
if !pathbuf.flags().contains(Flag::SEEN) {
let mut lck = mailbox_counts[&mailbox_hash].0.lock().unwrap();
*lck = lck.saturating_sub(1);
}
index_lock.entry(hash).and_modify(|e| {
e.removed = true;
});
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Remove(hash),
}),
);
}
/* Envelope hasn't changed */
DebouncedEvent::Rename(src, dest) => {
debug!("DebouncedEvent::Rename(src = {:?}, dest = {:?})", src, dest);
let mailbox_hash = get_path_hash!(src);
let dest_mailbox = {
let dest_mailbox = get_path_hash!(dest);
if dest_mailbox == mailbox_hash {
None
} else {
Some(dest_mailbox)
}
};
let old_hash: EnvelopeHash = get_file_hash(src.as_path());
let new_hash: EnvelopeHash = get_file_hash(dest.as_path());
let mut hash_indexes_lock = hash_indexes.lock().unwrap();
let index_lock = hash_indexes_lock.entry(mailbox_hash).or_default();
let old_flags = src.flags();
let new_flags = dest.flags();
let was_seen: bool = old_flags.contains(Flag::SEEN);
let is_seen: bool = new_flags.contains(Flag::SEEN);
if index_lock.contains_key(&old_hash) && !index_lock[&old_hash].removed
{
debug!("contains_old_key");
if let Some(dest_mailbox) = dest_mailbox {
index_lock.entry(old_hash).and_modify(|e| {
e.removed = true;
});
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Remove(old_hash),
}),
);
let file_name = dest
.as_path()
.strip_prefix(&root_path)
.unwrap()
.to_path_buf();
drop(hash_indexes_lock);
if let Ok(env) = add_path_to_index(
&hash_indexes,
dest_mailbox,
dest.as_path(),
&cache_dir,
file_name,
&mut buf,
) {
mailbox_index
.lock()
.unwrap()
.insert(env.hash(), dest_mailbox);
debug!(
"Create event {} {} {}",
env.hash(),
env.subject(),
dest.display()
);
if !env.is_seen() {
*mailbox_counts[&dest_mailbox].0.lock().unwrap() += 1;
}
*mailbox_counts[&dest_mailbox].1.lock().unwrap() += 1;
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: dest_mailbox,
kind: Create(Box::new(env)),
}),
);
}
} else {
index_lock.entry(old_hash).and_modify(|e| {
debug!(&e.modified);
e.modified = Some(PathMod::Hash(new_hash));
});
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Rename(old_hash, new_hash),
}),
);
if !was_seen && is_seen {
let mut lck =
mailbox_counts[&mailbox_hash].0.lock().unwrap();
*lck = lck.saturating_sub(1);
} else if was_seen && !is_seen {
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
}
if old_flags != new_flags {
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: NewFlags(new_hash, (new_flags, vec![])),
}),
);
}
mailbox_index.lock().unwrap().insert(new_hash, mailbox_hash);
index_lock.insert(new_hash, dest.into());
}
continue;
} else if !index_lock.contains_key(&new_hash)
&& index_lock
.get(&old_hash)
.map(|e| e.removed)
.unwrap_or(false)
{
if index_lock
.get(&old_hash)
.map(|e| e.removed)
.unwrap_or(false)
{
index_lock.entry(old_hash).and_modify(|e| {
e.modified = Some(PathMod::Hash(new_hash));
e.removed = false;
});
debug!("contains_old_key, key was marked as removed (by external source)");
} else {
debug!("not contains_new_key");
}
let file_name = dest
.as_path()
.strip_prefix(&root_path)
.unwrap()
.to_path_buf();
debug!("filename = {:?}", file_name);
drop(hash_indexes_lock);
if let Ok(env) = add_path_to_index(
&hash_indexes,
dest_mailbox.unwrap_or(mailbox_hash),
dest.as_path(),
&cache_dir,
file_name,
&mut buf,
) {
mailbox_index
.lock()
.unwrap()
.insert(env.hash(), dest_mailbox.unwrap_or(mailbox_hash));
debug!(
"Create event {} {} {}",
env.hash(),
env.subject(),
dest.display()
);
if !env.is_seen() {
*mailbox_counts[&dest_mailbox.unwrap_or(mailbox_hash)]
.0
.lock()
.unwrap() += 1;
}
*mailbox_counts[&dest_mailbox.unwrap_or(mailbox_hash)]
.1
.lock()
.unwrap() += 1;
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: dest_mailbox.unwrap_or(mailbox_hash),
kind: Create(Box::new(env)),
}),
);
continue;
} else {
debug!("not valid email");
}
} else if let Some(dest_mailbox) = dest_mailbox {
drop(hash_indexes_lock);
let file_name = dest
.as_path()
.strip_prefix(&root_path)
.unwrap()
.to_path_buf();
if let Ok(env) = add_path_to_index(
&hash_indexes,
dest_mailbox,
dest.as_path(),
&cache_dir,
file_name,
&mut buf,
) {
mailbox_index
.lock()
.unwrap()
.insert(env.hash(), dest_mailbox);
debug!(
"Create event {} {} {}",
env.hash(),
env.subject(),
dest.display()
);
if !env.is_seen() {
*mailbox_counts[&dest_mailbox].0.lock().unwrap() += 1;
}
*mailbox_counts[&dest_mailbox].1.lock().unwrap() += 1;
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: dest_mailbox,
kind: Create(Box::new(env)),
}),
);
}
} else {
if was_seen && !is_seen {
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
}
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Rename(old_hash, new_hash),
}),
);
debug!("contains_new_key");
if old_flags != new_flags {
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: NewFlags(new_hash, (new_flags, vec![])),
}),
);
}
}
/* Maybe a re-read should be triggered here just to be safe.
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: get_path_hash!(dest),
kind: Rescan,
}));
*/
}
/* Trigger rescan of mailbox */
DebouncedEvent::Rescan => {
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: root_mailbox_hash,
kind: Rescan,
}),
);
}
_ => {}
},
Err(e) => debug!("watch error: {:?}", e),
}
}
let mailboxes = self.mailboxes.clone();
let root_path = self.path.to_path_buf();
Ok(Box::new(super::watch::MaildirWatcher {
account_hash,
cache_dir,
event_consumer,
hash_indexes,
mailbox_hashes: BTreeSet::default(),
mailbox_index,
mailboxes,
polling_period: std::time::Duration::from_secs(2),
root_path,
}))
}
@ -1335,49 +862,3 @@ impl MaildirType {
Ok(())
}
}
fn add_path_to_index(
hash_index: &HashIndexes,
mailbox_hash: MailboxHash,
path: &Path,
cache_dir: &xdg::BaseDirectories,
file_name: PathBuf,
buf: &mut Vec<u8>,
) -> Result<Envelope> {
debug!("add_path_to_index path {:?} filename{:?}", path, file_name);
let env_hash = get_file_hash(path);
{
let mut map = hash_index.lock().unwrap();
let map = map.entry(mailbox_hash).or_default();
map.insert(env_hash, path.to_path_buf().into());
debug!(
"inserted {} in {} map, len={}",
env_hash,
mailbox_hash,
map.len()
);
}
let mut reader = io::BufReader::new(fs::File::open(&path)?);
buf.clear();
reader.read_to_end(buf)?;
let mut env = Envelope::from_bytes(buf.as_slice(), Some(path.flags()))?;
env.set_hash(env_hash);
debug!(
"add_path_to_index gen {}\t{}",
env_hash,
file_name.display()
);
if let Ok(cached) = cache_dir.place_cache_file(file_name) {
debug!("putting in cache");
/* place result in cache directory */
let f = fs::File::create(cached)?;
let metadata = f.metadata()?;
let mut permissions = metadata.permissions();
permissions.set_mode(0o600); // Read/write for owner only.
f.set_permissions(permissions)?;
let writer = io::BufWriter::new(f);
bincode::Options::serialize_into(bincode::config::DefaultOptions::new(), writer, &env)?;
}
Ok(env)
}

View File

@ -0,0 +1,608 @@
/*
* meli - mailbox module.
*
* Copyright 2017 - 2021 Manos Pitsidianakis
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use super::*;
use crate::backends::{RefreshEventKind::*, *};
use std::collections::BTreeSet;
use std::ffi::OsStr;
use std::io;
use std::os::unix::fs::PermissionsExt;
extern crate notify;
use notify::{watcher, DebouncedEvent, RecursiveMode, Watcher};
use std::path::{Component, Path, PathBuf};
use std::sync::mpsc::channel;
/// Filesystem-notification watcher for a Maildir account.
///
/// Constructed by the backend's `watcher()` method; consumed via
/// [`BackendWatcher::spawn`], which runs a blocking `notify`-based event loop
/// over `root_path`.
#[derive(Debug)]
pub struct MaildirWatcher {
    /// Hash identifying the account in emitted `RefreshEvent`s.
    pub account_hash: AccountHash,
    /// XDG cache directory where parsed envelopes are serialized.
    pub cache_dir: xdg::BaseDirectories,
    /// Callback that receives `BackendEvent`s produced by the watch loop.
    pub event_consumer: BackendEventConsumer,
    /// Shared per-mailbox envelope-hash → path indexes.
    pub hash_indexes: HashIndexes,
    /// Mailboxes registered for updates.
    /// NOTE(review): `spawn` currently ignores this set and watches the whole
    /// `root_path` tree recursively — confirm whether filtering is intended.
    pub mailbox_hashes: BTreeSet<MailboxHash>,
    /// Shared envelope → mailbox mapping, updated as events arrive.
    pub mailbox_index: Arc<Mutex<HashMap<EnvelopeHash, MailboxHash>>>,
    /// Snapshot of the account's mailboxes (used for counters and root lookup).
    pub mailboxes: HashMap<MailboxHash, MaildirMailbox>,
    /// Debounce interval for the `notify` watcher (constructor defaults to 2s).
    pub polling_period: std::time::Duration,
    /// Root Maildir path that is watched recursively.
    pub root_path: PathBuf,
}
impl BackendWatcher for MaildirWatcher {
/// Maildir watching reads from a blocking `std::sync::mpsc` receiver
/// (`rx.recv()` in `spawn`), so the spawned future must run on a thread
/// that is allowed to block.
fn is_blocking(&self) -> bool {
    true
}
/// Record interest in updates for `mailbox_hash`. The urgency hint is
/// currently unused.
///
/// NOTE(review): `spawn` destructures `mailbox_hashes: _` and watches the
/// entire `root_path` recursively, so this registration has no effect yet
/// (WIP) — confirm whether per-mailbox filtering is planned.
fn register_mailbox(
    &mut self,
    mailbox_hash: MailboxHash,
    _urgency: MailboxWatchUrgency,
) -> Result<()> {
    self.mailbox_hashes.insert(mailbox_hash);
    Ok(())
}
/// Set the debounce interval used by the filesystem watcher; passing
/// `None` leaves the current period unchanged.
fn set_polling_period(&mut self, period: Option<std::time::Duration>) -> Result<()> {
    match period {
        Some(new_period) => self.polling_period = new_period,
        None => {}
    }
    Ok(())
}
/// Consume the watcher and return a `Future` that runs the Maildir watch
/// loop: it sets up a recursive `notify` filesystem watcher over
/// `root_path` and translates debounced filesystem events into
/// `RefreshEvent`s delivered through the event consumer.
///
/// NOTE(review): `rx.recv()` blocks the executing thread, which is why
/// `is_blocking()` returns `true` for this watcher.
fn spawn(self: Box<Self>) -> ResultFuture<()> {
    let MaildirWatcher {
        account_hash,
        cache_dir,
        event_consumer: sender,
        hash_indexes,
        // NOTE(review): registered mailboxes are ignored; the whole
        // root_path tree is watched recursively instead (WIP).
        mailbox_hashes: _,
        mailbox_index,
        mailboxes,
        polling_period,
        root_path,
    } = *self;
    Ok(Box::pin(async move {
        // `polling_period` doubles as the notify debounce interval.
        let (tx, rx) = channel();
        let mut watcher = watcher(tx, polling_period).unwrap();
        watcher.watch(&root_path, RecursiveMode::Recursive).unwrap();
        debug!("watching {:?}", root_path);
        // The mailbox without a parent is the account's root mailbox; it
        // is the target of full Rescan events.
        let root_mailbox_hash: MailboxHash = mailboxes
            .values()
            .find(|m| m.parent.is_none())
            .map(|m| m.hash())
            .unwrap();
        // Shared (unseen, total) counters per mailbox, updated in place as
        // events arrive.
        let mailbox_counts = mailboxes
            .iter()
            .map(|(&k, v)| (k, (v.unseen.clone(), v.total.clone())))
            .collect::<HashMap<MailboxHash, (Arc<Mutex<usize>>, Arc<Mutex<usize>>)>>();
        // Reusable read buffer for parsing message files.
        let mut buf = Vec::with_capacity(4096);
        loop {
            match rx.recv() {
                /*
                 * Event types:
                 *
                 * pub enum RefreshEventKind {
                 *     Update(EnvelopeHash, Envelope), // Old hash, new envelope
                 *     Create(Envelope),
                 *     Remove(EnvelopeHash),
                 *     Rescan,
                 * }
                 */
                Ok(event) => match event {
                    /* Create */
                    DebouncedEvent::Create(mut pathbuf) => {
                        debug!("DebouncedEvent::Create(path = {:?}", pathbuf);
                        if path_is_new!(pathbuf) {
                            debug!("path_is_new");
                            /* This creates a Rename event that we will receive later */
                            pathbuf = match move_to_cur(pathbuf) {
                                Ok(p) => p,
                                Err(e) => {
                                    debug!("error: {}", e.to_string());
                                    continue;
                                }
                            };
                        }
                        let mailbox_hash = get_path_hash!(pathbuf);
                        let file_name = pathbuf
                            .as_path()
                            .strip_prefix(&root_path)
                            .unwrap()
                            .to_path_buf();
                        // Parse the new message, index it, bump counters and
                        // emit a Create event.
                        if let Ok(env) = add_path_to_index(
                            &hash_indexes,
                            mailbox_hash,
                            pathbuf.as_path(),
                            &cache_dir,
                            file_name,
                            &mut buf,
                        ) {
                            mailbox_index
                                .lock()
                                .unwrap()
                                .insert(env.hash(), mailbox_hash);
                            debug!(
                                "Create event {} {} {}",
                                env.hash(),
                                env.subject(),
                                pathbuf.display()
                            );
                            if !env.is_seen() {
                                *mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
                            }
                            *mailbox_counts[&mailbox_hash].1.lock().unwrap() += 1;
                            (sender)(
                                account_hash,
                                BackendEvent::Refresh(RefreshEvent {
                                    account_hash,
                                    mailbox_hash,
                                    kind: Create(Box::new(env)),
                                }),
                            );
                        }
                    }
                    /* Update */
                    DebouncedEvent::NoticeWrite(pathbuf) | DebouncedEvent::Write(pathbuf) => {
                        debug!("DebouncedEvent::Write(path = {:?}", &pathbuf);
                        let mailbox_hash = get_path_hash!(pathbuf);
                        let mut hash_indexes_lock = hash_indexes.lock().unwrap();
                        let index_lock =
                            &mut hash_indexes_lock.entry(mailbox_hash).or_default();
                        let file_name = pathbuf
                            .as_path()
                            .strip_prefix(&root_path)
                            .unwrap()
                            .to_path_buf();
                        /* Linear search in hash_index to find old hash */
                        let old_hash: EnvelopeHash = {
                            if let Some((k, v)) =
                                index_lock.iter_mut().find(|(_, v)| *v.buf == pathbuf)
                            {
                                *v = pathbuf.clone().into();
                                *k
                            } else {
                                drop(hash_indexes_lock);
                                /* Did we just miss a Create event? In any case, create
                                 * envelope. */
                                if let Ok(env) = add_path_to_index(
                                    &hash_indexes,
                                    mailbox_hash,
                                    pathbuf.as_path(),
                                    &cache_dir,
                                    file_name,
                                    &mut buf,
                                ) {
                                    mailbox_index
                                        .lock()
                                        .unwrap()
                                        .insert(env.hash(), mailbox_hash);
                                    (sender)(
                                        account_hash,
                                        BackendEvent::Refresh(RefreshEvent {
                                            account_hash,
                                            mailbox_hash,
                                            kind: Create(Box::new(env)),
                                        }),
                                    );
                                }
                                continue;
                            }
                        };
                        let new_hash: EnvelopeHash = get_file_hash(pathbuf.as_path());
                        let mut reader = io::BufReader::new(fs::File::open(&pathbuf)?);
                        buf.clear();
                        reader.read_to_end(&mut buf)?;
                        if index_lock.get_mut(&new_hash).is_none() {
                            debug!("write notice");
                            if let Ok(mut env) =
                                Envelope::from_bytes(buf.as_slice(), Some(pathbuf.flags()))
                            {
                                env.set_hash(new_hash);
                                debug!("{}\t{:?}", new_hash, &pathbuf);
                                // NOTE(review): this log fires on the success
                                // path but its text says the file couldn't be
                                // parsed — the message looks wrong; confirm.
                                debug!(
                                    "hash {}, path: {:?} couldn't be parsed",
                                    new_hash, &pathbuf
                                );
                                index_lock.insert(new_hash, pathbuf.into());
                                /* Send Write notice */
                                (sender)(
                                    account_hash,
                                    BackendEvent::Refresh(RefreshEvent {
                                        account_hash,
                                        mailbox_hash,
                                        kind: Update(old_hash, Box::new(env)),
                                    }),
                                );
                            }
                        }
                    }
                    /* Remove */
                    DebouncedEvent::NoticeRemove(pathbuf) | DebouncedEvent::Remove(pathbuf) => {
                        debug!("DebouncedEvent::Remove(path = {:?}", pathbuf);
                        let mailbox_hash = get_path_hash!(pathbuf);
                        let mut hash_indexes_lock = hash_indexes.lock().unwrap();
                        let index_lock = hash_indexes_lock.entry(mailbox_hash).or_default();
                        let hash: EnvelopeHash = if let Some((k, _)) =
                            index_lock.iter().find(|(_, v)| *v.buf == pathbuf)
                        {
                            *k
                        } else {
                            debug!("removed but not contained in index");
                            continue;
                        };
                        // If the envelope has a pending path/hash modification,
                        // this removal refers to its old location: clear the
                        // removed flag and wait for the matching event instead.
                        if let Some(ref modif) = &index_lock[&hash].modified {
                            match modif {
                                PathMod::Path(path) => debug!(
                                    "envelope {} has modified path set {}",
                                    hash,
                                    path.display()
                                ),
                                PathMod::Hash(hash) => debug!(
                                    "envelope {} has modified path set {}",
                                    hash,
                                    &index_lock[&hash].buf.display()
                                ),
                            }
                            index_lock.entry(hash).and_modify(|e| {
                                e.removed = false;
                            });
                            continue;
                        }
                        // Decrement total (and unseen, if applicable) counters.
                        {
                            let mut lck = mailbox_counts[&mailbox_hash].1.lock().unwrap();
                            *lck = lck.saturating_sub(1);
                        }
                        if !pathbuf.flags().contains(Flag::SEEN) {
                            let mut lck = mailbox_counts[&mailbox_hash].0.lock().unwrap();
                            *lck = lck.saturating_sub(1);
                        }
                        index_lock.entry(hash).and_modify(|e| {
                            e.removed = true;
                        });
                        (sender)(
                            account_hash,
                            BackendEvent::Refresh(RefreshEvent {
                                account_hash,
                                mailbox_hash,
                                kind: Remove(hash),
                            }),
                        );
                    }
                    /* Envelope hasn't changed */
                    DebouncedEvent::Rename(src, dest) => {
                        debug!("DebouncedEvent::Rename(src = {:?}, dest = {:?})", src, dest);
                        let mailbox_hash = get_path_hash!(src);
                        // `Some(_)` iff the file moved into a different mailbox.
                        let dest_mailbox = {
                            let dest_mailbox = get_path_hash!(dest);
                            if dest_mailbox == mailbox_hash {
                                None
                            } else {
                                Some(dest_mailbox)
                            }
                        };
                        let old_hash: EnvelopeHash = get_file_hash(src.as_path());
                        let new_hash: EnvelopeHash = get_file_hash(dest.as_path());
                        let mut hash_indexes_lock = hash_indexes.lock().unwrap();
                        let index_lock = hash_indexes_lock.entry(mailbox_hash).or_default();
                        let old_flags = src.flags();
                        let new_flags = dest.flags();
                        let was_seen: bool = old_flags.contains(Flag::SEEN);
                        let is_seen: bool = new_flags.contains(Flag::SEEN);
                        if index_lock.contains_key(&old_hash) && !index_lock[&old_hash].removed
                        {
                            debug!("contains_old_key");
                            if let Some(dest_mailbox) = dest_mailbox {
                                // Moved across mailboxes: emit Remove for the
                                // source mailbox and Create for the destination.
                                index_lock.entry(old_hash).and_modify(|e| {
                                    e.removed = true;
                                });
                                (sender)(
                                    account_hash,
                                    BackendEvent::Refresh(RefreshEvent {
                                        account_hash,
                                        mailbox_hash,
                                        kind: Remove(old_hash),
                                    }),
                                );
                                let file_name = dest
                                    .as_path()
                                    .strip_prefix(&root_path)
                                    .unwrap()
                                    .to_path_buf();
                                drop(hash_indexes_lock);
                                if let Ok(env) = add_path_to_index(
                                    &hash_indexes,
                                    dest_mailbox,
                                    dest.as_path(),
                                    &cache_dir,
                                    file_name,
                                    &mut buf,
                                ) {
                                    mailbox_index
                                        .lock()
                                        .unwrap()
                                        .insert(env.hash(), dest_mailbox);
                                    debug!(
                                        "Create event {} {} {}",
                                        env.hash(),
                                        env.subject(),
                                        dest.display()
                                    );
                                    if !env.is_seen() {
                                        *mailbox_counts[&dest_mailbox].0.lock().unwrap() += 1;
                                    }
                                    *mailbox_counts[&dest_mailbox].1.lock().unwrap() += 1;
                                    (sender)(
                                        account_hash,
                                        BackendEvent::Refresh(RefreshEvent {
                                            account_hash,
                                            mailbox_hash: dest_mailbox,
                                            kind: Create(Box::new(env)),
                                        }),
                                    );
                                }
                            } else {
                                // Renamed within the same mailbox: record the
                                // pending hash change, emit Rename (and NewFlags
                                // if the maildir flags differ), fix up counters.
                                index_lock.entry(old_hash).and_modify(|e| {
                                    debug!(&e.modified);
                                    e.modified = Some(PathMod::Hash(new_hash));
                                });
                                (sender)(
                                    account_hash,
                                    BackendEvent::Refresh(RefreshEvent {
                                        account_hash,
                                        mailbox_hash,
                                        kind: Rename(old_hash, new_hash),
                                    }),
                                );
                                if !was_seen && is_seen {
                                    let mut lck =
                                        mailbox_counts[&mailbox_hash].0.lock().unwrap();
                                    *lck = lck.saturating_sub(1);
                                } else if was_seen && !is_seen {
                                    *mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
                                }
                                if old_flags != new_flags {
                                    (sender)(
                                        account_hash,
                                        BackendEvent::Refresh(RefreshEvent {
                                            account_hash,
                                            mailbox_hash,
                                            kind: NewFlags(new_hash, (new_flags, vec![])),
                                        }),
                                    );
                                }
                                mailbox_index.lock().unwrap().insert(new_hash, mailbox_hash);
                                index_lock.insert(new_hash, dest.into());
                            }
                            continue;
                        } else if !index_lock.contains_key(&new_hash)
                            && index_lock
                                .get(&old_hash)
                                .map(|e| e.removed)
                                .unwrap_or(false)
                        {
                            // NOTE(review): this inner check repeats the outer
                            // `removed` condition, so the `else` branch below
                            // looks unreachable — confirm intent.
                            if index_lock
                                .get(&old_hash)
                                .map(|e| e.removed)
                                .unwrap_or(false)
                            {
                                index_lock.entry(old_hash).and_modify(|e| {
                                    e.modified = Some(PathMod::Hash(new_hash));
                                    e.removed = false;
                                });
                                debug!("contains_old_key, key was marked as removed (by external source)");
                            } else {
                                debug!("not contains_new_key");
                            }
                            let file_name = dest
                                .as_path()
                                .strip_prefix(&root_path)
                                .unwrap()
                                .to_path_buf();
                            debug!("filename = {:?}", file_name);
                            drop(hash_indexes_lock);
                            if let Ok(env) = add_path_to_index(
                                &hash_indexes,
                                dest_mailbox.unwrap_or(mailbox_hash),
                                dest.as_path(),
                                &cache_dir,
                                file_name,
                                &mut buf,
                            ) {
                                mailbox_index
                                    .lock()
                                    .unwrap()
                                    .insert(env.hash(), dest_mailbox.unwrap_or(mailbox_hash));
                                debug!(
                                    "Create event {} {} {}",
                                    env.hash(),
                                    env.subject(),
                                    dest.display()
                                );
                                if !env.is_seen() {
                                    *mailbox_counts[&dest_mailbox.unwrap_or(mailbox_hash)]
                                        .0
                                        .lock()
                                        .unwrap() += 1;
                                }
                                *mailbox_counts[&dest_mailbox.unwrap_or(mailbox_hash)]
                                    .1
                                    .lock()
                                    .unwrap() += 1;
                                (sender)(
                                    account_hash,
                                    BackendEvent::Refresh(RefreshEvent {
                                        account_hash,
                                        mailbox_hash: dest_mailbox.unwrap_or(mailbox_hash),
                                        kind: Create(Box::new(env)),
                                    }),
                                );
                                continue;
                            } else {
                                debug!("not valid email");
                            }
                        } else if let Some(dest_mailbox) = dest_mailbox {
                            // Unknown old hash but moved into another mailbox:
                            // index and announce it as a new message there.
                            drop(hash_indexes_lock);
                            let file_name = dest
                                .as_path()
                                .strip_prefix(&root_path)
                                .unwrap()
                                .to_path_buf();
                            if let Ok(env) = add_path_to_index(
                                &hash_indexes,
                                dest_mailbox,
                                dest.as_path(),
                                &cache_dir,
                                file_name,
                                &mut buf,
                            ) {
                                mailbox_index
                                    .lock()
                                    .unwrap()
                                    .insert(env.hash(), dest_mailbox);
                                debug!(
                                    "Create event {} {} {}",
                                    env.hash(),
                                    env.subject(),
                                    dest.display()
                                );
                                if !env.is_seen() {
                                    *mailbox_counts[&dest_mailbox].0.lock().unwrap() += 1;
                                }
                                *mailbox_counts[&dest_mailbox].1.lock().unwrap() += 1;
                                (sender)(
                                    account_hash,
                                    BackendEvent::Refresh(RefreshEvent {
                                        account_hash,
                                        mailbox_hash: dest_mailbox,
                                        kind: Create(Box::new(env)),
                                    }),
                                );
                            }
                        } else {
                            // Same-mailbox rename where the new hash is already
                            // known: emit Rename and possibly NewFlags.
                            if was_seen && !is_seen {
                                *mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
                            }
                            (sender)(
                                account_hash,
                                BackendEvent::Refresh(RefreshEvent {
                                    account_hash,
                                    mailbox_hash,
                                    kind: Rename(old_hash, new_hash),
                                }),
                            );
                            debug!("contains_new_key");
                            if old_flags != new_flags {
                                (sender)(
                                    account_hash,
                                    BackendEvent::Refresh(RefreshEvent {
                                        account_hash,
                                        mailbox_hash,
                                        kind: NewFlags(new_hash, (new_flags, vec![])),
                                    }),
                                );
                            }
                        }
                        /* Maybe a re-read should be triggered here just to be safe.
                        (sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
                            account_hash,
                            mailbox_hash: get_path_hash!(dest),
                            kind: Rescan,
                        }));
                        */
                    }
                    /* Trigger rescan of mailbox */
                    DebouncedEvent::Rescan => {
                        (sender)(
                            account_hash,
                            BackendEvent::Refresh(RefreshEvent {
                                account_hash,
                                mailbox_hash: root_mailbox_hash,
                                kind: Rescan,
                            }),
                        );
                    }
                    _ => {}
                },
                Err(e) => debug!("watch error: {:?}", e),
            }
        }
    }))
}
/// Upcast to `&dyn Any` so callers can downcast to the concrete watcher.
fn as_any(&self) -> &dyn Any {
    self
}
/// Mutable counterpart of [`as_any`](Self::as_any).
fn as_any_mut(&mut self) -> &mut dyn Any {
    self
}
}
/// Parse the message file at `path`, register its hash in the per-mailbox
/// index, serialize the resulting `Envelope` into the XDG cache directory,
/// and return it. `buf` is a caller-owned scratch buffer reused across calls.
fn add_path_to_index(
    hash_index: &HashIndexes,
    mailbox_hash: MailboxHash,
    path: &Path,
    cache_dir: &xdg::BaseDirectories,
    file_name: PathBuf,
    buf: &mut Vec<u8>,
) -> Result<Envelope> {
    debug!("add_path_to_index path {:?} filename{:?}", path, file_name);
    let env_hash = get_file_hash(path);
    {
        // Record the hash → path mapping before parsing, so concurrent
        // lookups can already resolve it.
        let mut guard = hash_index.lock().unwrap();
        let mailbox_map = guard.entry(mailbox_hash).or_default();
        mailbox_map.insert(env_hash, path.to_path_buf().into());
        debug!(
            "inserted {} in {} map, len={}",
            env_hash,
            mailbox_hash,
            mailbox_map.len()
        );
    }
    // Read the whole message into the reusable buffer and parse it.
    buf.clear();
    io::BufReader::new(fs::File::open(&path)?).read_to_end(buf)?;
    let mut env = Envelope::from_bytes(buf.as_slice(), Some(path.flags()))?;
    env.set_hash(env_hash);
    debug!(
        "add_path_to_index gen {}\t{}",
        env_hash,
        file_name.display()
    );
    // Best effort: cache the parsed envelope on disk; failure to obtain a
    // cache path is silently skipped.
    if let Ok(cached) = cache_dir.place_cache_file(file_name) {
        debug!("putting in cache");
        /* place result in cache directory */
        let f = fs::File::create(cached)?;
        let mut permissions = f.metadata()?.permissions();
        permissions.set_mode(0o600); // Read/write for owner only.
        f.set_permissions(permissions)?;
        bincode::Options::serialize_into(
            bincode::config::DefaultOptions::new(),
            io::BufWriter::new(f),
            &env,
        )?;
    }
    Ok(env)
}

View File

@ -147,6 +147,8 @@ use std::sync::{Arc, Mutex, RwLock};
pub mod write;
pub mod watch;
pub type Offset = usize;
pub type Length = usize;
@ -184,7 +186,7 @@ fn get_rw_lock_blocking(f: &File, path: &Path) -> Result<()> {
}
#[derive(Debug)]
struct MboxMailbox {
pub struct MboxMailbox {
hash: MailboxHash,
name: String,
path: PathBuf,
@ -950,157 +952,24 @@ impl MailBackend for MboxType {
Err(MeliError::new("Unimplemented."))
}
fn watch(&self) -> ResultFuture<()> {
let sender = self.event_consumer.clone();
let (tx, rx) = channel();
let mut watcher = watcher(tx, std::time::Duration::from_secs(10))
.map_err(|e| e.to_string())
.map_err(MeliError::new)?;
for f in self.mailboxes.lock().unwrap().values() {
watcher
.watch(&f.fs_path, RecursiveMode::Recursive)
.map_err(|e| e.to_string())
.map_err(MeliError::new)?;
debug!("watching {:?}", f.fs_path.as_path());
}
fn watcher(&self) -> Result<Box<dyn BackendWatcher>> {
let account_hash = {
let mut hasher = DefaultHasher::new();
hasher.write(self.account_name.as_bytes());
hasher.finish()
};
let mailboxes = self.mailboxes.clone();
let event_consumer = self.event_consumer.clone();
let mailbox_index = self.mailbox_index.clone();
let prefer_mbox_type = self.prefer_mbox_type;
Ok(Box::pin(async move {
loop {
match rx.recv() {
/*
* Event types:
*
* pub enum RefreshEventKind {
* Update(EnvelopeHash, Envelope), // Old hash, new envelope
* Create(Envelope),
* Remove(EnvelopeHash),
* Rescan,
* }
*/
Ok(event) => match event {
/* Update */
DebouncedEvent::NoticeWrite(pathbuf) | DebouncedEvent::Write(pathbuf) => {
let mailbox_hash = get_path_hash!(&pathbuf);
let file = match std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&pathbuf)
{
Ok(f) => f,
Err(_) => {
continue;
}
};
get_rw_lock_blocking(&file, &pathbuf)?;
let mut mailbox_lock = mailboxes.lock().unwrap();
let mut buf_reader = BufReader::new(file);
let mut contents = Vec::new();
if let Err(e) = buf_reader.read_to_end(&mut contents) {
debug!(e);
continue;
};
if contents.starts_with(mailbox_lock[&mailbox_hash].content.as_slice())
{
if let Ok((_, envelopes)) = mbox_parse(
mailbox_lock[&mailbox_hash].index.clone(),
&contents,
mailbox_lock[&mailbox_hash].content.len(),
prefer_mbox_type,
) {
let mut mailbox_index_lck = mailbox_index.lock().unwrap();
for env in envelopes {
mailbox_index_lck.insert(env.hash(), mailbox_hash);
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: RefreshEventKind::Create(Box::new(env)),
}),
);
}
}
} else {
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: RefreshEventKind::Rescan,
}),
);
}
mailbox_lock
.entry(mailbox_hash)
.and_modify(|f| f.content = contents);
}
/* Remove */
DebouncedEvent::NoticeRemove(pathbuf) | DebouncedEvent::Remove(pathbuf) => {
if mailboxes
.lock()
.unwrap()
.values()
.any(|f| f.fs_path == pathbuf)
{
let mailbox_hash = get_path_hash!(&pathbuf);
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: RefreshEventKind::Failure(MeliError::new(format!(
"mbox mailbox {} was removed.",
pathbuf.display()
))),
}),
);
return Ok(());
}
}
DebouncedEvent::Rename(src, dest) => {
if mailboxes.lock().unwrap().values().any(|f| f.fs_path == src) {
let mailbox_hash = get_path_hash!(&src);
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: RefreshEventKind::Failure(MeliError::new(format!(
"mbox mailbox {} was renamed to {}.",
src.display(),
dest.display()
))),
}),
);
return Ok(());
}
}
/* Trigger rescan of mailboxes */
DebouncedEvent::Rescan => {
for &mailbox_hash in mailboxes.lock().unwrap().keys() {
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: RefreshEventKind::Rescan,
}),
);
}
return Ok(());
}
_ => {}
},
Err(e) => debug!("watch error: {:?}", e),
}
}
let mailboxes = self.mailboxes.clone();
let prefer_mbox_type = self.prefer_mbox_type.clone();
Ok(Box::new(watch::MboxWatcher {
account_hash,
event_consumer,
mailbox_hashes: BTreeSet::default(),
mailbox_index,
mailboxes,
polling_period: std::time::Duration::from_secs(60),
prefer_mbox_type,
}))
}

View File

@ -0,0 +1,223 @@
/*
* meli - mailbox module.
*
* Copyright 2017 - 2021 Manos Pitsidianakis
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use super::*;
use std::collections::BTreeSet;
/// Watcher for mbox mailboxes, implementing [`BackendWatcher`].
///
/// NOTE(review): events appear to come from the `notify` crate's debounced
/// filesystem watcher (see `DebouncedEvent` usage in `spawn`) — confirm the
/// crate version before relying on event semantics.
#[derive(Debug)]
pub struct MboxWatcher {
    /// Hash identifying the account the generated events belong to.
    pub account_hash: AccountHash,
    /// Callback that receives the generated [`BackendEvent`]s.
    pub event_consumer: BackendEventConsumer,
    /// Mailboxes explicitly registered for watching; only these get a
    /// filesystem watch when the watcher is spawned.
    pub mailbox_hashes: BTreeSet<MailboxHash>,
    /// Maps each envelope to the mailbox it belongs to.
    pub mailbox_index: Arc<Mutex<HashMap<EnvelopeHash, MailboxHash>>>,
    /// Shared mailbox state of the parent backend.
    pub mailboxes: Arc<Mutex<HashMap<MailboxHash, MboxMailbox>>>,
    /// Debounce/polling interval for filesystem events.
    pub polling_period: std::time::Duration,
    /// mbox sub-format preference used when parsing newly appended data.
    pub prefer_mbox_type: Option<MboxFormat>,
}
impl BackendWatcher for MboxWatcher {
    /// The spawned future blocks on a synchronous channel receive, so it must
    /// run on a blocking-capable executor.
    fn is_blocking(&self) -> bool {
        true
    }

    /// Register `mailbox_hash` for watching. The urgency hint is ignored for
    /// mbox: every registered mailbox is watched the same way.
    fn register_mailbox(
        &mut self,
        mailbox_hash: MailboxHash,
        _urgency: MailboxWatchUrgency,
    ) -> Result<()> {
        self.mailbox_hashes.insert(mailbox_hash);
        Ok(())
    }

    /// Set the filesystem-event debounce period. `None` leaves the current
    /// period unchanged.
    fn set_polling_period(&mut self, period: Option<std::time::Duration>) -> Result<()> {
        if let Some(period) = period {
            self.polling_period = period;
        }
        Ok(())
    }

    /// Consume the watcher and produce the watching future.
    ///
    /// A debounced filesystem watcher is installed over every registered
    /// mailbox path. The returned future then loops over received events and
    /// translates them into [`RefreshEvent`]s sent through the event
    /// consumer:
    /// - writes: appended messages are parsed and reported as `Create`;
    ///   any other content change triggers a `Rescan`;
    /// - removal/rename of a watched mailbox: a `Failure` is reported and the
    ///   future terminates;
    /// - a watcher-level rescan: every mailbox gets a `Rescan` and the future
    ///   terminates.
    fn spawn(self: Box<Self>) -> ResultFuture<()> {
        let MboxWatcher {
            account_hash,
            event_consumer: sender,
            mailbox_hashes,
            mailbox_index,
            mailboxes,
            polling_period,
            prefer_mbox_type,
        } = *self;
        let (tx, rx) = channel();
        let mut watcher = watcher(tx, polling_period)
            .map_err(|e| e.to_string())
            .map_err(MeliError::new)?;
        // Watch only the mailboxes that were explicitly registered.
        for (_, f) in mailboxes
            .lock()
            .unwrap()
            .iter()
            .filter(|(k, _)| mailbox_hashes.contains(k))
        {
            watcher
                .watch(&f.fs_path, RecursiveMode::Recursive)
                .map_err(|e| e.to_string())
                .map_err(MeliError::new)?;
            debug!("watching {:?}", f.fs_path.as_path());
        }
        Ok(Box::pin(async move {
            loop {
                // NOTE(review): `rx.recv()` blocks the thread; this is why
                // `is_blocking()` returns true above.
                match rx.recv() {
                    /*
                     * Event types:
                     *
                     * pub enum RefreshEventKind {
                     *    Update(EnvelopeHash, Envelope), // Old hash, new envelope
                     *    Create(Envelope),
                     *    Remove(EnvelopeHash),
                     *    Rescan,
                     * }
                     */
                    Ok(event) => match event {
                        /* Update */
                        DebouncedEvent::NoticeWrite(pathbuf) | DebouncedEvent::Write(pathbuf) => {
                            let mailbox_hash = get_path_hash!(&pathbuf);
                            // Open read-write so the advisory lock below can
                            // be taken; skip the event if the file vanished.
                            let file = match std::fs::OpenOptions::new()
                                .read(true)
                                .write(true)
                                .open(&pathbuf)
                            {
                                Ok(f) => f,
                                Err(_) => {
                                    continue;
                                }
                            };
                            // Block until we hold the mbox file lock, so we
                            // never parse a half-written mailbox.
                            get_rw_lock_blocking(&file, &pathbuf)?;
                            let mut mailbox_lock = mailboxes.lock().unwrap();
                            let mut buf_reader = BufReader::new(file);
                            let mut contents = Vec::new();
                            if let Err(e) = buf_reader.read_to_end(&mut contents) {
                                debug!(e);
                                continue;
                            };
                            // If the previous content is a prefix of the new
                            // content, only new messages were appended: parse
                            // just the suffix. Otherwise the file was
                            // rewritten and a full rescan is needed.
                            if contents.starts_with(mailbox_lock[&mailbox_hash].content.as_slice())
                            {
                                if let Ok((_, envelopes)) = mbox_parse(
                                    mailbox_lock[&mailbox_hash].index.clone(),
                                    &contents,
                                    mailbox_lock[&mailbox_hash].content.len(),
                                    prefer_mbox_type,
                                ) {
                                    let mut mailbox_index_lck = mailbox_index.lock().unwrap();
                                    for env in envelopes {
                                        mailbox_index_lck.insert(env.hash(), mailbox_hash);
                                        (sender)(
                                            account_hash,
                                            BackendEvent::Refresh(RefreshEvent {
                                                account_hash,
                                                mailbox_hash,
                                                kind: RefreshEventKind::Create(Box::new(env)),
                                            }),
                                        );
                                    }
                                }
                            } else {
                                (sender)(
                                    account_hash,
                                    BackendEvent::Refresh(RefreshEvent {
                                        account_hash,
                                        mailbox_hash,
                                        kind: RefreshEventKind::Rescan,
                                    }),
                                );
                            }
                            // Remember the new content as the baseline for
                            // the next write event.
                            mailbox_lock
                                .entry(mailbox_hash)
                                .and_modify(|f| f.content = contents);
                        }
                        /* Remove */
                        DebouncedEvent::NoticeRemove(pathbuf) | DebouncedEvent::Remove(pathbuf) => {
                            if mailboxes
                                .lock()
                                .unwrap()
                                .values()
                                .any(|f| f.fs_path == pathbuf)
                            {
                                let mailbox_hash = get_path_hash!(&pathbuf);
                                (sender)(
                                    account_hash,
                                    BackendEvent::Refresh(RefreshEvent {
                                        account_hash,
                                        mailbox_hash,
                                        kind: RefreshEventKind::Failure(MeliError::new(format!(
                                            "mbox mailbox {} was removed.",
                                            pathbuf.display()
                                        ))),
                                    }),
                                );
                                // The watched path is gone; stop watching.
                                return Ok(());
                            }
                        }
                        DebouncedEvent::Rename(src, dest) => {
                            if mailboxes.lock().unwrap().values().any(|f| f.fs_path == src) {
                                let mailbox_hash = get_path_hash!(&src);
                                (sender)(
                                    account_hash,
                                    BackendEvent::Refresh(RefreshEvent {
                                        account_hash,
                                        mailbox_hash,
                                        kind: RefreshEventKind::Failure(MeliError::new(format!(
                                            "mbox mailbox {} was renamed to {}.",
                                            src.display(),
                                            dest.display()
                                        ))),
                                    }),
                                );
                                // The watched path moved; stop watching.
                                return Ok(());
                            }
                        }
                        /* Trigger rescan of mailboxes */
                        DebouncedEvent::Rescan => {
                            for &mailbox_hash in mailboxes.lock().unwrap().keys() {
                                (sender)(
                                    account_hash,
                                    BackendEvent::Refresh(RefreshEvent {
                                        account_hash,
                                        mailbox_hash,
                                        kind: RefreshEventKind::Rescan,
                                    }),
                                );
                            }
                            return Ok(());
                        }
                        _ => {}
                    },
                    Err(e) => debug!("watch error: {:?}", e),
                }
            }
        }))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}

View File

@ -239,7 +239,7 @@ impl MailBackend for NntpType {
}))
}
fn watch(&self) -> ResultFuture<()> {
fn watcher(&self) -> Result<Box<dyn BackendWatcher>> {
Err(MeliError::new("Unimplemented."))
}

View File

@ -67,6 +67,8 @@ pub use tags::*;
mod thread;
pub use thread::*;
mod watch;
#[derive(Debug)]
pub struct DbConnection {
pub lib: Arc<libloading::Library>,
@ -232,7 +234,7 @@ unsafe impl Send for NotmuchDb {}
unsafe impl Sync for NotmuchDb {}
#[derive(Debug, Clone, Default)]
struct NotmuchMailbox {
pub struct NotmuchMailbox {
hash: MailboxHash,
children: Vec<MailboxHash>,
parent: Option<MailboxHash>,
@ -447,6 +449,82 @@ impl MailBackend for NotmuchDb {
Ok(Box::pin(async { Ok(()) }))
}
fn load(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()> {
let database = NotmuchDb::new_connection(
self.path.as_path(),
self.revision_uuid.clone(),
self.lib.clone(),
false,
)?;
let mailboxes = self.mailboxes.clone();
let index = self.index.clone();
let mailbox_index = self.mailbox_index.clone();
let collection = self.collection.clone();
Ok(Box::pin(async move {
{
//crate::connections::sleep(std::time::Duration::from_secs(2)).await;
}
let mailboxes_lck = mailboxes.read().unwrap();
let mailbox = mailboxes_lck.get(&mailbox_hash).unwrap();
let query: Query = Query::new(&database, mailbox.query_str.as_str())?;
let mut unseen_total = 0;
let mut mailbox_index_lck = mailbox_index.write().unwrap();
let new_envelopes: HashMap<EnvelopeHash, Envelope> = query
.search()?
.into_iter()
.map(|m| {
let env = m.into_envelope(&index, &collection.tag_index);
mailbox_index_lck
.entry(env.hash())
.or_default()
.push(mailbox_hash);
if !env.is_seen() {
unseen_total += 1;
}
(env.hash(), env)
})
.collect();
{
let mut total_lck = mailbox.total.lock().unwrap();
let mut unseen_lck = mailbox.unseen.lock().unwrap();
*total_lck = new_envelopes.len();
*unseen_lck = unseen_total;
}
collection.merge(new_envelopes, mailbox_hash, None);
let mut envelopes_lck = collection.envelopes.write().unwrap();
envelopes_lck.retain(|&k, _| k % 2 == 0);
Ok(())
}))
}
/// Fetch the full envelopes for `env_hashes` that are not already present in
/// the in-memory collection.
///
/// Already-cached envelopes are skipped; hashes with no known notmuch message
/// id are logged and skipped instead of panicking.
///
/// # Errors
///
/// Fails if a database connection cannot be opened or a known message cannot
/// be located in the database.
fn fetch_batch(&mut self, env_hashes: EnvelopeHashBatch) -> ResultFuture<()> {
    let database = NotmuchDb::new_connection(
        self.path.as_path(),
        self.revision_uuid.clone(),
        self.lib.clone(),
        false,
    )?;
    let index = self.index.clone();
    let collection = self.collection.clone();
    Ok(Box::pin(async move {
        debug!("fetch_batch {:?}", &env_hashes);
        let mut envelopes_lck = collection.envelopes.write().unwrap();
        for env_hash in env_hashes.iter() {
            if envelopes_lck.contains_key(&env_hash) {
                continue;
            }
            // A read lock suffices: the index is only looked up, never
            // modified here (the original took a write lock).
            let index_lck = index.read().unwrap();
            let message = match index_lck.get(&env_hash) {
                Some(msg_id) => Message::find_message(&database, msg_id)?,
                None => {
                    // Unknown hash: skip instead of panicking on indexing.
                    debug!("fetch_batch: no message id for envelope {}", env_hash);
                    continue;
                }
            };
            drop(index_lck);
            let env = message.into_envelope(&index, &collection.tag_index);
            envelopes_lck.insert(env_hash, env);
        }
        debug!("fetch_batch {:?} done", &env_hashes);
        Ok(())
    }))
}
fn fetch(
&mut self,
mailbox_hash: MailboxHash,
@ -581,50 +659,29 @@ impl MailBackend for NotmuchDb {
}))
}
fn watch(&self) -> ResultFuture<()> {
extern crate notify;
use notify::{watcher, RecursiveMode, Watcher};
fn watcher(&self) -> Result<Box<dyn BackendWatcher>> {
let account_hash = self.account_hash;
let collection = self.collection.clone();
let event_consumer = self.event_consumer.clone();
let index = self.index.clone();
let lib = self.lib.clone();
let mailbox_index = self.mailbox_index.clone();
let mailboxes = self.mailboxes.clone();
let path = self.path.clone();
let revision_uuid = self.revision_uuid.clone();
let mailboxes = self.mailboxes.clone();
let index = self.index.clone();
let mailbox_index = self.mailbox_index.clone();
let event_consumer = self.event_consumer.clone();
let (tx, rx) = std::sync::mpsc::channel();
let mut watcher = watcher(tx, std::time::Duration::from_secs(2)).unwrap();
watcher.watch(&self.path, RecursiveMode::Recursive).unwrap();
Ok(Box::pin(async move {
let _watcher = watcher;
let rx = rx;
loop {
let _ = rx.recv().map_err(|err| err.to_string())?;
{
let mut database = NotmuchDb::new_connection(
path.as_path(),
revision_uuid.clone(),
lib.clone(),
false,
)?;
let new_revision_uuid = database.get_revision_uuid();
if new_revision_uuid > *database.revision_uuid.read().unwrap() {
database.refresh(
mailboxes.clone(),
index.clone(),
mailbox_index.clone(),
collection.tag_index.clone(),
account_hash.clone(),
event_consumer.clone(),
new_revision_uuid,
)?;
*revision_uuid.write().unwrap() = new_revision_uuid;
}
}
}
Ok(Box::new(watch::NotmuchWatcher {
account_hash,
collection,
event_consumer,
index,
lib,
mailbox_hashes: BTreeSet::default(),
mailbox_index,
mailboxes,
path,
polling_period: std::time::Duration::from_secs(3),
revision_uuid,
}))
}

View File

@ -20,7 +20,6 @@
*/
use super::*;
use crate::thread::{ThreadHash, ThreadNode, ThreadNodeHash};
#[derive(Clone)]
pub struct Message<'m> {
@ -188,22 +187,6 @@ impl<'m> Message<'m> {
}
}
pub fn into_thread_node(&self) -> (ThreadNodeHash, ThreadNode) {
(
ThreadNodeHash::from(self.msg_id()),
ThreadNode {
message: Some(self.env_hash()),
parent: None,
other_mailbox: false,
children: vec![],
date: self.date(),
show_subject: true,
group: ThreadHash::new(),
unseen: false,
},
)
}
pub fn add_tag(&self, tag: &CStr) -> Result<()> {
if let Err(err) = unsafe {
try_call!(

View File

@ -0,0 +1,116 @@
/*
* meli - notmuch backend
*
* Copyright 2019 - 2021 Manos Pitsidianakis
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use super::*;
use std::collections::BTreeSet;
/// Watcher for a notmuch database, implementing [`BackendWatcher`].
///
/// Watches the database path for filesystem changes and refreshes state when
/// the database revision advances.
#[derive(Debug)]
pub struct NotmuchWatcher {
    /// Hash identifying the account the generated events belong to.
    pub account_hash: AccountHash,
    /// Shared envelope/tag collection of the parent backend.
    pub collection: Collection,
    /// Callback that receives the generated [`BackendEvent`]s.
    pub event_consumer: BackendEventConsumer,
    /// Maps envelope hashes to notmuch message ids.
    pub index: Arc<RwLock<HashMap<EnvelopeHash, CString>>>,
    /// Handle to the dynamically loaded libnotmuch.
    pub lib: Arc<libloading::Library>,
    /// Mailboxes registered for watching.
    /// NOTE(review): currently unused by `spawn` — the whole database path is
    /// watched regardless of registrations.
    pub mailbox_hashes: BTreeSet<MailboxHash>,
    /// Maps each envelope to the mailboxes it appears in.
    pub mailbox_index: Arc<RwLock<HashMap<EnvelopeHash, SmallVec<[MailboxHash; 16]>>>>,
    /// Shared mailbox state of the parent backend.
    pub mailboxes: Arc<RwLock<HashMap<MailboxHash, NotmuchMailbox>>>,
    /// Filesystem path of the notmuch database.
    pub path: PathBuf,
    /// Debounce/polling interval for filesystem events.
    pub polling_period: std::time::Duration,
    /// Last seen database revision; updated after each refresh.
    pub revision_uuid: Arc<RwLock<u64>>,
}
impl BackendWatcher for NotmuchWatcher {
    /// The spawned future blocks on a synchronous channel receive, so it must
    /// run on a blocking-capable executor.
    fn is_blocking(&self) -> bool {
        true
    }

    /// Register `mailbox_hash` for watching. The urgency hint is ignored.
    /// NOTE(review): `spawn` currently watches the whole database path and
    /// does not consult `mailbox_hashes`.
    fn register_mailbox(
        &mut self,
        mailbox_hash: MailboxHash,
        _urgency: MailboxWatchUrgency,
    ) -> Result<()> {
        self.mailbox_hashes.insert(mailbox_hash);
        Ok(())
    }

    /// Set the filesystem-event debounce period. `None` leaves the current
    /// period unchanged.
    fn set_polling_period(&mut self, period: Option<std::time::Duration>) -> Result<()> {
        if let Some(period) = period {
            self.polling_period = period;
        }
        Ok(())
    }

    /// Consume the watcher and produce the watching future.
    ///
    /// Watches the database path recursively; on every filesystem event it
    /// opens a fresh connection, and if the database revision advanced past
    /// the last seen one, refreshes mailboxes/envelopes and stores the new
    /// revision. The loop only exits (with `Err`) when the event channel is
    /// disconnected.
    fn spawn(self: Box<Self>) -> ResultFuture<()> {
        Ok(Box::pin(async move {
            extern crate notify;
            use notify::{watcher, RecursiveMode, Watcher};
            let NotmuchWatcher {
                account_hash,
                collection,
                event_consumer,
                index,
                lib,
                mailbox_hashes: _,
                mailbox_index,
                mailboxes,
                path,
                polling_period,
                revision_uuid,
            } = *self;
            let (tx, rx) = std::sync::mpsc::channel();
            let mut watcher = watcher(tx, polling_period).unwrap();
            watcher.watch(&path, RecursiveMode::Recursive).unwrap();
            loop {
                // Block until something changes under the database path;
                // a channel disconnect ends the watch with an error.
                let _ = rx.recv().map_err(|err| err.to_string())?;
                {
                    let mut database = NotmuchDb::new_connection(
                        path.as_path(),
                        revision_uuid.clone(),
                        lib.clone(),
                        false,
                    )?;
                    let new_revision_uuid = database.get_revision_uuid();
                    // Only refresh when the database actually advanced.
                    if new_revision_uuid > *database.revision_uuid.read().unwrap() {
                        database.refresh(
                            mailboxes.clone(),
                            index.clone(),
                            mailbox_index.clone(),
                            collection.tag_index.clone(),
                            account_hash.clone(),
                            event_consumer.clone(),
                            new_revision_uuid,
                        )?;
                        *revision_uuid.write().unwrap() = new_revision_uuid;
                    }
                }
            }
        }))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}

View File

@ -348,7 +348,7 @@ fn run_app(opt: Opt) -> Result<()> {
state.register_component(Box::new(components::svg::SVGScreenshotFilter::new()));
let window = Box::new(Tabbed::new(
vec![
Box::new(listing::Listing::new(&mut state.context)),
Box::new(listing2::Listing::new(&mut state.context)),
Box::new(ContactList::new(&state.context)),
],
&state.context,

View File

@ -27,6 +27,7 @@ use melib::email::{attachment_types::*, attachments::*};
use melib::thread::ThreadNodeHash;
pub mod listing;
pub mod listing2;
pub use crate::listing::*;
pub mod view;
pub use crate::view::*;

File diff suppressed because it is too large Load Diff

View File

@ -163,6 +163,13 @@ pub enum JobRequest {
Mailboxes {
handle: JoinHandle<Result<HashMap<MailboxHash, Mailbox>>>,
},
Load {
mailbox_hash: MailboxHash,
handle: JoinHandle<Result<()>>,
},
FetchBatch {
handle: JoinHandle<Result<()>>,
},
Fetch {
mailbox_hash: MailboxHash,
handle: JoinHandle<(
@ -237,6 +244,8 @@ impl Drop for JobRequest {
match self {
JobRequest::Generic { handle, .. } |
JobRequest::IsOnline { handle, .. } |
JobRequest::Load { handle, .. } |
JobRequest::FetchBatch { handle, .. } |
JobRequest::Refresh { handle, .. } |
JobRequest::SetFlags { handle, .. } |
JobRequest::SaveMessage { handle, .. } |
@ -275,6 +284,7 @@ impl core::fmt::Debug for JobRequest {
match self {
JobRequest::Generic { name, .. } => write!(f, "JobRequest::Generic({})", name),
JobRequest::Mailboxes { .. } => write!(f, "JobRequest::Mailboxes"),
JobRequest::FetchBatch { .. } => write!(f, "JobRequest::FetchBatch",),
JobRequest::Fetch { mailbox_hash, .. } => {
write!(f, "JobRequest::Fetch({})", mailbox_hash)
}
@ -302,6 +312,9 @@ impl core::fmt::Debug for JobRequest {
JobRequest::SendMessageBackground { .. } => {
write!(f, "JobRequest::SendMessageBackground")
}
JobRequest::Load { mailbox_hash, .. } => {
write!(f, "JobRequest::Load({})", mailbox_hash)
}
}
}
}
@ -312,6 +325,8 @@ impl core::fmt::Display for JobRequest {
JobRequest::Generic { name, .. } => write!(f, "{}", name),
JobRequest::Mailboxes { .. } => write!(f, "Get mailbox list"),
JobRequest::Fetch { .. } => write!(f, "Mailbox fetch"),
JobRequest::FetchBatch { .. } => write!(f, "Fetch envelopes"),
JobRequest::Load { .. } => write!(f, "Mailbox load"),
JobRequest::IsOnline { .. } => write!(f, "Online status check"),
JobRequest::Refresh { .. } => write!(f, "Refresh mailbox"),
JobRequest::SetFlags { env_hashes, .. } => write!(
@ -351,6 +366,15 @@ impl JobRequest {
}
}
/// Returns `true` if this job is a `Load` job for `mailbox_hash`.
pub fn is_load(&self, mailbox_hash: MailboxHash) -> bool {
    // `matches!` replaces the manual match-to-bool (clippy:
    // match_like_matches_macro).
    matches!(self, JobRequest::Load { mailbox_hash: h, .. } if *h == mailbox_hash)
}
pub fn is_fetch(&self, mailbox_hash: MailboxHash) -> bool {
match self {
JobRequest::Fetch {
@ -658,8 +682,7 @@ impl Account {
{
let total = entry.ref_mailbox.count().ok().unwrap_or((0, 0)).1;
entry.status = MailboxStatus::Parsing(0, total);
if let Ok(mailbox_job) = self.backend.write().unwrap().fetch(*h) {
let mailbox_job = mailbox_job.into_future();
if let Ok(mailbox_job) = self.backend.write().unwrap().load(*h) {
let handle = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(mailbox_job)
} else {
@ -671,9 +694,10 @@ impl Account {
StatusEvent::NewJob(job_id),
)))
.unwrap();
debug!("JobRequest::Load {} {:?}", *h, job_id);
self.active_jobs.insert(
job_id,
JobRequest::Fetch {
JobRequest::Load {
mailbox_hash: *h,
handle,
},
@ -1068,9 +1092,23 @@ impl Account {
}
if !self.active_jobs.values().any(|j| j.is_watch()) {
match self.backend.read().unwrap().watch() {
Ok(fut) => {
let handle = if self.backend_capabilities.is_async {
match self
.backend
.read()
.unwrap()
.watcher()
.and_then(|mut watcher| {
for (mailbox_hash, _) in self
.mailbox_entries
.iter()
.filter(|(_, m)| m.conf.mailbox_conf.subscribe.is_true())
{
watcher.register_mailbox(*mailbox_hash, MailboxWatchUrgency::High)?;
}
Ok((watcher.is_blocking(), watcher.spawn()?))
}) {
Ok((is_blocking, fut)) => {
let handle = if is_blocking {
self.job_executor.spawn_specialized(fut)
} else {
self.job_executor.spawn_blocking(fut)
@ -1121,6 +1159,73 @@ impl Account {
self.hash
}
/// Spawn a background job that fetches the full envelopes in `env_hashes`
/// from the backend, registering it as a [`JobRequest::FetchBatch`].
///
/// Returns the id of the spawned job.
pub fn fetch_batch(&mut self, env_hashes: EnvelopeHashBatch) -> Result<JobId> {
    debug!("account fetch_batch {:?}", &env_hashes);
    let fut = self.backend.write().unwrap().fetch_batch(env_hashes)?;
    // Async-capable backends get a specialized executor slot; everything
    // else runs on the blocking pool.
    let handle = match self.backend_capabilities.is_async {
        true => self.job_executor.spawn_specialized(fut),
        false => self.job_executor.spawn_blocking(fut),
    };
    let id = handle.job_id;
    self.insert_job(id, JobRequest::FetchBatch { handle });
    Ok(id)
}
/// Ensure the given mailbox has been loaded by the backend.
///
/// Returns `Ok(None)` if the mailbox is already available,
/// `Ok(Some(job_id))` with the id of the (possibly pre-existing) background
/// load job, or the stored error if the mailbox previously failed to load.
///
/// # Panics
///
/// Panics if `mailbox_hash` is not present in `self.mailbox_entries`
/// (callers are expected to pass a known hash).
pub fn load2(&mut self, mailbox_hash: MailboxHash) -> Result<Option<JobId>> {
    // Fixed unbalanced format string ("load2({}" -> "load2({})").
    debug!("account load2({})", mailbox_hash);
    match self.mailbox_entries[&mailbox_hash].status {
        MailboxStatus::Available => Ok(None),
        MailboxStatus::Failed(ref err) => Err(err.clone()),
        MailboxStatus::Parsing(_, _) | MailboxStatus::None => {
            // Reuse an in-flight load job for this mailbox if one exists,
            // instead of spawning a duplicate. (Removed stray per-job WIP
            // debug statements from the search closure.)
            if let Some(job_id) = self
                .active_jobs
                .iter()
                .find(|(_, job)| job.is_load(mailbox_hash))
                .map(|(id, _)| *id)
            {
                return Ok(Some(job_id));
            }
            match self.backend.write().unwrap().load(mailbox_hash) {
                Ok(mailbox_job) => {
                    let handle = if self.backend_capabilities.is_async {
                        self.job_executor.spawn_specialized(mailbox_job)
                    } else {
                        self.job_executor.spawn_blocking(mailbox_job)
                    };
                    let job_id = handle.job_id;
                    debug!("JobRequest::Load {} {:?}", mailbox_hash, job_id);
                    self.insert_job(
                        job_id,
                        JobRequest::Load {
                            mailbox_hash,
                            handle,
                        },
                    );
                    Ok(Some(job_id))
                }
                Err(err) => {
                    // Record the failure and nudge the UI to re-run its
                    // startup check for this mailbox.
                    self.mailbox_entries
                        .entry(mailbox_hash)
                        .and_modify(|entry| {
                            entry.status = MailboxStatus::Failed(err.clone());
                        });
                    self.sender
                        .send(ThreadEvent::UIEvent(UIEvent::StartupCheck(mailbox_hash)))
                        .unwrap();
                    Err(err)
                }
            }
        }
    }
}
pub fn load(&mut self, mailbox_hash: MailboxHash) -> result::Result<(), usize> {
if mailbox_hash == 0 {
return Err(0);
@ -1623,6 +1728,57 @@ impl Account {
}
}
}
JobRequest::Load {
mailbox_hash,
ref mut handle,
..
} => {
debug!("got mailbox load for {}", mailbox_hash);
match handle.chan.try_recv() {
Err(_) => {
/* canceled */
return true;
}
Ok(None) => {
return true;
}
Ok(Some(Ok(()))) => {
self.mailbox_entries
.entry(mailbox_hash)
.and_modify(|entry| {
entry.status = MailboxStatus::Available;
});
self.sender
.send(ThreadEvent::UIEvent(UIEvent::MailboxUpdate((
self.hash,
mailbox_hash,
))))
.unwrap();
return true;
}
Ok(Some(Err(err))) => {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!("{}: could not load mailbox", &self.name)),
err.to_string(),
Some(crate::types::NotificationType::Error(err.kind)),
)))
.expect("Could not send event on main channel");
self.mailbox_entries
.entry(mailbox_hash)
.and_modify(|entry| {
entry.status = MailboxStatus::Failed(err);
});
self.sender
.send(ThreadEvent::UIEvent(UIEvent::MailboxUpdate((
self.hash,
mailbox_hash,
))))
.unwrap();
return true;
}
}
}
JobRequest::Fetch {
mailbox_hash,
ref mut handle,
@ -2103,6 +2259,17 @@ impl Account {
}
}
}
JobRequest::FetchBatch { ref mut handle } => {
if let Ok(Some(Err(err))) = handle.chan.try_recv() {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!("{}: envelope fetch failed", &self.name)),
err.to_string(),
Some(crate::types::NotificationType::Error(err.kind)),
)))
.expect("Could not send event on main channel");
}
}
JobRequest::Watch { ref mut handle } => {
debug!("JobRequest::Watch finished??? ");
if let Ok(Some(Err(err))) = handle.chan.try_recv() {

View File

@ -393,7 +393,7 @@ impl State {
.contains_key(&mailbox_hash)
{
if self.context.accounts[&account_hash]
.load(mailbox_hash)
.load2(mailbox_hash)
.is_err()
{
self.context.replies.push_back(UIEvent::from(event));

View File

@ -285,6 +285,13 @@ pub mod segment_tree {
max
}
/// Return the value stored at `index`, or 0 when the tree is empty or the
/// index is out of bounds.
pub fn get(&self, index: usize) -> u8 {
    // Checked lookup folds the original empty-array guard and the bounds
    // check into one panic-free path: the old code returned 0 for an empty
    // array but still panicked on an out-of-range index.
    self.array.get(index).copied().unwrap_or(0)
}
pub fn update(&mut self, pos: usize, value: u8) {
let mut ctr = pos + self.array.len();