Async loading of mailboxes on startup

embed
Manos Pitsidianakis 2018-08-03 13:46:08 +03:00
parent 4e5721563e
commit b21d30c2ef
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
16 changed files with 324 additions and 83 deletions

View File

@ -20,3 +20,4 @@ data-encoding = "2.1.1"
encoding = "0.2.33"
bitflags = "1.0"
termion = "1.5.1"
chan = "0.1.21"

86
melib/src/async.rs 100644
View File

@ -0,0 +1,86 @@
use chan;
use std::thread;
#[derive(Debug)]
pub enum AsyncStatus {
NoUpdate,
Finished,
ProgressReport(usize),
}
#[derive(Debug)]
pub struct AsyncBuilder {
tx: chan::Sender<AsyncStatus>,
rx: chan::Receiver<AsyncStatus>,
}
#[derive(Debug)]
pub struct Async<T> {
value: Option<T>,
worker: Option<thread::JoinHandle<T>>,
tx: chan::Sender<AsyncStatus>,
rx: chan::Receiver<AsyncStatus>,
}
impl AsyncBuilder {
pub fn new() -> Self {
let (sender, receiver) = chan::sync(::std::mem::size_of::<AsyncStatus>());
AsyncBuilder {
tx: sender,
rx: receiver,
}
}
pub fn tx(&mut self) -> chan::Sender<AsyncStatus> {
self.tx.clone()
}
pub fn rx(&mut self) -> chan::Receiver<AsyncStatus> {
self.rx.clone()
}
pub fn build<T: Clone>(self, worker: thread::JoinHandle<T>) -> Async<T> {
Async {
worker: Some(worker),
value: None,
tx: self.tx,
rx: self.rx,
}
}
}
impl<T> Async<T> {
pub fn extract(self) -> T {
self.value.unwrap()
}
pub fn poll(&mut self) -> Result<AsyncStatus, ()> {
if self.value.is_some() {
return Ok(AsyncStatus::Finished);
}
//self.tx.send(true);
let rx = &self.rx;
chan_select! {
default => {
return Ok(AsyncStatus::NoUpdate);
},
rx.recv() -> r => {
match r {
Some(AsyncStatus::Finished) => {
},
Some(a) => {
eprintln!("async got {:?}", a);
return Ok(a);
}
_ => {
return Err(());
},
}
},
}
let v = self.worker.take().unwrap().join().unwrap();
self.value = Some(v);
eprintln!("worker joined");
return Ok(AsyncStatus::Finished);
}
}

View File

@ -111,7 +111,8 @@ impl FileSettings {
let mut s = Config::new();
let s = s.merge(File::new(config_path.to_str().unwrap(), FileFormat::Toml));
// TODO: Return result
// No point in returning without a config file.
// TODO: Error and exit instead of panic.
s.unwrap().deserialize().unwrap()
}
}

View File

@ -21,6 +21,8 @@
pub mod conf;
pub mod error;
pub mod mailbox;
pub mod utilities;
pub mod async;
#[macro_use]
extern crate serde_derive;
@ -31,6 +33,8 @@ extern crate chrono;
extern crate data_encoding;
extern crate encoding;
extern crate memmap;
#[macro_use]
extern crate chan;
#[macro_use]
extern crate bitflags;
@ -41,3 +45,4 @@ pub use mailbox::*;
pub use error::{MeliError, Result};
pub use mailbox::backends::{Backends, RefreshEvent, RefreshEventConsumer};
pub use mailbox::email::{Envelope, Flag};
pub use utilities::*;

View File

@ -22,13 +22,19 @@
use conf::{AccountSettings, Folder};
use mailbox::backends::{Backends, RefreshEventConsumer};
use mailbox::*;
use async::*;
use std::ops::{Index, IndexMut};
use std::result;
pub type Worker = Option<Async<Result<Vec<Envelope>>>>;
#[derive(Debug)]
pub struct Account {
name: String,
folders: Vec<Option<Result<Mailbox>>>,
workers: Vec<Worker>,
sent_folder: Option<usize>,
pub settings: AccountSettings,
@ -43,13 +49,17 @@ impl Account {
.iter()
.position(|x| *x.path() == settings.sent_folder);
let mut folders = Vec::with_capacity(settings.folders.len());
for _ in 0..settings.folders.len() {
folders.push(None);
}
let mut workers = Vec::new();
let backend = backends.get(settings.format());
for f in &settings.folders {
folders.push(None);
let mut handle = backend.get(&f);
workers.push(Some(handle));
}
Account {
name: name,
folders: folders,
workers: workers,
sent_folder: sent_folder,
@ -71,24 +81,16 @@ impl Account {
pub fn name(&self) -> &str {
&self.name
}
}
impl Index<usize> for Account {
type Output = Option<Result<Mailbox>>;
fn index(&self, index: usize) -> &Option<Result<Mailbox>> {
&self.folders[index]
pub fn workers(&mut self) -> &mut Vec<Worker> {
&mut self.workers
}
}
impl IndexMut<usize> for Account {
fn index_mut(&mut self, index: usize) -> &mut Option<Result<Mailbox>> {
if self.folders[index].is_none() {
fn load_mailbox(&mut self, index: usize, envelopes: Result<Vec<Envelope>>) -> () {
let folder = &self.settings.folders[index];
if self.sent_folder.is_some() {
let id = self.sent_folder.unwrap();
if id == index {
self.folders[index] =
Some(Mailbox::new(folder, &None, self.backend.get(&folder)));
Some(Mailbox::new(folder, &None, envelopes));
} else {
let (sent, cur) = {
let ptr = self.folders.as_mut_ptr();
@ -102,14 +104,54 @@ impl IndexMut<usize> for Account {
};
let sent_path = &self.settings.folders[id];
if sent[0].is_none() {
sent[0] = Some(Mailbox::new(sent_path, &None, self.backend.get(&folder)));
sent[0] = Some(Mailbox::new(sent_path, &None, envelopes.clone()));
}
cur[0] = Some(Mailbox::new(folder, &sent[0], self.backend.get(folder)));
cur[0] = Some(Mailbox::new(folder, &sent[0], envelopes));
}
} else {
self.folders[index] = Some(Mailbox::new(folder, &None, self.backend.get(&folder)));
self.folders[index] = Some(Mailbox::new(folder, &None, envelopes));
};
}
&mut self.folders[index]
}
pub fn status(&mut self, index: usize) -> result::Result<(), usize> {
match self.workers[index].as_mut() {
None => { return Ok(()); },
Some(ref mut w) => {
match w.poll() {
Ok(AsyncStatus::NoUpdate) => {
return Err(0);
},
Ok(AsyncStatus::Finished) => {
},
Ok(AsyncStatus::ProgressReport(n)) => {
return Err(n);
},
a => {
eprintln!("{:?}", a);
return Err(0);
}
}
},
};
let m = self.workers[index].take().unwrap().extract();
self.load_mailbox(index, m);
self.workers[index] = None;
Ok(())
}
}
impl Index<usize> for Account {
type Output = Result<Mailbox>;
fn index(&self, index: usize) -> &Result<Mailbox> {
&self.folders[index].as_ref().expect("BUG: Requested mailbox that is not yet available.")
}
}
/// Will panic if mailbox hasn't loaded, ask `status()` first.
impl IndexMut<usize> for Account {
fn index_mut(&mut self, index: usize) -> &mut Result<Mailbox> {
self.folders[index].as_mut().expect("BUG: Requested mailbox that is not yet available.")
}
}

View File

@ -21,6 +21,7 @@
use conf::Folder;
use error::Result;
use async::*;
use mailbox::backends::{BackendOp, MailBackend, RefreshEventConsumer};
use mailbox::email::{Envelope, Flag};
@ -57,7 +58,7 @@ impl BackendOp for ImapOp {
pub struct ImapType {}
impl MailBackend for ImapType {
fn get(&self, _folder: &Folder) -> Result<Vec<Envelope>> {
fn get(&self, _folder: &Folder) -> Async<Result<Vec<Envelope>>> {
unimplemented!();
}
fn watch(&self, _sender: RefreshEventConsumer, _folders: &[Folder]) -> () {

View File

@ -19,6 +19,7 @@
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use async::*;
use conf::Folder;
use error::{MeliError, Result};
use mailbox::backends::{
@ -40,6 +41,8 @@ use std::thread;
extern crate crossbeam;
use memmap::{Mmap, Protection};
use std::path::PathBuf;
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;
/// `BackendOp` implementor for Maildir
#[derive(Debug, Default)]
@ -116,10 +119,11 @@ impl BackendOp for MaildirOp {
#[derive(Debug)]
pub struct MaildirType {
path: String,
idx: (usize, usize),
}
impl MailBackend for MaildirType {
fn get(&self, folder: &Folder) -> Result<Vec<Envelope>> {
fn get(&self, folder: &Folder) -> Async<Result<Vec<Envelope>>> {
self.multicore(4, folder)
}
fn watch(&self, sender: RefreshEventConsumer, folders: &[Folder]) -> () {
@ -145,11 +149,15 @@ impl MailBackend for MaildirType {
match rx.recv() {
Ok(event) => match event {
DebouncedEvent::Create(pathbuf) => {
let path = pathbuf.parent().unwrap().to_str().unwrap();
let mut hasher = DefaultHasher::new();
hasher.write(path.as_bytes());
sender.send(RefreshEvent {
folder: format!(
"{}",
pathbuf.parent().unwrap().to_str().unwrap()
"{}", path
),
hash: hasher.finish(),
});
}
_ => {}
@ -163,9 +171,10 @@ impl MailBackend for MaildirType {
}
impl MaildirType {
pub fn new(path: &str) -> Self {
pub fn new(path: &str, idx: (usize, usize)) -> Self {
MaildirType {
path: path.to_string(),
idx: idx,
}
}
fn is_valid(f: &Folder) -> Result<()> {
@ -183,8 +192,19 @@ impl MaildirType {
}
Ok(())
}
pub fn multicore(&self, cores: usize, folder: &Folder) -> Result<Vec<Envelope>> {
MaildirType::is_valid(folder)?;
pub fn multicore(&self, cores: usize, folder: &Folder) -> Async<Result<Vec<Envelope>>> {
let mut w = AsyncBuilder::new();
let handle = {
let tx = w.tx();
// TODO: Avoid clone
let folder = folder.clone();
thread::Builder::new()
.name(format!("parsing {:?}", folder))
.spawn(move || {
MaildirType::is_valid(&folder)?;
let path = folder.path();
let mut path = PathBuf::from(path);
path.push("cur");
@ -208,19 +228,26 @@ impl MaildirType {
count
};
for chunk in files.chunks(chunk_size) {
let mut tx = tx.clone();
let s = scope.spawn(move || {
let len = chunk.len();
let size = if len <= 100 { 100 } else { (len / 100) * 100};
let mut local_r: Vec<Envelope> = Vec::with_capacity(chunk.len());
for e in chunk {
let e_copy = e.to_string();
if let Some(mut e) =
Envelope::from_token(Box::new(BackendOpGenerator::new(Box::new(
move || Box::new(MaildirOp::new(e_copy.clone())),
)))) {
if e.populate_headers().is_err() {
continue;
}
local_r.push(e);
for c in chunk.chunks(size) {
let len = c.len();
for e in c {
let e_copy = e.to_string();
if let Some(mut e) =
Envelope::from_token(Box::new(BackendOpGenerator::new(Box::new(
move || Box::new(MaildirOp::new(e_copy.clone())),
)))) {
if e.populate_headers().is_err() {
continue;
}
local_r.push(e);
}
}
tx.send(AsyncStatus::ProgressReport(len));
}
local_r
});
@ -232,6 +259,10 @@ impl MaildirType {
let mut result = t.join();
r.append(&mut result);
}
tx.send(AsyncStatus::Finished);
Ok(r)
}).unwrap()
};
w.build(handle)
}
}

View File

@ -21,6 +21,7 @@
use conf::Folder;
use error::Result;
use async::*;
use mailbox::backends::{BackendOp, MailBackend, RefreshEventConsumer};
use mailbox::email::{Envelope, Flag};
@ -57,7 +58,7 @@ impl BackendOp for MboxOp {
pub struct MboxType {}
impl MailBackend for MboxType {
fn get(&self, _folder: &Folder) -> Result<Vec<Envelope>> {
fn get(&self, _folder: &Folder) -> Async<Result<Vec<Envelope>>> {
unimplemented!();
}
fn watch(&self, _sender: RefreshEventConsumer, _folders: &[Folder]) -> () {

View File

@ -24,6 +24,7 @@ pub mod mbox;
use conf::Folder;
use error::Result;
use async::*;
use mailbox::backends::imap::ImapType;
use mailbox::backends::maildir::MaildirType;
use mailbox::backends::mbox::MboxType;
@ -47,7 +48,7 @@ impl Backends {
};
b.register(
"maildir".to_string(),
Box::new(|| Box::new(MaildirType::new(""))),
Box::new(|| Box::new(MaildirType::new("", (0, 0)))),
);
b.register("mbox".to_string(), Box::new(|| Box::new(MboxType::new(""))));
b.register("imap".to_string(), Box::new(|| Box::new(ImapType::new(""))));
@ -69,6 +70,7 @@ impl Backends {
}
pub struct RefreshEvent {
pub hash: u64,
pub folder: String,
}
@ -87,7 +89,7 @@ impl RefreshEventConsumer {
}
}
pub trait MailBackend: ::std::fmt::Debug {
fn get(&self, folder: &Folder) -> Result<Vec<Envelope>>;
fn get(&self, folder: &Folder) -> Async<Result<Vec<Envelope>>>;
fn watch(&self, sender: RefreshEventConsumer, folders: &[Folder]) -> ();
//fn new(folders: &Vec<String>) -> Box<Self>;
//login function

View File

@ -0,0 +1,11 @@
pub trait ProgressTracker {
fn new(s: String, total_work: usize) -> Self;
fn add_work(&mut self, n: usize) -> ();
fn set_work(&mut self, n: usize) -> ();
fn work(&mut self, n: usize) -> ();
fn percentage(&self) -> usize;
fn description(&self) -> &str;
}

View File

@ -68,6 +68,7 @@ fn make_input_thread(
})
.unwrap()
}
fn main() {
/* Lock all stdio outs */
//let _stdout = stdout();
@ -182,8 +183,9 @@ fn main() {
},
}
},
ThreadEvent::RefreshMailbox { name : n } => {
state.rcv_event(UIEvent { id: 0, event_type: UIEventType::Notification(n.clone())});
ThreadEvent::RefreshMailbox { hash : h } => {
eprintln!("got refresh mailbox hash {:x}", h);
//state.rcv_event(UIEvent { id: 0, event_type: UIEventType::Notification(n.clone())});
state.redraw();
/* Don't handle this yet. */
},

View File

@ -1,9 +1,10 @@
use super::*;
const MAX_COLS: usize = 500;
/// A list of all mail (`Envelope`s) in a `Mailbox`. On `\n` it opens the `Envelope` content in a
/// `Pager`.
/// `MailView`.
pub struct MailListing {
/// (x, y, z): x is accounts, y is folders, z is index inside a folder.
cursor_pos: (usize, usize, usize),
@ -24,7 +25,7 @@ impl MailListing {
/* TODO: Make this configurable */
fn make_entry_string(e: &Envelope, idx: usize) -> String {
format!(
"{} {} {:.85}",
"{} {} {}",
idx,
&e.datetime().format("%Y-%m-%d %H:%M:%S").to_string(),
e.subject()
@ -55,17 +56,25 @@ impl MailListing {
let threaded = context.accounts[self.cursor_pos.0]
.runtime_settings
.threaded;
// Get mailbox as a reference.
let mailbox = &mut context.accounts[self.cursor_pos.0][self.cursor_pos.1]
.as_ref()
.unwrap()
.as_ref()
.unwrap();
// Inform State that we changed the current folder view.
context.replies.push_back(UIEvent {
id: 0,
event_type: UIEventType::RefreshMailbox((self.cursor_pos.0, self.cursor_pos.1)),
});
// Get mailbox as a reference.
//
loop {
eprintln!("loop round");
match context.accounts[self.cursor_pos.0].status(self.cursor_pos.1) {
Ok(()) => { break; },
Err(a) => {
eprintln!("status returned {:?}", a);
}
}
}
let mailbox = &mut context.accounts[self.cursor_pos.0][self.cursor_pos.1]
.as_ref()
.unwrap();
self.length = if threaded {
mailbox.threaded_collection.len()
@ -81,7 +90,7 @@ impl MailListing {
Color::Default,
((0, 0), (MAX_COLS - 1, 0)),
true,
);
);
self.content = content;
return;
}
@ -90,7 +99,7 @@ impl MailListing {
if threaded {
let mut indentations: Vec<bool> = Vec::with_capacity(6);
let mut thread_idx = 0; // needed for alternate thread colors
/* Draw threaded view. */
/* Draw threaded view. */
let mut iter = mailbox.threaded_collection.iter().enumerate().peekable();
let len = mailbox
.threaded_collection
@ -143,13 +152,13 @@ impl MailListing {
container,
&indentations,
len,
),
&mut content,
fg_color,
bg_color,
((0, idx), (MAX_COLS - 1, idx)),
false,
);
),
&mut content,
fg_color,
bg_color,
((0, idx), (MAX_COLS - 1, idx)),
false,
);
for x in x..MAX_COLS {
content[(x, idx)].set_ch(' ');
content[(x, idx)].set_bg(bg_color);
@ -199,7 +208,7 @@ impl MailListing {
bg_color,
((0, y), (MAX_COLS - 1, y)),
false,
);
);
for x in x..MAX_COLS {
content[(x, y)].set_ch(' ');
@ -217,8 +226,6 @@ impl MailListing {
.runtime_settings
.threaded;
let mailbox = &context.accounts[self.cursor_pos.0][self.cursor_pos.1]
.as_ref()
.unwrap()
.as_ref()
.unwrap();
let envelope: &Envelope = if threaded {
@ -563,6 +570,12 @@ impl Component for MailListing {
self.dirty = true;
self.view = None;
}
UIEventType::MailboxUpdate((ref idxa, ref idxf)) => {
if *idxa == self.new_cursor_pos.1 && *idxf == self.new_cursor_pos.0 {
self.refresh_mailbox(context);
self.dirty = true;
}
}
UIEventType::ChangeMode(UIMode::Normal) => {
self.dirty = true;
}

View File

@ -61,8 +61,6 @@ impl Component for MailView {
.runtime_settings
.threaded;
let mailbox = &mut context.accounts[self.coordinates.0][self.coordinates.1]
.as_ref()
.unwrap()
.as_ref()
.unwrap();
let envelope_idx: usize = if threaded {
@ -157,8 +155,6 @@ impl Component for MailView {
let buf = {
let mailbox_idx = self.coordinates; // coordinates are mailbox idxs
let mailbox = &mut context.accounts[mailbox_idx.0][mailbox_idx.1]
.as_ref()
.unwrap()
.as_ref()
.unwrap();
let envelope: &Envelope = &mailbox.collection[envelope_idx];
@ -268,8 +264,6 @@ impl Component for MailView {
.runtime_settings
.threaded;
let mailbox = &mut context.accounts[self.coordinates.0][self.coordinates.1]
.as_ref()
.unwrap()
.as_ref()
.unwrap();
let envelope_idx: usize = if threaded {
@ -339,8 +333,6 @@ impl Component for MailView {
.runtime_settings
.threaded;
let mailbox = &mut context.accounts[self.coordinates.0][self.coordinates.1]
.as_ref()
.unwrap()
.as_ref()
.unwrap();
let envelope_idx: usize = if threaded {

View File

@ -31,7 +31,7 @@ pub mod notifications;
pub mod utilities;
pub use mail::*;
pub use utilities::*;
pub use self::utilities::*;
use super::cells::{CellBuffer, Color};
use super::position::Area;
@ -138,6 +138,14 @@ pub fn copy_area(grid_dest: &mut CellBuffer, grid_src: &CellBuffer, dest: Area,
}
let mut src_x = get_x(upper_left!(src));
let mut src_y = get_y(upper_left!(src));
let (cols, rows) = grid_src.size();
if src_x >= cols || src_y >= rows {
eprintln!(
"DEBUG: src area outside of grid_src in copy_area",
);
return;
}
for y in get_y(upper_left!(dest))..=get_y(bottom_right!(dest)) {
'for_x: for x in get_x(upper_left!(dest))..=get_x(bottom_right!(dest)) {

View File

@ -413,9 +413,13 @@ impl Component for StatusBar {
self.container.rcv_event(event, context);
match &event.event_type {
UIEventType::RefreshMailbox((ref idx_a, ref idx_f)) => {
match context.accounts[*idx_a].status(*idx_f) {
Ok(()) => {},
Err(_) => {
return;
}
}
let m = &context.accounts[*idx_a][*idx_f]
.as_ref()
.unwrap()
.as_ref()
.unwrap();
self.status = format!(
@ -483,3 +487,47 @@ impl Component for TextBox {
return;
}
}
pub struct Progress {
description: String,
total_work: usize,
finished: usize,
}
impl Progress {
pub fn new(s: String, total_work: usize) -> Self {
Progress {
description: s,
total_work: total_work,
finished: 0,
}
}
pub fn add_work(&mut self, n: usize) -> () {
if self.finished >= self.total_work {
return;
}
self.finished += n;
}
pub fn percentage(&self) -> usize {
if self.total_work > 0 {
100 * self.finished / self.total_work
} else {
0
}
}
pub fn description(&self) -> &str {
&self.description
}
}
impl Component for Progress {
fn draw(&mut self, _grid: &mut CellBuffer, _area: Area, _context: &mut Context) {
unimplemented!()
}
fn process_event(&mut self, _event: &UIEvent, _context: &mut Context) {
return;
}
}

View File

@ -75,7 +75,7 @@ pub enum ThreadEvent {
Input(Key),
/// A watched folder has been refreshed.
RefreshMailbox {
name: String,
hash: u64,
},
UIEventType(UIEventType),
//Decode { _ }, // For gpg2 signature check
@ -83,7 +83,7 @@ pub enum ThreadEvent {
impl From<RefreshEvent> for ThreadEvent {
fn from(event: RefreshEvent) -> Self {
ThreadEvent::RefreshMailbox { name: event.folder }
ThreadEvent::RefreshMailbox { hash: event.hash }
}
}
@ -109,6 +109,7 @@ pub enum UIEventType {
EditDraft(File),
Action(Action),
StatusNotification(String),
MailboxUpdate((usize, usize)),
}
/// An event passed from `State` to its Entities.
@ -248,7 +249,7 @@ impl State<std::io::Stdout> {
cursor::Hide,
clear::All,
cursor::Goto(1, 1)
).unwrap();
).unwrap();
s.flush();
for account in &mut s.context.accounts {
let sender = s.sender.clone();
@ -455,15 +456,11 @@ impl<W: Write> State<W> {
/// Tries to load a mailbox's content
pub fn refresh_mailbox(&mut self, account_idx: usize, folder_idx: usize) {
let flag = match &mut self.context.accounts[account_idx][folder_idx] {
Some(Ok(_)) => true,
Some(Err(e)) => {
Ok(_) => true,
Err(e) => {
eprintln!("error {:?}", e);
false
}
None => {
eprintln!("None");
false
}
};
if flag {
self.rcv_event(UIEvent {