Skip to content

Commit

Permalink
indexing first dao_proposals
Browse files Browse the repository at this point in the history
  • Loading branch information
Tguntenaar committed Jan 5, 2025
1 parent 9539b3a commit a22f730
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 100 deletions.
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"cSpell.words": ["astradao"]
}
21 changes: 2 additions & 19 deletions src/db/db_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,34 +140,17 @@ pub struct RfpDumpRecord {
pub rfp_id: i32,
}

#[derive(Debug, Clone, FromRow, Serialize, Deserialize, ToSchema)]
pub struct SputnikTxnsRecord {
pub id: i64,
pub hash: String,
pub author_id: String,
pub dao_instance: String,
pub proposer: String,
pub description: String,
pub kind: String,
pub status: String,
pub total_votes: i64,
pub vote_counts: serde_json::Value,
pub votes: serde_json::Value,
pub submission_time: i64,
pub proposal_action: String,
}

#[derive(Debug, Clone, FromRow, Serialize, Deserialize, ToSchema)]
pub struct SputnikProposalSnapshotRecord {
pub description: String,
pub id: i64,
pub id: i32,
pub kind: serde_json::Value,
pub proposer: String,
pub status: String,
pub submission_time: i64,
pub vote_counts: serde_json::Value,
pub votes: serde_json::Value,
pub total_votes: i64,
pub total_votes: i32,
pub dao_instance: String,
pub proposal_action: String,
pub tx_timestamp: i64,
Expand Down
169 changes: 131 additions & 38 deletions src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -902,48 +902,140 @@ impl DB {

pub async fn upsert_dao_proposal_snapshot(
tx: &mut Transaction<'static, Postgres>,
record: SputnikProposalSnapshotRecord,
sputnik_proposal: SputnikProposalSnapshotRecord,
) -> anyhow::Result<()> {
let sql = r#"
INSERT INTO dao_proposals (description, id, kind, proposer, status, submission_time, vote_counts, votes, total_votes, dao_instance, proposal_action, tx_timestamp, hash)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
ON CONFLICT (id) DO UPDATE SET
description = $1,
kind = $3,
proposer = $4,
status = $5,
submission_time = $6,
vote_counts = $7,
votes = $8,
total_votes = $9,
dao_instance = $10,
proposal_action = $11,
tx_timestamp = $12,
hash = $13
"#;
let result = sqlx::query(sql)
.bind(record.description)
.bind(record.id)
.bind(record.kind)
.bind(record.proposer)
.bind(record.status)
.bind(record.submission_time)
.bind(record.vote_counts)
.bind(record.votes)
.bind(record.total_votes)
.bind(record.dao_instance)
.bind(record.proposal_action)
.bind(record.tx_timestamp)
.bind(record.hash)
.execute(tx.as_mut())
.await;
// JSONB fields
let kind = match serde_json::to_value(sputnik_proposal.kind) {
Ok(value) => value,
Err(e) => {
eprintln!("Error converting kind to JSON: {:?}", e);
return Err(e.into());
}
};

match result {
Ok(_) => Ok(()),
let vote_counts = match serde_json::to_value(sputnik_proposal.vote_counts) {
Ok(value) => value,
Err(e) => {
eprintln!("Failed to insert dao proposal snapshot: {:?}", e);
Err(anyhow::anyhow!("Failed to insert dao proposal snapshot"))
eprintln!("Error converting vote_counts to JSON: {:?}", e);
return Err(e.into());
}
};

let votes = match serde_json::to_value(&sputnik_proposal.votes) {
Ok(value) => value,
Err(e) => {
eprintln!("Error converting votes to JSON: {:?}", e);
return Err(e.into());
}
};

// Attempt to update the existing record
let update_result = sqlx::query!(
r#"
UPDATE dao_proposals SET
description = $1,
kind = $2,
proposer = $3,
status = $4,
submission_time = $5,
vote_counts = $6,
votes = $7,
total_votes = $8,
dao_instance = $9,
proposal_action = $10,
tx_timestamp = $11,
hash = $12
WHERE id = $13
RETURNING id
"#,
sputnik_proposal.description,
kind,
sputnik_proposal.proposer,
sputnik_proposal.status,
sputnik_proposal.submission_time,
vote_counts,
votes,
sputnik_proposal.total_votes as i32,
sputnik_proposal.dao_instance,
sputnik_proposal.proposal_action,
sputnik_proposal.tx_timestamp,
sputnik_proposal.hash,
sputnik_proposal.id as i32
)
.fetch_optional(tx.as_mut())
.await?;

if let Some(record) = update_result {
println!("Updated dao proposal snapshot: {:?}", record.id);
Ok(())
} else {
println!("Inserting description: {:?}", sputnik_proposal.description);
println!("Inserting id: {:?}", sputnik_proposal.id);
println!("Inserting kind: {:?}", kind);
println!("Inserting proposer: {:?}", sputnik_proposal.proposer);
println!("Inserting status: {:?}", sputnik_proposal.status);
println!(
"Inserting submission_time: {:?}",
sputnik_proposal.submission_time
);
println!("Inserting vote_counts: {:?}", vote_counts);
println!("Inserting votes: {:?}", sputnik_proposal.votes);
println!("Inserting total_votes: {:?}", sputnik_proposal.total_votes);
println!(
"Inserting dao_instance: {:?}",
sputnik_proposal.dao_instance
);
println!(
"Inserting proposal_action: {:?}",
sputnik_proposal.proposal_action
);
println!(
"Inserting tx_timestamp: {:?}",
sputnik_proposal.tx_timestamp
);
println!("Inserting hash: {:?}", sputnik_proposal.hash);
// If no rows were updated, insert a new record
let rec = sqlx::query!(
r#"
INSERT INTO dao_proposals (
description, id, kind, proposer, status, submission_time, vote_counts, votes, total_votes, dao_instance, proposal_action, tx_timestamp, hash
) VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13
)
ON CONFLICT (id) DO NOTHING
RETURNING id
"#,
sputnik_proposal.description,
sputnik_proposal.id as i32,
serde_json::Value::String("".to_string()), // kind:
sputnik_proposal.proposer,
sputnik_proposal.status,
sputnik_proposal.submission_time,
serde_json::Value::String("".to_string()), // vote_counts:
serde_json::Value::String("".to_string()), // votes:
sputnik_proposal.total_votes as i32,
sputnik_proposal.dao_instance,
sputnik_proposal.proposal_action,
sputnik_proposal.tx_timestamp,
sputnik_proposal.hash
)
.fetch_optional(tx.as_mut())
.await;

match rec {
Ok(Some(record)) => {
println!("Inserted dao proposal snapshot: {:?}", record.id);
}
Ok(None) => {
println!("No record inserted due to conflict or other issue.");
eprintln!("No record inserted due to conflict or other issue.");
}
Err(e) => {
eprintln!("Error inserting dao proposal snapshot: {:?}", e);
return Err(anyhow::anyhow!("Failed to insert dao proposal snapshot"));
}
}
Ok(())
}
}

Expand Down Expand Up @@ -972,6 +1064,7 @@ impl DB {
* AND ($2 IS NULL OR kind->>'key' ILIKE '%' || $2 || '%')
AND ($3 IS NULL OR status->>'key' ILIKE '%' || $3 || '%')
*/
println!("where dao_instance: {:?}", dao_instance);
let sql = format!(
r#"
SELECT *
Expand Down
21 changes: 8 additions & 13 deletions src/entrypoints/sputnik/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::db::db_types::SputnikProposalSnapshotRecord;
use crate::db::DB;
use crate::nearblocks_client::transactions::update_nearblocks_data;
use crate::nearblocks_client::transactions::update_dao_via_nearblocks;
use crate::types::PaginatedResponse;
use near_account_id::AccountId;
use rocket::http::Status;
Expand Down Expand Up @@ -72,23 +72,18 @@ async fn get_dao_proposals(
}
};

let current_timestamp_nano = chrono::Utc::now().timestamp_nanos_opt().unwrap();
let last_updated_info = db
.get_last_updated_info_for_contract(&contract)
.await
.unwrap();

if current_timestamp_nano - last_updated_info.after_date
>= chrono::Duration::seconds(2).num_nanoseconds().unwrap()
{
update_nearblocks_data(
db.inner(),
&contract,
nearblocks_api_key.inner(),
Some(last_updated_info.after_block),
)
.await;
}
update_dao_via_nearblocks(
db.inner(),
&contract,
nearblocks_api_key.inner(),
Some(last_updated_info.after_block),
)
.await;

let (proposals, total) =
fetch_dao_proposals(db, account_id, limit, order, offset, filters).await;
Expand Down
21 changes: 11 additions & 10 deletions src/entrypoints/sputnik/sputnik_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::collections::{HashMap, HashSet};
pub type OldAccountId = String;
use near_sdk::json_types::Base64VecU8;
use std::cmp::min;
use std::fmt;

use super::policy::VersionedPolicy;

Expand Down Expand Up @@ -220,7 +221,7 @@ impl WeightOrRatio {
}
}

/// How the voting policy votes get weigthed.
/// How the voting policy votes get weighted.
#[derive(BorshSerialize, BorshDeserialize, Serialize, Deserialize, Clone, PartialEq)]
#[cfg_attr(not(target_arch = "wasm32"), derive(Debug))]
#[serde(crate = "near_sdk::serde")]
Expand Down Expand Up @@ -296,16 +297,16 @@ pub enum ProposalStatus {
Failed,
}

impl ToString for ProposalStatus {
fn to_string(&self) -> String {
impl fmt::Display for ProposalStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ProposalStatus::InProgress => "InProgress".to_string(),
ProposalStatus::Approved => "Approved".to_string(),
ProposalStatus::Rejected => "Rejected".to_string(),
ProposalStatus::Removed => "Removed".to_string(),
ProposalStatus::Expired => "Expired".to_string(),
ProposalStatus::Moved => "Moved".to_string(),
ProposalStatus::Failed => "Failed".to_string(),
ProposalStatus::InProgress => write!(f, "InProgress"),
ProposalStatus::Approved => write!(f, "Approved"),
ProposalStatus::Rejected => write!(f, "Rejected"),
ProposalStatus::Removed => write!(f, "Removed"),
ProposalStatus::Expired => write!(f, "Expired"),
ProposalStatus::Moved => write!(f, "Moved"),
ProposalStatus::Failed => write!(f, "Failed"),
}
}
}
Expand Down
20 changes: 8 additions & 12 deletions src/nearblocks_client/sputnik.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,6 @@ pub async fn handle_add_proposal(
db: &State<DB>,
contract: &AccountId,
) -> Result<(), Status> {
let action = transaction
.actions
.as_ref()
.and_then(|actions| actions.first())
.ok_or(Status::InternalServerError)?;
let json_args = action.args.clone();
let args: AddProposalArgs = serde_json::from_str(&json_args.unwrap_or_default()).unwrap();

println!("Args: {:?}", args);

let rpc_service = RpcService::new(contract);

// get last proposal id
Expand Down Expand Up @@ -135,6 +125,7 @@ pub async fn handle_add_proposal(
};

println!("Proposal: {:?}", daop.id);
println!("Proposal description: {:?}", daop.proposal.description);

let proposal_action = decode_proposal_description("isStakeRequest", &daop.proposal.description); // TODO check v1 isStakeRequest as well

Expand All @@ -153,7 +144,7 @@ pub async fn handle_add_proposal(
submission_time: daop.proposal.submission_time.0 as i64,
vote_counts: serde_json::to_value(daop.proposal.vote_counts).unwrap(),
votes: serde_json::to_value(&daop.proposal.votes).unwrap(),
total_votes: daop.proposal.votes.len() as i64,
total_votes: daop.proposal.votes.len() as i32,
dao_instance: contract.to_string(),
proposal_action,
tx_timestamp: transaction.block_timestamp.parse::<i64>().unwrap(),
Expand All @@ -168,6 +159,11 @@ pub async fn handle_add_proposal(
Status::InternalServerError
})?;

tx.commit().await.map_err(|e| {
eprintln!("Failed to commit transaction: {:?}", e);
Status::InternalServerError
})?;

println!("Inserted proposal snapshot {}", daop.id);

Ok(())
Expand Down Expand Up @@ -229,7 +225,7 @@ pub async fn handle_act_proposal(
submission_time: dao_proposal.proposal.submission_time.0 as i64,
vote_counts: serde_json::to_value(dao_proposal.proposal.vote_counts).unwrap(),
votes: serde_json::to_value(&dao_proposal.proposal.votes).unwrap(),
total_votes: dao_proposal.proposal.votes.len() as i64,
total_votes: dao_proposal.proposal.votes.len() as i32,
dao_instance: contract.to_string(),
proposal_action,
tx_timestamp: transaction.block_timestamp.parse::<i64>().unwrap(),
Expand Down
Loading

0 comments on commit a22f730

Please sign in to comment.