/* * meli - nntp module. * * Copyright 2019 Manos Pitsidianakis * * This file is part of meli. * * meli is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * meli is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with meli. If not, see . */ use crate::get_conf_val; use crate::get_path_hash; use smallvec::SmallVec; #[macro_use] mod protocol_parser; pub use protocol_parser::*; mod mailbox; pub use mailbox::*; mod operations; pub use operations::*; mod connection; pub use connection::*; use crate::backends::*; use crate::conf::AccountSettings; use crate::email::*; use crate::error::{MeliError, Result, ResultIntoMeliError}; use futures::lock::Mutex as FutureMutex; use futures::stream::Stream; use std::collections::{hash_map::DefaultHasher, BTreeMap, BTreeSet, HashMap, HashSet}; use std::future::Future; use std::hash::Hasher; use std::pin::Pin; use std::str::FromStr; use std::sync::{Arc, Mutex, RwLock}; use std::time::Instant; pub type UID = usize; pub static SUPPORTED_CAPABILITIES: &[&str] = &[ #[cfg(feature = "deflate_compression")] "COMPRESS DEFLATE", "VERSION 2", ]; #[derive(Debug, Clone)] pub struct NntpServerConf { pub server_hostname: String, pub server_username: String, pub server_password: String, pub server_port: u16, pub use_starttls: bool, pub use_tls: bool, pub require_auth: bool, pub danger_accept_invalid_certs: bool, pub extension_use: NntpExtensionUse, } type Capabilities = HashSet; #[derive(Debug)] pub struct UIDStore { account_hash: AccountHash, account_name: Arc, offline_cache: bool, capabilities: Arc>, hash_index: Arc>>, uid_index: Arc>>, mailboxes: Arc>>, is_online: Arc)>>, event_consumer: BackendEventConsumer, } impl UIDStore { fn new( account_hash: AccountHash, account_name: Arc, event_consumer: BackendEventConsumer, ) -> Self { UIDStore { account_hash, account_name, event_consumer, offline_cache: false, capabilities: Default::default(), hash_index: Default::default(), uid_index: Default::default(), mailboxes: Arc::new(FutureMutex::new(Default::default())), is_online: Arc::new(Mutex::new(( Instant::now(), Err(MeliError::new("Account is uninitialised.")), ))), } } } #[derive(Debug)] pub struct NntpType { is_subscribed: Arc, connection: Arc>, server_conf: NntpServerConf, uid_store: Arc, can_create_flags: Arc>, } impl MailBackend for NntpType { fn capabilities(&self) -> MailBackendCapabilities { let mut extensions = self .uid_store .capabilities .lock() .unwrap() .iter() .map(|c| { ( c.to_string(), MailBackendExtensionStatus::Unsupported { comment: None }, ) }) .collect::>(); let NntpExtensionUse { #[cfg(feature = "deflate_compression")] deflate, } = self.server_conf.extension_use; { for (name, status) in extensions.iter_mut() { match name.as_str() { "COMPRESS DEFLATE" => { #[cfg(feature = "deflate_compression")] { if deflate { *status = MailBackendExtensionStatus::Enabled { comment: None }; } else { *status = MailBackendExtensionStatus::Supported { comment: Some("Disabled by user configuration"), }; } } #[cfg(not(feature = "deflate_compression"))] { *status = MailBackendExtensionStatus::Unsupported { comment: Some("melib not compiled with DEFLATE."), }; } } _ => { if SUPPORTED_CAPABILITIES.contains(&name.as_str()) { *status = MailBackendExtensionStatus::Enabled { comment: None }; } } } } } extensions.sort_by(|a, b| a.0.cmp(&b.0)); MailBackendCapabilities { is_async: true, is_remote: true, supports_search: false, extensions: Some(extensions), supports_tags: false, supports_submission: false, } } fn fetch( &mut self, mailbox_hash: MailboxHash, ) -> Result>> + Send + 'static>>> { let mut state = FetchState { mailbox_hash, uid_store: self.uid_store.clone(), connection: self.connection.clone(), high_low_total: None, }; Ok(Box::pin(async_stream::try_stream! { { let f = &state.uid_store.mailboxes.lock().await[&state.mailbox_hash]; f.exists.lock().unwrap().clear(); f.unseen.lock().unwrap().clear(); }; loop { if let Some(ret) = state.fetch_envs().await? { yield ret; continue; } break; } })) } fn refresh(&mut self, _mailbox_hash: MailboxHash) -> ResultFuture<()> { Err(MeliError::new("Unimplemented.")) } fn mailboxes(&self) -> ResultFuture> { let uid_store = self.uid_store.clone(); let connection = self.connection.clone(); Ok(Box::pin(async move { NntpType::nntp_mailboxes(&connection).await?; let mailboxes_lck = uid_store.mailboxes.lock().await; let ret = mailboxes_lck .iter() .map(|(h, f)| (*h, Box::new(Clone::clone(f)) as Mailbox)) .collect(); Ok(ret) })) } fn is_online(&self) -> ResultFuture<()> { let connection = self.connection.clone(); Ok(Box::pin(async move { match timeout(std::time::Duration::from_secs(3), connection.lock()).await { Ok(mut conn) => { debug!("is_online"); match debug!(timeout(std::time::Duration::from_secs(3), conn.connect()).await) { Ok(Ok(())) => Ok(()), Err(err) | Ok(Err(err)) => { conn.stream = Err(err.clone()); debug!(conn.connect().await) } } } Err(err) => Err(err), } })) } fn watch(&self) -> ResultFuture<()> { Err(MeliError::new("Unimplemented.")) } fn operation(&self, env_hash: EnvelopeHash) -> Result> { let (uid, mailbox_hash) = if let Some(v) = self.uid_store.hash_index.lock().unwrap().get(&env_hash) { *v } else { return Err(MeliError::new( "Message not found in local cache, it might have been deleted before you requested it." )); }; Ok(Box::new(NntpOp::new( uid, mailbox_hash, self.connection.clone(), self.uid_store.clone(), ))) } fn save( &self, _bytes: Vec, _mailbox_hash: MailboxHash, _flags: Option, ) -> ResultFuture<()> { Err(MeliError::new("NNTP doesn't support saving.")) } fn copy_messages( &mut self, _env_hashes: EnvelopeHashBatch, _source_mailbox_hash: MailboxHash, _destination_mailbox_hash: MailboxHash, _move_: bool, ) -> ResultFuture<()> { Err(MeliError::new("NNTP doesn't support copying/moving.")) } fn set_flags( &mut self, _env_hashes: EnvelopeHashBatch, _mailbox_hash: MailboxHash, _flags: SmallVec<[(std::result::Result, bool); 8]>, ) -> ResultFuture<()> { Err(MeliError::new("NNTP doesn't support flags.")) } fn delete_messages( &mut self, _env_hashes: EnvelopeHashBatch, _mailbox_hash: MailboxHash, ) -> ResultFuture<()> { Err(MeliError::new("NNTP doesn't support deletion.")) } fn tags(&self) -> Option>>> { None } fn as_any(&self) -> &dyn Any { self } fn as_any_mut(&mut self) -> &mut dyn Any { self } fn create_mailbox( &mut self, _path: String, ) -> ResultFuture<(MailboxHash, HashMap)> { Err(MeliError::new("Unimplemented.")) } fn delete_mailbox( &mut self, _mailbox_hash: MailboxHash, ) -> ResultFuture> { Err(MeliError::new("Unimplemented.")) } fn set_mailbox_subscription( &mut self, _mailbox_hash: MailboxHash, _new_val: bool, ) -> ResultFuture<()> { Err(MeliError::new("Unimplemented.")) } fn rename_mailbox( &mut self, _mailbox_hash: MailboxHash, _new_path: String, ) -> ResultFuture { Err(MeliError::new("Unimplemented.")) } fn set_mailbox_permissions( &mut self, _mailbox_hash: MailboxHash, _val: crate::backends::MailboxPermissions, ) -> ResultFuture<()> { Err(MeliError::new("Unimplemented.")) } fn search( &self, _query: crate::search::Query, _mailbox_hash: Option, ) -> ResultFuture> { Err(MeliError::new("Unimplemented.")) } } impl NntpType { pub fn new( s: &AccountSettings, is_subscribed: Box bool + Send + Sync>, event_consumer: BackendEventConsumer, ) -> Result> { let server_hostname = get_conf_val!(s["server_hostname"])?; /*let server_username = get_conf_val!(s["server_username"], "")?; let server_password = if !s.extra.contains_key("server_password_command") { get_conf_val!(s["server_password"], "")?.to_string() } else { let invocation = get_conf_val!(s["server_password_command"])?; let output = std::process::Command::new("sh") .args(&["-c", invocation]) .stdin(std::process::Stdio::piped()) .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()) .output()?; if !output.status.success() { return Err(MeliError::new(format!( "({}) server_password_command `{}` returned {}: {}", s.name, get_conf_val!(s["server_password_command"])?, output.status, String::from_utf8_lossy(&output.stderr) ))); } std::str::from_utf8(&output.stdout)?.trim_end().to_string() }; */ let server_port = get_conf_val!(s["server_port"], 119)?; let use_tls = get_conf_val!(s["use_tls"], server_port == 563)?; let use_starttls = use_tls && get_conf_val!(s["use_starttls"], !(server_port == 563))?; let danger_accept_invalid_certs: bool = get_conf_val!(s["danger_accept_invalid_certs"], false)?; let require_auth = get_conf_val!(s["require_auth"], true)?; let server_conf = NntpServerConf { server_hostname: server_hostname.to_string(), server_username: if require_auth { get_conf_val!(s["server_username"])?.to_string() } else { get_conf_val!(s["server_username"], String::new())? }, server_password: if require_auth { get_conf_val!(s["server_password"])?.to_string() } else { get_conf_val!(s["server_password"], String::new())? }, require_auth, server_port, use_tls, use_starttls, danger_accept_invalid_certs, extension_use: NntpExtensionUse { #[cfg(feature = "deflate_compression")] deflate: get_conf_val!(s["use_deflate"], true)?, }, }; let account_hash = { let mut hasher = DefaultHasher::new(); hasher.write(s.name.as_bytes()); hasher.finish() }; let account_name = Arc::new(s.name().to_string()); let mut mailboxes = HashMap::default(); for (k, _f) in s.mailboxes.iter() { let mailbox_hash = get_path_hash!(&k); mailboxes.insert( mailbox_hash, NntpMailbox { hash: mailbox_hash, nntp_path: k.to_string(), high_watermark: Arc::new(Mutex::new(0)), low_watermark: Arc::new(Mutex::new(0)), exists: Default::default(), unseen: Default::default(), }, ); } if mailboxes.is_empty() { return Err(MeliError::new(format!( "{} has no newsgroups configured.", account_name ))); } let uid_store: Arc = Arc::new(UIDStore { offline_cache: false, //get_conf_val!(s["X_header_caching"], false)?, mailboxes: Arc::new(FutureMutex::new(mailboxes)), ..UIDStore::new(account_hash, account_name, event_consumer) }); let connection = NntpConnection::new_connection(&server_conf, uid_store.clone()); Ok(Box::new(NntpType { server_conf, is_subscribed: Arc::new(IsSubscribedFn(is_subscribed)), can_create_flags: Arc::new(Mutex::new(false)), connection: Arc::new(FutureMutex::new(connection)), uid_store, })) } pub async fn nntp_mailboxes(connection: &Arc>) -> Result<()> { let mut res = String::with_capacity(8 * 1024); let mut conn = connection.lock().await; let command = { let mailboxes_lck = conn.uid_store.mailboxes.lock().await; mailboxes_lck .values() .fold("LIST ACTIVE ".to_string(), |mut acc, x| { if acc.len() != "LIST ACTIVE ".len() { acc.push(','); } acc.push_str(x.name()); acc }) }; conn.send_command(command.as_bytes()).await?; conn.read_response(&mut res, true, &["215 "]) .await .chain_err_summary(|| { format!( "Could not get newsgroups {}: expected LIST ACTIVE response but got: {}", &conn.uid_store.account_name, res ) })?; debug!(&res); let mut mailboxes_lck = conn.uid_store.mailboxes.lock().await; for l in res.split_rn().skip(1) { let s = l.split_whitespace().collect::>(); if s.len() != 3 { continue; } let mailbox_hash = get_path_hash!(&s[0]); mailboxes_lck.entry(mailbox_hash).and_modify(|m| { *m.high_watermark.lock().unwrap() = usize::from_str(s[1]).unwrap_or(0); *m.low_watermark.lock().unwrap() = usize::from_str(s[2]).unwrap_or(0); }); } Ok(()) } pub fn validate_config(s: &AccountSettings) -> Result<()> { get_conf_val!(s["server_hostname"])?; get_conf_val!(s["server_username"], String::new())?; if !s.extra.contains_key("server_password_command") { get_conf_val!(s["server_password"], String::new())?; } else if s.extra.contains_key("server_password") { return Err(MeliError::new(format!( "Configuration error ({}): both server_password and server_password_command are set, cannot choose", s.name.as_str(), ))); } let server_port = get_conf_val!(s["server_port"], 119)?; let use_tls = get_conf_val!(s["use_tls"], server_port == 563)?; let use_starttls = get_conf_val!(s["use_starttls"], !(server_port == 563))?; if !use_tls && use_starttls { return Err(MeliError::new(format!( "Configuration error ({}): incompatible use_tls and use_starttls values: use_tls = false, use_starttls = true", s.name.as_str(), ))); } #[cfg(feature = "deflate_compression")] get_conf_val!(s["use_deflate"], true)?; #[cfg(not(feature = "deflate_compression"))] if s.extra.contains_key("use_deflate") { return Err(MeliError::new(format!( "Configuration error ({}): setting `use_deflate` is set but this version of meli isn't compiled with DEFLATE support.", s.name.as_str(), ))); } get_conf_val!(s["danger_accept_invalid_certs"], false)?; Ok(()) } pub fn capabilities(&self) -> Vec { self.uid_store .capabilities .lock() .unwrap() .iter() .map(|c| c.clone()) .collect::>() } } struct FetchState { mailbox_hash: MailboxHash, connection: Arc>, uid_store: Arc, high_low_total: Option<(usize, usize, usize)>, } impl FetchState { async fn fetch_envs(&mut self) -> Result>> { let FetchState { mailbox_hash, ref connection, ref uid_store, ref mut high_low_total, } = self; let mailbox_hash = *mailbox_hash; let mut res = String::with_capacity(8 * 1024); let mut conn = connection.lock().await; if high_low_total.is_none() { conn.select_group(mailbox_hash, true, &mut res).await?; /* * Parameters group Name of newsgroup number Estimated number of articles in the group low Reported low water mark high Reported high water mark */ let s = res.split_whitespace().collect::>(); let path = conn.uid_store.mailboxes.lock().await[&mailbox_hash] .name() .to_string(); if s.len() != 5 { return Err(MeliError::new(format!( "{} Could not select newsgroup {}: expected GROUP response but got: {}", &uid_store.account_name, path, res ))); } let total = usize::from_str(&s[1]).unwrap_or(0); let _low = usize::from_str(&s[2]).unwrap_or(0); let high = usize::from_str(&s[3]).unwrap_or(0); *high_low_total = Some((high, _low, total)); { let f = &uid_store.mailboxes.lock().await[&mailbox_hash]; f.exists.lock().unwrap().set_not_yet_seen(total); f.unseen.lock().unwrap().set_not_yet_seen(total); }; } let (high, low, _) = high_low_total.unwrap(); if high <= low { return Ok(None); } const CHUNK_SIZE: usize = 100; let new_low = std::cmp::max(low, high.saturating_sub(CHUNK_SIZE)); high_low_total.as_mut().unwrap().0 = new_low; conn.send_command(format!("OVER {}-{}", new_low, high).as_bytes()) .await?; conn.read_response(&mut res, true, command_to_replycodes("OVER")) .await .chain_err_summary(|| { format!( "{} Could not select newsgroup: expected OVER response but got: {}", &uid_store.account_name, res ) })?; let mut ret = Vec::with_capacity(high - new_low); //hash_index: Arc>>, //uid_index: Arc>>, { let mut hash_index_lck = uid_store.hash_index.lock().unwrap(); let mut uid_index_lck = uid_store.uid_index.lock().unwrap(); for l in res.split_rn().skip(1) { let (_, (num, env)) = protocol_parser::over_article(&l)?; hash_index_lck.insert(env.hash(), (num, mailbox_hash)); uid_index_lck.insert((mailbox_hash, num), env.hash()); ret.push(env); } } { let hash_set: BTreeSet = ret.iter().map(|env| env.hash()).collect(); let f = &uid_store.mailboxes.lock().await[&mailbox_hash]; f.exists .lock() .unwrap() .insert_existing_set(hash_set.clone()); f.unseen.lock().unwrap().insert_existing_set(hash_set); }; Ok(Some(ret)) } } use futures::future::{self, Either}; async fn timeout(dur: std::time::Duration, f: impl Future) -> Result { futures::pin_mut!(f); match future::select(f, smol::Timer::after(dur)).await { Either::Left((out, _)) => Ok(out), Either::Right(_) => { Err(MeliError::new("Timedout").set_kind(crate::error::ErrorKind::Network)) } } }