Skip to content

Commit

Permalink
refacto filters and category import
Browse files Browse the repository at this point in the history
  • Loading branch information
TheGrimmChester committed Jul 4, 2024
1 parent da0450d commit 4da43a3
Show file tree
Hide file tree
Showing 42 changed files with 707 additions and 323 deletions.
1 change: 1 addition & 0 deletions src/Command/AbstractImportCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ protected function configure(): void
->addOption('parallel', 'p', InputOption::VALUE_NONE, 'Allow parallel task processing')
->addOption('disable-batch', 'd', InputOption::VALUE_NONE, 'Disable batch processing')
->addOption('batch-size', 's', InputOption::VALUE_OPTIONAL, 'Batch Size', 100)
->addOption('from-page', null, InputOption::VALUE_OPTIONAL, 'From page', 1)
->addOption('max-concurrency', 'c', InputOption::VALUE_OPTIONAL, 'Max process concurrency', 5)
->addOption('batch-after-fetch', 'a', InputOption::VALUE_OPTIONAL, 'Fetch all pages then start processing the batches', true)
->addOption('filter', 'f', InputOption::VALUE_OPTIONAL | InputOption::VALUE_IS_ARRAY, 'Add filter')
Expand Down
2 changes: 2 additions & 0 deletions src/Command/BatchImportAssociationTypesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Synolia\SyliusAkeneoPlugin\Client\ClientFactoryInterface;
use Synolia\SyliusAkeneoPlugin\Payload\Association\AssociationTypePayload;
use Synolia\SyliusAkeneoPlugin\Task\AssociationType\BatchAssociationTypesTask;
use Webmozart\Assert\Assert;

final class BatchImportAssociationTypesCommand extends AbstractBatchCommand
{
Expand All @@ -33,6 +34,7 @@ protected function execute(
InputInterface $input,
OutputInterface $output,
) {
Assert::string($input->getArgument('ids'));
$ids = explode(',', $input->getArgument('ids'));

$this->logger->notice('Processing batch', ['from_id' => $ids[0], 'to_id' => $ids[\count($ids) - 1]]);
Expand Down
2 changes: 2 additions & 0 deletions src/Command/BatchImportAttributesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Synolia\SyliusAkeneoPlugin\Client\ClientFactoryInterface;
use Synolia\SyliusAkeneoPlugin\Payload\Attribute\AttributePayload;
use Synolia\SyliusAkeneoPlugin\Task\Attribute\BatchAttributesTask;
use Webmozart\Assert\Assert;

final class BatchImportAttributesCommand extends AbstractBatchCommand
{
Expand All @@ -33,6 +34,7 @@ protected function execute(
InputInterface $input,
OutputInterface $output,
) {
Assert::string($input->getArgument('ids'));
$ids = explode(',', $input->getArgument('ids'));

$this->logger->notice('Processing batch', ['from_id' => $ids[0], 'to_id' => $ids[\count($ids) - 1]]);
Expand Down
50 changes: 50 additions & 0 deletions src/Command/BatchImportCategoriesCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?php

declare(strict_types=1);

namespace Synolia\SyliusAkeneoPlugin\Command;

use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Synolia\SyliusAkeneoPlugin\Client\ClientFactoryInterface;
use Synolia\SyliusAkeneoPlugin\Payload\Category\CategoryPayload;
use Synolia\SyliusAkeneoPlugin\Task\Category\BatchCategoriesTask;
use Webmozart\Assert\Assert;

final class BatchImportCategoriesCommand extends AbstractBatchCommand
{
protected static $defaultDescription = 'Import batch categories ids from Akeneo PIM.';

/** @var string */
protected static $defaultName = 'akeneo:batch:categories';

public function __construct(
private ClientFactoryInterface $clientFactory,
private LoggerInterface $logger,
private BatchCategoriesTask $task,
) {
parent::__construct(self::$defaultName);
}

/**
* {@inheritdoc}
*/
protected function execute(
InputInterface $input,
OutputInterface $output,
) {
Assert::string($input->getArgument('ids'));
$ids = explode(',', $input->getArgument('ids'));

$this->logger->notice('Processing batch', ['from_id' => $ids[0], 'to_id' => $ids[\count($ids) - 1]]);
$this->logger->debug(self::$defaultName, ['batched_ids' => $ids]);

$payload = new CategoryPayload($this->clientFactory->createFromApiCredentials());
$payload->setIds($ids);

$this->task->__invoke($payload);

return 0;
}
}
2 changes: 2 additions & 0 deletions src/Command/BatchImportProductModelsCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Synolia\SyliusAkeneoPlugin\Client\ClientFactoryInterface;
use Synolia\SyliusAkeneoPlugin\Payload\ProductModel\ProductModelPayload;
use Synolia\SyliusAkeneoPlugin\Task\ProductModel\BatchProductModelTask;
use Webmozart\Assert\Assert;

final class BatchImportProductModelsCommand extends AbstractBatchCommand
{
Expand All @@ -33,6 +34,7 @@ protected function execute(
InputInterface $input,
OutputInterface $output,
) {
Assert::string($input->getArgument('ids'));
$ids = explode(',', $input->getArgument('ids'));

$this->logger->notice('Processing batch', ['from_id' => $ids[0], 'to_id' => $ids[\count($ids) - 1]]);
Expand Down
2 changes: 2 additions & 0 deletions src/Command/BatchImportProductsCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Synolia\SyliusAkeneoPlugin\Client\ClientFactoryInterface;
use Synolia\SyliusAkeneoPlugin\Payload\Product\ProductPayload;
use Synolia\SyliusAkeneoPlugin\Task\Product\BatchProductsTask;
use Webmozart\Assert\Assert;

final class BatchImportProductsCommand extends AbstractBatchCommand
{
Expand All @@ -33,6 +34,7 @@ protected function execute(
InputInterface $input,
OutputInterface $output,
) {
Assert::string($input->getArgument('ids'));
$ids = explode(',', $input->getArgument('ids'));

$this->logger->notice('Processing batch', ['from_id' => $ids[0], 'to_id' => $ids[\count($ids) - 1]]);
Expand Down
50 changes: 19 additions & 31 deletions src/Command/ImportCategoriesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,60 +5,48 @@
namespace Synolia\SyliusAkeneoPlugin\Command;

use Psr\Log\LoggerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Command\LockableTrait;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Synolia\SyliusAkeneoPlugin\Client\ClientFactoryInterface;
use Synolia\SyliusAkeneoPlugin\Exceptions\Command\CommandLockedException;
use Synolia\SyliusAkeneoPlugin\Factory\CategoryPipelineFactory;
use Synolia\SyliusAkeneoPlugin\Logger\Messages;
use Synolia\SyliusAkeneoPlugin\Factory\PayloadFactoryInterface;
use Synolia\SyliusAkeneoPlugin\Payload\Category\CategoryPayload;

final class ImportCategoriesCommand extends Command
final class ImportCategoriesCommand extends AbstractImportCommand
{
use LockableTrait;

private const DESCRIPTION = 'Import Categories from Akeneo PIM.';
protected static $defaultDescription = 'Import Categories from Akeneo PIM.';

/** @var string */
protected static $defaultName = 'akeneo:import:categories';

public function __construct(
private CategoryPipelineFactory $categoryPipelineFactory,
private ClientFactoryInterface $clientFactory,
private LoggerInterface $logger,
CategoryPipelineFactory $pipelineFactory,
LoggerInterface $akeneoLogger,
PayloadFactoryInterface $payloadFactory,
) {
parent::__construct(self::$defaultName);
}

protected function configure(): void
{
$this->setDescription(self::DESCRIPTION);
parent::__construct($akeneoLogger, $payloadFactory, $pipelineFactory, self::$defaultName);
}

/**
* {@inheritdoc}
*/
protected function execute(
InputInterface $input,
OutputInterface $output,
) {
if (!$this->lock()) {
$output->writeln(Messages::commandAlreadyRunning());

return 0;
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
try {
$this->preExecute();

$this->logger->notice(self::$defaultName);
/** @var \League\Pipeline\Pipeline $categoryPipeline */
$categoryPipeline = $this->categoryPipelineFactory->create();
$payload = $this->payloadFactory->createFromCommand(CategoryPayload::class, $input, $output);
$this->pipeline->process($payload);

/** @var \Synolia\SyliusAkeneoPlugin\Payload\Category\CategoryPayload $categoryPayload */
$categoryPayload = new CategoryPayload($this->clientFactory->createFromApiCredentials());
$categoryPipeline->process($categoryPayload);
$this->postExecute();
} catch (CommandLockedException $commandLockedException) {
$this->logger->warning($commandLockedException->getMessage());

$this->logger->notice(Messages::endOfCommand(self::$defaultName));
$this->release();
return 1;
}

return 0;
}
Expand Down
4 changes: 4 additions & 0 deletions src/Configuration/ConfigurationContextInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,8 @@ public function setFilters(array $filters): self;
public function getHandler(): string;

public function setHandler(string $handler): self;

public function getFromPage(): int;

public function setFromPage(int $fromPage): self;
}
14 changes: 14 additions & 0 deletions src/Configuration/ConfigurationContextTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ trait ConfigurationContextTrait

private string $handler = SymfonyProcessTaskHandler::HANDLER_CODE;

private int $fromPage = 1;

public function getBatchSize(): int
{
return $this->batchSize;
Expand Down Expand Up @@ -147,4 +149,16 @@ public function setHandler(string $handler): ConfigurationContextInterface

return $this;
}

public function getFromPage(): int
{
return $this->fromPage;
}

public function setFromPage(int $fromPage): ConfigurationContextInterface
{
$this->fromPage = $fromPage;

return $this;
}
}
10 changes: 6 additions & 4 deletions src/Factory/CategoryPipelineFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
use League\Pipeline\Pipeline;
use League\Pipeline\PipelineInterface;
use Synolia\SyliusAkeneoPlugin\Pipeline\Processor;
use Synolia\SyliusAkeneoPlugin\Task\Category\CreateUpdateEntityTask;
use Synolia\SyliusAkeneoPlugin\Task\Category\RetrieveCategoriesTask;
use Synolia\SyliusAkeneoPlugin\Task\Category\ProcessCategoriesTask;
use Synolia\SyliusAkeneoPlugin\Task\SetupTask;
use Synolia\SyliusAkeneoPlugin\Task\TearDownTask;

final class CategoryPipelineFactory extends AbstractPipelineFactory
{
Expand All @@ -17,8 +18,9 @@ public function create(): PipelineInterface
$pipeline = new Pipeline(new Processor($this->dispatcher));

return $pipeline
->pipe($this->taskProvider->get(RetrieveCategoriesTask::class))
->pipe($this->taskProvider->get(CreateUpdateEntityTask::class))
->pipe($this->taskProvider->get(SetupTask::class))
->pipe($this->taskProvider->get(ProcessCategoriesTask::class))
->pipe($this->taskProvider->get(TearDownTask::class))
;
}
}
5 changes: 3 additions & 2 deletions src/Factory/PayloadFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ private function createContext(
->setAllowParallel($isParallelAllowed)
->setBatchingAllowed($isBatchingAllowed)
->setProcessAsSoonAsPossible(filter_var($batchAfterFetch, FILTER_VALIDATE_BOOLEAN))
->setBatchSize((int) $input->getOption('batch-size'))
->setMaxRunningProcessQueueSize((int) $input->getOption('max-concurrency'))
->setBatchSize((int) $input->getOption('batch-size')) /** @phpstan-ignore-line Cannot cast mixed to int */
->setFromPage((int) $input->getOption('from-page')) /** @phpstan-ignore-line Cannot cast mixed to int */
->setMaxRunningProcessQueueSize((int) $input->getOption('max-concurrency')) /** @phpstan-ignore-line Cannot cast mixed to int */
->setFilters((array) ($input->getOption('filter') ?: []))
->setHandler($input->getOption('handler') ?? $context->getHandler())
;
Expand Down
5 changes: 2 additions & 3 deletions src/Handler/Task/SymfonyMessengerTaskHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

use Akeneo\Pim\ApiClient\Pagination\Page;
use Akeneo\Pim\ApiClient\Pagination\PageInterface;
use Akeneo\Pim\ApiClient\Pagination\ResourceCursorInterface;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\MessageBusInterface;
Expand Down Expand Up @@ -37,7 +36,7 @@ public function batch(PipelinePayloadInterface $pipelinePayload, array $items):

public function handle(
PipelinePayloadInterface $pipelinePayload,
ResourceCursorInterface|PageInterface $handleType,
iterable|PageInterface $handleType,
): void {
$count = 0;
$items = [];
Expand Down Expand Up @@ -83,7 +82,7 @@ private function handleByPage(

private function handleByCursor(
PipelinePayloadInterface $payload,
ResourceCursorInterface $resourceCursor,
iterable $resourceCursor,
int &$count = 0,
array &$items = [],
): void {
Expand Down
5 changes: 2 additions & 3 deletions src/Handler/Task/SymfonyProcessTaskHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

use Akeneo\Pim\ApiClient\Pagination\Page;
use Akeneo\Pim\ApiClient\Pagination\PageInterface;
use Akeneo\Pim\ApiClient\Pagination\ResourceCursorInterface;
use Doctrine\DBAL\ParameterType;
use Doctrine\DBAL\Result;
use Doctrine\DBAL\Statement;
Expand Down Expand Up @@ -114,7 +113,7 @@ public function continue(PipelinePayloadInterface $pipelinePayload): void
*/
public function handle(
PipelinePayloadInterface $pipelinePayload,
ResourceCursorInterface|PageInterface $handleType,
iterable|PageInterface $handleType,
): void {
$this->processManager->setInstantProcessing($pipelinePayload->getProcessAsSoonAsPossible());
$this->processManager->setNumberOfParallelProcesses($pipelinePayload->getMaxRunningProcessQueueSize());
Expand Down Expand Up @@ -197,7 +196,7 @@ private function handleByPage(

private function handleByCursor(
PipelinePayloadInterface $payload,
ResourceCursorInterface $resourceCursor,
iterable $resourceCursor,
int &$count = 0,
array &$ids = [],
): void {
Expand Down
3 changes: 1 addition & 2 deletions src/Handler/Task/TaskHandlerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Synolia\SyliusAkeneoPlugin\Handler\Task;

use Akeneo\Pim\ApiClient\Pagination\PageInterface;
use Akeneo\Pim\ApiClient\Pagination\ResourceCursorInterface;
use Symfony\Component\DependencyInjection\Attribute\AutoconfigureTag;
use Synolia\SyliusAkeneoPlugin\Payload\PipelinePayloadInterface;

Expand All @@ -25,7 +24,7 @@ public function batch(

public function handle(
PipelinePayloadInterface $pipelinePayload,
ResourceCursorInterface|PageInterface $handleType,
iterable|PageInterface $handleType,
): void;

public function continue(PipelinePayloadInterface $pipelinePayload): void;
Expand Down
12 changes: 12 additions & 0 deletions src/Message/Batch/CategoryBatchMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Synolia\SyliusAkeneoPlugin\Message\Batch;

class CategoryBatchMessage implements BatchMessageInterface
{
public function __construct(public array $items)
{
}
}
35 changes: 35 additions & 0 deletions src/MessageHandler/Batch/CategoryBatchMessageHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

declare(strict_types=1);

namespace Synolia\SyliusAkeneoPlugin\MessageHandler\Batch;

use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
use Synolia\SyliusAkeneoPlugin\Message\Batch\CategoryBatchMessage;
use Synolia\SyliusAkeneoPlugin\Processor\Resource\Category\CategoryResourceProcessor;
use Synolia\SyliusAkeneoPlugin\Processor\Resource\Exception\MaxResourceProcessorRetryException;

#[AsMessageHandler]
class CategoryBatchMessageHandler
{
public function __construct(
private EventDispatcherInterface $dispatcher,
private CategoryResourceProcessor $resourceProcessor,
) {
}

public function __invoke(CategoryBatchMessage $attributeBatchMessage): void
{
foreach ($attributeBatchMessage->items as $resource) {
try {
$this->resourceProcessor->process($resource);
} catch (MaxResourceProcessorRetryException) {
// Skip the failing line
$this->dispatcher->dispatch(new CategoryBatchMessage([$resource]));

continue;
}
}
}
}
1 change: 1 addition & 0 deletions src/Payload/AbstractPayload.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public function __construct(
$this->isContinue = $commandContext->isContinue();
$this->processAsSoonAsPossible = $commandContext->getProcessAsSoonAsPossible();
$this->handler = $commandContext->getHandler();
$this->fromPage = $commandContext->getFromPage();
}
}

Expand Down
Loading

0 comments on commit 4da43a3

Please sign in to comment.