From f6de511abdafb4cb08c0264df6e2c88cf88faaab Mon Sep 17 00:00:00 2001 From: Manos Pitsidianakis Date: Thu, 2 Jan 2020 00:13:18 +0200 Subject: [PATCH] plugin-backend: add BackendOp for PluginBackend --- ui/src/plugins.rs | 23 +++- ui/src/plugins/backend.rs | 169 ++++++++++++++++++++++--- ui/src/plugins/python3/libmeliapi.py | 48 ++++--- ui/src/plugins/python3/nntp-backend.py | 97 +++++++++----- ui/src/plugins/rpc.rs | 8 +- 5 files changed, 262 insertions(+), 83 deletions(-) diff --git a/ui/src/plugins.rs b/ui/src/plugins.rs index 75092f8e..a321ad63 100644 --- a/ui/src/plugins.rs +++ b/ui/src/plugins.rs @@ -20,7 +20,6 @@ */ use melib::error::{MeliError, Result}; -use rmpv::Value; use std::collections::HashMap; use std::io::Write; use std::os::unix::net::{UnixListener, UnixStream}; @@ -32,6 +31,7 @@ pub mod rpc; pub use rpc::*; pub const BACKEND_FN: i8 = 0; +pub const BACKEND_OP_FN: i8 = 1; #[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] pub enum PluginKind { @@ -74,6 +74,16 @@ pub struct PluginManager { impl Drop for PluginManager { fn drop(&mut self) { let _ = std::fs::remove_file("./soworkfile"); + for (k, c) in self.instances.iter_mut() { + if let Err(err) = debug!(c.kill()) { + eprintln!( + "Error: could not kill process {} spawned by plugin {} ({})", + c.id(), + &self.plugins[&self.sessions[k]].name, + err + ); + } + } } } @@ -170,7 +180,7 @@ impl PluginManager { .stdin(Stdio::piped()) .stdout(Stdio::piped()) .spawn()?; - let (mut stream, _) = self.listener.accept()?; + let (stream, _) = self.listener.accept()?; /* send init message to plugin to register hooks */ let session = Uuid::new_v4(); let channel = RpcChannel::new(stream, &session)?; @@ -224,21 +234,22 @@ impl PluginManager { PluginKind::LongLived => { debug!("listener: {}", l); let channel = self.streams.get_mut(l).unwrap(); - channel.write_ref(&rmpv::ValueRef::Binary(bytes.as_slice())); + channel.write_ref(&rmpv::ValueRef::Binary(bytes.as_slice()))?; let reply: Result = channel.from_read(); return reply; } PluginKind::Filter => { let parts = split_command!(&plugin.executable); - let child = std::process::Command::new(&parts[0]) + let mut child = std::process::Command::new(&parts[0]) .args(&parts[1..]) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .spawn()?; - let (mut stream, _) = self.listener.accept()?; + let (stream, _) = self.listener.accept()?; let mut channel = RpcChannel::new(stream, l)?; - channel.write_ref(&rmpv::ValueRef::Binary(bytes.as_slice())); + channel.write_ref(&rmpv::ValueRef::Binary(bytes.as_slice()))?; let reply: Result = channel.from_read(); + child.kill()?; return reply; } k => { diff --git a/ui/src/plugins/backend.rs b/ui/src/plugins/backend.rs index 2cd701c3..e1062ca0 100644 --- a/ui/src/plugins/backend.rs +++ b/ui/src/plugins/backend.rs @@ -32,25 +32,53 @@ use melib::error::{MeliError, Result}; use std::collections::BTreeMap; use std::sync::{Arc, Mutex, RwLock}; +// TODO replace with melib::Envelope after simplifying melib::Envelope's +// fields/interface/deserializing +#[derive(Debug, Clone, Serialize, Deserialize)] +struct SimpleEnvelope { + hash: EnvelopeHash, + subject: String, + from: String, + to: String, + date: String, + message_id: String, + references: String, +} + #[derive(Debug)] pub struct PluginBackend { plugin: Plugin, child: std::process::Child, channel: Arc>, + tag_index: Option>>>, is_online: Arc)>>, } +impl Drop for PluginBackend { + fn drop(&mut self) { + if let Err(err) = debug!(self.child.kill()) { + eprintln!( + "Error: could not kill process {} spawned by plugin {} ({})", + self.child.id(), + &self.plugin.name, + err + ); + } + } +} + impl MailBackend for PluginBackend { fn is_online(&self) -> Result<()> { if let Ok(mut is_online) = self.is_online.try_lock() { let now = std::time::Instant::now(); if now.duration_since(is_online.0) >= std::time::Duration::new(2, 0) { - let mut channel = self.channel.lock().unwrap(); - channel.write_ref(&rmpv::ValueRef::Ext(BACKEND_FN, b"is_online"))?; - debug!(channel.expect_ack())?; - let ret: PluginResult<()> = debug!(channel.from_read())?; - is_online.0 = now; - is_online.1 = ret.into(); + if let Ok(mut channel) = self.channel.try_lock() { + channel.write_ref(&rmpv::ValueRef::Ext(BACKEND_FN, b"is_online"))?; + debug!(channel.expect_ack())?; + let ret: PluginResult<()> = debug!(channel.from_read())?; + is_online.0 = now; + is_online.1 = ret.into(); + } } is_online.1.clone() } else { @@ -62,7 +90,7 @@ impl MailBackend for PluginBackend { fn get(&mut self, folder: &Folder) -> Async>> { let mut w = AsyncBuilder::new(); - let folder_hash = folder.hash(); + let _folder_hash = folder.hash(); let channel = self.channel.clone(); let handle = { let tx = w.tx(); @@ -73,13 +101,65 @@ impl MailBackend for PluginBackend { .unwrap(); channel.expect_ack().unwrap(); loop { - let read_val: Result>>> = + let read_val: Result>>> = debug!(channel.from_read()); match read_val.map(Into::into).and_then(std::convert::identity) { Ok(Some(a)) => { tx.send(AsyncStatus::Payload(Ok(a .into_iter() - .filter_map(|s| Envelope::from_bytes(s.as_bytes(), None).ok()) + .filter_map( + |SimpleEnvelope { + hash, + date, + from, + to, + subject, + message_id, + references, + }| { + let mut env = melib::Envelope::new(hash); + env.set_date(date.as_bytes()); + if let Ok(d) = melib::email::parser::date(date.as_bytes()) { + env.set_datetime(d); + } + env.set_message_id(message_id.as_bytes()); + let parse_result = + melib::email::parser::rfc2822address_list( + from.as_bytes(), + ); + if parse_result.is_done() { + let value = parse_result.to_full_result().unwrap(); + env.set_from(value); + } + let parse_result = + melib::email::parser::rfc2822address_list( + to.as_bytes(), + ); + if parse_result.is_done() { + let value = parse_result.to_full_result().unwrap(); + env.set_to(value); + } + let parse_result = + melib::email::parser::phrase(subject.as_bytes()); + if parse_result.is_done() { + let value = parse_result.to_full_result().unwrap(); + env.set_subject(value); + } + if !references.is_empty() { + let parse_result = melib::email::parser::references( + references.as_bytes(), + ); + if parse_result.is_done() { + for v in parse_result.to_full_result().unwrap() { + env.push_references(v); + } + } + env.set_references(references.as_bytes()); + } + + Some(env) + }, + ) .collect::>()))) .unwrap(); } @@ -109,28 +189,35 @@ impl MailBackend for PluginBackend { } fn watch( &self, - sender: RefreshEventConsumer, - work_context: WorkContext, + _sender: RefreshEventConsumer, + _work_context: WorkContext, ) -> Result { Err(MeliError::new("Unimplemented.")) } + fn folders(&self) -> Result> { let mut ret: FnvHashMap = Default::default(); ret.insert(0, Folder::default()); Ok(ret) } + fn operation(&self, hash: EnvelopeHash) -> Box { - unimplemented!() + Box::new(PluginOp { + hash, + channel: self.channel.clone(), + tag_index: self.tag_index.clone(), + bytes: None, + }) } - fn save(&self, bytes: &[u8], folder: &str, flags: Option) -> Result<()> { + fn save(&self, _bytes: &[u8], _folder: &str, _flags: Option) -> Result<()> { Err(MeliError::new("Unimplemented.")) } - fn create_folder(&mut self, name: String) -> Result { + fn create_folder(&mut self, _name: String) -> Result { Err(MeliError::new("Unimplemented.")) } fn tags(&self) -> Option>>> { - None + self.tag_index.clone() } fn as_any(&self) -> &dyn::std::any::Any { self @@ -156,7 +243,7 @@ impl PluginBackend { .stdin(Stdio::piped()) .stdout(Stdio::piped()) .spawn()?; - let (mut stream, _) = listener.accept()?; + let (stream, _) = listener.accept()?; /* send init message to plugin to register hooks */ let session = Uuid::new_v4(); let channel = RpcChannel::new(stream, &session)?; @@ -166,6 +253,7 @@ impl PluginBackend { child, plugin, channel: Arc::new(Mutex::new(channel)), + tag_index: None, is_online: Arc::new(Mutex::new((now, Err(MeliError::new("Unitialized"))))), })) } @@ -188,3 +276,52 @@ impl PluginBackend { ); } } + +#[derive(Debug)] +struct PluginOp { + hash: EnvelopeHash, + channel: Arc>, + tag_index: Option>>>, + bytes: Option, +} + +impl BackendOp for PluginOp { + fn description(&self) -> String { + String::new() + } + + fn as_bytes(&mut self) -> Result<&[u8]> { + if let Some(ref bytes) = self.bytes { + return Ok(bytes.as_bytes()); + } + + if let Ok(mut channel) = self.channel.try_lock() { + channel.write_ref(&rmpv::ValueRef::Ext(BACKEND_OP_FN, b"as_bytes"))?; + debug!(channel.expect_ack())?; + channel.write_ref(&rmpv::ValueRef::Integer(self.hash.into()))?; + debug!(channel.expect_ack())?; + let bytes: Result> = debug!(channel.from_read()); + self.bytes = Some(bytes.map(Into::into).and_then(std::convert::identity)?); + if let Some(ref bytes) = self.bytes { + debug!(Envelope::from_bytes(bytes.as_bytes(), None)); + } + Ok(self.bytes.as_ref().map(String::as_bytes).unwrap()) + } else { + Err(MeliError::new("busy")) + } + } + + fn fetch_flags(&self) -> Flag { + let flag = Flag::default(); + + flag + } + + fn set_flag(&mut self, __envelope: &mut Envelope, _f: Flag, _value: bool) -> Result<()> { + Err(MeliError::new("Unimplemented.")) + } + + fn set_tag(&mut self, _envelope: &mut Envelope, _tag: String, _value: bool) -> Result<()> { + Err(MeliError::new("Unimplemented.")) + } +} diff --git a/ui/src/plugins/python3/libmeliapi.py b/ui/src/plugins/python3/libmeliapi.py index 044b1a9a..6475f798 100644 --- a/ui/src/plugins/python3/libmeliapi.py +++ b/ui/src/plugins/python3/libmeliapi.py @@ -45,10 +45,7 @@ def _read_objects(sock): unpacker = msgpack.Unpacker() ret = [] #reader = socket.socket.makefile(sock, 'rb') - counter = 0 while True: - print("[libmeliapi]: _read_objects loop = ", counter, flush=True, file=sys.stderr) - counter += 1 try: buf = sock.recv(1024**2) if not buf: @@ -85,6 +82,9 @@ def _write_objects(sock, objects): sys.stderr.flush() if e.errno == errno.EWOULDBLOCK: break + elif e.errno == errno.EAGAIN: + time.sleep(0.001) + continue else: raise @@ -126,35 +126,37 @@ class Client(object): def send(self, objects): sys.stderr.flush() - print("[libmeliapi]: ", "stuck in send ", self.buffer, flush=True, file=sys.stderr, ) + #print("[libmeliapi]: ", "stuck in send ", self.buffer, flush=True, file=sys.stderr, ) _write_objects(self.sock, objects) - print("[libmeliapi]: ", "unstuck wrote objs", flush=True, file=sys.stderr, ) + #print("[libmeliapi]: ", "unstuck wrote objs", flush=True, file=sys.stderr, ) #print("[libmeliapi]: ", "wrote object ", objects, file=sys.stderr) - time.sleep(0.1) + time.sleep(0.001) def ack(self): sys.stderr.flush() _write_objects(self.sock, 0x06) - time.sleep(0.1) + time.sleep(0.001) def expect_ack(self): - print("[libmeliapi]: expect_ack, ", self.buffer, flush=True, file=sys.stderr, ) - read_list = _read_objects(self.sock) - time.sleep(0.1) - self.buffer.extend(read_list) - if len(self.buffer) > 0 and self.buffer.popleft() == 0x6: - print("[libmeliapi]: got_ack, ", self.buffer, flush=True, file=sys.stderr, ) - return - else: - raise "ACK expected" + #print("[libmeliapi]: expect_ack, ", self.buffer, flush=True, file=sys.stderr, ) + while True: + time.sleep(0.1) + read_list = _read_objects(self.sock) + self.buffer.extend(read_list) + try: + self.buffer.remove(0x6) + #print("[libmeliapi]: got_ack, ", self.buffer, flush=True, file=sys.stderr, ) + return + except ValueError: + pass def read(self): sys.stderr.flush() - print("[libmeliapi]: ", "stuck in read ", self.buffer, flush=True, file=sys.stderr, ) + #print("[libmeliapi]: ", "stuck in read ", self.buffer, flush=True, file=sys.stderr, ) read_list = _read_objects(self.sock) - time.sleep(0.1) + time.sleep(0.01) self.buffer.extend(read_list) - print("[libmeliapi]: ", "unstuck read self.buffer =", self.buffer, flush=True, file=sys.stderr, ) + #print("[libmeliapi]: ", "unstuck read self.buffer =", self.buffer, flush=True, file=sys.stderr, ) if len(self.buffer) > 0: return self.buffer.popleft() else: @@ -164,10 +166,14 @@ class Client(object): def backend_fn_type(self): return 0 - def backend_fn_ok_send(self, objects): + @property + def backend_op_fn_type(self): + return 1 + + def ok_send(self, objects): self.send({"t": "ok", "c": objects }) self.expect_ack() - def backend_fn_err_send(self, objects): + def err_send(self, objects): self.send({"t": "err", "c": objects }) self.expect_ack() diff --git a/ui/src/plugins/python3/nntp-backend.py b/ui/src/plugins/python3/nntp-backend.py index 37331eef..e8a8c24a 100755 --- a/ui/src/plugins/python3/nntp-backend.py +++ b/ui/src/plugins/python3/nntp-backend.py @@ -35,19 +35,70 @@ def chunks(iterable, n): except: break +class NNTPClient(libmeliapi.Client): + def __init__(self, stream_address, server_address, newsgroup): + super().__init__(stream_address) + self.bytes_cache = {} + self.conn = nntplib.NNTP(server_address) + self.newsgroup = newsgroup + def backend_req(self, req): + print("[nntp-plugin]: backend_req = ", req, flush=True, file=sys.stderr) + if req.data == b'is_online': + self.ok_send(None) + elif req.data == b'get': + resp, count, first, last, name = self.conn.group(self.newsgroup) + print('Group', name, 'has', count, 'articles, range', first, 'to', last, flush=True, file=sys.stderr) + + resp, overviews = self.conn.over((0, last)) + for chunk in chunks(iter(reversed(overviews)), 100): + ret = [] + for id, over in chunk: + #print(id, nntplib.decode_header(over['subject']), flush=True, file=sys.stderr) + env = {} + env["hash"] = id + env["subject"] = nntplib.decode_header(over["subject"]) + env["from"] = nntplib.decode_header(over["from"]) + env["date"] = nntplib.decode_header(over["date"]) + env["message_id"] = nntplib.decode_header(over["message-id"]) + env["references"] = nntplib.decode_header(over["references"]) + try: + env["to"] = nntplib.decode_header(over["to"]) + except KeyError: + env["to"] = self.newsgroup + ret.append(env) + print("ret len = ", len(ret), flush=True,file=sys.stderr) + self.ok_send(ret) + self.ok_send(None) + def backend_op_req(self, req): + print("[nntp-plugin]: backend_op_req = ", req, flush=True, file=sys.stderr) + if req.data == b'as_bytes': + _hash = self.read() + print("[nntp-plugin]: hash = ", _hash, flush=True, file=sys.stderr) + self.ack() + try: + try: + self.ok_send(self.bytes_cache[_hash]) + except KeyError: + resp, info = self.conn.article(_hash) + #print(_id, " line0 = ", str(info.lines[0], 'utf-8', 'ignore')) + elem = b'\n'.join(info.lines) + self.bytes_cache[_hash] = str(elem, 'utf-8', 'ignore') + self.ok_send(self.bytes_cache[_hash]) + except Exception as e: + self.err_send(str(e)) + if __name__ == "__main__": import importlib importlib.reload(libmeliapi) - server_address = './soworkfile' - client = libmeliapi.Client(server_address) + stream_address = './soworkfile' + server_address = 'news.gmane.org' + newsgroup = 'gmane.comp.python.committers' + client = NNTPClient(stream_address, server_address, newsgroup) client.connect() #client.setblocking(True) try: - counter = 0 while True: - print("[nntp-plugin]: loop = ", counter, flush=True, file=sys.stderr) - counter += 1 req = client.read() if req is None: time.sleep(0.15) @@ -57,36 +108,12 @@ if __name__ == "__main__": print("[nntp-plugin]: ", "req: ", req, flush=True, file=sys.stderr) sys.stderr.flush() if isinstance(req, msgpack.ExtType): + if req.code == client.backend_fn_type: + client.backend_req(req) + elif req.code == client.backend_op_fn_type: + client.backend_op_req(req) print("[nntp-plugin]: ", req, flush=True, file=sys.stderr) - if req.data == b'is_online': - client.backend_fn_ok_send(None) - elif req.data == b'get': - s = nntplib.NNTP('news.gmane.org') - resp, count, first, last, name = s.group('gmane.comp.python.committers') - print('Group', name, 'has', count, 'articles, range', first, 'to', last, flush=True, file=sys.stderr) - - resp, overviews = s.over((last - 9, last)) - ids = [] - for id, over in overviews: - ids.append(id) - print(id, nntplib.decode_header(over['subject']), flush=True, file=sys.stderr) - for chunk in chunks(iter(ids), 2): - ret = [] - for _id in chunk: - resp, info = s.article(_id) - #print(_id, " line0 = ", str(info.lines[0], 'utf-8', 'ignore')) - elem = b'\n'.join(info.lines) - ret.append(str(elem, 'utf-8', 'ignore')) - print("ret len = ", len(ret), flush=True,file=sys.stderr) - client.backend_fn_ok_send(ret) - time.sleep(0.85) - s.quit() - client.backend_fn_ok_send(None) #client.setblocking(True) time.sleep(0.15) - - - except Exception as msg: - print("[nntp-plugin]: ", msg, flush=True, file=sys.stderr,) - sys.stderr.flush() - + except: + raise RuntimeError("Something bad happened") diff --git a/ui/src/plugins/rpc.rs b/ui/src/plugins/rpc.rs index 1d3b91a0..0de0e64a 100644 --- a/ui/src/plugins/rpc.rs +++ b/ui/src/plugins/rpc.rs @@ -20,8 +20,6 @@ */ use super::*; -use rmp_serde::Deserializer; -use serde::{Deserialize, Serialize}; #[derive(Debug)] pub struct RpcChannel { @@ -36,7 +34,7 @@ pub struct PluginGreeting { } impl RpcChannel { - pub fn new(mut stream: UnixStream, session: &Uuid) -> Result { + pub fn new(stream: UnixStream, session: &Uuid) -> Result { let mut ret = RpcChannel { stream, session: session.clone(), @@ -88,7 +86,7 @@ impl RpcChannel { let ret: RpcResult = debug!(rmp_serde::decode::from_read(&mut self.stream)) .map_err(|err| MeliError::new(err.to_string()))?; let _ = self.stream.flush(); - self.ack(); + self.ack()?; debug!("read() ret={:?}", &ret); ret.into() } @@ -101,7 +99,7 @@ impl RpcChannel { let ret: Result = debug!(rmp_serde::decode::from_read(&mut self.stream)) .map_err(|err| MeliError::new(err.to_string())); let _ = self.stream.flush(); - self.ack(); + self.ack()?; debug!("read() ret={:?}", &ret); ret }