Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

psql/migrations: when no-transaction is set, split multi-statement sql into individual statements and execute them separately #3574

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions sqlx-postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ whoami = { version = "1.2.1", default-features = false }

serde = { version = "1.0.144", features = ["derive"] }
serde_json = { version = "1.0.85", features = ["raw_value"] }
pest = "2.7.14"
pest_derive = "2.7.14"

[dependencies.sqlx-core]
workspace = true
Expand Down
1 change: 1 addition & 0 deletions sqlx-postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod message;
mod options;
mod query_result;
mod row;
mod split_sql;
mod statement;
mod transaction;
mod type_checking;
Expand Down
39 changes: 29 additions & 10 deletions sqlx-postgres/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::executor::Executor;
use crate::query::query;
use crate::query_as::query_as;
use crate::query_scalar::query_scalar;
use crate::split_sql::split_sql;
use crate::{PgConnectOptions, PgConnection, Postgres};

fn parse_for_maintenance(url: &str) -> Result<(PgConnectOptions, String), Error> {
Expand Down Expand Up @@ -209,7 +210,6 @@ CREATE TABLE IF NOT EXISTS _sqlx_migrations (
) -> BoxFuture<'m, Result<Duration, MigrateError>> {
Box::pin(async move {
let start = Instant::now();

// execute migration queries
if migration.no_tx {
execute_migration(self, migration).await?;
Expand Down Expand Up @@ -276,10 +276,20 @@ async fn execute_migration(
conn: &mut PgConnection,
migration: &Migration,
) -> Result<(), MigrateError> {
let _ = conn
.execute(&*migration.sql)
.await
.map_err(|e| MigrateError::ExecuteMigration(e, migration.version))?;
if migration.no_tx {
let statements = split_sql(&*migration.sql);
for sql in statements.iter() {
let _ = conn
.execute(sql.as_str())
.await
.map_err(|e| MigrateError::ExecuteMigration(e, migration.version))?;
}
} else {
let _ = conn
.execute(&*migration.sql)
.await
.map_err(|e| MigrateError::ExecuteMigration(e, migration.version))?;
}

// language=SQL
let _ = query(
Expand All @@ -301,11 +311,20 @@ async fn revert_migration(
conn: &mut PgConnection,
migration: &Migration,
) -> Result<(), MigrateError> {
let _ = conn
.execute(&*migration.sql)
.await
.map_err(|e| MigrateError::ExecuteMigration(e, migration.version))?;

if migration.no_tx {
let statements = split_sql(&*migration.sql);
for sql in statements.iter() {
let _ = conn
.execute(sql.as_str())
.await
.map_err(|e| MigrateError::ExecuteMigration(e, migration.version))?;
}
} else {
let _ = conn
.execute(&*migration.sql)
.await
.map_err(|e| MigrateError::ExecuteMigration(e, migration.version))?;
}
// language=SQL
let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = $1"#)
.bind(migration.version)
Expand Down
130 changes: 130 additions & 0 deletions sqlx-postgres/src/split_sql.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use pest::Parser;
use pest_derive::Parser;

/// Pest parser used by `split_sql` to cut a PostgreSQL script into statements.
///
/// The grammar does not validate SQL. It only tracks the constructs that may
/// legally contain a `;` (single-quoted strings, double-quoted identifiers,
/// dollar-quoted strings and comments) so that statement terminators can be
/// told apart from semicolons embedded in those constructs.
///
/// `WHITESPACE` and `COMMENT` are pest's special implicit rules: in every
/// non-atomic rule they are skipped between sequence elements and repetitions.
/// They are deliberately *not* silent here, so they show up as pairs and their
/// text can be re-attached to the neighbouring statement.
#[derive(Parser)]
#[grammar_inline = r#"
// The top-level rule matches the entire SQL input
sql = { SOI ~ statement* ~ EOI }

// A statement consists of optional leading comments and whitespace, content, and is terminated by a semicolon or end of input
statement = { (WHITESPACE | COMMENT)* ~ statement_content ~ (semicolon | &EOI) }

// Statement content is a sequence of constructs, comments, whitespace, or non-construct characters
statement_content = { (construct | COMMENT | WHITESPACE | non_construct_char)+ }

// Constructs that may contain semicolons internally
construct = { DOLLAR_QUOTED_STRING | SINGLE_QUOTED_STRING | DOUBLE_QUOTED_IDENTIFIER }

// Non-construct characters are any characters except semicolons
non_construct_char = { !semicolon ~ ANY }

// Semicolon outside constructs acts as a statement terminator
semicolon = { ";" }

// The quoted constructs below are atomic (`@`): inside them every character is
// literal, so the implicit WHITESPACE/COMMENT skipping must not run. Without
// this, a `--` or `/*` inside a string literal would be consumed as a comment
// and could swallow the closing quote. Atomicity cascades to the rules they call.

// Single-quoted string literals, handling escaped quotes
// NOTE(review): backslash escapes of E'...' strings are not recognised.
SINGLE_QUOTED_STRING = @{ "'" ~ SINGLE_QUOTED_CONTENT ~ ("'" | EOI) }
SINGLE_QUOTED_CONTENT = { ( "''" | !("'" | EOI) ~ ANY )* }

// Double-quoted identifiers, handling escaped quotes
DOUBLE_QUOTED_IDENTIFIER = @{ "\"" ~ DOUBLE_QUOTED_IDENTIFIER_CONTENT ~ ("\"" | EOI) }
DOUBLE_QUOTED_IDENTIFIER_CONTENT = { ( "\"\"" | !("\"" | EOI) ~ ANY )* }

// Dollar-quoted strings, handling custom tags
// NOTE(review): the closing delimiter accepts any tag, it is not required to
// equal the opening tag, so `$a$ ... $b$` is treated as terminated.
DOLLAR_QUOTED_STRING = @{ DOLLAR_QUOTE_START ~ DOLLAR_QUOTED_CONTENT ~ DOLLAR_QUOTE_END }
DOLLAR_QUOTE_START = { "$" ~ DOLLAR_QUOTE_TAG ~ "$" }
DOLLAR_QUOTE_TAG = { ASCII_ALPHANUMERIC* }
DOLLAR_QUOTED_CONTENT = { ( !DOLLAR_QUOTE_END ~ ANY )* }
DOLLAR_QUOTE_END = { "$" ~ DOLLAR_QUOTE_TAG ~ "$" }

// Comments (single-line and multi-line)
COMMENT = { SINGLE_LINE_COMMENT | MULTI_LINE_COMMENT }
SINGLE_LINE_COMMENT = { "--" ~ (!NEWLINE ~ ANY)* ~ NEWLINE? }

// Multi-line comments may nest; an unterminated comment runs to end of input
MULTI_LINE_COMMENT = { "/*" ~ MULTI_LINE_COMMENT_CONTENT* ~ ( "*/" | EOI ) }
MULTI_LINE_COMMENT_CONTENT = { MULTI_LINE_COMMENT | (!"/*" ~ !"*/" ~ ANY) }

// Whitespace rules
WHITESPACE = { " " | "\t" | NEWLINE }
NEWLINE = { "\r\n" | "\n" | "\r" }
"#]
struct PsqlSpliter;

/// Splits a PostgreSQL script into its individual statements.
///
/// The input is parsed with the `PsqlSpliter` grammar, which recognises the
/// PostgreSQL constructs that may contain a `;` without terminating a statement:
///
/// - **Dollar-quoted strings**: with or without a tag (e.g. `$$`, `$tag$`).
/// - **Single-quoted strings and double-quoted identifiers**: including doubled
///   quotes (`''`, `""`) used as escapes.
/// - **Comments**: single-line (`--`) and multi-line (`/* ... */`).
///
/// Each returned statement keeps its terminating semicolon as well as the
/// whitespace and comments that precede it. Text left over after the last
/// statement is returned as a final element unless it is whitespace only.
///
/// The SQL itself is not validated; syntactically invalid statements are split on
/// their semicolons like any other text.
///
/// # Returns
///
/// One `String` per statement, in source order. If the grammar cannot parse the
/// input, the whole input is returned unchanged as a single element. An input
/// consisting only of whitespace yields an empty vector.
///
/// # Examples
///
/// This function lives in a private module, so the example is not compiled as a
/// doctest.
///
/// ```ignore
/// let sql = r#"
/// -- First query
/// INSERT INTO users (name) VALUES ('Alice; Bob');
///
/// -- Second query
/// SELECT * FROM posts;
///
/// /* Multi-line
/// comment */
/// CREATE FUNCTION test_function()
/// RETURNS VOID AS $$
/// BEGIN
/// -- Multiple statements inside the function
/// INSERT INTO table_a VALUES (1);
/// INSERT INTO table_b VALUES (2);
/// END;
/// $$ LANGUAGE plpgsql;
///
/// -- invalid sql
/// SELECT 'This is an unterminated string FROM users;
/// SELECT * FROM users WHERE name = AND email = 'user@example.com';
/// SELECT * FROM users JOIN other_table ON;
///
/// "#;
///
/// let statements = split_sql(sql);
/// assert_eq!(statements.len(), 4);
/// assert!(statements[0].contains("INSERT INTO users"));
/// assert!(statements[1].contains("SELECT * FROM posts"));
/// assert!(statements[2].contains("CREATE FUNCTION"));
/// assert!(statements[2].contains("plpgsql"));
/// assert!(statements[3].contains("other_table"));
/// ```
pub fn split_sql<S: AsRef<str>>(sql: S) -> Vec<String> {
    let sql_str = sql.as_ref();

    // A parse error, or a parse tree without the top-level `sql` pair (which should
    // never happen), means we cannot split safely: hand the input back untouched.
    let root = match PsqlSpliter::parse(Rule::sql, sql_str)
        .ok()
        .and_then(|mut pairs| pairs.next())
    {
        Some(root) => root,
        None => return vec![sql_str.to_string()],
    };

    let mut statements = Vec::new();
    // Accumulates whitespace/comments found between statements so they travel with
    // the statement that follows them.
    let mut pending = String::new();

    for pair in root.into_inner() {
        let rule = pair.as_rule();
        pending.push_str(pair.as_str());

        match rule {
            // Keep buffering until the next statement (or end of input) arrives.
            Rule::WHITESPACE | Rule::COMMENT => {}
            Rule::statement | Rule::EOI => {
                // Drop a trailing run that is empty or whitespace only.
                if !pending.chars().all(char::is_whitespace) {
                    statements.push(std::mem::take(&mut pending));
                }
            }
            // The `sql` rule can only contain the pairs handled above.
            _ => unreachable!(),
        }
    }

    statements
}
9 changes: 8 additions & 1 deletion tests/postgres/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,21 @@ async fn no_tx(mut conn: PoolConnection<Postgres>) -> anyhow::Result<()> {
// run migration
migrator.run(&mut conn).await?;

// check outcome
// check outcomes
let res: String = conn
.fetch_one("SELECT datname FROM pg_database WHERE datname = 'test_db'")
.await?
.get(0);

assert_eq!(res, "test_db");

let res: String = conn
.fetch_one("SELECT email FROM users WHERE username = 'test_user'")
.await?
.get(0);

assert_eq!(res, "[email protected]");

Ok(())
}

Expand Down
11 changes: 11 additions & 0 deletions tests/postgres/migrations_no_tx/0_create_db.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
-- no-transaction

-- Fixture for the `no_tx` migration test: several statements in one
-- no-transaction migration, so each one has to be sent to the server separately.

-- The test looks this database up in pg_database afterwards.
CREATE DATABASE test_db;

-- Table queried by the test after the migration has run.
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- PostgreSQL does not allow CREATE INDEX CONCURRENTLY inside a transaction block.
CREATE INDEX CONCURRENTLY idx_users_email ON users(email);

-- Row whose email the test asserts on.
INSERT INTO users (username, email) VALUES ('test_user', '[email protected]');