Skip to content

Commit

Permalink
Improve amqp, use methods instead of $delivery_info and optimize `B…
Browse files Browse the repository at this point in the history
…eforeConsume` event (#6239)
  • Loading branch information
huangdijia authored Oct 30, 2023
1 parent 58b406a commit 08c9f4d
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 9 deletions.
10 changes: 5 additions & 5 deletions src/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -165,17 +165,17 @@ protected function getConcurrent(string $pool): ?Concurrent
protected function getCallback(ConsumerMessageInterface $consumerMessage, AMQPMessage $message): callable
{
return function () use ($consumerMessage, $message) {
$data = $consumerMessage->unserialize($message->getBody());
/** @var AMQPChannel $channel */
$channel = $message->delivery_info['channel'];
$deliveryTag = $message->delivery_info['delivery_tag'];
$channel = $message->getChannel();
$deliveryTag = $message->getDeliveryTag();

try {
$data = $consumerMessage->unserialize($message->getBody());

$this->eventDispatcher?->dispatch(new BeforeConsume($consumerMessage));
$result = $consumerMessage->consumeMessage($data, $message);
$this->eventDispatcher?->dispatch(new AfterConsume($consumerMessage, $result));
} catch (Throwable $exception) {
$this->eventDispatcher?->dispatch(new FailToConsume($consumerMessage, $exception));
$this->eventDispatcher?->dispatch(new FailToConsume($consumerMessage, $exception, $message));
if ($this->container->has(FormatterInterface::class)) {
$formatter = $this->container->get(FormatterInterface::class);
$this->logger->error($formatter->format($exception));
Expand Down
8 changes: 7 additions & 1 deletion src/Event/FailToConsume.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
namespace Hyperf\Amqp\Event;

use Hyperf\Amqp\Message\ConsumerMessageInterface;
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;

class FailToConsume extends ConsumeEvent
{
public function __construct(ConsumerMessageInterface $message, protected Throwable $throwable)
public function __construct(ConsumerMessageInterface $message, protected Throwable $throwable, protected AMQPMessage $amqpMessage)
{
parent::__construct($message);
}
Expand All @@ -25,4 +26,9 @@ public function getThrowable(): Throwable
{
return $this->throwable;
}

public function getAMQPMessage(): AMQPMessage
{
return $this->amqpMessage;
}
}
4 changes: 1 addition & 3 deletions src/Message/ConsumerMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
use Hyperf\Amqp\Packer\Packer;
use Hyperf\Amqp\Result;
use Hyperf\Context\ApplicationContext;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Container\ContainerInterface;

Expand Down Expand Up @@ -151,8 +150,7 @@ protected function reply($data, AMQPMessage $message)
{
$packer = ApplicationContext::getContainer()->get(Packer::class);

/** @var AMQPChannel $channel */
$channel = $message->delivery_info['channel'];
$channel = $message->getChannel();
$channel->basic_publish(
new AMQPMessage($packer->pack($data), [
'correlation_id' => $message->get('correlation_id'),
Expand Down

0 comments on commit 08c9f4d

Please sign in to comment.