Squashed 'melib/src/backends/jmap/eventsource/' content from commit 53bff58

git-subtree-dir: melib/src/backends/jmap/eventsource
git-subtree-split: 53bff58549ebdd29eda128d05a695f6ba9528deb
Manos Pitsidianakis 2020-04-02 12:56:03 +03:00
commit b3c78ca9ec
13 changed files with 792 additions and 0 deletions

2
.gitignore vendored 100644
View File

@ -0,0 +1,2 @@
target
Cargo.lock

1
.travis.yml 100644
View File

@ -0,0 +1 @@
language: rust

14
CHANGELOG.md 100644
View File

@ -0,0 +1,14 @@
# Changelog
## 0.4.0 - 2019-09-16
### Changed
- Update reqwest to 0.9.x
## 0.3.0 - 2017-10-27
### Changed
- Update reqwest to 0.8.0
- `Client::new()` no longer returns a `Result`. This is a breaking
change carried over from reqwest 0.8.0.
### Fixed
- Infinite loop when error occur in the stream (#2)

20
Cargo.toml 100644
View File

@ -0,0 +1,20 @@
[package]
name = "eventsource"
version = "0.4.0"
authors = ["Lukas Werling <lukas.werling@gmail.com>"]
description = "Library for accessing EventSource/Server-Sent Events endpoints"
repository = "https://github.com/lluchs/eventsource"
license = "MIT"
keywords = ["http"]
[features]
default = ["with-reqwest"]
# Enable the reqwest-based client.
with-reqwest = ["reqwest"]
[dependencies]
error-chain = "0.11.0"
reqwest = { version = "0.9.2", optional = true }
mime = "0.3.7"

25
LICENSE 100644
View File

@ -0,0 +1,25 @@
Copyright (c) 2016 Lukas Werling
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

23
README.md 100644
View File

@ -0,0 +1,23 @@
# EventSource
[![crates.io](https://img.shields.io/crates/v/eventsource.svg)](https://crates.io/crates/eventsource) [![Documentation](https://docs.rs/eventsource/badge.svg)](https://docs.rs/eventsource/) [![Build Status](https://travis-ci.org/lluchs/eventsource.svg?branch=master)](https://travis-ci.org/lluchs/eventsource)
EventSource is a Rust library for reading from Server-Sent Events endpoints. It transparently
sends HTTP requests and only exposes a stream of events to the user. It handles automatic
reconnection and parsing of the `text/event-stream` data format.
## Examples
```no_run
extern crate eventsource;
extern crate reqwest;
use eventsource::reqwest::Client;
use reqwest::Url;
fn main() {
let client = Client::new(Url::parse("http://example.com").unwrap());
for event in client {
println!("{}", event.unwrap());
}
}
```

View File

@ -0,0 +1,14 @@
extern crate eventsource;
extern crate reqwest;
use eventsource::reqwest::Client;
use reqwest::Url;
fn main() {
//let url = "https://clonkspot.org/league/game_events.php";
let url = "http://league.openclonk.org/poll_game_events.php";
let client = Client::new(Url::parse(url).unwrap());
for event in client {
println!("{}", event.unwrap());
}
}

147
src/event.rs 100644
View File

@ -0,0 +1,147 @@
use std::fmt;
use std::time::Duration;
/// A single Server-Sent Event.
#[derive(Debug)]
pub struct Event {
/// Corresponds to the `id` field.
pub id: Option<String>,
/// Corresponds to the `event` field.
pub event_type: Option<String>,
/// All `data` fields concatenated by newlines.
pub data: String,
}
/// Possible results from parsing a single event-stream line.
#[derive(Debug, PartialEq)]
pub enum ParseResult {
/// Line parsed successfully, but the event is not complete yet.
Next,
/// The event is complete now. Pass a new (empty) event for the next call.
Dispatch,
/// Set retry time.
SetRetry(Duration),
}
/// Parse a single line of an event-stream.
///
/// The line may end with a newline.
///
/// You will have to call this function multiple times until it returns `ParseResult::Dispatch`.
/// Make sure to clear the event struct for the next line, then.
///
/// To handle the `Last-Event-ID` header, check the `id` field for each finished event.
///
/// # Examples
///
/// ```
/// # use eventsource::event::{Event, ParseResult, parse_event_line};
/// let mut event = Event::new();
/// assert_eq!(parse_event_line("id: 42", &mut event), ParseResult::Next);
/// assert_eq!(parse_event_line("data: foobar", &mut event), ParseResult::Next);
/// assert_eq!(parse_event_line("", &mut event), ParseResult::Dispatch);
/// // The event is finished now.
/// assert_eq!(event.id, Some("42".into()));
/// assert_eq!(event.data, "foobar\n");
/// // Now clear and continue.
/// event.clear();
/// // ...
/// ```
pub fn parse_event_line(line: &str, event: &mut Event) -> ParseResult {
let line = line.trim_right_matches(|c| c == '\r' || c == '\n');
if line == "" {
ParseResult::Dispatch
} else {
let (field, value) = if let Some(pos) = line.find(':') {
let (f, v) = line.split_at(pos);
// Strip : and an optional space.
let v = &v[1..];
let v = if v.starts_with(' ') { &v[1..] } else { v };
(f, v)
} else {
(line, "")
};
match field {
"event" => { event.event_type = Some(value.to_string()); },
"data" => { event.data.push_str(value); event.data.push('\n'); },
"id" => { event.id = Some(value.to_string()); }
"retry" => {
if let Ok(retry) = value.parse::<u64>() {
return ParseResult::SetRetry(Duration::from_millis(retry));
}
},
_ => () // ignored
}
ParseResult::Next
}
}
impl Event {
/// Creates an empty event.
pub fn new() -> Event {
Event {
id: None,
event_type: None,
data: "".to_string(),
}
}
/// Returns `true` if the event is empty.
///
/// An event is empty if it has no id or event type and its data field is empty.
pub fn is_empty(&self) -> bool {
self.id.is_none() && self.event_type.is_none() && self.data.is_empty()
}
/// Makes the event empty.
pub fn clear(&mut self) {
self.id = None;
self.event_type = None;
self.data.clear();
}
}
impl fmt::Display for Event {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
if let Some(ref id) = self.id {
try!(write!(f, "id: {}\n", id));
}
if let Some(ref event_type) = self.event_type {
try!(write!(f, "event: {}\n", event_type));
}
for line in self.data.lines() {
try!(write!(f, "data: {}\n", line));
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn basic_event_display() {
assert_eq!(
"data: hello world\n",
Event { id: None, event_type: None, data: "hello world".to_string() }.to_string());
assert_eq!(
"id: foo\ndata: hello world\n",
Event { id: Some("foo".to_string()), event_type: None, data: "hello world".to_string() }.to_string());
assert_eq!(
"event: bar\ndata: hello world\n",
Event { id: None, event_type: Some("bar".to_string()), data: "hello world".to_string() }.to_string());
}
#[test]
fn multiline_event_display() {
assert_eq!(
"data: hello\ndata: world\n",
Event { id: None, event_type: None, data: "hello\nworld".to_string() }.to_string());
assert_eq!(
"data: hello\ndata: \ndata: world\n",
Event { id: None, event_type: None, data: "hello\n\nworld".to_string() }.to_string());
}
}

32
src/lib.rs 100644
View File

@ -0,0 +1,32 @@
//! # EventSource
//!
//! EventSource is a Rust library for reading from Server-Sent Events endpoints. It transparently
//! sends HTTP requests and only exposes a stream of events to the user. It handles automatic
//! reconnection and parsing of the `text/event-stream` data format.
//!
//! # Examples
//!
//! ```no_run
//! extern crate eventsource;
//! extern crate reqwest;
//! use eventsource::reqwest::Client;
//! use reqwest::Url;
//!
//! fn main() {
//! let client = Client::new(Url::parse("http://example.com").unwrap());
//! for event in client {
//! println!("{}", event.unwrap());
//! }
//! }
//! ```
//!
#[macro_use]
extern crate error_chain;
// Generic text/event-stream parsing and serialization.
pub mod event;
// HTTP interface
#[cfg(feature = "with-reqwest")]
pub mod reqwest;

178
src/reqwest.rs 100644
View File

@ -0,0 +1,178 @@
//! # Reqwest-based EventSource client
extern crate mime;
extern crate reqwest as reqw;
mod errors {
error_chain! {
foreign_links {
Reqwest(super::reqw::Error);
Io(::std::io::Error);
}
errors {
Http(status: super::reqw::StatusCode) {
description("HTTP request failed")
display("HTTP status code: {}", status)
}
InvalidContentType(mime_type: mime::Mime) {
description("unexpected Content-Type header")
display("unexpected Content-Type: {}", mime_type)
}
NoContentType {
description("no Content-Type header in response")
display("Content-Type missing")
}
}
}
}
pub use self::errors::*;
use self::reqw::header::{HeaderMap, HeaderValue, ACCEPT, CONTENT_TYPE};
use super::event::{parse_event_line, Event, ParseResult};
use std::io::{BufRead, BufReader};
use std::time::{Duration, Instant};
const DEFAULT_RETRY: u64 = 5000;
/// A client for a Server-Sent Events endpoint.
///
/// Read events by iterating over the client.
pub struct Client {
client: reqw::Client,
response: Option<BufReader<reqw::Response>>,
url: reqw::Url,
last_event_id: Option<String>,
last_try: Option<Instant>,
/// Reconnection time in milliseconds. Note that the reconnection time can be changed by the
/// event stream, so changing this may not make a difference.
pub retry: Duration,
}
impl Client {
/// Constructs a new EventSource client for the given URL.
///
/// This does not start an HTTP request.
pub fn new(url: reqw::Url) -> Client {
Client {
client: reqw::Client::new(),
response: None,
url: url,
last_event_id: None,
last_try: None,
retry: Duration::from_millis(DEFAULT_RETRY),
}
}
fn next_request(&mut self) -> Result<()> {
let mut headers = HeaderMap::with_capacity(2);
headers.insert(ACCEPT, HeaderValue::from_str("text/event-stream").unwrap());
if let Some(ref id) = self.last_event_id {
headers.insert("Last-Event-ID", HeaderValue::from_str(id).unwrap());
}
let res = self.client.get(self.url.clone()).headers(headers).send()?;
// Check status code and Content-Type.
{
let status = res.status();
if !status.is_success() {
return Err(ErrorKind::Http(status.clone()).into());
}
if let Some(content_type_hv) = res.headers().get(CONTENT_TYPE) {
let content_type = content_type_hv
.to_str()
.unwrap()
.to_string()
.parse::<mime::Mime>()
.unwrap();
// Compare type and subtype only, MIME parameters are ignored.
if (content_type.type_(), content_type.subtype())
!= (mime::TEXT, mime::EVENT_STREAM)
{
return Err(ErrorKind::InvalidContentType(content_type.clone()).into());
}
} else {
return Err(ErrorKind::NoContentType.into());
}
}
self.response = Some(BufReader::new(res));
Ok(())
}
}
// Helper macro for Option<Result<...>>
macro_rules! try_option {
($e:expr) => {
match $e {
Ok(val) => val,
Err(err) => return Some(Err(::std::convert::From::from(err))),
}
};
}
/// Iterate over the client to get events.
///
/// HTTP requests are made transparently while iterating.
impl Iterator for Client {
type Item = Result<Event>;
fn next(&mut self) -> Option<Result<Event>> {
if self.response.is_none() {
// We may have to wait for the next request.
if let Some(last_try) = self.last_try {
let elapsed = last_try.elapsed();
if elapsed < self.retry {
::std::thread::sleep(self.retry - elapsed);
}
}
// Set here in case the request fails.
self.last_try = Some(Instant::now());
try_option!(self.next_request());
}
let result = {
let mut event = Event::new();
let mut line = String::new();
let reader = self.response.as_mut().unwrap();
loop {
match reader.read_line(&mut line) {
// Got new bytes from stream
Ok(_n) if _n > 0 => {
match parse_event_line(&line, &mut event) {
ParseResult::Next => (), // okay, just continue
ParseResult::Dispatch => {
if let Some(ref id) = event.id {
self.last_event_id = Some(id.clone());
}
return Some(Ok(event));
}
ParseResult::SetRetry(ref retry) => {
self.retry = *retry;
}
}
line.clear();
}
// Nothing read from stream
Ok(_) => break None,
Err(err) => break Some(Err(::std::convert::From::from(err))),
}
}
};
match result {
None | Some(Err(_)) => {
// EOF or a stream error, retry after timeout
self.last_try = Some(Instant::now());
self.response = None;
self.next()
}
_ => result,
}
}
}

126
tests/reqwest.rs 100644
View File

@ -0,0 +1,126 @@
extern crate eventsource;
extern crate reqwest;
use eventsource::reqwest::{Client, Error, ErrorKind};
use reqwest::Url;
use std::time::Duration;
use server::Server;
mod server;
fn server() -> Server {
let s = Server::new();
s.receive(
"\
GET / HTTP/1.1\r\n\
host: 127.0.0.1:$PORT\r\n\
accept: text/event-stream\r\n\
accept-encoding: gzip\r\n\
\r\n",
);
return s;
}
#[test]
fn simple_events() {
let s = server();
s.send(
"HTTP/1.1 200 OK\r\n\
Content-Type: text/event-stream\r\n\
\r\n\
id: 42\r\n\
event: foo\r\n\
data: bar\r\n\
\r\n\
event: bar\n\
: comment\n\
data: baz\n\
\n",
);
println!("url: {}", s.url("/"));
let mut client = Client::new(Url::parse(&s.url("/")).unwrap());
let event = client.next().unwrap().unwrap();
assert_eq!(event.id, Some("42".into()));
assert_eq!(event.event_type, Some("foo".into()));
assert_eq!(event.data, "bar\n");
let event = client.next().unwrap().unwrap();
assert_eq!(event.id, None);
assert_eq!(event.event_type, Some("bar".into()));
assert_eq!(event.data, "baz\n");
}
#[test]
fn retry() {
let s = server();
s.send(
"HTTP/1.1 200 OK\r\n\
Content-Type: text/event-stream\r\n\
\r\n\
retry: 42\r\n\
data: bar\r\n\
\r\n",
);
println!("url: {}", s.url("/"));
let mut client = Client::new(Url::parse(&s.url("/")).unwrap());
let event = client.next().unwrap().unwrap();
assert_eq!(event.data, "bar\n");
assert_eq!(client.retry, Duration::from_millis(42));
}
#[test]
fn missing_content_type() {
let s = server();
s.send(
"HTTP/1.1 200 OK\r\n\
\r\n\
data: bar\r\n\
\r\n",
);
let mut client = Client::new(Url::parse(&s.url("/")).unwrap());
match client.next().unwrap() {
Err(Error(ErrorKind::NoContentType, _)) => assert!(true),
_ => assert!(false, "NoContentType error expected"),
}
}
#[test]
fn invalid_content_type() {
let s = server();
s.send(
"HTTP/1.1 200 OK\r\n\
Content-Type: text/plain\r\n\
\r\n\
data: bar\r\n\
\r\n",
);
let mut client = Client::new(Url::parse(&s.url("/")).unwrap());
match client.next().unwrap() {
Err(Error(ErrorKind::InvalidContentType(_), _)) => assert!(true),
_ => assert!(false, "InvalidContentType error expected"),
}
}
#[test]
fn content_type_with_mime_parameter() {
let s = server();
s.send(
"HTTP/1.1 200 OK\r\n\
Content-Type: text/event-stream;charset=utf8\r\n\
\r\n\
data: bar\r\n\
\r\n",
);
let mut client = Client::new(Url::parse(&s.url("/")).unwrap());
let event = client
.next()
.unwrap()
.expect("MIME parameter should be ignored");
assert_eq!(event.data, "bar\n");
}

View File

@ -0,0 +1,19 @@
Copyright (c) 2014 Carl Lerche
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

191
tests/server/mod.rs 100644
View File

@ -0,0 +1,191 @@
#![allow(dead_code)]
use std::collections::HashSet;
use std::io::prelude::*;
use std::io::BufReader;
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;
macro_rules! t {
($e:expr) => {
match $e {
Ok(e) => e,
Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
}
};
}
pub struct Server {
messages: Option<Sender<Message>>,
addr: SocketAddr,
thread: Option<thread::JoinHandle<()>>,
}
enum Message {
Read(String),
Write(String),
}
fn run(listener: &TcpListener, rx: &Receiver<Message>) {
let mut socket = BufReader::new(listener.accept().unwrap().0);
for msg in rx.iter() {
match msg {
Message::Read(ref expected) => {
let mut expected = &expected[..];
let mut expected_headers = HashSet::new();
while let Some(i) = expected.find("\n") {
let line = &expected[..i + 1];
expected = &expected[i + 1..];
expected_headers.insert(line);
if line == "\r\n" {
break;
}
}
let mut expected_len = None;
while expected_headers.len() > 0 {
let mut actual = String::new();
t!(socket.read_line(&mut actual));
if actual.starts_with("Content-Length") {
let len = actual.split(": ").skip(1).next().unwrap();
expected_len = len.trim().parse().ok();
}
// various versions of libcurl do different things here
if actual == "Proxy-Connection: Keep-Alive\r\n" {
continue;
}
// The User-Agent header changes with the reqwest version.
if actual.starts_with("user-agent:") {
continue;
}
if expected_headers.remove(&actual[..]) {
continue;
}
let mut found = None;
for header in expected_headers.iter() {
if lines_match(header, &actual) {
found = Some(header.clone());
break;
}
}
if let Some(found) = found {
expected_headers.remove(&found);
continue;
}
panic!(
"unexpected header: {:?} (remaining headers {:?})",
actual, expected_headers
);
}
for header in expected_headers {
panic!("expected header but not found: {:?}", header);
}
let mut line = String::new();
let mut socket = match expected_len {
Some(amt) => socket.by_ref().take(amt),
None => socket.by_ref().take(expected.len() as u64),
};
while socket.limit() > 0 {
line.truncate(0);
t!(socket.read_line(&mut line));
if line.len() == 0 {
break;
}
if expected.len() == 0 {
panic!("unexpected line: {:?}", line);
}
let i = expected.find("\n").unwrap_or(expected.len() - 1);
let expected_line = &expected[..i + 1];
expected = &expected[i + 1..];
if lines_match(expected_line, &line) {
continue;
}
panic!(
"lines didn't match:\n\
expected: {:?}\n\
actual: {:?}\n",
expected_line, line
)
}
if expected.len() != 0 {
println!("didn't get expected data: {:?}", expected);
}
}
Message::Write(ref to_write) => {
t!(socket.get_mut().write_all(to_write.as_bytes()));
return;
}
}
}
let mut dst = Vec::new();
t!(socket.read_to_end(&mut dst));
assert!(dst.len() == 0);
}
fn lines_match(expected: &str, mut actual: &str) -> bool {
for (i, part) in expected.split("[..]").enumerate() {
match actual.find(part) {
Some(j) => {
if i == 0 && j != 0 {
return false;
}
actual = &actual[j + part.len()..];
}
None => return false,
}
}
actual.is_empty() || expected.ends_with("[..]")
}
impl Server {
pub fn new() -> Server {
let listener = t!(TcpListener::bind("127.0.0.1:0"));
let addr = t!(listener.local_addr());
let (tx, rx) = channel();
let thread = thread::spawn(move || run(&listener, &rx));
Server {
messages: Some(tx),
addr: addr,
thread: Some(thread),
}
}
pub fn receive(&self, msg: &str) {
let msg = msg.replace("$PORT", &self.addr.port().to_string());
self.msg(Message::Read(msg));
}
pub fn send(&self, msg: &str) {
let msg = msg.replace("$PORT", &self.addr.port().to_string());
self.msg(Message::Write(msg));
}
fn msg(&self, msg: Message) {
t!(self.messages.as_ref().unwrap().send(msg));
}
pub fn addr(&self) -> &SocketAddr {
&self.addr
}
pub fn url(&self, path: &str) -> String {
format!("http://{}{}", self.addr, path)
}
}
impl Drop for Server {
fn drop(&mut self) {
drop(TcpStream::connect(&self.addr));
drop(self.messages.take());
let res = self.thread.take().unwrap().join();
if !thread::panicking() {
t!(res);
} else if let Err(e) = res {
println!("child server thread also failed: {:?}", e);
}
}
}