From 44ffbe54e238f2d69609f902a7554c72b74a955d Mon Sep 17 00:00:00 2001 From: Manos Pitsidianakis Date: Tue, 21 Jul 2020 19:22:56 +0300 Subject: [PATCH] input_thread: add atomic refcount to check if thread is dead --- src/state.rs | 39 +++++++++++++++++++++++++++++++-------- src/terminal/keys.rs | 2 ++ 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/src/state.rs b/src/state.rs index 2c8a4a9e..91f3c389 100644 --- a/src/state.rs +++ b/src/state.rs @@ -51,16 +51,22 @@ struct InputHandler { pipe: (RawFd, RawFd), rx: Receiver, tx: Sender, + state_tx: Sender, + control: std::sync::Weak<()>, } impl InputHandler { - fn restore(&self, tx: Sender) { + fn restore(&mut self) { + let working = Arc::new(()); + let control = Arc::downgrade(&working); + /* Clear channel without blocking. switch_to_main_screen() issues a kill when * returning from a fork and there's no input thread, so the newly created thread will * receive it and die. */ //let _ = self.rx.try_iter().count(); let rx = self.rx.clone(); let pipe = self.pipe.0; + let tx = self.state_tx.clone(); thread::Builder::new() .name("input-thread".to_string()) .spawn(move || { @@ -70,15 +76,27 @@ impl InputHandler { }, &rx, pipe, + working, ) }) .unwrap(); + self.control = control; } fn kill(&self) { let _ = nix::unistd::write(self.pipe.1, &[1]); self.tx.send(InputCommand::Kill).unwrap(); } + + fn check(&mut self) { + match self.control.upgrade() { + Some(_) => {} + None => { + debug!("restarting input_thread"); + self.restore(); + } + } + } } /// A context container for loaded settings, accounts, UI changes, etc. @@ -95,7 +113,7 @@ pub struct Context { pub replies: VecDeque, sender: Sender, receiver: Receiver, - input: InputHandler, + input_thread: InputHandler, work_controller: WorkController, job_executor: Arc, pub children: Vec, @@ -110,11 +128,11 @@ impl Context { } pub fn input_kill(&self) { - self.input.kill(); + self.input_thread.kill(); } - pub fn restore_input(&self) { - self.input.restore(self.sender.clone()); + pub fn restore_input(&mut self) { + self.input_thread.restore(); } pub fn is_online(&mut self, account_pos: usize) -> Result<()> { @@ -299,6 +317,8 @@ impl State { timer.thread().unpark(); + let working = Arc::new(()); + let control = Arc::downgrade(&working); let mut s = State { cols, rows, @@ -334,13 +354,15 @@ impl State { children: vec![], plugin_manager, - sender, - receiver, - input: InputHandler { + input_thread: InputHandler { pipe: input_thread_pipe, rx: input_thread.1, tx: input_thread.0, + control, + state_tx: sender.clone(), }, + sender, + receiver, }, }; s.draw_rate_limit @@ -1128,5 +1150,6 @@ impl State { if ctr != self.context.accounts.len() { self.timer.thread().unpark(); } + self.context.input_thread.check(); } } diff --git a/src/terminal/keys.rs b/src/terminal/keys.rs index 28fb8cb9..8cfedec0 100644 --- a/src/terminal/keys.rs +++ b/src/terminal/keys.rs @@ -163,6 +163,7 @@ pub fn get_events( mut closure: impl FnMut((Key, Vec)), rx: &Receiver, new_command_fd: RawFd, + working: std::sync::Arc<()>, ) { let stdin = std::io::stdin(); let stdin_fd = PollFd::new(std::io::stdin().as_raw_fd(), PollFlags::POLLIN); @@ -229,6 +230,7 @@ pub fn get_events( } }; } + drop(working); } impl<'de> Deserialize<'de> for Key {