diff --git a/Cargo.toml b/Cargo.toml index f864a1a2f..4c83aaf46 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,8 +10,8 @@ path = "src/bin.rs" [dependencies] xdg = "2.1.0" -chan = "0.1.21" -chan-signal = "0.3.1" +crossbeam = "0.7.2" +signal-hook = "0.1.10" nix = "*" melib = { path = "melib", version = "*" } ui = { path = "ui", version = "*" } diff --git a/melib/Cargo.toml b/melib/Cargo.toml index 9a85e4e33..c1072ec67 100644 --- a/melib/Cargo.toml +++ b/melib/Cargo.toml @@ -7,9 +7,8 @@ edition = "2018" [dependencies] bitflags = "1.0" -chan = "0.1.21" chrono = { version = "0.4", features = ["serde"] } -crossbeam = "^0.3.0" +crossbeam = "0.7.2" data-encoding = "2.1.1" encoding = "0.2.33" fnv = "1.0.3" diff --git a/melib/src/async_workers.rs b/melib/src/async_workers.rs index bc7571a61..3dfb36efb 100644 --- a/melib/src/async_workers.rs +++ b/melib/src/async_workers.rs @@ -31,7 +31,11 @@ * can be extracted with `extract`. */ -use chan; +use crossbeam::{ + bounded, + channel::{Receiver, Sender}, + select, +}; use std::fmt; use std::sync::Arc; @@ -74,16 +78,16 @@ impl fmt::Debug for AsyncStatus { /// A builder object for `Async` #[derive(Debug, Clone)] pub struct AsyncBuilder { - tx: chan::Sender>, - rx: chan::Receiver>, + tx: Sender>, + rx: Receiver>, } #[derive(Clone, Debug)] pub struct Async { work: Work, active: bool, - tx: chan::Sender>, - rx: chan::Receiver>, + tx: Sender>, + rx: Receiver>, } impl Default for AsyncBuilder { @@ -97,18 +101,18 @@ where T: Send + Sync, { pub fn new() -> Self { - let (sender, receiver) = chan::sync(8 * ::std::mem::size_of::>()); + let (sender, receiver) = bounded(8 * ::std::mem::size_of::>()); AsyncBuilder { tx: sender, rx: receiver, } } /// Returns the sender object of the promise's channel. - pub fn tx(&mut self) -> chan::Sender> { + pub fn tx(&mut self) -> Sender> { self.tx.clone() } /// Returns the receiver object of the promise's channel. - pub fn rx(&mut self) -> chan::Receiver> { + pub fn rx(&mut self) -> Receiver> { self.rx.clone() } /// Returns an `Async` object that contains a `Thread` join handle that returns a `T` @@ -135,11 +139,11 @@ where } } /// Returns the sender object of the promise's channel. - pub fn tx(&mut self) -> chan::Sender> { + pub fn tx(&mut self) -> Sender> { self.tx.clone() } /// Returns the receiver object of the promise's channel. - pub fn rx(&mut self) -> chan::Receiver> { + pub fn rx(&mut self) -> Receiver> { self.rx.clone() } /// Polls worker thread and returns result. @@ -149,20 +153,20 @@ where } let rx = &self.rx; - chan_select! { - rx.recv() -> r => { + select! { + recv(rx) -> r => { match r { - Some(p @ AsyncStatus::Payload(_)) => { + Ok(p @ AsyncStatus::Payload(_)) => { return Ok(p); }, - Some(f @ AsyncStatus::Finished) => { + Ok(f @ AsyncStatus::Finished) => { self.active = false; return Ok(f); }, - Some(a) => { + Ok(a) => { return Ok(a); } - _ => { + Err(_) => { return Err(()); }, } @@ -176,23 +180,23 @@ where } let rx = &self.rx; - chan_select! { + select! { default => { return Ok(AsyncStatus::NoUpdate); }, - rx.recv() -> r => { + recv(rx) -> r => { match r { - Some(p @ AsyncStatus::Payload(_)) => { + Ok(p @ AsyncStatus::Payload(_)) => { return Ok(p); }, - Some(f @ AsyncStatus::Finished) => { + Ok(f @ AsyncStatus::Finished) => { self.active = false; return Ok(f); }, - Some(a) => { + Ok(a) => { return Ok(a); } - _ => { + Err(_) => { return Err(()); }, } diff --git a/melib/src/backends/imap.rs b/melib/src/backends/imap.rs index d68ff4908..9e44c43cc 100644 --- a/melib/src/backends/imap.rs +++ b/melib/src/backends/imap.rs @@ -74,7 +74,7 @@ impl MailBackend for ImapType { macro_rules! exit_on_error { ($tx:expr,$($result:expr)+) => { $(if let Err(e) = $result { - $tx.send(AsyncStatus::Payload(Err(e.into()))); + $tx.send(AsyncStatus::Payload(Err(e.into()))).unwrap(); std::process::exit(1); })+ }; @@ -147,15 +147,15 @@ impl MailBackend for ImapType { } Err(e) => { debug!(&e); - tx.send(AsyncStatus::Payload(Err(e))); + tx.send(AsyncStatus::Payload(Err(e))).unwrap(); } } exists = std::cmp::max(exists.saturating_sub(20000), 1); debug!("sending payload"); - tx.send(AsyncStatus::Payload(Ok(envelopes))); + tx.send(AsyncStatus::Payload(Ok(envelopes))).unwrap(); } drop(conn); - tx.send(AsyncStatus::Finished); + tx.send(AsyncStatus::Finished).unwrap(); }; Box::new(closure) }; diff --git a/melib/src/backends/maildir/backend.rs b/melib/src/backends/maildir/backend.rs index 46b51eece..d06bfff9c 100644 --- a/melib/src/backends/maildir/backend.rs +++ b/melib/src/backends/maildir/backend.rs @@ -706,9 +706,9 @@ impl MaildirType { })?; files.push(e); } - let mut threads = Vec::with_capacity(cores); if !files.is_empty() { crossbeam::scope(|scope| { + let mut threads = Vec::with_capacity(cores); let cache_dir = cache_dir.clone(); let chunk_size = if count / cores > 0 { count / cores @@ -720,7 +720,7 @@ impl MaildirType { let tx = tx.clone(); let map = map.clone(); let root_path = root_path.clone(); - let s = scope.builder().name(name.clone()).spawn(move || { + let s = scope.builder().name(name.clone()).spawn(move |_| { let len = chunk.len(); let size = if len <= 100 { 100 } else { (len / 100) * 100 }; let mut local_r: Vec = @@ -789,23 +789,24 @@ impl MaildirType { continue; } } - tx.send(AsyncStatus::ProgressReport(len)); + tx.send(AsyncStatus::ProgressReport(len)).unwrap(); } local_r }); threads.push(s.unwrap()); } - }); - } - for t in threads { - let mut result = t.join(); - ret.append(&mut result); + for t in threads { + let mut result = t.join().unwrap(); + ret.append(&mut result); + } + }) + .unwrap(); } Ok(ret) }; let result = thunk(); - tx_final.send(AsyncStatus::Payload(result)); - tx_final.send(AsyncStatus::Finished); + tx_final.send(AsyncStatus::Payload(result)).unwrap(); + tx_final.send(AsyncStatus::Finished).unwrap(); }; Box::new(closure) }; diff --git a/melib/src/backends/mbox.rs b/melib/src/backends/mbox.rs index 51d6231f0..a984602b0 100644 --- a/melib/src/backends/mbox.rs +++ b/melib/src/backends/mbox.rs @@ -384,7 +384,8 @@ impl MailBackend for MboxType { { Ok(f) => f, Err(e) => { - tx.send(AsyncStatus::Payload(Err(MeliError::from(e)))); + tx.send(AsyncStatus::Payload(Err(MeliError::from(e)))) + .unwrap(); return; } }; @@ -392,7 +393,8 @@ impl MailBackend for MboxType { let mut buf_reader = BufReader::new(file); let mut contents = Vec::new(); if let Err(e) = buf_reader.read_to_end(&mut contents) { - tx.send(AsyncStatus::Payload(Err(MeliError::from(e)))); + tx.send(AsyncStatus::Payload(Err(MeliError::from(e)))) + .unwrap(); return; }; @@ -406,7 +408,7 @@ impl MailBackend for MboxType { .and_modify(|f| f.content = contents); } - tx.send(AsyncStatus::Payload(payload)); + tx.send(AsyncStatus::Payload(payload)).unwrap(); }; Box::new(closure) }; diff --git a/melib/src/lib.rs b/melib/src/lib.rs index d9cea92af..06d97dae2 100644 --- a/melib/src/lib.rs +++ b/melib/src/lib.rs @@ -109,8 +109,6 @@ extern crate chrono; extern crate data_encoding; extern crate encoding; extern crate memmap; -#[macro_use] -extern crate chan; #[macro_use] extern crate bitflags; diff --git a/src/bin.rs b/src/bin.rs index f5883a426..5c159fbb5 100644 --- a/src/bin.rs +++ b/src/bin.rs @@ -39,15 +39,23 @@ use ui; pub use melib::*; pub use ui::*; -#[macro_use] -extern crate chan; -use chan_signal; - -use chan_signal::Signal; - use nix; +use std::os::raw::c_int; use xdg; +fn notify( + signals: &[c_int], +) -> std::result::Result, std::io::Error> { + let (s, r) = crossbeam::channel::bounded(100); + let signals = signal_hook::iterator::Signals::new(signals)?; + std::thread::spawn(move || { + for signal in signals.forever() { + s.send(signal).unwrap(); + } + }); + Ok(r) +} + macro_rules! error_and_exit { ($($err:expr),*) => {{ eprintln!($($err),*); @@ -63,7 +71,7 @@ struct CommandLineArguments { version: bool, } -fn main() { +fn main() -> std::result::Result<(), std::io::Error> { enum CommandLineFlags { CreateConfig, Config, @@ -168,7 +176,12 @@ fn main() { } /* Catch SIGWINCH to handle terminal resizing */ - let signal = chan_signal::notify(&[Signal::WINCH]); + let signals = &[ + /* Catch SIGWINCH to handle terminal resizing */ + signal_hook::SIGWINCH, + ]; + + let signal_recvr = notify(signals)?; /* Create the application State. This is the 'System' part of an ECS architecture */ let mut state = State::new(); @@ -206,8 +219,8 @@ fn main() { state.redraw(); /* Poll on all channels. Currently we have the input channel for stdin, watching events and the signal watcher. */ - chan_select! { - receiver.recv() -> r => { + crossbeam::select! { + recv(receiver) -> r => { match debug!(r.unwrap()) { ThreadEvent::Input(Key::Ctrl('z')) => { state.switch_to_main_screen(); @@ -290,16 +303,19 @@ fn main() { }, } }, - signal.recv() -> signal => { - if state.mode != UIMode::Fork { - if let Some(Signal::WINCH) = signal { - state.update_size(); - state.render(); - state.redraw(); - } + recv(signal_recvr) -> sig => { + match sig.unwrap() { + signal_hook::SIGWINCH => { + if state.mode != UIMode::Fork { + state.update_size(); + state.render(); + state.redraw(); + } + }, + _ => {} } }, - worker_receiver.recv() -> _ => { + recv(worker_receiver) -> _ => { /* Some worker thread finished their job, acknowledge * it and move on*/ }, @@ -327,4 +343,5 @@ fn main() { } } } + Ok(()) } diff --git a/ui/Cargo.toml b/ui/Cargo.toml index 2816ff603..603640098 100644 --- a/ui/Cargo.toml +++ b/ui/Cargo.toml @@ -11,8 +11,7 @@ serde = "1.0.71" serde_derive = "1.0.71" serde_json = "1.0" toml = "0.5.3" -chan = "0.1.21" -chan-signal = "0.3.1" +crossbeam = "0.7.2" fnv = "1.0.3" # >:c linkify = "0.3.1" # >:c melib = { path = "../melib", version = "*" } diff --git a/ui/src/conf/accounts.rs b/ui/src/conf/accounts.rs index 3a0525ba3..5f9bcee64 100644 --- a/ui/src/conf/accounts.rs +++ b/ui/src/conf/accounts.rs @@ -341,18 +341,18 @@ impl Account { debug!("LL"); match debug!(mailbox_handle.poll_block()) { Ok(s @ AsyncStatus::Payload(_)) => { - our_tx.send(s); + our_tx.send(s).unwrap(); debug!("notifying for {}", folder_hash); notify_fn.notify(folder_hash); } Ok(s @ AsyncStatus::Finished) => { - our_tx.send(s); + our_tx.send(s).unwrap(); notify_fn.notify(folder_hash); debug!("exiting"); return; } Ok(s) => { - our_tx.send(s); + our_tx.send(s).unwrap(); } Err(_) => { debug!("poll error"); @@ -368,7 +368,7 @@ impl Account { &mut self, event: RefreshEvent, folder_hash: FolderHash, - sender: &chan::Sender, + sender: &crossbeam::channel::Sender, ) -> Option { if !self.folders[&folder_hash].is_available() { self.event_queue.push_back((folder_hash, event)); @@ -442,7 +442,7 @@ impl Account { debug!("RefreshEvent Failure: {}", e.to_string()); let sender = sender.clone(); self.watch(RefreshEventConsumer::new(Box::new(move |r| { - sender.send(crate::types::ThreadEvent::from(r)); + sender.send(crate::types::ThreadEvent::from(r)).unwrap(); }))); } } diff --git a/ui/src/lib.rs b/ui/src/lib.rs index 443d49323..d93213e13 100644 --- a/ui/src/lib.rs +++ b/ui/src/lib.rs @@ -30,9 +30,6 @@ extern crate notify_rust; extern crate text_processing; #[macro_use] extern crate serde_derive; -#[macro_use] -extern crate chan; -extern crate chan_signal; extern crate linkify; extern crate uuid; diff --git a/ui/src/state.rs b/ui/src/state.rs index 270846084..70342abf9 100644 --- a/ui/src/state.rs +++ b/ui/src/state.rs @@ -31,7 +31,7 @@ Input is received in the main loop from threads which listen on the stdin for us use super::*; use melib::backends::{FolderHash, NotifyFn}; -use chan::{Receiver, Sender}; +use crossbeam::channel::{bounded, unbounded, Receiver, Sender}; use fnv::FnvHashMap; use std::env; use std::io::Write; @@ -58,10 +58,11 @@ impl InputHandler { get_events( stdin, |k| { - tx.send(ThreadEvent::Input(k)); + tx.send(ThreadEvent::Input(k)).unwrap(); }, || { - tx.send(ThreadEvent::UIEvent(UIEvent::ChangeMode(UIMode::Fork))); + tx.send(ThreadEvent::UIEvent(UIEvent::ChangeMode(UIMode::Fork))) + .unwrap(); }, &rx, ) @@ -69,7 +70,7 @@ impl InputHandler { .unwrap(); } fn kill(&self) { - self.tx.send(false); + self.tx.send(false).unwrap(); } } @@ -130,7 +131,7 @@ pub struct State { pub mode: UIMode, components: Vec>, pub context: Context, - threads: FnvHashMap, thread::JoinHandle<()>)>, + threads: FnvHashMap, thread::JoinHandle<()>)>, work_controller: WorkController, } @@ -161,13 +162,13 @@ impl State { pub fn new() -> Self { /* Create a channel to communicate with other threads. The main process is the sole receiver. * */ - let (sender, receiver) = chan::sync(32 * ::std::mem::size_of::()); + let (sender, receiver) = bounded(32 * ::std::mem::size_of::()); /* * Create async channel to block the input-thread if we need to fork and stop it from reading * stdin, see get_events() for details * */ - let input_thread = chan::r#async(); + let input_thread = unbounded(); let backends = Backends::new(); let settings = Settings::new(); @@ -186,7 +187,9 @@ impl State { a_s.clone(), &backends, NotifyFn::new(Box::new(move |f: FolderHash| { - sender.send(ThreadEvent::UIEvent(UIEvent::StartupCheck(f))) + sender + .send(ThreadEvent::UIEvent(UIEvent::StartupCheck(f))) + .unwrap(); })), ) }) @@ -254,14 +257,14 @@ impl State { } let sender = s.context.sender.clone(); account.watch(RefreshEventConsumer::new(Box::new(move |r| { - sender.send(ThreadEvent::from(r)); + sender.send(ThreadEvent::from(r)).unwrap(); }))); } s.restore_input(); s } - pub fn worker_receiver(&mut self) -> chan::Receiver { + pub fn worker_receiver(&mut self) -> Receiver { self.work_controller.results_rx() } @@ -299,7 +302,7 @@ impl State { /// the thread from its list and `join` it. pub fn join(&mut self, id: thread::ThreadId) { let (tx, handle) = self.threads.remove(&id).unwrap(); - tx.send(true); + tx.send(true).unwrap(); handle.join().unwrap(); } @@ -555,7 +558,8 @@ impl State { UIEvent::ChangeMode(m) => { self.context .sender - .send(ThreadEvent::UIEvent(UIEvent::ChangeMode(m))); + .send(ThreadEvent::UIEvent(UIEvent::ChangeMode(m))) + .unwrap(); } _ => {} } diff --git a/ui/src/terminal/keys.rs b/ui/src/terminal/keys.rs index fc4318707..3bf1561e8 100644 --- a/ui/src/terminal/keys.rs +++ b/ui/src/terminal/keys.rs @@ -20,7 +20,7 @@ */ use super::*; -use chan; +use crossbeam::{channel::Receiver, select}; use serde::{Serialize, Serializer}; use std::fmt; use std::io; @@ -154,24 +154,23 @@ pub fn get_events( stdin: io::Stdin, mut closure: impl FnMut(Key), mut exit: impl FnMut(), - rx: &chan::Receiver, + rx: &Receiver, ) { let mut input_mode = InputMode::Normal; let mut paste_buf = String::with_capacity(256); for c in stdin.events() { - chan_select! { + select! { default => {}, - rx.recv() -> val => { - if let Some(true) = val { + recv(rx) -> val => { + if let Ok(true) = val { exit(); return; - } else if let Some(false) = val { + } else { return; } } - - }; + match c { Ok(TermionEvent::Key(k)) if input_mode == InputMode::Normal => { closure(Key::from(k)); diff --git a/ui/src/workers.rs b/ui/src/workers.rs index 1ea7f131d..4bdd1325c 100644 --- a/ui/src/workers.rs +++ b/ui/src/workers.rs @@ -1,4 +1,7 @@ -use chan; +use crossbeam::{ + channel::{bounded, unbounded, Receiver, Sender}, + select, +}; use melib::async_workers::Work; use std; @@ -8,13 +11,13 @@ const MAX_WORKER: usize = 4; pub struct WorkController { pub queue: WorkQueue, - thread_end_tx: chan::Sender, - results: Option>, + thread_end_tx: Sender, + results: Option>, threads: Vec>, } impl WorkController { - pub fn results_rx(&mut self) -> chan::Receiver { + pub fn results_rx(&mut self) -> Receiver { self.results.take().unwrap() } } @@ -22,7 +25,7 @@ impl WorkController { impl Drop for WorkController { fn drop(&mut self) { for _ in 0..self.threads.len() { - self.thread_end_tx.send(true); + self.thread_end_tx.send(true).unwrap(); } /* let threads = mem::replace(&mut self.threads, Vec::new()); @@ -141,7 +144,7 @@ impl WorkQueue { // the internal VecDeque. queue.push_back(work); - self.new_jobs_tx.send(true); + self.new_jobs_tx.send(true).unwrap(); // Now return the length of the queue. queue.len() } else { @@ -152,7 +155,7 @@ impl WorkQueue { impl WorkController { pub fn new() -> WorkController { - let (new_jobs_tx, new_jobs_rx) = chan::r#async(); + let (new_jobs_tx, new_jobs_rx) = unbounded(); // Create a new work queue to keep track of what work needs to be done. // Note that the queue is internally mutable (or, rather, the Mutex is), // but this binding doesn't need to be mutable. This isn't unsound because @@ -163,10 +166,10 @@ impl WorkController { // Create a MPSC (Multiple Producer, Single Consumer) channel. Every worker // is a producer, the main thread is a consumer; the producers put their // work into the channel when it's done. - let (results_tx, results_rx) = chan::r#async(); + let (results_tx, results_rx) = unbounded(); // Create a SyncFlag to share whether or not there are more jobs to be done. - let (thread_end_tx, thread_end_rx) = chan::sync(::std::mem::size_of::()); + let (thread_end_tx, thread_end_rx) = bounded(::std::mem::size_of::()); // This Vec will hold thread join handles to allow us to not exit while work // is still being done. These handles provide a .join() method which blocks @@ -196,12 +199,12 @@ impl WorkController { 'work_loop: loop { debug!("Waiting for work"); // Loop while there's expected to be work, looking for work. - chan_select! { - thread_end_rx.recv() -> _ => { + select! { + recv(thread_end_rx) -> _ => { debug!("received thread_end_rx, quitting"); break 'work_loop; }, - new_jobs_rx.recv() -> _ => { + recv(new_jobs_rx) -> _ => { // If work is available, do that work. while let Some(work) = thread_queue.get_work() { debug!("Got some work"); @@ -216,7 +219,7 @@ impl WorkController { // // Sending could fail. If so, there's no use in // doing any more work, so abort. - thread_results_tx.send(true); + thread_results_tx.send(true).unwrap(); // Signal to the operating system that now is a good time // to give another thread a chance to run.