Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade hyper/http, use new APIs #458

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,14 @@ compact_jwt = "0.4.2"
futures = "^0.3.25"
hex = "0.4.3"
http = "^0.2.9"
http-body = "=1.0.0-rc.2"
http-body-util = "=0.1.0-rc.2"
hyper = { version = "=1.0.0-rc.3", default-features = false, features = [
http-body = "1.0.1"
http-body-util = "0.1.2"
hyper = { version = "1.5.1", default-features = false, features = [
"http1",
] }
hyper-util = { version = "0.1.10", features = [
"tokio",
] }
nom = "7.1"
peg = "0.8.1"
openssl = "^0.10.56"
Expand All @@ -100,14 +103,14 @@ tokio = { version = "1.22.0", features = [
] }
tokio-native-tls = "^0.3.1"
tokio-stream = { version = "0.1", features = ["sync"] }
tokio-tungstenite = { version = "^0.18.0", features = ["native-tls"] }
tokio-tungstenite = { version = "^0.24.0", features = ["native-tls"] }
tracing = "^0.1.35"
tracing-subscriber = { version = "0.3", features = [
"env-filter",
"std",
"fmt",
] }
tungstenite = { version = "^0.18.0", default-features = false, features = [
tungstenite = { version = "^0.24.0", default-features = false, features = [
"handshake",
] }
url = "2"
Expand Down
1 change: 1 addition & 0 deletions cable-tunnel-server/backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ hex.workspace = true
http-body.workspace = true
http-body-util.workspace = true
hyper = { workspace = true, features = ["server"] }
hyper-util.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-native-tls.workspace = true
Expand Down
4 changes: 3 additions & 1 deletion cable-tunnel-server/backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use hyper::{
upgrade::Upgraded,
Request, Response, StatusCode,
};
use hyper_util::rt::tokio::TokioIo;
use tokio::{
select,
sync::{
Expand Down Expand Up @@ -202,7 +203,7 @@ impl CableError {
#[instrument(level = "info", skip_all, err, fields(addr = addr.to_string()))]
async fn handle_websocket(
state: Arc<ServerState>,
mut ws_stream: WebSocketStream<Upgraded>,
mut ws_stream: WebSocketStream<TokioIo<Upgraded>>,
tx: Tx,
mut rx: Rx,
addr: SocketAddr,
Expand Down Expand Up @@ -383,6 +384,7 @@ async fn handle_request(

match hyper::upgrade::on(&mut req).await {
Ok(upgraded) => {
let upgraded = TokioIo::new(upgraded);
let ws_stream =
WebSocketStream::from_raw_socket(upgraded, Role::Server, config).await;
handle_websocket(ss, ws_stream, tx, rx, addr).await.ok();
Expand Down
1 change: 1 addition & 0 deletions cable-tunnel-server/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ hex.workspace = true
http-body.workspace = true
http-body-util.workspace = true
hyper = { workspace = true, features = ["server"] }
hyper-util.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-native-tls.workspace = true
Expand Down
5 changes: 3 additions & 2 deletions cable-tunnel-server/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,10 @@ pub async fn run_server<F, R, ResBody, T>(
bind_address: SocketAddr,
tls_acceptor: Option<TlsAcceptor>,
server_state: T,
mut request_handler: F,
request_handler: F,
) -> Result<(), Box<dyn StdError>>
where
F: FnMut(Arc<T>, SocketAddr, Request<Incoming>) -> R + Copy + Send + Sync + 'static,
F: Fn(Arc<T>, SocketAddr, Request<Incoming>) -> R + Copy + Send + Sync + 'static,
R: Future<Output = Result<Response<ResBody>, Infallible>> + Send,
ResBody: Body + Send + 'static,
<ResBody as Body>::Error: Into<Box<dyn StdError + Send + Sync>>,
Expand Down Expand Up @@ -362,6 +362,7 @@ where
}
},
};
let stream = hyper_util::rt::tokio::TokioIo::new(stream);

let conn =
hyper::server::conn::http1::Builder::new().serve_connection(stream, service);
Expand Down
1 change: 1 addition & 0 deletions cable-tunnel-server/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ clap.workspace = true
hex.workspace = true
http-body-util.workspace = true
hyper = { workspace = true, features = ["client", "server"] }
hyper-util.workspace = true
tokio.workspace = true
tokio-native-tls.workspace = true
tokio-tungstenite.workspace = true
Expand Down
10 changes: 7 additions & 3 deletions cable-tunnel-server/frontend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use hyper::{
};

use cable_tunnel_server_common::*;
use tokio::{io::AsyncWriteExt, net::TcpStream, sync::RwLock};
use hyper_util::rt::TokioIo;
use tokio::{io::AsyncWriteExt as _, net::TcpStream, sync::RwLock};
use tokio_native_tls::TlsConnector;
use tokio_tungstenite::MaybeTlsStream;

Expand Down Expand Up @@ -199,6 +200,7 @@ async fn handle_request(
}
}
};
let backend_socket = TokioIo::new(backend_socket);

let (mut sender, conn) = match hyper::client::conn::http1::handshake(backend_socket).await {
Ok(v) => v,
Expand Down Expand Up @@ -239,22 +241,24 @@ async fn handle_request(
// Set up the "upgrade" handler to connect the two sockets together
tokio::task::spawn(async move {
// Upgrade the connection to the backend
let mut backend_upgraded = match hyper::upgrade::on(&mut backend_res).await {
let backend_upgraded = match hyper::upgrade::on(&mut backend_res).await {
Ok(u) => u,
Err(e) => {
error!("failure upgrading connection to backend: {e}");
return;
}
};
let mut backend_upgraded = TokioIo::new(backend_upgraded);

// Upgrade the connection from the client
let mut client_upgraded = match hyper::upgrade::on(&mut req).await {
let client_upgraded = match hyper::upgrade::on(&mut req).await {
Ok(u) => u,
Err(e) => {
error!("failure upgrading connection to client: {e}");
return;
}
};
let mut client_upgraded = TokioIo::new(client_upgraded);

// Connect the two streams together directly.
match tokio::io::copy_bidirectional(&mut backend_upgraded, &mut client_upgraded).await {
Expand Down
Loading