Skip to content

Commit

Permalink
indexer-alt: epochs pipelines (#20150)
Browse files Browse the repository at this point in the history
## Description

Adds two tables and pipelines: `kv_epoch_starts` and `kv_epoch_ends`, to
index epoch information. This pipeline is different from epoch indexing
in `sui-indexer` in a number of ways:

- It is an append-only pipeline. The columns that are written at the
start and end of the epoch are split into two separate tables that can
be written to concurrently (by separate pipelines).
- The first row of `kv_epoch_starts` is written by the bootstrap process
which seeds the `kv_genesis` table as well, this avoids having to
condition on whether the checkpoint being processed is the genesis
checkpoint in the main pipeline.
- Instead of indexing the number of transactions in the epoch, it tracks
the transaction high watermark -- readers will need to read the records
to calculate the number of total transactions (this avoids having to
read the last epoch's total transactions in the write path).
- We index the `SuiSystemState` object as BCS, rather than the summary
structure.
- We explicitly record whether the epoch advancement at the end of the
epoch triggered safe mode (the system state object also tracks whether
the epoch was started in safe mode).
- Fields related to information that came from `SystemEpochInfoEvent`
have all been consolidated in `kv_epoch_ends`, and they are all
optional, in case of safe mode.

It's worth elaborating on the last bullet point, because this is quite a
subtle, but large change:

- Today, `total_stake` and `storage_fund_balance` are written at the
start of an epoch based on the fields of the `SystemEpochInfoEvent`
emitted from the previous epoch, and are `NOT NULL`.
- The remaining fields were nullable, but only because they would be
written to later, once the epoch was over.

This was awkward to work with in a couple of ways:

- It meant that for the genesis epoch, we needed to make some numbers up
(all zeroes) because we did not have an event to read from.
- We had to do something similar if we hit safe mode.
- When indexing the start and end of epochs separately, it meant that we
had to duplicate work (finding the system epoch info event).

By making the fields nullable, and consolidating them in
`kv_epoch_ends`, we can simplify the pipelines:

- `kv_epoch_starts` and the bootstrapping logic can work purely based on
the system state object.
- `kv_epoch_ends` can work purely based on the `SystemEpochInfoEvent`,
and can leave fields `NULL` if we are in safe mode.

In the case of `kv_epoch_starts` we could also have cut down fields to
just `epoch`, `cp_lo` and `system_state`. I chose not to do this because
the system state is actually quite a large object, and it is beneficial
to avoid having to deserialize to answer simpler queries.

## Test plan

Ran the indexer on the first 1M checkpoints, and correlated the
resulting info in the respective tables from the data that the current
indexer produced:

```
sui_indexer_alt=# SELECT epoch, protocol_version, cp_lo, TO_TIMESTAMP(start_timestamp_ms / 1000), reference_gas_price FROM kv_epoch_starts;
 epoch | protocol_version | cp_lo  |      to_timestamp      | reference_gas_price
-------+------------------+--------+------------------------+---------------------
     0 |                4 |      0 | 2023-04-12 18:00:00+01 |                1000
     1 |                4 |   9770 | 2023-04-13 18:00:02+01 |                1000
     2 |                4 |  85169 | 2023-04-14 18:00:04+01 |                1000
     3 |                4 | 161192 | 2023-04-15 18:00:08+01 |                1000
     4 |                4 | 237074 | 2023-04-16 18:00:11+01 |                1000
     5 |                4 | 314160 | 2023-04-17 18:00:15+01 |                1000
     6 |                4 | 391107 | 2023-04-18 18:00:18+01 |                1000
     7 |                4 | 467716 | 2023-04-19 18:00:21+01 |                1000
     8 |                4 | 544978 | 2023-04-20 18:00:26+01 |                1000
     9 |                5 | 621933 | 2023-04-21 18:00:28+01 |                1000
    10 |                6 | 699410 | 2023-04-22 18:00:31+01 |                1000
    11 |                6 | 777074 | 2023-04-23 18:00:34+01 |                1000
    12 |                6 | 855530 | 2023-04-24 18:00:36+01 |                1000
    13 |                6 | 933559 | 2023-04-25 18:00:39+01 |                1000
(14 rows)

sui_indexer_alt=# SELECT epoch, cp_hi, tx_hi, TO_TIMESTAMP(end_timestamp_ms / 1000), safe_mode, storage_fund_balance, storage_fund_reinvestment, storage_charge, storage_rebate, stake_subsidy_amount, total_gas_fees, total_stake_rewards_distributed, leftover_storage_fund_inflow FROM kv_epoch_ends ORDER BY epoch ASC;
 epoch | cp_hi  | tx_hi  |      to_timestamp      | safe_mode | storage_fund_balance | storage_fund_reinvestment | storage_charge | storage_rebate | stake_subsidy_amount | total_gas_fees | total_stake_rewards_distributed | leftover_storage_fund_inflow
-------+--------+--------+------------------------+-----------+----------------------+---------------------------+----------------+----------------+----------------------+----------------+---------------------------------+------------------------------
     0 |   9770 |   9771 | 2023-04-13 18:00:02+01 | f         |                    0 |                         0 |              0 |              0 |                    0 |              0 |                               0 |                            0
     1 |  85169 |  85174 | 2023-04-14 18:00:04+01 | f         |              2973880 |                         0 |        3952000 |         978120 |                    0 |      102000000 |                       102000000 |                            0
     2 | 161192 | 161199 | 2023-04-15 18:00:08+01 | f         |            717398960 |                         0 |      715403200 |         978120 |                    0 |        1000000 |                         1000000 |                            0
     3 | 237074 | 237084 | 2023-04-16 18:00:11+01 | f         |            733657184 |                         0 |     1430198400 |     1413940176 |                    0 |        2000000 |                         2000000 |                            0
     4 | 314160 | 314171 | 2023-04-17 18:00:15+01 | f         |            733657184 |                         0 |              0 |              0 |                    0 |              0 |                               0 |                            0
     5 | 391107 | 391119 | 2023-04-18 18:00:18+01 | f         |            733657184 |                         0 |              0 |              0 |                    0 |              0 |                               0 |                            0
     6 | 467716 | 467730 | 2023-04-19 18:00:21+01 | f         |            735633184 |                         0 |        1976000 |              0 |                    0 |        1000000 |                         1000000 |                            0
     7 | 544978 | 544994 | 2023-04-20 18:00:26+01 | f         |            729859616 |                         0 |      702475600 |      708249168 |                    0 |        1000000 |                         1000000 |                            0
     8 | 621933 | 621950 | 2023-04-21 18:00:28+01 | f         |            729859616 |                         0 |              0 |              0 |                    0 |              0 |                               0 |                            0
     9 | 699410 | 699428 | 2023-04-22 18:00:31+01 | f         |            729859616 |                         0 |              0 |              0 |                    0 |              0 |                               0 |                            0
    10 | 777074 | 777093 | 2023-04-23 18:00:34+01 | f         |            729859616 |                         0 |              0 |              0 |                    0 |              0 |                               0 |                            0
    11 | 855530 | 855550 | 2023-04-24 18:00:36+01 | f         |            729859616 |                         0 |              0 |              0 |                    0 |              0 |                               0 |                            0
    12 | 933559 | 933586 | 2023-04-25 18:00:39+01 | f         |            735866656 |                         0 |       13832000 |        7824960 |                    0 |        6000000 |                         6000000 |                            0
(13 rows)
```

## Stack

- #20118 
- #20132 
- #20147 
- #20148 
- #20149 

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
amnn authored Nov 13, 2024
1 parent 05b7b16 commit d18cdde
Show file tree
Hide file tree
Showing 9 changed files with 358 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP TABLE IF EXISTS kv_epoch_starts;
DROP TABLE IF EXISTS kv_epoch_ends;
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
-- Information related to an epoch that is available when it starts
CREATE TABLE IF NOT EXISTS kv_epoch_starts
(
epoch BIGINT PRIMARY KEY,
protocol_version BIGINT NOT NULL,

-- Inclusive checkpoint lowerbound of the epoch.
cp_lo BIGINT NOT NULL,
-- The timestamp that the epoch starts at. This is always extracted from
-- the system state object.
start_timestamp_ms BIGINT NOT NULL,
-- The reference gas price that will be used for the rest of the epoch.
reference_gas_price BIGINT NOT NULL,
-- BCS serialized SystemState.
system_state BYTEA NOT NULL
);

-- Information related to an epoch that is available when it ends (after the
-- epoch advancement to the next epoch)
CREATE TABLE IF NOT EXISTS kv_epoch_ends
(
epoch BIGINT PRIMARY KEY,

-- Exclusive checkpoint upperbound of the epoch.
cp_hi BIGINT NOT NULL,
-- Exclusive transaction upperbound of the epoch.
tx_hi BIGINT NOT NULL,

-- The epoch ends at the timestamp of its last checkpoint.
end_timestamp_ms BIGINT NOT NULL,

-- Whether the epoch advancement at the end of this epoch entered safe
-- mode.
safe_mode BOOLEAN NOT NULL,

-- Staking information after advancement to the next epoch. These fields
-- are extracted from the `SystemEpochInfoEvent` emitted during epoch
-- advancement. If the epoch advancement entered safe mode, these fields
-- will all be NULL (because a safe mode advance epoch does not emit this
-- event).
total_stake BIGINT,
storage_fund_balance BIGINT,
storage_fund_reinvestment BIGINT,
storage_charge BIGINT,
storage_rebate BIGINT,
stake_subsidy_amount BIGINT,
total_gas_fees BIGINT,
total_stake_rewards_distributed
BIGINT,
leftover_storage_fund_inflow
BIGINT,

-- BCS serialized `Vec<EpochCommitment>` bytes, found in last
-- `CheckpointSummary` of the epoch.
epoch_commitments BYTEA NOT NULL
);
21 changes: 20 additions & 1 deletion crates/sui-indexer-alt/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ use tokio_util::sync::CancellationToken;
use tracing::info;

use crate::{
models::checkpoints::StoredGenesis, schema::kv_genesis, task::graceful_shutdown, Indexer,
models::{checkpoints::StoredGenesis, epochs::StoredEpochStart},
schema::{kv_epoch_starts, kv_genesis},
task::graceful_shutdown,
Indexer,
};

/// Ensures the genesis table has been populated before the rest of the indexer is run, and returns
Expand Down Expand Up @@ -91,6 +94,15 @@ pub async fn bootstrap(
initial_protocol_version: system_state.protocol_version() as i64,
};

let epoch_start = StoredEpochStart {
epoch: 0,
protocol_version: system_state.protocol_version() as i64,
cp_lo: 0,
start_timestamp_ms: system_state.epoch_start_timestamp_ms() as i64,
reference_gas_price: system_state.reference_gas_price() as i64,
system_state: bcs::to_bytes(&system_state).context("Failed to serialize SystemState")?,
};

info!(
chain = genesis.chain()?.as_str(),
protocol = ?genesis.initial_protocol_version(),
Expand All @@ -104,5 +116,12 @@ pub async fn bootstrap(
.await
.context("Failed to write genesis record")?;

diesel::insert_into(kv_epoch_starts::table)
.values(&epoch_start)
.on_conflict_do_nothing()
.execute(&mut conn)
.await
.context("Failed to write genesis epoch start record")?;

Ok(genesis)
}
132 changes: 132 additions & 0 deletions crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

use anyhow::{bail, Context, Result};
use diesel_async::RunQueryDsl;
use sui_types::{
event::SystemEpochInfoEvent,
full_checkpoint_content::CheckpointData,
transaction::{TransactionDataAPI, TransactionKind},
};

use crate::{
db,
models::epochs::StoredEpochEnd,
pipeline::{concurrent::Handler, Processor},
schema::kv_epoch_ends,
};

pub struct KvEpochEnds;

impl Processor for KvEpochEnds {
const NAME: &'static str = "kv_epoch_ends";

type Value = StoredEpochEnd;

fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let CheckpointData {
checkpoint_summary,
transactions,
..
} = checkpoint.as_ref();

let Some(end_of_epoch) = checkpoint_summary.end_of_epoch_data.as_ref() else {
return Ok(vec![]);
};

let Some(transaction) = transactions.iter().find(|tx| {
matches!(
tx.transaction.intent_message().value.kind(),
TransactionKind::ChangeEpoch(_) | TransactionKind::EndOfEpochTransaction(_)
)
}) else {
bail!(
"Failed to get end of epoch transaction in checkpoint {} with EndOfEpochData",
checkpoint_summary.sequence_number,
);
};

if let Some(SystemEpochInfoEvent {
total_stake,
storage_fund_reinvestment,
storage_charge,
storage_rebate,
storage_fund_balance,
stake_subsidy_amount,
total_gas_fees,
total_stake_rewards_distributed,
leftover_storage_fund_inflow,
..
}) = transaction
.events
.iter()
.flat_map(|events| &events.data)
.find_map(|event| {
event
.is_system_epoch_info_event()
.then(|| bcs::from_bytes(&event.contents))
})
.transpose()
.context("Failed to deserialize SystemEpochInfoEvent")?
{
Ok(vec![StoredEpochEnd {
epoch: checkpoint_summary.epoch as i64,
cp_hi: checkpoint_summary.sequence_number as i64 + 1,
tx_hi: checkpoint_summary.network_total_transactions as i64,
end_timestamp_ms: checkpoint_summary.timestamp_ms as i64,

safe_mode: false,

total_stake: Some(total_stake as i64),
storage_fund_balance: Some(storage_fund_balance as i64),
storage_fund_reinvestment: Some(storage_fund_reinvestment as i64),
storage_charge: Some(storage_charge as i64),
storage_rebate: Some(storage_rebate as i64),
stake_subsidy_amount: Some(stake_subsidy_amount as i64),
total_gas_fees: Some(total_gas_fees as i64),
total_stake_rewards_distributed: Some(total_stake_rewards_distributed as i64),
leftover_storage_fund_inflow: Some(leftover_storage_fund_inflow as i64),

epoch_commitments: bcs::to_bytes(&end_of_epoch.epoch_commitments)
.context("Failed to serialize EpochCommitment-s")?,
}])
} else {
Ok(vec![StoredEpochEnd {
epoch: checkpoint_summary.epoch as i64,
cp_hi: checkpoint_summary.sequence_number as i64 + 1,
tx_hi: checkpoint_summary.network_total_transactions as i64,
end_timestamp_ms: checkpoint_summary.timestamp_ms as i64,

safe_mode: true,

total_stake: None,
storage_fund_balance: None,
storage_fund_reinvestment: None,
storage_charge: None,
storage_rebate: None,
stake_subsidy_amount: None,
total_gas_fees: None,
total_stake_rewards_distributed: None,
leftover_storage_fund_inflow: None,

epoch_commitments: bcs::to_bytes(&end_of_epoch.epoch_commitments)
.context("Failed to serialize EpochCommitment-s")?,
}])
}
}
}

#[async_trait::async_trait]
impl Handler for KvEpochEnds {
const MIN_EAGER_ROWS: usize = 1;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Ok(diesel::insert_into(kv_epoch_ends::table)
.values(values)
.on_conflict_do_nothing()
.execute(conn)
.await?)
}
}
77 changes: 77 additions & 0 deletions crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

use anyhow::{bail, Context, Result};
use diesel_async::RunQueryDsl;
use sui_types::{
full_checkpoint_content::CheckpointData,
sui_system_state::{get_sui_system_state, SuiSystemStateTrait},
transaction::{TransactionDataAPI, TransactionKind},
};

use crate::{
db, models::epochs::StoredEpochStart, pipeline::concurrent::Handler, pipeline::Processor,
schema::kv_epoch_starts,
};

pub struct KvEpochStarts;

impl Processor for KvEpochStarts {
const NAME: &'static str = "kv_epoch_starts";

type Value = StoredEpochStart;

fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let CheckpointData {
checkpoint_summary,
transactions,
..
} = checkpoint.as_ref();

// If this is the last checkpoint in the current epoch, it will contain enough information
// about the start of the next epoch.
if !checkpoint_summary.is_last_checkpoint_of_epoch() {
return Ok(vec![]);
}

let Some(transaction) = transactions.iter().find(|tx| {
matches!(
tx.transaction.intent_message().value.kind(),
TransactionKind::ChangeEpoch(_) | TransactionKind::EndOfEpochTransaction(_)
)
}) else {
bail!(
"Failed to get end of epoch transaction in checkpoint {} with EndOfEpochData",
checkpoint_summary.sequence_number,
);
};

let system_state = get_sui_system_state(&transaction.output_objects.as_slice())
.context("Failed to find system state object output from end of epoch transaction")?;

Ok(vec![StoredEpochStart {
epoch: system_state.epoch() as i64,
protocol_version: system_state.protocol_version() as i64,
cp_lo: checkpoint_summary.sequence_number as i64 + 1,
start_timestamp_ms: system_state.epoch_start_timestamp_ms() as i64,
reference_gas_price: system_state.reference_gas_price() as i64,
system_state: bcs::to_bytes(&system_state)
.context("Failed to serialize SystemState")?,
}])
}
}

#[async_trait::async_trait]
impl Handler for KvEpochStarts {
const MIN_EAGER_ROWS: usize = 1;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Ok(diesel::insert_into(kv_epoch_starts::table)
.values(values)
.on_conflict_do_nothing()
.execute(conn)
.await?)
}
}
2 changes: 2 additions & 0 deletions crates/sui-indexer-alt/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
pub mod ev_emit_mod;
pub mod ev_struct_inst;
pub mod kv_checkpoints;
pub mod kv_epoch_ends;
pub mod kv_epoch_starts;
pub mod kv_feature_flags;
pub mod kv_objects;
pub mod kv_protocol_configs;
Expand Down
4 changes: 4 additions & 0 deletions crates/sui-indexer-alt/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use clap::Parser;
use sui_indexer_alt::args::Command;
use sui_indexer_alt::bootstrap::bootstrap;
use sui_indexer_alt::db::reset_database;
use sui_indexer_alt::handlers::kv_epoch_ends::KvEpochEnds;
use sui_indexer_alt::handlers::kv_epoch_starts::KvEpochStarts;
use sui_indexer_alt::handlers::kv_feature_flags::KvFeatureFlags;
use sui_indexer_alt::handlers::kv_protocol_configs::KvProtocolConfigs;
use sui_indexer_alt::{
Expand Down Expand Up @@ -49,6 +51,8 @@ async fn main() -> Result<()> {
indexer.concurrent_pipeline(EvEmitMod).await?;
indexer.concurrent_pipeline(EvStructInst).await?;
indexer.concurrent_pipeline(KvCheckpoints).await?;
indexer.concurrent_pipeline(KvEpochEnds).await?;
indexer.concurrent_pipeline(KvEpochStarts).await?;
indexer.concurrent_pipeline(kv_feature_flags).await?;
indexer.concurrent_pipeline(KvObjects).await?;
indexer.concurrent_pipeline(kv_protocol_configs).await?;
Expand Down
33 changes: 32 additions & 1 deletion crates/sui-indexer-alt/src/models/epochs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,38 @@
use diesel::prelude::*;
use sui_field_count::FieldCount;

use crate::schema::{kv_feature_flags, kv_protocol_configs};
use crate::schema::{kv_epoch_ends, kv_epoch_starts, kv_feature_flags, kv_protocol_configs};

#[derive(Insertable, Debug, Clone, FieldCount)]
#[diesel(table_name = kv_epoch_ends)]
pub struct StoredEpochEnd {
pub epoch: i64,
pub cp_hi: i64,
pub tx_hi: i64,
pub end_timestamp_ms: i64,
pub safe_mode: bool,
pub total_stake: Option<i64>,
pub storage_fund_balance: Option<i64>,
pub storage_fund_reinvestment: Option<i64>,
pub storage_charge: Option<i64>,
pub storage_rebate: Option<i64>,
pub stake_subsidy_amount: Option<i64>,
pub total_gas_fees: Option<i64>,
pub total_stake_rewards_distributed: Option<i64>,
pub leftover_storage_fund_inflow: Option<i64>,
pub epoch_commitments: Vec<u8>,
}

#[derive(Insertable, Debug, Clone, FieldCount)]
#[diesel(table_name = kv_epoch_starts)]
pub struct StoredEpochStart {
pub epoch: i64,
pub protocol_version: i64,
pub cp_lo: i64,
pub start_timestamp_ms: i64,
pub reference_gas_price: i64,
pub system_state: Vec<u8>,
}

#[derive(Insertable, Debug, Clone, FieldCount)]
#[diesel(table_name = kv_feature_flags)]
Expand Down
Loading

0 comments on commit d18cdde

Please sign in to comment.