Skip to content

Commit

Permalink
[indexer-alt] Add object filter queries
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind committed Jan 9, 2025
1 parent 56fdf0b commit 03402fc
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/sui-indexer-alt-schema/src/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct StoredObject {
pub serialized_object: Option<Vec<u8>>,
}

#[derive(Insertable, Debug, Clone, FieldCount)]
#[derive(Insertable, QueryableByName, Selectable, Debug, Clone, FieldCount)]
#[diesel(table_name = obj_versions, primary_key(object_id, object_version))]
pub struct StoredObjVersion {
pub object_id: Vec<u8>,
Expand Down
1 change: 1 addition & 0 deletions crates/sui-indexer-alt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ diesel_migrations.workspace = true
futures.workspace = true
hex.workspace = true
itertools.workspace = true
move-core-types.workspace = true
serde.workspace = true
telemetry-subscribers.workspace = true
tokio.workspace = true
Expand Down
1 change: 1 addition & 0 deletions crates/sui-indexer-alt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub(crate) mod bootstrap;
pub mod config;
pub(crate) mod consistent_pruning;
pub(crate) mod handlers;
pub mod queries;

pub async fn start_indexer(
db_args: DbArgs,
Expand Down
222 changes: 222 additions & 0 deletions crates/sui-indexer-alt/src/queries/objects.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use diesel::dsl::sql_query;
use diesel::{
sql_types::{BigInt, Bytea},
QueryableByName, Selectable,
};
use diesel_async::RunQueryDsl;
use move_core_types::language_storage::StructTag;
use sui_indexer_alt_schema::objects::{StoredObjVersion, StoredOwnerKind};
use sui_indexer_alt_schema::schema::obj_info;
use sui_pg_db::Db;
use sui_types::base_types::{ObjectID, SuiAddress};

pub struct Cursor {
checkpoint: i64,
object_id: ObjectID,
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ModuleFilter {
/// Filter the module by the package it's from.
ByPackage(SuiAddress),

/// Exact match on the module.
ByModule(SuiAddress, String),
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TypeFilter {
/// Filter the type by the package or module it's from.
ByModule(ModuleFilter),

/// If the struct tag has type parameters, treat it as an exact filter on that instantiation,
/// otherwise treat it as either a filter on all generic instantiations of the type, or an exact
/// match on the type with no type parameters. E.g.
///
/// 0x2::coin::Coin
///
/// would match both 0x2::coin::Coin and 0x2::coin::Coin<0x2::sui::SUI>.
ByType(StructTag),
}

#[derive(Default, Debug, Clone, Eq, PartialEq)]
pub struct ObjectFilter {
/// Filter objects by their type's `package`, `package::module`, or their fully qualified type
/// name.
///
/// Generic types can be queried by either the generic type name, e.g. `0x2::coin::Coin`, or by
/// the full type name, such as `0x2::coin::Coin<0x2::sui::SUI>`.
pub type_filter: Option<TypeFilter>,

/// Filter for live objects by their current owners.
pub owner_filter: Option<SuiAddress>,
}

#[derive(QueryableByName, Selectable, Debug)]
#[diesel(table_name = obj_info)]
struct IdCheckpoint {
#[diesel(sql_type = BigInt)]
cp_sequence_number: i64,
#[diesel(sql_type = Bytea)]
object_id: Vec<u8>,
}

pub async fn query_objects_with_filters(
db: &Db,
view_checkpoint_number: i64,
filters: ObjectFilter,
cursor: Option<Cursor>,
limit: usize,
) -> anyhow::Result<Vec<StoredObjVersion>> {
let object_ids =
query_object_ids_with_filters(db, view_checkpoint_number, filters, cursor, limit).await?;
query_latest_object_versions(db, &object_ids).await
}

// TODO: Double check that this function is not prone to SQL injection.
async fn query_object_ids_with_filters(
db: &Db,
view_checkpoint_number: i64,
filters: ObjectFilter,
cursor: Option<Cursor>,
limit: usize,
) -> anyhow::Result<Vec<IdCheckpoint>> {
let owner_filter_condition = if let Some(owner) = filters.owner_filter {
format!(
" AND owner_kind = {} AND owner_id = '\\x{}'::bytea",
StoredOwnerKind::Address as i16,
hex::encode(owner.to_vec())
)
} else {
String::new()
};
let type_filter = if let Some(type_filter) = filters.type_filter {
match type_filter {
TypeFilter::ByModule(module) => match module {
ModuleFilter::ByPackage(package) => format!(
" AND package = '\\x{}'::bytea",
hex::encode(package.to_vec())
),
ModuleFilter::ByModule(package, module) => format!(
" AND package = '\\x{}'::bytea AND module = '{}'",
hex::encode(package.to_vec()),
module
),
},
TypeFilter::ByType(struct_tag) => {
format!(
" AND package = '\\x{}'::bytea AND module = '{:?}' AND name = '{:?}' AND instantiation = '\\x{}'::bytea",
hex::encode(struct_tag.address.to_vec()),
struct_tag.module,
struct_tag.name,
hex::encode(bcs::to_bytes(&struct_tag.type_params).unwrap())
)
}
}
} else {
String::new()
};
let cursor_filter_condition = if let Some(cursor) = cursor {
format!(
" AND (cp_sequence_number < {} OR (cp_sequence_number = {} AND object_id > '\\x{}'::bytea))",
cursor.checkpoint, cursor.checkpoint, hex::encode(cursor.object_id)
)
} else {
String::new()
};

let query = format!(
"
WITH filtered_rows AS (
SELECT
cp_sequence_number,
object_id
FROM
obj_info
WHERE
cp_sequence_number <= {view_checkpoint_number}
{owner_filter_condition}
{type_filter}
{cursor_filter_condition}
),
max_cp_per_object AS (
SELECT
object_id,
MAX(cp_sequence_number) AS max_cp_sequence_number
FROM
obj_info
WHERE
cp_sequence_number <= {view_checkpoint_number}
GROUP BY
object_id
)
SELECT
f.cp_sequence_number,
f.object_id
FROM
filtered_rows f
JOIN
max_cp_per_object m
ON
f.object_id = m.object_id
AND f.cp_sequence_number = m.max_cp_sequence_number
ORDER BY
f.cp_sequence_number DESC,
f.object_id ASC
LIMIT {limit};
",
);

let sql_query = sql_query(query);
let mut conn = db.connect().await?;
Ok(sql_query.load::<IdCheckpoint>(&mut conn).await?)
}

async fn query_latest_object_versions(
db: &Db,
objects: &[IdCheckpoint],
) -> anyhow::Result<Vec<StoredObjVersion>> {
if objects.is_empty() {
return Ok(vec![]);
}
let conditions = objects
.iter()
.map(|o| {
format!(
"(object_id = '\\x{}'::bytea AND cp_sequence_number <= {})",
hex::encode(&o.object_id),
o.cp_sequence_number
)
})
.collect::<Vec<_>>()
.join(" OR ");
let query = format!(
"
SELECT obj_versions.*
FROM obj_versions
JOIN (
SELECT object_id, MAX(cp_sequence_number) AS max_cp_sequence_number
FROM obj_versions
WHERE {}
GROUP BY object_id
) AS filtered_objects
ON obj_versions.object_id = filtered_objects.object_id
AND obj_versions.cp_sequence_number = filtered_objects.max_cp_sequence_number
",
conditions
);
let sql_query = sql_query(query);
let mut conn = db.connect().await?;
Ok(sql_query.load::<StoredObjVersion>(&mut conn).await?)
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_query_objects_with_filters() {}
}

0 comments on commit 03402fc

Please sign in to comment.