diff --git a/sqlx-core/src/pool/inner.rs b/sqlx-core/src/pool/inner.rs index a65c61a010..6b341bc87c 100644 --- a/sqlx-core/src/pool/inner.rs +++ b/sqlx-core/src/pool/inner.rs @@ -13,7 +13,9 @@ use std::task::ready; use crate::logger::private_level_filter_to_trace_level; use crate::pool::connect::{ConnectPermit, ConnectionCounter, DynConnector}; use crate::pool::idle::IdleQueue; -use crate::private_tracing_dynamic_event; +use crate::rt::JoinHandle; +use crate::{private_tracing_dynamic_event, rt}; +use either::Either; use futures_util::future::{self, OptionFuture}; use futures_util::FutureExt; use std::time::{Duration, Instant}; @@ -116,7 +118,7 @@ impl PoolInner { let mut close_event = pin!(self.close_event()); let mut deadline = pin!(crate::rt::sleep(self.options.acquire_timeout)); let mut acquire_idle = pin!(self.idle.acquire(self).fuse()); - let mut check_idle = pin!(OptionFuture::from(None)); + let mut before_acquire = OptionFuture::from(None); let mut acquire_connect_permit = pin!(OptionFuture::from(Some( self.counter.acquire_permit(self).fuse() ))); @@ -144,21 +146,26 @@ impl PoolInner { // Attempt to acquire a connection from the idle queue. if let Ready(idle) = acquire_idle.poll_unpin(cx) { - check_idle.set(Some(check_idle_conn(idle, &self.options)).into()); + // If we acquired an idle connection, run any checks that need to be done. + // + // Includes `test_on_acquire` and the `before_acquire` callback, if set. + match finish_acquire(idle) { + // There are checks needed to be done, so they're spawned as a task + // to be cancellation-safe. + Either::Left(check_task) => { + before_acquire = Some(check_task).into(); + } + // The connection is ready to go. + Either::Right(conn) => { + return Ready(Ok(conn)); + } + } } - // If we acquired an idle connection, run any checks that need to be done. - // - // Includes `test_on_acquire` and the `before_acquire` callback, if set. - // - // We don't want to race this step if it's already running because canceling it - // will result in the potentially unnecessary closure of a connection. - // - // Instead, we just wait and see what happens. If we already started connecting, - // that'll happen concurrently. - match ready!(check_idle.poll_unpin(cx)) { + // Poll the task returned by `finish_acquire` + match ready!(before_acquire.poll_unpin(cx)) { // The `.reattach()` call errors with "type annotations needed" if not qualified. - Some(Ok(live)) => return Ready(Ok(Floating::reattach(live))), + Some(Ok(conn)) => return Ready(Ok(conn)), Some(Err(permit)) => { // We don't strictly need to poll `connect` here; all we really want to do // is to check if it is `None`. But since currently there's no getter for that, @@ -178,7 +185,7 @@ impl PoolInner { // Attempt to acquire another idle connection concurrently to opening a new one. acquire_idle.set(self.idle.acquire(self).fuse()); // Annoyingly, `OptionFuture` doesn't fuse to `None` on its own - check_idle.set(None.into()); + before_acquire = None.into(); } None => (), } @@ -289,42 +296,51 @@ fn is_beyond_idle_timeout(idle: &Idle, options: &PoolOptions timeout) } -async fn check_idle_conn( - mut conn: Floating>, - options: &PoolOptions, -) -> Result>, ConnectPermit> { - if options.test_before_acquire { - // Check that the connection is still live - if let Err(error) = conn.ping().await { - // an error here means the other end has hung up or we lost connectivity - // either way we're fine to just discard the connection - // the error itself here isn't necessarily unexpected so WARN is too strong - tracing::info!(%error, "ping on idle connection returned error"); - // connection is broken so don't try to close nicely - return Err(conn.close_hard().await); - } - } - - if let Some(test) = &options.before_acquire { - let meta = conn.metadata(); - match test(&mut conn.live.raw, meta).await { - Ok(false) => { - // connection was rejected by user-defined hook, close nicely - return Err(conn.close().await); - } - - Err(error) => { - tracing::warn!(%error, "error from `before_acquire`"); +/// Execute `test_before_acquire` and/or `before_acquire` in a background task, if applicable. +/// +/// Otherwise, immediately returns the connection. +fn finish_acquire( + mut conn: Floating> +) -> Either, ConnectPermit>>, PoolConnection> { + let pool = conn.permit.pool(); + + if pool.options.test_before_acquire || pool.options.before_acquire.is_some() { + // Spawn a task so the call may complete even if `acquire()` is cancelled. + return Either::Left(rt::spawn(async move { + // Check that the connection is still live + if let Err(error) = conn.ping().await { + // an error here means the other end has hung up or we lost connectivity + // either way we're fine to just discard the connection + // the error itself here isn't necessarily unexpected so WARN is too strong + tracing::info!(%error, "ping on idle connection returned error"); // connection is broken so don't try to close nicely return Err(conn.close_hard().await); } - Ok(true) => {} - } - } + if let Some(test) = &conn.permit.pool().options.before_acquire { + let meta = conn.metadata(); + match test(&mut conn.inner.live.raw, meta).await { + Ok(false) => { + // connection was rejected by user-defined hook, close nicely + return Err(conn.close().await); + } - // No need to re-connect; connection is alive or we don't care - Ok(conn.into_live()) + Err(error) => { + tracing::warn!(%error, "error from `before_acquire`"); + // connection is broken so don't try to close nicely + return Err(conn.close_hard().await); + } + + Ok(true) => {} + } + } + + Ok(conn.into_live().reattach()) + })); + } + + // No checks are configured, return immediately. + Either::Right(conn.into_live().reattach()) } fn spawn_maintenance_tasks(pool: &Arc>) { @@ -353,7 +369,7 @@ fn spawn_maintenance_tasks(pool: &Arc>) { // Immediately cancel this task if the pool is closed. let mut close_event = pool.close_event(); - crate::rt::spawn(async move { + rt::spawn(async move { let _ = close_event .do_until(async { // If the last handle to the pool was dropped while we were sleeping @@ -386,10 +402,10 @@ fn spawn_maintenance_tasks(pool: &Arc>) { if let Some(duration) = next_run.checked_duration_since(Instant::now()) { // `async-std` doesn't have a `sleep_until()` - crate::rt::sleep(duration).await; + rt::sleep(duration).await; } else { // `next_run` is in the past, just yield. - crate::rt::yield_now().await; + rt::yield_now().await; } } })