【MQ04】Message persistence and confirmation mechanism

The core function of a message queue is sending and receiving messages in order, which we covered earlier. On top of these basic functions, the key guarantees are that messages are neither lost nor delivered more than once. Most message queue products provide these guarantees through a persistence mechanism and a message acknowledgement (confirmation) mechanism. Today we will start with the corresponding features in RabbitMQ.

Persistence

For efficiency and performance, message queue products mostly keep their data in memory. Just like Redis, MongoDB and ES, they use memory as the main storage. Imagine a simple consumer that processes queued data quickly: as soon as the producer sends a message to the queue, the consumer takes it away and consumes it. In this case memory is indeed the most suitable storage, because processing is fast and the messages never take up much memory.

However, we must also account for consumers whose business logic is complex and cannot process messages quickly. In fact, this is the very reason we introduce a message queue in the first place: its most typical use is turning slow, time-consuming operations into asynchronous ones. If messages are produced very quickly but consumption cannot keep up, they accumulate in the queue. We should of course process queued data as fast as possible, for example by running multiple threads or coroutines, or even multiple processes on multiple machines. Even so, consumers may still fall behind the producers. If the server loses power or restarts at that point, every message held only in memory is lost.
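To make the accumulation problem concrete, here is a back-of-the-envelope calculation. The helper `backlogAfter` is hypothetical and the rates are made-up numbers for illustration; it simply assumes constant production and consumption rates.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: how many messages pile up after $seconds, given a
 * constant production rate and $consumers workers that each handle
 * $perConsumerRate messages per second.
 */
function backlogAfter(int $produceRate, int $consumers, int $perConsumerRate, int $seconds): int
{
    $consumeRate = $consumers * $perConsumerRate;
    // The backlog only grows while production outpaces consumption.
    return max(0, ($produceRate - $consumeRate) * $seconds);
}

// 1000 msg/s produced, 4 consumers at 200 msg/s each: 200 msg/s left over.
echo backlogAfter(1000, 4, 200, 3600), PHP_EOL; // 720000 after one hour
// A fifth consumer keeps up, so nothing accumulates.
echo backlogAfter(1000, 5, 200, 3600), PHP_EOL; // 0
```

Without persistence, all 720,000 of those waiting messages would exist only in memory, which is exactly the risk described above.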

This is the problem persistence solves. Put simply, it is the same concept as the Redis persistence we learned before. Remember that Redis has two persistence methods, RDB and AOF? RabbitMQ similarly persists data in the form of an append log. Note, however, that in RabbitMQ the main thing we want to persist is the message data. Queues can also be made durable, and so can exchanges, if you use them.

Persisting queues and exchanges (declaring them as durable) means that when we restart the RabbitMQ instance, the corresponding queues and exchanges still exist. Without it, they are gone after the restart.

Message persistence is what actually preserves the data. A message survives a restart only if it is persistent and sits in a durable queue.
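To make the difference between durable queues and persistent messages concrete, here is a toy model in plain PHP. It is a minimal sketch, assuming a simple JSON-lines append log (similar in spirit to Redis AOF); it is not RabbitMQ's real storage format, it does not remove consumed messages, and `ToyBroker` and its methods are hypothetical names.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical toy broker. Messages always live in memory; a persistent
 * message published to a durable queue is also appended to a log file.
 * "Restarting" the broker means building a new instance from that file.
 * Consumption and log compaction are deliberately left out.
 */
final class ToyBroker
{
    /** @var array<string, list<string>> queue name => messages in memory */
    private array $queues = [];

    public function __construct(private string $logFile)
    {
        // Replaying the append log is the only way data survives a restart.
        if (is_file($logFile)) {
            foreach (file($logFile, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES) as $line) {
                $entry = json_decode($line, true, 512, JSON_THROW_ON_ERROR);
                $this->queues[$entry['queue']][] = $entry['body'];
            }
        }
    }

    public function publish(string $queue, string $body, bool $durableQueue, bool $persistent): void
    {
        $this->queues[$queue][] = $body;
        // Only the combination "durable queue + persistent message" reaches disk.
        if ($durableQueue && $persistent) {
            $line = json_encode(['queue' => $queue, 'body' => $body], JSON_THROW_ON_ERROR) . "\n";
            file_put_contents($this->logFile, $line, FILE_APPEND | LOCK_EX);
        }
    }

    /** @return list<string> */
    public function messages(string $queue): array
    {
        return $this->queues[$queue] ?? [];
    }
}

$log = tempnam(sys_get_temp_dir(), 'toybroker');
$broker = new ToyBroker($log);
$broker->publish('hello', 'kept', durableQueue: true, persistent: true);
$broker->publish('hello', 'lost', durableQueue: true, persistent: false);
$broker->publish('tmp', 'also lost', durableQueue: false, persistent: true);

$restarted = new ToyBroker($log); // simulate a restart
print_r($restarted->messages('hello')); // only "kept" survived
print_r($restarted->messages('tmp'));   // empty
unlink($log);
```

The same rule applies in real RabbitMQ: the queue must be declared durable and the message must be published with DELIVERY_MODE_PERSISTENT; either setting alone is not enough.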

First, declare the queue as durable.

// The third parameter, durable, must be set to true
$channel->queue_declare('hello', false, true, false, false);

Then, when instantiating the message object, make the message persistent by passing the delivery_mode property.

$msg = new AMQPMessage('Hello World!', ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

This completes the persistence settings for both the queue and the message. Now add a message to the queue, restart RabbitMQ, and then run the consumer to check whether the message sent before the restart can still be consumed. Without the configuration above, the consumer will not receive anything. You can try this yourself; I will demonstrate it in detail when I record the video.

Lazy queue

Beyond ordinary persistence, RabbitMQ also provides a feature called “lazy queue”. A lazy queue stores messages on disk as much as possible and loads a message into memory only when a consumer consumes it. One of its main design goals is to support much longer queues, that is, to store many more messages, since disk capacity is far larger than memory. Lazy queues are especially useful when consumers cannot consume messages for a long time for various reasons (for example, consumers that are offline, have crashed, or are shut down for maintenance).

Using the disk inevitably lowers performance, as you can probably guess. How to weigh this trade-off depends on the specific business scenario. If your producers are very fast but consumers process messages slowly because of complex business logic, so that large numbers of messages easily accumulate, then you should use a lazy queue.

Configuring it is also simple: when declaring the queue, pass the x-queue-mode argument and set it to lazy. (Since RabbitMQ 3.12, classic queues already behave this way by default and the x-queue-mode argument is ignored.)

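use PhpAmqpLib\Wire\AMQPTable;
// Redeclaring an existing queue with different settings fails with PRECONDITION_FAILED
$channel->queue_declare('hello', false, false, false, false, false, new AMQPTable([
    'x-queue-mode' => 'lazy',
]));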
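The lazy-queue idea described above can be sketched with a toy file-backed queue: publishing appends to a file, and consuming reads back exactly one message, so memory use stays flat however long the queue grows. This is a simplified illustration assuming one base64-encoded message per line; it is not how RabbitMQ lays out its files, and `ToyLazyQueue` is a hypothetical name.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical toy lazy queue: messages go straight to a file on publish
 * and are read back one at a time on consume, so memory use stays flat.
 */
final class ToyLazyQueue
{
    /** @var resource */
    private $handle;
    private int $readOffset = 0; // byte offset of the next unconsumed message

    public function __construct(string $path)
    {
        $handle = fopen($path, 'w+b');
        if ($handle === false) {
            throw new \RuntimeException("cannot open {$path}");
        }
        $this->handle = $handle;
    }

    public function publish(string $body): void
    {
        fseek($this->handle, 0, SEEK_END);
        // base64 guarantees exactly one message per line
        fwrite($this->handle, base64_encode($body) . "\n");
    }

    /** Loads a single message from disk, or returns null when the queue is empty. */
    public function consume(): ?string
    {
        fseek($this->handle, $this->readOffset);
        $line = fgets($this->handle);
        if ($line === false) {
            return null;
        }
        $this->readOffset = ftell($this->handle);
        return base64_decode(rtrim($line, "\n"));
    }
}

$path = tempnam(sys_get_temp_dir(), 'lazy');
$queue = new ToyLazyQueue($path);
for ($i = 1; $i <= 10000; $i++) {
    $queue->publish("message {$i}"); // written to disk, not kept in a PHP array
}
echo $queue->consume(), PHP_EOL; // message 1
echo $queue->consume(), PHP_EOL; // message 2
```

Every publish and consume touches the disk here, which is exactly where the performance cost of a lazy queue comes from.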

ACK confirmation

Besides persistence, another core feature of most message queue tools is ACK (acknowledgement). This is a bit like the TCP three-way handshake, where the ACK flag is also sent to indicate confirmation, but let's not digress. Without acknowledgements, once a consumer takes a message, the message is deleted from the queue immediately, just like an element POPped from a Redis list.

Now imagine the consumer fails while processing and throws an exception. The message was not actually processed correctly, yet it has already been removed from the queue, so it is lost. With the ACK mechanism, if the consumer fails or its connection is interrupted before it confirms the message, the message is put back into the original queue and consumed again.

Configuring ACK in RabbitMQ is simple. When the consumer subscribes to the queue with basic_consume(), set the fourth parameter, no_ack, to false.

// The fourth parameter, no_ack, must be set to false
$channel->basic_consume('hello', '', false, false, false, false, $callback);

Then, inside the callback function, call the ack() method on the message object passed to the callback.

$callback = function ($msg) {
    echo 'data received: ', $msg->body, PHP_EOL;
    $msg->ack();
};

This completes the ACK setup. Now try commenting out the ack() call in the callback, run the consumer, and once it has consumed the messages, close it and start it again. You will find that messages that were never acknowledged with ack() are delivered again every time. Because the consumer subscribed with no_ack set to false, a message is considered consumed only after ack() has been called on it. Otherwise, whether the client connection fails, the consumer throws an exception, or the delivery exceeds the timeout configured in rabbitmq.conf, the message is put back into the original queue.

The default timeout is 30 minutes, configured through consumer_timeout (in milliseconds) in the rabbitmq.conf file.
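The redelivery behaviour described above can be modelled in a few lines. The sketch below is a toy in-memory queue, not php-amqplib: a delivered message moves to an "unacked" set, ack() removes it for good, and losing the consumer (crash, disconnect or timeout) puts every unacked message back. `ToyAckQueue` and its methods are hypothetical names.

```php
<?php
declare(strict_types=1);

/** Hypothetical toy queue illustrating acknowledgement and requeueing. */
final class ToyAckQueue
{
    /** @var list<string> messages waiting to be delivered */
    private array $ready = [];
    /** @var array<int, string> delivery tag => delivered but not yet acked */
    private array $unacked = [];
    private int $nextTag = 1;

    public function publish(string $body): void
    {
        $this->ready[] = $body;
    }

    /** Delivers the next message as [deliveryTag, body], or null if the queue is empty. */
    public function deliver(): ?array
    {
        if ($this->ready === []) {
            return null;
        }
        $body = array_shift($this->ready);
        $tag = $this->nextTag++;
        $this->unacked[$tag] = $body; // kept until acknowledged
        return [$tag, $body];
    }

    public function ack(int $tag): void
    {
        unset($this->unacked[$tag]); // only now is the message really gone
    }

    /** Consumer crashed, disconnected or timed out: requeue everything it did not ack. */
    public function consumerLost(): void
    {
        foreach ($this->unacked as $body) {
            $this->ready[] = $body; // appended to the tail for simplicity
        }
        $this->unacked = [];
    }

    public function readyCount(): int
    {
        return count($this->ready);
    }
}

$queue = new ToyAckQueue();
$queue->publish('order-1');
$queue->publish('order-2');

[$tag1] = $queue->deliver();
$queue->ack($tag1);      // order-1 processed successfully
$queue->deliver();       // order-2 delivered, but the consumer dies before ack()
$queue->consumerLost();

echo $queue->readyCount(), PHP_EOL; // 1: order-2 is back in the queue
```

Calling $msg->ack() in the real callback plays the role of ack() here; RabbitMQ does the rest of this bookkeeping on the broker side.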

Publisher Confirms

Besides confirming consumption, there is also confirmation of publishing, known as publisher confirms. The ACK confirmation above tells us whether a message has been consumed; a publisher confirm tells us whether a message has been delivered into the queue. The key point is that RabbitMQ has two layers of processing, exchanges and queues. We need to make sure the message actually reaches the queue, where the persistence mechanism then keeps it from being lost.

Put another way, from the business perspective, the core of the producer code is calling the queue interface to send messages. A failed send is really an exception, and in most cases it is caused by network problems. Publisher confirms let us catch this kind of problem. Remember that this mechanism confirms whether a message has been queued, not whether it has been consumed.

Publisher confirms come in several forms: confirming messages one at a time, confirming in batches, and confirming asynchronously through callbacks. Confirming one at a time is the least efficient, but it is sufficient for most applications. Callbacks make it convenient to handle confirmed and unconfirmed messages separately.
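php-amqplib does the confirm bookkeeping internally, so normally you only write the handlers. To show what the batch and callback styles rely on, here is a toy tracker: each publish gets an increasing sequence number, and a broker reply can cover one message or, with the "multiple" flag, every pending message up to a given tag. The ack/nack-with-multiple idea comes from the AMQP confirm protocol; `ToyConfirmTracker` and its methods are hypothetical.

```php
<?php
declare(strict_types=1);

/** Hypothetical toy model of publisher-confirm bookkeeping. */
final class ToyConfirmTracker
{
    /** @var array<int, string> sequence number => body awaiting confirmation */
    private array $pending = [];
    private int $nextSeq = 1;

    public function __construct(private \Closure $onAck, private \Closure $onNack) {}

    public function publish(string $body): int
    {
        $seq = $this->nextSeq++;
        $this->pending[$seq] = $body;
        return $seq;
    }

    /** Broker reply for $tag; with $multiple it also covers every pending tag below it. */
    public function brokerReply(int $tag, bool $ack, bool $multiple = false): void
    {
        foreach ($this->pending as $seq => $body) { // iterates a snapshot, so unset is safe
            if ($seq === $tag || ($multiple && $seq < $tag)) {
                unset($this->pending[$seq]);
                ($ack ? $this->onAck : $this->onNack)($body);
            }
        }
    }

    public function pendingCount(): int
    {
        return count($this->pending);
    }
}

$failed = [];
$tracker = new ToyConfirmTracker(
    function (string $body): void { echo 'confirmed: ', $body, PHP_EOL; },
    function (string $body) use (&$failed): void { $failed[] = $body; },
);
foreach (['a', 'b', 'c', 'd'] as $body) {
    $tracker->publish($body);
}
$tracker->brokerReply(3, ack: true, multiple: true); // one reply confirms a, b and c
$tracker->brokerReply(4, ack: false);                // d was rejected
echo $tracker->pendingCount(), PHP_EOL; // 0
print_r($failed);                       // d
```

Batch confirmation is efficient precisely because one reply can settle many messages; the price is that a single nack makes you work out which messages in the batch to resend.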

// ……………………
$channel->confirm_select(); // Turn on publisher confirms

// Confirm callback
$channel->set_ack_handler(
    function (AMQPMessage $message) {
        echo 'The message has been sent successfully!', $message->body, PHP_EOL;
    }
);
// Failure callback
$channel->set_nack_handler(
    function (AMQPMessage $message) {
        echo 'Message sending failed, we have to do something else!', $message->body, PHP_EOL;
    }
);
// ……………………
// ……………………
// ……………………
$channel->basic_publish($msg, '', 'hello'); // Put the message in the queue
// Wait up to 5 seconds for the broker's reply; the handlers above are invoked while waiting
$channel->wait_for_pending_acks(5); // single confirmation
// ……………………

If a message fails to be published, we can handle it specially, for example by recording it in a log, storing it in a MySQL database, or putting it in another queue to be processed by a dedicated consumer. Under normal circumstances the message is sent successfully:

> php 4.rq.p.php
The producer sends information to the message queue:Hello World!
The message has been sent successfully!Hello World!
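As one example of the special handling described above, a nack handler could write the rejected message to a local JSON-lines file so a separate job can republish it later. The sketch below shows only that file-based option; `recordFailedMessage` and `loadFailedMessages` are hypothetical helpers, and inside the real nack handler you would call something like recordFailedMessage($path, $message->body, 'nack').

```php
<?php
declare(strict_types=1);

/** Hypothetical fallback: append an unconfirmed message to a JSON-lines file. */
function recordFailedMessage(string $path, string $body, string $reason): void
{
    $entry = ['time' => date(DATE_ATOM), 'reason' => $reason, 'body' => $body];
    $line = json_encode($entry, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE) . "\n";
    file_put_contents($path, $line, FILE_APPEND | LOCK_EX); // LOCK_EX: safe with several producers
}

/**
 * Reads the recorded bodies back so a separate job can republish them.
 * @return list<string>
 */
function loadFailedMessages(string $path): array
{
    if (!is_file($path)) {
        return [];
    }
    $bodies = [];
    foreach (file($path, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES) as $line) {
        $bodies[] = json_decode($line, true, 512, JSON_THROW_ON_ERROR)['body'];
    }
    return $bodies;
}

$path = sys_get_temp_dir() . '/failed-messages-demo.jsonl';
if (is_file($path)) {
    unlink($path); // start from a clean file for this demo
}
recordFailedMessage($path, 'Hello World!', 'nack');
print_r(loadFailedMessages($path));
```

A file is the simplest choice because it does not depend on the network that may have caused the failure in the first place; a database table or a second queue works the same way but adds another dependency.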

Using the Redis driver in Laravel

We said before that the List, Pub/Sub and Stream features in Redis do not make a complete message queue. The main reason is that Redis lacks a complete ACK mechanism.

Persistence is not the problem: Redis's RDB and AOF persistence mechanisms can persist the data in a queue as well.

The missing ACK mechanism can actually be compensated for in business code. For example, the queue components of Laravel and the ThinkPHP (TP) framework provide a retry feature. It is not a complete ACK mechanism, but it can be regarded as a supplement to one. We can specify the number of attempts when running the queue worker.

php artisan queue:work --tries=3

This way, each job in the queue is attempted at most three times. To simulate a consumption failure, we can simply throw an exception in the Job.

public function handle()
{
  echo 'Message received:' . $this->msg, ' ', time(), PHP_EOL;
  sleep(10);
  // Simulate a consumption failure; the leading backslash refers to PHP's global Exception class
  throw new \Exception();
}

Then observe the queue data stored in Redis.

{
 "uuid": "4a38d37b-86e7-4755-a29a-f6843b7289cc",
 "timeout": null,
 "id": "mg3RA7n3JW7CB3WUllKXTT6sPUvdJ0rF",
 "backoff": null,
 "displayName": "App\\Jobs\\Queue4",
 "maxTries": null,
 "failOnTimeout": false,
 "maxExceptions": null,
 "retryUntil": null,
 "job": "Illuminate\\Queue\\CallQueuedHandler@call",
 "data": {
  "command": "O:15:\"App\\Jobs\\Queue4\":11:{s:3:\"msg\";s:6:\"\xe6\xb5\x8b\xe8\xaf\x95\";s:3:\"job\";N;s:10:\"connection\";N;s:5:\"queue\";N;s:15:\"chainConnection\";N;s:10:\"chainQueue\";N;s:19:\"chainCatchCallbacks\";N;s:5:\"delay\";N;s:11:\"afterCommit\";N;s:10:\"middleware\";a:0:{}s:7:\"chained\";a:0:{}}",
  "commandName": "App\\Jobs\\Queue4"
 },
 "attempts": 2   // Note: this value keeps increasing with each attempt
}

The last field, attempts, records how many times this job has been attempted. Once the number of attempts we allowed is used up, the worker marks the job as failed instead of running it again.
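The retry bookkeeping boils down to a counter stored in the payload. Here is a toy worker that mimics the idea, with a plain PHP array standing in for the Redis list: it increments attempts on every pop, pushes the job back if handling throws, and marks it failed once the limit is reached. It is a simplification, not Laravel's actual Worker code, and `runToyWorker` is a hypothetical name.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical toy worker. $queue holds JSON payloads shaped loosely like the
 * one above; returns the payloads that ended up failed.
 *
 * @param list<string> $queue
 * @return list<array<string, mixed>>
 */
function runToyWorker(array $queue, callable $handle, int $maxTries): array
{
    $failed = [];
    while ($queue !== []) {
        $payload = json_decode(array_shift($queue), true, 512, JSON_THROW_ON_ERROR);
        $payload['attempts'] = ($payload['attempts'] ?? 0) + 1; // counted on every pop

        try {
            $handle($payload['data']);
        } catch (\Throwable $e) {
            if ($payload['attempts'] >= $maxTries) {
                $failed[] = $payload;             // limit reached: mark as failed
            } else {
                $queue[] = json_encode($payload); // release it back for another try
            }
        }
    }
    return $failed;
}

$calls = 0;
$failed = runToyWorker(
    [json_encode(['displayName' => 'App\\Jobs\\Queue4', 'data' => 'test', 'attempts' => 0])],
    function (string $msg) use (&$calls): void {
        $calls++;
        echo 'Message received:', $msg, PHP_EOL;
        throw new \Exception('simulated failure');
    },
    3, // same role as --tries=3
);
echo "handled {$calls} times, failed jobs: ", count($failed), PHP_EOL; // handled 3 times, failed jobs: 1
```

Because the counter travels inside the payload, any worker process that pops the job knows how many attempts have already been made, which is what lets a plain Redis list imitate part of an ACK mechanism.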

[2022-12-31 03:57:03][mg3RA7n3JW7CB3WUllKXTT6sPUvdJ0rF] Processing: App\Jobs\Queue4
Message received:test 1672459023
[2022-12-31 03:57:13][mg3RA7n3JW7CB3WUllKXTT6sPUvdJ0rF] Processing: App\Jobs\Queue4
Message received:test 1672459033
[2022-12-31 03:57:23][mg3RA7n3JW7CB3WUllKXTT6sPUvdJ0rF] Failed:     App\Jobs\Queue4

This behaviour comes from the Laravel framework code, and the ThinkPHP queue component works in a similar way. The relevant code is in /vendor/laravel/framework/src/Illuminate/Queue/Worker.php, where the following method is eventually called.

// If a given job has exceeded the maximum number of allowed attempts, mark it as failed.
markJobAsFailedIfAlreadyExceedsMaxAttempts()

Summary

Today we mainly studied message persistence and confirmation. These two mechanisms are how message queue systems deal with lost and duplicated messages. We also saw that when Laravel uses Redis as its queue driver, it achieves similar functionality through business code and special fields in the queue payload. This also shows that although Redis can serve as a queue, it does not have all the features of a complete message queue.

RabbitMQ also offers transactions. The publisher confirms covered above serve a similar purpose and are a lighter-weight alternative to transactions, so you can explore the transaction features on your own. They mainly use three methods: $channel->tx_select(), $channel->tx_commit() and $channel->tx_rollback().

Okay, we will see a few more examples like this next, and the learning continues.