melib/backends: Add BackendEvent enum

memfd
Manos Pitsidianakis 2020-08-20 01:55:24 +03:00
parent 9928ee78e7
commit a190805384
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
17 changed files with 286 additions and 347 deletions

View File

@ -86,6 +86,7 @@ pub type BackendCreator = Box<
dyn Fn(
&AccountSettings,
Box<dyn Fn(&str) -> bool + Send + Sync>,
BackendEventConsumer,
) -> Result<Box<dyn MailBackend>>,
>;
@ -122,7 +123,7 @@ impl Backends {
b.register(
"maildir".to_string(),
Backend {
create_fn: Box::new(|| Box::new(|f, i| MaildirType::new(f, i))),
create_fn: Box::new(|| Box::new(|f, i, ev| MaildirType::new(f, i, ev))),
validate_conf_fn: Box::new(MaildirType::validate_config),
},
);
@ -132,7 +133,7 @@ impl Backends {
b.register(
"mbox".to_string(),
Backend {
create_fn: Box::new(|| Box::new(|f, i| MboxType::new(f, i))),
create_fn: Box::new(|| Box::new(|f, i, ev| MboxType::new(f, i, ev))),
validate_conf_fn: Box::new(MboxType::validate_config),
},
);
@ -142,14 +143,14 @@ impl Backends {
b.register(
"imap".to_string(),
Backend {
create_fn: Box::new(|| Box::new(|f, i| imap::ImapType::new(f, i))),
create_fn: Box::new(|| Box::new(|f, i, ev| imap::ImapType::new(f, i, ev))),
validate_conf_fn: Box::new(imap::ImapType::validate_config),
},
);
b.register(
"nntp".to_string(),
Backend {
create_fn: Box::new(|| Box::new(|f, i| nntp::NntpType::new(f, i))),
create_fn: Box::new(|| Box::new(|f, i, ev| nntp::NntpType::new(f, i, ev))),
validate_conf_fn: Box::new(nntp::NntpType::validate_config),
},
);
@ -160,7 +161,7 @@ impl Backends {
b.register(
"notmuch".to_string(),
Backend {
create_fn: Box::new(|| Box::new(|f, i| NotmuchDb::new(f, i))),
create_fn: Box::new(|| Box::new(|f, i, ev| NotmuchDb::new(f, i, ev))),
validate_conf_fn: Box::new(NotmuchDb::validate_config),
},
);
@ -171,7 +172,7 @@ impl Backends {
b.register(
"jmap".to_string(),
Backend {
create_fn: Box::new(|| Box::new(|f, i| jmap::JmapType::new(f, i))),
create_fn: Box::new(|| Box::new(|f, i, ev| jmap::JmapType::new(f, i, ev))),
validate_conf_fn: Box::new(jmap::JmapType::validate_config),
},
);
@ -215,6 +216,17 @@ impl Backends {
}
}
#[derive(Debug, Clone)]
pub enum BackendEvent {
Notice {
description: Option<String>,
content: String,
level: crate::LoggingLevel,
},
Refresh(RefreshEvent),
//Job(Box<Future<Output = Result<()>> + Send + 'static>)
}
#[derive(Debug, Clone)]
pub enum RefreshEventKind {
Update(EnvelopeHash, Box<Envelope>),
@ -249,45 +261,25 @@ impl RefreshEvent {
}
}
/// A `RefreshEventConsumer` is a boxed closure that must be used to consume a `RefreshEvent` and
/// send it to a UI provided channel. We need this level of abstraction to provide an interface for
/// all users of mailbox refresh events.
pub struct RefreshEventConsumer(Box<dyn Fn(RefreshEvent) -> () + Send + Sync>);
impl RefreshEventConsumer {
pub fn new(b: Box<dyn Fn(RefreshEvent) -> () + Send + Sync>) -> Self {
RefreshEventConsumer(b)
}
pub fn send(&self, r: RefreshEvent) {
self.0(r);
#[derive(Clone)]
pub struct BackendEventConsumer(Arc<dyn Fn(AccountHash, BackendEvent) -> () + Send + Sync>);
impl BackendEventConsumer {
pub fn new(b: Arc<dyn Fn(AccountHash, BackendEvent) -> () + Send + Sync>) -> Self {
BackendEventConsumer(b)
}
}
impl fmt::Debug for RefreshEventConsumer {
impl fmt::Debug for BackendEventConsumer {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "RefreshEventConsumer")
write!(f, "BackendEventConsumer")
}
}
pub struct NotifyFn(Box<dyn Fn(MailboxHash) -> () + Send + Sync>);
impl Deref for BackendEventConsumer {
type Target = dyn Fn(AccountHash, BackendEvent) -> () + Send + Sync;
impl fmt::Debug for NotifyFn {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "NotifyFn Box")
}
}
impl From<Box<dyn Fn(MailboxHash) -> () + Send + Sync>> for NotifyFn {
fn from(kind: Box<dyn Fn(MailboxHash) -> () + Send + Sync>) -> Self {
NotifyFn(kind)
}
}
impl NotifyFn {
pub fn new(b: Box<dyn Fn(MailboxHash) -> () + Send + Sync>) -> Self {
NotifyFn(b)
}
pub fn notify(&self, f: MailboxHash) {
self.0(f);
fn deref(&self) -> &Self::Target {
&(*self.0)
}
}
@ -325,26 +317,14 @@ pub trait MailBackend: ::std::fmt::Debug + Send + Sync {
) -> Result<Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>> {
Err(MeliError::new("Unimplemented."))
}
fn refresh(
&mut self,
_mailbox_hash: MailboxHash,
_sender: RefreshEventConsumer,
) -> Result<Async<()>> {
fn refresh(&mut self, _mailbox_hash: MailboxHash) -> Result<Async<()>> {
Err(MeliError::new("Unimplemented."))
}
fn refresh_async(
&mut self,
_mailbox_hash: MailboxHash,
_sender: RefreshEventConsumer,
) -> ResultFuture<()> {
fn refresh_async(&mut self, _mailbox_hash: MailboxHash) -> ResultFuture<()> {
Err(MeliError::new("Unimplemented."))
}
fn watch(
&self,
sender: RefreshEventConsumer,
work_context: WorkContext,
) -> Result<std::thread::ThreadId>;
fn watch_async(&self, _sender: RefreshEventConsumer) -> ResultFuture<()> {
fn watch(&self, work_context: WorkContext) -> Result<std::thread::ThreadId>;
fn watch_async(&self) -> ResultFuture<()> {
Err(MeliError::new("Unimplemented."))
}
fn mailboxes(&self) -> Result<HashMap<MailboxHash, Mailbox>>;

View File

@ -49,7 +49,6 @@ use futures::lock::Mutex as FutureMutex;
use futures::stream::Stream;
use std::collections::{hash_map::DefaultHasher, BTreeMap};
use std::collections::{BTreeSet, HashMap, HashSet};
use std::future::Future;
use std::hash::Hasher;
use std::pin::Pin;
use std::str::FromStr;
@ -153,16 +152,19 @@ pub struct UIDStore {
mailboxes: Arc<FutureMutex<HashMap<MailboxHash, ImapMailbox>>>,
is_online: Arc<Mutex<(Instant, Result<()>)>>,
refresh_events: Arc<Mutex<Vec<RefreshEvent>>>,
sender: Arc<RwLock<Option<RefreshEventConsumer>>>,
event_consumer: BackendEventConsumer,
}
impl Default for UIDStore {
fn default() -> Self {
impl UIDStore {
fn new(
account_hash: AccountHash,
account_name: Arc<String>,
event_consumer: BackendEventConsumer,
) -> Self {
UIDStore {
account_hash: 0,
account_hash,
cache_headers: false,
account_name: Arc::new(String::new()),
account_name,
capabilities: Default::default(),
uidvalidity: Default::default(),
hash_index: Default::default(),
@ -175,8 +177,7 @@ impl Default for UIDStore {
Instant::now(),
Err(MeliError::new("Account is uninitialised.")),
))),
refresh_events: Default::default(),
sender: Arc::new(RwLock::new(None)),
event_consumer,
}
}
}
@ -288,13 +289,8 @@ impl MailBackend for ImapType {
}))
}
fn refresh_async(
&mut self,
mailbox_hash: MailboxHash,
sender: RefreshEventConsumer,
) -> ResultFuture<()> {
fn refresh_async(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()> {
let main_conn = self.connection.clone();
*self.uid_store.sender.write().unwrap() = Some(sender);
let uid_store = self.uid_store.clone();
Ok(Box::pin(async move {
let inbox = timeout(Duration::from_secs(3), uid_store.mailboxes.lock())
@ -383,27 +379,18 @@ impl MailBackend for ImapType {
Err(MeliError::new("Unimplemented."))
}
fn refresh(
&mut self,
_mailbox_hash: MailboxHash,
_sender: RefreshEventConsumer,
) -> Result<Async<()>> {
fn refresh(&mut self, _mailbox_hash: MailboxHash) -> Result<Async<()>> {
Err(MeliError::new("Unimplemented."))
}
fn watch(
&self,
_sender: RefreshEventConsumer,
_work_context: WorkContext,
) -> Result<std::thread::ThreadId> {
fn watch(&self, _work_context: WorkContext) -> Result<std::thread::ThreadId> {
Err(MeliError::new("Unimplemented."))
}
fn watch_async(&self, sender: RefreshEventConsumer) -> ResultFuture<()> {
fn watch_async(&self) -> ResultFuture<()> {
debug!("watch_async called");
let conn = ImapConnection::new_connection(&self.server_conf, self.uid_store.clone());
let main_conn = self.connection.clone();
*self.uid_store.sender.write().unwrap() = Some(sender);
let uid_store = self.uid_store.clone();
let has_idle: bool = match self.server_conf.protocol {
ImapProtocol::IMAP {
@ -1132,6 +1119,7 @@ impl ImapType {
pub fn new(
s: &AccountSettings,
is_subscribed: Box<dyn Fn(&str) -> bool + Send + Sync>,
event_consumer: BackendEventConsumer,
) -> Result<Box<dyn MailBackend>> {
let server_hostname = get_conf_val!(s["server_hostname"])?;
let server_username = get_conf_val!(s["server_username"])?;
@ -1184,10 +1172,8 @@ impl ImapType {
};
let account_name = Arc::new(s.name().to_string());
let uid_store: Arc<UIDStore> = Arc::new(UIDStore {
account_hash,
cache_headers: get_conf_val!(s["X_header_caching"], false)?,
account_name,
..UIDStore::default()
..UIDStore::new(account_hash, account_name, event_consumer)
});
let connection = ImapConnection::new_connection(&server_conf, uid_store.clone());

View File

@ -758,14 +758,10 @@ impl ImapConnection {
}
pub fn add_refresh_event(&mut self, ev: crate::backends::RefreshEvent) {
if let Some(ref sender) = self.uid_store.sender.read().unwrap().as_ref() {
sender.send(ev);
for ev in self.uid_store.refresh_events.lock().unwrap().drain(..) {
sender.send(ev);
}
} else {
self.uid_store.refresh_events.lock().unwrap().push(ev);
}
(self.uid_store.event_consumer)(
self.uid_store.account_hash,
crate::backends::BackendEvent::Refresh(ev),
);
}
pub async fn create_uid_msn_cache(

View File

@ -85,7 +85,12 @@ pub trait ManageSieve {
fn renamescript(&mut self) -> Result<()>;
}
pub fn new_managesieve_connection(s: &AccountSettings) -> Result<ImapConnection> {
pub fn new_managesieve_connection(
account_hash: crate::backends::AccountHash,
account_name: String,
s: &AccountSettings,
event_consumer: crate::backends::BackendEventConsumer,
) -> Result<ImapConnection> {
let server_hostname = get_conf_val!(s["server_hostname"])?;
let server_username = get_conf_val!(s["server_username"])?;
let server_password = get_conf_val!(s["server_password"])?;
@ -106,7 +111,7 @@ pub fn new_managesieve_connection(s: &AccountSettings) -> Result<ImapConnection>
Instant::now(),
Err(MeliError::new("Account is uninitialised.")),
))),
..Default::default()
..UIDStore::new(account_hash, Arc::new(account_name), event_consumer)
});
Ok(ImapConnection::new_connection(&server_conf, uid_store))
}

View File

@ -243,10 +243,8 @@ impl MailBackend for JmapType {
}))
}
fn watch_async(&self, sender: RefreshEventConsumer) -> ResultFuture<()> {
let conn = self.connection.clone();
fn watch_async(&self) -> ResultFuture<()> {
Ok(Box::pin(async move {
*conn.lock().await.sender.lock().unwrap() = Some(sender);
Err(MeliError::from("JMAP watch for updates is unimplemented"))
}))
}
@ -360,11 +358,7 @@ impl MailBackend for JmapType {
Err(MeliError::new("Unimplemented."))
}
fn watch(
&self,
_sender: RefreshEventConsumer,
_work_context: WorkContext,
) -> Result<std::thread::ThreadId> {
fn watch(&self, _work_context: WorkContext) -> Result<std::thread::ThreadId> {
Err(MeliError::new("Unimplemented."))
}
@ -535,6 +529,7 @@ impl JmapType {
pub fn new(
s: &AccountSettings,
is_subscribed: Box<dyn Fn(&str) -> bool + Send + Sync>,
event_consumer: BackendEventConsumer,
) -> Result<Box<dyn MailBackend>> {
let online = Arc::new(FutureMutex::new((
std::time::Instant::now(),
@ -553,6 +548,8 @@ impl JmapType {
Ok(Box::new(JmapType {
connection: Arc::new(FutureMutex::new(JmapConnection::new(
&server_conf,
account_hash,
event_consumer,
online.clone(),
)?)),
store: Arc::new(RwLock::new(Store::default())),

View File

@ -30,14 +30,16 @@ pub struct JmapConnection {
pub online_status: Arc<FutureMutex<(Instant, Result<()>)>>,
pub server_conf: JmapServerConf,
pub account_id: Arc<Mutex<String>>,
pub account_hash: AccountHash,
pub method_call_states: Arc<Mutex<HashMap<&'static str, String>>>,
pub refresh_events: Arc<Mutex<Vec<RefreshEvent>>>,
pub sender: Arc<Mutex<Option<RefreshEventConsumer>>>,
pub event_consumer: BackendEventConsumer,
}
impl JmapConnection {
pub fn new(
server_conf: &JmapServerConf,
account_hash: AccountHash,
event_consumer: BackendEventConsumer,
online_status: Arc<FutureMutex<(Instant, Result<()>)>>,
) -> Result<Self> {
let client = HttpClient::builder()
@ -56,9 +58,9 @@ impl JmapConnection {
online_status,
server_conf,
account_id: Arc::new(Mutex::new(String::new())),
account_hash,
event_consumer,
method_call_states: Arc::new(Mutex::new(Default::default())),
refresh_events: Arc::new(Mutex::new(Default::default())),
sender: Arc::new(Mutex::new(Default::default())),
})
}
@ -116,13 +118,6 @@ impl JmapConnection {
}
pub fn add_refresh_event(&self, event: RefreshEvent) {
if let Some(ref sender) = self.sender.lock().unwrap().as_ref() {
for event in self.refresh_events.lock().unwrap().drain(..) {
sender.send(event);
}
sender.send(event);
} else {
self.refresh_events.lock().unwrap().push(event);
}
(self.event_consumer)(self.account_hash, BackendEvent::Refresh(event));
}
}

View File

@ -111,6 +111,7 @@ pub struct MaildirType {
mailboxes: HashMap<MailboxHash, MaildirMailbox>,
mailbox_index: Arc<Mutex<HashMap<EnvelopeHash, MailboxHash>>>,
hash_indexes: HashIndexes,
event_consumer: BackendEventConsumer,
path: PathBuf,
}
@ -229,7 +230,6 @@ impl MailBackend for MaildirType {
fn refresh(
&mut self,
mailbox_hash: MailboxHash,
sender: RefreshEventConsumer,
) -> Result<Async<()>> {
let w = AsyncBuilder::new();
let cache_dir = xdg::BaseDirectories::with_profile("meli", &self.name).unwrap();
@ -238,6 +238,7 @@ impl MailBackend for MaildirType {
hasher.write(self.name.as_bytes());
hasher.finish()
};
let sender = self.event_consumer.clone();
let handle = {
let mailbox: &MaildirMailbox = &self.mailboxes[&mailbox_hash];
@ -252,7 +253,7 @@ impl MailBackend for MaildirType {
.set_name
.send((std::thread::current().id(), name.clone()))
.unwrap();
let thunk = move |sender: &RefreshEventConsumer| {
let thunk = move |sender: &BackendEventConsumer| {
debug!("refreshing");
let mut path = path.clone();
path.push("new");
@ -312,11 +313,11 @@ impl MailBackend for MaildirType {
let writer = io::BufWriter::new(f);
bincode::serialize_into(writer, &e).unwrap();
}
sender.send(RefreshEvent {
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Create(Box::new(e)),
});
}));
} else {
debug!(
"DEBUG: hash {}, path: {} couldn't be parsed",
@ -326,21 +327,21 @@ impl MailBackend for MaildirType {
continue;
}
}
for ev in current_hashes.into_iter().map(|h| RefreshEvent {
for ev in current_hashes.into_iter().map(|h| BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Remove(h),
}) {
sender.send(ev);
})) {
(sender)(account_hash, ev);
}
Ok(())
};
if let Err(err) = thunk(&sender) {
sender.send(RefreshEvent {
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Failure(err),
});
}));
}
})
};
@ -348,9 +349,9 @@ impl MailBackend for MaildirType {
}
fn watch(
&self,
sender: RefreshEventConsumer,
work_context: WorkContext,
) -> Result<std::thread::ThreadId> {
let sender = self.event_consumer.clone();
let (tx, rx) = channel();
let mut watcher = watcher(tx, Duration::from_secs(2)).unwrap();
let account_hash = {
@ -425,15 +426,15 @@ impl MailBackend for MaildirType {
env.subject(),
pathbuf.display()
);
if !env.is_seen() {
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
}
*mailbox_counts[&mailbox_hash].1.lock().unwrap() += 1;
sender.send(RefreshEvent {
if !env.is_seen() {
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
}
*mailbox_counts[&mailbox_hash].1.lock().unwrap() += 1;
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
});
}));
}
}
/* Update */
@ -469,11 +470,11 @@ impl MailBackend for MaildirType {
file_name,
) {
mailbox_index.lock().unwrap().insert(env.hash(),mailbox_hash);
sender.send(RefreshEvent {
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
});
}));
}
return;
}
@ -496,11 +497,11 @@ impl MailBackend for MaildirType {
/* Send Write notice */
sender.send(RefreshEvent {
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Update(old_hash, Box::new(env)),
});
}));
}
}
}
@ -546,11 +547,11 @@ impl MailBackend for MaildirType {
e.removed = true;
});
sender.send(RefreshEvent {
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Remove(hash),
});
}));
}
/* Envelope hasn't changed */
DebouncedEvent::Rename(src, dest) => {
@ -577,11 +578,11 @@ impl MailBackend for MaildirType {
debug!(&e.modified);
e.modified = Some(PathMod::Hash(new_hash));
});
sender.send(RefreshEvent {
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: get_path_hash!(dest),
kind: Rename(old_hash, new_hash),
});
}));
if !was_seen && is_seen {
let mut lck = mailbox_counts[&mailbox_hash].0.lock().unwrap();
*lck = lck.saturating_sub(1);
@ -589,11 +590,11 @@ impl MailBackend for MaildirType {
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
}
if old_flags != new_flags {
sender.send(RefreshEvent {
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: get_path_hash!(dest),
kind: NewFlags(new_hash, (new_flags, vec![])),
});
}));
}
mailbox_index.lock().unwrap().insert(new_hash,get_path_hash!(dest) );
index_lock.insert(new_hash, dest.into());
@ -642,11 +643,11 @@ impl MailBackend for MaildirType {
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
}
*mailbox_counts[&mailbox_hash].1.lock().unwrap() += 1;
sender.send(RefreshEvent {
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
});
}));
continue;
} else {
debug!("not valid email");
@ -655,36 +656,36 @@ impl MailBackend for MaildirType {
if was_seen && !is_seen {
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
}
sender.send(RefreshEvent {
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: get_path_hash!(dest),
kind: Rename(old_hash, new_hash),
});
}));
debug!("contains_new_key");
if old_flags != new_flags {
sender.send(RefreshEvent {
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: get_path_hash!(dest),
kind: NewFlags(new_hash, (new_flags, vec![])),
});
}));
}
}
/* Maybe a re-read should be triggered here just to be safe.
sender.send(RefreshEvent {
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: get_path_hash!(dest),
kind: Rescan,
});
}));
*/
}
/* Trigger rescan of mailbox */
DebouncedEvent::Rescan => {
sender.send(RefreshEvent {
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: root_mailbox_hash,
kind: Rescan,
});
}));
}
_ => {}
},
@ -871,6 +872,7 @@ impl MaildirType {
pub fn new(
settings: &AccountSettings,
is_subscribed: Box<dyn Fn(&str) -> bool>,
event_consumer: BackendEventConsumer,
) -> Result<Box<dyn MailBackend>> {
let mut mailboxes: HashMap<MailboxHash, MaildirMailbox> = Default::default();
fn recurse_mailboxes<P: AsRef<Path>>(
@ -1012,6 +1014,7 @@ impl MaildirType {
mailboxes,
hash_indexes: Arc::new(Mutex::new(hash_indexes)),
mailbox_index: Default::default(),
event_consumer,
path: root_path,
}))
}

View File

@ -684,13 +684,14 @@ impl<'a> Iterator for MessageIterator<'a> {
}
/// Mbox backend
#[derive(Debug, Default)]
#[derive(Debug)]
pub struct MboxType {
account_name: String,
path: PathBuf,
mailbox_index: Arc<Mutex<HashMap<EnvelopeHash, MailboxHash>>>,
mailboxes: Arc<Mutex<HashMap<MailboxHash, MboxMailbox>>>,
prefer_mbox_type: Option<MboxReader>,
event_consumer: BackendEventConsumer,
}
impl MailBackend for MboxType {
@ -795,11 +796,8 @@ impl MailBackend for MboxType {
Ok(w.build(handle))
}
fn watch(
&self,
sender: RefreshEventConsumer,
work_context: WorkContext,
) -> Result<std::thread::ThreadId> {
fn watch(&self, work_context: WorkContext) -> Result<std::thread::ThreadId> {
let sender = self.event_consumer.clone();
let (tx, rx) = channel();
let mut watcher = watcher(tx, std::time::Duration::from_secs(10))
.map_err(|e| e.to_string())
@ -873,19 +871,25 @@ impl MailBackend for MboxType {
let mut mailbox_index_lck = mailbox_index.lock().unwrap();
for env in envelopes {
mailbox_index_lck.insert(env.hash(), mailbox_hash);
sender.send(RefreshEvent {
(sender)(
account_hash,
mailbox_hash,
kind: RefreshEventKind::Create(Box::new(env)),
});
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: RefreshEventKind::Create(Box::new(env)),
}),
);
}
}
} else {
sender.send(RefreshEvent {
(sender)(
account_hash,
mailbox_hash,
kind: RefreshEventKind::Rescan,
});
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: RefreshEventKind::Rescan,
}),
);
}
mailbox_lock
.entry(mailbox_hash)
@ -901,14 +905,19 @@ impl MailBackend for MboxType {
.any(|f| f.fs_path == pathbuf)
{
let mailbox_hash = get_path_hash!(&pathbuf);
sender.send(RefreshEvent {
(sender)(
account_hash,
mailbox_hash,
kind: RefreshEventKind::Failure(MeliError::new(format!(
"mbox mailbox {} was removed.",
pathbuf.display()
))),
});
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: RefreshEventKind::Failure(MeliError::new(
format!(
"mbox mailbox {} was removed.",
pathbuf.display()
),
)),
}),
);
return;
}
}
@ -920,26 +929,34 @@ impl MailBackend for MboxType {
.any(|f| &f.fs_path == &src)
{
let mailbox_hash = get_path_hash!(&src);
sender.send(RefreshEvent {
(sender)(
account_hash,
mailbox_hash,
kind: RefreshEventKind::Failure(MeliError::new(format!(
"mbox mailbox {} was renamed to {}.",
src.display(),
dest.display()
))),
});
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: RefreshEventKind::Failure(MeliError::new(
format!(
"mbox mailbox {} was renamed to {}.",
src.display(),
dest.display()
),
)),
}),
);
return;
}
}
/* Trigger rescan of mailboxes */
DebouncedEvent::Rescan => {
for &mailbox_hash in mailboxes.lock().unwrap().keys() {
sender.send(RefreshEvent {
(sender)(
account_hash,
mailbox_hash,
kind: RefreshEventKind::Rescan,
});
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: RefreshEventKind::Rescan,
}),
);
}
return;
}
@ -1023,6 +1040,7 @@ impl MboxType {
pub fn new(
s: &AccountSettings,
_is_subscribed: Box<dyn Fn(&str) -> bool>,
event_consumer: BackendEventConsumer,
) -> Result<Box<dyn MailBackend>> {
let path = Path::new(s.root_mailbox.as_str()).expand();
if !path.exists() {
@ -1035,6 +1053,7 @@ impl MboxType {
let prefer_mbox_type: String = get_conf_val!(s["prefer_mbox_type"], "auto".to_string())?;
let ret = MboxType {
account_name: s.name().to_string(),
event_consumer,
path,
prefer_mbox_type: match prefer_mbox_type.as_str() {
"auto" => None,
@ -1050,7 +1069,8 @@ impl MboxType {
)))
}
},
..Default::default()
mailbox_index: Default::default(),
mailboxes: Default::default(),
};
let name: String = ret
.path

View File

@ -94,15 +94,19 @@ pub struct UIDStore {
mailboxes: Arc<FutureMutex<HashMap<MailboxHash, NntpMailbox>>>,
is_online: Arc<Mutex<(Instant, Result<()>)>>,
refresh_events: Arc<Mutex<Vec<RefreshEvent>>>,
sender: Arc<RwLock<Option<RefreshEventConsumer>>>,
event_consumer: BackendEventConsumer,
}
impl Default for UIDStore {
fn default() -> Self {
impl UIDStore {
fn new(
account_hash: AccountHash,
account_name: Arc<String>,
event_consumer: BackendEventConsumer,
) -> Self {
UIDStore {
account_hash: 0,
account_name: Arc::new(String::new()),
account_hash,
account_name,
event_consumer,
offline_cache: false,
capabilities: Default::default(),
hash_index: Default::default(),
@ -112,8 +116,6 @@ impl Default for UIDStore {
Instant::now(),
Err(MeliError::new("Account is uninitialised.")),
))),
refresh_events: Default::default(),
sender: Arc::new(RwLock::new(None)),
}
}
}
@ -209,11 +211,7 @@ impl MailBackend for NntpType {
}))
}
fn refresh_async(
&mut self,
_mailbox_hash: MailboxHash,
_sender: RefreshEventConsumer,
) -> ResultFuture<()> {
fn refresh_async(&mut self, _mailbox_hash: MailboxHash) -> ResultFuture<()> {
Err(MeliError::new("Unimplemented."))
}
@ -254,23 +252,15 @@ impl MailBackend for NntpType {
Err(MeliError::new("Unimplemented."))
}
fn refresh(
&mut self,
_mailbox_hash: MailboxHash,
_sender: RefreshEventConsumer,
) -> Result<Async<()>> {
fn refresh(&mut self, _mailbox_hash: MailboxHash) -> Result<Async<()>> {
Err(MeliError::new("Unimplemented."))
}
fn watch(
&self,
_sender: RefreshEventConsumer,
_work_context: WorkContext,
) -> Result<std::thread::ThreadId> {
fn watch(&self, _work_context: WorkContext) -> Result<std::thread::ThreadId> {
Err(MeliError::new("Unimplemented."))
}
fn watch_async(&self, _sender: RefreshEventConsumer) -> ResultFuture<()> {
fn watch_async(&self) -> ResultFuture<()> {
Err(MeliError::new("Unimplemented."))
}
@ -388,6 +378,7 @@ impl NntpType {
pub fn new(
s: &AccountSettings,
is_subscribed: Box<dyn Fn(&str) -> bool + Send + Sync>,
event_consumer: BackendEventConsumer,
) -> Result<Box<dyn MailBackend>> {
let server_hostname = get_conf_val!(s["server_hostname"])?;
/*let server_username = get_conf_val!(s["server_username"], "")?;
@ -469,11 +460,9 @@ impl NntpType {
)));
}
let uid_store: Arc<UIDStore> = Arc::new(UIDStore {
account_hash,
account_name,
offline_cache: false, //get_conf_val!(s["X_header_caching"], false)?,
mailboxes: Arc::new(FutureMutex::new(mailboxes)),
..UIDStore::default()
..UIDStore::new(account_hash, account_name, event_consumer)
});
let connection = NntpConnection::new_connection(&server_conf, uid_store.clone());

View File

@ -476,14 +476,10 @@ impl NntpConnection {
}
pub fn add_refresh_event(&mut self, ev: crate::backends::RefreshEvent) {
if let Some(ref sender) = self.uid_store.sender.read().unwrap().as_ref() {
sender.send(ev);
for ev in self.uid_store.refresh_events.lock().unwrap().drain(..) {
sender.send(ev);
}
} else {
self.uid_store.refresh_events.lock().unwrap().push(ev);
}
(self.uid_store.event_consumer)(
self.uid_store.account_hash,
crate::backends::BackendEvent::Refresh(ev),
);
}
pub async fn select_group(

View File

@ -107,6 +107,7 @@ pub struct NotmuchDb {
tag_index: Arc<RwLock<BTreeMap<u64, String>>>,
path: PathBuf,
account_name: String,
event_consumer: BackendEventConsumer,
save_messages_to: Option<PathBuf>,
}
@ -187,6 +188,7 @@ impl NotmuchDb {
pub fn new(
s: &AccountSettings,
_is_subscribed: Box<dyn Fn(&str) -> bool>,
event_consumer: BackendEventConsumer,
) -> Result<Box<dyn MailBackend>> {
let lib = Arc::new(libloading::Library::new("libnotmuch.so.5")?);
let path = Path::new(s.root_mailbox.as_str()).expand();
@ -239,6 +241,7 @@ impl NotmuchDb {
mailboxes: Arc::new(RwLock::new(mailboxes)),
save_messages_to: None,
account_name: s.name().to_string(),
event_consumer,
}))
}
@ -424,14 +427,11 @@ impl MailBackend for NotmuchDb {
Ok(w.build(handle))
}
fn watch(
&self,
sender: RefreshEventConsumer,
_work_context: WorkContext,
) -> Result<std::thread::ThreadId> {
fn watch(&self, _work_context: WorkContext) -> Result<std::thread::ThreadId> {
extern crate notify;
use crate::backends::RefreshEventKind::*;
use notify::{watcher, RecursiveMode, Watcher};
let sender = self.event_consumer.clone();
let (tx, rx) = std::sync::mpsc::channel();
let mut watcher = watcher(tx, std::time::Duration::from_secs(2)).unwrap();
watcher.watch(&self.path, RecursiveMode::Recursive).unwrap();
@ -463,7 +463,7 @@ impl MailBackend for NotmuchDb {
.name(format!("watching {}", self.account_name))
.spawn(move || {
let _watcher = watcher;
let c = move |sender: &RefreshEventConsumer| -> std::result::Result<(), MeliError> {
let c = move |sender: &BackendEventConsumer| -> std::result::Result<(), MeliError> {
loop {
let _ = rx.recv().map_err(|err| err.to_string())?;
{
@ -512,11 +512,14 @@ impl MailBackend for NotmuchDb {
}
}
for &mailbox_hash in mailbox_hashes {
sender.send(RefreshEvent {
(sender)(
account_hash,
mailbox_hash,
kind: NewFlags(env_hash, tags.clone()),
});
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: NewFlags(env_hash, tags.clone()),
}),
);
}
} else {
match notmuch_message_into_envelope(
@ -548,11 +551,14 @@ impl MailBackend for NotmuchDb {
if !env.is_seen() {
*unseen_lck += 1;
}
sender.send(RefreshEvent {
(sender)(
account_hash,
mailbox_hash,
kind: Create(Box::new(env.clone())),
});
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Create(Box::new(env.clone())),
}),
);
}
}
}
@ -587,11 +593,14 @@ impl MailBackend for NotmuchDb {
let m = &mailboxes_lck[&mailbox_hash];
let mut total_lck = m.total.lock().unwrap();
*total_lck = total_lck.saturating_sub(1);
sender.send(RefreshEvent {
(sender)(
account_hash,
mailbox_hash,
kind: Remove(env_hash),
});
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Remove(env_hash),
}),
);
}
}
}
@ -606,11 +615,14 @@ impl MailBackend for NotmuchDb {
};
if let Err(err) = c(&sender) {
sender.send(RefreshEvent {
(sender)(
account_hash,
mailbox_hash: 0,
kind: Failure(err),
});
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: 0,
kind: Failure(err),
}),
);
}
})?;
Ok(handle.thread().id())

View File

@ -139,7 +139,9 @@ pub use smallvec;
pub use futures;
pub use smol;
pub use crate::backends::{Backends, RefreshEvent, RefreshEventConsumer, SpecialUsageMailbox};
pub use crate::backends::{
BackendEvent, BackendEventConsumer, Backends, RefreshEvent, SpecialUsageMailbox,
};
pub use crate::collection::*;
pub use crate::conf::*;
pub use crate::email::{Envelope, EnvelopeHash, Flag};

View File

@ -161,7 +161,6 @@ pub struct Account {
pub active_job_instants: BTreeMap<std::time::Instant, JobId>,
sender: Sender<ThreadEvent>,
event_queue: VecDeque<(MailboxHash, RefreshEvent)>,
notify_fn: Arc<NotifyFn>,
pub backend_capabilities: MailBackendCapabilities,
}
@ -358,7 +357,7 @@ impl Account {
work_context: WorkContext,
job_executor: Arc<JobExecutor>,
sender: Sender<ThreadEvent>,
notify_fn: NotifyFn,
event_consumer: BackendEventConsumer,
) -> Result<Self> {
let s = settings.clone();
let backend = map.get(settings.account().format())(
@ -372,8 +371,8 @@ impl Account {
.iter()
.any(|m| path.matches_glob(m))
}),
event_consumer,
)?;
let notify_fn = Arc::new(notify_fn);
let data_dir = xdg::BaseDirectories::with_profile("meli", &name).unwrap();
let mut address_book = AddressBook::with_account(&settings.account());
@ -425,7 +424,6 @@ impl Account {
collection: Default::default(),
work_context,
settings,
notify_fn,
sender,
job_executor,
active_jobs,
@ -592,18 +590,14 @@ impl Account {
.insert(std::time::Instant::now(), job_id);
}
} else {
entry.worker = match Account::new_worker(
&f,
&mut self.backend,
&self.work_context,
self.notify_fn.clone(),
) {
Ok(v) => v,
Err(err) => {
entry.status = MailboxStatus::Failed(err);
None
}
};
entry.worker =
match Account::new_worker(&f, &mut self.backend, &self.work_context) {
Ok(v) => v,
Err(err) => {
entry.status = MailboxStatus::Failed(err);
None
}
};
}
}
});
@ -622,12 +616,9 @@ impl Account {
mailbox: &Mailbox,
backend: &Arc<RwLock<Box<dyn MailBackend>>>,
work_context: &WorkContext,
notify_fn: Arc<NotifyFn>,
) -> Result<Worker> {
let mailbox_hash = mailbox.hash();
let mut mailbox_handle = backend.write().unwrap().fetch(mailbox_hash)?;
let mut builder = AsyncBuilder::new();
let our_tx = builder.tx();
let priority = match mailbox.special_usage() {
SpecialUsageMailbox::Inbox => 0,
SpecialUsageMailbox::Sent => 1,
@ -644,51 +635,7 @@ impl Account {
}
};
/* This polling closure needs to be 'static', that is to spawn its own thread instead of
* being assigned to a worker thread. Otherwise the polling closures could fill up the
* workers causing no actual parsing to be done. If we could yield from within the worker
* threads' closures this could be avoided, but it requires green threads.
*/
let name = format!("Parsing {}", mailbox.path());
builder.set_priority(priority).set_is_static(true);
let mut w = builder.build(Box::new(move |work_context| {
let work = mailbox_handle.work().unwrap();
work_context.new_work.send(work).unwrap();
let thread_id = std::thread::current().id();
work_context.set_name.send((thread_id, name)).unwrap();
work_context
.set_status
.send((thread_id, "Waiting for subworkers..".to_string()))
.unwrap();
loop {
match debug!(mailbox_handle.poll_block()) {
Ok(s @ AsyncStatus::Payload(_)) => {
our_tx.send(s).unwrap();
debug!("notifying for {}", mailbox_hash);
notify_fn.notify(mailbox_hash);
}
Ok(s @ AsyncStatus::Finished) => {
our_tx.send(s).unwrap();
notify_fn.notify(mailbox_hash);
debug!("exiting");
work_context.finished.send(thread_id).unwrap();
return;
}
Ok(s) => {
our_tx.send(s).unwrap();
}
Err(_) => {
debug!("poll error");
return;
}
}
}
}));
if let Some(w) = w.work() {
work_context.new_work.send(w).unwrap();
}
Ok(Some(w))
todo!()
}
pub fn reload(&mut self, event: RefreshEvent, mailbox_hash: MailboxHash) -> Option<UIEvent> {
if !self.mailbox_entries[&mailbox_hash].status.is_available() {
@ -879,7 +826,6 @@ impl Account {
&self.mailbox_entries[&mailbox_hash].ref_mailbox,
&mut self.backend,
&self.work_context,
self.notify_fn.clone(),
) {
Ok(v) => v,
Err(err) => {
@ -945,12 +891,8 @@ impl Account {
.unwrap();
return Ok(());
}
let sender_ = self.sender.clone();
let r = RefreshEventConsumer::new(Box::new(move |r| {
sender_.send(ThreadEvent::from(r)).unwrap();
}));
if self.backend_capabilities.is_async {
if let Ok(refresh_job) = self.backend.write().unwrap().refresh_async(mailbox_hash, r) {
if let Ok(refresh_job) = self.backend.write().unwrap().refresh_async(mailbox_hash) {
let (rcvr, handle, job_id) = self.job_executor.spawn_specialized(refresh_job);
self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
@ -963,7 +905,7 @@ impl Account {
.insert(std::time::Instant::now(), job_id);
}
} else {
let mut h = self.backend.write().unwrap().refresh(mailbox_hash, r)?;
let mut h = self.backend.write().unwrap().refresh(mailbox_hash)?;
self.work_context.new_work.send(h.work().unwrap()).unwrap();
}
Ok(())
@ -973,13 +915,9 @@ impl Account {
return;
}
let sender_ = self.sender.clone();
let r = RefreshEventConsumer::new(Box::new(move |r| {
sender_.send(ThreadEvent::from(r)).unwrap();
}));
if self.backend_capabilities.is_async {
if !self.active_jobs.values().any(|j| j.is_watch()) {
match self.backend.read().unwrap().watch_async(r) {
match self.backend.read().unwrap().watch_async() {
Ok(fut) => {
let (handle, job_id) = self.job_executor.spawn(fut);
self.active_jobs.insert(job_id, JobRequest::Watch(handle));
@ -998,7 +936,7 @@ impl Account {
.backend
.read()
.unwrap()
.watch(r, self.work_context.clone())
.watch(self.work_context.clone())
{
Ok(id) => {
self.sender
@ -1115,7 +1053,6 @@ impl Account {
&self.mailbox_entries[&mailbox_hash].ref_mailbox,
&mut self.backend,
&self.work_context,
self.notify_fn.clone(),
) {
Ok(v) => v,
Err(err) => {
@ -1470,7 +1407,6 @@ impl Account {
&mailboxes[&mailbox_hash],
&mut self.backend,
&self.work_context,
self.notify_fn.clone(),
) {
Ok(v) => (MailboxStatus::Parsing(0, 0), v),
Err(err) => (MailboxStatus::Failed(err), None),

View File

@ -191,18 +191,10 @@ impl MailBackend for PluginBackend {
Ok(w.build(handle))
}
fn refresh(
&mut self,
_mailbox_hash: MailboxHash,
_sender: RefreshEventConsumer,
) -> Result<Async<()>> {
fn refresh(&mut self, _mailbox_hash: MailboxHash) -> Result<Async<()>> {
Err(MeliError::new("Unimplemented."))
}
fn watch(
&self,
_sender: RefreshEventConsumer,
_work_context: WorkContext,
) -> Result<std::thread::ThreadId> {
fn watch(&self, _work_context: WorkContext) -> Result<std::thread::ThreadId> {
Err(MeliError::new("Unimplemented."))
}
@ -249,6 +241,7 @@ impl PluginBackend {
plugin: Plugin,
_s: &AccountSettings,
_is_subscribed: Box<dyn Fn(&str) -> bool>,
_ev: melib::backends::BackendEventConsumer,
) -> Result<Box<dyn MailBackend>> {
if plugin.kind != PluginKind::Backend {
return Err(MeliError::new(format!(
@ -284,10 +277,10 @@ impl PluginBackend {
create_fn: Box::new(move || {
let plugin = plugin.clone();
let listener = listener.try_clone().unwrap();
Box::new(move |f, i| {
Box::new(move |f, i, ev| {
let plugin = plugin.clone();
let listener = listener.try_clone().unwrap();
PluginBackend::new(listener, plugin, f, i)
PluginBackend::new(listener, plugin, f, i, ev)
})
}),
validate_conf_fn: Box::new(|_| Ok(())),

View File

@ -30,7 +30,7 @@ Input is received in the main loop from threads which listen on the stdin for us
use super::*;
use crate::plugins::PluginManager;
use melib::backends::{AccountHash, MailboxHash, NotifyFn};
use melib::backends::{AccountHash, BackendEventConsumer};
use crate::jobs::JobExecutor;
use crossbeam::channel::{unbounded, Receiver, Sender};
@ -284,14 +284,16 @@ impl State {
work_controller.get_context(),
job_executor.clone(),
sender.clone(),
NotifyFn::new(Box::new(move |f: MailboxHash| {
sender
.send(ThreadEvent::UIEvent(UIEvent::WorkerProgress(
account_hash,
f,
)))
.unwrap();
})),
BackendEventConsumer::new(Arc::new(
move |account_hash: AccountHash, ev: BackendEvent| {
sender
.send(ThreadEvent::UIEvent(UIEvent::BackendEvent(
account_hash,
ev,
)))
.unwrap();
},
)),
)
})
.collect::<Result<Vec<Account>>>()?
@ -1028,8 +1030,31 @@ impl State {
self.child = Some(child);
return;
}
UIEvent::WorkerProgress(account_hash, mailbox_hash) => {
let _ = self.context.accounts[&account_hash].load(mailbox_hash);
UIEvent::BackendEvent(
account_hash,
BackendEvent::Notice {
ref description,
ref content,
level,
},
) => {
log(
format!(
"{}: {}{}{}",
self.context.accounts[&account_hash].name(),
description.as_ref().map(|s| s.as_str()).unwrap_or(""),
if description.is_some() { ": " } else { "" },
content.as_str()
),
level,
);
self.rcv_event(UIEvent::StatusEvent(StatusEvent::DisplayMessage(
content.to_string(),
)));
return;
}
UIEvent::BackendEvent(_, BackendEvent::Refresh(refresh_event)) => {
self.refresh_event(refresh_event);
return;
}
UIEvent::ChangeMode(m) => {

View File

@ -40,7 +40,7 @@ use super::jobs::JobId;
use super::terminal::*;
use crate::components::{Component, ComponentId};
use melib::backends::{AccountHash, MailboxHash};
use melib::backends::{AccountHash, BackendEvent, MailboxHash};
use melib::{EnvelopeHash, RefreshEvent, ThreadHash};
use nix::unistd::Pid;
use std::fmt;
@ -119,7 +119,7 @@ pub enum UIEvent {
MailboxCreate((AccountHash, MailboxHash)),
AccountStatusChange(AccountHash),
ComponentKill(Uuid),
WorkerProgress(AccountHash, MailboxHash),
BackendEvent(AccountHash, BackendEvent),
StartupCheck(MailboxHash),
RefreshEvent(Box<RefreshEvent>),
EnvelopeUpdate(EnvelopeHash),

View File

@ -1,8 +1,8 @@
extern crate melib;
use melib::backends::ImapType;
use melib::AccountSettings;
use melib::Result;
use melib::{AccountSettings, BackendEventConsumer};
/// Opens an interactive shell on an IMAP server. Suggested use is with rlwrap(1)
///
@ -42,7 +42,11 @@ fn main() -> Result<()> {
.collect(),
..Default::default()
};
let mut imap = ImapType::new(&set, Box::new(|_| true))?;
let mut imap = ImapType::new(
&set,
Box::new(|_| true),
BackendEventConsumer::new(std::sync::Arc::new(|_, _| ())),
)?;
(imap.as_any_mut())
.downcast_mut::<ImapType>()