Skip to content

Commit

Permalink
Add nack redelivery test
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Broadhurst <[email protected]>
  • Loading branch information
peterbroadhurst committed Jan 18, 2025
1 parent e700e74 commit 27de8ab
Show file tree
Hide file tree
Showing 2 changed files with 287 additions and 7 deletions.
12 changes: 5 additions & 7 deletions core/go/internal/txmgr/rpc_eventstreams.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,6 @@ func newRPCEventStreams(tm *txManager) *rpcEventStreams {
return es
}

func (es *rpcEventStreams) RPCAsyncHandler() rpcserver.RPCAsyncHandler {
return es
}

func (es *rpcEventStreams) StartMethod() string {
return "ptx_subscribe"
}
Expand Down Expand Up @@ -132,7 +128,7 @@ func (es *rpcEventStreams) HandleLifecycle(ctx context.Context, req *rpcclient.R
case "ptx_ack", "ptx_nack":
if sub != nil {
select {
case sub.acksNacks <- &rpcAckNack{ack: req.Method == "ptx_ack"}:
case sub.acksNacks <- &rpcAckNack{ack: (req.Method == "ptx_ack")}:
log.L(ctx).Infof("ack/nack received for subID %s ack=%t", subID, req.Method == "ptx_ack")
default:
}
Expand All @@ -141,8 +137,8 @@ func (es *rpcEventStreams) HandleLifecycle(ctx context.Context, req *rpcclient.R
case "ptx_unsubscribe":
if sub != nil {
sub.ctrl.Closed()
es.cleanupSubscription(subID)
}
es.cleanupSubscription(subID)
return &rpcclient.RPCResponse{
JSONRpc: "2.0",
ID: req.ID,
Expand Down Expand Up @@ -180,7 +176,9 @@ func (sub *receiptListenerSubscription) ConnectionClosed() {

func (es *rpcEventStreams) cleanupLocked(sub *receiptListenerSubscription) {
delete(sub.es.receiptSubs, sub.ctrl.ID())
sub.rrc.Close()
if sub.rrc != nil {
sub.rrc.Close()
}
close(sub.closed)
}

Expand Down
282 changes: 282 additions & 0 deletions core/go/internal/txmgr/rpc_eventstreams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,285 @@ func TestRPCEventListenerE2E(t *testing.T) {
<-unSubChan

}

func TestRPCEventListenerE2ENack(t *testing.T) {
ctx, url, txm, done := newTestTransactionManagerWithWebSocketRPC(t)
defer done()

wscConf, err := rpcclient.ParseWSConfig(ctx, &pldconf.WSClientConfig{
HTTPClientConfig: pldconf.HTTPClientConfig{URL: url},
})
require.NoError(t, err)

err = txm.CreateReceiptListener(ctx, &pldapi.TransactionReceiptListener{
Name: "listener1",
})
require.NoError(t, err)

wsc, err := wsclient.New(ctx, wscConf, nil, nil)
require.NoError(t, err)
err = wsc.Connect()
require.NoError(t, err)
defer wsc.Close()

subReqID, req := rpcTestRequest("ptx_subscribe", "receipts", "listener1")
err = wsc.Send(ctx, req)
require.NoError(t, err)

subIDChan := make(chan string)
unSubChan := make(chan bool)
sentNack := false
receipts := make(chan *pldapi.TransactionReceiptFull)
var unSubReqID atomic.Uint64
var subID atomic.Pointer[string]

go func() {
for payload := range wsc.Receive() {
var rpcPayload *rpcbackend.RPCResponse
err := json.Unmarshal(payload, &rpcPayload)
require.NoError(t, err)

if rpcPayload.Error != nil {
require.NoError(t, rpcPayload.Error.Error())
}

if !rpcPayload.ID.IsNil() {
var rpcID uint64
err := json.Unmarshal(rpcPayload.ID.Bytes(), &rpcID)
require.NoError(t, err)

switch rpcID {
case subReqID: // Subscribe reply
subIDChan <- rpcPayload.Result.AsString()
case unSubReqID.Load(): // Unsubscribe reply
unSubChan <- true
}
}

if rpcPayload.Method == "ptx_receiptBatch" {
var batchPayload pldapi.TransactionReceiptBatch
err := json.Unmarshal(rpcPayload.Params.Bytes(), &batchPayload)
require.NoError(t, err)

if !sentNack {
// send nack first
_, req := rpcTestRequest("ptx_nack", *subID.Load())
err = wsc.Send(ctx, req)
require.NoError(t, err)
sentNack = true
} else {
// then ack
for _, r := range batchPayload.Receipts {
receipts <- r
}
_, req := rpcTestRequest("ptx_ack", *subID.Load())
err = wsc.Send(ctx, req)
require.NoError(t, err)

}
}

}
}()

postCommit, err := txm.FinalizeTransactions(ctx, txm.p.DB(), []*components.ReceiptInput{
{
ReceiptType: components.RT_Success,
TransactionID: uuid.New(),
OnChain: randOnChain(tktypes.RandAddress()),
},
})
require.NoError(t, err)
postCommit()

subIDStr := <-subIDChan
_, err = uuid.Parse(subIDStr)
require.NoError(t, err)
subID.Store(&subIDStr)

// We get it on redelivery
<-receipts

reqID, req := rpcTestRequest("ptx_unsubscribe", subIDStr)
unSubReqID.Store(reqID)
err = wsc.Send(ctx, req)
require.NoError(t, err)
<-unSubChan

}

func TestRPCSubscribeNoType(t *testing.T) {
ctx, url, txm, done := newTestTransactionManagerWithWebSocketRPC(t)
defer done()

wscConf, err := rpcclient.ParseWSConfig(ctx, &pldconf.WSClientConfig{
HTTPClientConfig: pldconf.HTTPClientConfig{URL: url},
})
require.NoError(t, err)

err = txm.CreateReceiptListener(ctx, &pldapi.TransactionReceiptListener{
Name: "listener1",
})
require.NoError(t, err)

wsc, err := wsclient.New(ctx, wscConf, nil, nil)
require.NoError(t, err)
err = wsc.Connect()
require.NoError(t, err)
defer wsc.Close()

_, req := rpcTestRequest("ptx_subscribe")
err = wsc.Send(ctx, req)
require.NoError(t, err)

payload := <-wsc.Receive()

var rpcPayload *rpcbackend.RPCResponse
err = json.Unmarshal(payload, &rpcPayload)
require.NoError(t, err)
require.Regexp(t, "PD020003", rpcPayload.Error.Error())

}

func TestRPCSubscribeNoListener(t *testing.T) {
ctx, url, txm, done := newTestTransactionManagerWithWebSocketRPC(t)
defer done()

wscConf, err := rpcclient.ParseWSConfig(ctx, &pldconf.WSClientConfig{
HTTPClientConfig: pldconf.HTTPClientConfig{URL: url},
})
require.NoError(t, err)

err = txm.CreateReceiptListener(ctx, &pldapi.TransactionReceiptListener{
Name: "listener1",
})
require.NoError(t, err)

wsc, err := wsclient.New(ctx, wscConf, nil, nil)
require.NoError(t, err)
err = wsc.Connect()
require.NoError(t, err)
defer wsc.Close()

_, req := rpcTestRequest("ptx_subscribe", "receipts")
err = wsc.Send(ctx, req)
require.NoError(t, err)

payload := <-wsc.Receive()

var rpcPayload *rpcbackend.RPCResponse
err = json.Unmarshal(payload, &rpcPayload)
require.NoError(t, err)
require.Regexp(t, "PD012241", rpcPayload.Error.Error())

}

func TestRPCSubscribeBadListener(t *testing.T) {
ctx, url, txm, done := newTestTransactionManagerWithWebSocketRPC(t)
defer done()

wscConf, err := rpcclient.ParseWSConfig(ctx, &pldconf.WSClientConfig{
HTTPClientConfig: pldconf.HTTPClientConfig{URL: url},
})
require.NoError(t, err)

err = txm.CreateReceiptListener(ctx, &pldapi.TransactionReceiptListener{
Name: "listener1",
})
require.NoError(t, err)

wsc, err := wsclient.New(ctx, wscConf, nil, nil)
require.NoError(t, err)
err = wsc.Connect()
require.NoError(t, err)
defer wsc.Close()

_, req := rpcTestRequest("ptx_subscribe", "receipts", "unknown")
err = wsc.Send(ctx, req)
require.NoError(t, err)

payload := <-wsc.Receive()

var rpcPayload *rpcbackend.RPCResponse
err = json.Unmarshal(payload, &rpcPayload)
require.NoError(t, err)
require.Regexp(t, "PD012238", rpcPayload.Error.Error())

}

func TestUnsubscribeNoSubscriptionID(t *testing.T) {
ctx, url, txm, done := newTestTransactionManagerWithWebSocketRPC(t)
defer done()

wscConf, err := rpcclient.ParseWSConfig(ctx, &pldconf.WSClientConfig{
HTTPClientConfig: pldconf.HTTPClientConfig{URL: url},
})
require.NoError(t, err)

err = txm.CreateReceiptListener(ctx, &pldapi.TransactionReceiptListener{
Name: "listener1",
})
require.NoError(t, err)

wsc, err := wsclient.New(ctx, wscConf, nil, nil)
require.NoError(t, err)
err = wsc.Connect()
require.NoError(t, err)
defer wsc.Close()

_, req := rpcTestRequest("ptx_unsubscribe")
err = wsc.Send(ctx, req)
require.NoError(t, err)

payload := <-wsc.Receive()

var rpcPayload *rpcbackend.RPCResponse
err = json.Unmarshal(payload, &rpcPayload)
require.NoError(t, err)
require.Regexp(t, "PD012240", rpcPayload.Error.Error())

}

func TestHandleLifecycleUnkonwn(t *testing.T) {
ctx, _, txm, done := newTestTransactionManagerWithWebSocketRPC(t)
defer done()

res := txm.rpcEventStreams.HandleLifecycle(ctx, &rpcbackend.RPCRequest{
Method: "wrong",
Params: []*fftypes.JSONAny{fftypes.JSONAnyPtr(`"any"`)},
})
require.Regexp(t, "PD012239", res.Error.Error())

}

type mockRPCAsyncControl struct{}

func (ac *mockRPCAsyncControl) ID() string { return "sub1" }
func (ac *mockRPCAsyncControl) Closed() {}
func (ac *mockRPCAsyncControl) Send(method string, params any) {}

func TestHandleLifecycleNoBlockNack(t *testing.T) {
ctx, _, txm, done := newTestTransactionManagerWithWebSocketRPC(t)
defer done()

ctrl := &mockRPCAsyncControl{}
es := txm.rpcEventStreams
es.receiptSubs["sub1"] = &receiptListenerSubscription{
es: es,
ctrl: ctrl,
acksNacks: make(chan *rpcAckNack),
closed: make(chan struct{}),
}

res := es.HandleLifecycle(ctx, &rpcbackend.RPCRequest{
JSONRpc: "2.0",
ID: fftypes.JSONAnyPtr("12345"),
Method: "ptx_nack",
Params: []*fftypes.JSONAny{fftypes.JSONAnyPtr(`"sub1"`)},
})
require.Nil(t, res)

es.getSubscription("sub1").ConnectionClosed()
require.Empty(t, es.receiptSubs)

}

0 comments on commit 27de8ab

Please sign in to comment.