Skip to content

Commit

Permalink
feat(rust): introduce env variables to adjust transport performance
Browse files Browse the repository at this point in the history
  • Loading branch information
SanjoDeundiak committed Dec 12, 2024
1 parent 7277027 commit c4afb4a
Show file tree
Hide file tree
Showing 27 changed files with 327 additions and 107 deletions.
6 changes: 5 additions & 1 deletion implementations/rust/ockam/ockam_api/src/influxdb/portal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use ockam_core::route;
use ockam_multiaddr::proto::Service;
use ockam_multiaddr::MultiAddr;
use ockam_transport_core::HostnamePort;
use ockam_transport_tcp::{PortalInletInterceptor, PortalOutletInterceptor};
use ockam_transport_tcp::{
read_portal_payload_length, PortalInletInterceptor, PortalOutletInterceptor,
};
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -234,6 +236,7 @@ impl NodeManagerWorker {
http_interceptor_factory,
Arc::new(policy_access_control.create_outgoing(ctx).await?),
Arc::new(policy_access_control.create_incoming()),
read_portal_payload_length(),
)
.await?;

Expand Down Expand Up @@ -282,6 +285,7 @@ impl NodeManagerWorker {
http_interceptor_factory,
Arc::new(policy_access_control.create_incoming()),
Arc::new(policy_access_control.create_outgoing(ctx).await?),
read_portal_payload_length(),
)
.await?;
Ok(interceptor_address)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use ockam_multiaddr::proto::Service;
use ockam_multiaddr::MultiAddr;
use ockam_node::compat::tokio;
use ockam_transport_core::HostnamePort;
use ockam_transport_tcp::PortalInletInterceptor;
use ockam_transport_tcp::{read_portal_payload_length, PortalInletInterceptor};

// TODO: upgrade to 13 by adding a metadata request to map uuid<=>topic_name
const TEST_KAFKA_API_VERSION: i16 = 12;
Expand Down Expand Up @@ -119,6 +119,7 @@ async fn create_kafka_service(
)),
Arc::new(AllowAll),
Arc::new(AllowAll),
read_portal_payload_length(),
)
.await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use ockam_core::{route, Address, AllowAll, NeutralMessage, Routed, Worker};
use ockam_multiaddr::MultiAddr;
use ockam_node::database::SqlxDatabase;
use ockam_node::Context;
use ockam_transport_tcp::{PortalInterceptorWorker, PortalMessage, MAX_PAYLOAD_SIZE};
use ockam_transport_tcp::{read_portal_payload_length, PortalInterceptorWorker, PortalMessage};

use crate::kafka::inlet_controller::KafkaInletController;
use crate::kafka::key_exchange::controller::KafkaKeyExchangeControllerImpl;
Expand Down Expand Up @@ -169,7 +169,7 @@ async fn kafka_portal_worker__bigger_than_limit_kafka_message__error(
);

let huge_payload = request_buffer.as_ref();
for chunk in huge_payload.chunks(MAX_PAYLOAD_SIZE) {
for chunk in huge_payload.chunks(read_portal_payload_length()) {
let _error = context
.send(
route![portal_inlet_address.clone(), context.address()],
Expand Down Expand Up @@ -229,7 +229,10 @@ async fn kafka_portal_worker__almost_over_limit_than_limit_kafka_message__two_ka
// let's duplicate the message
huge_outgoing_request.extend(huge_outgoing_request.clone());

for chunk in huge_outgoing_request.as_ref().chunks(MAX_PAYLOAD_SIZE) {
for chunk in huge_outgoing_request
.as_ref()
.chunks(read_portal_payload_length())
{
context
.send(
route![portal_inlet_address.clone(), "tcp_payload_receiver"],
Expand Down Expand Up @@ -327,6 +330,7 @@ async fn setup_only_worker(context: &mut Context, handle: &NodeManagerHandle) ->
)),
TEST_MAX_KAFKA_MESSAGE_SIZE,
)),
read_portal_payload_length(),
)
.await
.unwrap()
Expand Down Expand Up @@ -425,6 +429,7 @@ async fn kafka_portal_worker__metadata_exchange__response_changed(
)),
MAX_KAFKA_MESSAGE_SIZE,
)),
read_portal_payload_length(),
)
.await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use ockam_core::flow_control::FlowControls;
use ockam_core::route;
use ockam_multiaddr::proto::Project;
use ockam_multiaddr::MultiAddr;
use ockam_transport_tcp::{PortalInletInterceptor, PortalOutletInterceptor};
use ockam_transport_tcp::{
read_portal_payload_length, PortalInletInterceptor, PortalOutletInterceptor,
};
use std::sync::Arc;

impl NodeManagerWorker {
Expand Down Expand Up @@ -241,6 +243,7 @@ impl InMemoryNode {
)),
Arc::new(policy_access_control.create_incoming()),
Arc::new(policy_access_control.create_outgoing(context).await?),
read_portal_payload_length(),
)
.await?;

Expand Down Expand Up @@ -297,6 +300,7 @@ impl InMemoryNode {
)),
Arc::new(policy_access_control.create_outgoing(context).await?),
Arc::new(policy_access_control.create_incoming()),
read_portal_payload_length(),
)
.await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,14 @@ Tracing
- OCKAM_BACKGROUND_LOG_EXPORT_CUTOFF: Cutoff time for sending log records batches to an OpenTelemetry baclground node, without waiting for a response. Default value: `3s`.
- OCKAM_BACKGROUND_SPAN_EXPORT_CUTOFF: Cutoff time for sending span batches to an OpenTelemetry background inlet, without waiting for a response. Default value: `3s`.

UDP Puncture
UDP
- OCKAM_RENDEZVOUS_SERVER: set this variable to the hostname and port of the Rendezvous service
- OCKAM_UDP_PENDING_MESSAGES_PER_PEER: maximum number of messages per UDP peer that are cached to be assembled if their parts arrive out of order. Default value: 5
- OCKAM_UDP_MAX_ON_THE_WIRE_PACKET_SIZE: maximum size of a UDP packet on the wire. Default value: 508

TCP Portals
TCP
- OCKAM_PRIVILEGED: if variable is set, all TCP Inlets/Outlets will use eBPF (overrides `--privileged` argument for `ockam tcp-inlet create` and `ockam tcp-outlet create`).
- OCKAM_TCP_PORTAL_PAYLOAD_LENGTH: size of the buffer into which TCP Portal reads the TCP stream. Default value: `128 * 1024`

Devs Usage
- OCKAM: a `string` that defines the path to the ockam binary to use.
Expand All @@ -65,3 +68,4 @@ Devs Usage

Internal (to enable some special behavior in the logic)
- OCKAM_HELP_RENDER_MARKDOWN: a `boolean` to control the markdown rendering of the commands documentation.

36 changes: 20 additions & 16 deletions implementations/rust/ockam/ockam_core/src/env/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,38 @@ use std::env::VarError;

/// Get environmental value `var_name`. If value is not found returns Ok(None)
pub fn get_env<T: FromString>(var_name: &str) -> Result<Option<T>> {
get_env_impl::<Option<T>>(var_name, None)
}

/// Return true if `var_name` is set and has a valid value
pub fn is_set<T: FromString>(var_name: &str) -> Result<bool> {
get_env_impl::<Option<T>>(var_name, None).map(|v| v.is_some())
}

/// Get environmental value `var_name`. If value is not found returns `default_value`
pub fn get_env_with_default<T: FromString>(var_name: &str, default_value: T) -> Result<T> {
get_env_impl::<T>(var_name, default_value)
}

fn get_env_impl<T: FromString>(var_name: &str, default_value: T) -> Result<T> {
match env::var(var_name) {
Ok(val) => {
match T::from_string(&val) {
Ok(v) => Ok(v),
Ok(v) => Ok(Some(v)),
Err(e) => Err(error(format!("The environment variable `{var_name}` cannot be decoded. The value `{val}` is invalid: {e:?}"))),
}
},
Err(e) => match e {
VarError::NotPresent => Ok(default_value),
VarError::NotPresent => Ok(None),
VarError::NotUnicode(_) => Err(error(format!("The environment variable `{var_name}` cannot be decoded because it is not some valid Unicode"))),
},
}
}

/// Return true if `var_name` is set and has a valid value
pub fn is_set<T: FromString>(var_name: &str) -> Result<bool> {
Ok(get_env::<T>(var_name)?.is_some())
}

/// Get environmental value `var_name`. If value is not found returns `default_value`
pub fn get_env_with_default<T: FromString>(var_name: &str, default_value: T) -> Result<T> {
Ok(get_env::<T>(var_name)?.unwrap_or(default_value))
}

/// Get environmental value `var_name`. If value is not found returns `default_value`
pub fn get_env_with_default_ignore_error<T: FromString>(var_name: &str, default_value: T) -> T {
get_env::<T>(var_name)
.ok()
.flatten()
.unwrap_or(default_value)
}

pub(crate) fn error(msg: String) -> Error {
Error::new(Origin::Core, Kind::Invalid, msg)
}
7 changes: 7 additions & 0 deletions implementations/rust/ockam/ockam_core/src/env/from_string.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::env::error;
use crate::errcode::{Kind, Origin};
use crate::{Error, Result};
use core::str::FromStr;
use once_cell::sync::OnceCell;
use regex::Regex;
use std::path::PathBuf;
Expand Down Expand Up @@ -46,6 +47,12 @@ impl FromString for char {
}
}

impl FromString for usize {
fn from_string(s: &str) -> Result<Self> {
usize::from_str(s).map_err(|err| error(format!("usize parsing error: {err}")))
}
}

impl FromString for String {
fn from_string(s: &str) -> Result<Self> {
Ok(s.to_owned())
Expand Down
2 changes: 1 addition & 1 deletion implementations/rust/ockam/ockam_transport_tcp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub use options::{TcpConnectionOptions, TcpListenerOptions};
pub use portal::{
new_certificate_provider_cache, Direction, PortalInletInterceptor, PortalInterceptor,
PortalInterceptorFactory, PortalInterceptorWorker, PortalInternalMessage, PortalMessage,
PortalOutletInterceptor, TlsCertificate, TlsCertificateProvider, MAX_PAYLOAD_SIZE,
PortalOutletInterceptor, TlsCertificate, TlsCertificateProvider,
};
pub use protocol_version::*;
pub use registry::*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ impl Processor for TcpInletListenProcessor {
addresses,
self.options.incoming_access_control.clone(),
self.options.outgoing_access_control.clone(),
self.options.portal_payload_length,
)
.await?;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{PortalMessage, MAX_PAYLOAD_SIZE};
use crate::PortalMessage;
use ockam_core::flow_control::{FlowControlId, FlowControlOutgoingAccessControl, FlowControls};
use ockam_core::{
async_trait, route, Address, AllowOnwardAddress, AllowSourceAddress, Any,
Expand Down Expand Up @@ -46,6 +46,7 @@ pub struct PortalOutletInterceptor {
outgoing_access_control: Arc<dyn OutgoingAccessControl>,
incoming_access_control: Arc<dyn IncomingAccessControl>,
spawner_flow_control_id: Option<FlowControlId>,
portal_payload_length: usize,
}

impl PortalOutletInterceptor {
Expand All @@ -66,12 +67,14 @@ impl PortalOutletInterceptor {
interceptor_factory: Arc<dyn PortalInterceptorFactory>,
outgoing_access_control: Arc<dyn OutgoingAccessControl>,
incoming_access_control: Arc<dyn IncomingAccessControl>,
portal_payload_length: usize,
) -> ockam_core::Result<()> {
let worker = Self {
spawner_flow_control_id,
interceptor_factory,
outgoing_access_control,
incoming_access_control: incoming_access_control.clone(),
portal_payload_length,
};

WorkerBuilder::new(worker)
Expand Down Expand Up @@ -110,6 +113,7 @@ impl Worker for PortalOutletInterceptor {
self.incoming_access_control.clone(),
self.outgoing_access_control.clone(),
self.interceptor_factory.create(),
self.portal_payload_length,
)
.await?;

Expand Down Expand Up @@ -145,6 +149,7 @@ pub struct PortalInletInterceptor {
interceptor_factory: Arc<dyn PortalInterceptorFactory>,
request_outgoing_access_control: Arc<dyn OutgoingAccessControl>,
response_incoming_access_control: Arc<dyn IncomingAccessControl>,
portal_payload_length: usize,
}

impl PortalInletInterceptor {
Expand All @@ -164,11 +169,13 @@ impl PortalInletInterceptor {
interceptor_factory: Arc<dyn PortalInterceptorFactory>,
response_incoming_access_control: Arc<dyn IncomingAccessControl>,
request_outgoing_access_control: Arc<dyn OutgoingAccessControl>,
portal_payload_length: usize,
) -> ockam_core::Result<()> {
let worker = Self {
interceptor_factory,
request_outgoing_access_control,
response_incoming_access_control,
portal_payload_length,
};

context.start_worker(listener_address, worker).await
Expand Down Expand Up @@ -213,6 +220,7 @@ impl Worker for PortalInletInterceptor {
self.request_outgoing_access_control.clone(),
self.response_incoming_access_control.clone(),
self.interceptor_factory.create(),
self.portal_payload_length,
)
.await?;

Expand All @@ -238,6 +246,7 @@ pub struct PortalInterceptorWorker {
disconnect_received: Arc<AtomicBool>,
interceptor: Arc<dyn PortalInterceptor>,
direction: Direction,
portal_payload_length: usize,
}

#[async_trait]
Expand Down Expand Up @@ -368,6 +377,7 @@ impl PortalInterceptorWorker {
outgoing_access_control: Arc<dyn OutgoingAccessControl>,
incoming_access_control: Arc<dyn IncomingAccessControl>,
interceptor: Arc<dyn PortalInterceptor>,
portal_payload_length: usize,
) -> ockam_core::Result<Address> {
let from_inlet_worker_address =
Address::random_tagged("InterceptorPortalWorker.from_inlet_to_outlet");
Expand All @@ -386,6 +396,7 @@ impl PortalInterceptorWorker {
disconnect_received: disconnect_received.clone(),
fixed_onward_route: Some(inlet_instance),
interceptor: interceptor.clone(),
portal_payload_length,
};

WorkerBuilder::new(from_outlet_worker)
Expand All @@ -400,6 +411,7 @@ impl PortalInterceptorWorker {
disconnect_received: disconnect_received.clone(),
fixed_onward_route: None,
interceptor: interceptor.clone(),
portal_payload_length,
};

WorkerBuilder::new(from_inlet_worker)
Expand Down Expand Up @@ -430,6 +442,7 @@ impl PortalInterceptorWorker {
/// - `spawner_flow_control_id` to account for future created outlets,
/// - `incoming_access_control` is the access control for the incoming messages.
/// - `outgoing_access_control` is the access control for the outgoing messages.
#[allow(clippy::too_many_arguments)]
async fn create_outlet_interceptor(
context: &mut Context,
outlet_route: Route,
Expand All @@ -438,6 +451,7 @@ impl PortalInterceptorWorker {
incoming_access_control: Arc<dyn IncomingAccessControl>,
outgoing_access_control: Arc<dyn OutgoingAccessControl>,
interceptor: Arc<dyn PortalInterceptor>,
portal_payload_length: usize,
) -> ockam_core::Result<Address> {
let from_inlet_worker_address =
Address::random_tagged("InterceptorPortalWorker.from_inlet_to_outlet");
Expand All @@ -451,13 +465,15 @@ impl PortalInterceptorWorker {
disconnect_received: disconnect_received.clone(),
fixed_onward_route: Some(outlet_route),
interceptor: interceptor.clone(),
portal_payload_length,
};
let from_outlet_worker = Self {
other_worker_address: from_inlet_worker_address.clone(),
direction: Direction::FromOutletToInlet,
disconnect_received: disconnect_received.clone(),
fixed_onward_route: None,
interceptor: interceptor.clone(),
portal_payload_length,
};

let flow_controls = context.flow_controls();
Expand Down Expand Up @@ -569,7 +585,7 @@ impl PortalInterceptorWorker {
onward_route = provided_onward_route.clone().modify().pop_front().into();
};

for chunk in buffer.chunks(MAX_PAYLOAD_SIZE) {
for chunk in buffer.chunks(self.portal_payload_length) {
let message = LocalMessage::new()
.with_onward_route(onward_route.clone())
.with_return_route(return_route.clone())
Expand Down
Loading

0 comments on commit c4afb4a

Please sign in to comment.