Function Descriptions

Heartbeat

  • All components and all logic of the pipelines are driven by a heartbeat

  • The functionality can be executed with a separate heartbeat CLI command

This command should be run automatically by a cron job or a similar mechanism so that pipelines are processed in a regular cycle

Heartbeat execution function \TechDivision\ProcessPipelines\Model\PipelineHeartbeat::process
<?php

public function process($pulse): void
{
    if ($this->lockManager->isLocked(self::HEARTBEAT_LOCK_KEY)) {
        throw new LocalizedException(__('Process is locked! Could not execute heartbeat.'));
    }

    $this->lockManager->lock(self::HEARTBEAT_LOCK_KEY);
    try {
        if ($this->isMaintenanceModeEnabled->execute()) {
            $this->heartbeatLogger->info('Maintenance mode is active for heartbeat.');
        } else {
            $this->spawnPipelines->setPulse((string)$pulse);
            $this->spawnPipelines->execute();
            $this->executePendingSteps->execute();
        }
        $this->cancelExpiredPipelines->execute();
        $this->updatePipelineStatus->execute();
        $this->lockManager->unlock(self::HEARTBEAT_LOCK_KEY);
    } catch (Throwable $throwable) {
        $this->lockManager->unlock(self::HEARTBEAT_LOCK_KEY);
        throw $throwable;
    }
}
  • The heartbeat creates the pipelines whose conditions are fulfilled and, if the init pipeline functionality is not activated, the pipelines that are detected and prepared by the pipeline initializer

  • The spawning of pipelines can also be disabled with the execution options configuration

  • Additionally, the heartbeat monitors expiration dates and terminates a pipeline once its expiration date has passed

  • Dynamic pipelines must be implemented separately, and the related configuration must be activated

  • Conditions for non-dynamic pipelines are defined in the pipeline XML
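
The heartbeat cycle described above can be illustrated with a minimal, self-contained sketch. It is not the module's implementation: InMemoryLock, runHeartbeat, the $spawningEnabled flag, and the phase names are hypothetical stand-ins, and the point at which the real module evaluates the execution options is an assumption. The sketch only shows the order of the phases and that the lock is released in every case.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory stand-in for the lock manager used by the heartbeat.
final class InMemoryLock
{
    /** @var array<string, bool> */
    private array $locks = [];

    public function isLocked(string $key): bool
    {
        return isset($this->locks[$key]);
    }

    public function lock(string $key): void
    {
        $this->locks[$key] = true;
    }

    public function unlock(string $key): void
    {
        unset($this->locks[$key]);
    }
}

/**
 * Runs one heartbeat cycle and returns the names of the phases that ran.
 *
 * @param callable(): bool $isMaintenance hypothetical maintenance mode check
 * @return string[]
 */
function runHeartbeat(InMemoryLock $lock, callable $isMaintenance, bool $spawningEnabled = true): array
{
    $key = 'heartbeat';
    if ($lock->isLocked($key)) {
        throw new RuntimeException('Process is locked! Could not execute heartbeat.');
    }

    $lock->lock($key);
    $phases = [];
    try {
        if (!$isMaintenance()) {
            if ($spawningEnabled) {
                $phases[] = 'spawn';
            }
            $phases[] = 'execute_pending_steps';
        }
        // Expiration handling and status updates run even in maintenance mode.
        $phases[] = 'cancel_expired';
        $phases[] = 'update_status';
    } finally {
        // finally releases the lock on success and on failure alike.
        $lock->unlock($key);
    }

    return $phases;
}
```

Calling runHeartbeat($lock, fn (): bool => true) skips spawning and step execution but still runs the expiration and status phases, which mirrors the maintenance mode branch of the process function shown earlier.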

Initialization of Pipelines \TechDivision\ProcessPipelines\Model\HeartbeatCommand\SpawnPipelinesCommand::execute
<?php

/**
 * Create new pipeline instance
 * @return void
 */
public function execute()
{
    $this->logger->info("Initiating new pipelines");
    $pipelineList = $this->config->getList();
    foreach ($pipelineList as $pipeline) {
        $this->logger->debug(
            sprintf(
                'Checking pipeline "%s"',
                $pipeline[SpawnPipelinesCommand::PIPELINE_CONFIG_NAME]
            )
        );
        if ($this->isPipelineReady($pipeline) === false) {
            $this->stepArguments = [];
            continue;
        }
        $this->logger->debug(
            sprintf(
                ' ==> Initiate pipeline "%s"',
                $pipeline[SpawnPipelinesCommand::PIPELINE_CONFIG_NAME]
            )
        );
        $this->pipelineBuilder->setStepArguments($this->stepArguments);
        $this->pipelineBuilder->createPipelineByConfig($pipeline);
    }
}
  • For the created pipelines, the heartbeat determines the next steps that are ready to be executed

  • The conditions of the steps are checked as well

  • The StepPublisher then publishes the step data to the configured message queue

The data is passed with the topic pipeline.step.process:
Publishing of the step message with StepPublisher::publishStepMessage
<?php

/**
 * Publishes a step message by given StepId.
 *
 * @param StepInterface $step
 */
public function publishStepMessage(StepInterface $step)
{
    try {
        $data = json_encode(['step_id' => $step->getId()]);
        $this->updateStepStatus(...);
        $this->publisher->publish(StepPublisher::TOPIC_NAME, $data);
    } catch (\Throwable $exception) {
        $this->processException($step, $exception);
    }
}
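
The message body produced by publishStepMessage is a small JSON object that carries only the step ID. The following sketch shows this round trip in isolation. encodeStepMessage and decodeStepId are hypothetical helper names and not part of the module; only the step_id key is taken from the function above.

```php
<?php
declare(strict_types=1);

/** Encode the message body as a JSON object with a step_id key. */
function encodeStepMessage(int $stepId): string
{
    return json_encode(['step_id' => $stepId], JSON_THROW_ON_ERROR);
}

/**
 * Hypothetical consumer-side counterpart: extract the step ID again.
 *
 * @throws InvalidArgumentException if the body carries no numeric step_id
 * @throws JsonException if the body is not valid JSON
 */
function decodeStepId(string $messageBody): int
{
    $data = json_decode($messageBody, true, 512, JSON_THROW_ON_ERROR);
    if (!is_array($data) || !isset($data['step_id']) || !is_numeric($data['step_id'])) {
        throw new InvalidArgumentException('Message body does not contain a step_id.');
    }

    return (int) $data['step_id'];
}
```

Because the message only references the step, the consumer has to load the step itself, which is why a step that has been deleted in the meantime can only be detected on the consumer side.
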
Registration of the topic for the connection vendor/techdivision/process-pipelines/etc/queue_publisher.xml
<publisher topic="pipeline.step.process">
    <connection name="amqp" exchange="techdivision-process-pipelines"
                disabled="false"
    />
    <connection name="db" exchange="techdivision-process-pipelines-db"
                disabled="true"
    />
</publisher>
Exchange registration / exchange binding vendor/techdivision/process-pipelines/etc/queue_topology.xml
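<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="techdivision-process-pipelines" type="topic" connection="amqp">
        <binding id="TechDivisionProcessPipelinesBinding" topic="pipeline.step.process" destinationType="queue" destination="pipeline_process_steps"/>
    </exchange>
    <exchange name="techdivision-process-pipelines-db" type="topic" connection="db">
        <binding id="TechDivisionProcessPipelinesBindingDb" topic="pipeline.step.process" destinationType="queue" destination="pipeline_process_steps"/>
    </exchange>
</config>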
Registration of the "Topic" vendor/techdivision/process-pipelines/etc/communication.xml
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="pipeline.step.process" is_synchronous="false" request="string" />
</config>
  • The heartbeat ensures that pipelines are kept up to date

  • The heartbeat uses step or pipeline arguments to check whether a pipeline has expired

  • The data, information, and status of the pipelines are set based on the processed data and on what the heartbeat has determined

  • Status and status data of the pipelines are set based on the information received
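
The expiration check mentioned above comes down to comparing a stored expiration date with the current time. The following sketch shows the idea. computeExpireAt and isExpired are hypothetical helpers, and it is an assumption that the expiration period is given in seconds.

```php
<?php
declare(strict_types=1);

/** Expiration date for a pipeline whose current step started at $startedAt. */
function computeExpireAt(DateTimeImmutable $startedAt, int $expireInSeconds): DateTimeImmutable
{
    return $startedAt->setTimestamp($startedAt->getTimestamp() + $expireInSeconds);
}

/** A pipeline counts as expired once the current time is past its expiration date. */
function isExpired(DateTimeImmutable $expireAt, DateTimeImmutable $now): bool
{
    return $now > $expireAt;
}
```

A pipeline whose step started at 00:00:00 with a period of 600 seconds would be cancelled by the first heartbeat that runs after 00:10:00.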

Mini Heartbeat

The Mini Heartbeat is an additional function to increase the performance and speed of pipeline processing.

Besides the normal heartbeat, which can be executed every minute, the Mini Heartbeat is executed directly at the end of a step's processing by the PipelineRunner (consumer).

The next steps of the pipeline are then executed right away instead of waiting for the next heartbeat.

Accordingly, steps that only run for seconds or milliseconds can be executed in quick succession.
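
The effect on the total runtime can be estimated with a small calculation. The sketch below is a simplified model and not part of the module: it assumes that steps run strictly one after the other, that the first heartbeat fires at t = 0, and that queue latency is zero. Both function names are hypothetical.

```php
<?php
declare(strict_types=1);

/**
 * Finish time of the last of $stepCount sequential steps if every step
 * can only be enqueued by the regular heartbeat.
 */
function finishTimeWithHeartbeatOnly(int $stepCount, float $stepSeconds, float $heartbeatInterval): float
{
    $now = 0.0;
    for ($i = 0; $i < $stepCount; $i++) {
        // Wait for the next heartbeat tick at or after the current time.
        $nextTick = ceil($now / $heartbeatInterval) * $heartbeatInterval;
        $now = $nextTick + $stepSeconds;
    }

    return $now;
}

/** Finish time if each step triggers the next one directly after it ends. */
function finishTimeWithMiniHeartbeat(int $stepCount, float $stepSeconds): float
{
    return $stepCount * $stepSeconds;
}
```

For five steps of two seconds each and a heartbeat every 60 seconds, the model yields 242 seconds with the regular heartbeat only and 10 seconds with the Mini Heartbeat.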

The functionality can be activated with the mini heartbeat configuration.
Mini Heartbeat execution function \TechDivision\ProcessPipelines\Model\MiniHeartbeat::process
<?php

public function process(int $pipelineId)
{
    if (!$this->isMiniHeartbeatAllowed->execute()) {
        $this->heartbeatLogger->info('Mini heartbeat is not allowed.');
        return;
    }

    if ($this->lockManager->isLocked(PipelineHeartbeat::HEARTBEAT_LOCK_KEY)) {
        ...
        return;
    }

    $lockName = $this->getLockName($pipelineId);
    if ($this->lockManager->isLocked($lockName)) {
        ...
        return;
    }

    $this->lockManager->lock($lockName);
    try {
        $pipeline = $this->getPipelineById->execute($pipelineId);
        if ($this->isMaintenanceModeEnabled->execute()) {
            $this->heartbeatLogger->info('Maintenance mode is active for mini heartbeat.');
        } else {
            $this->executeAllPendingStepsByPipelineCommand->execute($pipeline);
            $this->cancelPipelineIfExpiredCommand->execute($pipeline);
        }
        $this->updatePipelineStatusCommand->execute($pipeline);
        $this->lockManager->unlock($lockName);
    } catch (Throwable $throwable) {
        $this->lockManager->unlock($lockName);
        throw $throwable;
    }
}

Pipeline Initializer & Pipeline Builder

Pipeline Builder

The pipeline builder is responsible for generating pipelines in code:
  • Pipelines can be created manually with the Magento backend or the CLI commands

  • A pipeline can also be created (spawned) by the heartbeat

  • The builder generates the pipeline based on the identifier and the configuration in the pipeline XML

    • This pipeline is then visible in the Magento backend and on the CLI

Pipeline Initializer

Dynamic pipelines can be created and processed with the Pipeline Initializer:
  • The Pipeline Initializer functionality is located in a separate module

  • Instead of being defined in the pipeline XML, pipelines can be created programmatically

  • The data is determined by the implemented data fetchers and written by a plugin when pipelines are spawned

  • The functionality is activated/deactivated with the configuration

Init Pipeline

  • The init pipeline functionality allows all pipelines that use the InitializationDataFetcherChain to be generated by an init pipeline and no longer by the heartbeat itself. This reduces the heartbeat execution time

  • The functionality is activated/deactivated by configuration

  • See the example of extending/implementing custom logic to use the init-pipeline

Data Fetcher

A data fetcher implements TechDivision\PacemakerPipelineInitializer\Api\InitializationDataFetcherInterface and can be registered with di.xml in the chain provided for it.

  • In the execute function, pipelines can be created based on XML settings

Working examples can be found in the Pacemaker Order Export component \TechDivision\PacemakerOrderExport\Model\PipelineInitializer

Example

Registration of Pacemaker Order Export in the InitializationDataFetcherChain
<type name="TechDivision\PacemakerPipelineInitializer\Model\InitializationDataFetcherChain">
    <arguments>
        <argument name="dataFetcher" xsi:type="array">
            <item name="pacemaker.order.export" xsi:type="object">TechDivision\PacemakerOrderExport\Model\PipelineInitializer</item>
        </argument>
    </arguments>
</type>
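
How a chain of data fetchers can work is sketched below. This is a simplified illustration and not the module's API: DataFetcherSketch, PendingOrdersFetcher, and FetcherChainSketch are hypothetical, and the real InitializationDataFetcherInterface may have a different execute signature. Only the idea of a named array of fetchers, as registered in the di.xml above, is taken from the documentation.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in; the real InitializationDataFetcherInterface may differ.
interface DataFetcherSketch
{
    /**
     * @param array<string, mixed> $pipelineConfig
     * @return array<int, array<string, mixed>> one argument set per pipeline to spawn
     */
    public function execute(array $pipelineConfig): array;
}

// Example fetcher: one pipeline per batch of pending orders (illustrative data only).
final class PendingOrdersFetcher implements DataFetcherSketch
{
    /** @param int[] $pendingOrderIds */
    public function __construct(private array $pendingOrderIds, private int $batchSize)
    {
    }

    public function execute(array $pipelineConfig): array
    {
        if (($pipelineConfig['name'] ?? null) !== 'order_export') {
            return []; // not responsible for this pipeline
        }
        $result = [];
        foreach (array_chunk($this->pendingOrderIds, $this->batchSize) as $chunk) {
            $result[] = ['order_ids' => $chunk];
        }

        return $result;
    }
}

final class FetcherChainSketch
{
    /** @param array<string, DataFetcherSketch> $dataFetcher keyed like the di.xml items */
    public function __construct(private array $dataFetcher)
    {
    }

    /** Collects the argument sets of all registered fetchers. */
    public function execute(array $pipelineConfig): array
    {
        $all = [];
        foreach ($this->dataFetcher as $fetcher) {
            foreach ($fetcher->execute($pipelineConfig) as $arguments) {
                $all[] = $arguments;
            }
        }

        return $all;
    }
}
```

Each returned argument set would correspond to one pipeline to spawn, so three pending orders with a batch size of two would result in two pipelines.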

Consumer / PipelineRunner

The PipelineRunner is a Magento consumer. It connects the pipelines to the configured message queue.

  • Several pipeline runners (consumers) can be active

  • Pipeline runners (consumers) follow the Magento standard for consumers

bin/magento queue:consumers:start [--max-messages=<value>] [--batch-size=<value>] [--single-thread] [--area-code=<value>] <consumer_name>

We recommend keeping several pipeline runners active, each with --max-messages set to at least one message, which is then processed

  • The consumer terminates automatically after processing

  • With cron consumer runners, it is possible to run multiple runners and to ensure on the application side that all runners are up and running

    • With a system supervisor or the TechDivision module Supervisor Consumer, the number of consumers can be kept constant

  • After the automatic termination of a pipeline runner (consumer), the configured number of running consumers is restored

bin/magento queue:consumers:start pipeline.runner.db --max-messages=1

Based on the binding already described for the heartbeat, messages published to the topic are routed from the message queue to the target queue pipeline_process_steps.

Exchange registration / Exchange binding vendor/techdivision/process-pipelines/etc/queue_topology.xml
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="techdivision-process-pipelines" type="topic" connection="amqp">
        <binding id="TechDivisionProcessPipelinesBinding" topic="pipeline.step.process" destinationType="queue" destination="pipeline_process_steps"/>
    </exchange>
    <exchange name="techdivision-process-pipelines-db" type="topic" connection="db">
        <binding id="TechDivisionProcessPipelinesBindingDb" topic="pipeline.step.process" destinationType="queue" destination="pipeline_process_steps"/>
    </exchange>
</config>

The pipeline runners (consumers) for the database or AMQP are assigned to the following processing destination:

  • If a consumer is running, the message queue passes the messages to the consumers in sequence so that the consumers process the data

Connection queue and consumer vendor/techdivision/process-pipelines/etc/queue_consumer.xml
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <!-- Deprecated -->
    <consumer name="pipelineRunner" queue="pipeline_process_steps" connection="amqp" handler="TechDivision\ProcessPipelines\Model\Runner::process"/>

    <consumer name="pipeline.runner.amqp" queue="pipeline_process_steps" connection="amqp" handler="TechDivision\ProcessPipelines\Model\Runner::process"/>
    <consumer name="pipeline.runner.db" queue="pipeline_process_steps" connection="db" handler="TechDivision\ProcessPipelines\Model\Runner::process"/>
</config>

Runner implementation

<?php

/**
 * Process the given message.
 *
 * @param string $messageBody
 * @return void
 * @throws Exception
 */
public function process($messageBody): void
{
    $validExecutableStatus = [StepInterface::STATUS_ENQUEUED, StepInterface::STATUS_ERROR];
    $initMessage = sprintf('Initialize message execution: "%s"', $messageBody);
    $this->logger->debug($initMessage);

    $step = null;
    $executor = null;
    try {
        $this->ensureResourceConnection->execute();
        // Return if the step was not found
        try {
            $step = $this->loadStep($messageBody);
        } catch (NoSuchEntityException $e) {
            $this->logger->warning(
                sprintf(
                    'Step (ID: %s) does not exist any more',
                    $this->getStepIdByMessageBody($messageBody)
                )
            );
            return;
        }
        // only call executor if status is enqueued or error
        if (!in_array($step->getStatus(), $validExecutableStatus)) {
            $this->logger->info(
                sprintf(
                    RunnerInterface::RUNNER_STEP_NOT_ENQUEUED_STATUS,
                    $step->getPipelineId(),
                    $step->getId()
                )
            );
            return;
        }
        $step->setStartedAt($this->stepRepository->getDatetime()->gmtDate());
        $step->setStatus(StepInterface::STATUS_RUNNING);
        $pipelineId = $step->getPipelineId();

        $this->setPipelineExpiredByStep->execute(
            $step,
            $this->stepRepository->getDatetime()->gmtDate($step->getStartedAt())
        );
        $this->logger->debug(
            sprintf('Update expire-at for pipeline %s based by before execute step %s', $pipelineId, $step->getId())
        );

        $executorType = $step->getExecutorType();
        /** @var ExecutorInterface $executor */
        $executor = $this->objectManager->create($executorType);

        $startedMessage = sprintf(
            RunnerInterface::RUNNER_STARED_LOG_MESSAGE,
            $step->getPipelineId(),
            $step->getId(),
            $executorType
        );
        $this->logger->info($startedMessage);

        $this->addAttempt($step); //saves the step
        try {
            $executor->process($step);
            $statusToSet = $step->getStatus() === StepInterface::STATUS_SKIPPED ?
                StepInterface::STATUS_SKIPPED : StepInterface::STATUS_SUCCESS;
            $logMessage = sprintf(
                RunnerInterface::RUNNER_SUCCESS_LOG_MESSAGE,
                $step->getPipelineId(),
                $step->getId(),
                $executorType
            );
        } catch (ExecutorException $execExp) {
            $logMessage = $execExp->getMessage();
            if ($execExp->getCode() === ExecutorInterface::NOT_FOUND_CODE) {
                $statusToSet = StepInterface::STATUS_SKIPPED;
            } elseif ($execExp->getCode() === ExecutorInterface::INVALID_DATA_CODE) {
                $statusToSet = StepInterface::STATUS_SUCCESS;
            } else {
                throw $execExp;
            }
        }

        $this->setStepInfos($step, $executor, $statusToSet, $logMessage);
        $this->handleStepLog($step, $executor);
        $this->handleExpireInAfterExecution((int)$pipelineId, $step);

        $this->stepRepository->save($step);
        $this->postExecutionCommand->execute($step);
        $this->ensureResourceConnection->execute();
        $this->miniHeartbeat->process((int)$step->getPipelineId());
    } catch (\Throwable $exception) {
        if (isset($step)) {
            $this->handleStepLog($step, $executor);
            $this->handleException($step, $exception);
        } else {
            $this->logger->critical($exception->getMessage());
        }
    }
}
  1. The pipeline step is loaded based on the message from the message queue, and the executor implementation stored for the step is instantiated and executed

  2. The expiration date of the pipeline is set dynamically based on the expireIn information of the pipeline itself and its steps

  3. The status of the step is set based on the executor’s result. Errors are logged by the exception handling

    1. If the mini heartbeat is active, the next step of the pipeline is triggered directly
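
The status handling of the runner can be reduced to a small decision function. The sketch below mirrors the branches of the process function above, but it is not the module's code: the constant values, ExecutorExceptionSketch, and resolveStepStatus are hypothetical, and the executor is modelled as a plain callable that returns the status it reports.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-ins; the real constant values live in the module's interfaces.
const STATUS_SUCCESS = 'success';
const STATUS_SKIPPED = 'skipped';
const NOT_FOUND_CODE = 404;
const INVALID_DATA_CODE = 422;

final class ExecutorExceptionSketch extends RuntimeException
{
}

/**
 * Runs the executor callable and resolves the status the step should receive.
 *
 * @param callable(): ?string $executor returns STATUS_SKIPPED to skip, anything else counts as success
 */
function resolveStepStatus(callable $executor): string
{
    try {
        $reported = $executor();

        return $reported === STATUS_SKIPPED ? STATUS_SKIPPED : STATUS_SUCCESS;
    } catch (ExecutorExceptionSketch $e) {
        if ($e->getCode() === NOT_FOUND_CODE) {
            return STATUS_SKIPPED;
        }
        if ($e->getCode() === INVALID_DATA_CODE) {
            return STATUS_SUCCESS;
        }
        throw $e; // any other code is left to the caller's error handling
    }
}
```

Only executor exceptions with one of the two known codes are translated into a regular status. Everything else propagates and ends up in the exception handling of the runner.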

The component already contains abstract executors and implemented executor models that serve as examples and are used in standard pipelines (e.g., Clean-Up).

Executor

  • An executor is the executing unit of a pipeline step

  • The pipeline defines which executor implementation is used

Definition of the delete_pipeline_data step vendor/techdivision/process-pipelines/etc/pipeline.xml
<step name="delete_pipeline_data" executorType="TechDivision\ProcessPipelines\Model\Executor\DeleteExecutor" sortOrder="20" description="" >
    ...
</step>
  • The logic of an executor is contained in the process function

  • Only the \TechDivision\ProcessPipelines\Api\ExecutorInterface has to be implemented

  • The executor type stored in the XML is passed to the message queue with the argument list

  • The message queue forwards the messages to the PipelineRunner (consumer), which selects and executes the executor’s implementation based on the provided information

  • See Consumer / PipelineRunner

Executor implementation TechDivision\ProcessPipelines\Model\Executor\CompressExecutor
<?php

public function process(StepInterface $step)
{
    $compressTime = $this->getCompareTime(
        ConfigInterface::CONFIG_PATH_CLEAN_UP_COMPRESS
    );
    $fileListToCompress = $this->getDirListOlderThenCompareTime($compressTime);
    ...
    foreach ($fileListToCompress as $folderToCompress) {
        if ($this->hasFiles($folderToCompress) === false) {
            ...
            $this->delete($folderToCompress);
            continue;
        }
        ...
        $this->archive($folderToCompress);
    }
}
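
A custom executor follows the same shape: one class with a process function that receives the step. The sketch below uses reduced, hypothetical stand-ins (StepSketch, ExecutorSketch, ArrayStep) instead of the module's StepInterface and ExecutorInterface so that it runs on its own. The argument names directory and max_age_seconds are also assumptions.

```php
<?php
declare(strict_types=1);

// Hypothetical, reduced stand-ins for the module's StepInterface and ExecutorInterface.
interface StepSketch
{
    public function getArgument(string $name): mixed;
}

interface ExecutorSketch
{
    public function process(StepSketch $step): void;
}

final class ArrayStep implements StepSketch
{
    /** @param array<string, mixed> $arguments */
    public function __construct(private array $arguments)
    {
    }

    public function getArgument(string $name): mixed
    {
        return $this->arguments[$name] ?? null;
    }
}

// Example executor: deletes files older than a cutoff from a directory.
final class DeleteOldFilesExecutor implements ExecutorSketch
{
    /** @var string[] paths removed by the last run */
    public array $deleted = [];

    public function process(StepSketch $step): void
    {
        $dir = (string) $step->getArgument('directory');
        $cutoff = time() - (int) $step->getArgument('max_age_seconds');
        foreach (glob($dir . '/*') ?: [] as $path) {
            if (is_file($path) && filemtime($path) < $cutoff) {
                unlink($path);
                $this->deleted[] = $path;
            }
        }
    }
}
```

In the real module, such a class would implement the ExecutorInterface and be referenced in the executorType attribute of a step in the pipeline XML.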

Abstract executor classes

Classname Description

AbstractExecutor

  • The AbstractExecutor provides implementations for getter functions and for deleting logs

AbstractArchiveExecutor

  • The AbstractArchiveExecutor is derived from the AbstractExecutor and provides

    • Additional functions for file comparison

AbstractShellExecutor

  • The AbstractShellExecutor is derived from the AbstractExecutor and provides

    • Additional functions to execute commands on the CLI level

Implemented executor models

Classname Description

DropCache

  • DropCache is a standard implementation to flush Magento caches with pipelines

  • DropCache uses the AbstractShellExecutor

  • In the process function, the command bin/magento cache:flush <CACHES> is executed

  • The executor is used in the Pacemaker Import Pipelines component

Reindex

  • Reindex is a standard implementation to regenerate Magento indexes with pipelines

    • It uses the AbstractShellExecutor for this purpose

    • In the process function, the command bin/magento indexer:reindex <INDEXER> is executed

    • The executor is used in the Pacemaker Import Pipelines component
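
Both executors essentially assemble a bin/magento command line and run it. The sketch below shows only the assembly step with shell-escaped arguments. buildMagentoCommand is a hypothetical helper and not a function of the AbstractShellExecutor, whose actual interface is not shown in this document.

```php
<?php
declare(strict_types=1);

/**
 * Builds a bin/magento command line with shell-escaped arguments.
 *
 * @param string[] $arguments e.g. cache types or indexer names
 * @throws InvalidArgumentException if the command name looks unexpected
 */
function buildMagentoCommand(string $command, array $arguments = []): string
{
    // Accept only names such as cache:flush or indexer:reindex.
    if (preg_match('/^[a-z]+(:[a-z-]+)+$/', $command) !== 1) {
        throw new InvalidArgumentException(sprintf('Unexpected command "%s".', $command));
    }
    $parts = ['bin/magento', $command];
    foreach ($arguments as $argument) {
        $parts[] = escapeshellarg($argument);
    }

    return implode(' ', $parts);
}
```

Escaping each argument keeps values that originate from pipeline arguments or configuration from being interpreted by the shell.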

CompressPipelineExecutor

  • Step 1: The CompressPipelineExecutor is a standard implementation used in the component itself and in the CleanUp pipeline

    • The executor compresses / archives the files of a pipeline by using the AbstractArchiveExecutor

DeleteExecutor

  • Step 2: The DeleteExecutor is a standard implementation used in the component itself and in the CleanUp pipeline

    • The executor deletes the files of a pipeline by using the AbstractArchiveExecutor

DeletePipelineExecutor

  • Step 3: The DeletePipelineExecutor is a standard implementation used in the component itself and in the CleanUp pipeline

    • The executor deletes the pipeline entries from the database by using the AbstractArchiveExecutor

Maintenance mode

  • The maintenance mode can easily be set by configuration

  • The maintenance mode ensures that no new pipelines are created and no further steps are enqueued for processing

  • In maintenance mode, a Magento message and an additional message are shown to make the user aware that the maintenance mode is currently enabled

The maintenance mode option is mainly intended for deployments: activating the mode before a release blocks further processing while the release is rolled out

Magento backend pipeline grid view (image: message pipeline index)

Magento backend pipeline create view (image: message new pipeline)

Magento backend pipeline step view (image: message pipeline step)