Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make stream message buffer size configurable via CLI #2473

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions apps/hubble/src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ app
.option("--commit-lock-timeout <number>", "Rocks DB commit lock timeout in milliseconds (default: 500)", parseNumber)
.option("--commit-lock-max-pending <number>", "Rocks DB commit lock max pending jobs (default: 1000)", parseNumber)
.option("--rpc-auth <username:password,...>", "Require username-password auth for RPC submit. (default: disabled)")
.option(
"--stream-message-buffer-size <number>",
"Maximum number of messages to buffer when a stream is backed up (default: 10000)",
parseNumber,
)

.action(async (cliOptions) => {
const handleShutdownSignal = (signalName: string) => {
Expand Down
12 changes: 6 additions & 6 deletions apps/hubble/src/rpc/bufferedStreamWriter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import { err, ok } from "neverthrow";

export const STREAM_DRAIN_TIMEOUT_MS = 10_000;
export const SLOW_CLIENT_GRACE_PERIOD_MS = 60_000;
// TODO: Make this configurable via CLI
export const STREAM_MESSAGE_BUFFER_SIZE = 10_000;
// Default value if not configured via CLI
export const DEFAULT_STREAM_MESSAGE_BUFFER_SIZE = 10_000;

/**
* A BufferedStreamWriter is a wrapper around a gRPC stream that will buffer messages when the stream is backed up.
Expand All @@ -16,11 +16,12 @@ export const STREAM_MESSAGE_BUFFER_SIZE = 10_000;
export class BufferedStreamWriter {
private streamIsBackedUp = false;
private stream: ServerWritableStream<SubscribeRequest, HubEvent>;
// biome-ignore lint/suspicious/noExplicitAny: legacy code, avoid using ignore for new code
private dataWaitingForDrain: any[] = [];
private bufferSize: number;

constructor(stream: ServerWritableStream<SubscribeRequest, HubEvent>) {
constructor(stream: ServerWritableStream<SubscribeRequest, HubEvent>, bufferSize?: number) {
this.stream = stream;
this.bufferSize = bufferSize || DEFAULT_STREAM_MESSAGE_BUFFER_SIZE;
}

/**
Expand All @@ -29,12 +30,11 @@ export class BufferedStreamWriter {
* ok(false) if the message was buffered because the stream is full
* err if the stream can't be written to any more and will be closed.
*/
// biome-ignore lint/suspicious/noExplicitAny: legacy code, avoid using ignore for new code
public writeToStream(message: any): HubResult<boolean> {
if (this.streamIsBackedUp) {
this.dataWaitingForDrain.push(message);

if (this.dataWaitingForDrain.length > STREAM_MESSAGE_BUFFER_SIZE) {
if (this.dataWaitingForDrain.length > this.bufferSize) {
this.destroyStream();

return err(new HubError("unavailable.network_failure", "Stream is backed up and cache is full"));
Expand Down