Skip to content

Commit

Permalink
feat(rust): switching key negotiation to a double secure channel
Browse files Browse the repository at this point in the history
  • Loading branch information
davide-baldo committed Dec 27, 2024
1 parent c85eb64 commit 1e28ec9
Show file tree
Hide file tree
Showing 21 changed files with 311 additions and 122 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion implementations/rust/ockam/ockam_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ tracing-error = "0.2.0"
tracing-opentelemetry = "0.27.0"
tracing-subscriber = { version = "0.3", features = ["json"] }
url = "2.5.2"
zeroize = { version = "1.8.1", features = ["zeroize_derive"] }

ockam_multiaddr = { path = "../ockam_multiaddr", version = "0.66.0", features = ["cbor", "serde"] }
ockam_transport_core = { path = "../ockam_transport_core", version = "^0.99.0" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ mod test {
pub fn rekey_rotation() -> ockam_core::Result<()> {
let runtime = Arc::new(Runtime::new().unwrap());
let runtime_cloned = runtime.clone();
std::env::set_var("OCKAM_LOGGING", "false");
std::env::set_var("OCKAM_LOGGING", "true");
std::env::set_var("OCKAM_LOG_LEVEL", "debug");

runtime_cloned.block_on(async move {
let test_body = async move {
Expand Down Expand Up @@ -297,24 +298,34 @@ mod test {
.get_flow_control_with_spawner(&DefaultAddress::SECURE_CHANNEL_LISTENER.into())
.unwrap();

let test_clock = TestClock::new(0);

KafkaKeyExchangeListener::create(
&consumer_node.context,
consumer_node
.node_manager
.secure_channels
.vault()
.encryption_at_rest_vault,
Duration::from_secs(60),
Duration::from_secs(60),
Duration::from_secs(60),
consumer_node
.node_manager
.secure_channels
.vault()
.secure_channel_vault,
consumer_node
.node_manager
.secure_channels
.secure_channel_registry(),
Duration::from_secs(5 * 60), //rotation
Duration::from_secs(10 * 60), //validity
Duration::from_secs(60), //rekey
&consumer_secure_channel_listener_flow_control_id,
AllowAll,
AllowAll,
test_clock.clone(),
)
.await?;

let test_clock = TestClock::new(0);

let destination = consumer_node.listen_address().await.multi_addr().unwrap();
let producer_secure_channel_controller = create_secure_channel_controller(
test_clock.clone(),
Expand Down Expand Up @@ -355,7 +366,10 @@ mod test {
.await?;

assert_eq!(third_key.rekey_counter, 1);
assert_eq!(first_key.secret_key_handle, third_key.secret_key_handle);
assert_eq!(
first_key.key_identifier_for_consumer,
third_key.key_identifier_for_consumer
);

// 04:00 - yet another rekey should happen, but no rotation
test_clock.add_seconds(60 * 3);
Expand All @@ -365,7 +379,10 @@ mod test {
.await?;

assert_eq!(fourth_key.rekey_counter, 2);
assert_eq!(first_key.secret_key_handle, fourth_key.secret_key_handle);
assert_eq!(
first_key.key_identifier_for_consumer,
fourth_key.key_identifier_for_consumer
);

// 05:00 - the default duration of the key is 10 minutes,
// but the rotation should happen after 5 minutes
Expand All @@ -375,7 +392,10 @@ mod test {
.get_or_exchange_key(&mut producer_node.context, "topic_name")
.await?;

assert_ne!(third_key.secret_key_handle, fifth_key.secret_key_handle);
assert_ne!(
third_key.key_identifier_for_consumer,
fifth_key.key_identifier_for_consumer
);
assert_eq!(fifth_key.rekey_counter, 0);

// Now let's simulate a failure to rekey by shutting down the consumer
Expand All @@ -389,7 +409,10 @@ mod test {
.await?;

assert_eq!(sixth_key.rekey_counter, 1);
assert_eq!(fifth_key.secret_key_handle, sixth_key.secret_key_handle);
assert_eq!(
fifth_key.key_identifier_for_consumer,
sixth_key.key_identifier_for_consumer
);

// 10:00 - Rotation fails, but the existing key is still valid
// and needs to be rekeyed
Expand All @@ -400,7 +423,10 @@ mod test {
.await?;

assert_eq!(seventh_key.rekey_counter, 2);
assert_eq!(fifth_key.secret_key_handle, seventh_key.secret_key_handle);
assert_eq!(
fifth_key.key_identifier_for_consumer,
seventh_key.key_identifier_for_consumer
);

// 15:00 - Rotation fails, and the existing key is no longer valid
test_clock.add_seconds(60 * 5);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,37 +1,42 @@
use crate::DefaultAddress;
use minicbor::{CborLen, Decode, Encode};
use ockam::identity::TimestampInSeconds;
use ockam::identity::{
SecureChannelApiRequest, SecureChannelApiResponse, SecureChannelRegistry, TimestampInSeconds,
};
use ockam_core::compat::clock::Clock;
use ockam_core::flow_control::FlowControlId;
use ockam_core::{
async_trait, Address, Decodable, Encodable, Encoded, IncomingAccessControl, Message,
async_trait, route, Address, Decodable, Encodable, Encoded, IncomingAccessControl, Message,
OutgoingAccessControl, Routed, Worker,
};
use ockam_node::{Context, WorkerBuilder};
use ockam_vault::VaultForEncryptionAtRest;
use rand::Rng;
use ockam_vault::{VaultForEncryptionAtRest, VaultForSecureChannels};
use std::sync::Arc;
use std::time::Duration;

pub(crate) struct KafkaKeyExchangeListener {
encryption_at_rest: Arc<dyn VaultForEncryptionAtRest>,
secure_channel_vault: Arc<dyn VaultForSecureChannels>,
secure_channel_registry: SecureChannelRegistry,
rekey_period: Duration,
key_validity: Duration,
key_rotation: Duration,
clock: Box<dyn Clock>,
}

#[derive(Debug, CborLen, Encode, Decode)]
#[rustfmt::skip]
pub(crate) struct KeyExchangeRequest {
#[n(1)] pub local_decryptor_address: Address,
}

#[derive(Debug, CborLen, Encode, Decode)]
#[rustfmt::skip]
pub(crate) struct KeyExchangeResponse {
#[n(0)] pub key_identifier_for_consumer: Vec<u8>,
#[n(1)] pub secret_key: [u8; 32],
#[n(2)] pub valid_until: TimestampInSeconds,
#[n(3)] pub rotate_after: TimestampInSeconds,
#[n(4)] pub rekey_period: Duration,
#[n(1)] pub valid_until: TimestampInSeconds,
#[n(2)] pub rotate_after: TimestampInSeconds,
#[n(3)] pub rekey_period: Duration,
}

impl Encodable for KeyExchangeRequest {
Expand Down Expand Up @@ -70,14 +75,43 @@ impl Worker for KafkaKeyExchangeListener {
context: &mut Self::Context,
message: Routed<Self::Message>,
) -> ockam_core::Result<()> {
let mut secret_key = [0u8; 32];
rand::thread_rng().fill(&mut secret_key[..]);
let handle = self
.encryption_at_rest
.import_aead_key(secret_key.to_vec())
.await?;
let request: KeyExchangeRequest = minicbor::decode(message.payload())?;
let local_decryptor = Address::from_string(request.local_decryptor_address);

let entry = self
.secure_channel_registry
.get_channel_by_decryptor_address(&local_decryptor);
let handle = match entry {
None => {
warn!("No secure channel found for local decryptor {local_decryptor}",);
return Ok(());
}
Some(entry) => {
let response: SecureChannelApiResponse = context
.send_and_receive(
route![entry.decryptor_api_address().clone()],
SecureChannelApiRequest::ExtractKey,
)
.await?;

let key_identifier = match response {
SecureChannelApiResponse::Ok(key_identifier) => key_identifier,
SecureChannelApiResponse::Err(error) => {
error!("Error extracting key: {error}");
return Ok(());
}
};

let secret = self
.secure_channel_vault
.export_rekey(&key_identifier)
.await?;

self.encryption_at_rest.import_aead_key(secret).await?
}
};

let now = TimestampInSeconds(ockam_core::compat::time::now()?);
let now = TimestampInSeconds(self.clock.now()?);
let valid_until = now + self.key_validity;
let rotate_after = now + self.key_rotation;

Expand All @@ -86,7 +120,6 @@ impl Worker for KafkaKeyExchangeListener {
message.return_route().clone(),
KeyExchangeResponse {
key_identifier_for_consumer: handle.into_vec(),
secret_key,
valid_until,
rotate_after,
rekey_period: self.rekey_period,
Expand All @@ -103,12 +136,15 @@ impl KafkaKeyExchangeListener {
pub async fn create(
context: &Context,
encryption_at_rest: Arc<dyn VaultForEncryptionAtRest>,
secure_channel_vault: Arc<dyn VaultForSecureChannels>,
secure_channel_registry: SecureChannelRegistry,
key_rotation: Duration,
key_validity: Duration,
rekey_period: Duration,
secure_channel_flow_control: &FlowControlId,
incoming_access_control: impl IncomingAccessControl,
outgoing_access_control: impl OutgoingAccessControl,
clock: impl Clock,
) -> ockam_core::Result<()> {
let address = Address::from_string(DefaultAddress::KAFKA_CUSTODIAN);
context
Expand All @@ -117,9 +153,12 @@ impl KafkaKeyExchangeListener {

WorkerBuilder::new(KafkaKeyExchangeListener {
encryption_at_rest,
secure_channel_vault,
key_rotation,
key_validity,
rekey_period,
secure_channel_registry,
clock: Box::new(clock),
})
.with_address(address)
.with_incoming_access_control(incoming_access_control)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use crate::kafka::key_exchange::controller::{
use crate::kafka::key_exchange::listener::{KeyExchangeRequest, KeyExchangeResponse};
use crate::kafka::ConsumerResolution;
use crate::DefaultAddress;
use ockam::identity::TimestampInSeconds;
use ockam::identity::{SecureChannelApiRequest, SecureChannelApiResponse, TimestampInSeconds};
use ockam_core::errcode::{Kind, Origin};
use ockam_core::{Error, Result};
use ockam_core::{route, Error, Result};
use ockam_multiaddr::proto::{Secure, Service};
use ockam_multiaddr::MultiAddr;
use ockam_node::{Context, MessageSendReceiveOptions};
Expand Down Expand Up @@ -35,6 +35,72 @@ impl KafkaKeyExchangeControllerImpl {
destination.push_back(Secure::new(DefaultAddress::SECURE_CHANNEL_LISTENER))?;
destination.push_back(Service::new(DefaultAddress::KAFKA_CUSTODIAN))?;
if let Some(node_manager) = inner.node_manager.upgrade() {
// create a second secure channel to be used for key exchange
let (aead_secret_key_handle, their_decryptor_address) = {
let secure_channel_for_key_exchange = node_manager
.make_connection(context, &destination, node_manager.identifier(), None, None)
.await?;

let encryptor_address = secure_channel_for_key_exchange
.secure_channel_encryptors
.first()
.expect("encryptor should be present");

let entry = node_manager
.secure_channels
.secure_channel_registry()
.get_channel_by_encryptor_address(encryptor_address)
.expect("channel should be present");

if !inner
.consumer_policy_access_control
.is_identity_authorized(entry.their_id())
.await?
{
secure_channel_for_key_exchange
.close(context, &node_manager)
.await?;
return Err(Error::new(
Origin::Channel,
Kind::Invalid,
"Consumer is not authorized to use the secure channel",
));
}

let response: SecureChannelApiResponse = context
.send_and_receive(
route![entry.encryptor_api_address().clone()],
SecureChannelApiRequest::ExtractKey,
)
.await?;

match response {
SecureChannelApiResponse::Ok(secret_handle) => {
let secret = node_manager
.secure_channels
.vault()
.secure_channel_vault
.export_rekey(&secret_handle)
.await?;

secure_channel_for_key_exchange
.close(context, &node_manager)
.await?;

(
self.encryption_at_rest.import_aead_key(secret).await?,
entry.their_decryptor_address(),
)
}
SecureChannelApiResponse::Err(error) => {
secure_channel_for_key_exchange
.close(context, &node_manager)
.await?;
return Err(error);
}
}
};

let connection = node_manager
.make_connection(context, &destination, node_manager.identifier(), None, None)
.await?;
Expand All @@ -52,14 +118,17 @@ impl KafkaKeyExchangeControllerImpl {

let route = connection.route()?;
let response: KeyExchangeResponse = context
.send_and_receive_extended(route, KeyExchangeRequest {}, send_and_receive_options)
.send_and_receive_extended(
route,
KeyExchangeRequest {
local_decryptor_address: their_decryptor_address,
},
send_and_receive_options,
)
.await?
.into_body()?;

let aead_secret_key_handle = self
.encryption_at_rest
.import_aead_key(response.secret_key.to_vec())
.await?;
connection.close(context, &node_manager).await?;

Ok(ExchangedKey {
secret_key_handler: aead_secret_key_handle,
Expand Down
Loading

0 comments on commit 1e28ec9

Please sign in to comment.