forked from meli/meli
1
Fork 0

Compare commits

...

2 Commits

Author SHA1 Message Date
Manos Pitsidianakis a05f36341b melib/jmap: add some connection status messages and errors 2022-10-06 16:44:40 +03:00
Manos Pitsidianakis 659a34bf21 accounts: add new IsOnline type and exponential backoff connect retry
Replace unlimited instant retry to connect with a limited exponential
backoff strategy.
2022-10-06 16:33:01 +03:00
7 changed files with 294 additions and 95 deletions

View File

@ -105,6 +105,12 @@ impl Default for Backends {
} }
} }
#[cfg(feature = "jmap_backend")]
pub const JMAP_ERROR_MSG: &str = "";
#[cfg(not(feature = "jmap_backend"))]
pub const JMAP_ERROR_MSG: &str = "This library build lacks JMAP support. JMAP requires an HTTP client dependency and thus is turned off by default when compiling.";
#[cfg(feature = "notmuch_backend")] #[cfg(feature = "notmuch_backend")]
pub const NOTMUCH_ERROR_MSG: &str = pub const NOTMUCH_ERROR_MSG: &str =
"libnotmuch5 was not found in your system. Make sure it is installed and in the library paths. For a custom file path, use `library_file_path` setting in your notmuch account.\n"; "libnotmuch5 was not found in your system. Make sure it is installed and in the library paths. For a custom file path, use `library_file_path` setting in your notmuch account.\n";
@ -219,6 +225,11 @@ impl Backends {
{ {
eprint!("{}", NOTMUCH_ERROR_DETAILS); eprint!("{}", NOTMUCH_ERROR_DETAILS);
} }
} else if key == "jmap" {
#[cfg(not(feature = "jmap_backend"))]
{
eprintln!("{}", JMAP_ERROR_MSG);
}
} }
panic!("{} is not a valid mail backend", key); panic!("{} is not a valid mail backend", key);
} }
@ -247,6 +258,8 @@ impl Backends {
key, key,
if cfg!(feature = "notmuch_backend") && key == "notmuch" { if cfg!(feature = "notmuch_backend") && key == "notmuch" {
NOTMUCH_ERROR_DETAILS NOTMUCH_ERROR_DETAILS
} else if !cfg!(feature = "jmap_backend") && key == "jmap" {
JMAP_ERROR_MSG
} else { } else {
"" ""
}, },

View File

@ -20,6 +20,7 @@
*/ */
use super::*; use super::*;
use crate::error::IntoMeliError;
use isahc::config::Configurable; use isahc::config::Configurable;
use std::sync::MutexGuard; use std::sync::MutexGuard;
@ -34,6 +35,12 @@ pub struct JmapConnection {
impl JmapConnection { impl JmapConnection {
pub fn new(server_conf: &JmapServerConf, store: Arc<Store>) -> Result<Self> { pub fn new(server_conf: &JmapServerConf, store: Arc<Store>) -> Result<Self> {
(store.event_consumer)(
store.account_hash,
crate::backends::BackendEvent::AccountStateChange {
message: "Creating connection.".into(),
},
);
let client = HttpClient::builder() let client = HttpClient::builder()
.timeout(std::time::Duration::from_secs(10)) .timeout(std::time::Duration::from_secs(10))
.redirect_policy(RedirectPolicy::Limit(10)) .redirect_policy(RedirectPolicy::Limit(10))
@ -53,6 +60,15 @@ impl JmapConnection {
}) })
} }
#[inline(always)]
pub(crate) fn account_state_change(&self, message: impl Into<std::borrow::Cow<'static, str>>) {
let message = message.into();
(self.store.event_consumer)(
self.store.account_hash,
crate::backends::BackendEvent::AccountStateChange { message },
);
}
pub async fn connect(&mut self) -> Result<()> { pub async fn connect(&mut self) -> Result<()> {
if self.store.online_status.lock().await.1.is_ok() { if self.store.online_status.lock().await.1.is_ok() {
return Ok(()); return Ok(());
@ -63,12 +79,16 @@ impl JmapConnection {
jmap_session_resource_url.push_str(&self.server_conf.server_port.to_string()); jmap_session_resource_url.push_str(&self.server_conf.server_port.to_string());
} }
jmap_session_resource_url.push_str("/.well-known/jmap"); jmap_session_resource_url.push_str("/.well-known/jmap");
self.account_state_change(format!("Requesting {}…", &jmap_session_resource_url));
let mut req = self.client.get_async(&jmap_session_resource_url).await.map_err(|err| { let mut req = match self.client.get_async(&jmap_session_resource_url).await {
let err = MeliError::new(format!("Could not connect to JMAP server endpoint for {}. Is your server hostname setting correct? (i.e. \"jmap.mailserver.org\") (Note: only session resource discovery via /.well-known/jmap is supported. DNS SRV records are not suppported.)\nError connecting to server: {}", &self.server_conf.server_hostname, &err)).set_source(Some(Arc::new(err))); Err(err) => {
//*self.store.online_status.lock().await = (Instant::now(), Err(err.clone())); let err = err.set_err_summary(format!("Could not connect to JMAP server endpoint for {}. Is your server hostname setting correct? (i.e. \"jmap.mailserver.org\") (Note: only session resource discovery via /.well-known/jmap is supported. DNS SRV records are not suppported.)\nError connecting to server.", &self.server_conf.server_hostname));
err *self.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
})?; return Err(err);
}
Ok(r) => r,
};
if !req.status().is_success() { if !req.status().is_success() {
let kind: crate::error::NetworkErrorKind = req.status().into(); let kind: crate::error::NetworkErrorKind = req.status().into();
@ -86,7 +106,7 @@ impl JmapConnection {
let session: JmapSession = match serde_json::from_str(&res_text) { let session: JmapSession = match serde_json::from_str(&res_text) {
Err(err) => { Err(err) => {
let err = MeliError::new(format!("Could not connect to JMAP server endpoint for {}. Is your server hostname setting correct? (i.e. \"jmap.mailserver.org\") (Note: only session resource discovery via /.well-known/jmap is supported. DNS SRV records are not suppported.)\nReply from server: {}", &self.server_conf.server_hostname, &res_text)).set_source(Some(Arc::new(err))); let err = err.set_err_summary(format!("Could not connect to JMAP server endpoint for {}. Is your server hostname setting correct? (i.e. \"jmap.mailserver.org\") (Note: only session resource discovery via /.well-known/jmap is supported. DNS SRV records are not suppported.)\nReply from server: {}", &self.server_conf.server_hostname, &res_text));
*self.store.online_status.lock().await = (Instant::now(), Err(err.clone())); *self.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
return Err(err); return Err(err);
} }

View File

@ -85,6 +85,7 @@ pub struct JsonResponse<'a> {
} }
pub async fn get_mailboxes(conn: &JmapConnection) -> Result<HashMap<MailboxHash, JmapMailbox>> { pub async fn get_mailboxes(conn: &JmapConnection) -> Result<HashMap<MailboxHash, JmapMailbox>> {
conn.account_state_change("Fetching mailbox list…");
let seq = get_request_no!(conn.request_no); let seq = get_request_no!(conn.request_no);
let api_url = conn.session.lock().unwrap().api_url.clone(); let api_url = conn.session.lock().unwrap().api_url.clone();
let mut res = conn let mut res = conn
@ -180,6 +181,7 @@ pub async fn get_message_list(
conn: &JmapConnection, conn: &JmapConnection,
mailbox: &JmapMailbox, mailbox: &JmapMailbox,
) -> Result<Vec<Id<EmailObject>>> { ) -> Result<Vec<Id<EmailObject>>> {
conn.account_state_change(format!("Fetching email list for {}…", mailbox.name()));
let email_call: EmailQuery = EmailQuery::new( let email_call: EmailQuery = EmailQuery::new(
Query::new() Query::new()
.account_id(conn.mail_account_id().clone()) .account_id(conn.mail_account_id().clone())

View File

@ -24,7 +24,7 @@ use std::fs::File;
use std::io::prelude::*; use std::io::prelude::*;
use std::time::SystemTime; use std::time::SystemTime;
fn random_u64() -> u64 { pub fn random_u64() -> u64 {
let mut f = File::open("/dev/urandom").unwrap(); let mut f = File::open("/dev/urandom").unwrap();
let mut buffer = [0; 8]; let mut buffer = [0; 8];

View File

@ -25,6 +25,7 @@
use super::{AccountConf, FileMailboxConf}; use super::{AccountConf, FileMailboxConf};
use crate::jobs::{JobExecutor, JobId, JoinHandle}; use crate::jobs::{JobExecutor, JobId, JoinHandle};
use crate::RateLimit;
use indexmap::IndexMap; use indexmap::IndexMap;
use melib::backends::*; use melib::backends::*;
use melib::email::*; use melib::email::*;
@ -130,11 +131,47 @@ impl MailboxEntry {
} }
} }
#[derive(Debug)]
pub enum IsOnline {
Unitialized {
last_activity: std::time::Instant,
rate_limit: RateLimit,
},
Yes {
last_activity: std::time::Instant,
},
No {
last_activity: std::time::Instant,
err: MeliError,
},
NoWillRetry {
last_activity: std::time::Instant,
err: MeliError,
rate_limit: RateLimit,
},
}
impl IsOnline {
#[inline(always)]
pub fn as_err(&self) -> Option<&MeliError> {
if let IsOnline::No { ref err, .. } | IsOnline::NoWillRetry { ref err, .. } = self {
Some(err)
} else {
None
}
}
#[inline(always)]
pub fn is_ok(&self) -> bool {
matches!(&self, IsOnline::Yes { .. })
}
}
#[derive(Debug)] #[derive(Debug)]
pub struct Account { pub struct Account {
name: String, name: String,
hash: AccountHash, hash: AccountHash,
pub is_online: Result<()>, pub is_online: IsOnline,
pub(crate) mailbox_entries: IndexMap<MailboxHash, MailboxEntry>, pub(crate) mailbox_entries: IndexMap<MailboxHash, MailboxEntry>,
pub(crate) mailboxes_order: Vec<MailboxHash>, pub(crate) mailboxes_order: Vec<MailboxHash>,
tree: Vec<MailboxNode>, tree: Vec<MailboxNode>,
@ -338,6 +375,10 @@ impl core::fmt::Display for JobRequest {
} }
impl JobRequest { impl JobRequest {
pub fn is_mailboxes(&self) -> bool {
matches!(self, JobRequest::Mailboxes { .. })
}
pub fn is_watch(&self) -> bool { pub fn is_watch(&self) -> bool {
matches!(self, JobRequest::Watch { .. }) matches!(self, JobRequest::Watch { .. })
} }
@ -518,9 +559,14 @@ impl Account {
hash, hash,
name, name,
is_online: if !backend.capabilities().is_remote { is_online: if !backend.capabilities().is_remote {
Ok(()) IsOnline::Yes {
last_activity: std::time::Instant::now(),
}
} else { } else {
Err(MeliError::new("Attempting connection.")) IsOnline::Unitialized {
last_activity: std::time::Instant::now(),
rate_limit: RateLimit::new(1, 1, job_executor.clone()).exponential_backoff(5),
}
}, },
mailbox_entries: Default::default(), mailbox_entries: Default::default(),
mailboxes_order: Default::default(), mailboxes_order: Default::default(),
@ -1555,36 +1601,25 @@ impl Account {
/* Call only in Context::is_online, since only Context can launch the watcher threads if an /* Call only in Context::is_online, since only Context can launch the watcher threads if an
* account goes from offline to online. */ * account goes from offline to online. */
pub fn is_online(&mut self) -> Result<()> { pub fn is_online(&mut self) -> &IsOnline {
if !self.backend_capabilities.is_remote && !self.backend_capabilities.is_async { match &self.is_online {
return Ok(()); _ if !self.backend_capabilities.is_remote && !self.backend_capabilities.is_async => {}
} IsOnline::Unitialized { .. } | IsOnline::No { .. } | IsOnline::Yes { .. } => {}
IsOnline::NoWillRetry { .. }
if self.is_online.is_err() if self.active_jobs.values().any(JobRequest::is_online) => {}
&& self IsOnline::NoWillRetry { .. } => {
.is_online let online_job = self.backend.read().unwrap().is_online();
.as_ref() if let Ok(online_job) = online_job {
.unwrap_err() let handle = if self.backend_capabilities.is_async {
.kind self.job_executor.spawn_specialized(online_job)
.is_authentication() } else {
{ self.job_executor.spawn_blocking(online_job)
return self.is_online.clone(); };
} self.insert_job(handle.job_id, JobRequest::IsOnline { handle });
if self.is_online.is_ok() { }
return Ok(());
}
if !self.active_jobs.values().any(JobRequest::is_online) {
let online_job = self.backend.read().unwrap().is_online();
if let Ok(online_job) = online_job {
let handle = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(online_job)
} else {
self.job_executor.spawn_blocking(online_job)
};
self.insert_job(handle.job_id, JobRequest::IsOnline { handle });
} }
} }
self.is_online.clone() &self.is_online
} }
pub fn search( pub fn search(
@ -1632,6 +1667,135 @@ impl Account {
} }
} }
pub fn update_status(&mut self, result: Result<()>) {
if let Err(err) = result {
if err.kind.is_authentication() {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!("{}: authentication error", &self.name)),
err.to_string(),
Some(crate::types::NotificationType::Error(err.kind)),
)))
.expect("Could not send event on main channel");
self.is_online = IsOnline::No {
last_activity: std::time::Instant::now(),
err,
};
return;
}
if let IsOnline::NoWillRetry {
ref mut last_activity,
err: ref mut err_ptr,
ref mut rate_limit,
} = self.is_online
{
if rate_limit.active {
*last_activity = std::time::Instant::now();
*err_ptr = err;
} else {
self.is_online = IsOnline::No {
last_activity: std::time::Instant::now(),
err,
};
}
} else {
self.is_online = IsOnline::NoWillRetry {
last_activity: std::time::Instant::now(),
err,
rate_limit: RateLimit::new(1, 1, self.job_executor.clone())
.exponential_backoff(5),
};
}
} else {
if self.mailbox_entries.is_empty() {
let backend = self.backend.read().unwrap();
if let Ok(mailboxes_job) = backend.mailboxes() {
if let Ok(online_job) = backend.is_online() {
let handle = if backend.capabilities().is_async {
self.job_executor
.spawn_specialized(online_job.then(|_| mailboxes_job))
} else {
self.job_executor
.spawn_blocking(online_job.then(|_| mailboxes_job))
};
let job_id = handle.job_id;
self.active_jobs
.insert(job_id, JobRequest::Mailboxes { handle });
self.active_job_instants
.insert(std::time::Instant::now(), job_id);
self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
StatusEvent::NewJob(job_id),
)))
.unwrap();
}
}
self.is_online = IsOnline::Unitialized {
last_activity: std::time::Instant::now(),
rate_limit: RateLimit::new(1, 1, self.job_executor.clone())
.exponential_backoff(5),
};
} else {
if let IsOnline::Yes {
ref mut last_activity,
} = self.is_online
{
*last_activity = std::time::Instant::now();
} else {
self.is_online = IsOnline::Yes {
last_activity: std::time::Instant::now(),
};
}
}
}
self.sender
.send(ThreadEvent::UIEvent(UIEvent::AccountStatusChange(
self.hash, None,
)))
.unwrap();
}
pub fn retry_mailboxes(&mut self) {
match self.is_online {
IsOnline::NoWillRetry {
ref rate_limit,
ref err,
last_activity,
} if !rate_limit.active => {
self.is_online = IsOnline::No {
err: err.clone(),
last_activity,
};
return;
}
IsOnline::NoWillRetry { .. } | IsOnline::Unitialized { .. } => {}
_ => return,
}
let backend = self.backend.read().unwrap();
if let Ok(mailboxes_job) = backend.mailboxes() {
if let Ok(online_job) = backend.is_online() {
let handle = if backend.capabilities().is_async {
self.job_executor
.spawn_specialized(online_job.then(|_| mailboxes_job))
} else {
self.job_executor
.spawn_blocking(online_job.then(|_| mailboxes_job))
};
let job_id = handle.job_id;
self.active_jobs
.insert(job_id, JobRequest::Mailboxes { handle });
self.active_job_instants
.insert(std::time::Instant::now(), job_id);
self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
StatusEvent::NewJob(job_id),
)))
.unwrap();
}
}
}
pub fn process_event(&mut self, job_id: &JobId) -> bool { pub fn process_event(&mut self, job_id: &JobId) -> bool {
self.sender self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent( .send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
@ -1644,26 +1808,8 @@ impl Account {
JobRequest::Mailboxes { ref mut handle } => { JobRequest::Mailboxes { ref mut handle } => {
if let Ok(Some(mailboxes)) = handle.chan.try_recv() { if let Ok(Some(mailboxes)) = handle.chan.try_recv() {
if let Err(err) = mailboxes.and_then(|mailboxes| self.init(mailboxes)) { if let Err(err) = mailboxes.and_then(|mailboxes| self.init(mailboxes)) {
if err.kind.is_authentication() { self.update_status(Err(err));
self.sender return true;
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!("{}: authentication error", &self.name)),
err.to_string(),
Some(crate::types::NotificationType::Error(err.kind)),
)))
.expect("Could not send event on main channel");
self.is_online = Err(err);
return true;
}
let mailboxes_job = self.backend.read().unwrap().mailboxes();
if let Ok(mailboxes_job) = mailboxes_job {
let handle = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(mailboxes_job)
} else {
self.job_executor.spawn_blocking(mailboxes_job)
};
self.insert_job(handle.job_id, JobRequest::Mailboxes { handle });
};
} else { } else {
self.sender self.sender
.send(ThreadEvent::UIEvent(UIEvent::AccountStatusChange( .send(ThreadEvent::UIEvent(UIEvent::AccountStatusChange(
@ -1770,20 +1916,13 @@ impl Account {
))) )))
.unwrap(); .unwrap();
if is_online.is_ok() { if is_online.is_ok() {
if self.is_online.is_err() if self.is_online.as_err().is_none() {
&& !self
.is_online
.as_ref()
.unwrap_err()
.kind
.is_authentication()
{
self.watch(); self.watch();
} }
self.is_online = Ok(()); self.update_status(Ok(()));
return true; return true;
} }
self.is_online = is_online; self.update_status(is_online);
} }
let online_job = self.backend.read().unwrap().is_online(); let online_job = self.backend.read().unwrap().is_online();
if let Ok(online_job) = online_job { if let Ok(online_job) = online_job {
@ -1800,31 +1939,10 @@ impl Account {
Err(_) => { /* canceled */ } Err(_) => { /* canceled */ }
Ok(None) => {} Ok(None) => {}
Ok(Some(Ok(()))) => { Ok(Some(Ok(()))) => {
if self.is_online.is_err() self.update_status(Ok(()));
&& !self if self.is_online.as_err().is_none() {
.is_online
.as_ref()
.unwrap_err()
.kind
.is_authentication()
{
self.watch(); self.watch();
} }
if !(self.is_online.is_err()
&& self
.is_online
.as_ref()
.unwrap_err()
.kind
.is_authentication())
{
self.is_online = Ok(());
self.sender
.send(ThreadEvent::UIEvent(UIEvent::AccountStatusChange(
self.hash, None,
)))
.unwrap();
}
} }
Ok(Some(Err(err))) => { Ok(Some(Err(err))) => {
if !err.kind.is_authentication() { if !err.kind.is_authentication() {
@ -1838,7 +1956,7 @@ impl Account {
self.insert_job(handle.job_id, JobRequest::IsOnline { handle }); self.insert_job(handle.job_id, JobRequest::IsOnline { handle });
}; };
} }
self.is_online = Err(err); self.update_status(Err(err));
self.sender self.sender
.send(ThreadEvent::UIEvent(UIEvent::AccountStatusChange( .send(ThreadEvent::UIEvent(UIEvent::AccountStatusChange(
self.hash, None, self.hash, None,

View File

@ -134,6 +134,11 @@ impl Context {
} = self; } = self;
let was_online = accounts[account_pos].is_online.is_ok(); let was_online = accounts[account_pos].is_online.is_ok();
let ret = accounts[account_pos].is_online(); let ret = accounts[account_pos].is_online();
let ret = if let Some(err) = ret.as_err() {
Err(err.clone())
} else {
Ok(())
};
if ret.is_ok() && !was_online { if ret.is_ok() && !was_online {
debug!("inserting mailbox hashes:"); debug!("inserting mailbox hashes:");
for mailbox_node in accounts[account_pos].list_mailboxes() { for mailbox_node in accounts[account_pos].list_mailboxes() {
@ -1040,6 +1045,22 @@ impl State {
self.redraw(); self.redraw();
return; return;
} }
UIEvent::Timer(id) => {
for i in 0..self.context.accounts.len() {
use crate::conf::accounts::IsOnline;
match self.context.accounts[i].is_online {
IsOnline::Unitialized { ref rate_limit, .. }
| IsOnline::NoWillRetry { ref rate_limit, .. }
if rate_limit.id() == id =>
{
self.context.accounts[i].retry_mailboxes();
return;
}
_ => {}
}
}
}
UIEvent::Input(ref key) UIEvent::Input(ref key)
if *key if *key
== self == self

View File

@ -316,6 +316,7 @@ pub struct RateLimit {
pub timer: crate::jobs::Timer, pub timer: crate::jobs::Timer,
rate: std::time::Duration, rate: std::time::Duration,
pub active: bool, pub active: bool,
retries: Option<u8>,
} }
impl RateLimit { impl RateLimit {
@ -328,9 +329,20 @@ impl RateLimit {
), ),
rate: std::time::Duration::from_millis(millis / reqs), rate: std::time::Duration::from_millis(millis / reqs),
active: false, active: false,
retries: None,
} }
} }
pub fn once(mut self) -> Self {
self.retries = Some(1);
self
}
pub fn exponential_backoff(mut self, n: u8) -> Self {
self.retries = Some(n);
self
}
pub fn reset(&mut self) { pub fn reset(&mut self) {
self.last_tick = std::time::Instant::now(); self.last_tick = std::time::Instant::now();
self.active = false; self.active = false;
@ -341,9 +353,22 @@ impl RateLimit {
if self.last_tick + self.rate > now { if self.last_tick + self.rate > now {
self.active = false; self.active = false;
} else { } else {
self.timer.rearm(); let jitter = if self.retries.is_some() {
self.last_tick = now; Some(melib::email::compose::random::random_u64() % 1000)
self.active = true; } else {
None
};
if let Some(v) = self.retries {
self.retries = Some(v.saturating_sub(1));
}
self.active = self.retries.is_none() || self.retries != Some(0);
if self.active {
self.timer.rearm();
self.last_tick = now;
if let Some(jitter) = jitter {
self.last_tick += std::time::Duration::from_millis(jitter);
}
}
} }
self.active self.active
} }