Coda Redis
Coda dei messaggi basata su Redis, supporta l'elaborazione rinviata dei messaggi.
Installazione
composer require webman/redis-queue
File di configurazione
Il file di configurazione redis viene generato automaticamente in {progetto principale}/config/plugin/webman/redis-queue/redis.php
, il contenuto è simile al seguente:
<?php
return [
'default' => [
'host' => 'redis://127.0.0.1:6379',
'options' => [
'auth' => '', // Password, parametro opzionale
'db' => 0, // Database
'max_attempts' => 5, // Numero di tentativi di ripetizione dopo il fallimento del consumo
'retry_seconds' => 5, // Intervallo di ripetizione, in secondi
]
],
];
Tentativi di ripetizione in caso di fallimento del consumo
Se il consumo fallisce (si verifica un'eccezione), il messaggio verrà inserito nella coda di rinvio, in attesa del prossimo tentativo. Il numero di tentativi di ripetizione è controllato dal parametro max_attempts
, l'intervallo di ripetizione è controllato congiuntamente da retry_seconds
e max_attempts
. Ad esempio, se max_attempts
è 5 e retry_seconds
è 10, il primo intervallo di ripetizione sarà di 1*10
secondi, il secondo intervallo di ripetizione sarà di 2*10
secondi, il terzo intervallo sarà di 3*10
secondi, e così via fino a 5 tentativi. Se si supera il numero di tentativi impostato in max_attempts
, il messaggio verrà inserito nella coda di fallimento con chiave {redis-queue}-failed
.
Invio di messaggi (sincrono)
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Redis;
class Index
{
public function queue(Request $request)
{
// Nome della coda
$queue = 'send-mail';
// Dati, è possibile passare direttamente un array, non è necessaria la serializzazione
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// Inviare il messaggio
Redis::send($queue, $data);
// Invia messaggio rinviato, il messaggio sarà elaborato dopo 60 secondi
Redis::send($queue, $data, 60);
return response('redis queue test');
}
}
Se l'invio ha successo, Redis::send()
restituisce true, altrimenti restituisce false o genera un'eccezione.
Suggerimento
Il tempo di consumo della coda rinviata potrebbe presentare delle discrepanze, ad esempio, una velocità di consumo inferiore alla velocità di produzione porta all'accumulo nella coda, causando ritardi nel consumo. Il rimedio consiste nell'aprire più processi di consumo.
Invio di messaggi (asincrono)
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Client;
class Index
{
public function queue(Request $request)
{
// Nome della coda
$queue = 'send-mail';
// Dati, è possibile passare direttamente un array, non è necessaria la serializzazione
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// Inviare il messaggio
Client::send($queue, $data);
// Invia messaggio rinviato, il messaggio sarà elaborato dopo 60 secondi
Client::send($queue, $data, 60);
return response('redis queue test');
}
}
Client::send()
non ha un valore restituito, appartiene all'invio asincrono, non garantisce che il messaggio arrivi al redis al 100%.
Suggerimento
Il principio diClient::send()
è quello di creare una coda di memoria locale e inviare i messaggi in modo asincrono a redis (la velocità di sincronizzazione è molto veloce, circa 10.000 messaggi al secondo). Se il processo viene riavviato e i dati nella coda locale non sono completamente sincronizzati, si potrebbe verificare la perdita di messaggi.Client::send()
per l'invio asincrono è adatto per inviare messaggi non importanti.Suggerimento
Client::send()
è asincrono e può essere utilizzato solo nell'ambiente di esecuzione di workerman; negli script da linea di comando è necessario utilizzare l'interfaccia sincronaRedis::send()
.
Invio di messaggi da altri progetti
A volte è necessario inviare messaggi da altri progetti e non è possibile utilizzare webman\redis-queue
; è possibile fare riferimento alla seguente funzione per inviare messaggi alla coda.
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting';
$queue_delay = '{redis-queue}-delayed';
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
Dove il parametro $redis
è un'istanza di redis. Ad esempio, l'uso dell'estensione redis è simile a questo:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);
Consumo
Il file di configurazione del processo di consumo si trova in {progetto principale}/config/plugin/webman/redis-queue/process.php
.
La directory del consumatore si trova sotto {progetto principale}/app/queue/redis/
.
Eseguendo il comando php webman redis-queue:consumer my-send-mail
, verrà generato il file {progetto principale}/app/queue/redis/MyMailSend.php
Suggerimento
Se il comando non esiste, è possibile generarlo manualmente
<?php
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class MyMailSend implements Consumer
{
// Nome della coda da consumare
public $queue = 'send-mail';
// Nome della connessione, corrispondente alla connessione in plugin/webman/redis-queue/redis.php
public $connection = 'default';
// Consumare
public function consume($data)
{
// Non è necessaria la deserializzazione
var_export($data); // Output: ['to' => 'tom@gmail.com', 'content' => 'hello']
}
// Callback in caso di fallimento del consumo
/*
$package = [
'id' => 1357277951, // ID del messaggio
'time' => 1709170510, // Tempo del messaggio
'delay' => 0, // Tempo di ritardo
'attempts' => 2, // Numero di tentativi di consumo
'queue' => 'send-mail', // Nome della coda
'data' => ['to' => 'tom@gmail.com', 'content' => 'hello'], // Contenuto del messaggio
'max_attempts' => 5, // Massimo numero di tentativi
'error' => 'Messaggio di errore' // Messaggio di errore
]
*/
public function onConsumeFailure(\Throwable $e, $package)
{
echo "consumo fallito\n";
echo $e->getMessage() . "\n";
// Non è necessaria la deserializzazione
var_export($package);
}
}
Attenzione
Se durante il processo di consumo non viene generata alcuna eccezione o errore, si considera che il consumo sia andato a buon fine; in caso contrario, si considera un fallimento, e il messaggio entra nella coda di ripetizione.
redis-queue non ha un meccanismo di ack, puoi considerarlo come un ack automatico (nessuna eccezione o errore sollevati). Se durante il consumo vuoi contrassegnare il messaggio attuale come non consumato, puoi generare manualmente un'eccezione, portando il messaggio attuale nella coda di ripetizione. Questo, in effetti, non è diverso da un meccanismo di ack.Suggerimento
I consumatori supportano più server e più processi, e lo stesso messaggio non viene consumato più volte. I messaggi già consumati vengono automaticamente rimossi dalla coda, senza necessità di cancellazione manuale.Suggerimento
I processi di consumo possono consumare simultaneamente diverse code, e l'aggiunta di nuove code non richiede modifiche alla configurazione inprocess.php
, ma richiede solo l'aggiunta della corrispondente classeConsumer
sottoapp/queue/redis
, specificando il nome della coda da consumare con l'attributo di classe$queue
.Suggerimento
Gli utenti Windows devono eseguirephp windows.php
per avviare webman, altrimenti i processi di consumo non verranno avviati.Suggerimento
La callbackonConsumeFailure
verrà attivata ogni volta che il consumo fallisce, puoi gestire qui la logica successiva al fallimento. (Questa funzionalità richiedewebman/redis-queue>=1.3.2
workerman/redis-queue>=1.2.1
)
Impostare processi di consumo diversi per diverse code
Per impostazione predefinita, tutti i consumatori condividono lo stesso processo di consumo. Ma a volte dobbiamo effettuare il consumo di alcune code in modo indipendente, ad esempio portando a un gruppo di processi il business che consuma lentamente, e lasciando un altro gruppo per il business che consuma rapidamente. A tal fine, possiamo suddividere i consumatori in due directory, ad esempio app_path() . '/queue/redis/fast'
e app_path() . '/queue/redis/slow'
(nota che è necessario modificare di conseguenza il namespace delle classi di consumo), quindi la configurazione sarà la seguente:
return [
...altri parametri omessi...
'redis_consumer_fast' => [ // chiave personalizzata, nessuna restrizione di formato
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// Directory delle classi consumatrici
'consumer_dir' => app_path() . '/queue/redis/fast'
]
],
'redis_consumer_slow' => [ // chiave personalizzata, nessuna restrizione di formato
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// Directory delle classi consumatrici
'consumer_dir' => app_path() . '/queue/redis/slow'
]
]
];
In questo modo, i consumatori rapidi vengono messi nella directory queue/redis/fast
, e i consumatori lenti vengono messi nella directory queue/redis/slow
, raggiungendo l'obiettivo di specificare processi di consumo per diverse code.
Configurazione di più redis
Configurazione
config/plugin/webman/redis-queue/redis.php
<?php
return [
'default' => [
'host' => 'redis://192.168.0.1:6379',
'options' => [
'auth' => null, // Password, tipo stringa, parametro opzionale
'db' => 0, // Database
'max_attempts' => 5, // Numero di tentativi di ripetizione dopo il fallimento del consumo
'retry_seconds' => 5, // Intervallo di ripetizione, in secondi
]
],
'other' => [
'host' => 'redis://192.168.0.2:6379',
'options' => [
'auth' => null, // Password, tipo stringa, parametro opzionale
'db' => 0, // Database
'max_attempts' => 5, // Numero di tentativi di ripetizione dopo il fallimento del consumo
'retry_seconds' => 5, // Intervallo di ripetizione, in secondi
]
],
];
Si noti che è stata aggiunta una configurazione redis con la chiave other
.
Invio di messaggi a più redis
// Inviare un messaggio alla coda con chiave `default`
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
// Equivalente a
Client::send($queue, $data);
Redis::send($queue, $data);
// Inviare un messaggio alla coda con chiave `other`
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);
Consumo da più redis
Nel file di configurazione del consumo, inviare messaggi alla coda con chiave other
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class SendMail implements Consumer
{
// Nome della coda da consumare
public $queue = 'send-mail';
// === Qui impostato su other, che rappresenta la coda con chiave other nel file di configurazione ===
public $connection = 'other';
// Consumare
public function consume($data)
{
// Non è necessaria la deserializzazione
var_export($data);
}
}
Domande frequenti
Perché appare l'errore Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds)
Questo errore si verifica solo nell'interfaccia di invio asincrona Client::send()
. L'invio asincrono prima salverà i messaggi nella memoria locale, e quando il processo è inattivo, invierà i messaggi a redis. Se la velocità di ricezione di redis è inferiore alla velocità di produzione dei messaggi, oppure il processo è costantemente impegnato in altre attività e non ha tempo sufficiente per sincronizzare i messaggi dalla memoria a redis, si verificherà la compressione dei messaggi. Se la compressione dei messaggi supera i 600 secondi, verrà attivato questo errore.
Soluzione: usa l'interfaccia di invio sincrona Redis::send()
per inviare messaggi.