Cola de Redis

Una cola de mensajes basada en Redis, que admite el procesamiento de mensajes con retraso.

Instalación

composer require webman/redis-queue

Archivo de configuración

El archivo de configuración de Redis se genera automáticamente en {proyecto principal}/config/plugin/webman/redis-queue/redis.php, y su contenido es similar al siguiente:

<?php
return [
    'default' => [
        'host' => 'redis://127.0.0.1:6379',
        'options' => [
            'auth' => '',         // Contraseña, parámetro opcional
            'db' => 0,            // Base de datos
            'max_attempts'  => 5, // Número de reintentos después de un fallo en el consumo
            'retry_seconds' => 5, // Intervalo de reintento, en segundos
        ]
    ],
];

Reintentos en caso de fallo de consumo

Si el consumo falla (ocurre una excepción), el mensaje se colocará en una cola de retraso, esperando el próximo reintento. El número de reintentos se controla a través del parámetro max_attempts, y el intervalo de reintento es controlado conjuntamente por retry_seconds y max_attempts. Por ejemplo, si max_attempts es 5 y retry_seconds es 10, el primer intervalo de reintento será 1*10 segundos, el segundo intervalo de reintento será 2*10 segundos, el tercero será 3*10 segundos, y así sucesivamente hasta un máximo de 5 intentos. Si se supera el número de reintentos definido en max_attempts, el mensaje se colocará en la cola de fallos con la clave {redis-queue}-failed.

Envío de mensajes (sincrónico)

<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Redis;

class Index
{
    public function queue(Request $request)
    {
        // Nombre de la cola
        $queue = 'send-mail';
        // Datos, se puede pasar directamente el array, no es necesario serializar
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // Envío del mensaje
        Redis::send($queue, $data);
        // Envío de un mensaje con retraso, el mensaje será procesado después de 60 segundos
        Redis::send($queue, $data, 60);

        return response('prueba de cola redis');
    }

}

Si el envío es exitoso, Redis::send() devolverá true, de lo contrario devolverá false o lanzará una excepción.

Sugerencia
El tiempo de consumo de la cola de retraso puede presentar discrepancias, por ejemplo, si la velocidad de consumo es menor que la velocidad de producción, lo que lleva a una acumulación en la cola y, por tanto, a un retraso en el consumo. Una solución es abrir más procesos de consumo.

Envío de mensajes (asincrónico)

<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Client;

class Index
{
    public function queue(Request $request)
    {
        // Nombre de la cola
        $queue = 'send-mail';
        // Datos, se puede pasar directamente el array, no es necesario serializar
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // Envío del mensaje
        Client::send($queue, $data);
        // Envío de un mensaje con retraso, el mensaje será procesado después de 60 segundos
        Client::send($queue, $data, 60);

        return response('prueba de cola redis');
    }

}

Client::send() no tiene valor de retorno, pertenece a la categoría de envío asincrónico, y no garantiza que el mensaje se entregue al 100% a Redis.

Sugerencia
El principio de Client::send() es establecer una cola en memoria local y enviar los mensajes de manera asincrónica a Redis (con una velocidad de sincronización muy rápida, alrededor de 10,000 mensajes por segundo). Si el proceso se reinicia y los datos en la cola de memoria local no se han sincronizado completamente, puede resultar en la pérdida de mensajes. Client::send() es adecuado para enviar mensajes que no son críticos.

Sugerencia
Client::send() es asincrónico y solo puede ser utilizado en el entorno de ejecución de workerman. Para scripts de línea de comandos, debe usarse la interfaz sincrónica Redis::send().

Envío de mensajes desde otros proyectos

A veces, es necesario enviar mensajes desde otros proyectos sin poder usar webman\redis-queue. En este caso, puedes usar la siguiente función para enviar mensajes a la cola.

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting';
    $queue_delay = '{redis-queue}-delayed';
    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

Aquí, el parámetro $redis es una instancia de redis. Por ejemplo, el uso de la extensión de redis es similar al siguiente:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);

Consumo

El archivo de configuración del proceso de consumo se encuentra en {proyecto principal}/config/plugin/webman/redis-queue/process.php.
El directorio de consumidores está en {proyecto principal}/app/queue/redis/.

Ejecutar el comando php webman redis-queue:consumer my-send-mail generará el archivo {proyecto principal}/app/queue/redis/MyMailSend.php

Sugerencia
Si el comando no existe, también se puede generar manualmente.

<?php

namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class MyMailSend implements Consumer
{
    // Nombre de la cola a consumir
    public $queue = 'send-mail';

    // Nombre de la conexión, correspondiente a plugin/webman/redis-queue/redis.php
    public $connection = 'default';

    // Consumo
    public function consume($data)
    {
        // No es necesario deserializar
        var_export($data); // Salida: ['to' => 'tom@gmail.com', 'content' => 'hello']
    }
    // Callback en caso de fallo en el consumo
    /* 
    $package = [
        'id' => 1357277951, // ID del mensaje
        'time' => 1709170510, // Hora del mensaje
        'delay' => 0, // Tiempo de retraso
        'attempts' => 2, // Número de intentos de consumo
        'queue' => 'send-mail', // Nombre de la cola
        'data' => ['to' => 'tom@gmail.com', 'content' => 'hello'], // Contenido del mensaje
        'max_attempts' => 5, // Número máximo de reintentos
        'error' => 'Información de error' // Información de error
    ]
    */
    public function onConsumeFailure(\Throwable $e, $package)
    {
        echo "fallo en el consumo\n";
        echo $e->getMessage() . "\n";
        // No es necesario deserializar
        var_export($package); 
    }
}

Nota
Si no se lanza ninguna excepción ni Error durante el proceso de consumo, se considera un consumo exitoso; de lo contrario, se considera un fallo en el consumo y se entra en la cola de reintento.
La cola de redis-queue no tiene mecanismo de ack, se puede considerar como un ack automático (sin excepciones ni Error). Si durante el proceso de consumo se desea marcar el mensaje actual como no consumido, se puede lanzar manualmente una excepción para que el mensaje actual entre en la cola de reintento. Esto en realidad no difiere del mecanismo de ack.

Sugerencia
Los consumidores admiten múltiples servidores y múltiples procesos, y un mismo mensaje no será consumido varias veces. Los mensajes ya consumidos se eliminarán automáticamente de la cola, no es necesario eliminarlos manualmente.

Sugerencia
Un proceso de consumo puede consumir simultáneamente diferentes colas. No es necesario modificar la configuración en process.php al agregar una nueva cola; al agregar un nuevo consumidor de cola, solo se necesita crear una nueva clase Consumer en app/queue/redis y usar la propiedad de clase $queue para especificar el nombre de la cola a consumir.

Sugerencia
Los usuarios de Windows deben ejecutar php windows.php para iniciar webman, de lo contrario, el proceso de consumo no se iniciará.

Sugerencia
El callback onConsumeFailure se activa cada vez que ocurre un fallo en el consumo, aquí puedes manejar la lógica posterior a un fallo. (Esta característica requiere webman/redis-queue>=1.3.2 workerman/redis-queue>=1.2.1)

Configuración de diferentes procesos de consumo para diferentes colas

Por defecto, todos los consumidores comparten el mismo proceso de consumo. Sin embargo, a veces es necesario hacer que el consumo de algunas colas sea independiente, por ejemplo, colocar los negocios de consumo lento en un grupo de procesos y los de consumo rápido en otro grupo. Para ello, podemos dividir los consumidores en dos directorios, como app_path() . '/queue/redis/fast' y app_path() . '/queue/redis/slow' (ten en cuenta que el espacio de nombres de las clases consumidoras debe modificarse en consecuencia), la configuración sería la siguiente:

return [
    ...aquí se omiten otras configuraciones...

    'redis_consumer_fast'  => [ // La clave es personalizada, no tiene restricciones de formato, aquí se nombra redis_consumer_fast
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8,
        'constructor' => [
            // Directorio de clases consumidoras
            'consumer_dir' => app_path() . '/queue/redis/fast'
        ]
    ],
    'redis_consumer_slow'  => [  // La clave es personalizada, no tiene restricciones de formato, aquí se nombra redis_consumer_slow
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8,
        'constructor' => [
            // Directorio de clases consumidoras
            'consumer_dir' => app_path() . '/queue/redis/slow'
        ]
    ]
];

De esta manera, los consumidores de negocios rápidos se colocan en el directorio queue/redis/fast, mientras que los de negocios lentos se colocan en el directorio queue/redis/slow, permitiendo así asignar procesos de consumo a cada cola.

Configuración de múltiples redis

Configuración

config/plugin/webman/redis-queue/redis.php

<?php
return [
    'default' => [
        'host' => 'redis://192.168.0.1:6379',
        'options' => [
            'auth' => null,       // Contraseña, tipo string, parámetro opcional
            'db' => 0,            // Base de datos
            'max_attempts'  => 5, // Número de reintentos después de un fallo en el consumo
            'retry_seconds' => 5, // Intervalo de reintento, en segundos
        ]
    ],
    'other' => [
        'host' => 'redis://192.168.0.2:6379',
        'options' => [
            'auth' => null,       // Contraseña, tipo string, parámetro opcional
            'db' => 0,             // Base de datos
            'max_attempts'  => 5, // Número de reintentos después de un fallo en el consumo
            'retry_seconds' => 5, // Intervalo de reintento, en segundos
        ]
    ],
];

Ten en cuenta que se ha añadido una configuración de redis con other como clave.

Envío de mensajes a múltiples redis

// Enviar mensajes a la cola con clave `default`
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
//  Equivalente a
Client::send($queue, $data);
Redis::send($queue, $data);

// Enviar mensajes a la cola con clave `other`
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);

Consumo de múltiples redis

Consumir desde la cola configurada con other como clave:

namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class SendMail implements Consumer
{
    // Nombre de la cola a consumir
    public $queue = 'send-mail';

    // === Aquí se configura como other, representando la cola con clave other en la configuración ===
    public $connection = 'other';

    // Consumo
    public function consume($data)
    {
        // No es necesario deserializar
        var_export($data);
    }
}

Preguntas frecuentes

¿Por qué aparece el error Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds)?

Este error solo ocurrirá en la interfaz de envío asincrónico Client::send(). El envío asincrónico primero guardará el mensaje en la memoria local y, cuando el proceso esté libre, enviará el mensaje a Redis. Si la velocidad de recepción de Redis es menor a la velocidad de producción del mensaje, o si el proceso está muy ocupado con otros trabajos y no tiene suficiente tiempo para sincronizar los mensajes de la memoria a Redis, se producirá un embotellamiento. Si hay un embotellamiento de mensajes que dure más de 600 segundos, se activará este error.

Solución: utiliza la interfaz de envío sincrónico Redis::send().