From bc0189ffa1e09ccf895af4bda254cf4ba3da1975 Mon Sep 17 00:00:00 2001 From: Manos Pitsidianakis Date: Thu, 11 Jun 2020 12:01:11 +0300 Subject: [PATCH] Spawn workers on demand --- src/workers.rs | 93 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 60 insertions(+), 33 deletions(-) diff --git a/src/workers.rs b/src/workers.rs index 71ed329f..c5846e00 100644 --- a/src/workers.rs +++ b/src/workers.rs @@ -30,6 +30,7 @@ use melib::async_workers::{Work, WorkContext}; use melib::datetime::{self, UnixTimestamp}; use melib::text_processing::Truncate; use std::collections::HashMap; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::sync::Mutex; use std::thread; @@ -174,6 +175,8 @@ impl WorkController { }; let queue: WorkQueue = WorkQueue::new(new_jobs_tx, work_context.clone()); + + let active_threads = Arc::new(AtomicUsize::new(MAX_WORKER)); // Create a SyncFlag to share whether or not there are more jobs to be done. let (thread_end_tx, thread_end_rx) = bounded(1); @@ -185,13 +188,14 @@ impl WorkController { let mut threads = threads_lock.lock().unwrap(); /* spawn worker threads */ - for thread_num in 0..MAX_WORKER { + for _ in 0..MAX_WORKER { /* Each worker thread will wait on two channels: thread_end and new_jobs. thread_end * informs the worker that it should quit and new_jobs informs that there is a new job * available inside the queue. Only one worker will get each job, and others will * go back to waiting on the channels */ let thread_queue = queue.clone(); + let active_threads = active_threads.clone(); let thread_end_rx = thread_end_rx.clone(); let new_jobs_rx = new_jobs_rx.clone(); let new_jobs_rx = new_jobs_rx.clone(); @@ -199,37 +203,14 @@ impl WorkController { let work_context = work_context.clone(); let pulse = pulse.clone(); - let handle = thread::spawn(move || { - let mut work_done = 0; - - 'work_loop: loop { - debug!("Waiting for work"); - select! { - recv(thread_end_rx) -> _ => { - debug!("received thread_end_rx, quitting"); - break 'work_loop; - }, - recv(new_jobs_rx) -> _ => { - while let Some(work) = thread_queue.get_work() { - debug!("Got some work"); - work.compute(work_context.clone()); - debug!("finished work"); - - work_done += 1; - work_context.set_name.send((std::thread::current().id(), "idle-worker".to_string())).unwrap(); - work_context.set_status.send((std::thread::current().id(), "inactive".to_string())).unwrap(); - pulse.send(ThreadEvent::Pulse).unwrap(); - - std::thread::yield_now(); - } - continue 'work_loop; - }, - } - } - - /* report the amount of work done. */ - debug!("Thread {} did {} jobs.", thread_num, work_done); - }); + let handle = spawn_worker( + thread_queue, + active_threads, + thread_end_rx, + new_jobs_rx, + work_context, + pulse, + ); /* add the handle for the newly spawned thread to the list of handles */ threads.insert(handle.thread().id(), String::from("idle-worker").into()); @@ -242,7 +223,6 @@ impl WorkController { let threads_lock = threads_lock.clone(); let _static_threads_lock = static_threads_lock.clone(); let thread_queue = queue.clone(); - let threads_lock = threads_lock.clone(); let thread_end_rx = thread_end_rx.clone(); let work_context = work_context.clone(); @@ -259,6 +239,19 @@ impl WorkController { let handle = thread::spawn(move || work.compute(work_context)); _static_threads_lock.lock().unwrap().insert(handle.thread().id(), String::new().into()); } else { + if active_threads.load(Ordering::SeqCst) == 0 { + let handle = spawn_worker( + thread_queue.clone(), + active_threads.clone(), + thread_end_rx.clone(), + new_jobs_rx.clone(), + work_context.clone(), + pulse.clone(), + ); + + /* add the handle for the newly spawned thread to the list of handles */ + threads_lock.lock().unwrap().insert(handle.thread().id(), String::from("idle-worker").into()); + } thread_queue.add_work(work); } } @@ -356,3 +349,37 @@ impl WorkController { self.work_context.clone() } } + +fn spawn_worker( + thread_queue: WorkQueue, + active_threads: Arc, + thread_end_rx: crossbeam::Receiver, + new_jobs_rx: crossbeam::Receiver, + work_context: WorkContext, + pulse: crossbeam::Sender, +) -> std::thread::JoinHandle<()> { + thread::spawn(move || 'work_loop: loop { + debug!("Waiting for work"); + select! { + recv(thread_end_rx) -> _ => { + debug!("received thread_end_rx, quitting"); + active_threads.fetch_sub(1, Ordering::SeqCst); + break 'work_loop; + }, + recv(new_jobs_rx) -> _ => { + active_threads.fetch_sub(1, Ordering::SeqCst); + while let Some(work) = thread_queue.get_work() { + debug!("Got some work"); + work.compute(work_context.clone()); + debug!("finished work"); + work_context.set_name.send((std::thread::current().id(), "idle-worker".to_string())).unwrap(); + work_context.set_status.send((std::thread::current().id(), "inactive".to_string())).unwrap(); + pulse.send(ThreadEvent::Pulse).unwrap(); + + std::thread::yield_now(); + } + active_threads.fetch_add(1, Ordering::SeqCst); + }, + } + }) +}