From bf0fa9cf1a85cdd1da049386439ebfe4e6dc1e3b Mon Sep 17 00:00:00 2001 From: Ashok Menon Date: Fri, 1 Nov 2024 00:21:38 +0000 Subject: [PATCH] indexer-alt: wal_coin_balances pipeline (#20117) ## Description `wal_coin_balances` is to `sum_coin_balances` what `wal_obj_types` is to `sum_obj_types`. ## Test plan Run the indexer, and correlate the live object set calculated from the write-ahead log against the one that's already in the summary table: ``` sui$ cargo run -p sui-indexer-alt --release -- \ --database-url "postgres://postgres:postgrespw@localhost:5432/sui_indexer_alt" \ indexer --remote-store-url https://checkpoints.mainnet.sui.io/ \ --last-checkpoint 5000 ``` ``` sui_indexer_alt=# SELECT COUNT(*) FROM sum_coin_balances; count ------- 178 (1 row) sui_indexer_alt=# SELECT COUNT(*) FROM ( SELECT DISTINCT ON (object_id) * FROM wal_coin_balances ORDER BY object_id, object_version DESC ) o WHERE o.owner_id IS NOT NULL; count ------- 178 (1 row) ``` ## Stack - #20089 - #20114 - #20116 --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- .../down.sql | 1 + .../up.sql | 49 +++++++++++++++ crates/sui-indexer-alt/src/handlers/mod.rs | 1 + .../src/handlers/wal_coin_balances.rs | 59 +++++++++++++++++++ crates/sui-indexer-alt/src/main.rs | 3 +- crates/sui-indexer-alt/src/models/objects.rs | 15 ++++- crates/sui-indexer-alt/src/schema.rs | 12 ++++ 7 files changed, 138 insertions(+), 2 deletions(-) create mode 100644 crates/sui-indexer-alt/migrations/2024-10-30-232206_wal_coin_balances/down.sql create mode 100644 crates/sui-indexer-alt/migrations/2024-10-30-232206_wal_coin_balances/up.sql create mode 100644 crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs diff --git a/crates/sui-indexer-alt/migrations/2024-10-30-232206_wal_coin_balances/down.sql b/crates/sui-indexer-alt/migrations/2024-10-30-232206_wal_coin_balances/down.sql new file mode 100644 index 00000000000000..a60919b661e845 --- /dev/null +++ b/crates/sui-indexer-alt/migrations/2024-10-30-232206_wal_coin_balances/down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS wal_coin_balances; diff --git a/crates/sui-indexer-alt/migrations/2024-10-30-232206_wal_coin_balances/up.sql b/crates/sui-indexer-alt/migrations/2024-10-30-232206_wal_coin_balances/up.sql new file mode 100644 index 00000000000000..9a78eeea9303b3 --- /dev/null +++ b/crates/sui-indexer-alt/migrations/2024-10-30-232206_wal_coin_balances/up.sql @@ -0,0 +1,49 @@ +-- Write-ahead log for `sum_coin_balances`. +-- +-- It contains the same columns and indices as `sum_coin_balances`, but with +-- the following changes: +-- +-- - A `cp_sequence_number` column (and an index on it), to support pruning by +-- checkpoint. +-- +-- - The primary key includes the version, as the table may contain multiple +-- versions per object ID. +-- +-- - The other fields are nullable, because this table also tracks deleted and +-- wrapped objects. +-- +-- - There is an additional index on ID and version for querying the latest +-- version of every object. +-- +-- This table is used in conjunction with `sum_coin_balances` to support +-- consistent live object set queries: `sum_coin_balances` holds the state of +-- the live object set at some checkpoint `C < T` where `T` is the tip of the +-- chain, and `wal_coin_balances` stores all the updates and deletes between +-- `C` and `T`. +-- +-- To reconstruct the the live object set at some snapshot checkpoint `S` +-- between `C` and `T`, a query can be constructed that starts with the set +-- from `sum_coin_balances` and adds updates in `wal_coin_balances` from +-- `cp_sequence_number <= S`. +-- +-- See `up.sql` for the original `sum_coin_balances` table for documentation on +-- columns. +CREATE TABLE IF NOT EXISTS wal_coin_balances +( + object_id BYTEA NOT NULL, + object_version BIGINT NOT NULL, + owner_id BYTEA, + coin_type BYTEA, + coin_balance BIGINT, + cp_sequence_number BIGINT NOT NULL, + PRIMARY KEY (object_id, object_version) +); + +CREATE INDEX IF NOT EXISTS wal_coin_balances_cp_sequence_number +ON wal_coin_balances (cp_sequence_number); + +CREATE INDEX IF NOT EXISTS wal_coin_balances_version +ON wal_coin_balances (object_id, object_version); + +CREATE INDEX IF NOT EXISTS wal_coin_balances_owner_type +ON wal_coin_balances (owner_id, coin_type, coin_balance, object_id, object_version); diff --git a/crates/sui-indexer-alt/src/handlers/mod.rs b/crates/sui-indexer-alt/src/handlers/mod.rs index 6304ceefe2ec98..055ceb870d8a94 100644 --- a/crates/sui-indexer-alt/src/handlers/mod.rs +++ b/crates/sui-indexer-alt/src/handlers/mod.rs @@ -11,4 +11,5 @@ pub mod sum_coin_balances; pub mod sum_obj_types; pub mod tx_affected_objects; pub mod tx_balance_changes; +pub mod wal_coin_balances; pub mod wal_obj_types; diff --git a/crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs b/crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs new file mode 100644 index 00000000000000..6482d6fc94bb60 --- /dev/null +++ b/crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs @@ -0,0 +1,59 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::sync::Arc; + +use anyhow::Result; +use diesel_async::RunQueryDsl; +use sui_types::full_checkpoint_content::CheckpointData; + +use crate::{ + db, + models::objects::{StoredObjectUpdate, StoredSumCoinBalance, StoredWalCoinBalance}, + pipeline::{concurrent::Handler, Processor}, + schema::wal_coin_balances, +}; + +use super::sum_coin_balances::SumCoinBalances; + +pub struct WalCoinBalances; + +impl Processor for WalCoinBalances { + const NAME: &'static str = "wal_coin_balances"; + + type Value = StoredObjectUpdate; + + fn process(checkpoint: &Arc) -> Result> { + SumCoinBalances::process(checkpoint) + } +} + +#[async_trait::async_trait] +impl Handler for WalCoinBalances { + const MIN_EAGER_ROWS: usize = 100; + const MAX_CHUNK_ROWS: usize = 1000; + const MAX_PENDING_ROWS: usize = 10000; + + async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result { + let values: Vec<_> = values + .iter() + .map(|value| StoredWalCoinBalance { + object_id: value.object_id.to_vec(), + object_version: value.object_version as i64, + + owner_id: value.update.as_ref().map(|o| o.owner_id.clone()), + + coin_type: value.update.as_ref().map(|o| o.coin_type.clone()), + coin_balance: value.update.as_ref().map(|o| o.coin_balance), + + cp_sequence_number: value.cp_sequence_number as i64, + }) + .collect(); + + Ok(diesel::insert_into(wal_coin_balances::table) + .values(&values) + .on_conflict_do_nothing() + .execute(conn) + .await?) + } +} diff --git a/crates/sui-indexer-alt/src/main.rs b/crates/sui-indexer-alt/src/main.rs index 63c2d33a6eb370..fd8e30cc8bee52 100644 --- a/crates/sui-indexer-alt/src/main.rs +++ b/crates/sui-indexer-alt/src/main.rs @@ -12,7 +12,7 @@ use sui_indexer_alt::{ kv_objects::KvObjects, kv_transactions::KvTransactions, obj_versions::ObjVersions, sum_coin_balances::SumCoinBalances, sum_obj_types::SumObjTypes, tx_affected_objects::TxAffectedObjects, tx_balance_changes::TxBalanceChanges, - wal_obj_types::WalObjTypes, + wal_coin_balances::WalCoinBalances, wal_obj_types::WalObjTypes, }, Indexer, }; @@ -44,6 +44,7 @@ async fn main() -> Result<()> { indexer.concurrent_pipeline::().await?; indexer.concurrent_pipeline::().await?; indexer.concurrent_pipeline::().await?; + indexer.concurrent_pipeline::().await?; indexer.concurrent_pipeline::().await?; indexer.sequential_pipeline::(lag).await?; indexer.sequential_pipeline::(lag).await?; diff --git a/crates/sui-indexer-alt/src/models/objects.rs b/crates/sui-indexer-alt/src/models/objects.rs index b2c7b927fb5083..46a5ac8d5a03b2 100644 --- a/crates/sui-indexer-alt/src/models/objects.rs +++ b/crates/sui-indexer-alt/src/models/objects.rs @@ -8,7 +8,9 @@ use diesel::{ use sui_field_count::FieldCount; use sui_types::base_types::ObjectID; -use crate::schema::{kv_objects, obj_versions, sum_coin_balances, sum_obj_types, wal_obj_types}; +use crate::schema::{ + kv_objects, obj_versions, sum_coin_balances, sum_obj_types, wal_coin_balances, wal_obj_types, +}; #[derive(Insertable, Debug, Clone, FieldCount)] #[diesel(table_name = kv_objects, primary_key(object_id, object_version))] @@ -71,6 +73,17 @@ pub struct StoredSumObjType { pub instantiation: Option>, } +#[derive(Insertable, Debug, Clone)] +#[diesel(table_name = wal_coin_balances, primary_key(object_id, object_version))] +pub struct StoredWalCoinBalance { + pub object_id: Vec, + pub object_version: i64, + pub owner_id: Option>, + pub coin_type: Option>, + pub coin_balance: Option, + pub cp_sequence_number: i64, +} + #[derive(Insertable, Debug, Clone)] #[diesel(table_name = wal_obj_types, primary_key(object_id, object_version))] pub struct StoredWalObjType { diff --git a/crates/sui-indexer-alt/src/schema.rs b/crates/sui-indexer-alt/src/schema.rs index 8fc96989de1213..4d9b24e4666a3e 100644 --- a/crates/sui-indexer-alt/src/schema.rs +++ b/crates/sui-indexer-alt/src/schema.rs @@ -96,6 +96,17 @@ diesel::table! { } } +diesel::table! { + wal_coin_balances (object_id, object_version) { + object_id -> Bytea, + object_version -> Int8, + owner_id -> Nullable, + coin_type -> Nullable, + coin_balance -> Nullable, + cp_sequence_number -> Int8, + } +} + diesel::table! { wal_obj_types (object_id, object_version) { object_id -> Bytea, @@ -134,6 +145,7 @@ diesel::allow_tables_to_appear_in_same_query!( sum_obj_types, tx_affected_objects, tx_balance_changes, + wal_coin_balances, wal_obj_types, watermarks, );