From 040df882f41bac141729075f80a2d7fc7c15264f Mon Sep 17 00:00:00 2001 From: Emma Zhong Date: Fri, 13 Dec 2024 13:45:21 -0800 Subject: [PATCH] [indexer-alt] add prune impls for each pipeline --- .../src/handlers/ev_emit_mod.rs | 15 +++++++++++++ .../src/handlers/ev_struct_inst.rs | 20 +++++++++++++++-- .../src/handlers/kv_checkpoints.rs | 8 +++++++ .../src/handlers/kv_epoch_ends.rs | 21 +++++++++++++++++- .../src/handlers/kv_epoch_starts.rs | 22 ++++++++++++++++++- .../src/handlers/kv_transactions.rs | 10 +++++++++ .../src/handlers/tx_affected_addresses.rs | 19 +++++++++++++++- .../src/handlers/tx_affected_objects.rs | 19 +++++++++++++++- .../src/handlers/tx_balance_changes.rs | 19 +++++++++++++++- .../sui-indexer-alt/src/handlers/tx_calls.rs | 18 ++++++++++++++- .../src/handlers/tx_digests.rs | 18 ++++++++++++++- .../sui-indexer-alt/src/handlers/tx_kinds.rs | 18 ++++++++++++++- 12 files changed, 197 insertions(+), 10 deletions(-) diff --git a/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs b/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs index b5f6f0a4b7ea3d..9aaf0cb7e513d0 100644 --- a/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs +++ b/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs @@ -1,10 +1,13 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::ops::Range; use std::{collections::BTreeSet, sync::Arc}; use anyhow::Result; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; +use sui_indexer_alt_framework::models::cp_sequence_numbers::tx_interval; use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; use sui_indexer_alt_schema::{events::StoredEvEmitMod, schema::ev_emit_mod}; use sui_pg_db as db; @@ -57,4 +60,16 @@ impl Handler for EvEmitMod { .execute(conn) .await?) } + + async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result { + let Range { + start: from_tx, + end: to_tx, + } = tx_interval(conn, from..to).await?; + + let filter = ev_emit_mod::table + .filter(ev_emit_mod::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs b/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs index c66d5592fe57ea..29eec49b7ee588 100644 --- a/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs +++ b/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs @@ -1,11 +1,15 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::{collections::BTreeSet, sync::Arc}; +use std::{collections::BTreeSet, ops::Range, sync::Arc}; use anyhow::{Context, Result}; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::{ + models::cp_sequence_numbers::tx_interval, + pipeline::{concurrent::Handler, Processor}, +}; use sui_indexer_alt_schema::{events::StoredEvStructInst, schema::ev_struct_inst}; use sui_pg_db as db; use sui_types::full_checkpoint_content::CheckpointData; @@ -60,4 +64,16 @@ impl Handler for EvStructInst { .execute(conn) .await?) } + + async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result { + let Range { + start: from_tx, + end: to_tx, + } = tx_interval(conn, from..to).await?; + + let filter = ev_struct_inst::table + .filter(ev_struct_inst::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs b/crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs index a9bc26a7e90f40..93523c94ce08f2 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use anyhow::{Context, Result}; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; use sui_indexer_alt_schema::{checkpoints::StoredCheckpoint, schema::kv_checkpoints}; @@ -38,4 +39,11 @@ impl Handler for KvCheckpoints { .execute(conn) .await?) } + + async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result { + let filter = kv_checkpoints::table + .filter(kv_checkpoints::sequence_number.between(from as i64, to as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs b/crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs index 926d9325f442e4..64f1529a7a8351 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs @@ -1,11 +1,16 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::ops::Range; use std::sync::Arc; use anyhow::{bail, Context, Result}; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::{ + models::cp_sequence_numbers::epoch_interval, + pipeline::{concurrent::Handler, Processor}, +}; use sui_indexer_alt_schema::{epochs::StoredEpochEnd, schema::kv_epoch_ends}; use sui_pg_db as db; use sui_types::{ @@ -125,4 +130,18 @@ impl Handler for KvEpochEnds { .execute(conn) .await?) } + + async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result { + let Range { + start: from_epoch, + end: to_epoch, + } = epoch_interval(conn, from..to).await?; + if from_epoch < to_epoch { + let filter = kv_epoch_ends::table + .filter(kv_epoch_ends::epoch.between(from_epoch as i64, to_epoch as i64 - 1)); + Ok(diesel::delete(filter).execute(conn).await?) + } else { + Ok(0) + } + } } diff --git a/crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs b/crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs index bd5efcdf614638..8ac1ad1fc2d64c 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs @@ -1,11 +1,16 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::ops::Range; use std::sync::Arc; use anyhow::{bail, Context, Result}; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::{ + models::cp_sequence_numbers::epoch_interval, + pipeline::{concurrent::Handler, Processor}, +}; use sui_indexer_alt_schema::{epochs::StoredEpochStart, schema::kv_epoch_starts}; use sui_pg_db as db; use sui_types::{ @@ -72,4 +77,19 @@ impl Handler for KvEpochStarts { .execute(conn) .await?) } + + async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result { + let Range { + start: from_epoch, + end: to_epoch, + } = epoch_interval(conn, from..to).await?; + if from_epoch < to_epoch { + let filter = kv_epoch_starts::table + .filter(kv_epoch_starts::epoch.between(from_epoch as i64, to_epoch as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } else { + Ok(0) + } + } } diff --git a/crates/sui-indexer-alt/src/handlers/kv_transactions.rs b/crates/sui-indexer-alt/src/handlers/kv_transactions.rs index 7bef2130d81774..27b96eb27437b4 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_transactions.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_transactions.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use anyhow::{Context, Result}; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; use sui_indexer_alt_schema::{schema::kv_transactions, transactions::StoredTransaction}; @@ -66,4 +67,13 @@ impl Handler for KvTransactions { .execute(conn) .await?) } + + async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result { + // TODO: use tx_interval. `tx_sequence_number` needs to be added to this table, and an index + // created as its primary key is on `tx_digest`. + let filter = kv_transactions::table + .filter(kv_transactions::cp_sequence_number.between(from as i64, to as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs b/crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs index 51fb7e6917b8f8..964c25840702ce 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs @@ -1,12 +1,17 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::ops::Range; use std::sync::Arc; use anyhow::Result; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; use itertools::Itertools; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::{ + models::cp_sequence_numbers::tx_interval, + pipeline::{concurrent::Handler, Processor}, +}; use sui_indexer_alt_schema::{ schema::tx_affected_addresses, transactions::StoredTxAffectedAddress, }; @@ -69,4 +74,16 @@ impl Handler for TxAffectedAddresses { .execute(conn) .await?) } + + async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result { + let Range { + start: from_tx, + end: to_tx, + } = tx_interval(conn, from..to).await?; + let filter = tx_affected_addresses::table.filter( + tx_affected_addresses::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1), + ); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs b/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs index c99f8dd56a49b6..7445fdce0de7ce 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs @@ -1,11 +1,16 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::ops::Range; use std::sync::Arc; use anyhow::Result; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::{ + models::cp_sequence_numbers::tx_interval, + pipeline::{concurrent::Handler, Processor}, +}; use sui_indexer_alt_schema::{schema::tx_affected_objects, transactions::StoredTxAffectedObject}; use sui_pg_db as db; use sui_types::{effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData}; @@ -59,4 +64,16 @@ impl Handler for TxAffectedObjects { .execute(conn) .await?) } + + async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result { + let Range { + start: from_tx, + end: to_tx, + } = tx_interval(conn, from..to).await?; + let filter = tx_affected_objects::table.filter( + tx_affected_objects::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1), + ); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs b/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs index 31a49d33943ccc..15748b7474d0ca 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs @@ -1,11 +1,16 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::ops::Range; use std::{collections::BTreeMap, sync::Arc}; use anyhow::{Context, Result}; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::{ + models::cp_sequence_numbers::tx_interval, + pipeline::{concurrent::Handler, Processor}, +}; use sui_indexer_alt_schema::{ schema::tx_balance_changes, transactions::{BalanceChange, StoredTxBalanceChange}, @@ -65,6 +70,18 @@ impl Handler for TxBalanceChanges { .execute(conn) .await?) } + + async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result { + let Range { + start: from_tx, + end: to_tx, + } = tx_interval(conn, from..to).await?; + let filter = tx_balance_changes::table.filter( + tx_balance_changes::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1), + ); + + Ok(diesel::delete(filter).execute(conn).await?) + } } /// Calculate balance changes based on the object's input and output objects. diff --git a/crates/sui-indexer-alt/src/handlers/tx_calls.rs b/crates/sui-indexer-alt/src/handlers/tx_calls.rs index e189bdd9acd2d2..fec4dce3a823ae 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_calls.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_calls.rs @@ -1,11 +1,16 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::ops::Range; use std::sync::Arc; use anyhow::{Ok, Result}; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::{ + models::cp_sequence_numbers::tx_interval, + pipeline::{concurrent::Handler, Processor}, +}; use sui_indexer_alt_schema::{schema::tx_calls, transactions::StoredTxCalls}; use sui_pg_db as db; use sui_types::full_checkpoint_content::CheckpointData; @@ -62,4 +67,15 @@ impl Handler for TxCalls { .execute(conn) .await?) } + + async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result { + let Range { + start: from_tx, + end: to_tx, + } = tx_interval(conn, from..to).await?; + let filter = tx_calls::table + .filter(tx_calls::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/tx_digests.rs b/crates/sui-indexer-alt/src/handlers/tx_digests.rs index 579ec324292409..0b04c2040df4f0 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_digests.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_digests.rs @@ -1,11 +1,16 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::ops::Range; use std::sync::Arc; use anyhow::Result; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::{ + models::cp_sequence_numbers::tx_interval, + pipeline::{concurrent::Handler, Processor}, +}; use sui_indexer_alt_schema::{schema::tx_digests, transactions::StoredTxDigest}; use sui_pg_db as db; use sui_types::full_checkpoint_content::CheckpointData; @@ -49,4 +54,15 @@ impl Handler for TxDigests { .execute(conn) .await?) } + + async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result { + let Range { + start: from_tx, + end: to_tx, + } = tx_interval(conn, from..to).await?; + let filter = tx_digests::table + .filter(tx_digests::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/tx_kinds.rs b/crates/sui-indexer-alt/src/handlers/tx_kinds.rs index 5f61e66be360f0..791e27a42b5017 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_kinds.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_kinds.rs @@ -1,11 +1,16 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::ops::Range; use std::sync::Arc; use anyhow::Result; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::{ + models::cp_sequence_numbers::tx_interval, + pipeline::{concurrent::Handler, Processor}, +}; use sui_indexer_alt_schema::{ schema::tx_kinds, transactions::{StoredKind, StoredTxKind}, @@ -60,4 +65,15 @@ impl Handler for TxKinds { .execute(conn) .await?) } + + async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result { + let Range { + start: from_tx, + end: to_tx, + } = tx_interval(conn, from..to).await?; + let filter = tx_kinds::table + .filter(tx_kinds::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } }