diff --git a/meli/src/accounts.rs b/meli/src/accounts.rs index 8dc341f2..f2b34c4f 100644 --- a/meli/src/accounts.rs +++ b/meli/src/accounts.rs @@ -552,7 +552,7 @@ impl Account { #[cfg(feature = "sqlite3")] if settings.conf.search_backend == crate::conf::SearchBackend::Sqlite3 { - let db_path = match crate::sqlite3::db_path() { + let db_path = match crate::sqlite3::AccountCache::db_path(&name) { Err(err) => { main_loop_handler.send(ThreadEvent::UIEvent(UIEvent::StatusEvent( StatusEvent::DisplayMessage(format!( @@ -872,9 +872,9 @@ impl Account { }; #[cfg(feature = "sqlite3")] if self.settings.conf.search_backend == crate::conf::SearchBackend::Sqlite3 { - let handle = self.main_loop_handler.job_executor.spawn_blocking( + let handle = self.main_loop_handler.job_executor.spawn_specialized( "sqlite3::insert".into(), - crate::sqlite3::insert( + crate::sqlite3::AccountCache::insert( (*envelope).clone(), self.backend.clone(), self.name.clone(), @@ -951,15 +951,18 @@ impl Account { } #[cfg(feature = "sqlite3")] if self.settings.conf.search_backend == crate::conf::SearchBackend::Sqlite3 { - if let Err(err) = crate::sqlite3::remove(env_hash) { - let envelopes = self.collection.envelopes.read().unwrap(); - log::error!( - "Failed to remove envelope {} [{}] in cache: {}", - &envelopes[&env_hash].message_id_display(), - env_hash, - err - ); - } + let fut = crate::sqlite3::AccountCache::remove(self.name.clone(), env_hash); + let handle = self + .main_loop_handler + .job_executor + .spawn_specialized("remove envelope from cache".into(), fut); + self.insert_job( + handle.job_id, + JobRequest::Refresh { + mailbox_hash, + handle, + }, + ); } let thread_hash = self.collection.get_env(env_hash).thread(); if !self @@ -1643,7 +1646,9 @@ impl Account { let query = melib::search::Query::try_from(search_term)?; match self.settings.conf.search_backend { #[cfg(feature = "sqlite3")] - crate::conf::SearchBackend::Sqlite3 => crate::sqlite3::search(&query, _sort), + crate::conf::SearchBackend::Sqlite3 => Ok(Box::pin( + crate::sqlite3::AccountCache::search(self.name.clone(), query, _sort), + )), crate::conf::SearchBackend::Auto | crate::conf::SearchBackend::None => { if self.backend_capabilities.supports_search { self.backend diff --git a/meli/src/accounts/backend_ops.rs b/meli/src/accounts/backend_ops.rs index a29a45e0..ca0dae85 100644 --- a/meli/src/accounts/backend_ops.rs +++ b/meli/src/accounts/backend_ops.rs @@ -60,28 +60,31 @@ impl Account { pub(super) fn update_cached_env(&mut self, env: Envelope, old_hash: Option) { if self.settings.conf.search_backend == crate::conf::SearchBackend::Sqlite3 { let msg_id = env.message_id_display().to_string(); - match crate::sqlite3::remove(old_hash.unwrap_or_else(|| env.hash())) - .map(|_| crate::sqlite3::insert(env, self.backend.clone(), self.name.clone())) - { - Ok(job) => { - let handle = self - .main_loop_handler - .job_executor - .spawn_blocking("sqlite3::remove".into(), job); - self.insert_job( - handle.job_id, - JobRequest::Generic { - name: format!("Update envelope {} in sqlite3 cache", msg_id).into(), - handle, - log_level: LogLevel::TRACE, - on_finish: None, - }, - ); - } - Err(err) => { - log::error!("Failed to update envelope {} in cache: {}", msg_id, err); - } - } + let name = self.name.clone(); + let backend = self.backend.clone(); + let fut = async move { + crate::sqlite3::AccountCache::remove( + name.clone(), + old_hash.unwrap_or_else(|| env.hash()), + ) + .await?; + + crate::sqlite3::AccountCache::insert(env, backend, name).await?; + Ok(()) + }; + let handle = self + .main_loop_handler + .job_executor + .spawn_specialized("sqlite3::remove".into(), fut); + self.insert_job( + handle.job_id, + JobRequest::Generic { + name: format!("Update envelope {} in sqlite3 cache", msg_id).into(), + handle, + log_level: LogLevel::TRACE, + on_finish: None, + }, + ); } } } diff --git a/meli/src/jobs.rs b/meli/src/jobs.rs index 23240e18..2bc1fffc 100644 --- a/meli/src/jobs.rs +++ b/meli/src/jobs.rs @@ -234,6 +234,7 @@ impl JobExecutor { } /// Spawns a future with a generic return value `R` + #[inline(always)] pub fn spawn_specialized(&self, desc: Cow<'static, str>, future: F) -> JoinHandle where F: Future + Send + 'static, @@ -295,6 +296,7 @@ impl JobExecutor { /// Spawns a future with a generic return value `R` that might block on a /// new thread + #[inline(always)] pub fn spawn_blocking(&self, desc: Cow<'static, str>, future: F) -> JoinHandle where F: Future + Send + 'static, diff --git a/meli/src/mail/status.rs b/meli/src/mail/status.rs index c59f48fd..68ecd01e 100644 --- a/meli/src/mail/status.rs +++ b/meli/src/mail/status.rs @@ -178,10 +178,9 @@ impl AccountStatus { } #[cfg(feature = "sqlite3")] (SearchBackend::Sqlite3, _) => { - if let Ok(path) = crate::sqlite3::db_path() { - format!("sqlite3 database {}", path.display()) - } else { - "sqlite3 database".to_string() + match crate::sqlite3::AccountCache::db_path(&a.name) { + Ok(path) => format!("sqlite3 database: {}", path.display()), + Err(err) => format!("sqlite3 error: {err}"), } } }, diff --git a/meli/src/sqlite3.rs b/meli/src/sqlite3.rs index de238bb9..842898db 100644 --- a/meli/src/sqlite3.rs +++ b/meli/src/sqlite3.rs @@ -26,22 +26,23 @@ use std::{ }; use melib::{ - backends::{MailBackend, ResultFuture}, + backends::MailBackend, email::{Envelope, EnvelopeHash}, log, search::{ escape_double_quote, Query::{self, *}, }, - utils::sqlite3::{self as melib_sqlite3, rusqlite::params, DatabaseDescription}, - Error, Result, SortField, SortOrder, + smol, + utils::sqlite3::{rusqlite::params, DatabaseDescription}, + Error, Result, ResultIntoError, SortField, SortOrder, }; use smallvec::SmallVec; -use crate::melib::ResultIntoError; - const DB: DatabaseDescription = DatabaseDescription { name: "index.db", + identifier: None, + application_prefix: "meli", init_script: Some( "CREATE TABLE IF NOT EXISTS envelopes ( id INTEGER PRIMARY KEY, @@ -113,10 +114,6 @@ END; ", version: 1, }; -pub fn db_path() -> Result { - melib_sqlite3::db_path(DB.name) -} - //#[inline(always)] //fn fts5_bareword(w: &str) -> Cow { // if w == "AND" || w == "OR" || w == "NOT" { @@ -140,152 +137,192 @@ pub fn db_path() -> Result { // } //} // -// -pub async fn insert( - envelope: Envelope, - backend: Arc>>, - acc_name: String, -) -> Result<()> { - let db_path = db_path()?; - if !db_path.exists() { - return Err(Error::new( - "Database hasn't been initialised. Run `reindex` command", - )); - } - let conn = melib_sqlite3::open_db(db_path)?; +pub struct AccountCache; - let op = backend - .read() - .unwrap() - .operation(envelope.hash())? - .as_bytes()?; +impl AccountCache { + pub async fn insert( + envelope: Envelope, + backend: Arc>>, + acc_name: String, + ) -> Result<()> { + let db_desc = DatabaseDescription { + identifier: Some(acc_name.clone().into()), + ..DB.clone() + }; - let body = match op.await.map(|bytes| envelope.body_bytes(&bytes)) { - Ok(body) => body.text(), - Err(err) => { - log::error!( - "Failed to open envelope {}: {err}", - envelope.message_id_display(), - ); - return Err(err); + if !db_desc.exists().unwrap_or(false) { + return Err(Error::new(format!( + "Database hasn't been initialised. Run `reindex {acc_name}` command" + ))); } - }; - if let Err(err) = conn.execute( - "INSERT OR IGNORE INTO accounts (name) VALUES (?1)", - params![acc_name,], - ) { - log::error!( - "Failed to insert envelope {}: {err}", - envelope.message_id_display(), - ); - return Err(Error::new(err.to_string())); - } - let account_id: i32 = { - let mut stmt = conn - .prepare("SELECT id FROM accounts WHERE name = ?") - .unwrap(); - let x = stmt - .query_map(params![acc_name], |row| row.get(0)) + let op = backend + .read() .unwrap() - .next() - .unwrap() - .unwrap(); - x - }; - if let Err(err) = conn - .execute( - "INSERT OR REPLACE INTO envelopes (account_id, hash, date, _from, _to, cc, bcc, \ - subject, message_id, in_reply_to, _references, flags, has_attachments, body_text, \ - timestamp) + .operation(envelope.hash())? + .as_bytes()?; + + let body = match op.await.map(|bytes| envelope.body_bytes(&bytes)) { + Ok(body) => body.text(), + Err(err) => { + log::error!( + "Failed to open envelope {}: {err}", + envelope.message_id_display(), + ); + return Err(err); + } + }; + smol::unblock(move || { + let mut conn = db_desc.open_or_create_db()?; + + let tx = + conn.transaction_with_behavior(melib::rusqlite::TransactionBehavior::Immediate)?; + if let Err(err) = tx.execute( + "INSERT OR IGNORE INTO accounts (name) VALUES (?1)", + params![acc_name,], + ) { + log::error!( + "Failed to insert envelope {}: {err}", + envelope.message_id_display(), + ); + return Err(Error::new(err.to_string())); + } + let account_id: i32 = { + let mut stmt = tx + .prepare("SELECT id FROM accounts WHERE name = ?") + .unwrap(); + let x = stmt + .query_map(params![acc_name], |row| row.get(0)) + .unwrap() + .next() + .unwrap() + .unwrap(); + x + }; + if let Err(err) = tx + .execute( + "INSERT OR REPLACE INTO envelopes (account_id, hash, date, _from, _to, cc, \ + bcc, subject, message_id, in_reply_to, _references, flags, has_attachments, \ + body_text, timestamp) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)", - params![ - account_id, - envelope.hash().to_be_bytes().to_vec(), - envelope.date_as_str(), - envelope.field_from_to_string(), - envelope.field_to_to_string(), - envelope.field_cc_to_string(), - envelope.field_bcc_to_string(), - envelope.subject().into_owned().trim_end_matches('\u{0}'), - envelope.message_id_display().to_string(), - envelope - .in_reply_to_display() - .map(|f| f.to_string()) - .unwrap_or_default(), - envelope.field_references_to_string(), - i64::from(envelope.flags().bits()), - i32::from(envelope.has_attachments()), - body, - envelope.date().to_be_bytes().to_vec() - ], - ) - .map_err(|e| Error::new(e.to_string())) - { - log::error!( - "Failed to insert envelope {}: {err}", - envelope.message_id_display(), - ); - } - Ok(()) -} - -pub fn remove(env_hash: EnvelopeHash) -> Result<()> { - let db_path = db_path()?; - if !db_path.exists() { - return Err(Error::new( - "Database hasn't been initialised. Run `reindex` command", - )); + params![ + account_id, + envelope.hash().to_be_bytes().to_vec(), + envelope.date_as_str(), + envelope.field_from_to_string(), + envelope.field_to_to_string(), + envelope.field_cc_to_string(), + envelope.field_bcc_to_string(), + envelope.subject().into_owned().trim_end_matches('\u{0}'), + envelope.message_id_display().to_string(), + envelope + .in_reply_to_display() + .map(|f| f.to_string()) + .unwrap_or_default(), + envelope.field_references_to_string(), + i64::from(envelope.flags().bits()), + i32::from(envelope.has_attachments()), + body, + envelope.date().to_be_bytes().to_vec() + ], + ) + .map_err(|e| Error::new(e.to_string())) + { + drop(tx); + log::error!( + "Failed to insert envelope {}: {err}", + envelope.message_id_display(), + ); + } else { + tx.commit()?; + } + Ok(()) + }) + .await?; + Ok(()) } - let conn = melib_sqlite3::open_db(db_path)?; - if let Err(err) = conn - .execute( - "DELETE FROM envelopes WHERE hash = ?", - params![env_hash.to_be_bytes().to_vec(),], - ) - .map_err(|e| Error::new(e.to_string())) - { - log::error!("Failed to remove envelope {env_hash}: {err}"); - return Err(err); + pub async fn remove(acc_name: String, env_hash: EnvelopeHash) -> Result<()> { + let db_desc = DatabaseDescription { + identifier: Some(acc_name.into()), + ..DB.clone() + }; + let db_path = db_desc.db_path()?; + if !db_path.exists() { + return Err(Error::new( + "Database hasn't been initialised. Run `reindex {acc_name}` command", + )); + } + + smol::unblock(move || { + let mut conn = db_desc.open_or_create_db()?; + let tx = + conn.transaction_with_behavior(melib::rusqlite::TransactionBehavior::Immediate)?; + if let Err(err) = tx + .execute( + "DELETE FROM envelopes WHERE hash = ?", + params![env_hash.to_be_bytes().to_vec(),], + ) + .map_err(|e| Error::new(e.to_string())) + { + drop(tx); + log::error!("Failed to remove envelope {env_hash}: {err}"); + return Err(err); + } + tx.commit()?; + Ok(()) + }) + .await?; + Ok(()) } - Ok(()) -} -pub fn index(context: &crate::state::Context, account_index: usize) -> ResultFuture<()> { - let account = &context.accounts[account_index]; - let (acc_name, acc_mutex, backend_mutex): (String, Arc>, Arc<_>) = ( - account.name().to_string(), - account.collection.envelopes.clone(), - account.backend.clone(), - ); - let conn = melib_sqlite3::open_or_create_db(&DB, Some(acc_name.as_str()))?; - let env_hashes = acc_mutex - .read() - .unwrap() - .keys() - .cloned() - .collect::>(); + pub async fn index( + acc_name: Arc, + collection: melib::Collection, + backend_mutex: Arc>>, + ) -> Result<()> { + let acc_mutex = collection.envelopes.clone(); + let db_desc = Arc::new(DatabaseDescription { + identifier: Some(acc_name.to_string().into()), + ..DB.clone() + }); + let env_hashes = acc_mutex + .read() + .unwrap() + .keys() + .cloned() + .collect::>(); - /* Sleep, index and repeat in order not to block the main process */ - Ok(Box::pin(async move { - conn.execute( - "INSERT OR REPLACE INTO accounts (name) VALUES (?1)", - params![acc_name.as_str(),], - ) - .chain_err_summary(|| "Failed to update index:")?; + /* Sleep, index and repeat in order not to block the main process */ let account_id: i32 = { - let mut stmt = conn - .prepare("SELECT id FROM accounts WHERE name = ?") - .unwrap(); - let x = stmt - .query_map(params![acc_name.as_str()], |row| row.get(0)) - .unwrap() - .next() - .unwrap() - .unwrap(); - x + let acc_name = Arc::clone(&acc_name); + let db_desc = Arc::clone(&db_desc); + smol::unblock(move || { + let mut conn = db_desc.open_or_create_db()?; + let tx = conn + .transaction_with_behavior(melib::rusqlite::TransactionBehavior::Immediate)?; + tx.execute( + "INSERT OR REPLACE INTO accounts (name) VALUES (?1)", + params![acc_name.as_str(),], + ) + .chain_err_summary(|| "Failed to update index:")?; + let account_id = { + let mut stmt = tx + .prepare("SELECT id FROM accounts WHERE name = ?") + .unwrap(); + let x = stmt + .query_map(params![acc_name.as_str()], |row| row.get(0)) + .unwrap() + .next() + .unwrap() + .unwrap(); + x + }; + tx.commit()?; + Ok::(account_id) + }) + .await? }; let mut ctr = 0; log::trace!( @@ -296,90 +333,133 @@ pub fn index(context: &crate::state::Context, account_index: usize) -> ResultFut ); for chunk in env_hashes.chunks(200) { ctr += chunk.len(); - for env_hash in chunk { - let mut op = backend_mutex.read().unwrap().operation(*env_hash)?; + let mut chunk_bytes = Vec::with_capacity(chunk.len()); + for &env_hash in chunk { + let mut op = backend_mutex.read().unwrap().operation(env_hash)?; let bytes = op .as_bytes()? .await .chain_err_summary(|| format!("Failed to open envelope {}", env_hash))?; - let envelopes_lck = acc_mutex.read().unwrap(); - if let Some(e) = envelopes_lck.get(env_hash) { - let body = e.body_bytes(&bytes).text().replace('\0', ""); - conn.execute( - "INSERT OR REPLACE INTO envelopes (account_id, hash, date, _from, _to, \ - cc, bcc, subject, message_id, in_reply_to, _references, flags, \ - has_attachments, body_text, timestamp) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)", - params![ - account_id, - e.hash().to_be_bytes().to_vec(), - e.date_as_str(), - e.field_from_to_string(), - e.field_to_to_string(), - e.field_cc_to_string(), - e.field_bcc_to_string(), - e.subject().into_owned().trim_end_matches('\u{0}'), - e.message_id_display().to_string(), - e.in_reply_to_display() - .map(|f| f.to_string()) - .unwrap_or_default(), - e.field_references_to_string(), - i64::from(e.flags().bits()), - i32::from(e.has_attachments()), - body, - e.date().to_be_bytes().to_vec() - ], - ) - .chain_err_summary(|| { - format!("Failed to insert envelope {}", e.message_id_display()) - })?; - } + chunk_bytes.push((env_hash, bytes)); } - let sleep_dur = std::time::Duration::from_millis(20); - std::thread::sleep(sleep_dur); + { + let acc_mutex = acc_mutex.clone(); + let db_desc = Arc::clone(&db_desc); + smol::unblock(move || { + let mut conn = db_desc.open_or_create_db()?; + let tx = conn.transaction_with_behavior( + melib::rusqlite::TransactionBehavior::Immediate, + )?; + let envelopes_lck = acc_mutex.read().unwrap(); + for (env_hash, bytes) in chunk_bytes { + if let Some(e) = envelopes_lck.get(&env_hash) { + let body = e.body_bytes(&bytes).text().replace('\0', ""); + tx.execute( + "INSERT OR REPLACE INTO envelopes (account_id, hash, date, _from, \ + _to, cc, bcc, subject, message_id, in_reply_to, _references, \ + flags, has_attachments, body_text, timestamp) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)", + params![ + account_id, + e.hash().to_be_bytes().to_vec(), + e.date_as_str(), + e.field_from_to_string(), + e.field_to_to_string(), + e.field_cc_to_string(), + e.field_bcc_to_string(), + e.subject().into_owned().trim_end_matches('\u{0}'), + e.message_id_display().to_string(), + e.in_reply_to_display() + .map(|f| f.to_string()) + .unwrap_or_default(), + e.field_references_to_string(), + i64::from(e.flags().bits()), + i32::from(e.has_attachments()), + body, + e.date().to_be_bytes().to_vec() + ], + ) + .chain_err_summary(|| { + format!("Failed to insert envelope {}", e.message_id_display()) + })?; + } + } + tx.commit()?; + Ok::<(), Error>(()) + }) + .await?; + } + let sleep_dur = std::time::Duration::from_millis(50); + smol::Timer::after(sleep_dur).await; } Ok(()) - })) -} - -pub fn search( - query: &Query, - (sort_field, sort_order): (SortField, SortOrder), -) -> ResultFuture> { - let db_path = db_path()?; - if !db_path.exists() { - return Err(Error::new( - "Database hasn't been initialised. Run `reindex` command", - )); } - let conn = melib_sqlite3::open_db(db_path)?; + pub async fn search( + acc_name: String, + query: Query, + (sort_field, sort_order): (SortField, SortOrder), + ) -> Result> { + let db_desc = DatabaseDescription { + identifier: Some(acc_name.clone().into()), + ..DB.clone() + }; - let sort_field = match sort_field { - SortField::Subject => "subject", - SortField::Date => "timestamp", - }; + if !db_desc.exists().unwrap_or(false) { + return Err(Error::new(format!( + "Database hasn't been initialised for account `{}`. Run `reindex` command to \ + build an index.", + acc_name + ))); + } + let query = query_to_sql(&query); - let sort_order = match sort_order { - SortOrder::Asc => "ASC", - SortOrder::Desc => "DESC", - }; + smol::unblock(move || { + let mut conn = db_desc.open_or_create_db()?; - let mut stmt = conn - .prepare(&format!( - "SELECT hash FROM envelopes WHERE {} ORDER BY {} {};", - query_to_sql(query), - sort_field, - sort_order - )) - .map_err(|e| Error::new(e.to_string()))?; + let sort_field = match sort_field { + SortField::Subject => "subject", + SortField::Date => "timestamp", + }; - let results = stmt - .query_map([], |row| row.get::<_, EnvelopeHash>(0)) - .map_err(Error::from)? - .map(|item| item.map_err(Error::from)) - .collect::>>(); - Ok(Box::pin(async { results })) + let sort_order = match sort_order { + SortOrder::Asc => "ASC", + SortOrder::Desc => "DESC", + }; + + let tx = conn.transaction()?; + let mut stmt = tx + .prepare(&format!( + "SELECT hash FROM envelopes WHERE {} ORDER BY {} {};", + query, sort_field, sort_order + )) + .map_err(|e| Error::new(e.to_string()))?; + + #[allow(clippy::let_and_return)] // false positive, the let binding is needed + // for the temporary to live long enough + let x = stmt + .query_map([], |row| row.get::<_, EnvelopeHash>(0)) + .map_err(Error::from)? + .map(|item| item.map_err(Error::from)) + .collect::>>(); + x + }) + .await + } + + pub fn db_path(acc_name: &str) -> Result { + let db_desc = DatabaseDescription { + identifier: Some(acc_name.to_string().into()), + ..DB.clone() + }; + let db_path = db_desc.db_path()?; + if !db_path.exists() { + return Err(Error::new( + "Database hasn't been initialised. Run `reindex {acc_name}` command", + )); + } + Ok(db_path) + } } /// Translates a `Query` to an Sqlite3 expression in a `String`. diff --git a/meli/src/state.rs b/meli/src/state.rs index d374d76a..5c8e793d 100644 --- a/meli/src/state.rs +++ b/meli/src/state.rs @@ -799,38 +799,36 @@ impl State { }); return; } - match crate::sqlite3::index(&self.context, account_index) { - Ok(job) => { - let handle = self - .context - .main_loop_handler - .job_executor - .spawn_blocking("sqlite3::index".into(), job); - self.context.accounts[account_index].active_jobs.insert( - handle.job_id, - crate::accounts::JobRequest::Generic { - name: "Message index rebuild".into(), - handle, - on_finish: None, - log_level: LogLevel::INFO, - }, - ); - self.context.replies.push_back(UIEvent::Notification { - title: None, - source: None, - body: "Message index rebuild started.".into(), - kind: Some(NotificationType::Info), - }); - } - Err(err) => { - self.context.replies.push_back(UIEvent::Notification { - title: Some("Message index rebuild failed".into()), - source: None, - body: err.to_string().into(), - kind: Some(NotificationType::Error(err.kind)), - }); - } - } + let account = &self.context.accounts[account_index]; + let (acc_name, backend_mutex): (Arc, Arc<_>) = ( + Arc::new(account.name().to_string()), + account.backend.clone(), + ); + let job = crate::sqlite3::AccountCache::index( + acc_name, + account.collection.clone(), + backend_mutex, + ); + let handle = self + .context + .main_loop_handler + .job_executor + .spawn_specialized("sqlite3::index".into(), job); + self.context.accounts[account_index].active_jobs.insert( + handle.job_id, + crate::accounts::JobRequest::Generic { + name: "Message index rebuild".into(), + handle, + on_finish: None, + log_level: LogLevel::INFO, + }, + ); + self.context.replies.push_back(UIEvent::Notification { + title: None, + source: None, + body: "Message index rebuild started.".into(), + kind: Some(NotificationType::Info), + }); } #[cfg(not(feature = "sqlite3"))] AccountAction(_, ReIndex) => { diff --git a/melib/src/imap/cache.rs b/melib/src/imap/cache.rs index e658c71b..1c51501c 100644 --- a/melib/src/imap/cache.rs +++ b/melib/src/imap/cache.rs @@ -19,16 +19,20 @@ * along with meli. If not, see . */ -use super::*; -pub mod sync; use std::convert::TryFrom; +use super::*; use crate::{ backends::MailboxHash, email::{Envelope, EnvelopeHash}, error::*, }; +pub mod ram_cache; +#[cfg(feature = "sqlite3")] +pub mod sqlite3_cache; +pub mod sync; + #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct ModSequence(pub std::num::NonZeroU64); @@ -107,690 +111,6 @@ pub trait ImapCacheReset: Send + std::fmt::Debug { Self: Sized; } -#[cfg(feature = "sqlite3")] -pub use sqlite3_m::*; - -#[cfg(feature = "sqlite3")] -pub mod sqlite3_m { - use super::*; - use crate::utils::sqlite3::{ - self, - rusqlite::types::{FromSql, FromSqlError, FromSqlResult, ToSql, ToSqlOutput, Value}, - Connection, DatabaseDescription, - }; - - type Sqlite3UID = i32; - - #[derive(Debug)] - pub struct Sqlite3Cache { - connection: Connection, - loaded_mailboxes: BTreeSet, - uid_store: Arc, - } - - const DB_DESCRIPTION: DatabaseDescription = DatabaseDescription { - name: "header_cache.db", - init_script: Some( - "PRAGMA foreign_keys = true; - PRAGMA encoding = 'UTF-8'; - - CREATE TABLE IF NOT EXISTS envelopes ( - hash INTEGER NOT NULL, - mailbox_hash INTEGER NOT NULL, - uid INTEGER NOT NULL, - modsequence INTEGER, - rfc822 BLOB, - envelope BLOB NOT NULL, - PRIMARY KEY (mailbox_hash, uid), - FOREIGN KEY (mailbox_hash) REFERENCES mailbox(mailbox_hash) ON DELETE CASCADE - ); - CREATE TABLE IF NOT EXISTS mailbox ( - mailbox_hash INTEGER UNIQUE, - uidvalidity INTEGER, - flags BLOB NOT NULL, - highestmodseq INTEGER, - PRIMARY KEY (mailbox_hash) - ); - CREATE INDEX IF NOT EXISTS envelope_uid_idx ON envelopes(mailbox_hash, uid); - CREATE INDEX IF NOT EXISTS envelope_idx ON envelopes(hash); - CREATE INDEX IF NOT EXISTS mailbox_idx ON mailbox(mailbox_hash);", - ), - version: 3, - }; - - impl From for Value { - fn from(env_hash: EnvelopeHash) -> Self { - (env_hash.0 as i64).into() - } - } - - impl ToSql for ModSequence { - fn to_sql(&self) -> rusqlite::Result { - Ok(ToSqlOutput::from(self.0.get() as i64)) - } - } - - impl FromSql for ModSequence { - fn column_result(value: rusqlite::types::ValueRef) -> FromSqlResult { - let i: i64 = FromSql::column_result(value)?; - if i == 0 { - return Err(FromSqlError::OutOfRange(0)); - } - Ok(Self::try_from(i).unwrap()) - } - } - - impl Sqlite3Cache { - pub fn get(uid_store: Arc) -> Result> { - let connection = - match sqlite3::open_or_create_db(&DB_DESCRIPTION, Some(&uid_store.account_name)) { - Ok(c) => Ok(c), - Err(err) => { - // try resetting database on error, but only one time. - if Self::reset_db(&uid_store).is_ok() { - sqlite3::open_or_create_db( - &DB_DESCRIPTION, - Some(&uid_store.account_name), - ) - } else { - Err(err) - } - } - }?; - - Ok(Box::new(Self { - connection, - loaded_mailboxes: BTreeSet::default(), - uid_store, - })) - } - - fn max_uid(&self, mailbox_hash: MailboxHash) -> Result { - let mut stmt = self - .connection - .prepare("SELECT MAX(uid) FROM envelopes WHERE mailbox_hash = ?1;")?; - - let mut ret: Vec = stmt - .query_map(sqlite3::params![mailbox_hash], |row| { - row.get(0).map(|i: Sqlite3UID| i as UID) - })? - .collect::>()?; - Ok(ret.pop().unwrap_or(0)) - } - } - - impl ImapCacheReset for Sqlite3Cache { - fn reset_db(uid_store: &UIDStore) -> Result<()> { - sqlite3::reset_db(&DB_DESCRIPTION, Some(&uid_store.account_name)) - } - } - - impl ImapCache for Sqlite3Cache { - fn reset(&mut self) -> Result<()> { - Self::reset_db(&self.uid_store) - } - - fn mailbox_state(&mut self, mailbox_hash: MailboxHash) -> Result> { - if self.loaded_mailboxes.contains(&mailbox_hash) { - return Ok(Some(())); - } - debug!("loading mailbox state {} from cache", mailbox_hash); - let mut stmt = self.connection.prepare( - "SELECT uidvalidity, flags, highestmodseq FROM mailbox WHERE mailbox_hash = ?1;", - )?; - - let mut ret = stmt.query_map(sqlite3::params![mailbox_hash], |row| { - Ok(( - row.get(0).map(|u: Sqlite3UID| u as UID)?, - row.get(1)?, - row.get(2)?, - )) - })?; - if let Some(v) = ret.next() { - let (uidvalidity, flags, highestmodseq): ( - UIDVALIDITY, - Vec, - Option, - ) = v?; - debug!( - "mailbox state {} in cache uidvalidity {}", - mailbox_hash, uidvalidity - ); - debug!( - "mailbox state {} in cache highestmodseq {:?}", - mailbox_hash, &highestmodseq - ); - debug!( - "mailbox state {} inserting flags: {:?}", - mailbox_hash, - to_str!(&flags) - ); - self.uid_store - .highestmodseqs - .lock() - .unwrap() - .entry(mailbox_hash) - .and_modify(|entry| *entry = highestmodseq.ok_or(())) - .or_insert_with(|| highestmodseq.ok_or(())); - self.uid_store - .uidvalidity - .lock() - .unwrap() - .entry(mailbox_hash) - .and_modify(|entry| *entry = uidvalidity) - .or_insert(uidvalidity); - let mut tag_lck = self.uid_store.collection.tag_index.write().unwrap(); - for f in to_str!(&flags).split('\0') { - let hash = TagHash::from_bytes(f.as_bytes()); - tag_lck.entry(hash).or_insert_with(|| f.to_string()); - } - self.loaded_mailboxes.insert(mailbox_hash); - Ok(Some(())) - } else { - debug!("mailbox state {} not in cache", mailbox_hash); - Ok(None) - } - } - - fn clear( - &mut self, - mailbox_hash: MailboxHash, - select_response: &SelectResponse, - ) -> Result<()> { - debug!("clear mailbox_hash {} {:?}", mailbox_hash, select_response); - self.loaded_mailboxes.remove(&mailbox_hash); - self.connection - .execute( - "DELETE FROM mailbox WHERE mailbox_hash = ?1", - sqlite3::params![mailbox_hash], - ) - .chain_err_summary(|| { - format!( - "Could not clear cache of mailbox {} account {}", - mailbox_hash, self.uid_store.account_name - ) - })?; - - if let Some(Ok(highestmodseq)) = select_response.highestmodseq { - self.connection - .execute( - "INSERT OR IGNORE INTO mailbox (uidvalidity, flags, highestmodseq, \ - mailbox_hash) VALUES (?1, ?2, ?3, ?4)", - sqlite3::params![ - select_response.uidvalidity as Sqlite3UID, - select_response - .flags - .1 - .iter() - .map(|s| s.as_str()) - .collect::>() - .join("\0") - .as_bytes(), - highestmodseq, - mailbox_hash - ], - ) - .chain_err_summary(|| { - format!( - "Could not insert uidvalidity {} in header_cache of account {}", - select_response.uidvalidity, self.uid_store.account_name - ) - })?; - } else { - self.connection - .execute( - "INSERT OR IGNORE INTO mailbox (uidvalidity, flags, mailbox_hash) VALUES \ - (?1, ?2, ?3)", - sqlite3::params![ - select_response.uidvalidity as Sqlite3UID, - select_response - .flags - .1 - .iter() - .map(|s| s.as_str()) - .collect::>() - .join("\0") - .as_bytes(), - mailbox_hash - ], - ) - .chain_err_summary(|| { - format!( - "Could not insert mailbox {} in header_cache of account {}", - select_response.uidvalidity, self.uid_store.account_name - ) - })?; - } - Ok(()) - } - - fn update_mailbox( - &mut self, - mailbox_hash: MailboxHash, - select_response: &SelectResponse, - ) -> Result<()> { - if self.mailbox_state(mailbox_hash)?.is_none() { - return self.clear(mailbox_hash, select_response); - } - - if let Some(Ok(highestmodseq)) = select_response.highestmodseq { - self.connection - .execute( - "UPDATE mailbox SET flags=?1, highestmodseq =?2 where mailbox_hash = ?3;", - sqlite3::params![ - select_response - .flags - .1 - .iter() - .map(|s| s.as_str()) - .collect::>() - .join("\0") - .as_bytes(), - highestmodseq, - mailbox_hash - ], - ) - .chain_err_summary(|| { - format!( - "Could not update mailbox {} in header_cache of account {}", - mailbox_hash, self.uid_store.account_name - ) - })?; - } else { - self.connection - .execute( - "UPDATE mailbox SET flags=?1 where mailbox_hash = ?2;", - sqlite3::params![ - select_response - .flags - .1 - .iter() - .map(|s| s.as_str()) - .collect::>() - .join("\0") - .as_bytes(), - mailbox_hash - ], - ) - .chain_err_summary(|| { - format!( - "Could not update mailbox {} in header_cache of account {}", - mailbox_hash, self.uid_store.account_name - ) - })?; - } - Ok(()) - } - - fn envelopes(&mut self, mailbox_hash: MailboxHash) -> Result>> { - debug!("envelopes mailbox_hash {}", mailbox_hash); - if self.mailbox_state(mailbox_hash)?.is_none() { - return Ok(None); - } - - let res = { - let mut stmt = self.connection.prepare( - "SELECT uid, envelope, modsequence FROM envelopes WHERE mailbox_hash = ?1;", - )?; - - #[allow(clippy::let_and_return)] // false positive, the let binding is needed - // for the temporary to live long enough - let x = stmt - .query_map(sqlite3::params![mailbox_hash], |row| { - Ok(( - row.get(0).map(|i: Sqlite3UID| i as UID)?, - row.get(1)?, - row.get(2)?, - )) - })? - .collect::>(); - x - }; - let ret: Vec<(UID, Envelope, Option)> = match res { - Err(err) if matches!(&err, rusqlite::Error::FromSqlConversionFailure(_, _, _)) => { - drop(err); - self.reset()?; - return Ok(None); - } - Err(err) => return Err(err.into()), - Ok(v) => v, - }; - let mut max_uid = 0; - let mut env_lck = self.uid_store.envelopes.lock().unwrap(); - let mut hash_index_lck = self.uid_store.hash_index.lock().unwrap(); - let mut uid_index_lck = self.uid_store.uid_index.lock().unwrap(); - let mut env_hashes = Vec::with_capacity(ret.len()); - for (uid, env, modseq) in ret { - env_hashes.push(env.hash()); - max_uid = std::cmp::max(max_uid, uid); - hash_index_lck.insert(env.hash(), (uid, mailbox_hash)); - uid_index_lck.insert((mailbox_hash, uid), env.hash()); - env_lck.insert( - env.hash(), - CachedEnvelope { - inner: env, - uid, - mailbox_hash, - modsequence: modseq, - }, - ); - } - self.uid_store - .max_uids - .lock() - .unwrap() - .insert(mailbox_hash, max_uid); - Ok(Some(env_hashes)) - } - - fn insert_envelopes( - &mut self, - mailbox_hash: MailboxHash, - fetches: &[FetchResponse<'_>], - ) -> Result<()> { - debug!( - "insert_envelopes mailbox_hash {} len {}", - mailbox_hash, - fetches.len() - ); - let mut max_uid = self - .uid_store - .max_uids - .lock() - .unwrap() - .get(&mailbox_hash) - .cloned() - .unwrap_or_default(); - if self.mailbox_state(mailbox_hash)?.is_none() { - return Err(Error::new("Mailbox is not in cache").set_kind(ErrorKind::Bug)); - } - let Self { - ref mut connection, - ref uid_store, - loaded_mailboxes: _, - } = self; - let tx = connection.transaction()?; - for item in fetches { - if let FetchResponse { - uid: Some(uid), - message_sequence_number: _, - modseq, - flags: _, - body: _, - references: _, - envelope: Some(envelope), - raw_fetch_value: _, - } = item - { - max_uid = std::cmp::max(max_uid, *uid); - tx.execute( - "INSERT OR REPLACE INTO envelopes (hash, uid, mailbox_hash, modsequence, \ - envelope) VALUES (?1, ?2, ?3, ?4, ?5)", - sqlite3::params![ - envelope.hash(), - *uid as Sqlite3UID, - mailbox_hash, - modseq, - &envelope - ], - ) - .chain_err_summary(|| { - format!( - "Could not insert envelope {} {} in header_cache of account {}", - envelope.message_id(), - envelope.hash(), - uid_store.account_name - ) - })?; - } - } - tx.commit()?; - self.uid_store - .max_uids - .lock() - .unwrap() - .insert(mailbox_hash, max_uid); - Ok(()) - } - - fn update_flags( - &mut self, - env_hashes: EnvelopeHashBatch, - mailbox_hash: MailboxHash, - flags: SmallVec<[FlagOp; 8]>, - ) -> Result<()> { - if self.mailbox_state(mailbox_hash)?.is_none() { - return Err(Error::new("Mailbox is not in cache").set_kind(ErrorKind::Bug)); - } - let Self { - ref mut connection, - ref uid_store, - loaded_mailboxes: _, - } = self; - let tx = connection.transaction()?; - let values = - std::rc::Rc::new(env_hashes.iter().map(Value::from).collect::>()); - - let mut stmt = - tx.prepare("SELECT uid, envelope FROM envelopes WHERE hash IN rarray(?1);")?; - let rows = stmt - .query_map([values], |row| Ok((row.get(0)?, row.get(1)?)))? - .filter_map(|r| r.ok()) - .collect::>(); - drop(stmt); - let mut stmt = tx.prepare( - "UPDATE envelopes SET envelope = ?1 WHERE mailbox_hash = ?2 AND uid = ?3;", - )?; - for (uid, mut env) in rows { - for op in flags.iter() { - match op { - FlagOp::UnSet(flag) | FlagOp::Set(flag) => { - let mut f = env.flags(); - f.set(*flag, op.as_bool()); - env.set_flags(f); - } - FlagOp::UnSetTag(tag) | FlagOp::SetTag(tag) => { - let hash = TagHash::from_bytes(tag.as_bytes()); - if op.as_bool() { - env.tags_mut().insert(hash); - } else { - env.tags_mut().remove(&hash); - } - } - } - } - stmt.execute(sqlite3::params![&env, mailbox_hash, uid as Sqlite3UID])?; - uid_store - .envelopes - .lock() - .unwrap() - .entry(env.hash()) - .and_modify(|entry| { - entry.inner = env; - }); - } - drop(stmt); - tx.commit()?; - Ok(()) - } - - fn update( - &mut self, - mailbox_hash: MailboxHash, - refresh_events: &[(UID, RefreshEvent)], - ) -> Result<()> { - if self.mailbox_state(mailbox_hash)?.is_none() { - return Err(Error::new("Mailbox is not in cache").set_kind(ErrorKind::Bug)); - } - let Self { - ref mut connection, - ref uid_store, - loaded_mailboxes: _, - } = self; - let tx = connection.transaction()?; - let mut hash_index_lck = uid_store.hash_index.lock().unwrap(); - for (uid, event) in refresh_events { - match &event.kind { - RefreshEventKind::Remove(env_hash) => { - hash_index_lck.remove(env_hash); - tx.execute( - "DELETE FROM envelopes WHERE mailbox_hash = ?1 AND uid = ?2;", - sqlite3::params![mailbox_hash, *uid as Sqlite3UID], - ) - .chain_err_summary(|| { - format!( - "Could not remove envelope {} uid {} from mailbox {} account {}", - env_hash, *uid, mailbox_hash, uid_store.account_name - ) - })?; - } - RefreshEventKind::NewFlags(env_hash, (flags, tags)) => { - let mut stmt = tx.prepare( - "SELECT envelope FROM envelopes WHERE mailbox_hash = ?1 AND uid = ?2;", - )?; - - let mut ret: Vec = stmt - .query_map(sqlite3::params![mailbox_hash, *uid as Sqlite3UID], |row| { - row.get(0) - })? - .collect::>()?; - if let Some(mut env) = ret.pop() { - env.set_flags(*flags); - env.tags_mut().clear(); - env.tags_mut() - .extend(tags.iter().map(|t| TagHash::from_bytes(t.as_bytes()))); - tx.execute( - "UPDATE envelopes SET envelope = ?1 WHERE mailbox_hash = ?2 AND \ - uid = ?3;", - sqlite3::params![&env, mailbox_hash, *uid as Sqlite3UID], - ) - .chain_err_summary(|| { - format!( - "Could not update envelope {} uid {} from mailbox {} account \ - {}", - env_hash, *uid, mailbox_hash, uid_store.account_name - ) - })?; - uid_store - .envelopes - .lock() - .unwrap() - .entry(*env_hash) - .and_modify(|entry| { - entry.inner = env; - }); - } - } - _ => {} - } - } - tx.commit()?; - let new_max_uid = self.max_uid(mailbox_hash).unwrap_or(0); - self.uid_store - .max_uids - .lock() - .unwrap() - .insert(mailbox_hash, new_max_uid); - Ok(()) - } - - fn find_envelope( - &mut self, - identifier: std::result::Result, - mailbox_hash: MailboxHash, - ) -> Result> { - let mut ret: Vec<(UID, Envelope, Option)> = match identifier { - Ok(uid) => { - let mut stmt = self.connection.prepare( - "SELECT uid, envelope, modsequence FROM envelopes WHERE mailbox_hash = ?1 \ - AND uid = ?2;", - )?; - - #[allow(clippy::let_and_return)] // false positive, the let binding is needed - // for the temporary to live long enough - let x = stmt - .query_map(sqlite3::params![mailbox_hash, uid as Sqlite3UID], |row| { - Ok(( - row.get(0).map(|u: Sqlite3UID| u as UID)?, - row.get(1)?, - row.get(2)?, - )) - })? - .collect::>()?; - x - } - Err(env_hash) => { - let mut stmt = self.connection.prepare( - "SELECT uid, envelope, modsequence FROM envelopes WHERE mailbox_hash = ?1 \ - AND hash = ?2;", - )?; - - #[allow(clippy::let_and_return)] // false positive, the let binding is needed - // for the temporary to live long enough - let x = stmt - .query_map(sqlite3::params![mailbox_hash, env_hash], |row| { - Ok(( - row.get(0).map(|u: Sqlite3UID| u as UID)?, - row.get(1)?, - row.get(2)?, - )) - })? - .collect::>()?; - x - } - }; - if ret.len() != 1 { - return Ok(None); - } - let (uid, inner, modsequence) = ret.pop().unwrap(); - Ok(Some(CachedEnvelope { - inner, - uid, - mailbox_hash, - modsequence, - })) - } - - fn rfc822( - &mut self, - identifier: std::result::Result, - mailbox_hash: MailboxHash, - ) -> Result>> { - let mut ret: Vec>> = match identifier { - Ok(uid) => { - let mut stmt = self.connection.prepare( - "SELECT rfc822 FROM envelopes WHERE mailbox_hash = ?1 AND uid = ?2;", - )?; - #[allow(clippy::let_and_return)] // false positive, the let binding is needed - // for the temporary to live long enough - let x = stmt - .query_map(sqlite3::params![mailbox_hash, uid as Sqlite3UID], |row| { - row.get(0) - })? - .collect::>()?; - x - } - Err(env_hash) => { - let mut stmt = self.connection.prepare( - "SELECT rfc822 FROM envelopes WHERE mailbox_hash = ?1 AND hash = ?2;", - )?; - #[allow(clippy::let_and_return)] // false positive, the let binding is needed - // for the temporary to live long enough - let x = stmt - .query_map(sqlite3::params![mailbox_hash, env_hash], |row| row.get(0))? - .collect::>()?; - x - } - }; - - if ret.len() != 1 { - return Ok(None); - } - Ok(ret.pop().unwrap()) - } - } -} - pub(super) async fn fetch_cached_envs(state: &mut FetchState) -> Result>> { let FetchState { stage: _, @@ -822,96 +142,3 @@ pub(super) async fn fetch_cached_envs(state: &mut FetchState) -> Result) -> Result> { - Ok(Box::new(Self)) - } - } - - impl ImapCacheReset for DefaultCache { - fn reset_db(_: &UIDStore) -> Result<()> { - Err(Error::new("melib is not built with any imap cache").set_kind(ErrorKind::Bug)) - } - } - - impl ImapCache for DefaultCache { - fn reset(&mut self) -> Result<()> { - Ok(()) - } - - fn mailbox_state(&mut self, _mailbox_hash: MailboxHash) -> Result> { - Err(Error::new("melib is not built with any imap cache").set_kind(ErrorKind::Bug)) - } - - fn clear( - &mut self, - _mailbox_hash: MailboxHash, - _select_response: &SelectResponse, - ) -> Result<()> { - Err(Error::new("melib is not built with any imap cache").set_kind(ErrorKind::Bug)) - } - - fn envelopes(&mut self, _mailbox_hash: MailboxHash) -> Result>> { - Err(Error::new("melib is not built with any imap cache").set_kind(ErrorKind::Bug)) - } - - fn insert_envelopes( - &mut self, - _mailbox_hash: MailboxHash, - _fetches: &[FetchResponse<'_>], - ) -> Result<()> { - Err(Error::new("melib is not built with any imap cache").set_kind(ErrorKind::Bug)) - } - - fn update_mailbox( - &mut self, - _mailbox_hash: MailboxHash, - _select_response: &SelectResponse, - ) -> Result<()> { - Err(Error::new("melib is not built with any imap cache").set_kind(ErrorKind::Bug)) - } - - fn update( - &mut self, - _mailbox_hash: MailboxHash, - _refresh_events: &[(UID, RefreshEvent)], - ) -> Result<()> { - Err(Error::new("melib is not built with any imap cache").set_kind(ErrorKind::Bug)) - } - - fn find_envelope( - &mut self, - _identifier: std::result::Result, - _mailbox_hash: MailboxHash, - ) -> Result> { - Err(Error::new("melib is not built with any imap cache").set_kind(ErrorKind::Bug)) - } - - fn rfc822( - &mut self, - _identifier: std::result::Result, - _mailbox_hash: MailboxHash, - ) -> Result>> { - Err(Error::new("melib is not built with any imap cache").set_kind(ErrorKind::Bug)) - } - - fn update_flags( - &mut self, - _env_hashes: EnvelopeHashBatch, - _mailbox_hash: MailboxHash, - _flags: SmallVec<[FlagOp; 8]>, - ) -> Result<()> { - Err(Error::new("melib is not built with any imap cache").set_kind(ErrorKind::Bug)) - } - } -} diff --git a/melib/src/imap/cache/ram_cache.rs b/melib/src/imap/cache/ram_cache.rs new file mode 100644 index 00000000..77f6d7e7 --- /dev/null +++ b/melib/src/imap/cache/ram_cache.rs @@ -0,0 +1,21 @@ +// +// ____ +// +// Copyright 2024 Emmanouil Pitsidianakis +// +// This file is part of ____. +// +// ____ is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// ____ is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with ____. If not, see . +// +// SPDX-License-Identifier: EUPL-1.2 OR GPL-3.0-or-later diff --git a/melib/src/imap/cache/sqlite3_cache.rs b/melib/src/imap/cache/sqlite3_cache.rs new file mode 100644 index 00000000..cae26b75 --- /dev/null +++ b/melib/src/imap/cache/sqlite3_cache.rs @@ -0,0 +1,714 @@ +// +// melib - IMAP +// +// Copyright 2024 Emmanouil Pitsidianakis +// +// This file is part of melib. +// +// melib is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// melib is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with melib. If not, see . +// +// SPDX-License-Identifier: EUPL-1.2 OR GPL-3.0-or-later + +use std::{collections::BTreeSet, sync::Arc}; + +use smallvec::SmallVec; + +use crate::{ + backends::{EnvelopeHashBatch, FlagOp, MailboxHash, RefreshEvent, RefreshEventKind, TagHash}, + email::{Envelope, EnvelopeHash}, + error::{Error, ErrorKind, Result, ResultIntoError}, + imap::{ + cache::{CachedEnvelope, ImapCache, ImapCacheReset}, + FetchResponse, ModSequence, SelectResponse, UIDStore, UID, UIDVALIDITY, + }, + utils::sqlite3::{ + self, + rusqlite::types::{FromSql, FromSqlError, FromSqlResult, ToSql, ToSqlOutput, Value}, + Connection, DatabaseDescription, + }, +}; + +type Sqlite3UID = i32; + +#[derive(Debug)] +pub struct Sqlite3Cache { + connection: Connection, + loaded_mailboxes: BTreeSet, + uid_store: Arc, +} + +const DB_DESCRIPTION: DatabaseDescription = DatabaseDescription { + name: "header_cache.db", + identifier: None, + application_prefix: "meli", + init_script: Some( + "PRAGMA foreign_keys = true; + PRAGMA encoding = 'UTF-8'; + + CREATE TABLE IF NOT EXISTS envelopes ( + hash INTEGER NOT NULL, + mailbox_hash INTEGER NOT NULL, + uid INTEGER NOT NULL, + modsequence INTEGER, + rfc822 BLOB, + envelope BLOB NOT NULL, + PRIMARY KEY (mailbox_hash, uid), + FOREIGN KEY (mailbox_hash) REFERENCES mailbox(mailbox_hash) ON DELETE CASCADE + ); + CREATE TABLE IF NOT EXISTS mailbox ( + mailbox_hash INTEGER UNIQUE, + uidvalidity INTEGER, + flags BLOB NOT NULL, + highestmodseq INTEGER, + PRIMARY KEY (mailbox_hash) + ); + CREATE INDEX IF NOT EXISTS envelope_uid_idx ON envelopes(mailbox_hash, uid); + CREATE INDEX IF NOT EXISTS envelope_idx ON envelopes(hash); + CREATE INDEX IF NOT EXISTS mailbox_idx ON mailbox(mailbox_hash);", + ), + version: 3, +}; + +impl From for Value { + fn from(env_hash: EnvelopeHash) -> Self { + (env_hash.0 as i64).into() + } +} + +impl ToSql for ModSequence { + fn to_sql(&self) -> rusqlite::Result { + Ok(ToSqlOutput::from(self.0.get() as i64)) + } +} + +impl FromSql for ModSequence { + fn column_result(value: rusqlite::types::ValueRef) -> FromSqlResult { + let i: i64 = FromSql::column_result(value)?; + if i == 0 { + return Err(FromSqlError::OutOfRange(0)); + } + Ok(Self::try_from(i).unwrap()) + } +} + +impl Sqlite3Cache { + pub fn get(uid_store: Arc) -> Result> { + let db_desc = DatabaseDescription { + identifier: Some(uid_store.account_name.to_string().into()), + ..DB_DESCRIPTION.clone() + }; + let connection = match db_desc.open_or_create_db() { + Ok(c) => Ok(c), + Err(err) => { + // try resetting database on error, but only one time. + if db_desc.reset_db().is_ok() { + db_desc.open_or_create_db() + } else { + Err(err) + } + } + }?; + + Ok(Box::new(Self { + connection, + loaded_mailboxes: BTreeSet::default(), + uid_store, + })) + } + + fn max_uid(&mut self, mailbox_hash: MailboxHash) -> Result { + let tx = self.connection.transaction()?; + let mut stmt = tx.prepare("SELECT MAX(uid) FROM envelopes WHERE mailbox_hash = ?1;")?; + + let mut ret: Vec = stmt + .query_map(sqlite3::params![mailbox_hash], |row| { + row.get(0).map(|i: Sqlite3UID| i as UID) + })? + .collect::>()?; + Ok(ret.pop().unwrap_or(0)) + } +} + +impl ImapCacheReset for Sqlite3Cache { + fn reset_db(uid_store: &UIDStore) -> Result<()> { + let db_desc = DatabaseDescription { + identifier: Some(uid_store.account_name.to_string().into()), + ..DB_DESCRIPTION.clone() + }; + db_desc.reset_db() + } +} + +impl ImapCache for Sqlite3Cache { + fn reset(&mut self) -> Result<()> { + Self::reset_db(&self.uid_store) + } + + fn mailbox_state(&mut self, mailbox_hash: MailboxHash) -> Result> { + if self.loaded_mailboxes.contains(&mailbox_hash) { + return Ok(Some(())); + } + debug!("loading mailbox state {} from cache", mailbox_hash); + let tx = self.connection.transaction()?; + let mut stmt = tx.prepare( + "SELECT uidvalidity, flags, highestmodseq FROM mailbox WHERE mailbox_hash = ?1;", + )?; + + let mut ret = stmt.query_map(sqlite3::params![mailbox_hash], |row| { + Ok(( + row.get(0).map(|u: Sqlite3UID| u as UID)?, + row.get(1)?, + row.get(2)?, + )) + })?; + if let Some(v) = ret.next() { + let (uidvalidity, flags, highestmodseq): (UIDVALIDITY, Vec, Option) = + v?; + debug!( + "mailbox state {} in cache uidvalidity {}", + mailbox_hash, uidvalidity + ); + debug!( + "mailbox state {} in cache highestmodseq {:?}", + mailbox_hash, &highestmodseq + ); + debug!( + "mailbox state {} inserting flags: {:?}", + mailbox_hash, + to_str!(&flags) + ); + self.uid_store + .highestmodseqs + .lock() + .unwrap() + .entry(mailbox_hash) + .and_modify(|entry| *entry = highestmodseq.ok_or(())) + .or_insert_with(|| highestmodseq.ok_or(())); + self.uid_store + .uidvalidity + .lock() + .unwrap() + .entry(mailbox_hash) + .and_modify(|entry| *entry = uidvalidity) + .or_insert(uidvalidity); + let mut tag_lck = self.uid_store.collection.tag_index.write().unwrap(); + for f in to_str!(&flags).split('\0') { + let hash = TagHash::from_bytes(f.as_bytes()); + tag_lck.entry(hash).or_insert_with(|| f.to_string()); + } + self.loaded_mailboxes.insert(mailbox_hash); + Ok(Some(())) + } else { + debug!("mailbox state {} not in cache", mailbox_hash); + Ok(None) + } + } + + fn clear(&mut self, mailbox_hash: MailboxHash, select_response: &SelectResponse) -> Result<()> { + debug!("clear mailbox_hash {} {:?}", mailbox_hash, select_response); + self.loaded_mailboxes.remove(&mailbox_hash); + let tx = self.connection.transaction()?; + tx.execute( + "DELETE FROM mailbox WHERE mailbox_hash = ?1", + sqlite3::params![mailbox_hash], + ) + .chain_err_summary(|| { + format!( + "Could not clear cache of mailbox {} account {}", + mailbox_hash, self.uid_store.account_name + ) + })?; + + if let Some(Ok(highestmodseq)) = select_response.highestmodseq { + tx.execute( + "INSERT OR IGNORE INTO mailbox (uidvalidity, flags, highestmodseq, mailbox_hash) \ + VALUES (?1, ?2, ?3, ?4)", + sqlite3::params![ + select_response.uidvalidity as Sqlite3UID, + select_response + .flags + .1 + .iter() + .map(|s| s.as_str()) + .collect::>() + .join("\0") + .as_bytes(), + highestmodseq, + mailbox_hash + ], + ) + .chain_err_summary(|| { + format!( + "Could not insert uidvalidity {} in header_cache of account {}", + select_response.uidvalidity, self.uid_store.account_name + ) + })?; + } else { + tx.execute( + "INSERT OR IGNORE INTO mailbox (uidvalidity, flags, mailbox_hash) VALUES (?1, ?2, \ + ?3)", + sqlite3::params![ + select_response.uidvalidity as Sqlite3UID, + select_response + .flags + .1 + .iter() + .map(|s| s.as_str()) + .collect::>() + .join("\0") + .as_bytes(), + mailbox_hash + ], + ) + .chain_err_summary(|| { + format!( + "Could not insert mailbox {} in header_cache of account {}", + select_response.uidvalidity, self.uid_store.account_name + ) + })?; + } + tx.commit()?; + Ok(()) + } + + fn update_mailbox( + &mut self, + mailbox_hash: MailboxHash, + select_response: &SelectResponse, + ) -> Result<()> { + if self.mailbox_state(mailbox_hash)?.is_none() { + return self.clear(mailbox_hash, select_response); + } + + let tx = self.connection.transaction()?; + if let Some(Ok(highestmodseq)) = select_response.highestmodseq { + tx.execute( + "UPDATE mailbox SET flags=?1, highestmodseq =?2 where mailbox_hash = ?3;", + sqlite3::params![ + select_response + .flags + .1 + .iter() + .map(|s| s.as_str()) + .collect::>() + .join("\0") + .as_bytes(), + highestmodseq, + mailbox_hash + ], + ) + .chain_err_summary(|| { + format!( + "Could not update mailbox {} in header_cache of account {}", + mailbox_hash, self.uid_store.account_name + ) + })?; + } else { + tx.execute( + "UPDATE mailbox SET flags=?1 where mailbox_hash = ?2;", + sqlite3::params![ + select_response + .flags + .1 + .iter() + .map(|s| s.as_str()) + .collect::>() + .join("\0") + .as_bytes(), + mailbox_hash + ], + ) + .chain_err_summary(|| { + format!( + "Could not update mailbox {} in header_cache of account {}", + mailbox_hash, self.uid_store.account_name + ) + })?; + } + tx.commit()?; + Ok(()) + } + + fn envelopes(&mut self, mailbox_hash: MailboxHash) -> Result>> { + debug!("envelopes mailbox_hash {}", mailbox_hash); + if self.mailbox_state(mailbox_hash)?.is_none() { + return Ok(None); + } + + let res = { + let tx = self.connection.transaction()?; + let mut stmt = tx.prepare( + "SELECT uid, envelope, modsequence FROM envelopes WHERE mailbox_hash = ?1;", + )?; + + #[allow(clippy::let_and_return)] // false positive, the let binding is needed + // for the temporary to live long enough + let x = stmt + .query_map(sqlite3::params![mailbox_hash], |row| { + Ok(( + row.get(0).map(|i: Sqlite3UID| i as UID)?, + row.get(1)?, + row.get(2)?, + )) + })? + .collect::>(); + x + }; + let ret: Vec<(UID, Envelope, Option)> = match res { + Err(err) if matches!(&err, rusqlite::Error::FromSqlConversionFailure(_, _, _)) => { + drop(err); + self.reset()?; + return Ok(None); + } + Err(err) => return Err(err.into()), + Ok(v) => v, + }; + let mut max_uid = 0; + let mut env_lck = self.uid_store.envelopes.lock().unwrap(); + let mut hash_index_lck = self.uid_store.hash_index.lock().unwrap(); + let mut uid_index_lck = self.uid_store.uid_index.lock().unwrap(); + let mut env_hashes = Vec::with_capacity(ret.len()); + for (uid, env, modseq) in ret { + env_hashes.push(env.hash()); + max_uid = std::cmp::max(max_uid, uid); + hash_index_lck.insert(env.hash(), (uid, mailbox_hash)); + uid_index_lck.insert((mailbox_hash, uid), env.hash()); + env_lck.insert( + env.hash(), + CachedEnvelope { + inner: env, + uid, + mailbox_hash, + modsequence: modseq, + }, + ); + } + self.uid_store + .max_uids + .lock() + .unwrap() + .insert(mailbox_hash, max_uid); + Ok(Some(env_hashes)) + } + + fn insert_envelopes( + &mut self, + mailbox_hash: MailboxHash, + fetches: &[FetchResponse<'_>], + ) -> Result<()> { + debug!( + "insert_envelopes mailbox_hash {} len {}", + mailbox_hash, + fetches.len() + ); + let mut max_uid = self + .uid_store + .max_uids + .lock() + .unwrap() + .get(&mailbox_hash) + .cloned() + .unwrap_or_default(); + if self.mailbox_state(mailbox_hash)?.is_none() { + return Err(Error::new("Mailbox is not in cache").set_kind(ErrorKind::Bug)); + } + let Self { + ref mut connection, + ref uid_store, + loaded_mailboxes: _, + } = self; + let tx = connection.transaction()?; + for item in fetches { + if let FetchResponse { + uid: Some(uid), + message_sequence_number: _, + modseq, + flags: _, + body: _, + references: _, + envelope: Some(envelope), + raw_fetch_value: _, + } = item + { + max_uid = std::cmp::max(max_uid, *uid); + tx.execute( + "INSERT OR REPLACE INTO envelopes (hash, uid, mailbox_hash, modsequence, \ + envelope) VALUES (?1, ?2, ?3, ?4, ?5)", + sqlite3::params![ + envelope.hash(), + *uid as Sqlite3UID, + mailbox_hash, + modseq, + &envelope + ], + ) + .chain_err_summary(|| { + format!( + "Could not insert envelope {} {} in header_cache of account {}", + envelope.message_id(), + envelope.hash(), + uid_store.account_name + ) + })?; + } + } + tx.commit()?; + self.uid_store + .max_uids + .lock() + .unwrap() + .insert(mailbox_hash, max_uid); + Ok(()) + } + + fn update_flags( + &mut self, + env_hashes: EnvelopeHashBatch, + mailbox_hash: MailboxHash, + flags: SmallVec<[FlagOp; 8]>, + ) -> Result<()> { + if self.mailbox_state(mailbox_hash)?.is_none() { + return Err(Error::new("Mailbox is not in cache").set_kind(ErrorKind::Bug)); + } + let Self { + ref mut connection, + ref uid_store, + loaded_mailboxes: _, + } = self; + let tx = connection.transaction()?; + let values = std::rc::Rc::new(env_hashes.iter().map(Value::from).collect::>()); + + let mut stmt = + tx.prepare("SELECT uid, envelope FROM envelopes WHERE hash IN rarray(?1);")?; + let rows = stmt + .query_map([values], |row| Ok((row.get(0)?, row.get(1)?)))? + .filter_map(|r| r.ok()) + .collect::>(); + drop(stmt); + let mut stmt = + tx.prepare("UPDATE envelopes SET envelope = ?1 WHERE mailbox_hash = ?2 AND uid = ?3;")?; + for (uid, mut env) in rows { + for op in flags.iter() { + match op { + FlagOp::UnSet(flag) | FlagOp::Set(flag) => { + let mut f = env.flags(); + f.set(*flag, op.as_bool()); + env.set_flags(f); + } + FlagOp::UnSetTag(tag) | FlagOp::SetTag(tag) => { + let hash = TagHash::from_bytes(tag.as_bytes()); + if op.as_bool() { + env.tags_mut().insert(hash); + } else { + env.tags_mut().remove(&hash); + } + } + } + } + stmt.execute(sqlite3::params![&env, mailbox_hash, uid as Sqlite3UID])?; + uid_store + .envelopes + .lock() + .unwrap() + .entry(env.hash()) + .and_modify(|entry| { + entry.inner = env; + }); + } + drop(stmt); + tx.commit()?; + Ok(()) + } + + fn update( + &mut self, + mailbox_hash: MailboxHash, + refresh_events: &[(UID, RefreshEvent)], + ) -> Result<()> { + if self.mailbox_state(mailbox_hash)?.is_none() { + return Err(Error::new("Mailbox is not in cache").set_kind(ErrorKind::Bug)); + } + { + let Self { + ref mut connection, + ref uid_store, + loaded_mailboxes: _, + } = self; + let tx = connection.transaction()?; + let mut hash_index_lck = uid_store.hash_index.lock().unwrap(); + for (uid, event) in refresh_events { + match &event.kind { + RefreshEventKind::Remove(env_hash) => { + hash_index_lck.remove(env_hash); + tx.execute( + "DELETE FROM envelopes WHERE mailbox_hash = ?1 AND uid = ?2;", + sqlite3::params![mailbox_hash, *uid as Sqlite3UID], + ) + .chain_err_summary(|| { + format!( + "Could not remove envelope {} uid {} from mailbox {} account {}", + env_hash, *uid, mailbox_hash, uid_store.account_name + ) + })?; + } + RefreshEventKind::NewFlags(env_hash, (flags, tags)) => { + let mut stmt = tx.prepare( + "SELECT envelope FROM envelopes WHERE mailbox_hash = ?1 AND uid = ?2;", + )?; + + let mut ret: Vec = stmt + .query_map(sqlite3::params![mailbox_hash, *uid as Sqlite3UID], |row| { + row.get(0) + })? + .collect::>()?; + if let Some(mut env) = ret.pop() { + env.set_flags(*flags); + env.tags_mut().clear(); + env.tags_mut() + .extend(tags.iter().map(|t| TagHash::from_bytes(t.as_bytes()))); + tx.execute( + "UPDATE envelopes SET envelope = ?1 WHERE mailbox_hash = ?2 AND \ + uid = ?3;", + sqlite3::params![&env, mailbox_hash, *uid as Sqlite3UID], + ) + .chain_err_summary(|| { + format!( + "Could not update envelope {} uid {} from mailbox {} account \ + {}", + env_hash, *uid, mailbox_hash, uid_store.account_name + ) + })?; + uid_store + .envelopes + .lock() + .unwrap() + .entry(*env_hash) + .and_modify(|entry| { + entry.inner = env; + }); + } + } + _ => {} + } + } + tx.commit()?; + } + let new_max_uid = self.max_uid(mailbox_hash).unwrap_or(0); + self.uid_store + .max_uids + .lock() + .unwrap() + .insert(mailbox_hash, new_max_uid); + Ok(()) + } + + fn find_envelope( + &mut self, + identifier: std::result::Result, + mailbox_hash: MailboxHash, + ) -> Result> { + let mut ret: Vec<(UID, Envelope, Option)> = match identifier { + Ok(uid) => { + let tx = self.connection.transaction()?; + let mut stmt = tx.prepare( + "SELECT uid, envelope, modsequence FROM envelopes WHERE mailbox_hash = ?1 AND \ + uid = ?2;", + )?; + + #[allow(clippy::let_and_return)] // false positive, the let binding is needed + // for the temporary to live long enough + let x = stmt + .query_map(sqlite3::params![mailbox_hash, uid as Sqlite3UID], |row| { + Ok(( + row.get(0).map(|u: Sqlite3UID| u as UID)?, + row.get(1)?, + row.get(2)?, + )) + })? + .collect::>()?; + x + } + Err(env_hash) => { + let tx = self.connection.transaction()?; + let mut stmt = tx.prepare( + "SELECT uid, envelope, modsequence FROM envelopes WHERE mailbox_hash = ?1 AND \ + hash = ?2;", + )?; + + #[allow(clippy::let_and_return)] // false positive, the let binding is needed + // for the temporary to live long enough + let x = stmt + .query_map(sqlite3::params![mailbox_hash, env_hash], |row| { + Ok(( + row.get(0).map(|u: Sqlite3UID| u as UID)?, + row.get(1)?, + row.get(2)?, + )) + })? + .collect::>()?; + x + } + }; + if ret.len() != 1 { + return Ok(None); + } + let (uid, inner, modsequence) = ret.pop().unwrap(); + Ok(Some(CachedEnvelope { + inner, + uid, + mailbox_hash, + modsequence, + })) + } + + fn rfc822( + &mut self, + identifier: std::result::Result, + mailbox_hash: MailboxHash, + ) -> Result>> { + let mut ret: Vec>> = match identifier { + Ok(uid) => { + let tx = self.connection.transaction()?; + let mut stmt = tx.prepare( + "SELECT rfc822 FROM envelopes WHERE mailbox_hash = ?1 AND uid = ?2;", + )?; + #[allow(clippy::let_and_return)] // false positive, the let binding is needed + // for the temporary to live long enough + let x = stmt + .query_map(sqlite3::params![mailbox_hash, uid as Sqlite3UID], |row| { + row.get(0) + })? + .collect::>()?; + x + } + Err(env_hash) => { + let tx = self.connection.transaction()?; + let mut stmt = tx.prepare( + "SELECT rfc822 FROM envelopes WHERE mailbox_hash = ?1 AND hash = ?2;", + )?; + #[allow(clippy::let_and_return)] // false positive, the let binding is needed + // for the temporary to live long enough + let x = stmt + .query_map(sqlite3::params![mailbox_hash, env_hash], |row| row.get(0))? + .collect::>()?; + x + } + }; + + if ret.len() != 1 { + return Ok(None); + } + Ok(ret.pop().unwrap()) + } +} diff --git a/melib/src/imap/cache/sync.rs b/melib/src/imap/cache/sync.rs index 29420179..9b86fa99 100644 --- a/melib/src/imap/cache/sync.rs +++ b/melib/src/imap/cache/sync.rs @@ -37,22 +37,22 @@ impl ImapConnection { return Ok(None); } - #[cfg(not(feature = "sqlite3"))] - let mut cache_handle = DefaultCache::get(self.uid_store.clone())?; - #[cfg(feature = "sqlite3")] - let mut cache_handle = Sqlite3Cache::get(self.uid_store.clone())?; - if cache_handle.mailbox_state(mailbox_hash)?.is_none() { - return Ok(None); - } - - match self.sync_policy { - SyncPolicy::None => Ok(None), - SyncPolicy::Basic => self.resync_basic(cache_handle, mailbox_hash).await, - SyncPolicy::Condstore => self.resync_condstore(cache_handle, mailbox_hash).await, - SyncPolicy::CondstoreQresync => { - self.resync_condstoreqresync(cache_handle, mailbox_hash) - .await + if let Some(mut cache_handle) = self.uid_store.cache_handle()? { + if cache_handle.mailbox_state(mailbox_hash)?.is_none() { + return Ok(None); } + + match self.sync_policy { + SyncPolicy::None => Ok(None), + SyncPolicy::Basic => self.resync_basic(cache_handle, mailbox_hash).await, + SyncPolicy::Condstore => self.resync_condstore(cache_handle, mailbox_hash).await, + SyncPolicy::CondstoreQresync => { + self.resync_condstoreqresync(cache_handle, mailbox_hash) + .await + } + } + } else { + Ok(None) } } @@ -61,14 +61,8 @@ impl ImapConnection { mailbox_hash: MailboxHash, ) -> Option>> { debug!("load_cache {}", mailbox_hash); - #[cfg(not(feature = "sqlite3"))] - let mut cache_handle = match DefaultCache::get(self.uid_store.clone()) { - Ok(v) => v, - Err(err) => return Some(Err(err)), - }; - #[cfg(feature = "sqlite3")] - let mut cache_handle = match Sqlite3Cache::get(self.uid_store.clone()) { - Ok(v) => v, + let mut cache_handle = match self.uid_store.cache_handle() { + Ok(v) => v?, Err(err) => return Some(Err(err)), }; match cache_handle.mailbox_state(mailbox_hash) { @@ -85,7 +79,7 @@ impl ImapConnection { } } - //rfc4549_Synchronization_Operations_for_Disconnected_IMAP4_Clients + /// > rfc4549_Synchronization_Operations_for_Disconnected_IMAP4_Clients pub async fn resync_basic( &mut self, mut cache_handle: Box, @@ -317,8 +311,8 @@ impl ImapConnection { Ok(Some(payload.into_iter().map(|(_, env)| env).collect())) } - //rfc4549_Synchronization_Operations_for_Disconnected_IMAP4_Clients - //Section 6.1 + /// > rfc4549_Synchronization_Operations_for_Disconnected_IMAP4_Clients + /// > Section 6.1 pub async fn resync_condstore( &mut self, mut cache_handle: Box, @@ -627,8 +621,9 @@ impl ImapConnection { Ok(Some(payload.into_iter().map(|(_, env)| env).collect())) } - //rfc7162_Quick Flag Changes Resynchronization (CONDSTORE)_and Quick Mailbox - // Resynchronization (QRESYNC) + /// > rfc7162_Quick Flag Changes Resynchronization (CONDSTORE)_and Quick + /// > Mailbox + /// > Resynchronization (QRESYNC) pub async fn resync_condstoreqresync( &mut self, _cache_handle: Box, diff --git a/melib/src/imap/connection.rs b/melib/src/imap/connection.rs index 1650caae..6b8d5500 100644 --- a/melib/src/imap/connection.rs +++ b/melib/src/imap/connection.rs @@ -993,11 +993,7 @@ impl ImapConnection { format!("Could not parse select response for mailbox {}", imap_path) })?; { - if *self.uid_store.keep_offline_cache.lock().unwrap() { - #[cfg(not(feature = "sqlite3"))] - let mut cache_handle = super::cache::DefaultCache::get(self.uid_store.clone())?; - #[cfg(feature = "sqlite3")] - let mut cache_handle = super::cache::Sqlite3Cache::get(self.uid_store.clone())?; + if let Some(mut cache_handle) = self.uid_store.cache_handle()? { if let Err(err) = cache_handle.mailbox_state(mailbox_hash).and_then(|r| { if r.is_none() { cache_handle.clear(mailbox_hash, &select_response) diff --git a/melib/src/imap/mod.rs b/melib/src/imap/mod.rs index 323e3d18..f14a98a3 100644 --- a/melib/src/imap/mod.rs +++ b/melib/src/imap/mod.rs @@ -52,8 +52,6 @@ use std::{ time::{Duration, SystemTime}, }; -#[cfg(feature = "sqlite3")] -pub use cache::ImapCacheReset; pub use cache::ModSequence; use futures::{lock::Mutex as FutureMutex, stream::Stream}; use imap_codec::imap_types::{ @@ -203,6 +201,30 @@ impl UIDStore { timeout, } } + + pub fn cache_handle(self: &Arc) -> Result>> { + if !*self.keep_offline_cache.lock().unwrap() { + return Ok(None); + } + #[cfg(not(feature = "sqlite3"))] + return Ok(None); + #[cfg(feature = "sqlite3")] + return Ok(Some(cache::sqlite3_cache::Sqlite3Cache::get(Arc::clone( + self, + ))?)); + } + + pub fn reset_db(self: &Arc) -> Result<()> { + if !*self.keep_offline_cache.lock().unwrap() { + return Ok(()); + } + #[cfg(not(feature = "sqlite3"))] + return Ok(()); + #[cfg(feature = "sqlite3")] + use crate::imap::cache::ImapCacheReset; + #[cfg(feature = "sqlite3")] + return cache::sqlite3_cache::Sqlite3Cache::reset_db(self); + } } #[derive(Debug)] @@ -304,39 +326,32 @@ impl MailBackend for ImapType { mailbox_hash: MailboxHash, ) -> Result>> + Send + 'static>>> { let cache_handle = { - #[cfg(feature = "sqlite3")] - if *self.uid_store.keep_offline_cache.lock().unwrap() { - match cache::Sqlite3Cache::get(self.uid_store.clone()).chain_err_summary(|| { - format!( - "Could not initialize cache for IMAP account {}. Resetting database.", - self.uid_store.account_name - ) - }) { - Ok(v) => Some(v), - Err(err) => { - (self.uid_store.event_consumer)(self.uid_store.account_hash, err.into()); - match cache::Sqlite3Cache::reset_db(&self.uid_store) - .and_then(|()| cache::Sqlite3Cache::get(self.uid_store.clone())) - .chain_err_summary(|| "Could not reset IMAP cache database.") - { - Ok(v) => Some(v), - Err(err) => { - *self.uid_store.keep_offline_cache.lock().unwrap() = false; - log::trace!( - "{}: sqlite3 cache error: {}", - self.uid_store.account_name, - err - ); - None - } + match self.uid_store.cache_handle().chain_err_summary(|| { + format!( + "Could not initialize cache for IMAP account {}. Resetting database.", + self.uid_store.account_name + ) + }) { + Ok(Some(v)) => Some(v), + Ok(None) => None, + Err(err) => { + (self.uid_store.event_consumer)(self.uid_store.account_hash, err.into()); + match self + .uid_store + .reset_db() + .and_then(|()| self.uid_store.cache_handle()) + .chain_err_summary(|| "Could not reset IMAP cache database.") + { + Ok(Some(v)) => Some(v), + Ok(None) => None, + Err(err) => { + *self.uid_store.keep_offline_cache.lock().unwrap() = false; + log::trace!("{}: cache error: {}", self.uid_store.account_name, err); + None } } } - } else { - None } - #[cfg(not(feature = "sqlite3"))] - None }; let mut state = FetchState { stage: if *self.uid_store.keep_offline_cache.lock().unwrap() && cache_handle.is_some() { @@ -860,9 +875,7 @@ impl MailBackend for ImapType { } } } - #[cfg(feature = "sqlite3")] - if *uid_store.keep_offline_cache.lock().unwrap() { - let mut cache_handle = cache::Sqlite3Cache::get(uid_store.clone())?; + if let Some(mut cache_handle) = uid_store.cache_handle()? { let res = cache_handle.update_flags(env_hashes, mailbox_hash, flags); log::trace!("update_flags in cache: {:?}", res); } diff --git a/melib/src/imap/untagged.rs b/melib/src/imap/untagged.rs index 221ba92b..eb268e76 100644 --- a/melib/src/imap/untagged.rs +++ b/melib/src/imap/untagged.rs @@ -23,7 +23,6 @@ use std::convert::{TryFrom, TryInto}; use imap_codec::imap_types::{command::CommandBody, search::SearchKey, sequence::SequenceSet}; -use super::{ImapConnection, MailboxSelection, UID}; use crate::{ backends::{ BackendMailbox, RefreshEvent, @@ -32,8 +31,12 @@ use crate::{ }, email::common_attributes, error::*, - imap::protocol_parser::{ - generate_envelope_hash, FetchResponse, ImapLineSplit, RequiredResponses, UntaggedResponse, + imap::{ + protocol_parser::{ + generate_envelope_hash, FetchResponse, ImapLineSplit, RequiredResponses, + UntaggedResponse, + }, + ImapConnection, MailboxSelection, UID, }, }; @@ -60,10 +63,7 @@ impl ImapConnection { let mailbox = std::clone::Clone::clone(&self.uid_store.mailboxes.lock().await[&mailbox_hash]); - #[cfg(not(feature = "sqlite3"))] - let mut cache_handle = super::cache::DefaultCache::get(self.uid_store.clone())?; - #[cfg(feature = "sqlite3")] - let mut cache_handle = super::cache::Sqlite3Cache::get(self.uid_store.clone())?; + let mut cache_handle = self.uid_store.cache_handle(); let mut response = Vec::with_capacity(8 * 1024); let untagged_response = match super::protocol_parser::untagged_responses(line).map(|(_, v, _)| v) { @@ -156,7 +156,7 @@ impl ImapConnection { } } } - if *self.uid_store.keep_offline_cache.lock().unwrap() { + if let Ok(Some(ref mut cache_handle)) = cache_handle { for mailbox_hash in mboxes_to_update { cache_handle.update(mailbox_hash, &events)?; } @@ -215,7 +215,7 @@ impl ImapConnection { }) .collect::>(); for (mailbox_hash, pair) in events { - if *self.uid_store.keep_offline_cache.lock().unwrap() { + if let Ok(Some(ref mut cache_handle)) = cache_handle { cache_handle.update(mailbox_hash, &pair)?; } let [(_, event)] = pair; @@ -302,7 +302,7 @@ impl ImapConnection { mailbox.path(), ); } - if *self.uid_store.keep_offline_cache.lock().unwrap() { + if let Ok(Some(ref mut cache_handle)) = cache_handle { if let Err(err) = cache_handle .insert_envelopes(mailbox_hash, &v) .chain_err_summary(|| { @@ -404,7 +404,7 @@ impl ImapConnection { } mailbox.exists.lock().unwrap().insert_new(env.hash()); } - if *self.uid_store.keep_offline_cache.lock().unwrap() { + if let Ok(Some(ref mut cache_handle)) = cache_handle { if let Err(err) = cache_handle .insert_envelopes(mailbox_hash, &v) .chain_err_summary(|| { @@ -551,7 +551,7 @@ impl ImapConnection { kind: NewFlags(env_hash, flags), }, )]; - if *self.uid_store.keep_offline_cache.lock().unwrap() { + if let Ok(Some(ref mut cache_handle)) = cache_handle { cache_handle.update(mailbox_hash, &event)?; } self.add_refresh_event(std::mem::replace( diff --git a/melib/src/imap/watch.rs b/melib/src/imap/watch.rs index ca1b0478..048360c7 100644 --- a/melib/src/imap/watch.rs +++ b/melib/src/imap/watch.rs @@ -90,11 +90,7 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> { if let Some(v) = uidvalidities.get(&mailbox_hash) { if *v != select_response.uidvalidity { - if *uid_store.keep_offline_cache.lock().unwrap() { - #[cfg(not(feature = "sqlite3"))] - let mut cache_handle = super::cache::DefaultCache::get(uid_store.clone())?; - #[cfg(feature = "sqlite3")] - let mut cache_handle = super::cache::Sqlite3Cache::get(uid_store.clone())?; + if let Ok(Some(mut cache_handle)) = uid_store.cache_handle() { cache_handle.clear(mailbox_hash, &select_response)?; } conn.add_refresh_event(RefreshEvent { @@ -213,10 +209,7 @@ pub async fn examine_updates( }); } } else { - #[cfg(not(feature = "sqlite3"))] - let mut cache_handle = super::cache::DefaultCache::get(uid_store.clone())?; - #[cfg(feature = "sqlite3")] - let mut cache_handle = super::cache::Sqlite3Cache::get(uid_store.clone())?; + let cache_handle = uid_store.cache_handle(); let mut response = Vec::with_capacity(8 * 1024); let select_response = conn .examine_mailbox(mailbox_hash, &mut response, true) @@ -227,7 +220,7 @@ pub async fn examine_updates( if let Some(v) = uidvalidities.get(&mailbox_hash) { if *v != select_response.uidvalidity { - if *uid_store.keep_offline_cache.lock().unwrap() { + if let Ok(Some(mut cache_handle)) = cache_handle { cache_handle.clear(mailbox_hash, &select_response)?; } conn.add_refresh_event(RefreshEvent { @@ -378,17 +371,17 @@ pub async fn examine_updates( } } } - if *uid_store.keep_offline_cache.lock().unwrap() - && cache_handle.mailbox_state(mailbox_hash)?.is_some() - { - cache_handle - .insert_envelopes(mailbox_hash, &v) - .chain_err_summary(|| { - format!( - "Could not save envelopes in cache for mailbox {}", - mailbox.imap_path() - ) - })?; + if let Ok(Some(mut cache_handle)) = cache_handle { + if cache_handle.mailbox_state(mailbox_hash)?.is_some() { + cache_handle + .insert_envelopes(mailbox_hash, &v) + .chain_err_summary(|| { + format!( + "Could not save envelopes in cache for mailbox {}", + mailbox.imap_path() + ) + })?; + } } for FetchResponse { uid, envelope, .. } in v { diff --git a/melib/src/lib.rs b/melib/src/lib.rs index 2c543a89..de4a2a95 100644 --- a/melib/src/lib.rs +++ b/melib/src/lib.rs @@ -191,6 +191,8 @@ pub extern crate futures; #[allow(unused_imports)] #[macro_use] pub extern crate indexmap; +#[cfg(feature = "sqlite3")] +pub extern crate rusqlite; pub extern crate serde_path_to_error; pub extern crate smallvec; pub extern crate smol; diff --git a/melib/src/nntp/store.rs b/melib/src/nntp/store.rs index 1ae7e03a..6b88c7d1 100644 --- a/melib/src/nntp/store.rs +++ b/melib/src/nntp/store.rs @@ -35,6 +35,8 @@ mod inner { pub const DB_DESCRIPTION: DatabaseDescription = DatabaseDescription { name: "nntp_store.db", + application_prefix: "meli", + identifier: None, init_script: Some( "PRAGMA foreign_keys = true; PRAGMA encoding = 'UTF-8'; @@ -60,8 +62,12 @@ CREATE TABLE IF NOT EXISTS article ( impl Store { pub fn new(id: &str) -> Result { + let db_desc = DatabaseDescription { + identifier: Some(id.to_string().into()), + ..DB_DESCRIPTION + }; Ok(Self { - connection: sqlite3::open_or_create_db(&DB_DESCRIPTION, Some(id))?, + connection: db_desc.open_or_create_db()?, }) } diff --git a/melib/src/utils/sqlite3.rs b/melib/src/utils/sqlite3.rs index ea264fb7..24313e6b 100644 --- a/melib/src/utils/sqlite3.rs +++ b/melib/src/utils/sqlite3.rs @@ -19,115 +19,151 @@ * along with meli. If not, see . */ -use std::path::PathBuf; +use std::{borrow::Cow, os::unix::fs::PermissionsExt, path::PathBuf, sync::Arc}; use rusqlite::types::{FromSql, FromSqlError, FromSqlResult, ToSql, ToSqlOutput}; -pub use rusqlite::{self, params, Connection}; +pub use rusqlite::{self, config::DbConfig, params, Connection}; use crate::{error::*, log, Envelope}; -#[derive(Clone, Copy, Debug)] +/// A description for creating, opening and handling application databases. +#[derive(Clone, Debug)] pub struct DatabaseDescription { + /// A name that represents the function of this database, e.g. + /// `headers_cache`, `contacts`, `settings`, etc. pub name: &'static str, + /// An optional identifier string that along with + /// [`DatabaseDescription::name`] makes a specialized identifier for the + /// database. E.g. an account name, a date, etc. + pub identifier: Option>, + /// The name of the application to use when storing the database in `XDG` + /// directories, used for when the consumer application is not `meli` + /// itself. + pub application_prefix: &'static str, + /// A script that initializes the schema of the database. pub init_script: Option<&'static str>, + /// The current value of the `user_version` `PRAGMA` of the `sqlite3` + /// database, used for schema versioning. pub version: u32, } -pub fn db_path(name: &str) -> Result { - let data_dir = - xdg::BaseDirectories::with_prefix("meli").map_err(|e| Error::new(e.to_string()))?; - data_dir - .place_data_file(name) - .map_err(|err| Error::new(err.to_string())) -} - -pub fn open_db(db_path: PathBuf) -> Result { - if !db_path.exists() { - return Err(Error::new("Database doesn't exist")); +impl DatabaseDescription { + /// Returns whether the computed database path for this description exist. + pub fn exists(&self) -> Result { + let path = self.db_path()?; + Ok(path.exists()) } - Ok(Connection::open(&db_path).and_then(|db| { - rusqlite::vtab::array::load_module(&db)?; - Ok(db) - })?) -} -pub fn open_or_create_db( - description: &DatabaseDescription, - identifier: Option<&str>, -) -> Result { - let mut second_try: bool = false; - loop { - let db_path = identifier.map_or_else( - || db_path(description.name), - |id| db_path(&format!("{}_{}", id, description.name)), - )?; + /// Returns the computed database path for this description. + pub fn db_path(&self) -> Result { + let name: Cow<'static, str> = self.identifier.as_ref().map_or_else( + || self.name.into(), + |id| format!("{}_{}", id, self.name).into(), + ); + let data_dir = + xdg::BaseDirectories::with_prefix(self.application_prefix).map_err(|err| { + Error::new(format!( + "Could not open XDG data directory with prefix {}", + self.application_prefix + )) + .set_source(Some(Arc::new(err))) + })?; + data_dir.place_data_file(name.as_ref()).map_err(|err| { + Error::new(format!("Could not create `{}`", name)).set_source(Some(Arc::new(err))) + }) + } + + /// Returns an [`rusqlite::Connection`] for this description. + pub fn open_or_create_db(&self) -> Result { + let mut second_try: bool = false; + let db_path = self.db_path()?; let set_mode = !db_path.exists(); if set_mode { - log::info!( - "Creating {} database in {}", - description.name, - db_path.display() - ); + log::info!("Creating {} database in {}", self.name, db_path.display()); } - let conn = Connection::open(&db_path)?; - rusqlite::vtab::array::load_module(&conn)?; - if set_mode { - use std::os::unix::fs::PermissionsExt; - let file = std::fs::File::open(&db_path)?; - let metadata = file.metadata()?; - let mut permissions = metadata.permissions(); + loop { + let mut inner_fn = || { + let conn = Connection::open(&db_path)?; + conn.busy_timeout(std::time::Duration::new(10, 0))?; + for conf_flag in [ + DbConfig::SQLITE_DBCONFIG_ENABLE_FKEY, + DbConfig::SQLITE_DBCONFIG_ENABLE_TRIGGER, + ] + .into_iter() + { + conn.set_db_config(conf_flag, true)?; + } + rusqlite::vtab::array::load_module(&conn)?; + if set_mode { + let file = std::fs::File::open(&db_path)?; + let metadata = file.metadata()?; + let mut permissions = metadata.permissions(); - permissions.set_mode(0o600); // Read/write for owner only. - file.set_permissions(permissions)?; - } - let version: i32 = conn.pragma_query_value(None, "user_version", |row| row.get(0))?; - if version != 0_i32 && version as u32 != description.version { - log::info!( - "Database version mismatch, is {} but expected {}. Attempting to recreate \ - database.", - version, - description.version - ); - if second_try { - return Err(Error::new(format!( - "Database version mismatch, is {} but expected {}. Could not recreate \ - database.", - version, description.version - ))); + permissions.set_mode(0o600); // Read/write for owner only. + file.set_permissions(permissions)?; + } + let _: String = + conn.pragma_update_and_check(None, "journal_mode", "WAL", |row| row.get(0))?; + let version: i32 = + conn.pragma_query_value(None, "user_version", |row| row.get(0))?; + if version != 0_i32 && version as u32 != self.version { + log::info!( + "Database version mismatch, is {} but expected {}. Attempting to recreate \ + database.", + version, + self.version + ); + if second_try { + return Err(Error::new(format!( + "Database version mismatch, is {} but expected {}. Could not recreate \ + database.", + version, self.version + ))); + } + self.reset_db()?; + second_try = true; + return Ok(conn); + } + + if version == 0 { + conn.pragma_update(None, "user_version", self.version)?; + } + if let Some(s) = self.init_script { + conn.execute_batch(s) + .map_err(|err| Error::new(err.to_string()))?; + } + + Ok(conn) + }; + inner_fn().unwrap(); + match inner_fn() { + Ok(_) if second_try => continue, + Ok(conn) => return Ok(conn), + Err(err) => { + return Err(Error::new(format!( + "{}: Could not open or create database", + db_path.display() + )) + .set_source(Some(Arc::new(err)))) + } } - reset_db(description, identifier)?; - second_try = true; - continue; } - - if version == 0 { - conn.pragma_update(None, "user_version", description.version)?; - } - if let Some(s) = description.init_script { - conn.execute_batch(s) - .map_err(|e| Error::new(e.to_string()))?; - } - - return Ok(conn); } -} -/// Return database to a clean slate. -pub fn reset_db(description: &DatabaseDescription, identifier: Option<&str>) -> Result<()> { - let db_path = identifier.map_or_else( - || db_path(description.name), - |id| db_path(&format!("{}_{}", id, description.name)), - )?; - if !db_path.exists() { - return Ok(()); + /// Reset database to a clean slate. + pub fn reset_db(&self) -> Result<()> { + let db_path = self.db_path()?; + if !db_path.exists() { + return Ok(()); + } + log::info!("Resetting {} database in {}", self.name, db_path.display()); + std::fs::remove_file(&db_path).map_err(|err| { + Error::new(format!("{}: could not remove file", db_path.display())) + .set_kind(ErrorKind::from(err.kind())) + .set_source(Some(Arc::new(err))) + })?; + Ok(()) } - log::info!( - "Resetting {} database in {}", - description.name, - db_path.display() - ); - std::fs::remove_file(&db_path)?; - Ok(()) } impl ToSql for Envelope {