Skip to content

Commit

Permalink
feat(rust): make start_worker sync
Browse files Browse the repository at this point in the history
  • Loading branch information
SanjoDeundiak committed Jan 10, 2025
1 parent a192b6a commit 6953d64
Show file tree
Hide file tree
Showing 194 changed files with 1,030 additions and 1,438 deletions.
7 changes: 3 additions & 4 deletions examples/rust/file_transfer/examples/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl Worker for FileReception {
#[ockam::node]
async fn main(ctx: Context) -> Result<()> {
let node = node(ctx).await?;
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create an Identity to represent Receiver.
let receiver = node.create_identity().await?;
Expand All @@ -97,8 +97,7 @@ async fn main(ctx: Context) -> Result<()> {

// Create a secure channel listener for Receiver that will wait for requests to
// initiate an Authenticated Key Exchange.
node.create_secure_channel_listener(&receiver, "listener", secure_channel_listener_options)
.await?;
node.create_secure_channel_listener(&receiver, "listener", secure_channel_listener_options)?;

// The computer that is running this program is likely within a private network and
// not accessible over the internet.
Expand All @@ -118,7 +117,7 @@ async fn main(ctx: Context) -> Result<()> {
println!("{}", relay.remote_address());

// Start a worker, of type FileReception, at address "receiver".
node.start_worker("receiver", FileReception::default()).await?;
node.start_worker("receiver", FileReception::default())?;

// We won't call ctx.shutdown_node() here, this program will quit when the file will be entirely received
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion examples/rust/file_transfer/examples/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn main(ctx: Context) -> Result<()> {
let opt = Sender::parse();

let node = node(ctx).await?;
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create an Identity to represent Sender.
let sender = node.create_identity().await?;
Expand Down
2 changes: 1 addition & 1 deletion examples/rust/get_started/examples/02-worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ async fn main(ctx: Context) -> Result<()> {
let mut node = node(ctx).await?;

// Start a worker, of type Echoer, at address "echoer"
node.start_worker("echoer", Echoer).await?;
node.start_worker("echoer", Echoer)?;

// Send a message to the worker at address "echoer".
node.send("echoer", "Hello Ockam!".to_string()).await?;
Expand Down
8 changes: 4 additions & 4 deletions examples/rust/get_started/examples/03-routing-many-hops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ async fn main(ctx: Context) -> Result<()> {
let mut node = node(ctx).await?;

// Start an Echoer worker at address "echoer"
node.start_worker("echoer", Echoer).await?;
node.start_worker("echoer", Echoer)?;

// Start 3 hop workers at addresses "h1", "h2" and "h3".
node.start_worker("h1", Hop).await?;
node.start_worker("h2", Hop).await?;
node.start_worker("h3", Hop).await?;
node.start_worker("h1", Hop)?;
node.start_worker("h2", Hop)?;
node.start_worker("h3", Hop)?;

// Send a message to the echoer worker via the "h1", "h2", and "h3" workers
let r = route!["h1", "h2", "h3", "echoer"];
Expand Down
4 changes: 2 additions & 2 deletions examples/rust/get_started/examples/03-routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ async fn main(ctx: Context) -> Result<()> {
let mut node = node(ctx).await?;

// Start a worker, of type Echoer, at address "echoer"
node.start_worker("echoer", Echoer).await?;
node.start_worker("echoer", Echoer)?;

// Start a worker, of type Hop, at address "h1"
node.start_worker("h1", Hop).await?;
node.start_worker("h1", Hop)?;

// Send a message to the worker at address "echoer",
// via the worker at address "h1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ async fn main(ctx: Context) -> Result<()> {
let mut node = node(ctx).await?;

// Initialize the TCP Transport.
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create a TCP connection to a different node.
let connection_to_responder = tcp.connect("localhost:4000", TcpConnectionOptions::new()).await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ async fn main(ctx: Context) -> Result<()> {
let node = node(ctx).await?;

// Initialize the TCP Transport
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create an echoer worker
node.start_worker("echoer", Echoer).await?;
node.start_worker("echoer", Echoer)?;

// Create a TCP listener and wait for incoming connections.
let listener = tcp.listen("127.0.0.1:4000", TcpListenerOptions::new()).await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ async fn main(ctx: Context) -> Result<()> {
let mut node = node(ctx).await?;

// Initialize the TCP Transport
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create a TCP connection to the middle node.
let connection_to_middle_node = tcp.connect("localhost:3000", TcpConnectionOptions::new()).await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@ async fn main(ctx: Context) -> Result<()> {
let node = node(ctx).await?;

// Initialize the TCP Transport
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create a TCP connection to the responder node.
let connection_to_responder = tcp.connect("127.0.0.1:4000", TcpConnectionOptions::new()).await?;

// Create and start a Relay worker
node.start_worker("forward_to_responder", Relay::new(connection_to_responder))
.await?;
node.start_worker("forward_to_responder", Relay::new(connection_to_responder))?;

// Create a TCP listener and wait for incoming connections.
let listener = tcp.listen("127.0.0.1:3000", TcpListenerOptions::new()).await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ async fn main(ctx: Context) -> Result<()> {
let node = node(ctx).await?;

// Initialize the TCP Transport
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create an echoer worker
node.start_worker("echoer", Echoer).await?;
node.start_worker("echoer", Echoer)?;

// Create a TCP listener and wait for incoming connections.
let listener = tcp.listen("127.0.0.1:4000", TcpListenerOptions::new()).await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async fn main(ctx: Context) -> Result<()> {
.await?;

// Create an echoer worker
node.start_worker("echoer", Echoer).await?;
node.start_worker("echoer", Echoer)?;

node.flow_controls()
.add_consumer(&"echoer".into(), bind.flow_control_id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async fn main(ctx: Context) -> Result<()> {
uds.listen("/tmp/ockam-example-echoer").await?;

// Create an echoer worker
node.start_worker("echoer", Echoer).await?;
node.start_worker("echoer", Echoer)?;

// Don't call node.shutdown() here so this node runs forever.
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async fn main(ctx: Context) -> Result<()> {
let alice = node.create_identity().await?;

// Create a TCP connection to the middle node.
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;
let connection_to_middle_node = tcp.connect("localhost:3000", TcpConnectionOptions::new()).await?;

// Connect to a secure channel listener and perform a handshake.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@ async fn main(ctx: Context) -> Result<()> {
let node = node(ctx).await?;

// Initialize the TCP Transport
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create a TCP connection to Bob.
let connection_to_bob = tcp.connect("127.0.0.1:4000", TcpConnectionOptions::new()).await?;

// Start a Relay to forward messages to Bob using the TCP connection.
node.start_worker("forward_to_bob", Relay::new(route![connection_to_bob]))
.await?;
node.start_worker("forward_to_bob", Relay::new(route![connection_to_bob]))?;

// Create a TCP listener and wait for incoming connections.
let listener = tcp.listen("127.0.0.1:3000", TcpListenerOptions::new()).await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ async fn main(ctx: Context) -> Result<()> {
let node = node(ctx).await?;

// Initialize the TCP Transport.
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

node.start_worker("echoer", Echoer).await?;
node.start_worker("echoer", Echoer)?;

let bob = node.create_identity().await?;

Expand All @@ -23,13 +23,11 @@ async fn main(ctx: Context) -> Result<()> {

// Create a secure channel listener for Bob that will wait for requests to
// initiate an Authenticated Key Exchange.
let secure_channel_listener = node
.create_secure_channel_listener(
&bob,
"bob_listener",
SecureChannelListenerOptions::new().as_consumer(listener.flow_control_id()),
)
.await?;
let secure_channel_listener = node.create_secure_channel_listener(
&bob,
"bob_listener",
SecureChannelListenerOptions::new().as_consumer(listener.flow_control_id()),
)?;

// Allow access to the Echoer via Secure Channels
node.flow_controls()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ async fn main(ctx: Context) -> Result<()> {
node.start_worker(
"forward_to_bob",
Relay::new(route![udp_bind.clone(), (UDP, "127.0.0.1:4000")]),
)
.await?;
)?;

node.flow_controls()
.add_consumer(&"forward_to_bob".into(), udp_bind.flow_control_id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async fn main(ctx: Context) -> Result<()> {
// Initialize the UDP Transport.
let udp = node.create_udp_transport().await?;

node.start_worker("echoer", Echoer).await?;
node.start_worker("echoer", Echoer)?;

let bob = node.create_identity().await?;

Expand All @@ -27,13 +27,11 @@ async fn main(ctx: Context) -> Result<()> {

// Create a secure channel listener for Bob that will wait for requests to
// initiate an Authenticated Key Exchange.
let secure_channel_listener = node
.create_secure_channel_listener(
&bob,
"bob_listener",
SecureChannelListenerOptions::new().as_consumer(udp_bind.flow_control_id()),
)
.await?;
let secure_channel_listener = node.create_secure_channel_listener(
&bob,
"bob_listener",
SecureChannelListenerOptions::new().as_consumer(udp_bind.flow_control_id()),
)?;

// Allow access to the Echoer via Secure Channels
node.flow_controls()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use ockam::identity::{SecureChannelOptions, Vault};
use ockam::tcp::{TcpConnectionOptions, TcpTransportExtension};
use ockam::tcp::TcpConnectionOptions;
use ockam::vault::{EdDSACurve25519SecretKey, SigningSecret, SoftwareVaultForSigning};
use ockam::{route, Context, Node, Result};
use ockam_api::enroll::enrollment::Enrollment;
use ockam_api::nodes::NodeManager;
use ockam_api::DefaultAddress;
use ockam_multiaddr::MultiAddr;
use ockam_transport_tcp::TcpTransportExtension;

#[ockam::node]
async fn main(ctx: Context) -> Result<()> {
Expand All @@ -24,9 +25,9 @@ async fn main(ctx: Context) -> Result<()> {
let mut vault = Vault::create().await?;
vault.identity_vault = identity_vault;

let mut node = Node::builder().await?.with_vault(vault).build(&ctx).await?;
let mut node = Node::builder().await?.with_vault(vault).build(&ctx)?;
// Initialize the TCP Transport
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create an Identity representing the client
// We preload the client vault with a change history and secret key corresponding to the identity identifier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async fn main(ctx: Context) -> Result<()> {
let mut vault = Vault::create().await?;
vault.identity_vault = identity_vault;

let node = Node::builder().await?.with_vault(vault).build(&ctx).await?;
let node = Node::builder().await?.with_vault(vault).build(&ctx)?;

let issuer_identity = hex::decode("81825837830101583285f68200815820afbca9cf5d440147450f9f0d0a038a337b3fe5c17086163f2c54509558b62ef4f41a654cf97d1a7818fc7d8200815840650c4c939b96142546559aed99c52b64aa8a2f7b242b46534f7f8d0c5cc083d2c97210b93e9bca990e9cb9301acc2b634ffb80be314025f9adc870713e6fde0d").unwrap();
let issuer = node.import_private_identity(None, &issuer_identity, &secret).await?;
Expand Down Expand Up @@ -87,8 +87,7 @@ async fn main(ctx: Context) -> Result<()> {
// Start a secure channel listener that only allows channels where the identity
// at the other end of the channel can authenticate with the latest private key
// corresponding to one of the above known public identifiers.
node.create_secure_channel_listener(&issuer, DefaultAddress::SECURE_CHANNEL_LISTENER, sc_listener_options)
.await?;
node.create_secure_channel_listener(&issuer, DefaultAddress::SECURE_CHANNEL_LISTENER, sc_listener_options)?;

// Start a credential issuer worker that will only accept incoming requests from
// authenticated secure channels with our known public identifiers.
Expand All @@ -100,11 +99,10 @@ async fn main(ctx: Context) -> Result<()> {
credential_issuer,
allow_known,
AllowAll,
)
.await?;
)?;

// Initialize TCP Transport, create a TCP listener, and wait for connections.
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;
tcp.listen("127.0.0.1:5000", tcp_listener_options).await?;

// Don't call node.shutdown() here so this node runs forever.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ async fn main(ctx: Context) -> Result<()> {
let mut vault = Vault::create().await?;
vault.identity_vault = identity_vault;

let node = Node::builder().await?.with_vault(vault).build(&ctx).await?;
let node = Node::builder().await?.with_vault(vault).build(&ctx)?;

// Initialize the TCP Transport
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create an Identity representing the server
// Load an identity corresponding to the following public identifier
Expand Down Expand Up @@ -96,20 +96,17 @@ async fn main(ctx: Context) -> Result<()> {
Some(issuer),
"cluster",
"production",
)
.await?;
)?;
node.start_worker_with_access_control(
DefaultAddress::ECHO_SERVICE,
Echoer,
allow_production_incoming,
allow_production_outgoing,
)
.await?;
)?;

// Start a secure channel listener that only allows channels with
// authenticated identities.
node.create_secure_channel_listener(&server, DefaultAddress::SECURE_CHANNEL_LISTENER, sc_listener_options)
.await?;
node.create_secure_channel_listener(&server, DefaultAddress::SECURE_CHANNEL_LISTENER, sc_listener_options)?;

// Create a TCP listener and wait for incoming connections
tcp.listen("127.0.0.1:4000", tcp_listener_options).await?;
Expand Down
Loading

0 comments on commit 6953d64

Please sign in to comment.