You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

385 lines
15 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
2 years ago
3 years ago
3 years ago
3 years ago
  1. /*
  2. * meli
  3. *
  4. * Copyright 2017-2020 Manos Pitsidianakis
  5. *
  6. * This file is part of meli.
  7. *
  8. * meli is free software: you can redistribute it and/or modify
  9. * it under the terms of the GNU General Public License as published by
  10. * the Free Software Foundation, either version 3 of the License, or
  11. * (at your option) any later version.
  12. *
  13. * meli is distributed in the hope that it will be useful,
  14. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. * GNU General Public License for more details.
  17. *
  18. * You should have received a copy of the GNU General Public License
  19. * along with meli. If not, see <http://www.gnu.org/licenses/>.
  20. */
  21. /*! Simple blocking job control.
  22. */
  23. use crate::types::ThreadEvent;
  24. use crossbeam::{
  25. channel::{bounded, unbounded, Sender},
  26. select,
  27. };
  28. use melib::async_workers::{Work, WorkContext};
  29. use melib::datetime::{self, UnixTimestamp};
  30. use melib::text_processing::Truncate;
  31. use std::collections::HashMap;
  32. use std::sync::atomic::{AtomicUsize, Ordering};
  33. use std::sync::Arc;
  34. use std::sync::Mutex;
  35. use std::thread;
  36. const MAX_WORKER: usize = 4;
  37. /// Representation of a worker thread for use in `WorkController`. These values are to be displayed
  38. /// to the user.
  39. #[derive(Debug)]
  40. pub struct Worker {
  41. pub name: String,
  42. pub status: String,
  43. pub heartbeat: UnixTimestamp,
  44. }
  45. impl From<String> for Worker {
  46. fn from(val: String) -> Self {
  47. Worker {
  48. name: val,
  49. status: String::new(),
  50. heartbeat: datetime::now(),
  51. }
  52. }
  53. }
  54. pub struct WorkController {
  55. pub queue: WorkQueue,
  56. thread_end_tx: Sender<bool>,
  57. /// Worker threads that take up on jobs from self.queue
  58. pub threads: Arc<Mutex<HashMap<thread::ThreadId, Worker>>>,
  59. /// Special function threads that live indefinitely (eg watching a mailbox)
  60. pub static_threads: Arc<Mutex<HashMap<thread::ThreadId, Worker>>>,
  61. work_context: WorkContext,
  62. }
  63. impl Drop for WorkController {
  64. fn drop(&mut self) {
  65. if let Ok(lock) = self.threads.lock() {
  66. for _ in 0..lock.len() {
  67. let _ = self.thread_end_tx.send(true);
  68. }
  69. }
  70. }
  71. }
  72. #[derive(Clone)]
  73. pub struct WorkQueue {
  74. inner: Arc<Mutex<Vec<Work>>>,
  75. new_jobs_tx: Sender<bool>,
  76. work_context: WorkContext,
  77. }
  78. impl WorkQueue {
  79. fn new(new_jobs_tx: Sender<bool>, work_context: WorkContext) -> Self {
  80. Self {
  81. inner: Arc::new(Mutex::new(Vec::new())),
  82. new_jobs_tx,
  83. work_context,
  84. }
  85. }
  86. /// Blocks the current thread until work is available, then
  87. /// gets the data required to perform that work.
  88. ///
  89. /// # Errors
  90. /// Returns None if there is no more work in the queue.
  91. ///
  92. /// # Panics
  93. /// Panics if the underlying mutex became poisoned. This is exceedingly
  94. /// unlikely.
  95. fn get_work(&self) -> Option<Work> {
  96. // try to get a lock on the mutex.
  97. let maybe_queue = self.inner.lock();
  98. if let Ok(mut queue) = maybe_queue {
  99. if queue.is_empty() {
  100. return None;
  101. } else {
  102. return Some(queue.swap_remove(0));
  103. }
  104. } else {
  105. // poisoned mutex, some other thread holding the mutex has panicked!
  106. panic!("WorkQueue::get_work() tried to lock a poisoned mutex");
  107. }
  108. }
  109. // Both the controller (main thread) and workers can use this
  110. // function to add work to the queue.
  111. /// Blocks the current thread until work can be added, then
  112. /// adds that work to the end of the queue.
  113. /// Returns the amount of work now in the queue.
  114. ///
  115. /// # Panics
  116. /// Panics if the underlying mutex became poisoned. This is exceedingly
  117. /// unlikely.
  118. pub fn add_work(&self, work: Work) {
  119. if work.is_static {
  120. self.work_context.new_work.send(work).unwrap();
  121. return;
  122. }
  123. // As above, try to get a lock on the mutex.
  124. if let Ok(mut queue) = self.inner.lock() {
  125. /* Insert in position that maintains the queue sorted */
  126. let pos = match queue.binary_search_by(|probe| probe.cmp(&work)) {
  127. Ok(p) => p,
  128. Err(p) => p,
  129. };
  130. queue.insert(pos, work);
  131. /* inform threads that new job is available */
  132. self.new_jobs_tx.send(true).unwrap();
  133. } else {
  134. panic!("WorkQueue::add_work() tried to lock a poisoned mutex");
  135. }
  136. }
  137. }
  138. impl WorkController {
  139. pub fn new(pulse: Sender<ThreadEvent>) -> WorkController {
  140. let (new_jobs_tx, new_jobs_rx) = unbounded();
  141. /* create a channel for jobs to send new work to Controller thread */
  142. let (new_work_tx, new_work_rx) = unbounded();
  143. /* create a channel for jobs to set their names */
  144. let (set_name_tx, set_name_rx) = unbounded();
  145. /* create a channel for jobs to set their statuses */
  146. let (set_status_tx, set_status_rx) = unbounded();
  147. /* create a channel for jobs to announce their demise */
  148. let (finished_tx, finished_rx) = unbounded();
  149. /* each associated thread will hold a copy of this context item in order to communicate
  150. * with the controller thread */
  151. let work_context = WorkContext {
  152. new_work: new_work_tx,
  153. set_name: set_name_tx,
  154. set_status: set_status_tx,
  155. finished: finished_tx,
  156. };
  157. let queue: WorkQueue = WorkQueue::new(new_jobs_tx, work_context.clone());
  158. let active_threads = Arc::new(AtomicUsize::new(MAX_WORKER));
  159. // Create a SyncFlag to share whether or not there are more jobs to be done.
  160. let (thread_end_tx, thread_end_rx) = bounded(1);
  161. let threads_lock: Arc<Mutex<HashMap<thread::ThreadId, Worker>>> =
  162. Arc::new(Mutex::new(HashMap::default()));
  163. let static_threads_lock: Arc<Mutex<HashMap<thread::ThreadId, Worker>>> =
  164. Arc::new(Mutex::new(HashMap::default()));
  165. let mut threads = threads_lock.lock().unwrap();
  166. /* spawn worker threads */
  167. for _ in 0..MAX_WORKER {
  168. /* Each worker thread will wait on two channels: thread_end and new_jobs. thread_end
  169. * informs the worker that it should quit and new_jobs informs that there is a new job
  170. * available inside the queue. Only one worker will get each job, and others will
  171. * go back to waiting on the channels */
  172. let thread_queue = queue.clone();
  173. let active_threads = active_threads.clone();
  174. let thread_end_rx = thread_end_rx.clone();
  175. let new_jobs_rx = new_jobs_rx.clone();
  176. let new_jobs_rx = new_jobs_rx.clone();
  177. let work_context = work_context.clone();
  178. let pulse = pulse.clone();
  179. let handle = spawn_worker(
  180. thread_queue,
  181. active_threads,
  182. thread_end_rx,
  183. new_jobs_rx,
  184. work_context,
  185. pulse,
  186. );
  187. /* add the handle for the newly spawned thread to the list of handles */
  188. threads.insert(handle.thread().id(), String::from("idle-worker").into());
  189. }
  190. /* drop lock */
  191. drop(threads);
  192. {
  193. /* start controller thread */
  194. let threads_lock = threads_lock.clone();
  195. let _static_threads_lock = static_threads_lock.clone();
  196. let thread_queue = queue.clone();
  197. let thread_end_rx = thread_end_rx.clone();
  198. let work_context = work_context.clone();
  199. let handle = thread::spawn(move || 'control_loop: loop {
  200. select! {
  201. recv(thread_end_rx) -> _ => {
  202. debug!("received thread_end_rx, quitting");
  203. break 'control_loop;
  204. },
  205. recv(new_work_rx) -> work => {
  206. if let Ok(work) = work {
  207. if work.is_static {
  208. let work_context = work_context.clone();
  209. let handle = thread::spawn(move || work.compute(work_context));
  210. _static_threads_lock.lock().unwrap().insert(handle.thread().id(), String::new().into());
  211. } else {
  212. if active_threads.load(Ordering::SeqCst) == 0 {
  213. let handle = spawn_worker(
  214. thread_queue.clone(),
  215. active_threads.clone(),
  216. thread_end_rx.clone(),
  217. new_jobs_rx.clone(),
  218. work_context.clone(),
  219. pulse.clone(),
  220. );
  221. /* add the handle for the newly spawned thread to the list of handles */
  222. threads_lock.lock().unwrap().insert(handle.thread().id(), String::from("idle-worker").into());
  223. }
  224. thread_queue.add_work(work);
  225. }
  226. }
  227. }
  228. recv(set_name_rx) -> new_name => {
  229. if let Ok((thread_id, mut new_name)) = new_name {
  230. new_name.truncate_at_boundary(256);
  231. let mut threads = threads_lock.lock().unwrap();
  232. let mut static_threads = _static_threads_lock.lock().unwrap();
  233. let now = datetime::now();
  234. if threads.contains_key(&thread_id) {
  235. threads.entry(thread_id).and_modify(|e| {
  236. e.name = new_name;
  237. e.heartbeat = now;
  238. });
  239. } else if static_threads.contains_key(&thread_id) {
  240. static_threads.entry(thread_id).and_modify(|e| {
  241. e.name = new_name;
  242. e.heartbeat = now;
  243. });
  244. } else {
  245. static_threads.insert(thread_id, Worker { heartbeat: now, .. new_name.into() });
  246. static_threads.entry(thread_id).and_modify(|e| {
  247. e.heartbeat = now;
  248. });
  249. }
  250. pulse.send(ThreadEvent::Pulse).unwrap();
  251. }
  252. }
  253. recv(set_status_rx) -> new_status => {
  254. if let Ok((thread_id, mut new_status)) = new_status {
  255. new_status.truncate_at_boundary(256);
  256. let mut threads = threads_lock.lock().unwrap();
  257. let mut static_threads = _static_threads_lock.lock().unwrap();
  258. let now = datetime::now();
  259. if threads.contains_key(&thread_id) {
  260. threads.entry(thread_id).and_modify(|e| {
  261. e.status = new_status;
  262. e.heartbeat = now;
  263. });
  264. } else if static_threads.contains_key(&thread_id) {
  265. static_threads.entry(thread_id).and_modify(|e| {
  266. e.status = new_status;
  267. e.heartbeat = now;
  268. });
  269. debug!(&static_threads[&thread_id]);
  270. } else {
  271. static_threads.insert(thread_id, Worker { status: new_status, heartbeat: now, .. String::new().into() });
  272. }
  273. pulse.send(ThreadEvent::Pulse).unwrap();
  274. }
  275. }
  276. recv(finished_rx) -> dead_thread_id => {
  277. if let Ok(thread_id) = dead_thread_id {
  278. let mut threads = threads_lock.lock().unwrap();
  279. let mut static_threads = _static_threads_lock.lock().unwrap();
  280. if threads.contains_key(&thread_id) {
  281. threads.remove(&thread_id);
  282. } else if static_threads.contains_key(&thread_id) {
  283. static_threads.remove(&thread_id);
  284. } else {
  285. /* Nothing to do */
  286. }
  287. pulse.send(ThreadEvent::Pulse).unwrap();
  288. }
  289. }
  290. }
  291. });
  292. let mut static_threads = static_threads_lock.lock().unwrap();
  293. static_threads.insert(
  294. handle.thread().id(),
  295. "WorkController-thread".to_string().into(),
  296. );
  297. }
  298. WorkController {
  299. queue,
  300. thread_end_tx,
  301. threads: threads_lock,
  302. static_threads: static_threads_lock,
  303. work_context,
  304. }
  305. }
  306. pub fn add_static_thread(&mut self, id: std::thread::ThreadId) {
  307. self.static_threads
  308. .lock()
  309. .unwrap()
  310. .insert(id, String::new().into());
  311. }
  312. pub fn get_context(&self) -> WorkContext {
  313. self.work_context.clone()
  314. }
  315. }
  316. fn spawn_worker(
  317. thread_queue: WorkQueue,
  318. active_threads: Arc<AtomicUsize>,
  319. thread_end_rx: crossbeam::Receiver<bool>,
  320. new_jobs_rx: crossbeam::Receiver<bool>,
  321. work_context: WorkContext,
  322. pulse: crossbeam::Sender<ThreadEvent>,
  323. ) -> std::thread::JoinHandle<()> {
  324. thread::spawn(move || 'work_loop: loop {
  325. debug!("Waiting for work");
  326. select! {
  327. recv(thread_end_rx) -> _ => {
  328. debug!("received thread_end_rx, quitting");
  329. active_threads.fetch_sub(1, Ordering::SeqCst);
  330. break 'work_loop;
  331. },
  332. recv(new_jobs_rx) -> _ => {
  333. active_threads.fetch_sub(1, Ordering::SeqCst);
  334. while let Some(work) = thread_queue.get_work() {
  335. debug!("Got some work");
  336. work.compute(work_context.clone());
  337. debug!("finished work");
  338. work_context.set_name.send((std::thread::current().id(), "idle-worker".to_string())).unwrap();
  339. work_context.set_status.send((std::thread::current().id(), "inactive".to_string())).unwrap();
  340. pulse.send(ThreadEvent::Pulse).unwrap();
  341. std::thread::yield_now();
  342. }
  343. active_threads.fetch_add(1, Ordering::SeqCst);
  344. },
  345. }
  346. })
  347. }