Function Descriptions
Heartbeat
- All components and logic of the pipelines are driven by a heartbeat.
- The heartbeat can be executed with a separate heartbeat CLI command.
Run this command automatically from a cron job or a similar mechanism so that pipelines are processed in a regular cycle.
\TechDivision\ProcessPipelines\Model\PipelineHeartbeat::process
<?php
public function process($pulse): void
{
    if ($this->lockManager->isLocked(self::HEARTBEAT_LOCK_KEY)) {
        throw new LocalizedException(__('Process is locked! Could not execute heartbeat.'));
    }
    $this->lockManager->lock(self::HEARTBEAT_LOCK_KEY);
    try {
        if ($this->isMaintenanceModeEnabled->execute()) {
            $this->heartbeatLogger->info('Maintenance mode is active for heartbeat.');
        } else {
            $this->spawnPipelines->setPulse((string)$pulse);
            $this->spawnPipelines->execute();
            $this->executePendingSteps->execute();
        }
        $this->cancelExpiredPipelines->execute();
        $this->updatePipelineStatus->execute();
        $this->lockManager->unlock(self::HEARTBEAT_LOCK_KEY);
    } catch (Throwable $throwable) {
        $this->lockManager->unlock(self::HEARTBEAT_LOCK_KEY);
        throw $throwable;
    }
}
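The lock handling in the process() listing above follows a common guard pattern: refuse to start while the lock is held, otherwise take the lock and release it on every exit path. The following is a minimal, self-contained sketch of that pattern. InMemoryLock and runGuarded are hypothetical names used for illustration only and are not part of the module, and the sketch uses try/finally where the original unlocks explicitly in both branches.

```php
<?php
// Hypothetical in-memory stand-in for a lock manager (not the module's class).
final class InMemoryLock
{
    /** @var array<string, bool> */
    private array $locks = [];

    public function isLocked(string $key): bool
    {
        return isset($this->locks[$key]);
    }

    public function lock(string $key): void
    {
        $this->locks[$key] = true;
    }

    public function unlock(string $key): void
    {
        unset($this->locks[$key]);
    }
}

/**
 * Runs $work only if $key is free and always releases the lock afterwards.
 */
function runGuarded(InMemoryLock $lock, string $key, callable $work): void
{
    if ($lock->isLocked($key)) {
        throw new RuntimeException('Process is locked! Could not execute heartbeat.');
    }
    $lock->lock($key);
    try {
        $work();
    } finally {
        // Runs on success and on failure, so the lock is never left behind.
        $lock->unlock($key);
    }
}
```

A second caller that arrives while the lock is held gets the exception and leaves the existing lock untouched, which matches the behavior of the listing above.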
- The heartbeat creates pipelines whose conditions are met, as well as pipelines that are detected and prepared by the pipeline initializer, provided the init pipeline functionality is not activated.
- You can also disable the spawning of pipelines with the execution options configuration.
- Additionally, the heartbeat monitors expiration dates and terminates a pipeline once its expiration date has passed.
- Dynamic pipelines must be implemented separately, and the related configuration must be activated.
- Conditions for non-dynamic pipelines are defined in the pipeline XML.
\TechDivision\ProcessPipelines\Model\HeartbeatCommand\SpawnPipelinesCommand::execute
<?php
/**
 * Create new pipeline instance
 * @return void
 */
public function execute()
{
    $this->logger->info("Initiating new pipelines");
    $pipelineList = $this->config->getList();
    foreach ($pipelineList as $pipeline) {
        $this->logger->debug(
            sprintf(
                'Checking pipeline "%s"',
                $pipeline[SpawnPipelinesCommand::PIPELINE_CONFIG_NAME]
            )
        );
        // Skip pipelines whose conditions are not met and reset the collected arguments
        if ($this->isPipelineReady($pipeline) === false) {
            $this->stepArguments = [];
            continue;
        }
        $this->logger->debug(
            sprintf(
                ' ==> Initiate pipeline "%s"',
                $pipeline[SpawnPipelinesCommand::PIPELINE_CONFIG_NAME]
            )
        );
        $this->pipelineBuilder->setStepArguments($this->stepArguments);
        $this->pipelineBuilder->createPipelineByConfig($pipeline);
    }
}
- The heartbeat determines the next steps that should or may be executed for the created pipelines.
- The conditions of the steps are also checked.
- The StepPublisher then transfers (publishes) the step data to the configured message queue.
- The data is passed with the topic pipeline.step.process:
vendor/techdivision/process-pipelines/etc/publishStepMessage.php
<?php
/**
* Publishes a step message by given StepId.
*
* @param StepInterface $step
*/
public function publishStepMessage(StepInterface $step)
{
try {
$data = json_encode(['step_id' => $step->getId()]);
$this->updateStepStatus($
$this->publisher->publish(StepPublisher::TOPIC_NAME, $data);
} catch (\Throwable $exception) {
$this->processException($step, $exception);
}
}
vendor/techdivision/process-pipelines/etc/queue_publisher.xml
<publisher topic="pipeline.step.process">
<connection name="amqp" exchange="techdivision-process-pipelines"
disabled="false"
/>
<connection name="db" exchange="techdivision-process-pipelines-db"
disabled="true"
/>
</publisher>
vendor/techdivision/process-pipelines/etc/communication.xml
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
<topic name="pipeline.step.process" is_synchronous="false" request="string" />
</config>
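As the publishStepMessage listing shows, the message placed on the topic is a small JSON string that carries only the step ID. The sketch below illustrates that round trip in isolation. encodeStepMessage and decodeStepId are hypothetical helper names, and how the module actually decodes the message on the consumer side is an assumption here.

```php
<?php
// Hypothetical helper: builds the message body the way publishStepMessage() does.
function encodeStepMessage(int $stepId): string
{
    return json_encode(['step_id' => $stepId], JSON_THROW_ON_ERROR);
}

// Hypothetical helper: extracts the step ID again on the consumer side.
function decodeStepId(string $messageBody): int
{
    $data = json_decode($messageBody, true, 512, JSON_THROW_ON_ERROR);
    if (!is_array($data) || !isset($data['step_id'])) {
        throw new InvalidArgumentException('Message body contains no step_id.');
    }
    return (int)$data['step_id'];
}

$body = encodeStepMessage(42);     // {"step_id":42}
$stepId = decodeStepId($body);     // 42
```

Because the topic is declared with request="string" in communication.xml, the payload travels as a plain string; keeping it down to an ID means the consumer always loads the current state of the step instead of a stale copy.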
- The heartbeat ensures that pipelines are updated.
- The heartbeat uses step or pipeline arguments to check whether a pipeline has expired.
- Based on the data that was processed or that the heartbeat determined, the data, information, and status of the pipelines are set.
- The status and status data of the pipelines are set based on the information received.
Mini Heartbeat
The Mini Heartbeat is an additional function that increases the performance and speed of pipeline processing.
In addition to the normal heartbeat, which can be executed every minute, the Mini Heartbeat is executed directly at the end of step processing by the PipelineRunner (consumer).
The following steps of the pipeline are then executed immediately instead of waiting for the next heartbeat.
Accordingly, steps that run for only seconds or milliseconds can be executed in quick succession.
- The functionality can be activated with the mini heartbeat configuration:
\TechDivision\ProcessPipelines\Model\MiniHeartbeat::process
<?php
public function process(int $pipelineId)
{
    if (!$this->isMiniHeartbeatAllowed->execute()) {
        $this->heartbeatLogger->info('Mini heartbeat is not allowed.');
        return;
    }
    if ($this->lockManager->isLocked(PipelineHeartbeat::HEARTBEAT_LOCK_KEY)) {
        ...
        return;
    }
    $lockName = $this->getLockName($pipelineId);
    if ($this->lockManager->isLocked($lockName)) {
        ...
        return;
    }
    $this->lockManager->lock($lockName);
    try {
        $pipeline = $this->getPipelineById->execute($pipelineId);
        if ($this->isMaintenanceModeEnabled->execute()) {
            $this->heartbeatLogger->info('Maintenance mode is active for mini heartbeat.');
        } else {
            $this->executeAllPendingStepsByPipelineCommand->execute($pipeline);
            $this->cancelPipelineIfExpiredCommand->execute($pipeline);
        }
        $this->updatePipelineStatusCommand->execute($pipeline);
        $this->lockManager->unlock($lockName);
    } catch (Throwable $throwable) {
        $this->lockManager->unlock($lockName);
        throw $throwable;
    }
}
Pipeline Initializer & Pipeline Builder
Pipeline Builder
- The pipeline builder is responsible for the code-specific generation of pipelines:
  - Pipelines can be created manually with the Magento backend or the CLI commands.
  - A pipeline can also be created (spawned) by the heartbeat.
  - The builder generates the pipeline based on the identifier and the configuration in the pipeline XML.
  - The pipeline is then visible in the Magento backend and on the CLI.
Pipeline Initializer
- Dynamic pipelines can be created and processed with the Pipeline Initializer:
  - The Pipeline Initializer functionality is located in a separate module.
  - Instead of being defined in the pipeline XML, pipelines can be programmed directly.
  - The data is determined by implemented data fetchers and written by a plugin when pipelines are spawned.
  - The functionality is activated or deactivated with the configuration.
Init Pipeline
- The init pipeline functionality allows all pipelines that use InitializationDataFetcherChain to be generated by an init pipeline instead of by the heartbeat itself. This optimizes the heartbeat execution time.
- The functionality is activated or deactivated by configuration.
- See the example of extending/implementing custom logic to use the init pipeline.
Data Fetcher
A data fetcher implements TechDivision\PacemakerPipelineInitializer\Api\InitializationDataFetcherInterface and can be registered with di.xml in the chain provided for it.
- With the execute function, pipelines can be created based on XML settings.
Working examples can be found in the Pacemaker Order Export component:
\TechDivision\PacemakerOrderExport\Model\PipelineInitializer
Example
InitializationDataFetcherChain
<type name="TechDivision\PacemakerPipelineInitializer\Model\InitializationDataFetcherChain">
<arguments>
<argument name="dataFetcher" xsi:type="array">
<item name="pacemaker.order.export" xsi:type="object">TechDivision\PacemakerOrderExport\Model\PipelineInitializer</item>
</argument>
</arguments>
</type>
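To illustrate the chain idea behind the di.xml registration, the following self-contained sketch registers a fetcher under a name and collects the pipeline data it returns. DataFetcherInterface, PendingOrderFetcher, FetcherChain, and the shape of the returned arrays are hypothetical stand-ins. The actual signature of InitializationDataFetcherInterface::execute is not shown in this document and may differ.

```php
<?php
// Hypothetical stand-in; NOT the module's InitializationDataFetcherInterface.
interface DataFetcherInterface
{
    /** @return array<int, array<string, mixed>> one entry per pipeline to create */
    public function execute(): array;
}

// Hypothetical fetcher: requests one pipeline per order that still has to be exported.
final class PendingOrderFetcher implements DataFetcherInterface
{
    /** @var int[] */
    private array $orderIds;

    public function __construct(array $orderIds)
    {
        $this->orderIds = $orderIds;
    }

    public function execute(): array
    {
        $pipelines = [];
        foreach ($this->orderIds as $orderId) {
            $pipelines[] = ['arguments' => ['order_id' => $orderId]];
        }
        return $pipelines;
    }
}

// Hypothetical chain: asks every registered fetcher and merges the results.
final class FetcherChain
{
    /** @var array<string, DataFetcherInterface> */
    private array $fetchers;

    public function __construct(array $fetchers)
    {
        $this->fetchers = $fetchers;
    }

    public function collect(): array
    {
        $collected = [];
        foreach ($this->fetchers as $name => $fetcher) {
            foreach ($fetcher->execute() as $pipelineData) {
                // Remember which fetcher requested the pipeline.
                $collected[] = ['fetcher' => $name] + $pipelineData;
            }
        }
        return $collected;
    }
}

// Mirrors the di.xml item name used in the example above.
$chain = new FetcherChain([
    'pacemaker.order.export' => new PendingOrderFetcher([10, 11]),
]);
$pipelinesToCreate = $chain->collect();
```

The array key plays the role of the item name in di.xml, so additional fetchers can be added without changing the chain itself.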
Consumer / PipelineRunner
The PipelineRunner is a Magento consumer. It connects the pipelines to the configured message queue.
- Several pipeline runners (consumers) can be active at the same time.
- Pipeline runners (consumers) follow the Magento standard for consumers:
bin/magento queue:consumers:start [--max-messages=<value>] [--batch-size=<value>] [--single-thread] [--area-code=<value>] <consumer_name>
We recommend keeping several pipeline runners active, each with a maximum number of messages of at least one, which is then processed.
- The limit automatically stops the consumer again after processing.
- With cron consumer runners, it is possible to run multiple runners and to ensure on the application side that all runners are up and running.
- With a system supervisor or the TechDivision module Supervisor Consumer, the number of consumers can be kept constant:
  - After the automatic termination of a pipeline runner (consumer), the configured number of running consumers is restored.
bin/magento queue:consumers:start pipeline.runner.db --max-messages=1
Based on the binding already described for the heartbeat, the published and enqueued topic is passed from the message queue to the target queue pipeline_process_steps.
vendor/techdivision/process-pipelines/etc/queue_topology.xml
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
<exchange name="techdivision-process-pipelines" type="topic" connection="amqp">
<binding id="TechDivisionProcessPipelinesBinding" topic="pipeline.step.process" destinationType="queue" destination="pipeline_process_steps"/>
</exchange>
<exchange name="techdivision-process-pipelines-db" type="topic" connection="db">
<binding id="TechDivisionProcessPipelinesBindingDb" topic="pipeline.step.process" destinationType="queue" destination="pipeline_process_steps"/>
</exchange>
</config>
The pipeline runners (consumers) for the database or the AMQP connection are assigned to the following processing destination:
- If a consumer is running, the message queue passes the messages to the consumers in sequence so that the consumers process the data.
vendor/techdivision/process-pipelines/etc/queue_consumer.xml
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
<!-- Deprecated -->
<consumer name="pipelineRunner" queue="pipeline_process_steps" connection="amqp" handler="TechDivision\ProcessPipelines\Model\Runner::process"/>
<consumer name="pipeline.runner.amqp" queue="pipeline_process_steps" connection="amqp" handler="TechDivision\ProcessPipelines\Model\Runner::process"/>
<consumer name="pipeline.runner.db" queue="pipeline_process_steps" connection="db" handler="TechDivision\ProcessPipelines\Model\Runner::process"/>
</config>
Runner implementation
<?php
/**
* Process the given message.
*
* @param string $messageBody
* @return void
* @throws Exception
*/
public function process($messageBody): void
{
$validExecutableStatus = [StepInterface::STATUS_ENQUEUED, StepInterface::STATUS_ERROR];
$initMessage = sprintf('Initialize message execution: "%s"', $messageBody);
$this->logger->debug($initMessage);
$step = null;
$executor = null;
try {
$this->ensureResourceConnection->execute();
// Return if the step was not found
try {
$step = $this->loadStep($messageBody);
} catch (NoSuchEntityException $e) {
$this->logger->warning(
sprintf(
'Step (ID: %s) does not exist any more',
$this->getStepIdByMessageBody($messageBody)
)
);
return;
}
// only call executor if status is enqueued or error
if (!in_array($step->getStatus(), $validExecutableStatus)) {
$this->logger->info(
sprintf(
RunnerInterface::RUNNER_STEP_NOT_ENQUEUED_STATUS,
$step->getPipelineId(),
$step->getId()
)
);
return;
}
$step->setStartedAt($this->stepRepository->getDatetime()->gmtDate());
$step->setStatus(StepInterface::STATUS_RUNNING);
$pipelineId = $step->getPipelineId();
$this->setPipelineExpiredByStep->execute(
$step,
$this->stepRepository->getDatetime()->gmtDate($step->getStartedAt())
);
$this->logger->debug(
sprintf('Update expire-at for pipeline %s based by before execute step %s', $pipelineId, $step->getId())
);
$executorType = $step->getExecutorType();
/** @var ExecutorInterface $executor */
$executor = $this->objectManager->create($executorType);
$startedMessage = sprintf(
RunnerInterface::RUNNER_STARED_LOG_MESSAGE,
$step->getPipelineId(),
$step->getId(),
$executorType
);
$this->logger->info($startedMessage);
$this->addAttempt($step); //saves the step
try {
$executor->process($step);
$statusToSet = $step->getStatus() === StepInterface::STATUS_SKIPPED ?
StepInterface::STATUS_SKIPPED : StepInterface::STATUS_SUCCESS;
$logMessage = sprintf(
RunnerInterface::RUNNER_SUCCESS_LOG_MESSAGE,
$step->getPipelineId(),
$step->getId(),
$executorType
);
} catch (ExecutorException $execExp) {
$logMessage = $execExp->getMessage();
if ($execExp->getCode() === ExecutorInterface::NOT_FOUND_CODE) {
$statusToSet = StepInterface::STATUS_SKIPPED;
} elseif ($execExp->getCode() === ExecutorInterface::INVALID_DATA_CODE) {
$statusToSet = StepInterface::STATUS_SUCCESS;
} else {
throw $execExp;
}
}
$this->setStepInfos($step, $executor, $statusToSet, $logMessage);
$this->handleStepLog($step, $executor);
$this->handleExpireInAfterExecution((int)$pipelineId, $step);
$this->stepRepository->save($step);
$this->postExecutionCommand->execute($step);
$this->ensureResourceConnection->execute();
$this->miniHeartbeat->process((int)$step->getPipelineId());
} catch (\Throwable $exception) {
if (isset($step)) {
$this->handleStepLog($step, $executor);
$this->handleException($step, $exception);
} else {
$this->logger->critical($exception->getMessage());
}
}
}
- The pipeline step is loaded based on the message from the message queue, and the executor implementation stored with the step is selected and executed.
- The expiration date of the pipeline is set dynamically based on the expireIn information of the pipeline itself and its steps.
- The status of the step is set based on the executor’s result. Errors are logged by the exception handling.
- If the mini heartbeat is active, the next step of the pipeline is triggered directly.
- The component already contains abstract executors and implemented executor models that serve as examples and are used in standard pipelines (e.g., Clean-Up).
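The status decision inside Runner::process can be read as a small mapping: a normal run ends in success unless the executor marked the step as skipped, and two known exception codes are translated into a status instead of an error. The sketch below isolates that branching. SketchCodes, its constant values, and resolveStepStatus are hypothetical; the real constants live in StepInterface and ExecutorInterface, and their values are not shown in this document.

```php
<?php
// Hypothetical constants; the values are placeholders, not the module's values.
final class SketchCodes
{
    public const STATUS_SUCCESS = 'success';
    public const STATUS_SKIPPED = 'skipped';
    public const NOT_FOUND_CODE = 1;
    public const INVALID_DATA_CODE = 2;
}

/**
 * Returns the status to store for a step.
 *
 * $executor is a callable that runs the step and returns the step status
 * afterwards; it may throw a RuntimeException carrying one of the codes above.
 */
function resolveStepStatus(callable $executor): string
{
    try {
        $statusAfterRun = $executor();
        // The executor may have marked the step as skipped itself.
        return $statusAfterRun === SketchCodes::STATUS_SKIPPED
            ? SketchCodes::STATUS_SKIPPED
            : SketchCodes::STATUS_SUCCESS;
    } catch (RuntimeException $exception) {
        if ($exception->getCode() === SketchCodes::NOT_FOUND_CODE) {
            return SketchCodes::STATUS_SKIPPED;
        }
        if ($exception->getCode() === SketchCodes::INVALID_DATA_CODE) {
            return SketchCodes::STATUS_SUCCESS;
        }
        // Any other code is a real error and is handled by the caller.
        throw $exception;
    }
}
```

In the listing above, the rethrown exception ends up in the outer catch block, where handleException deals with the failed step.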
Executor
- An executor is the executing unit of a pipeline step.
- The pipeline defines which executor implementation is used.
delete_pipeline_data step in vendor/techdivision/process-pipelines/etc/pipeline.xml
<step name="delete_pipeline_data" executorType="TechDivision\ProcessPipelines\Model\Executor\DeleteExecutor" sortOrder="20" description="" >
...
</step>
- The logic of an executor is contained in the process function.
- Only the \TechDivision\ProcessPipelines\Api\ExecutorInterface has to be implemented.
- The stored type of the executor (XML) is passed to the message queue with the argument list.
- The message queue forwards the classified messages with the info to the PipelineRunner (consumer), which selects and executes the executor’s implementation based on the provided information.
TechDivision\ProcessPipelines\Model\Executor\CompressExecutor
<?php
public function process(StepInterface $step)
{
    $compressTime = $this->getCompareTime(
        ConfigInterface::CONFIG_PATH_CLEAN_UP_COMPRESS
    );
    $fileListToCompress = $this->getDirListOlderThenCompareTime($compressTime);
    ...
    foreach ($fileListToCompress as $folderToCompress) {
        if ($this->hasFiles($folderToCompress) === false) {
            ...
            $this->delete($folderToCompress);
            continue;
        }
        ...
        $this->archive($folderToCompress);
    }
}
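For orientation, here is a minimal, self-contained sketch of what a custom executor could look like and how a runner can instantiate it from the class name stored with the step. SketchStep and SketchExecutorInterface are simplified, hypothetical stand-ins for StepInterface and ExecutorInterface, whose real methods are not listed in this document. DeleteOldEntriesExecutor is likewise invented for illustration.

```php
<?php
// Hypothetical, simplified stand-in for StepInterface.
final class SketchStep
{
    /** @var array<string, mixed> */
    public array $arguments = [];
    public string $log = '';
}

// Hypothetical stand-in for ExecutorInterface: a single process() method.
interface SketchExecutorInterface
{
    public function process(SketchStep $step): void;
}

// Example executor: drops timestamps older than a cutoff from an in-memory list.
final class DeleteOldEntriesExecutor implements SketchExecutorInterface
{
    public function process(SketchStep $step): void
    {
        $cutoff = (int)($step->arguments['cutoff'] ?? 0);
        $entries = $step->arguments['entries'] ?? [];
        $kept = array_filter(
            $entries,
            static function (int $timestamp) use ($cutoff): bool {
                return $timestamp >= $cutoff;
            }
        );
        $step->log = sprintf(
            'Deleted %d of %d entries',
            count($entries) - count($kept),
            count($entries)
        );
        $step->arguments['entries'] = array_values($kept);
    }
}

// The runner only knows the class name configured as executorType.
$executorType = DeleteOldEntriesExecutor::class;
$executor = new $executorType();

$step = new SketchStep();
$step->arguments = ['cutoff' => 200, 'entries' => [100, 200, 300]];
$executor->process($step);
```

The point of the design is that the runner stays generic: it resolves the class from the configured type and calls process, so new behavior is added by writing another executor rather than by changing the runner.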
Abstract executor classes
Classname | Description |
---|---|
|
|
|
|
|
Implemented executor models
Classname | Description |
---|---|
|
|
|
|
|
|
|
|
|
Maintenance mode
- The maintenance mode can easily be set by configuration.
- The maintenance mode ensures that no new pipelines are created and no more steps are enqueued for processing.
- The maintenance mode shows a Magento message and an additional message to make the user aware that the maintenance mode is currently enabled.
The maintenance mode option is mainly intended for deployments: activate the mode during a release to block further processing.
Magento backend pipeline grid view
Magento backend pipeline create view
Magento backend pipeline step view