diff --git a/mm2src/coins/utxo/utxo_common.rs b/mm2src/coins/utxo/utxo_common.rs index 37955e9030..70c8522b58 100644 --- a/mm2src/coins/utxo/utxo_common.rs +++ b/mm2src/coins/utxo/utxo_common.rs @@ -2085,7 +2085,7 @@ pub fn watcher_validate_taker_fee( if tx_confirmed_before_block { return MmError::err(ValidatePaymentError::WrongPaymentTx(format!( "{}: Fee tx {:?} confirmed before min_block {}", - EARLY_CONFIRMATION_ERR_LOG, taker_fee_tx, min_block_number + EARLY_CONFIRMATION_ERR_LOG, tx_from_rpc, min_block_number ))); } diff --git a/mm2src/mm2_main/src/lp_healthcheck.rs b/mm2src/mm2_main/src/lp_healthcheck.rs index 820a5ad619..20a6004c95 100644 --- a/mm2src/mm2_main/src/lp_healthcheck.rs +++ b/mm2src/mm2_main/src/lp_healthcheck.rs @@ -9,6 +9,7 @@ use instant::{Duration, Instant}; use lazy_static::lazy_static; use mm2_core::mm_ctx::MmArc; use mm2_err_handle::prelude::MmError; +use mm2_err_handle::prelude::*; use mm2_libp2p::p2p_ctx::P2PContext; use mm2_libp2p::{decode_message, encode_message, pub_sub_topic, Libp2pPublic, PeerAddress, TopicPrefix}; use ser_error_derive::SerializeErrorType; @@ -16,7 +17,7 @@ use serde::{Deserialize, Serialize}; use std::convert::TryFrom; use std::sync::Mutex; -use crate::lp_network::broadcast_p2p_msg; +use crate::lp_network::{broadcast_p2p_msg, P2PRequestError, P2PRequestResult}; pub(crate) const PEER_HEALTHCHECK_PREFIX: TopicPrefix = "hcheck"; @@ -279,7 +280,10 @@ pub async fn peer_connection_healthcheck_rpc( Ok(rx.timeout(timeout_duration).await == Ok(Ok(()))) } -pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_libp2p::GossipsubMessage) { +pub(crate) async fn process_p2p_healthcheck_message( + ctx: &MmArc, + message: mm2_libp2p::GossipsubMessage, +) -> P2PRequestResult<()> { macro_rules! try_or_return { ($exp:expr, $msg: expr) => { match $exp { @@ -292,24 +296,17 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li }; } - let data = try_or_return!( - HealthcheckMessage::decode(&message.data), - "Couldn't decode healthcheck message" - ); + let data = HealthcheckMessage::decode(&message.data) + .map_to_mm(|e| P2PRequestError::DecodeError(format!("Couldn't decode healthcheck message: {}", e)))?; + let sender_peer = data.is_received_message_valid().map_to_mm(|e| { + P2PRequestError::ValidationFailed(format!("Received an invalid healthcheck message. Error: {}", e)) + })?; let ctx = ctx.clone(); // Pass the remaining work to another thread to free up this one as soon as possible, // so KDF can handle a high amount of healthcheck messages more efficiently. ctx.spawner().spawn(async move { - let sender_peer = match data.is_received_message_valid() { - Ok(t) => t, - Err(e) => { - log::error!("Received an invalid healthcheck message. Error: {e}"); - return; - }, - }; - if data.should_reply() { // Reply the message so they know we are healthy. @@ -337,6 +334,8 @@ pub(crate) async fn process_p2p_healthcheck_message(ctx: &MmArc, message: mm2_li }; } }); + + Ok(()) } #[cfg(any(test, target_arch = "wasm32"))] diff --git a/mm2src/mm2_main/src/lp_network.rs b/mm2src/mm2_main/src/lp_network.rs index 08ae5f8b3e..b2ef53f3fb 100644 --- a/mm2src/mm2_main/src/lp_network.rs +++ b/mm2src/mm2_main/src/lp_network.rs @@ -62,6 +62,7 @@ pub enum P2PRequestError { ResponseError(String), #[display(fmt = "Expected 1 response, found {}", _0)] ExpectedSingleResponseError(usize), + ValidationFailed(String), } /// Enum covering error cases that can happen during P2P message processing. @@ -190,15 +191,16 @@ async fn process_p2p_message( to_propagate = true; }, Some(lp_swap::TX_HELPER_PREFIX) => { - if let Some(pair) = split.next() { - if let Ok(Some(coin)) = lp_coinfind(&ctx, pair).await { + if let Some(ticker) = split.next() { + if let Ok(Some(coin)) = lp_coinfind(&ctx, ticker).await { if let Err(e) = coin.tx_enum_from_bytes(&message.data) { log::error!("Message cannot continue the process due to: {:?}", e); return; }; - let fut = coin.send_raw_tx_bytes(&message.data); - ctx.spawner().spawn(async { + if coin.is_utxo_in_native_mode() { + let fut = coin.send_raw_tx_bytes(&message.data); + ctx.spawner().spawn(async { match fut.compat().await { Ok(id) => log::debug!("Transaction broadcasted successfully: {:?} ", id), // TODO (After https://github.com/KomodoPlatform/atomicDEX-API/pull/1433) @@ -207,11 +209,19 @@ async fn process_p2p_message( Err(e) => log::error!("Broadcast transaction failed (ignore this error if the transaction already sent by another seednode). {}", e), }; }) + } } + + to_propagate = true; } }, Some(lp_healthcheck::PEER_HEALTHCHECK_PREFIX) => { - lp_healthcheck::process_p2p_healthcheck_message(&ctx, message).await + if let Err(e) = lp_healthcheck::process_p2p_healthcheck_message(&ctx, message).await { + log::error!("{}", e); + return; + } + + to_propagate = true; }, None | Some(_) => (), } diff --git a/mm2src/mm2_main/src/lp_swap.rs b/mm2src/mm2_main/src/lp_swap.rs index 7692503c18..0acb7fc443 100644 --- a/mm2src/mm2_main/src/lp_swap.rs +++ b/mm2src/mm2_main/src/lp_swap.rs @@ -151,8 +151,11 @@ pub const TX_HELPER_PREFIX: TopicPrefix = "txhlp"; pub(crate) const LEGACY_SWAP_TYPE: u8 = 0; pub(crate) const MAKER_SWAP_V2_TYPE: u8 = 1; pub(crate) const TAKER_SWAP_V2_TYPE: u8 = 2; -const MAX_STARTED_AT_DIFF: u64 = 60; +pub(crate) const TAKER_FEE_VALIDATION_ATTEMPTS: usize = 6; +pub(crate) const TAKER_FEE_VALIDATION_RETRY_DELAY_SECS: f64 = 10.; + +const MAX_STARTED_AT_DIFF: u64 = 60; const NEGOTIATE_SEND_INTERVAL: f64 = 30.; /// If a certain P2P message is not received, swap will be aborted after this time expires. diff --git a/mm2src/mm2_main/src/lp_swap/maker_swap.rs b/mm2src/mm2_main/src/lp_swap/maker_swap.rs index 84c1bbc6aa..0eb72b8a71 100644 --- a/mm2src/mm2_main/src/lp_swap/maker_swap.rs +++ b/mm2src/mm2_main/src/lp_swap/maker_swap.rs @@ -9,7 +9,8 @@ use super::{broadcast_my_swap_status, broadcast_p2p_tx_msg, broadcast_swap_msg_e wait_for_maker_payment_conf_until, AtomicSwap, LockedAmount, MySwapInfo, NegotiationDataMsg, NegotiationDataV2, NegotiationDataV3, RecoveredSwap, RecoveredSwapAction, SavedSwap, SavedSwapIo, SavedTradeFee, SecretHashAlgo, SwapConfirmationsSettings, SwapError, SwapMsg, SwapPubkeys, SwapTxDataMsg, - SwapsContext, TransactionIdentifier, INCLUDE_REFUND_FEE, NO_REFUND_FEE, WAIT_CONFIRM_INTERVAL_SEC}; + SwapsContext, TransactionIdentifier, INCLUDE_REFUND_FEE, NO_REFUND_FEE, TAKER_FEE_VALIDATION_ATTEMPTS, + TAKER_FEE_VALIDATION_RETRY_DELAY_SECS, WAIT_CONFIRM_INTERVAL_SEC}; use crate::lp_dispatcher::{DispatcherContext, LpEvents}; use crate::lp_network::subscribe_to_topic; use crate::lp_ordermatch::MakerOrderBuilder; @@ -771,13 +772,13 @@ impl MakerSwap { { Ok(_) => break, Err(err) => { - if attempts >= 6 { + if attempts >= TAKER_FEE_VALIDATION_ATTEMPTS { return Ok((Some(MakerSwapCommand::Finish), vec![ MakerSwapEvent::TakerFeeValidateFailed(ERRL!("{}", err).into()), ])); } else { attempts += 1; - Timer::sleep(10.).await; + Timer::sleep(TAKER_FEE_VALIDATION_RETRY_DELAY_SECS).await; } }, }; diff --git a/mm2src/mm2_main/src/lp_swap/swap_watcher.rs b/mm2src/mm2_main/src/lp_swap/swap_watcher.rs index 95a1d1e88a..312f62e5c5 100644 --- a/mm2src/mm2_main/src/lp_swap/swap_watcher.rs +++ b/mm2src/mm2_main/src/lp_swap/swap_watcher.rs @@ -1,5 +1,6 @@ use super::{broadcast_p2p_tx_msg, get_payment_locktime, lp_coinfind, taker_payment_spend_deadline, tx_helper_topic, - H256Json, SwapsContext, WAIT_CONFIRM_INTERVAL_SEC}; + H256Json, SwapsContext, TAKER_FEE_VALIDATION_ATTEMPTS, TAKER_FEE_VALIDATION_RETRY_DELAY_SECS, + WAIT_CONFIRM_INTERVAL_SEC}; use crate::lp_network::{P2PRequestError, P2PRequestResult}; use crate::MmError; @@ -181,24 +182,31 @@ impl State for ValidateTakerFee { async fn on_changed(self: Box, watcher_ctx: &mut WatcherStateMachine) -> StateResult { debug!("Watcher validate taker fee"); - let validated_f = watcher_ctx - .taker_coin - .watcher_validate_taker_fee(WatcherValidateTakerFeeInput { - taker_fee_hash: watcher_ctx.data.taker_fee_hash.clone(), - sender_pubkey: watcher_ctx.verified_pub.clone(), - min_block_number: watcher_ctx.data.taker_coin_start_block, - fee_addr: DEX_FEE_ADDR_RAW_PUBKEY.clone(), - lock_duration: watcher_ctx.data.lock_duration, - }) - .compat(); - - if let Err(err) = validated_f.await { - return Self::change_state(Stopped::from_reason(StopReason::Error( - WatcherError::InvalidTakerFee(format!("{:?}", err)).into(), - ))); - }; - Self::change_state(ValidateTakerPayment {}) + let validation_result = retry_on_err!(async { + watcher_ctx + .taker_coin + .watcher_validate_taker_fee(WatcherValidateTakerFeeInput { + taker_fee_hash: watcher_ctx.data.taker_fee_hash.clone(), + sender_pubkey: watcher_ctx.verified_pub.clone(), + min_block_number: watcher_ctx.data.taker_coin_start_block, + fee_addr: DEX_FEE_ADDR_RAW_PUBKEY.clone(), + lock_duration: watcher_ctx.data.lock_duration, + }) + .compat() + .await + }) + .repeat_every_secs(TAKER_FEE_VALIDATION_RETRY_DELAY_SECS) + .attempts(TAKER_FEE_VALIDATION_ATTEMPTS) + .inspect_err(|e| error!("Error validating taker fee: {}", e)) + .await; + + match validation_result { + Ok(_) => Self::change_state(ValidateTakerPayment {}), + Err(repeat_err) => Self::change_state(Stopped::from_reason(StopReason::Error( + WatcherError::InvalidTakerFee(repeat_err.to_string()).into(), + ))), + } } }