Magento 2: How to Create a Message Queue

A message queue is also known as a consumer queue. In this post, we will create one.

Before proceeding, let’s understand what a message queue is. A message queue, or consumer queue, is an asynchronous communication system: messages are processed without the sender and the receiver communicating with each other directly.

The sender pushes a message into the queue using the publisher class. This step is known as publishing. The queue stores the published messages until a consumer picks them up, and each message is processed by a single consumer. A message can have one of the following statuses: New, In Progress, Complete, Retry Required, Error, and Deleted. The statuses are covered in detail in a separate post.
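The flow described above can be pictured with a small in-memory model. This is only a toy sketch in plain PHP, not how Magento implements its queues: the ToyQueue class and its methods are hypothetical names, and only the New, In Progress, Complete, and Error statuses are modeled.

```php
<?php
declare(strict_types=1);

// Toy in-memory model of the publish/consume flow; NOT Magento's implementation.
final class ToyQueue
{
    /** @var array<int, array{body: string, status: string}> */
    private array $messages = [];

    // Publishing stores the message with the initial "New" status.
    public function publish(string $body): int
    {
        $this->messages[] = ['body' => $body, 'status' => 'New'];
        return array_key_last($this->messages);
    }

    // A consumer takes the oldest "New" message and runs the handler on it.
    public function consumeNext(callable $handler): ?int
    {
        foreach ($this->messages as $id => $message) {
            if ($message['status'] !== 'New') {
                continue;
            }
            $this->messages[$id]['status'] = 'In Progress';
            try {
                $handler($message['body']);
                $this->messages[$id]['status'] = 'Complete';
            } catch (\Throwable $e) {
                // Simplification: any failure is marked as "Error".
                $this->messages[$id]['status'] = 'Error';
            }
            return $id;
        }
        return null; // nothing left to process
    }

    public function status(int $id): string
    {
        return $this->messages[$id]['status'];
    }
}

$queue = new ToyQueue();
$id = $queue->publish('hello');
$queue->consumeNext(function (string $body): void {
    echo "handled: {$body}\n";
});
echo $queue->status($id), "\n";
```

Running the script prints "handled: hello" followed by "Complete". The sender never calls the handler directly; it only talks to the queue.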

We can use either RabbitMQ or MySQL as the message queue backend. The examples in this post use the MySQL (db) connection.

In this post, we will focus on the practical implementation of the message queue. A detailed explanation will follow in a separate post.

Magento 2: How to Create a Message Queue

To create a message queue, we must define its configuration by creating the following files inside the etc directory of our custom module:

  1. communication.xml
  2. queue.xml
  3. queue_publisher.xml
  4. queue_topology.xml

As discussed, this post covers only the practical setup of these files; the theory behind each one will be covered in separate posts. So, let’s implement this.

  • communication.xml
    The components of the message queuing system that are common to all forms of communication are defined in this file.

    We must define a topic using a topic tag. The name attribute of the topic tag holds a string that identifies the topic, and the request attribute specifies the data type of the messages published to it.

    The topic tag can contain a handler tag. Its name attribute is a string that uniquely defines the handler. The name can be derived from the topic name if the handler is specific to that topic. If the handler provides more generic capabilities, name the handler so that it describes those capabilities. The type attribute specifies the class or interface that implements the handler, and the method attribute specifies the method of that class to execute.
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="sbdevblog.consumer.example" request="string">
        <handler name="sbdevblog.consumer.example" type="SbDevBlog\Crons\Model\Consumer" method="process" />
    </topic>
</config>

Set the connection between the queue and its consumer.

  • queue.xml
    It defines the relationship between an existing queue and its consumer. It contains a broker tag with one or more queue tags nested inside it. Each queue tag links a queue to its consumer and to the handler that processes its messages.
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/queue.xsd">
    <broker topic="sbdevblog.consumer.example" exchange="magento-db" type="db">
        <queue name="sbdevblog.consumer.example"
               consumer="sbdevblog.consumer.example"
               consumerInstance="Magento\Framework\MessageQueue\Consumer"
               handler="SbDevBlog\Crons\Model\Consumer::process"/>
    </broker>
</config>
  • queue_publisher.xml
    It defines which connection and exchange are used to publish messages for a specific topic. This file contains the required publisher tag, whose topic attribute holds the topic name we defined in communication.xml. The publisher tag has an optional connection child tag that defines an AMQP or database connection.

    The connection tag also has a disabled attribute. Passing true in the disabled attribute disables the queue for that connection. By default, it is false.

    Messages are published to the queue by using the name of the topic.
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <publisher topic="sbdevblog.consumer.example">
        <connection name="db" exchange="magento-db" />
    </publisher>
</config>
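  • queue_topology.xml
    It defines the message routing rules and declares queues and exchanges. It contains the following elements:
    exchange
    exchange/binding (optional)
    exchange/arguments (optional)
    exchange/binding/arguments (optional)

    In the example below, the binding routes messages published on the topic sbdevblog.consumer.example to the queue of the same name.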
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="magento-db" type="topic" connection="db">
        <binding id="exampleConsumer" topic="sbdevblog.consumer.example" destinationType="queue" destination="sbdevblog.consumer.example"/>
    </exchange>
</config>

Finally, we need to create a class with the method specified as the handler.

<?php

namespace SbDevBlog\Crons\Model;

use Psr\Log\LoggerInterface;

class Consumer
{
    /**
     * @var LoggerInterface
     */
    private LoggerInterface $logger;

    /**
     * Constructor
     *
     * @param LoggerInterface $logger
     */
    public function __construct(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

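    /**
     * Executing consumer queue
     *
     * @param string $message Payload published to the topic (request="string")
     * @return void
     */
    public function process(string $message): void
    {
        // Log the payload so we can see which message was processed.
        $this->logger->info("We have executed the queue successfully: " . $message);
    }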
}

The consumer queue configuration

Consumers are normally started automatically by cron jobs, but we can also start a consumer explicitly from the command line. Note that a message must be published to the queue before the consumer has anything to process. Publishing is covered in a separate post.

php bin/magento queue:consumer:start <consumer_identifier>

//In our case:

php bin/magento queue:consumer:start sbdevblog.consumer.example
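Since the consumer only has work to do once a message has been published, here is a minimal sketch of a publisher for our topic. It assumes Magento's Magento\Framework\MessageQueue\PublisherInterface is injected through the constructor; the ExamplePublisher class name and its placement in the module are hypothetical and not part of the original source code.

```php
<?php
declare(strict_types=1);

namespace SbDevBlog\Crons\Model;

use Magento\Framework\MessageQueue\PublisherInterface;

/**
 * Hypothetical helper class for illustration (not part of the original module).
 */
class ExamplePublisher
{
    // Topic name as declared in communication.xml and queue_publisher.xml.
    private const TOPIC_NAME = 'sbdevblog.consumer.example';

    private PublisherInterface $publisher;

    public function __construct(PublisherInterface $publisher)
    {
        $this->publisher = $publisher;
    }

    /**
     * Push a message into the queue for the example topic.
     */
    public function publish(string $message): void
    {
        // The payload must match the request="string" type of the topic.
        $this->publisher->publish(self::TOPIC_NAME, $message);
    }
}
```

Any class that receives ExamplePublisher through dependency injection could then call $examplePublisher->publish('some text'), and the message would wait in the queue until the consumer is started.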

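Also, we list the consumer identifier under cron_consumers_runner inside app/etc/env.php. In the snippet below, consumer1 and consumer2 are placeholders for consumer names, cron_run enables (true) or disables (false) running the consumers through cron, and max_messages is the maximum number of messages each consumer processes before it terminates.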

...
    'cron_consumers_runner' => [
        'cron_run' => false,
        'max_messages' => 20000,
        'consumers' => [
            'consumer1',
            'consumer2',
        ]
    ],
...
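For the consumer created in this post, the entry might look like the following sketch. It is only a fragment of env.php with the other keys omitted, and setting cron_run to true is an assumption made here so that cron actually starts the consumer.

```php
    // Fragment of app/etc/env.php (other keys omitted).
    'cron_consumers_runner' => [
        'cron_run' => true,        // assumption: let cron start the consumer
        'max_messages' => 20000,   // same limit as in the example above
        'consumers' => [
            'sbdevblog.consumer.example', // consumer name from queue.xml
        ]
    ],
```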

In the case of Adobe Commerce Cloud, we can define a consumer queue inside the .magento.env.yaml file.

stage:
  deploy:
    CRON_CONSUMERS_RUNNER:
      cron_run: true
      max_messages: 1000
      consumers:
        - example_consumer_1
        - example_consumer_2
      multiple_processes:
        example_consumer_1: 4
        example_consumer_2: 3

Thanks for reading this post on Magento 2: How to Create a Message Queue. Please subscribe and share it with your connections. Also, you can use the comment box to give your feedback.

Note: Please verify the code of this blog and the relevant git repository before using it in production.

🙂 HAPPY CODING 🙂
