From 69559e0b70f49197f0ae645820960e4cfd8852e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20C=C3=A1rdenas?= Date: Sat, 6 Jul 2024 22:44:59 -0600 Subject: [PATCH] fix: move unallocated without need for a runestone (#22) * fix: move unallocated without need for a runestone * fix: block output cache --- src/db/cache/index_cache.rs | 106 +++++++++++++++--------------- src/db/cache/transaction_cache.rs | 103 +++++++---------------------- src/db/cache/utils.rs | 36 +++++----- src/db/index.rs | 83 +++++++++++++++-------- 4 files changed, 152 insertions(+), 176 deletions(-) diff --git a/src/db/cache/index_cache.rs b/src/db/cache/index_cache.rs index 35a33e7..bab51ef 100644 --- a/src/db/cache/index_cache.rs +++ b/src/db/cache/index_cache.rs @@ -1,10 +1,7 @@ use std::{collections::HashMap, num::NonZeroUsize, str::FromStr}; -use bitcoin::Network; -use chainhook_sdk::{ - types::bitcoin::{TxIn, TxOut}, - utils::Context, -}; +use bitcoin::{Network, ScriptBuf}; +use chainhook_sdk::{types::bitcoin::TxIn, utils::Context}; use lru::LruCache; use ordinals::{Cenotaph, Edict, Etching, Rune, RuneId, Runestone}; use tokio_postgres::{Client, Transaction}; @@ -26,13 +23,14 @@ use crate::{ use super::{ db_cache::DbCache, transaction_cache::{InputRuneBalance, TransactionCache}, - utils::move_tx_output_cache_to_output_cache, + transaction_location::TransactionLocation, + utils::move_block_output_cache_to_output_cache, }; /// Holds rune data across multiple blocks for faster computations. Processes rune events as they happen during transactions and /// generates database rows for later insertion. pub struct IndexCache { - network: Network, + pub network: Network, /// Number to be assigned to the next rune etching. next_rune_number: u32, /// LRU cache for runes. @@ -41,9 +39,9 @@ pub struct IndexCache { rune_total_mints_cache: LruCache, /// LRU cache for outputs with rune balances. output_cache: LruCache<(String, u32), HashMap>>, - /// Same as above but only for the current transaction. We use a `HashMap` instead of an LRU cache to make sure we keep all - /// outputs in memory while we index this transaction. Must be cleared every time a new transaction is processed. - tx_output_cache: HashMap<(String, u32), HashMap>>, + /// Same as above but only for the current block. We use a `HashMap` instead of an LRU cache to make sure we keep all outputs + /// in memory while we index this block. Must be cleared every time a new block is processed. + block_output_cache: HashMap<(String, u32), HashMap>>, /// Holds a single transaction's rune cache. Must be cleared every time a new transaction is processed. tx_cache: TransactionCache, /// Keeps rows that have not yet been inserted in the DB. @@ -60,8 +58,21 @@ impl IndexCache { rune_cache: LruCache::new(cap), rune_total_mints_cache: LruCache::new(cap), output_cache: LruCache::new(cap), - tx_output_cache: HashMap::new(), - tx_cache: TransactionCache::new(network, &"".to_string(), 1, 0, &"".to_string(), 0), + block_output_cache: HashMap::new(), + tx_cache: TransactionCache::new( + TransactionLocation { + network, + block_hash: "".to_string(), + block_height: 1, + timestamp: 0, + tx_index: 0, + tx_id: "".to_string(), + }, + HashMap::new(), + HashMap::new(), + None, + 0, + ), db_cache: DbCache::new(), } } @@ -73,69 +84,58 @@ impl IndexCache { /// Creates a fresh transaction index cache. pub async fn begin_transaction( &mut self, - block_hash: &String, - block_height: u64, - tx_index: u32, - tx_id: &String, - timestamp: u32, + location: TransactionLocation, + tx_inputs: &Vec, + eligible_outputs: HashMap, + first_eligible_output: Option, + total_outputs: u32, + db_tx: &mut Transaction<'_>, + ctx: &Context, ) { + let input_runes = input_rune_balances_from_tx_inputs( + tx_inputs, + &self.block_output_cache, + &mut self.output_cache, + db_tx, + ctx, + ) + .await; self.tx_cache = TransactionCache::new( - self.network, - block_hash, - block_height, - tx_index, - tx_id, - timestamp, + location, + input_runes, + eligible_outputs, + first_eligible_output, + total_outputs, ); - self.tx_output_cache.clear(); } - /// Finalizes the current transaction index cache. + /// Finalizes the current transaction index cache by moving all unallocated balances to the correct output. pub fn end_transaction(&mut self, _db_tx: &mut Transaction<'_>, ctx: &Context) { let entries = self.tx_cache.allocate_remaining_balances(ctx); self.add_ledger_entries_to_db_cache(&entries); - move_tx_output_cache_to_output_cache(&mut self.tx_output_cache, &mut self.output_cache); + } + + pub fn end_block(&mut self) { + move_block_output_cache_to_output_cache(&mut self.block_output_cache, &mut self.output_cache); } pub async fn apply_runestone( &mut self, runestone: &Runestone, - tx_inputs: &Vec, - tx_outputs: &Vec, - db_tx: &mut Transaction<'_>, + _db_tx: &mut Transaction<'_>, ctx: &Context, ) { try_debug!(ctx, "{:?} {}", runestone, self.tx_cache.location); - let input_balances = input_rune_balances_from_tx_inputs( - tx_inputs, - &self.tx_output_cache, - &mut self.output_cache, - db_tx, - ctx, - ) - .await; - self.tx_cache.set_input_rune_balances(input_balances, ctx); - self.tx_cache - .apply_runestone_pointer(runestone, tx_outputs, ctx); + self.tx_cache.apply_runestone_pointer(runestone, ctx); } pub async fn apply_cenotaph( &mut self, cenotaph: &Cenotaph, - tx_inputs: &Vec, - db_tx: &mut Transaction<'_>, + _db_tx: &mut Transaction<'_>, ctx: &Context, ) { try_debug!(ctx, "{:?} {}", cenotaph, self.tx_cache.location); - let input_balances = input_rune_balances_from_tx_inputs( - tx_inputs, - &self.tx_output_cache, - &mut self.output_cache, - db_tx, - ctx, - ) - .await; - self.tx_cache.set_input_rune_balances(input_balances, ctx); let entries = self.tx_cache.apply_cenotaph_input_burn(cenotaph); self.add_ledger_entries_to_db_cache(&entries); } @@ -407,7 +407,7 @@ impl IndexCache { address, entry.amount.unwrap(), )); - // Add to current transaction's output cache if it's received balance. + // Add to current block's output cache if it's received balance. let k = (entry.tx_id.clone(), entry.output.unwrap().0); let rune_id = RuneId::from_str(entry.rune_id.as_str()).unwrap(); let balance = InputRuneBalance { @@ -416,7 +416,7 @@ impl IndexCache { }; let mut default = HashMap::new(); default.insert(rune_id, vec![balance.clone()]); - self.tx_output_cache + self.block_output_cache .entry(k) .and_modify(|i| { i.entry(rune_id) diff --git a/src/db/cache/transaction_cache.rs b/src/db/cache/transaction_cache.rs index af724fe..77813ab 100644 --- a/src/db/cache/transaction_cache.rs +++ b/src/db/cache/transaction_cache.rs @@ -3,14 +3,17 @@ use std::{ vec, }; -use bitcoin::{Network, ScriptBuf}; -use chainhook_sdk::{types::bitcoin::TxOut, utils::Context}; +use bitcoin::ScriptBuf; +use chainhook_sdk::utils::Context; use ordinals::{Cenotaph, Edict, Etching, Rune, RuneId, Runestone}; use crate::{ - db::{cache::utils::{is_rune_mintable, new_ledger_entry}, models::{ - db_ledger_entry::DbLedgerEntry, db_ledger_operation::DbLedgerOperation, db_rune::DbRune, - }}, + db::{ + cache::utils::{is_rune_mintable, new_ledger_entry}, + models::{ + db_ledger_entry::DbLedgerEntry, db_ledger_operation::DbLedgerOperation, db_rune::DbRune, + }, + }, try_debug, try_info, try_warn, }; @@ -38,98 +41,38 @@ pub struct TransactionCache { input_runes: HashMap>, /// Non-OP_RETURN outputs in this transaction eligible_outputs: HashMap, + /// Index of the output that should receive unallocated runes if there is no `pointer` present. + first_eligible_output: Option, /// Total outputs contained in this transaction, including OP_RETURN outputs total_outputs: u32, } impl TransactionCache { pub fn new( - network: Network, - block_hash: &String, - block_height: u64, - tx_index: u32, - tx_id: &String, - timestamp: u32, + location: TransactionLocation, + input_runes: HashMap>, + eligible_outputs: HashMap, + first_eligible_output: Option, + total_outputs: u32, ) -> Self { TransactionCache { - location: TransactionLocation { - network, - block_hash: block_hash.clone(), - block_height, - tx_id: tx_id.clone(), - tx_index, - timestamp, - }, + location, next_event_index: 0, etching: None, pointer: None, - input_runes: HashMap::new(), - eligible_outputs: HashMap::new(), - total_outputs: 0, + input_runes, + eligible_outputs, + first_eligible_output, + total_outputs, } } - /// Takes this transaction's input runes and moves them to the unallocated balance for future edict allocation. - pub fn set_input_rune_balances( - &mut self, - input_runes: HashMap>, - _ctx: &Context, - ) { - #[cfg(feature = "debug")] - for (rune_id, vec) in input_runes.iter() { - for input in vec.iter() { - try_debug!( - _ctx, - "Input {} {:?} ({}) {}", - rune_id, - input.address, - input.amount, - self.location - ); - } - } - self.input_runes = input_runes; - } - /// Takes the runestone's output pointer and keeps a record of eligible outputs to send runes to. - pub fn apply_runestone_pointer( - &mut self, - runestone: &Runestone, - tx_outputs: &Vec, - ctx: &Context, - ) { - self.total_outputs = tx_outputs.len() as u32; - // Keep a record of non-OP_RETURN outputs. - let mut first_eligible_output: Option = None; - for (i, output) in tx_outputs.iter().enumerate() { - let Ok(bytes) = hex::decode(&output.script_pubkey[2..]) else { - try_warn!( - ctx, - "Unable to decode script for output {} {}", - i, - self.location - ); - continue; - }; - let script = ScriptBuf::from_bytes(bytes); - if !script.is_op_return() { - if first_eligible_output.is_none() { - first_eligible_output = Some(i as u32); - } - self.eligible_outputs.insert(i as u32, script); - } - } - if first_eligible_output.is_none() { - try_info!( - ctx, - "No eligible non-OP_RETURN output found, all runes will be burnt {}", - self.location - ); - } + pub fn apply_runestone_pointer(&mut self, runestone: &Runestone, _ctx: &Context) { self.pointer = if runestone.pointer.is_some() { runestone.pointer - } else if first_eligible_output.is_some() { - first_eligible_output + } else if self.first_eligible_output.is_some() { + self.first_eligible_output } else { None }; diff --git a/src/db/cache/utils.rs b/src/db/cache/utils.rs index 9528846..009ebed 100644 --- a/src/db/cache/utils.rs +++ b/src/db/cache/utils.rs @@ -8,7 +8,9 @@ use tokio_postgres::Transaction; use crate::{ db::{ - models::{db_ledger_entry::DbLedgerEntry, db_ledger_operation::DbLedgerOperation, db_rune::DbRune}, + models::{ + db_ledger_entry::DbLedgerEntry, db_ledger_operation::DbLedgerOperation, db_rune::DbRune, + }, pg_get_input_rune_balances, }, try_info, try_warn, @@ -20,7 +22,7 @@ use super::{transaction_cache::InputRuneBalance, transaction_location::Transacti /// cache and the DB when there are cache misses. pub async fn input_rune_balances_from_tx_inputs( tx_inputs: &Vec, - tx_output_cache: &HashMap<(String, u32), HashMap>>, + block_output_cache: &HashMap<(String, u32), HashMap>>, output_cache: &mut LruCache<(String, u32), HashMap>>, db_tx: &mut Transaction<'_>, ctx: &Context, @@ -29,12 +31,12 @@ pub async fn input_rune_balances_from_tx_inputs( let mut indexed_input_runes = HashMap::new(); let mut cache_misses = vec![]; - // Look in both current transaction output cache and in long term LRU cache. + // Look in both current block output cache and in long term LRU cache. for (i, input) in tx_inputs.iter().enumerate() { let tx_id = input.previous_output.txid.hash[2..].to_string(); let vout = input.previous_output.vout; let k = (tx_id.clone(), vout); - if let Some(map) = tx_output_cache.get(&k) { + if let Some(map) = block_output_cache.get(&k) { indexed_input_runes.insert(i as u32, map.clone()); } else if let Some(map) = output_cache.get(&k) { indexed_input_runes.insert(i as u32, map.clone()); @@ -43,7 +45,7 @@ pub async fn input_rune_balances_from_tx_inputs( } } // Look for cache misses in database. We don't need to `flush` the DB cache here because we've already looked in the current - // transaction's output cache. + // block's output cache. if cache_misses.len() > 0 { let output_balances = pg_get_input_rune_balances(cache_misses, db_tx, ctx).await; indexed_input_runes.extend(output_balances); @@ -65,15 +67,14 @@ pub async fn input_rune_balances_from_tx_inputs( final_input_runes } -/// Moves data from the current transaction's output cache to the long-term LRU output cache. Clears the tx output cache when -/// done. -pub fn move_tx_output_cache_to_output_cache( - tx_output_cache: &mut HashMap<(String, u32), HashMap>>, +/// Moves data from the current block's output cache to the long-term LRU output cache. Clears the block output cache when done. +pub fn move_block_output_cache_to_output_cache( + block_output_cache: &mut HashMap<(String, u32), HashMap>>, output_cache: &mut LruCache<(String, u32), HashMap>>, ) { - for (k, tx_output_map) in tx_output_cache.iter() { + for (k, block_output_map) in block_output_cache.iter() { if let Some(v) = output_cache.get_mut(&k) { - for (rune_id, balances) in tx_output_map.iter() { + for (rune_id, balances) in block_output_map.iter() { if let Some(rune_balance) = v.get_mut(&rune_id) { rune_balance.extend(balances.clone()); } else { @@ -81,10 +82,10 @@ pub fn move_tx_output_cache_to_output_cache( } } } else { - output_cache.push(k.clone(), tx_output_map.clone()); + output_cache.push(k.clone(), block_output_map.clone()); } } - tx_output_cache.clear(); + block_output_cache.clear(); } /// Creates a new ledger entry. @@ -247,7 +248,11 @@ pub fn move_rune_balance_to_output( } /// Determines if a mint is valid depending on the rune's mint terms. -pub fn is_rune_mintable(db_rune: &DbRune, total_mints: u128, location: &TransactionLocation) -> bool { +pub fn is_rune_mintable( + db_rune: &DbRune, + total_mints: u128, + location: &TransactionLocation, +) -> bool { if db_rune.terms_amount.is_none() { return false; } @@ -293,7 +298,8 @@ mod test { transaction_cache::InputRuneBalance, transaction_location::TransactionLocation, utils::move_rune_balance_to_output, }, - models::{db_ledger_operation::DbLedgerOperation, db_rune::DbRune}, types::{pg_numeric_u128::PgNumericU128, pg_numeric_u64::PgNumericU64}, + models::{db_ledger_operation::DbLedgerOperation, db_rune::DbRune}, + types::{pg_numeric_u128::PgNumericU128, pg_numeric_u64::PgNumericU64}, }; use super::is_rune_mintable; diff --git a/src/db/index.rs b/src/db/index.rs index f578b7e..2929718 100644 --- a/src/db/index.rs +++ b/src/db/index.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use bitcoin::absolute::LockTime; use bitcoin::transaction::TxOut; use bitcoin::Network; @@ -9,6 +11,7 @@ use ordinals::Artifact; use ordinals::Runestone; use tokio_postgres::Client; +use crate::db::cache::transaction_location::TransactionLocation; use crate::db::pg_roll_back_block; use crate::try_info; @@ -24,26 +27,40 @@ pub fn get_rune_genesis_block_height(network: Network) -> u64 { } } -/// Transforms a Bitcoin transaction from a Chainhook format to a rust bitcoin format so it can be consumed by ord. +/// Transforms a Bitcoin transaction from a Chainhook format to a rust bitcoin format so it can be consumed by ord. Also, takes +/// all non-OP_RETURN outputs and returns them so they can be used later to receive runes. fn bitcoin_tx_from_chainhook_tx( block: &BitcoinBlockData, tx: &BitcoinTransactionData, -) -> Transaction { - Transaction { - version: 2, - lock_time: LockTime::from_time(block.timestamp).unwrap(), - // Inputs don't matter for Runestone parsing. - input: vec![], - output: tx - .metadata - .outputs - .iter() - .map(|output| TxOut { - value: output.value, - script_pubkey: ScriptBuf::from_bytes(output.get_script_pubkey_bytes()), - }) - .collect(), +) -> (Transaction, HashMap, Option, u32) { + let mut outputs = vec![]; + let mut eligible_outputs = HashMap::new(); + let mut first_eligible_output: Option = None; + for (i, output) in tx.metadata.outputs.iter().enumerate() { + let script = ScriptBuf::from_bytes(output.get_script_pubkey_bytes()); + if !script.is_op_return() { + eligible_outputs.insert(i as u32, script.clone()); + if first_eligible_output.is_none() { + first_eligible_output = Some(i as u32); + } + } + outputs.push(TxOut { + value: output.value, + script_pubkey: script, + }); } + ( + Transaction { + version: 2, + lock_time: LockTime::from_time(block.timestamp).unwrap(), + // Inputs don't matter for Runestone parsing. + input: vec![], + output: outputs, + }, + eligible_outputs, + first_eligible_output, + tx.metadata.outputs.len() as u32, + ) } /// Index a Bitcoin block for runes data. @@ -64,23 +81,34 @@ pub async fn index_block( .expect("Unable to begin block processing pg transaction"); index_cache.reset_max_rune_number(&mut db_tx, ctx).await; for tx in block.transactions.iter() { - let transaction = bitcoin_tx_from_chainhook_tx(block, tx); + let (transaction, eligible_outputs, first_eligible_output, total_outputs) = + bitcoin_tx_from_chainhook_tx(block, tx); let tx_index = tx.metadata.index; let tx_id = &tx.transaction_identifier.hash; + let location = TransactionLocation { + network: index_cache.network, + block_hash: block_hash.clone(), + block_height, + tx_index, + tx_id: tx_id.clone(), + timestamp: block.timestamp, + }; index_cache - .begin_transaction(block_hash, block_height, tx_index, tx_id, block.timestamp) + .begin_transaction( + location, + &tx.metadata.inputs, + eligible_outputs, + first_eligible_output, + total_outputs, + &mut db_tx, + ctx, + ) .await; if let Some(artifact) = Runestone::decipher(&transaction) { match artifact { Artifact::Runestone(runestone) => { index_cache - .apply_runestone( - &runestone, - &tx.metadata.inputs, - &tx.metadata.outputs, - &mut db_tx, - ctx, - ) + .apply_runestone(&runestone, &mut db_tx, ctx) .await; if let Some(etching) = runestone.etching { index_cache.apply_etching(&etching, &mut db_tx, ctx).await; @@ -93,9 +121,7 @@ pub async fn index_block( } } Artifact::Cenotaph(cenotaph) => { - index_cache - .apply_cenotaph(&cenotaph, &tx.metadata.inputs, &mut db_tx, ctx) - .await; + index_cache.apply_cenotaph(&cenotaph, &mut db_tx, ctx).await; if let Some(etching) = cenotaph.etching { index_cache .apply_cenotaph_etching(&etching, &mut db_tx, ctx) @@ -111,6 +137,7 @@ pub async fn index_block( } index_cache.end_transaction(&mut db_tx, ctx); } + index_cache.end_block(); index_cache.db_cache.flush(&mut db_tx, ctx).await; db_tx .commit()