Skip to content

Commit

Permalink
WIP: Implementing subprocesses
Browse files Browse the repository at this point in the history
  • Loading branch information
christianbader committed May 7, 2024
1 parent 01f15c2 commit ddd708c
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 100 deletions.
210 changes: 210 additions & 0 deletions src/Command/DoPopulateIndex.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
<?php

declare(strict_types=1);

namespace Valantic\ElasticaBridgeBundle\Command;

use Elastica\Index as ElasticaIndex;
use Pimcore\Model\Element\AbstractElement;
use Symfony\Component\Console\Helper\ProgressBar;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Valantic\ElasticaBridgeBundle\Document\DocumentInterface;
use Valantic\ElasticaBridgeBundle\Exception\Command\DocumentFailedException;
use Valantic\ElasticaBridgeBundle\Index\IndexInterface;
use Valantic\ElasticaBridgeBundle\Repository\ConfigurationRepository;
use Valantic\ElasticaBridgeBundle\Repository\DocumentRepository;
use Valantic\ElasticaBridgeBundle\Repository\IndexRepository;
use Valantic\ElasticaBridgeBundle\Service\DocumentHelper;

class DoPopulateIndex extends BaseCommand
{
private const OPTION_CONFIG = 'config';
private const OPTION_INDEX = 'index';
private const OPTION_BATCH_NUMBER = 'batch-number';
private const OPTION_LISTING_COUNT = 'listing-count';
private const OPTION_DOCUMENT = 'document';

public function __construct(
private readonly IndexRepository $indexRepository,
private readonly DocumentRepository $documentRepository,
private readonly DocumentHelper $documentHelper,
private readonly ConfigurationRepository $configurationRepository,
) {
parent::__construct();
}

protected function configure(): void
{
$this->setName(self::COMMAND_NAMESPACE . 'do-populate-index')
->setHidden(true)
->setDescription('[INTERNAL]')
->addOption(self::OPTION_CONFIG, mode: InputOption::VALUE_REQUIRED)
->addOption(self::OPTION_INDEX, mode: InputOption::VALUE_REQUIRED)
->addOption(self::OPTION_BATCH_NUMBER, mode: InputOption::VALUE_REQUIRED)
->addOption(self::OPTION_LISTING_COUNT, mode: InputOption::VALUE_REQUIRED)
->addOption(self::OPTION_DOCUMENT, mode: InputOption::VALUE_REQUIRED)
;
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$indexConfig = $this->getIndex();

if (!$indexConfig instanceof IndexInterface) {
return self::FAILURE;
}

$index = $indexConfig->getBlueGreenInactiveElasticaIndex();
$success = $this->populateIndex($indexConfig, $index);

if (!$success) {
return self::FAILURE;
}

return self::SUCCESS;
}

private function getIndex(): ?IndexInterface
{
foreach ($this->indexRepository->flattenedAll() as $indexConfig) {
if ($indexConfig->getName() === $this->input->getOption(self::OPTION_CONFIG)) {
return $indexConfig;
}
}

return null;
}

private function populateIndex(IndexInterface $indexConfig, ElasticaIndex $esIndex): bool
{
ProgressBar::setFormatDefinition('custom', "%percent%%\t%remaining%\t%memory%\n%message%");

$batchNumber = (int) $this->input->getOption(self::OPTION_BATCH_NUMBER);
$listingCount = (int) $this->input->getOption(self::OPTION_LISTING_COUNT);

$progressBar = new ProgressBar($this->output, $listingCount > 0 ? $listingCount : 1);
$progressBar->setMessage('');
$progressBar->setFormat('custom');

$progressBar->setProgress($batchNumber * $indexConfig->getBatchSize());

$allowedDocuments = $indexConfig->getAllowedDocuments();
$document = $this->input->getOption(self::OPTION_DOCUMENT);

if (!in_array($document, $allowedDocuments, true)) {
return false;
}

$progressBar->setMessage($document);

if (!$indexConfig->shouldIndexInSubprocesses()) {
$numberOfBatches = ceil($listingCount / $indexConfig->getBatchSize());

for ($batch = 0; $batch < $numberOfBatches; $batch++) {
$success = $this->doPopulateIndex($esIndex, $indexConfig, $progressBar, $document, $batch);

if (!$success) {
return false;
}
}

return true;
} else {
return $this->doPopulateIndex($esIndex, $indexConfig, $progressBar, $document, $batchNumber);
}
}

private function doPopulateIndex(
ElasticaIndex $esIndex,
IndexInterface $indexConfig,
ProgressBar $progressBar,
string $document,
int $batchNumber,
): bool {
$documentInstance = $this->documentRepository->get($document);

try {
$listing = $documentInstance->getListingInstance($indexConfig);
$listing->setOffset($batchNumber * $indexConfig->getBatchSize());
$listing->setLimit($indexConfig->getBatchSize());

$esDocuments = [];
foreach ($listing->getData() ?? [] as $dataObject) {
try {
$progressBar->advance();

if (!$documentInstance->shouldIndex($dataObject)) {
continue;
}

$esDocuments[] = $this->documentHelper->elementToDocument($documentInstance, $dataObject);
} catch (\Throwable $throwable) {
$this->displayDocumentError($indexConfig, $document, $dataObject, $throwable);

if (!$this->configurationRepository->shouldSkipFailingDocuments()) {
throw new DocumentFailedException($throwable);
}
}
}

if (count($esDocuments) > 0) {
$esIndex->addDocuments($esDocuments);
$esDocuments = [];
}

if ($indexConfig->refreshIndexAfterEveryDocumentWhenPopulating()) {
$esIndex->refresh();
}
} catch (\Throwable $throwable) {
$this->displayIndexError($indexConfig, $throwable);

return false;
} finally {
$this->documentHelper->setTenantIfNeeded($documentInstance, $indexConfig);
}

return true;
}

private function displayDocumentError(
IndexInterface $indexConfig,
string $document,
AbstractElement $dataObject,
\Throwable $throwable,
): void {
$this->output->writeln('');
$this->output->writeln(sprintf(
'<fg=red;options=bold>Error while populating index %s, processing documents of type %s, last processed element ID %s.</>',
$indexConfig::class,
$document,
$dataObject->getId()
));
$this->displayThrowable($throwable);
}

private function displayIndexError(IndexInterface $indexConfig, \Throwable $throwable): void
{
$this->output->writeln('');
$this->output->writeln(sprintf(
'<fg=red;options=bold>Error while populating index %s.</>',
$indexConfig::class,
));

$this->displayThrowable($throwable);
}

private function displayThrowable(\Throwable $throwable): void
{
$this->output->writeln('');
$this->output->writeln(sprintf('In %s line %d', $throwable->getFile(), $throwable->getLine()));
$this->output->writeln('');

$this->output->writeln($throwable->getMessage());
$this->output->writeln('');

$this->output->writeln($throwable->getTraceAsString());
$this->output->writeln('');
}
}

0 comments on commit ddd708c

Please sign in to comment.