Skip to content

Commit

Permalink
Graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
benthecarman committed Nov 20, 2023
1 parent 394b236 commit da3462b
Showing 1 changed file with 74 additions and 5 deletions.
79 changes: 74 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,22 @@ use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::select;
use tokio::sync::Mutex;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::{oneshot, Mutex};
use tokio::{select, spawn};
use tonic_openssl_lnd::lnrpc::invoice::InvoiceState;
use tonic_openssl_lnd::lnrpc::{
ChannelBalanceRequest, GetInfoRequest, GetInfoResponse, Invoice, PaymentHash,
};
use tonic_openssl_lnd::LndLightningClient;
use tonic_openssl_lnd::{LndClient, LndLightningClient};

mod config;
mod payments;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let config: Config = Config::parse();
let mut keys = get_keys(&config.keys_file);
let tracker = Arc::new(Mutex::new(PaymentTracker::new()));
let keys = get_keys(&config.keys_file);

let mut lnd_client = tonic_openssl_lnd::connect(
config.lnd_host.clone(),
Expand Down Expand Up @@ -64,6 +64,64 @@ async fn main() -> anyhow::Result<()> {

println!("server pubkey: {}", keys.user_keys().public_key().to_hex());

// Set up a oneshot channel to handle shutdown signal
let (tx, rx) = oneshot::channel();

// Spawn a task to listen for shutdown signals
spawn(async move {
let mut term_signal = signal(SignalKind::terminate())
.map_err(|e| eprintln!("failed to install TERM signal handler: {e}"))
.unwrap();
let mut int_signal = signal(SignalKind::interrupt())
.map_err(|e| {
eprintln!("failed to install INT signal handler: {e}");
})
.unwrap();

select! {
_ = term_signal.recv() => {
println!("Received SIGTERM");
},
_ = int_signal.recv() => {
println!("Received SIGINT");
},
}

let _ = tx.send(());
});

let active_requests = Arc::new(Mutex::new(vec![]));
let active_requests_clone = active_requests.clone();
spawn(async move {
if let Err(e) = event_loop(config, keys, lnd_client, active_requests_clone).await {
eprintln!("Error: {e}");
}
});

rx.await?;

println!("Shutting down...");
// wait for active requests to complete
loop {
let active_requests = active_requests.clone();
let requests = active_requests.lock().await;
if requests.is_empty() {
break;
}
println!("Waiting for {} requests to complete...", requests.len());
tokio::time::sleep(Duration::from_secs(1)).await;
}

Ok(())
}

async fn event_loop(
config: Config,
mut keys: Nip47Keys,
mut lnd_client: LndClient,
active_requests: Arc<Mutex<Vec<EventId>>>,
) -> anyhow::Result<()> {
let tracker = Arc::new(Mutex::new(PaymentTracker::new()));
// loop in case we get disconnected
loop {
let client = Client::new(&keys.server_keys());
Expand Down Expand Up @@ -106,12 +164,19 @@ async fn main() -> anyhow::Result<()> {
&& event.verify().is_ok()
{
println!("Received event!");
let active_requests = active_requests.clone();
let keys = keys.clone();
let config = config.clone();
let client = client.clone();
let tracker = tracker.clone();
let lnd = lnd_client.lightning().clone();

tokio::task::spawn(async move {
let event_id = event.id;
let mut ar = active_requests.lock().await;
ar.push(event_id);
drop(ar);

if let Err(e) = tokio::time::timeout(
Duration::from_secs(60),
handle_nwc_request(event, &keys, &config, &client, tracker, lnd),
Expand All @@ -120,6 +185,10 @@ async fn main() -> anyhow::Result<()> {
{
eprintln!("Error: {e}");
}

// remove request from active requests
let mut ar = active_requests.lock().await;
ar.retain(|id| *id != event_id);
});
} else {
eprintln!("Invalid event: {}", event.as_json());
Expand Down

0 comments on commit da3462b

Please sign in to comment.