You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

mbox.rs 22KB


  1. /*
  2. * meli - mailbox module.
  3. *
  4. * Copyright 2017 Manos Pitsidianakis
  5. *
  6. * This file is part of meli.
  7. *
  8. * meli is free software: you can redistribute it and/or modify
  9. * it under the terms of the GNU General Public License as published by
  10. * the Free Software Foundation, either version 3 of the License, or
  11. * (at your option) any later version.
  12. *
  13. * meli is distributed in the hope that it will be useful,
  14. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. * GNU General Public License for more details.
  17. *
  18. * You should have received a copy of the GNU General Public License
  19. * along with meli. If not, see <http://www.gnu.org/licenses/>.
  20. */
  21. /*!
  22. * https://wiki2.dovecot.org/MailboxFormat/mbox
  23. */
  24. use crate::async_workers::{Async, AsyncBuilder, AsyncStatus};
  25. use crate::backends::BackendOp;
  26. use crate::backends::FolderHash;
  27. use crate::backends::{
  28. BackendFolder, Folder, MailBackend, RefreshEvent, RefreshEventConsumer, RefreshEventKind,
  29. };
  30. use crate::conf::AccountSettings;
  31. use crate::email::parser::BytesExt;
  32. use crate::email::*;
  33. use crate::error::{MeliError, Result};
  34. use fnv::FnvHashMap;
  35. use libc;
  36. use memmap::{Mmap, Protection};
  37. use nom::{IResult, Needed};
  38. extern crate notify;
  39. use self::notify::{watcher, DebouncedEvent, RecursiveMode, Watcher};
  40. use std::collections::hash_map::DefaultHasher;
  41. use std::fs::File;
  42. use std::hash::{Hash, Hasher};
  43. use std::io::BufReader;
  44. use std::io::Read;
  45. use std::os::unix::io::AsRawFd;
  46. use std::path::{Path, PathBuf};
  47. use std::sync::mpsc::channel;
  48. use std::sync::{Arc, Mutex};
  49. const F_OFD_SETLKW: libc::c_int = 38;
  50. // Open file description locking
  51. // # man fcntl
  52. fn get_rw_lock_blocking(f: &File) {
  53. let fd: libc::c_int = f.as_raw_fd();
  54. let mut flock: libc::flock = libc::flock {
  55. l_type: libc::F_WRLCK as libc::c_short,
  56. l_whence: libc::SEEK_SET as libc::c_short,
  57. l_start: 0,
  58. l_len: 0, /* "Specifying 0 for l_len has the special meaning: lock all bytes starting at the location
  59. specified by l_whence and l_start through to the end of file, no matter how large the file grows." */
  60. l_pid: 0, /* "By contrast with traditional record locks, the l_pid field of that structure must be set to zero when using the commands described below." */
  61. };
  62. let ptr: *mut libc::flock = &mut flock;
  63. let ret_val = unsafe { libc::fcntl(fd, F_OFD_SETLKW, ptr as *mut libc::c_void) };
  64. debug!(&ret_val);
  65. assert!(-1 != ret_val);
  66. }
  67. macro_rules! get_path_hash {
  68. ($path:expr) => {{
  69. let mut hasher = DefaultHasher::new();
  70. $path.hash(&mut hasher);
  71. hasher.finish()
  72. }};
  73. }
  74. #[derive(Debug)]
  75. struct MboxFolder {
  76. hash: FolderHash,
  77. name: String,
  78. path: PathBuf,
  79. content: Vec<u8>,
  80. children: Vec<FolderHash>,
  81. parent: Option<FolderHash>,
  82. }
  83. impl BackendFolder for MboxFolder {
  84. fn hash(&self) -> FolderHash {
  85. self.hash
  86. }
  87. fn name(&self) -> &str {
  88. self.name.as_str()
  89. }
  90. fn path(&self) -> &str {
  91. /* We know it's valid UTF-8 because we supplied it */
  92. self.path.to_str().unwrap()
  93. }
  94. fn change_name(&mut self, s: &str) {
  95. self.name = s.to_string();
  96. }
  97. fn clone(&self) -> Folder {
  98. Box::new(MboxFolder {
  99. hash: self.hash,
  100. name: self.name.clone(),
  101. path: self.path.clone(),
  102. content: self.content.clone(),
  103. children: self.children.clone(),
  104. parent: self.parent,
  105. })
  106. }
  107. fn children(&self) -> &Vec<FolderHash> {
  108. &self.children
  109. }
  110. fn parent(&self) -> Option<FolderHash> {
  111. self.parent
  112. }
  113. }
  114. /// `BackendOp` implementor for Mbox
  115. #[derive(Debug, Default)]
  116. pub struct MboxOp {
  117. hash: EnvelopeHash,
  118. path: PathBuf,
  119. offset: Offset,
  120. length: Length,
  121. slice: Option<Mmap>,
  122. }
  123. impl MboxOp {
  124. pub fn new(hash: EnvelopeHash, path: &Path, offset: Offset, length: Length) -> Self {
  125. MboxOp {
  126. hash,
  127. path: path.to_path_buf(),
  128. slice: None,
  129. offset,
  130. length,
  131. }
  132. }
  133. }
  134. impl BackendOp for MboxOp {
  135. fn description(&self) -> String {
  136. String::new()
  137. }
  138. fn as_bytes(&mut self) -> Result<&[u8]> {
  139. if self.slice.is_none() {
  140. self.slice = Some(Mmap::open_path(&self.path, Protection::Read)?);
  141. }
  142. /* Unwrap is safe since we use ? above. */
  143. Ok(unsafe {
  144. &self.slice.as_ref().unwrap().as_slice()[self.offset..self.offset + self.length]
  145. })
  146. }
  147. fn fetch_headers(&mut self) -> Result<&[u8]> {
  148. let raw = self.as_bytes()?;
  149. let result = parser::headers_raw(raw).to_full_result()?;
  150. Ok(result)
  151. }
  152. fn fetch_body(&mut self) -> Result<&[u8]> {
  153. let raw = self.as_bytes()?;
  154. let result = parser::body_raw(raw).to_full_result()?;
  155. Ok(result)
  156. }
  157. fn fetch_flags(&self) -> Flag {
  158. let mut flags = Flag::empty();
  159. let file = match std::fs::OpenOptions::new()
  160. .read(true)
  161. .write(true)
  162. .open(&self.path)
  163. {
  164. Ok(f) => f,
  165. Err(e) => {
  166. debug!(e);
  167. return flags;
  168. }
  169. };
  170. get_rw_lock_blocking(&file);
  171. let mut buf_reader = BufReader::new(file);
  172. let mut contents = Vec::new();
  173. if let Err(e) = buf_reader.read_to_end(&mut contents) {
  174. debug!(e);
  175. return flags;
  176. };
  177. if let Ok(headers) = parser::headers_raw(contents.as_slice()).to_full_result() {
  178. if let Some(start) = headers.find(b"Status:") {
  179. if let Some(end) = headers[start..].find(b"\n") {
  180. let start = start + b"Status:".len();
  181. let status = headers[start..start + end].trim();
  182. if status.contains(&b'F') {
  183. flags.set(Flag::FLAGGED, true);
  184. }
  185. if status.contains(&b'A') {
  186. flags.set(Flag::REPLIED, true);
  187. }
  188. if status.contains(&b'R') {
  189. flags.set(Flag::SEEN, true);
  190. }
  191. if status.contains(&b'D') {
  192. flags.set(Flag::TRASHED, true);
  193. }
  194. if status.contains(&b'T') {
  195. flags.set(Flag::DRAFT, true);
  196. }
  197. }
  198. }
  199. if let Some(start) = headers.find(b"X-Status:") {
  200. let start = start + b"X-Status:".len();
  201. if let Some(end) = headers[start..].find(b"\n") {
  202. let status = headers[start..start + end].trim();
  203. if status.contains(&b'F') {
  204. flags.set(Flag::FLAGGED, true);
  205. }
  206. if status.contains(&b'A') {
  207. flags.set(Flag::REPLIED, true);
  208. }
  209. if status.contains(&b'R') {
  210. flags.set(Flag::SEEN, true);
  211. }
  212. if status.contains(&b'D') {
  213. flags.set(Flag::TRASHED, true);
  214. }
  215. if status.contains(&b'T') {
  216. flags.set(Flag::DRAFT, true);
  217. }
  218. }
  219. }
  220. }
  221. flags
  222. }
  223. fn set_flag(&mut self, envelope: &mut Envelope, flag: Flag) -> Result<()> {
  224. Ok(())
  225. }
  226. }
  227. pub fn mbox_parse(
  228. index: Arc<Mutex<FnvHashMap<EnvelopeHash, (Offset, Length)>>>,
  229. input: &[u8],
  230. file_offset: usize,
  231. ) -> IResult<&[u8], Vec<Envelope>> {
  232. if input.is_empty() {
  233. return IResult::Incomplete(Needed::Unknown);
  234. }
  235. let mut input = input;
  236. let mut offset = 0;
  237. let mut index = index.lock().unwrap();
  238. let mut envelopes = Vec::with_capacity(32);
  239. while !input.is_empty() {
  240. let next_offset: Option<usize> = input.find(b"\n\nFrom ");
  241. if let Some(len) = next_offset {
  242. match Envelope::from_bytes(&input[..len]) {
  243. Ok(mut env) => {
  244. let mut flags = Flag::empty();
  245. if env.other_headers().contains_key("Status") {
  246. if env.other_headers()["Status"].contains("F") {
  247. flags.set(Flag::FLAGGED, true);
  248. }
  249. if env.other_headers()["Status"].contains("A") {
  250. flags.set(Flag::REPLIED, true);
  251. }
  252. if env.other_headers()["Status"].contains("R") {
  253. flags.set(Flag::SEEN, true);
  254. }
  255. if env.other_headers()["Status"].contains("D") {
  256. flags.set(Flag::TRASHED, true);
  257. }
  258. }
  259. if env.other_headers().contains_key("X-Status") {
  260. if env.other_headers()["X-Status"].contains("F") {
  261. flags.set(Flag::FLAGGED, true);
  262. }
  263. if env.other_headers()["X-Status"].contains("A") {
  264. flags.set(Flag::REPLIED, true);
  265. }
  266. if env.other_headers()["X-Status"].contains("R") {
  267. flags.set(Flag::SEEN, true);
  268. }
  269. if env.other_headers()["X-Status"].contains("D") {
  270. flags.set(Flag::TRASHED, true);
  271. }
  272. if env.other_headers()["X-Status"].contains("T") {
  273. flags.set(Flag::DRAFT, true);
  274. }
  275. }
  276. env.set_flags(flags);
  277. index.insert(env.hash(), (offset + file_offset, len));
  278. envelopes.push(env);
  279. }
  280. Err(_) => {
  281. debug!("Could not parse mail at byte offset {}", offset);
  282. }
  283. }
  284. offset += len + 2;
  285. input = &input[len + 2..];
  286. } else {
  287. match Envelope::from_bytes(input) {
  288. Ok(mut env) => {
  289. let mut flags = Flag::empty();
  290. if env.other_headers().contains_key("Status") {
  291. if env.other_headers()["Status"].contains("F") {
  292. flags.set(Flag::FLAGGED, true);
  293. }
  294. if env.other_headers()["Status"].contains("A") {
  295. flags.set(Flag::REPLIED, true);
  296. }
  297. if env.other_headers()["Status"].contains("R") {
  298. flags.set(Flag::SEEN, true);
  299. }
  300. if env.other_headers()["Status"].contains("D") {
  301. flags.set(Flag::TRASHED, true);
  302. }
  303. }
  304. if env.other_headers().contains_key("X-Status") {
  305. if env.other_headers()["X-Status"].contains("F") {
  306. flags.set(Flag::FLAGGED, true);
  307. }
  308. if env.other_headers()["X-Status"].contains("A") {
  309. flags.set(Flag::REPLIED, true);
  310. }
  311. if env.other_headers()["X-Status"].contains("R") {
  312. flags.set(Flag::SEEN, true);
  313. }
  314. if env.other_headers()["X-Status"].contains("D") {
  315. flags.set(Flag::TRASHED, true);
  316. }
  317. if env.other_headers()["X-Status"].contains("T") {
  318. flags.set(Flag::DRAFT, true);
  319. }
  320. }
  321. env.set_flags(flags);
  322. index.insert(env.hash(), (offset + file_offset, input.len()));
  323. envelopes.push(env);
  324. }
  325. Err(_) => {
  326. debug!("Could not parse mail at byte offset {}", offset);
  327. }
  328. }
  329. break;
  330. }
  331. }
  332. return IResult::Done(&[], envelopes);
  333. }
  334. type Offset = usize;
  335. type Length = usize;
  336. /// Mbox backend
  337. #[derive(Debug, Default)]
  338. pub struct MboxType {
  339. path: PathBuf,
  340. index: Arc<Mutex<FnvHashMap<EnvelopeHash, (Offset, Length)>>>,
  341. folders: Arc<Mutex<FnvHashMap<FolderHash, MboxFolder>>>,
  342. }
  343. impl MailBackend for MboxType {
  344. fn get(&mut self, folder: &Folder) -> Async<Result<Vec<Envelope>>> {
  345. let mut w = AsyncBuilder::new();
  346. let handle = {
  347. let tx = w.tx();
  348. let index = self.index.clone();
  349. let folder_path = folder.path().to_string();
  350. let folder_hash = folder.hash();
  351. let folders = self.folders.clone();
  352. let closure = move || {
  353. let tx = tx.clone();
  354. let index = index.clone();
  355. let file = match std::fs::OpenOptions::new()
  356. .read(true)
  357. .write(true)
  358. .open(&folder_path)
  359. {
  360. Ok(f) => f,
  361. Err(e) => {
  362. tx.send(AsyncStatus::Payload(Err(MeliError::from(e))));
  363. return;
  364. }
  365. };
  366. get_rw_lock_blocking(&file);
  367. let mut buf_reader = BufReader::new(file);
  368. let mut contents = Vec::new();
  369. if let Err(e) = buf_reader.read_to_end(&mut contents) {
  370. tx.send(AsyncStatus::Payload(Err(MeliError::from(e))));
  371. return;
  372. };
  373. let payload = mbox_parse(index, contents.as_slice(), 0)
  374. .to_full_result()
  375. .map_err(|e| MeliError::from(e));
  376. {
  377. let mut folder_lock = folders.lock().unwrap();
  378. folder_lock
  379. .entry(folder_hash)
  380. .and_modify(|f| f.content = contents);
  381. }
  382. tx.send(AsyncStatus::Payload(payload));
  383. };
  384. Box::new(closure)
  385. };
  386. w.build(handle)
  387. }
  388. fn watch(&self, sender: RefreshEventConsumer) -> Result<()> {
  389. let (tx, rx) = channel();
  390. let mut watcher = watcher(tx, std::time::Duration::from_secs(10)).unwrap();
  391. for f in self.folders.lock().unwrap().values() {
  392. watcher.watch(&f.path, RecursiveMode::Recursive).unwrap();
  393. debug!("watching {:?}", f.path.as_path());
  394. }
  395. let index = self.index.clone();
  396. let folders = self.folders.clone();
  397. std::thread::Builder::new()
  398. .name(format!(
  399. "watching {}",
  400. self.path.file_name().unwrap().to_str().unwrap()
  401. ))
  402. .spawn(move || {
  403. // Move `watcher` in the closure's scope so that it doesn't get dropped.
  404. let _watcher = watcher;
  405. let index = index;
  406. let folders = folders;
  407. loop {
  408. match rx.recv() {
  409. /*
  410. * Event types:
  411. *
  412. * pub enum RefreshEventKind {
  413. * Update(EnvelopeHash, Envelope), // Old hash, new envelope
  414. * Create(Envelope),
  415. * Remove(EnvelopeHash),
  416. * Rescan,
  417. * }
  418. */
  419. Ok(event) => match event {
  420. /* Update */
  421. DebouncedEvent::NoticeWrite(pathbuf)
  422. | DebouncedEvent::Write(pathbuf) => {
  423. let folder_hash = get_path_hash!(&pathbuf);
  424. let file = match std::fs::OpenOptions::new()
  425. .read(true)
  426. .write(true)
  427. .open(&pathbuf)
  428. {
  429. Ok(f) => f,
  430. Err(_) => {
  431. continue;
  432. }
  433. };
  434. get_rw_lock_blocking(&file);
  435. let mut folder_lock = folders.lock().unwrap();
  436. let mut buf_reader = BufReader::new(file);
  437. let mut contents = Vec::new();
  438. if let Err(e) = buf_reader.read_to_end(&mut contents) {
  439. debug!(e);
  440. continue;
  441. };
  442. if contents
  443. .starts_with(folder_lock[&folder_hash].content.as_slice())
  444. {
  445. if let Ok(envelopes) = mbox_parse(
  446. index.clone(),
  447. &contents[folder_lock[&folder_hash].content.len()..],
  448. folder_lock[&folder_hash].content.len(),
  449. )
  450. .to_full_result()
  451. {
  452. for env in envelopes {
  453. sender.send(RefreshEvent {
  454. hash: folder_hash,
  455. kind: RefreshEventKind::Create(Box::new(env)),
  456. });
  457. }
  458. }
  459. } else {
  460. sender.send(RefreshEvent {
  461. hash: folder_hash,
  462. kind: RefreshEventKind::Rescan,
  463. });
  464. }
  465. folder_lock
  466. .entry(folder_hash)
  467. .and_modify(|f| f.content = contents);
  468. }
  469. /* Remove */
  470. DebouncedEvent::NoticeRemove(pathbuf)
  471. | DebouncedEvent::Remove(pathbuf) => {
  472. panic!(format!("mbox folder {} was removed.", pathbuf.display()))
  473. }
  474. /* Envelope hasn't changed */
  475. DebouncedEvent::Rename(src, dest) => panic!(format!(
  476. "mbox folder {} was renamed to {}.",
  477. src.display(),
  478. dest.display()
  479. )),
  480. /* Trigger rescan of folder */
  481. DebouncedEvent::Rescan => {
  482. /* Actually should rescan all folders */
  483. unreachable!("Unimplemented: rescan of all folders in MboxType")
  484. }
  485. _ => {}
  486. },
  487. Err(e) => debug!("watch error: {:?}", e),
  488. }
  489. }
  490. })?;
  491. Ok(())
  492. }
  493. fn folders(&self) -> FnvHashMap<FolderHash, Folder> {
  494. self.folders
  495. .lock()
  496. .unwrap()
  497. .iter()
  498. .map(|(h, f)| (*h, f.clone() as Folder))
  499. .collect()
  500. }
  501. fn operation(&self, hash: EnvelopeHash, _folder_hash: FolderHash) -> Box<BackendOp> {
  502. let (offset, length) = {
  503. let index = self.index.lock().unwrap();
  504. index[&hash]
  505. };
  506. Box::new(MboxOp::new(hash, self.path.as_path(), offset, length))
  507. }
  508. fn save(&self, bytes: &[u8], folder: &str) -> Result<()> {
  509. unimplemented!();
  510. }
  511. }
  512. impl MboxType {
  513. pub fn new(s: &AccountSettings) -> Self {
  514. let path = Path::new(s.root_folder.as_str());
  515. if !path.exists() {
  516. panic!(
  517. "\"root_folder\" {} for account {} is not a valid path.",
  518. s.root_folder.as_str(),
  519. s.name()
  520. );
  521. }
  522. let ret = MboxType {
  523. path: PathBuf::from(path),
  524. ..Default::default()
  525. };
  526. let name: String = ret
  527. .path
  528. .file_name()
  529. .map(|f| f.to_string_lossy().into())
  530. .unwrap_or(String::new());
  531. let hash = get_path_hash!(path);
  532. ret.folders.lock().unwrap().insert(
  533. hash,
  534. MboxFolder {
  535. hash,
  536. path: PathBuf::from(path),
  537. name,
  538. content: Vec::new(),
  539. children: Vec::new(),
  540. parent: None,
  541. },
  542. );
  543. /*
  544. /* Look for other mailboxes */
  545. let parent_folder = Path::new(path).parent().unwrap();
  546. let read_dir = std::fs::read_dir(parent_folder);
  547. if read_dir.is_ok() {
  548. for f in read_dir.unwrap() {
  549. if f.is_err() {
  550. continue;
  551. }
  552. let f = f.unwrap().path();
  553. if f.is_file() && f != path {
  554. let name: String = f
  555. .file_name()
  556. .map(|f| f.to_string_lossy().into())
  557. .unwrap_or(String::new());
  558. let hash = get_path_hash!(f);
  559. ret.folders.lock().unwrap().insert(
  560. hash,
  561. MboxFolder {
  562. hash,
  563. path: f,
  564. name,
  565. content: Vec::new(),
  566. children: Vec::new(),
  567. parent: None,
  568. },
  569. );
  570. }
  571. }
  572. }
  573. */
  574. ret
  575. }
  576. }