Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node: add XLEN command #358

Merged
merged 3 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ import {
createZRemRangeByScore,
createZScore,
createSUnionStore,
createXLen,
} from "./Commands";
import {
ClosingError,
Expand Down Expand Up @@ -2271,6 +2272,26 @@ export class BaseClient {
return this.createWritePromise(createXRead(keys_and_ids, options));
}

/**
* Returns the number of entries in the stream stored at `key`.
*
* See https://valkey.io/commands/xlen/ for more details.
*
* @param key - The key of the stream.
* @returns The number of entries in the stream. If `key` does not exist, returns `0`.
*
* @example
* ```typescript
* const numEntries = await client.xlen("my_stream");
* console.log(numEntries); // Output: 2 - "my_stream" contains 2 entries.
* ```
*/
public xlen(
key: string,
): Promise<number> {
return this.createWritePromise(createXLen(key));
}

private readonly MAP_READ_FROM_STRATEGY: Record<
ReadFrom,
connection_request.ReadFrom
Expand Down
7 changes: 7 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1418,6 +1418,13 @@ export function createXRead(
return createCommand(RequestType.XRead, args);
}

/**
* @internal
*/
export function createXLen(key: string): redis_request.Command {
return createCommand(RequestType.XLen, [key]);
}

/**
* @internal
*/
Expand Down
14 changes: 14 additions & 0 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ import {
createZRemRangeByScore,
createZScore,
createSUnionStore,
createXLen,
} from "./Commands";
import { redis_request } from "./ProtobufMessage";

Expand Down Expand Up @@ -1346,6 +1347,19 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
return this.addAndReturn(createXRead(keys_and_ids, options));
}

/**
* Returns the number of entries in the stream stored at `key`.
*
* See https://valkey.io/commands/xlen/ for more details.
*
* @param key - The key of the stream.
*
* Command Response - The number of entries in the stream. If `key` does not exist, returns `0`.
*/
public xlen(key: string): T {
return this.addAndReturn(createXLen(key));
}

/**
* Renames `key` to `newkey`.
* If `newkey` already exists it is overwritten.
Expand Down
47 changes: 40 additions & 7 deletions node/tests/SharedTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2467,10 +2467,12 @@ export function runBaseTests<Context>(config: {
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`streams add and trim test_%p`,
async () => {
`streams add, trim, and len test_%p`,
async (protocol) => {
await runTest(async (client: BaseClient) => {
const key = uuidv4();
const nonExistingKey = uuidv4();
const stringKey = uuidv4();
const field1 = uuidv4();
const field2 = uuidv4();

Expand Down Expand Up @@ -2501,7 +2503,7 @@ export function runBaseTests<Context>(config: {
[field2, "bar2"],
]),
).not.toBeNull();
expect(await client.customCommand(["XLEN", key])).toEqual(2);
expect(await client.xlen(key)).toEqual(2);

// this will trim the first entry.
const id = await client.xadd(
Expand All @@ -2519,7 +2521,7 @@ export function runBaseTests<Context>(config: {
},
);
expect(id).not.toBeNull();
expect(await client.customCommand(["XLEN", key])).toEqual(2);
expect(await client.xlen(key)).toEqual(2);

// this will trim the 2nd entry.
expect(
Expand All @@ -2538,7 +2540,7 @@ export function runBaseTests<Context>(config: {
},
),
).not.toBeNull();
expect(await client.customCommand(["XLEN", key])).toEqual(2);
expect(await client.xlen(key)).toEqual(2);

expect(
await client.xtrim(key, {
Expand All @@ -2547,8 +2549,39 @@ export function runBaseTests<Context>(config: {
exact: true,
}),
).toEqual(1);
expect(await client.customCommand(["XLEN", key])).toEqual(1);
}, ProtocolVersion.RESP2);
expect(await client.xlen(key)).toEqual(1);

expect(
await client.xtrim(key, {
method: "maxlen",
threshold: 0,
exact: true,
}),
).toEqual(1);
// Unlike other Redis collection types, stream keys still exist even after removing all entries
expect(await client.exists([key])).toEqual(1);
expect(await client.xlen(key)).toEqual(0);

expect(
await client.xtrim(nonExistingKey, {
method: "maxlen",
threshold: 1,
exact: true,
}),
).toEqual(0);
expect(await client.xlen(nonExistingKey)).toEqual(0);

// key exists, but it is not a stream
expect(await client.set(stringKey, "foo")).toEqual("OK");
await expect(
client.xtrim(stringKey, {
method: "maxlen",
threshold: 1,
exact: true,
}),
).rejects.toThrow();
await expect(client.xlen(stringKey)).rejects.toThrow();
}, protocol);
},
config.timeout,
);
Expand Down
2 changes: 2 additions & 0 deletions node/tests/TestUtilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,8 @@ export async function transactionTest(
args.push("0-2");
baseTransaction.xadd(key9, [["field", "value3"]], { id: "0-3" });
args.push("0-3");
baseTransaction.xlen(key9);
args.push(3);
baseTransaction.xread({ [key9]: "0-1" });
args.push({
[key9]: {
Expand Down
Loading