Commit

feat(rust): added the possibility to encrypt specific fields in a kafka `JSON` record
davide-baldo committed Aug 14, 2024
1 parent 858567c commit 330b9ee
Showing 20 changed files with 839 additions and 113 deletions.
31 changes: 26 additions & 5 deletions implementations/rust/ockam/ockam_api/src/kafka/inlet_controller.rs
@@ -1,5 +1,3 @@
use std::fmt::Debug;

use crate::kafka::kafka_outlet_address;
use crate::nodes::NodeManager;
use crate::port_range::PortRange;
@@ -14,6 +12,8 @@ use ockam_core::{Result, Route};
use ockam_multiaddr::MultiAddr;
use ockam_node::Context;
use ockam_transport_core::HostnamePort;
use std::fmt::Debug;
use std::sync::Weak;

type BrokerId = i32;

@@ -24,7 +24,7 @@ type BrokerId = i32;
pub(crate) struct KafkaInletController {
inner: Arc<Mutex<KafkaInletMapInner>>,
policy_expression: Option<PolicyExpression>,
node_manager: Arc<NodeManager>,
node_manager: Weak<NodeManager>,
}

impl Debug for KafkaInletController {
@@ -68,11 +68,28 @@ impl KafkaInletController {
local_interceptor_route,
remote_interceptor_route,
})),
node_manager,
node_manager: Arc::downgrade(&node_manager),
policy_expression,
}
}

#[cfg(test)]
pub(crate) fn stub() -> KafkaInletController {
Self {
inner: Arc::new(Mutex::new(KafkaInletMapInner {
outlet_node_multiaddr: Default::default(),
broker_map: Default::default(),
current_port: Default::default(),
port_range: PortRange::new(0, 0).unwrap(),
bind_hostname: Default::default(),
local_interceptor_route: Default::default(),
remote_interceptor_route: Default::default(),
})),
node_manager: Weak::new(),
policy_expression: Default::default(),
}
}

#[cfg(test)]
pub(crate) async fn retrieve_inlet(&self, broker_id: BrokerId) -> Option<HostnamePort> {
let inner = self.inner.lock().await;
@@ -103,7 +120,11 @@ impl KafkaInletController {
let inlet_bind_address =
HostnamePort::new(inner.bind_hostname.clone(), inner.current_port);

self.node_manager
let node_manager = self.node_manager.upgrade().ok_or_else(|| {
Error::new(Origin::Node, Kind::Internal, "node manager was shut down")
})?;

node_manager
.create_inlet(
context,
inlet_bind_address.clone(),
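The main change in this file is swapping the `Arc<NodeManager>` field for a `Weak<NodeManager>`: the inlet controller no longer keeps the node manager alive, it upgrades the weak reference on each use and fails once the manager has been shut down. Below is a minimal sketch of that pattern using only `std` types; the `Manager` and `Controller` names are illustrative stand-ins, not types from this repository.

```rust
use std::sync::{Arc, Weak};

// Stand-in for NodeManager; the real type lives in ockam_api.
struct Manager {
    name: String,
}

// Holds only a weak reference, mirroring the field change in this diff:
// the controller no longer keeps the manager alive on its own.
struct Controller {
    manager: Weak<Manager>,
}

impl Controller {
    fn new(manager: &Arc<Manager>) -> Self {
        Self {
            manager: Arc::downgrade(manager),
        }
    }

    // Upgrade on every use; report an error once the manager is gone.
    fn manager_name(&self) -> Result<String, String> {
        let manager = self
            .manager
            .upgrade()
            .ok_or_else(|| "node manager was shut down".to_string())?;
        Ok(manager.name.clone())
    }
}

fn main() {
    let manager = Arc::new(Manager { name: "node-1".into() });
    let controller = Controller::new(&manager);
    assert_eq!(controller.manager_name().unwrap(), "node-1");

    // Once the last strong reference is dropped, upgrade() returns None
    // and the controller surfaces an error instead of a dangling handle.
    drop(manager);
    assert!(controller.manager_name().is_err());
}
```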
@@ -1,4 +1,5 @@
use crate::kafka::key_exchange::{KafkaEncryptedContent, TopicPartition};
use crate::kafka::key_exchange::{KafkaKeyExchangeController, TopicPartition};
use crate::kafka::protocol_aware::KafkaEncryptedContent;
use crate::kafka::{ConsumerPublishing, ConsumerResolution};
use crate::nodes::NodeManager;
use ockam::identity::{
@@ -7,36 +8,27 @@ use ockam::identity::{
};
use ockam_abac::PolicyAccessControl;
use ockam_core::compat::collections::{HashMap, HashSet};
use ockam_core::{route, Address};
use ockam_core::{async_trait, route, Address};
use ockam_node::Context;
use std::sync::Arc;
use tokio::sync::Mutex;

#[derive(Clone)]
pub(crate) struct KafkaKeyExchangeController {
pub(crate) inner: Arc<Mutex<InnerSecureChannelControllerImpl>>,
pub(crate) struct KafkaKeyExchangeControllerImpl {
pub(crate) inner: Arc<Mutex<InnerSecureChannelController>>,
}

/// Offer simple APIs to encrypt and decrypt kafka messages.
/// Underneath it creates secure channels for each topic/partition
/// and uses them to encrypt the content.
/// Multiple secure channels may be created for the same topic/partition
/// but each will be explicitly labeled.
impl KafkaKeyExchangeController {
/// Encrypts the content specifically for the consumer waiting for that topic name and
/// partition.
/// To do so it'll create a secure channel which will be used for key exchange only.
/// The secure channel will be created only once and then re-used, hence the first time will
/// be slower, and may take up to few seconds.
pub async fn encrypt_content(
#[async_trait]
impl KafkaKeyExchangeController for KafkaKeyExchangeControllerImpl {
async fn encrypt_content(
&self,
context: &mut Context,
topic_name: &str,
partition_id: i32,
partition_index: i32,
content: Vec<u8>,
) -> ockam_core::Result<KafkaEncryptedContent> {
let secure_channel_entry = self
.get_or_create_secure_channel(context, topic_name, partition_id)
.get_or_create_secure_channel(context, topic_name, partition_index)
.await?;

let consumer_decryptor_address = secure_channel_entry.their_decryptor_address();
@@ -64,9 +56,7 @@ impl KafkaKeyExchangeController {
})
}

/// Decrypts the content based on the consumer decryptor address
/// the secure channel is expected to be already initialized.
pub async fn decrypt_content(
async fn decrypt_content(
&self,
context: &mut Context,
consumer_decryptor_address: &Address,
@@ -97,10 +87,7 @@ impl KafkaKeyExchangeController {
Ok(decrypted_content)
}

/// Starts relays in the orchestrator for each {topic_name}_{partition} combination
/// should be used only by the consumer.
/// does nothing if they were already created, but fails it they already exist.
pub async fn publish_consumer(
async fn publish_consumer(
&self,
context: &mut Context,
topic_name: &str,
@@ -138,7 +125,7 @@
}
}

pub struct InnerSecureChannelControllerImpl {
pub struct InnerSecureChannelController {
// we identify the secure channel instance by using the decryptor address of the consumer
// which is known to both parties
pub(crate) topic_encryptor_map: HashMap<TopicPartition, Address>,
@@ -157,17 +144,17 @@ pub struct InnerSecureChannelControllerImpl {
pub(crate) producer_policy_access_control: PolicyAccessControl,
}

impl KafkaKeyExchangeController {
impl KafkaKeyExchangeControllerImpl {
pub(crate) fn new(
node_manager: Arc<NodeManager>,
secure_channels: Arc<SecureChannels>,
consumer_resolution: ConsumerResolution,
consumer_publishing: ConsumerPublishing,
consumer_policy_access_control: PolicyAccessControl,
producer_policy_access_control: PolicyAccessControl,
) -> KafkaKeyExchangeController {
) -> KafkaKeyExchangeControllerImpl {
Self {
inner: Arc::new(Mutex::new(InnerSecureChannelControllerImpl {
inner: Arc::new(Mutex::new(InnerSecureChannelController {
topic_encryptor_map: Default::default(),
identity_encryptor_map: Default::default(),
topic_relay_set: Default::default(),
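The inner controller state keeps a `HashMap<TopicPartition, Address>` (`topic_encryptor_map`), so the secure channel created for a given `(topic, partition)` pair is reused for later records; per the doc comments, only the first message pays the key-exchange cost. A rough sketch of that get-or-create shape, with a plain `String` standing in for the channel `Address`; all names here are illustrative, not the repository's implementation.

```rust
use std::collections::HashMap;

// Same shape as the alias in key_exchange/mod.rs.
type TopicPartition = (String, i32);

struct ChannelCache {
    // Mirrors topic_encryptor_map: one channel per topic/partition pair.
    channels: HashMap<TopicPartition, String>,
}

impl ChannelCache {
    fn new() -> Self {
        Self {
            channels: HashMap::new(),
        }
    }

    // Return the existing channel address, or create one the first time.
    // In the real controller this is where the secure-channel handshake
    // happens, which is why the first message for a pair is slower.
    fn get_or_create(&mut self, topic: &str, partition: i32) -> String {
        self.channels
            .entry((topic.to_string(), partition))
            .or_insert_with(|| format!("channel-for-{topic}-{partition}"))
            .clone()
    }
}

fn main() {
    let mut cache = ChannelCache::new();
    let first = cache.get_or_create("payments", 0);
    let second = cache.get_or_create("payments", 0);
    assert_eq!(first, second); // reused, not recreated
    assert_ne!(first, cache.get_or_create("payments", 1)); // new pair, new channel
}
```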
52 changes: 43 additions & 9 deletions implementations/rust/ockam/ockam_api/src/kafka/key_exchange/mod.rs
@@ -1,18 +1,12 @@
use crate::kafka::protocol_aware::KafkaEncryptedContent;
use minicbor::{CborLen, Decode, Encode};

use ockam_core::Address;
use ockam_core::{async_trait, Address};
use ockam_multiaddr::MultiAddr;
use ockam_node::Context;

pub(crate) mod controller;
mod secure_channels;

pub(crate) struct KafkaEncryptedContent {
/// The encrypted content
pub(crate) content: Vec<u8>,
/// The secure channel identifier used to encrypt the content
pub(crate) consumer_decryptor_address: Address,
}

/// Describe how to reach the consumer node: either directly or through a relay
#[derive(Debug, Clone, Encode, Decode, CborLen)]
#[rustfmt::skip]
@@ -32,3 +26,43 @@ pub enum ConsumerPublishing {
}

type TopicPartition = (String, i32);

/// Offer simple APIs to encrypt and decrypt kafka messages.
/// Underneath it creates secure channels for each topic/partition
/// and uses them to encrypt the content.
/// Multiple secure channels may be created for the same topic/partition
/// but each will be explicitly labeled.
#[async_trait]
pub(crate) trait KafkaKeyExchangeController: Send + Sync + 'static {
/// Encrypts the content specifically for the consumer waiting for that topic name and
/// partition.
/// To do so, it'll create a secure channel which will be used for key exchange only.
/// The secure channel will be created only once and then re-used, hence the first time will
/// be slower, and may take up to a few seconds.
async fn encrypt_content(
&self,
context: &mut Context,
topic_name: &str,
partition_index: i32,
content: Vec<u8>,
) -> ockam_core::Result<KafkaEncryptedContent>;

/// Decrypts the content based on the consumer decryptor address;
/// the secure channel is expected to be already initialized.
async fn decrypt_content(
&self,
context: &mut Context,
consumer_decryptor_address: &Address,
encrypted_content: Vec<u8>,
) -> ockam_core::Result<Vec<u8>>;

/// Starts relays in the orchestrator for each {topic_name}_{partition} combination;
/// should be used only by the consumer.
/// Does nothing if they were already created, but fails if they already exist.
async fn publish_consumer(
&self,
context: &mut Context,
topic_name: &str,
partitions: Vec<i32>,
) -> ockam_core::Result<()>;
}
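Extracting `KafkaKeyExchangeController` into an `#[async_trait]` trait lets callers depend on `Arc<dyn KafkaKeyExchangeController>` instead of the concrete controller, so the interceptor can be tested against a stub. A reduced sketch of that pattern, assuming the `async-trait` and `tokio` crates; the `KeyExchange` and `NoopKeyExchange` names are made up for the example, and the real trait also takes a `Context`, topic name and partition.

```rust
use async_trait::async_trait;
use std::sync::Arc;

// A reduced stand-in for the trait defined above; the real one returns
// ockam_core::Result and performs a key exchange over a secure channel.
#[async_trait]
trait KeyExchange: Send + Sync + 'static {
    async fn encrypt(&self, content: Vec<u8>) -> Vec<u8>;
}

// A do-nothing implementation, similar in spirit to a test stub.
struct NoopKeyExchange;

#[async_trait]
impl KeyExchange for NoopKeyExchange {
    async fn encrypt(&self, content: Vec<u8>) -> Vec<u8> {
        content
    }
}

#[tokio::main]
async fn main() {
    // Because the trait is object safe, callers can hold it behind a trait
    // object, which is what the inlet interceptor now does with
    // Arc<dyn KafkaKeyExchangeController>.
    let exchange: Arc<dyn KeyExchange> = Arc::new(NoopKeyExchange);
    let out = exchange.encrypt(b"hello".to_vec()).await;
    assert_eq!(out, b"hello".to_vec());
}
```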
@@ -1,5 +1,5 @@
use crate::kafka::key_exchange::controller::{
InnerSecureChannelControllerImpl, KafkaKeyExchangeController,
InnerSecureChannelController, KafkaKeyExchangeControllerImpl,
};
use crate::kafka::ConsumerResolution;
use crate::nodes::service::SecureChannelType;
@@ -12,10 +12,10 @@ use ockam_multiaddr::MultiAddr;
use ockam_node::Context;
use tokio::sync::MutexGuard;

impl KafkaKeyExchangeController {
impl KafkaKeyExchangeControllerImpl {
/// Creates a secure channel for the given destination.
async fn create_secure_channel(
inner: &MutexGuard<'_, InnerSecureChannelControllerImpl>,
inner: &MutexGuard<'_, InnerSecureChannelController>,
context: &Context,
mut destination: MultiAddr,
) -> Result<Address> {
@@ -37,7 +37,7 @@

/// Creates a secure channel for the given destination, for key exchange only.
async fn create_key_exchange_only_secure_channel(
inner: &MutexGuard<'_, InnerSecureChannelControllerImpl>,
inner: &MutexGuard<'_, InnerSecureChannelController>,
context: &Context,
mut destination: MultiAddr,
) -> Result<Address> {
@@ -180,7 +180,7 @@ impl KafkaKeyExchangeController {
}

async fn validate_consumer_credentials(
inner: &MutexGuard<'_, InnerSecureChannelControllerImpl>,
inner: &MutexGuard<'_, InnerSecureChannelController>,
entry: &SecureChannelRegistryEntry,
) -> Result<()> {
let authorized = inner
@@ -1,4 +1,5 @@
use crate::kafka::key_exchange::controller::KafkaKeyExchangeController;
use crate::kafka::key_exchange::controller::KafkaKeyExchangeControllerImpl;
use crate::kafka::key_exchange::KafkaKeyExchangeController;
use crate::kafka::protocol_aware::{
CorrelationId, KafkaMessageInterceptor, KafkaMessageInterceptorWrapper, RequestInfo,
TopicUuidMap, MAX_KAFKA_MESSAGE_SIZE,
@@ -12,53 +13,78 @@ use std::sync::{Arc, Mutex};
mod request;
mod response;

#[cfg(test)]
mod tests;

#[derive(Clone)]
pub(crate) struct InletInterceptorImpl {
request_map: Arc<Mutex<HashMap<CorrelationId, RequestInfo>>>,
uuid_to_name: TopicUuidMap,
key_exchange_controller: KafkaKeyExchangeController,
key_exchange_controller: Arc<dyn KafkaKeyExchangeController>,
inlet_map: KafkaInletController,
encrypt_content: bool,
encrypted_fields: Vec<String>,
}

#[async_trait]
impl KafkaMessageInterceptor for InletInterceptorImpl {}

impl InletInterceptorImpl {
pub(crate) fn new(
key_exchange_controller: KafkaKeyExchangeController,
key_exchange_controller: Arc<dyn KafkaKeyExchangeController>,
uuid_to_name: TopicUuidMap,
inlet_map: KafkaInletController,
encrypt_content: bool,
encrypted_fields: Vec<String>,
) -> InletInterceptorImpl {
Self {
request_map: Arc::new(Mutex::new(Default::default())),
uuid_to_name,
key_exchange_controller,
inlet_map,
encrypt_content,
encrypted_fields,
}
}

#[cfg(test)]
pub(crate) fn add_request(
&self,
correlation_id: CorrelationId,
api_key: kafka_protocol::messages::ApiKey,
api_version: i16,
) {
self.request_map.lock().unwrap().insert(
correlation_id,
RequestInfo {
request_api_key: api_key,
request_api_version: api_version,
},
);
}
}

pub(crate) struct KafkaInletInterceptorFactory {
secure_channel_controller: KafkaKeyExchangeController,
secure_channel_controller: KafkaKeyExchangeControllerImpl,
uuid_to_name: TopicUuidMap,
inlet_map: KafkaInletController,
encrypt_content: bool,
encrypted_fields: Vec<String>,
}

impl KafkaInletInterceptorFactory {
pub(crate) fn new(
secure_channel_controller: KafkaKeyExchangeController,
secure_channel_controller: KafkaKeyExchangeControllerImpl,
inlet_map: KafkaInletController,
encrypt_content: bool,
encrypted_fields: Vec<String>,
) -> Self {
Self {
secure_channel_controller,
uuid_to_name: Default::default(),
inlet_map,
encrypt_content,
encrypted_fields,
}
}
}
Expand All @@ -67,10 +93,11 @@ impl PortalInterceptorFactory for KafkaInletInterceptorFactory {
fn create(&self) -> Arc<dyn PortalInterceptor> {
Arc::new(KafkaMessageInterceptorWrapper::new(
Arc::new(InletInterceptorImpl::new(
self.secure_channel_controller.clone(),
Arc::new(self.secure_channel_controller.clone()),
self.uuid_to_name.clone(),
self.inlet_map.clone(),
self.encrypt_content,
self.encrypted_fields.clone(),
)),
MAX_KAFKA_MESSAGE_SIZE,
))
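The interceptor and its factory now carry an `encrypted_fields: Vec<String>` list which, per the commit title, selects which fields of a Kafka `JSON` record get encrypted; the request/response code that applies it is not part of the files shown above. A rough sketch of the idea using `serde_json`, with a stand-in `encrypt` function in place of the real secure-channel encryption; everything here is illustrative, not the repository's implementation.

```rust
use serde_json::{json, Value};

// Stand-in for the real encryption, which in Ockam goes through the key
// exchange controller and a secure channel; here we only tag the value
// so the data flow is visible.
fn encrypt(plaintext: &str) -> String {
    format!("ENC({plaintext})")
}

// Encrypt only the listed top-level fields of a JSON record and leave the
// rest readable.
fn encrypt_fields(mut record: Value, encrypted_fields: &[String]) -> Value {
    if let Value::Object(map) = &mut record {
        for field in encrypted_fields {
            if let Some(value) = map.get_mut(field) {
                let plaintext = value.to_string();
                *value = Value::String(encrypt(&plaintext));
            }
        }
    }
    record
}

fn main() {
    let record = json!({ "card_number": "4111-1111-1111-1111", "amount": 42 });
    let encrypted_fields = vec!["card_number".to_string()];
    let result = encrypt_fields(record, &encrypted_fields);

    assert_eq!(result["amount"], 42);
    assert!(result["card_number"].as_str().unwrap().starts_with("ENC("));
    println!("{result}");
}
```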