Skip to content

Commit

Permalink
comments: submodule and reorg codes
Browse files Browse the repository at this point in the history
  • Loading branch information
gegaowp committed Oct 31, 2024
1 parent bec36b2 commit e66ce7d
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::time::Duration;
use tracing::info;
use url::Url;

pub(crate) mod tempdb;

const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");

#[derive(Clone)]
Expand Down Expand Up @@ -173,9 +175,9 @@ pub async fn reset_database(
#[cfg(test)]
mod tests {
use super::*;
use crate::tempdb::TempDb;
use diesel::prelude::QueryableByName;
use diesel_async::RunQueryDsl;
use tempdb::TempDb;

#[derive(QueryableByName)]
struct CountResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
process::{Child, Command},
time::{Duration, Instant},
};
use tracing::trace;
use tracing::{event_enabled, info, trace};
use url::Url;

/// A temporary, local postgres database
Expand All @@ -26,10 +26,34 @@ pub struct TempDb {
dir: tempfile::TempDir,
}

/// Local instance of a `postgres` server.
///
/// See <https://www.postgresql.org/docs/16/app-postgres.html> for more info.
pub struct LocalDatabase {
dir: PathBuf,
port: u16,
url: Url,
process: Option<PostgresProcess>,
}

#[derive(Debug)]
struct PostgresProcess {
dir: PathBuf,
inner: Child,
}

#[derive(Debug)]
enum HealthCheckError {
NotRunning,
NotReady,
#[allow(unused)]
Unknown(String),
}

impl TempDb {
/// Create and start a new temporary postgres database.
///
/// A fresh database will be initialized in a temporary directory that will be cleandup on drop.
/// A fresh database will be initialized in a temporary directory that will be cleand up on drop.
/// The running `postgres` service will be serving traffic on an available, os-assigned port.
pub fn new() -> Result<Self> {
let dir = tempfile::TempDir::new()?;
Expand All @@ -53,105 +77,6 @@ impl TempDb {
}
}

#[derive(Debug)]
struct PostgresProcess {
dir: PathBuf,
inner: Child,
}

impl PostgresProcess {
fn start(dir: PathBuf, port: u16) -> Result<Self> {
let child = Command::new("postgres")
// Set the data directory to use
.arg("-D")
.arg(&dir)
// Set the port to listen for incoming connections
.args(["-p", &port.to_string()])
// Disable creating and listening on a UDS
.args(["-c", "unix_socket_directories="])
// pipe stdout and stderr to files located in the data directory
.stdout(
OpenOptions::new()
.create(true)
.append(true)
.open(dir.join("stdout"))?,
)
.stderr(
OpenOptions::new()
.create(true)
.append(true)
.open(dir.join("stderr"))?,
)
.spawn()
.context("command not found: postgres")?;

Ok(Self { dir, inner: child })
}

// https://www.postgresql.org/docs/16/app-pg-ctl.html
fn pg_ctl_stop(&mut self) -> Result<()> {
let output = Command::new("pg_ctl")
.arg("stop")
.arg("-D")
.arg(&self.dir)
.arg("-mfast")
.output()
.context("command not found: pg_ctl")?;

if output.status.success() {
Ok(())
} else {
Err(anyhow!("couldn't shut down postgres"))
}
}

fn dump_stdout_stderr(&self) -> Result<(String, String)> {
let stdout = std::fs::read_to_string(self.dir.join("stdout"))?;
let stderr = std::fs::read_to_string(self.dir.join("stderr"))?;

Ok((stdout, stderr))
}
}

impl Drop for PostgresProcess {
// When the Process struct goes out of scope we need to kill the child process
fn drop(&mut self) {
tracing::error!("dropping postgres");
// check if the process has already been terminated
match self.inner.try_wait() {
// The child process has already terminated, perhaps due to a crash
Ok(Some(_)) => {}

// The process is still running so we need to attempt to kill it
_ => {
if self.pg_ctl_stop().is_err() {
// Couldn't gracefully stop server so we'll just kill it
self.inner.kill().expect("postgres couldn't be killed");
}
self.inner.wait().unwrap();
}
}

// Dump the contents of stdout/stderr if TRACE is enabled
if tracing::event_enabled!(tracing::Level::TRACE) {
if let Ok((stdout, stderr)) = self.dump_stdout_stderr() {
trace!("stdout: {stdout}");
trace!("stderr: {stderr}");
}
}
}
}

/// Local instance of a `postgres` server.
///
/// See <https://www.postgresql.org/docs/16/app-postgres.html> for more info.
pub struct LocalDatabase {
dir: PathBuf,
port: u16,
url: Url,
process: Option<PostgresProcess>,
}

impl LocalDatabase {
/// Start a local `postgres` database service.
///
Expand Down Expand Up @@ -235,12 +160,87 @@ impl LocalDatabase {
}
}

#[derive(Debug)]
enum HealthCheckError {
NotRunning,
NotReady,
#[allow(unused)]
Unknown(String),
impl PostgresProcess {
fn start(dir: PathBuf, port: u16) -> Result<Self> {
let child = Command::new("postgres")
// Set the data directory to use
.arg("-D")
.arg(&dir)
// Set the port to listen for incoming connections
.args(["-p", &port.to_string()])
// Disable creating and listening on a UDS
.args(["-c", "unix_socket_directories="])
// pipe stdout and stderr to files located in the data directory
.stdout(
OpenOptions::new()
.create(true)
.append(true)
.open(dir.join("stdout"))?,
)
.stderr(
OpenOptions::new()
.create(true)
.append(true)
.open(dir.join("stderr"))?,
)
.spawn()
.context("command not found: postgres")?;

Ok(Self { dir, inner: child })
}

// https://www.postgresql.org/docs/16/app-pg-ctl.html
fn pg_ctl_stop(&mut self) -> Result<()> {
let output = Command::new("pg_ctl")
.arg("stop")
.arg("-D")
.arg(&self.dir)
.arg("-mfast")
.output()
.context("command not found: pg_ctl")?;

if output.status.success() {
Ok(())
} else {
Err(anyhow!("couldn't shut down postgres"))
}
}

fn dump_stdout_stderr(&self) -> Result<(String, String)> {
let stdout = std::fs::read_to_string(self.dir.join("stdout"))?;
let stderr = std::fs::read_to_string(self.dir.join("stderr"))?;

Ok((stdout, stderr))
}
}

impl Drop for PostgresProcess {
// When the Process struct goes out of scope we need to kill the child process
fn drop(&mut self) {
info!("dropping postgres");
// check if the process has already been terminated
match self.inner.try_wait() {
// The child process has already terminated, perhaps due to a crash
Ok(Some(_)) => {}

// The process is still running so we need to attempt to kill it
_ => {
if self.pg_ctl_stop().is_err() {
// Couldn't gracefully stop server so we'll just kill it
self.inner.kill().expect("postgres couldn't be killed");
}
self.inner.wait().unwrap();
}
}

// Dump the contents of stdout/stderr if TRACE is enabled
if event_enabled!(tracing::Level::TRACE) {
if let Ok((stdout, stderr)) = self.dump_stdout_stderr() {
trace!("stdout: {stdout}");
trace!("stderr: {stderr}");
}
}
}
}

/// Run the postgres `pg_isready` command to get the status of database
Expand Down Expand Up @@ -320,8 +320,8 @@ fn get_ephemeral_port() -> std::io::Result<u16> {
mod test {
#[tokio::test]
async fn smoketest() {
use crate::db::tempdb::TempDb;
use crate::db::{Db, DbConfig};
use crate::tempdb::TempDb;
use diesel_async::RunQueryDsl;

telemetry_subscribers::init_for_testing();
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ pub mod models;
pub mod pipeline;
pub mod schema;
pub mod task;
pub mod tempdb;

pub struct Indexer {
/// Connection pool to the database.
Expand Down

0 comments on commit e66ce7d

Please sign in to comment.