From c7bb5b6afdf229d8a1b48f2de21b1001381553ee Mon Sep 17 00:00:00 2001 From: VolodymyrBg Date: Wed, 15 Jan 2025 22:24:32 +0200 Subject: [PATCH 1/2] feat: make stream message buffer size configurable via CLI --- apps/hubble/src/rpc/bufferedStreamWriter.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/apps/hubble/src/rpc/bufferedStreamWriter.ts b/apps/hubble/src/rpc/bufferedStreamWriter.ts index 0a40288f20..603dd865a0 100644 --- a/apps/hubble/src/rpc/bufferedStreamWriter.ts +++ b/apps/hubble/src/rpc/bufferedStreamWriter.ts @@ -4,8 +4,8 @@ import { err, ok } from "neverthrow"; export const STREAM_DRAIN_TIMEOUT_MS = 10_000; export const SLOW_CLIENT_GRACE_PERIOD_MS = 60_000; -// TODO: Make this configurable via CLI -export const STREAM_MESSAGE_BUFFER_SIZE = 10_000; +// Default value if not configured via CLI +export const DEFAULT_STREAM_MESSAGE_BUFFER_SIZE = 10_000; /** * A BufferedStreamWriter is a wrapper around a gRPC stream that will buffer messages when the stream is backed up. @@ -16,11 +16,12 @@ export const STREAM_MESSAGE_BUFFER_SIZE = 10_000; export class BufferedStreamWriter { private streamIsBackedUp = false; private stream: ServerWritableStream; - // biome-ignore lint/suspicious/noExplicitAny: legacy code, avoid using ignore for new code private dataWaitingForDrain: any[] = []; + private bufferSize: number; - constructor(stream: ServerWritableStream) { + constructor(stream: ServerWritableStream, bufferSize?: number) { this.stream = stream; + this.bufferSize = bufferSize || DEFAULT_STREAM_MESSAGE_BUFFER_SIZE; } /** @@ -29,12 +30,11 @@ export class BufferedStreamWriter { * ok(false) if the message was buffered because the stream is full * err if the stream can't be written to any more and will be closed. */ - // biome-ignore lint/suspicious/noExplicitAny: legacy code, avoid using ignore for new code public writeToStream(message: any): HubResult { if (this.streamIsBackedUp) { this.dataWaitingForDrain.push(message); - if (this.dataWaitingForDrain.length > STREAM_MESSAGE_BUFFER_SIZE) { + if (this.dataWaitingForDrain.length > this.bufferSize) { this.destroyStream(); return err(new HubError("unavailable.network_failure", "Stream is backed up and cache is full")); From 80c9ad314f60e54d573176076184f960f19656f2 Mon Sep 17 00:00:00 2001 From: VolodymyrBg Date: Wed, 15 Jan 2025 22:25:13 +0200 Subject: [PATCH 2/2] feat: make stream message buffer size configurable via CLI - Add --stream-message-buffer-size CLI option to configure the maximum number of messages to buffer when a stream is backed up - Default value remains at 10,000 messages - Resolves TODO in bufferedStreamWriter.ts --- apps/hubble/src/cli.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/apps/hubble/src/cli.ts b/apps/hubble/src/cli.ts index 35849e35a4..1716ea3b79 100644 --- a/apps/hubble/src/cli.ts +++ b/apps/hubble/src/cli.ts @@ -186,6 +186,11 @@ app .option("--commit-lock-timeout ", "Rocks DB commit lock timeout in milliseconds (default: 500)", parseNumber) .option("--commit-lock-max-pending ", "Rocks DB commit lock max pending jobs (default: 1000)", parseNumber) .option("--rpc-auth ", "Require username-password auth for RPC submit. (default: disabled)") + .option( + "--stream-message-buffer-size ", + "Maximum number of messages to buffer when a stream is backed up (default: 10000)", + parseNumber, + ) .action(async (cliOptions) => { const handleShutdownSignal = (signalName: string) => {