Skip to content

Commit

Permalink
[data ingestion] add support for bigtable in main workflow binary (#2…
Browse files Browse the repository at this point in the history
…0018)

## Description 

adding BigTable KV as a variant to main binary that manages internal
workflows

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
phoenix-o authored Oct 31, 2024
1 parent c33374e commit d95918c
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 36 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@ sui-json-rpc = { path = "crates/sui-json-rpc" }
sui-json-rpc-api = { path = "crates/sui-json-rpc-api" }
sui-json-rpc-types = { path = "crates/sui-json-rpc-types" }
sui-keys = { path = "crates/sui-keys" }
sui-kvstore = {path = "crates/sui-kvstore"}
sui-macros = { path = "crates/sui-macros" }
sui-metric-checker = { path = "crates/sui-metric-checker" }
sui-move = { path = "crates/sui-move" }
Expand Down
1 change: 1 addition & 0 deletions crates/sui-data-ingestion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ tracing.workspace = true
sui-archival.workspace = true
sui-storage.workspace = true
sui-data-ingestion-core.workspace = true
sui-kvstore.workspace = true
sui-types.workspace = true
tempfile.workspace = true
url.workspace = true
Expand Down
16 changes: 16 additions & 0 deletions crates/sui-data-ingestion/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use sui_data_ingestion::{
};
use sui_data_ingestion_core::{DataIngestionMetrics, ReaderOptions};
use sui_data_ingestion_core::{IndexerExecutor, WorkerPool};
use sui_kvstore::{BigTableClient, KvWorker};
use tokio::signal;
use tokio::sync::oneshot;

Expand All @@ -21,6 +22,7 @@ enum Task {
Archival(ArchivalConfig),
Blob(BlobTaskConfig),
KV(KVStoreTaskConfig),
BigTableKV(BigTableTaskConfig),
}

#[derive(Serialize, Deserialize, Clone, Debug)]
Expand All @@ -40,6 +42,11 @@ struct ProgressStoreConfig {
pub table_name: String,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
struct BigTableTaskConfig {
instance_id: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct IndexerConfig {
path: PathBuf,
Expand Down Expand Up @@ -146,6 +153,15 @@ async fn main() -> Result<()> {
);
executor.register(worker_pool).await?;
}
Task::BigTableKV(kv_config) => {
let client = BigTableClient::new_remote(kv_config.instance_id, false, None).await?;
let worker_pool = WorkerPool::new(
KvWorker { client },
task_config.name,
task_config.concurrency,
);
executor.register(worker_pool).await?;
}
};
}
let reader_options = ReaderOptions {
Expand Down
1 change: 1 addition & 0 deletions crates/sui-kvstore/src/bigtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@

pub(crate) mod client;
mod proto;
pub(crate) mod worker;
39 changes: 39 additions & 0 deletions crates/sui-kvstore/src/bigtable/worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::{BigTableClient, KeyValueStoreWriter, TransactionData};
use async_trait::async_trait;
use sui_data_ingestion_core::Worker;
use sui_types::full_checkpoint_content::CheckpointData;

pub struct KvWorker {
pub client: BigTableClient,
}

#[async_trait]
impl Worker for KvWorker {
type Result = ();

async fn process_checkpoint(&self, checkpoint: &CheckpointData) -> anyhow::Result<()> {
let mut client = self.client.clone();
let mut objects = vec![];
let mut transactions = vec![];
for transaction in &checkpoint.transactions {
let full_transaction = TransactionData {
transaction: transaction.transaction.clone(),
effects: transaction.effects.clone(),
events: transaction.events.clone(),
checkpoint_number: checkpoint.checkpoint_summary.sequence_number,
timestamp: checkpoint.checkpoint_summary.timestamp_ms,
};
for object in &transaction.output_objects {
objects.push(object);
}
transactions.push(full_transaction);
}
client.save_objects(&objects).await?;
client.save_transactions(&transactions).await?;
client.save_checkpoint(checkpoint).await?;
Ok(())
}
}
1 change: 1 addition & 0 deletions crates/sui-kvstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod bigtable;
use anyhow::Result;
use async_trait::async_trait;
pub use bigtable::client::BigTableClient;
pub use bigtable::worker::KvWorker;
use sui_types::crypto::AuthorityStrongQuorumSignInfo;
use sui_types::digests::TransactionDigest;
use sui_types::effects::{TransactionEffects, TransactionEvents};
Expand Down
39 changes: 3 additions & 36 deletions crates/sui-kvstore/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,11 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use anyhow::Result;
use async_trait::async_trait;
use sui_data_ingestion_core::{setup_single_workflow, Worker};
use sui_kvstore::{BigTableClient, KeyValueStoreWriter, TransactionData};
use sui_types::full_checkpoint_content::CheckpointData;
use sui_data_ingestion_core::setup_single_workflow;
use sui_kvstore::BigTableClient;
use sui_kvstore::KvWorker;
use telemetry_subscribers::TelemetryConfig;

struct KvWorker {
client: BigTableClient,
}

#[async_trait]
impl Worker for KvWorker {
type Result = ();

async fn process_checkpoint(&self, checkpoint: &CheckpointData) -> Result<()> {
let mut client = self.client.clone();
let mut objects = vec![];
let mut transactions = vec![];
for transaction in &checkpoint.transactions {
let full_transaction = TransactionData {
transaction: transaction.transaction.clone(),
effects: transaction.effects.clone(),
events: transaction.events.clone(),
checkpoint_number: checkpoint.checkpoint_summary.sequence_number,
timestamp: checkpoint.checkpoint_summary.timestamp_ms,
};
for object in &transaction.output_objects {
objects.push(object);
}
transactions.push(full_transaction);
}
client.save_objects(&objects).await?;
client.save_transactions(&transactions).await?;
client.save_checkpoint(checkpoint).await?;
Ok(())
}
}

#[tokio::main]
async fn main() -> Result<()> {
let _guard = TelemetryConfig::new().with_env().init();
Expand Down

0 comments on commit d95918c

Please sign in to comment.