Example import pipeline

Registering a fully executable pipeline with the Import Pipelines component takes a few steps. They are demonstrated here using the inventory import pipeline from vendor/techdivision/pacemaker-import-inventory.

  • This also serves as a template for defining and registering import pipelines

Create configuration units

To make an import pipeline configurable, it is recommended to provide its configuration in the form of a system.xml. This makes it possible:

  • To be able to set file patterns

  • To quickly activate/deactivate the pipeline in live mode

Configuration Inventory-Import vendor/techdivision/pacemaker-import-inventory/etc/adminhtml/system.xml
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:module:Magento_Config:etc/system_file.xsd">
    <system>
        <section id="techdivision_pacemaker_import">
            <group id="inventory" showInDefault="1" showInStore="0" showInWebsite="0"
                   sortOrder="40" translate="label">
                <label>Inventory (stock) Import</label>
                <!-- Switch to activate/deactivate the pipeline -->
                <field id="enabled" translate="label" type="select" sortOrder="25"
                       showInDefault="1" showInWebsite="0" showInStore="0"
                       canRestore="0">
                    <label>Enable Inventory Import Pipeline</label>
                    <source_model>Magento\Config\Model\Config\Source\Yesno</source_model>
                </field>
                <!-- Pattern used to find the import files -->
                <field id="file_name_pattern" translate="label comment" type="text"
                       sortOrder="30" showInDefault="1" showInWebsite="0"
                       showInStore="0" canRestore="0">
                    <label>File Name Pattern</label>
                    <comment>Pattern for import file bunches</comment>
                </field>
            </group>
        </section>
    </system>
</config>
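
The system.xml above only declares the backend fields. As a minimal sketch, assuming the standard Magento mechanism of an etc/config.xml in the same module, default values for both fields could be provided as follows. Whether the module ships such a file, and which defaults it uses, is an assumption; the pattern value is a placeholder, because the pattern syntax expected by the BunchResolver is not described here.

```xml
<?xml version="1.0"?>
<!-- Hypothetical etc/config.xml: defaults for the fields declared in system.xml -->
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:module:Magento_Store:etc/config.xsd">
    <default>
        <!-- Path segments mirror section id / group id / field id -->
        <techdivision_pacemaker_import>
            <inventory>
                <!-- Assumed default: pipeline stays off until enabled in the backend -->
                <enabled>0</enabled>
                <!-- Placeholder only, replace with a real pattern -->
                <file_name_pattern>REPLACE_WITH_YOUR_PATTERN</file_name_pattern>
            </inventory>
        </techdivision_pacemaker_import>
    </default>
</config>
```

The resulting configuration paths, techdivision_pacemaker_import/inventory/enabled and techdivision_pacemaker_import/inventory/file_name_pattern, are the ones referenced from di.xml in the following sections.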

Dependency Injection

Through dependency injection (DI, AOP), existing structures and implementations can be adapted to your own requirements.

Customization/Definition Filepattern from Configuration

For a customized BunchResolver to know which files to read for processing, a virtual type must be created for the file name pattern.

  • This virtual type must reference the file name pattern setting defined in the configuration

Virtualization filename pattern from config for inventory import vendor/techdivision/pacemaker-import-inventory/etc/di.xml
<virtualType name="TechDivision\PacemakerImportInventory\Virtual\ConfigProvider\GetFileNamePattern" type="TechDivision\PacemakerImportBase\Model\ConfigProvider\GetFileNamePattern">
    <arguments>
        <argument name="scopeConfigPath" xsi:type="string">techdivision_pacemaker_import/inventory/file_name_pattern</argument>
    </arguments>
</virtualType>

Customization/Definition BunchResolver

With the file name pattern defined and virtualized, a specific bunch resolver can now be declared.

Virtualization BunchResolver for inventory import vendor/techdivision/pacemaker-import-inventory/etc/di.xml
<virtualType name="TechDivision\PacemakerImportInventory\Virtual\ImportBunchResolver" type="TechDivision\PacemakerImportBase\Model\ImportBunchResolver">
    <arguments>
        <argument name="getFileNamePattern" xsi:type="object">TechDivision\PacemakerImportInventory\Virtual\ConfigProvider\GetFileNamePattern</argument>
    </arguments>
</virtualType>

Definition Pipeline with XML

There are many ways to implement or declare a pipeline.

  • The most thoroughly tested method at present is an XML declaration in pipeline.xml

  • All necessary steps are listed and executed in the correct order

  • Conditions can be attached so that each step runs only when its functional or timing prerequisites are met

The advantage of the XML declaration is that the pipeline can be selected for pipeline generation in the Magento backend.

[Figure: the pipeline in the Pacemaker backend UI]
The following issues should be considered when creating the pipeline:
  • May the pipeline only be initialized manually or with the DataFetcher?

  • What dependencies does the import step have?

  • Is there a need to update indexes?

  • What arguments does the import executor need?

XML Definition Inventory-Import Pipeline vendor/techdivision/pacemaker-import-inventory/etc/pipeline.xml
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:module:TechDivision_ProcessPipelines:etc/pipeline.xsd">
    <pipeline name="pacemaker_import_inventory"
              description="Pacemaker Inventory (stock) import"
              use-working-directory="true" expire-in="1 hour">
        <conditions>
            <pipeline_condition
                    type="TechDivision\ProcessPipelines\Helper\Condition\Pipeline\NoAutoSpawn"
                    description="No automatic start for this pipeline"
            />
        </conditions>
        <step name="move_files"
              executorType="TechDivision\PacemakerImportBase\Model\Executor\MoveFilesToWorkingDirectory"
              sortOrder="10" description="Move files to working directory.">
            <conditions>
                <step_condition type="TechDivision\ProcessPipelines\Helper\Condition\Step\AttemptsLimit\Limit1"
                                description="Try once."/>
            </conditions>
        </step>
        <step name="inventory_transformation"
              executorType="TechDivision\PacemakerImportBase\Model\Executor\TransformationExecutor" sortOrder="20"
              description="Transform files into correct format and map values.">
            <conditions>
                <step_condition type="TechDivision\ProcessPipelines\Helper\Condition\Step\AttemptsLimit\Limit1"
                                description="Try once."/>
                <step_condition type="TechDivision\ProcessPipelines\Helper\Condition\Step\PreviousStepsCompleted"
                                description="Previous step needs to be finished."/>
            </conditions>
        </step>
        <step name="inventory_import" executorType="TechDivision\PacemakerImportBase\Model\Executor\ImportExecutor"
              sortOrder="40" description="Import attributes">
            <conditions>
                <step_condition type="TechDivision\ProcessPipelines\Helper\Condition\Step\AttemptsLimit\Limit1"
                                description="Try once."/>
                <step_condition
                        type="TechDivision\PacemakerImportInventory\Virtual\Model\Condition\Step\CouldSkipByNoFiles"
                        description="Check files to import."/>
                <step_condition type="TechDivision\ProcessPipelines\Helper\Condition\Step\PreviousStepsCompleted"
                                description="Previous step needs to be finished."/>
                <step_condition
                        type="TechDivision\PacemakerImportInventory\Virtual\Condition\NoConflictingStepsInProcess"
                        description="Avoid conflicts between import steps."/>
                <step_condition type="TechDivision\PacemakerIndexer\Virtual\Condition\NoConflictingIndexStepProcess"
                                description="Avoid conflicting steps for reindex process."/>
                <step_condition
                        type="TechDivision\PacemakerImportBase\Model\Condition\Step\NoConflictingIndexingProcess"
                        description="Avoid conflicting index processes for reindex process."/>
                <step_condition type="TechDivision\PacemakerImportBase\Model\Condition\Step\NoExceedMaxTime"
                                description="Prefer an indexer run if the configured time has been exceeded."/>
            </conditions>
            <arguments>
                <argument key="command" value="import:products:inventory"/>
                <argument key="operation" value="add-update"/>
            </arguments>
        </step>
        <step name="index_inventory_setter"
              executorType="TechDivision\PacemakerImportInventory\Virtual\Model\Indexer\InventoryIndexSetter"
              sortOrder="60" description="Set index info for related indexer">
            <conditions>
                <step_condition type="TechDivision\ProcessPipelines\Helper\Condition\Step\AttemptsLimit\Limit1"
                                description="Try once."/>
                <step_condition type="TechDivision\ProcessPipelines\Helper\Condition\Step\PreviousStepsCompleted"
                                description="Previous step needs to be finished."/>
            </conditions>
        </step>
        <step name="spawn_indexer" executorType="TechDivision\PacemakerIndexer\Model\IndexerSpawnExecutor"
              sortOrder="99" description="Spawn new indexer pipeline after import.">
            <conditions>
                <step_condition type="TechDivision\ProcessPipelines\Helper\Condition\Step\AttemptsLimit\Limit1"
                                description="Try once."/>
                <step_condition type="TechDivision\ProcessPipelines\Helper\Condition\Step\PreviousStepsCompleted"
                                description="Previous step needs to be finished."/>
            </conditions>
        </step>
    </pipeline>
</config>

Registration ImportFilesDataFetcher

With the configuration and the BunchResolver in place, the data processing can now be registered in the ImportFilesDataFetcher.

With the pipeline definition in place, the PipelineInitializer and PipelineBuilder, which are executed by the Pacemaker heartbeat, can now create the pipeline.

The ImportFilesDataFetcher uses the BunchResolver to check whether the pipeline should be created. This ensures that the import pipeline is created and executed only if matching files are present.

ImportFilesDataFetcher extension for inventory import vendor/techdivision/pacemaker-import-inventory/etc/di.xml
<type name="TechDivision\PacemakerImportBase\Model\ImportFilesDataFetcher">
    <arguments>
        <argument name="resolverConfig" xsi:type="array">
            <item name="pacemaker.import.inventory" xsi:type="array">
                <item name="resolver" xsi:type="object">TechDivision\PacemakerImportInventory\Virtual\ImportBunchResolver</item>
                <item name="validator" xsi:type="object">TechDivision\PacemakerImportBase\Api\ImportBunchValidatorInterface</item>
                <item name="pipeline_name" xsi:type="string">pacemaker_import_inventory</item>
                <item name="enable_config_path" xsi:type="string">techdivision_pacemaker_import/inventory/enabled</item>
            </item>
        </argument>
    </arguments>
</type>
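
Because this example is meant as a template, the same registration can be sketched for a separate module. Magento merges array arguments from the di.xml files of all modules, so an additional entry with its own key does not replace the inventory entry. Everything prefixed with Acme or acme below (module namespace, item key, pipeline name, configuration path) is hypothetical and only illustrates the shape of the entry; the four item names are taken from the inventory registration above.

```xml
<?xml version="1.0"?>
<!-- Hypothetical di.xml of a custom module that registers its own import pipeline -->
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:framework:ObjectManager/etc/config.xsd">
    <type name="TechDivision\PacemakerImportBase\Model\ImportFilesDataFetcher">
        <arguments>
            <argument name="resolverConfig" xsi:type="array">
                <!-- Hypothetical key; it must differ from the keys of other pipelines -->
                <item name="acme.import.custom" xsi:type="array">
                    <!-- Hypothetical virtual type, declared like the inventory BunchResolver -->
                    <item name="resolver" xsi:type="object">Acme\CustomImport\Virtual\ImportBunchResolver</item>
                    <item name="validator" xsi:type="object">TechDivision\PacemakerImportBase\Api\ImportBunchValidatorInterface</item>
                    <!-- Must match the name attribute of the pipeline in pipeline.xml -->
                    <item name="pipeline_name" xsi:type="string">acme_import_custom</item>
                    <!-- Must match section/group/field of the enable switch in system.xml -->
                    <item name="enable_config_path" xsi:type="string">acme_import/custom/enabled</item>
                </item>
            </argument>
        </arguments>
    </type>
</config>
```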

Dependency injection (optional)

If necessary, individual executors or conditions can be adjusted with DI.

Definition IndexSetter

If Magento indexes need to be regenerated after an import, it is recommended to use an index setter and an index spawner within the pipeline.

  • It makes sense to create a virtual type for the IndexSetter that sets dedicated index information, which is then processed by the next run of the Indexer pipeline.

Virtualization IndexSetter for Inventory Import vendor/techdivision/pacemaker-import-inventory/etc/di.xml
<virtualType name="TechDivision\PacemakerImportInventory\Virtual\Model\Indexer\InventoryIndexSetter" type="TechDivision\PacemakerIndexer\Model\IndexerSetterExecutor">
    <arguments>
        <argument name="indexer" xsi:type="array">
            <item name="inventory" xsi:type="string">inventory</item>
            <item name="cataloginventory_stock" xsi:type="string">cataloginventory_stock</item>
        </argument>
    </arguments>
</virtualType>

Definition for Conditions and Executors

Additional specifications for pipeline conditions or step conditions can be declared with DI.

A useful example is a step condition for the import step that blocks interference from other processes.

Example Virtualization NoConflictingSteps vendor/techdivision/pacemaker-import-inventory/etc/di.xml
<virtualType name="TechDivision\PacemakerImportInventory\Virtual\Condition\NoConflictingStepsInProcess" type="TechDivision\PacemakerImportBase\Model\Condition\Step\NoConflictingStepsInProcess">
    <arguments>
        <argument name="stepNames" xsi:type="array">
            <item name="inventory_import" xsi:type="string">inventory_import</item>
        </argument>
    </arguments>
</virtualType>
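
Since stepNames is an array argument, it presumably accepts more than one entry. As a sketch under that assumption, a project module could list a further step that must not run at the same time as the inventory import. The step name acme_custom_import is hypothetical and stands for a step from another pipeline.

```xml
<?xml version="1.0"?>
<!-- Hypothetical di.xml of a project module extending the list of conflicting steps -->
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:framework:ObjectManager/etc/config.xsd">
    <virtualType name="TechDivision\PacemakerImportInventory\Virtual\Condition\NoConflictingStepsInProcess"
                 type="TechDivision\PacemakerImportBase\Model\Condition\Step\NoConflictingStepsInProcess">
        <arguments>
            <argument name="stepNames" xsi:type="array">
                <!-- Entry from the original declaration -->
                <item name="inventory_import" xsi:type="string">inventory_import</item>
                <!-- Hypothetical additional step name, for illustration only -->
                <item name="acme_custom_import" xsi:type="string">acme_custom_import</item>
            </argument>
        </arguments>
    </virtualType>
</config>
```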