From a22f730219549d4f6409abb3c94296d0919d4ac2 Mon Sep 17 00:00:00 2001 From: Thomas Date: Sun, 5 Jan 2025 11:25:36 -0400 Subject: [PATCH] indexing first dao_proposals --- .vscode/settings.json | 3 + src/db/db_types.rs | 21 +-- src/db/mod.rs | 169 ++++++++++++++++++----- src/entrypoints/sputnik/mod.rs | 21 ++- src/entrypoints/sputnik/sputnik_types.rs | 21 +-- src/nearblocks_client/sputnik.rs | 20 ++- src/nearblocks_client/transactions.rs | 86 ++++++++++-- 7 files changed, 241 insertions(+), 100 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..92e3150 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "cSpell.words": ["astradao"] +} diff --git a/src/db/db_types.rs b/src/db/db_types.rs index 7c07c05..dbc8ed6 100644 --- a/src/db/db_types.rs +++ b/src/db/db_types.rs @@ -140,34 +140,17 @@ pub struct RfpDumpRecord { pub rfp_id: i32, } -#[derive(Debug, Clone, FromRow, Serialize, Deserialize, ToSchema)] -pub struct SputnikTxnsRecord { - pub id: i64, - pub hash: String, - pub author_id: String, - pub dao_instance: String, - pub proposer: String, - pub description: String, - pub kind: String, - pub status: String, - pub total_votes: i64, - pub vote_counts: serde_json::Value, - pub votes: serde_json::Value, - pub submission_time: i64, - pub proposal_action: String, -} - #[derive(Debug, Clone, FromRow, Serialize, Deserialize, ToSchema)] pub struct SputnikProposalSnapshotRecord { pub description: String, - pub id: i64, + pub id: i32, pub kind: serde_json::Value, pub proposer: String, pub status: String, pub submission_time: i64, pub vote_counts: serde_json::Value, pub votes: serde_json::Value, - pub total_votes: i64, + pub total_votes: i32, pub dao_instance: String, pub proposal_action: String, pub tx_timestamp: i64, diff --git a/src/db/mod.rs b/src/db/mod.rs index 9d17c6f..1a7c2db 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -902,48 +902,140 @@ impl DB { pub async fn upsert_dao_proposal_snapshot( tx: &mut Transaction<'static, Postgres>, - record: SputnikProposalSnapshotRecord, + sputnik_proposal: SputnikProposalSnapshotRecord, ) -> anyhow::Result<()> { - let sql = r#" - INSERT INTO dao_proposals (description, id, kind, proposer, status, submission_time, vote_counts, votes, total_votes, dao_instance, proposal_action, tx_timestamp, hash) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) - ON CONFLICT (id) DO UPDATE SET - description = $1, - kind = $3, - proposer = $4, - status = $5, - submission_time = $6, - vote_counts = $7, - votes = $8, - total_votes = $9, - dao_instance = $10, - proposal_action = $11, - tx_timestamp = $12, - hash = $13 - "#; - let result = sqlx::query(sql) - .bind(record.description) - .bind(record.id) - .bind(record.kind) - .bind(record.proposer) - .bind(record.status) - .bind(record.submission_time) - .bind(record.vote_counts) - .bind(record.votes) - .bind(record.total_votes) - .bind(record.dao_instance) - .bind(record.proposal_action) - .bind(record.tx_timestamp) - .bind(record.hash) - .execute(tx.as_mut()) - .await; + // JSONB fields + let kind = match serde_json::to_value(sputnik_proposal.kind) { + Ok(value) => value, + Err(e) => { + eprintln!("Error converting kind to JSON: {:?}", e); + return Err(e.into()); + } + }; - match result { - Ok(_) => Ok(()), + let vote_counts = match serde_json::to_value(sputnik_proposal.vote_counts) { + Ok(value) => value, Err(e) => { - eprintln!("Failed to insert dao proposal snapshot: {:?}", e); - Err(anyhow::anyhow!("Failed to insert dao proposal snapshot")) + eprintln!("Error converting vote_counts to JSON: {:?}", e); + return Err(e.into()); + } + }; + + let votes = match serde_json::to_value(&sputnik_proposal.votes) { + Ok(value) => value, + Err(e) => { + eprintln!("Error converting votes to JSON: {:?}", e); + return Err(e.into()); + } + }; + + // Attempt to update the existing record + let update_result = sqlx::query!( + r#" + UPDATE dao_proposals SET + description = $1, + kind = $2, + proposer = $3, + status = $4, + submission_time = $5, + vote_counts = $6, + votes = $7, + total_votes = $8, + dao_instance = $9, + proposal_action = $10, + tx_timestamp = $11, + hash = $12 + WHERE id = $13 + RETURNING id + "#, + sputnik_proposal.description, + kind, + sputnik_proposal.proposer, + sputnik_proposal.status, + sputnik_proposal.submission_time, + vote_counts, + votes, + sputnik_proposal.total_votes as i32, + sputnik_proposal.dao_instance, + sputnik_proposal.proposal_action, + sputnik_proposal.tx_timestamp, + sputnik_proposal.hash, + sputnik_proposal.id as i32 + ) + .fetch_optional(tx.as_mut()) + .await?; + + if let Some(record) = update_result { + println!("Updated dao proposal snapshot: {:?}", record.id); + Ok(()) + } else { + println!("Inserting description: {:?}", sputnik_proposal.description); + println!("Inserting id: {:?}", sputnik_proposal.id); + println!("Inserting kind: {:?}", kind); + println!("Inserting proposer: {:?}", sputnik_proposal.proposer); + println!("Inserting status: {:?}", sputnik_proposal.status); + println!( + "Inserting submission_time: {:?}", + sputnik_proposal.submission_time + ); + println!("Inserting vote_counts: {:?}", vote_counts); + println!("Inserting votes: {:?}", sputnik_proposal.votes); + println!("Inserting total_votes: {:?}", sputnik_proposal.total_votes); + println!( + "Inserting dao_instance: {:?}", + sputnik_proposal.dao_instance + ); + println!( + "Inserting proposal_action: {:?}", + sputnik_proposal.proposal_action + ); + println!( + "Inserting tx_timestamp: {:?}", + sputnik_proposal.tx_timestamp + ); + println!("Inserting hash: {:?}", sputnik_proposal.hash); + // If no rows were updated, insert a new record + let rec = sqlx::query!( + r#" + INSERT INTO dao_proposals ( + description, id, kind, proposer, status, submission_time, vote_counts, votes, total_votes, dao_instance, proposal_action, tx_timestamp, hash + ) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13 + ) + ON CONFLICT (id) DO NOTHING + RETURNING id + "#, + sputnik_proposal.description, + sputnik_proposal.id as i32, + serde_json::Value::String("".to_string()), // kind: + sputnik_proposal.proposer, + sputnik_proposal.status, + sputnik_proposal.submission_time, + serde_json::Value::String("".to_string()), // vote_counts: + serde_json::Value::String("".to_string()), // votes: + sputnik_proposal.total_votes as i32, + sputnik_proposal.dao_instance, + sputnik_proposal.proposal_action, + sputnik_proposal.tx_timestamp, + sputnik_proposal.hash + ) + .fetch_optional(tx.as_mut()) + .await; + + match rec { + Ok(Some(record)) => { + println!("Inserted dao proposal snapshot: {:?}", record.id); + } + Ok(None) => { + println!("No record inserted due to conflict or other issue."); + eprintln!("No record inserted due to conflict or other issue."); + } + Err(e) => { + eprintln!("Error inserting dao proposal snapshot: {:?}", e); + return Err(anyhow::anyhow!("Failed to insert dao proposal snapshot")); + } } + Ok(()) } } @@ -972,6 +1064,7 @@ impl DB { * AND ($2 IS NULL OR kind->>'key' ILIKE '%' || $2 || '%') AND ($3 IS NULL OR status->>'key' ILIKE '%' || $3 || '%') */ + println!("where dao_instance: {:?}", dao_instance); let sql = format!( r#" SELECT * diff --git a/src/entrypoints/sputnik/mod.rs b/src/entrypoints/sputnik/mod.rs index 03ded71..192f338 100644 --- a/src/entrypoints/sputnik/mod.rs +++ b/src/entrypoints/sputnik/mod.rs @@ -1,6 +1,6 @@ use crate::db::db_types::SputnikProposalSnapshotRecord; use crate::db::DB; -use crate::nearblocks_client::transactions::update_nearblocks_data; +use crate::nearblocks_client::transactions::update_dao_via_nearblocks; use crate::types::PaginatedResponse; use near_account_id::AccountId; use rocket::http::Status; @@ -72,23 +72,18 @@ async fn get_dao_proposals( } }; - let current_timestamp_nano = chrono::Utc::now().timestamp_nanos_opt().unwrap(); let last_updated_info = db .get_last_updated_info_for_contract(&contract) .await .unwrap(); - if current_timestamp_nano - last_updated_info.after_date - >= chrono::Duration::seconds(2).num_nanoseconds().unwrap() - { - update_nearblocks_data( - db.inner(), - &contract, - nearblocks_api_key.inner(), - Some(last_updated_info.after_block), - ) - .await; - } + update_dao_via_nearblocks( + db.inner(), + &contract, + nearblocks_api_key.inner(), + Some(last_updated_info.after_block), + ) + .await; let (proposals, total) = fetch_dao_proposals(db, account_id, limit, order, offset, filters).await; diff --git a/src/entrypoints/sputnik/sputnik_types.rs b/src/entrypoints/sputnik/sputnik_types.rs index 524c64e..75f68b9 100644 --- a/src/entrypoints/sputnik/sputnik_types.rs +++ b/src/entrypoints/sputnik/sputnik_types.rs @@ -7,6 +7,7 @@ use std::collections::{HashMap, HashSet}; pub type OldAccountId = String; use near_sdk::json_types::Base64VecU8; use std::cmp::min; +use std::fmt; use super::policy::VersionedPolicy; @@ -220,7 +221,7 @@ impl WeightOrRatio { } } -/// How the voting policy votes get weigthed. +/// How the voting policy votes get weighted. #[derive(BorshSerialize, BorshDeserialize, Serialize, Deserialize, Clone, PartialEq)] #[cfg_attr(not(target_arch = "wasm32"), derive(Debug))] #[serde(crate = "near_sdk::serde")] @@ -296,16 +297,16 @@ pub enum ProposalStatus { Failed, } -impl ToString for ProposalStatus { - fn to_string(&self) -> String { +impl fmt::Display for ProposalStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - ProposalStatus::InProgress => "InProgress".to_string(), - ProposalStatus::Approved => "Approved".to_string(), - ProposalStatus::Rejected => "Rejected".to_string(), - ProposalStatus::Removed => "Removed".to_string(), - ProposalStatus::Expired => "Expired".to_string(), - ProposalStatus::Moved => "Moved".to_string(), - ProposalStatus::Failed => "Failed".to_string(), + ProposalStatus::InProgress => write!(f, "InProgress"), + ProposalStatus::Approved => write!(f, "Approved"), + ProposalStatus::Rejected => write!(f, "Rejected"), + ProposalStatus::Removed => write!(f, "Removed"), + ProposalStatus::Expired => write!(f, "Expired"), + ProposalStatus::Moved => write!(f, "Moved"), + ProposalStatus::Failed => write!(f, "Failed"), } } } diff --git a/src/nearblocks_client/sputnik.rs b/src/nearblocks_client/sputnik.rs index 9c67821..6bd58ce 100644 --- a/src/nearblocks_client/sputnik.rs +++ b/src/nearblocks_client/sputnik.rs @@ -81,16 +81,6 @@ pub async fn handle_add_proposal( db: &State, contract: &AccountId, ) -> Result<(), Status> { - let action = transaction - .actions - .as_ref() - .and_then(|actions| actions.first()) - .ok_or(Status::InternalServerError)?; - let json_args = action.args.clone(); - let args: AddProposalArgs = serde_json::from_str(&json_args.unwrap_or_default()).unwrap(); - - println!("Args: {:?}", args); - let rpc_service = RpcService::new(contract); // get last proposal id @@ -135,6 +125,7 @@ pub async fn handle_add_proposal( }; println!("Proposal: {:?}", daop.id); + println!("Proposal description: {:?}", daop.proposal.description); let proposal_action = decode_proposal_description("isStakeRequest", &daop.proposal.description); // TODO check v1 isStakeRequest as well @@ -153,7 +144,7 @@ pub async fn handle_add_proposal( submission_time: daop.proposal.submission_time.0 as i64, vote_counts: serde_json::to_value(daop.proposal.vote_counts).unwrap(), votes: serde_json::to_value(&daop.proposal.votes).unwrap(), - total_votes: daop.proposal.votes.len() as i64, + total_votes: daop.proposal.votes.len() as i32, dao_instance: contract.to_string(), proposal_action, tx_timestamp: transaction.block_timestamp.parse::().unwrap(), @@ -168,6 +159,11 @@ pub async fn handle_add_proposal( Status::InternalServerError })?; + tx.commit().await.map_err(|e| { + eprintln!("Failed to commit transaction: {:?}", e); + Status::InternalServerError + })?; + println!("Inserted proposal snapshot {}", daop.id); Ok(()) @@ -229,7 +225,7 @@ pub async fn handle_act_proposal( submission_time: dao_proposal.proposal.submission_time.0 as i64, vote_counts: serde_json::to_value(dao_proposal.proposal.vote_counts).unwrap(), votes: serde_json::to_value(&dao_proposal.proposal.votes).unwrap(), - total_votes: dao_proposal.proposal.votes.len() as i64, + total_votes: dao_proposal.proposal.votes.len() as i32, dao_instance: contract.to_string(), proposal_action, tx_timestamp: transaction.block_timestamp.parse::().unwrap(), diff --git a/src/nearblocks_client/transactions.rs b/src/nearblocks_client/transactions.rs index 3240613..53e65c0 100644 --- a/src/nearblocks_client/transactions.rs +++ b/src/nearblocks_client/transactions.rs @@ -98,6 +98,84 @@ pub async fn update_nearblocks_data( } } +pub async fn update_dao_via_nearblocks( + db: &DB, + contract: &AccountId, + nearblocks_api_key: &str, + after_block: Option, +) { + let nearblocks_client = nearblocks_client::ApiClient::new(nearblocks_api_key.to_string()); + + let (all_transactions, _) = + fetch_all_new_transactions(&nearblocks_client, contract, after_block).await; + + println!("Total transactions fetched: {}", all_transactions.len()); + + let _ = nearblocks_client::transactions::process_dao_transactions( + &all_transactions, + db.into(), + contract, + ) + .await; + + if let Some(transaction) = all_transactions.last() { + let timestamp_nano = transaction.block_timestamp.parse::().unwrap(); + let _ = db + .set_last_updated_info_for_contract( + contract, + timestamp_nano, + transaction.block.block_height, + ) + .await; + } +} + +pub async fn process_dao_transactions( + transactions: &[Transaction], + db: &State, + contract: &AccountId, +) -> Result<(), Status> { + for transaction in transactions.iter() { + if let Some(action) = transaction + .actions + .as_ref() + .and_then(|actions| actions.first()) + { + if !transaction.receipt_outcome.status { + eprintln!( + "Proposal receipt outcome status is {:?}", + transaction.receipt_outcome.status + ); + // eprintln!("On transaction: {:?}", transaction); + continue; + } + let result = match action.method.as_deref().unwrap_or("") { + // TODO can't reuse this because the other contract has a function with the same name + "add_proposal" => { + println!("add_proposal"); + handle_add_proposal(transaction.to_owned(), db, contract).await + } + // TODO: Uncomment this + // "act_proposal" => { + // println!("act_proposal"); + // handle_act_proposal(transaction.to_owned(), db, contract).await + // } + _ => { + if action.action == "FUNCTION_CALL" { + println!("Unhandled method: {:?}", action.method.as_ref().unwrap()); + } else { + println!("Unhandled action: {:?}", action.action); + } + continue; + } + }; + result?; + } + } + + Ok(()) +} + pub async fn process( transactions: &[Transaction], db: &State, @@ -154,14 +232,6 @@ pub async fn process( println!("set_rfp_block_height_callback"); handle_set_rfp_block_height_callback(transaction.to_owned(), db, contract).await } - "add_proposal" => { - println!("add_proposal"); - handle_add_proposal(transaction.to_owned(), db, contract).await - } - // "act_proposal" => { - // println!("act_proposal"); - // handle_act_proposal(transaction.to_owned(), db, contract).await - // } _ => { if action.action == "FUNCTION_CALL" { // println!("Unhandled method: {:?}", action.method.as_ref().unwrap());