embed
Manos Pitsidianakis 2018-10-14 19:49:16 +03:00
parent 21a918e4c0
commit 5a28320004
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
22 changed files with 1440 additions and 428 deletions

View File

@ -32,61 +32,94 @@
*/
use chan;
use std::thread;
use std::fmt;
use std::sync::Arc;
#[derive(Clone)]
pub struct Work(pub Arc<Box<dyn Fn() -> ()>>);
impl Work {
pub fn compute(&self) {
(self.0)();
}
}
impl fmt::Debug for Work {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Work object")
}
}
unsafe impl Send for Work {}
unsafe impl Sync for Work {}
/// Messages to pass between `Async<T>` owner and its worker thread.
#[derive(Debug)]
pub enum AsyncStatus {
#[derive(Clone)]
pub enum AsyncStatus<T> {
NoUpdate,
Payload(T),
Finished,
///The number may hold whatever meaning the user chooses.
ProgressReport(usize),
}
/// A builder object for `Async<T>`
#[derive(Debug)]
pub struct AsyncBuilder {
tx: chan::Sender<AsyncStatus>,
rx: chan::Receiver<AsyncStatus>,
}
#[derive(Debug)]
pub struct Async<T> {
value: Option<T>,
worker: Option<thread::JoinHandle<T>>,
tx: chan::Sender<AsyncStatus>,
rx: chan::Receiver<AsyncStatus>,
}
impl Default for AsyncBuilder {
fn default() -> Self {
AsyncBuilder::new()
impl<T> fmt::Debug for AsyncStatus<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
AsyncStatus::NoUpdate => write!(f, "AsyncStatus<T>::NoUpdate"),
AsyncStatus::Payload(_) => write!(f, "AsyncStatus<T>::Payload(_)"),
AsyncStatus::Finished => write!(f, "AsyncStatus<T>::Finished"),
AsyncStatus::ProgressReport(u) => write!(f, "AsyncStatus<T>::ProgressReport({})", u),
}
}
}
impl AsyncBuilder {
/// A builder object for `Async<T>`
#[derive(Debug, Clone)]
pub struct AsyncBuilder<T> {
tx: chan::Sender<AsyncStatus<T>>,
rx: chan::Receiver<AsyncStatus<T>>,
}
#[derive(Debug, Clone)]
pub struct Async<T> {
value: Option<T>,
work: Work,
active: bool,
tx: chan::Sender<AsyncStatus<T>>,
rx: chan::Receiver<AsyncStatus<T>>,
}
impl<T> Default for AsyncBuilder<T> {
fn default() -> Self {
AsyncBuilder::<T>::new()
}
}
impl<T> AsyncBuilder<T> {
pub fn new() -> Self {
let (sender, receiver) = chan::sync(::std::mem::size_of::<AsyncStatus>());
let (sender, receiver) = chan::sync(8 * ::std::mem::size_of::<AsyncStatus<T>>());
AsyncBuilder {
tx: sender,
rx: receiver,
}
}
/// Returns the sender object of the promise's channel.
pub fn tx(&mut self) -> chan::Sender<AsyncStatus> {
pub fn tx(&mut self) -> chan::Sender<AsyncStatus<T>> {
self.tx.clone()
}
/// Returns the receiver object of the promise's channel.
pub fn rx(&mut self) -> chan::Receiver<AsyncStatus> {
pub fn rx(&mut self) -> chan::Receiver<AsyncStatus<T>> {
self.rx.clone()
}
/// Returns an `Async<T>` object that contains a `Thread` join handle that returns a `T`
pub fn build<T: Clone>(self, worker: thread::JoinHandle<T>) -> Async<T> {
pub fn build(self, work: Box<dyn Fn() -> ()>) -> Async<T> {
Async {
worker: Some(worker),
work: Work(Arc::new(work)),
value: None,
tx: self.tx,
rx: self.rx,
active: false,
}
}
}
@ -96,20 +129,34 @@ impl<T> Async<T> {
pub fn extract(self) -> T {
self.value.unwrap()
}
pub fn work(&mut self) -> Option<Work> {
if !self.active {
self.active = true;
Some(self.work.clone())
} else {
None
}
}
/// Returns the sender object of the promise's channel.
pub fn tx(&mut self) -> chan::Sender<AsyncStatus<T>> {
self.tx.clone()
}
/// Polls worker thread and returns result.
pub fn poll(&mut self) -> Result<AsyncStatus, ()> {
pub fn poll(&mut self) -> Result<AsyncStatus<T>, ()> {
if self.value.is_some() {
return Ok(AsyncStatus::Finished);
}
//self.tx.send(true);
let rx = &self.rx;
let result: T;
chan_select! {
default => {
return Ok(AsyncStatus::NoUpdate);
},
rx.recv() -> r => {
match r {
Some(AsyncStatus::Finished) => {
Some(AsyncStatus::Payload(payload)) => {
result = payload;
},
Some(a) => {
return Ok(a);
@ -118,15 +165,29 @@ impl<T> Async<T> {
return Err(());
},
}
},
}
let v = self.worker.take().unwrap().join().unwrap();
self.value = Some(v);
};
self.value = Some(result);
Ok(AsyncStatus::Finished)
}
/// Blocks until thread joins.
pub fn join(mut self) -> T {
self.worker.take().unwrap().join().unwrap()
pub fn join(&mut self) {
let result: T;
let rx = &self.rx;
loop {
chan_select! {
rx.recv() -> r => {
match r {
Some(AsyncStatus::Payload(payload)) => {
result = payload;
break;
},
_ => continue,
}
}
}
}
self.value = Some(result);
}
}

View File

@ -24,7 +24,7 @@ extern crate fnv;
extern crate notify;
extern crate xdg;
use super::{MaildirFolder, MaildirOp, NotifyFn};
use super::{MaildirFolder, MaildirOp};
use async::*;
use conf::AccountSettings;
use error::Result;
@ -56,19 +56,19 @@ use std::sync::{Arc, Mutex};
#[derive(Debug, Default)]
pub struct HashIndex {
index: FnvHashMap<EnvelopeHash, (usize, PathBuf)>,
index: FnvHashMap<EnvelopeHash, PathBuf>,
hash: FolderHash,
}
impl Deref for HashIndex {
type Target = FnvHashMap<EnvelopeHash, (usize, PathBuf)>;
fn deref(&self) -> &FnvHashMap<EnvelopeHash, (usize, PathBuf)> {
type Target = FnvHashMap<EnvelopeHash, PathBuf>;
fn deref(&self) -> &FnvHashMap<EnvelopeHash, PathBuf> {
&self.index
}
}
impl DerefMut for HashIndex {
fn deref_mut(&mut self) -> &mut FnvHashMap<EnvelopeHash, (usize, PathBuf)> {
fn deref_mut(&mut self) -> &mut FnvHashMap<EnvelopeHash, PathBuf> {
&mut self.index
}
}
@ -82,7 +82,6 @@ pub struct MaildirType {
folders: Vec<MaildirFolder>,
//folder_index: FnvHashMap<FolderHash, usize>,
hash_indexes: HashIndexes,
path: PathBuf,
}
@ -93,11 +92,11 @@ macro_rules! path_is_new {
} else {
let mut iter = $path.components().rev();
iter.next();
iter.next();
iter.next() == Some(Component::Normal(OsStr::new("new")))
}
};
}
macro_rules! get_path_hash {
($path:expr) => {{
let mut path = $path.clone();
@ -132,6 +131,7 @@ fn get_file_hash(file: &Path) -> EnvelopeHash {
}
fn move_to_cur(p: PathBuf) -> PathBuf {
eprintln!("moved to cur");
let mut new = p.clone();
{
let file_name = p.file_name().unwrap();
@ -149,8 +149,8 @@ impl MailBackend for MaildirType {
fn folders(&self) -> Vec<Folder> {
self.folders.iter().map(|f| f.clone()).collect()
}
fn get(&mut self, folder: &Folder, notify_fn: Arc<NotifyFn>) -> Async<Result<Vec<Envelope>>> {
self.multicore(4, folder, notify_fn)
fn get(&mut self, folder: &Folder) -> Async<Result<Vec<Envelope>>> {
self.multicore(4, folder)
}
fn watch(&self, sender: RefreshEventConsumer) -> Result<()> {
let (tx, rx) = channel();
@ -190,6 +190,9 @@ impl MailBackend for MaildirType {
Ok(event) => match event {
/* Create */
DebouncedEvent::Create(mut pathbuf) => {
if path_is_new!(pathbuf) {
pathbuf = move_to_cur(pathbuf);
}
let folder_hash = get_path_hash!(pathbuf);
let file_name = pathbuf
.as_path()
@ -203,13 +206,11 @@ impl MailBackend for MaildirType {
&cache_dir,
file_name,
) {
eprintln!("Create event {} {} {}", env.hash(), env.subject(), pathbuf.display());
sender.send(RefreshEvent {
hash: folder_hash,
kind: Create(Box::new(env)),
});
if path_is_new!(pathbuf) {
move_to_cur(pathbuf);
}
} else {
continue;
}
@ -228,9 +229,9 @@ impl MailBackend for MaildirType {
/* Linear search in hash_index to find old hash */
let old_hash: EnvelopeHash = {
if let Some((k, v)) =
index_lock.iter_mut().find(|(_, v)| v.1 == pathbuf)
index_lock.iter_mut().find(|(_, v)| **v == pathbuf)
{
v.1 = pathbuf.clone();
*v = pathbuf.clone();
*k
} else {
/* Did we just miss a Create event? In any case, create
@ -254,7 +255,8 @@ impl MailBackend for MaildirType {
if index_lock.get_mut(&new_hash).is_none() {
let op = Box::new(MaildirOp::new(new_hash, hash_indexes.clone(), folder_hash));
if let Some(env) = Envelope::from_token(op, new_hash) {
index_lock.insert(new_hash, (0, pathbuf.clone()));
eprintln!("{}\t{}", new_hash, pathbuf.display());
index_lock.insert(new_hash, pathbuf);
/* Send Write notice */
@ -271,15 +273,16 @@ impl MailBackend for MaildirType {
DebouncedEvent::NoticeRemove(mut pathbuf)
| DebouncedEvent::Remove(mut pathbuf) => {
let folder_hash = get_path_hash!(pathbuf);
let hash_indexes_lock = hash_indexes.lock().unwrap();
let index_lock = &hash_indexes_lock[&folder_hash];
let mut hash_indexes_lock = hash_indexes.lock().unwrap();
let index_lock = hash_indexes_lock.entry(folder_hash).or_default();
let hash: EnvelopeHash = if let Some((k, _)) =
index_lock.iter().find(|(_, v)| v.1 == pathbuf)
index_lock.iter().find(|(_, v)| **v == pathbuf)
{
*k
} else {
continue;
};
index_lock.remove(&hash);
sender.send(RefreshEvent {
hash: folder_hash,
@ -290,10 +293,18 @@ impl MailBackend for MaildirType {
DebouncedEvent::Rename(mut src, mut dest) => {
let folder_hash = get_path_hash!(src);
let old_hash: EnvelopeHash = get_file_hash(src.as_path());
let new_hash: EnvelopeHash = get_file_hash(dest.as_path());
let mut hash_indexes_lock = hash_indexes.lock().unwrap();
let mut index_lock = hash_indexes_lock.entry(folder_hash).or_default();
if let Some(v) = index_lock.get_mut(&old_hash) {
v.1 = dest;
let index_lock = hash_indexes_lock.entry(folder_hash).or_default();
if index_lock.contains_key(&old_hash) {
sender.send(RefreshEvent {
hash: get_path_hash!(dest),
kind: Rename(old_hash, new_hash),
});
index_lock.remove(&old_hash);
index_lock.insert(new_hash, dest);
} else {
/* Maybe a re-read should be triggered here just to be safe. */
sender.send(RefreshEvent {
@ -391,33 +402,31 @@ impl MaildirType {
.0
}
pub fn multicore(
&mut self,
cores: usize,
folder: &Folder,
notify_fn: Arc<NotifyFn>,
) -> Async<Result<Vec<Envelope>>> {
pub fn multicore(&mut self, cores: usize, folder: &Folder) -> Async<Result<Vec<Envelope>>> {
let mut w = AsyncBuilder::new();
let root_path = self.path.to_path_buf();
let cache_dir = xdg::BaseDirectories::with_profile("meli", &self.name).unwrap();
{
let mut hash_index = self.hash_indexes.lock().unwrap();
let index_lock = hash_index.entry(folder.hash()).or_default();
index_lock.clear();
}
let handle = {
let tx = w.tx();
// TODO: Avoid clone
let folder: &MaildirFolder = &self.folders[self.owned_folder_idx(folder)];
let folder_hash = folder.hash();
let mut path: PathBuf = folder.path().into();
let tx_final = w.tx();
let path: PathBuf = folder.path().into();
let name = format!("parsing {:?}", folder.name());
let root_path = self.path.to_path_buf();
let map = self.hash_indexes.clone();
let map2 = self.hash_indexes.clone();
thread::Builder::new()
.name(name.clone())
.spawn(move || {
let closure = move || {
let name = name.clone();
let root_path = root_path.clone();
let map = map.clone();
let map2 = map.clone();
let tx = tx.clone();
let cache_dir = cache_dir.clone();
let path = path.clone();
let thunk = move || {
let mut path = path.clone();
let cache_dir = cache_dir.clone();
{
path.push("new");
@ -428,11 +437,12 @@ impl MaildirType {
}
path.pop();
}
path.push("cur");
let iter = path.read_dir()?;
let count = path.read_dir()?.count();
let mut files: Vec<PathBuf> = Vec::with_capacity(count);
let mut r = Vec::with_capacity(count);
let mut ret = Vec::with_capacity(count);
for e in iter {
let e = e.and_then(|x| {
let path = x.path();
@ -459,7 +469,7 @@ impl MaildirType {
let size = if len <= 100 { 100 } else { (len / 100) * 100 };
let mut local_r: Vec<
Envelope,
> = Vec::with_capacity(chunk.len());
> = Vec::with_capacity(chunk.len());
for c in chunk.chunks(size) {
//thread::yield_now();
let map = map.clone();
@ -474,26 +484,26 @@ impl MaildirType {
.to_path_buf();
if let Some(cached) =
cache_dir.find_cache_file(&file_name)
{
/* Cached struct exists, try to load it */
let reader = io::BufReader::new(
fs::File::open(&cached).unwrap(),
);
let result: result::Result<Envelope, _> = bincode::deserialize_from(reader);
if let Ok(env) = result {
let mut map = map.lock().unwrap();
let mut map = map.entry(folder_hash).or_default();;
let hash = env.hash();
map.insert(hash, (0, file.clone()));
local_r.push(env);
continue;
}
};
{
/* Cached struct exists, try to load it */
let reader = io::BufReader::new(
fs::File::open(&cached).unwrap(),
);
let result: result::Result<Envelope, _> = bincode::deserialize_from(reader);
if let Ok(env) = result {
let mut map = map.lock().unwrap();
let mut map = map.entry(folder_hash).or_default();;
let hash = env.hash();
map.insert(hash, file.clone());
local_r.push(env);
continue;
}
};
let hash = get_file_hash(file);
{
let mut map = map.lock().unwrap();
let mut map = map.entry(folder_hash).or_default();
(*map).insert(hash, (0, PathBuf::from(file)));
(*map).insert(hash, PathBuf::from(file));
}
let op =
Box::new(MaildirOp::new(hash, map.clone(), folder_hash));
@ -515,7 +525,7 @@ impl MaildirType {
}
local_r.push(e);
} else {
eprintln!("DEBUG: hash {}, path: {} couldn't be parsed in `add_path_to_index`", hash, file.as_path().display());
eprintln!("DEBUG: hash {}, path: {} couldn't be parsed in `add_path_to_index`", hash, file.as_path().display());
continue;
}
}
@ -529,20 +539,16 @@ impl MaildirType {
}
for t in threads {
let mut result = t.join();
r.append(&mut result);
ret.append(&mut result);
}
let mut map = map2.lock().unwrap();
let map = map.entry(folder_hash).or_default();
for (idx, e) in r.iter().enumerate() {
let mut y = (*map)[&e.hash()].clone();
y.0 = idx;
(*map).insert(e.hash(), y);
}
tx.send(AsyncStatus::Finished);
notify_fn.notify();
Ok(r)
})
.unwrap()
Ok(ret)
};
let result = thunk();
tx_final.send(AsyncStatus::Payload(result));
};
Box::new(closure)
};
w.build(handle)
}
@ -558,15 +564,22 @@ fn add_path_to_index(
let env: Envelope;
let hash = get_file_hash(path);
{
let mut hash_index = hash_index.lock().unwrap();
let index_lock = hash_index.entry(folder_hash).or_default();
if index_lock.contains_key(&hash) {
return None;
let mut map = hash_index.lock().unwrap();
let map = map.entry(folder_hash).or_default();;
map.insert(hash, path.to_path_buf());
eprintln!(
"inserted {} in {} map, len={}",
hash,
folder_hash,
map.len()
);
for e in map.iter() {
eprintln!("{:#?}", e);
}
index_lock.insert(hash, (0, path.to_path_buf()));
}
let op = Box::new(MaildirOp::new(hash, hash_index.clone(), folder_hash));
if let Some(e) = Envelope::from_token(op, hash) {
eprintln!("add_path_to_index gen {}\t{}", hash, file_name.display());
if let Ok(cached) = cache_dir.place_cache_file(file_name) {
/* place result in cache directory */
let f = match fs::File::create(cached) {

View File

@ -67,7 +67,14 @@ impl MaildirOp {
fn path(&self) -> PathBuf {
let map = self.hash_index.lock().unwrap();
let map = &map[&self.folder_hash];
map.get(&self.hash).unwrap().1.clone()
eprintln!("looking for {} in {} map", self.hash, self.folder_hash);
if !map.contains_key(&self.hash) {
eprintln!("doesn't contain it though len = {}\n{:#?}", map.len(), map);
for e in map.iter() {
eprintln!("{:#?}", e);
}
}
map.get(&self.hash).unwrap().clone()
}
}
@ -153,7 +160,7 @@ impl<'a> BackendOp for MaildirOp {
let hash_index = self.hash_index.clone();
let mut map = hash_index.lock().unwrap();
let map = map.entry(self.folder_hash).or_default();
map.get_mut(&hash).unwrap().1 = PathBuf::from(new_name);
*map.get_mut(&hash).unwrap() = PathBuf::from(new_name);
Ok(())
}
}

View File

@ -32,7 +32,6 @@ use mailbox::email::{Envelope, EnvelopeHash, Flag};
use std::fmt;
use std::fmt::Debug;
use std::ops::Deref;
use std::sync::Arc;
extern crate fnv;
use self::fnv::FnvHashMap;
@ -84,6 +83,8 @@ impl Backends {
#[derive(Debug)]
pub enum RefreshEventKind {
Update(EnvelopeHash, Box<Envelope>),
/// Rename(old_hash, new_hash)
Rename(EnvelopeHash, EnvelopeHash),
Create(Box<Envelope>),
Remove(FolderHash),
Rescan,
@ -145,7 +146,7 @@ impl NotifyFn {
}
}
pub trait MailBackend: ::std::fmt::Debug {
fn get(&mut self, folder: &Folder, notify_fn: Arc<NotifyFn>) -> Async<Result<Vec<Envelope>>>;
fn get(&mut self, folder: &Folder) -> Async<Result<Vec<Envelope>>>;
fn watch(&self, sender: RefreshEventConsumer) -> Result<()>;
fn folders(&self) -> Vec<Folder>;
fn operation(&self, hash: EnvelopeHash, folder_hash: FolderHash) -> Box<BackendOp>;

View File

@ -10,6 +10,7 @@ use std::result;
extern crate fnv;
use self::fnv::FnvHashMap;
use self::fnv::FnvHashSet;
/// `Mailbox` represents a folder of mail.
#[derive(Debug, Clone, Default)]
@ -53,42 +54,29 @@ impl Collection {
let cache_dir =
xdg::BaseDirectories::with_profile("meli", format!("{}_Thread", folder.hash()))
.unwrap();
let threads = if let Some(cached) = cache_dir.find_cache_file("threads") {
/* Scrap caching for now. When a cached threads file is loaded, we must remove/rehash the
* thread nodes that shouldn't exist anymore (e.g. because their file moved from /new to
* /cur, or it was deleted).
*/
let threads = Threads::new(&mut envelopes);
/*if let Some(cached) = cache_dir.find_cache_file("threads") {
let reader = io::BufReader::new(fs::File::open(cached).unwrap());
let result: result::Result<Threads, _> = bincode::deserialize_from(reader);
let ret = if let Ok(mut cached_t) = result {
cached_t.update(&mut envelopes);
use std::iter::FromIterator;
eprintln!("loaded cache, our hash set is {:?}\n and the cached one is {:?}", FnvHashSet::from_iter(envelopes.keys().cloned()), cached_t.hash_set);
cached_t.amend(&mut envelopes);
cached_t
} else {
Threads::new(&mut envelopes)
};
if let Ok(cached) = cache_dir.place_cache_file("threads") {
/* place result in cache directory */
let f = match fs::File::create(cached) {
Ok(f) => f,
Err(e) => {
panic!("{}", e);
}
};
let writer = io::BufWriter::new(f);
bincode::serialize_into(writer, &ret).unwrap();
}
ret
} else {
let ret = Threads::new(&mut envelopes);
if let Ok(cached) = cache_dir.place_cache_file("threads") {
/* place result in cache directory */
let f = match fs::File::create(cached) {
Ok(f) => f,
Err(e) => {
panic!("{}", e);
}
};
let writer = io::BufWriter::new(f);
bincode::serialize_into(writer, &ret).unwrap();
}
ret
Threads::new(&mut envelopes)
};
*/
Collection {
folder: folder.clone(),
envelopes,
@ -106,17 +94,60 @@ impl Collection {
self.envelopes.is_empty()
}
pub fn remove(&mut self, envelope_hash: EnvelopeHash) {
eprintln!("DEBUG: Removing {}", envelope_hash);
self.envelopes.remove(&envelope_hash);
self.threads.remove(envelope_hash, &mut self.envelopes);
}
pub fn rename(&mut self, old_hash: EnvelopeHash, new_hash: EnvelopeHash) {
let mut env = self.envelopes.remove(&old_hash).unwrap();
env.set_hash(new_hash);
self.envelopes.insert(new_hash, env);
{
if self.threads.update_envelope(old_hash, new_hash).is_ok() {
return;
}
}
/* envelope is not in threads, so insert it */
let env = self.envelopes.entry(new_hash).or_default() as *mut Envelope;
unsafe {
self.threads.insert(&mut (*env), &self.envelopes);
}
}
pub fn update_envelope(&mut self, old_hash: EnvelopeHash, envelope: Envelope) {
self.envelopes.remove(&old_hash);
let new_hash = envelope.hash();
self.envelopes.insert(new_hash, envelope);
{
if self.threads.update_envelope(old_hash, new_hash).is_ok() {
return;
}
}
/* envelope is not in threads, so insert it */
let env = self.envelopes.entry(new_hash).or_default() as *mut Envelope;
unsafe {
self.threads.insert(&mut (*env), &self.envelopes);
}
}
pub fn insert(&mut self, envelope: Envelope) {
let hash = envelope.hash();
eprintln!("DEBUG: Inserting hash {} in {}", hash, self.folder.name());
self.envelopes.insert(hash, envelope);
let env = self.envelopes.entry(hash).or_default() as *mut Envelope;
unsafe {
self.threads.insert(&mut (*env), &self.envelopes);
}
}
pub(crate) fn insert_reply(&mut self, envelope: Envelope) {
self.insert(envelope);
//self.threads.insert_reply(envelope, &mut self.envelopes);
pub(crate) fn insert_reply(&mut self, _envelope: &Envelope) {
return;
/*
//self.insert(envelope);
eprintln!("insert_reply in collections");
self.threads.insert_reply(envelope, &mut self.envelopes);
*/
}
}

View File

@ -160,6 +160,7 @@ impl AttachmentBuilder {
};
self
}
/*
fn decode(&self) -> Vec<u8> {
// TODO merge this and standalone decode() function
let charset = match self.content_type {
@ -186,6 +187,7 @@ impl AttachmentBuilder {
self.raw.to_vec()
}
}
*/
pub fn build(self) -> Attachment {
Attachment {
content_type: self.content_type,

View File

@ -336,6 +336,11 @@ impl Envelope {
flags: Flag::default(),
}
}
pub fn set_hash(&mut self, new_hash: EnvelopeHash) {
self.hash = new_hash;
}
pub fn from_bytes(bytes: &[u8]) -> Result<Envelope> {
let mut h = DefaultHasher::new();
h.write(bytes);
@ -569,22 +574,21 @@ impl Envelope {
_ => Cow::from(String::new()),
}
}
pub fn in_reply_to_bytes<'a>(&'a self) -> &'a [u8] {
match self.in_reply_to {
Some(ref s) => s.raw(),
_ => &[],
pub fn in_reply_to(&self) -> Option<&MessageID> {
self.in_reply_to.as_ref()
}
pub fn in_reply_to_display(&self) -> Option<Cow<str>> {
if let Some(ref m) = self.in_reply_to {
Some(String::from_utf8_lossy(m.val()))
} else {
None
}
}
pub fn in_reply_to(&self) -> Cow<str> {
match self.in_reply_to {
Some(ref s) => String::from_utf8_lossy(s.val()),
_ => Cow::from(String::new()),
}
}
pub fn in_reply_to_raw(&self) -> Cow<str> {
match self.in_reply_to {
Some(ref s) => String::from_utf8_lossy(s.raw()),
_ => Cow::from(String::new()),
pub fn in_reply_to_raw(&self) -> Option<Cow<str>> {
if let Some(ref m) = self.in_reply_to {
Some(String::from_utf8_lossy(m.raw()))
} else {
None
}
}
pub fn message_id(&self) -> &MessageID {

View File

@ -57,7 +57,7 @@ impl Mailbox {
Ok(Mailbox {
folder,
collection,
name: name,
name,
..Default::default()
})
}
@ -96,18 +96,21 @@ impl Mailbox {
&self.collection.threads.thread_nodes()[i]
}
pub fn insert_sent_folder(&mut self, sent: &Mailbox) {
if !self.has_sent {
for envelope in sent.collection.envelopes.values().cloned() {
pub fn insert_sent_folder(&mut self, _sent: &Mailbox) {
/*if !self.has_sent {
for envelope in sent.collection.envelopes.values() {
self.insert_reply(envelope);
}
self.has_sent = true;
}
}*/
}
pub fn rename(&mut self, old_hash: EnvelopeHash, new_hash: EnvelopeHash) {
self.collection.rename(old_hash, new_hash);
}
pub fn update(&mut self, old_hash: EnvelopeHash, envelope: Envelope) {
self.collection.remove(&old_hash);
self.collection.insert(envelope);
self.collection.update_envelope(old_hash, envelope);
}
pub fn insert(&mut self, envelope: Envelope) -> &Envelope {
@ -116,12 +119,13 @@ impl Mailbox {
&self.collection[&hash]
}
fn insert_reply(&mut self, envelope: Envelope) {
pub fn insert_reply(&mut self, envelope: &Envelope) {
eprintln!("mailbox insert reply {}", self.name);
self.collection.insert_reply(envelope);
}
pub fn remove(&mut self, envelope_hash: EnvelopeHash) {
self.collection.remove(&envelope_hash);
self.collection.remove(envelope_hash);
// eprintln!("envelope_hash: {}\ncollection:\n{:?}", envelope_hash, self.collection);
}
}

View File

@ -47,6 +47,8 @@ use std::ops::Index;
use std::result::Result as StdResult;
use std::str::FromStr;
type Envelopes = FnvHashMap<EnvelopeHash, Envelope>;
/* Helper macros to avoid repeating ourselves */
fn rec_change_root_parent(b: &mut Vec<ThreadNode>, idx: usize, new_root: usize) {
@ -221,7 +223,7 @@ impl ThreadTree {
}
}
/* `ThreadIterator` returns messages according to the sorted order. For example, for the following
/* `ThreadsIterator` returns messages according to the sorted order. For example, for the following
* threads:
*
* ```
@ -236,6 +238,53 @@ impl ThreadTree {
* the iterator returns them as `A, B, C, D, E, F`
*/
pub struct ThreadsIterator<'a> {
pos: usize,
stack: Vec<usize>,
tree: Ref<'a, Vec<ThreadTree>>,
}
impl<'a> Iterator for ThreadsIterator<'a> {
type Item = (usize, usize);
fn next(&mut self) -> Option<(usize, usize)> {
{
let mut tree = &(*self.tree);
for i in &self.stack {
tree = &tree[*i].children;
}
if self.pos == tree.len() {
if let Some(p) = self.stack.pop() {
self.pos = p + 1;
} else {
return None;
}
} else {
debug_assert!(self.pos < tree.len());
let ret = (self.stack.len(), tree[self.pos].id);
if !tree[self.pos].children.is_empty() {
self.stack.push(self.pos);
self.pos = 0;
return Some(ret);
}
self.pos += 1;
return Some(ret);
}
}
self.next()
}
}
/* `ThreadIterator` returns messages of a specific thread according to the sorted order. For example, for the following
* thread:
*
* ```
* A_
* |_ B
* |_C
* |_D
* ```
*
* the iterator returns them as `A, B, C, D`
*/
pub struct ThreadIterator<'a> {
init_pos: usize,
pos: usize,
@ -355,7 +404,7 @@ pub struct Threads {
tree: RefCell<Vec<ThreadTree>>,
message_ids: FnvHashMap<Vec<u8>, usize>,
hash_set: FnvHashSet<EnvelopeHash>,
pub hash_set: FnvHashSet<EnvelopeHash>,
sort: RefCell<(SortField, SortOrder)>,
subsort: RefCell<(SortField, SortOrder)>,
}
@ -504,8 +553,7 @@ impl Threads {
}
}
// FIXME: Split this function
pub fn new(collection: &mut FnvHashMap<EnvelopeHash, Envelope>) -> Threads {
pub fn new(collection: &mut Envelopes) -> Threads {
/* To reconstruct thread information from the mails we need: */
/* a vector to hold thread members */
@ -529,20 +577,46 @@ impl Threads {
* References / In-Reply-To headers */
t.link_threads(collection);
t.create_root_set(collection);
t.build_collection(collection);
for (i, _t) in t.thread_nodes.iter().enumerate() {
eprintln!("Thread #{}, children {}", i, _t.children.len());
if !_t.children.is_empty() {
eprintln!("{:?}", _t.children);
}
if let Some(m) = _t.message {
eprintln!("\tmessage: {}", collection[&m].subject());
} else {
eprintln!("\tNo message");
}
}
eprintln!("\n");
for (i, _t) in t.tree.borrow().iter().enumerate() {
eprintln!("Tree #{} id {}, children {}", i, _t.id, _t.children.len());
if let Some(m) = t.thread_nodes[_t.id].message {
eprintln!("\tmessage: {}", collection[&m].subject());
} else {
eprintln!("\tNo message");
}
}
t
}
fn create_root_set(&mut self, collection: &Envelopes) {
/* Walk over the elements of message_ids, and gather a list of the ThreadNode objects that
* have no parents. These are the root messages of each thread */
let mut root_set: Vec<usize> = Vec::with_capacity(collection.len());
/* Find the root set */
'root_set: for v in t.message_ids.values() {
if t.thread_nodes[*v].parent.is_none() {
'root_set: for v in self.message_ids.values() {
if self.thread_nodes[*v].parent.is_none() {
root_set.push(*v);
}
}
let mut roots_to_remove: Vec<usize> = Vec::with_capacity(root_set.len());
/* Prune empty thread nodes */
t.prune_empty_nodes(&mut root_set);
self.prune_empty_nodes(&mut root_set);
/* "Group root set by subject."
*
@ -553,19 +627,19 @@ impl Threads {
let mut subject_table: FnvHashMap<Vec<u8>, (bool, usize)> =
FnvHashMap::with_capacity_and_hasher(collection.len(), Default::default());
for r in &root_set {
for (i, &r) in root_set.iter().enumerate() {
/* "Find the subject of that sub-tree": */
let (mut subject, mut is_re): (_, bool) = if t.thread_nodes[*r].message.is_some() {
let (mut subject, mut is_re): (_, bool) = if self.thread_nodes[r].message.is_some() {
/* "If there is a message in the Container, the subject is the subject of that
* message. " */
let msg_idx = t.thread_nodes[*r].message.unwrap();
let msg_idx = self.thread_nodes[r].message.unwrap();
let envelope = &collection[&msg_idx];
(envelope.subject(), !envelope.references().is_empty())
} else {
/* "If there is no message in the Container, then the Container will have at least
* one child Container, and that Container will have a message. Use the subject of
* that message instead." */
let msg_idx = t.thread_nodes[t.thread_nodes[*r].children[0]]
let mut msg_idx = self.thread_nodes[self.thread_nodes[r].children[0]]
.message
.unwrap();
let envelope = &collection[&msg_idx];
@ -591,17 +665,17 @@ impl Threads {
* "The container in the table has a ``Re:'' version of this subject, and this
* container has a non-``Re:'' version of this subject. The non-re version is the
* more interesting of the two." */
if (!t.thread_nodes[id].has_message() && t.thread_nodes[*r].has_message())
if (!self.thread_nodes[id].has_message() && self.thread_nodes[r].has_message())
|| (other_is_re && !is_re)
{
mem::replace(
subject_table.entry(stripped_subj.to_vec()).or_default(),
(is_re, *r),
(is_re, r),
);
}
} else {
/* "There is no container in the table with this subject" */
subject_table.insert(stripped_subj.to_vec(), (is_re, *r));
subject_table.insert(stripped_subj.to_vec(), (is_re, r));
}
}
@ -609,13 +683,14 @@ impl Threads {
* root set. Now iterate over the root set, and gather together the difference." */
for i in 0..root_set.len() {
let r = root_set[i];
/* "Find the subject of this Container (as above.)" */
let (mut subject, mut is_re): (_, bool) = if t.thread_nodes[r].message.is_some() {
let msg_idx = t.thread_nodes[r].message.unwrap();
let (mut subject, mut is_re): (_, bool) = if self.thread_nodes[r].message.is_some() {
let msg_idx = self.thread_nodes[r].message.unwrap();
let envelope = &collection[&msg_idx];
(envelope.subject(), !envelope.references().is_empty())
} else {
let msg_idx = t.thread_nodes[t.thread_nodes[r].children[0]]
let msg_idx = self.thread_nodes[self.thread_nodes[r].children[0]]
.message
.unwrap();
let envelope = &collection[&msg_idx];
@ -630,7 +705,7 @@ impl Threads {
let (other_is_re, other_idx) = subject_table[subject];
/* "If it is null, or if it is this container, continue." */
if !t.thread_nodes[other_idx].has_message() || other_idx == r {
if !self.thread_nodes[other_idx].has_message() || other_idx == r {
continue;
}
@ -641,10 +716,10 @@ impl Threads {
* "If both are dummies, append one's children to the other, and remove the now-empty
* container."
*/
if !t.thread_nodes[r].has_message() && !t.thread_nodes[other_idx].has_message() {
let children = t.thread_nodes[r].children.clone();
if !self.thread_nodes[r].has_message() && !self.thread_nodes[other_idx].has_message() {
let children = self.thread_nodes[r].children.clone();
for c in children {
make!((other_idx) parent of (c), &mut t.thread_nodes);
make!((other_idx) parent of (c), &mut self.thread_nodes);
}
roots_to_remove.push(i);
@ -652,14 +727,18 @@ impl Threads {
* of the empty, and a sibling of the other ``real'' messages with the same subject
* (the empty's children.)"
*/
} else if t.thread_nodes[r].has_message() && !t.thread_nodes[other_idx].has_message() {
make!((other_idx) parent of (r), &mut t.thread_nodes);
} else if self.thread_nodes[r].has_message()
&& !self.thread_nodes[other_idx].has_message()
{
make!((other_idx) parent of (r), &mut self.thread_nodes);
if !root_set.contains(&other_idx) {
root_set.push(other_idx);
}
roots_to_remove.push(i);
} else if !t.thread_nodes[r].has_message() && t.thread_nodes[other_idx].has_message() {
make!((r) parent of (other_idx), &mut t.thread_nodes);
} else if !self.thread_nodes[r].has_message()
&& self.thread_nodes[other_idx].has_message()
{
make!((r) parent of (other_idx), &mut self.thread_nodes);
if let Some(pos) = root_set.iter().position(|&i| i == other_idx) {
roots_to_remove.push(pos);
}
@ -667,8 +746,8 @@ impl Threads {
* "If that container is a non-empty, and that message's subject does not begin with ``Re:'', but this
* message's subject does, then make this be a child of the other."
*/
} else if t.thread_nodes[other_idx].has_message() && !other_is_re && is_re {
make!((other_idx) parent of (r), &mut t.thread_nodes);
} else if self.thread_nodes[other_idx].has_message() && !other_is_re && is_re {
make!((other_idx) parent of (r), &mut self.thread_nodes);
roots_to_remove.push(i);
/* "If that container is a non-empty, and that message's subject begins with ``Re:'', but this
@ -677,8 +756,8 @@ impl Threads {
* without will be in the hash table, regardless of the order in which they were
* seen.)"
*/
} else if t.thread_nodes[other_idx].has_message() && other_is_re && !is_re {
make!((r) parent of (other_idx), &mut t.thread_nodes);
} else if self.thread_nodes[other_idx].has_message() && other_is_re && !is_re {
make!((r) parent of (other_idx), &mut self.thread_nodes);
if let Some(pos) = root_set.iter().position(|r| *r == other_idx) {
roots_to_remove.push(pos);
}
@ -688,11 +767,11 @@ impl Threads {
* hierarchical relationship which might not be true."
*/
} else {
t.thread_nodes.push(Default::default());
let new_id = t.thread_nodes.len() - 1;
t.thread_nodes[new_id].thread_group = new_id;
make!((new_id) parent of (r), &mut t.thread_nodes);
make!((new_id) parent of (other_idx), &mut t.thread_nodes);
self.thread_nodes.push(Default::default());
let new_id = self.thread_nodes.len() - 1;
self.thread_nodes[new_id].thread_group = new_id;
make!((new_id) parent of (r), &mut self.thread_nodes);
make!((new_id) parent of (other_idx), &mut self.thread_nodes);
root_set[i] = new_id;
if let Some(pos) = root_set.iter().position(|r| *r == other_idx) {
roots_to_remove.push(pos);
@ -705,41 +784,118 @@ impl Threads {
root_set.remove(r);
}
t.root_set = RefCell::new(root_set);
t.build_collection(&collection);
t
self.root_set = RefCell::new(root_set);
}
pub fn threads_iter(&self) -> ThreadsIterator {
ThreadsIterator {
pos: 0,
stack: Vec::with_capacity(4),
tree: self.tree.borrow(),
}
}
pub fn thread_iter(&self, index: usize) -> ThreadIterator {
ThreadIterator {
init_pos: index,
pos: index,
stack: Vec::new(),
stack: Vec::with_capacity(4),
tree: self.tree.borrow(),
}
}
pub fn update_envelope(&mut self, old_hash: EnvelopeHash, envelope: &Envelope) {
pub fn update_envelope(
&mut self,
old_hash: EnvelopeHash,
new_hash: EnvelopeHash,
) -> Result<(), ()> {
/* must update:
* - hash_set
* - message fields in thread_nodes
*/
self.hash_set.remove(&old_hash);
self.hash_set.insert(envelope.hash());
let node = self
if let Some(node) = self
.thread_nodes
.iter_mut()
.find(|n| n.message.map(|n| n == old_hash).unwrap_or(false))
.unwrap();
node.message = Some(envelope.hash());
{
node.message = Some(new_hash);
} else {
return Err(());
}
self.hash_set.insert(new_hash);
Ok(())
}
pub fn update(&mut self, collection: &mut FnvHashMap<EnvelopeHash, Envelope>) {
#[inline]
pub fn remove(&mut self, envelope_hash: EnvelopeHash, collection: &mut Envelopes) {
self.hash_set.remove(&envelope_hash);
//{
// let pos = self
// .thread_nodes
// .iter()
// .position(|n| n.message.map(|n| n == envelope_hash).unwrap_or(false))
// .unwrap();
// eprintln!("DEBUG: {} in threads is idx= {}", envelope_hash, pos);
//}
let t_id: usize;
{
if let Some(pos) = self
.thread_nodes
.iter()
.position(|n| n.message.map(|n| n == envelope_hash).unwrap_or(false))
{
t_id = pos;
self.thread_nodes[pos].message = None;
} else {
/* else it was deleted during a thread_rebuild or others */
return;
}
}
let mut node_idx = t_id;
/* Trace path back to root ThreadNode */
while let Some(p) = &self.thread_nodes[node_idx].parent {
node_idx = *p;
}
{
let tree = self.tree.get_mut();
if let Some(pos) = tree.iter().position(|t| t.id == node_idx) {
tree[pos].children.clear();
if node_idx == t_id {
tree.remove(pos);
} else {
node_build(
&mut tree[pos],
node_idx,
&mut self.thread_nodes,
1,
collection,
);
}
}
}
let mut root_set: Vec<usize> = self.tree.borrow().iter().map(|t| t.id).collect();
self.prune_empty_nodes(&mut root_set);
self.tree.borrow_mut().retain(|t| root_set.contains(&t.id));
}
pub fn amend(&mut self, collection: &mut Envelopes) {
let new_hash_set = FnvHashSet::from_iter(collection.keys().cloned());
let difference: Vec<EnvelopeHash> =
self.hash_set.difference(&new_hash_set).cloned().collect();
for h in difference {
self.remove(h, collection);
}
let difference: Vec<EnvelopeHash> =
new_hash_set.difference(&self.hash_set).cloned().collect();
for h in difference {
eprintln!("inserting {}", collection[&h].subject());
let env = collection.entry(h).or_default() as *mut Envelope;
unsafe {
// `collection` is borrowed immutably and `insert` only changes the envelope's
@ -747,25 +903,15 @@ impl Threads {
self.insert(&mut (*env), collection);
}
}
self.create_root_set(collection);
let difference: Vec<EnvelopeHash> =
self.hash_set.difference(&new_hash_set).cloned().collect();
for h in difference {
self.hash_set.remove(&h);
let node = self
.thread_nodes
.iter_mut()
.find(|n| n.message.map(|n| n == h).unwrap_or(false))
.unwrap();
node.message = None;
}
let mut root_set: Vec<usize> = self.tree.borrow().iter().map(|t| t.id).collect();
self.prune_empty_nodes(&mut root_set);
let tree = self.tree.get_mut();
tree.retain(|t| root_set.contains(&t.id));
}
pub fn insert(
&mut self,
envelope: &mut Envelope,
collection: &FnvHashMap<EnvelopeHash, Envelope>,
) {
pub fn insert(&mut self, envelope: &mut Envelope, collection: &Envelopes) {
self.link_envelope(envelope);
{
let id = self.message_ids[envelope.message_id().raw()];
@ -773,30 +919,29 @@ impl Threads {
}
}
pub fn insert_reply(
&mut self,
envelope: Envelope,
collection: &mut FnvHashMap<EnvelopeHash, Envelope>,
) -> bool {
pub fn insert_reply(&mut self, envelope: &Envelope, collection: &mut Envelopes) -> bool {
//return false;
{
let in_reply_to = envelope.in_reply_to_bytes();
if !self.message_ids.contains_key(in_reply_to) {
if let Some(in_reply_to) = envelope.in_reply_to() {
if !self.message_ids.contains_key(in_reply_to.raw()) {
return false;
}
} else {
return false;
}
}
let hash: EnvelopeHash = envelope.hash();
collection.insert(hash, envelope);
collection.insert(hash, envelope.clone());
{
let envelope: &mut Envelope = collection.entry(hash).or_default();
/* FIXME: This does not update show_subject and len which is done in node_build upon
* creation */
self.link_envelope(envelope);
let envelope = collection.entry(hash).or_default() as *mut Envelope;
unsafe {
/* Safe because insert only changes envelope's fields and nothing more */
self.insert(&mut (*envelope), &collection);
}
}
let envelope: &Envelope = &collection[&hash];
{
let in_reply_to = envelope.in_reply_to_bytes();
let in_reply_to = envelope.in_reply_to().unwrap().raw();
let parent_id = self.message_ids[in_reply_to];
self.rebuild_thread(parent_id, collection);
}
@ -804,7 +949,7 @@ impl Threads {
}
/* Update thread tree information on envelope insertion */
fn rebuild_thread(&mut self, id: usize, collection: &FnvHashMap<EnvelopeHash, Envelope>) {
fn rebuild_thread(&mut self, id: usize, collection: &Envelopes) {
let mut node_idx = id;
let mut stack = Vec::with_capacity(32);
@ -846,9 +991,10 @@ impl Threads {
/*
* Finalize instance by building the thread tree, set show subject and thread lengths etc. */
fn build_collection(&mut self, collection: &FnvHashMap<EnvelopeHash, Envelope>) {
fn build_collection(&mut self, collection: &Envelopes) {
{
let tree = self.tree.get_mut();
tree.clear();
for i in self.root_set.borrow().iter() {
let mut tree_node = ThreadTree::new(*i);
node_build(
@ -865,11 +1011,7 @@ impl Threads {
self.inner_subsort_by(*self.subsort.borrow(), collection);
}
fn inner_subsort_by(
&self,
subsort: (SortField, SortOrder),
collection: &FnvHashMap<EnvelopeHash, Envelope>,
) {
fn inner_subsort_by(&self, subsort: (SortField, SortOrder), collection: &Envelopes) {
let tree = &mut self.tree.borrow_mut();
for mut t in tree.iter_mut() {
t.children.sort_by(|a, b| match subsort {
@ -909,11 +1051,7 @@ impl Threads {
}
}
fn inner_sort_by(
&self,
sort: (SortField, SortOrder),
collection: &FnvHashMap<EnvelopeHash, Envelope>,
) {
fn inner_sort_by(&self, sort: (SortField, SortOrder), collection: &Envelopes) {
let tree = &mut self.tree.borrow_mut();
tree.sort_by(|a, b| match sort {
(SortField::Date, SortOrder::Desc) => {
@ -955,7 +1093,7 @@ impl Threads {
&self,
sort: (SortField, SortOrder),
subsort: (SortField, SortOrder),
collection: &FnvHashMap<EnvelopeHash, Envelope>,
collection: &Envelopes,
) {
if *self.sort.borrow() != sort {
self.inner_sort_by(sort, collection);
@ -968,7 +1106,7 @@ impl Threads {
}
pub fn thread_to_mail(&self, i: usize) -> EnvelopeHash {
let thread = &self.thread_nodes[self.root_set.borrow()[i]];
let thread = &self.thread_nodes[i];
thread.message().unwrap()
}
@ -976,6 +1114,10 @@ impl Threads {
&self.thread_nodes
}
pub fn len(&self) -> usize {
self.thread_nodes.len()
}
pub fn root_len(&self) -> usize {
self.tree.borrow().len()
}
@ -1089,7 +1231,7 @@ impl Threads {
}
}
fn link_threads(&mut self, collection: &mut FnvHashMap<EnvelopeHash, Envelope>) {
fn link_threads(&mut self, collection: &mut Envelopes) {
for v in collection.values_mut() {
self.link_envelope(v);
}
@ -1101,7 +1243,7 @@ impl Index<usize> for Threads {
fn index(&self, index: usize) -> &ThreadNode {
self.thread_nodes
.get(self.tree.borrow()[index].id)
.get(index)
.expect("thread index out of bounds")
}
}
@ -1111,23 +1253,31 @@ fn node_build(
idx: usize,
thread_nodes: &mut Vec<ThreadNode>,
indentation: usize,
collection: &FnvHashMap<EnvelopeHash, Envelope>,
collection: &Envelopes,
) {
if let Some(hash) = thread_nodes[idx].message {
if let Some(parent_id) = thread_nodes[idx].parent {
if !collection.contains_key(&hash) {
/* invalidate node */
// thread_nodes[idx].message = None;
} else if let Some(parent_id) = thread_nodes[idx].parent {
if let Some(parent_hash) = thread_nodes[parent_id].message {
/* decide if the subject should be shown in the UI.
* If parent subject is Foobar and reply is `Re: Foobar`
* then showing the reply's subject can be reduntant
*/
let mut subject = collection[&hash].subject();
let mut subject = subject.to_mut().as_bytes();
let subject = subject.strip_prefixes();
let mut parent_subject = collection[&parent_hash].subject();
let mut parent_subject = parent_subject.to_mut().as_bytes();
let parent_subject = parent_subject.strip_prefixes();
if subject == parent_subject {
thread_nodes[idx].show_subject = false;
if !collection.contains_key(&parent_hash) {
/* invalidate node */
// thread_nodes[parent_id].message = None;
} else {
/* decide if the subject should be shown in the UI.
* If parent subject is Foobar and reply is `Re: Foobar`
* then showing the reply's subject can be reduntant
*/
let mut subject = collection[&hash].subject();
let mut subject = subject.to_mut().as_bytes();
let subject = subject.strip_prefixes();
let mut parent_subject = collection[&parent_hash].subject();
let mut parent_subject = parent_subject.to_mut().as_bytes();
let parent_subject = parent_subject.strip_prefixes();
if subject == parent_subject {
thread_nodes[idx].show_subject = false;
}
}
}
}
@ -1146,7 +1296,12 @@ fn node_build(
let mut child_vec: Vec<ThreadTree> = Vec::new();
thread_nodes[idx].len = thread_nodes[idx].children.len();
for c in thread_nodes[idx].children.clone() {
/* No child/parent relationship is mutated at any point and no nodes are added or removed. Only
* each node's fields change, so the following is safe.
*/
let children = &thread_nodes[idx].children as *const Vec<usize>;
for &c in unsafe { &(*children) } {
let mut new_tree = ThreadTree::new(c);
node_build(&mut new_tree, c, thread_nodes, indentation, collection);
thread_nodes[idx].len += thread_nodes[c].len;

View File

@ -60,6 +60,8 @@ fn main() {
let receiver = state.receiver();
let worker_receiver = state.worker_receiver();
/* Register some reasonably useful interfaces */
let menu = Entity::from(Box::new(AccountMenu::new(&state.context.accounts)));
let listing = listing::Listing::default();
@ -84,6 +86,7 @@ fn main() {
for e in events {
state.rcv_event(e);
}
state.redraw();
/* Poll on all channels. Currently we have the input channel for stdin, watching events and the signal watcher. */
chan_select! {
@ -151,11 +154,12 @@ fn main() {
for idx_a in 0..state.context.accounts.len() {
let len = state.context.accounts[idx_a].len();
for idx_m in 0..len {
match state.context.account_status(idx_a, idx_m) {
Ok(true) => {
Ok(_) => {
render_flag = true;
},
Ok(false) | Err(_) => {}
Err(_) => {}
}
}
}
@ -181,6 +185,10 @@ fn main() {
}
}
},
worker_receiver.recv() -> _ => {
/* Some worker thread finished their job, acknowledge
* it and move on*/
},
}
} // end of 'inner

View File

@ -0,0 +1,187 @@
use super::*;
use components::utilities::PageMovement;
pub trait IndexContent: Component {
/* Handles the drawing of one entry */
fn make_entry(&mut self, idx: usize) -> ();
/* Handles what happens when the user selects an entry in the index listing */
fn enter_entry(&mut self, grid: &mut CellBuffer, area: Area, context: &mut Context) -> ();
/* Refreshes content */
fn refresh(&mut self, context: &mut Context) -> ();
fn search(&self, term: &str) -> Option<usize>;
}
#[derive(Debug, PartialEq)]
enum IndexState {
Uninitialized,
Listing,
Unfocused,
Search,
}
#[derive(Debug)]
pub struct Index {
cursor_pos: usize,
new_cursor_pos: usize,
length: usize,
/// Cache current view.
canvas: CellBuffer,
/// If we must redraw on next redraw event
dirty: bool,
state: IndexState,
content: Box<IndexContent>,
}
impl Index {
fn highlight_line(&self, grid: &mut CellBuffer, area: Area, idx: usize) {
let fg_color = Color::Default;
let bg_color = if self.cursor_pos == idx {
Color::Byte(246)
/* } else if idx % 2 == 0 {
Color::Byte(236)*/
} else {
Color::Default
};
change_colors(grid, area, fg_color, bg_color);
}
}
impl Component for Index {
fn draw(&mut self, grid: &mut CellBuffer, area: Area, context: &mut Context) {
if !self.dirty {
return;
}
match self.state {
IndexState::Uninitialized => {
self.content.refresh(context);
/* copy area */
self.state = IndexState::Listing;
self.draw(grid, area, context);
return;
}
IndexState::Listing => {
/* rehighlight entries, redraw pages */
let upper_left = upper_left!(area);
let bottom_right = bottom_right!(area);
let rows = get_y(bottom_right) - get_y(upper_left) + 1;
let prev_page_no = (self.cursor_pos).wrapping_div(rows);
let page_no = (self.new_cursor_pos).wrapping_div(rows);
let top_idx = page_no * rows;
if self.new_cursor_pos >= self.length {
self.new_cursor_pos = self.length - 1;
}
/* If cursor position has changed, remove the highlight from the previous position and
* apply it in the new one. */
if self.cursor_pos != self.new_cursor_pos && prev_page_no == page_no {
let old_cursor_pos = self.cursor_pos;
self.cursor_pos = self.new_cursor_pos;
for idx in &[old_cursor_pos, self.new_cursor_pos] {
if *idx >= self.length {
continue; //bounds check
}
let new_area = (
set_y(upper_left, get_y(upper_left) + (*idx % rows)),
set_y(bottom_right, get_y(upper_left) + (*idx % rows)),
);
self.highlight_line(grid, new_area, *idx);
context.dirty_areas.push_back(new_area);
}
return;
} else if self.cursor_pos != self.new_cursor_pos {
self.cursor_pos = self.new_cursor_pos;
}
/* Page_no has changed, so draw new page */
copy_area(
grid,
&self.canvas,
area,
((0, top_idx), (500 - 1, self.length)),
);
self.highlight_line(
grid,
(
(
get_x(upper_left),
get_y(upper_left) + (self.cursor_pos % rows),
),
(
get_x(bottom_right),
get_y(upper_left) + (self.cursor_pos % rows),
),
),
self.cursor_pos,
);
context.dirty_areas.push_back(area);
}
IndexState::Unfocused => {
self.content.draw(grid, area, context);
}
IndexState::Search => unreachable!(),
}
self.dirty = false;
return;
}
fn process_event(&mut self, event: &UIEvent, context: &mut Context) -> bool {
if self.content.process_event(event, context) {
return true;
}
match event.event_type {
UIEventType::Input(Key::Up) => {
if self.cursor_pos > 0 {
self.new_cursor_pos = self.new_cursor_pos.saturating_sub(1);
self.set_dirty();
}
return true;
}
UIEventType::Input(Key::Down) => {
if self.length > 0 && self.new_cursor_pos < self.length - 1 {
self.new_cursor_pos += 1;
self.set_dirty();
}
return true;
}
UIEventType::Input(Key::Char('\n')) if self.state == IndexState::Listing => {
self.state = IndexState::Unfocused;
self.set_dirty();
return true;
}
UIEventType::Input(Key::Char('i')) if self.state == IndexState::Unfocused => {
self.state = IndexState::Listing;
self.set_dirty();
return true;
}
UIEventType::ChangeMode(UIMode::Normal) => {
self.set_dirty();
}
UIEventType::Resize => {
self.set_dirty();
}
_ => {}
}
false
}
fn is_dirty(&self) -> bool {
self.dirty || self.content.is_dirty()
}
fn set_dirty(&mut self) {
self.dirty = true;
}
}
impl fmt::Display for Index {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Display::fmt(&self.content, f)
}
}

View File

@ -0,0 +1,128 @@
/*
* meli - ui crate.
*
* Copyright 2017-2018 Manos Pitsidianakis
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
/*! Entities that handle Mail specific functions.
*/
use super::*;
pub mod index;
pub use self::index::*;
#[derive(Debug)]
struct MenuEntry {
name: String,
subentries: Vec<MenuEntry>,
index: Index,
}
#[derive(Debug)]
pub struct Indexer {
entries: Vec<MenuEntry>,
dirty: bool,
cursor: Vec<usize>,
}
impl fmt::Display for Indexer {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
// TODO display subject/info
write!(f, "index")
}
}
impl Default for Indexer {
fn default() -> Self {
Indexer {
entries: Vec::with_capacity(8),
dirty: true,
cursor: Vec::with_capacity(8),
}
}
}
impl Indexer {
fn draw_menu(&mut self, grid: &mut CellBuffer, area: Area, context: &mut Context) {}
}
impl Component for Indexer {
fn draw(&mut self, grid: &mut CellBuffer, area: Area, context: &mut Context) {
if !self.is_dirty() {
return;
}
clear_area(grid, area);
let upper_left = upper_left!(area);
let bottom_right = bottom_right!(area);
let total_cols = get_x(bottom_right) - get_x(upper_left);
let index_entity_width = (30 * total_cols) / 100;
let mid = get_x(bottom_right) - index_entity_width;
for i in get_y(upper_left)..=get_y(bottom_right) {
set_and_join_box(grid, (mid, i), VERT_BOUNDARY);
}
let left_menu_area = (upper_left, (set_x(bottom_right, mid - 1)));
let right_index_area = (set_x(upper_left, mid + 1), bottom_right);
self.draw_menu(grid, left_menu_area, context);
self.entries[self.cursor[0]]
.index
.draw(grid, right_index_area, context);
self.dirty = false;
context.dirty_areas.push_back(area);
}
fn process_event(&mut self, event: &UIEvent, _context: &mut Context) -> bool {
if !self.entries[self.cursor[0]]
.index
.process_event(event, _context)
{
for i in 0..self.entries.len() {
if i == self.cursor[0] {
continue;
}
self.entries[i].index.process_event(event, _context);
}
}
match event.event_type {
UIEventType::RefreshMailbox(c) => {
self.dirty = true;
}
UIEventType::ChangeMode(UIMode::Normal) => {
self.dirty = true;
}
UIEventType::Resize => {
self.dirty = true;
}
_ => {}
}
false
}
fn is_dirty(&self) -> bool {
self.dirty
}
fn set_dirty(&mut self) {
self.dirty = true;
}
}

View File

@ -63,7 +63,7 @@ impl CompactListing {
/// Helper function to format entry strings for CompactListing */
/* TODO: Make this configurable */
fn make_entry_string(e: &Envelope, len: usize, idx: usize) -> String {
if len > 1 {
if len > 0 {
format!(
"{} {} {:.85} ({})",
idx,
@ -101,8 +101,13 @@ impl CompactListing {
/// chosen.
fn refresh_mailbox(&mut self, context: &mut Context) {
self.dirty = true;
self.cursor_pos.2 = 0;
self.new_cursor_pos.2 = 0;
if !(self.cursor_pos.0 == self.new_cursor_pos.0
&& self.cursor_pos.1 == self.new_cursor_pos.1)
{
//TODO: store cursor_pos in each folder
self.cursor_pos.2 = 0;
self.new_cursor_pos.2 = 0;
}
self.cursor_pos.1 = self.new_cursor_pos.1;
self.cursor_pos.0 = self.new_cursor_pos.0;
@ -124,7 +129,7 @@ impl CompactListing {
Color::Default,
Color::Default,
((0, 0), (MAX_COLS - 1, 0)),
true,
false,
);
return;
}
@ -142,7 +147,7 @@ impl CompactListing {
Color::Default,
Color::Default,
((0, 0), (MAX_COLS - 1, 0)),
true,
false,
);
return;
}
@ -204,6 +209,9 @@ impl CompactListing {
let mailbox = &context.accounts[self.cursor_pos.0][self.cursor_pos.1]
.as_ref()
.unwrap();
if mailbox.len() == 0 {
return;
}
let threads = &mailbox.collection.threads;
let thread_node = threads.root_set(idx);
let thread_node = &threads.thread_nodes()[thread_node];
@ -216,6 +224,7 @@ impl CompactListing {
}
threads.thread_nodes()[iter_ptr].message().unwrap()
};
let root_envelope: &Envelope = &mailbox.collection[&i];
let fg_color = if !root_envelope.is_seen() {
Color::Byte(0)
@ -253,7 +262,12 @@ impl CompactListing {
let bottom_right = bottom_right!(area);
if self.length == 0 {
clear_area(grid, area);
copy_area(grid, &self.content, area, ((0, 0), (MAX_COLS - 1, 0)));
copy_area(
grid,
&self.content,
area,
((0, 0), (MAX_COLS - 1, self.length)),
);
context.dirty_areas.push_back(area);
return;
}
@ -346,7 +360,6 @@ impl Component for CompactListing {
if !self.is_dirty() {
return;
}
self.dirty = false;
/* Draw the entire list */
self.draw_list(grid, area, context);
} else {
@ -365,8 +378,8 @@ impl Component for CompactListing {
}
self.view = Some(ThreadView::new(self.cursor_pos, None, context));
self.view.as_mut().unwrap().draw(grid, area, context);
self.dirty = false;
}
self.dirty = false;
}
fn process_event(&mut self, event: &UIEvent, context: &mut Context) -> bool {
if let Some(ref mut v) = self.view {

View File

@ -49,7 +49,7 @@ impl fmt::Display for Listing {
impl Default for Listing {
fn default() -> Self {
Listing::Compact(Default::default())
Listing::Threaded(Default::default())
}
}

View File

@ -35,10 +35,13 @@ pub struct ThreadListing {
subsort: (SortField, SortOrder),
/// Cache current view.
content: CellBuffer,
locations: Vec<(usize, usize)>,
/// If we must redraw on next redraw event
dirty: bool,
/// If `self.view` exists or not.
unfocused: bool,
initialised: bool,
view: Option<MailView>,
}
@ -64,17 +67,24 @@ impl ThreadListing {
sort: (Default::default(), Default::default()),
subsort: (Default::default(), Default::default()),
content,
locations: Vec::new(),
dirty: true,
unfocused: false,
view: None,
initialised: false,
}
}
/// Fill the `self.content` `CellBuffer` with the contents of the account folder the user has
/// chosen.
fn refresh_mailbox(&mut self, context: &mut Context) {
self.dirty = true;
self.cursor_pos.2 = 0;
self.new_cursor_pos.2 = 0;
if !(self.cursor_pos.0 == self.new_cursor_pos.0
&& self.cursor_pos.1 == self.new_cursor_pos.1)
{
//TODO: store cursor_pos in each folder
self.cursor_pos.2 = 0;
self.new_cursor_pos.2 = 0;
}
self.cursor_pos.1 = self.new_cursor_pos.1;
self.cursor_pos.0 = self.new_cursor_pos.0;
@ -85,20 +95,27 @@ impl ThreadListing {
});
// Get mailbox as a reference.
//
loop {
// TODO: Show progress visually
if context.accounts[self.cursor_pos.0]
.status(self.cursor_pos.1)
.is_ok()
{
break;
match context.accounts[self.cursor_pos.0].status(self.cursor_pos.1) {
Ok(_) => {}
Err(_) => {
self.content = CellBuffer::new(MAX_COLS, 1, Cell::with_char(' '));
self.length = 0;
write_string_to_grid(
"Loading.",
&mut self.content,
Color::Default,
Color::Default,
((0, 0), (MAX_COLS - 1, 0)),
false,
);
return;
}
}
let mailbox = &context.accounts[self.cursor_pos.0][self.cursor_pos.1]
.as_ref()
.unwrap();
self.length = mailbox.collection.threads.root_len();
self.length = mailbox.collection.threads.len();
self.content = CellBuffer::new(MAX_COLS, self.length + 1, Cell::with_char(' '));
if self.length == 0 {
write_string_to_grid(
@ -118,25 +135,19 @@ impl ThreadListing {
let threads = &mailbox.collection.threads;
threads.sort_by(self.sort, self.subsort, &mailbox.collection);
let thread_nodes: &Vec<ThreadNode> = &threads.thread_nodes();
let mut iter = threads.root_iter().peekable();
let len = threads.root_len().to_string().chars().count();
self.locations = threads.threads_iter().collect();
let mut iter = self.locations.iter().peekable();
/* This is just a desugared for loop so that we can use .peek() */
let mut idx = 0;
while let Some(i) = iter.next() {
let thread_node = &thread_nodes[i];
while let Some((indentation, i)) = iter.next() {
let thread_node = &thread_nodes[*i];
if !thread_node.has_message() {
continue;
}
let indentation = thread_node.indentation();
if indentation == 0 {
if *indentation == 0 {
thread_idx += 1;
}
match iter.peek() {
Some(&x) if thread_nodes[x].indentation() == indentation => {
Some((x, _)) if *x == *indentation => {
indentations.pop();
indentations.push(true);
}
@ -145,51 +156,58 @@ impl ThreadListing {
indentations.push(false);
}
}
if threads.has_sibling(i) {
if threads.has_sibling(*i) {
indentations.pop();
indentations.push(true);
}
let envelope: &Envelope = &mailbox.collection[&thread_node.message().unwrap()];
let fg_color = if !envelope.is_seen() {
Color::Byte(0)
if thread_node.has_message() {
let envelope: &Envelope = &mailbox.collection[&thread_node.message().unwrap()];
let fg_color = if !envelope.is_seen() {
Color::Byte(0)
} else {
Color::Default
};
let bg_color = if !envelope.is_seen() {
Color::Byte(251)
} else if thread_idx % 2 == 0 {
Color::Byte(236)
} else {
Color::Default
};
let (x, _) = write_string_to_grid(
&ThreadListing::make_thread_entry(
envelope,
idx,
*indentation,
*i,
threads,
&indentations,
self.length,
// context.accounts[self.cursor_pos.0].backend.operation(envelope.hash())
),
&mut self.content,
fg_color,
bg_color,
((0, idx), (MAX_COLS - 1, idx)),
false,
);
for x in x..MAX_COLS {
self.content[(x, idx)].set_ch(' ');
self.content[(x, idx)].set_bg(bg_color);
}
} else {
Color::Default
};
let bg_color = if !envelope.is_seen() {
Color::Byte(251)
} else if thread_idx % 2 == 0 {
Color::Byte(236)
} else {
Color::Default
};
let (x, _) = write_string_to_grid(
&ThreadListing::make_thread_entry(
envelope,
idx,
indentation,
i,
threads,
&indentations,
len,
// context.accounts[self.cursor_pos.0].backend.operation(envelope.hash())
),
&mut self.content,
fg_color,
bg_color,
((0, idx), (MAX_COLS - 1, idx)),
false,
);
for x in x..MAX_COLS {
self.content[(x, idx)].set_ch(' ');
self.content[(x, idx)].set_bg(bg_color);
for x in 0..MAX_COLS {
self.content[(x, idx)].set_ch(' ');
self.content[(x, idx)].set_bg(Color::Default);
}
}
match iter.peek() {
Some(&x) if thread_nodes[x].indentation() > indentation => {
Some((x, _)) if *x > *indentation => {
indentations.push(false);
}
Some(&x) if thread_nodes[x].indentation() < indentation => {
for _ in 0..(indentation - thread_nodes[x].indentation()) {
Some((x, _)) if *x < *indentation => {
for _ in 0..(*indentation - *x) {
indentations.pop();
}
}
@ -203,49 +221,64 @@ impl ThreadListing {
let mailbox = &context.accounts[self.cursor_pos.0][self.cursor_pos.1]
.as_ref()
.unwrap();
let envelope: &Envelope = mailbox.thread_to_mail(idx);
if mailbox.len() == 0 {
return;
}
if let Some(hash) =
mailbox.collection.threads.thread_nodes()[self.locations[idx].1].message()
{
let envelope: &Envelope = &mailbox.collection[&hash];
let fg_color = if !envelope.is_seen() {
Color::Byte(0)
} else {
Color::Default
};
let bg_color = if !envelope.is_seen() {
Color::Byte(251)
} else if idx % 2 == 0 {
Color::Byte(236)
} else {
Color::Default
};
change_colors(
&mut self.content,
((0, idx), (MAX_COLS - 1, idx)),
fg_color,
bg_color,
);
let fg_color = if !envelope.is_seen() {
Color::Byte(0)
} else {
Color::Default
};
let bg_color = if !envelope.is_seen() {
Color::Byte(251)
} else if idx % 2 == 0 {
Color::Byte(236)
} else {
Color::Default
};
change_colors(
&mut self.content,
((0, idx), (MAX_COLS - 1, idx)),
fg_color,
bg_color,
);
}
}
fn highlight_line(&self, grid: &mut CellBuffer, area: Area, idx: usize, context: &Context) {
let mailbox = &context.accounts[self.cursor_pos.0][self.cursor_pos.1]
.as_ref()
.unwrap();
let envelope: &Envelope = mailbox.thread_to_mail(idx);
if mailbox.len() == 0 || mailbox.len() <= idx {
return;
}
let fg_color = if !envelope.is_seen() {
Color::Byte(0)
} else {
Color::Default
};
let bg_color = if self.cursor_pos.2 == idx {
Color::Byte(246)
} else if !envelope.is_seen() {
Color::Byte(251)
} else if idx % 2 == 0 {
Color::Byte(236)
} else {
Color::Default
};
change_colors(grid, area, fg_color, bg_color);
if let Some(hash) =
mailbox.collection.threads.thread_nodes()[self.locations[idx].1].message()
{
let envelope: &Envelope = &mailbox.collection[&hash];
let fg_color = if !envelope.is_seen() {
Color::Byte(0)
} else {
Color::Default
};
let bg_color = if self.cursor_pos.2 == idx {
Color::Byte(246)
} else if !envelope.is_seen() {
Color::Byte(251)
} else if idx % 2 == 0 {
Color::Byte(236)
} else {
Color::Default
};
change_colors(grid, area, fg_color, bg_color);
}
}
/// Draw the list of `Envelope`s.
@ -266,7 +299,25 @@ impl ThreadListing {
let page_no = (self.new_cursor_pos.2).wrapping_div(rows);
let top_idx = page_no * rows;
if !self.initialised {
self.initialised = false;
copy_area(
grid,
&self.content,
area,
((0, top_idx), (MAX_COLS - 1, self.length)),
);
self.highlight_line(
grid,
(
set_y(upper_left, get_y(upper_left) + (self.cursor_pos.2 % rows)),
set_y(bottom_right, get_y(upper_left) + (self.cursor_pos.2 % rows)),
),
self.cursor_pos.2,
context,
);
context.dirty_areas.push_back(area);
}
/* If cursor position has changed, remove the highlight from the previous position and
* apply it in the new one. */
if self.cursor_pos.2 != self.new_cursor_pos.2 && prev_page_no == page_no {
@ -402,8 +453,26 @@ impl Component for ThreadListing {
context.dirty_areas.push_back(area);
return;
}
/* Mark message as read */
let idx = self.cursor_pos.2;
{
let has_message: bool = {
let account = &context.accounts[self.cursor_pos.0];
let mailbox = &account[self.cursor_pos.1].as_ref().unwrap();
mailbox.collection.threads.thread_nodes()
[self.locations[self.new_cursor_pos.2].1]
.message()
.is_some()
};
if !has_message {
self.dirty = false;
/* Draw the entire list */
return self.draw_list(grid, area, context);
}
}
/* Mark message as read */
let must_highlight = {
if self.length == 0 {
false
@ -411,7 +480,8 @@ impl Component for ThreadListing {
let account = &mut context.accounts[self.cursor_pos.0];
let (hash, is_seen) = {
let mailbox = &mut account[self.cursor_pos.1].as_mut().unwrap();
let envelope: &mut Envelope = mailbox.thread_to_mail_mut(idx);
let envelope: &mut Envelope =
mailbox.thread_to_mail_mut(self.locations[self.new_cursor_pos.2].1);
(envelope.hash(), envelope.is_seen())
};
if !is_seen {
@ -424,7 +494,8 @@ impl Component for ThreadListing {
backend.operation(hash, folder_hash)
};
let mailbox = &mut account[self.cursor_pos.1].as_mut().unwrap();
let envelope: &mut Envelope = mailbox.thread_to_mail_mut(idx);
let envelope: &mut Envelope =
mailbox.thread_to_mail_mut(self.locations[self.new_cursor_pos.2].1);
envelope.set_seen(op).unwrap();
true
} else {
@ -476,7 +547,7 @@ impl Component for ThreadListing {
let coordinates = (
self.cursor_pos.0,
self.cursor_pos.1,
mailbox.threaded_mail(self.cursor_pos.2),
mailbox.threaded_mail(self.locations[self.cursor_pos.2].1),
);
self.view = Some(MailView::new(coordinates, None, None));
}
@ -587,6 +658,7 @@ impl Component for ThreadListing {
if *idxa == self.new_cursor_pos.0 && *idxf == self.new_cursor_pos.1 {
self.dirty = true;
self.refresh_mailbox(context);
eprintln!("mailboxupdate");
}
}
UIEventType::ChangeMode(UIMode::Normal) => {
@ -596,8 +668,8 @@ impl Component for ThreadListing {
self.dirty = true;
}
UIEventType::Action(ref action) => match action {
Action::ViewMailbox(idx) => {
self.new_cursor_pos.1 = *idx;
Action::ViewMailbox(idx_m) => {
self.new_cursor_pos.1 = *idx_m;
self.dirty = true;
self.refresh_mailbox(context);
return true;

View File

@ -70,8 +70,7 @@ impl AccountMenu {
}
entries
},
})
.collect();
}).collect();
AccountMenu {
accounts,
dirty: true,

View File

@ -32,6 +32,9 @@ pub use mail::*;
pub mod notifications;
pub mod indexer;
pub use self::indexer::*;
pub mod utilities;
pub use self::utilities::*;

View File

@ -59,3 +59,6 @@ pub use components::*;
pub mod conf;
pub use conf::*;
pub mod workers;
pub use workers::*;

View File

@ -100,14 +100,14 @@ impl Context {
pub fn restore_input(&self) {
self.input.restore(self.sender.clone());
}
pub fn account_status(&mut self, idx_a: usize, idx_m: usize) -> result::Result<bool, usize> {
pub fn account_status(&mut self, idx_a: usize, idx_m: usize) -> result::Result<(), usize> {
match self.accounts[idx_a].status(idx_m) {
Ok(()) => {
self.replies.push_back(UIEvent {
id: 0,
event_type: UIEventType::MailboxUpdate((idx_a, idx_m)),
});
Ok(true)
Ok(())
}
Err(n) => Err(n),
}
@ -127,6 +127,7 @@ pub struct State {
entities: Vec<Entity>,
pub context: Context,
threads: FnvHashMap<thread::ThreadId, (chan::Sender<bool>, thread::JoinHandle<()>)>,
work_controller: WorkController,
}
impl Drop for State {
@ -155,7 +156,7 @@ impl State {
pub fn new() -> Self {
/* Create a channel to communicate with other threads. The main process is the sole receiver.
* */
let (sender, receiver) = chan::sync(::std::mem::size_of::<ThreadEvent>());
let (sender, receiver) = chan::sync(32 * ::std::mem::size_of::<ThreadEvent>());
/*
* Create async channel to block the input-thread if we need to fork and stop it from reading
@ -215,7 +216,18 @@ impl State {
},
},
threads: FnvHashMap::with_capacity_and_hasher(1, Default::default()),
work_controller: WorkController::new(),
};
for a in s.context.accounts.iter_mut() {
for worker in a.workers.iter_mut() {
if let Some(worker) = worker.as_mut() {
if let Some(w) = worker.work() {
s.work_controller.queue.add_work(w);
}
}
}
}
write!(
s.stdout(),
"{}{}{}{}",
@ -239,6 +251,11 @@ impl State {
s.restore_input();
s
}
pub fn worker_receiver(&mut self) -> chan::Receiver<bool> {
self.work_controller.results_rx()
}
/*
* When we receive a folder hash from a watcher thread,
* we match the hash to the index of the mailbox, request a reload
@ -252,15 +269,18 @@ impl State {
return;
}
if let Some(notification) = self.context.accounts[idxa].reload(event, idxm) {
self.context
.sender
.send(ThreadEvent::UIEvent(UIEventType::StartupCheck));
self.context.replies.push_back(UIEvent {
id: 0,
event_type: notification,
});
self.context.replies.push_back(UIEvent {
id: 0,
event_type: UIEventType::MailboxUpdate((idxa, idxm)),
});
}
self.context.replies.push_back(UIEvent {
id: 0,
event_type: UIEventType::MailboxUpdate((idxa, idxm)),
});
} else {
eprintln!(
"BUG: mailbox with hash {} not found in mailbox_hashes.",

View File

@ -34,11 +34,16 @@ use std::mem;
use std::ops::{Index, IndexMut};
use std::result;
use std::sync::Arc;
use std::thread;
use types::UIEventType::{self, Notification};
pub type Worker = Option<Async<Result<Mailbox>>>;
macro_rules! mailbox {
($idx:expr, $folders:expr) => {
$folders[$idx].as_mut().unwrap().as_mut().unwrap()
};
}
#[derive(Debug)]
pub struct Account {
name: String,
@ -66,13 +71,9 @@ impl Account {
let notify_fn = Arc::new(notify_fn);
for f in ref_folders {
folders.push(None);
workers.push(Account::new_worker(
&name,
f,
&mut backend,
notify_fn.clone(),
));
workers.push(Account::new_worker(f, &mut backend, notify_fn.clone()));
}
eprintln!("sent_folder for {} is {:?}", name, sent_folder);
Account {
name,
folders,
@ -85,56 +86,56 @@ impl Account {
}
}
fn new_worker(
name: &str,
folder: Folder,
backend: &mut Box<MailBackend>,
notify_fn: Arc<NotifyFn>,
) -> Worker {
let mailbox_handle = backend.get(&folder, notify_fn.clone());
let mailbox_handle = backend.get(&folder);
let mut builder = AsyncBuilder::new();
let tx = builder.tx();
Some(
builder.build(
thread::Builder::new()
.name(format!("Loading {}", name))
.spawn(move || {
let envelopes = mailbox_handle.join();
let ret = Mailbox::new(folder, envelopes);
tx.send(AsyncStatus::Finished);
notify_fn.notify();
ret
}).unwrap(),
),
)
Some(builder.build(Box::new(move || {
let mut handle = mailbox_handle.clone();
let folder = folder.clone();
let work = handle.work().unwrap();
work.compute();
handle.join();
let envelopes = handle.extract();
let ret = Mailbox::new(folder, envelopes);
tx.send(AsyncStatus::Payload(ret));
notify_fn.notify();
})))
}
pub fn reload(&mut self, event: RefreshEvent, idx: usize) -> Option<UIEventType> {
let kind = event.kind();
{
let mailbox: &mut Mailbox = self.folders[idx].as_mut().unwrap().as_mut().unwrap();
//let mailbox: &mut Mailbox = self.folders[idx].as_mut().unwrap().as_mut().unwrap();
match kind {
RefreshEventKind::Update(old_hash, envelope) => {
mailbox.update(old_hash, *envelope);
mailbox!(idx, self.folders).update(old_hash, *envelope);
}
RefreshEventKind::Rename(old_hash, new_hash) => {
mailbox!(idx, self.folders).rename(old_hash, new_hash);
}
RefreshEventKind::Create(envelope) => {
let env: &Envelope = mailbox.insert(*envelope);
eprintln!("create {}", envelope.hash());
let env: &Envelope = mailbox!(idx, self.folders).insert(*envelope);
let ref_folders: Vec<Folder> = self.backend.folders();
return Some(Notification(
Some("new mail".into()),
format!(
"{:.15}:\nSubject: {:.15}\nFrom: {:.15}",
"{:<15}:\nSubject: {:<15}\nFrom: {:<15}",
ref_folders[idx].name(),
env.subject(),
env.field_from_to_string()
env.field_from_to_string(),
),
));
}
RefreshEventKind::Remove(envelope_hash) => {
mailbox.remove(envelope_hash);
mailbox!(idx, self.folders).remove(envelope_hash);
}
RefreshEventKind::Rescan => {
let ref_folders: Vec<Folder> = self.backend.folders();
let handle = Account::new_worker(
&self.name,
ref_folders[idx].clone(),
&mut self.backend,
self.notify_fn.clone(),
@ -143,9 +144,6 @@ impl Account {
}
}
}
if self.workers[idx].is_some() {
self.folders[idx] = None;
}
None
}
pub fn watch(&self, r: RefreshEventConsumer) -> () {
@ -180,6 +178,8 @@ impl Account {
}
fn load_mailbox(&mut self, index: usize, mailbox: Result<Mailbox>) {
self.folders[index] = Some(mailbox);
/*
if self.sent_folder.is_some() && self.sent_folder.unwrap() == index {
self.folders[index] = Some(mailbox);
/* Add our replies to other folders */
@ -190,6 +190,7 @@ impl Account {
self.folders[index] = Some(mailbox);
self.add_replies_to_folder(index);
};
*/
}
fn add_replies_to_folder(&mut self, folder_index: usize) {
@ -231,7 +232,7 @@ impl Account {
None => {
return Ok(());
}
Some(ref mut w) => match w.poll() {
Some(ref mut w) if self.folders[index].is_none() => match w.poll() {
Ok(AsyncStatus::NoUpdate) => {
return Err(0);
}
@ -239,13 +240,15 @@ impl Account {
Ok(AsyncStatus::ProgressReport(n)) => {
return Err(n);
}
a => {
eprintln!("Error: {:?}", a);
_ => {
return Err(0);
}
},
Some(_) => return Ok(()),
};
let m = self.workers[index].take().unwrap().extract();
let m = mem::replace(&mut self.workers[index], None)
.unwrap()
.extract();
self.workers[index] = None;
self.load_mailbox(index, m);
Ok(())

View File

@ -627,15 +627,15 @@ pub fn copy_area(grid_dest: &mut CellBuffer, grid_src: &CellBuffer, dest: Area,
src_x += 1;
}
src_x = get_x(upper_left!(src));
if src_y >= get_y(bottom_right!(src)) {
src_y += 1;
if src_y > get_y(bottom_right!(src)) {
clear_area(
grid_dest,
((get_x(upper_left!(dest)), y), bottom_right!(dest)),
((get_x(upper_left!(dest)), y + 1), bottom_right!(dest)),
);
ret.1 = y;
break;
}
src_y += 1;
}
ret
}

View File

@ -0,0 +1,298 @@
use chan;
use melib::async::Work;
use std;
use std::mem;
use std::thread;
const MAX_WORKER: usize = 4;
pub struct WorkController {
pub queue: WorkQueue<Work>,
thread_end_tx: chan::Sender<bool>,
results: Option<chan::Receiver<bool>>,
threads: Vec<std::thread::JoinHandle<()>>,
}
impl WorkController {
pub fn results_rx(&mut self) -> chan::Receiver<bool> {
self.results.take().unwrap()
}
}
impl Drop for WorkController {
fn drop(&mut self) {
for _ in 0..self.threads.len() {
self.thread_end_tx.send(true);
}
let threads = mem::replace(&mut self.threads, Vec::new());
for handle in threads {
handle.join().unwrap();
}
}
}
// We need a way to keep track of what work needs to be done.
// This is a multi-source, multi-consumer queue which we call a
// WorkQueue.
// To create this type, we wrap a mutex (std::sync::mutex) around a
// queue (technically a double-ended queue, std::collections::VecDeque).
//
// Mutex stands for MUTually EXclusive. It essentially ensures that only
// one thread has access to a given resource at one time.
use std::sync::Mutex;
// A VecDeque is a double-ended queue, but we will only be using it in forward
// mode; that is, we will push onto the back and pull from the front.
use std::collections::VecDeque;
// Finally we wrap the whole thing in Arc (Atomic Reference Counting) so that
// we can safely share it with other threads. Arc (std::sync::arc) is a lot
// like Rc (std::rc::Rc), in that it allows multiple references to some memory
// which is freed when no references remain, except that it is atomic, making
// it comparitively slow but able to be shared across the thread boundary.
use std::sync::Arc;
// All three of these types are wrapped around a generic type T.
// T is required to be Send (a marker trait automatically implemented when
// it is safe to do so) because it denotes types that are safe to move between
// threads, which is the whole point of the WorkQueue.
// For this implementation, T is required to be Copy as well, for simplicity.
/// A generic work queue for work elements which can be trivially copied.
/// Any producer of work can add elements and any worker can consume them.
/// WorkQueue derives Clone so that it can be distributed among threads.
#[derive(Clone)]
pub struct WorkQueue<T: Send> {
inner: Arc<Mutex<VecDeque<T>>>,
new_jobs_tx: chan::Sender<bool>,
}
impl<T: Send> WorkQueue<T> {
// Creating one of these by hand would be kind of a pain,
// so let's provide a convenience function.
/// Creates a new WorkQueue, ready to be used.
fn new(new_jobs_tx: chan::Sender<bool>) -> Self {
Self {
inner: Arc::new(Mutex::new(VecDeque::new())),
new_jobs_tx,
}
}
// This is the function workers will use to acquire work from the queue.
// They will call it in a loop, checking to see if there is any work available.
/// Blocks the current thread until work is available, then
/// gets the data required to perform that work.
///
/// # Errors
/// Returns None if there is no more work in the queue.
///
/// # Panics
/// Panics if the underlying mutex became poisoned. This is exceedingly
/// unlikely.
fn get_work(&self) -> Option<T> {
// Try to get a lock on the Mutex. If this fails, there is a
// problem with the mutex - it's poisoned, meaning that a thread that
// held the mutex lock panicked before releasing it. There is no way
// to guarantee that all its invariants are upheld, so we need to not
// use it in that case.
let maybe_queue = self.inner.lock();
// A lot is going on here. self.inner is an Arc of Mutex. Arc can deref
// into its internal type, so we can call the methods of that inner
// type (Mutex) without dereferencing, so this is like
// *(self.inner).lock()
// but doesn't look awful. Mutex::lock() returns a
// Result<MutexGuard<VecDeque<T>>>.
// Unwrapping with if let, we get a MutexGuard, which is an RAII guard
// that unlocks the Mutex when it goes out of scope.
if let Ok(mut queue) = maybe_queue {
// queue is a MutexGuard<VecDeque>, so this is like
// (*queue).pop_front()
// Returns Some(item) or None if there are no more items.
queue.pop_front()
// The function has returned, so queue goes out of scope and the
// mutex unlocks.
} else {
// There's a problem with the mutex.
panic!("WorkQueue::get_work() tried to lock a poisoned mutex");
}
}
// Both the controller (main thread) and possibly workers can use this
// function to add work to the queue.
/// Blocks the current thread until work can be added, then
/// adds that work to the end of the queue.
/// Returns the amount of work now in the queue.
///
/// # Panics
/// Panics if the underlying mutex became poisoned. This is exceedingly
/// unlikely.
pub fn add_work(&self, work: T) -> usize {
// As above, try to get a lock on the mutex.
if let Ok(mut queue) = self.inner.lock() {
// As above, we can use the MutexGuard<VecDeque<T>> to access
// the internal VecDeque.
queue.push_back(work);
self.new_jobs_tx.send(true);
// Now return the length of the queue.
queue.len()
} else {
panic!("WorkQueue::add_work() tried to lock a poisoned mutex");
}
}
}
impl WorkController {
pub fn new() -> WorkController {
let (new_jobs_tx, new_jobs_rx) = chan::async();
// Create a new work queue to keep track of what work needs to be done.
// Note that the queue is internally mutable (or, rather, the Mutex is),
// but this binding doesn't need to be mutable. This isn't unsound because
// the Mutex ensures at runtime that no two references can be used;
// therefore no mutation can occur at the same time as aliasing.
let queue: WorkQueue<Work> = WorkQueue::new(new_jobs_tx);
// Create a MPSC (Multiple Producer, Single Consumer) channel. Every worker
// is a producer, the main thread is a consumer; the producers put their
// work into the channel when it's done.
let (results_tx, results_rx) = chan::async();
// Create a SyncFlag to share whether or not there are more jobs to be done.
let (thread_end_tx, thread_end_rx) = chan::sync(::std::mem::size_of::<bool>());
// This Vec will hold thread join handles to allow us to not exit while work
// is still being done. These handles provide a .join() method which blocks
// the current thread until the thread referred to by the handle exits.
let mut threads = Vec::new();
for thread_num in 0..MAX_WORKER {
// Get a reference to the queue for the thread to use
// .clone() here doesn't clone the actual queue data, but rather the
// internal Arc produces a new reference for use in the new queue
// instance.
let thread_queue = queue.clone();
// Similarly, create a new transmitter for the thread to use
let thread_results_tx = results_tx.clone();
let thread_end_rx = thread_end_rx.clone();
let new_jobs_rx = new_jobs_rx.clone();
// thread::spawn takes a closure (an anonymous function that "closes"
// over its environment). The move keyword means it takes ownership of
// those variables, meaning they can't be used again in the main thread.
let handle = thread::spawn(move || {
// A varaible to keep track of how much work was done.
let mut work_done = 0;
'work_loop: loop {
// Loop while there's expected to be work, looking for work.
chan_select! {
thread_end_rx.recv() -> _ => {
break 'work_loop;
},
new_jobs_rx.recv() -> _ => {
// If work is available, do that work.
while let Some(work) = thread_queue.get_work() {
// Do some work.
work.compute();
// Record that some work was done.
work_done += 1;
// Send the work and the result of that work.
//
// Sending could fail. If so, there's no use in
// doing any more work, so abort.
thread_results_tx.send(true);
// Signal to the operating system that now is a good time
// to give another thread a chance to run.
//
// This isn't strictly necessary - the OS can preemptively
// switch between threads, without asking - but it helps make
// sure that other threads do get a chance to get some work.
std::thread::yield_now();
}
continue 'work_loop;
},
}
}
// Report the amount of work done.
eprintln!("Thread {} did {} jobs.", thread_num, work_done);
});
// Add the handle for the newly spawned thread to the list of handles
threads.push(handle);
}
WorkController {
queue,
thread_end_tx,
results: Some(results_rx),
threads,
}
}
}
/*
pub fn add_jobkk
println!("Adding jobs to the queue.");
// Variables to keep track of the number of jobs we expect to do.
let mut jobs_remaining = 0;
let mut jobs_total = 0;
// Just add some numbers to the queue.
// These numbers will be passed into fib(), so they need to stay pretty
// small.
for work in 0..90 {
// Add each one several times.
for _ in 0..100 {
jobs_remaining = queue.add_work(work);
jobs_total += 1;
}
}
// Report that some jobs were inserted, and how many are left to be done.
// This is interesting because the workers have been taking jobs out of the queue
// the whole time the control thread has been putting them in!
//
// Try removing the use of std::thread::yield_now() in the thread closure.
// You'll probably (depending on your system) notice that the number remaining
// after insertion goes way up. That's because the operating system is usually
// (not always, but usually) fairly conservative about interrupting a thread
// that is actually doing work.
//
// Similarly, if you add a call to yield_now() in the loop above, you'll see the
// number remaining probably drop to 1 or 2. This can also change depending on
// how optimized the output code is - try `cargo run --release` vs `cargo run`.
//
// This inconsistency should drive home to you that you as the programmer can't
// make any assumptions at all about when and in what order things will happen
// in parallel code unless you use thread control primatives as demonstrated
// in this program.
println!("Total of {} jobs inserted into the queue ({} remaining at this time).",
jobs_total,
jobs_remaining);
// Get completed work from the channel while there's work to be done.
while jobs_total > 0 {
match results_rx.recv() {
// If the control thread successfully receives, a job was completed.
Ok(_) => { jobs_total -= 1 },
// If the control thread is the one left standing, that's pretty
// problematic.
Err(_) => {panic!("All workers died unexpectedly.");}
}
}
*/