diff --git a/Cargo.lock b/Cargo.lock index 30b4a7d345ab2..37c9abee35739 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13949,6 +13949,7 @@ dependencies = [ "serde", "sui-field-count", "sui-pg-temp-db", + "sui-protocol-config", "sui-storage", "sui-types", "telemetry-subscribers", diff --git a/crates/sui-indexer-alt/Cargo.toml b/crates/sui-indexer-alt/Cargo.toml index 7ca87940bbb8b..57dd7451a6455 100644 --- a/crates/sui-indexer-alt/Cargo.toml +++ b/crates/sui-indexer-alt/Cargo.toml @@ -38,6 +38,7 @@ url.workspace = true mysten-metrics.workspace = true sui-field-count.workspace = true sui-pg-temp-db.workspace = true +sui-protocol-config.workspace = true sui-storage.workspace = true sui-types.workspace = true diff --git a/crates/sui-indexer-alt/migrations/2024-10-19-113135_ev_indices/up.sql b/crates/sui-indexer-alt/migrations/2024-10-19-113135_ev_indices/up.sql index 8e553a30bfa0c..3b27698a1e738 100644 --- a/crates/sui-indexer-alt/migrations/2024-10-19-113135_ev_indices/up.sql +++ b/crates/sui-indexer-alt/migrations/2024-10-19-113135_ev_indices/up.sql @@ -1,8 +1,8 @@ CREATE TABLE IF NOT EXISTS ev_emit_mod ( - package BYTEA, - module TEXT, - tx_sequence_number BIGINT, + package BYTEA NOT NULL, + module TEXT NOT NULL, + tx_sequence_number BIGINT NOT NULL, sender BYTEA NOT NULL, PRIMARY KEY(package, module, tx_sequence_number) ); @@ -21,12 +21,12 @@ ON ev_emit_mod (sender, package, tx_sequence_number); CREATE TABLE IF NOT EXISTS ev_struct_inst ( - package BYTEA, - module TEXT, - name TEXT, + package BYTEA NOT NULL, + module TEXT NOT NULL, + name TEXT NOT NULL, -- BCS encoded array of TypeTags for type parameters. - instantiation BYTEA, - tx_sequence_number BIGINT, + instantiation BYTEA NOT NULL, + tx_sequence_number BIGINT NOT NULL, sender BYTEA NOT NULL, PRIMARY KEY(package, module, name, instantiation, tx_sequence_number) ); diff --git a/crates/sui-indexer-alt/migrations/2024-10-31-000319_sum_packages/down.sql b/crates/sui-indexer-alt/migrations/2024-10-31-000319_sum_packages/down.sql new file mode 100644 index 0000000000000..b8bbded70bf47 --- /dev/null +++ b/crates/sui-indexer-alt/migrations/2024-10-31-000319_sum_packages/down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS sum_packages; diff --git a/crates/sui-indexer-alt/migrations/2024-10-31-000319_sum_packages/up.sql b/crates/sui-indexer-alt/migrations/2024-10-31-000319_sum_packages/up.sql new file mode 100644 index 0000000000000..64a9f75b9d6fc --- /dev/null +++ b/crates/sui-indexer-alt/migrations/2024-10-31-000319_sum_packages/up.sql @@ -0,0 +1,14 @@ +CREATE TABLE IF NOT EXISTS sum_packages +( + package_id BYTEA PRIMARY KEY, + original_id BYTEA NOT NULL, + package_version BIGINT NOT NULL, + move_package BYTEA NOT NULL, + cp_sequence_number BIGINT NOT NULL +); + +CREATE INDEX IF NOT EXISTS sum_packages_cp_id_version +ON sum_packages (cp_sequence_number, original_id, package_version); + +CREATE INDEX IF NOT EXISTS sum_packages_id_version_cp +ON sum_packages (original_id, package_version, cp_sequence_number); diff --git a/crates/sui-indexer-alt/migrations/2024-10-31-174742_sum_displays/down.sql b/crates/sui-indexer-alt/migrations/2024-10-31-174742_sum_displays/down.sql new file mode 100644 index 0000000000000..1a3f10c1f6dbf --- /dev/null +++ b/crates/sui-indexer-alt/migrations/2024-10-31-174742_sum_displays/down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS sum_displays; diff --git a/crates/sui-indexer-alt/migrations/2024-10-31-174742_sum_displays/up.sql b/crates/sui-indexer-alt/migrations/2024-10-31-174742_sum_displays/up.sql new file mode 100644 index 
0000000000000..873f0a581cc2b --- /dev/null +++ b/crates/sui-indexer-alt/migrations/2024-10-31-174742_sum_displays/up.sql @@ -0,0 +1,13 @@ +-- This table tracks the latest versions of `Display`, keyed by Object type. +CREATE TABLE IF NOT EXISTS sum_displays +( + -- BCS-encoded StructTag of the object that this Display belongs to. + object_type BYTEA PRIMARY KEY, + -- Object ID of the Display object + display_id BYTEA NOT NULL, + -- Version of the Display object (In the VersionUpdate event this is stored as a u16) + display_version SMALLINT NOT NULL, + -- BCS-encoded content of DisplayVersionUpdatedEvent that was indexed into + -- this record. + display BYTEA NOT NULL ); diff --git a/crates/sui-indexer-alt/migrations/2024-11-01-182359_kv_genesis/down.sql b/crates/sui-indexer-alt/migrations/2024-11-01-182359_kv_genesis/down.sql new file mode 100644 index 0000000000000..c16873902a63c --- /dev/null +++ b/crates/sui-indexer-alt/migrations/2024-11-01-182359_kv_genesis/down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS kv_genesis; diff --git a/crates/sui-indexer-alt/migrations/2024-11-01-182359_kv_genesis/up.sql b/crates/sui-indexer-alt/migrations/2024-11-01-182359_kv_genesis/up.sql new file mode 100644 index 0000000000000..d3ef9a887ffa0 --- /dev/null +++ b/crates/sui-indexer-alt/migrations/2024-11-01-182359_kv_genesis/up.sql @@ -0,0 +1,12 @@ +-- Stores information related to the genesis checkpoint. +CREATE TABLE IF NOT EXISTS kv_genesis +( + -- The checkpoint digest of the genesis checkpoint + genesis_digest BYTEA PRIMARY KEY, + -- The protocol version from the genesis system state + initial_protocol_version BIGINT NOT NULL +); + +-- Index to ensure there can only be one row in the genesis table. +CREATE UNIQUE INDEX IF NOT EXISTS kv_genesis_unique +ON kv_genesis ((0)); diff --git a/crates/sui-indexer-alt/src/bootstrap.rs b/crates/sui-indexer-alt/src/bootstrap.rs new file mode 100644 index 0000000000000..5b80a8fb8fc52 --- /dev/null +++ b/crates/sui-indexer-alt/src/bootstrap.rs @@ -0,0 +1,108 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::time::Duration; + +use anyhow::{bail, Context, Result}; +use diesel::{OptionalExtension, QueryDsl, SelectableHelper}; +use diesel_async::RunQueryDsl; +use sui_types::{ + full_checkpoint_content::CheckpointData, + sui_system_state::{get_sui_system_state, SuiSystemStateTrait}, + transaction::{TransactionDataAPI, TransactionKind}, +}; +use tokio_util::sync::CancellationToken; +use tracing::info; + +use crate::{ + models::checkpoints::StoredGenesis, schema::kv_genesis, task::graceful_shutdown, Indexer, +}; + +/// Ensures the genesis table has been populated before the rest of the indexer is run, and returns +/// the information stored there. If the database has been bootstrapped before, this function will +/// simply read the previously bootstrapped information. Otherwise, it will wait until the first +/// checkpoint is available and extract the necessary information from there. +/// +/// Can be cancelled via the `cancel` token, or through an interrupt signal (which will also cancel +/// the token). +pub async fn bootstrap( + indexer: &Indexer, + retry_interval: Duration, + cancel: CancellationToken, +) -> Result<StoredGenesis> { + let Ok(mut conn) = indexer.db().connect().await else { + bail!("Bootstrap failed to get connection for DB"); + }; + + // If the row has already been written, return it. + if let Some(genesis) = kv_genesis::table + .select(StoredGenesis::as_select()) + .first(&mut conn) + .await + .optional()? 
+ { + info!( + chain = genesis.chain()?.as_str(), + protocol = ?genesis.initial_protocol_version(), + "Indexer already bootstrapped", + ); + + return Ok(genesis); + } + + // Otherwise, extract the necessary information from the genesis checkpoint: + // + // - Get the Genesis system transaction from the genesis checkpoint. + // - Get the system state object that was written out by the system transaction. + let ingestion_client = indexer.ingestion_client().clone(); + let wait_cancel = cancel.clone(); + let genesis = tokio::spawn(async move { + ingestion_client + .wait_for(0, retry_interval, &wait_cancel) + .await + }); + + let Some(genesis_checkpoint) = graceful_shutdown(vec![genesis], cancel).await.pop() else { + bail!("Bootstrap cancelled"); + }; + + let genesis_checkpoint = genesis_checkpoint.context("Failed to fetch genesis checkpoint")?; + + let CheckpointData { + checkpoint_summary, + transactions, + .. + } = genesis_checkpoint.as_ref(); + + let Some(genesis_transaction) = transactions.iter().find(|tx| { + matches!( + tx.transaction.intent_message().value.kind(), + TransactionKind::Genesis(_) + ) + }) else { + bail!("Could not find Genesis transaction"); + }; + + let system_state = get_sui_system_state(&genesis_transaction.output_objects.as_slice()) + .context("Failed to get Genesis SystemState")?; + + let genesis = StoredGenesis { + genesis_digest: checkpoint_summary.digest().inner().to_vec(), + initial_protocol_version: system_state.protocol_version() as i64, + }; + + info!( + chain = genesis.chain()?.as_str(), + protocol = ?genesis.initial_protocol_version(), + "Bootstrapped indexer", + ); + + diesel::insert_into(kv_genesis::table) + .values(&genesis) + .on_conflict_do_nothing() + .execute(&mut conn) + .await + .context("Failed to write genesis record")?; + + Ok(genesis) } diff --git a/crates/sui-indexer-alt/src/handlers/mod.rs b/crates/sui-indexer-alt/src/handlers/mod.rs index fd71fd7b339bd..925cf54de75ed 100644 --- a/crates/sui-indexer-alt/src/handlers/mod.rs +++ b/crates/sui-indexer-alt/src/handlers/mod.rs @@ -8,7 +8,9 @@ pub mod kv_objects; pub mod kv_transactions; pub mod obj_versions; pub mod sum_coin_balances; +pub mod sum_displays; pub mod sum_obj_types; +pub mod sum_packages; pub mod tx_affected_addresses; pub mod tx_affected_objects; pub mod tx_balance_changes; diff --git a/crates/sui-indexer-alt/src/handlers/sum_displays.rs b/crates/sui-indexer-alt/src/handlers/sum_displays.rs new file mode 100644 index 0000000000000..dfbfd0c888bbd --- /dev/null +++ b/crates/sui-indexer-alt/src/handlers/sum_displays.rs @@ -0,0 +1,89 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::{collections::BTreeMap, sync::Arc}; + +use anyhow::{anyhow, Result}; +use diesel::{upsert::excluded, ExpressionMethods}; +use diesel_async::RunQueryDsl; +use futures::future::try_join_all; +use sui_types::{display::DisplayVersionUpdatedEvent, full_checkpoint_content::CheckpointData}; + +use crate::{ + db, + models::displays::StoredDisplay, + pipeline::{sequential::Handler, Processor}, + schema::sum_displays, +}; + +const CHUNK_ROWS: usize = i16::MAX as usize / 4; + +pub struct SumDisplays; + +impl Processor for SumDisplays { + const NAME: &'static str = "sum_displays"; + + type Value = StoredDisplay; + + fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> { + let CheckpointData { transactions, .. 
} = checkpoint.as_ref(); + + let mut values = vec![]; + for tx in transactions { + let Some(events) = &tx.events else { + continue; + }; + + for event in &events.data { + let Some((object_type, update)) = DisplayVersionUpdatedEvent::try_from_event(event) + else { + continue; + }; + + values.push(StoredDisplay { + object_type: bcs::to_bytes(&object_type).map_err(|e| { + anyhow!( + "Error serializing object type {}: {e}", + object_type.to_canonical_display(/* with_prefix */ true) + ) + })?, + + display_id: update.id.bytes.to_vec(), + display_version: update.version as i16, + display: event.contents.clone(), + }) + } + } + + Ok(values) + } +} + +#[async_trait::async_trait] +impl Handler for SumDisplays { + type Batch = BTreeMap<Vec<u8>, Self::Value>; + + fn batch(batch: &mut Self::Batch, values: Vec<Self::Value>) { + for value in values { + batch.insert(value.object_type.clone(), value); + } + } + + async fn commit(batch: &Self::Batch, conn: &mut db::Connection<'_>) -> Result<usize> { + let values: Vec<_> = batch.values().cloned().collect(); + let updates = values.chunks(CHUNK_ROWS).map(|chunk| { + diesel::insert_into(sum_displays::table) + .values(chunk) + .on_conflict(sum_displays::object_type) + .do_update() + .set(( + sum_displays::display_id.eq(excluded(sum_displays::display_id)), + sum_displays::display_version.eq(excluded(sum_displays::display_version)), + sum_displays::display.eq(excluded(sum_displays::display)), + )) + .execute(conn) + }); + + Ok(try_join_all(updates).await?.into_iter().sum()) + } +} diff --git a/crates/sui-indexer-alt/src/handlers/sum_packages.rs b/crates/sui-indexer-alt/src/handlers/sum_packages.rs new file mode 100644 index 0000000000000..d8c3254c61619 --- /dev/null +++ b/crates/sui-indexer-alt/src/handlers/sum_packages.rs @@ -0,0 +1,85 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::{collections::BTreeMap, sync::Arc}; + +use anyhow::{anyhow, Result}; +use diesel::{upsert::excluded, ExpressionMethods}; +use diesel_async::RunQueryDsl; +use futures::future::try_join_all; +use sui_types::full_checkpoint_content::CheckpointData; + +use crate::{ + db, + models::packages::StoredPackage, + pipeline::{sequential::Handler, Processor}, + schema::sum_packages, +}; + +const CHUNK_ROWS: usize = i16::MAX as usize / 5; + +pub struct SumPackages; + +impl Processor for SumPackages { + const NAME: &'static str = "sum_packages"; + + type Value = StoredPackage; + + fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> { + let CheckpointData { + checkpoint_summary, + transactions, + .. 
+ } = checkpoint.as_ref(); + + let cp_sequence_number = checkpoint_summary.sequence_number as i64; + let mut values = vec![]; + for tx in transactions { + for obj in &tx.output_objects { + let Some(package) = obj.data.try_as_package() else { + continue; + }; + + values.push(StoredPackage { + package_id: obj.id().to_vec(), + original_id: package.original_package_id().to_vec(), + package_version: obj.version().value() as i64, + move_package: bcs::to_bytes(package) + .map_err(|e| anyhow!("Error serializing package {}: {e}", obj.id()))?, + cp_sequence_number, + }); + } + } + + Ok(values) + } +} + +#[async_trait::async_trait] +impl Handler for SumPackages { + type Batch = BTreeMap<Vec<u8>, StoredPackage>; + + fn batch(batch: &mut Self::Batch, values: Vec<Self::Value>) { + for value in values { + batch.insert(value.package_id.clone(), value); + } + } + + async fn commit(batch: &Self::Batch, conn: &mut db::Connection<'_>) -> Result<usize> { + let values: Vec<_> = batch.values().cloned().collect(); + let updates = values.chunks(CHUNK_ROWS).map(|chunk| { + diesel::insert_into(sum_packages::table) + .values(chunk) + .on_conflict(sum_packages::package_id) + .do_update() + .set(( + sum_packages::package_version.eq(excluded(sum_packages::package_version)), + sum_packages::move_package.eq(excluded(sum_packages::move_package)), + sum_packages::cp_sequence_number.eq(excluded(sum_packages::cp_sequence_number)), + )) + .execute(conn) + }); + + Ok(try_join_all(updates).await?.into_iter().sum()) + } +} diff --git a/crates/sui-indexer-alt/src/ingestion/broadcaster.rs b/crates/sui-indexer-alt/src/ingestion/broadcaster.rs index 8b20b2693415e..a16894aef78ac 100644 --- a/crates/sui-indexer-alt/src/ingestion/broadcaster.rs +++ b/crates/sui-indexer-alt/src/ingestion/broadcaster.rs @@ -3,16 +3,15 @@ use std::sync::Arc; -use backoff::backoff::Constant; use futures::{future::try_join_all, TryStreamExt}; use mysten_metrics::spawn_monitored_task; use sui_types::full_checkpoint_content::CheckpointData; use tokio::{sync::mpsc, task::JoinHandle}; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info}; +use tracing::{error, info}; -use crate::{ingestion::error::Error, metrics::IndexerMetrics}; +use crate::ingestion::error::Error; use super::{client::IngestionClient, IngestionConfig}; @@ -25,7 +24,6 @@ use super::{client::IngestionClient, IngestionConfig}; pub(super) fn broadcaster( config: IngestionConfig, client: IngestionClient, - metrics: Arc<IndexerMetrics>, checkpoint_rx: mpsc::Receiver<u64>, subscribers: Vec<mpsc::Sender<Arc<CheckpointData>>>, cancel: CancellationToken, @@ -37,7 +35,6 @@ .map(Ok) .try_for_each_concurrent(/* limit */ config.ingest_concurrency, |cp| { let client = client.clone(); - let metrics = metrics.clone(); let subscribers = subscribers.clone(); // One clone is for the supervisor to signal a cancel if it detects a @@ -46,33 +43,10 @@ let supervisor_cancel = cancel.clone(); let cancel = cancel.clone(); - // Repeatedly retry if the checkpoint is not found, assuming that we are at the - // tip of the network and it will become available soon. 
- let backoff = Constant::new(config.retry_interval); - let fetch = move || { - let client = client.clone(); - let metrics = metrics.clone(); - let cancel = cancel.clone(); - - async move { - use backoff::Error as BE; - if cancel.is_cancelled() { - return Err(BE::permanent(Error::Cancelled)); - } - - client.fetch(cp, &cancel).await.map_err(|e| match e { - Error::NotFound(checkpoint) => { - debug!(checkpoint, "Checkpoint not found, retrying..."); - metrics.total_ingested_not_found_retries.inc(); - BE::transient(e) - } - e => BE::permanent(e), - }) - } - }; - async move { - let checkpoint = backoff::future::retry(backoff, fetch).await?; + // Repeatedly retry if the checkpoint is not found, assuming that we are at the + // tip of the network and it will become available soon. + let checkpoint = client.wait_for(cp, config.retry_interval, &cancel).await?; let futures = subscribers.iter().map(|s| s.send(checkpoint.clone())); if try_join_all(futures).await.is_err() { diff --git a/crates/sui-indexer-alt/src/ingestion/client.rs b/crates/sui-indexer-alt/src/ingestion/client.rs index b16a7c51daef1..0649b21a62287 100644 --- a/crates/sui-indexer-alt/src/ingestion/client.rs +++ b/crates/sui-indexer-alt/src/ingestion/client.rs @@ -6,6 +6,7 @@ use crate::ingestion::remote_client::RemoteIngestionClient; use crate::ingestion::Error as IngestionError; use crate::ingestion::Result as IngestionResult; use crate::metrics::IndexerMetrics; +use backoff::backoff::Constant; use backoff::Error as BE; use backoff::ExponentialBackoff; use std::path::PathBuf; @@ -43,7 +44,7 @@ pub enum FetchError { pub type FetchResult = Result<Bytes, FetchError>; #[derive(Clone)] -pub(crate) struct IngestionClient { +pub struct IngestionClient { client: Arc<dyn IngestionClientTrait>, /// Wrap the metrics in an `Arc` to keep copies of the client cheap. metrics: Arc<IndexerMetrics>, @@ -60,14 +61,50 @@ impl IngestionClient { IngestionClient { client, metrics } } - /// Repeatedly retries transient errors with an exponential backoff (up to [MAX_RETRY_INTERVAL]). - /// Transient errors are either defined by the client implementation that - /// returns a `FetchError::Transient` error variant, or within this function - /// if we fail to deserialize the result as [CheckpointData]. + /// Fetch checkpoint data by sequence number. + /// + /// This function behaves like `IngestionClient::fetch`, but will repeatedly retry the fetch if + /// the checkpoint is not found, on a constant back-off. The time between fetches is controlled + /// by the `retry_interval` parameter. + pub async fn wait_for( + &self, + checkpoint: u64, + retry_interval: Duration, + cancel: &CancellationToken, + ) -> IngestionResult<Arc<CheckpointData>> { + let backoff = Constant::new(retry_interval); + let fetch = || async move { + use backoff::Error as BE; + if cancel.is_cancelled() { + return Err(BE::permanent(IngestionError::Cancelled)); + } + + self.fetch(checkpoint, cancel).await.map_err(|e| match e { + IngestionError::NotFound(checkpoint) => { + debug!(checkpoint, "Checkpoint not found, retrying..."); + self.metrics.total_ingested_not_found_retries.inc(); + BE::transient(e) + } + e => BE::permanent(e), + }) + }; + + backoff::future::retry(backoff, fetch).await + } + + /// Fetch checkpoint data by sequence number. + /// + /// Repeatedly retries transient errors with an exponential backoff (up to + /// [MAX_TRANSIENT_RETRY_INTERVAL]). Transient errors are either defined by the client + /// implementation that returns a [FetchError::Transient] error variant, or within this + /// function if we fail to deserialize the result as [CheckpointData]. 
+ /// + /// The function will immediately return on: - /// - non-transient errors determined by the client implementation, - /// This includes both the FetcherError::NotFound and FetcherError::Permanent variants. - /// - cancellation of the supplied `cancel` token. + /// + /// - Non-transient errors determined by the client implementation, this includes both the + /// [FetchError::NotFound] and [FetchError::Permanent] variants. + /// + /// - Cancellation of the supplied `cancel` token. pub(crate) async fn fetch( &self, checkpoint: u64, diff --git a/crates/sui-indexer-alt/src/ingestion/mod.rs b/crates/sui-indexer-alt/src/ingestion/mod.rs index 46e24b2f33d4e..a27860bd5f054 100644 --- a/crates/sui-indexer-alt/src/ingestion/mod.rs +++ b/crates/sui-indexer-alt/src/ingestion/mod.rs @@ -20,7 +20,7 @@ use crate::ingestion::regulator::regulator; use crate::metrics::IndexerMetrics; mod broadcaster; -mod client; +pub mod client; pub mod error; mod local_client; mod regulator; @@ -31,7 +31,6 @@ mod test_utils; pub struct IngestionService { config: IngestionConfig, client: IngestionClient, - metrics: Arc<IndexerMetrics>, ingest_hi_tx: mpsc::UnboundedSender<(&'static str, u64)>, ingest_hi_rx: mpsc::UnboundedReceiver<(&'static str, u64)>, subscribers: Vec<mpsc::Sender<Arc<CheckpointData>>>, @@ -42,20 +41,20 @@ pub struct IngestionConfig { /// Remote Store to fetch checkpoints from. #[arg(long, required = true, group = "source")] - remote_store_url: Option<Url>, + pub remote_store_url: Option<Url>, /// Path to the local ingestion directory. /// If both remote_store_url and local_ingestion_path are provided, remote_store_url will be used. #[arg(long, required = true, group = "source")] - local_ingestion_path: Option<PathBuf>, + pub local_ingestion_path: Option<PathBuf>, /// Maximum size of checkpoint backlog across all workers downstream of the ingestion service. #[arg(long, default_value_t = 5000)] - checkpoint_buffer_size: usize, + pub checkpoint_buffer_size: usize, /// Maximum number of checkpoints to attempt to fetch concurrently. #[arg(long, default_value_t = 200)] - ingest_concurrency: usize, + pub ingest_concurrency: usize, /// Polling interval to retry fetching checkpoints that do not exist. #[arg( @@ -64,7 +63,7 @@ value_name = "MILLISECONDS", value_parser = |s: &str| s.parse().map(Duration::from_millis) )] - retry_interval: Duration, + pub retry_interval: Duration, } impl IngestionService { @@ -86,7 +85,6 @@ Ok(Self { config, client, - metrics, ingest_hi_tx, ingest_hi_rx, subscribers, @@ -94,6 +92,11 @@ }) } + /// The client this service uses to fetch checkpoints. + pub fn client(&self) -> &IngestionClient { + &self.client + } + /// Add a new subscription to the ingestion service. Note that the service is susceptible to /// the "slow receiver" problem: If one receiver is slower to process checkpoints than the /// checkpoint ingestion rate, it will eventually hold up all receivers. 
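The retry loop that `broadcaster` previously built by hand is now encapsulated in `IngestionClient::wait_for`: a constant (rather than exponential) back-off that treats `NotFound` as transient and a cancelled token as a permanent error. Below is a minimal, self-contained sketch of that same pattern using the `backoff` crate directly (assuming its async `tokio` feature is enabled); the `polls` counter is a hypothetical stand-in for the checkpoint store, not the crate's real client:

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

use backoff::{backoff::Constant, Error as BE};

#[tokio::main]
async fn main() -> Result<(), &'static str> {
    // Hypothetical stand-in for `IngestionClient::fetch`: the checkpoint
    // "appears" on the third poll.
    let polls = AtomicU32::new(0);
    let fetch = || async {
        if polls.fetch_add(1, Ordering::Relaxed) < 2 {
            // Not-found is transient: assume we are at the tip of the network
            // and the checkpoint will become available soon.
            Err(BE::transient("not found"))
        } else {
            Ok(42u64)
        }
    };

    // Constant back-off: poll at a fixed interval, unlike the exponential
    // back-off that `fetch` itself uses for genuinely transient errors.
    let backoff = Constant::new(Duration::from_millis(200));
    let checkpoint = backoff::future::retry(backoff, fetch).await?;
    assert_eq!(checkpoint, 42);
    Ok(())
}
```

Folding cancellation in as a permanent error is what lets callers such as `bootstrap` give up cleanly on shutdown instead of polling forever.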
@@ -136,7 +139,6 @@ impl IngestionService { let IngestionService { config, client, - metrics, ingest_hi_tx: _, ingest_hi_rx, subscribers, @@ -157,14 +159,7 @@ cancel.clone(), ); - let broadcaster = broadcaster( - config, - client, - metrics, - checkpoint_rx, - subscribers, - cancel.clone(), - ); + let broadcaster = broadcaster(config, client, checkpoint_rx, subscribers, cancel.clone()); Ok((regulator, broadcaster)) } diff --git a/crates/sui-indexer-alt/src/lib.rs b/crates/sui-indexer-alt/src/lib.rs index 65017bf517b2f..e8e917aecf73d 100644 --- a/crates/sui-indexer-alt/src/lib.rs +++ b/crates/sui-indexer-alt/src/lib.rs @@ -5,7 +5,7 @@ use std::{collections::BTreeSet, net::SocketAddr, sync::Arc}; use anyhow::{ensure, Context, Result}; use db::{Db, DbConfig}; -use ingestion::{IngestionConfig, IngestionService}; +use ingestion::{client::IngestionClient, IngestionConfig, IngestionService}; use metrics::{IndexerMetrics, MetricsService}; use models::watermarks::CommitterWatermark; use pipeline::{concurrent, sequential, PipelineConfig, Processor}; @@ -15,6 +15,7 @@ use tokio_util::sync::CancellationToken; use tracing::info; pub mod args; +pub mod bootstrap; pub mod db; pub mod handlers; pub mod ingestion; @@ -134,6 +135,16 @@ impl Indexer { }) } + /// The database connection pool used by the indexer. + pub fn db(&self) -> &Db { + &self.db + } + + /// The ingestion client used by the indexer to fetch checkpoints. + pub fn ingestion_client(&self) -> &IngestionClient { + self.ingestion_service.client() + } + /// Adds a new pipeline to this indexer and starts it up. Although their tasks have started, /// they will be idle until the ingestion service starts, and serves it checkpoint data. /// diff --git a/crates/sui-indexer-alt/src/main.rs b/crates/sui-indexer-alt/src/main.rs index 2f656f36f8d8e..3c5a5f3fbce2d 100644 --- a/crates/sui-indexer-alt/src/main.rs +++ b/crates/sui-indexer-alt/src/main.rs @@ -4,16 +4,18 @@ use anyhow::{Context, Result}; use clap::Parser; use sui_indexer_alt::args::Command; +use sui_indexer_alt::bootstrap::bootstrap; use sui_indexer_alt::db::reset_database; use sui_indexer_alt::{ args::Args, handlers::{ ev_emit_mod::EvEmitMod, ev_struct_inst::EvStructInst, kv_checkpoints::KvCheckpoints, kv_objects::KvObjects, kv_transactions::KvTransactions, obj_versions::ObjVersions, - sum_coin_balances::SumCoinBalances, sum_obj_types::SumObjTypes, - tx_affected_addresses::TxAffectedAddress, tx_affected_objects::TxAffectedObjects, - tx_balance_changes::TxBalanceChanges, tx_calls_fun::TxCallsFun, tx_digests::TxDigests, - tx_kinds::TxKinds, wal_coin_balances::WalCoinBalances, wal_obj_types::WalObjTypes, + sum_coin_balances::SumCoinBalances, sum_displays::SumDisplays, sum_obj_types::SumObjTypes, + sum_packages::SumPackages, tx_affected_addresses::TxAffectedAddress, + tx_affected_objects::TxAffectedObjects, tx_balance_changes::TxBalanceChanges, + tx_calls_fun::TxCallsFun, tx_digests::TxDigests, tx_kinds::TxKinds, + wal_coin_balances::WalCoinBalances, wal_obj_types::WalObjTypes, }, Indexer, }; @@ -35,8 +37,11 @@ async fn main() -> Result<()> { indexer, consistent_range: lag, } => { + let retry_interval = indexer.ingestion_config.retry_interval; let mut indexer = Indexer::new(args.db_config, indexer, cancel.clone()).await?; + bootstrap(&indexer, retry_interval, cancel.clone()).await?; + indexer.concurrent_pipeline::<EvEmitMod>().await?; indexer.concurrent_pipeline::<EvStructInst>().await?; indexer.concurrent_pipeline::<KvCheckpoints>().await?; @@ -53,7 +58,9 @@ 
indexer.concurrent_pipeline::<WalCoinBalances>().await?; indexer.concurrent_pipeline::<WalObjTypes>().await?; indexer.sequential_pipeline::<SumCoinBalances>(lag).await?; + indexer.sequential_pipeline::<SumDisplays>(None).await?; indexer.sequential_pipeline::<SumObjTypes>(lag).await?; + indexer.sequential_pipeline::<SumPackages>(None).await?; let h_indexer = indexer.run().await.context("Failed to start indexer")?; diff --git a/crates/sui-indexer-alt/src/models/checkpoints.rs b/crates/sui-indexer-alt/src/models/checkpoints.rs index 5a6cf3bcb20d6..e4ffe999433a1 100644 --- a/crates/sui-indexer-alt/src/models/checkpoints.rs +++ b/crates/sui-indexer-alt/src/models/checkpoints.rs @@ -1,9 +1,13 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::schema::kv_checkpoints; +use anyhow::{anyhow, Result}; use diesel::prelude::*; use sui_field_count::FieldCount; +use sui_protocol_config::{Chain, ProtocolVersion}; +use sui_types::digests::{ChainIdentifier, CheckpointDigest}; + +use crate::schema::{kv_checkpoints, kv_genesis}; #[derive(Insertable, Debug, Clone, FieldCount)] #[diesel(table_name = kv_checkpoints)] @@ -14,3 +18,32 @@ pub struct StoredCheckpoint { /// BCS serialized CheckpointContents pub checkpoint_contents: Vec<u8>, } + +#[derive(Insertable, Selectable, Queryable, Debug, Clone)] +#[diesel(table_name = kv_genesis)] +pub struct StoredGenesis { + pub genesis_digest: Vec<u8>, + pub initial_protocol_version: i64, +} + +impl StoredGenesis { + /// Try to identify the chain that this indexer is indexing based on its genesis checkpoint + /// digest. + pub fn chain(&self) -> Result<Chain> { + let bytes: [u8; 32] = self + .genesis_digest + .clone() + .try_into() + .map_err(|_| anyhow!("Bad genesis digest"))?; + + let digest = CheckpointDigest::new(bytes); + let identifier = ChainIdentifier::from(digest); + + Ok(identifier.chain()) + } + + /// The protocol version that the chain was started at. + pub fn initial_protocol_version(&self) -> ProtocolVersion { + ProtocolVersion::new(self.initial_protocol_version as u64) + } +} diff --git a/crates/sui-indexer-alt/src/models/displays.rs b/crates/sui-indexer-alt/src/models/displays.rs new file mode 100644 index 0000000000000..afde5740098d8 --- /dev/null +++ b/crates/sui-indexer-alt/src/models/displays.rs @@ -0,0 +1,15 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use diesel::prelude::*; + +use crate::schema::sum_displays; + +#[derive(Insertable, Debug, Clone)] +#[diesel(table_name = sum_displays, primary_key(object_type))] +pub struct StoredDisplay { + pub object_type: Vec<u8>, + pub display_id: Vec<u8>, + pub display_version: i16, + pub display: Vec<u8>, +} diff --git a/crates/sui-indexer-alt/src/models/mod.rs b/crates/sui-indexer-alt/src/models/mod.rs index b20e260b29176..df7c6c25238cf 100644 --- a/crates/sui-indexer-alt/src/models/mod.rs +++ b/crates/sui-indexer-alt/src/models/mod.rs @@ -2,7 +2,9 @@ // SPDX-License-Identifier: Apache-2.0 pub mod checkpoints; +pub mod displays; pub mod events; pub mod objects; +pub mod packages; pub mod transactions; pub mod watermarks; diff --git a/crates/sui-indexer-alt/src/models/packages.rs b/crates/sui-indexer-alt/src/models/packages.rs new file mode 100644 index 0000000000000..920419dd89e77 --- /dev/null +++ b/crates/sui-indexer-alt/src/models/packages.rs @@ -0,0 +1,16 @@ +// Copyright (c) Mysten Labs, Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +use diesel::prelude::*; + +use crate::schema::sum_packages; + +#[derive(Insertable, Debug, Clone)] +#[diesel(table_name = sum_packages, primary_key(package_id))] +pub struct StoredPackage { + pub package_id: Vec<u8>, + pub original_id: Vec<u8>, + pub package_version: i64, + pub move_package: Vec<u8>, + pub cp_sequence_number: i64, +} diff --git a/crates/sui-indexer-alt/src/schema.rs b/crates/sui-indexer-alt/src/schema.rs index 432a88382d4e5..d7febe3eea6aa 100644 --- a/crates/sui-indexer-alt/src/schema.rs +++ b/crates/sui-indexer-alt/src/schema.rs @@ -30,6 +30,13 @@ diesel::table! { } } +diesel::table! { + kv_genesis (genesis_digest) { + genesis_digest -> Bytea, + initial_protocol_version -> Int8, + } +} + diesel::table! { kv_objects (object_id, object_version) { object_id -> Bytea, @@ -68,6 +75,15 @@ diesel::table! { } } +diesel::table! { + sum_displays (object_type) { + object_type -> Bytea, + display_id -> Bytea, + display_version -> Int2, + display -> Bytea, + } +} + diesel::table! { sum_obj_types (object_id) { object_id -> Bytea, @@ -81,6 +97,16 @@ diesel::table! { } } +diesel::table! { + sum_packages (package_id) { + package_id -> Bytea, + original_id -> Bytea, + package_version -> Int8, + move_package -> Bytea, + cp_sequence_number -> Int8, + } +} + diesel::table! { tx_affected_addresses (affected, tx_sequence_number) { affected -> Bytea, @@ -171,11 +197,14 @@ diesel::allow_tables_to_appear_in_same_query!( ev_emit_mod, ev_struct_inst, kv_checkpoints, + kv_genesis, kv_objects, kv_transactions, obj_versions, sum_coin_balances, + sum_displays, sum_obj_types, + sum_packages, tx_affected_addresses, tx_affected_objects, tx_balance_changes, diff --git a/crates/sui-indexer-alt/src/task.rs b/crates/sui-indexer-alt/src/task.rs index d027541a78310..f0c59183942f7 100644 --- a/crates/sui-indexer-alt/src/task.rs +++ b/crates/sui-indexer-alt/src/task.rs @@ -9,10 +9,12 @@ use tokio_util::sync::CancellationToken; /// Manages cleanly exiting the process, either because one of its constituent services has stopped /// or because an interrupt signal was sent to the process. -pub async fn graceful_shutdown( - services: impl IntoIterator<Item = JoinHandle<()>>, +/// +/// Returns the exit values from all services that exited successfully. +pub async fn graceful_shutdown<T>( + services: impl IntoIterator<Item = JoinHandle<T>>, cancel: CancellationToken, -) { +) -> Vec<T> { // If the service is naturally winding down, we don't need to wait for an interrupt signal. // This channel is used to short-circuit the await in that case. let (cancel_ctrl_c_tx, cancel_ctrl_c_rx) = oneshot::channel(); @@ -24,20 +26,23 @@ pub async fn graceful_shutdown( _ = signal::ctrl_c() => cancel.cancel(), } - Ok(()) + None }; tokio::pin!(interrupt); let futures: Vec<_> = services .into_iter() - .map(Either::Left) + .map(|s| Either::Left(Box::pin(async move { s.await.ok() }))) .chain(iter::once(Either::Right(interrupt))) .collect(); // Wait for the first service to finish, or for an interrupt signal. - let (_, _, rest) = future::select_all(futures).await; + let (first, _, rest) = future::select_all(futures).await; let _ = cancel_ctrl_c_tx.send(()); // Wait for the remaining services to finish. - let _ = future::join_all(rest).await; + let mut results = vec![]; + results.extend(first); + results.extend(future::join_all(rest).await.into_iter().flatten()); + results }
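A closing note on the `CHUNK_ROWS` constants in the two new handlers (`i16::MAX as usize / 4` for `sum_displays`, `/ 5` for `sum_packages`): these appear to be sized against Postgres's bind-parameter limit. The wire protocol counts a statement's bind parameters with a signed 16-bit integer, so a multi-row `INSERT` can carry at most `i16::MAX` (32,767) parameters, one per column per row, which makes the divisor the column count of the target table. A small, runnable sketch of that arithmetic, with the constants restated here rather than imported from the crate:

```rust
// Postgres caps a single prepared statement at i16::MAX bind parameters, and
// a multi-row insert consumes one parameter per column per row. The divisor
// is the column count of the target table: 5 for `sum_packages`
// (4 for `sum_displays`).
const COLUMNS: usize = 5;
const CHUNK_ROWS: usize = i16::MAX as usize / COLUMNS;

fn main() {
    // 20,000 hypothetical rows are split into inserts that respect the cap.
    let rows = vec![(); 20_000];
    for (i, chunk) in rows.chunks(CHUNK_ROWS).enumerate() {
        let binds = chunk.len() * COLUMNS;
        assert!(binds <= i16::MAX as usize);
        println!("insert #{i}: {} rows, {binds} bind parameters", chunk.len());
    }
}
```

Chunking in the handler (rather than in the pipeline) keeps each upsert statement independently retryable while still letting `try_join_all` issue the chunks concurrently on one connection pool.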