Skip to content

Commit

Permalink
Fix large sized pubsub message transfer (#791)
Browse files Browse the repository at this point in the history
* Fix large sized pubsub message transfer

* update timeout

* update version for release

* update comments
  • Loading branch information
badrishc authored Nov 12, 2024
1 parent 652b49f commit 0368d21
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 12 deletions.
5 changes: 3 additions & 2 deletions .azure/pipelines/azure-pipelines-external-release.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
######################################
# NOTE: Before running this pipeline to generate a new nuget package, update the version string in two places
# 1) update the name: string below (line 6) -- this is the version for the nuget package (e.g. 1.0.0)
# 1) update the name: string below (line 7) -- this is the version for the nuget package (e.g. 1.0.0)
# 2) update \libs\host\GarnetServer.cs readonly string version (~line 32) -- NOTE - these two values need to be the same
# 3) update the version in GarnetServer.csproj (~line 8)
######################################
name: 1.0.36
name: 1.0.37
trigger:
branches:
include:
Expand Down
4 changes: 2 additions & 2 deletions libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ namespace Garnet
/// </summary>
public class GarnetServer : IDisposable
{
// IMPORTANT: Keep the version in sync with .azure\pipelines\azure-pipelines-external-release.yml line ~6.
readonly string version = "1.0.36";
// IMPORTANT: Keep the version in sync with .azure\pipelines\azure-pipelines-external-release.yml line ~7 and GarnetServer.csproj line ~8.
readonly string version = "1.0.37";

internal GarnetProvider Provider;

Expand Down
1 change: 1 addition & 0 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,7 @@ private void WriteDirectLarge(ReadOnlySpan<byte> src)

// Adjust number of bytes to copy, to space left on output buffer, then copy
src.Slice(0, destSpace).CopyTo(new Span<byte>(dcurr, destSpace));
dcurr += destSpace;
src = src.Slice(destSpace);

// Send and reset output buffer
Expand Down
4 changes: 2 additions & 2 deletions main/GarnetServer/GarnetServer.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
<OutputType>Exe</OutputType>
<ServerGarbageCollection>true</ServerGarbageCollection>

<!-- IMPORTANT: Keep the version in sync with .azure\pipelines\azure-pipelines-external-release.yml line ~6. -->
<Version>1.0.36</Version>
<!-- IMPORTANT: Keep the version in sync with .azure\pipelines\azure-pipelines-external-release.yml line ~7 and GarnetServer.cs line ~32. -->
<Version>1.0.37</Version>
<PackageId>garnet-server</PackageId>
<PackAsTool>true</PackAsTool>
<ToolCommandName>garnet-server</ToolCommandName>
Expand Down
34 changes: 29 additions & 5 deletions test/Garnet.test/RespPubSubTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

using System;
using System.Linq;
using System.Security.Cryptography;
using System.Threading;
using System.Threading.Channels;
using NUnit.Framework;
using NUnit.Framework.Legacy;
using StackExchange.Redis;
Expand All @@ -20,7 +20,7 @@ class RespPubSubTests
public void Setup()
{
TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true);
server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir);
server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, pubSubPageSize: "256k");
server.Start();
}

Expand Down Expand Up @@ -52,6 +52,27 @@ public void BasicSUBSCRIBE()
sub.Unsubscribe(RedisChannel.Literal("messages"));
}

[Test]
public void LargeSUBSCRIBE()
{
using var subRedis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
var sub = subRedis.GetSubscriber();
var db = redis.GetDatabase(0);
RedisValue value = RandomNumberGenerator.GetBytes(140 * 1024);

ManualResetEvent evt = new(false);

SubscribeAndPublish(sub, db, RedisChannel.Literal("messages"), RedisChannel.Literal("messages"), value, onSubscribe: (channel, message) =>
{
ClassicAssert.AreEqual("messages", (string)channel);
ClassicAssert.AreEqual(value, (string)message);
evt.Set();
});

sub.Unsubscribe(RedisChannel.Literal("messages"));
}

[Test]
public void BasicPSUBSCRIBE()
{
Expand Down Expand Up @@ -180,9 +201,12 @@ public void BasicPUBSUB_NUMSUB()
sub.Unsubscribe(RedisChannel.Literal("messagesB"));
}

private void SubscribeAndPublish(ISubscriber sub, IDatabase db, RedisChannel channel, RedisChannel? publishChannel = null, string message = null, Action<RedisChannel, RedisValue> onSubscribe = null)
private void SubscribeAndPublish(ISubscriber sub, IDatabase db, RedisChannel channel, RedisChannel? publishChannel = null, RedisValue? message = null, Action<RedisChannel, RedisValue> onSubscribe = null)
{
message ??= "published message";
if (!message.HasValue)
{
message = "published message";
}
publishChannel ??= channel;
ManualResetEvent evt = new(false);
sub.Subscribe(channel, (receivedChannel, receivedMessage) =>
Expand All @@ -197,7 +221,7 @@ private void SubscribeAndPublish(ISubscriber sub, IDatabase db, RedisChannel cha
int repeat = 5;
while (true)
{
db.Publish(publishChannel.Value, message);
db.Publish(publishChannel.Value, message.Value);
var ret = evt.WaitOne(TimeSpan.FromSeconds(1));
if (ret) break;
repeat--;
Expand Down
6 changes: 5 additions & 1 deletion test/Garnet.test/TestUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ public static GarnetServer CreateGarnetServer(
IAuthenticationSettings authenticationSettings = null,
bool enableLua = false,
ILogger logger = null,
IEnumerable<string> loadModulePaths = null)
IEnumerable<string> loadModulePaths = null,
string pubSubPageSize = null)
{
if (UseAzureStorage)
IgnoreIfNotRunningAzureTests();
Expand Down Expand Up @@ -280,6 +281,9 @@ public static GarnetServer CreateGarnetServer(
LoadModuleCS = loadModulePaths
};

if (!string.IsNullOrEmpty(pubSubPageSize))
opts.PubSubPageSize = pubSubPageSize;

if (!string.IsNullOrEmpty(objectStoreHeapMemorySize))
opts.ObjectStoreHeapMemorySize = objectStoreHeapMemorySize;

Expand Down

0 comments on commit 0368d21

Please sign in to comment.