Browse Source

mbox: Add different readers for mbox{o,rd,cl,cl2}

master
parent
commit
674073899d
WARNING! Although there is a key with this ID in the database it does not verify this commit! This commit is SUSPICIOUS. GPG Key ID: 73627C2F690DF710
4 changed files with 507 additions and 91 deletions
  1. +21
    -0
      meli.conf.5
  2. +412
    -91
      melib/src/backends/mbox.rs
  3. +4
    -0
      testing/Cargo.toml
  4. +70
    -0
      testing/src/mboxparse.rs

+ 21
- 0
meli.conf.5 View File

@@ -236,6 +236,27 @@ example:
.\" default value
.Pq Em false
.El
.Sh mbox only
mbox specific options are:
.Bl -tag -width 36n
.It Ic prefer_mbox_type Ar String
(optional) Prefer a specific mbox format reader for each message. The default value, auto, selects the mboxcl2 reader. If the preferred format fails, the message is retried with mboxrd, and if that fails as well a recovery attempt is made, which discards the invalid message.
Valid values are:
.Bl -bullet -compact
.It
.Ar auto
.It
.Ar mboxo
.It
.Ar mboxrd
.It
.Ar mboxcl
.It
.Ar mboxcl2
.El
.\" default value
.Pq Em auto
.El
.Sh mailboxes
.Bl -tag -width 36n
.It Ic alias Ar String


+ 412
- 91
melib/src/backends/mbox.rs View File

@@ -38,7 +38,11 @@ use crate::get_path_hash;
use crate::shellexpand::ShellExpandTrait;
use libc;
use memmap::{Mmap, Protection};
use nom::bytes::complete::tag;
use nom::character::complete::digit1;
use nom::combinator::map_res;
use nom::{self, error::ErrorKind, IResult};

extern crate notify;
use self::notify::{watcher, DebouncedEvent, RecursiveMode, Watcher};
use std::collections::hash_map::{DefaultHasher, HashMap};
@@ -48,6 +52,7 @@ use std::io::BufReader;
use std::io::Read;
use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex, RwLock};

@@ -269,118 +274,383 @@ impl BackendOp for MboxOp {
}
}

pub fn mbox_parse(
index: Arc<Mutex<HashMap<EnvelopeHash, (Offset, Length)>>>,
input: &[u8],
file_offset: usize,
) -> IResult<&[u8], Vec<Envelope>> {
if input.is_empty() {
return Err(nom::Err::Error((input, ErrorKind::Tag)));
/// The mbox variant whose reader is used to extract messages from a mailbox.
///
/// Chosen through the `prefer_mbox_type` account setting; when that setting is
/// `auto` no reader is stored and `mbox_parse` falls back to `MboxCl2`.
#[derive(Debug, Clone, Copy)]
pub enum MboxReader {
/// Selected by the configuration value `mboxo`.
MboxO,
/// Selected by the configuration value `mboxrd`.
MboxRd,
/// Selected by the configuration value `mboxcl`.
MboxCl,
/// Selected by the configuration value `mboxcl2`; also the `Default` reader.
MboxCl2,
}

impl Default for MboxReader {
fn default() -> Self {
Self::MboxCl2
}
let mut input = input;
let mut offset = 0;
let mut index = index.lock().unwrap();
let mut envelopes = Vec::with_capacity(32);
while !input.is_empty() {
let next_offset: Option<(usize, usize)> = input
.find(b"\n\nFrom ")
.and_then(|end| input.find(b"\n").and_then(|start| Some((start + 1, end))));

if let Some((start, len)) = next_offset {
match Envelope::from_bytes(&input[start..len], None) {
Ok(mut env) => {
let mut flags = Flag::empty();
if env.other_headers().contains_key("Status") {
if env.other_headers()["Status"].contains("F") {
flags.set(Flag::FLAGGED, true);
}
if env.other_headers()["Status"].contains("A") {
flags.set(Flag::REPLIED, true);
}
if env.other_headers()["Status"].contains("R") {
flags.set(Flag::SEEN, true);
}
if env.other_headers()["Status"].contains("D") {
flags.set(Flag::TRASHED, true);
}
}

macro_rules! find_From__line {
($input:expr) => {{
//debug!("find_From__line invocation");
let input = $input;
let mut ptr = 0;
let mut found = None;
while ptr < input.len() {
// Find next From_ candidate line.
const TAG: &'static [u8] = b"\n\nFrom ";
if let Some(end) = input[ptr..].find(TAG) {
// This candidate is a valid From_ if it ends in a new line and the next line is
// a header.
if let Some(line_end) = input[ptr + end + TAG.len()..].find(b"\n") {
if crate::email::parser::headers::header(
&input[ptr + end + TAG.len() + line_end + 1..],
)
.is_ok()
{
found = Some(ptr + end);
break;
} else {
/* Ignore invalid From_ line. */
ptr += end + TAG.len() + line_end;
}
if env.other_headers().contains_key("X-Status") {
if env.other_headers()["X-Status"].contains("F") {
flags.set(Flag::FLAGGED, true);
}
if env.other_headers()["X-Status"].contains("A") {
flags.set(Flag::REPLIED, true);
} else {
/* Ignore invalid From_ line. */
ptr += end + TAG.len();
}
} else {
found = Some(input.len());
break;
}
}
found
}};
}

impl MboxReader {
fn parse<'i>(&self, input: &'i [u8]) -> IResult<&'i [u8], Envelope> {
let orig_input = input;
let mut input = input;
match self {
Self::MboxO => {
let next_offset: Option<(usize, usize)> = find_From__line!(input)
.and_then(|end| input.find(b"\n").and_then(|start| Some((start + 1, end))));

if let Some((start, len)) = next_offset {
match Envelope::from_bytes(&input[start..len], None) {
Ok(mut env) => {
let mut flags = Flag::empty();
if env.other_headers().contains_key("Status") {
if env.other_headers()["Status"].contains("F") {
flags.set(Flag::FLAGGED, true);
}
if env.other_headers()["Status"].contains("A") {
flags.set(Flag::REPLIED, true);
}
if env.other_headers()["Status"].contains("R") {
flags.set(Flag::SEEN, true);
}
if env.other_headers()["Status"].contains("D") {
flags.set(Flag::TRASHED, true);
}
}
if env.other_headers().contains_key("X-Status") {
if env.other_headers()["X-Status"].contains("F") {
flags.set(Flag::FLAGGED, true);
}
if env.other_headers()["X-Status"].contains("A") {
flags.set(Flag::REPLIED, true);
}
if env.other_headers()["X-Status"].contains("R") {
flags.set(Flag::SEEN, true);
}
if env.other_headers()["X-Status"].contains("D") {
flags.set(Flag::TRASHED, true);
}
if env.other_headers()["X-Status"].contains("T") {
flags.set(Flag::DRAFT, true);
}
}
env.set_flags(flags);
if len == input.len() {
Ok((&[], env))
} else {
input = &input[len + 2..];
Ok((input, env))
}
}
if env.other_headers()["X-Status"].contains("R") {
flags.set(Flag::SEEN, true);
Err(err) => {
debug!("Could not parse mail {:?}", err);
Err(nom::Err::Error((input, ErrorKind::Tag)))
}
if env.other_headers()["X-Status"].contains("D") {
flags.set(Flag::TRASHED, true);
}
} else {
let start: Offset = input.find(b"\n").map(|v| v + 1).unwrap_or(0);
match Envelope::from_bytes(&input[start..], None) {
Ok(mut env) => {
let mut flags = Flag::empty();
if env.other_headers().contains_key("Status") {
if env.other_headers()["Status"].contains("F") {
flags.set(Flag::FLAGGED, true);
}
if env.other_headers()["Status"].contains("A") {
flags.set(Flag::REPLIED, true);
}
if env.other_headers()["Status"].contains("R") {
flags.set(Flag::SEEN, true);
}
if env.other_headers()["Status"].contains("D") {
flags.set(Flag::TRASHED, true);
}
}
if env.other_headers().contains_key("X-Status") {
if env.other_headers()["X-Status"].contains("F") {
flags.set(Flag::FLAGGED, true);
}
if env.other_headers()["X-Status"].contains("A") {
flags.set(Flag::REPLIED, true);
}
if env.other_headers()["X-Status"].contains("R") {
flags.set(Flag::SEEN, true);
}
if env.other_headers()["X-Status"].contains("D") {
flags.set(Flag::TRASHED, true);
}
if env.other_headers()["X-Status"].contains("T") {
flags.set(Flag::DRAFT, true);
}
}
env.set_flags(flags);
Ok((&[], env))
}
if env.other_headers()["X-Status"].contains("T") {
flags.set(Flag::DRAFT, true);
Err(err) => {
debug!("Could not parse mail at {:?}", err);
Err(nom::Err::Error((input, ErrorKind::Tag)))
}
}
env.set_flags(flags);
index.insert(env.hash(), (offset + file_offset + start, len - start));
envelopes.push(env);
}
Err(_) => {
debug!("Could not parse mail at byte offset {}", offset);
}
}
offset += len + 2;
input = &input[len + 2..];
} else {
let start: Offset = input.find(b"\n").map(|v| v + 1).unwrap_or(0);
match Envelope::from_bytes(&input[start..], None) {
Ok(mut env) => {
let mut flags = Flag::empty();
if env.other_headers().contains_key("Status") {
if env.other_headers()["Status"].contains("F") {
flags.set(Flag::FLAGGED, true);
}
if env.other_headers()["Status"].contains("A") {
flags.set(Flag::REPLIED, true);
}
if env.other_headers()["Status"].contains("R") {
flags.set(Flag::SEEN, true);
Self::MboxRd => {
let next_offset: Option<(usize, usize)> = find_From__line!(input)
.and_then(|end| input.find(b"\n").and_then(|start| Some((start + 1, end))));

if let Some((start, len)) = next_offset {
match Envelope::from_bytes(&input[start..len], None) {
Ok(mut env) => {
let mut flags = Flag::empty();
if env.other_headers().contains_key("Status") {
if env.other_headers()["Status"].contains("F") {
flags.set(Flag::FLAGGED, true);
}
if env.other_headers()["Status"].contains("A") {
flags.set(Flag::REPLIED, true);
}
if env.other_headers()["Status"].contains("R") {
flags.set(Flag::SEEN, true);
}
if env.other_headers()["Status"].contains("D") {
flags.set(Flag::TRASHED, true);
}
}
if env.other_headers().contains_key("X-Status") {
if env.other_headers()["X-Status"].contains("F") {
flags.set(Flag::FLAGGED, true);
}
if env.other_headers()["X-Status"].contains("A") {
flags.set(Flag::REPLIED, true);
}
if env.other_headers()["X-Status"].contains("R") {
flags.set(Flag::SEEN, true);
}
if env.other_headers()["X-Status"].contains("D") {
flags.set(Flag::TRASHED, true);
}
if env.other_headers()["X-Status"].contains("T") {
flags.set(Flag::DRAFT, true);
}
}
env.set_flags(flags);
if len == input.len() {
Ok((&[], env))
} else {
input = &input[len + 2..];
Ok((input, env))
}
}
if env.other_headers()["Status"].contains("D") {
flags.set(Flag::TRASHED, true);
Err(err) => {
debug!("Could not parse mail {:?}", err);
Err(nom::Err::Error((input, ErrorKind::Tag)))
}
}
if env.other_headers().contains_key("X-Status") {
if env.other_headers()["X-Status"].contains("F") {
flags.set(Flag::FLAGGED, true);
} else {
let start: Offset = input.find(b"\n").map(|v| v + 1).unwrap_or(0);
match Envelope::from_bytes(&input[start..], None) {
Ok(mut env) => {
let mut flags = Flag::empty();
if env.other_headers().contains_key("Status") {
if env.other_headers()["Status"].contains("F") {
flags.set(Flag::FLAGGED, true);
}
if env.other_headers()["Status"].contains("A") {
flags.set(Flag::REPLIED, true);
}
if env.other_headers()["Status"].contains("R") {
flags.set(Flag::SEEN, true);
}
if env.other_headers()["Status"].contains("D") {
flags.set(Flag::TRASHED, true);
}
}
if env.other_headers().contains_key("X-Status") {
if env.other_headers()["X-Status"].contains("F") {
flags.set(Flag::FLAGGED, true);
}
if env.other_headers()["X-Status"].contains("A") {
flags.set(Flag::REPLIED, true);
}
if env.other_headers()["X-Status"].contains("R") {
flags.set(Flag::SEEN, true);
}
if env.other_headers()["X-Status"].contains("D") {
flags.set(Flag::TRASHED, true);
}
if env.other_headers()["X-Status"].contains("T") {
flags.set(Flag::DRAFT, true);
}
}
env.set_flags(flags);
Ok((&[], env))
}
if env.other_headers()["X-Status"].contains("A") {
flags.set(Flag::REPLIED, true);
Err(err) => {
debug!("Could not parse mail {:?}", err);
Err(nom::Err::Error((input, ErrorKind::Tag)))
}
if env.other_headers()["X-Status"].contains("R") {
flags.set(Flag::SEEN, true);
}
}
}
Self::MboxCl | Self::MboxCl2 => {
let start: Offset = input.find(b"\n").map(|v| v + 1).unwrap_or(0);
input = &input[start..];
let headers_end: usize = input.find(b"\n\n").unwrap_or(input.len());
let content_length = if let Some(v) = input[..headers_end].find(b"Content-Length: ")
{
v
} else {
// Is not MboxCl{,2}
return Self::MboxRd.parse(orig_input);
};
let (_input, _) = if let Ok(s) = tag::<_, &[u8], (&[u8], nom::error::ErrorKind)>(
"Content-Length:",
)(&input[content_length..])
{
s
} else {
return Self::MboxRd.parse(orig_input);
};
let (_input, bytes) = if let Ok(s) =
map_res::<&[u8], _, _, (&[u8], nom::error::ErrorKind), _, _, _>(
digit1,
|s: &[u8]| String::from_utf8_lossy(s).parse::<usize>(),
)(_input.ltrim())
{
s
} else {
return Self::MboxRd.parse(orig_input);
};

match Envelope::from_bytes(&input[..headers_end + bytes], None) {
Ok(mut env) => {
let mut flags = Flag::empty();
if env.other_headers().contains_key("Status") {
if env.other_headers()["Status"].contains("F") {
flags.set(Flag::FLAGGED, true);
}
if env.other_headers()["Status"].contains("A") {
flags.set(Flag::REPLIED, true);
}
if env.other_headers()["Status"].contains("R") {
flags.set(Flag::SEEN, true);
}
if env.other_headers()["Status"].contains("D") {
flags.set(Flag::TRASHED, true);
}
}
if env.other_headers()["X-Status"].contains("D") {
flags.set(Flag::TRASHED, true);
if env.other_headers().contains_key("X-Status") {
if env.other_headers()["X-Status"].contains("F") {
flags.set(Flag::FLAGGED, true);
}
if env.other_headers()["X-Status"].contains("A") {
flags.set(Flag::REPLIED, true);
}
if env.other_headers()["X-Status"].contains("R") {
flags.set(Flag::SEEN, true);
}
if env.other_headers()["X-Status"].contains("D") {
flags.set(Flag::TRASHED, true);
}
if env.other_headers()["X-Status"].contains("T") {
flags.set(Flag::DRAFT, true);
}
}
if env.other_headers()["X-Status"].contains("T") {
flags.set(Flag::DRAFT, true);
env.set_flags(flags);
if headers_end + 2 + bytes >= input.len() {
Ok((&[], env))
} else {
input = &input[headers_end + 3 + bytes..];
Ok((input, env))
}
}
env.set_flags(flags);
index.insert(
env.hash(),
(offset + file_offset + start, input.len() - start),
);
envelopes.push(env);
}
Err(_) => {
debug!("Could not parse mail at byte offset {}", offset);
Err(_err) => {
return Self::MboxRd.parse(orig_input);
}
}
}
break;
}
}
}

pub fn mbox_parse(
index: Arc<Mutex<HashMap<EnvelopeHash, (Offset, Length)>>>,
input: &[u8],
file_offset: usize,
reader: Option<MboxReader>,
) -> IResult<&[u8], Vec<Envelope>> {
if input.is_empty() {
return Err(nom::Err::Error((input, ErrorKind::Tag)));
}
let mut offset = 0;
let mut index = index.lock().unwrap();
let mut envelopes = Vec::with_capacity(32);

let reader = reader.unwrap_or(MboxReader::MboxCl2);
while !input[offset + file_offset..].is_empty() {
let (next_input, env) = match reader.parse(&input[offset + file_offset..]) {
Ok(v) => v,
Err(e) => {
// Try to recover from this error by finding a new candidate From_ line
if let Some(next_offset) = find_From__line!(&input[offset + file_offset..]) {
offset += next_offset;
if offset != input.len() {
// If we are not at EOF, we will be at this point
// "\n\nFrom ..."
// ↑
// So, skip those two newlines.
offset += 2;
}
} else {
Err(e)?;
}
continue;
}
};
let start: Offset = input[offset + file_offset..]
.find(b"\n")
.map(|v| v + 1)
.unwrap_or(0);
let len = input.len() - next_input.len() - offset - file_offset - start;
index.insert(env.hash(), (offset + file_offset + start, len));
offset += len + start;

envelopes.push(env);
}
return Ok((&[], envelopes));
}

@@ -391,12 +661,14 @@ pub struct MboxType {
path: PathBuf,
index: Arc<Mutex<HashMap<EnvelopeHash, (Offset, Length)>>>,
mailboxes: Arc<Mutex<HashMap<MailboxHash, MboxMailbox>>>,
prefer_mbox_type: Option<MboxReader>,
}

impl MailBackend for MboxType {
fn is_online(&self) -> Result<()> {
Ok(())
}

fn get(&mut self, mailbox: &Mailbox) -> Async<Result<Vec<Envelope>>> {
let mut w = AsyncBuilder::new();
let handle = {
@@ -405,6 +677,7 @@ impl MailBackend for MboxType {
let mailbox_path = mailbox.path().to_string();
let mailbox_hash = mailbox.hash();
let mailboxes = self.mailboxes.clone();
let prefer_mbox_type = self.prefer_mbox_type.clone();
let closure = move |_work_context| {
let tx = tx.clone();
let index = index.clone();
@@ -429,7 +702,7 @@ impl MailBackend for MboxType {
return;
};

let payload = mbox_parse(index, contents.as_slice(), 0)
let payload = mbox_parse(index, contents.as_slice(), 0, prefer_mbox_type)
.map_err(|e| MeliError::from(e))
.map(|(_, v)| v);
{
@@ -470,6 +743,7 @@ impl MailBackend for MboxType {
};
let index = self.index.clone();
let mailboxes = self.mailboxes.clone();
let prefer_mbox_type = self.prefer_mbox_type.clone();
let handle = std::thread::Builder::new()
.name(format!("watching {}", self.account_name,))
.spawn(move || {
@@ -520,6 +794,7 @@ impl MailBackend for MboxType {
index.clone(),
&contents[mailbox_lock[&mailbox_hash].content.len()..],
mailbox_lock[&mailbox_hash].content.len(),
prefer_mbox_type,
) {
for env in envelopes {
sender.send(RefreshEvent {
@@ -621,6 +896,34 @@ impl MailBackend for MboxType {
}
}

/// Reads a value from an account's `extra` settings.
///
/// * `get_conf_val!(s["key"])` yields `Ok` with the stored value, or a
///   `MeliError` stating that the mbox backend requires the field.
/// * `get_conf_val!(s["key"], default)` yields `Ok(default)` when the key is
///   absent; otherwise the stored value is converted with `FromStr::from_str`
///   and a conversion failure is reported as a `MeliError`.
macro_rules! get_conf_val {
    ($s:ident[$var:literal]) => {
        match $s.extra.get($var) {
            Some(found) => Ok(found),
            None => Err(MeliError::new(format!(
                "Configuration error ({}): mbox backend requires the field `{}` set",
                $s.name.as_str(),
                $var
            ))),
        }
    };
    ($s:ident[$var:literal], $default:expr) => {
        match $s.extra.get($var) {
            // Key missing: fall back to the caller-supplied default.
            None => Ok($default),
            // Key present: the target type is inferred from the call site.
            Some(raw) => <_>::from_str(raw).map_err(|reason| {
                MeliError::new(format!(
                    "Configuration error ({}): Invalid value for field `{}`: {}\n{}",
                    $s.name.as_str(),
                    $var,
                    raw,
                    reason
                ))
            }),
        }
    };
}

impl MboxType {
pub fn new(
s: &AccountSettings,
@@ -634,9 +937,24 @@ impl MboxType {
s.name()
)));
}
let prefer_mbox_type: String = get_conf_val!(s["prefer_mbox_type"], "auto".to_string())?;
let ret = MboxType {
account_name: s.name().to_string(),
path,
prefer_mbox_type: match prefer_mbox_type.as_str() {
"auto" => None,
"mboxo" => Some(MboxReader::MboxO),
"mboxrd" => Some(MboxReader::MboxRd),
"mboxcl" => Some(MboxReader::MboxCl),
"mboxcl2" => Some(MboxReader::MboxCl2),
_ => {
return Err(MeliError::new(format!(
"{} invalid `prefer_mbox_type` value: `{}`",
s.name(),
prefer_mbox_type,
)))
}
},
..Default::default()
};
let name: String = ret
@@ -720,6 +1038,9 @@ impl MboxType {
s.name()
)));
}
let prefer_mbox_type: Result<String> =
get_conf_val!(s["prefer_mbox_type"], "auto".to_string());
prefer_mbox_type?;
Ok(())
}
}

+ 4
- 0
testing/Cargo.toml View File

@@ -10,6 +10,10 @@ name = "emailparse"
path = "src/email_parse.rs"

[[bin]]
name = "mboxparse"
path = "src/mboxparse.rs"

[[bin]]
name = "imapconn"
path = "src/imap_conn.rs"



+ 70
- 0
testing/src/mboxparse.rs View File

@@ -0,0 +1,70 @@
/*
* meli - mboxparse.rs
*
* Copyright 2020 Manos Pitsidianakis
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/

extern crate melib;
use melib::Result;
use melib::*;

/// Parses every mbox file given on the command line and prints how many
/// `Envelope`s were parsed from it.
///
/// Paths that are not regular files, files that cannot be read and parse
/// failures are reported and the next argument is processed.
///
/// # Example invocation
/// ```sh
/// ./mboxparse /path/to/mbox
/// ```
fn main() -> Result<()> {
    if std::env::args().len() == 1 {
        eprintln!("Usage: ./mboxparse /path/to/mbox");
        std::process::exit(1);
    }

    for i in std::env::args().skip(1) {
        println!("Path is {}", i);
        let filename = std::path::PathBuf::from(&i);

        // `is_file()` is already false for paths that do not exist.
        if !filename.is_file() {
            println!("{} is not a valid file.", i);
            continue;
        }

        // `mbox_parse` takes `&[u8]`, so read raw bytes: `read_to_string`
        // would reject any mailbox that contains non-UTF-8 bytes.
        let buffer = match std::fs::read(&filename) {
            Ok(bytes) => bytes,
            Err(err) => {
                eprintln!("Something went wrong reading the file {}: {}", i, err);
                continue;
            }
        };

        match melib::backends::mbox::mbox_parse(Default::default(), &buffer, 0, None) {
            Ok((_, v)) => {
                println!("{} envelopes parsed", v.len());
            }
            Err(melib::nom::Err::Error(err)) => {
                // The unparsed remainder is arbitrary bytes; decode it lossily
                // instead of assuming it is valid UTF-8.
                println!(
                    "Error in parsing {:?}",
                    String::from_utf8_lossy(err.0)
                        .chars()
                        .take(150)
                        .collect::<String>()
                );
            }
            Err(err) => {
                println!("Error in parsing {:?}", err);
            }
        }
    }
    Ok(())
}

Loading…
Cancel
Save