diff --git a/crates/sui-indexer-alt/src/db.rs b/crates/sui-indexer-alt/src/db/mod.rs similarity index 95% rename from crates/sui-indexer-alt/src/db.rs rename to crates/sui-indexer-alt/src/db/mod.rs index e1d078a4f0134..6af6dedbe7d92 100644 --- a/crates/sui-indexer-alt/src/db.rs +++ b/crates/sui-indexer-alt/src/db/mod.rs @@ -16,7 +16,11 @@ use std::time::Duration; use tracing::info; use url::Url; +pub mod tempdb; + const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations"); +const DEFAULT_POOL_SIZE: u32 = 100; +const DEFAULT_CONNECTION_TIMEOUT_SECS: u64 = 60; #[derive(Clone)] pub struct Db { @@ -30,33 +34,19 @@ pub struct DbConfig { database_url: Url, /// Number of connections to keep in the pool. - #[arg(long, default_value_t = 100)] + #[arg(long, default_value = stringify!(DEFAULT_POOL_SIZE))] connection_pool_size: u32, /// Time spent waiting for a connection from the pool to become available. #[arg( long, - default_value = "60", + default_value = stringify!(DEFAULT_CONNECTION_TIMEOUT_SECS), value_name = "SECONDS", value_parser = |s: &str| s.parse().map(Duration::from_secs) )] connection_timeout: Duration, } -impl DbConfig { - pub fn new( - database_url: Url, - connection_pool_size: Option, - connection_timeout: Option, - ) -> Self { - Self { - database_url, - connection_pool_size: connection_pool_size.unwrap_or(100), // clap default - connection_timeout: connection_timeout.unwrap_or(Duration::from_secs(60)), // clap default - } - } -} - pub type Connection<'p> = PooledConnection<'p, AsyncPgConnection>; impl Db { @@ -158,6 +148,20 @@ impl Db { } } +impl DbConfig { + pub fn new( + database_url: Url, + connection_pool_size: Option, + connection_timeout: Option, + ) -> Self { + Self { + database_url, + connection_pool_size: connection_pool_size.unwrap_or(DEFAULT_POOL_SIZE), // clap default + connection_timeout: connection_timeout.unwrap_or(Duration::from_secs(DEFAULT_CONNECTION_TIMEOUT_SECS)), // clap default + } + } +} + /// Drop all tables and rerunning migrations. pub async fn reset_database( db_config: DbConfig, @@ -173,9 +177,9 @@ pub async fn reset_database( #[cfg(test)] mod tests { use super::*; - use crate::tempdb::TempDb; use diesel::prelude::QueryableByName; use diesel_async::RunQueryDsl; + use tempdb::TempDb; #[derive(QueryableByName)] struct CountResult { diff --git a/crates/sui-indexer-alt/src/tempdb.rs b/crates/sui-indexer-alt/src/db/tempdb.rs similarity index 98% rename from crates/sui-indexer-alt/src/tempdb.rs rename to crates/sui-indexer-alt/src/db/tempdb.rs index 840800b01d90d..88d28fee3c60d 100644 --- a/crates/sui-indexer-alt/src/tempdb.rs +++ b/crates/sui-indexer-alt/src/db/tempdb.rs @@ -10,7 +10,7 @@ use std::{ process::{Child, Command}, time::{Duration, Instant}, }; -use tracing::trace; +use tracing::{event_enabled, info, trace}; use url::Url; /// A temporary, local postgres database @@ -26,10 +26,34 @@ pub struct TempDb { dir: tempfile::TempDir, } +/// Local instance of a `postgres` server. +/// +/// See for more info. +pub struct LocalDatabase { + dir: PathBuf, + port: u16, + url: Url, + process: Option, +} + +#[derive(Debug)] +struct PostgresProcess { + dir: PathBuf, + inner: Child, +} + +#[derive(Debug)] +enum HealthCheckError { + NotRunning, + NotReady, + #[allow(unused)] + Unknown(String), +} + impl TempDb { /// Create and start a new temporary postgres database. /// - /// A fresh database will be initialized in a temporary directory that will be cleandup on drop. + /// A fresh database will be initialized in a temporary directory that will be cleand up on drop. /// The running `postgres` service will be serving traffic on an available, os-assigned port. pub fn new() -> Result { let dir = tempfile::TempDir::new()?; @@ -53,105 +77,6 @@ impl TempDb { } } -#[derive(Debug)] -struct PostgresProcess { - dir: PathBuf, - inner: Child, -} - -impl PostgresProcess { - fn start(dir: PathBuf, port: u16) -> Result { - let child = Command::new("postgres") - // Set the data directory to use - .arg("-D") - .arg(&dir) - // Set the port to listen for incoming connections - .args(["-p", &port.to_string()]) - // Disable creating and listening on a UDS - .args(["-c", "unix_socket_directories="]) - // pipe stdout and stderr to files located in the data directory - .stdout( - OpenOptions::new() - .create(true) - .append(true) - .open(dir.join("stdout"))?, - ) - .stderr( - OpenOptions::new() - .create(true) - .append(true) - .open(dir.join("stderr"))?, - ) - .spawn() - .context("command not found: postgres")?; - - Ok(Self { dir, inner: child }) - } - - // https://www.postgresql.org/docs/16/app-pg-ctl.html - fn pg_ctl_stop(&mut self) -> Result<()> { - let output = Command::new("pg_ctl") - .arg("stop") - .arg("-D") - .arg(&self.dir) - .arg("-mfast") - .output() - .context("command not found: pg_ctl")?; - - if output.status.success() { - Ok(()) - } else { - Err(anyhow!("couldn't shut down postgres")) - } - } - - fn dump_stdout_stderr(&self) -> Result<(String, String)> { - let stdout = std::fs::read_to_string(self.dir.join("stdout"))?; - let stderr = std::fs::read_to_string(self.dir.join("stderr"))?; - - Ok((stdout, stderr)) - } -} - -impl Drop for PostgresProcess { - // When the Process struct goes out of scope we need to kill the child process - fn drop(&mut self) { - tracing::error!("dropping postgres"); - // check if the process has already been terminated - match self.inner.try_wait() { - // The child process has already terminated, perhaps due to a crash - Ok(Some(_)) => {} - - // The process is still running so we need to attempt to kill it - _ => { - if self.pg_ctl_stop().is_err() { - // Couldn't gracefully stop server so we'll just kill it - self.inner.kill().expect("postgres couldn't be killed"); - } - self.inner.wait().unwrap(); - } - } - - // Dump the contents of stdout/stderr if TRACE is enabled - if tracing::event_enabled!(tracing::Level::TRACE) { - if let Ok((stdout, stderr)) = self.dump_stdout_stderr() { - trace!("stdout: {stdout}"); - trace!("stderr: {stderr}"); - } - } - } -} - -/// Local instance of a `postgres` server. -/// -/// See for more info. -pub struct LocalDatabase { - dir: PathBuf, - port: u16, - url: Url, - process: Option, -} - impl LocalDatabase { /// Start a local `postgres` database service. /// @@ -235,12 +160,87 @@ impl LocalDatabase { } } -#[derive(Debug)] -enum HealthCheckError { - NotRunning, - NotReady, - #[allow(unused)] - Unknown(String), +impl PostgresProcess { + fn start(dir: PathBuf, port: u16) -> Result { + let child = Command::new("postgres") + // Set the data directory to use + .arg("-D") + .arg(&dir) + // Set the port to listen for incoming connections + .args(["-p", &port.to_string()]) + // Disable creating and listening on a UDS + .args(["-c", "unix_socket_directories="]) + // pipe stdout and stderr to files located in the data directory + .stdout( + OpenOptions::new() + .create(true) + .append(true) + .open(dir.join("stdout"))?, + ) + .stderr( + OpenOptions::new() + .create(true) + .append(true) + .open(dir.join("stderr"))?, + ) + .spawn() + .context("command not found: postgres")?; + + Ok(Self { dir, inner: child }) + } + + // https://www.postgresql.org/docs/16/app-pg-ctl.html + fn pg_ctl_stop(&mut self) -> Result<()> { + let output = Command::new("pg_ctl") + .arg("stop") + .arg("-D") + .arg(&self.dir) + .arg("-mfast") + .output() + .context("command not found: pg_ctl")?; + + if output.status.success() { + Ok(()) + } else { + Err(anyhow!("couldn't shut down postgres")) + } + } + + fn dump_stdout_stderr(&self) -> Result<(String, String)> { + let stdout = std::fs::read_to_string(self.dir.join("stdout"))?; + let stderr = std::fs::read_to_string(self.dir.join("stderr"))?; + + Ok((stdout, stderr)) + } +} + +impl Drop for PostgresProcess { + // When the Process struct goes out of scope we need to kill the child process + fn drop(&mut self) { + info!("dropping postgres"); + // check if the process has already been terminated + match self.inner.try_wait() { + // The child process has already terminated, perhaps due to a crash + Ok(Some(_)) => {} + + // The process is still running so we need to attempt to kill it + _ => { + if self.pg_ctl_stop().is_err() { + // Couldn't gracefully stop server so we'll just kill it + self.inner.kill().expect("postgres couldn't be killed"); + } + self.inner.wait().unwrap(); + } + } + + // Dump the contents of stdout/stderr if TRACE is enabled + if event_enabled!(tracing::Level::TRACE) { + if let Ok((stdout, stderr)) = self.dump_stdout_stderr() { + trace!("stdout: {stdout}"); + trace!("stderr: {stderr}"); + } + } + } } /// Run the postgres `pg_isready` command to get the status of database @@ -320,8 +320,8 @@ fn get_ephemeral_port() -> std::io::Result { mod test { #[tokio::test] async fn smoketest() { + use crate::db::tempdb::TempDb; use crate::db::{Db, DbConfig}; - use crate::tempdb::TempDb; use diesel_async::RunQueryDsl; telemetry_subscribers::init_for_testing(); diff --git a/crates/sui-indexer-alt/src/lib.rs b/crates/sui-indexer-alt/src/lib.rs index bf7dd732523b5..fdfe8057966dd 100644 --- a/crates/sui-indexer-alt/src/lib.rs +++ b/crates/sui-indexer-alt/src/lib.rs @@ -23,7 +23,6 @@ pub mod models; pub mod pipeline; pub mod schema; pub mod task; -pub mod tempdb; pub struct Indexer { /// Connection pool to the database.