You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

889 lines
32 KiB

  1. /*
  2. * meli - imap module.
  3. *
  4. * Copyright 2017 - 2019 Manos Pitsidianakis
  5. *
  6. * This file is part of meli.
  7. *
  8. * meli is free software: you can redistribute it and/or modify
  9. * it under the terms of the GNU General Public License as published by
  10. * the Free Software Foundation, either version 3 of the License, or
  11. * (at your option) any later version.
  12. *
  13. * meli is distributed in the hope that it will be useful,
  14. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. * GNU General Public License for more details.
  17. *
  18. * You should have received a copy of the GNU General Public License
  19. * along with meli. If not, see <http://www.gnu.org/licenses/>.
  20. */
  21. use super::protocol_parser::{ImapLineSplit, ImapResponse, RequiredResponses};
  22. use crate::backends::MailboxHash;
  23. use crate::connections::{lookup_ipv4, Connection};
  24. use crate::email::parser::BytesExt;
  25. use crate::error::*;
  26. extern crate native_tls;
  27. use futures::io::{AsyncReadExt, AsyncWriteExt};
  28. use native_tls::TlsConnector;
  29. pub use smol::Async as AsyncWrapper;
  30. use std::collections::HashSet;
  31. use std::future::Future;
  32. use std::iter::FromIterator;
  33. use std::pin::Pin;
  34. use std::sync::Arc;
  35. use std::time::Instant;
  36. use super::protocol_parser;
  37. use super::{Capabilities, ImapServerConf, UIDStore};
  /// Protocol spoken over an [`ImapStream`].
  ///
  /// The variant decides how commands are framed: `ImapStream::send_command`
  /// prefixes IMAP commands with an `M<n> ` tag and sends ManageSieve commands
  /// without one.
  #[derive(Debug, Clone, Copy)]
  pub enum ImapProtocol {
      /// IMAP session; `extension_use` carries the optional-extension switches.
      IMAP { extension_use: ImapExtensionUse },
      /// ManageSieve session (untagged commands).
      ManageSieve,
  }
  /// Switches controlling which optional IMAP extensions may be used.
  #[derive(Debug, Clone, Copy)]
  pub struct ImapExtensionUse {
      /// Switch for the IDLE extension (not consulted anywhere in this file).
      pub idle: bool,
      /// When set, `ImapConnection::connect` issues `COMPRESS DEFLATE` if the
      /// server advertises `COMPRESS=DEFLATE`.
      #[cfg(feature = "deflate_compression")]
      pub deflate: bool,
  }

  impl Default for ImapExtensionUse {
      /// Defaults: `idle` on, `deflate` (when compiled in) off.
      fn default() -> Self {
          ImapExtensionUse {
              idle: true,
              #[cfg(feature = "deflate_compression")]
              deflate: false,
          }
      }
  }
  /// One live server connection together with its per-connection protocol
  /// state.
  #[derive(Debug)]
  pub struct ImapStream {
      /// Number used in the tag (`M<cmd_id>`) of the next IMAP command;
      /// `send_command` increments it for every tagged command it writes.
      pub cmd_id: usize,
      /// The underlying connection (plain TCP or TLS) wrapped for async I/O.
      pub stream: AsyncWrapper<Connection>,
      /// Protocol this stream was opened for; decides command framing.
      pub protocol: ImapProtocol,
      /// Mailbox selection last recorded by `ImapConnection::select_mailbox`,
      /// `examine_mailbox` or `unselect`.
      pub current_mailbox: MailboxSelection,
  }
  65. #[derive(Debug, Copy, Clone, Eq, PartialEq)]
  66. pub enum MailboxSelection {
  67. None,
  68. Select(MailboxHash),
  69. Examine(MailboxHash),
  70. }
  71. impl MailboxSelection {
  72. pub fn take(&mut self) -> Self {
  73. std::mem::replace(self, MailboxSelection::None)
  74. }
  75. }
  /// Awaits `cl` and returns its result unchanged.
  ///
  /// The signature pins the future's output to `Result<()>`, so an `async`
  /// block passed here can use `?` without spelling out its own return type.
  async fn try_await(cl: impl Future<Output = Result<()>> + Send) -> Result<()> {
      cl.await
  }
  /// A reconnectable connection: the current stream (or the reason there is
  /// none) plus the configuration needed to open a new one.
  #[derive(Debug)]
  pub struct ImapConnection {
      /// `Ok` while connected; `Err` carries why the connection is unusable
      /// (initially "Offline", see `new_connection`).
      pub stream: Result<ImapStream>,
      /// Server settings used by `connect` to (re)open the stream.
      pub server_conf: ImapServerConf,
      /// Shared store holding online status, capabilities, mailboxes and the
      /// other per-account state accessed by the methods of this type.
      pub uid_store: Arc<UIDStore>,
  }
  85. impl ImapStream {
  86. pub async fn new_connection(
  87. server_conf: &ImapServerConf,
  88. ) -> Result<(Capabilities, ImapStream)> {
  89. use std::net::TcpStream;
  90. let path = &server_conf.server_hostname;
  91. let cmd_id = 1;
  92. let stream = if server_conf.use_tls {
  93. let mut connector = TlsConnector::builder();
  94. if server_conf.danger_accept_invalid_certs {
  95. connector.danger_accept_invalid_certs(true);
  96. }
  97. let connector = connector
  98. .build()
  99. .chain_err_kind(crate::error::ErrorKind::Network)?;
  100. let addr = if let Ok(a) = lookup_ipv4(path, server_conf.server_port) {
  101. a
  102. } else {
  103. return Err(MeliError::new(format!(
  104. "Could not lookup address {}",
  105. &path
  106. )));
  107. };
  108. let mut socket = AsyncWrapper::new(Connection::Tcp(
  109. TcpStream::connect_timeout(&addr, std::time::Duration::new(4, 0))
  110. .chain_err_kind(crate::error::ErrorKind::Network)?,
  111. ))
  112. .chain_err_kind(crate::error::ErrorKind::Network)?;
  113. if server_conf.use_starttls {
  114. let mut buf = vec![0; Connection::IO_BUF_SIZE];
  115. match server_conf.protocol {
  116. ImapProtocol::IMAP { .. } => socket
  117. .write_all(format!("M{} STARTTLS\r\n", cmd_id).as_bytes())
  118. .await
  119. .chain_err_kind(crate::error::ErrorKind::Network)?,
  120. ImapProtocol::ManageSieve => {
  121. socket
  122. .read(&mut buf)
  123. .await
  124. .chain_err_kind(crate::error::ErrorKind::Network)?;
  125. socket
  126. .write_all(b"STARTTLS\r\n")
  127. .await
  128. .chain_err_kind(crate::error::ErrorKind::Network)?;
  129. }
  130. }
  131. let mut response = String::with_capacity(1024);
  132. let mut broken = false;
  133. let now = std::time::Instant::now();
  134. while now.elapsed().as_secs() < 3 {
  135. let len = socket
  136. .read(&mut buf)
  137. .await
  138. .chain_err_kind(crate::error::ErrorKind::Network)?;
  139. response.push_str(unsafe { std::str::from_utf8_unchecked(&buf[0..len]) });
  140. match server_conf.protocol {
  141. ImapProtocol::IMAP { .. } => {
  142. if response.starts_with("* OK ") && response.find("\r\n").is_some() {
  143. if let Some(pos) = response.as_bytes().find(b"\r\n") {
  144. response.drain(0..pos + 2);
  145. }
  146. }
  147. }
  148. ImapProtocol::ManageSieve => {
  149. if response.starts_with("OK ") && response.find("\r\n").is_some() {
  150. response.clear();
  151. broken = true;
  152. break;
  153. }
  154. }
  155. }
  156. if response.starts_with("M1 OK") {
  157. broken = true;
  158. break;
  159. }
  160. }
  161. if !broken {
  162. return Err(MeliError::new(format!(
  163. "Could not initiate TLS negotiation to {}.",
  164. path
  165. )));
  166. }
  167. }
  168. {
  169. // FIXME: This is blocking
  170. let socket = socket
  171. .into_inner()
  172. .chain_err_kind(crate::error::ErrorKind::Network)?;
  173. let mut conn_result = connector.connect(path, socket);
  174. if let Err(native_tls::HandshakeError::WouldBlock(midhandshake_stream)) =
  175. conn_result
  176. {
  177. let mut midhandshake_stream = Some(midhandshake_stream);
  178. loop {
  179. match midhandshake_stream.take().unwrap().handshake() {
  180. Ok(r) => {
  181. conn_result = Ok(r);
  182. break;
  183. }
  184. Err(native_tls::HandshakeError::WouldBlock(stream)) => {
  185. midhandshake_stream = Some(stream);
  186. }
  187. p => {
  188. p.chain_err_kind(crate::error::ErrorKind::Network)?;
  189. }
  190. }
  191. }
  192. }
  193. AsyncWrapper::new(Connection::Tls(
  194. conn_result.chain_err_kind(crate::error::ErrorKind::Network)?,
  195. ))
  196. .chain_err_kind(crate::error::ErrorKind::Network)?
  197. }
  198. } else {
  199. let addr = if let Ok(a) = lookup_ipv4(path, server_conf.server_port) {
  200. a
  201. } else {
  202. return Err(MeliError::new(format!(
  203. "Could not lookup address {}",
  204. &path
  205. )));
  206. };
  207. AsyncWrapper::new(Connection::Tcp(
  208. TcpStream::connect_timeout(&addr, std::time::Duration::new(4, 0))
  209. .chain_err_kind(crate::error::ErrorKind::Network)?,
  210. ))
  211. .chain_err_kind(crate::error::ErrorKind::Network)?
  212. };
  213. let mut res = String::with_capacity(8 * 1024);
  214. let mut ret = ImapStream {
  215. cmd_id,
  216. stream,
  217. protocol: server_conf.protocol,
  218. current_mailbox: MailboxSelection::None,
  219. };
  220. if let ImapProtocol::ManageSieve = server_conf.protocol {
  221. use data_encoding::BASE64;
  222. ret.read_response(&mut res).await?;
  223. ret.send_command(
  224. format!(
  225. "AUTHENTICATE \"PLAIN\" \"{}\"",
  226. BASE64.encode(
  227. format!(
  228. "\0{}\0{}",
  229. &server_conf.server_username, &server_conf.server_password
  230. )
  231. .as_bytes()
  232. )
  233. )
  234. .as_bytes(),
  235. )
  236. .await?;
  237. ret.read_response(&mut res).await?;
  238. return Ok((Default::default(), ret));
  239. }
  240. ret.send_command(b"CAPABILITY").await?;
  241. ret.read_response(&mut res).await?;
  242. let capabilities: std::result::Result<Vec<&[u8]>, _> = res
  243. .split_rn()
  244. .find(|l| l.starts_with("* CAPABILITY"))
  245. .ok_or_else(|| MeliError::new(""))
  246. .and_then(|res| {
  247. protocol_parser::capabilities(res.as_bytes())
  248. .map_err(|_| MeliError::new(""))
  249. .map(|(_, v)| v)
  250. });
  251. if capabilities.is_err() {
  252. return Err(MeliError::new(format!(
  253. "Could not connect to {}: expected CAPABILITY response but got:{}",
  254. &server_conf.server_hostname, res
  255. )));
  256. }
  257. let capabilities = capabilities.unwrap();
  258. if !capabilities
  259. .iter()
  260. .any(|cap| cap.eq_ignore_ascii_case(b"IMAP4rev1"))
  261. {
  262. return Err(MeliError::new(format!(
  263. "Could not connect to {}: server is not IMAP4rev1 compliant",
  264. &server_conf.server_hostname
  265. )));
  266. } else if capabilities
  267. .iter()
  268. .any(|cap| cap.eq_ignore_ascii_case(b"LOGINDISABLED"))
  269. {
  270. return Err(MeliError::new(format!(
  271. "Could not connect to {}: server does not accept logins [LOGINDISABLED]",
  272. &server_conf.server_hostname
  273. ))
  274. .set_err_kind(crate::error::ErrorKind::Authentication));
  275. }
  276. let mut capabilities = None;
  277. ret.send_command(
  278. format!(
  279. "LOGIN \"{}\" \"{}\"",
  280. &server_conf.server_username, &server_conf.server_password
  281. )
  282. .as_bytes(),
  283. )
  284. .await?;
  285. let tag_start = format!("M{} ", (ret.cmd_id - 1));
  286. loop {
  287. ret.read_lines(&mut res, &String::new(), false).await?;
  288. let mut should_break = false;
  289. for l in res.split_rn() {
  290. if l.starts_with("* CAPABILITY") {
  291. capabilities = protocol_parser::capabilities(l.as_bytes())
  292. .map(|(_, capabilities)| {
  293. HashSet::from_iter(capabilities.into_iter().map(|s: &[u8]| s.to_vec()))
  294. })
  295. .ok();
  296. }
  297. if l.starts_with(tag_start.as_str()) {
  298. if !l[tag_start.len()..].trim().starts_with("OK ") {
  299. return Err(MeliError::new(format!(
  300. "Could not connect. Server replied with '{}'",
  301. l[tag_start.len()..].trim()
  302. ))
  303. .set_err_kind(crate::error::ErrorKind::Authentication));
  304. }
  305. should_break = true;
  306. }
  307. }
  308. if should_break {
  309. break;
  310. }
  311. }
  312. if capabilities.is_none() {
  313. /* sending CAPABILITY after LOGIN automatically is an RFC recommendation, so check
  314. * for lazy servers */
  315. drop(capabilities);
  316. ret.send_command(b"CAPABILITY").await?;
  317. ret.read_response(&mut res).await.unwrap();
  318. let capabilities = protocol_parser::capabilities(res.as_bytes())?.1;
  319. let capabilities = HashSet::from_iter(capabilities.into_iter().map(|s| s.to_vec()));
  320. Ok((capabilities, ret))
  321. } else {
  322. let capabilities = capabilities.unwrap();
  323. Ok((capabilities, ret))
  324. }
  325. }
  326. pub async fn read_response(&mut self, ret: &mut String) -> Result<()> {
  327. let id = match self.protocol {
  328. ImapProtocol::IMAP { .. } => format!("M{} ", self.cmd_id - 1),
  329. ImapProtocol::ManageSieve => String::new(),
  330. };
  331. self.read_lines(ret, &id, true).await?;
  332. Ok(())
  333. }
  334. pub async fn read_lines(
  335. &mut self,
  336. ret: &mut String,
  337. termination_string: &str,
  338. keep_termination_string: bool,
  339. ) -> Result<()> {
  340. let mut buf: Vec<u8> = vec![0; Connection::IO_BUF_SIZE];
  341. ret.clear();
  342. let mut last_line_idx: usize = 0;
  343. loop {
  344. match self.stream.read(&mut buf).await {
  345. Ok(0) => break,
  346. Ok(b) => {
  347. ret.push_str(unsafe { std::str::from_utf8_unchecked(&buf[0..b]) });
  348. if let Some(mut pos) = ret[last_line_idx..].rfind("\r\n") {
  349. if ret[last_line_idx..].starts_with("* BYE") {
  350. return Err(MeliError::new("Disconnected"));
  351. }
  352. if let Some(prev_line) =
  353. ret[last_line_idx..pos + last_line_idx].rfind("\r\n")
  354. {
  355. last_line_idx += prev_line + "\r\n".len();
  356. pos -= prev_line + "\r\n".len();
  357. }
  358. if Some(pos + "\r\n".len()) == ret.get(last_line_idx..).map(|r| r.len()) {
  359. if !termination_string.is_empty()
  360. && ret[last_line_idx..].starts_with(termination_string)
  361. {
  362. debug!(&ret[last_line_idx..]);
  363. if !keep_termination_string {
  364. ret.replace_range(last_line_idx.., "");
  365. }
  366. break;
  367. } else if termination_string.is_empty() {
  368. break;
  369. }
  370. }
  371. last_line_idx += pos + "\r\n".len();
  372. }
  373. }
  374. Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
  375. continue;
  376. }
  377. Err(e) => {
  378. return Err(MeliError::from(e).set_err_kind(crate::error::ErrorKind::Network));
  379. }
  380. }
  381. }
  382. //debug!("returning IMAP response:\n{:?}", &ret);
  383. Ok(())
  384. }
  385. pub async fn wait_for_continuation_request(&mut self) -> Result<()> {
  386. let term = "+ ".to_string();
  387. let mut ret = String::new();
  388. self.read_lines(&mut ret, &term, false).await?;
  389. Ok(())
  390. }
  391. pub async fn send_command(&mut self, command: &[u8]) -> Result<()> {
  392. if let Err(err) = try_await(async move {
  393. let command = command.trim();
  394. match self.protocol {
  395. ImapProtocol::IMAP { .. } => {
  396. self.stream.write_all(b"M").await?;
  397. self.stream
  398. .write_all(self.cmd_id.to_string().as_bytes())
  399. .await?;
  400. self.stream.write_all(b" ").await?;
  401. self.cmd_id += 1;
  402. }
  403. ImapProtocol::ManageSieve => {}
  404. }
  405. self.stream.write_all(command).await?;
  406. self.stream.write_all(b"\r\n").await?;
  407. self.stream.flush().await?;
  408. match self.protocol {
  409. ImapProtocol::IMAP { .. } => {
  410. debug!("sent: M{} {}", self.cmd_id - 1, unsafe {
  411. std::str::from_utf8_unchecked(command)
  412. });
  413. }
  414. ImapProtocol::ManageSieve => {}
  415. }
  416. Ok(())
  417. })
  418. .await
  419. {
  420. Err(err.set_err_kind(crate::error::ErrorKind::Network))
  421. } else {
  422. Ok(())
  423. }
  424. }
  425. pub async fn send_literal(&mut self, data: &[u8]) -> Result<()> {
  426. if let Err(err) = try_await(async move {
  427. self.stream.write_all(data).await?;
  428. self.stream.write_all(b"\r\n").await?;
  429. Ok(())
  430. })
  431. .await
  432. {
  433. Err(err.set_err_kind(crate::error::ErrorKind::Network))
  434. } else {
  435. Ok(())
  436. }
  437. }
  438. pub async fn send_raw(&mut self, raw: &[u8]) -> Result<()> {
  439. if let Err(err) = try_await(async move {
  440. self.stream.write_all(raw).await?;
  441. self.stream.write_all(b"\r\n").await?;
  442. Ok(())
  443. })
  444. .await
  445. {
  446. Err(err.set_err_kind(crate::error::ErrorKind::Network))
  447. } else {
  448. Ok(())
  449. }
  450. }
  451. }
  452. impl ImapConnection {
  453. pub fn new_connection(
  454. server_conf: &ImapServerConf,
  455. uid_store: Arc<UIDStore>,
  456. ) -> ImapConnection {
  457. ImapConnection {
  458. stream: Err(MeliError::new("Offline".to_string())),
  459. server_conf: server_conf.clone(),
  460. uid_store,
  461. }
  462. }
  463. pub fn connect<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
  464. Box::pin(async move {
  465. if let (instant, ref mut status @ Ok(())) = *self.uid_store.is_online.lock().unwrap() {
  466. if Instant::now().duration_since(instant) >= std::time::Duration::new(60 * 30, 0) {
  467. *status = Err(MeliError::new("Connection timed out"));
  468. self.stream = Err(MeliError::new("Connection timed out"));
  469. }
  470. }
  471. if self.stream.is_ok() {
  472. self.uid_store.is_online.lock().unwrap().0 = Instant::now();
  473. return Ok(());
  474. }
  475. let new_stream = debug!(ImapStream::new_connection(&self.server_conf).await);
  476. if let Err(err) = new_stream.as_ref() {
  477. *self.uid_store.is_online.lock().unwrap() = (Instant::now(), Err(err.clone()));
  478. } else {
  479. *self.uid_store.is_online.lock().unwrap() = (Instant::now(), Ok(()));
  480. }
  481. let (capabilities, stream) = new_stream?;
  482. self.stream = Ok(stream);
  483. match self.stream.as_ref()?.protocol {
  484. ImapProtocol::IMAP {
  485. extension_use:
  486. ImapExtensionUse {
  487. #[cfg(feature = "deflate_compression")]
  488. deflate,
  489. idle: _idle,
  490. },
  491. } =>
  492. {
  493. #[cfg(feature = "deflate_compression")]
  494. if capabilities.contains(&b"COMPRESS=DEFLATE"[..]) && deflate {
  495. let mut ret = String::new();
  496. self.send_command(b"COMPRESS DEFLATE").await?;
  497. self.read_response(&mut ret, RequiredResponses::empty())
  498. .await?;
  499. match ImapResponse::from(&ret) {
  500. ImapResponse::No(code)
  501. | ImapResponse::Bad(code)
  502. | ImapResponse::Preauth(code)
  503. | ImapResponse::Bye(code) => {
  504. crate::log(format!("Could not use COMPRESS=DEFLATE in account `{}`: server replied with `{}`", self.uid_store.account_name, code), crate::LoggingLevel::WARN);
  505. }
  506. ImapResponse::Ok(_) => {
  507. let ImapStream {
  508. cmd_id,
  509. stream,
  510. protocol,
  511. current_mailbox,
  512. } = std::mem::replace(&mut self.stream, Err(MeliError::new("")))?;
  513. let stream = stream.into_inner()?;
  514. self.stream = Ok(ImapStream {
  515. cmd_id,
  516. stream: AsyncWrapper::new(stream.deflate())?,
  517. protocol,
  518. current_mailbox,
  519. });
  520. }
  521. }
  522. }
  523. }
  524. ImapProtocol::ManageSieve => {}
  525. }
  526. *self.uid_store.capabilities.lock().unwrap() = capabilities;
  527. Ok(())
  528. })
  529. }
  530. pub fn read_response<'a>(
  531. &'a mut self,
  532. ret: &'a mut String,
  533. required_responses: RequiredResponses,
  534. ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
  535. Box::pin(async move {
  536. let mut response = String::new();
  537. ret.clear();
  538. self.stream.as_mut()?.read_response(&mut response).await?;
  539. match self.server_conf.protocol {
  540. ImapProtocol::IMAP { .. } => {
  541. let r: ImapResponse = ImapResponse::from(&response);
  542. match r {
  543. ImapResponse::Bye(ref response_code) => {
  544. self.stream = Err(MeliError::new(format!(
  545. "Offline: received BYE: {:?}",
  546. response_code
  547. )));
  548. ret.push_str(&response);
  549. }
  550. ImapResponse::No(ref response_code) => {
  551. //FIXME return error
  552. debug!("Received NO response: {:?} {:?}", response_code, response);
  553. ret.push_str(&response);
  554. }
  555. ImapResponse::Bad(ref response_code) => {
  556. //FIXME return error
  557. debug!("Received BAD response: {:?} {:?}", response_code, response);
  558. ret.push_str(&response);
  559. }
  560. _ => {
  561. /*debug!(
  562. "check every line for required_responses: {:#?}",
  563. &required_responses
  564. );*/
  565. for l in response.split_rn() {
  566. /*debug!("check line: {}", &l);*/
  567. if required_responses.check(l) || !self.process_untagged(l).await? {
  568. ret.push_str(l);
  569. }
  570. }
  571. }
  572. }
  573. r.into()
  574. }
  575. ImapProtocol::ManageSieve => {
  576. ret.push_str(&response);
  577. Ok(())
  578. }
  579. }
  580. })
  581. }
  582. pub async fn read_lines(&mut self, ret: &mut String, termination_string: String) -> Result<()> {
  583. self.stream
  584. .as_mut()?
  585. .read_lines(ret, &termination_string, false)
  586. .await?;
  587. Ok(())
  588. }
  589. pub async fn wait_for_continuation_request(&mut self) -> Result<()> {
  590. self.stream
  591. .as_mut()?
  592. .wait_for_continuation_request()
  593. .await?;
  594. Ok(())
  595. }
  596. pub async fn send_command(&mut self, command: &[u8]) -> Result<()> {
  597. if let Err(err) =
  598. try_await(async { self.stream.as_mut()?.send_command(command).await }).await
  599. {
  600. if err.kind.is_network() {
  601. self.connect().await?;
  602. }
  603. Err(err)
  604. } else {
  605. Ok(())
  606. }
  607. }
  608. pub async fn send_literal(&mut self, data: &[u8]) -> Result<()> {
  609. if let Err(err) = try_await(async { self.stream.as_mut()?.send_literal(data).await }).await
  610. {
  611. if err.kind.is_network() {
  612. self.connect().await?;
  613. }
  614. Err(err)
  615. } else {
  616. Ok(())
  617. }
  618. }
  619. pub async fn send_raw(&mut self, raw: &[u8]) -> Result<()> {
  620. if let Err(err) = try_await(async { self.stream.as_mut()?.send_raw(raw).await }).await {
  621. if err.kind.is_network() {
  622. self.connect().await?;
  623. }
  624. Err(err)
  625. } else {
  626. Ok(())
  627. }
  628. }
  629. pub async fn select_mailbox(
  630. &mut self,
  631. mailbox_hash: MailboxHash,
  632. ret: &mut String,
  633. force: bool,
  634. ) -> Result<()> {
  635. if !force && self.stream.as_ref()?.current_mailbox == MailboxSelection::Select(mailbox_hash)
  636. {
  637. return Ok(());
  638. }
  639. self.send_command(
  640. format!(
  641. "SELECT \"{}\"",
  642. self.uid_store.mailboxes.lock().await[&mailbox_hash].imap_path()
  643. )
  644. .as_bytes(),
  645. )
  646. .await?;
  647. self.read_response(ret, RequiredResponses::SELECT_REQUIRED)
  648. .await?;
  649. debug!("select response {}", ret);
  650. self.stream.as_mut()?.current_mailbox = MailboxSelection::Select(mailbox_hash);
  651. Ok(())
  652. }
  653. pub async fn examine_mailbox(
  654. &mut self,
  655. mailbox_hash: MailboxHash,
  656. ret: &mut String,
  657. force: bool,
  658. ) -> Result<()> {
  659. if !force
  660. && self.stream.as_ref()?.current_mailbox == MailboxSelection::Examine(mailbox_hash)
  661. {
  662. return Ok(());
  663. }
  664. self.send_command(
  665. format!(
  666. "EXAMINE \"{}\"",
  667. self.uid_store.mailboxes.lock().await[&mailbox_hash].imap_path()
  668. )
  669. .as_bytes(),
  670. )
  671. .await?;
  672. self.read_response(ret, RequiredResponses::EXAMINE_REQUIRED)
  673. .await?;
  674. debug!("examine response {}", ret);
  675. self.stream.as_mut()?.current_mailbox = MailboxSelection::Examine(mailbox_hash);
  676. Ok(())
  677. }
  678. pub async fn unselect(&mut self) -> Result<()> {
  679. match self.stream.as_mut()?.current_mailbox.take() {
  680. MailboxSelection::Examine(mailbox_hash) |
  681. MailboxSelection::Select(mailbox_hash) =>{
  682. let mut response = String::with_capacity(8 * 1024);
  683. if self
  684. .uid_store
  685. .capabilities
  686. .lock()
  687. .unwrap()
  688. .iter()
  689. .any(|cap| cap.eq_ignore_ascii_case(b"UNSELECT"))
  690. {
  691. self.send_command(b"UNSELECT").await?;
  692. self.read_response(&mut response, RequiredResponses::empty())
  693. .await?;
  694. } else {
  695. /* `RFC3691 - UNSELECT Command` states: "[..] IMAP4 provides this
  696. * functionality (via a SELECT command with a nonexistent mailbox name or
  697. * reselecting the same mailbox with EXAMINE command)[..]
  698. */
  699. self.select_mailbox(mailbox_hash, &mut response, true).await?;
  700. self.examine_mailbox(mailbox_hash, &mut response, true).await?;
  701. }
  702. },
  703. MailboxSelection::None => {},
  704. }
  705. Ok(())
  706. }
  707. pub fn add_refresh_event(&mut self, ev: crate::backends::RefreshEvent) {
  708. if let Some(ref sender) = self.uid_store.sender.read().unwrap().as_ref() {
  709. sender.send(ev);
  710. for ev in self.uid_store.refresh_events.lock().unwrap().drain(..) {
  711. sender.send(ev);
  712. }
  713. } else {
  714. self.uid_store.refresh_events.lock().unwrap().push(ev);
  715. }
  716. }
  717. pub async fn create_uid_msn_cache(
  718. &mut self,
  719. mailbox_hash: MailboxHash,
  720. low: usize,
  721. ) -> Result<()> {
  722. debug_assert!(low > 0);
  723. let mut response = String::new();
  724. self.examine_mailbox(mailbox_hash, &mut response, false)
  725. .await?;
  726. self.send_command(format!("UID SEARCH {}:*", low).as_bytes())
  727. .await?;
  728. self.read_response(&mut response, RequiredResponses::SEARCH)
  729. .await?;
  730. debug!("uid search response {:?}", &response);
  731. let mut msn_index_lck = self.uid_store.msn_index.lock().unwrap();
  732. let msn_index = msn_index_lck.entry(mailbox_hash).or_default();
  733. let _ = msn_index.drain(low - 1..);
  734. msn_index.extend(
  735. debug!(protocol_parser::search_results(response.as_bytes()))?
  736. .1
  737. .into_iter(),
  738. );
  739. Ok(())
  740. }
  741. }
  /// Wraps an [`ImapConnection`] to hand out the server's output one line at
  /// a time (see `as_stream`).
  pub struct ImapBlockingConnection {
      /// Scratch buffer each socket read is written into.
      buf: Vec<u8>,
      /// Bytes received so far that have not been discarded yet.
      result: Vec<u8>,
      /// Length of the line returned by the previous `as_stream` call; that
      /// many bytes are removed from the front of `result` on the next call.
      prev_res_length: usize,
      /// The wrapped connection.
      pub conn: ImapConnection,
      /// Message of the I/O error that ended reading, if any.
      err: Option<String>,
  }
  749. impl From<ImapConnection> for ImapBlockingConnection {
  750. fn from(conn: ImapConnection) -> Self {
  751. ImapBlockingConnection {
  752. buf: vec![0; Connection::IO_BUF_SIZE],
  753. conn,
  754. prev_res_length: 0,
  755. result: Vec::with_capacity(8 * 1024),
  756. err: None,
  757. }
  758. }
  759. }
  760. impl ImapBlockingConnection {
  761. pub fn into_conn(self) -> ImapConnection {
  762. self.conn
  763. }
  764. pub fn err(&self) -> Option<&str> {
  765. self.err.as_ref().map(String::as_str)
  766. }
  767. pub fn as_stream<'a>(&'a mut self) -> impl Future<Output = Option<Vec<u8>>> + 'a {
  768. self.result.drain(0..self.prev_res_length);
  769. self.prev_res_length = 0;
  770. let mut break_flag = false;
  771. let mut prev_failure = None;
  772. async move {
  773. if self.conn.stream.is_err() {
  774. debug!(&self.conn.stream);
  775. return None;
  776. }
  777. loop {
  778. if let Some(y) = read(self, &mut break_flag, &mut prev_failure).await {
  779. return Some(y);
  780. }
  781. if break_flag {
  782. return None;
  783. }
  784. }
  785. }
  786. }
  787. }
  788. async fn read(
  789. conn: &mut ImapBlockingConnection,
  790. break_flag: &mut bool,
  791. prev_failure: &mut Option<std::time::Instant>,
  792. ) -> Option<Vec<u8>> {
  793. let ImapBlockingConnection {
  794. ref mut prev_res_length,
  795. ref mut result,
  796. ref mut conn,
  797. ref mut buf,
  798. ref mut err,
  799. } = conn;
  800. match conn.stream.as_mut().unwrap().stream.read(buf).await {
  801. Ok(0) => {
  802. *break_flag = true;
  803. }
  804. Ok(b) => {
  805. result.extend_from_slice(&buf[0..b]);
  806. debug!(unsafe { std::str::from_utf8_unchecked(result) });
  807. if let Some(pos) = result.find(b"\r\n") {
  808. *prev_res_length = pos + b"\r\n".len();
  809. return Some(result[0..*prev_res_length].to_vec());
  810. }
  811. *prev_failure = None;
  812. }
  813. Err(e)
  814. if e.kind() == std::io::ErrorKind::WouldBlock
  815. || e.kind() == std::io::ErrorKind::Interrupted =>
  816. {
  817. debug!(&e);
  818. if let Some(prev_failure) = prev_failure.as_ref() {
  819. if Instant::now().duration_since(*prev_failure)
  820. >= std::time::Duration::new(60 * 5, 0)
  821. {
  822. *err = Some(e.to_string());
  823. *break_flag = true;
  824. }
  825. } else {
  826. *prev_failure = Some(Instant::now());
  827. }
  828. }
  829. Err(e) => {
  830. debug!(&conn.stream);
  831. debug!(&e);
  832. *err = Some(e.to_string());
  833. *break_flag = true;
  834. }
  835. }
  836. None
  837. }