Heartbeat
- The heartbeat implements the core logic of the pipelines.
- All pipeline components and their logic are driven by the heartbeat.
- The heartbeat functionality is available as a CLI command.
- Run this CLI command from a cron job or a similar scheduler so that the heartbeat processes the pipelines at regular intervals.
Heartbeat execution function
public function process($pulse)
{
    // Refuse to start while another heartbeat run holds the lock.
    if ($this->lockManager->isLocked(self::HEARTBEAT_LOCK_KEY)) {
        throw new LocalizedException(
            __('Process is locked! Could not execute heartbeat.')
        );
    }
    $this->lockManager->lock(self::HEARTBEAT_LOCK_KEY);
    try {
        // Create new pipelines, execute pending steps, cancel expired
        // pipelines, and update the pipeline status, in that order.
        $this->spawnPipelines->setPulse($pulse);
        $this->spawnPipelines->execute();
        $this->executePendingSteps->execute();
        $this->cancelExpiredPipelines->execute();
        $this->updatePipelineStatus->execute();
        $this->lockManager->unlock(self::HEARTBEAT_LOCK_KEY);
    } catch (Throwable $throwable) {
        // Release the lock on failure as well, then rethrow.
        $this->lockManager->unlock(self::HEARTBEAT_LOCK_KEY);
        throw $throwable;
    }
}
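The locking pattern above can also be written with `finally`, so that the lock is released in exactly one place. The following is a minimal sketch, not the module's implementation: `InMemoryLockManager` and `runExclusively` are hypothetical names introduced only for illustration, and the stand-in lock manager assumes that `lock()` reports success as a boolean. Acquiring the lock through that return value, instead of a separate `isLocked()` check, avoids the short window between checking and locking.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical in-memory stand-in for a lock manager (illustration only).
 */
final class InMemoryLockManager
{
    /** @var array<string, bool> */
    private array $locks = [];

    public function isLocked(string $name): bool
    {
        return isset($this->locks[$name]);
    }

    /** Returns false if the lock is already held. */
    public function lock(string $name): bool
    {
        if (isset($this->locks[$name])) {
            return false;
        }
        $this->locks[$name] = true;
        return true;
    }

    public function unlock(string $name): bool
    {
        unset($this->locks[$name]);
        return true;
    }
}

/**
 * Hypothetical helper: runs $task while holding the lock $key.
 * The lock is released on success and on failure alike.
 */
function runExclusively(InMemoryLockManager $lockManager, string $key, callable $task)
{
    if (!$lockManager->lock($key)) {
        throw new RuntimeException('Process is locked! Could not execute heartbeat.');
    }
    try {
        return $task();
    } finally {
        // Executed whether $task() returned or threw.
        $lockManager->unlock($key);
    }
}
```

With this shape, the four heartbeat commands would be called inside the callable passed to `runExclusively`, and an exception from any of them still propagates to the caller after the lock has been released.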
The heartbeat creates pipelines whose conditions are met, as well as pipelines that are detected and prepared by the pipeline initializer.
Dynamic pipelines must be implemented separately and enabled in the configuration.
Conditions for non-dynamic pipelines are defined in the pipeline XML.
Pipeline initialization
/**
 * Create new pipeline instance
 * @return void
 */
public function execute()
{
    $this->logger->info("Initiating new pipelines...");
    $pipelineList = $this->config->getList();
    foreach ($pipelineList as $pipeline) {
        $this->logger->debug(
            sprintf(
                'Checking pipeline "%s"',
                $pipeline[SpawnPipelinesCommand::PIPELINE_CONFIG_NAME]
            )
        );
        if ($this->isPipelineReady($pipeline) === false) {
            $this->stepArguments = [];
            continue;
        }
        $this->logger->debug(
            sprintf(
                ' ==> Initiate pipeline "%s"',
                $pipeline[SpawnPipelinesCommand::PIPELINE_CONFIG_NAME]
            )
        );
        $this->pipelineBuilder->setStepArguments($this->stepArguments);
        $this->pipelineBuilder->createPipelineByConfig($pipeline);
    }
}
StepPublisher function
For the pipelines that have been created, the heartbeat determines which steps are to be executed next.
Once the conditions of a step have been checked, the step is published as follows:
The StepPublisher passes the step data to the configured message queue.
The message is published to the topic pipeline.step.process, which is set as the topic attribute of the publisher in the XML configuration.
/**
 * Publishes a message for the given step.
 *
 * @param StepInterface $step
 * @return void
 */
public function publishStepMessage(StepInterface $step)
{
    try {
        // The message body only carries the step ID.
        $data = json_encode(['step_id' => $step->getId()]);
        $this->updateStepStatus($step);
        $this->publisher->publish(StepPublisher::TOPIC_NAME, $data);
    } catch (\Throwable $exception) {
        $this->processException($step, $exception);
    }
}
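Because the published message is a JSON string that contains only `step_id`, whatever consumes the queue has to decode it and look the step up again. The sketch below shows one way that decoding could look. `extractStepId` is a hypothetical helper and not part of the module, and it assumes that step IDs are numeric.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: extracts the step ID from a message body such as
 * {"step_id": 15}. Assumes step IDs are numeric.
 *
 * @throws JsonException            if the body is not valid JSON
 * @throws InvalidArgumentException if step_id is missing or not numeric
 */
function extractStepId(string $message): int
{
    // JSON_THROW_ON_ERROR turns malformed JSON into a JsonException.
    $data = json_decode($message, true, 512, JSON_THROW_ON_ERROR);

    if (!is_array($data) || !isset($data['step_id']) || !is_numeric($data['step_id'])) {
        throw new InvalidArgumentException('Message does not contain a valid step_id.');
    }

    return (int) $data['step_id'];
}
```

Keeping the message this small means the consumer always reads the current step state from storage instead of relying on a copy that may be outdated by the time the message is processed.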
Registration of the connection topic
<publisher topic="pipeline.step.process">
    <connection
        name="amqp"
        exchange="techdivision-process-pipelines"
        disabled="false"
    />
    <connection
        name="db"
        exchange="techdivision-process-pipelines-db"
        disabled="true"
    />
</publisher>
Exchange Registration / Exchange-Binding
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="techdivision-process-pipelines" type="topic" connection="amqp">
        <binding id="TechDivisionProcessPipelinesBinding" topic="pipeline.step.process" destinationType="queue" destination="pipeline_process_steps"/>
    </exchange>
    <exchange name="techdivision-process-pipelines-db" type="topic" connection="db">
        <binding id="TechDivisionProcessPipelinesBindingDb" topic="pipeline.step.process" destinationType="queue" destination="pipeline_process_steps"/>
    </exchange>
</config>
Topic Registration
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="pipeline.step.process" is_synchronous="false" request="string" />
</config>
The heartbeat also keeps the pipelines up to date. Using step or pipeline arguments, it checks whether a pipeline has expired.
Based on the data processed so far and the data determined by the heartbeat, the information and status of each pipeline are then set.
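The expiry check described above comes down to comparing a pipeline's age against a configured lifetime. The sketch below illustrates that comparison only. The function name `isPipelineExpired`, the lifetime in seconds, and the rule that a non-positive lifetime means "never expires" are all assumptions made for this example. They are not taken from the module.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical expiry check (illustration only).
 *
 * @param DateTimeImmutable $createdAt       when the pipeline was created
 * @param int               $expireInSeconds assumed lifetime argument, in seconds
 * @param DateTimeImmutable $now             the time of the current heartbeat
 */
function isPipelineExpired(
    DateTimeImmutable $createdAt,
    int $expireInSeconds,
    DateTimeImmutable $now
): bool {
    // Assumption: a lifetime of zero or less disables expiry.
    if ($expireInSeconds <= 0) {
        return false;
    }

    $ageInSeconds = $now->getTimestamp() - $createdAt->getTimestamp();

    // Expired once the pipeline is older than its lifetime.
    return $ageInSeconds > $expireInSeconds;
}
```

Passing the current time in as a parameter, instead of reading the clock inside the function, keeps the check deterministic and easy to test.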