melib/imap: don't fail utterly if cache fails on fetch

Show notice to user, and then try a fresh fetch. Also try resetting the
cache if possible.
master
Manos Pitsidianakis 2020-09-12 21:24:45 +03:00
parent 7b324359c5
commit 6d9cdce923
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
5 changed files with 228 additions and 55 deletions

View File

@ -298,6 +298,27 @@ impl MailBackend for ImapType {
&mut self, &mut self,
mailbox_hash: MailboxHash, mailbox_hash: MailboxHash,
) -> Result<Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>> { ) -> Result<Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>> {
let cache_handle = {
#[cfg(feature = "sqlite3")]
if self.uid_store.keep_offline_cache {
match cache::Sqlite3Cache::get(self.uid_store.clone()).chain_err_summary(|| {
format!(
"Could not initialize cache for IMAP account {}",
self.uid_store.account_name
)
}) {
Ok(v) => Some(v),
Err(err) => {
(self.uid_store.event_consumer)(self.uid_store.account_hash, err.into());
None
}
}
} else {
None
}
#[cfg(not(feature = "sqlite3"))]
None
};
let mut state = FetchState { let mut state = FetchState {
stage: if self.uid_store.keep_offline_cache { stage: if self.uid_store.keep_offline_cache {
FetchStage::InitialCache FetchStage::InitialCache
@ -307,6 +328,7 @@ impl MailBackend for ImapType {
connection: self.connection.clone(), connection: self.connection.clone(),
mailbox_hash, mailbox_hash,
uid_store: self.uid_store.clone(), uid_store: self.uid_store.clone(),
cache_handle,
}; };
Ok(Box::pin(async_stream::try_stream! { Ok(Box::pin(async_stream::try_stream! {
@ -1455,6 +1477,7 @@ struct FetchState {
connection: Arc<FutureMutex<ImapConnection>>, connection: Arc<FutureMutex<ImapConnection>>,
mailbox_hash: MailboxHash, mailbox_hash: MailboxHash,
uid_store: Arc<UIDStore>, uid_store: Arc<UIDStore>,
cache_handle: Option<Box<dyn cache::ImapCache>>,
} }
async fn fetch_hlpr(state: &mut FetchState) -> Result<Vec<Envelope>> { async fn fetch_hlpr(state: &mut FetchState) -> Result<Vec<Envelope>> {
@ -1468,6 +1491,17 @@ async fn fetch_hlpr(state: &mut FetchState) -> Result<Vec<Envelope>> {
.await .await
.init_mailbox(state.mailbox_hash) .init_mailbox(state.mailbox_hash)
.await?; .await?;
if let Some(ref mut cache_handle) = state.cache_handle {
if let Err(err) = cache_handle
.update_mailbox(state.mailbox_hash, &select_response)
.chain_err_summary(|| {
format!("Could not update cache for mailbox {}.", state.mailbox_hash)
})
{
(state.uid_store.event_consumer)(state.uid_store.account_hash, err.into());
}
}
if select_response.exists == 0 { if select_response.exists == 0 {
state.stage = FetchStage::Finished; state.stage = FetchStage::Finished;
return Ok(Vec::new()); return Ok(Vec::new());
@ -1478,36 +1512,57 @@ async fn fetch_hlpr(state: &mut FetchState) -> Result<Vec<Envelope>> {
continue; continue;
} }
FetchStage::InitialCache => { FetchStage::InitialCache => {
if let Some(cached_payload) = cache::fetch_cached_envs(state).await? { match cache::fetch_cached_envs(state).await {
state.stage = FetchStage::ResyncCache; Err(err) => {
debug!( crate::log(
"fetch_hlpr fetch_cached_envs payload {} len for mailbox_hash {}", format!(
cached_payload.len(), "IMAP cache error: could not fetch cache for {}. Reason: {}",
state.mailbox_hash state.uid_store.account_name, err
); ),
let (mailbox_exists, unseen) = { crate::ERROR,
let f = &state.uid_store.mailboxes.lock().await[&state.mailbox_hash]; );
(f.exists.clone(), f.unseen.clone()) /* Try resetting the database */
}; if let Some(ref mut cache_handle) = state.cache_handle {
unseen.lock().unwrap().insert_existing_set( if let Err(err) = cache_handle.reset() {
cached_payload crate::log(format!("IMAP cache error: could not reset cache for {}. Reason: {}", state.uid_store.account_name, err), crate::ERROR);
.iter() }
.filter_map(|env| { }
if !env.is_seen() { state.stage = FetchStage::InitialFresh;
Some(env.hash()) continue;
} else { }
None Ok(None) => {
} state.stage = FetchStage::InitialFresh;
}) continue;
.collect(), }
); Ok(Some(cached_payload)) => {
mailbox_exists.lock().unwrap().insert_existing_set( state.stage = FetchStage::ResyncCache;
cached_payload.iter().map(|env| env.hash()).collect::<_>(), debug!(
); "fetch_hlpr fetch_cached_envs payload {} len for mailbox_hash {}",
return Ok(cached_payload); cached_payload.len(),
state.mailbox_hash
);
let (mailbox_exists, unseen) = {
let f = &state.uid_store.mailboxes.lock().await[&state.mailbox_hash];
(f.exists.clone(), f.unseen.clone())
};
unseen.lock().unwrap().insert_existing_set(
cached_payload
.iter()
.filter_map(|env| {
if !env.is_seen() {
Some(env.hash())
} else {
None
}
})
.collect(),
);
mailbox_exists.lock().unwrap().insert_existing_set(
cached_payload.iter().map(|env| env.hash()).collect::<_>(),
);
return Ok(cached_payload);
}
} }
state.stage = FetchStage::InitialFresh;
continue;
} }
FetchStage::ResyncCache => { FetchStage::ResyncCache => {
let mailbox_hash = state.mailbox_hash; let mailbox_hash = state.mailbox_hash;
@ -1526,6 +1581,7 @@ async fn fetch_hlpr(state: &mut FetchState) -> Result<Vec<Envelope>> {
ref connection, ref connection,
mailbox_hash, mailbox_hash,
ref uid_store, ref uid_store,
ref mut cache_handle,
} = state; } = state;
let mailbox_hash = *mailbox_hash; let mailbox_hash = *mailbox_hash;
let mut our_unseen: BTreeSet<EnvelopeHash> = BTreeSet::default(); let mut our_unseen: BTreeSet<EnvelopeHash> = BTreeSet::default();
@ -1608,17 +1664,21 @@ async fn fetch_hlpr(state: &mut FetchState) -> Result<Vec<Envelope>> {
} }
} }
} }
#[cfg(feature = "sqlite3")] if let Some(ref mut cache_handle) = cache_handle {
if uid_store.keep_offline_cache { if let Err(err) = debug!(cache_handle
let mut cache_handle = cache::Sqlite3Cache::get(uid_store.clone())?;
debug!(cache_handle
.insert_envelopes(mailbox_hash, &v) .insert_envelopes(mailbox_hash, &v)
.chain_err_summary(|| { .chain_err_summary(|| {
format!( format!(
"Could not save envelopes in cache for mailbox {}", "Could not save envelopes in cache for mailbox {}",
mailbox_path mailbox_path
) )
}))?; }))
{
(state.uid_store.event_consumer)(
state.uid_store.account_hash,
err.into(),
);
}
} }
for FetchResponse { for FetchResponse {

View File

@ -54,7 +54,8 @@ pub struct CachedEnvelope {
pub modsequence: Option<ModSequence>, pub modsequence: Option<ModSequence>,
} }
pub trait ImapCache: Send { pub trait ImapCache: Send + core::fmt::Debug {
fn reset(&mut self) -> Result<()>;
fn mailbox_state(&mut self, mailbox_hash: MailboxHash) -> Result<Option<()>>; fn mailbox_state(&mut self, mailbox_hash: MailboxHash) -> Result<Option<()>>;
fn find_envelope( fn find_envelope(
@ -69,6 +70,12 @@ pub trait ImapCache: Send {
refresh_events: &[(UID, RefreshEvent)], refresh_events: &[(UID, RefreshEvent)],
) -> Result<()>; ) -> Result<()>;
fn update_mailbox(
&mut self,
mailbox_hash: MailboxHash,
select_response: &SelectResponse,
) -> Result<()>;
fn insert_envelopes( fn insert_envelopes(
&mut self, &mut self,
mailbox_hash: MailboxHash, mailbox_hash: MailboxHash,
@ -99,6 +106,7 @@ mod sqlite3_m {
type Sqlite3UID = i32; type Sqlite3UID = i32;
#[derive(Debug)]
pub struct Sqlite3Cache { pub struct Sqlite3Cache {
connection: crate::sqlite3::Connection, connection: crate::sqlite3::Connection,
loaded_mailboxes: BTreeSet<MailboxHash>, loaded_mailboxes: BTreeSet<MailboxHash>,
@ -178,6 +186,10 @@ mod sqlite3_m {
} }
impl ImapCache for Sqlite3Cache { impl ImapCache for Sqlite3Cache {
fn reset(&mut self) -> Result<()> {
sqlite3::reset_db(&DB_DESCRIPTION, Some(self.uid_store.account_name.as_str()))
}
fn mailbox_state(&mut self, mailbox_hash: MailboxHash) -> Result<Option<()>> { fn mailbox_state(&mut self, mailbox_hash: MailboxHash) -> Result<Option<()>> {
if self.loaded_mailboxes.contains(&mailbox_hash) { if self.loaded_mailboxes.contains(&mailbox_hash) {
return Ok(Some(())); return Ok(Some(()));
@ -293,6 +305,64 @@ mod sqlite3_m {
Ok(()) Ok(())
} }
fn update_mailbox(
&mut self,
mailbox_hash: MailboxHash,
select_response: &SelectResponse,
) -> Result<()> {
if self.mailbox_state(mailbox_hash)?.is_none() {
return self.clear(mailbox_hash, select_response);
}
if let Some(Ok(highestmodseq)) = select_response.highestmodseq {
self.connection
.execute(
"UPDATE mailbox SET flags=?1, highestmodseq =?2 where mailbox_hash = ?3;",
sqlite3::params![
select_response
.flags
.1
.iter()
.map(|s| s.as_str())
.collect::<Vec<&str>>()
.join("\0")
.as_bytes(),
highestmodseq,
mailbox_hash as i64
],
)
.chain_err_summary(|| {
format!(
"Could not update mailbox {} in header_cache of account {}",
mailbox_hash, self.uid_store.account_name
)
})?;
} else {
self.connection
.execute(
"UPDATE mailbox SET flags=?1 where mailbox_hash = ?2;",
sqlite3::params![
select_response
.flags
.1
.iter()
.map(|s| s.as_str())
.collect::<Vec<&str>>()
.join("\0")
.as_bytes(),
mailbox_hash as i64
],
)
.chain_err_summary(|| {
format!(
"Could not update mailbox {} in header_cache of account {}",
mailbox_hash, self.uid_store.account_name
)
})?;
}
Ok(())
}
fn envelopes(&mut self, mailbox_hash: MailboxHash) -> Result<Option<Vec<EnvelopeHash>>> { fn envelopes(&mut self, mailbox_hash: MailboxHash) -> Result<Option<Vec<EnvelopeHash>>> {
debug!("envelopes mailbox_hash {}", mailbox_hash); debug!("envelopes mailbox_hash {}", mailbox_hash);
if debug!(self.mailbox_state(mailbox_hash)?.is_none()) { if debug!(self.mailbox_state(mailbox_hash)?.is_none()) {
@ -581,6 +651,7 @@ pub(super) async fn fetch_cached_envs(state: &mut FetchState) -> Result<Option<V
ref mut connection, ref mut connection,
mailbox_hash, mailbox_hash,
ref uid_store, ref uid_store,
cache_handle: _,
} = state; } = state;
debug!(uid_store.keep_offline_cache); debug!(uid_store.keep_offline_cache);
let mailbox_hash = *mailbox_hash; let mailbox_hash = *mailbox_hash;
@ -638,6 +709,8 @@ pub use default_m::*;
#[cfg(not(feature = "sqlite3"))] #[cfg(not(feature = "sqlite3"))]
mod default_m { mod default_m {
use super::*;
#[derive(Debug)]
pub struct DefaultCache; pub struct DefaultCache;
impl DefaultCache { impl DefaultCache {
@ -647,6 +720,10 @@ mod default_m {
} }
impl ImapCache for DefaultCache { impl ImapCache for DefaultCache {
fn reset(&mut self) -> Result<()> {
Err(MeliError::new("melib is not built with any imap cache").set_kind(ErrorKind::Bug))
}
fn mailbox_state(&mut self, _mailbox_hash: MailboxHash) -> Result<Option<()>> { fn mailbox_state(&mut self, _mailbox_hash: MailboxHash) -> Result<Option<()>> {
Err(MeliError::new("melib is not built with any imap cache").set_kind(ErrorKind::Bug)) Err(MeliError::new("melib is not built with any imap cache").set_kind(ErrorKind::Bug))
} }
@ -671,6 +748,14 @@ mod default_m {
Err(MeliError::new("melib is not built with any imap cache").set_kind(ErrorKind::Bug)) Err(MeliError::new("melib is not built with any imap cache").set_kind(ErrorKind::Bug))
} }
fn update_mailbox(
&mut self,
_mailbox_hash: MailboxHash,
_select_response: &SelectResponse,
) -> Result<()> {
Err(MeliError::new("melib is not built with any imap cache").set_kind(ErrorKind::Bug))
}
fn update( fn update(
&mut self, &mut self,
_mailbox_hash: MailboxHash, _mailbox_hash: MailboxHash,
@ -686,5 +771,13 @@ mod default_m {
) -> Result<Option<CachedEnvelope>> { ) -> Result<Option<CachedEnvelope>> {
Err(MeliError::new("melib is not built with any imap cache").set_kind(ErrorKind::Bug)) Err(MeliError::new("melib is not built with any imap cache").set_kind(ErrorKind::Bug))
} }
fn rfc822(
&mut self,
_identifier: std::result::Result<UID, EnvelopeHash>,
_mailbox_hash: MailboxHash,
) -> Result<Option<Vec<u8>>> {
Err(MeliError::new("melib is not built with any imap cache").set_kind(ErrorKind::Bug))
}
} }
} }

View File

@ -37,8 +37,6 @@ impl ImapConnection {
return Ok(None); return Ok(None);
} }
self.select_mailbox(mailbox_hash, &mut String::new(), false)
.await?;
match self.sync_policy { match self.sync_policy {
SyncPolicy::None => Ok(None), SyncPolicy::None => Ok(None),
SyncPolicy::Basic => self.resync_basic(cache_handle, mailbox_hash).await, SyncPolicy::Basic => self.resync_basic(cache_handle, mailbox_hash).await,
@ -87,15 +85,10 @@ impl ImapConnection {
debug!("build_cache {}", mailbox_hash); debug!("build_cache {}", mailbox_hash);
let mut response = String::with_capacity(8 * 1024); let mut response = String::with_capacity(8 * 1024);
// 1 get uidvalidity, highestmodseq // 1 get uidvalidity, highestmodseq
self.select_mailbox(mailbox_hash, &mut response, true) let select_response = self
.await?; .select_mailbox(mailbox_hash, &mut response, true)
let select_response = .await?
protocol_parser::select_response(&response).chain_err_summary(|| { .unwrap();
format!(
"Could not parse select response for mailbox {}",
mailbox_hash
)
})?;
self.uid_store self.uid_store
.uidvalidity .uidvalidity
.lock() .lock()
@ -159,9 +152,10 @@ impl ImapConnection {
let mut new_unseen = BTreeSet::default(); let mut new_unseen = BTreeSet::default();
debug!("current_uidvalidity is {}", current_uidvalidity); debug!("current_uidvalidity is {}", current_uidvalidity);
debug!("max_uid is {}", max_uid); debug!("max_uid is {}", max_uid);
self.select_mailbox(mailbox_hash, &mut response, true) let select_response = self
.await?; .select_mailbox(mailbox_hash, &mut response, true)
let select_response = protocol_parser::select_response(&response)?; .await?
.unwrap();
debug!( debug!(
"select_response.uidvalidity is {}", "select_response.uidvalidity is {}",
select_response.uidvalidity select_response.uidvalidity
@ -171,6 +165,7 @@ impl ImapConnection {
cache_handle.clear(mailbox_hash, &select_response)?; cache_handle.clear(mailbox_hash, &select_response)?;
return Ok(None); return Ok(None);
} }
cache_handle.update_mailbox(mailbox_hash, &select_response)?;
// 2. tag1 UID FETCH <lastseenuid+1>:* <descriptors> // 2. tag1 UID FETCH <lastseenuid+1>:* <descriptors>
self.send_command( self.send_command(
@ -403,9 +398,10 @@ impl ImapConnection {
debug!("current_uidvalidity is {}", cached_uidvalidity); debug!("current_uidvalidity is {}", cached_uidvalidity);
debug!("max_uid is {}", cached_max_uid); debug!("max_uid is {}", cached_max_uid);
// 1. check UIDVALIDITY. If fail, discard cache and rebuild // 1. check UIDVALIDITY. If fail, discard cache and rebuild
self.select_mailbox(mailbox_hash, &mut response, true) let select_response = self
.await?; .select_mailbox(mailbox_hash, &mut response, true)
let select_response = protocol_parser::select_response(&response)?; .await?
.unwrap();
debug!( debug!(
"select_response.uidvalidity is {}", "select_response.uidvalidity is {}",
select_response.uidvalidity select_response.uidvalidity
@ -436,6 +432,7 @@ impl ImapConnection {
} }
return self.resync_basic(cache_handle, mailbox_hash).await; return self.resync_basic(cache_handle, mailbox_hash).await;
} }
cache_handle.update_mailbox(mailbox_hash, &select_response)?;
let new_highestmodseq = select_response.highestmodseq.unwrap().unwrap(); let new_highestmodseq = select_response.highestmodseq.unwrap().unwrap();
let mut refresh_events = vec![]; let mut refresh_events = vec![];
// 1b) Check the mailbox HIGHESTMODSEQ. // 1b) Check the mailbox HIGHESTMODSEQ.
@ -662,8 +659,7 @@ impl ImapConnection {
* returns READ-ONLY for both cases) */ * returns READ-ONLY for both cases) */
let mut select_response = self let mut select_response = self
.select_mailbox(mailbox_hash, &mut response, true) .select_mailbox(mailbox_hash, &mut response, true)
.await .await?
.chain_err_summary(|| format!("Could not select mailbox {}", mailbox_path))?
.unwrap(); .unwrap();
debug!( debug!(
"mailbox: {} select_response: {:?}", "mailbox: {} select_response: {:?}",

View File

@ -762,7 +762,9 @@ impl ImapConnection {
self.read_response(ret, RequiredResponses::SELECT_REQUIRED) self.read_response(ret, RequiredResponses::SELECT_REQUIRED)
.await?; .await?;
debug!("select response {}", ret); debug!("select response {}", ret);
let select_response = protocol_parser::select_response(&ret)?; let select_response = protocol_parser::select_response(&ret).chain_err_summary(|| {
format!("Could not parse select response for mailbox {}", imap_path)
})?;
{ {
if self.uid_store.keep_offline_cache { if self.uid_store.keep_offline_cache {
#[cfg(not(feature = "sqlite3"))] #[cfg(not(feature = "sqlite3"))]

View File

@ -96,6 +96,28 @@ pub fn open_or_create_db(
Ok(conn) Ok(conn)
} }
/// Return database to a clean slate.
pub fn reset_db(description: &DatabaseDescription, identifier: Option<&str>) -> Result<()> {
let db_path = if let Some(id) = identifier {
db_path(&format!("{}_{}", id, description.name))
} else {
db_path(description.name)
}?;
if !db_path.exists() {
return Ok(());
}
log(
format!(
"Resetting {} database in {}",
description.name,
db_path.display()
),
crate::INFO,
);
std::fs::remove_file(&db_path)?;
Ok(())
}
impl ToSql for Envelope { impl ToSql for Envelope {
fn to_sql(&self) -> rusqlite::Result<ToSqlOutput> { fn to_sql(&self) -> rusqlite::Result<ToSqlOutput> {
let v: Vec<u8> = bincode::serialize(self).map_err(|e| { let v: Vec<u8> = bincode::serialize(self).map_err(|e| {