Provides a framework for submitting and consuming tasks asynchronously, using SQS as the task store
- Operation: a descriptor for what needs to be done and what constitutes a valid payload. e.g. 'SendEmail'
- Payload: the input for your operation. e.g.
{ recipient: '[email protected]' }
- Task: an instance of work. e.g.
Send email to recipient [email protected]
that contains an operationName (e.g. 'SendEmail'), a taskId, and a deserialized JSON payload (e.g.{ recipient: '[email protected]' }
) - Handler: the method that will receive your task
You will need to wire up the task client for two different aspects: submitting tasks and consuming tasks. Both will require a queue to be configured.
At a minimum you must provide a configuration for the default queue that will be used if an operation does not specify an alternative queue
const client = new AsyncTasksClient({
defaultQueue: {
queueUrl: 'https://your-sqs-queue-url'
}
})
This will by default create a new SQS default client that will be used for submitting and fetching tasks.
You must configure operations with the client before you can submit or process tasks.
An operation requires 3 parts:
- operationName: An identifier that will be used to route your tasks to the appropriate validation and handler. NOTE: both the task publisher and any task consumers will need to have the same mapping
- validate: An asynchronous function that receives a payload and makes sure it's sane, throwing an exception if not. This function will be called whenever submitTask() is called
- handle: An asynchronous function that receives a task along with a context and handles it, throwing an exception on failure
NOTE: all three parameters must be supplied regardless if this is the publisher or the consumer
client.registerOperation({
operationName: 'SendEmail',
validate: async (payload: SendEmailPayload): Promise<void> => {
if (typeof payload.recipient !== 'string') {
throw new TypeError('expected recipient to be a string')
}
},
handle: async (task: Task<SendEmailPayload>, ctx: MyTaskContext) {
const email = {
to: task.payload.recipient,
subject: 'Hello world',
body: 'Welcome!'
}
await ctx.emailer.sendEmail(email)
}
})
Once the client is configured you can use the submitTask
function to submit a new task request. The operationName
will be used to route to the correct operation configuration (defined in registerOperation
) for validation. Once validated, the task will be serialized into a JSON object and enqueued on the operation's target queue for processing.
const { taskId } = await client.submitTask({
operationName: 'SendEmail',
payload: {
recipient: '[email protected]'
}
})
// taskId: uuid
You can optionally also specify delaySeconds
, which will map to SQS's DelaySeconds
option (see
the SQS documentation for
more info). SQS defaults to the queue's configuration if no value is specified (usually 0 or immediately visible).
const { taskId } = await client.submitTask({
operationName: 'SendEmail',
payload: {
recipient: '[email protected]'
},
delaySeconds: 300
})
To save on network calls (and their resulting potential network failures) you can also use client.submitAllTasks to validate and then submit a batch of tasks to their assigned queues.
const { results } = await client.submitAllTasks([
{
operationName: 'SendEmail',
payload: {
recipient: '[email protected]'
},
delaySeconds: 300
},
{
operationName: 'SendSMS',
payload: {
recipient: '[email protected]'
}
},
{
operationName: 'SendPush',
payload: {
recipientId: 'user.v1.1234567'
}
}
])
> results
[
{ taskId: $uuid, status: BatchSubmitTaskStatus.SUCCESSFUL, error: undefined },
{ taskId: $uuid, status: BatchSubmitTaskStatus.FAILED, error: { code: $AWSCODE, message: $AWSMESSAGE, error: $SQSERROR }},
{ taskId: $uuid, status: BatchSubmitTaskStatus.SUCCESSFUL, error: undefined }
]
Notes:
- Each entry in results will be ordered according to the order of the inputs
- The same restrictions for SQS messages apply here, notable that the total payload must not exceed 256 KB
As with submitTask you can optionally specify delaySeconds
to delay visibility of the SQS message. See the SQS
documentation for more information.
Enqueued tasks require a worker to fetch, route, and handle the tasks. This library uses sqs-consumer
to handle fetching tasks from SQS. The generateConsumers
method will configure a hashmap of Consumer objects with the configured queueUrl, sqs client, and a handler function using the provided contextProvider
.
const contextProvider = async (sqsMessage: AWS.SQS.Message) => {
return {
emailer: new Emailer(),
sqsMessage
}
}
const consumersByQueueName = client.generateConsumers({ contextProvider })
const defaultConsumer = consumers['default']
defaultConsumer.on('processing_error', (err, message) => {
// throw exceptions, stat, etc
})
defaultConsumer.start()
More information about sqs-consumer: https://github.com/bbc/sqs-consumer
You can also request a Consumer for a specific queue by using client.generateConsumer
. This is useful for situations where you want to retrieve a specific consumer for splitting workloads or for creating multiple consumers for a particular queue on a single application instance, allowing for parallelized workload processing.
const queueName = 'default'
const contextProvider = async (sqsMessage: AWS.SQS.Message) => {
return {
emailer: new Emailer(),
sqsMessage
}
}
const defaultConsumer = client.generateConsumer({ queueName, contextProvider })
defaultConsumer.on('processing_error', (err, message) => {
// throw exceptions, stat, etc
})
defaultConsumer.start()
This library also provides a NoopClient that can be used for unit testing.
import { AsyncTasksClient, NoopClient, TaskClient } from '@fatlama/async-sqs-tasks'
const getTasksClient(env: Environment): TaskClient {
if (env === Environment.Test) {
return new NoopClient()
}
const config = { ... }
return new AsyncTasksClient(config)
}
- To just run tests:
yarn test
- To format the code using prettier:
yarn format
- To run the entire build process:
yarn release
Use the built-in npm version {patch|minor}
tool to increment the version number and trigger a release
$ git checkout -b release-1.0.1
$ npm version patch -m "Your release message here"
$ git push --tag
Once approved you should be able to merge into master. This will trigger a test-build-release flow in Circle CI. You
will need to press the confirm_publish
step and approve the publish.
NOTE: CircleCI will only listen for tags matching vX.Y.Z with any optional suffixes