workaround: lock ttl/expiration #20

Closed
wants to merge 2 commits into from
13 changes: 6 additions & 7 deletions Gemfile.lock
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
fleiss (0.4.4)
fleiss (0.4.5)
activejob (>= 6.0)
activerecord (>= 6.0)
concurrent-ruby
@@ -53,16 +53,16 @@ GEM
diff-lcs (>= 1.2.0, < 2.0)
rspec-support (~> 3.10.0)
rspec-support (3.10.2)
rubocop (1.21.0)
rubocop (1.22.0)
parallel (~> 1.10)
parser (>= 3.0.0.0)
rainbow (>= 2.2.2, < 4.0)
regexp_parser (>= 1.8, < 3.0)
rexml
rubocop-ast (>= 1.9.1, < 2.0)
rubocop-ast (>= 1.12.0, < 2.0)
ruby-progressbar (~> 1.7)
unicode-display_width (>= 1.4.0, < 3.0)
rubocop-ast (1.11.0)
rubocop-ast (1.12.0)
parser (>= 3.0.1.1)
rubocop-bsm (0.6.0)
rubocop (~> 1.0)
@@ -74,9 +74,8 @@ GEM
rubocop-ast (>= 0.4.0)
rubocop-rake (0.6.0)
rubocop (~> 1.0)
rubocop-rspec (2.4.0)
rubocop (~> 1.0)
rubocop-ast (>= 1.1.0)
rubocop-rspec (2.5.0)
rubocop (~> 1.19)
ruby-progressbar (1.11.0)
sqlite3 (1.4.2)
tzinfo (2.0.4)
8 changes: 8 additions & 0 deletions README.md
@@ -60,3 +60,11 @@ Run the worker:
```ruby
bundle exec fleiss -I . -r config/environment
```

## Tweaks

Worker heartbeats are not implemented, so in rare cases a job can get stuck in the "running" state: for example, if the worker was KILL-ed instead of being shut down gracefully, or if the worker was killed after it had acquired the job and the DB connection was then lost.

To work around this, set the `FLEISS_LOCK_TTL` environment variable (in seconds). It should be larger than the maximum expected job run time.

Jobs that were started but have not finished within `FLEISS_LOCK_TTL` seconds can be picked up by other workers.

Contributor Author

Better English suggestions are welcome ^^
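
To make the rule in the README text above concrete, here is a minimal plain-Ruby sketch of the expiry check, with no database involved. It mirrors the 10% margin used by the `lock_expired` scope in this PR; `Job` and `lock_expired?` are hypothetical names chosen for illustration and are not part of Fleiss.

```ruby
# Plain-Ruby sketch of the lock-expiry rule; no database involved.
# `Job` and `lock_expired?` are hypothetical names used only for illustration.
Job = Struct.new(:started_at, :finished_at, keyword_init: true)

# A lock counts as expired when a TTL is configured, the job has started
# but not finished, and it started more than 1.1 * lock_ttl seconds ago.
def lock_expired?(job, now:, lock_ttl:)
  return false if lock_ttl.nil?            # the feature is opt-in
  return false if job.started_at.nil?      # never started, so no lock to expire
  return false unless job.finished_at.nil? # finished jobs are never re-run

  job.started_at < now - (1.1 * lock_ttl)  # 10% safety margin
end

now = Time.utc(2021, 10, 1, 12, 0, 0)
ttl = 3600 # seconds, as if FLEISS_LOCK_TTL=3600

recent = Job.new(started_at: now - 3700) # past the TTL, but inside the margin
stale  = Job.new(started_at: now - 4000) # past the TTL plus the margin

lock_expired?(recent, now: now, lock_ttl: ttl) # not yet eligible for pickup
lock_expired?(stale, now: now, lock_ttl: ttl)  # eligible for pickup
```

With a TTL of 3600 seconds the effective cutoff is 3960 seconds, which is why the first job is left alone and the second can be picked up.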

2 changes: 1 addition & 1 deletion fleiss.gemspec
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'fleiss'
s.version = '0.4.4'
s.version = '0.4.5'

Contributor Author

@mxmCherry, Sep 30, 2021

No incompatible changes. This is a new, optional feature that must be explicitly opted into by setting FLEISS_LOCK_TTL.

s.authors = ['Black Square Media Ltd']
s.email = ['[email protected]']
s.summary = %(Minimialist background jobs backed by ActiveJob and ActiveRecord.)
25 changes: 19 additions & 6 deletions lib/fleiss/backend/active_record/concern.rb
@@ -4,6 +4,8 @@ class ActiveRecord
module Concern
extend ActiveSupport::Concern

DEFAULT_LOCK_TTL = ENV['FLEISS_LOCK_TTL']&.to_i # seconds

Contributor Author

There is no dynamic or official configuration (per worker or similar).

I consider this a tweak (even a hack).

Member

OK, now that I see it, I don't like this. Let's add a proper heartbeat instead. How about:

  1. add a lock_expires_at column - not sure how to deal with the migration of this?
  2. document a def max_run_time callback (like we do with def ttl)
  3. set lock_expires_at when job.max_run_time is set - much like https://github.com/bsm/fleiss/blob/main/lib/fleiss/backend/active_record/concern.rb#L46
  4. fix def pending scope
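
The four steps above can be sketched in memory as follows. This is only an illustration under assumptions: `SketchJob`, `start!` and `claimable?` are hypothetical names, `max_run_time` is taken to be a number of seconds, and the real `pending` scope would also check job expiry and `scheduled_at`, which are left out here.

```ruby
# In-memory sketch of the proposed `lock_expires_at` flow; not Fleiss code.
# `SketchJob`, `start!` and `claimable?` are hypothetical names.
SketchJob = Struct.new(:max_run_time, :started_at, :finished_at, :lock_expires_at,
                       keyword_init: true) do
  # Step 3: when the job declares a max_run_time, starting it also records
  # the moment at which its lock should be considered stale.
  def start!(now)
    self.started_at = now
    self.lock_expires_at = max_run_time ? now + max_run_time : nil
  end

  # Step 4: a job can be claimed if it is unfinished and either was never
  # started or its lock has expired.
  def claimable?(now)
    return false if finished_at

    started_at.nil? || (!lock_expires_at.nil? && lock_expires_at <= now)
  end
end

t0 = Time.utc(2021, 10, 1)
job = SketchJob.new(max_run_time: 600) # seconds
job.start!(t0)
job.claimable?(t0 + 60)  # lock still held
job.claimable?(t0 + 601) # lock expired, another worker may take over
```

In this sketch a job without `max_run_time` never gets a `lock_expires_at`, so it is never reclaimed, which keeps the behaviour opt-in per job class.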

Contributor Author

@mxmCherry, Oct 1, 2021

I find the suggested solution viable, but I don't like it much (it still sounds workaround-ish):

  • if you call it max_run_time, that implies the job can be killed once the limit is reached (Time.now > started_at + max_run_time)
  • if you call it a proper lock_ttl (which is better done globally, per worker), that is closer, but it is also not worth much without a proper heartbeat in a separate thread (maybe one thread per worker, not per job)

I'm inclined to implement the second option (one heartbeat thread per worker).

UPD: looks like https://www.rubydoc.info/gems/concurrent-ruby/Concurrent/TimerTask is the right tool for it.

UPD2: actually, it cannot be one per worker; it has to be one per pooled thread, because the job "owner" is based on the thread ID... How much deeper does the rabbit hole go? :)

Btw, migrations can be handled with create_table(...) unless table_exists?(...) and add_column(...) unless column_exists?(...).
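
For illustration, a heartbeat along these lines might look like the sketch below. It deliberately uses a plain stdlib `Thread` instead of `Concurrent::TimerTask`, keeps the timestamp in memory rather than in a DB column, and would need one instance per job owner (that is, per pooled thread). The `Heartbeat` class and all of its methods are hypothetical and not part of Fleiss.

```ruby
# Stdlib-only heartbeat sketch; `Heartbeat` and its methods are hypothetical.
class Heartbeat
  def initialize(interval:, clock: -> { Time.now })
    @interval = interval # seconds between beats
    @clock = clock       # injectable so the logic can be checked without sleeping
    @mutex = Mutex.new
    @last_beat_at = nil
    @thread = nil
  end

  # Record one heartbeat. A real worker would update a DB column here.
  def beat
    @mutex.synchronize { @last_beat_at = @clock.call }
  end

  # The lock is stale when no heartbeat arrived within the last `ttl` seconds.
  def stale?(ttl, now = @clock.call)
    last = @mutex.synchronize { @last_beat_at }
    last.nil? || now - last > ttl
  end

  # Beat periodically in a background thread until #stop is called.
  def start
    @thread ||= Thread.new do
      loop do
        beat
        sleep @interval
      end
    end
    self
  end

  def stop
    @thread&.kill
    @thread&.join
    @thread = nil
  end
end

hb = Heartbeat.new(interval: 0.01).start
sleep 0.05
hb.stop
hb.stale?(1) # a recent beat exists, so the lock is still considered held
```

If the worker is KILL-ed, the thread dies with it and the beats stop, so other workers can treat the lock as stale once the TTL has passed.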


included do
scope :in_queue, ->(qs) { where(queue_name: Array.wrap(qs)) }
scope :finished, -> { where.not(finished_at: nil) }
@@ -14,6 +16,15 @@ module Concern
scope :started, -> { where(arel_table[:started_at].not_eq(nil)) }
scope :not_started, -> { where(arel_table[:started_at].eq(nil)) }
scope :scheduled, ->(now = Time.zone.now) { where(arel_table[:scheduled_at].gt(now)) }

scope :lock_expired, lambda {|now = Time.zone.now, lock_ttl = DEFAULT_LOCK_TTL|
if lock_ttl
min_started_at = now - (1.1 * lock_ttl) # 10% threshold
where(finished_at: nil).where(arel_table[:started_at].lt(min_started_at)) # not yet finished and started at quite a while ago
else
none # do nothing unless explicitly configured
end
}
end

module ClassMethods
@@ -26,12 +37,14 @@ def wrap_perform(&block)

# @return [ActiveRecord::Relation] pending scope
def pending(now = Time.zone.now)
not_finished
.not_expired(now)
.not_started
.where(arel_table[:scheduled_at].lteq(now))
.order(priority: :desc)
.order(scheduled_at: :asc)
pending = not_finished
.not_expired(now)
.not_started
.where(arel_table[:scheduled_at].lteq(now))
.order(priority: :desc)
.order(scheduled_at: :asc)

pending.or(lock_expired(now, DEFAULT_LOCK_TTL))

Member

That's not right. If something is, e.g., finished or expired, you don't want to include it!
It should be:

not_finished
  .not_expired(now)
  .where(arel_table[:scheduled_at].lteq(now))
  .merge(not_started.or(lock_expired(now)))
  .order(priority: :desc, scheduled_at: :asc)

Contributor Author

🤦
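
The difference between the two conditions discussed in this thread can be shown with a small in-memory comparison. This is a sketch, not ActiveRecord code: `Row`, `stale_lock?`, `base_pending?` and both `pending_*` predicates are hypothetical names, and it assumes that `not_expired` means `expires_at` is unset or in the future.

```ruby
# In-memory comparison of the two `pending` conditions; not ActiveRecord code.
# `Row` and all predicate names are hypothetical.
Row = Struct.new(:name, :scheduled_at, :expires_at, :started_at, :finished_at,
                 keyword_init: true)

NOW = Time.utc(2021, 10, 1, 12)
TTL = 3600 # seconds

# Started, unfinished, and started more than 1.1 * TTL seconds ago.
def stale_lock?(row)
  row.finished_at.nil? && !row.started_at.nil? && row.started_at < NOW - (1.1 * TTL)
end

# Conditions shared by both variants: unfinished, not expired, already due.
def base_pending?(row)
  row.finished_at.nil? &&
    (row.expires_at.nil? || row.expires_at > NOW) && # assumed meaning of not_expired
    row.scheduled_at <= NOW
end

# As submitted: (all pending conditions) OR lock_expired.
def pending_as_submitted?(row)
  (base_pending?(row) && row.started_at.nil?) || stale_lock?(row)
end

# As suggested in review: shared conditions AND (not_started OR lock_expired).
def pending_as_suggested?(row)
  base_pending?(row) && (row.started_at.nil? || stale_lock?(row))
end

rows = [
  Row.new(name: 'fresh', scheduled_at: NOW - 10),
  Row.new(name: 'stuck', scheduled_at: NOW - 7200, started_at: NOW - 7200),
  Row.new(name: 'stuck_but_expired', scheduled_at: NOW - 7200,
          started_at: NOW - 7200, expires_at: NOW - 60),
]

submitted = rows.select { |r| pending_as_submitted?(r) }.map(&:name)
suggested = rows.select { |r| pending_as_suggested?(r) }.map(&:name)
```

Here `submitted` also contains `stuck_but_expired`, because the `OR` branch bypasses the expiry check, while `suggested` keeps only `fresh` and `stuck`.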

end

# @return [ActiveRecord::Relation] in-progress scope
57 changes: 50 additions & 7 deletions spec/fleiss/backend/active_record_spec.rb
@@ -47,15 +47,28 @@ def retrieve(job)
end

it 'scopes pending' do
j1 = TestJob.perform_later
expect(retrieve(j1).start('owner')).to be_truthy
expect(retrieve(j1).finish('owner')).to be_truthy
finished = TestJob.perform_later
expect(retrieve(finished).start('owner')).to be_truthy
expect(retrieve(finished).finish('owner')).to be_truthy

j2 = TestJob.perform_later
_j3 = TestJob.set(wait: 1.hour).perform_later
j4 = TestJob.set(priority: 2).perform_later
# jobs with expired locks are seen as pending:
lock_expired = travel_to(2.days.ago) do
stub_const('Fleiss::Backend::ActiveRecord::Concern::DEFAULT_LOCK_TTL', 1.day.seconds)

TestJob.perform_later.tap do |job|
expect(retrieve(job).start('owner')).to be_truthy
end
end

pending = TestJob.perform_later
pending_high_prio = TestJob.set(priority: 2).perform_later
_future = TestJob.set(wait: 1.hour).perform_later # not visible yet

expect(described_class.pending.ids).to eq [j4.provider_job_id, j2.provider_job_id]
expect(described_class.pending.ids).to eq [
pending_high_prio.provider_job_id,
lock_expired.provider_job_id,
pending.provider_job_id,
]
end

it 'scopes in_progress' do
@@ -132,4 +145,34 @@ def retrieve(job)
end
.to raise_error(::ActiveRecord::StatementInvalid) # re-raised anyway
end

context 'with internal helpers' do
it 'scopes lock_expired' do
# one not finished, but "recent" job:
travel_to(1.day.ago) { expect(retrieve(TestJob.perform_later).start('owner')).to be_truthy }

# not finished, and "old":
old = travel_to(2.days.ago) do
retrieve(TestJob.perform_later).tap do |rec|
expect(rec.start('owner')).to be_truthy
end
end

# one "old", but finished:
travel_to(2.days.ago) do
job = TestJob.perform_later
expect(retrieve(job).start('owner')).to be_truthy
expect(retrieve(job).finish('owner')).to be_truthy
end

# one "old", but not-started job (so not eligible for lock check)
travel_to(2.days.ago) { TestJob.perform_later }

# don't do anything unless TTL configured:
expect(described_class.lock_expired(Time.zone.now, nil)).not_to exist

expect(described_class.lock_expired(Time.zone.now, 1.day)).to contain_exactly(old)
expect(described_class.lock_expired(Time.zone.now, 2.days)).not_to exist
end
end
end
3 changes: 3 additions & 0 deletions spec/spec_helper.rb
@@ -5,6 +5,7 @@
require 'fleiss/backend/active_record/migration'
require 'active_job'
require 'active_job/queue_adapters/fleiss_adapter'
require 'active_support/testing/time_helpers'
require 'fileutils'

ActiveJob::Base.queue_adapter = :fleiss
@@ -44,6 +45,8 @@ def perform(msg = nil)
end

RSpec.configure do |c|
c.include ActiveSupport::Testing::TimeHelpers

c.after do
TestJob.performed.clear
Fleiss.backend.delete_all