From 12509748f6cf7a34e0b15935729c59e31993eb61 Mon Sep 17 00:00:00 2001 From: Manos Pitsidianakis Date: Mon, 23 Dec 2019 17:08:57 +0200 Subject: [PATCH] Plugins WIP --- Cargo.lock | 34 +++++++ ui/Cargo.toml | 3 + ui/src/conf.rs | 5 + ui/src/lib.rs | 2 + ui/src/plugins.rs | 207 ++++++++++++++++++++++++++++++++++++++++ ui/src/sample-plugin.py | 133 ++++++++++++++++++++++++++ ui/src/state.rs | 7 ++ 7 files changed, 391 insertions(+) create mode 100644 ui/src/plugins.rs create mode 100755 ui/src/sample-plugin.py diff --git a/Cargo.lock b/Cargo.lock index 42da72a26..8c9356fb4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1357,6 +1357,34 @@ dependencies = [ "winreg 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "rmp" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "num-traits 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rmp-serde" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "rmp 0.8.8 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "rmpv" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "num-traits 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", + "rmp 0.8.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "rusqlite" version = "0.20.0" @@ -1817,6 +1845,9 @@ dependencies = [ "nom 3.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "notify 4.0.12 (registry+https://github.com/rust-lang/crates.io-index)", "notify-rust 3.6.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rmp 0.8.8 (registry+https://github.com/rust-lang/crates.io-index)", + "rmp-serde 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", + "rmpv 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "rusqlite 0.20.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.92 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2235,6 +2266,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum redox_users 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3fe5204c3a17e97dde73f285d49be585df59ed84b50a872baf416e73b62c3828" "checksum remove_dir_all 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4a83fa3702a688b9359eccba92d153ac33fd2e8462f9e0e3fdf155239ea7792e" "checksum reqwest 0.10.0-alpha.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e83b47defcad97ddbe592fd5fe49e16661f754b0ba5847cf41bcd870a2d338d7" +"checksum rmp 0.8.8 (registry+https://github.com/rust-lang/crates.io-index)" = "0f594cb7ff8f1c5a7907f6be91f15795c8301e0d5718eb007fb5832723dd716e" +"checksum rmp-serde 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4a31c0798045f039ace94e0166f76478b3ba83116ec7c9d4bc934c5b13b8df21" +"checksum rmpv 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "83caf745acbd99179ab6ce95398ddf548edfa2d3a99ff195248b30cd1524c43f" "checksum rusqlite 0.20.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2a194373ef527035645a1bc21b10dc2125f73497e6e155771233eb187aedd051" "checksum rustc-demangle 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)" = "a7f4dccf6f4891ebcc0c39f9b6eb1a83b9bf5d747cb439ec6fba4f3b977038af" "checksum rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" diff --git a/ui/Cargo.toml b/ui/Cargo.toml index bccf388bb..ae73b2504 100644 --- a/ui/Cargo.toml +++ b/ui/Cargo.toml @@ -27,6 +27,9 @@ text_processing = { path = "../text_processing", version = "*" } libc = {version = "0.2.59", features = ["extra_traits",]} nix = "0.15.0" rusqlite = {version = "0.20.0", optional =true } +rmp = "^0.8" +rmpv = "^0.4.2" +rmp-serde = "^0.14.0" [features] default = ["sqlite3"] diff --git a/ui/src/conf.rs b/ui/src/conf.rs index 9e3fb7981..f20f0eb0c 100644 --- a/ui/src/conf.rs +++ b/ui/src/conf.rs @@ -44,6 +44,7 @@ use self::default_vals::*; use self::notifications::NotificationsSettings; use self::terminal::TerminalSettings; use crate::pager::PagerSettings; +use crate::plugins::Plugin; use melib::backends::SpecialUsageMailbox; use melib::conf::{toggleflag_de, AccountSettings, FolderConf, ToggleFlag}; use melib::error::*; @@ -241,6 +242,8 @@ pub struct FileSettings { pgp: PGPSettings, #[serde(default)] terminal: TerminalSettings, + #[serde(default)] + plugins: HashMap, } #[derive(Debug, Clone, Default)] @@ -272,6 +275,7 @@ pub struct Settings { pub composing: ComposingSettings, pub pgp: PGPSettings, pub terminal: TerminalSettings, + pub plugins: HashMap, } impl FileSettings { @@ -395,6 +399,7 @@ impl Settings { composing: fs.composing, pgp: fs.pgp, terminal: fs.terminal, + plugins: fs.plugins, }) } } diff --git a/ui/src/lib.rs b/ui/src/lib.rs index 922bafb79..51465bd14 100644 --- a/ui/src/lib.rs +++ b/ui/src/lib.rs @@ -77,6 +77,8 @@ pub mod cache; pub mod mailcap; +pub mod plugins; + pub use crate::username::*; pub mod username { use libc; diff --git a/ui/src/plugins.rs b/ui/src/plugins.rs new file mode 100644 index 000000000..a661e853c --- /dev/null +++ b/ui/src/plugins.rs @@ -0,0 +1,207 @@ +/* + * meli - ui plugins + * + * Copyright 2019 Manos Pitsidianakis + * + * This file is part of meli. + * + * meli is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * meli is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with meli. If not, see . + */ + +use crate::workers::WorkController; +use melib::error::{MeliError, Result}; +use rmpv::{Value, ValueRef}; +use std::any::TypeId; +use std::collections::HashMap; +use std::io::{self, BufRead, BufReader}; +use std::io::{Read, Write}; +use std::os::unix::net::{UnixListener, UnixStream}; +use std::path::{Path, PathBuf}; +use std::process::{Command, Stdio}; +use std::thread; +use std::thread::ThreadId; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum PluginKind { + LongLived, + Ephemeral, +} + +impl Default for PluginKind { + fn default() -> Self { + Self::LongLived + } +} + +#[derive(Default, Debug, Clone, Serialize, Deserialize)] +pub struct Plugin { + kind: PluginKind, + executable: String, + name: String, +} + +#[derive(Debug)] +pub struct PluginManager { + plugins: HashMap, + instances: HashMap, + hooks: HashMap, +} + +impl Drop for PluginManager { + fn drop(&mut self) { + let _ = std::fs::remove_file("./soworkfile"); + } +} + +impl PluginManager { + pub fn new() -> Self { + let _ = std::fs::remove_file("./soworkfile"); + let listener = UnixListener::bind("./soworkfile").unwrap(); + debug!("bound"); + // accept connections and process them, spawning a new thread for each one + thread::spawn(move || { + debug!("spawn"); + let stream = listener.accept(); + debug!("socket stream {:?}", &stream); + match stream { + Ok((mut stream, _)) => { + debug!("socket stream {:?}", &stream); + /* connection succeeded */ + thread::spawn(move || { + debug!("socket listen {:?}", &stream); + debug!(initialize(stream)); + //let mut response = Vec::new(); + //debug!(stream.read_to_end(&mut response)); + //loop { + // debug!("pre-flush 1"); + // stream.flush(); + // debug!("post-flush 1"); + // if debug!(rmpv::decode::value::read_value(&mut stream)).is_err() { + // return; + // } + // debug!("post-read_value"); + // //debug!("socket response {}", unsafe { + // // String::from_utf8_lossy(&response) + // //}); + // stream.flush(); + // debug!("post-flush 2"); + // if debug!(rmpv::encode::write_value( + // &mut stream, + // &rmpv::Value::String("hello 2 u 2".into()) + // )) + // .is_err() + // { + // return; + // } + // debug!("post-write_value"); + //} + }); + } + Err(err) => { + /* connection failed */ + debug!(err); + } + } + }); + + PluginManager { + plugins: Default::default(), + instances: Default::default(), + hooks: Default::default(), + } + } + + pub fn register(&mut self, plugin: Plugin) -> Result<()> { + debug!(&plugin); + match plugin.kind { + PluginKind::LongLived => { + /* spawn thread */ + let parts = split_command!(&plugin.executable); + let mut child = std::process::Command::new(&parts[0]) + .args(&parts[1..]) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn()?; + + /* add thread to workcontroller */ + self.instances.insert(plugin.name.clone(), child); + self.plugins.insert(plugin.name.clone(), plugin); + /* send init message to plugin to register hooks */ + Ok(()) + } + PluginKind::Ephemeral => { + self.plugins.insert(plugin.name.clone(), plugin); + /* send init message to plugin to register hooks */ + Ok(()) + } + } + } + + pub fn register_hook(&mut self, hook: UIHook) { + self.hooks.insert(hook.name.clone(), hook); + } +} + +#[derive(Debug)] +pub struct UIHook { + name: String, + listeners: Vec, + kind: TypeId, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct PluginGreeting { + version: String, +} + +pub fn initialize(mut stream: UnixStream) -> Result<()> { + let greeting: std::result::Result = + rmp_serde::decode::from_read(&mut stream); + match debug!(greeting) { + Ok(greeting) => { + if greeting.version != "dev" { + return Err("Plugin is not compatible with our API (dev)".into()); + } + } + Err(err) => { + return Err(MeliError::new(err.to_string())); + } + } + + loop { + debug!("pre-flush 1"); + stream.flush(); + debug!("post-flush 1"); + if debug!(rmpv::decode::value::read_value(&mut stream)).is_err() { + break; + } + debug!("post-read_value"); + //debug!("socket response {}", unsafe { + // String::from_utf8_lossy(&response) + //}); + stream.flush(); + debug!("post-flush 2"); + if debug!(rmpv::encode::write_value( + &mut stream, + &rmpv::Value::String("hello 2 u 2".into()) + )) + .is_err() + { + break; + } + debug!("post-write_value"); + } + + return Ok(()); +} diff --git a/ui/src/sample-plugin.py b/ui/src/sample-plugin.py new file mode 100755 index 000000000..58d3e5538 --- /dev/null +++ b/ui/src/sample-plugin.py @@ -0,0 +1,133 @@ +#! /usr/bin/env python3 +""" +meli - sample plugin + +Copyright 2019 Manos Pitsidianakis + +This file is part of meli. + +meli is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +meli is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with meli. If not, see . +""" + +import msgpack +import socket +import time +import struct +import json +import sys + + +class IPCError(Exception): + pass + +class UnknownMessageClass(IPCError): + pass + +class InvalidSerialization(IPCError): + pass + +class ConnectionClosed(IPCError): + pass + + +def _read_objects(sock): + unpacker = msgpack.Unpacker() + ret = [] + #reader = socket.socket.makefile(sock, 'rb') + while True: + try: + buf = sock.recv(1024**2) + if not buf: + break + unpacker.feed(buf) + for o in unpacker: + ret.append(o) + except: + break + return ret + + #try: + # for unpack in unpacker: + # return unpack + #except Exception as e: + # print("_read_objects error ", e, file=sys.stderr,) + # return None + #finally: + # reader.flush() + +def _write_objects(sock, objects): + data = msgpack.packb(objects) + sock.sendall(data) + +def _recursive_subclasses(cls): + classmap = {} + for subcls in cls.__subclasses__(): + classmap[subcls.__name__] = subcls + classmap.update(_recursive_subclasses(subcls)) + return classmap + + +class Client(object): + def __init__(self, server_address): + self.addr = server_address + if isinstance(self.addr, str): + address_family = socket.AF_UNIX + else: + address_family = socket.AF_INET + self.sock = socket.socket(address_family, socket.SOCK_STREAM) + self.sock.setblocking(0) + + def connect(self): + try: + self.sock.connect(self.addr) + print("connected", file=sys.stderr) + except socket.error as msg: + print(msg,file=sys.stderr, ) + sys.exit(1) + + def close(self): + self.sock.close() + + def __enter__(self): + self.connect() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + + def send(self, objects): + _write_objects(self.sock, objects) + print("wrote object ", objects, file=sys.stderr) + return self.read() + + def read(self): + return _read_objects(self.sock) + +if __name__ == "__main__": + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + server_address = './soworkfile' + client = Client(server_address) + client.connect() + client.send({ "version": "dev" }) + counter = 0 + try: + while True: + message = "This is the message. And this is the well {}.".format(counter) + counter += 1 + time.sleep(0.05) + print('sending {!r}'.format(message),file=sys.stderr, ) + print('returned :', client.send(message), file=sys.stderr,) + except Exception as msg: + print(msg, file=sys.stderr,) + diff --git a/ui/src/state.rs b/ui/src/state.rs index d1237faf1..e46c41778 100644 --- a/ui/src/state.rs +++ b/ui/src/state.rs @@ -29,6 +29,7 @@ Input is received in the main loop from threads which listen on the stdin for us */ use super::*; +use crate::plugins::{Plugin, PluginManager}; use melib::backends::{FolderHash, NotifyFn}; use crossbeam::channel::{bounded, unbounded, Receiver, Sender}; @@ -96,6 +97,7 @@ pub struct Context { receiver: Receiver, input: InputHandler, work_controller: WorkController, + plugin_manager: PluginManager, pub temp_files: Vec, } @@ -209,6 +211,10 @@ impl State { let input_thread = unbounded(); let backends = Backends::new(); let settings = Settings::new()?; + let mut plugin_manager = PluginManager::new(); + for (_, p) in settings.plugins.clone() { + plugin_manager.register(p)?; + } let termsize = termion::terminal_size()?; let cols = termsize.0 as usize; @@ -274,6 +280,7 @@ impl State { replies: VecDeque::with_capacity(5), temp_files: Vec::new(), work_controller, + plugin_manager, sender, receiver,