Browse Source

Make get_async() return a Stream

async
Manos Pitsidianakis 2 years ago
parent
commit
ee10cdbcd5
Signed by: epilys GPG Key ID: 73627C2F690DF710
  1. 1
      melib/Cargo.toml
  2. 2
      melib/src/backends.rs
  3. 574
      melib/src/backends/imap_async.rs
  4. 5
      melib/src/backends/imap_async/connection.rs
  5. 220
      melib/src/backends/imap_async/watch.rs
  6. 4
      melib/src/backends/maildir/backend.rs
  7. 141
      melib/src/connections.rs
  8. 89
      src/conf/accounts.rs
  9. 15
      src/jobs1.rs

1
melib/Cargo.toml

@ -44,6 +44,7 @@ rusqlite = {version = "0.20.0", optional = true }
libloading = "0.6.2"
futures = "0.3.5"
smol = "0.1.18"
async-stream = "0.2.1"
[features]
default = ["unicode_algorithms", "imap_backend", "maildir_backend", "mbox_backend", "vcard", "sqlite3"]

2
melib/src/backends.rs

@ -307,7 +307,7 @@ pub trait MailBackend: ::std::fmt::Debug + Send + Sync {
fn get_async(
&mut self,
mailbox: &Mailbox,
) -> Result<Pin<Box<dyn Future<Output = Result<Vec<Envelope>>> + Send + 'static>>> {
) -> Result<Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>> {
Err(MeliError::new("Unimplemented."))
}
fn refresh(

574
melib/src/backends/imap_async.rs

@ -30,8 +30,8 @@ pub use mailbox::*;
//pub use operations::*;
mod connection;
pub use connection::*;
//mod watch;
//pub use watch::*;
mod watch;
pub use watch::*;
mod cache;
pub mod managesieve;
//mod untagged;
@ -181,299 +181,51 @@ impl MailBackend for ImapType {
fn get_async(
&mut self,
mailbox: &Mailbox,
) -> Result<Pin<Box<dyn Future<Output = Result<Vec<Envelope>>> + Send + 'static>>> {
) -> Result<Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>> {
let uid_store = self.uid_store.clone();
let can_create_flags = self.can_create_flags.clone();
let mailbox_hash = mailbox.hash();
let (permissions, mailbox_path, mailbox_exists, no_select, unseen) = {
let f = &uid_store.mailboxes.read().unwrap()[&mailbox_hash];
(
f.permissions.clone(),
f.imap_path().to_string(),
f.exists.clone(),
f.no_select,
f.unseen.clone(),
)
};
let connection = self.connection.clone();
Ok(Box::pin(async move {
if no_select {
return Ok(vec![]);
}
let mut our_unseen: BTreeSet<EnvelopeHash> = Default::default();
let mut valid_hash_set: HashSet<EnvelopeHash> = HashSet::default();
let (cached_hash_set, mut payload): (HashSet<EnvelopeHash>, Vec<Envelope>) =
(|| -> Result<(HashSet<EnvelopeHash>, Vec<Envelope>)> {
if !uid_store.cache_headers {
return Ok(Default::default());
}
let uidvalidities = uid_store.uidvalidity.lock().unwrap();
let v = if let Some(v) = uidvalidities.get(&mailbox_hash) {
v
} else {
return Ok(Default::default());
};
let cached_envs: (cache::MaxUID, Vec<(UID, Envelope)>);
cache::save_envelopes(uid_store.account_hash, mailbox_hash, *v, &[])
.chain_err_summary(|| "Could not save envelopes in cache in get()")?;
cached_envs = cache::get_envelopes(uid_store.account_hash, mailbox_hash, *v)
.chain_err_summary(|| "Could not get envelopes in cache in get()")?;
let (_max_uid, envelopes) = debug!(cached_envs);
let ret = envelopes.iter().map(|(_, env)| env.hash()).collect();
if !envelopes.is_empty() {
let mut payload = vec![];
for (uid, env) in envelopes {
if !env.is_seen() {
our_unseen.insert(env.hash());
}
uid_store
.hash_index
.lock()
.unwrap()
.insert(env.hash(), (uid, mailbox_hash));
uid_store
.uid_index
.lock()
.unwrap()
.insert((mailbox_hash, uid), env.hash());
payload.push(env);
}
debug!("sending cached payload for {}", mailbox_hash);
unseen.lock().unwrap().insert_set(our_unseen.clone());
return Ok((ret, payload));
}
Ok((ret, vec![]))
})()
.unwrap_or_default();
let mut conn = connection.lock().await;
debug!("locked for get {}", mailbox_path);
let mut response = String::with_capacity(8 * 1024);
conn.create_uid_msn_cache(mailbox_hash, 1).await?;
/* first SELECT the mailbox to get READ/WRITE permissions (because EXAMINE only
* returns READ-ONLY for both cases) */
conn.select_mailbox(mailbox_hash, &mut response)
.await
.chain_err_summary(|| format!("Could not select mailbox {}", mailbox_path))?;
let mut examine_response = protocol_parser::select_response(&response)
.chain_err_summary(|| {
format!(
"Could not parse select response for mailbox {}",
mailbox_path
)
})?;
*can_create_flags.lock().unwrap() = examine_response.can_create_flags;
debug!(
"mailbox: {} examine_response: {:?}",
mailbox_path, examine_response
);
{
let mut uidvalidities = uid_store.uidvalidity.lock().unwrap();
let v = uidvalidities
.entry(mailbox_hash)
.or_insert(examine_response.uidvalidity);
if uid_store.cache_headers {
let _ = cache::save_envelopes(
uid_store.account_hash,
mailbox_hash,
examine_response.uidvalidity,
&[],
);
}
*v = examine_response.uidvalidity;
let mut permissions = permissions.lock().unwrap();
permissions.create_messages = !examine_response.read_only;
permissions.remove_messages = !examine_response.read_only;
permissions.set_flags = !examine_response.read_only;
permissions.rename_messages = !examine_response.read_only;
permissions.delete_messages = !examine_response.read_only;
permissions.delete_messages = !examine_response.read_only;
mailbox_exists
.lock()
.unwrap()
.set_not_yet_seen(examine_response.exists);
}
if examine_response.exists == 0 {
if uid_store.cache_headers {
for &env_hash in &cached_hash_set {
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: RefreshEventKind::Remove(env_hash),
});
}
let _ = cache::save_envelopes(
uid_store.account_hash,
mailbox_hash,
examine_response.uidvalidity,
&[],
);
}
return Ok(Vec::new());
}
/* reselecting the same mailbox with EXAMINE prevents expunging it */
conn.examine_mailbox(mailbox_hash, &mut response).await?;
if examine_response.uidnext == 0 {
/* UIDNEXT shouldn't be 0, since exists != 0 at this point */
conn.send_command(format!("STATUS \"{}\" (UIDNEXT)", mailbox_path).as_bytes())
.await?;
conn.read_response(&mut response, RequiredResponses::STATUS)
.await?;
let (_, status) = protocol_parser::status_response(response.as_bytes())?;
if let Some(uidnext) = status.uidnext {
if uidnext == 0 {
return Err(MeliError::new(
"IMAP server error: zero UIDNEXt with nonzero exists.",
));
}
examine_response.uidnext = uidnext;
} else {
return Err(MeliError::new("IMAP server did not reply with UIDNEXT"));
}
}
let mut max_uid_left: usize = examine_response.uidnext - 1;
while max_uid_left > 0 {
let mut envelopes = vec![];
debug!("{} max_uid_left= {}", mailbox_hash, max_uid_left);
if max_uid_left == 1 {
debug!("UID FETCH 1 (UID FLAGS ENVELOPE BODYSTRUCTURE)");
conn.send_command(b"UID FETCH 1 (UID FLAGS ENVELOPE BODYSTRUCTURE)")
.await?;
} else {
conn.send_command(
debug!(format!(
"UID FETCH {}:{} (UID FLAGS ENVELOPE BODYSTRUCTURE)",
std::cmp::max(std::cmp::max(max_uid_left.saturating_sub(500), 1), 1),
max_uid_left
))
.as_bytes(),
)
.await?
};
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await
.chain_err_summary(|| {
format!(
"Could not parse fetch response for mailbox {}",
mailbox_path
)
})?;
debug!(
"fetch response is {} bytes and {} lines",
response.len(),
response.lines().collect::<Vec<&str>>().len()
);
let (_, v, _) = protocol_parser::uid_fetch_responses(&response)?;
debug!("responses len is {}", v.len());
for UidFetchResponse {
uid,
message_sequence_number,
flags,
envelope,
..
} in v
{
let mut env = envelope.unwrap();
let mut h = DefaultHasher::new();
h.write_usize(uid);
h.write(mailbox_path.as_bytes());
env.set_hash(h.finish());
debug!(
"env hash {} {} UID = {} MSN = {}",
env.hash(),
env.subject(),
uid,
message_sequence_number
);
valid_hash_set.insert(env.hash());
let mut tag_lck = uid_store.tag_index.write().unwrap();
if let Some((flags, keywords)) = flags {
if !flags.intersects(Flag::SEEN) {
our_unseen.insert(env.hash());
}
env.set_flags(flags);
for f in keywords {
let hash = tag_hash!(f);
if !tag_lck.contains_key(&hash) {
tag_lck.insert(hash, f);
}
env.labels_mut().push(hash);
}
}
uid_store
.msn_index
.lock()
.unwrap()
.entry(mailbox_hash)
.or_default()
.insert(message_sequence_number - 1, uid);
uid_store
.hash_index
.lock()
.unwrap()
.insert(env.hash(), (uid, mailbox_hash));
uid_store
.uid_index
.lock()
.unwrap()
.insert((mailbox_hash, uid), env.hash());
envelopes.push((uid, env));
}
max_uid_left = std::cmp::max(std::cmp::max(max_uid_left.saturating_sub(500), 1), 1);
debug!("sending payload for {}", mailbox_hash);
if uid_store.cache_headers {
cache::save_envelopes(
uid_store.account_hash,
mailbox_hash,
examine_response.uidvalidity,
&envelopes
.iter()
.map(|(uid, env)| (*uid, env))
.collect::<SmallVec<[(UID, &Envelope); 1024]>>(),
)
.chain_err_summary(|| {
format!(
"Could not save envelopes in cache for mailbox {}",
mailbox_path
)
})?;
}
for &env_hash in cached_hash_set.difference(&valid_hash_set) {
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: RefreshEventKind::Remove(env_hash),
});
}
let progress = envelopes.len();
unseen
.lock()
.unwrap()
.insert_set(our_unseen.iter().cloned().collect());
mailbox_exists.lock().unwrap().insert_existing_set(
envelopes.iter().map(|(_, env)| env.hash()).collect::<_>(),
);
payload.extend(envelopes.into_iter().map(|(_, env)| env));
if max_uid_left == 1 {
break;
let mut max_uid: Option<usize> = None;
let mut valid_hash_set: HashSet<EnvelopeHash> = HashSet::default();
let mut our_unseen: BTreeSet<EnvelopeHash> = Default::default();
Ok(Box::pin(async_stream::try_stream! {
let (cached_hash_set, cached_payload) = get_cached_envs(mailbox_hash, &mut our_unseen, &uid_store)?;
yield cached_payload;
loop {
let res = get_hlpr(&connection, mailbox_hash,&cached_hash_set, &can_create_flags, &mut our_unseen, &mut valid_hash_set, &uid_store, &mut max_uid).await?;
yield res;
if max_uid == Some(1) {
return;
}
}
Ok(payload)
}))
}
fn refresh_async(
&mut self,
_mailbox_hash: MailboxHash,
_sender: RefreshEventConsumer,
mailbox_hash: MailboxHash,
sender: RefreshEventConsumer,
) -> Result<Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>> {
Err(MeliError::new("Unimplemented."))
let inbox = self
.uid_store
.mailboxes
.read()
.unwrap()
.get(&mailbox_hash)
.map(std::clone::Clone::clone)
.unwrap();
let main_conn = self.connection.clone();
*self.uid_store.sender.write().unwrap() = Some(sender);
let uid_store = self.uid_store.clone();
Ok(Box::pin(async move {
let mut conn = main_conn.lock().await;
watch::examine_updates(&inbox, &mut conn, &uid_store).await?;
Ok(())
}))
}
fn mailboxes_async(
&self,
) -> Result<
@ -1813,3 +1565,259 @@ async fn get_initial_max_uid(
}
Ok(examine_response.uidnext - 1)
}
async fn get_hlpr(
connection: &Arc<FutureMutex<ImapConnection>>,
mailbox_hash: MailboxHash,
cached_hash_set: &HashSet<EnvelopeHash>,
can_create_flags: &Arc<Mutex<bool>>,
our_unseen: &mut BTreeSet<EnvelopeHash>,
valid_hash_set: &mut HashSet<EnvelopeHash>,
uid_store: &UIDStore,
max_uid: &mut Option<usize>,
) -> Result<Vec<Envelope>> {
let (permissions, mailbox_path, mailbox_exists, no_select, unseen) = {
let f = &uid_store.mailboxes.read().unwrap()[&mailbox_hash];
(
f.permissions.clone(),
f.imap_path().to_string(),
f.exists.clone(),
f.no_select,
f.unseen.clone(),
)
};
let mut conn = connection.lock().await;
debug!("locked for get {}", mailbox_path);
let mut response = String::with_capacity(8 * 1024);
let max_uid_left = if let Some(max_uid) = max_uid {
*max_uid
} else {
conn.create_uid_msn_cache(mailbox_hash, 1).await?;
/* first SELECT the mailbox to get READ/WRITE permissions (because EXAMINE only
* returns READ-ONLY for both cases) */
conn.select_mailbox(mailbox_hash, &mut response)
.await
.chain_err_summary(|| format!("Could not select mailbox {}", mailbox_path))?;
let mut examine_response =
protocol_parser::select_response(&response).chain_err_summary(|| {
format!(
"Could not parse select response for mailbox {}",
mailbox_path
)
})?;
*can_create_flags.lock().unwrap() = examine_response.can_create_flags;
debug!(
"mailbox: {} examine_response: {:?}",
mailbox_path, examine_response
);
{
let mut uidvalidities = uid_store.uidvalidity.lock().unwrap();
let v = uidvalidities
.entry(mailbox_hash)
.or_insert(examine_response.uidvalidity);
if uid_store.cache_headers {
let _ = cache::save_envelopes(
uid_store.account_hash,
mailbox_hash,
examine_response.uidvalidity,
&[],
);
}
*v = examine_response.uidvalidity;
let mut permissions = permissions.lock().unwrap();
permissions.create_messages = !examine_response.read_only;
permissions.remove_messages = !examine_response.read_only;
permissions.set_flags = !examine_response.read_only;
permissions.rename_messages = !examine_response.read_only;
permissions.delete_messages = !examine_response.read_only;
permissions.delete_messages = !examine_response.read_only;
mailbox_exists
.lock()
.unwrap()
.set_not_yet_seen(examine_response.exists);
}
if examine_response.exists == 0 {
if uid_store.cache_headers {
/*
for &env_hash in &cached_hash_set {
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: RefreshEventKind::Remove(env_hash),
});
}
*/
let _ = cache::save_envelopes(
uid_store.account_hash,
mailbox_hash,
examine_response.uidvalidity,
&[],
);
}
*max_uid = Some(0);
return Ok(Vec::new());
}
/* reselecting the same mailbox with EXAMINE prevents expunging it */
conn.examine_mailbox(mailbox_hash, &mut response).await?;
if examine_response.uidnext == 0 {
/* UIDNEXT shouldn't be 0, since exists != 0 at this point */
conn.send_command(format!("STATUS \"{}\" (UIDNEXT)", mailbox_path).as_bytes())
.await?;
conn.read_response(&mut response, RequiredResponses::STATUS)
.await?;
let (_, status) = protocol_parser::status_response(response.as_bytes())?;
if let Some(uidnext) = status.uidnext {
if uidnext == 0 {
return Err(MeliError::new(
"IMAP server error: zero UIDNEXt with nonzero exists.",
));
}
examine_response.uidnext = uidnext;
} else {
return Err(MeliError::new("IMAP server did not reply with UIDNEXT"));
}
}
*max_uid = Some(examine_response.uidnext - 1);
examine_response.uidnext - 1
};
let chunk_size = 200;
let mut payload = vec![];
if conn.current_mailbox != Some(mailbox_hash) {
conn.examine_mailbox(mailbox_hash, &mut response).await?;
}
if max_uid_left > 0 {
let mut envelopes = vec![];
debug!("{} max_uid_left= {}", mailbox_hash, max_uid_left);
if max_uid_left == 1 {
debug!("UID FETCH 1 (UID FLAGS ENVELOPE BODYSTRUCTURE)");
conn.send_command(b"UID FETCH 1 (UID FLAGS ENVELOPE BODYSTRUCTURE)")
.await?;
} else {
conn.send_command(
debug!(format!(
"UID FETCH {}:{} (UID FLAGS ENVELOPE BODYSTRUCTURE)",
std::cmp::max(std::cmp::max(max_uid_left.saturating_sub(chunk_size), 1), 1),
max_uid_left
))
.as_bytes(),
)
.await?
};
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await
.chain_err_summary(|| {
format!(
"Could not parse fetch response for mailbox {}",
mailbox_path
)
})?;
drop(conn);
debug!(
"fetch response is {} bytes and {} lines",
response.len(),
response.lines().collect::<Vec<&str>>().len()
);
let (_, v, _) = protocol_parser::uid_fetch_responses(&response)?;
debug!("responses len is {}", v.len());
for UidFetchResponse {
uid,
message_sequence_number,
flags,
envelope,
..
} in v
{
let mut env = envelope.unwrap();
let mut h = DefaultHasher::new();
h.write_usize(uid);
h.write(mailbox_path.as_bytes());
env.set_hash(h.finish());
debug!(
"env hash {} {} UID = {} MSN = {}",
env.hash(),
env.subject(),
uid,
message_sequence_number
);
valid_hash_set.insert(env.hash());
let mut tag_lck = uid_store.tag_index.write().unwrap();
if let Some((flags, keywords)) = flags {
if !flags.intersects(Flag::SEEN) {
our_unseen.insert(env.hash());
}
env.set_flags(flags);
for f in keywords {
let hash = tag_hash!(f);
if !tag_lck.contains_key(&hash) {
tag_lck.insert(hash, f);
}
env.labels_mut().push(hash);
}
}
uid_store
.msn_index
.lock()
.unwrap()
.entry(mailbox_hash)
.or_default()
.insert(message_sequence_number - 1, uid);
uid_store
.hash_index
.lock()
.unwrap()
.insert(env.hash(), (uid, mailbox_hash));
uid_store
.uid_index
.lock()
.unwrap()
.insert((mailbox_hash, uid), env.hash());
envelopes.push((uid, env));
}
debug!("sending payload for {}", mailbox_hash);
if uid_store.cache_headers {
//FIXME
/*
cache::save_envelopes(
uid_store.account_hash,
mailbox_hash,
examine_response.uidvalidity,
&envelopes
.iter()
.map(|(uid, env)| (*uid, env))
.collect::<SmallVec<[(UID, &Envelope); 1024]>>(),
)
.chain_err_summary(|| {
format!(
"Could not save envelopes in cache for mailbox {}",
mailbox_path
)
})?;
*/
}
/*
for &env_hash in cached_hash_set.difference(&valid_hash_set) {
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: RefreshEventKind::Remove(env_hash),
});
}
*/
unseen
.lock()
.unwrap()
.insert_set(our_unseen.iter().cloned().collect());
mailbox_exists
.lock()
.unwrap()
.insert_existing_set(envelopes.iter().map(|(_, env)| env.hash()).collect::<_>());
payload.extend(envelopes.into_iter().map(|(_, env)| env));
}
*max_uid = Some(std::cmp::max(
std::cmp::max(max_uid_left.saturating_sub(chunk_size), 1),
1,
));
Ok(payload)
}

5
melib/src/backends/imap_async/connection.rs

@ -148,14 +148,15 @@ impl ImapStream {
}
{
// FIXME: This is blocking
let socket = socket.into_inner()?;
let mut conn_result = debug!(connector.connect(path, socket));
let mut conn_result = connector.connect(path, socket);
if let Err(native_tls::HandshakeError::WouldBlock(midhandshake_stream)) =
conn_result
{
let mut midhandshake_stream = Some(midhandshake_stream);
loop {
match debug!(midhandshake_stream.take().unwrap().handshake()) {
match midhandshake_stream.take().unwrap().handshake() {
Ok(r) => {
conn_result = Ok(r);
break;

220
melib/src/backends/imap_async/watch.rs

@ -27,21 +27,18 @@ use std::sync::{Arc, Mutex};
/// Arguments for IMAP watching functions
pub struct ImapWatchKit {
pub conn: ImapConnection,
pub main_conn: Arc<Mutex<ImapConnection>>,
pub main_conn: Arc<FutureMutex<ImapConnection>>,
pub uid_store: Arc<UIDStore>,
pub work_context: WorkContext,
}
macro_rules! exit_on_error {
($conn:expr, $mailbox_hash:ident, $work_context:ident, $thread_id:ident, $($result:expr)+) => {
($conn:expr, $mailbox_hash:ident, $thread_id:ident, $($result:expr)+) => {
$(if let Err(e) = $result {
*$conn.uid_store.is_online.lock().unwrap() = (
Instant::now(),
Err(e.clone()),
);
debug!("failure: {}", e.to_string());
$work_context.set_status.send(($thread_id, e.to_string())).unwrap();
$work_context.finished.send($thread_id).unwrap();
let account_hash = $conn.uid_store.account_hash;
$conn.add_refresh_event(RefreshEvent {
account_hash,
@ -53,50 +50,30 @@ macro_rules! exit_on_error {
};
}
pub fn poll_with_examine(kit: ImapWatchKit) -> Result<()> {
pub async fn poll_with_examine(kit: ImapWatchKit) -> Result<()> {
debug!("poll with examine");
let ImapWatchKit {
mut conn,
main_conn,
uid_store,
work_context,
} = kit;
loop {
if super::try_lock(&uid_store.is_online, Some(std::time::Duration::new(10, 0)))?
.1
.is_ok()
{
break;
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
conn.connect()?;
conn.connect().await?;
let mut response = String::with_capacity(8 * 1024);
let thread_id: std::thread::ThreadId = std::thread::current().id();
loop {
work_context
.set_status
.send((thread_id, "sleeping...".to_string()))
.unwrap();
std::thread::sleep(std::time::Duration::from_millis(5 * 60 * 1000));
let mailboxes = uid_store.mailboxes.read()?;
for mailbox in mailboxes.values() {
work_context
.set_status
.send((
thread_id,
format!("examining `{}` for updates...", mailbox.path()),
))
.unwrap();
examine_updates(mailbox, &mut conn, &uid_store, &work_context)?;
examine_updates(mailbox, &mut conn, &uid_store).await?;
}
let mut main_conn = super::try_lock(&main_conn, Some(std::time::Duration::new(10, 0)))?;
main_conn.send_command(b"NOOP")?;
main_conn.read_response(&mut response, RequiredResponses::empty())?;
let mut main_conn = main_conn.lock().await;
main_conn.send_command(b"NOOP").await?;
main_conn
.read_response(&mut response, RequiredResponses::empty())
.await?;
}
}
pub fn idle(kit: ImapWatchKit) -> Result<()> {
pub async fn idle(kit: ImapWatchKit) -> Result<()> {
debug!("IDLE");
/* IDLE only watches the connection's selected mailbox. We will IDLE on INBOX and every ~5
* minutes wake up and poll the others */
@ -104,18 +81,8 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
mut conn,
main_conn,
uid_store,
work_context,
} = kit;
loop {
if super::try_lock(&uid_store.is_online, Some(std::time::Duration::new(10, 0)))?
.1
.is_ok()
{
break;
}
std::thread::sleep(std::time::Duration::from_millis(100));
}
conn.connect()?;
conn.connect().await?;
let thread_id: std::thread::ThreadId = std::thread::current().id();
let mailbox: ImapMailbox = match uid_store
.mailboxes
@ -129,10 +96,6 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
None => {
let err = MeliError::new("INBOX mailbox not found in local mailbox index. meli may have not parsed the IMAP mailboxes correctly");
debug!("failure: {}", err.to_string());
work_context
.set_status
.send((thread_id, err.to_string()))
.unwrap();
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash: 0,
@ -147,10 +110,11 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
exit_on_error!(
conn,
mailbox_hash,
work_context,
thread_id,
conn.send_command(format!("SELECT \"{}\"", mailbox.imap_path()).as_bytes())
.await
conn.read_response(&mut response, RequiredResponses::SELECT_REQUIRED)
.await
);
debug!("select response {}", &response);
{
@ -204,14 +168,9 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
exit_on_error!(
conn,
mailbox_hash,
work_context,
thread_id,
conn.send_command(b"IDLE")
conn.send_command(b"IDLE").await
);
work_context
.set_status
.send((thread_id, "IDLEing".to_string()))
.unwrap();
let mut iter = ImapBlockingConnection::from(conn);
let mut beat = std::time::Instant::now();
let mut watch = std::time::Instant::now();
@ -222,46 +181,30 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
while let Some(line) = iter.next() {
let now = std::time::Instant::now();
if now.duration_since(beat) >= _26_mins {
let mut main_conn_lck =
super::try_lock(&main_conn, Some(std::time::Duration::new(10, 0)))?;
let mut main_conn_lck = main_conn.lock().await;
exit_on_error!(
iter.conn,
mailbox_hash,
work_context,
thread_id,
iter.conn.set_nonblocking(true)
iter.conn.send_raw(b"DONE")
iter.conn.read_response(&mut response, RequiredResponses::empty())
iter.conn.send_command(b"IDLE")
iter.conn.set_nonblocking(false)
main_conn_lck.send_command(b"NOOP")
main_conn_lck.read_response(&mut response, RequiredResponses::empty())
iter.conn.send_raw(b"DONE").await
iter.conn.read_response(&mut response, RequiredResponses::empty()).await
iter.conn.send_command(b"IDLE").await
main_conn_lck.send_command(b"NOOP").await
main_conn_lck.read_response(&mut response, RequiredResponses::empty()).await
);
beat = now;
}
if now.duration_since(watch) >= _5_mins {
/* Time to poll all inboxes */
let mut conn = try_lock(&main_conn, Some(std::time::Duration::new(10, 0)))?;
let mut conn = main_conn.lock().await;
for mailbox in uid_store.mailboxes.read().unwrap().values() {
work_context
.set_status
.send((
thread_id,
format!("examining `{}` for updates...", mailbox.path()),
))
.unwrap();
exit_on_error!(
conn,
mailbox_hash,
work_context,
thread_id,
examine_updates(mailbox, &mut conn, &uid_store, &work_context,)
examine_updates(mailbox, &mut conn, &uid_store).await
);
}
work_context
.set_status
.send((thread_id, "done examining mailboxes.".to_string()))
.unwrap();
watch = now;
}
*uid_store.is_online.lock().unwrap() = (Instant::now(), Ok(()));
@ -270,20 +213,15 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
.map_err(MeliError::from)
{
Ok(Some(Recent(r))) => {
let mut conn = super::try_lock(&main_conn, Some(std::time::Duration::new(10, 0)))?;
work_context
.set_status
.send((thread_id, format!("got `{} RECENT` notification", r)))
.unwrap();
let mut conn = main_conn.lock().await;
/* UID SEARCH RECENT */
exit_on_error!(
conn,
mailbox_hash,
work_context,
thread_id,
conn.examine_mailbox(mailbox_hash, &mut response)
conn.send_command(b"UID SEARCH RECENT")
conn.read_response(&mut response, RequiredResponses::SEARCH)
conn.examine_mailbox(mailbox_hash, &mut response).await
conn.send_command(b"UID SEARCH RECENT").await
conn.read_response(&mut response, RequiredResponses::SEARCH).await
);
match protocol_parser::search_results_raw(response.as_bytes())
.map(|(_, v)| v)
@ -296,13 +234,12 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
exit_on_error!(
conn,
mailbox_hash,
work_context,
thread_id,
conn.send_command(
&[&b"UID FETCH"[..], &v.trim().split(|b| b == &b' ').join(b','), &b"(FLAGS RFC822)"[..]]
.join(&b' '),
)
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
).await
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED).await
);
debug!(&response);
match protocol_parser::uid_fetch_responses(&response) {
@ -313,13 +250,6 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
uid, flags, body, ..
} in v
{
work_context
.set_status
.send((
thread_id,
format!("parsing {}/{} envelopes..", ctr, len),
))
.unwrap();
ctr += 1;
if !uid_store
.uid_index
@ -383,10 +313,6 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
}
}
}
work_context
.set_status
.send((thread_id, format!("parsed {}/{} envelopes.", ctr, len)))
.unwrap();
}
Err(e) => {
debug!(e);
@ -409,11 +335,7 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
// immediately decremented by 1, and this decrement is reflected in
// message sequence numbers in subsequent responses (including other
// untagged EXPUNGE responses).
let mut conn = super::try_lock(&main_conn, Some(std::time::Duration::new(10, 0)))?;
work_context
.set_status
.send((thread_id, format!("got `{} EXPUNGED` notification", n)))
.unwrap();
let mut conn = main_conn.lock().await;
let deleted_uid = uid_store
.msn_index
.lock()
@ -436,30 +358,17 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
});
}
Ok(Some(Exists(n))) => {
let mut conn = super::try_lock(&main_conn, Some(std::time::Duration::new(10, 0)))?;
let mut conn = main_conn.lock().await;
/* UID FETCH ALL UID, cross-ref, then FETCH difference headers
* */
let mut prev_exists = mailbox.exists.lock().unwrap();
debug!("exists {}", n);
work_context
.set_status
.send((
thread_id,
format!(
"got `{} EXISTS` notification (EXISTS was previously {} for {}",
n,
prev_exists.len(),
mailbox.path()
),
))
.unwrap();
if n > prev_exists.len() {
exit_on_error!(
conn,
mailbox_hash,
work_context,
thread_id,
conn.examine_mailbox(mailbox_hash, &mut response)
conn.examine_mailbox(mailbox_hash, &mut response).await
conn.send_command(
&[
b"FETCH",
@ -467,8 +376,8 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
b"(UID FLAGS RFC822)",
]
.join(&b' '),
)
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
).await
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED).await
);
match protocol_parser::uid_fetch_responses(&response) {
Ok((_, v, _)) => {
@ -478,13 +387,6 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
uid, flags, body, ..
} in v
{
work_context
.set_status
.send((
thread_id,
format!("parsing {}/{} envelopes..", ctr, len),
))
.unwrap();
if uid_store
.uid_index
.lock()
@ -545,10 +447,6 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
});
}
}
work_context
.set_status
.send((thread_id, format!("parsed {}/{} envelopes.", ctr, len)))
.unwrap();
}
Err(e) => {
debug!(e);
@ -560,22 +458,21 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
/* a * {msg_seq} FETCH (FLAGS ({flags})) was received, so find out UID from msg_seq
* and send update
*/
let mut conn = super::try_lock(&main_conn, Some(std::time::Duration::new(10, 0)))?;
let mut conn = main_conn.lock().await;
debug!("fetch {} {:?}", msg_seq, flags);
exit_on_error!(
conn,
mailbox_hash,
work_context,
thread_id,
conn.examine_mailbox(mailbox_hash, &mut response)
conn.examine_mailbox(mailbox_hash, &mut response).await
conn.send_command(
&[
b"UID SEARCH ",
format!("{}", msg_seq).as_bytes(),
]
.join(&b' '),
)
conn.read_response(&mut response, RequiredResponses::SEARCH)
).await
conn.read_response(&mut response, RequiredResponses::SEARCH).await
);
match search_results(response.split_rn().next().unwrap_or("").as_bytes())
.map(|(_, v)| v)
@ -610,19 +507,10 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
Ok(Some(Bye { .. })) => break,
Ok(None) | Err(_) => {}
}
work_context
.set_status
.send((thread_id, "IDLEing".to_string()))
.unwrap();
}
debug!("IDLE connection dropped");
let err: &str = iter.err().unwrap_or("Unknown reason.");
work_context
.set_status
.send((thread_id, "IDLE connection dropped".to_string()))
.unwrap();
work_context.finished.send(thread_id).unwrap();
main_conn.lock().unwrap().add_refresh_event(RefreshEvent {
main_conn.lock().await.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,
mailbox_hash,
kind: RefreshEventKind::Failure(MeliError::new(format!(
@ -633,22 +521,19 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
Err(MeliError::new(format!("IDLE connection dropped: {}", err)))
}
pub fn examine_updates(
pub async fn examine_updates(
mailbox: &ImapMailbox,
conn: &mut ImapConnection,
uid_store: &Arc<UIDStore>,
work_context: &WorkContext,
) -> Result<()> {
let thread_id: std::thread::ThreadId = std::thread::current().id();
let mailbox_hash = mailbox.hash();
debug!("examining mailbox {} {}", mailbox_hash, mailbox.path());
let mut response = String::with_capacity(8 * 1024);
exit_on_error!(
conn,
mailbox_hash,
work_context,
thread_id,
conn.examine_mailbox(mailbox_hash, &mut response)
conn.examine_mailbox(mailbox_hash, &mut response).await
);
*uid_store.is_online.lock().unwrap() = (Instant::now(), Ok(()));
let uidvalidity;
@ -690,7 +575,6 @@ pub fn examine_updates(
});
}
}
let mut prev_exists = mailbox.exists.lock().unwrap();
let n = ok.exists;
if ok.recent > 0 {
{
@ -698,10 +582,9 @@ pub fn examine_updates(
exit_on_error!(
conn,
mailbox_hash,
work_context,
thread_id,
conn.send_command(b"UID SEARCH RECENT")
conn.read_response(&mut response, RequiredResponses::SEARCH)
conn.send_command(b"UID SEARCH RECENT").await
conn.read_response(&mut response, RequiredResponses::SEARCH).await
);
match protocol_parser::search_results_raw(response.as_bytes())
.map(|(_, v)| v)
@ -714,13 +597,12 @@ pub fn examine_updates(
exit_on_error!(
conn,
mailbox_hash,
work_context,
thread_id,
conn.send_command(
&[&b"UID FETCH"[..], &v.trim().split(|b| b == &b' ').join(b','), &b"(FLAGS RFC822)"[..]]
.join(&b' '),
)
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
).await
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED).await
);
debug!(&response);
match protocol_parser::uid_fetch_responses(&response) {
@ -786,6 +668,7 @@ pub fn examine_updates(
&[(uid, &env)],
)?;
}
let mut prev_exists = mailbox.exists.lock().unwrap();
prev_exists.insert_new(env.hash());
conn.add_refresh_event(RefreshEvent {
@ -810,24 +693,23 @@ pub fn examine_updates(
}
}
}
} else if n > prev_exists.len() {
} else if n > mailbox.exists.lock().unwrap().len() {
/* UID FETCH ALL UID, cross-ref, then FETCH difference headers
* */
debug!("exists {}", n);
exit_on_error!(
conn,
mailbox_hash,
work_context,
thread_id,
conn.send_command(
&[
b"FETCH",
format!("{}:{}", prev_exists.len() + 1, n).as_bytes(),
format!("{}:{}", mailbox.exists.lock().unwrap().len() + 1, n).as_bytes(),
b"(UID FLAGS RFC822)",
]
.join(&b' '),
)
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
).await
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED).await
);
match protocol_parser::uid_fetch_responses(&response) {
Ok((_, v, _)) => {
@ -883,7 +765,7 @@ pub fn examine_updates(
&[(uid, &env)],
)?;
}
prev_exists.insert_new(env.hash());
mailbox.exists.lock().unwrap().insert_new(env.hash());
conn.add_refresh_event(RefreshEvent {
account_hash: uid_store.account_hash,

4
melib/src/backends/maildir/backend.rs

@ -193,11 +193,10 @@ impl MailBackend for MaildirType {
self.multicore(4, mailbox)
}
/*
fn get_async(
&mut self,
mailbox: &Mailbox,
) -> Result<core::pin::Pin<Box<dyn Future<Output = Result<Vec<Envelope>>> + Send + 'static>>>
) -> Result<core::pin::Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>>
{
let mailbox: &MaildirMailbox = &self.mailboxes[&self.owned_mailbox_idx(mailbox)];
let mailbox_hash = mailbox.hash();
@ -209,7 +208,6 @@ impl MailBackend for MaildirType {
let mailbox_index = self.mailbox_index.clone();
super::stream::MaildirStream::new(&self.name, mailbox_hash, unseen, total, path, root_path, map, mailbox_index)
}
*/
fn refresh(
&mut self,

141
melib/src/connections.rs

@ -0,0 +1,141 @@
/*
* meli - melib library
*
* Copyright 2020 Manos Pitsidianakis
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
#[derive(Debug)]
pub enum Connection {
Tcp(std::net::TcpStream),
Fd(std::os::unix::io::RawFd),
#[cfg(feature = "imap_backend")]
Tls(native_tls::TlsStream<Self>),
}
use Connection::*;
impl Connection {
pub fn set_nonblocking(&self, nonblocking: bool) -> std::io::Result<()> {
match self {
Tcp(ref t) => t.set_nonblocking(nonblocking),
#[cfg(feature = "imap_backend")]
Tls(ref t) => t.get_ref().set_nonblocking(nonblocking),
Fd(fd) => {
//FIXME TODO Review
nix::fcntl::fcntl(
*fd,
nix::fcntl::FcntlArg::F_SETFL(if nonblocking {
nix::fcntl::OFlag::O_NONBLOCK
} else {
!nix::fcntl::OFlag::O_NONBLOCK
}),
)
.map_err(|err| {
std::io::Error::from_raw_os_error(err.as_errno().map(|n| n as i32).unwrap_or(0))
})?;
Ok(())
}
}
}
pub fn set_read_timeout(&self, dur: Option<std::time::Duration>) -> std::io::Result<()> {
match self {
Tcp(ref t) => t.set_read_timeout(dur),
#[cfg(feature = "imap_backend")]
Tls(ref t) => t.get_ref().set_read_timeout(dur),
Fd(_) => Ok(()),
}
}
pub fn set_write_timeout(&self, dur: Option<std::time::Duration>) -> std::io::Result<()> {
match self {
Tcp(ref t) => t.set_write_timeout(dur),
#[cfg(feature = "imap_backend")]
Tls(ref t) => t.get_ref().set_write_timeout(dur),
Fd(_) => Ok(()),
}
}
}
impl Drop for Connection {
fn drop(&mut self) {
if let Fd(fd) = self {
let _ = nix::unistd::close(*fd);
}
}
}
impl std::io::Read for Connection {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
match self {
Tcp(ref mut t) => t.read(buf),
#[cfg(feature = "imap_backend")]
Tls(ref mut t) => t.read(buf),
Fd(f) => {
use std::os::unix::io::{FromRawFd, IntoRawFd};
let mut f = unsafe { std::fs::File::from_raw_fd(*f) };
let ret = f.read(buf);
let _ = f.into_raw_fd();
ret
}
}
}
}
impl std::io::Write for Connection {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
match self {
Tcp(ref mut t) => t.write(buf),
#[cfg(feature = "imap_backend")]
Tls(ref mut t) => t.write(buf),
Fd(f) => {
use std::os::unix::io::{FromRawFd, IntoRawFd};
let mut f = unsafe { std::fs::File::from_raw_fd(*f) };
let ret = f.write(buf);
let _ = f.into_raw_fd();
ret
}
}
}
fn flush(&mut self) -> std::io::Result<()> {
match self {
Tcp(ref mut t) => t.flush(),
#[cfg(feature = "imap_backend")]