Update crossbeam to 0.7.2 and remove chan

embed
Manos Pitsidianakis 2019-09-09 12:53:39 +03:00
parent ecb3fd7f3d
commit 81a55abc7c
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
14 changed files with 129 additions and 106 deletions

View File

@ -10,8 +10,8 @@ path = "src/bin.rs"
[dependencies] [dependencies]
xdg = "2.1.0" xdg = "2.1.0"
chan = "0.1.21" crossbeam = "0.7.2"
chan-signal = "0.3.1" signal-hook = "0.1.10"
nix = "*" nix = "*"
melib = { path = "melib", version = "*" } melib = { path = "melib", version = "*" }
ui = { path = "ui", version = "*" } ui = { path = "ui", version = "*" }

View File

@ -7,9 +7,8 @@ edition = "2018"
[dependencies] [dependencies]
bitflags = "1.0" bitflags = "1.0"
chan = "0.1.21"
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
crossbeam = "^0.3.0" crossbeam = "0.7.2"
data-encoding = "2.1.1" data-encoding = "2.1.1"
encoding = "0.2.33" encoding = "0.2.33"
fnv = "1.0.3" fnv = "1.0.3"

View File

@ -31,7 +31,11 @@
* can be extracted with `extract`. * can be extracted with `extract`.
*/ */
use chan; use crossbeam::{
bounded,
channel::{Receiver, Sender},
select,
};
use std::fmt; use std::fmt;
use std::sync::Arc; use std::sync::Arc;
@ -74,16 +78,16 @@ impl<T> fmt::Debug for AsyncStatus<T> {
/// A builder object for `Async<T>` /// A builder object for `Async<T>`
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct AsyncBuilder<T: Send + Sync> { pub struct AsyncBuilder<T: Send + Sync> {
tx: chan::Sender<AsyncStatus<T>>, tx: Sender<AsyncStatus<T>>,
rx: chan::Receiver<AsyncStatus<T>>, rx: Receiver<AsyncStatus<T>>,
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Async<T: Send + Sync> { pub struct Async<T: Send + Sync> {
work: Work, work: Work,
active: bool, active: bool,
tx: chan::Sender<AsyncStatus<T>>, tx: Sender<AsyncStatus<T>>,
rx: chan::Receiver<AsyncStatus<T>>, rx: Receiver<AsyncStatus<T>>,
} }
impl<T: Send + Sync> Default for AsyncBuilder<T> { impl<T: Send + Sync> Default for AsyncBuilder<T> {
@ -97,18 +101,18 @@ where
T: Send + Sync, T: Send + Sync,
{ {
pub fn new() -> Self { pub fn new() -> Self {
let (sender, receiver) = chan::sync(8 * ::std::mem::size_of::<AsyncStatus<T>>()); let (sender, receiver) = bounded(8 * ::std::mem::size_of::<AsyncStatus<T>>());
AsyncBuilder { AsyncBuilder {
tx: sender, tx: sender,
rx: receiver, rx: receiver,
} }
} }
/// Returns the sender object of the promise's channel. /// Returns the sender object of the promise's channel.
pub fn tx(&mut self) -> chan::Sender<AsyncStatus<T>> { pub fn tx(&mut self) -> Sender<AsyncStatus<T>> {
self.tx.clone() self.tx.clone()
} }
/// Returns the receiver object of the promise's channel. /// Returns the receiver object of the promise's channel.
pub fn rx(&mut self) -> chan::Receiver<AsyncStatus<T>> { pub fn rx(&mut self) -> Receiver<AsyncStatus<T>> {
self.rx.clone() self.rx.clone()
} }
/// Returns an `Async<T>` object that contains a `Thread` join handle that returns a `T` /// Returns an `Async<T>` object that contains a `Thread` join handle that returns a `T`
@ -135,11 +139,11 @@ where
} }
} }
/// Returns the sender object of the promise's channel. /// Returns the sender object of the promise's channel.
pub fn tx(&mut self) -> chan::Sender<AsyncStatus<T>> { pub fn tx(&mut self) -> Sender<AsyncStatus<T>> {
self.tx.clone() self.tx.clone()
} }
/// Returns the receiver object of the promise's channel. /// Returns the receiver object of the promise's channel.
pub fn rx(&mut self) -> chan::Receiver<AsyncStatus<T>> { pub fn rx(&mut self) -> Receiver<AsyncStatus<T>> {
self.rx.clone() self.rx.clone()
} }
/// Polls worker thread and returns result. /// Polls worker thread and returns result.
@ -149,20 +153,20 @@ where
} }
let rx = &self.rx; let rx = &self.rx;
chan_select! { select! {
rx.recv() -> r => { recv(rx) -> r => {
match r { match r {
Some(p @ AsyncStatus::Payload(_)) => { Ok(p @ AsyncStatus::Payload(_)) => {
return Ok(p); return Ok(p);
}, },
Some(f @ AsyncStatus::Finished) => { Ok(f @ AsyncStatus::Finished) => {
self.active = false; self.active = false;
return Ok(f); return Ok(f);
}, },
Some(a) => { Ok(a) => {
return Ok(a); return Ok(a);
} }
_ => { Err(_) => {
return Err(()); return Err(());
}, },
} }
@ -176,23 +180,23 @@ where
} }
let rx = &self.rx; let rx = &self.rx;
chan_select! { select! {
default => { default => {
return Ok(AsyncStatus::NoUpdate); return Ok(AsyncStatus::NoUpdate);
}, },
rx.recv() -> r => { recv(rx) -> r => {
match r { match r {
Some(p @ AsyncStatus::Payload(_)) => { Ok(p @ AsyncStatus::Payload(_)) => {
return Ok(p); return Ok(p);
}, },
Some(f @ AsyncStatus::Finished) => { Ok(f @ AsyncStatus::Finished) => {
self.active = false; self.active = false;
return Ok(f); return Ok(f);
}, },
Some(a) => { Ok(a) => {
return Ok(a); return Ok(a);
} }
_ => { Err(_) => {
return Err(()); return Err(());
}, },
} }

View File

@ -74,7 +74,7 @@ impl MailBackend for ImapType {
macro_rules! exit_on_error { macro_rules! exit_on_error {
($tx:expr,$($result:expr)+) => { ($tx:expr,$($result:expr)+) => {
$(if let Err(e) = $result { $(if let Err(e) = $result {
$tx.send(AsyncStatus::Payload(Err(e.into()))); $tx.send(AsyncStatus::Payload(Err(e.into()))).unwrap();
std::process::exit(1); std::process::exit(1);
})+ })+
}; };
@ -147,15 +147,15 @@ impl MailBackend for ImapType {
} }
Err(e) => { Err(e) => {
debug!(&e); debug!(&e);
tx.send(AsyncStatus::Payload(Err(e))); tx.send(AsyncStatus::Payload(Err(e))).unwrap();
} }
} }
exists = std::cmp::max(exists.saturating_sub(20000), 1); exists = std::cmp::max(exists.saturating_sub(20000), 1);
debug!("sending payload"); debug!("sending payload");
tx.send(AsyncStatus::Payload(Ok(envelopes))); tx.send(AsyncStatus::Payload(Ok(envelopes))).unwrap();
} }
drop(conn); drop(conn);
tx.send(AsyncStatus::Finished); tx.send(AsyncStatus::Finished).unwrap();
}; };
Box::new(closure) Box::new(closure)
}; };

View File

@ -706,9 +706,9 @@ impl MaildirType {
})?; })?;
files.push(e); files.push(e);
} }
let mut threads = Vec::with_capacity(cores);
if !files.is_empty() { if !files.is_empty() {
crossbeam::scope(|scope| { crossbeam::scope(|scope| {
let mut threads = Vec::with_capacity(cores);
let cache_dir = cache_dir.clone(); let cache_dir = cache_dir.clone();
let chunk_size = if count / cores > 0 { let chunk_size = if count / cores > 0 {
count / cores count / cores
@ -720,7 +720,7 @@ impl MaildirType {
let tx = tx.clone(); let tx = tx.clone();
let map = map.clone(); let map = map.clone();
let root_path = root_path.clone(); let root_path = root_path.clone();
let s = scope.builder().name(name.clone()).spawn(move || { let s = scope.builder().name(name.clone()).spawn(move |_| {
let len = chunk.len(); let len = chunk.len();
let size = if len <= 100 { 100 } else { (len / 100) * 100 }; let size = if len <= 100 { 100 } else { (len / 100) * 100 };
let mut local_r: Vec<Envelope> = let mut local_r: Vec<Envelope> =
@ -789,23 +789,24 @@ impl MaildirType {
continue; continue;
} }
} }
tx.send(AsyncStatus::ProgressReport(len)); tx.send(AsyncStatus::ProgressReport(len)).unwrap();
} }
local_r local_r
}); });
threads.push(s.unwrap()); threads.push(s.unwrap());
} }
}); for t in threads {
} let mut result = t.join().unwrap();
for t in threads { ret.append(&mut result);
let mut result = t.join(); }
ret.append(&mut result); })
.unwrap();
} }
Ok(ret) Ok(ret)
}; };
let result = thunk(); let result = thunk();
tx_final.send(AsyncStatus::Payload(result)); tx_final.send(AsyncStatus::Payload(result)).unwrap();
tx_final.send(AsyncStatus::Finished); tx_final.send(AsyncStatus::Finished).unwrap();
}; };
Box::new(closure) Box::new(closure)
}; };

View File

@ -384,7 +384,8 @@ impl MailBackend for MboxType {
{ {
Ok(f) => f, Ok(f) => f,
Err(e) => { Err(e) => {
tx.send(AsyncStatus::Payload(Err(MeliError::from(e)))); tx.send(AsyncStatus::Payload(Err(MeliError::from(e))))
.unwrap();
return; return;
} }
}; };
@ -392,7 +393,8 @@ impl MailBackend for MboxType {
let mut buf_reader = BufReader::new(file); let mut buf_reader = BufReader::new(file);
let mut contents = Vec::new(); let mut contents = Vec::new();
if let Err(e) = buf_reader.read_to_end(&mut contents) { if let Err(e) = buf_reader.read_to_end(&mut contents) {
tx.send(AsyncStatus::Payload(Err(MeliError::from(e)))); tx.send(AsyncStatus::Payload(Err(MeliError::from(e))))
.unwrap();
return; return;
}; };
@ -406,7 +408,7 @@ impl MailBackend for MboxType {
.and_modify(|f| f.content = contents); .and_modify(|f| f.content = contents);
} }
tx.send(AsyncStatus::Payload(payload)); tx.send(AsyncStatus::Payload(payload)).unwrap();
}; };
Box::new(closure) Box::new(closure)
}; };

View File

@ -109,8 +109,6 @@ extern crate chrono;
extern crate data_encoding; extern crate data_encoding;
extern crate encoding; extern crate encoding;
extern crate memmap; extern crate memmap;
#[macro_use]
extern crate chan;
#[macro_use] #[macro_use]
extern crate bitflags; extern crate bitflags;

View File

@ -39,15 +39,23 @@ use ui;
pub use melib::*; pub use melib::*;
pub use ui::*; pub use ui::*;
#[macro_use]
extern crate chan;
use chan_signal;
use chan_signal::Signal;
use nix; use nix;
use std::os::raw::c_int;
use xdg; use xdg;
fn notify(
signals: &[c_int],
) -> std::result::Result<crossbeam::channel::Receiver<c_int>, std::io::Error> {
let (s, r) = crossbeam::channel::bounded(100);
let signals = signal_hook::iterator::Signals::new(signals)?;
std::thread::spawn(move || {
for signal in signals.forever() {
s.send(signal).unwrap();
}
});
Ok(r)
}
macro_rules! error_and_exit { macro_rules! error_and_exit {
($($err:expr),*) => {{ ($($err:expr),*) => {{
eprintln!($($err),*); eprintln!($($err),*);
@ -63,7 +71,7 @@ struct CommandLineArguments {
version: bool, version: bool,
} }
fn main() { fn main() -> std::result::Result<(), std::io::Error> {
enum CommandLineFlags { enum CommandLineFlags {
CreateConfig, CreateConfig,
Config, Config,
@ -168,7 +176,12 @@ fn main() {
} }
/* Catch SIGWINCH to handle terminal resizing */ /* Catch SIGWINCH to handle terminal resizing */
let signal = chan_signal::notify(&[Signal::WINCH]); let signals = &[
/* Catch SIGWINCH to handle terminal resizing */
signal_hook::SIGWINCH,
];
let signal_recvr = notify(signals)?;
/* Create the application State. This is the 'System' part of an ECS architecture */ /* Create the application State. This is the 'System' part of an ECS architecture */
let mut state = State::new(); let mut state = State::new();
@ -206,8 +219,8 @@ fn main() {
state.redraw(); state.redraw();
/* Poll on all channels. Currently we have the input channel for stdin, watching events and the signal watcher. */ /* Poll on all channels. Currently we have the input channel for stdin, watching events and the signal watcher. */
chan_select! { crossbeam::select! {
receiver.recv() -> r => { recv(receiver) -> r => {
match debug!(r.unwrap()) { match debug!(r.unwrap()) {
ThreadEvent::Input(Key::Ctrl('z')) => { ThreadEvent::Input(Key::Ctrl('z')) => {
state.switch_to_main_screen(); state.switch_to_main_screen();
@ -290,16 +303,19 @@ fn main() {
}, },
} }
}, },
signal.recv() -> signal => { recv(signal_recvr) -> sig => {
if state.mode != UIMode::Fork { match sig.unwrap() {
if let Some(Signal::WINCH) = signal { signal_hook::SIGWINCH => {
state.update_size(); if state.mode != UIMode::Fork {
state.render(); state.update_size();
state.redraw(); state.render();
} state.redraw();
}
},
_ => {}
} }
}, },
worker_receiver.recv() -> _ => { recv(worker_receiver) -> _ => {
/* Some worker thread finished their job, acknowledge /* Some worker thread finished their job, acknowledge
* it and move on*/ * it and move on*/
}, },
@ -327,4 +343,5 @@ fn main() {
} }
} }
} }
Ok(())
} }

View File

@ -11,8 +11,7 @@ serde = "1.0.71"
serde_derive = "1.0.71" serde_derive = "1.0.71"
serde_json = "1.0" serde_json = "1.0"
toml = "0.5.3" toml = "0.5.3"
chan = "0.1.21" crossbeam = "0.7.2"
chan-signal = "0.3.1"
fnv = "1.0.3" # >:c fnv = "1.0.3" # >:c
linkify = "0.3.1" # >:c linkify = "0.3.1" # >:c
melib = { path = "../melib", version = "*" } melib = { path = "../melib", version = "*" }

View File

@ -341,18 +341,18 @@ impl Account {
debug!("LL"); debug!("LL");
match debug!(mailbox_handle.poll_block()) { match debug!(mailbox_handle.poll_block()) {
Ok(s @ AsyncStatus::Payload(_)) => { Ok(s @ AsyncStatus::Payload(_)) => {
our_tx.send(s); our_tx.send(s).unwrap();
debug!("notifying for {}", folder_hash); debug!("notifying for {}", folder_hash);
notify_fn.notify(folder_hash); notify_fn.notify(folder_hash);
} }
Ok(s @ AsyncStatus::Finished) => { Ok(s @ AsyncStatus::Finished) => {
our_tx.send(s); our_tx.send(s).unwrap();
notify_fn.notify(folder_hash); notify_fn.notify(folder_hash);
debug!("exiting"); debug!("exiting");
return; return;
} }
Ok(s) => { Ok(s) => {
our_tx.send(s); our_tx.send(s).unwrap();
} }
Err(_) => { Err(_) => {
debug!("poll error"); debug!("poll error");
@ -368,7 +368,7 @@ impl Account {
&mut self, &mut self,
event: RefreshEvent, event: RefreshEvent,
folder_hash: FolderHash, folder_hash: FolderHash,
sender: &chan::Sender<crate::types::ThreadEvent>, sender: &crossbeam::channel::Sender<crate::types::ThreadEvent>,
) -> Option<UIEvent> { ) -> Option<UIEvent> {
if !self.folders[&folder_hash].is_available() { if !self.folders[&folder_hash].is_available() {
self.event_queue.push_back((folder_hash, event)); self.event_queue.push_back((folder_hash, event));
@ -442,7 +442,7 @@ impl Account {
debug!("RefreshEvent Failure: {}", e.to_string()); debug!("RefreshEvent Failure: {}", e.to_string());
let sender = sender.clone(); let sender = sender.clone();
self.watch(RefreshEventConsumer::new(Box::new(move |r| { self.watch(RefreshEventConsumer::new(Box::new(move |r| {
sender.send(crate::types::ThreadEvent::from(r)); sender.send(crate::types::ThreadEvent::from(r)).unwrap();
}))); })));
} }
} }

View File

@ -30,9 +30,6 @@ extern crate notify_rust;
extern crate text_processing; extern crate text_processing;
#[macro_use] #[macro_use]
extern crate serde_derive; extern crate serde_derive;
#[macro_use]
extern crate chan;
extern crate chan_signal;
extern crate linkify; extern crate linkify;
extern crate uuid; extern crate uuid;

View File

@ -31,7 +31,7 @@ Input is received in the main loop from threads which listen on the stdin for us
use super::*; use super::*;
use melib::backends::{FolderHash, NotifyFn}; use melib::backends::{FolderHash, NotifyFn};
use chan::{Receiver, Sender}; use crossbeam::channel::{bounded, unbounded, Receiver, Sender};
use fnv::FnvHashMap; use fnv::FnvHashMap;
use std::env; use std::env;
use std::io::Write; use std::io::Write;
@ -58,10 +58,11 @@ impl InputHandler {
get_events( get_events(
stdin, stdin,
|k| { |k| {
tx.send(ThreadEvent::Input(k)); tx.send(ThreadEvent::Input(k)).unwrap();
}, },
|| { || {
tx.send(ThreadEvent::UIEvent(UIEvent::ChangeMode(UIMode::Fork))); tx.send(ThreadEvent::UIEvent(UIEvent::ChangeMode(UIMode::Fork)))
.unwrap();
}, },
&rx, &rx,
) )
@ -69,7 +70,7 @@ impl InputHandler {
.unwrap(); .unwrap();
} }
fn kill(&self) { fn kill(&self) {
self.tx.send(false); self.tx.send(false).unwrap();
} }
} }
@ -130,7 +131,7 @@ pub struct State {
pub mode: UIMode, pub mode: UIMode,
components: Vec<Box<dyn Component>>, components: Vec<Box<dyn Component>>,
pub context: Context, pub context: Context,
threads: FnvHashMap<thread::ThreadId, (chan::Sender<bool>, thread::JoinHandle<()>)>, threads: FnvHashMap<thread::ThreadId, (Sender<bool>, thread::JoinHandle<()>)>,
work_controller: WorkController, work_controller: WorkController,
} }
@ -161,13 +162,13 @@ impl State {
pub fn new() -> Self { pub fn new() -> Self {
/* Create a channel to communicate with other threads. The main process is the sole receiver. /* Create a channel to communicate with other threads. The main process is the sole receiver.
* */ * */
let (sender, receiver) = chan::sync(32 * ::std::mem::size_of::<ThreadEvent>()); let (sender, receiver) = bounded(32 * ::std::mem::size_of::<ThreadEvent>());
/* /*
* Create async channel to block the input-thread if we need to fork and stop it from reading * Create async channel to block the input-thread if we need to fork and stop it from reading
* stdin, see get_events() for details * stdin, see get_events() for details
* */ * */
let input_thread = chan::r#async(); let input_thread = unbounded();
let backends = Backends::new(); let backends = Backends::new();
let settings = Settings::new(); let settings = Settings::new();
@ -186,7 +187,9 @@ impl State {
a_s.clone(), a_s.clone(),
&backends, &backends,
NotifyFn::new(Box::new(move |f: FolderHash| { NotifyFn::new(Box::new(move |f: FolderHash| {
sender.send(ThreadEvent::UIEvent(UIEvent::StartupCheck(f))) sender
.send(ThreadEvent::UIEvent(UIEvent::StartupCheck(f)))
.unwrap();
})), })),
) )
}) })
@ -254,14 +257,14 @@ impl State {
} }
let sender = s.context.sender.clone(); let sender = s.context.sender.clone();
account.watch(RefreshEventConsumer::new(Box::new(move |r| { account.watch(RefreshEventConsumer::new(Box::new(move |r| {
sender.send(ThreadEvent::from(r)); sender.send(ThreadEvent::from(r)).unwrap();
}))); })));
} }
s.restore_input(); s.restore_input();
s s
} }
pub fn worker_receiver(&mut self) -> chan::Receiver<bool> { pub fn worker_receiver(&mut self) -> Receiver<bool> {
self.work_controller.results_rx() self.work_controller.results_rx()
} }
@ -299,7 +302,7 @@ impl State {
/// the thread from its list and `join` it. /// the thread from its list and `join` it.
pub fn join(&mut self, id: thread::ThreadId) { pub fn join(&mut self, id: thread::ThreadId) {
let (tx, handle) = self.threads.remove(&id).unwrap(); let (tx, handle) = self.threads.remove(&id).unwrap();
tx.send(true); tx.send(true).unwrap();
handle.join().unwrap(); handle.join().unwrap();
} }
@ -555,7 +558,8 @@ impl State {
UIEvent::ChangeMode(m) => { UIEvent::ChangeMode(m) => {
self.context self.context
.sender .sender
.send(ThreadEvent::UIEvent(UIEvent::ChangeMode(m))); .send(ThreadEvent::UIEvent(UIEvent::ChangeMode(m)))
.unwrap();
} }
_ => {} _ => {}
} }

View File

@ -20,7 +20,7 @@
*/ */
use super::*; use super::*;
use chan; use crossbeam::{channel::Receiver, select};
use serde::{Serialize, Serializer}; use serde::{Serialize, Serializer};
use std::fmt; use std::fmt;
use std::io; use std::io;
@ -154,24 +154,23 @@ pub fn get_events(
stdin: io::Stdin, stdin: io::Stdin,
mut closure: impl FnMut(Key), mut closure: impl FnMut(Key),
mut exit: impl FnMut(), mut exit: impl FnMut(),
rx: &chan::Receiver<bool>, rx: &Receiver<bool>,
) { ) {
let mut input_mode = InputMode::Normal; let mut input_mode = InputMode::Normal;
let mut paste_buf = String::with_capacity(256); let mut paste_buf = String::with_capacity(256);
for c in stdin.events() { for c in stdin.events() {
chan_select! { select! {
default => {}, default => {},
rx.recv() -> val => { recv(rx) -> val => {
if let Some(true) = val { if let Ok(true) = val {
exit(); exit();
return; return;
} else if let Some(false) = val { } else {
return; return;
} }
} }
}; };
match c { match c {
Ok(TermionEvent::Key(k)) if input_mode == InputMode::Normal => { Ok(TermionEvent::Key(k)) if input_mode == InputMode::Normal => {
closure(Key::from(k)); closure(Key::from(k));

View File

@ -1,4 +1,7 @@
use chan; use crossbeam::{
channel::{bounded, unbounded, Receiver, Sender},
select,
};
use melib::async_workers::Work; use melib::async_workers::Work;
use std; use std;
@ -8,13 +11,13 @@ const MAX_WORKER: usize = 4;
pub struct WorkController { pub struct WorkController {
pub queue: WorkQueue<Work>, pub queue: WorkQueue<Work>,
thread_end_tx: chan::Sender<bool>, thread_end_tx: Sender<bool>,
results: Option<chan::Receiver<bool>>, results: Option<Receiver<bool>>,
threads: Vec<std::thread::JoinHandle<()>>, threads: Vec<std::thread::JoinHandle<()>>,
} }
impl WorkController { impl WorkController {
pub fn results_rx(&mut self) -> chan::Receiver<bool> { pub fn results_rx(&mut self) -> Receiver<bool> {
self.results.take().unwrap() self.results.take().unwrap()
} }
} }
@ -22,7 +25,7 @@ impl WorkController {
impl Drop for WorkController { impl Drop for WorkController {
fn drop(&mut self) { fn drop(&mut self) {
for _ in 0..self.threads.len() { for _ in 0..self.threads.len() {
self.thread_end_tx.send(true); self.thread_end_tx.send(true).unwrap();
} }
/* /*
let threads = mem::replace(&mut self.threads, Vec::new()); let threads = mem::replace(&mut self.threads, Vec::new());
@ -141,7 +144,7 @@ impl<T: Send> WorkQueue<T> {
// the internal VecDeque. // the internal VecDeque.
queue.push_back(work); queue.push_back(work);
self.new_jobs_tx.send(true); self.new_jobs_tx.send(true).unwrap();
// Now return the length of the queue. // Now return the length of the queue.
queue.len() queue.len()
} else { } else {
@ -152,7 +155,7 @@ impl<T: Send> WorkQueue<T> {
impl WorkController { impl WorkController {
pub fn new() -> WorkController { pub fn new() -> WorkController {
let (new_jobs_tx, new_jobs_rx) = chan::r#async(); let (new_jobs_tx, new_jobs_rx) = unbounded();
// Create a new work queue to keep track of what work needs to be done. // Create a new work queue to keep track of what work needs to be done.
// Note that the queue is internally mutable (or, rather, the Mutex is), // Note that the queue is internally mutable (or, rather, the Mutex is),
// but this binding doesn't need to be mutable. This isn't unsound because // but this binding doesn't need to be mutable. This isn't unsound because
@ -163,10 +166,10 @@ impl WorkController {
// Create a MPSC (Multiple Producer, Single Consumer) channel. Every worker // Create a MPSC (Multiple Producer, Single Consumer) channel. Every worker
// is a producer, the main thread is a consumer; the producers put their // is a producer, the main thread is a consumer; the producers put their
// work into the channel when it's done. // work into the channel when it's done.
let (results_tx, results_rx) = chan::r#async(); let (results_tx, results_rx) = unbounded();
// Create a SyncFlag to share whether or not there are more jobs to be done. // Create a SyncFlag to share whether or not there are more jobs to be done.
let (thread_end_tx, thread_end_rx) = chan::sync(::std::mem::size_of::<bool>()); let (thread_end_tx, thread_end_rx) = bounded(::std::mem::size_of::<bool>());
// This Vec will hold thread join handles to allow us to not exit while work // This Vec will hold thread join handles to allow us to not exit while work
// is still being done. These handles provide a .join() method which blocks // is still being done. These handles provide a .join() method which blocks
@ -196,12 +199,12 @@ impl WorkController {
'work_loop: loop { 'work_loop: loop {
debug!("Waiting for work"); debug!("Waiting for work");
// Loop while there's expected to be work, looking for work. // Loop while there's expected to be work, looking for work.
chan_select! { select! {
thread_end_rx.recv() -> _ => { recv(thread_end_rx) -> _ => {
debug!("received thread_end_rx, quitting"); debug!("received thread_end_rx, quitting");
break 'work_loop; break 'work_loop;
}, },
new_jobs_rx.recv() -> _ => { recv(new_jobs_rx) -> _ => {
// If work is available, do that work. // If work is available, do that work.
while let Some(work) = thread_queue.get_work() { while let Some(work) = thread_queue.get_work() {
debug!("Got some work"); debug!("Got some work");
@ -216,7 +219,7 @@ impl WorkController {
// //
// Sending could fail. If so, there's no use in // Sending could fail. If so, there's no use in
// doing any more work, so abort. // doing any more work, so abort.
thread_results_tx.send(true); thread_results_tx.send(true).unwrap();
// Signal to the operating system that now is a good time // Signal to the operating system that now is a good time
// to give another thread a chance to run. // to give another thread a chance to run.