core: add database migrations

axum-login-upgrade
Manos Pitsidianakis 2023-05-05 12:29:21 +03:00
parent 9eaa580af4
commit d5fc2d8e75
Signed by: Manos Pitsidianakis
GPG Key ID: 7729C7707F7E09D0
11 changed files with 307 additions and 61 deletions

View File

@ -76,16 +76,20 @@ jobs:
- name: cargo test
if: success() || failure() # always run even if other steps fail, except when cancelled <https://stackoverflow.com/questions/58858429/how-to-run-a-github-actions-step-even-if-the-previous-step-fails-while-still-f>
run: |
cargo test --all --no-fail-fast --all-features
cargo test --all --no-fail-fast --all-features
- name: cargo-sort
if: success() || failure() # always run even if other steps fail, except when cancelled <https://stackoverflow.com/questions/58858429/how-to-run-a-github-actions-step-even-if-the-previous-step-fails-while-still-f>
if: success() || failure()
run: |
cargo sort --check
- name: rustfmt
if: success() || failure() # always run even if other steps fail, except when cancelled <https://stackoverflow.com/questions/58858429/how-to-run-a-github-actions-step-even-if-the-previous-step-fails-while-still-f>
if: success() || failure()
run: |
cargo fmt --check --all
- name: clippy
if: success() || failure() # always run even if other steps fail, except when cancelled <https://stackoverflow.com/questions/58858429/how-to-run-a-github-actions-step-even-if-the-previous-step-fails-while-still-f>
if: success() || failure()
run: |
cargo clippy --no-deps --all-features --all --tests --examples --benches --bins
- name: rustdoc
if: success() || failure()
run: |
make rustdoc

View File

@ -18,45 +18,141 @@
*/
use std::{
fs::{metadata, read_dir, OpenOptions},
io,
io::Write,
path::Path,
process::{Command, Stdio},
};
// Source: https://stackoverflow.com/a/64535181
/// Returns `true` when `output` must be regenerated from `input`.
///
/// The output is considered outdated when it cannot be stat'ed (e.g. it does
/// not exist yet) or when `input`'s modification time is strictly newer than
/// `output`'s. Errors reading `input`'s metadata are propagated to the caller.
fn is_output_file_outdated<P1, P2>(input: P1, output: P2) -> io::Result<bool>
where
    P1: AsRef<Path>,
    P2: AsRef<Path>,
{
    match metadata(output) {
        // No readable output: regenerate unconditionally.
        Err(_) => Ok(true),
        Ok(out_meta) => {
            // Outdated iff the input was modified after the output.
            let input_mtime = metadata(input)?.modified()?;
            Ok(input_mtime > out_meta.modified()?)
        }
    }
}
fn main() {
println!("cargo:rerun-if-changed=migrations");
println!("cargo:rerun-if-changed=src/schema.sql.m4");
let output = Command::new("m4")
.arg("./src/schema.sql.m4")
.output()
.unwrap();
if String::from_utf8_lossy(&output.stdout).trim().is_empty() {
panic!(
"m4 output is empty. stderr was {}",
String::from_utf8_lossy(&output.stderr)
if is_output_file_outdated("src/schema.sql.m4", "src/schema.sql").unwrap() {
let output = Command::new("m4")
.arg("./src/schema.sql.m4")
.output()
.unwrap();
if String::from_utf8_lossy(&output.stdout).trim().is_empty() {
panic!(
"m4 output is empty. stderr was {}",
String::from_utf8_lossy(&output.stderr)
);
}
let mut verify = Command::new("sqlite3")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.unwrap();
println!(
"Verifying by creating an in-memory database in sqlite3 and feeding it the output \
schema."
);
verify
.stdin
.take()
.unwrap()
.write_all(&output.stdout)
.unwrap();
let exit = verify.wait_with_output().unwrap();
if !exit.status.success() {
panic!(
"sqlite3 could not read SQL schema: {}",
String::from_utf8_lossy(&exit.stdout)
);
}
let mut file = std::fs::File::create("./src/schema.sql").unwrap();
file.write_all(&output.stdout).unwrap();
}
let mut verify = Command::new("sqlite3")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.unwrap();
println!(
"Verifying by creating an in-memory database in sqlite3 and feeding it the output schema."
);
verify
.stdin
.take()
.unwrap()
.write_all(&output.stdout)
.unwrap();
let exit = verify.wait_with_output().unwrap();
if !exit.status.success() {
panic!(
"sqlite3 could not read SQL schema: {}",
String::from_utf8_lossy(&exit.stdout)
);
const MIGRATION_RS: &str = "src/migrations.rs.inc";
let mut regen = false;
let mut paths = vec![];
let mut undo_paths = vec![];
for entry in read_dir("migrations").unwrap() {
let entry = entry.unwrap();
let path = entry.path();
if path.is_dir() || path.extension().map(|os| os.to_str().unwrap()) != Some("sql") {
continue;
}
if is_output_file_outdated(&path, MIGRATION_RS).unwrap() {
regen = true;
}
if path
.file_name()
.unwrap()
.to_str()
.unwrap()
.ends_with("undo.sql")
{
undo_paths.push(path);
} else {
paths.push(path);
}
}
if regen {
paths.sort();
undo_paths.sort();
let mut migr_rs = OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(MIGRATION_RS)
.unwrap();
migr_rs
.write_all(b"\n//(user_version, redo sql, undo sql\n&[")
.unwrap();
for (p, u) in paths.iter().zip(undo_paths.iter()) {
// This should be a number string, zero-padded to three digits,
// e.g. 001 (i.e. matching \d{3}).
let num = p.file_stem().unwrap().to_str().unwrap();
if !u.file_name().unwrap().to_str().unwrap().starts_with(num) {
panic!("Undo file {u:?} should match with {p:?}");
}
if num.parse::<u32>().is_err() {
panic!("Migration file {p:?} should start with a number");
}
migr_rs.write_all(b"(").unwrap();
migr_rs
.write_all(num.trim_start_matches('0').as_bytes())
.unwrap();
migr_rs.write_all(b",\"").unwrap();
migr_rs
.write_all(std::fs::read_to_string(p).unwrap().as_bytes())
.unwrap();
migr_rs.write_all(b"\",\"").unwrap();
migr_rs
.write_all(std::fs::read_to_string(u).unwrap().as_bytes())
.unwrap();
migr_rs.write_all(b"\"),").unwrap();
}
migr_rs.write_all(b"]").unwrap();
migr_rs.flush().unwrap();
}
let mut file = std::fs::File::create("./src/schema.sql").unwrap();
file.write_all(&output.stdout).unwrap();
}

View File

@ -0,0 +1,4 @@
-- Migration 001 (redo): rename the plural `templates` table to the singular
-- `template`, matching the naming of the other tables in the schema.
PRAGMA foreign_keys=ON;
BEGIN;
ALTER TABLE templates RENAME TO template;
COMMIT;

View File

@ -0,0 +1,4 @@
-- Migration 001 (undo): revert the redo script, restoring the original
-- plural `templates` table name.
PRAGMA foreign_keys=ON;
BEGIN;
ALTER TABLE template RENAME TO templates;
COMMIT;

View File

@ -91,7 +91,7 @@ fn user_authorizer_callback(
table_name: "post" | "queue" | "candidate_subscription" | "subscription" | "account",
}
| AuthAction::Update {
table_name: "candidate_subscription" | "templates",
table_name: "candidate_subscription" | "template",
column_name: "accepted" | "last_modified" | "verified" | "address",
}
| AuthAction::Update {
@ -129,6 +129,10 @@ impl Connection {
/// ```
pub const SCHEMA: &str = include_str!("./schema.sql");
/// Database migrations.
pub const MIGRATIONS: &'static [(u32, &'static str, &'static str)] =
include!("./migrations.rs.inc");
/// Creates a new database connection.
///
/// `Connection` supports a limited subset of operations by default (see
@ -159,11 +163,68 @@ impl Connection {
conn.set_db_config(DbConfig::SQLITE_DBCONFIG_TRUSTED_SCHEMA, false)?;
conn.busy_timeout(core::time::Duration::from_millis(500))?;
conn.busy_handler(Some(|times: i32| -> bool { times < 5 }))?;
conn.authorizer(Some(user_authorizer_callback));
Ok(Self {
let mut ret = Self {
conf,
connection: conn,
})
};
if let Some(&(latest, _, _)) = Self::MIGRATIONS.last() {
let version = ret.schema_version()?;
trace!(
"SQLITE user_version PRAGMA returned {version}. Most recent migration is {latest}."
);
if version < latest {
info!("Updating database schema from version {version} to {latest}...");
}
ret.migrate(version, latest)?;
}
ret.connection.authorizer(Some(user_authorizer_callback));
Ok(ret)
}
/// The version of the current schema.
///
/// Reads SQLite's `user_version` PRAGMA, which this crate uses to track
/// which migrations have been applied.
pub fn schema_version(&self) -> Result<u32> {
    let version: u32 = self
        .connection
        .prepare("SELECT user_version FROM pragma_user_version;")?
        .query_row([], |row| row.get(0))?;
    Ok(version)
}
/// Migrate from version `from` to `to`.
///
/// Runs each migration's redo script when upgrading (`from < to`) or its
/// undo script when downgrading (`from > to`), all inside one transaction,
/// then records the final version in the `user_version` PRAGMA.
///
/// Migration `i` (1-based version number, stored at `MIGRATIONS[i - 1]`)
/// moves the schema from version `i - 1` to `i`; its undo script moves it
/// back from `i` to `i - 1`.
///
/// See [Self::MIGRATIONS].
pub fn migrate(&mut self, mut from: u32, to: u32) -> Result<()> {
    if from == to {
        return Ok(());
    }
    let undo = from > to;
    let tx = self.connection.transaction()?;
    while from != to {
        log::trace!(
            "exec migration from {from} to {to}, type: {}do",
            if undo { "un " } else { "re" }
        );
        // NOTE(review): `execute` runs a single SQL statement, but the
        // migration scripts contain several (including their own
        // BEGIN/COMMIT, which would also conflict with the transaction
        // opened above) — presumably these should be run with
        // `execute_batch` on scripts stripped of transaction statements;
        // confirm against the migration files.
        if undo {
            // Undoing migration `from` (stored at index `from - 1`) takes
            // the schema from version `from` down to `from - 1`. Indexing
            // with `from` would pick the NEXT migration's undo script and
            // go out of bounds when undoing the newest migration.
            trace!("{}", Self::MIGRATIONS[from as usize - 1].2);
            tx.execute(Self::MIGRATIONS[from as usize - 1].2, [])?;
            from -= 1;
        } else {
            // Redoing migration `from + 1` (stored at index `from`) takes
            // the schema from version `from` up to `from + 1`.
            trace!("{}", Self::MIGRATIONS[from as usize].1);
            tx.execute(Self::MIGRATIONS[from as usize].1, [])?;
            from += 1;
        }
    }
    // Record the version we ended at. Using `to` directly (instead of
    // `MIGRATIONS[to as usize - 1].0`) is equivalent for densely numbered
    // migrations and avoids an index underflow when downgrading to 0.
    tx.pragma_update(None, "user_version", to)?;
    tx.commit()?;
    Ok(())
}
/// Removes operational limits from this connection. (see
@ -211,8 +272,22 @@ impl Connection {
let mut stdin = child.stdin.take().unwrap();
std::thread::spawn(move || {
stdin
.write_all(include_bytes!("./schema.sql"))
.write_all(Self::SCHEMA.as_bytes())
.expect("failed to write to stdin");
if !Self::MIGRATIONS.is_empty() {
stdin
.write_all(b"\nPRAGMA user_version = ")
.expect("failed to write to stdin");
stdin
.write_all(
Self::MIGRATIONS[Self::MIGRATIONS.len() - 1]
.0
.to_string()
.as_bytes(),
)
.expect("failed to write to stdin");
stdin.write_all(b";").expect("failed to write to stdin");
}
stdin.flush().expect("could not flush stdin");
});
let output = child.wait_with_output()?;

View File

@ -17,8 +17,9 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
//! Types for processing new posts: [`PostFilter`](message_filters::PostFilter),
//! [`ListContext`], [`MailJob`] and [`PostAction`].
//! Types for processing new posts:
//! [`PostFilter`](crate::message_filters::PostFilter), [`ListContext`],
//! [`MailJob`] and [`PostAction`].
use log::trace;
use melib::Address;
@ -28,7 +29,7 @@ use crate::{
DbVal,
};
/// Post action returned from a list's
/// [`PostFilter`](message_filters::PostFilter) stack.
/// [`PostFilter`](crate::message_filters::PostFilter) stack.
#[derive(Debug)]
pub enum PostAction {
/// Add to `hold` queue.
@ -47,8 +48,8 @@ pub enum PostAction {
},
}
/// List context passed to a list's [`PostFilter`](message_filters::PostFilter)
/// stack.
/// List context passed to a list's
/// [`PostFilter`](crate::message_filters::PostFilter) stack.
#[derive(Debug)]
pub struct ListContext<'list> {
/// Which mailing list a post was addressed to.
@ -62,12 +63,12 @@ pub struct ListContext<'list> {
/// The mailing list subscription policy.
pub subscription_policy: Option<DbVal<SubscriptionPolicy>>,
/// The scheduled jobs added by each filter in a list's
/// [`PostFilter`](message_filters::PostFilter) stack.
/// [`PostFilter`](crate::message_filters::PostFilter) stack.
pub scheduled_jobs: Vec<MailJob>,
}
/// Post to be considered by the list's
/// [`PostFilter`](message_filters::PostFilter) stack.
/// [`PostFilter`](crate::message_filters::PostFilter) stack.
pub struct PostEntry {
/// `From` address of post.
pub from: Address,
@ -76,7 +77,7 @@ pub struct PostEntry {
/// `To` addresses of post.
pub to: Vec<Address>,
/// Final action set by each filter in a list's
/// [`PostFilter`](message_filters::PostFilter) stack.
/// [`PostFilter`](crate::message_filters::PostFilter) stack.
pub action: PostAction,
}
@ -92,7 +93,7 @@ impl core::fmt::Debug for PostEntry {
}
/// Scheduled jobs added to a [`ListContext`] by a list's
/// [`PostFilter`](message_filters::PostFilter) stack.
/// [`PostFilter`](crate::message_filters::PostFilter) stack.
#[derive(Debug)]
pub enum MailJob {
/// Send post to recipients.

View File

@ -0,0 +1,11 @@
//(user_version, redo sql, undo sql
&[(1,"PRAGMA foreign_keys=ON;
BEGIN;
ALTER TABLE templates RENAME TO template;
COMMIT;
","PRAGMA foreign_keys=ON;
BEGIN;
ALTER TABLE template RENAME TO templates;
COMMIT;
"),]

View File

@ -256,7 +256,7 @@ CREATE TABLE IF NOT EXISTS post (
created INTEGER NOT NULL DEFAULT (unixepoch())
);
CREATE TABLE IF NOT EXISTS templates (
CREATE TABLE IF NOT EXISTS template (
pk INTEGER PRIMARY KEY NOT NULL,
name TEXT NOT NULL,
list INTEGER,
@ -457,13 +457,13 @@ BEGIN
WHERE pk = NEW.pk;
END;
-- [tag:last_modified_templates]: update last_modified on every change.
-- [tag:last_modified_template]: update last_modified on every change.
CREATE TRIGGER
IF NOT EXISTS last_modified_templates
AFTER UPDATE ON templates
IF NOT EXISTS last_modified_template
AFTER UPDATE ON template
FOR EACH ROW
WHEN NEW.last_modified != OLD.last_modified
BEGIN
UPDATE templates SET last_modified = unixepoch()
UPDATE template SET last_modified = unixepoch()
WHERE pk = NEW.pk;
END;

View File

@ -158,7 +158,7 @@ CREATE TABLE IF NOT EXISTS post (
created INTEGER NOT NULL DEFAULT (unixepoch())
);
CREATE TABLE IF NOT EXISTS templates (
CREATE TABLE IF NOT EXISTS template (
pk INTEGER PRIMARY KEY NOT NULL,
name TEXT NOT NULL,
list INTEGER,
@ -288,4 +288,4 @@ update_last_modified(`subscription_policy')
update_last_modified(`subscription')
update_last_modified(`account')
update_last_modified(`candidate_subscription')
update_last_modified(`templates')
update_last_modified(`template')

View File

@ -216,7 +216,7 @@ impl Connection {
pub fn fetch_templates(&self) -> Result<Vec<DbVal<Template>>> {
let mut stmt = self
.connection
.prepare("SELECT * FROM templates ORDER BY pk;")?;
.prepare("SELECT * FROM template ORDER BY pk;")?;
let iter = stmt.query_map(rusqlite::params![], |row| {
let pk = row.get("pk")?;
Ok(DbVal(
@ -248,7 +248,7 @@ impl Connection {
) -> Result<Option<DbVal<Template>>> {
let mut stmt = self
.connection
.prepare("SELECT * FROM templates WHERE name = ? AND list IS ?;")?;
.prepare("SELECT * FROM template WHERE name = ? AND list IS ?;")?;
let ret = stmt
.query_row(rusqlite::params![&template, &list_pk], |row| {
let pk = row.get("pk")?;
@ -268,7 +268,7 @@ impl Connection {
if ret.is_none() && list_pk.is_some() {
let mut stmt = self
.connection
.prepare("SELECT * FROM templates WHERE name = ? AND list IS NULL;")?;
.prepare("SELECT * FROM template WHERE name = ? AND list IS NULL;")?;
Ok(stmt
.query_row(rusqlite::params![&template], |row| {
let pk = row.get("pk")?;
@ -293,7 +293,7 @@ impl Connection {
/// Insert a named template.
pub fn add_template(&self, template: Template) -> Result<DbVal<Template>> {
let mut stmt = self.connection.prepare(
"INSERT INTO templates(name, list, subject, headers_json, body) VALUES(?, ?, ?, ?, ?) \
"INSERT INTO template(name, list, subject, headers_json, body) VALUES(?, ?, ?, ?, ?) \
RETURNING *;",
)?;
let ret = stmt
@ -345,7 +345,7 @@ impl Connection {
pub fn remove_template(&self, template: &str, list_pk: Option<i64>) -> Result<Template> {
let mut stmt = self
.connection
.prepare("DELETE FROM templates WHERE name = ? AND list IS ? RETURNING *;")?;
.prepare("DELETE FROM template WHERE name = ? AND list IS ? RETURNING *;")?;
let ret = stmt.query_row(rusqlite::params![&template, &list_pk], |row| {
Ok(Template {
pk: -1,

View File

@ -0,0 +1,51 @@
/*
* This file is part of mailpot
*
* Copyright 2020 - Manos Pitsidianakis
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use mailpot::{Configuration, Connection, SendMail};
use mailpot_tests::init_stderr_logging;
use tempfile::TempDir;
#[test]
fn test_init_empty() {
    init_stderr_logging();

    let tmp_dir = TempDir::new().unwrap();

    let db_path = tmp_dir.path().join("mpot.db");
    let config = Configuration {
        send_mail: SendMail::ShellCommand("/usr/bin/false".to_string()),
        db_path,
        data_path: tmp_dir.path().to_path_buf(),
        administrators: vec![],
    };

    let mut db = Connection::open_or_create_db(config).unwrap().trusted();
    let migrations = Connection::MIGRATIONS;
    if migrations.is_empty() {
        return;
    }
    // A freshly created database must already be at the newest migration
    // version (open_or_create_db applies pending migrations).
    let version = db.schema_version().unwrap();
    assert_eq!(version, migrations[migrations.len() - 1].0);

    // Round-trip: downgrade to the first migration's version, then back up.
    db.migrate(version, migrations[0].0).unwrap();
    db.migrate(migrations[0].0, version).unwrap();
    // The stored schema version must reflect the final migration target.
    assert_eq!(db.schema_version().unwrap(), version);
}