Compare commits

...

2 Commits

Author SHA1 Message Date
Manos Pitsidianakis a05f36341b melib/jmap: add some connection status messages and errors 2022-10-06 16:44:40 +03:00
Manos Pitsidianakis 659a34bf21 accounts: add new IsOnline type and exponential backoff connect retry
Replace unlimited instant retry to connect with a limited exponential
backoff strategy.
2022-10-06 16:33:01 +03:00
7 changed files with 294 additions and 95 deletions

View File

@ -105,6 +105,12 @@ impl Default for Backends {
}
}
#[cfg(feature = "jmap_backend")]
pub const JMAP_ERROR_MSG: &str = "";
#[cfg(not(feature = "jmap_backend"))]
pub const JMAP_ERROR_MSG: &str = "This library build lacks JMAP support. JMAP requires an HTTP client dependency and thus is turned off by default when compiling.";
#[cfg(feature = "notmuch_backend")]
pub const NOTMUCH_ERROR_MSG: &str =
"libnotmuch5 was not found in your system. Make sure it is installed and in the library paths. For a custom file path, use `library_file_path` setting in your notmuch account.\n";
@ -219,6 +225,11 @@ impl Backends {
{
eprint!("{}", NOTMUCH_ERROR_DETAILS);
}
} else if key == "jmap" {
#[cfg(not(feature = "jmap_backend"))]
{
eprintln!("{}", JMAP_ERROR_MSG);
}
}
panic!("{} is not a valid mail backend", key);
}
@ -247,6 +258,8 @@ impl Backends {
key,
if cfg!(feature = "notmuch_backend") && key == "notmuch" {
NOTMUCH_ERROR_DETAILS
} else if !cfg!(feature = "jmap_backend") && key == "jmap" {
JMAP_ERROR_MSG
} else {
""
},

View File

@ -20,6 +20,7 @@
*/
use super::*;
use crate::error::IntoMeliError;
use isahc::config::Configurable;
use std::sync::MutexGuard;
@ -34,6 +35,12 @@ pub struct JmapConnection {
impl JmapConnection {
pub fn new(server_conf: &JmapServerConf, store: Arc<Store>) -> Result<Self> {
(store.event_consumer)(
store.account_hash,
crate::backends::BackendEvent::AccountStateChange {
message: "Creating connection.".into(),
},
);
let client = HttpClient::builder()
.timeout(std::time::Duration::from_secs(10))
.redirect_policy(RedirectPolicy::Limit(10))
@ -53,6 +60,15 @@ impl JmapConnection {
})
}
#[inline(always)]
pub(crate) fn account_state_change(&self, message: impl Into<std::borrow::Cow<'static, str>>) {
let message = message.into();
(self.store.event_consumer)(
self.store.account_hash,
crate::backends::BackendEvent::AccountStateChange { message },
);
}
pub async fn connect(&mut self) -> Result<()> {
if self.store.online_status.lock().await.1.is_ok() {
return Ok(());
@ -63,12 +79,16 @@ impl JmapConnection {
jmap_session_resource_url.push_str(&self.server_conf.server_port.to_string());
}
jmap_session_resource_url.push_str("/.well-known/jmap");
self.account_state_change(format!("Requesting {}…", &jmap_session_resource_url));
let mut req = self.client.get_async(&jmap_session_resource_url).await.map_err(|err| {
let err = MeliError::new(format!("Could not connect to JMAP server endpoint for {}. Is your server hostname setting correct? (i.e. \"jmap.mailserver.org\") (Note: only session resource discovery via /.well-known/jmap is supported. DNS SRV records are not suppported.)\nError connecting to server: {}", &self.server_conf.server_hostname, &err)).set_source(Some(Arc::new(err)));
//*self.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
err
})?;
let mut req = match self.client.get_async(&jmap_session_resource_url).await {
Err(err) => {
let err = err.set_err_summary(format!("Could not connect to JMAP server endpoint for {}. Is your server hostname setting correct? (i.e. \"jmap.mailserver.org\") (Note: only session resource discovery via /.well-known/jmap is supported. DNS SRV records are not suppported.)\nError connecting to server.", &self.server_conf.server_hostname));
*self.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
return Err(err);
}
Ok(r) => r,
};
if !req.status().is_success() {
let kind: crate::error::NetworkErrorKind = req.status().into();
@ -86,7 +106,7 @@ impl JmapConnection {
let session: JmapSession = match serde_json::from_str(&res_text) {
Err(err) => {
let err = MeliError::new(format!("Could not connect to JMAP server endpoint for {}. Is your server hostname setting correct? (i.e. \"jmap.mailserver.org\") (Note: only session resource discovery via /.well-known/jmap is supported. DNS SRV records are not suppported.)\nReply from server: {}", &self.server_conf.server_hostname, &res_text)).set_source(Some(Arc::new(err)));
let err = err.set_err_summary(format!("Could not connect to JMAP server endpoint for {}. Is your server hostname setting correct? (i.e. \"jmap.mailserver.org\") (Note: only session resource discovery via /.well-known/jmap is supported. DNS SRV records are not suppported.)\nReply from server: {}", &self.server_conf.server_hostname, &res_text));
*self.store.online_status.lock().await = (Instant::now(), Err(err.clone()));
return Err(err);
}

View File

@ -85,6 +85,7 @@ pub struct JsonResponse<'a> {
}
pub async fn get_mailboxes(conn: &JmapConnection) -> Result<HashMap<MailboxHash, JmapMailbox>> {
conn.account_state_change("Fetching mailbox list…");
let seq = get_request_no!(conn.request_no);
let api_url = conn.session.lock().unwrap().api_url.clone();
let mut res = conn
@ -180,6 +181,7 @@ pub async fn get_message_list(
conn: &JmapConnection,
mailbox: &JmapMailbox,
) -> Result<Vec<Id<EmailObject>>> {
conn.account_state_change(format!("Fetching email list for {}…", mailbox.name()));
let email_call: EmailQuery = EmailQuery::new(
Query::new()
.account_id(conn.mail_account_id().clone())

View File

@ -24,7 +24,7 @@ use std::fs::File;
use std::io::prelude::*;
use std::time::SystemTime;
fn random_u64() -> u64 {
pub fn random_u64() -> u64 {
let mut f = File::open("/dev/urandom").unwrap();
let mut buffer = [0; 8];

View File

@ -25,6 +25,7 @@
use super::{AccountConf, FileMailboxConf};
use crate::jobs::{JobExecutor, JobId, JoinHandle};
use crate::RateLimit;
use indexmap::IndexMap;
use melib::backends::*;
use melib::email::*;
@ -130,11 +131,47 @@ impl MailboxEntry {
}
}
#[derive(Debug)]
pub enum IsOnline {
Unitialized {
last_activity: std::time::Instant,
rate_limit: RateLimit,
},
Yes {
last_activity: std::time::Instant,
},
No {
last_activity: std::time::Instant,
err: MeliError,
},
NoWillRetry {
last_activity: std::time::Instant,
err: MeliError,
rate_limit: RateLimit,
},
}
impl IsOnline {
#[inline(always)]
pub fn as_err(&self) -> Option<&MeliError> {
if let IsOnline::No { ref err, .. } | IsOnline::NoWillRetry { ref err, .. } = self {
Some(err)
} else {
None
}
}
#[inline(always)]
pub fn is_ok(&self) -> bool {
matches!(&self, IsOnline::Yes { .. })
}
}
#[derive(Debug)]
pub struct Account {
name: String,
hash: AccountHash,
pub is_online: Result<()>,
pub is_online: IsOnline,
pub(crate) mailbox_entries: IndexMap<MailboxHash, MailboxEntry>,
pub(crate) mailboxes_order: Vec<MailboxHash>,
tree: Vec<MailboxNode>,
@ -338,6 +375,10 @@ impl core::fmt::Display for JobRequest {
}
impl JobRequest {
pub fn is_mailboxes(&self) -> bool {
matches!(self, JobRequest::Mailboxes { .. })
}
pub fn is_watch(&self) -> bool {
matches!(self, JobRequest::Watch { .. })
}
@ -518,9 +559,14 @@ impl Account {
hash,
name,
is_online: if !backend.capabilities().is_remote {
Ok(())
IsOnline::Yes {
last_activity: std::time::Instant::now(),
}
} else {
Err(MeliError::new("Attempting connection."))
IsOnline::Unitialized {
last_activity: std::time::Instant::now(),
rate_limit: RateLimit::new(1, 1, job_executor.clone()).exponential_backoff(5),
}
},
mailbox_entries: Default::default(),
mailboxes_order: Default::default(),
@ -1555,36 +1601,25 @@ impl Account {
/* Call only in Context::is_online, since only Context can launch the watcher threads if an
* account goes from offline to online. */
pub fn is_online(&mut self) -> Result<()> {
if !self.backend_capabilities.is_remote && !self.backend_capabilities.is_async {
return Ok(());
}
if self.is_online.is_err()
&& self
.is_online
.as_ref()
.unwrap_err()
.kind
.is_authentication()
{
return self.is_online.clone();
}
if self.is_online.is_ok() {
return Ok(());
}
if !self.active_jobs.values().any(JobRequest::is_online) {
let online_job = self.backend.read().unwrap().is_online();
if let Ok(online_job) = online_job {
let handle = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(online_job)
} else {
self.job_executor.spawn_blocking(online_job)
};
self.insert_job(handle.job_id, JobRequest::IsOnline { handle });
pub fn is_online(&mut self) -> &IsOnline {
match &self.is_online {
_ if !self.backend_capabilities.is_remote && !self.backend_capabilities.is_async => {}
IsOnline::Unitialized { .. } | IsOnline::No { .. } | IsOnline::Yes { .. } => {}
IsOnline::NoWillRetry { .. }
if self.active_jobs.values().any(JobRequest::is_online) => {}
IsOnline::NoWillRetry { .. } => {
let online_job = self.backend.read().unwrap().is_online();
if let Ok(online_job) = online_job {
let handle = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(online_job)
} else {
self.job_executor.spawn_blocking(online_job)
};
self.insert_job(handle.job_id, JobRequest::IsOnline { handle });
}
}
}
self.is_online.clone()
&self.is_online
}
pub fn search(
@ -1632,6 +1667,135 @@ impl Account {
}
}
pub fn update_status(&mut self, result: Result<()>) {
if let Err(err) = result {
if err.kind.is_authentication() {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!("{}: authentication error", &self.name)),
err.to_string(),
Some(crate::types::NotificationType::Error(err.kind)),
)))
.expect("Could not send event on main channel");
self.is_online = IsOnline::No {
last_activity: std::time::Instant::now(),
err,
};
return;
}
if let IsOnline::NoWillRetry {
ref mut last_activity,
err: ref mut err_ptr,
ref mut rate_limit,
} = self.is_online
{
if rate_limit.active {
*last_activity = std::time::Instant::now();
*err_ptr = err;
} else {
self.is_online = IsOnline::No {
last_activity: std::time::Instant::now(),
err,
};
}
} else {
self.is_online = IsOnline::NoWillRetry {
last_activity: std::time::Instant::now(),
err,
rate_limit: RateLimit::new(1, 1, self.job_executor.clone())
.exponential_backoff(5),
};
}
} else {
if self.mailbox_entries.is_empty() {
let backend = self.backend.read().unwrap();
if let Ok(mailboxes_job) = backend.mailboxes() {
if let Ok(online_job) = backend.is_online() {
let handle = if backend.capabilities().is_async {
self.job_executor
.spawn_specialized(online_job.then(|_| mailboxes_job))
} else {
self.job_executor
.spawn_blocking(online_job.then(|_| mailboxes_job))
};
let job_id = handle.job_id;
self.active_jobs
.insert(job_id, JobRequest::Mailboxes { handle });
self.active_job_instants
.insert(std::time::Instant::now(), job_id);
self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
StatusEvent::NewJob(job_id),
)))
.unwrap();
}
}
self.is_online = IsOnline::Unitialized {
last_activity: std::time::Instant::now(),
rate_limit: RateLimit::new(1, 1, self.job_executor.clone())
.exponential_backoff(5),
};
} else {
if let IsOnline::Yes {
ref mut last_activity,
} = self.is_online
{
*last_activity = std::time::Instant::now();
} else {
self.is_online = IsOnline::Yes {
last_activity: std::time::Instant::now(),
};
}
}
}
self.sender
.send(ThreadEvent::UIEvent(UIEvent::AccountStatusChange(
self.hash, None,
)))
.unwrap();
}
pub fn retry_mailboxes(&mut self) {
match self.is_online {
IsOnline::NoWillRetry {
ref rate_limit,
ref err,
last_activity,
} if !rate_limit.active => {
self.is_online = IsOnline::No {
err: err.clone(),
last_activity,
};
return;
}
IsOnline::NoWillRetry { .. } | IsOnline::Unitialized { .. } => {}
_ => return,
}
let backend = self.backend.read().unwrap();
if let Ok(mailboxes_job) = backend.mailboxes() {
if let Ok(online_job) = backend.is_online() {
let handle = if backend.capabilities().is_async {
self.job_executor
.spawn_specialized(online_job.then(|_| mailboxes_job))
} else {
self.job_executor
.spawn_blocking(online_job.then(|_| mailboxes_job))
};
let job_id = handle.job_id;
self.active_jobs
.insert(job_id, JobRequest::Mailboxes { handle });
self.active_job_instants
.insert(std::time::Instant::now(), job_id);
self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
StatusEvent::NewJob(job_id),
)))
.unwrap();
}
}
}
pub fn process_event(&mut self, job_id: &JobId) -> bool {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
@ -1644,26 +1808,8 @@ impl Account {
JobRequest::Mailboxes { ref mut handle } => {
if let Ok(Some(mailboxes)) = handle.chan.try_recv() {
if let Err(err) = mailboxes.and_then(|mailboxes| self.init(mailboxes)) {
if err.kind.is_authentication() {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
Some(format!("{}: authentication error", &self.name)),
err.to_string(),
Some(crate::types::NotificationType::Error(err.kind)),
)))
.expect("Could not send event on main channel");
self.is_online = Err(err);
return true;
}
let mailboxes_job = self.backend.read().unwrap().mailboxes();
if let Ok(mailboxes_job) = mailboxes_job {
let handle = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(mailboxes_job)
} else {
self.job_executor.spawn_blocking(mailboxes_job)
};
self.insert_job(handle.job_id, JobRequest::Mailboxes { handle });
};
self.update_status(Err(err));
return true;
} else {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::AccountStatusChange(
@ -1770,20 +1916,13 @@ impl Account {
)))
.unwrap();
if is_online.is_ok() {
if self.is_online.is_err()
&& !self
.is_online
.as_ref()
.unwrap_err()
.kind
.is_authentication()
{
if self.is_online.as_err().is_none() {
self.watch();
}
self.is_online = Ok(());
self.update_status(Ok(()));
return true;
}
self.is_online = is_online;
self.update_status(is_online);
}
let online_job = self.backend.read().unwrap().is_online();
if let Ok(online_job) = online_job {
@ -1800,31 +1939,10 @@ impl Account {
Err(_) => { /* canceled */ }
Ok(None) => {}
Ok(Some(Ok(()))) => {
if self.is_online.is_err()
&& !self
.is_online
.as_ref()
.unwrap_err()
.kind
.is_authentication()
{
self.update_status(Ok(()));
if self.is_online.as_err().is_none() {
self.watch();
}
if !(self.is_online.is_err()
&& self
.is_online
.as_ref()
.unwrap_err()
.kind
.is_authentication())
{
self.is_online = Ok(());
self.sender
.send(ThreadEvent::UIEvent(UIEvent::AccountStatusChange(
self.hash, None,
)))
.unwrap();
}
}
Ok(Some(Err(err))) => {
if !err.kind.is_authentication() {
@ -1838,7 +1956,7 @@ impl Account {
self.insert_job(handle.job_id, JobRequest::IsOnline { handle });
};
}
self.is_online = Err(err);
self.update_status(Err(err));
self.sender
.send(ThreadEvent::UIEvent(UIEvent::AccountStatusChange(
self.hash, None,

View File

@ -134,6 +134,11 @@ impl Context {
} = self;
let was_online = accounts[account_pos].is_online.is_ok();
let ret = accounts[account_pos].is_online();
let ret = if let Some(err) = ret.as_err() {
Err(err.clone())
} else {
Ok(())
};
if ret.is_ok() && !was_online {
debug!("inserting mailbox hashes:");
for mailbox_node in accounts[account_pos].list_mailboxes() {
@ -1040,6 +1045,22 @@ impl State {
self.redraw();
return;
}
UIEvent::Timer(id) => {
for i in 0..self.context.accounts.len() {
use crate::conf::accounts::IsOnline;
match self.context.accounts[i].is_online {
IsOnline::Unitialized { ref rate_limit, .. }
| IsOnline::NoWillRetry { ref rate_limit, .. }
if rate_limit.id() == id =>
{
self.context.accounts[i].retry_mailboxes();
return;
}
_ => {}
}
}
}
UIEvent::Input(ref key)
if *key
== self

View File

@ -316,6 +316,7 @@ pub struct RateLimit {
pub timer: crate::jobs::Timer,
rate: std::time::Duration,
pub active: bool,
retries: Option<u8>,
}
impl RateLimit {
@ -328,9 +329,20 @@ impl RateLimit {
),
rate: std::time::Duration::from_millis(millis / reqs),
active: false,
retries: None,
}
}
pub fn once(mut self) -> Self {
self.retries = Some(1);
self
}
pub fn exponential_backoff(mut self, n: u8) -> Self {
self.retries = Some(n);
self
}
pub fn reset(&mut self) {
self.last_tick = std::time::Instant::now();
self.active = false;
@ -341,9 +353,22 @@ impl RateLimit {
if self.last_tick + self.rate > now {
self.active = false;
} else {
self.timer.rearm();
self.last_tick = now;
self.active = true;
let jitter = if self.retries.is_some() {
Some(melib::email::compose::random::random_u64() % 1000)
} else {
None
};
if let Some(v) = self.retries {
self.retries = Some(v.saturating_sub(1));
}
self.active = self.retries.is_none() || self.retries != Some(0);
if self.active {
self.timer.rearm();
self.last_tick = now;
if let Some(jitter) = jitter {
self.last_tick += std::time::Duration::from_millis(jitter);
}
}
}
self.active
}