You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

934 lines
40 KiB

  1. /*
  2. * meli - mailbox module.
  3. *
  4. * Copyright 2017 Manos Pitsidianakis
  5. *
  6. * This file is part of meli.
  7. *
  8. * meli is free software: you can redistribute it and/or modify
  9. * it under the terms of the GNU General Public License as published by
  10. * the Free Software Foundation, either version 3 of the License, or
  11. * (at your option) any later version.
  12. *
  13. * meli is distributed in the hope that it will be useful,
  14. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  16. * GNU General Public License for more details.
  17. *
  18. * You should have received a copy of the GNU General Public License
  19. * along with meli. If not, see <http://www.gnu.org/licenses/>.
  20. */
  21. use super::{
  22. BackendFolder, BackendOp, Folder, FolderHash, MailBackend, RefreshEvent, RefreshEventConsumer,
  23. RefreshEventKind::*,
  24. };
  25. use super::{MaildirFolder, MaildirOp};
  26. use crate::async_workers::*;
  27. use crate::conf::AccountSettings;
  28. use crate::email::{Envelope, EnvelopeHash, Flag};
  29. use crate::error::{MeliError, Result};
  30. use crate::shellexpand::ShellExpandTrait;
  31. extern crate notify;
  32. use self::notify::{watcher, DebouncedEvent, RecursiveMode, Watcher};
  33. use std::time::Duration;
  34. use std::sync::mpsc::channel;
  35. //use std::sync::mpsc::sync_channel;
  36. //use std::sync::mpsc::SyncSender;
  37. //use std::time::Duration;
  38. use fnv::{FnvHashMap, FnvHashSet, FnvHasher};
  39. use std::collections::hash_map::DefaultHasher;
  40. use std::ffi::OsStr;
  41. use std::fs;
  42. use std::hash::{Hash, Hasher};
  43. use std::io::{self, Read, Write};
  44. use std::ops::{Deref, DerefMut};
  45. use std::os::unix::fs::PermissionsExt;
  46. use std::path::{Component, Path, PathBuf};
  47. use std::result;
  48. use std::sync::{Arc, Mutex};
  49. use std::thread;
  50. #[derive(Clone, Debug, PartialEq)]
  51. pub(super) enum PathMod {
  52. Path(PathBuf),
  53. Hash(EnvelopeHash),
  54. }
  55. #[derive(Debug, Default)]
  56. pub struct MaildirPath {
  57. pub(super) buf: PathBuf,
  58. pub(super) modified: Option<PathMod>,
  59. pub(super) removed: bool,
  60. }
  61. impl Deref for MaildirPath {
  62. type Target = PathBuf;
  63. fn deref(&self) -> &PathBuf {
  64. assert!(!(self.removed && self.modified.is_none()));
  65. &self.buf
  66. }
  67. }
  68. impl DerefMut for MaildirPath {
  69. fn deref_mut(&mut self) -> &mut PathBuf {
  70. assert!(!(self.removed && self.modified.is_none()));
  71. &mut self.buf
  72. }
  73. }
  74. impl From<PathBuf> for MaildirPath {
  75. fn from(val: PathBuf) -> MaildirPath {
  76. MaildirPath {
  77. buf: val,
  78. modified: None,
  79. removed: false,
  80. }
  81. }
  82. }
  83. #[derive(Debug, Default)]
  84. pub struct HashIndex {
  85. index: FnvHashMap<EnvelopeHash, MaildirPath>,
  86. hash: FolderHash,
  87. }
  88. impl Deref for HashIndex {
  89. type Target = FnvHashMap<EnvelopeHash, MaildirPath>;
  90. fn deref(&self) -> &FnvHashMap<EnvelopeHash, MaildirPath> {
  91. &self.index
  92. }
  93. }
  94. impl DerefMut for HashIndex {
  95. fn deref_mut(&mut self) -> &mut FnvHashMap<EnvelopeHash, MaildirPath> {
  96. &mut self.index
  97. }
  98. }
  99. pub type HashIndexes = Arc<Mutex<FnvHashMap<FolderHash, HashIndex>>>;
  100. /// Maildir backend https://cr.yp.to/proto/maildir.html
  101. #[derive(Debug)]
  102. pub struct MaildirType {
  103. name: String,
  104. folders: FnvHashMap<FolderHash, MaildirFolder>,
  105. folder_index: Arc<Mutex<FnvHashMap<EnvelopeHash, FolderHash>>>,
  106. hash_indexes: HashIndexes,
  107. path: PathBuf,
  108. }
  /// Evaluates to `true` when `$path` is not a directory and its parent
  /// directory component is literally named `new`.
  ///
  /// `$path` is evaluated up to two times, so pass a plain binding.
  macro_rules! path_is_new {
      ($path:expr) => {
          // `nth(1)` on the reversed components skips the file name and yields
          // the name of the directory that contains it.
          !$path.is_dir()
              && $path.components().rev().nth(1) == Some(Component::Normal(OsStr::new("new")))
      };
  }
  /// Computes the hash of the maildir folder that `$path` belongs to.
  ///
  /// * For anything that is not a directory, the last two components
  ///   (file name and its `cur`/`new` parent) are stripped.
  /// * For a directory ending in `cur` or `new`, that last component is
  ///   stripped.
  /// * Any other directory is hashed as it is.
  ///
  /// The remaining path is fed to a `DefaultHasher`.
  #[macro_export]
  macro_rules! get_path_hash {
      ($path:expr) => {{
          let mut folder_path = $path.clone();
          let levels_up = if !folder_path.is_dir() {
              2
          } else if folder_path.ends_with("cur") || folder_path.ends_with("new") {
              1
          } else {
              0
          };
          for _ in 0..levels_up {
              folder_path.pop();
          }
          let mut state = DefaultHasher::new();
          folder_path.hash(&mut state);
          state.finish()
      }};
  }
  137. pub(super) fn get_file_hash(file: &Path) -> EnvelopeHash {
  138. /*
  139. let mut buf = Vec::with_capacity(2048);
  140. let mut f = fs::File::open(&file).unwrap_or_else(|_| panic!("Can't open {}", file.display()));
  141. f.read_to_end(&mut buf)
  142. .unwrap_or_else(|_| panic!("Can't read {}", file.display()));
  143. let mut hasher = FnvHasher::default();
  144. hasher.write(&buf);
  145. hasher.finish()
  146. */
  147. let mut hasher = FnvHasher::default();
  148. file.hash(&mut hasher);
  149. hasher.finish()
  150. }
  151. fn move_to_cur(p: PathBuf) -> Result<PathBuf> {
  152. let mut new = p.clone();
  153. let file_name = p.to_string_lossy();
  154. let slash_pos = file_name.bytes().rposition(|c| c == b'/').unwrap() + 1;
  155. new.pop();
  156. new.pop();
  157. new.push("cur");
  158. new.push(&file_name[slash_pos..]);
  159. if !file_name.ends_with(":2,") {
  160. new.set_extension(":2,");
  161. }
  162. debug!("moved to cur: {}", new.display());
  163. fs::rename(&p, &new)?;
  164. Ok(new)
  165. }
  166. impl MailBackend for MaildirType {
  167. fn is_online(&self) -> bool {
  168. true
  169. }
  170. fn folders(&self) -> Result<FnvHashMap<FolderHash, Folder>> {
  171. Ok(self
  172. .folders
  173. .iter()
  174. .map(|(h, f)| (*h, f.clone() as Folder))
  175. .collect())
  176. }
  177. fn get(&mut self, folder: &Folder) -> Async<Result<Vec<Envelope>>> {
  178. self.multicore(4, folder)
  179. }
  180. fn watch(
  181. &self,
  182. sender: RefreshEventConsumer,
  183. work_context: WorkContext,
  184. ) -> Result<std::thread::ThreadId> {
  185. let (tx, rx) = channel();
  186. let mut watcher = watcher(tx, Duration::from_secs(2)).unwrap();
  187. let root_path = self.path.to_path_buf();
  188. watcher.watch(&root_path, RecursiveMode::Recursive).unwrap();
  189. let cache_dir = xdg::BaseDirectories::with_profile("meli", &self.name).unwrap();
  190. debug!("watching {:?}", root_path);
  191. let hash_indexes = self.hash_indexes.clone();
  192. let folder_index = self.folder_index.clone();
  193. let handle = thread::Builder::new()
  194. .name("folder watch".to_string())
  195. .spawn(move || {
  196. // Move `watcher` in the closure's scope so that it doesn't get dropped.
  197. let _watcher = watcher;
  198. let _work_context = work_context;
  199. loop {
  200. match rx.recv() {
  201. /*
  202. * Event types:
  203. *
  204. * pub enum RefreshEventKind {
  205. * Update(EnvelopeHash, Envelope), // Old hash, new envelope
  206. * Create(Envelope),
  207. * Remove(EnvelopeHash),
  208. * Rescan,
  209. * }
  210. */
  211. Ok(event) => match event {
  212. /* Create */
  213. DebouncedEvent::Create(mut pathbuf) => {
  214. debug!("DebouncedEvent::Create(path = {:?}", pathbuf);
  215. if path_is_new!(pathbuf) {
  216. debug!("path_is_new");
  217. /* This creates a Rename event that we will receive later */
  218. pathbuf = match move_to_cur(pathbuf) {
  219. Ok(p) => p,
  220. Err(e) => {
  221. debug!("error: {}", e.to_string());
  222. continue;
  223. }
  224. };
  225. }
  226. let folder_hash = get_path_hash!(pathbuf);
  227. let file_name = pathbuf
  228. .as_path()
  229. .strip_prefix(&root_path)
  230. .unwrap()
  231. .to_path_buf();
  232. if let Some(env) = add_path_to_index(
  233. &hash_indexes,
  234. folder_hash,
  235. pathbuf.as_path(),
  236. &cache_dir,
  237. file_name,
  238. ) {
  239. folder_index.lock().unwrap().insert(env.hash(),folder_hash);
  240. debug!(
  241. "Create event {} {} {}",
  242. env.hash(),
  243. env.subject(),
  244. pathbuf.display()
  245. );
  246. sender.send(RefreshEvent {
  247. hash: folder_hash,
  248. kind: Create(Box::new(env)),
  249. });
  250. }
  251. }
  252. /* Update */
  253. DebouncedEvent::NoticeWrite(pathbuf)
  254. | DebouncedEvent::Write(pathbuf) => {
  255. debug!("DebouncedEvent::Write(path = {:?}", &pathbuf);
  256. let folder_hash = get_path_hash!(pathbuf);
  257. let mut hash_indexes_lock = hash_indexes.lock().unwrap();
  258. let index_lock =
  259. &mut hash_indexes_lock.entry(folder_hash).or_default();
  260. let file_name = pathbuf
  261. .as_path()
  262. .strip_prefix(&root_path)
  263. .unwrap()
  264. .to_path_buf();
  265. /* Linear search in hash_index to find old hash */
  266. let old_hash: EnvelopeHash = {
  267. if let Some((k, v)) =
  268. index_lock.iter_mut().find(|(_, v)| *v.buf == pathbuf)
  269. {
  270. //TODO FIXME This doesn't make sense?
  271. *v = pathbuf.clone().into();
  272. *k
  273. } else {
  274. drop(index_lock);
  275. drop(hash_indexes_lock);
  276. /* Did we just miss a Create event? In any case, create
  277. * envelope. */
  278. if let Some(env) = add_path_to_index(
  279. &hash_indexes,
  280. folder_hash,
  281. pathbuf.as_path(),
  282. &cache_dir,
  283. file_name,
  284. ) {
  285. folder_index.lock().unwrap().insert(env.hash(),folder_hash);
  286. sender.send(RefreshEvent {
  287. hash: folder_hash,
  288. kind: Create(Box::new(env)),
  289. });
  290. }
  291. return;
  292. }
  293. };
  294. let new_hash: EnvelopeHash = get_file_hash(pathbuf.as_path());
  295. if index_lock.get_mut(&new_hash).is_none() {
  296. debug!("write notice");
  297. let op = Box::new(MaildirOp::new(
  298. new_hash,
  299. hash_indexes.clone(),
  300. folder_hash,
  301. ));
  302. if let Some(env) = Envelope::from_token(op, new_hash) {
  303. debug!("{}\t{:?}", new_hash, &pathbuf);
  304. debug!(
  305. "hash {}, path: {:?} couldn't be parsed",
  306. new_hash, &pathbuf
  307. );
  308. index_lock.insert(new_hash, pathbuf.into());
  309. /* Send Write notice */
  310. sender.send(RefreshEvent {
  311. hash: folder_hash,
  312. kind: Update(old_hash, Box::new(env)),
  313. });
  314. }
  315. }
  316. }
  317. /* Remove */
  318. DebouncedEvent::NoticeRemove(pathbuf)
  319. | DebouncedEvent::Remove(pathbuf) => {
  320. debug!("DebouncedEvent::Remove(path = {:?}", pathbuf);
  321. let folder_hash = get_path_hash!(pathbuf);
  322. let mut hash_indexes_lock = hash_indexes.lock().unwrap();
  323. let index_lock = hash_indexes_lock.entry(folder_hash).or_default();
  324. let hash: EnvelopeHash = if let Some((k, _)) =
  325. index_lock.iter().find(|(_, v)| *v.buf == pathbuf)
  326. {
  327. *k
  328. } else {
  329. debug!("removed but not contained in index");
  330. continue;
  331. };
  332. if let Some(ref modif) = &index_lock[&hash].modified {
  333. match modif {
  334. PathMod::Path(path) => debug!(
  335. "envelope {} has modified path set {}",
  336. hash,
  337. path.display()
  338. ),
  339. PathMod::Hash(hash) => debug!(
  340. "envelope {} has modified path set {}",
  341. hash,
  342. &index_lock[&hash].buf.display()
  343. ),
  344. }
  345. continue;
  346. }
  347. index_lock.entry(hash).and_modify(|e| {
  348. e.removed = true;
  349. });
  350. sender.send(RefreshEvent {
  351. hash: folder_hash,
  352. kind: Remove(hash),
  353. });
  354. }
  355. /* Envelope hasn't changed */
  356. DebouncedEvent::Rename(src, dest) => {
  357. debug!(
  358. "DebouncedEvent::Rename(src = {:?}, dest = {:?})",
  359. src, dest
  360. );
  361. let folder_hash = get_path_hash!(src);
  362. let old_hash: EnvelopeHash = get_file_hash(src.as_path());
  363. let new_hash: EnvelopeHash = get_file_hash(dest.as_path());
  364. let mut hash_indexes_lock = hash_indexes.lock().unwrap();
  365. let index_lock = hash_indexes_lock.entry(folder_hash).or_default();
  366. if index_lock.contains_key(&old_hash)
  367. && !index_lock[&old_hash].removed
  368. {
  369. debug!("contains_old_key");
  370. index_lock.entry(old_hash).and_modify(|e| {
  371. debug!(&e.modified);
  372. e.modified = Some(PathMod::Hash(new_hash));
  373. });
  374. sender.send(RefreshEvent {
  375. hash: get_path_hash!(dest),
  376. kind: Rename(old_hash, new_hash),
  377. });
  378. folder_index.lock().unwrap().insert(new_hash,get_path_hash!(dest) );
  379. index_lock.insert(new_hash, dest.into());
  380. continue;
  381. } else if !index_lock.contains_key(&new_hash)
  382. || index_lock
  383. .get(&old_hash)
  384. .map(|e| e.removed)
  385. .unwrap_or(false)
  386. {
  387. if index_lock
  388. .get(&old_hash)
  389. .map(|e| e.removed)
  390. .unwrap_or(false)
  391. {
  392. index_lock.entry(old_hash).and_modify(|e| {
  393. e.modified = Some(PathMod::Hash(new_hash));
  394. e.removed = false;
  395. });
  396. debug!("contains_old_key, key was marked as removed (by external source)");
  397. } else {
  398. debug!("not contains_new_key");
  399. }
  400. let file_name = dest
  401. .as_path()
  402. .strip_prefix(&root_path)
  403. .unwrap()
  404. .to_path_buf();
  405. debug!("filename = {:?}", file_name);
  406. drop(index_lock);
  407. drop(hash_indexes_lock);
  408. if let Some(env) = add_path_to_index(
  409. &hash_indexes,
  410. folder_hash,
  411. dest.as_path(),
  412. &cache_dir,
  413. file_name,
  414. ) {
  415. folder_index.lock().unwrap().insert(env.hash(),folder_hash);
  416. debug!(
  417. "Create event {} {} {}",
  418. env.hash(),
  419. env.subject(),
  420. dest.display()
  421. );
  422. sender.send(RefreshEvent {
  423. hash: folder_hash,
  424. kind: Create(Box::new(env)),
  425. });
  426. continue;
  427. } else {
  428. debug!("not valid email");
  429. }
  430. } else {
  431. sender.send(RefreshEvent {
  432. hash: get_path_hash!(dest),
  433. kind: Rename(old_hash, new_hash),
  434. });
  435. debug!("contains_new_key");
  436. }
  437. /* Maybe a re-read should be triggered here just to be safe.
  438. sender.send(RefreshEvent {
  439. hash: get_path_hash!(dest),
  440. kind: Rescan,
  441. });
  442. */
  443. }
  444. /* Trigger rescan of folder */
  445. DebouncedEvent::Rescan => {
  446. /* Actually should rescan all folders */
  447. unreachable!("Unimplemented: rescan of all folders in MaildirType")
  448. }
  449. _ => {}
  450. },
  451. Err(e) => debug!("watch error: {:?}", e),
  452. }
  453. }
  454. })?;
  455. Ok(handle.thread().id())
  456. }
  457. fn operation(&self, hash: EnvelopeHash) -> Box<dyn BackendOp> {
  458. Box::new(MaildirOp::new(
  459. hash,
  460. self.hash_indexes.clone(),
  461. self.folder_index.lock().unwrap()[&hash],
  462. ))
  463. }
  464. fn save(&self, bytes: &[u8], folder: &str, flags: Option<Flag>) -> Result<()> {
  465. for f in self.folders.values() {
  466. if f.name == folder || f.path.to_str().unwrap() == folder {
  467. return MaildirType::save_to_folder(f.fs_path.clone(), bytes, flags);
  468. }
  469. }
  470. Err(MeliError::new(format!(
  471. "'{}' is not a valid folder.",
  472. folder
  473. )))
  474. }
  475. fn as_any(&self) -> &dyn::std::any::Any {
  476. self
  477. }
  478. }
  479. impl MaildirType {
  480. pub fn new(
  481. settings: &AccountSettings,
  482. is_subscribed: Box<dyn Fn(&str) -> bool>,
  483. ) -> Result<Box<dyn MailBackend>> {
  484. let mut folders: FnvHashMap<FolderHash, MaildirFolder> = Default::default();
  485. fn recurse_folders<P: AsRef<Path>>(
  486. folders: &mut FnvHashMap<FolderHash, MaildirFolder>,
  487. settings: &AccountSettings,
  488. p: P,
  489. ) -> Result<Vec<FolderHash>> {
  490. if !p.as_ref().exists() || !p.as_ref().is_dir() {
  491. return Err(MeliError::new(format!(
  492. "Configuration error: Path \"{}\" {}",
  493. p.as_ref().display(),
  494. if !p.as_ref().exists() {
  495. "does not exist."
  496. } else {
  497. "is not a directory."
  498. }
  499. )));
  500. }
  501. let mut children = Vec::new();
  502. for mut f in fs::read_dir(p).unwrap() {
  503. 'entries: for f in f.iter_mut() {
  504. {
  505. let path = f.path();
  506. if path.ends_with("cur") || path.ends_with("new") || path.ends_with("tmp") {
  507. continue 'entries;
  508. }
  509. if path.is_dir() {
  510. if let Ok(mut f) = MaildirFolder::new(
  511. path.to_str().unwrap().to_string(),
  512. path.file_name().unwrap().to_str().unwrap().to_string(),
  513. None,
  514. Vec::new(),
  515. &settings,
  516. ) {
  517. f.children = recurse_folders(folders, settings, &path)?;
  518. f.children
  519. .iter()
  520. .map(|c| folders.get_mut(c).map(|f| f.parent = Some(f.hash)))
  521. .count();
  522. children.push(f.hash);
  523. folders.insert(f.hash, f);
  524. }
  525. }
  526. }
  527. }
  528. }
  529. Ok(children)
  530. };
  531. let root_path = PathBuf::from(settings.root_folder()).expand();
  532. if !root_path.exists() {
  533. return Err(MeliError::new(format!(
  534. "Configuration error ({}): root_path `{}` is not a valid directory.",
  535. settings.name(),
  536. settings.root_folder.as_str()
  537. )));
  538. } else if !root_path.is_dir() {
  539. return Err(MeliError::new(format!(
  540. "Configuration error ({}): root_path `{}` is not a directory.",
  541. settings.name(),
  542. settings.root_folder.as_str()
  543. )));
  544. }
  545. if let Ok(f) = MaildirFolder::new(
  546. root_path.to_str().unwrap().to_string(),
  547. root_path.file_name().unwrap().to_str().unwrap().to_string(),
  548. None,
  549. Vec::with_capacity(0),
  550. settings,
  551. ) {
  552. folders.insert(f.hash, f);
  553. }
  554. if folders.is_empty() {
  555. let children = recurse_folders(&mut folders, settings, &root_path)?;
  556. children
  557. .iter()
  558. .map(|c| folders.get_mut(c).map(|f| f.parent = None))
  559. .count();
  560. } else {
  561. let root_hash = *folders.keys().nth(0).unwrap();
  562. let children = recurse_folders(&mut folders, settings, &root_path)?;
  563. children
  564. .iter()
  565. .map(|c| folders.get_mut(c).map(|f| f.parent = Some(root_hash)))
  566. .count();
  567. folders.get_mut(&root_hash).map(|f| f.children = children);
  568. }
  569. folders.retain(|_, f| is_subscribed(f.path()));
  570. let keys = folders.keys().cloned().collect::<FnvHashSet<FolderHash>>();
  571. for f in folders.values_mut() {
  572. f.children.retain(|c| keys.contains(c));
  573. }
  574. let mut hash_indexes =
  575. FnvHashMap::with_capacity_and_hasher(folders.len(), Default::default());
  576. for &fh in folders.keys() {
  577. hash_indexes.insert(
  578. fh,
  579. HashIndex {
  580. index: FnvHashMap::with_capacity_and_hasher(0, Default::default()),
  581. hash: fh,
  582. },
  583. );
  584. }
  585. Ok(Box::new(MaildirType {
  586. name: settings.name().to_string(),
  587. folders,
  588. hash_indexes: Arc::new(Mutex::new(hash_indexes)),
  589. folder_index: Default::default(),
  590. path: root_path,
  591. }))
  592. }
  593. fn owned_folder_idx(&self, folder: &Folder) -> FolderHash {
  594. *self
  595. .folders
  596. .iter()
  597. .find(|(_, f)| f.hash() == folder.hash())
  598. .unwrap()
  599. .0
  600. }
  601. pub fn multicore(&mut self, cores: usize, folder: &Folder) -> Async<Result<Vec<Envelope>>> {
  602. let mut w = AsyncBuilder::new();
  603. let cache_dir = xdg::BaseDirectories::with_profile("meli", &self.name).unwrap();
  604. let handle = {
  605. let tx = w.tx();
  606. // TODO: Avoid clone
  607. let folder: &MaildirFolder = &self.folders[&self.owned_folder_idx(folder)];
  608. let folder_hash = folder.hash();
  609. let tx_final = w.tx();
  610. let path: PathBuf = folder.fs_path().into();
  611. let name = format!("parsing {:?}", folder.name());
  612. let root_path = self.path.to_path_buf();
  613. let map = self.hash_indexes.clone();
  614. let folder_index = self.folder_index.clone();
  615. let closure = move |work_context: crate::async_workers::WorkContext| {
  616. let name = name.clone();
  617. work_context
  618. .set_name
  619. .send((std::thread::current().id(), name.clone()))
  620. .unwrap();
  621. let root_path = root_path.clone();
  622. let map = map.clone();
  623. let folder_index = folder_index.clone();
  624. let tx = tx.clone();
  625. let cache_dir = cache_dir.clone();
  626. let path = path.clone();
  627. let thunk = move || {
  628. let mut path = path.clone();
  629. let cache_dir = cache_dir.clone();
  630. path.push("new");
  631. for d in path.read_dir()? {
  632. if let Ok(p) = d {
  633. move_to_cur(p.path()).ok().take();
  634. }
  635. }
  636. path.pop();
  637. path.push("cur");
  638. let iter = path.read_dir()?;
  639. let count = path.read_dir()?.count();
  640. let mut files: Vec<PathBuf> = Vec::with_capacity(count);
  641. let mut ret = Vec::with_capacity(count);
  642. for e in iter {
  643. let e = e.and_then(|x| {
  644. let path = x.path();
  645. Ok(path)
  646. })?;
  647. files.push(e);
  648. }
  649. if !files.is_empty() {
  650. crossbeam::scope(|scope| {
  651. let mut threads = Vec::with_capacity(cores);
  652. let cache_dir = cache_dir.clone();
  653. let chunk_size = if count / cores > 0 {
  654. count / cores
  655. } else {
  656. count
  657. };
  658. for chunk in files.chunks(chunk_size) {
  659. let cache_dir = cache_dir.clone();
  660. let tx = tx.clone();
  661. let map = map.clone();
  662. let folder_index = folder_index.clone();
  663. let root_path = root_path.clone();
  664. let s = scope.builder().name(name.clone()).spawn(move |_| {
  665. let len = chunk.len();
  666. let size = if len <= 100 { 100 } else { (len / 100) * 100 };
  667. let mut local_r: Vec<Envelope> =
  668. Vec::with_capacity(chunk.len());
  669. for c in chunk.chunks(size) {
  670. //thread::yield_now();
  671. let map = map.clone();
  672. let folder_index = folder_index.clone();
  673. let len = c.len();
  674. for file in c {
  675. /* Check if we have a cache file with this email's
  676. * filename */
  677. let file_name = PathBuf::from(file)
  678. .strip_prefix(&root_path)
  679. .unwrap()
  680. .to_path_buf();
  681. if let Some(cached) =
  682. cache_dir.find_cache_file(&file_name)
  683. {
  684. /* Cached struct exists, try to load it */
  685. let reader = io::BufReader::new(
  686. fs::File::open(&cached).unwrap(),
  687. );
  688. let result: result::Result<Envelope, _> =
  689. bincode::deserialize_from(reader);
  690. if let Ok(env) = result {
  691. let mut map = map.lock().unwrap();
  692. let map = map.entry(folder_hash).or_default();
  693. let hash = env.hash();
  694. map.insert(hash, file.clone().into());
  695. folder_index
  696. .lock()
  697. .unwrap()
  698. .insert(hash, folder_hash);
  699. local_r.push(env);
  700. continue;
  701. }
  702. };
  703. let hash = get_file_hash(file);
  704. {
  705. let mut map = map.lock().unwrap();
  706. let map = map.entry(folder_hash).or_default();
  707. (*map).insert(hash, PathBuf::from(file).into());
  708. }
  709. let op = Box::new(MaildirOp::new(
  710. hash,
  711. map.clone(),
  712. folder_hash,
  713. ));
  714. if let Some(e) = Envelope::from_token(op, hash) {
  715. folder_index
  716. .lock()
  717. .unwrap()
  718. .insert(e.hash(), folder_hash);
  719. if let Ok(cached) =
  720. cache_dir.place_cache_file(file_name)
  721. {
  722. /* place result in cache directory */
  723. let f = match fs::File::create(cached) {
  724. Ok(f) => f,
  725. Err(e) => {
  726. panic!("{}", e);
  727. }
  728. };
  729. let metadata = f.metadata().unwrap();
  730. let mut permissions = metadata.permissions();
  731. permissions.set_mode(0o600); // Read/write for owner only.
  732. f.set_permissions(permissions).unwrap();
  733. let writer = io::BufWriter::new(f);
  734. bincode::serialize_into(writer, &e).unwrap();
  735. }
  736. local_r.push(e);
  737. } else {
  738. debug!(
  739. "DEBUG: hash {}, path: {} couldn't be parsed",
  740. hash,
  741. file.as_path().display()
  742. );
  743. continue;
  744. }
  745. }
  746. tx.send(AsyncStatus::ProgressReport(len)).unwrap();
  747. }
  748. local_r
  749. });
  750. threads.push(s.unwrap());
  751. }
  752. for t in threads {
  753. let mut result = t.join().unwrap();
  754. ret.append(&mut result);
  755. work_context
  756. .set_status
  757. .send((
  758. std::thread::current().id(),
  759. format!("parsing.. {}/{}", ret.len(), files.len()),
  760. ))
  761. .unwrap();
  762. }
  763. })
  764. .unwrap();
  765. }
  766. Ok(ret)
  767. };
  768. let result = thunk();
  769. tx_final.send(AsyncStatus::Payload(result)).unwrap();
  770. tx_final.send(AsyncStatus::Finished).unwrap();
  771. };
  772. Box::new(closure)
  773. };
  774. w.build(handle)
  775. }
  776. pub fn save_to_folder(mut path: PathBuf, bytes: &[u8], flags: Option<Flag>) -> Result<()> {
  777. path.push("cur");
  778. {
  779. let mut rand_buf = [0u8; 16];
  780. let mut f =
  781. fs::File::open("/dev/urandom").expect("Could not open /dev/urandom for reading");
  782. f.read_exact(&mut rand_buf)
  783. .expect("Could not read from /dev/urandom/");
  784. let mut hostn_buf = String::with_capacity(256);
  785. let mut f =
  786. fs::File::open("/etc/hostname").expect("Could not open /etc/hostname for reading");
  787. f.read_to_string(&mut hostn_buf)
  788. .expect("Could not read from /etc/hostname");
  789. let timestamp = std::time::SystemTime::now()
  790. .duration_since(std::time::SystemTime::UNIX_EPOCH)
  791. .unwrap()
  792. .as_millis();
  793. let mut filename = format!(
  794. "{}.{:x}_{}.{}:2,",
  795. timestamp,
  796. u128::from_be_bytes(rand_buf),
  797. std::process::id(),
  798. hostn_buf.trim()
  799. );
  800. if let Some(flags) = flags {
  801. if !(flags & Flag::DRAFT).is_empty() {
  802. filename.push('D');
  803. }
  804. if !(flags & Flag::FLAGGED).is_empty() {
  805. filename.push('F');
  806. }
  807. if !(flags & Flag::PASSED).is_empty() {
  808. filename.push('P');
  809. }
  810. if !(flags & Flag::REPLIED).is_empty() {
  811. filename.push('R');
  812. }
  813. if !(flags & Flag::SEEN).is_empty() {
  814. filename.push('S');
  815. }
  816. if !(flags & Flag::TRASHED).is_empty() {
  817. filename.push('T');
  818. }
  819. }
  820. path.push(filename);
  821. }
  822. debug!("saving at {}", path.display());
  823. let file = fs::File::create(path).unwrap();
  824. let metadata = file.metadata()?;
  825. let mut permissions = metadata.permissions();
  826. permissions.set_mode(0o600); // Read/write for owner only.
  827. file.set_permissions(permissions)?;
  828. let mut writer = io::BufWriter::new(file);
  829. writer.write_all(bytes).unwrap();
  830. return Ok(());
  831. }
  832. }
  833. fn add_path_to_index(
  834. hash_index: &HashIndexes,
  835. folder_hash: FolderHash,
  836. path: &Path,
  837. cache_dir: &xdg::BaseDirectories,
  838. file_name: PathBuf,
  839. ) -> Option<Envelope> {
  840. let env: Envelope;
  841. debug!("add_path_to_index path {:?} filename{:?}", path, file_name);
  842. let hash = get_file_hash(path);
  843. {
  844. let mut map = hash_index.lock().unwrap();
  845. let map = map.entry(folder_hash).or_default();
  846. map.insert(hash, path.to_path_buf().into());
  847. debug!(
  848. "inserted {} in {} map, len={}",
  849. hash,
  850. folder_hash,
  851. map.len()
  852. );
  853. }
  854. let op = Box::new(MaildirOp::new(hash, hash_index.clone(), folder_hash));
  855. if let Some(e) = Envelope::from_token(op, hash) {
  856. debug!("add_path_to_index gen {}\t{}", hash, file_name.display());
  857. if let Ok(cached) = cache_dir.place_cache_file(file_name) {
  858. debug!("putting in cache");
  859. /* place result in cache directory */
  860. let f = match fs::File::create(cached) {
  861. Ok(f) => f,
  862. Err(e) => {
  863. panic!("{}", e);
  864. }
  865. };
  866. let metadata = f.metadata().unwrap();
  867. let mut permissions = metadata.permissions();
  868. permissions.set_mode(0o600); // Read/write for owner only.
  869. f.set_permissions(permissions).unwrap();
  870. let writer = io::BufWriter::new(f);
  871. bincode::serialize_into(writer, &e).unwrap();
  872. }
  873. env = e;
  874. } else {
  875. debug!(
  876. "DEBUG: hash {}, path: {} couldn't be parsed in `add_path_to_index`",
  877. hash,
  878. path.display()
  879. );
  880. return None;
  881. }
  882. Some(env)
  883. }