From 330b9eef84744b715a3d34aaa96b331e968841f2 Mon Sep 17 00:00:00 2001 From: Davide Baldo Date: Fri, 2 Aug 2024 20:04:31 +0200 Subject: [PATCH] feat(rust): added the possibility to encrypt specific fields in a kafka `JSON` record --- .../ockam_api/src/kafka/inlet_controller.rs | 31 +- .../src/kafka/key_exchange/controller.rs | 45 +- .../ockam_api/src/kafka/key_exchange/mod.rs | 52 ++- .../src/kafka/key_exchange/secure_channels.rs | 10 +- .../src/kafka/protocol_aware/inlet/mod.rs | 39 +- .../src/kafka/protocol_aware/inlet/request.rs | 100 +++-- .../kafka/protocol_aware/inlet/response.rs | 87 +++- .../src/kafka/protocol_aware/inlet/tests.rs | 404 ++++++++++++++++++ .../kafka/protocol_aware/length_delimited.rs | 6 +- .../ockam_api/src/kafka/protocol_aware/mod.rs | 34 +- .../src/kafka/protocol_aware/tests.rs | 8 +- .../src/kafka/tests/integration_test.rs | 5 +- .../src/kafka/tests/interceptor_test.rs | 12 +- .../ockam_api/src/nodes/models/services.rs | 7 + .../src/nodes/service/kafka_services.rs | 7 +- .../src/kafka/consumer/create.rs | 1 + .../ockam_command/src/kafka/inlet/create.rs | 6 + .../src/kafka/producer/create.rs | 1 + .../tests/bats/kafka/docker.bats | 91 ++++ .../ockam/ockam_core/src/routing/route.rs | 6 + 20 files changed, 839 insertions(+), 113 deletions(-) create mode 100644 implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/tests.rs diff --git a/implementations/rust/ockam/ockam_api/src/kafka/inlet_controller.rs b/implementations/rust/ockam/ockam_api/src/kafka/inlet_controller.rs index 2494d6eb75b..4679acd76f7 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/inlet_controller.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/inlet_controller.rs @@ -1,5 +1,3 @@ -use std::fmt::Debug; - use crate::kafka::kafka_outlet_address; use crate::nodes::NodeManager; use crate::port_range::PortRange; @@ -14,6 +12,8 @@ use ockam_core::{Result, Route}; use ockam_multiaddr::MultiAddr; use ockam_node::Context; use ockam_transport_core::HostnamePort; +use std::fmt::Debug; +use std::sync::Weak; type BrokerId = i32; @@ -24,7 +24,7 @@ type BrokerId = i32; pub(crate) struct KafkaInletController { inner: Arc>, policy_expression: Option, - node_manager: Arc, + node_manager: Weak, } impl Debug for KafkaInletController { @@ -68,11 +68,28 @@ impl KafkaInletController { local_interceptor_route, remote_interceptor_route, })), - node_manager, + node_manager: Arc::downgrade(&node_manager), policy_expression, } } + #[cfg(test)] + pub(crate) fn stub() -> KafkaInletController { + Self { + inner: Arc::new(Mutex::new(KafkaInletMapInner { + outlet_node_multiaddr: Default::default(), + broker_map: Default::default(), + current_port: Default::default(), + port_range: PortRange::new(0, 0).unwrap(), + bind_hostname: Default::default(), + local_interceptor_route: Default::default(), + remote_interceptor_route: Default::default(), + })), + node_manager: Weak::new(), + policy_expression: Default::default(), + } + } + #[cfg(test)] pub(crate) async fn retrieve_inlet(&self, broker_id: BrokerId) -> Option { let inner = self.inner.lock().await; @@ -103,7 +120,11 @@ impl KafkaInletController { let inlet_bind_address = HostnamePort::new(inner.bind_hostname.clone(), inner.current_port); - self.node_manager + let node_manager = self.node_manager.upgrade().ok_or_else(|| { + Error::new(Origin::Node, Kind::Internal, "node manager was shut down") + })?; + + node_manager .create_inlet( context, inlet_bind_address.clone(), diff --git a/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/controller.rs b/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/controller.rs index b1969d38e1d..0641f2560a5 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/controller.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/controller.rs @@ -1,4 +1,5 @@ -use crate::kafka::key_exchange::{KafkaEncryptedContent, TopicPartition}; +use crate::kafka::key_exchange::{KafkaKeyExchangeController, TopicPartition}; +use crate::kafka::protocol_aware::KafkaEncryptedContent; use crate::kafka::{ConsumerPublishing, ConsumerResolution}; use crate::nodes::NodeManager; use ockam::identity::{ @@ -7,36 +8,27 @@ use ockam::identity::{ }; use ockam_abac::PolicyAccessControl; use ockam_core::compat::collections::{HashMap, HashSet}; -use ockam_core::{route, Address}; +use ockam_core::{async_trait, route, Address}; use ockam_node::Context; use std::sync::Arc; use tokio::sync::Mutex; #[derive(Clone)] -pub(crate) struct KafkaKeyExchangeController { - pub(crate) inner: Arc>, +pub(crate) struct KafkaKeyExchangeControllerImpl { + pub(crate) inner: Arc>, } -/// Offer simple APIs to encrypt and decrypt kafka messages. -/// Underneath it creates secure channels for each topic/partition -/// and uses them to encrypt the content. -/// Multiple secure channels may be created for the same topic/partition -/// but each will be explicitly labeled. -impl KafkaKeyExchangeController { - /// Encrypts the content specifically for the consumer waiting for that topic name and - /// partition. - /// To do so it'll create a secure channel which will be used for key exchange only. - /// The secure channel will be created only once and then re-used, hence the first time will - /// be slower, and may take up to few seconds. - pub async fn encrypt_content( +#[async_trait] +impl KafkaKeyExchangeController for KafkaKeyExchangeControllerImpl { + async fn encrypt_content( &self, context: &mut Context, topic_name: &str, - partition_id: i32, + partition_index: i32, content: Vec, ) -> ockam_core::Result { let secure_channel_entry = self - .get_or_create_secure_channel(context, topic_name, partition_id) + .get_or_create_secure_channel(context, topic_name, partition_index) .await?; let consumer_decryptor_address = secure_channel_entry.their_decryptor_address(); @@ -64,9 +56,7 @@ impl KafkaKeyExchangeController { }) } - /// Decrypts the content based on the consumer decryptor address - /// the secure channel is expected to be already initialized. - pub async fn decrypt_content( + async fn decrypt_content( &self, context: &mut Context, consumer_decryptor_address: &Address, @@ -97,10 +87,7 @@ impl KafkaKeyExchangeController { Ok(decrypted_content) } - /// Starts relays in the orchestrator for each {topic_name}_{partition} combination - /// should be used only by the consumer. - /// does nothing if they were already created, but fails it they already exist. - pub async fn publish_consumer( + async fn publish_consumer( &self, context: &mut Context, topic_name: &str, @@ -138,7 +125,7 @@ impl KafkaKeyExchangeController { } } -pub struct InnerSecureChannelControllerImpl { +pub struct InnerSecureChannelController { // we identify the secure channel instance by using the decryptor address of the consumer // which is known to both parties pub(crate) topic_encryptor_map: HashMap, @@ -157,7 +144,7 @@ pub struct InnerSecureChannelControllerImpl { pub(crate) producer_policy_access_control: PolicyAccessControl, } -impl KafkaKeyExchangeController { +impl KafkaKeyExchangeControllerImpl { pub(crate) fn new( node_manager: Arc, secure_channels: Arc, @@ -165,9 +152,9 @@ impl KafkaKeyExchangeController { consumer_publishing: ConsumerPublishing, consumer_policy_access_control: PolicyAccessControl, producer_policy_access_control: PolicyAccessControl, - ) -> KafkaKeyExchangeController { + ) -> KafkaKeyExchangeControllerImpl { Self { - inner: Arc::new(Mutex::new(InnerSecureChannelControllerImpl { + inner: Arc::new(Mutex::new(InnerSecureChannelController { topic_encryptor_map: Default::default(), identity_encryptor_map: Default::default(), topic_relay_set: Default::default(), diff --git a/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/mod.rs b/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/mod.rs index a0134fd274b..57747f6db7d 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/mod.rs @@ -1,18 +1,12 @@ +use crate::kafka::protocol_aware::KafkaEncryptedContent; use minicbor::{CborLen, Decode, Encode}; - -use ockam_core::Address; +use ockam_core::{async_trait, Address}; use ockam_multiaddr::MultiAddr; +use ockam_node::Context; pub(crate) mod controller; mod secure_channels; -pub(crate) struct KafkaEncryptedContent { - /// The encrypted content - pub(crate) content: Vec, - /// The secure channel identifier used to encrypt the content - pub(crate) consumer_decryptor_address: Address, -} - /// Describe how to reach the consumer node: either directly or through a relay #[derive(Debug, Clone, Encode, Decode, CborLen)] #[rustfmt::skip] @@ -32,3 +26,43 @@ pub enum ConsumerPublishing { } type TopicPartition = (String, i32); + +/// Offer simple APIs to encrypt and decrypt kafka messages. +/// Underneath it creates secure channels for each topic/partition +/// and uses them to encrypt the content. +/// Multiple secure channels may be created for the same topic/partition +/// but each will be explicitly labeled. +#[async_trait] +pub(crate) trait KafkaKeyExchangeController: Send + Sync + 'static { + /// Encrypts the content specifically for the consumer waiting for that topic name and + /// partition. + /// To do so, it'll create a secure channel which will be used for key exchange only. + /// The secure channel will be created only once and then re-used, hence the first time will + /// be slower, and may take up to few seconds. + async fn encrypt_content( + &self, + context: &mut Context, + topic_name: &str, + partition_index: i32, + content: Vec, + ) -> ockam_core::Result; + + /// Decrypts the content based on the consumer decryptor address + /// the secure channel is expected to be already initialized. + async fn decrypt_content( + &self, + context: &mut Context, + consumer_decryptor_address: &Address, + encrypted_content: Vec, + ) -> ockam_core::Result>; + + /// Starts relays in the orchestrator for each {topic_name}_{partition} combination + /// should be used only by the consumer. + /// does nothing if they were already created, but fails it they already exist. + async fn publish_consumer( + &self, + context: &mut Context, + topic_name: &str, + partitions: Vec, + ) -> ockam_core::Result<()>; +} diff --git a/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/secure_channels.rs b/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/secure_channels.rs index 63ad2e9cf5e..a8317e06330 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/secure_channels.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/key_exchange/secure_channels.rs @@ -1,5 +1,5 @@ use crate::kafka::key_exchange::controller::{ - InnerSecureChannelControllerImpl, KafkaKeyExchangeController, + InnerSecureChannelController, KafkaKeyExchangeControllerImpl, }; use crate::kafka::ConsumerResolution; use crate::nodes::service::SecureChannelType; @@ -12,10 +12,10 @@ use ockam_multiaddr::MultiAddr; use ockam_node::Context; use tokio::sync::MutexGuard; -impl KafkaKeyExchangeController { +impl KafkaKeyExchangeControllerImpl { /// Creates a secure channel for the given destination. async fn create_secure_channel( - inner: &MutexGuard<'_, InnerSecureChannelControllerImpl>, + inner: &MutexGuard<'_, InnerSecureChannelController>, context: &Context, mut destination: MultiAddr, ) -> Result
{ @@ -37,7 +37,7 @@ impl KafkaKeyExchangeController { /// Creates a secure channel for the given destination, for key exchange only. async fn create_key_exchange_only_secure_channel( - inner: &MutexGuard<'_, InnerSecureChannelControllerImpl>, + inner: &MutexGuard<'_, InnerSecureChannelController>, context: &Context, mut destination: MultiAddr, ) -> Result
{ @@ -180,7 +180,7 @@ impl KafkaKeyExchangeController { } async fn validate_consumer_credentials( - inner: &MutexGuard<'_, InnerSecureChannelControllerImpl>, + inner: &MutexGuard<'_, InnerSecureChannelController>, entry: &SecureChannelRegistryEntry, ) -> Result<()> { let authorized = inner diff --git a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/mod.rs b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/mod.rs index ac2311f660a..d640f42168f 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/mod.rs @@ -1,4 +1,5 @@ -use crate::kafka::key_exchange::controller::KafkaKeyExchangeController; +use crate::kafka::key_exchange::controller::KafkaKeyExchangeControllerImpl; +use crate::kafka::key_exchange::KafkaKeyExchangeController; use crate::kafka::protocol_aware::{ CorrelationId, KafkaMessageInterceptor, KafkaMessageInterceptorWrapper, RequestInfo, TopicUuidMap, MAX_KAFKA_MESSAGE_SIZE, @@ -12,13 +13,17 @@ use std::sync::{Arc, Mutex}; mod request; mod response; +#[cfg(test)] +mod tests; + #[derive(Clone)] pub(crate) struct InletInterceptorImpl { request_map: Arc>>, uuid_to_name: TopicUuidMap, - key_exchange_controller: KafkaKeyExchangeController, + key_exchange_controller: Arc, inlet_map: KafkaInletController, encrypt_content: bool, + encrypted_fields: Vec, } #[async_trait] @@ -26,10 +31,11 @@ impl KafkaMessageInterceptor for InletInterceptorImpl {} impl InletInterceptorImpl { pub(crate) fn new( - key_exchange_controller: KafkaKeyExchangeController, + key_exchange_controller: Arc, uuid_to_name: TopicUuidMap, inlet_map: KafkaInletController, encrypt_content: bool, + encrypted_fields: Vec, ) -> InletInterceptorImpl { Self { request_map: Arc::new(Mutex::new(Default::default())), @@ -37,28 +43,48 @@ impl InletInterceptorImpl { key_exchange_controller, inlet_map, encrypt_content, + encrypted_fields, } } + + #[cfg(test)] + pub(crate) fn add_request( + &self, + correlation_id: CorrelationId, + api_key: kafka_protocol::messages::ApiKey, + api_version: i16, + ) { + self.request_map.lock().unwrap().insert( + correlation_id, + RequestInfo { + request_api_key: api_key, + request_api_version: api_version, + }, + ); + } } pub(crate) struct KafkaInletInterceptorFactory { - secure_channel_controller: KafkaKeyExchangeController, + secure_channel_controller: KafkaKeyExchangeControllerImpl, uuid_to_name: TopicUuidMap, inlet_map: KafkaInletController, encrypt_content: bool, + encrypted_fields: Vec, } impl KafkaInletInterceptorFactory { pub(crate) fn new( - secure_channel_controller: KafkaKeyExchangeController, + secure_channel_controller: KafkaKeyExchangeControllerImpl, inlet_map: KafkaInletController, encrypt_content: bool, + encrypted_fields: Vec, ) -> Self { Self { secure_channel_controller, uuid_to_name: Default::default(), inlet_map, encrypt_content, + encrypted_fields, } } } @@ -67,10 +93,11 @@ impl PortalInterceptorFactory for KafkaInletInterceptorFactory { fn create(&self) -> Arc { Arc::new(KafkaMessageInterceptorWrapper::new( Arc::new(InletInterceptorImpl::new( - self.secure_channel_controller.clone(), + Arc::new(self.secure_channel_controller.clone()), self.uuid_to_name.clone(), self.inlet_map.clone(), self.encrypt_content, + self.encrypted_fields.clone(), )), MAX_KAFKA_MESSAGE_SIZE, )) diff --git a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/request.rs b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/request.rs index c777d4c7e1f..3d37616c65b 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/request.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/request.rs @@ -1,12 +1,12 @@ use crate::kafka::protocol_aware::inlet::InletInterceptorImpl; use crate::kafka::protocol_aware::utils::{decode_body, encode_request}; +use crate::kafka::protocol_aware::RequestInfo; use crate::kafka::protocol_aware::{InterceptError, KafkaMessageRequestInterceptor}; -use crate::kafka::protocol_aware::{MessageWrapper, RequestInfo}; use bytes::{Bytes, BytesMut}; use kafka_protocol::messages::fetch_request::FetchRequest; -use kafka_protocol::messages::produce_request::ProduceRequest; +use kafka_protocol::messages::produce_request::{PartitionProduceData, ProduceRequest}; use kafka_protocol::messages::request_header::RequestHeader; -use kafka_protocol::messages::ApiKey; +use kafka_protocol::messages::{ApiKey, TopicName}; use kafka_protocol::protocol::buf::ByteBuf; use kafka_protocol::protocol::Decodable; use kafka_protocol::records::{ @@ -187,32 +187,21 @@ impl InletInterceptorImpl { for record in records.iter_mut() { if let Some(record_value) = record.value.take() { - let encrypted_content = self - .key_exchange_controller - .encrypt_content( + let buffer = if !self.encrypted_fields.is_empty() { + // if we encrypt only specific fields, we assume the record must be + // valid JSON map + self.encrypt_specific_fields( context, topic_name, - data.index, - record_value.to_vec(), + data, + &record_value, ) - .await - .map_err(InterceptError::Ockam)?; - - // TODO: to target multiple consumers we could duplicate - // the content with a dedicated encryption for each consumer - let wrapper = MessageWrapper { - consumer_decryptor_address: encrypted_content - .consumer_decryptor_address, - content: encrypted_content.content, + .await? + } else { + self.encrypt_whole_record(context, topic_name, data, record_value) + .await? }; - - let mut write_buffer = Vec::with_capacity(1024); - let mut encoder = Encoder::new(&mut write_buffer); - encoder - .encode(wrapper) - .map_err(|_err| InterceptError::InvalidData)?; - - record.value = Some(write_buffer.into()); + record.value = Some(buffer.into()); } } @@ -239,4 +228,65 @@ impl InletInterceptorImpl { ApiKey::ProduceKey, ) } + + async fn encrypt_whole_record( + &self, + context: &mut Context, + topic_name: &TopicName, + data: &mut PartitionProduceData, + record_value: Bytes, + ) -> Result, InterceptError> { + let encrypted_content = self + .key_exchange_controller + .encrypt_content(context, topic_name, data.index, record_value.to_vec()) + .await + .map_err(InterceptError::Ockam)?; + + let mut write_buffer = Vec::with_capacity(1024); + let mut encoder = Encoder::new(&mut write_buffer); + encoder + .encode(encrypted_content) + .map_err(|_err| InterceptError::InvalidData)?; + + Ok(write_buffer) + } + + async fn encrypt_specific_fields( + &self, + context: &mut Context, + topic_name: &TopicName, + data: &mut PartitionProduceData, + record_value: &Bytes, + ) -> Result, InterceptError> { + let mut record_value = serde_json::from_slice::(record_value)?; + + if let serde_json::Value::Object(map) = &mut record_value { + for field in &self.encrypted_fields { + if let Some(value) = map.get_mut(field) { + let encrypted_content = self + .key_exchange_controller + .encrypt_content( + context, + topic_name, + data.index, + serde_json::to_vec(value).map_err(|_| InterceptError::InvalidData)?, + ) + .await + .map_err(InterceptError::Ockam)?; + + let mut write_buffer = Vec::with_capacity(1024); + let mut encoder = Encoder::new(&mut write_buffer); + encoder + .encode(encrypted_content) + .map_err(|_| InterceptError::InvalidData)?; + *value = serde_json::Value::String(hex::encode(&write_buffer)); + } + } + } else { + warn!("only JSON objects are supported for field encryption"); + return Err("Only JSON objects are supported".into()); + } + + Ok(record_value.to_string().as_bytes().to_vec()) + } } diff --git a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/response.rs b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/response.rs index 84a40fe95d0..a9a5c5d35af 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/response.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/response.rs @@ -1,7 +1,7 @@ use crate::kafka::protocol_aware::inlet::InletInterceptorImpl; use crate::kafka::protocol_aware::utils::{decode_body, encode_response}; use crate::kafka::protocol_aware::{ - InterceptError, KafkaMessageResponseInterceptor, MessageWrapper, RequestInfo, + InterceptError, KafkaEncryptedContent, KafkaMessageResponseInterceptor, RequestInfo, }; use crate::kafka::KafkaInletController; use bytes::{Bytes, BytesMut}; @@ -230,21 +230,11 @@ impl InletInterceptorImpl { for record in records.iter_mut() { if let Some(record_value) = record.value.take() { - let message_wrapper: MessageWrapper = - Decoder::new(record_value.as_ref()) - .decode() - .map_err(|_| InterceptError::InvalidData)?; - - let decrypted_content = self - .key_exchange_controller - .decrypt_content( - context, - &message_wrapper.consumer_decryptor_address, - message_wrapper.content, - ) - .await - .map_err(InterceptError::Ockam)?; - + let decrypted_content = if self.encrypted_fields.is_empty() { + self.decrypt_whole_record(context, record_value).await? + } else { + self.decrypt_specific_fields(context, record_value).await? + }; record.value = Some(decrypted_content.into()); } } @@ -271,4 +261,69 @@ impl InletInterceptorImpl { ApiKey::FetchKey, ) } + + async fn decrypt_whole_record( + &self, + context: &mut Context, + record_value: Bytes, + ) -> Result, InterceptError> { + let message_wrapper: KafkaEncryptedContent = + Decoder::new(record_value.as_ref()).decode()?; + + self.key_exchange_controller + .decrypt_content( + context, + &message_wrapper.consumer_decryptor_address, + message_wrapper.content, + ) + .await + .map_err(InterceptError::Ockam) + } + + async fn decrypt_specific_fields( + &self, + context: &mut Context, + record_value: Bytes, + ) -> Result, InterceptError> { + let mut record_value = serde_json::from_slice::(&record_value)?; + + if let serde_json::Value::Object(map) = &mut record_value { + for field in &self.encrypted_fields { + // when the encrypted field is present is expected to be a hex encoded string + // wrapped by the KafkaEncryptedContent struct + if let Some(value) = map.get_mut(field) { + let encrypted_content = if let serde_json::Value::String(string) = value { + hex::decode(string).map_err(|_| "Encrypted is not a valid hex string")? + } else { + error!("encrypted field is not a hex string"); + return Err("The encrypted field is not a hex-encoded string".into()); + }; + + let message_wrapper: KafkaEncryptedContent = + Decoder::new(&encrypted_content).decode()?; + + let decrypted_content = self + .key_exchange_controller + .decrypt_content( + context, + &message_wrapper.consumer_decryptor_address, + message_wrapper.content, + ) + .await + .map_err(InterceptError::Ockam)?; + + *value = serde_json::from_slice(decrypted_content.as_slice())?; + } + } + serde_json::to_vec(&record_value).map_err(|error| { + error!("cannot serialize decrypted fields"); + error.into() + }) + } else { + error!( + "cannot decrypt specific fields, expected a JSON object but got a different type" + ); + Err("Only JSON objects are supported in the message".into()) + } + } } diff --git a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/tests.rs b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/tests.rs new file mode 100644 index 00000000000..505319e3751 --- /dev/null +++ b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/inlet/tests.rs @@ -0,0 +1,404 @@ +use crate::kafka::key_exchange::KafkaKeyExchangeController; +use crate::kafka::protocol_aware::inlet::InletInterceptorImpl; +use crate::kafka::protocol_aware::{ + utils, KafkaEncryptedContent, KafkaMessageRequestInterceptor, KafkaMessageResponseInterceptor, +}; +use crate::kafka::KafkaInletController; +use bytes::BytesMut; +use indexmap::IndexMap; +use kafka_protocol::messages::fetch_response::{FetchableTopicResponse, PartitionData}; +use kafka_protocol::messages::produce_request::{PartitionProduceData, TopicProduceData}; +use kafka_protocol::messages::ApiKey::ProduceKey; +use kafka_protocol::messages::{ + ApiKey, FetchResponse, ProduceRequest, RequestHeader, ResponseHeader, TopicName, +}; +use kafka_protocol::protocol::{Builder, Decodable, StrBytes}; +use kafka_protocol::records::{ + Compression, Record, RecordBatchDecoder, RecordBatchEncoder, RecordEncodeOptions, TimestampType, +}; +use minicbor::{Decoder, Encoder}; +use ockam_core::{async_trait, Address}; +use ockam_node::Context; +use serde_json::json; +use std::sync::Arc; + +const ENCRYPTED_PREFIX: &[u8] = b"encrypted:"; +const PREFIX_LEN: usize = ENCRYPTED_PREFIX.len(); + +struct MockKafkaKeyExchangeController; + +#[async_trait] +impl KafkaKeyExchangeController for MockKafkaKeyExchangeController { + async fn encrypt_content( + &self, + _context: &mut Context, + _topic_name: &str, + _partition_index: i32, + content: Vec, + ) -> ockam_core::Result { + let mut new_content = ENCRYPTED_PREFIX.to_vec(); + new_content.extend_from_slice(&content); + Ok(KafkaEncryptedContent { + consumer_decryptor_address: Address::from_string("mock"), + content: new_content, + }) + } + + async fn decrypt_content( + &self, + _context: &mut Context, + _consumer_decryptor_address: &Address, + encrypted_content: Vec, + ) -> ockam_core::Result> { + Ok(encrypted_content[PREFIX_LEN..].to_vec()) + } + + async fn publish_consumer( + &self, + _context: &mut Context, + _topic_name: &str, + _partitions: Vec, + ) -> ockam_core::Result<()> { + Ok(()) + } +} + +const TEST_KAFKA_API_VERSION: i16 = 13; + +pub fn create_kafka_produce_request(content: &[u8]) -> BytesMut { + let header = RequestHeader::builder() + .request_api_key(ApiKey::ProduceKey as i16) + .request_api_version(TEST_KAFKA_API_VERSION) + .correlation_id(1) + .client_id(Some(StrBytes::from_static_str("my-client-id"))) + .unknown_tagged_fields(Default::default()) + .build() + .unwrap(); + + let mut encoded = BytesMut::new(); + RecordBatchEncoder::encode( + &mut encoded, + [Record { + transactional: false, + control: false, + partition_leader_epoch: 0, + producer_id: 0, + producer_epoch: 0, + timestamp_type: TimestampType::Creation, + offset: 0, + sequence: 0, + timestamp: 0, + key: None, + value: Some(BytesMut::from(content).freeze()), + headers: Default::default(), + }] + .iter(), + &RecordEncodeOptions { + version: 2, + compression: Compression::None, + }, + ) + .unwrap(); + + let mut topic_data = IndexMap::new(); + topic_data.insert( + TopicName::from(StrBytes::from_static_str("topic-name")), + TopicProduceData::builder() + .partition_data(vec![PartitionProduceData::builder() + .index(1) + .records(Some(encoded.freeze())) + .unknown_tagged_fields(Default::default()) + .build() + .unwrap()]) + .unknown_tagged_fields(Default::default()) + .build() + .unwrap(), + ); + let request = ProduceRequest::builder() + .transactional_id(None) + .acks(0) + .timeout_ms(0) + .topic_data(topic_data) + .unknown_tagged_fields(Default::default()) + .build() + .unwrap(); + + utils::encode_request( + &header, + &request, + TEST_KAFKA_API_VERSION, + ApiKey::ProduceKey, + ) + .unwrap() +} + +pub fn create_kafka_fetch_response(content: &[u8]) -> BytesMut { + let header = ResponseHeader::builder() + .correlation_id(1) + .unknown_tagged_fields(Default::default()) + .build() + .unwrap(); + + let mut encoded = BytesMut::new(); + RecordBatchEncoder::encode( + &mut encoded, + [Record { + transactional: false, + control: false, + partition_leader_epoch: 0, + producer_id: 0, + producer_epoch: 0, + timestamp_type: TimestampType::Creation, + offset: 0, + sequence: 0, + timestamp: 0, + key: None, + value: Some(BytesMut::from(content).freeze()), + headers: Default::default(), + }] + .iter(), + &RecordEncodeOptions { + version: 2, + compression: Compression::None, + }, + ) + .unwrap(); + + let response = FetchResponse::builder() + .throttle_time_ms(Default::default()) + .error_code(Default::default()) + .session_id(Default::default()) + .responses(vec![FetchableTopicResponse::builder() + .topic(TopicName::from(StrBytes::from_static_str("topic-name"))) + .topic_id(Default::default()) + .partitions(vec![PartitionData::builder() + .partition_index(1) + .error_code(Default::default()) + .high_watermark(Default::default()) + .last_stable_offset(Default::default()) + .log_start_offset(Default::default()) + .diverging_epoch(Default::default()) + .current_leader(Default::default()) + .snapshot_id(Default::default()) + .aborted_transactions(Default::default()) + .preferred_read_replica(Default::default()) + .records(Some(encoded.freeze())) + .unknown_tagged_fields(Default::default()) + .build() + .unwrap()]) + .unknown_tagged_fields(Default::default()) + .build() + .unwrap()]) + .unknown_tagged_fields(Default::default()) + .build() + .unwrap(); + + utils::encode_response(&header, &response, TEST_KAFKA_API_VERSION, ApiKey::FetchKey).unwrap() +} + +pub fn parse_produce_request(content: &[u8]) -> ProduceRequest { + let mut buffer = BytesMut::from(content); + let _header = RequestHeader::decode( + &mut buffer, + ProduceKey.request_header_version(TEST_KAFKA_API_VERSION), + ) + .unwrap(); + utils::decode_body(&mut buffer, TEST_KAFKA_API_VERSION).unwrap() +} + +pub fn parse_fetch_response(content: &[u8]) -> FetchResponse { + let mut buffer = BytesMut::from(content); + let _header = ResponseHeader::decode( + &mut buffer, + ApiKey::FetchKey.response_header_version(TEST_KAFKA_API_VERSION), + ) + .unwrap(); + utils::decode_body(&mut buffer, TEST_KAFKA_API_VERSION).unwrap() +} + +pub fn decode_field_value(value: String) -> serde_json::Value { + let value = hex::decode(value).unwrap(); + let encrypted_content: KafkaEncryptedContent = Decoder::new(value.as_ref()).decode().unwrap(); + assert_eq!( + encrypted_content.consumer_decryptor_address, + Address::from_string("mock") + ); + + let encrypted_tag = + String::from_utf8(encrypted_content.content[0..PREFIX_LEN].to_vec()).unwrap(); + assert_eq!(encrypted_tag.as_bytes(), ENCRYPTED_PREFIX); + + let cleartext_content = encrypted_content.content[PREFIX_LEN..].to_vec(); + serde_json::from_slice::(&cleartext_content).unwrap() +} + +pub fn encode_field_value(value: serde_json::Value) -> String { + let cleartext_content = serde_json::to_vec(&value).unwrap(); + let mut encrypted_content = ENCRYPTED_PREFIX.to_vec(); + encrypted_content.extend_from_slice(&cleartext_content); + + let mut write_buffer = Vec::new(); + let mut encoder = Encoder::new(&mut write_buffer); + encoder + .encode(KafkaEncryptedContent { + consumer_decryptor_address: Address::from_string("mock"), + content: encrypted_content, + }) + .unwrap(); + + hex::encode(&write_buffer) +} + +#[ockam::test] +pub async fn json_encrypt_specific_fields(context: &mut Context) -> ockam::Result<()> { + let interceptor = InletInterceptorImpl::new( + Arc::new(MockKafkaKeyExchangeController {}), + Default::default(), + KafkaInletController::stub(), + true, + vec![ + "field1".to_string(), + "field2".to_string(), + "field3".to_string(), + ], + ); + + let encrypted_response = interceptor + .intercept_request( + context, + create_kafka_produce_request( + json!( + { + "field1": "value1", + "field2": { + "nested_field1": "nested_value1", + "nested_field2": "nested_value2" + }, + "field3": [ + "array_value1", + "array_value2" + ], + "cleartext_field": "cleartext_value" + } + ) + .to_string() + .as_bytes(), + ), + ) + .await + .unwrap(); + + let request = parse_produce_request(&encrypted_response); + let topic_data = request.topic_data.first().unwrap(); + assert_eq!("topic-name", topic_data.0 .0.as_str()); + + let mut batch_content = topic_data + .1 + .partition_data + .first() + .cloned() + .unwrap() + .records + .unwrap(); + + let records = RecordBatchDecoder::decode(&mut batch_content).unwrap(); + let record = records.first().unwrap(); + let record_content = record.value.clone().unwrap(); + + // The record content is a JSON object + let json: serde_json::value::Value = serde_json::from_slice(&record_content).unwrap(); + let map = json.as_object().unwrap(); + + let field1_value = decode_field_value(map.get("field1").unwrap().as_str().unwrap().to_string()); + assert_eq!(field1_value, json!("value1")); + + let field2_value = decode_field_value(map.get("field2").unwrap().as_str().unwrap().to_string()); + assert_eq!( + field2_value, + json!({"nested_field1": "nested_value1", "nested_field2": "nested_value2"}) + ); + + let field3_value = decode_field_value(map.get("field3").unwrap().as_str().unwrap().to_string()); + assert_eq!(field3_value, json!(["array_value1", "array_value2"])); + + let cleartext_value = map.get("cleartext_field").unwrap().as_str().unwrap(); + assert_eq!(cleartext_value, "cleartext_value"); + + Ok(()) +} + +#[ockam::test] +pub async fn json_decrypt_specific_fields(context: &mut Context) -> ockam::Result<()> { + let interceptor = InletInterceptorImpl::new( + Arc::new(MockKafkaKeyExchangeController {}), + Default::default(), + KafkaInletController::stub(), + true, + vec![ + "field1".to_string(), + "field2".to_string(), + "field3".to_string(), + ], + ); + + interceptor.add_request(1, ApiKey::FetchKey, TEST_KAFKA_API_VERSION); + + let field1_value = encode_field_value(json!("value1")); + let field2_value = encode_field_value(json!({ + "nested_field1": "nested_value1", + "nested_field2": "nested_value2" + })); + let field3_value = encode_field_value(json!(["array_value1", "array_value2"])); + + let cleartext_response = interceptor + .intercept_response( + context, + create_kafka_fetch_response( + json!( + { + "field1": field1_value, + "field2": field2_value, + "field3": field3_value, + "cleartext_field": "cleartext_value" + } + ) + .to_string() + .as_bytes(), + ), + ) + .await + .unwrap(); + + let response = parse_fetch_response(&cleartext_response); + let partition_data = response + .responses + .first() + .unwrap() + .partitions + .first() + .unwrap(); + let mut records = partition_data.records.clone().unwrap(); + let records = RecordBatchDecoder::decode(&mut records).unwrap(); + + let record = records.first().unwrap(); + let value = + serde_json::from_slice::(record.value.as_ref().unwrap()).unwrap(); + + assert_eq!( + json!({ + "field1": "value1", + "field2": { + "nested_field1": "nested_value1", + "nested_field2": "nested_value2" + }, + "field3": [ + "array_value1", + "array_value2" + ], + "cleartext_field": "cleartext_value" + }), + value + ); + + Ok(()) +} diff --git a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/length_delimited.rs b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/length_delimited.rs index cc134249493..a8b4fa27087 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/length_delimited.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/length_delimited.rs @@ -10,7 +10,7 @@ pub(super) struct KafkaMessageDecoder { } impl KafkaMessageDecoder { - pub(super) fn new() -> Self { + pub(crate) fn new() -> Self { Self { buffer: Default::default(), current_message_length: 0, @@ -18,7 +18,7 @@ impl KafkaMessageDecoder { } /// Accepts length encoded messages, returns complete messages - pub(super) fn extract_complete_messages( + pub(crate) fn extract_complete_messages( &mut self, mut incoming: BytesMut, max_message_size: u32, @@ -68,7 +68,7 @@ impl KafkaMessageDecoder { } /// Return a length encoded message -pub(super) fn length_encode(content: BytesMut) -> ockam::Result { +pub(crate) fn length_encode(content: BytesMut) -> ockam::Result { let mut buffer = BytesMut::new(); if content.len() >= u32::MAX as usize { Err(Error::new( diff --git a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/mod.rs b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/mod.rs index 1a3c8a165f2..2e1644dba45 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/mod.rs @@ -117,6 +117,18 @@ impl PortalInterceptor for KafkaMessageInterceptorWrapper { ockam_core::Error::new(Origin::Transport, Kind::Io, "Invalid data") } InterceptError::Ockam(error) => error, + InterceptError::Io(error) => { + ockam_core::Error::new(Origin::Transport, Kind::Io, error) + } + InterceptError::Serde(error) => { + ockam_core::Error::new(Origin::Transport, Kind::Io, error) + } + InterceptError::Minicbor(error) => { + ockam_core::Error::new(Origin::Transport, Kind::Io, error) + } + InterceptError::Generic(error) => { + ockam_core::Error::new(Origin::Transport, Kind::Io, error) + } })?; // avoid copying the first message @@ -134,9 +146,11 @@ impl PortalInterceptor for KafkaMessageInterceptorWrapper { #[derive(Debug, Clone, Encode, Decode, CborLen)] #[rustfmt::skip] /// Wraps the content within every record batch -struct MessageWrapper { - #[n(0)] consumer_decryptor_address: Address, - #[n(1)] content: Vec +pub(crate) struct KafkaEncryptedContent { + /// The secure channel identifier used to encrypt the content + #[n(0)] pub consumer_decryptor_address: Address, + /// The encrypted content + #[n(1)] pub content: Vec } /// By default, kafka supports up to 1MB messages. 16MB is the maximum suggested @@ -145,6 +159,14 @@ pub(crate) const MAX_KAFKA_MESSAGE_SIZE: u32 = 16 * 1024 * 1024; // internal error to return both io and ockam errors #[derive(Debug, thiserror::Error)] pub(crate) enum InterceptError { + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + #[error("Serde error: {0}")] + Serde(#[from] serde_json::Error), + #[error("CBOR error: {0}")] + Minicbor(#[from] minicbor::decode::Error), + #[error("{0}")] + Generic(&'static str), #[error("Unexpected kafka protocol data")] InvalidData, #[error("{0}")] @@ -156,3 +178,9 @@ impl From for InterceptError { InterceptError::InvalidData } } + +impl From<&'static str> for InterceptError { + fn from(error: &'static str) -> Self { + InterceptError::Generic(error) + } +} diff --git a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/tests.rs b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/tests.rs index 5ec2c25ce67..e0c538c951e 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/tests.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/protocol_aware/tests.rs @@ -1,7 +1,7 @@ #[cfg(test)] mod test { use crate::kafka::inlet_controller::KafkaInletController; - use crate::kafka::key_exchange::controller::KafkaKeyExchangeController; + use crate::kafka::key_exchange::controller::KafkaKeyExchangeControllerImpl; use crate::kafka::protocol_aware::inlet::InletInterceptorImpl; use crate::kafka::protocol_aware::utils::{encode_request, encode_response}; use crate::kafka::protocol_aware::{ @@ -19,6 +19,7 @@ mod test { use ockam_core::route; use ockam_multiaddr::MultiAddr; use ockam_node::Context; + use std::sync::Arc; #[allow(non_snake_case)] #[ockam_macros::test(timeout = 5_000)] @@ -57,7 +58,7 @@ mod test { Some(handle.node_manager.identifier()), ); - let secure_channel_controller = KafkaKeyExchangeController::new( + let secure_channel_controller = KafkaKeyExchangeControllerImpl::new( (*handle.node_manager).clone(), secure_channels, ConsumerResolution::None, @@ -67,10 +68,11 @@ mod test { ); let interceptor = InletInterceptorImpl::new( - secure_channel_controller, + Arc::new(secure_channel_controller), Default::default(), inlet_map, true, + vec![], ); let mut correlation_id = 0; diff --git a/implementations/rust/ockam/ockam_api/src/kafka/tests/integration_test.rs b/implementations/rust/ockam/ockam_api/src/kafka/tests/integration_test.rs index 0f683e56a01..8eb78dda42c 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/tests/integration_test.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/tests/integration_test.rs @@ -26,7 +26,7 @@ use tokio::sync::Mutex; use tokio::task::JoinHandle; use uuid::Uuid; -use crate::kafka::key_exchange::controller::KafkaKeyExchangeController; +use crate::kafka::key_exchange::controller::KafkaKeyExchangeControllerImpl; use crate::kafka::protocol_aware::inlet::KafkaInletInterceptorFactory; use crate::kafka::protocol_aware::utils::{encode_request, encode_response}; use crate::kafka::{ConsumerPublishing, ConsumerResolution, KafkaInletController}; @@ -79,7 +79,7 @@ async fn create_kafka_service( ) .await?; - let secure_channel_controller = KafkaKeyExchangeController::new( + let secure_channel_controller = KafkaKeyExchangeControllerImpl::new( (*handle.node_manager).clone(), handle.secure_channels.clone(), ConsumerResolution::ViaRelay(MultiAddr::try_from("/service/api")?), @@ -117,6 +117,7 @@ async fn create_kafka_service( secure_channel_controller, inlet_controller, true, + vec![], )), Arc::new(AllowAll), Arc::new(AllowAll), diff --git a/implementations/rust/ockam/ockam_api/src/kafka/tests/interceptor_test.rs b/implementations/rust/ockam/ockam_api/src/kafka/tests/interceptor_test.rs index 81b5dd14512..606200a9f51 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/tests/interceptor_test.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/tests/interceptor_test.rs @@ -27,7 +27,7 @@ use ockam_node::Context; use ockam_transport_tcp::{PortalInterceptorWorker, PortalMessage, MAX_PAYLOAD_SIZE}; use crate::kafka::inlet_controller::KafkaInletController; -use crate::kafka::key_exchange::controller::KafkaKeyExchangeController; +use crate::kafka::key_exchange::controller::KafkaKeyExchangeControllerImpl; use crate::kafka::protocol_aware::inlet::InletInterceptorImpl; use crate::kafka::protocol_aware::KafkaMessageInterceptorWrapper; use crate::kafka::protocol_aware::MAX_KAFKA_MESSAGE_SIZE; @@ -318,7 +318,7 @@ async fn setup_only_worker(context: &mut Context, handle: &NodeManagerHandle) -> Some(authority_identifier.clone()), ); - let secure_channel_controller = KafkaKeyExchangeController::new( + let secure_channel_controller = KafkaKeyExchangeControllerImpl::new( (*handle.node_manager).clone(), secure_channels, ConsumerResolution::ViaRelay(MultiAddr::default()), @@ -335,10 +335,11 @@ async fn setup_only_worker(context: &mut Context, handle: &NodeManagerHandle) -> Arc::new(AllowAll), Arc::new(KafkaMessageInterceptorWrapper::new( Arc::new(InletInterceptorImpl::new( - secure_channel_controller, + Arc::new(secure_channel_controller), Default::default(), inlet_map, true, + vec![], )), TEST_MAX_KAFKA_MESSAGE_SIZE, )), @@ -408,7 +409,7 @@ async fn kafka_portal_worker__metadata_exchange__response_changed( ) .await?; - let secure_channel_controller = KafkaKeyExchangeController::new( + let secure_channel_controller = KafkaKeyExchangeControllerImpl::new( (*handle.node_manager).clone(), handle.secure_channels.clone(), ConsumerResolution::ViaRelay(MultiAddr::default()), @@ -435,10 +436,11 @@ async fn kafka_portal_worker__metadata_exchange__response_changed( Arc::new(AllowAll), Arc::new(KafkaMessageInterceptorWrapper::new( Arc::new(InletInterceptorImpl::new( - secure_channel_controller, + Arc::new(secure_channel_controller), Default::default(), inlet_map.clone(), true, + vec![], )), MAX_KAFKA_MESSAGE_SIZE, )), diff --git a/implementations/rust/ockam/ockam_api/src/nodes/models/services.rs b/implementations/rust/ockam/ockam_api/src/nodes/models/services.rs index f5d04c4b0ca..5f106d68d88 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/models/services.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/models/services.rs @@ -101,6 +101,7 @@ pub struct StartKafkaInletRequest { #[n(7)] inlet_policy_expression: Option, #[n(8)] consumer_policy_expression: Option, #[n(9)] producer_policy_expression: Option, + #[n(10)] encrypted_fields: Vec, } impl StartKafkaInletRequest { @@ -110,6 +111,7 @@ impl StartKafkaInletRequest { brokers_port_range: impl Into<(u16, u16)>, kafka_outlet_route: MultiAddr, encrypt_content: bool, + encrypted_fields: Vec, consumer_resolution: ConsumerResolution, consumer_publishing: ConsumerPublishing, inlet_policy_expression: Option, @@ -126,6 +128,7 @@ impl StartKafkaInletRequest { inlet_policy_expression, consumer_policy_expression, producer_policy_expression, + encrypted_fields, } } @@ -143,6 +146,10 @@ impl StartKafkaInletRequest { self.encrypt_content } + pub fn encrypted_fields(&self) -> Vec { + self.encrypted_fields.clone() + } + pub fn consumer_resolution(&self) -> ConsumerResolution { self.consumer_resolution.clone() } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/kafka_services.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/kafka_services.rs index 5821a15d922..26b5128b191 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/kafka_services.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/kafka_services.rs @@ -1,6 +1,6 @@ use super::NodeManagerWorker; use crate::error::ApiError; -use crate::kafka::key_exchange::controller::KafkaKeyExchangeController; +use crate::kafka::key_exchange::controller::KafkaKeyExchangeControllerImpl; use crate::kafka::protocol_aware::inlet::KafkaInletInterceptorFactory; use crate::kafka::protocol_aware::outlet::KafkaOutletInterceptorFactory; use crate::kafka::KafkaOutletController; @@ -45,6 +45,7 @@ impl NodeManagerWorker { request.brokers_port_range(), request.project_route(), request.encrypt_content(), + request.encrypted_fields(), request.consumer_resolution(), request.consumer_publishing(), request.inlet_policy_expression(), @@ -117,6 +118,7 @@ impl InMemoryNode { brokers_port_range: (u16, u16), outlet_node_multiaddr: MultiAddr, encrypt_content: bool, + encrypted_fields: Vec, consumer_resolution: ConsumerResolution, consumer_publishing: ConsumerPublishing, inlet_policy_expression: Option, @@ -147,7 +149,7 @@ impl InMemoryNode { ) .await?; - let secure_channel_controller = KafkaKeyExchangeController::new( + let secure_channel_controller = KafkaKeyExchangeControllerImpl::new( self.node_manager.clone(), self.secure_channels.clone(), consumer_resolution, @@ -233,6 +235,7 @@ impl InMemoryNode { secure_channel_controller, inlet_controller, encrypt_content, + encrypted_fields, )), Arc::new(policy_access_control.create_incoming()), Arc::new(policy_access_control.create_outgoing(context).await?), diff --git a/implementations/rust/ockam/ockam_command/src/kafka/consumer/create.rs b/implementations/rust/ockam/ockam_command/src/kafka/consumer/create.rs index 8db5363000b..15574244871 100644 --- a/implementations/rust/ockam/ockam_command/src/kafka/consumer/create.rs +++ b/implementations/rust/ockam/ockam_command/src/kafka/consumer/create.rs @@ -49,6 +49,7 @@ impl CreateCommand { publishing_relay: None, avoid_publishing: false, disable_content_encryption: false, + encrypted_fields: vec![], inlet_policy_expression: None, consumer_policy_expression: None, producer_policy_expression: None, diff --git a/implementations/rust/ockam/ockam_command/src/kafka/inlet/create.rs b/implementations/rust/ockam/ockam_command/src/kafka/inlet/create.rs index 01f84e49465..64dfa8797e4 100644 --- a/implementations/rust/ockam/ockam_command/src/kafka/inlet/create.rs +++ b/implementations/rust/ockam/ockam_command/src/kafka/inlet/create.rs @@ -92,6 +92,11 @@ pub struct CreateCommand { )] pub disable_content_encryption: bool, + /// The fields to encrypt in the kafka messages, assuming the record is a valid JSON map. + /// By default, the whole record is encrypted. + #[arg(long, long = "encrypted-field", value_name = "FIELD")] + pub encrypted_fields: Vec, + /// Policy expression that will be used for access control to the Kafka Inlet. /// If you don't provide it, the policy set for the "tcp-inlet" resource type will be used. /// @@ -177,6 +182,7 @@ impl Command for CreateCommand { brokers_port_range, to.clone(), !self.disable_content_encryption, + self.encrypted_fields, consumer_resolution, consumer_publishing, self.inlet_policy_expression, diff --git a/implementations/rust/ockam/ockam_command/src/kafka/producer/create.rs b/implementations/rust/ockam/ockam_command/src/kafka/producer/create.rs index d716a6c18d0..22ee2355542 100644 --- a/implementations/rust/ockam/ockam_command/src/kafka/producer/create.rs +++ b/implementations/rust/ockam/ockam_command/src/kafka/producer/create.rs @@ -48,6 +48,7 @@ impl CreateCommand { publishing_relay: None, avoid_publishing: false, disable_content_encryption: false, + encrypted_fields: vec![], inlet_policy_expression: None, consumer_policy_expression: None, producer_policy_expression: None, diff --git a/implementations/rust/ockam/ockam_command/tests/bats/kafka/docker.bats b/implementations/rust/ockam/ockam_command/tests/bats/kafka/docker.bats index 2fd3de2ff2f..80aeaa6d7be 100644 --- a/implementations/rust/ockam/ockam_command/tests/bats/kafka/docker.bats +++ b/implementations/rust/ockam/ockam_command/tests/bats/kafka/docker.bats @@ -462,3 +462,94 @@ kafka_docker_end_to_end_encrypted_offset_decryption() { start_kafka kafka_docker_end_to_end_encrypted_offset_decryption } + +kafka_docker_encrypt_only_two_fields() { + # Admin + export ADMIN_HOME="$OCKAM_HOME" + export OCKAM_LOGGING=1 + export OCKAM_LOG_LEVEL=info + + export CONSUMER_OUTPUT="$ADMIN_HOME/consumer.log" + + # create a kafka outlet and inlet with direct connection to the kafka instance + run_success "$OCKAM" kafka-outlet create --bootstrap-server 127.0.0.1:19092 + run_success "$OCKAM" kafka-inlet create --from 29092 \ + --encrypted-field encrypted_field_one \ + --encrypted-field encrypted_field_two \ + --avoid-publishing \ + --to self \ + --consumer self + + run kafka-topics --bootstrap-server localhost:29092 --delete --topic demo || true + sleep 5 + run_success kafka-topics --bootstrap-server localhost:29092 --create --topic demo --partitions 1 --replication-factor 1 + + # we push different records in the same topic + # ockam is expected to encrypt only the fields encrypted_field_one and encrypted_field_two + sleep 5 + RECORDS=( + '{"encrypted_field_one":"value1","encrypted_field_two":"value2","field_three":"value3"}' + '{"encrypted_field_one":{"key": "value"},"encrypted_field_two":["hello","world"]}' + ) + for record in "${RECORDS[@]}"; do echo $record; done | kafka-console-producer --topic demo --bootstrap-server localhost:29092 --max-block-ms 30000 + sleep 5 + + # connect directly to the broker to get the "raw" records + # the fields encrypted_field_one and encrypted_field_two should be encrypted + kafka-console-consumer --topic demo \ + --bootstrap-server localhost:19092 \ + --partition 0 \ + --offset 0 \ + --max-messages 1 --timeout-ms 30000 >"$CONSUMER_OUTPUT" + + run bash -c "cat \"\$CONSUMER_OUTPUT\" | jq -r '.encrypted_field_one'" + refute_output "value1" + + run bash -c "cat \"\$CONSUMER_OUTPUT\" | jq -r '.encrypted_field_two'" + refute_output "value2" + + run bash -c "cat \"\$CONSUMER_OUTPUT\" | jq -r '.field_three'" + assert_output "value3" + + # connect to the ockam kafka inlet to get the first record + # the fields encrypted_field_one and encrypted_field_two should be decrypted + kafka-console-consumer --topic demo \ + --bootstrap-server localhost:29092 \ + --partition 0 \ + --offset 0 \ + --max-messages 1 --timeout-ms 30000 >"$CONSUMER_OUTPUT" + + run bash -c "cat \"\$CONSUMER_OUTPUT\" | jq -r '.encrypted_field_one'" + assert_output "value1" + + run bash -c "cat \"\$CONSUMER_OUTPUT\" | jq -r '.encrypted_field_two'" + assert_output "value2" + + run bash -c "cat \"\$CONSUMER_OUTPUT\" | jq -r '.field_three'" + assert_output "value3" + + # same, for the second record + kafka-console-consumer --topic demo \ + --bootstrap-server localhost:29092 \ + --partition 0 \ + --offset 1 \ + --max-messages 1 --timeout-ms 30000 >"$CONSUMER_OUTPUT" + + run bash -c "cat \"\$CONSUMER_OUTPUT\" | jq -r '.encrypted_field_one.key'" + assert_output "value" + + run bash -c "cat \"\$CONSUMER_OUTPUT\" | jq -r '.encrypted_field_two[0]'" + assert_output "hello" +} + +@test "kafka - docker - encrypt only two fields - redpanda" { + export KAFKA_COMPOSE_FILE="redpanda-docker-compose.yaml" + start_kafka + kafka_docker_encrypt_only_two_fields +} + +@test "kafka - docker - encrypt only two fields - apache" { + export KAFKA_COMPOSE_FILE="apache-docker-compose.yaml" + start_kafka + kafka_docker_encrypt_only_two_fields +} diff --git a/implementations/rust/ockam/ockam_core/src/routing/route.rs b/implementations/rust/ockam/ockam_core/src/routing/route.rs index cc961e384f0..88840b5bca5 100644 --- a/implementations/rust/ockam/ockam_core/src/routing/route.rs +++ b/implementations/rust/ockam/ockam_core/src/routing/route.rs @@ -259,6 +259,12 @@ impl Route { } } +impl Default for Route { + fn default() -> Self { + Route::new().into() + } +} + impl Display for Route { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(