Replace old pseudo-async code with blocking rust async

master
Manos Pitsidianakis 2020-08-20 17:37:19 +03:00
parent a190805384
commit 3eadaba34e
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
20 changed files with 1114 additions and 2001 deletions

View File

@ -1,255 +0,0 @@
/*
* meli - async module
*
* Copyright 2017 Manos Pitsidianakis
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
/*!
* Primitive Async/Wait implementation.
*
* To create an Async promise, create an AsyncBuilder. Ask for its channel receiver/sender with
* `tx` and `rx` methods to pass them in your worker's closure. Build an `Async<T>` with your
* `JoinHandle<T>`. The thread must communicate with the `Async<T>` object via `AsyncStatus`
* messages.
*
* When `Async<T>` receives `AsyncStatus::Finished` it joins the thread and takes its value which
* can be extracted with `extract`.
*/
use crossbeam::{
bounded,
channel::{Receiver, Sender},
select,
};
use std::fmt;
/// Handles a running [`Work`] closure uses to talk back to whatever runs it.
///
/// All fields are `crossbeam` senders; the receiving side is not visible in
/// this module — presumably a worker-pool/scheduler thread. TODO confirm.
#[derive(Clone, Debug)]
pub struct WorkContext {
    /// Submit additional `Work` items for execution.
    pub new_work: Sender<Work>,
    /// Report a human-readable name for the given worker thread.
    pub set_name: Sender<(std::thread::ThreadId, String)>,
    /// Report a status string for the given worker thread.
    pub set_status: Sender<(std::thread::ThreadId, String)>,
    /// Signal that the given worker thread has finished.
    pub finished: Sender<std::thread::ThreadId>,
}
/// A unit of work: a boxed one-shot closure plus scheduling metadata.
pub struct Work {
    /// Scheduling priority. `Ord`, `PartialOrd` and `PartialEq` for `Work`
    /// compare only this field.
    priority: u64,
    /// Flag carried through to the consumer; its exact semantics are not
    /// visible in this module — NOTE(review): presumably marks long-lived
    /// jobs that should not be torn down. Confirm against the pool code.
    pub is_static: bool,
    /// The closure to run; it receives a [`WorkContext`] as its only argument.
    pub closure: Box<dyn FnOnce(WorkContext) -> () + Send + Sync>,
}
impl Ord for Work {
    /// Total order on `Work` items by `priority` alone; the closure and
    /// `is_static` flag never participate in ordering.
    fn cmp(&self, other: &Work) -> std::cmp::Ordering {
        self.priority.cmp(&other.priority)
    }
}
impl PartialOrd for Work {
    /// Delegates to [`Ord::cmp`] so the two orderings can never diverge
    /// (canonical form; the previous body re-implemented the comparison,
    /// which clippy flags as `non_canonical_partial_ord_impl`).
    fn partial_cmp(&self, other: &Work) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl PartialEq for Work {
    /// Two `Work` items are equal iff their priorities are equal; the
    /// closures themselves are never compared.
    fn eq(&self, other: &Work) -> bool {
        self.priority == other.priority
    }
}
// Equality on `u64` is total and reflexive, so `Eq` is sound.
impl Eq for Work {}
impl Work {
pub fn compute(self, work_context: WorkContext) {
(self.closure)(work_context);
}
}
impl fmt::Debug for Work {
    /// Opaque representation: the boxed closure has nothing printable.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("Work object")
    }
}
/// Messages to pass between `Async<T>` owner and its worker thread.
#[derive(Clone)]
pub enum AsyncStatus<T> {
    /// Nothing new since the previous poll.
    NoUpdate,
    /// A value produced by the worker.
    Payload(T),
    /// The worker is done; on receipt, `Async::poll`/`poll_block` mark the
    /// promise inactive.
    Finished,
    /// The number may hold whatever meaning the user chooses.
    ProgressReport(usize),
}
impl<T> fmt::Debug for AsyncStatus<T> {
    /// Prints the variant name only; payload values are elided, except the
    /// progress counter, which is numeric and always printable.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            AsyncStatus::NoUpdate => f.write_str("AsyncStatus<T>::NoUpdate"),
            AsyncStatus::Payload(_) => f.write_str("AsyncStatus<T>::Payload(_)"),
            AsyncStatus::Finished => f.write_str("AsyncStatus<T>::Finished"),
            AsyncStatus::ProgressReport(n) => {
                write!(f, "AsyncStatus<T>::ProgressReport({})", n)
            }
        }
    }
}
/// A builder object for `Async<T>`
#[derive(Debug, Clone)]
pub struct AsyncBuilder<T: Send + Sync> {
    /// Sender half of the status channel; cloned into the worker closure.
    tx: Sender<AsyncStatus<T>>,
    /// Receiver half of the status channel; polled by the `Async<T>` owner.
    rx: Receiver<AsyncStatus<T>>,
    /// Priority copied into the built `Work` (default 0).
    priority: u64,
    /// `is_static` flag copied into the built `Work` (default `false`).
    is_static: bool,
}
#[derive(Debug)]
pub struct Async<T: Send + Sync> {
    /// The pending job; taken exactly once by `Async::work`.
    work: Option<Work>,
    /// `true` while the job has been handed out and has not yet reported
    /// `AsyncStatus::Finished`.
    active: bool,
    /// Sender half of the status channel (cloned out via `tx()`).
    tx: Sender<AsyncStatus<T>>,
    /// Receiver half of the status channel, polled by `poll`/`poll_block`.
    rx: Receiver<AsyncStatus<T>>,
}
impl<T: Send + Sync> Default for AsyncBuilder<T> {
fn default() -> Self {
AsyncBuilder::<T>::new()
}
}
impl<T> AsyncBuilder<T>
where
    T: Send + Sync,
{
    /// Creates a builder with a fresh bounded status channel, priority 0 and
    /// `is_static == false`.
    ///
    /// NOTE(review): the channel capacity is `8 * size_of::<AsyncStatus<T>>()`,
    /// i.e. it scales with the *byte size* of the message type even though
    /// `bounded` takes a capacity in number of messages. Kept as-is to
    /// preserve behavior, but worth confirming this is intentional.
    pub fn new() -> Self {
        let (sender, receiver) = bounded(8 * ::std::mem::size_of::<AsyncStatus<T>>());
        AsyncBuilder {
            tx: sender,
            rx: receiver,
            priority: 0,
            is_static: false,
        }
    }
    /// Returns a clone of the sender side of the promise's channel.
    /// (Cloning a channel handle needs no exclusive access, so this takes
    /// `&self`; existing `&mut` callers still work.)
    pub fn tx(&self) -> Sender<AsyncStatus<T>> {
        self.tx.clone()
    }
    /// Returns a clone of the receiver side of the promise's channel.
    pub fn rx(&self) -> Receiver<AsyncStatus<T>> {
        self.rx.clone()
    }
    /// Sets the scheduling priority of the `Work` that `build` will create.
    pub fn set_priority(&mut self, new_val: u64) -> &mut Self {
        self.priority = new_val;
        self
    }
    /// Sets the `is_static` flag of the `Work` that `build` will create.
    pub fn set_is_static(&mut self, new_val: bool) -> &mut Self {
        self.is_static = new_val;
        self
    }
    /// Consumes the builder and wraps `work` into an inactive `Async<T>`
    /// holding both ends of the status channel.
    ///
    /// (The old doc claimed a thread `JoinHandle` is stored; no thread is
    /// spawned here — the closure is only executed when some consumer takes
    /// it via `Async::work` and calls `Work::compute`.)
    pub fn build(self, work: Box<dyn FnOnce(WorkContext) -> () + Send + Sync>) -> Async<T> {
        Async {
            work: Some(Work {
                priority: self.priority,
                is_static: self.is_static,
                closure: work,
            }),
            tx: self.tx,
            rx: self.rx,
            active: false,
        }
    }
}
impl<T> Async<T>
where
T: Send + Sync,
{
pub fn work(&mut self) -> Option<Work> {
if !self.active {
self.active = true;
self.work.take()
} else {
None
}
}
/// Returns the sender object of the promise's channel.
pub fn tx(&mut self) -> Sender<AsyncStatus<T>> {
self.tx.clone()
}
/// Returns the receiver object of the promise's channel.
pub fn rx(&mut self) -> Receiver<AsyncStatus<T>> {
self.rx.clone()
}
/// Polls worker thread and returns result.
pub fn poll_block(&mut self) -> Result<AsyncStatus<T>, ()> {
if !self.active {
return Ok(AsyncStatus::Finished);
}
let rx = &self.rx;
select! {
recv(rx) -> r => {
match r {
Ok(p @ AsyncStatus::Payload(_)) => {
Ok(p)
},
Ok(f @ AsyncStatus::Finished) => {
self.active = false;
Ok(f)
},
Ok(a) => {
Ok(a)
}
Err(_) => {
Err(())
},
}
},
}
}
/// Polls worker thread and returns result.
pub fn poll(&mut self) -> Result<AsyncStatus<T>, ()> {
if !self.active {
return Ok(AsyncStatus::Finished);
}
let rx = &self.rx;
select! {
default => {
Ok(AsyncStatus::NoUpdate)
},
recv(rx) -> r => {
match r {
Ok(p @ AsyncStatus::Payload(_)) => {
Ok(p)
},
Ok(f @ AsyncStatus::Finished) => {
self.active = false;
Ok(f)
},
Ok(a) => {
Ok(a)
}
Err(_) => {
Err(())
},
}
},
}
}
}

View File

@ -49,7 +49,6 @@ pub mod mbox;
pub use self::imap::ImapType;
#[cfg(feature = "imap_backend")]
pub use self::nntp::NntpType;
use crate::async_workers::*;
use crate::conf::AccountSettings;
use crate::error::{MeliError, Result};
@ -304,31 +303,23 @@ pub type ResultFuture<T> = Result<Pin<Box<dyn Future<Output = Result<T>> + Send
pub trait MailBackend: ::std::fmt::Debug + Send + Sync {
fn capabilities(&self) -> MailBackendCapabilities;
fn is_online(&self) -> Result<()> {
fn is_online(&self) -> ResultFuture<()> {
Err(MeliError::new("Unimplemented."))
}
fn is_online_async(&self) -> ResultFuture<()> {
Err(MeliError::new("Unimplemented."))
}
fn fetch(&mut self, mailbox_hash: MailboxHash) -> Result<Async<Result<Vec<Envelope>>>>;
fn fetch_async(
//fn fetch(&mut self, mailbox_hash: MailboxHash) -> Result<Async<Result<Vec<Envelope>>>>;
fn fetch(
&mut self,
_mailbox_hash: MailboxHash,
) -> Result<Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>> {
Err(MeliError::new("Unimplemented."))
}
fn refresh(&mut self, _mailbox_hash: MailboxHash) -> Result<Async<()>> {
fn refresh(&mut self, _mailbox_hash: MailboxHash) -> ResultFuture<()> {
Err(MeliError::new("Unimplemented."))
}
fn refresh_async(&mut self, _mailbox_hash: MailboxHash) -> ResultFuture<()> {
fn watch(&self) -> ResultFuture<()> {
Err(MeliError::new("Unimplemented."))
}
fn watch(&self, work_context: WorkContext) -> Result<std::thread::ThreadId>;
fn watch_async(&self) -> ResultFuture<()> {
Err(MeliError::new("Unimplemented."))
}
fn mailboxes(&self) -> Result<HashMap<MailboxHash, Mailbox>>;
fn mailboxes_async(&self) -> ResultFuture<HashMap<MailboxHash, Mailbox>> {
fn mailboxes(&self) -> ResultFuture<HashMap<MailboxHash, Mailbox>> {
Err(MeliError::new("Unimplemented."))
}
fn operation(&self, hash: EnvelopeHash) -> Result<Box<dyn BackendOp>>;

View File

@ -36,11 +36,11 @@ mod cache;
pub mod managesieve;
mod untagged;
use crate::async_workers::{Async, WorkContext};
use crate::backends::{
RefreshEventKind::{self, *},
*,
};
use crate::conf::AccountSettings;
use crate::connections::timeout;
use crate::email::*;
@ -260,7 +260,7 @@ impl MailBackend for ImapType {
}
}
fn fetch_async(
fn fetch(
&mut self,
mailbox_hash: MailboxHash,
) -> Result<Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>> {
@ -289,7 +289,7 @@ impl MailBackend for ImapType {
}))
}
fn refresh_async(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()> {
fn refresh(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()> {
let main_conn = self.connection.clone();
let uid_store = self.uid_store.clone();
Ok(Box::pin(async move {
@ -304,7 +304,7 @@ impl MailBackend for ImapType {
}))
}
fn mailboxes_async(&self) -> ResultFuture<HashMap<MailboxHash, Mailbox>> {
fn mailboxes(&self) -> ResultFuture<HashMap<MailboxHash, Mailbox>> {
let uid_store = self.uid_store.clone();
let connection = self.connection.clone();
Ok(Box::pin(async move {
@ -356,12 +356,12 @@ impl MailBackend for ImapType {
}))
}
fn is_online_async(&self) -> ResultFuture<()> {
fn is_online(&self) -> ResultFuture<()> {
let connection = self.connection.clone();
Ok(Box::pin(async move {
match timeout(std::time::Duration::from_secs(3), connection.lock()).await {
Ok(mut conn) => {
debug!("is_online_async");
debug!("is_online");
match debug!(timeout(std::time::Duration::from_secs(3), conn.connect()).await) {
Ok(Ok(())) => Ok(()),
Err(err) | Ok(Err(err)) => {
@ -375,20 +375,7 @@ impl MailBackend for ImapType {
}))
}
fn fetch(&mut self, _mailbox_hash: MailboxHash) -> Result<Async<Result<Vec<Envelope>>>> {
Err(MeliError::new("Unimplemented."))
}
fn refresh(&mut self, _mailbox_hash: MailboxHash) -> Result<Async<()>> {
Err(MeliError::new("Unimplemented."))
}
fn watch(&self, _work_context: WorkContext) -> Result<std::thread::ThreadId> {
Err(MeliError::new("Unimplemented."))
}
fn watch_async(&self) -> ResultFuture<()> {
debug!("watch_async called");
fn watch(&self) -> ResultFuture<()> {
let conn = ImapConnection::new_connection(&self.server_conf, self.uid_store.clone());
let main_conn = self.connection.clone();
let uid_store = self.uid_store.clone();
@ -417,15 +404,11 @@ impl MailBackend for ImapType {
} else {
poll_with_examine(kit).await?;
}
debug!("watch_async future returning");
debug!("watch future returning");
Ok(())
}))
}
fn mailboxes(&self) -> Result<HashMap<MailboxHash, Mailbox>> {
Err(MeliError::new("Unimplemented."))
}
fn operation(&self, hash: EnvelopeHash) -> Result<Box<dyn BackendOp>> {
let (uid, mailbox_hash) = if let Some(v) =
self.uid_store.hash_index.lock().unwrap().get(&hash)
@ -748,7 +731,7 @@ impl MailBackend for ImapType {
) -> ResultFuture<(MailboxHash, HashMap<MailboxHash, Mailbox>)> {
let uid_store = self.uid_store.clone();
let connection = self.connection.clone();
let new_mailbox_fut = self.mailboxes_async();
let new_mailbox_fut = self.mailboxes();
Ok(Box::pin(async move {
/* Must transform path to something the IMAP server will accept
*
@ -819,7 +802,7 @@ impl MailBackend for ImapType {
) -> ResultFuture<HashMap<MailboxHash, Mailbox>> {
let uid_store = self.uid_store.clone();
let connection = self.connection.clone();
let new_mailbox_fut = self.mailboxes_async();
let new_mailbox_fut = self.mailboxes();
Ok(Box::pin(async move {
let imap_path: String;
let no_select: bool;
@ -923,7 +906,7 @@ impl MailBackend for ImapType {
) -> ResultFuture<Mailbox> {
let uid_store = self.uid_store.clone();
let connection = self.connection.clone();
let new_mailbox_fut = self.mailboxes_async();
let new_mailbox_fut = self.mailboxes();
Ok(Box::pin(async move {
let command: String;
let mut response = String::with_capacity(8 * 1024);

View File

@ -19,7 +19,6 @@
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use crate::async_workers::{Async, WorkContext};
use crate::backends::*;
use crate::conf::AccountSettings;
use crate::email::*;
@ -207,7 +206,7 @@ impl MailBackend for JmapType {
CAPABILITIES
}
fn is_online_async(&self) -> ResultFuture<()> {
fn is_online(&self) -> ResultFuture<()> {
let online = self.online.clone();
Ok(Box::pin(async move {
//match timeout(std::time::Duration::from_secs(3), connection.lock()).await {
@ -221,7 +220,7 @@ impl MailBackend for JmapType {
}))
}
fn fetch_async(
fn fetch(
&mut self,
mailbox_hash: MailboxHash,
) -> Result<Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>> {
@ -243,13 +242,13 @@ impl MailBackend for JmapType {
}))
}
fn watch_async(&self) -> ResultFuture<()> {
fn watch(&self) -> ResultFuture<()> {
Ok(Box::pin(async move {
Err(MeliError::from("JMAP watch for updates is unimplemented"))
}))
}
fn mailboxes_async(&self) -> ResultFuture<HashMap<MailboxHash, Mailbox>> {
fn mailboxes(&self) -> ResultFuture<HashMap<MailboxHash, Mailbox>> {
let mailboxes = self.mailboxes.clone();
let connection = self.connection.clone();
Ok(Box::pin(async move {
@ -354,18 +353,6 @@ impl MailBackend for JmapType {
}))
}
fn fetch(&mut self, _mailbox_hash: MailboxHash) -> Result<Async<Result<Vec<Envelope>>>> {
Err(MeliError::new("Unimplemented."))
}
fn watch(&self, _work_context: WorkContext) -> Result<std::thread::ThreadId> {
Err(MeliError::new("Unimplemented."))
}
fn mailboxes(&self) -> Result<HashMap<MailboxHash, Mailbox>> {
Err(MeliError::new("Unimplemented."))
}
fn rename_mailbox(
&mut self,
_mailbox_hash: MailboxHash,

View File

@ -20,7 +20,6 @@
*/
use super::{MaildirMailbox, MaildirOp, MaildirPathTrait};
use crate::async_workers::*;
use crate::backends::{RefreshEventKind::*, *};
use crate::conf::AccountSettings;
use crate::email::{Envelope, EnvelopeHash, Flag};
@ -40,10 +39,8 @@ use std::io::{self, Read, Write};
use std::ops::{Deref, DerefMut};
use std::os::unix::fs::PermissionsExt;
use std::path::{Component, Path, PathBuf};
use std::result;
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::thread;
#[derive(Clone, Debug, PartialEq)]
pub(super) enum PathMod {
@ -188,31 +185,20 @@ impl MailBackend for MaildirType {
CAPABILITIES
}
fn is_online(&self) -> Result<()> {
Ok(())
}
fn is_online_async(&self) -> ResultFuture<()> {
fn is_online(&self) -> ResultFuture<()> {
Ok(Box::pin(async { Ok(()) }))
}
fn mailboxes(&self) -> Result<HashMap<MailboxHash, Mailbox>> {
Ok(self
fn mailboxes(&self) -> ResultFuture<HashMap<MailboxHash, Mailbox>> {
let res = Ok(self
.mailboxes
.iter()
.map(|(h, f)| (*h, BackendMailbox::clone(f)))
.collect())
}
fn mailboxes_async(&self) -> ResultFuture<HashMap<MailboxHash, Mailbox>> {
let res = self.mailboxes();
.collect());
Ok(Box::pin(async { res }))
}
fn fetch(&mut self, mailbox_hash: MailboxHash) -> Result<Async<Result<Vec<Envelope>>>> {
Ok(self.multicore(4, mailbox_hash))
}
fn fetch_async(
fn fetch(
&mut self,
mailbox_hash: MailboxHash,
) -> Result<core::pin::Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>>
@ -224,14 +210,19 @@ impl MailBackend for MaildirType {
let root_path = self.path.to_path_buf();
let map = self.hash_indexes.clone();
let mailbox_index = self.mailbox_index.clone();
super::stream::MaildirStream::new(&self.name, mailbox_hash, unseen, total, path, root_path, map, mailbox_index)
super::stream::MaildirStream::new(
&self.name,
mailbox_hash,
unseen,
total,
path,
root_path,
map,
mailbox_index,
)
}
fn refresh(
&mut self,
mailbox_hash: MailboxHash,
) -> Result<Async<()>> {
let w = AsyncBuilder::new();
fn refresh(&mut self, mailbox_hash: MailboxHash) -> ResultFuture<()> {
let cache_dir = xdg::BaseDirectories::with_profile("meli", &self.name).unwrap();
let account_hash = {
let mut hasher = DefaultHasher::default();
@ -240,117 +231,116 @@ impl MailBackend for MaildirType {
};
let sender = self.event_consumer.clone();
let handle = {
let mailbox: &MaildirMailbox = &self.mailboxes[&mailbox_hash];
let path: PathBuf = mailbox.fs_path().into();
let name = format!("refresh {:?}", mailbox.name());
let root_path = self.path.to_path_buf();
let map = self.hash_indexes.clone();
let mailbox_index = self.mailbox_index.clone();
let mailbox: &MaildirMailbox = &self.mailboxes[&mailbox_hash];
let path: PathBuf = mailbox.fs_path().into();
let root_path = self.path.to_path_buf();
let map = self.hash_indexes.clone();
let mailbox_index = self.mailbox_index.clone();
Box::new(move |work_context: crate::async_workers::WorkContext| {
work_context
.set_name
.send((std::thread::current().id(), name.clone()))
.unwrap();
let thunk = move |sender: &BackendEventConsumer| {
debug!("refreshing");
let mut path = path.clone();
path.push("new");
for d in path.read_dir()? {
if let Ok(p) = d {
move_to_cur(p.path()).ok().take();
}
Ok(Box::pin(async move {
let thunk = move |sender: &BackendEventConsumer| {
debug!("refreshing");
let mut path = path.clone();
path.push("new");
for d in path.read_dir()? {
if let Ok(p) = d {
move_to_cur(p.path()).ok().take();
}
path.pop();
}
path.pop();
path.push("cur");
let iter = path.read_dir()?;
let count = path.read_dir()?.count();
let mut files: Vec<PathBuf> = Vec::with_capacity(count);
for e in iter {
let e = e.and_then(|x| {
let path = x.path();
Ok(path)
})?;
files.push(e);
}
let mut current_hashes = {
path.push("cur");
let iter = path.read_dir()?;
let count = path.read_dir()?.count();
let mut files: Vec<PathBuf> = Vec::with_capacity(count);
for e in iter {
let e = e.and_then(|x| {
let path = x.path();
Ok(path)
})?;
files.push(e);
}
let mut current_hashes = {
let mut map = map.lock().unwrap();
let map = map.entry(mailbox_hash).or_default();
map.keys().cloned().collect::<HashSet<EnvelopeHash>>()
};
for file in files {
let hash = get_file_hash(&file);
{
let mut map = map.lock().unwrap();
let map = map.entry(mailbox_hash).or_default();
map.keys().cloned().collect::<HashSet<EnvelopeHash>>()
};
for file in files {
let hash = get_file_hash(&file);
{
let mut map = map.lock().unwrap();
let map = map.entry(mailbox_hash).or_default();
if map.contains_key(&hash) {
map.remove(&hash);
current_hashes.remove(&hash);
continue;
}
(*map).insert(hash, PathBuf::from(&file).into());
if map.contains_key(&hash) {
map.remove(&hash);
current_hashes.remove(&hash);
continue;
}
let op = Box::new(MaildirOp::new(hash, map.clone(), mailbox_hash));
if let Ok(e) = Envelope::from_token(op, hash) {
mailbox_index.lock().unwrap().insert(e.hash(), mailbox_hash);
let file_name = file.strip_prefix(&root_path).unwrap().to_path_buf();
if let Ok(cached) = cache_dir.place_cache_file(file_name) {
/* place result in cache directory */
let f = match fs::File::create(cached) {
Ok(f) => f,
Err(e) => {
panic!("{}", e);
}
};
let metadata = f.metadata().unwrap();
let mut permissions = metadata.permissions();
(*map).insert(hash, PathBuf::from(&file).into());
}
let op = Box::new(MaildirOp::new(hash, map.clone(), mailbox_hash));
if let Ok(e) = Envelope::from_token(op, hash) {
mailbox_index.lock().unwrap().insert(e.hash(), mailbox_hash);
let file_name = file.strip_prefix(&root_path).unwrap().to_path_buf();
if let Ok(cached) = cache_dir.place_cache_file(file_name) {
/* place result in cache directory */
let f = match fs::File::create(cached) {
Ok(f) => f,
Err(e) => {
panic!("{}", e);
}
};
let metadata = f.metadata().unwrap();
let mut permissions = metadata.permissions();
permissions.set_mode(0o600); // Read/write for owner only.
f.set_permissions(permissions).unwrap();
permissions.set_mode(0o600); // Read/write for owner only.
f.set_permissions(permissions).unwrap();
let writer = io::BufWriter::new(f);
bincode::serialize_into(writer, &e).unwrap();
}
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
let writer = io::BufWriter::new(f);
bincode::serialize_into(writer, &e).unwrap();
}
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Create(Box::new(e)),
}));
} else {
debug!(
"DEBUG: hash {}, path: {} couldn't be parsed",
hash,
file.as_path().display()
);
continue;
}
}),
);
} else {
debug!(
"DEBUG: hash {}, path: {} couldn't be parsed",
hash,
file.as_path().display()
);
continue;
}
for ev in current_hashes.into_iter().map(|h| BackendEvent::Refresh(RefreshEvent {
}
for ev in current_hashes.into_iter().map(|h| {
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Remove(h),
})) {
(sender)(account_hash, ev);
}
Ok(())
};
if let Err(err) = thunk(&sender) {
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
})
}) {
(sender)(account_hash, ev);
}
Ok(())
};
if let Err(err) = thunk(&sender) {
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Failure(err),
}));
}
})
};
Ok(w.build(handle))
}),
);
}
Ok(())
}))
}
fn watch(
&self,
work_context: WorkContext,
) -> Result<std::thread::ThreadId> {
fn watch(&self) -> ResultFuture<()> {
let sender = self.event_consumer.clone();
let (tx, rx) = channel();
let mut watcher = watcher(tx, Duration::from_secs(2)).unwrap();
@ -365,335 +355,371 @@ impl MailBackend for MaildirType {
debug!("watching {:?}", root_path);
let hash_indexes = self.hash_indexes.clone();
let mailbox_index = self.mailbox_index.clone();
let root_mailbox_hash: MailboxHash = self.mailboxes.values().find(|m| m.parent.is_none()).map(|m| m.hash()).unwrap();
let root_mailbox_hash: MailboxHash = self
.mailboxes
.values()
.find(|m| m.parent.is_none())
.map(|m| m.hash())
.unwrap();
let mailbox_counts = self
.mailboxes
.iter()
.map(|(&k, v)| (k, (v.unseen.clone(), v.total.clone())))
.collect::<HashMap<MailboxHash, (Arc<Mutex<usize>>, Arc<Mutex<usize>>)>>();
let handle = thread::Builder::new()
.name("mailbox watch".to_string())
.spawn(move || {
// Move `watcher` in the closure's scope so that it doesn't get dropped.
let _watcher = watcher;
let _work_context = work_context;
loop {
match rx.recv() {
/*
* Event types:
*
* pub enum RefreshEventKind {
* Update(EnvelopeHash, Envelope), // Old hash, new envelope
* Create(Envelope),
* Remove(EnvelopeHash),
* Rescan,
* }
*/
Ok(event) => match event {
/* Create */
DebouncedEvent::Create(mut pathbuf) => {
debug!("DebouncedEvent::Create(path = {:?}", pathbuf);
if path_is_new!(pathbuf) {
debug!("path_is_new");
/* This creates a Rename event that we will receive later */
pathbuf = match move_to_cur(pathbuf) {
Ok(p) => p,
Err(e) => {
debug!("error: {}", e.to_string());
continue;
}
};
Ok(Box::pin(async move {
// Move `watcher` in the closure's scope so that it doesn't get dropped.
let _watcher = watcher;
loop {
match rx.recv() {
/*
* Event types:
*
* pub enum RefreshEventKind {
* Update(EnvelopeHash, Envelope), // Old hash, new envelope
* Create(Envelope),
* Remove(EnvelopeHash),
* Rescan,
* }
*/
Ok(event) => match event {
/* Create */
DebouncedEvent::Create(mut pathbuf) => {
debug!("DebouncedEvent::Create(path = {:?}", pathbuf);
if path_is_new!(pathbuf) {
debug!("path_is_new");
/* This creates a Rename event that we will receive later */
pathbuf = match move_to_cur(pathbuf) {
Ok(p) => p,
Err(e) => {
debug!("error: {}", e.to_string());
continue;
}
};
}
let mailbox_hash = get_path_hash!(pathbuf);
let file_name = pathbuf
.as_path()
.strip_prefix(&root_path)
.unwrap()
.to_path_buf();
if let Some(env) = add_path_to_index(
&hash_indexes,
mailbox_hash,
pathbuf.as_path(),
&cache_dir,
file_name,
) {
mailbox_index
.lock()
.unwrap()
.insert(env.hash(), mailbox_hash);
debug!(
"Create event {} {} {}",
env.hash(),
env.subject(),
pathbuf.display()
);
if !env.is_seen() {
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
}
let mailbox_hash = get_path_hash!(pathbuf);
let file_name = pathbuf
*mailbox_counts[&mailbox_hash].1.lock().unwrap() += 1;
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
}),
);
}
}
/* Update */
DebouncedEvent::NoticeWrite(pathbuf) | DebouncedEvent::Write(pathbuf) => {
debug!("DebouncedEvent::Write(path = {:?}", &pathbuf);
let mailbox_hash = get_path_hash!(pathbuf);
let mut hash_indexes_lock = hash_indexes.lock().unwrap();
let index_lock =
&mut hash_indexes_lock.entry(mailbox_hash).or_default();
let file_name = pathbuf
.as_path()
.strip_prefix(&root_path)
.unwrap()
.to_path_buf();
/* Linear search in hash_index to find old hash */
let old_hash: EnvelopeHash = {
if let Some((k, v)) =
index_lock.iter_mut().find(|(_, v)| *v.buf == pathbuf)
{
//TODO FIXME This doesn't make sense?
*v = pathbuf.clone().into();
*k
} else {
drop(hash_indexes_lock);
/* Did we just miss a Create event? In any case, create
* envelope. */
if let Some(env) = add_path_to_index(
&hash_indexes,
mailbox_hash,
pathbuf.as_path(),
&cache_dir,
file_name,
) {
mailbox_index
.lock()
.unwrap()
.insert(env.hash(), mailbox_hash);
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
}),
);
}
continue;
}
};
let new_hash: EnvelopeHash = get_file_hash(pathbuf.as_path());
if index_lock.get_mut(&new_hash).is_none() {
debug!("write notice");
let op = Box::new(MaildirOp::new(
new_hash,
hash_indexes.clone(),
mailbox_hash,
));
if let Ok(env) = Envelope::from_token(op, new_hash) {
debug!("{}\t{:?}", new_hash, &pathbuf);
debug!(
"hash {}, path: {:?} couldn't be parsed",
new_hash, &pathbuf
);
index_lock.insert(new_hash, pathbuf.into());
/* Send Write notice */
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Update(old_hash, Box::new(env)),
}),
);
}
}
}
/* Remove */
DebouncedEvent::NoticeRemove(pathbuf) | DebouncedEvent::Remove(pathbuf) => {
debug!("DebouncedEvent::Remove(path = {:?}", pathbuf);
let mailbox_hash = get_path_hash!(pathbuf);
let mut hash_indexes_lock = hash_indexes.lock().unwrap();
let index_lock = hash_indexes_lock.entry(mailbox_hash).or_default();
let hash: EnvelopeHash = if let Some((k, _)) =
index_lock.iter().find(|(_, v)| *v.buf == pathbuf)
{
*k
} else {
debug!("removed but not contained in index");
continue;
};
if let Some(ref modif) = &index_lock[&hash].modified {
match modif {
PathMod::Path(path) => debug!(
"envelope {} has modified path set {}",
hash,
path.display()
),
PathMod::Hash(hash) => debug!(
"envelope {} has modified path set {}",
hash,
&index_lock[&hash].buf.display()
),
}
index_lock.entry(hash).and_modify(|e| {
e.removed = false;
});
continue;
}
*mailbox_counts[&mailbox_hash].1.lock().unwrap() -= 1;
if !pathbuf.flags().contains(Flag::SEEN) {
*mailbox_counts[&mailbox_hash].0.lock().unwrap() -= 1;
}
index_lock.entry(hash).and_modify(|e| {
e.removed = true;
});
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Remove(hash),
}),
);
}
/* Envelope hasn't changed */
DebouncedEvent::Rename(src, dest) => {
debug!("DebouncedEvent::Rename(src = {:?}, dest = {:?})", src, dest);
let mailbox_hash = get_path_hash!(src);
let old_hash: EnvelopeHash = get_file_hash(src.as_path());
let new_hash: EnvelopeHash = get_file_hash(dest.as_path());
let mut hash_indexes_lock = hash_indexes.lock().unwrap();
let index_lock = hash_indexes_lock.entry(mailbox_hash).or_default();
let old_flags = src.flags();
let new_flags = dest.flags();
let was_seen: bool = old_flags.contains(Flag::SEEN);
let is_seen: bool = new_flags.contains(Flag::SEEN);
if index_lock.contains_key(&old_hash) && !index_lock[&old_hash].removed
{
debug!("contains_old_key");
index_lock.entry(old_hash).and_modify(|e| {
debug!(&e.modified);
e.modified = Some(PathMod::Hash(new_hash));
});
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: get_path_hash!(dest),
kind: Rename(old_hash, new_hash),
}),
);
if !was_seen && is_seen {
let mut lck = mailbox_counts[&mailbox_hash].0.lock().unwrap();
*lck = lck.saturating_sub(1);
} else if was_seen && !is_seen {
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
}
if old_flags != new_flags {
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: get_path_hash!(dest),
kind: NewFlags(new_hash, (new_flags, vec![])),
}),
);
}
mailbox_index
.lock()
.unwrap()
.insert(new_hash, get_path_hash!(dest));
index_lock.insert(new_hash, dest.into());
continue;
} else if !index_lock.contains_key(&new_hash)
&& index_lock
.get(&old_hash)
.map(|e| e.removed)
.unwrap_or(false)
{
if index_lock
.get(&old_hash)
.map(|e| e.removed)
.unwrap_or(false)
{
index_lock.entry(old_hash).and_modify(|e| {
e.modified = Some(PathMod::Hash(new_hash));
e.removed = false;
});
debug!("contains_old_key, key was marked as removed (by external source)");
} else {
debug!("not contains_new_key");
}
let file_name = dest
.as_path()
.strip_prefix(&root_path)
.unwrap()
.to_path_buf();
debug!("filename = {:?}", file_name);
drop(hash_indexes_lock);
if let Some(env) = add_path_to_index(
&hash_indexes,
mailbox_hash,
pathbuf.as_path(),
dest.as_path(),
&cache_dir,
file_name,
) {
mailbox_index.lock().unwrap().insert(env.hash(),mailbox_hash);
mailbox_index
.lock()
.unwrap()
.insert(env.hash(), mailbox_hash);
debug!(
"Create event {} {} {}",
env.hash(),
env.subject(),
pathbuf.display()
dest.display()
);
if !env.is_seen() {
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
}
*mailbox_counts[&mailbox_hash].1.lock().unwrap() += 1;
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
(sender)(
account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
}));
}
}
/* Update */
DebouncedEvent::NoticeWrite(pathbuf)
| DebouncedEvent::Write(pathbuf) => {
debug!("DebouncedEvent::Write(path = {:?}", &pathbuf);
let mailbox_hash = get_path_hash!(pathbuf);
let mut hash_indexes_lock = hash_indexes.lock().unwrap();
let index_lock =
&mut hash_indexes_lock.entry(mailbox_hash).or_default();
let file_name = pathbuf
.as_path()
.strip_prefix(&root_path)
.unwrap()
.to_path_buf();
/* Linear search in hash_index to find old hash */
let old_hash: EnvelopeHash = {
if let Some((k, v)) =
index_lock.iter_mut().find(|(_, v)| *v.buf == pathbuf)
{
//TODO FIXME This doesn't make sense?
*v = pathbuf.clone().into();
*k
} else {
drop(hash_indexes_lock);
/* Did we just miss a Create event? In any case, create
* envelope. */
if let Some(env) = add_path_to_index(
&hash_indexes,
mailbox_hash,
pathbuf.as_path(),
&cache_dir,
file_name,
) {
mailbox_index.lock().unwrap().insert(env.hash(),mailbox_hash);
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
}));
}
return;
}
};
let new_hash: EnvelopeHash = get_file_hash(pathbuf.as_path());
if index_lock.get_mut(&new_hash).is_none() {
debug!("write notice");
let op = Box::new(MaildirOp::new(
new_hash,
hash_indexes.clone(),
mailbox_hash,
));
if let Ok(env) = Envelope::from_token(op, new_hash) {
debug!("{}\t{:?}", new_hash, &pathbuf);
debug!(
"hash {}, path: {:?} couldn't be parsed",
new_hash, &pathbuf
);
index_lock.insert(new_hash, pathbuf.into());
/* Send Write notice */
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Update(old_hash, Box::new(env)),
}));
}
}
}
/* Remove */
DebouncedEvent::NoticeRemove(pathbuf)
| DebouncedEvent::Remove(pathbuf) => {
debug!("DebouncedEvent::Remove(path = {:?}", pathbuf);
let mailbox_hash = get_path_hash!(pathbuf);
let mut hash_indexes_lock = hash_indexes.lock().unwrap();
let index_lock = hash_indexes_lock.entry(mailbox_hash).or_default();
let hash: EnvelopeHash = if let Some((k, _)) =
index_lock.iter().find(|(_, v)| *v.buf == pathbuf)
{
*k
} else {
debug!("removed but not contained in index");
continue;
};
if let Some(ref modif) = &index_lock[&hash].modified {
match modif {
PathMod::Path(path) => debug!(
"envelope {} has modified path set {}",
hash,
path.display()
),
PathMod::Hash(hash) => debug!(
"envelope {} has modified path set {}",
hash,
&index_lock[&hash].buf.display()
),
}
index_lock.entry(hash).and_modify(|e| {
e.removed = false;
});
continue;
}
*mailbox_counts[&mailbox_hash].1.lock().unwrap() -= 1;
if !pathbuf.flags().contains(Flag::SEEN) {
*mailbox_counts[&mailbox_hash].0.lock().unwrap() -= 1;
}
index_lock.entry(hash).and_modify(|e| {
e.removed = true;
});
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Remove(hash),
}));
}
/* Envelope hasn't changed */
DebouncedEvent::Rename(src, dest) => {
debug!(
"DebouncedEvent::Rename(src = {:?}, dest = {:?})",
src, dest
);
let mailbox_hash = get_path_hash!(src);
let old_hash: EnvelopeHash = get_file_hash(src.as_path());
let new_hash: EnvelopeHash = get_file_hash(dest.as_path());
let mut hash_indexes_lock = hash_indexes.lock().unwrap();
let index_lock = hash_indexes_lock.entry(mailbox_hash).or_default();
let old_flags = src.flags();
let new_flags = dest.flags();
let was_seen: bool = old_flags.contains(Flag::SEEN);
let is_seen: bool = new_flags.contains(Flag::SEEN);
if index_lock.contains_key(&old_hash)
&& !index_lock[&old_hash].removed
{
debug!("contains_old_key");
index_lock.entry(old_hash).and_modify(|e| {
debug!(&e.modified);
e.modified = Some(PathMod::Hash(new_hash));
});
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: get_path_hash!(dest),
kind: Rename(old_hash, new_hash),
}));
if !was_seen && is_seen {
let mut lck = mailbox_counts[&mailbox_hash].0.lock().unwrap();
*lck = lck.saturating_sub(1);
} else if was_seen && !is_seen {
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
}
if old_flags != new_flags {
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: get_path_hash!(dest),
kind: NewFlags(new_hash, (new_flags, vec![])),
}));
}
mailbox_index.lock().unwrap().insert(new_hash,get_path_hash!(dest) );
index_lock.insert(new_hash, dest.into());
continue;
} else if !index_lock.contains_key(&new_hash)
&& index_lock
.get(&old_hash)
.map(|e| e.removed)
.unwrap_or(false)
{
if index_lock
.get(&old_hash)
.map(|e| e.removed)
.unwrap_or(false)
{
index_lock.entry(old_hash).and_modify(|e| {
e.modified = Some(PathMod::Hash(new_hash));
e.removed = false;
});
debug!("contains_old_key, key was marked as removed (by external source)");
} else {
debug!("not contains_new_key");
}
let file_name = dest
.as_path()
.strip_prefix(&root_path)
.unwrap()
.to_path_buf();
debug!("filename = {:?}", file_name);
drop(hash_indexes_lock);
if let Some(env) = add_path_to_index(
&hash_indexes,
mailbox_hash,
dest.as_path(),
&cache_dir,
file_name,
) {
mailbox_index.lock().unwrap().insert(env.hash(), mailbox_hash);
debug!(
"Create event {} {} {}",
env.hash(),
env.subject(),
dest.display()
);
if !env.is_seen() {
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
}
*mailbox_counts[&mailbox_hash].1.lock().unwrap() += 1;
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Create(Box::new(env)),
}));
continue;
} else {
debug!("not valid email");
}
}),
);
continue;
} else {
if was_seen && !is_seen {
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
}
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
debug!("not valid email");
}
} else {
if was_seen && !is_seen {
*mailbox_counts[&mailbox_hash].0.lock().unwrap() += 1;
}
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: get_path_hash!(dest),
kind: Rename(old_hash, new_hash),
}));
debug!("contains_new_key");
if old_flags != new_flags {
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
}),
);
debug!("contains_new_key");
if old_flags != new_flags {
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: get_path_hash!(dest),
kind: NewFlags(new_hash, (new_flags, vec![])),
}));
}
}),
);
}
/* Maybe a re-read should be triggered here just to be safe.
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: get_path_hash!(dest),
kind: Rescan,
}));
*/
}
/* Trigger rescan of mailbox */
DebouncedEvent::Rescan => {
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
/* Maybe a re-read should be triggered here just to be safe.
(sender)(account_hash, BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: get_path_hash!(dest),
kind: Rescan,
}));
*/
}
/* Trigger rescan of mailbox */
DebouncedEvent::Rescan => {
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: root_mailbox_hash,
kind: Rescan,
}));
}
_ => {}
},
Err(e) => debug!("watch error: {:?}", e),
}
}),
);
}
_ => {}
},
Err(e) => debug!("watch error: {:?}", e),
}
})?;
Ok(handle.thread().id())
}
Ok(())
}))
}
fn operation(&self, hash: EnvelopeHash) -> Result<Box<dyn BackendOp>> {
@ -832,8 +858,8 @@ impl MailBackend for MaildirType {
};
self.mailboxes.insert(mailbox_hash, new_mailbox);
let ret = Ok((mailbox_hash, self.mailboxes()?));
Ok(Box::pin(async { ret }))
let ret = self.mailboxes()?;
Ok(Box::pin(async move { Ok((mailbox_hash, ret.await?)) }))
}
fn delete_mailbox(
@ -1019,7 +1045,12 @@ impl MaildirType {
}))
}
pub fn multicore(&mut self, cores: usize, mailbox_hash: MailboxHash) -> Async<Result<Vec<Envelope>>> {
/*
pub fn multicore(
&mut self,
cores: usize,
mailbox_hash: MailboxHash,
) -> Async<Result<Vec<Envelope>>> {
let mut w = AsyncBuilder::new();
let cache_dir = xdg::BaseDirectories::with_profile("meli", &self.name).unwrap();
@ -1199,6 +1230,7 @@ impl MaildirType {
};
w.build(handle)
}
*/
pub fn save_to_mailbox(mut path: PathBuf, bytes: Vec<u8>, flags: Option<Flag>) -> Result<()> {
for d in &["cur", "new", "tmp"] {

View File

@ -23,7 +23,6 @@
* https://wiki2.dovecot.org/MailboxFormat/mbox
*/
use crate::async_workers::{Async, AsyncBuilder, AsyncStatus, WorkContext};
use crate::backends::*;
use crate::conf::AccountSettings;
use crate::email::parser::BytesExt;
@ -707,96 +706,109 @@ impl MailBackend for MboxType {
CAPABILITIES
}
fn is_online(&self) -> Result<()> {
Ok(())
fn is_online(&self) -> ResultFuture<()> {
Ok(Box::pin(async { Ok(()) }))
}
fn fetch(&mut self, mailbox_hash: MailboxHash) -> Result<Async<Result<Vec<Envelope>>>> {
let mut w = AsyncBuilder::new();
let handle = {
let tx = w.tx();
let mailbox_index = self.mailbox_index.clone();
let mailboxes = self.mailboxes.clone();
let mailbox_path = mailboxes.lock().unwrap()[&mailbox_hash].fs_path.clone();
let prefer_mbox_type = self.prefer_mbox_type;
let closure = move |_work_context| {
let tx = tx.clone();
let file = match std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&mailbox_path)
{
Ok(f) => f,
Err(e) => {
tx.send(AsyncStatus::Payload(Err(MeliError::from(e))))
.unwrap();
return;
}
};
get_rw_lock_blocking(&file);
let mut buf_reader = BufReader::new(file);
let mut contents = Vec::new();
if let Err(e) = buf_reader.read_to_end(&mut contents) {
tx.send(AsyncStatus::Payload(Err(MeliError::from(e))))
.unwrap();
return;
};
let mailboxes_lck = mailboxes.lock().unwrap();
let index = mailboxes_lck[&mailbox_hash].index.clone();
fn fetch(
&mut self,
mailbox_hash: MailboxHash,
) -> Result<Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>> {
struct FetchState {
mailbox_hash: MailboxHash,
mailbox_index: Arc<Mutex<HashMap<EnvelopeHash, MailboxHash>>>,
mailboxes: Arc<Mutex<HashMap<MailboxHash, MboxMailbox>>>,
prefer_mbox_type: Option<MboxReader>,
offset: usize,
file_offset: usize,
contents: Vec<u8>,
}
impl FetchState {
async fn fetch(&mut self) -> Result<Option<Vec<Envelope>>> {
let mailboxes_lck = self.mailboxes.lock().unwrap();
let index = mailboxes_lck[&self.mailbox_hash].index.clone();
drop(mailboxes_lck);
let mut message_iter = MessageIterator {
index,
input: &contents.as_slice(),
offset: 0,
file_offset: 0,
reader: prefer_mbox_type,
input: &self.contents.as_slice(),
offset: self.offset,
file_offset: self.file_offset,
reader: self.prefer_mbox_type,
};
let mut err = None;
loop {
let mut payload = vec![];
'iter_for_loop: for _i in 0..150 {
match message_iter.next() {
Some(Ok(env)) => {
payload.push(env);
}
Some(Err(_err)) => {
debug!(&_err);
err = Some(_err);
}
None => {
break 'iter_for_loop;
}
let mut payload = vec![];
let mut done = false;
'iter_for_loop: for _i in 0..150 {
match message_iter.next() {
Some(Ok(env)) => {
payload.push(env);
}
Some(Err(_err)) => {
debug!(&_err);
}
None => {
done = true;
break 'iter_for_loop;
}
}
if !payload.is_empty() {
err = None;
} else {
break;
}
let mut mailbox_index_lck = mailbox_index.lock().unwrap();
for env in &payload {
mailbox_index_lck.insert(env.hash(), mailbox_hash);
}
tx.send(AsyncStatus::Payload(Ok(payload))).unwrap();
}
if let Some(err) = err {
tx.send(AsyncStatus::Payload(Err(err))).unwrap();
}
self.offset = message_iter.offset;
self.file_offset = message_iter.file_offset;
{
let mut mailbox_lock = mailboxes.lock().unwrap();
mailbox_lock
.entry(mailbox_hash)
.and_modify(|f| f.content = contents);
let mut mailbox_index_lck = self.mailbox_index.lock().unwrap();
for env in &payload {
mailbox_index_lck.insert(env.hash(), self.mailbox_hash);
}
}
tx.send(AsyncStatus::Finished).unwrap();
};
Box::new(closure)
if done {
if payload.is_empty() {
return Ok(None);
} else {
let mut mailbox_lock = self.mailboxes.lock().unwrap();
let contents = std::mem::replace(&mut self.contents, vec![]);
mailbox_lock
.entry(self.mailbox_hash)
.and_modify(|f| f.content = contents);
Ok(Some(payload))
}
} else {
Ok(Some(payload))
}
}
}
let mailboxes = self.mailboxes.clone();
let mailbox_path = mailboxes.lock().unwrap()[&mailbox_hash].fs_path.clone();
let file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&mailbox_path)?;
get_rw_lock_blocking(&file);
let mut buf_reader = BufReader::new(file);
let mut contents = Vec::new();
buf_reader.read_to_end(&mut contents)?;
let mut state = FetchState {
mailbox_hash,
mailboxes,
mailbox_index: self.mailbox_index.clone(),
prefer_mbox_type: self.prefer_mbox_type,
contents,
offset: 0,
file_offset: 0,
};
Ok(w.build(handle))
Ok(Box::pin(async_stream::try_stream! {
loop {
if let Some(res) = state.fetch().await.map_err(|err| {
debug!("fetch err {:?}", &err);
err})? {
yield res;
} else {
return;
}
}
}))
}
fn watch(&self, work_context: WorkContext) -> Result<std::thread::ThreadId> {
fn watch(&self) -> ResultFuture<()> {
let sender = self.event_consumer.clone();
let (tx, rx) = channel();
let mut watcher = watcher(tx, std::time::Duration::from_secs(10))
@ -817,165 +829,154 @@ impl MailBackend for MboxType {
let mailboxes = self.mailboxes.clone();
let mailbox_index = self.mailbox_index.clone();
let prefer_mbox_type = self.prefer_mbox_type;
let handle = std::thread::Builder::new()
.name(format!("watching {}", self.account_name,))
.spawn(move || {
// Move `watcher` in the closure's scope so that it doesn't get dropped.
let _watcher = watcher;
let _work_context = work_context;
let mailboxes = mailboxes;
loop {
match rx.recv() {
/*
* Event types:
*
* pub enum RefreshEventKind {
* Update(EnvelopeHash, Envelope), // Old hash, new envelope
* Create(Envelope),
* Remove(EnvelopeHash),
* Rescan,
* }
*/
Ok(event) => match event {
/* Update */
DebouncedEvent::NoticeWrite(pathbuf)
| DebouncedEvent::Write(pathbuf) => {
let mailbox_hash = get_path_hash!(&pathbuf);
let file = match std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&pathbuf)
{
Ok(f) => f,
Err(_) => {
continue;
}
};
get_rw_lock_blocking(&file);
let mut mailbox_lock = mailboxes.lock().unwrap();
let mut buf_reader = BufReader::new(file);
let mut contents = Vec::new();
if let Err(e) = buf_reader.read_to_end(&mut contents) {
debug!(e);
Ok(Box::pin(async move {
loop {
match rx.recv() {
/*
* Event types:
*
* pub enum RefreshEventKind {
* Update(EnvelopeHash, Envelope), // Old hash, new envelope
* Create(Envelope),
* Remove(EnvelopeHash),
* Rescan,
* }
*/
Ok(event) => match event {
/* Update */
DebouncedEvent::NoticeWrite(pathbuf) | DebouncedEvent::Write(pathbuf) => {
let mailbox_hash = get_path_hash!(&pathbuf);
let file = match std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&pathbuf)
{
Ok(f) => f,
Err(_) => {
continue;
};
if contents
.starts_with(mailbox_lock[&mailbox_hash].content.as_slice())
{
if let Ok((_, envelopes)) = mbox_parse(
mailbox_lock[&mailbox_hash].index.clone(),
&contents,
mailbox_lock[&mailbox_hash].content.len(),
prefer_mbox_type,
) {
let mut mailbox_index_lck = mailbox_index.lock().unwrap();
for env in envelopes {
mailbox_index_lck.insert(env.hash(), mailbox_hash);
(sender)(
}
};
get_rw_lock_blocking(&file);
let mut mailbox_lock = mailboxes.lock().unwrap();
let mut buf_reader = BufReader::new(file);
let mut contents = Vec::new();
if let Err(e) = buf_reader.read_to_end(&mut contents) {
debug!(e);
continue;
};
if contents.starts_with(mailbox_lock[&mailbox_hash].content.as_slice())
{
if let Ok((_, envelopes)) = mbox_parse(
mailbox_lock[&mailbox_hash].index.clone(),
&contents,
mailbox_lock[&mailbox_hash].content.len(),
prefer_mbox_type,
) {
let mut mailbox_index_lck = mailbox_index.lock().unwrap();
for env in envelopes {
mailbox_index_lck.insert(env.hash(), mailbox_hash);
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: RefreshEventKind::Create(Box::new(env)),
}),
);
}
mailbox_hash,
kind: RefreshEventKind::Create(Box::new(env)),
}),
);
}
} else {
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: RefreshEventKind::Rescan,
}),
);
}
mailbox_lock
.entry(mailbox_hash)
.and_modify(|f| f.content = contents);
}
/* Remove */
DebouncedEvent::NoticeRemove(pathbuf)
| DebouncedEvent::Remove(pathbuf) => {
if mailboxes
.lock()
.unwrap()
.values()
.any(|f| f.fs_path == pathbuf)
{
let mailbox_hash = get_path_hash!(&pathbuf);
(sender)(
} else {
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: RefreshEventKind::Failure(MeliError::new(
format!(
"mbox mailbox {} was removed.",
pathbuf.display()
),
)),
}),
);
return;
}
mailbox_hash,
kind: RefreshEventKind::Rescan,
}),
);
}
DebouncedEvent::Rename(src, dest) => {
if mailboxes
.lock()
.unwrap()
.values()
.any(|f| &f.fs_path == &src)
{
let mailbox_hash = get_path_hash!(&src);
(sender)(
mailbox_lock
.entry(mailbox_hash)
.and_modify(|f| f.content = contents);
}
/* Remove */
DebouncedEvent::NoticeRemove(pathbuf) | DebouncedEvent::Remove(pathbuf) => {
if mailboxes
.lock()
.unwrap()
.values()
.any(|f| f.fs_path == pathbuf)
{
let mailbox_hash = get_path_hash!(&pathbuf);
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: RefreshEventKind::Failure(MeliError::new(
format!(
"mbox mailbox {} was renamed to {}.",
src.display(),
dest.display()
),
)),
}),
);
return;
}
mailbox_hash,
kind: RefreshEventKind::Failure(MeliError::new(format!(
"mbox mailbox {} was removed.",
pathbuf.display()
))),
}),
);
return Ok(());
}
/* Trigger rescan of mailboxes */
DebouncedEvent::Rescan => {
for &mailbox_hash in mailboxes.lock().unwrap().keys() {
(sender)(
}
DebouncedEvent::Rename(src, dest) => {
if mailboxes
.lock()
.unwrap()
.values()
.any(|f| &f.fs_path == &src)
{
let mailbox_hash = get_path_hash!(&src);
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: RefreshEventKind::Rescan,
}),
);
}
return;
mailbox_hash,
kind: RefreshEventKind::Failure(MeliError::new(format!(
"mbox mailbox {} was renamed to {}.",
src.display(),
dest.display()
))),
}),
);
return Ok(());
}
_ => {}
},
Err(e) => debug!("watch error: {:?}", e),
}
}
/* Trigger rescan of mailboxes */
DebouncedEvent::Rescan => {
for &mailbox_hash in mailboxes.lock().unwrap().keys() {
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: RefreshEventKind::Rescan,
}),
);
}
return Ok(());
}
_ => {}
},
Err(e) => debug!("watch error: {:?}", e),
}
})?;
Ok(handle.thread().id())
}
Ok(())
}))
}
fn mailboxes(&self) -> Result<HashMap<MailboxHash, Mailbox>> {
Ok(self
fn mailboxes(&self) -> ResultFuture<HashMap<MailboxHash, Mailbox>> {
let ret = Ok(self
.mailboxes
.lock()
.unwrap()
.iter()
.map(|(h, f)| (*h, f.clone() as Mailbox))
.collect())
.collect());
Ok(Box::pin(async { ret }))
}
fn operation(&self, env_hash: EnvelopeHash) -> Result<Box<dyn BackendOp>> {

View File

@ -32,7 +32,6 @@ pub use operations::*;
mod connection;
pub use connection::*;
use crate::async_workers::{Async, WorkContext};
use crate::backends::*;
use crate::conf::AccountSettings;
use crate::email::*;
@ -185,7 +184,7 @@ impl MailBackend for NntpType {
}
}
fn fetch_async(
fn fetch(
&mut self,
mailbox_hash: MailboxHash,
) -> Result<Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>> {
@ -211,11 +210,11 @@ impl MailBackend for NntpType {
}))
}
fn refresh_async(&mut self, _mailbox_hash: MailboxHash) -> ResultFuture<()> {
fn refresh(&mut self, _mailbox_hash: MailboxHash) -> ResultFuture<()> {
Err(MeliError::new("Unimplemented."))
}
fn mailboxes_async(&self) -> ResultFuture<HashMap<MailboxHash, Mailbox>> {
fn mailboxes(&self) -> ResultFuture<HashMap<MailboxHash, Mailbox>> {
let uid_store = self.uid_store.clone();
let connection = self.connection.clone();
Ok(Box::pin(async move {
@ -229,12 +228,12 @@ impl MailBackend for NntpType {
}))
}
fn is_online_async(&self) -> ResultFuture<()> {
fn is_online(&self) -> ResultFuture<()> {
let connection = self.connection.clone();
Ok(Box::pin(async move {
match timeout(std::time::Duration::from_secs(3), connection.lock()).await {
Ok(mut conn) => {
debug!("is_online_async");
debug!("is_online");
match debug!(timeout(std::time::Duration::from_secs(3), conn.connect()).await) {
Ok(Ok(())) => Ok(()),
Err(err) | Ok(Err(err)) => {
@ -248,23 +247,7 @@ impl MailBackend for NntpType {
}))
}
fn fetch(&mut self, _mailbox_hash: MailboxHash) -> Result<Async<Result<Vec<Envelope>>>> {
Err(MeliError::new("Unimplemented."))
}
fn refresh(&mut self, _mailbox_hash: MailboxHash) -> Result<Async<()>> {
Err(MeliError::new("Unimplemented."))
}
fn watch(&self, _work_context: WorkContext) -> Result<std::thread::ThreadId> {
Err(MeliError::new("Unimplemented."))
}
fn watch_async(&self) -> ResultFuture<()> {
Err(MeliError::new("Unimplemented."))
}
fn mailboxes(&self) -> Result<HashMap<MailboxHash, Mailbox>> {
fn watch(&self) -> ResultFuture<()> {
Err(MeliError::new("Unimplemented."))
}

View File

@ -19,7 +19,6 @@
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use crate::async_workers::{Async, AsyncBuilder, AsyncStatus, WorkContext};
use crate::backends::*;
use crate::conf::AccountSettings;
use crate::email::{Envelope, EnvelopeHash, Flag};
@ -333,71 +332,56 @@ impl MailBackend for NotmuchDb {
CAPABILITIES
}
fn is_online(&self) -> Result<()> {
Ok(())
fn is_online(&self) -> ResultFuture<()> {
Ok(Box::pin(async { Ok(()) }))
}
fn fetch(&mut self, mailbox_hash: MailboxHash) -> Result<Async<Result<Vec<Envelope>>>> {
let mut w = AsyncBuilder::new();
let database = NotmuchDb::new_connection(self.path.as_path(), self.lib.clone(), false);
let index = self.index.clone();
let mailbox_index = self.mailbox_index.clone();
let tag_index = self.tag_index.clone();
let mailboxes = self.mailboxes.clone();
let lib = self.lib.clone();
let handle = {
let tx = w.tx();
let closure = move |_work_context| {
if let Err(err) = database {
tx.send(AsyncStatus::Payload(Err(err))).unwrap();
tx.send(AsyncStatus::Finished).unwrap();
return;
}
let database = Arc::new(database.unwrap());
let database_lck = database.inner.read().unwrap();
let mailboxes_lck = mailboxes.read().unwrap();
let mailbox = mailboxes_lck.get(&mailbox_hash).unwrap();
fn fetch(
&mut self,
mailbox_hash: MailboxHash,
) -> Result<Pin<Box<dyn Stream<Item = Result<Vec<Envelope>>> + Send + 'static>>> {
struct FetchState {
mailbox_hash: MailboxHash,
database: Arc<DbConnection>,
index: Arc<RwLock<HashMap<EnvelopeHash, CString>>>,
mailbox_index: Arc<RwLock<HashMap<EnvelopeHash, SmallVec<[MailboxHash; 16]>>>>,
mailboxes: Arc<RwLock<HashMap<u64, NotmuchMailbox>>>,
tag_index: Arc<RwLock<BTreeMap<u64, String>>>,
lib: Arc<libloading::Library>,
iter: std::vec::IntoIter<CString>,
}
impl FetchState {
async fn fetch(&mut self) -> Result<Option<Vec<Envelope>>> {
let mut unseen_count = 0;
let query: Query =
match Query::new(lib.clone(), &database_lck, mailbox.query_str.as_str()) {
Ok(q) => q,
Err(err) => {
tx.send(AsyncStatus::Payload(Err(err))).unwrap();
tx.send(AsyncStatus::Finished).unwrap();
return;
}
};
let iter: Vec<<MessageIterator as Iterator>::Item> = match query.search() {
Ok(i) => i.collect(),
Err(err) => {
tx.send(AsyncStatus::Payload(Err(err))).unwrap();
tx.send(AsyncStatus::Finished).unwrap();
return;
}
};
{
let mut total_lck = mailbox.total.lock().unwrap();
let mut unseen_lck = mailbox.unseen.lock().unwrap();
*total_lck = iter.len();
*unseen_lck = 0;
}
let chunk_size = 250;
for chunk in iter.chunks(chunk_size) {
let mut ret: Vec<Envelope> = Vec::with_capacity(chunk_size);
let mut mailbox_index_lck = mailbox_index.write().unwrap();
for &message in chunk {
let mut mailbox_index_lck = self.mailbox_index.write().unwrap();
let mut ret: Vec<Envelope> = Vec::with_capacity(chunk_size);
let mut done: bool = false;
for _ in 0..chunk_size {
if let Some(message_id) = self.iter.next() {
let mut message: *mut notmuch_message_t = std::ptr::null_mut();
unsafe {
call!(self.lib, notmuch_database_find_message)(
*self.database.inner.read().unwrap(),
message_id.as_ptr(),
&mut message as *mut _,
)
};
if message.is_null() {
continue;
}
match notmuch_message_into_envelope(
lib.clone(),
index.clone(),
tag_index.clone(),
database.clone(),
self.lib.clone(),
self.index.clone(),
self.tag_index.clone(),
self.database.clone(),
message,
) {
Ok(env) => {
mailbox_index_lck
.entry(env.hash())
.or_default()
.push(mailbox_hash);
.push(self.mailbox_hash);
if !env.is_seen() {
unseen_count += 1;
}
@ -406,235 +390,290 @@ impl MailBackend for NotmuchDb {
Err(err) => {
debug!("could not parse message {:?} {}", err, {
let fs_path = unsafe {
call!(lib, notmuch_message_get_filename)(message)
call!(self.lib, notmuch_message_get_filename)(message)
};
let c_str = unsafe { CStr::from_ptr(fs_path) };
String::from_utf8_lossy(c_str.to_bytes())
});
}
}
} else {
done = true;
break;
}
{
let mut unseen_lck = mailbox.unseen.lock().unwrap();
*unseen_lck = unseen_count;
}
tx.send(AsyncStatus::Payload(Ok(ret))).unwrap();
}
tx.send(AsyncStatus::Finished).unwrap();
};
Box::new(closure)
{
let mailboxes_lck = self.mailboxes.read().unwrap();
let mailbox = mailboxes_lck.get(&self.mailbox_hash).unwrap();
let mut unseen_lck = mailbox.unseen.lock().unwrap();
*unseen_lck += unseen_count;
}
if done && ret.is_empty() {
Ok(None)
} else {
Ok(Some(ret))
}
}
}
let database = Arc::new(NotmuchDb::new_connection(
self.path.as_path(),
self.lib.clone(),
false,
)?);
let index = self.index.clone();
let mailbox_index = self.mailbox_index.clone();
let tag_index = self.tag_index.clone();
let mailboxes = self.mailboxes.clone();
let lib = self.lib.clone();
let v: Vec<CString>;
{
let database_lck = database.inner.read().unwrap();
let mailboxes_lck = mailboxes.read().unwrap();
let mailbox = mailboxes_lck.get(&mailbox_hash).unwrap();
let query: Query =
Query::new(self.lib.clone(), &database_lck, mailbox.query_str.as_str())?;
{
let mut total_lck = mailbox.total.lock().unwrap();
let mut unseen_lck = mailbox.unseen.lock().unwrap();
*total_lck = query.count()? as usize;
*unseen_lck = 0;
}
v = query
.search()?
.into_iter()
.map(|m| notmuch_message_insert(&lib, &index, m))
.collect();
}
let mut state = FetchState {
mailbox_hash,
mailboxes,
database,
lib,
index,
mailbox_index,
tag_index,
iter: v.into_iter(),
};
Ok(w.build(handle))
Ok(Box::pin(async_stream::try_stream! {
while let Some(res) = state.fetch().await.map_err(|err| { debug!("fetch err {:?}", &err); err})? {
yield res;
}
}))
}
fn watch(&self, _work_context: WorkContext) -> Result<std::thread::ThreadId> {
extern crate notify;
use crate::backends::RefreshEventKind::*;
use notify::{watcher, RecursiveMode, Watcher};
let sender = self.event_consumer.clone();
let (tx, rx) = std::sync::mpsc::channel();
let mut watcher = watcher(tx, std::time::Duration::from_secs(2)).unwrap();
watcher.watch(&self.path, RecursiveMode::Recursive).unwrap();
let path = self.path.clone();
let lib = self.lib.clone();
let tag_index = self.tag_index.clone();
let index = self.index.clone();
let account_hash = {
let mut hasher = DefaultHasher::new();
hasher.write(self.account_name.as_bytes());
hasher.finish()
};
let mailbox_index = self.mailbox_index.clone();
let mailboxes = self.mailboxes.clone();
{
let database = NotmuchDb::new_connection(path.as_path(), lib.clone(), false)?;
let mut revision_uuid_lck = self.revision_uuid.write().unwrap();
*revision_uuid_lck = unsafe {
call!(lib, notmuch_database_get_revision)(
*database.inner.read().unwrap(),
std::ptr::null_mut(),
)
/*
fn watch(&self) -> ResultFuture<()> {
extern crate notify;
use crate::backends::RefreshEventKind::*;
use notify::{watcher, RecursiveMode, Watcher};
let sender = self.event_consumer.clone();
let (tx, rx) = std::sync::mpsc::channel();
let mut watcher = watcher(tx, std::time::Duration::from_secs(2)).unwrap();
watcher.watch(&self.path, RecursiveMode::Recursive).unwrap();
let path = self.path.clone();
let lib = self.lib.clone();
let tag_index = self.tag_index.clone();
let index = self.index.clone();
let account_hash = {
let mut hasher = DefaultHasher::new();
hasher.write(self.account_name.as_bytes());
hasher.finish()
};
}
let revision_uuid = self.revision_uuid.clone();
let mailbox_index = self.mailbox_index.clone();
let mailboxes = self.mailboxes.clone();
{
let database = NotmuchDb::new_connection(path.as_path(), lib.clone(), false)?;
let mut revision_uuid_lck = self.revision_uuid.write().unwrap();
let handle = std::thread::Builder::new()
.name(format!("watching {}", self.account_name))
.spawn(move || {
let _watcher = watcher;
let c = move |sender: &BackendEventConsumer| -> std::result::Result<(), MeliError> {
loop {
let _ = rx.recv().map_err(|err| err.to_string())?;
{
let database =
NotmuchDb::new_connection(path.as_path(), lib.clone(), false)?;
let database_lck = database.inner.read().unwrap();
let mut revision_uuid_lck = revision_uuid.write().unwrap();
*revision_uuid_lck = unsafe {
call!(lib, notmuch_database_get_revision)(
*database.inner.read().unwrap(),
std::ptr::null_mut(),
)
};
}
let revision_uuid = self.revision_uuid.clone();
let new_revision = unsafe {
call!(lib, notmuch_database_get_revision)(
*database_lck,
std::ptr::null_mut(),
)
};
if new_revision > *revision_uuid_lck {
let query_str =
format!("lastmod:{}..{}", *revision_uuid_lck, new_revision);
let query: Query =
Query::new(lib.clone(), &database_lck, &query_str)?;
drop(database_lck);
let iter = query.search()?;
let mut tag_lock = tag_index.write().unwrap();
let mailbox_index_lck = mailbox_index.write().unwrap();
let mailboxes_lck = mailboxes.read().unwrap();
let database = Arc::new(database);
for message in iter {
let msg_id = unsafe {
call!(lib, notmuch_message_get_message_id)(message)
};
let c_str = unsafe { CStr::from_ptr(msg_id) };
let env_hash = {
let mut hasher = DefaultHasher::default();
c_str.hash(&mut hasher);
hasher.finish()
};
if let Some(mailbox_hashes) = mailbox_index_lck.get(&env_hash) {
let tags: (Flag, Vec<String>) =
TagIterator::new(lib.clone(), message)
.collect_flags_and_tags();
for tag in tags.1.iter() {
let mut hasher = DefaultHasher::new();
hasher.write(tag.as_bytes());
let num = hasher.finish();
if !tag_lock.contains_key(&num) {
tag_lock.insert(num, tag.clone());
let handle = std::thread::Builder::new()
.name(format!("watching {}", self.account_name))
.spawn(move || {
let _watcher = watcher;
let c = move |sender: &BackendEventConsumer| -> std::result::Result<(), MeliError> {
loop {
let _ = rx.recv().map_err(|err| err.to_string())?;
{
let database =
NotmuchDb::new_connection(path.as_path(), lib.clone(), false)?;
let database_lck = database.inner.read().unwrap();
let mut revision_uuid_lck = revision_uuid.write().unwrap();
let new_revision = unsafe {
call!(lib, notmuch_database_get_revision)(
*database_lck,
std::ptr::null_mut(),
)
};
if new_revision > *revision_uuid_lck {
let query_str =
format!("lastmod:{}..{}", *revision_uuid_lck, new_revision);
let query: Query =
Query::new(lib.clone(), &database_lck, &query_str)?;
drop(database_lck);
let iter = query.search()?;
let mut tag_lock = tag_index.write().unwrap();
let mailbox_index_lck = mailbox_index.write().unwrap();
let mailboxes_lck = mailboxes.read().unwrap();
let database = Arc::new(database);
for message in iter {
let msg_id = unsafe {
call!(lib, notmuch_message_get_message_id)(message)
};
let c_str = unsafe { CStr::from_ptr(msg_id) };
let env_hash = {
let mut hasher = DefaultHasher::default();
c_str.hash(&mut hasher);
hasher.finish()
};
if let Some(mailbox_hashes) = mailbox_index_lck.get(&env_hash) {
let tags: (Flag, Vec<String>) =
TagIterator::new(lib.clone(), message)
.collect_flags_and_tags();
for tag in tags.1.iter() {
let mut hasher = DefaultHasher::new();
hasher.write(tag.as_bytes());
let num = hasher.finish();
if !tag_lock.contains_key(&num) {
tag_lock.insert(num, tag.clone());
}
}
for &mailbox_hash in mailbox_hashes {
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: NewFlags(env_hash, tags.clone()),
}),
);
}
} else {
match notmuch_message_into_envelope(
lib.clone(),
index.clone(),
tag_index.clone(),
database.clone(),
message,
) {
Ok(env) => {
for (&mailbox_hash, m) in mailboxes_lck.iter() {
let query_str = format!(
"{} id:{}",
m.query_str.as_str(),
c_str.to_string_lossy()
);
let database_lck =
database.inner.read().unwrap();
let query: Query = Query::new(
lib.clone(),
&database_lck,
&query_str,
)?;
if query.count().unwrap_or(0) > 0 {
let mut total_lck = m.total.lock().unwrap();
let mut unseen_lck =
m.unseen.lock().unwrap();
*total_lck += 1;
if !env.is_seen() {
*unseen_lck += 1;
}
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Create(Box::new(env.clone())),
}),
);
}
}
}
Err(err) => {
debug!("could not parse message {:?}", err);
}
}
}
for &mailbox_hash in mailbox_hashes {
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: NewFlags(env_hash, tags.clone()),
}),
);
}
} else {
match notmuch_message_into_envelope(
lib.clone(),
index.clone(),
tag_index.clone(),
database.clone(),
message,
) {
Ok(env) => {
for (&mailbox_hash, m) in mailboxes_lck.iter() {
let query_str = format!(
"{} id:{}",
m.query_str.as_str(),
c_str.to_string_lossy()
);
let database_lck =
database.inner.read().unwrap();
let query: Query = Query::new(
lib.clone(),
&database_lck,
&query_str,
)?;
if query.count().unwrap_or(0) > 0 {
}
drop(query);
let database_lck = database.inner.read().unwrap();
index.write().unwrap().retain(|&env_hash, msg_id| {
let mut message: *mut notmuch_message_t = std::ptr::null_mut();
if let Err(err) = unsafe {
try_call!(
lib,
call!(lib, notmuch_database_find_message)(
*database_lck,
msg_id.as_ptr(),
&mut message as *mut _,
)
)
} {
debug!(err);
false
} else {
if message.is_null() {
if let Some(mailbox_hashes) =
mailbox_index_lck.get(&env_hash)
{
for &mailbox_hash in mailbox_hashes {
let m = &mailboxes_lck[&mailbox_hash];
let mut total_lck = m.total.lock().unwrap();
let mut unseen_lck =
m.unseen.lock().unwrap();
*total_lck += 1;
if !env.is_seen() {
*unseen_lck += 1;
}
*total_lck = total_lck.saturating_sub(1);
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Create(Box::new(env.clone())),
kind: Remove(env_hash),
}),
);
}
}
}
Err(err) => {
debug!("could not parse message {:?}", err);
}
!message.is_null()
}
}
}
drop(query);
let database_lck = database.inner.read().unwrap();
index.write().unwrap().retain(|&env_hash, msg_id| {
let mut message: *mut notmuch_message_t = std::ptr::null_mut();
if let Err(err) = unsafe {
try_call!(
lib,
call!(lib, notmuch_database_find_message)(
*database_lck,
msg_id.as_ptr(),
&mut message as *mut _,
)
)
} {
debug!(err);
false
} else {
if message.is_null() {
if let Some(mailbox_hashes) =
mailbox_index_lck.get(&env_hash)
{
for &mailbox_hash in mailbox_hashes {
let m = &mailboxes_lck[&mailbox_hash];
let mut total_lck = m.total.lock().unwrap();
*total_lck = total_lck.saturating_sub(1);
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash,
kind: Remove(env_hash),
}),
);
}
}
}
!message.is_null()
}
});
});
*revision_uuid_lck = new_revision;
*revision_uuid_lck = new_revision;
}
}
}
}
};
};
if let Err(err) = c(&sender) {
(sender)(
account_hash,
BackendEvent::Refresh(RefreshEvent {
if let Err(err) = c(&sender) {
(sender)(
account_hash,
mailbox_hash: 0,
kind: Failure(err),
}),
);
}
})?;
Ok(handle.thread().id())
}
fn mailboxes(&self) -> Result<HashMap<MailboxHash, Mailbox>> {
Ok(self
BackendEvent::Refresh(RefreshEvent {
account_hash,
mailbox_hash: 0,
kind: Failure(err),
}),
);
}
})?;
Ok(handle.thread().id())
}
*/
fn mailboxes(&self) -> ResultFuture<HashMap<MailboxHash, Mailbox>> {
let ret = Ok(self
.mailboxes
.read()
.unwrap()
.iter()
.map(|(k, f)| (*k, BackendMailbox::clone(f)))
.collect())
.collect());
Ok(Box::pin(async { ret }))
}
fn operation(&self, hash: EnvelopeHash) -> Result<Box<dyn BackendOp>> {
@ -851,12 +890,13 @@ impl BackendOp for NotmuchOp {
}
}
pub struct MessageIterator {
pub struct MessageIterator<'query> {
lib: Arc<libloading::Library>,
messages: *mut notmuch_messages_t,
_ph: std::marker::PhantomData<*const Query<'query>>,
}
impl Iterator for MessageIterator {
impl Iterator for MessageIterator<'_> {
type Item = *mut notmuch_message_t;
fn next(&mut self) -> Option<Self::Item> {
if self.messages.is_null() {
@ -1011,7 +1051,7 @@ impl<'s> Query<'s> {
Ok(count)
}
fn search(&self) -> Result<MessageIterator> {
fn search(&'s self) -> Result<MessageIterator<'s>> {
let mut messages: *mut notmuch_messages_t = std::ptr::null_mut();
let status = unsafe {
call!(self.lib, notmuch_query_search_messages)(self.ptr, &mut messages as *mut _)
@ -1026,6 +1066,7 @@ impl<'s> Query<'s> {
Ok(MessageIterator {
messages,
lib: self.lib.clone(),
_ph: std::marker::PhantomData,
})
}
}
@ -1038,6 +1079,23 @@ impl Drop for Query<'_> {
}
}
/// Registers `message`'s message-id in `index`, keyed by the `EnvelopeHash`
/// derived from hashing the raw id bytes, and returns the id as an owned
/// `CString`.
///
/// Fix over the original: the original called `CStr::from_ptr(msg_id)` twice
/// (two redundant unsafe dereferences of the same pointer); the id is now
/// wrapped exactly once and reused for both the hash and the owned copy.
fn notmuch_message_insert(
    lib: &libloading::Library,
    index: &RwLock<HashMap<EnvelopeHash, CString>>,
    message: *mut notmuch_message_t,
) -> CString {
    let msg_id = unsafe { call!(lib, notmuch_message_get_message_id)(message) };
    // SAFETY: assumes notmuch returns a valid NUL-terminated string that stays
    // alive for the duration of this call — TODO confirm against the
    // notmuch_message_get_message_id documentation.
    let c_str = unsafe { CStr::from_ptr(msg_id) };
    // The envelope hash is the DefaultHasher digest of the message-id bytes.
    let env_hash = {
        let mut hasher = DefaultHasher::default();
        c_str.hash(&mut hasher);
        hasher.finish()
    };
    let owned: CString = c_str.into();
    index.write().unwrap().insert(env_hash, owned.clone());
    owned
}
fn notmuch_message_into_envelope(
lib: Arc<libloading::Library>,
index: Arc<RwLock<HashMap<EnvelopeHash, CString>>>,

View File

@ -31,7 +31,6 @@
//! values (see module `thread`)
//!
//! Other exports are
//! - Thread management (see module `async_workers`)
//! - Basic mail account configuration to use with `backends` (see module `conf`)
//! - Parser combinators (see module `parsec`)
//! - A `ShellExpandTrait` to expand paths like a shell.
@ -105,7 +104,6 @@ pub use self::logging::LoggingLevel::*;
pub use self::logging::*;
pub mod addressbook;
pub mod async_workers;
pub mod backends;
mod collection;
pub mod conf;

View File

@ -75,9 +75,6 @@ use crate::components::*;
pub mod conf;
use crate::conf::*;
pub mod workers;
use crate::workers::*;
#[cfg(feature = "sqlite3")]
pub mod sqlite3;
@ -468,9 +465,6 @@ fn run_app(opt: Opt) -> Result<()> {
state.check_accounts();
state.redraw();
},
ThreadEvent::NewThread(id, name) => {
state.new_thread(id, name);
},
ThreadEvent::JobFinished(id) => {
debug!("Job finished {}", id);
for account in state.context.accounts.values_mut() {

View File

@ -50,7 +50,7 @@ impl Component for StatusPanel {
self.draw_accounts(context);
let (width, height) = self.content.size();
{
let (_, y) = write_string_to_grid(
let (_, _) = write_string_to_grid(
"Worker threads",
&mut self.content,
self.theme_default.fg,
@ -59,6 +59,7 @@ impl Component for StatusPanel {
((1, 1), (width - 1, height - 1)),
Some(1),
);
/*
let mut y = y + 1;
let work_controller = context.work_controller().threads.lock().unwrap();
let mut workers: Vec<&Worker> = work_controller.values().collect::<Vec<&Worker>>();
@ -130,6 +131,7 @@ impl Component for StatusPanel {
y = y_off + 1;
}
*/
}
let (cols, rows) = (width!(area), height!(area));
self.cursor = (

View File

@ -350,18 +350,7 @@ impl FileSettings {
e.to_string()
))
})?;
let mut backends = melib::backends::Backends::new();
let plugin_manager = crate::plugins::PluginManager::new();
for (_, p) in s.plugins.clone() {
if crate::plugins::PluginKind::Backend == p.kind() {
crate::plugins::backend::PluginBackend::register(
plugin_manager.listener(),
p.clone(),
&mut backends,
);
}
}
let backends = melib::backends::Backends::new();
let Themes {
light: default_light,
dark: default_dark,

View File

@ -26,7 +26,6 @@
use super::{AccountConf, FileMailboxConf};
use crate::jobs::{JobChannel, JobExecutor, JobId, JoinHandle};
use indexmap::IndexMap;
use melib::async_workers::{Async, AsyncBuilder, AsyncStatus, WorkContext};
use melib::backends::*;
use melib::email::*;
use melib::error::{MeliError, Result};
@ -72,8 +71,6 @@ macro_rules! try_recv_timeout {
}};
}
pub type Worker = Option<Async<Result<Vec<Envelope>>>>;
#[derive(Debug)]
pub enum MailboxStatus {
Available,
@ -112,7 +109,6 @@ pub struct MailboxEntry {
pub name: String,
pub ref_mailbox: Mailbox,
pub conf: FileMailboxConf,
pub worker: Worker,
}
impl MailboxEntry {
@ -152,7 +148,6 @@ pub struct Account {
sent_mailbox: Option<MailboxHash>,
pub(crate) collection: Collection,
pub(crate) address_book: AddressBook,
pub(crate) work_context: WorkContext,
pub(crate) settings: AccountConf,
pub(crate) backend: Arc<RwLock<Box<dyn MailBackend>>>,
@ -354,7 +349,6 @@ impl Account {
name: String,
mut settings: AccountConf,
map: &Backends,
work_context: WorkContext,
job_executor: Arc<JobExecutor>,
sender: Sender<ThreadEvent>,
event_consumer: BackendEventConsumer,
@ -397,17 +391,18 @@ impl Account {
let mut active_jobs = HashMap::default();
let mut active_job_instants = BTreeMap::default();
if backend.capabilities().is_async {
if let Ok(mailboxes_job) = backend.mailboxes_async() {
if let Ok(online_job) = backend.is_online_async() {
let (rcvr, handle, job_id) =
job_executor.spawn_specialized(online_job.then(|_| mailboxes_job));
active_jobs.insert(job_id, JobRequest::Mailboxes(handle, rcvr));
active_job_instants.insert(std::time::Instant::now(), job_id);
}
if let Ok(mailboxes_job) = backend.mailboxes() {
if let Ok(online_job) = backend.is_online() {
let (rcvr, handle, job_id) = if backend.capabilities().is_async {
job_executor.spawn_specialized(online_job.then(|_| mailboxes_job))
} else {
job_executor.spawn_blocking(online_job.then(|_| mailboxes_job))
};
active_jobs.insert(job_id, JobRequest::Mailboxes(handle, rcvr));
active_job_instants.insert(std::time::Instant::now(), job_id);
}
}
let mut ret = Account {
Ok(Account {
hash,
name,
is_online: if !backend.capabilities().is_remote {
@ -422,7 +417,6 @@ impl Account {
address_book,
sent_mailbox: Default::default(),
collection: Default::default(),
work_context,
settings,
sender,
job_executor,
@ -431,21 +425,10 @@ impl Account {
event_queue: VecDeque::with_capacity(8),
backend_capabilities: backend.capabilities(),
backend: Arc::new(RwLock::new(backend)),
};
if !ret.backend_capabilities.is_remote && !ret.backend_capabilities.is_async {
ret.init(None)?;
}
Ok(ret)
})
}
fn init(&mut self, ref_mailboxes: Option<HashMap<MailboxHash, Mailbox>>) -> Result<()> {
let mut ref_mailboxes: HashMap<MailboxHash, Mailbox> = if let Some(v) = ref_mailboxes {
v
} else {
self.backend.read().unwrap().mailboxes()?
};
fn init(&mut self, mut ref_mailboxes: HashMap<MailboxHash, Mailbox>) -> Result<()> {
self.backend_capabilities = self.backend.read().unwrap().capabilities();
let mut mailbox_entries: IndexMap<MailboxHash, MailboxEntry> =
IndexMap::with_capacity_and_hasher(ref_mailboxes.len(), Default::default());
@ -493,7 +476,6 @@ impl Account {
name: f.path().to_string(),
status: MailboxStatus::None,
conf: conf.clone(),
worker: None,
},
);
} else {
@ -518,7 +500,6 @@ impl Account {
name: f.path().to_string(),
status: MailboxStatus::None,
conf: new,
worker: None,
},
);
}
@ -574,30 +555,22 @@ impl Account {
{
let total = entry.ref_mailbox.count().ok().unwrap_or((0, 0)).1;
entry.status = MailboxStatus::Parsing(0, total);
if self.backend_capabilities.is_async {
if let Ok(mailbox_job) = self.backend.write().unwrap().fetch_async(*h) {
let mailbox_job = mailbox_job.into_future();
let (rcvr, handle, job_id) =
self.job_executor.spawn_specialized(mailbox_job);
self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
StatusEvent::NewJob(job_id),
)))
.unwrap();
self.active_jobs
.insert(job_id, JobRequest::Fetch(*h, handle, rcvr));
self.active_job_instants
.insert(std::time::Instant::now(), job_id);
}
} else {
entry.worker =
match Account::new_worker(&f, &mut self.backend, &self.work_context) {
Ok(v) => v,
Err(err) => {
entry.status = MailboxStatus::Failed(err);
None
}
};
if let Ok(mailbox_job) = self.backend.write().unwrap().fetch(*h) {
let mailbox_job = mailbox_job.into_future();
let (rcvr, handle, job_id) = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(mailbox_job)
} else {
self.job_executor.spawn_blocking(mailbox_job)
};
self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
StatusEvent::NewJob(job_id),
)))
.unwrap();
self.active_jobs
.insert(job_id, JobRequest::Fetch(*h, handle, rcvr));
self.active_job_instants
.insert(std::time::Instant::now(), job_id);
}
}
});
@ -612,13 +585,9 @@ impl Account {
Ok(())
}
fn new_worker(
mailbox: &Mailbox,
backend: &Arc<RwLock<Box<dyn MailBackend>>>,
work_context: &WorkContext,
) -> Result<Worker> {
fn new_worker(mailbox: &Mailbox, backend: &Arc<RwLock<Box<dyn MailBackend>>>) -> Result<()> {
let mailbox_hash = mailbox.hash();
let mut mailbox_handle = backend.write().unwrap().fetch(mailbox_hash)?;
let mailbox_handle = backend.write().unwrap().fetch(mailbox_hash)?;
let priority = match mailbox.special_usage() {
SpecialUsageMailbox::Inbox => 0,
SpecialUsageMailbox::Sent => 1,
@ -822,31 +791,7 @@ impl Account {
return Some(EnvelopeRemove(env_hash, thread_hash));
}
RefreshEventKind::Rescan => {
let handle = match Account::new_worker(
&self.mailbox_entries[&mailbox_hash].ref_mailbox,
&mut self.backend,
&self.work_context,
) {
Ok(v) => v,
Err(err) => {
let ret = Some(Notification(
None,
err.to_string(),
Some(crate::types::NotificationType::ERROR),
));
self.mailbox_entries
.entry(mailbox_hash)
.and_modify(|entry| {
entry.status = MailboxStatus::Failed(err);
});
return ret;
}
};
self.mailbox_entries
.entry(mailbox_hash)
.and_modify(|entry| {
entry.worker = handle;
});
self.watch();
}
RefreshEventKind::Failure(err) => {
debug!("RefreshEvent Failure: {}", err.to_string());
@ -891,62 +836,40 @@ impl Account {
.unwrap();
return Ok(());
}
if self.backend_capabilities.is_async {
if let Ok(refresh_job) = self.backend.write().unwrap().refresh_async(mailbox_hash) {
let (rcvr, handle, job_id) = self.job_executor.spawn_specialized(refresh_job);
self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
StatusEvent::NewJob(job_id),
)))
.unwrap();
self.active_jobs
.insert(job_id, JobRequest::Refresh(mailbox_hash, handle, rcvr));
self.active_job_instants
.insert(std::time::Instant::now(), job_id);
}
} else {
let mut h = self.backend.write().unwrap().refresh(mailbox_hash)?;
self.work_context.new_work.send(h.work().unwrap()).unwrap();
if let Ok(refresh_job) = self.backend.write().unwrap().refresh(mailbox_hash) {
let (rcvr, handle, job_id) = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(refresh_job)
} else {
self.job_executor.spawn_blocking(refresh_job)
};
self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
StatusEvent::NewJob(job_id),
)))
.unwrap();
self.active_jobs
.insert(job_id, JobRequest::Refresh(mailbox_hash, handle, rcvr));
self.active_job_instants
.insert(std::time::Instant::now(), job_id);
}
Ok(())
}
pub fn watch(&mut self) {
if self.settings.account().manual_refresh {
return;
}
if self.backend_capabilities.is_async {
if !self.active_jobs.values().any(|j| j.is_watch()) {
match self.backend.read().unwrap().watch_async() {
Ok(fut) => {
let (handle, job_id) = self.job_executor.spawn(fut);
self.active_jobs.insert(job_id, JobRequest::Watch(handle));
}
Err(e) => {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
StatusEvent::DisplayMessage(e.to_string()),
)))
.unwrap();
}
if !self.active_jobs.values().any(|j| j.is_watch()) {
match self.backend.read().unwrap().watch() {
Ok(fut) => {
let (_channel, handle, job_id) = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(fut)
} else {
self.job_executor.spawn_blocking(fut)
};
self.active_jobs.insert(job_id, JobRequest::Watch(handle));
}
}
} else {
match self
.backend
.read()
.unwrap()
.watch(self.work_context.clone())
{
Ok(id) => {
self.sender
.send(ThreadEvent::NewThread(
id,
format!("watching {}", self.name()).into(),
))
.unwrap();
}
Err(e) => {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
@ -994,95 +917,37 @@ impl Account {
if mailbox_hash == 0 {
return Err(0);
}
loop {
match self
.mailbox_entries
.get_mut(&mailbox_hash)
.unwrap()
.worker
.as_mut()
match self.mailbox_entries[&mailbox_hash].status {
MailboxStatus::Available | MailboxStatus::Parsing(_, _)
if self
.collection
.mailboxes
.read()
.unwrap()
.contains_key(&mailbox_hash) =>
{
None => {
return match self.mailbox_entries[&mailbox_hash].status {
MailboxStatus::Available | MailboxStatus::Parsing(_, _)
if self
.collection
.mailboxes
.read()
.unwrap()
.contains_key(&mailbox_hash) =>
{
Ok(())
Ok(())
}
MailboxStatus::None => {
if !self.active_jobs.values().any(|j| j.is_fetch(mailbox_hash)) {
let mailbox_job = self.backend.write().unwrap().fetch(mailbox_hash);
match mailbox_job {
Ok(mailbox_job) => {
let mailbox_job = mailbox_job.into_future();
let (rcvr, handle, job_id) = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(mailbox_job)
} else {
self.job_executor.spawn_blocking(mailbox_job)
};
self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
StatusEvent::NewJob(job_id),
)))
.unwrap();
self.active_jobs
.insert(job_id, JobRequest::Fetch(mailbox_hash, handle, rcvr));
}
MailboxStatus::None => {
if self.backend_capabilities.is_async {
if !self.active_jobs.values().any(|j| j.is_fetch(mailbox_hash)) {
let mailbox_job =
self.backend.write().unwrap().fetch_async(mailbox_hash);
match mailbox_job {
Ok(mailbox_job) => {
let mailbox_job = mailbox_job.into_future();
let (rcvr, handle, job_id) =
self.job_executor.spawn_specialized(mailbox_job);
self.sender
.send(ThreadEvent::UIEvent(UIEvent::StatusEvent(
StatusEvent::NewJob(job_id),
)))
.unwrap();
self.active_jobs.insert(
job_id,
JobRequest::Fetch(mailbox_hash, handle, rcvr),
);
}
Err(err) => {
self.mailbox_entries.entry(mailbox_hash).and_modify(
|entry| {
entry.status = MailboxStatus::Failed(err);
},
);
self.sender
.send(ThreadEvent::UIEvent(UIEvent::StartupCheck(
mailbox_hash,
)))
.unwrap();
}
}
}
} else if self.mailbox_entries[&mailbox_hash].worker.is_none() {
let handle = match Account::new_worker(
&self.mailbox_entries[&mailbox_hash].ref_mailbox,
&mut self.backend,
&self.work_context,
) {
Ok(v) => v,
Err(err) => {
self.mailbox_entries.entry(mailbox_hash).and_modify(
|entry| {
entry.status = MailboxStatus::Failed(err);
},
);
return Err(0);
}
};
self.mailbox_entries
.entry(mailbox_hash)
.and_modify(|entry| {
entry.worker = handle;
});
}
self.collection.new_mailbox(mailbox_hash);
Err(0)
}
_ => Err(0),
};
}
Some(ref mut w) => match debug!(w.poll()) {
Ok(AsyncStatus::NoUpdate) => {
break;
}
Ok(AsyncStatus::Payload(payload)) => {
debug!("got payload in status for {}", mailbox_hash);
if let Err(err) = payload {
Err(err) => {
self.mailbox_entries
.entry(mailbox_hash)
.and_modify(|entry| {
@ -1091,85 +956,13 @@ impl Account {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::StartupCheck(mailbox_hash)))
.unwrap();
return Err(0);
}
let envelopes = payload
.unwrap()
.into_iter()
.map(|e| (e.hash(), e))
.collect::<HashMap<EnvelopeHash, Envelope>>();
self.mailbox_entries
.entry(mailbox_hash)
.and_modify(|entry| match entry.status {
MailboxStatus::None => {
entry.status = MailboxStatus::Parsing(envelopes.len(), 0);
}
MailboxStatus::Parsing(ref mut done, _) => {
*done += envelopes.len();
}
MailboxStatus::Failed(_) => {
entry.status = MailboxStatus::Parsing(envelopes.len(), 0);
}
MailboxStatus::Available => {}
});
if let Some(updated_mailboxes) =
self.collection
.merge(envelopes, mailbox_hash, self.sent_mailbox)
{
for f in updated_mailboxes {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::StartupCheck(f)))
.unwrap();
}
}
self.sender
.send(ThreadEvent::UIEvent(UIEvent::StartupCheck(mailbox_hash)))
.unwrap();
}
Ok(AsyncStatus::Finished) => {
debug!("got finished in status for {}", mailbox_hash);
self.mailbox_entries
.entry(mailbox_hash)
.and_modify(|entry| {
entry.status = MailboxStatus::Available;
entry.worker = None;
});
self.sender
.send(ThreadEvent::UIEvent(UIEvent::MailboxUpdate((
self.hash,
mailbox_hash,
))))
.unwrap();
}
Ok(AsyncStatus::ProgressReport(n)) => {
self.mailbox_entries
.entry(mailbox_hash)
.and_modify(|entry| match entry.status {
MailboxStatus::Parsing(ref mut d, _) => {
*d += n;
}
_ => {}
});
//return Err(n);
}
_ => {
break;
}
},
};
}
if self.mailbox_entries[&mailbox_hash].status.is_available()
|| (self.mailbox_entries[&mailbox_hash].status.is_parsing()
&& self
.collection
.mailboxes
.read()
.unwrap()
.contains_key(&mailbox_hash))
{
Ok(())
} else {
Err(0)
}
self.collection.new_mailbox(mailbox_hash);
Err(0)
}
_ => Err(0),
}
}
@ -1403,14 +1196,7 @@ impl Account {
parent.ref_mailbox = mailboxes.remove(&parent_hash).unwrap();
});
}
let (status, worker) = match Account::new_worker(
&mailboxes[&mailbox_hash],
&mut self.backend,
&self.work_context,
) {
Ok(v) => (MailboxStatus::Parsing(0, 0), v),
Err(err) => (MailboxStatus::Failed(err), None),
};
let status = MailboxStatus::Parsing(0, 0);
self.mailbox_entries.insert(
mailbox_hash,
@ -1418,7 +1204,6 @@ impl Account {
name: mailboxes[&mailbox_hash].path().to_string(),
status,
conf: new,
worker,
ref_mailbox: mailboxes.remove(&mailbox_hash).unwrap(),
},
);
@ -1486,8 +1271,6 @@ impl Account {
&self.mailbox_entries,
&mut self.mailboxes_order,
);
// FIXME Kill worker as well
// FIXME remove from settings as well
Ok(format!(
@ -1577,34 +1360,21 @@ impl Account {
{
return self.is_online.clone();
}
if self.backend_capabilities.is_async {
if self.is_online.is_ok() && !timeout {
return Ok(());
}
if !self.active_jobs.values().any(JobRequest::is_online) {
let online_job = self.backend.read().unwrap().is_online_async();
if let Ok(online_job) = online_job {
let (rcvr, handle, job_id) = self.job_executor.spawn_specialized(online_job);
self.insert_job(job_id, JobRequest::IsOnline(handle, rcvr));
}
}
return self.is_online.clone();
} else {
let ret = self.backend.read().unwrap().is_online();
if ret.is_ok() != self.is_online.is_ok() {
if ret.is_ok() {
self.last_online_request = std::time::Instant::now();
self.init(None)?;
}
self.sender
.send(ThreadEvent::UIEvent(UIEvent::AccountStatusChange(
self.hash,
)))
.unwrap();
}
self.is_online = ret.clone();
ret
if self.is_online.is_ok() && !timeout {
return Ok(());
}
if !self.active_jobs.values().any(JobRequest::is_online) {
let online_job = self.backend.read().unwrap().is_online();
if let Ok(online_job) = online_job {
let (rcvr, handle, job_id) = if self.backend_capabilities.is_async {
self.job_executor.spawn_specialized(online_job)
} else {
self.job_executor.spawn_blocking(online_job)
};
self.insert_job(job_id, JobRequest::IsOnline(handle, rcvr));
}
}
return self.is_online.clone();
}
pub fn search(
@ -1657,8 +1427,7 @@ impl Account {
match self.active_jobs.remove(job_id).unwrap() {
JobRequest::Mailboxes(_, ref mut chan) => {
if let Some(mailboxes) = chan.try_recv().unwrap() {
if let Err(err) = mailboxes.and_then(|mailboxes| self.init(Some(mailboxes)))
{
if let Err(err) = mailboxes.and_then(|mailboxes| self.init(mailboxes)) {
if err.kind.is_authentication() {
self.sender
.send(ThreadEvent::UIEvent(UIEvent::Notification(
@ -1670,9 +1439,7 @@ impl Account {
self.is_online = Err(err);
return true;
}
if let Ok(mailboxes_job) =
self.backend.read().unwrap().mailboxes_async()
{
if let Ok(mailboxes_job) = self.backend.read().unwrap().mailboxes() {
let (rcvr, handle, job_id) =
self.job_executor.spawn_specialized(mailboxes_job);
self.active_jobs
@ -1699,7 +1466,6 @@ impl Account {
.entry(mailbox_hash)
.and_modify(|entry| {
entry.status = MailboxStatus::Available;
entry.worker = None;
});
self.sender
.send(ThreadEvent::UIEvent(UIEvent::MailboxUpdate((
@ -1789,7 +1555,7 @@ impl Account {
}
self.is_online = is_online;
}
if let Ok(online_job) = self.backend.read().unwrap().is_online_async() {
if let Ok(online_job) = self.backend.read().unwrap().is_online() {
let (rcvr, handle, job_id) =
self.job_executor.spawn_specialized(online_job);
self.active_jobs

View File

@ -157,8 +157,6 @@ impl JobExecutor {
F: Future<Output = Result<()>> + Send + 'static,
{
let job_id = JobId::new();
let _job_id = job_id;
let __job_id = job_id;
let finished_sender = self.sender.clone();
let injector = self.global_queue.clone();
// Create a task and schedule it for execution.
@ -166,11 +164,11 @@ impl JobExecutor {
async move {
let r = future.await;
finished_sender
.send(ThreadEvent::JobFinished(__job_id))
.send(ThreadEvent::JobFinished(job_id))
.unwrap();
r
},
move |task| injector.push(MeliTask { task, id: _job_id }),
move |task| injector.push(MeliTask { task, id: job_id }),
(),
);
task.schedule();
@ -191,8 +189,6 @@ impl JobExecutor {
let (sender, receiver) = oneshot::channel();
let finished_sender = self.sender.clone();
let job_id = JobId::new();
let _job_id = job_id;
let __job_id = job_id;
let injector = self.global_queue.clone();
// Create a task and schedule it for execution.
let (task, handle) = async_task::spawn(
@ -200,11 +196,11 @@ impl JobExecutor {
let res = future.await;
let _ = sender.send(res);
finished_sender
.send(ThreadEvent::JobFinished(__job_id))
.send(ThreadEvent::JobFinished(job_id))
.unwrap();
Ok(())
},
move |task| injector.push(MeliTask { task, id: _job_id }),
move |task| injector.push(MeliTask { task, id: job_id }),
(),
);
task.schedule();

View File

@ -63,9 +63,6 @@ use crate::components::*;
pub mod conf;
use crate::conf::*;
pub mod workers;
use crate::workers::*;
#[cfg(feature = "sqlite3")]
pub mod sqlite3;

View File

@ -28,7 +28,7 @@ use std::os::unix::net::{UnixListener, UnixStream};
use std::process::Stdio;
use uuid::Uuid;
pub mod backend;
//pub mod backend;
pub mod rpc;
pub use rpc::*;

View File

@ -191,17 +191,10 @@ impl MailBackend for PluginBackend {
Ok(w.build(handle))
}
fn refresh(&mut self, _mailbox_hash: MailboxHash) -> Result<Async<()>> {
Err(MeliError::new("Unimplemented."))
}
fn watch(&self, _work_context: WorkContext) -> Result<std::thread::ThreadId> {
Err(MeliError::new("Unimplemented."))
}
fn mailboxes(&self) -> Result<HashMap<MailboxHash, Mailbox>> {
fn mailboxes(&self) -> ResultFuture<HashMap<MailboxHash, Mailbox>> {
let mut ret: HashMap<MailboxHash, Mailbox> = Default::default();
ret.insert(0, Mailbox::default());
Ok(ret)
Ok(Box::pin(async { Ok(ret) }))
}
fn operation(&self, hash: EnvelopeHash) -> Result<Box<dyn BackendOp>> {

View File

@ -113,7 +113,6 @@ pub struct Context {
sender: Sender<ThreadEvent>,
receiver: Receiver<ThreadEvent>,
input_thread: InputHandler,
work_controller: WorkController,
job_executor: Arc<JobExecutor>,
pub children: Vec<std::process::Child>,
pub plugin_manager: PluginManager,
@ -167,10 +166,6 @@ impl Context {
let idx = self.accounts.get_index_of(&account_hash).unwrap();
self.is_online_idx(idx)
}
pub fn work_controller(&self) -> &WorkController {
&self.work_controller
}
}
/// A State object to manage and own components and components of the UI. `State` is responsible for
@ -246,6 +241,7 @@ impl State {
};
let mut plugin_manager = PluginManager::new();
for (_, p) in settings.plugins.clone() {
/*
if crate::plugins::PluginKind::Backend == p.kind() {
debug!("registering {:?}", &p);
crate::plugins::backend::PluginBackend::register(
@ -254,6 +250,7 @@ impl State {
&mut backends,
);
}
*/
plugin_manager.register(p)?;
}
@ -261,7 +258,6 @@ impl State {
let cols = termsize.0 as usize;
let rows = termsize.1 as usize;
let work_controller = WorkController::new(sender.clone());
let job_executor = Arc::new(JobExecutor::new(sender.clone()));
let accounts = {
settings
@ -281,7 +277,6 @@ impl State {
n.to_string(),
a_s.clone(),
&backends,
work_controller.get_context(),
job_executor.clone(),
sender.clone(),
BackendEventConsumer::new(Arc::new(
@ -346,7 +341,6 @@ impl State {
dirty_areas: VecDeque::with_capacity(5),
replies: VecDeque::with_capacity(5),
temp_files: Vec::new(),
work_controller,
job_executor,
children: vec![],
plugin_manager,
@ -422,15 +416,6 @@ impl State {
}
}
pub fn new_thread(&mut self, id: thread::ThreadId, name: String) {
self.context
.work_controller
.static_threads
.lock()
.unwrap()
.insert(id, name.into());
}
/// Switch back to the terminal's main screen (The command line the user sees before opening
/// the application)
pub fn switch_to_main_screen(&mut self) {

View File

@ -44,7 +44,6 @@ use melib::backends::{AccountHash, BackendEvent, MailboxHash};
use melib::{EnvelopeHash, RefreshEvent, ThreadHash};
use nix::unistd::Pid;
use std::fmt;
use std::thread;
use uuid::Uuid;
#[derive(Debug)]
@ -62,7 +61,6 @@ pub enum StatusEvent {
/// to the main process.
#[derive(Debug)]
pub enum ThreadEvent {
NewThread(thread::ThreadId, String),
/// User input.
Input((Key, Vec<u8>)),
/// User input and input as raw bytes.

View File

@ -1,385 +0,0 @@
/*
* meli
*
* Copyright 2017-2020 Manos Pitsidianakis
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
/*! Simple blocking job control.
*/
use crate::types::ThreadEvent;
use crossbeam::{
channel::{bounded, unbounded, Sender},
select,
};
use melib::async_workers::{Work, WorkContext};
use melib::datetime::{self, UnixTimestamp};
use melib::text_processing::Truncate;
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::Mutex;
use std::thread;
const MAX_WORKER: usize = 4;
/// Representation of a worker thread for use in `WorkController`. These values are to be displayed
/// to the user.
#[derive(Debug)]
pub struct Worker {
    /// Human-readable label for the thread (e.g. "idle-worker").
    pub name: String,
    /// Free-form description of what the thread is currently doing.
    pub status: String,
    /// Timestamp of the last name/status update received from this thread by
    /// the controller loop.
    pub heartbeat: UnixTimestamp,
}
impl From<String> for Worker {
    /// Builds a `Worker` named `val`, with an empty status and a heartbeat
    /// stamped at construction time.
    fn from(val: String) -> Self {
        Self {
            name: val,
            status: String::default(),
            heartbeat: datetime::now(),
        }
    }
}
pub struct WorkController {
    /// Pending jobs shared with the worker threads; see `WorkQueue`.
    pub queue: WorkQueue,
    /// Sending `true` here tells one worker thread to shut down; `Drop` sends
    /// one such message per registered worker.
    thread_end_tx: Sender<bool>,
    /// Worker threads that take up on jobs from self.queue
    pub threads: Arc<Mutex<HashMap<thread::ThreadId, Worker>>>,
    /// Special function threads that live indefinitely (eg watching a mailbox)
    pub static_threads: Arc<Mutex<HashMap<thread::ThreadId, Worker>>>,
    /// Channels handed to jobs so they can report back to the controller
    /// thread (new work, names, statuses, completion).
    work_context: WorkContext,
}
impl Drop for WorkController {
    /// On teardown, emit one exit signal per registered worker thread so every
    /// worker's receive loop terminates. Send results are discarded: a
    /// disconnected channel means the workers are already gone.
    fn drop(&mut self) {
        if let Ok(workers) = self.threads.lock() {
            workers.iter().for_each(|_| {
                let _ = self.thread_end_tx.send(true);
            });
        }
    }
}
#[derive(Clone)]
pub struct WorkQueue {
    /// Pending jobs; `add_work` keeps this vector sorted by job priority.
    inner: Arc<Mutex<Vec<Work>>>,
    /// Pinged with `true` whenever a new job is queued, to wake a worker.
    new_jobs_tx: Sender<bool>,
    /// Used to forward `is_static` jobs directly to the controller thread
    /// instead of queueing them.
    work_context: WorkContext,
}
impl WorkQueue {
    /// Creates an empty queue. `new_jobs_tx` is pinged whenever a job is
    /// enqueued; `work_context` is used to forward static jobs straight to the
    /// controller thread.
    fn new(new_jobs_tx: Sender<bool>, work_context: WorkContext) -> Self {
        Self {
            inner: Arc::new(Mutex::new(Vec::new())),
            new_jobs_tx,
            work_context,
        }
    }

    /// Removes and returns the front job — the one with the numerically
    /// smallest priority — or `None` if the queue is empty. Does not block
    /// waiting for work.
    ///
    /// # Panics
    /// Panics if the underlying mutex became poisoned. This is exceedingly
    /// unlikely.
    fn get_work(&self) -> Option<Work> {
        // try to get a lock on the mutex.
        let maybe_queue = self.inner.lock();
        if let Ok(mut queue) = maybe_queue {
            if queue.is_empty() {
                None
            } else {
                // Bug fix: use `remove(0)`, not `swap_remove(0)`. `add_work`
                // keeps the vector sorted by priority via binary search;
                // `swap_remove(0)` moves the *last* (lowest-priority) element
                // to the front, breaking both the priority ordering and every
                // subsequent `binary_search_by` insertion.
                Some(queue.remove(0))
            }
        } else {
            // poisoned mutex, some other thread holding the mutex has panicked!
            panic!("WorkQueue::get_work() tried to lock a poisoned mutex");
        }
    }

    // Both the controller (main thread) and workers can use this
    // function to add work to the queue.
    /// Adds `work` to the queue. Static jobs bypass the queue and are sent
    /// directly to the controller thread via `work_context`; regular jobs are
    /// inserted in priority order and one waiting worker is notified.
    ///
    /// # Panics
    /// Panics if the underlying mutex became poisoned. This is exceedingly
    /// unlikely.
    pub fn add_work(&self, work: Work) {
        if work.is_static {
            self.work_context.new_work.send(work).unwrap();
            return;
        }
        // As above, try to get a lock on the mutex.
        if let Ok(mut queue) = self.inner.lock() {
            /* Insert in position that maintains the queue sorted by priority */
            let pos = queue
                .binary_search_by(|probe| probe.cmp(&work))
                .unwrap_or_else(|p| p);
            queue.insert(pos, work);
            /* inform threads that new job is available */
            self.new_jobs_tx.send(true).unwrap();
        } else {
            panic!("WorkQueue::add_work() tried to lock a poisoned mutex");
        }
    }
}
impl WorkController {
pub fn new(pulse: Sender<ThreadEvent>) -> WorkController {
let (new_jobs_tx, new_jobs_rx) = unbounded();
/* create a channel for jobs to send new work to Controller thread */
let (new_work_tx, new_work_rx) = unbounded();
/* create a channel for jobs to set their names */
let (set_name_tx, set_name_rx) = unbounded();
/* create a channel for jobs to set their statuses */
let (set_status_tx, set_status_rx) = unbounded();
/* create a channel for jobs to announce their demise */
let (finished_tx, finished_rx) = unbounded();
/* each associated thread will hold a copy of this context item in order to communicate
* with the controller thread */
let work_context = WorkContext {
new_work: new_work_tx,
set_name: set_name_tx,
set_status: set_status_tx,
finished: finished_tx,
};
let queue: WorkQueue = WorkQueue::new(new_jobs_tx, work_context.clone());
let active_threads = Arc::new(AtomicUsize::new(MAX_WORKER));
// Create a SyncFlag to share whether or not there are more jobs to be done.
let (thread_end_tx, thread_end_rx) = bounded(1);
let threads_lock: Arc<Mutex<HashMap<thread::ThreadId, Worker>>> =
Arc::new(Mutex::new(HashMap::default()));
let static_threads_lock: Arc<Mutex<HashMap<thread::ThreadId, Worker>>> =
Arc::new(Mutex::new(HashMap::default()));
let mut threads = threads_lock.lock().unwrap();
/* spawn worker threads */
for _ in 0..MAX_WORKER {
/* Each worker thread will wait on two channels: thread_end and new_jobs. thread_end
* informs the worker that it should quit and new_jobs informs that there is a new job
* available inside the queue. Only one worker will get each job, and others will
* go back to waiting on the channels */
let thread_queue = queue.clone();
let active_threads = active_threads.clone();
let thread_end_rx = thread_end_rx.clone();
let new_jobs_rx = new_jobs_rx.clone();
let new_jobs_rx = new_jobs_rx.clone();
let work_context = work_context.clone();
let pulse = pulse.clone();
let handle = spawn_worker(
thread_queue,
active_threads,
thread_end_rx,
new_jobs_rx,
work_context,
pulse,
);
/* add the handle for the newly spawned thread to the list of handles */
threads.insert(handle.thread().id(), String::from("idle-worker").into());
}
/* drop lock */
drop(threads);
{
/* start controller thread */
let threads_lock = threads_lock.clone();
let _static_threads_lock = static_threads_lock.clone();
let thread_queue = queue.clone();
let thread_end_rx = thread_end_rx.clone();
let work_context = work_context.clone();
let handle = thread::spawn(move || 'control_loop: loop {
select! {
recv(thread_end_rx) -> _ => {
debug!("received thread_end_rx, quitting");
break 'control_loop;
},
recv(new_work_rx) -> work => {
if let Ok(work) = work {
if work.is_static {
let work_context = work_context.clone();
let handle = thread::spawn(move || work.compute(work_context));
_static_threads_lock.lock().unwrap().insert(handle.thread().id(), String::new().into());
} else {
if active_threads.load(Ordering::SeqCst) == 0 {
let handle = spawn_worker(
thread_queue.clone(),
active_threads.clone(),
thread_end_rx.clone(),
new_jobs_rx.clone(),
work_context.clone(),
pulse.clone(),
);
/* add the handle for the newly spawned thread to the list of handles */
threads_lock.lock().unwrap().insert(handle.thread().id(), String::from("idle-worker").into());
}
thread_queue.add_work(work);
}
}
}
recv(set_name_rx) -> new_name => {
if let Ok((thread_id, mut new_name)) = new_name {
new_name.truncate_at_boundary(256);
let mut threads = threads_lock.lock().unwrap();
let mut static_threads = _static_threads_lock.lock().unwrap();
let now = datetime::now();
if threads.contains_key(&thread_id) {
threads.entry(thread_id).and_modify(|e| {
e.name = new_name;
e.heartbeat = now;
});
} else if static_threads.contains_key(&thread_id) {
static_threads.entry(thread_id).and_modify(|e| {
e.name = new_name;
e.heartbeat = now;
});
} else {
static_threads.insert(thread_id, Worker { heartbeat: now, .. new_name.into() });
static_threads.entry(thread_id).and_modify(|e| {
e.heartbeat = now;
});
}
pulse.send(ThreadEvent::Pulse).unwrap();
}
}
recv(set_status_rx) -> new_status => {
if let Ok((thread_id, mut new_status)) = new_status {
new_status.truncate_at_boundary(256);
let mut threads = threads_lock.lock().unwrap();
let mut static_threads = _static_threads_lock.lock().unwrap();
let now = datetime::now();
if threads.contains_key(&thread_id) {
threads.entry(thread_id).and_modify(|e| {
e.status = new_status;
e.heartbeat = now;
});
} else if static_threads.contains_key(&thread_id) {
static_threads.entry(thread_id).and_modify(|e| {
e.status = new_status;
e.heartbeat = now;
});
debug!(&static_threads[&thread_id]);
} else {
static_threads.insert(thread_id, Worker { status: new_status, heartbeat: now, .. String::new().into() });
}
pulse.send(ThreadEvent::Pulse).unwrap();
}
}
recv(finished_rx) -> dead_thread_id => {
if let Ok(thread_id) = dead_thread_id {
let mut threads = threads_lock.lock().unwrap();
let mut static_threads = _static_threads_lock.lock().unwrap();
if threads.contains_key(&thread_id) {
threads.remove(&thread_id);
} else if static_threads.contains_key(&thread_id) {
static_threads.remove(&thread_id);
} else {
/* Nothing to do */
}
pulse.send(ThreadEvent::Pulse).unwrap();
}
}
}
});
let mut static_threads = static_threads_lock.lock().unwrap();
static_threads.insert(
handle.thread().id(),
"WorkController-thread".to_string().into(),
);
}
WorkController {
queue,
thread_end_tx,
threads: threads_lock,
static_threads: static_threads_lock,
work_context,
}
}
pub fn add_static_thread(&mut self, id: std::thread::ThreadId) {
self.static_threads
.lock()
.unwrap()
.insert(id, String::new().into());
}
pub fn get_context(&self) -> WorkContext {
self.work_context.clone()
}
}
/// Spawn one pool worker thread that blocks on two channels: `thread_end_rx`
/// (shutdown signal — the worker exits its loop) and `new_jobs_rx`
/// (notification that work was queued — the worker drains `thread_queue`).
///
/// NOTE(review): despite its name, `active_threads` appears to count *idle*
/// workers, not busy ones — it is decremented when a worker picks up work
/// (and on shutdown) and incremented once the queue is drained. The
/// controller thread spawns an extra worker when it reads 0. Confirm the
/// intended semantics before renaming.
fn spawn_worker(
    thread_queue: WorkQueue,
    active_threads: Arc<AtomicUsize>,
    thread_end_rx: crossbeam::Receiver<bool>,
    new_jobs_rx: crossbeam::Receiver<bool>,
    work_context: WorkContext,
    pulse: crossbeam::Sender<ThreadEvent>,
) -> std::thread::JoinHandle<()> {
    thread::spawn(move || 'work_loop: loop {
        debug!("Waiting for work");
        select! {
            recv(thread_end_rx) -> _ => {
                debug!("received thread_end_rx, quitting");
                // Leaving the pool: remove this worker from the idle count.
                active_threads.fetch_sub(1, Ordering::SeqCst);
                break 'work_loop;
            },
            recv(new_jobs_rx) -> _ => {
                // Going busy: this worker no longer counts as idle.
                active_threads.fetch_sub(1, Ordering::SeqCst);
                // Drain the queue. Several workers may be woken by the same
                // signal; only those that actually pop work do anything.
                while let Some(work) = thread_queue.get_work() {
                    debug!("Got some work");
                    work.compute(work_context.clone());
                    debug!("finished work");
                    // Reset this thread's UI-visible name/status, then pulse
                    // the main event loop so the listing refreshes.
                    work_context.set_name.send((std::thread::current().id(), "idle-worker".to_string())).unwrap();
                    work_context.set_status.send((std::thread::current().id(), "inactive".to_string())).unwrap();
                    pulse.send(ThreadEvent::Pulse).unwrap();
                    // Be polite between jobs so other threads get scheduled.
                    std::thread::yield_now();
                }
                // Queue drained: this worker is idle again.
                active_threads.fetch_add(1, Ordering::SeqCst);
            },
        }
    })
}