Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
gegaowp committed Nov 25, 2024
1 parent 187c915 commit 8bbdec3
Show file tree
Hide file tree
Showing 24 changed files with 35 additions and 47 deletions.
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ impl Processor for EvEmitMod {
#[async_trait::async_trait]
impl Handler for EvEmitMod {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ impl Processor for EvStructInst {
#[async_trait::async_trait]
impl Handler for EvStructInst {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/kv_feature_flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ impl Processor for KvFeatureFlags {
#[async_trait::async_trait]
impl Handler for KvFeatureFlags {
const MIN_EAGER_ROWS: usize = 1;
const MAX_CHUNK_ROWS: usize = i16::MAX as usize / 3;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/kv_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ impl Processor for KvObjects {
#[async_trait::async_trait]
impl Handler for KvObjects {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;
const WRITE_CONCURRENCY_OVERRIDE: Option<usize> = Some(20);

Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/kv_protocol_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ impl Processor for KvProtocolConfigs {
#[async_trait::async_trait]
impl Handler for KvProtocolConfigs {
const MIN_EAGER_ROWS: usize = 1;
const MAX_CHUNK_ROWS: usize = i16::MAX as usize / 3;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/kv_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ impl Processor for KvTransactions {
#[async_trait::async_trait]
impl Handler for KvTransactions {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/obj_versions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ impl Processor for ObjVersions {
#[async_trait::async_trait]
impl Handler for ObjVersions {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;
const WRITE_CONCURRENCY_OVERRIDE: Option<usize> = Some(20);

Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ impl Handler for SumCoinBalances {
deletes.push(update.object_id.to_vec());
}
}

let update_chunks = updates.chunks(Self::INSERT_CHUNK_ROWS).map(Either::Left);
let delete_chunks = deletes.chunks(Self::DELETE_CHUNK_ROWS).map(Either::Right);

Expand Down
30 changes: 16 additions & 14 deletions crates/sui-indexer-alt/src/handlers/sum_displays.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ use crate::{
schema::sum_displays,
};

const CHUNK_ROWS: usize = i16::MAX as usize / 4;

pub struct SumDisplays;

impl Processor for SumDisplays {
Expand Down Expand Up @@ -71,18 +69,22 @@ impl Handler for SumDisplays {

async fn commit(batch: &Self::Batch, conn: &mut db::Connection<'_>) -> Result<usize> {
let values: Vec<_> = batch.values().cloned().collect();
let updates = values.chunks(CHUNK_ROWS).map(|chunk| {
diesel::insert_into(sum_displays::table)
.values(chunk)
.on_conflict(sum_displays::object_type)
.do_update()
.set((
sum_displays::display_id.eq(excluded(sum_displays::display_id)),
sum_displays::display_version.eq(excluded(sum_displays::display_version)),
sum_displays::display.eq(excluded(sum_displays::display)),
))
.execute(conn)
});
let updates =
values
.chunks(Self::MAX_INSERT_CHUNK_ROWS)
.map(|chunk: &[StoredDisplay]| {
diesel::insert_into(sum_displays::table)
.values(chunk)
.on_conflict(sum_displays::object_type)
.do_update()
.set((
sum_displays::display_id.eq(excluded(sum_displays::display_id)),
sum_displays::display_version
.eq(excluded(sum_displays::display_version)),
sum_displays::display.eq(excluded(sum_displays::display)),
))
.execute(conn)
});

Ok(try_join_all(updates).await?.into_iter().sum())
}
Expand Down
4 changes: 1 addition & 3 deletions crates/sui-indexer-alt/src/handlers/sum_packages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ use crate::{
schema::sum_packages,
};

const CHUNK_ROWS: usize = i16::MAX as usize / 5;

pub struct SumPackages;

impl Processor for SumPackages {
Expand Down Expand Up @@ -67,7 +65,7 @@ impl Handler for SumPackages {

async fn commit(batch: &Self::Batch, conn: &mut db::Connection<'_>) -> Result<usize> {
let values: Vec<_> = batch.values().cloned().collect();
let updates = values.chunks(CHUNK_ROWS).map(|chunk| {
let updates = values.chunks(Self::MAX_INSERT_CHUNK_ROWS).map(|chunk| {
diesel::insert_into(sum_packages::table)
.values(chunk)
.on_conflict(sum_packages::package_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ impl Processor for TxAffectedAddress {
#[async_trait::async_trait]
impl Handler for TxAffectedAddress {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ impl Processor for TxAffectedObjects {
#[async_trait::async_trait]
impl Handler for TxAffectedObjects {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;
const WRITE_CONCURRENCY_OVERRIDE: Option<usize> = Some(20);

Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ impl Processor for TxBalanceChanges {
#[async_trait::async_trait]
impl Handler for TxBalanceChanges {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_calls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ impl Processor for TxCalls {
#[async_trait::async_trait]
impl Handler for TxCalls {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_digests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ impl Processor for TxDigests {
#[async_trait::async_trait]
impl Handler for TxDigests {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_kinds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ impl Processor for TxKinds {
#[async_trait::async_trait]
impl Handler for TxKinds {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ impl Processor for WalCoinBalances {
#[async_trait::async_trait]
impl Handler for WalCoinBalances {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/wal_obj_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ impl Processor for WalObjTypes {
#[async_trait::async_trait]
impl Handler for WalObjTypes {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
3 changes: 2 additions & 1 deletion crates/sui-indexer-alt/src/models/displays.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
// SPDX-License-Identifier: Apache-2.0

use diesel::prelude::*;
use sui_field_count::FieldCount;

use crate::schema::sum_displays;

#[derive(Insertable, Debug, Clone)]
#[derive(Insertable, Debug, Clone, FieldCount)]
#[diesel(table_name = sum_displays, primary_key(object_type))]
pub struct StoredDisplay {
pub object_type: Vec<u8>,
Expand Down
3 changes: 2 additions & 1 deletion crates/sui-indexer-alt/src/models/packages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
// SPDX-License-Identifier: Apache-2.0

use diesel::prelude::*;
use sui_field_count::FieldCount;

use crate::schema::sum_packages;

#[derive(Insertable, Debug, Clone)]
#[derive(Insertable, Debug, Clone, FieldCount)]
#[diesel(table_name = sum_packages, primary_key(package_id))]
pub struct StoredPackage {
pub package_id: Vec<u8>,
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-indexer-alt/src/models/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub struct StoredTxBalanceChange {
pub balance_changes: Vec<u8>,
}

#[derive(Insertable, Debug, Clone)]
#[derive(Insertable, Debug, Clone, FieldCount)]
#[diesel(table_name = tx_calls)]
pub struct StoredTxCalls {
pub tx_sequence_number: i64,
Expand All @@ -80,7 +80,7 @@ pub struct StoredTxCalls {
pub sender: Vec<u8>,
}

#[derive(Insertable, Debug, Clone)]
#[derive(Insertable, Debug, Clone, FieldCount)]
#[diesel(table_name = tx_digests)]
pub struct StoredTxDigest {
pub tx_sequence_number: i64,
Expand All @@ -95,7 +95,7 @@ pub enum StoredKind {
ProgrammableTransaction = 1,
}

#[derive(Insertable, Debug, Clone)]
#[derive(Insertable, Debug, Clone, FieldCount)]
#[diesel(table_name = tx_kinds)]
pub struct StoredTxKind {
pub tx_sequence_number: i64,
Expand Down
5 changes: 3 additions & 2 deletions crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::{sync::Arc, time::Duration};

use reader_watermark::reader_watermark;
use sui_field_count::FieldCount;
use sui_types::full_checkpoint_content::CheckpointData;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -56,8 +57,8 @@ pub trait Handler: Processor {
const MIN_EAGER_ROWS: usize = 50;

/// If there are more than this many rows pending, the committer will only commit this many in
/// one operation.
const MAX_CHUNK_ROWS: usize = 1000;
/// one operation. The size is chosen to maximize the rows without hitting the limit on bind parameters.
const MAX_CHUNK_ROWS: usize = i16::MAX as usize / Self::Value::FIELD_COUNT;

/// If there are more than this many rows pending, the committer applies backpressure.
const MAX_PENDING_ROWS: usize = 5000;
Expand Down
7 changes: 0 additions & 7 deletions crates/sui-indexer-alt/src/pipeline/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,6 @@ pub trait Processor {
/// How much concurrency to use when processing checkpoint data.
const FANOUT: usize = 10;

/// Each insert or update will include at most this many rows -- the size is chosen to maximize the
/// rows without hitting the limit on bind parameters.
const INSERT_CHUNK_ROWS: usize = i16::MAX as usize / Self::Value::FIELD_COUNT;

/// Each deletion will include at most this many rows without hitting the limit on bind parameters.
const DELETE_CHUNK_ROWS: usize = i16::MAX as usize;

/// The type of value being inserted by the handler.
type Value: Send + Sync + 'static + FieldCount;

Expand Down
8 changes: 8 additions & 0 deletions crates/sui-indexer-alt/src/pipeline/sequential/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use std::sync::Arc;

use sui_field_count::FieldCount;
use sui_types::full_checkpoint_content::CheckpointData;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -41,6 +42,13 @@ pub trait Handler: Processor {
/// If at least this many rows are pending, the committer will commit them eagerly.
const MIN_EAGER_ROWS: usize = 50;

/// If there are more than this many rows pending, the committer will only commit this many in
/// one operation. The size is chosen to maximize the rows without hitting the limit on bind parameters.
const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / Self::Value::FIELD_COUNT;

/// Each deletion will include at most this many rows without hitting the limit on bind parameters.
const MAX_DELETE_CHUNK_ROWS: usize = i16::MAX as usize;

/// Maximum number of checkpoints to try and write in a single batch. The larger this number
/// is, the more chances the pipeline has to merge redundant writes, but the longer each write
/// transaction is likely to be.
Expand Down

0 comments on commit 8bbdec3

Please sign in to comment.