Browse Source

melib: remove ThreadTree, use ThreadNodes for root_set

Remove ThreadTree index in Threads {} struct. Keep a Vec<ThreadHash> for
root_set state of mailbox instead of rebuilding ThreadTrees every
time.
tags/pre-alpha-0.3.0
Manos Pitsidianakis 2 years ago
parent
commit
3f7d962abd
Signed by untrusted user: epilys GPG Key ID: 73627C2F690DF710
  1. 55
      melib/src/async_workers.rs
  2. 80
      melib/src/collection.rs
  3. 13
      melib/src/mailbox.rs
  4. 956
      melib/src/thread.rs
  5. 2
      ui/src/components/mail/listing/compact.rs
  6. 181
      ui/src/conf/accounts.rs
  7. 2
      ui/src/execute.rs
  8. 2
      ui/src/execute/actions.rs
  9. 63
      ui/src/workers.rs

55
melib/src/async_workers.rs

@ -72,21 +72,36 @@ impl<T> fmt::Debug for AsyncStatus<T> {
}
/// A builder object for `Async<T>`
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct AsyncBuilder<T: Send + Sync> {
payload_hook: Option<Arc<Fn() -> () + Send + Sync>>,
tx: chan::Sender<AsyncStatus<T>>,
rx: chan::Receiver<AsyncStatus<T>>,
}
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct Async<T: Send + Sync> {
value: Option<T>,
pub value: Option<T>,
work: Work,
active: bool,
payload_hook: Option<Arc<dyn Fn() -> () + Send + Sync>>,
link: Option<T>,
tx: chan::Sender<AsyncStatus<T>>,
rx: chan::Receiver<AsyncStatus<T>>,
}
impl<T: Send + Sync> std::fmt::Debug for Async<T> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(
f,
"Async<{}> {{ active: {}, payload_hook: {} }}",
stringify!(T),
self.active,
self.payload_hook.is_some()
)
}
}
impl<T: Send + Sync> Default for AsyncBuilder<T> {
fn default() -> Self {
AsyncBuilder::<T>::new()
@ -102,6 +117,7 @@ where
AsyncBuilder {
tx: sender,
rx: receiver,
payload_hook: None,
}
}
/// Returns the sender object of the promise's channel.
@ -119,9 +135,19 @@ where
value: None,
tx: self.tx,
rx: self.rx,
link: None,
payload_hook: None,
active: false,
}
}
pub fn add_payload_hook(
&mut self,
payload_hook: Option<Arc<dyn Fn() -> () + Send + Sync>>,
) -> &mut Self {
self.payload_hook = payload_hook;
self
}
}
impl<T> Async<T>
@ -144,6 +170,10 @@ where
pub fn tx(&mut self) -> chan::Sender<AsyncStatus<T>> {
self.tx.clone()
}
/// Returns the receiver object of the promise's channel.
pub fn rx(&mut self) -> chan::Receiver<AsyncStatus<T>> {
self.rx.clone()
}
/// Polls worker thread and returns result.
pub fn poll(&mut self) -> Result<AsyncStatus<T>, ()> {
if self.value.is_some() {
@ -171,6 +201,10 @@ where
},
};
self.value = Some(result);
if let Some(hook) = self.payload_hook.as_ref() {
hook();
}
Ok(AsyncStatus::Finished)
}
/// Blocks until thread joins.
@ -193,4 +227,19 @@ where
}
self.value = Some(result);
}
pub fn link(&mut self, other: Async<T>) -> &mut Self {
let Async {
rx,
tx,
work,
value,
..
} = other;
self.rx = rx;
self.tx = tx;
self.work = work;
self.value = value;
self
}
}

80
melib/src/collection.rs

@ -71,12 +71,12 @@ impl Collection {
self.threads
.entry(folder_hash)
.or_default()
.remove(envelope_hash, &mut self.envelopes);
.remove(envelope_hash);
for (h, t) in self.threads.iter_mut() {
if *h == folder_hash {
continue;
}
t.remove(envelope_hash, &mut self.envelopes);
t.remove(envelope_hash);
}
}
@ -99,7 +99,7 @@ impl Collection {
.threads
.entry(folder_hash)
.or_default()
.update_envelope(old_hash, new_hash, &self.envelopes)
.update_envelope(old_hash, new_hash)
.is_ok()
{
return;
@ -114,9 +114,7 @@ impl Collection {
if *h == folder_hash {
continue;
}
t.update_envelope(old_hash, new_hash, &self.envelopes)
.ok()
.take();
t.update_envelope(old_hash, new_hash).ok().take();
}
}
@ -124,13 +122,13 @@ impl Collection {
/// Returns a list of already existing folders whose threads were updated
pub fn merge(
&mut self,
mut envelopes: FnvHashMap<EnvelopeHash, Envelope>,
mut new_envelopes: FnvHashMap<EnvelopeHash, Envelope>,
folder_hash: FolderHash,
mailbox: &mut Mailbox,
sent_folder: Option<FolderHash>,
) -> Option<StackVec<FolderHash>> {
self.sent_folder = sent_folder;
envelopes.retain(|&h, e| {
new_envelopes.retain(|&h, e| {
if self.message_ids.contains_key(e.message_id().raw()) {
/* skip duplicates until a better way to handle them is found. */
//FIXME
@ -141,11 +139,7 @@ impl Collection {
true
}
});
let mut new_threads = Threads::new(&mut envelopes);
for (h, e) in envelopes {
self.envelopes.insert(h, e);
}
let &mut Collection {
ref mut threads,
ref mut envelopes,
@ -153,10 +147,36 @@ impl Collection {
..
} = self;
if !threads.contains_key(&folder_hash) {
threads.insert(folder_hash, Threads::new(&mut new_envelopes));
for (h, e) in new_envelopes {
envelopes.insert(h, e);
}
} else {
threads.entry(folder_hash).and_modify(|t| {
let mut ordered_hash_set =
new_envelopes.keys().cloned().collect::<Vec<EnvelopeHash>>();
ordered_hash_set.sort_by(|a, b| {
new_envelopes[a]
.date()
.partial_cmp(&new_envelopes[b].date())
.unwrap()
});
for h in ordered_hash_set {
envelopes.insert(h, new_envelopes.remove(&h).unwrap());
t.insert(envelopes, h);
}
});
}
let mut ret = StackVec::new();
for (t_fh, t) in threads.iter_mut() {
let keys = threads.keys().cloned().collect::<Vec<FolderHash>>();
for t_fh in keys {
if t_fh == folder_hash {
continue;
}
if sent_folder.map(|f| f == folder_hash).unwrap_or(false) {
let mut ordered_hash_set = new_threads
let mut ordered_hash_set = threads[&folder_hash]
.hash_set
.iter()
.cloned()
@ -169,28 +189,37 @@ impl Collection {
});
let mut updated = false;
for h in ordered_hash_set {
updated |= t.insert_reply(envelopes, h);
updated |= threads.entry(t_fh).or_default().insert_reply(envelopes, h);
}
if updated {
ret.push(*t_fh);
ret.push(t_fh);
}
continue;
}
if sent_folder.map(|f| f == *t_fh).unwrap_or(false) {
let mut ordered_hash_set =
t.hash_set.iter().cloned().collect::<Vec<EnvelopeHash>>();
if sent_folder.map(|f| f == t_fh).unwrap_or(false) {
let mut ordered_hash_set = threads[&t_fh]
.hash_set
.iter()
.cloned()
.collect::<Vec<EnvelopeHash>>();
ordered_hash_set.sort_by(|a, b| {
envelopes[a]
.date()
.partial_cmp(&envelopes[b].date())
.unwrap()
});
let mut updated = false;
for h in ordered_hash_set {
new_threads.insert_reply(envelopes, h);
updated |= threads
.entry(folder_hash)
.or_default()
.insert_reply(envelopes, h);
}
if updated {
ret.push(folder_hash);
}
}
}
threads.insert(folder_hash, new_threads);
if ret.is_empty() {
None
} else {
@ -206,8 +235,7 @@ impl Collection {
self.envelopes.insert(new_hash, envelope);
if self.sent_folder.map(|f| f == folder_hash).unwrap_or(false) {
for (_, t) in self.threads.iter_mut() {
t.update_envelope(old_hash, new_hash, &self.envelopes)
.unwrap_or(());
t.update_envelope(old_hash, new_hash).unwrap_or(());
}
}
{
@ -215,7 +243,7 @@ impl Collection {
.threads
.entry(folder_hash)
.or_default()
.update_envelope(old_hash, new_hash, &self.envelopes)
.update_envelope(old_hash, new_hash)
.is_ok()
{
return;
@ -230,9 +258,7 @@ impl Collection {
if *h == folder_hash {
continue;
}
t.update_envelope(old_hash, new_hash, &self.envelopes)
.ok()
.take();
t.update_envelope(old_hash, new_hash).ok().take();
}
}

13
melib/src/mailbox.rs

@ -26,14 +26,10 @@
*/
use crate::backends::Folder;
pub use crate::email::*;
use crate::thread::ThreadHash;
pub use crate::thread::{SortField, SortOrder, ThreadNode, Threads};
pub use crate::collection::*;
pub use crate::email::*;
use fnv::{FnvHashMap, FnvHashSet};
/// `Mailbox` represents a folder of mail.
#[derive(Debug, Deserialize, Serialize, Clone, Default)]
pub struct Mailbox {
@ -41,7 +37,6 @@ pub struct Mailbox {
pub folder: Folder,
name: String,
pub envelopes: FnvHashSet<EnvelopeHash>,
pub thread_root_set: FnvHashSet<ThreadHash>,
has_sent: bool,
}
@ -57,6 +52,10 @@ impl Mailbox {
}
}
pub fn merge(&mut self, envelopes: &FnvHashMap<EnvelopeHash, Envelope>) {
self.envelopes.extend(envelopes.keys().cloned());
}
pub fn name(&self) -> &str {
&self.name
}

956
melib/src/thread.rs
File diff suppressed because it is too large
View File

2
ui/src/components/mail/listing/compact.rs

@ -301,7 +301,7 @@ impl ListingTrait for CompactListing {
grid,
(
pos_inc(upper_left, (0, r)),
(flag_x - 1, get_y(upper_left) + r),
(flag_x.saturating_sub(1), get_y(upper_left) + r),
),
fg_color,
bg_color,

181
ui/src/conf/accounts.rs

@ -32,7 +32,7 @@ use melib::backends::{
};
use melib::error::{MeliError, Result};
use melib::mailbox::*;
use melib::thread::ThreadHash;
use melib::thread::{ThreadHash, ThreadNode, Threads};
use melib::AddressBook;
use melib::StackVec;
@ -45,7 +45,7 @@ use std::ops::{Index, IndexMut};
use std::result;
use std::sync::Arc;
pub type Worker = Option<Async<Result<(FnvHashMap<EnvelopeHash, Envelope>, Mailbox)>>>;
pub type Worker = Option<Async<Result<Vec<Envelope>>>>;
macro_rules! mailbox {
($idx:expr, $folders:expr) => {
@ -58,9 +58,13 @@ pub enum MailboxEntry {
Available(Mailbox),
Failed(MeliError),
/// first argument is done work, and second is total work
Parsing(usize, usize),
/// first argument is done work, and second is total work
Threading(usize, usize),
Parsing(Mailbox, usize, usize),
}
impl Default for MailboxEntry {
fn default() -> Self {
MailboxEntry::Parsing(Mailbox::default(), 0, 0)
}
}
impl std::fmt::Display for MailboxEntry {
@ -71,12 +75,9 @@ impl std::fmt::Display for MailboxEntry {
match self {
MailboxEntry::Available(ref m) => m.name().to_string(),
MailboxEntry::Failed(ref e) => e.to_string(),
MailboxEntry::Parsing(done, total) => {
MailboxEntry::Parsing(_, done, total) => {
format!("Parsing messages. [{}/{}]", done, total)
}
MailboxEntry::Threading(done, total) => {
format!("Calculating threads. [{}/{}]", done, total)
}
}
)
}
@ -90,7 +91,7 @@ impl MailboxEntry {
}
}
pub fn is_parsing(&self) -> bool {
if let MailboxEntry::Parsing(_, _) = self {
if let MailboxEntry::Parsing(_, _, _) = self {
true
} else {
false
@ -99,12 +100,14 @@ impl MailboxEntry {
pub fn unwrap_mut(&mut self) -> &mut Mailbox {
match self {
MailboxEntry::Available(ref mut m) => m,
MailboxEntry::Parsing(ref mut m, _, _) => m,
e => panic!(format!("mailbox is not available! {:#}", e)),
}
}
pub fn unwrap(&self) -> &Mailbox {
match self {
MailboxEntry::Available(ref m) => m,
MailboxEntry::Parsing(ref m, _, _) => m,
e => panic!(format!("mailbox is not available! {:#}", e)),
}
}
@ -247,7 +250,10 @@ impl Account {
}
}
}
folders.insert(*h, MailboxEntry::Parsing(0, 0));
folders.insert(
*h,
MailboxEntry::Parsing(Mailbox::new(f.clone(), &FnvHashMap::default()), 0, 0),
);
workers.insert(
*h,
Account::new_worker(f.clone(), &mut backend, notify_fn.clone()),
@ -309,29 +315,45 @@ impl Account {
) -> Worker {
let mailbox_handle = backend.get(&folder);
let mut builder = AsyncBuilder::new();
let tx = builder.tx();
Some(builder.build(Box::new(move || {
let mut handle = mailbox_handle.clone();
let folder = folder.clone();
let work = handle.work().unwrap();
work.compute();
handle.join();
let envelopes: Result<FnvHashMap<EnvelopeHash, Envelope>> = handle.extract().map(|v| {
v.into_iter()
.map(|e| (e.hash(), e))
.collect::<FnvHashMap<EnvelopeHash, Envelope>>()
});
let hash = folder.hash();
if envelopes.is_err() {
tx.send(AsyncStatus::Payload(Err(envelopes.unwrap_err())));
notify_fn.notify(hash);
return;
let our_tx = builder.tx();
let folder_hash = folder.hash();
let w = builder.build(Box::new(move || {
let mut mailbox_handle = mailbox_handle.clone();
let work = mailbox_handle.work().unwrap();
let rx = mailbox_handle.rx();
let tx = mailbox_handle.tx();
std::thread::Builder::new()
.spawn(move || {
work.compute();
})
.unwrap();
loop {
debug!("looping");
chan_select! {
rx.recv() -> r => {
debug!("got {:?}", r);
match r {
Some(s @ AsyncStatus::Payload(_)) => {
our_tx.send(s);
debug!("notifying for {}", folder_hash);
notify_fn.notify(folder_hash);
}
Some(AsyncStatus::Finished) => {
debug!("exiting");
return;
}
Some(s) => {
our_tx.send(s);
}
None => return,
}
}
}
}
let envelopes = envelopes.unwrap();
let m = Mailbox::new(folder, &envelopes);
tx.send(AsyncStatus::Payload(Ok((envelopes, m))));
notify_fn.notify(hash);
})))
}));
Some(w)
}
pub fn reload(&mut self, event: RefreshEvent, folder_hash: FolderHash) -> Option<UIEvent> {
if !self.folders[&folder_hash].is_available() {
@ -460,27 +482,32 @@ impl Account {
&mut self.workers
}
fn load_mailbox(
&mut self,
folder_hash: FolderHash,
payload: (Result<(FnvHashMap<EnvelopeHash, Envelope>, Mailbox)>),
) {
fn load_mailbox(&mut self, folder_hash: FolderHash, payload: Result<Vec<Envelope>>) {
if payload.is_err() {
self.folders
.insert(folder_hash, MailboxEntry::Failed(payload.unwrap_err()));
return;
}
let (envelopes, mut mailbox) = payload.unwrap();
if let Some(updated_folders) =
self.collection
.merge(envelopes, folder_hash, &mut mailbox, self.sent_folder)
{
for f in updated_folders {
self.notify_fn.notify(f);
let envelopes = payload
.unwrap()
.into_iter()
.map(|e| (e.hash(), e))
.collect::<FnvHashMap<EnvelopeHash, Envelope>>();
match self.folders.entry(folder_hash).or_default() {
MailboxEntry::Failed(_) => {}
MailboxEntry::Parsing(ref mut m, _, _) | MailboxEntry::Available(ref mut m) => {
m.merge(&envelopes);
if let Some(updated_folders) =
self.collection
.merge(envelopes, folder_hash, m, self.sent_folder)
{
for f in updated_folders {
self.notify_fn.notify(f);
}
}
}
}
self.folders
.insert(folder_hash, MailboxEntry::Available(mailbox));
self.notify_fn.notify(folder_hash);
}
pub fn status(&mut self, folder_hash: FolderHash) -> result::Result<(), usize> {
@ -488,31 +515,50 @@ impl Account {
None => {
return Ok(());
}
Some(ref mut w) if self.folders[&folder_hash].is_parsing() => match w.poll() {
Some(ref mut w) => match w.poll() {
Ok(AsyncStatus::NoUpdate) => {
return Err(0);
//return Err(0);
}
Ok(AsyncStatus::Payload(envs)) => {
debug!("got payload in status for {}", folder_hash);
self.load_mailbox(folder_hash, envs);
}
Ok(AsyncStatus::Finished) if w.value.is_none() => {
debug!("got finished in status for {}", folder_hash);
self.folders.entry(folder_hash).and_modify(|f| {
let m = if let MailboxEntry::Parsing(m, _, _) = f {
std::mem::replace(m, Mailbox::default())
} else {
return;
};
*f = MailboxEntry::Available(m);
});
self.workers.insert(folder_hash, None);
}
Ok(AsyncStatus::Finished) if w.value.is_some() => {
let envs = w.value.take().unwrap();
debug!("got payload in status for {}", folder_hash);
self.load_mailbox(folder_hash, envs);
}
Ok(AsyncStatus::Finished) => {}
Ok(AsyncStatus::ProgressReport(n)) => {
self.folders.entry(folder_hash).and_modify(|f| {
if let MailboxEntry::Parsing(ref mut d, _) = f {
if let MailboxEntry::Parsing(_, ref mut d, _) = f {
*d += n;
}
});
return Err(n);
//return Err(n);
}
_ => {
return Err(0);
//return Err(0);
}
},
Some(_) => return Ok(()),
};
let m = mem::replace(self.workers.get_mut(&folder_hash).unwrap(), None)
.unwrap()
.extract();
self.workers.insert(folder_hash, None);
self.load_mailbox(folder_hash, m);
if self.folders[&folder_hash].is_available() {
if self.folders[&folder_hash].is_available()
|| (self.folders[&folder_hash].is_parsing()
&& self.collection.threads.contains_key(&folder_hash))
{
Ok(())
} else {
Err(0)
@ -558,15 +604,18 @@ impl Account {
}
pub fn operation(&self, h: EnvelopeHash) -> Box<BackendOp> {
for mailbox in self.folders.values() {
if let MailboxEntry::Available(ref m) = mailbox {
if m.envelopes.contains(&h) {
let operation = self.backend.operation(h, m.folder.hash());
if self.settings.account.read_only() {
return ReadOnlyOp::new(operation);
} else {
return operation;
match mailbox {
MailboxEntry::Available(ref m) | MailboxEntry::Parsing(ref m, _, _) => {
if m.envelopes.contains(&h) {
let operation = self.backend.operation(h, m.folder.hash());
if self.settings.account.read_only() {
return ReadOnlyOp::new(operation);
} else {
return operation;
}
}
}
_ => {}
}
}
debug!("didn't find {}", h);

2
ui/src/execute.rs

@ -22,7 +22,7 @@
/*! A parser module for user commands passed through the Ex mode.
*/
use melib::backends::FolderOperation;
pub use melib::mailbox::{SortField, SortOrder};
pub use melib::thread::{SortField, SortOrder};
use nom::{digit, not_line_ending};
use std;
pub mod actions;

2
ui/src/execute/actions.rs

@ -25,8 +25,8 @@
use crate::components::Component;
use melib::backends::FolderOperation;
pub use melib::mailbox::{SortField, SortOrder};
use melib::thread::ThreadHash;
pub use melib::thread::{SortField, SortOrder};
use melib::{Draft, EnvelopeHash};
extern crate uuid;

63
ui/src/workers.rs

@ -19,19 +19,19 @@ impl WorkController {
}
}
/*
impl Drop for WorkController {
fn drop(&mut self) {
for _ in 0..self.threads.len() {
self.thread_end_tx.send(true);
}
/*
let threads = mem::replace(&mut self.threads, Vec::new());
for handle in threads {
handle.join().unwrap();
}
*/
}
}
*/
// We need a way to keep track of what work needs to be done.
// This is a multi-source, multi-consumer queue which we call a
@ -194,16 +194,20 @@ impl WorkController {
let mut work_done = 0;
'work_loop: loop {
debug!("Waiting for work");
// Loop while there's expected to be work, looking for work.
chan_select! {
thread_end_rx.recv() -> _ => {
debug!("received thread_end_rx, quitting");
break 'work_loop;
},
new_jobs_rx.recv() -> _ => {
// If work is available, do that work.
while let Some(work) = thread_queue.get_work() {
debug!("Got some work");
// Do some work.
work.compute();
debug!("finished work");
// Record that some work was done.
work_done += 1;
@ -243,58 +247,3 @@ impl WorkController {
}
}
}
/*
pub fn add_jobkk
println!("Adding jobs to the queue.");
// Variables to keep track of the number of jobs we expect to do.
let mut jobs_remaining = 0;
let mut jobs_total = 0;
// Just add some numbers to the queue.
// These numbers will be passed into fib(), so they need to stay pretty
// small.
for work in 0..90 {
// Add each one several times.
for _ in 0..100 {
jobs_remaining = queue.add_work(work);
jobs_total += 1;
}
}
// Report that some jobs were inserted, and how many are left to be done.
// This is interesting because the workers have been taking jobs out of the queue
// the whole time the control thread has been putting them in!
//
// Try removing the use of std::thread::yield_now() in the thread closure.
// You'll probably (depending on your system) notice that the number remaining
// after insertion goes way up. That's because the operating system is usually
// (not always, but usually) fairly conservative about interrupting a thread
// that is actually doing work.
//
// Similarly, if you add a call to yield_now() in the loop above, you'll see the
// number remaining probably drop to 1 or 2. This can also change depending on
// how optimized the output code is - try `cargo run --release` vs `cargo run`.
//
// This inconsistency should drive home to you that you as the programmer can't
// make any assumptions at all about when and in what order things will happen
// in parallel code unless you use thread control primatives as demonstrated
// in this program.
println!("Total of {} jobs inserted into the queue ({} remaining at this time).",
jobs_total,
jobs_remaining);
// Get completed work from the channel while there's work to be done.
while jobs_total > 0 {
match results_rx.recv() {
// If the control thread successfully receives, a job was completed.
Ok(_) => { jobs_total -= 1 },
// If the control thread is the one left standing, that's pretty
// problematic.
Err(_) => {panic!("All workers died unexpectedly.");}
}
}
*/
Loading…
Cancel
Save