diff --git a/Cargo.lock b/Cargo.lock index b085c30dc..c4be75e72 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -27,6 +27,27 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cff77d8686867eceff3105329d4698d96c2391c176d5d03adc90c7389162b5b8" +[[package]] +name = "async-stream" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22068c0c19514942eefcfd4daf8976ef1aad84e61539f95cd200c35202f80af5" +dependencies = [ + "async-stream-impl", + "futures-core", +] + +[[package]] +name = "async-stream-impl" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25f9db3b38af870bf7e5cc649167533b493928e50744e2c30ae350230b414670" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-task" version = "3.0.0" @@ -84,6 +105,19 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d8c1fef690941d3e7788d328517591fecc684c084084702d6ff1641e993699a" +[[package]] +name = "blocking" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d17efb70ce4421e351d61aafd90c16a20fb5bfe339fcdc32a86816280e62ce0" +dependencies = [ + "futures-channel", + "futures-util", + "once_cell", + "parking", + "waker-fn", +] + [[package]] name = "bumpalo" version = "3.4.0" @@ -102,6 +136,12 @@ version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1" +[[package]] +name = "cache-padded" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24508e28c677875c380c20f4d28124fab6f8ed4ef929a1397d7b1a31e92f1005" + [[package]] name = "cc" version = "1.0.54" @@ -136,6 +176,15 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "concurrent-queue" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f83c06aff61f2d899eb87c379df3cbf7876f14471dcab474e0b6dc90ab96c080" +dependencies = [ + "cache-padded", +] + [[package]] name = "constant_time_eq" version = "0.1.5" @@ -347,6 +396,12 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" +[[package]] +name = "fastrand" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b90eb1dec02087df472ab9f0db65f27edaa654a746830042688bcc2eaf68090f" + [[package]] name = "filetime" version = "0.2.10" @@ -846,6 +901,7 @@ dependencies = [ "nix", "notify", "notify-rust", + "num_cpus", "pcre2", "proc-macro2", "quote", @@ -873,6 +929,7 @@ dependencies = [ name = "melib" version = "0.5.0" dependencies = [ + "async-stream", "bincode", "bitflags", "crossbeam", @@ -892,6 +949,7 @@ dependencies = [ "serde_derive", "serde_json", "smallvec", + "smol", "unicode-segmentation", "uuid", "xdg", @@ -1173,6 +1231,12 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "parking" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4029bc3504a62d92e42f30b9095fdef73b8a0b2a06aa41ce2935143b05a1a06" + [[package]] name = "pcre2" version = "0.2.3" @@ -1507,6 +1571,12 @@ dependencies = [ "winapi 0.3.8", ] +[[package]] +name = "scoped-tls" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2" + [[package]] name = "scopeguard" version = "1.1.0" @@ -1638,6 +1708,27 @@ dependencies = [ "serde", ] +[[package]] +name = "smol" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "620cbb3c6e34da57d3a248cda0cd01cd5848164dc062e764e65d06fe3ea7aed5" +dependencies = [ + "async-task", + "blocking", + "concurrent-queue", + "fastrand", + "futures-io", + "futures-util", + "libc", + "once_cell", + "scoped-tls", + "slab", + "socket2", + "wepoll-sys-stjepang", + "winapi 0.3.8", +] + [[package]] name = "socket2" version = "0.3.12" @@ -1916,6 +2007,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" +[[package]] +name = "waker-fn" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9571542c2ce85ce642e6b58b3364da2fb53526360dfb7c211add4f5c23105ff7" + [[package]] name = "walkdir" version = "2.3.1" @@ -2021,6 +2118,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "wepoll-sys-stjepang" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fd319e971980166b53e17b1026812ad66c6b54063be879eb182342b55284694" +dependencies = [ + "cc", +] + [[package]] name = "winapi" version = "0.2.8" diff --git a/Cargo.toml b/Cargo.toml index 44708afc6..43a621655 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,7 @@ structopt = { version = "0.3.14", default-features = false } svg_crate = { version = "0.8.0", optional = true, package = "svg" } futures = "0.3.5" async-task = "3.0.0" +num_cpus = "1.12.0" [build-dependencies] syn = { version = "1.0.31", features = [] } diff --git a/melib/src/backends/imap_async.rs b/melib/src/backends/imap_async.rs index 1e255a7d9..368777d89 100644 --- a/melib/src/backends/imap_async.rs +++ b/melib/src/backends/imap_async.rs @@ -34,9 +34,9 @@ mod watch; pub use watch::*; mod cache; pub mod managesieve; -//mod untagged; +mod untagged; -use crate::async_workers::{Async, AsyncBuilder, AsyncStatus, WorkContext}; +use crate::async_workers::{Async, WorkContext}; use crate::backends::BackendOp; use crate::backends::RefreshEvent; use crate::backends::RefreshEventKind::{self, *}; @@ -46,7 +46,7 @@ use crate::conf::AccountSettings; use crate::email::*; use crate::error::{MeliError, Result, ResultIntoMeliError}; use futures::lock::Mutex as FutureMutex; -use futures::Stream; +use futures::stream::Stream; use std::collections::{hash_map::DefaultHasher, BTreeMap}; use std::collections::{BTreeSet, HashMap, HashSet}; use std::future::Future; @@ -174,9 +174,6 @@ pub struct ImapType { can_create_flags: Arc>, } -use futures::pin_mut; -use futures::stream::StreamExt; - impl MailBackend for ImapType { fn get_async( &mut self, @@ -315,392 +312,22 @@ impl MailBackend for ImapType { */ } - fn get(&mut self, mailbox: &Mailbox) -> Async>> { + fn get(&mut self, _mailbox: &Mailbox) -> Async>> { unimplemented!() - /* - let mut w = AsyncBuilder::new(); - let handle = { - let tx = w.tx(); - let uid_store = self.uid_store.clone(); - let can_create_flags = self.can_create_flags.clone(); - let mailbox_hash = mailbox.hash(); - let (permissions, mailbox_path, mailbox_exists, no_select, unseen) = { - let f = &self.uid_store.mailboxes.read().unwrap()[&mailbox_hash]; - ( - f.permissions.clone(), - f.imap_path().to_string(), - f.exists.clone(), - f.no_select, - f.unseen.clone(), - ) - }; - let connection = self.connection.clone(); - let closure = move |_work_context| { - if no_select { - tx.send(AsyncStatus::Payload(Ok(Vec::new()))).unwrap(); - tx.send(AsyncStatus::Finished).unwrap(); - return; - } - let _tx = tx.clone(); - if let Err(err) = (move || { - let tx = _tx; - let mut our_unseen: BTreeSet = Default::default(); - let mut valid_hash_set: HashSet = HashSet::default(); - let cached_hash_set: HashSet = - (|| -> Result> { - if !uid_store.cache_headers { - return Ok(HashSet::default()); - } - - let uidvalidities = uid_store.uidvalidity.lock().unwrap(); - - let v = if let Some(v) = uidvalidities.get(&mailbox_hash) { - v - } else { - return Ok(HashSet::default()); - }; - let cached_envs: (cache::MaxUID, Vec<(UID, Envelope)>); - cache::save_envelopes(uid_store.account_hash, mailbox_hash, *v, &[]) - .chain_err_summary(|| { - "Could not save envelopes in cache in get()" - })?; - cached_envs = - cache::get_envelopes(uid_store.account_hash, mailbox_hash, *v) - .chain_err_summary(|| { - "Could not get envelopes in cache in get()" - })?; - let (_max_uid, envelopes) = debug!(cached_envs); - let ret = envelopes.iter().map(|(_, env)| env.hash()).collect(); - if !envelopes.is_empty() { - let mut payload = vec![]; - for (uid, env) in envelopes { - if !env.is_seen() { - our_unseen.insert(env.hash()); - } - uid_store - .hash_index - .lock() - .unwrap() - .insert(env.hash(), (uid, mailbox_hash)); - uid_store - .uid_index - .lock() - .unwrap() - .insert((mailbox_hash, uid), env.hash()); - payload.push(env); - } - debug!("sending cached payload for {}", mailbox_hash); - - unseen.lock().unwrap().insert_set(our_unseen.clone()); - tx.send(AsyncStatus::Payload(Ok(payload))).unwrap(); - } - Ok(ret) - })() - .unwrap_or_default(); - - let mut conn = connection.lock()?; - debug!("locked for get {}", mailbox_path); - let mut response = String::with_capacity(8 * 1024); - - conn.create_uid_msn_cache(mailbox_hash, 1); - /* first SELECT the mailbox to get READ/WRITE permissions (because EXAMINE only - * returns READ-ONLY for both cases) */ - conn.select_mailbox(mailbox_hash, &mut response) - .chain_err_summary(|| { - format!("Could not select mailbox {}", mailbox_path) - })?; - let mut examine_response = protocol_parser::select_response(&response) - .chain_err_summary(|| { - format!( - "Could not parse select response for mailbox {}", - mailbox_path - ) - })?; - *can_create_flags.lock().unwrap() = examine_response.can_create_flags; - debug!( - "mailbox: {} examine_response: {:?}", - mailbox_path, examine_response - ); - { - let mut uidvalidities = uid_store.uidvalidity.lock().unwrap(); - - let v = uidvalidities - .entry(mailbox_hash) - .or_insert(examine_response.uidvalidity); - if uid_store.cache_headers { - let _ = cache::save_envelopes( - uid_store.account_hash, - mailbox_hash, - examine_response.uidvalidity, - &[], - ); - } - *v = examine_response.uidvalidity; - let mut permissions = permissions.lock().unwrap(); - permissions.create_messages = !examine_response.read_only; - permissions.remove_messages = !examine_response.read_only; - permissions.set_flags = !examine_response.read_only; - permissions.rename_messages = !examine_response.read_only; - permissions.delete_messages = !examine_response.read_only; - permissions.delete_messages = !examine_response.read_only; - mailbox_exists - .lock() - .unwrap() - .set_not_yet_seen(examine_response.exists); - } - if examine_response.exists == 0 { - if uid_store.cache_headers { - for &env_hash in &cached_hash_set { - conn.add_refresh_event(RefreshEvent { - account_hash: uid_store.account_hash, - mailbox_hash, - kind: RefreshEventKind::Remove(env_hash), - }); - } - let _ = cache::save_envelopes( - uid_store.account_hash, - mailbox_hash, - examine_response.uidvalidity, - &[], - ); - } - tx.send(AsyncStatus::Payload(Ok(Vec::new()))).unwrap(); - tx.send(AsyncStatus::Finished).unwrap(); - return Ok(()); - } - /* reselecting the same mailbox with EXAMINE prevents expunging it */ - conn.examine_mailbox(mailbox_hash, &mut response)?; - if examine_response.uidnext == 0 { - /* UIDNEXT shouldn't be 0, since exists != 0 at this point */ - conn.send_command( - format!("STATUS \"{}\" (UIDNEXT)", mailbox_path).as_bytes(), - )?; - conn.read_response(&mut response, RequiredResponses::STATUS)?; - let (_, status) = protocol_parser::status_response(response.as_bytes())?; - if let Some(uidnext) = status.uidnext { - if uidnext == 0 { - return Err(MeliError::new( - "IMAP server error: zero UIDNEXt with nonzero exists.", - )); - } - examine_response.uidnext = uidnext; - } else { - return Err(MeliError::new("IMAP server did not reply with UIDNEXT")); - } - } - let mut max_uid_left: usize = examine_response.uidnext - 1; - - let mut tag_lck = uid_store.tag_index.write().unwrap(); - - while max_uid_left > 0 { - let mut envelopes = vec![]; - debug!("{} max_uid_left= {}", mailbox_hash, max_uid_left); - if max_uid_left == 1 { - debug!("UID FETCH 1 (UID FLAGS ENVELOPE BODYSTRUCTURE)"); - conn.send_command(b"UID FETCH 1 (UID FLAGS ENVELOPE BODYSTRUCTURE)")?; - } else { - conn.send_command( - debug!(format!( - "UID FETCH {}:{} (UID FLAGS ENVELOPE BODYSTRUCTURE)", - std::cmp::max( - std::cmp::max(max_uid_left.saturating_sub(500), 1), - 1 - ), - max_uid_left - )) - .as_bytes(), - )? - }; - conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) - .chain_err_summary(|| { - format!( - "Could not parse fetch response for mailbox {}", - mailbox_path - ) - })?; - debug!( - "fetch response is {} bytes and {} lines", - response.len(), - response.lines().collect::>().len() - ); - let (_, v, _) = protocol_parser::uid_fetch_responses(&response)?; - debug!("responses len is {}", v.len()); - for UidFetchResponse { - uid, - message_sequence_number, - flags, - envelope, - .. - } in v - { - let mut env = envelope.unwrap(); - let mut h = DefaultHasher::new(); - h.write_usize(uid); - h.write(mailbox_path.as_bytes()); - env.set_hash(h.finish()); - debug!( - "env hash {} {} UID = {} MSN = {}", - env.hash(), - env.subject(), - uid, - message_sequence_number - ); - valid_hash_set.insert(env.hash()); - if let Some((flags, keywords)) = flags { - if !flags.intersects(Flag::SEEN) { - our_unseen.insert(env.hash()); - } - env.set_flags(flags); - for f in keywords { - let hash = tag_hash!(f); - if !tag_lck.contains_key(&hash) { - tag_lck.insert(hash, f); - } - env.labels_mut().push(hash); - } - } - uid_store - .msn_index - .lock() - .unwrap() - .entry(mailbox_hash) - .or_default() - .insert(message_sequence_number - 1, uid); - uid_store - .hash_index - .lock() - .unwrap() - .insert(env.hash(), (uid, mailbox_hash)); - uid_store - .uid_index - .lock() - .unwrap() - .insert((mailbox_hash, uid), env.hash()); - envelopes.push((uid, env)); - } - max_uid_left = - std::cmp::max(std::cmp::max(max_uid_left.saturating_sub(500), 1), 1); - debug!("sending payload for {}", mailbox_hash); - if uid_store.cache_headers { - cache::save_envelopes( - uid_store.account_hash, - mailbox_hash, - examine_response.uidvalidity, - &envelopes - .iter() - .map(|(uid, env)| (*uid, env)) - .collect::>(), - ) - .chain_err_summary(|| { - format!( - "Could not save envelopes in cache for mailbox {}", - mailbox_path - ) - })?; - } - for &env_hash in cached_hash_set.difference(&valid_hash_set) { - conn.add_refresh_event(RefreshEvent { - account_hash: uid_store.account_hash, - mailbox_hash, - kind: RefreshEventKind::Remove(env_hash), - }); - } - let progress = envelopes.len(); - unseen - .lock() - .unwrap() - .insert_set(our_unseen.iter().cloned().collect()); - mailbox_exists.lock().unwrap().insert_existing_set( - envelopes.iter().map(|(_, env)| env.hash()).collect::<_>(), - ); - tx.send(AsyncStatus::Payload(Ok(envelopes - .into_iter() - .map(|(_, env)| env) - .collect::>()))) - .unwrap(); - tx.send(AsyncStatus::ProgressReport(progress)).unwrap(); - if max_uid_left == 1 { - break; - } - } - drop(conn); - Ok(()) - })() { - debug!("sending error payload for {}: {:?}", mailbox_hash, &err); - tx.send(AsyncStatus::Payload(Err(err))).unwrap(); - } - tx.send(AsyncStatus::Finished).unwrap(); - }; - Box::new(closure) - }; - w.build(handle) - */ } fn refresh( &mut self, - mailbox_hash: MailboxHash, - sender: RefreshEventConsumer, + _mailbox_hash: MailboxHash, + _sender: RefreshEventConsumer, ) -> Result> { unimplemented!() - /* - let inbox = self - .uid_store - .mailboxes - .read() - .unwrap() - .get(&mailbox_hash) - .map(std::clone::Clone::clone) - .unwrap(); - let main_conn = self.connection.clone(); - *self.uid_store.sender.write().unwrap() = Some(sender); - let uid_store = self.uid_store.clone(); - let account_name = self.account_name.clone(); - let w = AsyncBuilder::new(); - let closure = move |work_context: WorkContext| { - let thread = std::thread::current(); - let mut conn = match try_lock(&main_conn, Some(std::time::Duration::new(2, 0))) { - Ok(conn) => conn, - Err(err) => { - uid_store - .sender - .read() - .unwrap() - .as_ref() - .unwrap() - .send(RefreshEvent { - account_hash: uid_store.account_hash, - mailbox_hash, - kind: RefreshEventKind::Failure(err.clone()), - }); - - return; - } - }; - work_context - .set_name - .send(( - thread.id(), - format!("refreshing {} imap connection", account_name.as_str(),), - )) - .unwrap(); - - work_context - .set_status - .send((thread.id(), "refresh".to_string())) - .unwrap(); - //watch::examine_updates(&inbox, &mut conn, &uid_store, &work_context) - // .ok() - // .take(); - }; - Ok(w.build(Box::new(closure))) - */ } fn watch( &self, - sender: RefreshEventConsumer, - work_context: WorkContext, + _sender: RefreshEventConsumer, + _work_context: WorkContext, ) -> Result { Ok(std::thread::current().id()) //Err(MeliError::new("Unimplemented.")) @@ -742,51 +369,6 @@ impl MailBackend for ImapType { fn mailboxes(&self) -> Result> { unimplemented!() - /* - { - let mailboxes = self.uid_store.mailboxes.read().unwrap(); - if !mailboxes.is_empty() { - return Ok(mailboxes - .iter() - .map(|(h, f)| (*h, Box::new(Clone::clone(f)) as Mailbox)) - .collect()); - } - } - let new_mailboxes = ImapType::imap_mailboxes(&self.connection)?; - let mut mailboxes = self.uid_store.mailboxes.write()?; - *mailboxes = new_mailboxes; - let mut invalid_configs = vec![]; - for m in mailboxes.values() { - if m.is_subscribed() != (self.is_subscribed)(m.path()) { - invalid_configs.push((m.path(), m.is_subscribed())); - } - } - if !invalid_configs.is_empty() { - let mut err_string = format!("{}: ", self.account_name); - for (m, server_value) in invalid_configs.iter() { - err_string.extend(format!( - "Mailbox `{}` is {}subscribed on server but {}subscribed in your configuration. These settings have to match.\n", - if *server_value { "" } else { "not " }, - if *server_value { "not " } else { "" }, - m - ).chars()); - } - return Err(MeliError::new(err_string)); - } - mailboxes.retain(|_, f| (self.is_subscribed)(f.path())); - let keys = mailboxes.keys().cloned().collect::>(); - let mut uid_lock = self.uid_store.uidvalidity.lock().unwrap(); - for f in mailboxes.values_mut() { - uid_lock.entry(f.hash()).or_default(); - f.children.retain(|c| keys.contains(c)); - } - drop(uid_lock); - Ok(mailboxes - .iter() - .filter(|(_, f)| f.is_subscribed) - .map(|(h, f)| (*h, Box::new(Clone::clone(f)) as Mailbox)) - .collect()) - */ } fn operation(&self, hash: EnvelopeHash) -> Result> { @@ -810,7 +392,7 @@ impl MailBackend for ImapType { ))) } - fn save(&self, bytes: &[u8], mailbox_hash: MailboxHash, flags: Option) -> Result<()> { + fn save(&self, _bytes: &[u8], _mailbox_hash: MailboxHash, _flags: Option) -> Result<()> { unimplemented!() /* let path = { @@ -867,7 +449,7 @@ impl MailBackend for ImapType { fn create_mailbox( &mut self, - mut path: String, + _path: String, ) -> Result<(MailboxHash, HashMap)> { unimplemented!() /* @@ -924,7 +506,7 @@ impl MailBackend for ImapType { fn delete_mailbox( &mut self, - mailbox_hash: MailboxHash, + _mailbox_hash: MailboxHash, ) -> Result> { unimplemented!() /* @@ -967,7 +549,11 @@ impl MailBackend for ImapType { */ } - fn set_mailbox_subscription(&mut self, mailbox_hash: MailboxHash, new_val: bool) -> Result<()> { + fn set_mailbox_subscription( + &mut self, + _mailbox_hash: MailboxHash, + _new_val: bool, + ) -> Result<()> { unimplemented!() /* let mut mailboxes = self.uid_store.mailboxes.write().unwrap(); @@ -1000,11 +586,7 @@ impl MailBackend for ImapType { */ } - fn rename_mailbox( - &mut self, - mailbox_hash: MailboxHash, - mut new_path: String, - ) -> Result { + fn rename_mailbox(&mut self, _mailbox_hash: MailboxHash, _new_path: String) -> Result { unimplemented!() /* let mut mailboxes = self.uid_store.mailboxes.write().unwrap(); @@ -1045,7 +627,7 @@ impl MailBackend for ImapType { fn set_mailbox_permissions( &mut self, - mailbox_hash: MailboxHash, + _mailbox_hash: MailboxHash, _val: crate::backends::MailboxPermissions, ) -> Result<()> { unimplemented!() @@ -1062,8 +644,8 @@ impl MailBackend for ImapType { fn search( &self, - query: crate::search::Query, - mailbox_hash: Option, + _query: crate::search::Query, + _mailbox_hash: Option, ) -> Result> { unimplemented!() /* @@ -1455,116 +1037,6 @@ fn get_cached_envs( Ok((ret, payload)) } -async fn get_initial_max_uid( - connection: &Arc>, - mailbox_hash: MailboxHash, - cached_hash_set: &HashSet, - can_create_flags: &Arc>, - our_unseen: &mut BTreeSet, - valid_hash_set: &mut HashSet, - uid_store: &UIDStore, -) -> Result { - let (permissions, mailbox_path, mailbox_exists, no_select, unseen) = { - let f = &uid_store.mailboxes.read().unwrap()[&mailbox_hash]; - ( - f.permissions.clone(), - f.imap_path().to_string(), - f.exists.clone(), - f.no_select, - f.unseen.clone(), - ) - }; - let mut conn = connection.lock().await; - debug!("locked for get {}", mailbox_path); - let mut response = String::with_capacity(8 * 1024); - - conn.create_uid_msn_cache(mailbox_hash, 1).await?; - /* first SELECT the mailbox to get READ/WRITE permissions (because EXAMINE only - * returns READ-ONLY for both cases) */ - conn.select_mailbox(mailbox_hash, &mut response, true) - .await - .chain_err_summary(|| format!("Could not select mailbox {}", mailbox_path))?; - let mut examine_response = - protocol_parser::select_response(&response).chain_err_summary(|| { - format!( - "Could not parse select response for mailbox {}", - mailbox_path - ) - })?; - *can_create_flags.lock().unwrap() = examine_response.can_create_flags; - debug!( - "mailbox: {} examine_response: {:?}", - mailbox_path, examine_response - ); - { - let mut uidvalidities = uid_store.uidvalidity.lock().unwrap(); - - let v = uidvalidities - .entry(mailbox_hash) - .or_insert(examine_response.uidvalidity); - if uid_store.cache_headers { - let _ = cache::save_envelopes( - uid_store.account_hash, - mailbox_hash, - examine_response.uidvalidity, - &[], - ); - } - *v = examine_response.uidvalidity; - let mut permissions = permissions.lock().unwrap(); - permissions.create_messages = !examine_response.read_only; - permissions.remove_messages = !examine_response.read_only; - permissions.set_flags = !examine_response.read_only; - permissions.rename_messages = !examine_response.read_only; - permissions.delete_messages = !examine_response.read_only; - permissions.delete_messages = !examine_response.read_only; - mailbox_exists - .lock() - .unwrap() - .set_not_yet_seen(examine_response.exists); - } - if examine_response.exists == 0 { - if uid_store.cache_headers { - for &env_hash in cached_hash_set { - conn.add_refresh_event(RefreshEvent { - account_hash: uid_store.account_hash, - mailbox_hash, - kind: RefreshEventKind::Remove(env_hash), - }); - } - let _ = cache::save_envelopes( - uid_store.account_hash, - mailbox_hash, - examine_response.uidvalidity, - &[], - ); - } - return Ok(0); - } - /* reselecting the same mailbox with EXAMINE prevents expunging it */ - conn.examine_mailbox(mailbox_hash, &mut response, false) - .await?; - if examine_response.uidnext == 0 { - /* UIDNEXT shouldn't be 0, since exists != 0 at this point */ - conn.send_command(format!("STATUS \"{}\" (UIDNEXT)", mailbox_path).as_bytes()) - .await?; - conn.read_response(&mut response, RequiredResponses::STATUS) - .await?; - let (_, status) = protocol_parser::status_response(response.as_bytes())?; - if let Some(uidnext) = status.uidnext { - if uidnext == 0 { - return Err(MeliError::new( - "IMAP server error: zero UIDNEXt with nonzero exists.", - )); - } - examine_response.uidnext = uidnext; - } else { - return Err(MeliError::new("IMAP server did not reply with UIDNEXT")); - } - } - Ok(examine_response.uidnext - 1) -} - async fn get_hlpr( connection: &Arc>, mailbox_hash: MailboxHash, @@ -1585,6 +1057,10 @@ async fn get_hlpr( f.unseen.clone(), ) }; + if no_select { + *max_uid = Some(0); + return Ok(Vec::new()); + } let mut conn = connection.lock().await; debug!("locked for get {}", mailbox_path); let mut response = String::with_capacity(8 * 1024); diff --git a/melib/src/backends/imap_async/connection.rs b/melib/src/backends/imap_async/connection.rs index 49bbd6157..362f9c63e 100644 --- a/melib/src/backends/imap_async/connection.rs +++ b/melib/src/backends/imap_async/connection.rs @@ -29,8 +29,10 @@ use futures::io::{AsyncReadExt, AsyncWriteExt}; use native_tls::TlsConnector; pub use smol::Async as AsyncWrapper; use std::collections::HashSet; +use std::future::Future; use std::iter::FromIterator; use std::net::SocketAddr; +use std::pin::Pin; use std::sync::Arc; use std::time::Instant; @@ -82,7 +84,6 @@ impl ImapStream { pub async fn new_connection( server_conf: &ImapServerConf, ) -> Result<(Capabilities, ImapStream)> { - use std::io::prelude::*; use std::net::TcpStream; let path = &server_conf.server_hostname; debug!("ImapStream::new_connection"); @@ -468,56 +469,57 @@ impl ImapConnection { Ok(()) } - pub async fn read_response( - &mut self, - ret: &mut String, + pub fn read_response<'a>( + &'a mut self, + ret: &'a mut String, required_responses: RequiredResponses, - ) -> Result<()> { - let mut response = String::new(); - ret.clear(); - self.stream.as_mut()?.read_response(&mut response).await?; + ) -> Pin> + Send + 'a>> { + Box::pin(async move { + let mut response = String::new(); + ret.clear(); + self.stream.as_mut()?.read_response(&mut response).await?; - match self.server_conf.protocol { - ImapProtocol::IMAP => { - let r: ImapResponse = ImapResponse::from(&response); - match r { - ImapResponse::Bye(ref response_code) => { - self.stream = Err(MeliError::new(format!( - "Offline: received BYE: {:?}", - response_code - ))); - ret.push_str(&response); - } - ImapResponse::No(ref response_code) => { - debug!("Received NO response: {:?} {:?}", response_code, response); - ret.push_str(&response); - } - ImapResponse::Bad(ref response_code) => { - debug!("Received BAD response: {:?} {:?}", response_code, response); - ret.push_str(&response); - } - _ => { - /*debug!( - "check every line for required_responses: {:#?}", - &required_responses - );*/ - for l in response.split_rn() { - /*debug!("check line: {}", &l);*/ - //if required_responses.check(l) || !self.process_untagged(l)? { - // ret.push_str(l); - //} - ret.push_str(l); + match self.server_conf.protocol { + ImapProtocol::IMAP => { + let r: ImapResponse = ImapResponse::from(&response); + match r { + ImapResponse::Bye(ref response_code) => { + self.stream = Err(MeliError::new(format!( + "Offline: received BYE: {:?}", + response_code + ))); + ret.push_str(&response); + } + ImapResponse::No(ref response_code) => { + debug!("Received NO response: {:?} {:?}", response_code, response); + ret.push_str(&response); + } + ImapResponse::Bad(ref response_code) => { + debug!("Received BAD response: {:?} {:?}", response_code, response); + ret.push_str(&response); + } + _ => { + /*debug!( + "check every line for required_responses: {:#?}", + &required_responses + );*/ + for l in response.split_rn() { + /*debug!("check line: {}", &l);*/ + if required_responses.check(l) || !self.process_untagged(l).await? { + ret.push_str(l); + } + ret.push_str(l); + } } - //ret.push_str(&response); } + r.into() + } + ImapProtocol::ManageSieve => { + ret.push_str(&response); + Ok(()) } - r.into() } - ImapProtocol::ManageSieve => { - ret.push_str(&response); - Ok(()) - } - } + }) } pub async fn read_lines(&mut self, ret: &mut String, termination_string: String) -> Result<()> { @@ -607,7 +609,7 @@ impl ImapConnection { pub async fn unselect(&mut self) -> Result<()> { match self.current_mailbox.take() { MailboxSelection::Examine(mailbox_hash) | - MailboxSelection::Select(mailbox_hash) => { + MailboxSelection::Select(mailbox_hash) =>{ let mut response = String::with_capacity(8 * 1024); if self .capabilities @@ -678,7 +680,7 @@ pub struct ImapBlockingConnection { } impl From for ImapBlockingConnection { - fn from(mut conn: ImapConnection) -> Self { + fn from(_conn: ImapConnection) -> Self { unimplemented!() /* conn.set_nonblocking(false) diff --git a/melib/src/backends/imap_async/operations.rs b/melib/src/backends/imap_async/operations.rs index 044a913c5..4a8fd4a0a 100644 --- a/melib/src/backends/imap_async/operations.rs +++ b/melib/src/backends/imap_async/operations.rs @@ -25,7 +25,7 @@ use crate::backends::*; use crate::email::*; use crate::error::{MeliError, Result}; use std::cell::Cell; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; /// `BackendOp` implementor for Imap #[derive(Debug, Clone)] diff --git a/melib/src/backends/imap_async/untagged.rs b/melib/src/backends/imap_async/untagged.rs index a08b9694f..74e2ad271 100644 --- a/melib/src/backends/imap_async/untagged.rs +++ b/melib/src/backends/imap_async/untagged.rs @@ -19,7 +19,7 @@ * along with meli. If not, see . */ -use super::ImapConnection; +use super::{ImapConnection, MailboxSelection}; use crate::backends::imap_async::protocol_parser::{ ImapLineSplit, RequiredResponses, UidFetchResponse, UntaggedResponse, }; @@ -33,7 +33,7 @@ use crate::error::*; use std::time::Instant; impl ImapConnection { - pub fn process_untagged(&mut self, line: &str) -> Result { + pub async fn process_untagged(&mut self, line: &str) -> Result { macro_rules! try_fail { ($mailbox_hash: expr, $($result:expr)+) => { $(if let Err(err) = $result { @@ -52,10 +52,9 @@ impl ImapConnection { }; } //FIXME - let mailbox_hash = if let Some(mailbox_hash) = self.current_mailbox { - mailbox_hash - } else { - return Ok(false); + let mailbox_hash = match self.current_mailbox { + MailboxSelection::Select(h) | MailboxSelection::Examine(h) => h, + MailboxSelection::None => return Ok(false), }; let mailbox = std::clone::Clone::clone(&self.uid_store.mailboxes.read().unwrap()[&mailbox_hash]); @@ -104,20 +103,19 @@ impl ImapConnection { UntaggedResponse::Exists(n) => { /* UID FETCH ALL UID, cross-ref, then FETCH difference headers * */ - let mut prev_exists = mailbox.exists.lock().unwrap(); debug!("exists {}", n); - if n > prev_exists.len() { + if n > mailbox.exists.lock().unwrap().len() { try_fail!( mailbox_hash, self.send_command( &[ b"FETCH", - format!("{}:{}", prev_exists.len() + 1, n).as_bytes(), + format!("{}:{}", mailbox.exists.lock().unwrap().len() + 1, n).as_bytes(), b"(UID FLAGS RFC822)", ] .join(&b' '), - ) - self.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) + ).await + self.read_response(&mut response, RequiredResponses::FETCH_REQUIRED).await ); match super::protocol_parser::uid_fetch_responses(&response) { Ok((_, v, _)) => { @@ -167,7 +165,7 @@ impl ImapConnection { if !env.is_seen() { mailbox.unseen.lock().unwrap().insert_new(env.hash()); } - prev_exists.insert_new(env.hash()); + mailbox.exists.lock().unwrap().insert_new(env.hash()); self.add_refresh_event(RefreshEvent { account_hash: self.uid_store.account_hash, mailbox_hash, @@ -185,8 +183,8 @@ impl ImapConnection { UntaggedResponse::Recent(_) => { try_fail!( mailbox_hash, - self.send_command(b"UID SEARCH RECENT") - self.read_response(&mut response, RequiredResponses::SEARCH) + self.send_command(b"UID SEARCH RECENT").await + self.read_response(&mut response, RequiredResponses::SEARCH).await ); match super::protocol_parser::search_results_raw(response.as_bytes()) .map(|(_, v)| v) @@ -201,8 +199,8 @@ impl ImapConnection { self.send_command( &[b"UID FETCH", v, b"(FLAGS RFC822)"] .join(&b' '), - ) - self.read_response(&mut response, RequiredResponses::FETCH_REQUIRED) + ).await + self.read_response(&mut response, RequiredResponses::FETCH_REQUIRED).await ); debug!(&response); match super::protocol_parser::uid_fetch_responses(&response) { @@ -294,8 +292,8 @@ impl ImapConnection { format!("{}", msg_seq).as_bytes(), ] .join(&b' '), - ) - self.read_response(&mut response, RequiredResponses::SEARCH) + ).await + self.read_response(&mut response, RequiredResponses::SEARCH).await ); debug!(&response); match super::protocol_parser::search_results( diff --git a/melib/src/backends/imap_async/watch.rs b/melib/src/backends/imap_async/watch.rs index 03624d745..31644ada8 100644 --- a/melib/src/backends/imap_async/watch.rs +++ b/melib/src/backends/imap_async/watch.rs @@ -22,7 +22,7 @@ use super::*; use crate::backends::SpecialUsageMailbox; use crate::email::parser::BytesExt; use crate::email::parser::BytesIterExt; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; /// Arguments for IMAP watching functions pub struct ImapWatchKit { @@ -32,7 +32,7 @@ pub struct ImapWatchKit { } macro_rules! exit_on_error { - ($conn:expr, $mailbox_hash:ident, $thread_id:ident, $($result:expr)+) => { + ($conn:expr, $mailbox_hash:ident, $($result:expr)+) => { $(if let Err(e) = $result { *$conn.uid_store.is_online.lock().unwrap() = ( Instant::now(), @@ -59,7 +59,6 @@ pub async fn poll_with_examine(kit: ImapWatchKit) -> Result<()> { } = kit; conn.connect().await?; let mut response = String::with_capacity(8 * 1024); - let thread_id: std::thread::ThreadId = std::thread::current().id(); loop { let mailboxes = uid_store.mailboxes.read()?; for mailbox in mailboxes.values() { @@ -83,7 +82,6 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> { uid_store, } = kit; conn.connect().await?; - let thread_id: std::thread::ThreadId = std::thread::current().id(); let mailbox: ImapMailbox = match uid_store .mailboxes .read() @@ -110,7 +108,6 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> { exit_on_error!( conn, mailbox_hash, - thread_id, conn.send_command(format!("SELECT \"{}\"", mailbox.imap_path()).as_bytes()) .await conn.read_response(&mut response, RequiredResponses::SELECT_REQUIRED) @@ -165,12 +162,7 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> { } }; } - exit_on_error!( - conn, - mailbox_hash, - thread_id, - conn.send_command(b"IDLE").await - ); + exit_on_error!(conn, mailbox_hash, conn.send_command(b"IDLE").await); let mut iter = ImapBlockingConnection::from(conn); let mut beat = std::time::Instant::now(); let mut watch = std::time::Instant::now(); @@ -185,7 +177,6 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> { exit_on_error!( iter.conn, mailbox_hash, - thread_id, iter.conn.send_raw(b"DONE").await iter.conn.read_response(&mut response, RequiredResponses::empty()).await iter.conn.send_command(b"IDLE").await @@ -201,7 +192,6 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> { exit_on_error!( conn, mailbox_hash, - thread_id, examine_updates(mailbox, &mut conn, &uid_store).await ); } @@ -218,7 +208,6 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> { exit_on_error!( conn, mailbox_hash, - thread_id, conn.examine_mailbox(mailbox_hash, &mut response, false).await conn.send_command(b"UID SEARCH RECENT").await conn.read_response(&mut response, RequiredResponses::SEARCH).await @@ -234,7 +223,6 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> { exit_on_error!( conn, mailbox_hash, - thread_id, conn.send_command( &[&b"UID FETCH"[..], &v.trim().split(|b| b == &b' ').join(b','), &b"(FLAGS RFC822)"[..]] .join(&b' '), @@ -244,13 +232,10 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> { debug!(&response); match protocol_parser::uid_fetch_responses(&response) { Ok((_, v, _)) => { - let len = v.len(); - let mut ctr = 0; for UidFetchResponse { uid, flags, body, .. } in v { - ctr += 1; if !uid_store .uid_index .lock() @@ -367,7 +352,6 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> { exit_on_error!( conn, mailbox_hash, - thread_id, conn.examine_mailbox(mailbox_hash, &mut response, false).await conn.send_command( &[ @@ -381,8 +365,6 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> { ); match protocol_parser::uid_fetch_responses(&response) { Ok((_, v, _)) => { - let len = v.len(); - let mut ctr = 0; 'fetch_responses_b: for UidFetchResponse { uid, flags, body, .. } in v @@ -393,14 +375,12 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> { .unwrap() .contains_key(&(mailbox_hash, uid)) { - ctr += 1; continue 'fetch_responses_b; } if let Ok(mut env) = Envelope::from_bytes( body.unwrap(), flags.as_ref().map(|&(f, _)| f), ) { - ctr += 1; uid_store .hash_index .lock() @@ -463,7 +443,6 @@ pub async fn idle(kit: ImapWatchKit) -> Result<()> { exit_on_error!( conn, mailbox_hash, - thread_id, conn.examine_mailbox(mailbox_hash, &mut response, false).await conn.send_command( &[ @@ -532,7 +511,6 @@ pub async fn examine_updates( exit_on_error!( conn, mailbox_hash, - thread_id, conn.examine_mailbox(mailbox_hash, &mut response, true) .await ); @@ -583,7 +561,6 @@ pub async fn examine_updates( exit_on_error!( conn, mailbox_hash, - thread_id, conn.send_command(b"UID SEARCH RECENT").await conn.read_response(&mut response, RequiredResponses::SEARCH).await ); @@ -598,7 +575,6 @@ pub async fn examine_updates( exit_on_error!( conn, mailbox_hash, - thread_id, conn.send_command( &[&b"UID FETCH"[..], &v.trim().split(|b| b == &b' ').join(b','), &b"(FLAGS RFC822)"[..]] .join(&b' '), @@ -701,7 +677,6 @@ pub async fn examine_updates( exit_on_error!( conn, mailbox_hash, - thread_id, conn.send_command( &[ b"FETCH", diff --git a/melib/src/backends/maildir/backend.rs b/melib/src/backends/maildir/backend.rs index cd5afd182..5a463cd2b 100644 --- a/melib/src/backends/maildir/backend.rs +++ b/melib/src/backends/maildir/backend.rs @@ -19,12 +19,9 @@ * along with meli. If not, see . */ -use super::{ - BackendMailbox, BackendOp, MailBackend, Mailbox, MailboxHash, MaildirPathTrait, RefreshEvent, - RefreshEventConsumer, RefreshEventKind::*, -}; -use super::{MaildirMailbox, MaildirOp}; +use super::{MaildirMailbox, MaildirOp, MaildirPathTrait}; use crate::async_workers::*; +use crate::backends::{RefreshEventKind::*, *}; use crate::conf::AccountSettings; use crate::email::{Envelope, EnvelopeHash, Flag}; use crate::error::{MeliError, Result}; @@ -202,7 +199,7 @@ impl MailBackend for MaildirType { let mailbox_hash = mailbox.hash(); let unseen = mailbox.unseen.clone(); let total = mailbox.total.clone(); - let mut path: PathBuf = mailbox.fs_path().into(); + let path: PathBuf = mailbox.fs_path().into(); let root_path = self.path.to_path_buf(); let map = self.hash_indexes.clone(); let mailbox_index = self.mailbox_index.clone(); diff --git a/melib/src/backends/maildir/stream.rs b/melib/src/backends/maildir/stream.rs index cbe9d5b72..a3f26c01e 100644 --- a/melib/src/backends/maildir/stream.rs +++ b/melib/src/backends/maildir/stream.rs @@ -19,34 +19,33 @@ * along with meli. If not, see . */ - -use futures::task::{Context, Poll}; -use core::pin::Pin; use super::*; -use std::sync::{Arc,Mutex}; -use std::io::{self, Read, Write}; -use std::ops::{Deref, DerefMut}; -use std::os::unix::fs::PermissionsExt; -use std::path::{Component, Path, PathBuf}; -use std::result; -use std::sync::mpsc::channel; use crate::backends::maildir::backend::move_to_cur; use core::future::Future; -use futures::stream::FuturesUnordered; +use core::pin::Pin; +use futures::stream::{FuturesUnordered, StreamExt}; +use futures::task::{Context, Poll}; +use std::io::{self}; +use std::os::unix::fs::PermissionsExt; +use std::path::PathBuf; +use std::result; +use std::sync::{Arc, Mutex}; pub struct MaildirStream { - mailbox_hash: MailboxHash, - unseen: Arc>, - total: Arc>, - path: PathBuf, - root_path: PathBuf, - map: HashIndexes, - mailbox_index: Arc>>, - payloads: Pin>>>>>>>, - } + payloads: Pin>> + Send + 'static>>>>>, +} impl MaildirStream { - pub fn new(name: &str, mailbox_hash: MailboxHash, unseen: Arc>, total: Arc>, mut path: PathBuf, root_path: PathBuf, map: HashIndexes, mailbox_index: Arc>>) -> Result>>>> { + pub fn new( + name: &str, + mailbox_hash: MailboxHash, + unseen: Arc>, + total: Arc>, + mut path: PathBuf, + root_path: PathBuf, + map: HashIndexes, + mailbox_index: Arc>>, + ) -> Result>> + Send + 'static>>> { path.push("new"); for d in path.read_dir()? { if let Ok(p) = d { @@ -65,26 +64,47 @@ impl MaildirStream { Ok(path) })?; files.push(e); - }let payloads =Box::pin( - if !files.is_empty() { + } + let payloads = Box::pin(if !files.is_empty() { let cores = 4_usize; let chunk_size = if count / cores > 0 { count / cores } else { count }; - files.chunks(chunk_size).map(|chunk| { - //Self::chunk(chunk, name, mailbox_hash, unseen, total, path, root_path, map, mailbox_index)}) - let cache_dir = xdg::BaseDirectories::with_profile("meli", &name).unwrap(); - Box::pin(Self::chunk(SmallVec::from(chunk), cache_dir, mailbox_hash, unseen.clone(), total.clone(), path.clone(), root_path.clone(), map.clone(), mailbox_index.clone())) as Pin>>}) + files + .chunks(chunk_size) + .map(|chunk| { + //Self::chunk(chunk, name, mailbox_hash, unseen, total, path, root_path, map, mailbox_index)}) + let cache_dir = xdg::BaseDirectories::with_profile("meli", &name).unwrap(); + Box::pin(Self::chunk( + SmallVec::from(chunk), + cache_dir, + mailbox_hash, + unseen.clone(), + total.clone(), + root_path.clone(), + map.clone(), + mailbox_index.clone(), + )) as Pin + Send + 'static>> + }) .collect::<_>() - - - } else { FuturesUnordered::new() }); - Ok(Box::new(Self{mailbox_hash, unseen, total, path, root_path, map, mailbox_index, payloads})) + } else { + FuturesUnordered::new() + }); + Ok(Self { payloads }.boxed()) } - async fn chunk(chunk:SmallVec<[std::path::PathBuf; 2048]>, cache_dir:xdg::BaseDirectories, mailbox_hash: MailboxHash, unseen: Arc>, total: Arc>, path: PathBuf, root_path: PathBuf, map: HashIndexes, mailbox_index: Arc>>) -> Result> { + async fn chunk( + chunk: SmallVec<[std::path::PathBuf; 2048]>, + cache_dir: xdg::BaseDirectories, + mailbox_hash: MailboxHash, + unseen: Arc>, + total: Arc>, + root_path: PathBuf, + map: HashIndexes, + mailbox_index: Arc>>, + ) -> Result> { let unseen = unseen.clone(); let total = total.clone(); let map = map.clone(); @@ -92,12 +112,9 @@ impl MaildirStream { let root_path = root_path.clone(); let len = chunk.len(); let size = if len <= 100 { 100 } else { (len / 100) * 100 }; - let mut local_r: Vec = - Vec::with_capacity(chunk.len()); + let mut local_r: Vec = Vec::with_capacity(chunk.len()); for c in chunk.chunks(size) { let map = map.clone(); - let mailbox_index = mailbox_index.clone(); - let len = c.len(); for file in c { /* Check if we have a cache file with this email's * filename */ @@ -105,67 +122,50 @@ impl MaildirStream { .strip_prefix(&root_path) .unwrap() .to_path_buf(); - if let Some(cached) = - cache_dir.find_cache_file(&file_name) - { - /* Cached struct exists, try to load it */ - let reader = io::BufReader::new( - fs::File::open(&cached).unwrap(), - ); - let result: result::Result = - bincode::deserialize_from(reader); - if let Ok(env) = result { - let mut map = map.lock().unwrap(); - let map = map.entry(mailbox_hash).or_default(); - let hash = env.hash(); - map.insert(hash, file.clone().into()); - mailbox_index - .lock() - .unwrap() - .insert(hash, mailbox_hash); - if !env.is_seen() { - *unseen.lock().unwrap() += 1; - } - *total.lock().unwrap() += 1; - local_r.push(env); - continue; + if let Some(cached) = cache_dir.find_cache_file(&file_name) { + /* Cached struct exists, try to load it */ + let reader = io::BufReader::new(fs::File::open(&cached).unwrap()); + let result: result::Result = bincode::deserialize_from(reader); + if let Ok(env) = result { + let mut map = map.lock().unwrap(); + let map = map.entry(mailbox_hash).or_default(); + let hash = env.hash(); + map.insert(hash, file.clone().into()); + mailbox_index.lock().unwrap().insert(hash, mailbox_hash); + if !env.is_seen() { + *unseen.lock().unwrap() += 1; } - }; + *total.lock().unwrap() += 1; + local_r.push(env); + continue; + } + }; let hash = get_file_hash(file); { let mut map = map.lock().unwrap(); let map = map.entry(mailbox_hash).or_default(); (*map).insert(hash, PathBuf::from(file).into()); } - let op = Box::new(MaildirOp::new( - hash, - map.clone(), - mailbox_hash, - )); + let op = Box::new(MaildirOp::new(hash, map.clone(), mailbox_hash)); if let Some(e) = Envelope::from_token(op, hash) { - mailbox_index - .lock() - .unwrap() - .insert(e.hash(), mailbox_hash); - if let Ok(cached) = - cache_dir.place_cache_file(file_name) - { - /* place result in cache directory */ - let f = match fs::File::create(cached) { - Ok(f) => f, - Err(e) => { - panic!("{}", e); - } - }; - let metadata = f.metadata().unwrap(); - let mut permissions = metadata.permissions(); + mailbox_index.lock().unwrap().insert(e.hash(), mailbox_hash); + if let Ok(cached) = cache_dir.place_cache_file(file_name) { + /* place result in cache directory */ + let f = match fs::File::create(cached) { + Ok(f) => f, + Err(e) => { + panic!("{}", e); + } + }; + let metadata = f.metadata().unwrap(); + let mut permissions = metadata.permissions(); - permissions.set_mode(0o600); // Read/write for owner only. - f.set_permissions(permissions).unwrap(); + permissions.set_mode(0o600); // Read/write for owner only. + f.set_permissions(permissions).unwrap(); - let writer = io::BufWriter::new(f); - bincode::serialize_into(writer, &e).unwrap(); - } + let writer = io::BufWriter::new(f); + bincode::serialize_into(writer, &e).unwrap(); + } if !e.is_seen() { *unseen.lock().unwrap() += 1; } @@ -187,7 +187,7 @@ impl MaildirStream { impl Stream for MaildirStream { type Item = Result>; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>{ + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { //todo!() let payloads = self.payloads.as_mut(); payloads.poll_next(cx) diff --git a/melib/src/error.rs b/melib/src/error.rs index 5743af342..6b7c497e7 100644 --- a/melib/src/error.rs +++ b/melib/src/error.rs @@ -276,3 +276,10 @@ impl From> for MeliError { .set_source(Some(Arc::new(MeliError::new(format!("{}", kind))))) } } + +impl<'a> From<&'a mut MeliError> for MeliError { + #[inline] + fn from(kind: &'a mut MeliError) -> MeliError { + kind.clone() + } +} diff --git a/src/conf/accounts.rs b/src/conf/accounts.rs index fc2540b45..fd0f70490 100644 --- a/src/conf/accounts.rs +++ b/src/conf/accounts.rs @@ -24,7 +24,7 @@ */ use super::{AccountConf, FileMailboxConf}; -use crate::jobs1::{JobExecutor, JobId}; +use crate::jobs1::{JobExecutor, JobId, JoinHandle}; use melib::async_workers::{Async, AsyncBuilder, AsyncStatus, WorkContext}; use melib::backends::{ AccountHash, BackendOp, Backends, MailBackend, Mailbox, MailboxHash, NotifyFn, ReadOnlyOp, @@ -130,7 +130,7 @@ pub struct Account { pub(crate) settings: AccountConf, pub(crate) backend: Arc>>, - job_executor: Arc, + pub job_executor: Arc, active_jobs: HashMap, sender: Sender, event_queue: VecDeque<(MailboxHash, RefreshEvent)>, diff --git a/src/jobs1.rs b/src/jobs1.rs index e39d22fff..d4f9e16b7 100644 --- a/src/jobs1.rs +++ b/src/jobs1.rs @@ -36,7 +36,6 @@ use crossbeam::deque::{Injector, Steal, Stealer, Worker}; use crossbeam::sync::{Parker, Unparker}; use crossbeam::Sender; pub use futures::channel::oneshot; -use once_cell::sync::Lazy; use std::iter; type AsyncTask = async_task::Task<()>;