Skip to content

Commit

Permalink
indexer-alt: sum_packages pipeline
Browse files Browse the repository at this point in the history
## Description

Pipeline for writing out the set of latest packages. This is used to do
type resolution during reads, and also to answer queries about package
versions.

Originally, this was called `kv_packages`, but I realised that because
of system packages, it is not append-only, so I renamed it to match the
other summary tables.

## Test plan

Ran the indexer with the new pipeline on the first 1.2M checkpoints.
This includes multiple system package upgrades and the first user
package publish.

Inspect the contents of the table at that point:

```
sui$ cargo run -p sui-indexer-alt --release --                                   \
  --database-url "postgres://postgres:postgrespw@localhost:5432/sui_indexer_alt" \
  indexer --remote-store-url https://checkpoints.mainnet.sui.io                  \
  --last-checkpoint 1200000 --pipeline sum_packages
```

```
sui_indexer_alt=# SELECT package_id, original_id, package_version FROM sum_packages;
                             package_id                             |                            original_id                             | package_version
--------------------------------------------------------------------+--------------------------------------------------------------------+-----------------
 \x0000000000000000000000000000000000000000000000000000000000000001 | \x0000000000000000000000000000000000000000000000000000000000000001 |               1
 \x0000000000000000000000000000000000000000000000000000000000000002 | \x0000000000000000000000000000000000000000000000000000000000000002 |               2
 \x0000000000000000000000000000000000000000000000000000000000000003 | \x0000000000000000000000000000000000000000000000000000000000000003 |               3
 \x000000000000000000000000000000000000000000000000000000000000dee9 | \x000000000000000000000000000000000000000000000000000000000000dee9 |               2
 \x39ac04c24dbedf422abb8d582973ee733dbbab07a597fc98300666abe7982034 | \x39ac04c24dbedf422abb8d582973ee733dbbab07a597fc98300666abe7982034 |               1
```
  • Loading branch information
amnn committed Nov 1, 2024
1 parent a9ec67d commit aaabb93
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS sum_packages;
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
CREATE TABLE IF NOT EXISTS sum_packages
(
package_id BYTEA PRIMARY KEY,
original_id BYTEA NOT NULL,
package_version BIGINT NOT NULL,
move_package BYTEA NOT NULL,
cp_sequence_number BIGINT NOT NULL
);

CREATE INDEX IF NOT EXISTS sum_packages_cp_id_version
ON sum_packages (cp_sequence_number, original_id, package_version);

CREATE INDEX IF NOT EXISTS sum_packages_id_version_cp
ON sum_packages (original_id, package_version, cp_sequence_number);
1 change: 1 addition & 0 deletions crates/sui-indexer-alt/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod kv_transactions;
pub mod obj_versions;
pub mod sum_coin_balances;
pub mod sum_obj_types;
pub mod sum_packages;
pub mod tx_affected_objects;
pub mod tx_balance_changes;
pub mod wal_coin_balances;
Expand Down
82 changes: 82 additions & 0 deletions crates/sui-indexer-alt/src/handlers/sum_packages.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

use anyhow::{anyhow, Result};
use diesel::{upsert::excluded, ExpressionMethods};
use diesel_async::RunQueryDsl;
use futures::future::try_join_all;
use sui_types::full_checkpoint_content::CheckpointData;

use crate::{
db,
models::packages::StoredPackage,
pipeline::{sequential::Handler, Processor},
schema::sum_packages,
};

const CHUNK_ROWS: usize = i16::MAX as usize / 5;

pub struct SumPackages;

impl Processor for SumPackages {
const NAME: &'static str = "sum_packages";

type Value = StoredPackage;

fn process(checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let CheckpointData {
checkpoint_summary,
transactions,
..
} = checkpoint.as_ref();

let cp_sequence_number = checkpoint_summary.sequence_number as i64;
let mut values = vec![];
for tx in transactions {
for obj in &tx.output_objects {
let Some(package) = obj.data.try_as_package() else {
continue;
};

values.push(StoredPackage {
package_id: obj.id().to_vec(),
original_id: package.original_package_id().to_vec(),
package_version: obj.version().value() as i64,
move_package: bcs::to_bytes(package)
.map_err(|e| anyhow!("Error serializing package {}: {e}", obj.id()))?,
cp_sequence_number,
});
}
}

Ok(values)
}
}

#[async_trait::async_trait]
impl Handler for SumPackages {
type Batch = Vec<Self::Value>;

fn batch(batch: &mut Self::Batch, values: Vec<Self::Value>) {
batch.extend(values);
}

async fn commit(values: &Self::Batch, conn: &mut db::Connection<'_>) -> Result<usize> {
let updates = values.chunks(CHUNK_ROWS).map(|chunk| {
diesel::insert_into(sum_packages::table)
.values(chunk)
.on_conflict(sum_packages::package_id)
.do_update()
.set((
sum_packages::package_version.eq(excluded(sum_packages::package_version)),
sum_packages::move_package.eq(excluded(sum_packages::move_package)),
sum_packages::cp_sequence_number.eq(excluded(sum_packages::cp_sequence_number)),
))
.execute(conn)
});

Ok(try_join_all(updates).await?.into_iter().sum())
}
}
3 changes: 2 additions & 1 deletion crates/sui-indexer-alt/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use sui_indexer_alt::{
handlers::{
ev_emit_mod::EvEmitMod, ev_struct_inst::EvStructInst, kv_checkpoints::KvCheckpoints,
kv_objects::KvObjects, kv_transactions::KvTransactions, obj_versions::ObjVersions,
sum_coin_balances::SumCoinBalances, sum_obj_types::SumObjTypes,
sum_coin_balances::SumCoinBalances, sum_obj_types::SumObjTypes, sum_packages::SumPackages,
tx_affected_objects::TxAffectedObjects, tx_balance_changes::TxBalanceChanges,
wal_coin_balances::WalCoinBalances, wal_obj_types::WalObjTypes,
},
Expand Down Expand Up @@ -48,6 +48,7 @@ async fn main() -> Result<()> {
indexer.concurrent_pipeline::<WalObjTypes>().await?;
indexer.sequential_pipeline::<SumCoinBalances>(lag).await?;
indexer.sequential_pipeline::<SumObjTypes>(lag).await?;
indexer.sequential_pipeline::<SumPackages>(None).await?;

let h_indexer = indexer.run().await.context("Failed to start indexer")?;

Expand Down
1 change: 1 addition & 0 deletions crates/sui-indexer-alt/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
pub mod checkpoints;
pub mod events;
pub mod objects;
pub mod packages;
pub mod transactions;
pub mod watermarks;
16 changes: 16 additions & 0 deletions crates/sui-indexer-alt/src/models/packages.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use diesel::prelude::*;

use crate::schema::sum_packages;

#[derive(Insertable, Debug, Clone)]
#[diesel(table_name = sum_packages, primary_key(package_id))]
pub struct StoredPackage {
pub package_id: Vec<u8>,
pub original_id: Vec<u8>,
pub package_version: i64,
pub move_package: Vec<u8>,
pub cp_sequence_number: i64,
}
11 changes: 11 additions & 0 deletions crates/sui-indexer-alt/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ diesel::table! {
}
}

diesel::table! {
sum_packages (package_id) {
package_id -> Bytea,
original_id -> Bytea,
package_version -> Int8,
move_package -> Bytea,
cp_sequence_number -> Int8,
}
}

diesel::table! {
tx_affected_objects (affected, tx_sequence_number) {
tx_sequence_number -> Int8,
Expand Down Expand Up @@ -144,6 +154,7 @@ diesel::allow_tables_to_appear_in_same_query!(
obj_versions,
sum_coin_balances,
sum_obj_types,
sum_packages,
tx_affected_objects,
tx_balance_changes,
wal_coin_balances,
Expand Down

0 comments on commit aaabb93

Please sign in to comment.