core: remove obsolete error_queue module

Replace with generic queue functionality
axum-login-upgrade
Manos Pitsidianakis 2023-04-24 20:50:22 +03:00
parent 090b72711b
commit 703e9cb75c
Signed by: Manos Pitsidianakis
GPG Key ID: 7729C7707F7E09D0
12 changed files with 38 additions and 148 deletions

View File

@ -236,9 +236,6 @@ pub enum ErrorQueueCommand {
/// index of entry.
#[arg(long)]
index: Vec<i64>,
/// JSON format.
#[arg(long)]
json: bool,
},
/// Delete entry and print it in stdout.
Delete {

View File

@ -530,24 +530,20 @@ fn run_app(opt: Opt) -> Result<()> {
}
ErrorQueue { cmd } => match cmd {
ErrorQueueCommand::List => {
let errors = db.error_queue()?;
let errors = db.queue(Queue::Error)?;
if errors.is_empty() {
println!("Error queue is empty.");
} else {
for e in errors {
println!(
"- {} {} {} {} {}",
e["pk"],
e["datetime"],
e["from_address"],
e["to_address"],
e["subject"]
e.pk, e.datetime, e.from_address, e.to_addresses, e.subject
);
}
}
}
ErrorQueueCommand::Print { index, json } => {
let mut errors = db.error_queue()?;
ErrorQueueCommand::Print { index } => {
let mut errors = db.queue(Queue::Error)?;
if !index.is_empty() {
errors.retain(|el| index.contains(&el.pk()));
}
@ -555,16 +551,12 @@ fn run_app(opt: Opt) -> Result<()> {
println!("Error queue is empty.");
} else {
for e in errors {
if json {
println!("{:#}", e);
} else {
println!("{}", e["message"]);
}
println!("{e:?}");
}
}
}
ErrorQueueCommand::Delete { index, quiet } => {
let mut errors = db.error_queue()?;
let mut errors = db.queue(Queue::Error)?;
if !index.is_empty() {
errors.retain(|el| index.contains(&el.pk()));
}
@ -576,10 +568,10 @@ fn run_app(opt: Opt) -> Result<()> {
if !quiet {
println!("Deleting error queue elements {:?}", &index);
}
db.delete_from_error_queue(index)?;
db.delete_from_queue(Queue::Error, index)?;
if !quiet {
for e in errors {
println!("{}", e["message"]);
println!("{e:?}");
}
}
}

View File

@ -156,7 +156,7 @@ fn test_out_queue_flush() {
let out_queue = db.queue(Queue::Out).unwrap();
assert_eq!(out_queue.len(), 2);
assert_eq!(db.list_subscriptions(foo_chat.pk()).unwrap().len(), 2);
assert_eq!(db.error_queue().unwrap().len(), 0);
assert_eq!(db.queue(Queue::Error).unwrap().len(), 0);
}
log::info!("Flush out queue, subscription confirmations should be sent to the new users.");
@ -340,7 +340,7 @@ fn test_list_requests_submission() {
let out_queue = db.queue(Queue::Out).unwrap();
assert_eq!(out_queue.len(), 1);
assert_eq!(db.list_subscriptions(foo_chat.pk()).unwrap().len(), 0);
assert_eq!(db.error_queue().unwrap().len(), 0);
assert_eq!(db.queue(Queue::Error).unwrap().len(), 0);
}
log::info!("Flush out queue, help reply should go to Αλίκη.");

View File

@ -65,8 +65,6 @@ mod templates;
pub use templates::*;
mod queue;
pub use queue::*;
mod error_queue;
pub use error_queue::*;
mod posts;
pub use posts::*;
mod subscriptions;

View File

@ -1,103 +0,0 @@
/*
* This file is part of mailpot
*
* Copyright 2020 - Manos Pitsidianakis
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use serde_json::{json, Value};
use super::*;
impl Connection {
/// Insert a received email into the error queue.
pub fn insert_to_error_queue(
&self,
list_pk: Option<i64>,
env: &Envelope,
raw: &[u8],
reason: String,
) -> Result<i64> {
let mut stmt = self.connection.prepare(
"INSERT INTO queue(which, list, comment, to_addresses, from_address, subject, \
message_id, message) VALUES('error', ?, ?, ?, ?, ?, ?, ?) RETURNING pk;",
)?;
let pk = stmt.query_row(
rusqlite::params![
&list_pk,
&reason,
&env.field_to_to_string(),
&env.field_from_to_string(),
&env.subject(),
&env.message_id().to_string(),
raw,
],
|row| {
let pk: i64 = row.get("pk")?;
Ok(pk)
},
)?;
Ok(pk)
}
/// Fetch all error queue entries.
pub fn error_queue(&self) -> Result<Vec<DbVal<Value>>> {
let mut stmt = self
.connection
.prepare("SELECT * FROM queue WHERE which = 'error';")?;
let error_iter = stmt.query_map([], |row| {
let pk = row.get::<_, i64>("pk")?;
Ok(DbVal(
json!({
"pk" : pk,
"error": row.get::<_, Option<String>>("comment")?,
"to_addresses": row.get::<_, String>("to_addresses")?,
"from_address": row.get::<_, String>("from_address")?,
"subject": row.get::<_, String>("subject")?,
"message_id": row.get::<_, String>("message_id")?,
"message": row.get::<_, Vec<u8>>("message")?,
"timestamp": row.get::<_, u64>("timestamp")?,
"datetime": row.get::<_, String>("datetime")?,
}),
pk,
))
})?;
let mut ret = vec![];
for error in error_iter {
let error = error?;
ret.push(error);
}
Ok(ret)
}
/// Delete error queue entries.
pub fn delete_from_error_queue(&mut self, index: Vec<i64>) -> Result<()> {
let tx = self.connection.transaction()?;
if index.is_empty() {
tx.execute("DELETE FROM queue WHERE which = 'error';", [])?;
} else {
for i in index {
tx.execute(
"DELETE FROM queue WHERE which = 'error' AND pk = ?;",
rusqlite::params![i],
)?;
}
};
tx.commit()?;
Ok(())
}
}

View File

@ -75,7 +75,13 @@ impl Connection {
pub fn post(&mut self, env: &Envelope, raw: &[u8], _dry_run: bool) -> Result<()> {
let result = self.inner_post(env, raw, _dry_run);
if let Err(err) = result {
return match self.insert_to_error_queue(None, env, raw, err.to_string()) {
return match self.insert_to_queue(QueueEntry::new(
Queue::Error,
None,
Some(Cow::Borrowed(env)),
raw,
Some(err.to_string()),
)?) {
Ok(idx) => {
log::info!(
"Inserted mail from {:?} into error_queue at index {}",

View File

@ -45,6 +45,8 @@ pub enum Queue {
Corrupt,
/// Emails that must be sent as soon as possible.
Out,
/// Error queue
Error,
}
impl Queue {
@ -56,6 +58,7 @@ impl Queue {
Self::Deferred => "deferred",
Self::Corrupt => "corrupt",
Self::Out => "out",
Self::Error => "error",
}
}
}

View File

@ -17,7 +17,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use mailpot::{models::*, Configuration, Connection, SendMail};
use mailpot::{models::*, Configuration, Connection, Queue, SendMail};
use mailpot_tests::init_stderr_logging;
use tempfile::TempDir;
@ -67,7 +67,7 @@ fn test_accounts() {
.unwrap();
assert_eq!(post_policy.pk(), 1);
assert_eq!(db.error_queue().unwrap().len(), 0);
assert_eq!(db.queue(Queue::Error).unwrap().len(), 0);
assert_eq!(db.list_subscriptions(foo_chat.pk()).unwrap().len(), 0);
let mut db = db.untrusted();
@ -89,7 +89,7 @@ MIME-Version: 1.0
db.post(&envelope, subscribe_bytes, /* dry_run */ false)
.unwrap();
assert_eq!(db.list_subscriptions(foo_chat.pk()).unwrap().len(), 1);
assert_eq!(db.error_queue().unwrap().len(), 0);
assert_eq!(db.queue(Queue::Error).unwrap().len(), 0);
assert_eq!(db.account_by_address("user@example.com").unwrap(), None);
@ -115,7 +115,7 @@ MIME-Version: 1.0
melib::Envelope::from_bytes(&set_password_bytes, None).expect("Could not parse message");
db.post(&envelope, &set_password_bytes, /* dry_run */ false)
.unwrap();
assert_eq!(db.error_queue().unwrap().len(), 0);
assert_eq!(db.queue(Queue::Error).unwrap().len(), 0);
let acc = db.account_by_address("user@example.com").unwrap().unwrap();
assert_eq!(
@ -136,7 +136,7 @@ MIME-Version: 1.0
melib::Envelope::from_bytes(&set_password_bytes, None).expect("Could not parse message");
db.post(&envelope, &set_password_bytes, /* dry_run */ false)
.unwrap();
assert_eq!(db.error_queue().unwrap().len(), 0);
assert_eq!(db.queue(Queue::Error).unwrap().len(), 0);
let acc = db.account_by_address("user@example.com").unwrap().unwrap();
assert!(

View File

@ -17,7 +17,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use mailpot::{melib, models::*, Configuration, Connection, SendMail};
use mailpot::{melib, models::*, Configuration, Connection, Queue, SendMail};
use mailpot_tests::init_stderr_logging;
use tempfile::TempDir;
@ -73,7 +73,7 @@ fn test_error_queue() {
.unwrap();
assert_eq!(post_policy.pk(), 1);
assert_eq!(db.error_queue().unwrap().len(), 0);
assert_eq!(db.queue(Queue::Error).unwrap().len(), 0);
// drop privileges
let mut db = db.untrusted();
@ -88,5 +88,5 @@ fn test_error_queue() {
mailpot::ErrorKind::PostRejected(_reason) => {}
other => panic!("Got unexpected error: {}", other),
}
assert_eq!(db.error_queue().unwrap().len(), 1);
assert_eq!(db.queue(Queue::Error).unwrap().len(), 1);
}

View File

@ -17,7 +17,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use mailpot::{models::*, Configuration, Connection, SendMail};
use mailpot::{models::*, Configuration, Connection, Queue, SendMail};
use mailpot_tests::init_stderr_logging;
use tempfile::TempDir;
@ -65,7 +65,7 @@ fn test_list_subscription() {
.unwrap();
assert_eq!(post_policy.pk(), 1);
assert_eq!(db.error_queue().unwrap().len(), 0);
assert_eq!(db.queue(Queue::Error).unwrap().len(), 0);
assert_eq!(db.list_subscriptions(foo_chat.pk()).unwrap().len(), 0);
let mut db = db.untrusted();
@ -96,7 +96,7 @@ eT48L2h0bWw+
mailpot::ErrorKind::PostRejected(_reason) => {}
other => panic!("Got unexpected error: {}", other),
}
assert_eq!(db.error_queue().unwrap().len(), 1);
assert_eq!(db.queue(Queue::Error).unwrap().len(), 1);
let input_bytes_2 = b"From: Name <user@example.com>
To: <foo-chat+subscribe@example.com>
@ -115,11 +115,11 @@ MIME-Version: 1.0
db.post(&envelope, input_bytes_2, /* dry_run */ false)
.unwrap();
assert_eq!(db.list_subscriptions(foo_chat.pk()).unwrap().len(), 1);
assert_eq!(db.error_queue().unwrap().len(), 1);
assert_eq!(db.queue(Queue::Error).unwrap().len(), 1);
let envelope =
melib::Envelope::from_bytes(input_bytes_1, None).expect("Could not parse message");
db.post(&envelope, input_bytes_1, /* dry_run */ false)
.unwrap();
assert_eq!(db.error_queue().unwrap().len(), 1);
assert_eq!(db.queue(Queue::Error).unwrap().len(), 1);
assert_eq!(db.list_posts(foo_chat.pk(), None).unwrap().len(), 1);
}

View File

@ -65,7 +65,7 @@ fn test_template_replies() {
.unwrap();
assert_eq!(post_policy.pk(), 1);
assert_eq!(db.error_queue().unwrap().len(), 0);
assert_eq!(db.queue(Queue::Error).unwrap().len(), 0);
assert_eq!(db.list_subscriptions(foo_chat.pk()).unwrap().len(), 0);
/* create custom subscribe confirm template, and check that it is used in
@ -98,7 +98,7 @@ MIME-Version: 1.0
let subenvelope = melib::Envelope::from_bytes(bytes, None).expect("Could not parse message");
db.post(&subenvelope, bytes, /* dry_run */ false).unwrap();
assert_eq!(db.list_subscriptions(foo_chat.pk()).unwrap().len(), 1);
assert_eq!(db.error_queue().unwrap().len(), 0);
assert_eq!(db.queue(Queue::Error).unwrap().len(), 0);
let out_queue = db.queue(Queue::Out).unwrap();
assert_eq!(out_queue.len(), 1);
@ -138,7 +138,7 @@ MIME-Version: 1.0
let envelope = melib::Envelope::from_bytes(unbytes, None).expect("Could not parse message");
db.post(&envelope, unbytes, /* dry_run */ false).unwrap();
assert_eq!(db.list_subscriptions(foo_chat.pk()).unwrap().len(), 0);
assert_eq!(db.error_queue().unwrap().len(), 0);
assert_eq!(db.queue(Queue::Error).unwrap().len(), 0);
let out_queue = db.queue(Queue::Out).unwrap();
assert_eq!(out_queue.len(), 2);
@ -155,7 +155,7 @@ MIME-Version: 1.0
db.post(&subenvelope, bytes, /* dry_run */ false).unwrap();
assert_eq!(db.list_subscriptions(foo_chat.pk()).unwrap().len(), 1);
assert_eq!(db.error_queue().unwrap().len(), 0);
assert_eq!(db.queue(Queue::Error).unwrap().len(), 0);
let out_queue = db.queue(Queue::Out).unwrap();

View File

@ -649,16 +649,13 @@ List.
.br
mpot error\-queue print [\-\-index \fIINDEX\fR] [\-\-json \fIJSON\fR]
mpot error\-queue print [\-\-index \fIINDEX\fR]
.br
Print entry in RFC5322 or JSON format.
.TP
\-\-index \fIINDEX\fR
index of entry.
.TP
\-\-json
JSON format.
.ie \n(.g .ds Aq \(aq
.el .ds Aq '
.\fB