Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

indexer-alt: sum_packages pipeline #20118

Merged
merged 8 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/sui-indexer-alt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ url.workspace = true
mysten-metrics.workspace = true
sui-field-count.workspace = true
sui-pg-temp-db.workspace = true
sui-protocol-config.workspace = true
sui-storage.workspace = true
sui-types.workspace = true

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
CREATE TABLE IF NOT EXISTS ev_emit_mod
(
package BYTEA,
module TEXT,
tx_sequence_number BIGINT,
package BYTEA NOT NULL,
module TEXT NOT NULL,
tx_sequence_number BIGINT NOT NULL,
sender BYTEA NOT NULL,
PRIMARY KEY(package, module, tx_sequence_number)
);
Expand All @@ -21,12 +21,12 @@ ON ev_emit_mod (sender, package, tx_sequence_number);

CREATE TABLE IF NOT EXISTS ev_struct_inst
(
package BYTEA,
module TEXT,
name TEXT,
package BYTEA NOT NULL,
module TEXT NOT NULL,
name TEXT NOT NULL,
-- BCS encoded array of TypeTags for type parameters.
instantiation BYTEA,
tx_sequence_number BIGINT,
instantiation BYTEA NOT NULL,
tx_sequence_number BIGINT NOT NULL,
sender BYTEA NOT NULL,
PRIMARY KEY(package, module, name, instantiation, tx_sequence_number)
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS sum_packages;
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
CREATE TABLE IF NOT EXISTS sum_packages
amnn marked this conversation as resolved.
Show resolved Hide resolved
(
package_id BYTEA PRIMARY KEY,
original_id BYTEA NOT NULL,
package_version BIGINT NOT NULL,
move_package BYTEA NOT NULL,
cp_sequence_number BIGINT NOT NULL
);

CREATE INDEX IF NOT EXISTS sum_packages_cp_id_version
ON sum_packages (cp_sequence_number, original_id, package_version);

CREATE INDEX IF NOT EXISTS sum_packages_id_version_cp
ON sum_packages (original_id, package_version, cp_sequence_number);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS sum_displays;
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- This table tracks the latest versions of `Display`, keyed by Object type.
CREATE TABLE IF NOT EXISTS sum_displays
(
-- BCS-encoded StructTag of the object that this Display belongs to.
object_type BYTEA PRIMARY KEY,
-- Object ID of the Display object
display_id BYTEA NOT NULL,
-- Version of the Display object (In the VersionUpdate event this is stored as a u16)
display_version SMALLINT NOT NULL,
-- BCS-encoded content of DisplayVersionUpdatedEvent that was indexed into
-- this record.
display BYTEA NOT NULL
);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS kv_genesis;
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- Stores information related to to the genesis checkpoint.
CREATE TABLE IF NOT EXISTS kv_genesis
(
-- The checkpoint digest of the genesis checkpoint
genesis_digest BYTEA PRIMARY KEY,
-- The protocol version from the gensis system state
initial_protocol_version BIGINT NOT NULL
);

-- Index to ensure there can only be one row in the genesis table.
CREATE UNIQUE INDEX IF NOT EXISTS kv_genesis_unique
ON kv_genesis ((0));
108 changes: 108 additions & 0 deletions crates/sui-indexer-alt/src/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::time::Duration;

use anyhow::{bail, Context, Result};
use diesel::{OptionalExtension, QueryDsl, SelectableHelper};
use diesel_async::RunQueryDsl;
use sui_types::{
full_checkpoint_content::CheckpointData,
sui_system_state::{get_sui_system_state, SuiSystemStateTrait},
transaction::{TransactionDataAPI, TransactionKind},
};
use tokio_util::sync::CancellationToken;
use tracing::info;

use crate::{
models::checkpoints::StoredGenesis, schema::kv_genesis, task::graceful_shutdown, Indexer,
};

/// Ensures the genesis table has been populated before the rest of the indexer is run, and returns
/// the information stored there. If the database has been bootstrapped before, this function will
/// simply read the previously bootstrapped information. Otherwise, it will wait until the first
/// checkpoint is available and extract the necessary information from there.
///
/// Can be cancelled via the `cancel` token, or through an interrupt signal (which will also cancel
/// the token).
pub async fn bootstrap(
indexer: &Indexer,
retry_interval: Duration,
cancel: CancellationToken,
) -> Result<StoredGenesis> {
let Ok(mut conn) = indexer.db().connect().await else {
bail!("Bootstrap failed to get connection for DB");
};

// If the row has already been written, return it.
if let Some(genesis) = kv_genesis::table
.select(StoredGenesis::as_select())
.first(&mut conn)
.await
.optional()?
{
info!(
chain = genesis.chain()?.as_str(),
protocol = ?genesis.initial_protocol_version(),
"Indexer already bootstrapped",
);

return Ok(genesis);
}

// Otherwise, extract the necessary information from the genesis checkpoint:
//
// - Get the Genesis system transaction from the genesis checkpoint.
// - Get the system state object that was written out by the system transaction.
let ingestion_client = indexer.ingestion_client().clone();
let wait_cancel = cancel.clone();
let genesis = tokio::spawn(async move {
ingestion_client
.wait_for(0, retry_interval, &wait_cancel)
.await
});

let Some(genesis_checkpoint) = graceful_shutdown(vec![genesis], cancel).await.pop() else {
bail!("Bootstrap cancelled");
};

let genesis_checkpoint = genesis_checkpoint.context("Failed to fetch genesis checkpoint")?;

let CheckpointData {
checkpoint_summary,
transactions,
..
} = genesis_checkpoint.as_ref();

let Some(genesis_transaction) = transactions.iter().find(|tx| {
matches!(
tx.transaction.intent_message().value.kind(),
TransactionKind::Genesis(_)
)
}) else {
bail!("Could not find Genesis transaction");
};

let system_state = get_sui_system_state(&genesis_transaction.output_objects.as_slice())
.context("Failed to get Genesis SystemState")?;

let genesis = StoredGenesis {
genesis_digest: checkpoint_summary.digest().inner().to_vec(),
initial_protocol_version: system_state.protocol_version() as i64,
};

info!(
chain = genesis.chain()?.as_str(),
protocol = ?genesis.initial_protocol_version(),
"Bootstrapped indexer",
);

diesel::insert_into(kv_genesis::table)
.values(&genesis)
.on_conflict_do_nothing()
.execute(&mut conn)
.await
.context("Failed to write genesis record")?;

Ok(genesis)
}
2 changes: 2 additions & 0 deletions crates/sui-indexer-alt/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ pub mod kv_objects;
pub mod kv_transactions;
pub mod obj_versions;
pub mod sum_coin_balances;
pub mod sum_displays;
pub mod sum_obj_types;
pub mod sum_packages;
pub mod tx_affected_addresses;
pub mod tx_affected_objects;
pub mod tx_balance_changes;
Expand Down
89 changes: 89 additions & 0 deletions crates/sui-indexer-alt/src/handlers/sum_displays.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::BTreeMap, sync::Arc};

use anyhow::{anyhow, Result};
use diesel::{upsert::excluded, ExpressionMethods};
use diesel_async::RunQueryDsl;
use futures::future::try_join_all;
use sui_types::{display::DisplayVersionUpdatedEvent, full_checkpoint_content::CheckpointData};

use crate::{
db,
models::displays::StoredDisplay,
pipeline::{sequential::Handler, Processor},
schema::sum_displays,
};

const CHUNK_ROWS: usize = i16::MAX as usize / 4;

pub struct SumDisplays;

impl Processor for SumDisplays {
const NAME: &'static str = "sum_displays";

type Value = StoredDisplay;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let CheckpointData { transactions, .. } = checkpoint.as_ref();

let mut values = vec![];
for tx in transactions {
let Some(events) = &tx.events else {
continue;
};

for event in &events.data {
let Some((object_type, update)) = DisplayVersionUpdatedEvent::try_from_event(event)
else {
continue;
};

values.push(StoredDisplay {
object_type: bcs::to_bytes(&object_type).map_err(|e| {
anyhow!(
"Error serializing object type {}: {e}",
object_type.to_canonical_display(/* with_prefix */ true)
)
})?,

display_id: update.id.bytes.to_vec(),
display_version: update.version as i16,
display: event.contents.clone(),
})
}
}

Ok(values)
}
}

#[async_trait::async_trait]
impl Handler for SumDisplays {
type Batch = BTreeMap<Vec<u8>, Self::Value>;

fn batch(batch: &mut Self::Batch, values: Vec<Self::Value>) {
for value in values {
batch.insert(value.object_type.clone(), value);
}
}

async fn commit(batch: &Self::Batch, conn: &mut db::Connection<'_>) -> Result<usize> {
let values: Vec<_> = batch.values().cloned().collect();
let updates = values.chunks(CHUNK_ROWS).map(|chunk| {
diesel::insert_into(sum_displays::table)
.values(chunk)
.on_conflict(sum_displays::object_type)
.do_update()
.set((
sum_displays::display_id.eq(excluded(sum_displays::display_id)),
sum_displays::display_version.eq(excluded(sum_displays::display_version)),
sum_displays::display.eq(excluded(sum_displays::display)),
))
.execute(conn)
});

Ok(try_join_all(updates).await?.into_iter().sum())
}
}
85 changes: 85 additions & 0 deletions crates/sui-indexer-alt/src/handlers/sum_packages.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::BTreeMap, sync::Arc};

use anyhow::{anyhow, Result};
use diesel::{upsert::excluded, ExpressionMethods};
use diesel_async::RunQueryDsl;
use futures::future::try_join_all;
use sui_types::full_checkpoint_content::CheckpointData;

use crate::{
db,
models::packages::StoredPackage,
pipeline::{sequential::Handler, Processor},
schema::sum_packages,
};

const CHUNK_ROWS: usize = i16::MAX as usize / 5;

pub struct SumPackages;

impl Processor for SumPackages {
const NAME: &'static str = "sum_packages";

type Value = StoredPackage;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let CheckpointData {
checkpoint_summary,
transactions,
..
} = checkpoint.as_ref();

let cp_sequence_number = checkpoint_summary.sequence_number as i64;
let mut values = vec![];
for tx in transactions {
for obj in &tx.output_objects {
let Some(package) = obj.data.try_as_package() else {
continue;
};

values.push(StoredPackage {
package_id: obj.id().to_vec(),
original_id: package.original_package_id().to_vec(),
package_version: obj.version().value() as i64,
move_package: bcs::to_bytes(package)
.map_err(|e| anyhow!("Error serializing package {}: {e}", obj.id()))?,
cp_sequence_number,
});
}
}

Ok(values)
}
}

#[async_trait::async_trait]
impl Handler for SumPackages {
type Batch = BTreeMap<Vec<u8>, StoredPackage>;

fn batch(batch: &mut Self::Batch, values: Vec<Self::Value>) {
for value in values {
batch.insert(value.package_id.clone(), value);
}
}

async fn commit(batch: &Self::Batch, conn: &mut db::Connection<'_>) -> Result<usize> {
let values: Vec<_> = batch.values().cloned().collect();
let updates = values.chunks(CHUNK_ROWS).map(|chunk| {
diesel::insert_into(sum_packages::table)
.values(chunk)
.on_conflict(sum_packages::package_id)
.do_update()
.set((
sum_packages::package_version.eq(excluded(sum_packages::package_version)),
sum_packages::move_package.eq(excluded(sum_packages::move_package)),
sum_packages::cp_sequence_number.eq(excluded(sum_packages::cp_sequence_number)),
))
.execute(conn)
amnn marked this conversation as resolved.
Show resolved Hide resolved
});

Ok(try_join_all(updates).await?.into_iter().sum())
}
}
Loading
Loading