imap: add UIDVALIDITY check

On UIDVALIDITY change, discard cache and force rescan.
jmap
Manos Pitsidianakis 2019-11-10 23:02:23 +02:00
parent 0cbc44fd0e
commit c1902f96b5
Signed by: Manos Pitsidianakis
GPG Key ID: 73627C2F690DF710
4 changed files with 136 additions and 116 deletions

View File

@ -82,6 +82,15 @@ impl std::ops::Deref for IsSubscribedFn {
} }
} }
type Capabilities = FnvHashSet<Vec<u8>>; type Capabilities = FnvHashSet<Vec<u8>>;
#[derive(Debug)]
pub struct UIDStore {
uidvalidity: Arc<Mutex<FnvHashMap<FolderHash, UID>>>,
hash_index: Arc<Mutex<FnvHashMap<EnvelopeHash, (UID, FolderHash)>>>,
uid_index: Arc<Mutex<FnvHashMap<UID, EnvelopeHash>>>,
byte_cache: Arc<Mutex<FnvHashMap<UID, EnvelopeCache>>>,
}
#[derive(Debug)] #[derive(Debug)]
pub struct ImapType { pub struct ImapType {
account_name: String, account_name: String,
@ -89,12 +98,9 @@ pub struct ImapType {
is_subscribed: Arc<IsSubscribedFn>, is_subscribed: Arc<IsSubscribedFn>,
connection: Arc<Mutex<ImapConnection>>, connection: Arc<Mutex<ImapConnection>>,
server_conf: ImapServerConf, server_conf: ImapServerConf,
uid_store: Arc<UIDStore>,
folders: Arc<Mutex<FnvHashMap<FolderHash, ImapFolder>>>, folders: Arc<Mutex<FnvHashMap<FolderHash, ImapFolder>>>,
hash_index: Arc<Mutex<FnvHashMap<EnvelopeHash, (UID, FolderHash)>>>,
uid_index: Arc<Mutex<FnvHashMap<usize, EnvelopeHash>>>,
byte_cache: Arc<Mutex<FnvHashMap<UID, EnvelopeCache>>>,
} }
impl MailBackend for ImapType { impl MailBackend for ImapType {
@ -114,8 +120,7 @@ impl MailBackend for ImapType {
let mut w = AsyncBuilder::new(); let mut w = AsyncBuilder::new();
let handle = { let handle = {
let tx = w.tx(); let tx = w.tx();
let hash_index = self.hash_index.clone(); let uid_store = self.uid_store.clone();
let uid_index = self.uid_index.clone();
let folder_path = folder.path().to_string(); let folder_path = folder.path().to_string();
let folder_hash = folder.hash(); let folder_hash = folder.hash();
let folder_exists = self.folders.lock().unwrap()[&folder_hash].exists.clone(); let folder_exists = self.folders.lock().unwrap()[&folder_hash].exists.clone();
@ -133,14 +138,18 @@ impl MailBackend for ImapType {
conn.send_command(format!("EXAMINE {}", folder_path).as_bytes()) conn.send_command(format!("EXAMINE {}", folder_path).as_bytes())
conn.read_response(&mut response) conn.read_response(&mut response)
); );
let examine_response = protocol_parser::select_response(&response) let examine_response = protocol_parser::select_response(&response);
.to_full_result()
.map_err(MeliError::from);
exit_on_error!(&tx, examine_response); exit_on_error!(&tx, examine_response);
let mut exists: usize = match examine_response.unwrap() { let examine_response = examine_response.unwrap();
SelectResponse::Ok(ok) => ok.uidnext - 1, let mut exists: usize = examine_response.uidnext - 1;
SelectResponse::Bad(b) => b.exists, {
}; let mut uidvalidities = uid_store.uidvalidity.lock().unwrap();
let v = uidvalidities
.entry(folder_hash)
.or_insert(examine_response.uidvalidity);
*v = examine_response.uidvalidity;
}
{ {
let mut folder_exists = folder_exists.lock().unwrap(); let mut folder_exists = folder_exists.lock().unwrap();
*folder_exists = exists; *folder_exists = exists;
@ -171,11 +180,12 @@ impl MailBackend for ImapType {
if let Some(flags) = flags { if let Some(flags) = flags {
env.set_flags(flags); env.set_flags(flags);
} }
hash_index uid_store
.hash_index
.lock() .lock()
.unwrap() .unwrap()
.insert(env.hash(), (uid, folder_hash)); .insert(env.hash(), (uid, folder_hash));
uid_index.lock().unwrap().insert(uid, env.hash()); uid_store.uid_index.lock().unwrap().insert(uid, env.hash());
envelopes.push(env); envelopes.push(env);
} }
} }
@ -211,8 +221,7 @@ impl MailBackend for ImapType {
let conn = ImapConnection::new_connection(&self.server_conf); let conn = ImapConnection::new_connection(&self.server_conf);
let main_conn = self.connection.clone(); let main_conn = self.connection.clone();
let is_online = self.online.clone(); let is_online = self.online.clone();
let hash_index = self.hash_index.clone(); let uid_store = self.uid_store.clone();
let uid_index = self.uid_index.clone();
let handle = std::thread::Builder::new() let handle = std::thread::Builder::new()
.name(format!("{} imap connection", self.account_name.as_str(),)) .name(format!("{} imap connection", self.account_name.as_str(),))
.spawn(move || { .spawn(move || {
@ -225,8 +234,7 @@ impl MailBackend for ImapType {
conn, conn,
is_online, is_online,
main_conn, main_conn,
hash_index, uid_store,
uid_index,
folders, folders,
sender, sender,
work_context, work_context,
@ -262,14 +270,14 @@ impl MailBackend for ImapType {
} }
fn operation(&self, hash: EnvelopeHash) -> Box<dyn BackendOp> { fn operation(&self, hash: EnvelopeHash) -> Box<dyn BackendOp> {
let (uid, folder_hash) = self.hash_index.lock().unwrap()[&hash]; let (uid, folder_hash) = self.uid_store.hash_index.lock().unwrap()[&hash];
Box::new(ImapOp::new( Box::new(ImapOp::new(
uid, uid,
self.folders.lock().unwrap()[&folder_hash] self.folders.lock().unwrap()[&folder_hash]
.path() .path()
.to_string(), .to_string(),
self.connection.clone(), self.connection.clone(),
self.byte_cache.clone(), self.uid_store.clone(),
)) ))
} }
@ -432,9 +440,12 @@ impl ImapType {
folders: Arc::new(Mutex::new(Default::default())), folders: Arc::new(Mutex::new(Default::default())),
connection: Arc::new(Mutex::new(connection)), connection: Arc::new(Mutex::new(connection)),
hash_index: Default::default(), uid_store: Arc::new(UIDStore {
uid_index: Default::default(), uidvalidity: Default::default(),
byte_cache: Default::default(), hash_index: Default::default(),
uid_index: Default::default(),
byte_cache: Default::default(),
}),
} }
} }
@ -543,7 +554,7 @@ impl ImapType {
for l in lines.by_ref() { for l in lines.by_ref() {
if l.starts_with("* SEARCH") { if l.starts_with("* SEARCH") {
use std::iter::FromIterator; use std::iter::FromIterator;
let uid_index = self.uid_index.lock()?; let uid_index = self.uid_store.uid_index.lock()?;
return Ok(crate::structs::StackVec::from_iter( return Ok(crate::structs::StackVec::from_iter(
l["* SEARCH".len()..] l["* SEARCH".len()..]
.trim() .trim()

View File

@ -37,7 +37,7 @@ pub struct ImapOp {
folder_path: String, folder_path: String,
flags: Cell<Option<Flag>>, flags: Cell<Option<Flag>>,
connection: Arc<Mutex<ImapConnection>>, connection: Arc<Mutex<ImapConnection>>,
byte_cache: Arc<Mutex<FnvHashMap<UID, EnvelopeCache>>>, uid_store: Arc<UIDStore>,
} }
impl ImapOp { impl ImapOp {
@ -45,7 +45,7 @@ impl ImapOp {
uid: usize, uid: usize,
folder_path: String, folder_path: String,
connection: Arc<Mutex<ImapConnection>>, connection: Arc<Mutex<ImapConnection>>,
byte_cache: Arc<Mutex<FnvHashMap<UID, EnvelopeCache>>>, uid_store: Arc<UIDStore>,
) -> Self { ) -> Self {
ImapOp { ImapOp {
uid, uid,
@ -55,7 +55,7 @@ impl ImapOp {
body: None, body: None,
folder_path, folder_path,
flags: Cell::new(None), flags: Cell::new(None),
byte_cache, uid_store,
} }
} }
} }
@ -67,7 +67,7 @@ impl BackendOp for ImapOp {
fn as_bytes(&mut self) -> Result<&[u8]> { fn as_bytes(&mut self) -> Result<&[u8]> {
if self.bytes.is_none() { if self.bytes.is_none() {
let mut bytes_cache = self.byte_cache.lock()?; let mut bytes_cache = self.uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(self.uid).or_default(); let cache = bytes_cache.entry(self.uid).or_default();
if cache.bytes.is_some() { if cache.bytes.is_some() {
self.bytes = cache.bytes.clone(); self.bytes = cache.bytes.clone();
@ -122,7 +122,7 @@ impl BackendOp for ImapOp {
return Ok(result); return Ok(result);
} }
if self.headers.is_none() { if self.headers.is_none() {
let mut bytes_cache = self.byte_cache.lock()?; let mut bytes_cache = self.uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(self.uid).or_default(); let cache = bytes_cache.entry(self.uid).or_default();
if cache.headers.is_some() { if cache.headers.is_some() {
self.headers = cache.headers.clone(); self.headers = cache.headers.clone();
@ -175,7 +175,7 @@ impl BackendOp for ImapOp {
return Ok(result); return Ok(result);
} }
if self.body.is_none() { if self.body.is_none() {
let mut bytes_cache = self.byte_cache.lock()?; let mut bytes_cache = self.uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(self.uid).or_default(); let cache = bytes_cache.entry(self.uid).or_default();
if cache.body.is_some() { if cache.body.is_some() {
self.body = cache.body.clone(); self.body = cache.body.clone();
@ -223,7 +223,7 @@ impl BackendOp for ImapOp {
if self.flags.get().is_some() { if self.flags.get().is_some() {
return self.flags.get().unwrap(); return self.flags.get().unwrap();
} }
let mut bytes_cache = self.byte_cache.lock().unwrap(); let mut bytes_cache = self.uid_store.byte_cache.lock().unwrap();
let cache = bytes_cache.entry(self.uid).or_default(); let cache = bytes_cache.entry(self.uid).or_default();
if cache.flags.is_some() { if cache.flags.is_some() {
self.flags.set(cache.flags); self.flags.set(cache.flags);
@ -295,7 +295,7 @@ impl BackendOp for ImapOp {
} }
conn.send_command(format!("EXAMINE \"{}\"", &self.folder_path,).as_bytes())?; conn.send_command(format!("EXAMINE \"{}\"", &self.folder_path,).as_bytes())?;
conn.read_response(&mut response)?; conn.read_response(&mut response)?;
let mut bytes_cache = self.byte_cache.lock()?; let mut bytes_cache = self.uid_store.byte_cache.lock()?;
let cache = bytes_cache.entry(self.uid).or_default(); let cache = bytes_cache.entry(self.uid).or_default();
cache.flags = Some(flag); cache.flags = Some(flag);
Ok(()) Ok(())

View File

@ -278,14 +278,8 @@ named!(
do_parse!(tag!("* SEARCH\r\n") >> ({ &b""[0..] }))) do_parse!(tag!("* SEARCH\r\n") >> ({ &b""[0..] })))
); );
#[derive(Debug, Clone)]
pub enum SelectResponse {
Ok(SelectResponseOk),
Bad(SelectResponseBad),
}
#[derive(Debug, Default, Clone)] #[derive(Debug, Default, Clone)]
pub struct SelectResponseOk { pub struct SelectResponse {
pub exists: usize, pub exists: usize,
pub recent: usize, pub recent: usize,
pub flags: Flag, pub flags: Flag,
@ -295,13 +289,6 @@ pub struct SelectResponseOk {
pub permanentflags: Flag, pub permanentflags: Flag,
} }
#[derive(Debug, Default, Clone)]
pub struct SelectResponseBad {
pub exists: usize,
pub recent: usize,
pub flags: Flag,
}
/* /*
* Example: C: A142 SELECT INBOX * Example: C: A142 SELECT INBOX
* S: * 172 EXISTS * S: * 172 EXISTS
@ -324,9 +311,9 @@ pub struct SelectResponseBad {
* * OK [UIDVALIDITY 1554422056] UIDs valid * * OK [UIDVALIDITY 1554422056] UIDs valid
* * OK [UIDNEXT 50] Predicted next UID * * OK [UIDNEXT 50] Predicted next UID
*/ */
pub fn select_response(input: &str) -> IResult<&str, SelectResponse> { pub fn select_response(input: &str) -> Result<SelectResponse> {
if input.contains("* OK") { if input.contains("* OK") {
let mut ret = SelectResponseOk::default(); let mut ret = SelectResponse::default();
for l in input.split("\r\n") { for l in input.split("\r\n") {
if l.starts_with("* ") && l.ends_with(" EXISTS") { if l.starts_with("* ") && l.ends_with(" EXISTS") {
ret.exists = usize::from_str(&l["* ".len()..l.len() - " EXISTS".len()]).unwrap(); ret.exists = usize::from_str(&l["* ".len()..l.len() - " EXISTS".len()]).unwrap();
@ -354,23 +341,10 @@ pub fn select_response(input: &str) -> IResult<&str, SelectResponse> {
debug!("select response: {}", l); debug!("select response: {}", l);
} }
} }
IResult::Done(&""[0..], SelectResponse::Ok(ret)) Ok(ret)
} else { } else {
let mut ret = SelectResponseBad::default(); debug!("BAD/NO response in select: {}", input);
for l in input.split("\r\n") { Err(MeliError::new(input))
if l.starts_with("* ") && l.ends_with(" EXISTS") {
ret.exists = usize::from_str(&l["* ".len()..l.len() - " EXISTS".len()]).unwrap();
} else if l.starts_with("* ") && l.ends_with(" RECENT") {
ret.recent = usize::from_str(&l["* ".len()..l.len() - " RECENT".len()]).unwrap();
} else if l.starts_with("* FLAGS (") {
ret.flags = flags(&l["* FLAGS (".len()..l.len() - ")".len()])
.to_full_result()
.unwrap();
} else if !l.is_empty() {
debug!("select response: {}", l);
}
}
IResult::Done(&""[0..], SelectResponse::Bad(ret))
} }
} }

View File

@ -26,8 +26,7 @@ pub struct ImapWatchKit {
pub conn: ImapConnection, pub conn: ImapConnection,
pub is_online: Arc<Mutex<bool>>, pub is_online: Arc<Mutex<bool>>,
pub main_conn: Arc<Mutex<ImapConnection>>, pub main_conn: Arc<Mutex<ImapConnection>>,
pub hash_index: Arc<Mutex<FnvHashMap<EnvelopeHash, (UID, FolderHash)>>>, pub uid_store: Arc<UIDStore>,
pub uid_index: Arc<Mutex<FnvHashMap<usize, EnvelopeHash>>>,
pub folders: Arc<Mutex<FnvHashMap<FolderHash, ImapFolder>>>, pub folders: Arc<Mutex<FnvHashMap<FolderHash, ImapFolder>>>,
pub sender: RefreshEventConsumer, pub sender: RefreshEventConsumer,
pub work_context: WorkContext, pub work_context: WorkContext,
@ -53,8 +52,7 @@ pub fn poll_with_examine(kit: ImapWatchKit) -> Result<()> {
is_online, is_online,
mut conn, mut conn,
main_conn, main_conn,
hash_index, uid_store,
uid_index,
folders, folders,
sender, sender,
work_context, work_context,
@ -82,14 +80,7 @@ pub fn poll_with_examine(kit: ImapWatchKit) -> Result<()> {
format!("examining `{}` for updates...", folder.path()), format!("examining `{}` for updates...", folder.path()),
)) ))
.unwrap(); .unwrap();
examine_updates( examine_updates(folder, &sender, &mut conn, &uid_store, &work_context)?;
folder,
&sender,
&mut conn,
&hash_index,
&uid_index,
&work_context,
)?;
} }
let mut main_conn = main_conn.lock().unwrap(); let mut main_conn = main_conn.lock().unwrap();
main_conn.send_command(b"NOOP").unwrap(); main_conn.send_command(b"NOOP").unwrap();
@ -106,8 +97,7 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
mut conn, mut conn,
is_online, is_online,
main_conn, main_conn,
hash_index, uid_store,
uid_index,
folders, folders,
sender, sender,
work_context, work_context,
@ -139,15 +129,37 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
debug!("select response {}", &response); debug!("select response {}", &response);
{ {
let mut prev_exists = folder.exists.lock().unwrap(); let mut prev_exists = folder.exists.lock().unwrap();
*prev_exists = match protocol_parser::select_response(&response) *prev_exists = match protocol_parser::select_response(&response) {
.to_full_result() Ok(ok) => {
.map_err(MeliError::from) {
{ let mut uidvalidities = uid_store.uidvalidity.lock().unwrap();
Ok(SelectResponse::Bad(bad)) => {
debug!(bad); if let Some(v) = uidvalidities.get_mut(&folder_hash) {
panic!("could not select mailbox"); if *v != ok.uidvalidity {
} sender.send(RefreshEvent {
Ok(SelectResponse::Ok(ok)) => { hash: folder_hash,
kind: RefreshEventKind::Rescan,
});
uid_store.uid_index.lock().unwrap().clear();
uid_store.hash_index.lock().unwrap().clear();
uid_store.byte_cache.lock().unwrap().clear();
*v = ok.uidvalidity;
}
} else {
sender.send(RefreshEvent {
hash: folder_hash,
kind: RefreshEventKind::Rescan,
});
sender.send(RefreshEvent {
hash: folder_hash,
kind: RefreshEventKind::Failure(MeliError::new(format!(
"Unknown mailbox: {} {}",
folder.path(),
folder_hash
))),
});
}
}
debug!(&ok); debug!(&ok);
ok.exists ok.exists
} }
@ -216,14 +228,7 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
format!("examining `{}` for updates...", folder.path()), format!("examining `{}` for updates...", folder.path()),
)) ))
.unwrap(); .unwrap();
examine_updates( examine_updates(folder, &sender, &mut iter.conn, &uid_store, &work_context);
folder,
&sender,
&mut iter.conn,
&hash_index,
&uid_index,
&work_context,
);
} }
work_context work_context
.set_status .set_status
@ -299,11 +304,12 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
.unwrap(); .unwrap();
if let Ok(env) = Envelope::from_bytes(&b, flags) { if let Ok(env) = Envelope::from_bytes(&b, flags) {
ctr += 1; ctr += 1;
hash_index uid_store
.hash_index
.lock() .lock()
.unwrap() .unwrap()
.insert(env.hash(), (uid, folder_hash)); .insert(env.hash(), (uid, folder_hash));
uid_index.lock().unwrap().insert(uid, env.hash()); uid_store.uid_index.lock().unwrap().insert(uid, env.hash());
debug!( debug!(
"Create event {} {} {}", "Create event {} {} {}",
env.hash(), env.hash(),
@ -407,17 +413,18 @@ pub fn idle(kit: ImapWatchKit) -> Result<()> {
format!("parsing {}/{} envelopes..", ctr, len), format!("parsing {}/{} envelopes..", ctr, len),
)) ))
.unwrap(); .unwrap();
if uid_index.lock().unwrap().contains_key(&uid) { if uid_store.uid_index.lock().unwrap().contains_key(&uid) {
ctr += 1; ctr += 1;
continue; continue;
} }
if let Ok(env) = Envelope::from_bytes(&b, flags) { if let Ok(env) = Envelope::from_bytes(&b, flags) {
ctr += 1; ctr += 1;
hash_index uid_store
.hash_index
.lock() .lock()
.unwrap() .unwrap()
.insert(env.hash(), (uid, folder_hash)); .insert(env.hash(), (uid, folder_hash));
uid_index.lock().unwrap().insert(uid, env.hash()); uid_store.uid_index.lock().unwrap().insert(uid, env.hash());
debug!( debug!(
"Create event {} {} {}", "Create event {} {} {}",
env.hash(), env.hash(),
@ -476,8 +483,7 @@ fn examine_updates(
folder: &ImapFolder, folder: &ImapFolder,
sender: &RefreshEventConsumer, sender: &RefreshEventConsumer,
conn: &mut ImapConnection, conn: &mut ImapConnection,
hash_index: &Arc<Mutex<FnvHashMap<EnvelopeHash, (UID, FolderHash)>>>, uid_store: &Arc<UIDStore>,
uid_index: &Arc<Mutex<FnvHashMap<usize, EnvelopeHash>>>,
work_context: &WorkContext, work_context: &WorkContext,
) -> Result<()> { ) -> Result<()> {
let thread_id: std::thread::ThreadId = std::thread::current().id(); let thread_id: std::thread::ThreadId = std::thread::current().id();
@ -492,16 +498,39 @@ fn examine_updates(
conn.send_command(format!("EXAMINE {}", folder.path()).as_bytes()) conn.send_command(format!("EXAMINE {}", folder.path()).as_bytes())
conn.read_response(&mut response) conn.read_response(&mut response)
); );
match protocol_parser::select_response(&response) match protocol_parser::select_response(&response) {
.to_full_result() Ok(ok) => {
.map_err(MeliError::from)
{
Ok(SelectResponse::Bad(bad)) => {
debug!(bad);
panic!("could not select mailbox");
}
Ok(SelectResponse::Ok(ok)) => {
debug!(&ok); debug!(&ok);
{
let mut uidvalidities = uid_store.uidvalidity.lock().unwrap();
if let Some(v) = uidvalidities.get_mut(&folder_hash) {
if *v != ok.uidvalidity {
sender.send(RefreshEvent {
hash: folder_hash,
kind: RefreshEventKind::Rescan,
});
uid_store.uid_index.lock().unwrap().clear();
uid_store.hash_index.lock().unwrap().clear();
uid_store.byte_cache.lock().unwrap().clear();
*v = ok.uidvalidity;
}
} else {
// FIXME: Handle this case in ui/src/conf/accounts.rs
sender.send(RefreshEvent {
hash: folder_hash,
kind: RefreshEventKind::Rescan,
});
sender.send(RefreshEvent {
hash: folder_hash,
kind: RefreshEventKind::Failure(MeliError::new(format!(
"Unknown mailbox: {} {}",
folder.path(),
folder_hash
))),
});
}
}
let mut prev_exists = folder.exists.lock().unwrap(); let mut prev_exists = folder.exists.lock().unwrap();
let n = ok.exists; let n = ok.exists;
if ok.recent > 0 { if ok.recent > 0 {
@ -542,11 +571,16 @@ fn examine_updates(
Ok(v) => { Ok(v) => {
for (uid, flags, b) in v { for (uid, flags, b) in v {
if let Ok(env) = Envelope::from_bytes(&b, flags) { if let Ok(env) = Envelope::from_bytes(&b, flags) {
hash_index uid_store
.hash_index
.lock() .lock()
.unwrap() .unwrap()
.insert(env.hash(), (uid, folder_hash)); .insert(env.hash(), (uid, folder_hash));
uid_index.lock().unwrap().insert(uid, env.hash()); uid_store
.uid_index
.lock()
.unwrap()
.insert(uid, env.hash());
debug!( debug!(
"Create event {} {} {}", "Create event {} {} {}",
env.hash(), env.hash(),
@ -600,11 +634,12 @@ fn examine_updates(
Ok(v) => { Ok(v) => {
for (uid, flags, b) in v { for (uid, flags, b) in v {
if let Ok(env) = Envelope::from_bytes(&b, flags) { if let Ok(env) = Envelope::from_bytes(&b, flags) {
hash_index uid_store
.hash_index
.lock() .lock()
.unwrap() .unwrap()
.insert(env.hash(), (uid, folder_hash)); .insert(env.hash(), (uid, folder_hash));
uid_index.lock().unwrap().insert(uid, env.hash()); uid_store.uid_index.lock().unwrap().insert(uid, env.hash());
debug!( debug!(
"Create event {} {} {}", "Create event {} {} {}",
env.hash(), env.hash(),