Consumer / PipelineRunner
The PipelineRunner is a Magento consumer. It connects the pipelines to the configured message queue.
-
Several pipeline runners (consumers) can be active
-
Pipeline runners (consumers) are started with the standard Magento consumer command:
bin/magento queue:consumers:start [--max-messages=<value>] [--batch-size=<value>] [--single-thread] [--area-code=<value>] <consumer_name>
We recommend keeping several pipeline runners active, each started with a --max-messages value of at least 1; that number of messages is then processed
-
The consumer terminates automatically once it has processed its messages
-
With cron consumer runners, multiple runners can be started, and the application itself ensures that all of them are up and running
-
With a system supervisor or the TechDivision module Supervisor Consumer, the number of running consumers can be kept constant
-
-
After a pipeline runner (consumer) terminates automatically, the configured number of running consumers is restored
bin/magento queue:consumers:start pipeline.runner.db --max-messages=1
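The cron-based runner mentioned in the list above is configured through the cron_consumers_runner key in app/etc/env.php. The following fragment is a minimal sketch, assuming that only the database runner should be started by cron and that each runner should stop after one message. The chosen values are illustrative, and the fragment has to be merged into the existing env.php array rather than replace it.

```php
<?php
// Sketch of the relevant part of app/etc/env.php (all other keys omitted).
// The values are assumptions for illustration, not recommendations from the module.
return [
    'cron_consumers_runner' => [
        // Let the Magento cron job start the listed consumers.
        'cron_run' => true,
        // Each consumer process stops after this many messages.
        'max_messages' => 1,
        // Only the consumers listed here are started by cron.
        'consumers' => [
            'pipeline.runner.db',
        ],
    ],
];
```

With this setting, a runner that stopped after its message is started again by a later cron run.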
Based on the binding already described for the heartbeat, the published and enqueued topic is passed from the message queue to the processing destination pipeline_process_steps.
vendor/techdivision/process-pipelines/etc/queue_topology.xml
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="techdivision-process-pipelines" type="topic" connection="amqp">
        <binding id="TechDivisionProcessPipelinesBinding" topic="pipeline.step.process" destinationType="queue" destination="pipeline_process_steps"/>
    </exchange>
    <exchange name="techdivision-process-pipelines-db" type="topic" connection="db">
        <binding id="TechDivisionProcessPipelinesBindingDb" topic="pipeline.step.process" destinationType="queue" destination="pipeline_process_steps"/>
    </exchange>
</config>
The pipeline runners (consumers) for the database or the AMQP connection are assigned to the following processing destination:
-
If a consumer is running, the message queue passes the messages to the consumers in sequence, and the consumers process the data
vendor/techdivision/process-pipelines/etc/queue_consumer.xml
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <!-- Deprecated -->
    <consumer name="pipelineRunner" queue="pipeline_process_steps" connection="amqp" handler="TechDivision\ProcessPipelines\Model\Runner::process"/>
    <consumer name="pipeline.runner.amqp" queue="pipeline_process_steps" connection="amqp" handler="TechDivision\ProcessPipelines\Model\Runner::process"/>
    <consumer name="pipeline.runner.db" queue="pipeline_process_steps" connection="db" handler="TechDivision\ProcessPipelines\Model\Runner::process"/>
</config>
Runner implementation
<?php
/**
 * Process the given message.
 *
 * @param string $messageBody
 * @return void
 * @throws Exception
 */
public function process($messageBody): void
{
    $validExecutableStatus = [StepInterface::STATUS_ENQUEUED, StepInterface::STATUS_ERROR];
    $initMessage = sprintf('Initialize message execution: "%s"', $messageBody);
    $this->logger->debug($initMessage);
    $step = null;
    $executor = null;
    try {
        $this->ensureResourceConnection->execute();
        // Return if the step was not found
        try {
            $step = $this->loadStep($messageBody);
        } catch (NoSuchEntityException $e) {
            $this->logger->warning(
                sprintf(
                    'Step (ID: %s) does not exist any more',
                    $this->getStepIdByMessageBody($messageBody)
                )
            );
            return;
        }
        // only call executor if status is enqueued or error
        if (!in_array($step->getStatus(), $validExecutableStatus)) {
            $this->logger->info(
                sprintf(
                    RunnerInterface::RUNNER_STEP_NOT_ENQUEUED_STATUS,
                    $step->getPipelineId(),
                    $step->getId()
                )
            );
            return;
        }
        $step->setStartedAt($this->stepRepository->getDatetime()->gmtDate());
        $step->setStatus(StepInterface::STATUS_RUNNING);
        $pipelineId = $step->getPipelineId();
        $this->setPipelineExpiredByStep->execute(
            $step,
            $this->stepRepository->getDatetime()->gmtDate($step->getStartedAt())
        );
        $this->logger->debug(
            sprintf('Update expire-at for pipeline %s based by before execute step %s', $pipelineId, $step->getId())
        );
        $executorType = $step->getExecutorType();
        /** @var ExecutorInterface $executor */
        $executor = $this->objectManager->create($executorType);
        $startedMessage = sprintf(
            RunnerInterface::RUNNER_STARED_LOG_MESSAGE,
            $step->getPipelineId(),
            $step->getId(),
            $executorType
        );
        $this->logger->info($startedMessage);
        $this->addAttempt($step); //saves the step
        try {
            $executor->process($step);
            $statusToSet = $step->getStatus() === StepInterface::STATUS_SKIPPED ?
                StepInterface::STATUS_SKIPPED : StepInterface::STATUS_SUCCESS;
            $logMessage = sprintf(
                RunnerInterface::RUNNER_SUCCESS_LOG_MESSAGE,
                $step->getPipelineId(),
                $step->getId(),
                $executorType
            );
        } catch (ExecutorException $execExp) {
            $logMessage = $execExp->getMessage();
            if ($execExp->getCode() === ExecutorInterface::NOT_FOUND_CODE) {
                $statusToSet = StepInterface::STATUS_SKIPPED;
            } elseif ($execExp->getCode() === ExecutorInterface::INVALID_DATA_CODE) {
                $statusToSet = StepInterface::STATUS_SUCCESS;
            } else {
                throw $execExp;
            }
        }
        $this->setStepInfos($step, $executor, $statusToSet, $logMessage);
        $this->handleStepLog($step, $executor);
        $this->handleExpireInAfterExecution((int)$pipelineId, $step);
        $this->stepRepository->save($step);
        $this->postExecutionCommand->execute($step);
        $this->ensureResourceConnection->execute();
        $this->miniHeartbeat->process((int)$step->getPipelineId());
    } catch (\Throwable $exception) {
        if (isset($step)) {
            $this->handleStepLog($step, $executor);
            $this->handleException($step, $exception);
        } else {
            $this->logger->critical($exception->getMessage());
        }
    }
}
-
The pipeline step is loaded based on the message from the message queue, and the executor implementation stored for the step is instantiated and executed
-
The expiration date of the pipeline is set dynamically based on the expireIn information of the pipeline itself and its steps
-
The status of the step is set based on the executor's result. Errors are logged through the exception handling
-
If the mini-heartbeat is active, the next step of the pipeline is triggered directly
-
The component already contains abstract executors and implemented executor models that serve as examples and are used in standard pipelines (e.g., Clean-Up).
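The way the runner derives the step status from the executor's result can be isolated into a small function. The following is a simplified sketch of that decision only, not the module's code: the constants, the SketchExecutorException class, and the resolveStepStatus function are hypothetical stand-ins, and the real constant values in StepInterface and ExecutorInterface may differ.

```php
<?php
declare(strict_types=1);

// Stand-in constants (assumed values); the real ones are defined in
// StepInterface and ExecutorInterface of the module.
const STATUS_SUCCESS = 'success';
const STATUS_SKIPPED = 'skipped';
const NOT_FOUND_CODE = 404;
const INVALID_DATA_CODE = 422;

/** Hypothetical stand-in for the module's ExecutorException. */
final class SketchExecutorException extends RuntimeException
{
}

/**
 * Runs the executor callback and returns the status the runner would set.
 *
 * The callback returns the status the step carries after execution, or
 * throws a SketchExecutorException with one of the codes above.
 */
function resolveStepStatus(callable $runExecutor): string
{
    try {
        $statusAfterRun = $runExecutor();
        // A step that marked itself as skipped stays skipped; otherwise it succeeded.
        return $statusAfterRun === STATUS_SKIPPED ? STATUS_SKIPPED : STATUS_SUCCESS;
    } catch (SketchExecutorException $e) {
        if ($e->getCode() === NOT_FOUND_CODE) {
            return STATUS_SKIPPED;
        }
        if ($e->getCode() === INVALID_DATA_CODE) {
            return STATUS_SUCCESS;
        }
        // Any other code is rethrown and treated as an error by the caller.
        throw $e;
    }
}

// Usage: a normal run, and a run that reports "not found".
echo resolveStepStatus(static function (): string {
    return 'running';
}), PHP_EOL; // prints "success"

echo resolveStepStatus(static function (): string {
    throw new SketchExecutorException('Nothing to process', NOT_FOUND_CODE);
}), PHP_EOL; // prints "skipped"
```

In the runner shown above, a rethrown exception ends up in the outer catch block, where the step log and the exception are handled.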