Plugins WIP #2

async
Manos Pitsidianakis 2019-12-27 15:20:02 +02:00
parent 12509748f6
commit b964a6a033
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
15 changed files with 932 additions and 123 deletions

3
.gitignore vendored
View File

@ -6,3 +6,6 @@ target/
**/*.rs.bk
.gdb_history
*.log
__pycache__/
*.py[cod]

11
Cargo.lock generated
View File

@ -1383,6 +1383,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"num-traits 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
"rmp 0.8.8 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_bytes 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -1481,6 +1483,14 @@ name = "serde"
version = "1.0.99"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "serde_bytes"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"serde 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "serde_derive"
version = "1.0.92"
@ -2282,6 +2292,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403"
"checksum semver-parser 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
"checksum serde 1.0.99 (registry+https://github.com/rust-lang/crates.io-index)" = "fec2851eb56d010dc9a21b89ca53ee75e6528bab60c11e89d38390904982da9f"
"checksum serde_bytes 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)" = "325a073952621257820e7a3469f55ba4726d8b28657e7e36653d1c36dc2c84ae"
"checksum serde_derive 1.0.92 (registry+https://github.com/rust-lang/crates.io-index)" = "46a3223d0c9ba936b61c0d2e3e559e3217dbfb8d65d06d26e8b3c25de38bae3e"
"checksum serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)" = "5a23aa71d4a4d43fdbfaac00eff68ba8a06a51759a89ac3304323e800c4dd40d"
"checksum serde_urlencoded 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9ec5d77e2d4c73717816afac02670d5c4f534ea95ed430442cad02e7a6e32c97"

View File

@ -28,7 +28,7 @@ libc = {version = "0.2.59", features = ["extra_traits",]}
nix = "0.15.0"
rusqlite = {version = "0.20.0", optional =true }
rmp = "^0.8"
rmpv = "^0.4.2"
rmpv = { version = "^0.4.2", features=["with-serde",] }
rmp-serde = "^0.14.0"
[features]

View File

@ -42,6 +42,7 @@ enum ViewMode {
Url,
Attachment(usize),
Raw,
Ansi(RawBuffer),
Subview,
ContactSelector(Selector<Card>),
}
@ -53,6 +54,12 @@ impl Default for ViewMode {
}
impl ViewMode {
fn is_ansi(&self) -> bool {
match self {
ViewMode::Ansi(_) => true,
_ => false,
}
}
fn is_attachment(&self) -> bool {
match self {
ViewMode::Attachment(_) => true,
@ -315,6 +322,7 @@ impl MailView {
ret.push_str(&attachments[aidx].text());
ret
}
ViewMode::Ansi(_) => "Viewing attachment. Press `r` to return \n".to_string(),
}
}
@ -609,6 +617,17 @@ impl Component for MailView {
};
self.pager = Pager::from_string(text, Some(context), None, None);
}
ViewMode::Ansi(ref buf) => {
write_string_to_grid(
&format!("Viewing `{}`. Press `r` to return", buf.title()),
grid,
Color::Default,
Color::Default,
Attr::Default,
(set_y(upper_left, y), bottom_right),
Some(get_x(upper_left)),
);
}
_ => {
let text = {
self.attachment_to_text(&body, context)
@ -633,6 +652,9 @@ impl Component for MailView {
s.draw(grid, (set_y(upper_left, y), bottom_right), context);
}
}
ViewMode::Ansi(ref mut buf) => {
buf.draw(grid, (set_y(upper_left, y + 1), bottom_right), context);
}
_ => {
self.pager
.draw(grid, (set_y(upper_left, y), bottom_right), context);
@ -647,6 +669,11 @@ impl Component for MailView {
fn process_event(&mut self, event: &mut UIEvent, context: &mut Context) -> bool {
let shortcuts = self.get_shortcuts(context);
match self.mode {
ViewMode::Ansi(ref mut buf) => {
if buf.process_event(event, context) {
return true;
}
}
ViewMode::Subview => {
if let Some(s) = self.subview.as_mut() {
if s.process_event(event, context) {
@ -799,6 +826,7 @@ impl Component for MailView {
}
UIEvent::Input(ref key)
if (self.mode.is_attachment()
|| self.mode.is_ansi()
|| self.mode == ViewMode::Subview
|| self.mode == ViewMode::Url
|| self.mode == ViewMode::Raw)
@ -952,8 +980,35 @@ impl Component for MailView {
name_opt = name.as_ref().map(|n| n.clone());
}
if let Ok(binary) = binary {
let p =
create_temp_file(&decode(u, None), name_opt, None, true);
let p = create_temp_file(
&decode(u, None),
name_opt.as_ref().map(String::as_str),
None,
true,
);
match debug!(context.plugin_manager.activate_hook(
"attachment-view",
p.path().display().to_string().into_bytes()
)) {
Ok(crate::plugins::FilterResult::Ansi(s)) => {
if let Some(buf) =
crate::terminal::ansi::ansi_to_cellbuffer(&s)
{
let raw_buf = RawBuffer::new(buf, name_opt);
self.mode = ViewMode::Ansi(raw_buf);
self.dirty = true;
return true;
}
}
Ok(crate::plugins::FilterResult::UiMessage(s)) => {
context.replies.push_back(UIEvent::Notification(
None,
s,
Some(NotificationType::ERROR),
));
}
_ => {}
}
Command::new(&binary)
.arg(p.path())
.stdin(Stdio::piped())
@ -1318,6 +1373,8 @@ impl Component for MailView {
|| self.subview.as_ref().map(|p| p.is_dirty()).unwrap_or(false)
|| if let ViewMode::ContactSelector(ref s) = self.mode {
s.is_dirty()
} else if let ViewMode::Ansi(ref r) = self.mode {
r.is_dirty()
} else {
false
}
@ -1346,6 +1403,7 @@ impl Component for MailView {
let mut our_map = context.settings.shortcuts.envelope_view.key_values();
if !(self.mode.is_attachment()
|| self.mode.is_ansi()
|| self.mode == ViewMode::Subview
|| self.mode == ViewMode::Raw
|| self.mode == ViewMode::Url)

View File

@ -437,7 +437,7 @@ impl Component for EnvelopeView {
if let Ok(binary) = binary {
let p = create_temp_file(
&decode(u, None),
name.as_ref().map(|n| n.clone()),
name.as_ref().map(String::as_str),
None,
true,
);

View File

@ -2246,9 +2246,10 @@ impl<T: PartialEq + Debug + Clone + Sync + Send> Selector<T> {
}
}
#[derive(Debug)]
#[derive(Debug, Clone, PartialEq)]
pub struct RawBuffer {
pub buf: CellBuffer,
title: Option<String>,
cursor: (usize, usize),
dirty: bool,
}
@ -2329,11 +2330,18 @@ impl Component for RawBuffer {
}
impl RawBuffer {
pub fn new(buf: CellBuffer) -> Self {
pub fn new(buf: CellBuffer, title: Option<String>) -> Self {
RawBuffer {
buf,
title,
dirty: true,
cursor: (0, 0),
}
}
pub fn title(&self) -> &str {
self.title
.as_ref()
.map(String::as_str)
.unwrap_or("untitled")
}
}

View File

@ -148,10 +148,8 @@ impl From<FileAccount> for AccountConf {
let root_tmp = root_path
.components()
.last()
.unwrap()
.as_os_str()
.to_str()
.unwrap()
.and_then(|c| c.as_os_str().to_str())
.unwrap_or("")
.to_string();
if !acc.subscribed_folders.contains(&root_tmp) {
acc.subscribed_folders.push(root_tmp);
@ -339,7 +337,17 @@ impl FileSettings {
e.to_string()
))
})?;
let backends = melib::backends::Backends::new();
let mut backends = melib::backends::Backends::new();
let plugin_manager = crate::plugins::PluginManager::new();
for (_, p) in s.plugins.clone() {
if crate::plugins::PluginKind::Backend == p.kind() {
crate::plugins::backend::PluginBackend::register(
plugin_manager.listener(),
p.clone(),
&mut backends,
);
}
}
for (name, acc) in s.accounts {
let FileAccount {
root_folder,

View File

@ -19,23 +19,25 @@
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use crate::workers::WorkController;
use melib::error::{MeliError, Result};
use rmpv::{Value, ValueRef};
use std::any::TypeId;
use rmpv::Value;
use std::collections::HashMap;
use std::io::{self, BufRead, BufReader};
use std::io::{Read, Write};
use std::io::Write;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::thread;
use std::thread::ThreadId;
use std::process::Stdio;
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub mod backend;
pub mod rpc;
pub use rpc::*;
pub const BACKEND_FN: i8 = 0;
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub enum PluginKind {
LongLived,
Ephemeral,
Filter,
Backend,
}
impl Default for PluginKind {
@ -49,13 +51,24 @@ pub struct Plugin {
kind: PluginKind,
executable: String,
name: String,
#[serde(default)]
hooks: Vec<String>,
}
impl Plugin {
pub fn kind(&self) -> PluginKind {
self.kind
}
}
#[derive(Debug)]
pub struct PluginManager {
plugins: HashMap<String, Plugin>,
instances: HashMap<String, std::process::Child>,
sessions: HashMap<Uuid, String>,
instances: HashMap<Uuid, std::process::Child>,
streams: HashMap<Uuid, RpcChannel>,
hooks: HashMap<String, UIHook>,
listener: UnixListener,
}
impl Drop for PluginManager {
@ -68,57 +81,81 @@ impl PluginManager {
pub fn new() -> Self {
let _ = std::fs::remove_file("./soworkfile");
let listener = UnixListener::bind("./soworkfile").unwrap();
debug!("bound");
// accept connections and process them, spawning a new thread for each one
thread::spawn(move || {
debug!("spawn");
let stream = listener.accept();
debug!("socket stream {:?}", &stream);
match stream {
Ok((mut stream, _)) => {
debug!("socket stream {:?}", &stream);
/* connection succeeded */
thread::spawn(move || {
debug!("socket listen {:?}", &stream);
debug!(initialize(stream));
//let mut response = Vec::new();
//debug!(stream.read_to_end(&mut response));
//loop {
// debug!("pre-flush 1");
// stream.flush();
// debug!("post-flush 1");
// if debug!(rmpv::decode::value::read_value(&mut stream)).is_err() {
// return;
// }
// debug!("post-read_value");
// //debug!("socket response {}", unsafe {
// // String::from_utf8_lossy(&response)
// //});
// stream.flush();
// debug!("post-flush 2");
// if debug!(rmpv::encode::write_value(
// &mut stream,
// &rmpv::Value::String("hello 2 u 2".into())
// ))
// .is_err()
// {
// return;
// }
// debug!("post-write_value");
//}
});
/*
debug!("bound");
// accept connections and process them, spawning a new thread for each one
thread::spawn(move || {
debug!("spawn");
let stream = listener.accept();
debug!("socket stream {:?}", &stream);
match stream {
Ok((mut stream, _)) => {
debug!("socket stream {:?}", &stream);
/* connection succeeded */
thread::spawn(move || {
debug!("socket listen {:?}", &stream);
debug!(initialize(stream));
//let mut response = Vec::new();
//debug!(stream.read_to_end(&mut response));
//loop {
// debug!("pre-flush 1");
// stream.flush();
// debug!("post-flush 1");
// if debug!(rmpv::decode::value::read_value(&mut stream)).is_err() {
// return;
// }
// debug!("post-read_value");
// //debug!("socket response {}", unsafe {
// // String::from_utf8_lossy(&response)
// //});
// stream.flush();
// debug!("post-flush 2");
// if debug!(rmpv::encode::write_value(
// &mut stream,
// &rmpv::Value::String("hello 2 u 2".into())
// ))
// .is_err()
// {
// return;
// }
// debug!("post-write_value");
//}
});
}
Err(err) => {
/* connection failed */
debug!(err);
}
}
Err(err) => {
/* connection failed */
debug!(err);
}
}
});
});
*/
let mut hooks: HashMap<String, UIHook> = Default::default();
hooks.insert(
"attachment-view".to_string(),
UIHook {
name: "attachment-view".to_string(),
wait_response: true,
listeners: Vec::new(),
},
);
hooks.insert(
"refresh-account".to_string(),
UIHook {
name: "refresh-account".to_string(),
wait_response: false,
listeners: Vec::new(),
},
);
PluginManager {
plugins: Default::default(),
sessions: Default::default(),
instances: Default::default(),
hooks: Default::default(),
streams: Default::default(),
hooks,
listener,
}
}
@ -128,19 +165,38 @@ impl PluginManager {
PluginKind::LongLived => {
/* spawn thread */
let parts = split_command!(&plugin.executable);
let mut child = std::process::Command::new(&parts[0])
let child = std::process::Command::new(&parts[0])
.args(&parts[1..])
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;
let (mut stream, _) = self.listener.accept()?;
/* send init message to plugin to register hooks */
let session = Uuid::new_v4();
let channel = RpcChannel::new(stream, &session)?;
/* add thread to workcontroller */
self.instances.insert(plugin.name.clone(), child);
for h in &plugin.hooks {
self.add_listener(h, session.clone());
}
self.instances.insert(session.clone(), child);
self.sessions.insert(session.clone(), plugin.name.clone());
self.streams.insert(session.clone(), channel);
self.plugins.insert(plugin.name.clone(), plugin);
Ok(())
}
PluginKind::Filter => {
let session = Uuid::new_v4();
for h in &plugin.hooks {
self.add_listener(h, session.clone());
}
self.sessions.insert(session.clone(), plugin.name.clone());
self.plugins.insert(plugin.name.clone(), plugin);
/* send init message to plugin to register hooks */
Ok(())
}
PluginKind::Ephemeral => {
PluginKind::Backend => {
self.plugins.insert(plugin.name.clone(), plugin);
/* send init message to plugin to register hooks */
Ok(())
@ -151,57 +207,67 @@ impl PluginManager {
pub fn register_hook(&mut self, hook: UIHook) {
self.hooks.insert(hook.name.clone(), hook);
}
pub fn add_listener(&mut self, hook: &str, session: Uuid) {
self.hooks
.entry(hook.to_string())
.and_modify(|entry| entry.listeners.push(session));
}
pub fn activate_hook(&mut self, hook: &str, bytes: Vec<u8>) -> Result<FilterResult> {
debug!("activate_hook {}", hook);
debug!("bytes {:?}", &bytes);
for l in &self.hooks[hook].listeners {
let plugin = &self.plugins[&self.sessions[l]];
debug!(&plugin);
match &plugin.kind {
PluginKind::LongLived => {
debug!("listener: {}", l);
let channel = self.streams.get_mut(l).unwrap();
channel.write_ref(&rmpv::ValueRef::Binary(bytes.as_slice()));
let reply: Result<FilterResult> = channel.from_read();
return reply;
}
PluginKind::Filter => {
let parts = split_command!(&plugin.executable);
let child = std::process::Command::new(&parts[0])
.args(&parts[1..])
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;
let (mut stream, _) = self.listener.accept()?;
let mut channel = RpcChannel::new(stream, l)?;
channel.write_ref(&rmpv::ValueRef::Binary(bytes.as_slice()));
let reply: Result<FilterResult> = channel.from_read();
return reply;
}
k => {
debug!("got plugin kind {:?} in hook {}", k, hook);
}
}
}
Err(MeliError::new("no listeners for this hook"))
}
pub fn listener(&self) -> UnixListener {
self.listener.try_clone().unwrap()
}
}
#[derive(Debug)]
pub struct UIHook {
name: String,
listeners: Vec<String>,
kind: TypeId,
wait_response: bool,
listeners: Vec<Uuid>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct PluginGreeting {
version: String,
}
pub fn initialize(mut stream: UnixStream) -> Result<()> {
let greeting: std::result::Result<PluginGreeting, _> =
rmp_serde::decode::from_read(&mut stream);
match debug!(greeting) {
Ok(greeting) => {
if greeting.version != "dev" {
return Err("Plugin is not compatible with our API (dev)".into());
}
}
Err(err) => {
return Err(MeliError::new(err.to_string()));
}
}
loop {
debug!("pre-flush 1");
stream.flush();
debug!("post-flush 1");
if debug!(rmpv::decode::value::read_value(&mut stream)).is_err() {
break;
}
debug!("post-read_value");
//debug!("socket response {}", unsafe {
// String::from_utf8_lossy(&response)
//});
stream.flush();
debug!("post-flush 2");
if debug!(rmpv::encode::write_value(
&mut stream,
&rmpv::Value::String("hello 2 u 2".into())
))
.is_err()
{
break;
}
debug!("post-write_value");
}
return Ok(());
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "t", content = "c")]
pub enum FilterResult {
UiMessage(String),
Text(String),
Ansi(String),
Binary(Vec<u8>),
Error(String),
}

View File

@ -0,0 +1,190 @@
/*
* meli - plugins
*
* Copyright 2019 Manos Pitsidianakis
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use super::*;
use fnv::FnvHashMap;
use melib::async_workers::{Async, AsyncBuilder, AsyncStatus, WorkContext};
use melib::backends::FolderHash;
use melib::backends::{
Backend, BackendOp, Backends, Folder, MailBackend, RefreshEvent, RefreshEventConsumer,
};
use melib::conf::AccountSettings;
use melib::email::{Envelope, EnvelopeHash, Flag};
use melib::error::{MeliError, Result};
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex, RwLock};
#[derive(Debug)]
pub struct PluginBackend {
plugin: Plugin,
child: std::process::Child,
channel: Arc<Mutex<RpcChannel>>,
is_online: Arc<Mutex<(std::time::Instant, Result<()>)>>,
}
impl MailBackend for PluginBackend {
fn is_online(&self) -> Result<()> {
if let Ok(mut is_online) = self.is_online.try_lock() {
let now = std::time::Instant::now();
if now.duration_since(is_online.0) >= std::time::Duration::new(2, 0) {
let mut channel = self.channel.lock().unwrap();
channel.write_ref(&rmpv::ValueRef::Ext(BACKEND_FN, b"is_online"))?;
debug!(channel.expect_ack())?;
let ret: PluginResult<()> = debug!(channel.from_read())?;
is_online.0 = now;
is_online.1 = ret.into();
}
is_online.1.clone()
} else {
Err(MeliError::new("busy"))
}
}
fn connect(&mut self) {}
fn get(&mut self, folder: &Folder) -> Async<Result<Vec<Envelope>>> {
let mut w = AsyncBuilder::new();
let folder_hash = folder.hash();
let channel = self.channel.clone();
let handle = {
let tx = w.tx();
let closure = move |_work_context| {
let mut channel = channel.lock().unwrap();
channel
.write_ref(&rmpv::ValueRef::Ext(BACKEND_FN, b"get"))
.unwrap();
channel.expect_ack().unwrap();
loop {
let read_val: Result<PluginResult<Option<Vec<String>>>> =
debug!(channel.from_read());
match read_val.map(Into::into).and_then(std::convert::identity) {
Ok(Some(a)) => {
tx.send(AsyncStatus::Payload(Ok(a
.into_iter()
.filter_map(|s| Envelope::from_bytes(s.as_bytes(), None).ok())
.collect::<Vec<Envelope>>())))
.unwrap();
}
Ok(None) => {
tx.send(AsyncStatus::Finished).unwrap();
return;
}
Err(err) => {
tx.send(AsyncStatus::Payload(Err(err))).unwrap();
tx.send(AsyncStatus::Finished).unwrap();
return;
}
};
}
};
Box::new(closure)
};
w.build(handle)
}
fn refresh(
&mut self,
_folder_hash: FolderHash,
_sender: RefreshEventConsumer,
) -> Result<Async<Result<Vec<RefreshEvent>>>> {
Err(MeliError::new("Unimplemented."))
}
fn watch(
&self,
sender: RefreshEventConsumer,
work_context: WorkContext,
) -> Result<std::thread::ThreadId> {
Err(MeliError::new("Unimplemented."))
}
fn folders(&self) -> Result<FnvHashMap<FolderHash, Folder>> {
let mut ret: FnvHashMap<FolderHash, Folder> = Default::default();
ret.insert(0, Folder::default());
Ok(ret)
}
fn operation(&self, hash: EnvelopeHash) -> Box<dyn BackendOp> {
unimplemented!()
}
fn save(&self, bytes: &[u8], folder: &str, flags: Option<Flag>) -> Result<()> {
Err(MeliError::new("Unimplemented."))
}
fn create_folder(&mut self, name: String) -> Result<Folder> {
Err(MeliError::new("Unimplemented."))
}
fn tags(&self) -> Option<Arc<RwLock<BTreeMap<u64, String>>>> {
None
}
fn as_any(&self) -> &dyn::std::any::Any {
self
}
}
impl PluginBackend {
pub fn new(
listener: UnixListener,
plugin: Plugin,
_s: &AccountSettings,
_is_subscribed: Box<dyn Fn(&str) -> bool>,
) -> Result<Box<dyn MailBackend>> {
if plugin.kind != PluginKind::Backend {
return Err(MeliError::new(format!(
"Error: Plugin `{}` is not a mail backend plugin, it's `{:?}`",
&plugin.name, &plugin.kind
)));
}
let parts = split_command!(&plugin.executable);
let child = std::process::Command::new(&parts[0])
.args(&parts[1..])
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;
let (mut stream, _) = listener.accept()?;
/* send init message to plugin to register hooks */
let session = Uuid::new_v4();
let channel = RpcChannel::new(stream, &session)?;
let now = std::time::Instant::now() - std::time::Duration::from_secs(5);
Ok(Box::new(PluginBackend {
child,
plugin,
channel: Arc::new(Mutex::new(channel)),
is_online: Arc::new(Mutex::new((now, Err(MeliError::new("Unitialized"))))),
}))
}
pub fn register(listener: UnixListener, plugin: Plugin, backends: &mut Backends) {
backends.register(
plugin.name.clone(),
Backend {
create_fn: Box::new(move || {
let plugin = plugin.clone();
let listener = listener.try_clone().unwrap();
Box::new(move |f, i| {
let plugin = plugin.clone();
let listener = listener.try_clone().unwrap();
PluginBackend::new(listener, plugin, f, i)
})
}),
validate_conf_fn: Box::new(|_| Ok(())),
},
);
}
}

View File

@ -0,0 +1,48 @@
#! /usr/bin/env python3
"""
meli - sample plugin
Copyright 2019 Manos Pitsidianakis
This file is part of meli.
meli is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
meli is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with meli. If not, see <http://www.gnu.org/licenses/>.
"""
import sys
import subprocess
print(sys.path, file=sys.stderr)
from libmeliapi import Client
if __name__ == "__main__":
server_address = './soworkfile'
client = Client(server_address)
client.connect()
try:
_bytes = client.read()
print('got bytes {!r}'.format(_bytes),file=sys.stderr, )
# run() returns a CompletedProcess object if it was successful
# errors in the created process are raised here too
process = subprocess.run(['tiv','-w', '120','-h', '40', _bytes[0]], check=True, stdout=subprocess.PIPE, universal_newlines=True)
output = process.stdout
print('tiv output len {}'.format(len(output)),file=sys.stderr, )
#print('tiv output bytes {!r}'.format(output),file=sys.stderr, )
message = { "t": "ansi", "c": output }
#print('sending {!r}'.format(message),file=sys.stderr, )
print('returned :', client.send(message), file=sys.stderr,)
except Exception as msg:
print(msg, file=sys.stderr,)

View File

@ -0,0 +1,173 @@
"""
meli - python3 api plugin
Copyright 2019 Manos Pitsidianakis
This file is part of meli.
meli is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
meli is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with meli. If not, see <http://www.gnu.org/licenses/>.
"""
from collections import deque
import errno
import json
import msgpack
import socket
import struct
import sys
import time
class IPCError(Exception):
pass
class UnknownMessageClass(IPCError):
pass
class InvalidSerialization(IPCError):
pass
class ConnectionClosed(IPCError):
pass
def _read_objects(sock):
unpacker = msgpack.Unpacker()
ret = []
#reader = socket.socket.makefile(sock, 'rb')
counter = 0
while True:
print("[libmeliapi]: _read_objects loop = ", counter, flush=True, file=sys.stderr)
counter += 1
try:
buf = sock.recv(1024**2)
if not buf:
break
unpacker.feed(buf)
for o in unpacker:
ret.append(o)
except:
break
return ret
#try:
# for unpack in unpacker:
# return unpack
#except Exception as e:
# print("[libmeliapi]: ", "_read_objects error ", e, file=sys.stderr,)
# return None
#finally:
# reader.flush()
def _write_objects(sock, objects):
sys.stderr.flush()
print("[libmeliapi]: ", "_write_objects ", objects, flush=True, file=sys.stderr, )
data = msgpack.packb(objects)
#print("[libmeliapi]: ", "_write_objects data ", data, flush=True, file=sys.stderr, )
sent = 0
while sent < len(data):
try:
_len = min(len(data[sent:]), 2048)
sent += sock.send(data[sent:sent+_len])
except IOError as e:
print("[libmeliapi]: IOError: ", e, e.errno, flush=True, file=sys.stderr, )
sys.stderr.flush()
if e.errno == errno.EWOULDBLOCK:
break
else:
raise
class Client(object):
def __init__(self, server_address):
self.buffer = deque()
self.addr = server_address
address_family = socket.AF_UNIX
self.sock = socket.socket(address_family, socket.SOCK_STREAM)
self.sock.setblocking(0)
def connect(self):
try:
self.sock.connect(self.addr)
print("[libmeliapi]: ", "self.send({ \"version\": \"dev\" }) = ",self.send({ "version": "dev" }), flush=True, file=sys.stderr)
self.expect_ack()
self._session = self.read()
self.ack()
print("[libmeliapi]: ", "self.buffer =", self.buffer, flush=True, file=sys.stderr, )
print("[libmeliapi]: ", "connected, session id is", self._session, flush=True, file=sys.stderr)
except socket.error as msg:
print("[libmeliapi]: ", msg, flush=True, file=sys.stderr, )
sys.stderr.flush()
sys.exit(1)
def close(self):
self.sock.close()
def setblocking(self, new_val):
self.sock.setblocking(new_val)
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def send(self, objects):
sys.stderr.flush()
print("[libmeliapi]: ", "stuck in send ", self.buffer, flush=True, file=sys.stderr, )
_write_objects(self.sock, objects)
print("[libmeliapi]: ", "unstuck wrote objs", flush=True, file=sys.stderr, )
#print("[libmeliapi]: ", "wrote object ", objects, file=sys.stderr)
time.sleep(0.1)
def ack(self):
sys.stderr.flush()
_write_objects(self.sock, 0x06)
time.sleep(0.1)
def expect_ack(self):
print("[libmeliapi]: expect_ack, ", self.buffer, flush=True, file=sys.stderr, )
read_list = _read_objects(self.sock)
time.sleep(0.1)
self.buffer.extend(read_list)
if len(self.buffer) > 0 and self.buffer.popleft() == 0x6:
print("[libmeliapi]: got_ack, ", self.buffer, flush=True, file=sys.stderr, )
return
else:
raise "ACK expected"
def read(self):
sys.stderr.flush()
print("[libmeliapi]: ", "stuck in read ", self.buffer, flush=True, file=sys.stderr, )
read_list = _read_objects(self.sock)
time.sleep(0.1)
self.buffer.extend(read_list)
print("[libmeliapi]: ", "unstuck read self.buffer =", self.buffer, flush=True, file=sys.stderr, )
if len(self.buffer) > 0:
return self.buffer.popleft()
else:
return None
@property
def backend_fn_type(self):
return 0
def backend_fn_ok_send(self, objects):
self.send({"t": "ok", "c": objects })
self.expect_ack()
def backend_fn_err_send(self, objects):
self.send({"t": "err", "c": objects })
self.expect_ack()

View File

@ -0,0 +1,92 @@
#! /usr/bin/env python3
"""
meli - sample plugin
Copyright 2019 Manos Pitsidianakis
This file is part of meli.
meli is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
meli is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with meli. If not, see <http://www.gnu.org/licenses/>.
"""
import sys
import time
import subprocess
import msgpack
import nntplib
import libmeliapi
import itertools
def chunks(iterable, n):
while True:
try:
yield itertools.chain((next(iterable),), itertools.islice(iterable, n-1))
except:
break
if __name__ == "__main__":
import importlib
importlib.reload(libmeliapi)
server_address = './soworkfile'
client = libmeliapi.Client(server_address)
client.connect()
#client.setblocking(True)
try:
counter = 0
while True:
print("[nntp-plugin]: loop = ", counter, flush=True, file=sys.stderr)
counter += 1
req = client.read()
if req is None:
time.sleep(0.15)
continue
#client.setblocking(True)
client.ack()
print("[nntp-plugin]: ", "req: ", req, flush=True, file=sys.stderr)
sys.stderr.flush()
if isinstance(req, msgpack.ExtType):
print("[nntp-plugin]: ", req, flush=True, file=sys.stderr)
if req.data == b'is_online':
client.backend_fn_ok_send(None)
elif req.data == b'get':
s = nntplib.NNTP('news.gmane.org')
resp, count, first, last, name = s.group('gmane.comp.python.committers')
print('Group', name, 'has', count, 'articles, range', first, 'to', last, flush=True, file=sys.stderr)
resp, overviews = s.over((last - 9, last))
ids = []
for id, over in overviews:
ids.append(id)
print(id, nntplib.decode_header(over['subject']), flush=True, file=sys.stderr)
for chunk in chunks(iter(ids), 2):
ret = []
for _id in chunk:
resp, info = s.article(_id)
#print(_id, " line0 = ", str(info.lines[0], 'utf-8', 'ignore'))
elem = b'\n'.join(info.lines)
ret.append(str(elem, 'utf-8', 'ignore'))
print("ret len = ", len(ret), flush=True,file=sys.stderr)
client.backend_fn_ok_send(ret)
time.sleep(0.85)
s.quit()
client.backend_fn_ok_send(None)
#client.setblocking(True)
time.sleep(0.15)
except Exception as msg:
print("[nntp-plugin]: ", msg, flush=True, file=sys.stderr,)
sys.stderr.flush()

View File

@ -0,0 +1,144 @@
/*
* meli - plugins
*
* Copyright 2019 Manos Pitsidianakis
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use super::*;
use rmp_serde::Deserializer;
use serde::{Deserialize, Serialize};
#[derive(Debug)]
pub struct RpcChannel {
stream: UnixStream,
session: Uuid,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct PluginGreeting {
version: String,
}
impl RpcChannel {
pub fn new(mut stream: UnixStream, session: &Uuid) -> Result<RpcChannel> {
let mut ret = RpcChannel {
stream,
session: session.clone(),
};
let greeting: PluginGreeting = ret.from_read().map_err(|err| {
MeliError::new(format!("Could not get correct plugin greeting: {}", err))
})?;
debug!(&greeting);
//if greeting.version != "dev" {
// return Err("Plugin is not compatible with our API (dev)".into());
//}
ret.write_ref(&rmpv::ValueRef::String(session.to_string().as_str().into()))?;
debug!(ret.expect_ack())?;
Ok(ret)
}
pub fn expect_ack(&mut self) -> Result<()> {
debug!("expect_ack()");
let ack: u32 = debug!(rmp_serde::decode::from_read(&mut self.stream))
.map_err(|_| MeliError::new("Plugin did not return ACK."))?;
if 0x6 == ack {
Ok(())
} else {
Err(MeliError::new("Plugin did not return ACK."))
}
}
pub fn ack(&mut self) -> Result<()> {
debug!("ack()");
debug!(rmpv::encode::write_value_ref(
&mut self.stream,
&rmpv::ValueRef::Integer(0x6.into())
))
.map_err(|err| MeliError::new(err.to_string()))?;
let _ = self.stream.flush();
Ok(())
}
pub fn write_ref(&mut self, value_ref: &rmpv::ValueRef) -> Result<()> {
debug!("write_ref() {:?}", value_ref);
debug!(rmpv::encode::write_value_ref(&mut self.stream, value_ref))
.map_err(|err| MeliError::new(err.to_string()))?;
let _ = self.stream.flush();
Ok(())
}
pub fn read(&mut self) -> Result<rmpv::Value> {
debug!("read()");
let ret: RpcResult = debug!(rmp_serde::decode::from_read(&mut self.stream))
.map_err(|err| MeliError::new(err.to_string()))?;
let _ = self.stream.flush();
self.ack();
debug!("read() ret={:?}", &ret);
ret.into()
}
pub fn from_read<T>(&mut self) -> Result<T>
where
T: core::fmt::Debug + serde::de::DeserializeOwned,
{
debug!("from_read()");
let ret: Result<T> = debug!(rmp_serde::decode::from_read(&mut self.stream))
.map_err(|err| MeliError::new(err.to_string()));
let _ = self.stream.flush();
self.ack();
debug!("read() ret={:?}", &ret);
ret
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "t", content = "c")]
enum RpcResult {
Ok(rmpv::Value),
Err(String),
}
impl RpcResult {
fn into(self) -> Result<rmpv::Value> {
match self {
RpcResult::Ok(v) => Ok(v),
RpcResult::Err(err) => Err(MeliError::new(err)),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "t", content = "c")]
pub enum PluginResult<T: core::fmt::Debug + Clone> {
Ok(T),
Err(String),
}
impl<T: core::fmt::Debug + Clone + serde::Serialize + serde::de::DeserializeOwned> Into<Result<T>>
for PluginResult<T>
{
fn into(self) -> Result<T> {
match self {
PluginResult::Ok(v) => Ok(v),
PluginResult::Err(err) => Err(MeliError::new(err)),
}
}
}

View File

@ -29,7 +29,7 @@ Input is received in the main loop from threads which listen on the stdin for us
*/
use super::*;
use crate::plugins::{Plugin, PluginManager};
use crate::plugins::PluginManager;
use melib::backends::{FolderHash, NotifyFn};
use crossbeam::channel::{bounded, unbounded, Receiver, Sender};
@ -97,7 +97,7 @@ pub struct Context {
receiver: Receiver<ThreadEvent>,
input: InputHandler,
work_controller: WorkController,
plugin_manager: PluginManager,
pub plugin_manager: PluginManager,
pub temp_files: Vec<File>,
}
@ -209,10 +209,18 @@ impl State {
* stdin, see get_events() for details
* */
let input_thread = unbounded();
let backends = Backends::new();
let mut backends = Backends::new();
let settings = Settings::new()?;
let mut plugin_manager = PluginManager::new();
for (_, p) in settings.plugins.clone() {
if crate::plugins::PluginKind::Backend == p.kind() {
debug!("registering {:?}", &p);
crate::plugins::backend::PluginBackend::register(
plugin_manager.listener(),
p.clone(),
&mut backends,
);
}
plugin_manager.register(p)?;
}

View File

@ -69,7 +69,7 @@ impl File {
/// to reap it later.
pub fn create_temp_file(
bytes: &[u8],
filename: Option<String>,
filename: Option<&str>,
path: Option<&PathBuf>,
delete_on_drop: bool,
) -> File {