From b4f14688b6b8ef45570312717d993b76a2173544 Mon Sep 17 00:00:00 2001 From: Sreekanth Date: Wed, 22 Jan 2025 09:49:30 +0530 Subject: [PATCH 1/3] Upgrade Rust to 1.84 Signed-off-by: Sreekanth --- Dockerfile | 2 +- rust/Cargo.lock | 119 +++++++++++++++++++-------- rust/Cargo.toml | 19 +++-- rust/Dockerfile | 59 ------------- rust/rust-toolchain.toml | 2 +- rust/serving/src/app/direct_proxy.rs | 2 +- 6 files changed, 100 insertions(+), 103 deletions(-) delete mode 100644 rust/Dockerfile diff --git a/Dockerfile b/Dockerfile index 48d9d6895..283781953 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,7 +18,7 @@ RUN chmod +x /bin/numaflow-rs #################################################################################################### # Rust binary #################################################################################################### -FROM lukemathwalker/cargo-chef:latest-rust-1.82 AS chef +FROM lukemathwalker/cargo-chef:latest-rust-1.84 AS chef ARG TARGETPLATFORM WORKDIR /numaflow RUN apt-get update && apt-get install -y protobuf-compiler diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 44f658815..9b6ba0f58 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -136,9 +136,9 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" [[package]] name = "aws-lc-rs" -version = "1.11.1" +version = "1.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f47bb8cc16b669d267eeccf585aea077d0882f4777b1c1f740217885d6e6e5a3" +checksum = "4c2b7ddaa2c56a367ad27a094ad8ef4faacf8a617c2575acb2ba88949df999ca" dependencies = [ "aws-lc-sys", "paste", @@ -147,16 +147,15 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.23.1" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2101df3813227bbaaaa0b04cd61c534c7954b22bd68d399b440be937dc63ff7" +checksum = "71b2ddd3ada61a305e1d8bb6c005d1eaa7d14d903681edfc400406d523a9b491" dependencies = [ "bindgen", "cc", "cmake", "dunce", "fs_extra", - "libc", "paste", ] @@ -167,16 +166,43 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.4.5", "bytes", "futures-util", "http 1.1.0", "http-body 1.0.1", "http-body-util", + "itoa", + "matchit 0.7.3", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper 1.0.2", + "tower 0.5.2", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efea76243612a2436fb4074ba0cf3ba9ea29efdeb72645d8fc63f116462be1de" +dependencies = [ + "axum-core 0.5.1", + "bytes", + "form_urlencoded", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", "hyper 1.5.1", "hyper-util", "itoa", - "matchit", + "matchit 0.8.4", "memchr", "mime", "percent-encoding", @@ -187,7 +213,6 @@ dependencies = [ "serde_path_to_error", "serde_urlencoded", "sync_wrapper 1.0.2", - "tokio", "tower 0.5.2", "tower-layer", "tower-service", @@ -212,6 +237,25 @@ dependencies = [ "sync_wrapper 1.0.2", "tower-layer", "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eab1b0df7cded837c40dacaa2e1c33aa17c84fc3356ae67b5645f1e83190753e" +dependencies = [ + "bytes", + "futures-core", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper 1.0.2", + "tower-layer", + "tower-service", "tracing", ] @@ -241,7 +285,7 @@ dependencies = [ "hyper 1.5.1", "hyper-util", "pin-project-lite", - "rustls 0.23.19", + "rustls 0.23.21", "rustls-pemfile 2.2.0", "rustls-pki-types", "tokio", @@ -1070,7 +1114,7 @@ dependencies = [ "hyper 1.5.1", "hyper-util", "log", - "rustls 0.23.19", + "rustls 0.23.21", "rustls-native-certs 0.8.1", "rustls-pki-types", "tokio", @@ -1414,7 +1458,7 @@ dependencies = [ "k8s-openapi", "kube-core", "pem", - "rustls 0.23.19", + "rustls 0.23.21", "rustls-pemfile 2.2.0", "secrecy", "serde", @@ -1458,9 +1502,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.167" +version = "0.2.169" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09d6582e104315a817dff97f75133544b2e094ee22447d2acf4a74e189ba06fc" +checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" [[package]] name = "libloading" @@ -1515,6 +1559,12 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "matchit" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" + [[package]] name = "memchr" version = "2.7.4" @@ -1660,7 +1710,7 @@ dependencies = [ "backoff", "numaflow-core", "numaflow-models", - "rustls 0.23.19", + "rustls 0.23.21", "servesink", "serving", "tokio", @@ -1717,7 +1767,7 @@ name = "numaflow-core" version = "0.1.0" dependencies = [ "async-nats", - "axum", + "axum 0.8.2", "axum-server", "backoff", "base64 0.22.1", @@ -1740,7 +1790,7 @@ dependencies = [ "rand", "rcgen", "reqwest 0.12.12", - "rustls 0.23.19", + "rustls 0.23.21", "semver", "serde", "serde_json", @@ -2216,7 +2266,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash 2.1.0", - "rustls 0.23.19", + "rustls 0.23.21", "socket2", "thiserror 2.0.8", "tokio", @@ -2234,7 +2284,7 @@ dependencies = [ "rand", "ring", "rustc-hash 2.1.0", - "rustls 0.23.19", + "rustls 0.23.21", "rustls-pki-types", "slab", "thiserror 2.0.8", @@ -2454,7 +2504,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls 0.23.19", + "rustls 0.23.21", "rustls-pemfile 2.2.0", "rustls-pki-types", "serde", @@ -2542,9 +2592,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.19" +version = "0.23.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "934b404430bb06b3fae2cba809eb45a1ab1aecd64491213d7c3301b88393f8d1" +checksum = "8f287924602bf649d949c63dc8ac8b235fa5387d394020705b80c4eb597ce5b8" dependencies = [ "aws-lc-rs", "log", @@ -2721,9 +2771,9 @@ checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" [[package]] name = "serde" -version = "1.0.215" +version = "1.0.217" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6513c1ad0b11a9376da888e3e0baa0077f1aed55c17f50e7b2397136129fb88f" +checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70" dependencies = [ "serde_derive", ] @@ -2740,9 +2790,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.215" +version = "1.0.217" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0" +checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" dependencies = [ "proc-macro2", "quote", @@ -2831,7 +2881,7 @@ dependencies = [ name = "serving" version = "0.1.0" dependencies = [ - "axum", + "axum 0.8.2", "axum-macros", "axum-server", "backoff", @@ -2845,7 +2895,7 @@ dependencies = [ "rcgen", "redis", "reqwest 0.12.12", - "rustls 0.23.19", + "rustls 0.23.21", "serde", "serde_json", "thiserror 1.0.69", @@ -3174,9 +3224,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.41.1" +version = "1.43.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22cfb5bee7a6a52939ca9224d6ac897bb669134078daa8735560897f69de4d33" +checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" dependencies = [ "backtrace", "bytes", @@ -3191,9 +3241,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" dependencies = [ "proc-macro2", "quote", @@ -3227,7 +3277,7 @@ version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls 0.23.19", + "rustls 0.23.21", "rustls-pki-types", "tokio", ] @@ -3285,7 +3335,7 @@ checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.7.9", "base64 0.22.1", "bytes", "h2 0.4.7", @@ -3354,7 +3404,6 @@ dependencies = [ "tokio", "tower-layer", "tower-service", - "tracing", ] [[package]] diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 53c2ab3fd..d27063163 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -59,13 +59,20 @@ numaflow-models = { path = "numaflow-models" } backoff = { path = "backoff" } numaflow-pb = { path = "numaflow-pb" } numaflow-pulsar = { path = "extns/numaflow-pulsar" } -tokio = "1.41.1" -bytes = "1.7.1" -tracing = "0.1.40" -axum = "0.7.5" +tokio = "1.43.0" +bytes = "1.9.0" +tracing = "0.1.41" +axum = { version = "0.8.2", default-features = false, features = [ + "http1", + "http2", + "json", + "matched-path", + "query", + "tracing", +] } axum-server = { version = "0.7.1", features = ["tls-rustls"] } -serde = { version = "1.0.204", features = ["derive"] } -rustls = { version = "0.23.12", features = ["aws_lc_rs"] } +serde = { version = "1.0.217", features = ["derive"] } +rustls = { version = "0.23.21", features = ["aws_lc_rs"] } reqwest = { version = "0.12.12", default-features = false, features = [ "http2", "rustls-tls", diff --git a/rust/Dockerfile b/rust/Dockerfile deleted file mode 100644 index ce50094ca..000000000 --- a/rust/Dockerfile +++ /dev/null @@ -1,59 +0,0 @@ -# Use multi-stage builds to keep the final image small -# Use an official Rust image for the build stage -FROM rust:1.80-bookworm as builder - -RUN curl -L --proto '=https' --tlsv1.2 -sSf https://raw.githubusercontent.com/cargo-bins/cargo-binstall/main/install-from-binstall-release.sh | bash - -RUN apt-get update -RUN apt-get install protobuf-compiler -y - -RUN cargo new numaflow -# Create a new empty shell project -WORKDIR /numaflow - -RUN cargo new servesink -COPY ./servesink/Cargo.toml ./servesink/Cargo.toml - -RUN cargo new backoff -COPY ./backoff/Cargo.toml ./backoff/Cargo.toml - -RUN cargo new numaflow-models -COPY ./numaflow-models/Cargo.toml ./numaflow-models/Cargo.toml - -RUN cargo new numaflow-pb -COPY ./numaflow-pb/Cargo.toml ./numaflow-pb/Cargo.toml - -RUN cargo new numaflow-core -COPY numaflow-core/Cargo.toml ./numaflow-core/Cargo.toml - -RUN cargo new serving -COPY ./serving/Cargo.toml ./serving/Cargo.toml - -# Copy all Cargo.toml and Cargo.lock files for caching dependencies -COPY ./Cargo.toml ./Cargo.lock ./ - -# Build to cache dependencies -RUN mkdir -p src/bin && echo "fn main() {}" > src/bin/main.rs && \ - cargo build --workspace --all --release - -# Copy the actual source code files of the main project and the subprojects -COPY ./src ./src -COPY ./servesink/src ./servesink/src -COPY ./backoff/src ./backoff/src -COPY ./numaflow-models/src ./numaflow-models/src -COPY ./serving/src ./serving/src -COPY numaflow-core/src ./numaflow-core/src -COPY ./numaflow-pb/src ./numaflow-pb/src -# Build the real binaries -RUN touch src/bin/main.rs && \ - cargo build --workspace --all --release - -# Use a lightweight image for the runtime -FROM debian:bookworm as numaflow-ext - -RUN apt-get update && apt-get install -y libssl3 - -COPY --from=builder /numaflow/target/release/ . -COPY serving/config config - -ENTRYPOINT ["./numaflow"] \ No newline at end of file diff --git a/rust/rust-toolchain.toml b/rust/rust-toolchain.toml index 06bea96e8..9e722f47d 100644 --- a/rust/rust-toolchain.toml +++ b/rust/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] profile = "default" -channel = "1.82" +channel = "1.84" diff --git a/rust/serving/src/app/direct_proxy.rs b/rust/serving/src/app/direct_proxy.rs index 9f08321e2..aaa7dd328 100644 --- a/rust/serving/src/app/direct_proxy.rs +++ b/rust/serving/src/app/direct_proxy.rs @@ -35,7 +35,7 @@ pub(crate) fn direct_proxy(client: Client, upstream_addr: String) -> Router { Router::new() // https://docs.rs/axum/latest/axum/struct.Router.html#wildcards - .route("/*upstream", get(proxy)) + .route("/{*upstream}", get(proxy)) .with_state(proxy_state) } From 5724adfdc6fb943de334d3719c82a40e915a83b2 Mon Sep 17 00:00:00 2001 From: Sreekanth Date: Wed, 22 Jan 2025 10:21:14 +0530 Subject: [PATCH 2/3] Fix unit test failure Signed-off-by: Sreekanth --- rust/Cargo.lock | 1 + rust/Cargo.toml | 1 + rust/serving/src/callback.rs | 15 +++------------ 3 files changed, 5 insertions(+), 12 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 9b6ba0f58..e3ce78761 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -213,6 +213,7 @@ dependencies = [ "serde_path_to_error", "serde_urlencoded", "sync_wrapper 1.0.2", + "tokio", "tower 0.5.2", "tower-layer", "tower-service", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index d27063163..de613b2be 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -69,6 +69,7 @@ axum = { version = "0.8.2", default-features = false, features = [ "matched-path", "query", "tracing", + "tokio", ] } axum-server = { version = "0.7.1", features = ["tls-rustls"] } serde = { version = "1.0.217", features = ["derive"] } diff --git a/rust/serving/src/callback.rs b/rust/serving/src/callback.rs index d01530dca..d3f7ba893 100644 --- a/rust/serving/src/callback.rs +++ b/rust/serving/src/callback.rs @@ -245,7 +245,7 @@ mod tests { // On the server, this fails with SubGraphInvalidInput("Invalid callback: 1234, vertex: in") // We get 200 OK response from the server, since we already registered this request ID in the store. - callback_handler + let callback_task = callback_handler .callback( ID_VALUE.into(), format!("https://localhost:{port}/v1/process/callback"), @@ -253,17 +253,8 @@ mod tests { vec![], ) .await?; - let mut data = None; - for _ in 0..10 { - tokio::time::sleep(Duration::from_millis(2)).await; - data = { - let guard = store.data.lock().unwrap(); - guard.get(ID_VALUE).cloned() - }; - if data.is_some() { - break; - } - } + callback_task.await.unwrap(); + let data = store.data.lock().unwrap().get(ID_VALUE).cloned(); assert!(data.is_some(), "Callback data not found in store"); server_handle.abort(); Ok(()) From 473b4cc8023fcef6c54323371366b717d11743ee Mon Sep 17 00:00:00 2001 From: Sreekanth Date: Wed, 22 Jan 2025 14:55:47 +0530 Subject: [PATCH 3/3] Upgrade tower Signed-off-by: Sreekanth --- rust/Cargo.lock | 4 ++-- rust/Cargo.toml | 1 + rust/numaflow-core/Cargo.toml | 2 +- rust/numaflow-core/src/lib.rs | 4 ++-- rust/serving/Cargo.toml | 4 ++-- 5 files changed, 8 insertions(+), 7 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index e3ce78761..2961bf493 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1802,7 +1802,7 @@ dependencies = [ "tokio-stream", "tokio-util", "tonic", - "tower 0.4.13", + "tower 0.5.2", "tracing", "trait-variant", ] @@ -2901,7 +2901,7 @@ dependencies = [ "serde_json", "thiserror 1.0.69", "tokio", - "tower 0.4.13", + "tower 0.5.2", "tower-http", "tracing", "trait-variant", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index de613b2be..48f9e31dd 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -72,6 +72,7 @@ axum = { version = "0.8.2", default-features = false, features = [ "tokio", ] } axum-server = { version = "0.7.1", features = ["tls-rustls"] } +tower = "0.5.2" serde = { version = "1.0.217", features = ["derive"] } rustls = { version = "0.23.21", features = ["aws_lc_rs"] } reqwest = { version = "0.12.12", default-features = false, features = [ diff --git a/rust/numaflow-core/Cargo.toml b/rust/numaflow-core/Cargo.toml index 33c7e1cbe..f22424a5f 100644 --- a/rust/numaflow-core/Cargo.toml +++ b/rust/numaflow-core/Cargo.toml @@ -25,6 +25,7 @@ axum-server.workspace = true bytes.workspace = true serde.workspace = true rustls.workspace = true +tower.workspace = true tonic = "0.12.3" thiserror = "2.0.3" tokio-util = "0.7.11" @@ -34,7 +35,6 @@ prost-types = "0.13.1" chrono = "0.4.31" base64 = "0.22.1" hyper-util = "0.1.6" -tower = "0.4.13" serde_json = "1.0.122" trait-variant = "0.1.2" rcgen = "0.13.1" diff --git a/rust/numaflow-core/src/lib.rs b/rust/numaflow-core/src/lib.rs index d65380f8d..c9665ca78 100644 --- a/rust/numaflow-core/src/lib.rs +++ b/rust/numaflow-core/src/lib.rs @@ -68,7 +68,7 @@ pub async fn run() -> Result<()> { let crd_type = config().custom_resource_type.clone(); match crd_type { CustomResourceType::MonoVertex(config) => { - info!("Starting monovertex forwarder with config: {:?}", config); + info!("Starting monovertex forwarder with config: {:#?}", config); // Run the forwarder with cancellation token. if let Err(e) = monovertex::start_forwarder(cln_token, &config).await { error!("Application error running monovertex: {:?}", e); @@ -80,7 +80,7 @@ pub async fn run() -> Result<()> { } } CustomResourceType::Pipeline(config) => { - info!("Starting pipeline forwarder with config: {:?}", config); + info!("Starting pipeline forwarder with config: {:#?}", config); if let Err(e) = pipeline::start_forwarder(cln_token, config).await { error!("Application error running pipeline: {:?}", e); diff --git a/rust/serving/Cargo.toml b/rust/serving/Cargo.toml index 673bf6046..d828eaede 100644 --- a/rust/serving/Cargo.toml +++ b/rust/serving/Cargo.toml @@ -19,11 +19,11 @@ axum.workspace = true axum-server.workspace = true bytes.workspace = true rustls.workspace = true +tower.workspace = true +serde.workspace = true axum-macros = "0.4.1" hyper-util = { version = "0.1.6", features = ["client-legacy"] } -serde = { version = "1.0.204", features = ["derive"] } serde_json = "1.0.120" -tower = "0.4.13" tower-http = { version = "0.5.2", features = ["trace", "timeout"] } uuid = { version = "1.10.0", features = ["v4"] } redis = { version = "0.26.0", features = [