diff --git a/melib/src/backends/nntp.rs b/melib/src/backends/nntp.rs index fb8258ac..2aa07343 100644 --- a/melib/src/backends/nntp.rs +++ b/melib/src/backends/nntp.rs @@ -39,8 +39,7 @@ use crate::email::*; use crate::error::{MeliError, Result, ResultIntoMeliError}; use futures::lock::Mutex as FutureMutex; use futures::stream::Stream; -use std::collections::{hash_map::DefaultHasher, BTreeMap}; -use std::collections::{HashMap, HashSet}; +use std::collections::{hash_map::DefaultHasher, BTreeMap, BTreeSet, HashMap, HashSet}; use std::future::Future; use std::hash::Hasher; use std::pin::Pin; @@ -188,16 +187,25 @@ impl MailBackend for NntpType { &mut self, mailbox_hash: MailboxHash, ) -> Result>> + Send + 'static>>> { - let uid_store = self.uid_store.clone(); - let connection = self.connection.clone(); + let mut state = FetchState { + mailbox_hash, + uid_store: self.uid_store.clone(), + connection: self.connection.clone(), + high_low_total: None, + }; Ok(Box::pin(async_stream::try_stream! { { - let f = &uid_store.mailboxes.lock().await[&mailbox_hash]; + let f = &state.uid_store.mailboxes.lock().await[&state.mailbox_hash]; f.exists.lock().unwrap().clear(); f.unseen.lock().unwrap().clear(); }; - let ret = fetch_envs(mailbox_hash, connection, &uid_store).await?; - yield ret; + loop { + if let Some(ret) = state.fetch_envs().await? { + yield ret; + continue; + } + break; + } })) } @@ -562,67 +570,95 @@ impl NntpType { } } -async fn fetch_envs( +struct FetchState { mailbox_hash: MailboxHash, connection: Arc>, - uid_store: &UIDStore, -) -> Result> { - let mut res = String::with_capacity(8 * 1024); - let mut conn = connection.lock().await; - let path = uid_store.mailboxes.lock().await[&mailbox_hash] - .name() - .to_string(); - conn.send_command(format!("GROUP {}", path).as_bytes()) - .await?; - conn.read_response(&mut res, false, &["211 "]) - .await - .chain_err_summary(|| { - format!( - "{} Could not select newsgroup {}: expected GROUP response but got: {}", - &uid_store.account_name, path, res - ) - })?; - /* - * Parameters - group Name of newsgroup - number Estimated number of articles in the group - low Reported low water mark - high Reported high water mark - */ - let s = res.split_whitespace().collect::>(); - if s.len() != 5 { - return Err(MeliError::new(format!( - "{} Could not select newsgroup {}: expected GROUP response but got: {}", - &uid_store.account_name, path, res - ))); - } - let total = usize::from_str(&s[1]).unwrap_or(0); - let _low = usize::from_str(&s[2]).unwrap_or(0); - let high = usize::from_str(&s[3]).unwrap_or(0); - drop(s); + uid_store: Arc, + high_low_total: Option<(usize, usize, usize)>, +} - conn.send_command(format!("OVER {}-{}", high.saturating_sub(100), high).as_bytes()) - .await?; - conn.read_response(&mut res, true, &["224 "]) - .await - .chain_err_summary(|| { - format!( - "{} Could not select newsgroup {}: expected OVER response but got: {}", - &uid_store.account_name, path, res - ) - })?; - let mut ret = Vec::with_capacity(total); - //hash_index: Arc>>, - //uid_index: Arc>>, - let mut hash_index_lck = uid_store.hash_index.lock().unwrap(); - let mut uid_index_lck = uid_store.uid_index.lock().unwrap(); - for l in res.split_rn().skip(1) { - let (_, (num, env)) = debug!(protocol_parser::over_article(&l))?; - hash_index_lck.insert(env.hash(), (num, mailbox_hash)); - uid_index_lck.insert((mailbox_hash, num), env.hash()); - ret.push(env); +impl FetchState { + async fn fetch_envs(&mut self) -> Result>> { + let FetchState { + mailbox_hash, + ref connection, + ref uid_store, + ref mut high_low_total, + } = self; + let mailbox_hash = *mailbox_hash; + let mut res = String::with_capacity(8 * 1024); + let mut conn = connection.lock().await; + if high_low_total.is_none() { + conn.select_group(mailbox_hash, true, &mut res).await?; + /* + * Parameters + group Name of newsgroup + number Estimated number of articles in the group + low Reported low water mark + high Reported high water mark + */ + let s = res.split_whitespace().collect::>(); + let path = conn.uid_store.mailboxes.lock().await[&mailbox_hash] + .name() + .to_string(); + if s.len() != 5 { + return Err(MeliError::new(format!( + "{} Could not select newsgroup {}: expected GROUP response but got: {}", + &uid_store.account_name, path, res + ))); + } + let total = usize::from_str(&s[1]).unwrap_or(0); + let _low = usize::from_str(&s[2]).unwrap_or(0); + let high = usize::from_str(&s[3]).unwrap_or(0); + *high_low_total = Some((high, _low, total)); + { + let f = &uid_store.mailboxes.lock().await[&mailbox_hash]; + f.exists.lock().unwrap().set_not_yet_seen(total); + f.unseen.lock().unwrap().set_not_yet_seen(total); + }; + } + let (high, low, total) = high_low_total.unwrap(); + if high <= low { + return Ok(None); + } + const CHUNK_SIZE: usize = 100; + let new_low = std::cmp::max(low, high.saturating_sub(CHUNK_SIZE)); + high_low_total.as_mut().unwrap().0 = new_low; + + conn.send_command(format!("OVER {}-{}", new_low, high).as_bytes()) + .await?; + conn.read_response(&mut res, true, command_to_replycodes("OVER")) + .await + .chain_err_summary(|| { + format!( + "{} Could not select newsgroup: expected OVER response but got: {}", + &uid_store.account_name, res + ) + })?; + let mut ret = Vec::with_capacity(high - new_low); + //hash_index: Arc>>, + //uid_index: Arc>>, + { + let mut hash_index_lck = uid_store.hash_index.lock().unwrap(); + let mut uid_index_lck = uid_store.uid_index.lock().unwrap(); + for l in res.split_rn().skip(1) { + let (_, (num, env)) = protocol_parser::over_article(&l)?; + hash_index_lck.insert(env.hash(), (num, mailbox_hash)); + uid_index_lck.insert((mailbox_hash, num), env.hash()); + ret.push(env); + } + } + { + let hash_set: BTreeSet = ret.iter().map(|env| env.hash()).collect(); + let f = &uid_store.mailboxes.lock().await[&mailbox_hash]; + f.exists + .lock() + .unwrap() + .insert_existing_set(hash_set.clone()); + f.unseen.lock().unwrap().insert_existing_set(hash_set); + }; + Ok(Some(ret)) } - Ok(ret) } use futures::future::{self, Either}; diff --git a/melib/src/backends/nntp/connection.rs b/melib/src/backends/nntp/connection.rs index 82e55dd3..5eaa8396 100644 --- a/melib/src/backends/nntp/connection.rs +++ b/melib/src/backends/nntp/connection.rs @@ -19,7 +19,7 @@ * along with meli. If not, see . */ -use crate::backends::MailboxHash; +use crate::backends::{BackendMailbox, MailboxHash}; use crate::connections::{lookup_ipv4, Connection}; use crate::email::parser::BytesExt; use crate::error::*; @@ -61,7 +61,6 @@ pub struct NntpStream { pub enum MailboxSelection { None, Select(MailboxHash), - Examine(MailboxHash), } impl MailboxSelection { @@ -371,6 +370,29 @@ impl NntpStream { Ok(()) } } + + pub async fn send_multiline_data_block(&mut self, data: &str) -> Result<()> { + if let Err(err) = try_await(async move { + for l in data.lines() { + if l.starts_with('.') { + self.stream.write_all(b".").await?; + } + self.stream.write_all(l.as_bytes()).await?; + self.stream.write_all(b"\r\n").await?; + } + self.stream.write_all(b".\r\n").await?; + self.stream.flush().await?; + debug!("sent data block {} bytes", data.len()); + Ok(()) + }) + .await + { + debug!("stream send_multiline_data_block err {:?}", err); + Err(err.set_err_kind(crate::error::ErrorKind::Network)) + } else { + Ok(()) + } + } } impl NntpConnection { @@ -463,13 +485,51 @@ impl NntpConnection { self.uid_store.refresh_events.lock().unwrap().push(ev); } } + + pub async fn select_group( + &mut self, + mailbox_hash: MailboxHash, + force: bool, + res: &mut String, + ) -> Result<()> { + if !force { + match self.stream.as_ref()?.current_mailbox { + MailboxSelection::Select(m) if m == mailbox_hash => return Ok(()), + _ => {} + } + } + let path = self.uid_store.mailboxes.lock().await[&mailbox_hash] + .name() + .to_string(); + self.send_command(format!("GROUP {}", path).as_bytes()) + .await?; + self.read_response(res, false, command_to_replycodes("GROUP")) + .await + .chain_err_summary(|| { + format!( + "{} Could not select newsgroup {}: expected GROUP response but got: {}", + &self.uid_store.account_name, path, res + ) + })?; + self.stream.as_mut()?.current_mailbox = MailboxSelection::Select(mailbox_hash); + Ok(()) + } + + pub async fn send_multiline_data_block(&mut self, message: &str) -> Result<()> { + self.stream + .as_mut()? + .send_multiline_data_block(message) + .await + } } -fn command_to_replycodes(c: &str) -> &'static [&'static str] { +pub fn command_to_replycodes(c: &str) -> &'static [&'static str] { if c.starts_with("OVER") { &["224 "] } else if c.starts_with("LIST") { &["215 "] + } else if c.starts_with("POST") { + &["340 "] } else if c.starts_with("STARTTLS") { &["382 "] } else if c.starts_with("GROUP") {