Skip to content

Commit

Permalink
Merge branch 'development' into ho_update_payment_id
Browse files Browse the repository at this point in the history
  • Loading branch information
SWvheerden authored Jan 17, 2025
2 parents 0ee24f5 + e5193a8 commit bbe15a8
Show file tree
Hide file tree
Showing 11 changed files with 333 additions and 192 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ impl wallet_server::Wallet for WalletGrpcServer {

let (mut sender, receiver) = mpsc::channel(transactions.len());
task::spawn(async move {
for (i, (_, txn)) in transactions.iter().enumerate() {
for (i, txn) in transactions.iter().enumerate() {
let response = GetCompletedTransactionsResponse {
transaction: Some(TransactionInfo {
tx_id: txn.tx_id.into(),
Expand Down Expand Up @@ -1092,15 +1092,17 @@ async fn handle_pending_outbound(
transaction_service: &mut TransactionServiceHandle,
sender: &mut Sender<Result<TransactionEventResponse, Status>>,
) {
match transaction_service.get_pending_outbound_transactions().await {
Ok(mut txs) => {
if let Some(tx) = txs.remove(&tx_id) {
use models::WalletTransaction::PendingOutbound;
match transaction_service.get_any_transaction(tx_id).await {
Ok(tx) => match tx {
Some(PendingOutbound(tx)) => {
let transaction_event =
convert_to_transaction_event(event.to_string(), TransactionWrapper::Outbound(Box::new(tx)));
convert_to_transaction_event(event.to_string(), TransactionWrapper::Outbound(Box::new(tx.clone())));
send_transaction_event(transaction_event, sender).await;
} else {
},
_ => {
error!(target: LOG_TARGET, "Not found in pending outbound set tx_id: {}", tx_id);
}
},
},
Err(e) => error!(target: LOG_TARGET, "Transaction service error: {}", e),
}
Expand Down
2 changes: 1 addition & 1 deletion applications/minotari_console_wallet/src/notifier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl Notifier {
self.handle.spawn(async move {
match transaction_service.get_pending_outbound_transactions().await {
Ok(txs) => {
if let Some(tx) = txs.get(&tx_id) {
if let Some(tx) = txs.iter().find(|tx| tx.tx_id == tx_id) {
let args = args_from_outbound(tx, event);
let result = Command::new(program).args(&args).output();
let message = WalletEventMessage::Outbound {
Expand Down
21 changes: 4 additions & 17 deletions applications/minotari_console_wallet/src/ui/state/app_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ impl AppStateInner {
.transaction_service
.get_pending_inbound_transactions()
.await?
.values()
.iter()
.map(|t| CompletedTransaction::from(t.clone()))
.collect::<Vec<CompletedTransaction>>(),
);
Expand All @@ -735,11 +735,10 @@ impl AppStateInner {
.transaction_service
.get_pending_outbound_transactions()
.await?
.values()
.iter()
.map(|t| CompletedTransaction::from(t.clone()))
.collect::<Vec<CompletedTransaction>>(),
);

pending_transactions.sort_by(|a: &CompletedTransaction, b: &CompletedTransaction| {
b.timestamp.partial_cmp(&a.timestamp).unwrap()
});
Expand All @@ -752,26 +751,14 @@ impl AppStateInner {
.collect::<Result<Vec<_>, _>>()?;

let mut completed_transactions: Vec<CompletedTransaction> = Vec::new();
completed_transactions.extend(
self.wallet
.transaction_service
.get_completed_transactions()
.await?
.values()
.cloned()
.collect::<Vec<CompletedTransaction>>(),
);
completed_transactions.extend(self.wallet.transaction_service.get_completed_transactions().await?);

completed_transactions.extend(
self.wallet
.transaction_service
.get_cancelled_completed_transactions()
.await?
.values()
.cloned()
.collect::<Vec<CompletedTransaction>>(),
.await?,
);

completed_transactions.sort_by(|a, b| {
b.timestamp
.partial_cmp(&a.timestamp)
Expand Down
20 changes: 9 additions & 11 deletions base_layer/wallet/src/transaction_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,9 +417,9 @@ pub enum TransactionServiceResponse {
template_registration: Box<CodeTemplateRegistration>,
},
TransactionCancelled,
PendingInboundTransactions(HashMap<TxId, InboundTransaction>),
PendingOutboundTransactions(HashMap<TxId, OutboundTransaction>),
CompletedTransactions(HashMap<TxId, CompletedTransaction>),
PendingInboundTransactions(Vec<InboundTransaction>),
PendingOutboundTransactions(Vec<OutboundTransaction>),
CompletedTransactions(Vec<CompletedTransaction>),
CompletedTransaction(Box<CompletedTransaction>),
BaseNodePublicKeySet,
UtxoImported(TxId),
Expand Down Expand Up @@ -913,7 +913,7 @@ impl TransactionServiceHandle {

pub async fn get_pending_inbound_transactions(
&mut self,
) -> Result<HashMap<TxId, InboundTransaction>, TransactionServiceError> {
) -> Result<Vec<InboundTransaction>, TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::GetPendingInboundTransactions)
Expand All @@ -926,7 +926,7 @@ impl TransactionServiceHandle {

pub async fn get_cancelled_pending_inbound_transactions(
&mut self,
) -> Result<HashMap<TxId, InboundTransaction>, TransactionServiceError> {
) -> Result<Vec<InboundTransaction>, TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::GetCancelledPendingInboundTransactions)
Expand All @@ -939,7 +939,7 @@ impl TransactionServiceHandle {

pub async fn get_pending_outbound_transactions(
&mut self,
) -> Result<HashMap<TxId, OutboundTransaction>, TransactionServiceError> {
) -> Result<Vec<OutboundTransaction>, TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::GetPendingOutboundTransactions)
Expand All @@ -952,7 +952,7 @@ impl TransactionServiceHandle {

pub async fn get_cancelled_pending_outbound_transactions(
&mut self,
) -> Result<HashMap<TxId, OutboundTransaction>, TransactionServiceError> {
) -> Result<Vec<OutboundTransaction>, TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::GetCancelledPendingOutboundTransactions)
Expand All @@ -963,9 +963,7 @@ impl TransactionServiceHandle {
}
}

pub async fn get_completed_transactions(
&mut self,
) -> Result<HashMap<TxId, CompletedTransaction>, TransactionServiceError> {
pub async fn get_completed_transactions(&mut self) -> Result<Vec<CompletedTransaction>, TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::GetCompletedTransactions)
Expand All @@ -978,7 +976,7 @@ impl TransactionServiceHandle {

pub async fn get_cancelled_completed_transactions(
&mut self,
) -> Result<HashMap<TxId, CompletedTransaction>, TransactionServiceError> {
) -> Result<Vec<CompletedTransaction>, TransactionServiceError> {
match self
.handle
.call(TransactionServiceRequest::GetCancelledCompletedTransactions)
Expand Down
18 changes: 9 additions & 9 deletions base_layer/wallet/src/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2780,42 +2780,42 @@ where
>,
) -> Result<(), TransactionServiceError> {
let outbound_txs = self.db.get_pending_outbound_transactions()?;
for (tx_id, tx) in outbound_txs {
for tx in outbound_txs {
let (sender_protocol, stage) = if tx.send_count > 0 {
(None, TransactionSendProtocolStage::WaitForReply)
} else {
(Some(tx.sender_protocol), TransactionSendProtocolStage::Queued)
};
let (not_yet_pending, queued) = (
!self.pending_transaction_reply_senders.contains_key(&tx_id),
!self.pending_transaction_reply_senders.contains_key(&tx.tx_id),
stage == TransactionSendProtocolStage::Queued,
);

if not_yet_pending {
debug!(
target: LOG_TARGET,
"Restarting listening for Reply for Pending Outbound Transaction TxId: {}", tx_id
"Restarting listening for Reply for Pending Outbound Transaction TxId: {}", tx.tx_id
);
} else if queued {
debug!(
target: LOG_TARGET,
"Retry sending queued Pending Outbound Transaction TxId: {}", tx_id
"Retry sending queued Pending Outbound Transaction TxId: {}", tx.tx_id
);
let _sender = self.pending_transaction_reply_senders.remove(&tx_id);
let _sender = self.send_transaction_cancellation_senders.remove(&tx_id);
let _sender = self.pending_transaction_reply_senders.remove(&tx.tx_id);
let _sender = self.send_transaction_cancellation_senders.remove(&tx.tx_id);
} else {
// dont care
}

if not_yet_pending || queued {
let (tx_reply_sender, tx_reply_receiver) = mpsc::channel(100);
let (cancellation_sender, cancellation_receiver) = oneshot::channel();
self.pending_transaction_reply_senders.insert(tx_id, tx_reply_sender);
self.pending_transaction_reply_senders.insert(tx.tx_id, tx_reply_sender);
self.send_transaction_cancellation_senders
.insert(tx_id, cancellation_sender);
.insert(tx.tx_id, cancellation_sender);

let protocol = TransactionSendProtocol::new(
tx_id,
tx.tx_id,
self.resources.clone(),
tx_reply_receiver,
cancellation_receiver,
Expand Down
31 changes: 12 additions & 19 deletions base_layer/wallet/src/transaction_service/storage/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
collections::HashMap,
fmt,
fmt::{Display, Error, Formatter},
sync::Arc,
Expand Down Expand Up @@ -245,9 +244,9 @@ pub enum DbValue {
PendingOutboundTransaction(Box<OutboundTransaction>),
PendingInboundTransaction(Box<InboundTransaction>),
CompletedTransaction(Box<CompletedTransaction>),
PendingOutboundTransactions(HashMap<TxId, OutboundTransaction>),
PendingInboundTransactions(HashMap<TxId, InboundTransaction>),
CompletedTransactions(HashMap<TxId, CompletedTransaction>),
PendingOutboundTransactions(Vec<OutboundTransaction>),
PendingInboundTransactions(Vec<InboundTransaction>),
CompletedTransactions(Vec<CompletedTransaction>),
WalletTransaction(Box<WalletTransaction>),
}

Expand Down Expand Up @@ -508,22 +507,20 @@ where T: TransactionBackend + 'static
Ok(*t)
}

pub fn get_pending_inbound_transactions(
&self,
) -> Result<HashMap<TxId, InboundTransaction>, TransactionStorageError> {
pub fn get_pending_inbound_transactions(&self) -> Result<Vec<InboundTransaction>, TransactionStorageError> {
self.get_pending_inbound_transactions_by_cancelled(false)
}

pub fn get_cancelled_pending_inbound_transactions(
&self,
) -> Result<HashMap<TxId, InboundTransaction>, TransactionStorageError> {
) -> Result<Vec<InboundTransaction>, TransactionStorageError> {
self.get_pending_inbound_transactions_by_cancelled(true)
}

fn get_pending_inbound_transactions_by_cancelled(
&self,
cancelled: bool,
) -> Result<HashMap<TxId, InboundTransaction>, TransactionStorageError> {
) -> Result<Vec<InboundTransaction>, TransactionStorageError> {
let key = if cancelled {
DbKey::CancelledPendingInboundTransactions
} else {
Expand All @@ -544,22 +541,20 @@ where T: TransactionBackend + 'static
Ok(t)
}

pub fn get_pending_outbound_transactions(
&self,
) -> Result<HashMap<TxId, OutboundTransaction>, TransactionStorageError> {
pub fn get_pending_outbound_transactions(&self) -> Result<Vec<OutboundTransaction>, TransactionStorageError> {
self.get_pending_outbound_transactions_by_cancelled(false)
}

pub fn get_cancelled_pending_outbound_transactions(
&self,
) -> Result<HashMap<TxId, OutboundTransaction>, TransactionStorageError> {
) -> Result<Vec<OutboundTransaction>, TransactionStorageError> {
self.get_pending_outbound_transactions_by_cancelled(true)
}

fn get_pending_outbound_transactions_by_cancelled(
&self,
cancelled: bool,
) -> Result<HashMap<TxId, OutboundTransaction>, TransactionStorageError> {
) -> Result<Vec<OutboundTransaction>, TransactionStorageError> {
let key = if cancelled {
DbKey::CancelledPendingOutboundTransactions
} else {
Expand Down Expand Up @@ -588,13 +583,11 @@ where T: TransactionBackend + 'static
Ok(address)
}

pub fn get_completed_transactions(&self) -> Result<HashMap<TxId, CompletedTransaction>, TransactionStorageError> {
pub fn get_completed_transactions(&self) -> Result<Vec<CompletedTransaction>, TransactionStorageError> {
self.get_completed_transactions_by_cancelled(false)
}

pub fn get_cancelled_completed_transactions(
&self,
) -> Result<HashMap<TxId, CompletedTransaction>, TransactionStorageError> {
pub fn get_cancelled_completed_transactions(&self) -> Result<Vec<CompletedTransaction>, TransactionStorageError> {
self.get_completed_transactions_by_cancelled(true)
}

Expand All @@ -620,7 +613,7 @@ where T: TransactionBackend + 'static
fn get_completed_transactions_by_cancelled(
&self,
cancelled: bool,
) -> Result<HashMap<TxId, CompletedTransaction>, TransactionStorageError> {
) -> Result<Vec<CompletedTransaction>, TransactionStorageError> {
let key = if cancelled {
DbKey::CancelledCompletedTransactions
} else {
Expand Down
Loading

0 comments on commit bbe15a8

Please sign in to comment.