Skip to content

Commit

Permalink
resolve conflicts and review comments
Browse files Browse the repository at this point in the history
Signed-off-by: adarsh0728 <[email protected]>
  • Loading branch information
adarsh0728 committed Jan 22, 2025
2 parents 6988355 + c492520 commit 6619dc1
Show file tree
Hide file tree
Showing 54 changed files with 1,774 additions and 644 deletions.
9 changes: 6 additions & 3 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ jobs:

steps:
- name: Start Pulsar standalone container
run: docker run -d -p 6650:6650 -p 8080:8080 apachepulsar/pulsar:4.0.0 bin/pulsar standalone
run: docker run -d -p 6650:6650 -p 8080:8080 apachepulsar/pulsar:4.0.0 bin/pulsar standalone

- name: Set up Go 1.x
uses: actions/setup-go@v5
Expand Down Expand Up @@ -184,8 +184,11 @@ jobs:
run: RUSTFLAGS='-C target-feature=+crt-static' cargo build --release --target x86_64-unknown-linux-gnu
- name: Rename binary
run: cp -pv target/x86_64-unknown-linux-gnu/release/numaflow ./numaflow-rs-linux-amd64
- name: List files
run: pwd && ls -al && file ./numaflow-rs-linux-amd64
- name: Verfiy binary is Statically Linked
run: |
pwd && ls -al
file ./numaflow-rs-linux-amd64
file ./numaflow-rs-linux-amd64 | grep -q 'static-pie linked'
- name: Upload numaflow binary
uses: actions/upload-artifact@v4
with:
Expand Down
5 changes: 3 additions & 2 deletions pkg/reconciler/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,9 @@ func isLifecycleChange(pl *dfv1.Pipeline) bool {
// Check if the desired phase of the pipeline is 'Paused', or if the current phase of the
// pipeline is either 'Paused' or 'Pausing'. This indicates a transition into or out of
// a paused state which is a lifecycle phase change
if oldPhase := pl.Status.Phase; pl.GetDesiredPhase() == dfv1.PipelinePhasePaused ||
oldPhase == dfv1.PipelinePhasePaused || oldPhase == dfv1.PipelinePhasePausing {
if currentPhase := pl.Status.Phase; pl.GetDesiredPhase() == dfv1.PipelinePhasePaused ||
currentPhase == dfv1.PipelinePhasePaused || currentPhase == dfv1.PipelinePhasePausing ||
pl.GetAnnotations()[dfv1.KeyPauseTimestamp] != "" {
return true
}

Expand Down
138 changes: 2 additions & 136 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,8 @@ axum = "0.7.5"
axum-server = { version = "0.7.1", features = ["tls-rustls"] }
serde = { version = "1.0.204", features = ["derive"] }
rustls = { version = "0.23.12", features = ["aws_lc_rs"] }
reqwest = "0.12.12"
reqwest = { version = "0.12.12", default-features = false, features = [
"http2",
"rustls-tls",
"json",
] }
8 changes: 6 additions & 2 deletions rust/numaflow-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ use crate::Result;
const ENV_MONO_VERTEX_OBJ: &str = "NUMAFLOW_MONO_VERTEX_OBJECT";
const ENV_VERTEX_OBJ: &str = "NUMAFLOW_VERTEX_OBJECT";

const ENV_CALLBACK_ENABLED: &str = "NUMAFLOW_CALLBACK_ENABLED";
const ENV_CALLBACK_CONCURRENCY: &str = "NUMAFLOW_CALLBACK_CONCURRENCY";
const DEFAULT_CALLBACK_CONCURRENCY: usize = 100;

/// Building blocks (Source, Sink, Transformer, FallBack, Metrics, etc.) to build a Pipeline or a
/// MonoVertex.
pub(crate) mod components;
Expand Down Expand Up @@ -97,7 +101,7 @@ pub(crate) struct Settings {
impl Settings {
/// load based on the CRD type, either a pipeline or a monovertex.
/// Settings are populated through reading the env vars set via the controller. The main
/// CRD is the base64 spec of the CR.
/// CRD is the base64 spec of the CR.
fn load() -> Result<Self> {
if let Ok(obj) = env::var(ENV_MONO_VERTEX_OBJ) {
let cfg = MonovertexConfig::load(obj)?;
Expand All @@ -112,7 +116,7 @@ impl Settings {
custom_resource_type: CustomResourceType::Pipeline(cfg),
});
}
Err(Error::Config("No configuration found".to_string()))
Err(Error::Config("No configuration found - environment variable {ENV_MONO_VERTEX_OBJ} or {ENV_VERTEX_OBJ} is not set".to_string()))
}
}

Expand Down
22 changes: 22 additions & 0 deletions rust/numaflow-core/src/config/monovertex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ use crate::config::monovertex::sink::SinkType;
use crate::error::Error;
use crate::Result;

use super::pipeline::ServingCallbackConfig;

use super::{DEFAULT_CALLBACK_CONCURRENCY, ENV_CALLBACK_CONCURRENCY, ENV_CALLBACK_ENABLED};

const DEFAULT_BATCH_SIZE: u64 = 500;
const DEFAULT_TIMEOUT_IN_MS: u32 = 1000;
const DEFAULT_LOOKBACK_WINDOW_IN_SECS: u16 = 120;
Expand All @@ -33,6 +37,7 @@ pub(crate) struct MonovertexConfig {
pub(crate) transformer_config: Option<TransformerConfig>,
pub(crate) fb_sink_config: Option<SinkConfig>,
pub(crate) metrics_config: MetricsConfig,
pub(crate) callback_config: Option<ServingCallbackConfig>,
}

impl Default for MonovertexConfig {
Expand All @@ -53,6 +58,7 @@ impl Default for MonovertexConfig {
transformer_config: None,
fb_sink_config: None,
metrics_config: MetricsConfig::default(),
callback_config: None,
}
}
}
Expand Down Expand Up @@ -143,6 +149,21 @@ impl MonovertexConfig {
.and_then(|scale| scale.lookback_seconds.map(|x| x as u16))
.unwrap_or(DEFAULT_LOOKBACK_WINDOW_IN_SECS);

let mut callback_config = None;
if env::var(ENV_CALLBACK_ENABLED).is_ok() {
let callback_concurrency: usize = env::var(ENV_CALLBACK_CONCURRENCY)
.unwrap_or_else(|_| format!("{DEFAULT_CALLBACK_CONCURRENCY}"))
.parse()
.map_err(|e| {
Error::Config(format!(
"Parsing value of {ENV_CALLBACK_CONCURRENCY}: {e:?}"
))
})?;
callback_config = Some(ServingCallbackConfig {
callback_concurrency,
});
}

Ok(MonovertexConfig {
name: mono_vertex_name,
replica: *get_vertex_replica(),
Expand All @@ -153,6 +174,7 @@ impl MonovertexConfig {
sink_config,
transformer_config,
fb_sink_config,
callback_config,
})
}
}
Expand Down
Loading

0 comments on commit 6619dc1

Please sign in to comment.