
Feature request: Job upsert #240

Closed
jamesholcomb opened this issue Jul 6, 2020 · 7 comments

@jamesholcomb

Hi all,

I am attempting to use BullMQ to debounce an event stream on the trailing edge using delayed jobs. The debounce window can vary depending on the use case (from several seconds to months).

My first shot at this was to use job.name as a key to group jobs that correspond to the same window. Here is some pseudocode for a job scheduler:

// get all delayed jobs with the same name
// should never return length > 1 😄 
const [delayedJob] = await queue
  .getDelayed()
  .then((jobs = []) => jobs.filter((j) => j?.name === jobName))

// remove the previously scheduled job, if any
if (delayedJob) {
  await delayedJob.remove()
}

await queue.add(jobName, jobData, {
  delay: jobDelay
})

Obviously this approach does not guarantee an atomic read+remove+add operation, and my test cases reliably reproduce race conditions under high stream throughput (i.e. more than one delayedJob ends up in the queue). I want to preserve the trailing edge since it contains the most recent jobData state, so it's critical that the last event is the one that stays queued.
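To make the intended semantics concrete, here is a minimal in-memory model of the desired "job upsert" behavior (hypothetical names; this is not the BullMQ API). Calling `upsert()` again with the same name inside an open window replaces the pending payload, so only the trailing (most recent) event survives to fire:

```javascript
// Toy model of trailing-edge debounce: the newest payload for a name
// always replaces the pending one, in a single atomic step.
class TrailingDebouncer {
  constructor() {
    this.pending = new Map() // name -> latest payload in the open window
  }

  // Replace any pending payload for `name` with the newer one.
  upsert(name, data) {
    this.pending.set(name, data)
  }

  // Called when the debounce window for `name` elapses: return the
  // trailing payload and close the window.
  fire(name) {
    const data = this.pending.get(name)
    this.pending.delete(name)
    return data
  }
}

const d = new TrailingDebouncer()
d.upsert('sync-user-42', { rev: 1 })
d.upsert('sync-user-42', { rev: 2 }) // same window: replaces rev 1
```

The race in the queue-based pseudocode above comes from the upsert being three separate operations (read, remove, add); this model sidesteps it only because `Map.set` is a single step, which is exactly the atomicity the feature request asks Redis to provide.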

Maybe a less fragile approach could be read+update / read+add, but there is currently no way to update a job's delay. There is an open issue in Bull for this: OptimalBits/bull#1733

I assume this feature will require Lua scripting, but I have zero experience there and very minimal experience with Redis data structures in general.

If there is an alternative approach, or advice on tackling the Lua script, I would be happy to hear/discuss it.

@jamesholcomb
Author

jamesholcomb commented Nov 25, 2020

Related issue OptimalBits/bull#1034 was funded on IssueHunt.

@manast
Contributor

manast commented Nov 25, 2020

What about the case where the job has already started and a new job suddenly comes in: should this new job be ignored, executed after the one that has already started completes, or should it cancel and replace the current one?

@jamesholcomb
Author

For the debounce scenario: if a job has started, it has already passed the debounce window, so an upsert request would become a read+add op. The read op within the upsert would return no results for jobs that have already started.

@manast
Contributor

manast commented Nov 26, 2020

@jamesholcomb ok, so you mean that if the job has started, it is because the debounce window has "expired", and adding now would imply adding a new job, whereas the one that has started will continue normally until completion?

@jamesholcomb
Author

Yes.

@roggervalf
Collaborator

As the debounce option is available since v5.11.0, I am closing this issue.

@andrelung

Did this solve the issue? As I understand it, debounce ignores new jobs. In an event stream it should rather ignore the older jobs, right?
(at least that's my use case)
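The distinction being raised can be sketched with a toy model (hypothetical function, not the BullMQ API): "first wins", where new jobs arriving while a debounce id is live are ignored, versus "last wins" (trailing edge), where the newest payload replaces the pending one:

```javascript
// Contrast the two debounce semantics discussed above. `pending` maps a
// debounce id to the payload currently held for the open window.
function debounceAdd(pending, id, data, mode) {
  if (pending.has(id)) {
    if (mode === 'first-wins') return pending.get(id) // ignore the new job
    pending.set(id, data) // last-wins: keep the newest payload
    return data
  }
  pending.set(id, data) // no open window: start one with this payload
  return data
}

const firstWins = new Map()
debounceAdd(firstWins, 'evt', { rev: 1 }, 'first-wins')
debounceAdd(firstWins, 'evt', { rev: 2 }, 'first-wins') // ignored

const lastWins = new Map()
debounceAdd(lastWins, 'evt', { rev: 1 }, 'last-wins')
debounceAdd(lastWins, 'evt', { rev: 2 }, 'last-wins') // replaces rev 1
```

For the event-stream use case described at the top of the thread, the trailing edge carries the most recent state, so last-wins is the semantics the original request was after.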
