Skip to content

Commit

Permalink
Add support for throwing errors with custom error code (#272)
Browse files Browse the repository at this point in the history
* Added support for custom RSocketError

Signed-off-by: palani <[email protected]>

---------

Signed-off-by: palani <[email protected]>
  • Loading branch information
palamccc authored Feb 21, 2024
1 parent cdfa824 commit 860ddd5
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 9 deletions.
26 changes: 26 additions & 0 deletions packages/rsocket-core/src/RSocketError.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/** Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* @flow
*/

'use strict';

export default class RSocketError extends Error {
+errorCode: number;
constructor(errorCode: number, message: string) {
super(message);
this.errorCode = errorCode;
}
}
22 changes: 13 additions & 9 deletions packages/rsocket-core/src/RSocketMachine.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import {
ResponderLeaseHandler,
Disposable,
} from './RSocketLease';
import RSocketError from './RSocketError';

type Role = 'CLIENT' | 'SERVER';

Expand Down Expand Up @@ -443,7 +444,7 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
this._sendStreamComplete(streamId);
},
onError: error => {
this._sendStreamError(streamId, error.message);
this._sendStreamError(streamId, error);
},
//Subscriber methods
onNext: payload => {
Expand Down Expand Up @@ -677,7 +678,7 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
if (this._isRequest(frame.type)) {
const leaseError = this._useLeaseOrError(this._responderLeaseHandler);
if (leaseError) {
this._sendStreamError(streamId, leaseError);
this._sendStreamError(streamId, new Error(leaseError));
return;
}
}
Expand Down Expand Up @@ -758,7 +759,7 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
onComplete: payload => {
this._sendStreamPayload(streamId, payload, true);
},
onError: error => this._sendStreamError(streamId, error.message),
onError: error => this._sendStreamError(streamId, error),
onSubscribe: cancel => {
const subscription = {
cancel,
Expand All @@ -773,7 +774,7 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
const payload = this._deserializePayload(frame);
this._requestHandler.requestStream(payload).subscribe({
onComplete: () => this._sendStreamComplete(streamId),
onError: error => this._sendStreamError(streamId, error.message),
onError: error => this._sendStreamError(streamId, error),
onNext: payload => this._sendStreamPayload(streamId, payload),
onSubscribe: subscription => {
this._subscriptions.set(streamId, subscription);
Expand Down Expand Up @@ -835,7 +836,7 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {

this._requestHandler.requestChannel(framesToPayloads).subscribe({
onComplete: () => this._sendStreamComplete(streamId),
onError: error => this._sendStreamError(streamId, error.message),
onError: error => this._sendStreamError(streamId, error),
onNext: payload => this._sendStreamPayload(streamId, payload),
onSubscribe: subscription => {
this._subscriptions.set(streamId, subscription);
Expand Down Expand Up @@ -864,16 +865,19 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
});
}

_sendStreamError(streamId: number, errorMessage: string): void {
_sendStreamError(streamId: number, err: Error): void {
this._subscriptions.delete(streamId);
this._connection.sendOne({
code: ERROR_CODES.APPLICATION_ERROR,
code:
err instanceof RSocketError
? err.errorCode
: ERROR_CODES.APPLICATION_ERROR,
flags: 0,
message: errorMessage,
message: err.message,
streamId,
type: FRAME_TYPES.ERROR,
});
const error = new Error(`terminated from the requester: ${errorMessage}`);
const error = new Error(`terminated from the requester: ${err.message}`);
this._handleStreamError(streamId, error);
}

Expand Down
51 changes: 51 additions & 0 deletions packages/rsocket-core/src/__tests__/RSocketServer-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {genMockConnection} from 'MockDuplexConnection';
import {genMockSubscriber} from 'MockFlowableSubscriber';
import {genMockPublisher} from 'MockFlowableSubscription';
import {Single, Flowable} from 'rsocket-flowable';
import RSocketError from '../RSocketError';

jest.useFakeTimers();

Expand Down Expand Up @@ -226,6 +227,56 @@ describe('RSocketServer', () => {
expect(console.error).toHaveBeenCalled();
});

it('sends custom error code if request handler throws RSocketError', () => {
console.error = jest.fn();
const transport = genMockTransportServer();
const server = new RSocketServer({
getRequestHandler: () => {
return {
requestResponse: () => {
throw new RSocketError(1234, 'Custom Error');
},
};
},
transport,
});
server.start();
transport.mock.connect();
connection.receive.mock.publisher.onNext({
type: FRAME_TYPES.SETUP,
data: undefined,
dataMimeType: '<dataMimeType>',
flags: 0,
keepAlive: 42,
lifetime: 2017,
metadata: undefined,
metadataMimeType: '<metadataMimeType>',
resumeToken: null,
streamId: 0,
majorVersion: 1,
minorVersion: 0,
});
jest.runOnlyPendingTimers();
connection.receive.mock.publisher.onNext({
type: FRAME_TYPES.REQUEST_RESPONSE,
data: undefined,
dataMimeType: '<dataMimeType>',
flags: 0,
metadata: undefined,
metadataMimeType: '<metadataMimeType>',
streamId: 1,
});
expect(connection.sendOne.mock.calls.length).toBe(1);
expect(connection.sendOne.mock.frame).toEqual({
code: 1234,
flags: 0,
message: 'Custom Error',
streamId: 1,
type: FRAME_TYPES.ERROR,
});
expect(console.error).toHaveBeenCalled();
});

it('call subscription.cancel() for all active subscriptions', () => {
let cancelled = false;
const transport = genMockTransportServer();
Expand Down
4 changes: 4 additions & 0 deletions packages/rsocket-core/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ import RSocketServer from './RSocketServer';

export {RSocketServer};

import RSocketError from './RSocketError';

export {RSocketError};

import RSocketResumableTransport from './RSocketResumableTransport';

export {RSocketResumableTransport};
Expand Down

0 comments on commit 860ddd5

Please sign in to comment.