Redis очередь

Сообщенийная очередь на основе Redis, поддерживает задержку обработки сообщений.

Установка

composer require webman/redis-queue

Конфигурационный файл

Конфигурационный файл Redis автоматически создается по адресу {основной_проект}/config/plugin/webman/redis-queue/redis.php, его содержание похоже на следующее:

<?php
return [
    'default' => [
        'host' => 'redis://127.0.0.1:6379',
        'options' => [
            'auth' => '',         // Пароль, необязательный параметр
            'db' => 0,            // База данных
            'max_attempts'  => 5, // Количество повторных попыток при неудаче
            'retry_seconds' => 5, // Интервал повторной попытки, в секундах
        ]
    ],
];

Повторные попытки при неудаче

Если обработка завершается неудачей (возникает исключение), сообщение помещается в очередь задержки, ожидая следующей попытки. Количество попыток контролируется параметром max_attempts, а интервал повторной попытки контролируется retry_seconds и max_attempts вместе. Например, если max_attempts равно 5, а retry_seconds равно 10, то интервал для первой повторной попытки составит 1*10 секунд, для второй повторной попытки — 2*10 секунд, для третьей — 3*10 секунд, и так далее, пока не будет достигнуто 5 попыток. Если количество попыток превышает установленное значение max_attempts, то сообщение помещается в очередь ошибок с ключом {redis-queue}-failed.

Отправка сообщения (синхронно)

<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Redis;

class Index
{
    public function queue(Request $request)
    {
        // Имя очереди
        $queue = 'send-mail';
        // Данные, можно передать массив без сериализации
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // Отправка сообщения
        Redis::send($queue, $data);
        // Отправка сообщения с задержкой, сообщение будет обработано через 60 секунд
        Redis::send($queue, $data, 60);

        return response('redis queue test');
    }

}

При успешной отправке Redis::send() вернет true, в противном случае — false или выбросит исключение.

Подсказка
Время потребления из очереди с задержкой может отличаться от ожидаемого, например, если скорость потребления меньше скорости производства, это может привести к накоплению очереди и задержке в потреблении. Решение — запустить больше процессов потребления.

Отправка сообщения (асинхронно)

<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Client;

class Index
{
    public function queue(Request $request)
    {
        // Имя очереди
        $queue = 'send-mail';
        // Данные, можно передать массив без сериализации
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // Отправка сообщения
        Client::send($queue, $data);
        // Отправка сообщения с задержкой, сообщение будет обработано через 60 секунд
        Client::send($queue, $data, 60);

        return response('redis queue test');
    }

}

Client::send() не возвращает значение, это асинхронная отправка, она не гарантирует 100% доставку сообщения в Redis.

Подсказка
Принцип работы Client::send() заключается в создании очереди в локальной памяти, сообщения асинхронно синхронизируются с Redis (скорость синхронизации очень высокая, примерно 10 тысяч сообщений в секунду). Если процесс перезапускается, и данные в локальной памяти не синхронизированы полностью, это приведет к потере сообщения. Асинхронная отправка Client::send() подходит для отправки не важных сообщений.

Подсказка
Client::send() является асинхронным и может использоваться только в среде выполнения workerman, для командной строки следует использовать синхронный интерфейс Redis::send().

Отправка сообщений из других проектов

Иногда вам нужно отправить сообщения из других проектов и вы не можете использовать webman\redis-queue, тогда вы можете воспользоваться следующей функцией для отправки сообщений в очередь.

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting';
    $queue_delay = '{redis-queue}-delayed';
    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

При этом параметр $redis является экземпляром Redis. Например, использование расширения Redis может выглядеть следующим образом:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);

Потребление

Конфигурационный файл процесса потребления находится по адресу {основной_проект}/config/plugin/webman/redis-queue/process.php.
Директория потребителей находится в {основной_проект}/app/queue/redis/.

Выполнение команды php webman redis-queue:consumer my-send-mail создаст файл {основной_проект}/app/queue/redis/MyMailSend.php

Подсказка
Если команды не существует, вы также можете создать файл вручную.

<?php

namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class MyMailSend implements Consumer
{
    // Имя очереди для потребления
    public $queue = 'send-mail';

    // Имя подключения, соответствует подключению в plugin/webman/redis-queue/redis.php
    public $connection = 'default';

    // Потребление
    public function consume($data)
    {
        // Сериализация не требуется
        var_export($data); // Вывод ['to' => 'tom@gmail.com', 'content' => 'hello']
    }
    // Обратный вызов при неудаче потребления
    /* 
    $package = [
        'id' => 1357277951, // ID сообщения
        'time' => 1709170510, // Время сообщения
        'delay' => 0, // Время задержки
        'attempts' => 2, // Количество попыток
        'queue' => 'send-mail', // Имя очереди
        'data' => ['to' => 'tom@gmail.com', 'content' => 'hello'], // Содержимое сообщения
        'max_attempts' => 5, // Максимальное количество повторных попыток
        'error' => 'Ошибка информации' // Сообщение об ошибке
    ]
    */
    public function onConsumeFailure(\Throwable $e, $package)
    {
        echo "Потребление неудачно\n";
        echo $e->getMessage() . "\n";
        // Сериализация не требуется
        var_export($package); 
    }
}

Внимание
Если во время обработки исключения или ошибки не возникает, это считается успешным потреблением, в противном случае считается неудачным, и сообщение попадает в очередь повторной попытки.
В Redis-очереди нет механизма подтверждения (ack), его можно рассматривать как автоматический ack (если исключение или ошибка не были сгенерированы). Если во время обработки сообщения вы хотите пометить текущее сообщение как неуспешное, вы можете вручную выбросить исключение, чтобы текущее сообщение попало в очередь повторных попыток. Это на самом деле не отличается от механизма ack.

Подсказка
Потребители поддерживают многосерверную и многопроцессорную архитектуру, и одно и то же сообщение не будет потребляться повторно. Потребленные сообщения автоматически удаляются из очереди, ручное удаление не требуется.

Подсказка
Процесс потребления может одновременно обрабатывать множество различных очередей, добавление новых очередей не требует изменения конфигурации в process.php, для добавления новых потребителей очереди необходимо просто добавить соответствующий класс Consumer в app/queue/redis и указать имя очереди, которую вы хотите обрабатывать, с помощью свойства класса $queue.

Подсказка
Пользователям Windows необходимо запускать php windows.php для запуска Webman, иначе процесс потребления не будет запущен.

Подсказка
Обратный вызов onConsumeFailure будет вызываться каждый раз при неудаче потребления, вы можете обработать в нем логику после неудачи. (Эта функция требует webman/redis-queue>=1.3.2 и workerman/redis-queue>=1.2.1)

Установка различных процессов потребления для различных очередей

По умолчанию, все потребители используют один и тот же процесс потребления. Но иногда нам необходимо раздельно обрабатывать некоторые очереди потребления, например, медленные бизнес-процессы — в одной группе процессов, а быстрые — в другой группе. Для этого мы можем разделить потребителей на две директории, например, app_path() . '/queue/redis/fast' и app_path() . '/queue/redis/slow' (обратите внимание, что пространство имен классов потребителей необходимо изменить соответственно), тогда конфигурация будет выглядеть следующим образом:

return [
    ...другие настройки опущены...

    'redis_consumer_fast'  => [ // ключ настраиваемый, нет ограничений по формату, здесь назван redis_consumer_fast
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8,
        'constructor' => [
            // Директория классов потребителей
            'consumer_dir' => app_path() . '/queue/redis/fast'
        ]
    ],
    'redis_consumer_slow'  => [  // ключ настраиваемый, нет ограничений по формату, здесь назван redis_consumer_slow
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8,
        'constructor' => [
            // Директория классов потребителей
            'consumer_dir' => app_path() . '/queue/redis/slow'
        ]
    ]
];

Таким образом, быстрые бизнес-процессы помещаются в директорию queue/redis/fast, а медленные бизнес-процессы помещаются в директорию queue/redis/slow, чтобы задать очередь для отдельных процессов потребления.

Многочисленные конфигурации Redis

Конфигурация

config/plugin/webman/redis-queue/redis.php

<?php
return [
    'default' => [
        'host' => 'redis://192.168.0.1:6379',
        'options' => [
            'auth' => null,       // Пароль, строкового типа, необязательный параметр
            'db' => 0,            // База данных
            'max_attempts'  => 5, // Количество повторных попыток при неудаче
            'retry_seconds' => 5, // Интервал повторной попытки, в секундах
        ]
    ],
    'other' => [
        'host' => 'redis://192.168.0.2:6379',
        'options' => [
            'auth' => null,       // Пароль, строкового типа, необязательный параметр
            'db' => 0,            // База данных
            'max_attempts'  => 5, // Количество повторных попыток при неудаче
            'retry_seconds' => 5, // Интервал повторной попытки, в секундах
        ]
    ],
];

Обратите внимание, что в конфигурацию добавлен параметр other как ключ для конфигурации Redis.

Многочисленные отправки сообщений в Redis

// Отправка сообщения в очередь с ключом `default`
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
//  То же самое, что и
Client::send($queue, $data);
Redis::send($queue, $data);

// Отправка сообщения в очередь с ключом `other`
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);

Многочисленные потребления из Redis

Конфигурация для потребления, где ключом будет other

namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class SendMail implements Consumer
{
    // Имя очереди для потребления
    public $queue = 'send-mail';

    // === Здесь устанавливается значение other, что соответствует конфигурации с ключом other ===
    public $connection = 'other';

    // Потребление
    public function consume($data)
    {
        // Сериализация не требуется
        var_export($data);
    }
}

Часто задаваемые вопросы

Почему возникает ошибка Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds)?

Эта ошибка возникнет только в асинхронном методе отправки Client::send(). Асинхронная отправка сначала сохраняет сообщения в локальной памяти, а затем, когда процесс свободен, отправляет сообщения в Redis. Если скорость приема Redis медленнее, чем скорость производства сообщений, или если процесс долго загружен другими задачами и не имеет достаточно времени для синхронизации сообщений из памяти в Redis, это может привести к задержке сообщений. Если задержка превосходит 600 секунд, возникает эта ошибка.

Решение: используйте синхронный метод отправки Redis::send().