Skip to content

Commit

Permalink
fix: wait for all jobs to be completed before running AfterImportJob (S…
Browse files Browse the repository at this point in the history
…partnerNL#3992)

* fix: wait for all jobs to be completed before running AfterImportJob SpartnerNL#3560

* Fix l7

* Fix l7

* Rename delayCleanup to cleanupInterval

* Use interval correctly

* Remove job from cache if it fails as well

* Fix cleanup logic

---------

Co-authored-by: Patrick Brouwers <[email protected]>
  • Loading branch information
Tofandel and patrickbrouwers authored Oct 26, 2023
1 parent 3bed775 commit 6f0da5d
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 17 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ All notable changes to this project will be documented in this file.
## [Unreleased]

### Fixed
- Bug preventing WithChunkReading from working with multiple sheets when using ToCollection or ToArray
- Bug preventing WithChunkReading from working with multiple sheets when using ToCollection or ToArray
- Bug that could delete the import file before all the jobs had finished when using WithChunkReading and ShouldQueueWithoutChain
- Fixed issue where isEmptyWhen was not being called when using OnEachRow


## [3.1.47] - 2023-02-16

- Support Laravel 10
Expand Down
13 changes: 7 additions & 6 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" backupGlobals="false" bootstrap="vendor/autoload.php" colors="true" processIsolation="false" stopOnFailure="false" xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/10.0/phpunit.xsd" cacheDirectory=".phpunit.cache" backupStaticProperties="false">
<coverage>
<include>
<directory suffix=".php">./src</directory>
</include>
</coverage>
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" backupGlobals="false" bootstrap="vendor/autoload.php" colors="true" processIsolation="false" stopOnFailure="false" xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/10.3/phpunit.xsd" cacheDirectory=".phpunit.cache" backupStaticProperties="false">
<coverage/>
<php>
<env name="APP_KEY" value="base64:6igsHe3RYC88h3Wje3VzSNqPwUr7Z5ru+NZw/9qwY5M="/>
<env name="DB_HOST" value="127.0.0.1"/>
Expand All @@ -18,4 +14,9 @@
<directory suffix="Test.php">./tests/</directory>
</testsuite>
</testsuites>
<source>
<include>
<directory suffix=".php">./src</directory>
</include>
</source>
</phpunit>
6 changes: 4 additions & 2 deletions src/ChunkReader.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public function __construct(Container $container)
* @param WithChunkReading $import
* @param Reader $reader
* @param TemporaryFile $temporaryFile
* @return \Illuminate\Foundation\Bus\PendingDispatch|null
* @return PendingDispatch|Collection|null
*/
public function read(WithChunkReading $import, Reader $reader, TemporaryFile $temporaryFile)
{
Expand All @@ -50,7 +50,7 @@ public function read(WithChunkReading $import, Reader $reader, TemporaryFile $te
$totalRows = $reader->getTotalRows();
$worksheets = $reader->getWorksheets($import);
$queue = property_exists($import, 'queue') ? $import->queue : null;
$delayCleanup = property_exists($import, 'delayCleanup') ? $import->delayCleanup : 600;
$delayCleanup = property_exists($import, 'cleanupInterval') ? $import->cleanupInterval : 60;

if ($import instanceof WithProgressBar) {
$import->getConsoleOutput()->progressStart(array_sum($totalRows));
Expand Down Expand Up @@ -84,6 +84,8 @@ public function read(WithChunkReading $import, Reader $reader, TemporaryFile $te
$afterImportJob = new AfterImportJob($import, $reader);

if ($import instanceof ShouldQueueWithoutChain) {
$afterImportJob->setInterval($delayCleanup);
$afterImportJob->setDependencies($jobs);
$jobs->push($afterImportJob->delay($delayCleanup));

return $jobs->each(function ($job) use ($queue) {
Expand Down
33 changes: 32 additions & 1 deletion src/Jobs/AfterImportJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Support\Collection;
use Maatwebsite\Excel\Concerns\WithEvents;
use Maatwebsite\Excel\Events\ImportFailed;
use Maatwebsite\Excel\HasEventBus;
Expand All @@ -12,7 +14,7 @@

class AfterImportJob implements ShouldQueue
{
use Queueable, HasEventBus;
use HasEventBus, InteractsWithQueue, Queueable;

/**
* @var WithEvents
Expand All @@ -24,6 +26,13 @@ class AfterImportJob implements ShouldQueue
*/
private $reader;

/**
* @var iterable
*/
private $dependencyIds = [];

private $interval = 60;

/**
* @param object $import
* @param Reader $reader
Expand All @@ -34,8 +43,30 @@ public function __construct($import, Reader $reader)
$this->reader = $reader;
}

public function setInterval(int $interval)
{
$this->interval = $interval;
}

public function setDependencies(Collection $jobs)
{
$this->dependencyIds = $jobs->map(function (ReadChunk $job) {
return $job->getUniqueId();
})->all();
}

public function handle()
{
foreach ($this->dependencyIds as $id) {
if (!ReadChunk::isComplete($id)) {
// Until there is no jobs left to run we put this job back into the queue every minute
// Note: this will do nothing in a SyncQueue but that's desired, because in a SyncQueue jobs run in order
$this->release($this->interval);

return;
}
}

if ($this->import instanceof ShouldQueue && $this->import instanceof WithEvents) {
$this->reader->clearListeners();
$this->reader->registerListeners($this->import->registerEvents());
Expand Down
33 changes: 28 additions & 5 deletions src/Jobs/ReadChunk.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Support\Facades\Cache;
use Maatwebsite\Excel\Concerns\WithChunkReading;
use Maatwebsite\Excel\Concerns\WithCustomValueBinder;
use Maatwebsite\Excel\Concerns\WithEvents;
Expand Down Expand Up @@ -89,6 +90,11 @@ class ReadChunk implements ShouldQueue
*/
private $chunkSize;

/**
* @var string
*/
private $uniqueId;

/**
* @param WithChunkReading $import
* @param IReader $reader
Expand All @@ -115,6 +121,21 @@ public function __construct(WithChunkReading $import, IReader $reader, Temporary
$this->queue = property_exists($import, 'queue') ? $import->queue : null;
}

public function getUniqueId(): string
{
if (!isset($this->uniqueId)) {
$this->uniqueId = uniqid();
Cache::set('laravel-excel/read-chunk/' . $this->uniqueId, true);
}

return $this->uniqueId;
}

public static function isComplete(string $id): bool
{
return !Cache::has('laravel-excel/read-chunk/' . $id);
}

/**
* Get the middleware the job should be dispatched through.
*
Expand Down Expand Up @@ -202,9 +223,7 @@ public function handle(TransactionHandler $transaction)
*/
public function failed(Throwable $e)
{
if ($this->temporaryFile instanceof RemoteTemporaryFile) {
$this->temporaryFile->deleteLocalCopy();
}
$this->cleanUpTempFile(true);

if ($this->import instanceof WithEvents) {
$this->registerListeners($this->import->registerEvents());
Expand All @@ -216,9 +235,13 @@ public function failed(Throwable $e)
}
}

private function cleanUpTempFile()
private function cleanUpTempFile(bool $force = false): bool
{
if (!config('excel.temporary_files.force_resync_remote')) {
if (!empty($this->uniqueId)) {
Cache::delete('laravel-excel/read-chunk/' . $this->uniqueId);
}

if (!$force && !config('excel.temporary_files.force_resync_remote')) {
return true;
}

Expand Down
2 changes: 1 addition & 1 deletion src/Reader.php
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public function read($import, $filePath, string $readerType = null, string $disk
}

try {
$this->loadSpreadsheet($import, $this->reader);
$this->loadSpreadsheet($import);

($this->transaction)(function () use ($import) {
$sheetsToDisconnect = [];
Expand Down
2 changes: 1 addition & 1 deletion src/Writer.php
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public function write($export, TemporaryFile $temporaryFile, string $writerType)
);

$writer->save(
$path = $temporaryFile->getLocalPath()
$temporaryFile->getLocalPath()
);

if ($temporaryFile instanceof RemoteTemporaryFile) {
Expand Down
51 changes: 51 additions & 0 deletions tests/Concerns/ShouldQueueWithoutChainTest.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
<?php

use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\SyncQueue;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Queue;
use Maatwebsite\Excel\Jobs\AfterImportJob;
use Maatwebsite\Excel\Jobs\QueueImport;
Expand Down Expand Up @@ -67,4 +70,52 @@ public function a_queue_name_can_be_specified_when_importing()
Queue::assertPushedOn('queue-name', ReadChunk::class);
Queue::assertPushedOn('queue-name', AfterImportJob::class);
}

/**
* @test
*/
public function the_cleanup_only_runs_when_all_jobs_are_done()
{
$fake = Queue::fake();

if (method_exists($fake, 'serializeAndRestore')) {
$fake->serializeAndRestore(); // More realism
}

$import = new QueueImportWithoutJobChaining();

$import->import('import-users.xlsx');

$jobs = Queue::pushedJobs();
$chunks = collect($jobs[ReadChunk::class])->pluck('job');
$chunks->each(function (ReadChunk $chunk) {
self::assertFalse(ReadChunk::isComplete($chunk->getUniqueId()));
});
self::assertCount(2, $chunks);
$afterImport = $jobs[AfterImportJob::class][0]['job'];

if (!method_exists($fake, 'except')) {
/** @var SyncQueue $queue */
$fake = app(SyncQueue::class);
$fake->setContainer(app());
} else {
$fake->except([AfterImportJob::class, ReadChunk::class]);
}
$fake->push($chunks->first());
self::assertTrue(ReadChunk::isComplete($chunks->first()->getUniqueId()));
self::assertFalse(ReadChunk::isComplete($chunks->last()->getUniqueId()));

Event::listen(JobProcessed::class, function (JobProcessed $event) {
self::assertTrue($event->job->isReleased());
});
$fake->push($afterImport);
Event::forget(JobProcessed::class);
$fake->push($chunks->last());

Event::listen(JobProcessed::class, function (JobProcessed $event) {
self::assertFalse($event->job->isReleased());
});
$fake->push($afterImport);
Event::forget(JobProcessed::class);
}
}

0 comments on commit 6f0da5d

Please sign in to comment.