From b364d32bdc6bf46d7f41b17754e72e0a147ff2e8 Mon Sep 17 00:00:00 2001 From: sabljak Date: Mon, 16 Sep 2024 12:33:12 +0200 Subject: [PATCH] Implement ActiveJob adapter --- .../queue_adapters/queue_classic_adapter.rb | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 lib/active_job/queue_adapters/queue_classic_adapter.rb diff --git a/lib/active_job/queue_adapters/queue_classic_adapter.rb b/lib/active_job/queue_adapters/queue_classic_adapter.rb new file mode 100644 index 0000000..46f5ce4 --- /dev/null +++ b/lib/active_job/queue_adapters/queue_classic_adapter.rb @@ -0,0 +1,57 @@ +# frozen_string_literal: true + +if defined?(ActiveJob) && ActiveJob.version >= "8.0.0.alpha" + module ActiveJob + module QueueAdapters + # = queue_classic adapter for Active Job + # + # To use queue_classic set the queue_adapter config to +:queue_classic+. + # + # Rails.application.config.active_job.queue_adapter = :queue_classic + class QueueClassicAdapter < AbstractAdapter + def initialize(enqueue_after_transaction_commit: false) + @enqueue_after_transaction_commit = enqueue_after_transaction_commit + end + + def enqueue_after_transaction_commit? # :nodoc: + @enqueue_after_transaction_commit + end + + def enqueue(job) # :nodoc: + qc_job = build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.serialize) + job.provider_job_id = qc_job["id"] if qc_job.is_a?(Hash) + qc_job + end + + def enqueue_at(job, timestamp) # :nodoc: + queue = build_queue(job.queue_name) + unless queue.respond_to?(:enqueue_at) + raise NotImplementedError, "To be able to schedule jobs with queue_classic " \ + "the QC::Queue needs to respond to `enqueue_at(timestamp, method, *args)`. " \ + "You can implement this yourself or you can use the queue_classic-later gem." + end + qc_job = queue.enqueue_at(timestamp, "#{JobWrapper.name}.perform", job.serialize) + job.provider_job_id = qc_job["id"] if qc_job.is_a?(Hash) + qc_job + end + + # Builds a +QC::Queue+ object to schedule jobs on. + # + # If you have a custom +QC::Queue+ subclass you'll need to subclass + # +ActiveJob::QueueAdapters::QueueClassicAdapter+ and override the + # build_queue method. + def build_queue(queue_name) + QC::Queue.new(queue_name) + end + + class JobWrapper # :nodoc: + class << self + def perform(job_data) + Base.execute job_data + end + end + end + end + end + end +end