From 6e9fd5cb8fbda177dc49d4752f5b3f17810fe107 Mon Sep 17 00:00:00 2001 From: Dzmitry Mikhalapau Date: Fri, 27 Dec 2024 21:55:08 +0500 Subject: [PATCH 1/3] Tests on external context cancellation Signed-off-by: Dzmitry Mikhalapau --- rpc/rpc_test.go | 74 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 rpc/rpc_test.go diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go new file mode 100644 index 00000000..aa212899 --- /dev/null +++ b/rpc/rpc_test.go @@ -0,0 +1,74 @@ +package rpc + +import ( + "context" + "net" + "testing" + + "github.com/stretchr/testify/require" + + "capnproto.org/go/capnp/v3" + "capnproto.org/go/capnp/v3/rpc/internal/testcapnp" +) + +func TestConnection_BaseContext(t *testing.T) { + t.Run("background context", func(t *testing.T) { + client, server := net.Pipe() + doneCh := make(chan struct{}, 1) + + go func() { + bootstrapClient := testcapnp.StreamTest_ServerToClient(slowStreamTestServer{}) + conn := NewConn(NewStreamTransport(server), &Options{ + BootstrapClient: capnp.Client(bootstrapClient), + }) + defer conn.Close() + + <-conn.Done() + close(doneCh) + }() + + func() { + conn := NewConn(NewStreamTransport(client), nil) + defer conn.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client := testcapnp.StreamTest(conn.Bootstrap(ctx)) + defer client.Release() + + err := client.Push(ctx, func(st testcapnp.StreamTest_push_Params) error { + return st.SetData(make([]byte, 1)) + }) + + require.NoError(t, err) + }() + + <-doneCh + }) + + t.Run("external context", func(t *testing.T) { + _, server := net.Pipe() + doneCh := make(chan struct{}, 1) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + bootstrapClient := testcapnp.StreamTest_ServerToClient(slowStreamTestServer{}) + conn := NewConn(NewStreamTransport(server), &Options{ + BootstrapClient: capnp.Client(bootstrapClient), + BaseContext: func() context.Context { + return ctx + }, + }) + defer conn.Close() + + <-conn.Done() + close(doneCh) + }() + + cancel() + <-doneCh + }) +} From baf4bcb028309aac2b16ce9f71a105407e2b1cc2 Mon Sep 17 00:00:00 2001 From: Dzmitry Mikhalapau Date: Fri, 10 Jan 2025 22:14:55 +0500 Subject: [PATCH 2/3] Make external context test more rich Signed-off-by: Dzmitry Mikhalapau --- rpc/rpc_test.go | 53 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 48 insertions(+), 5 deletions(-) diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index aa212899..751de8d8 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -3,6 +3,7 @@ package rpc import ( "context" "net" + "sync" "testing" "github.com/stretchr/testify/require" @@ -48,13 +49,22 @@ func TestConnection_BaseContext(t *testing.T) { }) t.Run("external context", func(t *testing.T) { - _, server := net.Pipe() - doneCh := make(chan struct{}, 1) + client, server := net.Pipe() ctx, cancel := context.WithCancel(context.Background()) defer cancel() + clientConnectedCh := make(chan struct{}, 1) + contextCancelledCh := make(chan struct{}, 1) + wg := &sync.WaitGroup{} + wg.Add(2) + go func() { + defer wg.Done() + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + bootstrapClient := testcapnp.StreamTest_ServerToClient(slowStreamTestServer{}) conn := NewConn(NewStreamTransport(server), &Options{ BootstrapClient: capnp.Client(bootstrapClient), @@ -64,11 +74,44 @@ func TestConnection_BaseContext(t *testing.T) { }) defer conn.Close() + select { + case <-clientConnectedCh: + cancel() + contextCancelledCh <- struct{}{} + case <-conn.Done(): + t.Failed() + } + + // Connection should close when external context is cancelled <-conn.Done() - close(doneCh) }() - cancel() - <-doneCh + go func() { + defer wg.Done() + + conn := NewConn(NewStreamTransport(client), nil) + defer conn.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client := testcapnp.StreamTest(conn.Bootstrap(ctx)) + defer client.Release() + + if err := client.Resolve(ctx); err != nil { + require.NoError(t, err) + } + + clientConnectedCh <- struct{}{} + <-contextCancelledCh + + err := client.Push(ctx, func(st testcapnp.StreamTest_push_Params) error { + return st.SetData(make([]byte, 1)) + }) + + require.NoError(t, err) + }() + + wg.Wait() }) } From 68d1266e58e0c170994c3d130add90cbe6893b1b Mon Sep 17 00:00:00 2001 From: Dzmitry Mikhalapau Date: Sun, 12 Jan 2025 15:23:53 +0500 Subject: [PATCH 3/3] Mark connection context test to be executed in parallel Signed-off-by: Dzmitry Mikhalapau --- rpc/rpc_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rpc/rpc_test.go b/rpc/rpc_test.go index 751de8d8..5167ccf4 100644 --- a/rpc/rpc_test.go +++ b/rpc/rpc_test.go @@ -13,6 +13,8 @@ import ( ) func TestConnection_BaseContext(t *testing.T) { + t.Parallel() + t.Run("background context", func(t *testing.T) { client, server := net.Pipe() doneCh := make(chan struct{}, 1)