Skip to content

Commit

Permalink
Merge branch 'main' into fix/units-charts
Browse files Browse the repository at this point in the history
  • Loading branch information
adarsh0728 authored Jan 21, 2025
2 parents 212affa + 05cd792 commit 38103bf
Show file tree
Hide file tree
Showing 19 changed files with 402 additions and 542 deletions.
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ require (
go.uber.org/goleak v1.3.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.26.0
golang.org/x/crypto v0.31.0
golang.org/x/crypto v0.32.0
golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc
golang.org/x/net v0.29.0
golang.org/x/net v0.34.0
golang.org/x/oauth2 v0.21.0
golang.org/x/sync v0.10.0
golang.org/x/tools v0.24.0
Expand Down Expand Up @@ -200,8 +200,8 @@ require (
go.mongodb.org/mongo-driver v1.15.0 // indirect
golang.org/x/arch v0.7.0 // indirect
golang.org/x/mod v0.20.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/term v0.27.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/term v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
Expand Down
18 changes: 9 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -691,8 +691,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.3.0/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down Expand Up @@ -778,8 +778,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug
golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -867,15 +867,15 @@ golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q=
golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM=
golang.org/x/term v0.28.0 h1:/Ts8HFuMR2E6IP/jlo7QVLZHggjKQbhu/7H0LJFr3Gg=
golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down Expand Up @@ -1164,4 +1164,4 @@ sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+s
sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
14 changes: 1 addition & 13 deletions pkg/reconciler/isbsvc/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,7 @@ func (r *interStepBufferServiceReconciler) reconcile(ctx context.Context, isbSvc
if controllerutil.ContainsFinalizer(isbSvc, deprecatedFinalizerName) { // Remove deprecated finalizer if exists
controllerutil.RemoveFinalizer(isbSvc, deprecatedFinalizerName)
}
if needsFinalizer(isbSvc) {
controllerutil.AddFinalizer(isbSvc, finalizerName)
}
controllerutil.AddFinalizer(isbSvc, finalizerName)

defer func() {
if isbSvc.Status.IsHealthy() {
Expand All @@ -139,13 +137,3 @@ func (r *interStepBufferServiceReconciler) reconcile(ctx context.Context, isbSvc
}
return installer.Install(ctx, isbSvc, r.client, r.kubeClient, r.config, log, r.recorder)
}

func needsFinalizer(isbSvc *dfv1.InterStepBufferService) bool {
if isbSvc.Spec.Redis != nil && isbSvc.Spec.Redis.Native != nil && isbSvc.Spec.Redis.Native.Persistence != nil {
return true
}
if isbSvc.Spec.JetStream != nil && isbSvc.Spec.JetStream.Persistence != nil {
return true
}
return false
}
20 changes: 0 additions & 20 deletions pkg/reconciler/isbsvc/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,26 +217,6 @@ func TestNeedsUpdate(t *testing.T) {
})
}

func TestNeedsFinalizer(t *testing.T) {
t.Run("needs finalizer redis", func(t *testing.T) {
testStorageClass := "test"
testIsbs := nativeRedisIsbs.DeepCopy()
testIsbs.Spec.Redis.Native.Persistence = &dfv1.PersistenceStrategy{
StorageClassName: &testStorageClass,
}
assert.True(t, needsFinalizer(testIsbs))
})

t.Run("needs finalizer jetstream", func(t *testing.T) {
testStorageClass := "test"
testIsbs := jetStreamIsbs.DeepCopy()
testIsbs.Spec.JetStream.Persistence = &dfv1.PersistenceStrategy{
StorageClassName: &testStorageClass,
}
assert.True(t, needsFinalizer(testIsbs))
})
}

func contains(arr []string, str string) bool {
for _, a := range arr {
if a == str {
Expand Down
33 changes: 31 additions & 2 deletions pkg/reconciler/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,20 @@ var (
Help: "A metric indicates the current replicas of a Vertex",
}, []string{metrics.LabelNamespace, metrics.LabelPipeline, metrics.LabelVertex})

// VertexMinReplicas indicates the min replicas of a Vertex.
VertexMinReplicas = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: "controller",
Name: "vertex_min_replicas",
Help: "A metric indicates the min replicas of a Vertex",
}, []string{metrics.LabelNamespace, metrics.LabelPipeline, metrics.LabelVertex})

// VertexMaxReplicas indicates the max replicas of a Vertex.
VertexMaxReplicas = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: "controller",
Name: "vertex_max_replicas",
Help: "A metric indicates the max replicas of a Vertex",
}, []string{metrics.LabelNamespace, metrics.LabelPipeline, metrics.LabelVertex})

// MonoVertexDesiredReplicas indicates the desired replicas of a MonoVertex.
MonoVertexDesiredReplicas = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: "controller",
Expand All @@ -92,11 +106,26 @@ var (
Name: "monovtx_current_replicas",
Help: "A metric indicates the current replicas of a MonoVertex",
}, []string{metrics.LabelNamespace, metrics.LabelMonoVertexName})

// MonoVertexMinReplicas indicates the min replicas of a MonoVertex.
MonoVertexMinReplicas = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: "controller",
Name: "monovtx_min_replicas",
Help: "A metric indicates the min replicas of a MonoVertex",
}, []string{metrics.LabelNamespace, metrics.LabelMonoVertexName})

// MonoVertexMaxReplicas indicates the max replicas of a MonoVertex.
MonoVertexMaxReplicas = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: "controller",
Name: "monovtx_max_replicas",
Help: "A metric indicates the max replicas of a MonoVertex",
}, []string{metrics.LabelNamespace, metrics.LabelMonoVertexName})
)

func init() {
ctrlmetrics.Registry.MustRegister(BuildInfo, ISBSvcHealth, PipelineHealth,
MonoVertexHealth, JetStreamISBSvcReplicas, RedisISBSvcReplicas,
VertexDesiredReplicas, VertexCurrentReplicas, MonoVertexDesiredReplicas,
MonoVertexCurrentReplicas)
VertexDesiredReplicas, VertexCurrentReplicas, VertexMinReplicas,
VertexMaxReplicas, MonoVertexDesiredReplicas, MonoVertexCurrentReplicas,
MonoVertexMinReplicas, MonoVertexMaxReplicas)
}
19 changes: 15 additions & 4 deletions pkg/reconciler/monovertex/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func (mr *monoVertexReconciler) Reconcile(ctx context.Context, req ctrl.Request)
monoVtx := &dfv1.MonoVertex{}
if err := mr.client.Get(ctx, req.NamespacedName, monoVtx); err != nil {
if apierrors.IsNotFound(err) {
// Clean up metrics here, since there's no finalizer defined for MonoVertex objects, best effort
cleanupMetrics(req.NamespacedName.Namespace, req.NamespacedName.Name)
return reconcile.Result{}, nil
}
mr.logger.Errorw("Unable to get MonoVertex", zap.Any("request", req), zap.Error(err))
Expand Down Expand Up @@ -100,10 +102,8 @@ func (mr *monoVertexReconciler) reconcile(ctx context.Context, monoVtx *dfv1.Mon
if !monoVtx.DeletionTimestamp.IsZero() {
log.Info("Deleting mono vertex")
mr.scaler.StopWatching(mVtxKey)
// Clean up metrics
_ = reconciler.MonoVertexHealth.DeleteLabelValues(monoVtx.Namespace, monoVtx.Name)
_ = reconciler.MonoVertexDesiredReplicas.DeleteLabelValues(monoVtx.Namespace, monoVtx.Name)
_ = reconciler.MonoVertexCurrentReplicas.DeleteLabelValues(monoVtx.Namespace, monoVtx.Name)
// Clean up metrics, best effort
cleanupMetrics(monoVtx.Namespace, monoVtx.Name)
return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -188,6 +188,8 @@ func (mr *monoVertexReconciler) orchestratePods(ctx context.Context, monoVtx *df
defer func() {
reconciler.MonoVertexDesiredReplicas.WithLabelValues(monoVtx.Namespace, monoVtx.Name).Set(float64(desiredReplicas))
reconciler.MonoVertexCurrentReplicas.WithLabelValues(monoVtx.Namespace, monoVtx.Name).Set(float64(monoVtx.Status.Replicas))
reconciler.MonoVertexMinReplicas.WithLabelValues(monoVtx.Namespace, monoVtx.Name).Set(float64(monoVtx.Spec.Scale.GetMinReplicas()))
reconciler.MonoVertexMaxReplicas.WithLabelValues(monoVtx.Namespace, monoVtx.Name).Set(float64(monoVtx.Spec.Scale.GetMaxReplicas()))
}()

podSpec, err := mr.buildPodSpec(monoVtx)
Expand Down Expand Up @@ -618,3 +620,12 @@ func (mr *monoVertexReconciler) checkChildrenResourceStatus(ctx context.Context,

return nil
}

// Clean up metrics, should be called when corresponding mvtx is deleted
func cleanupMetrics(namespace, mvtx string) {
_ = reconciler.MonoVertexHealth.DeleteLabelValues(namespace, mvtx)
_ = reconciler.MonoVertexDesiredReplicas.DeleteLabelValues(namespace, mvtx)
_ = reconciler.MonoVertexCurrentReplicas.DeleteLabelValues(namespace, mvtx)
_ = reconciler.MonoVertexMaxReplicas.DeleteLabelValues(namespace, mvtx)
_ = reconciler.MonoVertexMinReplicas.DeleteLabelValues(namespace, mvtx)
}
4 changes: 4 additions & 0 deletions pkg/reconciler/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ func (r *pipelineReconciler) reconcile(ctx context.Context, pl *dfv1.Pipeline) (
// Delete corresponding vertex metrics
_ = reconciler.VertexDesiredReplicas.DeletePartialMatch(map[string]string{metrics.LabelNamespace: pl.Namespace, metrics.LabelPipeline: pl.Name})
_ = reconciler.VertexCurrentReplicas.DeletePartialMatch(map[string]string{metrics.LabelNamespace: pl.Namespace, metrics.LabelPipeline: pl.Name})
_ = reconciler.VertexMinReplicas.DeletePartialMatch(map[string]string{metrics.LabelNamespace: pl.Namespace, metrics.LabelPipeline: pl.Name})
_ = reconciler.VertexMaxReplicas.DeletePartialMatch(map[string]string{metrics.LabelNamespace: pl.Namespace, metrics.LabelPipeline: pl.Name})
}
return ctrl.Result{}, nil
}
Expand Down Expand Up @@ -378,6 +380,8 @@ func (r *pipelineReconciler) reconcileFixedResources(ctx context.Context, pl *df
// Clean up vertex replica metrics
reconciler.VertexDesiredReplicas.DeleteLabelValues(pl.Namespace, pl.Name, v.Spec.Name)
reconciler.VertexCurrentReplicas.DeleteLabelValues(pl.Namespace, pl.Name, v.Spec.Name)
reconciler.VertexMinReplicas.DeleteLabelValues(pl.Namespace, pl.Name, v.Spec.Name)
reconciler.VertexMaxReplicas.DeleteLabelValues(pl.Namespace, pl.Name, v.Spec.Name)
}

// Daemon service
Expand Down
2 changes: 2 additions & 0 deletions pkg/reconciler/vertex/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ func (r *vertexReconciler) orchestratePods(ctx context.Context, vertex *dfv1.Ver
defer func() {
reconciler.VertexDesiredReplicas.WithLabelValues(vertex.Namespace, vertex.Spec.PipelineName, vertex.Spec.Name).Set(float64(desiredReplicas))
reconciler.VertexCurrentReplicas.WithLabelValues(vertex.Namespace, vertex.Spec.PipelineName, vertex.Spec.Name).Set(float64(vertex.Status.Replicas))
reconciler.VertexMinReplicas.WithLabelValues(vertex.Namespace, vertex.Spec.PipelineName, vertex.Spec.Name).Set(float64(vertex.Spec.Scale.GetMinReplicas()))
reconciler.VertexMaxReplicas.WithLabelValues(vertex.Namespace, vertex.Spec.PipelineName, vertex.Spec.Name).Set(float64(vertex.Spec.Scale.GetMaxReplicas()))
}()

// Build pod spec of the 1st replica to calculate the hash, which is used to determine whether the pod spec is changed
Expand Down
27 changes: 8 additions & 19 deletions rust/numaflow-core/src/monovertex.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use forwarder::ForwarderBuilder;
use tokio_util::sync::CancellationToken;
use tracing::info;

Expand All @@ -10,7 +9,6 @@ use crate::shared::create_components;
use crate::sink::SinkWriter;
use crate::source::Source;
use crate::tracker::TrackerHandle;
use crate::transformer::Transformer;
use crate::{metrics, shared};

/// [forwarder] orchestrates data movement from the Source to the Sink via the optional SourceTransformer.
Expand All @@ -26,19 +24,20 @@ pub(crate) async fn start_forwarder(
config: &MonovertexConfig,
) -> error::Result<()> {
let tracker_handle = TrackerHandle::new();
let (source, source_grpc_client) = create_components::create_source(
let (transformer, transformer_grpc_client) = create_components::create_transformer(
config.batch_size,
config.read_timeout,
&config.source_config,
config.transformer_config.clone(),
tracker_handle.clone(),
cln_token.clone(),
)
.await?;

let (transformer, transformer_grpc_client) = create_components::create_transformer(
let (source, source_grpc_client) = create_components::create_source(
config.batch_size,
config.transformer_config.clone(),
config.read_timeout,
&config.source_config,
tracker_handle.clone(),
transformer,
cln_token.clone(),
)
.await?;
Expand Down Expand Up @@ -69,7 +68,7 @@ pub(crate) async fn start_forwarder(
// FIXME: what to do with the handle
shared::metrics::start_metrics_server(config.metrics_config.clone(), metrics_state).await;

start(config.clone(), source, sink_writer, transformer, cln_token).await?;
start(config.clone(), source, sink_writer, cln_token).await?;

Ok(())
}
Expand All @@ -78,7 +77,6 @@ async fn start(
mvtx_config: MonovertexConfig,
source: Source,
sink: SinkWriter,
transformer: Option<Transformer>,
cln_token: CancellationToken,
) -> error::Result<()> {
// start the pending reader to publish pending metrics
Expand All @@ -89,18 +87,9 @@ async fn start(
.await;
let _pending_reader_handle = pending_reader.start(is_mono_vertex()).await;

let mut forwarder_builder = ForwarderBuilder::new(source, sink, cln_token);

// add transformer if exists
if let Some(transformer_client) = transformer {
forwarder_builder = forwarder_builder.transformer(transformer_client);
}

// build the final forwarder
let forwarder = forwarder_builder.build();
let forwarder = forwarder::Forwarder::new(source, sink, cln_token);

info!("Forwarder is starting...");

// start the forwarder, it will return only on Signal
forwarder.start().await?;

Expand Down
Loading

0 comments on commit 38103bf

Please sign in to comment.