Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy committed Jan 24, 2025
1 parent 68386a7 commit bdbd7ed
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 31 deletions.
6 changes: 3 additions & 3 deletions pkg/shared/clients/nats/client_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

func TestNewClientPool_Success(t *testing.T) {
os.Setenv(dfv1.EnvISBSvcJetStreamURL, "nats://localhost:4222")
os.Setenv(dfv1.EnvISBSvcJetStreamURL, "nats://nats:4222")
os.Setenv(dfv1.EnvISBSvcJetStreamUser, "user")
os.Setenv(dfv1.EnvISBSvcJetStreamPassword, "password")
ctx := context.Background()
Expand All @@ -39,7 +39,7 @@ func TestNewClientPool_Success(t *testing.T) {
}

func TestClientPool_NextAvailableClient(t *testing.T) {
os.Setenv(dfv1.EnvISBSvcJetStreamURL, "nats://localhost:4222")
os.Setenv(dfv1.EnvISBSvcJetStreamURL, "nats://nats:4222")
os.Setenv(dfv1.EnvISBSvcJetStreamUser, "user")
os.Setenv(dfv1.EnvISBSvcJetStreamPassword, "password")
ctx := context.Background()
Expand All @@ -58,7 +58,7 @@ func TestClientPool_NextAvailableClient(t *testing.T) {
}

func TestClientPool_CloseAll(t *testing.T) {
os.Setenv(dfv1.EnvISBSvcJetStreamURL, "nats://localhost:4222")
os.Setenv(dfv1.EnvISBSvcJetStreamURL, "nats://nats:4222")
os.Setenv(dfv1.EnvISBSvcJetStreamUser, "user")
os.Setenv(dfv1.EnvISBSvcJetStreamPassword, "password")
ctx := context.Background()
Expand Down
2 changes: 1 addition & 1 deletion pkg/shared/clients/nats/nats_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (

func TestNewNATSClient(t *testing.T) {
// Setting up environment variables for the test
os.Setenv(dfv1.EnvISBSvcJetStreamURL, "nats://localhost:4222")
os.Setenv(dfv1.EnvISBSvcJetStreamURL, "nats://nats:4222")
os.Setenv(dfv1.EnvISBSvcJetStreamUser, "user")
os.Setenv(dfv1.EnvISBSvcJetStreamPassword, "password")
defer os.Clearenv()
Expand Down
16 changes: 8 additions & 8 deletions rust/numaflow-core/src/config/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ mod tests {
fn test_pipeline_config_load_sink_vertex() {
let pipeline_cfg_base64 = "eyJtZXRhZGF0YSI6eyJuYW1lIjoic2ltcGxlLXBpcGVsaW5lLW91dCIsIm5hbWVzcGFjZSI6ImRlZmF1bHQiLCJjcmVhdGlvblRpbWVzdGFtcCI6bnVsbH0sInNwZWMiOnsibmFtZSI6Im91dCIsInNpbmsiOnsiYmxhY2tob2xlIjp7fSwicmV0cnlTdHJhdGVneSI6eyJvbkZhaWx1cmUiOiJyZXRyeSJ9fSwibGltaXRzIjp7InJlYWRCYXRjaFNpemUiOjUwMCwicmVhZFRpbWVvdXQiOiIxcyIsImJ1ZmZlck1heExlbmd0aCI6MzAwMDAsImJ1ZmZlclVzYWdlTGltaXQiOjgwfSwic2NhbGUiOnsibWluIjoxfSwidXBkYXRlU3RyYXRlZ3kiOnsidHlwZSI6IlJvbGxpbmdVcGRhdGUiLCJyb2xsaW5nVXBkYXRlIjp7Im1heFVuYXZhaWxhYmxlIjoiMjUlIn19LCJwaXBlbGluZU5hbWUiOiJzaW1wbGUtcGlwZWxpbmUiLCJpbnRlclN0ZXBCdWZmZXJTZXJ2aWNlTmFtZSI6IiIsInJlcGxpY2FzIjowLCJmcm9tRWRnZXMiOlt7ImZyb20iOiJpbiIsInRvIjoib3V0IiwiY29uZGl0aW9ucyI6bnVsbCwiZnJvbVZlcnRleFR5cGUiOiJTb3VyY2UiLCJmcm9tVmVydGV4UGFydGl0aW9uQ291bnQiOjEsImZyb21WZXJ0ZXhMaW1pdHMiOnsicmVhZEJhdGNoU2l6ZSI6NTAwLCJyZWFkVGltZW91dCI6IjFzIiwiYnVmZmVyTWF4TGVuZ3RoIjozMDAwMCwiYnVmZmVyVXNhZ2VMaW1pdCI6ODB9LCJ0b1ZlcnRleFR5cGUiOiJTaW5rIiwidG9WZXJ0ZXhQYXJ0aXRpb25Db3VudCI6MSwidG9WZXJ0ZXhMaW1pdHMiOnsicmVhZEJhdGNoU2l6ZSI6NTAwLCJyZWFkVGltZW91dCI6IjFzIiwiYnVmZmVyTWF4TGVuZ3RoIjozMDAwMCwiYnVmZmVyVXNhZ2VMaW1pdCI6ODB9fV0sIndhdGVybWFyayI6eyJtYXhEZWxheSI6IjBzIn19LCJzdGF0dXMiOnsicGhhc2UiOiIiLCJyZXBsaWNhcyI6MCwiZGVzaXJlZFJlcGxpY2FzIjowLCJsYXN0U2NhbGVkQXQiOm51bGx9fQ==".to_string();

let env_vars = [("NUMAFLOW_ISBSVC_JETSTREAM_URL", "localhost:4222")];
let env_vars = [("NUMAFLOW_ISBSVC_JETSTREAM_URL", "nats:4222")];
let pipeline_config = PipelineConfig::load(pipeline_cfg_base64, env_vars).unwrap();

let expected = PipelineConfig {
Expand All @@ -490,7 +490,7 @@ mod tests {
paf_concurrency: 1000,
read_timeout: Duration::from_secs(1),
js_client_config: isb::jetstream::ClientConfig {
url: "localhost:4222".to_string(),
url: "nats:4222".to_string(),
user: None,
password: None,
},
Expand Down Expand Up @@ -526,7 +526,7 @@ mod tests {
fn test_pipeline_config_load_all() {
let pipeline_cfg_base64 = "eyJtZXRhZGF0YSI6eyJuYW1lIjoic2ltcGxlLXBpcGVsaW5lLWluIiwibmFtZXNwYWNlIjoiZGVmYXVsdCIsImNyZWF0aW9uVGltZXN0YW1wIjpudWxsfSwic3BlYyI6eyJuYW1lIjoiaW4iLCJzb3VyY2UiOnsiZ2VuZXJhdG9yIjp7InJwdSI6MTAwMDAwLCJkdXJhdGlvbiI6IjFzIiwibXNnU2l6ZSI6OCwiaml0dGVyIjoiMHMifX0sImNvbnRhaW5lclRlbXBsYXRlIjp7InJlc291cmNlcyI6e30sImVudiI6W3sibmFtZSI6IlBBRl9CQVRDSF9TSVpFIiwidmFsdWUiOiIxMDAwMDAifV19LCJsaW1pdHMiOnsicmVhZEJhdGNoU2l6ZSI6MTAwMCwicmVhZFRpbWVvdXQiOiIxcyIsImJ1ZmZlck1heExlbmd0aCI6MTUwMDAwLCJidWZmZXJVc2FnZUxpbWl0Ijo4NX0sInNjYWxlIjp7Im1pbiI6MX0sInVwZGF0ZVN0cmF0ZWd5Ijp7InR5cGUiOiJSb2xsaW5nVXBkYXRlIiwicm9sbGluZ1VwZGF0ZSI6eyJtYXhVbmF2YWlsYWJsZSI6IjI1JSJ9fSwicGlwZWxpbmVOYW1lIjoic2ltcGxlLXBpcGVsaW5lIiwiaW50ZXJTdGVwQnVmZmVyU2VydmljZU5hbWUiOiIiLCJyZXBsaWNhcyI6MCwidG9FZGdlcyI6W3siZnJvbSI6ImluIiwidG8iOiJvdXQiLCJjb25kaXRpb25zIjpudWxsLCJmcm9tVmVydGV4VHlwZSI6IlNvdXJjZSIsImZyb21WZXJ0ZXhQYXJ0aXRpb25Db3VudCI6MSwiZnJvbVZlcnRleExpbWl0cyI6eyJyZWFkQmF0Y2hTaXplIjoxMDAwLCJyZWFkVGltZW91dCI6IjFzIiwiYnVmZmVyTWF4TGVuZ3RoIjoxNTAwMDAsImJ1ZmZlclVzYWdlTGltaXQiOjg1fSwidG9WZXJ0ZXhUeXBlIjoiU2luayIsInRvVmVydGV4UGFydGl0aW9uQ291bnQiOjEsInRvVmVydGV4TGltaXRzIjp7InJlYWRCYXRjaFNpemUiOjEwMDAsInJlYWRUaW1lb3V0IjoiMXMiLCJidWZmZXJNYXhMZW5ndGgiOjE1MDAwMCwiYnVmZmVyVXNhZ2VMaW1pdCI6ODV9fV0sIndhdGVybWFyayI6eyJkaXNhYmxlZCI6dHJ1ZSwibWF4RGVsYXkiOiIwcyJ9fSwic3RhdHVzIjp7InBoYXNlIjoiIiwicmVwbGljYXMiOjAsImRlc2lyZWRSZXBsaWNhcyI6MCwibGFzdFNjYWxlZEF0IjpudWxsfX0=";

let env_vars = [("NUMAFLOW_ISBSVC_JETSTREAM_URL", "localhost:4222")];
let env_vars = [("NUMAFLOW_ISBSVC_JETSTREAM_URL", "nats:4222")];
let pipeline_config =
PipelineConfig::load(pipeline_cfg_base64.to_string(), env_vars).unwrap();

Expand All @@ -538,7 +538,7 @@ mod tests {
paf_concurrency: 1000,
read_timeout: Duration::from_secs(1),
js_client_config: isb::jetstream::ClientConfig {
url: "localhost:4222".to_string(),
url: "nats:4222".to_string(),
user: None,
password: None,
},
Expand Down Expand Up @@ -579,7 +579,7 @@ mod tests {
fn test_pipeline_config_pulsar_source() {
let pipeline_cfg_base64 = "eyJtZXRhZGF0YSI6eyJuYW1lIjoic2ltcGxlLXBpcGVsaW5lLWluIiwibmFtZXNwYWNlIjoiZGVmYXVsdCIsImNyZWF0aW9uVGltZXN0YW1wIjpudWxsfSwic3BlYyI6eyJuYW1lIjoiaW4iLCJzb3VyY2UiOnsicHVsc2FyIjp7InNlcnZlckFkZHIiOiJwdWxzYXI6Ly9wdWxzYXItc2VydmljZTo2NjUwIiwidG9waWMiOiJ0ZXN0X3BlcnNpc3RlbnQiLCJjb25zdW1lck5hbWUiOiJteV9wZXJzaXN0ZW50X2NvbnN1bWVyIiwic3Vic2NyaXB0aW9uTmFtZSI6Im15X3BlcnNpc3RlbnRfc3Vic2NyaXB0aW9uIn19LCJsaW1pdHMiOnsicmVhZEJhdGNoU2l6ZSI6NTAsInJlYWRUaW1lb3V0IjoiMXMiLCJidWZmZXJNYXhMZW5ndGgiOjMwMDAwLCJidWZmZXJVc2FnZUxpbWl0Ijo4MH0sInNjYWxlIjp7Im1pbiI6MSwibWF4IjoxfSwidXBkYXRlU3RyYXRlZ3kiOnsidHlwZSI6IlJvbGxpbmdVcGRhdGUiLCJyb2xsaW5nVXBkYXRlIjp7Im1heFVuYXZhaWxhYmxlIjoiMjUlIn19LCJwaXBlbGluZU5hbWUiOiJzaW1wbGUtcGlwZWxpbmUiLCJpbnRlclN0ZXBCdWZmZXJTZXJ2aWNlTmFtZSI6IiIsInJlcGxpY2FzIjowLCJ0b0VkZ2VzIjpbeyJmcm9tIjoiaW4iLCJ0byI6Im91dCIsImNvbmRpdGlvbnMiOm51bGwsImZyb21WZXJ0ZXhUeXBlIjoiU291cmNlIiwiZnJvbVZlcnRleFBhcnRpdGlvbkNvdW50IjoxLCJmcm9tVmVydGV4TGltaXRzIjp7InJlYWRCYXRjaFNpemUiOjUwLCJyZWFkVGltZW91dCI6IjFzIiwiYnVmZmVyTWF4TGVuZ3RoIjozMDAwMCwiYnVmZmVyVXNhZ2VMaW1pdCI6ODB9LCJ0b1ZlcnRleFR5cGUiOiJTaW5rIiwidG9WZXJ0ZXhQYXJ0aXRpb25Db3VudCI6MSwidG9WZXJ0ZXhMaW1pdHMiOnsicmVhZEJhdGNoU2l6ZSI6NTAsInJlYWRUaW1lb3V0IjoiMXMiLCJidWZmZXJNYXhMZW5ndGgiOjMwMDAwLCJidWZmZXJVc2FnZUxpbWl0Ijo4MH19XSwid2F0ZXJtYXJrIjp7Im1heERlbGF5IjoiMHMifX0sInN0YXR1cyI6eyJwaGFzZSI6IiIsInJlcGxpY2FzIjowLCJkZXNpcmVkUmVwbGljYXMiOjAsImxhc3RTY2FsZWRBdCI6bnVsbH19";

let env_vars = [("NUMAFLOW_ISBSVC_JETSTREAM_URL", "localhost:4222")];
let env_vars = [("NUMAFLOW_ISBSVC_JETSTREAM_URL", "nats:4222")];
let pipeline_config =
PipelineConfig::load(pipeline_cfg_base64.to_string(), env_vars).unwrap();

Expand All @@ -591,7 +591,7 @@ mod tests {
paf_concurrency: 1000,
read_timeout: Duration::from_secs(1),
js_client_config: isb::jetstream::ClientConfig {
url: "localhost:4222".to_string(),
url: "nats:4222".to_string(),
user: None,
password: None,
},
Expand Down Expand Up @@ -702,7 +702,7 @@ mod tests {
fn test_pipeline_config_load_map_vertex() {
let pipeline_cfg_base64 = "eyJtZXRhZGF0YSI6eyJuYW1lIjoic2ltcGxlLXBpcGVsaW5lLW1hcCIsIm5hbWVzcGFjZSI6ImRlZmF1bHQiLCJjcmVhdGlvblRpbWVzdGFtcCI6bnVsbH0sInNwZWMiOnsibmFtZSI6Im1hcCIsInVkZiI6eyJjb250YWluZXIiOnsidGVtcGxhdGUiOiJkZWZhdWx0In19LCJsaW1pdHMiOnsicmVhZEJhdGNoU2l6ZSI6NTAwLCJyZWFkVGltZW91dCI6IjFzIiwiYnVmZmVyTWF4TGVuZ3RoIjozMDAwMCwiYnVmZmVyVXNhZ2VMaW1pdCI6ODB9LCJzY2FsZSI6eyJtaW4iOjF9LCJwaXBlbGluZU5hbWUiOiJzaW1wbGUtcGlwZWxpbmUiLCJpbnRlclN0ZXBCdWZmZXJTZXJ2aWNlTmFtZSI6IiIsInJlcGxpY2FzIjowLCJmcm9tRWRnZXMiOlt7ImZyb20iOiJpbiIsInRvIjoibWFwIiwiY29uZGl0aW9ucyI6bnVsbCwiZnJvbVZlcnRleFR5cGUiOiJTb3VyY2UiLCJmcm9tVmVydGV4UGFydGl0aW9uQ291bnQiOjEsImZyb21WZXJ0ZXhMaW1pdHMiOnsicmVhZEJhdGNoU2l6ZSI6NTAwLCJyZWFkVGltZW91dCI6IjFzIiwiYnVmZmVyTWF4TGVuZ3RoIjozMDAwMCwiYnVmZmVyVXNhZ2VMaW1pdCI6ODB9LCJ0b1ZlcnRleFR5cGUiOiJNYXAiLCJ0b1ZlcnRleFBhcnRpdGlvbkNvdW50IjoxLCJ0b1ZlcnRleExpbWl0cyI6eyJyZWFkQmF0Y2hTaXplIjo1MDAsInJlYWRUaW1lb3V0IjoiMXMiLCJidWZmZXJNYXhMZW5ndGgiOjMwMDAwLCJidWZmZXJVc2FnZUxpbWl0Ijo4MH19XSwid2F0ZXJtYXJrIjp7Im1heERlbGF5IjoiMHMifX0sInN0YXR1cyI6eyJwaGFzZSI6IiIsInJlcGxpY2FzIjowLCJkZXNpcmVkUmVwbGljYXMiOjAsImxhc3RTY2FsZWRBdCI6bnVsbH19";

let env_vars = [("NUMAFLOW_ISBSVC_JETSTREAM_URL", "localhost:4222")];
let env_vars = [("NUMAFLOW_ISBSVC_JETSTREAM_URL", "nats:4222")];
let pipeline_config =
PipelineConfig::load(pipeline_cfg_base64.to_string(), env_vars).unwrap();

Expand All @@ -714,7 +714,7 @@ mod tests {
paf_concurrency: 1000,
read_timeout: Duration::from_secs(1),
js_client_config: isb::jetstream::ClientConfig {
url: "localhost:4222".to_string(),
url: "nats:4222".to_string(),
user: None,
password: None,
},
Expand Down
4 changes: 2 additions & 2 deletions rust/numaflow-core/src/config/pipeline/isb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const DEFAULT_BUFFER_FULL_STRATEGY: BufferFullStrategy = BufferFullStrategy::Ret
const DEFAULT_WIP_ACK_INTERVAL_MILLIS: u64 = 1000;

pub(crate) mod jetstream {
const DEFAULT_URL: &str = "localhost:4222";
const DEFAULT_URL: &str = "nats:4222";
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct ClientConfig {
pub url: String,
Expand Down Expand Up @@ -102,7 +102,7 @@ mod jetstream_client_config {
#[test]
fn test_default_client_config() {
let expected_config = ClientConfig {
url: "localhost:4222".to_string(),
url: "nats:4222".to_string(),
user: None,
password: None,
};
Expand Down
12 changes: 6 additions & 6 deletions rust/numaflow-core/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ mod tests {
"default-test-forwarder-for-source-vertex-out-4",
];

let js_url = "localhost:4222";
let js_url = "nats:4222";
let client = async_nats::connect(js_url).await.unwrap();
let context = jetstream::new(client);

Expand Down Expand Up @@ -442,7 +442,7 @@ mod tests {
paf_concurrency: 30000,
read_timeout: Duration::from_secs(1),
js_client_config: isb::jetstream::ClientConfig {
url: "localhost:4222".to_string(),
url: "nats:4222".to_string(),
user: None,
password: None,
},
Expand Down Expand Up @@ -539,7 +539,7 @@ mod tests {
"default-test-forwarder-for-sink-vertex-out-4",
];

let js_url = "localhost:4222";
let js_url = "nats:4222";
let client = async_nats::connect(js_url).await.unwrap();
let context = jetstream::new(client);

Expand Down Expand Up @@ -611,7 +611,7 @@ mod tests {
paf_concurrency: 1000,
read_timeout: Duration::from_secs(1),
js_client_config: isb::jetstream::ClientConfig {
url: "localhost:4222".to_string(),
url: "nats:4222".to_string(),
user: None,
password: None,
},
Expand Down Expand Up @@ -736,7 +736,7 @@ mod tests {
"default-test-forwarder-for-map-vertex-out-4",
];

let js_url = "localhost:4222";
let js_url = "nats:4222";
let client = async_nats::connect(js_url).await.unwrap();
let context = jetstream::new(client);

Expand Down Expand Up @@ -840,7 +840,7 @@ mod tests {
paf_concurrency: 1000,
read_timeout: Duration::from_secs(1),
js_client_config: isb::jetstream::ClientConfig {
url: "localhost:4222".to_string(),
url: "nats:4222".to_string(),
user: None,
password: None,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ mod tests {
);

// create a js writer
let js_url = "localhost:4222";
let js_url = "nats:4222";
// Create JetStream context
let client = async_nats::connect(js_url).await.unwrap();
let context = jetstream::new(client);
Expand Down
4 changes: 2 additions & 2 deletions rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ mod tests {
#[cfg(feature = "nats-tests")]
#[tokio::test]
async fn test_jetstream_read() {
let js_url = "localhost:4222";
let js_url = "nats:4222";
// Create JetStream context
let client = async_nats::connect(js_url).await.unwrap();
let context = jetstream::new(client);
Expand Down Expand Up @@ -398,7 +398,7 @@ mod tests {
#[cfg(feature = "nats-tests")]
#[tokio::test]
async fn test_jetstream_ack() {
let js_url = "localhost:4222";
let js_url = "nats:4222";
// Create JetStream context
let client = async_nats::connect(js_url).await.unwrap();
let context = jetstream::new(client);
Expand Down
16 changes: 8 additions & 8 deletions rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ mod tests {
async fn test_async_write() {
let tracker_handle = TrackerHandle::new(None);
let cln_token = CancellationToken::new();
let js_url = "localhost:4222";
let js_url = "nats:4222";
// Create JetStream context
let client = async_nats::connect(js_url).await.unwrap();
let context = jetstream::new(client);
Expand Down Expand Up @@ -559,7 +559,7 @@ mod tests {
#[tokio::test]
async fn test_sync_write() {
let cln_token = CancellationToken::new();
let js_url = "localhost:4222";
let js_url = "nats:4222";
// Create JetStream context
let client = async_nats::connect(js_url).await.unwrap();
let context = jetstream::new(client);
Expand Down Expand Up @@ -623,7 +623,7 @@ mod tests {
#[tokio::test]
async fn test_write_with_cancellation() {
let tracker_handle = TrackerHandle::new(None);
let js_url = "localhost:4222";
let js_url = "nats:4222";
// Create JetStream context
let client = async_nats::connect(js_url).await.unwrap();
let context = jetstream::new(client);
Expand Down Expand Up @@ -747,7 +747,7 @@ mod tests {
#[cfg(feature = "nats-tests")]
#[tokio::test]
async fn test_fetch_buffer_usage() {
let js_url = "localhost:4222";
let js_url = "nats:4222";
// Create JetStream context
let client = async_nats::connect(js_url).await.unwrap();
let context = jetstream::new(client);
Expand Down Expand Up @@ -823,7 +823,7 @@ mod tests {
#[tokio::test]
async fn test_check_stream_status() {
let tracker_handle = TrackerHandle::new(None);
let js_url = "localhost:4222";
let js_url = "nats:4222";
// Create JetStream context
let client = async_nats::connect(js_url).await.unwrap();
let context = jetstream::new(client);
Expand Down Expand Up @@ -916,7 +916,7 @@ mod tests {
#[tokio::test]
async fn test_streaming_write() {
let cln_token = CancellationToken::new();
let js_url = "localhost:4222";
let js_url = "nats:4222";
// Create JetStream context
let client = async_nats::connect(js_url).await.unwrap();
let context = jetstream::new(client);
Expand Down Expand Up @@ -1002,7 +1002,7 @@ mod tests {
#[cfg(feature = "nats-tests")]
#[tokio::test]
async fn test_streaming_write_with_cancellation() {
let js_url = "localhost:4222";
let js_url = "nats:4222";
// Create JetStream context
let client = async_nats::connect(js_url).await.unwrap();
let context = jetstream::new(client);
Expand Down Expand Up @@ -1118,7 +1118,7 @@ mod tests {
#[cfg(feature = "nats-tests")]
#[tokio::test]
async fn test_streaming_write_multiple_streams_vertices() {
let js_url = "localhost:4222";
let js_url = "nats:4222";
let client = async_nats::connect(js_url).await.unwrap();
let context = jetstream::new(client);
let tracker_handle = TrackerHandle::new(None);
Expand Down

0 comments on commit bdbd7ed

Please sign in to comment.