Poll all parse workers on startup

embed
Manos Pitsidianakis 2018-08-06 13:33:10 +03:00
parent f2a646158d
commit 3f35b69ff1
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
6 changed files with 133 additions and 85 deletions

View File

@ -67,7 +67,6 @@ impl<T> Async<T> {
Some(AsyncStatus::Finished) => {
},
Some(a) => {
eprintln!("async got {:?}", a);
return Ok(a);
}
_ => {
@ -79,7 +78,6 @@ impl<T> Async<T> {
}
let v = self.worker.take().unwrap().join().unwrap();
self.value = Some(v);
eprintln!("worker joined");
return Ok(AsyncStatus::Finished);
}
}

View File

@ -33,7 +33,7 @@ pub struct Account {
name: String,
folders: Vec<Option<Result<Mailbox>>>,
workers: Vec<Worker>,
pub workers: Vec<Worker>,
sent_folder: Option<usize>,

View File

@ -137,7 +137,6 @@ impl BackendOp for MaildirOp {
if !(flags & Flag::TRASHED).is_empty() {
new_name.push('T');
}
eprintln!("new name is {}", new_name);
fs::rename(&self.path, &new_name)?;
envelope.set_operation_token(
@ -191,9 +190,9 @@ impl MailBackend for MaildirType {
hasher.write(path.as_bytes());
sender.send(RefreshEvent {
folder: format!(
"{}", path
),
hash: hasher.finish(),
"{}", path
),
hash: hasher.finish(),
});
}
_ => {}
@ -202,7 +201,7 @@ impl MailBackend for MaildirType {
}
}
})
.unwrap();
.unwrap();
}
}
@ -220,9 +219,9 @@ impl MaildirType {
p.push(d);
if !p.is_dir() {
return Err(MeliError::new(format!(
"{} is not a valid maildir folder",
path
)));
"{} is not a valid maildir folder",
path
)));
}
p.pop();
}
@ -235,67 +234,67 @@ impl MaildirType {
// TODO: Avoid clone
let folder = folder.clone();
thread::Builder::new()
.name(format!("parsing {:?}", folder))
.spawn(move || {
MaildirType::is_valid(&folder)?;
let path = folder.path();
let mut path = PathBuf::from(path);
path.push("cur");
let iter = path.read_dir()?;
let count = path.read_dir()?.count();
let mut files: Vec<String> = Vec::with_capacity(count);
let mut r = Vec::with_capacity(count);
for e in iter {
let e = e.and_then(|x| {
let path = x.path();
Ok(path.to_str().unwrap().to_string())
})?;
files.push(e);
}
let mut threads = Vec::with_capacity(cores);
if !files.is_empty() {
crossbeam::scope(|scope| {
let chunk_size = if count / cores > 0 {
count / cores
} else {
count
};
for chunk in files.chunks(chunk_size) {
let mut tx = tx.clone();
let s = scope.spawn(move || {
let len = chunk.len();
let size = if len <= 100 { 100 } else { (len / 100) * 100};
let mut local_r: Vec<Envelope> = Vec::with_capacity(chunk.len());
for c in chunk.chunks(size) {
let len = c.len();
for e in c {
let e_copy = e.to_string();
if let Some(mut e) =
Envelope::from_token(Box::new(BackendOpGenerator::new(Box::new(
move || Box::new(MaildirOp::new(e_copy.clone())),
)))) {
if e.populate_headers().is_err() {
continue;
thread::Builder::new()
.name(format!("parsing {:?}", folder))
.spawn(move || {
MaildirType::is_valid(&folder)?;
let path = folder.path();
let mut path = PathBuf::from(path);
path.push("cur");
let iter = path.read_dir()?;
let count = path.read_dir()?.count();
let mut files: Vec<String> = Vec::with_capacity(count);
let mut r = Vec::with_capacity(count);
for e in iter {
let e = e.and_then(|x| {
let path = x.path();
Ok(path.to_str().unwrap().to_string())
})?;
files.push(e);
}
let mut threads = Vec::with_capacity(cores);
if !files.is_empty() {
crossbeam::scope(|scope| {
let chunk_size = if count / cores > 0 {
count / cores
} else {
count
};
for chunk in files.chunks(chunk_size) {
let mut tx = tx.clone();
let s = scope.spawn(move || {
let len = chunk.len();
let size = if len <= 100 { 100 } else { (len / 100) * 100};
let mut local_r: Vec<Envelope> = Vec::with_capacity(chunk.len());
for c in chunk.chunks(size) {
let len = c.len();
for e in c {
let e_copy = e.to_string();
if let Some(mut e) =
Envelope::from_token(Box::new(BackendOpGenerator::new(Box::new(
move || Box::new(MaildirOp::new(e_copy.clone())),
)))) {
if e.populate_headers().is_err() {
continue;
}
local_r.push(e);
}
}
local_r.push(e);
tx.send(AsyncStatus::ProgressReport(len));
}
local_r
});
threads.push(s);
}
tx.send(AsyncStatus::ProgressReport(len));
}
local_r
});
threads.push(s);
}
});
}
for t in threads {
let mut result = t.join();
r.append(&mut result);
}
tx.send(AsyncStatus::Finished);
Ok(r)
}).unwrap()
});
}
for t in threads {
let mut result = t.join();
r.append(&mut result);
}
tx.send(AsyncStatus::Finished);
Ok(r)
}).unwrap()
};
w.build(handle)
}

View File

@ -59,7 +59,7 @@ fn make_input_thread(
sx.send(ThreadEvent::Input(k));
},
|| {
sx.send(ThreadEvent::UIEventType(UIEventType::ChangeMode(
sx.send(ThreadEvent::UIEvent(UIEventType::ChangeMode(
UIMode::Fork,
)));
},
@ -189,11 +189,29 @@ fn main() {
state.redraw();
/* Don't handle this yet. */
},
ThreadEvent::UIEventType(UIEventType::ChangeMode(f)) => {
ThreadEvent::UIEvent(UIEventType::ChangeMode(f)) => {
state.mode = f;
break 'inner; // `goto` 'reap loop, and wait on child.
}
ThreadEvent::UIEventType(e) => {
ThreadEvent::UIEvent(UIEventType::StartupCheck) => {
let mut flag = false;
for account in &mut state.context.accounts {
let len = account.len();
for i in 0..len {
match account.status(i) {
Ok(()) => { },
Err(_) => {
flag |= true;
}
}
}
}
if !flag {
state.finish_startup();
}
}
ThreadEvent::UIEvent(e) => {
state.rcv_event(UIEvent { id: 0, event_type: e});
state.render();
},

View File

@ -64,14 +64,13 @@ impl MailListing {
// Get mailbox as a reference.
//
loop {
eprintln!("loop round");
match context.accounts[self.cursor_pos.0].status(self.cursor_pos.1) {
Ok(()) => { break; },
Err(a) => {
eprintln!("status returned {:?}", a);
match context.accounts[self.cursor_pos.0].status(self.cursor_pos.1) {
Ok(()) => { break; },
Err(_) => {
// TODO: Show progress visually
}
}
}
}
let mailbox = &mut context.accounts[self.cursor_pos.0][self.cursor_pos.1]
.as_ref()
.unwrap();
@ -455,8 +454,7 @@ impl Component for MailListing {
&mut mailbox.collection[idx]
};
if !envelope.is_seen() {
eprintln!("setting seen");
envelope.set_seen();
envelope.set_seen().unwrap();
true
} else {
false
@ -464,7 +462,6 @@ impl Component for MailListing {
}
};
if must_highlight {
eprintln!("must highlight");
self.highlight_line_self(idx, context);
}
let mid = get_y(upper_left) + total_rows - bottom_entity_rows;
@ -659,13 +656,12 @@ impl Component for MailListing {
return;
},
Action::ViewMailbox(idx) => {
eprintln!("listing got viewmailbox({})", idx);
self.new_cursor_pos.1 = *idx;
self.dirty = true;
self.refresh_mailbox(context);
return;
}
_ => {},
},
//_ => {},
},
_ => {}
}

View File

@ -55,6 +55,8 @@ use melib::*;
use std::collections::VecDeque;
use std::fmt;
use std::io::Write;
use std::thread;
use std::time;
extern crate termion;
use termion::event::Key as TermionKey;
use termion::input::TermRead;
@ -77,7 +79,7 @@ pub enum ThreadEvent {
RefreshMailbox {
hash: u64,
},
UIEventType(UIEventType),
UIEvent(UIEventType),
//Decode { _ }, // For gpg2 signature check
}
@ -110,6 +112,8 @@ pub enum UIEventType {
Action(Action),
StatusNotification(String),
MailboxUpdate((usize, usize)),
StartupCheck,
}
/// An event passed from `State` to its Entities.
@ -186,6 +190,8 @@ pub struct State<W: Write> {
sender: Sender<ThreadEvent>,
entities: Vec<Entity>,
pub context: Context,
startup_thread: Option<(chan::Sender<bool>, thread::JoinHandle<()>)>,
}
impl<W: Write> Drop for State<W> {
@ -222,6 +228,25 @@ impl State<std::io::Stdout> {
.map(|(n, a_s)| Account::new(n.to_string(), a_s.clone(), &backends))
.collect();
accounts.sort_by(|a, b| a.name().cmp(&b.name()));
let (startup_tx, startup_rx) = chan::async();
let startup_thread = {
let sender = sender.clone();
thread::Builder::new()
.name("startup-thread".to_string())
.spawn(move || {
let dur = time::Duration::from_millis(100);
loop {
chan_select! {
default => {},
startup_rx.recv() -> _ => {
return;
}
}
sender.send(ThreadEvent::UIEvent(UIEventType::StartupCheck));
thread::sleep(dur);
}
}).unwrap()
};
let mut s = State {
cols: cols,
rows: rows,
@ -242,6 +267,7 @@ impl State<std::io::Stdout> {
input_thread: input_thread,
},
startup_thread: Some((startup_tx, startup_thread)),
};
write!(
s.stdout(),
@ -259,6 +285,17 @@ impl State<std::io::Stdout> {
}
s
}
pub fn finish_startup(&mut self) {
// TODO: Encode startup process with the type system if possible
if self.startup_thread.is_none() {
return;
}
{
let (tx, handle) = self.startup_thread.take().unwrap();
tx.send(true);
handle.join().unwrap();
}
}
pub fn to_main_screen(&mut self) {
write!(
self.stdout(),