A newer version of this documentation is available: View the Latest Version

Consumer/PipelineRunner

The PipelineRunner is a Magento consumer, which connects the pipelines to the configured message queue.

Several pipeline runners (consumers) can be active. They are bound to the Magento standard:

bin/magento queue:consumers:start [--max-messages=<value>] [--batch-size=<value>] [--single-thread] [--area-code=<value>] <consumer_name>

We recommend keeping several of these pipeline runners active with a maximum number of one message being processed.

The consumers get automatically deactivated after processing.

The number of consumers can be kept constant via a system supervisor or the TechDivision module Supervisor Consumer.

After the automatic termination of a Pipelinerunner (Consumer), the preconfigured amount of running Consumers is restored.

bin/magento queue:consumer:start pipeline.runner.db --max-messages=1

Due to the binding already described for the heartbeat, the published and enqueued topic is passed from the message queue of the target processing pipeline_process_steps.

Exchange Registration / Binding

vendor/techdivision/process-pipelines/etc/queue_topology.xml
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="techdivision-process-pipelines" type="topic" connection="amqp">
        <binding id="TechDivisionProcessPipelinesBinding" topic="pipeline.step.process" destinationType="queue" destination="pipeline_process_steps"/>
    </exchange>
    <exchange name="techdivision-process-pipelines-db" type="topic" connection="db">
        <binding id="TechDivisionProcessPipelinesBindingDb" topic="pipeline.step.process" destinationType="queue" destination="pipeline_process_steps"/>
    </exchange>
</config>

The pipeline runner (consumer) for the database or the AMQP are assigned to this processing target.

It means that if a consumer is running, the message queue can pass the messages to the consumers sequentially to process the data.

Connection queue and consumer

vendor/techdivision/process-pipelines/etc/queue_consumer.xml
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <!-- Deprecated -->
    <consumer name="pipelineRunner" queue="pipeline_process_steps" connection="amqp" handler="TechDivision\ProcessPipelines\Model\Runner::process"/>

    <consumer name="pipeline.runner.amqp" queue="pipeline_process_steps" connection="amqp" handler="TechDivision\ProcessPipelines\Model\Runner::process"/>
    <consumer name="pipeline.runner.db" queue="pipeline_process_steps" connection="db" handler="TechDivision\ProcessPipelines\Model\Runner::process"/>
</config>

The implementation of the runner looks like the following code:

/**
* Process the given message.
*
* @param string $messageBody
* @return void
* @throws Exception
*/
public function process($messageBody)
{
    $initMessage = sprintf('Initialize message execution: "%s"', $messageBody);
    $this->logger->log(Logger::DEBUG, $initMessage);

    $step = null;
    $executor = null;

    try {
        $this->ensureResourceConnection->execute();
        $step = $this->loadStep($messageBody);
        $step->setStartedAt($this->stepRepository->getDatetime()->gmtDate());
        $step->setStatus(StepInterface::STATUS_RUNNING);

        $executorType = $step->getExecutorType();
        /** @var ExecutorInterface $executor */
        $executor = $this->objectManager->create($executorType);

        $startedMessage = sprintf(
            RunnerInterface::RUNNER_STARED_LOG_MESSAGE,
            $step->getPipelineId(),
            $step->getId(),
            $executorType
        );
        $this->logger->log(Logger::INFO, $startedMessage);
        $this->addAttempt($step); //saves the step
        $executor->process($step)

        $this->ensureResourceConnection->execute();

        $step->setFinishedAt($this->stepRepository->getDatetime()->gmtDate());
        $step->setStatus(StepInterface::STATUS_SUCCESS);
        $step->setHasWarning($executor->getHasWarnings());

        $successMessage = sprintf(
        RunnerInterface::RUNNER_SUCCESS_LOG_MESSAGE,
        $step->getPipelineId(),
        $step->getId(),
        $executorType
        );

        $this->logger->log(Logger::INFO, $successMessage);

        $this->handleExecutorLog($step, $executor);

        $this->stepRepository->save($step);

        $this->postExecutionCommand->execute($step);
        $this->ensureResourceConnection->execute();

        $this->miniHeartbeat->process((int)$step->getPipelineId());
    } catch (\Throwable $exception) {
        if (isset($step)) {
            if (isset($executor)) {
                $this->handleExecutorLog($step, $executor);
            }
            $this->handleException($step, $exception);
        } else {
            $this->logger->log(Logger::CRITICAL, $exception->getMessage());
        }
    }
}

In the first part of the implementation, the pipeline step is loaded based on the message of the message queue, the implementation of the stored executor is selected and then executed.

In the second part, the status of the step is set based on the executor return. The exception-handling is logging occurring errors.

If the mini heartbeat is active, the next step of the pipeline is triggered directly.

The component already contains abstract executors and implemented executor models, which can be used as an example as they are also used in standard pipelines (e.g., Clean-Up) as well.