Redis Queue
Basé sur Redis, la file d'attente de messages prend en charge le traitement différé des messages.
Installation
composer require webman/redis-queue
Fichier de configuration
Le fichier de configuration Redis est généré automatiquement dans {projet-principal}/config/plugin/webman/redis-queue/redis.php
, avec un contenu similaire à ceci :
<?php
return [
'default' => [
'host' => 'redis://127.0.0.1:6379',
'options' => [
'auth' => '', // Mot de passe, paramètre optionnel
'db' => 0, // Base de données
'max_attempts' => 5, // Nombre de tentatives en cas d'échec de la consommation
'retry_seconds' => 5, // Intervalles de réessai, en secondes
]
],
];
Réessai en cas d'échec de la consommation
Si la consommation échoue (une exception se produit), le message sera placé dans une file d'attente différée, en attente de la prochaine tentative. Le nombre de tentatives est contrôlé par le paramètre max_attempts
, tandis que l'intervalle de réessai est contrôlé conjointement par retry_seconds
et max_attempts
. Par exemple, si max_attempts
est de 5 et retry_seconds
est de 10, l'intervalle pour la 1ère tentative de réessai sera de 1*10
secondes, l'intervalle pour la 2ème tentative sera de 2*10
secondes, et ainsi de suite jusqu'à 5 tentatives. Si le nombre de tentatives dépasse max_attempts
, le message sera placé dans la file d'attente d'échec avec la clé {redis-queue}-failed
.
Envoi de messages (synchronisé)
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Redis;
class Index
{
public function queue(Request $request)
{
// Nom de la file d'attente
$queue = 'send-mail';
// Données, vous pouvez directement passer un tableau sans sérialisation
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// Envoi du message
Redis::send($queue, $data);
// Envoi d'un message différé, le message sera traité après 60 secondes
Redis::send($queue, $data, 60);
return response('redis queue test');
}
}
Un envoi réussi Redis::send()
retourne true, sinon il retourne false ou lance une exception.
Astuce
Le temps de consommation de la file d'attente différée peut varier, par exemple si la vitesse de consommation est inférieure à la vitesse de production, cela peut entraîner un engorgement de la file d'attente et donc un retard de consommation. Une solution consiste à lancer plusieurs processus de consommation.
Envoi de messages (asynchrone)
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Client;
class Index
{
public function queue(Request $request)
{
// Nom de la file d'attente
$queue = 'send-mail';
// Données, vous pouvez directement passer un tableau sans sérialisation
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// Envoi du message
Client::send($queue, $data);
// Envoi d'un message différé, le message sera traité après 60 secondes
Client::send($queue, $data, 60);
return response('redis queue test');
}
}
Client::send()
n'a pas de valeur de retour, il appartient à l'envoi asynchrone, il ne garantit pas que le message soit livré à 100 % vers Redis.
Astuce
Le principe deClient::send()
est de créer une file d'attente en mémoire locale et de synchroniser les messages vers Redis de manière asynchrone (la vitesse de synchronisation est très rapide, environ 10 000 messages par seconde). Si le processus redémarre et que certaines données de la file d'attente en mémoire ne sont pas synchronisées, cela peut entraîner une perte de messages. L'envoi asynchrone deClient::send()
convient aux messages peu importants.Astuce
Client::send()
est asynchrone et ne peut être utilisé que dans l'environnement d'exécution de workerman. Les scripts en ligne de commande doivent utiliser l'interface synchroniséeRedis::send()
.
Envoi de messages vers d'autres projets
Parfois, vous avez besoin d'envoyer des messages depuis d'autres projets sans pouvoir utiliser webman\redis-queue
, vous pouvez vous référer à la fonction suivante pour envoyer des messages vers la file d'attente.
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting';
$queue_delay = '{redis-queue}-delayed';
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
Parmi eux, le paramètre $redis
est une instance de Redis. Par exemple, l'utilisation de l'extension Redis ressemble à ceci :
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);
Consommation
Le fichier de configuration du processus de consommation se trouve dans {projet-principal}/config/plugin/webman/redis-queue/process.php
.
Le répertoire des consommateurs se trouve sous {projet-principal}/app/queue/redis/
.
Exécutez la commande php webman redis-queue:consumer my-send-mail
pour générer le fichier {projet-principal}/app/queue/redis/MyMailSend.php
Astuce
Si la commande n'existe pas, vous pouvez également générer le fichier manuellement.
<?php
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class MyMailSend implements Consumer
{
// Nom de la file d'attente à consommer
public $queue = 'send-mail';
// Nom de la connexion, correspondant à plugin/webman/redis-queue/redis.php
public $connection = 'default';
// Consommer
public function consume($data)
{
// Pas besoin de désérialisation
var_export($data); // Affiche ['to' => 'tom@gmail.com', 'content' => 'hello']
}
// Callback en cas d'échec de la consommation
/*
$package = [
'id' => 1357277951, // ID du message
'time' => 1709170510, // Temps du message
'delay' => 0, // Temps de retard
'attempts' => 2, // Nombre de tentatives de consommation
'queue' => 'send-mail', // Nom de la file d'attente
'data' => ['to' => 'tom@gmail.com', 'content' => 'hello'], // Contenu du message
'max_attempts' => 5, // Nombre maximal de tentatives
'error' => 'message d\'erreur' // Message d'erreur
]
*/
public function onConsumeFailure(\Throwable $e, $package)
{
echo "échec de la consommation\n";
echo $e->getMessage() . "\n";
// Pas besoin de désérialisation
var_export($package);
}
}
Remarque
Si aucune exception ni erreur n'est levée durant la consommation, cela est considéré comme une réussite de la consommation, sinon cela sera considéré comme un échec et le message passera dans la file d'attente de réessai.
Redis-queue n'a pas de mécanisme d'accusé de réception, vous pouvez le considérer comme un accusé de réception automatique (aucune exception ou erreur n'a été générée). Si durant la consommation, vous souhaitez marquer le message actuel comme non consommé avec succès, vous pouvez lever une exception manuellement pour que le message actuel entre dans la file d'attente de réessai. Cela n'est en réalité pas différent d'un mécanisme d'accusé de réception.Astuce
Les consommateurs prennent en charge plusieurs serveurs et plusieurs processus, et un même message ne sera pas consommé plusieurs fois. Les messages déjà consommés seront automatiquement supprimés de la file d'attente, sans nécessiter de suppression manuelle.Astuce
Les processus de consommation peuvent consommer simultanément plusieurs filets d'attente différents. L'ajout d'une nouvelle file d'attente ne nécessite pas de modification de la configuration dansprocess.php
, il suffit d'ajouter une nouvelle classeConsumer
correspondante dansapp/queue/redis
et de spécifier le nom de la file d'attente à consommer à l'aide de l'attribut de classe$queue
.Astuce
Les utilisateurs Windows doivent exécuterphp windows.php
pour démarrer webman, sinon le processus de consommation ne démarrera pas.Astuce
Le callbackonConsumeFailure
sera déclenché à chaque échec de consommation, vous pouvez y traiter la logique après un échec. (Cette fonctionnalité nécessitewebman/redis-queue>=1.3.2
workerman/redis-queue>=1.2.1
)
Configurer des processus de consommation différents pour différentes files d'attente
Par défaut, tous les consommateurs partagent le même processus de consommation. Cependant, il est parfois nécessaire d'isoler certaines consommations de files d'attente, par exemple, grouper le traitement des tâches lentes et des tâches rapides dans des ensembles de processus différents. Pour cela, nous pouvons diviser les consommateurs en deux répertoires, par exemple app_path() . '/queue/redis/fast'
et app_path() . '/queue/redis/slow'
(veillez à mettre à jour le namespace des classes de consommation), la configuration serait comme suit :
return [
...d'autres configurations omises...
'redis_consumer_fast' => [ // La clé peut être définie comme vous le souhaitez, ici nous l'appelons redis_consumer_fast
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// Répertoire de la classe consommateur
'consumer_dir' => app_path() . '/queue/redis/fast'
]
],
'redis_consumer_slow' => [ // La clé peut être définie comme vous le souhaitez, ici nous l'appelons redis_consumer_slow
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// Répertoire de la classe consommateur
'consumer_dir' => app_path() . '/queue/redis/slow'
]
]
];
Cela permet de placer les consommateurs de tâches rapides dans le répertoire queue/redis/fast
et les consommateurs de tâches lentes dans le répertoire queue/redis/slow
, atteignant ainsi l'objectif de spécifier des processus de consommation pour les files d'attente.
Configuration multi-redis
Configuration
config/plugin/webman/redis-queue/redis.php
<?php
return [
'default' => [
'host' => 'redis://192.168.0.1:6379',
'options' => [
'auth' => null, // Mot de passe, type chaîne, paramètre optionnel
'db' => 0, // Base de données
'max_attempts' => 5, // Nombre de tentatives en cas d'échec de la consommation
'retry_seconds' => 5, // Intervalles de réessai, en secondes
]
],
'other' => [
'host' => 'redis://192.168.0.2:6379',
'options' => [
'auth' => null, // Mot de passe, type chaîne, paramètre optionnel
'db' => 0, // Base de données
'max_attempts' => 5, // Nombre de tentatives en cas d'échec de la consommation
'retry_seconds' => 5, // Intervalles de réessai, en secondes
]
],
];
Remarque : une nouvelle configuration Redis a été ajoutée avec other
comme clé.
Envoyer des messages vers plusieurs Redis
// Envoyer un message vers la file d'attente avec `default` comme clé
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
// Équivalent à
Client::send($queue, $data);
Redis::send($queue, $data);
// Envoyer un message vers la file d'attente avec `other` comme clé
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);
Consommer depuis plusieurs Redis
Dans la configuration de consommation, la clé other
est utilisée pour la file d'attente :
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class SendMail implements Consumer
{
// Nom de la file d'attente à consommer
public $queue = 'send-mail';
// === Ici, définissez comme other, représentant la clé other dans la configuration de consommation ===
public $connection = 'other';
// Consommer
public function consume($data)
{
// Pas besoin de désérialisation
var_export($data);
}
}
Questions fréquentes
Pourquoi y a-t-il une erreur Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds)
?
Cette erreur ne se produit que dans l'interface d'envoi asynchrone Client::send()
. L'envoi asynchrone stocke d'abord les messages dans la mémoire locale, puis les envoie à Redis lorsque le processus est inactif. Si la vitesse de réception de Redis est inférieure à celle de la production de messages, ou si le processus est constamment occupé par d'autres tâches sans avoir assez de temps pour synchroniser les messages en mémoire vers Redis, cela entraînera des engorgements. Si des messages restent en attente de plus de 600 secondes, cette erreur sera déclenchée.
Solution : utilisez l'interface d'envoi synchronisé Redis::send()
pour envoyer des messages.