Redis Queue

Basé sur Redis, la file d'attente de messages prend en charge le traitement différé des messages.

Installation

composer require webman/redis-queue

Fichier de configuration

Le fichier de configuration Redis est généré automatiquement dans {projet-principal}/config/plugin/webman/redis-queue/redis.php, avec un contenu similaire à ceci :

<?php
return [
    'default' => [
        'host' => 'redis://127.0.0.1:6379',
        'options' => [
            'auth' => '',         // Mot de passe, paramètre optionnel
            'db' => 0,            // Base de données
            'max_attempts'  => 5, // Nombre de tentatives en cas d'échec de la consommation
            'retry_seconds' => 5, // Intervalles de réessai, en secondes
        ]
    ],
];

Réessai en cas d'échec de la consommation

Si la consommation échoue (une exception se produit), le message sera placé dans une file d'attente différée, en attente de la prochaine tentative. Le nombre de tentatives est contrôlé par le paramètre max_attempts, tandis que l'intervalle de réessai est contrôlé conjointement par retry_seconds et max_attempts. Par exemple, si max_attempts est de 5 et retry_seconds est de 10, l'intervalle pour la 1ère tentative de réessai sera de 1*10 secondes, l'intervalle pour la 2ème tentative sera de 2*10 secondes, et ainsi de suite jusqu'à 5 tentatives. Si le nombre de tentatives dépasse max_attempts, le message sera placé dans la file d'attente d'échec avec la clé {redis-queue}-failed.

Envoi de messages (synchronisé)

<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Redis;

class Index
{
    public function queue(Request $request)
    {
        // Nom de la file d'attente
        $queue = 'send-mail';
        // Données, vous pouvez directement passer un tableau sans sérialisation
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // Envoi du message
        Redis::send($queue, $data);
        // Envoi d'un message différé, le message sera traité après 60 secondes
        Redis::send($queue, $data, 60);

        return response('redis queue test');
    }

}

Un envoi réussi Redis::send() retourne true, sinon il retourne false ou lance une exception.

Astuce
Le temps de consommation de la file d'attente différée peut varier, par exemple si la vitesse de consommation est inférieure à la vitesse de production, cela peut entraîner un engorgement de la file d'attente et donc un retard de consommation. Une solution consiste à lancer plusieurs processus de consommation.

Envoi de messages (asynchrone)

<?php
namespace app\controller;

use support\Request;
use Webman\RedisQueue\Client;

class Index
{
    public function queue(Request $request)
    {
        // Nom de la file d'attente
        $queue = 'send-mail';
        // Données, vous pouvez directement passer un tableau sans sérialisation
        $data = ['to' => 'tom@gmail.com', 'content' => 'hello'];
        // Envoi du message
        Client::send($queue, $data);
        // Envoi d'un message différé, le message sera traité après 60 secondes
        Client::send($queue, $data, 60);

        return response('redis queue test');
    }

}

Client::send() n'a pas de valeur de retour, il appartient à l'envoi asynchrone, il ne garantit pas que le message soit livré à 100 % vers Redis.

Astuce
Le principe de Client::send() est de créer une file d'attente en mémoire locale et de synchroniser les messages vers Redis de manière asynchrone (la vitesse de synchronisation est très rapide, environ 10 000 messages par seconde). Si le processus redémarre et que certaines données de la file d'attente en mémoire ne sont pas synchronisées, cela peut entraîner une perte de messages. L'envoi asynchrone de Client::send() convient aux messages peu importants.

Astuce
Client::send() est asynchrone et ne peut être utilisé que dans l'environnement d'exécution de workerman. Les scripts en ligne de commande doivent utiliser l'interface synchronisée Redis::send().

Envoi de messages vers d'autres projets

Parfois, vous avez besoin d'envoyer des messages depuis d'autres projets sans pouvoir utiliser webman\redis-queue, vous pouvez vous référer à la fonction suivante pour envoyer des messages vers la file d'attente.

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting';
    $queue_delay = '{redis-queue}-delayed';
    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

Parmi eux, le paramètre $redis est une instance de Redis. Par exemple, l'utilisation de l'extension Redis ressemble à ceci :

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);

Consommation

Le fichier de configuration du processus de consommation se trouve dans {projet-principal}/config/plugin/webman/redis-queue/process.php.
Le répertoire des consommateurs se trouve sous {projet-principal}/app/queue/redis/.

Exécutez la commande php webman redis-queue:consumer my-send-mail pour générer le fichier {projet-principal}/app/queue/redis/MyMailSend.php

Astuce
Si la commande n'existe pas, vous pouvez également générer le fichier manuellement.

<?php

namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class MyMailSend implements Consumer
{
    // Nom de la file d'attente à consommer
    public $queue = 'send-mail';

    // Nom de la connexion, correspondant à plugin/webman/redis-queue/redis.php
    public $connection = 'default';

    // Consommer
    public function consume($data)
    {
        // Pas besoin de désérialisation
        var_export($data); // Affiche ['to' => 'tom@gmail.com', 'content' => 'hello']
    }
    // Callback en cas d'échec de la consommation
    /* 
    $package = [
        'id' => 1357277951, // ID du message
        'time' => 1709170510, // Temps du message
        'delay' => 0, // Temps de retard
        'attempts' => 2, // Nombre de tentatives de consommation
        'queue' => 'send-mail', // Nom de la file d'attente
        'data' => ['to' => 'tom@gmail.com', 'content' => 'hello'], // Contenu du message
        'max_attempts' => 5, // Nombre maximal de tentatives
        'error' => 'message d\'erreur' // Message d'erreur
    ]
    */
    public function onConsumeFailure(\Throwable $e, $package)
    {
        echo "échec de la consommation\n";
        echo $e->getMessage() . "\n";
        // Pas besoin de désérialisation
        var_export($package); 
    }
}

Remarque
Si aucune exception ni erreur n'est levée durant la consommation, cela est considéré comme une réussite de la consommation, sinon cela sera considéré comme un échec et le message passera dans la file d'attente de réessai.
Redis-queue n'a pas de mécanisme d'accusé de réception, vous pouvez le considérer comme un accusé de réception automatique (aucune exception ou erreur n'a été générée). Si durant la consommation, vous souhaitez marquer le message actuel comme non consommé avec succès, vous pouvez lever une exception manuellement pour que le message actuel entre dans la file d'attente de réessai. Cela n'est en réalité pas différent d'un mécanisme d'accusé de réception.

Astuce
Les consommateurs prennent en charge plusieurs serveurs et plusieurs processus, et un même message ne sera pas consommé plusieurs fois. Les messages déjà consommés seront automatiquement supprimés de la file d'attente, sans nécessiter de suppression manuelle.

Astuce
Les processus de consommation peuvent consommer simultanément plusieurs filets d'attente différents. L'ajout d'une nouvelle file d'attente ne nécessite pas de modification de la configuration dans process.php, il suffit d'ajouter une nouvelle classe Consumer correspondante dans app/queue/redis et de spécifier le nom de la file d'attente à consommer à l'aide de l'attribut de classe $queue.

Astuce
Les utilisateurs Windows doivent exécuter php windows.php pour démarrer webman, sinon le processus de consommation ne démarrera pas.

Astuce
Le callback onConsumeFailure sera déclenché à chaque échec de consommation, vous pouvez y traiter la logique après un échec. (Cette fonctionnalité nécessite webman/redis-queue>=1.3.2 workerman/redis-queue>=1.2.1)

Configurer des processus de consommation différents pour différentes files d'attente

Par défaut, tous les consommateurs partagent le même processus de consommation. Cependant, il est parfois nécessaire d'isoler certaines consommations de files d'attente, par exemple, grouper le traitement des tâches lentes et des tâches rapides dans des ensembles de processus différents. Pour cela, nous pouvons diviser les consommateurs en deux répertoires, par exemple app_path() . '/queue/redis/fast' et app_path() . '/queue/redis/slow' (veillez à mettre à jour le namespace des classes de consommation), la configuration serait comme suit :

return [
    ...d'autres configurations omises...

    'redis_consumer_fast'  => [ // La clé peut être définie comme vous le souhaitez, ici nous l'appelons redis_consumer_fast
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8,
        'constructor' => [
            // Répertoire de la classe consommateur
            'consumer_dir' => app_path() . '/queue/redis/fast'
        ]
    ],
    'redis_consumer_slow'  => [  // La clé peut être définie comme vous le souhaitez, ici nous l'appelons redis_consumer_slow
        'handler'     => Webman\RedisQueue\Process\Consumer::class,
        'count'       => 8,
        'constructor' => [
            // Répertoire de la classe consommateur
            'consumer_dir' => app_path() . '/queue/redis/slow'
        ]
    ]
];

Cela permet de placer les consommateurs de tâches rapides dans le répertoire queue/redis/fast et les consommateurs de tâches lentes dans le répertoire queue/redis/slow, atteignant ainsi l'objectif de spécifier des processus de consommation pour les files d'attente.

Configuration multi-redis

Configuration

config/plugin/webman/redis-queue/redis.php

<?php
return [
    'default' => [
        'host' => 'redis://192.168.0.1:6379',
        'options' => [
            'auth' => null,       // Mot de passe, type chaîne, paramètre optionnel
            'db' => 0,            // Base de données
            'max_attempts'  => 5, // Nombre de tentatives en cas d'échec de la consommation
            'retry_seconds' => 5, // Intervalles de réessai, en secondes
        ]
    ],
    'other' => [
        'host' => 'redis://192.168.0.2:6379',
        'options' => [
            'auth' => null,       // Mot de passe, type chaîne, paramètre optionnel
            'db' => 0,            // Base de données
            'max_attempts'  => 5, // Nombre de tentatives en cas d'échec de la consommation
            'retry_seconds' => 5, // Intervalles de réessai, en secondes
        ]
    ],
];

Remarque : une nouvelle configuration Redis a été ajoutée avec other comme clé.

Envoyer des messages vers plusieurs Redis

// Envoyer un message vers la file d'attente avec `default` comme clé
Client::connection('default')->send($queue, $data);
Redis::connection('default')->send($queue, $data);
//  Équivalent à
Client::send($queue, $data);
Redis::send($queue, $data);

// Envoyer un message vers la file d'attente avec `other` comme clé
Client::connection('other')->send($queue, $data);
Redis::connection('other')->send($queue, $data);

Consommer depuis plusieurs Redis

Dans la configuration de consommation, la clé other est utilisée pour la file d'attente :

namespace app\queue\redis;

use Webman\RedisQueue\Consumer;

class SendMail implements Consumer
{
    // Nom de la file d'attente à consommer
    public $queue = 'send-mail';

    // === Ici, définissez comme other, représentant la clé other dans la configuration de consommation ===
    public $connection = 'other';

    // Consommer
    public function consume($data)
    {
        // Pas besoin de désérialisation
        var_export($data);
    }
}

Questions fréquentes

Pourquoi y a-t-il une erreur Workerman\Redis\Exception: Workerman Redis Wait Timeout (600 seconds) ?

Cette erreur ne se produit que dans l'interface d'envoi asynchrone Client::send(). L'envoi asynchrone stocke d'abord les messages dans la mémoire locale, puis les envoie à Redis lorsque le processus est inactif. Si la vitesse de réception de Redis est inférieure à celle de la production de messages, ou si le processus est constamment occupé par d'autres tâches sans avoir assez de temps pour synchroniser les messages en mémoire vers Redis, cela entraînera des engorgements. Si des messages restent en attente de plus de 600 secondes, cette erreur sera déclenchée.

Solution : utilisez l'interface d'envoi synchronisé Redis::send() pour envoyer des messages.