-
Notifications
You must be signed in to change notification settings - Fork 121
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: implement callbacks from vertices when serving is used as source #2311
Conversation
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
if let Some(ref callback_handler) = this.callback_handler { | ||
let metadata = message.metadata.ok_or_else(|| { | ||
Error::Source(format!( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should write the callback after pafs are resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will update. I wrongly assumed .resolve_pafs()
would wait for them to resolve.
Signed-off-by: Sreekanth <[email protected]>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2311 +/- ##
==========================================
+ Coverage 67.47% 67.93% +0.45%
==========================================
Files 351 352 +1
Lines 45822 46528 +706
==========================================
+ Hits 30918 31608 +690
- Misses 13828 13849 +21
+ Partials 1076 1071 -5 ☔ View full report in Codecov by Sentry. |
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
rust/numaflow-core/src/tracker.rs
Outdated
pub(crate) async fn update(&self, message: &Message) -> Result<()> { | ||
let offset = message.id.offset.clone(); | ||
let mut responses: Option<Vec<String>> = None; | ||
if self.enable_callbacks { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
match, to avoid mut
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
The
In all other PRs
This was causing
Unfortunately, no other info was coming in
Downloaded the binary from the assets of the PR build, and ran it on an Intel Ubuntu 24.04 VM:
Somehow binary is linked with
Tried building static binary on an Intel Ubuntu 24.04 VM (same as Github CI runner, verified glibc version also). And this builds a static binary successfully. Disabled default features in reqwest = { version = "0.12.12", default-features = false, features = [
"http2",
"rustls-tls",
"json",
] } |
how come we didn't see this problem when running locally? perhaps we should see whether we can reproduce this in cloud-workspaces? |
I'm couldn't find the exact reason for this. It looks like sometimes the option to build static binary ( Another reference: https://github.com/rust-lang/rust/pull/71586/files#diff-6e9a08e032c83ccfc4b967a192b6d38c932eaaf6a153027bd79365830e4d7139
https://doc.rust-lang.org/reference/linkage.html#r-link.crt.ineffective
Our target ( |
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
rust/numaflow-core/src/mapper/map.rs
Outdated
@@ -304,6 +305,7 @@ impl MapHandle { | |||
Ok(()) | |||
}); | |||
|
|||
tracing::info!("Returning output_rx stream"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
@@ -286,9 +296,15 @@ impl PipelineConfig { | |||
.map(|(key, val)| (key.into(), val.into())) | |||
.filter(|(key, _val)| { | |||
// FIXME(cr): this filter is non-exhaustive, should we invert? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
@@ -343,14 +346,16 @@ impl MapHandle { | |||
match receiver.await { | |||
Ok(Ok(mut mapped_messages)) => { | |||
// update the tracker with the number of messages sent and send the mapped messages |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change
@@ -605,6 +605,9 @@ pub(crate) async fn start_metrics_https_server( | |||
addr: SocketAddr, | |||
metrics_state: UserDefinedContainerState, | |||
) -> crate::Result<()> { | |||
// Setup the CryptoProvider (controls core cryptography used by rustls) for the process |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need this?
Signed-off-by: Vigith Maurice <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Implements callbacks to source vertex when Serving source is used. #2318