A gem containing libraries that support running background jobs or tasks. It’s designed to make it easy to:
- Define a job
- Enqueue a job
- Poll for and execute jobs
It fits seamlessly into a Rails environment (using its configuration), but does not require one.
It is architected to support multiple queue implementations, but currently includes only one, built on AWS SQS (aws.amazon.com/sqs/). A client can plug in an alternate queue implementation, using lib/simple_job/sqs_job_queue.rb as an example.
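As a rough illustration of what a pluggable implementation might look like, the sketch below defines an in-memory queue. The class name InMemoryJobQueue and the signatures of its enqueue and poll methods are assumptions made for this example; the actual contract is whatever lib/simple_job/sqs_job_queue.rb implements.

```ruby
# Hypothetical in-memory queue; NOT part of simple_job. The method names
# (enqueue, poll) mirror the operations described above, but the exact
# interface a real implementation must satisfy is an assumption here.
class InMemoryJobQueue
  def initialize
    @messages = []
  end

  # Stores a serialized job message; returns true on success.
  def enqueue(message)
    @messages.push(message)
    true
  end

  # Yields each pending message in FIFO order until the queue is empty,
  # and returns the number of messages processed.
  def poll
    processed = 0
    until @messages.empty?
      yield @messages.shift
      processed += 1
    end
    processed
  end
end

queue = InMemoryJobQueue.new
queue.enqueue('{"type":"foo_sender","version":"1","data":{}}')
received = []
count = queue.poll { |message| received << message }
```

A queue like this could be handy in tests, where talking to SQS is undesirable.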
The AWS SQS queue implementation requires the aws-sdk gem, which must be initialized (by calling AWS.config) for this API to be capable of enqueuing or polling for jobs.
Queue configuration must be done by both the client and the server.
Only the queues that will be used by each must be defined (a client may configure a subset of the queues used by the server, so long as it configures all the queues that it uses).
SQSJobQueue is the queue implementation used by default. This may be overridden by calling SimpleJob::JobQueue.config :implementation => 'my_queue_type'
SimpleJob::SQSJobQueue.config :queue_prefix => 'my-job'
SimpleJob::SQSJobQueue.define_queue 'normal', :default => true
Complex configuration with explicit queue implementation, non-Rails-defined environment, multiple queues, and CloudWatch monitoring
SimpleJob::JobQueue.config :implementation => 'sqs'
SimpleJob::SQSJobQueue.config :queue_prefix => 'my-job', :environment => 'production', :cloud_watch_namespace => 'SimpleJob'
SimpleJob::SQSJobQueue.define_queue 'normal', :visibility_timeout => 60, :default => true
SimpleJob::SQSJobQueue.define_queue 'high-priority', :visibility_timeout => 10
SimpleJob::SQSJobQueue.define_queue 'long-running', :visibility_timeout => 3600
If you provide a CloudWatch namespace to SimpleJob::SQSJobQueue.config, then a series of metrics will be saved to AWS CloudWatch by the queue’s poll method:
SimpleJob::SQSJobQueue.config :cloud_watch_namespace => 'MyNamespace'
The following metrics will be published:
- MessageCheckCount: A count of the number of poll attempts
- MessageReceivedCount: A count of the number of poll attempts that return a message
- MessageMissCount: A count of the number of poll attempts that do not return a message
- ExecutionCount: A count of the number of jobs that are executed
- SuccessCount: A count of the number of jobs that are executed without error
- ErrorCount: A count of the number of jobs that raise an exception during execution
- ExecutionTime: The number of milliseconds it takes the job to execute
- TimeToCompletion: The number of milliseconds between when a job is requested and when it is successfully completed
- ExecutionAttempts: The number of executions that occur before a job is successfully completed
The metrics above are published using the following dimensions:
- Environment: The current environment (e.g. development vs. production)
- SQSQueueName: The name of the SQS queue being polled
- Host: The hostname of the machine running the poll operation
- JobType: The string label for the type of job being executed
Note that the JobType dimension will only be present on the ExecutionCount, SuccessCount, ErrorCount, ExecutionTime, TimeToCompletion, and ExecutionAttempts metrics.
If no namespace is provided to SimpleJob::SQSJobQueue.config, then no CloudWatch metrics will be gathered or published.
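To make the relationship between the count metrics concrete, here is a minimal sketch that tallies them locally. The MetricsTally class and its record_poll method are hypothetical, and the sketch assumes that every received message leads to exactly one execution; simple_job itself publishes these values to CloudWatch rather than keeping them in a hash.

```ruby
# Hypothetical tally, for illustration only; not part of simple_job.
class MetricsTally
  attr_reader :counts

  def initialize
    @counts = Hash.new(0)
  end

  # Records one poll attempt. `job` is nil when the poll returned no
  # message; otherwise it is a callable standing in for the job's
  # execute method.
  def record_poll(job)
    @counts['MessageCheckCount'] += 1
    if job.nil?
      @counts['MessageMissCount'] += 1
      return
    end
    @counts['MessageReceivedCount'] += 1
    @counts['ExecutionCount'] += 1
    begin
      job.call
      @counts['SuccessCount'] += 1
    rescue StandardError
      @counts['ErrorCount'] += 1
    end
  end
end

tally = MetricsTally.new
tally.record_poll(nil)                 # poll that returns no message
tally.record_poll(-> { :ok })          # job that succeeds
tally.record_poll(-> { raise 'boom' }) # job that raises
```

Under these assumptions, MessageCheckCount equals MessageReceivedCount plus MessageMissCount, and ExecutionCount equals SuccessCount plus ErrorCount.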
To define a job, simply include the SimpleJob::JobDefinition module, and implement its #execute method.
Attributes declared using simple_job_attribute will be automatically serialized into the JSON message that’s enqueued, and re-populated into the object once it’s dequeued. They each have standard getters/setters defined by attr_accessor, and may be set upon object initialization by passing them as hash keys/values to #new.
ActiveModel validation is supported and included automatically.
class FooSender
  include SimpleJob::JobDefinition

  simple_job_attribute :target, :foo_content # defines getters/setters for each, and
                                             # adds them to serialized message

  validates :target, :presence => true       # standard ActiveModel validation

  def execute
    puts "#{foo_content} -> #{target}"
  end
end
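The serialization round trip described above can be sketched with only the standard library. Everything named Sketch* below is hypothetical, as is the way the type label is derived from the class name; the real SimpleJob::JobDefinition may differ in naming and details.

```ruby
require 'json'

# Hypothetical stand-in for the attribute handling described above.
module SketchJobDefinition
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    # Declares attributes: defines accessors and remembers the names
    # so they can be serialized later.
    def sketch_job_attribute(*names)
      attr_accessor(*names)
      job_attributes.concat(names)
    end

    def job_attributes
      @job_attributes ||= []
    end

    # Assumed convention: underscored class name as the type label.
    def job_type
      name.gsub(/([a-z])([A-Z])/, '\1_\2').downcase
    end
  end

  # Accepts attribute values as hash keys/values passed to #new.
  def initialize(attrs = {})
    attrs.each { |name, value| public_send("#{name}=", value) }
  end

  # Serializes the declared attributes under a "data" key.
  def to_json(*args)
    data = self.class.job_attributes.map { |name| [name.to_s, public_send(name)] }.to_h
    { 'type' => self.class.job_type, 'version' => '1', 'data' => data }.to_json(*args)
  end

  # Re-populates attributes from a JSON message and returns self.
  def from_json(json)
    JSON.parse(json).fetch('data').each { |name, value| public_send("#{name}=", value) }
    self
  end
end

class SketchFooSender
  include SketchJobDefinition
  sketch_job_attribute :target, :foo_content
end

original = SketchFooSender.new(:target => 'joe', :foo_content => 'foo!')
copy = SketchFooSender.new.from_json(original.to_json)
```

Here copy ends up with the same target and foo_content as original, which is the behavior a dequeued job relies on.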
You may call #enqueue with no arguments, in which case JobQueue.default will be used (defined by passing the :default option when defining the queue as documented in the QueueConfiguration section).
Similar to ActiveRecord’s save method, enqueue will return true or false depending on whether the object passes validation.
f = FooSender.new(:target => 'joe', :foo_content => 'foo!') # can also assign attributes with f.target=, f.foo_content=
if f.enqueue
  puts 'i just sent some foo to joe!'
else
  puts "the following errors occurred: #{f.errors.full_messages.join('; ')}"
end

json = f.to_json
# { "type": "foo_sender", "version": "1", "data": { "target": "joe", "foo_content": "foo!" } }
f_copy = FooSender.new.from_json(json)
To explicitly specify the queue to use, simply specify its type identifier (defined in SQSJobQueue.define_queue declaration) when calling enqueue.
f = FooSender.new
f.target = 'bob'
f.foo_content = 'foo!'
f.enqueue!('normal') # raises exception if operation fails
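The difference between enqueue and enqueue! can be sketched without the gem. The SketchJob class below is hypothetical: it uses a hand-rolled presence check in place of ActiveModel validation, a plain array in place of a queue, and ArgumentError as a stand-in for whatever exception simple_job actually raises.

```ruby
# Hypothetical job class, for illustration only; not part of simple_job.
class SketchJob
  attr_accessor :target
  attr_reader :errors

  def initialize(target = nil)
    @target = target
    @errors = []
  end

  # Hand-rolled presence check standing in for ActiveModel validation.
  def valid?
    @errors = []
    @errors << "Target can't be blank" if target.nil? || target.to_s.empty?
    @errors.empty?
  end

  # Returns false instead of raising when validation fails.
  def enqueue(queue)
    return false unless valid?
    queue << target
    true
  end

  # Raises when validation fails (the exception class is an assumption).
  def enqueue!(queue)
    enqueue(queue) || raise(ArgumentError, errors.join('; '))
  end
end

queue = []
valid_result = SketchJob.new('bob').enqueue(queue)
invalid_result = SketchJob.new.enqueue(queue)
raised = begin
  SketchJob.new.enqueue!(queue)
  false
rescue ArgumentError
  true
end
```

The non-bang form suits code that wants to report errors to a user, while the bang form suits code where a failed enqueue should abort the surrounding operation.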
Alternatively, the queue may be attached to the job upfront for multiple enqueue operations:
FooSender.job_queue('high-priority')

f1 = FooSender.new(:target => 'cookie monster', :foo_content => 'cookies and milk')
f1.enqueue # high-priority queue will be used

f2 = FooSender.new(:target => 'oscar the grouch', :foo_content => 'pizza')
f2.enqueue # high-priority queue will be used
Calling #poll on a queue instance will, by default, dispatch each job to the proper registered class based on type and version. Its options are passed through to the underlying implementation. See SQSJobQueue::poll for documentation of the options it accepts.
JobQueue.default.poll(:poll_interval => 1, :idle_timeout => 60)
You can override the default behavior by passing a block to #poll that accepts both a job definition instance and an SQS message. In this case, you must call definition.execute in the block yourself; the default handler simply calls definition.execute.
JobQueue.default.poll(:poll_interval => 1, :idle_timeout => 60) do |definition, message|
  logger.debug "received message: #{message.inspect}"
  logger.debug "dispatched to definition #{definition.inspect}; executing..."
  definition.execute
end
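The dispatch behavior described above (look up a class by type and version, then either run the default handler or hand control to the block) can be sketched as follows. SketchDispatcher, its register method, and SketchGreeter are hypothetical names; the real poll method reads from SQS and accepts the options documented in SQSJobQueue::poll.

```ruby
require 'json'

# Hypothetical registry and poll loop, for illustration only.
class SketchDispatcher
  def initialize
    @registry = {}
  end

  # Registers a job class under a [type, version] key.
  def register(type, version, klass)
    @registry[[type, version]] = klass
  end

  # Processes each raw JSON message. With no block, the default handler
  # simply calls definition.execute; with a block, the caller is
  # responsible for calling it.
  def poll(messages)
    messages.map do |message|
      parsed = JSON.parse(message)
      klass = @registry.fetch([parsed['type'], parsed['version']])
      definition = klass.new(parsed['data'])
      if block_given?
        yield definition, message
      else
        definition.execute
      end
    end
  end
end

class SketchGreeter
  def initialize(data)
    @name = data['name']
  end

  def execute
    "hello #{@name}"
  end
end

dispatcher = SketchDispatcher.new
dispatcher.register('greeter', '1', SketchGreeter)
message = '{"type":"greeter","version":"1","data":{"name":"joe"}}'

default_results = dispatcher.poll([message])
log = []
custom_results = dispatcher.poll([message]) do |definition, raw|
  log << "received #{raw.bytesize} bytes"
  definition.execute
end
```

In this sketch, omitting definition.execute from the block would mean the job never runs, which matches the requirement stated above.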