Skip to content

Commit

Permalink
Merge pull request #2 from CheerlessCloud/application-level-retries
Browse files Browse the repository at this point in the history
Add application level retry functionality
  • Loading branch information
CheerlessCloud authored Jan 30, 2019
2 parents dd3a0dc + 896ebf6 commit dbba27e
Show file tree
Hide file tree
Showing 17 changed files with 761 additions and 259 deletions.
1 change: 1 addition & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ module.exports = {
"no-underscore-dangle": "off",
"class-methods-use-this": "off",
"import/no-named-as-default-member": "off",
"no-console": "error",
},
"plugins": [
"flowtype",
Expand Down
42 changes: 27 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@


> **Attention, module currently in active development ⚠️**<br>**Soon to be released, maybe around
15 october 2018 🖖**
30 february 2019 🖖**

## Samples
Client:

Expand All @@ -35,12 +35,20 @@ import { RpcClient } from 'amq-rpc';
heartbeat: 30,
},
waitResponseTimeout: 30 * 1000, // timeout for wait result from service
defaultRetryLimit: 10 // retry limit, by default retry 1 (disabled)
});

await client.ensureConnection(); // accept in first param object as connectParams in constructor

const result = await client.send({ foo: 'bar' }, { correlationId: 'e.g. nginx req id' });
const result2 = await client.call('myAction', { foo: 'bar' }, { correlationId: 'e.g. nginx req id' });
// equal to call 'default' handler
const result = await client.send({ foo: 'bar' }, {
correlationId: 'e.g. nginx req id',
retryLimit: 5, // override default from constructor
});
const result2 = await client.call('myAction', { foo: 'bar' }, {
correlationId: 'e.g. nginx req id',
retryLimit: 5, // override default from constructor
});

await client.destroy();
})().catch(err => console.error(err) || process.exit(1));
Expand All @@ -65,20 +73,20 @@ import { RpcService, RpcServiceHandler } from 'amq-rpc';
maxPriority: 100,
},
});

service.setErrorHandler((error) => {
// All errors, which can't passed to reject operation (as error in subscriber function,
// outside of user handler), will be passed to this callback.
});

await service.addHandler(class extends RpcServiceHandler {
// If in message "type" property didn't fill (send without special options),
// service will find handler with action 'default'
// service will find handler with action 'default'
get action() {
// in base class, RpcServiceHandler, action equal to 'default'
return 'myAction2';
}

async beforeHandle() {
// called nearly before handle method
// use it for prepare data, init resources or logging
Expand All @@ -97,10 +105,14 @@ import { RpcService, RpcServiceHandler } from 'amq-rpc';
async handleFail(error: Error) {
/*
In base class, RpcServiceHandler:
- reject message in queue
- reply to client error with messageId and correlationId
- if retry disabled or retry limit exceeded
- reject message in queue
- reply to client error with messageId and correlationId
- else
- ack currect message
- resend message to source queue with decremented retry limit header
*/
// you can redefine and customize error handling behavior
// you can redefine and customize error handling behavior
}

// ⚠️ redefine this method only if you know what you do
Expand All @@ -110,7 +122,7 @@ import { RpcService, RpcServiceHandler } from 'amq-rpc';
- ack message in queue
- reply to client with payload and error: null
*/
// you can redefine and customize success handling behavior
// you can redefine and customize success handling behavior
}

async onFail(error: Error) {
Expand All @@ -125,7 +137,7 @@ import { RpcService, RpcServiceHandler } from 'amq-rpc';
// if current handler failed, error passed in first argument
// if success handling, replyPayload passed as second argument
// use it for logging or deinit resouces
// wrap this code in try..catch block, because all errors from afterHandle method just
// wrap this code in try..catch block, because all errors from afterHandle method just
// pass to error handler callback
}
});
Expand All @@ -136,9 +148,9 @@ import { RpcService, RpcServiceHandler } from 'amq-rpc';
return { bar: `${this.payload.foo} 42` };
}
});

await service.ensureConnection();

// If process receive SIGINT, service will be gracefully stopped
// (wait for handler end work until timeout exceeded and then call for process.exit())
await service.interventSignalInterceptors({ stopSignal: 'SIGINT', gracefulStopTimeout: 10 * 1000 });
Expand Down
4 changes: 3 additions & 1 deletion src/AMQPAdapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class AMQPAdapter {
_state: _State = 'idle';
_eventBus: AMQPAdapterEventBus = (new EventEmitter(): any);
_inflightHandlers: number = 0;
// @todo: add normal logger injection
// eslint-disable-next-line no-console
_errorHandler: Error => mixed = err => console.error(err);
_activeConsumers: string[] = [];

Expand Down Expand Up @@ -224,7 +226,7 @@ class AMQPAdapter {

this._inflightHandlers += 1;
try {
const message = new AMQPMessage(originalMessage, this._channel);
const message = new AMQPMessage(originalMessage, this._channel, queue);
await (handler: any)(message);
} catch (err) {
this._errorHandler(
Expand Down
91 changes: 50 additions & 41 deletions src/AMQPMessage.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,47 @@ export type MessageProps = {
};

export interface IMessage {
+id: ?string;
+id: string;
+correlationId: ?string;
+payload: Object;
+isSealed: boolean;
+headers: Map<string, mixed>;
+isAnswerQueueEnabled: boolean;
+_props: MessageProps;
constructor(amqpMessage: CommonAMQPMessage, channel: AMQPChannel): IMessage;
+sourceQueue: string;
+applicationLevelRetryLimit: null | number;
+isApplicationLevelRetryEnabled: boolean;
// constructor(
// amqpMessage: CommonAMQPMessage,
// channel: AMQPChannel,
// sourceQueue: string,
// isSealed: ?boolean,
// ): IMessage;
getPayloadAsObject(encoding?: buffer$NonBufferEncoding): Object;
getPayloadAsString(encoding?: buffer$NonBufferEncoding): string;
getPayloadAsBuffer(): Buffer;
ack(): Promise<void>;
reject(requeue: ?boolean): Promise<void>;
rejectAndRequeue(): Promise<void>;
setApplicationLevelRetryLimit(number | string): void;
}

export default class AMQPMessage implements IMessage {
_channel: AMQPChannel;
_amqpMessage: CommonAMQPMessage;
_isSealed: boolean = false;

constructor(amqpMessage: CommonAMQPMessage, channel: AMQPChannel, isSealed: ?boolean): IMessage {
_sourceQueue: string;

constructor(
amqpMessage: CommonAMQPMessage,
channel: AMQPChannel,
sourceQueue: string,
isSealed: ?boolean,
): IMessage {
this._channel = channel;
this._amqpMessage = amqpMessage;
this._isSealed = isSealed || false;
this._sourceQueue = sourceQueue;
return this;
}

get id(): ?string {
get id(): string {
return this._amqpMessage.properties.messageId;
}

Expand All @@ -51,10 +63,35 @@ export default class AMQPMessage implements IMessage {
return this._isSealed;
}

get _props(): MessageProps {
get sourceQueue(): string {
return this._sourceQueue;
}

get props(): MessageProps {
return { ...this._amqpMessage.properties, ...this._amqpMessage.fields };
}

get headers(): Map<string, mixed> {
return new Map(Object.entries(this._amqpMessage.properties.headers));
}

get isAnswerQueueEnabled(): boolean {
return !!this._amqpMessage.properties.replyTo;
}

get applicationLevelRetryLimit(): null | number {
const retryLimitHeader = this.headers.get('X-Retry-Limit');
return ![null, undefined].includes(retryLimitHeader) ? Number(retryLimitHeader) : null;
}

get isApplicationLevelRetryEnabled(): boolean {
return typeof this.applicationLevelRetryLimit === 'number';
}

setApplicationLevelRetryLimit(value: number | string): void {
this._amqpMessage.properties.headers['X-Retry-Limit'] = String(value);
}

getPayloadAsObject(encoding?: buffer$NonBufferEncoding): Object {
if (!encoding) {
// eslint-disable-next-line no-param-reassign
Expand All @@ -72,44 +109,16 @@ export default class AMQPMessage implements IMessage {
return this._amqpMessage.content;
}

get headers(): Map<string, mixed> {
return new Map(Object.entries(this._amqpMessage.properties.headers));
}

get isAnswerQueueEnabled(): boolean {
return !!this._amqpMessage.properties.replyTo;
}

_checkIsSealed() {
checkIsSealed() {
if (!this._isSealed) {
return;
}

throw new Error('Message already acked/rejected or created in sealed mode');
}

async ack() {
this._checkIsSealed();
await this._forceAck();
}

async reject(requeue: ?boolean = false) {
this._checkIsSealed();
await this._forceReject(requeue);
}

async _forceAck() {
this._channel.ack(this._amqpMessage);
this._isSealed = true;
}

async _forceReject(requeue: ?boolean = false) {
this._channel.reject(this._amqpMessage, !!requeue);
toSeal() {
this.checkIsSealed();
this._isSealed = true;
}

async rejectAndRequeue(): Promise<void> {
this._checkIsSealed();
await this._forceReject(true);
}
}
Loading

0 comments on commit dbba27e

Please sign in to comment.