Heartbeat

The heartbeat is the core control loop of the pipelines.

  • All pipeline components and logic are driven by the heartbeat.

  • The heartbeat functionality can be executed via a CLI command.

  • The CLI command should be scheduled in a cron job or a similar mechanism so that the heartbeat processes the pipelines at regular intervals.

Heartbeat execution function

\TechDivision\ProcessPipelines\Model\PipelineHeartbeat::process
public function process($pulse)
{
    if ($this->lockManager->isLocked(self::HEARTBEAT_LOCK_KEY)) {
        throw new LocalizedException(
            __('Process is locked! Could not execute heartbeat.')
        );
    }
    $this->lockManager->lock(self::HEARTBEAT_LOCK_KEY);
    try {
        $this->spawnPipelines->setPulse($pulse);
        $this->spawnPipelines->execute();
        $this->executePendingSteps->execute();
        $this->cancelExpiredPipelines->execute();
        $this->updatePipelineStatus->execute();
        $this->lockManager->unlock(self::HEARTBEAT_LOCK_KEY);
    } catch (Throwable $throwable) {
        $this->lockManager->unlock(self::HEARTBEAT_LOCK_KEY);
        throw $throwable;
    }
}
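The locking pattern in the listing above can be reduced to a small, framework-free sketch. The class `InMemoryLock` and the function `runHeartbeat` are hypothetical names introduced only for illustration; they are not part of the module or of Magento. The sketch releases the lock in a `finally` block, which has the same effect as the two `unlock` calls in the original method.

```php
<?php
// Hypothetical stand-in for a lock manager; not the Magento implementation.
final class InMemoryLock
{
    /** @var array<string, bool> */
    private array $locks = [];

    public function isLocked(string $key): bool
    {
        return isset($this->locks[$key]);
    }

    public function lock(string $key): void
    {
        $this->locks[$key] = true;
    }

    public function unlock(string $key): void
    {
        unset($this->locks[$key]);
    }
}

/**
 * Runs the given tasks in order while holding a lock (illustrative helper).
 *
 * @param callable[] $tasks
 */
function runHeartbeat(InMemoryLock $lock, array $tasks, string $key = 'heartbeat'): void
{
    if ($lock->isLocked($key)) {
        throw new RuntimeException('Process is locked! Could not execute heartbeat.');
    }
    $lock->lock($key);
    try {
        foreach ($tasks as $task) {
            $task();
        }
    } finally {
        // Runs after success and after an exception, so the lock is always released.
        $lock->unlock($key);
    }
}

// Usage: two placeholder tasks standing in for the heartbeat commands.
$lock = new InMemoryLock();
$log = [];
runHeartbeat($lock, [
    function () use (&$log) { $log[] = 'spawn'; },
    function () use (&$log) { $log[] = 'execute'; },
]);
```

If a task throws, the exception still propagates to the caller, but the next heartbeat run is not blocked by a stale lock.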

The heartbeat creates pipelines whose conditions are met, as well as pipelines that are detected and prepared by the pipeline initializer.

Dynamic pipelines must be implemented first, and the corresponding configuration must be enabled.

Conditions for non-dynamic pipelines are defined via the pipeline XML.

Pipeline Initialization

\TechDivision\ProcessPipelines\Model\HeartbeatCommand\SpawnPipelinesCommand::execute
/**
 * Create new pipeline instance
 * @return void
 */
public function execute()
{
    $this->logger->info("Initiating new pipelines...");
    $pipelineList = $this->config->getList();
    foreach ($pipelineList as $pipeline) {
        $this->logger->debug(
            sprintf(
                'Checking pipeline "%s"',
                $pipeline[SpawnPipelinesCommand::PIPELINE_CONFIG_NAME]
            )
        );
        if ($this->isPipelineReady($pipeline) === false) {
            $this->stepArguments = [];
            continue;
        }
        $this->logger->debug(
            sprintf(
                ' ==> Initiate pipeline "%s"',
                $pipeline[SpawnPipelinesCommand::PIPELINE_CONFIG_NAME]
            )
        );
        $this->pipelineBuilder->setStepArguments($this->stepArguments);
        $this->pipelineBuilder->createPipelineByConfig($pipeline);
    }
}
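The readiness check in the loop above can be pictured with a simplified, standalone sketch. The internals of `isPipelineReady` are not shown in this section, so everything here is an assumption made for illustration: the array shape (`name`, `conditions`), the function name `findReadyPipelines`, the rule that a pipeline is ready only when all of its conditions hold, and the treatment of the pulse as a point in time.

```php
<?php
/**
 * Returns the names of all pipelines whose conditions are all met.
 * The array shape ('name', 'conditions') is an assumption for this sketch.
 *
 * @param array<int, array{name: string, conditions: callable[]}> $pipelineList
 * @return string[]
 */
function findReadyPipelines(array $pipelineList, DateTimeImmutable $pulse): array
{
    $ready = [];
    foreach ($pipelineList as $pipeline) {
        $isReady = true;
        foreach ($pipeline['conditions'] as $condition) {
            if ($condition($pulse) === false) {
                // One failed condition is enough to skip this pipeline.
                $isReady = false;
                break;
            }
        }
        if ($isReady) {
            $ready[] = $pipeline['name'];
        }
    }
    return $ready;
}

// Usage with two hypothetical pipelines.
$pipelines = [
    [
        'name' => 'nightly_import',
        'conditions' => [
            fn (DateTimeImmutable $pulse): bool => $pulse->format('H:i') === '02:00',
        ],
    ],
    ['name' => 'always_ready', 'conditions' => []],
];
$readyNames = findReadyPipelines($pipelines, new DateTimeImmutable('2024-01-01 02:00:00'));
// $readyNames is ['nightly_import', 'always_ready']
```

A pipeline without conditions is treated as always ready in this sketch; the module may handle that case differently.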

StepPublisher function

For the pipelines that have been created, the heartbeat determines which steps are to be executed next.

The conditions of the steps are checked as follows:

The StepPublisher passes the step data to the configured message queue. The message is published to the topic pipeline.step.process, which is set as the topic attribute of the publisher in the XML configuration.

The function of the StepPublisher (\TechDivision\ProcessPipelines\Model\StepPublisher)
/**
 * Publishes a step message by given StepId.
 *
 * @param StepInterface $step
 */
public function publishStepMessage(StepInterface $step)
{
    try {
        $data = json_encode(['step_id' => $step->getId()]);
        $this->updateStepStatus($step);
        $this->publisher->publish(StepPublisher::TOPIC_NAME, $data);
    } catch (\Throwable $exception) {
        $this->processException($step, $exception);
    }
}
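The message published above is a small JSON document that carries only the step ID. The following sketch shows that payload and one way a receiver could read it back. Only the payload shape (`step_id`) is taken from the listing; the helper names `buildStepMessage` and `readStepId` are hypothetical, and how the module's consumer actually decodes the message is an assumption.

```php
<?php
// Hypothetical helpers; only the payload shape comes from the listing above.
function buildStepMessage(int $stepId): string
{
    return json_encode(['step_id' => $stepId], JSON_THROW_ON_ERROR);
}

function readStepId(string $message): int
{
    // Decode to an associative array and fail loudly on malformed JSON.
    $data = json_decode($message, true, 512, JSON_THROW_ON_ERROR);
    if (!is_array($data) || !isset($data['step_id'])) {
        throw new InvalidArgumentException('Message does not contain a step_id.');
    }
    return (int) $data['step_id'];
}

$message = buildStepMessage(42); // '{"step_id":42}'
$stepId = readStepId($message);  // 42
```

Sending only the ID keeps the queue message small; the receiver is then expected to load the current step data itself rather than rely on a copy that may be stale.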

Registration of the connection topic

vendor/techdivision/process-pipelines/etc/queue_publisher.xml
<publisher topic="pipeline.step.process">
    <connection
            name="amqp"
            exchange="techdivision-process-pipelines"
            disabled="false"
    />
    <connection
            name="db"
            exchange="techdivision-process-pipelines-db"
            disabled="true"
    />
</publisher>

Exchange Registration / Exchange-Binding

vendor/techdivision/process-pipelines/etc/queue_topology.xml
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="techdivision-process-pipelines" type="topic" connection="amqp">
        <binding id="TechDivisionProcessPipelinesBinding" topic="pipeline.step.process" destinationType="queue" destination="pipeline_process_steps"/>
    </exchange>
    <exchange name="techdivision-process-pipelines-db" type="topic" connection="db">
        <binding id="TechDivisionProcessPipelinesBindingDb" topic="pipeline.step.process" destinationType="queue" destination="pipeline_process_steps"/>
    </exchange>
</config>

Topic Registration

vendor/techdivision/process-pipelines/etc/communication.xml
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="pipeline.step.process" is_synchronous="false" request="string" />
</config>

The heartbeat also keeps the pipelines up to date. It uses step or pipeline arguments to check whether a pipeline has expired.
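An expiry check of this kind can be sketched as a simple time comparison. This section does not show how the module stores or evaluates the expiry arguments, so the function name `isPipelineExpired` and its parameters (a creation time plus a lifetime in seconds) are assumptions chosen for illustration.

```php
<?php
/**
 * Reports whether a pipeline has outlived its allowed lifetime.
 * Hypothetical helper; the parameter names are not taken from the module.
 */
function isPipelineExpired(
    DateTimeImmutable $createdAt,
    int $expireInSeconds,
    DateTimeImmutable $now
): bool {
    $expiresAt = $createdAt->getTimestamp() + $expireInSeconds;
    return $now->getTimestamp() >= $expiresAt;
}

// Usage: a pipeline created at midnight UTC with a one-hour lifetime.
$utc = new DateTimeZone('UTC');
$createdAt = new DateTimeImmutable('2024-01-01 00:00:00', $utc);
$expired = isPipelineExpired($createdAt, 3600, new DateTimeImmutable('2024-01-01 01:30:00', $utc));
// $expired is true
```

Passing the current time as a parameter instead of reading the clock inside the function keeps the check deterministic and easy to test.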

Based on the data processed so far and the data determined by the heartbeat, the information and status of the pipelines can then be set.