From a36a444b2c1f54ca2f1e3c019aebedea4323d58d Mon Sep 17 00:00:00 2001 From: Manos Pitsidianakis Date: Tue, 5 Jan 2021 17:09:48 +0200 Subject: [PATCH] JMAP eventsource WIP --- melib/src/backends/jmap/connection.rs | 1 + .../backends/jmap/connection/eventsource.rs | 266 ++++++++++++++++++ 2 files changed, 267 insertions(+) create mode 100644 melib/src/backends/jmap/connection/eventsource.rs diff --git a/melib/src/backends/jmap/connection.rs b/melib/src/backends/jmap/connection.rs index 8f5e506fe..e15eee433 100644 --- a/melib/src/backends/jmap/connection.rs +++ b/melib/src/backends/jmap/connection.rs @@ -19,6 +19,7 @@ * along with meli. If not, see . */ +pub mod eventsource; use super::*; use isahc::config::Configurable; diff --git a/melib/src/backends/jmap/connection/eventsource.rs b/melib/src/backends/jmap/connection/eventsource.rs new file mode 100644 index 000000000..e113c04b1 --- /dev/null +++ b/melib/src/backends/jmap/connection/eventsource.rs @@ -0,0 +1,266 @@ +/* + * meli - jmap module. + * + * Copyright 2021 Manos Pitsidianakis + * + * This file is part of meli. + * + * meli is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * meli is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with meli. If not, see . + */ + +use super::{HttpClient, JmapConnection, JmapServerConf, Store}; +use crate::error::Result; +use std::convert::TryFrom; +use std::io::{BufRead, BufReader}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +const DEFAULT_RETRY: u64 = 5000; + +/// A single Server-Sent Event. +#[derive(Debug, Default)] +pub struct Event { + /// Corresponds to the `id` field. + pub id: Option, + /// Corresponds to the `event` field. + pub event_type: Option, + /// All `data` fields concatenated by newlines. + pub data: String, +} + +/// Possible results from parsing a single event-stream line. +#[derive(Debug, PartialEq)] +pub enum ParseResult { + /// Line parsed successfully, but the event is not complete yet. + Next, + /// The event is complete now. Pass a new (empty) event for the next call. + Dispatch, + /// Set retry time. + SetRetry(Duration), +} + +pub fn parse_event_line(line: &str, event: &mut Event) -> ParseResult { + let line = line.trim_end_matches(|c| c == '\r' || c == '\n'); + if line == "" { + ParseResult::Dispatch + } else { + let (field, value) = if let Some(pos) = line.find(':') { + let (f, v) = line.split_at(pos); + // Strip : and an optional space. + let v = &v[1..]; + let v = if v.starts_with(' ') { &v[1..] } else { v }; + (f, v) + } else { + (line, "") + }; + match field { + "event" => { + event.event_type = Some(value.to_string()); + } + "data" => { + event.data.push_str(value); + event.data.push('\n'); + } + "id" => { + event.id = Some(value.to_string()); + } + "retry" => { + if let Ok(retry) = value.parse::() { + return ParseResult::SetRetry(Duration::from_millis(retry)); + } + } + _ => (), // ignored + } + + ParseResult::Next + } +} + +impl Event { + /// Creates an empty event. + pub fn new() -> Event { + Event::default() + } + + /// Returns `true` if the event is empty. + /// + /// An event is empty if it has no id or event type and its data field is empty. + pub fn is_empty(&self) -> bool { + self.id.is_none() && self.event_type.is_none() && self.data.is_empty() + } + + /// Makes the event empty. + pub fn clear(&mut self) { + self.id = None; + self.event_type = None; + self.data.clear(); + } +} + +/// A client for a Server-Sent Events endpoint. +/// +/// Read events by iterating over the client. +pub struct JmapEventSourceConnection { + pub client: Arc, + pub store: Arc, + pub server_conf: JmapServerConf, + response: Option>, + url: isahc::http::Uri, + last_event_id: Option, + last_try: Option, + pub retry: Duration, +} + +impl JmapEventSourceConnection { + pub fn new(conn: &JmapConnection) -> Result { + let url = + isahc::http::Uri::try_from(conn.session.lock().unwrap().event_source_url.as_str()) + .map_err(|err| err.to_string())?; + debug!("event_source {}", &url); + Ok(Self { + client: conn.client.clone(), + server_conf: conn.server_conf.clone(), + store: conn.store.clone(), + response: None, + url: url, + last_event_id: None, + last_try: None, + retry: Duration::from_millis(DEFAULT_RETRY), + }) + } + + pub async fn next_request(&mut self) -> Result<()> { + use isahc::{http, http::request::Request, prelude::*}; + + let mut request = Request::get(&self.url) + .timeout(std::time::Duration::from_secs(10)) + .redirect_policy(isahc::config::RedirectPolicy::Limit(10)) + .authentication(isahc::auth::Authentication::basic()) + .credentials(isahc::auth::Credentials::new( + &self.server_conf.server_username, + &self.server_conf.server_password, + )) + .header(http::header::ACCEPT, "text/event-stream"); + if let Some(ref id) = self.last_event_id { + request = request.header("Last-Event-ID", id.as_str()); + } + let request = request.body(()).map_err(|err| err.to_string())?; + + debug!(&request); + let mut response = self.client.send_async(request).await?; + debug!(&response); + //debug_assert!(response.status().is_success()); + /* + let mut headers = HeaderMap::with_capacity(2); + headers.insert(ACCEPT, HeaderValue::from_str("text/event-stream").unwrap()); + if let Some(ref id) = self.last_event_id { + headers.insert("Last-Event-ID", HeaderValue::from_str(id).unwrap()); + } + + let res = self.client.get(self.url.clone()).headers(headers).send()?; + */ + + // Check status code and Content-Type. + { + let status = response.status(); + if !status.is_success() { + let res_text = response.text_async().await?; + return Err(debug!(format!("{} {}", status.as_str(), res_text)).into()); + } + + if let Some(content_type_hv) = response.headers().get(isahc::http::header::CONTENT_TYPE) + { + if content_type_hv.to_str().unwrap() != "text/event-stream" { + panic!(content_type_hv.to_str().unwrap().to_string()); + } + /* + let content_type = content_type_hv + .to_str() + .unwrap() + .to_string() + .parse::() + .unwrap(); + // Compare type and subtype only, MIME parameters are ignored. + if (content_type.type_(), content_type.subtype()) + != (mime::TEXT, mime::EVENT_STREAM) + { + return Err(ErrorKind::InvalidContentType(content_type.clone()).into()); + } + */ + } + } + + self.response = Some(BufReader::new(response.into_body())); + Ok(()) + } +} /* + + pub async fn next_event(&mut self) -> Result { + let mut line = String::new(); + 'main_loop: loop { + match self.response.as_mut() { + None => { + // wait for the next request. + if let Some(last_try) = self.last_try { + let elapsed = last_try.elapsed(); + if elapsed < self.retry { + crate::connection::sleep(self.retry - elapsed).await; + } + } + // Set here in case the request fails. + self.last_try = Some(Instant::now()); + + self.next_request().await?; + } + Some(reader) => { + let mut event = Event::new(); + loop { + match reader.read_line(&mut line) { + // Got new bytes from stream + Ok(_n) if _n > 0 => { + match parse_event_line(&line, &mut event) { + ParseResult::Next => {} // okay, just continue + ParseResult::Dispatch => { + if let Some(ref id) = event.id { + self.last_event_id = Some(id.clone()); + } + return Ok(event); + } + ParseResult::SetRetry(ref retry) => { + self.retry = *retry; + } + } + line.clear(); + } + Ok(0) => { + // EOF or a stream error, retry after timeout + self.last_try = Some(Instant::now()); + self.response = None; + continue 'main_loop; + } + Err(err) => { + // EOF or a stream error, retry after timeout + self.last_try = Some(Instant::now()); + self.response = None; + debug!(&err); + continue 'main_loop; + } + } + } + } + } + } + } + } + */