From a002ae6a022059ff1f23de991d66e2840120225f Mon Sep 17 00:00:00 2001 From: CheerlessCloud Date: Sat, 26 Jan 2019 20:56:43 +0300 Subject: [PATCH 1/2] feat(rpc): add application level retry functionality Also: - extract AMQPMessage mocks - extract AMQPMessageController with ack/reject methods - extract AMQPMessageRpcController with reply and resend methods - some rewrite tests This commit not introduce breaking changes to out public interface, but your code may be broken if you redefine handleFail private method in service class-based handlers. --- .eslintrc.js | 1 + src/AMQPAdapter.js | 4 +- src/AMQPMessage.js | 91 +++++++------ src/AMQPMessage.test.js | 143 +++++-------------- src/AMQPMessageController.js | 40 ++++++ src/AMQPMessageController.test.js | 82 +++++++++++ src/AMQPMessageMock.js | 72 ++++++++++ src/AMQPMessageRpcController.js | 54 ++++++++ src/AdapterConsumer.js | 2 + src/Client.js | 22 +++ src/Client.test.js | 36 +++++ src/Service.js | 40 ++---- src/rpc/Handler.js | 57 +++++--- src/rpc/Handler.test.js | 219 ++++++++++++++++++++++++------ test/adapter.js | 2 +- test/retries.js | 113 +++++++++++++++ 16 files changed, 734 insertions(+), 244 deletions(-) create mode 100644 src/AMQPMessageController.js create mode 100644 src/AMQPMessageController.test.js create mode 100644 src/AMQPMessageMock.js create mode 100644 src/AMQPMessageRpcController.js create mode 100644 src/Client.test.js create mode 100644 test/retries.js diff --git a/.eslintrc.js b/.eslintrc.js index d25e405..95f6409 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -37,6 +37,7 @@ module.exports = { "no-underscore-dangle": "off", "class-methods-use-this": "off", "import/no-named-as-default-member": "off", + "no-console": "error", }, "plugins": [ "flowtype", diff --git a/src/AMQPAdapter.js b/src/AMQPAdapter.js index 9f1d2b0..86ecc11 100644 --- a/src/AMQPAdapter.js +++ b/src/AMQPAdapter.js @@ -58,6 +58,8 @@ class AMQPAdapter { _state: _State = 'idle'; _eventBus: AMQPAdapterEventBus = (new EventEmitter(): any); _inflightHandlers: number = 0; + // @todo: add normal logger injection + // eslint-disable-next-line no-console _errorHandler: Error => mixed = err => console.error(err); _activeConsumers: string[] = []; @@ -224,7 +226,7 @@ class AMQPAdapter { this._inflightHandlers += 1; try { - const message = new AMQPMessage(originalMessage, this._channel); + const message = new AMQPMessage(originalMessage, this._channel, queue); await (handler: any)(message); } catch (err) { this._errorHandler( diff --git a/src/AMQPMessage.js b/src/AMQPMessage.js index 97b0f76..1ca5d27 100644 --- a/src/AMQPMessage.js +++ b/src/AMQPMessage.js @@ -7,35 +7,47 @@ export type MessageProps = { }; export interface IMessage { - +id: ?string; + +id: string; +correlationId: ?string; +payload: Object; +isSealed: boolean; +headers: Map; +isAnswerQueueEnabled: boolean; - +_props: MessageProps; - constructor(amqpMessage: CommonAMQPMessage, channel: AMQPChannel): IMessage; + +sourceQueue: string; + +applicationLevelRetryLimit: null | number; + +isApplicationLevelRetryEnabled: boolean; + // constructor( + // amqpMessage: CommonAMQPMessage, + // channel: AMQPChannel, + // sourceQueue: string, + // isSealed: ?boolean, + // ): IMessage; getPayloadAsObject(encoding?: buffer$NonBufferEncoding): Object; getPayloadAsString(encoding?: buffer$NonBufferEncoding): string; getPayloadAsBuffer(): Buffer; - ack(): Promise; - reject(requeue: ?boolean): Promise; - rejectAndRequeue(): Promise; + setApplicationLevelRetryLimit(number | string): void; } export default class AMQPMessage implements IMessage { _channel: AMQPChannel; _amqpMessage: CommonAMQPMessage; _isSealed: boolean = false; - - constructor(amqpMessage: CommonAMQPMessage, channel: AMQPChannel, isSealed: ?boolean): IMessage { + _sourceQueue: string; + + constructor( + amqpMessage: CommonAMQPMessage, + channel: AMQPChannel, + sourceQueue: string, + isSealed: ?boolean, + ): IMessage { this._channel = channel; this._amqpMessage = amqpMessage; this._isSealed = isSealed || false; + this._sourceQueue = sourceQueue; return this; } - get id(): ?string { + get id(): string { return this._amqpMessage.properties.messageId; } @@ -51,10 +63,35 @@ export default class AMQPMessage implements IMessage { return this._isSealed; } - get _props(): MessageProps { + get sourceQueue(): string { + return this._sourceQueue; + } + + get props(): MessageProps { return { ...this._amqpMessage.properties, ...this._amqpMessage.fields }; } + get headers(): Map { + return new Map(Object.entries(this._amqpMessage.properties.headers)); + } + + get isAnswerQueueEnabled(): boolean { + return !!this._amqpMessage.properties.replyTo; + } + + get applicationLevelRetryLimit(): null | number { + const retryLimitHeader = this.headers.get('X-Retry-Limit'); + return ![null, undefined].includes(retryLimitHeader) ? Number(retryLimitHeader) : null; + } + + get isApplicationLevelRetryEnabled(): boolean { + return typeof this.applicationLevelRetryLimit === 'number'; + } + + setApplicationLevelRetryLimit(value: number | string): void { + this._amqpMessage.properties.headers['X-Retry-Limit'] = String(value); + } + getPayloadAsObject(encoding?: buffer$NonBufferEncoding): Object { if (!encoding) { // eslint-disable-next-line no-param-reassign @@ -72,15 +109,7 @@ export default class AMQPMessage implements IMessage { return this._amqpMessage.content; } - get headers(): Map { - return new Map(Object.entries(this._amqpMessage.properties.headers)); - } - - get isAnswerQueueEnabled(): boolean { - return !!this._amqpMessage.properties.replyTo; - } - - _checkIsSealed() { + checkIsSealed() { if (!this._isSealed) { return; } @@ -88,28 +117,8 @@ export default class AMQPMessage implements IMessage { throw new Error('Message already acked/rejected or created in sealed mode'); } - async ack() { - this._checkIsSealed(); - await this._forceAck(); - } - - async reject(requeue: ?boolean = false) { - this._checkIsSealed(); - await this._forceReject(requeue); - } - - async _forceAck() { - this._channel.ack(this._amqpMessage); - this._isSealed = true; - } - - async _forceReject(requeue: ?boolean = false) { - this._channel.reject(this._amqpMessage, !!requeue); + toSeal() { + this.checkIsSealed(); this._isSealed = true; } - - async rejectAndRequeue(): Promise { - this._checkIsSealed(); - await this._forceReject(true); - } } diff --git a/src/AMQPMessage.test.js b/src/AMQPMessage.test.js index c52400a..ae43229 100644 --- a/src/AMQPMessage.test.js +++ b/src/AMQPMessage.test.js @@ -1,57 +1,25 @@ -/* eslint-disable no-param-reassign */ import test from 'ava'; -import uuid from 'uuid/v4'; import AMQPMessage from './AMQPMessage'; - -test.beforeEach(t => { - const ctx = {}; - t.context = ctx; - - ctx.channel = { - ack: () => {}, - reject: () => {}, - }; - - ctx.getAmqpMessageObject = (payloadBuffer, contentType) => ({ - content: payloadBuffer, - fields: { - deliveryTag: uuid(), - consumerTag: uuid(), - exchange: '', - routingKey: uuid(), - redelivered: false, - }, - properties: { - expiration: '1000', - userId: uuid(), - CC: '', - priority: 100, - persistent: true, - contentType, - contentEncoding: 'utf-8', - headers: {}, - correlationId: uuid(), - replyTo: uuid(), - messageId: uuid(), - timestamp: Date.now(), - type: uuid(), - appId: uuid(), - }, +import { + createAmqpMessageMock, + createChannelMock, + createAmqpMessageObjectMock, +} from './AMQPMessageMock'; + +test('construct AMQPMessage', t => { + const amqpMessage = createAmqpMessageObjectMock({ + payload: { foo: 42 }, }); -}); + const channel = createChannelMock(); -test('construct', t => { - const amqpMessage = t.context.getAmqpMessageObject(Buffer.from('1')); - const message = new AMQPMessage(amqpMessage, t.context.channel); - t.true(message instanceof AMQPMessage); + t.notThrows(() => new AMQPMessage(amqpMessage, channel)); }); test('get payload object', t => { const payload = { foo: 42 }; const payloadAsString = JSON.stringify(payload); const payloadAsBuffer = Buffer.from(payloadAsString); - const amqpMessage = t.context.getAmqpMessageObject(payloadAsBuffer); - const message = new AMQPMessage(amqpMessage, t.context.channel); + const { message } = createAmqpMessageMock({ payloadAsBuffer }); t.deepEqual(message.payload, payload); t.is(message.getPayloadAsString(), payloadAsString); @@ -59,75 +27,36 @@ test('get payload object', t => { t.deepEqual(message.getPayloadAsBuffer(), payloadAsBuffer); }); -// @todo implement this behavior in message and in service/client -test.skip('get payload buffer', t => { - const payloadAsBuffer = Buffer.from('myawesomebuffer: 0x12FFFF'); - const amqpMessage = t.context.getAmqpMessageObject(payloadAsBuffer, 'buffer'); - const message = new AMQPMessage(amqpMessage, t.context.channel); - - t.deepEqual(message.payload, payloadAsBuffer); - t.is(message.getPayloadAsString(), 'myawesomebuffer: 0x12FFFF'); - t.throws(() => message.getPayloadAsObject()); - t.deepEqual(message.getPayloadAsBuffer(), payloadAsBuffer); -}); - -test('correct ack', async t => { - t.plan(1); - const { channel } = t.context; - - const amqpMessage = t.context.getAmqpMessageObject(Buffer.from('1')); - - channel.ack = async message => { - t.is(message, amqpMessage); - }; - - const message = new AMQPMessage(amqpMessage, channel); - - await message.ack(); -}); - -test('correct reject', async t => { - t.plan(2); - const { channel } = t.context; - - const amqpMessage = t.context.getAmqpMessageObject(Buffer.from('1')); +test('application level retry getters', t => { + const { message: messageWithRetry } = createAmqpMessageMock({ + headers: { + 'X-Retry-Limit': 4, + }, + }); - channel.reject = async (message, requeue) => { - t.is(message, amqpMessage); - t.is(requeue, false); - }; + t.true(messageWithRetry.isApplicationLevelRetryEnabled); + t.is(messageWithRetry.applicationLevelRetryLimit, 4); - const message = new AMQPMessage(amqpMessage, channel); + const { message: messageWithoutRetry } = createAmqpMessageMock(); - await message.reject(); + t.false(messageWithoutRetry.isApplicationLevelRetryEnabled); + t.is(messageWithoutRetry.applicationLevelRetryLimit, null); }); -test('correct rejectAndRequeue', async t => { - t.plan(2); - const { channel } = t.context; - - const amqpMessage = t.context.getAmqpMessageObject(Buffer.from('1')); - - channel.reject = async (message, requeue) => { - t.is(message, amqpMessage); - t.is(requeue, true); - }; - - const message = new AMQPMessage(amqpMessage, channel); - - await message.rejectAndRequeue(); -}); +test('application level retry limit setter', t => { + const { message, amqpMessageObject } = createAmqpMessageMock({ + headers: { + 'X-Retry-Limit': '4', + }, + }); -test("can't ack or reject already acked/rejected message", async t => { - t.plan(2); - const { channel } = t.context; - channel.reject = async () => { - t.pass('rejected'); - }; + t.true(message.isApplicationLevelRetryEnabled); + t.is(message.applicationLevelRetryLimit, 4); + t.is(amqpMessageObject.properties.headers['X-Retry-Limit'], '4'); - const amqpMessage = t.context.getAmqpMessageObject(Buffer.from('1')); - const message = new AMQPMessage(amqpMessage, channel); + message.setApplicationLevelRetryLimit(6); - await message.reject(); - await t.throws(message.reject(), 'Message already acked/rejected or created in sealed mode'); + t.true(message.isApplicationLevelRetryEnabled); + t.is(message.applicationLevelRetryLimit, 6); + t.is(amqpMessageObject.properties.headers['X-Retry-Limit'], '6'); }); diff --git a/src/AMQPMessageController.js b/src/AMQPMessageController.js new file mode 100644 index 0000000..143e73c --- /dev/null +++ b/src/AMQPMessageController.js @@ -0,0 +1,40 @@ +// @flow +import type { Channel as AMQPChannel } from 'amqplib'; +import AMQPMessage from './AMQPMessage'; + +class AMQPMessageController { + _message: AMQPMessage; + _messageSourceChannel: AMQPChannel; + + constructor(message: AMQPMessage) { + this._message = message; + this._messageSourceChannel = message._channel; + } + + async ack() { + this._message.checkIsSealed(); + await this._forceAck(); + } + + async reject(requeue: ?boolean = false) { + this._message.checkIsSealed(); + await this._forceReject(requeue); + } + + async rejectAndRequeue(): Promise { + this._message.checkIsSealed(); + await this._forceReject(true); + } + + async _forceAck() { + this._messageSourceChannel.ack(this._message._amqpMessage); + this._message.toSeal(); + } + + async _forceReject(requeue: ?boolean = false) { + this._messageSourceChannel.reject(this._message._amqpMessage, !!requeue); + this._message.toSeal(); + } +} + +export default AMQPMessageController; diff --git a/src/AMQPMessageController.test.js b/src/AMQPMessageController.test.js new file mode 100644 index 0000000..ee823a7 --- /dev/null +++ b/src/AMQPMessageController.test.js @@ -0,0 +1,82 @@ +/* eslint-disable no-param-reassign */ +import test from 'ava'; +import AMQPMessageController from './AMQPMessageController'; +import { createAmqpMessageMock } from './AMQPMessageMock'; + +test.beforeEach(t => { + const ctx = {}; + t.context = ctx; + + const { message } = createAmqpMessageMock(); + ctx.message = message; + ctx.channel = message._channel; +}); + +test('construct', t => { + t.notThrows(() => new AMQPMessageController(t.context.message)); +}); + +test('correct ack', async t => { + t.plan(3); + const { channel, message } = t.context; + const controller = new AMQPMessageController(message); + + await t.notThrows(controller.ack()); + + t.true(channel.ack.calledOnce); + t.false(channel.reject.called); +}); + +test('correct reject', async t => { + t.plan(3); + const { channel, message } = t.context; + const controller = new AMQPMessageController(message); + + await t.notThrows(controller.reject()); + + t.false(channel.ack.called); + t.true(channel.reject.calledOnceWith(message._amqpMessage, false)); +}); + +test('correct rejectAndRequeue', async t => { + t.plan(3); + const { channel, message } = t.context; + const controller = new AMQPMessageController(message); + + await t.notThrows(controller.rejectAndRequeue()); + + t.false(channel.ack.called); + t.true(channel.reject.calledOnceWith(message._amqpMessage, true)); +}); + +test("can't ack/reject already acked message", async t => { + t.plan(6); + const { channel, message } = t.context; + const controller = new AMQPMessageController(message); + + await t.notThrows(controller.ack()); + + const errorMessage = 'Message already acked/rejected or created in sealed mode'; + await t.throws(controller.reject(), errorMessage); + await t.throws(controller.rejectAndRequeue(), errorMessage); + await t.throws(controller.ack(), errorMessage); + + t.true(channel.ack.calledOnce); + t.false(channel.reject.called); +}); + +test("can't ack/reject already rejected message", async t => { + t.plan(6); + const { channel, message } = t.context; + const controller = new AMQPMessageController(message); + + await t.notThrows(controller.reject()); + + const errorMessage = 'Message already acked/rejected or created in sealed mode'; + await t.throws(controller.reject(), errorMessage); + await t.throws(controller.rejectAndRequeue(), errorMessage); + await t.throws(controller.ack(), errorMessage); + + t.false(channel.ack.called); + t.true(channel.reject.calledOnce); +}); diff --git a/src/AMQPMessageMock.js b/src/AMQPMessageMock.js new file mode 100644 index 0000000..7381d7c --- /dev/null +++ b/src/AMQPMessageMock.js @@ -0,0 +1,72 @@ +import uuid from 'uuid/v4'; +// eslint-disable-next-line import/no-extraneous-dependencies +import { stub } from 'sinon'; +import AMQPMessage from './AMQPMessage'; + +const randomNumber = count => Math.round(Math.random() * count); + +export const createChannelMock = () => ({ + ack: stub().resolves(undefined), + reject: stub().resolves(undefined), +}); + +export const createAmqpMessageObjectMock = ({ + payloadAsBuffer, + payload = { defaultPayload: 'test' }, + contentType = 'application/json', + headers = {}, + redelivered = false, + routingKey = uuid(), + deliveryTag = randomNumber(1e6), + correlationId = uuid(), + replyTo = uuid(), + messageId = uuid(), + timestamp = Date.now(), + type = uuid(), + appId = uuid(), +} = {}) => ({ + content: payloadAsBuffer || Buffer.from(JSON.stringify(payload)), + fields: { + deliveryTag, + consumerTag: randomNumber(1e6), + exchange: '', + routingKey, + redelivered, + }, + properties: { + expiration: '1000', + userId: uuid(), + CC: '', + priority: 100, + persistent: true, + contentType, + contentEncoding: 'utf-8', + headers, + correlationId, + replyTo, + messageId, + timestamp, + type, + appId, + }, +}); + +export const createAmqpMessageMock = ( + messageProperties, + channel = createChannelMock(), + sourceQueue = uuid(), + isSealed = false, +) => { + const amqpMessageObject = createAmqpMessageObjectMock(messageProperties); + const amqpMessage = new AMQPMessage(amqpMessageObject, channel, sourceQueue, isSealed); + return { + message: amqpMessage, + amqpMessage, + amqpMessageObject, + channel, + sourceQueue, + isSealed, + }; +}; + +export default createAmqpMessageMock; diff --git a/src/AMQPMessageRpcController.js b/src/AMQPMessageRpcController.js new file mode 100644 index 0000000..889907a --- /dev/null +++ b/src/AMQPMessageRpcController.js @@ -0,0 +1,54 @@ +// @flow +import AMQPMessageController from './AMQPMessageController'; +import AMQPMessage from './AMQPMessage'; +import RpcService from './Service'; +import errorToObject from './rpc/errorToObject'; + +class AMQPMessageRpcController extends AMQPMessageController { + _service: RpcService; + + constructor(message: AMQPMessage, service: RpcService) { + super(message); + this._service = service; + } + + async reply({ payload, error }: { payload?: ?Object, error?: Error }) { + const { messageId, correlationId, replyTo } = this._message.props; + if (!replyTo) { + return; + } + + const adapter = this._service._getAdapter(); + await adapter.send( + replyTo, + { + error: errorToObject(error), + payload: payload === undefined ? null : payload, + }, + { messageId, correlationId }, + ); + } + + async resendAsRetry() { + const { messageId, correlationId, replyTo } = this._message.props; + + const retryLimit = this._message.applicationLevelRetryLimit; + + if (retryLimit === null) { + throw new Error('Retry disabled'); + } + + const adapter = this._service._getAdapter(); + await adapter.send(this._message.sourceQueue, this._message.payload, { + messageId, + correlationId, + replyTo, + headers: { + ...this._message.props.headers, + 'X-Retry-Limit': retryLimit, + }, + }); + } +} + +export default AMQPMessageRpcController; diff --git a/src/AdapterConsumer.js b/src/AdapterConsumer.js index 7b7bfc9..78a9a89 100644 --- a/src/AdapterConsumer.js +++ b/src/AdapterConsumer.js @@ -9,6 +9,8 @@ opaque type _ConnectOptions = ConnectOptions; export default class AdapterConsumer { _adapter: ?_AMQPAdapter; _connectParams: _ConnectOptions; + // @todo: add normal logger injection + // eslint-disable-next-line no-console _errorHandler: Error => mixed = err => console.error(err); _connectPromise: ?Promise = null; diff --git a/src/Client.js b/src/Client.js index f80912c..2e845aa 100644 --- a/src/Client.js +++ b/src/Client.js @@ -39,6 +39,7 @@ type RpcClientConstructorOptions = { connectParams?: ConnectOptions, service: string, version: string, + defaultRetryLimit?: number, waitResponseTimeout?: number, }; @@ -51,6 +52,7 @@ type RpcClientSendOptions = { +headers?: Object, +correlationId?: string, +type?: string, + +retryLimit?: number, waitResponseTimeout?: number, }; @@ -61,6 +63,7 @@ class RpcClient extends AdapterConsumer { _version: string; _replyQueueName: string = ''; _waitResponseTimeout: ?number; + _defaultRetryLimit: ?number; get service(): string { return this._service; @@ -79,12 +82,14 @@ class RpcClient extends AdapterConsumer { service, version, waitResponseTimeout, + defaultRetryLimit, }: RpcClientConstructorOptions) { super(); this._service = service; this._version = version; this._waitResponseTimeout = waitResponseTimeout; this._setConnectParams(connectParams); + this._defaultRetryLimit = defaultRetryLimit; } async _onInit() { @@ -154,12 +159,29 @@ class RpcClient extends AdapterConsumer { await this._sendMessage(messageId, payload, options); } + _getRetryLimitHeaders(options: ?RpcClientSendOptions = {}) { + const retryLimit = + (options || {}).retryLimit === undefined + ? this._defaultRetryLimit + : (options || {}).retryLimit; + + if ([null, undefined].includes(retryLimit)) { + return {}; + } + + return { 'X-Retry-Limit': retryLimit }; + } + async _sendMessage(messageId: string, payload: Object, options: ?RpcClientSendOptions) { const adapter = this._getAdapter(); // @todo check is queue exist before send await adapter.send(this.queueName, payload, { ...options, + headers: { + ...(options || {}).headers, + ...this._getRetryLimitHeaders(options), + }, messageId, replyTo: this._replyQueueName, timestamp: Date.now(), diff --git a/src/Client.test.js b/src/Client.test.js new file mode 100644 index 0000000..d17377c --- /dev/null +++ b/src/Client.test.js @@ -0,0 +1,36 @@ +/* eslint-disable no-param-reassign */ +import test from 'ava'; +import RpcClient from './Client'; + +test.beforeEach(t => { + const client = new RpcClient({ + service: '-mock-service-', + version: '1.2', + defaultRetryLimit: 3, + }); + + t.context = { client }; +}); + +test('should correct set default retry limit in constructor and apply on send', async t => { + const { client } = t.context; + + t.deepEqual(client._getRetryLimitHeaders(), { + 'X-Retry-Limit': client._defaultRetryLimit, + }); +}); + +test('should correct receive retry limit in options', async t => { + const { client } = t.context; + + t.deepEqual(client._getRetryLimitHeaders({ retryLimit: 5 }), { + 'X-Retry-Limit': 5, + }); +}); + +test('should empty headers on undefined retry limit in defaults and options', async t => { + const { client } = t.context; + + client._defaultRetryLimit = undefined; + t.deepEqual(client._getRetryLimitHeaders(), {}); +}); diff --git a/src/Service.js b/src/Service.js index 1d35fc5..c4feb90 100644 --- a/src/Service.js +++ b/src/Service.js @@ -5,6 +5,7 @@ import { type ConnectOptions, type QueueOptions } from './AMQPAdapter'; import AdapterConsumer from './AdapterConsumer'; import HandlerMap from './rpc/HandlerMap'; import type { IHandler } from './rpc/IHandler'; +import AMQPMessageRpcController from './AMQPMessageRpcController'; type RpcServiceQueueOptions = { ...$Exact, prefetch?: number }; @@ -41,30 +42,11 @@ class RpcService extends AdapterConsumer { this._setConnectParams(connectParams); } - // @todo plz kill me, sir! Duplicate in AMQPMessage - async _reply(message: IMessage, payload: ?Object = null, error: ?Error = null) { - const { messageId, correlationId, replyTo } = message._props; - if (!replyTo) { - return; - } - - let errorObj = null; - if (error) { - errorObj = {}; - Object.getOwnPropertyNames(error).forEach(key => { - (errorObj: any)[key] = (error: any)[key]; - }); - } - - const adapter = this._getAdapter(); - await adapter.send( - replyTo, - { - error: errorObj, - payload, - }, - { messageId, correlationId }, - ); + async _replyError(message: IMessage, error: Error) { + // $FlowFixMe + const messageController = new AMQPMessageRpcController(message, this); + await messageController.reject(); + await messageController.reply({ payload: null, error }); } async _initSubscriber() { @@ -91,7 +73,8 @@ class RpcService extends AdapterConsumer { } async _getHandlerClassByMessage(message: IMessage): Promise> { - const { type: action } = message._props; + // $FlowFixMe + const { type: action } = message.props; const Handler = this._handlerMap.get(action); if (Handler) { @@ -106,8 +89,7 @@ class RpcService extends AdapterConsumer { }); this._errorHandler(error); - await message.reject(); - await this._reply(message, null, error); + await this._replyError(message, error); } catch (error) { this._errorHandler(error); } @@ -126,8 +108,8 @@ class RpcService extends AdapterConsumer { originalError: err, }); this._errorHandler(error); - await message.reject(); - await this._reply(message, null, error); + + await this._replyError(message, error); } catch (error) { this._errorHandler(error); } diff --git a/src/rpc/Handler.js b/src/rpc/Handler.js index d3bc49c..c6dea8c 100644 --- a/src/rpc/Handler.js +++ b/src/rpc/Handler.js @@ -2,6 +2,7 @@ // @flow import EError from 'eerror'; import { type IMessage } from '../AMQPMessage'; +import AMQPMessageRpcController from '../AMQPMessageRpcController'; import RpcService from '../Service'; import errorToObject from './errorToObject'; import type { IHandler } from './IHandler'; @@ -10,11 +11,13 @@ import type { IHandler } from './IHandler'; function lastErrorHurdle(error: Error, handler: RpcHandler) { const errorCallback = handler._service._errorHandler; + // $FlowFixMe + const { correlationId } = handler._message.props; errorCallback( EError.wrap(error, { action: handler.action, messageId: handler._message.id, - correlationId: handler._message._props.correlationId, + correlationId, }), ); } @@ -22,6 +25,7 @@ function lastErrorHurdle(error: Error, handler: RpcHandler) { export default class RpcHandler implements IHandler { +_service: RpcService; +_message: IMessage; + +_messageController: AMQPMessageRpcController; +context: Object = {}; get action(): string { @@ -35,6 +39,8 @@ export default class RpcHandler implements IHandler { constructor({ service, message }: { service: RpcService, message: IMessage }): RpcHandler { this._service = service; this._message = message; + // $FlowFixMe + this._messageController = new AMQPMessageRpcController(message, service); if (this.handle === RpcHandler.prototype.handle) { throw new Error('You must override handle method'); @@ -43,35 +49,31 @@ export default class RpcHandler implements IHandler { return this; } - async reply({ payload, error }: { payload?: ?Object, error?: Error }) { - const { messageId, correlationId, replyTo } = this._message._props; - if (!replyTo) { - return; - } - - const adapter = this._service._getAdapter(); - await adapter.send( - replyTo, - { - error: errorToObject(error), - payload: payload === undefined ? null : payload, - }, - { messageId, correlationId }, - ); - } - async beforeHandle() {} async handle(): ?Object {} async handleFail(error: Error) { - await this.reply({ error }); - await this._message.reject(); + if (this._message.applicationLevelRetryLimit !== null) { + this._message.setApplicationLevelRetryLimit(this._message.applicationLevelRetryLimit - 1); + } + + if ( + this._message.applicationLevelRetryLimit === null || + this._message.applicationLevelRetryLimit <= 0 + ) { + await this._messageController.reply({ error }); + await this._messageController.reject(); + } else { + // retry flow + await this._messageController.ack(); + await this._messageController.resendAsRetry(); + } } async handleSuccess(replyPayload: ?Object) { - await this.reply({ payload: replyPayload }); - await this._message.ack(); + await this._messageController.reply({ payload: replyPayload }); + await this._messageController.ack(); } async onSuccess(replyPayload: ?Object) {} @@ -83,6 +85,17 @@ export default class RpcHandler implements IHandler { let replyPayload = null; try { + if (this._message.applicationLevelRetryLimit !== null) { + // $FlowFixMe + if (this._message.props.redelivered) { + this._message.setApplicationLevelRetryLimit(this._message.applicationLevelRetryLimit - 1); + } + + if (this._message.applicationLevelRetryLimit <= 0) { + throw new Error('Retry limit exceeded'); + } + } + await this.beforeHandle(); replyPayload = await this.handle(); await this.handleSuccess(replyPayload); diff --git a/src/rpc/Handler.test.js b/src/rpc/Handler.test.js index c7e93f2..56d22a6 100644 --- a/src/rpc/Handler.test.js +++ b/src/rpc/Handler.test.js @@ -1,11 +1,11 @@ /* eslint-disable no-param-reassign */ -// @flow import test from 'ava'; import { spy, stub } from 'sinon'; import uuid from 'uuid/v4'; import EError from 'eerror'; import errorToObject from './errorToObject'; import Handler from './Handler'; +import { createAmqpMessageMock } from '../AMQPMessageMock'; test.beforeEach(t => { t.context = {}; @@ -15,24 +15,22 @@ test.beforeEach(t => { return t.context.reply; } }; + t.context.adapterSendStub = stub().resolves(undefined); + t.context.adapterStub = { + send: t.context.adapterSendStub, + }; + t.context.serviceStub = { _errorHandler: stub(), - _getAdapter: () => ({ - send: t.context.adapterSendStub, - }), + _getAdapter: () => t.context.adapterStub, }; - t.context.messageStub = { - id: uuid(), - _props: { - messageId: uuid(), - correlationId: uuid(), - replyTo: uuid(), - }, + + const { message } = createAmqpMessageMock({ payload: { foo: 'bar' }, - ack: stub().resolves(undefined), - reject: stub().resolves(undefined), - }; + }); + + t.context.messageStub = message; }); test('construct handler', async t => { @@ -78,9 +76,9 @@ test('positive execute case', async t => { t.true(onSuccessSpy.calledOnce); t.true(onSuccessSpy.calledBefore(afterHandleSpy)); - t.true(messageStub.ack.calledOnce); - t.true(messageStub.ack.calledBefore(onSuccessSpy)); - t.false(messageStub.reject.called); + t.true(messageStub._channel.ack.calledOnce); + t.true(messageStub._channel.ack.calledBefore(onSuccessSpy)); + t.false(messageStub._channel.reject.called); t.true(afterHandleSpy.calledOnce); }); @@ -94,15 +92,15 @@ test('correct reply at positive execute case', async t => { handler.handleFail = err => t.fail(err); const handleSuccessSpy = spy(handler, 'handleSuccess'); - const replySpy = spy(handler, 'reply'); + const replySpy = spy(handler._messageController, 'reply'); await t.notThrows(handler.execute()); t.true(handleSuccessSpy.calledOnceWith(t.context.reply)); t.true(replySpy.calledOnceWith({ payload: t.context.reply })); t.true(t.context.adapterSendStub.calledOnce); - t.true(messageStub.ack.calledOnce); - t.false(messageStub.reject.called); + t.true(messageStub._channel.ack.calledOnce); + t.false(messageStub._channel.reject.called); }); test('correct reply when exception throwed in handler', async t => { @@ -118,7 +116,7 @@ test('correct reply when exception throwed in handler', async t => { const handleSuccessSpy = spy(handler, 'handleSuccess'); const handleFailSpy = spy(handler, 'handleFail'); const onFailSpy = spy(handler, 'onFail'); - const replySpy = spy(handler, 'reply'); + const replySpy = spy(handler._messageController, 'reply'); await t.notThrows(handler.execute()); @@ -130,8 +128,8 @@ test('correct reply when exception throwed in handler', async t => { t.deepEqual(replySpy.firstCall.args.pop(), { error }); t.true(t.context.adapterSendStub.calledOnce); - t.false(messageStub.ack.calledOnce); - t.true(messageStub.reject.calledOnce); + t.false(messageStub._channel.ack.calledOnce); + t.true(messageStub._channel.reject.calledOnce); t.true(onFailSpy.calledOnce); t.true(onFailSpy.calledAfter(handleFailSpy)); @@ -163,11 +161,11 @@ test('correct error flow when exception throwed in handleFail', async t => { t.is(finalError.stack, error.stack); t.is(finalError.action, handler.action); t.is(finalError.messageId, messageStub.id); - t.is(finalError.correlationId, messageStub._props.correlationId); + t.is(finalError.correlationId, messageStub.props.correlationId); t.false(handleSuccessSpy.called); - t.false(messageStub.ack.calledOnce); - t.false(messageStub.reject.calledOnce); + t.false(messageStub._channel.ack.calledOnce); + t.false(messageStub._channel.reject.calledOnce); t.false(onFailSpy.calledOnce); }); @@ -195,11 +193,11 @@ test('correct error flow when exception throwed in afterHandle', async t => { t.is(finalError.stack, error.stack); t.is(finalError.action, handler.action); t.is(finalError.messageId, messageStub.id); - t.is(finalError.correlationId, messageStub._props.correlationId); + t.is(finalError.correlationId, messageStub.props.correlationId); t.true(handleSuccessSpy.called); - t.true(messageStub.ack.calledOnce); - t.false(messageStub.reject.calledOnce); + t.true(messageStub._channel.ack.calledOnce); + t.false(messageStub._channel.reject.calledOnce); t.true(onSuccessSpy.calledOnceWith(t.context.reply)); }); @@ -216,14 +214,15 @@ test('must override handle method', async t => { ); }); -test('reply just return when no replyTo in message', async t => { +// TODO: move to AMQPMessageRpcController tests +test.skip('reply just return when no replyTo in message', async t => { const { AwesomeHandler, serviceStub, messageStub } = t.context; const handler = new AwesomeHandler({ service: serviceStub, message: { ...messageStub, - _props: { - ...messageStub._props, + props: { + ...messageStub.props, replyTo: undefined, }, }, @@ -232,14 +231,15 @@ test('reply just return when no replyTo in message', async t => { await t.notThrows(handler.reply({ payload: { foo: 42 } })); t.false(t.context.adapterSendStub.calledOnce); - t.false(messageStub.ack.calledOnce); - t.false(messageStub.reject.calledOnce); + t.false(messageStub._channel.ack.calledOnce); + t.false(messageStub._channel.reject.calledOnce); }); -test('reply on success', async t => { +// TODO: move to AMQPMessageRpcController tests +test.skip('reply on success', async t => { const { AwesomeHandler, serviceStub, messageStub, adapterSendStub } = t.context; const { - messageStub: { _props: props }, + messageStub: { props }, } = t.context; const handler = new AwesomeHandler({ service: serviceStub, @@ -255,18 +255,18 @@ test('reply on success', async t => { t.deepEqual(payload, { payload: { foo: 42 }, error: null }); t.deepEqual(options, { messageId: props.messageId, correlationId: props.correlationId }); - t.false(messageStub.ack.calledOnce); - t.false(messageStub.reject.calledOnce); + t.false(messageStub._channel.ack.calledOnce); + t.false(messageStub._channel.reject.calledOnce); }); -test('reply on error', async t => { +test.skip('reply on error', async t => { const { AwesomeHandler, serviceStub, messageStub, adapterSendStub } = t.context; const error = new EError('My awesome error').combine({ name: 'AwesomeError', foo: { bar: 42 }, }); const { - messageStub: { _props: props }, + messageStub: { props }, } = t.context; const handler = new AwesomeHandler({ service: serviceStub, @@ -282,8 +282,8 @@ test('reply on error', async t => { t.deepEqual(payload, { payload: null, error: errorToObject(error) }); t.deepEqual(options, { messageId: props.messageId, correlationId: props.correlationId }); - t.false(messageStub.ack.calledOnce); - t.false(messageStub.reject.calledOnce); + t.false(messageStub._channel.ack.calledOnce); + t.false(messageStub._channel.reject.calledOnce); }); test('default action name', async t => { @@ -305,7 +305,7 @@ test('payload getter', async t => { service: serviceStub, message: { ...messageStub, - get payload(): Object { + get payload() { t.pass(); return payload; }, @@ -314,3 +314,136 @@ test('payload getter', async t => { t.deepEqual(handler.payload, payload); }); + +test('#retry - fail with retry 1 and rejected', async t => { + const { AwesomeHandler, serviceStub } = t.context; + + const { message } = createAmqpMessageMock({ + headers: { + 'X-Retry-Limit': 1, + }, + redelivered: true, + }); + + const handler = new AwesomeHandler({ + service: serviceStub, + message, + }); + + const beforeHandleSpy = spy(handler, 'beforeHandle'); + const handleSpy = spy(handler, 'handle'); + const afterHandleSpy = spy(handler, 'afterHandle'); + const handleFailSpy = spy(handler, 'handleFail'); + const handleSuccessSpy = spy(handler, 'handleSuccess'); + const onFailSpy = spy(handler, 'onFail'); + const onSuccessSpy = spy(handler, 'onSuccess'); + + await t.notThrows(handler.execute()); + + t.false(beforeHandleSpy.called); + t.false(handleSpy.called); + t.true(handleFailSpy.called); + t.true(onFailSpy.called); + + t.false(handleSuccessSpy.called); + t.false(onSuccessSpy.called); + + t.true(message._channel.reject.calledOnce); + t.false(message._channel.ack.called); + t.true(afterHandleSpy.calledOnce); +}); + +test('#retry - reject when retry 1 and error throwed', async t => { + const { AwesomeHandler, serviceStub } = t.context; + + class MyHandler extends AwesomeHandler { + async handle() { + throw new Error(); + } + } + + const { message } = createAmqpMessageMock({ + headers: { + 'X-Retry-Limit': 1, + }, + redelivered: false, + }); + + const handler = new MyHandler({ + service: serviceStub, + message, + }); + + const beforeHandleSpy = spy(handler, 'beforeHandle'); + const handleSpy = spy(handler, 'handle'); + const afterHandleSpy = spy(handler, 'afterHandle'); + const handleFailSpy = spy(handler, 'handleFail'); + const handleSuccessSpy = spy(handler, 'handleSuccess'); + const onFailSpy = spy(handler, 'onFail'); + const onSuccessSpy = spy(handler, 'onSuccess'); + + await t.notThrows(handler.execute()); + + t.true(beforeHandleSpy.called); + t.true(handleSpy.called); + t.true(handleFailSpy.called); + t.true(onFailSpy.called); + + t.false(handleSuccessSpy.called); + t.false(onSuccessSpy.called); + + t.true(message._channel.reject.calledOnce); + t.false(message._channel.ack.called); + t.true(afterHandleSpy.calledOnce); +}); + +test('#retry - resend message when retry 2 and error throwed', async t => { + const { AwesomeHandler, serviceStub, adapterSendStub } = t.context; + + class MyHandler extends AwesomeHandler { + async handle() { + throw new Error(); + } + } + + const sourceQueueName = uuid(); + const { message } = createAmqpMessageMock( + { + headers: { + 'X-Retry-Limit': 3, + }, + }, + undefined, + sourceQueueName, + ); + + const handler = new MyHandler({ + service: serviceStub, + message, + }); + + const beforeHandleSpy = spy(handler, 'beforeHandle'); + const handleSpy = spy(handler, 'handle'); + const afterHandleSpy = spy(handler, 'afterHandle'); + const handleFailSpy = spy(handler, 'handleFail'); + const handleSuccessSpy = spy(handler, 'handleSuccess'); + const onFailSpy = spy(handler, 'onFail'); + const onSuccessSpy = spy(handler, 'onSuccess'); + const sendRetrySpy = spy(handler._messageController, 'resendAsRetry'); + + await t.notThrows(handler.execute()); + + t.true(beforeHandleSpy.called); + t.true(handleSpy.called); + t.true(handleFailSpy.called); + t.true(onFailSpy.called); + + t.false(handleSuccessSpy.called); + t.false(onSuccessSpy.called); + + t.false(message._channel.reject.calledOnce); + t.true(message._channel.ack.called); + t.true(sendRetrySpy.calledOnce); + t.is(adapterSendStub.args.pop()[0], sourceQueueName); + t.true(afterHandleSpy.calledOnce); +}); diff --git a/test/adapter.js b/test/adapter.js index e8e14e7..0597519 100644 --- a/test/adapter.js +++ b/test/adapter.js @@ -29,7 +29,7 @@ test('correct ack message after unsubscribe', async t => { await adapter.subscribe(queue, async msg => { try { await adapter._unsubscribeAll(); - await msg.ack(); + await msg._channel.ack(msg._amqpMessage); t.pass('message correctly acked after unsubscribe'); } catch (err) { t.fail(err); diff --git a/test/retries.js b/test/retries.js new file mode 100644 index 0000000..8d291f6 --- /dev/null +++ b/test/retries.js @@ -0,0 +1,113 @@ +/* eslint-disable no-param-reassign */ +import test from 'ava'; +import uuid from 'uuid/v4'; +import RpcClient from '../src/Client'; +import RpcService from '../src/Service'; +import RpcHandler from '../src/rpc/Handler'; + +test.beforeEach(async t => { + const ctx = {}; + t.context = ctx; + + ctx.serviceName = uuid(); + ctx.serviceVersion = '1.0'; + ctx.connectParams = { + url: process.env.RABBITMQ_URL || 'amqp://localhost:5672', + }; + + ctx.client = new RpcClient({ + service: ctx.serviceName, + version: ctx.serviceVersion, + connectParams: ctx.connectParams, + defaultRetryLimit: 3, + }); + + ctx.service = new RpcService({ + service: ctx.serviceName, + version: ctx.serviceVersion, + connectParams: ctx.connectParams, + queue: { + prefetch: 1, + durable: true, + maxPriority: 100, + }, + }); + + await Promise.all([ctx.client.ensureConnection(), ctx.service.ensureConnection()]); +}); + +test.afterEach(async t => { + const { client, service } = t.context; + + try { + await service.destroy(); + } catch (err) {} // eslint-disable-line no-empty + + try { + await client.destroy(); + } catch (err) {} // eslint-disable-line no-empty +}); + +test('should return result when tho fails and one success handle', async t => { + t.plan(10); + const { client, service } = t.context; + + const payload = { foo: 'bar' }; + const reply = { bar: 'foo' }; + let handlerCounter = 0; + + await service.addHandler( + class extends RpcHandler { + async handle() { + handlerCounter += 1; + t.deepEqual(this.payload, payload); + if (handlerCounter < 3) { + throw new Error('Some handler error'); + } + return reply; + } + + onFail(err) { + t.truthy('on fail called twice'); + t.is(err.message, 'Some handler error'); + } + + onSuccess() { + t.truthy('on success called once'); + } + }, + ); + + const callResult = await client.send(payload); + t.deepEqual(callResult, reply); + t.is(handlerCounter, 3); +}); + +test('should correct return error to client when retry limit exceeded', async t => { + t.plan(8); + const { client, service } = t.context; + + await client.ensureConnection(); + await service.ensureConnection(); + + const payload = { foo: 'bar' }; + let handlerCounter = 0; + + await service.addHandler( + class extends RpcHandler { + async handle() { + handlerCounter += 1; + throw new Error('Some handler error'); + } + + async onFail(err) { + t.truthy(err); + t.is(err.message, 'Some handler error'); + } + }, + ); + + await t.throws(client.send(payload), 'Some handler error'); + + t.is(handlerCounter, 3); +}); From 896ebf6e39ed198bb5800dfff6d9635592fb210f Mon Sep 17 00:00:00 2001 From: CheerlessCloud Date: Wed, 30 Jan 2019 13:58:00 +0300 Subject: [PATCH 2/2] docs(README.md): describe application level retries --- README.md | 42 +++++++++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 83f3319..94795a2 100644 --- a/README.md +++ b/README.md @@ -18,8 +18,8 @@ > **Attention, module currently in active development ⚠️**
**Soon to be released, maybe around - 15 october 2018 🖖** - + 30 february 2019 🖖** + ## Samples Client: @@ -35,12 +35,20 @@ import { RpcClient } from 'amq-rpc'; heartbeat: 30, }, waitResponseTimeout: 30 * 1000, // timeout for wait result from service + defaultRetryLimit: 10 // retry limit, by default retry 1 (disabled) }); await client.ensureConnection(); // accept in first param object as connectParams in constructor - const result = await client.send({ foo: 'bar' }, { correlationId: 'e.g. nginx req id' }); - const result2 = await client.call('myAction', { foo: 'bar' }, { correlationId: 'e.g. nginx req id' }); + // equal to call 'default' handler + const result = await client.send({ foo: 'bar' }, { + correlationId: 'e.g. nginx req id', + retryLimit: 5, // override default from constructor + }); + const result2 = await client.call('myAction', { foo: 'bar' }, { + correlationId: 'e.g. nginx req id', + retryLimit: 5, // override default from constructor + }); await client.destroy(); })().catch(err => console.error(err) || process.exit(1)); @@ -65,20 +73,20 @@ import { RpcService, RpcServiceHandler } from 'amq-rpc'; maxPriority: 100, }, }); - + service.setErrorHandler((error) => { // All errors, which can't passed to reject operation (as error in subscriber function, // outside of user handler), will be passed to this callback. }); - + await service.addHandler(class extends RpcServiceHandler { // If in message "type" property didn't fill (send without special options), - // service will find handler with action 'default' + // service will find handler with action 'default' get action() { // in base class, RpcServiceHandler, action equal to 'default' return 'myAction2'; } - + async beforeHandle() { // called nearly before handle method // use it for prepare data, init resources or logging @@ -97,10 +105,14 @@ import { RpcService, RpcServiceHandler } from 'amq-rpc'; async handleFail(error: Error) { /* In base class, RpcServiceHandler: - - reject message in queue - - reply to client error with messageId and correlationId + - if retry disabled or retry limit exceeded + - reject message in queue + - reply to client error with messageId and correlationId + - else + - ack currect message + - resend message to source queue with decremented retry limit header */ - // you can redefine and customize error handling behavior + // you can redefine and customize error handling behavior } // ⚠️ redefine this method only if you know what you do @@ -110,7 +122,7 @@ import { RpcService, RpcServiceHandler } from 'amq-rpc'; - ack message in queue - reply to client with payload and error: null */ - // you can redefine and customize success handling behavior + // you can redefine and customize success handling behavior } async onFail(error: Error) { @@ -125,7 +137,7 @@ import { RpcService, RpcServiceHandler } from 'amq-rpc'; // if current handler failed, error passed in first argument // if success handling, replyPayload passed as second argument // use it for logging or deinit resouces - // wrap this code in try..catch block, because all errors from afterHandle method just + // wrap this code in try..catch block, because all errors from afterHandle method just // pass to error handler callback } }); @@ -136,9 +148,9 @@ import { RpcService, RpcServiceHandler } from 'amq-rpc'; return { bar: `${this.payload.foo} 42` }; } }); - + await service.ensureConnection(); - + // If process receive SIGINT, service will be gracefully stopped // (wait for handler end work until timeout exceeded and then call for process.exit()) await service.interventSignalInterceptors({ stopSignal: 'SIGINT', gracefulStopTimeout: 10 * 1000 });