From ee10cdbcd5f28a30cacc020f32345817fc52aef9 Mon Sep 17 00:00:00 2001 From: Manos Pitsidianakis Date: Sun, 28 Jun 2020 15:39:33 +0300 Subject: [PATCH] Make get_async() return a Stream --- melib/Cargo.toml | 1 + melib/src/backends.rs | 2 +- melib/src/backends/imap_async.rs | 576 ++++++++++---------- melib/src/backends/imap_async/connection.rs | 5 +- melib/src/backends/imap_async/watch.rs | 220 ++------ melib/src/backends/maildir/backend.rs | 4 +- melib/src/connections.rs | 141 +++++ src/conf/accounts.rs | 89 ++- src/jobs1.rs | 15 +- 9 files changed, 568 insertions(+), 485 deletions(-) create mode 100644 melib/src/connections.rs diff --git a/melib/Cargo.toml b/melib/Cargo.toml index 8598bd67..48e6b20b 100644 --- a/melib/Cargo.toml +++ b/melib/Cargo.toml @@ -44,6 +44,7 @@ rusqlite = {version = "0.20.0", optional = true } libloading = "0.6.2" futures = "0.3.5" smol = "0.1.18" +async-stream = "0.2.1" [features] default = ["unicode_algorithms", "imap_backend", "maildir_backend", "mbox_backend", "vcard", "sqlite3"] diff --git a/melib/src/backends.rs b/melib/src/backends.rs index 9fc8a78b..52d5edb3 100644 --- a/melib/src/backends.rs +++ b/melib/src/backends.rs @@ -307,7 +307,7 @@ pub trait MailBackend: ::std::fmt::Debug + Send + Sync { fn get_async( &mut self, mailbox: &Mailbox, - ) -> Result>> + Send + 'static>>> { + ) -> Result>> + Send + 'static>>> { Err(MeliError::new("Unimplemented.")) } fn refresh( diff --git a/melib/src/backends/imap_async.rs b/melib/src/backends/imap_async.rs index 63bc3065..6e5142b9 100644 --- a/melib/src/backends/imap_async.rs +++ b/melib/src/backends/imap_async.rs @@ -30,8 +30,8 @@ pub use mailbox::*; //pub use operations::*; mod connection; pub use connection::*; -//mod watch; -//pub use watch::*; +mod watch; +pub use watch::*; mod cache; pub mod managesieve; //mod untagged; @@ -181,299 +181,51 @@ impl MailBackend for ImapType { fn get_async( &mut self, mailbox: &Mailbox, - ) -> Result>> + Send + 'static>>> { + ) -> Result>> + Send + 'static>>> { let uid_store = self.uid_store.clone(); let can_create_flags = self.can_create_flags.clone(); let mailbox_hash = mailbox.hash(); - let (permissions, mailbox_path, mailbox_exists, no_select, unseen) = { - let f = &uid_store.mailboxes.read().unwrap()[&mailbox_hash]; - ( - f.permissions.clone(), - f.imap_path().to_string(), - f.exists.clone(), - f.no_select, - f.unseen.clone(), - ) - }; let connection = self.connection.clone(); - Ok(Box::pin(async move { - if no_select { - return Ok(vec![]); + let mut max_uid: Option = None; + let mut valid_hash_set: HashSet = HashSet::default(); + let mut our_unseen: BTreeSet = Default::default(); + Ok(Box::pin(async_stream::try_stream! { + let (cached_hash_set, cached_payload) = get_cached_envs(mailbox_hash, &mut our_unseen, &uid_store)?; + yield cached_payload; + loop { + let res = get_hlpr(&connection, mailbox_hash,&cached_hash_set, &can_create_flags, &mut our_unseen, &mut valid_hash_set, &uid_store, &mut max_uid).await?; + yield res; + if max_uid == Some(1) { + return; + } + } - let mut our_unseen: BTreeSet = Default::default(); - let mut valid_hash_set: HashSet = HashSet::default(); - let (cached_hash_set, mut payload): (HashSet, Vec) = - (|| -> Result<(HashSet, Vec)> { - if !uid_store.cache_headers { - return Ok(Default::default()); - } - - let uidvalidities = uid_store.uidvalidity.lock().unwrap(); - - let v = if let Some(v) = uidvalidities.get(&mailbox_hash) { - v - } else { - return Ok(Default::default()); - }; - let cached_envs: (cache::MaxUID, Vec<(UID, Envelope)>); - cache::save_envelopes(uid_store.account_hash, mailbox_hash, *v, &[]) - .chain_err_summary(|| "Could not save envelopes in cache in get()")?; - cached_envs = cache::get_envelopes(uid_store.account_hash, mailbox_hash, *v) - .chain_err_summary(|| "Could not get envelopes in cache in get()")?; - let (_max_uid, envelopes) = debug!(cached_envs); - let ret = envelopes.iter().map(|(_, env)| env.hash()).collect(); - if !envelopes.is_empty() { - let mut payload = vec![]; - for (uid, env) in envelopes { - if !env.is_seen() { - our_unseen.insert(env.hash()); - } - uid_store - .hash_index - .lock() - .unwrap() - .insert(env.hash(), (uid, mailbox_hash)); - uid_store - .uid_index - .lock() - .unwrap() - .insert((mailbox_hash, uid), env.hash()); - payload.push(env); - } - debug!("sending cached payload for {}", mailbox_hash); - - unseen.lock().unwrap().insert_set(our_unseen.clone()); - return Ok((ret, payload)); - } - Ok((ret, vec![])) - })() - .unwrap_or_default(); - - let mut conn = connection.lock().await; - debug!("locked for get {}", mailbox_path); - let mut response = String::with_capacity(8 * 1024); - - conn.create_uid_msn_cache(mailbox_hash, 1).await?; - /* first SELECT the mailbox to get READ/WRITE permissions (because EXAMINE only - * returns READ-ONLY for both cases) */ - conn.select_mailbox(mailbox_hash, &mut response) - .await - .chain_err_summary(|| format!("Could not select mailbox {}", mailbox_path))?; - let mut examine_response = protocol_parser::select_response(&response) - .chain_err_summary(|| { - format!( - "Could not parse select response for mailbox {}", - mailbox_path - ) - })?; - *can_create_flags.lock().unwrap() = examine_response.can_create_flags; - debug!( - "mailbox: {} examine_response: {:?}", - mailbox_path, examine_response - ); - { - let mut uidvalidities = uid_store.uidvalidity.lock().unwrap(); - - let v = uidvalidities - .entry(mailbox_hash) - .or_insert(examine_response.uidvalidity); - if uid_store.cache_headers { - let _ = cache::save_envelopes( - uid_store.account_hash, - mailbox_hash, - examine_response.uidvalidity, - &[], - ); - } - *v = examine_response.uidvalidity; - let mut permissions = permissions.lock().unwrap(); - permissions.create_messages = !examine_response.read_only; - permissions.remove_messages = !examine_response.read_only; - permissions.set_flags = !examine_response.read_only; - permissions.rename_messages = !examine_response.read_only; - permissions.delete_messages = !examine_response.read_only; - permissions.delete_messages = !examine_response.read_only; - mailbox_exists - .lock() - .unwrap() - .set_not_yet_seen(examine_response.exists); - } - if examine_response.exists == 0 { - if uid_store.cache_headers { - for &env_hash in &cached_hash_set { - conn.add_refresh_event(RefreshEvent { - account_hash: uid_store.account_hash, - mailbox_hash, - kind: RefreshEventKind::Remove(env_hash), - }); - } - let _ = cache::save_envelopes( - uid_store.account_hash, - mailbox_hash, - examine_response.uidvalidity, - &[], - ); - } - return Ok(Vec::new()); - } - /* reselecting the same mailbox with EXAMINE prevents expunging it */ - conn.examine_mailbox(mailbox_hash, &mut response).await?; - if examine_response.uidnext == 0 { - /* UIDNEXT shouldn't be 0, since exists != 0 at this point */ - conn.send_command(format!("STATUS \"{}\" (UIDNEXT)", mailbox_path).as_bytes()) - .await?; - conn.read_response(&mut response, RequiredResponses::STATUS) - .await?; - let (_, status) = protocol_parser::status_response(response.as_bytes())?; - if let Some(uidnext) = status.uidnext { - if uidnext == 0 { - return Err(MeliError::new( - "IMAP server error: zero UIDNEXt with nonzero exists.", - )); - } - examine_response.uidnext = uidnext; - } else { - return Err(MeliError::new("IMAP server did not reply with UIDNEXT")); - } - } - let mut max_uid_left: usize = examine_response.uidnext - 1; - - while max_uid_left > 0 { - let mut envelopes = vec![]; - debug!("{} max_uid_left= {}", mailbox_hash, max_uid_left); - if max_uid_left == 1 { - debug!("UID FETCH 1 (UID FLAGS ENVELOPE BODYSTRUCTURE)"); - conn.send_command(b"UID FETCH 1 (UID FLAGS ENVELOPE BODYSTRUCTURE)") - .await?; - } else { - conn.send_command( - debug!(format!( - "UID FETCH {}:{} (UID FLAGS ENVELOPE BODYSTRUCTURE)", - std::cmp::max(std::cmp::max(max_uid_left.saturating_sub(500), 1), 1), - max_uid_left - )) - .as_bytes(), - ) - .await? - }; - conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) - .await - .chain_err_summary(|| { - format!( - "Could not parse fetch response for mailbox {}", - mailbox_path - ) - })?; - debug!( - "fetch response is {} bytes and {} lines", - response.len(), - response.lines().collect::>().len() - ); - let (_, v, _) = protocol_parser::uid_fetch_responses(&response)?; - debug!("responses len is {}", v.len()); - for UidFetchResponse { - uid, - message_sequence_number, - flags, - envelope, - .. - } in v - { - let mut env = envelope.unwrap(); - let mut h = DefaultHasher::new(); - h.write_usize(uid); - h.write(mailbox_path.as_bytes()); - env.set_hash(h.finish()); - debug!( - "env hash {} {} UID = {} MSN = {}", - env.hash(), - env.subject(), - uid, - message_sequence_number - ); - valid_hash_set.insert(env.hash()); - let mut tag_lck = uid_store.tag_index.write().unwrap(); - if let Some((flags, keywords)) = flags { - if !flags.intersects(Flag::SEEN) { - our_unseen.insert(env.hash()); - } - env.set_flags(flags); - for f in keywords { - let hash = tag_hash!(f); - if !tag_lck.contains_key(&hash) { - tag_lck.insert(hash, f); - } - env.labels_mut().push(hash); - } - } - uid_store - .msn_index - .lock() - .unwrap() - .entry(mailbox_hash) - .or_default() - .insert(message_sequence_number - 1, uid); - uid_store - .hash_index - .lock() - .unwrap() - .insert(env.hash(), (uid, mailbox_hash)); - uid_store - .uid_index - .lock() - .unwrap() - .insert((mailbox_hash, uid), env.hash()); - envelopes.push((uid, env)); - } - max_uid_left = std::cmp::max(std::cmp::max(max_uid_left.saturating_sub(500), 1), 1); - debug!("sending payload for {}", mailbox_hash); - if uid_store.cache_headers { - cache::save_envelopes( - uid_store.account_hash, - mailbox_hash, - examine_response.uidvalidity, - &envelopes - .iter() - .map(|(uid, env)| (*uid, env)) - .collect::>(), - ) - .chain_err_summary(|| { - format!( - "Could not save envelopes in cache for mailbox {}", - mailbox_path - ) - })?; - } - for &env_hash in cached_hash_set.difference(&valid_hash_set) { - conn.add_refresh_event(RefreshEvent { - account_hash: uid_store.account_hash, - mailbox_hash, - kind: RefreshEventKind::Remove(env_hash), - }); - } - let progress = envelopes.len(); - unseen - .lock() - .unwrap() - .insert_set(our_unseen.iter().cloned().collect()); - mailbox_exists.lock().unwrap().insert_existing_set( - envelopes.iter().map(|(_, env)| env.hash()).collect::<_>(), - ); - payload.extend(envelopes.into_iter().map(|(_, env)| env)); - if max_uid_left == 1 { - break; - } - } - Ok(payload) })) } + fn refresh_async( &mut self, - _mailbox_hash: MailboxHash, - _sender: RefreshEventConsumer, + mailbox_hash: MailboxHash, + sender: RefreshEventConsumer, ) -> Result> + Send + 'static>>> { - Err(MeliError::new("Unimplemented.")) + let inbox = self + .uid_store + .mailboxes + .read() + .unwrap() + .get(&mailbox_hash) + .map(std::clone::Clone::clone) + .unwrap(); + let main_conn = self.connection.clone(); + *self.uid_store.sender.write().unwrap() = Some(sender); + let uid_store = self.uid_store.clone(); + Ok(Box::pin(async move { + let mut conn = main_conn.lock().await; + watch::examine_updates(&inbox, &mut conn, &uid_store).await?; + Ok(()) + })) } + fn mailboxes_async( &self, ) -> Result< @@ -1813,3 +1565,259 @@ async fn get_initial_max_uid( } Ok(examine_response.uidnext - 1) } + +async fn get_hlpr( + connection: &Arc>, + mailbox_hash: MailboxHash, + cached_hash_set: &HashSet, + can_create_flags: &Arc>, + our_unseen: &mut BTreeSet, + valid_hash_set: &mut HashSet, + uid_store: &UIDStore, + max_uid: &mut Option, +) -> Result> { + let (permissions, mailbox_path, mailbox_exists, no_select, unseen) = { + let f = &uid_store.mailboxes.read().unwrap()[&mailbox_hash]; + ( + f.permissions.clone(), + f.imap_path().to_string(), + f.exists.clone(), + f.no_select, + f.unseen.clone(), + ) + }; + let mut conn = connection.lock().await; + debug!("locked for get {}", mailbox_path); + let mut response = String::with_capacity(8 * 1024); + let max_uid_left = if let Some(max_uid) = max_uid { + *max_uid + } else { + conn.create_uid_msn_cache(mailbox_hash, 1).await?; + /* first SELECT the mailbox to get READ/WRITE permissions (because EXAMINE only + * returns READ-ONLY for both cases) */ + conn.select_mailbox(mailbox_hash, &mut response) + .await + .chain_err_summary(|| format!("Could not select mailbox {}", mailbox_path))?; + let mut examine_response = + protocol_parser::select_response(&response).chain_err_summary(|| { + format!( + "Could not parse select response for mailbox {}", + mailbox_path + ) + })?; + *can_create_flags.lock().unwrap() = examine_response.can_create_flags; + debug!( + "mailbox: {} examine_response: {:?}", + mailbox_path, examine_response + ); + { + let mut uidvalidities = uid_store.uidvalidity.lock().unwrap(); + + let v = uidvalidities + .entry(mailbox_hash) + .or_insert(examine_response.uidvalidity); + if uid_store.cache_headers { + let _ = cache::save_envelopes( + uid_store.account_hash, + mailbox_hash, + examine_response.uidvalidity, + &[], + ); + } + *v = examine_response.uidvalidity; + let mut permissions = permissions.lock().unwrap(); + permissions.create_messages = !examine_response.read_only; + permissions.remove_messages = !examine_response.read_only; + permissions.set_flags = !examine_response.read_only; + permissions.rename_messages = !examine_response.read_only; + permissions.delete_messages = !examine_response.read_only; + permissions.delete_messages = !examine_response.read_only; + mailbox_exists + .lock() + .unwrap() + .set_not_yet_seen(examine_response.exists); + } + if examine_response.exists == 0 { + if uid_store.cache_headers { + /* + for &env_hash in &cached_hash_set { + conn.add_refresh_event(RefreshEvent { + account_hash: uid_store.account_hash, + mailbox_hash, + kind: RefreshEventKind::Remove(env_hash), + }); + } + */ + let _ = cache::save_envelopes( + uid_store.account_hash, + mailbox_hash, + examine_response.uidvalidity, + &[], + ); + } + *max_uid = Some(0); + return Ok(Vec::new()); + } + /* reselecting the same mailbox with EXAMINE prevents expunging it */ + conn.examine_mailbox(mailbox_hash, &mut response).await?; + if examine_response.uidnext == 0 { + /* UIDNEXT shouldn't be 0, since exists != 0 at this point */ + conn.send_command(format!("STATUS \"{}\" (UIDNEXT)", mailbox_path).as_bytes()) + .await?; + conn.read_response(&mut response, RequiredResponses::STATUS) + .await?; + let (_, status) = protocol_parser::status_response(response.as_bytes())?; + if let Some(uidnext) = status.uidnext { + if uidnext == 0 { + return Err(MeliError::new( + "IMAP server error: zero UIDNEXt with nonzero exists.", + )); + } + examine_response.uidnext = uidnext; + } else { + return Err(MeliError::new("IMAP server did not reply with UIDNEXT")); + } + } + *max_uid = Some(examine_response.uidnext - 1); + examine_response.uidnext - 1 + }; + let chunk_size = 200; + + let mut payload = vec![]; + if conn.current_mailbox != Some(mailbox_hash) { + conn.examine_mailbox(mailbox_hash, &mut response).await?; + } + if max_uid_left > 0 { + let mut envelopes = vec![]; + debug!("{} max_uid_left= {}", mailbox_hash, max_uid_left); + if max_uid_left == 1 { + debug!("UID FETCH 1 (UID FLAGS ENVELOPE BODYSTRUCTURE)"); + conn.send_command(b"UID FETCH 1 (UID FLAGS ENVELOPE BODYSTRUCTURE)") + .await?; + } else { + conn.send_command( + debug!(format!( + "UID FETCH {}:{} (UID FLAGS ENVELOPE BODYSTRUCTURE)", + std::cmp::max(std::cmp::max(max_uid_left.saturating_sub(chunk_size), 1), 1), + max_uid_left + )) + .as_bytes(), + ) + .await? + }; + conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) + .await + .chain_err_summary(|| { + format!( + "Could not parse fetch response for mailbox {}", + mailbox_path + ) + })?; + drop(conn); + debug!( + "fetch response is {} bytes and {} lines", + response.len(), + response.lines().collect::>().len() + ); + let (_, v, _) = protocol_parser::uid_fetch_responses(&response)?; + debug!("responses len is {}", v.len()); + for UidFetchResponse { + uid, + message_sequence_number, + flags, + envelope, + .. + } in v + { + let mut env = envelope.unwrap(); + let mut h = DefaultHasher::new(); + h.write_usize(uid); + h.write(mailbox_path.as_bytes()); + env.set_hash(h.finish()); + debug!( + "env hash {} {} UID = {} MSN = {}", + env.hash(), + env.subject(), + uid, + message_sequence_number + ); + valid_hash_set.insert(env.hash()); + let mut tag_lck = uid_store.tag_index.write().unwrap(); + if let Some((flags, keywords)) = flags { + if !flags.intersects(Flag::SEEN) { + our_unseen.insert(env.hash()); + } + env.set_flags(flags); + for f in keywords { + let hash = tag_hash!(f); + if !tag_lck.contains_key(&hash) { + tag_lck.insert(hash, f); + } + env.labels_mut().push(hash); + } + } + uid_store + .msn_index + .lock() + .unwrap() + .entry(mailbox_hash) + .or_default() + .insert(message_sequence_number - 1, uid); + uid_store + .hash_index + .lock() + .unwrap() + .insert(env.hash(), (uid, mailbox_hash)); + uid_store + .uid_index + .lock() + .unwrap() + .insert((mailbox_hash, uid), env.hash()); + envelopes.push((uid, env)); + } + debug!("sending payload for {}", mailbox_hash); + if uid_store.cache_headers { + //FIXME + /* + cache::save_envelopes( + uid_store.account_hash, + mailbox_hash, + examine_response.uidvalidity, + &envelopes + .iter() + .map(|(uid, env)| (*uid, env)) + .collect::>(), + ) + .chain_err_summary(|| { + format!( + "Could not save envelopes in cache for mailbox {}", + mailbox_path + ) + })?; + */ + } + /* + for &env_hash in cached_hash_set.difference(&valid_hash_set) { + conn.add_refresh_event(RefreshEvent { + account_hash: uid_store.account_hash, + mailbox_hash, + kind: RefreshEventKind::Remove(env_hash), + }); + } + */ + unseen + .lock() + .unwrap() + .insert_set(our_unseen.iter().cloned().collect()); + mailbox_exists + .lock() + .unwrap() + .insert_existing_set(envelopes.iter().map(|(_, env)| env.hash()).collect::<_>()); + payload.extend(envelopes.into_iter().map(|(_, env)| env)); + } + *max_uid = Some(std::cmp::max( + std::cmp::max(max_uid_left.saturating_sub(chunk_size), 1), + 1, + )); + Ok(payload) +} diff --git a/melib/src/backends/imap_async/connection.rs b/melib/src/backends/imap_async/connection.rs index c35135dd..65f042e2 100644 --- a/melib/src/backends/imap_async/connection.rs +++ b/melib/src/backends/imap_async/connection.rs @@ -148,14 +148,15 @@ impl ImapStream { } { + // FIXME: This is blocking let socket = socket.into_inner()?; - let mut conn_result = debug!(connector.connect(path, socket)); + let mut conn_result = connector.connect(path, socket); if let Err(native_tls::HandshakeError::WouldBlock(midhandshake_stream)) = conn_result { let mut midhandshake_stream = Some(midhandshake_stream); loop { - match debug!(midhandshake_stream.take().unwrap().handshake()) { + match midhandshake_stream.take().unwrap().handshake() { Ok(r) => { conn_result = Ok(r); break; diff --git a/melib/src/backends/imap_async/watch.rs b/melib/src/backends/imap_async/watch.rs index 9452de37..8df3ce4c 100644 --- a/melib/src/backends/imap_async/watch.rs +++ b/melib/src/backends/imap_async/watch.rs @@ -27,21 +27,18 @@ use std::sync::{Arc, Mutex}; /// Arguments for IMAP watching functions pub struct ImapWatchKit { pub conn: ImapConnection, - pub main_conn: Arc>, + pub main_conn: Arc>, pub uid_store: Arc, - pub work_context: WorkContext, } macro_rules! exit_on_error { - ($conn:expr, $mailbox_hash:ident, $work_context:ident, $thread_id:ident, $($result:expr)+) => { + ($conn:expr, $mailbox_hash:ident, $thread_id:ident, $($result:expr)+) => { $(if let Err(e) = $result { *$conn.uid_store.is_online.lock().unwrap() = ( Instant::now(), Err(e.clone()), ); debug!("failure: {}", e.to_string()); - $work_context.set_status.send(($thread_id, e.to_string())).unwrap(); - $work_context.finished.send($thread_id).unwrap(); let account_hash = $conn.uid_store.account_hash; $conn.add_refresh_event(RefreshEvent { account_hash, @@ -53,50 +50,30 @@ macro_rules! exit_on_error { }; } -pub fn poll_with_examine(kit: ImapWatchKit) -> Result<()> { +pub async fn poll_with_examine(kit: ImapWatchKit) -> Result<()> { debug!("poll with examine"); let ImapWatchKit { mut conn, main_conn, uid_store, - work_context, } = kit; - loop { - if super::try_lock(&uid_store.is_online, Some(std::time::Duration::new(10, 0)))? - .1 - .is_ok() - { - break; - } - std::thread::sleep(std::time::Duration::from_millis(100)); - } - conn.connect()?; + conn.connect().await?; let mut response = String::with_capacity(8 * 1024); let thread_id: std::thread::ThreadId = std::thread::current().id(); loop { - work_context - .set_status - .send((thread_id, "sleeping...".to_string())) - .unwrap(); - std::thread::sleep(std::time::Duration::from_millis(5 * 60 * 1000)); let mailboxes = uid_store.mailboxes.read()?; for mailbox in mailboxes.values() { - work_context - .set_status - .send(( - thread_id, - format!("examining `{}` for updates...", mailbox.path()), - )) - .unwrap(); - examine_updates(mailbox, &mut conn, &uid_store, &work_context)?; + examine_updates(mailbox, &mut conn, &uid_store).await?; } - let mut main_conn = super::try_lock(&main_conn, Some(std::time::Duration::new(10, 0)))?; - main_conn.send_command(b"NOOP")?; - main_conn.read_response(&mut response, RequiredResponses::empty())?; + let mut main_conn = main_conn.lock().await; + main_conn.send_command(b"NOOP").await?; + main_conn + .read_response(&mut response, RequiredResponses::empty()) + .await?; } } -pub fn idle(kit: ImapWatchKit) -> Result<()> { +pub async fn idle(kit: ImapWatchKit) -> Result<()> { debug!("IDLE"); /* IDLE only watches the connection's selected mailbox. We will IDLE on INBOX and every ~5 * minutes wake up and poll the others */ @@ -104,18 +81,8 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { mut conn, main_conn, uid_store, - work_context, } = kit; - loop { - if super::try_lock(&uid_store.is_online, Some(std::time::Duration::new(10, 0)))? - .1 - .is_ok() - { - break; - } - std::thread::sleep(std::time::Duration::from_millis(100)); - } - conn.connect()?; + conn.connect().await?; let thread_id: std::thread::ThreadId = std::thread::current().id(); let mailbox: ImapMailbox = match uid_store .mailboxes @@ -129,10 +96,6 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { None => { let err = MeliError::new("INBOX mailbox not found in local mailbox index. meli may have not parsed the IMAP mailboxes correctly"); debug!("failure: {}", err.to_string()); - work_context - .set_status - .send((thread_id, err.to_string())) - .unwrap(); conn.add_refresh_event(RefreshEvent { account_hash: uid_store.account_hash, mailbox_hash: 0, @@ -147,10 +110,11 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { exit_on_error!( conn, mailbox_hash, - work_context, thread_id, conn.send_command(format!("SELECT \"{}\"", mailbox.imap_path()).as_bytes()) + .await conn.read_response(&mut response, RequiredResponses::SELECT_REQUIRED) + .await ); debug!("select response {}", &response); { @@ -204,14 +168,9 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { exit_on_error!( conn, mailbox_hash, - work_context, thread_id, - conn.send_command(b"IDLE") + conn.send_command(b"IDLE").await ); - work_context - .set_status - .send((thread_id, "IDLEing".to_string())) - .unwrap(); let mut iter = ImapBlockingConnection::from(conn); let mut beat = std::time::Instant::now(); let mut watch = std::time::Instant::now(); @@ -222,46 +181,30 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { while let Some(line) = iter.next() { let now = std::time::Instant::now(); if now.duration_since(beat) >= _26_mins { - let mut main_conn_lck = - super::try_lock(&main_conn, Some(std::time::Duration::new(10, 0)))?; + let mut main_conn_lck = main_conn.lock().await; exit_on_error!( iter.conn, mailbox_hash, - work_context, thread_id, - iter.conn.set_nonblocking(true) - iter.conn.send_raw(b"DONE") - iter.conn.read_response(&mut response, RequiredResponses::empty()) - iter.conn.send_command(b"IDLE") - iter.conn.set_nonblocking(false) - main_conn_lck.send_command(b"NOOP") - main_conn_lck.read_response(&mut response, RequiredResponses::empty()) + iter.conn.send_raw(b"DONE").await + iter.conn.read_response(&mut response, RequiredResponses::empty()).await + iter.conn.send_command(b"IDLE").await + main_conn_lck.send_command(b"NOOP").await + main_conn_lck.read_response(&mut response, RequiredResponses::empty()).await ); beat = now; } if now.duration_since(watch) >= _5_mins { /* Time to poll all inboxes */ - let mut conn = try_lock(&main_conn, Some(std::time::Duration::new(10, 0)))?; + let mut conn = main_conn.lock().await; for mailbox in uid_store.mailboxes.read().unwrap().values() { - work_context - .set_status - .send(( - thread_id, - format!("examining `{}` for updates...", mailbox.path()), - )) - .unwrap(); exit_on_error!( conn, mailbox_hash, - work_context, thread_id, - examine_updates(mailbox, &mut conn, &uid_store, &work_context,) + examine_updates(mailbox, &mut conn, &uid_store).await ); } - work_context - .set_status - .send((thread_id, "done examining mailboxes.".to_string())) - .unwrap(); watch = now; } *uid_store.is_online.lock().unwrap() = (Instant::now(), Ok(())); @@ -270,20 +213,15 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { .map_err(MeliError::from) { Ok(Some(Recent(r))) => { - let mut conn = super::try_lock(&main_conn, Some(std::time::Duration::new(10, 0)))?; - work_context - .set_status - .send((thread_id, format!("got `{} RECENT` notification", r))) - .unwrap(); + let mut conn = main_conn.lock().await; /* UID SEARCH RECENT */ exit_on_error!( conn, mailbox_hash, - work_context, thread_id, - conn.examine_mailbox(mailbox_hash, &mut response) - conn.send_command(b"UID SEARCH RECENT") - conn.read_response(&mut response, RequiredResponses::SEARCH) + conn.examine_mailbox(mailbox_hash, &mut response).await + conn.send_command(b"UID SEARCH RECENT").await + conn.read_response(&mut response, RequiredResponses::SEARCH).await ); match protocol_parser::search_results_raw(response.as_bytes()) .map(|(_, v)| v) @@ -296,13 +234,12 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { exit_on_error!( conn, mailbox_hash, - work_context, thread_id, conn.send_command( &[&b"UID FETCH"[..], &v.trim().split(|b| b == &b' ').join(b','), &b"(FLAGS RFC822)"[..]] .join(&b' '), - ) - conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) + ).await + conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED).await ); debug!(&response); match protocol_parser::uid_fetch_responses(&response) { @@ -313,13 +250,6 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { uid, flags, body, .. } in v { - work_context - .set_status - .send(( - thread_id, - format!("parsing {}/{} envelopes..", ctr, len), - )) - .unwrap(); ctr += 1; if !uid_store .uid_index @@ -383,10 +313,6 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { } } } - work_context - .set_status - .send((thread_id, format!("parsed {}/{} envelopes.", ctr, len))) - .unwrap(); } Err(e) => { debug!(e); @@ -409,11 +335,7 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { // immediately decremented by 1, and this decrement is reflected in // message sequence numbers in subsequent responses (including other // untagged EXPUNGE responses). - let mut conn = super::try_lock(&main_conn, Some(std::time::Duration::new(10, 0)))?; - work_context - .set_status - .send((thread_id, format!("got `{} EXPUNGED` notification", n))) - .unwrap(); + let mut conn = main_conn.lock().await; let deleted_uid = uid_store .msn_index .lock() @@ -436,30 +358,17 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { }); } Ok(Some(Exists(n))) => { - let mut conn = super::try_lock(&main_conn, Some(std::time::Duration::new(10, 0)))?; + let mut conn = main_conn.lock().await; /* UID FETCH ALL UID, cross-ref, then FETCH difference headers * */ let mut prev_exists = mailbox.exists.lock().unwrap(); debug!("exists {}", n); - work_context - .set_status - .send(( - thread_id, - format!( - "got `{} EXISTS` notification (EXISTS was previously {} for {}", - n, - prev_exists.len(), - mailbox.path() - ), - )) - .unwrap(); if n > prev_exists.len() { exit_on_error!( conn, mailbox_hash, - work_context, thread_id, - conn.examine_mailbox(mailbox_hash, &mut response) + conn.examine_mailbox(mailbox_hash, &mut response).await conn.send_command( &[ b"FETCH", @@ -467,8 +376,8 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { b"(UID FLAGS RFC822)", ] .join(&b' '), - ) - conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) + ).await + conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED).await ); match protocol_parser::uid_fetch_responses(&response) { Ok((_, v, _)) => { @@ -478,13 +387,6 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { uid, flags, body, .. } in v { - work_context - .set_status - .send(( - thread_id, - format!("parsing {}/{} envelopes..", ctr, len), - )) - .unwrap(); if uid_store .uid_index .lock() @@ -545,10 +447,6 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { }); } } - work_context - .set_status - .send((thread_id, format!("parsed {}/{} envelopes.", ctr, len))) - .unwrap(); } Err(e) => { debug!(e); @@ -560,22 +458,21 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { /* a * {msg_seq} FETCH (FLAGS ({flags})) was received, so find out UID from msg_seq * and send update */ - let mut conn = super::try_lock(&main_conn, Some(std::time::Duration::new(10, 0)))?; + let mut conn = main_conn.lock().await; debug!("fetch {} {:?}", msg_seq, flags); exit_on_error!( conn, mailbox_hash, - work_context, thread_id, - conn.examine_mailbox(mailbox_hash, &mut response) + conn.examine_mailbox(mailbox_hash, &mut response).await conn.send_command( &[ b"UID SEARCH ", format!("{}", msg_seq).as_bytes(), ] .join(&b' '), - ) - conn.read_response(&mut response, RequiredResponses::SEARCH) + ).await + conn.read_response(&mut response, RequiredResponses::SEARCH).await ); match search_results(response.split_rn().next().unwrap_or("").as_bytes()) .map(|(_, v)| v) @@ -610,19 +507,10 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { Ok(Some(Bye { .. })) => break, Ok(None) | Err(_) => {} } - work_context - .set_status - .send((thread_id, "IDLEing".to_string())) - .unwrap(); } debug!("IDLE connection dropped"); let err: &str = iter.err().unwrap_or("Unknown reason."); - work_context - .set_status - .send((thread_id, "IDLE connection dropped".to_string())) - .unwrap(); - work_context.finished.send(thread_id).unwrap(); - main_conn.lock().unwrap().add_refresh_event(RefreshEvent { + main_conn.lock().await.add_refresh_event(RefreshEvent { account_hash: uid_store.account_hash, mailbox_hash, kind: RefreshEventKind::Failure(MeliError::new(format!( @@ -633,22 +521,19 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> { Err(MeliError::new(format!("IDLE connection dropped: {}", err))) } -pub fn examine_updates( +pub async fn examine_updates( mailbox: &ImapMailbox, conn: &mut ImapConnection, uid_store: &Arc, - work_context: &WorkContext, ) -> Result<()> { - let thread_id: std::thread::ThreadId = std::thread::current().id(); let mailbox_hash = mailbox.hash(); debug!("examining mailbox {} {}", mailbox_hash, mailbox.path()); let mut response = String::with_capacity(8 * 1024); exit_on_error!( conn, mailbox_hash, - work_context, thread_id, - conn.examine_mailbox(mailbox_hash, &mut response) + conn.examine_mailbox(mailbox_hash, &mut response).await ); *uid_store.is_online.lock().unwrap() = (Instant::now(), Ok(())); let uidvalidity; @@ -690,7 +575,6 @@ pub fn examine_updates( }); } } - let mut prev_exists = mailbox.exists.lock().unwrap(); let n = ok.exists; if ok.recent > 0 { { @@ -698,10 +582,9 @@ pub fn examine_updates( exit_on_error!( conn, mailbox_hash, - work_context, thread_id, - conn.send_command(b"UID SEARCH RECENT") - conn.read_response(&mut response, RequiredResponses::SEARCH) + conn.send_command(b"UID SEARCH RECENT").await + conn.read_response(&mut response, RequiredResponses::SEARCH).await ); match protocol_parser::search_results_raw(response.as_bytes()) .map(|(_, v)| v) @@ -714,13 +597,12 @@ pub fn examine_updates( exit_on_error!( conn, mailbox_hash, - work_context, thread_id, conn.send_command( &[&b"UID FETCH"[..], &v.trim().split(|b| b == &b' ').join(b','), &b"(FLAGS RFC822)"[..]] .join(&b' '), - ) - conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) + ).await + conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED).await ); debug!(&response); match protocol_parser::uid_fetch_responses(&response) { @@ -786,6 +668,7 @@ pub fn examine_updates( &[(uid, &env)], )?; } + let mut prev_exists = mailbox.exists.lock().unwrap(); prev_exists.insert_new(env.hash()); conn.add_refresh_event(RefreshEvent { @@ -810,24 +693,23 @@ pub fn examine_updates( } } } - } else if n > prev_exists.len() { + } else if n > mailbox.exists.lock().unwrap().len() { /* UID FETCH ALL UID, cross-ref, then FETCH difference headers * */ debug!("exists {}", n); exit_on_error!( conn, mailbox_hash, - work_context, thread_id, conn.send_command( &[ b"FETCH", - format!("{}:{}", prev_exists.len() + 1, n).as_bytes(), + format!("{}:{}", mailbox.exists.lock().unwrap().len() + 1, n).as_bytes(), b"(UID FLAGS RFC822)", ] .join(&b' '), - ) - conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) + ).await + conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED).await ); match protocol_parser::uid_fetch_responses(&response) { Ok((_, v, _)) => { @@ -883,7 +765,7 @@ pub fn examine_updates( &[(uid, &env)], )?; } - prev_exists.insert_new(env.hash()); + mailbox.exists.lock().unwrap().insert_new(env.hash()); conn.add_refresh_event(RefreshEvent { account_hash: uid_store.account_hash, diff --git a/melib/src/backends/maildir/backend.rs b/melib/src/backends/maildir/backend.rs index 0435fa17..cd5afd18 100644 --- a/melib/src/backends/maildir/backend.rs +++ b/melib/src/backends/maildir/backend.rs @@ -193,11 +193,10 @@ impl MailBackend for MaildirType { self.multicore(4, mailbox) } - /* fn get_async( &mut self, mailbox: &Mailbox, - ) -> Result>> + Send + 'static>>> + ) -> Result>> + Send + 'static>>> { let mailbox: &MaildirMailbox = &self.mailboxes[&self.owned_mailbox_idx(mailbox)]; let mailbox_hash = mailbox.hash(); @@ -209,7 +208,6 @@ impl MailBackend for MaildirType { let mailbox_index = self.mailbox_index.clone(); super::stream::MaildirStream::new(&self.name, mailbox_hash, unseen, total, path, root_path, map, mailbox_index) } - */ fn refresh( &mut self, diff --git a/melib/src/connections.rs b/melib/src/connections.rs new file mode 100644 index 00000000..4f46815f --- /dev/null +++ b/melib/src/connections.rs @@ -0,0 +1,141 @@ +/* + * meli - melib library + * + * Copyright 2020 Manos Pitsidianakis + * + * This file is part of meli. + * + * meli is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * meli is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with meli. If not, see . + */ + +#[derive(Debug)] +pub enum Connection { + Tcp(std::net::TcpStream), + Fd(std::os::unix::io::RawFd), + #[cfg(feature = "imap_backend")] + Tls(native_tls::TlsStream), +} + +use Connection::*; + +impl Connection { + pub fn set_nonblocking(&self, nonblocking: bool) -> std::io::Result<()> { + match self { + Tcp(ref t) => t.set_nonblocking(nonblocking), + #[cfg(feature = "imap_backend")] + Tls(ref t) => t.get_ref().set_nonblocking(nonblocking), + Fd(fd) => { + //FIXME TODO Review + nix::fcntl::fcntl( + *fd, + nix::fcntl::FcntlArg::F_SETFL(if nonblocking { + nix::fcntl::OFlag::O_NONBLOCK + } else { + !nix::fcntl::OFlag::O_NONBLOCK + }), + ) + .map_err(|err| { + std::io::Error::from_raw_os_error(err.as_errno().map(|n| n as i32).unwrap_or(0)) + })?; + Ok(()) + } + } + } + + pub fn set_read_timeout(&self, dur: Option) -> std::io::Result<()> { + match self { + Tcp(ref t) => t.set_read_timeout(dur), + #[cfg(feature = "imap_backend")] + Tls(ref t) => t.get_ref().set_read_timeout(dur), + Fd(_) => Ok(()), + } + } + + pub fn set_write_timeout(&self, dur: Option) -> std::io::Result<()> { + match self { + Tcp(ref t) => t.set_write_timeout(dur), + #[cfg(feature = "imap_backend")] + Tls(ref t) => t.get_ref().set_write_timeout(dur), + Fd(_) => Ok(()), + } + } +} + +impl Drop for Connection { + fn drop(&mut self) { + if let Fd(fd) = self { + let _ = nix::unistd::close(*fd); + } + } +} + +impl std::io::Read for Connection { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + match self { + Tcp(ref mut t) => t.read(buf), + #[cfg(feature = "imap_backend")] + Tls(ref mut t) => t.read(buf), + Fd(f) => { + use std::os::unix::io::{FromRawFd, IntoRawFd}; + let mut f = unsafe { std::fs::File::from_raw_fd(*f) }; + let ret = f.read(buf); + let _ = f.into_raw_fd(); + ret + } + } + } +} + +impl std::io::Write for Connection { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + match self { + Tcp(ref mut t) => t.write(buf), + #[cfg(feature = "imap_backend")] + Tls(ref mut t) => t.write(buf), + Fd(f) => { + use std::os::unix::io::{FromRawFd, IntoRawFd}; + let mut f = unsafe { std::fs::File::from_raw_fd(*f) }; + let ret = f.write(buf); + let _ = f.into_raw_fd(); + ret + } + } + } + + fn flush(&mut self) -> std::io::Result<()> { + match self { + Tcp(ref mut t) => t.flush(), + #[cfg(feature = "imap_backend")] + Tls(ref mut t) => t.flush(), + Fd(f) => { + use std::os::unix::io::{FromRawFd, IntoRawFd}; + let mut f = unsafe { std::fs::File::from_raw_fd(*f) }; + let ret = f.flush(); + let _ = f.into_raw_fd(); + ret + } + } + } +} + +impl std::os::unix::io::AsRawFd for Connection { + fn as_raw_fd(&self) -> std::os::unix::io::RawFd { + match self { + Tcp(ref t) => t.as_raw_fd(), + #[cfg(feature = "imap_backend")] + Tls(ref t) => t.get_ref().as_raw_fd(), + Fd(f) => *f, + } + } +} diff --git a/src/conf/accounts.rs b/src/conf/accounts.rs index f926f578..fe873678 100644 --- a/src/conf/accounts.rs +++ b/src/conf/accounts.rs @@ -44,11 +44,13 @@ use crate::{StatusEvent, ThreadEvent}; use crossbeam::Sender; use futures::channel::oneshot; pub use futures::stream::Stream; +use futures::stream::StreamExt; use std::collections::VecDeque; use std::fs; use std::io; use std::ops::{Index, IndexMut}; use std::os::unix::fs::PermissionsExt; +use std::pin::Pin; use std::result; use std::sync::{Arc, RwLock}; @@ -135,14 +137,31 @@ pub struct Account { notify_fn: Arc, } -#[derive(Debug)] enum JobRequest { Mailboxes(oneshot::Receiver>>), - Get(MailboxHash, oneshot::Receiver>>), + Get( + MailboxHash, + oneshot::Receiver<( + Option>>, + Pin>> + Send + 'static>>, + )>, + ), IsOnline(oneshot::Receiver>), Refresh(oneshot::Receiver>), } +impl core::fmt::Debug for JobRequest { + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + match self { + JobRequest::Mailboxes(_) => write!(f, "{}", "JobRequest::Mailboxes"), + JobRequest::Get(hash, _) => write!(f, "JobRequest::Get({})", hash), + JobRequest::IsOnline(_) => write!(f, "{}", "JobRequest::IsOnline"), + + JobRequest::Refresh(_) => write!(f, "{}", "JobRequest::Refresh"), + } + } +} + impl Drop for Account { fn drop(&mut self) { if let Ok(data_dir) = xdg::BaseDirectories::with_profile("meli", &self.name) { @@ -248,6 +267,10 @@ impl Account { let mut active_jobs = HashMap::default(); if settings.conf.is_async { + if let Ok(online_job) = backend.is_online_async() { + let (rcvr, job_id) = job_executor.spawn_specialized(online_job); + active_jobs.insert(job_id, JobRequest::IsOnline(rcvr)); + } if let Ok(mailboxes_job) = backend.mailboxes_async() { let (rcvr, job_id) = job_executor.spawn_specialized(mailboxes_job); active_jobs.insert(job_id, JobRequest::Mailboxes(rcvr)); @@ -408,6 +431,7 @@ impl Account { entry.status = MailboxStatus::Parsing(0, 0); if self.settings.conf.is_async { if let Ok(mailbox_job) = self.backend.write().unwrap().get_async(&f) { + let mailbox_job = mailbox_job.into_future(); let (rcvr, job_id) = self.job_executor.spawn_specialized(mailbox_job); self.active_jobs.insert(job_id, JobRequest::Get(*h, rcvr)); } @@ -829,6 +853,7 @@ impl Account { &&self.mailbox_entries[&mailbox_hash].ref_mailbox, ) { + let mailbox_job = mailbox_job.into_future(); let (rcvr, job_id) = self.job_executor.spawn_specialized(mailbox_job); self.active_jobs @@ -1320,8 +1345,30 @@ impl Account { } } JobRequest::Get(mailbox_hash, mut chan) => { - let payload = debug!(chan.try_recv()).unwrap().unwrap(); + let (payload, rest): (Option>>, _) = + chan.try_recv().unwrap().unwrap(); debug!("got payload in status for {}", mailbox_hash); + if payload.is_none() { + debug!("finished in status for {}", mailbox_hash); + self.mailbox_entries + .entry(mailbox_hash) + .and_modify(|entry| { + entry.status = MailboxStatus::Available; + entry.worker = None; + }); + self.sender + .send(ThreadEvent::UIEvent(UIEvent::MailboxUpdate(( + self.index, + mailbox_hash, + )))) + .unwrap(); + return true; + } + + let (rcvr, job_id) = self.job_executor.spawn_specialized(rest.into_future()); + self.active_jobs + .insert(job_id, JobRequest::Get(mailbox_hash, rcvr)); + let payload = payload.unwrap(); if payload.is_err() { self.mailbox_entries .entry(mailbox_hash) @@ -1329,7 +1376,10 @@ impl Account { entry.status = MailboxStatus::Failed(payload.unwrap_err()); }); self.sender - .send(ThreadEvent::UIEvent(UIEvent::StartupCheck(mailbox_hash))) + .send(ThreadEvent::UIEvent(UIEvent::MailboxUpdate(( + self.index, + mailbox_hash, + )))) .unwrap(); return true; } @@ -1344,19 +1394,12 @@ impl Account { { for f in updated_mailboxes { self.sender - .send(ThreadEvent::UIEvent(UIEvent::StartupCheck(f))) + .send(ThreadEvent::UIEvent(UIEvent::MailboxUpdate(( + self.index, f, + )))) .unwrap(); } } - self.sender - .send(ThreadEvent::UIEvent(UIEvent::StartupCheck(mailbox_hash))) - .unwrap(); - self.mailbox_entries - .entry(mailbox_hash) - .and_modify(|entry| { - entry.status = MailboxStatus::Available; - entry.worker = None; - }); self.sender .send(ThreadEvent::UIEvent(UIEvent::MailboxUpdate(( self.index, @@ -1367,13 +1410,21 @@ impl Account { JobRequest::IsOnline(mut chan) => { let is_online = debug!(chan.try_recv()).unwrap(); if is_online.is_some() { - self.is_online = true; - } else { - if let Ok(online_job) = self.backend.read().unwrap().is_online_async() { - let (rcvr, job_id) = self.job_executor.spawn_specialized(online_job); - self.active_jobs.insert(job_id, JobRequest::IsOnline(rcvr)); + let is_online = is_online.unwrap(); + if is_online.is_ok() { + self.is_online = true; + self.sender + .send(ThreadEvent::UIEvent(UIEvent::AccountStatusChange( + self.index, + ))) + .unwrap(); + return true; } } + if let Ok(online_job) = self.backend.read().unwrap().is_online_async() { + let (rcvr, job_id) = self.job_executor.spawn_specialized(online_job); + self.active_jobs.insert(job_id, JobRequest::IsOnline(rcvr)); + } } _ => {} } diff --git a/src/jobs1.rs b/src/jobs1.rs index 2aa252d6..4cac95e8 100644 --- a/src/jobs1.rs +++ b/src/jobs1.rs @@ -127,11 +127,11 @@ impl JobExecutor { .name("meli-reactor".to_string()) .spawn(move || { smol::run(futures::future::pending::<()>()); - }); + }) + .unwrap(); // Spawn executor threads the first time the queue is created. for (i, (local, parker)) in workers.into_iter().enumerate() { - let sender = ret.sender.clone(); let global = ret.global_queue.clone(); let stealers = ret.workers.clone(); thread::Builder::new() @@ -142,10 +142,11 @@ impl JobExecutor { if let Some(meli_task) = task { let MeliTask { task, id } = meli_task; debug!("Worker {} got task {:?}", i, id); - let res = catch_unwind(|| task.run()); + let _ = catch_unwind(|| task.run()); debug!("Worker {} got result {:?}", i, id); } - }); + }) + .unwrap(); } ret } @@ -183,7 +184,7 @@ impl JobExecutor { pub fn spawn_specialized(&self, future: F) -> (oneshot::Receiver, JobId) where F: Future + Send + 'static, - R: Send + core::fmt::Debug + 'static, + R: Send + 'static, { let (sender, receiver) = oneshot::channel(); let finished_sender = self.sender.clone(); @@ -192,10 +193,10 @@ impl JobExecutor { let __job_id = job_id.clone(); let injector = self.global_queue.clone(); // Create a task and schedule it for execution. - let (task, handle) = async_task::spawn( + let (task, _) = async_task::spawn( async move { let res = future.await; - sender.send(res).unwrap(); + let _ = sender.send(res); finished_sender .send(ThreadEvent::JobFinished(__job_id)) .unwrap();