sqlite3: make reindex operation async

memfd
Manos Pitsidianakis 2020-07-16 23:58:46 +03:00
parent 32f196143e
commit 0d3fe288c5
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
3 changed files with 158 additions and 170 deletions

View File

@ -1563,46 +1563,45 @@ impl Account {
) -> ResultFuture<SmallVec<[EnvelopeHash; 512]>> { ) -> ResultFuture<SmallVec<[EnvelopeHash; 512]>> {
use melib::parsec::Parser; use melib::parsec::Parser;
let query = melib::search::query().parse(search_term)?.1; let query = melib::search::query().parse(search_term)?.1;
self.backend match self.settings.conf.search_backend {
.read() #[cfg(feature = "sqlite3")]
.unwrap() crate::conf::SearchBackend::Sqlite3 => crate::sqlite3::search(search_term, _sort),
.search(query, Some(mailbox_hash)) crate::conf::SearchBackend::None => {
/* if self.backend.read().unwrap().supports_search() {
#[cfg(feature = "sqlite3")] self.backend
{ .read()
crate::sqlite3::search(search_term, sort) .unwrap()
} .search(query, Some(mailbox_hash))
#[cfg(not(feature = "sqlite3"))]
{
let mut ret = SmallVec::new();
let envelopes = self.collection.envelopes.read().unwrap();
for &env_hash in self.collection[&mailbox_hash].iter() {
let envelope = &envelopes[&env_hash];
if envelope.subject().contains(&search_term) {
ret.push(env_hash);
continue;
}
if envelope.field_from_to_string().contains(&search_term) {
ret.push(env_hash);
continue;
}
let op = if let Ok(op) = self.operation(env_hash) {
op
} else { } else {
continue; let mut ret = SmallVec::new();
}; let envelopes = self.collection.envelopes.read().unwrap();
let body = envelope.body(op)?;
let decoded = decode_rec(&body, None); for &env_hash in self.collection[&mailbox_hash].iter() {
let body_text = String::from_utf8_lossy(&decoded); let envelope = &envelopes[&env_hash];
if body_text.contains(&search_term) { if envelope.subject().contains(&search_term) {
ret.push(env_hash); ret.push(env_hash);
continue;
}
if envelope.field_from_to_string().contains(&search_term) {
ret.push(env_hash);
continue;
}
let op = if let Ok(op) = self.operation(env_hash) {
op
} else {
continue;
};
let body = envelope.body(op)?;
let decoded = decode_rec(&body, None);
let body_text = String::from_utf8_lossy(&decoded);
if body_text.contains(&search_term) {
ret.push(env_hash);
}
}
Ok(Box::pin(async { Ok(ret) }))
} }
} }
Ok(ret)
} }
*/
} }
pub fn mailbox_by_path(&self, path: &str) -> Result<MailboxHash> { pub fn mailbox_by_path(&self, path: &str) -> Result<MailboxHash> {

View File

@ -22,16 +22,20 @@
/*! Use an sqlite3 database for fast searching. /*! Use an sqlite3 database for fast searching.
*/ */
use crate::melib::parsec::Parser; use crate::melib::parsec::Parser;
use crate::melib::ResultIntoMeliError;
use melib::search::{ use melib::search::{
escape_double_quote, query, escape_double_quote, query,
Query::{self, *}, Query::{self, *},
}; };
use melib::{ use melib::{
backends::MailBackend, backends::{MailBackend, ResultFuture},
email::{Envelope, EnvelopeHash}, email::{Envelope, EnvelopeHash},
log, log,
sqlite3::{
self as melib_sqlite3,
rusqlite::{self, params},
},
thread::{SortField, SortOrder}, thread::{SortField, SortOrder},
sqlite3::{self as melib_sqlite3, rusqlite::{self, params}},
MeliError, Result, ERROR, MeliError, Result, ERROR,
}; };
@ -147,7 +151,10 @@ pub fn insert(
let conn = melib_sqlite3::open_db(db_path)?; let conn = melib_sqlite3::open_db(db_path)?;
let backend_lck = backend.read().unwrap(); let backend_lck = backend.read().unwrap();
let body = match backend_lck.operation(envelope.hash()).and_then(|op| envelope.body(op)) { let body = match backend_lck
.operation(envelope.hash())
.and_then(|op| envelope.body(op))
{
Ok(body) => body.text(), Ok(body) => body.text(),
Err(err) => { Err(err) => {
debug!( debug!(
@ -258,35 +265,14 @@ pub fn remove(env_hash: EnvelopeHash) -> Result<()> {
Ok(()) Ok(())
} }
pub fn index(context: &mut crate::state::Context, account_name: &str) -> Result<()> { pub fn index(context: &mut crate::state::Context, account_index: usize) -> ResultFuture<()> {
let account = if let Some(a) = context let account = &context.accounts[account_index];
.accounts let (acc_name, acc_mutex, backend_mutex): (String, Arc<RwLock<_>>, Arc<_>) = (
.iter() account.name().to_string(),
.find(|acc| acc.name() == account_name) account.collection.envelopes.clone(),
{ account.backend.clone(),
a );
} else {
return Err(MeliError::new(format!(
"Account {} was not found.",
account_name
)));
};
let (acc_name, acc_mutex, backend_mutex): (String, Arc<RwLock<_>>, Arc<_>) =
if *account.settings.conf.cache_type() != crate::conf::CacheType::Sqlite3 {
return Err(MeliError::new(format!(
"Account {} doesn't have an sqlite3 search backend.",
account_name
)));
} else {
(
account.name().to_string(),
account.collection.envelopes.clone(),
account.backend.clone(),
)
};
let conn = melib_sqlite3::open_or_create_db(DB_NAME, Some(INIT_SCRIPT))?; let conn = melib_sqlite3::open_or_create_db(DB_NAME, Some(INIT_SCRIPT))?;
let work_context = context.work_controller().get_context();
let env_hashes = acc_mutex let env_hashes = acc_mutex
.read() .read()
.unwrap() .unwrap()
@ -295,106 +281,66 @@ pub fn index(context: &mut crate::state::Context, account_name: &str) -> Result<
.collect::<Vec<_>>(); .collect::<Vec<_>>();
/* Sleep, index and repeat in order not to block the main process */ /* Sleep, index and repeat in order not to block the main process */
let handle = std::thread::Builder::new().name(String::from("rebuilding index")).spawn(move || { Ok(Box::pin(async move {
let thread_id = std::thread::current().id(); conn.execute(
"INSERT OR REPLACE INTO accounts (name) VALUES (?1)",
let sleep_dur = std::time::Duration::from_millis(20); params![acc_name.as_str(),],
if let Err(err) = conn.execute( )
"INSERT OR REPLACE INTO accounts (name) VALUES (?1)", params![acc_name.as_str(),],).map_err(|e| MeliError::new(e.to_string())) { .chain_err_summary(|| "Failed to update index:")?;
debug!("{}", let account_id: i32 = {
format!( let mut stmt = conn
"Failed to update index: {}", .prepare("SELECT id FROM accounts WHERE name = ?")
err.to_string()
));
log(
format!(
"Failed to update index: {}",
err.to_string()
),
ERROR,
);
}
let account_id: i32 = {
let mut stmt = conn.prepare("SELECT id FROM accounts WHERE name = ?").unwrap();
let x = stmt.query_map(params![acc_name.as_str()], |row| row.get(0)).unwrap().next().unwrap().unwrap();
x
};
let mut ctr = 0;
debug!("{}", format!("Rebuilding {} index. {}/{}", acc_name, ctr, env_hashes.len()));
work_context
.set_status
.send((thread_id, format!("Rebuilding {} index. {}/{}", acc_name, ctr, env_hashes.len())))
.unwrap(); .unwrap();
for chunk in env_hashes.chunks(200) { let x = stmt
ctr += chunk.len(); .query_map(params![acc_name.as_str()], |row| row.get(0))
let envelopes_lck = acc_mutex.read().unwrap(); .unwrap()
let backend_lck = backend_mutex.read().unwrap(); .next()
for env_hash in chunk { .unwrap()
if let Some(e) = envelopes_lck.get(&env_hash) { .unwrap();
let body = match backend_lck.operation(e.hash()).and_then(|op| e.body(op)) { x
Ok(body) => body.text(), };
Err(err) => { let mut ctr = 0;
debug!("{}", debug!(
format!( "{}",
"Failed to open envelope {}: {}", format!(
e.message_id_display(), "Rebuilding {} index. {}/{}",
err.to_string() acc_name,
)); ctr,
log( env_hashes.len()
format!( )
"Failed to open envelope {}: {}", );
e.message_id_display(), for chunk in env_hashes.chunks(200) {
err.to_string() ctr += chunk.len();
), let envelopes_lck = acc_mutex.read().unwrap();
ERROR, let backend_lck = backend_mutex.read().unwrap();
); for env_hash in chunk {
return; if let Some(e) = envelopes_lck.get(&env_hash) {
} let body = backend_lck
}; .operation(e.hash())
if let Err(err) = conn.execute( .and_then(|op| e.body(op))
"INSERT OR REPLACE INTO envelopes (account_id, hash, date, _from, _to, cc, bcc, subject, message_id, in_reply_to, _references, flags, has_attachments, body_text, timestamp) .chain_err_summary(|| {
format!("Failed to open envelope {}", e.message_id_display(),)
})?
.text()
.replace('\0', "");
conn.execute("INSERT OR REPLACE INTO envelopes (account_id, hash, date, _from, _to, cc, bcc, subject, message_id, in_reply_to, _references, flags, has_attachments, body_text, timestamp)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)", VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)",
params![account_id, e.hash().to_be_bytes().to_vec(), e.date_as_str(), e.field_from_to_string(), e.field_to_to_string(), e.field_cc_to_string(), e.field_bcc_to_string(), e.subject().into_owned().trim_end_matches('\u{0}'), e.message_id_display().to_string(), e.in_reply_to_display().map(|f| f.to_string()).unwrap_or(String::new()), e.field_references_to_string(), i64::from(e.flags().bits()), if e.has_attachments() { 1 } else { 0 }, body, e.date().to_be_bytes().to_vec()], params![account_id, e.hash().to_be_bytes().to_vec(), e.date_as_str(), e.field_from_to_string(), e.field_to_to_string(), e.field_cc_to_string(), e.field_bcc_to_string(), e.subject().into_owned().trim_end_matches('\u{0}'), e.message_id_display().to_string(), e.in_reply_to_display().map(|f| f.to_string()).unwrap_or(String::new()), e.field_references_to_string(), i64::from(e.flags().bits()), if e.has_attachments() { 1 } else { 0 }, body, e.date().to_be_bytes().to_vec()],
) ).chain_err_summary(|| format!( "Failed to insert envelope {}", e.message_id_display()))?;
.map_err(|e| MeliError::new(e.to_string())) {
debug!("{}",
format!(
"Failed to insert envelope {}: {}",
e.message_id_display(),
err.to_string()
));
log(
format!(
"Failed to insert envelope {}: {}",
e.message_id_display(),
err.to_string()
),
ERROR,
);
}
}
} }
drop(envelopes_lck); }
work_context drop(envelopes_lck);
.set_status let sleep_dur = std::time::Duration::from_millis(20);
.send((thread_id, format!("Rebuilding {} index. {}/{}", acc_name, ctr, env_hashes.len()))) std::thread::sleep(sleep_dur);
.unwrap();
std::thread::sleep(sleep_dur);
} }
work_context.finished.send(thread_id).unwrap(); Ok(())
})?; }))
context.work_controller().static_threads.lock()?.insert(
handle.thread().id(),
String::from("Rebuilding sqlite3 index").into(),
);
Ok(())
} }
pub fn search( pub fn search(
term: &str, term: &str,
(sort_field, sort_order): (SortField, SortOrder), (sort_field, sort_order): (SortField, SortOrder),
) -> Result<SmallVec<[EnvelopeHash; 512]>> { ) -> ResultFuture<SmallVec<[EnvelopeHash; 512]>> {
let db_path = melib_sqlite3::db_path(DB_NAME)?; let db_path = melib_sqlite3::db_path(DB_NAME)?;
if !db_path.exists() { if !db_path.exists() {
return Err(MeliError::new( return Err(MeliError::new(
@ -438,7 +384,7 @@ pub fn search(
)) ))
}) })
.collect::<Result<SmallVec<[EnvelopeHash; 512]>>>(); .collect::<Result<SmallVec<[EnvelopeHash; 512]>>>();
results Ok(Box::pin(async { results }))
} }
/// Translates a `Query` to an Sqlite3 expression in a `String`. /// Translates a `Query` to an Sqlite3 expression in a `String`.

View File

@ -97,6 +97,7 @@ pub struct Context {
receiver: Receiver<ThreadEvent>, receiver: Receiver<ThreadEvent>,
input: InputHandler, input: InputHandler,
work_controller: WorkController, work_controller: WorkController,
job_executor: Arc<JobExecutor>,
pub children: Vec<std::process::Child>, pub children: Vec<std::process::Child>,
pub plugin_manager: PluginManager, pub plugin_manager: PluginManager,
@ -329,6 +330,7 @@ impl State {
replies: VecDeque::with_capacity(5), replies: VecDeque::with_capacity(5),
temp_files: Vec::new(), temp_files: Vec::new(),
work_controller, work_controller,
job_executor,
children: vec![], children: vec![],
plugin_manager, plugin_manager,
@ -838,10 +840,51 @@ impl State {
)); ));
} }
} }
#[cfg(feature = "sqlite3")]
AccountAction(ref account_name, ReIndex) => { AccountAction(ref account_name, ReIndex) => {
#[cfg(feature = "sqlite3")] let account_index = if let Some(a) = self
match crate::sqlite3::index(&mut self.context, account_name) { .context
Ok(()) => { .accounts
.iter()
.position(|acc| acc.name() == account_name)
{
a
} else {
self.context.replies.push_back(UIEvent::Notification(
None,
format!("Account {} was not found.", account_name),
Some(NotificationType::ERROR),
));
return;
};
if *self.context.accounts[account_index]
.settings
.conf
.search_backend()
!= crate::conf::SearchBackend::Sqlite3
{
self.context.replies.push_back(UIEvent::Notification(
None,
format!(
"Account {} doesn't have an sqlite3 search backend.",
account_name
),
Some(NotificationType::ERROR),
));
return;
}
match crate::sqlite3::index(&mut self.context, account_index) {
Ok(job) => {
let (channel, handle, job_id) =
self.context.job_executor.spawn_blocking(job);
self.context.accounts[account_index].active_jobs.insert(
job_id,
crate::conf::accounts::JobRequest::Generic {
name: "Message index rebuild".into(),
handle,
channel,
},
);
self.context.replies.push_back(UIEvent::Notification( self.context.replies.push_back(UIEvent::Notification(
None, None,
"Message index rebuild started.".to_string(), "Message index rebuild started.".to_string(),
@ -856,15 +899,15 @@ impl State {
)); ));
} }
} }
#[cfg(not(feature = "sqlite3"))] }
{ #[cfg(not(feature = "sqlite3"))]
self.context.replies.push_back(UIEvent::Notification( AccountAction(ref account_name, ReIndex) => {
None, self.context.replies.push_back(UIEvent::Notification(
"Message index rebuild failed: meli is not built with sqlite3 support." None,
.to_string(), "Message index rebuild failed: meli is not built with sqlite3 support."
Some(NotificationType::ERROR), .to_string(),
)); Some(NotificationType::ERROR),
} ));
} }
v => { v => {
self.rcv_event(UIEvent::Action(v)); self.rcv_event(UIEvent::Action(v));