Consumer / PipelineRunner

The PipelineRunner is a Magento consumer. It connects the pipelines to the configured message queue.

  • Several pipeline runners (consumers) can be active

  • Pipeline runners (consumers) follow the Magento standard for consumers and are started with the standard command

bin/magento queue:consumers:start [--max-messages=<value>] [--batch-size=<value>] [--single-thread] [--area-code=<value>] <consumer_name>

We recommend keeping several pipeline runners active, each limited to a single message (--max-messages=1), which it processes before exiting

  • The consumer terminates automatically after processing its message

  • With the cron consumer runner it is possible to run multiple runners and to ensure on the application side that all runners are up and running

    • With a system supervisor or the TechDivision module Supervisor Consumer, the number of consumers can be kept constant

  • After a PipelineRunner (consumer) terminates automatically, the configured number of running consumers is restored

bin/magento queue:consumers:start pipeline.runner.db --max-messages=1
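
For the cron-based variant, the following is a minimal sketch of the relevant section of app/etc/env.php. It assumes the database consumer pipeline.runner.db is used; the process count of 3 is an illustrative value, and the multiple_processes key is only available in newer Magento releases, so it should be checked against the installed version.

```php
<?php
// app/etc/env.php (excerpt; all other keys omitted)
return [
    'cron_consumers_runner' => [
        // Let the Magento cron job start the listed consumers.
        'cron_run' => true,
        // Each consumer exits after one message, as recommended above.
        'max_messages' => 1,
        // Only these consumers are started by cron.
        'consumers' => [
            'pipeline.runner.db',
        ],
        // Assumption: number of parallel processes per consumer
        // (illustrative value, requires a Magento version that supports it).
        'multiple_processes' => [
            'pipeline.runner.db' => 3,
        ],
    ],
];
```

With such a setup, cron starts the runners again after they have exited, so no external supervisor is needed for this variant.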

As with the binding already described for the heartbeat, messages published to the topic pipeline.step.process are routed by the message queue to the destination queue pipeline_process_steps, where they are processed.

Exchange registration / Exchange binding vendor/techdivision/process-pipelines/etc/queue_topology.xml
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="techdivision-process-pipelines" type="topic" connection="amqp">
        <binding id="TechDivisionProcessPipelinesBinding" topic="pipeline.step.process" destinationType="queue" destination="pipeline_process_steps"/>
    </exchange>
    <exchange name="techdivision-process-pipelines-db" type="topic" connection="db">
        <binding id="TechDivisionProcessPipelinesBindingDb" topic="pipeline.step.process" destinationType="queue" destination="pipeline_process_steps"/>
    </exchange>
</config>

The pipeline runners (consumers) for the database and AMQP connections are assigned to the
following processing destination:

  • If a consumer is running, the message queue can pass the messages to the consumers in sequence so that the consumers process the data

Connection queue and consumer vendor/techdivision/process-pipelines/etc/queue_consumer.xml
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <!-- Deprecated -->
    <consumer name="pipelineRunner" queue="pipeline_process_steps" connection="amqp" handler="TechDivision\ProcessPipelines\Model\Runner::process"/>

    <consumer name="pipeline.runner.amqp" queue="pipeline_process_steps" connection="amqp" handler="TechDivision\ProcessPipelines\Model\Runner::process"/>
    <consumer name="pipeline.runner.db" queue="pipeline_process_steps" connection="db" handler="TechDivision\ProcessPipelines\Model\Runner::process"/>
</config>

Runner implementation

<?php

/**
 * Process the given message.
 *
 * @param string $messageBody
 * @return void
 * @throws Exception
 */
public function process($messageBody): void
{
    $validExecutableStatus = [StepInterface::STATUS_ENQUEUED, StepInterface::STATUS_ERROR];
    $initMessage = sprintf('Initialize message execution: "%s"', $messageBody);
    $this->logger->debug($initMessage);

    $step = null;
    $executor = null;
    try {
        $this->ensureResourceConnection->execute();
        // Return if the step was not found
        try {
            $step = $this->loadStep($messageBody);
        } catch (NoSuchEntityException $e) {
            $this->logger->warning(
                sprintf(
                    'Step (ID: %s) does not exist any more',
                    $this->getStepIdByMessageBody($messageBody)
                )
            );
            return;
        }
        // only call executor if status is enqueued or error
        if (!in_array($step->getStatus(), $validExecutableStatus)) {
            $this->logger->info(
                sprintf(
                    RunnerInterface::RUNNER_STEP_NOT_ENQUEUED_STATUS,
                    $step->getPipelineId(),
                    $step->getId()
                )
            );
            return;
        }
        $step->setStartedAt($this->stepRepository->getDatetime()->gmtDate());
        $step->setStatus(StepInterface::STATUS_RUNNING);
        $pipelineId = $step->getPipelineId();

        $this->setPipelineExpiredByStep->execute(
            $step,
            $this->stepRepository->getDatetime()->gmtDate($step->getStartedAt())
        );
        $this->logger->debug(
            sprintf('Update expire-at for pipeline %s based by before execute step %s', $pipelineId, $step->getId())
        );

        $executorType = $step->getExecutorType();
        /** @var ExecutorInterface $executor */
        $executor = $this->objectManager->create($executorType);

        $startedMessage = sprintf(
            RunnerInterface::RUNNER_STARED_LOG_MESSAGE,
            $step->getPipelineId(),
            $step->getId(),
            $executorType
        );
        $this->logger->info($startedMessage);

        $this->addAttempt($step); //saves the step
        try {
            $executor->process($step);
            $statusToSet = $step->getStatus() === StepInterface::STATUS_SKIPPED ?
                StepInterface::STATUS_SKIPPED : StepInterface::STATUS_SUCCESS;
            $logMessage = sprintf(
                RunnerInterface::RUNNER_SUCCESS_LOG_MESSAGE,
                $step->getPipelineId(),
                $step->getId(),
                $executorType
            );
        } catch (ExecutorException $execExp) {
            $logMessage = $execExp->getMessage();
            if ($execExp->getCode() === ExecutorInterface::NOT_FOUND_CODE) {
                $statusToSet = StepInterface::STATUS_SKIPPED;
            } elseif ($execExp->getCode() === ExecutorInterface::INVALID_DATA_CODE) {
                $statusToSet = StepInterface::STATUS_SUCCESS;
            } else {
                throw $execExp;
            }
        }

        $this->setStepInfos($step, $executor, $statusToSet, $logMessage);
        $this->handleStepLog($step, $executor);
        $this->handleExpireInAfterExecution((int)$pipelineId, $step);

        $this->stepRepository->save($step);
        $this->postExecutionCommand->execute($step);
        $this->ensureResourceConnection->execute();
        $this->miniHeartbeat->process((int)$step->getPipelineId());
    } catch (\Throwable $exception) {
        if (isset($step)) {
            $this->handleStepLog($step, $executor);
            $this->handleException($step, $exception);
        } else {
            $this->logger->critical($exception->getMessage());
        }
    }
}

  1. The pipeline step is loaded based on the message from the message queue, and the executor implementation stored on the step is instantiated and executed

  2. The expiration date of the pipeline is set dynamically based on the expireIn information of the pipeline itself and its steps

  3. The status of the step is set based on the executor’s result. Errors are logged through the exception handling

    1. If the mini-heartbeat is active, the next step of the pipeline is triggered directly
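
The status handling from step 3 can be illustrated in isolation. The following is a minimal, self-contained sketch and not the module's code: the constant values, the ExecutorFailure class, and the helpers isExecutable() and resolveStatus() are hypothetical stand-ins for StepInterface, ExecutorInterface, ExecutorException, and the logic inside Runner::process.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-ins; the real values are defined in StepInterface
// and ExecutorInterface and may differ.
const STATUS_ENQUEUED = 'enqueued';
const STATUS_ERROR = 'error';
const STATUS_SUCCESS = 'success';
const STATUS_SKIPPED = 'skipped';
const NOT_FOUND_CODE = 1;
const INVALID_DATA_CODE = 2;

// Hypothetical replacement for ExecutorException.
final class ExecutorFailure extends RuntimeException
{
}

/** Mirrors the guard in the runner: only enqueued or failed steps are executed. */
function isExecutable(string $status): bool
{
    return in_array($status, [STATUS_ENQUEUED, STATUS_ERROR], true);
}

/**
 * Runs the executor callback and maps its outcome to a step status.
 * The callback returns the status the executor left on the step.
 */
function resolveStatus(callable $executor): string
{
    try {
        $statusAfterRun = $executor();
        // An executor may mark the step as skipped itself.
        return $statusAfterRun === STATUS_SKIPPED ? STATUS_SKIPPED : STATUS_SUCCESS;
    } catch (ExecutorFailure $failure) {
        if ($failure->getCode() === NOT_FOUND_CODE) {
            return STATUS_SKIPPED;
        }
        if ($failure->getCode() === INVALID_DATA_CODE) {
            return STATUS_SUCCESS;
        }
        // Any other code is left to the caller's error handling.
        throw $failure;
    }
}

// Usage: an executor that cannot find its entity.
$notFound = static function (): string {
    throw new ExecutorFailure('Entity not found', NOT_FOUND_CODE);
};
echo resolveStatus($notFound), PHP_EOL;
```

The final call maps a not-found failure to the skipped status instead of an error, which matches the branch for ExecutorInterface::NOT_FOUND_CODE in the runner shown above.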

The component already contains abstract executors and implemented executor models that serve as examples and get used in standard pipelines (e.g., Clean-Up).