Fila Redis
Fila de mensagens baseada em Redis que suporta processamento de mensagens com atraso.
Instalação
composer require webman/redis-queue
Arquivo de configuração
O arquivo de configuração Redis é gerado automaticamente em {projeto-principal}/config/plugin/webman/redis-queue/redis.php, com conteúdo similar ao seguinte:
<?php
return [
'default' => [
'host' => 'redis://127.0.0.1:6379',
'options' => [
'auth' => '', // Senha, opcional
'db' => 0, // Banco de dados
'max_attempts' => 5, // Número de tentativas após falha no consumo
'retry_seconds' => 5, // Intervalo de tentativa em segundos
]
],
];
Nova tentativa após falha no consumo
Se o consumo falhar (ocorrer exceção), a mensagem será colocada na fila de atraso e aguardará a próxima tentativa. O número de tentativas é controlado por max_attempts; o intervalo, por retry_seconds e max_attempts. Ex.: se max_attempts for 5 e retry_seconds for 10, o intervalo da 1.ª tentativa é 1*10 s, da 2.ª 2*10 s, da 3.ª 3*10 s, até 5 tentativas. Se o número de tentativas exceder o definido em max_attempts, a mensagem vai para a fila de falhas com chave {redis-queue}-failed.
Envio de mensagem (síncrono)
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Redis;
class Index
{
public function queue(Request $request)
{
// Nome da fila
$queue = 'send-mail';
// Dados, podem ser passados como array diretamente, sem serialização
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// Enviar mensagem
Redis::send($queue, $data);
// Enviar mensagem atrasada, processada após 60 segundos
Redis::send($queue, $data, 60);
return response('redis queue test');
}
}
Em sucesso, Redis::send() retorna true; caso contrário false ou exceção.
Dica
Pode haver desvio no horário de consumo da fila atrasada. Ex.: quando a velocidade de consumo é menor que a de produção, a fila pode acumular e atrasar. Mitigação: rodar mais processos consumidores.
Envio de mensagem (assíncrono)
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Client;
class Index
{
public function queue(Request $request)
{
// Nome da fila
$queue = 'send-mail';
// Dados, podem ser passados como array diretamente, sem serialização
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// Enviar mensagem
Client::send($queue, $data);
// Enviar mensagem atrasada, processada após 60 segundos
Client::send($queue, $data, 60);
return response('redis queue test');
}
}
Client::send() não retorna valor. É um envio assíncrono e não garante 100% de entrega ao Redis.
Dica
O princípio deClient::send()é criar uma fila em memória local e sincronizar mensagens de forma assíncrona com Redis (sincronização é rápida, ~10.000 mensagens/s). Se o processo reiniciar antes dos dados da fila em memória estarem totalmente sincronizados, mensagens podem ser perdidas. O envio assíncrono comClient::send()é adequado para mensagens não críticas.Dica
Client::send()é assíncrono e só pode ser usado no ambiente de execução Workerman. Para scripts em linha de comando use a interface síncronaRedis::send().
Enviar mensagens de outros projetos
Às vezes é preciso enviar mensagens de outros projetos e não se pode usar webman\redis-queue. Nesses casos, pode usar a função abaixo para enviar mensagens à fila.
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting';
$queue_delay = '{redis-queue}-delayed';
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
Aqui o parâmetro $redis é a instância Redis. Ex.: o uso da extensão redis é similar a:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);
Consumo
O arquivo de configuração do processo consumidor está em {projeto-principal}/config/plugin/webman/redis-queue/process.php. O diretório dos consumidores está em {projeto-principal}/app/queue/redis/.
O comando php webman redis-queue:consumer my-send-mail gera o arquivo {projeto-principal}/app/queue/redis/MyMailSend.php.
Dica
Este comando requer a instalação do plugin Console. Se não quiser instalar, pode criar manualmente um arquivo similar ao seguinte:
<?php
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class MyMailSend implements Consumer
{
// Nome da fila a consumir
public $queue = 'send-mail';
// Nome da conexão, corresponde à conexão em plugin/webman/redis-queue/redis.php
public $connection = 'default';
// Consumo
public function consume($data)
{
// Não precisa deserializar
var_export($data); // Saída ['to' => 'tom@gmail.com', 'content' => 'hello']
}
// Callback de falha no consumo
/*
$package = [
'id' => 1357277951, // ID da mensagem
'time' => 1709170510, // Tempo da mensagem
'delay' => 0, // Tempo de atraso
'attempts' => 2, // Número de consumos
'queue' => 'send-mail', // Nome da fila
'data' => ['to' => 'tom@gmail.com', 'content' => 'hello'], // Conteúdo da mensagem
'max_attempts' => 5, // Número máximo de tentativas
'error' => 'Mensagem de erro' // Mensagem de erro
]
*/
public function onConsumeFailure(\Throwable $e, $package)
{
echo "consume failure\n";
echo $e->getMessage() . "\n";
// Não precisa deserializar
var_export($package);
}
}
Nota
O consumo é considerado bem-sucedido quando nenhuma exceção ou Error é lançada durante o consumo; caso contrário é falha e a mensagem entra na fila de nova tentativa. O redis-queue não tem mecanismo ack; pode ser visto como ack automático (quando não há exceção ou Error). Para marcar a mensagem atual como não consumida com sucesso, lance uma exceção manualmente para enviar à fila de nova tentativa. Na prática equivale a um mecanismo ack.Dica
Os consumidores suportam múltiplos servidores e processos, e a mesma mensagem não será consumida duas vezes. As mensagens consumidas são removidas automaticamente da fila; não é necessário remover manualmente.Dica
Os processos consumidores podem consumir várias filas diferentes ao mesmo tempo. Adicionar uma nova fila não exige alterar a configuração emprocess.php. Para adicionar consumidor de fila nova, basta adicionar a classeConsumercorrespondente emapp/queue/redise usar a propriedade$queuepara especificar o nome da fila a consumir.Dica
Usuários Windows precisam executarphp windows.phppara iniciar o webman; caso contrário o processo consumidor não será iniciado.Dica
O callback onConsumeFailure é acionado a cada falha no consumo. Aqui você pode tratar a lógica pós-falha. (Esta função requerwebman/redis-queue>=1.3.2eworkerman/redis-queue>=1.2.1)
Definir processos consumidores diferentes para filas diferentes
Por padrão todos os consumidores compartilham o mesmo processo. Às vezes quer-se separar o consumo de algumas filas—ex.: negócios de consumo lento em um grupo de processos, consumo rápido em outro. Para isso, os consumidores podem ser divididos em dois diretórios, ex.: app_path() . '/queue/redis/fast' e app_path() . '/queue/redis/slow' (o namespace da classe consumidora deve ser atualizado). A configuração fica:
return [
...outras configurações omitidas...
'redis_consumer_fast' => [ // A chave é personalizada, sem restrição de formato, aqui redis_consumer_fast
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// Diretório das classes consumidoras
'consumer_dir' => app_path() . '/queue/redis/fast'
]
],
'redis_consumer_slow' => [ // A chave é personalizada, sem restrição de formato, aqui redis_consumer_slow
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// Diretório das classes consumidoras
'consumer_dir' => app_path() . '/queue/redis/slow'
]
]
];
Assim, consumidores de negócios rápidos ficam em queue/redis/fast e os lentos em queue/redis/slow, atingindo o objetivo de atribuir processos consumidores às filas.
Configuração múltipla de Redis
Configuração
config/plugin/webman/redis-queue/redis.php
<?php
return [
'default' => [
'host' => 'redis://192.168.0.1:6379',
'options' => [
'auth' => null, // Senha, tipo string, opcional
'db' => 0, // Banco de dados
'max_attempts' => 5, // Tentativas após falha no consumo
'retry_seconds' => 5, // Intervalo de tentativa em segundos
]
],
'other' => [
'host' => 'redis://192.168.0.2:6379',
'options' => [
'auth' => null, // Senha, tipo string, opcional
'db' => 0, // Banco de dados
'max_attempts' => 5, // Tentativas após falha no consumo
'retry_seconds' => 5, // Intervalo de tentativa em segundos
]
],
];
A configuração inclui uma configuração Redis adicional com chave other.
Enviar mensagens para Redis múltiplos
// Enviar mensagem para a fila com chave `default`
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
// O mesmo que
Client::send($queue, $data);
Redis::send($queue, $data);
// Enviar mensagem para a fila com chave `other`
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);
Consumir de Redis múltiplos
Consumir mensagens da fila com chave other na configuração:
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class SendMail implements Consumer
{
// Nome da fila a consumir
public $queue = 'send-mail';
// === Definir 'other' aqui para consumir da fila com chave 'other' na configuração ===
public $connection = 'other';
// Consumo
public function consume($data)
{
// Não precisa deserializar
var_export($data);
}
}
FAQ
Por que ocorre o erro Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds)?
Este erro ocorre apenas com a interface de envio assíncrono Client::send(). O envio assíncrono salva primeiro as mensagens na memória local e depois as envia ao Redis quando o processo está ocioso. Se o Redis recebe mensagens mais devagar do que são produzidas, ou o processo está ocupado com outras tarefas e não tem tempo para sincronizar as mensagens da memória ao Redis, pode haver acúmulo. Se as mensagens ficarem acumuladas por mais de 600 segundos, este erro é acionado.
Solução: use a interface de envio síncrono Redis::send() para envio de mensagens.