Skip to content

Commit

Permalink
WIP: Implementing subprocesses
Browse files Browse the repository at this point in the history
  • Loading branch information
christianbader committed May 6, 2024
1 parent 01f15c2 commit 45ed0a7
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 36 deletions.
183 changes: 183 additions & 0 deletions src/Command/DoPopulateIndex.php
@@ -0,0 +1,183 @@
<?php

declare(strict_types=1);

namespace Valantic\ElasticaBridgeBundle\Command;

use Elastica\Index as ElasticaIndex;
use Pimcore\Model\Element\AbstractElement;
use Symfony\Component\Console\Helper\ProgressBar;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Valantic\ElasticaBridgeBundle\Exception\Command\DocumentFailedException;
use Valantic\ElasticaBridgeBundle\Exception\Command\IndexingFailedException;
use Valantic\ElasticaBridgeBundle\Index\IndexInterface;
use Valantic\ElasticaBridgeBundle\Repository\ConfigurationRepository;
use Valantic\ElasticaBridgeBundle\Repository\DocumentRepository;
use Valantic\ElasticaBridgeBundle\Repository\IndexRepository;
use Valantic\ElasticaBridgeBundle\Service\DocumentHelper;

class DoPopulateIndex extends BaseCommand
{
private const OPTION_CONFIG = 'config';
private const OPTION_INDEX = 'index';
private const OPTION_BATCH_NUMBER = 'batch-number';
private const OPTION_LISTING_COUNT = 'listing-count';
private const OPTION_DOCUMENT = 'document';

public function __construct(
private readonly IndexRepository $indexRepository,
private readonly DocumentRepository $documentRepository,
private readonly DocumentHelper $documentHelper,
private readonly ConfigurationRepository $configurationRepository,
) {
parent::__construct();
}

protected function configure(): void
{
$this->setName(self::COMMAND_NAMESPACE . 'do-populate-index')
->setHidden(true)
->setDescription('[INTERNAL]')
->addOption(self::OPTION_CONFIG, mode: InputOption::VALUE_REQUIRED)
->addOption(self::OPTION_INDEX, mode: InputOption::VALUE_REQUIRED)
->addOption(self::OPTION_BATCH_NUMBER, mode: InputOption::VALUE_REQUIRED)
->addOption(self::OPTION_LISTING_COUNT, mode: InputOption::VALUE_REQUIRED)
->addOption(self::OPTION_DOCUMENT, mode: InputOption::VALUE_REQUIRED)
;
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$indexConfig = $this->getIndex();

if (!$indexConfig instanceof IndexInterface) {
return self::FAILURE;
}

$index = $indexConfig->getBlueGreenInactiveElasticaIndex();
$this->populateIndex($indexConfig, $index);

return self::SUCCESS;
}

private function getIndex(): ?IndexInterface
{
foreach ($this->indexRepository->flattenedAll() as $indexConfig) {
if ($indexConfig->getName() === $this->input->getOption(self::OPTION_CONFIG)) {
return $indexConfig;
}
}

return null;
}

private function populateIndex(IndexInterface $indexConfig, ElasticaIndex $esIndex): void
{
ProgressBar::setFormatDefinition('custom', "%percent%%\t%remaining%\t%memory%\n%message%");

$batchNumber = (int) $this->input->getOption(self::OPTION_BATCH_NUMBER);
$listingCount = (int) $this->input->getOption(self::OPTION_LISTING_COUNT);

$progressBar = new ProgressBar($this->output, $listingCount > 0 ? $listingCount : 1);
$progressBar->setMessage('');
$progressBar->setFormat('custom');

$progressBar->setProgress((int) $batchNumber * $indexConfig->getBatchSize());

Check failure on line 87 in src/Command/DoPopulateIndex.php

View workflow job for this annotation

GitHub Actions / phpstan (PHP 8.1 with Pimcore ^11.0 (prefer-lowest) on ubuntu-latest)

Casting to int something that's already int.

Check failure on line 87 in src/Command/DoPopulateIndex.php

View workflow job for this annotation

GitHub Actions / phpstan (PHP 8.1 with Pimcore ^11.0 (prefer-stable) on ubuntu-latest)

Casting to int something that's already int.

Check failure on line 87 in src/Command/DoPopulateIndex.php

View workflow job for this annotation

GitHub Actions / phpstan (PHP 8.2 with Pimcore ^11.0 (prefer-lowest) on ubuntu-latest)

Casting to int something that's already int.

Check failure on line 87 in src/Command/DoPopulateIndex.php

View workflow job for this annotation

GitHub Actions / phpstan (PHP 8.2 with Pimcore ^11.0 (prefer-stable) on ubuntu-latest)

Casting to int something that's already int.

$allowedDocuments = $indexConfig->getAllowedDocuments();
$document = $this->input->getOption(self::OPTION_DOCUMENT);

foreach ($allowedDocuments as $allowedDocument) {
if ($allowedDocument === $document) {
$allowedDocument = $document;
}
}

$progressBar->setMessage($document);

$documentInstance = $this->documentRepository->get($allowedDocument);

Check failure on line 100 in src/Command/DoPopulateIndex.php

View workflow job for this annotation

GitHub Actions / phpstan (PHP 8.1 with Pimcore ^11.0 (prefer-lowest) on ubuntu-latest)

Variable $allowedDocument might not be defined.

Check failure on line 100 in src/Command/DoPopulateIndex.php

View workflow job for this annotation

GitHub Actions / phpstan (PHP 8.1 with Pimcore ^11.0 (prefer-stable) on ubuntu-latest)

Variable $allowedDocument might not be defined.

Check failure on line 100 in src/Command/DoPopulateIndex.php

View workflow job for this annotation

GitHub Actions / phpstan (PHP 8.2 with Pimcore ^11.0 (prefer-lowest) on ubuntu-latest)

Variable $allowedDocument might not be defined.

Check failure on line 100 in src/Command/DoPopulateIndex.php

View workflow job for this annotation

GitHub Actions / phpstan (PHP 8.2 with Pimcore ^11.0 (prefer-stable) on ubuntu-latest)

Variable $allowedDocument might not be defined.

try {
$listing = $documentInstance->getListingInstance($indexConfig);
$listing->setOffset($batchNumber * $indexConfig->getBatchSize());
$listing->setLimit($indexConfig->getBatchSize());

foreach ($listing->getData() ?? [] as $dataObject) {
try {
$progressBar->advance();

if (!$documentInstance->shouldIndex($dataObject)) {
continue;
}

$esDocuments[] = $this->documentHelper->elementToDocument($documentInstance, $dataObject);

Check failure on line 115 in src/Command/DoPopulateIndex.php

View workflow job for this annotation

GitHub Actions / phpstan (PHP 8.1 with Pimcore ^11.0 (prefer-lowest) on ubuntu-latest)

Implicit array creation is not allowed - variable $esDocuments might not exist.

Check failure on line 115 in src/Command/DoPopulateIndex.php

View workflow job for this annotation

GitHub Actions / phpstan (PHP 8.1 with Pimcore ^11.0 (prefer-stable) on ubuntu-latest)

Implicit array creation is not allowed - variable $esDocuments might not exist.

Check failure on line 115 in src/Command/DoPopulateIndex.php

View workflow job for this annotation

GitHub Actions / phpstan (PHP 8.2 with Pimcore ^11.0 (prefer-lowest) on ubuntu-latest)

Implicit array creation is not allowed - variable $esDocuments might not exist.

Check failure on line 115 in src/Command/DoPopulateIndex.php

View workflow job for this annotation

GitHub Actions / phpstan (PHP 8.2 with Pimcore ^11.0 (prefer-stable) on ubuntu-latest)

Implicit array creation is not allowed - variable $esDocuments might not exist.
} catch (\Throwable $throwable) {
$this->displayDocumentError($indexConfig, $document, $dataObject, $throwable);

if (!$this->configurationRepository->shouldSkipFailingDocuments()) {
throw new DocumentFailedException($throwable);
}
}
}

if (count($esDocuments) > 0) {

Check failure on line 125 in src/Command/DoPopulateIndex.php

View workflow job for this annotation

GitHub Actions / phpstan (PHP 8.1 with Pimcore ^11.0 (prefer-lowest) on ubuntu-latest)

Variable $esDocuments might not be defined.

Check failure on line 125 in src/Command/DoPopulateIndex.php

View workflow job for this annotation

GitHub Actions / phpstan (PHP 8.1 with Pimcore ^11.0 (prefer-stable) on ubuntu-latest)

Variable $esDocuments might not be defined.

Check failure on line 125 in src/Command/DoPopulateIndex.php

View workflow job for this annotation

GitHub Actions / phpstan (PHP 8.2 with Pimcore ^11.0 (prefer-lowest) on ubuntu-latest)

Variable $esDocuments might not be defined.

Check failure on line 125 in src/Command/DoPopulateIndex.php

View workflow job for this annotation

GitHub Actions / phpstan (PHP 8.2 with Pimcore ^11.0 (prefer-stable) on ubuntu-latest)

Variable $esDocuments might not be defined.
$esIndex->addDocuments($esDocuments);
$esDocuments = [];
}

if ($indexConfig->refreshIndexAfterEveryDocumentWhenPopulating()) {
$esIndex->refresh();
}
} catch (\Throwable $throwable) {
$this->displayIndexError($indexConfig, $throwable);

throw new IndexingFailedException($throwable);
} finally {
if (isset($documentInstance)) {
$this->documentHelper->setTenantIfNeeded($documentInstance, $indexConfig);
}
}
}

private function displayDocumentError(
IndexInterface $indexConfig,
string $document,
AbstractElement $dataObject,
\Throwable $throwable,
): void {
$this->output->writeln('');
$this->output->writeln(sprintf(
'<fg=red;options=bold>Error while populating index %s, processing documents of type %s, last processed element ID %s.</>',
$indexConfig::class,
$document,
$dataObject->getId()
));
$this->displayThrowable($throwable);
}

private function displayIndexError(IndexInterface $indexConfig, \Throwable $throwable): void
{
$this->output->writeln('');
$this->output->writeln(sprintf(
'<fg=red;options=bold>Error while populating index %s.</>',
$indexConfig::class,
));

$this->displayThrowable($throwable);
}

private function displayThrowable(\Throwable $throwable): void
{
$this->output->writeln('');
$this->output->writeln(sprintf('In %s line %d', $throwable->getFile(), $throwable->getLine()));
$this->output->writeln('');

$this->output->writeln($throwable->getMessage());
$this->output->writeln('');

$this->output->writeln($throwable->getTraceAsString());
$this->output->writeln('');
}
}
66 changes: 30 additions & 36 deletions src/Command/PopulateIndex.php
Expand Up @@ -9,8 +9,10 @@
use Symfony\Component\Console\Helper\ProgressBar;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\ConsoleOutput;
use Symfony\Component\Console\Output\OutputInterface;
use Valantic\ElasticaBridgeBundle\Exception\Command\DocumentFailedException;
use Symfony\Component\HttpKernel\KernelInterface;
use Symfony\Component\Process\Process;
use Valantic\ElasticaBridgeBundle\Exception\Command\IndexingFailedException;
use Valantic\ElasticaBridgeBundle\Index\IndexInterface;
use Valantic\ElasticaBridgeBundle\Repository\ConfigurationRepository;
Expand All @@ -22,12 +24,14 @@ class PopulateIndex extends BaseCommand
{
private const OPTION_CONFIG = 'config';
private const OPTION_INDEX = 'index';
public static bool $isPopulating = false;

public function __construct(
private readonly IndexRepository $indexRepository,
private readonly DocumentRepository $documentRepository,
private readonly DocumentHelper $documentHelper,
private readonly ConfigurationRepository $configurationRepository,

Check failure on line 33 in src/Command/PopulateIndex.php

View workflow job for this annotation

GitHub Actions / phpstan (PHP 8.1 with Pimcore ^11.0 (prefer-lowest) on ubuntu-latest)

Property Valantic\ElasticaBridgeBundle\Command\PopulateIndex::$configurationRepository is never read, only written.

Check failure on line 33 in src/Command/PopulateIndex.php

View workflow job for this annotation

GitHub Actions / phpstan (PHP 8.1 with Pimcore ^11.0 (prefer-stable) on ubuntu-latest)

Property Valantic\ElasticaBridgeBundle\Command\PopulateIndex::$configurationRepository is never read, only written.

Check failure on line 33 in src/Command/PopulateIndex.php

View workflow job for this annotation

GitHub Actions / phpstan (PHP 8.2 with Pimcore ^11.0 (prefer-lowest) on ubuntu-latest)

Property Valantic\ElasticaBridgeBundle\Command\PopulateIndex::$configurationRepository is never read, only written.

Check failure on line 33 in src/Command/PopulateIndex.php

View workflow job for this annotation

GitHub Actions / phpstan (PHP 8.2 with Pimcore ^11.0 (prefer-stable) on ubuntu-latest)

Property Valantic\ElasticaBridgeBundle\Command\PopulateIndex::$configurationRepository is never read, only written.
private readonly KernelInterface $kernel,
) {
parent::__construct();
}
Expand Down Expand Up @@ -69,53 +73,44 @@ private function getIndex(): ?IndexInterface

private function populateIndex(IndexInterface $indexConfig, ElasticaIndex $esIndex): void
{
ProgressBar::setFormatDefinition('custom', "%percent%%\t%remaining%\t%memory%\n%message%");

$progressBar = new ProgressBar($this->output, 1);
$progressBar->setMessage('');
$progressBar->setFormat('custom');

try {
foreach ($indexConfig->getAllowedDocuments() as $document) {
$progressBar->setProgress(0);
$progressBar->setMessage($document);

$documentInstance = $this->documentRepository->get($document);

$this->documentHelper->setTenantIfNeeded($documentInstance, $indexConfig);

$listingCount = $documentInstance->getListingInstance($indexConfig)->count();
$progressBar->setMaxSteps($listingCount > 0 ? $listingCount : 1);
$esDocuments = [];
$numberOfBatches = ceil($listingCount / $indexConfig->getBatchSize());

for ($batchNumber = 0; $batchNumber < $numberOfBatches; $batchNumber++) {
$listing = $documentInstance->getListingInstance($indexConfig);
$listing->setOffset($batchNumber * $indexConfig->getBatchSize());
$listing->setLimit($indexConfig->getBatchSize());

foreach ($listing->getData() ?? [] as $dataObject) {
try {
$progressBar->advance();

if (!$documentInstance->shouldIndex($dataObject)) {
continue;
}

$esDocuments[] = $this->documentHelper->elementToDocument($documentInstance, $dataObject);
} catch (\Throwable $throwable) {
$this->displayDocumentError($indexConfig, $document, $dataObject, $throwable);

if (!$this->configurationRepository->shouldSkipFailingDocuments()) {
throw new DocumentFailedException($throwable);
}
self::$isPopulating = true;
$process = new Process(
[
'bin/console', self::COMMAND_NAMESPACE . 'do-populate-index',
'--config', $indexConfig->getName(),
'--index', $esIndex->getName(),
'--batch-number', $batchNumber,
'--listing-count', $listingCount,
'--document', $document,
...array_filter([$this->output->isVerbose() ? '-v' : null,
$this->output->isVeryVerbose() ? '-vv' : null,
$this->output->isDebug() ? '-vvv' : null,
]),
],
$this->kernel->getProjectDir(),
timeout: null
);

$process->run(function($type, $buffer): void {
if ($type === Process::ERR && $this->output instanceof ConsoleOutput) {
$this->output->getErrorOutput()->write($buffer);
} else {
$this->output->write($buffer);
}
}

if (count($esDocuments) > 0) {
$esIndex->addDocuments($esDocuments);
$esDocuments = [];
}
});
self::$isPopulating = false;
}

if (count($esDocuments) > 0) {
Expand All @@ -136,7 +131,6 @@ private function populateIndex(IndexInterface $indexConfig, ElasticaIndex $esInd
}
}

$progressBar->finish();
$this->output->writeln('');
}

Expand Down
5 changes: 5 additions & 0 deletions src/Index/AbstractIndex.php
Expand Up @@ -61,6 +61,11 @@ public function getBatchSize(): int
return 5000;
}

public function shouldIndexInSubprocesses(): bool
{
return false;
}

public function getElasticaIndex(): Index
{
return $this->client->getIndex($this->getName());
Expand Down
8 changes: 8 additions & 0 deletions src/Index/IndexInterface.php
Expand Up @@ -31,6 +31,14 @@ public function getName(): string;
*/
public function getBatchSize(): int;

/**
* Defines if the the index should be populated in subprocesses.
* This is useful for large indexes to avoid memory issues.
*
* @return bool
*/
public function shouldIndexInSubprocesses(): bool;

/**
* Defines the mapping to be used for this index.
* Passed 1:1 to Elasticsearch.
Expand Down

0 comments on commit 45ed0a7

Please sign in to comment.