From 0cd7843d8fd6f92acf879e0ef02bb7798474955b Mon Sep 17 00:00:00 2001 From: Joey de Waal Date: Wed, 25 Sep 2024 23:58:22 +0200 Subject: [PATCH 1/7] Remove unnecessary BoxFutures --- sqlx-core/src/any/connection/mod.rs | 24 +++--- sqlx-core/src/any/options.rs | 5 +- sqlx-core/src/any/transaction.rs | 14 ++- sqlx-core/src/connection.rs | 28 +++--- sqlx-core/src/transaction.rs | 8 +- sqlx-mysql/src/connection/mod.rs | 63 ++++++-------- sqlx-mysql/src/options/connect.rs | 123 +++++++++++++-------------- sqlx-mysql/src/transaction.rs | 46 +++++----- sqlx-postgres/src/any.rs | 16 ++-- sqlx-postgres/src/connection/mod.rs | 70 +++++++-------- sqlx-postgres/src/copy.rs | 18 ++-- sqlx-postgres/src/options/connect.rs | 5 +- sqlx-postgres/src/transaction.rs | 54 +++++------- sqlx-sqlite/src/connection/mod.rs | 59 ++++++------- sqlx-sqlite/src/options/connect.rs | 22 +++-- sqlx-sqlite/src/transaction.rs | 14 ++- 16 files changed, 259 insertions(+), 310 deletions(-) diff --git a/sqlx-core/src/any/connection/mod.rs b/sqlx-core/src/any/connection/mod.rs index b6f795848a..7d19c6f81f 100644 --- a/sqlx-core/src/any/connection/mod.rs +++ b/sqlx-core/src/any/connection/mod.rs @@ -71,31 +71,31 @@ impl Connection for AnyConnection { type Options = AnyConnectOptions; - fn close(self) -> BoxFuture<'static, Result<(), Error>> { - self.backend.close() + async fn close(self) -> Result<(), Error> { + self.backend.close().await } - fn close_hard(self) -> BoxFuture<'static, Result<(), Error>> { - self.backend.close() + async fn close_hard(self) -> Result<(), Error> { + self.backend.close().await } - fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> { - self.backend.ping() + async fn ping(&mut self) -> Result<(), Error> { + self.backend.ping().await } - fn begin(&mut self) -> BoxFuture<'_, Result, Error>> + async fn begin(&mut self) -> Result, Error> where Self: Sized, { - Transaction::begin(self) + Transaction::begin(self).await } fn cached_statements_size(&self) -> usize { self.backend.cached_statements_size() } - fn clear_cached_statements(&mut self) -> BoxFuture<'_, crate::Result<()>> { - self.backend.clear_cached_statements() + async fn clear_cached_statements(&mut self) -> crate::Result<()> { + self.backend.clear_cached_statements().await } fn shrink_buffers(&mut self) { @@ -103,8 +103,8 @@ impl Connection for AnyConnection { } #[doc(hidden)] - fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> { - self.backend.flush() + async fn flush(&mut self) -> Result<(), Error> { + self.backend.flush().await } #[doc(hidden)] diff --git a/sqlx-core/src/any/options.rs b/sqlx-core/src/any/options.rs index bb29d817c9..cf171b95de 100644 --- a/sqlx-core/src/any/options.rs +++ b/sqlx-core/src/any/options.rs @@ -1,7 +1,6 @@ use crate::any::AnyConnection; use crate::connection::{ConnectOptions, LogSettings}; use crate::error::Error; -use futures_core::future::BoxFuture; use log::LevelFilter; use std::str::FromStr; use std::time::Duration; @@ -48,8 +47,8 @@ impl ConnectOptions for AnyConnectOptions { } #[inline] - fn connect(&self) -> BoxFuture<'_, Result> { - AnyConnection::connect(self) + async fn connect(&self) -> Result { + AnyConnection::connect(self).await } fn log_statements(mut self, level: LevelFilter) -> Self { diff --git a/sqlx-core/src/any/transaction.rs b/sqlx-core/src/any/transaction.rs index fce4175626..c20d5ed7a1 100644 --- a/sqlx-core/src/any/transaction.rs +++ b/sqlx-core/src/any/transaction.rs @@ -1,5 +1,3 @@ -use futures_util::future::BoxFuture; - use crate::any::{Any, AnyConnection}; use crate::error::Error; use crate::transaction::TransactionManager; @@ -9,16 +7,16 @@ pub struct AnyTransactionManager; impl TransactionManager for AnyTransactionManager { type Database = Any; - fn begin(conn: &mut AnyConnection) -> BoxFuture<'_, Result<(), Error>> { - conn.backend.begin() + async fn begin(conn: &mut AnyConnection) -> Result<(), Error> { + conn.backend.begin().await } - fn commit(conn: &mut AnyConnection) -> BoxFuture<'_, Result<(), Error>> { - conn.backend.commit() + async fn commit(conn: &mut AnyConnection) -> Result<(), Error> { + conn.backend.commit().await } - fn rollback(conn: &mut AnyConnection) -> BoxFuture<'_, Result<(), Error>> { - conn.backend.rollback() + async fn rollback(conn: &mut AnyConnection) -> Result<(), Error> { + conn.backend.rollback().await } fn start_rollback(conn: &mut AnyConnection) { diff --git a/sqlx-core/src/connection.rs b/sqlx-core/src/connection.rs index ce2aa6c629..8d14111970 100644 --- a/sqlx-core/src/connection.rs +++ b/sqlx-core/src/connection.rs @@ -1,4 +1,5 @@ use crate::database::{Database, HasStatementCache}; +use std::future::Future; use crate::error::Error; use crate::transaction::Transaction; @@ -31,21 +32,21 @@ pub trait Connection: Send { /// /// Therefore it is recommended to call `.close()` on a connection when you are done using it /// and to `.await` the result to ensure the termination message is sent. - fn close(self) -> BoxFuture<'static, Result<(), Error>>; + fn close(self) -> impl Future> + Send + 'static; /// Immediately close the connection without sending a graceful shutdown. /// /// This should still at least send a TCP `FIN` frame to let the server know we're dying. #[doc(hidden)] - fn close_hard(self) -> BoxFuture<'static, Result<(), Error>>; + fn close_hard(self) -> impl Future> + Send + 'static; /// Checks if a connection to the database is still valid. - fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>>; + fn ping(&mut self) -> impl Future> + Send + '_; /// Begin a new transaction or establish a savepoint within the active transaction. /// /// Returns a [`Transaction`] for controlling and tracking the new transaction. - fn begin(&mut self) -> BoxFuture<'_, Result, Error>> + fn begin(&mut self) -> impl Future, Error>> + Send + '_ where Self: Sized; @@ -66,7 +67,7 @@ pub trait Connection: Send { /// })).await /// # } /// ``` - fn transaction<'a, F, R, E>(&'a mut self, callback: F) -> BoxFuture<'a, Result> + fn transaction<'a, F, R, E>(&'a mut self, callback: F) -> impl Future> + Send + 'a where for<'c> F: FnOnce(&'c mut Transaction<'_, Self::Database>) -> BoxFuture<'c, Result> + 'a @@ -105,11 +106,11 @@ pub trait Connection: Send { /// Removes all statements from the cache, closing them on the server if /// needed. - fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> + fn clear_cached_statements(&mut self) -> impl Future> + Send + '_ where Self::Database: HasStatementCache, { - Box::pin(async move { Ok(()) }) + async { Ok(()) } } /// Restore any buffers in the connection to their default capacity, if possible. @@ -127,7 +128,7 @@ pub trait Connection: Send { fn shrink_buffers(&mut self); #[doc(hidden)] - fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>>; + fn flush(&mut self) -> impl Future> + Send + '_; #[doc(hidden)] fn should_flush(&self) -> bool; @@ -136,7 +137,7 @@ pub trait Connection: Send { /// /// A value of [`Options`][Self::Options] is parsed from the provided connection string. This parsing /// is database-specific. - #[inline] + #[inline] fn connect(url: &str) -> BoxFuture<'static, Result> where Self: Sized, @@ -146,12 +147,13 @@ pub trait Connection: Send { Box::pin(async move { Self::connect_with(&options?).await }) } + /// Establish a new database connection with the provided options. - fn connect_with(options: &Self::Options) -> BoxFuture<'_, Result> + fn connect_with<'a>(options: &'a Self::Options) -> BoxFuture<'_, Result> where - Self: Sized, + Self: Sized + 'a, { - options.connect() + Box::pin(options.connect()) } } @@ -219,7 +221,7 @@ pub trait ConnectOptions: 'static + Send + Sync + FromStr + Debug + } /// Establish a new database connection with the options specified by `self`. - fn connect(&self) -> BoxFuture<'_, Result> + fn connect(&self) -> impl Future> + Send + '_ where Self::Connection: Sized; diff --git a/sqlx-core/src/transaction.rs b/sqlx-core/src/transaction.rs index 9cd38aab3a..67ee3a41f4 100644 --- a/sqlx-core/src/transaction.rs +++ b/sqlx-core/src/transaction.rs @@ -1,4 +1,5 @@ use std::borrow::Cow; +use std::future::Future; use std::fmt::{self, Debug, Formatter}; use std::ops::{Deref, DerefMut}; @@ -18,17 +19,18 @@ pub trait TransactionManager { /// Begin a new transaction or establish a savepoint within the active transaction. fn begin( conn: &mut ::Connection, - ) -> BoxFuture<'_, Result<(), Error>>; + ) -> impl Future> + Send; /// Commit the active transaction or release the most recent savepoint. fn commit( conn: &mut ::Connection, - ) -> BoxFuture<'_, Result<(), Error>>; + ) -> impl Future> + Send; /// Abort the active transaction or restore from the most recent savepoint. fn rollback( conn: &mut ::Connection, - ) -> BoxFuture<'_, Result<(), Error>>; + ) -> impl Future> + Send; + /// Starts to abort the active transaction or restore from the most recent snapshot. fn start_rollback(conn: &mut ::Connection); diff --git a/sqlx-mysql/src/connection/mod.rs b/sqlx-mysql/src/connection/mod.rs index c4978a7701..d17b891a97 100644 --- a/sqlx-mysql/src/connection/mod.rs +++ b/sqlx-mysql/src/connection/mod.rs @@ -1,6 +1,5 @@ use std::fmt::{self, Debug, Formatter}; -use futures_core::future::BoxFuture; use futures_util::FutureExt; pub(crate) use sqlx_core::connection::*; pub(crate) use stream::{MySqlStream, Waiting}; @@ -52,54 +51,46 @@ impl Connection for MySqlConnection { type Options = MySqlConnectOptions; - fn close(mut self) -> BoxFuture<'static, Result<(), Error>> { - Box::pin(async move { - self.inner.stream.send_packet(Quit).await?; - self.inner.stream.shutdown().await?; + async fn close(mut self) -> Result<(), Error> { + self.inner.stream.send_packet(Quit).await?; + self.inner.stream.shutdown().await?; - Ok(()) - }) + Ok(()) } - fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> { - Box::pin(async move { - self.inner.stream.shutdown().await?; - Ok(()) - }) + async fn close_hard(mut self) -> Result<(), Error> { + self.inner.stream.shutdown().await?; + Ok(()) } - fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async move { - self.inner.stream.wait_until_ready().await?; - self.inner.stream.send_packet(Ping).await?; - self.inner.stream.recv_ok().await?; + async fn ping(&mut self) -> Result<(), Error> { + self.inner.stream.wait_until_ready().await?; + self.inner.stream.send_packet(Ping).await?; + self.inner.stream.recv_ok().await?; - Ok(()) - }) + Ok(()) } #[doc(hidden)] - fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> { - self.inner.stream.wait_until_ready().boxed() + async fn flush(&mut self) -> Result<(), Error> { + self.inner.stream.wait_until_ready().await } fn cached_statements_size(&self) -> usize { self.inner.cache_statement.len() } - fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async move { - while let Some((statement_id, _)) = self.inner.cache_statement.remove_lru() { - self.inner - .stream - .send_packet(StmtClose { - statement: statement_id, - }) - .await?; - } - - Ok(()) - }) + async fn clear_cached_statements(&mut self) -> Result<(), Error> { + while let Some((statement_id, _)) = self.inner.cache_statement.remove_lru() { + self.inner + .stream + .send_packet(StmtClose { + statement: statement_id, + }) + .await?; + } + + Ok(()) } #[doc(hidden)] @@ -107,11 +98,11 @@ impl Connection for MySqlConnection { !self.inner.stream.write_buffer().is_empty() } - fn begin(&mut self) -> BoxFuture<'_, Result, Error>> + async fn begin(&mut self) -> Result, Error> where Self: Sized, { - Transaction::begin(self) + Transaction::begin(self).await } fn shrink_buffers(&mut self) { diff --git a/sqlx-mysql/src/options/connect.rs b/sqlx-mysql/src/options/connect.rs index 116a49ccad..58bf21ac2f 100644 --- a/sqlx-mysql/src/options/connect.rs +++ b/sqlx-mysql/src/options/connect.rs @@ -2,7 +2,6 @@ use crate::connection::ConnectOptions; use crate::error::Error; use crate::executor::Executor; use crate::{MySqlConnectOptions, MySqlConnection}; -use futures_core::future::BoxFuture; use log::LevelFilter; use sqlx_core::Url; use std::time::Duration; @@ -18,71 +17,69 @@ impl ConnectOptions for MySqlConnectOptions { self.build_url() } - fn connect(&self) -> BoxFuture<'_, Result> + async fn connect(&self) -> Result where Self::Connection: Sized, { - Box::pin(async move { - let mut conn = MySqlConnection::establish(self).await?; - - // After the connection is established, we initialize by configuring a few - // connection parameters - - // https://mariadb.com/kb/en/sql-mode/ - - // PIPES_AS_CONCAT - Allows using the pipe character (ASCII 124) as string concatenation operator. - // This means that "A" || "B" can be used in place of CONCAT("A", "B"). - - // NO_ENGINE_SUBSTITUTION - If not set, if the available storage engine specified by a CREATE TABLE is - // not available, a warning is given and the default storage - // engine is used instead. - - // NO_ZERO_DATE - Don't allow '0000-00-00'. This is invalid in Rust. - - // NO_ZERO_IN_DATE - Don't allow 'YYYY-00-00'. This is invalid in Rust. - - // -- - - // Setting the time zone allows us to assume that the output - // from a TIMESTAMP field is UTC - - // -- - - // https://mathiasbynens.be/notes/mysql-utf8mb4 - - let mut sql_mode = Vec::new(); - if self.pipes_as_concat { - sql_mode.push(r#"PIPES_AS_CONCAT"#); - } - if self.no_engine_substitution { - sql_mode.push(r#"NO_ENGINE_SUBSTITUTION"#); - } - - let mut options = Vec::new(); - if !sql_mode.is_empty() { - options.push(format!( - r#"sql_mode=(SELECT CONCAT(@@sql_mode, ',{}'))"#, - sql_mode.join(",") - )); - } - if let Some(timezone) = &self.timezone { - options.push(format!(r#"time_zone='{}'"#, timezone)); - } - if self.set_names { - options.push(format!( - r#"NAMES {} COLLATE {}"#, - conn.inner.stream.charset.as_str(), - conn.inner.stream.collation.as_str() - )) - } - - if !options.is_empty() { - conn.execute(&*format!(r#"SET {};"#, options.join(","))) - .await?; - } - - Ok(conn) - }) + let mut conn = MySqlConnection::establish(self).await?; + + // After the connection is established, we initialize by configuring a few + // connection parameters + + // https://mariadb.com/kb/en/sql-mode/ + + // PIPES_AS_CONCAT - Allows using the pipe character (ASCII 124) as string concatenation operator. + // This means that "A" || "B" can be used in place of CONCAT("A", "B"). + + // NO_ENGINE_SUBSTITUTION - If not set, if the available storage engine specified by a CREATE TABLE is + // not available, a warning is given and the default storage + // engine is used instead. + + // NO_ZERO_DATE - Don't allow '0000-00-00'. This is invalid in Rust. + + // NO_ZERO_IN_DATE - Don't allow 'YYYY-00-00'. This is invalid in Rust. + + // -- + + // Setting the time zone allows us to assume that the output + // from a TIMESTAMP field is UTC + + // -- + + // https://mathiasbynens.be/notes/mysql-utf8mb4 + + let mut sql_mode = Vec::new(); + if self.pipes_as_concat { + sql_mode.push(r#"PIPES_AS_CONCAT"#); + } + if self.no_engine_substitution { + sql_mode.push(r#"NO_ENGINE_SUBSTITUTION"#); + } + + let mut options = Vec::new(); + if !sql_mode.is_empty() { + options.push(format!( + r#"sql_mode=(SELECT CONCAT(@@sql_mode, ',{}'))"#, + sql_mode.join(",") + )); + } + if let Some(timezone) = &self.timezone { + options.push(format!(r#"time_zone='{}'"#, timezone)); + } + if self.set_names { + options.push(format!( + r#"NAMES {} COLLATE {}"#, + conn.inner.stream.charset.as_str(), + conn.inner.stream.collation.as_str() + )) + } + + if !options.is_empty() { + conn.execute(&*format!(r#"SET {};"#, options.join(","))) + .await?; + } + + Ok(conn) } fn log_statements(mut self, level: LevelFilter) -> Self { diff --git a/sqlx-mysql/src/transaction.rs b/sqlx-mysql/src/transaction.rs index d8538cc2b3..c0967dc3ba 100644 --- a/sqlx-mysql/src/transaction.rs +++ b/sqlx-mysql/src/transaction.rs @@ -1,5 +1,3 @@ -use futures_core::future::BoxFuture; - use crate::connection::Waiting; use crate::error::Error; use crate::executor::Executor; @@ -14,41 +12,35 @@ pub struct MySqlTransactionManager; impl TransactionManager for MySqlTransactionManager { type Database = MySql; - fn begin(conn: &mut MySqlConnection) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async move { - let depth = conn.inner.transaction_depth; + async fn begin(conn: &mut MySqlConnection) -> Result<(), Error> { + let depth = conn.inner.transaction_depth; - conn.execute(&*begin_ansi_transaction_sql(depth)).await?; - conn.inner.transaction_depth = depth + 1; + conn.execute(&*begin_ansi_transaction_sql(depth)).await?; + conn.inner.transaction_depth = depth + 1; - Ok(()) - }) + Ok(()) } - fn commit(conn: &mut MySqlConnection) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async move { - let depth = conn.inner.transaction_depth; + async fn commit(conn: &mut MySqlConnection) -> Result<(), Error> { + let depth = conn.inner.transaction_depth; - if depth > 0 { - conn.execute(&*commit_ansi_transaction_sql(depth)).await?; - conn.inner.transaction_depth = depth - 1; - } + if depth > 0 { + conn.execute(&*commit_ansi_transaction_sql(depth)).await?; + conn.inner.transaction_depth = depth - 1; + } - Ok(()) - }) + Ok(()) } - fn rollback(conn: &mut MySqlConnection) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async move { - let depth = conn.inner.transaction_depth; + async fn rollback(conn: &mut MySqlConnection) -> Result<(), Error> { + let depth = conn.inner.transaction_depth; - if depth > 0 { - conn.execute(&*rollback_ansi_transaction_sql(depth)).await?; - conn.inner.transaction_depth = depth - 1; - } + if depth > 0 { + conn.execute(&*rollback_ansi_transaction_sql(depth)).await?; + conn.inner.transaction_depth = depth - 1; + } - Ok(()) - }) + Ok(()) } fn start_rollback(conn: &mut MySqlConnection) { diff --git a/sqlx-postgres/src/any.rs b/sqlx-postgres/src/any.rs index 7eae4bcb73..cf374f3fae 100644 --- a/sqlx-postgres/src/any.rs +++ b/sqlx-postgres/src/any.rs @@ -4,7 +4,7 @@ use crate::{ }; use futures_core::future::BoxFuture; use futures_core::stream::BoxStream; -use futures_util::{stream, StreamExt, TryFutureExt, TryStreamExt}; +use futures_util::{stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use std::future; pub use sqlx_core::any::*; @@ -25,27 +25,27 @@ impl AnyConnectionBackend for PgConnection { } fn close(self: Box) -> BoxFuture<'static, sqlx_core::Result<()>> { - Connection::close(*self) + Connection::close(*self).boxed() } fn close_hard(self: Box) -> BoxFuture<'static, sqlx_core::Result<()>> { - Connection::close_hard(*self) + Connection::close_hard(*self).boxed() } fn ping(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> { - Connection::ping(self) + Connection::ping(self).boxed() } fn begin(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> { - PgTransactionManager::begin(self) + PgTransactionManager::begin(self).boxed() } fn commit(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> { - PgTransactionManager::commit(self) + PgTransactionManager::commit(self).boxed() } fn rollback(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> { - PgTransactionManager::rollback(self) + PgTransactionManager::rollback(self).boxed() } fn start_rollback(&mut self) { @@ -57,7 +57,7 @@ impl AnyConnectionBackend for PgConnection { } fn flush(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> { - Connection::flush(self) + Connection::flush(self).boxed() } fn should_flush(&self) -> bool { diff --git a/sqlx-postgres/src/connection/mod.rs b/sqlx-postgres/src/connection/mod.rs index c139f8e53d..d4237a8c40 100644 --- a/sqlx-postgres/src/connection/mod.rs +++ b/sqlx-postgres/src/connection/mod.rs @@ -2,8 +2,6 @@ use std::fmt::{self, Debug, Formatter}; use std::sync::Arc; use crate::HashMap; -use futures_core::future::BoxFuture; -use futures_util::FutureExt; use crate::common::StatementCache; use crate::error::Error; @@ -140,75 +138,67 @@ impl Connection for PgConnection { type Options = PgConnectOptions; - fn close(mut self) -> BoxFuture<'static, Result<(), Error>> { + async fn close(mut self) -> Result<(), Error> { // The normal, graceful termination procedure is that the frontend sends a Terminate // message and immediately closes the connection. // On receipt of this message, the backend closes the // connection and terminates. - Box::pin(async move { - self.inner.stream.send(Terminate).await?; - self.inner.stream.shutdown().await?; + self.inner.stream.send(Terminate).await?; + self.inner.stream.shutdown().await?; - Ok(()) - }) + Ok(()) } - fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> { - Box::pin(async move { - self.inner.stream.shutdown().await?; + async fn close_hard(mut self) -> Result<(), Error> { + self.inner.stream.shutdown().await?; - Ok(()) - }) + Ok(()) } - fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> { + async fn ping(&mut self) -> Result<(), Error> { // Users were complaining about this showing up in query statistics on the server. // By sending a comment we avoid an error if the connection was in the middle of a rowset // self.execute("/* SQLx ping */").map_ok(|_| ()).boxed() - Box::pin(async move { - // The simplest call-and-response that's possible. - self.write_sync(); - self.wait_until_ready().await - }) + // The simplest call-and-response that's possible. + self.write_sync(); + self.wait_until_ready().await } - fn begin(&mut self) -> BoxFuture<'_, Result, Error>> + async fn begin(&mut self) -> Result, Error> where Self: Sized, { - Transaction::begin(self) + Transaction::begin(self).await } fn cached_statements_size(&self) -> usize { self.inner.cache_statement.len() } - fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async move { - self.inner.cache_type_oid.clear(); + async fn clear_cached_statements(&mut self) -> Result<(), Error> { + self.inner.cache_type_oid.clear(); - let mut cleared = 0_usize; + let mut cleared = 0_usize; - self.wait_until_ready().await?; + self.wait_until_ready().await?; - while let Some((id, _)) = self.inner.cache_statement.remove_lru() { - self.inner.stream.write_msg(Close::Statement(id))?; - cleared += 1; - } + while let Some((id, _)) = self.inner.cache_statement.remove_lru() { + self.inner.stream.write_msg(Close::Statement(id))?; + cleared += 1; + } - if cleared > 0 { - self.write_sync(); - self.inner.stream.flush().await?; + if cleared > 0 { + self.write_sync(); + self.inner.stream.flush().await?; - self.wait_for_close_complete(cleared).await?; - self.recv_ready_for_query().await?; - } + self.wait_for_close_complete(cleared).await?; + self.recv_ready_for_query().await?; + } - Ok(()) - }) + Ok(()) } fn shrink_buffers(&mut self) { @@ -216,8 +206,8 @@ impl Connection for PgConnection { } #[doc(hidden)] - fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> { - self.wait_until_ready().boxed() + async fn flush(&mut self) -> Result<(), Error> { + self.wait_until_ready().await } #[doc(hidden)] diff --git a/sqlx-postgres/src/copy.rs b/sqlx-postgres/src/copy.rs index ddc187e993..26ec2cc12d 100644 --- a/sqlx-postgres/src/copy.rs +++ b/sqlx-postgres/src/copy.rs @@ -1,7 +1,7 @@ use std::borrow::Cow; +use std::future::Future; use std::ops::{Deref, DerefMut}; -use futures_core::future::BoxFuture; use futures_core::stream::BoxStream; use sqlx_core::bytes::{BufMut, Bytes}; @@ -86,7 +86,7 @@ pub trait PgPoolCopyExt { fn copy_in_raw<'a>( &'a self, statement: &'a str, - ) -> BoxFuture<'a, Result>>>; + ) -> impl Future>>> + Send + 'a; /// Issue a `COPY TO STDOUT` statement and begin streaming data /// from Postgres. This is a more efficient way to export data from Postgres but @@ -110,22 +110,22 @@ pub trait PgPoolCopyExt { fn copy_out_raw<'a>( &'a self, statement: &'a str, - ) -> BoxFuture<'a, Result>>>; + ) -> impl Future>>> + Send + 'a; } impl PgPoolCopyExt for Pool { - fn copy_in_raw<'a>( + async fn copy_in_raw<'a>( &'a self, statement: &'a str, - ) -> BoxFuture<'a, Result>>> { - Box::pin(async { PgCopyIn::begin(self.acquire().await?, statement).await }) + ) -> Result>> { + PgCopyIn::begin(self.acquire().await?, statement).await } - fn copy_out_raw<'a>( + async fn copy_out_raw<'a>( &'a self, statement: &'a str, - ) -> BoxFuture<'a, Result>>> { - Box::pin(async { pg_begin_copy_out(self.acquire().await?, statement).await }) + ) -> Result>> { + pg_begin_copy_out(self.acquire().await?, statement).await } } diff --git a/sqlx-postgres/src/options/connect.rs b/sqlx-postgres/src/options/connect.rs index bc6e4adce9..07a9b6b6be 100644 --- a/sqlx-postgres/src/options/connect.rs +++ b/sqlx-postgres/src/options/connect.rs @@ -1,7 +1,6 @@ use crate::connection::ConnectOptions; use crate::error::Error; use crate::{PgConnectOptions, PgConnection}; -use futures_core::future::BoxFuture; use log::LevelFilter; use sqlx_core::Url; use std::time::Duration; @@ -17,11 +16,11 @@ impl ConnectOptions for PgConnectOptions { self.build_url() } - fn connect(&self) -> BoxFuture<'_, Result> + async fn connect(&self) -> Result where Self::Connection: Sized, { - Box::pin(PgConnection::establish(self)) + PgConnection::establish(self).await } fn log_statements(mut self, level: LevelFilter) -> Self { diff --git a/sqlx-postgres/src/transaction.rs b/sqlx-postgres/src/transaction.rs index e7c78488eb..bf7f13b437 100644 --- a/sqlx-postgres/src/transaction.rs +++ b/sqlx-postgres/src/transaction.rs @@ -1,5 +1,3 @@ -use futures_core::future::BoxFuture; - use crate::error::Error; use crate::executor::Executor; @@ -13,45 +11,37 @@ pub struct PgTransactionManager; impl TransactionManager for PgTransactionManager { type Database = Postgres; - fn begin(conn: &mut PgConnection) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async move { - let rollback = Rollback::new(conn); - let query = begin_ansi_transaction_sql(rollback.conn.inner.transaction_depth); - rollback.conn.queue_simple_query(&query)?; - rollback.conn.inner.transaction_depth += 1; - rollback.conn.wait_until_ready().await?; - rollback.defuse(); - - Ok(()) - }) + async fn begin(conn: &mut PgConnection) -> Result<(), Error> { + let rollback = Rollback::new(conn); + let query = begin_ansi_transaction_sql(rollback.conn.inner.transaction_depth); + rollback.conn.queue_simple_query(&query)?; + rollback.conn.inner.transaction_depth += 1; + rollback.conn.wait_until_ready().await?; + rollback.defuse(); + + Ok(()) } - fn commit(conn: &mut PgConnection) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async move { - if conn.inner.transaction_depth > 0 { - conn.execute(&*commit_ansi_transaction_sql(conn.inner.transaction_depth)) - .await?; + async fn commit(conn: &mut PgConnection) -> Result<(), Error> { + if conn.inner.transaction_depth > 0 { + conn.execute(&*commit_ansi_transaction_sql(conn.inner.transaction_depth)) + .await?; - conn.inner.transaction_depth -= 1; - } + conn.inner.transaction_depth -= 1; + } - Ok(()) - }) + Ok(()) } - fn rollback(conn: &mut PgConnection) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async move { - if conn.inner.transaction_depth > 0 { - conn.execute(&*rollback_ansi_transaction_sql( - conn.inner.transaction_depth, - )) + async fn rollback(conn: &mut PgConnection) -> Result<(), Error> { + if conn.inner.transaction_depth > 0 { + conn.execute(&*rollback_ansi_transaction_sql(conn.inner.transaction_depth)) .await?; - conn.inner.transaction_depth -= 1; - } + conn.inner.transaction_depth -= 1; + } - Ok(()) - }) + Ok(()) } fn start_rollback(conn: &mut PgConnection) { diff --git a/sqlx-sqlite/src/connection/mod.rs b/sqlx-sqlite/src/connection/mod.rs index a579b8a605..a6d5fd5417 100644 --- a/sqlx-sqlite/src/connection/mod.rs +++ b/sqlx-sqlite/src/connection/mod.rs @@ -7,7 +7,6 @@ use std::panic::catch_unwind; use std::ptr; use std::ptr::NonNull; -use futures_core::future::BoxFuture; use futures_intrusive::sync::MutexGuard; use futures_util::future; use libsqlite3_sys::{ @@ -200,42 +199,38 @@ impl Connection for SqliteConnection { type Options = SqliteConnectOptions; - fn close(mut self) -> BoxFuture<'static, Result<(), Error>> { - Box::pin(async move { - if let OptimizeOnClose::Enabled { analysis_limit } = self.optimize_on_close { - let mut pragma_string = String::new(); - if let Some(limit) = analysis_limit { - write!(pragma_string, "PRAGMA analysis_limit = {limit}; ").ok(); - } - pragma_string.push_str("PRAGMA optimize;"); - self.execute(&*pragma_string).await?; + async fn close(mut self) -> Result<(), Error> { + if let OptimizeOnClose::Enabled { analysis_limit } = self.optimize_on_close { + let mut pragma_string = String::new(); + if let Some(limit) = analysis_limit { + write!(pragma_string, "PRAGMA analysis_limit = {limit}; ").ok(); } - let shutdown = self.worker.shutdown(); - // Drop the statement worker, which should - // cover all references to the connection handle outside of the worker thread - drop(self); - // Ensure the worker thread has terminated - shutdown.await - }) + pragma_string.push_str("PRAGMA optimize;"); + self.execute(&*pragma_string).await?; + } + let shutdown = self.worker.shutdown(); + // Drop the statement worker, which should + // cover all references to the connection handle outside of the worker thread + drop(self); + // Ensure the worker thread has terminated + shutdown.await } - fn close_hard(self) -> BoxFuture<'static, Result<(), Error>> { - Box::pin(async move { - drop(self); - Ok(()) - }) + async fn close_hard(self) -> Result<(), Error> { + drop(self); + Ok(()) } /// Ensure the background worker thread is alive and accepting commands. - fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(self.worker.ping()) + async fn ping(&mut self) -> Result<(), Error> { + self.worker.ping().await } - fn begin(&mut self) -> BoxFuture<'_, Result, Error>> + async fn begin(&mut self) -> Result, Error> where Self: Sized, { - Transaction::begin(self) + Transaction::begin(self).await } fn cached_statements_size(&self) -> usize { @@ -245,11 +240,9 @@ impl Connection for SqliteConnection { .load(std::sync::atomic::Ordering::Acquire) } - fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async move { - self.worker.clear_cache().await?; - Ok(()) - }) + async fn clear_cached_statements(&mut self) -> Result<(), Error> { + self.worker.clear_cache().await?; + Ok(()) } #[inline] @@ -258,12 +251,12 @@ impl Connection for SqliteConnection { } #[doc(hidden)] - fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> { + async fn flush(&mut self) -> Result<(), Error> { // For SQLite, FLUSH does effectively nothing... // Well, we could use this to ensure that the command channel has been cleared, // but it would only develop a backlog if a lot of queries are executed and then cancelled // partway through, and then this would only make that situation worse. - Box::pin(future::ok(())) + Ok(()) } #[doc(hidden)] diff --git a/sqlx-sqlite/src/options/connect.rs b/sqlx-sqlite/src/options/connect.rs index 309f2430e0..9a29da2df2 100644 --- a/sqlx-sqlite/src/options/connect.rs +++ b/sqlx-sqlite/src/options/connect.rs @@ -28,26 +28,24 @@ impl ConnectOptions for SqliteConnectOptions { self.build_url() } - fn connect(&self) -> BoxFuture<'_, Result> + async fn connect(&self) -> Result where Self::Connection: Sized, { - Box::pin(async move { - let mut conn = SqliteConnection::establish(self).await?; + let mut conn = SqliteConnection::establish(self).await?; - // Execute PRAGMAs - conn.execute(&*self.pragma_string()).await?; + // Execute PRAGMAs + conn.execute(&*self.pragma_string()).await?; - if !self.collations.is_empty() { - let mut locked = conn.lock_handle().await?; + if !self.collations.is_empty() { + let mut locked = conn.lock_handle().await?; - for collation in &self.collations { - collation.create(&mut locked.guard.handle)?; - } + for collation in &self.collations { + collation.create(&mut locked.guard.handle)?; } + } - Ok(conn) - }) + Ok(conn) } fn log_statements(mut self, level: LevelFilter) -> Self { diff --git a/sqlx-sqlite/src/transaction.rs b/sqlx-sqlite/src/transaction.rs index 24eaca51b1..90f9c64a6e 100644 --- a/sqlx-sqlite/src/transaction.rs +++ b/sqlx-sqlite/src/transaction.rs @@ -1,5 +1,3 @@ -use futures_core::future::BoxFuture; - use crate::{Sqlite, SqliteConnection}; use sqlx_core::error::Error; use sqlx_core::transaction::TransactionManager; @@ -10,16 +8,16 @@ pub struct SqliteTransactionManager; impl TransactionManager for SqliteTransactionManager { type Database = Sqlite; - fn begin(conn: &mut SqliteConnection) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(conn.worker.begin()) + async fn begin(conn: &mut SqliteConnection) -> Result<(), Error> { + conn.worker.begin().await } - fn commit(conn: &mut SqliteConnection) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(conn.worker.commit()) + async fn commit(conn: &mut SqliteConnection) -> Result<(), Error> { + conn.worker.commit().await } - fn rollback(conn: &mut SqliteConnection) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(conn.worker.rollback()) + async fn rollback(conn: &mut SqliteConnection) -> Result<(), Error> { + conn.worker.rollback().await } fn start_rollback(conn: &mut SqliteConnection) { From 3d3e1189a4ff9e21e4a0d7c83be3bf4c8ee80748 Mon Sep 17 00:00:00 2001 From: Joey de Waal Date: Thu, 26 Sep 2024 00:18:40 +0200 Subject: [PATCH 2/7] Remove unnecessary extra lifetime bound --- sqlx-core/src/connection.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sqlx-core/src/connection.rs b/sqlx-core/src/connection.rs index 8d14111970..66233e1b5b 100644 --- a/sqlx-core/src/connection.rs +++ b/sqlx-core/src/connection.rs @@ -149,9 +149,9 @@ pub trait Connection: Send { /// Establish a new database connection with the provided options. - fn connect_with<'a>(options: &'a Self::Options) -> BoxFuture<'_, Result> + fn connect_with(options: &Self::Options) -> BoxFuture<'_, Result> where - Self: Sized + 'a, + Self: Sized { Box::pin(options.connect()) } From 2ff52fd81ab903553866c53796528608c8ce0238 Mon Sep 17 00:00:00 2001 From: Joey de Waal Date: Thu, 26 Sep 2024 11:33:12 +0200 Subject: [PATCH 3/7] Remove BoxFuture for Connection, MigrateDatabase, TransactionManager, ConnectOptions, TestSupport, PgPoolCopyExt --- sqlx-core/src/any/connection/mod.rs | 3 +- sqlx-core/src/any/driver.rs | 8 +-- sqlx-core/src/any/migrate.rs | 48 ++++++-------- sqlx-core/src/connection.rs | 12 ++-- sqlx-core/src/migrate/migrate.rs | 11 ++-- sqlx-core/src/testing/mod.rs | 6 +- sqlx-core/src/transaction.rs | 2 +- sqlx-mysql/src/migrate.rs | 54 +++++++--------- sqlx-mysql/src/testing/mod.rs | 54 ++++++++-------- sqlx-postgres/src/migrate.rs | 98 +++++++++++++---------------- sqlx-postgres/src/testing/mod.rs | 52 +++++++-------- sqlx-sqlite/src/migrate.rs | 64 +++++++++---------- sqlx-sqlite/src/testing/mod.rs | 16 +++-- 13 files changed, 196 insertions(+), 232 deletions(-) diff --git a/sqlx-core/src/any/connection/mod.rs b/sqlx-core/src/any/connection/mod.rs index 7d19c6f81f..6ccbdaa629 100644 --- a/sqlx-core/src/any/connection/mod.rs +++ b/sqlx-core/src/any/connection/mod.rs @@ -61,7 +61,8 @@ impl AnyConnection { #[cfg(feature = "migrate")] pub(crate) fn get_migrate( &mut self, - ) -> crate::Result<&mut (dyn crate::migrate::Migrate + Send + 'static)> { + ) -> crate::Result<&mut (dyn crate::migrate::Migrate + Send + 'static)> + { self.backend.as_migrate() } } diff --git a/sqlx-core/src/any/driver.rs b/sqlx-core/src/any/driver.rs index cf97b84812..b8eb3a81b5 100644 --- a/sqlx-core/src/any/driver.rs +++ b/sqlx-core/src/any/driver.rs @@ -67,10 +67,10 @@ impl AnyDriver { { Self { migrate_database: Some(AnyMigrateDatabase { - create_database: DebugFn(DB::create_database), - database_exists: DebugFn(DB::database_exists), - drop_database: DebugFn(DB::drop_database), - force_drop_database: DebugFn(DB::force_drop_database), + create_database: DebugFn(|url| Box::pin(async move { DB::create_database(url).await })), + database_exists: DebugFn(|url| Box::pin(async move { DB::database_exists(url).await })), + drop_database: DebugFn(|url| Box::pin(async move { DB::drop_database(url).await })), + force_drop_database: DebugFn(|url| Box::pin(async move { DB::force_drop_database(url).await })), }), ..Self::without_migrate::() } diff --git a/sqlx-core/src/any/migrate.rs b/sqlx-core/src/any/migrate.rs index cb4f72c340..9e3a2a8161 100644 --- a/sqlx-core/src/any/migrate.rs +++ b/sqlx-core/src/any/migrate.rs @@ -6,40 +6,32 @@ use futures_core::future::BoxFuture; use std::time::Duration; impl MigrateDatabase for Any { - fn create_database(url: &str) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async { - driver::from_url_str(url)? - .get_migrate_database()? - .create_database(url) - .await - }) + async fn create_database(url: &str) -> Result<(), Error> { + driver::from_url_str(url)? + .get_migrate_database()? + .create_database(url) + .await } - fn database_exists(url: &str) -> BoxFuture<'_, Result> { - Box::pin(async { - driver::from_url_str(url)? - .get_migrate_database()? - .database_exists(url) - .await - }) + async fn database_exists(url: &str) -> Result { + driver::from_url_str(url)? + .get_migrate_database()? + .database_exists(url) + .await } - fn drop_database(url: &str) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async { - driver::from_url_str(url)? - .get_migrate_database()? - .drop_database(url) - .await - }) + async fn drop_database(url: &str) -> Result<(), Error> { + driver::from_url_str(url)? + .get_migrate_database()? + .drop_database(url) + .await } - fn force_drop_database(url: &str) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async { - driver::from_url_str(url)? - .get_migrate_database()? - .force_drop_database(url) - .await - }) + async fn force_drop_database(url: &str) -> Result<(), Error> { + driver::from_url_str(url)? + .get_migrate_database()? + .force_drop_database(url) + .await } } diff --git a/sqlx-core/src/connection.rs b/sqlx-core/src/connection.rs index 66233e1b5b..4538a096fd 100644 --- a/sqlx-core/src/connection.rs +++ b/sqlx-core/src/connection.rs @@ -77,7 +77,7 @@ pub trait Connection: Send { R: Send, E: From + Send, { - Box::pin(async move { + async move { let mut transaction = self.begin().await?; let ret = callback(&mut transaction).await; @@ -93,7 +93,7 @@ pub trait Connection: Send { Err(err) } } - }) + } } /// The number of statements currently cached in the connection. @@ -138,22 +138,22 @@ pub trait Connection: Send { /// A value of [`Options`][Self::Options] is parsed from the provided connection string. This parsing /// is database-specific. #[inline] - fn connect(url: &str) -> BoxFuture<'static, Result> + fn connect(url: &str) -> impl Future> + Send + 'static where Self: Sized, { let options = url.parse(); - Box::pin(async move { Self::connect_with(&options?).await }) + async move { Self::connect_with(&options?).await } } /// Establish a new database connection with the provided options. - fn connect_with(options: &Self::Options) -> BoxFuture<'_, Result> + fn connect_with(options: &Self::Options) -> impl Future> + Send + '_ where Self: Sized { - Box::pin(options.connect()) + options.connect() } } diff --git a/sqlx-core/src/migrate/migrate.rs b/sqlx-core/src/migrate/migrate.rs index 0e4448a9bd..322f0d1671 100644 --- a/sqlx-core/src/migrate/migrate.rs +++ b/sqlx-core/src/migrate/migrate.rs @@ -2,24 +2,25 @@ use crate::error::Error; use crate::migrate::{AppliedMigration, MigrateError, Migration}; use futures_core::future::BoxFuture; use std::time::Duration; +use std::future::Future; pub trait MigrateDatabase { // create database in url // uses a maintenance database depending on driver - fn create_database(url: &str) -> BoxFuture<'_, Result<(), Error>>; + fn create_database(url: &str) -> impl Future> + Send + '_; // check if the database in url exists // uses a maintenance database depending on driver - fn database_exists(url: &str) -> BoxFuture<'_, Result>; + fn database_exists(url: &str) -> impl Future> + Send + '_; // drop database in url // uses a maintenance database depending on driver - fn drop_database(url: &str) -> BoxFuture<'_, Result<(), Error>>; + fn drop_database(url: &str) -> impl Future> + Send + '_; // force drop database in url // uses a maintenance database depending on driver - fn force_drop_database(_url: &str) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async { Err(MigrateError::ForceNotSupported)? }) + fn force_drop_database(_url: &str) -> impl Future> + Send + '_ { + async { Err(MigrateError::ForceNotSupported)? } } } diff --git a/sqlx-core/src/testing/mod.rs b/sqlx-core/src/testing/mod.rs index d82d1a3616..ae963369c9 100644 --- a/sqlx-core/src/testing/mod.rs +++ b/sqlx-core/src/testing/mod.rs @@ -24,9 +24,9 @@ pub trait TestSupport: Database { /// /// The implementation may require `DATABASE_URL` to be set in order to manage databases. /// The user credentials it contains must have the privilege to create and drop databases. - fn test_context(args: &TestArgs) -> BoxFuture<'_, Result, Error>>; + fn test_context(args: &TestArgs) -> impl Future, Error>> + Send + '_; - fn cleanup_test(db_name: &str) -> BoxFuture<'_, Result<(), Error>>; + fn cleanup_test(db_name: &str) -> impl Future> + Send + '_; /// Cleanup any test databases that are no longer in-use. /// @@ -34,7 +34,7 @@ pub trait TestSupport: Database { /// /// The implementation may require `DATABASE_URL` to be set in order to manage databases. /// The user credentials it contains must have the privilege to create and drop databases. - fn cleanup_test_dbs() -> BoxFuture<'static, Result, Error>>; + fn cleanup_test_dbs() -> impl Future, Error>> + Send + 'static; /// Take a snapshot of the current state of the database (data only). /// diff --git a/sqlx-core/src/transaction.rs b/sqlx-core/src/transaction.rs index 67ee3a41f4..a174ad8b07 100644 --- a/sqlx-core/src/transaction.rs +++ b/sqlx-core/src/transaction.rs @@ -233,7 +233,7 @@ impl<'c, 't, DB: Database> crate::acquire::Acquire<'t> for &'t mut Transaction<' type Connection = &'t mut ::Connection; #[inline] - fn acquire(self) -> BoxFuture<'t, Result> { +fn acquire(self) -> BoxFuture<'t, Result> { Box::pin(futures_util::future::ok(&mut **self)) } diff --git a/sqlx-mysql/src/migrate.rs b/sqlx-mysql/src/migrate.rs index 79b55ace3c..adf67219fc 100644 --- a/sqlx-mysql/src/migrate.rs +++ b/sqlx-mysql/src/migrate.rs @@ -31,46 +31,40 @@ fn parse_for_maintenance(url: &str) -> Result<(MySqlConnectOptions, String), Err } impl MigrateDatabase for MySql { - fn create_database(url: &str) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async move { - let (options, database) = parse_for_maintenance(url)?; - let mut conn = options.connect().await?; + async fn create_database(url: &str) -> Result<(), Error> { + let (options, database) = parse_for_maintenance(url)?; + let mut conn = options.connect().await?; - let _ = conn - .execute(&*format!("CREATE DATABASE `{database}`")) - .await?; + let _ = conn + .execute(&*format!("CREATE DATABASE `{database}`")) + .await?; - Ok(()) - }) + Ok(()) } - fn database_exists(url: &str) -> BoxFuture<'_, Result> { - Box::pin(async move { - let (options, database) = parse_for_maintenance(url)?; - let mut conn = options.connect().await?; + async fn database_exists(url: &str) -> Result { + let (options, database) = parse_for_maintenance(url)?; + let mut conn = options.connect().await?; - let exists: bool = query_scalar( - "select exists(SELECT 1 from INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = ?)", - ) - .bind(database) - .fetch_one(&mut conn) - .await?; + let exists: bool = query_scalar( + "select exists(SELECT 1 from INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = ?)", + ) + .bind(database) + .fetch_one(&mut conn) + .await?; - Ok(exists) - }) + Ok(exists) } - fn drop_database(url: &str) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async move { - let (options, database) = parse_for_maintenance(url)?; - let mut conn = options.connect().await?; + async fn drop_database(url: &str) -> Result<(), Error> { + let (options, database) = parse_for_maintenance(url)?; + let mut conn = options.connect().await?; - let _ = conn - .execute(&*format!("DROP DATABASE IF EXISTS `{database}`")) - .await?; + let _ = conn + .execute(&*format!("DROP DATABASE IF EXISTS `{database}`")) + .await?; - Ok(()) - }) + Ok(()) } } diff --git a/sqlx-mysql/src/testing/mod.rs b/sqlx-mysql/src/testing/mod.rs index 2a9216d1b8..a5dca18242 100644 --- a/sqlx-mysql/src/testing/mod.rs +++ b/sqlx-mysql/src/testing/mod.rs @@ -26,46 +26,42 @@ static MASTER_POOL: OnceCell> = OnceCell::new(); static DO_CLEANUP: AtomicBool = AtomicBool::new(true); impl TestSupport for MySql { - fn test_context(args: &TestArgs) -> BoxFuture<'_, Result, Error>> { - Box::pin(async move { test_context(args).await }) + async fn test_context(args: &TestArgs) -> Result, Error> { + test_context(args).await } - fn cleanup_test(db_name: &str) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async move { - let mut conn = MASTER_POOL - .get() - .expect("cleanup_test() invoked outside `#[sqlx::test]") - .acquire() - .await?; + async fn cleanup_test(db_name: &str) -> Result<(), Error> { + let mut conn = MASTER_POOL + .get() + .expect("cleanup_test() invoked outside `#[sqlx::test]") + .acquire() + .await?; - let db_id = db_id(db_name); + let db_id = db_id(db_name); - conn.execute(&format!("drop database if exists {db_name};")[..]) - .await?; + conn.execute(&format!("drop database if exists {db_name};")[..]) + .await?; - query("delete from _sqlx_test_databases where db_id = ?") - .bind(db_id) - .execute(&mut *conn) - .await?; + query("delete from _sqlx_test_databases where db_id = ?") + .bind(db_id) + .execute(&mut *conn) + .await?; - Ok(()) - }) + Ok(()) } - fn cleanup_test_dbs() -> BoxFuture<'static, Result, Error>> { - Box::pin(async move { - let url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set"); + async fn cleanup_test_dbs() -> Result, Error> { + let url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set"); - let mut conn = MySqlConnection::connect(&url).await?; + let mut conn = MySqlConnection::connect(&url).await?; - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap(); + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); - let num_deleted = do_cleanup(&mut conn, now).await?; - let _ = conn.close().await; - Ok(Some(num_deleted)) - }) + let num_deleted = do_cleanup(&mut conn, now).await?; + let _ = conn.close().await; + Ok(Some(num_deleted)) } fn snapshot( diff --git a/sqlx-postgres/src/migrate.rs b/sqlx-postgres/src/migrate.rs index c37e92f4d6..edf715dcb3 100644 --- a/sqlx-postgres/src/migrate.rs +++ b/sqlx-postgres/src/migrate.rs @@ -39,74 +39,66 @@ fn parse_for_maintenance(url: &str) -> Result<(PgConnectOptions, String), Error> } impl MigrateDatabase for Postgres { - fn create_database(url: &str) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async move { - let (options, database) = parse_for_maintenance(url)?; - let mut conn = options.connect().await?; - - let _ = conn - .execute(&*format!( - "CREATE DATABASE \"{}\"", - database.replace('"', "\"\"") - )) - .await?; + async fn create_database(url: &str) -> Result<(), Error> { + let (options, database) = parse_for_maintenance(url)?; + let mut conn = options.connect().await?; + + let _ = conn + .execute(&*format!( + "CREATE DATABASE \"{}\"", + database.replace('"', "\"\"") + )) + .await?; - Ok(()) - }) + Ok(()) } - fn database_exists(url: &str) -> BoxFuture<'_, Result> { - Box::pin(async move { - let (options, database) = parse_for_maintenance(url)?; - let mut conn = options.connect().await?; + async fn database_exists(url: &str) -> Result { + let (options, database) = parse_for_maintenance(url)?; + let mut conn = options.connect().await?; - let exists: bool = - query_scalar("select exists(SELECT 1 from pg_database WHERE datname = $1)") - .bind(database) - .fetch_one(&mut conn) - .await?; + let exists: bool = + query_scalar("select exists(SELECT 1 from pg_database WHERE datname = $1)") + .bind(database) + .fetch_one(&mut conn) + .await?; - Ok(exists) - }) + Ok(exists) } - fn drop_database(url: &str) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async move { - let (options, database) = parse_for_maintenance(url)?; - let mut conn = options.connect().await?; - - let _ = conn - .execute(&*format!( - "DROP DATABASE IF EXISTS \"{}\"", - database.replace('"', "\"\"") - )) - .await?; + async fn drop_database(url: &str) -> Result<(), Error> { + let (options, database) = parse_for_maintenance(url)?; + let mut conn = options.connect().await?; - Ok(()) - }) + let _ = conn + .execute(&*format!( + "DROP DATABASE IF EXISTS \"{}\"", + database.replace('"', "\"\"") + )) + .await?; + + Ok(()) } - fn force_drop_database(url: &str) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async move { - let (options, database) = parse_for_maintenance(url)?; - let mut conn = options.connect().await?; + async fn force_drop_database(url: &str) -> Result<(), Error> { + let (options, database) = parse_for_maintenance(url)?; + let mut conn = options.connect().await?; - let row: (String,) = query_as("SELECT current_setting('server_version_num')") - .fetch_one(&mut conn) - .await?; + let row: (String,) = query_as("SELECT current_setting('server_version_num')") + .fetch_one(&mut conn) + .await?; - let version = row.0.parse::().unwrap(); + let version = row.0.parse::().unwrap(); - let pid_type = if version >= 90200 { "pid" } else { "procpid" }; + let pid_type = if version >= 90200 { "pid" } else { "procpid" }; - conn.execute(&*format!( - "SELECT pg_terminate_backend(pg_stat_activity.{pid_type}) FROM pg_stat_activity \ - WHERE pg_stat_activity.datname = '{database}' AND {pid_type} <> pg_backend_pid()" - )) - .await?; + conn.execute(&*format!( + "SELECT pg_terminate_backend(pg_stat_activity.{pid_type}) FROM pg_stat_activity \ + WHERE pg_stat_activity.datname = '{database}' AND {pid_type} <> pg_backend_pid()" + )) + .await?; - Self::drop_database(url).await - }) + Self::drop_database(url).await } } diff --git a/sqlx-postgres/src/testing/mod.rs b/sqlx-postgres/src/testing/mod.rs index fb36ab4136..4d2efd418f 100644 --- a/sqlx-postgres/src/testing/mod.rs +++ b/sqlx-postgres/src/testing/mod.rs @@ -25,44 +25,40 @@ static MASTER_POOL: OnceCell> = OnceCell::new(); static DO_CLEANUP: AtomicBool = AtomicBool::new(true); impl TestSupport for Postgres { - fn test_context(args: &TestArgs) -> BoxFuture<'_, Result, Error>> { - Box::pin(async move { test_context(args).await }) + async fn test_context(args: &TestArgs) -> Result, Error> { + test_context(args).await } - fn cleanup_test(db_name: &str) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async move { - let mut conn = MASTER_POOL - .get() - .expect("cleanup_test() invoked outside `#[sqlx::test]") - .acquire() - .await?; + async fn cleanup_test(db_name: &str) -> Result<(), Error> { + let mut conn = MASTER_POOL + .get() + .expect("cleanup_test() invoked outside `#[sqlx::test]") + .acquire() + .await?; - conn.execute(&format!("drop database if exists {db_name:?};")[..]) - .await?; + conn.execute(&format!("drop database if exists {db_name:?};")[..]) + .await?; - query("delete from _sqlx_test.databases where db_name = $1") - .bind(db_name) - .execute(&mut *conn) - .await?; + query("delete from _sqlx_test.databases where db_name = $1") + .bind(db_name) + .execute(&mut *conn) + .await?; - Ok(()) - }) + Ok(()) } - fn cleanup_test_dbs() -> BoxFuture<'static, Result, Error>> { - Box::pin(async move { - let url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set"); + async fn cleanup_test_dbs() -> Result, Error> { + let url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set"); - let mut conn = PgConnection::connect(&url).await?; + let mut conn = PgConnection::connect(&url).await?; - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap(); + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); - let num_deleted = do_cleanup(&mut conn, now).await?; - let _ = conn.close().await; - Ok(Some(num_deleted)) - }) + let num_deleted = do_cleanup(&mut conn, now).await?; + let _ = conn.close().await; + Ok(Some(num_deleted)) } fn snapshot( diff --git a/sqlx-sqlite/src/migrate.rs b/sqlx-sqlite/src/migrate.rs index b9ce22dccd..48c83565ff 100644 --- a/sqlx-sqlite/src/migrate.rs +++ b/sqlx-sqlite/src/migrate.rs @@ -17,49 +17,43 @@ use std::time::Instant; pub(crate) use sqlx_core::migrate::*; impl MigrateDatabase for Sqlite { - fn create_database(url: &str) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async move { - let mut opts = SqliteConnectOptions::from_str(url)?.create_if_missing(true); - - // Since it doesn't make sense to include this flag in the connection URL, - // we just use an `AtomicBool` to pass it. - if super::CREATE_DB_WAL.load(Ordering::Acquire) { - opts = opts.journal_mode(SqliteJournalMode::Wal); - } - - // Opening a connection to sqlite creates the database - opts.connect() - .await? - // Ensure WAL mode tempfiles are cleaned up - .close() - .await?; + async fn create_database(url: &str) -> Result<(), Error> { + let mut opts = SqliteConnectOptions::from_str(url)?.create_if_missing(true); + + // Since it doesn't make sense to include this flag in the connection URL, + // we just use an `AtomicBool` to pass it. + if super::CREATE_DB_WAL.load(Ordering::Acquire) { + opts = opts.journal_mode(SqliteJournalMode::Wal); + } + + // Opening a connection to sqlite creates the database + opts.connect() + .await? + // Ensure WAL mode tempfiles are cleaned up + .close() + .await?; - Ok(()) - }) + Ok(()) } - fn database_exists(url: &str) -> BoxFuture<'_, Result> { - Box::pin(async move { - let options = SqliteConnectOptions::from_str(url)?; + async fn database_exists(url: &str) -> Result { + let options = SqliteConnectOptions::from_str(url)?; - if options.in_memory { - Ok(true) - } else { - Ok(options.filename.exists()) - } - }) + if options.in_memory { + Ok(true) + } else { + Ok(options.filename.exists()) + } } - fn drop_database(url: &str) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async move { - let options = SqliteConnectOptions::from_str(url)?; + async fn drop_database(url: &str) -> Result<(), Error> { + let options = SqliteConnectOptions::from_str(url)?; - if !options.in_memory { - fs::remove_file(&*options.filename).await?; - } + if !options.in_memory { + fs::remove_file(&*options.filename).await?; + } - Ok(()) - }) + Ok(()) } } diff --git a/sqlx-sqlite/src/testing/mod.rs b/sqlx-sqlite/src/testing/mod.rs index 3398c6b493..c55a7168d2 100644 --- a/sqlx-sqlite/src/testing/mod.rs +++ b/sqlx-sqlite/src/testing/mod.rs @@ -10,19 +10,17 @@ pub(crate) use sqlx_core::testing::*; const BASE_PATH: &str = "target/sqlx/test-dbs"; impl TestSupport for Sqlite { - fn test_context(args: &TestArgs) -> BoxFuture<'_, Result, Error>> { - Box::pin(async move { test_context(args).await }) + async fn test_context(args: &TestArgs) -> Result, Error> { + test_context(args).await } - fn cleanup_test(db_name: &str) -> BoxFuture<'_, Result<(), Error>> { - Box::pin(async move { Ok(crate::fs::remove_file(db_name).await?) }) + async fn cleanup_test(db_name: &str) -> Result<(), Error> { + Ok(crate::fs::remove_file(db_name).await?) } - fn cleanup_test_dbs() -> BoxFuture<'static, Result, Error>> { - Box::pin(async move { - crate::fs::remove_dir_all(BASE_PATH).await?; - Ok(None) - }) + async fn cleanup_test_dbs() -> Result, Error> { + crate::fs::remove_dir_all(BASE_PATH).await?; + Ok(None) } fn snapshot( From 1163a51569846dd3ddb0103da41f5bdac21002f4 Mon Sep 17 00:00:00 2001 From: Joey de Waal Date: Thu, 26 Sep 2024 11:42:22 +0200 Subject: [PATCH 4/7] fust fmt --- sqlx-core/src/any/connection/mod.rs | 3 +-- sqlx-core/src/any/driver.rs | 12 +++++++++--- sqlx-core/src/connection.rs | 20 +++++++++++++------- sqlx-core/src/migrate/migrate.rs | 2 +- sqlx-core/src/testing/mod.rs | 4 +++- sqlx-core/src/transaction.rs | 5 ++--- 6 files changed, 29 insertions(+), 17 deletions(-) diff --git a/sqlx-core/src/any/connection/mod.rs b/sqlx-core/src/any/connection/mod.rs index 6ccbdaa629..7d19c6f81f 100644 --- a/sqlx-core/src/any/connection/mod.rs +++ b/sqlx-core/src/any/connection/mod.rs @@ -61,8 +61,7 @@ impl AnyConnection { #[cfg(feature = "migrate")] pub(crate) fn get_migrate( &mut self, - ) -> crate::Result<&mut (dyn crate::migrate::Migrate + Send + 'static)> - { + ) -> crate::Result<&mut (dyn crate::migrate::Migrate + Send + 'static)> { self.backend.as_migrate() } } diff --git a/sqlx-core/src/any/driver.rs b/sqlx-core/src/any/driver.rs index b8eb3a81b5..0ad529ea18 100644 --- a/sqlx-core/src/any/driver.rs +++ b/sqlx-core/src/any/driver.rs @@ -67,10 +67,16 @@ impl AnyDriver { { Self { migrate_database: Some(AnyMigrateDatabase { - create_database: DebugFn(|url| Box::pin(async move { DB::create_database(url).await })), - database_exists: DebugFn(|url| Box::pin(async move { DB::database_exists(url).await })), + create_database: DebugFn(|url| { + Box::pin(async move { DB::create_database(url).await }) + }), + database_exists: DebugFn(|url| { + Box::pin(async move { DB::database_exists(url).await }) + }), drop_database: DebugFn(|url| Box::pin(async move { DB::drop_database(url).await })), - force_drop_database: DebugFn(|url| Box::pin(async move { DB::force_drop_database(url).await })), + force_drop_database: DebugFn(|url| { + Box::pin(async move { DB::force_drop_database(url).await }) + }), }), ..Self::without_migrate::() } diff --git a/sqlx-core/src/connection.rs b/sqlx-core/src/connection.rs index 4538a096fd..adf6e04df3 100644 --- a/sqlx-core/src/connection.rs +++ b/sqlx-core/src/connection.rs @@ -1,6 +1,6 @@ use crate::database::{Database, HasStatementCache}; -use std::future::Future; use crate::error::Error; +use std::future::Future; use crate::transaction::Transaction; use futures_core::future::BoxFuture; @@ -46,7 +46,9 @@ pub trait Connection: Send { /// Begin a new transaction or establish a savepoint within the active transaction. /// /// Returns a [`Transaction`] for controlling and tracking the new transaction. - fn begin(&mut self) -> impl Future, Error>> + Send + '_ + fn begin( + &mut self, + ) -> impl Future, Error>> + Send + '_ where Self: Sized; @@ -67,7 +69,10 @@ pub trait Connection: Send { /// })).await /// # } /// ``` - fn transaction<'a, F, R, E>(&'a mut self, callback: F) -> impl Future> + Send + 'a + fn transaction<'a, F, R, E>( + &'a mut self, + callback: F, + ) -> impl Future> + Send + 'a where for<'c> F: FnOnce(&'c mut Transaction<'_, Self::Database>) -> BoxFuture<'c, Result> + 'a @@ -137,7 +142,7 @@ pub trait Connection: Send { /// /// A value of [`Options`][Self::Options] is parsed from the provided connection string. This parsing /// is database-specific. - #[inline] + #[inline] fn connect(url: &str) -> impl Future> + Send + 'static where Self: Sized, @@ -147,11 +152,12 @@ pub trait Connection: Send { async move { Self::connect_with(&options?).await } } - /// Establish a new database connection with the provided options. - fn connect_with(options: &Self::Options) -> impl Future> + Send + '_ + fn connect_with( + options: &Self::Options, + ) -> impl Future> + Send + '_ where - Self: Sized + Self: Sized, { options.connect() } diff --git a/sqlx-core/src/migrate/migrate.rs b/sqlx-core/src/migrate/migrate.rs index 322f0d1671..e7b99906ec 100644 --- a/sqlx-core/src/migrate/migrate.rs +++ b/sqlx-core/src/migrate/migrate.rs @@ -1,8 +1,8 @@ use crate::error::Error; use crate::migrate::{AppliedMigration, MigrateError, Migration}; use futures_core::future::BoxFuture; -use std::time::Duration; use std::future::Future; +use std::time::Duration; pub trait MigrateDatabase { // create database in url diff --git a/sqlx-core/src/testing/mod.rs b/sqlx-core/src/testing/mod.rs index ae963369c9..ad194b5189 100644 --- a/sqlx-core/src/testing/mod.rs +++ b/sqlx-core/src/testing/mod.rs @@ -24,7 +24,9 @@ pub trait TestSupport: Database { /// /// The implementation may require `DATABASE_URL` to be set in order to manage databases. /// The user credentials it contains must have the privilege to create and drop databases. - fn test_context(args: &TestArgs) -> impl Future, Error>> + Send + '_; + fn test_context( + args: &TestArgs, + ) -> impl Future, Error>> + Send + '_; fn cleanup_test(db_name: &str) -> impl Future> + Send + '_; diff --git a/sqlx-core/src/transaction.rs b/sqlx-core/src/transaction.rs index a174ad8b07..3b02ae6ad9 100644 --- a/sqlx-core/src/transaction.rs +++ b/sqlx-core/src/transaction.rs @@ -1,6 +1,6 @@ use std::borrow::Cow; -use std::future::Future; use std::fmt::{self, Debug, Formatter}; +use std::future::Future; use std::ops::{Deref, DerefMut}; use futures_core::future::BoxFuture; @@ -31,7 +31,6 @@ pub trait TransactionManager { conn: &mut ::Connection, ) -> impl Future> + Send; - /// Starts to abort the active transaction or restore from the most recent snapshot. fn start_rollback(conn: &mut ::Connection); } @@ -233,7 +232,7 @@ impl<'c, 't, DB: Database> crate::acquire::Acquire<'t> for &'t mut Transaction<' type Connection = &'t mut ::Connection; #[inline] -fn acquire(self) -> BoxFuture<'t, Result> { + fn acquire(self) -> BoxFuture<'t, Result> { Box::pin(futures_util::future::ok(&mut **self)) } From 168fe2c9a61e60189b60d4dcb16e3b70fea99dda Mon Sep 17 00:00:00 2001 From: Joey de Waal Date: Thu, 26 Sep 2024 11:50:13 +0200 Subject: [PATCH 5/7] fix AnyConnectionBackend for mysql and sqlite --- sqlx-mysql/src/any.rs | 16 ++++++++-------- sqlx-sqlite/src/any.rs | 16 ++++++++-------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/sqlx-mysql/src/any.rs b/sqlx-mysql/src/any.rs index 0466bfc0a4..7d4f537a03 100644 --- a/sqlx-mysql/src/any.rs +++ b/sqlx-mysql/src/any.rs @@ -6,7 +6,7 @@ use crate::{ use either::Either; use futures_core::future::BoxFuture; use futures_core::stream::BoxStream; -use futures_util::{stream, StreamExt, TryFutureExt, TryStreamExt}; +use futures_util::{stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use sqlx_core::any::{ Any, AnyArguments, AnyColumn, AnyConnectOptions, AnyConnectionBackend, AnyQueryResult, AnyRow, AnyStatement, AnyTypeInfo, AnyTypeInfoKind, @@ -26,27 +26,27 @@ impl AnyConnectionBackend for MySqlConnection { } fn close(self: Box) -> BoxFuture<'static, sqlx_core::Result<()>> { - Connection::close(*self) + Connection::close(*self).boxed() } fn close_hard(self: Box) -> BoxFuture<'static, sqlx_core::Result<()>> { - Connection::close_hard(*self) + Connection::close_hard(*self).boxed() } fn ping(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> { - Connection::ping(self) + Connection::ping(self).boxed() } fn begin(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> { - MySqlTransactionManager::begin(self) + MySqlTransactionManager::begin(self).boxed() } fn commit(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> { - MySqlTransactionManager::commit(self) + MySqlTransactionManager::commit(self).boxed() } fn rollback(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> { - MySqlTransactionManager::rollback(self) + MySqlTransactionManager::rollback(self).boxed() } fn start_rollback(&mut self) { @@ -58,7 +58,7 @@ impl AnyConnectionBackend for MySqlConnection { } fn flush(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> { - Connection::flush(self) + Connection::flush(self).boxed() } fn should_flush(&self) -> bool { diff --git a/sqlx-sqlite/src/any.rs b/sqlx-sqlite/src/any.rs index 01600d9931..a5814ede38 100644 --- a/sqlx-sqlite/src/any.rs +++ b/sqlx-sqlite/src/any.rs @@ -4,7 +4,7 @@ use crate::{ }; use futures_core::future::BoxFuture; use futures_core::stream::BoxStream; -use futures_util::{StreamExt, TryFutureExt, TryStreamExt}; +use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use sqlx_core::any::{ Any, AnyArguments, AnyColumn, AnyConnectOptions, AnyConnectionBackend, AnyQueryResult, AnyRow, @@ -26,27 +26,27 @@ impl AnyConnectionBackend for SqliteConnection { } fn close(self: Box) -> BoxFuture<'static, sqlx_core::Result<()>> { - Connection::close(*self) + Connection::close(*self).boxed() } fn close_hard(self: Box) -> BoxFuture<'static, sqlx_core::Result<()>> { - Connection::close_hard(*self) + Connection::close_hard(*self).boxed() } fn ping(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> { - Connection::ping(self) + Connection::ping(self).boxed() } fn begin(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> { - SqliteTransactionManager::begin(self) + SqliteTransactionManager::begin(self).boxed() } fn commit(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> { - SqliteTransactionManager::commit(self) + SqliteTransactionManager::commit(self).boxed() } fn rollback(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> { - SqliteTransactionManager::rollback(self) + SqliteTransactionManager::rollback(self).boxed() } fn start_rollback(&mut self) { @@ -58,7 +58,7 @@ impl AnyConnectionBackend for SqliteConnection { } fn flush(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> { - Connection::flush(self) + Connection::flush(self).boxed() } fn should_flush(&self) -> bool { From 257d8693e8b203500f423f909cd6540c83ffcc54 Mon Sep 17 00:00:00 2001 From: Joey de Waal Date: Thu, 26 Sep 2024 11:53:58 +0200 Subject: [PATCH 6/7] Fix clippy warnings --- sqlx-mysql/src/connection/mod.rs | 1 - sqlx-sqlite/src/connection/mod.rs | 1 - sqlx-sqlite/src/options/connect.rs | 1 - 3 files changed, 3 deletions(-) diff --git a/sqlx-mysql/src/connection/mod.rs b/sqlx-mysql/src/connection/mod.rs index d17b891a97..6fe7e6bb4c 100644 --- a/sqlx-mysql/src/connection/mod.rs +++ b/sqlx-mysql/src/connection/mod.rs @@ -1,6 +1,5 @@ use std::fmt::{self, Debug, Formatter}; -use futures_util::FutureExt; pub(crate) use sqlx_core::connection::*; pub(crate) use stream::{MySqlStream, Waiting}; diff --git a/sqlx-sqlite/src/connection/mod.rs b/sqlx-sqlite/src/connection/mod.rs index a6d5fd5417..eff29ded67 100644 --- a/sqlx-sqlite/src/connection/mod.rs +++ b/sqlx-sqlite/src/connection/mod.rs @@ -8,7 +8,6 @@ use std::ptr; use std::ptr::NonNull; use futures_intrusive::sync::MutexGuard; -use futures_util::future; use libsqlite3_sys::{ sqlite3, sqlite3_commit_hook, sqlite3_progress_handler, sqlite3_rollback_hook, sqlite3_update_hook, SQLITE_DELETE, SQLITE_INSERT, SQLITE_UPDATE, diff --git a/sqlx-sqlite/src/options/connect.rs b/sqlx-sqlite/src/options/connect.rs index 9a29da2df2..111598b5fb 100644 --- a/sqlx-sqlite/src/options/connect.rs +++ b/sqlx-sqlite/src/options/connect.rs @@ -1,5 +1,4 @@ use crate::{SqliteConnectOptions, SqliteConnection}; -use futures_core::future::BoxFuture; use log::LevelFilter; use sqlx_core::connection::ConnectOptions; use sqlx_core::error::Error; From 880f0dfd22c8c86b559f6031c7df6b88c675af78 Mon Sep 17 00:00:00 2001 From: Joey de Waal Date: Wed, 9 Oct 2024 22:16:41 +0200 Subject: [PATCH 7/7] run cargo fmt --- sqlx-postgres/src/transaction.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sqlx-postgres/src/transaction.rs b/sqlx-postgres/src/transaction.rs index bf7f13b437..d6f4ecdb9d 100644 --- a/sqlx-postgres/src/transaction.rs +++ b/sqlx-postgres/src/transaction.rs @@ -35,8 +35,10 @@ impl TransactionManager for PgTransactionManager { async fn rollback(conn: &mut PgConnection) -> Result<(), Error> { if conn.inner.transaction_depth > 0 { - conn.execute(&*rollback_ansi_transaction_sql(conn.inner.transaction_depth)) - .await?; + conn.execute(&*rollback_ansi_transaction_sql( + conn.inner.transaction_depth, + )) + .await?; conn.inner.transaction_depth -= 1; }