Skip to content

Commit

Permalink
Merge pull request #17 from hirosystems/develop
Browse files Browse the repository at this point in the history
release beta
  • Loading branch information
rafaelcr authored Jul 5, 2024
2 parents 5c35603 + b87b921 commit abb4a40
Show file tree
Hide file tree
Showing 7 changed files with 357 additions and 228 deletions.
3 changes: 2 additions & 1 deletion src/db/cache/db_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
pg_insert_balance_changes, pg_insert_ledger_entries, pg_insert_runes,
pg_insert_supply_changes,
},
try_debug,
try_debug, try_info,
};

/// Holds rows that have yet to be inserted into the database.
Expand All @@ -37,6 +37,7 @@ impl DbCache {

/// Insert all data into the DB and clear cache.
pub async fn flush(&mut self, db_tx: &mut Transaction<'_>, ctx: &Context) {
try_info!(ctx, "Flushing DB cache...");
if self.runes.len() > 0 {
try_debug!(ctx, "Flushing {} runes", self.runes.len());
let _ = pg_insert_runes(&self.runes, db_tx, ctx).await;
Expand Down
133 changes: 57 additions & 76 deletions src/db/cache/index_cache.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
use std::{
collections::{HashMap, VecDeque},
num::NonZeroUsize,
str::FromStr,
};
use std::{collections::HashMap, num::NonZeroUsize, str::FromStr};

use bitcoin::Network;
use chainhook_sdk::{
Expand All @@ -16,20 +12,21 @@ use tokio_postgres::{Client, Transaction};
use crate::{
config::Config,
db::{
cache::utils::input_rune_balances_from_tx_inputs,
models::{
db_balance_change::DbBalanceChange, db_ledger_entry::DbLedgerEntry,
db_ledger_operation::DbLedgerOperation, db_rune::DbRune,
db_supply_change::DbSupplyChange,
},
pg_get_max_rune_number, pg_get_missed_input_rune_balances, pg_get_rune_by_id,
pg_get_rune_total_mints,
pg_get_max_rune_number, pg_get_rune_by_id, pg_get_rune_total_mints,
},
try_debug, try_info, try_warn,
};

use super::{
db_cache::DbCache,
transaction_cache::{InputRuneBalance, TransactionCache},
utils::move_tx_output_cache_to_output_cache,
};

/// Holds rune data across multiple blocks for faster computations. Processes rune events as they happen during transactions and
Expand All @@ -44,6 +41,9 @@ pub struct IndexCache {
rune_total_mints_cache: LruCache<RuneId, u128>,
/// LRU cache for outputs with rune balances.
output_cache: LruCache<(String, u32), HashMap<RuneId, Vec<InputRuneBalance>>>,
/// Same as above but only for the current transaction. We use a `HashMap` instead of an LRU cache to make sure we keep all
/// outputs in memory while we index this transaction. Must be cleared every time a new transaction is processed.
tx_output_cache: HashMap<(String, u32), HashMap<RuneId, Vec<InputRuneBalance>>>,
/// Holds a single transaction's rune cache. Must be cleared every time a new transaction is processed.
tx_cache: TransactionCache,
/// Keeps rows that have not yet been inserted in the DB.
Expand All @@ -60,6 +60,7 @@ impl IndexCache {
rune_cache: LruCache::new(cap),
rune_total_mints_cache: LruCache::new(cap),
output_cache: LruCache::new(cap),
tx_output_cache: HashMap::new(),
tx_cache: TransactionCache::new(network, &"".to_string(), 1, 0, &"".to_string(), 0),
db_cache: DbCache::new(),
}
Expand All @@ -86,12 +87,14 @@ impl IndexCache {
tx_id,
timestamp,
);
self.tx_output_cache.clear();
}

/// Finalizes the current transaction index cache.
pub fn end_transaction(&mut self, _db_tx: &mut Transaction<'_>, ctx: &Context) {
let entries = self.tx_cache.allocate_remaining_balances(ctx);
self.add_ledger_entries_to_db_cache(&entries);
move_tx_output_cache_to_output_cache(&mut self.tx_output_cache, &mut self.output_cache);
}

pub async fn apply_runestone(
Expand All @@ -103,7 +106,15 @@ impl IndexCache {
ctx: &Context,
) {
try_debug!(ctx, "{:?} {}", runestone, self.tx_cache.location);
self.scan_tx_input_rune_balance(tx_inputs, db_tx, ctx).await;
let input_balances = input_rune_balances_from_tx_inputs(
tx_inputs,
&self.tx_output_cache,
&mut self.output_cache,
db_tx,
ctx,
)
.await;
self.tx_cache.set_input_rune_balances(input_balances, ctx);
self.tx_cache
.apply_runestone_pointer(runestone, tx_outputs, ctx);
}
Expand All @@ -116,7 +127,15 @@ impl IndexCache {
ctx: &Context,
) {
try_debug!(ctx, "{:?} {}", cenotaph, self.tx_cache.location);
self.scan_tx_input_rune_balance(tx_inputs, db_tx, ctx).await;
let input_balances = input_rune_balances_from_tx_inputs(
tx_inputs,
&self.tx_output_cache,
&mut self.output_cache,
db_tx,
ctx,
)
.await;
self.tx_cache.set_input_rune_balances(input_balances, ctx);
let entries = self.tx_cache.apply_cenotaph_input_burn(cenotaph);
self.add_ledger_entries_to_db_cache(&entries);
}
Expand Down Expand Up @@ -298,60 +317,24 @@ impl IndexCache {
return Some(total);
}

/// Takes all transaction inputs and transform them into rune balances to be allocated.
async fn scan_tx_input_rune_balance(
&mut self,
tx_inputs: &Vec<TxIn>,
db_tx: &mut Transaction<'_>,
ctx: &Context,
) {
// Maps input index to all of its rune balances. Useful in order to keep rune inputs in order.
let mut indexed_input_runes = HashMap::new();

// Look in memory cache.
let mut cache_misses = vec![];
for (i, input) in tx_inputs.iter().enumerate() {
let tx_id = input.previous_output.txid.hash[2..].to_string();
let vout = input.previous_output.vout;
if let Some(map) = self.output_cache.get(&(tx_id.clone(), vout)) {
indexed_input_runes.insert(i as u32, map.clone());
} else {
cache_misses.push((i as u32, tx_id, vout));
}
}

// Look for misses in database.
if cache_misses.len() > 0 {
// self.db_cache.flush(db_tx, ctx).await;
let output_balances = pg_get_missed_input_rune_balances(cache_misses, db_tx, ctx).await;
indexed_input_runes.extend(output_balances);
}

let mut final_input_runes: HashMap<RuneId, VecDeque<InputRuneBalance>> = HashMap::new();
let mut input_keys: Vec<u32> = indexed_input_runes.keys().copied().collect();
input_keys.sort();
for key in input_keys.iter() {
let input_value = indexed_input_runes.get(key).unwrap();
for (rune_id, vec) in input_value.iter() {
if let Some(rune) = final_input_runes.get_mut(rune_id) {
rune.extend(vec.clone());
} else {
final_input_runes.insert(*rune_id, VecDeque::from(vec.clone()));
}
}
}

self.tx_cache
.set_input_rune_balances(final_input_runes, ctx);
}

/// Take ledger entries returned by the `TransactionCache` and add them to the `DbCache`. Update global balances and counters
/// as well.
fn add_ledger_entries_to_db_cache(&mut self, entries: &Vec<DbLedgerEntry>) {
self.db_cache.ledger_entries.extend(entries.clone());
for entry in entries.iter() {
match entry.operation {
DbLedgerOperation::Etching => {}
DbLedgerOperation::Etching => {
self.db_cache
.supply_changes
.entry(entry.rune_id.clone())
.and_modify(|i| {
i.total_operations += 1;
})
.or_insert(DbSupplyChange::from_operation(
entry.rune_id.clone(),
entry.block_height.clone(),
));
}
DbLedgerOperation::Mint => {
self.db_cache
.supply_changes
Expand Down Expand Up @@ -424,25 +407,23 @@ impl IndexCache {
address,
entry.amount.unwrap(),
));
}

// Add to output LRU cache if it's received balance.
let k = (entry.tx_id.clone(), entry.output.unwrap().0);
let rune_id = RuneId::from_str(entry.rune_id.as_str()).unwrap();
let balance = InputRuneBalance {
address: entry.address.clone(),
amount: entry.amount.unwrap().0,
};
if let Some(v) = self.output_cache.get_mut(&k) {
if let Some(rune_balance) = v.get_mut(&rune_id) {
rune_balance.push(balance);
} else {
v.insert(rune_id, vec![balance]);
}
} else {
let mut v = HashMap::new();
v.insert(rune_id, vec![balance]);
self.output_cache.push(k, v);
// Add to current transaction's output cache if it's received balance.
let k = (entry.tx_id.clone(), entry.output.unwrap().0);
let rune_id = RuneId::from_str(entry.rune_id.as_str()).unwrap();
let balance = InputRuneBalance {
address: entry.address.clone(),
amount: entry.amount.unwrap().0,
};
let mut default = HashMap::new();
default.insert(rune_id, vec![balance.clone()]);
self.tx_output_cache
.entry(k)
.and_modify(|i| {
i.entry(rune_id)
.and_modify(|v| v.push(balance.clone()))
.or_insert(vec![balance]);
})
.or_insert(default);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/db/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ pub mod db_cache;
pub mod index_cache;
pub mod transaction_cache;
pub mod transaction_location;
pub mod utils;
134 changes: 134 additions & 0 deletions src/db/cache/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
use std::collections::{HashMap, VecDeque};

use chainhook_sdk::{types::bitcoin::TxIn, utils::Context};
use lru::LruCache;
use ordinals::RuneId;
use tokio_postgres::Transaction;

use crate::db::pg_get_input_rune_balances;

use super::transaction_cache::InputRuneBalance;

/// Takes all transaction inputs and transforms them into rune balances to be allocated for operations. Looks inside an output LRU
/// cache and the DB when there are cache misses.
pub async fn input_rune_balances_from_tx_inputs(
tx_inputs: &Vec<TxIn>,
tx_output_cache: &HashMap<(String, u32), HashMap<RuneId, Vec<InputRuneBalance>>>,
output_cache: &mut LruCache<(String, u32), HashMap<RuneId, Vec<InputRuneBalance>>>,
db_tx: &mut Transaction<'_>,
ctx: &Context,
) -> HashMap<RuneId, VecDeque<InputRuneBalance>> {
// Maps input index to all of its rune balances. Useful in order to keep rune inputs in order.
let mut indexed_input_runes = HashMap::new();
let mut cache_misses = vec![];

// Look in both current transaction output cache and in long term LRU cache.
for (i, input) in tx_inputs.iter().enumerate() {
let tx_id = input.previous_output.txid.hash[2..].to_string();
let vout = input.previous_output.vout;
let k = (tx_id.clone(), vout);
if let Some(map) = tx_output_cache.get(&k) {
indexed_input_runes.insert(i as u32, map.clone());
} else if let Some(map) = output_cache.get(&k) {
indexed_input_runes.insert(i as u32, map.clone());
} else {
cache_misses.push((i as u32, tx_id, vout));
}
}
// Look for cache misses in database. We don't need to `flush` the DB cache here because we've already looked in the current
// transaction's output cache.
if cache_misses.len() > 0 {
let output_balances = pg_get_input_rune_balances(cache_misses, db_tx, ctx).await;
indexed_input_runes.extend(output_balances);
}

let mut final_input_runes: HashMap<RuneId, VecDeque<InputRuneBalance>> = HashMap::new();
let mut input_keys: Vec<u32> = indexed_input_runes.keys().copied().collect();
input_keys.sort();
for key in input_keys.iter() {
let input_value = indexed_input_runes.get(key).unwrap();
for (rune_id, vec) in input_value.iter() {
if let Some(rune) = final_input_runes.get_mut(rune_id) {
rune.extend(vec.clone());
} else {
final_input_runes.insert(*rune_id, VecDeque::from(vec.clone()));
}
}
}
final_input_runes
}

/// Moves data from the current transaction's output cache to the long-term LRU output cache. Clears the tx output cache when
/// done.
pub fn move_tx_output_cache_to_output_cache(
tx_output_cache: &mut HashMap<(String, u32), HashMap<RuneId, Vec<InputRuneBalance>>>,
output_cache: &mut LruCache<(String, u32), HashMap<RuneId, Vec<InputRuneBalance>>>,
) {
for (k, tx_output_map) in tx_output_cache.iter() {
if let Some(v) = output_cache.get_mut(&k) {
for (rune_id, balances) in tx_output_map.iter() {
if let Some(rune_balance) = v.get_mut(&rune_id) {
rune_balance.extend(balances.clone());
} else {
v.insert(*rune_id, balances.clone());
}
}
} else {
output_cache.push(k.clone(), tx_output_map.clone());
}
}
tx_output_cache.clear();
}

#[cfg(test)]
mod test {
// use std::{collections::HashMap, num::NonZeroUsize, str::FromStr};

// use chainhook_sdk::{
// types::{
// bitcoin::{OutPoint, TxIn},
// TransactionIdentifier,
// },
// utils::Context,
// };
// use lru::LruCache;
// use ordinals::RuneId;

// use crate::db::cache::transaction_cache::InputRuneBalance;

// #[test]
// fn from_output_cache() {
// let tx_inputs = vec![TxIn {
// previous_output: OutPoint {
// txid: TransactionIdentifier {
// hash: "aea76e5ef8135851d0387074cf7672013779e4506e56122e0e698e12ede62681"
// .to_string(),
// },
// vout: 2,
// value: 100,
// block_height: 848300,
// },
// script_sig: "".to_string(),
// sequence: 1,
// witness: vec![],
// }];
// let mut value = HashMap::new();
// value.insert(
// RuneId::from_str("840000:1").unwrap(),
// vec![InputRuneBalance {
// address: Some("1EDYZPvGqKzZYp6DoTtcgXwvSAkA9d9UKU".to_string()),
// amount: 10000,
// }],
// );
// let mut output_cache: LruCache<(String, u32), HashMap<RuneId, Vec<InputRuneBalance>>> =
// LruCache::new(NonZeroUsize::new(2).unwrap());
// output_cache.put(
// (
// "aea76e5ef8135851d0387074cf7672013779e4506e56122e0e698e12ede62681".to_string(),
// 2,
// ),
// value,
// );
// let ctx = Context::empty();
// }
}
Loading

0 comments on commit abb4a40

Please sign in to comment.