Follow up after remove-gearman to bring clarity to terms and code (#494)
* WIP: remove gearman

* Refactor QueueWatchCommand, Workflow.php, and WorkflowInterface
and code improvements

* More refactoring
Remove Gearman

* fix cs

* fix tests

* Remove unnecessary files, refactor Workflow.php

* Fix the test

* Remove more Gearman

* Remove Gearman from ci tests, docker, and configs

* update README.md

* add aws-sdk

* Refactor

* Refactor Workflows

* Refactor

* added retry mechanism

* Remove gearman from bin scripts

* running queue:watch in the background in bin/reindex script

* redirect outputs to /dev/null

* Use localstack v2 and restart opensearch container on failure

* Watch the queue before running the import, and refactor the import to request 100 results per API call

* fix cs

* Revert to use iterator

* Revert "Revert to use iterator"

This reverts commit e1f8788.

* Lazy loading items to prevent memory exhaustion

* enable xdebug in docker

* fix a few bugs, and some improvements

* remove queue:watch from bin/reindex

* fix a minor bug

* Add a delay after each indexing

* Fix a bug

* Clean up

* Fix minor bugs and a few improvements

* Dockerise Remove gearman (#478)

* Improve docker (#476)

These changes are primarily to improve the experience of using docker to run the application, though they also start the process of being able to build a docker container for search.

Improvements:

- Docker-related files now live in the top-level directory of the project, so we don't need to specify config files when running docker commands.
- An SQS queue watcher service and a gearman worker service now run in docker compose without dev intervention.
- Added healthchecks for services and proper dependency resolution between services.
- Auto-setup of the queue and KV store indexes.
- The application files are copied into the docker image when it is built.
- Added missing tools to the docker image so that tests can be run in docker (rather than only on the search--ci instance, as at present).

* Create a CI pipeline in github workflows (#477)

Create a CI pipeline running in GitHub workflows that tests and then builds an image.

* Run CI on develop too (#479)

* run CI on develop too

* fix syntax for or

* reenable building on remove-gearman branch

* add missing capabilities for connecting to opensearch

* Add limit option to restrict number of items imported

* Tidy up

* remove superfluous transform

* move most logic up to AbstractWorkflow

* move retry logic up a level to run()

* Unify debugging output into AbstractWorkflow

* remove further unused code

* move most of the constructors to the AbstractWorkflow

* Expand tests (#493)

* rename method, and add extra debug line. Fix test typing.

* remove unused const

* remove unused imports

* add new test to test behaviours now stored in AbstractWorkflowTest

* create a new Indexer component, with ModelIndexers, and a ChangeSet to represent the changes the ModelIndexers want to make

* port across the BlogArticleWorkflow logic to ModelIndexer

* add CollectionIndexer and test. refactor Indexers and Helpers to be more useful and less coupled

* add InterviewIndexer and test

* add LabsPostIndexer and test

* add PodcastEpisodeIndexer and test

* refactor ModelProvider to allow more than one Model definition

* add ResearchArticleIndexer and test

* fix: call the AbstractModelIndexer constructor instead of assuming we know what it does

* add ReviewedPreprintIndexer and test

* remove old Workflows and wire up new Indexer. Various bits of cleanup along the way

* add back in removed tests

---------

Co-authored-by: Saeed Moghadamzadeh <[email protected]>
Co-authored-by: Saeed <[email protected]>
Co-authored-by: Ali Amin <[email protected]>
Co-authored-by: Nathan Lisgo <[email protected]>
Co-authored-by: Nathan Lisgo <[email protected]>
6 people authored Oct 8, 2024
1 parent 33ed7c6 commit 4d490d3
Showing 45 changed files with 1,419 additions and 2,124 deletions.
32 changes: 32 additions & 0 deletions src/Search/Indexer/ChangeSet.php
@@ -0,0 +1,32 @@
<?php

namespace eLife\Search\Indexer;

class ChangeSet
{
    private array $inserts = [];
    private array $deletes = [];

    public function addInsert(string $id, string $json)
    {
        $this->inserts[] = [
            'id' => $id,
            'json' => $json,
        ];
    }

    public function addDelete(string $id)
    {
        $this->deletes[] = $id;
    }

    public function getInserts() : array
    {
        return $this->inserts;
    }

    public function getDeletes() : array
    {
        return $this->deletes;
    }
}
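A minimal usage sketch of `ChangeSet`, assuming document ids of the form `<type>-<id>` as the model indexers in this commit build them. The class body is repeated from the diff above only so that the snippet runs on its own; the ids and JSON payloads are made up for illustration.

```php
<?php

// Copy of the ChangeSet class from this commit, repeated so the sketch is self-contained.
class ChangeSet
{
    private array $inserts = [];
    private array $deletes = [];

    public function addInsert(string $id, string $json)
    {
        $this->inserts[] = ['id' => $id, 'json' => $json];
    }

    public function addDelete(string $id)
    {
        $this->deletes[] = $id;
    }

    public function getInserts() : array
    {
        return $this->inserts;
    }

    public function getDeletes() : array
    {
        return $this->deletes;
    }
}

// Hypothetical ids and payloads, for illustration only.
$changeSet = new ChangeSet();
$changeSet->addInsert('blog-article-123', json_encode(['type' => 'blog-article', 'id' => '123']));
$changeSet->addDelete('blog-article-456');

// A consumer walks the inserts first and the deletes second.
foreach ($changeSet->getInserts() as $insert) {
    echo 'insert '.$insert['id'].PHP_EOL;
}
foreach ($changeSet->getDeletes() as $deleteId) {
    echo 'delete '.$deleteId.PHP_EOL;
}
```

The change set only records intent; nothing touches the index until a consumer such as `Indexer::index()` applies it.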
131 changes: 131 additions & 0 deletions src/Search/Indexer/Indexer.php
@@ -0,0 +1,131 @@
<?php

namespace eLife\Search\Indexer;

use eLife\ApiSdk\Model\HasIdentifier;
use eLife\ApiSdk\Model\Model;
use eLife\Search\Api\Elasticsearch\MappedElasticsearchClient;
use eLife\Search\Api\HasSearchResultValidator;
use eLife\Search\Api\Elasticsearch\Response\IsDocumentResponse;
use Psr\Log\LoggerInterface;
use InvalidArgumentException;
use Throwable;
use Assert\Assertion;
use eLife\Search\Indexer\ModelIndexer\ResearchArticleIndexer;
use eLife\Search\Indexer\ModelIndexer\BlogArticleIndexer;
use eLife\Search\Indexer\ModelIndexer\InterviewIndexer;
use eLife\Search\Indexer\ModelIndexer\ReviewedPreprintIndexer;
use eLife\Search\Indexer\ModelIndexer\LabsPostIndexer;
use eLife\Search\Indexer\ModelIndexer\PodcastEpisodeIndexer;
use eLife\Search\Indexer\ModelIndexer\CollectionIndexer;
use Symfony\Component\Serializer\Serializer;

class Indexer
{
    private $logger;
    private $client;
    private $validator;
    private $modelIndexer;

    public function __construct(
        LoggerInterface $logger,
        MappedElasticsearchClient $client,
        HasSearchResultValidator $validator,
        array $modelIndexer = []
    ) {
        $this->logger = $logger;
        $this->client = $client;
        $this->validator = $validator;
        $this->modelIndexer = $modelIndexer;
    }

    public static function getDefaultModelIndexers(Serializer $serializer, MappedElasticsearchClient $client, $rdsArticles) : array
    {
        return [
            'article' => new ResearchArticleIndexer($serializer, $rdsArticles),
            'blog-article' => new BlogArticleIndexer($serializer),
            'interview' => new InterviewIndexer($serializer),
            'reviewed-preprint' => new ReviewedPreprintIndexer($serializer, $client),
            'labs-post' => new LabsPostIndexer($serializer),
            'podcast-episode' => new PodcastEpisodeIndexer($serializer),
            'collection' => new CollectionIndexer($serializer),

        ];
    }

    public function getModelIndexer($type): ModelIndexer
    {
        if (!isset($this->modelIndexer[$type])) {
            throw new InvalidArgumentException("The {$type} is not valid.");
        }

        return $this->modelIndexer[$type];
    }

    public function index($entity): ChangeSet
    {
        if (!$entity instanceof Model || !$entity instanceof HasIdentifier) {
            throw new InvalidArgumentException('The given Entity is not an '.Model::class.' or '.HasIdentifier::class);
        }
        $modelIndexer = $this->getModelIndexer($entity->getIdentifier()->getType());

        $debugId = '<'.$entity->getIdentifier().'>';

        $this->logger->debug($debugId.' preparing for indexing.');
        $changeSet = $modelIndexer->prepareChangeSet($entity);

        $inserts = $changeSet->getInserts();
        if (count($inserts) === 0) {
            $this->logger->debug($debugId.' skipping indexing');
        }

        foreach ($inserts as $insert) {
            $doc = $insert['json'];
            $docId = $insert['id'];
            $this->logger->debug($debugId.' importing into Elasticsearch.');
            $this->insert($doc, $docId);

            $this->logger->debug($debugId.' post validating.');
            try {
                $this->postValidate($docId);
            } catch (Throwable $e) {
                $this->logger->error($debugId.' rolling back.', [
                    'exception' => $e,
                    // $result is never assigned in index(), so this is always null.
                    'document' => $result ?? null,
                ]);
                $this->client->deleteDocument($docId);

                // We failed.
                throw new \Exception($debugId.' post validate failed.');
            }

            $this->logger->info($debugId.' successfully imported.');
        }

        foreach ($changeSet->getDeletes() as $deleteId) {
            $this->logger->debug('<'.$deleteId.'> removing from index.');
            $this->client->deleteDocument($deleteId);
        }

        return $changeSet;
    }

    public function insert(string $json, string $id)
    {
        // Insert the document.
        $this->client->indexJsonDocument($id, $json);
        return [
            'id' => $id,
        ];
    }

    public function postValidate($id)
    {
        // Post-validation: fetch the document back and check that we got one.
        $document = $this->client->getDocumentById($id);
        Assertion::isInstanceOf($document, IsDocumentResponse::class);
        $result = $document->unwrap();
        // Check that the stored document is a valid search result.
        $this->validator->validateSearchResult($result, true);
    }
}
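The insert, post-validate and roll-back sequence in `Indexer::index()` can be sketched in isolation. This is a simplified illustration, not the commit's implementation: `InMemoryClient`, `indexWithRollback()` and the validator closure are hypothetical stand-ins for `MappedElasticsearchClient` and `HasSearchResultValidator`, and only the three client method names are taken from the diff.

```php
<?php

// Hypothetical in-memory stand-in for the search client; not part of the commit.
final class InMemoryClient
{
    private array $documents = [];

    public function indexJsonDocument(string $id, string $json): void
    {
        $this->documents[$id] = $json;
    }

    public function getDocumentById(string $id): ?string
    {
        return $this->documents[$id] ?? null;
    }

    public function deleteDocument(string $id): void
    {
        unset($this->documents[$id]);
    }
}

// Insert a document, validate what was stored, and delete it again if validation fails.
function indexWithRollback(InMemoryClient $client, string $id, string $json, callable $validate): void
{
    $client->indexJsonDocument($id, $json);

    try {
        $validate($client->getDocumentById($id));
    } catch (Throwable $e) {
        // Roll back so an invalid document does not stay in the index.
        $client->deleteDocument($id);
        throw new RuntimeException('<'.$id.'> post validate failed.', 0, $e);
    }
}

// Hypothetical validator: the stored document must decode to JSON with a "type" key.
$validate = function (?string $stored): void {
    $decoded = json_decode((string) $stored, true);
    if (!is_array($decoded) || !isset($decoded['type'])) {
        throw new InvalidArgumentException('Document has no type.');
    }
};

$client = new InMemoryClient();
indexWithRollback($client, 'interview-1', '{"type":"interview"}', $validate);

try {
    indexWithRollback($client, 'interview-2', '{"title":"no type"}', $validate);
} catch (RuntimeException $e) {
    echo $e->getMessage().PHP_EOL;
}
```

The sketch passes the original exception along as the previous exception. The commit logs the original exception and then throws a plain `\Exception` without chaining it.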
9 changes: 9 additions & 0 deletions src/Search/Indexer/ModelIndexer.php
@@ -0,0 +1,9 @@
<?php
namespace eLife\Search\Indexer;

use eLife\ApiSdk\Model\Model;

interface ModelIndexer
{
    public function prepareChangeSet(Model $model): ChangeSet;
}
27 changes: 27 additions & 0 deletions src/Search/Indexer/ModelIndexer/AbstractModelIndexer.php
@@ -0,0 +1,27 @@
<?php

namespace eLife\Search\Indexer\ModelIndexer;

use eLife\Search\Indexer\ModelIndexer;
use Symfony\Component\Serializer\Serializer;

abstract class AbstractModelIndexer implements ModelIndexer
{
    use Helper\Blocks;
    use Helper\JsonSerializerHelper;
    use Helper\SortDate;

    protected $serializer;

    public function __construct(Serializer $serializer)
    {
        $this->serializer = $serializer;
    }

    protected function getSerializer(): Serializer
    {
        return $this->serializer;
    }

    abstract protected function getSdkClass() : string;
}
41 changes: 41 additions & 0 deletions src/Search/Indexer/ModelIndexer/BlogArticleIndexer.php
@@ -0,0 +1,41 @@
<?php

namespace eLife\Search\Indexer\ModelIndexer;

use eLife\ApiSdk\Model\BlogArticle;
use eLife\ApiSdk\Model\Model;
use eLife\Search\Indexer\ChangeSet;

final class BlogArticleIndexer extends AbstractModelIndexer
{
    protected function getSdkClass(): string
    {
        return BlogArticle::class;
    }

    /**
     * @param BlogArticle $blogArticle
     * @return ChangeSet
     */
    public function prepareChangeSet(Model $blogArticle) : ChangeSet
    {
        $changeSet = new ChangeSet();

        // Normalized fields.
        $blogArticleObject = json_decode($this->serialize($blogArticle));
        $blogArticleObject->type = 'blog-article';
        $blogArticleObject->body = $this->flattenBlocks($blogArticleObject->content ?? []);
        unset($blogArticleObject->content);
        $blogArticleObject->snippet = ['format' => 'json', 'value' => json_encode($this->snippet($blogArticle))];
        $this->addSortDate($blogArticleObject, $blogArticle->getPublishedDate());

        $changeSet->addInsert(
            $blogArticleObject->type.'-'.$blogArticle->getId(),
            json_encode($blogArticleObject)
        );

        return $changeSet;
    }
}
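The steps in `prepareChangeSet()` (decode the serialized model, set the type, flatten the content blocks into a body, add a sort date, build the document id) can be sketched without the SDK. This is an illustration under assumptions: `flattenText()` and `prepareDocument()` are hypothetical helpers, the flattening is much simpler than the `Blocks` trait, the snippet field is left out, and the sample JSON is invented.

```php
<?php

// Simplified stand-in for flattenBlocks(): joins the "text" of each block.
// The real helper in this commit reads more fields than this.
function flattenText(array $blocks): string
{
    $texts = array_map(function (stdClass $block) {
        return $block->text ?? null;
    }, $blocks);

    return implode(' ', array_filter($texts));
}

// Build the index id and JSON document for one item, following the steps in prepareChangeSet().
function prepareDocument(string $type, string $serializedJson, ?DateTimeImmutable $published): array
{
    $object = json_decode($serializedJson);
    $object->type = $type;
    $object->body = flattenText($object->content ?? []);
    unset($object->content);
    if ($published) {
        // Same date format as the SortDate helper in this commit.
        $object->sortDate = $published->format('Y-m-d\TH:i:s\Z');
    }

    return [
        'id' => $type.'-'.$object->id,
        'json' => json_encode($object),
    ];
}

// Hypothetical serialized blog article, for illustration only.
$json = '{"id":"359325","title":"Example","content":[{"type":"paragraph","text":"Hello"},{"type":"paragraph","text":"world"}]}';
$doc = prepareDocument('blog-article', $json, new DateTimeImmutable('2024-10-08T12:00:00Z'));

echo $doc['id'].PHP_EOL;
echo $doc['json'].PHP_EOL;
```

The returned pair has the same shape as the arguments to `ChangeSet::addInsert()`.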
38 changes: 38 additions & 0 deletions src/Search/Indexer/ModelIndexer/CollectionIndexer.php
@@ -0,0 +1,38 @@
<?php

namespace eLife\Search\Indexer\ModelIndexer;

use eLife\ApiSdk\Model\Collection;
use eLife\ApiSdk\Model\Model;
use eLife\Search\Indexer\ChangeSet;

final class CollectionIndexer extends AbstractModelIndexer
{
    protected function getSdkClass(): string
    {
        return Collection::class;
    }

    /**
     * @param Collection $collection
     * @return ChangeSet
     */
    public function prepareChangeSet(Model $collection) : ChangeSet
    {
        $changeSet = new ChangeSet();

        // Normalized fields.
        $collectionObject = json_decode($this->serialize($collection));
        $collectionObject->type = 'collection';
        $collectionObject->summary = $this->flattenBlocks($collectionObject->summary ?? []);
        $collectionObject->snippet = ['format' => 'json', 'value' => json_encode($this->snippet($collection))];
        $this->addSortDate($collectionObject, $collection->getPublishedDate());

        $changeSet->addInsert(
            $collectionObject->type.'-'.$collection->getId(),
            json_encode($collectionObject),
        );

        return $changeSet;
    }
}
@@ -1,17 +1,17 @@
 <?php

-namespace eLife\Search\Workflow;
+namespace eLife\Search\Indexer\ModelIndexer\Helper;

 use stdClass;

 trait Blocks
 {
-    final private function flattenBlocks(array $blocks) : string
+    protected function flattenBlocks(array $blocks) : string
     {
         return implode(' ', array_filter(array_map([$this, 'flattenBlock'], $blocks)));
     }

-    final private function flattenBlock(stdClass $block) : string
+    protected function flattenBlock(stdClass $block) : string
     {
         return implode(' ', array_filter([
             $block->id ?? null,
@@ -27,7 +27,7 @@ final private function flattenBlock(stdClass $block) : string
         ]));
     }

-    final private function flattenItems(array $items) : string
+    protected function flattenItems(array $items) : string
     {
         return implode(' ', array_map(function ($item) {
             if (is_string($item)) {
48 changes: 48 additions & 0 deletions src/Search/Indexer/ModelIndexer/Helper/JsonSerializerHelper.php
@@ -0,0 +1,48 @@
<?php

namespace eLife\Search\Indexer\ModelIndexer\Helper;

use Symfony\Component\Serializer\Serializer;

trait JsonSerializerHelper
{
    private static $cache = [];

    abstract protected function getSdkClass() : string;
    abstract protected function getSerializer() : Serializer;

    protected function deserialize(string $json)
    {
        return $this->getSerializer()->deserialize($json, $this->getSdkClass(), 'json');

        // Unreachable while the early return above is in place.
        //todo: the following code causes a conflict when reading from $cache
        $key = sha1($json);
        if (!isset(self::$cache[$key])) {
            self::$cache[$key] = $this->getSerializer()->deserialize($json, $this->getSdkClass(), 'json');
        }

        return self::$cache[$key];
    }

    protected function serialize($item) : string
    {
        return $this->getSerializer()->serialize($item, 'json');

        // Unreachable while the early return above is in place.
        //todo: the following code causes a conflict when reading from $cache
        $key = spl_object_hash($item);
        if (!isset(self::$cache[$key])) {
            self::$cache[$key] = $this->getSerializer()->serialize($item, 'json');
        }

        return self::$cache[$key];
    }

    protected function snippet($item) : array
    {
        return $this->getSerializer()->normalize(
            $item,
            null,
            ['snippet' => true, 'type' => true]
        );
    }
}
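The TODO in this trait says the cache causes a conflict on read, but the commit does not say what the conflict is. Two properties of the disabled code are worth noting: `serialize()` and `deserialize()` write into the same static array, and `spl_object_hash()` values may be reused once an object has been destroyed. Assuming either of these is involved (which is a guess, not something the commit confirms), one possible shape for a safer cache is sketched below. `TwoWayCache` and its methods are hypothetical, and `WeakMap` requires PHP 8.0 or later.

```php
<?php

// Hypothetical cache that keeps serialized and deserialized values apart.
// Requires PHP 8.0+ for WeakMap.
final class TwoWayCache
{
    private array $deserialized = [];
    private WeakMap $serialized;

    public function __construct()
    {
        $this->serialized = new WeakMap();
    }

    // Cache decoded values per target class and JSON content.
    public function deserialize(string $json, string $class, callable $decode)
    {
        $key = $class.':'.sha1($json);
        if (!array_key_exists($key, $this->deserialized)) {
            $this->deserialized[$key] = $decode($json, $class);
        }

        return $this->deserialized[$key];
    }

    // Cache encoded strings per object; an entry disappears when its object is destroyed.
    public function serialize(object $item, callable $encode): string
    {
        if (!isset($this->serialized[$item])) {
            $this->serialized[$item] = $encode($item);
        }

        return $this->serialized[$item];
    }
}

// Count how often the (stand-in) encoder actually runs.
$calls = 0;
$encode = function (object $item) use (&$calls): string {
    ++$calls;

    return json_encode($item);
};

$cache = new TwoWayCache();
$item = (object) ['id' => '1'];
$first = $cache->serialize($item, $encode);
$second = $cache->serialize($item, $encode);

echo $first.PHP_EOL;
```

Keying by the object itself avoids relying on hash strings, and the separate array means a decoded model can never be returned where a JSON string is expected.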
@@ -1,12 +1,12 @@
 <?php

-namespace eLife\Search\Workflow;
+namespace eLife\Search\Indexer\ModelIndexer\Helper;

 use DateTimeImmutable;

 trait SortDate
 {
-    public function addSortDate($object, DateTimeImmutable $date = null)
+    protected function addSortDate($object, DateTimeImmutable $date = null)
     {
         if ($date) {
             $object->sortDate = $date->format('Y-m-d\TH:i:s\Z');