Heartbeat

  • All components and logic of the pipelines are driven by a heartbeat

  • The functionality can be executed with a separate heartbeat CLI command

This command should be run automatically by a cron job or a similar mechanism, so that the pipelines are processed in a regular cycle

Heartbeat execution function \TechDivision\ProcessPipelines\Model\PipelineHeartbeat::process
<?php

public function process($pulse): void
{
    if ($this->lockManager->isLocked(self::HEARTBEAT_LOCK_KEY)) {
        throw new LocalizedException(__('Process is locked! Could not execute heartbeat.'));
    }

    $this->lockManager->lock(self::HEARTBEAT_LOCK_KEY);
    try {
        if ($this->isMaintenanceModeEnabled->execute()) {
            $this->heartbeatLogger->info('Maintenance mode is active for heartbeat.');
        } else {
            $this->spawnPipelines->setPulse((string)$pulse);
            $this->spawnPipelines->execute();
            $this->executePendingSteps->execute();
        }
        $this->cancelExpiredPipelines->execute();
        $this->updatePipelineStatus->execute();
        $this->lockManager->unlock(self::HEARTBEAT_LOCK_KEY);
    } catch (Throwable $throwable) {
        $this->lockManager->unlock(self::HEARTBEAT_LOCK_KEY);
        throw $throwable;
    }
}
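
The locking pattern used above can also be written with a finally block, so that the unlock call appears only once. The following is a minimal, self-contained sketch and not the module's implementation: InMemoryLockManager and runLocked are hypothetical names introduced here for illustration, and the real lock manager of the module may behave differently.

```php
<?php

declare(strict_types=1);

// Hypothetical in-memory stand-in for the module's lock manager (assumption).
final class InMemoryLockManager
{
    /** @var array<string, bool> */
    private array $locks = [];

    public function isLocked(string $key): bool
    {
        return isset($this->locks[$key]);
    }

    public function lock(string $key): void
    {
        $this->locks[$key] = true;
    }

    public function unlock(string $key): void
    {
        unset($this->locks[$key]);
    }
}

/**
 * Runs $work while holding the lock and always releases the lock afterwards.
 * Hypothetical helper, not part of the module.
 */
function runLocked(InMemoryLockManager $lockManager, string $key, callable $work): void
{
    if ($lockManager->isLocked($key)) {
        throw new RuntimeException('Process is locked! Could not execute heartbeat.');
    }

    $lockManager->lock($key);
    try {
        $work();
    } finally {
        // Runs on success and on any Throwable; the Throwable is rethrown afterwards.
        $lockManager->unlock($key);
    }
}
```

With this shape, a failing heartbeat run still releases the lock, so the next cron cycle is not blocked by a stale lock.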
  • The heartbeat creates the pipelines that are ready according to their conditions, as well as those that were detected and prepared by the pipeline initializer, if the init pipeline functionality is not activated

  • You can also disable the spawning of pipelines with the execution options configuration

  • Additionally, the heartbeat monitors expiration dates and terminates a pipeline once its date has been exceeded

  • Dynamic pipelines must be implemented separately, and the related configuration must be activated

  • Conditions for non-dynamic pipelines are defined in the pipeline XML
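
The idea that a pipeline is only spawned when its conditions are met can be sketched as follows. This is an illustration under assumptions, not the module's code: the function names isPipelineReadySketch and selectReadyPipelines are hypothetical, and conditions are modeled as plain callables, whereas the module defines them in the pipeline XML.

```php
<?php

declare(strict_types=1);

/**
 * Returns true only if every condition accepts the pipeline configuration.
 * An empty condition list counts as ready. Hypothetical helper (assumption).
 *
 * @param array<string, mixed> $pipelineConfig
 * @param callable[]           $conditions each callable receives the config and returns bool
 */
function isPipelineReadySketch(array $pipelineConfig, array $conditions): bool
{
    foreach ($conditions as $condition) {
        if ($condition($pipelineConfig) !== true) {
            return false;
        }
    }

    return true;
}

/**
 * Reduces a list of pipeline configurations to those that may be spawned.
 *
 * @param array<int, array<string, mixed>> $pipelineList
 * @param callable[]                       $conditions
 * @return array<int, array<string, mixed>>
 */
function selectReadyPipelines(array $pipelineList, array $conditions): array
{
    return array_values(array_filter(
        $pipelineList,
        static fn (array $pipeline): bool => isPipelineReadySketch($pipeline, $conditions)
    ));
}
```

A condition could, for example, be a closure that checks a hypothetical 'enabled' key of the configuration array; pipelines for which any condition returns false are skipped in that heartbeat cycle and checked again in the next one.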

Initialization of Pipelines \TechDivision\ProcessPipelines\Model\HeartbeatCommand\SpawnPipelinesCommand::execute
<?php

/**
 * Create new pipeline instance
 * @return void
 */
public function execute()
{
    $this->logger->info("Initiating new pipelines");
    $pipelineList = $this->config->getList();
    foreach ($pipelineList as $pipeline) {
        $this->logger->debug(
            sprintf(
                'Checking pipeline "%s"',
                $pipeline[SpawnPipelinesCommand::PIPELINE_CONFIG_NAME]
            )
        );
        if ($this->isPipelineReady($pipeline) === false) {
            $this->stepArguments = [];
            continue;
        }
        $this->logger->debug(
            sprintf(
                ' ==> Initiate pipeline "%s"',
                $pipeline[SpawnPipelinesCommand::PIPELINE_CONFIG_NAME]
            )
        );
        $this->pipelineBuilder->setStepArguments($this->stepArguments);
        $this->pipelineBuilder->createPipelineByConfig($pipeline);
    }
}
  • The heartbeat determines which steps of the created pipelines should or may be executed next

  • The conditions of the steps are checked as well

  • The StepPublisher then publishes the step data to the configured message queue

The data is passed with the topic pipeline.step.process:
Publishing of the step message vendor/techdivision/process-pipelines/etc/publishStepMessage.php
<?php

/**
 * Publishes a step message by given StepId.
 *
 * @param StepInterface $step
 */
public function publishStepMessage(StepInterface $step)
{
    try {
        $data = json_encode(['step_id' => $step->getId()]);
        $this->updateStepStatus($
        $this->publisher->publish(StepPublisher::TOPIC_NAME, $data);
    } catch (\Throwable $exception) {
        $this->processException($step, $exception);
    }
}
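
The message published above only carries the step ID as a JSON string. The following sketch shows that payload format and how a receiving side might read it back. The functions encodeStepMessage and decodeStepMessage are hypothetical helpers written for this example; the module's own consumer is not shown in this document and may work differently.

```php
<?php

declare(strict_types=1);

/**
 * Builds the payload in the same shape as publishStepMessage() does:
 * a JSON object with a single "step_id" key.
 */
function encodeStepMessage(int $stepId): string
{
    return json_encode(['step_id' => $stepId], JSON_THROW_ON_ERROR);
}

/**
 * Reads the step ID back from a message (hypothetical consumer-side helper).
 *
 * @throws JsonException            if the message is not valid JSON
 * @throws InvalidArgumentException if the message has no usable step_id
 */
function decodeStepMessage(string $message): int
{
    $data = json_decode($message, true, 512, JSON_THROW_ON_ERROR);

    if (!is_array($data) || !isset($data['step_id']) || !is_numeric($data['step_id'])) {
        throw new InvalidArgumentException('Message does not contain a valid step_id.');
    }

    return (int)$data['step_id'];
}
```

For example, encodeStepMessage(42) returns the string {"step_id":42}, and decodeStepMessage() applied to that string returns 42 again. Keeping the payload this small means the receiving side has to load the step by its ID before processing it.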
Registration of the topic for the connection vendor/techdivision/process-pipelines/etc/queue_publisher.xml
<publisher topic="pipeline.step.process">
    <connection name="amqp" exchange="techdivision-process-pipelines"
                disabled="false"
    />
    <connection name="db" exchange="techdivision-process-pipelines-db"
                disabled="true"
    />
</publisher>
Exchange registration / exchange binding vendor/techdivision/process-pipelines/etc/queue_topology.xml
<publisher topic="pipeline.step.process">
    <connection name="amqp" exchange="techdivision-process-pipelines"
                disabled="false"
    />
    <connection name="db" exchange="techdivision-process-pipelines-db"
                disabled="true"
    />
</publisher>
Registration of the topic vendor/techdivision/process-pipelines/etc/communication.xml
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="pipeline.step.process" is_synchronous="false" request="string" />
</config>
  • The heartbeat ensures that pipelines are kept up to date

  • The heartbeat uses step or pipeline arguments to check whether a pipeline has expired

  • The status, status data, and further information of the pipelines are set based on the processed data and on what the heartbeat has determined
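
The expiration check and the status update described in this list can be sketched as below. Everything specific in this example is an assumption made for illustration: the argument key expires_at, the status names error, success, canceled, and running, the order in which they are evaluated, and both function names. The module's actual arguments and statuses may differ.

```php
<?php

declare(strict_types=1);

/**
 * Returns true if the pipeline arguments contain an expiration date
 * that lies before $now. The key "expires_at" is a hypothetical name.
 *
 * @param array<string, mixed> $arguments
 */
function isPipelineExpired(array $arguments, DateTimeImmutable $now): bool
{
    if (empty($arguments['expires_at'])) {
        return false; // no expiration configured
    }

    // Dates are interpreted as UTC in this sketch.
    $expiresAt = new DateTimeImmutable($arguments['expires_at'], new DateTimeZone('UTC'));

    return $now > $expiresAt;
}

/**
 * Derives a pipeline status from the statuses of its steps.
 * Status names and precedence are assumptions of this sketch.
 *
 * @param string[] $stepStatuses
 */
function derivePipelineStatus(array $stepStatuses, bool $expired): string
{
    if (in_array('error', $stepStatuses, true)) {
        return 'error';
    }
    if ($stepStatuses !== [] && array_diff($stepStatuses, ['success']) === []) {
        return 'success'; // every step finished successfully
    }
    if ($expired) {
        return 'canceled'; // unfinished and past its expiration date
    }

    return 'running';
}
```

In this sketch, a pipeline that has already finished is not canceled afterwards, even if its expiration date has passed; only unfinished pipelines are affected by the expiration check.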