melib/nntp: fetch all articles of group

memfd
Manos Pitsidianakis 2020-08-09 21:23:13 +03:00
parent 92a9127758
commit 1affee183a
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
2 changed files with 163 additions and 67 deletions

View File

@ -39,8 +39,7 @@ use crate::email::*;
use crate::error::{MeliError, Result, ResultIntoMeliError};
use futures::lock::Mutex as FutureMutex;
use futures::stream::Stream;
use std::collections::{hash_map::DefaultHasher, BTreeMap};
use std::collections::{HashMap, HashSet};
use std::collections::{hash_map::DefaultHasher, BTreeMap, BTreeSet, HashMap, HashSet};
use std::future::Future;
use std::hash::Hasher;
use std::pin::Pin;
@ -188,16 +187,25 @@ impl MailBackend for NntpType {
&mut self,
mailbox_hash: MailboxHash,
) -> Result<Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>> {
let uid_store = self.uid_store.clone();
let connection = self.connection.clone();
let mut state = FetchState {
mailbox_hash,
uid_store: self.uid_store.clone(),
connection: self.connection.clone(),
high_low_total: None,
};
Ok(Box::pin(async_stream::try_stream! {
{
let f = &uid_store.mailboxes.lock().await[&mailbox_hash];
let f = &state.uid_store.mailboxes.lock().await[&state.mailbox_hash];
f.exists.lock().unwrap().clear();
f.unseen.lock().unwrap().clear();
};
let ret = fetch_envs(mailbox_hash, connection, &uid_store).await?;
yield ret;
loop {
if let Some(ret) = state.fetch_envs().await? {
yield ret;
continue;
}
break;
}
}))
}
@ -562,67 +570,95 @@ impl NntpType {
}
}
async fn fetch_envs(
struct FetchState {
mailbox_hash: MailboxHash,
connection: Arc<FutureMutex<NntpConnection>>,
uid_store: &UIDStore,
) -> Result<Vec<Envelope>> {
let mut res = String::with_capacity(8 * 1024);
let mut conn = connection.lock().await;
let path = uid_store.mailboxes.lock().await[&mailbox_hash]
.name()
.to_string();
conn.send_command(format!("GROUP {}", path).as_bytes())
.await?;
conn.read_response(&mut res, false, &["211 "])
.await
.chain_err_summary(|| {
format!(
"{} Could not select newsgroup {}: expected GROUP response but got: {}",
&uid_store.account_name, path, res
)
})?;
/*
* Parameters
group Name of newsgroup
number Estimated number of articles in the group
low Reported low water mark
high Reported high water mark
*/
let s = res.split_whitespace().collect::<SmallVec<[&str; 6]>>();
if s.len() != 5 {
return Err(MeliError::new(format!(
"{} Could not select newsgroup {}: expected GROUP response but got: {}",
&uid_store.account_name, path, res
)));
}
let total = usize::from_str(&s[1]).unwrap_or(0);
let _low = usize::from_str(&s[2]).unwrap_or(0);
let high = usize::from_str(&s[3]).unwrap_or(0);
drop(s);
uid_store: Arc<UIDStore>,
high_low_total: Option<(usize, usize, usize)>,
}
conn.send_command(format!("OVER {}-{}", high.saturating_sub(100), high).as_bytes())
.await?;
conn.read_response(&mut res, true, &["224 "])
.await
.chain_err_summary(|| {
format!(
"{} Could not select newsgroup {}: expected OVER response but got: {}",
&uid_store.account_name, path, res
)
})?;
let mut ret = Vec::with_capacity(total);
//hash_index: Arc<Mutex<HashMap<EnvelopeHash, (UID, MailboxHash)>>>,
//uid_index: Arc<Mutex<HashMap<(MailboxHash, UID), EnvelopeHash>>>,
let mut hash_index_lck = uid_store.hash_index.lock().unwrap();
let mut uid_index_lck = uid_store.uid_index.lock().unwrap();
for l in res.split_rn().skip(1) {
let (_, (num, env)) = debug!(protocol_parser::over_article(&l))?;
hash_index_lck.insert(env.hash(), (num, mailbox_hash));
uid_index_lck.insert((mailbox_hash, num), env.hash());
ret.push(env);
impl FetchState {
async fn fetch_envs(&mut self) -> Result<Option<Vec<Envelope>>> {
let FetchState {
mailbox_hash,
ref connection,
ref uid_store,
ref mut high_low_total,
} = self;
let mailbox_hash = *mailbox_hash;
let mut res = String::with_capacity(8 * 1024);
let mut conn = connection.lock().await;
if high_low_total.is_none() {
conn.select_group(mailbox_hash, true, &mut res).await?;
/*
* Parameters
group Name of newsgroup
number Estimated number of articles in the group
low Reported low water mark
high Reported high water mark
*/
let s = res.split_whitespace().collect::<SmallVec<[&str; 6]>>();
let path = conn.uid_store.mailboxes.lock().await[&mailbox_hash]
.name()
.to_string();
if s.len() != 5 {
return Err(MeliError::new(format!(
"{} Could not select newsgroup {}: expected GROUP response but got: {}",
&uid_store.account_name, path, res
)));
}
let total = usize::from_str(&s[1]).unwrap_or(0);
let _low = usize::from_str(&s[2]).unwrap_or(0);
let high = usize::from_str(&s[3]).unwrap_or(0);
*high_low_total = Some((high, _low, total));
{
let f = &uid_store.mailboxes.lock().await[&mailbox_hash];
f.exists.lock().unwrap().set_not_yet_seen(total);
f.unseen.lock().unwrap().set_not_yet_seen(total);
};
}
let (high, low, total) = high_low_total.unwrap();
if high <= low {
return Ok(None);
}
const CHUNK_SIZE: usize = 100;
let new_low = std::cmp::max(low, high.saturating_sub(CHUNK_SIZE));
high_low_total.as_mut().unwrap().0 = new_low;
conn.send_command(format!("OVER {}-{}", new_low, high).as_bytes())
.await?;
conn.read_response(&mut res, true, command_to_replycodes("OVER"))
.await
.chain_err_summary(|| {
format!(
"{} Could not select newsgroup: expected OVER response but got: {}",
&uid_store.account_name, res
)
})?;
let mut ret = Vec::with_capacity(high - new_low);
//hash_index: Arc<Mutex<HashMap<EnvelopeHash, (UID, MailboxHash)>>>,
//uid_index: Arc<Mutex<HashMap<(MailboxHash, UID), EnvelopeHash>>>,
{
let mut hash_index_lck = uid_store.hash_index.lock().unwrap();
let mut uid_index_lck = uid_store.uid_index.lock().unwrap();
for l in res.split_rn().skip(1) {
let (_, (num, env)) = protocol_parser::over_article(&l)?;
hash_index_lck.insert(env.hash(), (num, mailbox_hash));
uid_index_lck.insert((mailbox_hash, num), env.hash());
ret.push(env);
}
}
{
let hash_set: BTreeSet<EnvelopeHash> = ret.iter().map(|env| env.hash()).collect();
let f = &uid_store.mailboxes.lock().await[&mailbox_hash];
f.exists
.lock()
.unwrap()
.insert_existing_set(hash_set.clone());
f.unseen.lock().unwrap().insert_existing_set(hash_set);
};
Ok(Some(ret))
}
Ok(ret)
}
use futures::future::{self, Either};

View File

@ -19,7 +19,7 @@
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use crate::backends::MailboxHash;
use crate::backends::{BackendMailbox, MailboxHash};
use crate::connections::{lookup_ipv4, Connection};
use crate::email::parser::BytesExt;
use crate::error::*;
@ -61,7 +61,6 @@ pub struct NntpStream {
pub enum MailboxSelection {
None,
Select(MailboxHash),
Examine(MailboxHash),
}
impl MailboxSelection {
@ -371,6 +370,29 @@ impl NntpStream {
Ok(())
}
}
pub async fn send_multiline_data_block(&mut self, data: &str) -> Result<()> {
if let Err(err) = try_await(async move {
for l in data.lines() {
if l.starts_with('.') {
self.stream.write_all(b".").await?;
}
self.stream.write_all(l.as_bytes()).await?;
self.stream.write_all(b"\r\n").await?;
}
self.stream.write_all(b".\r\n").await?;
self.stream.flush().await?;
debug!("sent data block {} bytes", data.len());
Ok(())
})
.await
{
debug!("stream send_multiline_data_block err {:?}", err);
Err(err.set_err_kind(crate::error::ErrorKind::Network))
} else {
Ok(())
}
}
}
impl NntpConnection {
@ -463,13 +485,51 @@ impl NntpConnection {
self.uid_store.refresh_events.lock().unwrap().push(ev);
}
}
pub async fn select_group(
&mut self,
mailbox_hash: MailboxHash,
force: bool,
res: &mut String,
) -> Result<()> {
if !force {
match self.stream.as_ref()?.current_mailbox {
MailboxSelection::Select(m) if m == mailbox_hash => return Ok(()),
_ => {}
}
}
let path = self.uid_store.mailboxes.lock().await[&mailbox_hash]
.name()
.to_string();
self.send_command(format!("GROUP {}", path).as_bytes())
.await?;
self.read_response(res, false, command_to_replycodes("GROUP"))
.await
.chain_err_summary(|| {
format!(
"{} Could not select newsgroup {}: expected GROUP response but got: {}",
&self.uid_store.account_name, path, res
)
})?;
self.stream.as_mut()?.current_mailbox = MailboxSelection::Select(mailbox_hash);
Ok(())
}
pub async fn send_multiline_data_block(&mut self, message: &str) -> Result<()> {
self.stream
.as_mut()?
.send_multiline_data_block(message)
.await
}
}
fn command_to_replycodes(c: &str) -> &'static [&'static str] {
pub fn command_to_replycodes(c: &str) -> &'static [&'static str] {
if c.starts_with("OVER") {
&["224 "]
} else if c.starts_with("LIST") {
&["215 "]
} else if c.starts_with("POST") {
&["340 "]
} else if c.starts_with("STARTTLS") {
&["382 "]
} else if c.starts_with("GROUP") {