jmap: Incorporate eventsource crate as a module

https://crates.io/crates/eventsource
Manos Pitsidianakis 2020-04-02 13:40:40 +03:00
parent e875ba40ce
commit c5a99a9cf9
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
15 changed files with 180 additions and 528 deletions

View File

@ -74,6 +74,8 @@ use objects::*;
pub mod mailbox;
use mailbox::*;
pub mod eventsource;
#[derive(Debug, Default)]
pub struct EnvelopeCache {
bytes: Option<String>,

View File

@ -0,0 +1,48 @@
/*
* meli - jmap module.
*
* Copyright 2019 Lukas Werling (lluchs)
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
//! # EventSource
//!
//! EventSource is a Rust library for reading from Server-Sent Events endpoints. It transparently
//! sends HTTP requests and only exposes a stream of events to the user. It handles automatic
//! reconnection and parsing of the `text/event-stream` data format.
//!
//! # Examples
//!
//! ```no_run
//! extern crate eventsource;
//! extern crate reqwest;
//! use eventsource::reqwest::Client;
//! use reqwest::Url;
//!
//! fn main() {
//! let client = Client::new(Url::parse("http://example.com").unwrap());
//! for event in client {
//! println!("{}", event.unwrap());
//! }
//! }
//! ```
//!
// Generic text/event-stream parsing and serialization.
pub mod event;
// HTTP interface
pub mod client;

View File

@ -1,2 +0,0 @@
target
Cargo.lock

View File

@ -1 +0,0 @@
language: rust

View File

@ -1,14 +0,0 @@
# Changelog
## 0.4.0 - 2019-09-16
### Changed
- Update reqwest to 0.9.x
## 0.3.0 - 2017-10-27
### Changed
- Update reqwest to 0.8.0
- `Client::new()` no longer returns a `Result`. This is a breaking
change carried over from reqwest 0.8.0.
### Fixed
- Infinite loop when error occur in the stream (#2)

View File

@ -1,20 +0,0 @@
[package]
name = "eventsource"
version = "0.4.0"
authors = ["Lukas Werling <lukas.werling@gmail.com>"]
description = "Library for accessing EventSource/Server-Sent Events endpoints"
repository = "https://github.com/lluchs/eventsource"
license = "MIT"
keywords = ["http"]
[features]
default = ["with-reqwest"]
# Enable the reqwest-based client.
with-reqwest = ["reqwest"]
[dependencies]
error-chain = "0.11.0"
reqwest = { version = "0.9.2", optional = true }
mime = "0.3.7"

View File

@ -1,25 +0,0 @@
Copyright (c) 2016 Lukas Werling
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

View File

@ -1,23 +0,0 @@
# EventSource
[![crates.io](https://img.shields.io/crates/v/eventsource.svg)](https://crates.io/crates/eventsource) [![Documentation](https://docs.rs/eventsource/badge.svg)](https://docs.rs/eventsource/) [![Build Status](https://travis-ci.org/lluchs/eventsource.svg?branch=master)](https://travis-ci.org/lluchs/eventsource)
EventSource is a Rust library for reading from Server-Sent Events endpoints. It transparently
sends HTTP requests and only exposes a stream of events to the user. It handles automatic
reconnection and parsing of the `text/event-stream` data format.
## Examples
```no_run
extern crate eventsource;
extern crate reqwest;
use eventsource::reqwest::Client;
use reqwest::Url;
fn main() {
let client = Client::new(Url::parse("http://example.com").unwrap());
for event in client {
println!("{}", event.unwrap());
}
}
```

View File

@ -1,47 +1,65 @@
/*
* meli - jmap module.
*
* Copyright 2019 Lukas Werling (lluchs)
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
// Original code by Lukas Werling (lluchs)
//! # Reqwest-based EventSource client
extern crate mime;
extern crate reqwest as reqw;
mod errors {
error_chain! {
foreign_links {
Reqwest(super::reqw::Error);
Io(::std::io::Error);
}
errors {
Http(status: super::reqw::StatusCode) {
description("HTTP request failed")
display("HTTP status code: {}", status)
}
InvalidContentType(mime_type: mime::Mime) {
description("unexpected Content-Type header")
display("unexpected Content-Type: {}", mime_type)
}
NoContentType {
description("no Content-Type header in response")
display("Content-Type missing")
}
}
}
}
pub use self::errors::*;
use self::reqw::header::{HeaderMap, HeaderValue, ACCEPT, CONTENT_TYPE};
use super::event::{parse_event_line, Event, ParseResult};
use crate::error::*;
use reqwest;
use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, CONTENT_TYPE};
use std::io::{BufRead, BufReader};
use std::time::{Duration, Instant};
const DEFAULT_RETRY: u64 = 5000;
impl MeliError {
fn http_error(status_code: reqwest::StatusCode) -> MeliError {
MeliError {
summary: Some("HTTP request failed".into()),
details: format!("HTTP status code: {}", status_code).into(),
}
}
fn invalid_content_type(mime_type: &str) -> MeliError {
MeliError {
summary: Some("unexpected Content-Type header".into()),
details: format!("unexpected Content-Type: {}", mime_type).into(),
}
}
fn no_content_type() -> MeliError {
MeliError {
summary: Some("no Content-Type header in response".into()),
details: "Content-Type missing".into(),
}
}
}
/// A client for a Server-Sent Events endpoint.
///
/// Read events by iterating over the client.
pub struct Client {
client: reqw::Client,
response: Option<BufReader<reqw::Response>>,
url: reqw::Url,
client: reqwest::blocking::Client,
response: Option<BufReader<reqwest::blocking::Response>>,
url: reqwest::Url,
last_event_id: Option<String>,
last_try: Option<Instant>,
@ -54,9 +72,9 @@ impl Client {
/// Constructs a new EventSource client for the given URL.
///
/// This does not start an HTTP request.
pub fn new(url: reqw::Url) -> Client {
pub fn new(url: reqwest::Url) -> Client {
Client {
client: reqw::Client::new(),
client: reqwest::blocking::Client::new(),
response: None,
url: url,
last_event_id: None,
@ -78,24 +96,17 @@ impl Client {
{
let status = res.status();
if !status.is_success() {
return Err(ErrorKind::Http(status.clone()).into());
return Err(MeliError::http_error(status));
}
if let Some(content_type_hv) = res.headers().get(CONTENT_TYPE) {
let content_type = content_type_hv
.to_str()
.unwrap()
.to_string()
.parse::<mime::Mime>()
.unwrap();
let content_type = content_type_hv.to_str().unwrap().to_string();
// Compare type and subtype only, MIME parameters are ignored.
if (content_type.type_(), content_type.subtype())
!= (mime::TEXT, mime::EVENT_STREAM)
{
return Err(ErrorKind::InvalidContentType(content_type.clone()).into());
if content_type != "text/event-stream" {
return Err(MeliError::invalid_content_type(&content_type));
}
} else {
return Err(ErrorKind::NoContentType.into());
return Err(MeliError::no_content_type());
}
}

View File

@ -1,3 +1,24 @@
/*
* meli - jmap module.
*
* Copyright 2019 Lukas Werling (lluchs)
*
* This file is part of meli.
*
* meli is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* meli is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with meli. If not, see <http://www.gnu.org/licenses/>.
*/
use std::fmt;
use std::time::Duration;
@ -48,7 +69,7 @@ pub enum ParseResult {
/// // ...
/// ```
pub fn parse_event_line(line: &str, event: &mut Event) -> ParseResult {
let line = line.trim_right_matches(|c| c == '\r' || c == '\n');
let line = line.trim_end_matches(|c| c == '\r' || c == '\n');
if line == "" {
ParseResult::Dispatch
} else {
@ -61,17 +82,24 @@ pub fn parse_event_line(line: &str, event: &mut Event) -> ParseResult {
} else {
(line, "")
};
match field {
"event" => { event.event_type = Some(value.to_string()); },
"data" => { event.data.push_str(value); event.data.push('\n'); },
"id" => { event.id = Some(value.to_string()); }
"event" => {
event.event_type = Some(value.to_string());
}
"data" => {
event.data.push_str(value);
event.data.push('\n');
}
"id" => {
event.id = Some(value.to_string());
}
"retry" => {
if let Ok(retry) = value.parse::<u64>() {
return ParseResult::SetRetry(Duration::from_millis(retry));
}
},
_ => () // ignored
}
_ => (), // ignored
}
ParseResult::Next
@ -106,13 +134,13 @@ impl Event {
impl fmt::Display for Event {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if let Some(ref id) = self.id {
try!(write!(f, "id: {}\n", id));
write!(f, "id: {}\n", id)?;
}
if let Some(ref event_type) = self.event_type {
try!(write!(f, "event: {}\n", event_type));
write!(f, "event: {}\n", event_type)?;
}
for line in self.data.lines() {
try!(write!(f, "data: {}\n", line));
write!(f, "data: {}\n", line)?;
}
Ok(())
}
@ -126,22 +154,52 @@ mod tests {
fn basic_event_display() {
assert_eq!(
"data: hello world\n",
Event { id: None, event_type: None, data: "hello world".to_string() }.to_string());
Event {
id: None,
event_type: None,
data: "hello world".to_string()
}
.to_string()
);
assert_eq!(
"id: foo\ndata: hello world\n",
Event { id: Some("foo".to_string()), event_type: None, data: "hello world".to_string() }.to_string());
Event {
id: Some("foo".to_string()),
event_type: None,
data: "hello world".to_string()
}
.to_string()
);
assert_eq!(
"event: bar\ndata: hello world\n",
Event { id: None, event_type: Some("bar".to_string()), data: "hello world".to_string() }.to_string());
Event {
id: None,
event_type: Some("bar".to_string()),
data: "hello world".to_string()
}
.to_string()
);
}
#[test]
fn multiline_event_display() {
assert_eq!(
"data: hello\ndata: world\n",
Event { id: None, event_type: None, data: "hello\nworld".to_string() }.to_string());
Event {
id: None,
event_type: None,
data: "hello\nworld".to_string()
}
.to_string()
);
assert_eq!(
"data: hello\ndata: \ndata: world\n",
Event { id: None, event_type: None, data: "hello\n\nworld".to_string() }.to_string());
Event {
id: None,
event_type: None,
data: "hello\n\nworld".to_string()
}
.to_string()
);
}
}

View File

@ -1,14 +0,0 @@
extern crate eventsource;
extern crate reqwest;
use eventsource::reqwest::Client;
use reqwest::Url;
fn main() {
//let url = "https://clonkspot.org/league/game_events.php";
let url = "http://league.openclonk.org/poll_game_events.php";
let client = Client::new(Url::parse(url).unwrap());
for event in client {
println!("{}", event.unwrap());
}
}

View File

@ -1,32 +0,0 @@
//! # EventSource
//!
//! EventSource is a Rust library for reading from Server-Sent Events endpoints. It transparently
//! sends HTTP requests and only exposes a stream of events to the user. It handles automatic
//! reconnection and parsing of the `text/event-stream` data format.
//!
//! # Examples
//!
//! ```no_run
//! extern crate eventsource;
//! extern crate reqwest;
//! use eventsource::reqwest::Client;
//! use reqwest::Url;
//!
//! fn main() {
//! let client = Client::new(Url::parse("http://example.com").unwrap());
//! for event in client {
//! println!("{}", event.unwrap());
//! }
//! }
//! ```
//!
#[macro_use]
extern crate error_chain;
// Generic text/event-stream parsing and serialization.
pub mod event;
// HTTP interface
#[cfg(feature = "with-reqwest")]
pub mod reqwest;

View File

@ -1,126 +0,0 @@
extern crate eventsource;
extern crate reqwest;
use eventsource::reqwest::{Client, Error, ErrorKind};
use reqwest::Url;
use std::time::Duration;
use server::Server;
mod server;
fn server() -> Server {
let s = Server::new();
s.receive(
"\
GET / HTTP/1.1\r\n\
host: 127.0.0.1:$PORT\r\n\
accept: text/event-stream\r\n\
accept-encoding: gzip\r\n\
\r\n",
);
return s;
}
#[test]
fn simple_events() {
let s = server();
s.send(
"HTTP/1.1 200 OK\r\n\
Content-Type: text/event-stream\r\n\
\r\n\
id: 42\r\n\
event: foo\r\n\
data: bar\r\n\
\r\n\
event: bar\n\
: comment\n\
data: baz\n\
\n",
);
println!("url: {}", s.url("/"));
let mut client = Client::new(Url::parse(&s.url("/")).unwrap());
let event = client.next().unwrap().unwrap();
assert_eq!(event.id, Some("42".into()));
assert_eq!(event.event_type, Some("foo".into()));
assert_eq!(event.data, "bar\n");
let event = client.next().unwrap().unwrap();
assert_eq!(event.id, None);
assert_eq!(event.event_type, Some("bar".into()));
assert_eq!(event.data, "baz\n");
}
#[test]
fn retry() {
let s = server();
s.send(
"HTTP/1.1 200 OK\r\n\
Content-Type: text/event-stream\r\n\
\r\n\
retry: 42\r\n\
data: bar\r\n\
\r\n",
);
println!("url: {}", s.url("/"));
let mut client = Client::new(Url::parse(&s.url("/")).unwrap());
let event = client.next().unwrap().unwrap();
assert_eq!(event.data, "bar\n");
assert_eq!(client.retry, Duration::from_millis(42));
}
#[test]
fn missing_content_type() {
let s = server();
s.send(
"HTTP/1.1 200 OK\r\n\
\r\n\
data: bar\r\n\
\r\n",
);
let mut client = Client::new(Url::parse(&s.url("/")).unwrap());
match client.next().unwrap() {
Err(Error(ErrorKind::NoContentType, _)) => assert!(true),
_ => assert!(false, "NoContentType error expected"),
}
}
#[test]
fn invalid_content_type() {
let s = server();
s.send(
"HTTP/1.1 200 OK\r\n\
Content-Type: text/plain\r\n\
\r\n\
data: bar\r\n\
\r\n",
);
let mut client = Client::new(Url::parse(&s.url("/")).unwrap());
match client.next().unwrap() {
Err(Error(ErrorKind::InvalidContentType(_), _)) => assert!(true),
_ => assert!(false, "InvalidContentType error expected"),
}
}
#[test]
fn content_type_with_mime_parameter() {
let s = server();
s.send(
"HTTP/1.1 200 OK\r\n\
Content-Type: text/event-stream;charset=utf8\r\n\
\r\n\
data: bar\r\n\
\r\n",
);
let mut client = Client::new(Url::parse(&s.url("/")).unwrap());
let event = client
.next()
.unwrap()
.expect("MIME parameter should be ignored");
assert_eq!(event.data, "bar\n");
}

View File

@ -1,19 +0,0 @@
Copyright (c) 2014 Carl Lerche
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View File

@ -1,191 +0,0 @@
#![allow(dead_code)]
use std::collections::HashSet;
use std::io::prelude::*;
use std::io::BufReader;
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;
macro_rules! t {
($e:expr) => {
match $e {
Ok(e) => e,
Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
}
};
}
pub struct Server {
messages: Option<Sender<Message>>,
addr: SocketAddr,
thread: Option<thread::JoinHandle<()>>,
}
enum Message {
Read(String),
Write(String),
}
fn run(listener: &TcpListener, rx: &Receiver<Message>) {
let mut socket = BufReader::new(listener.accept().unwrap().0);
for msg in rx.iter() {
match msg {
Message::Read(ref expected) => {
let mut expected = &expected[..];
let mut expected_headers = HashSet::new();
while let Some(i) = expected.find("\n") {
let line = &expected[..i + 1];
expected = &expected[i + 1..];
expected_headers.insert(line);
if line == "\r\n" {
break;
}
}
let mut expected_len = None;
while expected_headers.len() > 0 {
let mut actual = String::new();
t!(socket.read_line(&mut actual));
if actual.starts_with("Content-Length") {
let len = actual.split(": ").skip(1).next().unwrap();
expected_len = len.trim().parse().ok();
}
// various versions of libcurl do different things here
if actual == "Proxy-Connection: Keep-Alive\r\n" {
continue;
}
// The User-Agent header changes with the reqwest version.
if actual.starts_with("user-agent:") {
continue;
}
if expected_headers.remove(&actual[..]) {
continue;
}
let mut found = None;
for header in expected_headers.iter() {
if lines_match(header, &actual) {
found = Some(header.clone());
break;
}
}
if let Some(found) = found {
expected_headers.remove(&found);
continue;
}
panic!(
"unexpected header: {:?} (remaining headers {:?})",
actual, expected_headers
);
}
for header in expected_headers {
panic!("expected header but not found: {:?}", header);
}
let mut line = String::new();
let mut socket = match expected_len {
Some(amt) => socket.by_ref().take(amt),
None => socket.by_ref().take(expected.len() as u64),
};
while socket.limit() > 0 {
line.truncate(0);
t!(socket.read_line(&mut line));
if line.len() == 0 {
break;
}
if expected.len() == 0 {
panic!("unexpected line: {:?}", line);
}
let i = expected.find("\n").unwrap_or(expected.len() - 1);
let expected_line = &expected[..i + 1];
expected = &expected[i + 1..];
if lines_match(expected_line, &line) {
continue;
}
panic!(
"lines didn't match:\n\
expected: {:?}\n\
actual: {:?}\n",
expected_line, line
)
}
if expected.len() != 0 {
println!("didn't get expected data: {:?}", expected);
}
}
Message::Write(ref to_write) => {
t!(socket.get_mut().write_all(to_write.as_bytes()));
return;
}
}
}
let mut dst = Vec::new();
t!(socket.read_to_end(&mut dst));
assert!(dst.len() == 0);
}
fn lines_match(expected: &str, mut actual: &str) -> bool {
for (i, part) in expected.split("[..]").enumerate() {
match actual.find(part) {
Some(j) => {
if i == 0 && j != 0 {
return false;
}
actual = &actual[j + part.len()..];
}
None => return false,
}
}
actual.is_empty() || expected.ends_with("[..]")
}
impl Server {
pub fn new() -> Server {
let listener = t!(TcpListener::bind("127.0.0.1:0"));
let addr = t!(listener.local_addr());
let (tx, rx) = channel();
let thread = thread::spawn(move || run(&listener, &rx));
Server {
messages: Some(tx),
addr: addr,
thread: Some(thread),
}
}
pub fn receive(&self, msg: &str) {
let msg = msg.replace("$PORT", &self.addr.port().to_string());
self.msg(Message::Read(msg));
}
pub fn send(&self, msg: &str) {
let msg = msg.replace("$PORT", &self.addr.port().to_string());
self.msg(Message::Write(msg));
}
fn msg(&self, msg: Message) {
t!(self.messages.as_ref().unwrap().send(msg));
}
pub fn addr(&self) -> &SocketAddr {
&self.addr
}
pub fn url(&self, path: &str) -> String {
format!("http://{}{}", self.addr, path)
}
}
impl Drop for Server {
fn drop(&mut self) {
drop(TcpStream::connect(&self.addr));
drop(self.messages.take());
let res = self.thread.take().unwrap().join();
if !thread::panicking() {
t!(res);
} else if let Err(e) = res {
println!("child server thread also failed: {:?}", e);
}
}
}