use chan;
use melib::async_workers::Work;
use std;
use std::thread;

const MAX_WORKER: usize = 4;

pub struct WorkController {
    pub queue: WorkQueue<Work>,
    thread_end_tx: chan::Sender<bool>,
    results: Option<chan::Receiver<bool>>,
    threads: Vec<std::thread::JoinHandle<()>>,
}

impl WorkController {
    pub fn results_rx(&mut self) -> chan::Receiver<bool> {
        self.results.take().unwrap()
    }
}
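
// Note: `results` is an `Option` that is `take()`n by `results_rx()`, so the
// accessor can only be called once; a second call would `unwrap()` a `None`
// and panic:
//
//     let rx = controller.results_rx();  // fine, returns the receiver
//     let _ = controller.results_rx();   // panics: `results` is now None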

/*
impl Drop for WorkController {
    fn drop(&mut self) {
        for _ in 0..self.threads.len() {
            self.thread_end_tx.send(true);
        }
        let threads = std::mem::replace(&mut self.threads, Vec::new());
        for handle in threads {
            handle.join().unwrap();
        }
    }
}
*/
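
// If explicit shutdown is preferred over the disabled Drop impl above, the
// same steps could live in a method instead; a sketch (the `stop` name is
// hypothetical, not an existing melib API):
//
//     pub fn stop(&mut self) {
//         // Tell every worker to exit its select loop ...
//         for _ in 0..self.threads.len() {
//             self.thread_end_tx.send(true);
//         }
//         // ... then wait for each of them to actually finish.
//         for handle in self.threads.drain(..) {
//             handle.join().unwrap();
//         }
//     }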

// We need a way to keep track of what work needs to be done.
// This is a multi-source, multi-consumer queue which we call a
// WorkQueue.

// To create this type, we wrap a mutex (std::sync::Mutex) around a
// queue (technically a double-ended queue, std::collections::VecDeque).
//
// Mutex stands for MUTually EXclusive. It essentially ensures that only
// one thread has access to a given resource at one time.
use std::sync::Mutex;

// A VecDeque is a double-ended queue, but we will only be using it in forward
// mode; that is, we will push onto the back and pull from the front.
use std::collections::VecDeque;

// Finally we wrap the whole thing in Arc (Atomic Reference Counting) so that
// we can safely share it with other threads. Arc (std::sync::Arc) is a lot
// like Rc (std::rc::Rc), in that it allows multiple references to some memory
// which is freed when no references remain, except that it is atomic, making
// it comparatively slow but able to be shared across the thread boundary.
use std::sync::Arc;

// All three of these types are wrapped around a generic type T.
// T is required to be Send (a marker trait automatically implemented when
// it is safe to do so) because it denotes types that are safe to move between
// threads, which is the whole point of the WorkQueue. Note that, unlike some
// versions of this queue, T does not also need to be Copy here: items are
// simply moved in and out of the queue.
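
// A tiny illustration of the combined Arc<Mutex<VecDeque<T>>> type on its
// own, before it is wrapped up in WorkQueue below:
//
//     let shared = Arc::new(Mutex::new(VecDeque::new()));
//     let worker = {
//         let shared = Arc::clone(&shared);
//         std::thread::spawn(move || shared.lock().unwrap().push_back(1u8))
//     };
//     worker.join().unwrap();
//     assert_eq!(shared.lock().unwrap().pop_front(), Some(1u8));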

/// A generic work queue for work elements which can be sent between threads.
/// Any producer of work can add elements and any worker can consume them.
/// WorkQueue derives Clone so that it can be distributed among threads.
#[derive(Clone)]
pub struct WorkQueue<T: Send> {
    inner: Arc<Mutex<VecDeque<T>>>,
    new_jobs_tx: chan::Sender<bool>,
}

impl<T: Send> WorkQueue<T> {
    // Creating one of these by hand would be kind of a pain,
    // so let's provide a convenience function.

    /// Creates a new WorkQueue, ready to be used.
    fn new(new_jobs_tx: chan::Sender<bool>) -> Self {
        Self {
            inner: Arc::new(Mutex::new(VecDeque::new())),
            new_jobs_tx,
        }
    }

    // This is the function workers will use to acquire work from the queue.
    // They will call it in a loop, checking to see if there is any work available.

    /// Blocks the current thread until the queue can be locked, then takes
    /// the next piece of work from the front of the queue.
    ///
    /// Returns `None` if the queue is currently empty.
    ///
    /// # Panics
    /// Panics if the underlying mutex became poisoned. This is exceedingly
    /// unlikely.
    fn get_work(&self) -> Option<T> {
        // Try to get a lock on the Mutex. If this fails, there is a
        // problem with the mutex - it's poisoned, meaning that a thread that
        // held the mutex lock panicked before releasing it. There is no way
        // to guarantee that all its invariants are upheld, so we need to not
        // use it in that case.
        let maybe_queue = self.inner.lock();
        // A lot is going on here. self.inner is an Arc of Mutex. Arc can deref
        // into its internal type, so we can call the methods of that inner
        // type (Mutex) without dereferencing, so this is like
        //     (*self.inner).lock()
        // but doesn't look awful. Mutex::lock() returns a
        // Result<MutexGuard<VecDeque<T>>>.
        // Unwrapping with if let, we get a MutexGuard, which is an RAII guard
        // that unlocks the Mutex when it goes out of scope.
        if let Ok(mut queue) = maybe_queue {
            // queue is a MutexGuard<VecDeque>, so this is like
            //     (*queue).pop_front()
            // and returns Some(item), or None if there are no more items.
            queue.pop_front()
            // Once the function returns, queue goes out of scope and the
            // mutex unlocks.
        } else {
            // There's a problem with the mutex.
            panic!("WorkQueue::get_work() tried to lock a poisoned mutex");
        }
    }
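
    // The RAII behaviour of the guard can be seen with a plain Mutex, too
    // (a standalone sketch, not part of the queue):
    //
    //     let m = Mutex::new(VecDeque::from(vec![1, 2, 3]));
    //     if let Ok(mut q) = m.lock() {
    //         assert_eq!(q.pop_front(), Some(1));
    //     } // `q` is dropped here, so the mutex unlocks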

    // Both the controller (main thread) and possibly workers can use this
    // function to add work to the queue.

    /// Blocks the current thread until the queue can be locked, then
    /// adds the work to the end of the queue.
    /// Returns the amount of work now in the queue.
    ///
    /// # Panics
    /// Panics if the underlying mutex became poisoned. This is exceedingly
    /// unlikely.
    pub fn add_work(&self, work: T) -> usize {
        // As above, try to get a lock on the mutex.
        if let Ok(mut queue) = self.inner.lock() {
            // As above, we can use the MutexGuard<VecDeque<T>> to access
            // the internal VecDeque.
            queue.push_back(work);
            // Notify any idle workers that a new job is available.
            self.new_jobs_tx.send(true);
            // Now return the length of the queue.
            queue.len()
        } else {
            panic!("WorkQueue::add_work() tried to lock a poisoned mutex");
        }
    }
}
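
// From a producer's point of view the queue boils down to one call
// (a sketch; `job` stands in for whatever T the queue carries):
//
//     let pending = queue.add_work(job);
//     debug!("{} jobs pending after this insert", pending);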

impl WorkController {
    pub fn new() -> WorkController {
        let (new_jobs_tx, new_jobs_rx) = chan::r#async();
        // Create a new work queue to keep track of what work needs to be done.
        // Note that the queue is internally mutable (or, rather, the Mutex is),
        // but this binding doesn't need to be mutable. This isn't unsound because
        // the Mutex ensures at runtime that only one reference is in use at a
        // time; therefore no mutation can occur at the same time as aliasing.
        let queue: WorkQueue<Work> = WorkQueue::new(new_jobs_tx);

        // Create an MPSC (Multiple Producer, Single Consumer) channel. Every worker
        // is a producer, the main thread is a consumer; the producers put a
        // notification into the channel when a piece of work is done.
        let (results_tx, results_rx) = chan::r#async();

        // Create a bounded channel used to tell the worker threads when to
        // exit. (The buffer holds one element; size_of::<bool>() is 1.)
        let (thread_end_tx, thread_end_rx) = chan::sync(::std::mem::size_of::<bool>());

        // This Vec will hold thread join handles to allow us to not exit while work
        // is still being done. These handles provide a .join() method which blocks
        // the current thread until the thread referred to by the handle exits.
        let mut threads = Vec::new();

        for thread_num in 0..MAX_WORKER {
            // Get a reference to the queue for the thread to use.
            // .clone() here doesn't clone the actual queue data, but rather the
            // internal Arc produces a new reference for use in the new queue
            // instance.
            let thread_queue = queue.clone();
            // Similarly, create a new transmitter for the thread to use, and
            // new receivers for the shutdown and new-jobs channels.
            let thread_results_tx = results_tx.clone();
            let thread_end_rx = thread_end_rx.clone();
            let new_jobs_rx = new_jobs_rx.clone();
            // thread::spawn takes a closure (an anonymous function that "closes"
            // over its environment). The move keyword means it takes ownership of
            // those variables, meaning they can't be used again in the main thread.
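            // (Once `move` captures thread_queue, thread_results_tx,
            // thread_end_rx and new_jobs_rx below, this loop iteration can no
            // longer touch them; that is why fresh clones are made above.)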
            let handle = thread::spawn(move || {
                // A variable to keep track of how much work was done.
                let mut work_done = 0;
                'work_loop: loop {
                    // Loop while there's expected to be work, looking for work.
                    chan_select! {
                        thread_end_rx.recv() -> _ => {
                            break 'work_loop;
                        },
                        new_jobs_rx.recv() -> _ => {
                            // If work is available, do that work.
                            while let Some(work) = thread_queue.get_work() {
                                // Do some work.
                                work.compute();
                                // Record that some work was done.
                                work_done += 1;
                                // Report the completed unit of work to the
                                // main thread.
                                thread_results_tx.send(true);
                                // Signal to the operating system that now is a good time
                                // to give another thread a chance to run.
                                //
                                // This isn't strictly necessary - the OS can preemptively
                                // switch between threads, without asking - but it helps make
                                // sure that other threads do get a chance to get some work.
                                std::thread::yield_now();
                            }
                            continue 'work_loop;
                        },
                    }
                }
                // Report the amount of work done.
                debug!("Thread {} did {} jobs.", thread_num, work_done);
            });
            // Add the handle for the newly spawned thread to the list of handles.
            threads.push(handle);
        }

        WorkController {
            queue,
            thread_end_tx,
            results: Some(results_rx),
            threads,
        }
    }
}
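
// A usage sketch for the controller as a whole. How a melib `Work` item is
// actually constructed is out of scope here, so `some_work` is a placeholder:
//
//     let mut controller = WorkController::new();
//     let results = controller.results_rx();
//     controller.queue.add_work(some_work);
//     results.recv(); // blocks until a worker reports a finished job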

/*
pub fn add_jobs(queue: &WorkQueue<usize>, results_rx: &chan::Receiver<bool>) {
    println!("Adding jobs to the queue.");
    // Variables to keep track of the number of jobs we expect to do.
    let mut jobs_remaining = 0;
    let mut jobs_total = 0;

    // Just add some numbers to the queue.
    // These numbers will be passed into fib(), so they need to stay pretty
    // small.
    for work in 0..90 {
        // Add each one several times.
        for _ in 0..100 {
            jobs_remaining = queue.add_work(work);
            jobs_total += 1;
        }
    }

    // Report that some jobs were inserted, and how many are left to be done.
    // This is interesting because the workers have been taking jobs out of the queue
    // the whole time the control thread has been putting them in!
    //
    // Try removing the use of std::thread::yield_now() in the thread closure.
    // You'll probably (depending on your system) notice that the number remaining
    // after insertion goes way up. That's because the operating system is usually
    // (not always, but usually) fairly conservative about interrupting a thread
    // that is actually doing work.
    //
    // Similarly, if you add a call to yield_now() in the loop above, you'll see the
    // number remaining probably drop to 1 or 2. This can also change depending on
    // how optimized the output code is - try `cargo run --release` vs `cargo run`.
    //
    // This inconsistency should drive home to you that you as the programmer can't
    // make any assumptions at all about when and in what order things will happen
    // in parallel code unless you use thread control primitives as demonstrated
    // in this program.
    println!("Total of {} jobs inserted into the queue ({} remaining at this time).",
             jobs_total,
             jobs_remaining);

    // Get completed work from the channel while there's work to be done.
    while jobs_total > 0 {
        match results_rx.recv() {
            // If the control thread successfully receives, a job was completed.
            Some(_) => { jobs_total -= 1 },
            // If the channel closes with jobs still outstanding, every worker
            // has died; that's pretty problematic.
            None => { panic!("All workers died unexpectedly."); }
        }
    }
}
*/
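
// A minimal self-check of the queue semantics on their own, using a plain
// numeric payload instead of melib's `Work` (a sketch under that assumption):
#[cfg(test)]
mod tests {
    use super::WorkQueue;

    #[test]
    fn queue_is_fifo_and_shared_between_clones() {
        let (tx, rx) = ::chan::r#async();
        let queue: WorkQueue<u64> = WorkQueue::new(tx);
        let clone = queue.clone();

        // add_work returns the number of jobs queued so far ...
        assert_eq!(queue.add_work(1), 1);
        assert_eq!(clone.add_work(2), 2);
        // ... and also signals the new-jobs channel once per insert.
        assert_eq!(rx.recv(), Some(true));
        assert_eq!(rx.recv(), Some(true));

        // Clones share the same underlying deque, in FIFO order.
        assert_eq!(clone.get_work(), Some(1));
        assert_eq!(queue.get_work(), Some(2));
        assert_eq!(queue.get_work(), None);
    }
}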