Consumer / PipelineRunner
The PipelineRunner is a Magento consumer. It connects the pipelines to the configured message queue.
Several pipeline runners (consumers) can be active at the same time.
Pipeline runners (consumers) follow the Magento standard for consumers and are started with the standard command:
bin/magento queue:consumers:start [--max-messages=<value>] [--batch-size=<value>] [--single-thread] [--area-code=<value>] <consumer_name>
We recommend keeping several pipeline runners active, each started with --max-messages set to at least 1.
Once a runner has processed that number of messages, it terminates automatically.
Magento's cron consumer runner can start multiple runners and ensures on the application side that all of them are up and running.
Alternatively, a system supervisor or the TechDivision module Supervisor Consumer can keep the number of running consumers constant.
After a pipeline runner (consumer) terminates automatically, the configured number of running consumers is restored.
bin/magento queue:consumers:start pipeline.runner.db --max-messages=1
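The cron-based approach mentioned above is configured in Magento's app/etc/env.php. The following excerpt is a minimal sketch and not taken from the module itself: it assumes the standard cron_consumers_runner section of Magento, the pipeline.runner.db consumer, and a max_messages value of 1 to match the command above. Adjust the consumer names and limits to your setup.

```php
<?php
// app/etc/env.php (excerpt). Merge this section into the existing array
// instead of replacing the whole file.
return [
    'cron_consumers_runner' => [
        // Let the Magento cron job start the listed consumers.
        'cron_run' => true,
        // Each consumer terminates after this many messages (assumed value).
        'max_messages' => 1,
        // Consumers to keep running; the name is taken from the consumer configuration.
        'consumers' => [
            'pipeline.runner.db',
        ],
    ],
];
```

With this in place, the cron job starts the consumer again on its next run after the consumer has terminated.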
Based on the binding already described for the heartbeat, messages published to the topic pipeline.step.process are passed by the message queue to the queue pipeline_process_steps for processing:
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="techdivision-process-pipelines" type="topic" connection="amqp">
        <binding id="TechDivisionProcessPipelinesBinding" topic="pipeline.step.process" destinationType="queue" destination="pipeline_process_steps"/>
    </exchange>
    <exchange name="techdivision-process-pipelines-db" type="topic" connection="db">
        <binding id="TechDivisionProcessPipelinesBindingDb" topic="pipeline.step.process" destinationType="queue" destination="pipeline_process_steps"/>
    </exchange>
</config>
While a consumer is running, the message queue passes the messages to the consumers one after another so that the consumers process the data.
The pipeline runners (consumers) for the database and AMQP connections are assigned to the following processing destination:
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <!-- Deprecated -->
    <consumer name="pipelineRunner" queue="pipeline_process_steps" connection="amqp" handler="TechDivision\ProcessPipelines\Model\Runner::process"/>
    <consumer name="pipeline.runner.amqp" queue="pipeline_process_steps" connection="amqp" handler="TechDivision\ProcessPipelines\Model\Runner::process"/>
    <consumer name="pipeline.runner.db" queue="pipeline_process_steps" connection="db" handler="TechDivision\ProcessPipelines\Model\Runner::process"/>
</config>
Runner implementation
/**
 * Process the given message.
 *
 * @param string $messageBody
 * @return void
 * @throws Exception
 */
public function process($messageBody): void
$validExecutableStatus = [StepInterface::STATUS_ENQUEUED, StepInterface::STATUS_ERROR];
$initMessage = sprintf('Initialize message execution: "%s"', $messageBody);
$step = null;
$executor = null;
try {
// Return if the step was not found
try {
$step = $this->loadStep($messageBody);
} catch (NoSuchEntityException $e) {
'Step (ID: %s) does not exist any more',
// only call executor if status is enqueued or error
if (!in_array($step->getStatus(), $validExecutableStatus)) {
$pipelineId = $step->getPipelineId();
sprintf('Update expire-at for pipeline %s based by before execute step %s', $pipelineId, $step->getId())
$executorType = $step->getExecutorType();
/** @var ExecutorInterface $executor */
$executor = $this->objectManager->create($executorType);
$startedMessage = sprintf(
$this->addAttempt($step); //saves the step
try {
$statusToSet = $step->getStatus() === StepInterface::STATUS_SKIPPED ?
StepInterface::STATUS_SKIPPED : StepInterface::STATUS_SUCCESS;
$logMessage = sprintf(
} catch (ExecutorException $execExp) {
$logMessage = $execExp->getMessage();
if ($execExp->getCode() === ExecutorInterface::NOT_FOUND_CODE) {
$statusToSet = StepInterface::STATUS_SKIPPED;
} elseif ($execExp->getCode() === ExecutorInterface::INVALID_DATA_CODE) {
$statusToSet = StepInterface::STATUS_SUCCESS;
} else {
throw $execExp;
$this->setStepInfos($step, $executor, $statusToSet, $logMessage);
$this->handleStepLog($step, $executor);
$this->handleExpireInAfterExecution((int)$pipelineId, $step);
} catch (\Throwable $exception) {
if (isset($step)) {
$this->handleStepLog($step, $executor);
$this->handleException($step, $exception);
} else {
The pipeline step is loaded based on the message from the message queue, and the executor implementation stored for the step is instantiated and executed.
The expiration date of the pipeline is set dynamically based on the expireIn information of the pipeline itself and its steps.
The status of the step is set based on the executor's result. Errors are logged through the exception handling.
If the mini-heartbeat is active, the next step of the pipeline is triggered directly.
The component already contains abstract executors and implemented executor models that serve as examples and are used in the standard pipelines (e.g., Clean-Up).
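The way the runner derives the step status from the executor's result can be sketched as a standalone function. This is an illustration only, not the module's code: resolveStepStatus and the constant values are hypothetical stand-ins for the StepInterface and ExecutorInterface constants, whose real values may differ.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-ins for StepInterface::STATUS_* and ExecutorInterface::*_CODE.
const STATUS_SUCCESS = 'success';
const STATUS_SKIPPED = 'skipped';
const NOT_FOUND_CODE = 404;
const INVALID_DATA_CODE = 422;

/**
 * Decide which status a step receives after its executor has run.
 *
 * @param string          $currentStatus Status of the step after execution
 * @param \Exception|null $error         Executor exception, or null on a clean run
 * @return string
 * @throws \Exception Any executor error that has no dedicated status mapping
 */
function resolveStepStatus(string $currentStatus, ?\Exception $error = null): string
{
    if ($error === null) {
        // A step that marked itself as skipped stays skipped; otherwise it succeeded.
        return $currentStatus === STATUS_SKIPPED ? STATUS_SKIPPED : STATUS_SUCCESS;
    }
    if ($error->getCode() === NOT_FOUND_CODE) {
        // Nothing to process: the step counts as skipped.
        return STATUS_SKIPPED;
    }
    if ($error->getCode() === INVALID_DATA_CODE) {
        // Invalid data is logged, but the step still counts as successful.
        return STATUS_SUCCESS;
    }
    // Every other error is rethrown and left to the outer exception handling.
    throw $error;
}

// Usage: a clean run, then a "not found" executor error.
echo resolveStepStatus('enqueued'), PHP_EOL;
echo resolveStepStatus('enqueued', new \RuntimeException('not found', NOT_FOUND_CODE)), PHP_EOL;
```

Keeping the mapping in one place makes it easier to see that only unexpected executor errors reach the outer catch block, where the step log and the exception are handled.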