File d'attente Redis
File d'attente de messages basée sur Redis, prenant en charge le traitement différé des messages.
Installation
composer require webman/redis-queue
Fichier de configuration
Le fichier de configuration redis est généré automatiquement dans config/plugin/webman/redis-queue/redis.php
, le contenu est similaire à ceci :
<?php
return [
'default' => [
'host' => 'redis://127.0.0.1:6379',
'options' => [
'auth' => '', // mot de passe, paramètre facultatif
'db' => 0, // base de données
'max_attempts' => 5, // nombre de tentatives de reprise en cas d'échec de la consommation
'retry_seconds' => 5, // intervalle de réessai, en secondes
]
],
];
Réessayer en cas d'échec de la consommation
Si la consommation échoue (une exception se produit), alors le message est placé dans une file d'attente différée, en attente de réessai ultérieur. Le nombre de réessais est contrôlé par le paramètre max_attempts
et l'intervalle de réessai est déterminé par retry_seconds
et max_attempts
ensemble. Par exemple, si max_attempts
est 5 et retry_seconds
est 10, alors l'intervalle de réessai pour la première tentative est de 1*10
secondes, pour la deuxième tentative c'est de 2*10
secondes, pour la troisième tentative c'est de 3*10
secondes, et ainsi de suite jusqu'à 5 réessais. Si le nombre de tentatives de réessai dépasse le paramètre max_attempts
, le message est placé dans la file d'attente des échecs sous la clé {redis-queue}-failed
.
Livraison de messages (synchronisation)
Remarque
Nécessite webman/redis >= 1.2.0, dépend de l'extension Redis
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Redis;
class Index
{
public function queue(Request $request)
{
// Nom de la file d'attente
$queue = 'send-mail';
// Données, peuvent être transmises directement en tant qu'array, pas besoin de sérialisation
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// Livraison du message
Redis::send($queue, $data);
// Livraison d'un message différé, le message sera traité après 60 secondes
Redis::send($queue, $data, 60);
return response('test de file d'attente Redis');
}
}
La réussite de l'envoi avec Redis::send()
renvoie true, sinon false ou génère une exception.
Astuce
Il peut y avoir un délai dans la consommation de la file d'attente différée, par exemple si la vitesse de consommation est inférieure à la vitesse de production, cela peut entraîner un retard de consommation. Pour atténuer cela, il est conseillé d'ouvrir plusieurs processus de consommation.
Livraison de messages (asynchrone)
<?php
namespace app\controller;
use support\Request;
use Webman\RedisQueue\Client;
class Index
{
public function queue(Request $request)
{
// Nom de la file d'attente
$queue = 'send-mail';
// Données, peuvent être transmises directement en tant qu'array, pas besoin de sérialisation
$data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
// Livraison du message
Client::send($queue, $data);
// Livraison d'un message différé, le message sera traité après 60 secondes
Client::send($queue, $data, 60);
return response('test de file d'attente Redis');
}
}
Client::send()
n'a pas de valeur de retour, il s'agit d'une transmission asynchrone, sans garantie à 100% du transfert du message vers Redis.
Astuce
Le fonctionnement deClient::send()
consiste à créer une file d'attente en mémoire locale et à transmettre les messages de manière asynchrone à Redis (la vitesse de transmission est rapide, environ 10 000 messages par seconde). Si le processus redémarre et que tous les messages de la file d'attente en mémoire locale n'ont pas été transmis, cela entraînera une perte de messages.Client::send()
est donc adapté à la transmission de messages non critiques.Astuce
Client::send()
est asynchrone et ne peut être utilisé que dans l'environnement d'exécution de workerman. Pour les scripts en ligne de commande, veuillez utiliser l'interface synchroneRedis::send()
.
Livraison de messages dans un autre projet
Parfois, il est nécessaire de livrer des messages dans un autre projet et il n'est pas possible d'utiliser webman/redis-queue
. Dans ce cas, vous pouvez vous référer à la fonction suivante pour livrer des messages dans la file d'attente.
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting';
$queue_delay = '{redis-queue}-delayed';
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
Dans cette fonction, le paramètre $redis
représente une instance de Redis. Par exemple, l'utilisation de l'extension Redis ressemblerait à ceci :
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);
Consommation
Le fichier de configuration du processus de consommation se trouve dans config/plugin/webman/redis-queue/process.php
.
Le répertoire du consommateur se trouve sous app/queue/redis/
.
Exécuter la commande php webman redis-queue:consumer my-send-mail
générera le fichier app/queue/redis/MyMailSend.php
.
Astuce
Si la commande n'existe pas, vous pouvez également le générer manuellement.
<?php
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class MyMailSend implements Consumer
{
// Nom de la file d'attente à consommer
public $queue = 'send-mail';
// Nom de la connexion, correspondant à la connexion dans le fichier `plugin/webman/redis-queue/redis.php`
public $connection = 'default';
// Consommation
public function consume($data)
{
// Pas besoin de désérialiser
var_export($data); // Sortie ['to' => 'tom@gmail.com', 'content' => 'hello']
}
}
Remarque
Tout processus de consommation qui ne génère ni exception ni erreur est considéré comme une consommation réussie. Sinon, la consommation échoue et le message est placé dans la file d'attente de réessai.
Redis-queue n'a pas de mécanisme d'acknowledgement (confirmation de réception), vous pouvez le considérer comme un ack automatique (en l'absence d'exception ou d'erreur). Si vous souhaitez marquer un message comme n'ayant pas été consommé avec succès pendant la consommation, vous pouvez jeter manuellement une exception pour remettre le message dans la file d'attente de réessai. Cela équivaut en réalité au mécanisme d'acknowledgement.
Le processus de consommation prend en charge la consommation multi-serveurs multi-processus et le même message ne sera pas consommé à plusieurs reprises. Les messages consommés sont automatiquement supprimés de la file d'attente et n'ont pas besoin d'être supprimés manuellement.
Les processus de consommation peuvent consommer simultanément plusieurs files d'attente différentes, l'ajout de nouvelles files d'attente ne nécessite pas de modification de la configuration dansprocess.php
, il suffit d'ajouter une classeConsumer
correspondante dansapp/queue/redis
en spécifiant la propriété de classe$queue
pour indiquer la file d'attente à consommer.Astuce
Les utilisateurs de Windows doivent exécuterphp windows.php
pour démarrer webman, sinon les processus de consommation ne seront pas démarrés.
Configuration de processus de consommation différente pour différentes files d'attente
Par défaut, tous les consommateurs partagent le même processus de consommation. Cependant, parfois nous devons séparer la consommation de certaines files d'attente, par exemple, pour mettre les affaires lentes dans un groupe de processus de consommation et les affaires rapides dans un autre groupe. Pour ce faire, nous pouvons diviser les consommateurs en deux répertoires, par exemple app_path() . '/queue/redis/fast'
et app_path() . '/queue/redis/slow'
(notez que l'espace de noms de la classe de consommation doit être modifié en conséquence), puis configurer comme suit :
return [
... autres configurations ici...
'redis_consumer_fast' => [
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// Répertoire de classe de consommateur
'consumer_dir' => app_path() . '/queue/redis/fast'
]
],
'redis_consumer_slow' => [
'handler' => Webman\RedisQueue\Process\Consumer::class,
'count' => 8,
'constructor' => [
// Répertoire de classe de consommateur
'consumer_dir' => app_path() . '/queue/redis/slow'
]
]
];
Grâce à la classification des répertoires et aux configurations appropriées, nous pouvons facilement définir des processus de consommation différents pour différents consommateurs.
Configuration multi-redis
Configuration
config/plugin/webman/redis-queue/redis.php
<?php
return [
'default' => [
'host' => 'redis://192.168.0.1:6379',
'options' => [
'auth' => null, // Password, string type, optional parameter
'db' => 0, // Database
'max_attempts' => 5, // Retry times after consumption failure
'retry_seconds' => 5, // Retry interval, in seconds
]
],
'other' => [
'host' => 'redis://192.168.0.2:6379',
'options' => [
'auth' => null, // Password, string type, optional parameter
'db' => 0, // Database
'max_attempts' => 5, // Retry times after consumption failure
'retry_seconds' => 5, // Retry interval, in seconds
]
],
];
Note that a other
redis configuration has been added to the configuration.
Multi-redis message delivery
// Deliver messages to the queue with key 'default'
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
// Same as
Client::send($queue, $data);
Redis::send($queue, $data);
// Deliver messages to the queue with key 'other'
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);
Multi-redis consumption
Consume messages from the queue with key other
in the consumption configuration
namespace app\queue\redis;
use Webman\RedisQueue\Consumer;
class SendMail implements Consumer
{
// Queue name to consume
public $queue = 'send-mail';
// === Set this to 'other' to consume from the queue with key 'other' in the consumption configuration ===
public $connection = 'other';
// Consume
public function consume($data)
{
// No need for deserialization
var_export($data);
}
}
Frequently Asked Questions
Why do I get the error Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds)
This error only exists in the asynchronous delivery interface Client::send()
. Asynchronous delivery first saves the message in local memory and sends it to redis when the process is idle. If the redis receiving speed is slower than the message production speed, or if the process is busy with other tasks and does not have enough time to synchronize the memory messages to redis, it can cause message congestion. If there is a message congestion for more than 600 seconds, this error will be triggered.
Solution: Use the synchronous delivery interface Redis::send()
for message delivery.