Skip to content

Commit

Permalink
Merge pull request #601 from homier/fix-tests
Browse files Browse the repository at this point in the history
Test RPC server external context cancellation
  • Loading branch information
lthibault authored Jan 13, 2025
2 parents c559d94 + 68d1266 commit 3adab7c
Showing 1 changed file with 119 additions and 0 deletions.
119 changes: 119 additions & 0 deletions rpc/rpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package rpc

import (
"context"
"net"
"sync"
"testing"

"github.com/stretchr/testify/require"

"capnproto.org/go/capnp/v3"
"capnproto.org/go/capnp/v3/rpc/internal/testcapnp"
)

func TestConnection_BaseContext(t *testing.T) {
t.Parallel()

t.Run("background context", func(t *testing.T) {
client, server := net.Pipe()
doneCh := make(chan struct{}, 1)

go func() {
bootstrapClient := testcapnp.StreamTest_ServerToClient(slowStreamTestServer{})
conn := NewConn(NewStreamTransport(server), &Options{
BootstrapClient: capnp.Client(bootstrapClient),
})
defer conn.Close()

<-conn.Done()
close(doneCh)
}()

func() {
conn := NewConn(NewStreamTransport(client), nil)
defer conn.Close()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

client := testcapnp.StreamTest(conn.Bootstrap(ctx))
defer client.Release()

err := client.Push(ctx, func(st testcapnp.StreamTest_push_Params) error {
return st.SetData(make([]byte, 1))
})

require.NoError(t, err)
}()

<-doneCh
})

t.Run("external context", func(t *testing.T) {
client, server := net.Pipe()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

clientConnectedCh := make(chan struct{}, 1)
contextCancelledCh := make(chan struct{}, 1)
wg := &sync.WaitGroup{}
wg.Add(2)

go func() {
defer wg.Done()

ctx, cancel := context.WithCancel(ctx)
defer cancel()

bootstrapClient := testcapnp.StreamTest_ServerToClient(slowStreamTestServer{})
conn := NewConn(NewStreamTransport(server), &Options{
BootstrapClient: capnp.Client(bootstrapClient),
BaseContext: func() context.Context {
return ctx
},
})
defer conn.Close()

select {
case <-clientConnectedCh:
cancel()
contextCancelledCh <- struct{}{}
case <-conn.Done():
t.Failed()
}

// Connection should close when external context is cancelled
<-conn.Done()
}()

go func() {
defer wg.Done()

conn := NewConn(NewStreamTransport(client), nil)
defer conn.Close()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

client := testcapnp.StreamTest(conn.Bootstrap(ctx))
defer client.Release()

if err := client.Resolve(ctx); err != nil {
require.NoError(t, err)
}

clientConnectedCh <- struct{}{}
<-contextCancelledCh

err := client.Push(ctx, func(st testcapnp.StreamTest_push_Params) error {
return st.SetData(make([]byte, 1))
})

require.NoError(t, err)
}()

wg.Wait()
})
}

0 comments on commit 3adab7c

Please sign in to comment.