mbox: send envelope payload in chunks

master
Manos Pitsidianakis 2020-07-16 17:59:27 +03:00
parent 15b15854bf
commit d3391e96c0
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
1 changed files with 92 additions and 10 deletions

View File

@ -636,6 +636,61 @@ pub fn mbox_parse(
Ok((&[], envelopes))
}
struct MessageIterator<'a> {
index: Arc<Mutex<HashMap<EnvelopeHash, (Offset, Length)>>>,
input: &'a [u8],
file_offset: usize,
offset: usize,
reader: Option<MboxReader>,
}
impl<'a> Iterator for MessageIterator<'a> {
type Item = Result<Envelope>;
fn next(&mut self) -> Option<Self::Item> {
if self.input.is_empty() {
return None;
}
let mut index = self.index.lock().unwrap();
let reader = self.reader.unwrap_or(MboxReader::MboxCl2);
while !self.input[self.offset + self.file_offset..].is_empty() {
let (next_input, env) =
match reader.parse(&self.input[self.offset + self.file_offset..]) {
Ok(v) => v,
Err(e) => {
// Try to recover from this error by finding a new candidate From_ line
if let Some(next_offset) =
find_From__line!(&self.input[self.offset + self.file_offset..])
{
self.offset += next_offset;
if self.offset != self.input.len() {
// If we are not at EOF, we will be at this point
// "\n\nFrom ..."
// ↑
// So, skip those two newlines.
self.offset += 2;
}
} else {
self.input = b"";
return Some(Err(e.into()));
}
continue;
}
};
let start: Offset = self.input[self.offset + self.file_offset..]
.find(b"\n")
.map(|v| v + 1)
.unwrap_or(0);
let len = self.input.len() - next_input.len() - self.offset - self.file_offset - start;
index.insert(env.hash(), (self.offset + self.file_offset + start, len));
self.offset += len + start;
return Some(Ok(env));
}
None
}
}
/// Mbox backend
#[derive(Debug, Default)]
pub struct MboxType {
@ -692,25 +747,52 @@ impl MailBackend for MboxType {
};
let mailboxes_lck = mailboxes.lock().unwrap();
let mut mailbox_index_lck = mailbox_index.lock().unwrap();
let index = mailboxes_lck[&mailbox_hash].index.clone();
drop(mailboxes_lck);
let payload = mbox_parse(index, contents.as_slice(), 0, prefer_mbox_type)
.map_err(MeliError::from)
.map(|(_, v)| {
for v in v.iter() {
mailbox_index_lck.insert(v.hash(), mailbox_hash);
let mut message_iter = MessageIterator {
index,
input: &contents.as_slice(),
offset: 0,
file_offset: 0,
reader: prefer_mbox_type,
};
let mut err = None;
loop {
let mut payload = vec![];
'iter_for_loop: for _i in 0..150 {
match message_iter.next() {
Some(Ok(env)) => {
payload.push(env);
}
Some(Err(_err)) => {
debug!(&_err);
err = Some(_err);
}
None => {
break 'iter_for_loop;
}
}
v
});
}
if !payload.is_empty() {
err = None;
} else {
break;
}
let mut mailbox_index_lck = mailbox_index.lock().unwrap();
for env in &payload {
mailbox_index_lck.insert(env.hash(), mailbox_hash);
}
tx.send(AsyncStatus::Payload(Ok(payload))).unwrap();
}
if let Some(err) = err {
tx.send(AsyncStatus::Payload(Err(err))).unwrap();
}
{
let mut mailbox_lock = mailboxes.lock().unwrap();
mailbox_lock
.entry(mailbox_hash)
.and_modify(|f| f.content = contents);
}
tx.send(AsyncStatus::Payload(payload)).unwrap();
tx.send(AsyncStatus::Finished).unwrap();
};
Box::new(closure)