Skip to content

Commit

Permalink
feat(ckbtc): rotate KYT RPC providers in case of transient error (#1864)
Browse files Browse the repository at this point in the history
XC-159: Support multiple RPC providers

Support multiple bitcoin mainnet RPC providers in the KYT canister. Each
RPC call will go to one provider, and the next call will go to the next
provider and so on, in a round robin manner.

When an RPC call returns http error code, it is treated as a transient
error so that the same call can be tried again with the next provider.

Because RPC errors can occur for all providers indefinitely, it is up to
the caller to decide whether or not to call `check_transaction` again.

---------

Co-authored-by: gregorydemay <[email protected]>
  • Loading branch information
ninegua and gregorydemay authored Oct 14, 2024
1 parent 7afabdc commit 094f99d
Show file tree
Hide file tree
Showing 7 changed files with 294 additions and 48 deletions.
2 changes: 1 addition & 1 deletion rs/bitcoin/kyt/src/dashboard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ pub fn dashboard(page_index: usize) -> DashboardTemplate {
match status {
FetchTxStatus::PendingOutcall => Status::PendingOutcall,
FetchTxStatus::PendingRetry { .. } => Status::PendingRetry,
FetchTxStatus::Error(err) => Status::Error(format!("{:?}", err)),
FetchTxStatus::Error(err) => Status::Error(format!("{:?}", err.error)),
FetchTxStatus::Fetched(fetched) => {
// Return an empty list if no input address is available yet.
let input_addresses =
Expand Down
55 changes: 43 additions & 12 deletions rs/bitcoin/kyt/src/fetch.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::state::{FetchGuardError, FetchTxStatus, FetchedTx, HttpGetTxError, TransactionKytData};
use crate::state::{
FetchGuardError, FetchTxStatus, FetchTxStatusError, FetchedTx, HttpGetTxError,
TransactionKytData,
};
use crate::types::{CheckTransactionResponse, CheckTransactionRetriable, CheckTransactionStatus};
use crate::{blocklist_contains, state};
use crate::{blocklist_contains, providers, state, BtcNetwork};
use bitcoin::Transaction;
use futures::future::try_join_all;
use ic_btc_interface::Txid;
Expand Down Expand Up @@ -44,7 +47,6 @@ pub enum FetchResult {
pub enum TryFetchResult<F> {
Pending,
HighLoad,
Error(HttpGetTxError),
NotEnoughCycles,
Fetched(FetchedTx),
ToFetch(F),
Expand All @@ -54,8 +56,11 @@ pub enum TryFetchResult<F> {
pub trait FetchEnv {
type FetchGuard;
fn new_fetch_guard(&self, txid: Txid) -> Result<Self::FetchGuard, FetchGuardError>;
fn btc_network(&self) -> BtcNetwork;

async fn http_get_tx(
&self,
provider: providers::Provider,
txid: Txid,
max_response_bytes: u32,
) -> Result<Transaction, HttpGetTxError>;
Expand All @@ -71,13 +76,24 @@ pub trait FetchEnv {
&self,
txid: Txid,
) -> TryFetchResult<impl futures::Future<Output = Result<FetchResult, Infallible>>> {
let max_response_bytes = match state::get_fetch_status(txid) {
None => INITIAL_MAX_RESPONSE_BYTES,
let (provider, max_response_bytes) = match state::get_fetch_status(txid) {
None => (
providers::next_provider(self.btc_network()),
INITIAL_MAX_RESPONSE_BYTES,
),
Some(FetchTxStatus::PendingRetry {
max_response_bytes, ..
}) => max_response_bytes,
}) => (
providers::next_provider(self.btc_network()),
max_response_bytes,
),
Some(FetchTxStatus::PendingOutcall { .. }) => return TryFetchResult::Pending,
Some(FetchTxStatus::Error(msg)) => return TryFetchResult::Error(msg),
Some(FetchTxStatus::Error(err)) => (
// An FetchTxStatus error can be retried with another provider
err.provider.next(),
// The next provider can use the same max_response_bytes
err.max_response_bytes,
),
Some(FetchTxStatus::Fetched(fetched)) => return TryFetchResult::Fetched(fetched),
};
let guard = match self.new_fetch_guard(txid) {
Expand All @@ -88,7 +104,7 @@ pub trait FetchEnv {
if self.cycles_accept(cycle_cost) < cycle_cost {
TryFetchResult::NotEnoughCycles
} else {
TryFetchResult::ToFetch(self.fetch_tx(guard, txid, max_response_bytes))
TryFetchResult::ToFetch(self.fetch_tx(guard, provider, txid, max_response_bytes))
}
}

Expand All @@ -104,10 +120,11 @@ pub trait FetchEnv {
async fn fetch_tx(
&self,
_guard: Self::FetchGuard,
provider: providers::Provider,
txid: Txid,
max_response_bytes: u32,
) -> Result<FetchResult, Infallible> {
match self.http_get_tx(txid, max_response_bytes).await {
match self.http_get_tx(provider, txid, max_response_bytes).await {
Ok(tx) => {
let input_addresses = tx.input.iter().map(|_| None).collect();
match TransactionKytData::try_from(tx) {
Expand All @@ -121,7 +138,14 @@ pub trait FetchEnv {
}
Err(err) => {
let err = HttpGetTxError::TxEncoding(err.to_string());
state::set_fetch_status(txid, FetchTxStatus::Error(err.clone()));
state::set_fetch_status(
txid,
FetchTxStatus::Error(FetchTxStatusError {
provider,
max_response_bytes,
error: err.clone(),
}),
);
Ok(FetchResult::Error(err))
}
}
Expand All @@ -138,7 +162,14 @@ pub trait FetchEnv {
Ok(FetchResult::RetryWithBiggerBuffer)
}
Err(err) => {
state::set_fetch_status(txid, FetchTxStatus::Error(err.clone()));
state::set_fetch_status(
txid,
FetchTxStatus::Error(FetchTxStatusError {
provider,
max_response_bytes,
error: err.clone(),
}),
);
Ok(FetchResult::Error(err))
}
}
Expand Down Expand Up @@ -191,7 +222,7 @@ pub trait FetchEnv {
state::set_fetched_address(txid, index, address.clone());
}
Pending => continue,
HighLoad | NotEnoughCycles | Error(_) => break,
HighLoad | NotEnoughCycles => break,
}
}
}
Expand Down
69 changes: 58 additions & 11 deletions rs/bitcoin/kyt/src/fetch/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::*;
use crate::types::BtcNetwork;
use crate::{blocklist, CheckTransactionIrrecoverableError};
use crate::{
blocklist, providers::Provider, types::BtcNetwork, CheckTransactionIrrecoverableError,
};
use bitcoin::{
absolute::LockTime, address::Address, hashes::Hash, transaction::Version, Amount, OutPoint,
PubkeyHash, ScriptBuf, Sequence, Transaction, TxIn, TxOut, Witness,
Expand All @@ -18,6 +19,7 @@ struct MockEnv {
replies: RefCell<VecDeque<Result<Transaction, HttpGetTxError>>>,
available_cycles: RefCell<u128>,
accepted_cycles: RefCell<u128>,
called_provider: RefCell<Option<Provider>>,
}

impl FetchEnv for MockEnv {
Expand All @@ -30,19 +32,26 @@ impl FetchEnv for MockEnv {
Ok(())
}
}

fn btc_network(&self) -> BtcNetwork {
BtcNetwork::Mainnet
}

async fn http_get_tx(
&self,
provider: Provider,
txid: Txid,
max_response_bytes: u32,
) -> Result<Transaction, HttpGetTxError> {
self.calls
.borrow_mut()
.push_back((txid, max_response_bytes));
*self.called_provider.borrow_mut() = Some(provider);
self.replies
.borrow_mut()
.pop_front()
.unwrap_or(Err(HttpGetTxError::Rejected {
code: RejectionCode::from(0),
code: RejectionCode::SysTransient,
message: "no more reply".to_string(),
}))
}
Expand All @@ -67,6 +76,7 @@ impl MockEnv {
replies: RefCell::new(VecDeque::new()),
available_cycles: RefCell::new(available_cycles),
accepted_cycles: RefCell::new(0),
called_provider: RefCell::new(None),
}
}
fn assert_get_tx_call(&self, txid: Txid, max_response_bytes: u32) {
Expand Down Expand Up @@ -137,6 +147,7 @@ fn mock_transaction_with_inputs(input_txids: Vec<(Txid, u32)>) -> Transaction {
async fn test_mock_env() {
// Test cycle mock functions
let env = MockEnv::new(CHECK_TRANSACTION_CYCLES_REQUIRED);
let provider = providers::next_provider(env.btc_network());
assert_eq!(
env.cycles_accept(CHECK_TRANSACTION_CYCLES_SERVICE_FEE),
CHECK_TRANSACTION_CYCLES_SERVICE_FEE
Expand All @@ -155,7 +166,9 @@ async fn test_mock_env() {
let env = MockEnv::new(0);
let txid = mock_txid(0);
env.expect_get_tx_with_reply(Ok(mock_transaction()));
let result = env.http_get_tx(txid, INITIAL_MAX_RESPONSE_BYTES).await;
let result = env
.http_get_tx(provider, txid, INITIAL_MAX_RESPONSE_BYTES)
.await;
assert!(result.is_ok());
env.assert_get_tx_call(txid, INITIAL_MAX_RESPONSE_BYTES);
env.assert_no_more_get_tx_call();
Expand Down Expand Up @@ -210,6 +223,7 @@ fn test_try_fetch_tx() {
#[tokio::test]
async fn test_fetch_tx() {
let env = MockEnv::new(CHECK_TRANSACTION_CYCLES_REQUIRED);
let provider = providers::next_provider(env.btc_network());
let txid_0 = mock_txid(0);
let txid_1 = mock_txid(1);
let txid_2 = mock_txid(2);
Expand All @@ -218,7 +232,9 @@ async fn test_fetch_tx() {
let tx_0 = mock_transaction_with_inputs(vec![(txid_1, 0), (txid_2, 1)]);

env.expect_get_tx_with_reply(Ok(tx_0.clone()));
let result = env.fetch_tx((), txid_0, INITIAL_MAX_RESPONSE_BYTES).await;
let result = env
.fetch_tx((), provider, txid_0, INITIAL_MAX_RESPONSE_BYTES)
.await;
assert!(matches!(result, Ok(FetchResult::Fetched(_))));
assert!(matches!(
state::get_fetch_status(txid_0),
Expand All @@ -233,7 +249,9 @@ async fn test_fetch_tx() {

// case RetryWithBiggerBuffer
env.expect_get_tx_with_reply(Err(HttpGetTxError::ResponseTooLarge));
let result = env.fetch_tx((), txid_1, INITIAL_MAX_RESPONSE_BYTES).await;
let result = env
.fetch_tx((), provider, txid_1, INITIAL_MAX_RESPONSE_BYTES)
.await;
assert!(matches!(result, Ok(FetchResult::RetryWithBiggerBuffer)));
assert!(matches!(
state::get_fetch_status(txid_1),
Expand All @@ -243,23 +261,25 @@ async fn test_fetch_tx() {
env.expect_get_tx_with_reply(Err(HttpGetTxError::TxEncoding(
"failed to decode tx".to_string(),
)));
let result = env.fetch_tx((), txid_2, INITIAL_MAX_RESPONSE_BYTES).await;
let result = env
.fetch_tx((), provider, txid_2, INITIAL_MAX_RESPONSE_BYTES)
.await;
assert!(matches!(
result,
Ok(FetchResult::Error(HttpGetTxError::TxEncoding(_)))
));
assert!(matches!(
state::get_fetch_status(txid_2),
Some(FetchTxStatus::Error(HttpGetTxError::TxEncoding(_)))
Some(FetchTxStatus::Error(FetchTxStatusError {
error: HttpGetTxError::TxEncoding(_),
..
}))
));
}

#[tokio::test]
async fn test_check_fetched() {
let mut env = MockEnv::new(CHECK_TRANSACTION_CYCLES_REQUIRED);
state::set_config(state::Config {
btc_network: BtcNetwork::Mainnet,
});
let good_address = Address::from_str("12cbQLTFMXRnSzktFkuoG3eHoMeFtpTu3S")
.unwrap()
.assume_checked();
Expand Down Expand Up @@ -461,4 +481,31 @@ async fn test_check_fetched() {
- get_tx_cycle_cost(INITIAL_MAX_RESPONSE_BYTES) * 2
- get_tx_cycle_cost(RETRY_MAX_RESPONSE_BYTES)
);

// case HttpGetTxError can be retried.
let remaining_cycles = env.cycles_available();
let provider = providers::next_provider(env.btc_network());
state::set_fetch_status(
txid_2,
FetchTxStatus::Error(FetchTxStatusError {
provider,
max_response_bytes: RETRY_MAX_RESPONSE_BYTES,
error: HttpGetTxError::Rejected {
code: RejectionCode::SysTransient,
message: "no more reply".to_string(),
},
}),
);
env.expect_get_tx_with_reply(Ok(tx_2.clone()));
assert!(matches!(
env.check_fetched(txid_0, &fetched).await,
CheckTransactionResponse::Passed
));
// check if provider has been rotated
assert!(*env.called_provider.borrow() == Some(provider.next()));
// Check remaining cycle. The cost should match RETRY_MAX_RESPONSE_BYTES
assert_eq!(
env.cycles_available(),
remaining_cycles - get_tx_cycle_cost(RETRY_MAX_RESPONSE_BYTES)
);
}
17 changes: 8 additions & 9 deletions rs/bitcoin/kyt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,26 +54,26 @@ impl FetchEnv for KytCanisterEnv {
state::FetchGuard::new(txid)
}

fn btc_network(&self) -> BtcNetwork {
get_config().btc_network
}

async fn http_get_tx(
&self,
provider: providers::Provider,
txid: Txid,
max_response_bytes: u32,
) -> Result<Transaction, HttpGetTxError> {
// TODO(XC-159): Support multiple providers
let request = providers::create_request(get_config().btc_network, txid, max_response_bytes);
let request = provider.create_request(txid, max_response_bytes);
let url = request.url.clone();
let cycles = get_tx_cycle_cost(max_response_bytes);
match http_request(request, cycles).await {
Ok((response,)) => {
// Ensure response is 200 before decoding
if response.status != 200u32 {
let code = if response.status == 429u32 {
RejectionCode::SysTransient
} else {
RejectionCode::SysFatal
};
// All non-200 status are treated as transient errors
return Err(HttpGetTxError::Rejected {
code,
code: RejectionCode::SysTransient,
message: format!("HTTP GET {} received code {}", url, response.status),
});
}
Expand Down Expand Up @@ -129,7 +129,6 @@ pub async fn check_transaction_inputs(txid: Txid) -> CheckTransactionResponse {
match env.try_fetch_tx(txid) {
TryFetchResult::Pending => CheckTransactionRetriable::Pending.into(),
TryFetchResult::HighLoad => CheckTransactionRetriable::HighLoad.into(),
TryFetchResult::Error(err) => (txid, err).into(),
TryFetchResult::NotEnoughCycles => CheckTransactionStatus::NotEnoughCycles.into(),
TryFetchResult::Fetched(fetched) => env.check_fetched(txid, &fetched).await,
TryFetchResult::ToFetch(do_fetch) => {
Expand Down
Loading

0 comments on commit 094f99d

Please sign in to comment.