Function Description

Topic Plugin

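The Topic Plugin is the first step: it inspects the incoming request. If the route is V1/pacemaker/products and the module is enabled, the plugin replaces the request topic with async.bulk.pacemaker.post.save. For any other route, or when the module is disabled, the original topic resolution runs unchanged.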

<?php

/**
 * @param Config $subject
 * @param callable $proceed
 * @param string $routeUrl
 * @param string $httpMethod
 * @return string
 * @see Config::getTopicName()
 * @SuppressWarnings(PHPMD.UnusedFormalParameter)
 */
public function aroundGetTopicName(Config $subject, callable $proceed, string $routeUrl, string $httpMethod): string
{
    if ($routeUrl === 'V1/pacemaker/products' && $this->config->isEnabled()) {
        return self::TOPIC_NAME;
    }
    return $proceed($routeUrl, $httpMethod);
}

Multiple Operation Plugin

The Multiple Operation plugin invokes the Operation Service to verify two conditions: whether the requests include a Pacemaker topic, and whether the number of products exceeds the configured operation threshold. If both conditions are met, the Operation Service consolidates the requests into a single, larger operation. Otherwise, it resets the topic.

<?php

/**
 * @param SaveMultipleOperations $subject
 * @param array $operations
 * @return array
 * @SuppressWarnings(PHPMD.UnusedFormalParameter)
 */
public function beforeExecute(SaveMultipleOperations $subject, array $operations): array
{
    return [$this->operationService->summarizeOperations($operations)];
}
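
The Operation Service itself is not shown here. The following is a minimal sketch of the decision described above, not the module's actual implementation. The array shape of an operation (`topic_name`, `products`), the `$threshold` parameter, and the reading of "resets the topic" as "restores the standard Magento product save topic" are all assumptions made for illustration.

```php
<?php

declare(strict_types=1);

// Topic names as they appear in the plugin code of this document.
const PACEMAKER_TOPIC = 'async.bulk.pacemaker.post.save';
const MAGENTO_PRODUCT_TOPIC = 'async.magento.catalog.api.productrepositoryinterface.save.post';

/**
 * Hypothetical stand-in for OperationService::summarizeOperations().
 *
 * @param array[] $operations Each entry: ['topic_name' => string, 'products' => array] (assumed shape)
 * @param int     $threshold  Assumed name for the configured operation threshold
 * @return array[]
 */
function summarizeOperations(array $operations, int $threshold): array
{
    $hasPacemakerTopic = false;
    $pacemakerProducts = [];
    $otherOperations = [];

    foreach ($operations as $operation) {
        if ($operation['topic_name'] === PACEMAKER_TOPIC) {
            $hasPacemakerTopic = true;
            $pacemakerProducts = array_merge($pacemakerProducts, $operation['products']);
        } else {
            $otherOperations[] = $operation;
        }
    }

    // Both conditions met: merge all Pacemaker requests into one larger operation.
    if ($hasPacemakerTopic && count($pacemakerProducts) > $threshold) {
        $otherOperations[] = ['topic_name' => PACEMAKER_TOPIC, 'products' => $pacemakerProducts];
        return $otherOperations;
    }

    // Otherwise: keep the operations separate and reset the Pacemaker topic (assumed target topic).
    return array_map(
        static function (array $operation): array {
            if ($operation['topic_name'] === PACEMAKER_TOPIC) {
                $operation['topic_name'] = MAGENTO_PRODUCT_TOPIC;
            }
            return $operation;
        },
        $operations
    );
}

$operations = [
    ['topic_name' => PACEMAKER_TOPIC, 'products' => ['sku-1', 'sku-2']],
    ['topic_name' => PACEMAKER_TOPIC, 'products' => ['sku-3']],
];

$merged = summarizeOperations($operations, 2);  // 3 products > threshold 2: one merged operation
$reset = summarizeOperations($operations, 10);  // below threshold: two operations, topic reset
```

With a threshold of 2 the three products end up in a single Pacemaker operation. With a threshold of 10 both operations are returned individually under the Magento product topic.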

Publisher Pool Plugin

The Publisher is responsible for queuing the operation. The operation must be processed by the Magento consumer, but it also has to be recognized later by the Pacemaker Operation Handler, so the operation itself keeps the Pacemaker topic. The Publisher Pool Plugin ensures that the Magento Product Publisher is used nevertheless: in its before hook, it rewrites the arguments of the publish call so that the Pacemaker topic is replaced with the Magento product topic.

<?php

/**
 * phpcs:disable
 * @param PublisherPool $subject
 * @param string $topicName
 * @param mixed $data
 * @return array
 * @see PublisherPool::publish()
 * @SuppressWarnings(PHPMD.UnusedFormalParameter)
 */
public function beforePublish(PublisherPool $subject, string $topicName, $data): array
{
    if ($topicName === 'async.bulk.pacemaker.post.save') {
        return ['async.magento.catalog.api.productrepositoryinterface.save.post', $data];
    }
    return [$topicName, $data];
}
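
To make the distinction between the two topics concrete, the following sketch replays the rewrite outside of Magento. It mirrors the `beforePublish()` logic above as a plain function. The array used as the operation payload (`topic_name`, `serialized_data`) is a hypothetical stand-in for the real operation object.

```php
<?php

declare(strict_types=1);

const PACEMAKER_TOPIC = 'async.bulk.pacemaker.post.save';
const MAGENTO_PRODUCT_TOPIC = 'async.magento.catalog.api.productrepositoryinterface.save.post';

/**
 * Plain-function version of the before hook: only the publish topic is rewritten,
 * the operation passed along stays untouched.
 *
 * @return array{0: string, 1: array}
 */
function rewritePublishTopic(string $topicName, array $operation): array
{
    if ($topicName === PACEMAKER_TOPIC) {
        return [MAGENTO_PRODUCT_TOPIC, $operation];
    }
    return [$topicName, $operation];
}

// Hypothetical operation payload; the real operation is an object, not an array.
$operation = [
    'topic_name' => PACEMAKER_TOPIC,
    'serialized_data' => '{"sku":"demo-1"}',
];

[$publishTopic, $queuedOperation] = rewritePublishTopic($operation['topic_name'], $operation);

echo $publishTopic, PHP_EOL;                  // async.magento.catalog.api.productrepositoryinterface.save.post
echo $queuedOperation['topic_name'], PHP_EOL; // async.bulk.pacemaker.post.save
```

The message is queued under the Magento product topic, while the topic stored in the operation remains the Pacemaker topic. That stored topic is what allows the handler lookup to pick the Pacemaker Operation Handler later on.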

Pacemaker Operation Handler

The Magento consumer decodes the operation topic and identifies the appropriate handler. The module’s consumer plugin detects the topic and returns the Pacemaker Operation Handler. This handler then receives the decoded products from the encoder plugin and processes the operation accordingly.

<?php

/**
 * @param ConsumerConfiguration $subject
 * @param callable $proceed
 * @param string $topicName
 * @return array
 * @see ConsumerConfiguration::getHandlers()
 * @SuppressWarnings(PHPMD.UnusedFormalParameter)
 */
public function aroundGetHandlers(ConsumerConfiguration $subject, callable $proceed, string $topicName): array
{
    if ($topicName === self::TOPIC_NAME) {
        return [
            [
                $this->pacemakerOperationHandler,
                'save'
            ]
        ];
    }
    return $proceed($topicName);
}

Operation Csv Exporter

The Pacemaker Operation Handler executes the 'OperationCsvExporter', which iterates over all products, decides for each one whether it already exists or has to be created, and passes it to the ConverterChain.

  • Converters in the Converter Chain

    • Product Converter

    • Custom Attribute Converter

    • Stock Converter

    • Bundle Converter

    • Configurable Converter

    • Link Converter

    • Tier Price Converter

    • Media Gallery Converter

The Converters convert the product to a CSV array and return it to the Csv Exporter. The Csv Exporter splits the collected data into new and existing products and returns both arrays to the Operation Handler. The Pacemaker Operation Handler then writes the data to two separate CSV files.

<?php

/**
 * @param array $operations
 * @return array|array[]
 * @throws LocalizedException
 */
public function export(array $operations): array
{
    $dataRows = [
        self::EXISTING_PRODUCTS_KEY => [],
        self::NOT_EXISTING_PRODUCTS_KEY => []
    ];

    foreach ($operations as $operation) {
        $product = $operation["meta_information"];
        $entityParams = $this->encoder->decode(self::PRODUCT_TOPIC, $product);
        $storeViewId = (int) $operation["store_id"];

        /** @var ProductInterface $pr */
        foreach ($entityParams as $pr) {
            if (!$this->product->getIdBySku($pr->getSku())) {
                // Unknown SKU: convert with store view 0 and collect as a not yet existing product.
                $convertedProduct = $this->converterChain->execute($pr, 0, true);
                $dataRows[self::NOT_EXISTING_PRODUCTS_KEY][] = $convertedProduct;
            } else {
                // Known SKU: convert for the store view of the operation and collect as existing.
                $convertedProduct = $this->converterChain->execute($pr, $storeViewId, false);
                $dataRows[self::EXISTING_PRODUCTS_KEY][] = $convertedProduct;
            }
        }
    }
    return $dataRows;
}
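
The converters and the file writing are not shown in this document. The sketch below illustrates one way a converter chain could assemble a CSV row and how the two resulting arrays could be written to separate files. `ConverterInterface`, the two sample converters, the `$isNew` parameter name, the array-based product, the column names, and the file names are all hypothetical. The real module works with ProductInterface objects and its own converter classes.

```php
<?php

declare(strict_types=1);

/** Hypothetical contract: each converter contributes some columns of one CSV row. */
interface ConverterInterface
{
    public function convert(array $product, int $storeViewId, bool $isNew): array;
}

final class ProductConverter implements ConverterInterface
{
    public function convert(array $product, int $storeViewId, bool $isNew): array
    {
        return ['sku' => $product['sku'], 'name' => $product['name'], 'store_view_id' => $storeViewId];
    }
}

final class StockConverter implements ConverterInterface
{
    public function convert(array $product, int $storeViewId, bool $isNew): array
    {
        return ['qty' => $product['qty'] ?? 0];
    }
}

final class ConverterChain
{
    /** @var ConverterInterface[] */
    private array $converters;

    public function __construct(array $converters)
    {
        $this->converters = $converters;
    }

    /** Runs every converter and merges the partial rows into one CSV row. */
    public function execute(array $product, int $storeViewId, bool $isNew): array
    {
        $row = [];
        foreach ($this->converters as $converter) {
            $row = array_merge($row, $converter->convert($product, $storeViewId, $isNew));
        }
        return $row;
    }
}

/** Writes a header line (taken from the first row's keys) followed by all rows. */
function writeCsv(string $path, array $rows): void
{
    $handle = fopen($path, 'w');
    if ($handle === false) {
        throw new RuntimeException("Cannot open $path for writing");
    }
    if ($rows !== []) {
        fputcsv($handle, array_keys($rows[0]), ',', '"', '\\');
        foreach ($rows as $row) {
            fputcsv($handle, $row, ',', '"', '\\');
        }
    }
    fclose($handle);
}

$chain = new ConverterChain([new ProductConverter(), new StockConverter()]);
$existingSkus = ['demo-1']; // stands in for the SKU lookup against the catalog
$products = [
    ['sku' => 'demo-1', 'name' => 'Existing product', 'qty' => 5],
    ['sku' => 'demo-2', 'name' => 'New product'],
];

$dataRows = ['existing' => [], 'not_existing' => []];
foreach ($products as $product) {
    $isNew = !in_array($product['sku'], $existingSkus, true);
    $key = $isNew ? 'not_existing' : 'existing';
    $dataRows[$key][] = $chain->execute($product, $isNew ? 0 : 1, $isNew);
}

writeCsv(sys_get_temp_dir() . '/existing_products.csv', $dataRows['existing']);
writeCsv(sys_get_temp_dir() . '/new_products.csv', $dataRows['not_existing']);
```

Merging the partial rows with `array_merge` keeps each converter responsible for its own columns only, so adding a converter to the chain adds columns without touching the others.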

Upload Model Creation

Next, an upload model should be created for each CSV file so that the file can be displayed in the Magento backend, and an import_catalog pipeline should be created as well. You can then start the heartbeat to run the import pipeline steps.