Skip to content

Commit

Permalink
indexer-alt: wal_coin_balances pipeline (#20117)
Browse files Browse the repository at this point in the history
## Description

`wal_coin_balances` is to `sum_coin_balances` what `wal_obj_types` is to
`sum_obj_types`.

## Test plan

Run the indexer, and correlate the live object set calculated from the
write-ahead log against the one that's already in the summary table:

```
sui$ cargo run -p sui-indexer-alt --release --                                   \
  --database-url "postgres://postgres:postgrespw@localhost:5432/sui_indexer_alt" \
  indexer --remote-store-url https://checkpoints.mainnet.sui.io/                 \
  --last-checkpoint 5000
```

```
sui_indexer_alt=# SELECT COUNT(*) FROM sum_coin_balances;
 count
-------
   178
(1 row)

sui_indexer_alt=# SELECT
    COUNT(*)
FROM (
    SELECT DISTINCT ON (object_id)
        *
    FROM
        wal_coin_balances
    ORDER BY
        object_id,
        object_version DESC
) o
WHERE
    o.owner_id IS NOT NULL;
 count
-------
   178
(1 row)
```

## Stack

- #20089 
- #20114
- #20116 

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
amnn committed Nov 1, 2024
1 parent 71f1faa commit b65f34c
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS wal_coin_balances;
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
-- Write-ahead log for `sum_coin_balances`.
--
-- It contains the same columns and indices as `sum_coin_balances`, but with
-- the following changes:
--
-- - A `cp_sequence_number` column (and an index on it), to support pruning by
-- checkpoint.
--
-- - The primary key includes the version, as the table may contain multiple
-- versions per object ID.
--
-- - The other fields are nullable, because this table also tracks deleted and
-- wrapped objects.
--
-- - There is an additional index on ID and version for querying the latest
-- version of every object.
--
-- This table is used in conjunction with `sum_coin_balances` to support
-- consistent live object set queries: `sum_coin_balances` holds the state of
-- the live object set at some checkpoint `C < T` where `T` is the tip of the
-- chain, and `wal_coin_balances` stores all the updates and deletes between
-- `C` and `T`.
--
-- To reconstruct the the live object set at some snapshot checkpoint `S`
-- between `C` and `T`, a query can be constructed that starts with the set
-- from `sum_coin_balances` and adds updates in `wal_coin_balances` from
-- `cp_sequence_number <= S`.
--
-- See `up.sql` for the original `sum_coin_balances` table for documentation on
-- columns.
CREATE TABLE IF NOT EXISTS wal_coin_balances
(
object_id BYTEA NOT NULL,
object_version BIGINT NOT NULL,
owner_id BYTEA,
coin_type BYTEA,
coin_balance BIGINT,
cp_sequence_number BIGINT NOT NULL,
PRIMARY KEY (object_id, object_version)
);

CREATE INDEX IF NOT EXISTS wal_coin_balances_cp_sequence_number
ON wal_coin_balances (cp_sequence_number);

CREATE INDEX IF NOT EXISTS wal_coin_balances_version
ON wal_coin_balances (object_id, object_version);

CREATE INDEX IF NOT EXISTS wal_coin_balances_owner_type
ON wal_coin_balances (owner_id, coin_type, coin_balance, object_id, object_version);
1 change: 1 addition & 0 deletions crates/sui-indexer-alt/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ pub mod sum_coin_balances;
pub mod sum_obj_types;
pub mod tx_affected_objects;
pub mod tx_balance_changes;
pub mod wal_coin_balances;
pub mod wal_obj_types;
59 changes: 59 additions & 0 deletions crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

use anyhow::Result;
use diesel_async::RunQueryDsl;
use sui_types::full_checkpoint_content::CheckpointData;

use crate::{
db,
models::objects::{StoredObjectUpdate, StoredSumCoinBalance, StoredWalCoinBalance},
pipeline::{concurrent::Handler, Processor},
schema::wal_coin_balances,
};

use super::sum_coin_balances::SumCoinBalances;

pub struct WalCoinBalances;

impl Processor for WalCoinBalances {
const NAME: &'static str = "wal_coin_balances";

type Value = StoredObjectUpdate<StoredSumCoinBalance>;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
SumCoinBalances::process(checkpoint)
}
}

#[async_trait::async_trait]
impl Handler for WalCoinBalances {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
let values: Vec<_> = values
.iter()
.map(|value| StoredWalCoinBalance {
object_id: value.object_id.to_vec(),
object_version: value.object_version as i64,

owner_id: value.update.as_ref().map(|o| o.owner_id.clone()),

coin_type: value.update.as_ref().map(|o| o.coin_type.clone()),
coin_balance: value.update.as_ref().map(|o| o.coin_balance),

cp_sequence_number: value.cp_sequence_number as i64,
})
.collect();

Ok(diesel::insert_into(wal_coin_balances::table)
.values(&values)
.on_conflict_do_nothing()
.execute(conn)
.await?)
}
}
3 changes: 2 additions & 1 deletion crates/sui-indexer-alt/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use sui_indexer_alt::{
kv_objects::KvObjects, kv_transactions::KvTransactions, obj_versions::ObjVersions,
sum_coin_balances::SumCoinBalances, sum_obj_types::SumObjTypes,
tx_affected_objects::TxAffectedObjects, tx_balance_changes::TxBalanceChanges,
wal_obj_types::WalObjTypes,
wal_coin_balances::WalCoinBalances, wal_obj_types::WalObjTypes,
},
Indexer,
};
Expand Down Expand Up @@ -44,6 +44,7 @@ async fn main() -> Result<()> {
indexer.concurrent_pipeline::<ObjVersions>().await?;
indexer.concurrent_pipeline::<TxAffectedObjects>().await?;
indexer.concurrent_pipeline::<TxBalanceChanges>().await?;
indexer.concurrent_pipeline::<WalCoinBalances>().await?;
indexer.concurrent_pipeline::<WalObjTypes>().await?;
indexer.sequential_pipeline::<SumCoinBalances>(lag).await?;
indexer.sequential_pipeline::<SumObjTypes>(lag).await?;
Expand Down
15 changes: 14 additions & 1 deletion crates/sui-indexer-alt/src/models/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use diesel::{
use sui_field_count::FieldCount;
use sui_types::base_types::ObjectID;

use crate::schema::{kv_objects, obj_versions, sum_coin_balances, sum_obj_types, wal_obj_types};
use crate::schema::{
kv_objects, obj_versions, sum_coin_balances, sum_obj_types, wal_coin_balances, wal_obj_types,
};

#[derive(Insertable, Debug, Clone, FieldCount)]
#[diesel(table_name = kv_objects, primary_key(object_id, object_version))]
Expand Down Expand Up @@ -71,6 +73,17 @@ pub struct StoredSumObjType {
pub instantiation: Option<Vec<u8>>,
}

#[derive(Insertable, Debug, Clone)]
#[diesel(table_name = wal_coin_balances, primary_key(object_id, object_version))]
pub struct StoredWalCoinBalance {
pub object_id: Vec<u8>,
pub object_version: i64,
pub owner_id: Option<Vec<u8>>,
pub coin_type: Option<Vec<u8>>,
pub coin_balance: Option<i64>,
pub cp_sequence_number: i64,
}

#[derive(Insertable, Debug, Clone)]
#[diesel(table_name = wal_obj_types, primary_key(object_id, object_version))]
pub struct StoredWalObjType {
Expand Down
12 changes: 12 additions & 0 deletions crates/sui-indexer-alt/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,17 @@ diesel::table! {
}
}

diesel::table! {
wal_coin_balances (object_id, object_version) {
object_id -> Bytea,
object_version -> Int8,
owner_id -> Nullable<Bytea>,
coin_type -> Nullable<Bytea>,
coin_balance -> Nullable<Int8>,
cp_sequence_number -> Int8,
}
}

diesel::table! {
wal_obj_types (object_id, object_version) {
object_id -> Bytea,
Expand Down Expand Up @@ -134,6 +145,7 @@ diesel::allow_tables_to_appear_in_same_query!(
sum_obj_types,
tx_affected_objects,
tx_balance_changes,
wal_coin_balances,
wal_obj_types,
watermarks,
);

0 comments on commit b65f34c

Please sign in to comment.