Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Kafka portal architecture - multiple concurrent consumers #8717

Draft
wants to merge 7 commits into
base: davide-baldo/ockam-vault-proxy
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion implementations/rust/ockam/ockam_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ jaq-interpret = "1"
jaq-parse = "1"
jaq-std = "1"
kafka-protocol = "0.13"
log = "0.4"
miette = { version = "7.2.0", features = ["fancy-no-backtrace"] }
minicbor = { version = "0.25.1", default-features = false, features = ["alloc", "derive"] }
nix = { version = "0.29", features = ["signal"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ use crate::authenticator::{
};
use ockam::identity::utils::now;
use ockam::identity::{
Identifier, Identities, SecureChannelListenerOptions, SecureChannelSqlxDatabase,
SecureChannels, TrustEveryonePolicy,
Identifier, Identities, SecureChannelListenerOptions, SecureChannels, TrustEveryonePolicy,
};
use ockam::tcp::{TcpListenerOptions, TcpTransport};
use ockam_core::compat::sync::Arc;
Expand Down Expand Up @@ -75,14 +74,12 @@ impl Authority {

let members = Arc::new(AuthorityMembersSqlxDatabase::new(database.clone()));
let tokens = Arc::new(AuthorityEnrollmentTokenSqlxDatabase::new(database.clone()));
let secure_channel_repository = Arc::new(SecureChannelSqlxDatabase::new(database.clone()));

Self::bootstrap_repository(members.clone(), configuration).await?;

let identities = Identities::create_with_node(database, node_name).build();

let secure_channels =
SecureChannels::from_identities(identities.clone(), secure_channel_repository);
let secure_channels = SecureChannels::from_identities(identities.clone());

let identifier = configuration.identifier();
info!(identifier=%identifier, "retrieved the authority identifier");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use rand::random;
use std::path::{Path, PathBuf};
use tokio::sync::broadcast::{channel, Receiver, Sender};

use ockam::SqlxDatabase;
use ockam_core::env::get_env_with_default;
use ockam_node::database::DatabaseConfiguration;
use ockam_node::Executor;
use rand::random;
use std::path::{Path, PathBuf};
use tokio::sync::broadcast::{channel, Receiver, Sender};

use crate::cli_state::error::Result;
use crate::cli_state::CliStateError;
Expand Down Expand Up @@ -343,10 +342,10 @@ mod tests {

// create 2 identities
let identity1 = cli
.create_identity_with_name_and_vault("identity1", "vault1")
.create_identity_with_name_and_vault(None, "identity1", "vault1")
.await?;
let identity2 = cli
.create_identity_with_name_and_vault("identity2", "vault2")
.create_identity_with_name_and_vault(None, "identity2", "vault2")
.await?;

// create 2 nodes
Expand Down
66 changes: 48 additions & 18 deletions implementations/rust/ockam/ockam_api/src/cli_state/identities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use ockam::identity::models::ChangeHistory;
use ockam::identity::{Identifier, Identity};
use ockam_core::errcode::{Kind, Origin};
use ockam_core::Error;
use ockam_node::Context;
use ockam_vault::{HandleToSecret, SigningSecretKeyHandle};

use crate::cli_state::{random_name, CliState, Result};
Expand Down Expand Up @@ -31,6 +32,7 @@ impl CliState {
#[instrument(skip_all, fields(name = %name, vault_name = %vault_name))]
pub async fn create_identity_with_name_and_vault(
&self,
context: Option<&Context>,
name: &str,
vault_name: &str,
) -> Result<NamedIdentity> {
Expand All @@ -39,7 +41,9 @@ impl CliState {
};

let vault = self.get_named_vault(vault_name).await?;
let identities = self.make_identities(self.make_vault(vault).await?).await?;
let vault = self.make_vault(context, vault).await?;

let identities = self.make_identities(vault).await?;
let identity = identities.identities_creation().create_identity().await?;
let named_identity = self
.store_named_identity(&identity, name, vault_name)
Expand All @@ -65,9 +69,13 @@ impl CliState {
/// Create an identity associated with a name, using the default vault
/// If there is already an identity with that name, return its identifier
#[instrument(skip_all, fields(name = %name))]
pub async fn create_identity_with_name(&self, name: &str) -> Result<NamedIdentity> {
pub async fn create_identity_with_name(
&self,
context: Option<&Context>,
name: &str,
) -> Result<NamedIdentity> {
let vault = self.get_or_create_default_named_vault().await?;
self.create_identity_with_name_and_vault(name, &vault.name())
self.create_identity_with_name_and_vault(context, name, &vault.name())
.await
}

Expand All @@ -77,6 +85,7 @@ impl CliState {
#[instrument(skip_all, fields(name = %name, vault_name = %vault_name, key_id = %key_id))]
pub async fn create_identity_with_key_id(
&self,
context: Option<&Context>,
name: &str,
vault_name: &str,
key_id: &str,
Expand All @@ -96,8 +105,10 @@ impl CliState {
key_id.as_bytes().to_vec(),
));

let vault = self.make_vault(context, vault).await?;

// create the identity
let identities = self.make_identities(self.make_vault(vault).await?).await?;
let identities = self.make_identities(vault).await?;
let identifier = identities
.identities_creation()
.identity_builder()
Expand Down Expand Up @@ -154,13 +165,14 @@ impl CliState {
#[instrument(skip_all, fields(name = name.clone()))]
pub async fn get_named_identity_or_default(
&self,
context: Option<&Context>,
name: &Option<String>,
) -> Result<NamedIdentity> {
match name {
// Identity specified.
Some(name) => self.get_named_identity(name).await,
// No identity specified.
None => self.get_or_create_default_named_identity().await,
None => self.get_or_create_default_named_identity(context).await,
}
}

Expand Down Expand Up @@ -191,7 +203,11 @@ impl CliState {
/// Return a full identity from its name
/// Use the default identity if no name is given
#[instrument(skip_all, fields(name = name.clone()))]
pub async fn get_identity_by_optional_name(&self, name: &Option<String>) -> Result<Identity> {
pub async fn get_identity_by_optional_name(
&self,
context: Option<&Context>,
name: &Option<String>,
) -> Result<Identity> {
let named_identity = match name {
Some(name) => {
self.identities_repository()
Expand All @@ -209,7 +225,7 @@ impl CliState {
Some(identity) => {
let change_history = self.get_change_history(&identity.identifier()).await?;
let named_vault = self.get_named_vault(&identity.vault_name).await?;
let identity_vault = self.make_vault(named_vault).await?;
let identity_vault = self.make_vault(context, named_vault).await?;
Ok(Identity::import_from_change_history(
Some(&identity.identifier()),
change_history,
Expand Down Expand Up @@ -243,14 +259,23 @@ impl CliState {
/// Return the name of the default identity.
/// This function creates the default identity if it does not exist!
#[instrument(skip_all)]
pub async fn get_default_identity_name(&self) -> Result<String> {
Ok(self.get_or_create_default_named_identity().await?.name())
pub async fn get_or_create_default_identity_name(
&self,
context: Option<&Context>,
) -> Result<String> {
Ok(self
.get_or_create_default_named_identity(context)
.await?
.name())
}

/// Return the default named identity
/// This function creates the default identity if it does not exist!
#[instrument(skip_all)]
pub async fn get_or_create_default_named_identity(&self) -> Result<NamedIdentity> {
pub async fn get_or_create_default_named_identity(
&self,
context: Option<&Context>,
) -> Result<NamedIdentity> {
match self
.identities_repository()
.get_default_named_identity()
Expand All @@ -263,7 +288,8 @@ impl CliState {
self.notify_message(fmt_log!(
"There is no default Identity on this machine, generating one...\n"
));
self.create_identity_with_name(&random_name()).await
self.create_identity_with_name(context, &random_name())
.await
}
}
}
Expand All @@ -272,10 +298,14 @@ impl CliState {
/// - the given name if defined
/// - or the name of the default identity (which is created if it does not already exist!)
#[instrument(skip_all, fields(name = name.clone()))]
pub async fn get_identity_name_or_default(&self, name: &Option<String>) -> Result<String> {
pub async fn get_or_create_identity_name_or_default(
&self,
context: Option<&Context>,
name: &Option<String>,
) -> Result<String> {
match name {
Some(name) => Ok(name.clone()),
None => self.get_default_identity_name().await,
None => self.get_or_create_default_identity_name(context).await,
}
}

Expand Down Expand Up @@ -472,14 +502,14 @@ mod tests {
// then create an identity
let identity_name = "identity-name";
let identity = cli
.create_identity_with_name_and_vault(identity_name, vault_name)
.create_identity_with_name_and_vault(None, identity_name, vault_name)
.await?;
let expected = cli.get_named_identity(identity_name).await?;
assert_eq!(identity, expected);

// don't recreate the identity if it already exists with that name
let _ = cli
.create_identity_with_name_and_vault(identity_name, vault_name)
.create_identity_with_name_and_vault(None, identity_name, vault_name)
.await?;
let identities = cli.get_named_identities().await?;
assert_eq!(identities.len(), 1);
Expand All @@ -493,7 +523,7 @@ mod tests {

// create an identity using the default vault
let identity_name = "identity-name";
let identity = cli.create_identity_with_name(identity_name).await?;
let identity = cli.create_identity_with_name(None, identity_name).await?;
let expected = cli.get_named_identity(identity_name).await?;
assert_eq!(identity, expected);

Expand All @@ -509,7 +539,7 @@ mod tests {
let cli = CliState::test().await?;

// when we retrieve the default identity, we create it if it doesn't exist
let identity = cli.get_or_create_default_named_identity().await?;
let identity = cli.get_or_create_default_named_identity(None).await?;

// when the identity is created there is a change history + a named identity
let result = cli.get_change_history(&identity.identifier()).await;
Expand All @@ -528,7 +558,7 @@ mod tests {
#[tokio::test]
async fn test_delete_identity() -> Result<()> {
let cli = CliState::test().await?;
let identity = cli.create_identity_with_name("name").await?;
let identity = cli.create_identity_with_name(None, "name").await?;

// when the identity is created there is a change history + a named identity
let result = cli.get_change_history(&identity.identifier()).await;
Expand Down
Loading
Loading