Replace PosixTimer with async timers

jmap-eventsource
Manos Pitsidianakis 2020-10-29 13:09:31 +02:00
parent 57e6cf3980
commit 6392904047
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
11 changed files with 406 additions and 492 deletions

View File

@ -49,9 +49,6 @@ static GLOBAL: System = System;
extern crate melib;
use melib::*;
mod unix;
use unix::*;
#[macro_use]
pub mod types;
use crate::types::*;
@ -104,7 +101,6 @@ fn notify(
nix::fcntl::FcntlArg::F_SETFL(nix::fcntl::OFlag::O_NONBLOCK),
);
std::thread::spawn(move || {
let mut buf = [0; 1];
let mut ctr = 0;
loop {
ctr %= 3;
@ -117,16 +113,6 @@ fn notify(
for signal in signals.pending() {
let _ = s.send_timeout(signal, Duration::from_millis(500)).ok();
}
while nix::unistd::read(alarm_pipe_r, buf.as_mut())
.map(|s| s > 0)
.unwrap_or(false)
{
let value = buf[0];
let _ = sender.send_timeout(
ThreadEvent::UIEvent(UIEvent::Timer(value)),
Duration::from_millis(2000),
);
}
std::thread::sleep(std::time::Duration::from_millis(100));
ctr += 1;
@ -377,7 +363,9 @@ fn run_app(opt: Opt) -> Result<()> {
#[cfg(all(target_os = "linux", feature = "dbus-notifications"))]
{
let dbus_notifications = Box::new(components::notifications::DbusNotifications::new());
let dbus_notifications = Box::new(components::notifications::DbusNotifications::new(
&state.context,
));
state.register_component(dbus_notifications);
}
state.register_component(Box::new(

View File

@ -69,7 +69,7 @@
}
let job = ctx.keylist(secret, Some(pattern.clone()))?;
let handle = context.job_executor.spawn_specialized(job);
let mut progress_spinner = ProgressSpinner::new(8);
let mut progress_spinner = ProgressSpinner::new(8, context);
progress_spinner.start();
Ok(KeySelection::LoadingKeys {
handle,
@ -120,9 +120,7 @@
allow_remote_lookup,
..
} => match event {
UIEvent::StatusEvent(StatusEvent::JobFinished(ref id))
if *id == handle.job_id =>
{
UIEvent::StatusEvent(StatusEvent::JobFinished(ref id)) if *id == handle.job_id => {
match handle.chan.try_recv().unwrap().unwrap() {
Ok(keys) => {
if keys.is_empty() {
@ -162,10 +160,9 @@
StatusEvent::DisplayMessage(err.to_string()),
));
let res: Option<melib::gpgme::Key> = None;
context.replies.push_back(UIEvent::FinishedUIDialog(
id,
Box::new(res),
));
context
.replies
.push_back(UIEvent::FinishedUIDialog(id, Box::new(res)));
}
return true;
}

View File

@ -1391,7 +1391,7 @@ impl Listing {
dirty: true,
cursor_pos: (0, 0),
menu_cursor_pos: (0, 0),
startup_checks_rate: RateLimit::new(2, 1000),
startup_checks_rate: RateLimit::new(2, 1000, context.job_executor.clone()),
theme_default: conf::value(context, "theme_default"),
id: ComponentId::new_v4(),
show_divider: false,

View File

@ -47,9 +47,9 @@ mod dbus {
}
impl DbusNotifications {
pub fn new() -> Self {
pub fn new(context: &Context) -> Self {
DbusNotifications {
rate_limit: RateLimit::new(1000, 1000),
rate_limit: RateLimit::new(1000, 1000, context.job_executor.clone()),
}
}
}

View File

@ -74,7 +74,7 @@ impl fmt::Display for StatusBar {
impl StatusBar {
pub fn new(context: &Context, container: Box<dyn Component>) -> Self {
let mut progress_spinner = ProgressSpinner::new(19);
let mut progress_spinner = ProgressSpinner::new(19, context);
match context.settings.terminal.progress_spinner_sequence.as_ref() {
Some(conf::terminal::ProgressSpinnerSequence::Integer(k)) => {
progress_spinner.set_kind(*k);

View File

@ -1154,7 +1154,7 @@ impl ScrollBar {
#[derive(Debug)]
pub struct ProgressSpinner {
timer: crate::timer::PosixTimer,
timer: crate::jobs::Timer,
stage: usize,
pub kind: std::result::Result<usize, Vec<String>>,
pub width: usize,
@ -1201,13 +1201,11 @@ impl ProgressSpinner {
const INTERVAL: std::time::Duration = std::time::Duration::from_millis(50);
pub fn new(kind: usize) -> Self {
let timer = crate::timer::PosixTimer::new_with_signal(
std::time::Duration::from_millis(0),
std::time::Duration::from_millis(0),
nix::sys::signal::Signal::SIGALRM,
)
.unwrap();
pub fn new(kind: usize, context: &Context) -> Self {
let timer = context
.job_executor
.clone()
.create_timer(Self::INTERVAL, Self::INTERVAL);
let kind = kind % Self::KINDS.len();
let width = Self::KINDS[kind]
.iter()
@ -1255,19 +1253,13 @@ impl ProgressSpinner {
return;
}
self.active = true;
self.timer
.set_interval(Self::INTERVAL)
.set_value(Self::INTERVAL)
.rearm()
self.timer.rearm();
}
pub fn stop(&mut self) {
self.active = false;
self.stage = 0;
self.timer
.set_interval(std::time::Duration::from_millis(0))
.set_value(std::time::Duration::from_millis(0))
.rearm()
self.timer.disable();
}
}
@ -1309,7 +1301,7 @@ impl Component for ProgressSpinner {
fn process_event(&mut self, event: &mut UIEvent, _context: &mut Context) -> bool {
match event {
UIEvent::Timer(id) if *id == self.timer.si_value => {
UIEvent::Timer(id) if *id == self.timer.id() => {
match self.kind.as_ref() {
Ok(kind) => {
self.stage = (self.stage + 1).wrapping_rem(Self::KINDS[*kind].len());

View File

@ -28,14 +28,15 @@
use melib::error::Result;
use melib::smol;
use std::collections::HashMap;
use std::future::Future;
use std::panic::catch_unwind;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use uuid::Uuid;
use crate::types::ThreadEvent;
use crate::types::{ThreadEvent, UIEvent};
use crossbeam::deque::{Injector, Stealer, Worker};
use crossbeam::sync::{Parker, Unparker};
use crossbeam::Sender;
@ -97,6 +98,7 @@ uuid_hash_type!(JobId);
pub struct MeliTask {
task: AsyncTask,
id: JobId,
timer: bool,
}
#[derive(Debug)]
@ -105,6 +107,37 @@ pub struct JobExecutor {
workers: Vec<Stealer<MeliTask>>,
sender: Sender<ThreadEvent>,
parkers: Vec<Unparker>,
timers: Arc<Mutex<HashMap<Uuid, TimerPrivate>>>,
}
#[derive(Debug, Default)]
struct TimerPrivate {
/// Interval for periodic timer.
interval: Duration,
/// Time until next expiration.
value: Duration,
active: bool,
handle: Option<async_task::JoinHandle<(), ()>>,
}
#[derive(Debug)]
pub struct Timer {
id: Uuid,
job_executor: Arc<JobExecutor>,
}
impl Timer {
pub fn id(&self) -> Uuid {
self.id
}
pub fn rearm(&self) {
self.job_executor.rearm(self.id);
}
pub fn disable(&self) {
self.job_executor.disable_timer(self.id);
}
}
impl JobExecutor {
@ -116,6 +149,7 @@ impl JobExecutor {
workers: vec![],
parkers: vec![],
sender,
timers: Arc::new(Mutex::new(HashMap::default())),
};
let mut workers = vec![];
for _ in 0..num_cpus::get().max(1) {
@ -146,11 +180,15 @@ impl JobExecutor {
parker.park_timeout(Duration::from_millis(100));
let task = find_task(&local, &global, stealers.as_slice());
if let Some(meli_task) = task {
let MeliTask { task, id } = meli_task;
let MeliTask { task, id, timer } = meli_task;
if !timer {
debug!("Worker {} got task {:?}", i, id);
}
let _ = catch_unwind(|| task.run());
if !timer {
debug!("Worker {} returned after {:?}", i, id);
}
}
})
.unwrap();
}
@ -177,7 +215,13 @@ impl JobExecutor {
.unwrap();
Ok(())
},
move |task| injector.push(MeliTask { task, id: job_id }),
move |task| {
injector.push(MeliTask {
task,
id: job_id,
timer: false,
})
},
(),
);
task.schedule();
@ -200,6 +244,91 @@ impl JobExecutor {
{
self.spawn_specialized(smol::unblock(move || futures::executor::block_on(future)))
}
pub fn create_timer(self: Arc<JobExecutor>, interval: Duration, value: Duration) -> Timer {
let id = Uuid::new_v4();
let timer = TimerPrivate {
interval,
value,
active: true,
handle: None,
};
self.timers.lock().unwrap().insert(id, timer);
self.arm_timer(id, value);
Timer {
id,
job_executor: self,
}
}
pub fn rearm(&self, timer_id: Uuid) {
let mut timers_lck = self.timers.lock().unwrap();
if let Some(timer) = timers_lck.get_mut(&timer_id) {
if let Some(handle) = timer.handle.take() {
handle.cancel();
}
let value = timer.value;
drop(timers_lck);
self.arm_timer(timer_id, value);
}
}
fn arm_timer(&self, id: Uuid, value: Duration) {
let job_id = JobId::new();
let sender = self.sender.clone();
let injector = self.global_queue.clone();
let timers = self.timers.clone();
let (task, handle) = async_task::spawn(
async move {
let mut value = value;
loop {
smol::Timer::after(value).await;
sender
.send(ThreadEvent::UIEvent(UIEvent::Timer(id)))
.unwrap();
if let Some(interval) = timers.lock().unwrap().get(&id).and_then(|timer| {
if timer.interval.as_millis() == 0 && timer.interval.as_secs() == 0 {
None
} else if timer.active {
Some(timer.interval)
} else {
None
}
}) {
value = interval;
} else {
break;
}
}
},
move |task| {
injector.push(MeliTask {
task,
id: job_id,
timer: true,
})
},
(),
);
self.timers.lock().unwrap().entry(id).and_modify(|timer| {
timer.handle = Some(handle);
timer.active = true;
});
task.schedule();
for unparker in self.parkers.iter() {
unparker.unpark();
}
}
fn disable_timer(&self, id: Uuid) {
let mut timers_lck = self.timers.lock().unwrap();
if let Some(timer) = timers_lck.get_mut(&id) {
if let Some(handle) = timer.handle.take() {
handle.cancel();
}
timer.active = false;
}
}
}
pub type JobChannel<T> = oneshot::Receiver<T>;

View File

@ -37,9 +37,6 @@ extern crate termion;
use melib::backends::imap::managesieve::new_managesieve_connection;
use melib::Result;
mod unix;
use unix::*;
#[macro_use]
pub mod types;
use crate::types::*;

View File

@ -327,7 +327,7 @@ impl State {
components: Vec::with_capacity(8),
overlay: Vec::new(),
timer,
draw_rate_limit: RateLimit::new(1, 3),
draw_rate_limit: RateLimit::new(1, 3, job_executor.clone()),
draw_horizontal_segment_fn: if settings.terminal.use_color() {
State::draw_horizontal_segment
} else {
@ -361,9 +361,6 @@ impl State {
receiver,
},
};
s.draw_rate_limit
.timer
.set_value(std::time::Duration::from_millis(3));
if s.context.settings.terminal.ascii_drawing {
s.grid.set_ascii_drawing(true);
s.overlay_grid.set_ascii_drawing(true);

View File

@ -36,9 +36,10 @@ mod helpers;
pub use self::helpers::*;
use super::command::Action;
use super::jobs::JobId;
use super::jobs::{JobExecutor, JobId};
use super::terminal::*;
use crate::components::{Component, ComponentId};
use std::sync::Arc;
use melib::backends::{AccountHash, BackendEvent, MailboxHash};
use melib::{EnvelopeHash, RefreshEvent, ThreadHash};
@ -144,7 +145,7 @@ pub enum UIEvent {
FinishedUIDialog(ComponentId, UIMessage),
Callback(CallbackFn),
GlobalUIDialog(Box<dyn Component>),
Timer(u8),
Timer(Uuid),
}
pub struct CallbackFn(pub Box<dyn FnOnce(&mut crate::Context) -> () + Send + 'static>);
@ -313,7 +314,7 @@ pub mod segment_tree {
#[derive(Debug)]
pub struct RateLimit {
last_tick: std::time::Instant,
pub timer: crate::timer::PosixTimer,
pub timer: crate::jobs::Timer,
rate: std::time::Duration,
reqs: u64,
millis: std::time::Duration,
@ -322,16 +323,13 @@ pub struct RateLimit {
//FIXME: tests.
impl RateLimit {
pub fn new(reqs: u64, millis: u64) -> Self {
pub fn new(reqs: u64, millis: u64, job_executor: Arc<JobExecutor>) -> Self {
RateLimit {
last_tick: std::time::Instant::now(),
timer: crate::timer::PosixTimer::new_with_signal(
timer: job_executor.create_timer(
std::time::Duration::from_secs(0),
std::time::Duration::from_millis(millis),
nix::sys::signal::Signal::SIGALRM,
)
.unwrap(),
),
rate: std::time::Duration::from_millis(millis / reqs),
reqs,
millis: std::time::Duration::from_millis(millis),
@ -357,8 +355,8 @@ impl RateLimit {
}
#[inline(always)]
pub fn id(&self) -> u8 {
self.timer.si_value
pub fn id(&self) -> Uuid {
self.timer.id()
}
}
#[test]

View File

@ -1,184 +0,0 @@
/*
* meli
*
* Copyright 2017-2018 Manos Pitsidianakis
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
/*! UNIX and POSIX interfaces.
*/
pub mod timer {
//! POSIX timers
//!
//! # Example usage
//! ```no_run
//! let timer = crate::timer::PosixTimer::new_with_signal(
//! std::time::Duration::from_secs(0),
//! std::time::Duration::from_secs(1),
//! nix::sys::signal::Signal::SIGALRM,
//! )
//! .unwrap();
//!
//! // some time passes, we should receive and handle the SIGALRM
//! // The timer remains unarmed since the interval given was zero, until we rearm it explicitly.
//! timer.rearm();
//! ```
use libc::clockid_t;
use libc::sigevent;
use libc::{itimerspec, timespec};
use melib::{MeliError, Result};
use nix::sys::signal::{SigEvent, SigevNotify};
use std::cell::RefCell;
use std::convert::TryInto;
use std::time::Duration;
thread_local!(static TIMER_IDS: RefCell<u8> = RefCell::new(0));
#[allow(non_camel_case_types)]
pub type timer_t = libc::intptr_t;
#[link(name = "rt")]
extern "C" {
fn timer_create(clockid: clockid_t, sevp: *const sigevent, timerid: *mut timer_t) -> i32;
fn timer_settime(
timerid: timer_t,
flags: i32,
new_value: *const itimerspec,
old_value: *const itimerspec,
) -> i32;
fn timer_delete(timerid: timer_t) -> i32;
}
#[derive(Debug)]
pub struct PosixTimer {
timer_id: timer_t,
/// Interval for periodic timer.
interval: Duration,
/// Time until next expiration.
value: Duration,
/// `si_value` is a byte accessible from the signal handler when it receives signals from this timer.
pub si_value: u8,
}
impl Drop for PosixTimer {
fn drop(&mut self) {
unsafe {
timer_delete(self.timer_id);
}
}
}
impl PosixTimer {
/// Arm without changing interval and value.
pub fn rearm(&self) {
let spec = itimerspec {
it_interval: timespec {
tv_sec: self.interval.as_secs().try_into().unwrap_or(0),
tv_nsec: self.interval.subsec_nanos().try_into().unwrap_or(0),
},
it_value: timespec {
tv_sec: self.value.as_secs().try_into().unwrap_or(0),
tv_nsec: self.value.subsec_nanos().try_into().unwrap_or(0),
},
};
let ret =
unsafe { timer_settime(self.timer_id, 0, &spec as *const _, std::ptr::null_mut()) };
if ret != 0 {
match ret {
libc::EFAULT => {
panic!(
"EFAULT: new_value, old_value, or curr_value is not a valid pointer."
);
}
libc::EINVAL => {
panic!("EINVAL: timerid is invalid.");
}
_ => {}
}
}
}
/// Sets value without arming timer
pub fn set_value(&mut self, value: Duration) -> &mut Self {
self.value = value;
self
}
/// Sets interval without arming timer
pub fn set_interval(&mut self, interval: Duration) -> &mut Self {
self.interval = interval;
self
}
pub fn new_with_signal(
interval: Duration,
value: Duration,
signal: nix::sys::signal::Signal,
) -> Result<PosixTimer> {
let mut timer_id = Default::default();
let mut si_value = 0;
TIMER_IDS.with(|t| {
si_value = *t.borrow_mut();
*t.borrow_mut() += 1;
});
let sigev_notify = SigevNotify::SigevSignal {
signal,
si_value: si_value as isize,
};
let event = SigEvent::new(sigev_notify);
let ret = unsafe {
timer_create(
libc::CLOCK_MONOTONIC,
&event.sigevent() as *const _,
&mut timer_id as *mut _,
)
};
if ret != 0 {
match ret {
libc::EAGAIN => {
return Err(MeliError::new(
"Temporary error during kernel allocation of timer",
));
}
libc::EINVAL => {
panic!("Clock ID, sigev_notify, sigev_signo, or sigev_notify_thread_id is invalid.");
}
libc::ENOMEM => {
return Err(MeliError::new("Could not allocate memory."));
}
_ => {}
}
}
let ret = PosixTimer {
timer_id,
interval,
value,
si_value,
};
ret.rearm();
Ok(ret)
}
}
}