Heartbeat
-
All pipeline components and logic are driven by a heartbeat
-
The functionality can be executed with a separate heartbeat CLI command.
This command should be run automatically by a cron job or a similar mechanism so that pipelines are processed in a regular cycle.
\TechDivision\ProcessPipelines\Model\PipelineHeartbeat::process
<?php
public function process($pulse): void
{
    if ($this->lockManager->isLocked(self::HEARTBEAT_LOCK_KEY)) {
        throw new LocalizedException(__('Process is locked! Could not execute heartbeat.'));
    }
    $this->lockManager->lock(self::HEARTBEAT_LOCK_KEY);
    try {
        if ($this->isMaintenanceModeEnabled->execute()) {
            $this->heartbeatLogger->info('Maintenance mode is active for heartbeat.');
        } else {
            $this->spawnPipelines->setPulse((string)$pulse);
            $this->spawnPipelines->execute();
            $this->executePendingSteps->execute();
        }
        $this->cancelExpiredPipelines->execute();
        $this->updatePipelineStatus->execute();
        $this->lockManager->unlock(self::HEARTBEAT_LOCK_KEY);
    } catch (Throwable $throwable) {
        $this->lockManager->unlock(self::HEARTBEAT_LOCK_KEY);
        throw $throwable;
    }
}
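The lock handling above releases the lock both on success and on failure. A minimal sketch of the same guarantee with `try`/`finally` is shown below; `InMemoryLockManager` and `runLocked` are hypothetical stand-ins for illustration, not part of the module.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory stand-in for the module's lock manager.
final class InMemoryLockManager
{
    /** @var array<string, bool> */
    private array $locks = [];

    public function isLocked(string $key): bool
    {
        return isset($this->locks[$key]);
    }

    public function lock(string $key): void
    {
        $this->locks[$key] = true;
    }

    public function unlock(string $key): void
    {
        unset($this->locks[$key]);
    }
}

/**
 * Runs $work under the given lock and always releases the lock afterwards.
 */
function runLocked(InMemoryLockManager $lockManager, string $key, callable $work): void
{
    if ($lockManager->isLocked($key)) {
        throw new RuntimeException('Process is locked! Could not execute heartbeat.');
    }
    $lockManager->lock($key);
    try {
        $work();
    } finally {
        // Runs on success and on failure, so the lock is never left behind.
        $lockManager->unlock($key);
    }
}

$lockManager = new InMemoryLockManager();
try {
    runLocked($lockManager, 'heartbeat', function (): void {
        throw new LogicException('step failed');
    });
} catch (LogicException $e) {
    // The exception still propagates to the caller.
}
var_dump($lockManager->isLocked('heartbeat')); // bool(false)
```

With `finally`, the unlock call appears once instead of in both the `try` and the `catch` branch, while the observable behavior stays the same.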
-
The heartbeat creates pipelines that are ready according to their conditions, as well as those detected and prepared by the pipeline initializer when the init pipeline functionality is not activated
-
You can also disable the spawning of pipelines with the execution options configuration
-
Additionally, the heartbeat monitors expiration dates and terminates a pipeline once its date has passed
-
Dynamic pipelines must be implemented separately, and the related configuration must be activated
-
Conditions for non-dynamic pipelines are defined in the pipeline XML
\TechDivision\ProcessPipelines\Model\HeartbeatCommand\SpawnPipelinesCommand::execute
<?php
/**
 * Create new pipeline instance
 * @return void
 */
public function execute()
{
    $this->logger->info("Initiating new pipelines");
    $pipelineList = $this->config->getList();
    foreach ($pipelineList as $pipeline) {
        $this->logger->debug(
            sprintf(
                'Checking pipeline "%s"',
                $pipeline[SpawnPipelinesCommand::PIPELINE_CONFIG_NAME]
            )
        );
        // Skip pipelines whose conditions are not met and reset collected arguments
        if ($this->isPipelineReady($pipeline) === false) {
            $this->stepArguments = [];
            continue;
        }
        $this->logger->debug(
            sprintf(
                ' ==> Initiate pipeline "%s"',
                $pipeline[SpawnPipelinesCommand::PIPELINE_CONFIG_NAME]
            )
        );
        $this->pipelineBuilder->setStepArguments($this->stepArguments);
        $this->pipelineBuilder->createPipelineByConfig($pipeline);
    }
}
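The readiness check used in the loop above can be pictured as "all conditions must pass". The following is only a sketch under stated assumptions: the `name` and `conditions` keys and the use of plain callables are hypothetical, since the module reads its conditions from the pipeline XML.

```php
<?php
declare(strict_types=1);

/**
 * Returns true only if every condition accepts the pipeline config.
 * The 'conditions' key is a hypothetical structure used for illustration.
 */
function isPipelineReady(array $pipeline): bool
{
    foreach ($pipeline['conditions'] as $condition) {
        if ($condition($pipeline) === false) {
            return false; // the first failing condition blocks the spawn
        }
    }
    return true;
}

// Hypothetical configuration list; real conditions come from the pipeline XML.
$pipelineList = [
    ['name' => 'always', 'conditions' => [fn (array $p): bool => true]],
    ['name' => 'never', 'conditions' => [fn (array $p): bool => true, fn (array $p): bool => false]],
];

$spawned = [];
foreach ($pipelineList as $pipeline) {
    if (isPipelineReady($pipeline) === false) {
        continue; // not ready, check the next pipeline
    }
    $spawned[] = $pipeline['name'];
}

print_r($spawned); // only "always" is spawned
```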
-
The heartbeat determines the next steps that should or may be executed for the created pipelines
-
The conditions of the steps are checked as well
-
The StepPublisher then publishes the step data to the configured message queue
- Data is passed with the topic pipeline.step.process:
vendor/techdivision/process-pipelines/etc/publishStepMessage.php
<?php
/**
 * Publishes a step message by given StepId.
 *
 * @param StepInterface $step
 */
public function publishStepMessage(StepInterface $step)
{
    try {
        // The message payload carries only the step ID
        $data = json_encode(['step_id' => $step->getId()]);
        $this->updateStepStatus($step); // call is truncated in the source; passing only $step is an assumption
        $this->publisher->publish(StepPublisher::TOPIC_NAME, $data);
    } catch (\Throwable $exception) {
        $this->processException($step, $exception);
    }
}
vendor/techdivision/process-pipelines/etc/queue_publisher.xml
<publisher topic="pipeline.step.process">
    <connection name="amqp" exchange="techdivision-process-pipelines" disabled="false" />
    <connection name="db" exchange="techdivision-process-pipelines-db" disabled="true" />
</publisher>
vendor/techdivision/process-pipelines/etc/queue_topology.xml
<publisher topic="pipeline.step.process">
    <connection name="amqp" exchange="techdivision-process-pipelines" disabled="false" />
    <connection name="db" exchange="techdivision-process-pipelines-db" disabled="true" />
</publisher>
vendor/techdivision/process-pipelines/etc/communication.xml
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
<topic name="pipeline.step.process" is_synchronous="false" request="string" />
</config>
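Since the topic is declared with `request="string"` and the publisher sends `json_encode(['step_id' => ...])`, a consumer has to decode that string again. A minimal sketch of this decoding step follows; `extractStepId` is a hypothetical helper, and casting the ID to `int` is an assumption about the ID type.

```php
<?php
declare(strict_types=1);

/**
 * Extracts the step ID from a message published on pipeline.step.process.
 *
 * @throws InvalidArgumentException if the payload is not valid JSON or has no step_id
 */
function extractStepId(string $message): int
{
    $data = json_decode($message, true);
    if (!is_array($data) || !isset($data['step_id'])) {
        throw new InvalidArgumentException('Message does not contain a step_id.');
    }
    return (int)$data['step_id']; // int cast is an assumption about the ID type
}

// Same payload shape as produced by publishStepMessage()
$message = json_encode(['step_id' => 42]);
echo extractStepId($message), PHP_EOL; // 42
```

Validating the payload before use keeps a malformed message from being processed as if it referred to a real step.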
-
A heartbeat ensures that pipelines are updated
-
A heartbeat uses step or pipeline arguments to check whether a pipeline has expired
-
Based on the data that was processed or that the heartbeat determined, the data, information, and status of the pipelines are set
-
Status and status data of the pipelines are set based on the information received
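The expiry check mentioned above can be sketched as a comparison between an expiry date taken from the pipeline arguments and the current time. The argument name `expire_at` and the function `isExpired` are hypothetical and chosen only for illustration; the module's actual argument names may differ.

```php
<?php
declare(strict_types=1);

/**
 * Returns true if the pipeline's expiry date lies before $now.
 * 'expire_at' is a hypothetical argument name.
 */
function isExpired(array $pipelineArguments, DateTimeImmutable $now): bool
{
    if (!isset($pipelineArguments['expire_at'])) {
        return false; // no expiry configured, so the pipeline never expires
    }
    $expireAt = new DateTimeImmutable($pipelineArguments['expire_at']);
    return $expireAt < $now;
}

$now = new DateTimeImmutable('2024-01-01 12:00:00');
var_dump(isExpired(['expire_at' => '2024-01-01 11:00:00'], $now)); // bool(true)
var_dump(isExpired(['expire_at' => '2024-01-01 13:00:00'], $now)); // bool(false)
var_dump(isExpired([], $now));                                     // bool(false)
```

Passing the current time in as a parameter keeps the check deterministic and easy to test.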