Fila Redis
Uma fila de mensagens baseada em Redis que suporta o processamento de mensagens com atraso.
Instalação
composer require webman/redis-queue
Arquivo de configuração
O arquivo de configuração do Redis é gerado automaticamente em {projeto principal}/config/plugin/webman/redis-queue/redis.php
, e seu conteúdo é semelhante ao seguinte:
<?php
return [
'default' => [
'host' => 'redis://127.0.0.1:6379',
'options' => [
'auth' => '', // Senha, parâmetro opcional
'db' => 0, // Banco de dados
'max_attempts' => 5, // Número de tentativas de nova análise após falha no consumo
'retry_seconds' => 5, // Intervalo de nova tentativa, em segundos
]
],
];
Tentativas de nova análise após falha
Se o consumo falhar (ocorrer uma exceção), a mensagem será colocada na fila de atraso, aguardando a próxima tentativa. O número de tentativas é controlado pelo parâmetro max_attempts
, e o intervalo de nova tentativa é controlado por retry_seconds
e max_attempts
. Por exemplo, se max_attempts
for 5 e retry_seconds
for 10, o intervalo da 1ª tentativa será 1*10
segundos, o intervalo da 2ª tentativa será 2*10
segundos, o intervalo da 3ª tentativa será 3*10
segundos, e assim por diante até 5 tentativas. Se o número de tentativas exceder o que está configurado em max_attempts
, a mensagem será colocada na fila de falhas com a chave {redis-queue}-failed
.
Enviar mensagem (síncrono)
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Redis;
class Index
{
public function queue(Request $request)
{
// Nome da fila
$queue = 'send-mail';
// Dados, pode passar diretamente o array, não é necessário serializar
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// Enviar mensagem
Redis::send($queue, $data);
// Enviar mensagem com atraso, a mensagem será processada após 60 segundos
Redis::send($queue, $data, 60);
return response('teste de fila redis');
}
}
Se o envio for bem-sucedido, Redis::send()
retorna true, caso contrário, retorna false ou lança uma exceção.
Dica
O tempo de consumo da fila de atraso pode ter erros, como a velocidade de consumo sendo menor que a velocidade de produção, o que resulta em acúmulo de fila e, consequentemente, causa atrasos no consumo. Uma solução é abrir mais processos de consumo.
Enviar mensagem (assíncrono)
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Client;
class Index
{
public function queue(Request $request)
{
// Nome da fila
$queue = 'send-mail';
// Dados, pode passar diretamente o array, não é necessário serializar
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// Enviar mensagem
Client::send($queue, $data);
// Enviar mensagem com atraso, a mensagem será processada após 60 segundos
Client::send($queue, $data, 60);
return response('teste de fila redis');
}
}
Client::send()
não possui retorno, ele pertence ao envio assíncrono, não garante que a mensagem seja enviada com 100% de certeza para o Redis.
Dica
O princípio deClient::send()
é estabelecer uma fila em memória local e enviar mensagens para o Redis de forma assíncrona (a velocidade de sincronização é muito rápida, cerca de 10.000 mensagens por segundo). Se o processo reiniciar enquanto os dados na fila na memória local não tiverem sido totalmente sincronizados, pode haver perda de mensagens. O envio assíncronoClient::send()
é adequado para mensagens de baixa importância.Dica
Client::send()
é assíncrono e só pode ser usado no ambiente de execução do workerman. Para scripts em linha de comando, utilize a interface síncronaRedis::send()
.
Enviar mensagens em outros projetos
Às vezes, você pode precisar enviar mensagens em outros projetos e não consegue utilizar webman\redis-queue
, nesse caso, você pode usar a seguinte função para enviar mensagens para a fila.
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting';
$queue_delay = '{redis-queue}-delayed';
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
Onde o parâmetro $redis
é a instância do Redis. Por exemplo, o uso da extensão do Redis pode ser semelhante ao seguinte:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);
Consumo
O arquivo de configuração do processo de consumo está em {projeto principal}/config/plugin/webman/redis-queue/process.php
.
O diretório do consumidor está em {projeto principal}/app/queue/redis/
.
Executando o comando php webman redis-queue:consumer my-send-mail
, será gerado o arquivo {projeto principal}/app/queue/redis/MyMailSend.php
.
Dica
Se o comando não existir, você também pode gerá-lo manualmente.
<?php
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class MyMailSend implements Consumer
{
// Nome da fila a ser consumida
public $queue = 'send-mail';
// Nome da conexão, correspondente a plugin/webman/redis-queue/redis.php
public $connection = 'default';
// Consumir
public function consume($data)
{
// Não é necessário desserializar
var_export($data); // Saída: ['to' => 'tom@gmail.com', 'content' => 'hello']
}
// Chamada de retorno em caso de falha no consumo
/*
$package = [
'id' => 1357277951, // ID da mensagem
'time' => 1709170510, // Tempo da mensagem
'delay' => 0, // Tempo de atraso
'attempts' => 2, // Número de consumos
'queue' => 'send-mail', // Nome da fila
'data' => ['to' => 'tom@gmail.com', 'content' => 'hello'], // Conteúdo da mensagem
'max_attempts' => 5, // Número máximo de tentativas
'error' => 'Informação de erro' // Mensagem de erro
]
*/
public function onConsumeFailure(\Throwable $e, $package)
{
echo "falha no consumo\n";
echo $e->getMessage() . "\n";
// Não é necessário desserializar
var_export($package);
}
}
Nota
Se durante o processo de consumo não ocorrerem exceções e erros, considera-se que o consumo foi bem-sucedido. Caso contrário, considera-se que o consumo falhou e a mensagem será enviada para a fila de tentativas.
O redis-queue não possui mecanismo de ack; você pode considerá-lo como um ack automático (sem exceções ou erros). Se durante o consumo você deseja marcar a mensagem atual como não consumida, pode lançar manualmente uma exceção para que a mensagem atual vá para a fila de tentativas. Na prática, isso não é diferente do mecanismo de ack.Dica
Os consumidores suportam múltiplos servidores e múltiplos processos, e a mesma mensagem não será consumida repetidamente. Mensagens já consumidas serão automaticamente removidas da fila, não sendo necessário removê-las manualmente.Dica
O processo de consumo pode consumir simultaneamente diferentes filas, adicionar novas filas não precisa modificar as configurações emprocess.php
, ao adicionar consumidores de novas filas, basta criar a classeConsumer
correspondente dentro deapp/queue/redis
e usar o atributo de classe$queue
para especificar o nome da fila a ser consumida.Dica
Usuários do Windows precisam executarphp windows.php
para iniciar o webman; caso contrário, o processo de consumo não será iniciado.Dica
O callbackonConsumeFailure
será acionado sempre que houver falha no consumo, e você pode gerenciar a lógica após a falha aqui. (Esse recurso requerwebman/redis-queue>=1.3.2
workerman/redis-queue>=1.2.1
)
Definindo diferentes processos de consumo para diferentes filas
Por padrão, todos os consumidores compartilham o mesmo processo de consumo. No entanto, às vezes precisamos isolar certos consumos de filas, como negócios que processam lentamente em um conjunto de processos e negócios que processam rapidamente em outro conjunto. Para isso, podemos dividir os consumidores em dois diretórios, por exemplo, app_path() . '/queue/redis/fast'
e app_path() . '/queue/redis/slow'
(note que é necessário alterar o namespace das classes de consumo correspondentes), a configuração seria a seguinte:
return [
... Outras configurações omitidas ...
'redis_consumer_fast' => [ // chave é personalizada, sem restrições de formato, aqui chamamos de redis_consumer_fast
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// Diretório da classe do consumidor
'consumer_dir' => app_path() . '/queue/redis/fast'
]
],
'redis_consumer_slow' => [ // chave é personalizada, sem restrições de formato, aqui chamamos de redis_consumer_slow
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// Diretório da classe do consumidor
'consumer_dir' => app_path() . '/queue/redis/slow'
]
]
];
Dessa forma, os consumidores de tarefas rápidas ficam no diretório queue/redis/fast
e as tarefas lentas ficam no diretório queue/redis/slow
, permitindo especificar os processos de consumo para cada fila.
Múltiplas configurações do Redis
Configuração
config/plugin/webman/redis-queue/redis.php
<?php
return [
'default' => [
'host' => 'redis://192.168.0.1:6379',
'options' => [
'auth' => null, // Senha, do tipo string, parâmetro opcional
'db' => 0, // Banco de dados
'max_attempts' => 5, // Número de tentativas de nova análise após falha no consumo
'retry_seconds' => 5, // Intervalo de nova tentativa, em segundos
]
],
'other' => [
'host' => 'redis://192.168.0.2:6379',
'options' => [
'auth' => null, // Senha, do tipo string, parâmetro opcional
'db' => 0, // Banco de dados
'max_attempts' => 5, // Número de tentativas de nova análise após falha no consumo
'retry_seconds' => 5, // Intervalo de nova tentativa, em segundos
]
],
];
Note que foi adicionada uma configuração do Redis com a chave other
.
Envio de mensagens para múltiplos Redis
// Enviar mensagem para a fila com a chave `default`
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
// Equivalente a
Client::send($queue, $data);
Redis::send($queue, $data);
// Enviar mensagem para a fila com a chave `other`
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);
Consumo em múltiplos Redis
No consumidor, configure a fila a ser consumida com other
como a chave da configuração do Redis:
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class SendMail implements Consumer
{
// Nome da fila a ser consumida
public $queue = 'send-mail';
// === Aqui definindo como other, indicando que a chave da configuração de consumo é other ===
public $connection = 'other';
// Consumir
public function consume($data)
{
// Não é necessário desserializar
var_export($data);
}
}
Problemas comuns
Por que aparece o erro Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds)
Esse erro só ocorre na interface de envio assíncrono Client::send()
. O envio assíncrono primeiro armazena a mensagem na memória local e, quando o processo está ocioso, envia a mensagem para o Redis. Se a velocidade de recebimento do Redis for menor que a velocidade de produção da mensagem ou se o processo estiver sempre ocupado com outros negócios sem tempo suficiente para sincronizar a mensagem em memória ao Redis, isso pode causar uma pressão de mensagens. Se a pressão de mensagens exceder 600 segundos, isso acionará esse erro.
Solução: Utilize a interface de envio síncrono Redis::send()
para enviar mensagens.