Skip to content

Commit

Permalink
use big length delimited codec frame sizes
Browse files Browse the repository at this point in the history
  • Loading branch information
jeromegn committed Oct 22, 2024
1 parent 6fa8d9e commit 6fb772b
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 9 deletions.
7 changes: 6 additions & 1 deletion crates/corro-admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,12 @@ async fn handle_conn(
) -> Result<(), AdminError> {
// wrap in stream in line delimited json decoder
let mut stream: FramedStream = tokio_serde::Framed::new(
tokio_util::codec::Framed::new(stream, LengthDelimitedCodec::new()),
tokio_util::codec::Framed::new(
stream,
LengthDelimitedCodec::builder()
.max_frame_length(100 * 1_024 * 1_024)
.new_codec(),
),
Json::<Command, Response>::default(),
);

Expand Down
2 changes: 1 addition & 1 deletion crates/corro-agent/src/agent/bi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub fn spawn_bipayload_handler(
let agent = agent.clone();
let bookie = bookie.clone();
async move {
let mut framed = FramedRead::new(rx, LengthDelimitedCodec::new());
let mut framed = FramedRead::new(rx, LengthDelimitedCodec::builder().max_frame_length(100 * 1_024 * 1_024).new_codec());

loop {
match timeout(Duration::from_secs(5), StreamExt::next(&mut framed)).await {
Expand Down
7 changes: 6 additions & 1 deletion crates/corro-agent/src/agent/uni.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, a
tokio::spawn({
let agent = agent.clone();
async move {
let mut framed = FramedRead::new(rx, LengthDelimitedCodec::new());
let mut framed = FramedRead::new(
rx,
LengthDelimitedCodec::builder()
.max_frame_length(100 * 1_024 * 1_024)
.new_codec(),
);

loop {
match StreamExt::next(&mut framed).await {
Expand Down
10 changes: 6 additions & 4 deletions crates/corro-agent/src/api/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1063,13 +1063,13 @@ pub async fn parallel_sync(
*actor_id,
*addr,
async {
let mut codec = LengthDelimitedCodec::new();
let mut codec = LengthDelimitedCodec::builder().max_frame_length(100 * 1_024 * 1_024).new_codec();
let mut send_buf = BytesMut::new();
let mut encode_buf = BytesMut::new();

let actor_id = *actor_id;
let (mut tx, rx) = transport.open_bi(*addr).await?;
let mut read = FramedRead::new(rx, LengthDelimitedCodec::new());
let mut read = FramedRead::new(rx, LengthDelimitedCodec::builder().max_frame_length(100 * 1_024 * 1_024).new_codec());

encode_write_bipayload_msg(
&mut codec,
Expand Down Expand Up @@ -1240,7 +1240,7 @@ pub async fn parallel_sync(

tokio::spawn(async move {
// reusable buffers and constructs
let mut codec = LengthDelimitedCodec::new();
let mut codec = LengthDelimitedCodec::builder().max_frame_length(100 * 1_024 * 1_024).new_codec();
let mut send_buf = BytesMut::new();
let mut encode_buf = BytesMut::new();

Expand Down Expand Up @@ -1481,7 +1481,9 @@ pub async fn serve_sync(
tracing::Span::current().set_parent(context);

debug!(actor_id = %their_actor_id, self_actor_id = %agent.actor_id(), "received sync request");
let mut codec = LengthDelimitedCodec::new();
let mut codec = LengthDelimitedCodec::builder()
.max_frame_length(100 * 1_024 * 1_024)
.new_codec();
let mut send_buf = BytesMut::new();
let mut encode_buf = BytesMut::new();

Expand Down
4 changes: 3 additions & 1 deletion crates/corro-agent/src/broadcast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,9 @@ pub fn runtime_loop(
tokio::spawn(async move {
const BROADCAST_CUTOFF: usize = 64 * 1024;

let mut bcast_codec = LengthDelimitedCodec::new();
let mut bcast_codec = LengthDelimitedCodec::builder()
.max_frame_length(100 * 1_024 * 1_024)
.new_codec();

let mut bcast_buf = BytesMut::new();
let mut local_bcast_buf = BytesMut::new();
Expand Down
7 changes: 6 additions & 1 deletion crates/corrosion/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@ impl AdminConn {
let stream = UnixStream::connect(path).await?;
Ok(Self {
stream: Framed::new(
tokio_util::codec::Framed::new(stream, LengthDelimitedCodec::new()),
tokio_util::codec::Framed::new(
stream,
LengthDelimitedCodec::builder()
.max_frame_length(100 * 1_024 * 1_024)
.new_codec(),
),
Json::default(),
),
})
Expand Down

0 comments on commit 6fb772b

Please sign in to comment.