/* * meli - plugins * * Copyright 2019 Manos Pitsidianakis * * This file is part of meli. * * meli is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * meli is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with meli. If not, see . */ use super::*; use fnv::FnvHashMap; use melib::async_workers::{Async, AsyncBuilder, AsyncStatus, WorkContext}; use melib::backends::FolderHash; use melib::backends::{ Backend, BackendOp, Backends, Folder, MailBackend, RefreshEvent, RefreshEventConsumer, }; use melib::conf::AccountSettings; use melib::email::{Envelope, EnvelopeHash, Flag}; use melib::error::{MeliError, Result}; use std::collections::BTreeMap; use std::sync::{Arc, Mutex, RwLock}; // TODO replace with melib::Envelope after simplifying melib::Envelope's // fields/interface/deserializing #[derive(Debug, Clone, Serialize, Deserialize)] struct SimpleEnvelope { hash: EnvelopeHash, subject: String, from: String, to: String, date: String, message_id: String, references: String, } #[derive(Debug)] pub struct PluginBackend { plugin: Plugin, child: std::process::Child, channel: Arc>, tag_index: Option>>>, is_online: Arc)>>, } impl Drop for PluginBackend { fn drop(&mut self) { if let Err(err) = debug!(self.child.kill()) { eprintln!( "Error: could not kill process {} spawned by plugin {} ({})", self.child.id(), &self.plugin.name, err ); } } } impl MailBackend for PluginBackend { fn is_online(&self) -> Result<()> { if let Ok(mut is_online) = self.is_online.try_lock() { let now = std::time::Instant::now(); if now.duration_since(is_online.0) >= std::time::Duration::new(2, 0) { if let Ok(mut channel) = self.channel.try_lock() { channel.write_ref(&rmpv::ValueRef::Ext(BACKEND_FN, b"is_online"))?; debug!(channel.expect_ack())?; let ret: PluginResult<()> = debug!(channel.from_read())?; is_online.0 = now; is_online.1 = ret.into(); } } is_online.1.clone() } else { Err(MeliError::new("busy")) } } fn connect(&mut self) {} fn get(&mut self, folder: &Folder) -> Async>> { let mut w = AsyncBuilder::new(); let _folder_hash = folder.hash(); let channel = self.channel.clone(); let handle = { let tx = w.tx(); let closure = move |_work_context| { let mut channel = channel.lock().unwrap(); channel .write_ref(&rmpv::ValueRef::Ext(BACKEND_FN, b"get")) .unwrap(); channel.expect_ack().unwrap(); loop { let read_val: Result>>> = channel.from_read(); match read_val.map(Into::into).and_then(std::convert::identity) { Ok(Some(a)) => { tx.send(AsyncStatus::Payload(Ok(a .into_iter() .filter_map( |SimpleEnvelope { hash, date, from, to, subject, message_id, references, }| { let mut env = melib::Envelope::new(hash); env.set_date(date.as_bytes()); if let Ok(d) = melib::email::parser::date(date.as_bytes()) { env.set_datetime(d); } env.set_message_id(message_id.as_bytes()); let parse_result = melib::email::parser::rfc2822address_list( from.as_bytes(), ); if parse_result.is_done() { let value = parse_result.to_full_result().unwrap(); env.set_from(value); } let parse_result = melib::email::parser::rfc2822address_list( to.as_bytes(), ); if parse_result.is_done() { let value = parse_result.to_full_result().unwrap(); env.set_to(value); } let parse_result = melib::email::parser::phrase(subject.as_bytes()); if parse_result.is_done() { let value = parse_result.to_full_result().unwrap(); env.set_subject(value); } if !references.is_empty() { let parse_result = melib::email::parser::references( references.as_bytes(), ); if parse_result.is_done() { for v in parse_result.to_full_result().unwrap() { env.push_references(v); } } env.set_references(references.as_bytes()); } Some(env) }, ) .collect::>()))) .unwrap(); } Ok(None) => { tx.send(AsyncStatus::Finished).unwrap(); return; } Err(err) => { tx.send(AsyncStatus::Payload(Err(err))).unwrap(); tx.send(AsyncStatus::Finished).unwrap(); return; } }; } }; Box::new(closure) }; w.build(handle) } fn refresh( &mut self, _folder_hash: FolderHash, _sender: RefreshEventConsumer, ) -> Result>>> { Err(MeliError::new("Unimplemented.")) } fn watch( &self, _sender: RefreshEventConsumer, _work_context: WorkContext, ) -> Result { Err(MeliError::new("Unimplemented.")) } fn folders(&self) -> Result> { let mut ret: FnvHashMap = Default::default(); ret.insert(0, Folder::default()); Ok(ret) } fn operation(&self, hash: EnvelopeHash) -> Box { Box::new(PluginOp { hash, channel: self.channel.clone(), tag_index: self.tag_index.clone(), bytes: None, }) } fn save(&self, _bytes: &[u8], _folder: &str, _flags: Option) -> Result<()> { Err(MeliError::new("Unimplemented.")) } fn create_folder(&mut self, _name: String) -> Result { Err(MeliError::new("Unimplemented.")) } fn tags(&self) -> Option>>> { self.tag_index.clone() } fn as_any(&self) -> &dyn::std::any::Any { self } } impl PluginBackend { pub fn new( listener: UnixListener, plugin: Plugin, _s: &AccountSettings, _is_subscribed: Box bool>, ) -> Result> { if plugin.kind != PluginKind::Backend { return Err(MeliError::new(format!( "Error: Plugin `{}` is not a mail backend plugin, it's `{:?}`", &plugin.name, &plugin.kind ))); } let parts = split_command!(&plugin.executable); let child = std::process::Command::new(&parts[0]) .args(&parts[1..]) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .spawn()?; let (stream, _) = listener.accept()?; /* send init message to plugin to register hooks */ let session = Uuid::new_v4(); let channel = RpcChannel::new(stream, &session)?; let now = std::time::Instant::now() - std::time::Duration::from_secs(5); Ok(Box::new(PluginBackend { child, plugin, channel: Arc::new(Mutex::new(channel)), tag_index: None, is_online: Arc::new(Mutex::new((now, Err(MeliError::new("Unitialized"))))), })) } pub fn register(listener: UnixListener, plugin: Plugin, backends: &mut Backends) { backends.register( plugin.name.clone(), Backend { create_fn: Box::new(move || { let plugin = plugin.clone(); let listener = listener.try_clone().unwrap(); Box::new(move |f, i| { let plugin = plugin.clone(); let listener = listener.try_clone().unwrap(); PluginBackend::new(listener, plugin, f, i) }) }), validate_conf_fn: Box::new(|_| Ok(())), }, ); } } #[derive(Debug)] struct PluginOp { hash: EnvelopeHash, channel: Arc>, tag_index: Option>>>, bytes: Option, } impl BackendOp for PluginOp { fn description(&self) -> String { String::new() } fn as_bytes(&mut self) -> Result<&[u8]> { if let Some(ref bytes) = self.bytes { return Ok(bytes.as_bytes()); } if let Ok(mut channel) = self.channel.try_lock() { channel.write_ref(&rmpv::ValueRef::Ext(BACKEND_OP_FN, b"as_bytes"))?; debug!(channel.expect_ack())?; channel.write_ref(&rmpv::ValueRef::Integer(self.hash.into()))?; debug!(channel.expect_ack())?; let bytes: Result> = channel.from_read(); self.bytes = Some(bytes.map(Into::into).and_then(std::convert::identity)?); Ok(self.bytes.as_ref().map(String::as_bytes).unwrap()) } else { Err(MeliError::new("busy")) } } fn fetch_flags(&self) -> Flag { let flag = Flag::default(); flag } fn set_flag(&mut self, __envelope: &mut Envelope, _f: Flag, _value: bool) -> Result<()> { Err(MeliError::new("Unimplemented.")) } fn set_tag(&mut self, _envelope: &mut Envelope, _tag: String, _value: bool) -> Result<()> { Err(MeliError::new("Unimplemented.")) } }