imap: add experimental header caching with sqlite3

Add support for header caching. It is currently unstable and should not
be used. It can be turned on by specifying "X_header_caching" to true in
the IMAP account's configuration.

The header cache is saved in a sqlite3 database in your XDG_DATA_DIR,
for example:

  /home/epilys/.local/share/meli/17328072387188469646_header_cache.db

Concerns #31 https://git.meli.delivery/meli/meli/issues/31
memfd
Manos Pitsidianakis 2020-06-07 14:00:13 +03:00
parent 6458ccb860
commit b4dfc1f89d
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
5 changed files with 320 additions and 13 deletions

View File

@ -41,7 +41,7 @@ reqwest = { version ="0.10.0-alpha.2", optional=true, features = ["json", "block
serde_json = { version = "1.0", optional = true, features = ["raw_value",] }
smallvec = { version = "1.1.0", features = ["serde", ] }
nix = "0.17.0"
rusqlite = {version = "0.20.0", optional =true }
rusqlite = {version = "0.20.0", optional = true }
libloading = "0.6.2"

View File

@ -32,6 +32,7 @@ mod connection;
pub use connection::*;
mod watch;
pub use watch::*;
mod cache;
pub mod managesieve;
mod untagged;
@ -122,6 +123,7 @@ macro_rules! get_conf_val {
#[derive(Debug)]
pub struct UIDStore {
account_hash: AccountHash,
cache_headers: bool,
uidvalidity: Arc<Mutex<HashMap<MailboxHash, UID>>>,
hash_index: Arc<Mutex<HashMap<EnvelopeHash, (UID, MailboxHash)>>>,
uid_index: Arc<Mutex<HashMap<UID, EnvelopeHash>>>,
@ -139,6 +141,7 @@ impl Default for UIDStore {
fn default() -> Self {
UIDStore {
account_hash: 0,
cache_headers: false,
uidvalidity: Default::default(),
hash_index: Default::default(),
uid_index: Default::default(),
@ -227,9 +230,55 @@ impl MailBackend for ImapType {
let _tx = tx.clone();
if let Err(err) = (move || {
let tx = _tx;
let mut response = String::with_capacity(8 * 1024);
let mut our_unseen = 0;
let mut max_uid: cache::MaxUID = 0;
let mut valid_hash_set: HashSet<EnvelopeHash> = HashSet::default();
let cached_hash_set: HashSet<EnvelopeHash> =
(|max_uid: &mut cache::MaxUID| -> Result<HashSet<EnvelopeHash>> {
if !uid_store.cache_headers {
return Ok(HashSet::default());
}
let uidvalidities = uid_store.uidvalidity.lock().unwrap();
let v = if let Some(v) = uidvalidities.get(&mailbox_hash) {
v
} else {
return Ok(HashSet::default());
};
let cached_envs: (cache::MaxUID, Vec<(UID, Envelope)>);
cache::save_envelopes(uid_store.account_hash, mailbox_hash, *v, &[])?;
cached_envs =
cache::get_envelopes(uid_store.account_hash, mailbox_hash, *v)?;
let (_max_uid, envelopes) = debug!(cached_envs);
*max_uid = _max_uid;
let ret = envelopes.iter().map(|(_, env)| env.hash()).collect();
if !envelopes.is_empty() {
let mut payload = vec![];
for (uid, env) in envelopes {
if !env.flags().contains(Flag::SEEN) {
our_unseen += 1;
}
uid_store
.hash_index
.lock()
.unwrap()
.insert(env.hash(), (uid, mailbox_hash));
uid_store.uid_index.lock().unwrap().insert(uid, env.hash());
payload.push(env);
}
debug!("sending cached payload for {}", mailbox_hash);
*unseen.lock().unwrap() = our_unseen;
tx.send(AsyncStatus::Payload(Ok(payload))).unwrap();
}
Ok(ret)
})(&mut max_uid)
.unwrap_or_default();
let mut conn = connection.lock()?;
debug!("locked for get {}", mailbox_path);
let mut response = String::with_capacity(8 * 1024);
/* first SELECT the mailbox to get READ/WRITE permissions (because EXAMINE only
* returns READ-ONLY for both cases) */
@ -247,8 +296,15 @@ impl MailBackend for ImapType {
let v = uidvalidities
.entry(mailbox_hash)
.or_insert(examine_response.uidvalidity);
if uid_store.cache_headers {
let _ = cache::save_envelopes(
uid_store.account_hash,
mailbox_hash,
examine_response.uidvalidity,
&[],
);
}
*v = examine_response.uidvalidity;
let mut permissions = permissions.lock().unwrap();
permissions.create_messages = !examine_response.read_only;
permissions.remove_messages = !examine_response.read_only;
@ -263,8 +319,8 @@ impl MailBackend for ImapType {
conn.examine_mailbox(mailbox_hash, &mut response)?;
let mut tag_lck = uid_store.tag_index.write().unwrap();
let mut our_unseen = 0;
while exists > 0 {
while exists > max_uid {
let mut envelopes = vec![];
debug!("{} exists= {}", mailbox_hash, exists);
if exists == 1 {
@ -274,7 +330,10 @@ impl MailBackend for ImapType {
conn.send_command(
debug!(format!(
"UID FETCH {}:{} (UID FLAGS ENVELOPE BODYSTRUCTURE)",
std::cmp::max(exists.saturating_sub(500), 1),
std::cmp::max(
std::cmp::max(exists.saturating_sub(500), 1),
max_uid + 1
),
exists
))
.as_bytes(),
@ -300,6 +359,7 @@ impl MailBackend for ImapType {
h.write_usize(uid);
h.write(mailbox_path.as_bytes());
env.set_hash(h.finish());
valid_hash_set.insert(env.hash());
if let Some((flags, keywords)) = flags {
if !flags.contains(Flag::SEEN) {
our_unseen += 1;
@ -319,13 +379,37 @@ impl MailBackend for ImapType {
.unwrap()
.insert(env.hash(), (uid, mailbox_hash));
uid_store.uid_index.lock().unwrap().insert(uid, env.hash());
envelopes.push(env);
envelopes.push((uid, env));
}
exists = std::cmp::max(exists.saturating_sub(500), 1);
exists =
std::cmp::max(std::cmp::max(exists.saturating_sub(500), 1), max_uid);
debug!("sending payload for {}", mailbox_hash);
if uid_store.cache_headers {
cache::save_envelopes(
uid_store.account_hash,
mailbox_hash,
examine_response.uidvalidity,
&envelopes
.iter()
.map(|(uid, env)| (*uid, env))
.collect::<SmallVec<[(UID, &Envelope); 1024]>>(),
)?;
}
for &env_hash in cached_hash_set.difference(&valid_hash_set) {
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: RefreshEventKind::Remove(env_hash),
});
}
*unseen.lock().unwrap() = our_unseen;
tx.send(AsyncStatus::Payload(Ok(envelopes))).unwrap();
let progress = envelopes.len();
tx.send(AsyncStatus::Payload(Ok(envelopes
.into_iter()
.map(|(_, env)| env)
.collect::<Vec<Envelope>>())))
.unwrap();
tx.send(AsyncStatus::ProgressReport(progress)).unwrap();
if exists == 1 {
break;
}
@ -904,6 +988,7 @@ impl ImapType {
};
let uid_store: Arc<UIDStore> = Arc::new(UIDStore {
account_hash,
cache_headers: get_conf_val!(s["X_header_caching"], false)?,
..UIDStore::default()
});
let connection = ImapConnection::new_connection(&server_conf, uid_store.clone());
@ -1047,6 +1132,7 @@ impl ImapType {
get_conf_val!(s["server_port"], 143)?;
get_conf_val!(s["use_starttls"], false)?;
get_conf_val!(s["danger_accept_invalid_certs"], false)?;
get_conf_val!(s["X_header_caching"], false)?;
Ok(())
}
}

View File

@ -0,0 +1,174 @@
/*
* meli - imap melib
*
* Copyright 2020 Manos Pitsidianakis
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use super::UID;
use crate::{
backends::{AccountHash, MailboxHash},
email::Envelope,
error::*,
};
pub type MaxUID = UID;
#[cfg(feature = "sqlite3")]
pub use sqlite3_m::*;
#[cfg(feature = "sqlite3")]
mod sqlite3_m {
use super::*;
use crate::sqlite3;
const DB_NAME: &'static str = "header_cache.db";
const INIT_SCRIPT: &'static str = "PRAGMA foreign_keys = true;
PRAGMA encoding = 'UTF-8';
CREATE TABLE IF NOT EXISTS envelopes (
mailbox_hash INTEGER,
uid INTEGER,
validity INTEGER,
envelope BLOB NOT NULL UNIQUE,
PRIMARY KEY (mailbox_hash, uid, validity),
FOREIGN KEY (mailbox_hash, validity) REFERENCES uidvalidity(mailbox_hash, uid) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS uidvalidity (
uid INTEGER UNIQUE,
mailbox_hash INTEGER UNIQUE,
PRIMARY KEY (mailbox_hash, uid)
);
CREATE INDEX IF NOT EXISTS envelope_idx ON envelopes(mailbox_hash, uid, validity);
CREATE INDEX IF NOT EXISTS uidvalidity_idx ON uidvalidity(mailbox_hash);";
pub fn get_envelopes(
account_hash: AccountHash,
mailbox_hash: MailboxHash,
uidvalidity: usize,
) -> Result<(MaxUID, Vec<(UID, Envelope)>)> {
let conn = sqlite3::open_or_create_db(
&format!("{}_{}", account_hash, DB_NAME),
Some(INIT_SCRIPT),
)?;
let mut stmt = conn
.prepare("SELECT MAX(uid) FROM envelopes WHERE mailbox_hash = ? AND validity = ?")
.unwrap();
let max_uid: usize = stmt
.query_map(
sqlite3::params![mailbox_hash as i64, uidvalidity as i64],
|row| row.get(0).map(|u: i64| u as usize),
)
.chain_err_summary(|| {
format!(
"Error while performing query {:?}",
"SELECT MAX(uid) FROM envelopes WHERE mailbox_hash = ? AND validity = ?"
)
})?
.next()
.unwrap()
.unwrap_or(0);
let mut stmt = conn
.prepare("SELECT uid, envelope FROM envelopes WHERE mailbox_hash = ? AND validity = ?")
.unwrap();
let results: Vec<(UID, Vec<u8>)> = stmt
.query_map(
sqlite3::params![mailbox_hash as i64, uidvalidity as i64],
|row| Ok((row.get::<_, i64>(0)? as usize, row.get(1)?)),
)
.chain_err_summary(|| {
format!(
"Error while performing query {:?}",
"SELECT uid, envelope FROM envelopes WHERE mailbox_hash = ? AND validity = ?",
)
})?
.collect::<std::result::Result<_, _>>()?;
debug!(
"imap cache max_uid: {} results len: {}",
max_uid,
results.len()
);
Ok((
max_uid,
results
.into_iter()
.map(|(uid, env)| {
Ok((
uid,
bincode::deserialize(&env).map_err(|e| MeliError::new(e.to_string()))?,
))
})
.collect::<Result<Vec<(UID, Envelope)>>>()?,
))
}
pub fn save_envelopes(
account_hash: AccountHash,
mailbox_hash: MailboxHash,
uidvalidity: usize,
envs: &[(UID, &Envelope)],
) -> Result<()> {
let conn =
sqlite3::open_or_create_db(&format!("{}_{}", account_hash, DB_NAME), Some(INIT_SCRIPT))
.chain_err_summary(|| {
format!(
"Could not create header_cache.db for account {}",
account_hash
)
})?;
conn.execute(
"INSERT OR REPLACE INTO uidvalidity (uid, mailbox_hash) VALUES (?1, ?2)",
sqlite3::params![uidvalidity as i64, mailbox_hash as i64],
)
.chain_err_summary(|| {
format!(
"Could not insert uidvalidity {} in header_cache of account {}",
uidvalidity, account_hash
)
})?;
for (uid, env) in envs {
conn.execute(
"INSERT OR REPLACE INTO envelopes (uid, mailbox_hash, validity, envelope) VALUES (?1, ?2, ?3, ?4)",
sqlite3::params![*uid as i64, mailbox_hash as i64, uidvalidity as i64, bincode::serialize(env).map_err(|e| MeliError::new(e.to_string()))?],
).chain_err_summary(|| format!("Could not insert envelope with hash {} in header_cache of account {}", env.hash(), account_hash))?;
}
Ok(())
}
}
#[cfg(not(feature = "sqlite3"))]
pub use filesystem_m::*;
#[cfg(not(feature = "sqlite3"))]
mod filesystem_m {
use super::*;
pub fn get_envelopes(
_account_hash: AccountHash,
_mailbox_hash: MailboxHash,
_uidvalidity: usize,
) -> Result<(MaxUID, Vec<(UID, Envelope)>)> {
Ok((0, vec![]))
}
pub fn save_envelopes(
_account_hash: AccountHash,
_mailbox_hash: MailboxHash,
_uidvalidity: usize,
_envs: &[(UID, &Envelope)],
) -> Result<()> {
Ok(())
}
}

View File

@ -144,10 +144,11 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
}
};
let mailbox_hash = mailbox.hash();
let uidvalidity;
let mut response = String::with_capacity(8 * 1024);
exit_on_error!(
conn,
account_hash,
account_hash,
mailbox_hash,
work_context,
thread_id,
@ -160,6 +161,7 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
*prev_exists = match protocol_parser::select_response(&response) {
Ok(ok) => {
{
uidvalidity = ok.uidvalidity;
let mut uidvalidities = uid_store.uidvalidity.lock().unwrap();
if let Some(v) = uidvalidities.get_mut(&mailbox_hash) {
@ -229,7 +231,7 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
super::try_lock(&main_conn, Some(std::time::Duration::new(10, 0)))?;
exit_on_error!(
iter.conn,
account_hash,
account_hash,
mailbox_hash,
work_context,
thread_id,
@ -283,7 +285,7 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
/* UID SEARCH RECENT */
exit_on_error!(
conn,
account_hash,
account_hash,
mailbox_hash,
work_context,
thread_id,
@ -364,6 +366,14 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
if !env.is_seen() {
*mailbox.unseen.lock().unwrap() += 1;
}
if uid_store.cache_headers {
cache::save_envelopes(
account_hash,
mailbox_hash,
uidvalidity,
&[(uid, &env)],
)?;
}
conn.add_refresh_event(RefreshEvent {
account_hash,
@ -484,6 +494,15 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
if !env.is_seen() {
*mailbox.unseen.lock().unwrap() += 1;
}
if uid_store.cache_headers {
cache::save_envelopes(
account_hash,
mailbox_hash,
uidvalidity,
&[(uid, &env)],
)?;
}
conn.add_refresh_event(RefreshEvent {
account_hash,
mailbox_hash,
@ -594,8 +613,10 @@ pub fn examine_updates(
conn.examine_mailbox(mailbox_hash, &mut response)
);
*uid_store.is_online.lock().unwrap() = (Instant::now(), Ok(()));
let uidvalidity;
match protocol_parser::select_response(&response) {
Ok(ok) => {
uidvalidity = ok.uidvalidity;
debug!(&ok);
{
let mut uidvalidities = uid_store.uidvalidity.lock().unwrap();
@ -712,6 +733,15 @@ pub fn examine_updates(
if !env.is_seen() {
*mailbox.unseen.lock().unwrap() += 1;
}
if uid_store.cache_headers {
cache::save_envelopes(
account_hash,
mailbox_hash,
uidvalidity,
&[(uid, &env)],
)?;
}
conn.add_refresh_event(RefreshEvent {
account_hash,
mailbox_hash,
@ -791,6 +821,15 @@ pub fn examine_updates(
if !env.is_seen() {
*mailbox.unseen.lock().unwrap() += 1;
}
if uid_store.cache_headers {
cache::save_envelopes(
account_hash,
mailbox_hash,
uidvalidity,
&[(uid, &env)],
)?;
}
conn.add_refresh_event(RefreshEvent {
account_hash,
mailbox_hash,

View File

@ -232,6 +232,14 @@ impl From<nix::Error> for MeliError {
}
}
#[cfg(feature = "sqlite3")]
impl From<rusqlite::Error> for MeliError {
#[inline]
fn from(kind: rusqlite::Error) -> MeliError {
MeliError::new(format!("{}", kind)).set_source(Some(Arc::new(kind)))
}
}
impl From<libloading::Error> for MeliError {
#[inline]
fn from(kind: libloading::Error) -> MeliError {