Fila Redis

Uma fila de mensagens baseada em Redis que suporta o processamento de mensagens com atraso.

Instalação

composer require webman/redis-queue

Arquivo de configuração

O arquivo de configuração do Redis é gerado automaticamente em {projeto principal}/config/plugin/webman/redis-queue/redis.php, e seu conteúdo é semelhante ao seguinte:

<?php
return [
    'default' => [
        'host' => 'redis://127.0.0.1:6379',
        'options' => [
            'auth' => '',         // Senha, parâmetro opcional
            'db' => 0,            // Banco de dados
            'max_attempts'  => 5, // Número de tentativas de nova análise após falha no consumo
            'retry_seconds' => 5, // Intervalo de nova tentativa, em segundos
        ]
    ],
];

Tentativas de nova análise após falha

Se o consumo falhar (ocorrer uma exceção), a mensagem será colocada na fila de atraso, aguardando a próxima tentativa. O número de tentativas é controlado pelo parâmetro max_attempts, e o intervalo de nova tentativa é controlado por retry_seconds e max_attempts. Por exemplo, se max_attempts for 5 e retry_seconds for 10, o intervalo da 1ª tentativa será 1*10 segundos, o intervalo da 2ª tentativa será 2*10 segundos, o intervalo da 3ª tentativa será 3*10 segundos, e assim por diante até 5 tentativas. Se o número de tentativas exceder o que está configurado em max_attempts, a mensagem será colocada na fila de falhas com a chave {redis-queue}-failed.

Enviar mensagem (síncrono)

<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Redis;

class Index
{
    public function queue(Request $request)
    {
        // Nome da fila
        $queue = 'send-mail';
        // Dados, pode passar diretamente o array, não é necessário serializar
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // Enviar mensagem
        Redis::send($queue, $data);
        // Enviar mensagem com atraso, a mensagem será processada após 60 segundos
        Redis::send($queue, $data, 60);

        return response('teste de fila redis');
    }

}

Se o envio for bem-sucedido, Redis::send() retorna true, caso contrário, retorna false ou lança uma exceção.

Dica
O tempo de consumo da fila de atraso pode ter erros, como a velocidade de consumo sendo menor que a velocidade de produção, o que resulta em acúmulo de fila e, consequentemente, causa atrasos no consumo. Uma solução é abrir mais processos de consumo.

Enviar mensagem (assíncrono)

<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Client;

class Index
{
    public function queue(Request $request)
    {
        // Nome da fila
        $queue = 'send-mail';
        // Dados, pode passar diretamente o array, não é necessário serializar
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // Enviar mensagem
        Client::send($queue, $data);
        // Enviar mensagem com atraso, a mensagem será processada após 60 segundos
        Client::send($queue, $data, 60);

        return response('teste de fila redis');
    }

}

Client::send() não possui retorno, ele pertence ao envio assíncrono, não garante que a mensagem seja enviada com 100% de certeza para o Redis.

Dica
O princípio de Client::send() é estabelecer uma fila em memória local e enviar mensagens para o Redis de forma assíncrona (a velocidade de sincronização é muito rápida, cerca de 10.000 mensagens por segundo). Se o processo reiniciar enquanto os dados na fila na memória local não tiverem sido totalmente sincronizados, pode haver perda de mensagens. O envio assíncrono Client::send() é adequado para mensagens de baixa importância.

Dica
Client::send() é assíncrono e só pode ser usado no ambiente de execução do workerman. Para scripts em linha de comando, utilize a interface síncrona Redis::send().

Enviar mensagens em outros projetos

Às vezes, você pode precisar enviar mensagens em outros projetos e não consegue utilizar webman\redis-queue, nesse caso, você pode usar a seguinte função para enviar mensagens para a fila.

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting';
    $queue_delay = '{redis-queue}-delayed';
    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

Onde o parâmetro $redis é a instância do Redis. Por exemplo, o uso da extensão do Redis pode ser semelhante ao seguinte:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);

Consumo

O arquivo de configuração do processo de consumo está em {projeto principal}/config/plugin/webman/redis-queue/process.php.
O diretório do consumidor está em {projeto principal}/app/queue/redis/.

Executando o comando php webman redis-queue:consumer my-send-mail, será gerado o arquivo {projeto principal}/app/queue/redis/MyMailSend.php.

Dica
Se o comando não existir, você também pode gerá-lo manualmente.

<?php

namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class MyMailSend implements Consumer
{
    // Nome da fila a ser consumida
    public $queue = 'send-mail';

    // Nome da conexão, correspondente a plugin/webman/redis-queue/redis.php 
    public $connection = 'default';

    // Consumir
    public function consume($data)
    {
        // Não é necessário desserializar
        var_export($data); // Saída: ['to' => 'tom@gmail.com', 'content' => 'hello']
    }
    // Chamada de retorno em caso de falha no consumo
    /* 
    $package = [
        'id' => 1357277951, // ID da mensagem
        'time' => 1709170510, // Tempo da mensagem
        'delay' => 0, // Tempo de atraso
        'attempts' => 2, // Número de consumos
        'queue' => 'send-mail', // Nome da fila
        'data' => ['to' => 'tom@gmail.com', 'content' => 'hello'], // Conteúdo da mensagem
        'max_attempts' => 5, // Número máximo de tentativas
        'error' => 'Informação de erro' // Mensagem de erro
    ]
    */
    public function onConsumeFailure(\Throwable $e, $package)
    {
        echo "falha no consumo\n";
        echo $e->getMessage() . "\n";
        // Não é necessário desserializar
        var_export($package); 
    }
}

Nota
Se durante o processo de consumo não ocorrerem exceções e erros, considera-se que o consumo foi bem-sucedido. Caso contrário, considera-se que o consumo falhou e a mensagem será enviada para a fila de tentativas.
O redis-queue não possui mecanismo de ack; você pode considerá-lo como um ack automático (sem exceções ou erros). Se durante o consumo você deseja marcar a mensagem atual como não consumida, pode lançar manualmente uma exceção para que a mensagem atual vá para a fila de tentativas. Na prática, isso não é diferente do mecanismo de ack.

Dica
Os consumidores suportam múltiplos servidores e múltiplos processos, e a mesma mensagem não será consumida repetidamente. Mensagens já consumidas serão automaticamente removidas da fila, não sendo necessário removê-las manualmente.

Dica
O processo de consumo pode consumir simultaneamente diferentes filas, adicionar novas filas não precisa modificar as configurações em process.php, ao adicionar consumidores de novas filas, basta criar a classe Consumer correspondente dentro de app/queue/redis e usar o atributo de classe $queue para especificar o nome da fila a ser consumida.

Dica
Usuários do Windows precisam executar php windows.php para iniciar o webman; caso contrário, o processo de consumo não será iniciado.

Dica
O callback onConsumeFailure será acionado sempre que houver falha no consumo, e você pode gerenciar a lógica após a falha aqui. (Esse recurso requer webman/redis-queue>=1.3.2 workerman/redis-queue>=1.2.1)

Definindo diferentes processos de consumo para diferentes filas

Por padrão, todos os consumidores compartilham o mesmo processo de consumo. No entanto, às vezes precisamos isolar certos consumos de filas, como negócios que processam lentamente em um conjunto de processos e negócios que processam rapidamente em outro conjunto. Para isso, podemos dividir os consumidores em dois diretórios, por exemplo, app_path() . '/queue/redis/fast' e app_path() . '/queue/redis/slow' (note que é necessário alterar o namespace das classes de consumo correspondentes), a configuração seria a seguinte:

return [
    ... Outras configurações omitidas ...

    'redis_consumer_fast'  => [ // chave é personalizada, sem restrições de formato, aqui chamamos de redis_consumer_fast
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8,
        'constructor' => [
            // Diretório da classe do consumidor
            'consumer_dir' => app_path() . '/queue/redis/fast'
        ]
    ],
    'redis_consumer_slow'  => [  // chave é personalizada, sem restrições de formato, aqui chamamos de redis_consumer_slow
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8,
        'constructor' => [
            // Diretório da classe do consumidor
            'consumer_dir' => app_path() . '/queue/redis/slow'
        ]
    ]
];

Dessa forma, os consumidores de tarefas rápidas ficam no diretório queue/redis/fast e as tarefas lentas ficam no diretório queue/redis/slow, permitindo especificar os processos de consumo para cada fila.

Múltiplas configurações do Redis

Configuração

config/plugin/webman/redis-queue/redis.php

<?php
return [
    'default' => [
        'host' => 'redis://192.168.0.1:6379',
        'options' => [
            'auth' => null,       // Senha, do tipo string, parâmetro opcional
            'db' => 0,            // Banco de dados
            'max_attempts'  => 5, // Número de tentativas de nova análise após falha no consumo
            'retry_seconds' => 5, // Intervalo de nova tentativa, em segundos
        ]
    ],
    'other' => [
        'host' => 'redis://192.168.0.2:6379',
        'options' => [
            'auth' => null,       // Senha, do tipo string, parâmetro opcional
            'db' => 0,            // Banco de dados
            'max_attempts'  => 5, // Número de tentativas de nova análise após falha no consumo
            'retry_seconds' => 5, // Intervalo de nova tentativa, em segundos
        ]
    ],
];

Note que foi adicionada uma configuração do Redis com a chave other.

Envio de mensagens para múltiplos Redis

// Enviar mensagem para a fila com a chave `default`
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
// Equivalente a
Client::send($queue, $data);
Redis::send($queue, $data);

// Enviar mensagem para a fila com a chave `other`
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);

Consumo em múltiplos Redis

No consumidor, configure a fila a ser consumida com other como a chave da configuração do Redis:

namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class SendMail implements Consumer
{
    // Nome da fila a ser consumida
    public $queue = 'send-mail';

    // === Aqui definindo como other, indicando que a chave da configuração de consumo é other ===
    public $connection = 'other';

    // Consumir
    public function consume($data)
    {
        // Não é necessário desserializar
        var_export($data);
    }
}

Problemas comuns

Por que aparece o erro Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds)

Esse erro só ocorre na interface de envio assíncrono Client::send(). O envio assíncrono primeiro armazena a mensagem na memória local e, quando o processo está ocioso, envia a mensagem para o Redis. Se a velocidade de recebimento do Redis for menor que a velocidade de produção da mensagem ou se o processo estiver sempre ocupado com outros negócios sem tempo suficiente para sincronizar a mensagem em memória ao Redis, isso pode causar uma pressão de mensagens. Se a pressão de mensagens exceder 600 segundos, isso acionará esse erro.

Solução: Utilize a interface de envio síncrono Redis::send() para enviar mensagens.