forked from meli/meli
1
Fork 0
duesee/experiment/use_imap_codec
Damian Poddebniak 2023-05-26 22:53:08 +02:00
parent 613de70a93
commit af8fe0b6d8
5 changed files with 255 additions and 75 deletions

49
Cargo.lock generated
View File

@ -2,6 +2,15 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "abnf-core"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c44e09c43ae1c368fb91a03a566472d0087c26cf7e1b9e8e289c14ede681dd7d"
dependencies = [
"nom",
]
[[package]]
name = "adler"
version = "1.0.2"
@ -185,6 +194,12 @@ version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]]
name = "base64"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a"
[[package]]
name = "bincode"
version = "1.3.3"
@ -869,6 +884,27 @@ dependencies = [
"unicode-normalization",
]
[[package]]
name = "imap-codec"
version = "0.9.0-dev"
dependencies = [
"abnf-core",
"base64 0.21.0",
"chrono",
"imap-types",
"nom",
]
[[package]]
name = "imap-types"
version = "0.9.0-dev"
dependencies = [
"base64 0.21.0",
"chrono",
"subtle",
"thiserror",
]
[[package]]
name = "indexmap"
version = "1.9.1"
@ -1087,7 +1123,7 @@ version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24d0411d6d3cf6baacae37461dc5b0a32b9c68ae99ddef61bcd88174b8da890a"
dependencies = [
"base64",
"base64 0.13.0",
"either",
"log",
"nom",
@ -1167,7 +1203,7 @@ name = "melib"
version = "0.7.2"
dependencies = [
"async-stream",
"base64",
"base64 0.13.0",
"bincode",
"bitflags",
"data-encoding",
@ -1175,6 +1211,7 @@ dependencies = [
"encoding_rs",
"flate2",
"futures",
"imap-codec",
"indexmap",
"isahc",
"libc",
@ -1712,7 +1749,7 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0864aeff53f8c05aa08d86e5ef839d3dfcf07aeba2db32f12db0ef716e87bd55"
dependencies = [
"base64",
"base64 0.13.0",
]
[[package]]
@ -1959,6 +1996,12 @@ dependencies = [
"syn",
]
[[package]]
name = "subtle"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc"
[[package]]
name = "svg"
version = "0.10.0"

View File

@ -47,6 +47,8 @@ serde_derive = "1.0.71"
serde_json = { version = "1.0", optional = true, features = ["raw_value",] }
smallvec = { version = "^1.5.0", features = ["serde", ] }
smol = "1.0.0"
# TODO(duesee): Use latest version from crates.io.
imap-codec = { path = "../../imap-codec", optional = true, features = ["ext_enable", "ext_literal", "ext_sasl_ir", "ext_unselect", "ext_condstore_qresync"] }
unicode-segmentation = { version = "1.2.1", default-features = false, optional = true }
uuid = { version = "^1", features = ["serde", "v4", "v5"] }
@ -61,11 +63,11 @@ stderrlog = "^0.5"
default = ["unicode_algorithms", "imap_backend", "maildir_backend", "mbox_backend", "vcard", "sqlite3", "smtp", "deflate_compression"]
debug-tracing = []
deflate_compression = ["flate2", ]
deflate_compression = ["flate2", "imap-codec/ext_compress"]
gpgme = []
http = ["isahc"]
http-static = ["isahc", "isahc/static-curl"]
imap_backend = ["tls"]
imap_backend = ["tls", "imap-codec"]
jmap_backend = ["http", "serde_json"]
maildir_backend = ["notify"]
mbox_backend = ["notify"]

View File

@ -40,6 +40,7 @@ use std::{
collections::{hash_map::DefaultHasher, BTreeSet, HashMap, HashSet},
convert::TryFrom,
hash::Hasher,
num::NonZeroU32,
pin::Pin,
str::FromStr,
sync::{Arc, Mutex},
@ -47,6 +48,11 @@ use std::{
};
use futures::{lock::Mutex as FutureMutex, stream::Stream};
use imap_codec::{
command::{fetch::FetchAttribute, CommandBody, ListMailbox as ImapCodecListMailbox},
core::{AString, NonEmptyVec},
message::{Mailbox as ImapCodecMailbox, Section},
};
use crate::{
backends::{
@ -876,7 +882,7 @@ impl MailBackend for ImapType {
flag_future.await?;
let mut response = Vec::with_capacity(8 * 1024);
let mut conn = connection.lock().await;
conn.send_command("EXPUNGE".as_bytes()).await?;
conn.send_command_imap_codec(CommandBody::Expunge).await?;
conn.read_response(&mut response, RequiredResponses::empty())
.await?;
debug!("EXPUNGE response: {}", &String::from_utf8_lossy(&response));
@ -1003,6 +1009,9 @@ impl MailBackend for ImapType {
)));
}
}
let mailbox = ImapCodecMailbox::try_from(imap_path.as_str()).unwrap();
let mut response = Vec::with_capacity(8 * 1024);
{
let mut conn_lck = connection.lock().await;
@ -1011,7 +1020,9 @@ impl MailBackend for ImapType {
conn_lck.unselect().await?;
if is_subscribed {
conn_lck
.send_command(format!("UNSUBSCRIBE \"{}\"", &imap_path).as_bytes())
.send_command_imap_codec(CommandBody::Unsubscribe {
mailbox: mailbox.clone(),
})
.await?;
conn_lck
.read_response(&mut response, RequiredResponses::empty())
@ -1019,7 +1030,7 @@ impl MailBackend for ImapType {
}
conn_lck
.send_command(debug!(format!("DELETE \"{}\"", &imap_path,)).as_bytes())
.send_command_imap_codec(debug!(CommandBody::Delete { mailbox }))
.await?;
conn_lck
.read_response(&mut response, RequiredResponses::empty())
@ -1048,23 +1059,24 @@ impl MailBackend for ImapType {
let uid_store = self.uid_store.clone();
let connection = self.connection.clone();
Ok(Box::pin(async move {
let command: String;
{
let mailboxes = uid_store.mailboxes.lock().await;
if mailboxes[&mailbox_hash].is_subscribed() == new_val {
return Ok(());
}
command = format!("SUBSCRIBE \"{}\"", mailboxes[&mailbox_hash].imap_path());
let mailboxes = uid_store.mailboxes.lock().await;
if mailboxes[&mailbox_hash].is_subscribed() == new_val {
return Ok(());
}
let mut response = Vec::with_capacity(8 * 1024);
{
let mailbox =
ImapCodecMailbox::try_from(mailboxes[&mailbox_hash].imap_path()).unwrap();
let mut conn_lck = connection.lock().await;
if new_val {
conn_lck.send_command(command.as_bytes()).await?;
conn_lck
.send_command_imap_codec(CommandBody::Subscribe { mailbox })
.await?;
} else {
conn_lck
.send_command(format!("UN{}", command).as_bytes())
.send_command_imap_codec(CommandBody::Unsubscribe { mailbox })
.await?;
}
conn_lck
@ -1395,7 +1407,7 @@ impl ImapType {
let mut res = Vec::with_capacity(8 * 1024);
futures::executor::block_on(timeout(
self.server_conf.timeout,
conn.send_command(b"NOOP"),
conn.send_command_imap_codec(CommandBody::Noop),
))
.unwrap()
.unwrap();
@ -1458,6 +1470,7 @@ impl ImapType {
.iter()
.any(|cap| cap.eq_ignore_ascii_case(b"LIST-STATUS"));
if has_list_status {
// TODO(imap-codec): LIST-STATUS not supported.
conn.send_command(b"LIST \"\" \"*\" RETURN (STATUS (MESSAGES UNSEEN))")
.await?;
conn.read_response(
@ -1466,7 +1479,11 @@ impl ImapType {
)
.await?;
} else {
conn.send_command(b"LIST \"\" \"*\"").await?;
conn.send_command_imap_codec(CommandBody::List {
reference: ImapCodecMailbox::try_from("").unwrap(),
mailbox_wildcard: ImapCodecListMailbox::try_from("*").unwrap(),
})
.await?;
conn.read_response(&mut res, RequiredResponses::LIST_REQUIRED)
.await?;
}
@ -1806,20 +1823,40 @@ async fn fetch_hlpr(state: &mut FetchState) -> Result<Vec<Envelope>> {
.await?;
if max_uid_left > 0 {
debug!("{} max_uid_left= {}", mailbox_hash, max_uid_left);
let attributes = vec![
FetchAttribute::Uid,
FetchAttribute::Flags,
FetchAttribute::Envelope,
FetchAttribute::BodyExt {
section: Some(Section::HeaderFields(
None,
NonEmptyVec::from(AString::try_from("REFERENCES").unwrap()),
)),
partial: None,
peek: true,
},
FetchAttribute::BodyStructure,
];
let command = if max_uid_left == 1 {
"UID FETCH 1 (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS (REFERENCES)] \
BODYSTRUCTURE)"
.to_string()
CommandBody::fetch(1, attributes, true).unwrap()
} else {
format!(
"UID FETCH {}:{} (UID FLAGS ENVELOPE BODY.PEEK[HEADER.FIELDS \
(REFERENCES)] BODYSTRUCTURE)",
std::cmp::max(max_uid_left.saturating_sub(chunk_size), 1),
max_uid_left
let from = NonZeroU32::try_from(
u32::try_from(std::cmp::max(
max_uid_left.saturating_sub(chunk_size),
1,
))
.unwrap(),
)
.unwrap();
let to =
NonZeroU32::try_from(u32::try_from(max_uid_left).unwrap()).unwrap();
CommandBody::fetch(format!("{}:{}", from, to), attributes, true).unwrap()
};
debug!("sending {:?}", &command);
conn.send_command(command.as_bytes()).await?;
conn.send_command_imap_codec(command).await?;
conn.read_response(&mut response, RequiredResponses::FETCH_REQUIRED)
.await
.chain_err_summary(|| {

View File

@ -33,12 +33,24 @@ use std::{
convert::TryFrom,
future::Future,
iter::FromIterator,
num::NonZeroU32,
pin::Pin,
sync::Arc,
time::{Duration, Instant, SystemTime},
};
use futures::io::{AsyncReadExt, AsyncWriteExt};
use imap_codec::{
codec::{Action, Encode},
command::{
search::SearchKey, status::StatusAttribute, Command, CommandBody, SeqOrUid, Sequence,
SequenceSet,
},
core::{AString, NonEmptyVec},
extensions::{compress::CompressionAlgorithm, enable::CapabilityEnable},
message::{AuthMechanism, Mailbox, Tag},
secret::Secret,
};
use native_tls::TlsConnector;
pub use smol::Async as AsyncWrapper;
@ -272,23 +284,19 @@ impl ImapStream {
timeout: server_conf.timeout,
};
if let ImapProtocol::ManageSieve = server_conf.protocol {
use data_encoding::BASE64;
ret.read_response(&mut res).await?;
ret.send_command(
format!(
"AUTHENTICATE \"PLAIN\" \"{}\"",
BASE64.encode(
format!(
"\0{}\0{}",
&server_conf.server_username, &server_conf.server_password
)
.as_bytes()
)
)
.as_bytes(),
)
let credentials = format!(
"\0{}\0{}",
&server_conf.server_username, &server_conf.server_password
);
ret.send_command_imap_codec(CommandBody::authenticate(
AuthMechanism::Plain,
Some(credentials.as_bytes()),
))
.await?;
ret.read_response(&mut res).await?;
return Ok((Default::default(), ret));
}
@ -298,7 +306,7 @@ impl ImapStream {
message: "Negotiating server capabilities.".into(),
},
);
ret.send_command(b"CAPABILITY").await?;
ret.send_command_imap_codec(CommandBody::Capability).await?;
ret.read_response(&mut res).await?;
let capabilities: std::result::Result<Vec<&[u8]>, _> = res
.split_rn()
@ -370,24 +378,14 @@ impl ImapStream {
.await?;
}
_ => {
ret.send_command(
format!(
r#"LOGIN "{}" {{{}}}"#,
&server_conf
.server_username
.replace('\\', r#"\\"#)
.replace('"', r#"\""#)
.replace('{', r#"\{"#)
.replace('}', r#"\}"#),
&server_conf.server_password.as_bytes().len()
)
.as_bytes(),
)
let username = AString::try_from(server_conf.server_username.as_str())?;
let password = AString::try_from(server_conf.server_password.as_str())?;
ret.send_command_imap_codec(CommandBody::Login {
username,
password: Secret::new(password),
})
.await?;
// wait for "+ Ready for literal data" reply
ret.wait_for_continuation_request().await?;
ret.send_literal(server_conf.server_password.as_bytes())
.await?;
}
}
let tag_start = format!("M{} ", (ret.cmd_id - 1));
@ -425,7 +423,7 @@ impl ImapStream {
/* sending CAPABILITY after LOGIN automatically is an RFC recommendation, so
* check for lazy servers */
drop(capabilities);
ret.send_command(b"CAPABILITY").await?;
ret.send_command_imap_codec(CommandBody::Capability).await?;
ret.read_response(&mut res).await.unwrap();
let capabilities = protocol_parser::capabilities(&res)?.1;
let capabilities = HashSet::from_iter(capabilities.into_iter().map(|s| s.to_vec()));
@ -500,6 +498,55 @@ impl ImapStream {
Ok(())
}
// TODO(duesee): Rename after `send_command` was removed.
pub async fn send_command_imap_codec(&mut self, command_body: CommandBody<'_>) -> Result<()> {
let command = Command {
// We know that this tag is valid.
tag: Tag::unchecked(format!("M{}", self.cmd_id.to_string())),
body: command_body,
};
_ = timeout(
self.timeout,
try_await(async move {
match self.protocol {
ImapProtocol::IMAP { .. } => {
self.cmd_id += 1;
}
ImapProtocol::ManageSieve => {}
}
for action in command.encode() {
match action {
Action::Send { data } => {
self.stream.write_all(&data).await?;
}
Action::RecvContinuationRequest => {
self.wait_for_continuation_request().await?;
}
Action::Unknown => {
return Err("Unexpected message flow".into());
}
}
}
match self.protocol {
ImapProtocol::IMAP { .. } => {
// We do not need to worry about logging sensitive values here because
// imap-codec redacts it. The only way to obtain sensitive data (without
// calling `expose_secret`) is through `Encode::encode{,_detached}`.
debug!("sent: {:?}", command);
}
ImapProtocol::ManageSieve => {}
}
Ok(())
}),
)
.await?;
Ok(())
}
// TODO(duesee): Replace this with `send_command_imap_codec`.
pub async fn send_command(&mut self, command: &[u8]) -> Result<()> {
_ = timeout(
self.timeout,
@ -589,7 +636,7 @@ impl ImapConnection {
if self.stream.is_ok() {
let mut ret = Vec::new();
if let Err(err) = try_await(async {
self.send_command(b"NOOP").await?;
self.send_command_imap_codec(CommandBody::Noop).await?;
self.read_response(&mut ret, RequiredResponses::empty())
.await
})
@ -630,14 +677,26 @@ impl ImapConnection {
/* Upgrade to Condstore */
let mut ret = Vec::new();
if capabilities.contains(&b"ENABLE"[..]) {
self.send_command(b"ENABLE CONDSTORE").await?;
self.send_command_imap_codec(CommandBody::Enable {
capabilities: NonEmptyVec::from(
CapabilityEnable::CondStore,
),
})
.await?;
self.read_response(&mut ret, RequiredResponses::empty())
.await?;
} else {
self.send_command(
b"STATUS INBOX (UIDNEXT UIDVALIDITY UNSEEN MESSAGES HIGHESTMODSEQ)",
)
.await?;
self.send_command_imap_codec(CommandBody::Status {
mailbox: Mailbox::Inbox,
attributes: vec![
StatusAttribute::UidNext,
StatusAttribute::UidValidity,
StatusAttribute::Unseen,
StatusAttribute::Messages,
StatusAttribute::HighestModSeq,
],
})
.await?;
self.read_response(&mut ret, RequiredResponses::empty())
.await?;
}
@ -648,7 +707,10 @@ impl ImapConnection {
#[cfg(feature = "deflate_compression")]
if capabilities.contains(&b"COMPRESS=DEFLATE"[..]) && deflate {
let mut ret = Vec::new();
self.send_command(b"COMPRESS DEFLATE").await?;
self.send_command_imap_codec(CommandBody::compress(
CompressionAlgorithm::Deflate,
))
.await?;
self.read_response(&mut ret, RequiredResponses::empty())
.await?;
match ImapResponse::try_from(ret.as_slice())? {
@ -798,6 +860,21 @@ impl ImapConnection {
Ok(())
}
pub async fn send_command_imap_codec(&mut self, command: CommandBody<'_>) -> Result<()> {
if let Err(err) =
try_await(async { self.stream.as_mut()?.send_command_imap_codec(command).await }).await
{
self.stream = Err(err.clone());
if err.kind.is_network() {
self.connect().await?;
}
Err(err)
} else {
*self.uid_store.is_online.lock().unwrap() = (SystemTime::now(), Ok(()));
Ok(())
}
}
pub async fn send_command(&mut self, command: &[u8]) -> Result<()> {
if let Err(err) =
try_await(async { self.stream.as_mut()?.send_command(command).await }).await
@ -863,7 +940,7 @@ impl ImapConnection {
))
.set_kind(crate::error::ErrorKind::Bug));
}
self.send_command(format!("SELECT \"{}\"", imap_path).as_bytes())
self.send_command_imap_codec(CommandBody::select(imap_path.as_str()).unwrap())
.await?;
self.read_response(ret, RequiredResponses::SELECT_REQUIRED)
.await?;
@ -949,7 +1026,7 @@ impl ImapConnection {
))
.set_kind(crate::error::ErrorKind::Bug));
}
self.send_command(format!("EXAMINE \"{}\"", &imap_path).as_bytes())
self.send_command_imap_codec(CommandBody::examine(imap_path.as_str()).unwrap())
.await?;
self.read_response(ret, RequiredResponses::EXAMINE_REQUIRED)
.await?;
@ -985,7 +1062,7 @@ impl ImapConnection {
.iter()
.any(|cap| cap.eq_ignore_ascii_case(b"UNSELECT"))
{
self.send_command(b"UNSELECT").await?;
self.send_command_imap_codec(CommandBody::Unselect).await?;
self.read_response(&mut response, RequiredResponses::empty())
.await?;
} else {
@ -1000,7 +1077,7 @@ impl ImapConnection {
nonexistent.push('p');
}
}
self.send_command(format!("SELECT \"{}\"", nonexistent).as_bytes())
self.send_command_imap_codec(CommandBody::select(nonexistent).unwrap())
.await?;
self.read_response(&mut response, RequiredResponses::NO_REQUIRED)
.await?;
@ -1026,8 +1103,15 @@ impl ImapConnection {
) -> Result<()> {
debug_assert!(low > 0);
let mut response = Vec::new();
self.send_command(format!("UID SEARCH {}:*", low).as_bytes())
.await?;
self.send_command_imap_codec(CommandBody::search(
None,
SearchKey::Uid(SequenceSet::from(Sequence::Range(
SeqOrUid::Value(NonZeroU32::try_from(u32::try_from(low).unwrap()).unwrap()),
SeqOrUid::Asterisk,
))),
true,
))
.await?;
self.read_response(&mut response, RequiredResponses::SEARCH)
.await?;
let mut msn_index_lck = self.uid_store.msn_index.lock().unwrap();

View File

@ -704,3 +704,17 @@ impl<'a> From<&'a Error> for Error {
kind.clone()
}
}
// ----- imap-codec -----
impl From<imap_codec::core::LiteralError> for Error {
#[inline]
fn from(error: imap_codec::core::LiteralError) -> Error {
Error{
summary: error.to_string().into(),
details: None,
source: Some(Arc::new(error)),
kind: ErrorKind::Configuration,
}
}
}