Skip to content

Commit

Permalink
minor(history): split HistoryListener
Browse files Browse the repository at this point in the history
  • Loading branch information
kbond committed Nov 24, 2024
1 parent 3392bd0 commit 05f50eb
Show file tree
Hide file tree
Showing 10 changed files with 311 additions and 206 deletions.
18 changes: 13 additions & 5 deletions config/storage_orm.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Zenstruck\Messenger\Monitor\Command\PurgeCommand;
use Zenstruck\Messenger\Monitor\Command\SchedulePurgeCommand;
use Zenstruck\Messenger\Monitor\History\HistoryListener;
use Zenstruck\Messenger\Monitor\EventListener\AddMonitorStampListener;
use Zenstruck\Messenger\Monitor\EventListener\HandleMonitorStampListener;
use Zenstruck\Messenger\Monitor\EventListener\ReceiveMonitorStampListener;
use Zenstruck\Messenger\Monitor\History\ResultNormalizer;
use Zenstruck\Messenger\Monitor\History\Storage;
use Zenstruck\Messenger\Monitor\History\Storage\ORMStorage;
Expand All @@ -29,14 +31,20 @@
->set('.zenstruck_messenger_monitor.history.result_normalizer', ResultNormalizer::class)
->args([param('kernel.project_dir')])

->set('.zenstruck_messenger_monitor.history.listener', HistoryListener::class)
->set('.zenstruck_messenger_monitor.listener.add_monitor_stamp', AddMonitorStampListener::class)
->tag('kernel.event_listener', ['method' => '__invoke', 'event' => SendMessageToTransportsEvent::class])

->set('.zenstruck_messenger_monitor.listener.receive_monitor_stamp', ReceiveMonitorStampListener::class)
->args([
abstract_arg('exclude_classes')
])
->tag('kernel.event_listener', ['method' => '__invoke', 'event' => WorkerMessageReceivedEvent::class])

->set('.zenstruck_messenger_monitor.listener.handle_monitor_stamp', HandleMonitorStampListener::class)
->args([
service('zenstruck_messenger_monitor.history.storage'),
service('.zenstruck_messenger_monitor.history.result_normalizer'),
abstract_arg('exclude_classes'),
])
->tag('kernel.event_listener', ['method' => 'addMonitorStamp', 'event' => SendMessageToTransportsEvent::class])
->tag('kernel.event_listener', ['method' => 'receiveMessage', 'event' => WorkerMessageReceivedEvent::class])
->tag('kernel.event_listener', ['method' => 'handleSuccess', 'event' => WorkerMessageHandledEvent::class])
->tag('kernel.event_listener', ['method' => 'handleFailure', 'event' => WorkerMessageFailedEvent::class])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ protected function loadInternal(array $mergedConfig, ContainerBuilder $container
$container->getDefinition('zenstruck_messenger_monitor.history.storage')
->setArgument(1, $entity)
;
$container->getDefinition('.zenstruck_messenger_monitor.history.listener')
->setArgument(2, $mergedConfig['storage']['exclude'])
$container->getDefinition('.zenstruck_messenger_monitor.listener.receive_monitor_stamp')
->setArgument(0, $mergedConfig['storage']['exclude'])
;

if (!\class_exists(Schedule::class)) {
Expand Down
28 changes: 28 additions & 0 deletions src/EventListener/AddMonitorStampListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

/*
* This file is part of the zenstruck/messenger-monitor-bundle package.
*
* (c) Kevin Bond <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Zenstruck\Messenger\Monitor\EventListener;

use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
use Zenstruck\Messenger\Monitor\Stamp\MonitorStamp;

/**
* @author Kevin Bond <[email protected]>
*
* @internal
*/
final class AddMonitorStampListener
{
public function __invoke(SendMessageToTransportsEvent $event): void
{
$event->setEnvelope($event->getEnvelope()->with(new MonitorStamp()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,68 +9,31 @@
* file that was distributed with this source code.
*/

namespace Zenstruck\Messenger\Monitor\History;
namespace Zenstruck\Messenger\Monitor\EventListener;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Stamp\HandledStamp;
use Symfony\Component\Scheduler\Messenger\ScheduledStamp;
use Zenstruck\Messenger\Monitor\History\Model\Result;
use Zenstruck\Messenger\Monitor\History\Model\Results;
use Zenstruck\Messenger\Monitor\Stamp\DisableMonitoringStamp;
use Zenstruck\Messenger\Monitor\History\ResultNormalizer;
use Zenstruck\Messenger\Monitor\History\Storage;
use Zenstruck\Messenger\Monitor\Stamp\MonitorStamp;
use Zenstruck\Messenger\Monitor\Stamp\TagStamp;

/**
* @author Kevin Bond <[email protected]>
*
* @internal
*
* @phpstan-import-type Structure from Result
*/
final class HistoryListener
final class HandleMonitorStampListener
{
/**
* @param class-string[] $excludedClasses
*/
public function __construct(
private Storage $storage,
private ResultNormalizer $normalizer,
private array $excludedClasses,
) {
}

public function addMonitorStamp(SendMessageToTransportsEvent $event): void
{
$event->setEnvelope($event->getEnvelope()->with(new MonitorStamp()));
}

public function receiveMessage(WorkerMessageReceivedEvent $event): void
{
$envelope = $event->getEnvelope();

if ($this->isMonitoringDisabled($envelope)) {
return;
}

$stamp = $envelope->last(MonitorStamp::class);

if (\class_exists(ScheduledStamp::class) && $scheduledStamp = $envelope->last(ScheduledStamp::class)) {
// scheduler transport doesn't trigger SendMessageToTransportsEvent
$stamp = new MonitorStamp($scheduledStamp->messageContext->triggeredAt);

$event->addStamps(TagStamp::forSchedule($scheduledStamp));
}

if ($stamp instanceof MonitorStamp) {
$event->addStamps($stamp->markReceived($event->getReceiverName()));
}
}

public function handleSuccess(WorkerMessageHandledEvent $event): void
{
if (!$stamp = $event->getEnvelope()->last(MonitorStamp::class)) {
Expand Down Expand Up @@ -107,29 +70,6 @@ public function handleFailure(WorkerMessageFailedEvent $event): void
);
}

private function isMonitoringDisabled(Envelope $envelope): bool
{
$messageClass = $envelope->getMessage()::class;

foreach ($this->excludedClasses as $excludedClass) {
if (\is_a($messageClass, $excludedClass, true)) {
return true;
}
}

$stamp = $envelope->last(DisableMonitoringStamp::class) ?? DisableMonitoringStamp::getFor($messageClass);

if (!$stamp) {
return false;
}

if ($stamp->onlyWhenNoHandler && !$this->hasNoHandlers($envelope)) {
return false;
}

return true;
}

private function createResults(Envelope $envelope, ?HandlerFailedException $exception = null): Results
{
$results = [];
Expand Down Expand Up @@ -157,9 +97,4 @@ private function createResults(Envelope $envelope, ?HandlerFailedException $exce

return new Results($results);
}

private function hasNoHandlers(Envelope $envelope): bool
{
return [] === $envelope->all(HandledStamp::class);
}
}
85 changes: 85 additions & 0 deletions src/EventListener/ReceiveMonitorStampListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
<?php

/*
* This file is part of the zenstruck/messenger-monitor-bundle package.
*
* (c) Kevin Bond <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Zenstruck\Messenger\Monitor\EventListener;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Stamp\HandledStamp;
use Symfony\Component\Scheduler\Messenger\ScheduledStamp;
use Zenstruck\Messenger\Monitor\Stamp\DisableMonitoringStamp;
use Zenstruck\Messenger\Monitor\Stamp\MonitorStamp;
use Zenstruck\Messenger\Monitor\Stamp\TagStamp;

/**
* @author Kevin Bond <[email protected]>
*
* @internal
*/
final class ReceiveMonitorStampListener
{
/**
* @param class-string[] $excludedClasses
*/
public function __construct(private array $excludedClasses)
{
}

public function __invoke(WorkerMessageReceivedEvent $event): void
{
$envelope = $event->getEnvelope();

if ($this->isMonitoringDisabled($envelope)) {
return;
}

$stamp = $envelope->last(MonitorStamp::class);

if (\class_exists(ScheduledStamp::class) && $scheduledStamp = $envelope->last(ScheduledStamp::class)) {
// scheduler transport doesn't trigger SendMessageToTransportsEvent
$stamp = new MonitorStamp($scheduledStamp->messageContext->triggeredAt);

$event->addStamps(TagStamp::forSchedule($scheduledStamp));
}

if ($stamp instanceof MonitorStamp) {
$event->addStamps($stamp->markReceived($event->getReceiverName()));
}
}

private function isMonitoringDisabled(Envelope $envelope): bool
{
$messageClass = $envelope->getMessage()::class;

foreach ($this->excludedClasses as $excludedClass) {
if (\is_a($messageClass, $excludedClass, true)) {
return true;
}
}

$stamp = $envelope->last(DisableMonitoringStamp::class) ?? DisableMonitoringStamp::getFor($messageClass);

if (!$stamp) {
return false;
}

if ($stamp->onlyWhenNoHandler && !$this->hasNoHandlers($envelope)) {
return false;
}

return true;
}

private function hasNoHandlers(Envelope $envelope): bool
{
return [] === $envelope->all(HandledStamp::class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
use PHPUnit\Framework\Constraint\LogicalNot;
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
use Zenstruck\Messenger\Monitor\DependencyInjection\ZenstruckMessengerMonitorExtension;
use Zenstruck\Messenger\Monitor\History\HistoryListener;
use Zenstruck\Messenger\Monitor\EventListener\AddMonitorStampListener;
use Zenstruck\Messenger\Monitor\EventListener\HandleMonitorStampListener;
use Zenstruck\Messenger\Monitor\EventListener\ReceiveMonitorStampListener;
use Zenstruck\Messenger\Monitor\History\Model\ProcessedMessage;
use Zenstruck\Messenger\Monitor\History\Storage;
use Zenstruck\Messenger\Monitor\History\Storage\ORMStorage;
Expand All @@ -40,7 +42,9 @@ public function no_config(): void
$this->assertContainerBuilderHasAlias(Transports::class, 'zenstruck_messenger_monitor.transports');
$this->assertContainerBuilderHasAlias(Workers::class, 'zenstruck_messenger_monitor.workers');
$this->assertThat($this->container, new LogicalNot(new ContainerBuilderHasAliasConstraint(Storage::class)));
$this->assertThat($this->container, new LogicalNot(new ContainerBuilderHasServiceDefinitionConstraint('.zenstruck_messenger_monitor.history.listener')));
$this->assertThat($this->container, new LogicalNot(new ContainerBuilderHasServiceDefinitionConstraint('.zenstruck_messenger_monitor.listener.add_monitor_stamp')));
$this->assertThat($this->container, new LogicalNot(new ContainerBuilderHasServiceDefinitionConstraint('.zenstruck_messenger_monitor.listener.receive_monitor_stamp')));
$this->assertThat($this->container, new LogicalNot(new ContainerBuilderHasServiceDefinitionConstraint('.zenstruck_messenger_monitor.listener.handle_monitor_stamp')));
}

/**
Expand All @@ -55,8 +59,10 @@ public function orm_config(): void
$this->assertContainerBuilderHasService('zenstruck_messenger_monitor.history.storage', ORMStorage::class);
$this->assertContainerBuilderHasServiceDefinitionWithArgument('zenstruck_messenger_monitor.history.storage', 1, ProcessedMessageImpl::class);
$this->assertContainerBuilderHasAlias(Storage::class, 'zenstruck_messenger_monitor.history.storage');
$this->assertContainerBuilderHasService('.zenstruck_messenger_monitor.history.listener', HistoryListener::class);
$this->assertContainerBuilderHasServiceDefinitionWithArgument('.zenstruck_messenger_monitor.history.listener', 2, []);
$this->assertContainerBuilderHasService('.zenstruck_messenger_monitor.listener.add_monitor_stamp', AddMonitorStampListener::class);
$this->assertContainerBuilderHasService('.zenstruck_messenger_monitor.listener.receive_monitor_stamp', ReceiveMonitorStampListener::class);
$this->assertContainerBuilderHasServiceDefinitionWithArgument('.zenstruck_messenger_monitor.listener.receive_monitor_stamp', 0, []);
$this->assertContainerBuilderHasService('.zenstruck_messenger_monitor.listener.handle_monitor_stamp', HandleMonitorStampListener::class);
}

/**
Expand All @@ -69,7 +75,7 @@ public function storage_with_excluded_classes(): void
'exclude' => ['stdClass'],
]]);

$this->assertContainerBuilderHasServiceDefinitionWithArgument('.zenstruck_messenger_monitor.history.listener', 2, [\stdClass::class]);
$this->assertContainerBuilderHasServiceDefinitionWithArgument('.zenstruck_messenger_monitor.listener.receive_monitor_stamp', 0, [\stdClass::class]);
}

/**
Expand Down
40 changes: 40 additions & 0 deletions tests/Unit/EventListener/AddMonitorStampListenerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php

/*
* This file is part of the zenstruck/messenger-monitor-bundle package.
*
* (c) Kevin Bond <[email protected]>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Zenstruck\Messenger\Monitor\Tests\Unit\EventListener;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;
use Zenstruck\Messenger\Monitor\EventListener\AddMonitorStampListener;
use Zenstruck\Messenger\Monitor\Stamp\MonitorStamp;

/**
* @author Kevin Bond <[email protected]>
*/
final class AddMonitorStampListenerTest extends TestCase
{
/**
* @test
*/
public function adds_monitor_stamp(): void
{
$listener = new AddMonitorStampListener();
$envelope = new Envelope(new \stdClass());
$event = new SendMessageToTransportsEvent($envelope, []);

$this->assertNull($event->getEnvelope()->last(MonitorStamp::class));

$listener->__invoke($event);

$this->assertInstanceOf(MonitorStamp::class, $event->getEnvelope()->last(MonitorStamp::class));
}
}
Loading

0 comments on commit 05f50eb

Please sign in to comment.