Очередь Redis

Очередь сообщений на основе Redis, поддерживающая отложенную обработку сообщений.

Установка

composer require webman/redis-queue

Файл конфигурации

Файл конфигурации Redis автоматически создается в config/plugin/webman/redis-queue/redis.php и имеет следующее содержимое:

<?php
return [
    'default' => [
        'host' => 'redis://127.0.0.1:6379',
        'options' => [
            'auth' => '',         // Пароль, необязательный параметр
            'db' => 0,            // База данных
            'max_attempts'  => 5, // Количество попыток повторной обработки после неудачного выполнения
            'retry_seconds' => 5, // Интервал повторной обработки в секундах
        ]
    ],
];

Повторная обработка при неудачной попытке

Если обработка сообщения завершилась неудачей (возникло исключение), то сообщение будет помещено в отложенную очередь и будет ожидать повторной обработки. Количество попыток повторной обработки контролируется параметром max_attempts, а интервал между повторами устанавливается с помощью retry_seconds. Например, если max_attempts установлено в 5, а retry_seconds в 10, то время ожидания перед первой повторной обработкой будет 1*10 секунд, перед второй - 2*10 секунд, и так далее, до пятой попытки. Если количество попыток превысит установленное значение max_attempts, то сообщение будет помещено в очередь неудачных сообщений redis-queue-failed.

Отправка сообщения (синхронно)

Примечание
Требуется webman/redis >= 1.2.0, зависимость от расширения redis

<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Redis;

class Index
{
    public function queue(Request $request)
    {
        // Название очереди
        $queue = 'send-mail';
        // Данные, можно передавать массив напрямую, не нужно сериализовать
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // Отправка сообщения
        Redis::send($queue, $data);
        // Отправка отложенного сообщения, которое будет обработано через 60 секунд
        Redis::send($queue, $data, 60);

        return response('Тест очереди Redis');
    }

}

Успешная отправка сообщения Redis::send() вернет true, в противном случае вернется false или будет вызвано исключение.

Подсказка
Время обработки отложенной очереди может быть неточным, например из-за недостаточной скорости обработки, что может привести к задержке обработки очереди. Для смягчения этого можно запустить несколько процессов обработки.

Отправка сообщения (асинхронно)

<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Client;

class Index
{
    public function queue(Request $request)
    {
        // Название очереди
        $queue = 'send-mail';
        // Данные, можно передавать массив напрямую, не нужно сериализовать
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // Отправка сообщения
        Client::send($queue, $data);
        // Отправка отложенного сообщения, которое будет обработано через 60 секунд
        Client::send($queue, $data, 60);

        return response('Тест очереди Redis');
    }

}

Client::send() не возвращает значения, так как это асинхронная отправка, которая не гарантирует 100% доставку сообщения в Redis.

Подсказка
Принцип работы Client::send() заключается в создании очереди в локальной памяти и асинхронном ее синхронизации с Redis (синхронизация происходит очень быстро, около 10 000 сообщений в секунду). Если процесс перезапустится, и данные из очереди локальной памяти не будут синхронизированы, то это может привести к потере сообщений. Client::send() подходит для асинхронной отправки неважных сообщений.

Подсказка
Client::send() работает асинхронно и может использоваться только в среде запуска Workerman, для командных файлов следует использовать синхронный интерфейс Redis::send()

Отправка сообщения из другого проекта

Иногда требуется отправить сообщение из другого проекта, и нельзя использовать webman/redis-queue. В этом случае можно воспользоваться следующей функцией для отправки сообщений в очередь:

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting';
    $queue_delay = '{redis-queue}-delayed';
    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

Где параметр $redis - это экземпляр Redis. Пример использования расширения Redis:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);

Потребление

Файл конфигурации процесса потребления находится в config/plugin/webman/redis-queue/process.php.
Потребители находятся в каталоге app/queue/redis/.

Запуск команды php webman redis-queue:consumer my-send-mail создаст файл app/queue/redis/MyMailSend.php.

Подсказка
Если команда отсутствует, то можно создать файл вручную.

<?php

namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class MyMailSend implements Consumer
{
    // Название очереди для потребления
    public $queue = 'send-mail';

    // Имя соединения, соответствует соединению из `plugin/webman/redis-queue/redis.php`
    public $connection = 'default';

    // Потребление
    public function consume($data)
    {
        // Нет необходимости в десериализации
        var_export($data); // Вывод ['to' => 'tom@gmail.com', 'content' => 'hello']
    }
}

Внимание
Отсутствие выброса исключения и ошибки в процессе потребления считается успешным, в противном случае потребление будет считаться неудачным и сообщение будет заново отправлено на обработку.
В Redis-queue нет механизма подтверждения прочтения (ack), и его можно рассматривать как автоматическое подтверждение прочтения (при отсутствии исключения или ошибки). Если в процессе потребления необходимо отметить текущее сообщение как необработанное, можно вручную вызвать исключение, чтобы сообщение снова было отправлено на обработку. Фактически это ничем не отличается от механизма ack.

Подсказка
Потребитель поддерживает многопроцессорные серверы, и одно сообщение не будет обработано повторно. Обработанное сообщение автоматически удаляется из очереди, не требуя ручного удаления.

Подсказка
Потребитель может одновременно обрабатывать несколько разных очередей, для добавления новой очереди не требуется изменение конфигурации в файле process.php. Для добавления нового потребителя очереди достаточно добавить соответствующий класс Consumer в каталог app/queue/redis и указать через свойство класса$queue название очереди для потребления.

Подсказка
Пользователям Windows необходимо запускать PHP через файл windows.php для запуска Webman, в противном случае процессы потребления не будут запущены.

Настройка различных процессов потребления для разных очередей

По умолчанию все потребители используют общие процессы потребления. Однако иногда нам нужно отделить потребление некоторых очередей, например, разместить медленные задачи в одну группу процессов, а быстрые задачи - в другую. Для этого мы можем разделить потребителей на два каталога, например, app_path() . '/queue/redis/fast' и app_path() . '/queue/redis/slow' (обратите внимание, что пространство имен потребителей должно быть соответствующим), затем настроить следующим образом:


return [
... здесь опущены другие настройки ...
'redis_consumer_fast'  => [
    'handler'     => Webman\RedisQueue\Process\Consumer::class,
    'count'       => 8,
    'constructor' => [
        // Каталог классов потребителя
        'consumer_dir' => app_path() . '/queue/redis/fast'
    ]
],
'redis_consumer_slow'  => [
    'handler'     => Webman\RedisQueue\Process\Consumer::class,
    'count'       => 8,
    'constructor' => [
        // Каталог классов потребителя
        'consumer_dir' => app_path() . '/queue/redis/slow'
    ]
]

];


Путем классификации каталогов и соответствующей конфигурации мы можем легко настроить различные процессы потребления для различных потребителей.

## Несколько конфигураций Redis
#### Конфигурация
`config/plugin/webman/redis-queue/redis.php`
```php
<?php
return [
    'default' => [
        'host' => 'redis://192.168.0.1:6379',
        'options' => [
            'auth' => null,       // Пароль, строковый тип, опциональный параметр
            'db' => 0,            // База данных
            'max_attempts'  => 5, // Количество попыток перезапуска после неудачи
            'retry_seconds' => 5, // Интервал повторной попытки, в секундах
        ]
    ],
    'other' => [
        'host' => 'redis://192.168.0.2:6379',
        'options' => [
            'auth' => null,       // Пароль, строковый тип, опциональный параметр
            'db' => 0,            // База данных
            'max_attempts'  => 5, // Количество попыток перезапуска после неудачи
            'retry_seconds' => 5, // Интервал повторной попытки, в секундах
        ]
    ],
];

Обратите внимание, что в конфигурации добавлено еще одно значение other как ключ для конфигурации Redis.

Отправка сообщений в несколько Redis

// Отправка сообщений в очередь с ключом `default`
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
//  То же самое, что и
Client::send($queue, $data);
Redis::send($queue, $data);

// Отправка сообщений в очередь с ключом `other`
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);

Потребление из нескольких Redis

Настройка потребления сообщений из очереди с ключом other

namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class SendMail implements Consumer
{
    // Имя очереди для потребления
    public $queue = 'send-mail';

    // === Установлено как "other", что соответствует настройке потребления для очереди с ключом "other" ===
    public $connection = 'other';

    // Потребление
    public function consume($data)
    {
        // Не нуждается в десериализации
        var_export($data);
    }
}

Часто задаваемые вопросы

Почему возникает ошибка "Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds)"

Эта ошибка возникает только при использовании асинхронного метода отправки Client::send(). Сначала сообщение сохраняется в локальной памяти, а затем, когда процесс не занят, оно отправляется в Redis. Если скорость приема сообщений Redis медленнее, чем скорость их создания или если процесс постоянно занят выполнением других задач и не имеет достаточно времени для отправки сообщений из памяти в Redis, возникает сжатие сообщений. Если сообщение остается в памяти более 600 секунд, возникает эта ошибка.

Решение: Вместо асинхронной отправки сообщений используйте синхронный метод Redis::send().