diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 6bd031379..e3ee743fd 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -116,6 +116,7 @@ interface Builder { void set_pathfinding_scores_source(string url); void set_liquidity_source_lsps1(PublicKey node_id, SocketAddress address, string? token); void set_liquidity_source_lsps2(PublicKey node_id, SocketAddress address, string? token); + void add_lsp(PublicKey node_id, SocketAddress address, string? token); void set_storage_dir_path(string storage_dir_path); void set_filesystem_logger(string? log_file_path, LogLevel? max_log_level); void set_log_facade_logger(); @@ -299,6 +300,8 @@ interface LSPS1Liquidity { [Throws=NodeError] LSPS1OrderStatus request_channel(u64 lsp_balance_sat, u64 client_balance_sat, u32 channel_expiry_blocks, boolean announce_channel); [Throws=NodeError] + LSPS1OrderStatus request_channel_from_lsp(u64 lsp_balance_sat, u64 client_balance_sat, u32 channel_expiry_blocks, boolean announce_channel, PublicKey node_id); + [Throws=NodeError] LSPS1OrderStatus check_order_status(LSPS1OrderId order_id); }; diff --git a/src/builder.rs b/src/builder.rs index 5d8a5a7a9..5576871f5 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -65,9 +65,7 @@ use crate::io::{ PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, }; -use crate::liquidity::{ - LSPS1ClientConfig, LSPS2ClientConfig, LSPS2ServiceConfig, LiquiditySourceBuilder, -}; +use crate::liquidity::{LSPS2ServiceConfig, LiquiditySourceBuilder, LspConfig}; use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger}; use crate::message_handler::NodeCustomMessageHandler; use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox; @@ -119,10 +117,8 @@ struct PathfindingScoresSyncConfig { #[derive(Debug, Clone, Default)] struct LiquiditySourceConfig { - // Act as an LSPS1 client connecting to the given service. - lsps1_client: Option, - // Act as an LSPS2 client connecting to the given service. - lsps2_client: Option, + // Acts for both LSPS1 and LSPS2 clients connecting to the given service. + lsp_nodes: Vec, // Act as an LSPS2 service. lsps2_service: Option, } @@ -401,17 +397,12 @@ impl NodeBuilder { /// The given `token` will be used by the LSP to authenticate the user. /// /// [bLIP-51 / LSPS1]: https://github.com/lightning/blips/blob/master/blip-0051.md + #[deprecated(note = "Use `add_lsp` instead")] + #[allow(dead_code)] pub fn set_liquidity_source_lsps1( &mut self, node_id: PublicKey, address: SocketAddress, token: Option, ) -> &mut Self { - // Mark the LSP as trusted for 0conf - self.config.trusted_peers_0conf.push(node_id.clone()); - - let liquidity_source_config = - self.liquidity_source_config.get_or_insert(LiquiditySourceConfig::default()); - let lsps1_client_config = LSPS1ClientConfig { node_id, address, token }; - liquidity_source_config.lsps1_client = Some(lsps1_client_config); - self + self.add_lsp(node_id, address, token) } /// Configures the [`Node`] instance to source just-in-time inbound liquidity from the given @@ -422,16 +413,32 @@ impl NodeBuilder { /// The given `token` will be used by the LSP to authenticate the user. /// /// [bLIP-52 / LSPS2]: https://github.com/lightning/blips/blob/master/blip-0052.md + #[deprecated(note = "Use `add_lsp` instead")] + #[allow(dead_code)] pub fn set_liquidity_source_lsps2( &mut self, node_id: PublicKey, address: SocketAddress, token: Option, + ) -> &mut Self { + self.add_lsp(node_id, address, token) + } + + /// Configures the [`Node`] instance to source inbound liquidity from the given LSP, without specifying + /// the exact protocol used (e.g., LSPS1 or LSPS2). + /// + /// Will mark the LSP as trusted for 0-confirmation channels, see [`Config::trusted_peers_0conf`]. + /// + /// The given `token` will be used by the LSP to authenticate the user. + /// This method is useful when the user wants to connect to an LSP but does not want to be concerned with + /// the specific protocol used for liquidity provision. The node will automatically detect and use the + /// appropriate protocol supported by the LSP. + pub fn add_lsp( + &mut self, node_id: PublicKey, address: SocketAddress, token: Option, ) -> &mut Self { // Mark the LSP as trusted for 0conf - self.config.trusted_peers_0conf.push(node_id.clone()); + self.config.trusted_peers_0conf.push(node_id); let liquidity_source_config = self.liquidity_source_config.get_or_insert(LiquiditySourceConfig::default()); - let lsps2_client_config = LSPS2ClientConfig { node_id, address, token }; - liquidity_source_config.lsps2_client = Some(lsps2_client_config); + liquidity_source_config.lsp_nodes.push(LspConfig { node_id, address, token }); self } @@ -824,7 +831,7 @@ impl ArcedNodeBuilder { pub fn set_liquidity_source_lsps1( &self, node_id: PublicKey, address: SocketAddress, token: Option, ) { - self.inner.write().unwrap().set_liquidity_source_lsps1(node_id, address, token); + self.inner.write().unwrap().add_lsp(node_id, address, token); } /// Configures the [`Node`] instance to source just-in-time inbound liquidity from the given @@ -838,7 +845,20 @@ impl ArcedNodeBuilder { pub fn set_liquidity_source_lsps2( &self, node_id: PublicKey, address: SocketAddress, token: Option, ) { - self.inner.write().unwrap().set_liquidity_source_lsps2(node_id, address, token); + self.inner.write().unwrap().add_lsp(node_id, address, token); + } + + /// Configures the [`Node`] instance to source inbound liquidity from the given LSP, without specifying + /// the exact protocol used (e.g., LSPS1 or LSPS2). + /// + /// Will mark the LSP as trusted for 0-confirmation channels, see [`Config::trusted_peers_0conf`]. + /// + /// The given `token` will be used by the LSP to authenticate the user. + /// This method is useful when the user wants to connect to an LSP but does not want to be concerned with + /// the specific protocol used for liquidity provision. The node will automatically detect and use the + /// appropriate protocol supported by the LSP. + pub fn add_lsp(&self, node_id: PublicKey, address: SocketAddress, token: Option) { + self.inner.write().unwrap().add_lsp(node_id, address, token); } /// Configures the [`Node`] instance to provide an [LSPS2] service, issuing just-in-time @@ -1598,21 +1618,7 @@ fn build_with_store_internal( Arc::clone(&logger), ); - lsc.lsps1_client.as_ref().map(|config| { - liquidity_source_builder.lsps1_client( - config.node_id, - config.address.clone(), - config.token.clone(), - ) - }); - - lsc.lsps2_client.as_ref().map(|config| { - liquidity_source_builder.lsps2_client( - config.node_id, - config.address.clone(), - config.token.clone(), - ) - }); + liquidity_source_builder.set_lsp_nodes(lsc.lsp_nodes.clone()); let promise_secret = { let lsps_xpriv = derive_xprv( diff --git a/src/event.rs b/src/event.rs index 6f0ed8e09..c25a1e0e3 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1221,12 +1221,8 @@ where let user_channel_id: u128 = rng().random(); let allow_0conf = self.config.trusted_peers_0conf.contains(&counterparty_node_id); let mut channel_override_config = None; - if let Some((lsp_node_id, _)) = self - .liquidity_source - .as_ref() - .and_then(|ls| ls.as_ref().get_lsps2_lsp_details()) - { - if lsp_node_id == counterparty_node_id { + if let Some(ls) = self.liquidity_source.as_ref() { + if ls.as_ref().is_lsps_node(&counterparty_node_id) { // When we're an LSPS2 client, allow claiming underpaying HTLCs as the LSP will skim off some fee. We'll // check that they don't take too much before claiming. // diff --git a/src/lib.rs b/src/lib.rs index d2222d949..24ce08031 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -661,6 +661,29 @@ impl Node { }); } + if let Some(liquidity_source) = self.liquidity_source.as_ref() { + let discovery_ls = Arc::clone(&liquidity_source); + let discovery_cm = Arc::clone(&self.connection_manager); + let discovery_logger = Arc::clone(&self.logger); + self.runtime.spawn_background_task(async move { + for (node_id, address) in discovery_ls.get_all_lsp_details() { + if let Err(e) = + discovery_cm.connect_peer_if_necessary(node_id, address.clone()).await + { + log_error!( + discovery_logger, + "Failed to connect to LSP {} for protocol discovery: {}", + node_id, + e + ); + continue; + } + } + + discovery_ls.discover_all_lsp_protocols().await; + }); + } + log_info!(self.logger, "Startup complete."); *is_running_lock = true; Ok(()) diff --git a/src/liquidity.rs b/src/liquidity.rs index cbfd7b109..051c4f836 100644 --- a/src/liquidity.rs +++ b/src/liquidity.rs @@ -20,9 +20,11 @@ use lightning::events::HTLCHandlingFailureType; use lightning::ln::channelmanager::{InterceptId, MIN_FINAL_CLTV_EXPIRY_DELTA}; use lightning::ln::msgs::SocketAddress; use lightning::ln::types::ChannelId; +use lightning::log_warn; use lightning::routing::router::{RouteHint, RouteHintHop}; use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, InvoiceBuilder, RoutingFees}; use lightning_liquidity::events::LiquidityEvent; +use lightning_liquidity::lsps0::event::LSPS0ClientEvent; use lightning_liquidity::lsps0::ser::{LSPSDateTime, LSPSRequestId}; use lightning_liquidity::lsps1::client::LSPS1ClientConfig as LdkLSPS1ClientConfig; use lightning_liquidity::lsps1::event::LSPS1ClientEvent; @@ -54,39 +56,19 @@ const LIQUIDITY_REQUEST_TIMEOUT_SECS: u64 = 5; const LSPS2_GETINFO_REQUEST_EXPIRY: Duration = Duration::from_secs(60 * 60 * 24); const LSPS2_CHANNEL_CLTV_EXPIRY_DELTA: u32 = 72; -struct LSPS1Client { - lsp_node_id: PublicKey, - lsp_address: SocketAddress, - token: Option, - ldk_client_config: LdkLSPS1ClientConfig, - pending_opening_params_requests: - Mutex>>, - pending_create_order_requests: Mutex>>, - pending_check_order_status_requests: - Mutex>>, -} - #[derive(Debug, Clone)] -pub(crate) struct LSPS1ClientConfig { +pub(crate) struct LspConfig { pub node_id: PublicKey, pub address: SocketAddress, pub token: Option, } -struct LSPS2Client { - lsp_node_id: PublicKey, - lsp_address: SocketAddress, +pub(crate) struct LspNode { + node_id: PublicKey, + address: SocketAddress, token: Option, - ldk_client_config: LdkLSPS2ClientConfig, - pending_fee_requests: Mutex>>, - pending_buy_requests: Mutex>>, -} - -#[derive(Debug, Clone)] -pub(crate) struct LSPS2ClientConfig { - pub node_id: PublicKey, - pub address: SocketAddress, - pub token: Option, + // Protocol numbers discovered via LSPS0 (e.g., 1 = LSPS1, 2 = LSPS2, 5 = LSPS5). + supported_protocols: Mutex>>, } struct LSPS2Service { @@ -149,8 +131,7 @@ pub(crate) struct LiquiditySourceBuilder where L::Target: LdkLogger, { - lsps1_client: Option, - lsps2_client: Option, + lsp_nodes: Vec, lsps2_service: Option, wallet: Arc, channel_manager: Arc, @@ -171,12 +152,10 @@ where chain_source: Arc, tx_broadcaster: Arc, kv_store: Arc, config: Arc, logger: L, ) -> Self { - let lsps1_client = None; - let lsps2_client = None; + let lsp_nodes = Vec::new(); let lsps2_service = None; Self { - lsps1_client, - lsps2_client, + lsp_nodes, lsps2_service, wallet, channel_manager, @@ -189,40 +168,8 @@ where } } - pub(crate) fn lsps1_client( - &mut self, lsp_node_id: PublicKey, lsp_address: SocketAddress, token: Option, - ) -> &mut Self { - // TODO: allow to set max_channel_fees_msat - let ldk_client_config = LdkLSPS1ClientConfig { max_channel_fees_msat: None }; - let pending_opening_params_requests = Mutex::new(HashMap::new()); - let pending_create_order_requests = Mutex::new(HashMap::new()); - let pending_check_order_status_requests = Mutex::new(HashMap::new()); - self.lsps1_client = Some(LSPS1Client { - lsp_node_id, - lsp_address, - token, - ldk_client_config, - pending_opening_params_requests, - pending_create_order_requests, - pending_check_order_status_requests, - }); - self - } - - pub(crate) fn lsps2_client( - &mut self, lsp_node_id: PublicKey, lsp_address: SocketAddress, token: Option, - ) -> &mut Self { - let ldk_client_config = LdkLSPS2ClientConfig {}; - let pending_fee_requests = Mutex::new(HashMap::new()); - let pending_buy_requests = Mutex::new(HashMap::new()); - self.lsps2_client = Some(LSPS2Client { - lsp_node_id, - lsp_address, - token, - ldk_client_config, - pending_fee_requests, - pending_buy_requests, - }); + pub(crate) fn set_lsp_nodes(&mut self, lsp_nodes: Vec) -> &mut Self { + self.lsp_nodes = lsp_nodes; self } @@ -242,8 +189,14 @@ where LiquidityServiceConfig { lsps2_service_config, lsps5_service_config, advertise_service } }); - let lsps1_client_config = self.lsps1_client.as_ref().map(|s| s.ldk_client_config.clone()); - let lsps2_client_config = self.lsps2_client.as_ref().map(|s| s.ldk_client_config.clone()); + let has_lsp_nodes = !self.lsp_nodes.is_empty(); + let lsps1_client_config = if has_lsp_nodes { + // TODO: allow to set max_channel_fees_msat + Some(LdkLSPS1ClientConfig { max_channel_fees_msat: None }) + } else { + None + }; + let lsps2_client_config = if has_lsp_nodes { Some(LdkLSPS2ClientConfig {}) } else { None }; let lsps5_client_config = None; let liquidity_client_config = Some(LiquidityClientConfig { lsps1_client_config, @@ -268,8 +221,23 @@ where ); Ok(LiquiditySource { - lsps1_client: self.lsps1_client, - lsps2_client: self.lsps2_client, + lsp_nodes: self + .lsp_nodes + .into_iter() + .map(|cfg| LspNode { + node_id: cfg.node_id, + address: cfg.address, + token: cfg.token, + supported_protocols: Mutex::new(None), + }) + .collect(), + pending_lsps1_opening_params_requests: Mutex::new(HashMap::new()), + pending_lsps1_create_order_requests: Mutex::new(HashMap::new()), + pending_lsps1_check_order_status_requests: Mutex::new(HashMap::new()), + pending_lsps2_fee_requests: Mutex::new(HashMap::new()), + pending_lsps2_buy_requests: Mutex::new(HashMap::new()), + pending_lsps0_discovery: Mutex::new(HashMap::new()), + lsps1_order_lsp_map: Mutex::new(HashMap::new()), lsps2_service: self.lsps2_service, wallet: self.wallet, channel_manager: self.channel_manager, @@ -286,8 +254,17 @@ pub(crate) struct LiquiditySource where L::Target: LdkLogger, { - lsps1_client: Option, - lsps2_client: Option, + lsp_nodes: Vec, + pending_lsps1_opening_params_requests: + Mutex>>, + pending_lsps1_create_order_requests: + Mutex>>, + pending_lsps1_check_order_status_requests: + Mutex>>, + pending_lsps2_fee_requests: Mutex>>, + pending_lsps2_buy_requests: Mutex>>, + pending_lsps0_discovery: Mutex>>>, + lsps1_order_lsp_map: Mutex>, lsps2_service: Option, wallet: Arc, channel_manager: Arc, @@ -310,12 +287,8 @@ where Arc::clone(&self.liquidity_manager) } - pub(crate) fn get_lsps1_lsp_details(&self) -> Option<(PublicKey, SocketAddress)> { - self.lsps1_client.as_ref().map(|s| (s.lsp_node_id, s.lsp_address.clone())) - } - pub(crate) fn get_lsps2_lsp_details(&self) -> Option<(PublicKey, SocketAddress)> { - self.lsps2_client.as_ref().map(|s| (s.lsp_node_id, s.lsp_address.clone())) + self.select_lsps_for_protocol(2, None).map(|s| (s.node_id, s.address.clone())) } pub(crate) fn lsps2_channel_needs_manual_broadcast( @@ -392,21 +365,9 @@ where counterparty_node_id, supported_options, }) => { - if let Some(lsps1_client) = self.lsps1_client.as_ref() { - if counterparty_node_id != lsps1_client.lsp_node_id { - debug_assert!( - false, - "Received response from unexpected LSP counterparty. This should never happen." - ); - log_error!( - self.logger, - "Received response from unexpected LSP counterparty. This should never happen." - ); - return; - } - - if let Some(sender) = lsps1_client - .pending_opening_params_requests + if self.is_lsps_node(&counterparty_node_id) { + if let Some(sender) = self + .pending_lsps1_opening_params_requests .lock() .unwrap() .remove(&request_id) @@ -448,24 +409,9 @@ where payment, channel, }) => { - if let Some(lsps1_client) = self.lsps1_client.as_ref() { - if counterparty_node_id != lsps1_client.lsp_node_id { - debug_assert!( - false, - "Received response from unexpected LSP counterparty. This should never happen." - ); - log_error!( - self.logger, - "Received response from unexpected LSP counterparty. This should never happen." - ); - return; - } - - if let Some(sender) = lsps1_client - .pending_create_order_requests - .lock() - .unwrap() - .remove(&request_id) + if self.is_lsps_node(&counterparty_node_id) { + if let Some(sender) = + self.pending_lsps1_create_order_requests.lock().unwrap().remove(&request_id) { let response = LSPS1OrderStatus { order_id, @@ -506,21 +452,9 @@ where payment, channel, }) => { - if let Some(lsps1_client) = self.lsps1_client.as_ref() { - if counterparty_node_id != lsps1_client.lsp_node_id { - debug_assert!( - false, - "Received response from unexpected LSP counterparty. This should never happen." - ); - log_error!( - self.logger, - "Received response from unexpected LSP counterparty. This should never happen." - ); - return; - } - - if let Some(sender) = lsps1_client - .pending_check_order_status_requests + if self.is_lsps_node(&counterparty_node_id) { + if let Some(sender) = self + .pending_lsps1_check_order_status_requests .lock() .unwrap() .remove(&request_id) @@ -813,21 +747,9 @@ where counterparty_node_id, opening_fee_params_menu, }) => { - if let Some(lsps2_client) = self.lsps2_client.as_ref() { - if counterparty_node_id != lsps2_client.lsp_node_id { - debug_assert!( - false, - "Received response from unexpected LSP counterparty. This should never happen." - ); - log_error!( - self.logger, - "Received response from unexpected LSP counterparty. This should never happen." - ); - return; - } - + if self.is_lsps_node(&counterparty_node_id) { if let Some(sender) = - lsps2_client.pending_fee_requests.lock().unwrap().remove(&request_id) + self.pending_lsps2_fee_requests.lock().unwrap().remove(&request_id) { let response = LSPS2FeeResponse { opening_fee_params_menu }; @@ -865,31 +787,54 @@ where cltv_expiry_delta, .. }) => { - if let Some(lsps2_client) = self.lsps2_client.as_ref() { - if counterparty_node_id != lsps2_client.lsp_node_id { + if self.is_lsps_node(&counterparty_node_id) { + if let Some(sender) = + self.pending_lsps2_buy_requests.lock().unwrap().remove(&request_id) + { + let response = LSPS2BuyResponse { intercept_scid, cltv_expiry_delta }; + + match sender.send(response) { + Ok(()) => (), + Err(_) => { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + }, + } + } else { debug_assert!( false, - "Received response from unexpected LSP counterparty. This should never happen." + "Received response from liquidity service for unknown request." ); log_error!( self.logger, - "Received response from unexpected LSP counterparty. This should never happen." + "Received response from liquidity service for unknown request." ); - return; } - + } else { + log_error!( + self.logger, + "Received unexpected LSPS2Client::InvoiceParametersReady event!" + ); + } + }, + LiquidityEvent::LSPS0Client(LSPS0ClientEvent::ListProtocolsResponse { + counterparty_node_id, + protocols, + }) => { + if self.is_lsps_node(&counterparty_node_id) { if let Some(sender) = - lsps2_client.pending_buy_requests.lock().unwrap().remove(&request_id) + self.pending_lsps0_discovery.lock().unwrap().remove(&counterparty_node_id) { - let response = LSPS2BuyResponse { intercept_scid, cltv_expiry_delta }; - - match sender.send(response) { + match sender.send(protocols) { Ok(()) => (), Err(_) => { log_error!( self.logger, "Failed to handle response for request {:?} from liquidity service", - request_id + counterparty_node_id ); }, } @@ -906,7 +851,8 @@ where } else { log_error!( self.logger, - "Received unexpected LSPS2Client::InvoiceParametersReady event!" + "Received LSPS0 ListProtocolsResponse from unexpected counterparty {}.", + counterparty_node_id ); } }, @@ -917,9 +863,11 @@ where } pub(crate) async fn lsps1_request_opening_params( - &self, + &self, node_id: &PublicKey, ) -> Result { - let lsps1_client = self.lsps1_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + let lsps1_node = self + .select_lsps_for_protocol(1, Some(node_id)) + .ok_or(Error::LiquiditySourceUnavailable)?; let client_handler = self.liquidity_manager.lsps1_client_handler().ok_or_else(|| { log_error!(self.logger, "LSPS1 liquidity client was not configured.",); @@ -929,8 +877,8 @@ where let (request_sender, request_receiver) = oneshot::channel(); { let mut pending_opening_params_requests_lock = - lsps1_client.pending_opening_params_requests.lock().unwrap(); - let request_id = client_handler.request_supported_options(lsps1_client.lsp_node_id); + self.pending_lsps1_opening_params_requests.lock().unwrap(); + let request_id = client_handler.request_supported_options(lsps1_node.node_id); pending_opening_params_requests_lock.insert(request_id, request_sender); } @@ -948,15 +896,18 @@ where pub(crate) async fn lsps1_request_channel( &self, lsp_balance_sat: u64, client_balance_sat: u64, channel_expiry_blocks: u32, - announce_channel: bool, refund_address: bitcoin::Address, + announce_channel: bool, refund_address: bitcoin::Address, node_id: &PublicKey, ) -> Result { - let lsps1_client = self.lsps1_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + let lsps1_node = self + .select_lsps_for_protocol(1, Some(node_id)) + .ok_or(Error::LiquiditySourceUnavailable)?; + let client_handler = self.liquidity_manager.lsps1_client_handler().ok_or_else(|| { log_error!(self.logger, "LSPS1 liquidity client was not configured.",); Error::LiquiditySourceUnavailable })?; - let lsp_limits = self.lsps1_request_opening_params().await?.supported_options; + let lsp_limits = self.lsps1_request_opening_params(node_id).await?.supported_options; let channel_size_sat = lsp_balance_sat + client_balance_sat; if channel_size_sat < lsp_limits.min_channel_balance_sat @@ -1004,7 +955,7 @@ where required_channel_confirmations: lsp_limits.min_required_channel_confirmations, funding_confirms_within_blocks: lsp_limits.min_funding_confirms_within_blocks, channel_expiry_blocks, - token: lsps1_client.token.clone(), + token: lsps1_node.token.clone(), announce_channel, }; @@ -1012,9 +963,9 @@ where let request_id; { let mut pending_create_order_requests_lock = - lsps1_client.pending_create_order_requests.lock().unwrap(); + self.pending_lsps1_create_order_requests.lock().unwrap(); request_id = client_handler.create_order( - &lsps1_client.lsp_node_id, + &lsps1_node.node_id, order_params.clone(), Some(refund_address), ); @@ -1043,13 +994,25 @@ where return Err(Error::LiquidityRequestFailed); } + self.lsps1_order_lsp_map + .lock() + .unwrap() + .insert(response.order_id.clone(), lsps1_node.node_id); + Ok(response) } pub(crate) async fn lsps1_check_order_status( &self, order_id: LSPS1OrderId, ) -> Result { - let lsps1_client = self.lsps1_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + let lsp_node_id = { + let lock = self.lsps1_order_lsp_map.lock().unwrap(); + *lock.get(&order_id).ok_or_else(|| { + log_error!(self.logger, "No LSP node ID found for LSPS1 order ID {:?}.", order_id); + Error::LiquiditySourceUnavailable + })? + }; + let client_handler = self.liquidity_manager.lsps1_client_handler().ok_or_else(|| { log_error!(self.logger, "LSPS1 liquidity client was not configured.",); Error::LiquiditySourceUnavailable @@ -1058,8 +1021,8 @@ where let (request_sender, request_receiver) = oneshot::channel(); { let mut pending_check_order_status_requests_lock = - lsps1_client.pending_check_order_status_requests.lock().unwrap(); - let request_id = client_handler.check_order_status(&lsps1_client.lsp_node_id, order_id); + self.pending_lsps1_check_order_status_requests.lock().unwrap(); + let request_id = client_handler.check_order_status(&lsp_node_id, order_id); pending_check_order_status_requests_lock.insert(request_id, request_sender); } @@ -1084,17 +1047,40 @@ where &self, amount_msat: u64, description: &Bolt11InvoiceDescription, expiry_secs: u32, max_total_lsp_fee_limit_msat: Option, payment_hash: Option, ) -> Result<(Bolt11Invoice, u64), Error> { - let fee_response = self.lsps2_request_opening_fee_params().await?; + let lsps2_nodes = self.select_all_lsps_for_protocol(2); + if lsps2_nodes.is_empty() { + log_error!(self.logger, "No LSPs available for LSPS2 protocol.",); + return Err(Error::LiquiditySourceUnavailable); + } + + let mut all_offers = Vec::new(); + for lsp_node in &lsps2_nodes { + match self.lsps2_request_opening_fee_params(Some(&lsp_node.node_id)).await { + Ok(fee_response) => all_offers.push((lsp_node.node_id, fee_response)), + Err(e) => { + log_warn!( + self.logger, + "Failed to get fees from LSP {}: {}", + lsp_node.node_id, + e + ); + continue; + }, + } + } - let (min_total_fee_msat, min_opening_params) = fee_response - .opening_fee_params_menu + let (cheapest_lsp_node_id, min_total_fee_msat, min_opening_params) = all_offers .into_iter() - .filter_map(|params| { + .flat_map(|(node_id, resp)| { + resp.opening_fee_params_menu.into_iter().map(move |params| (node_id, params)) + }) + .filter_map(|(node_id, params)| { if amount_msat < params.min_payment_size_msat || amount_msat > params.max_payment_size_msat { log_debug!(self.logger, - "Skipping LSP-offered JIT parameters as the payment of {}msat doesn't meet LSP limits (min: {}msat, max: {}msat)", + "Skipping LSP {}'s JIT offer as the payment of {}msat doesn't meet LSP limits (min: {}msat, max: {}msat)", + node_id, amount_msat, params.min_payment_size_msat, params.max_payment_size_msat @@ -1102,10 +1088,10 @@ where None } else { compute_opening_fee(amount_msat, params.min_fee_msat, params.proportional as u64) - .map(|fee| (fee, params)) + .map(|fee| (node_id, fee, params)) } }) - .min_by_key(|p| p.0) + .min_by_key(|(_, fee, _)| *fee) .ok_or_else(|| { log_error!(self.logger, "Failed to handle response from liquidity service",); Error::LiquidityRequestFailed @@ -1127,14 +1113,20 @@ where min_total_fee_msat ); - let buy_response = - self.lsps2_send_buy_request(Some(amount_msat), min_opening_params).await?; + let buy_response = self + .lsps2_send_buy_request( + Some(amount_msat), + min_opening_params, + Some(&cheapest_lsp_node_id), + ) + .await?; let invoice = self.lsps2_create_jit_invoice( buy_response, Some(amount_msat), description, expiry_secs, payment_hash, + Some(&cheapest_lsp_node_id), )?; log_info!(self.logger, "JIT-channel invoice created: {}", invoice); @@ -1145,13 +1137,35 @@ where &self, description: &Bolt11InvoiceDescription, expiry_secs: u32, max_proportional_lsp_fee_limit_ppm_msat: Option, payment_hash: Option, ) -> Result<(Bolt11Invoice, u64), Error> { - let fee_response = self.lsps2_request_opening_fee_params().await?; + let lsps2_nodes = self.select_all_lsps_for_protocol(2); + if lsps2_nodes.is_empty() { + log_error!(self.logger, "No LSPs available for LSPS2 protocol.",); + return Err(Error::LiquiditySourceUnavailable); + } + + let mut all_offers = Vec::new(); + for lsp_node in &lsps2_nodes { + match self.lsps2_request_opening_fee_params(Some(&lsp_node.node_id)).await { + Ok(fee_response) => all_offers.push((lsp_node.node_id, fee_response)), + Err(e) => { + log_warn!( + self.logger, + "Failed to get fees from LSP {}: {}", + lsp_node.node_id, + e + ); + continue; + }, + } + } - let (min_prop_fee_ppm_msat, min_opening_params) = fee_response - .opening_fee_params_menu + let (cheapest_lsp_node_id, min_prop_fee_ppm_msat, min_opening_params) = all_offers .into_iter() - .map(|params| (params.proportional as u64, params)) - .min_by_key(|p| p.0) + .flat_map(|(node_id, resp)| { + resp.opening_fee_params_menu.into_iter().map(move |params| (node_id, params)) + }) + .map(|(node_id, params)| (node_id, params.proportional as u64, params)) + .min_by_key(|(_, ppm, _)| *ppm) .ok_or_else(|| { log_error!(self.logger, "Failed to handle response from liquidity service",); Error::LiquidityRequestFailed @@ -1176,21 +1190,27 @@ where min_prop_fee_ppm_msat ); - let buy_response = self.lsps2_send_buy_request(None, min_opening_params).await?; + let buy_response = self + .lsps2_send_buy_request(None, min_opening_params, Some(&cheapest_lsp_node_id)) + .await?; let invoice = self.lsps2_create_jit_invoice( buy_response, None, description, expiry_secs, payment_hash, + Some(&cheapest_lsp_node_id), )?; log_info!(self.logger, "JIT-channel invoice created: {}", invoice); Ok((invoice, min_prop_fee_ppm_msat)) } - async fn lsps2_request_opening_fee_params(&self) -> Result { - let lsps2_client = self.lsps2_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + async fn lsps2_request_opening_fee_params( + &self, node_id: Option<&PublicKey>, + ) -> Result { + let lsps2_node = + self.select_lsps_for_protocol(2, node_id).ok_or(Error::LiquiditySourceUnavailable)?; let client_handler = self.liquidity_manager.lsps2_client_handler().ok_or_else(|| { log_error!(self.logger, "Liquidity client was not configured.",); @@ -1199,9 +1219,9 @@ where let (fee_request_sender, fee_request_receiver) = oneshot::channel(); { - let mut pending_fee_requests_lock = lsps2_client.pending_fee_requests.lock().unwrap(); - let request_id = client_handler - .request_opening_params(lsps2_client.lsp_node_id, lsps2_client.token.clone()); + let mut pending_fee_requests_lock = self.pending_lsps2_fee_requests.lock().unwrap(); + let request_id = + client_handler.request_opening_params(lsps2_node.node_id, lsps2_node.token.clone()); pending_fee_requests_lock.insert(request_id, fee_request_sender); } @@ -1222,8 +1242,10 @@ where async fn lsps2_send_buy_request( &self, amount_msat: Option, opening_fee_params: LSPS2OpeningFeeParams, + node_id: Option<&PublicKey>, ) -> Result { - let lsps2_client = self.lsps2_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + let lsps2_node = + self.select_lsps_for_protocol(2, node_id).ok_or(Error::LiquiditySourceUnavailable)?; let client_handler = self.liquidity_manager.lsps2_client_handler().ok_or_else(|| { log_error!(self.logger, "Liquidity client was not configured.",); @@ -1232,9 +1254,9 @@ where let (buy_request_sender, buy_request_receiver) = oneshot::channel(); { - let mut pending_buy_requests_lock = lsps2_client.pending_buy_requests.lock().unwrap(); + let mut pending_buy_requests_lock = self.pending_lsps2_buy_requests.lock().unwrap(); let request_id = client_handler - .select_opening_params(lsps2_client.lsp_node_id, amount_msat, opening_fee_params) + .select_opening_params(lsps2_node.node_id, amount_msat, opening_fee_params) .map_err(|e| { log_error!( self.logger, @@ -1266,9 +1288,10 @@ where fn lsps2_create_jit_invoice( &self, buy_response: LSPS2BuyResponse, amount_msat: Option, description: &Bolt11InvoiceDescription, expiry_secs: u32, - payment_hash: Option, + payment_hash: Option, node_id: Option<&PublicKey>, ) -> Result { - let lsps2_client = self.lsps2_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + let lsps2_node = + self.select_lsps_for_protocol(2, node_id).ok_or(Error::LiquiditySourceUnavailable)?; // LSPS2 requires min_final_cltv_expiry_delta to be at least 2 more than usual. let min_final_cltv_expiry_delta = MIN_FINAL_CLTV_EXPIRY_DELTA + 2; @@ -1298,7 +1321,7 @@ where }; let route_hint = RouteHint(vec![RouteHintHop { - src_node_id: lsps2_client.lsp_node_id, + src_node_id: lsps2_node.node_id, short_channel_id: buy_response.intercept_scid, fees: RoutingFees { base_msat: 0, proportional_millionths: 0 }, cltv_expiry_delta: buy_response.cltv_expiry_delta as u16, @@ -1405,6 +1428,107 @@ where } } } + + pub(crate) fn select_all_lsps_for_protocol(&self, protocol: u16) -> Vec<&LspNode> { + self.lsp_nodes + .iter() + .filter(|lsp_node| { + let protocols_lock = lsp_node.supported_protocols.lock().unwrap(); + protocols_lock + .as_ref() + .map(|protocols| protocols.contains(&protocol)) + .unwrap_or(false) + }) + .collect() + } + + pub(crate) fn select_lsps_for_protocol( + &self, protocol: u16, override_node_id: Option<&PublicKey>, + ) -> Option<&LspNode> { + self.lsp_nodes.iter().find(|lsp_node| { + if let Some(override_node_id) = override_node_id { + lsp_node.node_id == *override_node_id + } else { + let protocols_lock = lsp_node.supported_protocols.lock().unwrap(); + protocols_lock + .as_ref() + .map(|protocols| protocols.contains(&protocol)) + .unwrap_or(false) + } + }) + } + + pub(crate) fn is_lsps_node(&self, node_id: &PublicKey) -> bool { + self.lsp_nodes.iter().any(|lsp_node| lsp_node.node_id == *node_id) + } + + pub(crate) fn get_all_lsp_details(&self) -> Vec<(PublicKey, SocketAddress)> { + self.lsp_nodes.iter().map(|n| (n.node_id, n.address.clone())).collect() + } + + pub(crate) async fn discover_lsp_protocols( + &self, node_id: &PublicKey, + ) -> Result, Error> { + let lsps0_handler = self.liquidity_manager.lsps0_client_handler(); + + let (sender, receiver) = oneshot::channel(); + { + let mut pending_discovery = self.pending_lsps0_discovery.lock().unwrap(); + lsps0_handler.list_protocols(node_id); + pending_discovery.insert(*node_id, sender); + } + + let protocols = + tokio::time::timeout(Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), receiver) + .await + .map_err(|e| { + log_error!( + self.logger, + "LSPS0 discovery request timed out for {}: {}", + node_id, + e + ); + Error::LiquidityRequestFailed + })? + .map_err(|e| { + log_error!( + self.logger, + "Failed to handle LSPS0 discovery response from {}: {}", + node_id, + e + ); + Error::LiquidityRequestFailed + })?; + + if let Some(lsp_node) = self.lsp_nodes.iter().find(|n| &n.node_id == node_id) { + *lsp_node.supported_protocols.lock().unwrap() = Some(protocols.clone()); + } + + Ok(protocols) + } + + pub(crate) async fn discover_all_lsp_protocols(&self) { + for lsp_node in &self.lsp_nodes { + match self.discover_lsp_protocols(&lsp_node.node_id).await { + Ok(protocols) => { + log_info!( + self.logger, + "Discovered protocols for LSP {}: {:?}", + lsp_node.node_id, + protocols + ); + }, + Err(e) => { + log_error!( + self.logger, + "Failed to discover protocols for LSP {}: {:?}", + lsp_node.node_id, + e + ); + }, + } + } + } } #[derive(Debug, Clone)] @@ -1482,11 +1606,12 @@ impl LSPS1Liquidity { let liquidity_source = self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; - let (lsp_node_id, lsp_address) = - liquidity_source.get_lsps1_lsp_details().ok_or(Error::LiquiditySourceUnavailable)?; + let lsps1_node = liquidity_source + .select_lsps_for_protocol(1, None) + .ok_or(Error::LiquiditySourceUnavailable)?; - let con_node_id = lsp_node_id; - let con_addr = lsp_address.clone(); + let con_node_id = lsps1_node.node_id; + let con_addr = lsps1_node.address.clone(); let con_cm = Arc::clone(&self.connection_manager); // We need to use our main runtime here as a local runtime might not be around to poll @@ -1495,7 +1620,7 @@ impl LSPS1Liquidity { con_cm.connect_peer_if_necessary(con_node_id, con_addr).await })?; - log_info!(self.logger, "Connected to LSP {}@{}. ", lsp_node_id, lsp_address); + log_info!(self.logger, "Connected to LSP {}@{}. ", lsps1_node.node_id, lsps1_node.address); let refund_address = self.wallet.get_new_address()?; @@ -1508,6 +1633,53 @@ impl LSPS1Liquidity { channel_expiry_blocks, announce_channel, refund_address, + &con_node_id, + ) + .await + })?; + + Ok(response) + } + + /// Connects to the specified configured LSP and places an order for an inbound channel. + /// + /// The channel will be opened after one of the returned payment options has successfully been + /// paid. + pub fn request_channel_from_lsp( + &self, lsp_balance_sat: u64, client_balance_sat: u64, channel_expiry_blocks: u32, + announce_channel: bool, node_id: PublicKey, + ) -> Result { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + + let lsps1_node = liquidity_source + .select_lsps_for_protocol(1, Some(&node_id)) + .ok_or(Error::LiquiditySourceUnavailable)?; + + let con_node_id = lsps1_node.node_id; + let con_addr = lsps1_node.address.clone(); + let con_cm = Arc::clone(&self.connection_manager); + + // We need to use our main runtime here as a local runtime might not be around to poll + // connection futures going forward. + self.runtime.block_on(async move { + con_cm.connect_peer_if_necessary(con_node_id, con_addr).await + })?; + + log_info!(self.logger, "Connected to LSP {}@{}. ", lsps1_node.node_id, lsps1_node.address); + + let refund_address = self.wallet.get_new_address()?; + + let liquidity_source = Arc::clone(&liquidity_source); + let response = self.runtime.block_on(async move { + liquidity_source + .lsps1_request_channel( + lsp_balance_sat, + client_balance_sat, + channel_expiry_blocks, + announce_channel, + refund_address, + &con_node_id, ) .await })?; @@ -1520,11 +1692,20 @@ impl LSPS1Liquidity { let liquidity_source = self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; - let (lsp_node_id, lsp_address) = - liquidity_source.get_lsps1_lsp_details().ok_or(Error::LiquiditySourceUnavailable)?; + let lsp_node_id = { + let lock = liquidity_source.lsps1_order_lsp_map.lock().unwrap(); + *lock.get(&order_id).ok_or_else(|| { + log_error!(self.logger, "No LSP node ID found for LSPS1 order ID {:?}.", order_id); + Error::LiquiditySourceUnavailable + })? + }; + + let lsps1_node = liquidity_source + .select_lsps_for_protocol(1, Some(&lsp_node_id)) + .ok_or(Error::LiquiditySourceUnavailable)?; - let con_node_id = lsp_node_id; - let con_addr = lsp_address.clone(); + let con_node_id = lsps1_node.node_id; + let con_addr = lsps1_node.address.clone(); let con_cm = Arc::clone(&self.connection_manager); // We need to use our main runtime here as a local runtime might not be around to poll diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 605dd0613..ec686aaa7 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -1743,7 +1743,7 @@ async fn do_lsps2_client_service_integration(client_trusts_lsp: bool) { let client_config = random_config(true); setup_builder!(client_builder, client_config.node_config); client_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - client_builder.set_liquidity_source_lsps2(service_node_id, service_addr, None); + client_builder.add_lsp(service_node_id, service_addr, None); let client_node = client_builder.build(client_config.node_entropy.into()).unwrap(); client_node.start().unwrap(); @@ -2060,7 +2060,7 @@ async fn lsps2_client_trusts_lsp() { let client_config = random_config(true); setup_builder!(client_builder, client_config.node_config); client_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - client_builder.set_liquidity_source_lsps2(service_node_id, service_addr.clone(), None); + client_builder.add_lsp(service_node_id, service_addr.clone(), None); let client_node = client_builder.build(client_config.node_entropy.into()).unwrap(); client_node.start().unwrap(); let client_node_id = client_node.node_id(); @@ -2235,7 +2235,7 @@ async fn lsps2_lsp_trusts_client_but_client_does_not_claim() { let client_config = random_config(true); setup_builder!(client_builder, client_config.node_config); client_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - client_builder.set_liquidity_source_lsps2(service_node_id, service_addr.clone(), None); + client_builder.add_lsp(service_node_id, service_addr.clone(), None); let client_node = client_builder.build(client_config.node_entropy.into()).unwrap(); client_node.start().unwrap();