Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEATURE: Allow jobManager to be interrupted by system SIGINT #48

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Classes/Command/JobCommandController.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/

use Flowpack\JobQueue\Common\Exception as JobQueueException;
use Flowpack\JobQueue\Common\InterruptException;
use Flowpack\JobQueue\Common\Job\JobManager;
use Flowpack\JobQueue\Common\Queue\Message;
use Flowpack\JobQueue\Common\Queue\QueueManager;
Expand Down Expand Up @@ -74,6 +75,11 @@ public function workCommand($queue, $exitAfter = null, $limit = null, $verbose =
}
$this->outputLine('...');
}
if (function_exists('pcntl_signal')) {
pcntl_signal(SIGINT, static function () {
throw new InterruptException('Interrupted by SIGINT');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
throw new InterruptException('Interrupted by SIGINT');
throw new InterruptException('Interrupted by SIGINT', 1602072222);

I forgot in my first round: This is missing a unique exception number. We usually add the unix timestamp to the throw calls in order to be able to trace them back to the calling side more easily

});
}
$startTime = time();
$timeout = null;
$numberOfJobExecutions = 0;
Expand All @@ -84,6 +90,12 @@ public function workCommand($queue, $exitAfter = null, $limit = null, $verbose =
}
try {
$message = $this->jobManager->waitAndExecute($queue, $timeout);
$this->jobManager->interruptMe();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this method called here? This will be invoked for every successfully processed message in the queue. Does that make sense?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. By calling this message, it allows the jobManager to be interrputed in this (and only in this) moment. And this is definitely a safe moment, because the last message has been "completed" (maybe by timeout, or successful execution). Usually it will probably simply do nothing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I think my confusion came from the name since this doesn't always actually interrupt obviously.

} catch (InterruptException $exception) {
if ($verbose) {
$this->outputLine('Quitting after %d seconds due to received interruption', [time() - $startTime]);
}
$this->quit();
} catch (JobQueueException $exception) {
$numberOfJobExecutions ++;
$this->outputLine('<error>%s</error>', [$exception->getMessage()]);
Expand Down
8 changes: 8 additions & 0 deletions Classes/InterruptException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?php

namespace Flowpack\JobQueue\Common;

class InterruptException extends Exception
{

}
13 changes: 13 additions & 0 deletions Classes/Job/JobManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* source code.
*/

use Flowpack\JobQueue\Common\InterruptException;
use Flowpack\JobQueue\Common\Queue\QueueInterface;
use Neos\Cache\Frontend\VariableFrontend;
use Neos\Flow\Annotations as Flow;
Expand Down Expand Up @@ -175,6 +176,18 @@ public function peek(string $queueName, int $limit = 1): array
}, $messages);
}

/**
* This method is here to be called by queues if they reached local polling timeouts, if this applies
*
* @throws InterruptException
bwaidelich marked this conversation as resolved.
Show resolved Hide resolved
*/
public function interruptMe(): void
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: I find the name interruptMe a little weird. Why not just JobManager::interrupt()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming things is always the hardes task in programming. The idea had been, to find a name, which makes clear for the using classes, that this is more a less a signal to the job manager, that this would be a safe point for an interruption. Your suggestion is totally fine for me, if this makes more sense for you.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was confused by the imperative verb. But since this doesn't actually interrupt anything I would suggest to call this something like processPcntlSignals() or so instead.. But this leaves me wondering: Why do we need this methond in the JobManager at all? Can't we just move this line to the CommandController?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is there to make it easy for queues to signal their "safe state" to the job manager. They should not need to be aware of the CommandController. And if no Signal handler was registered, it just do nothing, so means no harm to call.

If you think the signal handler could be managed by the JobManager I'm with you. I had reasons to nut put it there - but I currently do not remember

{
if (function_exists('pcntl_signal_dispatch')) {
pcntl_signal_dispatch();
}
}

/**
* Signal that is triggered when a message has been submitted to a queue
*
Expand Down