Merge commit 'b3c78ca9ecb731e6fa00844b7939b1eca61dbc95' as 'melib/src/backends/jmap/eventsource'
commit
e875ba40ce
|
@ -0,0 +1,2 @@
|
|||
target
|
||||
Cargo.lock
|
|
@ -0,0 +1 @@
|
|||
language: rust
|
|
@ -0,0 +1,14 @@
|
|||
# Changelog
|
||||
|
||||
## 0.4.0 - 2019-09-16
|
||||
### Changed
|
||||
- Update reqwest to 0.9.x
|
||||
|
||||
## 0.3.0 - 2017-10-27
|
||||
### Changed
|
||||
- Update reqwest to 0.8.0
|
||||
- `Client::new()` no longer returns a `Result`. This is a breaking
|
||||
change carried over from reqwest 0.8.0.
|
||||
|
||||
### Fixed
|
||||
- Infinite loop when error occur in the stream (#2)
|
|
@ -0,0 +1,20 @@
|
|||
[package]
|
||||
name = "eventsource"
|
||||
version = "0.4.0"
|
||||
authors = ["Lukas Werling <lukas.werling@gmail.com>"]
|
||||
|
||||
description = "Library for accessing EventSource/Server-Sent Events endpoints"
|
||||
repository = "https://github.com/lluchs/eventsource"
|
||||
license = "MIT"
|
||||
keywords = ["http"]
|
||||
|
||||
[features]
|
||||
default = ["with-reqwest"]
|
||||
|
||||
# Enable the reqwest-based client.
|
||||
with-reqwest = ["reqwest"]
|
||||
|
||||
[dependencies]
|
||||
error-chain = "0.11.0"
|
||||
reqwest = { version = "0.9.2", optional = true }
|
||||
mime = "0.3.7"
|
|
@ -0,0 +1,25 @@
|
|||
Copyright (c) 2016 Lukas Werling
|
||||
|
||||
Permission is hereby granted, free of charge, to any
|
||||
person obtaining a copy of this software and associated
|
||||
documentation files (the "Software"), to deal in the
|
||||
Software without restriction, including without
|
||||
limitation the rights to use, copy, modify, merge,
|
||||
publish, distribute, sublicense, and/or sell copies of
|
||||
the Software, and to permit persons to whom the Software
|
||||
is furnished to do so, subject to the following
|
||||
conditions:
|
||||
|
||||
The above copyright notice and this permission notice
|
||||
shall be included in all copies or substantial portions
|
||||
of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
|
||||
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
|
||||
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
|
||||
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
|
||||
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
|
||||
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
|
||||
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
|
||||
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
DEALINGS IN THE SOFTWARE.
|
|
@ -0,0 +1,23 @@
|
|||
# EventSource
|
||||
|
||||
[![crates.io](https://img.shields.io/crates/v/eventsource.svg)](https://crates.io/crates/eventsource) [![Documentation](https://docs.rs/eventsource/badge.svg)](https://docs.rs/eventsource/) [![Build Status](https://travis-ci.org/lluchs/eventsource.svg?branch=master)](https://travis-ci.org/lluchs/eventsource)
|
||||
|
||||
EventSource is a Rust library for reading from Server-Sent Events endpoints. It transparently
|
||||
sends HTTP requests and only exposes a stream of events to the user. It handles automatic
|
||||
reconnection and parsing of the `text/event-stream` data format.
|
||||
|
||||
## Examples
|
||||
|
||||
```no_run
|
||||
extern crate eventsource;
|
||||
extern crate reqwest;
|
||||
use eventsource::reqwest::Client;
|
||||
use reqwest::Url;
|
||||
|
||||
fn main() {
|
||||
let client = Client::new(Url::parse("http://example.com").unwrap());
|
||||
for event in client {
|
||||
println!("{}", event.unwrap());
|
||||
}
|
||||
}
|
||||
```
|
|
@ -0,0 +1,14 @@
|
|||
extern crate eventsource;
|
||||
extern crate reqwest;
|
||||
|
||||
use eventsource::reqwest::Client;
|
||||
use reqwest::Url;
|
||||
|
||||
fn main() {
|
||||
//let url = "https://clonkspot.org/league/game_events.php";
|
||||
let url = "http://league.openclonk.org/poll_game_events.php";
|
||||
let client = Client::new(Url::parse(url).unwrap());
|
||||
for event in client {
|
||||
println!("{}", event.unwrap());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,147 @@
|
|||
use std::fmt;
|
||||
use std::time::Duration;
|
||||
|
||||
/// A single Server-Sent Event.
|
||||
#[derive(Debug)]
|
||||
pub struct Event {
|
||||
/// Corresponds to the `id` field.
|
||||
pub id: Option<String>,
|
||||
/// Corresponds to the `event` field.
|
||||
pub event_type: Option<String>,
|
||||
/// All `data` fields concatenated by newlines.
|
||||
pub data: String,
|
||||
}
|
||||
|
||||
/// Possible results from parsing a single event-stream line.
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum ParseResult {
|
||||
/// Line parsed successfully, but the event is not complete yet.
|
||||
Next,
|
||||
/// The event is complete now. Pass a new (empty) event for the next call.
|
||||
Dispatch,
|
||||
/// Set retry time.
|
||||
SetRetry(Duration),
|
||||
}
|
||||
|
||||
/// Parse a single line of an event-stream.
|
||||
///
|
||||
/// The line may end with a newline.
|
||||
///
|
||||
/// You will have to call this function multiple times until it returns `ParseResult::Dispatch`.
|
||||
/// Make sure to clear the event struct for the next line, then.
|
||||
///
|
||||
/// To handle the `Last-Event-ID` header, check the `id` field for each finished event.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # use eventsource::event::{Event, ParseResult, parse_event_line};
|
||||
/// let mut event = Event::new();
|
||||
/// assert_eq!(parse_event_line("id: 42", &mut event), ParseResult::Next);
|
||||
/// assert_eq!(parse_event_line("data: foobar", &mut event), ParseResult::Next);
|
||||
/// assert_eq!(parse_event_line("", &mut event), ParseResult::Dispatch);
|
||||
/// // The event is finished now.
|
||||
/// assert_eq!(event.id, Some("42".into()));
|
||||
/// assert_eq!(event.data, "foobar\n");
|
||||
/// // Now clear and continue.
|
||||
/// event.clear();
|
||||
/// // ...
|
||||
/// ```
|
||||
pub fn parse_event_line(line: &str, event: &mut Event) -> ParseResult {
|
||||
let line = line.trim_right_matches(|c| c == '\r' || c == '\n');
|
||||
if line == "" {
|
||||
ParseResult::Dispatch
|
||||
} else {
|
||||
let (field, value) = if let Some(pos) = line.find(':') {
|
||||
let (f, v) = line.split_at(pos);
|
||||
// Strip : and an optional space.
|
||||
let v = &v[1..];
|
||||
let v = if v.starts_with(' ') { &v[1..] } else { v };
|
||||
(f, v)
|
||||
} else {
|
||||
(line, "")
|
||||
};
|
||||
|
||||
match field {
|
||||
"event" => { event.event_type = Some(value.to_string()); },
|
||||
"data" => { event.data.push_str(value); event.data.push('\n'); },
|
||||
"id" => { event.id = Some(value.to_string()); }
|
||||
"retry" => {
|
||||
if let Ok(retry) = value.parse::<u64>() {
|
||||
return ParseResult::SetRetry(Duration::from_millis(retry));
|
||||
}
|
||||
},
|
||||
_ => () // ignored
|
||||
}
|
||||
|
||||
ParseResult::Next
|
||||
}
|
||||
}
|
||||
|
||||
impl Event {
|
||||
/// Creates an empty event.
|
||||
pub fn new() -> Event {
|
||||
Event {
|
||||
id: None,
|
||||
event_type: None,
|
||||
data: "".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns `true` if the event is empty.
|
||||
///
|
||||
/// An event is empty if it has no id or event type and its data field is empty.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.id.is_none() && self.event_type.is_none() && self.data.is_empty()
|
||||
}
|
||||
|
||||
/// Makes the event empty.
|
||||
pub fn clear(&mut self) {
|
||||
self.id = None;
|
||||
self.event_type = None;
|
||||
self.data.clear();
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Event {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
if let Some(ref id) = self.id {
|
||||
try!(write!(f, "id: {}\n", id));
|
||||
}
|
||||
if let Some(ref event_type) = self.event_type {
|
||||
try!(write!(f, "event: {}\n", event_type));
|
||||
}
|
||||
for line in self.data.lines() {
|
||||
try!(write!(f, "data: {}\n", line));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn basic_event_display() {
|
||||
assert_eq!(
|
||||
"data: hello world\n",
|
||||
Event { id: None, event_type: None, data: "hello world".to_string() }.to_string());
|
||||
assert_eq!(
|
||||
"id: foo\ndata: hello world\n",
|
||||
Event { id: Some("foo".to_string()), event_type: None, data: "hello world".to_string() }.to_string());
|
||||
assert_eq!(
|
||||
"event: bar\ndata: hello world\n",
|
||||
Event { id: None, event_type: Some("bar".to_string()), data: "hello world".to_string() }.to_string());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multiline_event_display() {
|
||||
assert_eq!(
|
||||
"data: hello\ndata: world\n",
|
||||
Event { id: None, event_type: None, data: "hello\nworld".to_string() }.to_string());
|
||||
assert_eq!(
|
||||
"data: hello\ndata: \ndata: world\n",
|
||||
Event { id: None, event_type: None, data: "hello\n\nworld".to_string() }.to_string());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
//! # EventSource
|
||||
//!
|
||||
//! EventSource is a Rust library for reading from Server-Sent Events endpoints. It transparently
|
||||
//! sends HTTP requests and only exposes a stream of events to the user. It handles automatic
|
||||
//! reconnection and parsing of the `text/event-stream` data format.
|
||||
//!
|
||||
//! # Examples
|
||||
//!
|
||||
//! ```no_run
|
||||
//! extern crate eventsource;
|
||||
//! extern crate reqwest;
|
||||
//! use eventsource::reqwest::Client;
|
||||
//! use reqwest::Url;
|
||||
//!
|
||||
//! fn main() {
|
||||
//! let client = Client::new(Url::parse("http://example.com").unwrap());
|
||||
//! for event in client {
|
||||
//! println!("{}", event.unwrap());
|
||||
//! }
|
||||
//! }
|
||||
//! ```
|
||||
//!
|
||||
|
||||
#[macro_use]
|
||||
extern crate error_chain;
|
||||
|
||||
// Generic text/event-stream parsing and serialization.
|
||||
pub mod event;
|
||||
|
||||
// HTTP interface
|
||||
#[cfg(feature = "with-reqwest")]
|
||||
pub mod reqwest;
|
|
@ -0,0 +1,178 @@
|
|||
//! # Reqwest-based EventSource client
|
||||
|
||||
extern crate mime;
|
||||
extern crate reqwest as reqw;
|
||||
|
||||
mod errors {
|
||||
error_chain! {
|
||||
foreign_links {
|
||||
Reqwest(super::reqw::Error);
|
||||
Io(::std::io::Error);
|
||||
}
|
||||
|
||||
errors {
|
||||
Http(status: super::reqw::StatusCode) {
|
||||
description("HTTP request failed")
|
||||
display("HTTP status code: {}", status)
|
||||
}
|
||||
InvalidContentType(mime_type: mime::Mime) {
|
||||
description("unexpected Content-Type header")
|
||||
display("unexpected Content-Type: {}", mime_type)
|
||||
}
|
||||
NoContentType {
|
||||
description("no Content-Type header in response")
|
||||
display("Content-Type missing")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
pub use self::errors::*;
|
||||
|
||||
use self::reqw::header::{HeaderMap, HeaderValue, ACCEPT, CONTENT_TYPE};
|
||||
use super::event::{parse_event_line, Event, ParseResult};
|
||||
use std::io::{BufRead, BufReader};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
const DEFAULT_RETRY: u64 = 5000;
|
||||
|
||||
/// A client for a Server-Sent Events endpoint.
|
||||
///
|
||||
/// Read events by iterating over the client.
|
||||
pub struct Client {
|
||||
client: reqw::Client,
|
||||
response: Option<BufReader<reqw::Response>>,
|
||||
url: reqw::Url,
|
||||
last_event_id: Option<String>,
|
||||
last_try: Option<Instant>,
|
||||
|
||||
/// Reconnection time in milliseconds. Note that the reconnection time can be changed by the
|
||||
/// event stream, so changing this may not make a difference.
|
||||
pub retry: Duration,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
/// Constructs a new EventSource client for the given URL.
|
||||
///
|
||||
/// This does not start an HTTP request.
|
||||
pub fn new(url: reqw::Url) -> Client {
|
||||
Client {
|
||||
client: reqw::Client::new(),
|
||||
response: None,
|
||||
url: url,
|
||||
last_event_id: None,
|
||||
last_try: None,
|
||||
retry: Duration::from_millis(DEFAULT_RETRY),
|
||||
}
|
||||
}
|
||||
|
||||
fn next_request(&mut self) -> Result<()> {
|
||||
let mut headers = HeaderMap::with_capacity(2);
|
||||
headers.insert(ACCEPT, HeaderValue::from_str("text/event-stream").unwrap());
|
||||
if let Some(ref id) = self.last_event_id {
|
||||
headers.insert("Last-Event-ID", HeaderValue::from_str(id).unwrap());
|
||||
}
|
||||
|
||||
let res = self.client.get(self.url.clone()).headers(headers).send()?;
|
||||
|
||||
// Check status code and Content-Type.
|
||||
{
|
||||
let status = res.status();
|
||||
if !status.is_success() {
|
||||
return Err(ErrorKind::Http(status.clone()).into());
|
||||
}
|
||||
|
||||
if let Some(content_type_hv) = res.headers().get(CONTENT_TYPE) {
|
||||
let content_type = content_type_hv
|
||||
.to_str()
|
||||
.unwrap()
|
||||
.to_string()
|
||||
.parse::<mime::Mime>()
|
||||
.unwrap();
|
||||
// Compare type and subtype only, MIME parameters are ignored.
|
||||
if (content_type.type_(), content_type.subtype())
|
||||
!= (mime::TEXT, mime::EVENT_STREAM)
|
||||
{
|
||||
return Err(ErrorKind::InvalidContentType(content_type.clone()).into());
|
||||
}
|
||||
} else {
|
||||
return Err(ErrorKind::NoContentType.into());
|
||||
}
|
||||
}
|
||||
|
||||
self.response = Some(BufReader::new(res));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// Helper macro for Option<Result<...>>
|
||||
macro_rules! try_option {
|
||||
($e:expr) => {
|
||||
match $e {
|
||||
Ok(val) => val,
|
||||
Err(err) => return Some(Err(::std::convert::From::from(err))),
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// Iterate over the client to get events.
|
||||
///
|
||||
/// HTTP requests are made transparently while iterating.
|
||||
impl Iterator for Client {
|
||||
type Item = Result<Event>;
|
||||
|
||||
fn next(&mut self) -> Option<Result<Event>> {
|
||||
if self.response.is_none() {
|
||||
// We may have to wait for the next request.
|
||||
if let Some(last_try) = self.last_try {
|
||||
let elapsed = last_try.elapsed();
|
||||
if elapsed < self.retry {
|
||||
::std::thread::sleep(self.retry - elapsed);
|
||||
}
|
||||
}
|
||||
// Set here in case the request fails.
|
||||
self.last_try = Some(Instant::now());
|
||||
|
||||
try_option!(self.next_request());
|
||||
}
|
||||
|
||||
let result = {
|
||||
let mut event = Event::new();
|
||||
let mut line = String::new();
|
||||
let reader = self.response.as_mut().unwrap();
|
||||
|
||||
loop {
|
||||
match reader.read_line(&mut line) {
|
||||
// Got new bytes from stream
|
||||
Ok(_n) if _n > 0 => {
|
||||
match parse_event_line(&line, &mut event) {
|
||||
ParseResult::Next => (), // okay, just continue
|
||||
ParseResult::Dispatch => {
|
||||
if let Some(ref id) = event.id {
|
||||
self.last_event_id = Some(id.clone());
|
||||
}
|
||||
return Some(Ok(event));
|
||||
}
|
||||
ParseResult::SetRetry(ref retry) => {
|
||||
self.retry = *retry;
|
||||
}
|
||||
}
|
||||
line.clear();
|
||||
}
|
||||
// Nothing read from stream
|
||||
Ok(_) => break None,
|
||||
Err(err) => break Some(Err(::std::convert::From::from(err))),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
match result {
|
||||
None | Some(Err(_)) => {
|
||||
// EOF or a stream error, retry after timeout
|
||||
self.last_try = Some(Instant::now());
|
||||
self.response = None;
|
||||
self.next()
|
||||
}
|
||||
_ => result,
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,126 @@
|
|||
extern crate eventsource;
|
||||
extern crate reqwest;
|
||||
|
||||
use eventsource::reqwest::{Client, Error, ErrorKind};
|
||||
use reqwest::Url;
|
||||
use std::time::Duration;
|
||||
|
||||
use server::Server;
|
||||
mod server;
|
||||
|
||||
fn server() -> Server {
|
||||
let s = Server::new();
|
||||
s.receive(
|
||||
"\
|
||||
GET / HTTP/1.1\r\n\
|
||||
host: 127.0.0.1:$PORT\r\n\
|
||||
accept: text/event-stream\r\n\
|
||||
accept-encoding: gzip\r\n\
|
||||
\r\n",
|
||||
);
|
||||
return s;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn simple_events() {
|
||||
let s = server();
|
||||
s.send(
|
||||
"HTTP/1.1 200 OK\r\n\
|
||||
Content-Type: text/event-stream\r\n\
|
||||
\r\n\
|
||||
id: 42\r\n\
|
||||
event: foo\r\n\
|
||||
data: bar\r\n\
|
||||
\r\n\
|
||||
event: bar\n\
|
||||
: comment\n\
|
||||
data: baz\n\
|
||||
\n",
|
||||
);
|
||||
|
||||
println!("url: {}", s.url("/"));
|
||||
let mut client = Client::new(Url::parse(&s.url("/")).unwrap());
|
||||
|
||||
let event = client.next().unwrap().unwrap();
|
||||
assert_eq!(event.id, Some("42".into()));
|
||||
assert_eq!(event.event_type, Some("foo".into()));
|
||||
assert_eq!(event.data, "bar\n");
|
||||
|
||||
let event = client.next().unwrap().unwrap();
|
||||
assert_eq!(event.id, None);
|
||||
assert_eq!(event.event_type, Some("bar".into()));
|
||||
assert_eq!(event.data, "baz\n");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn retry() {
|
||||
let s = server();
|
||||
s.send(
|
||||
"HTTP/1.1 200 OK\r\n\
|
||||
Content-Type: text/event-stream\r\n\
|
||||
\r\n\
|
||||
retry: 42\r\n\
|
||||
data: bar\r\n\
|
||||
\r\n",
|
||||
);
|
||||
|
||||
println!("url: {}", s.url("/"));
|
||||
let mut client = Client::new(Url::parse(&s.url("/")).unwrap());
|
||||
let event = client.next().unwrap().unwrap();
|
||||
assert_eq!(event.data, "bar\n");
|
||||
assert_eq!(client.retry, Duration::from_millis(42));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn missing_content_type() {
|
||||
let s = server();
|
||||
s.send(
|
||||
"HTTP/1.1 200 OK\r\n\
|
||||
\r\n\
|
||||
data: bar\r\n\
|
||||
\r\n",
|
||||
);
|
||||
|
||||
let mut client = Client::new(Url::parse(&s.url("/")).unwrap());
|
||||
match client.next().unwrap() {
|
||||
Err(Error(ErrorKind::NoContentType, _)) => assert!(true),
|
||||
_ => assert!(false, "NoContentType error expected"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn invalid_content_type() {
|
||||
let s = server();
|
||||
s.send(
|
||||
"HTTP/1.1 200 OK\r\n\
|
||||
Content-Type: text/plain\r\n\
|
||||
\r\n\
|
||||
data: bar\r\n\
|
||||
\r\n",
|
||||
);
|
||||
|
||||
let mut client = Client::new(Url::parse(&s.url("/")).unwrap());
|
||||
match client.next().unwrap() {
|
||||
Err(Error(ErrorKind::InvalidContentType(_), _)) => assert!(true),
|
||||
_ => assert!(false, "InvalidContentType error expected"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn content_type_with_mime_parameter() {
|
||||
let s = server();
|
||||
s.send(
|
||||
"HTTP/1.1 200 OK\r\n\
|
||||
Content-Type: text/event-stream;charset=utf8\r\n\
|
||||
\r\n\
|
||||
data: bar\r\n\
|
||||
\r\n",
|
||||
);
|
||||
|
||||
let mut client = Client::new(Url::parse(&s.url("/")).unwrap());
|
||||
let event = client
|
||||
.next()
|
||||
.unwrap()
|
||||
.expect("MIME parameter should be ignored");
|
||||
assert_eq!(event.data, "bar\n");
|
||||
}
|
|
@ -0,0 +1,19 @@
|
|||
Copyright (c) 2014 Carl Lerche
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
|
@ -0,0 +1,191 @@
|
|||
#![allow(dead_code)]
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::io::prelude::*;
|
||||
use std::io::BufReader;
|
||||
use std::net::{SocketAddr, TcpListener, TcpStream};
|
||||
use std::sync::mpsc::{channel, Receiver, Sender};
|
||||
use std::thread;
|
||||
|
||||
macro_rules! t {
|
||||
($e:expr) => {
|
||||
match $e {
|
||||
Ok(e) => e,
|
||||
Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
pub struct Server {
|
||||
messages: Option<Sender<Message>>,
|
||||
addr: SocketAddr,
|
||||
thread: Option<thread::JoinHandle<()>>,
|
||||
}
|
||||
|
||||
enum Message {
|
||||
Read(String),
|
||||
Write(String),
|
||||
}
|
||||
|
||||
fn run(listener: &TcpListener, rx: &Receiver<Message>) {
|
||||
let mut socket = BufReader::new(listener.accept().unwrap().0);
|
||||
for msg in rx.iter() {
|
||||
match msg {
|
||||
Message::Read(ref expected) => {
|
||||
let mut expected = &expected[..];
|
||||
let mut expected_headers = HashSet::new();
|
||||
while let Some(i) = expected.find("\n") {
|
||||
let line = &expected[..i + 1];
|
||||
expected = &expected[i + 1..];
|
||||
expected_headers.insert(line);
|
||||
if line == "\r\n" {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let mut expected_len = None;
|
||||
while expected_headers.len() > 0 {
|
||||
let mut actual = String::new();
|
||||
t!(socket.read_line(&mut actual));
|
||||
if actual.starts_with("Content-Length") {
|
||||
let len = actual.split(": ").skip(1).next().unwrap();
|
||||
expected_len = len.trim().parse().ok();
|
||||
}
|
||||
// various versions of libcurl do different things here
|
||||
if actual == "Proxy-Connection: Keep-Alive\r\n" {
|
||||
continue;
|
||||
}
|
||||
// The User-Agent header changes with the reqwest version.
|
||||
if actual.starts_with("user-agent:") {
|
||||
continue;
|
||||
}
|
||||
if expected_headers.remove(&actual[..]) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut found = None;
|
||||
for header in expected_headers.iter() {
|
||||
if lines_match(header, &actual) {
|
||||
found = Some(header.clone());
|
||||
break;
|
||||
}
|
||||
}
|
||||
if let Some(found) = found {
|
||||
expected_headers.remove(&found);
|
||||
continue;
|
||||
}
|
||||
panic!(
|
||||
"unexpected header: {:?} (remaining headers {:?})",
|
||||
actual, expected_headers
|
||||
);
|
||||
}
|
||||
for header in expected_headers {
|
||||
panic!("expected header but not found: {:?}", header);
|
||||
}
|
||||
|
||||
let mut line = String::new();
|
||||
let mut socket = match expected_len {
|
||||
Some(amt) => socket.by_ref().take(amt),
|
||||
None => socket.by_ref().take(expected.len() as u64),
|
||||
};
|
||||
while socket.limit() > 0 {
|
||||
line.truncate(0);
|
||||
t!(socket.read_line(&mut line));
|
||||
if line.len() == 0 {
|
||||
break;
|
||||
}
|
||||
if expected.len() == 0 {
|
||||
panic!("unexpected line: {:?}", line);
|
||||
}
|
||||
let i = expected.find("\n").unwrap_or(expected.len() - 1);
|
||||
let expected_line = &expected[..i + 1];
|
||||
expected = &expected[i + 1..];
|
||||
if lines_match(expected_line, &line) {
|
||||
continue;
|
||||
}
|
||||
panic!(
|
||||
"lines didn't match:\n\
|
||||
expected: {:?}\n\
|
||||
actual: {:?}\n",
|
||||
expected_line, line
|
||||
)
|
||||
}
|
||||
if expected.len() != 0 {
|
||||
println!("didn't get expected data: {:?}", expected);
|
||||
}
|
||||
}
|
||||
Message::Write(ref to_write) => {
|
||||
t!(socket.get_mut().write_all(to_write.as_bytes()));
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut dst = Vec::new();
|
||||
t!(socket.read_to_end(&mut dst));
|
||||
assert!(dst.len() == 0);
|
||||
}
|
||||
|
||||
fn lines_match(expected: &str, mut actual: &str) -> bool {
|
||||
for (i, part) in expected.split("[..]").enumerate() {
|
||||
match actual.find(part) {
|
||||
Some(j) => {
|
||||
if i == 0 && j != 0 {
|
||||
return false;
|
||||
}
|
||||
actual = &actual[j + part.len()..];
|
||||
}
|
||||
None => return false,
|
||||
}
|
||||
}
|
||||
actual.is_empty() || expected.ends_with("[..]")
|
||||
}
|
||||
|
||||
impl Server {
|
||||
pub fn new() -> Server {
|
||||
let listener = t!(TcpListener::bind("127.0.0.1:0"));
|
||||
let addr = t!(listener.local_addr());
|
||||
let (tx, rx) = channel();
|
||||
let thread = thread::spawn(move || run(&listener, &rx));
|
||||
Server {
|
||||
messages: Some(tx),
|
||||
addr: addr,
|
||||
thread: Some(thread),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn receive(&self, msg: &str) {
|
||||
let msg = msg.replace("$PORT", &self.addr.port().to_string());
|
||||
self.msg(Message::Read(msg));
|
||||
}
|
||||
|
||||
pub fn send(&self, msg: &str) {
|
||||
let msg = msg.replace("$PORT", &self.addr.port().to_string());
|
||||
self.msg(Message::Write(msg));
|
||||
}
|
||||
|
||||
fn msg(&self, msg: Message) {
|
||||
t!(self.messages.as_ref().unwrap().send(msg));
|
||||
}
|
||||
|
||||
pub fn addr(&self) -> &SocketAddr {
|
||||
&self.addr
|
||||
}
|
||||
|
||||
pub fn url(&self, path: &str) -> String {
|
||||
format!("http://{}{}", self.addr, path)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Server {
|
||||
fn drop(&mut self) {
|
||||
drop(TcpStream::connect(&self.addr));
|
||||
drop(self.messages.take());
|
||||
let res = self.thread.take().unwrap().join();
|
||||
if !thread::panicking() {
|
||||
t!(res);
|
||||
} else if let Err(e) = res {
|
||||
println!("child server thread also failed: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue