Magento 2: How to Publish a Message Queue

Magento 2 Publish Message Queue - SbDevBlog

We have created a custom message queue or consumer queue. Creating a custom consumer queue will only work if we push that message to the queue. Pushing the message into a queue is known as publishing the message queue of the consumer cron.

Consumers pick up a message from the database table queue_message based on the status in the DB table queue_message_status.

But the question is how to publish a message to the message queue. In other words, how can we add a message to the table queue_message and set its status as new in the table queue_message_status?

The answer is straightforward. Let’s check it out.

Magento 2: How to Publish a Message Queue

We can use the PublisherInterface interface of the message queue from the Magento framework. We must add it to the class according to our requirements. For example, if we are developing a module to import products in bulk from third-party servers based on time, we can publish our message using cron.

The product export feature in Magento 2 is the best example of understanding publishing the message queue. Whenever we try to export products from the backend of Magento 2, it shows the below message:

Error: “Message is added to queue, wait to get your file soon”

So, Magento 2 publishes a message named “import_export.export”. Once the consumer message executes, we can see the exported file in our file system.

In this post, I have created a console command that allows us to publish dynamic messages as an optional argument. This is an optional argument because I will post my sample message to the queue if the arguments don’t specify it.

  • We must create di.xml to create a console command.
<?xml version="1.0"?>
<!--
/**
 * @copyright Copyright (c) sbdevblog (https://www.sbdevblog.com)
 */
-->
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework:ObjectManager/etc/config.xsd">
    <type name="Magento\Framework\Console\CommandList">
        <arguments>
            <argument name="commands" xsi:type="array">
                <item name="publishconsumerexample" xsi:type="object">SbDevBlog\Crons\Console\PublishQueue</item>
            </argument>
        </arguments>
    </type>
</config>
  • Next, we must create a console class to publish the queue.
<?php

namespace SbDevBlog\Crons\Console;

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Magento\Framework\MessageQueue\PublisherInterface;
use Symfony\Component\Console\Input\InputOption;

class PublishQueue extends Command
{
    private const QUEUE_IDENTFIER = "sbdevblog.consumer.example";
    private const ARG_QUEUE = "queue_to_publish";
    private const QUEUE_PAYLOAD = "queue_payload";

    /**
     * @var PublisherInterface
     */
    private PublisherInterface $publisher;

    /**
     * Constructor
     *
     * @param PublisherInterface $publisher
     * @param string|null $name
     */
    public function __construct(
        PublisherInterface $publisher,
        string             $name = null
    )
    {
        $this->publisher = $publisher;
        parent::__construct($name);
    }

    /**
     * Configure command
     *
     * @return void
     */
    protected function configure()
    {
        $this->setName("sbdevblog:publish_queue")
            ->setDescription("Publish your queue");

        $this->setDefinition([
            new InputOption(
                self::ARG_QUEUE, 'sbq', InputOption::VALUE_OPTIONAL, __("Queue Identifier")
            ),
            new InputOption(
                self::QUEUE_PAYLOAD, 'sbp', InputOption::VALUE_OPTIONAL, __("Pass PayLoad must be string or JSON")
            )
        ]);
        parent::configure();
    }

    /**
     * Resetting index by key
     *
     * @param InputInterface $input
     * @param OutputInterface $output
     * @return void
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $queue = $input->getOption(self::ARG_QUEUE) ? $input->getOption(self::ARG_QUEUE) : self::QUEUE_IDENTFIER;
        $payLoad = $input->getOption(self::QUEUE_PAYLOAD) ? $input->getOption(self::QUEUE_PAYLOAD) : "";
        $this->publisher->publish($queue, $payLoad);
        $output->writeln(__("Queue with identifier  %1 published", $queue));
    }
}

The publisher interface

The publisher class has a function named publish with two arguments. The first argument is the name of the message queue, and the second argument is taking data in a mixed format that we need in the consumer class to fulfil the requirement.

That’s it. Thanks for reading Post-Magento 2: How to Publish a Message Queue. Please subscribe and share with your connections, and yes, please post your suggestions in the comment section.

Download the source Code.

Note: Please verify the code of this blog and the relevant git repository before using it in production.
sb dev blog adobe commece Magento 2

🙂 HAPPY CODING 🙂

One thought on “Magento 2: How to Publish a Message Queue

Leave a Reply

Your email address will not be published. Required fields are marked *