diff --git a/implementations/rust/ockam/ockam_api/src/http/interceptor.rs b/implementations/rust/ockam/ockam_api/src/http/interceptor.rs new file mode 100644 index 00000000000..62d91ab2965 --- /dev/null +++ b/implementations/rust/ockam/ockam_api/src/http/interceptor.rs @@ -0,0 +1,304 @@ +use crate::http::state::{ClientRequestWriter, RequestState}; +use crate::nodes::models::services::{DeleteServiceRequest, StartServiceRequest}; +use crate::nodes::registry::HttpHeaderInterceptorInfo; +use crate::nodes::{NodeManager, NodeManagerWorker}; +use crate::DefaultAddress; +use httparse::Request; +use minicbor::{CborLen, Decode, Encode}; +use ockam_abac::{Action, PolicyAccessControl, Resource, ResourceType}; +use ockam_core::api::Response; +use ockam_core::errcode::{Kind, Origin}; +use ockam_core::{async_trait, Address, AllowAll, IncomingAccessControl, OutgoingAccessControl}; +use ockam_node::Context; +use ockam_transport_tcp::{ + read_portal_payload_length, Direction, PortalInletInterceptor, PortalInterceptor, + PortalInterceptorFactory, +}; +use std::io::Write; +use std::sync::{Arc, Mutex as SyncMutex}; + +struct HttpHeadersInterceptorFactory { + headers: Arc>, +} + +impl HttpHeadersInterceptorFactory { + pub async fn create( + context: &Context, + listener_address: Address, + headers: Vec<(String, String)>, + policy_access_control: Option, + ) -> ockam_core::Result<()> { + let flow_control_id = context + .flow_controls() + .get_flow_control_with_spawner(&Address::from_string( + DefaultAddress::SECURE_CHANNEL_LISTENER, + )) + .ok_or_else(|| { + ockam_core::Error::new( + Origin::Channel, + Kind::NotFound, + "Secure channel listener not found", + ) + })?; + + context + .flow_controls() + .add_consumer(listener_address.clone(), &flow_control_id); + + let incoming_access_control: Arc; + let outgoing_access_control: Arc; + if let Some(policy_access_control) = policy_access_control { + incoming_access_control = Arc::new(policy_access_control.create_incoming()); + outgoing_access_control = + Arc::new(policy_access_control.create_outgoing(context).await?); + } else { + incoming_access_control = Arc::new(AllowAll); + outgoing_access_control = Arc::new(AllowAll); + } + + PortalInletInterceptor::create( + context, + listener_address, + Arc::new(HttpHeadersInterceptorFactory { + headers: Arc::new(headers), + }), + incoming_access_control, + outgoing_access_control, + read_portal_payload_length(), + ) + .await + } +} + +impl PortalInterceptorFactory for HttpHeadersInterceptorFactory { + fn create(&self) -> Arc { + Arc::new(HttpHeadersInterceptor { + headers: self.headers.clone(), + state: SyncMutex::new(RequestState::ParsingHeader(None)), + }) + } +} + +struct HttpHeadersInterceptor { + headers: Arc>, + state: SyncMutex, +} + +#[async_trait] +impl PortalInterceptor for HttpHeadersInterceptor { + async fn intercept( + &self, + _context: &mut Context, + direction: Direction, + buffer: &[u8], + ) -> ockam_core::Result>> { + match direction { + Direction::FromOutletToInlet => Ok(Some(buffer.to_vec())), + Direction::FromInletToOutlet => { + let mut guard = self.state.lock().unwrap(); + Ok(Some(guard.process_http_buffer(buffer, self)?)) + } + } + } +} + +impl ClientRequestWriter for &HttpHeadersInterceptor { + fn write_headers(&self, request: &Request, buffer: &mut Vec) -> ockam_core::Result<()> { + write!( + buffer, + "{} {} HTTP/1.{}\r\n", + request.method.unwrap(), + request.path.unwrap(), + request.version.unwrap() + ) + .unwrap(); + + for (name, value) in self.headers.iter() { + write!(buffer, "{}: {}\r\n", name, value).unwrap(); + } + + for h in &*request.headers { + if !self + .headers + .iter() + .any(|(name, _)| name.eq_ignore_ascii_case(h.name)) + { + write!(buffer, "{}: ", h.name).unwrap(); + buffer.extend_from_slice(h.value); + buffer.extend_from_slice(b"\r\n"); + } + } + + buffer.extend_from_slice(b"\r\n"); + Ok(()) + } +} + +/// Request body to create a new HTTP rewrite headers interceptor +#[derive(Clone, Debug, Encode, Decode, CborLen)] +#[rustfmt::skip] +#[cbor(map)] +pub struct HttpHeadersInterceptorRequest { + #[n(0)] pub headers: Vec<(String, String)>, +} + +impl NodeManagerWorker { + pub async fn start_http_header_service( + &self, + context: &Context, + request: StartServiceRequest, + ) -> ockam_core::Result, Response> { + let result = self + .node_manager + .start_http_header_service( + context, + Address::from_string(request.address()), + request.request().headers.clone(), + ) + .await; + + match result { + Ok(_) => Ok(Response::ok().body(())), + Err(e) => Err(Response::internal_error_no_request(&e.to_string())), + } + } + + pub async fn delete_http_overwrite_header_service( + &self, + context: &Context, + request: DeleteServiceRequest, + ) -> ockam_core::Result, Response> { + let result = self + .node_manager + .delete_http_overwrite_header_service(context, Address::from_string(request.address())) + .await; + + match result { + Ok(_) => Ok(Response::ok().body(())), + Err(e) => Err(Response::internal_error_no_request(&e.to_string())), + } + } +} + +impl NodeManager { + pub async fn start_http_header_service( + &self, + context: &Context, + listener_address: Address, + headers: Vec<(String, String)>, + ) -> ockam_core::Result<()> { + let policy_access_control = self + .policy_access_control( + self.project_authority().clone(), + Resource::new(listener_address.to_string(), ResourceType::TcpInlet), + Action::HandleMessage, + None, + ) + .await?; + + HttpHeadersInterceptorFactory::create( + context, + listener_address.clone(), + headers, + Some(policy_access_control), + ) + .await?; + + self.registry + .http_headers_interceptors + .insert(listener_address, HttpHeaderInterceptorInfo {}) + .await; + + Ok(()) + } + + pub async fn delete_http_overwrite_header_service( + &self, + context: &Context, + listener_address: Address, + ) -> ockam_core::Result<()> { + context.stop_worker(listener_address.clone()).await?; + + self.registry + .http_headers_interceptors + .remove(&listener_address) + .await; + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::nodes::service::{NodeManagerCredentialRetrieverOptions, NodeManagerTrustOptions}; + use crate::test_utils::start_manager_for_tests; + use ockam_core::NeutralMessage; + use ockam_transport_tcp::PortalMessage; + + #[ockam::test] + async fn main(context: &mut Context) -> ockam::Result<()> { + let handler = start_manager_for_tests( + context, + None, + Some(NodeManagerTrustOptions::new( + NodeManagerCredentialRetrieverOptions::None, + NodeManagerCredentialRetrieverOptions::None, + None, + NodeManagerCredentialRetrieverOptions::None, + )), + ) + .await?; + + HttpHeadersInterceptorFactory::create( + context, + "http_interceptor".into(), + vec![("Host".to_string(), "ockam.io".to_string())], + None, + ) + .await?; + + let connection = handler + .node_manager + .make_connection( + context, + &format!( + "/service/http_interceptor/service/{}", + context.address_ref().address() + ) + .parse()?, + handler.node_manager.identifier(), + None, + None, + ) + .await?; + + let route = connection.route()?; + + context + .send(route.clone(), PortalMessage::Ping.to_neutral_message()?) + .await?; + + let _ = context.receive::().await?; + + context + .send( + route.clone(), + PortalMessage::Payload(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n", None) + .to_neutral_message()?, + ) + .await?; + + let message = context.receive::().await?; + let message = PortalMessage::decode(message.payload())?; + + if let PortalMessage::Payload(payload, _) = message { + let message = String::from_utf8(payload.to_vec()).unwrap(); + assert_eq!(message, "GET / HTTP/1.1\r\nHost: ockam.io\r\n\r\n"); + } else { + panic!("Decoded message is not a Payload"); + } + + Ok(()) + } +} diff --git a/implementations/rust/ockam/ockam_api/src/http/mod.rs b/implementations/rust/ockam/ockam_api/src/http/mod.rs new file mode 100644 index 00000000000..54f37be930e --- /dev/null +++ b/implementations/rust/ockam/ockam_api/src/http/mod.rs @@ -0,0 +1,2 @@ +pub(crate) mod interceptor; +pub mod state; diff --git a/implementations/rust/ockam/ockam_api/src/http/state.rs b/implementations/rust/ockam/ockam_api/src/http/state.rs new file mode 100644 index 00000000000..04c80493728 --- /dev/null +++ b/implementations/rust/ockam/ockam_api/src/http/state.rs @@ -0,0 +1,168 @@ +use httparse::{Header, Status}; +use ockam_core::errcode::{Kind, Origin}; +use std::convert::TryInto; +use tracing::error; + +#[derive(Debug, Clone, PartialEq)] +pub enum RequestState { + ParsingHeader(Option>), + ParsingChunkedHeader(Option>), + RemainingInChunk(usize), + RemainingBody(usize), +} + +pub(crate) trait ClientRequestWriter { + fn write_headers( + &self, + request: &httparse::Request, + buffer: &mut Vec, + ) -> ockam_core::Result<()>; +} + +impl RequestState { + /* Parse the incoming data, attaching an Authorization header token to it. + * data is received in chunks, and there is no warranty on what we get on each: + * incomplete requests, multiple requests, etc. + */ + pub(crate) fn process_http_buffer( + &mut self, + buf: &[u8], + request_writer: impl ClientRequestWriter, + ) -> ockam_core::Result> { + let mut acc = Vec::with_capacity(buf.len()); + let mut cursor = buf; + loop { + if cursor.is_empty() { + return Ok(acc); + } + match self { + RequestState::ParsingHeader(prev) => { + let (to_parse, prev_size): (&[u8], usize) = if let Some(b) = prev { + let prev_size = b.len(); + b.extend_from_slice(cursor); + (b, prev_size) + } else { + (cursor, 0usize) + }; + let mut headers = [httparse::EMPTY_HEADER; 64]; + let mut req = httparse::Request::new(&mut headers); + match req.parse(to_parse) { + Ok(Status::Partial) if prev_size == 0 => { + // No previous buffered, need to copy and own the unparsed data + *self = RequestState::ParsingHeader(Some(cursor.to_vec())); + return Ok(acc); + } + Ok(Status::Partial) => { + // There was a previous buffer, and we already added the newly data to it + return Ok(acc); + } + Ok(Status::Complete(body_offset)) => { + cursor = &cursor[body_offset - prev_size..]; + request_writer.write_headers(&req, &mut acc)?; + // interceptor::attach_auth_token_and_serialize_into(&req, &mut acc); + *self = body_state(req.headers)?; + } + Err(e) => { + error!("Error parsing header: {:?}", e); + return Err(ockam_core::Error::new( + Origin::Transport, + Kind::Invalid, + e, + )); + } + } + } + RequestState::RemainingBody(remaining) => { + if *remaining <= cursor.len() { + acc.extend_from_slice(&cursor[..*remaining]); + cursor = &cursor[*remaining..]; + *self = RequestState::ParsingHeader(None); + } else { + acc.extend_from_slice(cursor); + *remaining -= cursor.len(); + return Ok(acc); + } + } + RequestState::ParsingChunkedHeader(prev) => { + let (to_parse, prev_size): (&[u8], usize) = if let Some(b) = prev { + let prev_size = b.len(); + b.extend_from_slice(cursor); + (b, prev_size) + } else { + (cursor, 0usize) + }; + match httparse::parse_chunk_size(to_parse) { + Ok(Status::Complete((2, 0))) => { + // this is just a final \r\n. The spec said it should end in a 0-sized + // chunk.. but having seen this on the wild as well. + acc.extend_from_slice(&to_parse[..2]); + cursor = &cursor[2 - prev_size..]; + *self = RequestState::ParsingHeader(None); + } + Ok(Status::Complete((3, 0))) => { + // this is just a proper 0\r\n final chunk. + acc.extend_from_slice(&to_parse[..3]); + cursor = &cursor[3 - prev_size..]; + // There must be a final \r\n. And no more chunks, + // so just reuse the RemainingBody state for this + *self = RequestState::RemainingBody(2); + } + Ok(Status::Complete((pos, chunk_size))) => { + acc.extend_from_slice(&to_parse[..pos]); + cursor = &cursor[pos - prev_size..]; + let complete_size = chunk_size + 2; //chunks ends in \r\n + *self = + RequestState::RemainingInChunk(complete_size.try_into().unwrap()); + } + Ok(Status::Partial) if prev_size == 0 => { + // No previous buffered, need to copy and own the unparsed data + *self = RequestState::ParsingChunkedHeader(Some(cursor.to_vec())); + return Ok(acc); + } + Ok(Status::Partial) => { + // There was a previous buffer, and we already added the newly data to it + return Ok(acc); + } + Err(e) => { + error!("Error parsing chunk size: {:?}. Buffer: {:?}", e, prev); + return Err(ockam_core::Error::new( + Origin::Transport, + Kind::Invalid, + format!("Can't parse chunked body {:?}", e), + )); + } + } + } + RequestState::RemainingInChunk(size) => { + if cursor.len() >= *size { + acc.extend_from_slice(&cursor[..*size]); + cursor = &cursor[*size..]; + *self = RequestState::ParsingChunkedHeader(None); + } else { + acc.extend_from_slice(cursor); + *size -= cursor.len(); + return Ok(acc); + } + } + } + } + } +} + +fn body_state(headers: &[Header]) -> ockam_core::Result { + for h in headers { + if h.name.eq_ignore_ascii_case("Content-Length") { + if let Ok(str) = std::str::from_utf8(h.value) { + return str + .parse() + .map(RequestState::RemainingBody) + .map_err(|e| ockam_core::Error::new(Origin::Transport, Kind::Invalid, e)); + } + } else if h.name.eq_ignore_ascii_case("Transfer-Encoding") + && String::from_utf8(h.value.to_vec()).is_ok_and(|s| s.contains("chunked")) + { + return Ok(RequestState::ParsingChunkedHeader(None)); + } + } + Ok(RequestState::ParsingHeader(None)) +} diff --git a/implementations/rust/ockam/ockam_api/src/influxdb/gateway/interceptor.rs b/implementations/rust/ockam/ockam_api/src/influxdb/gateway/interceptor.rs index 8d46a86afde..e0b7eb8ab82 100644 --- a/implementations/rust/ockam/ockam_api/src/influxdb/gateway/interceptor.rs +++ b/implementations/rust/ockam/ockam_api/src/influxdb/gateway/interceptor.rs @@ -1,25 +1,14 @@ use std::io::Write; -use httparse::{Header, Status}; use ockam_core::async_trait; use ockam_node::Context; use ockam_transport_tcp::{Direction, PortalInterceptor, PortalInterceptorFactory}; use std::sync::Arc; use tokio::sync::Mutex; -use ockam::errcode::{Kind, Origin}; - -use tracing::{debug, error}; - use super::token_lease_refresher::TokenLeaseRefresher; - -#[derive(Debug, Clone, PartialEq)] -enum RequestState { - ParsingHeader(Option>), - ParsingChunkedHeader(Option>), - RemainingInChunk(usize), - RemainingBody(usize), -} +use crate::http::state::{ClientRequestWriter, RequestState}; +use tracing::{debug, error}; struct HttpAuthInterceptorState { state: RequestState, @@ -84,158 +73,6 @@ fn attach_auth_token_and_serialize_into( buffer.extend_from_slice(b"\r\n"); } -fn body_state(method: &str, headers: &[Header]) -> ockam_core::Result { - match method.to_uppercase().as_str() { - "POST" | "PUT" => { - for h in headers { - if h.name.eq_ignore_ascii_case("Content-Length") { - if let Ok(str) = std::str::from_utf8(h.value) { - return str.parse().map(RequestState::RemainingBody).map_err(|e| { - ockam_core::Error::new(Origin::Transport, Kind::Invalid, e) - }); - } - } else if h.name.eq_ignore_ascii_case("Transfer-Encoding") - && String::from_utf8(h.value.to_vec()).is_ok_and(|s| s.contains("chunked")) - { - return Ok(RequestState::ParsingChunkedHeader(None)); - } - } - // Not content-length, no chunked encoding, fail. - Err(ockam_core::Error::new( - Origin::Transport, - Kind::Invalid, - "No Content-Length nor chunked Transfer-Encoding", - )) - } - _ => Ok(RequestState::ParsingHeader(None)), - } -} - -impl RequestState { - /* Parse the incoming data, attaching an Authorization header token to it. - * data is received in chunks, and there is no warranty on what we get on each: - * incomplete requests, multiple requests, etc. - */ - fn process_http_buffer(&mut self, buf: &[u8], token: &str) -> ockam_core::Result> { - let mut acc = Vec::with_capacity(buf.len()); - let mut cursor = buf; - loop { - if cursor.is_empty() { - return Ok(acc); - } - match self { - RequestState::ParsingHeader(prev) => { - let (to_parse, prev_size): (&[u8], usize) = if let Some(b) = prev { - let prev_size = b.len(); - b.extend_from_slice(cursor); - (b, prev_size) - } else { - (cursor, 0usize) - }; - let mut headers = [httparse::EMPTY_HEADER; 64]; - let mut req = httparse::Request::new(&mut headers); - match req.parse(to_parse) { - Ok(httparse::Status::Partial) if prev_size == 0 => { - // No previous buffered, need to copy and own the unparsed data - *self = RequestState::ParsingHeader(Some(cursor.to_vec())); - return Ok(acc); - } - Ok(httparse::Status::Partial) => { - // There was a previous buffer, and we already added the newly data to it - return Ok(acc); - } - Ok(httparse::Status::Complete(body_offset)) => { - cursor = &cursor[body_offset - prev_size..]; - attach_auth_token_and_serialize_into(&req, token, &mut acc); - *self = body_state(req.method.unwrap(), req.headers)?; - } - Err(e) => { - error!("Error parsing header: {:?}", e); - return Err(ockam_core::Error::new( - Origin::Transport, - Kind::Invalid, - e, - )); - } - } - } - RequestState::RemainingBody(remaining) => { - if *remaining <= cursor.len() { - acc.extend_from_slice(&cursor[..*remaining]); - cursor = &cursor[*remaining..]; - *self = RequestState::ParsingHeader(None); - } else { - acc.extend_from_slice(cursor); - *remaining -= cursor.len(); - return Ok(acc); - } - } - RequestState::ParsingChunkedHeader(prev) => { - let (to_parse, prev_size): (&[u8], usize) = if let Some(b) = prev { - let prev_size = b.len(); - b.extend_from_slice(cursor); - (b, prev_size) - } else { - (cursor, 0usize) - }; - match httparse::parse_chunk_size(to_parse) { - Ok(Status::Complete((2, 0))) => { - // this is just a final \r\n. The spec said it should end in a 0-sized - // chunk.. but having seen this on the wild as well. - acc.extend_from_slice(&to_parse[..2]); - cursor = &cursor[2 - prev_size..]; - *self = RequestState::ParsingHeader(None); - } - Ok(Status::Complete((3, 0))) => { - // this is just a proper 0\r\n final chunk. - acc.extend_from_slice(&to_parse[..3]); - cursor = &cursor[3 - prev_size..]; - // There must be a final \r\n. And no more chunks, - // so just reuse the RemainingBody state for this - *self = RequestState::RemainingBody(2); - } - Ok(Status::Complete((pos, chunk_size))) => { - acc.extend_from_slice(&to_parse[..pos]); - cursor = &cursor[pos - prev_size..]; - let complete_size = chunk_size + 2; //chunks ends in \r\n - *self = - RequestState::RemainingInChunk(complete_size.try_into().unwrap()); - } - Ok(Status::Partial) if prev_size == 0 => { - // No previous buffered, need to copy and own the unparsed data - *self = RequestState::ParsingChunkedHeader(Some(cursor.to_vec())); - return Ok(acc); - } - Ok(Status::Partial) => { - // There was a previous buffer, and we already added the newly data to it - return Ok(acc); - } - Err(e) => { - error!("Error parsing chunk size: {:?}. Buffer: {:?}", e, prev); - return Err(ockam_core::Error::new( - Origin::Transport, - Kind::Invalid, - format!("Can't parse chunked body {:?}", e), - )); - } - } - } - RequestState::RemainingInChunk(size) => { - if cursor.len() >= *size { - acc.extend_from_slice(&cursor[..*size]); - cursor = &cursor[*size..]; - *self = RequestState::ParsingChunkedHeader(None); - } else { - acc.extend_from_slice(cursor); - *size -= cursor.len(); - return Ok(acc); - } - } - } - } - } -} - #[async_trait] impl PortalInterceptor for HttpAuthInterceptor { async fn intercept( @@ -245,26 +82,36 @@ impl PortalInterceptor for HttpAuthInterceptor { buffer: &[u8], ) -> ockam_core::Result>> { match direction { - Direction::FromOutletToInlet => ockam_core::Result::Ok(Some(buffer.to_vec())), - + Direction::FromOutletToInlet => Ok(Some(buffer.to_vec())), Direction::FromInletToOutlet => { let mut guard = self.state.lock().await; - let token = self.token_refresher.get_token().await; - if token.is_none() { - error!("No authorization token available"); - } - let out = guard - .state - .process_http_buffer(buffer, &token.unwrap_or_default())?; + let out = guard.state.process_http_buffer(buffer, self)?; Ok(Some(out)) } } } } +impl ClientRequestWriter for &HttpAuthInterceptor { + fn write_headers( + &self, + request: &httparse::Request, + buffer: &mut Vec, + ) -> ockam_core::Result<()> { + let token = self.token_refresher.get_token(); + if token.is_none() { + error!("No authorization token available"); + } + + attach_auth_token_and_serialize_into(request, &token.unwrap_or_default(), buffer); + Ok(()) + } +} + #[cfg(test)] mod tests { - use super::*; + use crate::http::state::{ClientRequestWriter, RequestState}; + use crate::influxdb::gateway::interceptor::attach_auth_token_and_serialize_into; const REQ: &str = "POST / HTTP/1.1\r\n\ Host: www.example.com\r\n\ @@ -283,6 +130,19 @@ Accept-Encoding: gzip, deflate, br\r\n\ Transfer-Encoding: gzip, chunked\r\n\r\n\ 4\r\nWiki\r\n7\r\npedia i\r\n0\r\n\r\n"; + struct RequestWriterSimulator; + + impl ClientRequestWriter for RequestWriterSimulator { + fn write_headers( + &self, + request: &httparse::Request, + buffer: &mut Vec, + ) -> ockam_core::Result<()> { + attach_auth_token_and_serialize_into(request, TOKEN, buffer); + Ok(()) + } + } + #[test] fn parse_post_with_chunked_transfers() { let mut data = Vec::new(); @@ -293,7 +153,9 @@ Transfer-Encoding: gzip, chunked\r\n\r\n\ let mut result = Vec::new(); let mut request_state = RequestState::ParsingHeader(None); for chunk in data.chunks(size) { - let data_out = request_state.process_http_buffer(chunk, TOKEN).unwrap(); + let data_out = request_state + .process_http_buffer(chunk, RequestWriterSimulator) + .unwrap(); result.extend_from_slice(&data_out); } assert_eq!( @@ -328,7 +190,9 @@ field1=value1&field2=value2", let mut result = Vec::new(); let mut request_state = RequestState::ParsingHeader(None); for chunk in data.chunks(size) { - let data_out = request_state.process_http_buffer(chunk, TOKEN).unwrap(); + let data_out = request_state + .process_http_buffer(chunk, RequestWriterSimulator) + .unwrap(); result.extend_from_slice(&data_out); } assert_eq!( @@ -356,7 +220,9 @@ field1=value1&field2=value2", let mut result = Vec::new(); let mut request_state = RequestState::ParsingHeader(None); for chunk in data.chunks(size) { - let data_out = request_state.process_http_buffer(chunk, TOKEN).unwrap(); + let data_out = request_state + .process_http_buffer(chunk, RequestWriterSimulator) + .unwrap(); result.extend_from_slice(&data_out); } assert_eq!(String::from_utf8(result).unwrap(), expected); diff --git a/implementations/rust/ockam/ockam_api/src/influxdb/gateway/token_lease_refresher.rs b/implementations/rust/ockam/ockam_api/src/influxdb/gateway/token_lease_refresher.rs index 92c54dbfe8b..1607b83fd8f 100644 --- a/implementations/rust/ockam/ockam_api/src/influxdb/gateway/token_lease_refresher.rs +++ b/implementations/rust/ockam/ockam_api/src/influxdb/gateway/token_lease_refresher.rs @@ -6,17 +6,16 @@ use ockam_core::{api::Error, AllowAll, DenyAll}; use ockam_multiaddr::MultiAddr; use ockam_node::Context; use std::cmp::max; -use std::sync::{Arc, Weak}; -use tokio::sync::RwLock; +use std::sync::{Arc, RwLock as SyncRwLock, Weak}; #[derive(Clone)] pub struct TokenLeaseRefresher { - token: Arc>>, + token: Arc>>, } impl TokenLeaseRefresher { pub fn new_with_fixed_token(token: String) -> TokenLeaseRefresher { - let token = Arc::new(RwLock::new(Some(token))); + let token = Arc::new(SyncRwLock::new(Some(token))); Self { token } } pub async fn new( @@ -24,7 +23,7 @@ impl TokenLeaseRefresher { node_manager: Weak, lease_issuer_route: MultiAddr, ) -> Result { - let token = Arc::new(RwLock::new(None)); + let token = Arc::new(SyncRwLock::new(None)); let mailboxes = Mailboxes::main( Address::random_tagged("LeaseRetriever"), Arc::new(DenyAll), @@ -49,13 +48,13 @@ impl TokenLeaseRefresher { Ok(Self { token }) } - pub async fn get_token(&self) -> Option { - self.token.read().await.clone() + pub fn get_token(&self) -> Option { + self.token.read().unwrap().clone() } } async fn refresh_loop( - token: Arc>>, + token: Arc>>, ctx: Context, node_manager: Weak, lease_issuer_route: MultiAddr, @@ -71,7 +70,7 @@ async fn refresh_loop( Ok(new_token) => { let duration = new_token.expires_at as u64 - now_t; debug!("Auth Token obtained expires at {}", new_token.expires_at); - let mut t = token.write().await; + let mut t = token.write().unwrap(); *t = Some(new_token.token); // We request a new token once reaching half its duration, with a minimum // of 5 seconds. diff --git a/implementations/rust/ockam/ockam_api/src/influxdb/portal.rs b/implementations/rust/ockam/ockam_api/src/influxdb/portal.rs index 6057c3bfc85..6f1293b68f3 100644 --- a/implementations/rust/ockam/ockam_api/src/influxdb/portal.rs +++ b/implementations/rust/ockam/ockam_api/src/influxdb/portal.rs @@ -122,6 +122,7 @@ impl NodeManagerWorker { disable_tcp_fallback, privileged, tls_certificate_provider, + .. } = body.tcp_inlet.clone(); //TODO: should be an easier way to tweak the multiaddr @@ -380,6 +381,7 @@ impl InfluxDBPortals for BackgroundNodeClient { disable_tcp_fallback, false, tls_certificate_provider, + route![], ); let payload = CreateInfluxDBInlet::new(inlet_payload, lease_usage, lease_issuer_route); Request::post("/node/influxdb_inlet").body(payload) diff --git a/implementations/rust/ockam/ockam_api/src/lib.rs b/implementations/rust/ockam/ockam_api/src/lib.rs index 55adead0306..90188778e46 100644 --- a/implementations/rust/ockam/ockam_api/src/lib.rs +++ b/implementations/rust/ockam/ockam_api/src/lib.rs @@ -44,6 +44,7 @@ pub mod logs; mod schema; mod date; +mod http; mod multiaddr_resolver; mod rendezvous_healthcheck; pub mod test_utils; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/models/portal.rs b/implementations/rust/ockam/ockam_api/src/nodes/models/portal.rs index 71d2f8d7880..e54092b290d 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/models/portal.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/models/portal.rs @@ -57,6 +57,8 @@ pub struct CreateInlet { #[n(12)] pub(crate) privileged: bool, /// TLS certificate provider route. #[n(13)] pub(crate) tls_certificate_provider: Option, + /// The prefix route to be used for interceptors. + #[n(14)] pub(crate) prefix_route: Route, } impl CreateInlet { @@ -83,6 +85,7 @@ impl CreateInlet { disable_tcp_fallback, privileged, tls_certificate_provider: None, + prefix_route: Default::default(), } } @@ -110,6 +113,7 @@ impl CreateInlet { disable_tcp_fallback, privileged, tls_certificate_provider: None, + prefix_route: Default::default(), } } @@ -117,6 +121,10 @@ impl CreateInlet { self.tls_certificate_provider = Some(provider); } + pub fn set_prefix_route(&mut self, route: Route) { + self.prefix_route = route; + } + pub fn set_wait_ms(&mut self, ms: u64) { self.wait_for_outlet_duration = Some(Duration::from_millis(ms)) } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/registry.rs b/implementations/rust/ockam/ockam_api/src/nodes/registry.rs index 85b4b702cf7..31dffefb87b 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/registry.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/registry.rs @@ -98,6 +98,9 @@ pub enum KafkaServiceKind { Outlet, } +#[derive(Clone)] +pub(crate) struct HttpHeaderInterceptorInfo {} + impl Display for KafkaServiceKind { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -182,6 +185,7 @@ pub(crate) struct Registry { pub(crate) echoer_services: RegistryOf, pub(crate) kafka_services: RegistryOf, pub(crate) hop_services: RegistryOf, + pub(crate) http_headers_interceptors: RegistryOf, pub(crate) relays: RegistryOf, pub(crate) inlets: RegistryOf, pub(crate) outlets: RegistryOf, diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service.rs b/implementations/rust/ockam/ockam_api/src/nodes/service.rs index 03fed806739..19c5b505b6d 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service.rs @@ -23,6 +23,7 @@ pub mod workers; mod certificate_provider; mod http; +mod interceptors; mod manager; mod trust; mod worker; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/default_address.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/default_address.rs index 833dcce2302..ca500089f73 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/default_address.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/default_address.rs @@ -10,6 +10,8 @@ impl DefaultAddress { pub const UPPERCASE_SERVICE: &'static str = "uppercase"; pub const ECHO_SERVICE: &'static str = "echo"; pub const HOP_SERVICE: &'static str = "hop"; + pub const HTTP_HEADERS_SERVICE: &'static str = "http_headers"; + pub const REMOTE_PROXY_VAULT: &'static str = "remote_proxy_vault"; pub const SECURE_CHANNEL_LISTENER: &'static str = "api"; pub const KEY_EXCHANGER_LISTENER: &'static str = "key_exchanger"; pub const UDP_PUNCTURE_NEGOTIATION_LISTENER: &'static str = "udp"; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/interceptors.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/interceptors.rs new file mode 100644 index 00000000000..693b361911b --- /dev/null +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/interceptors.rs @@ -0,0 +1,32 @@ +use crate::http::interceptor::HttpHeadersInterceptorRequest; +use crate::nodes::models::services::StartServiceRequest; +use crate::nodes::BackgroundNodeClient; +use crate::DefaultAddress; +use ockam_core::api::Request; +use ockam_core::Address; +use ockam_node::Context; + +impl BackgroundNodeClient { + pub async fn create_http_header_overwrite_service( + &self, + context: &Context, + address: &Address, + headers: Vec<(String, String)>, + ) -> miette::Result<()> { + let body = StartServiceRequest::new( + HttpHeadersInterceptorRequest { + headers: headers.clone(), + }, + address.clone(), + ); + self.ask( + context, + Request::post(format!( + "/node/services/{}", + DefaultAddress::HTTP_HEADERS_SERVICE + )) + .body(body), + ) + .await + } +} diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/node_services.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/node_services.rs index 6aa5895d041..4ff4dda165f 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/node_services.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/node_services.rs @@ -155,6 +155,17 @@ impl NodeManager { DefaultAddress::HOP_SERVICE, )) }); + self.registry + .http_headers_interceptors + .keys() + .await + .iter() + .for_each(|addr| { + list.push(ServiceStatus::new( + addr.address(), + DefaultAddress::HTTP_HEADERS_SERVICE, + )) + }); self.registry .kafka_services .entries() diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/background_node_client.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/background_node_client.rs index c217aeb3f04..dc98120d049 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/background_node_client.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/background_node_client.rs @@ -1,7 +1,7 @@ use ockam::identity::Identifier; use ockam_abac::PolicyExpression; use ockam_core::api::{Reply, Request}; -use ockam_core::async_trait; +use ockam_core::{async_trait, Route}; use ockam_multiaddr::proto::Project as ProjectProto; use ockam_multiaddr::{MultiAddr, Protocol}; use ockam_node::Context; @@ -26,6 +26,7 @@ pub fn create_inlet_payload( disable_tcp_fallback: bool, privileged: bool, tls_certificate_provider: &Option, + prefix_route: Route, ) -> CreateInlet { let via_project = outlet_addr.matches(0, &[ProjectProto::CODE.into()]); let mut payload = if via_project { @@ -59,6 +60,7 @@ pub fn create_inlet_payload( if let Some(tls_provider) = tls_certificate_provider { payload.set_tls_certificate_provider(tls_provider.clone()) } + payload.set_prefix_route(prefix_route); payload.set_wait_ms(wait_for_outlet_timeout.as_millis() as u64); payload } @@ -80,6 +82,7 @@ impl Inlets for BackgroundNodeClient { disable_tcp_fallback: bool, privileged: bool, tls_certificate_provider: &Option, + prefix_route: Route, ) -> miette::Result> { let request = { let payload = create_inlet_payload( @@ -95,6 +98,7 @@ impl Inlets for BackgroundNodeClient { disable_tcp_fallback, privileged, tls_certificate_provider, + prefix_route, ); Request::post("/node/inlet").body(payload) }; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/inlets_trait.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/inlets_trait.rs index 439160f1b71..680c7e42df3 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/inlets_trait.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/inlets_trait.rs @@ -1,7 +1,7 @@ use ockam::identity::Identifier; use ockam_abac::PolicyExpression; use ockam_core::api::Reply; -use ockam_core::async_trait; +use ockam_core::{async_trait, Route}; use ockam_multiaddr::MultiAddr; use ockam_node::Context; use ockam_transport_core::HostnamePort; @@ -27,6 +27,7 @@ pub trait Inlets { disable_tcp_fallback: bool, privileged: bool, tls_certificate_provider: &Option, + prefix_route: Route, ) -> miette::Result>; async fn show_inlet(&self, ctx: &Context, alias: &str) -> miette::Result>; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager_worker.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager_worker.rs index 5aa912a5e4c..69a7eb10bdd 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager_worker.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager_worker.rs @@ -30,13 +30,14 @@ impl NodeManagerWorker { disable_tcp_fallback, privileged, tls_certificate_provider, + prefix_route, } = create_inlet; match self .node_manager .create_inlet( ctx, listen_addr, - route![], + prefix_route, route![], outlet_addr, alias, diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/worker.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/worker.rs index 2580e6bcd63..74acb12b464 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/worker.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/worker.rs @@ -142,6 +142,17 @@ impl NodeManagerWorker { self.delete_kafka_service(ctx, dec.decode()?, KafkaServiceKind::Inlet) .await, )?, + (Post, ["node", "services", DefaultAddress::HTTP_HEADERS_SERVICE]) => encode_response( + req, + self.start_http_header_service(ctx, dec.decode()?).await, + )?, + (Delete, ["node", "services", DefaultAddress::HTTP_HEADERS_SERVICE]) => { + encode_response( + req, + self.delete_http_overwrite_header_service(ctx, dec.decode()?) + .await, + )? + } (Post, ["node", "services", DefaultAddress::LEASE_MANAGER]) => encode_response( req, self.start_influxdb_lease_issuer_service(ctx, dec.decode()?) diff --git a/implementations/rust/ockam/ockam_app_lib/src/incoming_services/commands.rs b/implementations/rust/ockam/ockam_app_lib/src/incoming_services/commands.rs index 14ef18e9e30..bc9bd202f73 100644 --- a/implementations/rust/ockam/ockam_app_lib/src/incoming_services/commands.rs +++ b/implementations/rust/ockam/ockam_app_lib/src/incoming_services/commands.rs @@ -2,6 +2,9 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use crate::background_node::BackgroundNodeClientTrait; +use crate::incoming_services::state::{IncomingService, Port}; +use crate::state::AppState; use miette::IntoDiagnostic; use ockam::abac::expr::{eq, ident, str}; use ockam::abac::PolicyExpression::FullExpression; @@ -14,13 +17,10 @@ use ockam_api::authenticator::direct::{ use ockam_api::nodes::service::tcp_inlets::Inlets; use ockam_api::ConnectionStatus; use ockam_core::api::Reply; +use ockam_core::route; use ockam_multiaddr::MultiAddr; use tracing::{debug, error, info, warn}; -use crate::background_node::BackgroundNodeClientTrait; -use crate::incoming_services::state::{IncomingService, Port}; -use crate::state::AppState; - impl AppState { pub(crate) async fn refresh_inlets(&self) -> crate::Result<()> { info!("Refreshing inlets"); @@ -212,6 +212,7 @@ impl AppState { false, false, &None, + route![], ) .await .map_err(|err| { diff --git a/implementations/rust/ockam/ockam_command/src/tcp/inlet/create.rs b/implementations/rust/ockam/ockam_command/src/tcp/inlet/create.rs index 2b25d03c751..5404ed06bab 100644 --- a/implementations/rust/ockam/ockam_command/src/tcp/inlet/create.rs +++ b/implementations/rust/ockam/ockam_command/src/tcp/inlet/create.rs @@ -3,6 +3,7 @@ use crate::shared_args::OptionalTimeoutArg; use crate::tcp::util::alias_parser; use crate::util::parsers::duration_parser; use crate::util::parsers::hostname_parser; +use crate::util::parsers::http_header_parser; use crate::util::{ port_is_free_guard, print_warning_for_deprecated_flag_replaced, process_nodes_multiaddr, }; @@ -28,9 +29,11 @@ use ockam_api::nodes::service::tcp_inlets::Inlets; use ockam_api::nodes::BackgroundNodeClient; use ockam_api::{fmt_info, fmt_log, fmt_ok, fmt_warn, ConnectionStatus}; use ockam_core::api::{Reply, Status}; +use ockam_core::{route, Address}; use ockam_multiaddr::proto; use ockam_multiaddr::{MultiAddr, Protocol as _}; use ockam_node::compat::asynchronous::resolve_peer; + use std::collections::HashMap; use std::str::FromStr; use std::time::Duration; @@ -154,6 +157,12 @@ pub struct CreateCommand { /// Requires `ockam-tls-certificate` credential attribute. #[arg(long, value_name = "ROUTE", hide = true)] pub tls_certificate_provider: Option, + + #[arg(long, value_name = "HTTP_HEADER", value_parser = http_header_parser)] + /// Set the provided HTTP headers in the client request. Existing headers with the same name + /// will be discarded. This option assumes the protocol is HTTP/1.0 or HTTP/1.1. + /// It expects a key-value pair in the format `key:value`. It can be specified multiple times. + pub http_header: Vec<(String, String)>, } pub(crate) fn tcp_inlet_default_from_addr() -> SchemeHostnamePort { @@ -177,6 +186,43 @@ impl Command for CreateCommand { let inlet_status = { let pb = opts.terminal.spinner(); + + let prefix_route = if !cmd.http_header.is_empty() { + let overwrite_http_header_address = Address::random_tagged("http_interceptor"); + + if let Some(pb) = pb.as_ref() { + pb.set_message(format!( + "Creating HTTP Interceptor Service at {}...\n", + color_primary(&overwrite_http_header_address) + )); + } + + let result = node + .create_http_header_overwrite_service( + ctx, + &overwrite_http_header_address, + cmd.http_header.clone(), + ) + .await; + + match result { + Ok(_) => { + if let Some(pb) = pb.as_ref() { + let created_message = format!( + "Created a new Interceptor Service bound to {}\n", + color_primary(overwrite_http_header_address.to_string()), + ); + pb.set_message(fmt_ok!("{}", created_message)); + } + } + Err(_) => Err(miette!("Failed to create interceptor"))?, + } + + route![overwrite_http_header_address] + } else { + route![] + }; + if let Some(pb) = pb.as_ref() { pb.set_message(format!( "Creating TCP Inlet at {}...\n", @@ -200,6 +246,7 @@ impl Command for CreateCommand { cmd.no_tcp_fallback, cmd.privileged, &cmd.tls_certificate_provider, + prefix_route.clone(), ) .await?; diff --git a/implementations/rust/ockam/ockam_command/src/util/parsers.rs b/implementations/rust/ockam/ockam_command/src/util/parsers.rs index 9645b76ae0f..a3963e2844a 100644 --- a/implementations/rust/ockam/ockam_command/src/util/parsers.rs +++ b/implementations/rust/ockam/ockam_command/src/util/parsers.rs @@ -21,6 +21,17 @@ pub(crate) fn hostname_parser(input: &str) -> Result { )) } +/// Helper function for parsing a key-value pair from user input +/// The input is expected to be in the format `key:value` +pub(crate) fn http_header_parser(input: &str) -> Result<(String, String)> { + let parts: Vec<&str> = input.split(':').collect(); + if parts.len() != 2 { + return Err(miette!("Invalid header format. Expected 'key:value'")); + } + + Ok((parts[0].trim().to_string(), parts[1].trim().to_string())) +} + /// Helper fn for parsing an identifier from user input by using /// [`ockam_identity::Identifier::from_str()`] pub(crate) fn identity_identifier_parser(input: &str) -> Result { @@ -69,3 +80,22 @@ pub(crate) fn duration_to_human_format(duration: &Duration) -> String { } parts.join(" ") } + +#[cfg(test)] +mod test { + use crate::util::parsers::http_header_parser; + + #[test] + pub fn test_http_header_parser() { + assert_eq!( + http_header_parser("key:value").unwrap(), + ("key".to_string(), "value".to_string()) + ); + assert_eq!( + http_header_parser("key: value").unwrap(), + ("key".to_string(), "value".to_string()) + ); + assert!(http_header_parser("key:value:extra").is_err()); + assert!(http_header_parser("key").is_err()); + } +} diff --git a/implementations/rust/ockam/ockam_command/tests/bats/local/portals.bats b/implementations/rust/ockam/ockam_command/tests/bats/local/portals.bats index aa81ecfcf7b..f4f14ebba36 100644 --- a/implementations/rust/ockam/ockam_command/tests/bats/local/portals.bats +++ b/implementations/rust/ockam/ockam_command/tests/bats/local/portals.bats @@ -495,3 +495,31 @@ teardown() { run_success "$OCKAM" tcp-inlet create --from "127.0.0.1:$port" --to /node/n/secure/api/service/outlet --identity alt run_success curl -sfI --retry-all-errors --retry-delay 5 --retry 2 -m 5 "127.0.0.1:$port" } + +@test "portals - http set header" { + inlet_port="$(random_port)" + server_port="$(random_port)" + + # to validate the header has been set, we start an authenticated HTTP server + # and we inject the credential into the request + run_success "$OCKAM" tcp-outlet create --to "127.0.0.1:${server_port}" + run_success "$OCKAM" tcp-inlet create --from "127.0.0.1:${inlet_port}" --to /service/outlet + + uploadserver --basic-auth username:password --bind 127.0.0.1 ${server_port} &>"$HOME/.bats-tests/authenticated_python_server.log" & + server_pid="$!" + + sleep 1 + + trap "kill -9 ${server_pid}" EXIT + + run_failure curl -sf -m 3 "http://127.0.0.1:${inlet_port}" + + authentication="$(echo -n "username:password" | base64)" + run_success "$OCKAM" tcp-inlet delete --all --yes + run_success "$OCKAM" tcp-inlet create --http-header "Authorization: Basic ${authentication}" --from "127.0.0.1:${inlet_port}" --to /service/outlet + + run_success curl -sf --retry-connrefused --retry-delay 5 --retry 2 -m 5 "http://127.0.0.1:${inlet_port}" + + trap - EXIT + kill ${server_pid} +}